enhance: improve mix compaction performance by removing max segment limitations (#38344)

See #37234

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Author: Ted Xu, 2024-12-11 20:38:42 +08:00 (committed by GitHub)
parent e279ccf109
commit dc85d8e968
6 changed files with 51 additions and 39 deletions

View File

@@ -682,6 +682,7 @@ dataNode:
     levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
     levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
     useMergeSort: false # Whether to enable mergeSort mode when performing mixCompaction.
+    maxSegmentMergeSort: 30 # The maximum number of segments to be merged in mergeSort mode.
   gracefulStopTimeout: 1800 # seconds. force stop node without graceful stop
   slot:
     slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode
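The new `maxSegmentMergeSort` entry follows the usual config flow: a value set here overrides the built-in default of 30 declared by the ParamItem later in this commit. A hypothetical sketch of that resolution order (illustrative only, not the actual paramtable mechanics):

```go
package example

import "strconv"

// maxSegmentMergeSort mirrors the resolution order conceptually: a value
// from milvus.yaml wins, otherwise the declared default applies. This is
// a sketch of the idea, not the real paramtable implementation.
func maxSegmentMergeSort(conf map[string]string) int {
	if v, ok := conf["dataNode.compaction.maxSegmentMergeSort"]; ok {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
	}
	return 30 // DefaultValue declared for the ParamItem
}
```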

View File

@@ -535,7 +535,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
 	toMerge := newSegmentPacker("merge", smallCandidates)
 	toPack := newSegmentPacker("pack", nonPlannedSegments)
-	maxSegs := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt64()
+	maxSegs := int64(4096) // Deprecate the max segment limit since it is irrelevant in simple compactions.
 	minSegs := Params.DataCoordCfg.MinSegmentToMerge.GetAsInt64()
 	compactableProportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat()
 	satisfiedSize := int64(float64(expectedSize) * compactableProportion)
@@ -543,16 +543,18 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
 	maxLeftSize := expectedSize - satisfiedSize
 	expectedExpandedSize := int64(float64(expectedSize) * expantionRate)
 	maxExpandedLeftSize := expectedExpandedSize - satisfiedSize
+	reasons := make([]string, 0)
 	// 1. Merge small segments if they can make a full bucket
 	for {
-		pack, _ := toMerge.pack(expectedSize, maxLeftSize, minSegs, maxSegs)
+		pack, left := toMerge.pack(expectedSize, maxLeftSize, minSegs, maxSegs)
 		if len(pack) == 0 {
 			break
 		}
+		reasons = append(reasons, fmt.Sprintf("merging %d small segments with left size %d", len(pack), left))
 		buckets = append(buckets, pack)
 	}
-	// 2. greedy pick from large segment to small, the goal is to fill each segment to reach 512M
-	// we must ensure all prioritized candidates is in a plan
+	// 2. Pack prioritized candidates with small segments
 	// TODO the compaction selection policy should consider if compaction workload is high
 	for {
 		// No limit on the remaining size because we want to pack all prioritized candidates
@@ -560,6 +562,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
 		if len(pack) == 0 {
 			break
 		}
+		reasons = append(reasons, fmt.Sprintf("packing %d prioritized segments", len(pack)))
 		buckets = append(buckets, pack)
 	}
@@ -570,6 +573,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
 		if len(pack) == 0 {
 			break
 		}
+		reasons = append(reasons, fmt.Sprintf("packing all %d small segments", len(pack)))
 		buckets = append(buckets, pack)
 	}
 	remaining := t.squeezeSmallSegmentsToBuckets(toMerge.candidates, buckets, expectedSize)
@@ -581,6 +585,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
 		if len(pack) == 0 {
 			break
 		}
+		reasons = append(reasons, fmt.Sprintf("packing %d small segments and non-planned segments", len(pack)))
 		buckets = append(buckets, pack)
 	}
@@ -595,7 +600,15 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
 		pair := typeutil.NewPair(totalRows, segmentIDs)
 		tasks[i] = &pair
 	}
-	log.Info("generatePlans", zap.Int64("collectionID", signal.collectionID), zap.Int("plan_num", len(tasks)))
+	if len(tasks) > 0 {
+		log.Info("generated nontrivial compaction tasks",
+			zap.Int64("collectionID", signal.collectionID),
+			zap.Int("prioritizedCandidates", len(prioritizedCandidates)),
+			zap.Int("smallCandidates", len(smallCandidates)),
+			zap.Int("nonPlannedSegments", len(nonPlannedSegments)),
+			zap.Strings("reasons", reasons))
+	}
 	return tasks
 }
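Every planning loop above has the same drain-the-packer shape: request the next bucket, stop when the packer yields nothing, and record a human-readable reason per bucket. With the hard `MaxSegmentToMerge` cap gone, a plan's segment count is bounded only by the in-code constant of 4096. A toy, self-contained sketch of that loop shape (the greedy `pack` below is illustrative, not the Milvus `Knapsack` packer):

```go
package main

import "fmt"

// pack greedily fills one bucket up to capacity and returns the bucket
// plus the leftover pool. An empty bucket ends the caller's loop;
// single-segment buckets are rejected as not worth a merge.
func pack(pool []int64, capacity int64) (bucket, rest []int64) {
	var used int64
	for _, size := range pool {
		if used+size <= capacity {
			bucket = append(bucket, size)
			used += size
		} else {
			rest = append(rest, size)
		}
	}
	if len(bucket) < 2 {
		return nil, pool
	}
	return bucket, rest
}

func main() {
	pool := []int64{300, 200, 120, 100, 90, 60} // segment sizes in MB
	var buckets [][]int64
	for {
		bucket, rest := pack(pool, 512) // 512M target, per the old comment
		if len(bucket) == 0 {
			break
		}
		buckets = append(buckets, bucket)
		pool = rest
	}
	fmt.Println(buckets) // [[300 200] [120 100 90 60]]
}
```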

View File

@@ -751,7 +751,8 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 		},
 	}
-	for i := UniqueID(0); i < 50; i++ {
+	nSegments := 50
+	for i := UniqueID(0); i < UniqueID(nSegments); i++ {
 		info := &SegmentInfo{
 			SegmentInfo: &datapb.SegmentInfo{
 				ID: i,
@@ -913,16 +914,13 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 			assert.Equal(t, tt.wantErr, err != nil)
 			spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
-			// should be split into two plans
-			plan := <-spy.spyChan
-			assert.NotEmpty(t, plan)
-			// TODO CZS
-			// assert.Equal(t, len(plan.SegmentBinlogs), 30)
-			plan = <-spy.spyChan
-			assert.NotEmpty(t, plan)
-			// TODO CZS
-			// assert.Equal(t, len(plan.SegmentBinlogs), 20)
+			select {
+			case plan := <-spy.spyChan:
+				assert.NotEmpty(t, plan)
+				assert.Equal(t, len(plan.SegmentBinlogs), nSegments)
+			case <-time.After(2 * time.Second):
+				assert.Fail(t, "timeout")
+			}
 		})
 	}
 }
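The test no longer expects the 50 segments to be split into two plans; it now expects a single plan covering all `nSegments` segments, and guards the channel read with a timeout so a missing plan fails fast instead of hanging the suite (the `time.After` arm relies on the `time` import, which sits outside this hunk). The guarded-receive pattern, extracted into a hypothetical generic helper:

```go
package example

import (
	"testing"
	"time"
)

// assertReceive fails the test if nothing arrives on ch within d,
// instead of blocking forever the way a bare channel read would.
// The helper and its name are illustrative, not part of this commit.
func assertReceive[T any](t *testing.T, ch <-chan T, d time.Duration) T {
	t.Helper()
	select {
	case v := <-ch:
		return v
	case <-time.After(d):
		t.Fatal("timeout waiting for message")
		panic("unreachable") // t.Fatal stops this goroutine
	}
}
```

In the rewritten test this would read roughly as `plan := assertReceive(t, spy.spyChan, 2*time.Second)`.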

View File

@@ -21,9 +21,6 @@ import (
 	"sort"

 	"github.com/bits-and-blooms/bitset"
-	"go.uber.org/zap"
-
-	"github.com/milvus-io/milvus/pkg/log"
 )

 type Sizable interface {
@@ -65,11 +62,6 @@ func (c *Knapsack[T]) tryPack(size, maxLeftSize, minSegs, maxSegs int64) (bitset
 	nSelections := selection.Count()
 	if left > maxLeftSize || nSelections < uint(minSegs) {
-		log.Debug("tryPack failed",
-			zap.String("name", c.name),
-			zap.Int64("left", left), zap.Int64("maxLeftSize", maxLeftSize),
-			zap.Int64("minSegs", minSegs),
-			zap.Uint("nselections", nSelections))
 		selection.ClearAll()
 		left = size
 	}
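The deleted `log.Debug` call was the only use of `zap` and the `log` package in this file, which is why the first hunk drops both imports. The accept/reject rule that remains in `tryPack` boils down to a single predicate; as a standalone sketch (illustrative only):

```go
package example

// validPack mirrors the condition kept in tryPack: a greedy selection
// stands only if it leaves at most maxLeftSize free space and picked
// at least minSegs segments; otherwise the caller clears it and resets.
func validPack(left, maxLeftSize, minSegs int64, nSelections uint) bool {
	return left <= maxLeftSize && nSelections >= uint(minSegs)
}
```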

View File

@@ -329,17 +329,23 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
 		return nil, errors.New("illegal compaction plan")
 	}
-	allSorted := true
-	for _, segment := range t.plan.GetSegmentBinlogs() {
-		if !segment.GetIsSorted() {
-			allSorted = false
-			break
+	sortMergeAppicable := paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool()
+	if sortMergeAppicable {
+		for _, segment := range t.plan.GetSegmentBinlogs() {
+			if !segment.GetIsSorted() {
+				sortMergeAppicable = false
+				break
+			}
+		}
+		if len(insertPaths) <= 1 || len(insertPaths) > paramtable.Get().DataNodeCfg.MaxSegmentMergeSort.GetAsInt() {
+			// sort merge is not applicable if there is only one segment or too many segments
+			sortMergeAppicable = false
 		}
 	}

 	var res []*datapb.CompactionSegment
-	if paramtable.Get().DataNodeCfg.UseMergeSort.GetAsBool() && allSorted && len(t.plan.GetSegmentBinlogs()) > 1 {
-		log.Info("all segments are sorted, use merge sort")
+	if sortMergeAppicable {
+		log.Info("compact by merge sort")
 		res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO,
 			t.plan.GetSegmentBinlogs(), t.tr, t.currentTs, t.plan.GetCollectionTtl(), t.bm25FieldIDs)
 		if err != nil {
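The refactor folds what used to be three separate conditions (`UseMergeSort` enabled, `allSorted`, more than one segment) plus the new upper bound into one `sortMergeAppicable` flag computed up front. Distilled into a standalone predicate (an illustration of the gating logic, not the actual code, which counts `len(insertPaths)` rather than taking a slice of flags):

```go
package example

// mergeSortApplicable sketches the gate: merge sort runs only when the
// mode is enabled, every input segment is already sorted, and the input
// count lies in (1, maxSegmentMergeSort].
func mergeSortApplicable(enabled bool, segmentsSorted []bool, maxSegmentMergeSort int) bool {
	if !enabled {
		return false
	}
	n := len(segmentsSorted)
	if n <= 1 || n > maxSegmentMergeSort {
		return false
	}
	for _, sorted := range segmentsSorted {
		if !sorted {
			return false
		}
	}
	return true
}
```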

View File

@@ -3284,7 +3284,6 @@ type dataCoordConfig struct {
 	CompactionMaxParallelTasks   ParamItem `refreshable:"true"`
 	CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
 	MinSegmentToMerge            ParamItem `refreshable:"true"`
-	MaxSegmentToMerge            ParamItem `refreshable:"true"`
 	SegmentSmallProportion       ParamItem `refreshable:"true"`
 	SegmentCompactableProportion ParamItem `refreshable:"true"`
 	SegmentExpansionRate         ParamItem `refreshable:"true"`
@@ -3610,13 +3609,6 @@ mix is prioritized by level: mix compactions first, then L0 compactions, then cl
 	}
 	p.MinSegmentToMerge.Init(base.mgr)

-	p.MaxSegmentToMerge = ParamItem{
-		Key:          "dataCoord.compaction.max.segment",
-		Version:      "2.0.0",
-		DefaultValue: "30",
-	}
-	p.MaxSegmentToMerge.Init(base.mgr)
-
 	p.SegmentSmallProportion = ParamItem{
 		Key:     "dataCoord.segment.smallProportion",
 		Version: "2.0.0",
@@ -4303,6 +4295,7 @@ type dataNodeConfig struct {
 	L0BatchMemoryRatio       ParamItem `refreshable:"true"`
 	L0CompactionMaxBatchSize ParamItem `refreshable:"true"`
 	UseMergeSort             ParamItem `refreshable:"true"`
+	MaxSegmentMergeSort      ParamItem `refreshable:"true"`
 	GracefulStopTimeout      ParamItem `refreshable:"true"`
@@ -4643,6 +4636,15 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.UseMergeSort.Init(base.mgr)

+	p.MaxSegmentMergeSort = ParamItem{
+		Key:          "dataNode.compaction.maxSegmentMergeSort",
+		Version:      "2.5.0",
+		Doc:          "The maximum number of segments to be merged in mergeSort mode.",
+		DefaultValue: "30",
+		Export:       true,
+	}
+	p.MaxSegmentMergeSort.Init(base.mgr)
+
 	p.GracefulStopTimeout = ParamItem{
 		Key:     "dataNode.gracefulStopTimeout",
 		Version: "2.3.7",
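The default of 30 matches the old `dataCoord.compaction.max.segment` default, but the knob now lives on the datanode and only bounds mergeSort-mode compaction; declared `refreshable:"true"`, it can also be changed at runtime. A hedged sketch of overriding it in a test, assuming the `Save` helper seen elsewhere in Milvus paramtable usage and the import path implied by the diff:

```go
package example

import "github.com/milvus-io/milvus/pkg/util/paramtable"

// overrideForTest bumps the merge-sort segment cap and reads it back.
// Save(key, value) and the import path are assumptions here, not part
// of this commit.
func overrideForTest() int {
	params := paramtable.Get()
	params.Save(params.DataNodeCfg.MaxSegmentMergeSort.Key, "64")
	return params.DataNodeCfg.MaxSegmentMergeSort.GetAsInt() // 64
}
```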