From 289df1cc3ddeef5a1b08cc19fa84f1da2d3f0298 Mon Sep 17 00:00:00 2001 From: SimFG Date: Mon, 2 Jan 2023 16:39:32 +0800 Subject: [PATCH] Make the `UnsubDmChannel` rpc unsuccessful when the releaseCollectionTask fails (#21469) Signed-off-by: SimFG --- internal/querynode/impl.go | 24 ++++++++--------- internal/querynode/impl_test.go | 48 +++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 12 deletions(-) diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index db24e62c03..e80cc6567f 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -433,19 +433,19 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC } log.Info("unsubDmChannel(ReleaseCollection) enqueue done", zap.Int64("collectionID", req.GetCollectionID())) - func() { - err = dct.WaitToFinish() - if err != nil { - log.Warn("failed to do subscribe channel task successfully", zap.Error(err)) - return - } - log.Info("unsubDmChannel(ReleaseCollection) WaitToFinish done", zap.Int64("collectionID", req.GetCollectionID())) - }() - - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, + err = dct.WaitToFinish() + if err != nil { + log.Warn("failed to do subscribe channel task successfully", zap.Error(err)) + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, nil } - return status, nil + + log.Info("unsubDmChannel(ReleaseCollection) WaitToFinish done", zap.Int64("collectionID", req.GetCollectionID())) + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, nil } // LoadSegments load historical data into query node, historical data can be vector data or index diff --git a/internal/querynode/impl_test.go b/internal/querynode/impl_test.go index ba94216c0d..efb6ba031f 100644 --- a/internal/querynode/impl_test.go +++ b/internal/querynode/impl_test.go @@ -212,6 +212,54 @@ func TestImpl_UnsubDmChannel(t *testing.T) { node, err := genSimpleQueryNode(ctx) assert.NoError(t, err) + t.Run("normal run", func(t *testing.T) { + schema := genTestCollectionSchema() + req := &queryPb.WatchDmChannelsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_WatchDmChannels, + MsgID: rand.Int63(), + TargetID: node.session.ServerID, + }, + NodeID: 0, + CollectionID: defaultCollectionID, + PartitionIDs: []UniqueID{defaultPartitionID}, + Schema: schema, + Infos: []*datapb.VchannelInfo{ + { + CollectionID: 1000, + ChannelName: "1000-dmc0", + }, + }, + } + + status, err := node.WatchDmChannels(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) + + { + req := &queryPb.UnsubDmChannelRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_UnsubDmChannel, + MsgID: rand.Int63(), + TargetID: node.session.ServerID, + }, + NodeID: 0, + CollectionID: defaultCollectionID, + ChannelName: "1000-dmc0", + } + originMetaReplica := node.metaReplica + node.metaReplica = newMockReplicaInterface() + status, err := node.UnsubDmChannel(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) + + node.metaReplica = originMetaReplica + status, err = node.UnsubDmChannel(ctx, req) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) + } + }) + t.Run("target not match", func(t *testing.T) { req := &queryPb.UnsubDmChannelRequest{ Base: &commonpb.MsgBase{