fix: [Cherry-pick] Release loaded growing if WatchDmlChannel fail (#30735) (#30745)

Cherry pick from master
pr: #30735
See also #30734

---------

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-02-22 16:44:55 +08:00 committed by GitHub
parent 31f33f67e0
commit 3d8b6a4d2e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 48 additions and 5 deletions

View File

@ -193,7 +193,7 @@ func (node *QueryNode) composeIndexMeta(indexInfos []*indexpb.IndexInfo, schema
}
// WatchDmChannels create consumers on dmChannels to receive Incremental datawhich is the important part of real-time query
func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (status *commonpb.Status, e error) {
channel := req.GetInfos()[0]
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
@ -251,6 +251,12 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
node.composeIndexMeta(req.GetIndexInfoList(), req.Schema), req.GetLoadMeta())
defer func() {
if !merr.Ok(status) {
node.manager.Collection.Unref(req.GetCollectionID(), 1)
}
}()
collection := node.manager.Collection.Get(req.GetCollectionID())
collection.SetMetricType(req.GetLoadMeta().GetMetricType())
@ -316,6 +322,13 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
pipeline.ExcludedSegments(droppedInfos)
}
defer func() {
if err != nil {
// remove legacy growing
node.manager.Segment.RemoveBy(segments.WithChannel(channel.GetChannelName()),
segments.WithType(segments.SegmentTypeGrowing))
}
}()
err = loadGrowingSegments(ctx, delegator, req)
if err != nil {
msg := "failed to load growing segments"

View File

@ -361,6 +361,21 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
ctx := context.Background()
// data
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
infos := suite.genSegmentLoadInfos(schema)
segmentInfos := lo.SliceToMap(infos, func(info *querypb.SegmentLoadInfo) (int64, *datapb.SegmentInfo) {
return info.SegmentID, &datapb.SegmentInfo{
ID: info.SegmentID,
CollectionID: info.CollectionID,
PartitionID: info.PartitionID,
InsertChannel: info.InsertChannel,
Binlogs: info.BinlogPaths,
Statslogs: info.Statslogs,
Deltalogs: info.Deltalogs,
}
})
req := &querypb.WatchDmChannelsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchDmChannels,
@ -383,9 +398,8 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
LoadMeta: &querypb.LoadMetaInfo{
MetricType: defaultMetricType,
},
IndexInfoList: []*indexpb.IndexInfo{
{},
},
SegmentInfos: segmentInfos,
IndexInfoList: []*indexpb.IndexInfo{{}},
}
// test channel is unsubscribing
@ -399,12 +413,28 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
suite.factory.EXPECT().NewTtMsgStream(mock.Anything).Return(suite.msgStream, nil)
suite.msgStream.EXPECT().AsConsumer(mock.Anything, []string{suite.pchannel}, mock.Anything, mock.Anything).Return(nil)
suite.msgStream.EXPECT().Close().Return()
suite.msgStream.EXPECT().Seek(mock.Anything, mock.Anything).Return(errors.New("mock error"))
suite.msgStream.EXPECT().Seek(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
status, err = suite.node.WatchDmChannels(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
// load growing failed
badSegmentReq := typeutil.Clone(req)
for _, info := range badSegmentReq.SegmentInfos {
for _, fbl := range info.Binlogs {
for _, binlog := range fbl.Binlogs {
binlog.LogPath += "bad_suffix"
}
}
}
for _, channel := range badSegmentReq.Infos {
channel.UnflushedSegmentIds = lo.Keys(badSegmentReq.SegmentInfos)
}
status, err = suite.node.WatchDmChannels(ctx, badSegmentReq)
err = merr.CheckRPCCall(status, err)
suite.Error(err)
// empty index
req.IndexInfoList = nil
status, err = suite.node.WatchDmChannels(ctx, req)