fix: deleted the sealed segment data accidentally (#39423)

issue: https://github.com/milvus-io/milvus/issues/39333
pr: https://github.com/milvus-io/milvus/pull/39421

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
zhenshan.cao 2025-01-20 15:27:06 +08:00 committed by GitHub
parent 2ead5d6735
commit 9ab3fac416
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 324 additions and 166 deletions

View File

@ -488,6 +488,8 @@ generate-mockery-datacoord: getdeps
$(INSTALL_PATH)/mockery --name=SubCluster --dir=internal/datacoord --filename=mock_subcluster.go --output=internal/datacoord --structname=MockSubCluster --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=SubCluster --dir=internal/datacoord --filename=mock_subcluster.go --output=internal/datacoord --structname=MockSubCluster --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=Broker --dir=internal/datacoord/broker --filename=mock_coordinator_broker.go --output=internal/datacoord/broker --structname=MockBroker --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=Broker --dir=internal/datacoord/broker --filename=mock_coordinator_broker.go --output=internal/datacoord/broker --structname=MockBroker --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=WorkerManager --dir=internal/datacoord --filename=mock_worker_manager.go --output=internal/datacoord --structname=MockWorkerManager --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=WorkerManager --dir=internal/datacoord --filename=mock_worker_manager.go --output=internal/datacoord --structname=MockWorkerManager --with-expecter --inpackage
$(INSTALL_PATH)/mockery --name=Manager --dir=internal/datacoord --filename=mock_segment_manager.go --output=internal/datacoord --structname=MockSegmentManager --with-expecter --inpackage
generate-mockery-datanode: getdeps generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage $(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage

View File

@ -179,50 +179,6 @@ func (_c *MockRWChannelStore_GetNodeChannelCount_Call) RunAndReturn(run func(int
return _c return _c
} }
// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID
func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
ret := _m.Called(collectionID)
var r0 map[int64][]string
if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok {
r0 = rf(collectionID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64][]string)
}
}
return r0
}
// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID'
type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct {
*mock.Call
}
// GetNodeChannelsByCollectionID is a helper method to define mock.On call
// - collectionID int64
func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)}
}
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
_c.Call.Return(run)
return _c
}
// GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors // GetNodeChannelsBy provides a mock function with given fields: nodeSelector, channelSelectors
func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo { func (_m *MockRWChannelStore) GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo {
_va := make([]interface{}, len(channelSelectors)) _va := make([]interface{}, len(channelSelectors))
@ -282,6 +238,50 @@ func (_c *MockRWChannelStore_GetNodeChannelsBy_Call) RunAndReturn(run func(NodeS
return _c return _c
} }
// GetNodeChannelsByCollectionID provides a mock function with given fields: collectionID
func (_m *MockRWChannelStore) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
ret := _m.Called(collectionID)
var r0 map[int64][]string
if rf, ok := ret.Get(0).(func(int64) map[int64][]string); ok {
r0 = rf(collectionID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64][]string)
}
}
return r0
}
// MockRWChannelStore_GetNodeChannelsByCollectionID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelsByCollectionID'
type MockRWChannelStore_GetNodeChannelsByCollectionID_Call struct {
*mock.Call
}
// GetNodeChannelsByCollectionID is a helper method to define mock.On call
// - collectionID int64
func (_e *MockRWChannelStore_Expecter) GetNodeChannelsByCollectionID(collectionID interface{}) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
return &MockRWChannelStore_GetNodeChannelsByCollectionID_Call{Call: _e.mock.On("GetNodeChannelsByCollectionID", collectionID)}
}
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Run(run func(collectionID int64)) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) Return(_a0 map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockRWChannelStore_GetNodeChannelsByCollectionID_Call) RunAndReturn(run func(int64) map[int64][]string) *MockRWChannelStore_GetNodeChannelsByCollectionID_Call {
_c.Call.Return(run)
return _c
}
// GetNodes provides a mock function with given fields: // GetNodes provides a mock function with given fields:
func (_m *MockRWChannelStore) GetNodes() []int64 { func (_m *MockRWChannelStore) GetNodes() []int64 {
ret := _m.Called() ret := _m.Called()

View File

@ -376,58 +376,6 @@ func (_c *MockChannelManager_GetNodeChannelsByCollectionID_Call) RunAndReturn(ru
return _c return _c
} }
// GetNodeIDByChannelName provides a mock function with given fields: channel
func (_m *MockChannelManager) GetNodeIDByChannelName(channel string) (int64, bool) {
ret := _m.Called(channel)
var r0 int64
var r1 bool
if rf, ok := ret.Get(0).(func(string) (int64, bool)); ok {
return rf(channel)
}
if rf, ok := ret.Get(0).(func(string) int64); ok {
r0 = rf(channel)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(string) bool); ok {
r1 = rf(channel)
} else {
r1 = ret.Get(1).(bool)
}
return r0, r1
}
// MockChannelManager_GetNodeIDByChannelName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeIDByChannelName'
type MockChannelManager_GetNodeIDByChannelName_Call struct {
*mock.Call
}
// GetNodeIDByChannelName is a helper method to define mock.On call
// - channel string
func (_e *MockChannelManager_Expecter) GetNodeIDByChannelName(channel interface{}) *MockChannelManager_GetNodeIDByChannelName_Call {
return &MockChannelManager_GetNodeIDByChannelName_Call{Call: _e.mock.On("GetNodeIDByChannelName", channel)}
}
func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Run(run func(channel string)) *MockChannelManager_GetNodeIDByChannelName_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockChannelManager_GetNodeIDByChannelName_Call) Return(_a0 int64, _a1 bool) *MockChannelManager_GetNodeIDByChannelName_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockChannelManager_GetNodeIDByChannelName_Call) RunAndReturn(run func(string) (int64, bool)) *MockChannelManager_GetNodeIDByChannelName_Call {
_c.Call.Return(run)
return _c
}
// Match provides a mock function with given fields: nodeID, channel // Match provides a mock function with given fields: nodeID, channel
func (_m *MockChannelManager) Match(nodeID int64, channel string) bool { func (_m *MockChannelManager) Match(nodeID int64, channel string) bool {
ret := _m.Called(nodeID, channel) ret := _m.Called(nodeID, channel)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.30.1. DO NOT EDIT. // Code generated by mockery v2.32.4. DO NOT EDIT.
package datacoord package datacoord
@ -8,21 +8,21 @@ import (
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
) )
// MockManager is an autogenerated mock type for the Manager type // MockSegmentManager is an autogenerated mock type for the Manager type
type MockManager struct { type MockSegmentManager struct {
mock.Mock mock.Mock
} }
type MockManager_Expecter struct { type MockSegmentManager_Expecter struct {
mock *mock.Mock mock *mock.Mock
} }
func (_m *MockManager) EXPECT() *MockManager_Expecter { func (_m *MockSegmentManager) EXPECT() *MockSegmentManager_Expecter {
return &MockManager_Expecter{mock: &_m.Mock} return &MockSegmentManager_Expecter{mock: &_m.Mock}
} }
// AllocSegment provides a mock function with given fields: ctx, collectionID, partitionID, channelName, requestRows // AllocSegment provides a mock function with given fields: ctx, collectionID, partitionID, channelName, requestRows
func (_m *MockManager) AllocSegment(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64) ([]*Allocation, error) { func (_m *MockSegmentManager) AllocSegment(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64) ([]*Allocation, error) {
ret := _m.Called(ctx, collectionID, partitionID, channelName, requestRows) ret := _m.Called(ctx, collectionID, partitionID, channelName, requestRows)
var r0 []*Allocation var r0 []*Allocation
@ -47,8 +47,8 @@ func (_m *MockManager) AllocSegment(ctx context.Context, collectionID int64, par
return r0, r1 return r0, r1
} }
// MockManager_AllocSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocSegment' // MockSegmentManager_AllocSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocSegment'
type MockManager_AllocSegment_Call struct { type MockSegmentManager_AllocSegment_Call struct {
*mock.Call *mock.Call
} }
@ -58,34 +58,68 @@ type MockManager_AllocSegment_Call struct {
// - partitionID int64 // - partitionID int64
// - channelName string // - channelName string
// - requestRows int64 // - requestRows int64
func (_e *MockManager_Expecter) AllocSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}) *MockManager_AllocSegment_Call { func (_e *MockSegmentManager_Expecter) AllocSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}) *MockSegmentManager_AllocSegment_Call {
return &MockManager_AllocSegment_Call{Call: _e.mock.On("AllocSegment", ctx, collectionID, partitionID, channelName, requestRows)} return &MockSegmentManager_AllocSegment_Call{Call: _e.mock.On("AllocSegment", ctx, collectionID, partitionID, channelName, requestRows)}
} }
func (_c *MockManager_AllocSegment_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64)) *MockManager_AllocSegment_Call { func (_c *MockSegmentManager_AllocSegment_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64)) *MockSegmentManager_AllocSegment_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(string), args[4].(int64)) run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(string), args[4].(int64))
}) })
return _c return _c
} }
func (_c *MockManager_AllocSegment_Call) Return(_a0 []*Allocation, _a1 error) *MockManager_AllocSegment_Call { func (_c *MockSegmentManager_AllocSegment_Call) Return(_a0 []*Allocation, _a1 error) *MockSegmentManager_AllocSegment_Call {
_c.Call.Return(_a0, _a1) _c.Call.Return(_a0, _a1)
return _c return _c
} }
func (_c *MockManager_AllocSegment_Call) RunAndReturn(run func(context.Context, int64, int64, string, int64) ([]*Allocation, error)) *MockManager_AllocSegment_Call { func (_c *MockSegmentManager_AllocSegment_Call) RunAndReturn(run func(context.Context, int64, int64, string, int64) ([]*Allocation, error)) *MockSegmentManager_AllocSegment_Call {
_c.Call.Return(run)
return _c
}
// CleanZeroSealedSegmentsOfChannel provides a mock function with given fields: channel, cpTs
func (_m *MockSegmentManager) CleanZeroSealedSegmentsOfChannel(channel string, cpTs uint64) {
_m.Called(channel, cpTs)
}
// MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanZeroSealedSegmentsOfChannel'
type MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call struct {
*mock.Call
}
// CleanZeroSealedSegmentsOfChannel is a helper method to define mock.On call
// - channel string
// - cpTs uint64
func (_e *MockSegmentManager_Expecter) CleanZeroSealedSegmentsOfChannel(channel interface{}, cpTs interface{}) *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call {
return &MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call{Call: _e.mock.On("CleanZeroSealedSegmentsOfChannel", channel, cpTs)}
}
func (_c *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call) Run(run func(channel string, cpTs uint64)) *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(uint64))
})
return _c
}
func (_c *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call) Return() *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call {
_c.Call.Return()
return _c
}
func (_c *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call) RunAndReturn(run func(string, uint64)) *MockSegmentManager_CleanZeroSealedSegmentsOfChannel_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
// DropSegment provides a mock function with given fields: ctx, channel, segmentID // DropSegment provides a mock function with given fields: ctx, channel, segmentID
func (_m *MockManager) DropSegment(ctx context.Context, channel string, segmentID int64) { func (_m *MockSegmentManager) DropSegment(ctx context.Context, channel string, segmentID int64) {
_m.Called(ctx, channel, segmentID) _m.Called(ctx, channel, segmentID)
} }
// MockManager_DropSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegment' // MockSegmentManager_DropSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegment'
type MockManager_DropSegment_Call struct { type MockSegmentManager_DropSegment_Call struct {
*mock.Call *mock.Call
} }
@ -93,97 +127,97 @@ type MockManager_DropSegment_Call struct {
// - ctx context.Context // - ctx context.Context
// - channel string // - channel string
// - segmentID int64 // - segmentID int64
func (_e *MockManager_Expecter) DropSegment(ctx interface{}, channel interface{}, segmentID interface{}) *MockManager_DropSegment_Call { func (_e *MockSegmentManager_Expecter) DropSegment(ctx interface{}, channel interface{}, segmentID interface{}) *MockSegmentManager_DropSegment_Call {
return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, channel, segmentID)} return &MockSegmentManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, channel, segmentID)}
} }
func (_c *MockManager_DropSegment_Call) Run(run func(ctx context.Context, channel string, segmentID int64)) *MockManager_DropSegment_Call { func (_c *MockSegmentManager_DropSegment_Call) Run(run func(ctx context.Context, channel string, segmentID int64)) *MockSegmentManager_DropSegment_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(int64)) run(args[0].(context.Context), args[1].(string), args[2].(int64))
}) })
return _c return _c
} }
func (_c *MockManager_DropSegment_Call) Return() *MockManager_DropSegment_Call { func (_c *MockSegmentManager_DropSegment_Call) Return() *MockSegmentManager_DropSegment_Call {
_c.Call.Return() _c.Call.Return()
return _c return _c
} }
func (_c *MockManager_DropSegment_Call) RunAndReturn(run func(context.Context, string, int64)) *MockManager_DropSegment_Call { func (_c *MockSegmentManager_DropSegment_Call) RunAndReturn(run func(context.Context, string, int64)) *MockSegmentManager_DropSegment_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
// DropSegmentsOfChannel provides a mock function with given fields: ctx, channel // DropSegmentsOfChannel provides a mock function with given fields: ctx, channel
func (_m *MockManager) DropSegmentsOfChannel(ctx context.Context, channel string) { func (_m *MockSegmentManager) DropSegmentsOfChannel(ctx context.Context, channel string) {
_m.Called(ctx, channel) _m.Called(ctx, channel)
} }
// MockManager_DropSegmentsOfChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegmentsOfChannel' // MockSegmentManager_DropSegmentsOfChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegmentsOfChannel'
type MockManager_DropSegmentsOfChannel_Call struct { type MockSegmentManager_DropSegmentsOfChannel_Call struct {
*mock.Call *mock.Call
} }
// DropSegmentsOfChannel is a helper method to define mock.On call // DropSegmentsOfChannel is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - channel string // - channel string
func (_e *MockManager_Expecter) DropSegmentsOfChannel(ctx interface{}, channel interface{}) *MockManager_DropSegmentsOfChannel_Call { func (_e *MockSegmentManager_Expecter) DropSegmentsOfChannel(ctx interface{}, channel interface{}) *MockSegmentManager_DropSegmentsOfChannel_Call {
return &MockManager_DropSegmentsOfChannel_Call{Call: _e.mock.On("DropSegmentsOfChannel", ctx, channel)} return &MockSegmentManager_DropSegmentsOfChannel_Call{Call: _e.mock.On("DropSegmentsOfChannel", ctx, channel)}
} }
func (_c *MockManager_DropSegmentsOfChannel_Call) Run(run func(ctx context.Context, channel string)) *MockManager_DropSegmentsOfChannel_Call { func (_c *MockSegmentManager_DropSegmentsOfChannel_Call) Run(run func(ctx context.Context, channel string)) *MockSegmentManager_DropSegmentsOfChannel_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string)) run(args[0].(context.Context), args[1].(string))
}) })
return _c return _c
} }
func (_c *MockManager_DropSegmentsOfChannel_Call) Return() *MockManager_DropSegmentsOfChannel_Call { func (_c *MockSegmentManager_DropSegmentsOfChannel_Call) Return() *MockSegmentManager_DropSegmentsOfChannel_Call {
_c.Call.Return() _c.Call.Return()
return _c return _c
} }
func (_c *MockManager_DropSegmentsOfChannel_Call) RunAndReturn(run func(context.Context, string)) *MockManager_DropSegmentsOfChannel_Call { func (_c *MockSegmentManager_DropSegmentsOfChannel_Call) RunAndReturn(run func(context.Context, string)) *MockSegmentManager_DropSegmentsOfChannel_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
// ExpireAllocations provides a mock function with given fields: channel, ts // ExpireAllocations provides a mock function with given fields: channel, ts
func (_m *MockManager) ExpireAllocations(channel string, ts uint64) { func (_m *MockSegmentManager) ExpireAllocations(channel string, ts uint64) {
_m.Called(channel, ts) _m.Called(channel, ts)
} }
// MockManager_ExpireAllocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExpireAllocations' // MockSegmentManager_ExpireAllocations_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExpireAllocations'
type MockManager_ExpireAllocations_Call struct { type MockSegmentManager_ExpireAllocations_Call struct {
*mock.Call *mock.Call
} }
// ExpireAllocations is a helper method to define mock.On call // ExpireAllocations is a helper method to define mock.On call
// - channel string // - channel string
// - ts uint64 // - ts uint64
func (_e *MockManager_Expecter) ExpireAllocations(channel interface{}, ts interface{}) *MockManager_ExpireAllocations_Call { func (_e *MockSegmentManager_Expecter) ExpireAllocations(channel interface{}, ts interface{}) *MockSegmentManager_ExpireAllocations_Call {
return &MockManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", channel, ts)} return &MockSegmentManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", channel, ts)}
} }
func (_c *MockManager_ExpireAllocations_Call) Run(run func(channel string, ts uint64)) *MockManager_ExpireAllocations_Call { func (_c *MockSegmentManager_ExpireAllocations_Call) Run(run func(channel string, ts uint64)) *MockSegmentManager_ExpireAllocations_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(string), args[1].(uint64)) run(args[0].(string), args[1].(uint64))
}) })
return _c return _c
} }
func (_c *MockManager_ExpireAllocations_Call) Return() *MockManager_ExpireAllocations_Call { func (_c *MockSegmentManager_ExpireAllocations_Call) Return() *MockSegmentManager_ExpireAllocations_Call {
_c.Call.Return() _c.Call.Return()
return _c return _c
} }
func (_c *MockManager_ExpireAllocations_Call) RunAndReturn(run func(string, uint64)) *MockManager_ExpireAllocations_Call { func (_c *MockSegmentManager_ExpireAllocations_Call) RunAndReturn(run func(string, uint64)) *MockSegmentManager_ExpireAllocations_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
// GetFlushableSegments provides a mock function with given fields: ctx, channel, ts // GetFlushableSegments provides a mock function with given fields: ctx, channel, ts
func (_m *MockManager) GetFlushableSegments(ctx context.Context, channel string, ts uint64) ([]int64, error) { func (_m *MockSegmentManager) GetFlushableSegments(ctx context.Context, channel string, ts uint64) ([]int64, error) {
ret := _m.Called(ctx, channel, ts) ret := _m.Called(ctx, channel, ts)
var r0 []int64 var r0 []int64
@ -208,8 +242,8 @@ func (_m *MockManager) GetFlushableSegments(ctx context.Context, channel string,
return r0, r1 return r0, r1
} }
// MockManager_GetFlushableSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetFlushableSegments' // MockSegmentManager_GetFlushableSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetFlushableSegments'
type MockManager_GetFlushableSegments_Call struct { type MockSegmentManager_GetFlushableSegments_Call struct {
*mock.Call *mock.Call
} }
@ -217,29 +251,29 @@ type MockManager_GetFlushableSegments_Call struct {
// - ctx context.Context // - ctx context.Context
// - channel string // - channel string
// - ts uint64 // - ts uint64
func (_e *MockManager_Expecter) GetFlushableSegments(ctx interface{}, channel interface{}, ts interface{}) *MockManager_GetFlushableSegments_Call { func (_e *MockSegmentManager_Expecter) GetFlushableSegments(ctx interface{}, channel interface{}, ts interface{}) *MockSegmentManager_GetFlushableSegments_Call {
return &MockManager_GetFlushableSegments_Call{Call: _e.mock.On("GetFlushableSegments", ctx, channel, ts)} return &MockSegmentManager_GetFlushableSegments_Call{Call: _e.mock.On("GetFlushableSegments", ctx, channel, ts)}
} }
func (_c *MockManager_GetFlushableSegments_Call) Run(run func(ctx context.Context, channel string, ts uint64)) *MockManager_GetFlushableSegments_Call { func (_c *MockSegmentManager_GetFlushableSegments_Call) Run(run func(ctx context.Context, channel string, ts uint64)) *MockSegmentManager_GetFlushableSegments_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(uint64)) run(args[0].(context.Context), args[1].(string), args[2].(uint64))
}) })
return _c return _c
} }
func (_c *MockManager_GetFlushableSegments_Call) Return(_a0 []int64, _a1 error) *MockManager_GetFlushableSegments_Call { func (_c *MockSegmentManager_GetFlushableSegments_Call) Return(_a0 []int64, _a1 error) *MockSegmentManager_GetFlushableSegments_Call {
_c.Call.Return(_a0, _a1) _c.Call.Return(_a0, _a1)
return _c return _c
} }
func (_c *MockManager_GetFlushableSegments_Call) RunAndReturn(run func(context.Context, string, uint64) ([]int64, error)) *MockManager_GetFlushableSegments_Call { func (_c *MockSegmentManager_GetFlushableSegments_Call) RunAndReturn(run func(context.Context, string, uint64) ([]int64, error)) *MockSegmentManager_GetFlushableSegments_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
// SealAllSegments provides a mock function with given fields: ctx, channel, segIDs // SealAllSegments provides a mock function with given fields: ctx, channel, segIDs
func (_m *MockManager) SealAllSegments(ctx context.Context, channel string, segIDs []int64) ([]int64, error) { func (_m *MockSegmentManager) SealAllSegments(ctx context.Context, channel string, segIDs []int64) ([]int64, error) {
ret := _m.Called(ctx, channel, segIDs) ret := _m.Called(ctx, channel, segIDs)
var r0 []int64 var r0 []int64
@ -264,8 +298,8 @@ func (_m *MockManager) SealAllSegments(ctx context.Context, channel string, segI
return r0, r1 return r0, r1
} }
// MockManager_SealAllSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SealAllSegments' // MockSegmentManager_SealAllSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SealAllSegments'
type MockManager_SealAllSegments_Call struct { type MockSegmentManager_SealAllSegments_Call struct {
*mock.Call *mock.Call
} }
@ -273,34 +307,34 @@ type MockManager_SealAllSegments_Call struct {
// - ctx context.Context // - ctx context.Context
// - channel string // - channel string
// - segIDs []int64 // - segIDs []int64
func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, channel interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call { func (_e *MockSegmentManager_Expecter) SealAllSegments(ctx interface{}, channel interface{}, segIDs interface{}) *MockSegmentManager_SealAllSegments_Call {
return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, channel, segIDs)} return &MockSegmentManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, channel, segIDs)}
} }
func (_c *MockManager_SealAllSegments_Call) Run(run func(ctx context.Context, channel string, segIDs []int64)) *MockManager_SealAllSegments_Call { func (_c *MockSegmentManager_SealAllSegments_Call) Run(run func(ctx context.Context, channel string, segIDs []int64)) *MockSegmentManager_SealAllSegments_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].([]int64)) run(args[0].(context.Context), args[1].(string), args[2].([]int64))
}) })
return _c return _c
} }
func (_c *MockManager_SealAllSegments_Call) Return(_a0 []int64, _a1 error) *MockManager_SealAllSegments_Call { func (_c *MockSegmentManager_SealAllSegments_Call) Return(_a0 []int64, _a1 error) *MockSegmentManager_SealAllSegments_Call {
_c.Call.Return(_a0, _a1) _c.Call.Return(_a0, _a1)
return _c return _c
} }
func (_c *MockManager_SealAllSegments_Call) RunAndReturn(run func(context.Context, string, []int64) ([]int64, error)) *MockManager_SealAllSegments_Call { func (_c *MockSegmentManager_SealAllSegments_Call) RunAndReturn(run func(context.Context, string, []int64) ([]int64, error)) *MockSegmentManager_SealAllSegments_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
// NewMockManager creates a new instance of MockManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // NewMockSegmentManager creates a new instance of MockSegmentManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value. // The first argument is typically a *testing.T value.
func NewMockManager(t interface { func NewMockSegmentManager(t interface {
mock.TestingT mock.TestingT
Cleanup(func()) Cleanup(func())
}) *MockManager { }) *MockSegmentManager {
mock := &MockManager{} mock := &MockSegmentManager{}
mock.Mock.Test(t) mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) }) t.Cleanup(func() { mock.AssertExpectations(t) })

