From dfcef7d14d6c6aa88485331ae6ca2fe33978bbe1 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Mon, 3 Nov 2025 11:25:33 +0800
Subject: [PATCH] fix: [2.5]Fix sort stats task failed when segment is
 compacting (#45185)

issue: #45184

Signed-off-by: Cai Zhang
---
 internal/datacoord/task_scheduler_test.go | 108 ++++++++++++++++++++++
 internal/datacoord/task_stats.go          |   8 +-
 2 files changed, 113 insertions(+), 3 deletions(-)

diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go
index 2c51617a4c..6fe5e48ae3 100644
--- a/internal/datacoord/task_scheduler_test.go
+++ b/internal/datacoord/task_scheduler_test.go
@@ -32,6 +32,7 @@ import (
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/datacoord/allocator"
 	"github.com/milvus-io/milvus/internal/datacoord/session"
 	"github.com/milvus-io/milvus/internal/metastore"
 	catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
@@ -2394,3 +2395,110 @@ func (s *taskSchedulerSuite) Test_zeroSegmentStats() {
 	s.Equal(int64(0), segment.NumOfRows)
 	s.Equal(commonpb.SegmentState_Dropped, segment.State)
 }
+
+func (s *taskSchedulerSuite) Test_CompactingSegment() {
+	tasks := typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask]()
+	st := &indexpb.StatsTask{
+		CollectionID:    10,
+		PartitionID:     11,
+		SegmentID:       12,
+		InsertChannel:   "",
+		TaskID:          1,
+		Version:         1,
+		NodeID:          1,
+		State:           indexpb.JobState_JobStateInit,
+		FailReason:      "",
+		TargetSegmentID: 2000,
+		SubJobType:      indexpb.StatsSubJob_Sort,
+		CanRecycle:      false,
+	}
+	tasks.Insert(1, st)
+	mt := &meta{
+		segments: &SegmentsInfo{
+			segments: map[UniqueID]*SegmentInfo{
+				12: {
+					SegmentInfo: &datapb.SegmentInfo{
+						ID:           12,
+						CollectionID: 10,
+						PartitionID:  11,
+						State:        commonpb.SegmentState_Flushed,
+						NumOfRows:    20000,
+					},
+					isCompacting: true,
+				},
+			},
+		},
+		statsTaskMeta: &statsTaskMeta{
+			tasks:   tasks,
+			keyLock: lock.NewKeyLock[UniqueID](),
+		},
+	}
+	handler := NewNMockHandler(s.T())
+	fieldsSchema := []*schemapb.FieldSchema{
+		{
+			FieldID:  fieldID,
+			Name:     "vec",
+			DataType: schemapb.DataType_FloatVector,
+			TypeParams: []*commonpb.KeyValuePair{
+				{
+					Key:   common.DimKey,
+					Value: "128",
+				},
+			},
+			IndexParams: []*commonpb.KeyValuePair{
+				{
+					Key:   common.MetricTypeKey,
+					Value: "L2",
+				},
+				{
+					Key:   common.IndexTypeKey,
+					Value: "HNSW",
+				},
+			},
+		},
+		{
+			FieldID:        partitionKeyID,
+			Name:           "scalar",
+			DataType:       schemapb.DataType_VarChar,
+			IsPartitionKey: true,
+		},
+	}
+	handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
+		ID: 10,
+		Schema: &schemapb.CollectionSchema{
+			Name:               "coll",
+			Fields:             fieldsSchema,
+			EnableDynamicField: false,
+			Properties:         nil,
+		},
+	}, nil)
+
+	alloc := allocator.NewMockAllocator(s.T())
+	alloc.EXPECT().AllocN(mock.Anything).Return(0, 100, nil)
+
+	nm := session.NewMockWorkerManager(s.T())
+	nm.EXPECT().GetClientByID(mock.Anything).Return(mocks.NewMockIndexNodeClient(s.T()), true)
+	sch := &taskScheduler{
+		meta:        mt,
+		handler:     handler,
+		allocator:   alloc,
+		nodeManager: nm,
+	}
+
+	t := &statsTask{
+		taskID:    1,
+		segmentID: 12,
+		taskInfo: &workerpb.StatsResult{
+			TaskID: 1,
+			State:  indexpb.JobState_JobStateInit,
+		},
+		subJobType: indexpb.StatsSubJob_Sort,
+	}
+
+	s.False(sch.process(t, 1))
+
+	s.Equal(indexpb.JobState_JobStateNone, t.taskInfo.GetState())
+	exit, canDo := mt.CheckAndSetSegmentsCompacting(context.Background(), []UniqueID{12})
+	s.True(exit)
+	s.False(canDo)
+}
diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go
index 429f40e601..9095f5f5da 100644
--- a/internal/datacoord/task_stats.go
+++ b/internal/datacoord/task_stats.go
@@ -143,8 +143,10 @@ func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta
 	if exist, canDo := meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.segmentID}); !exist || !canDo {
 		log.Warn("segment is not exist or is compacting, skip stats",
 			zap.Bool("exist", exist), zap.Bool("canDo", canDo))
-		// Fail stats task if segment is compacting, it's ok because the segment will be dropped after the compaction.
-		st.SetState(indexpb.JobState_JobStateFailed, "segment is not healthy")
+		// Discard this stats task because the segment is either unhealthy or undergoing compaction.
+		// This prevents the task from incorrectly setting the segment's compacting state;
+		// setting the state to None clears the task's meta tag.
+		st.SetState(indexpb.JobState_JobStateNone, "segment is not healthy")
 		st.SetStartTime(time.Now())
 		return fmt.Errorf("mark segment compacting failed, isCompacting: %v", !canDo)
 	}
@@ -184,7 +186,7 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo
 	// set segment compacting
 	segment := dependency.meta.GetHealthySegment(ctx, st.segmentID)
 	if segment == nil {
-		log.Warn("segment is node healthy, skip stats")
+		log.Warn("segment is not healthy, skip stats")
 		st.SetState(indexpb.JobState_JobStateNone, "segment is not healthy")
 		return false
 	}