From cedb33ceecfc71d8a855810ea188a15fad9f099f Mon Sep 17 00:00:00 2001
From: congqixia
Date: Wed, 8 May 2024 21:37:29 +0800
Subject: [PATCH] enhance: Improve datacoord segment filtering with collection
 (#32831)

See also #32165

This PR modifies the `SelectSegments` interface to utilize collection ID
information when selecting segments for a provided collection

---------

Signed-off-by: Congqi Xia
---
 .../broker/mock_coordinator_broker.go         |  53 --
 internal/datacoord/compaction.go              |   9 +-
 internal/datacoord/compaction_test.go         |   4 +-
 .../datacoord/compaction_trigger_v2_test.go   |   8 +-
 .../datacoord/compaction_view_manager_test.go |   8 +-
 internal/datacoord/garbage_collector.go       |   2 +-
 internal/datacoord/garbage_collector_test.go  | 441 ++++++++--------
 internal/datacoord/handler.go                 |  12 +-
 internal/datacoord/index_service.go           |  38 +-
 internal/datacoord/index_service_test.go      | 488 +++++++++---------
 internal/datacoord/meta.go                    |  58 +--
 internal/datacoord/mock_compaction_meta.go    |  35 +-
 internal/datacoord/segment_info.go            |  68 ++-
 internal/datacoord/segment_operator.go        |  49 ++
 14 files changed, 659 insertions(+), 614 deletions(-)

diff --git a/internal/datacoord/broker/mock_coordinator_broker.go b/internal/datacoord/broker/mock_coordinator_broker.go
index c0e817b6ad..c952eba15b 100644
--- a/internal/datacoord/broker/mock_coordinator_broker.go
+++ b/internal/datacoord/broker/mock_coordinator_broker.go
@@ -77,59 +77,6 @@ func (_c *MockBroker_DescribeCollectionInternal_Call) RunAndReturn(run func(cont
 	return _c
 }
 
-// GetDatabaseID provides a mock function with given fields: ctx, dbName
-func (_m *MockBroker) GetDatabaseID(ctx context.Context, dbName string) (int64, error) {
-	ret := _m.Called(ctx, dbName)
-
-	var r0 int64
-	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, string) (int64, error)); ok {
-		return rf(ctx, dbName)
-	}
-	if rf, ok := ret.Get(0).(func(context.Context, string) int64); ok {
-		r0 = rf(ctx, dbName)
-	} else {
-		r0 = ret.Get(0).(int64)
-	}
-
-	if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
-		r1 = rf(ctx, dbName)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// MockBroker_GetDatabaseID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDatabaseID'
-type MockBroker_GetDatabaseID_Call struct {
-	*mock.Call
-}
-
-// GetDatabaseID is a helper method to define mock.On call
-//   - ctx context.Context
-//   - dbName string
-func (_e *MockBroker_Expecter) GetDatabaseID(ctx interface{}, dbName interface{}) *MockBroker_GetDatabaseID_Call {
-	return &MockBroker_GetDatabaseID_Call{Call: _e.mock.On("GetDatabaseID", ctx, dbName)}
-}
-
-func (_c *MockBroker_GetDatabaseID_Call) Run(run func(ctx context.Context, dbName string)) *MockBroker_GetDatabaseID_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].(string))
-	})
-	return _c
-}
-
-func (_c *MockBroker_GetDatabaseID_Call) Return(_a0 int64, _a1 error) *MockBroker_GetDatabaseID_Call {
-	_c.Call.Return(_a0, _a1)
-	return _c
-}
-
-func (_c *MockBroker_GetDatabaseID_Call) RunAndReturn(run func(context.Context, string) (int64, error)) *MockBroker_GetDatabaseID_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // HasCollection provides a mock function with given fields: ctx, collectionID
 func (_m *MockBroker) HasCollection(ctx context.Context, collectionID int64) (bool, error) {
 	ret := _m.Called(ctx, collectionID)
diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index 6e5e50a950..dff05b00aa 100644
---
a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -76,7 +76,7 @@ var ( ) type CompactionMeta interface { - SelectSegments(selector SegmentInfoSelector) []*SegmentInfo + SelectSegments(filters ...SegmentFilter) []*SegmentInfo GetHealthySegment(segID UniqueID) *SegmentInfo UpdateSegmentsInfo(operators ...UpdateOperator) error SetSegmentCompacting(segmentID int64, compacting bool) @@ -322,16 +322,15 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error { // Select sealed L1 segments for LevelZero compaction that meets the condition: // dmlPos < triggerInfo.pos // TODO: select L2 segments too - sealedSegments := c.meta.SelectSegments(func(info *SegmentInfo) bool { - return info.GetCollectionID() == task.triggerInfo.collectionID && - (task.triggerInfo.partitionID == -1 || info.GetPartitionID() == task.triggerInfo.partitionID) && + sealedSegments := c.meta.SelectSegments(WithCollection(task.triggerInfo.collectionID), SegmentFilterFunc(func(info *SegmentInfo) bool { + return (task.triggerInfo.partitionID == -1 || info.GetPartitionID() == task.triggerInfo.partitionID) && info.GetInsertChannel() == plan.GetChannel() && isFlushState(info.GetState()) && !info.isCompacting && !info.GetIsImporting() && info.GetLevel() != datapb.SegmentLevel_L0 && info.GetDmlPosition().GetTimestamp() < task.triggerInfo.pos.GetTimestamp() - }) + })) if len(sealedSegments) == 0 { return errors.Errorf("Selected zero L1/L2 segments for the position=%v", task.triggerInfo.pos) } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 285e88de78..e21f06f29f 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -203,7 +203,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleL0CompactionResults() { func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() { channel := "Ch-1" deltalogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)} - s.mockMeta.EXPECT().SelectSegments(mock.Anything).Return( + s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return( []*SegmentInfo{ {SegmentInfo: &datapb.SegmentInfo{ ID: 200, @@ -310,7 +310,7 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() { Deltalogs: deltalogs, }} }).Times(2) - s.mockMeta.EXPECT().SelectSegments(mock.Anything).Return(nil).Once() + s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(nil).Once() // 2 l0 segments plan := &datapb.CompactionPlan{ diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 7282c2c3f0..3176e04a0b 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -36,9 +36,11 @@ func (s *CompactionTriggerManagerSuite) SetupTest() { PartitionID: 10, Channel: "ch-1", } - s.meta = &meta{segments: &SegmentsInfo{ - segments: genSegmentsForMeta(s.testLabel), - }} + segments := genSegmentsForMeta(s.testLabel) + s.meta = &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + s.meta.segments.SetSegment(id, segment) + } s.m = NewCompactionTriggerManager(s.mockAlloc, s.mockPlanContext) } diff --git a/internal/datacoord/compaction_view_manager_test.go b/internal/datacoord/compaction_view_manager_test.go index c9709947ac..d1712e6d9e 100644 --- a/internal/datacoord/compaction_view_manager_test.go +++ b/internal/datacoord/compaction_view_manager_test.go @@ -80,9 +80,11 @@ func (s *CompactionViewManagerSuite) SetupTest() { Channel: "ch-1", } - meta := 
&meta{segments: &SegmentsInfo{ - segments: genSegmentsForMeta(s.testLabel), - }} + segments := genSegmentsForMeta(s.testLabel) + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } s.m = NewCompactionViewManager(meta, s.mockTriggerManager, s.mockAlloc) } diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 79cfb068ef..b62312d55d 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -410,7 +410,7 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) { log.Info("start clear dropped segments...") defer func() { log.Info("clear dropped segments done", zap.Duration("timeCost", time.Since(start))) }() - all := gc.meta.SelectSegments(func(si *SegmentInfo) bool { return true }) + all := gc.meta.SelectSegments() drops := make(map[int64]*SegmentInfo, 0) compactTo := make(map[int64]*SegmentInfo) channels := typeutil.NewSet[string]() diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 35cd62f24a..a81003cda7 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -453,28 +453,27 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m indexID = UniqueID(400) segID = UniqueID(500) ) - return &meta{ + segments := map[int64]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 1026, + State: commonpb.SegmentState_Flushed, + }, + }, + segID + 1: { + SegmentInfo: nil, + }, + } + meta := &meta{ RWMutex: sync.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, - segments: &SegmentsInfo{ - segments: map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 1026, - State: commonpb.SegmentState_Flushed, - }, - }, - segID + 1: { - SegmentInfo: nil, - }, - }, - }, + segments: NewSegmentsInfo(), indexMeta: &indexMeta{ catalog: catalog, segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ @@ -558,6 +557,10 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m channelCPs: nil, chunkManager: nil, } + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + return meta } func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) { @@ -608,35 +611,34 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta segID = UniqueID(500) buildID = UniqueID(600) ) - return &meta{ + segments := map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 1026, + State: commonpb.SegmentState_Flushed, + }, + }, + segID + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 1, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 1026, + State: commonpb.SegmentState_Flushed, + }, + }, + } + meta := &meta{ RWMutex: sync.RWMutex{}, ctx: ctx, catalog: catalog, collections: nil, - segments: &SegmentsInfo{ - segments: map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 1026, - State: commonpb.SegmentState_Flushed, - }, - }, - segID + 1: { - SegmentInfo: 
&datapb.SegmentInfo{ - ID: segID + 1, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 1026, - State: commonpb.SegmentState_Flushed, - }, - }, - }, - }, + segments: NewSegmentsInfo(), indexMeta: &indexMeta{ catalog: catalog, segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ @@ -734,6 +736,12 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta }, }, } + + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + + return meta } func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { @@ -847,195 +855,192 @@ func TestGarbageCollector_clearETCD(t *testing.T) { Timestamp: 1000, } - m := &meta{ - catalog: catalog, - channelCPs: channelCPs, - segments: &SegmentsInfo{ - compactionTo: make(map[int64]int64), - segments: map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "dmlChannel", - NumOfRows: 5000, - State: commonpb.SegmentState_Dropped, - MaxRowNum: 65536, - DroppedAt: 0, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: 900, - }, - Binlogs: []*datapb.FieldBinlog{ + segments := map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 5000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + DroppedAt: 0, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + Binlogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - { - LogPath: "log1", - LogSize: 1024, - }, - }, - }, - { - FieldID: 2, - Binlogs: []*datapb.Binlog{ - { - LogPath: "log2", - LogSize: 1024, - }, - }, + LogPath: "log1", + LogSize: 1024, }, }, - Deltalogs: []*datapb.FieldBinlog{ + }, + { + FieldID: 2, + Binlogs: []*datapb.Binlog{ { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - { - LogPath: "del_log1", - LogSize: 1024, - }, - }, - }, - { - FieldID: 2, - Binlogs: []*datapb.Binlog{ - { - LogPath: "del_log2", - LogSize: 1024, - }, - }, - }, - }, - Statslogs: []*datapb.FieldBinlog{ - { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - { - LogPath: "stats_log1", - LogSize: 1024, - }, - }, + LogPath: "log2", + LogSize: 1024, }, }, }, }, - segID + 1: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID + 1, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "dmlChannel", - NumOfRows: 5000, - State: commonpb.SegmentState_Dropped, - MaxRowNum: 65536, - DroppedAt: 0, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: 900, + Deltalogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "del_log1", + LogSize: 1024, + }, + }, + }, + { + FieldID: 2, + Binlogs: []*datapb.Binlog{ + { + LogPath: "del_log2", + LogSize: 1024, + }, }, }, }, - segID + 2: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID + 2, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "dmlChannel", - NumOfRows: 10000, - State: commonpb.SegmentState_Dropped, - MaxRowNum: 65536, - DroppedAt: 10, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: 900, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "stats_log1", + LogSize: 1024, + }, }, - CompactionFrom: []int64{segID, segID + 1}, - }, - }, - segID + 3: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID + 3, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "dmlChannel", - NumOfRows: 2000, - State: 
commonpb.SegmentState_Dropped, - MaxRowNum: 65536, - DroppedAt: 10, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: 900, - }, - CompactionFrom: nil, - }, - }, - segID + 4: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID + 4, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "dmlChannel", - NumOfRows: 12000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - DroppedAt: 10, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: 900, - }, - CompactionFrom: []int64{segID + 2, segID + 3}, - }, - }, - segID + 5: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID + 5, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "dmlChannel", - NumOfRows: 2000, - State: commonpb.SegmentState_Dropped, - MaxRowNum: 65535, - DroppedAt: 0, - CompactionFrom: nil, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: 1200, - }, - }, - }, - segID + 6: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID + 6, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "dmlChannel", - NumOfRows: 2000, - State: commonpb.SegmentState_Dropped, - MaxRowNum: 65535, - DroppedAt: uint64(time.Now().Add(time.Hour).UnixNano()), - CompactionFrom: nil, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: 900, - }, - Compacted: true, - }, - }, - // compacted and child is GCed, dml pos is big than channel cp - segID + 7: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID + 7, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "dmlChannel", - NumOfRows: 2000, - State: commonpb.SegmentState_Dropped, - MaxRowNum: 65535, - DroppedAt: 0, - CompactionFrom: nil, - DmlPosition: &msgpb.MsgPosition{ - Timestamp: 1200, - }, - Compacted: true, }, }, }, }, - + segID + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 1, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 5000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + DroppedAt: 0, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + }, + }, + segID + 2: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 2, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 10000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + DroppedAt: 10, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + CompactionFrom: []int64{segID, segID + 1}, + }, + }, + segID + 3: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 3, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 2000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + DroppedAt: 10, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + CompactionFrom: nil, + }, + }, + segID + 4: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 4, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 12000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + DroppedAt: 10, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + CompactionFrom: []int64{segID + 2, segID + 3}, + }, + }, + segID + 5: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 5, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 2000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65535, + DroppedAt: 0, + CompactionFrom: nil, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 1200, + }, + }, + }, + segID + 6: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 6, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 2000, + State: 
commonpb.SegmentState_Dropped, + MaxRowNum: 65535, + DroppedAt: uint64(time.Now().Add(time.Hour).UnixNano()), + CompactionFrom: nil, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 900, + }, + Compacted: true, + }, + }, + // compacted and child is GCed, dml pos is big than channel cp + segID + 7: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 7, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "dmlChannel", + NumOfRows: 2000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65535, + DroppedAt: 0, + CompactionFrom: nil, + DmlPosition: &msgpb.MsgPosition{ + Timestamp: 1200, + }, + Compacted: true, + }, + }, + } + m := &meta{ + catalog: catalog, + channelCPs: channelCPs, + segments: NewSegmentsInfo(), indexMeta: &indexMeta{ catalog: catalog, segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ @@ -1162,6 +1167,10 @@ func TestGarbageCollector_clearETCD(t *testing.T) { }, } + for id, segment := range segments { + m.segments.SetSegment(id, segment) + } + for segID, segment := range map[UniqueID]*SegmentInfo{ segID: { SegmentInfo: &datapb.SegmentInfo{ diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 4d6508bd75..b02aa5d9ab 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -57,9 +57,9 @@ func newServerHandler(s *Server) *ServerHandler { // GetDataVChanPositions gets vchannel latest positions with provided dml channel names for DataNode. func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID UniqueID) *datapb.VchannelInfo { - segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool { + segments := h.s.meta.SelectSegments(SegmentFilterFunc(func(s *SegmentInfo) bool { return s.InsertChannel == channel.GetName() && !s.GetIsFake() - }) + })) log.Info("GetDataVChanPositions", zap.Int64("collectionID", channel.GetCollectionID()), zap.String("channel", channel.GetName()), @@ -105,9 +105,9 @@ func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID Uni // the unflushed segments are actually the segments without index, even they are flushed. func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs ...UniqueID) *datapb.VchannelInfo { // cannot use GetSegmentsByChannel since dropped segments are needed here - segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool { + segments := h.s.meta.SelectSegments(SegmentFilterFunc(func(s *SegmentInfo) bool { return s.InsertChannel == channel.GetName() && !s.GetIsFake() - }) + })) segmentInfos := make(map[int64]*SegmentInfo) indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...) indexed := make(typeutil.UniqueSet) @@ -223,9 +223,7 @@ func (h *ServerHandler) getEarliestSegmentDMLPos(channel string, partitionIDs .. var minPos *msgpb.MsgPosition var minPosSegID int64 var minPosTs uint64 - segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool { - return s.InsertChannel == channel - }) + segments := h.s.meta.SelectSegments(WithChannel(channel)) validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID }) partitionSet := typeutil.NewUniqueSet(validPartitions...) 
diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 2094c28560..e4816ae334 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -93,9 +93,9 @@ func (s *Server) createIndexesForSegment(segment *SegmentInfo) error { } func (s *Server) getUnIndexTaskSegments() []*SegmentInfo { - flushedSegments := s.meta.SelectSegments(func(seg *SegmentInfo) bool { + flushedSegments := s.meta.SelectSegments(SegmentFilterFunc(func(seg *SegmentInfo) bool { return isFlush(seg) - }) + })) unindexedSegments := make([]*SegmentInfo, 0) for _, segment := range flushedSegments { @@ -127,9 +127,9 @@ func (s *Server) createIndexForSegmentLoop(ctx context.Context) { } case collectionID := <-s.notifyIndexChan: log.Info("receive create index notify", zap.Int64("collectionID", collectionID)) - segments := s.meta.SelectSegments(func(info *SegmentInfo) bool { - return isFlush(info) && collectionID == info.CollectionID - }) + segments := s.meta.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(info *SegmentInfo) bool { + return isFlush(info) + })) for _, segment := range segments { if err := s.createIndexesForSegment(segment); err != nil { log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID)) @@ -396,9 +396,9 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe indexInfo := &indexpb.IndexInfo{} // The total rows of all indexes should be based on the current perspective - segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool { - return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) - }) + segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { + return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + })) s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime) ret.State = indexInfo.State @@ -448,10 +448,10 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme return ret, nil } -func (s *Server) selectSegmentIndexesStats(selector SegmentInfoSelector) map[int64]*indexStats { +func (s *Server) selectSegmentIndexesStats(filters ...SegmentFilter) map[int64]*indexStats { ret := make(map[int64]*indexStats) - segments := s.meta.SelectSegments(selector) + segments := s.meta.SelectSegments(filters...) 
segmentIDs := lo.Map(segments, func(info *SegmentInfo, i int) int64 { return info.GetID() }) @@ -647,9 +647,9 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde } // The total rows of all indexes should be based on the current perspective - segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool { - return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) - }) + segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { + return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + })) s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime) log.Info("GetIndexBuildProgress success", zap.Int64("collectionID", req.GetCollectionID()), @@ -700,9 +700,9 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe } // The total rows of all indexes should be based on the current perspective - segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool { - return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) - }) + segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { + return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + })) indexInfos := make([]*indexpb.IndexInfo, 0) for _, index := range indexes { @@ -759,9 +759,9 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt } // The total rows of all indexes should be based on the current perspective - segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool { - return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) - }) + segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool { + return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped) + })) indexInfos := make([]*indexpb.IndexInfo, 0) for _, index := range indexes { diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index d5aede0660..cda44b2558 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -693,6 +693,28 @@ func TestServer_GetIndexState(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_IndexNotExist, resp.GetStatus().GetErrorCode()) }) + segments := map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 10250, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS - 1, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS - 1, + }, + }, + currRows: 0, + allocations: nil, + lastFlushTime: time.Time{}, + isCompacting: false, + lastWrittenTime: time.Time{}, + }, + } s.meta = &meta{ catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}, indexMeta: &indexMeta{ @@ -717,31 +739,10 @@ func TestServer_GetIndexState(t *testing.T) { segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, }, - segments: &SegmentsInfo{ - compactionTo: make(map[int64]int64), - segments: map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - 
NumOfRows: 10250, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS - 1, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS - 1, - }, - }, - currRows: 0, - allocations: nil, - lastFlushTime: time.Time{}, - isCompacting: false, - lastWrittenTime: time.Time{}, - }, - }, - }, + segments: NewSegmentsInfo(), + } + for id, segment := range segments { + s.meta.segments.SetSegment(id, segment) } t.Run("index state is unissued", func(t *testing.T) { @@ -751,6 +752,28 @@ func TestServer_GetIndexState(t *testing.T) { assert.Equal(t, commonpb.IndexState_InProgress, resp.GetState()) }) + segments = map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 10250, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS - 1, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS - 1, + }, + }, + currRows: 0, + allocations: nil, + lastFlushTime: time.Time{}, + isCompacting: false, + lastWrittenTime: time.Time{}, + }, + } s.meta = &meta{ catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}, indexMeta: &indexMeta{ @@ -794,31 +817,10 @@ func TestServer_GetIndexState(t *testing.T) { }, }, - segments: &SegmentsInfo{ - compactionTo: make(map[int64]int64), - segments: map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 10250, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS - 1, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS - 1, - }, - }, - currRows: 0, - allocations: nil, - lastFlushTime: time.Time{}, - isCompacting: false, - lastWrittenTime: time.Time{}, - }, - }, - }, + segments: NewSegmentsInfo(), + } + for id, segment := range segments { + s.meta.segments.SetSegment(id, segment) } t.Run("index state is none", func(t *testing.T) { @@ -935,14 +937,14 @@ func TestServer_GetSegmentIndexState(t *testing.T) { IndexSize: 1025, WriteHandoff: false, }) - s.meta.segments.segments[segID] = &SegmentInfo{ + s.meta.segments.SetSegment(segID, &SegmentInfo{ SegmentInfo: nil, currRows: 0, allocations: nil, lastFlushTime: time.Time{}, isCompacting: false, lastWrittenTime: time.Time{}, - } + }) resp, err := s.GetSegmentIndexState(ctx, req) assert.NoError(t, err) @@ -1040,30 +1042,27 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { UserIndexParams: nil, }, } - s.meta.segments = &SegmentsInfo{ - segments: map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 10250, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS, - }, - }, - currRows: 10250, - allocations: nil, - lastFlushTime: time.Time{}, - isCompacting: false, - lastWrittenTime: time.Time{}, + s.meta.segments = NewSegmentsInfo() + s.meta.segments.SetSegment(segID, &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 10250, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, }, }, - } + currRows: 10250, + allocations: nil, + lastFlushTime: time.Time{}, + isCompacting: false, + lastWrittenTime: time.Time{}, + }) resp, err 
:= s.GetIndexBuildProgress(ctx, req) assert.NoError(t, err) @@ -1090,30 +1089,27 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { IndexSize: 0, WriteHandoff: false, }) - s.meta.segments = &SegmentsInfo{ - segments: map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 10250, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS, - }, - }, - currRows: 10250, - allocations: nil, - lastFlushTime: time.Time{}, - isCompacting: false, - lastWrittenTime: time.Time{}, + s.meta.segments = NewSegmentsInfo() + s.meta.segments.SetSegment(segID, &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 10250, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, }, }, - } + currRows: 10250, + allocations: nil, + lastFlushTime: time.Time{}, + isCompacting: false, + lastWrittenTime: time.Time{}, + }) resp, err := s.GetIndexBuildProgress(ctx, req) assert.NoError(t, err) @@ -1194,6 +1190,53 @@ func TestServer_DescribeIndex(t *testing.T) { mock.Anything, ).Return(nil) + segments := map[UniqueID]*SegmentInfo{ + invalidSegID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: invalidSegID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + // timesamp > index start time, will be filtered out + Timestamp: createTS + 1, + }, + }, + }, + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, + CreatedByCompaction: true, + CompactionFrom: []int64{segID - 1}, + }, + }, + segID - 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID - 1, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Dropped, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, + }, + }, + } s := &Server{ meta: &meta{ catalog: catalog, @@ -1441,60 +1484,14 @@ func TestServer_DescribeIndex(t *testing.T) { }, }, - segments: &SegmentsInfo{ - compactionTo: make(map[int64]int64), - segments: map[UniqueID]*SegmentInfo{ - invalidSegID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: invalidSegID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - // timesamp > index start time, will be filtered out - Timestamp: createTS + 1, - }, - }, - }, - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS, - }, - CreatedByCompaction: true, - CompactionFrom: []int64{segID - 1}, - }, - }, - segID - 1: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID - 1, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Dropped, - 
MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS, - }, - }, - }, - }, - }, + segments: NewSegmentsInfo(), }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), } + for id, segment := range segments { + s.meta.segments.SetSegment(id, segment) + } t.Run("server not available", func(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Initializing) @@ -1650,10 +1647,7 @@ func TestServer_ListIndexes(t *testing.T) { segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, }, - segments: &SegmentsInfo{ - compactionTo: make(map[int64]int64), - segments: map[UniqueID]*SegmentInfo{}, - }, + segments: NewSegmentsInfo(), }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), @@ -1713,6 +1707,37 @@ func TestServer_GetIndexStatistics(t *testing.T) { mock.Anything, ).Return(nil) + segments := map[UniqueID]*SegmentInfo{ + invalidSegID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + // timesamp > index start time, will be filtered out + Timestamp: createTS + 1, + }, + }, + }, + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + StartPosition: &msgpb.MsgPosition{ + Timestamp: createTS, + }, + }, + }, + } s := &Server{ meta: &meta{ catalog: catalog, @@ -1897,44 +1922,14 @@ func TestServer_GetIndexStatistics(t *testing.T) { }, }, - segments: &SegmentsInfo{ - compactionTo: make(map[int64]int64), - segments: map[UniqueID]*SegmentInfo{ - invalidSegID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - // timesamp > index start time, will be filtered out - Timestamp: createTS + 1, - }, - }, - }, - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - StartPosition: &msgpb.MsgPosition{ - Timestamp: createTS, - }, - }, - }, - }, - }, + segments: NewSegmentsInfo(), }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), } + for id, segment := range segments { + s.meta.segments.SetSegment(id, segment) + } t.Run("server not available", func(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Initializing) @@ -2084,27 +2079,24 @@ func TestServer_DropIndex(t *testing.T) { segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, }, - segments: &SegmentsInfo{ - compactionTo: make(map[int64]int64), - segments: map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - }, - }, - }, - }, + segments: NewSegmentsInfo(), }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), } + s.meta.segments.SetSegment(segID, &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 
65536, + LastExpireTime: createTS, + }, + }) + t.Run("server not available", func(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Initializing) resp, err := s.DropIndex(ctx, req) @@ -2250,27 +2242,23 @@ func TestServer_GetIndexInfos(t *testing.T) { }, }, - segments: &SegmentsInfo{ - compactionTo: make(map[int64]int64), - segments: map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - NumOfRows: 10000, - State: commonpb.SegmentState_Flushed, - MaxRowNum: 65536, - LastExpireTime: createTS, - }, - }, - }, - }, + segments: NewSegmentsInfo(), chunkManager: cli, }, allocator: newMockAllocator(), notifyIndexChan: make(chan UniqueID, 1), } + s.meta.segments.SetSegment(segID, &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + NumOfRows: 10000, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: createTS, + }, + }) t.Run("server not available", func(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Initializing) @@ -2289,41 +2277,40 @@ func TestServer_GetIndexInfos(t *testing.T) { } func TestMeta_GetHasUnindexTaskSegments(t *testing.T) { - m := &meta{ - segments: &SegmentsInfo{ - segments: map[UniqueID]*SegmentInfo{ - segID: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 1025, - State: commonpb.SegmentState_Flushed, - }, - }, - segID + 1: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID + 1, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 1025, - State: commonpb.SegmentState_Growing, - }, - }, - segID + 2: { - SegmentInfo: &datapb.SegmentInfo{ - ID: segID + 2, - CollectionID: collID, - PartitionID: partID, - InsertChannel: "", - NumOfRows: 1025, - State: commonpb.SegmentState_Dropped, - }, - }, + segments := map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 1025, + State: commonpb.SegmentState_Flushed, }, }, + segID + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 1, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 1025, + State: commonpb.SegmentState_Growing, + }, + }, + segID + 2: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 2, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 1025, + State: commonpb.SegmentState_Dropped, + }, + }, + } + m := &meta{ + segments: NewSegmentsInfo(), indexMeta: &indexMeta{ buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex), segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, @@ -2359,6 +2346,9 @@ func TestMeta_GetHasUnindexTaskSegments(t *testing.T) { }, }, } + for id, segment := range segments { + m.segments.SetSegment(id, segment) + } s := &Server{meta: m} t.Run("normal", func(t *testing.T) { diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 03788da8b0..91d4664077 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -962,23 +962,17 @@ func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*Segm // GetSegmentsByChannel returns all segment info which insert channel equals provided `dmlCh` func (m *meta) GetSegmentsByChannel(channel string) []*SegmentInfo { - return m.SelectSegments(func(segment *SegmentInfo) bool { - return isSegmentHealthy(segment) && segment.InsertChannel == channel - }) + 
return m.SelectSegments(SegmentFilterFunc(isSegmentHealthy), WithChannel(channel)) } // GetSegmentsOfCollection get all segments of collection func (m *meta) GetSegmentsOfCollection(collectionID UniqueID) []*SegmentInfo { - return m.SelectSegments(func(segment *SegmentInfo) bool { - return isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID - }) + return m.SelectSegments(SegmentFilterFunc(isSegmentHealthy), WithCollection(collectionID)) } // GetSegmentsIDOfCollection returns all segment ids which collection equals to provided `collectionID` func (m *meta) GetSegmentsIDOfCollection(collectionID UniqueID) []UniqueID { - segments := m.SelectSegments(func(segment *SegmentInfo) bool { - return isSegmentHealthy(segment) && segment.CollectionID == collectionID - }) + segments := m.SelectSegments(SegmentFilterFunc(isSegmentHealthy), WithCollection(collectionID)) return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.ID @@ -987,12 +981,11 @@ func (m *meta) GetSegmentsIDOfCollection(collectionID UniqueID) []UniqueID { // GetSegmentsIDOfCollection returns all segment ids which collection equals to provided `collectionID` func (m *meta) GetSegmentsIDOfCollectionWithDropped(collectionID UniqueID) []UniqueID { - segments := m.SelectSegments(func(segment *SegmentInfo) bool { + segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool { return segment != nil && segment.GetState() != commonpb.SegmentState_SegmentStateNone && - segment.GetState() != commonpb.SegmentState_NotExist && - segment.CollectionID == collectionID - }) + segment.GetState() != commonpb.SegmentState_NotExist + })) return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.ID @@ -1001,11 +994,10 @@ func (m *meta) GetSegmentsIDOfCollectionWithDropped(collectionID UniqueID) []Uni // GetSegmentsIDOfPartition returns all segments ids which collection & partition equals to provided `collectionID`, `partitionID` func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []UniqueID { - segments := m.SelectSegments(func(segment *SegmentInfo) bool { + segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool { return isSegmentHealthy(segment) && - segment.CollectionID == collectionID && segment.PartitionID == partitionID - }) + })) return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.ID @@ -1014,12 +1006,11 @@ func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []Un // GetSegmentsIDOfPartition returns all segments ids which collection & partition equals to provided `collectionID`, `partitionID` func (m *meta) GetSegmentsIDOfPartitionWithDropped(collectionID, partitionID UniqueID) []UniqueID { - segments := m.SelectSegments(func(segment *SegmentInfo) bool { + segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool { return segment.GetState() != commonpb.SegmentState_SegmentStateNone && segment.GetState() != commonpb.SegmentState_NotExist && - segment.CollectionID == collectionID && segment.PartitionID == partitionID - }) + })) return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.ID @@ -1031,34 +1022,34 @@ func (m *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID m.RLock() defer m.RUnlock() var ret int64 - segments := m.segments.GetSegments() + segments := m.SelectSegments(WithCollection(collectionID), 
SegmentFilterFunc(func(si *SegmentInfo) bool { + return isSegmentHealthy(si) && si.GetPartitionID() == partitionID + })) for _, segment := range segments { - if isSegmentHealthy(segment) && segment.CollectionID == collectionID && segment.PartitionID == partitionID { - ret += segment.NumOfRows - } + ret += segment.NumOfRows } return ret } // GetUnFlushedSegments get all segments which state is not `Flushing` nor `Flushed` func (m *meta) GetUnFlushedSegments() []*SegmentInfo { - return m.SelectSegments(func(segment *SegmentInfo) bool { + return m.SelectSegments(SegmentFilterFunc(func(segment *SegmentInfo) bool { return segment.GetState() == commonpb.SegmentState_Growing || segment.GetState() == commonpb.SegmentState_Sealed - }) + })) } // GetFlushingSegments get all segments which state is `Flushing` func (m *meta) GetFlushingSegments() []*SegmentInfo { - return m.SelectSegments(func(segment *SegmentInfo) bool { + return m.SelectSegments(SegmentFilterFunc(func(segment *SegmentInfo) bool { return segment.GetState() == commonpb.SegmentState_Flushing - }) + })) } // SelectSegments select segments with selector -func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo { +func (m *meta) SelectSegments(filters ...SegmentFilter) []*SegmentInfo { m.RLock() defer m.RUnlock() - return m.segments.GetSegmentsBySelector(selector) + return m.segments.GetSegmentsBySelector(filters...) } // AddAllocation add allocation in segment @@ -1406,12 +1397,12 @@ func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID } func (m *meta) GetCompactableSegmentGroupByCollection() map[int64][]*SegmentInfo { - allSegs := m.SelectSegments(func(segment *SegmentInfo) bool { + allSegs := m.SelectSegments(SegmentFilterFunc(func(segment *SegmentInfo) bool { return isSegmentHealthy(segment) && isFlush(segment) && // sealed segment !segment.isCompacting && // not compacting now !segment.GetIsImporting() // not importing now - }) + })) ret := make(map[int64][]*SegmentInfo) for _, seg := range allSegs { @@ -1426,12 +1417,11 @@ func (m *meta) GetCompactableSegmentGroupByCollection() map[int64][]*SegmentInfo } func (m *meta) GetEarliestStartPositionOfGrowingSegments(label *CompactionGroupLabel) *msgpb.MsgPosition { - segments := m.SelectSegments(func(segment *SegmentInfo) bool { + segments := m.SelectSegments(WithCollection(label.CollectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool { return segment.GetState() == commonpb.SegmentState_Growing && - segment.GetCollectionID() == label.CollectionID && segment.GetPartitionID() == label.PartitionID && segment.GetInsertChannel() == label.Channel - }) + })) earliest := &msgpb.MsgPosition{Timestamp: math.MaxUint64} for _, seg := range segments { diff --git a/internal/datacoord/mock_compaction_meta.go b/internal/datacoord/mock_compaction_meta.go index b76c663839..b905124d7d 100644 --- a/internal/datacoord/mock_compaction_meta.go +++ b/internal/datacoord/mock_compaction_meta.go @@ -128,13 +128,19 @@ func (_c *MockCompactionMeta_GetHealthySegment_Call) RunAndReturn(run func(int64 return _c } -// SelectSegments provides a mock function with given fields: selector -func (_m *MockCompactionMeta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo { - ret := _m.Called(selector) +// SelectSegments provides a mock function with given fields: filters +func (_m *MockCompactionMeta) SelectSegments(filters ...SegmentFilter) []*SegmentInfo { + _va := make([]interface{}, len(filters)) + for _i := range filters { + _va[_i] = filters[_i] + } + var 
_ca []interface{} + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) var r0 []*SegmentInfo - if rf, ok := ret.Get(0).(func(SegmentInfoSelector) []*SegmentInfo); ok { - r0 = rf(selector) + if rf, ok := ret.Get(0).(func(...SegmentFilter) []*SegmentInfo); ok { + r0 = rf(filters...) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]*SegmentInfo) @@ -150,14 +156,21 @@ type MockCompactionMeta_SelectSegments_Call struct { } // SelectSegments is a helper method to define mock.On call -// - selector SegmentInfoSelector -func (_e *MockCompactionMeta_Expecter) SelectSegments(selector interface{}) *MockCompactionMeta_SelectSegments_Call { - return &MockCompactionMeta_SelectSegments_Call{Call: _e.mock.On("SelectSegments", selector)} +// - filters ...SegmentFilter +func (_e *MockCompactionMeta_Expecter) SelectSegments(filters ...interface{}) *MockCompactionMeta_SelectSegments_Call { + return &MockCompactionMeta_SelectSegments_Call{Call: _e.mock.On("SelectSegments", + append([]interface{}{}, filters...)...)} } -func (_c *MockCompactionMeta_SelectSegments_Call) Run(run func(selector SegmentInfoSelector)) *MockCompactionMeta_SelectSegments_Call { +func (_c *MockCompactionMeta_SelectSegments_Call) Run(run func(filters ...SegmentFilter)) *MockCompactionMeta_SelectSegments_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(SegmentInfoSelector)) + variadicArgs := make([]SegmentFilter, len(args)-0) + for i, a := range args[0:] { + if a != nil { + variadicArgs[i] = a.(SegmentFilter) + } + } + run(variadicArgs...) }) return _c } @@ -167,7 +180,7 @@ func (_c *MockCompactionMeta_SelectSegments_Call) Return(_a0 []*SegmentInfo) *Mo return _c } -func (_c *MockCompactionMeta_SelectSegments_Call) RunAndReturn(run func(SegmentInfoSelector) []*SegmentInfo) *MockCompactionMeta_SelectSegments_Call { +func (_c *MockCompactionMeta_SelectSegments_Call) RunAndReturn(run func(...SegmentFilter) []*SegmentInfo) *MockCompactionMeta_SelectSegments_Call { _c.Call.Return(run) return _c } diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 5f8540da9d..4ab3d16819 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -20,6 +20,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" @@ -32,6 +33,7 @@ import ( // SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation type SegmentsInfo struct { segments map[UniqueID]*SegmentInfo + collSegments map[UniqueID]*CollectionSegments compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key. // A segment can be compacted to only one segment finally in meta. 
} @@ -68,10 +70,15 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo { func NewSegmentsInfo() *SegmentsInfo { return &SegmentsInfo{ segments: make(map[UniqueID]*SegmentInfo), + collSegments: make(map[UniqueID]*CollectionSegments), compactionTo: make(map[UniqueID]UniqueID), } } +type CollectionSegments struct { + segments map[int64]*SegmentInfo +} + // GetSegment returns SegmentInfo // the logPath in meta is empty func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo { @@ -86,21 +93,33 @@ func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo { // no deep copy applied // the logPath in meta is empty func (s *SegmentsInfo) GetSegments() []*SegmentInfo { - segments := make([]*SegmentInfo, 0, len(s.segments)) - for _, segment := range s.segments { - segments = append(segments, segment) - } - return segments + return lo.Values(s.segments) } -func (s *SegmentsInfo) GetSegmentsBySelector(selector SegmentInfoSelector) []*SegmentInfo { - var segments []*SegmentInfo - for _, segment := range s.segments { - if selector(segment) { - segments = append(segments, segment) +func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*SegmentInfo { + criterion := &segmentCriterion{} + for _, filter := range filters { + filter.AddFilter(criterion) + } + var result []*SegmentInfo + var candidates []*SegmentInfo + // apply criterion + switch { + case criterion.collectionID > 0: + collSegments, ok := s.collSegments[criterion.collectionID] + if !ok { + return nil + } + candidates = lo.Values(collSegments.segments) + default: + candidates = lo.Values(s.segments) + } + for _, segment := range candidates { + if criterion.Match(segment) { + result = append(result, segment) } } - return segments + return result } // GetCompactionTo returns the segment that the provided segment is compacted to. @@ -125,6 +144,7 @@ func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool) func (s *SegmentsInfo) DropSegment(segmentID UniqueID) { if segment, ok := s.segments[segmentID]; ok { s.deleteCompactTo(segment) + s.delCollection(segment) delete(s.segments, segmentID) } } @@ -136,8 +156,10 @@ func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) { if segment, ok := s.segments[segmentID]; ok { // Remove old segment compact to relation first. 
s.deleteCompactTo(segment) + s.delCollection(segment) } s.segments[segmentID] = segment + s.addCollection(segment) s.addCompactTo(segment) } @@ -274,6 +296,30 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo { return cloned } +func (s *SegmentsInfo) addCollection(segment *SegmentInfo) { + collID := segment.GetCollectionID() + collSegment, ok := s.collSegments[collID] + if !ok { + collSegment = &CollectionSegments{ + segments: make(map[UniqueID]*SegmentInfo), + } + s.collSegments[collID] = collSegment + } + collSegment.segments[segment.GetID()] = segment +} + +func (s *SegmentsInfo) delCollection(segment *SegmentInfo) { + collID := segment.GetCollectionID() + collSegment, ok := s.collSegments[collID] + if !ok { + return + } + delete(collSegment.segments, segment.GetID()) + if len(collSegment.segments) == 0 { + delete(s.collSegments, segment.GetCollectionID()) + } +} + // addCompactTo adds the compact relation to the segment func (s *SegmentsInfo) addCompactTo(segment *SegmentInfo) { for _, from := range segment.GetCompactionFrom() { diff --git a/internal/datacoord/segment_operator.go b/internal/datacoord/segment_operator.go index 1e2c1fe4e7..2d26f6d03d 100644 --- a/internal/datacoord/segment_operator.go +++ b/internal/datacoord/segment_operator.go @@ -28,3 +28,52 @@ func SetMaxRowCount(maxRow int64) SegmentOperator { return true } } + +type segmentCriterion struct { + collectionID int64 + others []SegmentFilter +} + +func (sc *segmentCriterion) Match(segment *SegmentInfo) bool { + for _, filter := range sc.others { + if !filter.Match(segment) { + return false + } + } + return true +} + +type SegmentFilter interface { + Match(segment *SegmentInfo) bool + AddFilter(*segmentCriterion) +} + +type CollectionFilter int64 + +func (f CollectionFilter) Match(segment *SegmentInfo) bool { + return segment.GetCollectionID() == int64(f) +} + +func (f CollectionFilter) AddFilter(criterion *segmentCriterion) { + criterion.collectionID = int64(f) +} + +func WithCollection(collectionID int64) SegmentFilter { + return CollectionFilter(collectionID) +} + +type SegmentFilterFunc func(*SegmentInfo) bool + +func (f SegmentFilterFunc) Match(segment *SegmentInfo) bool { + return f(segment) +} + +func (f SegmentFilterFunc) AddFilter(criterion *segmentCriterion) { + criterion.others = append(criterion.others, f) +} + +func WithChannel(channel string) SegmentFilter { + return SegmentFilterFunc(func(si *SegmentInfo) bool { + return si.GetInsertChannel() == channel + }) +}
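
For reference, a minimal sketch of how the reworked API composes, using only names introduced or already used in this patch (SelectSegments, WithCollection, WithChannel, SegmentFilterFunc, isFlushState); the helper name flushedSegmentsOnChannel is illustrative, not part of the change:

// flushedSegmentsOnChannel selects the flushed segments of one collection on
// one channel. WithCollection stores the collection ID on the internal
// segmentCriterion, so GetSegmentsBySelector iterates only the
// per-collection index (SegmentsInfo.collSegments) instead of every segment;
// the remaining filters are then matched against each candidate.
func flushedSegmentsOnChannel(m *meta, collectionID int64, channel string) []*SegmentInfo {
	return m.SelectSegments(
		WithCollection(collectionID),
		WithChannel(channel),
		SegmentFilterFunc(func(si *SegmentInfo) bool {
			return isFlushState(si.GetState())
		}),
	)
}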
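
Note the design choice: CollectionFilter.AddFilter records the collection ID on the criterion instead of appending itself to criterion.others, so the collection constraint is enforced by candidate selection (the collSegments fast path in GetSegmentsBySelector) rather than by per-segment matching, while channel and ad-hoc predicates remain ordinary per-segment filters.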