From 1e87b5454202d95d4e486f95caa1970da83d1706 Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 21 Jun 2022 16:08:13 +0800 Subject: [PATCH] Add the offlines nodes previously crashed to offline nodes channel (#17663) Signed-off-by: yah01 --- internal/querycoord/query_coord.go | 52 +++++++++--------------------- 1 file changed, 16 insertions(+), 36 deletions(-) diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 76503c4965..1d515be86b 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -89,8 +89,9 @@ type QueryCoord struct { indexCoordClient types.IndexCoord broker *globalMetaBroker - session *sessionutil.Session - eventChan <-chan *sessionutil.SessionEvent + session *sessionutil.Session + eventChan <-chan *sessionutil.SessionEvent + offlineNodesChan chan UniqueID stateCode atomic.Value @@ -288,10 +289,11 @@ func NewQueryCoord(ctx context.Context, factory dependency.Factory) (*QueryCoord rand.Seed(time.Now().UnixNano()) ctx1, cancel := context.WithCancel(ctx) service := &QueryCoord{ - loopCtx: ctx1, - loopCancel: cancel, - factory: factory, - newNodeFn: newQueryNode, + loopCtx: ctx1, + loopCancel: cancel, + factory: factory, + newNodeFn: newQueryNode, + offlineNodesChan: make(chan UniqueID, 100), } service.UpdateStateCode(internalpb.StateCode_Abnormal) @@ -346,28 +348,9 @@ func (qc *QueryCoord) watchNodeLoop() { } } - offlineNodeIDs := qc.cluster.OfflineNodeIDs() - if len(offlineNodeIDs) != 0 { - loadBalanceSegment := &querypb.LoadBalanceRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_LoadBalanceSegments, - SourceID: qc.session.ServerID, - }, - BalanceReason: querypb.TriggerCondition_NodeDown, - SourceNodeIDs: offlineNodeIDs, - } - - baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_NodeDown) - loadBalanceTask := &loadBalanceTask{ - baseTask: baseTask, - LoadBalanceRequest: loadBalanceSegment, - broker: qc.broker, - cluster: qc.cluster, - meta: qc.meta, - } - //TODO::deal enqueue error - qc.scheduler.Enqueue(loadBalanceTask) - log.Info("start a loadBalance task", zap.Any("task", loadBalanceTask)) + go qc.loadBalanceNodeLoop(ctx) + for _, nodeID := range qc.cluster.OfflineNodeIDs() { + qc.offlineNodesChan <- nodeID } // TODO silverxia add Rewatch logic @@ -405,9 +388,6 @@ func (qc *QueryCoord) getUnallocatedNodes() []int64 { } func (qc *QueryCoord) handleNodeEvent(ctx context.Context) { - offlineNodeCh := make(chan UniqueID, 100) - go qc.loadBalanceNodeLoop(ctx, offlineNodeCh) - for { select { case <-ctx.Done(): @@ -452,19 +432,19 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) { } qc.cluster.StopNode(serverID) - offlineNodeCh <- serverID + qc.offlineNodesChan <- serverID } } } } -func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context, offlineNodeCh chan UniqueID) { +func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context) { for { select { case <-ctx.Done(): return - case node := <-offlineNodeCh: + case node := <-qc.offlineNodesChan: loadBalanceSegment := &querypb.LoadBalanceRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadBalanceSegments, @@ -489,7 +469,7 @@ func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context, offlineNodeCh cha log.Warn("failed to enqueue LoadBalance task into the scheduler", zap.Int64("nodeID", node), zap.Error(err)) - offlineNodeCh <- node + qc.offlineNodesChan <- node continue } @@ -502,7 +482,7 @@ func (qc *QueryCoord) loadBalanceNodeLoop(ctx context.Context, offlineNodeCh cha log.Warn("failed to process LoadBalance task", zap.Int64("nodeID", node), zap.Error(err)) - offlineNodeCh <- node + qc.offlineNodesChan <- node continue }