diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 338b2d749f..ead813df04 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -596,7 +596,8 @@ dataCoord:
         maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
         deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
         deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
-    forceExpiryInterval: -1 # interval in hours to do force expiry compaction, -1 means disable force expiry compaction
+    expiry:
+      tolerance: -1 # tolerated duration in hours for expired data, a negative value disables force expiry compaction
     single:
       ratio:
         threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2
diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 5a5e1ac8a5..5f5ebdf9cd 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -53,16 +53,15 @@ type trigger interface {
 }
 
 type compactionSignal struct {
-	id                       UniqueID
-	isForce                  bool
-	collectionID             UniqueID
-	partitionID              UniqueID
-	channel                  string
-	segmentIDs               []UniqueID
-	pos                      *msgpb.MsgPosition
-	resultCh                 chan error
-	waitResult               bool
-	doStrictExpiryCompaction bool
+	id           UniqueID
+	isForce      bool
+	collectionID UniqueID
+	partitionID  UniqueID
+	channel      string
+	segmentIDs   []UniqueID
+	pos          *msgpb.MsgPosition
+	resultCh     chan error
+	waitResult   bool
 }
 
 func NewCompactionSignal() *compactionSignal {
@@ -134,8 +133,6 @@ type compactionTrigger struct {
 	// A sloopy hack, so we can test with different segment row count without worrying that
 	// they are re-calculated in every compaction.
 	testingOnly bool
-	// no need to use mutex for this map, as all operations towards should be executed serially
-	lastStrictExpiryCompactionTsMap map[CompactionGroupLabel]time.Time
 }
 
 func newCompactionTrigger(
@@ -146,17 +143,16 @@ func newCompactionTrigger(
 	indexVersionManager IndexEngineVersionManager,
 ) *compactionTrigger {
 	return &compactionTrigger{
-		meta:                            meta,
-		allocator:                       allocator,
-		signals:                         make(chan *compactionSignal, 100),
-		manualSignals:                   make(chan *compactionSignal, 100),
-		compactionHandler:               compactionHandler,
-		indexEngineVersionManager:       indexVersionManager,
-		estimateDiskSegmentPolicy:       calBySchemaPolicyWithDiskIndex,
-		estimateNonDiskSegmentPolicy:    calBySchemaPolicy,
-		handler:                         handler,
-		closeCh:                         lifetime.NewSafeChan(),
-		lastStrictExpiryCompactionTsMap: make(map[CompactionGroupLabel]time.Time, 0),
+		meta:                         meta,
+		allocator:                    allocator,
+		signals:                      make(chan *compactionSignal, 100),
+		manualSignals:                make(chan *compactionSignal, 100),
+		compactionHandler:            compactionHandler,
+		indexEngineVersionManager:    indexVersionManager,
+		estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
+		estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+		handler:                      handler,
+		closeCh:                      lifetime.NewSafeChan(),
 	}
 }
@@ -325,24 +321,6 @@ func (t *compactionTrigger) allocSignalID(ctx context.Context) (UniqueID, error)
 	return t.allocator.AllocID(ctx)
 }
 
-func (t *compactionTrigger) shouldDoStrictExpiryCompaction(group *chanPartSegments) bool {
-	if paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 {
-		cate := CompactionGroupLabel{group.collectionID, group.partitionID, group.channelName}
-		lastExpiryCompactionTime, ok := t.lastStrictExpiryCompactionTsMap[cate]
-		if !ok || time.Since(lastExpiryCompactionTime) >= paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsDuration(time.Hour) {
-			return true
-		}
-	}
-	return false
-}
-
-func (t *compactionTrigger) mayUpdateStrictExpiryCompactionTs(signal *compactionSignal, plansSubmitted bool) {
-	if paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 && signal.doStrictExpiryCompaction && plansSubmitted {
-		cate := CompactionGroupLabel{signal.collectionID, signal.partitionID, signal.channel}
-		t.lastStrictExpiryCompactionTsMap[cate] = time.Now()
-	}
-}
-
 // handleSignal is the internal logic to convert compactionSignal into compaction tasks.
 func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
 	log := log.With(zap.Int64("compactionID", signal.id),
@@ -397,13 +375,10 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
 	}
 
 	expectedSize := getExpectedSegmentSize(t.meta, coll)
-	signal.doStrictExpiryCompaction = t.shouldDoStrictExpiryCompaction(&group)
 	plans := t.generatePlans(group.segments, signal, ct, expectedSize)
-	plansSubmitted := true
 	for _, plan := range plans {
 		if !signal.isForce && t.compactionHandler.isFull() {
 			log.Warn("compaction plan skipped due to handler full")
-			plansSubmitted = false
 			break
 		}
 		totalRows, inputSegmentIDs := plan.A, plan.B
@@ -447,7 +422,6 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
 				zap.Int64("planID", task.GetPlanID()),
 				zap.Int64s("inputSegments", inputSegmentIDs),
 				zap.Error(err))
-			plansSubmitted = false
 			continue
 		}
 
