diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 7fa71809e3..df7ebc65d9 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -537,17 +537,17 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
 			log.Warn("failed to execute compaction plan",
 				zap.Int64("collection", signal.collectionID),
 				zap.Int64("planID", plan.PlanID),
-				zap.Int64s("segment IDs", fetchSegIDs(plan.GetSegmentBinlogs())),
+				zap.Int64s("segmentIDs", fetchSegIDs(plan.GetSegmentBinlogs())),
 				zap.Error(err))
 			continue
 		}
 		log.Info("time cost of generating compaction",
-			zap.Int64("plan ID", plan.PlanID),
+			zap.Int64("planID", plan.PlanID),
 			zap.Int64("time cost", time.Since(start).Milliseconds()),
 			zap.Int64("collectionID", signal.collectionID),
 			zap.String("channel", channel),
 			zap.Int64("partitionID", partitionID),
-			zap.Int64s("segment IDs", fetchSegIDs(plan.GetSegmentBinlogs())))
+			zap.Int64s("segmentIDs", fetchSegIDs(plan.GetSegmentBinlogs())))
 	}
 }
 
@@ -626,6 +626,12 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
 			}
 		}
 		// since this is priority compaction, we will execute even if there is only segment
+		log.Info("pick priority candidate for compaction",
+			zap.Int64("prioritized segmentID", segment.GetID()),
+			zap.Int64s("picked segmentIDs", lo.Map(bucket, func(s *SegmentInfo, _ int) int64 { return s.GetID() })),
+			zap.Int64("target size", lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.getSegmentSize() })),
+			zap.Int64("target count", lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.GetNumOfRows() })),
+		)
 		buckets = append(buckets, bucket)
 	}
 
@@ -646,13 +652,8 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
 		smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt()-1)
 		bucket = append(bucket, result...)
 
-		var targetSize int64
-		var targetRow int64
-		for _, s := range bucket {
-			targetSize += s.getSegmentSize()
-			targetRow += s.GetNumOfRows()
-		}
-		// only merge if candidate number is large than MinSegmentToMerge or if target row is large enough
+		// only merge if the candidate count reaches MinSegmentToMerge or if the target size is large enough
+		targetSize := lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.getSegmentSize() })
 		if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() ||
 			len(bucket) > 1 && t.isCompactableSegment(targetSize, expectedSize) {
 			buckets = append(buckets, bucket)
@@ -660,24 +661,9 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
 			remainingSmallSegs = append(remainingSmallSegs, bucket...)
 		}
 	}
-	// Try adding remaining segments to existing plans.
-	for i := len(remainingSmallSegs) - 1; i >= 0; i-- {
-		s := remainingSmallSegs[i]
-		if !isExpandableSmallSegment(s, expectedSize) {
-			continue
-		}
-		// Try squeeze this segment into existing plans. This could cause segment size to exceed maxSize.
-		for i, b := range buckets {
-			totalSize := lo.SumBy(b, func(s *SegmentInfo) int64 { return s.getSegmentSize() })
-			if totalSize+s.getSegmentSize() > int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(expectedSize)) {
-				continue
-			}
-			buckets[i] = append(buckets[i], s)
-			remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...)
-			break
-		}
-	}
+	remainingSmallSegs = t.squeezeSmallSegmentsToBuckets(remainingSmallSegs, buckets, expectedSize)
+
 	// If there are still remaining small segments, try adding them to non-planned segments.
 	for _, npSeg := range nonPlannedSegments {
 		bucket := []*SegmentInfo{npSeg}
@@ -890,3 +876,26 @@ func fetchSegIDs(segBinLogs []*datapb.CompactionSegmentBinlogs) []int64 {
 	}
 	return segIDs
 }
+
+// squeezeSmallSegmentsToBuckets updates buckets in place and returns the small segments that could not be squeezed in.
+func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo, buckets [][]*SegmentInfo, expectedSize int64) (remaining []*SegmentInfo) {
+	for i := len(small) - 1; i >= 0; i-- {
+		s := small[i]
+		if !isExpandableSmallSegment(s, expectedSize) {
+			continue
+		}
+		// Try to squeeze this segment into existing plans. This could cause the segment size to exceed maxSize.
+		for bidx, b := range buckets {
+			totalSize := lo.SumBy(b, func(s *SegmentInfo) int64 { return s.getSegmentSize() })
+			if totalSize+s.getSegmentSize() > int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(expectedSize)) {
+				continue
+			}
+			buckets[bidx] = append(buckets[bidx], s)
+
+			small = append(small[:i], small[i+1:]...)
+			break
+		}
+	}
+
+	return small
+}
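For review context: stripped of the SegmentInfo plumbing, the extracted helper is a first-fit squeeze with an expansion cap. Below is a minimal, self-contained sketch of that heuristic over plain int64 sizes; the names, the hard-coded 1.25 rate, the forward iteration, and the omission of the isExpandableSmallSegment gate are illustrative simplifications, not the production API:

```go
package main

import "fmt"

// squeeze mirrors the helper's first-fit policy: a small segment joins the
// first bucket whose running total stays within expansionRate*expectedSize;
// buckets are mutated in place and the leftovers are returned.
func squeeze(small []int64, buckets [][]int64, expectedSize int64, expansionRate float64) (remaining []int64) {
	limit := int64(expansionRate * float64(expectedSize))
	for _, s := range small {
		placed := false
		for i, bucket := range buckets {
			var total int64
			for _, size := range bucket {
				total += size
			}
			if total+s > limit {
				continue // would overshoot the expansion cap
			}
			buckets[i] = append(buckets[i], s)
			placed = true
			break
		}
		if !placed {
			remaining = append(remaining, s)
		}
	}
	return remaining
}

func main() {
	buckets := [][]int64{{70000}}
	// With a 1.25 rate the cap is 87500: the 100-byte segment fits,
	// the 69999-byte one would overshoot and is returned.
	fmt.Println(squeeze([]int64{100, 69999}, buckets, 70000, 1.25), buckets)
	// prints: [69999] [[70000 100]]
}
```

The cap deliberately lets a bucket grow past expectedSize but never past expansionRate times expectedSize, which is why (assuming a 1.25 rate) the new test below expects the 69999-size segment to be left over while the 100-size one is absorbed.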
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index cb4b76b725..a99c10d810 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -19,7 +19,7 @@ package datacoord
 import (
 	"context"
 	"sort"
-	"sync/atomic"
+	satomic "sync/atomic"
 	"testing"
 	"time"
 
@@ -27,6 +27,8 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@@ -35,6 +37,7 @@ import (
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 )
@@ -2074,7 +2077,7 @@ func Test_triggerSingleCompaction(t *testing.T) {
 		err := got.triggerSingleCompaction(2, 2, 2, "b", false)
 		assert.NoError(t, err)
 	}
-	var i atomic.Value
+	var i satomic.Value
 	i.Store(0)
 	check := func() {
 		for {
@@ -2095,7 +2098,7 @@ func Test_triggerSingleCompaction(t *testing.T) {
 		err := got.triggerSingleCompaction(3, 3, 3, "c", true)
 		assert.NoError(t, err)
 	}
-	var j atomic.Value
+	var j satomic.Value
 	j.Store(0)
 	go func() {
 		timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), time.Second)
@@ -2600,6 +2603,27 @@ func (s *CompactionTriggerSuite) TestIsChannelCheckpointHealthy() {
 	})
 }
 
+func (s *CompactionTriggerSuite) TestSqueezeSmallSegments() {
+	expectedSize := int64(70000)
+	smallSegments := []*SegmentInfo{
+		{SegmentInfo: &datapb.SegmentInfo{ID: 3}, size: *atomic.NewInt64(69999)},
+		{SegmentInfo: &datapb.SegmentInfo{ID: 1}, size: *atomic.NewInt64(100)},
+	}
+
+	largeSegment := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 2}, size: *atomic.NewInt64(expectedSize)}
+	buckets := [][]*SegmentInfo{{largeSegment}}
+	s.Require().Equal(1, len(buckets))
+	s.Require().Equal(1, len(buckets[0]))
+
+	remaining := s.tr.squeezeSmallSegmentsToBuckets(smallSegments, buckets, expectedSize)
+	s.Equal(1, len(remaining))
+	s.EqualValues(3, remaining[0].ID)
+
+	s.Equal(1, len(buckets))
+	s.Equal(2, len(buckets[0]))
+	log.Info("buckets", zap.Any("buckets", buckets))
+}
+
 func TestCompactionTriggerSuite(t *testing.T) {
 	suite.Run(t, new(CompactionTriggerSuite))
 }
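Aside on the samber/lo usage above: SumBy and Map are plain folds/projections, so swapping them in for the hand-written accumulation loop is behavior-preserving. A standalone illustration with a made-up segment struct (the type, fields, and values are illustrative only):

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

// segment is a stand-in for the real SegmentInfo; fields are illustrative.
type segment struct {
	id   int64
	size int64
}

func main() {
	bucket := []*segment{{id: 1, size: 100}, {id: 2, size: 200}}

	// lo.SumBy folds the slice through the callback, the same pattern that
	// replaces the manual targetSize accumulation loop in the patch.
	targetSize := lo.SumBy(bucket, func(s *segment) int64 { return s.size })

	// lo.Map projects each element; the patch uses it to collect segment IDs
	// for logging.
	ids := lo.Map(bucket, func(s *segment, _ int) int64 { return s.id })

	fmt.Println(targetSize, ids) // prints: 300 [1 2]
}
```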