mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: [2.5]L0 brings its own start pos when syncing (#40664)
See also: #40388, #40207 pr: #40663 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
cff0e82f57
commit
5ca2af8124
@ -64,13 +64,22 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error
|
||||
Position: pack.checkpoint,
|
||||
})
|
||||
|
||||
startPos := lo.Map(pack.metacache.GetSegmentsBy(metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Sealed, commonpb.SegmentState_Flushing),
|
||||
metacache.WithStartPosNotRecorded()), func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition {
|
||||
return &datapb.SegmentStartPosition{
|
||||
SegmentID: info.SegmentID(),
|
||||
StartPosition: info.StartPosition(),
|
||||
}
|
||||
})
|
||||
// Get not reported L1's start positions
|
||||
startPos := lo.Map(pack.metacache.GetSegmentsBy(
|
||||
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Sealed, commonpb.SegmentState_Flushing),
|
||||
metacache.WithLevel(datapb.SegmentLevel_L1), metacache.WithStartPosNotRecorded()),
|
||||
func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition {
|
||||
return &datapb.SegmentStartPosition{
|
||||
SegmentID: info.SegmentID(),
|
||||
StartPosition: info.StartPosition(),
|
||||
}
|
||||
})
|
||||
|
||||
// L0 brings its own start position
|
||||
if segment.Level() == datapb.SegmentLevel_L0 {
|
||||
startPos = append(startPos, &datapb.SegmentStartPosition{SegmentID: pack.segmentID, StartPosition: pack.StartPosition()})
|
||||
}
|
||||
|
||||
getBinlogNum := func(fBinlog *datapb.FieldBinlog) int { return len(fBinlog.GetBinlogs()) }
|
||||
log.Info("SaveBinlogPath",
|
||||
zap.Int64("SegmentID", pack.segmentID),
|
||||
|
||||
@ -43,7 +43,7 @@ func (s *MetaWriterSuite) TestNormalSave() {
|
||||
bfs := pkoracle.NewBloomFilterSet()
|
||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
task := NewSyncTask()
|
||||
@ -61,7 +61,7 @@ func (s *MetaWriterSuite) TestReturnError() {
|
||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
task := NewSyncTask()
|
||||
task.WithMetaCache(s.metacache)
|
||||
err := s.writer.UpdateSync(ctx, task)
|
||||
|
||||
@ -160,7 +160,7 @@ func (s *SyncManagerSuite) TestSubmit() {
|
||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
||||
manager := NewSyncManager(s.chunkManager)
|
||||
@ -200,7 +200,7 @@ func (s *SyncManagerSuite) TestCompacted() {
|
||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
||||
manager := NewSyncManager(s.chunkManager)
|
||||
|
||||
@ -191,7 +191,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
seg.GetBloomFilterSet().Roll()
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
||||
s.Run("without_data", func() {
|
||||
@ -277,7 +277,7 @@ func (s *SyncTaskSuite) TestRunL0Segment() {
|
||||
bfs := pkoracle.NewBloomFilterSet()
|
||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{Level: datapb.SegmentLevel_L0}, bfs, nil)
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
|
||||
|
||||
s.Run("pure_delete_l0_flush", func() {
|
||||
@ -319,7 +319,7 @@ func (s *SyncTaskSuite) TestRunError() {
|
||||
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, pkoracle.NewBloomFilterSet(), nil)
|
||||
metacache.UpdateNumOfRows(1000)(seg)
|
||||
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
|
||||
|
||||
s.Run("allocate_id_fail", func() {
|
||||
mockAllocator := allocator.NewMockAllocator(s.T())
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user