@@ -455,7 +429,6 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
 			zap.Int64("time cost", time.Since(start).Milliseconds()),
 			zap.Int64s("segmentIDs", inputSegmentIDs))
 	}
-	t.mayUpdateStrictExpiryCompactionTs(signal, plansSubmitted)
 	return nil
 }
@@ -476,7 +449,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
 	for _, segment := range segments {
 		segment := segment.ShadowClone()
 		// TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted?
-		if signal.isForce || t.ShouldDoSingleCompaction(segment, compactTime, signal) {
+		if signal.isForce || t.ShouldDoSingleCompaction(segment, compactTime) {
 			prioritizedCandidates = append(prioritizedCandidates, segment)
 		} else if t.isSmallSegment(segment, expectedSize) {
 			smallCandidates = append(smallCandidates, segment)
@@ -680,20 +653,29 @@ func isDeleteRowsTooManySegment(segment *SegmentInfo) bool {
 	return is
 }
 
-func (t *compactionTrigger) ShouldStrictCompactExpiry(fromTs uint64, compactTime *compactTime, signal *compactionSignal, segID int64) bool {
-	if signal != nil && signal.doStrictExpiryCompaction && fromTs <= compactTime.expireTime {
-		log.Info("Trigger strict expiry compaction for segment",
-			zap.Int64("segmentID", segID),
-			zap.Int64("collectionID", signal.collectionID),
-			zap.Int64("partition", signal.partitionID),
-			zap.String("channel", signal.channel),
-		)
-		return true
+func (t *compactionTrigger) ShouldCompactExpiry(fromTs uint64, compactTime *compactTime, segment *SegmentInfo) bool {
+	if Params.DataCoordCfg.CompactionExpiryTolerance.GetAsInt() >= 0 {
+		tolerantDuration := Params.DataCoordCfg.CompactionExpiryTolerance.GetAsDuration(time.Hour)
+		expireTime, _ := tsoutil.ParseTS(compactTime.expireTime)
+		earliestTolerance := expireTime.Add(-tolerantDuration)
+		earliestFromTime, _ := tsoutil.ParseTS(fromTs)
+		if earliestFromTime.Before(earliestTolerance) {
+			log.Info("Trigger strict expiry compaction for segment",
+				zap.Int64("segmentID", segment.GetID()),
+				zap.Int64("collectionID", segment.GetCollectionID()),
+				zap.Int64("partition", segment.GetPartitionID()),
+				zap.String("channel", segment.GetInsertChannel()),
+				zap.Time("compaction expire time", expireTime),
+				zap.Time("earliest tolerance", earliestTolerance),
+				zap.Time("segment earliest from time", earliestFromTime),
+			)
+			return true
+		}
 	}
 	return false
 }
 
-func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime, signal *compactionSignal) bool {
+func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool {
 	// no longer restricted binlog numbers because this is now related to field numbers
 	log := log.Ctx(context.TODO())
 
@@ -724,7 +706,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
 		}
 	}
 
-	if t.ShouldStrictCompactExpiry(earliestFromTs, compactTime, signal, segment.GetID()) {
+	if t.ShouldCompactExpiry(earliestFromTs, compactTime, segment) {
 		return true
 	}
 
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index 50fe4fb9ef..b6fd8750ef 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -178,16 +178,15 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
 	compactionHandler := &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1), meta: m}
 
 	tr := &compactionTrigger{
-		meta:                            m,
-		handler:                         newMockHandlerWithMeta(m),
-		allocator:                       newMock0Allocator(t),
-		signals:                         make(chan *compactionSignal, 100),
-		manualSignals:                   make(chan *compactionSignal, 100),
-		compactionHandler:               compactionHandler,
-		globalTrigger:                   nil,
-		closeCh:                         lifetime.NewSafeChan(),
-		testingOnly:                     true,
-		lastStrictExpiryCompactionTsMap: make(map[CompactionGroupLabel]time.Time),
+		meta:              m,
+		handler:           newMockHandlerWithMeta(m),
+		allocator:         newMock0Allocator(t),
+		signals:           make(chan *compactionSignal, 100),
+		manualSignals:     make(chan *compactionSignal, 100),
+		compactionHandler: compactionHandler,
+		globalTrigger:     nil,
+		closeCh:           lifetime.NewSafeChan(),
+		testingOnly:       true,
 	}
 	tr.closeWaiter.Add(1)
 	go func() {
@@ -1875,7 +1874,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
 		},
 	}
 
-	couldDo := trigger.ShouldDoSingleCompaction(info, &compactTime{}, nil)
+	couldDo := trigger.ShouldDoSingleCompaction(info, &compactTime{})
 	assert.True(t, couldDo)
 
 	// Test too many stats log
@@ -1893,7 +1892,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
 		},
 	}
 
