fix: Skip executing stats for zero segment (#40448)

issue: #40241

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-03-09 21:14:02 +08:00 committed by GitHub
parent faae8ee518
commit d6a650bd14
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 116 additions and 0 deletions

View File

@ -29,6 +29,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/session"
@ -42,6 +43,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/util/lock"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@ -2190,3 +2192,103 @@ func (s *taskSchedulerSuite) Test_reload() {
s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
})
}
func (s *taskSchedulerSuite) Test_zeroSegmentStats() {
ctx := context.Background()
catalog := catalogmocks.NewDataCoordCatalog(s.T())
taskID := UniqueID(111)
segID := UniqueID(112)
targetSegID := UniqueID(113)
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 {
return map[int64]int64{
1: 1,
}
})
mt := &meta{
ctx: ctx,
catalog: catalog,
segments: NewSegmentsInfo(),
statsTaskMeta: &statsTaskMeta{
ctx: ctx,
catalog: catalog,
tasks: map[int64]*indexpb.StatsTask{
taskID: {
CollectionID: 1,
PartitionID: 2,
SegmentID: segID,
InsertChannel: "ch-1",
TaskID: taskID,
Version: 0,
NodeID: 0,
State: indexpb.JobState_JobStateInit,
FailReason: "",
TargetSegmentID: targetSegID,
SubJobType: indexpb.StatsSubJob_Sort,
CanRecycle: false,
},
},
},
}
catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
err := mt.AddSegment(ctx, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "ch-1",
State: commonpb.SegmentState_Flushed,
NumOfRows: 0,
},
})
s.NoError(err)
cm := mocks.NewChunkManager(s.T())
catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil)
in := mocks.NewMockIndexNodeClient(s.T())
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true)
handler := NewNMockHandler(s.T())
ctx, cancel := context.WithCancel(ctx)
scheduler := &taskScheduler{
ctx: ctx,
cancel: cancel,
meta: mt,
pendingTasks: newFairQueuePolicy(),
runningTasks: make(map[UniqueID]Task),
notifyChan: make(chan struct{}, 1),
taskLock: lock.NewKeyLock[int64](),
scheduleDuration: Params.DataCoordCfg.IndexTaskSchedulerInterval.GetAsDuration(time.Millisecond),
collectMetricsDuration: time.Minute,
policy: defaultBuildIndexPolicy,
nodeManager: workerManager,
chunkManager: cm,
handler: handler,
indexEngineVersionManager: newIndexEngineVersionManager(),
allocator: nil,
taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*15),
compactionHandler: nil,
}
scheduler.Start()
scheduler.enqueue(newStatsTask(taskID, segID, targetSegID, indexpb.StatsSubJob_Sort))
for {
time.Sleep(time.Second)
if scheduler.pendingTasks.TaskCount() == 0 {
scheduler.runningQueueLock.RLock()
taskNum := len(scheduler.runningTasks)
scheduler.runningQueueLock.RUnlock()
if taskNum == 0 {
break
}
}
}
scheduler.Stop()
segment := mt.GetSegment(ctx, targetSegID)
s.Equal(int64(0), segment.NumOfRows)
s.Equal(commonpb.SegmentState_Dropped, segment.State)
}

View File

@ -176,6 +176,20 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo
return false
}
if segment.GetNumOfRows() == 0 {
st.setResult(&workerpb.StatsResult{
TaskID: st.taskID,
State: indexpb.JobState_JobStateFinished,
FailReason: "segment num row is zero",
CollectionID: st.req.GetCollectionID(),
PartitionID: st.req.GetPartitionID(),
SegmentID: st.targetSegmentID,
Channel: st.req.GetInsertChannel(),
NumRows: 0,
})
return false
}
if segment.GetIsSorted() && st.subJobType == indexpb.StatsSubJob_Sort {
log.Info("stats task is marked as sorted, skip stats")
st.SetState(indexpb.JobState_JobStateNone, "segment is marked as sorted")