diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 704f50ded9..d803a6ce4e 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -367,6 +368,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC node.pipelineManager.Remove(req.GetChannelName()) node.manager.Segment.RemoveBy(segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing)) + node.manager.Segment.RemoveBy(segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0)) node.tSafeManager.Remove(ctx, req.GetChannelName()) node.manager.Collection.Unref(req.GetCollectionID(), 1) diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 28f29fdb09..8e48598123 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -45,6 +45,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/streamrpc" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/conc" @@ -465,6 +466,18 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() { // prepate suite.TestWatchDmChannelsInt64() + l0Segment := segments.NewMockSegment(suite.T()) + l0Segment.EXPECT().ID().Return(10000) + l0Segment.EXPECT().Collection().Return(suite.collectionID) + l0Segment.EXPECT().Partition().Return(common.AllPartitionsID) + l0Segment.EXPECT().Level().Return(datapb.SegmentLevel_L0) + l0Segment.EXPECT().Type().Return(commonpb.SegmentState_Sealed) + l0Segment.EXPECT().Indexes().Return(nil) + l0Segment.EXPECT().Shard().Return(suite.vchannel) + l0Segment.EXPECT().Release().Return() + + suite.node.manager.Segment.Put(segments.SegmentTypeSealed, l0Segment) + // data req := &querypb.UnsubDmChannelRequest{ Base: &commonpb.MsgBase{ @@ -478,8 +491,11 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() { } status, err := suite.node.UnsubDmChannel(ctx, req) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + suite.NoError(merr.CheckRPCCall(status, err)) + + suite.Len(suite.node.manager.Segment.GetBy( + segments.WithChannel(suite.vchannel), + segments.WithLevel(datapb.SegmentLevel_L0)), 0) } func (suite *ServiceSuite) TestUnsubDmChannels_Failed() {