-	couldDo = trigger.ShouldDoSingleCompaction(info, &compactTime{}, nil)
+	couldDo = trigger.ShouldDoSingleCompaction(info, &compactTime{})
 	assert.False(t, couldDo)
 
 	// Test expire triggered compaction
@@ -1928,29 +1927,13 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
 	}
 
 	// expire time < Timestamp To
-	couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 200}, nil)
+	couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 200})
 	assert.False(t, couldDo)
 
 	// didn't reach single compaction size 10 * 1024 * 1024
-	couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 600}, nil)
+	couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 600})
 	assert.False(t, couldDo)
 
-	// expire time < Timestamp False
-	couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 1200}, nil)
-	assert.True(t, couldDo)
-
-	// under strict expiry compaction mode
-	couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 200}, &compactionSignal{
-		doStrictExpiryCompaction: true,
-	})
-	assert.False(t, couldDo)
-
-	// expire expireTime >= fromTs will trigger strict expiry compaction
-	couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 300}, &compactionSignal{
-		doStrictExpiryCompaction: true,
-	})
-	assert.True(t, couldDo)
-
 	// Test Delete triggered compaction
 	var binlogs3 []*datapb.FieldBinlog
 	for i := UniqueID(0); i < 100; i++ {
@@ -1983,11 +1966,11 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
 	}
 
 	// deltalog is large enough, should do compaction
-	couldDo = trigger.ShouldDoSingleCompaction(info3, &compactTime{}, nil)
+	couldDo = trigger.ShouldDoSingleCompaction(info3, &compactTime{})
 	assert.True(t, couldDo)
 
 	mockVersionManager := NewMockVersionManager(t)
