diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 3da6cf071b..efe4f7cf92 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -169,7 +169,7 @@ queryCoord: distPullInterval: 500 loadTimeoutSeconds: 600 checkHandoffInterval: 5000 - taskMergeCap: 2 + taskMergeCap: 8 diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index 591bb74eb6..151f607116 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -14,7 +14,7 @@ import ( ) var ( - checkRoundTaskNumLimit = 128 + checkRoundTaskNumLimit = 256 ) type CheckerController struct { @@ -86,20 +86,21 @@ func (controller *CheckerController) Stop() { // check is the real implementation of Check func (controller *CheckerController) check(ctx context.Context) { tasks := make([]task.Task, 0) - for id, checker := range controller.checkers { - log := log.With(zap.Int("checkerID", id)) - + for _, checker := range controller.checkers { tasks = append(tasks, checker.Check(ctx)...) - if len(tasks) >= checkRoundTaskNumLimit { - log.Info("checkers have spawn too many tasks, won't run subsequent checkers, and truncate the spawned tasks", + } + + added := 0 + for _, task := range tasks { + err := controller.scheduler.Add(task) + if err != nil { + continue + } + added++ + if added >= checkRoundTaskNumLimit { + log.Info("checkers have added too many tasks, truncate the subsequent tasks", zap.Int("taskNum", len(tasks)), zap.Int("taskNumLimit", checkRoundTaskNumLimit)) - tasks = tasks[:checkRoundTaskNumLimit] - break } } - - for _, task := range tasks { - controller.scheduler.Add(task) - } } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 4b805fa123..493fb99ddb 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -19,7 +19,7 @@ const ( TaskTypeReduce TaskTypeMove - taskPoolSize = 128 + taskPoolSize = 256 ) var ( diff --git a/internal/querynode/task.go b/internal/querynode/task.go index e9838202ee..0e8df7b82c 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -38,6 +38,7 @@ import ( type task interface { ID() UniqueID // return ReqID + Ctx() context.Context Timestamp() Timestamp PreExecute(ctx context.Context) error Execute(ctx context.Context) error @@ -539,6 +540,12 @@ func (l *loadSegmentsTask) PreExecute(ctx context.Context) error { func (l *loadSegmentsTask) Execute(ctx context.Context) error { log.Info("LoadSegmentTask Execute start", zap.Int64("msgID", l.req.Base.MsgID)) + if len(l.req.Infos) == 0 { + log.Info("all segments loaded", + zap.Int64("msgID", l.req.GetBase().GetMsgID())) + return nil + } + segmentIDs := lo.Map(l.req.Infos, func(info *queryPb.SegmentLoadInfo, idx int) UniqueID { return info.SegmentID }) l.node.metaReplica.addSegmentsLoadingList(segmentIDs) defer l.node.metaReplica.removeSegmentsLoadingList(segmentIDs) diff --git a/internal/querynode/task_scheduler.go b/internal/querynode/task_scheduler.go index 96a2036532..17fbc4ec40 100644 --- a/internal/querynode/task_scheduler.go +++ b/internal/querynode/task_scheduler.go @@ -123,7 +123,13 @@ func (s *taskScheduler) taskLoop() { case <-s.queue.utChan(): if !s.queue.utEmpty() { t := s.queue.PopUnissuedTask() - s.processTask(t, s.queue) + select { + case <-t.Ctx().Done(): + t.Notify(context.Canceled) + continue + default: + s.processTask(t, s.queue) + } } } } diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index fe7a23cc79..ec9524a83d 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -661,7 +661,7 @@ func (p *queryCoordConfig) initTaskRetryInterval() { } func (p *queryCoordConfig) initTaskMergeCap() { - p.TaskMergeCap = p.Base.ParseInt32WithDefault("queryCoord.taskMergeCap", 2) + p.TaskMergeCap = p.Base.ParseInt32WithDefault("queryCoord.taskMergeCap", 8) } func (p *queryCoordConfig) initAutoHandoff() {