From fa382ed50a840751d74ee05249902869cfdfbf91 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 10 Jul 2025 15:30:48 +0800 Subject: [PATCH] fix: [2.5] Fix regeneratePartitionStats failed after restore clusteringCompactionTask (#43206) issue: #43186 master pr: #43205 --------- Signed-off-by: Cai Zhang --- .../datacoord/compaction_task_clustering.go | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 8354dc17eb..ecf352ec91 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -355,6 +355,12 @@ func (t *clusteringCompactionTask) processStats() error { return nil } + task := t.ShadowClone(setResultSegments(resultSegments)) + err := t.saveTaskMeta(task) + if err != nil { + return merr.WrapErrClusteringCompactionMetaError("setResultSegments", err) + } + if err := t.regeneratePartitionStats(tmpToResultSegments); err != nil { log.Warn("regenerate partition stats failed, wait for retry", zap.Error(err)) return merr.WrapErrClusteringCompactionMetaError("regeneratePartitionStats", err) @@ -383,7 +389,7 @@ func (t *clusteringCompactionTask) regeneratePartitionStats(tmpToResultSegments return err } partitionStatsFile := path.Join(cli.RootPath(), common.PartitionStatsPath, - metautil.JoinIDPath(t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID()), t.plan.GetChannel(), + metautil.JoinIDPath(t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID()), t.GetTaskProto().GetChannel(), strconv.FormatInt(t.GetTaskProto().GetPlanID(), 10)) value, err := cli.Read(ctx, partitionStatsFile) @@ -603,14 +609,26 @@ func (t *clusteringCompactionTask) doClean() error { } else { // after v2.5.0, mark the results segment as dropped var operators []UpdateOperator - for _, segID := range t.GetTaskProto().GetResultSegments() { - // Don't worry about them being loaded; they are all invisible. - operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped)) + hasResultSegments := len(t.GetTaskProto().GetResultSegments()) != 0 + if hasResultSegments { + for _, segID := range t.GetTaskProto().GetResultSegments() { + // Don't worry about them being loaded; they are all invisible. + operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped)) + } } + for _, segID := range t.GetTaskProto().GetTmpSegments() { // Don't worry about them being loaded; they are all invisible. // tmpSegment is always invisible operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped)) + if !hasResultSegments { + toSegments, _ := t.meta.(*meta).GetCompactionTo(segID) + if toSegments != nil { + for _, toSeg := range toSegments { + operators = append(operators, UpdateStatusOperator(toSeg.GetID(), commonpb.SegmentState_Dropped)) + } + } + } } err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...) if err != nil {