enhance: refine compaction trigger to reduce read/write amplifaction(#41336) (#41728)

related: #41336

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
This commit is contained in:
Chun Han 2025-06-04 11:24:38 +08:00 committed by GitHub
parent 508264f953
commit e9b5d9e8bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 535 additions and 186 deletions

View File

@ -458,18 +458,14 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
} }
buckets := [][]*SegmentInfo{} buckets := [][]*SegmentInfo{}
toUpdate := newSegmentPacker("update", prioritizedCandidates) toUpdate := newSegmentPacker("update", prioritizedCandidates, compactTime)
toMerge := newSegmentPacker("merge", smallCandidates) toMerge := newSegmentPacker("merge", smallCandidates, compactTime)
toPack := newSegmentPacker("pack", nonPlannedSegments)
maxSegs := int64(4096) // Deprecate the max segment limit since it is irrelevant in simple compactions. maxSegs := int64(4096) // Deprecate the max segment limit since it is irrelevant in simple compactions.
minSegs := Params.DataCoordCfg.MinSegmentToMerge.GetAsInt64() minSegs := Params.DataCoordCfg.MinSegmentToMerge.GetAsInt64()
compactableProportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat() compactableProportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat()
satisfiedSize := int64(float64(expectedSize) * compactableProportion) satisfiedSize := int64(float64(expectedSize) * compactableProportion)
expantionRate := Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()
maxLeftSize := expectedSize - satisfiedSize maxLeftSize := expectedSize - satisfiedSize
expectedExpandedSize := int64(float64(expectedSize) * expantionRate)
maxExpandedLeftSize := expectedExpandedSize - satisfiedSize
reasons := make([]string, 0) reasons := make([]string, 0)
// 1. Merge small segments if they can make a full bucket // 1. Merge small segments if they can make a full bucket
for { for {
@ -492,11 +488,12 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
reasons = append(reasons, fmt.Sprintf("packing %d prioritized segments", len(pack))) reasons = append(reasons, fmt.Sprintf("packing %d prioritized segments", len(pack)))
buckets = append(buckets, pack) buckets = append(buckets, pack)
} }
// if there is any segment toUpdate left, its size must greater than expectedSize, add it to the buckets // if there is any segment toUpdate left, its size must be greater than expectedSize, add it to the buckets
for _, s := range toUpdate.candidates { for _, s := range toUpdate.candidates {
buckets = append(buckets, []*SegmentInfo{s}) buckets = append(buckets, []*SegmentInfo{s})
reasons = append(reasons, fmt.Sprintf("force packing prioritized segment %d", s.GetID())) reasons = append(reasons, fmt.Sprintf("force packing prioritized segment %d", s.GetID()))
} }
// 2.+ legacy: squeeze small segments // 2.+ legacy: squeeze small segments
// Try merge all small segments, and then squeeze // Try merge all small segments, and then squeeze
for { for {
@ -507,18 +504,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
reasons = append(reasons, fmt.Sprintf("packing all %d small segments", len(pack))) reasons = append(reasons, fmt.Sprintf("packing all %d small segments", len(pack)))
buckets = append(buckets, pack) buckets = append(buckets, pack)
} }
remaining := t.squeezeSmallSegmentsToBuckets(toMerge.candidates, buckets, expectedSize) smallRemaining := t.squeezeSmallSegmentsToBuckets(toMerge.candidates, buckets, expectedSize)
toMerge = newSegmentPacker("merge", remaining)
// 3. pack remaining small segments with non-planned segments
for {
pack, _ := toMerge.packWith(expectedExpandedSize, maxExpandedLeftSize, minSegs, maxSegs, toPack)
if len(pack) == 0 {
break
}
reasons = append(reasons, fmt.Sprintf("packing %d small segments and non-planned segments", len(pack)))
buckets = append(buckets, pack)
}
tasks := make([]*typeutil.Pair[int64, []int64], len(buckets)) tasks := make([]*typeutil.Pair[int64, []int64], len(buckets))
for i, b := range buckets { for i, b := range buckets {
@ -540,6 +526,13 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
zap.Int("nonPlannedSegments", len(nonPlannedSegments)), zap.Int("nonPlannedSegments", len(nonPlannedSegments)),
zap.Strings("reasons", reasons)) zap.Strings("reasons", reasons))
} }
if len(smallRemaining) > 0 {
log.RatedInfo(300, "remain small segments",
zap.Int64("collectionID", signal.collectionID),
zap.Int64("partitionID", signal.partitionID),
zap.String("channel", signal.channel),
zap.Int("smallRemainingCount", len(smallRemaining)))
}
return tasks return tasks
} }

