mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: Segment version doesn't update as expected (#30953)
issue: #30950 pr: #30951 due to segment version doesn't update as expected. This PR will update segment version until segment become loaded Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
3ed73c54bd
commit
b0c7f8653f
@ -183,7 +183,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
||||
return nil, nil
|
||||
}
|
||||
// Filter out loaded & loading segments
|
||||
infos := loader.prepare(segmentType, version, segments...)
|
||||
infos := loader.prepare(segmentType, segments...)
|
||||
defer loader.unregister(infos...)
|
||||
|
||||
log.With(
|
||||
@ -292,7 +292,8 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
||||
}
|
||||
|
||||
// Wait for all segments loaded
|
||||
if err := loader.waitSegmentLoadDone(ctx, segmentType, lo.Map(segments, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })...); err != nil {
|
||||
segmentIDs := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })
|
||||
if err := loader.waitSegmentLoadDone(ctx, segmentType, segmentIDs, version); err != nil {
|
||||
log.Warn("failed to wait the filtered out segments load done", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
@ -306,7 +307,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (loader *segmentLoader) prepare(segmentType SegmentType, version int64, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo {
|
||||
func (loader *segmentLoader) prepare(segmentType SegmentType, segments ...*querypb.SegmentLoadInfo) []*querypb.SegmentLoadInfo {
|
||||
loader.mut.Lock()
|
||||
defer loader.mut.Unlock()
|
||||
|
||||
@ -319,9 +320,6 @@ func (loader *segmentLoader) prepare(segmentType SegmentType, version int64, seg
|
||||
infos = append(infos, segment)
|
||||
loader.loadingSegments.Insert(segment.GetSegmentID(), newLoadResult())
|
||||
} else {
|
||||
// try to update segment version before skip load operation
|
||||
loader.manager.Segment.UpdateSegmentBy(IncreaseVersion(version),
|
||||
WithType(segmentType), WithID(segment.SegmentID))
|
||||
log.Info("skip loaded/loading segment", zap.Int64("segmentID", segment.GetSegmentID()),
|
||||
zap.Bool("isLoaded", len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) > 0),
|
||||
zap.Bool("isLoading", loader.loadingSegments.Contain(segment.GetSegmentID())),
|
||||
@ -427,7 +425,7 @@ func (loader *segmentLoader) freeRequest(resource LoadResource) {
|
||||
loader.committedResource.Sub(resource)
|
||||
}
|
||||
|
||||
func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs ...int64) error {
|
||||
func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentType SegmentType, segmentIDs []int64, version int64) error {
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.String("segmentType", segmentType.String()),
|
||||
zap.Int64s("segmentIDs", segmentIDs),
|
||||
@ -470,6 +468,9 @@ func (loader *segmentLoader) waitSegmentLoadDone(ctx context.Context, segmentTyp
|
||||
return merr.WrapErrSegmentLack(segmentID, "failed to wait segment loaded")
|
||||
}
|
||||
|
||||
// try to update segment version after wait segment loaded
|
||||
loader.manager.Segment.UpdateSegmentBy(IncreaseVersion(version),
|
||||
WithType(segmentType), WithID(segmentID))
|
||||
log.Info("segment loaded...", zap.Int64("segmentID", segmentID))
|
||||
}
|
||||
return nil
|
||||
@ -1096,7 +1097,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen
|
||||
|
||||
// Filter out LOADING segments only
|
||||
// use None to avoid loaded check
|
||||
infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, version, loadInfo)
|
||||
infos := loader.prepare(commonpb.SegmentState_SegmentStateNone, loadInfo)
|
||||
defer loader.unregister(infos...)
|
||||
|
||||
indexInfo := lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *querypb.SegmentLoadInfo {
|
||||
@ -1142,7 +1143,7 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, segment *LocalSegmen
|
||||
loader.notifyLoadFinish(loadInfo)
|
||||
}
|
||||
|
||||
return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, loadInfo.GetSegmentID())
|
||||
return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, []int64{loadInfo.GetSegmentID()}, version)
|
||||
}
|
||||
|
||||
func getBinlogDataSize(fieldBinlog *datapb.FieldBinlog) int64 {
|
||||
|
||||
@ -696,14 +696,15 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
|
||||
}
|
||||
return nil
|
||||
})
|
||||
infos = suite.loader.prepare(SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
|
||||
suite.segmentManager.EXPECT().UpdateSegmentBy(mock.Anything, mock.Anything, mock.Anything).Return(0)
|
||||
infos = suite.loader.prepare(SegmentTypeSealed, &querypb.SegmentLoadInfo{
|
||||
SegmentID: suite.segmentID,
|
||||
PartitionID: suite.partitionID,
|
||||
CollectionID: suite.collectionID,
|
||||
NumOfRows: 100,
|
||||
})
|
||||
|
||||
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, suite.segmentID)
|
||||
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, []int64{suite.segmentID}, 0)
|
||||
suite.NoError(err)
|
||||
})
|
||||
|
||||
@ -724,14 +725,15 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
|
||||
|
||||
return nil
|
||||
})
|
||||
infos = suite.loader.prepare(SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
|
||||
|
||||
infos = suite.loader.prepare(SegmentTypeSealed, &querypb.SegmentLoadInfo{
|
||||
SegmentID: suite.segmentID,
|
||||
PartitionID: suite.partitionID,
|
||||
CollectionID: suite.collectionID,
|
||||
NumOfRows: 100,
|
||||
})
|
||||
|
||||
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, suite.segmentID)
|
||||
err := suite.loader.waitSegmentLoadDone(context.Background(), SegmentTypeSealed, []int64{suite.segmentID}, 0)
|
||||
suite.Error(err)
|
||||
})
|
||||
|
||||
@ -742,7 +744,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
|
||||
suite.segmentManager.EXPECT().GetWithType(suite.segmentID, SegmentTypeSealed).RunAndReturn(func(segmentID int64, segmentType commonpb.SegmentState) Segment {
|
||||
return nil
|
||||
})
|
||||
suite.loader.prepare(SegmentTypeSealed, 0, &querypb.SegmentLoadInfo{
|
||||
suite.loader.prepare(SegmentTypeSealed, &querypb.SegmentLoadInfo{
|
||||
SegmentID: suite.segmentID,
|
||||
PartitionID: suite.partitionID,
|
||||
CollectionID: suite.collectionID,
|
||||
@ -752,7 +754,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
err := suite.loader.waitSegmentLoadDone(ctx, SegmentTypeSealed, suite.segmentID)
|
||||
err := suite.loader.waitSegmentLoadDone(ctx, SegmentTypeSealed, []int64{suite.segmentID}, 0)
|
||||
suite.Error(err)
|
||||
suite.True(merr.IsCanceledOrTimeout(err))
|
||||
})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user