fix: check if qn is sqn with label and streamingnode list (#44792)

issue: #44014

- On standalone, the query node inside need to load segment and watch
channel, so the querynode is not a embeded querynode in streamingnode
without `LabelStreamingNodeEmbeddedQueryNode`. The channel dist manager
can not confirm a standalone node is a embededStreamingNode.

Bug is introduced by #44099

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-10-13 16:33:59 +08:00 committed by GitHub
parent a444e2f937
commit 53e8f150e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 23 additions and 18 deletions

View File

@ -21,7 +21,6 @@ import (
"github.com/milvus-io/milvus/cmd/roles"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
@ -138,7 +137,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
case typeutil.DataNodeRole:
role.EnableDataNode = true
case typeutil.StreamingNodeRole:
streamingutil.EnableEmbededQueryNode()
sessionutil.EnableEmbededQueryNodeLabel()
role.EnableStreamingNode = true
role.EnableQueryNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
@ -149,9 +148,9 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
role.EnableStreamingNode = true
role.Local = true
role.Embedded = serverType == typeutil.EmbeddedRole
sessionutil.EnableStandaloneLabel()
case typeutil.MixCoordRole:
role.EnableMixCoord = true
case typeutil.MixtureRole:
role.EnableRootCoord = enableRootCoord
role.EnableQueryCoord = enableQueryCoord
@ -162,7 +161,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
role.EnableStreamingNode = enableStreamingNode
if enableStreamingNode && !enableQueryNode {
role.EnableQueryNode = true
streamingutil.EnableEmbededQueryNode()
sessionutil.EnableEmbededQueryNodeLabel()
}
case typeutil.CDCRole:

View File

@ -424,7 +424,7 @@ func (m *ChannelDistManager) checkIfStreamingNode(nodeID int64) bool {
if node == nil {
return false
}
return node.IsEmbeddedQueryNodeInStreamingNode()
return node.IsEmbeddedQueryNodeInStreamingNode() || node.IsInStandalone()
}
func (m *ChannelDistManager) GetChannelDist(collectionID int64) []*metricsinfo.DmChannel {

View File

@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@ -81,6 +82,8 @@ func (suite *ChannelDistManagerSuite) SetupSuite() {
}
func (suite *ChannelDistManagerSuite) SetupTest() {
snmanager.ResetDoNothingStreamingNodeManager(suite.T())
suite.dist = NewChannelDistManager(session.NewNodeManager())
// Distribution:
// node 0 contains channel dmc0

View File

@ -153,6 +153,11 @@ func (n *NodeInfo) Labels() map[string]string {
return n.immutableInfo.Labels
}
// IsInStandalone returns true if the node is in standalone.
func (n *NodeInfo) IsInStandalone() bool {
return n.immutableInfo.Labels[sessionutil.LabelStandalone] == "1"
}
func (n *NodeInfo) IsEmbeddedQueryNodeInStreamingNode() bool {
return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1"
}

View File

@ -53,9 +53,20 @@ const (
DefaultIDKey = "id"
SupportedLabelPrefix = "MILVUS_SERVER_LABEL_"
LabelStreamingNodeEmbeddedQueryNode = "QUERYNODE_STREAMING-EMBEDDED"
LabelStandalone = "STANDALONE"
MilvusNodeIDForTesting = "MILVUS_NODE_ID_FOR_TESTING"
)
// EnableEmbededQueryNodeLabel set server labels for embedded query node.
func EnableEmbededQueryNodeLabel() {
os.Setenv(SupportedLabelPrefix+LabelStreamingNodeEmbeddedQueryNode, "1")
}
// EnableStandaloneLabel set server labels for standalone.
func EnableStandaloneLabel() {
os.Setenv(SupportedLabelPrefix+LabelStandalone, "1")
}
// SessionEventType session event type
type SessionEventType int

View File

@ -2,8 +2,6 @@ package streamingutil
import (
"os"
"github.com/milvus-io/milvus/internal/util/sessionutil"
)
const MilvusStreamingServiceEnabled = "MILVUS_STREAMING_SERVICE_ENABLED"
@ -28,14 +26,3 @@ func MustEnableStreamingService() {
panic("start a streaming node without enabling streaming service, please set environment variable MILVUS_STREAMING_SERVICE_ENABLED = 1")
}
}
// EnableEmbededQueryNode set server labels for embedded query node.
func EnableEmbededQueryNode() {
MustEnableStreamingService()
os.Setenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode, "1")
}
// IsEmbeddedQueryNode returns whether the current node is an embedded query node in streaming node.
func IsEmbeddedQueryNode() bool {
return os.Getenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode) == "1"
}