milvus/internal/querycoordv2/balance/streaming_query_node_channel_helper.go
congqixia cb7f2fa6fd
enhance: Use v2 package name for pkg module (#39990)
Related to #39095

https://go.dev/doc/modules/version-numbers

Update pkg version according to golang dep version convention

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
2025-02-22 23:15:58 +08:00

65 lines
2.0 KiB
Go

package balance
import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/v2/log"
)
func assignChannelToWALLocatedFirstForNodeInfo(
channels []*meta.DmChannel,
nodeItems []*session.NodeInfo,
) (notFoundChannels []*meta.DmChannel, plans []ChannelAssignPlan, scoreDelta map[int64]int) {
plans = make([]ChannelAssignPlan, 0)
notFoundChannels = make([]*meta.DmChannel, 0)
scoreDelta = make(map[int64]int)
for _, c := range channels {
nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(c.GetChannelName())
// Check if nodeID is in the list of nodeItems
// The nodeID may not be in the nodeItems when multi replica mode.
// Only one replica can be assigned to the node that wal is located.
found := false
for _, item := range nodeItems {
if item.ID() == nodeID {
plans = append(plans, ChannelAssignPlan{
From: -1,
To: item.ID(),
Channel: c,
})
found = true
scoreDelta[item.ID()] += 1
break
}
}
if !found {
notFoundChannels = append(notFoundChannels, c)
}
}
return notFoundChannels, plans, scoreDelta
}
// filterSQNIfStreamingServiceEnabled filter out the non-sqn querynode.
func filterSQNIfStreamingServiceEnabled(nodes []int64) []int64 {
if streamingutil.IsStreamingServiceEnabled() {
sqns := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()
expectedSQNs := make([]int64, 0, len(nodes))
unexpectedNodes := make([]int64, 0)
for _, node := range nodes {
if sqns.Contain(node) {
expectedSQNs = append(expectedSQNs, node)
} else {
unexpectedNodes = append(unexpectedNodes, node)
}
}
if len(unexpectedNodes) > 0 {
log.Warn("unexpected streaming querynode found when enable streaming service", zap.Int64s("unexpectedNodes", unexpectedNodes))
}
return expectedSQNs
}
return nodes
}