mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
Cherry-pick from master pr: #45969 Related to #45960 Follow-up to #45961 After #45961 ensured that handleNodeUp is always called for nodes discovered during rewatchNodes (including stopping nodes), this change adds a safeguard in ResourceManager.handleNodeUp to skip adding stopping nodes to resource groups. 1. **resource_manager.go**: Add check for IsStoppingState() in handleNodeUp to prevent stopping nodes from being added to incomingNode set and assigned to resource groups. 2. **server.go**: - Delete processed nodes from sessionMap to avoid duplicate processing in the subsequent loop - Add warning logs for stopping state transitions during rewatch Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
61c80235dd
commit
e70e70699c
@ -469,7 +469,12 @@ func (rm *ResourceManager) HandleNodeUp(ctx context.Context, node int64) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (rm *ResourceManager) handleNodeUp(ctx context.Context, node int64) {
|
func (rm *ResourceManager) handleNodeUp(ctx context.Context, node int64) {
|
||||||
if nodeInfo := rm.nodeMgr.Get(node); nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() {
|
nodeInfo := rm.nodeMgr.Get(node)
|
||||||
|
if nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if nodeInfo.IsStoppingState() {
|
||||||
|
log.Warn("node is stopping, skip handle node up in resource manager", zap.Int64("node", node))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rm.incomingNode.Insert(node)
|
rm.incomingNode.Insert(node)
|
||||||
|
|||||||
@ -799,11 +799,15 @@ func (s *Server) rewatchNodes(sessions map[string]*sessionutil.Session) error {
|
|||||||
// node in node manager but session not exist, means it's offline
|
// node in node manager but session not exist, means it's offline
|
||||||
s.nodeMgr.Remove(node.ID())
|
s.nodeMgr.Remove(node.ID())
|
||||||
s.handleNodeDown(node.ID())
|
s.handleNodeDown(node.ID())
|
||||||
} else if nodeSession.Stopping && !node.IsStoppingState() {
|
} else {
|
||||||
|
if nodeSession.Stopping && !node.IsStoppingState() {
|
||||||
// node in node manager but session is stopping, means it's stopping
|
// node in node manager but session is stopping, means it's stopping
|
||||||
|
log.Warn("rewatch found old querynode in stopping state", zap.Int64("nodeID", nodeSession.ServerID))
|
||||||
s.nodeMgr.Stopping(node.ID())
|
s.nodeMgr.Stopping(node.ID())
|
||||||
s.handleNodeStopping(node.ID())
|
s.handleNodeStopping(node.ID())
|
||||||
}
|
}
|
||||||
|
delete(sessionMap, node.ID())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// then add all on new online nodes
|
// then add all on new online nodes
|
||||||
@ -823,6 +827,7 @@ func (s *Server) rewatchNodes(sessions map[string]*sessionutil.Session) error {
|
|||||||
s.handleNodeUp(nodeSession.GetServerID())
|
s.handleNodeUp(nodeSession.GetServerID())
|
||||||
|
|
||||||
if nodeSession.Stopping {
|
if nodeSession.Stopping {
|
||||||
|
log.Warn("rewatch found new querynode in stopping state", zap.Int64("nodeID", nodeSession.ServerID))
|
||||||
s.nodeMgr.Stopping(nodeSession.ServerID)
|
s.nodeMgr.Stopping(nodeSession.ServerID)
|
||||||
s.handleNodeStopping(nodeSession.ServerID)
|
s.handleNodeStopping(nodeSession.ServerID)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user