From b0c7f8653faf203ad7d50db6966eaeb99e582f66 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 1 Mar 2024 14:21:10 +0800 Subject: [PATCH] fix: Segment version doesn't update as expected (#30953) issue: #30950 pr: #30951 due to segment version doesn't update as expected. This PR will update segment version until segment become loaded Signed-off-by: Wei Liu --- .../querynodev2/segments/segment_loader.go | 19 ++++++++++--------- .../segments/segment_loader_test.go | 14 ++++++++------ 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 71b92442ab..6b60a7e3f8 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -183,7 +183,7 @@ func (loader *segmentLoader) Load(ctx context.Context, return nil, nil } // Filter out loaded & loading segments - infos := loader.prepare(segmentType, version, segments...) + infos := loader.prepare(segmentType, segments...) defer loader.unregister(infos...) log.With( @@ -292,7 +292,8 @@ func (loader *segmentLoader) Load(ctx context.Context, } // Wait for all segments loaded - if err := loader.waitSegmentLoadDone(ctx, segmentType, lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })...); err != nil { + segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() }) + if err := loader.waitSegmentLoadDone(ctx, segmentType, segmentIDs, version); err != nil { log.Warn("failed to wait the filtered out segments load done", zap.Error(err)) return nil, err } @@ -306,7 +307,7 @@ func (loader *segmentLoader) Load(ctx context.Context, return result, nil } -func (loader *segmentLoader) prepare(segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo { +func (loader *segmentLoader) prepare(segmentType SegmentType, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo { loader.mut.Lock() defer loader.mut.Unlock() @@ -319,9 +320,6 @@ func (loader *segmentLoader) prepare(segmentType SegmentType, version int64, seg infos = append(infos, segment) loader.loadingSegments.Insert(segment.GetSegmentID(), newLoadResult()) } else { - // try to update segment version before skip load operation - loader.manager.Segment.UpdateSegmentBy(IncreaseVersion(version), - WithType(segmentType), WithID(segment.SegmentID)) log.Info("skip loaded/loading segment", zap.Int64("segmentID", segment.GetSegmentID()), zap.Bool("isLoaded", len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) > 0), zap.Bool("isLoading", loader.loadingSegments.Contain(segment.GetSegmentID())), @@ -427,7 +425,7 @@ func (loader *segmentLoader) freeRequest(resource LoadResource) { loader.committedResource.Sub(resource) } -func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs ...int64) error { +func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs []int64, version int64) error { log := log.Ctx(ctx).With( zap.String("segmentType", segmentType.String()), zap.Int64s("segmentIDs", segmentIDs), @@ -470,6 +468,9 @@ func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentTyp return merr.WrapErrSegmentLack(segmentID, "failed to wait segment loaded") } + // try to update segment version after wait segment loaded + loader.manager.Segment.UpdateSegmentBy(IncreaseVersion(version), + WithType(segmentType), WithID(segmentID)) log.Info("segment loaded...", zap.Int64("segmentID", segmentID)) } return nil @@ -1096,7 +1097,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen // Filter out LOADING segments only // use None to avoid loaded check - infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, version, loadInfo) + infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, loadInfo) defer loader.unregister(infos...) indexInfo := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *querypb.SegmentLoadInfo { @@ -1142,7 +1143,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen loader.notifyLoadFinish(loadInfo) } - return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, loadInfo.GetSegmentID()) + return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, []int64{loadInfo.GetSegmentID()}, version) } func getBinlogDataSize(fieldBinlog *datapb.FieldBinlog) int64 { diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go index 9d94a3eaf7..96dff5601e 100644 --- a/internal/querynodev2/segments/segment_loader_test.go +++ b/internal/querynodev2/segments/segment_loader_test.go @@ -696,14 +696,15 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() { } return nil }) - infos = suite.loader.prepare(SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{ + suite.segmentManager.EXPECT().UpdateSegmentBy(mock.Anything, mock.Anything, mock.Anything).Return(0) + infos = suite.loader.prepare(SegmentTypeSealed, &querypb.SegmentLoadInfo{ SegmentID: suite.segmentID, PartitionID: suite.partitionID, CollectionID: suite.collectionID, NumOfRows: 100, }) - err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, suite.segmentID) + err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, []int64{suite.segmentID}, 0) suite.NoError(err) }) @@ -724,14 +725,15 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() { return nil }) - infos = suite.loader.prepare(SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{ + + infos = suite.loader.prepare(SegmentTypeSealed, &querypb.SegmentLoadInfo{ SegmentID: suite.segmentID, PartitionID: suite.partitionID, CollectionID: suite.collectionID, NumOfRows: 100, }) - err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, suite.segmentID) + err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, []int64{suite.segmentID}, 0) suite.Error(err) }) @@ -742,7 +744,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() { suite.segmentManager.EXPECT().GetWithType(suite.segmentID, SegmentTypeSealed).RunAndReturn(func(segmentID int64, segmentType commonpb.SegmentState) Segment { return nil }) - suite.loader.prepare(SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{ + suite.loader.prepare(SegmentTypeSealed, &querypb.SegmentLoadInfo{ SegmentID: suite.segmentID, PartitionID: suite.partitionID, CollectionID: suite.collectionID, @@ -752,7 +754,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() { ctx, cancel := context.WithCancel(context.Background()) cancel() - err := suite.loader.waitSegmentLoadDone(ctx, SegmentTypeSealed, suite.segmentID) + err := suite.loader.waitSegmentLoadDone(ctx, SegmentTypeSealed, []int64{suite.segmentID}, 0) suite.Error(err) suite.True(merr.IsCanceledOrTimeout(err)) })