feat: add NotifyDropPartition in mixcoord for droppartition in dc (#42029)

add NotifyDropPartition in mixcoord for droppartition in dc
issue:https://github.com/milvus-io/milvus/issues/41976
https://github.com/milvus-io/milvus/issues/41542

Signed-off-by: Xianhui.Lin <xianhui.lin@zilliz.com>
This commit is contained in:
Xianhui Lin 2025-05-23 18:32:26 +08:00 committed by GitHub
parent 38c804fb01
commit a72492169f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 256 additions and 4 deletions

View File

@ -1052,6 +1052,10 @@ func (s *mixCoordImpl) AllocSegment(ctx context.Context, req *datapb.AllocSegmen
return s.datacoordServer.AllocSegment(ctx, req)
}
func (s *mixCoordImpl) NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error {
return s.datacoordServer.NotifyDropPartition(ctx, channel, partitionIDs)
}
// RegisterStreamingCoordGRPCService registers the grpc service of streaming coordinator.
func (s *mixCoordImpl) RegisterStreamingCoordGRPCService(server *grpc.Server) {
s.streamingCoord.RegisterGRPCService(server)

View File

@ -2218,3 +2218,45 @@ func (m *meta) getSegmentsMetrics(collectionID int64) []*metricsinfo.Segment {
return segments
}
func (m *meta) DropSegmentsOfPartition(ctx context.Context, partitionIDs []int64) error {
m.segMu.Lock()
defer m.segMu.Unlock()
// Filter out the segments of the partition to be dropped.
metricMutation := &segMetricMutation{
stateChange: make(map[string]map[string]map[string]int),
}
modSegments := make([]*SegmentInfo, 0)
segments := make([]*datapb.SegmentInfo, 0)
// set existed segments of channel to Dropped
for _, seg := range m.segments.segments {
if contains(partitionIDs, seg.PartitionID) {
clonedSeg := seg.Clone()
updateSegStateAndPrepareMetrics(clonedSeg, commonpb.SegmentState_Dropped, metricMutation)
modSegments = append(modSegments, clonedSeg)
segments = append(segments, clonedSeg.SegmentInfo)
}
}
// Save dropped segments in batch into meta.
err := m.catalog.SaveDroppedSegmentsInBatch(m.ctx, segments)
if err != nil {
return err
}
// update memory info
for _, segment := range modSegments {
m.segments.SetSegment(segment.GetID(), segment)
}
metricMutation.commit()
return nil
}
func contains(arr []int64, target int64) bool {
for _, val := range arr {
if val == target {
return true
}
}
return false
}

View File

@ -1488,3 +1488,37 @@ func TestMeta_GetSegmentsJSON(t *testing.T) {
assert.Equal(t, "Sealed", segments[1].State)
assert.True(t, segments[1].Compacted)
}
func Test_meta_DropSegmentsOfPartition(t *testing.T) {
meta, err := newMemoryMeta(t)
assert.NoError(t, err)
err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{
ID: 1,
PartitionID: 1,
CollectionID: 1,
}))
assert.NoError(t, err)
err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{
ID: 2,
PartitionID: 1,
CollectionID: 1,
}))
assert.NoError(t, err)
err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{
ID: 3,
PartitionID: 2,
CollectionID: 1,
}))
assert.NoError(t, err)
err = meta.DropSegmentsOfPartition(context.Background(), []int64{1})
assert.NoError(t, err)
segment := meta.GetSegment(context.Background(), 1)
assert.Equal(t, commonpb.SegmentState_Dropped, segment.GetState())
segment = meta.GetSegment(context.Background(), 2)
assert.Equal(t, commonpb.SegmentState_Dropped, segment.GetState())
segment = meta.GetSegment(context.Background(), 3)
assert.NotEqual(t, commonpb.SegmentState_Dropped, segment.GetState())
}

View File