View File

@ -1509,150 +1509,6 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
} }
} }
func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
type fields struct {
meta *meta
allocator allocator.Allocator
signals chan *compactionSignal
inspector CompactionInspector
globalTrigger *time.Ticker
}
type args struct {
collectionID int64
compactTime *compactTime
}
vecFieldID := int64(201)
genSegIndex := func(segID, indexID UniqueID, numRows int64) *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex] {
segIdx := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx.Insert(indexID, &model.SegmentIndex{
SegmentID: segID,
CollectionID: 2,
PartitionID: 1,
NumRows: numRows,
IndexID: indexID,
BuildID: segID,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
})
return segIdx
}
im := &indexMeta{
segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](),
indexes: map[UniqueID]map[UniqueID]*model.Index{
2: {
indexID: {
TenantID: "",
CollectionID: 2,
FieldID: vecFieldID,
IndexID: indexID,
IndexName: "_default_idx",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "HNSW",
},
},
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
}
im.segmentIndexes.Insert(1, genSegIndex(1, indexID, 20))
im.segmentIndexes.Insert(2, genSegIndex(2, indexID, 20))
im.segmentIndexes.Insert(3, genSegIndex(3, indexID, 20))
im.segmentIndexes.Insert(4, genSegIndex(4, indexID, 20))
im.segmentIndexes.Insert(5, genSegIndex(5, indexID, 20))
im.segmentIndexes.Insert(6, genSegIndex(6, indexID, 20))
mock0Allocator := newMockAllocator(t)
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(2, &collectionInfo{
ID: 2,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
})
tests := []struct {
name string
fields fields
args args
wantErr bool
wantPlans []*datapb.CompactionPlan
}{
{
"test small segment",
fields{
&meta{
channelCPs: newChannelCps(),
segments: mockSegmentsInfo(600, 600, 600, 600, 260, 260),
indexMeta: im,
collections: collections,
},
mock0Allocator,
make(chan *compactionSignal, 1),
&spyCompactionInspector{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)},
nil,
},
args{
2,
&compactTime{},
},
false,
nil,
},
}
for _, tt := range tests {
(tt.fields.inspector).(*spyCompactionInspector).meta = tt.fields.meta
t.Run(tt.name, func(t *testing.T) {
tt.fields.meta.channelCPs.checkpoints["ch1"] = &msgpb.MsgPosition{
Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
MsgID: []byte{1, 2, 3, 4},
}
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: make(chan *compactionSignal, 100),
inspector: tt.fields.inspector,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tr.start()
defer tr.stop()
_, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal())
assert.Equal(t, tt.wantErr, err != nil)
spy := (tt.fields.inspector).(*spyCompactionInspector)
select {
case val := <-spy.spyChan:
// max size == 1000, expansion rate == 1.25.
// segment 5 and 6 are squeezed into a non-planned segment. Total size: 600 + 260 + 260 == 1120,
// which is greater than 1000 but smaller than 1000 * 1.25
assert.Equal(t, len(val.SegmentBinlogs), 3)
return
case <-time.After(3 * time.Second):
assert.Fail(t, "failed to get plan")
return
}
})
}
}
// Test segment compaction target size // Test segment compaction target size
func Test_compactionTrigger_noplan_random_size(t *testing.T) { func Test_compactionTrigger_noplan_random_size(t *testing.T) {
type fields struct { type fields struct {
@ -2731,7 +2587,12 @@ func Test_compactionTrigger_generatePlans(t *testing.T) {
Binlogs: []*datapb.FieldBinlog{ Binlogs: []*datapb.FieldBinlog{
{ {
Binlogs: []*datapb.Binlog{ Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 1}, {
EntriesNum: 5,
LogID: 1,
TimestampFrom: 1000,
TimestampTo: 2000,
},
}, },
}, },
}, },
@ -2759,7 +2620,13 @@ func Test_compactionTrigger_generatePlans(t *testing.T) {
Binlogs: []*datapb.FieldBinlog{ Binlogs: []*datapb.FieldBinlog{
{ {
Binlogs: []*datapb.Binlog{ Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 2, MemorySize: 3 * 1024 * 1024 * 1024}, {
EntriesNum: 5,
LogID: 2,
MemorySize: 3 * 1024 * 1024 * 1024,
TimestampFrom: 2000,
TimestampTo: 3000,
},
}, },
}, },
}, },
@ -2845,7 +2712,7 @@ func Test_compactionTrigger_generatePlans(t *testing.T) {
{ {
name: "force trigger on large segments", name: "force trigger on large segments",
fields: fields{ fields: fields{
&meta{ meta: &meta{
catalog: catalog, catalog: catalog,
channelCPs: newChannelCps(), channelCPs: newChannelCps(),
segments: &SegmentsInfo{ segments: &SegmentsInfo{
@ -2889,10 +2756,10 @@ func Test_compactionTrigger_generatePlans(t *testing.T) {
}, },
collections: collections, collections: collections,
}, },
mock0Allocator, allocator: mock0Allocator,
nil, signals: nil,
&spyCompactionInspector{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)}, inspector: &spyCompactionInspector{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)},
nil, globalTrigger: nil,
}, },
args: args{ args: args{
segments: []*SegmentInfo{seg1, seg2}, segments: []*SegmentInfo{seg1, seg2},
@ -2926,3 +2793,305 @@ func Test_compactionTrigger_generatePlans(t *testing.T) {
}) })
} }
} }
func Test_compactionTrigger_generatePlansByTime(t *testing.T) {
catalog := mocks.NewDataCoordCatalog(t)
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe()
vecFieldID := int64(201)
indexID := int64(1001)
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: vecFieldID,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
}
mock0Allocator := newMock0Allocator(t)
seg1 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{
EntriesNum: 5,
LogID: 1,
TimestampFrom: 1000,
TimestampTo: 2000,
},
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 1, MemorySize: 1 * 1024 * 1024},
},
},
},
IsSorted: true,
},
}
seg2 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{
EntriesNum: 5,
LogID: 2,
TimestampFrom: 1000,
TimestampTo: 2000,
},
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 2},
},
},
},
IsSorted: true,
},
}
seg3 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{
EntriesNum: 5,
LogID: 3,
TimestampFrom: 3000,
TimestampTo: 4000,
},
},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogID: 3},
},
},
},
IsSorted: true,
},
}
type fields struct {
meta *meta
allocator allocator.Allocator
signals chan *compactionSignal
inspector CompactionInspector
globalTrigger *time.Ticker
}
type args struct {
segments []*SegmentInfo
signal *compactionSignal
compactTime *compactTime
expectedSize int64
}
collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()
collections.Insert(2, &collectionInfo{
ID: 2,
Schema: schema,
Properties: map[string]string{
common.CollectionTTLConfigKey: "0",
},
})
segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]()
segIdx0 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx0.Insert(indexID, &model.SegmentIndex{
SegmentID: 1,
CollectionID: 2,
PartitionID: 1,
NumRows: 100,
IndexID: indexID,
BuildID: 1,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 0,
IndexFileKeys: nil,
IndexSerializedSize: 0,
WriteHandoff: false,
})
segIndexes.Insert(1, segIdx0)
segIdx1 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx1.Insert(indexID, &model.SegmentIndex{
SegmentID: 2,
CollectionID: 2,
PartitionID: 1,
NumRows: 100,
IndexID: indexID,
BuildID: 2,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 0,
IndexFileKeys: nil,
IndexSerializedSize: 0,
WriteHandoff: false,
})
segIndexes.Insert(2, segIdx1)
segIdx2 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]()
segIdx2.Insert(indexID, &model.SegmentIndex{
SegmentID: 3,
CollectionID: 2,
PartitionID: 1,
NumRows: 100,
IndexID: indexID,
BuildID: 3,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreatedUTCTime: 0,
IndexFileKeys: nil,
IndexSerializedSize: 0,
WriteHandoff: false,
})
segIndexes.Insert(3, segIdx2)
tests := []struct {
name string
fields fields
args args
want []*typeutil.Pair[int64, []int64]
}{
{
name: "test time-based compaction",
fields: fields{
meta: &meta{
catalog: catalog,
channelCPs: newChannelCps(),
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: seg1,
2: seg2,
3: seg3,
},
secondaryIndexes: segmentInfoIndexes{
coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
2: {
seg1.GetID(): seg1,
seg2.GetID(): seg2,
seg3.GetID(): seg3,
},
},
},
},
indexMeta: &indexMeta{
segmentIndexes: segIndexes,
indexes: map[UniqueID]map[UniqueID]*model.Index{
2: {
indexID: {
TenantID: "",
CollectionID: 2,
FieldID: vecFieldID,
IndexID: indexID,
IndexName: "_default_idx",
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "HNSW",
},
},
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
},
collections: collections,
},
allocator: mock0Allocator,
signals: nil,
inspector: &spyCompactionInspector{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)},
globalTrigger: nil,
},
args: args{
segments: []*SegmentInfo{seg1, seg2, seg3},
signal: &compactionSignal{collectionID: 2, partitionID: 1, channel: "ch1", isForce: false},
compactTime: &compactTime{startTime: 1000, collectionTTL: time.Hour},
expectedSize: 1024 * 1024 * 1024,
},
want: []*typeutil.Pair[int64, []int64]{
{A: 300, B: []int64{1, 2, 3}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
inspector: tt.fields.inspector,
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
got := tr.generatePlans(tt.args.segments, tt.args.signal, tt.args.compactTime, tt.args.expectedSize)
for i, pair := range got {
t.Logf("got[%d]: totalRows=%d, segmentIDs=%v", i, pair.A, pair.B)
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("compactionTrigger.generatePlans() = %+v, want %+v", got, tt.want)
}
})
}
}

