fix: [2.5] Fix sort stats task failing when segment is compacting (#45185)

issue: #45184

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-11-03 11:25:33 +08:00 committed by GitHub
parent 0ca74f234f
commit dfcef7d14d
2 changed files with 113 additions and 3 deletions
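
In essence: the datacoord stats-task scheduler used to mark a sort-stats task as Failed when its target segment was already being compacted; this commit discards the task (state None) instead, so the task never touches a compacting flag it does not own. Below is a minimal, self-contained Go sketch of the fixed decision in statsTask.UpdateVersion, using hypothetical names (jobState, segment, updateVersion) rather than the real internal/datacoord types:

package main

import "fmt"

// Hypothetical, simplified model of the scheduler's decision; the real logic
// lives in internal/datacoord and involves meta, locks, and task state machines.
type jobState int

const (
    jobStateNone jobState = iota
    jobStateInit
    jobStateFailed // the state the old code used in this path
)

type segment struct {
    healthy      bool
    isCompacting bool
}

// checkAndSetCompacting mirrors the CheckAndSetSegmentsCompacting contract seen
// in the diff: report whether the segment exists and, if no compaction owns it,
// atomically mark it as compacting.
func (s *segment) checkAndSetCompacting() (exist, canDo bool) {
    if s == nil || !s.healthy {
        return false, false
    }
    if s.isCompacting {
        return true, false // exists, but a compaction already owns it
    }
    s.isCompacting = true
    return true, true
}

// updateVersion sketches the fixed behavior: when the segment is missing or
// already compacting, the stats task is discarded (None) rather than marked
// Failed, so the task never owns, and thus never clears, the compacting flag.
func updateVersion(seg *segment) (jobState, error) {
    if exist, canDo := seg.checkAndSetCompacting(); !exist || !canDo {
        return jobStateNone, fmt.Errorf("mark segment compacting failed, isCompacting: %v", !canDo)
    }
    return jobStateInit, nil
}

func main() {
    seg := &segment{healthy: true, isCompacting: true}
    state, err := updateVersion(seg)
    fmt.Println(state == jobStateNone, err) // true, with an error explaining the skip
}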

@@ -32,6 +32,7 @@ import (
     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
     "github.com/milvus-io/milvus/internal/datacoord/allocator"
+    "github.com/milvus-io/milvus/internal/datacoord/session"
     "github.com/milvus-io/milvus/internal/metastore"
     catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
@@ -2394,3 +2395,110 @@ func (s *taskSchedulerSuite) Test_zeroSegmentStats() {
     s.Equal(int64(0), segment.NumOfRows)
     s.Equal(commonpb.SegmentState_Dropped, segment.State)
 }
+
+func (s *taskSchedulerSuite) Test_CompactingSegment() {
+    tasks := typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask]()
+    st := &indexpb.StatsTask{
+        CollectionID:    10,
+        PartitionID:     11,
+        SegmentID:       12,
+        InsertChannel:   "",
+        TaskID:          1,
+        Version:         1,
+        NodeID:          1,
+        State:           indexpb.JobState_JobStateInit,
+        FailReason:      "",
+        TargetSegmentID: 2000,
+        SubJobType:      indexpb.StatsSub Job_Sort,
+        CanRecycle:      false,
+    }
+    tasks.Insert(1, st)
+
+    // Segment 12 is flushed but already marked as compacting.
+    mt := &meta{
+        segments: &SegmentsInfo{
+            segments: map[UniqueID]*SegmentInfo{
+                12: {
+                    SegmentInfo: &datapb.SegmentInfo{
+                        ID:           12,
+                        CollectionID: 10,
+                        PartitionID:  11,
+                        State:        commonpb.SegmentState_Flushed,
+                        NumOfRows:    20000,
+                    },
+                    isCompacting: true,
+                },
+            },
+        },
+        statsTaskMeta: &statsTaskMeta{
+            tasks:   tasks,
+            keyLock: lock.NewKeyLock[UniqueID](),
+        },
+    }
+
+    handler := NewNMockHandler(s.T())
+    fieldsSchema := []*schemapb.FieldSchema{
+        {
+            FieldID:  fieldID,
+            Name:     "vec",
+            DataType: schemapb.DataType_FloatVector,
+            TypeParams: []*commonpb.KeyValuePair{
+                {
+                    Key:   common.DimKey,
+                    Value: "128",
+                },
+            },
+            IndexParams: []*commonpb.KeyValuePair{
+                {
+                    Key:   common.MetricTypeKey,
+                    Value: "L2",
+                },
+                {
+                    Key:   common.IndexTypeKey,
+                    Value: "HNSW",
+                },
+            },
+        },
+        {
+            FieldID:        partitionKeyID,
+            Name:           "scalar",
+            DataType:       schemapb.DataType_VarChar,
+            IsPartitionKey: true,
+        },
+    }
+    handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
+        ID: 10,
+        Schema: &schemapb.CollectionSchema{
+            Name:               "coll",
+            Fields:             fieldsSchema,
+            EnableDynamicField: false,
+            Properties:         nil,
+        },
+    }, nil)
+
+    alloc := allocator.NewMockAllocator(s.T())
+    alloc.EXPECT().AllocN(mock.Anything).Return(0, 100, nil)
+    nm := session.NewMockWorkerManager(s.T())
+    nm.EXPECT().GetClientByID(mock.Anything).Return(mocks.NewMockIndexNodeClient(s.T()), true)
+    sch := &taskScheduler{
+        meta:        mt,
+        handler:     handler,
+        allocator:   alloc,
+        nodeManager: nm,
+    }
+
+    t := &statsTask{
+        taskID:    1,
+        segmentID: 12,
+        taskInfo: &workerpb.StatsResult{
+            TaskID: 1,
+            State:  indexpb.JobState_JobStateInit,
+        },
+        subJobType: indexpb.StatsSubJob_Sort,
+    }
+
+    // The task must be discarded (state None), not failed, because the segment is compacting.
+    s.False(sch.process(t, 1))
+    s.Equal(indexpb.JobState_JobStateNone, t.taskInfo.GetState())
+
+    // The compacting flag is still held by the compaction: the segment exists but cannot be re-acquired.
+    exist, canDo := mt.CheckAndSetSegmentsCompacting(context.Background(), []UniqueID{12})
+    s.True(exist)
+    s.False(canDo)
+}
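
Assuming the test lives in the datacoord package, as its imports suggest, it can be run in isolation with:

go test ./internal/datacoord/ -run Test_CompactingSegment -v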

@@ -143,8 +143,10 @@ func (st *statsTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta
     if exist, canDo := meta.CheckAndSetSegmentsCompacting(ctx, []UniqueID{st.segmentID}); !exist || !canDo {
         log.Warn("segment is not exist or is compacting, skip stats",
             zap.Bool("exist", exist), zap.Bool("canDo", canDo))
-        // Fail stats task if segment is compacting, it's ok because the segment will be dropped after the compaction.
-        st.SetState(indexpb.JobState_JobStateFailed, "segment is not healthy")
+        // Discard this stats task because the segment is either unhealthy or undergoing compaction.
+        // Setting the state to None clears the meta tag, so the task cannot later
+        // clear a compacting flag it never set.
+        st.SetState(indexpb.JobState_JobStateNone, "segment is not healthy")
         st.SetStartTime(time.Now())
         return fmt.Errorf("mark segment compacting failed, isCompacting: %v", !canDo)
     }
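
The new comment is about flag ownership: a Failed task is eventually cleaned up, and cleanup releases the segment's compacting flag, even though in this path the flag is held by a real compaction. A small hypothetical Go sketch (fakeMeta and dropStatsTask are illustrative, not Milvus APIs) of why only the owner may release the flag:

package main

import "fmt"

// fakeMeta stands in for the segment metadata; compacting tracks which
// segments are currently owned by a compaction.
type fakeMeta struct{ compacting map[int64]bool }

func (m *fakeMeta) setCompacting(segID int64, v bool) { m.compacting[segID] = v }

// dropStatsTask releases the compacting flag only if this task actually
// acquired it; a task discarded as None never owned the flag.
func dropStatsTask(m *fakeMeta, segID int64, ownsFlag bool) {
    if ownsFlag {
        m.setCompacting(segID, false)
    }
}

func main() {
    m := &fakeMeta{compacting: map[int64]bool{12: true}} // a compaction owns segment 12
    dropStatsTask(m, 12, false)                          // the discarded stats task never owned the flag
    fmt.Println(m.compacting[12])                        // true: the compaction is still protected
}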
@@ -184,7 +186,7 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
     // set segment compacting
     segment := dependency.meta.GetHealthySegment(ctx, st.segmentID)
     if segment == nil {
-        log.Warn("segment is node healthy, skip stats")
+        log.Warn("segment is not healthy, skip stats")
         st.SetState(indexpb.JobState_JobStateNone, "segment is not healthy")
         return false
     }