From 6336e2326c6f46a651cbf34fabdee13a02e36bb9 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Tue, 25 Jan 2022 17:26:13 +0800 Subject: [PATCH] Set triggerCondition when queryCoord reload loadbalanceTask meta (#15380) Signed-off-by: xige-16 --- internal/querycoord/impl.go | 1 + internal/querycoord/query_coord.go | 1 + internal/querycoord/task.go | 12 ++++++++++++ internal/querycoord/task_scheduler.go | 3 +++ 4 files changed, 17 insertions(+) diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index f88beb73f7..493830872e 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -847,6 +847,7 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR } baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_LoadBalance) + req.BalanceReason = querypb.TriggerCondition_LoadBalance loadBalanceTask := &loadBalanceTask{ baseTask: baseTask, LoadBalanceRequest: req, diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 96790c5357..ed633f1cf4 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -343,6 +343,7 @@ func (qc *QueryCoord) watchNodeLoop() { MsgType: commonpb.MsgType_LoadBalanceSegments, SourceID: qc.session.ServerID, }, + BalanceReason: querypb.TriggerCondition_NodeDown, SourceNodeIDs: offlineNodeIDs, } diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 56e5210c96..f86008f921 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -72,6 +72,7 @@ type task interface { msgType() commonpb.MsgType timestamp() Timestamp getTriggerCondition() querypb.TriggerCondition + setTriggerCondition(trigger querypb.TriggerCondition) preExecute(ctx context.Context) error execute(ctx context.Context) error postExecute(ctx context.Context) error @@ -109,6 +110,7 @@ type baseTask struct { taskID UniqueID triggerCondition querypb.TriggerCondition + triggerMu sync.RWMutex parentTask task childTasks []task childTasksMu sync.RWMutex @@ -146,9 +148,19 @@ func (bt *baseTask) traceCtx() context.Context { } func (bt *baseTask) getTriggerCondition() querypb.TriggerCondition { + bt.triggerMu.RLock() + defer bt.triggerMu.RUnlock() + return bt.triggerCondition } +func (bt *baseTask) setTriggerCondition(trigger querypb.TriggerCondition) { + bt.triggerMu.Lock() + defer bt.triggerMu.Unlock() + + bt.triggerCondition = trigger +} + func (bt *baseTask) taskPriority() querypb.TriggerCondition { return bt.triggerCondition } diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 691aa45f24..aec5d1910a 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -403,6 +403,9 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } + // if triggerCondition == nodeDown, and the queryNode resources are insufficient, + // queryCoord will waits until queryNode can load the data, ensuring that the data is not lost + baseTask.setTriggerCondition(loadReq.BalanceReason) loadBalanceTask := &loadBalanceTask{ baseTask: baseTask, LoadBalanceRequest: &loadReq,