From e70e70699cfc12abbd644d9f133d59a88fb4614f Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 2 Dec 2025 10:23:13 +0800 Subject: [PATCH] enhance: [2.5] skip adding stopping node to resource group in handleNodeUp (#45969) (#45982) Cherry-pick from master pr: #45969 Related to #45960 Follow-up to #45961 After #45961 ensured that handleNodeUp is always called for nodes discovered during rewatchNodes (including stopping nodes), this change adds a safeguard in ResourceManager.handleNodeUp to skip adding stopping nodes to resource groups. 1. **resource_manager.go**: Add check for IsStoppingState() in handleNodeUp to prevent stopping nodes from being added to incomingNode set and assigned to resource groups. 2. **server.go**: - Delete processed nodes from sessionMap to avoid duplicate processing in the subsequent loop - Add warning logs for stopping state transitions during rewatch Signed-off-by: Congqi Xia --- internal/querycoordv2/meta/resource_manager.go | 7 ++++++- internal/querycoordv2/server.go | 13 +++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index 1b9c6d2222..5d35ad7ba8 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -469,7 +469,12 @@ func (rm *ResourceManager) HandleNodeUp(ctx context.Context, node int64) { } func (rm *ResourceManager) handleNodeUp(ctx context.Context, node int64) { - if nodeInfo := rm.nodeMgr.Get(node); nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() { + nodeInfo := rm.nodeMgr.Get(node) + if nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() { + return + } + if nodeInfo.IsStoppingState() { + log.Warn("node is stopping, skip handle node up in resource manager", zap.Int64("node", node)) return } rm.incomingNode.Insert(node) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index e354aa6b35..756460916e 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -799,10 +799,14 @@ func (s *Server) rewatchNodes(sessions map[string]*sessionutil.Session) error { // node in node manager but session not exist, means it's offline s.nodeMgr.Remove(node.ID()) s.handleNodeDown(node.ID()) - } else if nodeSession.Stopping && !node.IsStoppingState() { - // node in node manager but session is stopping, means it's stopping - s.nodeMgr.Stopping(node.ID()) - s.handleNodeStopping(node.ID()) + } else { + if nodeSession.Stopping && !node.IsStoppingState() { + // node in node manager but session is stopping, means it's stopping + log.Warn("rewatch found old querynode in stopping state", zap.Int64("nodeID", nodeSession.ServerID)) + s.nodeMgr.Stopping(node.ID()) + s.handleNodeStopping(node.ID()) + } + delete(sessionMap, node.ID()) } } @@ -823,6 +827,7 @@ func (s *Server) rewatchNodes(sessions map[string]*sessionutil.Session) error { s.handleNodeUp(nodeSession.GetServerID()) if nodeSession.Stopping { + log.Warn("rewatch found new querynode in stopping state", zap.Int64("nodeID", nodeSession.ServerID)) s.nodeMgr.Stopping(nodeSession.ServerID) s.handleNodeStopping(nodeSession.ServerID) }