diff --git a/Makefile b/Makefile index 51fa5f5581..530fb1ff5d 100644 --- a/Makefile +++ b/Makefile @@ -357,4 +357,7 @@ generate-mockery: getdeps $(PWD)/bin/mockery --name=Loader --dir=$(PWD)/internal/querynodev2/segments --output=$(PWD)/internal/querynodev2/segments --filename=mock_loader.go --with-expecter --outpkg=segments --structname=MockLoader --inpackage $(PWD)/bin/mockery --name=Worker --dir=$(PWD)/internal/querynodev2/cluster --output=$(PWD)/internal/querynodev2/cluster --filename=mock_worker.go --with-expecter --outpkg=worker --structname=MockWorker --inpackage $(PWD)/bin/mockery --name=ShardDelegator --dir=$(PWD)/internal/querynodev2/delegator/ --output=$(PWD)/internal/querynodev2/delegator/ --filename=mock_delegator.go --with-expecter --outpkg=delegator --structname=MockShardDelegator --inpackage + # internal/datacoord + $(PWD)/bin/mockery --dir=internal/datacoord --name=compactionPlanContext --filename=mock_compaction_plan_context.go --output=internal/datacoord --structname=MockCompactionPlanContext --with-expecter --inpackage + $(PWD)/bin/mockery --dir=internal/datacoord --name=Handler --filename=mock_handler.go --output=internal/datacoord --structname=NMockHandler --with-expecter --inpackage diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index acc9056702..ddf8de413d 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -173,14 +173,26 @@ func (t *compactionTrigger) allocTs() (Timestamp, error) { return ts, nil } -func (t *compactionTrigger) getCompactTime(ts Timestamp, collectionID UniqueID) (*compactTime, error) { +func (t *compactionTrigger) getCollection(collectionID UniqueID) (*collectionInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() coll, err := t.handler.GetCollection(ctx, collectionID) if err != nil { return nil, fmt.Errorf("collection ID %d not found, err: %w", collectionID, err) } + return coll, nil +} +func (t *compactionTrigger) isCollectionAutoCompactionEnabled(coll *collectionInfo) bool { + enabled, err := getCollectionAutoCompactionEnabled(coll.Properties) + if err != nil { + log.Warn("collection properties auto compaction not valid, returning false", zap.Error(err)) + return false + } + return enabled +} + +func (t *compactionTrigger) getCompactTime(ts Timestamp, coll *collectionInfo) (*compactTime, error) { collectionTTL, err := getCollectionTTL(coll.Properties) if err != nil { return nil, err @@ -359,7 +371,25 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { continue } - ct, err := t.getCompactTime(ts, group.collectionID) + coll, err := t.getCollection(group.collectionID) + if err != nil { + log.Warn("get collection info failed, skip handling compaction", + zap.Int64("collectionID", group.collectionID), + zap.Int64("partitionID", group.partitionID), + zap.String("channel", group.channelName), + zap.Error(err), + ) + return + } + + if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) { + log.RatedInfo(20, "collection auto compaction disabled", + zap.Int64("collectionID", group.collectionID), + ) + return + } + + ct, err := t.getCompactTime(ts, coll) if err != nil { log.Warn("get compact time failed, skip to handle compaction", zap.Int64("collectionID", group.collectionID), @@ -430,6 +460,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { channel := segment.GetInsertChannel() partitionID := segment.GetPartitionID() + collectionID := segment.GetCollectionID() segments := t.getCandidateSegments(channel, partitionID) if len(segments) == 0 { @@ -449,7 +480,25 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { return } - ct, err := t.getCompactTime(ts, segment.GetCollectionID()) + coll, err := t.getCollection(collectionID) + if err != nil { + log.Warn("get collection info failed, skip handling compaction", + zap.Int64("collectionID", collectionID), + zap.Int64("partitionID", partitionID), + zap.String("channel", channel), + zap.Error(err), + ) + return + } + + if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) { + log.RatedInfo(20, "collection auto compaction disabled", + zap.Int64("collectionID", collectionID), + ) + return + } + + ct, err := t.getCompactTime(ts, coll) if err != nil { log.Warn("get compact time failed, skip to handle compaction", zap.Int64("collectionID", segment.GetCollectionID()), zap.Int64("partitionID", partitionID), zap.String("channel", channel)) diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 914d183613..62ea87027e 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -21,8 +21,11 @@ import ( "testing" "time" + "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/schemapb" @@ -1730,9 +1733,266 @@ func Test_getCompactTime(t *testing.T) { meta: m, }, }) - + coll := &collectionInfo{ + ID: 1, + Schema: newTestSchema(), + Partitions: []UniqueID{1}, + Properties: map[string]string{ + common.CollectionTTLConfigKey: "10", + }, + } now := tsoutil.GetCurrentTime() - ct, err := got.getCompactTime(now, 1) + ct, err := got.getCompactTime(now, coll) assert.NoError(t, err) assert.NotNil(t, ct) } + +type CompactionTriggerSuite struct { + suite.Suite + + collectionID int64 + partitionID int64 + channel string + + meta *meta + tr *compactionTrigger + allocator *NMockAllocator + handler *NMockHandler + compactionHandler *MockCompactionPlanContext +} + +func (s *CompactionTriggerSuite) SetupSuite() { +} + +func (s *CompactionTriggerSuite) genSeg(segID, numRows int64) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ + ID: segID, + CollectionID: s.collectionID, + PartitionID: s.partitionID, + LastExpireTime: 100, + NumOfRows: numRows, + MaxRowNum: 110, + InsertChannel: s.channel, + State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {EntriesNum: 5, LogPath: "log1", LogSize: 100}, + }, + }, + }, + } +} + +func (s *CompactionTriggerSuite) SetupTest() { + s.collectionID = 100 + s.partitionID = 200 + s.channel = "dml_0_100v0" + s.meta = &meta{segments: &SegmentsInfo{ + map[int64]*SegmentInfo{ + 1: { + SegmentInfo: s.genSeg(1, 60), + lastFlushTime: time.Now().Add(-100 * time.Minute), + }, + 2: { + SegmentInfo: s.genSeg(2, 60), + lastFlushTime: time.Now(), + }, + 3: { + SegmentInfo: s.genSeg(3, 60), + lastFlushTime: time.Now(), + }, + 4: { + SegmentInfo: s.genSeg(4, 60), + lastFlushTime: time.Now(), + }, + 5: { + SegmentInfo: s.genSeg(5, 26), + lastFlushTime: time.Now(), + }, + 6: { + SegmentInfo: s.genSeg(6, 26), + lastFlushTime: time.Now(), + }, + }, + }, + } + s.allocator = NewNMockAllocator(s.T()) + s.compactionHandler = NewMockCompactionPlanContext(s.T()) + s.handler = NewNMockHandler(s.T()) + s.tr = newCompactionTrigger( + s.meta, + s.compactionHandler, + s.allocator, + s.handler, + ) + s.tr.testingOnly = true +} + +func (s *CompactionTriggerSuite) TestHandleSignal() { + s.Run("getCompaction_failed", func() { + defer s.SetupTest() + tr := s.tr + s.compactionHandler.EXPECT().isFull().Return(false) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked")) + tr.handleSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: false, + }) + + // suite shall check compactionHandler.execCompactionPlan never called + }) + + s.Run("collectionAutoCompactionConfigError", func() { + defer s.SetupTest() + tr := s.tr + s.compactionHandler.EXPECT().isFull().Return(false) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ + Properties: map[string]string{ + common.CollectionAutoCompactionKey: "bad_value", + }, + }, nil) + tr.handleSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: false, + }) + + // suite shall check compactionHandler.execCompactionPlan never called + }) + + s.Run("collectionAutoCompactionDisabled", func() { + defer s.SetupTest() + tr := s.tr + s.compactionHandler.EXPECT().isFull().Return(false) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ + Properties: map[string]string{ + common.CollectionAutoCompactionKey: "false", + }, + }, nil) + tr.handleSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: false, + }) + + // suite shall check compactionHandler.execCompactionPlan never called + }) + + s.Run("collectionAutoCompactionDisabled_force", func() { + defer s.SetupTest() + tr := s.tr + s.compactionHandler.EXPECT().isFull().Return(false) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil) + s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ + Properties: map[string]string{ + common.CollectionAutoCompactionKey: "false", + }, + }, nil) + s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil) + tr.handleSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: true, + }) + }) +} + +func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { + s.Run("getCompaction_failed", func() { + defer s.SetupTest() + tr := s.tr + s.compactionHandler.EXPECT().isFull().Return(false) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked")) + tr.handleGlobalSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: false, + }) + + // suite shall check compactionHandler.execCompactionPlan never called + }) + + s.Run("collectionAutoCompactionConfigError", func() { + defer s.SetupTest() + tr := s.tr + s.compactionHandler.EXPECT().isFull().Return(false) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ + Properties: map[string]string{ + common.CollectionAutoCompactionKey: "bad_value", + }, + }, nil) + tr.handleGlobalSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: false, + }) + + // suite shall check compactionHandler.execCompactionPlan never called + }) + + s.Run("collectionAutoCompactionDisabled", func() { + defer s.SetupTest() + tr := s.tr + s.compactionHandler.EXPECT().isFull().Return(false) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ + Properties: map[string]string{ + common.CollectionAutoCompactionKey: "false", + }, + }, nil) + tr.handleGlobalSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: false, + }) + + // suite shall check compactionHandler.execCompactionPlan never called + }) + + s.Run("collectionAutoCompactionDisabled_force", func() { + defer s.SetupTest() + tr := s.tr + s.compactionHandler.EXPECT().isFull().Return(false) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil) + s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ + Properties: map[string]string{ + common.CollectionAutoCompactionKey: "false", + }, + }, nil) + s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil) + tr.handleGlobalSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: true, + }) + }) +} + +func TestCompactionTriggerSuite(t *testing.T) { + suite.Run(t, new(CompactionTriggerSuite)) +} diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 74ef472aa8..505820e970 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -36,9 +36,9 @@ import ( // Handler handles some channel method for ChannelManager type Handler interface { // GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord - GetQueryVChanPositions(channel *channel, partitionIDs ...UniqueID) *datapb.VchannelInfo + GetQueryVChanPositions(ch *channel, partitionIDs ...UniqueID) *datapb.VchannelInfo // GetDataVChanPositions gets the information recovery needed of a channel for DataNode - GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo + GetDataVChanPositions(ch *channel, partitionID UniqueID) *datapb.VchannelInfo CheckShouldDropChannel(channel string, collectionID UniqueID) bool FinishDropChannel(channel string) error GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) diff --git a/internal/datacoord/mock_compaction_plan_context.go b/internal/datacoord/mock_compaction_plan_context.go new file mode 100644 index 0000000000..612a0458b3 --- /dev/null +++ b/internal/datacoord/mock_compaction_plan_context.go @@ -0,0 +1,279 @@ +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package datacoord + +import ( + datapb "github.com/milvus-io/milvus/internal/proto/datapb" + mock "github.com/stretchr/testify/mock" +) + +// MockCompactionPlanContext is an autogenerated mock type for the compactionPlanContext type +type MockCompactionPlanContext struct { + mock.Mock +} + +type MockCompactionPlanContext_Expecter struct { + mock *mock.Mock +} + +func (_m *MockCompactionPlanContext) EXPECT() *MockCompactionPlanContext_Expecter { + return &MockCompactionPlanContext_Expecter{mock: &_m.Mock} +} + +// execCompactionPlan provides a mock function with given fields: signal, plan +func (_m *MockCompactionPlanContext) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { + ret := _m.Called(signal, plan) + + var r0 error + if rf, ok := ret.Get(0).(func(*compactionSignal, *datapb.CompactionPlan) error); ok { + r0 = rf(signal, plan) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCompactionPlanContext_execCompactionPlan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'execCompactionPlan' +type MockCompactionPlanContext_execCompactionPlan_Call struct { + *mock.Call +} + +// execCompactionPlan is a helper method to define mock.On call +// - signal *compactionSignal +// - plan *datapb.CompactionPlan +func (_e *MockCompactionPlanContext_Expecter) execCompactionPlan(signal interface{}, plan interface{}) *MockCompactionPlanContext_execCompactionPlan_Call { + return &MockCompactionPlanContext_execCompactionPlan_Call{Call: _e.mock.On("execCompactionPlan", signal, plan)} +} + +func (_c *MockCompactionPlanContext_execCompactionPlan_Call) Run(run func(signal *compactionSignal, plan *datapb.CompactionPlan)) *MockCompactionPlanContext_execCompactionPlan_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*compactionSignal), args[1].(*datapb.CompactionPlan)) + }) + return _c +} + +func (_c *MockCompactionPlanContext_execCompactionPlan_Call) Return(_a0 error) *MockCompactionPlanContext_execCompactionPlan_Call { + _c.Call.Return(_a0) + return _c +} + +// getCompaction provides a mock function with given fields: planID +func (_m *MockCompactionPlanContext) getCompaction(planID int64) *compactionTask { + ret := _m.Called(planID) + + var r0 *compactionTask + if rf, ok := ret.Get(0).(func(int64) *compactionTask); ok { + r0 = rf(planID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*compactionTask) + } + } + + return r0 +} + +// MockCompactionPlanContext_getCompaction_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'getCompaction' +type MockCompactionPlanContext_getCompaction_Call struct { + *mock.Call +} + +// getCompaction is a helper method to define mock.On call +// - planID int64 +func (_e *MockCompactionPlanContext_Expecter) getCompaction(planID interface{}) *MockCompactionPlanContext_getCompaction_Call { + return &MockCompactionPlanContext_getCompaction_Call{Call: _e.mock.On("getCompaction", planID)} +} + +func (_c *MockCompactionPlanContext_getCompaction_Call) Run(run func(planID int64)) *MockCompactionPlanContext_getCompaction_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockCompactionPlanContext_getCompaction_Call) Return(_a0 *compactionTask) *MockCompactionPlanContext_getCompaction_Call { + _c.Call.Return(_a0) + return _c +} + +// getCompactionTasksBySignalID provides a mock function with given fields: signalID +func (_m *MockCompactionPlanContext) getCompactionTasksBySignalID(signalID int64) []*compactionTask { + ret := _m.Called(signalID) + + var r0 []*compactionTask + if rf, ok := ret.Get(0).(func(int64) []*compactionTask); ok { + r0 = rf(signalID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*compactionTask) + } + } + + return r0 +} + +// MockCompactionPlanContext_getCompactionTasksBySignalID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'getCompactionTasksBySignalID' +type MockCompactionPlanContext_getCompactionTasksBySignalID_Call struct { + *mock.Call +} + +// getCompactionTasksBySignalID is a helper method to define mock.On call +// - signalID int64 +func (_e *MockCompactionPlanContext_Expecter) getCompactionTasksBySignalID(signalID interface{}) *MockCompactionPlanContext_getCompactionTasksBySignalID_Call { + return &MockCompactionPlanContext_getCompactionTasksBySignalID_Call{Call: _e.mock.On("getCompactionTasksBySignalID", signalID)} +} + +func (_c *MockCompactionPlanContext_getCompactionTasksBySignalID_Call) Run(run func(signalID int64)) *MockCompactionPlanContext_getCompactionTasksBySignalID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockCompactionPlanContext_getCompactionTasksBySignalID_Call) Return(_a0 []*compactionTask) *MockCompactionPlanContext_getCompactionTasksBySignalID_Call { + _c.Call.Return(_a0) + return _c +} + +// isFull provides a mock function with given fields: +func (_m *MockCompactionPlanContext) isFull() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockCompactionPlanContext_isFull_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'isFull' +type MockCompactionPlanContext_isFull_Call struct { + *mock.Call +} + +// isFull is a helper method to define mock.On call +func (_e *MockCompactionPlanContext_Expecter) isFull() *MockCompactionPlanContext_isFull_Call { + return &MockCompactionPlanContext_isFull_Call{Call: _e.mock.On("isFull")} +} + +func (_c *MockCompactionPlanContext_isFull_Call) Run(run func()) *MockCompactionPlanContext_isFull_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCompactionPlanContext_isFull_Call) Return(_a0 bool) *MockCompactionPlanContext_isFull_Call { + _c.Call.Return(_a0) + return _c +} + +// start provides a mock function with given fields: +func (_m *MockCompactionPlanContext) start() { + _m.Called() +} + +// MockCompactionPlanContext_start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'start' +type MockCompactionPlanContext_start_Call struct { + *mock.Call +} + +// start is a helper method to define mock.On call +func (_e *MockCompactionPlanContext_Expecter) start() *MockCompactionPlanContext_start_Call { + return &MockCompactionPlanContext_start_Call{Call: _e.mock.On("start")} +} + +func (_c *MockCompactionPlanContext_start_Call) Run(run func()) *MockCompactionPlanContext_start_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCompactionPlanContext_start_Call) Return() *MockCompactionPlanContext_start_Call { + _c.Call.Return() + return _c +} + +// stop provides a mock function with given fields: +func (_m *MockCompactionPlanContext) stop() { + _m.Called() +} + +// MockCompactionPlanContext_stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'stop' +type MockCompactionPlanContext_stop_Call struct { + *mock.Call +} + +// stop is a helper method to define mock.On call +func (_e *MockCompactionPlanContext_Expecter) stop() *MockCompactionPlanContext_stop_Call { + return &MockCompactionPlanContext_stop_Call{Call: _e.mock.On("stop")} +} + +func (_c *MockCompactionPlanContext_stop_Call) Run(run func()) *MockCompactionPlanContext_stop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCompactionPlanContext_stop_Call) Return() *MockCompactionPlanContext_stop_Call { + _c.Call.Return() + return _c +} + +// updateCompaction provides a mock function with given fields: ts +func (_m *MockCompactionPlanContext) updateCompaction(ts uint64) error { + ret := _m.Called(ts) + + var r0 error + if rf, ok := ret.Get(0).(func(uint64) error); ok { + r0 = rf(ts) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCompactionPlanContext_updateCompaction_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'updateCompaction' +type MockCompactionPlanContext_updateCompaction_Call struct { + *mock.Call +} + +// updateCompaction is a helper method to define mock.On call +// - ts uint64 +func (_e *MockCompactionPlanContext_Expecter) updateCompaction(ts interface{}) *MockCompactionPlanContext_updateCompaction_Call { + return &MockCompactionPlanContext_updateCompaction_Call{Call: _e.mock.On("updateCompaction", ts)} +} + +func (_c *MockCompactionPlanContext_updateCompaction_Call) Run(run func(ts uint64)) *MockCompactionPlanContext_updateCompaction_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(uint64)) + }) + return _c +} + +func (_c *MockCompactionPlanContext_updateCompaction_Call) Return(_a0 error) *MockCompactionPlanContext_updateCompaction_Call { + _c.Call.Return(_a0) + return _c +} + +type mockConstructorTestingTNewMockCompactionPlanContext interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockCompactionPlanContext creates a new instance of MockCompactionPlanContext. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockCompactionPlanContext(t mockConstructorTestingTNewMockCompactionPlanContext) *MockCompactionPlanContext { + mock := &MockCompactionPlanContext{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datacoord/mock_handler.go b/internal/datacoord/mock_handler.go new file mode 100644 index 0000000000..87806fa08c --- /dev/null +++ b/internal/datacoord/mock_handler.go @@ -0,0 +1,254 @@ +// Code generated by mockery v2.16.0. DO NOT EDIT. + +package datacoord + +import ( + context "context" + + datapb "github.com/milvus-io/milvus/internal/proto/datapb" + mock "github.com/stretchr/testify/mock" +) + +// NMockHandler is an autogenerated mock type for the Handler type +type NMockHandler struct { + mock.Mock +} + +type NMockHandler_Expecter struct { + mock *mock.Mock +} + +func (_m *NMockHandler) EXPECT() *NMockHandler_Expecter { + return &NMockHandler_Expecter{mock: &_m.Mock} +} + +// CheckShouldDropChannel provides a mock function with given fields: channel, collectionID +func (_m *NMockHandler) CheckShouldDropChannel(channel string, collectionID int64) bool { + ret := _m.Called(channel, collectionID) + + var r0 bool + if rf, ok := ret.Get(0).(func(string, int64) bool); ok { + r0 = rf(channel, collectionID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// NMockHandler_CheckShouldDropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckShouldDropChannel' +type NMockHandler_CheckShouldDropChannel_Call struct { + *mock.Call +} + +// CheckShouldDropChannel is a helper method to define mock.On call +// - channel string +// - collectionID int64 +func (_e *NMockHandler_Expecter) CheckShouldDropChannel(channel interface{}, collectionID interface{}) *NMockHandler_CheckShouldDropChannel_Call { + return &NMockHandler_CheckShouldDropChannel_Call{Call: _e.mock.On("CheckShouldDropChannel", channel, collectionID)} +} + +func (_c *NMockHandler_CheckShouldDropChannel_Call) Run(run func(channel string, collectionID int64)) *NMockHandler_CheckShouldDropChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(int64)) + }) + return _c +} + +func (_c *NMockHandler_CheckShouldDropChannel_Call) Return(_a0 bool) *NMockHandler_CheckShouldDropChannel_Call { + _c.Call.Return(_a0) + return _c +} + +// FinishDropChannel provides a mock function with given fields: channel +func (_m *NMockHandler) FinishDropChannel(channel string) error { + ret := _m.Called(channel) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(channel) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NMockHandler_FinishDropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FinishDropChannel' +type NMockHandler_FinishDropChannel_Call struct { + *mock.Call +} + +// FinishDropChannel is a helper method to define mock.On call +// - channel string +func (_e *NMockHandler_Expecter) FinishDropChannel(channel interface{}) *NMockHandler_FinishDropChannel_Call { + return &NMockHandler_FinishDropChannel_Call{Call: _e.mock.On("FinishDropChannel", channel)} +} + +func (_c *NMockHandler_FinishDropChannel_Call) Run(run func(channel string)) *NMockHandler_FinishDropChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *NMockHandler_FinishDropChannel_Call) Return(_a0 error) *NMockHandler_FinishDropChannel_Call { + _c.Call.Return(_a0) + return _c +} + +// GetCollection provides a mock function with given fields: ctx, collectionID +func (_m *NMockHandler) GetCollection(ctx context.Context, collectionID int64) (*collectionInfo, error) { + ret := _m.Called(ctx, collectionID) + + var r0 *collectionInfo + if rf, ok := ret.Get(0).(func(context.Context, int64) *collectionInfo); ok { + r0 = rf(ctx, collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*collectionInfo) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, collectionID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NMockHandler_GetCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollection' +type NMockHandler_GetCollection_Call struct { + *mock.Call +} + +// GetCollection is a helper method to define mock.On call +// - ctx context.Context +// - collectionID int64 +func (_e *NMockHandler_Expecter) GetCollection(ctx interface{}, collectionID interface{}) *NMockHandler_GetCollection_Call { + return &NMockHandler_GetCollection_Call{Call: _e.mock.On("GetCollection", ctx, collectionID)} +} + +func (_c *NMockHandler_GetCollection_Call) Run(run func(ctx context.Context, collectionID int64)) *NMockHandler_GetCollection_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *NMockHandler_GetCollection_Call) Return(_a0 *collectionInfo, _a1 error) *NMockHandler_GetCollection_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +// GetDataVChanPositions provides a mock function with given fields: channel, partitionID +func (_m *NMockHandler) GetDataVChanPositions(ch *channel, partitionID int64) *datapb.VchannelInfo { + ret := _m.Called(ch, partitionID) + + var r0 *datapb.VchannelInfo + if rf, ok := ret.Get(0).(func(*channel, int64) *datapb.VchannelInfo); ok { + r0 = rf(ch, partitionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.VchannelInfo) + } + } + + return r0 +} + +// NMockHandler_GetDataVChanPositions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDataVChanPositions' +type NMockHandler_GetDataVChanPositions_Call struct { + *mock.Call +} + +// GetDataVChanPositions is a helper method to define mock.On call +// - channel *channel +// - partitionID int64 +func (_e *NMockHandler_Expecter) GetDataVChanPositions(channel interface{}, partitionID interface{}) *NMockHandler_GetDataVChanPositions_Call { + return &NMockHandler_GetDataVChanPositions_Call{Call: _e.mock.On("GetDataVChanPositions", channel, partitionID)} +} + +func (_c *NMockHandler_GetDataVChanPositions_Call) Run(run func(channel *channel, partitionID int64)) *NMockHandler_GetDataVChanPositions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*channel), args[1].(int64)) + }) + return _c +} + +func (_c *NMockHandler_GetDataVChanPositions_Call) Return(_a0 *datapb.VchannelInfo) *NMockHandler_GetDataVChanPositions_Call { + _c.Call.Return(_a0) + return _c +} + +// GetQueryVChanPositions provides a mock function with given fields: channel, partitionIDs +func (_m *NMockHandler) GetQueryVChanPositions(ch *channel, partitionIDs ...int64) *datapb.VchannelInfo { + _va := make([]interface{}, len(partitionIDs)) + for _i := range partitionIDs { + _va[_i] = partitionIDs[_i] + } + var _ca []interface{} + _ca = append(_ca, ch) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *datapb.VchannelInfo + if rf, ok := ret.Get(0).(func(*channel, ...int64) *datapb.VchannelInfo); ok { + r0 = rf(ch, partitionIDs...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.VchannelInfo) + } + } + + return r0 +} + +// NMockHandler_GetQueryVChanPositions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetQueryVChanPositions' +type NMockHandler_GetQueryVChanPositions_Call struct { + *mock.Call +} + +// GetQueryVChanPositions is a helper method to define mock.On call +// - channel *channel +// - partitionIDs ...int64 +func (_e *NMockHandler_Expecter) GetQueryVChanPositions(channel interface{}, partitionIDs ...interface{}) *NMockHandler_GetQueryVChanPositions_Call { + return &NMockHandler_GetQueryVChanPositions_Call{Call: _e.mock.On("GetQueryVChanPositions", + append([]interface{}{channel}, partitionIDs...)...)} +} + +func (_c *NMockHandler_GetQueryVChanPositions_Call) Run(run func(channel *channel, partitionIDs ...int64)) *NMockHandler_GetQueryVChanPositions_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]int64, len(args)-1) + for i, a := range args[1:] { + if a != nil { + variadicArgs[i] = a.(int64) + } + } + run(args[0].(*channel), variadicArgs...) + }) + return _c +} + +func (_c *NMockHandler_GetQueryVChanPositions_Call) Return(_a0 *datapb.VchannelInfo) *NMockHandler_GetQueryVChanPositions_Call { + _c.Call.Return(_a0) + return _c +} + +type mockConstructorTestingTNewNMockHandler interface { + mock.TestingT + Cleanup(func()) +} + +// NewNMockHandler creates a new instance of NMockHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewNMockHandler(t mockConstructorTestingTNewNMockHandler) *NMockHandler { + mock := &NMockHandler{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 37a4ddde97..c8712236ae 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -161,6 +161,20 @@ func getCollectionTTL(properties map[string]string) (time.Duration, error) { return Params.CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second), nil } +// getCollectionAutoCompactionEnabled returns whether auto compaction for collection is enabled. +// if not set, returns global auto compaction config. +func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, error) { + v, ok := properties[common.CollectionAutoCompactionKey] + if ok { + enabled, err := strconv.ParseBool(v) + if err != nil { + return false, err + } + return enabled, nil + } + return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil +} + func getIndexType(indexParams []*commonpb.KeyValuePair) string { for _, param := range indexParams { if param.Key == "index_type" { diff --git a/internal/datacoord/util_test.go b/internal/datacoord/util_test.go index c98bf1f6ae..f4f6273670 100644 --- a/internal/datacoord/util_test.go +++ b/internal/datacoord/util_test.go @@ -196,3 +196,24 @@ func (suite *UtilSuite) TestGetCollectionTTL() { suite.NoError(err) suite.Equal(ttl, Params.CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second)) } + +func (suite *UtilSuite) TestGetCollectionAutoCompactionEnabled() { + properties := map[string]string{ + common.CollectionAutoCompactionKey: "true", + } + + enabled, err := getCollectionAutoCompactionEnabled(properties) + suite.NoError(err) + suite.True(enabled) + + properties = map[string]string{ + common.CollectionAutoCompactionKey: "bad_value", + } + + _, err = getCollectionAutoCompactionEnabled(properties) + suite.Error(err) + + enabled, err = getCollectionAutoCompactionEnabled(map[string]string{}) + suite.NoError(err) + suite.Equal(Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), enabled) +} diff --git a/pkg/common/common.go b/pkg/common/common.go index 1b4bbe8119..c423f1ae90 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -90,7 +90,8 @@ const ( // Collection properties key const ( - CollectionTTLConfigKey = "collection.ttl.seconds" + CollectionTTLConfigKey = "collection.ttl.seconds" + CollectionAutoCompactionKey = "collection.autocompaction.enabled" ) const (