@ -247,6 +247,41 @@ func (_c *MockManager_DropSegmentsOfChannel_Call) RunAndReturn(run func(context.
return _c
}
// DropSegmentsOfPartition provides a mock function with given fields: ctx, channel, partitionID
func (_m *MockManager) DropSegmentsOfPartition(ctx context.Context, channel string, partitionID []int64) {
_m.Called(ctx, channel, partitionID)
}
// MockManager_DropSegmentsOfPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegmentsOfPartition'
type MockManager_DropSegmentsOfPartition_Call struct {
*mock.Call
}
// DropSegmentsOfPartition is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - partitionID []int64
func (_e *MockManager_Expecter) DropSegmentsOfPartition(ctx interface{}, channel interface{}, partitionID interface{}) *MockManager_DropSegmentsOfPartition_Call {
return &MockManager_DropSegmentsOfPartition_Call{Call: _e.mock.On("DropSegmentsOfPartition", ctx, channel, partitionID)}
}
func (_c *MockManager_DropSegmentsOfPartition_Call) Run(run func(ctx context.Context, channel string, partitionID []int64)) *MockManager_DropSegmentsOfPartition_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].([]int64))
})
return _c
}
func (_c *MockManager_DropSegmentsOfPartition_Call) Return() *MockManager_DropSegmentsOfPartition_Call {
_c.Call.Return()
return _c
}
func (_c *MockManager_DropSegmentsOfPartition_Call) RunAndReturn(run func(context.Context, string, []int64)) *MockManager_DropSegmentsOfPartition_Call {
_c.Run(run)
return _c
}
// ExpireAllocations provides a mock function with given fields: ctx, channel, ts
func (_m *MockManager) ExpireAllocations(ctx context.Context, channel string, ts uint64) {
_m.Called(ctx, channel, ts)

View File

@ -881,6 +881,10 @@ func (s *mockMixCoord) AllocSegment(ctx context.Context, req *datapb.AllocSegmen
panic("implement me")
}
func (s *mockMixCoord) NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error {
panic("implement me")
}
// RegisterStreamingCoordGRPCService registers the grpc service of streaming coordinator.
func (s *mockMixCoord) RegisterStreamingCoordGRPCService(server *grpc.Server) {
panic("implement me")

View File

@ -101,6 +101,8 @@ type Manager interface {
DropSegmentsOfChannel(ctx context.Context, channel string)
// CleanZeroSealedSegmentsOfChannel try to clean real empty sealed segments in a channel
CleanZeroSealedSegmentsOfChannel(ctx context.Context, channel string, cpTs Timestamp)
// DropSegmentsOfPartition drops all segments in a partition
DropSegmentsOfPartition(ctx context.Context, channel string, partitionID []int64)
}
// Allocation records the allocation info
@ -706,3 +708,48 @@ func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel stri
})
s.channel2Growing.Remove(channel)
}
func (s *SegmentManager) DropSegmentsOfPartition(ctx context.Context, channel string, partitionIDs []int64) {
s.channelLock.Lock(channel)
defer s.channelLock.Unlock(channel)
if growing, ok := s.channel2Growing.Get(channel); ok {
for sid := range growing {
segment := s.meta.GetHealthySegment(ctx, sid)
if segment == nil {
log.Warn("failed to get segment, remove it",
zap.String("channel", channel),
zap.Int64("segmentID", sid))
growing.Remove(sid)
continue
}
if contains(partitionIDs, segment.GetPartitionID()) {
growing.Remove(sid)
}
s.meta.SetAllocations(sid, nil)
for _, allocation := range segment.allocations {
putAllocation(allocation)
}
}
}
if sealed, ok := s.channel2Sealed.Get(channel); ok {
for sid := range sealed {
segment := s.meta.GetHealthySegment(ctx, sid)
if segment == nil {
log.Warn("failed to get segment, remove it",
zap.String("channel", channel),
zap.Int64("segmentID", sid))
sealed.Remove(sid)
continue
}
if contains(partitionIDs, segment.GetPartitionID()) {
sealed.Remove(sid)
}
s.meta.SetAllocations(sid, nil)
for _, allocation := range segment.allocations {
putAllocation(allocation)
}
}
}
}

View File

@ -1070,3 +1070,26 @@ func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) {
})
}
}
func TestDropSegmentOfPartition(t *testing.T) {
paramtable.Init()
mockAllocator := newMockAllocator(t)
meta, err := newMemoryMeta(t)
assert.NoError(t, err)
schema := newTestSchema()
collID, err := mockAllocator.AllocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager, _ := newSegmentManager(meta, mockAllocator)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 100, "c1", 1000, storage.StorageV1)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
segID := allocations[0].SegmentID
segment := meta.GetHealthySegment(context.TODO(), segID)
assert.NotNil(t, segment)
segmentManager.DropSegmentsOfPartition(context.Background(), "c1", []int64{100})
segment = meta.GetHealthySegment(context.TODO(), segID)
assert.NotNil(t, segment)
}

