diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 383e0642c0..ed4c0accfe 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -620,6 +620,8 @@ dataCoord: maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30 + expiry: + tolerance: -1 # tolerant duration in hours for expiry data, negative value means no toleration and equivalent to zero single: ratio: threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2 diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 8a8dfa41d3..27cf286f57 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -658,6 +658,28 @@ func isDeleteRowsTooManySegment(segment *SegmentInfo) bool { return is } +func (t *compactionTrigger) ShouldCompactExpiry(fromTs uint64, compactTime *compactTime, segment *SegmentInfo) bool { + if Params.DataCoordCfg.CompactionExpiryTolerance.GetAsInt() >= 0 { + tolerantDuration := Params.DataCoordCfg.CompactionExpiryTolerance.GetAsDuration(time.Hour) + expireTime, _ := tsoutil.ParseTS(compactTime.expireTime) + earliestTolerance := expireTime.Add(-tolerantDuration) + earliestFromTime, _ := tsoutil.ParseTS(fromTs) + if earliestFromTime.Before(earliestTolerance) { + log.Info("Trigger strict expiry compaction for segment", + zap.Int64("segmentID", segment.GetID()), + zap.Int64("collectionID", segment.GetCollectionID()), + zap.Int64("partition", segment.GetPartitionID()), + zap.String("channel", segment.GetInsertChannel()), + zap.Time("compaction expire time", expireTime), + zap.Time("earliest tolerance", earliestTolerance), + zap.Time("segment earliest from time", earliestFromTime), + ) + return true + } + } + return false +} + func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool { // no longer restricted binlog numbers because this is now related to field numbers @@ -672,6 +694,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa // if expire time is enabled, put segment into compaction candidate totalExpiredSize := int64(0) totalExpiredRows := 0 + var earliestFromTs uint64 = math.MaxUint64 for _, binlogs := range segment.GetBinlogs() { for _, l := range binlogs.GetBinlogs() { // TODO, we should probably estimate expired log entries by total rows in binlog and the ralationship of timeTo, timeFrom and expire time @@ -684,8 +707,12 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa totalExpiredRows += int(l.GetEntriesNum()) totalExpiredSize += l.GetMemorySize() } + earliestFromTs = min(earliestFromTs, l.TimestampFrom) } } + if t.ShouldCompactExpiry(earliestFromTs, compactTime, segment) { + return true + } if float64(totalExpiredRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() || totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize.GetAsInt64() { diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 947bb83645..7f84a1d591 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -2077,6 +2077,56 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { assert.False(t, couldDo) } +func Test_compactionTrigger_ShouldStrictCompactExpiry(t *testing.T) { + trigger := &compactionTrigger{} + + now := time.Now() + expireTS := tsoutil.ComposeTSByTime(now, 0) + compact := &compactTime{ + expireTime: expireTS, + } + + segment := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 10, + CollectionID: 20, + PartitionID: 30, + InsertChannel: "test-channel", + }, + } + + t.Run("no tolerance, fromTs before expire time => should compact", func(t *testing.T) { + Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "0") + defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset + fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0) + shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment) + assert.True(t, shouldCompact) + }) + + t.Run("negative tolerance, disable force expiry compaction => should not compact", func(t *testing.T) { + fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0) + shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment) + assert.False(t, shouldCompact) + }) + + t.Run("with tolerance, fromTs within tolerance => should NOT compact", func(t *testing.T) { + Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "2") + defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset + + fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0) // within 2h tolerance + shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment) + assert.False(t, shouldCompact) + }) + + t.Run("with tolerance, fromTs before expireTime - tolerance => should compact", func(t *testing.T) { + Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "2") + defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset + fromTs := tsoutil.ComposeTSByTime(now.Add(-3*time.Hour), 0) // earlier than expireTime - 30m + shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment) + assert.True(t, shouldCompact) + }) +} + func Test_compactionTrigger_new(t *testing.T) { type args struct { meta *meta diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 123305bd92..740e892e2f 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3812,6 +3812,7 @@ type dataCoordConfig struct { MixCompactionTriggerInterval ParamItem `refreshable:"false"` L0CompactionTriggerInterval ParamItem `refreshable:"false"` GlobalCompactionInterval ParamItem `refreshable:"false"` + CompactionExpiryTolerance ParamItem `refreshable:"true"` SingleCompactionRatioThreshold ParamItem `refreshable:"true"` SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"` @@ -4282,6 +4283,15 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.GlobalCompactionInterval.Init(base.mgr) + p.CompactionExpiryTolerance = ParamItem{ + Key: "dataCoord.compaction.expiry.tolerance", + Version: "2.5.0", + DefaultValue: "-1", + Doc: "tolerant duration in hours for expiry data, negative value means no toleration and equivalent to zero", + Export: true, + } + p.CompactionExpiryTolerance.Init(base.mgr) + p.MixCompactionTriggerInterval = ParamItem{ Key: "dataCoord.compaction.mix.triggerInterval", Version: "2.4.15",