diff --git a/internal/coordinator/mix_coord.go b/internal/coordinator/mix_coord.go index 804021cce2..638b3a78f2 100644 --- a/internal/coordinator/mix_coord.go +++ b/internal/coordinator/mix_coord.go @@ -1052,6 +1052,10 @@ func (s *mixCoordImpl) AllocSegment(ctx context.Context, req *datapb.AllocSegmen return s.datacoordServer.AllocSegment(ctx, req) } +func (s *mixCoordImpl) NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error { + return s.datacoordServer.NotifyDropPartition(ctx, channel, partitionIDs) +} + // RegisterStreamingCoordGRPCService registers the grpc service of streaming coordinator. func (s *mixCoordImpl) RegisterStreamingCoordGRPCService(server *grpc.Server) { s.streamingCoord.RegisterGRPCService(server) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 0a4d96ef20..c7efc802f7 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -2218,3 +2218,45 @@ func (m *meta) getSegmentsMetrics(collectionID int64) []*metricsinfo.Segment { return segments } + +func (m *meta) DropSegmentsOfPartition(ctx context.Context, partitionIDs []int64) error { + m.segMu.Lock() + defer m.segMu.Unlock() + + // Filter out the segments of the partition to be dropped. + metricMutation := &segMetricMutation{ + stateChange: make(map[string]map[string]map[string]int), + } + modSegments := make([]*SegmentInfo, 0) + segments := make([]*datapb.SegmentInfo, 0) + // set existed segments of channel to Dropped + for _, seg := range m.segments.segments { + if contains(partitionIDs, seg.PartitionID) { + clonedSeg := seg.Clone() + updateSegStateAndPrepareMetrics(clonedSeg, commonpb.SegmentState_Dropped, metricMutation) + modSegments = append(modSegments, clonedSeg) + segments = append(segments, clonedSeg.SegmentInfo) + } + } + + // Save dropped segments in batch into meta. + err := m.catalog.SaveDroppedSegmentsInBatch(m.ctx, segments) + if err != nil { + return err + } + // update memory info + for _, segment := range modSegments { + m.segments.SetSegment(segment.GetID(), segment) + } + metricMutation.commit() + return nil +} + +func contains(arr []int64, target int64) bool { + for _, val := range arr { + if val == target { + return true + } + } + return false +} diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 85ee3ba2ad..22f3ce0a44 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -1488,3 +1488,37 @@ func TestMeta_GetSegmentsJSON(t *testing.T) { assert.Equal(t, "Sealed", segments[1].State) assert.True(t, segments[1].Compacted) } + +func Test_meta_DropSegmentsOfPartition(t *testing.T) { + meta, err := newMemoryMeta(t) + assert.NoError(t, err) + + err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{ + ID: 1, + PartitionID: 1, + CollectionID: 1, + })) + assert.NoError(t, err) + err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{ + ID: 2, + PartitionID: 1, + CollectionID: 1, + })) + assert.NoError(t, err) + err = meta.AddSegment(context.Background(), NewSegmentInfo(&datapb.SegmentInfo{ + ID: 3, + PartitionID: 2, + CollectionID: 1, + })) + assert.NoError(t, err) + + err = meta.DropSegmentsOfPartition(context.Background(), []int64{1}) + assert.NoError(t, err) + + segment := meta.GetSegment(context.Background(), 1) + assert.Equal(t, commonpb.SegmentState_Dropped, segment.GetState()) + segment = meta.GetSegment(context.Background(), 2) + assert.Equal(t, commonpb.SegmentState_Dropped, segment.GetState()) + segment = meta.GetSegment(context.Background(), 3) + assert.NotEqual(t, commonpb.SegmentState_Dropped, segment.GetState()) +} diff --git a/internal/datacoord/mock_segment_manager.go b/internal/datacoord/mock_segment_manager.go index 927b69916b..047fe569b1 100644 --- a/internal/datacoord/mock_segment_manager.go +++ b/internal/datacoord/mock_segment_manager.go @@ -247,6 +247,41 @@ func (_c *MockManager_DropSegmentsOfChannel_Call) RunAndReturn(run func(context. return _c } +// DropSegmentsOfPartition provides a mock function with given fields: ctx, channel, partitionID +func (_m *MockManager) DropSegmentsOfPartition(ctx context.Context, channel string, partitionID []int64) { + _m.Called(ctx, channel, partitionID) +} + +// MockManager_DropSegmentsOfPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropSegmentsOfPartition' +type MockManager_DropSegmentsOfPartition_Call struct { + *mock.Call +} + +// DropSegmentsOfPartition is a helper method to define mock.On call +// - ctx context.Context +// - channel string +// - partitionID []int64 +func (_e *MockManager_Expecter) DropSegmentsOfPartition(ctx interface{}, channel interface{}, partitionID interface{}) *MockManager_DropSegmentsOfPartition_Call { + return &MockManager_DropSegmentsOfPartition_Call{Call: _e.mock.On("DropSegmentsOfPartition", ctx, channel, partitionID)} +} + +func (_c *MockManager_DropSegmentsOfPartition_Call) Run(run func(ctx context.Context, channel string, partitionID []int64)) *MockManager_DropSegmentsOfPartition_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].([]int64)) + }) + return _c +} + +func (_c *MockManager_DropSegmentsOfPartition_Call) Return() *MockManager_DropSegmentsOfPartition_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_DropSegmentsOfPartition_Call) RunAndReturn(run func(context.Context, string, []int64)) *MockManager_DropSegmentsOfPartition_Call { + _c.Run(run) + return _c +} + // ExpireAllocations provides a mock function with given fields: ctx, channel, ts func (_m *MockManager) ExpireAllocations(ctx context.Context, channel string, ts uint64) { _m.Called(ctx, channel, ts) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 2b2543e541..8d85f3234e 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -881,6 +881,10 @@ func (s *mockMixCoord) AllocSegment(ctx context.Context, req *datapb.AllocSegmen panic("implement me") } +func (s *mockMixCoord) NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error { + panic("implement me") +} + // RegisterStreamingCoordGRPCService registers the grpc service of streaming coordinator. func (s *mockMixCoord) RegisterStreamingCoordGRPCService(server *grpc.Server) { panic("implement me") diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 1273ee1892..52b3f77581 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -101,6 +101,8 @@ type Manager interface { DropSegmentsOfChannel(ctx context.Context, channel string) // CleanZeroSealedSegmentsOfChannel try to clean real empty sealed segments in a channel CleanZeroSealedSegmentsOfChannel(ctx context.Context, channel string, cpTs Timestamp) + // DropSegmentsOfPartition drops all segments in a partition + DropSegmentsOfPartition(ctx context.Context, channel string, partitionID []int64) } // Allocation records the allocation info @@ -706,3 +708,48 @@ func (s *SegmentManager) DropSegmentsOfChannel(ctx context.Context, channel stri }) s.channel2Growing.Remove(channel) } + +func (s *SegmentManager) DropSegmentsOfPartition(ctx context.Context, channel string, partitionIDs []int64) { + s.channelLock.Lock(channel) + defer s.channelLock.Unlock(channel) + if growing, ok := s.channel2Growing.Get(channel); ok { + for sid := range growing { + segment := s.meta.GetHealthySegment(ctx, sid) + if segment == nil { + log.Warn("failed to get segment, remove it", + zap.String("channel", channel), + zap.Int64("segmentID", sid)) + growing.Remove(sid) + continue + } + + if contains(partitionIDs, segment.GetPartitionID()) { + growing.Remove(sid) + } + s.meta.SetAllocations(sid, nil) + for _, allocation := range segment.allocations { + putAllocation(allocation) + } + } + } + + if sealed, ok := s.channel2Sealed.Get(channel); ok { + for sid := range sealed { + segment := s.meta.GetHealthySegment(ctx, sid) + if segment == nil { + log.Warn("failed to get segment, remove it", + zap.String("channel", channel), + zap.Int64("segmentID", sid)) + sealed.Remove(sid) + continue + } + if contains(partitionIDs, segment.GetPartitionID()) { + sealed.Remove(sid) + } + s.meta.SetAllocations(sid, nil) + for _, allocation := range segment.allocations { + putAllocation(allocation) + } + } + } +} diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 50c689307c..402ed086d3 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -1070,3 +1070,26 @@ func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) { }) } } + +func TestDropSegmentOfPartition(t *testing.T) { + paramtable.Init() + mockAllocator := newMockAllocator(t) + meta, err := newMemoryMeta(t) + assert.NoError(t, err) + + schema := newTestSchema() + collID, err := mockAllocator.AllocID(context.Background()) + assert.NoError(t, err) + meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) + segmentManager, _ := newSegmentManager(meta, mockAllocator) + allocations, err := segmentManager.AllocSegment(context.Background(), collID, 100, "c1", 1000, storage.StorageV1) + assert.NoError(t, err) + assert.EqualValues(t, 1, len(allocations)) + segID := allocations[0].SegmentID + segment := meta.GetHealthySegment(context.TODO(), segID) + assert.NotNil(t, segment) + + segmentManager.DropSegmentsOfPartition(context.Background(), "c1", []int64{100}) + segment = meta.GetHealthySegment(context.TODO(), segID) + assert.NotNil(t, segment) +} diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 70849509a4..e0a1aedbe0 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1954,3 +1954,16 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq } return resp, nil } + +// NotifyDropPartition notifies DataCoord to drop segments of specified partition +func (s *Server) NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error { + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return err + } + log.Ctx(ctx).Info("receive NotifyDropPartition request", + zap.String("channelname", channel), + zap.Any("partitionID", partitionIDs)) + s.segmentManager.DropSegmentsOfPartition(ctx, channel, partitionIDs) + // release all segments of the partition. + return s.meta.DropSegmentsOfPartition(ctx, partitionIDs) +} diff --git a/internal/mocks/mock_mixcoord.go b/internal/mocks/mock_mixcoord.go index 7a3544c3e7..af9787d7d0 100644 --- a/internal/mocks/mock_mixcoord.go +++ b/internal/mocks/mock_mixcoord.go @@ -5903,6 +5903,54 @@ func (_c *MixCoord_MarkSegmentsDropped_Call) RunAndReturn(run func(context.Conte return _c } +// NotifyDropPartition provides a mock function with given fields: ctx, channel, partitionIDs +func (_m *MixCoord) NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error { + ret := _m.Called(ctx, channel, partitionIDs) + + if len(ret) == 0 { + panic("no return value specified for NotifyDropPartition") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, []int64) error); ok { + r0 = rf(ctx, channel, partitionIDs) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MixCoord_NotifyDropPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyDropPartition' +type MixCoord_NotifyDropPartition_Call struct { + *mock.Call +} + +// NotifyDropPartition is a helper method to define mock.On call +// - ctx context.Context +// - channel string +// - partitionIDs []int64 +func (_e *MixCoord_Expecter) NotifyDropPartition(ctx interface{}, channel interface{}, partitionIDs interface{}) *MixCoord_NotifyDropPartition_Call { + return &MixCoord_NotifyDropPartition_Call{Call: _e.mock.On("NotifyDropPartition", ctx, channel, partitionIDs)} +} + +func (_c *MixCoord_NotifyDropPartition_Call) Run(run func(ctx context.Context, channel string, partitionIDs []int64)) *MixCoord_NotifyDropPartition_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].([]int64)) + }) + return _c +} + +func (_c *MixCoord_NotifyDropPartition_Call) Return(_a0 error) *MixCoord_NotifyDropPartition_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MixCoord_NotifyDropPartition_Call) RunAndReturn(run func(context.Context, string, []int64) error) *MixCoord_NotifyDropPartition_Call { + _c.Call.Return(run) + return _c +} + // OperatePrivilege provides a mock function with given fields: _a0, _a1 func (_m *MixCoord) OperatePrivilege(_a0 context.Context, _a1 *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/proxy/mock_channels_manager.go b/internal/proxy/mock_channels_manager.go index a21686b1ab..54a672d0a8 100644 --- a/internal/proxy/mock_channels_manager.go +++ b/internal/proxy/mock_channels_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.0. DO NOT EDIT. +// Code generated by mockery v2.53.3. DO NOT EDIT. package proxy @@ -197,7 +197,7 @@ func (_c *MockChannelsMgr_getVChannels_Call) RunAndReturn(run func(int64) ([]str return _c } -// removeAllDMLStream provides a mock function with given fields: +// removeAllDMLStream provides a mock function with no fields func (_m *MockChannelsMgr) removeAllDMLStream() { _m.Called() } @@ -225,7 +225,7 @@ func (_c *MockChannelsMgr_removeAllDMLStream_Call) Return() *MockChannelsMgr_rem } func (_c *MockChannelsMgr_removeAllDMLStream_Call) RunAndReturn(run func()) *MockChannelsMgr_removeAllDMLStream_Call { - _c.Call.Return(run) + _c.Run(run) return _c } @@ -258,7 +258,7 @@ func (_c *MockChannelsMgr_removeDMLStream_Call) Return() *MockChannelsMgr_remove } func (_c *MockChannelsMgr_removeDMLStream_Call) RunAndReturn(run func(int64)) *MockChannelsMgr_removeDMLStream_Call { - _c.Call.Return(run) + _c.Run(run) return _c } diff --git a/internal/types/types.go b/internal/types/types.go index 5b8d156c90..9d08867c80 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -303,6 +303,8 @@ type MixCoord interface { // GetMetrics notifies MixCoordComponent to collect metrics for specified component GetQcMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) + // GetMetrics notifies MixCoordComponent to collect metrics for specified component + NotifyDropPartition(ctx context.Context, channel string, partitionIDs []int64) error } // MixCoordComponent is used by grpc server of MixCoord