View File

@ -1954,3 +1954,16 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq
}
return resp, nil
}
// NotifyDropPartition notifies DataCoord to drop segments of specified partition
func (s *Server) NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error {
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return err
}
log.Ctx(ctx).Info("receive NotifyDropPartition request",
zap.String("channelname", channel),
zap.Any("partitionID", partitionIDs))
s.segmentManager.DropSegmentsOfPartition(ctx, channel, partitionIDs)
// release all segments of the partition.
return s.meta.DropSegmentsOfPartition(ctx, partitionIDs)
}

View File

@ -5903,6 +5903,54 @@ func (_c *MixCoord_MarkSegmentsDropped_Call) RunAndReturn(run func(context.Conte
return _c
}
// NotifyDropPartition provides a mock function with given fields: ctx, channel, partitionIDs
func (_m *MixCoord) NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error {
ret := _m.Called(ctx, channel, partitionIDs)
if len(ret) == 0 {
panic("no return value specified for NotifyDropPartition")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, []int64) error); ok {
r0 = rf(ctx, channel, partitionIDs)
} else {
r0 = ret.Error(0)
}
return r0
}
// MixCoord_NotifyDropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyDropPartition'
type MixCoord_NotifyDropPartition_Call struct {
*mock.Call
}
// NotifyDropPartition is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - partitionIDs []int64
func (_e *MixCoord_Expecter) NotifyDropPartition(ctx interface{}, channel interface{}, partitionIDs interface{}) *MixCoord_NotifyDropPartition_Call {
return &MixCoord_NotifyDropPartition_Call{Call: _e.mock.On("NotifyDropPartition", ctx, channel, partitionIDs)}
}
func (_c *MixCoord_NotifyDropPartition_Call) Run(run func(ctx context.Context, channel string, partitionIDs []int64)) *MixCoord_NotifyDropPartition_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].([]int64))
})
return _c
}
func (_c *MixCoord_NotifyDropPartition_Call) Return(_a0 error) *MixCoord_NotifyDropPartition_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MixCoord_NotifyDropPartition_Call) RunAndReturn(run func(context.Context, string, []int64) error) *MixCoord_NotifyDropPartition_Call {
_c.Call.Return(run)
return _c
}
// OperatePrivilege provides a mock function with given fields: _a0, _a1
func (_m *MixCoord) OperatePrivilege(_a0 context.Context, _a1 *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
// Code generated by mockery v2.53.3. DO NOT EDIT.
package proxy
@ -197,7 +197,7 @@ func (_c *MockChannelsMgr_getVChannels_Call) RunAndReturn(run func(int64) ([]str
return _c
}
// removeAllDMLStream provides a mock function with given fields:
// removeAllDMLStream provides a mock function with no fields
func (_m *MockChannelsMgr) removeAllDMLStream() {
_m.Called()
}
@ -225,7 +225,7 @@ func (_c *MockChannelsMgr_removeAllDMLStream_Call) Return() *MockChannelsMgr_rem
}
func (_c *MockChannelsMgr_removeAllDMLStream_Call) RunAndReturn(run func()) *MockChannelsMgr_removeAllDMLStream_Call {
_c.Call.Return(run)
_c.Run(run)
return _c
}
@ -258,7 +258,7 @@ func (_c *MockChannelsMgr_removeDMLStream_Call) Return() *MockChannelsMgr_remove
}
func (_c *MockChannelsMgr_removeDMLStream_Call) RunAndReturn(run func(int64)) *MockChannelsMgr_removeDMLStream_Call {
_c.Call.Return(run)
_c.Run(run)
return _c
}

View File

@ -303,6 +303,8 @@ type MixCoord interface {
// GetMetrics notifies MixCoordComponent to collect metrics for specified component
GetQcMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
// GetMetrics notifies MixCoordComponent to collect metrics for specified component
NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error
}
// MixCoordComponent is used by grpc server of MixCoord