diff --git a/Makefile b/Makefile index 7df8ba467a..a52a7a6084 100644 --- a/Makefile +++ b/Makefile @@ -488,6 +488,8 @@ generate-mockery-datacoord: getdeps $(INSTALL_PATH)/mockery --name=SubCluster --dir=internal/datacoord --filename=mock_subcluster.go --output=internal/datacoord --structname=MockSubCluster --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=Broker --dir=internal/datacoord/broker --filename=mock_coordinator_broker.go --output=internal/datacoord/broker --structname=MockBroker --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=WorkerManager --dir=internal/datacoord --filename=mock_worker_manager.go --output=internal/datacoord --structname=MockWorkerManager --with-expecter --inpackage + $(INSTALL_PATH)/mockery --name=Manager --dir=internal/datacoord --filename=mock_segment_manager.go --output=internal/datacoord --structname=MockSegmentManager --with-expecter --inpackage + generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage diff --git a/internal/datacoord/mock_channel_store.go b/internal/datacoord/mock_channel_store.go index b5b0ecd6c4..47a3fafbd3 100644 --- a/internal/datacoord/mock_channel_store.go +++ b/internal/datacoord/mock_channel_store.go @@ -179,50 +179,6 @@ func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int return _c } -// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID -func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { - ret := _m.Called(collectionID) - - var r0 map[int64][]string - if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok { - r0 = rf(collectionID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[int64][]string) - } - } - - return r0 -} - -// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID' -type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct { - *mock.Call -} - -// GetNodeChannelsByCollectionID is a helper method to define mock.On call -// - collectionID int64 -func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)} -} - -func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) - }) - return _c -} - -func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { - _c.Call.Return(run) - return _c -} - // GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { _va := make([]interface{}, len(channelSelectors)) @@ -282,6 +238,50 @@ func (_c *MockRWChannelStore_GetNodeChannelsBy_Call) RunAndReturn(run func(NodeS return _c } +// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID +func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string { + ret := _m.Called(collectionID) + + var r0 map[int64][]string + if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok { + r0 = rf(collectionID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64][]string) + } + } + + return r0 +} + +// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID' +type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct { + *mock.Call +} + +// GetNodeChannelsByCollectionID is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)} +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call { + _c.Call.Return(run) + return _c +} + // GetNodes provides a mock function with given fields: func (_m *MockRWChannelStore) GetNodes() []int64 { ret := _m.Called() diff --git a/internal/datacoord/mock_channelmanager.go b/internal/datacoord/mock_channelmanager.go index e8b4ebe897..5239ab6e91 100644 --- a/internal/datacoord/mock_channelmanager.go +++ b/internal/datacoord/mock_channelmanager.go @@ -376,58 +376,6 @@ func (_c *MockChannelManager_GetNodeChannelsByCollectionID_Call) RunAndReturn(ru return _c } -// GetNodeIDByChannelName provides a mock function with given fields: channel -func (_m *MockChannelManager) GetNodeIDByChannelName(channel string) (int64, bool) { - ret := _m.Called(channel) - - var r0 int64 - var r1 bool - if rf, ok := ret.Get(0).(func(string) (int64, bool)); ok { - return rf(channel) - } - if rf, ok := ret.Get(0).(func(string) int64); ok { - r0 = rf(channel) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(string) bool); ok { - r1 = rf(channel) - } else { - r1 = ret.Get(1).(bool) - } - - return r0, r1 -} - -// MockChannelManager_GetNodeIDByChannelName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeIDByChannelName' -type MockChannelManager_GetNodeIDByChannelName_Call struct { - *mock.Call -} - -// GetNodeIDByChannelName is a helper method to define mock.On call -// - channel string -func (_e *MockChannelManager_Expecter) GetNodeIDByChannelName(channel interface{}) *MockChannelManager_GetNodeIDByChannelName_Call { - return &MockChannelManager_GetNodeIDByChannelName_Call{Call: _e.mock.On("GetNodeIDByChannelName", channel)} -} - -func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Run(run func(channel string)) *MockChannelManager_GetNodeIDByChannelName_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(string)) - }) - return _c -} - -func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Return(_a0 int64, _a1 bool) *MockChannelManager_GetNodeIDByChannelName_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockChannelManager_GetNodeIDByChannelName_Call) RunAndReturn(run func(string) (int64, bool)) *MockChannelManager_GetNodeIDByChannelName_Call { - _c.Call.Return(run) - return _c -} - // Match provides a mock function with given fields: nodeID, channel func (_m *MockChannelManager) Match(nodeID int64, channel string) bool { ret := _m.Called(nodeID, channel) diff --git a/internal/datacoord/mock_segment_manager.go b/internal/datacoord/mock_segment_manager.go index 58f68a81c8..1b82c30df6 100644 --- a/internal/datacoord/mock_segment_manager.go +++ b/internal/datacoord/mock_segment_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package datacoord @@ -8,21 +8,21 @@ import ( mock "github.com/stretchr/testify/mock" ) -// MockManager is an autogenerated mock type for the Manager type -type MockManager struct { +// MockSegmentManager is an autogenerated mock type for the Manager type +type MockSegmentManager struct { mock.Mock } -type MockManager_Expecter struct { +type MockSegmentManager_Expecter struct { mock *mock.Mock } -func (_m *MockManager) EXPECT() *MockManager_Expecter { - return &MockManager_Expecter{mock: &_m.Mock} +func (_m *MockSegmentManager) EXPECT() *MockSegmentManager_Expecter { + return &MockSegmentManager_Expecter{mock: &_m.Mock} } // AllocSegment provides a mock function with given fields: ctx, collectionID, partitionID, channelName, requestRows -func (_m *MockManager) AllocSegment(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64) ([]*Allocation, error) { +func (_m *MockSegmentManager) AllocSegment(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64) ([]*Allocation, error) { ret := _m.Called(ctx, collectionID, partitionID, channelName, requestRows) var r0 []*Allocation @@ -47,8 +47,8 @@ func (_m *MockManager) AllocSegment(ctx context.Context, collectionID int64, par return r0, r1 } -// MockManager_AllocSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocSegment' -type MockManager_AllocSegment_Call struct { +// MockSegmentManager_AllocSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocSegment' +type MockSegmentManager_AllocSegment_Call struct { *mock.Call } @@ -58,34 +58,68 @@ type MockManager_AllocSegment_Call struct { // - partitionID int64 // - channelName string // - requestRows int64 -func (_e *MockManager_Expecter) AllocSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}) *MockManager_AllocSegment_Call { - return &MockManager_AllocSegment_Call{Call: _e.mock.On("AllocSegment", ctx, collectionID, partitionID, channelName, requestRows)} +func (_e *MockSegmentManager_Expecter) AllocSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}) *MockSegmentManager_AllocSegment_Call { + return &MockSegmentManager_AllocSegment_Call{Call: _e.mock.On("AllocSegment", ctx, collectionID, partitionID, channelName, requestRows)} } -func (_c *MockManager_AllocSegment_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64)) *MockManager_AllocSegment_Call { +func (_c *MockSegmentManager_AllocSegment_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64)) *MockSegmentManager_AllocSegment_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(string), args[4].(int64)) }) return _c } -func (_c *MockManager_AllocSegment_Call) Return(_a0 []*Allocation, _a1 error) *MockManager_AllocSegment_Call { +func (_c *MockSegmentManager_AllocSegment_Call) Return(_a0 []*Allocation, _a1 error) *MockSegmentManager_AllocSegment_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockManager_AllocSegment_Call) RunAndReturn(run func(context.Context, int64, int64, string, int64) ([]*Allocation, error)) *MockManager_AllocSegment_Call { +func (_c *MockSegmentManager_AllocSegment_Call) RunAndReturn(run func(context.Context, int64, int64, string, int64) ([]*Allocation, error)) *MockSegmentManager_AllocSegment_Call { + _c.Call.Return(run) + return _c +} + +// CleanZeroSealedSegmentsOfChannel provides a mock function with given fields: channel, cpTs +func (_m *MockSegmentManager) CleanZeroSealedSegmentsOfChannel(channel string, cpTs uint64) { + _m.Called(channel, cpTs) +} + +// MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanZeroSealedSegmentsOfChannel' +type MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call struct { + *mock.Call +} + +// CleanZeroSealedSegmentsOfChannel is a helper method to define mock.On call +// - channel string +// - cpTs uint64 +func (_e *MockSegmentManager_Expecter) CleanZeroSealedSegmentsOfChannel(channel interface{}, cpTs interface{}) *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call { + return &MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call{Call: _e.mock.On("CleanZeroSealedSegmentsOfChannel", channel, cpTs)} +} + +func (_c *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call) Run(run func(channel string, cpTs uint64)) *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string), args[1].(uint64)) + }) + return _c +} + +func (_c *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call) Return() *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call) RunAndReturn(run func(string, uint64)) *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call { _c.Call.Return(run) return _c } // DropSegment provides a mock function with given fields: ctx, channel, segmentID -func (_m *MockManager) DropSegment(ctx context.Context, channel string, segmentID int64) { +func (_m *MockSegmentManager) DropSegment(ctx context.Context, channel string, segmentID int64) { _m.Called(ctx, channel, segmentID) } -// MockManager_DropSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegment' -type MockManager_DropSegment_Call struct { +// MockSegmentManager_DropSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegment' +type MockSegmentManager_DropSegment_Call struct { *mock.Call } @@ -93,97 +127,97 @@ type MockManager_DropSegment_Call struct { // - ctx context.Context // - channel string // - segmentID int64 -func (_e *MockManager_Expecter) DropSegment(ctx interface{}, channel interface{}, segmentID interface{}) *MockManager_DropSegment_Call { - return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, channel, segmentID)} +func (_e *MockSegmentManager_Expecter) DropSegment(ctx interface{}, channel interface{}, segmentID interface{}) *MockSegmentManager_DropSegment_Call { + return &MockSegmentManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, channel, segmentID)} } -func (_c *MockManager_DropSegment_Call) Run(run func(ctx context.Context, channel string, segmentID int64)) *MockManager_DropSegment_Call { +func (_c *MockSegmentManager_DropSegment_Call) Run(run func(ctx context.Context, channel string, segmentID int64)) *MockSegmentManager_DropSegment_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(string), args[2].(int64)) }) return _c } -func (_c *MockManager_DropSegment_Call) Return() *MockManager_DropSegment_Call { +func (_c *MockSegmentManager_DropSegment_Call) Return() *MockSegmentManager_DropSegment_Call { _c.Call.Return() return _c } -func (_c *MockManager_DropSegment_Call) RunAndReturn(run func(context.Context, string, int64)) *MockManager_DropSegment_Call { +func (_c *MockSegmentManager_DropSegment_Call) RunAndReturn(run func(context.Context, string, int64)) *MockSegmentManager_DropSegment_Call { _c.Call.Return(run) return _c } // DropSegmentsOfChannel provides a mock function with given fields: ctx, channel -func (_m *MockManager) DropSegmentsOfChannel(ctx context.Context, channel string) { +func (_m *MockSegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) { _m.Called(ctx, channel) } -// MockManager_DropSegmentsOfChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegmentsOfChannel' -type MockManager_DropSegmentsOfChannel_Call struct { +// MockSegmentManager_DropSegmentsOfChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegmentsOfChannel' +type MockSegmentManager_DropSegmentsOfChannel_Call struct { *mock.Call } // DropSegmentsOfChannel is a helper method to define mock.On call // - ctx context.Context // - channel string -func (_e *MockManager_Expecter) DropSegmentsOfChannel(ctx interface{}, channel interface{}) *MockManager_DropSegmentsOfChannel_Call { - return &MockManager_DropSegmentsOfChannel_Call{Call: _e.mock.On("DropSegmentsOfChannel", ctx, channel)} +func (_e *MockSegmentManager_Expecter) DropSegmentsOfChannel(ctx interface{}, channel interface{}) *MockSegmentManager_DropSegmentsOfChannel_Call { + return &MockSegmentManager_DropSegmentsOfChannel_Call{Call: _e.mock.On("DropSegmentsOfChannel", ctx, channel)} } -func (_c *MockManager_DropSegmentsOfChannel_Call) Run(run func(ctx context.Context, channel string)) *MockManager_DropSegmentsOfChannel_Call { +func (_c *MockSegmentManager_DropSegmentsOfChannel_Call) Run(run func(ctx context.Context, channel string)) *MockSegmentManager_DropSegmentsOfChannel_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(string)) }) return _c } -func (_c *MockManager_DropSegmentsOfChannel_Call) Return() *MockManager_DropSegmentsOfChannel_Call { +func (_c *MockSegmentManager_DropSegmentsOfChannel_Call) Return() *MockSegmentManager_DropSegmentsOfChannel_Call { _c.Call.Return() return _c } -func (_c *MockManager_DropSegmentsOfChannel_Call) RunAndReturn(run func(context.Context, string)) *MockManager_DropSegmentsOfChannel_Call { +func (_c *MockSegmentManager_DropSegmentsOfChannel_Call) RunAndReturn(run func(context.Context, string)) *MockSegmentManager_DropSegmentsOfChannel_Call { _c.Call.Return(run) return _c } // ExpireAllocations provides a mock function with given fields: channel, ts -func (_m *MockManager) ExpireAllocations(channel string, ts uint64) { +func (_m *MockSegmentManager) ExpireAllocations(channel string, ts uint64) { _m.Called(channel, ts) } -// MockManager_ExpireAllocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExpireAllocations' -type MockManager_ExpireAllocations_Call struct { +// MockSegmentManager_ExpireAllocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExpireAllocations' +type MockSegmentManager_ExpireAllocations_Call struct { *mock.Call } // ExpireAllocations is a helper method to define mock.On call // - channel string // - ts uint64 -func (_e *MockManager_Expecter) ExpireAllocations(channel interface{}, ts interface{}) *MockManager_ExpireAllocations_Call { - return &MockManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", channel, ts)} +func (_e *MockSegmentManager_Expecter) ExpireAllocations(channel interface{}, ts interface{}) *MockSegmentManager_ExpireAllocations_Call { + return &MockSegmentManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", channel, ts)} } -func (_c *MockManager_ExpireAllocations_Call) Run(run func(channel string, ts uint64)) *MockManager_ExpireAllocations_Call { +func (_c *MockSegmentManager_ExpireAllocations_Call) Run(run func(channel string, ts uint64)) *MockSegmentManager_ExpireAllocations_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(string), args[1].(uint64)) }) return _c } -func (_c *MockManager_ExpireAllocations_Call) Return() *MockManager_ExpireAllocations_Call { +func (_c *MockSegmentManager_ExpireAllocations_Call) Return() *MockSegmentManager_ExpireAllocations_Call { _c.Call.Return() return _c } -func (_c *MockManager_ExpireAllocations_Call) RunAndReturn(run func(string, uint64)) *MockManager_ExpireAllocations_Call { +func (_c *MockSegmentManager_ExpireAllocations_Call) RunAndReturn(run func(string, uint64)) *MockSegmentManager_ExpireAllocations_Call { _c.Call.Return(run) return _c } // GetFlushableSegments provides a mock function with given fields: ctx, channel, ts -func (_m *MockManager) GetFlushableSegments(ctx context.Context, channel string, ts uint64) ([]int64, error) { +func (_m *MockSegmentManager) GetFlushableSegments(ctx context.Context, channel string, ts uint64) ([]int64, error) { ret := _m.Called(ctx, channel, ts) var r0 []int64 @@ -208,8 +242,8 @@ func (_m *MockManager) GetFlushableSegments(ctx context.Context, channel string, return r0, r1 } -// MockManager_GetFlushableSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetFlushableSegments' -type MockManager_GetFlushableSegments_Call struct { +// MockSegmentManager_GetFlushableSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetFlushableSegments' +type MockSegmentManager_GetFlushableSegments_Call struct { *mock.Call } @@ -217,29 +251,29 @@ type MockManager_GetFlushableSegments_Call struct { // - ctx context.Context // - channel string // - ts uint64 -func (_e *MockManager_Expecter) GetFlushableSegments(ctx interface{}, channel interface{}, ts interface{}) *MockManager_GetFlushableSegments_Call { - return &MockManager_GetFlushableSegments_Call{Call: _e.mock.On("GetFlushableSegments", ctx, channel, ts)} +func (_e *MockSegmentManager_Expecter) GetFlushableSegments(ctx interface{}, channel interface{}, ts interface{}) *MockSegmentManager_GetFlushableSegments_Call { + return &MockSegmentManager_GetFlushableSegments_Call{Call: _e.mock.On("GetFlushableSegments", ctx, channel, ts)} } -func (_c *MockManager_GetFlushableSegments_Call) Run(run func(ctx context.Context, channel string, ts uint64)) *MockManager_GetFlushableSegments_Call { +func (_c *MockSegmentManager_GetFlushableSegments_Call) Run(run func(ctx context.Context, channel string, ts uint64)) *MockSegmentManager_GetFlushableSegments_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(string), args[2].(uint64)) }) return _c } -func (_c *MockManager_GetFlushableSegments_Call) Return(_a0 []int64, _a1 error) *MockManager_GetFlushableSegments_Call { +func (_c *MockSegmentManager_GetFlushableSegments_Call) Return(_a0 []int64, _a1 error) *MockSegmentManager_GetFlushableSegments_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockManager_GetFlushableSegments_Call) RunAndReturn(run func(context.Context, string, uint64) ([]int64, error)) *MockManager_GetFlushableSegments_Call { +func (_c *MockSegmentManager_GetFlushableSegments_Call) RunAndReturn(run func(context.Context, string, uint64) ([]int64, error)) *MockSegmentManager_GetFlushableSegments_Call { _c.Call.Return(run) return _c } // SealAllSegments provides a mock function with given fields: ctx, channel, segIDs -func (_m *MockManager) SealAllSegments(ctx context.Context, channel string, segIDs []int64) ([]int64, error) { +func (_m *MockSegmentManager) SealAllSegments(ctx context.Context, channel string, segIDs []int64) ([]int64, error) { ret := _m.Called(ctx, channel, segIDs) var r0 []int64 @@ -264,8 +298,8 @@ func (_m *MockManager) SealAllSegments(ctx context.Context, channel string, segI return r0, r1 } -// MockManager_SealAllSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SealAllSegments' -type MockManager_SealAllSegments_Call struct { +// MockSegmentManager_SealAllSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SealAllSegments' +type MockSegmentManager_SealAllSegments_Call struct { *mock.Call } @@ -273,34 +307,34 @@ type MockManager_SealAllSegments_Call struct { // - ctx context.Context // - channel string // - segIDs []int64 -func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, channel interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call { - return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, channel, segIDs)} +func (_e *MockSegmentManager_Expecter) SealAllSegments(ctx interface{}, channel interface{}, segIDs interface{}) *MockSegmentManager_SealAllSegments_Call { + return &MockSegmentManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, channel, segIDs)} } -func (_c *MockManager_SealAllSegments_Call) Run(run func(ctx context.Context, channel string, segIDs []int64)) *MockManager_SealAllSegments_Call { +func (_c *MockSegmentManager_SealAllSegments_Call) Run(run func(ctx context.Context, channel string, segIDs []int64)) *MockSegmentManager_SealAllSegments_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(string), args[2].([]int64)) }) return _c } -func (_c *MockManager_SealAllSegments_Call) Return(_a0 []int64, _a1 error) *MockManager_SealAllSegments_Call { +func (_c *MockSegmentManager_SealAllSegments_Call) Return(_a0 []int64, _a1 error) *MockSegmentManager_SealAllSegments_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockManager_SealAllSegments_Call) RunAndReturn(run func(context.Context, string, []int64) ([]int64, error)) *MockManager_SealAllSegments_Call { +func (_c *MockSegmentManager_SealAllSegments_Call) RunAndReturn(run func(context.Context, string, []int64) ([]int64, error)) *MockSegmentManager_SealAllSegments_Call { _c.Call.Return(run) return _c } -// NewMockManager creates a new instance of MockManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// NewMockSegmentManager creates a new instance of MockSegmentManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. -func NewMockManager(t interface { +func NewMockSegmentManager(t interface { mock.TestingT Cleanup(func()) -}) *MockManager { - mock := &MockManager{} +}) *MockSegmentManager { + mock := &MockSegmentManager{} mock.Mock.Test(t) t.Cleanup(func() { mock.AssertExpectations(t) }) diff --git a/internal/datacoord/mock_session_manager.go b/internal/datacoord/mock_session_manager.go index d15b8b402b..d5eaecf801 100644 --- a/internal/datacoord/mock_session_manager.go +++ b/internal/datacoord/mock_session_manager.go @@ -123,7 +123,9 @@ func (_m *MockSessionManager) CheckDNHealth(ctx context.Context) *healthcheck.Re if rf, ok := ret.Get(0).(func(context.Context) *healthcheck.Result); ok { r0 = rf(ctx) } else { - r0 = ret.Get(0).(*healthcheck.Result) + if ret.Get(0) != nil { + r0 = ret.Get(0).(*healthcheck.Result) + } } return r0 @@ -147,12 +149,12 @@ func (_c *MockSessionManager_CheckDNHealth_Call) Run(run func(ctx context.Contex return _c } -func (_c *MockSessionManager_CheckDNHealth_Call) Return(_a0 healthcheck.Result) *MockSessionManager_CheckDNHealth_Call { +func (_c *MockSessionManager_CheckDNHealth_Call) Return(_a0 *healthcheck.Result) *MockSessionManager_CheckDNHealth_Call { _c.Call.Return(_a0) return _c } -func (_c *MockSessionManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) healthcheck.Result) *MockSessionManager_CheckDNHealth_Call { +func (_c *MockSessionManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) *healthcheck.Result) *MockSessionManager_CheckDNHealth_Call { _c.Call.Return(run) return _c } diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index c548788112..7765e14683 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -85,6 +85,8 @@ type Manager interface { ExpireAllocations(channel string, ts Timestamp) // DropSegmentsOfChannel drops all segments in a channel DropSegmentsOfChannel(ctx context.Context, channel string) + // CleanZeroSealedSegmentsOfChannel try to clean real empty sealed segments in a channel + CleanZeroSealedSegmentsOfChannel(channel string, cpTs Timestamp) } // Allocation records the allocation info @@ -497,9 +499,6 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin return nil, err } - // TODO: It's too frequent; perhaps each channel could check once per minute instead. - s.cleanupSealedSegment(t, channel) - sealed, ok := s.channel2Sealed.Get(channel) if !ok { return nil, nil @@ -551,26 +550,35 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) { }) } -func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) { +func (s *SegmentManager) CleanZeroSealedSegmentsOfChannel(channel string, cpTs Timestamp) { + s.channelLock.Lock(channel) + defer s.channelLock.Unlock(channel) + sealed, ok := s.channel2Sealed.Get(channel) if !ok { + log.Info("try remove empty sealed segment after channel cp updated failed to get channel", zap.String("channel", channel)) return } sealed.Range(func(id int64) bool { segment := s.meta.GetHealthySegment(id) if segment == nil { - log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id)) + log.Warn("try remove empty sealed segment, failed to get segment, remove it in channel2Sealed", zap.String("channel", channel), zap.Int64("segmentID", id)) sealed.Remove(id) return true } // Check if segment is empty - if segment.GetLastExpireTime() <= ts && segment.currRows == 0 { - log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id)) + if segment.GetLastExpireTime() > 0 && segment.GetLastExpireTime() < cpTs && segment.currRows == 0 && segment.GetNumOfRows() == 0 { + log.Info("try remove empty sealed segment after channel cp updated", + zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id), + zap.String("channel", channel), zap.Any("cpTs", cpTs)) if err := s.meta.SetState(id, commonpb.SegmentState_Dropped); err != nil { - log.Warn("failed to set segment state to dropped", zap.String("channel", channel), + log.Warn("try remove empty sealed segment after channel cp updated, failed to set segment state to dropped", zap.String("channel", channel), zap.Int64("segmentID", id), zap.Error(err)) } else { sealed.Remove(id) + log.Info("succeed to remove empty sealed segment", + zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id), + zap.String("channel", channel), zap.Any("cpTs", cpTs), zap.Any("expireTs", segment.GetLastExpireTime())) } } return true diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 4908c34d63..d3faa4504b 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -28,10 +28,12 @@ import ( "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" mockkv "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" + "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/lock" @@ -495,6 +497,15 @@ func TestGetFlushableSegments(t *testing.T) { assert.EqualValues(t, allocations[0].SegmentID, ids[0]) meta.SetCurrentRows(allocations[0].SegmentID, 0) + postions := make([]*msgpb.MsgPosition, 0) + cpTs := allocations[0].ExpireTime + 1 + postions = append(postions, &msgpb.MsgPosition{ + ChannelName: "c1", + MsgID: []byte{1, 2, 3}, + Timestamp: cpTs, + }) + meta.UpdateChannelCheckpoints(postions) + segmentManager.CleanZeroSealedSegmentsOfChannel("c1", cpTs) ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime) assert.NoError(t, err) assert.Empty(t, ids) @@ -881,3 +892,149 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) { }) } } + +func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) { + partitionID := int64(100) + type fields struct { + meta *meta + segments []UniqueID + } + type args struct { + channel string + cpTs Timestamp + } + + mockCatalog := mocks.NewDataCoordCatalog(t) + mockCatalog.EXPECT().AlterSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil) + + seg1 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + PartitionID: partitionID, + InsertChannel: "ch1", + State: commonpb.SegmentState_Sealed, + NumOfRows: 1, + LastExpireTime: 100, + }, + currRows: 1, + } + seg2 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + PartitionID: partitionID, + InsertChannel: "ch1", + State: commonpb.SegmentState_Sealed, + NumOfRows: 0, + LastExpireTime: 100, + }, + currRows: 0, + } + seg3 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 3, + PartitionID: partitionID, + InsertChannel: "ch1", + State: commonpb.SegmentState_Sealed, + LastExpireTime: 90, + }, + } + seg4 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 4, + PartitionID: partitionID, + InsertChannel: "ch2", + State: commonpb.SegmentState_Growing, + NumOfRows: 1, + LastExpireTime: 100, + }, + currRows: 1, + } + + newMetaFunc := func() *meta { + return &meta{ + catalog: mockCatalog, + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + 1: seg1, + 2: seg2, + 3: seg3, + 4: seg4, + }, + secondaryIndexes: segmentInfoIndexes{ + coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{ + 0: {1: seg1, 2: seg2, 3: seg3, 4: seg4}, + }, + channel2Segments: map[string]map[UniqueID]*SegmentInfo{ + "ch1": {1: seg1, 2: seg2, 3: seg3}, + "ch2": {4: seg4}, + }, + }, + }, + } + } + + tests := []struct { + name string + fields fields + args args + want []UniqueID + }{ + { + "test clean empty sealed segments with normal channel cp <= lastExpireTs", + fields{ + meta: newMetaFunc(), + segments: []UniqueID{1, 2, 3, 4}, + }, + args{ + "ch1", 100, + }, + []UniqueID{1, 2, 4}, + }, + { + "test clean empty sealed segments with normal channel cp > lastExpireTs", + fields{ + meta: newMetaFunc(), + segments: []UniqueID{1, 2, 3, 4}, + }, + args{ + "ch1", 101, + }, + []UniqueID{1, 4}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &SegmentManager{ + meta: tt.fields.meta, + channelLock: lock.NewKeyLock[string](), + channel2Growing: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](), + channel2Sealed: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](), + } + for _, segmentID := range tt.fields.segments { + segmentInfo := tt.fields.meta.GetSegment(segmentID) + channel := tt.args.channel + if segmentInfo != nil { + channel = segmentInfo.GetInsertChannel() + } + if segmentInfo == nil || segmentInfo.GetState() == commonpb.SegmentState_Growing { + growing, _ := s.channel2Growing.GetOrInsert(channel, typeutil.NewUniqueSet()) + growing.Insert(segmentID) + } else if segmentInfo.GetState() == commonpb.SegmentState_Sealed { + sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet()) + sealed.Insert(segmentID) + } + } + s.CleanZeroSealedSegmentsOfChannel(tt.args.channel, tt.args.cpTs) + all := make([]int64, 0) + s.channel2Sealed.Range(func(_ string, segments typeutil.UniqueSet) bool { + all = append(all, segments.Collect()...) + return true + }) + s.channel2Growing.Range(func(_ string, segments typeutil.UniqueSet) bool { + all = append(all, segments.Collect()...) + return true + }) + assert.ElementsMatch(t, tt.want, all) + }) + } +} diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 76731a93d5..65a9087246 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -824,7 +824,7 @@ func TestServer_getSystemInfoMetrics(t *testing.T) { func TestDropVirtualChannel(t *testing.T) { t.Run("normal DropVirtualChannel", func(t *testing.T) { - segmentManager := NewMockManager(t) + segmentManager := NewMockSegmentManager(t) svr := newTestServer(t, WithSegmentManager(segmentManager)) defer closeTestServer(t, svr) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 86b3be1b05..ac92862a12 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1387,6 +1387,13 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update return merr.Status(err), nil } + for _, pos := range checkpoints { + if pos == nil || pos.GetMsgID() == nil || pos.GetChannelName() == "" { + continue + } + s.segmentManager.CleanZeroSealedSegmentsOfChannel(pos.GetChannelName(), pos.GetTimestamp()) + } + return merr.Success(), nil }