View File

@ -26,6 +26,8 @@ import (
type Sizable interface { type Sizable interface {
getSegmentSize() int64 getSegmentSize() int64
GetID() int64 GetID() int64
GetEarliestTs() uint64
GetResidualSegmentSize() int64
} }
type Knapsack[T Sizable] struct { type Knapsack[T Sizable] struct {
@ -33,12 +35,9 @@ type Knapsack[T Sizable] struct {
candidates []T candidates []T
} }
func newKnapsack[T Sizable](name string, candidates []T) *Knapsack[T] { func newKnapsack[T Sizable](name string, candidates []T, less func(T, T) bool) *Knapsack[T] {
sort.Slice(candidates, func(i, j int) bool { sort.Slice(candidates, func(i, j int) bool {
if candidates[i].getSegmentSize() != candidates[j].getSegmentSize() { return less(candidates[i], candidates[j])
return candidates[i].getSegmentSize() > candidates[j].getSegmentSize()
}
return candidates[i].GetID() < candidates[j].GetID()
}) })
return &Knapsack[T]{ return &Knapsack[T]{
name: name, name: name,
@ -53,9 +52,9 @@ func (c *Knapsack[T]) tryPack(size, maxLeftSize, minSegs, maxSegs int64) (bitset
if maxSegs == 0 { if maxSegs == 0 {
break break
} }
if segment.getSegmentSize() <= left { if segment.GetResidualSegmentSize() <= left {
selection.Set(uint(i)) selection.Set(uint(i))
left -= segment.getSegmentSize() left -= segment.GetResidualSegmentSize()
maxSegs-- maxSegs--
} }
} }
@ -118,6 +117,27 @@ func (c *Knapsack[T]) packWith(size, maxLeftSize, minSegs, maxSegs int64, other
return append(segs, otherSegs...), left return append(segs, otherSegs...), left
} }
func newSegmentPacker(name string, candidates []*SegmentInfo) *Knapsack[*SegmentInfo] { func newSegmentPacker(name string, candidates []*SegmentInfo, compactTime *compactTime) *Knapsack[*SegmentInfo] {
return newKnapsack(name, candidates) if compactTime != nil && compactTime.collectionTTL > 0 {
return newKnapsack(name, candidates, func(a, b *SegmentInfo) bool {
if a.GetEarliestTs() != b.GetEarliestTs() {
return a.GetEarliestTs() < b.GetEarliestTs()
} else if a.GetResidualSegmentSize() != b.GetResidualSegmentSize() {
return a.GetResidualSegmentSize() > b.GetResidualSegmentSize()
} else if a.getSegmentSize() != b.getSegmentSize() {
return a.getSegmentSize() > b.getSegmentSize()
}
return a.GetID() < b.GetID()
})
}
return newKnapsack(name, candidates, func(a, b *SegmentInfo) bool {
if a.GetResidualSegmentSize() != b.GetResidualSegmentSize() {
return a.GetResidualSegmentSize() > b.GetResidualSegmentSize()
} else if a.getSegmentSize() != b.getSegmentSize() {
return a.getSegmentSize() > b.getSegmentSize()
} else if a.GetEarliestTs() != b.GetEarliestTs() {
return a.GetEarliestTs() < b.GetEarliestTs()
}
return a.GetID() < b.GetID()
})
} }

View File

@ -23,8 +23,9 @@ import (
) )
type element struct { type element struct {
size int64 size int64
id int64 id int64
residualSize int64
} }
var _ Sizable = (*element)(nil) var _ Sizable = (*element)(nil)
@ -33,6 +34,14 @@ func (e *element) getSegmentSize() int64 {
return e.size return e.size
} }
func (e *element) GetResidualSegmentSize() int64 {
return e.size
}
func (e *element) GetEarliestTs() uint64 {
return 0
}
func (e *element) GetID() int64 { func (e *element) GetID() int64 {
return e.id return e.id
} }
@ -110,10 +119,133 @@ func Test_pack(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
p := newKnapsack[*element](tt.name, tt.candidates) p := newKnapsack[*element](tt.name, tt.candidates, func(a, b *element) bool {
if a.GetResidualSegmentSize() != b.GetResidualSegmentSize() {
return a.GetResidualSegmentSize() > b.GetResidualSegmentSize()
} else if a.getSegmentSize() != b.getSegmentSize() {
return a.getSegmentSize() > b.getSegmentSize()
} else if a.GetEarliestTs() != b.GetEarliestTs() {
return a.GetEarliestTs() < b.GetEarliestTs()
}
return a.GetID() < b.GetID()
})
got, left := p.pack(tt.args.size, tt.args.leftSize, tt.args.minSegs, tt.args.maxSegs) got, left := p.pack(tt.args.size, tt.args.leftSize, tt.args.minSegs, tt.args.maxSegs)
assert.Equal(t, tt.want, got) assert.Equal(t, tt.want, got)
assert.Equal(t, tt.wantLeft, left) assert.Equal(t, tt.wantLeft, left)
}) })
} }
} }
type timeElement struct {
element
earliestTs uint64
residualSize int64
}
func (e *timeElement) GetEarliestTs() uint64 { return e.earliestTs }
func (e *timeElement) GetResidualSegmentSize() int64 { return e.residualSize }
var _ Sizable = (*timeElement)(nil)
func Test_newKnapsackTimeBased(t *testing.T) {
type args struct {
size int64
leftSize int64
minSegs int64
maxSegs int64
}
mockTimeElements := func(params ...struct {
size int64
earliestTs uint64
residualSize int64
},
) []*timeElement {
var out []*timeElement
for i, p := range params {
out = append(out, &timeElement{
element: element{size: p.size, id: int64(i)},
earliestTs: p.earliestTs,
residualSize: p.residualSize,
})
}
return out
}
tests := []struct {
name string
candidates []*timeElement
args args
init_order []int64
pack_order []int64
}{
{
name: "sort by earliestTs",
candidates: mockTimeElements(
struct {
size int64
earliestTs uint64
residualSize int64
}{size: 10, earliestTs: 10, residualSize: 5},
struct {
size int64
earliestTs uint64
residualSize int64
}{size: 20, earliestTs: 5, residualSize: 8},
struct {
size int64
earliestTs uint64
residualSize int64
}{size: 30, earliestTs: 5, residualSize: 8},
),
args: args{size: 60, leftSize: 60, minSegs: 1, maxSegs: 3},
init_order: []int64{2, 1, 0},
pack_order: []int64{2, 1, 0},
},
{
name: "sort by residualSize",
candidates: mockTimeElements(
struct {
size int64
earliestTs uint64
residualSize int64
}{size: 100, earliestTs: 5, residualSize: 80},
struct {
size int64
earliestTs uint64
residualSize int64
}{size: 101, earliestTs: 5, residualSize: 60},
struct {
size int64
earliestTs uint64
residualSize int64
}{size: 102, earliestTs: 5, residualSize: 60},
),
args: args{size: 150, leftSize: 100, minSegs: 1, maxSegs: 3},
init_order: []int64{0, 2, 1},
pack_order: []int64{0, 2},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := newKnapsack[*timeElement](tt.name, tt.candidates, func(a, b *timeElement) bool {
if a.GetEarliestTs() != b.GetEarliestTs() {
return a.GetEarliestTs() < b.GetEarliestTs()
} else if a.GetResidualSegmentSize() != b.GetResidualSegmentSize() {
return a.GetResidualSegmentSize() > b.GetResidualSegmentSize()
} else if a.getSegmentSize() != b.getSegmentSize() {
return a.getSegmentSize() > b.getSegmentSize()
}
return a.GetID() < b.GetID()
})
for i, candidate := range p.candidates {
assert.Equal(t, tt.init_order[i], candidate.id)
}
got, _ := p.pack(tt.args.size, tt.args.leftSize, tt.args.minSegs, tt.args.maxSegs)
assert.Equal(t, len(tt.pack_order), len(got))
for i, candidate := range got {
assert.Equal(t, tt.pack_order[i], candidate.id)
}
})
}
}

