diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index dedf65d643..d6b4d46f45 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -27,7 +27,6 @@ import ( "github.com/hashicorp/golang-lru/v2/expirable" "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" - "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -66,8 +65,6 @@ type indexMeta struct { // segmentID -> indexID -> segmentIndex segmentIndexes map[UniqueID]map[UniqueID]*model.SegmentIndex - - lastUpdateMetricTime atomic.Time } func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats { @@ -209,10 +206,9 @@ func (m *indexMeta) updateSegIndexMeta(segIdx *model.SegmentIndex, updateFunc fu } func (m *indexMeta) updateIndexTasksMetrics() { - if time.Since(m.lastUpdateMetricTime.Load()) < 120*time.Second { - return - } - defer m.lastUpdateMetricTime.Store(time.Now()) + m.RLock() + defer m.RUnlock() + taskMetrics := make(map[UniqueID]map[commonpb.IndexState]int) for _, segIdx := range m.segmentBuildInfo.List() { if segIdx.IsDeleted || !m.isIndexExist(segIdx.CollectionID, segIdx.IndexID) { @@ -446,7 +442,6 @@ func (m *indexMeta) AddSegmentIndex(ctx context.Context, segIndex *model.Segment log.Ctx(ctx).Info("meta update: adding segment index success", zap.Int64("collectionID", segIndex.CollectionID), zap.Int64("segmentID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID), zap.Int64("buildID", buildID)) - m.updateIndexTasksMetrics() return nil } @@ -823,7 +818,6 @@ func (m *indexMeta) FinishTask(taskInfo *workerpb.IndexTaskInfo) error { zap.String("state", taskInfo.GetState().String()), zap.String("fail reason", taskInfo.GetFailReason()), zap.Int32("current_index_version", taskInfo.GetCurrentIndexVersion()), ) - m.updateIndexTasksMetrics() metrics.FlushedSegmentFileNum.WithLabelValues(metrics.IndexFileLabel).Observe(float64(len(taskInfo.GetIndexFileKeys()))) return nil } @@ -848,7 +842,6 @@ func (m *indexMeta) DeleteTask(buildID int64) error { } log.Ctx(m.ctx).Info("delete index task success", zap.Int64("buildID", buildID)) - m.updateIndexTasksMetrics() return nil } @@ -877,8 +870,6 @@ func (m *indexMeta) BuildIndex(buildID UniqueID) error { } log.Ctx(m.ctx).Info("meta update: segment index in progress success", zap.Int64("buildID", segIdx.BuildID), zap.Int64("segmentID", segIdx.SegmentID)) - - m.updateIndexTasksMetrics() return nil } @@ -931,7 +922,6 @@ func (m *indexMeta) RemoveSegmentIndex(ctx context.Context, collID, partID, segI } m.segmentBuildInfo.Remove(buildID) - m.updateIndexTasksMetrics() return nil } diff --git a/internal/datacoord/job_manager.go b/internal/datacoord/job_manager.go index eee196a259..7c6766ab19 100644 --- a/internal/datacoord/job_manager.go +++ b/internal/datacoord/job_manager.go @@ -110,7 +110,7 @@ func (jm *statsJobManager) triggerSortStatsTask() { })) for _, segment := range visibleSegments { - if jm.scheduler.GetTaskCount() > Params.DataCoordCfg.StatsTaskTriggerCount.GetAsInt() { + if jm.scheduler.pendingTasks.TaskCount() > Params.DataCoordCfg.StatsTaskTriggerCount.GetAsInt() { break } jm.createSortStatsTaskForSegment(segment) @@ -261,7 +261,7 @@ func (jm *statsJobManager) cleanupStatsTasksLoop() { taskIDs := jm.mt.statsTaskMeta.CanCleanedTasks() for _, taskID := range taskIDs { // waiting for queue processing tasks to complete - if jm.scheduler.getTask(taskID) == nil { + if !jm.scheduler.exist(taskID) { if err := jm.mt.statsTaskMeta.DropStatsTask(taskID); err != nil { // 
ignore err, if remove failed, wait next GC log.Warn("clean up stats task failed", zap.Int64("taskID", taskID), zap.Error(err)) diff --git a/internal/datacoord/job_manager_test.go b/internal/datacoord/job_manager_test.go index 53e784c0b8..c30be4b53b 100644 --- a/internal/datacoord/job_manager_test.go +++ b/internal/datacoord/job_manager_test.go @@ -106,10 +106,11 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() { loopWg: sync.WaitGroup{}, mt: mt, scheduler: &taskScheduler{ - allocator: alloc, - tasks: make(map[int64]Task), - meta: mt, - taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*5), + allocator: alloc, + pendingTasks: newFairQueuePolicy(), + runningTasks: make(map[UniqueID]Task), + meta: mt, + taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*5), }, allocator: alloc, } @@ -122,5 +123,5 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() { jm.loopWg.Wait() - s.Equal(3, len(jm.scheduler.tasks)) + s.Equal(3, jm.scheduler.pendingTasks.TaskCount()) } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 648b8796a9..e2225e1abc 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -754,11 +754,34 @@ func (s *Server) startServerLoop() { } } +func (s *Server) startCollectMetaMetrics(ctx context.Context) { + s.serverLoopWg.Add(1) + go s.collectMetaMetrics(ctx) +} + +func (s *Server) collectMetaMetrics(ctx context.Context) { + defer s.serverLoopWg.Done() + + ticker := time.NewTicker(time.Second * 120) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + log.Ctx(s.ctx).Warn("collectMetaMetrics ctx done") + return + case <-ticker.C: + s.meta.statsTaskMeta.updateMetrics() + s.meta.indexMeta.updateIndexTasksMetrics() + } + } +} + func (s *Server) startTaskScheduler() { s.taskScheduler.Start() s.jobManager.Start() s.startIndexService(s.serverLoopCtx) + s.startCollectMetaMetrics(s.serverLoopCtx) } func (s *Server) updateSegmentStatistics(ctx context.Context, stats []*commonpb.SegmentStats) { @@ -1082,6 +1105,7 @@ func (s *Server) Stop() error { log.Info("datacoord garbage collector stopped") s.stopServerLoop() + log.Info("datacoord stopServerLoop stopped") s.importScheduler.Close() s.importChecker.Close() diff --git a/internal/datacoord/session/indexnode_manager.go b/internal/datacoord/session/indexnode_manager.go index 65f42828a4..178951a3f6 100644 --- a/internal/datacoord/session/indexnode_manager.go +++ b/internal/datacoord/session/indexnode_manager.go @@ -44,6 +44,7 @@ type WorkerManager interface { RemoveNode(nodeID typeutil.UniqueID) StoppingNode(nodeID typeutil.UniqueID) PickClient() (typeutil.UniqueID, types.IndexNodeClient) + QuerySlots() map[int64]int64 ClientSupportDisk() bool GetAllClients() map[typeutil.UniqueID]types.IndexNodeClient GetClientByID(nodeID typeutil.UniqueID) (types.IndexNodeClient, bool) @@ -115,6 +116,42 @@ func (nm *IndexNodeManager) AddNode(nodeID typeutil.UniqueID, address string) er return nil } +func (nm *IndexNodeManager) QuerySlots() map[int64]int64 { + nm.lock.Lock() + defer nm.lock.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), querySlotTimeout) + defer cancel() + + nodeSlots := make(map[int64]int64) + mu := &sync.Mutex{} + wg := &sync.WaitGroup{} + for nodeID, client := range nm.nodeClients { + if _, ok := nm.stoppingNodes[nodeID]; !ok { + wg.Add(1) + go func(nodeID int64) { + defer wg.Done() + resp, err := client.GetJobStats(ctx, &workerpb.GetJobStatsRequest{}) + if err != nil { + log.Warn("get IndexNode slots 
failed", zap.Int64("nodeID", nodeID), zap.Error(err)) + return + } + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), + zap.String("reason", resp.GetStatus().GetReason())) + return + } + mu.Lock() + defer mu.Unlock() + nodeSlots[nodeID] = resp.GetTaskSlots() + }(nodeID) + } + } + wg.Wait() + log.Ctx(context.TODO()).Debug("query slot done", zap.Any("nodeSlots", nodeSlots)) + return nodeSlots +} + func (nm *IndexNodeManager) PickClient() (typeutil.UniqueID, types.IndexNodeClient) { nm.lock.Lock() defer nm.lock.Unlock() diff --git a/internal/datacoord/session/mock_worker_manager.go b/internal/datacoord/session/mock_worker_manager.go index 556f590a10..4ecab8be08 100644 --- a/internal/datacoord/session/mock_worker_manager.go +++ b/internal/datacoord/session/mock_worker_manager.go @@ -274,6 +274,53 @@ func (_c *MockWorkerManager_PickClient_Call) RunAndReturn(run func() (int64, typ return _c } +// QuerySlots provides a mock function with given fields: +func (_m *MockWorkerManager) QuerySlots() map[int64]int64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for QuerySlots") + } + + var r0 map[int64]int64 + if rf, ok := ret.Get(0).(func() map[int64]int64); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]int64) + } + } + + return r0 +} + +// MockWorkerManager_QuerySlots_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QuerySlots' +type MockWorkerManager_QuerySlots_Call struct { + *mock.Call +} + +// QuerySlots is a helper method to define mock.On call +func (_e *MockWorkerManager_Expecter) QuerySlots() *MockWorkerManager_QuerySlots_Call { + return &MockWorkerManager_QuerySlots_Call{Call: _e.mock.On("QuerySlots")} +} + +func (_c *MockWorkerManager_QuerySlots_Call) Run(run func()) *MockWorkerManager_QuerySlots_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWorkerManager_QuerySlots_Call) Return(_a0 map[int64]int64) *MockWorkerManager_QuerySlots_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWorkerManager_QuerySlots_Call) RunAndReturn(run func() map[int64]int64) *MockWorkerManager_QuerySlots_Call { + _c.Call.Return(run) + return _c +} + // RemoveNode provides a mock function with given fields: nodeID func (_m *MockWorkerManager) RemoveNode(nodeID int64) { _m.Called(nodeID) diff --git a/internal/datacoord/stats_task_meta.go b/internal/datacoord/stats_task_meta.go index d5de7063d7..c7b7c9607b 100644 --- a/internal/datacoord/stats_task_meta.go +++ b/internal/datacoord/stats_task_meta.go @@ -73,6 +73,9 @@ func (stm *statsTaskMeta) reloadFromKV() error { } func (stm *statsTaskMeta) updateMetrics() { + stm.RLock() + defer stm.RUnlock() + taskMetrics := make(map[UniqueID]map[indexpb.JobState]int) for _, t := range stm.tasks { if _, ok := taskMetrics[t.GetCollectionID()]; !ok { @@ -122,7 +125,6 @@ func (stm *statsTaskMeta) AddStatsTask(t *indexpb.StatsTask) error { } stm.tasks[t.GetTaskID()] = t - stm.updateMetrics() log.Info("add stats task success", zap.Int64("taskID", t.GetTaskID()), zap.Int64("originSegmentID", t.GetSegmentID()), zap.Int64("targetSegmentID", t.GetTargetSegmentID()), zap.String("subJobType", t.GetSubJobType().String())) @@ -149,7 +151,6 @@ func (stm *statsTaskMeta) DropStatsTask(taskID int64) error { } delete(stm.tasks, taskID) - stm.updateMetrics() log.Info("remove stats task success", zap.Int64("taskID", taskID), zap.Int64("segmentID", 
t.SegmentID))
 
 	return nil
@@ -178,7 +179,6 @@ func (stm *statsTaskMeta) UpdateVersion(taskID, nodeID int64) error {
 	}
 
 	stm.tasks[t.TaskID] = cloneT
-	stm.updateMetrics()
 
 	log.Info("update stats task version success", zap.Int64("taskID", taskID),
 		zap.Int64("nodeID", nodeID), zap.Int64("newVersion", cloneT.GetVersion()))
 	return nil
@@ -205,7 +205,6 @@ func (stm *statsTaskMeta) UpdateBuildingTask(taskID int64) error {
 	}
 
 	stm.tasks[t.TaskID] = cloneT
-	stm.updateMetrics()
 
 	log.Info("update building stats task success", zap.Int64("taskID", taskID))
 	return nil
@@ -233,7 +232,6 @@ func (stm *statsTaskMeta) FinishTask(taskID int64, result *workerpb.StatsResult)
 	}
 
 	stm.tasks[t.TaskID] = cloneT
-	stm.updateMetrics()
 
 	log.Info("finish stats task meta success", zap.Int64("taskID", taskID), zap.Int64("segmentID", t.SegmentID),
 		zap.String("state", result.GetState().String()), zap.String("failReason", t.GetFailReason()))
@@ -334,7 +332,6 @@ func (stm *statsTaskMeta) MarkTaskCanRecycle(taskID int64) error {
 	}
 
 	stm.tasks[t.TaskID] = cloneT
-	stm.updateMetrics()
 
 	log.Info("mark stats task can recycle success",
 		zap.Int64("taskID", taskID),
 		zap.Int64("segmentID", t.SegmentID),
diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go
index a333302642..237023b453 100644
--- a/internal/datacoord/task_index.go
+++ b/internal/datacoord/task_index.go
@@ -103,8 +103,8 @@ func (it *indexBuildTask) GetTaskType() string {
 }
 
 func (it *indexBuildTask) CheckTaskHealthy(mt *meta) bool {
-	_, exist := mt.indexMeta.GetIndexJob(it.GetTaskID())
-	return exist
+	job, exist := mt.indexMeta.GetIndexJob(it.GetTaskID())
+	return exist && !job.IsDeleted
 }
 
 func (it *indexBuildTask) SetState(state indexpb.JobState, failReason string) {
diff --git a/internal/datacoord/task_queue.go b/internal/datacoord/task_queue.go
new file mode 100644
index 0000000000..eb3c489e1d
--- /dev/null
+++ b/internal/datacoord/task_queue.go
@@ -0,0 +1,143 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package datacoord
+
+import (
+	"sync"
+)
+
+// schedulePolicy is the queueing policy the task scheduler uses for pending tasks.
+type schedulePolicy interface {
+	Push(task Task)
+	// Pop removes and returns the next task ready to run.
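+	// A minimal usage sketch (illustrative only, not part of this change):
+	//
+	//	q := newFairQueuePolicy()
+	//	q.Push(task)    // enqueue; de-duplicated by task ID
+	//	next := q.Pop() // FIFO: the oldest pushed task first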
+ Pop() Task + BatchPop(batch int) []Task + Get(taskID UniqueID) Task + Keys() []UniqueID + TaskCount() int + Exist(taskID UniqueID) bool + Remove(taskID UniqueID) +} + +var _ schedulePolicy = &fairQueuePolicy{} + +type fairQueuePolicy struct { + tasks map[UniqueID]Task + taskIDs []UniqueID + lock sync.RWMutex +} + +func newFairQueuePolicy() *fairQueuePolicy { + return &fairQueuePolicy{ + tasks: make(map[UniqueID]Task, 0), + taskIDs: make([]UniqueID, 0), + lock: sync.RWMutex{}, + } +} + +func (fqp *fairQueuePolicy) Push(t Task) { + fqp.lock.Lock() + defer fqp.lock.Unlock() + if _, ok := fqp.tasks[t.GetTaskID()]; !ok { + fqp.tasks[t.GetTaskID()] = t + fqp.taskIDs = append(fqp.taskIDs, t.GetTaskID()) + } +} + +func (fqp *fairQueuePolicy) Pop() Task { + fqp.lock.Lock() + defer fqp.lock.Unlock() + if len(fqp.taskIDs) == 0 { + return nil + } + taskID := fqp.taskIDs[0] + fqp.taskIDs = fqp.taskIDs[1:] + task := fqp.tasks[taskID] + delete(fqp.tasks, taskID) + return task +} + +func (fqp *fairQueuePolicy) BatchPop(batch int) []Task { + fqp.lock.Lock() + defer fqp.lock.Unlock() + tasks := make([]Task, 0) + if len(fqp.taskIDs) <= batch { + for _, taskID := range fqp.taskIDs { + task := fqp.tasks[taskID] + delete(fqp.tasks, taskID) + tasks = append(tasks, task) + } + fqp.taskIDs = make([]UniqueID, 0) + return tasks + } + + taskIDs := fqp.taskIDs[:batch] + for _, taskID := range taskIDs { + task := fqp.tasks[taskID] + delete(fqp.tasks, taskID) + tasks = append(tasks, task) + } + fqp.taskIDs = fqp.taskIDs[batch:] + return tasks +} + +func (fqp *fairQueuePolicy) Get(taskID UniqueID) Task { + fqp.lock.RLock() + defer fqp.lock.RUnlock() + if len(fqp.taskIDs) == 0 { + return nil + } + task := fqp.tasks[taskID] + return task +} + +func (fqp *fairQueuePolicy) TaskCount() int { + fqp.lock.RLock() + defer fqp.lock.RUnlock() + return len(fqp.taskIDs) +} + +func (fqp *fairQueuePolicy) Exist(taskID UniqueID) bool { + fqp.lock.RLock() + defer fqp.lock.RUnlock() + _, ok := fqp.tasks[taskID] + return ok +} + +func (fqp *fairQueuePolicy) Remove(taskID UniqueID) { + fqp.lock.Lock() + defer fqp.lock.Unlock() + + taskIndex := -1 + for i := range fqp.taskIDs { + if fqp.taskIDs[i] == taskID { + taskIndex = i + break + } + } + if taskIndex != -1 { + fqp.taskIDs = append(fqp.taskIDs[:taskIndex], fqp.taskIDs[taskIndex+1:]...) 
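+		// the in-place append above preserves the FIFO order of the remaining IDs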
+		delete(fqp.tasks, taskID)
+	}
+}
+
+func (fqp *fairQueuePolicy) Keys() []UniqueID {
+	fqp.lock.RLock()
+	defer fqp.lock.RUnlock()
+
+	// return a copy so callers never iterate the internal slice outside the lock
+	return append(make([]UniqueID, 0, len(fqp.taskIDs)), fqp.taskIDs...)
+}
diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go
index d1baf94059..fb80b0f377 100644
--- a/internal/datacoord/task_scheduler.go
+++ b/internal/datacoord/task_scheduler.go
@@ -40,8 +40,6 @@ const (
 )
 
 type taskScheduler struct {
-	sync.RWMutex
-
 	ctx    context.Context
 	cancel context.CancelFunc
 	wg     sync.WaitGroup
@@ -49,10 +47,13 @@ type taskScheduler struct {
 	scheduleDuration       time.Duration
 	collectMetricsDuration time.Duration
 
-	// TODO @xiaocai2333: use priority queue
-	tasks map[int64]Task
+	pendingTasks     schedulePolicy
+	runningTasks     map[UniqueID]Task
+	runningQueueLock sync.RWMutex
+
+	taskLock *lock.KeyLock[int64]
+
 	notifyChan chan struct{}
-	taskLock   *lock.KeyLock[int64]
 
 	meta *meta
 
@@ -82,7 +83,8 @@ func newTaskScheduler(
 		ctx:    ctx,
 		cancel: cancel,
 		meta:   metaTable,
-		tasks:  make(map[int64]Task),
+		pendingTasks: newFairQueuePolicy(),
+		runningTasks: make(map[UniqueID]Task),
 		notifyChan: make(chan struct{}, 1),
 		taskLock:   lock.NewKeyLock[int64](),
 		scheduleDuration: Params.DataCoordCfg.IndexTaskSchedulerInterval.GetAsDuration(time.Millisecond),
@@ -101,9 +103,10 @@ func newTaskScheduler(
 }
 
 func (s *taskScheduler) Start() {
-	s.wg.Add(2)
+	s.wg.Add(3)
 	go s.schedule()
 	go s.collectTaskMetrics()
+	go s.checkProcessingTasksLoop()
 }
 
 func (s *taskScheduler) Stop() {
@@ -118,86 +121,100 @@ func (s *taskScheduler) reloadFromMeta() {
 		if segIndex.IsDeleted {
 			continue
 		}
-		if segIndex.IndexState != commonpb.IndexState_Finished && segIndex.IndexState != commonpb.IndexState_Failed {
-			s.enqueue(&indexBuildTask{
-				taskID: segIndex.BuildID,
-				nodeID: segIndex.NodeID,
-				taskInfo: &workerpb.IndexTaskInfo{
-					BuildID:    segIndex.BuildID,
-					State:      segIndex.IndexState,
-					FailReason: segIndex.FailReason,
-				},
-				queueTime: time.Now(),
-				startTime: time.Now(),
-				endTime:   time.Now(),
-			})
+		task := &indexBuildTask{
+			taskID: segIndex.BuildID,
+			nodeID: segIndex.NodeID,
+			taskInfo: &workerpb.IndexTaskInfo{
+				BuildID:    segIndex.BuildID,
+				State:      segIndex.IndexState,
+				FailReason: segIndex.FailReason,
+			},
+			queueTime: time.Now(),
+			startTime: time.Now(),
+			endTime:   time.Now(),
+		}
+		switch segIndex.IndexState {
+		case commonpb.IndexState_IndexStateNone, commonpb.IndexState_Unissued:
+			s.pendingTasks.Push(task)
+		case commonpb.IndexState_InProgress, commonpb.IndexState_Retry:
+			s.runningQueueLock.Lock()
+			s.runningTasks[segIndex.BuildID] = task
+			s.runningQueueLock.Unlock()
 		}
 	}
 
 	allAnalyzeTasks := s.meta.analyzeMeta.GetAllTasks()
 	for taskID, t := range allAnalyzeTasks {
-		if t.State != indexpb.JobState_JobStateFinished && t.State != indexpb.JobState_JobStateFailed {
-			s.enqueue(&analyzeTask{
-				taskID: taskID,
-				nodeID: t.NodeID,
-				taskInfo: &workerpb.AnalyzeResult{
-					TaskID:     taskID,
-					State:      t.State,
-					FailReason: t.FailReason,
-				},
-				queueTime: time.Now(),
-				startTime: time.Now(),
-				endTime:   time.Now(),
-			})
+		task := &analyzeTask{
+			taskID: taskID,
+			nodeID: t.NodeID,
+			taskInfo: &workerpb.AnalyzeResult{
+				TaskID:     taskID,
+				State:      t.State,
+				FailReason: t.FailReason,
+			},
+			queueTime: time.Now(),
+			startTime: time.Now(),
+			endTime:   time.Now(),
+		}
+		switch t.State {
+		case indexpb.JobState_JobStateNone, indexpb.JobState_JobStateInit:
+			s.pendingTasks.Push(task)
+		case indexpb.JobState_JobStateInProgress, indexpb.JobState_JobStateRetry:
+			s.runningQueueLock.Lock()
+			s.runningTasks[taskID] = task
+			s.runningQueueLock.Unlock()
 		}
 	}
 
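+	// The stats reload below follows the same routing rule as the two loops
+	// above: None/Init tasks re-enter pendingTasks, while InProgress/Retry
+	// tasks are tracked in runningTasks for checkProcessingTasksLoop to poll.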
 	allStatsTasks := s.meta.statsTaskMeta.GetAllTasks()
 	for taskID, t := range allStatsTasks {
-		if t.GetState() != indexpb.JobState_JobStateFinished && t.GetState() != indexpb.JobState_JobStateFailed {
-			if t.GetState() == indexpb.JobState_JobStateInProgress || t.GetState() == indexpb.JobState_JobStateRetry {
-				if t.GetState() == indexpb.JobState_JobStateInProgress || t.GetState() == indexpb.JobState_JobStateRetry {
-					exist, canDo := s.meta.CheckAndSetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()})
-					if !exist || !canDo {
-						log.Ctx(s.ctx).Warn("segment is not exist or is compacting, skip stats, but this should not have happened, try to remove the stats task",
-							zap.Int64("taskID", taskID), zap.Bool("exist", exist), zap.Bool("canDo", canDo))
-						err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
-						if err == nil {
-							continue
-						}
-						log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
-						t.State = indexpb.JobState_JobStateFailed
-						t.FailReason = "segment is not exist or is compacting"
-					} else {
-						if !s.compactionHandler.checkAndSetSegmentStating(t.GetInsertChannel(), t.GetSegmentID()) {
-							s.meta.SetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()}, false)
-							err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
-							if err == nil {
-								continue
-							}
-							log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
-							t.State = indexpb.JobState_JobStateFailed
-							t.FailReason = "segment is not exist or is l0 compacting"
-						}
-					}
+		task := &statsTask{
+			taskID:          taskID,
+			segmentID:       t.GetSegmentID(),
+			targetSegmentID: t.GetTargetSegmentID(),
+			nodeID:          t.NodeID,
+			taskInfo: &workerpb.StatsResult{
+				TaskID:     taskID,
+				State:      t.GetState(),
+				FailReason: t.GetFailReason(),
+			},
+			queueTime:  time.Now(),
+			startTime:  time.Now(),
+			endTime:    time.Now(),
+			subJobType: t.GetSubJobType(),
+		}
+		switch t.GetState() {
+		case indexpb.JobState_JobStateNone, indexpb.JobState_JobStateInit:
+			s.pendingTasks.Push(task)
+		case indexpb.JobState_JobStateInProgress, indexpb.JobState_JobStateRetry:
+			exist, canDo := s.meta.CheckAndSetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()})
+			if !exist || !canDo {
+				log.Ctx(s.ctx).Warn("segment does not exist or is compacting, skip stats; this should not have happened, try to remove the stats task",
+					zap.Int64("taskID", taskID), zap.Bool("exist", exist), zap.Bool("canDo", canDo))
+				err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
+				if err == nil {
+					continue
+				}
+				log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
+				task.taskInfo.State = indexpb.JobState_JobStateFailed
+				task.taskInfo.FailReason = "segment does not exist or is compacting"
+			} else {
+				if !s.compactionHandler.checkAndSetSegmentStating(t.GetInsertChannel(), t.GetSegmentID()) {
+					s.meta.SetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()}, false)
+					err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
+					if err == nil {
+						continue
+					}
+					log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
+					task.taskInfo.State = indexpb.JobState_JobStateFailed
+					task.taskInfo.FailReason = "segment does not exist or is l0 compacting"
+				}
+			}
-			s.enqueue(&statsTask{
-				taskID:          taskID,
-				segmentID:       t.GetSegmentID(),
-				targetSegmentID: t.GetTargetSegmentID(),
-				nodeID:          t.NodeID,
-				taskInfo: &workerpb.StatsResult{
-					TaskID:     taskID,
-					State:      t.GetState(),
-					FailReason: 
t.GetFailReason(), - }, - queueTime: time.Now(), - startTime: time.Now(), - endTime: time.Now(), - subJobType: t.GetSubJobType(), - }) + s.runningQueueLock.Lock() + s.runningTasks[taskID] = task + s.runningQueueLock.Unlock() } } } @@ -210,36 +227,67 @@ func (s *taskScheduler) notify() { } } +func (s *taskScheduler) exist(taskID UniqueID) bool { + exist := s.pendingTasks.Exist(taskID) + if exist { + return true + } + + s.runningQueueLock.RLock() + defer s.runningQueueLock.RUnlock() + _, ok := s.runningTasks[taskID] + return ok +} + +func (s *taskScheduler) getRunningTask(taskID UniqueID) Task { + s.runningQueueLock.RLock() + defer s.runningQueueLock.RUnlock() + + return s.runningTasks[taskID] +} + +func (s *taskScheduler) removeRunningTask(taskID UniqueID) { + s.runningQueueLock.Lock() + defer s.runningQueueLock.Unlock() + + delete(s.runningTasks, taskID) +} + func (s *taskScheduler) enqueue(task Task) { defer s.notify() - - s.Lock() - defer s.Unlock() taskID := task.GetTaskID() - if _, ok := s.tasks[taskID]; !ok { - s.tasks[taskID] = task - s.taskStats.Add(taskID, task) + + s.runningQueueLock.RLock() + _, ok := s.runningTasks[taskID] + s.runningQueueLock.RUnlock() + if !ok { + s.pendingTasks.Push(task) task.SetQueueTime(time.Now()) - log.Info("taskScheduler enqueue task", zap.Int64("taskID", taskID)) + log.Ctx(s.ctx).Info("taskScheduler enqueue task", zap.Int64("taskID", taskID)) } } -func (s *taskScheduler) GetTaskCount() int { - s.RLock() - defer s.RUnlock() - return len(s.tasks) -} - func (s *taskScheduler) AbortTask(taskID int64) { - log.Info("task scheduler receive abort task request", zap.Int64("taskID", taskID)) - s.RLock() - task, ok := s.tasks[taskID] - s.RUnlock() - if ok { + log.Ctx(s.ctx).Info("task scheduler receive abort task request", zap.Int64("taskID", taskID)) + + task := s.pendingTasks.Get(taskID) + if task != nil { s.taskLock.Lock(taskID) task.SetState(indexpb.JobState_JobStateFailed, "canceled") s.taskLock.Unlock(taskID) } + + s.runningQueueLock.Lock() + if task != nil { + s.runningTasks[taskID] = task + } + if runningTask, ok := s.runningTasks[taskID]; ok { + s.taskLock.Lock(taskID) + runningTask.SetState(indexpb.JobState_JobStateFailed, "canceled") + s.taskLock.Unlock(taskID) + } + s.runningQueueLock.Unlock() + s.pendingTasks.Remove(taskID) } func (s *taskScheduler) schedule() { @@ -265,69 +313,131 @@ func (s *taskScheduler) schedule() { } } -func (s *taskScheduler) getTask(taskID UniqueID) Task { - s.RLock() - defer s.RUnlock() +func (s *taskScheduler) checkProcessingTasksLoop() { + log.Ctx(s.ctx).Info("taskScheduler checkProcessingTasks loop start") + defer s.wg.Done() + ticker := time.NewTicker(s.scheduleDuration) + defer ticker.Stop() + for { + select { + case <-s.ctx.Done(): + log.Ctx(s.ctx).Warn("task scheduler ctx done") + return + case <-ticker.C: + s.checkProcessingTasks() + } + } +} - return s.tasks[taskID] +func (s *taskScheduler) checkProcessingTasks() { + runningTaskIDs := make([]UniqueID, 0) + s.runningQueueLock.RLock() + for taskID := range s.runningTasks { + runningTaskIDs = append(runningTaskIDs, taskID) + } + s.runningQueueLock.RUnlock() + + log.Ctx(s.ctx).Info("check running tasks", zap.Int("runningTask num", len(runningTaskIDs))) + + var wg sync.WaitGroup + sem := make(chan struct{}, 100) + for _, taskID := range runningTaskIDs { + wg.Add(1) + sem <- struct{}{} + taskID := taskID + go func(taskID int64) { + defer wg.Done() + task := s.getRunningTask(taskID) + s.taskLock.Lock(taskID) + suc := s.checkProcessingTask(task) + s.taskLock.Unlock(taskID) 
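+			// a task leaves runningTasks only once checkProcessingTask reports
+			// a terminal transition; otherwise it is polled again next tick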
+			if suc {
+				s.removeRunningTask(taskID)
+			}
+			<-sem
+		}(taskID)
+	}
+	wg.Wait()
+}
+
+func (s *taskScheduler) checkProcessingTask(task Task) bool {
+	switch task.GetState() {
+	case indexpb.JobState_JobStateInProgress:
+		return s.processInProgress(task)
+	case indexpb.JobState_JobStateRetry:
+		return s.processRetry(task)
+	case indexpb.JobState_JobStateFinished, indexpb.JobState_JobStateFailed:
+		return s.processFinished(task)
+	default:
+		log.Ctx(s.ctx).Error("invalid task state in running queue", zap.Int64("taskID", task.GetTaskID()), zap.String("state", task.GetState().String()))
+	}
+	return false
+}
 
 func (s *taskScheduler) run() {
 	// schedule policy
-	s.RLock()
-	taskIDs := make([]UniqueID, 0, len(s.tasks))
-	for tID := range s.tasks {
-		taskIDs = append(taskIDs, tID)
-	}
-	s.RUnlock()
-	if len(taskIDs) > 0 {
-		log.Ctx(s.ctx).Info("task scheduler", zap.Int("task num", len(taskIDs)))
+	pendingTaskNum := s.pendingTasks.TaskCount()
+	if pendingTaskNum == 0 {
+		return
 	}
 
-	s.policy(taskIDs)
-
-	for _, taskID := range taskIDs {
-		s.taskLock.Lock(taskID)
-		ok := s.process(taskID)
-		if !ok {
-			s.taskLock.Unlock(taskID)
-			log.Ctx(s.ctx).Info("there is no idle indexing node, waiting for retry...")
+	// query the free slots of all workers, then dispatch pending tasks while slots remain
+	nodeSlots := s.nodeManager.QuerySlots()
+	log.Ctx(s.ctx).Info("task scheduler", zap.Int("task num", pendingTaskNum), zap.Any("nodeSlots", nodeSlots))
+	var wg sync.WaitGroup
+	sem := make(chan struct{}, 100)
+	for {
+		nodeID := pickNode(nodeSlots)
+		if nodeID == -1 {
+			log.Ctx(s.ctx).Debug("pick node failed")
 			break
 		}
-		s.taskLock.Unlock(taskID)
+
+		task := s.pendingTasks.Pop()
+		if task == nil {
+			break
+		}
+
+		wg.Add(1)
+		sem <- struct{}{}
+		go func(task Task, nodeID int64) {
+			defer wg.Done()
+			// release the slot on every exit path out of this goroutine,
+			// including the early return for JobStateNone tasks below
+			defer func() { <-sem }()
+
+			s.taskLock.Lock(task.GetTaskID())
+			s.process(task, nodeID)
+			s.taskLock.Unlock(task.GetTaskID())
+
+			switch task.GetState() {
+			case indexpb.JobState_JobStateNone:
+				return
+			case indexpb.JobState_JobStateInit:
+				s.pendingTasks.Push(task)
+			default:
+				s.runningQueueLock.Lock()
+				s.runningTasks[task.GetTaskID()] = task
+				s.runningQueueLock.Unlock()
+			}
+		}(task, nodeID)
 	}
+	wg.Wait()
 }
 
-func (s *taskScheduler) removeTask(taskID UniqueID) {
-	s.Lock()
-	defer s.Unlock()
-	delete(s.tasks, taskID)
-}
-
-func (s *taskScheduler) process(taskID UniqueID) bool {
-	task := s.getTask(taskID)
-
+func (s *taskScheduler) process(task Task, nodeID int64) bool {
 	if !task.CheckTaskHealthy(s.meta) {
-		s.removeTask(taskID)
+		task.SetState(indexpb.JobState_JobStateNone, "task not healthy")
 		return true
 	}
-	state := task.GetState()
-	log.Ctx(s.ctx).Info("task is processing", zap.Int64("taskID", taskID),
-		zap.String("task type", task.GetTaskType()), zap.String("state", state.String()))
+	log.Ctx(s.ctx).Info("task is processing", zap.Int64("taskID", task.GetTaskID()),
+		zap.String("task type", task.GetTaskType()), zap.String("state", task.GetState().String()))
 
-	switch state {
+	switch task.GetState() {
 	case indexpb.JobState_JobStateNone:
-		s.removeTask(taskID)
-
+		return true
 	case indexpb.JobState_JobStateInit:
-		return s.processInit(task)
-	case indexpb.JobState_JobStateFinished, indexpb.JobState_JobStateFailed:
-		return s.processFinished(task)
-	case indexpb.JobState_JobStateRetry:
-		return s.processRetry(task)
+		return s.processInit(task, nodeID)
 	default:
-		// state: in_progress
-		return s.processInProgress(task)
+		log.Ctx(s.ctx).Error("invalid task state in pending queue", zap.Int64("taskID", task.GetTaskID()), zap.String("state", task.GetState().String()))
 	}
 	return true
 }
@@ 
-343,32 +453,23 @@ func (s *taskScheduler) collectTaskMetrics() { log.Warn("task scheduler context done") return case <-ticker.C: - s.RLock() - taskIDs := make([]UniqueID, 0, len(s.tasks)) - for tID := range s.tasks { - taskIDs = append(taskIDs, tID) - } - s.RUnlock() - maxTaskQueueingTime := make(map[string]int64) maxTaskRunningTime := make(map[string]int64) - collectMetricsFunc := func(taskID int64) { - task := s.getTask(taskID) + collectPendingMetricsFunc := func(taskID int64) { + task := s.pendingTasks.Get(taskID) if task == nil { return } + s.taskLock.Lock(taskID) defer s.taskLock.Unlock(taskID) - state := task.GetState() - switch state { - case indexpb.JobState_JobStateNone: - return + switch task.GetState() { case indexpb.JobState_JobStateInit: queueingTime := time.Since(task.GetQueueTime()) if queueingTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) { - log.Warn("task queueing time is too long", zap.Int64("taskID", taskID), + log.Ctx(s.ctx).Warn("task queueing time is too long", zap.Int64("taskID", taskID), zap.Int64("queueing time(ms)", queueingTime.Milliseconds())) } @@ -376,10 +477,18 @@ func (s *taskScheduler) collectTaskMetrics() { if !ok || maxQueueingTime < queueingTime.Milliseconds() { maxTaskQueueingTime[task.GetTaskType()] = queueingTime.Milliseconds() } + } + } + + collectRunningMetricsFunc := func(task Task) { + s.taskLock.Lock(task.GetTaskID()) + defer s.taskLock.Unlock(task.GetTaskID()) + + switch task.GetState() { case indexpb.JobState_JobStateInProgress: runningTime := time.Since(task.GetStartTime()) if runningTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) { - log.Warn("task running time is too long", zap.Int64("taskID", taskID), + log.Ctx(s.ctx).Warn("task running time is too long", zap.Int64("taskID", task.GetTaskID()), zap.Int64("running time(ms)", runningTime.Milliseconds())) } @@ -390,10 +499,18 @@ func (s *taskScheduler) collectTaskMetrics() { } } + taskIDs := s.pendingTasks.Keys() + for _, taskID := range taskIDs { - collectMetricsFunc(taskID) + collectPendingMetricsFunc(taskID) } + s.runningQueueLock.RLock() + for _, task := range s.runningTasks { + collectRunningMetricsFunc(task) + } + s.runningQueueLock.RUnlock() + for taskType, queueingTime := range maxTaskQueueingTime { metrics.DataCoordTaskExecuteLatency. WithLabelValues(taskType, metrics.Pending).Observe(float64(queueingTime)) @@ -407,7 +524,7 @@ func (s *taskScheduler) collectTaskMetrics() { } } -func (s *taskScheduler) processInit(task Task) bool { +func (s *taskScheduler) processInit(task Task, nodeID int64) bool { // 0. pre check task // Determine whether the task can be performed or if it is truly necessary. // for example: flat index doesn't need to actually build. checkPass is false. @@ -416,10 +533,9 @@ func (s *taskScheduler) processInit(task Task) bool { return true } - // 1. 
pick an indexNode client - nodeID, client := s.nodeManager.PickClient() - if client == nil { - log.Ctx(s.ctx).Debug("pick client failed") + client, exist := s.nodeManager.GetClientByID(nodeID) + if !exist || client == nil { + log.Ctx(s.ctx).Debug("get indexnode client failed", zap.Int64("nodeID", nodeID)) return false } log.Ctx(s.ctx).Info("pick client success", zap.Int64("taskID", task.GetTaskID()), zap.Int64("nodeID", nodeID)) @@ -458,18 +574,18 @@ func (s *taskScheduler) processInit(task Task) bool { WithLabelValues(task.GetTaskType(), metrics.Pending).Observe(float64(queueingTime.Milliseconds())) log.Ctx(s.ctx).Info("update task meta state to InProgress success", zap.Int64("taskID", task.GetTaskID()), zap.Int64("nodeID", nodeID)) - return s.processInProgress(task) + return true } func (s *taskScheduler) processFinished(task Task) bool { if err := task.SetJobInfo(s.meta); err != nil { log.Ctx(s.ctx).Warn("update task info failed", zap.Error(err)) - return true + return false } task.SetEndTime(time.Now()) runningTime := task.GetEndTime().Sub(task.GetStartTime()) if runningTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) { - log.Warn("task running time is too long", zap.Int64("taskID", task.GetTaskID()), + log.Ctx(s.ctx).Warn("task running time is too long", zap.Int64("taskID", task.GetTaskID()), zap.Int64("running time(ms)", runningTime.Milliseconds())) } metrics.DataCoordTaskExecuteLatency. @@ -477,10 +593,13 @@ func (s *taskScheduler) processFinished(task Task) bool { client, exist := s.nodeManager.GetClientByID(task.GetNodeID()) if exist { if !task.DropTaskOnWorker(s.ctx, client) { - return true + return false } } - s.removeTask(task.GetTaskID()) + log.Ctx(s.ctx).Info("task has been finished", zap.Int64("taskID", task.GetTaskID()), + zap.Int64("queueing time(ms)", task.GetStartTime().Sub(task.GetQueueTime()).Milliseconds()), + zap.Int64("running time(ms)", runningTime.Milliseconds()), + zap.Int64("total time(ms)", task.GetEndTime().Sub(task.GetQueueTime()).Milliseconds())) return true } @@ -488,11 +607,16 @@ func (s *taskScheduler) processRetry(task Task) bool { client, exist := s.nodeManager.GetClientByID(task.GetNodeID()) if exist { if !task.DropTaskOnWorker(s.ctx, client) { - return true + return false } } task.SetState(indexpb.JobState_JobStateInit, "") task.ResetTask(s.meta) + + log.Ctx(s.ctx).Info("processRetry success, set task to pending queue", zap.Int64("taskID", task.GetTaskID()), + zap.String("state", task.GetState().String())) + + s.pendingTasks.Push(task) return true } @@ -504,8 +628,19 @@ func (s *taskScheduler) processInProgress(task Task) bool { task.ResetTask(s.meta) return s.processFinished(task) } - return true + return false } + log.Ctx(s.ctx).Info("node does not exist, set task state to retry", zap.Int64("taskID", task.GetTaskID())) task.SetState(indexpb.JobState_JobStateRetry, "node does not exist") - return true + return false +} + +func pickNode(nodeSlots map[int64]int64) int64 { + for w, slots := range nodeSlots { + if slots >= 1 { + nodeSlots[w] = slots - 1 + return w + } + } + return -1 } diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index 7d49f232b5..9f94da47de 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -19,6 +19,7 @@ package datacoord import ( "context" "fmt" + "go.uber.org/zap" "sync" "testing" "time" @@ -35,7 +36,9 @@ import ( catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks" 
"github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/proto/indexpb" "github.com/milvus-io/milvus/pkg/proto/workerpb" @@ -718,7 +721,7 @@ func (s *taskSchedulerSuite) createAnalyzeMeta(catalog metastore.DataCoordCatalo SegmentIDs: s.segmentIDs, TaskID: 2, NodeID: s.nodeID, - State: indexpb.JobState_JobStateInProgress, + State: indexpb.JobState_JobStateInit, FieldType: schemapb.DataType_FloatVector, }, 3: { @@ -841,7 +844,11 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) { in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil) workerManager := session.NewMockWorkerManager(s.T()) - workerManager.EXPECT().PickClient().Return(s.nodeID, in) + workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 { + return map[int64]int64{ + 1: 1, + } + }) workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true) mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)), @@ -854,16 +861,17 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) { cm.EXPECT().RootPath().Return("root") scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil) - s.Equal(9, len(scheduler.tasks)) - s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[1].GetState()) - s.Equal(indexpb.JobState_JobStateInProgress, scheduler.tasks[2].GetState()) - s.Equal(indexpb.JobState_JobStateRetry, scheduler.tasks[5].GetState()) - s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID].GetState()) - s.Equal(indexpb.JobState_JobStateInProgress, scheduler.tasks[buildID+1].GetState()) - s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID+3].GetState()) - s.Equal(indexpb.JobState_JobStateInProgress, scheduler.tasks[buildID+8].GetState()) - s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID+9].GetState()) - s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID+10].GetState()) + s.Equal(6, scheduler.pendingTasks.TaskCount()) + s.Equal(3, len(scheduler.runningTasks)) + s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(1).GetState()) + s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(2).GetState()) + s.Equal(indexpb.JobState_JobStateRetry, scheduler.runningTasks[5].GetState()) + s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID).GetState()) + s.Equal(indexpb.JobState_JobStateInProgress, scheduler.runningTasks[buildID+1].GetState()) + s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID+3).GetState()) + s.Equal(indexpb.JobState_JobStateInProgress, scheduler.runningTasks[buildID+8].GetState()) + s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID+9).GetState()) + s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID+10).GetState()) mt.segments.DropSegment(segID + 9) @@ -894,12 +902,15 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) { }) for { - scheduler.RLock() - taskNum := len(scheduler.tasks) - scheduler.RUnlock() - - if taskNum == 0 { - break + if scheduler.pendingTasks.TaskCount() == 0 { + // maybe task is waiting for assigning, so sleep three seconds. 
+ time.Sleep(time.Second * 3) + scheduler.runningQueueLock.RLock() + taskNum := len(scheduler.runningTasks) + scheduler.runningQueueLock.RUnlock() + if taskNum == 0 { + break + } } time.Sleep(time.Second) } @@ -973,6 +984,11 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { catalog := catalogmocks.NewDataCoordCatalog(s.T()) workerManager := session.NewMockWorkerManager(s.T()) + workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 { + return map[int64]int64{ + 1: 1, + } + }) mt := createMeta(catalog, withAnalyzeMeta(&analyzeMeta{ @@ -1011,12 +1027,13 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { workerManager.EXPECT().GetClientByID(mock.Anything).Return(nil, false).Once() for { - scheduler.RLock() - taskNum := len(scheduler.tasks) - scheduler.RUnlock() - - if taskNum == 0 { - break + if scheduler.pendingTasks.TaskCount() == 0 { + scheduler.runningQueueLock.RLock() + taskNum := len(scheduler.runningTasks) + scheduler.runningQueueLock.RUnlock() + if taskNum == 0 { + break + } } time.Sleep(time.Second) } @@ -1034,6 +1051,11 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { in := mocks.NewMockIndexNodeClient(s.T()) workerManager := session.NewMockWorkerManager(s.T()) + workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 { + return map[int64]int64{ + 1: 1, + } + }) mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(&indexMeta{ @@ -1076,14 +1098,12 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() - // pick client fail --> state: init - workerManager.EXPECT().PickClient().Return(0, nil).Once() - // update version failed --> state: init - workerManager.EXPECT().PickClient().Return(s.nodeID, in) + workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("catalog update version error")).Once() // assign task to indexNode fail --> state: retry + workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(&commonpb.Status{ Code: 65535, @@ -1102,6 +1122,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() // update state to building failed --> state: retry + workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("catalog update building state error")).Once() @@ -1111,6 +1132,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() // assign success --> state: InProgress + workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() @@ -1164,6 +1186,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { 
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() // init --> state: InProgress + workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() @@ -1178,6 +1201,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() // init --> state: InProgress + workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() @@ -1194,6 +1218,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() // init --> state: InProgress + workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() @@ -1205,6 +1230,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() // init --> state: InProgress + workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() @@ -1250,12 +1276,14 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() { in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once() for { - scheduler.RLock() - taskNum := len(scheduler.tasks) - scheduler.RUnlock() + if scheduler.pendingTasks.TaskCount() == 0 { + scheduler.runningQueueLock.RLock() + taskNum := len(scheduler.runningTasks) + scheduler.runningQueueLock.RUnlock() - if taskNum == 0 { - break + if taskNum == 0 { + break + } } time.Sleep(time.Second) } @@ -1272,6 +1300,11 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { catalog := catalogmocks.NewDataCoordCatalog(s.T()) in := mocks.NewMockIndexNodeClient(s.T()) workerManager := session.NewMockWorkerManager(s.T()) + workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 { + return map[int64]int64{ + 1: 1, + } + }) mt := createMeta(catalog, withAnalyzeMeta(&analyzeMeta{ @@ -1348,89 +1381,133 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { scheduler.Start() // get collection info failed --> init - handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once() + handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collID int64) (*collectionInfo, error) { + log.Debug("get collection info failed", zap.Int64("collectionID", collID)) + return nil, errors.New("mock error") + }).Once() // get collection info success, get dim failed --> init - handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{ - ID: collID, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - {FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64}, - {FieldID: s.fieldID, Name: "vec"}, + handler.EXPECT().GetCollection(mock.Anything, 
mock.Anything).RunAndReturn(func(ctx context.Context, i int64) (*collectionInfo, error) { + log.Debug("get collection info success", zap.Int64("collectionID", i)) + return &collectionInfo{ + ID: collID, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64}, + {FieldID: s.fieldID, Name: "vec"}, + }, }, - }, - }, nil).Once() + }, nil + }).Once() // assign failed --> retry - workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once() - catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Once() + workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) { + log.Debug("get client success, but assign failed", zap.Int64("nodeID", nodeID)) + return in, true + }).Once() + catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, indices []*model.SegmentIndex) error { + log.Debug("alter segment indexes success, but assign failed", zap.Int64("taskID", indices[0].BuildID)) + return nil + }).Once() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.CreateJobV2Request, option ...grpc.CallOption) (*commonpb.Status, error) { indexNodeTasks[request.GetTaskID()]++ + log.Debug("assign task failed", zap.Int64("taskID", request.GetTaskID())) return nil, errors.New("mock error") }).Once() // retry --> init - workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() + workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) { + log.Debug("assign failed, drop task on worker", zap.Int64("nodeID", nodeID)) + return in, true + }).Once() in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.DropJobsV2Request, option ...grpc.CallOption) (*commonpb.Status, error) { for _, taskID := range request.GetTaskIDs() { indexNodeTasks[taskID]-- } + log.Debug("drop task on worker, success", zap.Int64s("taskIDs", request.GetTaskIDs())) return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil }).Once() // init --> inProgress - workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once() - catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Twice() - handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{ - ID: collID, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - {FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64}, - {FieldID: s.fieldID, Name: "vec", TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}}}, + workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) { + log.Debug("assign task success", zap.Int64("nodeID", nodeID)) + return in, true + }).Once() + catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, indices []*model.SegmentIndex) error { + log.Debug("alter segment success twice and assign task success", zap.Int64("taskID", indices[0].BuildID)) + return nil + }).Twice() + handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, i int64) (*collectionInfo, error) { + log.Debug("get collection success and assign task success", zap.Int64("collID", i)) + return 
&collectionInfo{ + ID: collID, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64}, + {FieldID: s.fieldID, Name: "vec", TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}}}, + }, }, - }, - }, nil).Once() + }, nil + }).Once() in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.CreateJobV2Request, option ...grpc.CallOption) (*commonpb.Status, error) { indexNodeTasks[request.GetTaskID()]++ + log.Debug("assign task success", zap.Int64("nodeID", nodeID)) return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil }).Once() // inProgress --> Finished - workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() - in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).Return(&workerpb.QueryJobsV2Response{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - ClusterID: "", - Result: &workerpb.QueryJobsV2Response_IndexJobResults{ - IndexJobResults: &workerpb.IndexJobResults{ - Results: []*workerpb.IndexTaskInfo{ - { - BuildID: buildID, - State: commonpb.IndexState_Finished, - IndexFileKeys: []string{"file1", "file2"}, - SerializedSize: 1024, + workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) { + log.Debug("get task result success, task is finished", zap.Int64("nodeID", nodeID)) + return in, true + }).Once() + in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.QueryJobsV2Request, option ...grpc.CallOption) (*workerpb.QueryJobsV2Response, error) { + log.Debug("query task result success, task is finished", zap.Int64s("taskIDs", request.GetTaskIDs())) + return &workerpb.QueryJobsV2Response{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + ClusterID: "", + Result: &workerpb.QueryJobsV2Response_IndexJobResults{ + IndexJobResults: &workerpb.IndexJobResults{ + Results: []*workerpb.IndexTaskInfo{ + { + BuildID: buildID, + State: commonpb.IndexState_Finished, + IndexFileKeys: []string{"file1", "file2"}, + SerializedSize: 1024, + }, }, }, }, - }, - }, nil) + }, nil + }) // finished --> done - catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Once() - workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once() + finishCH := make(chan struct{}) + catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, indices []*model.SegmentIndex) error { + log.Debug("task is finished, alter segment index success", zap.Int64("taskID", indices[0].BuildID)) + return nil + }).Once() + workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) { + log.Debug("task is finished, drop task on worker", zap.Int64("nodeID", nodeID)) + return in, true + }).Once() in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.DropJobsV2Request, option ...grpc.CallOption) (*commonpb.Status, error) { for _, taskID := range request.GetTaskIDs() { indexNodeTasks[taskID]-- + finishCH <- struct{}{} } + log.Debug("task is finished, drop task on worker success", zap.Int64("nodeID", nodeID)) return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil }).Once() + <-finishCH for { - scheduler.RLock() - taskNum := len(scheduler.tasks) - scheduler.RUnlock() - - if taskNum == 0 
{ - break + if scheduler.pendingTasks.TaskCount() == 0 { + scheduler.runningQueueLock.RLock() + taskNum := len(scheduler.runningTasks) + scheduler.runningQueueLock.RUnlock() + if taskNum == 0 { + break + } } time.Sleep(time.Second) } @@ -1458,7 +1535,11 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { in := mocks.NewMockIndexNodeClient(s.T()) workerManager := session.NewMockWorkerManager(s.T()) - workerManager.EXPECT().PickClient().Return(s.nodeID, in) + workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 { + return map[int64]int64{ + 1: 1, + } + }) workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true) minNumberOfRowsToBuild := paramtable.Get().DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() + 1 @@ -1619,18 +1700,21 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { waitTaskDoneFunc := func(sche *taskScheduler) { for { - sche.RLock() - taskNum := len(sche.tasks) - sche.RUnlock() - - if taskNum == 0 { - break + if sche.pendingTasks.TaskCount() == 0 { + sche.runningQueueLock.RLock() + taskNum := len(sche.runningTasks) + sche.runningQueueLock.RUnlock() + if taskNum == 0 { + break + } } time.Sleep(time.Second) } } resetMetaFunc := func() { + mt.indexMeta.Lock() + defer mt.indexMeta.Unlock() t, ok := mt.indexMeta.segmentBuildInfo.Get(buildID) s.True(ok) t.IndexState = commonpb.IndexState_Unissued @@ -1680,8 +1764,8 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { s.NotZero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") return merr.Success(), nil }).Once() - s.Equal(1, len(scheduler.tasks)) - s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID].GetState()) + s.Equal(1, scheduler.pendingTasks.TaskCount()) + s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID).GetState()) scheduler.Start() waitTaskDoneFunc(scheduler) @@ -1859,6 +1943,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { s.Run("Submit partitionKeyIsolation is false when MV not enabled", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { s.Equal(in.GetIndexRequest().PartitionKeyIsolation, false) @@ -1884,7 +1969,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { isoCollInfo.Properties[common.PartitionKeyIsolationKey] = "true" in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { - s.Equal(in.GetIndexRequest().PartitionKeyIsolation, true) + s.True(in.GetIndexRequest().PartitionKeyIsolation) return merr.Success(), nil }).Once() t := &indexBuildTask{ @@ -1957,8 +2042,7 @@ func (s *taskSchedulerSuite) Test_reload() { scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler) s.NotNil(scheduler) s.True(mt.segments.segments[1000].isCompacting) - task, ok := scheduler.tasks[statsTaskID] - s.True(ok) + task := scheduler.runningTasks[statsTaskID] s.NotNil(task) }) @@ -1994,8 +2078,7 @@ func (s *taskSchedulerSuite) Test_reload() { scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, 
compactionHandler) s.NotNil(scheduler) s.True(mt.segments.segments[1000].isCompacting) - task, ok := scheduler.tasks[statsTaskID] - s.False(ok) + task := scheduler.pendingTasks.Get(statsTaskID) s.Nil(task) }) @@ -2031,8 +2114,7 @@ func (s *taskSchedulerSuite) Test_reload() { scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler) s.NotNil(scheduler) s.True(mt.segments.segments[1000].isCompacting) - task, ok := scheduler.tasks[statsTaskID] - s.True(ok) + task := scheduler.runningTasks[statsTaskID] s.Equal(indexpb.JobState_JobStateFailed, task.GetState()) }) @@ -2068,8 +2150,7 @@ func (s *taskSchedulerSuite) Test_reload() { scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler) s.NotNil(scheduler) s.False(mt.segments.segments[1000].isCompacting) - task, ok := scheduler.tasks[statsTaskID] - s.False(ok) + task := scheduler.pendingTasks.Get(statsTaskID) s.Nil(task) }) @@ -2105,8 +2186,7 @@ func (s *taskSchedulerSuite) Test_reload() { scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler) s.NotNil(scheduler) s.False(mt.segments.segments[1000].isCompacting) - task, ok := scheduler.tasks[statsTaskID] - s.True(ok) + task := scheduler.runningTasks[statsTaskID] s.Equal(indexpb.JobState_JobStateFailed, task.GetState()) }) } diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go index abb569f07b..a15d04f1e3 100644 --- a/internal/datacoord/task_stats.go +++ b/internal/datacoord/task_stats.go @@ -160,7 +160,6 @@ func (st *statsTask) UpdateMetaBuildingState(meta *meta) error { } func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool { - // set segment compacting log := log.Ctx(ctx).With(zap.Int64("taskID", st.taskID), zap.Int64("segmentID", st.segmentID)) segment := dependency.meta.GetHealthySegment(ctx, st.segmentID) if segment == nil { diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go index b09617106d..947336c84b 100644 --- a/internal/indexnode/indexnode_service.go +++ b/internal/indexnode/indexnode_service.go @@ -382,6 +382,7 @@ func (i *IndexNode) CreateJobV2(ctx context.Context, req *workerpb.CreateJobV2Re log.Info("receive stats job", zap.Int64("collectionID", statsRequest.GetCollectionID()), zap.Int64("partitionID", statsRequest.GetPartitionID()), zap.Int64("segmentID", statsRequest.GetSegmentID()), + zap.Int64("numRows", statsRequest.GetNumRows()), zap.Int64("targetSegmentID", statsRequest.GetTargetSegmentID()), zap.String("subJobType", statsRequest.GetSubJobType().String()), zap.Int64("startLogID", statsRequest.GetStartLogID()), diff --git a/internal/indexnode/task_stats.go b/internal/indexnode/task_stats.go index 9d1572f421..7972bae622 100644 --- a/internal/indexnode/task_stats.go +++ b/internal/indexnode/task_stats.go @@ -116,12 +116,13 @@ func (st *statsTask) PreExecute(ctx context.Context) error { defer span.End() st.queueDur = st.tr.RecordSpan() - log.Ctx(ctx).Info("Begin to prepare stats task", + log.Ctx(ctx).Info("Begin to PreExecute stats task", zap.String("clusterID", st.req.GetClusterID()), zap.Int64("taskID", st.req.GetTaskID()), zap.Int64("collectionID", st.req.GetCollectionID()), zap.Int64("partitionID", st.req.GetPartitionID()), zap.Int64("segmentID", st.req.GetSegmentID()), + zap.Int64("queue duration(ms)", st.queueDur.Milliseconds()), ) if err := 
binlog.DecompressBinLogWithRootPath(st.req.GetStorageConfig().GetRootPath(), storage.InsertBinlog, st.req.GetCollectionID(), st.req.GetPartitionID(), @@ -152,6 +153,16 @@ func (st *statsTask) PreExecute(ctx context.Context) error { } } + preExecuteRecordSpan := st.tr.RecordSpan() + + log.Ctx(ctx).Info("successfully PreExecute stats task", + zap.String("clusterID", st.req.GetClusterID()), + zap.Int64("taskID", st.req.GetTaskID()), + zap.Int64("collectionID", st.req.GetCollectionID()), + zap.Int64("partitionID", st.req.GetPartitionID()), + zap.Int64("segmentID", st.req.GetSegmentID()), + zap.Int64("preExecuteRecordSpan(ms)", preExecuteRecordSpan.Milliseconds()), + ) return nil }
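
Note on the new dispatch flow (reviewer sketch, not part of the patch): run() now drains the pending queue against a per-round snapshot of worker slots. A condensed, illustrative version of that loop, where dispatchOnce is a hypothetical name and assign stands in for taskScheduler.process, looks like this:

	// dispatchOnce hands out pending tasks until either the slot snapshot
	// or the pending queue is exhausted.
	func dispatchOnce(pending schedulePolicy, nodeSlots map[int64]int64, assign func(Task, int64)) {
		for {
			nodeID := pickNode(nodeSlots) // decrements the picked node's slot count
			if nodeID == -1 {
				return // no free slot this round; retry on the next tick
			}
			task := pending.Pop()
			if task == nil {
				return // nothing left to dispatch
			}
			assign(task, nodeID) // on success the task moves to runningTasks
		}
	}

Tasks that fail dispatch are pushed back into the pending queue by the caller, so a stale slot snapshot only delays work by one scheduleDuration tick.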