From 07daa8f12b46a9d11ccb464d6a76bdecffc25a5f Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Wed, 3 Jul 2024 19:02:09 +0800 Subject: [PATCH] enhance:[Cherry-pick] avoid maintain checkpoint info in sync manager (#33413) (#34285) relate: https://github.com/milvus-io/milvus/issues/32915 pr: https://github.com/milvus-io/milvus/pull/33413 Signed-off-by: aoiasd --- internal/datanode/importv2/scheduler_test.go | 6 +- .../datanode/importv2/task_l0_import_test.go | 4 +- .../datanode/syncmgr/mock_sync_manager.go | 89 ++++-------- internal/datanode/syncmgr/sync_manager.go | 19 +-- .../writebuffer/bf_write_buffer_test.go | 4 +- internal/datanode/writebuffer/write_buffer.go | 129 +++++++++++------- .../datanode/writebuffer/write_buffer_test.go | 24 ++-- 7 files changed, 129 insertions(+), 146 deletions(-) diff --git a/internal/datanode/importv2/scheduler_test.go b/internal/datanode/importv2/scheduler_test.go index b016770b5d..112d537f3a 100644 --- a/internal/datanode/importv2/scheduler_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -246,7 +246,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() { cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) s.cm = cm - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] { + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] { future := conc.Go(func() (struct{}, error) { return struct{}{}, nil }) @@ -307,7 +307,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() { cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) s.cm = cm - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] { + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] { future := conc.Go(func() (struct{}, error) { return struct{}{}, errors.New("mock err") }) @@ -384,7 +384,7 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() { } func (s *SchedulerSuite) TestScheduler_ImportFile() { - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] { + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] { future := conc.Go(func() (struct{}, error) { return struct{}{}, nil }) diff --git a/internal/datanode/importv2/task_l0_import_test.go b/internal/datanode/importv2/task_l0_import_test.go index a86ee1bc12..3781e0f314 100644 --- a/internal/datanode/importv2/task_l0_import_test.go +++ b/internal/datanode/importv2/task_l0_import_test.go @@ -131,8 +131,8 @@ func (s *L0ImportSuite) TestL0PreImport() { } func (s *L0ImportSuite) TestL0Import() { - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything). - RunAndReturn(func(ctx context.Context, task syncmgr.Task) *conc.Future[struct{}] { + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] { alloc := allocator.NewMockAllocator(s.T()) alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil) task.(*syncmgr.SyncTask).WithAllocator(alloc) diff --git a/internal/datanode/syncmgr/mock_sync_manager.go b/internal/datanode/syncmgr/mock_sync_manager.go index ee19d324d3..37baca41fa 100644 --- a/internal/datanode/syncmgr/mock_sync_manager.go +++ b/internal/datanode/syncmgr/mock_sync_manager.go @@ -8,8 +8,6 @@ import ( conc "github.com/milvus-io/milvus/pkg/util/conc" mock "github.com/stretchr/testify/mock" - - msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" ) // MockSyncManager is an autogenerated mock type for the SyncManager type @@ -25,67 +23,20 @@ func (_m *MockSyncManager) EXPECT() *MockSyncManager_Expecter { return &MockSyncManager_Expecter{mock: &_m.Mock} } -// GetEarliestPosition provides a mock function with given fields: channel -func (_m *MockSyncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) { - ret := _m.Called(channel) - - var r0 int64 - var r1 *msgpb.MsgPosition - if rf, ok := ret.Get(0).(func(string) (int64, *msgpb.MsgPosition)); ok { - return rf(channel) +// SyncData provides a mock function with given fields: ctx, task, callbacks +func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] { + _va := make([]interface{}, len(callbacks)) + for _i := range callbacks { + _va[_i] = callbacks[_i] } - if rf, ok := ret.Get(0).(func(string) int64); ok { - r0 = rf(channel) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(string) *msgpb.MsgPosition); ok { - r1 = rf(channel) - } else { - if ret.Get(1) != nil { - r1 = ret.Get(1).(*msgpb.MsgPosition) - } - } - - return r0, r1 -} - -// MockSyncManager_GetEarliestPosition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetEarliestPosition' -type MockSyncManager_GetEarliestPosition_Call struct { - *mock.Call -} - -// GetEarliestPosition is a helper method to define mock.On call -// - channel string -func (_e *MockSyncManager_Expecter) GetEarliestPosition(channel interface{}) *MockSyncManager_GetEarliestPosition_Call { - return &MockSyncManager_GetEarliestPosition_Call{Call: _e.mock.On("GetEarliestPosition", channel)} -} - -func (_c *MockSyncManager_GetEarliestPosition_Call) Run(run func(channel string)) *MockSyncManager_GetEarliestPosition_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string)) - }) - return _c -} - -func (_c *MockSyncManager_GetEarliestPosition_Call) Return(_a0 int64, _a1 *msgpb.MsgPosition) *MockSyncManager_GetEarliestPosition_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockSyncManager_GetEarliestPosition_Call) RunAndReturn(run func(string) (int64, *msgpb.MsgPosition)) *MockSyncManager_GetEarliestPosition_Call { - _c.Call.Return(run) - return _c -} - -// SyncData provides a mock function with given fields: ctx, task -func (_m *MockSyncManager) SyncData(ctx context.Context, task Task) *conc.Future[struct{}] { - ret := _m.Called(ctx, task) + var _ca []interface{} + _ca = append(_ca, ctx, task) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 *conc.Future[struct{}] - if rf, ok := ret.Get(0).(func(context.Context, Task) *conc.Future[struct{}]); ok { - r0 = rf(ctx, task) + if rf, ok := ret.Get(0).(func(context.Context, Task, ...func(error) error) *conc.Future[struct{}]); ok { + r0 = rf(ctx, task, callbacks...) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*conc.Future[struct{}]) @@ -103,13 +54,21 @@ type MockSyncManager_SyncData_Call struct { // SyncData is a helper method to define mock.On call // - ctx context.Context // - task Task -func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}) *MockSyncManager_SyncData_Call { - return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", ctx, task)} +// - callbacks ...func(error) error +func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}, callbacks ...interface{}) *MockSyncManager_SyncData_Call { + return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", + append([]interface{}{ctx, task}, callbacks...)...)} } -func (_c *MockSyncManager_SyncData_Call) Run(run func(ctx context.Context, task Task)) *MockSyncManager_SyncData_Call { +func (_c *MockSyncManager_SyncData_Call) Run(run func(ctx context.Context, task Task, callbacks ...func(error) error)) *MockSyncManager_SyncData_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(Task)) + variadicArgs := make([]func(error) error, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(func(error) error) + } + } + run(args[0].(context.Context), args[1].(Task), variadicArgs...) }) return _c } @@ -119,7 +78,7 @@ func (_c *MockSyncManager_SyncData_Call) Return(_a0 *conc.Future[struct{}]) *Moc return _c } -func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task) *conc.Future[struct{}]) *MockSyncManager_SyncData_Call { +func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task, ...func(error) error) *conc.Future[struct{}]) *MockSyncManager_SyncData_Call { _c.Call.Return(run) return _c } diff --git a/internal/datanode/syncmgr/sync_manager.go b/internal/datanode/syncmgr/sync_manager.go index 3a5ac4211d..1443f50dfb 100644 --- a/internal/datanode/syncmgr/sync_manager.go +++ b/internal/datanode/syncmgr/sync_manager.go @@ -45,9 +45,7 @@ type SyncMeta struct { //go:generate mockery --name=SyncManager --structname=MockSyncManager --output=./ --filename=mock_sync_manager.go --with-expecter --inpackage type SyncManager interface { // SyncData is the method to submit sync task. - SyncData(ctx context.Context, task Task) *conc.Future[struct{}] - // GetEarliestPosition returns the earliest position (normally start position) of the processing sync task of provided channel. - GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) + SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] } type syncManager struct { @@ -99,7 +97,7 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) { } } -func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[struct{}] { +func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] { switch t := task.(type) { case *SyncTask: t.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager) @@ -107,7 +105,7 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[st t.WithAllocator(mgr.allocator) } - return mgr.safeSubmitTask(task) + return mgr.safeSubmitTask(task, callbacks...) } // safeSubmitTask submits task to SyncManager @@ -116,11 +114,10 @@ func (mgr *syncManager) safeSubmitTask(task Task, callbacks ...func(error) error mgr.tasks.Insert(taskKey, task) key := task.SegmentID() - return mgr.submit(key, task) + return mgr.submit(key, task, callbacks...) } -func (mgr *syncManager) submit(key int64, task Task) *conc.Future[struct{}] { - taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp()) +func (mgr *syncManager) submit(key int64, task Task, callbacks ...func(error) error) *conc.Future[struct{}] { handler := func(err error) error { if err == nil { return nil @@ -128,11 +125,9 @@ func (mgr *syncManager) submit(key int64, task Task) *conc.Future[struct{}] { task.HandleError(err) return err } + callbacks = append([]func(error) error{handler}, callbacks...) log.Info("sync mgr sumbit task with key", zap.Int64("key", key)) - return mgr.Submit(key, task, handler, func(err error) error { - mgr.tasks.Remove(taskKey) - return err - }) + return mgr.Submit(key, task, callbacks...) } func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) { diff --git a/internal/datanode/writebuffer/bf_write_buffer_test.go b/internal/datanode/writebuffer/bf_write_buffer_test.go index 529c84dd11..f4796930e8 100644 --- a/internal/datanode/writebuffer/bf_write_buffer_test.go +++ b/internal/datanode/writebuffer/bf_write_buffer_test.go @@ -325,7 +325,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() { s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return() - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil) + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(nil) pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64) delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) @@ -406,7 +406,7 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() { s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return() - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil) + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(nil) pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64) delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index 8423c1fa23..5ae6524ef9 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -13,7 +13,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/datanode/broker" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -59,6 +58,48 @@ type WriteBuffer interface { Close(drop bool) } +type checkpointCandidate struct { + segmentID int64 + position *msgpb.MsgPosition + source string +} + +type checkpointCandidates struct { + candidates map[string]*checkpointCandidate + mu sync.RWMutex +} + +func newCheckpointCandiates() *checkpointCandidates { + return &checkpointCandidates{ + candidates: make(map[string]*checkpointCandidate), + } +} + +func (c *checkpointCandidates) Remove(segmentID int64, timestamp uint64) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.candidates, fmt.Sprintf("%d-%d", segmentID, timestamp)) +} + +func (c *checkpointCandidates) Add(segmentID int64, position *msgpb.MsgPosition, source string) { + c.mu.Lock() + defer c.mu.Unlock() + c.candidates[fmt.Sprintf("%d-%d", segmentID, position.GetTimestamp())] = &checkpointCandidate{segmentID, position, source} +} + +func (c *checkpointCandidates) GetEarliestWithDefault(def *checkpointCandidate) *checkpointCandidate { + c.mu.RLock() + defer c.mu.RUnlock() + + var result *checkpointCandidate = def + for _, candidate := range c.candidates { + if result == nil || candidate.position.GetTimestamp() < result.position.GetTimestamp() { + result = candidate + } + } + return result +} + func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) { option := defaultWBOption(metacache) for _, opt := range opts { @@ -88,13 +129,14 @@ type writeBufferBase struct { pkField *schemapb.FieldSchema estSizePerRecord int metaCache metacache.MetaCache - syncMgr syncmgr.SyncManager - broker broker.Broker - serializer syncmgr.Serializer buffers map[int64]*segmentBuffer // segmentID => segmentBuffer syncPolicies []SyncPolicy + syncCheckpoint *checkpointCandidates + syncMgr syncmgr.SyncManager + serializer syncmgr.Serializer + checkpoint *msgpb.MsgPosition flushTimestamp *atomic.Uint64 @@ -154,6 +196,7 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2 buffers: make(map[int64]*segmentBuffer), metaCache: metacache, serializer: serializer, + syncCheckpoint: newCheckpointCandiates(), syncPolicies: option.syncPolicies, flushTimestamp: flushTs, storagev2Cache: storageV2Cache, @@ -232,59 +275,28 @@ func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition { wb.mut.RLock() defer wb.mut.RUnlock() - // syncCandidate from sync manager - syncSegmentID, syncCandidate := wb.syncMgr.GetEarliestPosition(wb.channelName) - - type checkpointCandidate struct { - segmentID int64 - position *msgpb.MsgPosition - } - var bufferCandidate *checkpointCandidate - candidates := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *checkpointCandidate { - return &checkpointCandidate{buf.segmentID, buf.EarliestPosition()} + return &checkpointCandidate{buf.segmentID, buf.EarliestPosition(), "segment buffer"} }) candidates = lo.Filter(candidates, func(candidate *checkpointCandidate, _ int) bool { return candidate.position != nil }) - if len(candidates) > 0 { - bufferCandidate = lo.MinBy(candidates, func(a, b *checkpointCandidate) bool { - return a.position.GetTimestamp() < b.position.GetTimestamp() - }) - } + checkpoint := wb.syncCheckpoint.GetEarliestWithDefault(lo.MinBy(candidates, func(a, b *checkpointCandidate) bool { + return a.position.GetTimestamp() < b.position.GetTimestamp() + })) - var checkpoint *msgpb.MsgPosition - var segmentID int64 - var cpSource string - switch { - case bufferCandidate == nil && syncCandidate == nil: + if checkpoint == nil { // all buffer are empty - log.RatedInfo(60, "checkpoint from latest consumed msg") + log.RatedDebug(60, "checkpoint from latest consumed msg", zap.Uint64("cpTimestamp", wb.checkpoint.GetTimestamp())) return wb.checkpoint - case bufferCandidate == nil && syncCandidate != nil: - checkpoint = syncCandidate - segmentID = syncSegmentID - cpSource = "syncManager" - case syncCandidate == nil && bufferCandidate != nil: - checkpoint = bufferCandidate.position - segmentID = bufferCandidate.segmentID - cpSource = "segmentBuffer" - case syncCandidate.GetTimestamp() >= bufferCandidate.position.GetTimestamp(): - checkpoint = bufferCandidate.position - segmentID = bufferCandidate.segmentID - cpSource = "segmentBuffer" - case syncCandidate.GetTimestamp() < bufferCandidate.position.GetTimestamp(): - checkpoint = syncCandidate - segmentID = syncSegmentID - cpSource = "syncManager" } - log.RatedInfo(20, "checkpoint evaluated", - zap.String("cpSource", cpSource), - zap.Int64("segmentID", segmentID), - zap.Uint64("cpTimestamp", checkpoint.GetTimestamp())) - return checkpoint + log.RatedDebug(20, "checkpoint evaluated", + zap.String("cpSource", checkpoint.source), + zap.Int64("segmentID", checkpoint.segmentID), + zap.Uint64("cpTimestamp", checkpoint.position.GetTimestamp())) + return checkpoint.position } func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) { @@ -328,7 +340,16 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) } } - result = append(result, wb.syncMgr.SyncData(ctx, syncTask)) + result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error { + if err != nil { + return err + } + + if syncTask.StartPosition() != nil { + wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp()) + } + return nil + })) } return result } @@ -559,6 +580,10 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy tsFrom, tsTo = timeRange.timestampMin, timeRange.timestampMax } + if startPos != nil { + wb.syncCheckpoint.Add(segmentID, startPos, "syncing task") + } + actions := []metacache.SegmentAction{} for _, chunk := range insert { @@ -629,7 +654,15 @@ func (wb *writeBufferBase) Close(drop bool) { t.WithDrop() } - f := wb.syncMgr.SyncData(context.Background(), syncTask) + f := wb.syncMgr.SyncData(context.Background(), syncTask, func(err error) error { + if err != nil { + return err + } + if syncTask.StartPosition() != nil { + wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp()) + } + return nil + }) futures = append(futures, f) } diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go index 22b11ead94..c507d70162 100644 --- a/internal/datanode/writebuffer/write_buffer_test.go +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -126,20 +126,17 @@ func (s *WriteBufferSuite) TestGetCheckpoint() { Timestamp: 1000, } - s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(0, nil).Once() - checkpoint := s.wb.GetCheckpoint() s.EqualValues(1000, checkpoint.GetTimestamp()) }) - s.Run("use_sync_mgr_cp", func() { + s.Run("use_syncing_segment_cp", func() { s.wb.checkpoint = &msgpb.MsgPosition{ Timestamp: 1000, } - s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{ - Timestamp: 500, - }).Once() + s.wb.syncCheckpoint.Add(1, &msgpb.MsgPosition{Timestamp: 500}, "syncing segments") + defer s.wb.syncCheckpoint.Remove(1, 500) checkpoint := s.wb.GetCheckpoint() s.EqualValues(500, checkpoint.GetTimestamp()) @@ -150,7 +147,8 @@ func (s *WriteBufferSuite) TestGetCheckpoint() { Timestamp: 1000, } - s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(0, nil).Once() + s.wb.syncCheckpoint.Add(1, &msgpb.MsgPosition{Timestamp: 500}, "syncing segments") + defer s.wb.syncCheckpoint.Remove(1, 500) buf1, err := newSegmentBuffer(2, s.collSchema) s.Require().NoError(err) @@ -189,9 +187,8 @@ func (s *WriteBufferSuite) TestGetCheckpoint() { Timestamp: 1000, } - s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{ - Timestamp: 300, - }).Once() + s.wb.syncCheckpoint.Add(1, &msgpb.MsgPosition{Timestamp: 300}, "syncing segments") + defer s.wb.syncCheckpoint.Remove(1, 300) buf1, err := newSegmentBuffer(2, s.collSchema) s.Require().NoError(err) @@ -230,9 +227,8 @@ func (s *WriteBufferSuite) TestGetCheckpoint() { Timestamp: 1000, } - s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{ - Timestamp: 800, - }).Once() + s.wb.syncCheckpoint.Add(1, &msgpb.MsgPosition{Timestamp: 800}, "syncing segments") + defer s.wb.syncCheckpoint.Remove(1, 800) buf1, err := newSegmentBuffer(2, s.collSchema) s.Require().NoError(err) @@ -357,7 +353,7 @@ func (s *WriteBufferSuite) TestEvictBuffer() { s.metacache.EXPECT().GetSegmentByID(int64(2)).Return(segment, true) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil) - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) { + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) { return struct{}{}, nil })) defer func() {