enhance: refine expiring compaction(#41336) (#42063)

related: #41336
pr: https://github.com/milvus-io/milvus/pull/42056

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
Chun Han 2025-05-25 16:46:31 +08:00 committed by GitHub
parent ba5ed97846
commit e9cb634788
4 changed files with 116 additions and 161 deletions

configs/milvus.yaml

@@ -596,7 +596,8 @@ dataCoord:
maxSize: 67108864 # The maximum size in bytes to force trigger a LevelZero Compaction, default as 64MB
deltalogMinNum: 10 # The minimum number of deltalog files to force trigger a LevelZero Compaction
deltalogMaxNum: 30 # The maximum number of deltalog files to force trigger a LevelZero Compaction, default as 30
forceExpiryInterval: -1 # interval in hours to do force expiry compaction, -1 means disable force expiry compaction
expiry:
tolerance: -1 # tolerance duration in hours for expired data; a negative value disables force expiry compaction
single:
ratio:
threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2
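
For context, a minimal sketch of how the reworked knob is enabled in milvus.yaml (the value 6 is illustrative, not a default; the shipped default of -1 keeps force expiry compaction disabled):

dataCoord:
  compaction:
    expiry:
      tolerance: 6  # force an expiry compaction once expired data has lingered beyond its TTL for more than 6 hours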

internal/datacoord/compaction_trigger.go

@@ -53,16 +53,15 @@ type trigger interface {
}
type compactionSignal struct {
id UniqueID
isForce bool
collectionID UniqueID
partitionID UniqueID
channel string
segmentIDs []UniqueID
pos *msgpb.MsgPosition
resultCh chan error
waitResult bool
doStrictExpiryCompaction bool
id UniqueID
isForce bool
collectionID UniqueID
partitionID UniqueID
channel string
segmentIDs []UniqueID
pos *msgpb.MsgPosition
resultCh chan error
waitResult bool
}
func NewCompactionSignal() *compactionSignal {
@@ -134,8 +133,6 @@ type compactionTrigger struct {
// A sloppy hack, so we can test with different segment row count without worrying that
// they are re-calculated in every compaction.
testingOnly bool
// no need to use mutex for this map, as all operations towards should be executed serially
lastStrictExpiryCompactionTsMap map[CompactionGroupLabel]time.Time
}
func newCompactionTrigger(
@@ -146,17 +143,16 @@ func newCompactionTrigger(
indexVersionManager IndexEngineVersionManager,
) *compactionTrigger {
return &compactionTrigger{
meta: meta,
allocator: allocator,
signals: make(chan *compactionSignal, 100),
manualSignals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
indexEngineVersionManager: indexVersionManager,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
handler: handler,
closeCh: lifetime.NewSafeChan(),
lastStrictExpiryCompactionTsMap: make(map[CompactionGroupLabel]time.Time, 0),
meta: meta,
allocator: allocator,
signals: make(chan *compactionSignal, 100),
manualSignals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
indexEngineVersionManager: indexVersionManager,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
handler: handler,
closeCh: lifetime.NewSafeChan(),
}
}
@@ -325,24 +321,6 @@ func (t *compactionTrigger) allocSignalID(ctx context.Context) (UniqueID, error)
return t.allocator.AllocID(ctx)
}
func (t *compactionTrigger) shouldDoStrictExpiryCompaction(group *chanPartSegments) bool {
if paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 {
cate := CompactionGroupLabel{group.collectionID, group.partitionID, group.channelName}
lastExpiryCompactionTime, ok := t.lastStrictExpiryCompactionTsMap[cate]
if !ok || time.Since(lastExpiryCompactionTime) >= paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsDuration(time.Hour) {
return true
}
}
return false
}
func (t *compactionTrigger) mayUpdateStrictExpiryCompactionTs(signal *compactionSignal, plansSubmitted bool) {
if paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 && signal.doStrictExpiryCompaction && plansSubmitted {
cate := CompactionGroupLabel{signal.collectionID, signal.partitionID, signal.channel}
t.lastStrictExpiryCompactionTsMap[cate] = time.Now()
}
}
// handleSignal is the internal logic to convert compactionSignal into compaction tasks.
func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
log := log.With(zap.Int64("compactionID", signal.id),
@@ -397,13 +375,10 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
}
expectedSize := getExpectedSegmentSize(t.meta, coll)
signal.doStrictExpiryCompaction = t.shouldDoStrictExpiryCompaction(&group)
plans := t.generatePlans(group.segments, signal, ct, expectedSize)
plansSubmitted := true
for _, plan := range plans {
if !signal.isForce && t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full")
plansSubmitted = false
break
}
totalRows, inputSegmentIDs := plan.A, plan.B
@@ -447,7 +422,6 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
zap.Int64("planID", task.GetPlanID()),
zap.Int64s("inputSegments", inputSegmentIDs),
zap.Error(err))
plansSubmitted = false
continue
}
@@ -455,7 +429,6 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
zap.Int64("time cost", time.Since(start).Milliseconds()),
zap.Int64s("segmentIDs", inputSegmentIDs))
}
t.mayUpdateStrictExpiryCompactionTs(signal, plansSubmitted)
}
return nil
}
@@ -476,7 +449,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, signal *compa
for _, segment := range segments {
segment := segment.ShadowClone()
// TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted?
if signal.isForce || t.ShouldDoSingleCompaction(segment, compactTime, signal) {
if signal.isForce || t.ShouldDoSingleCompaction(segment, compactTime) {
prioritizedCandidates = append(prioritizedCandidates, segment)
} else if t.isSmallSegment(segment, expectedSize) {
smallCandidates = append(smallCandidates, segment)
@@ -680,20 +653,29 @@ func isDeleteRowsTooManySegment(segment *SegmentInfo) bool {
return is
}
func (t *compactionTrigger) ShouldStrictCompactExpiry(fromTs uint64, compactTime *compactTime, signal *compactionSignal, segID int64) bool {
if signal != nil && signal.doStrictExpiryCompaction && fromTs <= compactTime.expireTime {
log.Info("Trigger strict expiry compaction for segment",
zap.Int64("segmentID", segID),
zap.Int64("collectionID", signal.collectionID),
zap.Int64("partition", signal.partitionID),
zap.String("channel", signal.channel),
)
return true
func (t *compactionTrigger) ShouldCompactExpiry(fromTs uint64, compactTime *compactTime, segment *SegmentInfo) bool {
if Params.DataCoordCfg.CompactionExpiryTolerance.GetAsInt() >= 0 {
tolerantDuration := Params.DataCoordCfg.CompactionExpiryTolerance.GetAsDuration(time.Hour)
expireTime, _ := tsoutil.ParseTS(compactTime.expireTime)
earliestTolerance := expireTime.Add(-tolerantDuration)
earliestFromTime, _ := tsoutil.ParseTS(fromTs)
if earliestFromTime.Before(earliestTolerance) {
log.Info("Trigger strict expiry compaction for segment",
zap.Int64("segmentID", segment.GetID()),
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partition", segment.GetPartitionID()),
zap.String("channel", segment.GetInsertChannel()),
zap.Time("compaction expire time", expireTime),
zap.Time("earliest tolerance", earliestTolerance),
zap.Time("segment earliest from time", earliestFromTime),
)
return true
}
}
return false
}
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime, signal *compactionSignal) bool {
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool {
// binlog numbers are no longer restricted because they are now related to field numbers
log := log.Ctx(context.TODO())
@@ -724,7 +706,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
}
}
if t.ShouldStrictCompactExpiry(earliestFromTs, compactTime, signal, segment.GetID()) {
if t.ShouldCompactExpiry(earliestFromTs, compactTime, segment) {
return true
}
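
To make the new gate concrete, below is a self-contained sketch of the same tolerance arithmetic with the tsoutil conversions inlined (assuming the standard hybrid-timestamp layout: wall-clock milliseconds above an 18-bit logical counter); all names here are illustrative, not the production API:

package main

import (
	"fmt"
	"time"
)

const logicalBits = 18 // low bits of a hybrid TS hold the logical clock

// parseTS mirrors tsoutil.ParseTS: recover wall-clock time from a hybrid TS.
func parseTS(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> logicalBits))
}

// composeTS mirrors tsoutil.ComposeTSByTime with a zero logical component.
func composeTS(t time.Time) uint64 {
	return uint64(t.UnixMilli()) << logicalBits
}

// shouldCompactExpiry is the gate from the diff above: compact only when the
// segment's oldest write (fromTs) predates the expiry boundary by more than
// the tolerance. A negative tolerance disables force expiry compaction.
func shouldCompactExpiry(fromTs, expireTs uint64, toleranceHours int) bool {
	if toleranceHours < 0 {
		return false
	}
	earliestTolerance := parseTS(expireTs).Add(-time.Duration(toleranceHours) * time.Hour)
	return parseTS(fromTs).Before(earliestTolerance)
}

func main() {
	now := time.Now()
	expire := composeTS(now) // expiry boundary derived from the collection TTL
	// oldest write is 3h older than the boundary, tolerance is 2h -> compact
	fmt.Println(shouldCompactExpiry(composeTS(now.Add(-3*time.Hour)), expire, 2)) // true
	// oldest write is only 1h older than the boundary, inside the 2h window -> skip
	fmt.Println(shouldCompactExpiry(composeTS(now.Add(-time.Hour)), expire, 2)) // false
}

Note the design shift this enables: unlike the removed forceExpiryInterval path, the decision is stateless per segment, so the lastStrictExpiryCompactionTsMap bookkeeping and the signal's doStrictExpiryCompaction flag can be deleted, as the hunks above show.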

internal/datacoord/compaction_trigger_test.go

@@ -178,16 +178,15 @@ func Test_compactionTrigger_force_without_index(t *testing.T) {
compactionHandler := &spyCompactionHandler{t: t, spyChan: make(chan *datapb.CompactionPlan, 1), meta: m}
tr := &compactionTrigger{
meta: m,
handler: newMockHandlerWithMeta(m),
allocator: newMock0Allocator(t),
signals: make(chan *compactionSignal, 100),
manualSignals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
globalTrigger: nil,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
lastStrictExpiryCompactionTsMap: make(map[CompactionGroupLabel]time.Time),
meta: m,
handler: newMockHandlerWithMeta(m),
allocator: newMock0Allocator(t),
signals: make(chan *compactionSignal, 100),
manualSignals: make(chan *compactionSignal, 100),
compactionHandler: compactionHandler,
globalTrigger: nil,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tr.closeWaiter.Add(1)
go func() {
@@ -1875,7 +1874,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
},
}
couldDo := trigger.ShouldDoSingleCompaction(info, &compactTime{}, nil)
couldDo := trigger.ShouldDoSingleCompaction(info, &compactTime{})
assert.True(t, couldDo)
// Test too many stats log
@@ -1893,7 +1892,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
},
}
couldDo = trigger.ShouldDoSingleCompaction(info, &compactTime{}, nil)
couldDo = trigger.ShouldDoSingleCompaction(info, &compactTime{})
assert.False(t, couldDo)
// Test expire triggered compaction
@@ -1928,29 +1927,13 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
}
// expire time < Timestamp To
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 200}, nil)
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 200})
assert.False(t, couldDo)
// didn't reach single compaction size 10 * 1024 * 1024
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 600}, nil)
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 600})
assert.False(t, couldDo)
// expire time < Timestamp False
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 1200}, nil)
assert.True(t, couldDo)
// under strict expiry compaction mode
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 200}, &compactionSignal{
doStrictExpiryCompaction: true,
})
assert.False(t, couldDo)
// expire expireTime >= fromTs will trigger strict expiry compaction
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{expireTime: 300}, &compactionSignal{
doStrictExpiryCompaction: true,
})
assert.True(t, couldDo)
// Test Delete triggered compaction
var binlogs3 []*datapb.FieldBinlog
for i := UniqueID(0); i < 100; i++ {
@@ -1983,11 +1966,11 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
}
// deltalog is large enough, should do compaction
couldDo = trigger.ShouldDoSingleCompaction(info3, &compactTime{}, nil)
couldDo = trigger.ShouldDoSingleCompaction(info3, &compactTime{})
assert.True(t, couldDo)
mockVersionManager := NewMockVersionManager(t)
mockVersionManager.On("GetCurrentIndexEngineVersion", mock.Anything).Return(int32(2), nil)
mockVersionManager.On("GetCurrentIndexEngineVersion", mock.Anything).Return(int32(2))
trigger.indexEngineVersionManager = mockVersionManager
info4 := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
@@ -2050,7 +2033,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
// expire time < Timestamp To, but index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex
Params.Save(Params.DataCoordCfg.AutoUpgradeSegmentIndex.Key, "true")
defer Params.Save(Params.DataCoordCfg.AutoUpgradeSegmentIndex.Key, "false")
couldDo = trigger.ShouldDoSingleCompaction(info4, &compactTime{expireTime: 300}, nil)
couldDo = trigger.ShouldDoSingleCompaction(info4, &compactTime{expireTime: 300})
assert.True(t, couldDo)
indexMeta.updateSegmentIndex(&model.SegmentIndex{
@@ -2060,14 +2043,14 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
IndexFileKeys: []string{"index1"},
})
// expire time < Timestamp To, and index engine version is 2 which is equal CurrentIndexVersion in segmentIndex
couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300}, nil)
couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300})
assert.False(t, couldDo)
Params.Save(Params.DataCoordCfg.ForceRebuildSegmentIndex.Key, "true")
defer Params.Save(Params.DataCoordCfg.ForceRebuildSegmentIndex.Key, "false")
Params.Save(Params.DataCoordCfg.TargetVecIndexVersion.Key, "5")
defer Params.Save(Params.DataCoordCfg.TargetVecIndexVersion.Key, "-1")
couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300}, nil)
couldDo = trigger.ShouldDoSingleCompaction(info5, &compactTime{expireTime: 300})
assert.True(t, couldDo)
indexMeta.updateSegmentIndex(&model.SegmentIndex{
@@ -2077,10 +2060,60 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
IndexFileKeys: nil,
})
// expire time < Timestamp To, and index engine version is 2 which is larger than CurrentIndexVersion in segmentIndex but indexFileKeys is nil
couldDo = trigger.ShouldDoSingleCompaction(info6, &compactTime{expireTime: 300}, nil)
couldDo = trigger.ShouldDoSingleCompaction(info6, &compactTime{expireTime: 300})
assert.False(t, couldDo)
}
func Test_compactionTrigger_ShouldStrictCompactExpiry(t *testing.T) {
trigger := &compactionTrigger{}
now := time.Now()
expireTS := tsoutil.ComposeTSByTime(now, 0)
compact := &compactTime{
expireTime: expireTS,
}
segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 10,
CollectionID: 20,
PartitionID: 30,
InsertChannel: "test-channel",
},
}
t.Run("no tolerance, fromTs before expire time => should compact", func(t *testing.T) {
Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "0")
defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset
fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0)
shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
assert.True(t, shouldCompact)
})
t.Run("negative tolerance, disable force expiry compaction => should not compact", func(t *testing.T) {
fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0)
shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
assert.False(t, shouldCompact)
})
t.Run("with tolerance, fromTs within tolerance => should NOT compact", func(t *testing.T) {
Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "2")
defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset
fromTs := tsoutil.ComposeTSByTime(now.Add(-time.Hour), 0) // within 2h tolerance
shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
assert.False(t, shouldCompact)
})
t.Run("with tolerance, fromTs before expireTime - tolerance => should compact", func(t *testing.T) {
Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "2")
defer Params.Save(Params.DataCoordCfg.CompactionExpiryTolerance.Key, "-1") // reset
fromTs := tsoutil.ComposeTSByTime(now.Add(-3*time.Hour), 0) // earlier than expireTime - 3h
shouldCompact := trigger.ShouldCompactExpiry(fromTs, compact, segment)
assert.True(t, shouldCompact)
})
}
func Test_compactionTrigger_new(t *testing.T) {
type args struct {
meta *meta
@@ -2621,67 +2654,6 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() {
WithChannel(s.channel))
s.NoError(err)
})
s.Run("test_strict_expiry_compaction_ts_map", func() {
defer s.SetupTest()
tr := s.tr
s.compactionHandler.EXPECT().isFull().Return(false)
start := int64(20000)
s.allocator.EXPECT().AllocN(mock.Anything).RunAndReturn(func(i int64) (int64, int64, error) {
return start, start + i, nil
}).Maybe()
s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{
Schema: schema,
Properties: map[string]string{
common.CollectionTTLConfigKey: "100",
},
}, nil)
// Enable strict expiry compaction
paramtable.Get().Save(paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.Key, "1")
defer paramtable.Get().Save(paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.Key, "-1")
// First compaction should do strict expiry
cate := CompactionGroupLabel{s.collectionID, s.partitionID, s.channel}
signal := NewCompactionSignal().
WithCollectionID(s.collectionID).
WithPartitionID(s.partitionID).
WithChannel(s.channel)
err := tr.handleSignal(signal)
s.NoError(err)
// Verify timestamp was recorded
lastTs, exists := tr.lastStrictExpiryCompactionTsMap[cate]
s.True(exists)
s.True(time.Since(lastTs) < time.Second)
// Second compaction within interval should not do strict expiry
shouldDoStrict := tr.shouldDoStrictExpiryCompaction(&chanPartSegments{
collectionID: s.collectionID,
partitionID: s.partitionID,
channelName: s.channel,
})
s.False(shouldDoStrict)
// After interval passes, should do strict expiry again
tr.lastStrictExpiryCompactionTsMap[cate] = time.Now().Add(-2 * time.Hour)
shouldDoStrict = tr.shouldDoStrictExpiryCompaction(&chanPartSegments{
collectionID: s.collectionID,
partitionID: s.partitionID,
channelName: s.channel,
})
s.True(shouldDoStrict)
// judge segments should be judged to do priority compaction
groups, err := tr.getCandidates(signal)
s.Nil(err)
s.Equal(1, len(groups))
s.True(signal.doStrictExpiryCompaction)
coll, err := tr.getCollection(groups[0].collectionID)
ct, err := getCompactTime(tsoutil.ComposeTSByTime(time.Now(), 0), coll)
for _, segment := range groups[0].segments {
s.True(tr.ShouldDoSingleCompaction(segment, ct, signal))
}
})
}
func (s *CompactionTriggerSuite) TestSqueezeSmallSegments() {

pkg/util/paramtable/component_param.go

@@ -3535,7 +3535,7 @@ type dataCoordConfig struct {
MixCompactionTriggerInterval ParamItem `refreshable:"false"`
L0CompactionTriggerInterval ParamItem `refreshable:"false"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
CompactionForceExpiryInterval ParamItem `refreshable:"true"`
CompactionExpiryTolerance ParamItem `refreshable:"true"`
SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
@@ -4005,14 +4005,14 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.GlobalCompactionInterval.Init(base.mgr)
p.CompactionForceExpiryInterval = ParamItem{
Key: "dataCoord.compaction.forceExpiryInterval",
p.CompactionExpiryTolerance = ParamItem{
Key: "dataCoord.compaction.expiry.tolerance",
Version: "2.5.0",
DefaultValue: "-1",
Doc: "interval in hours to do force expiry compaction, -1 means disable force expiry compaction",
Doc: "tolerant duration in hours for expiry data, negative value means disable force expiry compaction",
Export: true,
}
p.CompactionForceExpiryInterval.Init(base.mgr)
p.CompactionExpiryTolerance.Init(base.mgr)
p.MixCompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.mix.triggerInterval",