View File

@ -123,7 +123,9 @@ func (_m *MockSessionManager) CheckDNHealth(ctx context.Context) *healthcheck.Re
if rf, ok := ret.Get(0).(func(context.Context) *healthcheck.Result); ok { if rf, ok := ret.Get(0).(func(context.Context) *healthcheck.Result); ok {
r0 = rf(ctx) r0 = rf(ctx)
} else { } else {
r0 = ret.Get(0).(*healthcheck.Result) if ret.Get(0) != nil {
r0 = ret.Get(0).(*healthcheck.Result)
}
} }
return r0 return r0
@ -147,12 +149,12 @@ func (_c *MockSessionManager_CheckDNHealth_Call) Run(run func(ctx context.Contex
return _c return _c
} }
func (_c *MockSessionManager_CheckDNHealth_Call) Return(_a0 healthcheck.Result) *MockSessionManager_CheckDNHealth_Call { func (_c *MockSessionManager_CheckDNHealth_Call) Return(_a0 *healthcheck.Result) *MockSessionManager_CheckDNHealth_Call {
_c.Call.Return(_a0) _c.Call.Return(_a0)
return _c return _c
} }
func (_c *MockSessionManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) healthcheck.Result) *MockSessionManager_CheckDNHealth_Call { func (_c *MockSessionManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) *healthcheck.Result) *MockSessionManager_CheckDNHealth_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }

View File

@ -85,6 +85,8 @@ type Manager interface {
ExpireAllocations(channel string, ts Timestamp) ExpireAllocations(channel string, ts Timestamp)
// DropSegmentsOfChannel drops all segments in a channel // DropSegmentsOfChannel drops all segments in a channel
DropSegmentsOfChannel(ctx context.Context, channel string) DropSegmentsOfChannel(ctx context.Context, channel string)
// CleanZeroSealedSegmentsOfChannel try to clean real empty sealed segments in a channel
CleanZeroSealedSegmentsOfChannel(channel string, cpTs Timestamp)
} }
// Allocation records the allocation info // Allocation records the allocation info
@ -497,9 +499,6 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
return nil, err return nil, err
} }
// TODO: It's too frequent; perhaps each channel could check once per minute instead.
s.cleanupSealedSegment(t, channel)
sealed, ok := s.channel2Sealed.Get(channel) sealed, ok := s.channel2Sealed.Get(channel)
if !ok { if !ok {
return nil, nil return nil, nil
@ -551,26 +550,35 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) {
}) })
} }
func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) { func (s *SegmentManager) CleanZeroSealedSegmentsOfChannel(channel string, cpTs Timestamp) {
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
sealed, ok := s.channel2Sealed.Get(channel) sealed, ok := s.channel2Sealed.Get(channel)
if !ok { if !ok {
log.Info("try remove empty sealed segment after channel cp updated failed to get channel", zap.String("channel", channel))
return return
} }
sealed.Range(func(id int64) bool { sealed.Range(func(id int64) bool {
segment := s.meta.GetHealthySegment(id) segment := s.meta.GetHealthySegment(id)
if segment == nil { if segment == nil {
log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id)) log.Warn("try remove empty sealed segment, failed to get segment, remove it in channel2Sealed", zap.String("channel", channel), zap.Int64("segmentID", id))
sealed.Remove(id) sealed.Remove(id)
return true return true
} }
// Check if segment is empty // Check if segment is empty
if segment.GetLastExpireTime() <= ts && segment.currRows == 0 { if segment.GetLastExpireTime() > 0 && segment.GetLastExpireTime() < cpTs && segment.currRows == 0 && segment.GetNumOfRows() == 0 {
log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id)) log.Info("try remove empty sealed segment after channel cp updated",
zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id),
zap.String("channel", channel), zap.Any("cpTs", cpTs))
if err := s.meta.SetState(id, commonpb.SegmentState_Dropped); err != nil { if err := s.meta.SetState(id, commonpb.SegmentState_Dropped); err != nil {
log.Warn("failed to set segment state to dropped", zap.String("channel", channel), log.Warn("try remove empty sealed segment after channel cp updated, failed to set segment state to dropped", zap.String("channel", channel),
zap.Int64("segmentID", id), zap.Error(err)) zap.Int64("segmentID", id), zap.Error(err))
} else { } else {
sealed.Remove(id) sealed.Remove(id)
log.Info("succeed to remove empty sealed segment",
zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id),
zap.String("channel", channel), zap.Any("cpTs", cpTs), zap.Any("expireTs", segment.GetLastExpireTime()))
} }
} }
return true return true

