diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 581ca74bbe..11b52a3ab3 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -50,7 +50,7 @@ type Executor struct { // Merge load segment requests merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest] - executingTasks *typeutil.ConcurrentSet[int64] // taskID + executingTasks *typeutil.ConcurrentSet[string] // task index executingTaskNum atomic.Int32 } @@ -70,7 +70,7 @@ func NewExecutor(meta *meta.Meta, nodeMgr: nodeMgr, merger: NewMerger[segmentIndex, *querypb.LoadSegmentsRequest](), - executingTasks: typeutil.NewConcurrentSet[int64](), + executingTasks: typeutil.NewConcurrentSet[string](), } } @@ -88,12 +88,12 @@ func (ex *Executor) Stop() { // does nothing and returns false if the action is already committed, // returns true otherwise. func (ex *Executor) Execute(task Task, step int) bool { - exist := !ex.executingTasks.Insert(task.ID()) + exist := !ex.executingTasks.Insert(task.Index()) if exist { return false } if ex.executingTaskNum.Inc() > Params.QueryCoordCfg.TaskExecutionCap.GetAsInt32() { - ex.executingTasks.Remove(task.ID()) + ex.executingTasks.Remove(task.Index()) ex.executingTaskNum.Dec() return false } @@ -120,10 +120,6 @@ func (ex *Executor) Execute(task Task, step int) bool { return true } -func (ex *Executor) Exist(taskID int64) bool { - return ex.executingTasks.Contain(taskID) -} - func (ex *Executor) scheduleRequests() { ex.wg.Add(1) go func() { @@ -208,7 +204,7 @@ func (ex *Executor) removeTask(task Task, step int) { zap.Error(task.Err())) } - ex.executingTasks.Remove(task.ID()) + ex.executingTasks.Remove(task.Index()) ex.executingTaskNum.Dec() } diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go index 407d806cc3..72be3e6a25 100644 --- a/internal/querycoordv2/task/task.go +++ b/internal/querycoordv2/task/task.go @@ -72,6 +72,7 @@ type Task interface { Err() error Priority() Priority SetPriority(priority Priority) + Index() string // dedup indexing string Cancel(err error) Wait() error @@ -166,6 +167,10 @@ func (task *baseTask) SetPriority(priority Priority) { task.priority = priority } +func (task *baseTask) Index() string { + return fmt.Sprintf("[replica=%d]", task.replicaID) +} + func (task *baseTask) Err() error { select { case <-task.doneCh: @@ -290,6 +295,10 @@ func (task *SegmentTask) SegmentID() UniqueID { return task.segmentID } +func (task *SegmentTask) Index() string { + return fmt.Sprintf("%s[segment=%d][growing=%t]", task.baseTask.Index(), task.segmentID, task.Actions()[0].(*SegmentAction).Scope() == querypb.DataScope_Streaming) +} + func (task *SegmentTask) String() string { return fmt.Sprintf("%s [segmentID=%d]", task.baseTask.String(), task.segmentID) } @@ -335,6 +344,10 @@ func (task *ChannelTask) Channel() string { return task.shard } +func (task *ChannelTask) Index() string { + return fmt.Sprintf("%s[channel=%s]", task.baseTask.Index(), task.shard) +} + func (task *ChannelTask) String() string { return fmt.Sprintf("%s [channel=%s]", task.baseTask.String(), task.Channel()) } diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index e6add07182..3b6b5a8e88 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1286,8 +1286,8 @@ func (suite *TaskSuite) dispatchAndWait(node int64) { keys = make([]any, 0) for _, executor := range suite.scheduler.executors { - executor.executingTasks.Range(func(taskID int64) bool { - keys = append(keys, taskID) + executor.executingTasks.Range(func(taskIndex string) bool { + keys = append(keys, taskIndex) count++ return true })