From adce6aab4f9e36cab6cac8a76de5d5bc498f9177 Mon Sep 17 00:00:00 2001 From: smellthemoon <64083300+smellthemoon@users.noreply.github.com> Date: Thu, 30 Nov 2023 17:06:31 +0800 Subject: [PATCH] fix:handle err when ManualCompaction (#28804) handle err when ManualCompaction #28644 Signed-off-by: lixinguo Co-authored-by: lixinguo --- internal/datacoord/compaction_trigger.go | 53 +++++++++++++++--------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 58f3914bad..e5cdafd76d 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -119,8 +119,14 @@ func (t *compactionTrigger) start() { case signal := <-t.signals: switch { case signal.isGlobal: - t.handleGlobalSignal(signal) + // ManualCompaction also use use handleGlobalSignal + // so throw err here + err := t.handleGlobalSignal(signal) + if err != nil { + log.Warn("unable to handleGlobalSignal", zap.Error(err)) + } default: + // no need to handle err in handleSignal t.handleSignal(signal) // shouldn't reset, otherwise a frequent flushed collection will affect other collections // t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval) @@ -262,7 +268,13 @@ func (t *compactionTrigger) forceTriggerCompaction(collectionID int64) (UniqueID isGlobal: true, collectionID: collectionID, } - t.handleGlobalSignal(signal) + + err = t.handleGlobalSignal(signal) + if err != nil { + log.Warn("unable to handleGlobalSignal", zap.Error(err)) + return -1, err + } + return id, nil } @@ -332,11 +344,14 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, return isDiskANN, nil } -func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { +func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { t.forceMu.Lock() defer t.forceMu.Unlock() - log := log.With(zap.Int64("compactionID", signal.id)) + log := log.With(zap.Int64("compactionID", signal.id), + zap.Int64("signal.collectionID", signal.collectionID), + zap.Int64("signal.partitionID", signal.partitionID), + zap.Int64("signal.segmentID", signal.segmentID)) m := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool { return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) && isSegmentHealthy(segment) && @@ -346,20 +361,22 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { }) // m is list of chanPartSegments, which is channel-partition organized segments if len(m) == 0 { - return + log.Info("the length of SegmentsChanPart is 0, skip to handle compaction") + return nil } ts, err := t.allocTs() if err != nil { - log.Warn("allocate ts failed, skip to handle compaction", - zap.Int64("collectionID", signal.collectionID), - zap.Int64("partitionID", signal.partitionID), - zap.Int64("segmentID", signal.segmentID)) - return + log.Warn("allocate ts failed, skip to handle compaction") + return err } for _, group := range m { + log := log.With(zap.Int64("collectionID", group.collectionID), + zap.Int64("partitionID", group.partitionID), + zap.String("channel", group.channelName)) if !signal.isForce && t.compactionHandler.isFull() { + log.Warn("compaction plan skipped due to handler full") break } if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() { @@ -374,20 +391,15 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { coll, err := t.getCollection(group.collectionID) if err != nil { - log.Warn("get collection info failed, skip handling compaction", - zap.Int64("collectionID", group.collectionID), - zap.Int64("partitionID", group.partitionID), - zap.String("channel", group.channelName), - zap.Error(err), - ) - return + log.Warn("get collection info failed, skip handling compaction", zap.Error(err)) + return err } if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) { log.RatedInfo(20, "collection auto compaction disabled", zap.Int64("collectionID", group.collectionID), ) - return + return nil } ct, err := t.getCompactTime(ts, coll) @@ -396,7 +408,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { zap.Int64("collectionID", group.collectionID), zap.Int64("partitionID", group.partitionID), zap.String("channel", group.channelName)) - return + return err } plans := t.generatePlans(group.segments, signal.isForce, isDiskIndex, ct) @@ -442,6 +454,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { zap.Int64s("segmentIDs", segIDs)) } } + return nil } // handleSignal processes segment flush caused partition-chan level compaction signal @@ -451,6 +464,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { // 1. check whether segment's binlogs should be compacted or not if t.compactionHandler.isFull() { + log.Warn("compaction plan skipped due to handler full") return } @@ -466,6 +480,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { segments := t.getCandidateSegments(channel, partitionID) if len(segments) == 0 { + log.Info("the length of segments is 0, skip to handle compaction") return }