diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index d485a65b90..7fa71809e3 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -23,7 +23,6 @@ import ( "sync" "time" - "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" @@ -35,7 +34,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/logutil" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -313,34 +311,17 @@ func (t *compactionTrigger) allocSignalID() (UniqueID, error) { return t.allocator.allocID(ctx) } -func (t *compactionTrigger) reCalcSegmentMaxNumOfRows(collectionID UniqueID, isDisk bool) (int, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - collMeta, err := t.handler.GetCollection(ctx, collectionID) - if err != nil { - return -1, fmt.Errorf("failed to get collection %d", collectionID) - } - if isDisk { - return t.estimateDiskSegmentPolicy(collMeta.Schema) - } - return t.estimateNonDiskSegmentPolicy(collMeta.Schema) -} - -// TODO: Updated segment info should be written back to meta and etcd, write in here without lock is very dangerous -func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, error) { - if len(segments) == 0 { - return false, nil - } - - collectionID := segments[0].GetCollectionID() - indexInfos := t.meta.indexMeta.GetIndexesForCollection(segments[0].GetCollectionID(), "") +func (t *compactionTrigger) getExpectedSegmentSize(collectionID int64) int64 { + indexInfos := t.meta.indexMeta.GetIndexesForCollection(collectionID, "") ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() collMeta, err := t.handler.GetCollection(ctx, collectionID) if err != nil { - return false, fmt.Errorf("failed to get collection %d", collectionID) + log.Warn("failed to get collection", zap.Int64("collectionID", collectionID), zap.Error(err)) + return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 } + vectorFields := typeutil.GetVectorFieldSchemas(collMeta.Schema) fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) { return t.FieldID, GetIndexType(t.IndexParams) @@ -352,49 +333,13 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, return false }) - updateSegments := func(segments []*SegmentInfo, newMaxRows int64, isDiskAnn bool) error { - for idx, segmentInfo := range segments { - if segmentInfo.GetMaxRowNum() != newMaxRows { - log.Info("segment max row recalculated", - zap.Int64("segmentID", segmentInfo.GetID()), - zap.Int64("old max rows", segmentInfo.GetMaxRowNum()), - zap.Int64("new max rows", newMaxRows), - zap.Bool("isDiskANN", isDiskAnn), - ) - err := t.meta.UpdateSegment(segmentInfo.GetID(), SetMaxRowCount(newMaxRows)) - if err != nil && !errors.Is(err, merr.ErrSegmentNotFound) { - return err - } - segments[idx] = t.meta.GetSegment(segmentInfo.GetID()) - } - } - return nil - } - allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex) if allDiskIndex { // Only if all vector fields index type are DiskANN, recalc segment max size here. - newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true) - if err != nil { - return false, err - } - err = updateSegments(segments, int64(newMaxRows), true) - if err != nil { - return false, err - } + return Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024 } // If some vector fields index type are not DiskANN, recalc segment max size using default policy. - if !allDiskIndex && !t.testingOnly { - newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, false) - if err != nil { - return allDiskIndex, err - } - err = updateSegments(segments, int64(newMaxRows), true) - if err != nil { - return false, err - } - } - return allDiskIndex, nil + return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 } func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { @@ -451,12 +396,6 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { group.segments = FilterInIndexedSegments(t.handler, t.meta, group.segments...) } - isDiskIndex, err := t.updateSegmentMaxSize(group.segments) - if err != nil { - log.Warn("failed to update segment max size", zap.Error(err)) - continue - } - coll, err := t.getCollection(group.collectionID) if err != nil { log.Warn("get collection info failed, skip handling compaction", zap.Error(err)) @@ -479,7 +418,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { return err } - plans := t.generatePlans(group.segments, signal.isForce, isDiskIndex, ct) + plans := t.generatePlans(group.segments, signal.isForce, ct) for _, plan := range plans { segIDs := fetchSegIDs(plan.GetSegmentBinlogs()) @@ -551,12 +490,6 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { return } - isDiskIndex, err := t.updateSegmentMaxSize(segments) - if err != nil { - log.Warn("failed to update segment max size", zap.Error(err)) - return - } - ts, err := t.allocTs() if err != nil { log.Warn("allocate ts failed, skip to handle compaction", zap.Int64("collectionID", signal.collectionID), @@ -589,7 +522,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { return } - plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct) + plans := t.generatePlans(segments, signal.isForce, ct) for _, plan := range plans { if t.compactionHandler.isFull() { log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID)) @@ -618,53 +551,57 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { } } -func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, isDiskIndex bool, compactTime *compactTime) []*datapb.CompactionPlan { +func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, compactTime *compactTime) []*datapb.CompactionPlan { + if len(segments) == 0 { + log.Warn("the number of candidate segments is 0, skip to generate compaction plan") + return []*datapb.CompactionPlan{} + } + // find segments need internal compaction // TODO add low priority candidates, for example if the segment is smaller than full 0.9 * max segment size but larger than small segment boundary, we only execute compaction when there are no compaction running actively var prioritizedCandidates []*SegmentInfo var smallCandidates []*SegmentInfo var nonPlannedSegments []*SegmentInfo + expectedSize := t.getExpectedSegmentSize(segments[0].CollectionID) + // TODO, currently we lack of the measurement of data distribution, there should be another compaction help on redistributing segment based on scalar/vector field distribution for _, segment := range segments { segment := segment.ShadowClone() // TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted? - if force || t.ShouldDoSingleCompaction(segment, isDiskIndex, compactTime) { + if force || t.ShouldDoSingleCompaction(segment, compactTime) { prioritizedCandidates = append(prioritizedCandidates, segment) - } else if t.isSmallSegment(segment) { + } else if t.isSmallSegment(segment, expectedSize) { smallCandidates = append(smallCandidates, segment) } else { nonPlannedSegments = append(nonPlannedSegments, segment) } } - var plans []*datapb.CompactionPlan + buckets := [][]*SegmentInfo{} // sort segment from large to small sort.Slice(prioritizedCandidates, func(i, j int) bool { - if prioritizedCandidates[i].GetNumOfRows() != prioritizedCandidates[j].GetNumOfRows() { - return prioritizedCandidates[i].GetNumOfRows() > prioritizedCandidates[j].GetNumOfRows() + if prioritizedCandidates[i].getSegmentSize() != prioritizedCandidates[j].getSegmentSize() { + return prioritizedCandidates[i].getSegmentSize() > prioritizedCandidates[j].getSegmentSize() } return prioritizedCandidates[i].GetID() < prioritizedCandidates[j].GetID() }) sort.Slice(smallCandidates, func(i, j int) bool { - if smallCandidates[i].GetNumOfRows() != smallCandidates[j].GetNumOfRows() { - return smallCandidates[i].GetNumOfRows() > smallCandidates[j].GetNumOfRows() + if smallCandidates[i].getSegmentSize() != smallCandidates[j].getSegmentSize() { + return smallCandidates[i].getSegmentSize() > smallCandidates[j].getSegmentSize() } return smallCandidates[i].GetID() < smallCandidates[j].GetID() }) // Sort non-planned from small to large. sort.Slice(nonPlannedSegments, func(i, j int) bool { - if nonPlannedSegments[i].GetNumOfRows() != nonPlannedSegments[j].GetNumOfRows() { - return nonPlannedSegments[i].GetNumOfRows() < nonPlannedSegments[j].GetNumOfRows() + if nonPlannedSegments[i].getSegmentSize() != nonPlannedSegments[j].getSegmentSize() { + return nonPlannedSegments[i].getSegmentSize() < nonPlannedSegments[j].getSegmentSize() } return nonPlannedSegments[i].GetID() > nonPlannedSegments[j].GetID() }) - getSegmentIDs := func(segment *SegmentInfo, _ int) int64 { - return segment.GetID() - } // greedy pick from large segment to small, the goal is to fill each segment to reach 512M // we must ensure all prioritized candidates is in a plan // TODO the compaction selection policy should consider if compaction workload is high @@ -676,9 +613,9 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i prioritizedCandidates = prioritizedCandidates[1:] // only do single file compaction if segment is already large enough - if segment.GetNumOfRows() < segment.GetMaxRowNum() { + if segment.getSegmentSize() < expectedSize { var result []*SegmentInfo - free := segment.GetMaxRowNum() - segment.GetNumOfRows() + free := expectedSize - segment.getSegmentSize() maxNum := Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt() - 1 prioritizedCandidates, result, free = greedySelect(prioritizedCandidates, free, maxNum) bucket = append(bucket, result...) @@ -689,25 +626,9 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i } } // since this is priority compaction, we will execute even if there is only segment - plan := segmentsToPlan(bucket, compactTime) - var size int64 - var row int64 - for _, s := range bucket { - size += s.getSegmentSize() - row += s.GetNumOfRows() - } - log.Info("generate a plan for priority candidates", zap.Any("plan", plan), - zap.Int64("target segment row", row), zap.Int64("target segment size", size)) - plans = append(plans, plan) + buckets = append(buckets, bucket) } - getSegIDsFromPlan := func(plan *datapb.CompactionPlan) []int64 { - var segmentIDs []int64 - for _, binLog := range plan.GetSegmentBinlogs() { - segmentIDs = append(segmentIDs, binLog.GetSegmentID()) - } - return segmentIDs - } var remainingSmallSegs []*SegmentInfo // check if there are small candidates left can be merged into large segments for len(smallCandidates) > 0 { @@ -718,28 +639,23 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i smallCandidates = smallCandidates[1:] var result []*SegmentInfo - free := segment.GetMaxRowNum() - segment.GetNumOfRows() + free := expectedSize - segment.getSegmentSize() // for small segment merge, we pick one largest segment and merge as much as small segment together with it // Why reverse? try to merge as many segments as expected. // for instance, if a 255M and 255M is the largest small candidates, they will never be merged because of the MinSegmentToMerge limit. smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt()-1) bucket = append(bucket, result...) - var size int64 + var targetSize int64 var targetRow int64 for _, s := range bucket { - size += s.getSegmentSize() + targetSize += s.getSegmentSize() targetRow += s.GetNumOfRows() } // only merge if candidate number is large than MinSegmentToMerge or if target row is large enough if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() || - len(bucket) > 1 && t.isCompactableSegment(targetRow, segment) { - plan := segmentsToPlan(bucket, compactTime) - log.Info("generate a plan for small candidates", - zap.Int64s("plan segmentIDs", lo.Map(bucket, getSegmentIDs)), - zap.Int64("target segment row", targetRow), - zap.Int64("target segment size", size)) - plans = append(plans, plan) + len(bucket) > 1 && t.isCompactableSegment(targetSize, expectedSize) { + buckets = append(buckets, bucket) } else { remainingSmallSegs = append(remainingSmallSegs, bucket...) } @@ -747,56 +663,43 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i // Try adding remaining segments to existing plans. for i := len(remainingSmallSegs) - 1; i >= 0; i-- { s := remainingSmallSegs[i] - if !isExpandableSmallSegment(s) { + if !isExpandableSmallSegment(s, expectedSize) { continue } // Try squeeze this segment into existing plans. This could cause segment size to exceed maxSize. - for _, plan := range plans { - if plan.TotalRows+s.GetNumOfRows() <= int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(s.GetMaxRowNum())) { - segmentBinLogs := &datapb.CompactionSegmentBinlogs{ - SegmentID: s.GetID(), - FieldBinlogs: s.GetBinlogs(), - Field2StatslogPaths: s.GetStatslogs(), - Deltalogs: s.GetDeltalogs(), - Level: s.GetLevel(), - CollectionID: s.GetCollectionID(), - PartitionID: s.GetPartitionID(), - } - plan.TotalRows += s.GetNumOfRows() - plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinLogs) - log.Info("small segment appended on existing plan", - zap.Int64("segmentID", s.GetID()), - zap.Int64("target rows", plan.GetTotalRows()), - zap.Int64s("plan segmentID", getSegIDsFromPlan(plan)), - ) - - remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...) - break + for i, b := range buckets { + totalSize := lo.SumBy(b, func(s *SegmentInfo) int64 { return s.getSegmentSize() }) + if totalSize+s.getSegmentSize() > int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(expectedSize)) { + continue } + buckets[i] = append(buckets[i], s) + + remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...) + break } } // If there are still remaining small segments, try adding them to non-planned segments. for _, npSeg := range nonPlannedSegments { bucket := []*SegmentInfo{npSeg} - targetRow := npSeg.GetNumOfRows() + targetSize := npSeg.getSegmentSize() for i := len(remainingSmallSegs) - 1; i >= 0; i-- { // Note: could also simply use MaxRowNum as limit. - if targetRow+remainingSmallSegs[i].GetNumOfRows() <= - int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(npSeg.GetMaxRowNum())) { + if targetSize+remainingSmallSegs[i].getSegmentSize() <= + int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(expectedSize)) { bucket = append(bucket, remainingSmallSegs[i]) - targetRow += remainingSmallSegs[i].GetNumOfRows() + targetSize += remainingSmallSegs[i].getSegmentSize() remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...) } } if len(bucket) > 1 { - plan := segmentsToPlan(bucket, compactTime) - plans = append(plans, plan) - log.Info("generate a plan for to squeeze small candidates into non-planned segment", - zap.Int64s("plan segmentIDs", lo.Map(bucket, getSegmentIDs)), - zap.Int64("target segment row", targetRow), - ) + buckets = append(buckets, bucket) } } + + plans := make([]*datapb.CompactionPlan, len(buckets)) + for i, b := range buckets { + plans[i] = segmentsToPlan(b, compactTime) + } return plans } @@ -807,6 +710,7 @@ func segmentsToPlan(segments []*SegmentInfo, compactTime *compactTime) *datapb.C CollectionTtl: compactTime.collectionTTL.Nanoseconds(), } + var size int64 for _, s := range segments { segmentBinlogs := &datapb.CompactionSegmentBinlogs{ SegmentID: s.GetID(), @@ -817,9 +721,12 @@ func segmentsToPlan(segments []*SegmentInfo, compactTime *compactTime) *datapb.C PartitionID: s.GetPartitionID(), } plan.TotalRows += s.GetNumOfRows() + size += s.getSegmentSize() plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinlogs) } + log.Info("generate a plan for priority candidates", zap.Any("plan", plan), + zap.Int64("target segment row", plan.TotalRows), zap.Int64("target segment size", size)) return plan } @@ -828,9 +735,9 @@ func greedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*Seg for i := 0; i < len(candidates); { candidate := candidates[i] - if len(result) < maxSegment && candidate.GetNumOfRows() < free { + if len(result) < maxSegment && candidate.getSegmentSize() < free { result = append(result, candidate) - free -= candidate.GetNumOfRows() + free -= candidate.getSegmentSize() candidates = append(candidates[:i], candidates[i+1:]...) } else { i++ @@ -845,9 +752,9 @@ func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int) for i := len(candidates) - 1; i >= 0; i-- { candidate := candidates[i] - if (len(result) < maxSegment) && (candidate.GetNumOfRows() < free) { + if (len(result) < maxSegment) && (candidate.getSegmentSize() < free) { result = append(result, candidate) - free -= candidate.GetNumOfRows() + free -= candidate.getSegmentSize() candidates = append(candidates[:i], candidates[i+1:]...) } } @@ -877,11 +784,11 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni return res } -func (t *compactionTrigger) isSmallSegment(segment *SegmentInfo) bool { - return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*Params.DataCoordCfg.SegmentSmallProportion.GetAsFloat()) +func (t *compactionTrigger) isSmallSegment(segment *SegmentInfo, expectedSize int64) bool { + return segment.getSegmentSize() < int64(float64(expectedSize)*Params.DataCoordCfg.SegmentSmallProportion.GetAsFloat()) } -func (t *compactionTrigger) isCompactableSegment(targetRow int64, segment *SegmentInfo) bool { +func (t *compactionTrigger) isCompactableSegment(targetSize, expectedSize int64) bool { smallProportion := Params.DataCoordCfg.SegmentSmallProportion.GetAsFloat() compactableProportion := Params.DataCoordCfg.SegmentCompactableProportion.GetAsFloat() @@ -890,42 +797,17 @@ func (t *compactionTrigger) isCompactableSegment(targetRow int64, segment *Segme compactableProportion = smallProportion } - return targetRow > int64(float64(segment.GetMaxRowNum())*compactableProportion) + return targetSize > int64(float64(expectedSize)*compactableProportion) } -func isExpandableSmallSegment(segment *SegmentInfo) bool { - return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()-1)) +func isExpandableSmallSegment(segment *SegmentInfo, expectedSize int64) bool { + return segment.getSegmentSize() < int64(float64(expectedSize)*(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()-1)) } -func (t *compactionTrigger) isStaleSegment(segment *SegmentInfo) bool { - return time.Since(segment.lastFlushTime).Minutes() >= segmentTimedFlushDuration -} - -func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDiskIndex bool, compactTime *compactTime) bool { +func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool { // no longer restricted binlog numbers because this is now related to field numbers binlogCount := GetBinlogCount(segment.GetBinlogs()) - - // count all the statlog file count, only for flush generated segments - if len(segment.CompactionFrom) == 0 { - statsLogCount := GetBinlogCount(segment.GetStatslogs()) - - var maxSize int - if isDiskIndex { - maxSize = int(Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024 / Params.DataNodeCfg.BinLogMaxSize.GetAsInt64()) - } else { - maxSize = int(Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 / Params.DataNodeCfg.BinLogMaxSize.GetAsInt64()) - } - - // if stats log is more than expected, trigger compaction to reduce stats log size. - // TODO maybe we want to compact to single statslog to reduce watch dml channel cost - // TODO avoid rebuild index twice. - if statsLogCount > maxSize*2.0 { - log.Info("stats number is too much, trigger compaction", zap.Int64("segmentID", segment.ID), zap.Int("Bin logs", binlogCount), zap.Int("Stat logs", statsLogCount)) - return true - } - } - deltaLogCount := GetBinlogCount(segment.GetDeltalogs()) if deltaLogCount > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt() { log.Info("total delta number is too much, trigger compaction", zap.Int64("segmentID", segment.ID), zap.Int("Bin logs", binlogCount), zap.Int("Delta logs", deltaLogCount)) diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 125a7eac27..cb4b76b725 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -24,7 +24,6 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -36,7 +35,6 @@ import ( "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -513,7 +511,7 @@ func Test_compactionTrigger_force(t *testing.T) { _, err := tr.forceTriggerCompaction(tt.collectionID) assert.Equal(t, tt.wantErr, err != nil) // expect max row num = 2048*1024*1024/(128*4) = 4194304 - assert.EqualValues(t, 4194304, tt.fields.meta.segments.GetSegments()[0].MaxRowNum) + // assert.EqualValues(t, 4194304, tt.fields.meta.segments.GetSegments()[0].MaxRowNum) spy := (tt.fields.compactionHandler).(*spyCompactionHandler) <-spy.spyChan }) @@ -999,10 +997,6 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { compactionHandler compactionPlanContext globalTrigger *time.Ticker } - type args struct { - collectionID int64 - compactTime *compactTime - } vecFieldID := int64(201) genSeg := func(segID, numRows int64) *datapb.SegmentInfo { @@ -1210,7 +1204,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: 100}, + {EntriesNum: 5, LogPath: "log1", LogSize: numRows * 1024 * 1024}, }, }, }, @@ -1249,31 +1243,31 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { segments: &SegmentsInfo{ segments: map[int64]*SegmentInfo{ 1: { - SegmentInfo: genSeg(1, 20), + SegmentInfo: genSeg(1, 200), lastFlushTime: time.Now().Add(-100 * time.Minute), }, 2: { - SegmentInfo: genSeg(2, 20), + SegmentInfo: genSeg(2, 200), lastFlushTime: time.Now(), }, 3: { - SegmentInfo: genSeg(3, 20), + SegmentInfo: genSeg(3, 200), lastFlushTime: time.Now(), }, 4: { - SegmentInfo: genSeg(4, 20), + SegmentInfo: genSeg(4, 200), lastFlushTime: time.Now(), }, 5: { - SegmentInfo: genSeg(5, 20), + SegmentInfo: genSeg(5, 200), lastFlushTime: time.Now(), }, 6: { - SegmentInfo: genSeg(6, 20), + SegmentInfo: genSeg(6, 200), lastFlushTime: time.Now(), }, 7: { - SegmentInfo: genSeg(7, 20), + SegmentInfo: genSeg(7, 200), lastFlushTime: time.Now(), }, }, @@ -1403,7 +1397,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: 100}, + {EntriesNum: 5, LogPath: "log1", LogSize: numRows * 1024 * 1024}, }, }, }, @@ -1442,27 +1436,27 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { segments: &SegmentsInfo{ segments: map[int64]*SegmentInfo{ 1: { - SegmentInfo: genSeg(1, 60), + SegmentInfo: genSeg(1, 600), lastFlushTime: time.Now().Add(-100 * time.Minute), }, 2: { - SegmentInfo: genSeg(2, 60), + SegmentInfo: genSeg(2, 600), lastFlushTime: time.Now(), }, 3: { - SegmentInfo: genSeg(3, 60), + SegmentInfo: genSeg(3, 600), lastFlushTime: time.Now(), }, 4: { - SegmentInfo: genSeg(4, 60), + SegmentInfo: genSeg(4, 600), lastFlushTime: time.Now(), }, 5: { - SegmentInfo: genSeg(5, 26), + SegmentInfo: genSeg(5, 260), lastFlushTime: time.Now(), }, 6: { - SegmentInfo: genSeg(6, 26), + SegmentInfo: genSeg(6, 260), lastFlushTime: time.Now(), }, }, @@ -1551,9 +1545,9 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { spy := (tt.fields.compactionHandler).(*spyCompactionHandler) select { case val := <-spy.spyChan: - // max # of rows == 110, expansion rate == 1.25. - // segment 5 and 6 are squeezed into a non-planned segment. Total # of rows: 60 + 26 + 26 == 112, - // which is greater than 110 but smaller than 110 * 1.25 + // max size == 1000, expansion rate == 1.25. + // segment 5 and 6 are squeezed into a non-planned segment. Total size: 600 + 260 + 260 == 1120, + // which is greater than 1000 but smaller than 1000 * 1.25 assert.Equal(t, len(val.SegmentBinlogs), 3) return case <-time.After(3 * time.Second): @@ -1630,7 +1624,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { Binlogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ - {EntriesNum: 5, LogPath: "log1", LogSize: size[i] * 1024 * 1024}, + {EntriesNum: 5, LogPath: "log1", LogSize: size[i] * 2 * 1024 * 1024}, }, }, }, @@ -1787,7 +1781,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { }, } - couldDo := trigger.ShouldDoSingleCompaction(info, false, &compactTime{}) + couldDo := trigger.ShouldDoSingleCompaction(info, &compactTime{}) assert.True(t, couldDo) // Test too many stats log @@ -1805,22 +1799,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { }, } - couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{}) - assert.True(t, couldDo) - - couldDo = trigger.ShouldDoSingleCompaction(info, true, &compactTime{}) - assert.True(t, couldDo) - - // if only 10 bin logs, then disk index won't trigger compaction - info.Statslogs = binlogs[0:40] - couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{}) - assert.True(t, couldDo) - - couldDo = trigger.ShouldDoSingleCompaction(info, true, &compactTime{}) - assert.False(t, couldDo) - // Test too many stats log but compacted - info.CompactionFrom = []int64{0, 1} - couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{}) + couldDo = trigger.ShouldDoSingleCompaction(info, &compactTime{}) assert.False(t, couldDo) // Test expire triggered compaction @@ -1855,15 +1834,15 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { } // expire time < Timestamp To - couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{expireTime: 300}) + couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 300}) assert.False(t, couldDo) // didn't reach single compaction size 10 * 1024 * 1024 - couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{expireTime: 600}) + couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 600}) assert.False(t, couldDo) // expire time < Timestamp False - couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{expireTime: 1200}) + couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 1200}) assert.True(t, couldDo) // Test Delete triggered compaction @@ -1898,7 +1877,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { } // deltalog is large enough, should do compaction - couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{}) + couldDo = trigger.ShouldDoSingleCompaction(info3, &compactTime{}) assert.True(t, couldDo) mockVersionManager := NewMockVersionManager(t) @@ -1964,7 +1943,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { // expire time < Timestamp To, but index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex Params.Save(Params.DataCoordCfg.AutoUpgradeSegmentIndex.Key, "true") - couldDo = trigger.ShouldDoSingleCompaction(info4, false, &compactTime{expireTime: 300}) + couldDo = trigger.ShouldDoSingleCompaction(info4, &compactTime{expireTime: 300}) assert.True(t, couldDo) indexMeta.updateSegmentIndex(&model.SegmentIndex{ @@ -1974,7 +1953,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { IndexFileKeys: []string{"index1"}, }) // expire time < Timestamp To, and index engine version is 2 which is equal CurrentIndexVersion in segmentIndex - couldDo = trigger.ShouldDoSingleCompaction(info5, false, &compactTime{expireTime: 300}) + couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300}) assert.False(t, couldDo) indexMeta.updateSegmentIndex(&model.SegmentIndex{ @@ -1984,7 +1963,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { IndexFileKeys: nil, }) // expire time < Timestamp To, and index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex but indexFileKeys is nil - couldDo = trigger.ShouldDoSingleCompaction(info6, false, &compactTime{expireTime: 300}) + couldDo = trigger.ShouldDoSingleCompaction(info6, &compactTime{expireTime: 300}) assert.False(t, couldDo) } @@ -2621,288 +2600,6 @@ func (s *CompactionTriggerSuite) TestIsChannelCheckpointHealthy() { }) } -// test updateSegmentMaxSize -func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) { - type fields struct { - meta *meta - allocator allocator - signals chan *compactionSignal - compactionHandler compactionPlanContext - globalTrigger *time.Ticker - } - type args struct { - collectionID int64 - compactTime *compactTime - } - collectionID := int64(2) - vecFieldID1 := int64(201) - vecFieldID2 := int64(202) - segmentInfos := make([]*SegmentInfo, 0) - - for i := UniqueID(0); i < 50; i++ { - info := &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ - ID: i, - CollectionID: collectionID, - }, - } - segmentInfos = append(segmentInfos, info) - } - segmentsInfo := &SegmentsInfo{ - segments: lo.SliceToMap(segmentInfos, func(t *SegmentInfo) (UniqueID, *SegmentInfo) { - return t.ID, t - }), - } - info := &collectionInfo{ - ID: collectionID, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID1, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, - }, - }, - { - FieldID: vecFieldID2, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, - }, - }, - }, - }, - } - - catalog := mocks.NewDataCoordCatalog(t) - catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil).Maybe() - - tests := []struct { - name string - fields fields - args args - isDiskANN bool - }{ - { - "all mem index", - fields{ - &meta{ - channelCPs: newChannelCps(), - catalog: catalog, - segments: segmentsInfo, - collections: map[int64]*collectionInfo{ - collectionID: info, - }, - indexMeta: &indexMeta{ - indexes: map[UniqueID]map[UniqueID]*model.Index{ - collectionID: { - indexID: { - TenantID: "", - CollectionID: 2, - FieldID: vecFieldID1, - IndexID: indexID, - IndexName: "_default_idx_1", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: common.IndexTypeKey, - Value: "HNSW", - }, - }, - IsAutoIndex: false, - UserIndexParams: nil, - }, - indexID + 1: { - TenantID: "", - CollectionID: 2, - FieldID: vecFieldID2, - IndexID: indexID + 1, - IndexName: "_default_idx_2", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: common.IndexTypeKey, - Value: "HNSW", - }, - }, - IsAutoIndex: false, - UserIndexParams: nil, - }, - }, - }, - }, - }, - newMockAllocator(), - nil, - &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)}, - nil, - }, - args{ - collectionID, - &compactTime{}, - }, - false, - }, - { - "all disk index", - fields{ - &meta{ - channelCPs: newChannelCps(), - catalog: catalog, - segments: segmentsInfo, - collections: map[int64]*collectionInfo{ - collectionID: info, - }, - indexMeta: &indexMeta{ - indexes: map[UniqueID]map[UniqueID]*model.Index{ - collectionID: { - indexID: { - TenantID: "", - CollectionID: 2, - FieldID: vecFieldID1, - IndexID: indexID, - IndexName: "_default_idx_1", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: common.IndexTypeKey, - Value: indexparamcheck.IndexDISKANN, - }, - }, - IsAutoIndex: false, - UserIndexParams: nil, - }, - indexID + 1: { - TenantID: "", - CollectionID: 2, - FieldID: vecFieldID2, - IndexID: indexID + 1, - IndexName: "_default_idx_2", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: common.IndexTypeKey, - Value: indexparamcheck.IndexDISKANN, - }, - }, - IsAutoIndex: false, - UserIndexParams: nil, - }, - }, - }, - }, - }, - newMockAllocator(), - nil, - &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)}, - nil, - }, - args{ - collectionID, - &compactTime{}, - }, - true, - }, - { - "some mme index", - fields{ - &meta{ - channelCPs: newChannelCps(), - catalog: catalog, - segments: segmentsInfo, - collections: map[int64]*collectionInfo{ - collectionID: info, - }, - indexMeta: &indexMeta{ - indexes: map[UniqueID]map[UniqueID]*model.Index{ - collectionID: { - indexID: { - TenantID: "", - CollectionID: 2, - FieldID: vecFieldID1, - IndexID: indexID, - IndexName: "_default_idx_1", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: common.IndexTypeKey, - Value: indexparamcheck.IndexDISKANN, - }, - }, - IsAutoIndex: false, - UserIndexParams: nil, - }, - indexID + 1: { - TenantID: "", - CollectionID: 2, - FieldID: vecFieldID2, - IndexID: indexID + 1, - IndexName: "_default_idx_2", - IsDeleted: false, - CreateTime: 0, - TypeParams: nil, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: common.IndexTypeKey, - Value: indexparamcheck.IndexHNSW, - }, - }, - IsAutoIndex: false, - UserIndexParams: nil, - }, - }, - }, - }, - }, - newMockAllocator(), - nil, - &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)}, - nil, - }, - args{ - collectionID, - &compactTime{}, - }, - false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tr := &compactionTrigger{ - meta: tt.fields.meta, - handler: newMockHandlerWithMeta(tt.fields.meta), - allocator: tt.fields.allocator, - signals: tt.fields.signals, - compactionHandler: tt.fields.compactionHandler, - globalTrigger: tt.fields.globalTrigger, - estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, - estimateNonDiskSegmentPolicy: calBySchemaPolicy, - testingOnly: true, - } - res, err := tr.updateSegmentMaxSize(segmentInfos) - assert.NoError(t, err) - assert.Equal(t, tt.isDiskANN, res) - }) - } -} - func TestCompactionTriggerSuite(t *testing.T) { suite.Run(t, new(CompactionTriggerSuite)) }