feature: support compact expiry data(#41336) (#42056)

related: #41336

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
This commit is contained in:
Chun Han 2025-05-25 16:46:31 +08:00 committed by GitHub
parent 194b492f05
commit d1cfa58a0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 89 additions and 0 deletions

View File

@ -620,6 +620,8 @@ dataCoord:
maxSize: 67108864 # The maxmum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maxmum number of deltalog files to force trigger a LevelZero Compaction, default as 30
expiry:
tolerance: -1 # tolerant duration in hours for expiry data, negative value means no toleration and equivalent to zero
single:
ratio:
threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2

View File

@ -658,6 +658,28 @@ func isDeleteRowsTooManySegment(segment *SegmentInfo) bool {
return is
}
func (t *compactionTrigger) ShouldCompactExpiry(fromTs uint64, compactTime *compactTime, segment *SegmentInfo) bool {
if Params.DataCoordCfg.CompactionExpiryTolerance.GetAsInt() >= 0 {
tolerantDuration := Params.DataCoordCfg.CompactionExpiryTolerance.GetAsDuration(time.Hour)
expireTime, _ := tsoutil.ParseTS(compactTime.expireTime)
earliestTolerance := expireTime.Add(-tolerantDuration)
earliestFromTime, _ := tsoutil.ParseTS(fromTs)
if earliestFromTime.Before(earliestTolerance) {
log.Info("Trigger strict expiry compaction for segment",
zap.Int64("segmentID", segment.GetID()),
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partition", segment.GetPartitionID()),
zap.String("channel", segment.GetInsertChannel()),
zap.Time("compaction expire time", expireTime),
zap.Time("earliest tolerance", earliestTolerance),
zap.Time("segment earliest from time", earliestFromTime),
)
return true
}
}
return false
}
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool {
// no longer restricted binlog numbers because this is now related to field numbers
@ -672,6 +694,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
// if expire time is enabled, put segment into compaction candidate
totalExpiredSize := int64(0)
totalExpiredRows := 0
var earliestFromTs uint64 = math.MaxUint64
for _, binlogs := range segment.GetBinlogs() {
for _, l := range binlogs.GetBinlogs() {
// TODO, we should probably estimate expired log entries by total rows in binlog and the ralationship of timeTo, timeFrom and expire time
@ -684,8 +707,12 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
totalExpiredRows += int(l.GetEntriesNum())
totalExpiredSize += l.GetMemorySize()
}
earliestFromTs = min(earliestFromTs, l.TimestampFrom)
}
}
if t.ShouldCompactExpiry(earliestFromTs, compactTime, segment) {
return true
}
if float64(totalExpiredRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() ||
totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize.GetAsInt64() {

View File

@ -2077,6 +2077,56 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
assert.False(t, couldDo)
}
func Test_compactionTrigger_ShouldStrictCompactExpiry(t *testing.T) {
trigger := &compactionTrigger{}
now := time.Now()
expireTS := tsoutil.ComposeTSByTime(now, 0)
compact := &compactTime{
expireTime: expireTS,
}
segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 10,
CollectionID: 20,
PartitionID: 30,
InsertChannel: "test-channel",
},
}
t.Run("no tolerance, fromTs before expire time => should compact", func(t *testing.T) {
Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "0")
defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset
fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0)
shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
assert.True(t, shouldCompact)
})
t.Run("negative tolerance, disable force expiry compaction => should not compact", func(t *testing.T) {
fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0)
shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
assert.False(t, shouldCompact)
})
t.Run("with tolerance, fromTs within tolerance => should NOT compact", func(t *testing.T) {
Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "2")
defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset
fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0) // within 2h tolerance
shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
assert.False(t, shouldCompact)
})
t.Run("with tolerance, fromTs before expireTime - tolerance => should compact", func(t *testing.T) {
Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "2")
defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset
fromTs := tsoutil.ComposeTSByTime(now.Add(-3*time.Hour), 0) // earlier than expireTime - 30m
shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
assert.True(t, shouldCompact)
})
}
func Test_compactionTrigger_new(t *testing.T) {
type args struct {
meta *meta

View File

@ -3812,6 +3812,7 @@ type dataCoordConfig struct {
MixCompactionTriggerInterval ParamItem `refreshable:"false"`
L0CompactionTriggerInterval ParamItem `refreshable:"false"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
CompactionExpiryTolerance ParamItem `refreshable:"true"`
SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
@ -4282,6 +4283,15 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.GlobalCompactionInterval.Init(base.mgr)
p.CompactionExpiryTolerance = ParamItem{
Key: "dataCoord.compaction.expiry.tolerance",
Version: "2.5.0",
DefaultValue: "-1",
Doc: "tolerant duration in hours for expiry data, negative value means no toleration and equivalent to zero",
Export: true,
}
p.CompactionExpiryTolerance.Init(base.mgr)
p.MixCompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.mix.triggerInterval",
Version: "2.4.15",