enhance: Pass the compaction configuration through request parameters (#41979)

issue: #41123

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-05-26 11:52:27 +08:00 committed by GitHub
parent e9c0756dc6
commit 80fe573c76
6 changed files with 71 additions and 47 deletions

View File

@@ -22,18 +22,22 @@ import (
 )
 type Params struct {
-    EnableStorageV2     bool   `json:"enable_storage_v2,omitempty"`
-    BinLogMaxSize       uint64 `json:"binlog_max_size,omitempty"`
-    UseMergeSort        bool   `json:"use_merge_sort,omitempty"`
-    MaxSegmentMergeSort int    `json:"max_segment_merge_sort,omitempty"`
+    EnableStorageV2           bool    `json:"enable_storage_v2,omitempty"`
+    BinLogMaxSize             uint64  `json:"binlog_max_size,omitempty"`
+    UseMergeSort              bool    `json:"use_merge_sort,omitempty"`
+    MaxSegmentMergeSort       int     `json:"max_segment_merge_sort,omitempty"`
+    PreferSegmentSizeRatio    float64 `json:"prefer_segment_size_ratio,omitempty"`
+    BloomFilterApplyBatchSize int     `json:"bloom_filter_apply_batch_size,omitempty"`
 }
 func genParams() Params {
     return Params{
-        EnableStorageV2:     paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(),
-        BinLogMaxSize:       paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
-        UseMergeSort:        paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(),
-        MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
+        EnableStorageV2:           paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(),
+        BinLogMaxSize:             paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
+        UseMergeSort:              paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(),
+        MaxSegmentMergeSort:       paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
+        PreferSegmentSizeRatio:    paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat(),
+        BloomFilterApplyBatchSize: paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt(),
     }
 }
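This file defines the payload: Params is the JSON-serializable snapshot of the compaction settings, and genParams() fills it from the live paramtable, now including the two new knobs. As a minimal, self-contained sketch of the round trip this enables (standalone code, not the Milvus sources; buildPlanParams and parsePlanParams are hypothetical names for the two sides), the coordinator marshals the snapshot into the plan's JSON string and the worker parses it back, falling back to local defaults when the payload is empty:

package main

import (
    "encoding/json"
    "fmt"
)

// Params mirrors the struct above; only two fields are shown for brevity.
type Params struct {
    UseMergeSort              bool `json:"use_merge_sort,omitempty"`
    BloomFilterApplyBatchSize int  `json:"bloom_filter_apply_batch_size,omitempty"`
}

// buildPlanParams (hypothetical) models the coordinator side: snapshot the
// live configuration into the plan's JSON parameter string.
func buildPlanParams(defaults Params) (string, error) {
    b, err := json.Marshal(defaults)
    return string(b), err
}

// parsePlanParams (hypothetical) models the worker-side ParseParamsFromJSON:
// start from defaults, then overlay whatever the JSON carries, so an empty
// payload degrades to local defaults.
func parsePlanParams(jsonStr string, defaults Params) (Params, error) {
    result := defaults
    if jsonStr == "" {
        return result, nil
    }
    if err := json.Unmarshal([]byte(jsonStr), &result); err != nil {
        return Params{}, err
    }
    return result, nil
}

func main() {
    defaults := Params{UseMergeSort: true, BloomFilterApplyBatchSize: 1000}
    payload, _ := buildPlanParams(defaults)
    got, _ := parsePlanParams(payload, defaults)
    fmt.Println(payload)
    fmt.Printf("%+v\n", got)
}

Pinning one snapshot to the plan means a task works with consistent settings even if the live configuration changes while it runs, which is presumably the motivation for threading the params through the request.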

View File

@@ -34,10 +34,12 @@ func TestGetJSONParams(t *testing.T) {
     err = json.Unmarshal([]byte(jsonStr), &result)
     assert.NoError(t, err)
     assert.Equal(t, Params{
-        EnableStorageV2:     paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(),
-        BinLogMaxSize:       paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
-        UseMergeSort:        paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(),
-        MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
+        EnableStorageV2:           paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(),
+        BinLogMaxSize:             paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
+        UseMergeSort:              paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(),
+        MaxSegmentMergeSort:       paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
+        PreferSegmentSizeRatio:    paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat(),
+        BloomFilterApplyBatchSize: paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt(),
     }, result)
 }
@@ -46,14 +48,18 @@ func TestGetParamsFromJSON(t *testing.T) {
         "enable_storage_v2": false,
         "binlog_max_size": 4096,
         "use_merge_sort": false,
-        "max_segment_merge_sort": 2
+        "max_segment_merge_sort": 2,
+        "prefer_segment_size_ratio": 0.1,
+        "bloom_filter_apply_batch_size": 1000
     }`
     expected := Params{
-        EnableStorageV2:     false,
-        BinLogMaxSize:       4096,
-        UseMergeSort:        false,
-        MaxSegmentMergeSort: 2,
+        EnableStorageV2:           false,
+        BinLogMaxSize:             4096,
+        UseMergeSort:              false,
+        MaxSegmentMergeSort:       2,
+        PreferSegmentSizeRatio:    0.1,
+        BloomFilterApplyBatchSize: 1000,
     }
     result, err := ParseParamsFromJSON(input)
@@ -73,9 +79,11 @@ func TestGetParamsFromJSON_EmptyJSON(t *testing.T) {
     result, err := ParseParamsFromJSON(emptyJSON)
     assert.NoError(t, err)
     assert.Equal(t, Params{
-        EnableStorageV2:     paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(),
-        BinLogMaxSize:       paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
-        UseMergeSort:        paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(),
-        MaxSegmentMergeSort: paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
+        EnableStorageV2:           paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool(),
+        BinLogMaxSize:             paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
+        UseMergeSort:              paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(),
+        MaxSegmentMergeSort:       paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt(),
+        PreferSegmentSizeRatio:    paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat(),
+        BloomFilterApplyBatchSize: paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt(),
     }, result)
 }
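These tests pin down the parse semantics at the two extremes: a full JSON payload overrides every field, and an empty payload reproduces the paramtable defaults. Assuming ParseParamsFromJSON unmarshals into a genParams() snapshot (which the empty-JSON case suggests, and which is how unmarshalling into a prefilled struct behaves in Go), a partial payload would override only the keys it carries. A hedged sketch of such a test, not part of this change:

// Sketch only: asserts the overlay behavior implied by the tests above,
// assuming omitted JSON keys keep their paramtable defaults.
func TestGetParamsFromJSON_PartialJSON(t *testing.T) {
    input := `{"binlog_max_size": 8192}`
    result, err := ParseParamsFromJSON(input)
    assert.NoError(t, err)
    // The key present in the JSON wins...
    assert.Equal(t, uint64(8192), result.BinLogMaxSize)
    // ...while absent keys fall back to the local defaults.
    assert.Equal(t, paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool(), result.UseMergeSort)
}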

View File

@@ -99,6 +99,8 @@ type clusteringCompactionTask struct {
     offsetToBufferFunc func(int64, []uint32) *ClusterBuffer
     // bm25
     bm25FieldIds []int64
+
+    compactionParams compaction.Params
 }
 type ClusterBuffer struct {
@@ -196,6 +198,12 @@ func (t *clusteringCompactionTask) init() error {
     if t.plan.GetType() != datapb.CompactionType_ClusteringCompaction {
         return merr.WrapErrIllegalCompactionPlan("illegal compaction type")
     }
+
+    var err error
+    t.compactionParams, err = compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
+    if err != nil {
+        return err
+    }
     if len(t.plan.GetSegmentBinlogs()) == 0 {
         return merr.WrapErrIllegalCompactionPlan("empty segment binlogs")
     }
@@ -316,10 +324,6 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
     }
     buckets, containsNull := t.splitClusterByScalarValue(analyzeDict)
     scalarToClusterBufferMap := make(map[interface{}]*ClusterBuffer, 0)
-    compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
-    if err != nil {
-        return err
-    }
     for id, bucket := range buckets {
         fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
         if err != nil {
@@ -330,7 +334,7 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
     }
     alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
-    writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
+    writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
     if err != nil {
         return err
     }
@@ -349,7 +353,7 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
     }
     alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
-    writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
+    writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
     if err != nil {
         return err
     }
@@ -405,11 +409,7 @@ func (t *clusteringCompactionTask) generatedVectorPlan(ctx context.Context, buff
     fieldStats.SetVectorCentroids(centroidValues...)
     alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
-    compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
-    if err != nil {
-        return err
-    }
-    writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
+    writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize))
     if err != nil {
         return err
     }
@@ -969,7 +969,7 @@ func (t *clusteringCompactionTask) switchPolicyForScalarPlan(totalRows int64, ke
     }
     maxRows := totalRows / bufferNumByMemory
-    return t.generatedScalarPlan(maxRows, int64(float64(maxRows)*paramtable.Get().DataCoordCfg.ClusteringCompactionPreferSegmentSizeRatio.GetAsFloat()), keys, dict)
+    return t.generatedScalarPlan(maxRows, int64(float64(maxRows)*t.compactionParams.PreferSegmentSizeRatio), keys, dict)
 }
 func (t *clusteringCompactionTask) splitClusterByScalarValue(dict map[interface{}]int64) ([][]interface{}, bool) {
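The clustering task now parses the plan's JSON params once in init() and caches the result on the task, so the three NewMultiSegmentWriter call sites and the scalar-plan policy all read the same snapshot instead of re-parsing the JSON (or re-reading paramtable) at each use. The last hunk is the arithmetic consumer of PreferSegmentSizeRatio; a minimal standalone sketch of that expression (preferRows is a hypothetical helper, not part of the task):

package main

import "fmt"

// preferRows mirrors the expression in switchPolicyForScalarPlan: the soft
// per-segment row target is a configurable fraction of the hard cap.
func preferRows(maxRows int64, ratio float64) int64 {
    return int64(float64(maxRows) * ratio)
}

func main() {
    // With maxRows = 1_000_000 and prefer_segment_size_ratio = 0.8, the
    // plan aims to seal segments around 800_000 rows while still allowing
    // growth up to the 1_000_000-row cap.
    fmt.Println(preferRows(1_000_000, 0.8)) // 800000
}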

View File

@@ -57,6 +57,8 @@ type LevelZeroCompactionTask struct {
     done chan struct{}
     tr   *timerecord.TimeRecorder
+
+    compactionParams compaction.Params
 }
 // make sure compactionTask implements compactor interface
@@ -120,6 +122,12 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error
         return nil, ctx.Err()
     }
+
+    var err error
+    t.compactionParams, err = compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
+    if err != nil {
+        return nil, err
+    }
     l0Segments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
         return s.Level == datapb.SegmentLevel_L0
     })
@@ -131,7 +139,7 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error
         log.Warn("compact wrong, not target sealed segments")
         return nil, errors.New("illegal compaction plan with empty target segments")
     }
-    err := binlog.DecompressCompactionBinlogs(l0Segments)
+    err = binlog.DecompressCompactionBinlogs(l0Segments)
     if err != nil {
         log.Warn("DecompressCompactionBinlogs failed", zap.Error(err))
         return nil, err
@@ -285,7 +293,7 @@ type BatchApplyRet = struct {
 func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deltaData *storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*pkoracle.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
     _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact applyBFInParallel")
     defer span.End()
-    batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
+    batchSize := t.compactionParams.BloomFilterApplyBatchSize
     batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool {
         segment2Hits := make(map[int64][]bool, 0)
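In the L0 task the cached snapshot replaces two live lookups: err is now declared up front by the parse, so DecompressCompactionBinlogs switches from := to =, and applyBFInParallel takes its batch size from the plan rather than from paramtable. A simplified standalone sketch of the batching pattern that batchSize drives (applyInBatches is a stand-in for the real conc.Pool-based code; apply is a placeholder for the bloom-filter probe):

package main

import "fmt"

// applyInBatches slices the primary keys into fixed-size chunks, as
// applyBFInParallel does with BloomFilterApplyBatchSize, and hands each
// chunk to a worker function.
func applyInBatches(pks []int64, batchSize int, apply func(batch []int64)) {
    for start := 0; start < len(pks); start += batchSize {
        end := start + batchSize
        if end > len(pks) {
            end = len(pks)
        }
        apply(pks[start:end])
    }
}

func main() {
    pks := []int64{1, 2, 3, 4, 5, 6, 7}
    applyInBatches(pks, 3, func(batch []int64) { fmt.Println(batch) })
    // Output: [1 2 3] [4 5 6] [7]
}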

View File

@@ -27,6 +27,7 @@ import (
     "go.uber.org/zap"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+    "github.com/milvus-io/milvus/internal/compaction"
     "github.com/milvus-io/milvus/internal/datanode/allocator"
     "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
     "github.com/milvus-io/milvus/internal/mocks"
@@ -65,6 +66,9 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() {
         },
     }
     s.task = NewLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, nil, plan)
+    var err error
+    s.task.compactionParams, err = compaction.ParseParamsFromJSON("")
+    s.Require().NoError(err)
     pk2ts := map[int64]uint64{
         1: 20000,

View File

@@ -63,6 +63,8 @@ type mixCompactionTask struct {
     done chan struct{}
     tr   *timerecord.TimeRecorder
+
+    compactionParams compaction.Params
 }
 var _ Compactor = (*mixCompactionTask)(nil)
@@ -90,6 +92,12 @@ func (t *mixCompactionTask) preCompact() error {
         return t.ctx.Err()
     }
+
+    var err error
+    t.compactionParams, err = compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
+    if err != nil {
+        return err
+    }
     if len(t.plan.GetSegmentBinlogs()) < 1 {
         return errors.Newf("compaction plan is illegal, there's no segments in compaction plan, planID = %d", t.GetPlanID())
     }
@@ -143,11 +151,7 @@ func (t *mixCompactionTask) mergeSplit(
     segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd())
     logIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedLogIDs().GetBegin(), t.plan.GetPreAllocatedLogIDs().GetEnd())
     compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
-    compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
-    if err != nil {
-        return nil, err
-    }
-    mWriter, err := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(), compactionParams, t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096)
+    mWriter, err := NewMultiSegmentWriter(ctx, t.binlogIO, compAlloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.maxRows, t.partitionID, t.collectionID, t.GetChannelName(), 4096)
     if err != nil {
         return nil, err
     }
@@ -327,12 +331,7 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
         return nil, errors.New("illegal compaction plan")
     }
-
-    compactionParams, err := compaction.ParseParamsFromJSON(t.plan.GetJsonParams())
-    if err != nil {
-        return nil, err
-    }
-    sortMergeAppicable := compactionParams.UseMergeSort
+    sortMergeAppicable := t.compactionParams.UseMergeSort
     if sortMergeAppicable {
         for _, segment := range t.plan.GetSegmentBinlogs() {
             if !segment.GetIsSorted() {
@@ -341,13 +340,14 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
             }
         }
         if len(t.plan.GetSegmentBinlogs()) <= 1 ||
-            len(t.plan.GetSegmentBinlogs()) > compactionParams.MaxSegmentMergeSort {
+            len(t.plan.GetSegmentBinlogs()) > t.compactionParams.MaxSegmentMergeSort {
             // sort merge is not applicable if there is only one segment or too many segments
             sortMergeAppicable = false
         }
     }
     var res []*datapb.CompactionSegment
+    var err error
     if sortMergeAppicable {
         log.Info("compact by merge sort")
         res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO,
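The mix task follows the same pattern: preCompact() parses the params once, and Compact() consults the cached snapshot in its sortMergeAppicable gate instead of parsing the plan JSON again. Restated as a standalone predicate (canMergeSort is a hypothetical helper; the real code sets the flag inline), merge sort applies only when use_merge_sort is enabled, every input segment is already sorted, and the segment count is more than one but at most max_segment_merge_sort:

package main

import "fmt"

// canMergeSort restates the merge-sort gate in Compact() as a pure
// predicate over the params and the per-segment sorted flags.
func canMergeSort(useMergeSort bool, maxSegments int, sorted []bool) bool {
    if !useMergeSort || len(sorted) <= 1 || len(sorted) > maxSegments {
        return false
    }
    for _, s := range sorted {
        if !s {
            return false
        }
    }
    return true
}

func main() {
    fmt.Println(canMergeSort(true, 30, []bool{true, true, true})) // true
    fmt.Println(canMergeSort(true, 30, []bool{true, false}))      // false: an input segment is unsorted
    fmt.Println(canMergeSort(true, 30, []bool{true}))             // false: only one segment
}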