From ef1832ff9c6ff32e7e2f65f15a01fcf41240ac91 Mon Sep 17 00:00:00 2001 From: jaime Date: Tue, 8 Oct 2024 19:57:18 +0800 Subject: [PATCH] enhance: enable manual compaction for collections without indexes (#36577) issue: #36576 Signed-off-by: jaime --- .../datacoord/compaction_policy_single.go | 2 +- internal/datacoord/compaction_trigger.go | 4 +- internal/datacoord/compaction_trigger_test.go | 99 ++++++++++++++++++- internal/datacoord/garbage_collector.go | 2 +- internal/datacoord/handler.go | 2 +- internal/datacoord/index_meta.go | 14 +++ internal/datacoord/util.go | 8 +- .../compaction/mix_compaction_test.go | 36 ++++--- 8 files changed, 146 insertions(+), 21 deletions(-) diff --git a/internal/datacoord/compaction_policy_single.go b/internal/datacoord/compaction_policy_single.go index 68f52853b9..1311bcb2fa 100644 --- a/internal/datacoord/compaction_policy_single.go +++ b/internal/datacoord/compaction_policy_single.go @@ -100,7 +100,7 @@ func (policy *singleCompactionPolicy) triggerOneCollection(ctx context.Context, views := make([]CompactionView, 0) for _, group := range partSegments { if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() { - group.segments = FilterInIndexedSegments(policy.handler, policy.meta, group.segments...) + group.segments = FilterInIndexedSegments(policy.handler, policy.meta, false, group.segments...) } collectionTTL, err := getCollectionTTL(collection.Properties) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 516327d4ab..b0081533e4 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -335,7 +335,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { } if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() { - group.segments = FilterInIndexedSegments(t.handler, t.meta, group.segments...) + group.segments = FilterInIndexedSegments(t.handler, t.meta, signal.isForce, group.segments...) } coll, err := t.getCollection(group.collectionID) @@ -693,7 +693,7 @@ func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int) func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo { segments := t.meta.GetSegmentsByChannel(channel) if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() { - segments = FilterInIndexedSegments(t.handler, t.meta, segments...) + segments = FilterInIndexedSegments(t.handler, t.meta, false, segments...) } var res []*SegmentInfo diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 36771bd4af..64d9ea7652 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -96,6 +96,100 @@ func newMockVersionManager() IndexEngineVersionManager { var _ compactionPlanContext = (*spyCompactionHandler)(nil) +func Test_compactionTrigger_force_without_index(t *testing.T) { + catalog := mocks.NewDataCoordCatalog(t) + catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe() + + collectionID := int64(11) + binlogs := []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {EntriesNum: 5, LogID: 1}, + }, + }, + } + deltaLogs := []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {EntriesNum: 5, LogID: 1}, + }, + }, + } + + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 101, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + } + + m := &meta{ + catalog: catalog, + channelCPs: newChannelCps(), + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: collectionID, + PartitionID: 1, + LastExpireTime: 100, + NumOfRows: 100, + MaxRowNum: 300, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Binlogs: binlogs, + Deltalogs: deltaLogs, + IsSorted: true, + }, + }, + }, + }, + indexMeta: &indexMeta{ + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{}, + indexes: map[UniqueID]map[UniqueID]*model.Index{}, + }, + collections: map[int64]*collectionInfo{ + collectionID: { + ID: collectionID, + Schema: schema, + }, + }, + } + + compactionHandler := &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1), meta: m} + tr := &compactionTrigger{ + meta: m, + handler: newMockHandlerWithMeta(m), + allocator: newMock0Allocator(t), + signals: nil, + compactionHandler: compactionHandler, + globalTrigger: nil, + closeCh: lifetime.NewSafeChan(), + testingOnly: true, + } + + _, err := tr.triggerManualCompaction(collectionID) + assert.NoError(t, err) + + select { + case val := <-compactionHandler.spyChan: + assert.Equal(t, 1, len(val.SegmentBinlogs)) + return + case <-time.After(3 * time.Second): + assert.Fail(t, "failed to get plan") + return + } +} + func Test_compactionTrigger_force(t *testing.T) { paramtable.Init() type fields struct { @@ -848,6 +942,9 @@ func Test_compactionTrigger_noplan(t *testing.T) { Params.DataCoordCfg.MinSegmentToMerge.DefaultValue = "4" vecFieldID := int64(201) mock0Allocator := newMockAllocator(t) + im := newSegmentIndexMeta(nil) + im.indexes[2] = make(map[UniqueID]*model.Index) + tests := []struct { name string fields fields @@ -859,7 +956,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { "test no plan", fields{ &meta{ - indexMeta: newSegmentIndexMeta(nil), + indexMeta: im, // 4 segment channelCPs: newChannelCps(), diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 36903547e2..9fdec0b023 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -440,7 +440,7 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) { droppedCompactTo[to] = struct{}{} } } - indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, lo.Keys(droppedCompactTo)...) + indexedSegments := FilterInIndexedSegments(gc.handler, gc.meta, false, lo.Keys(droppedCompactTo)...) indexedSet := make(typeutil.UniqueSet) for _, segment := range indexedSegments { indexedSet.Insert(segment.GetID()) diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index c5b1d33b83..60bec831c3 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -129,7 +129,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName()) validSegmentInfos := make(map[int64]*SegmentInfo) - indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...) + indexedSegments := FilterInIndexedSegments(h, h.s.meta, false, segments...) indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...) unIndexedIDs := make(typeutil.UniqueSet) diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 55b165ecca..4e5f3d194e 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -988,3 +988,17 @@ func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.Collect allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex) return allDiskIndex } + +func (m *indexMeta) HasIndex(collectionID int64) bool { + m.RLock() + defer m.RUnlock() + indexes, ok := m.indexes[collectionID] + if ok { + for _, index := range indexes { + if !index.IsDeleted { + return true + } + } + } + return false +} diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 94816931c4..775de7b9fd 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -72,7 +72,7 @@ func VerifyResponse(response interface{}, err error) error { } } -func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo) []*SegmentInfo { +func FilterInIndexedSegments(handler Handler, mt *meta, skipNoIndexCollection bool, segments ...*SegmentInfo) []*SegmentInfo { if len(segments) == 0 { return nil } @@ -83,6 +83,12 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo ret := make([]*SegmentInfo, 0) for collection, segmentList := range collectionSegments { + // No segments will be filtered if there are no indices in the collection. + if skipNoIndexCollection && !mt.indexMeta.HasIndex(collection) { + ret = append(ret, segmentList...) + continue + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) coll, err := handler.GetCollection(ctx, collection) cancel() diff --git a/tests/integration/compaction/mix_compaction_test.go b/tests/integration/compaction/mix_compaction_test.go index 042f663487..987d28b22f 100644 --- a/tests/integration/compaction/mix_compaction_test.go +++ b/tests/integration/compaction/mix_compaction_test.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/tests/integration" ) @@ -71,6 +72,17 @@ func (s *CompactionSuite) TestMixCompaction() { s.NoError(err) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) + // create index + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType), + }) + err = merr.CheckRPCCall(createIndexStatus, err) + s.NoError(err) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + // show collection showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) err = merr.CheckRPCCall(showCollectionsResp, err) @@ -110,22 +122,13 @@ func (s *CompactionSuite) TestMixCompaction() { log.Info("insert done", zap.Int("i", i)) } - // create index - createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ - CollectionName: collectionName, - FieldName: integration.FloatVecField, - IndexName: "_default", - ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType), - }) - err = merr.CheckRPCCall(createIndexStatus, err) - s.NoError(err) - s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) - segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) - // stats task happened - s.Equal(rowNum/batch, len(segments)/2) + // The stats task of segments will create a new segment, potentially triggering compaction simultaneously, + // which may lead to an increase or decrease in the number of segments. + s.True(len(segments) > 0) + for _, segment := range segments { log.Info("show segment result", zap.String("segment", segment.String())) } @@ -135,16 +138,21 @@ func (s *CompactionSuite) TestMixCompaction() { segments, err = c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) + compactFromSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { return segment.GetState() == commonpb.SegmentState_Dropped }) compactToSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { return segment.GetState() == commonpb.SegmentState_Flushed }) + log.Info("ShowSegments result", zap.Int("len(compactFromSegments)", len(compactFromSegments)), zap.Int("len(compactToSegments)", len(compactToSegments))) - return len(compactToSegments) == 1 + + // The small segments can be merged based on dataCoord.compaction.min.segment + return len(compactToSegments) <= paramtable.Get().DataCoordCfg.MinSegmentToMerge.GetAsInt() } + for !showSegments() { select { case <-ctx.Done():