-	mockVersionManager.On("GetCurrentIndexEngineVersion", mock.Anything).Return(int32(2), nil)
+	mockVersionManager.On("GetCurrentIndexEngineVersion", mock.Anything).Return(int32(2))
 	trigger.indexEngineVersionManager = mockVersionManager
 	info4 := &SegmentInfo{
 		SegmentInfo: &datapb.SegmentInfo{
@@ -2050,7 +2033,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
 	// expire time < Timestamp To, but index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex
 	Params.Save(Params.DataCoordCfg.AutoUpgradeSegmentIndex.Key, "true")
 	defer Params.Save(Params.DataCoordCfg.AutoUpgradeSegmentIndex.Key, "false")
-	couldDo = trigger.ShouldDoSingleCompaction(info4, &compactTime{expireTime: 300}, nil)
+	couldDo = trigger.ShouldDoSingleCompaction(info4, &compactTime{expireTime: 300})
 	assert.True(t, couldDo)
 
 	indexMeta.updateSegmentIndex(&model.SegmentIndex{
@@ -2060,14 +2043,14 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
 		IndexFileKeys: []string{"index1"},
 	})
 	// expire time < Timestamp To, and index engine version is 2 which is equal CurrentIndexVersion in segmentIndex
-	couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300}, nil)
+	couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300})
 	assert.False(t, couldDo)
 
 	Params.Save(Params.DataCoordCfg.ForceRebuildSegmentIndex.Key, "true")
 	defer Params.Save(Params.DataCoordCfg.ForceRebuildSegmentIndex.Key, "false")
 	Params.Save(Params.DataCoordCfg.TargetVecIndexVersion.Key, "5")
 	defer Params.Save(Params.DataCoordCfg.TargetVecIndexVersion.Key, "-1")
-	couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300}, nil)
+	couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300})
 	assert.True(t, couldDo)
 
 	indexMeta.updateSegmentIndex(&model.SegmentIndex{
@@ -2077,10 +2060,60 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
 		IndexFileKeys: nil,
 	})
 	// expire time < Timestamp To, and index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex but indexFileKeys is nil
-	couldDo = trigger.ShouldDoSingleCompaction(info6, &compactTime{expireTime: 300}, nil)
+	couldDo = trigger.ShouldDoSingleCompaction(info6, &compactTime{expireTime: 300})
 	assert.False(t, couldDo)
 }
 
+func Test_compactionTrigger_ShouldCompactExpiry(t *testing.T) {
+	trigger := &compactionTrigger{}
+
+	now := time.Now()
+	expireTS := tsoutil.ComposeTSByTime(now, 0)
+	compact := &compactTime{
+		expireTime: expireTS,
+	}
+
+	segment := &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            10,
+			CollectionID:  20,
+			PartitionID:   30,
+			InsertChannel: "test-channel",
+		},
+	}
+
+	t.Run("no tolerance, fromTs before expire time => should compact", func(t *testing.T) {
+		Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "0")
+		defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset
+		fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0)
+		shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
+		assert.True(t, shouldCompact)
+	})
+
+	t.Run("negative tolerance, disable force expiry compaction => should not compact", func(t *testing.T) {
+		fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0)
+		shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
+		assert.False(t, shouldCompact)
+	})
+
+	t.Run("with tolerance, fromTs within tolerance => should NOT compact", func(t *testing.T) {
+		Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "2")
+		defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset
+
+		fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0) // within 2h tolerance
+		shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
+		assert.False(t, shouldCompact)
+	})
+
+	t.Run("with tolerance, fromTs before expireTime - tolerance => should compact", func(t *testing.T) {
+		Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "2")
+		defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset
+		fromTs := tsoutil.ComposeTSByTime(now.Add(-3*time.Hour), 0) // 3h before expireTime, beyond the 2h tolerance
+		shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
+		assert.True(t, shouldCompact)
+	})
+}
+
 func Test_compactionTrigger_new(t *testing.T) {
 	type args struct {
 		meta *meta
@@ -2621,67 +2654,6 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
 			WithChannel(s.channel))
 		s.NoError(err)
 	})
