From f3b7a8892ff2b52ca696017048fca70f6f01736f Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 21 Feb 2024 22:12:53 +0800 Subject: [PATCH] fix: Release loaded growing if WatchDmlChannel fail (#30735) See also #30734 --------- Signed-off-by: Congqi Xia --- internal/querynodev2/services.go | 10 ++++++- internal/querynodev2/services_test.go | 39 +++++++++++++++++++++++++-- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 7a30455ed1..77b57dfa89 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -195,7 +195,7 @@ func (node *QueryNode) composeIndexMeta(indexInfos []*indexpb.IndexInfo, schema } // WatchDmChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query -func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { +func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (status *commonpb.Status, e error) { channel := req.GetInfos()[0] log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), @@ -246,6 +246,11 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(), node.composeIndexMeta(req.GetIndexInfoList(), req.Schema), req.GetLoadMeta()) + defer func() { + if !merr.Ok(status) { + node.manager.Collection.Unref(req.GetCollectionID(), 1) + } + }() delegator, err := delegator.NewShardDelegator( ctx, @@ -316,6 +321,9 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm } err = loadGrowingSegments(ctx, delegator, req) if err != nil { + // remove legacy growing + node.manager.Segment.RemoveBy(segments.WithChannel(channel.GetChannelName()), + segments.WithType(segments.SegmentTypeGrowing)) msg := "failed to load growing segments" log.Warn(msg, zap.Error(err)) return merr.Status(err), nil diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 308d729433..f3969bdd07 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -324,6 +324,7 @@ func (suite *ServiceSuite) TestWatchDmChannelsVarchar() { // data schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_VarChar) + req := &querypb.WatchDmChannelsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_WatchDmChannels, @@ -375,6 +376,23 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() { // data schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64) + + indexInfos := segments.GenTestIndexInfoList(suite.collectionID, schema) + + infos := suite.genSegmentLoadInfos(schema, indexInfos) + segmentInfos := lo.SliceToMap(infos, func(info *querypb.SegmentLoadInfo) (int64, *datapb.SegmentInfo) { + return info.SegmentID, &datapb.SegmentInfo{ + ID: info.SegmentID, + CollectionID: info.CollectionID, + PartitionID: info.PartitionID, + InsertChannel: info.InsertChannel, + Binlogs: info.BinlogPaths, + Statslogs: info.Statslogs, + Deltalogs: info.Deltalogs, + Level: info.Level, + } + }) + req := &querypb.WatchDmChannelsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_WatchDmChannels, @@ -397,7 +415,8 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() { LoadMeta: &querypb.LoadMetaInfo{ MetricType: defaultMetricType, }, - IndexInfoList: segments.GenTestIndexInfoList(suite.collectionID, schema), + SegmentInfos: segmentInfos, + IndexInfoList: indexInfos, } // test channel is unsubscribing @@ -411,12 +430,28 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() { suite.factory.EXPECT().NewTtMsgStream(mock.Anything).Return(suite.msgStream, nil) suite.msgStream.EXPECT().AsConsumer(mock.Anything, []string{suite.pchannel}, mock.Anything, mock.Anything).Return(nil) suite.msgStream.EXPECT().Close().Return() - suite.msgStream.EXPECT().Seek(mock.Anything, mock.Anything).Return(errors.New("mock error")) + suite.msgStream.EXPECT().Seek(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once() status, err = suite.node.WatchDmChannels(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + // load growing failed + badSegmentReq := typeutil.Clone(req) + for _, info := range badSegmentReq.SegmentInfos { + for _, fbl := range info.Binlogs { + for _, binlog := range fbl.Binlogs { + binlog.LogPath += "bad_suffix" + } + } + } + for _, channel := range badSegmentReq.Infos { + channel.UnflushedSegmentIds = lo.Keys(badSegmentReq.SegmentInfos) + } + status, err = suite.node.WatchDmChannels(ctx, badSegmentReq) + err = merr.CheckRPCCall(status, err) + suite.Error(err) + // empty index req.IndexInfoList = nil status, err = suite.node.WatchDmChannels(ctx, req)