From 5ca2af8124245ec5d18dcc767853c22d3bf280a7 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 20 Mar 2025 14:20:17 +0800 Subject: [PATCH] fix: [2.5]L0 brings its own start pos when syncing (#40664) See also: #40388, #40207 pr: #40663 --------- Signed-off-by: yangxuan --- internal/flushcommon/syncmgr/meta_writer.go | 23 +++++++++++++------ .../flushcommon/syncmgr/meta_writer_test.go | 4 ++-- .../flushcommon/syncmgr/sync_manager_test.go | 4 ++-- internal/flushcommon/syncmgr/task_test.go | 6 ++--- 4 files changed, 23 insertions(+), 14 deletions(-) diff --git a/internal/flushcommon/syncmgr/meta_writer.go b/internal/flushcommon/syncmgr/meta_writer.go index 58c3705078..2f3b0e0ad9 100644 --- a/internal/flushcommon/syncmgr/meta_writer.go +++ b/internal/flushcommon/syncmgr/meta_writer.go @@ -64,13 +64,22 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error Position: pack.checkpoint, }) - startPos := lo.Map(pack.metacache.GetSegmentsBy(metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Sealed, commonpb.SegmentState_Flushing), - metacache.WithStartPosNotRecorded()), func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition { - return &datapb.SegmentStartPosition{ - SegmentID: info.SegmentID(), - StartPosition: info.StartPosition(), - } - }) + // Get not reported L1's start positions + startPos := lo.Map(pack.metacache.GetSegmentsBy( + metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Sealed, commonpb.SegmentState_Flushing), + metacache.WithLevel(datapb.SegmentLevel_L1), metacache.WithStartPosNotRecorded()), + func(info *metacache.SegmentInfo, _ int) *datapb.SegmentStartPosition { + return &datapb.SegmentStartPosition{ + SegmentID: info.SegmentID(), + StartPosition: info.StartPosition(), + } + }) + + // L0 brings its own start position + if segment.Level() == datapb.SegmentLevel_L0 { + startPos = append(startPos, &datapb.SegmentStartPosition{SegmentID: pack.segmentID, StartPosition: pack.StartPosition()}) + } + getBinlogNum := func(fBinlog *datapb.FieldBinlog) int { return len(fBinlog.GetBinlogs()) } log.Info("SaveBinlogPath", zap.Int64("SegmentID", pack.segmentID), diff --git a/internal/flushcommon/syncmgr/meta_writer_test.go b/internal/flushcommon/syncmgr/meta_writer_test.go index 1354f035fd..dd2c035e95 100644 --- a/internal/flushcommon/syncmgr/meta_writer_test.go +++ b/internal/flushcommon/syncmgr/meta_writer_test.go @@ -43,7 +43,7 @@ func (s *MetaWriterSuite) TestNormalSave() { bfs := pkoracle.NewBloomFilterSet() seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil) metacache.UpdateNumOfRows(1000)(seg) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() task := NewSyncTask() @@ -61,7 +61,7 @@ func (s *MetaWriterSuite) TestReturnError() { seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil) metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) task := NewSyncTask() task.WithMetaCache(s.metacache) err := s.writer.UpdateSync(ctx, task) diff --git a/internal/flushcommon/syncmgr/sync_manager_test.go b/internal/flushcommon/syncmgr/sync_manager_test.go index a09e71e333..0b4dbf4bd1 100644 --- a/internal/flushcommon/syncmgr/sync_manager_test.go +++ b/internal/flushcommon/syncmgr/sync_manager_test.go @@ -160,7 +160,7 @@ func (s *SyncManagerSuite) TestSubmit() { seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil) metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() manager := NewSyncManager(s.chunkManager) @@ -200,7 +200,7 @@ func (s *SyncManagerSuite) TestCompacted() { seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil) metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() manager := NewSyncManager(s.chunkManager) diff --git a/internal/flushcommon/syncmgr/task_test.go b/internal/flushcommon/syncmgr/task_test.go index 588a2047cd..4cbbb6b666 100644 --- a/internal/flushcommon/syncmgr/task_test.go +++ b/internal/flushcommon/syncmgr/task_test.go @@ -191,7 +191,7 @@ func (s *SyncTaskSuite) TestRunNormal() { metacache.UpdateNumOfRows(1000)(seg) seg.GetBloomFilterSet().Roll() s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.Run("without_data", func() { @@ -277,7 +277,7 @@ func (s *SyncTaskSuite) TestRunL0Segment() { bfs := pkoracle.NewBloomFilterSet() seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{Level: datapb.SegmentLevel_L0}, bfs, nil) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.Run("pure_delete_l0_flush", func() { @@ -319,7 +319,7 @@ func (s *SyncTaskSuite) TestRunError() { seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, pkoracle.NewBloomFilterSet(), nil) metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.Run("allocate_id_fail", func() { mockAllocator := allocator.NewMockAllocator(s.T())