fix: failed to reset time point for force expiry compaction(#41855) (#42000)

related: #41855

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
This commit is contained in:
Chun Han 2025-05-22 12:14:25 +08:00 committed by GitHub
parent 19ab9513ff
commit a6d4b47879
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -326,19 +326,24 @@ func (t *compactionTrigger) allocSignalID(ctx context.Context) (UniqueID, error)
}
func (t *compactionTrigger) shouldDoStrictExpiryCompaction(group *chanPartSegments) bool {
if paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 {
if group != nil && paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 {
cate := CompactionGroupLabel{group.collectionID, group.partitionID, group.channelName}
lastExpiryCompactionTime, ok := t.lastStrictExpiryCompactionTsMap[cate]
if !ok || time.Since(lastExpiryCompactionTime) >= paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsDuration(time.Hour) {
log.Info("Try to do StrictExpiryCompaction",
zap.Duration("duration", time.Since(lastExpiryCompactionTime)),
zap.Int64("CollectionID", group.collectionID),
zap.Int64("PartitionID", group.partitionID),
zap.String("Channel", group.channelName))
return true
}
}
return false
}
func (t *compactionTrigger) mayUpdateStrictExpiryCompactionTs(signal *compactionSignal, plansSubmitted bool) {
func (t *compactionTrigger) mayUpdateStrictExpiryCompactionTs(signal *compactionSignal, plansSubmitted bool, group *chanPartSegments) {
if paramtable.Get().DataCoordCfg.CompactionForceExpiryInterval.GetAsInt() > 0 && signal.doStrictExpiryCompaction && plansSubmitted {
cate := CompactionGroupLabel{signal.collectionID, signal.partitionID, signal.channel}
cate := CompactionGroupLabel{group.collectionID, group.partitionID, group.channelName}
t.lastStrictExpiryCompactionTsMap[cate] = time.Now()
}
}
@ -455,7 +460,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) error {
zap.Int64("time cost", time.Since(start).Milliseconds()),
zap.Int64s("segmentIDs", inputSegmentIDs))
}
t.mayUpdateStrictExpiryCompactionTs(signal, plansSubmitted)
t.mayUpdateStrictExpiryCompactionTs(signal, plansSubmitted, &group)
}
return nil
}
@ -680,13 +685,15 @@ func isDeleteRowsTooManySegment(segment *SegmentInfo) bool {
return is
}
func (t *compactionTrigger) ShouldStrictCompactExpiry(fromTs uint64, compactTime *compactTime, signal *compactionSignal, segID int64) bool {
func (t *compactionTrigger) ShouldStrictCompactExpiry(fromTs uint64, compactTime *compactTime, signal *compactionSignal, segment *SegmentInfo) bool {
if signal != nil && signal.doStrictExpiryCompaction && fromTs <= compactTime.expireTime {
log.Info("Trigger strict expiry compaction for segment",
zap.Int64("segmentID", segID),
zap.Int64("collectionID", signal.collectionID),
zap.Int64("partition", signal.partitionID),
zap.String("channel", signal.channel),
zap.Int64("segmentID", segment.GetID()),
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partition", segment.GetPartitionID()),
zap.String("channel", segment.GetInsertChannel()),
zap.Uint64("segment fromTs", fromTs),
zap.Uint64("expireTime", compactTime.expireTime),
)
return true
}
@ -724,7 +731,7 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compa
}
}
if t.ShouldStrictCompactExpiry(earliestFromTs, compactTime, signal, segment.GetID()) {
if t.ShouldStrictCompactExpiry(earliestFromTs, compactTime, signal, segment) {
return true
}