From a6d4b47879aa570341eb41d72c17018a24f88fec Mon Sep 17 00:00:00 2001 From: Chun Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Thu, 22 May 2025 12:14:25 +0800 Subject: [PATCH] fix: failed to reset time point for force expiry compaction(#41855) (#42000) related: #41855 Signed-off-by: MrPresent-Han Co-authored-by: MrPresent-Han --- internal/datacoord/compaction_trigger.go | 27 +++++++++++++++--------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 5a5e1ac8a5..5a856b694a 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -326,19 +326,24 @@ func (t *compactionTrigger) allocSignalID(ctx context.Context) (UniqueID, error) } func (t *compactionTrigger) shouldDoStrictExpiryCompaction(group *chanPartSegments) bool { - if paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 { + if group != nil && paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 { cate := CompactionGroupLabel{group.collectionID, group.partitionID, group.channelName} lastExpiryCompactionTime, ok := t.lastStrictExpiryCompactionTsMap[cate] if !ok || time.Since(lastExpiryCompactionTime) >= paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsDuration(time.Hour) { + log.Info("Try to do StrictExpiryCompaction", + zap.Duration("duration", time.Since(lastExpiryCompactionTime)), + zap.Int64("CollectionID", group.collectionID), + zap.Int64("PartitionID", group.partitionID), + zap.String("Channel", group.channelName)) return true } } return false } -func (t *compactionTrigger) mayUpdateStrictExpiryCompactionTs(signal *compactionSignal, plansSubmitted bool) { +func (t *compactionTrigger) mayUpdateStrictExpiryCompactionTs(signal *compactionSignal, plansSubmitted bool, group *chanPartSegments) { if paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 && signal.doStrictExpiryCompaction && plansSubmitted { - cate := CompactionGroupLabel{signal.collectionID, signal.partitionID, signal.channel} + cate := CompactionGroupLabel{group.collectionID, group.partitionID, group.channelName} t.lastStrictExpiryCompactionTsMap[cate] = time.Now() } } @@ -455,7 +460,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error { zap.Int64("time cost", time.Since(start).Milliseconds()), zap.Int64s("segmentIDs", inputSegmentIDs)) } - t.mayUpdateStrictExpiryCompactionTs(signal, plansSubmitted) + t.mayUpdateStrictExpiryCompactionTs(signal, plansSubmitted, &group) } return nil } @@ -680,13 +685,15 @@ func isDeleteRowsTooManySegment(segment *SegmentInfo) bool { return is } -func (t *compactionTrigger) ShouldStrictCompactExpiry(fromTs uint64, compactTime *compactTime, signal *compactionSignal, segID int64) bool { +func (t *compactionTrigger) ShouldStrictCompactExpiry(fromTs uint64, compactTime *compactTime, signal *compactionSignal, segment *SegmentInfo) bool { if signal != nil && signal.doStrictExpiryCompaction && fromTs <= compactTime.expireTime { log.Info("Trigger strict expiry compaction for segment", - zap.Int64("segmentID", segID), - zap.Int64("collectionID", signal.collectionID), - zap.Int64("partition", signal.partitionID), - zap.String("channel", signal.channel), + zap.Int64("segmentID", segment.GetID()), + zap.Int64("collectionID", segment.GetCollectionID()), + zap.Int64("partition", segment.GetPartitionID()), + zap.String("channel", segment.GetInsertChannel()), + zap.Uint64("segment fromTs", fromTs), + zap.Uint64("expireTime", compactTime.expireTime), ) return true } @@ -724,7 +731,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa } } - if t.ShouldStrictCompactExpiry(earliestFromTs, compactTime, signal, segment.GetID()) { + if t.ShouldStrictCompactExpiry(earliestFromTs, compactTime, signal, segment) { return true }