From 53e8f150e865ff715a4456c888341f36b352a798 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Mon, 13 Oct 2025 16:33:59 +0800 Subject: [PATCH] fix: check if qn is sqn with label and streamingnode list (#44792) issue: #44014 - On standalone, the query node inside need to load segment and watch channel, so the querynode is not a embeded querynode in streamingnode without `LabelStreamingNodeEmbeddedQueryNode`. The channel dist manager can not confirm a standalone node is a embededStreamingNode. Bug is introduced by #44099 Signed-off-by: chyezh --- cmd/milvus/util.go | 7 +++---- internal/querycoordv2/meta/channel_dist_manager.go | 2 +- .../querycoordv2/meta/channel_dist_manager_test.go | 3 +++ internal/querycoordv2/session/node_manager.go | 5 +++++ internal/util/sessionutil/session_util.go | 11 +++++++++++ internal/util/streamingutil/env.go | 13 ------------- 6 files changed, 23 insertions(+), 18 deletions(-) diff --git a/cmd/milvus/util.go b/cmd/milvus/util.go index f6a2d84bd5..3cefb1532b 100644 --- a/cmd/milvus/util.go +++ b/cmd/milvus/util.go @@ -21,7 +21,6 @@ import ( "github.com/milvus-io/milvus/cmd/roles" "github.com/milvus-io/milvus/internal/util/sessionutil" - "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/util/etcd" "github.com/milvus-io/milvus/pkg/v2/util/hardware" @@ -138,7 +137,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { case typeutil.DataNodeRole: role.EnableDataNode = true case typeutil.StreamingNodeRole: - streamingutil.EnableEmbededQueryNode() + sessionutil.EnableEmbededQueryNodeLabel() role.EnableStreamingNode = true role.EnableQueryNode = true case typeutil.StandaloneRole, typeutil.EmbeddedRole: @@ -149,9 +148,9 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { role.EnableStreamingNode = true role.Local = true role.Embedded = serverType == typeutil.EmbeddedRole + sessionutil.EnableStandaloneLabel() case typeutil.MixCoordRole: role.EnableMixCoord = true - case typeutil.MixtureRole: role.EnableRootCoord = enableRootCoord role.EnableQueryCoord = enableQueryCoord @@ -162,7 +161,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { role.EnableStreamingNode = enableStreamingNode if enableStreamingNode && !enableQueryNode { role.EnableQueryNode = true - streamingutil.EnableEmbededQueryNode() + sessionutil.EnableEmbededQueryNodeLabel() } case typeutil.CDCRole: diff --git a/internal/querycoordv2/meta/channel_dist_manager.go b/internal/querycoordv2/meta/channel_dist_manager.go index 84856777c8..5004cfdff2 100644 --- a/internal/querycoordv2/meta/channel_dist_manager.go +++ b/internal/querycoordv2/meta/channel_dist_manager.go @@ -424,7 +424,7 @@ func (m *ChannelDistManager) checkIfStreamingNode(nodeID int64) bool { if node == nil { return false } - return node.IsEmbeddedQueryNodeInStreamingNode() + return node.IsEmbeddedQueryNodeInStreamingNode() || node.IsInStandalone() } func (m *ChannelDistManager) GetChannelDist(collectionID int64) []*metricsinfo.DmChannel { diff --git a/internal/querycoordv2/meta/channel_dist_manager_test.go b/internal/querycoordv2/meta/channel_dist_manager_test.go index cdb9d5433d..c3295fffcc 100644 --- a/internal/querycoordv2/meta/channel_dist_manager_test.go +++ b/internal/querycoordv2/meta/channel_dist_manager_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus/internal/coordinator/snmanager" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" @@ -81,6 +82,8 @@ func (suite *ChannelDistManagerSuite) SetupSuite() { } func (suite *ChannelDistManagerSuite) SetupTest() { + snmanager.ResetDoNothingStreamingNodeManager(suite.T()) + suite.dist = NewChannelDistManager(session.NewNodeManager()) // Distribution: // node 0 contains channel dmc0 diff --git a/internal/querycoordv2/session/node_manager.go b/internal/querycoordv2/session/node_manager.go index f2f346fc5f..ff8ca753a7 100644 --- a/internal/querycoordv2/session/node_manager.go +++ b/internal/querycoordv2/session/node_manager.go @@ -153,6 +153,11 @@ func (n *NodeInfo) Labels() map[string]string { return n.immutableInfo.Labels } +// IsInStandalone returns true if the node is in standalone. +func (n *NodeInfo) IsInStandalone() bool { + return n.immutableInfo.Labels[sessionutil.LabelStandalone] == "1" +} + func (n *NodeInfo) IsEmbeddedQueryNodeInStreamingNode() bool { return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1" } diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 0da9fb6c1a..d640a21fec 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -53,9 +53,20 @@ const ( DefaultIDKey = "id" SupportedLabelPrefix = "MILVUS_SERVER_LABEL_" LabelStreamingNodeEmbeddedQueryNode = "QUERYNODE_STREAMING-EMBEDDED" + LabelStandalone = "STANDALONE" MilvusNodeIDForTesting = "MILVUS_NODE_ID_FOR_TESTING" ) +// EnableEmbededQueryNodeLabel set server labels for embedded query node. +func EnableEmbededQueryNodeLabel() { + os.Setenv(SupportedLabelPrefix+LabelStreamingNodeEmbeddedQueryNode, "1") +} + +// EnableStandaloneLabel set server labels for standalone. +func EnableStandaloneLabel() { + os.Setenv(SupportedLabelPrefix+LabelStandalone, "1") +} + // SessionEventType session event type type SessionEventType int diff --git a/internal/util/streamingutil/env.go b/internal/util/streamingutil/env.go index 101fe1d53c..7c4bbdcbc0 100644 --- a/internal/util/streamingutil/env.go +++ b/internal/util/streamingutil/env.go @@ -2,8 +2,6 @@ package streamingutil import ( "os" - - "github.com/milvus-io/milvus/internal/util/sessionutil" ) const MilvusStreamingServiceEnabled = "MILVUS_STREAMING_SERVICE_ENABLED" @@ -28,14 +26,3 @@ func MustEnableStreamingService() { panic("start a streaming node without enabling streaming service, please set environment variable MILVUS_STREAMING_SERVICE_ENABLED = 1") } } - -// EnableEmbededQueryNode set server labels for embedded query node. -func EnableEmbededQueryNode() { - MustEnableStreamingService() - os.Setenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode, "1") -} - -// IsEmbeddedQueryNode returns whether the current node is an embedded query node in streaming node. -func IsEmbeddedQueryNode() bool { - return os.Getenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode) == "1" -}