milvus/internal/querycoordv2/balance/streaming_query_node_channel_helper.go
Zhen Ye c84a0748c4
enhance: add rw/ro streaming query node replica management (#38677)
issue: #38399

- Embed the query node into streaming node to make delegator available
at streaming node.
- The embedded query node has a special server label
`QUERYNODE_STREAMING-EMBEDDED`.
- Change the balance strategy to make the channel assigned to streaming
node as much as possible.

Signed-off-by: chyezh <chyezh@outlook.com>
2025-01-24 16:55:07 +08:00

65 lines
2.0 KiB
Go

package balance
import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
)
// assignChannelToWALLocatedFirstForNodeInfo tries to place each channel on the
// node that the streaming node manager reports as the location of that
// channel's WAL.
//
// For every channel, the WAL-located node ID is looked up by channel name and
// nodeItems is scanned for a node with that ID. On the first match, a
// ChannelAssignPlan targeting that node (with From set to -1) is recorded and
// the node's entry in scoreDelta is incremented by one.
//
// Returns:
//   - notFoundChannels: channels whose WAL-located node is not in nodeItems.
//   - plans: one plan per channel that could be placed on its WAL node.
//   - scoreDelta: number of channels planned onto each node, keyed by node ID.
//
// All three results are non-nil even when empty.
func assignChannelToWALLocatedFirstForNodeInfo(
	channels []*meta.DmChannel,
	nodeItems []*session.NodeInfo,
) (notFoundChannels []*meta.DmChannel, plans []ChannelAssignPlan, scoreDelta map[int64]int) {
	plans = make([]ChannelAssignPlan, 0)
	notFoundChannels = make([]*meta.DmChannel, 0)
	scoreDelta = make(map[int64]int)

nextChannel:
	for _, channel := range channels {
		walNodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(channel.GetChannelName())

		// The WAL node may be absent from nodeItems in multi replica mode:
		// only one replica can be assigned to the node where the WAL is located.
		for _, candidate := range nodeItems {
			candidateID := candidate.ID()
			if candidateID != walNodeID {
				continue
			}
			plans = append(plans, ChannelAssignPlan{
				From:    -1,
				To:      candidateID,
				Channel: channel,
			})
			scoreDelta[candidateID]++
			continue nextChannel
		}

		// No candidate matched the WAL node; hand the channel back to the caller.
		notFoundChannels = append(notFoundChannels, channel)
	}
	return notFoundChannels, plans, scoreDelta
}
// filterSQNIfStreamingServiceEnabled keeps only the streaming query nodes
// (SQN) from nodes when the streaming service is enabled.
//
// When the streaming service is disabled, nodes is returned as is. Otherwise
// a new slice is returned containing, in their original order, the IDs that
// the streaming node manager lists as streaming query nodes; any other IDs
// are dropped and reported together in a single warning log.
func filterSQNIfStreamingServiceEnabled(nodes []int64) []int64 {
	if !streamingutil.IsStreamingServiceEnabled() {
		return nodes
	}

	streamingIDs := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()
	kept := make([]int64, 0, len(nodes))
	var dropped []int64
	for _, id := range nodes {
		if !streamingIDs.Contain(id) {
			dropped = append(dropped, id)
			continue
		}
		kept = append(kept, id)
	}

	if len(dropped) != 0 {
		log.Warn("unexpected streaming querynode found when enable streaming service", zap.Int64s("unexpectedNodes", dropped))
	}
	return kept
}