mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
Related to #39095 https://go.dev/doc/modules/version-numbers Update pkg version according to golang dep version convention --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
65 lines
2.0 KiB
Go
65 lines
2.0 KiB
Go
package balance
|
|
|
|
import (
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus/internal/coordinator/snmanager"
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
|
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
|
"github.com/milvus-io/milvus/internal/util/streamingutil"
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
)
|
|
|
|
func assignChannelToWALLocatedFirstForNodeInfo(
|
|
channels []*meta.DmChannel,
|
|
nodeItems []*session.NodeInfo,
|
|
) (notFoundChannels []*meta.DmChannel, plans []ChannelAssignPlan, scoreDelta map[int64]int) {
|
|
plans = make([]ChannelAssignPlan, 0)
|
|
notFoundChannels = make([]*meta.DmChannel, 0)
|
|
scoreDelta = make(map[int64]int)
|
|
for _, c := range channels {
|
|
nodeID := snmanager.StaticStreamingNodeManager.GetWALLocated(c.GetChannelName())
|
|
// Check if nodeID is in the list of nodeItems
|
|
// The nodeID may not be in the nodeItems when multi replica mode.
|
|
// Only one replica can be assigned to the node that wal is located.
|
|
found := false
|
|
for _, item := range nodeItems {
|
|
if item.ID() == nodeID {
|
|
plans = append(plans, ChannelAssignPlan{
|
|
From: -1,
|
|
To: item.ID(),
|
|
Channel: c,
|
|
})
|
|
found = true
|
|
scoreDelta[item.ID()] += 1
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
notFoundChannels = append(notFoundChannels, c)
|
|
}
|
|
}
|
|
return notFoundChannels, plans, scoreDelta
|
|
}
|
|
|
|
// filterSQNIfStreamingServiceEnabled filter out the non-sqn querynode.
|
|
func filterSQNIfStreamingServiceEnabled(nodes []int64) []int64 {
|
|
if streamingutil.IsStreamingServiceEnabled() {
|
|
sqns := snmanager.StaticStreamingNodeManager.GetStreamingQueryNodeIDs()
|
|
expectedSQNs := make([]int64, 0, len(nodes))
|
|
unexpectedNodes := make([]int64, 0)
|
|
for _, node := range nodes {
|
|
if sqns.Contain(node) {
|
|
expectedSQNs = append(expectedSQNs, node)
|
|
} else {
|
|
unexpectedNodes = append(unexpectedNodes, node)
|
|
}
|
|
}
|
|
if len(unexpectedNodes) > 0 {
|
|
log.Warn("unexpected streaming querynode found when enable streaming service", zap.Int64s("unexpectedNodes", unexpectedNodes))
|
|
}
|
|
return expectedSQNs
|
|
}
|
|
return nodes
|
|
}
|