enhance: Refine index task scheduler policy (#40104)

master pr: #39084 

issue: #39101
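
In outline, this change splits the scheduler's single task map into a FIFO pending queue (schedulePolicy, implemented by fairQueuePolicy) and a separate running-task map, queries each IndexNode's free slots before dispatching, polls in-progress tasks from a dedicated checkProcessingTasksLoop, and collects index/stats task metrics from a periodic server loop instead of on every meta update. A rough sketch of the new dispatch loop, simplified from the task_scheduler.go diff below (runSketch is a hypothetical name used only for illustration; the real run() dispatches each task in its own goroutine behind a semaphore and guards runningTasks with runningQueueLock):

func (s *taskScheduler) runSketch() {
    // 1. ask every live IndexNode how many task slots it has free
    nodeSlots := s.nodeManager.QuerySlots()
    for {
        // 2. pick a node that still has a free slot (pickNode decrements it)
        nodeID := pickNode(nodeSlots)
        if nodeID == -1 {
            break // no capacity left; wait for the next schedule tick
        }
        // 3. pop the next pending task and try to dispatch it to that node
        task := s.pendingTasks.Pop()
        if task == nil {
            break // nothing left to schedule
        }
        s.process(task, nodeID)
        switch task.GetState() {
        case indexpb.JobState_JobStateNone:
            // task was dropped; nothing to track
        case indexpb.JobState_JobStateInit:
            s.pendingTasks.Push(task) // dispatch failed, retry on a later tick
        default:
            s.runningTasks[task.GetTaskID()] = task // polled by checkProcessingTasksLoop
        }
    }
}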

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-02-22 11:25:54 +08:00 committed by GitHub
parent 898606ae4c
commit be30f5301d
14 changed files with 758 additions and 292 deletions

View File

@ -27,7 +27,6 @@ import (
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@ -66,8 +65,6 @@ type indexMeta struct {
// segmentID -> indexID -> segmentIndex
segmentIndexes map[UniqueID]map[UniqueID]*model.SegmentIndex
lastUpdateMetricTime atomic.Time
}
func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats {
@ -209,10 +206,9 @@ func (m *indexMeta) updateSegIndexMeta(segIdx *model.SegmentIndex, updateFunc fu
}
func (m *indexMeta) updateIndexTasksMetrics() {
if time.Since(m.lastUpdateMetricTime.Load()) < 120*time.Second {
return
}
defer m.lastUpdateMetricTime.Store(time.Now())
m.RLock()
defer m.RUnlock()
taskMetrics := make(map[UniqueID]map[commonpb.IndexState]int)
for _, segIdx := range m.segmentBuildInfo.List() {
if segIdx.IsDeleted || !m.isIndexExist(segIdx.CollectionID, segIdx.IndexID) {
@ -446,7 +442,6 @@ func (m *indexMeta) AddSegmentIndex(ctx context.Context, segIndex *model.Segment
log.Ctx(ctx).Info("meta update: adding segment index success", zap.Int64("collectionID", segIndex.CollectionID),
zap.Int64("segmentID", segIndex.SegmentID), zap.Int64("indexID", segIndex.IndexID),
zap.Int64("buildID", buildID))
m.updateIndexTasksMetrics()
return nil
}
@ -823,7 +818,6 @@ func (m *indexMeta) FinishTask(taskInfo *workerpb.IndexTaskInfo) error {
zap.String("state", taskInfo.GetState().String()), zap.String("fail reason", taskInfo.GetFailReason()),
zap.Int32("current_index_version", taskInfo.GetCurrentIndexVersion()),
)
m.updateIndexTasksMetrics()
metrics.FlushedSegmentFileNum.WithLabelValues(metrics.IndexFileLabel).Observe(float64(len(taskInfo.GetIndexFileKeys())))
return nil
}
@ -848,7 +842,6 @@ func (m *indexMeta) DeleteTask(buildID int64) error {
}
log.Ctx(m.ctx).Info("delete index task success", zap.Int64("buildID", buildID))
m.updateIndexTasksMetrics()
return nil
}
@ -877,8 +870,6 @@ func (m *indexMeta) BuildIndex(buildID UniqueID) error {
}
log.Ctx(m.ctx).Info("meta update: segment index in progress success", zap.Int64("buildID", segIdx.BuildID),
zap.Int64("segmentID", segIdx.SegmentID))
m.updateIndexTasksMetrics()
return nil
}
@ -931,7 +922,6 @@ func (m *indexMeta) RemoveSegmentIndex(ctx context.Context, collID, partID, segI
}
m.segmentBuildInfo.Remove(buildID)
m.updateIndexTasksMetrics()
return nil
}

View File

@ -110,7 +110,7 @@ func (jm *statsJobManager) triggerSortStatsTask() {
}))
for _, segment := range visibleSegments {
if jm.scheduler.GetTaskCount() > Params.DataCoordCfg.StatsTaskTriggerCount.GetAsInt() {
if jm.scheduler.pendingTasks.TaskCount() > Params.DataCoordCfg.StatsTaskTriggerCount.GetAsInt() {
break
}
jm.createSortStatsTaskForSegment(segment)
@ -261,7 +261,7 @@ func (jm *statsJobManager) cleanupStatsTasksLoop() {
taskIDs := jm.mt.statsTaskMeta.CanCleanedTasks()
for _, taskID := range taskIDs {
// only drop the task once the scheduler has finished processing it
if jm.scheduler.getTask(taskID) == nil {
if !jm.scheduler.exist(taskID) {
if err := jm.mt.statsTaskMeta.DropStatsTask(taskID); err != nil {
// ignore the error; if the removal fails, it will be retried in the next GC round
log.Warn("clean up stats task failed", zap.Int64("taskID", taskID), zap.Error(err))

View File

@ -106,10 +106,11 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() {
loopWg: sync.WaitGroup{},
mt: mt,
scheduler: &taskScheduler{
allocator: alloc,
tasks: make(map[int64]Task),
meta: mt,
taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*5),
allocator: alloc,
pendingTasks: newFairQueuePolicy(),
runningTasks: make(map[UniqueID]Task),
meta: mt,
taskStats: expirable.NewLRU[UniqueID, Task](512, nil, time.Minute*5),
},
allocator: alloc,
}
@ -122,5 +123,5 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() {
jm.loopWg.Wait()
s.Equal(3, len(jm.scheduler.tasks))
s.Equal(3, jm.scheduler.pendingTasks.TaskCount())
}

View File

@ -754,11 +754,34 @@ func (s *Server) startServerLoop() {
}
}
func (s *Server) startCollectMetaMetrics(ctx context.Context) {
s.serverLoopWg.Add(1)
go s.collectMetaMetrics(ctx)
}
func (s *Server) collectMetaMetrics(ctx context.Context) {
defer s.serverLoopWg.Done()
ticker := time.NewTicker(time.Second * 120)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Ctx(s.ctx).Warn("collectMetaMetrics ctx done")
return
case <-ticker.C:
s.meta.statsTaskMeta.updateMetrics()
s.meta.indexMeta.updateIndexTasksMetrics()
}
}
}
func (s *Server) startTaskScheduler() {
s.taskScheduler.Start()
s.jobManager.Start()
s.startIndexService(s.serverLoopCtx)
s.startCollectMetaMetrics(s.serverLoopCtx)
}
func (s *Server) updateSegmentStatistics(ctx context.Context, stats []*commonpb.SegmentStats) {
@ -1082,6 +1105,7 @@ func (s *Server) Stop() error {
log.Info("datacoord garbage collector stopped")
s.stopServerLoop()
log.Info("datacoord stopServerLoop stopped")
s.importScheduler.Close()
s.importChecker.Close()

View File

@ -44,6 +44,7 @@ type WorkerManager interface {
RemoveNode(nodeID typeutil.UniqueID)
StoppingNode(nodeID typeutil.UniqueID)
PickClient() (typeutil.UniqueID, types.IndexNodeClient)
QuerySlots() map[int64]int64
ClientSupportDisk() bool
GetAllClients() map[typeutil.UniqueID]types.IndexNodeClient
GetClientByID(nodeID typeutil.UniqueID) (types.IndexNodeClient, bool)
@ -115,6 +116,42 @@ func (nm *IndexNodeManager) AddNode(nodeID typeutil.UniqueID, address string) er
return nil
}
func (nm *IndexNodeManager) QuerySlots() map[int64]int64 {
nm.lock.Lock()
defer nm.lock.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), querySlotTimeout)
defer cancel()
nodeSlots := make(map[int64]int64)
mu := &sync.Mutex{}
wg := &sync.WaitGroup{}
for nodeID, client := range nm.nodeClients {
if _, ok := nm.stoppingNodes[nodeID]; !ok {
wg.Add(1)
go func(nodeID int64) {
defer wg.Done()
resp, err := client.GetJobStats(ctx, &workerpb.GetJobStatsRequest{})
if err != nil {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.Error(err))
return
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID),
zap.String("reason", resp.GetStatus().GetReason()))
return
}
mu.Lock()
defer mu.Unlock()
nodeSlots[nodeID] = resp.GetTaskSlots()
}(nodeID)
}
}
wg.Wait()
log.Ctx(context.TODO()).Debug("query slot done", zap.Any("nodeSlots", nodeSlots))
return nodeSlots
}
func (nm *IndexNodeManager) PickClient() (typeutil.UniqueID, types.IndexNodeClient) {
nm.lock.Lock()
defer nm.lock.Unlock()

View File

@ -274,6 +274,53 @@ func (_c *MockWorkerManager_PickClient_Call) RunAndReturn(run func() (int64, typ
return _c
}
// QuerySlots provides a mock function with given fields:
func (_m *MockWorkerManager) QuerySlots() map[int64]int64 {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for QuerySlots")
}
var r0 map[int64]int64
if rf, ok := ret.Get(0).(func() map[int64]int64); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64]int64)
}
}
return r0
}
// MockWorkerManager_QuerySlots_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QuerySlots'
type MockWorkerManager_QuerySlots_Call struct {
*mock.Call
}
// QuerySlots is a helper method to define mock.On call
func (_e *MockWorkerManager_Expecter) QuerySlots() *MockWorkerManager_QuerySlots_Call {
return &MockWorkerManager_QuerySlots_Call{Call: _e.mock.On("QuerySlots")}
}
func (_c *MockWorkerManager_QuerySlots_Call) Run(run func()) *MockWorkerManager_QuerySlots_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockWorkerManager_QuerySlots_Call) Return(_a0 map[int64]int64) *MockWorkerManager_QuerySlots_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWorkerManager_QuerySlots_Call) RunAndReturn(run func() map[int64]int64) *MockWorkerManager_QuerySlots_Call {
_c.Call.Return(run)
return _c
}
// RemoveNode provides a mock function with given fields: nodeID
func (_m *MockWorkerManager) RemoveNode(nodeID int64) {
_m.Called(nodeID)

View File

@ -73,6 +73,9 @@ func (stm *statsTaskMeta) reloadFromKV() error {
}
func (stm *statsTaskMeta) updateMetrics() {
stm.RLock()
defer stm.RUnlock()
taskMetrics := make(map[UniqueID]map[indexpb.JobState]int)
for _, t := range stm.tasks {
if _, ok := taskMetrics[t.GetCollectionID()]; !ok {
@ -122,7 +125,6 @@ func (stm *statsTaskMeta) AddStatsTask(t *indexpb.StatsTask) error {
}
stm.tasks[t.GetTaskID()] = t
stm.updateMetrics()
log.Info("add stats task success", zap.Int64("taskID", t.GetTaskID()), zap.Int64("originSegmentID", t.GetSegmentID()),
zap.Int64("targetSegmentID", t.GetTargetSegmentID()), zap.String("subJobType", t.GetSubJobType().String()))
@ -149,7 +151,6 @@ func (stm *statsTaskMeta) DropStatsTask(taskID int64) error {
}
delete(stm.tasks, taskID)
stm.updateMetrics()
log.Info("remove stats task success", zap.Int64("taskID", taskID), zap.Int64("segmentID", t.SegmentID))
return nil
@ -178,7 +179,6 @@ func (stm *statsTaskMeta) UpdateVersion(taskID, nodeID int64) error {
}
stm.tasks[t.TaskID] = cloneT
stm.updateMetrics()
log.Info("update stats task version success", zap.Int64("taskID", taskID), zap.Int64("nodeID", nodeID),
zap.Int64("newVersion", cloneT.GetVersion()))
return nil
@ -205,7 +205,6 @@ func (stm *statsTaskMeta) UpdateBuildingTask(taskID int64) error {
}
stm.tasks[t.TaskID] = cloneT
stm.updateMetrics()
log.Info("update building stats task success", zap.Int64("taskID", taskID))
return nil
@ -233,7 +232,6 @@ func (stm *statsTaskMeta) FinishTask(taskID int64, result *workerpb.StatsResult)
}
stm.tasks[t.TaskID] = cloneT
stm.updateMetrics()
log.Info("finish stats task meta success", zap.Int64("taskID", taskID), zap.Int64("segmentID", t.SegmentID),
zap.String("state", result.GetState().String()), zap.String("failReason", t.GetFailReason()))
@ -334,7 +332,6 @@ func (stm *statsTaskMeta) MarkTaskCanRecycle(taskID int64) error {
}
stm.tasks[t.TaskID] = cloneT
stm.updateMetrics()
log.Info("mark stats task can recycle success", zap.Int64("taskID", taskID),
zap.Int64("segmentID", t.SegmentID),

View File

@ -103,8 +103,8 @@ func (it *indexBuildTask) GetTaskType() string {
}
func (it *indexBuildTask) CheckTaskHealthy(mt *meta) bool {
_, exist := mt.indexMeta.GetIndexJob(it.GetTaskID())
return exist
job, exist := mt.indexMeta.GetIndexJob(it.GetTaskID())
return exist && !job.IsDeleted
}
func (it *indexBuildTask) SetState(state indexpb.JobState, failReason string) {
@ -262,7 +262,8 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
log.Ctx(ctx).Info("index task pre check successfully", zap.Int64("taskID", it.GetTaskID()),
zap.Int64("segID", segment.GetID()),
zap.Int32("CurrentIndexVersion", it.req.GetCurrentIndexVersion()),
zap.Int32("CurrentScalarIndexVersion", it.req.GetCurrentScalarIndexVersion()))
zap.Int32("CurrentScalarIndexVersion", it.req.GetCurrentScalarIndexVersion()),
zap.Int64("segID", segment.GetID()))
return true
}

View File

@ -0,0 +1,143 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"sync"
)
// schedulePolicy defines how the scheduler queues pending tasks and hands them out.
type schedulePolicy interface {
Push(task Task)
// Pop removes and returns the next task that is ready to run.
Pop() Task
BatchPop(batch int) []Task
Get(taskID UniqueID) Task
Keys() []UniqueID
TaskCount() int
Exist(taskID UniqueID) bool
Remove(taskID UniqueID)
}
var _ schedulePolicy = &fairQueuePolicy{}
type fairQueuePolicy struct {
tasks map[UniqueID]Task
taskIDs []UniqueID
lock sync.RWMutex
}
func newFairQueuePolicy() *fairQueuePolicy {
return &fairQueuePolicy{
tasks: make(map[UniqueID]Task, 0),
taskIDs: make([]UniqueID, 0),
lock: sync.RWMutex{},
}
}
func (fqp *fairQueuePolicy) Push(t Task) {
fqp.lock.Lock()
defer fqp.lock.Unlock()
if _, ok := fqp.tasks[t.GetTaskID()]; !ok {
fqp.tasks[t.GetTaskID()] = t
fqp.taskIDs = append(fqp.taskIDs, t.GetTaskID())
}
}
func (fqp *fairQueuePolicy) Pop() Task {
fqp.lock.Lock()
defer fqp.lock.Unlock()
if len(fqp.taskIDs) == 0 {
return nil
}
taskID := fqp.taskIDs[0]
fqp.taskIDs = fqp.taskIDs[1:]
task := fqp.tasks[taskID]
delete(fqp.tasks, taskID)
return task
}
func (fqp *fairQueuePolicy) BatchPop(batch int) []Task {
fqp.lock.Lock()
defer fqp.lock.Unlock()
tasks := make([]Task, 0)
if len(fqp.taskIDs) <= batch {
for _, taskID := range fqp.taskIDs {
task := fqp.tasks[taskID]
delete(fqp.tasks, taskID)
tasks = append(tasks, task)
}
fqp.taskIDs = make([]UniqueID, 0)
return tasks
}
taskIDs := fqp.taskIDs[:batch]
for _, taskID := range taskIDs {
task := fqp.tasks[taskID]
delete(fqp.tasks, taskID)
tasks = append(tasks, task)
}
fqp.taskIDs = fqp.taskIDs[batch:]
return tasks
}
func (fqp *fairQueuePolicy) Get(taskID UniqueID) Task {
fqp.lock.RLock()
defer fqp.lock.RUnlock()
if len(fqp.taskIDs) == 0 {
return nil
}
task := fqp.tasks[taskID]
return task
}
func (fqp *fairQueuePolicy) TaskCount() int {
fqp.lock.RLock()
defer fqp.lock.RUnlock()
return len(fqp.taskIDs)
}
func (fqp *fairQueuePolicy) Exist(taskID UniqueID) bool {
fqp.lock.RLock()
defer fqp.lock.RUnlock()
_, ok := fqp.tasks[taskID]
return ok
}
func (fqp *fairQueuePolicy) Remove(taskID UniqueID) {
fqp.lock.Lock()
defer fqp.lock.Unlock()
taskIndex := -1
for i := range fqp.taskIDs {
if fqp.taskIDs[i] == taskID {
taskIndex = i
break
}
}
if taskIndex != -1 {
fqp.taskIDs = append(fqp.taskIDs[:taskIndex], fqp.taskIDs[taskIndex+1:]...)
delete(fqp.tasks, taskID)
}
}
func (fqp *fairQueuePolicy) Keys() []UniqueID {
fqp.lock.RLock()
defer fqp.lock.RUnlock()
return fqp.taskIDs
}
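
For reference, a minimal usage sketch of the FIFO policy above. newFakeTask is a hypothetical constructor standing in for any value that implements the Task interface (the scheduler itself pushes indexBuildTask, analyzeTask and statsTask values); it is not part of this change.

func fairQueuePolicySketch() {
    q := newFairQueuePolicy()
    q.Push(newFakeTask(1)) // hypothetical helper, see note above
    q.Push(newFakeTask(2))
    q.Push(newFakeTask(3))

    _ = q.TaskCount() // 3
    _ = q.Exist(2)    // true

    first := q.Pop()       // task 1: tasks come back in insertion order
    rest := q.BatchPop(10) // tasks 2 and 3: a short queue returns whatever is left
    _, _ = first, rest

    _ = q.TaskCount() // 0
}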

View File

@ -40,8 +40,6 @@ const (
)
type taskScheduler struct {
sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
@ -49,10 +47,13 @@ type taskScheduler struct {
scheduleDuration time.Duration
collectMetricsDuration time.Duration
// TODO @xiaocai2333: use priority queue
tasks map[int64]Task
pendingTasks schedulePolicy
runningTasks map[UniqueID]Task
runningQueueLock sync.RWMutex
taskLock *lock.KeyLock[int64]
notifyChan chan struct{}
taskLock *lock.KeyLock[int64]
meta *meta
@ -82,7 +83,8 @@ func newTaskScheduler(
ctx: ctx,
cancel: cancel,
meta: metaTable,
tasks: make(map[int64]Task),
pendingTasks: newFairQueuePolicy(),
runningTasks: make(map[UniqueID]Task),
notifyChan: make(chan struct{}, 1),
taskLock: lock.NewKeyLock[int64](),
scheduleDuration: Params.DataCoordCfg.IndexTaskSchedulerInterval.GetAsDuration(time.Millisecond),
@ -101,9 +103,10 @@ func newTaskScheduler(
}
func (s *taskScheduler) Start() {
s.wg.Add(2)
s.wg.Add(3)
go s.schedule()
go s.collectTaskMetrics()
go s.checkProcessingTasksLoop()
}
func (s *taskScheduler) Stop() {
@ -118,86 +121,100 @@ func (s *taskScheduler) reloadFromMeta() {
if segIndex.IsDeleted {
continue
}
if segIndex.IndexState != commonpb.IndexState_Finished && segIndex.IndexState != commonpb.IndexState_Failed {
s.enqueue(&indexBuildTask{
taskID: segIndex.BuildID,
nodeID: segIndex.NodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: segIndex.BuildID,
State: segIndex.IndexState,
FailReason: segIndex.FailReason,
},
queueTime: time.Now(),
startTime: time.Now(),
endTime: time.Now(),
})
task := &indexBuildTask{
taskID: segIndex.BuildID,
nodeID: segIndex.NodeID,
taskInfo: &workerpb.IndexTaskInfo{
BuildID: segIndex.BuildID,
State: segIndex.IndexState,
FailReason: segIndex.FailReason,
},
queueTime: time.Now(),
startTime: time.Now(),
endTime: time.Now(),
}
switch segIndex.IndexState {
case commonpb.IndexState_IndexStateNone, commonpb.IndexState_Unissued:
s.pendingTasks.Push(task)
case commonpb.IndexState_InProgress, commonpb.IndexState_Retry:
s.runningQueueLock.Lock()
s.runningTasks[segIndex.BuildID] = task
s.runningQueueLock.Unlock()
}
}
}
allAnalyzeTasks := s.meta.analyzeMeta.GetAllTasks()
for taskID, t := range allAnalyzeTasks {
if t.State != indexpb.JobState_JobStateFinished && t.State != indexpb.JobState_JobStateFailed {
s.enqueue(&analyzeTask{
taskID: taskID,
nodeID: t.NodeID,
taskInfo: &workerpb.AnalyzeResult{
TaskID: taskID,
State: t.State,
FailReason: t.FailReason,
},
queueTime: time.Now(),
startTime: time.Now(),
endTime: time.Now(),
})
task := &analyzeTask{
taskID: taskID,
nodeID: t.NodeID,
taskInfo: &workerpb.AnalyzeResult{
TaskID: taskID,
State: t.State,
FailReason: t.FailReason,
},
queueTime: time.Now(),
startTime: time.Now(),
endTime: time.Now(),
}
switch t.State {
case indexpb.JobState_JobStateNone, indexpb.JobState_JobStateInit:
s.pendingTasks.Push(task)
case indexpb.JobState_JobStateInProgress, indexpb.JobState_JobStateRetry:
s.runningQueueLock.Lock()
s.runningTasks[taskID] = task
s.runningQueueLock.Unlock()
}
}
allStatsTasks := s.meta.statsTaskMeta.GetAllTasks()
for taskID, t := range allStatsTasks {
if t.GetState() != indexpb.JobState_JobStateFinished && t.GetState() != indexpb.JobState_JobStateFailed {
if t.GetState() == indexpb.JobState_JobStateInProgress || t.GetState() == indexpb.JobState_JobStateRetry {
if t.GetState() == indexpb.JobState_JobStateInProgress || t.GetState() == indexpb.JobState_JobStateRetry {
exist, canDo := s.meta.CheckAndSetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()})
if !exist || !canDo {
log.Ctx(s.ctx).Warn("segment is not exist or is compacting, skip stats, but this should not have happened, try to remove the stats task",
zap.Int64("taskID", taskID), zap.Bool("exist", exist), zap.Bool("canDo", canDo))
err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
if err == nil {
continue
}
log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
t.State = indexpb.JobState_JobStateFailed
t.FailReason = "segment is not exist or is compacting"
} else {
if !s.compactionHandler.checkAndSetSegmentStating(t.GetInsertChannel(), t.GetSegmentID()) {
s.meta.SetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()}, false)
err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
if err == nil {
continue
}
log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
t.State = indexpb.JobState_JobStateFailed
t.FailReason = "segment is not exist or is l0 compacting"
}
task := &statsTask{
taskID: taskID,
segmentID: t.GetSegmentID(),
targetSegmentID: t.GetTargetSegmentID(),
nodeID: t.NodeID,
taskInfo: &workerpb.StatsResult{
TaskID: taskID,
State: t.GetState(),
FailReason: t.GetFailReason(),
},
queueTime: time.Now(),
startTime: time.Now(),
endTime: time.Now(),
subJobType: t.GetSubJobType(),
}
switch t.GetState() {
case indexpb.JobState_JobStateNone, indexpb.JobState_JobStateInit:
s.pendingTasks.Push(task)
case indexpb.JobState_JobStateInProgress, indexpb.JobState_JobStateRetry:
exist, canDo := s.meta.CheckAndSetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()})
if !exist || !canDo {
log.Ctx(s.ctx).Warn("segment is not exist or is compacting, skip stats, but this should not have happened, try to remove the stats task",
zap.Int64("taskID", taskID), zap.Bool("exist", exist), zap.Bool("canDo", canDo))
err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
if err == nil {
continue
}
log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
task.taskInfo.State = indexpb.JobState_JobStateFailed
task.taskInfo.FailReason = "segment does not exist or is compacting"
} else {
if !s.compactionHandler.checkAndSetSegmentStating(t.GetInsertChannel(), t.GetSegmentID()) {
s.meta.SetSegmentsCompacting(context.TODO(), []UniqueID{t.GetSegmentID()}, false)
err := s.meta.statsTaskMeta.DropStatsTask(t.GetTaskID())
if err == nil {
continue
}
log.Ctx(s.ctx).Warn("remove stats task failed, set to failed", zap.Int64("taskID", taskID), zap.Error(err))
task.taskInfo.State = indexpb.JobState_JobStateFailed
task.taskInfo.FailReason = "segment does not exist or is l0 compacting"
}
}
s.enqueue(&statsTask{
taskID: taskID,
segmentID: t.GetSegmentID(),
targetSegmentID: t.GetTargetSegmentID(),
nodeID: t.NodeID,
taskInfo: &workerpb.StatsResult{
TaskID: taskID,
State: t.GetState(),
FailReason: t.GetFailReason(),
},
queueTime: time.Now(),
startTime: time.Now(),
endTime: time.Now(),
subJobType: t.GetSubJobType(),
})
s.runningQueueLock.Lock()
s.runningTasks[taskID] = task
s.runningQueueLock.Unlock()
}
}
}
@ -210,36 +227,67 @@ func (s *taskScheduler) notify() {
}
}
func (s *taskScheduler) exist(taskID UniqueID) bool {
exist := s.pendingTasks.Exist(taskID)
if exist {
return true
}
s.runningQueueLock.RLock()
defer s.runningQueueLock.RUnlock()
_, ok := s.runningTasks[taskID]
return ok
}
func (s *taskScheduler) getRunningTask(taskID UniqueID) Task {
s.runningQueueLock.RLock()
defer s.runningQueueLock.RUnlock()
return s.runningTasks[taskID]
}
func (s *taskScheduler) removeRunningTask(taskID UniqueID) {
s.runningQueueLock.Lock()
defer s.runningQueueLock.Unlock()
delete(s.runningTasks, taskID)
}
func (s *taskScheduler) enqueue(task Task) {
defer s.notify()
s.Lock()
defer s.Unlock()
taskID := task.GetTaskID()
if _, ok := s.tasks[taskID]; !ok {
s.tasks[taskID] = task
s.taskStats.Add(taskID, task)
s.runningQueueLock.RLock()
_, ok := s.runningTasks[taskID]
s.runningQueueLock.RUnlock()
if !ok {
s.pendingTasks.Push(task)
task.SetQueueTime(time.Now())
log.Info("taskScheduler enqueue task", zap.Int64("taskID", taskID))
log.Ctx(s.ctx).Info("taskScheduler enqueue task", zap.Int64("taskID", taskID))
}
}
func (s *taskScheduler) GetTaskCount() int {
s.RLock()
defer s.RUnlock()
return len(s.tasks)
}
func (s *taskScheduler) AbortTask(taskID int64) {
log.Info("task scheduler receive abort task request", zap.Int64("taskID", taskID))
s.RLock()
task, ok := s.tasks[taskID]
s.RUnlock()
if ok {
log.Ctx(s.ctx).Info("task scheduler receive abort task request", zap.Int64("taskID", taskID))
task := s.pendingTasks.Get(taskID)
if task != nil {
s.taskLock.Lock(taskID)
task.SetState(indexpb.JobState_JobStateFailed, "canceled")
s.taskLock.Unlock(taskID)
}
s.runningQueueLock.Lock()
if task != nil {
s.runningTasks[taskID] = task
}
if runningTask, ok := s.runningTasks[taskID]; ok {
s.taskLock.Lock(taskID)
runningTask.SetState(indexpb.JobState_JobStateFailed, "canceled")
s.taskLock.Unlock(taskID)
}
s.runningQueueLock.Unlock()
s.pendingTasks.Remove(taskID)
}
func (s *taskScheduler) schedule() {
@ -265,69 +313,131 @@ func (s *taskScheduler) schedule() {
}
}
func (s *taskScheduler) getTask(taskID UniqueID) Task {
s.RLock()
defer s.RUnlock()
func (s *taskScheduler) checkProcessingTasksLoop() {
log.Ctx(s.ctx).Info("taskScheduler checkProcessingTasks loop start")
defer s.wg.Done()
ticker := time.NewTicker(s.scheduleDuration)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
log.Ctx(s.ctx).Warn("task scheduler ctx done")
return
case <-ticker.C:
s.checkProcessingTasks()
}
}
}
return s.tasks[taskID]
func (s *taskScheduler) checkProcessingTasks() {
runningTaskIDs := make([]UniqueID, 0)
s.runningQueueLock.RLock()
for taskID := range s.runningTasks {
runningTaskIDs = append(runningTaskIDs, taskID)
}
s.runningQueueLock.RUnlock()
log.Ctx(s.ctx).Info("check running tasks", zap.Int("runningTask num", len(runningTaskIDs)))
var wg sync.WaitGroup
sem := make(chan struct{}, 100)
for _, taskID := range runningTaskIDs {
wg.Add(1)
sem <- struct{}{}
taskID := taskID
go func(taskID int64) {
defer wg.Done()
task := s.getRunningTask(taskID)
s.taskLock.Lock(taskID)
suc := s.checkProcessingTask(task)
s.taskLock.Unlock(taskID)
if suc {
s.removeRunningTask(taskID)
}
<-sem
}(taskID)
}
wg.Wait()
}
func (s *taskScheduler) checkProcessingTask(task Task) bool {
switch task.GetState() {
case indexpb.JobState_JobStateInProgress:
return s.processInProgress(task)
case indexpb.JobState_JobStateRetry:
return s.processRetry(task)
case indexpb.JobState_JobStateFinished, indexpb.JobState_JobStateFailed:
return s.processFinished(task)
default:
log.Ctx(s.ctx).Error("invalid task state in running queue", zap.Int64("taskID", task.GetTaskID()), zap.String("state", task.GetState().String()))
}
return false
}
func (s *taskScheduler) run() {
// schedule policy
s.RLock()
taskIDs := make([]UniqueID, 0, len(s.tasks))
for tID := range s.tasks {
taskIDs = append(taskIDs, tID)
}
s.RUnlock()
if len(taskIDs) > 0 {
log.Ctx(s.ctx).Info("task scheduler", zap.Int("task num", len(taskIDs)))
pendingTaskNum := s.pendingTasks.TaskCount()
if pendingTaskNum == 0 {
return
}
s.policy(taskIDs)
for _, taskID := range taskIDs {
s.taskLock.Lock(taskID)
ok := s.process(taskID)
if !ok {
s.taskLock.Unlock(taskID)
log.Ctx(s.ctx).Info("there is no idle indexing node, waiting for retry...")
// 1. pick an indexNode client
nodeSlots := s.nodeManager.QuerySlots()
log.Ctx(s.ctx).Info("task scheduler", zap.Int("task num", pendingTaskNum), zap.Any("nodeSlots", nodeSlots))
var wg sync.WaitGroup
sem := make(chan struct{}, 100)
for {
nodeID := pickNode(nodeSlots)
if nodeID == -1 {
log.Ctx(s.ctx).Debug("pick node failed")
break
}
s.taskLock.Unlock(taskID)
task := s.pendingTasks.Pop()
if task == nil {
break
}
wg.Add(1)
sem <- struct{}{}
go func(task Task, nodeID int64) {
defer wg.Done()
s.taskLock.Lock(task.GetTaskID())
s.process(task, nodeID)
s.taskLock.Unlock(task.GetTaskID())
switch task.GetState() {
case indexpb.JobState_JobStateNone:
return
case indexpb.JobState_JobStateInit:
s.pendingTasks.Push(task)
default:
s.runningQueueLock.Lock()
s.runningTasks[task.GetTaskID()] = task
s.runningQueueLock.Unlock()
}
<-sem
}(task, nodeID)
}
wg.Wait()
}
func (s *taskScheduler) removeTask(taskID UniqueID) {
s.Lock()
defer s.Unlock()
delete(s.tasks, taskID)
}
func (s *taskScheduler) process(taskID UniqueID) bool {
task := s.getTask(taskID)
func (s *taskScheduler) process(task Task, nodeID int64) bool {
if !task.CheckTaskHealthy(s.meta) {
s.removeTask(taskID)
task.SetState(indexpb.JobState_JobStateNone, "task not healthy")
return true
}
state := task.GetState()
log.Ctx(s.ctx).Info("task is processing", zap.Int64("taskID", taskID),
zap.String("task type", task.GetTaskType()), zap.String("state", state.String()))
log.Ctx(s.ctx).Info("task is processing", zap.Int64("taskID", task.GetTaskID()),
zap.String("task type", task.GetTaskType()), zap.String("state", task.GetState().String()))
switch state {
switch task.GetState() {
case indexpb.JobState_JobStateNone:
s.removeTask(taskID)
return true
case indexpb.JobState_JobStateInit:
return s.processInit(task)
case indexpb.JobState_JobStateFinished, indexpb.JobState_JobStateFailed:
return s.processFinished(task)
case indexpb.JobState_JobStateRetry:
return s.processRetry(task)
return s.processInit(task, nodeID)
default:
// state: in_progress
return s.processInProgress(task)
log.Ctx(s.ctx).Error("invalid task state in pending queue", zap.Int64("taskID", task.GetTaskID()), zap.String("state", task.GetState().String()))
}
return true
}
@ -343,32 +453,23 @@ func (s *taskScheduler) collectTaskMetrics() {
log.Warn("task scheduler context done")
return
case <-ticker.C:
s.RLock()
taskIDs := make([]UniqueID, 0, len(s.tasks))
for tID := range s.tasks {
taskIDs = append(taskIDs, tID)
}
s.RUnlock()
maxTaskQueueingTime := make(map[string]int64)
maxTaskRunningTime := make(map[string]int64)
collectMetricsFunc := func(taskID int64) {
task := s.getTask(taskID)
collectPendingMetricsFunc := func(taskID int64) {
task := s.pendingTasks.Get(taskID)
if task == nil {
return
}
s.taskLock.Lock(taskID)
defer s.taskLock.Unlock(taskID)
state := task.GetState()
switch state {
case indexpb.JobState_JobStateNone:
return
switch task.GetState() {
case indexpb.JobState_JobStateInit:
queueingTime := time.Since(task.GetQueueTime())
if queueingTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task queueing time is too long", zap.Int64("taskID", taskID),
log.Ctx(s.ctx).Warn("task queueing time is too long", zap.Int64("taskID", taskID),
zap.Int64("queueing time(ms)", queueingTime.Milliseconds()))
}
@ -376,10 +477,18 @@ func (s *taskScheduler) collectTaskMetrics() {
if !ok || maxQueueingTime < queueingTime.Milliseconds() {
maxTaskQueueingTime[task.GetTaskType()] = queueingTime.Milliseconds()
}
}
}
collectRunningMetricsFunc := func(task Task) {
s.taskLock.Lock(task.GetTaskID())
defer s.taskLock.Unlock(task.GetTaskID())
switch task.GetState() {
case indexpb.JobState_JobStateInProgress:
runningTime := time.Since(task.GetStartTime())
if runningTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task running time is too long", zap.Int64("taskID", taskID),
log.Ctx(s.ctx).Warn("task running time is too long", zap.Int64("taskID", task.GetTaskID()),
zap.Int64("running time(ms)", runningTime.Milliseconds()))
}
@ -390,10 +499,18 @@ func (s *taskScheduler) collectTaskMetrics() {
}
}
taskIDs := s.pendingTasks.Keys()
for _, taskID := range taskIDs {
collectMetricsFunc(taskID)
collectPendingMetricsFunc(taskID)
}
s.runningQueueLock.RLock()
for _, task := range s.runningTasks {
collectRunningMetricsFunc(task)
}
s.runningQueueLock.RUnlock()
for taskType, queueingTime := range maxTaskQueueingTime {
metrics.DataCoordTaskExecuteLatency.
WithLabelValues(taskType, metrics.Pending).Observe(float64(queueingTime))
@ -407,7 +524,7 @@ func (s *taskScheduler) collectTaskMetrics() {
}
}
func (s *taskScheduler) processInit(task Task) bool {
func (s *taskScheduler) processInit(task Task, nodeID int64) bool {
// 0. pre-check the task.
// Determine whether the task can run and whether it actually needs to;
// for example, a FLAT index does not need a real build, so checkPass is false.
@ -416,10 +533,9 @@ func (s *taskScheduler) processInit(task Task) bool {
return true
}
// 1. pick an indexNode client
nodeID, client := s.nodeManager.PickClient()
if client == nil {
log.Ctx(s.ctx).Debug("pick client failed")
client, exist := s.nodeManager.GetClientByID(nodeID)
if !exist || client == nil {
log.Ctx(s.ctx).Debug("get indexnode client failed", zap.Int64("nodeID", nodeID))
return false
}
log.Ctx(s.ctx).Info("pick client success", zap.Int64("taskID", task.GetTaskID()), zap.Int64("nodeID", nodeID))
@ -458,18 +574,18 @@ func (s *taskScheduler) processInit(task Task) bool {
WithLabelValues(task.GetTaskType(), metrics.Pending).Observe(float64(queueingTime.Milliseconds()))
log.Ctx(s.ctx).Info("update task meta state to InProgress success", zap.Int64("taskID", task.GetTaskID()),
zap.Int64("nodeID", nodeID))
return s.processInProgress(task)
return true
}
func (s *taskScheduler) processFinished(task Task) bool {
if err := task.SetJobInfo(s.meta); err != nil {
log.Ctx(s.ctx).Warn("update task info failed", zap.Error(err))
return true
return false
}
task.SetEndTime(time.Now())
runningTime := task.GetEndTime().Sub(task.GetStartTime())
if runningTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task running time is too long", zap.Int64("taskID", task.GetTaskID()),
log.Ctx(s.ctx).Warn("task running time is too long", zap.Int64("taskID", task.GetTaskID()),
zap.Int64("running time(ms)", runningTime.Milliseconds()))
}
metrics.DataCoordTaskExecuteLatency.
@ -477,10 +593,13 @@ func (s *taskScheduler) processFinished(task Task) bool {
client, exist := s.nodeManager.GetClientByID(task.GetNodeID())
if exist {
if !task.DropTaskOnWorker(s.ctx, client) {
return true
return false
}
}
s.removeTask(task.GetTaskID())
log.Ctx(s.ctx).Info("task has been finished", zap.Int64("taskID", task.GetTaskID()),
zap.Int64("queueing time(ms)", task.GetStartTime().Sub(task.GetQueueTime()).Milliseconds()),
zap.Int64("running time(ms)", runningTime.Milliseconds()),
zap.Int64("total time(ms)", task.GetEndTime().Sub(task.GetQueueTime()).Milliseconds()))
return true
}
@ -488,11 +607,16 @@ func (s *taskScheduler) processRetry(task Task) bool {
client, exist := s.nodeManager.GetClientByID(task.GetNodeID())
if exist {
if !task.DropTaskOnWorker(s.ctx, client) {
return true
return false
}
}
task.SetState(indexpb.JobState_JobStateInit, "")
task.ResetTask(s.meta)
log.Ctx(s.ctx).Info("processRetry success, set task to pending queue", zap.Int64("taskID", task.GetTaskID()),
zap.String("state", task.GetState().String()))
s.pendingTasks.Push(task)
return true
}
@ -504,8 +628,19 @@ func (s *taskScheduler) processInProgress(task Task) bool {
task.ResetTask(s.meta)
return s.processFinished(task)
}
return true
return false
}
log.Ctx(s.ctx).Info("node does not exist, set task state to retry", zap.Int64("taskID", task.GetTaskID()))
task.SetState(indexpb.JobState_JobStateRetry, "node does not exist")
return true
return false
}
func pickNode(nodeSlots map[int64]int64) int64 {
for w, slots := range nodeSlots {
if slots >= 1 {
nodeSlots[w] = slots - 1
return w
}
}
return -1
}
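
pickNode hands out one slot per call and mutates the map passed in, so run() keeps calling it until every node is exhausted and then waits for the next tick. A small illustration with assumed slot counts:

func pickNodeSketch() {
    // slot counts below are assumed values for illustration only
    slots := map[int64]int64{1: 2, 2: 0}
    _ = pickNode(slots) // returns 1; slots[1] drops to 1 (node 2 has no free slot)
    _ = pickNode(slots) // returns 1 again; slots[1] drops to 0
    _ = pickNode(slots) // returns -1: every node is out of slots
}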

View File

@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
"go.uber.org/zap"
"sync"
"testing"
"time"
@ -35,7 +36,9 @@ import (
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/datapb"
"github.com/milvus-io/milvus/pkg/proto/indexpb"
"github.com/milvus-io/milvus/pkg/proto/workerpb"
@ -718,7 +721,7 @@ func (s *taskSchedulerSuite) createAnalyzeMeta(catalog metastore.DataCoordCatalo
SegmentIDs: s.segmentIDs,
TaskID: 2,
NodeID: s.nodeID,
State: indexpb.JobState_JobStateInProgress,
State: indexpb.JobState_JobStateInit,
FieldType: schemapb.DataType_FloatVector,
},
3: {
@ -841,7 +844,11 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil)
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().PickClient().Return(s.nodeID, in)
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 {
return map[int64]int64{
1: 1,
}
})
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true)
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
@ -854,16 +861,17 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
cm.EXPECT().RootPath().Return("root")
scheduler := newTaskScheduler(ctx, mt, workerManager, cm, newIndexEngineVersionManager(), handler, nil, nil)
s.Equal(9, len(scheduler.tasks))
s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[1].GetState())
s.Equal(indexpb.JobState_JobStateInProgress, scheduler.tasks[2].GetState())
s.Equal(indexpb.JobState_JobStateRetry, scheduler.tasks[5].GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID].GetState())
s.Equal(indexpb.JobState_JobStateInProgress, scheduler.tasks[buildID+1].GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID+3].GetState())
s.Equal(indexpb.JobState_JobStateInProgress, scheduler.tasks[buildID+8].GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID+9].GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID+10].GetState())
s.Equal(6, scheduler.pendingTasks.TaskCount())
s.Equal(3, len(scheduler.runningTasks))
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(1).GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(2).GetState())
s.Equal(indexpb.JobState_JobStateRetry, scheduler.runningTasks[5].GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID).GetState())
s.Equal(indexpb.JobState_JobStateInProgress, scheduler.runningTasks[buildID+1].GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID+3).GetState())
s.Equal(indexpb.JobState_JobStateInProgress, scheduler.runningTasks[buildID+8].GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID+9).GetState())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID+10).GetState())
mt.segments.DropSegment(segID + 9)
@ -894,12 +902,15 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
})
for {
scheduler.RLock()
taskNum := len(scheduler.tasks)
scheduler.RUnlock()
if taskNum == 0 {
break
if scheduler.pendingTasks.TaskCount() == 0 {
// the task may still be waiting to be assigned, so sleep three seconds before checking the running set.
time.Sleep(time.Second * 3)
scheduler.runningQueueLock.RLock()
taskNum := len(scheduler.runningTasks)
scheduler.runningQueueLock.RUnlock()
if taskNum == 0 {
break
}
}
time.Sleep(time.Second)
}
@ -973,6 +984,11 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 {
return map[int64]int64{
1: 1,
}
})
mt := createMeta(catalog,
withAnalyzeMeta(&analyzeMeta{
@ -1011,12 +1027,13 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
workerManager.EXPECT().GetClientByID(mock.Anything).Return(nil, false).Once()
for {
scheduler.RLock()
taskNum := len(scheduler.tasks)
scheduler.RUnlock()
if taskNum == 0 {
break
if scheduler.pendingTasks.TaskCount() == 0 {
scheduler.runningQueueLock.RLock()
taskNum := len(scheduler.runningTasks)
scheduler.runningQueueLock.RUnlock()
if taskNum == 0 {
break
}
}
time.Sleep(time.Second)
}
@ -1034,6 +1051,11 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in := mocks.NewMockIndexNodeClient(s.T())
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 {
return map[int64]int64{
1: 1,
}
})
mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)),
withIndexMeta(&indexMeta{
@ -1076,14 +1098,12 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// pick client fail --> state: init
workerManager.EXPECT().PickClient().Return(0, nil).Once()
// update version failed --> state: init
workerManager.EXPECT().PickClient().Return(s.nodeID, in)
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("catalog update version error")).Once()
// assign task to indexNode fail --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(&commonpb.Status{
Code: 65535,
@ -1102,6 +1122,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// update state to building failed --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("catalog update building state error")).Once()
@ -1111,6 +1132,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// assign success --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
@ -1164,6 +1186,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
@ -1178,6 +1201,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
@ -1194,6 +1218,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
@ -1205,6 +1230,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
@ -1250,12 +1276,14 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
for {
scheduler.RLock()
taskNum := len(scheduler.tasks)
scheduler.RUnlock()
if scheduler.pendingTasks.TaskCount() == 0 {
scheduler.runningQueueLock.RLock()
taskNum := len(scheduler.runningTasks)
scheduler.runningQueueLock.RUnlock()
if taskNum == 0 {
break
if taskNum == 0 {
break
}
}
time.Sleep(time.Second)
}
@ -1272,6 +1300,11 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
in := mocks.NewMockIndexNodeClient(s.T())
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 {
return map[int64]int64{
1: 1,
}
})
mt := createMeta(catalog,
withAnalyzeMeta(&analyzeMeta{
@ -1348,89 +1381,133 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
scheduler.Start()
// get collection info failed --> init
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once()
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, collID int64) (*collectionInfo, error) {
log.Debug("get collection info failed", zap.Int64("collectionID", collID))
return nil, errors.New("mock error")
}).Once()
// get collection info success, get dim failed --> init
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64},
{FieldID: s.fieldID, Name: "vec"},
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, i int64) (*collectionInfo, error) {
log.Debug("get collection info success", zap.Int64("collectionID", i))
return &collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64},
{FieldID: s.fieldID, Name: "vec"},
},
},
},
}, nil).Once()
}, nil
}).Once()
// assign failed --> retry
workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once()
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Once()
workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) {
log.Debug("get client success, but assign failed", zap.Int64("nodeID", nodeID))
return in, true
}).Once()
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, indices []*model.SegmentIndex) error {
log.Debug("alter segment indexes success, but assign failed", zap.Int64("taskID", indices[0].BuildID))
return nil
}).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.CreateJobV2Request, option ...grpc.CallOption) (*commonpb.Status, error) {
indexNodeTasks[request.GetTaskID()]++
log.Debug("assign task failed", zap.Int64("taskID", request.GetTaskID()))
return nil, errors.New("mock error")
}).Once()
// retry --> init
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) {
log.Debug("assign failed, drop task on worker", zap.Int64("nodeID", nodeID))
return in, true
}).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.DropJobsV2Request, option ...grpc.CallOption) (*commonpb.Status, error) {
for _, taskID := range request.GetTaskIDs() {
indexNodeTasks[taskID]--
}
log.Debug("drop task on worker, success", zap.Int64s("taskIDs", request.GetTaskIDs()))
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}).Once()
// init --> inProgress
workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once()
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Twice()
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64},
{FieldID: s.fieldID, Name: "vec", TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}}},
workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) {
log.Debug("assign task success", zap.Int64("nodeID", nodeID))
return in, true
}).Once()
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, indices []*model.SegmentIndex) error {
log.Debug("alter segment success twice and assign task success", zap.Int64("taskID", indices[0].BuildID))
return nil
}).Twice()
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, i int64) (*collectionInfo, error) {
log.Debug("get collection success and assign task success", zap.Int64("collID", i))
return &collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: 100, Name: "pk", IsPrimaryKey: true, IsPartitionKey: true, DataType: schemapb.DataType_Int64},
{FieldID: s.fieldID, Name: "vec", TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}}},
},
},
},
}, nil).Once()
}, nil
}).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.CreateJobV2Request, option ...grpc.CallOption) (*commonpb.Status, error) {
indexNodeTasks[request.GetTaskID()]++
log.Debug("assign task success", zap.Int64("nodeID", nodeID))
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}).Once()
// inProgress --> Finished
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).Return(&workerpb.QueryJobsV2Response{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ClusterID: "",
Result: &workerpb.QueryJobsV2Response_IndexJobResults{
IndexJobResults: &workerpb.IndexJobResults{
Results: []*workerpb.IndexTaskInfo{
{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2"},
SerializedSize: 1024,
workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) {
log.Debug("get task result success, task is finished", zap.Int64("nodeID", nodeID))
return in, true
}).Once()
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.QueryJobsV2Request, option ...grpc.CallOption) (*workerpb.QueryJobsV2Response, error) {
log.Debug("query task result success, task is finished", zap.Int64s("taskIDs", request.GetTaskIDs()))
return &workerpb.QueryJobsV2Response{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ClusterID: "",
Result: &workerpb.QueryJobsV2Response_IndexJobResults{
IndexJobResults: &workerpb.IndexJobResults{
Results: []*workerpb.IndexTaskInfo{
{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2"},
SerializedSize: 1024,
},
},
},
},
},
}, nil)
}, nil
})
// finished --> done
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Once()
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
finishCH := make(chan struct{})
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, indices []*model.SegmentIndex) error {
log.Debug("task is finished, alter segment index success", zap.Int64("taskID", indices[0].BuildID))
return nil
}).Once()
workerManager.EXPECT().GetClientByID(mock.Anything).RunAndReturn(func(nodeID int64) (types.IndexNodeClient, bool) {
log.Debug("task is finished, drop task on worker", zap.Int64("nodeID", nodeID))
return in, true
}).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.DropJobsV2Request, option ...grpc.CallOption) (*commonpb.Status, error) {
for _, taskID := range request.GetTaskIDs() {
indexNodeTasks[taskID]--
finishCH <- struct{}{}
}
log.Debug("task is finished, drop task on worker success", zap.Int64("nodeID", nodeID))
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}).Once()
<-finishCH
for {
scheduler.RLock()
taskNum := len(scheduler.tasks)
scheduler.RUnlock()
if taskNum == 0 {
break
if scheduler.pendingTasks.TaskCount() == 0 {
scheduler.runningQueueLock.RLock()
taskNum := len(scheduler.runningTasks)
scheduler.runningQueueLock.RUnlock()
if taskNum == 0 {
break
}
}
time.Sleep(time.Second)
}
@ -1458,7 +1535,11 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
in := mocks.NewMockIndexNodeClient(s.T())
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().PickClient().Return(s.nodeID, in)
workerManager.EXPECT().QuerySlots().RunAndReturn(func() map[int64]int64 {
return map[int64]int64{
1: 1,
}
})
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true)
minNumberOfRowsToBuild := paramtable.Get().DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() + 1
@ -1619,18 +1700,21 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
waitTaskDoneFunc := func(sche *taskScheduler) {
for {
sche.RLock()
taskNum := len(sche.tasks)
sche.RUnlock()
if taskNum == 0 {
break
if sche.pendingTasks.TaskCount() == 0 {
sche.runningQueueLock.RLock()
taskNum := len(sche.runningTasks)
sche.runningQueueLock.RUnlock()
if taskNum == 0 {
break
}
}
time.Sleep(time.Second)
}
}
resetMetaFunc := func() {
mt.indexMeta.Lock()
defer mt.indexMeta.Unlock()
t, ok := mt.indexMeta.segmentBuildInfo.Get(buildID)
s.True(ok)
t.IndexState = commonpb.IndexState_Unissued
@ -1680,8 +1764,8 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
s.NotZero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set")
return merr.Success(), nil
}).Once()
s.Equal(1, len(scheduler.tasks))
s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID].GetState())
s.Equal(1, scheduler.pendingTasks.TaskCount())
s.Equal(indexpb.JobState_JobStateInit, scheduler.pendingTasks.Get(buildID).GetState())
scheduler.Start()
waitTaskDoneFunc(scheduler)
@ -1859,6 +1943,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
s.Run("Submit partitionKeyIsolation is false when MV not enabled", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Equal(in.GetIndexRequest().PartitionKeyIsolation, false)
@ -1884,7 +1969,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
isoCollInfo.Properties[common.PartitionKeyIsolationKey] = "true"
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Equal(in.GetIndexRequest().PartitionKeyIsolation, true)
s.True(in.GetIndexRequest().PartitionKeyIsolation)
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
@ -1957,8 +2042,7 @@ func (s *taskSchedulerSuite) Test_reload() {
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
s.True(ok)
task := scheduler.runningTasks[statsTaskID]
s.NotNil(task)
})
@ -1994,8 +2078,7 @@ func (s *taskSchedulerSuite) Test_reload() {
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
s.False(ok)
task := scheduler.pendingTasks.Get(statsTaskID)
s.Nil(task)
})
@ -2031,8 +2114,7 @@ func (s *taskSchedulerSuite) Test_reload() {
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.True(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
s.True(ok)
task := scheduler.runningTasks[statsTaskID]
s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
})
@ -2068,8 +2150,7 @@ func (s *taskSchedulerSuite) Test_reload() {
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.False(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
s.False(ok)
task := scheduler.pendingTasks.Get(statsTaskID)
s.Nil(task)
})
@ -2105,8 +2186,7 @@ func (s *taskSchedulerSuite) Test_reload() {
scheduler := newTaskScheduler(context.Background(), mt, workerManager, nil, nil, handler, nil, compactionHandler)
s.NotNil(scheduler)
s.False(mt.segments.segments[1000].isCompacting)
task, ok := scheduler.tasks[statsTaskID]
s.True(ok)
task := scheduler.runningTasks[statsTaskID]
s.Equal(indexpb.JobState_JobStateFailed, task.GetState())
})
}

View File

@ -160,7 +160,6 @@ func (st *statsTask) UpdateMetaBuildingState(meta *meta) error {
}
func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
// set segment compacting
log := log.Ctx(ctx).With(zap.Int64("taskID", st.taskID), zap.Int64("segmentID", st.segmentID))
segment := dependency.meta.GetHealthySegment(ctx, st.segmentID)
if segment == nil {

View File

@ -382,6 +382,7 @@ func (i *IndexNode) CreateJobV2(ctx context.Context, req *workerpb.CreateJobV2Re
log.Info("receive stats job", zap.Int64("collectionID", statsRequest.GetCollectionID()),
zap.Int64("partitionID", statsRequest.GetPartitionID()),
zap.Int64("segmentID", statsRequest.GetSegmentID()),
zap.Int64("numRows", statsRequest.GetNumRows()),
zap.Int64("targetSegmentID", statsRequest.GetTargetSegmentID()),
zap.String("subJobType", statsRequest.GetSubJobType().String()),
zap.Int64("startLogID", statsRequest.GetStartLogID()),

View File

@ -116,12 +116,13 @@ func (st *statsTask) PreExecute(ctx context.Context) error {
defer span.End()
st.queueDur = st.tr.RecordSpan()
log.Ctx(ctx).Info("Begin to prepare stats task",
log.Ctx(ctx).Info("Begin to PreExecute stats task",
zap.String("clusterID", st.req.GetClusterID()),
zap.Int64("taskID", st.req.GetTaskID()),
zap.Int64("collectionID", st.req.GetCollectionID()),
zap.Int64("partitionID", st.req.GetPartitionID()),
zap.Int64("segmentID", st.req.GetSegmentID()),
zap.Int64("queue duration(ms)", st.queueDur.Milliseconds()),
)
if err := binlog.DecompressBinLogWithRootPath(st.req.GetStorageConfig().GetRootPath(), storage.InsertBinlog, st.req.GetCollectionID(), st.req.GetPartitionID(),
@ -152,6 +153,16 @@ func (st *statsTask) PreExecute(ctx context.Context) error {
}
}
preExecuteRecordSpan := st.tr.RecordSpan()
log.Ctx(ctx).Info("successfully PreExecute stats task",
zap.String("clusterID", st.req.GetClusterID()),
zap.Int64("taskID", st.req.GetTaskID()),
zap.Int64("collectionID", st.req.GetCollectionID()),
zap.Int64("partitionID", st.req.GetPartitionID()),
zap.Int64("segmentID", st.req.GetSegmentID()),
zap.Int64("preExecuteRecordSpan(ms)", preExecuteRecordSpan.Milliseconds()),
)
return nil
}