-	s.Run("test_strict_expiry_compaction_ts_map", func() {
-		defer s.SetupTest()
-		tr := s.tr
-		s.compactionHandler.EXPECT().isFull().Return(false)
-		start := int64(20000)
-		s.allocator.EXPECT().AllocN(mock.Anything).RunAndReturn(func(i int64) (int64, int64, error) {
-			return start, start + i, nil
-		}).Maybe()
-		s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{
-			Schema: schema,
-			Properties: map[string]string{
-				common.CollectionTTLConfigKey: "100",
-			},
-		}, nil)
-
-		// Enable strict expiry compaction
-		paramtable.Get().Save(paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.Key, "1")
-		defer paramtable.Get().Save(paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.Key, "-1")
-
-		// First compaction should do strict expiry
-		cate := CompactionGroupLabel{s.collectionID, s.partitionID, s.channel}
-		signal := NewCompactionSignal().
-			WithCollectionID(s.collectionID).
-			WithPartitionID(s.partitionID).
-			WithChannel(s.channel)
-		err := tr.handleSignal(signal)
-		s.NoError(err)
-
-		// Verify timestamp was recorded
-		lastTs, exists := tr.lastStrictExpiryCompactionTsMap[cate]
-		s.True(exists)
-		s.True(time.Since(lastTs) < time.Second)
-
-		// Second compaction within interval should not do strict expiry
-		shouldDoStrict := tr.shouldDoStrictExpiryCompaction(&chanPartSegments{
-			collectionID: s.collectionID,
-			partitionID:  s.partitionID,
-			channelName:  s.channel,
-		})
-		s.False(shouldDoStrict)
-
-		// After interval passes, should do strict expiry again
-		tr.lastStrictExpiryCompactionTsMap[cate] = time.Now().Add(-2 * time.Hour)
-		shouldDoStrict = tr.shouldDoStrictExpiryCompaction(&chanPartSegments{
-			collectionID: s.collectionID,
-			partitionID:  s.partitionID,
-			channelName:  s.channel,
-		})
-		s.True(shouldDoStrict)
-
-		// judge segments should be judged to do priority compaction
-		groups, err := tr.getCandidates(signal)
-		s.Nil(err)
-		s.Equal(1, len(groups))
-		s.True(signal.doStrictExpiryCompaction)
-		coll, err := tr.getCollection(groups[0].collectionID)
-		ct, err := getCompactTime(tsoutil.ComposeTSByTime(time.Now(), 0), coll)
-		for _, segment := range groups[0].segments {
-			s.True(tr.ShouldDoSingleCompaction(segment, ct, signal))
-		}
-	})
 }
 
 func (s *CompactionTriggerSuite) TestSqueezeSmallSegments() {
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 0b6f25944a..02962b6b3f 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -3535,7 +3535,7 @@ type dataCoordConfig struct {
 	MixCompactionTriggerInterval    ParamItem `refreshable:"false"`
 	L0CompactionTriggerInterval     ParamItem `refreshable:"false"`
 	GlobalCompactionInterval        ParamItem `refreshable:"false"`
-	CompactionForceExpiryInterval   ParamItem `refreshable:"true"`
+	CompactionExpiryTolerance       ParamItem `refreshable:"true"`
 
 	SingleCompactionRatioThreshold  ParamItem `refreshable:"true"`
 	SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
@@ -4005,14 +4005,14 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	}
 	p.GlobalCompactionInterval.Init(base.mgr)
 
-	p.CompactionForceExpiryInterval = ParamItem{
-		Key:          "dataCoord.compaction.forceExpiryInterval",
+	p.CompactionExpiryTolerance = ParamItem{
+		Key:          "dataCoord.compaction.expiry.tolerance",
 		Version:      "2.5.0",
 		DefaultValue: "-1",
-		Doc:          "interval in hours to do force expiry compaction, -1 means disable force expiry compaction",
+		Doc:          "tolerated duration in hours for expired data, a negative value disables force expiry compaction",
 		Export:       true,
 	}
-	p.CompactionForceExpiryInterval.Init(base.mgr)
+	p.CompactionExpiryTolerance.Init(base.mgr)
 
 	p.MixCompactionTriggerInterval = ParamItem{
 		Key:          "dataCoord.compaction.mix.triggerInterval",