From 80fe573c76dad3fb506fb6d6129a17bdc8a3b910 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 26 May 2025 11:52:27 +0800 Subject: [PATCH] enhance: Pass the compaction configuration through request parameters (#41979) issue: #41123 --------- Signed-off-by: Cai Zhang --- internal/compaction/params.go | 20 ++++++----- internal/compaction/params_test.go | 34 ++++++++++++------- .../compactor/clustering_compactor.go | 24 ++++++------- internal/datanode/compactor/l0_compactor.go | 12 +++++-- .../datanode/compactor/l0_compactor_test.go | 4 +++ internal/datanode/compactor/mix_compactor.go | 24 ++++++------- 6 files changed, 71 insertions(+), 47 deletions(-) diff --git a/internal/compaction/params.go b/internal/compaction/params.go index 35f572d689..fabffa77aa 100644 --- a/internal/compaction/params.go +++ b/internal/compaction/params.go @@ -22,18 +22,22 @@ import ( ) type Params struct { - EnableStorageV2 bool `json:"enable_storage_v2,omitempty"` - BinLogMaxSize uint64 `json:"binlog_max_size,omitempty"` - UseMergeSort bool `json:"use_merge_sort,omitempty"` - MaxSegmentMergeSort int `json:"max_segment_merge_sort,omitempty"` + EnableStorageV2 bool `json:"enable_storage_v2,omitempty"` + BinLogMaxSize uint64 `json:"binlog_max_size,omitempty"` + UseMergeSort bool `json:"use_merge_sort,omitempty"` + MaxSegmentMergeSort int `json:"max_segment_merge_sort,omitempty"` + PreferSegmentSizeRatio float64 `json:"prefer_segment_size_ratio,omitempty"` + BloomFilterApplyBatchSize int `json:"bloom_filter_apply_batch_size,omitempty"` } func genParams() Params { return Params{ - EnableStorageV2: paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(), - BinLogMaxSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(), - UseMergeSort: paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(), - MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(), + EnableStorageV2: paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(), + BinLogMaxSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(), + UseMergeSort: paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(), + MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(), + PreferSegmentSizeRatio: paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat(), + BloomFilterApplyBatchSize: paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt(), } } diff --git a/internal/compaction/params_test.go b/internal/compaction/params_test.go index b9c3c14b73..094fb95247 100644 --- a/internal/compaction/params_test.go +++ b/internal/compaction/params_test.go @@ -34,10 +34,12 @@ func TestGetJSONParams(t *testing.T) { err = json.Unmarshal([]byte(jsonStr), &result) assert.NoError(t, err) assert.Equal(t, Params{ - EnableStorageV2: paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(), - BinLogMaxSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(), - UseMergeSort: paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(), - MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(), + EnableStorageV2: paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(), + BinLogMaxSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(), + UseMergeSort: paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(), + MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(), + PreferSegmentSizeRatio: paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat(), + BloomFilterApplyBatchSize: paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt(), }, result) } @@ -46,14 +48,18 @@ func TestGetParamsFromJSON(t *testing.T) { "enable_storage_v2": false, "binlog_max_size": 4096, "use_merge_sort": false, - "max_segment_merge_sort": 2 + "max_segment_merge_sort": 2, + "prefer_segment_size_ratio": 0.1, + "bloom_filter_apply_batch_size": 1000 }` expected := Params{ - EnableStorageV2: false, - BinLogMaxSize: 4096, - UseMergeSort: false, - MaxSegmentMergeSort: 2, + EnableStorageV2: false, + BinLogMaxSize: 4096, + UseMergeSort: false, + MaxSegmentMergeSort: 2, + PreferSegmentSizeRatio: 0.1, + BloomFilterApplyBatchSize: 1000, } result, err := ParseParamsFromJSON(input) @@ -73,9 +79,11 @@ func TestGetParamsFromJSON_EmptyJSON(t *testing.T) { result, err := ParseParamsFromJSON(emptyJSON) assert.NoError(t, err) assert.Equal(t, Params{ - EnableStorageV2: paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(), - BinLogMaxSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(), - UseMergeSort: paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(), - MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(), + EnableStorageV2: paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(), + BinLogMaxSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(), + UseMergeSort: paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(), + MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(), + PreferSegmentSizeRatio: paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat(), + BloomFilterApplyBatchSize: paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt(), }, result) } diff --git a/internal/datanode/compactor/clustering_compactor.go b/internal/datanode/compactor/clustering_compactor.go index 2f2259742d..560d5ecbcf 100644 --- a/internal/datanode/compactor/clustering_compactor.go +++ b/internal/datanode/compactor/clustering_compactor.go @@ -99,6 +99,8 @@ type clusteringCompactionTask struct { offsetToBufferFunc func(int64, []uint32) *ClusterBuffer // bm25 bm25FieldIds []int64 + + compactionParams compaction.Params } type ClusterBuffer struct { @@ -196,6 +198,12 @@ func (t *clusteringCompactionTask) init() error { if t.plan.GetType() != datapb.CompactionType_ClusteringCompaction { return merr.WrapErrIllegalCompactionPlan("illegal compaction type") } + var err error + t.compactionParams, err = compaction.ParseParamsFromJSON(t.plan.GetJsonParams()) + if err != nil { + return err + } + if len(t.plan.GetSegmentBinlogs()) == 0 { return merr.WrapErrIllegalCompactionPlan("empty segment binlogs") } @@ -316,10 +324,6 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e } buckets, containsNull := t.splitClusterByScalarValue(analyzeDict) scalarToClusterBufferMap := make(map[interface{}]*ClusterBuffer, 0) - compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams()) - if err != nil { - return err - } for id, bucket := range buckets { fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0) if err != nil { @@ -330,7 +334,7 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e } alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc) - writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize)) + writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize)) if err != nil { return err } @@ -349,7 +353,7 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e } alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc) - writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize)) + writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize)) if err != nil { return err } @@ -405,11 +409,7 @@ func (t *clusteringCompactionTask) generatedVectorPlan(ctx context.Context, buff fieldStats.SetVectorCentroids(centroidValues...) alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc) - compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams()) - if err != nil { - return err - } - writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize)) + writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize)) if err != nil { return err } @@ -969,7 +969,7 @@ func (t *clusteringCompactionTask) switchPolicyForScalarPlan(totalRows int64, ke } maxRows := totalRows / bufferNumByMemory - return t.generatedScalarPlan(maxRows, int64(float64(maxRows)*paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()), keys, dict) + return t.generatedScalarPlan(maxRows, int64(float64(maxRows)*t.compactionParams.PreferSegmentSizeRatio), keys, dict) } func (t *clusteringCompactionTask) splitClusterByScalarValue(dict map[interface{}]int64) ([][]interface{}, bool) { diff --git a/internal/datanode/compactor/l0_compactor.go b/internal/datanode/compactor/l0_compactor.go index 56e985d3e6..f456b6ce6c 100644 --- a/internal/datanode/compactor/l0_compactor.go +++ b/internal/datanode/compactor/l0_compactor.go @@ -57,6 +57,8 @@ type LevelZeroCompactionTask struct { done chan struct{} tr *timerecord.TimeRecorder + + compactionParams compaction.Params } // make sure compactionTask implements compactor interface @@ -120,6 +122,12 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error return nil, ctx.Err() } + var err error + t.compactionParams, err = compaction.ParseParamsFromJSON(t.plan.GetJsonParams()) + if err != nil { + return nil, err + } + l0Segments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool { return s.Level == datapb.SegmentLevel_L0 }) @@ -131,7 +139,7 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error log.Warn("compact wrong, not target sealed segments") return nil, errors.New("illegal compaction plan with empty target segments") } - err := binlog.DecompressCompactionBinlogs(l0Segments) + err = binlog.DecompressCompactionBinlogs(l0Segments) if err != nil { log.Warn("DecompressCompactionBinlogs failed", zap.Error(err)) return nil, err @@ -285,7 +293,7 @@ type BatchApplyRet = struct { func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deltaData *storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*pkoracle.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] { _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact applyBFInParallel") defer span.End() - batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() + batchSize := t.compactionParams.BloomFilterApplyBatchSize batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool { segment2Hits := make(map[int64][]bool, 0) diff --git a/internal/datanode/compactor/l0_compactor_test.go b/internal/datanode/compactor/l0_compactor_test.go index dacb385c1b..69479f790c 100644 --- a/internal/datanode/compactor/l0_compactor_test.go +++ b/internal/datanode/compactor/l0_compactor_test.go @@ -27,6 +27,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/compaction" "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle" "github.com/milvus-io/milvus/internal/mocks" @@ -65,6 +66,9 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() { }, } s.task = NewLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, nil, plan) + var err error + s.task.compactionParams, err = compaction.ParseParamsFromJSON("") + s.Require().NoError(err) pk2ts := map[int64]uint64{ 1: 20000, diff --git a/internal/datanode/compactor/mix_compactor.go b/internal/datanode/compactor/mix_compactor.go index b728781c5d..6534f47c36 100644 --- a/internal/datanode/compactor/mix_compactor.go +++ b/internal/datanode/compactor/mix_compactor.go @@ -63,6 +63,8 @@ type mixCompactionTask struct { done chan struct{} tr *timerecord.TimeRecorder + + compactionParams compaction.Params } var _ Compactor = (*mixCompactionTask)(nil) @@ -90,6 +92,12 @@ func (t *mixCompactionTask) preCompact() error { return t.ctx.Err() } + var err error + t.compactionParams, err = compaction.ParseParamsFromJSON(t.plan.GetJsonParams()) + if err != nil { + return err + } + if len(t.plan.GetSegmentBinlogs()) < 1 { return errors.Newf("compaction plan is illegal, there's no segments in compaction plan, planID = %d", t.GetPlanID()) } @@ -143,11 +151,7 @@ func (t *mixCompactionTask) mergeSplit( segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd()) logIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedLogIDs().GetBegin(), t.plan.GetPreAllocatedLogIDs().GetEnd()) compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc) - compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams()) - if err != nil { - return nil, err - } - mWriter, err := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096) + mWriter, err := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096) if err != nil { return nil, err } @@ -327,12 +331,7 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { return nil, errors.New("illegal compaction plan") } - compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams()) - if err != nil { - return nil, err - } - - sortMergeAppicable := compactionParams.UseMergeSort + sortMergeAppicable := t.compactionParams.UseMergeSort if sortMergeAppicable { for _, segment := range t.plan.GetSegmentBinlogs() { if !segment.GetIsSorted() { @@ -341,13 +340,14 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { } } if len(t.plan.GetSegmentBinlogs()) <= 1 || - len(t.plan.GetSegmentBinlogs()) > compactionParams.MaxSegmentMergeSort { + len(t.plan.GetSegmentBinlogs()) > t.compactionParams.MaxSegmentMergeSort { // sort merge is not applicable if there is only one segment or too many segments sortMergeAppicable = false } } var res []*datapb.CompactionSegment + var err error if sortMergeAppicable { log.Info("compact by merge sort") res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO,