fix: filter the streaming node from resource group (#43984)

issue: #43981

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-08-22 19:21:47 +08:00 committed by GitHub
parent f3d7e47227
commit cbb9392564
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 17 additions and 4 deletions

View File

@ -469,6 +469,9 @@ func (rm *ResourceManager) HandleNodeUp(ctx context.Context, node int64) {
}
func (rm *ResourceManager) handleNodeUp(ctx context.Context, node int64) {
if nodeInfo := rm.nodeMgr.Get(node); nodeInfo == nil || nodeInfo.IsEmbeddedQueryNodeInStreamingNode() {
return
}
rm.incomingNode.Insert(node)
// Trigger assign incoming node right away.
// error can be ignored here, because `AssignPendingIncomingNode`` will retry assign node.
@ -1020,6 +1023,9 @@ func (rm *ResourceManager) CheckNodesInResourceGroup(ctx context.Context) {
} else if info.GetState() == session.NodeStateStopping {
log.Warn("node is stopping", zap.Int64("node", node))
rm.handleNodeStopping(ctx, node)
} else if info.IsEmbeddedQueryNodeInStreamingNode() {
log.Warn("unreachable code, but just for dirty meta clean up", zap.Int64("node", node))
rm.handleNodeStopping(ctx, node)
}
}
}

View File

@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/v2/kv"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
@ -221,6 +222,16 @@ func (suite *ResourceManagerSuite) TestManipulateResourceGroup() {
// RemoveResourceGroup will remove all nodes from the resource group.
err = suite.manager.RemoveResourceGroup(ctx, "rg2")
suite.NoError(err)
suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 10,
Address: "localhost",
Hostname: "localhost",
Labels: map[string]string{
sessionutil.LabelStreamingNodeEmbeddedQueryNode: "1",
},
}))
suite.manager.HandleNodeUp(ctx, 10)
}
func (suite *ResourceManagerSuite) TestNodeUpAndDown() {

View File

@ -738,10 +738,6 @@ func (s *Server) handleNodeUp(node int64) {
// start dist handler
s.distController.StartDistInstance(s.ctx, node)
if nodeInfo.IsEmbeddedQueryNodeInStreamingNode() {
// The querynode embedded in the streaming node can not work with streaming node.
return
}
// need assign to new rg and replica
s.meta.ResourceManager.HandleNodeUp(s.ctx, node)