From af734f19dc9a5bc74197b9c506e7ae78ac45c1bc Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 1 Dec 2025 16:17:10 +0800 Subject: [PATCH] enhance: skip adding stopping node to resource group in handleNodeUp (#45969) Related to #45960 Follow-up to #45961 After #45961 ensured that handleNodeUp is always called for nodes discovered during rewatchNodes (including stopping nodes), this change adds a safeguard in ResourceManager.handleNodeUp to skip adding stopping nodes to resource groups. 1. **resource_manager.go**: Add check for IsStoppingState() in handleNodeUp to prevent stopping nodes from being added to incomingNode set and assigned to resource groups. 2. **server.go**: - Delete processed nodes from sessionMap to avoid duplicate processing in the subsequent loop - Add warning logs for stopping state transitions during rewatch Signed-off-by: Congqi Xia --- internal/querycoordv2/meta/resource_manager.go | 7 ++++++- internal/querycoordv2/server.go | 13 +++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index 99402a6aa7..d5abfd6534 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -532,7 +532,12 @@ func (rm *ResourceManager) HandleNodeUp(ctx context.Context, node int64) { } func (rm *ResourceManager) handleNodeUp(ctx context.Context, node int64) { - if nodeInfo := rm.nodeMgr.Get(node); nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() { + nodeInfo := rm.nodeMgr.Get(node) + if nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() { + return + } + if nodeInfo.IsStoppingState() { + log.Warn("node is stopping, skip handle node up in resource manager", zap.Int64("node", node)) return } rm.incomingNode.Insert(node) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 311003eb93..e6aea2bec5 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -715,10 +715,14 @@ func (s *Server) rewatchNodes(sessions map[string]*sessionutil.Session) error { // node in node manager but session not exist, means it's offline s.nodeMgr.Remove(node.ID()) s.handleNodeDown(node.ID()) - } else if nodeSession.Stopping && !node.IsStoppingState() { - // node in node manager but session is stopping, means it's stopping - s.nodeMgr.Stopping(node.ID()) - s.handleNodeStopping(node.ID()) + } else { + if nodeSession.Stopping && !node.IsStoppingState() { + // node in node manager but session is stopping, means it's stopping + log.Warn("rewatch found old querynode in stopping state", zap.Int64("nodeID", nodeSession.ServerID)) + s.nodeMgr.Stopping(node.ID()) + s.handleNodeStopping(node.ID()) + } + delete(sessionMap, node.ID()) } } @@ -739,6 +743,7 @@ func (s *Server) rewatchNodes(sessions map[string]*sessionutil.Session) error { s.handleNodeUp(nodeSession.GetServerID()) if nodeSession.Stopping { + log.Warn("rewatch found new querynode in stopping state", zap.Int64("nodeID", nodeSession.ServerID)) s.nodeMgr.Stopping(nodeSession.ServerID) s.handleNodeStopping(nodeSession.ServerID) }