diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 27cf286f57..a43c78e656 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -458,18 +458,14 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa } buckets := [][]*SegmentInfo{} - toUpdate := newSegmentPacker("update", prioritizedCandidates) - toMerge := newSegmentPacker("merge", smallCandidates) - toPack := newSegmentPacker("pack", nonPlannedSegments) + toUpdate := newSegmentPacker("update", prioritizedCandidates, compactTime) + toMerge := newSegmentPacker("merge", smallCandidates, compactTime) maxSegs := int64(4096) // Deprecate the max segment limit since it is irrelevant in simple compactions. minSegs := Params.DataCoordCfg.MinSegmentToMerge.GetAsInt64() compactableProportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat() satisfiedSize := int64(float64(expectedSize) * compactableProportion) - expantionRate := Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat() maxLeftSize := expectedSize - satisfiedSize - expectedExpandedSize := int64(float64(expectedSize) * expantionRate) - maxExpandedLeftSize := expectedExpandedSize - satisfiedSize reasons := make([]string, 0) // 1. 
Merge small segments if they can make a full bucket for { @@ -492,11 +488,12 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa reasons = append(reasons, fmt.Sprintf("packing %d prioritized segments", len(pack))) buckets = append(buckets, pack) } - // if there is any segment toUpdate left, its size must greater than expectedSize, add it to the buckets + // if there is any segment toUpdate left, its size must be greater than expectedSize, add it to the buckets for _, s := range toUpdate.candidates { buckets = append(buckets, []*SegmentInfo{s}) reasons = append(reasons, fmt.Sprintf("force packing prioritized segment %d", s.GetID())) } + // 2.+ legacy: squeeze small segments // Try merge all small segments, and then squeeze for { @@ -507,18 +504,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa reasons = append(reasons, fmt.Sprintf("packing all %d small segments", len(pack))) buckets = append(buckets, pack) } - remaining := t.squeezeSmallSegmentsToBuckets(toMerge.candidates, buckets, expectedSize) - toMerge = newSegmentPacker("merge", remaining) - - // 3. 
pack remaining small segments with non-planned segments - for { - pack, _ := toMerge.packWith(expectedExpandedSize, maxExpandedLeftSize, minSegs, maxSegs, toPack) - if len(pack) == 0 { - break - } - reasons = append(reasons, fmt.Sprintf("packing %d small segments and non-planned segments", len(pack))) - buckets = append(buckets, pack) - } + smallRemaining := t.squeezeSmallSegmentsToBuckets(toMerge.candidates, buckets, expectedSize) tasks := make([]*typeutil.Pair[int64, []int64], len(buckets)) for i, b := range buckets { @@ -540,6 +526,13 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa zap.Int("nonPlannedSegments", len(nonPlannedSegments)), zap.Strings("reasons", reasons)) } + if len(smallRemaining) > 0 { + log.RatedInfo(300, "remain small segments", + zap.Int64("collectionID", signal.collectionID), + zap.Int64("partitionID", signal.partitionID), + zap.String("channel", signal.channel), + zap.Int("smallRemainingCount", len(smallRemaining))) + } return tasks } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 7f84a1d591..2c08279bed 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -1509,150 +1509,6 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { } } -func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { - type fields struct { - meta *meta - allocator allocator.Allocator - signals chan *compactionSignal - inspector CompactionInspector - globalTrigger *time.Ticker - } - type args struct { - collectionID int64 - compactTime *compactTime - } - vecFieldID := int64(201) - - genSegIndex := func(segID, indexID UniqueID, numRows int64) *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex] { - segIdx := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]() - segIdx.Insert(indexID, &model.SegmentIndex{ - SegmentID: segID, - CollectionID: 2, - PartitionID: 1, - NumRows: numRows, - IndexID: 
indexID, - BuildID: segID, - NodeID: 0, - IndexVersion: 1, - IndexState: commonpb.IndexState_Finished, - }) - return segIdx - } - im := &indexMeta{ - segmentIndexes: typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]](), - indexes: map[UniqueID]map[UniqueID]*model.Index{ - 2: { - indexID: { - TenantID: "", - CollectionID: 2, - FieldID: vecFieldID, - IndexID: indexID, - IndexName: "_default_idx", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: common.IndexTypeKey, - Value: "HNSW", - }, - }, - IsAutoIndex: false, - UserIndexParams: nil, - }, - }, - }, - } - im.segmentIndexes.Insert(1, genSegIndex(1, indexID, 20)) - im.segmentIndexes.Insert(2, genSegIndex(2, indexID, 20)) - im.segmentIndexes.Insert(3, genSegIndex(3, indexID, 20)) - im.segmentIndexes.Insert(4, genSegIndex(4, indexID, 20)) - im.segmentIndexes.Insert(5, genSegIndex(5, indexID, 20)) - im.segmentIndexes.Insert(6, genSegIndex(6, indexID, 20)) - mock0Allocator := newMockAllocator(t) - - collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() - collections.Insert(2, &collectionInfo{ - ID: 2, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - }, - }, - }, - }) - - tests := []struct { - name string - fields fields - args args - wantErr bool - wantPlans []*datapb.CompactionPlan - }{ - { - "test small segment", - fields{ - &meta{ - channelCPs: newChannelCps(), - segments: mockSegmentsInfo(600, 600, 600, 600, 260, 260), - indexMeta: im, - collections: collections, - }, - mock0Allocator, - make(chan *compactionSignal, 1), - &spyCompactionInspector{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)}, - nil, - }, - args{ - 2, - &compactTime{}, - }, - false, - nil, - }, - } - for _, tt := range tests { - (tt.fields.inspector).(*spyCompactionInspector).meta = tt.fields.meta - t.Run(tt.name, func(t *testing.T) { - 
tt.fields.meta.channelCPs.checkpoints["ch1"] = &msgpb.MsgPosition{ - Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0), - MsgID: []byte{1, 2, 3, 4}, - } - tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: make(chan *compactionSignal, 100), - inspector: tt.fields.inspector, - globalTrigger: tt.fields.globalTrigger, - indexEngineVersionManager: newMockVersionManager(), - estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - closeCh: lifetime.NewSafeChan(), - testingOnly: true, - } - tr.start() - defer tr.stop() - _, err := tr.TriggerCompaction(context.TODO(), NewCompactionSignal()) - assert.Equal(t, tt.wantErr, err != nil) - spy := (tt.fields.inspector).(*spyCompactionInspector) - select { - case val := <-spy.spyChan: - // max size == 1000, expansion rate == 1.25. - // segment 5 and 6 are squeezed into a non-planned segment. Total size: 600 + 260 + 260 == 1120, - // which is greater than 1000 but smaller than 1000 * 1.25 - assert.Equal(t, len(val.SegmentBinlogs), 3) - return - case <-time.After(3 * time.Second): - assert.Fail(t, "failed to get plan") - return - } - }) - } -} - // Test segment compaction target size func Test_compactionTrigger_noplan_random_size(t *testing.T) { type fields struct { @@ -2731,7 +2587,12 @@ func Test_compactionTrigger_generatePlans(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogID: 1}, + { + EntriesNum: 5, + LogID: 1, + TimestampFrom: 1000, + TimestampTo: 2000, + }, }, }, }, @@ -2759,7 +2620,13 @@ func Test_compactionTrigger_generatePlans(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogID: 2, MemorySize: 3 * 1024 * 1024 * 1024}, + { + EntriesNum: 5, + LogID: 2, + MemorySize: 3 * 1024 * 1024 * 1024, + TimestampFrom: 2000, + TimestampTo: 3000, + }, }, }, }, @@ -2845,7 +2712,7 @@ func 
Test_compactionTrigger_generatePlans(t *testing.T) { { name: "force trigger on large segments", fields: fields{ - &meta{ + meta: &meta{ catalog: catalog, channelCPs: newChannelCps(), segments: &SegmentsInfo{ @@ -2889,10 +2756,10 @@ func Test_compactionTrigger_generatePlans(t *testing.T) { }, collections: collections, }, - mock0Allocator, - nil, - &spyCompactionInspector{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)}, - nil, + allocator: mock0Allocator, + signals: nil, + inspector: &spyCompactionInspector{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)}, + globalTrigger: nil, }, args: args{ segments: []*SegmentInfo{seg1, seg2}, @@ -2926,3 +2793,305 @@ func Test_compactionTrigger_generatePlans(t *testing.T) { }) } } + +func Test_compactionTrigger_generatePlansByTime(t *testing.T) { + catalog := mocks.NewDataCoordCatalog(t) + catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe() + + vecFieldID := int64(201) + indexID := int64(1001) + + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + } + + mock0Allocator := newMock0Allocator(t) + + seg1 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 2, + PartitionID: 1, + LastExpireTime: 100, + NumOfRows: 100, + MaxRowNum: 300, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 5, + LogID: 1, + TimestampFrom: 1000, + TimestampTo: 2000, + }, + }, + }, + }, + Deltalogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {EntriesNum: 5, LogID: 1, MemorySize: 1 * 1024 * 1024}, + }, + }, + }, + IsSorted: true, + }, + } + + seg2 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + CollectionID: 2, + PartitionID: 1, + LastExpireTime: 100, + NumOfRows: 100, + 
MaxRowNum: 300, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 5, + LogID: 2, + TimestampFrom: 1000, + TimestampTo: 2000, + }, + }, + }, + }, + Deltalogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {EntriesNum: 5, LogID: 2}, + }, + }, + }, + IsSorted: true, + }, + } + + seg3 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 3, + CollectionID: 2, + PartitionID: 1, + LastExpireTime: 100, + NumOfRows: 100, + MaxRowNum: 300, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Binlogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 5, + LogID: 3, + TimestampFrom: 3000, + TimestampTo: 4000, + }, + }, + }, + }, + Deltalogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {EntriesNum: 5, LogID: 3}, + }, + }, + }, + IsSorted: true, + }, + } + + type fields struct { + meta *meta + allocator allocator.Allocator + signals chan *compactionSignal + inspector CompactionInspector + globalTrigger *time.Ticker + } + type args struct { + segments []*SegmentInfo + signal *compactionSignal + compactTime *compactTime + expectedSize int64 + } + + collections := typeutil.NewConcurrentMap[UniqueID, *collectionInfo]() + collections.Insert(2, &collectionInfo{ + ID: 2, + Schema: schema, + Properties: map[string]string{ + common.CollectionTTLConfigKey: "0", + }, + }) + + segIndexes := typeutil.NewConcurrentMap[UniqueID, *typeutil.ConcurrentMap[UniqueID, *model.SegmentIndex]]() + segIdx0 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]() + segIdx0.Insert(indexID, &model.SegmentIndex{ + SegmentID: 1, + CollectionID: 2, + PartitionID: 1, + NumRows: 100, + IndexID: indexID, + BuildID: 1, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreatedUTCTime: 0, + IndexFileKeys: nil, + IndexSerializedSize: 0, + WriteHandoff: false, + }) + 
segIndexes.Insert(1, segIdx0) + segIdx1 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]() + segIdx1.Insert(indexID, &model.SegmentIndex{ + SegmentID: 2, + CollectionID: 2, + PartitionID: 1, + NumRows: 100, + IndexID: indexID, + BuildID: 2, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreatedUTCTime: 0, + IndexFileKeys: nil, + IndexSerializedSize: 0, + WriteHandoff: false, + }) + segIndexes.Insert(2, segIdx1) + segIdx2 := typeutil.NewConcurrentMap[UniqueID, *model.SegmentIndex]() + segIdx2.Insert(indexID, &model.SegmentIndex{ + SegmentID: 3, + CollectionID: 2, + PartitionID: 1, + NumRows: 100, + IndexID: indexID, + BuildID: 3, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreatedUTCTime: 0, + IndexFileKeys: nil, + IndexSerializedSize: 0, + WriteHandoff: false, + }) + segIndexes.Insert(3, segIdx2) + + tests := []struct { + name string + fields fields + args args + want []*typeutil.Pair[int64, []int64] + }{ + { + name: "test time-based compaction", + fields: fields{ + meta: &meta{ + catalog: catalog, + channelCPs: newChannelCps(), + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + 1: seg1, + 2: seg2, + 3: seg3, + }, + secondaryIndexes: segmentInfoIndexes{ + coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{ + 2: { + seg1.GetID(): seg1, + seg2.GetID(): seg2, + seg3.GetID(): seg3, + }, + }, + }, + }, + indexMeta: &indexMeta{ + segmentIndexes: segIndexes, + indexes: map[UniqueID]map[UniqueID]*model.Index{ + 2: { + indexID: { + TenantID: "", + CollectionID: 2, + FieldID: vecFieldID, + IndexID: indexID, + IndexName: "_default_idx", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "HNSW", + }, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + }, + }, + }, + collections: collections, + }, + allocator: 
mock0Allocator, + signals: nil, + inspector: &spyCompactionInspector{t: t, spyChan: make(chan *datapb.CompactionPlan, 1)}, + globalTrigger: nil, + }, + args: args{ + segments: []*SegmentInfo{seg1, seg2, seg3}, + signal: &compactionSignal{collectionID: 2, partitionID: 1, channel: "ch1", isForce: false}, + compactTime: &compactTime{startTime: 1000, collectionTTL: time.Hour}, + expectedSize: 1024 * 1024 * 1024, + }, + want: []*typeutil.Pair[int64, []int64]{ + {A: 300, B: []int64{1, 2, 3}}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tr := &compactionTrigger{ + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: tt.fields.signals, + inspector: tt.fields.inspector, + globalTrigger: tt.fields.globalTrigger, + estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, + estimateNonDiskSegmentPolicy: calBySchemaPolicy, + closeCh: lifetime.NewSafeChan(), + testingOnly: true, + } + + got := tr.generatePlans(tt.args.segments, tt.args.signal, tt.args.compactTime, tt.args.expectedSize) + for i, pair := range got { + t.Logf("got[%d]: totalRows=%d, segmentIDs=%v", i, pair.A, pair.B) + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("compactionTrigger.generatePlans() = %+v, want %+v", got, tt.want) + } + }) + } +} diff --git a/internal/datacoord/knapsack.go b/internal/datacoord/knapsack.go index 23a9bb9ee1..bbbc8d3a9b 100644 --- a/internal/datacoord/knapsack.go +++ b/internal/datacoord/knapsack.go @@ -26,6 +26,8 @@ import ( type Sizable interface { getSegmentSize() int64 GetID() int64 + GetEarliestTs() uint64 + GetResidualSegmentSize() int64 } type Knapsack[T Sizable] struct { @@ -33,12 +35,9 @@ type Knapsack[T Sizable] struct { candidates []T } -func newKnapsack[T Sizable](name string, candidates []T) *Knapsack[T] { +func newKnapsack[T Sizable](name string, candidates []T, less func(T, T) bool) *Knapsack[T] { sort.Slice(candidates, func(i, j int) bool { - if 
candidates[i].getSegmentSize() != candidates[j].getSegmentSize() { - return candidates[i].getSegmentSize() > candidates[j].getSegmentSize() - } - return candidates[i].GetID() < candidates[j].GetID() + return less(candidates[i], candidates[j]) }) return &Knapsack[T]{ name: name, @@ -53,9 +52,9 @@ func (c *Knapsack[T]) tryPack(size, maxLeftSize, minSegs, maxSegs int64) (bitset if maxSegs == 0 { break } - if segment.getSegmentSize() <= left { + if segment.GetResidualSegmentSize() <= left { selection.Set(uint(i)) - left -= segment.getSegmentSize() + left -= segment.GetResidualSegmentSize() maxSegs-- } } @@ -118,6 +117,27 @@ func (c *Knapsack[T]) packWith(size, maxLeftSize, minSegs, maxSegs int64, other return append(segs, otherSegs...), left } -func newSegmentPacker(name string, candidates []*SegmentInfo) *Knapsack[*SegmentInfo] { - return newKnapsack(name, candidates) +func newSegmentPacker(name string, candidates []*SegmentInfo, compactTime *compactTime) *Knapsack[*SegmentInfo] { + if compactTime != nil && compactTime.collectionTTL > 0 { + return newKnapsack(name, candidates, func(a, b *SegmentInfo) bool { + if a.GetEarliestTs() != b.GetEarliestTs() { + return a.GetEarliestTs() < b.GetEarliestTs() + } else if a.GetResidualSegmentSize() != b.GetResidualSegmentSize() { + return a.GetResidualSegmentSize() > b.GetResidualSegmentSize() + } else if a.getSegmentSize() != b.getSegmentSize() { + return a.getSegmentSize() > b.getSegmentSize() + } + return a.GetID() < b.GetID() + }) + } + return newKnapsack(name, candidates, func(a, b *SegmentInfo) bool { + if a.GetResidualSegmentSize() != b.GetResidualSegmentSize() { + return a.GetResidualSegmentSize() > b.GetResidualSegmentSize() + } else if a.getSegmentSize() != b.getSegmentSize() { + return a.getSegmentSize() > b.getSegmentSize() + } else if a.GetEarliestTs() != b.GetEarliestTs() { + return a.GetEarliestTs() < b.GetEarliestTs() + } + return a.GetID() < b.GetID() + }) } diff --git a/internal/datacoord/knapsack_test.go 
b/internal/datacoord/knapsack_test.go index 0bcae99d25..2b083ff649 100644 --- a/internal/datacoord/knapsack_test.go +++ b/internal/datacoord/knapsack_test.go @@ -23,8 +23,9 @@ import ( ) type element struct { - size int64 - id int64 + size int64 + id int64 + residualSize int64 } var _ Sizable = (*element)(nil) @@ -33,6 +34,14 @@ func (e *element) getSegmentSize() int64 { return e.size } +func (e *element) GetResidualSegmentSize() int64 { + return e.size +} + +func (e *element) GetEarliestTs() uint64 { + return 0 +} + func (e *element) GetID() int64 { return e.id } @@ -110,10 +119,133 @@ func Test_pack(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := newKnapsack[*element](tt.name, tt.candidates) + p := newKnapsack[*element](tt.name, tt.candidates, func(a, b *element) bool { + if a.GetResidualSegmentSize() != b.GetResidualSegmentSize() { + return a.GetResidualSegmentSize() > b.GetResidualSegmentSize() + } else if a.getSegmentSize() != b.getSegmentSize() { + return a.getSegmentSize() > b.getSegmentSize() + } else if a.GetEarliestTs() != b.GetEarliestTs() { + return a.GetEarliestTs() < b.GetEarliestTs() + } + return a.GetID() < b.GetID() + }) got, left := p.pack(tt.args.size, tt.args.leftSize, tt.args.minSegs, tt.args.maxSegs) assert.Equal(t, tt.want, got) assert.Equal(t, tt.wantLeft, left) }) } } + +type timeElement struct { + element + earliestTs uint64 + residualSize int64 +} + +func (e *timeElement) GetEarliestTs() uint64 { return e.earliestTs } +func (e *timeElement) GetResidualSegmentSize() int64 { return e.residualSize } + +var _ Sizable = (*timeElement)(nil) + +func Test_newKnapsackTimeBased(t *testing.T) { + type args struct { + size int64 + leftSize int64 + minSegs int64 + maxSegs int64 + } + mockTimeElements := func(params ...struct { + size int64 + earliestTs uint64 + residualSize int64 + }, + ) []*timeElement { + var out []*timeElement + for i, p := range params { + out = append(out, &timeElement{ + element: 
element{size: p.size, id: int64(i)}, + earliestTs: p.earliestTs, + residualSize: p.residualSize, + }) + } + return out + } + + tests := []struct { + name string + candidates []*timeElement + args args + init_order []int64 + pack_order []int64 + }{ + { + name: "sort by earliestTs", + candidates: mockTimeElements( + struct { + size int64 + earliestTs uint64 + residualSize int64 + }{size: 10, earliestTs: 10, residualSize: 5}, + struct { + size int64 + earliestTs uint64 + residualSize int64 + }{size: 20, earliestTs: 5, residualSize: 8}, + struct { + size int64 + earliestTs uint64 + residualSize int64 + }{size: 30, earliestTs: 5, residualSize: 8}, + ), + args: args{size: 60, leftSize: 60, minSegs: 1, maxSegs: 3}, + init_order: []int64{2, 1, 0}, + pack_order: []int64{2, 1, 0}, + }, + { + name: "sort by residualSize", + candidates: mockTimeElements( + struct { + size int64 + earliestTs uint64 + residualSize int64 + }{size: 100, earliestTs: 5, residualSize: 80}, + struct { + size int64 + earliestTs uint64 + residualSize int64 + }{size: 101, earliestTs: 5, residualSize: 60}, + struct { + size int64 + earliestTs uint64 + residualSize int64 + }{size: 102, earliestTs: 5, residualSize: 60}, + ), + args: args{size: 150, leftSize: 100, minSegs: 1, maxSegs: 3}, + init_order: []int64{0, 2, 1}, + pack_order: []int64{0, 2}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := newKnapsack[*timeElement](tt.name, tt.candidates, func(a, b *timeElement) bool { + if a.GetEarliestTs() != b.GetEarliestTs() { + return a.GetEarliestTs() < b.GetEarliestTs() + } else if a.GetResidualSegmentSize() != b.GetResidualSegmentSize() { + return a.GetResidualSegmentSize() > b.GetResidualSegmentSize() + } else if a.getSegmentSize() != b.getSegmentSize() { + return a.getSegmentSize() > b.getSegmentSize() + } + return a.GetID() < b.GetID() + }) + for i, candidate := range p.candidates { + assert.Equal(t, tt.init_order[i], candidate.id) + } + got, _ := p.pack(tt.args.size, 
tt.args.leftSize, tt.args.minSegs, tt.args.maxSegs) + assert.Equal(t, len(tt.pack_order), len(got)) + for i, candidate := range got { + assert.Equal(t, tt.pack_order[i], candidate.id) + } + }) + } +} diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 3af851a3d4..14004f9da9 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -18,6 +18,7 @@ package datacoord import ( "fmt" + "math" "runtime/debug" "time" @@ -56,12 +57,46 @@ type SegmentInfo struct { // a cache to avoid calculate twice size atomic.Int64 deltaRowcount atomic.Int64 + earliestTs atomic.Uint64 lastWrittenTime time.Time // It is only to ensure mutual exclusion between L0 compacting and stats tasks isStating bool } +func (s *SegmentInfo) GetResidualSegmentSize() int64 { + if s.GetNumOfRows() == 0 { + return 0 + } + totalDeletedRows := 0 + for _, deltaLogs := range s.GetDeltalogs() { + for _, l := range deltaLogs.GetBinlogs() { + totalDeletedRows += int(l.GetEntriesNum()) + } + } + + deltaRatio := float64(totalDeletedRows) / float64(s.GetNumOfRows()) + if deltaRatio >= 1.0 { + // segments with too many deleted rows should be considered as prioritized segments and be compacted definitely + return s.getSegmentSize() + } + residualRatio := 1.0 - deltaRatio + return int64(residualRatio * float64(s.getSegmentSize())) +} + +func (s *SegmentInfo) GetEarliestTs() uint64 { + if s.earliestTs.Load() == 0 { + var earliestFromTs uint64 = math.MaxUint64 + for _, binlogs := range s.GetBinlogs() { + for _, l := range binlogs.GetBinlogs() { + earliestFromTs = min(earliestFromTs, l.TimestampFrom) + } + } + s.earliestTs.Store(earliestFromTs) + } + return s.earliestTs.Load() +} + // NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo` // assign current rows to last checkpoint and pre-allocate `allocations` slice // Note that the allocation information is not preserved, diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go 
index f6eef71207..2184228803 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -916,7 +916,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - log.Debug(rpcReceived(method)) + log.Info(rpcReceived(method)) if err := node.sched.ddQueue.Enqueue(rct); err != nil { log.Warn( @@ -928,7 +928,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele return merr.Status(err), nil } - log.Debug( + log.Info( rpcEnqueued(method), zap.Uint64("BeginTS", rct.BeginTs()), zap.Uint64("EndTS", rct.EndTs())) @@ -945,7 +945,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele return merr.Status(err), nil } - log.Debug( + log.Info( rpcDone(method), zap.Uint64("BeginTS", rct.BeginTs()), zap.Uint64("EndTS", rct.EndTs())) diff --git a/internal/storage/sort.go b/internal/storage/sort.go index cb005e5008..9dbb8384af 100644 --- a/internal/storage/sort.go +++ b/internal/storage/sort.go @@ -247,7 +247,7 @@ func MergeSort(batchSize uint64, schema *schemapb.CollectionSchema, rr []RecordR } } - // If poped idx reaches end of segment, invalidate cache and advance to next segment + // If popped idx reaches end of segment, invalidate cache and advance to next record if idx.i == recs[idx.ri].Len()-1 { err := advanceRecord(idx.ri) if err == io.EOF {