diff --git a/Makefile b/Makefile index 4d6b4c53a1..875e026f4f 100644 --- a/Makefile +++ b/Makefile @@ -448,6 +448,7 @@ generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=MetaWriter --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_meta_writer.go --with-expecter --structname=MockMetaWriter --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=Serializer --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_serializer.go --with-expecter --structname=MockSerializer --outpkg=syncmgr --inpackage + $(INSTALL_PATH)/mockery --name=Task --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_task.go --with-expecter --structname=MockTask --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage diff --git a/internal/datanode/syncmgr/key_lock_dispatcher.go b/internal/datanode/syncmgr/key_lock_dispatcher.go index 2148bd0760..3fc304d634 100644 --- a/internal/datanode/syncmgr/key_lock_dispatcher.go +++ b/internal/datanode/syncmgr/key_lock_dispatcher.go @@ -8,6 +8,7 @@ import ( type Task interface { SegmentID() int64 + CalcTargetSegment() (int64, error) Checkpoint() *msgpb.MsgPosition StartPosition() *msgpb.MsgPosition ChannelName() string diff --git a/internal/datanode/syncmgr/key_lock_dispatcher_test.go b/internal/datanode/syncmgr/key_lock_dispatcher_test.go index 25af7d88f6..111c0d84ad 100644 --- a/internal/datanode/syncmgr/key_lock_dispatcher_test.go +++ b/internal/datanode/syncmgr/key_lock_dispatcher_test.go @@ -6,19 +6,22 @@ import ( "github.com/stretchr/testify/suite" "go.uber.org/atomic" - - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" ) +/* type mockTask struct { - ch chan struct{} - err error + targetID int64 + ch chan struct{} + err error } func (t *mockTask) done() { close(t.ch) } +func (t *mockTask) CalcTargetSegment() (int64, error) { + return t.targetID, t.err +} func (t *mockTask) SegmentID() int64 { panic("no implementation") } func (t *mockTask) Checkpoint() *msgpb.MsgPosition { panic("no implementation") } func (t *mockTask) StartPosition() *msgpb.MsgPosition { panic("no implementation") } @@ -34,7 +37,7 @@ func newMockTask(err error) *mockTask { err: err, ch: make(chan struct{}), } -} +}*/ type KeyLockDispatcherSuite struct { suite.Suite @@ -43,15 +46,19 @@ type KeyLockDispatcherSuite struct { func (s *KeyLockDispatcherSuite) TestKeyLock() { d := newKeyLockDispatcher[int64](2) - t1 := newMockTask(nil) - t2 := newMockTask(nil) + done := make(chan struct{}) + t1 := NewMockTask(s.T()) + t1.EXPECT().Run().Run(func() { + <-done + }).Return(nil) + t2 := NewMockTask(s.T()) + t2.EXPECT().Run().Return(nil) sig := atomic.NewBool(false) d.Submit(1, t1) go func() { - defer t2.done() d.Submit(1, t2) sig.Store(true) @@ -59,7 +66,7 @@ func (s *KeyLockDispatcherSuite) TestKeyLock() { s.False(sig.Load(), "task 2 will never be submit before task 1 done") - t1.done() + close(done) s.Eventually(sig.Load, time.Second, time.Millisecond*100) } @@ -67,14 +74,20 @@ func (s *KeyLockDispatcherSuite) TestKeyLock() { func (s *KeyLockDispatcherSuite) TestCap() { d := newKeyLockDispatcher[int64](1) - t1 := newMockTask(nil) - t2 := newMockTask(nil) + t1 := NewMockTask(s.T()) + t2 := NewMockTask(s.T()) + + done := make(chan struct{}) + t1.EXPECT().Run().Run(func() { + <-done + }).Return(nil) + t2.EXPECT().Run().Return(nil) sig := atomic.NewBool(false) d.Submit(1, t1) go func() { - defer t2.done() + // defer t2.done() d.Submit(2, t2) sig.Store(true) @@ -82,7 +95,7 @@ func (s *KeyLockDispatcherSuite) TestCap() { s.False(sig.Load(), "task 2 will never be submit before task 1 done") - t1.done() + close(done) s.Eventually(sig.Load, time.Second, time.Millisecond*100) } diff --git a/internal/datanode/syncmgr/mock_task.go b/internal/datanode/syncmgr/mock_task.go new file mode 100644 index 0000000000..1d26bf2913 --- /dev/null +++ b/internal/datanode/syncmgr/mock_task.go @@ -0,0 +1,295 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package syncmgr + +import ( + msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + mock "github.com/stretchr/testify/mock" +) + +// MockTask is an autogenerated mock type for the Task type +type MockTask struct { + mock.Mock +} + +type MockTask_Expecter struct { + mock *mock.Mock +} + +func (_m *MockTask) EXPECT() *MockTask_Expecter { + return &MockTask_Expecter{mock: &_m.Mock} +} + +// CalcTargetSegment provides a mock function with given fields: +func (_m *MockTask) CalcTargetSegment() (int64, error) { + ret := _m.Called() + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func() (int64, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockTask_CalcTargetSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CalcTargetSegment' +type MockTask_CalcTargetSegment_Call struct { + *mock.Call +} + +// CalcTargetSegment is a helper method to define mock.On call +func (_e *MockTask_Expecter) CalcTargetSegment() *MockTask_CalcTargetSegment_Call { + return &MockTask_CalcTargetSegment_Call{Call: _e.mock.On("CalcTargetSegment")} +} + +func (_c *MockTask_CalcTargetSegment_Call) Run(run func()) *MockTask_CalcTargetSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTask_CalcTargetSegment_Call) Return(_a0 int64, _a1 error) *MockTask_CalcTargetSegment_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockTask_CalcTargetSegment_Call) RunAndReturn(run func() (int64, error)) *MockTask_CalcTargetSegment_Call { + _c.Call.Return(run) + return _c +} + +// ChannelName provides a mock function with given fields: +func (_m *MockTask) ChannelName() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockTask_ChannelName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ChannelName' +type MockTask_ChannelName_Call struct { + *mock.Call +} + +// ChannelName is a helper method to define mock.On call +func (_e *MockTask_Expecter) ChannelName() *MockTask_ChannelName_Call { + return &MockTask_ChannelName_Call{Call: _e.mock.On("ChannelName")} +} + +func (_c *MockTask_ChannelName_Call) Run(run func()) *MockTask_ChannelName_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTask_ChannelName_Call) Return(_a0 string) *MockTask_ChannelName_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTask_ChannelName_Call) RunAndReturn(run func() string) *MockTask_ChannelName_Call { + _c.Call.Return(run) + return _c +} + +// Checkpoint provides a mock function with given fields: +func (_m *MockTask) Checkpoint() *msgpb.MsgPosition { + ret := _m.Called() + + var r0 *msgpb.MsgPosition + if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*msgpb.MsgPosition) + } + } + + return r0 +} + +// MockTask_Checkpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Checkpoint' +type MockTask_Checkpoint_Call struct { + *mock.Call +} + +// Checkpoint is a helper method to define mock.On call +func (_e *MockTask_Expecter) Checkpoint() *MockTask_Checkpoint_Call { + return &MockTask_Checkpoint_Call{Call: _e.mock.On("Checkpoint")} +} + +func (_c *MockTask_Checkpoint_Call) Run(run func()) *MockTask_Checkpoint_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTask_Checkpoint_Call) Return(_a0 *msgpb.MsgPosition) *MockTask_Checkpoint_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTask_Checkpoint_Call) RunAndReturn(run func() *msgpb.MsgPosition) *MockTask_Checkpoint_Call { + _c.Call.Return(run) + return _c +} + +// Run provides a mock function with given fields: +func (_m *MockTask) Run() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockTask_Run_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Run' +type MockTask_Run_Call struct { + *mock.Call +} + +// Run is a helper method to define mock.On call +func (_e *MockTask_Expecter) Run() *MockTask_Run_Call { + return &MockTask_Run_Call{Call: _e.mock.On("Run")} +} + +func (_c *MockTask_Run_Call) Run(run func()) *MockTask_Run_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTask_Run_Call) Return(_a0 error) *MockTask_Run_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTask_Run_Call) RunAndReturn(run func() error) *MockTask_Run_Call { + _c.Call.Return(run) + return _c +} + +// SegmentID provides a mock function with given fields: +func (_m *MockTask) SegmentID() int64 { + ret := _m.Called() + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// MockTask_SegmentID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SegmentID' +type MockTask_SegmentID_Call struct { + *mock.Call +} + +// SegmentID is a helper method to define mock.On call +func (_e *MockTask_Expecter) SegmentID() *MockTask_SegmentID_Call { + return &MockTask_SegmentID_Call{Call: _e.mock.On("SegmentID")} +} + +func (_c *MockTask_SegmentID_Call) Run(run func()) *MockTask_SegmentID_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTask_SegmentID_Call) Return(_a0 int64) *MockTask_SegmentID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTask_SegmentID_Call) RunAndReturn(run func() int64) *MockTask_SegmentID_Call { + _c.Call.Return(run) + return _c +} + +// StartPosition provides a mock function with given fields: +func (_m *MockTask) StartPosition() *msgpb.MsgPosition { + ret := _m.Called() + + var r0 *msgpb.MsgPosition + if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*msgpb.MsgPosition) + } + } + + return r0 +} + +// MockTask_StartPosition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartPosition' +type MockTask_StartPosition_Call struct { + *mock.Call +} + +// StartPosition is a helper method to define mock.On call +func (_e *MockTask_Expecter) StartPosition() *MockTask_StartPosition_Call { + return &MockTask_StartPosition_Call{Call: _e.mock.On("StartPosition")} +} + +func (_c *MockTask_StartPosition_Call) Run(run func()) *MockTask_StartPosition_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTask_StartPosition_Call) Return(_a0 *msgpb.MsgPosition) *MockTask_StartPosition_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTask_StartPosition_Call) RunAndReturn(run func() *msgpb.MsgPosition) *MockTask_StartPosition_Call { + _c.Call.Return(run) + return _c +} + +// NewMockTask creates a new instance of MockTask. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockTask(t interface { + mock.TestingT + Cleanup(func()) +}) *MockTask { + mock := &MockTask{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datanode/syncmgr/sync_manager.go b/internal/datanode/syncmgr/sync_manager.go index b78c8cda2a..2cbcbff0a0 100644 --- a/internal/datanode/syncmgr/sync_manager.go +++ b/internal/datanode/syncmgr/sync_manager.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" + "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -111,14 +112,38 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[er t.WithAllocator(mgr.allocator) } + return mgr.safeSubmitTask(task) +} + +// safeSubmitTask handles submitting task logic with optimistic target check logic +// when task returns errTargetSegmentNotMatch error +// perform refetch then retry logic +func (mgr *syncManager) safeSubmitTask(task Task) *conc.Future[error] { taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp()) mgr.tasks.Insert(taskKey, task) - // make sync for same segment execute in sequence - // if previous sync task is not finished, block here - return mgr.Submit(task.SegmentID(), task, func(err error) { - // remove task from records - mgr.tasks.Remove(taskKey) + return conc.Go[error](func() (error, error) { + defer mgr.tasks.Remove(taskKey) + for { + targetID, err := task.CalcTargetSegment() + if err != nil { + return err, nil + } + log.Info("task calculated target segment id", + zap.Int64("targetID", targetID), + zap.Int64("segmentID", task.SegmentID()), + ) + + // make sync for same segment execute in sequence + // if previous sync task is not finished, block here + f := mgr.Submit(targetID, task) + err, _ = f.Await() + if errors.Is(err, errTargetSegmentNotMatch) { + log.Info("target updated during submitting", zap.Error(err)) + continue + } + return err, nil + } }) } diff --git a/internal/datanode/syncmgr/sync_manager_test.go b/internal/datanode/syncmgr/sync_manager_test.go index 953f5c4c49..5ddb5e332a 100644 --- a/internal/datanode/syncmgr/sync_manager_test.go +++ b/internal/datanode/syncmgr/sync_manager_test.go @@ -308,6 +308,23 @@ func (s *SyncManagerSuite) TestNewSyncManager() { s.Error(err) } +func (s *SyncManagerSuite) TestTargetUpdated() { + manager, err := NewSyncManager(s.chunkManager, s.allocator) + s.NoError(err) + + task := NewMockTask(s.T()) + task.EXPECT().SegmentID().Return(1000) + task.EXPECT().Checkpoint().Return(&msgpb.MsgPosition{}) + task.EXPECT().CalcTargetSegment().Return(1000, nil).Once() + task.EXPECT().CalcTargetSegment().Return(1001, nil).Once() + task.EXPECT().Run().Return(errTargetSegmentNotMatch).Once() + task.EXPECT().Run().Return(nil).Once() + + f := manager.SyncData(context.Background(), task) + err, _ = f.Await() + s.NoError(err) +} + func TestSyncManager(t *testing.T) { suite.Run(t, new(SyncManagerSuite)) } diff --git a/internal/datanode/syncmgr/task.go b/internal/datanode/syncmgr/task.go index c761bd1ccb..fbc782f8b6 100644 --- a/internal/datanode/syncmgr/task.go +++ b/internal/datanode/syncmgr/task.go @@ -21,7 +21,9 @@ import ( "fmt" "path" + "github.com/cockroachdb/errors" "github.com/samber/lo" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -42,6 +44,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +var errTargetSegmentNotMatch = errors.New("target segment not match") + type SyncTask struct { chunkManager storage.ChunkManager allocator allocator.Interface @@ -60,6 +64,9 @@ type SyncTask struct { batchSize int64 level datapb.SegmentLevel + // targetSegmentID stores the "current" segmentID task shall be handling + targetSegmentID atomic.Int64 + tsFrom typeutil.Timestamp tsTo typeutil.Timestamp @@ -103,6 +110,9 @@ func (t *SyncTask) getLogger() *log.MLogger { } func (t *SyncTask) handleError(err error) { + if errors.Is(err, errTargetSegmentNotMatch) { + return + } if t.failureCallback != nil { t.failureCallback(err) } @@ -138,6 +148,14 @@ func (t *SyncTask) Run() (err error) { } if t.segment.CompactTo() > 0 { + // current task does not hold the key lock for "true" target segment id + if t.segment.CompactTo() != t.targetSegmentID.Load() { + log.Info("sync task does not hold target segment id lock, return error and retry", + zap.Int64("compactTo", t.segment.CompactTo()), + zap.Int64("currentTarget", t.targetSegmentID.Load()), + ) + return errors.Wrap(errTargetSegmentNotMatch, "task does not hold target segment id lock") + } log.Info("syncing segment compacted, update segment id", zap.Int64("compactTo", t.segment.CompactTo())) // update sync task segment id // it's ok to use compactTo segmentID here, since there shall be no insert for compacted segment @@ -328,6 +346,20 @@ func (t *SyncTask) SegmentID() int64 { return t.segmentID } +func (t *SyncTask) CalcTargetSegment() (int64, error) { + segment, has := t.metacache.GetSegmentByID(t.segmentID) + if !has { + return -1, merr.WrapErrSegmentNotFound(t.segmentID) + } + target := segment.SegmentID() + if compactTo := segment.CompactTo(); compactTo > 0 { + target = compactTo + } + t.targetSegmentID.Store(target) + + return target, nil +} + func (t *SyncTask) Checkpoint() *msgpb.MsgPosition { return t.checkpoint } diff --git a/internal/datanode/syncmgr/task_test.go b/internal/datanode/syncmgr/task_test.go index 2b06d62c71..13b8318734 100644 --- a/internal/datanode/syncmgr/task_test.go +++ b/internal/datanode/syncmgr/task_test.go @@ -328,6 +328,24 @@ func (s *SyncTaskSuite) TestCompactToNull() { } func (s *SyncTaskSuite) TestRunError() { + s.Run("target_segment_not_match", func() { + flag := false + seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ + ID: s.segmentID, + }, metacache.NewBloomFilterSet()) + metacache.CompactTo(s.segmentID + 1)(seg) + s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true).Once() + + handler := func(_ error) { flag = true } + task := s.getSuiteSyncTask().WithFailureCallback(handler) + + task.targetSegmentID.Store(s.segmentID) + err := task.Run() + + s.Error(err) + s.False(flag, "target not match shall not trigger error handler") + }) + s.Run("segment_not_found", func() { s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false) flag := false @@ -409,6 +427,63 @@ func (s *SyncTaskSuite) TestNextID() { }) } +func (s *SyncTaskSuite) TestCalcTargetID() { + task := s.getSuiteSyncTask() + + s.Run("normal_calc_segment", func() { + s.Run("not_compacted", func() { + segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{ + ID: s.segmentID, + PartitionID: s.partitionID, + CollectionID: s.collectionID, + }, metacache.NewBloomFilterSet()) + s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(segment, true).Once() + + targetID, err := task.CalcTargetSegment() + s.Require().NoError(err) + s.Equal(s.segmentID, targetID) + s.Equal(s.segmentID, task.targetSegmentID.Load()) + }) + + s.Run("compacted_normal", func() { + segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{ + ID: s.segmentID, + PartitionID: s.partitionID, + CollectionID: s.collectionID, + }, metacache.NewBloomFilterSet()) + metacache.CompactTo(1000)(segment) + s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(segment, true).Once() + + targetID, err := task.CalcTargetSegment() + s.Require().NoError(err) + s.EqualValues(1000, targetID) + s.EqualValues(1000, task.targetSegmentID.Load()) + }) + + s.Run("compacted_null", func() { + segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{ + ID: s.segmentID, + PartitionID: s.partitionID, + CollectionID: s.collectionID, + }, metacache.NewBloomFilterSet()) + metacache.CompactTo(metacache.NullSegment)(segment) + s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(segment, true).Once() + + targetID, err := task.CalcTargetSegment() + s.Require().NoError(err) + s.Equal(s.segmentID, targetID) + s.Equal(s.segmentID, task.targetSegmentID.Load()) + }) + }) + + s.Run("segment_not_found", func() { + s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false).Once() + + _, err := task.CalcTargetSegment() + s.Error(err) + }) +} + func TestSyncTask(t *testing.T) { suite.Run(t, new(SyncTaskSuite)) }