diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go
index 6bcf4f1bc5..f28199dd50 100644
--- a/internal/datacoord/task_scheduler_test.go
+++ b/internal/datacoord/task_scheduler_test.go
@@ -29,6 +29,7 @@ import (
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 
+	"github.com/hashicorp/golang-lru/v2/expirable"
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datacoord/session"
@@ -42,6 +43,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
+	"github.com/milvus-io/milvus/pkg/v2/util/lock"
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
 	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 )
@@ -2190,3 +2192,103 @@ func (s *taskSchedulerSuite) Test_reload() {
 		s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
 	})
 }
+
+func (s *taskSchedulerSuite) Test_zeroSegmentStats() {
+	ctx := context.Background()
+	catalog := catalogmocks.NewDataCoordCatalog(s.T())
+	taskID := UniqueID(111)
+	segID := UniqueID(112)
+	targetSegID := UniqueID(113)
+
+	workerManager := session.NewMockWorkerManager(s.T())
+	workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 {
+		return map[int64]int64{
+			1: 1,
+		}
+	})
+
+	mt := &meta{
+		ctx:      ctx,
+		catalog:  catalog,
+		segments: NewSegmentsInfo(),
+		statsTaskMeta: &statsTaskMeta{
+			ctx:     ctx,
+			catalog: catalog,
+			tasks: map[int64]*indexpb.StatsTask{
+				taskID: {
+					CollectionID:    1,
+					PartitionID:     2,
+					SegmentID:       segID,
+					InsertChannel:   "ch-1",
+					TaskID:          taskID,
+					Version:         0,
+					NodeID:          0,
+					State:           indexpb.JobState_JobStateInit,
+					FailReason:      "",
+					TargetSegmentID: targetSegID,
+					SubJobType:      indexpb.StatsSubJob_Sort,
+					CanRecycle:      false,
+				},
+			},
+		},
+	}
+
+	catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
+	err := mt.AddSegment(ctx, &SegmentInfo{
+		SegmentInfo: &datapb.SegmentInfo{
+			ID:            segID,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "ch-1",
+			State:         commonpb.SegmentState_Flushed,
+			NumOfRows:     0,
+		},
+	})
+	s.NoError(err)
+	cm := mocks.NewChunkManager(s.T())
+	catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil)
+	in := mocks.NewMockIndexNodeClient(s.T())
+	in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)
+	workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true)
+
+	handler := NewNMockHandler(s.T())
+	ctx, cancel := context.WithCancel(ctx)
+	scheduler := &taskScheduler{
+		ctx:                       ctx,
+		cancel:                    cancel,
+		meta:                      mt,
+		pendingTasks:              newFairQueuePolicy(),
+		runningTasks:              make(map[UniqueID]Task),
+		notifyChan:                make(chan struct{}, 1),
+		taskLock:                  lock.NewKeyLock[int64](),
+		scheduleDuration:          Params.DataCoordCfg.IndexTaskSchedulerInterval.GetAsDuration(time.Millisecond),
+		collectMetricsDuration:    time.Minute,
+		policy:                    defaultBuildIndexPolicy,
+		nodeManager:               workerManager,
+		chunkManager:              cm,
+		handler:                   handler,
+		indexEngineVersionManager: newIndexEngineVersionManager(),
+		allocator:                 nil,
+		taskStats:                 expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*15),
+		compactionHandler:         nil,
+	}
+	scheduler.Start()
+
+	scheduler.enqueue(newStatsTask(taskID, segID, targetSegID, indexpb.StatsSubJob_Sort))
+	for {
+		time.Sleep(time.Second)
+		if scheduler.pendingTasks.TaskCount() == 0 {
+			scheduler.runningQueueLock.RLock()
+			taskNum := len(scheduler.runningTasks)
+			scheduler.runningQueueLock.RUnlock()
+			if taskNum == 0 {
+				break
+			}
+		}
+	}
+	scheduler.Stop()
+	segment := mt.GetSegment(ctx, targetSegID)
+	s.Equal(int64(0), segment.NumOfRows)
+	s.Equal(commonpb.SegmentState_Dropped, segment.State)
+}
diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go
index 3ca859efb2..5be68a84eb 100644
--- a/internal/datacoord/task_stats.go
+++ b/internal/datacoord/task_stats.go
@@ -176,6 +176,20 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo
 		return false
 	}
 
+	if segment.GetNumOfRows() == 0 {
+		st.setResult(&workerpb.StatsResult{
+			TaskID:       st.taskID,
+			State:        indexpb.JobState_JobStateFinished,
+			FailReason:   "segment num row is zero",
+			CollectionID: st.req.GetCollectionID(),
+			PartitionID:  st.req.GetPartitionID(),
+			SegmentID:    st.targetSegmentID,
+			Channel:      st.req.GetInsertChannel(),
+			NumRows:      0,
+		})
+		return false
+	}
+
 	if segment.GetIsSorted() && st.subJobType == indexpb.StatsSubJob_Sort {
 		log.Info("stats task is marked as sorted, skip stats")
 		st.SetState(indexpb.JobState_JobStateNone, "segment is marked as sorted")
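
Note on the new dependency used in the test: the scheduler's taskStats field is built with hashicorp's expirable LRU (github.com/hashicorp/golang-lru/v2/expirable), whose NewLRU takes a capacity, an optional eviction callback, and a TTL after which entries are evicted. A minimal, self-contained sketch of the same call, substituting int64/string for the package-internal UniqueID/Task types:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/expirable"
)

func main() {
	// Capacity 512, no eviction callback, entries expire 15 minutes after
	// insertion -- the same arguments the test passes for taskStats.
	cache := expirable.NewLRU[int64, string](512, nil, 15*time.Minute)

	cache.Add(111, "stats task")
	if v, ok := cache.Get(111); ok {
		fmt.Println(v) // prints "stats task" until the TTL elapses
	}
}

With these arguments the scheduler retains at most 512 recent task records and drops each one 15 minutes after it is recorded.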