diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index bff7b92675..7210e96b11 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -35,6 +35,7 @@ import ( const ( maxParallelCompactionTaskNum = 100 rpcCompactionTimeout = 10 * time.Second + tsTimeout = uint64(1) ) type compactionPlanContext interface { @@ -183,6 +184,12 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla ts, err := c.allocator.allocTimestamp(context.TODO()) if err != nil { log.Warn("Alloc start time for CompactionPlan failed", zap.Int64("planID", plan.GetPlanID())) + // update plan ts to TIMEOUT ts + c.mu.Lock() + c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) { + task.plan.StartTime = tsTimeout + }) + c.mu.Unlock() return } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index e35b6794b0..3ca88a5bed 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -18,6 +18,7 @@ package datacoord import ( "context" + "errors" "sync" "testing" "time" @@ -41,9 +42,10 @@ import ( func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { ch := make(chan interface{}, 1) type fields struct { - plans map[int64]*compactionTask - sessions *SessionManager - chManager *ChannelManager + plans map[int64]*compactionTask + sessions *SessionManager + chManager *ChannelManager + allocatorFactory func() allocator } type args struct { signal *compactionSignal @@ -77,6 +79,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { }, }, }, + allocatorFactory: func() allocator { return newMockAllocator() }, }, args{ signal: &compactionSignal{id: 100}, @@ -106,6 +109,49 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { }, }, }, + allocatorFactory: func() allocator { return newMockAllocator() }, + }, + args{ + signal: &compactionSignal{id: 100}, + plan: &datapb.CompactionPlan{PlanID: 1, Channel: "ch1", Type: datapb.CompactionType_MergeCompaction}, + }, + true, + nil, + }, + { + "test_allocate_ts_failed", + fields{ + plans: map[int64]*compactionTask{}, + sessions: &SessionManager{ + sessions: struct { + sync.RWMutex + data map[int64]*Session + }{ + data: map[int64]*Session{ + 1: {client: &mockDataNodeClient{ch: ch, compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}}, + }, + }, + }, + chManager: &ChannelManager{ + store: &ChannelStore{ + channelsInfo: map[int64]*NodeChannelInfo{ + 1: {NodeID: 1, Channels: []*channel{{Name: "ch1"}}}, + }, + }, + }, + allocatorFactory: func() allocator { + a := &NMockAllocator{} + start := time.Now() + a.EXPECT().allocTimestamp(mock.Anything).Call.Return(func(_ context.Context) Timestamp { + return tsoutil.ComposeTSByTime(time.Now(), 0) + }, func(_ context.Context) error { + if time.Since(start) > time.Second*2 { + return nil + } + return errors.New("mocked") + }) + return a + }, }, args{ signal: &compactionSignal{id: 100}, @@ -122,14 +168,13 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { sessions: tt.fields.sessions, chManager: tt.fields.chManager, parallelCh: make(map[int64]chan struct{}), - allocator: newMockAllocator(), + allocator: tt.fields.allocatorFactory(), } Params.Save(Params.DataCoordCfg.CompactionCheckIntervalInSeconds.Key, "1") c.start() err := c.execCompactionPlan(tt.args.signal, tt.args.plan) assert.Equal(t, tt.err, err) if err == nil { - <-ch task := c.getCompaction(tt.args.plan.PlanID) if !tt.wantErr { assert.Equal(t, tt.args.plan, task.plan) diff --git a/internal/datacoord/mock_allocator_test.go b/internal/datacoord/mock_allocator_test.go new file mode 100644 index 0000000000..92a21425c0 --- /dev/null +++ b/internal/datacoord/mock_allocator_test.go @@ -0,0 +1,125 @@ +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package datacoord + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// NMockAllocator is an autogenerated mock type for the allocator type +type NMockAllocator struct { + mock.Mock +} + +type NMockAllocator_Expecter struct { + mock *mock.Mock +} + +func (_m *NMockAllocator) EXPECT() *NMockAllocator_Expecter { + return &NMockAllocator_Expecter{mock: &_m.Mock} +} + +// allocID provides a mock function with given fields: _a0 +func (_m *NMockAllocator) allocID(_a0 context.Context) (int64, error) { + ret := _m.Called(_a0) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context) int64); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NMockAllocator_allocID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'allocID' +type NMockAllocator_allocID_Call struct { + *mock.Call +} + +// allocID is a helper method to define mock.On call +// - _a0 context.Context +func (_e *NMockAllocator_Expecter) allocID(_a0 interface{}) *NMockAllocator_allocID_Call { + return &NMockAllocator_allocID_Call{Call: _e.mock.On("allocID", _a0)} +} + +func (_c *NMockAllocator_allocID_Call) Run(run func(_a0 context.Context)) *NMockAllocator_allocID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *NMockAllocator_allocID_Call) Return(_a0 int64, _a1 error) *NMockAllocator_allocID_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// allocTimestamp provides a mock function with given fields: _a0 +func (_m *NMockAllocator) allocTimestamp(_a0 context.Context) (uint64, error) { + ret := _m.Called(_a0) + + var r0 uint64 + if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(uint64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NMockAllocator_allocTimestamp_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'allocTimestamp' +type NMockAllocator_allocTimestamp_Call struct { + *mock.Call +} + +// allocTimestamp is a helper method to define mock.On call +// - _a0 context.Context +func (_e *NMockAllocator_Expecter) allocTimestamp(_a0 interface{}) *NMockAllocator_allocTimestamp_Call { + return &NMockAllocator_allocTimestamp_Call{Call: _e.mock.On("allocTimestamp", _a0)} +} + +func (_c *NMockAllocator_allocTimestamp_Call) Run(run func(_a0 context.Context)) *NMockAllocator_allocTimestamp_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *NMockAllocator_allocTimestamp_Call) Return(_a0 uint64, _a1 error) *NMockAllocator_allocTimestamp_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +type mockConstructorTestingTNewNMockAllocator interface { + mock.TestingT + Cleanup(func()) +} + +// NewNMockAllocator creates a new instance of NMockAllocator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewNMockAllocator(t mockConstructorTestingTNewNMockAllocator) *NMockAllocator { + mock := &NMockAllocator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}