View File

@ -18,6 +18,7 @@ package datacoord
import ( import (
"fmt" "fmt"
"math"
"runtime/debug" "runtime/debug"
"time" "time"
@ -56,12 +57,46 @@ type SegmentInfo struct {
// a cache to avoid calculate twice // a cache to avoid calculate twice
size atomic.Int64 size atomic.Int64
deltaRowcount atomic.Int64 deltaRowcount atomic.Int64
earliestTs atomic.Uint64
lastWrittenTime time.Time lastWrittenTime time.Time
// It is only to ensure mutual exclusion between L0 compacting and stats tasks // It is only to ensure mutual exclusion between L0 compacting and stats tasks
isStating bool isStating bool
} }
func (s *SegmentInfo) GetResidualSegmentSize() int64 {
if s.GetNumOfRows() == 0 {
return 0
}
totalDeletedRows := 0
for _, deltaLogs := range s.GetDeltalogs() {
for _, l := range deltaLogs.GetBinlogs() {
totalDeletedRows += int(l.GetEntriesNum())
}
}
deltaRatio := float64(totalDeletedRows) / float64(s.GetNumOfRows())
if deltaRatio >= 1.0 {
// segments with too many deleted rows should be considered as prioritized segments and be compacted definitely
return s.getSegmentSize()
}
residualRatio := 1.0 - deltaRatio
return int64(residualRatio * float64(s.getSegmentSize()))
}
func (s *SegmentInfo) GetEarliestTs() uint64 {
if s.earliestTs.Load() == 0 {
var earliestFromTs uint64 = math.MaxUint64
for _, binlogs := range s.GetBinlogs() {
for _, l := range binlogs.GetBinlogs() {
earliestFromTs = min(earliestFromTs, l.TimestampFrom)
}
}
s.earliestTs.Store(earliestFromTs)
}
return s.earliestTs.Load()
}
// NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo` // NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo`
// assign current rows to last checkpoint and pre-allocate `allocations` slice // assign current rows to last checkpoint and pre-allocate `allocations` slice
// Note that the allocation information is not preserved, // Note that the allocation information is not preserved,

View File

@ -916,7 +916,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele
zap.String("db", request.DbName), zap.String("db", request.DbName),
zap.String("collection", request.CollectionName)) zap.String("collection", request.CollectionName))
log.Debug(rpcReceived(method)) log.Info(rpcReceived(method))
if err := node.sched.ddQueue.Enqueue(rct); err != nil { if err := node.sched.ddQueue.Enqueue(rct); err != nil {
log.Warn( log.Warn(
@ -928,7 +928,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele
return merr.Status(err), nil return merr.Status(err), nil
} }
log.Debug( log.Info(
rpcEnqueued(method), rpcEnqueued(method),
zap.Uint64("BeginTS", rct.BeginTs()), zap.Uint64("BeginTS", rct.BeginTs()),
zap.Uint64("EndTS", rct.EndTs())) zap.Uint64("EndTS", rct.EndTs()))
@ -945,7 +945,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele
return merr.Status(err), nil return merr.Status(err), nil
} }
log.Debug( log.Info(
rpcDone(method), rpcDone(method),
zap.Uint64("BeginTS", rct.BeginTs()), zap.Uint64("BeginTS", rct.BeginTs()),
zap.Uint64("EndTS", rct.EndTs())) zap.Uint64("EndTS", rct.EndTs()))

View File

@ -247,7 +247,7 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR
} }
} }
// If poped idx reaches end of segment, invalidate cache and advance to next segment // If poped idx reaches end of segment, invalidate cache and advance to next record
if idx.i == recs[idx.ri].Len()-1 { if idx.i == recs[idx.ri].Len()-1 {
err := advanceRecord(idx.ri) err := advanceRecord(idx.ri)
if err == io.EOF { if err == io.EOF {