From d90e01532ff69ca7032797bd7630b541440cbb07 Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 22 Mar 2024 10:27:17 +0800 Subject: [PATCH] enhance: Release level zero segments when channel unsub (#31486) Related to #27349 See also #30816 Level zero is not allowed to balance among delegators, they shall always serve current delegator. This PR releases all level zero segments after channel is unsubscribed and preventing level zero segment blocking graceful stop. Signed-off-by: Congqi Xia --- internal/querynodev2/services.go | 2 ++ internal/querynodev2/services_test.go | 20 ++++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 704f50ded9..d803a6ce4e 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -367,6 +368,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC node.pipelineManager.Remove(req.GetChannelName()) node.manager.Segment.RemoveBy(segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing)) + node.manager.Segment.RemoveBy(segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0)) node.tSafeManager.Remove(ctx, req.GetChannelName()) node.manager.Collection.Unref(req.GetCollectionID(), 1) diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 28f29fdb09..8e48598123 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -45,6 +45,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/streamrpc" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/conc" @@ -465,6 +466,18 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() { // prepate suite.TestWatchDmChannelsInt64() + l0Segment := segments.NewMockSegment(suite.T()) + l0Segment.EXPECT().ID().Return(10000) + l0Segment.EXPECT().Collection().Return(suite.collectionID) + l0Segment.EXPECT().Partition().Return(common.AllPartitionsID) + l0Segment.EXPECT().Level().Return(datapb.SegmentLevel_L0) + l0Segment.EXPECT().Type().Return(commonpb.SegmentState_Sealed) + l0Segment.EXPECT().Indexes().Return(nil) + l0Segment.EXPECT().Shard().Return(suite.vchannel) + l0Segment.EXPECT().Release().Return() + + suite.node.manager.Segment.Put(segments.SegmentTypeSealed, l0Segment) + // data req := &querypb.UnsubDmChannelRequest{ Base: &commonpb.MsgBase{ @@ -478,8 +491,11 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Normal() { } status, err := suite.node.UnsubDmChannel(ctx, req) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + suite.NoError(merr.CheckRPCCall(status, err)) + + suite.Len(suite.node.manager.Segment.GetBy( + segments.WithChannel(suite.vchannel), + segments.WithLevel(datapb.SegmentLevel_L0)), 0) } func (suite *ServiceSuite) TestUnsubDmChannels_Failed() {