View File

@ -28,10 +28,12 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
mockkv "github.com/milvus-io/milvus/internal/kv/mocks" mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/lock"
@ -495,6 +497,15 @@ func TestGetFlushableSegments(t *testing.T) {
assert.EqualValues(t, allocations[0].SegmentID, ids[0]) assert.EqualValues(t, allocations[0].SegmentID, ids[0])
meta.SetCurrentRows(allocations[0].SegmentID, 0) meta.SetCurrentRows(allocations[0].SegmentID, 0)
postions := make([]*msgpb.MsgPosition, 0)
cpTs := allocations[0].ExpireTime + 1
postions = append(postions, &msgpb.MsgPosition{
ChannelName: "c1",
MsgID: []byte{1, 2, 3},
Timestamp: cpTs,
})
meta.UpdateChannelCheckpoints(postions)
segmentManager.CleanZeroSealedSegmentsOfChannel("c1", cpTs)
ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime) ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime)
assert.NoError(t, err) assert.NoError(t, err)
assert.Empty(t, ids) assert.Empty(t, ids)
@ -881,3 +892,149 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) {
}) })
} }
} }
func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) {
partitionID := int64(100)
type fields struct {
meta *meta
segments []UniqueID
}
type args struct {
channel string
cpTs Timestamp
}
mockCatalog := mocks.NewDataCoordCatalog(t)
mockCatalog.EXPECT().AlterSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)
seg1 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
PartitionID: partitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Sealed,
NumOfRows: 1,
LastExpireTime: 100,
},
currRows: 1,
}
seg2 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
PartitionID: partitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Sealed,
NumOfRows: 0,
LastExpireTime: 100,
},
currRows: 0,
}
seg3 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
PartitionID: partitionID,
InsertChannel: "ch1",
State: commonpb.SegmentState_Sealed,
LastExpireTime: 90,
},
}
seg4 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 4,
PartitionID: partitionID,
InsertChannel: "ch2",
State: commonpb.SegmentState_Growing,
NumOfRows: 1,
LastExpireTime: 100,
},
currRows: 1,
}
newMetaFunc := func() *meta {
return &meta{
catalog: mockCatalog,
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: seg1,
2: seg2,
3: seg3,
4: seg4,
},
secondaryIndexes: segmentInfoIndexes{
coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
0: {1: seg1, 2: seg2, 3: seg3, 4: seg4},
},
channel2Segments: map[string]map[UniqueID]*SegmentInfo{
"ch1": {1: seg1, 2: seg2, 3: seg3},
"ch2": {4: seg4},
},
},
},
}
}
tests := []struct {
name string
fields fields
args args
want []UniqueID
}{
{
"test clean empty sealed segments with normal channel cp <= lastExpireTs",
fields{
meta: newMetaFunc(),
segments: []UniqueID{1, 2, 3, 4},
},
args{
"ch1", 100,
},
[]UniqueID{1, 2, 4},
},
{
"test clean empty sealed segments with normal channel cp > lastExpireTs",
fields{
meta: newMetaFunc(),
segments: []UniqueID{1, 2, 3, 4},
},
args{
"ch1", 101,
},
[]UniqueID{1, 4},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &SegmentManager{
meta: tt.fields.meta,
channelLock: lock.NewKeyLock[string](),
channel2Growing: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
channel2Sealed: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](),
}
for _, segmentID := range tt.fields.segments {
segmentInfo := tt.fields.meta.GetSegment(segmentID)
channel := tt.args.channel
if segmentInfo != nil {
channel = segmentInfo.GetInsertChannel()
}
if segmentInfo == nil || segmentInfo.GetState() == commonpb.SegmentState_Growing {
growing, _ := s.channel2Growing.GetOrInsert(channel, typeutil.NewUniqueSet())
growing.Insert(segmentID)
} else if segmentInfo.GetState() == commonpb.SegmentState_Sealed {
sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet())
sealed.Insert(segmentID)
}
}
s.CleanZeroSealedSegmentsOfChannel(tt.args.channel, tt.args.cpTs)
all := make([]int64, 0)
s.channel2Sealed.Range(func(_ string, segments typeutil.UniqueSet) bool {
all = append(all, segments.Collect()...)
return true
})
s.channel2Growing.Range(func(_ string, segments typeutil.UniqueSet) bool {
all = append(all, segments.Collect()...)
return true
})
assert.ElementsMatch(t, tt.want, all)
})
}
}

View File

@ -824,7 +824,7 @@ func TestServer_getSystemInfoMetrics(t *testing.T) {
func TestDropVirtualChannel(t *testing.T) { func TestDropVirtualChannel(t *testing.T) {
t.Run("normal DropVirtualChannel", func(t *testing.T) { t.Run("normal DropVirtualChannel", func(t *testing.T) {
segmentManager := NewMockManager(t) segmentManager := NewMockSegmentManager(t)
svr := newTestServer(t, WithSegmentManager(segmentManager)) svr := newTestServer(t, WithSegmentManager(segmentManager))
defer closeTestServer(t, svr) defer closeTestServer(t, svr)

View File

@ -1387,6 +1387,13 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update
return merr.Status(err), nil return merr.Status(err), nil
} }
for _, pos := range checkpoints {
if pos == nil || pos.GetMsgID() == nil || pos.GetChannelName() == "" {
continue
}
s.segmentManager.CleanZeroSealedSegmentsOfChannel(pos.GetChannelName(), pos.GetTimestamp())
}
return merr.Success(), nil return merr.Success(), nil
} }