diff --git a/codecov.yml b/codecov.yml
index 7ea888c3d9..8d039a792e 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -38,7 +38,7 @@ ignore:
   - "**/*.pb.go"
   - "**/*.proto"
   - "internal/metastore/db/dbmodel/mocks/.*"
-  - "internal/mocks"
+  - "**/mock_*.go"
diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 9f7ffe1c98..315547dbae 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -184,6 +184,7 @@ queryCoord:
   loadTimeoutSeconds: 600
   checkHandoffInterval: 5000
   taskMergeCap: 16
+  taskExecutionCap: 256
   enableActiveStandby: false  # Enable active-standby

 # Related configuration of queryNode, used to run hybrid search between vector and scalar data.
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index 543679ae63..c2e98674ff 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -311,6 +311,7 @@ func (s *Server) Start() error {
 	}
 	for _, node := range sessions {
 		s.nodeMgr.Add(session.NewNodeInfo(node.ServerID, node.Address))
+		s.taskScheduler.AddExecutor(node.ServerID)
 	}
 	s.checkReplicas()
 	for _, node := range sessions {
@@ -571,6 +572,7 @@ func (s *Server) watchNodes(revision int64) {

 func (s *Server) handleNodeUp(node int64) {
 	log := log.With(zap.Int64("nodeID", node))
+	s.taskScheduler.AddExecutor(node)
 	s.distController.StartDistInstance(s.ctx, node)

 	for _, collection := range s.meta.CollectionManager.GetAll() {
@@ -598,6 +600,7 @@ func (s *Server) handleNodeUp(node int64) {

 func (s *Server) handleNodeDown(node int64) {
 	log := log.With(zap.Int64("nodeID", node))
+	s.taskScheduler.RemoveExecutor(node)
 	s.distController.Remove(node)

 	// Refresh the targets, to avoid consuming messages too early from channel
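Note on the server.go hunks above: executor lifetime is now tied to QueryNode membership. An executor is registered for every session found at startup and for every node-up event, and torn down on node-down. A minimal, self-contained sketch of that registry pattern (the names here are illustrative, not the PR's API):

```go
package main

import (
	"fmt"
	"sync"
)

// executorRegistry mirrors the lifecycle the server.go hunks establish:
// one executor per QueryNode, added on startup/node-up, removed on node-down.
type executorRegistry struct {
	mu    sync.Mutex
	stops map[int64]func() // nodeID -> stop function of its executor
}

func (r *executorRegistry) add(nodeID int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.stops[nodeID]; ok {
		return // idempotent: Start() and a node-up event may both see the same node
	}
	r.stops[nodeID] = func() { fmt.Printf("executor for node %d stopped\n", nodeID) }
}

func (r *executorRegistry) remove(nodeID int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if stop, ok := r.stops[nodeID]; ok {
		stop()
		delete(r.stops, nodeID)
	}
}

func main() {
	reg := &executorRegistry{stops: make(map[int64]func())}
	reg.add(1) // node discovered at startup
	reg.add(1) // node-up replay for the same ServerID: no-op
	reg.remove(1)
}
```

The idempotent add matters because Start() registers executors for already-known sessions while watchNodes can deliver a node-up event for the same ServerID.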
diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go
index 3346a0b573..a695a74c4c 100644
--- a/internal/querycoordv2/task/executor.go
+++ b/internal/querycoordv2/task/executor.go
@@ -26,9 +26,11 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
+	. "github.com/milvus-io/milvus/internal/querycoordv2/params"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )

@@ -45,7 +47,8 @@ type Executor struct {
 	// Merge load segment requests
 	merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest]

-	executingTasks sync.Map
+	executingTasks   sync.Map
+	executingTaskNum atomic.Int32
 }

 func NewExecutor(meta *meta.Meta,
@@ -82,10 +85,14 @@ func (ex *Executor) Stop() {
 // does nothing and returns false if the action is already committed,
 // returns true otherwise.
 func (ex *Executor) Execute(task Task, step int) bool {
+	if ex.executingTaskNum.Load() > Params.QueryCoordCfg.TaskExecutionCap {
+		return false
+	}
 	_, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{})
 	if exist {
 		return false
 	}
+	ex.executingTaskNum.Inc()

 	log := log.With(
 		zap.Int64("taskID", task.ID()),
@@ -137,7 +144,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
 	defer func() {
 		for i := range mergeTask.tasks {
 			mergeTask.tasks[i].SetErr(task.Err())
-			ex.removeAction(mergeTask.tasks[i], mergeTask.steps[i])
+			ex.removeTask(mergeTask.tasks[i], mergeTask.steps[i])
 		}
 	}()

@@ -180,7 +187,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
 	log.Info("load segments done", zap.Int64("taskID", task.ID()), zap.Duration("timeTaken", elapsed))
 }

-func (ex *Executor) removeAction(task Task, step int) {
+func (ex *Executor) removeTask(task Task, step int) {
 	if task.Err() != nil {
 		log.Info("excute action done, remove it",
 			zap.Int64("taskID", task.ID()),
@@ -189,6 +196,7 @@
 	}

 	ex.executingTasks.Delete(task.ID())
+	ex.executingTaskNum.Dec()
 }

 func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {
@@ -218,7 +226,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
 		if err != nil {
 			task.SetErr(err)
 			task.Cancel()
-			ex.removeAction(task, step)
+			ex.removeTask(task, step)
 		}
 	}()

@@ -270,7 +278,7 @@
 }

 func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
-	defer ex.removeAction(task, step)
+	defer ex.removeTask(task, step)
 	startTs := time.Now()
 	action := task.Actions()[step].(*SegmentAction)
 	defer action.isReleaseCommitted.Store(true)
@@ -343,7 +351,7 @@ func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
 }

 func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
-	defer ex.removeAction(task, step)
+	defer ex.removeTask(task, step)
 	startTs := time.Now()
 	action := task.Actions()[step].(*ChannelAction)
 	log := log.With(
@@ -415,7 +423,7 @@
 }

 func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
-	defer ex.removeAction(task, step)
+	defer ex.removeTask(task, step)
 	startTs := time.Now()
 	action := task.Actions()[step].(*ChannelAction)
 	log := log.With(
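The executor.go changes above amount to simple per-executor admission control: an atomic in-flight counter gates new work, and LoadOrStore deduplicates tasks that are already running. A condensed, self-contained sketch of just that pattern, using go.uber.org/atomic as the diff does (the type and method names are illustrative):

```go
package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

type admission struct {
	inflight atomic.Int32
	running  sync.Map // taskID -> struct{}
}

// tryExecute follows Execute's order of checks: reject when saturated,
// then reject duplicates, and only then count the task as in-flight.
func (a *admission) tryExecute(taskID int64, cap int32) bool {
	if a.inflight.Load() >= cap {
		return false // caller simply retries on a later dispatch cycle
	}
	if _, exists := a.running.LoadOrStore(taskID, struct{}{}); exists {
		return false
	}
	a.inflight.Inc()
	return true
}

// done is the counterpart of removeTask: forget the task, release a slot.
func (a *admission) done(taskID int64) {
	a.running.Delete(taskID)
	a.inflight.Dec()
}

func main() {
	a := &admission{}
	fmt.Println(a.tryExecute(42, 2)) // true
	fmt.Println(a.tryExecute(42, 2)) // false: already running
	fmt.Println(a.tryExecute(43, 2)) // true
	fmt.Println(a.tryExecute(44, 2)) // false: saturated at cap 2
	a.done(42)
	fmt.Println(a.tryExecute(44, 2)) // true again
}
```

Two properties carry over from the real code: the saturation check and the increment are not one atomic operation, so the cap is approximate under concurrent Execute calls; and the actual hunk compares with a strict >, letting the count overshoot the configured cap by one (the sketch uses >=).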
diff --git a/internal/querycoordv2/task/mock_scheduler.go b/internal/querycoordv2/task/mock_scheduler.go
index c9dc4b0979..83773b8c55 100644
--- a/internal/querycoordv2/task/mock_scheduler.go
+++ b/internal/querycoordv2/task/mock_scheduler.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.14.0. DO NOT EDIT.
+// Code generated by mockery v2.14.1. DO NOT EDIT.

 package task

@@ -58,6 +58,34 @@ func (_c *MockScheduler_Add_Call) Return(_a0 error) *MockScheduler_Add_Call {
 	return _c
 }

+// AddExecutor provides a mock function with given fields: nodeID
+func (_m *MockScheduler) AddExecutor(nodeID int64) {
+	_m.Called(nodeID)
+}
+
+// MockScheduler_AddExecutor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddExecutor'
+type MockScheduler_AddExecutor_Call struct {
+	*mock.Call
+}
+
+// AddExecutor is a helper method to define mock.On call
+//  - nodeID int64
+func (_e *MockScheduler_Expecter) AddExecutor(nodeID interface{}) *MockScheduler_AddExecutor_Call {
+	return &MockScheduler_AddExecutor_Call{Call: _e.mock.On("AddExecutor", nodeID)}
+}
+
+func (_c *MockScheduler_AddExecutor_Call) Run(run func(nodeID int64)) *MockScheduler_AddExecutor_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64))
+	})
+	return _c
+}
+
+func (_c *MockScheduler_AddExecutor_Call) Return() *MockScheduler_AddExecutor_Call {
+	_c.Call.Return()
+	return _c
+}
+
 // Dispatch provides a mock function with given fields: node
 func (_m *MockScheduler) Dispatch(node int64) {
 	_m.Called(node)
@@ -188,6 +216,34 @@ func (_c *MockScheduler_RemoveByNode_Call) Return() *MockScheduler_RemoveByNode_
 	return _c
 }

+// RemoveExecutor provides a mock function with given fields: nodeID
+func (_m *MockScheduler) RemoveExecutor(nodeID int64) {
+	_m.Called(nodeID)
+}
+
+// MockScheduler_RemoveExecutor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveExecutor'
+type MockScheduler_RemoveExecutor_Call struct {
+	*mock.Call
+}
+
+// RemoveExecutor is a helper method to define mock.On call
+//  - nodeID int64
+func (_e *MockScheduler_Expecter) RemoveExecutor(nodeID interface{}) *MockScheduler_RemoveExecutor_Call {
+	return &MockScheduler_RemoveExecutor_Call{Call: _e.mock.On("RemoveExecutor", nodeID)}
+}
+
+func (_c *MockScheduler_RemoveExecutor_Call) Run(run func(nodeID int64)) *MockScheduler_RemoveExecutor_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64))
+	})
+	return _c
+}
+
+func (_c *MockScheduler_RemoveExecutor_Call) Return() *MockScheduler_RemoveExecutor_Call {
+	_c.Call.Return()
+	return _c
+}
+
 // Start provides a mock function with given fields: ctx
 func (_m *MockScheduler) Start(ctx context.Context) {
 	_m.Called(ctx)
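For reference, a test would exercise the regenerated expecter API roughly as below. This is a sketch assuming the mockery-generated NewMockScheduler constructor and the stretchr/testify mock package, placed alongside the mock in the task package:

```go
import (
	"testing"

	"github.com/stretchr/testify/mock"
)

func TestSchedulerMockExecutorCalls(t *testing.T) {
	scheduler := NewMockScheduler(t)
	// Expect one AddExecutor call and observe its argument via Run.
	scheduler.EXPECT().AddExecutor(mock.Anything).Run(func(nodeID int64) {
		t.Logf("AddExecutor called for node %d", nodeID)
	}).Return()
	scheduler.EXPECT().RemoveExecutor(int64(1)).Return()

	scheduler.AddExecutor(1)
	scheduler.RemoveExecutor(1)
}
```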
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index 5265eacfd0..1e06d8588e 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -38,8 +38,6 @@ const (
 	TaskTypeGrow Type = iota + 1
 	TaskTypeReduce
 	TaskTypeMove
-
-	taskPoolSize = 256
 )

 var (
@@ -56,8 +54,6 @@
 	// No enough memory to load segment
 	ErrResourceNotEnough = errors.New("ResourceNotEnough")

-	ErrTaskQueueFull = errors.New("TaskQueueFull")
-
 	ErrFailedResponse  = errors.New("RpcFailed")
 	ErrTaskAlreadyDone = errors.New("TaskAlreadyDone")
 )
@@ -85,17 +81,17 @@ type replicaChannelIndex struct {
 }

 type taskQueue struct {
-	// TaskPriority -> Tasks
-	buckets [][]Task
-
-	cap int
+	// TaskPriority -> TaskID -> Task
+	buckets []map[int64]Task
 }

-func newTaskQueue(cap int) *taskQueue {
+func newTaskQueue() *taskQueue {
+	buckets := make([]map[int64]Task, len(TaskPriorities))
+	for i := range buckets {
+		buckets[i] = make(map[int64]Task)
+	}
 	return &taskQueue{
-		buckets: make([][]Task, len(TaskPriorities)),
-
-		cap: cap,
+		buckets: buckets,
 	}
 }

@@ -108,35 +104,21 @@ func (queue *taskQueue) Len() int {
 	return taskNum
 }

-func (queue *taskQueue) Cap() int {
-	return queue.cap
-}
-
-func (queue *taskQueue) Add(task Task) bool {
-	if queue.Len() >= queue.Cap() {
-		return false
-	}
-
-	queue.buckets[task.Priority()] = append(queue.buckets[task.Priority()], task)
-	return true
+func (queue *taskQueue) Add(task Task) {
+	bucket := queue.buckets[task.Priority()]
+	bucket[task.ID()] = task
 }

 func (queue *taskQueue) Remove(task Task) {
-	bucket := &queue.buckets[task.Priority()]
-
-	for i := range *bucket {
-		if (*bucket)[i].ID() == task.ID() {
-			*bucket = append((*bucket)[:i], (*bucket)[i+1:]...)
-			break
-		}
-	}
+	bucket := queue.buckets[task.Priority()]
+	delete(bucket, task.ID())
 }

 // Range iterates all tasks in the queue ordered by priority from high to low
 func (queue *taskQueue) Range(fn func(task Task) bool) {
 	for priority := len(queue.buckets) - 1; priority >= 0; priority-- {
-		for i := range queue.buckets[priority] {
-			if !fn(queue.buckets[priority][i]) {
+		for _, task := range queue.buckets[priority] {
+			if !fn(task) {
 				return
 			}
 		}
 	}
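The queue rewrite above swaps priority-ordered slices for per-priority maps keyed by task ID, which makes Add and Remove O(1) and removes the fixed capacity (and with it ErrTaskQueueFull). A self-contained sketch of the structure, with Task reduced to an ID plus a priority:

```go
package main

import "fmt"

type task struct {
	id       int64
	priority int // 0 = lowest, len(buckets)-1 = highest
}

type queue struct {
	buckets []map[int64]task // priority -> taskID -> task
}

func newQueue(priorities int) *queue {
	q := &queue{buckets: make([]map[int64]task, priorities)}
	for i := range q.buckets {
		q.buckets[i] = make(map[int64]task)
	}
	return q
}

func (q *queue) add(t task)    { q.buckets[t.priority][t.id] = t }
func (q *queue) remove(t task) { delete(q.buckets[t.priority], t.id) }

// rangeTasks visits tasks from high priority to low, like taskQueue.Range.
func (q *queue) rangeTasks(fn func(task) bool) {
	for p := len(q.buckets) - 1; p >= 0; p-- {
		for _, t := range q.buckets[p] {
			if !fn(t) {
				return
			}
		}
	}
}

func main() {
	q := newQueue(3)
	q.add(task{id: 1, priority: 2})
	q.add(task{id: 2, priority: 0})
	q.rangeTasks(func(t task) bool { fmt.Println(t.id); return true })
}
```

One tradeoff worth noting: Go map iteration order is randomized, so FIFO order within a priority level is lost; that appears acceptable here because the scheduler re-ranges all queued tasks on every dispatch cycle.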
@@ -146,6 +128,8 @@
 type Scheduler interface {
 	Start(ctx context.Context)
 	Stop()
+	AddExecutor(nodeID int64)
+	RemoveExecutor(nodeID int64)
 	Add(task Task) error
 	Dispatch(node int64)
 	RemoveByNode(node int64)
@@ -156,13 +140,14 @@
 type taskScheduler struct {
 	rwmutex     sync.RWMutex
 	ctx         context.Context
-	executor    *Executor
+	executors   map[int64]*Executor // NodeID -> Executor
 	idAllocator func() UniqueID

 	distMgr   *meta.DistributionManager
 	meta      *meta.Meta
 	targetMgr *meta.TargetManager
 	broker    meta.Broker
+	cluster   session.Cluster
 	nodeMgr   *session.NodeManager

 	tasks UniqueSet
@@ -181,8 +166,8 @@ func NewScheduler(ctx context.Context,
 	nodeMgr *session.NodeManager) *taskScheduler {
 	id := int64(0)
 	return &taskScheduler{
-		ctx:      ctx,
-		executor: NewExecutor(meta, distMgr, broker, targetMgr, cluster, nodeMgr),
+		ctx:       ctx,
+		executors: make(map[int64]*Executor),
 		idAllocator: func() UniqueID {
 			id++
 			return id
@@ -192,22 +177,59 @@ func NewScheduler(ctx context.Context,
 		meta:      meta,
 		targetMgr: targetMgr,
 		broker:    broker,
+		cluster:   cluster,
 		nodeMgr:   nodeMgr,

 		tasks:        make(UniqueSet),
 		segmentTasks: make(map[replicaSegmentIndex]Task),
 		channelTasks: make(map[replicaChannelIndex]Task),
-		processQueue: newTaskQueue(taskPoolSize),
-		waitQueue:    newTaskQueue(taskPoolSize * 10),
+		processQueue: newTaskQueue(),
+		waitQueue:    newTaskQueue(),
 	}
 }

-func (scheduler *taskScheduler) Start(ctx context.Context) {
-	scheduler.executor.Start(ctx)
-}
+func (scheduler *taskScheduler) Start(ctx context.Context) {}

 func (scheduler *taskScheduler) Stop() {
-	scheduler.executor.Stop()
+	scheduler.rwmutex.Lock()
+	defer scheduler.rwmutex.Unlock()
+
+	for nodeID, executor := range scheduler.executors {
+		executor.Stop()
+		delete(scheduler.executors, nodeID)
+	}
+}
+
+func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
+	scheduler.rwmutex.Lock()
+	defer scheduler.rwmutex.Unlock()
+
+	if _, exist := scheduler.executors[nodeID]; exist {
+		return
+	}
+
+	executor := NewExecutor(scheduler.meta,
+		scheduler.distMgr,
+		scheduler.broker,
+		scheduler.targetMgr,
+		scheduler.cluster,
+		scheduler.nodeMgr)
+
+	scheduler.executors[nodeID] = executor
+	executor.Start(scheduler.ctx)
+	log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
+}
+
+func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
+	scheduler.rwmutex.Lock()
+	defer scheduler.rwmutex.Unlock()
+
+	executor, ok := scheduler.executors[nodeID]
+	if ok {
+		executor.Stop()
+		delete(scheduler.executors, nodeID)
+		log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
+	}
 }

 func (scheduler *taskScheduler) Add(task Task) error {
@@ -219,12 +241,8 @@ func (scheduler *taskScheduler) Add(task Task) error {
 		return err
 	}

-	if !scheduler.waitQueue.Add(task) {
-		log.Warn("failed to add task", zap.String("task", task.String()))
-		return ErrTaskQueueFull
-	}
-
 	task.SetID(scheduler.idAllocator())
+	scheduler.waitQueue.Add(task)
 	scheduler.tasks.Insert(task.ID())
 	switch task := task.(type) {
 	case *SegmentTask:
@@ -317,33 +335,35 @@ func (scheduler *taskScheduler) promote(task Task) error {
 		return err
 	}

-	if scheduler.processQueue.Add(task) {
-		task.SetStatus(TaskStatusStarted)
-		return nil
-	}
-
-	return ErrTaskQueueFull
+	scheduler.processQueue.Add(task)
+	task.SetStatus(TaskStatusStarted)
+	return nil
 }

 func (scheduler *taskScheduler) tryPromoteAll() {
 	// Promote waiting tasks
-	toPromote := make([]Task, 0, scheduler.processQueue.Cap()-scheduler.processQueue.Len())
+	toPromote := make([]Task, 0, scheduler.waitQueue.Len())
 	toRemove := make([]Task, 0)
 	scheduler.waitQueue.Range(func(task Task) bool {
 		err := scheduler.promote(task)
-		if errors.Is(err, ErrTaskStale) { // Task canceled or stale
-			task.SetStatus(TaskStatusStale)
-			task.SetErr(err)
-			toRemove = append(toRemove, task)
-		} else if errors.Is(err, ErrTaskCanceled) {
+
+		if err != nil {
 			task.SetStatus(TaskStatusCanceled)
+			if errors.Is(err, ErrTaskStale) { // Task canceled or stale
+				task.SetStatus(TaskStatusStale)
+			}
+
+			log.Warn("failed to promote task",
+				zap.Int64("taskID", task.ID()),
+				zap.Error(err),
+			)
 			task.SetErr(err)
 			toRemove = append(toRemove, task)
-		} else if err == nil {
+		} else {
 			toPromote = append(toPromote, task)
 		}
-		return !errors.Is(err, ErrTaskQueueFull)
+
+		return true
 	})

 	for _, task := range toPromote {
@@ -527,7 +547,28 @@ func (scheduler *taskScheduler) process(task Task) bool {
 		zap.Int64("source", task.SourceID()),
 	)

-	if !scheduler.executor.Exist(task.ID()) && task.IsFinished(scheduler.distMgr) {
+	actions, step := task.Actions(), task.Step()
+	for step < len(actions) && actions[step].IsFinished(scheduler.distMgr) {
+		task.StepUp()
+		step++
+	}
+
+	if step == len(actions) {
+		step--
+	}
+
+	executor, ok := scheduler.executors[actions[step].Node()]
+	if !ok {
+		log.Warn("no executor for QueryNode",
+			zap.Int("step", step),
+			zap.Int64("nodeID", actions[step].Node()))
+		return false
+	}
+
+	if task.IsFinished(scheduler.distMgr) {
+		if executor.Exist(task.ID()) {
+			return false
+		}
 		task.SetStatus(TaskStatusSucceeded)
 	} else if scheduler.checkCanceled(task) {
 		task.SetStatus(TaskStatusCanceled)
@@ -539,11 +580,10 @@
 		task.SetErr(ErrTaskStale)
 	}

-	step := task.Step()
 	log = log.With(zap.Int("step", step))
 	switch task.Status() {
 	case TaskStatusStarted:
-		if scheduler.executor.Execute(task, step) {
+		if executor.Execute(task, step) {
 			return true
 		}
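The new process() body takes over the step bookkeeping that used to hide inside IsFinished: it advances past finished actions, clamps the step so the last action stays addressable, then looks up the executor owning the current action's node and bails out with a warning if none is registered. The control flow in isolation (a sketch; action stands in for the package's Action interface):

```go
package main

import "fmt"

type action struct {
	node     int64
	finished bool
}

// advance mirrors the step-up loop in taskScheduler.process: skip finished
// actions, then clamp so actions[step] is always a valid index.
func advance(actions []action, step int) int {
	for step < len(actions) && actions[step].finished {
		step++
	}
	if step == len(actions) {
		step-- // all actions finished: keep pointing at the last one
	}
	return step
}

func main() {
	actions := []action{
		{node: 1, finished: true},  // e.g. grow on node 1, already applied
		{node: 2, finished: false}, // e.g. reduce on node 2, still pending
	}
	step := advance(actions, 0)
	fmt.Printf("execute step %d on node %d\n", step, actions[step].node)
	// Without an executor registered for node 2, process() now logs a
	// warning and returns false instead of silently stalling.
}
```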
diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go
index d8d4bab7b9..ea164a91ee 100644
--- a/internal/querycoordv2/task/task.go
+++ b/internal/querycoordv2/task/task.go
@@ -74,6 +74,7 @@ type Task interface {
 	Wait() error
 	Actions() []Action
 	Step() int
+	StepUp() int
 	IsFinished(dist *meta.DistributionManager) bool
 	String() string
 }
@@ -188,18 +189,16 @@ func (task *baseTask) Step() int {
 	return task.step
 }

+func (task *baseTask) StepUp() int {
+	task.step++
+	return task.step
+}
+
 func (task *baseTask) IsFinished(distMgr *meta.DistributionManager) bool {
 	if task.Status() != TaskStatusStarted {
 		return false
 	}
-
-	actions, step := task.Actions(), task.Step()
-	for step < len(actions) && actions[step].IsFinished(distMgr) {
-		task.step++
-		step++
-	}
-
-	return task.Step() >= len(actions)
+	return task.Step() >= len(task.Actions())
 }

 func (task *baseTask) String() string {
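With StepUp() on the Task interface, IsFinished becomes a pure query: the mutation it used to perform as a side effect now happens explicitly in the scheduler's process() loop shown above. A minimal sketch of the resulting contract, with illustrative types:

```go
package main

import "fmt"

type baseTask struct {
	step    int
	actions int
}

// StepUp is the only place the cursor moves; callers decide when.
func (t *baseTask) StepUp() int {
	t.step++
	return t.step
}

// IsFinished no longer advances the step as a side effect.
func (t *baseTask) IsFinished() bool {
	return t.step >= t.actions
}

func main() {
	t := &baseTask{actions: 2}
	fmt.Println(t.IsFinished()) // false
	t.StepUp()
	t.StepUp()
	fmt.Println(t.IsFinished()) // true
}
```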
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index b7a1900522..3c83021e38 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -132,6 +132,9 @@ func (suite *TaskSuite) SetupTest() {

 	suite.scheduler = suite.newScheduler()
 	suite.scheduler.Start(context.Background())
+	suite.scheduler.AddExecutor(1)
+	suite.scheduler.AddExecutor(2)
+	suite.scheduler.AddExecutor(3)
 }

 func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
@@ -147,7 +150,8 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
 		"TestTaskCanceled",
 		"TestMoveSegmentTask",
 		"TestSubmitDuplicateLoadSegmentTask",
-		"TestSubmitDuplicateSubscribeChannelTask":
+		"TestSubmitDuplicateSubscribeChannelTask",
+		"TestNoExecutor":
 		suite.meta.PutCollection(&meta.Collection{
 			CollectionLoadInfo: &querypb.CollectionLoadInfo{
 				CollectionID: suite.collection,
@@ -367,7 +371,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {

 	// Expect
 	suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{
-		Name: "TestSubscribeChannelTask",
+		Name: "TestLoadSegmentTask",
 	}, nil)
 	suite.broker.EXPECT().GetPartitions(mock.Anything, suite.collection).Return([]int64{100, 101}, nil)
 	for _, segment := range suite.loadSegments {
@@ -496,7 +500,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {

 	// Expect
 	suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{
-		Name: "TestSubscribeChannelTask",
+		Name: "TestLoadSegmentTask",
 	}, nil)
 	suite.broker.EXPECT().GetPartitions(mock.Anything, suite.collection).Return([]int64{100, 101}, nil)
 	for _, segment := range suite.loadSegments {
@@ -1129,6 +1133,76 @@ func (suite *TaskSuite) TestSegmentTaskReplace() {
 	suite.AssertTaskNum(0, segmentNum, 0, segmentNum)
 }

+func (suite *TaskSuite) TestNoExecutor() {
+	ctx := context.Background()
+	timeout := 10 * time.Second
+	targetNode := int64(-1)
+	channel := &datapb.VchannelInfo{
+		CollectionID: suite.collection,
+		ChannelName:  Params.CommonCfg.RootCoordDml + "-test",
+	}
+	suite.nodeMgr.Add(session.NewNodeInfo(targetNode, "localhost"))
+	suite.meta.ReplicaManager.Put(
+		utils.CreateTestReplica(suite.replica, suite.collection, []int64{1, 2, 3, -1}))
+
+	// Test load segment task
+	suite.dist.ChannelDistManager.Update(targetNode, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
+		CollectionID: suite.collection,
+		ChannelName:  channel.ChannelName,
+	}))
+	tasks := []Task{}
+	segments := make([]*datapb.SegmentBinlogs, 0)
+	for _, segment := range suite.loadSegments {
+		segments = append(segments, &datapb.SegmentBinlogs{
+			SegmentID:     segment,
+			InsertChannel: channel.ChannelName,
+		})
+		task, err := NewSegmentTask(
+			ctx,
+			timeout,
+			0,
+			suite.collection,
+			suite.replica,
+			NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
+		)
+		suite.NoError(err)
+		tasks = append(tasks, task)
+		err = suite.scheduler.Add(task)
+		suite.NoError(err)
+	}
+	suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, suite.collection, int64(1)).Return(nil, segments, nil)
+	suite.target.UpdateCollectionNextTargetWithPartitions(suite.collection, int64(1))
+	segmentsNum := len(suite.loadSegments)
+	suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
+
+	// Process tasks
+	suite.dispatchAndWait(targetNode)
+	suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
+
+	// Other nodes' HB can't trigger the procedure of tasks
+	suite.dispatchAndWait(targetNode + 1)
+	suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
+
+	// Process tasks done
+	// Dist contains channels
+	view := &meta.LeaderView{
+		ID:           targetNode,
+		CollectionID: suite.collection,
+		Segments:     map[int64]*querypb.SegmentDist{},
+	}
+	for _, segment := range suite.loadSegments {
+		view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
+	}
+	suite.dist.LeaderViewManager.Update(targetNode, view)
+	suite.dispatchAndWait(targetNode)
+	suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
+
+	for _, task := range tasks {
+		suite.Equal(TaskStatusStarted, task.Status())
+		suite.NoError(task.Err())
+	}
+}
+
 func (suite *TaskSuite) AssertTaskNum(process, wait, channel, segment int) {
 	scheduler := suite.scheduler

@@ -1148,11 +1222,15 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
 	for start := time.Now(); time.Since(start) < timeout; {
 		count = 0
 		keys = make([]any, 0)
-		suite.scheduler.executor.executingTasks.Range(func(key, value any) bool {
-			keys = append(keys, key)
-			count++
-			return true
-		})
+
+		for _, executor := range suite.scheduler.executors {
+			executor.executingTasks.Range(func(key, value any) bool {
+				keys = append(keys, key)
+				count++
+				return true
+			})
+		}
+
 		if count == 0 {
 			return
 		}
diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go
index 9941981323..12580f8798 100644
--- a/internal/util/paramtable/component_param.go
+++ b/internal/util/paramtable/component_param.go
@@ -706,9 +706,10 @@ type queryCoordConfig struct {
 	UpdatedTime time.Time

 	//---- Task ---
-	RetryNum      int32
-	RetryInterval int64
-	TaskMergeCap  int32
+	RetryNum         int32
+	RetryInterval    int64
+	TaskMergeCap     int32
+	TaskExecutionCap int32

 	//---- Handoff ---
 	AutoHandoff bool
@@ -736,6 +737,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
 	p.initTaskRetryNum()
 	p.initTaskRetryInterval()
 	p.initTaskMergeCap()
+	p.initTaskExecutionCap()

 	//---- Handoff ---
 	p.initAutoHandoff()
@@ -767,6 +769,10 @@ func (p *queryCoordConfig) initTaskMergeCap() {
 	p.TaskMergeCap = p.Base.ParseInt32WithDefault("queryCoord.taskMergeCap", 16)
 }

+func (p *queryCoordConfig) initTaskExecutionCap() {
+	p.TaskExecutionCap = p.Base.ParseInt32WithDefault("queryCoord.taskExecutionCap", 256)
+}
+
 func (p *queryCoordConfig) initAutoHandoff() {
 	handoff, err := p.Base.Load("queryCoord.autoHandoff")
 	if err != nil {
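The default for the new knob can be verified the same way the existing paramtable tests do. A hedged sketch, assuming ComponentParam and its Init() as defined in internal/util/paramtable:

```go
package paramtable

import "testing"

func TestTaskExecutionCapDefault(t *testing.T) {
	var params ComponentParam
	params.Init()
	// queryCoord.taskExecutionCap falls back to 256 when unset in milvus.yaml.
	if got := params.QueryCoordCfg.TaskExecutionCap; got != 256 {
		t.Fatalf("expected default taskExecutionCap 256, got %d", got)
	}
}
```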