fix: [2.5] Fix regeneratePartitionStats failed after restore clusteringCompactionTask (#43206)

issue: #43186 
master pr: #43205

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-07-10 15:30:48 +08:00 committed by GitHub
parent e0729bf1ae
commit fa382ed50a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -355,6 +355,12 @@ func (t *clusteringCompactionTask) processStats() error {
return nil
}
task := t.ShadowClone(setResultSegments(resultSegments))
err := t.saveTaskMeta(task)
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("setResultSegments", err)
}
if err := t.regeneratePartitionStats(tmpToResultSegments); err != nil {
log.Warn("regenerate partition stats failed, wait for retry", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("regeneratePartitionStats", err)
@ -383,7 +389,7 @@ func (t *clusteringCompactionTask) regeneratePartitionStats(tmpToResultSegments
return err
}
partitionStatsFile := path.Join(cli.RootPath(), common.PartitionStatsPath,
metautil.JoinIDPath(t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID()), t.plan.GetChannel(),
metautil.JoinIDPath(t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID()), t.GetTaskProto().GetChannel(),
strconv.FormatInt(t.GetTaskProto().GetPlanID(), 10))
value, err := cli.Read(ctx, partitionStatsFile)
@ -603,14 +609,26 @@ func (t *clusteringCompactionTask) doClean() error {
} else {
// after v2.5.0, mark the results segment as dropped
var operators []UpdateOperator
for _, segID := range t.GetTaskProto().GetResultSegments() {
// Don't worry about them being loaded; they are all invisible.
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
hasResultSegments := len(t.GetTaskProto().GetResultSegments()) != 0
if hasResultSegments {
for _, segID := range t.GetTaskProto().GetResultSegments() {
// Don't worry about them being loaded; they are all invisible.
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
}
for _, segID := range t.GetTaskProto().GetTmpSegments() {
// Don't worry about them being loaded; they are all invisible.
// tmpSegment is always invisible
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
if !hasResultSegments {
toSegments, _ := t.meta.(*meta).GetCompactionTo(segID)
if toSegments != nil {
for _, toSeg := range toSegments {
operators = append(operators, UpdateStatusOperator(toSeg.GetID(), commonpb.SegmentState_Dropped))
}
}
}
}
err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
if err != nil {