enhance: Optimize channel node balancing for uneven QN distribution (#42786) (#43423)

issue: #42860
pr: #42786
Fix channel node allocation when the QueryNode count is not a multiple of
the channel count. The previous algorithm used plain integer division, which
did not account for the remainder and caused an uneven distribution.

Key improvements:
- Implement smart remainder distribution algorithm
- Refactor large function into focused helper functions
- Support two-phase rebalancing (release then allocate)
- Handle edge cases like insufficient nodes gracefully

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2025-07-21 17:04:54 +08:00 committed by GitHub
parent 0c316b9172
commit ad0bf9cad8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 670 additions and 60 deletions

View File

@ -1,8 +1,12 @@
package meta
import (
"sort"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@ -159,6 +163,10 @@ func (replica *Replica) CopyForWrite() *mutableReplica {
}
}
// IsChannelExclusiveModeEnabled reports whether the replica has at least one
// channel registered in its ChannelNodeInfos map.
func (replica *Replica) IsChannelExclusiveModeEnabled() bool {
	// len of a nil map is 0, so the unset case needs no separate check.
	return len(replica.replicaPB.ChannelNodeInfos) != 0
}
// mutableReplica is a mutable type (COW) for manipulating replica meta info for replica manager.
type mutableReplica struct {
*Replica
@ -233,56 +241,197 @@ func (replica *mutableReplica) removeChannelExclusiveNodes(nodes ...int64) {
}
}
// TryEnableChannelExclusiveMode registers channelNames for channel exclusive
// mode and makes sure the node-to-channel index exists.
//
// Channels are registered only when the replica tracks none yet, so entries
// (and their node lists) that already exist are never overwritten. Each newly
// registered channel starts with an empty ChannelNodeInfo.
func (replica *mutableReplica) TryEnableChannelExclusiveMode(channelNames ...string) {
	// Use len rather than a nil comparison so the condition matches
	// IsChannelExclusiveModeEnabled: a non-nil but empty map counts as
	// "not enabled" there and must therefore still be populated here.
	if len(replica.replicaPB.ChannelNodeInfos) == 0 {
		channelNodeInfos := make(map[string]*querypb.ChannelNodeInfo, len(channelNames))
		for _, channelName := range channelNames {
			channelNodeInfos[channelName] = &querypb.ChannelNodeInfo{}
		}
		replica.replicaPB.ChannelNodeInfos = channelNodeInfos
	}

	// Writing to a nil map panics, so create the index before anyone uses it.
	if replica.exclusiveRWNodeToChannel == nil {
		replica.exclusiveRWNodeToChannel = make(map[int64]string)
	}
}
// DisableChannelExclusiveMode drops every exclusive node assignment while
// keeping the set of registered channel names: each channel is reset to an
// empty ChannelNodeInfo and the node-to-channel index is replaced by a fresh,
// empty map.
func (replica *mutableReplica) DisableChannelExclusiveMode() {
	// The index is reset unconditionally, whether or not channels are registered.
	replica.exclusiveRWNodeToChannel = make(map[int64]string)

	registered := replica.replicaPB.ChannelNodeInfos
	if registered == nil {
		return
	}

	cleared := make(map[string]*querypb.ChannelNodeInfo, len(registered))
	for name := range registered {
		cleared[name] = &querypb.ChannelNodeInfo{}
	}
	replica.replicaPB.ChannelNodeInfos = cleared
}
// tryBalanceNodeForChannel attempts to balance the replica's RW nodes across its channels.
// It returns immediately when no channel is registered. When
// shouldEnableChannelExclusiveMode reports false it calls DisableChannelExclusiveMode
// and returns. Otherwise it asks calculateOptimalAssignments for a target node count
// per channel and hands those targets to rebalanceChannelNodes.
func (replica *mutableReplica) tryBalanceNodeForChannel() {
channelNodeInfos := replica.replicaPB.GetChannelNodeInfos()
if len(channelNodeInfos) == 0 {
return
}
balancePolicy := paramtable.Get().QueryCoordCfg.Balancer.GetValue()
enableChannelExclusiveMode := balancePolicy == ChannelLevelScoreBalancerName
channelExclusiveFactor := paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.GetAsInt()
// if balance policy or node count doesn't match condition, clean up channel node info
if !enableChannelExclusiveMode || len(replica.rwNodes) < len(channelNodeInfos)*channelExclusiveFactor {
for name := range replica.replicaPB.GetChannelNodeInfos() {
replica.replicaPB.ChannelNodeInfos[name] = &querypb.ChannelNodeInfo{}
}
// Check if channel exclusive mode should be enabled
if !replica.shouldEnableChannelExclusiveMode(channelNodeInfos) {
replica.DisableChannelExclusiveMode()
return
}
if channelNodeInfos != nil {
average := replica.RWNodesCount() / len(channelNodeInfos)
// Calculate optimal node assignments
targetAssignments := replica.calculateOptimalAssignments(channelNodeInfos)
// release node in channel
for channelName, channelNodeInfo := range channelNodeInfos {
currentNodes := channelNodeInfo.GetRwNodes()
if len(currentNodes) > average {
replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = currentNodes[:average]
for _, nodeID := range currentNodes[average:] {
delete(replica.exclusiveRWNodeToChannel, nodeID)
}
}
// Apply the rebalancing with minimal node movement
replica.rebalanceChannelNodes(channelNodeInfos, targetAssignments)
}
// shouldEnableChannelExclusiveMode reports whether channel exclusive mode may
// be used for the given channels. Two conditions must hold: the configured
// balancer is ChannelLevelScoreBalancerName, and the replica owns at least
// len(channelInfos) * ChannelExclusiveNodeFactor RW nodes.
func (replica *mutableReplica) shouldEnableChannelExclusiveMode(channelInfos map[string]*querypb.ChannelNodeInfo) bool {
	params := paramtable.Get()
	policy := params.QueryCoordCfg.Balancer.GetValue()
	factor := params.QueryCoordCfg.ChannelExclusiveNodeFactor.GetAsInt()

	if policy != ChannelLevelScoreBalancerName {
		return false
	}
	requiredNodes := len(channelInfos) * factor
	return replica.RWNodesCount() >= requiredNodes
}
// calculateOptimalAssignments calculates the target node count for each channel.
// Every channel receives RWNodesCount()/len(channelInfos) nodes; the remainder is
// handed out one node at a time to the channels at the front of the order returned
// by getSortedChannelsByNodeCount (most current nodes first). The result maps each
// channel name to its target count.
func (replica *mutableReplica) calculateOptimalAssignments(channelInfos map[string]*querypb.ChannelNodeInfo) map[string]int {
channelCount := len(channelInfos)
totalNodes := replica.RWNodesCount()
// Get channels sorted by current node count (descending)
sortedChannels := replica.getSortedChannelsByNodeCount(channelInfos)
// Calculate base assignment: average nodes per channel
assignments := make(map[string]int, channelCount)
baseNodes := totalNodes / channelCount
extraNodes := totalNodes % channelCount
// Give one extra node to each of the first extraNodes channels in sorted order,
// i.e. the channels that currently hold the most nodes
for i, channel := range sortedChannels {
nodeCount := baseNodes
if i < extraNodes {
nodeCount++
}
assignments[channel] = nodeCount
}
// acquire node in channel
for channelName, channelNodeInfo := range channelNodeInfos {
currentNodes := channelNodeInfo.GetRwNodes()
if len(currentNodes) < average {
for _, nodeID := range replica.rwNodes.Collect() {
if _, ok := replica.exclusiveRWNodeToChannel[nodeID]; !ok {
currentNodes = append(currentNodes, nodeID)
replica.exclusiveRWNodeToChannel[nodeID] = channelName
if len(currentNodes) == average {
break
}
}
}
replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = currentNodes
return assignments
}
// getSortedChannelsByNodeCount returns the channel names of channelInfos
// ordered by how many RW nodes each channel currently holds, largest first.
//
// Channels holding the same number of nodes are ordered by name. Without that
// tie-break the result would depend on Go's randomized map iteration order
// (sort.Slice is not stable), so repeated calls on identical input could
// return different orders.
func (replica *mutableReplica) getSortedChannelsByNodeCount(channelInfos map[string]*querypb.ChannelNodeInfo) []string {
	// channelNodeCount pairs a channel name with its current node count so the
	// comparator does not have to go back to the map.
	type channelNodeCount struct {
		name  string
		count int
	}

	entries := make([]channelNodeCount, 0, len(channelInfos))
	for name, channelNodeInfo := range channelInfos {
		entries = append(entries, channelNodeCount{
			name:  name,
			count: len(channelNodeInfo.GetRwNodes()),
		})
	}

	sort.Slice(entries, func(i, j int) bool {
		if entries[i].count != entries[j].count {
			// More nodes first: these channels are the first candidates for
			// reduction and the first to receive a remainder node.
			return entries[i].count > entries[j].count
		}
		return entries[i].name < entries[j].name
	})

	channels := make([]string, len(entries))
	for i := range entries {
		channels[i] = entries[i].name
	}
	return channels
}
// rebalanceChannelNodes moves the channels towards targetAssignments in two
// ordered steps.
func (replica *mutableReplica) rebalanceChannelNodes(channelInfos map[string]*querypb.ChannelNodeInfo, targetAssignments map[string]int) {
	// Step 1: shrink over-allocated channels. This must run first, because the
	// nodes it releases are the ones step 2 can hand out again.
	replica.releaseExcessNodes(channelInfos, targetAssignments)

	// Step 2: top up channels that are still below their target.
	replica.allocateInsufficientNodes(channelInfos, targetAssignments)
}
// releaseExcessNodes trims every channel that holds more nodes than its target.
// The leading targetAssignments[channel] nodes are kept; the trailing ones are
// removed from the channel and from the exclusive node-to-channel index.
func (replica *mutableReplica) releaseExcessNodes(channelInfos map[string]*querypb.ChannelNodeInfo, targetAssignments map[string]int) {
	for channelName, channelNodeInfo := range channelInfos {
		assigned := channelNodeInfo.GetRwNodes()
		limit := targetAssignments[channelName]
		if len(assigned) <= limit {
			// At or below target: nothing to release.
			continue
		}

		kept, released := assigned[:limit], assigned[limit:]
		replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = kept
		for _, nodeID := range released {
			delete(replica.exclusiveRWNodeToChannel, nodeID)
		}
	}
}
// allocateInsufficientNodes tops up every channel that holds fewer nodes than
// its target, drawing from the RW nodes that no channel owns exclusively. The
// resulting node list of every channel is logged, whether or not it changed.
func (replica *mutableReplica) allocateInsufficientNodes(channelInfos map[string]*querypb.ChannelNodeInfo, targetAssignments map[string]int) {
	// Snapshot of the unassigned nodes; allocateNodesFromPool re-checks each
	// entry, so nodes handed out to an earlier channel are not reused.
	pool := replica.getAvailableNodes()

	for channelName, channelNodeInfo := range channelInfos {
		existing := channelNodeInfo.GetRwNodes()
		if shortfall := targetAssignments[channelName] - len(existing); shortfall > 0 {
			granted := replica.allocateNodesFromPool(pool, shortfall, channelName)

			// Build a fresh slice instead of appending in place.
			merged := make([]int64, 0, len(existing)+len(granted))
			merged = append(merged, existing...)
			merged = append(merged, granted...)
			replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = merged
		}

		log.Info("channel exclusive node list",
			zap.String("channelName", channelName),
			zap.Int64s("nodes", replica.replicaPB.ChannelNodeInfos[channelName].RwNodes))
	}
}
// getAvailableNodes lists the replica's RW nodes that have no entry in the
// exclusive node-to-channel index.
func (replica *mutableReplica) getAvailableNodes() []int64 {
	candidates := replica.rwNodes.Collect()
	free := make([]int64, 0, len(candidates))
	for i := range candidates {
		nodeID := candidates[i]
		if _, taken := replica.exclusiveRWNodeToChannel[nodeID]; taken {
			continue
		}
		free = append(free, nodeID)
	}
	return free
}
// allocateNodesFromPool claims up to neededCount nodes from availableNodes for
// channelName and returns the claimed node IDs. Each claimed node is recorded
// in the exclusive node-to-channel index. Fewer than neededCount nodes are
// returned when the pool runs out.
func (replica *mutableReplica) allocateNodesFromPool(availableNodes []int64, neededCount int, channelName string) []int64 {
	picked := make([]int64, 0, neededCount)
	for _, nodeID := range availableNodes {
		if len(picked) >= neededCount {
			break
		}
		// The pool is a snapshot: skip nodes that were claimed after it was taken.
		if _, taken := replica.exclusiveRWNodeToChannel[nodeID]; taken {
			continue
		}
		replica.exclusiveRWNodeToChannel[nodeID] = channelName
		picked = append(picked, nodeID)
	}
	return picked
}
// IntoReplica returns the immutable replica, After calling this method, the mutable replica should not be used again.
func (replica *mutableReplica) IntoReplica() *Replica {
r := replica.Replica

View File

@ -140,18 +140,17 @@ func (m *ReplicaManager) Spawn(ctx context.Context, collection int64, replicaNum
return nil, err
}
channelExclusiveNodeInfo := make(map[string]*querypb.ChannelNodeInfo)
replica := newReplica(&querypb.Replica{
ID: id,
CollectionID: collection,
ResourceGroup: rgName,
})
if enableChannelExclusiveMode {
for _, channel := range channels {
channelExclusiveNodeInfo[channel] = &querypb.ChannelNodeInfo{}
}
mutableReplica := replica.CopyForWrite()
mutableReplica.TryEnableChannelExclusiveMode(channels...)
replica = mutableReplica.IntoReplica()
}
replicas = append(replicas, newReplica(&querypb.Replica{
ID: id,
CollectionID: collection,
ResourceGroup: rgName,
ChannelNodeInfos: channelExclusiveNodeInfo,
}))
replicas = append(replicas, replica)
}
}
if err := m.put(ctx, replicas...); err != nil {
@ -438,7 +437,10 @@ func (m *ReplicaManager) RecoverNodesInCollection(ctx context.Context, collectio
zap.Int64("replicaID", assignment.GetReplicaID()),
zap.Int64s("newRONodes", roNodes),
zap.Int64s("roToRWNodes", recoverableNodes),
zap.Int64s("newIncomingNodes", incomingNode))
zap.Int64s("newIncomingNodes", incomingNode),
zap.Bool("enableChannelExclusiveMode", mutableReplica.IsChannelExclusiveModeEnabled()),
zap.Any("channelNodeInfos", mutableReplica.replicaPB.GetChannelNodeInfos()),
)
modifiedReplicas = append(modifiedReplicas, mutableReplica.IntoReplica())
})
})

View File

@ -236,6 +236,449 @@ func (suite *ReplicaSuite) TestChannelExclusiveMode() {
}
}
// TestTryBalanceNodeForChannelEmptyChannels checks that balancing a replica
// without any registered channel is a harmless no-op.
func (suite *ReplicaSuite) TestTryBalanceNodeForChannelEmptyChannels() {
	pb := &querypb.Replica{
		ID:               1,
		CollectionID:     2,
		ResourceGroup:    DefaultResourceGroupName,
		Nodes:            []int64{1, 2, 3, 4},
		ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{},
	}
	writable := newReplica(pb).CopyForWrite()

	// Must return early instead of panicking.
	writable.tryBalanceNodeForChannel()

	// The replica still has no channel entries.
	result := writable.IntoReplica()
	suite.Len(result.replicaPB.GetChannelNodeInfos(), 0)
}
// TestTryBalanceNodeForChannelDisabledMode checks that balancing under a
// balancer other than ChannelLevelScoreBalancer clears all exclusive assignments.
func (suite *ReplicaSuite) TestTryBalanceNodeForChannelDisabledMode() {
	// Use a non channel-level balancer for the duration of this test.
	balancerKey := paramtable.Get().QueryCoordCfg.Balancer.Key
	paramtable.Get().Save(balancerKey, RoundRobinBalancerName)
	defer paramtable.Get().Reset(balancerKey)

	writable := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
		Nodes:         []int64{1, 2, 3, 4},
		ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{
			"channel1": {RwNodes: []int64{1, 2}},
			"channel2": {RwNodes: []int64{3, 4}},
		},
	}).CopyForWrite()

	writable.tryBalanceNodeForChannel()
	result := writable.IntoReplica()

	// Every channel must have lost its node list.
	for _, info := range result.replicaPB.GetChannelNodeInfos() {
		suite.Empty(info.GetRwNodes())
	}

	// The node-to-channel index must be empty as well.
	suite.Empty(writable.exclusiveRWNodeToChannel)
}
// TestTryBalanceNodeForChannelInsufficientNodes checks that assignments are
// cleared when the replica has fewer nodes than channels * factor.
func (suite *ReplicaSuite) TestTryBalanceNodeForChannelInsufficientNodes() {
	params := paramtable.Get()
	balancerKey := params.QueryCoordCfg.Balancer.Key
	factorKey := params.QueryCoordCfg.ChannelExclusiveNodeFactor.Key
	params.Save(balancerKey, ChannelLevelScoreBalancerName)
	params.Save(factorKey, "2")
	defer func() {
		params.Reset(balancerKey)
		params.Reset(factorKey)
	}()

	// Two channels with factor 2 require four nodes; only two are present.
	writable := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
		Nodes:         []int64{1, 2},
		ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{
			"channel1": {RwNodes: []int64{1}},
			"channel2": {RwNodes: []int64{2}},
		},
	}).CopyForWrite()

	writable.tryBalanceNodeForChannel()
	result := writable.IntoReplica()

	// Not enough nodes, so every channel's node list must be empty.
	for _, info := range result.replicaPB.GetChannelNodeInfos() {
		suite.Empty(info.GetRwNodes())
	}
}
// TestTryBalanceNodeForChannelPerfectBalance checks the case where the node
// count divides evenly by the channel count.
func (suite *ReplicaSuite) TestTryBalanceNodeForChannelPerfectBalance() {
	params := paramtable.Get()
	balancerKey := params.QueryCoordCfg.Balancer.Key
	factorKey := params.QueryCoordCfg.ChannelExclusiveNodeFactor.Key
	params.Save(balancerKey, ChannelLevelScoreBalancerName)
	params.Save(factorKey, "1")
	defer func() {
		params.Reset(balancerKey)
		params.Reset(factorKey)
	}()

	// Six nodes over three channels: two nodes each.
	writable := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
		Nodes:         []int64{1, 2, 3, 4, 5, 6},
		ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{
			"channel1": {},
			"channel2": {},
			"channel3": {},
		},
	}).CopyForWrite()

	writable.tryBalanceNodeForChannel()
	result := writable.IntoReplica()

	assigned := 0
	for _, info := range result.replicaPB.GetChannelNodeInfos() {
		nodes := info.GetRwNodes()
		suite.Len(nodes, 2)
		assigned += len(nodes)
	}
	suite.Equal(6, assigned)

	// Every node must appear in the exclusive index.
	suite.Len(writable.exclusiveRWNodeToChannel, 6)
}
// TestTryBalanceNodeForChannelUnbalancedToBalanced checks that a skewed
// assignment (4/1/0) is evened out to 2/2/1 in some order.
func (suite *ReplicaSuite) TestTryBalanceNodeForChannelUnbalancedToBalanced() {
	params := paramtable.Get()
	balancerKey := params.QueryCoordCfg.Balancer.Key
	factorKey := params.QueryCoordCfg.ChannelExclusiveNodeFactor.Key
	params.Save(balancerKey, ChannelLevelScoreBalancerName)
	params.Save(factorKey, "1")
	defer func() {
		params.Reset(balancerKey)
		params.Reset(factorKey)
	}()

	// Skewed starting point: channel1 owns four nodes, channel2 one, channel3 none.
	writable := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
		Nodes:         []int64{1, 2, 3, 4, 5},
		ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{
			"channel1": {RwNodes: []int64{1, 2, 3, 4}},
			"channel2": {RwNodes: []int64{5}},
			"channel3": {},
		},
	}).CopyForWrite()

	// Mirror the existing assignments in the exclusive index.
	writable.exclusiveRWNodeToChannel = map[int64]string{
		1: "channel1",
		2: "channel1",
		3: "channel1",
		4: "channel1",
		5: "channel2",
	}

	writable.tryBalanceNodeForChannel()
	result := writable.IntoReplica()

	// Five nodes over three channels: base of one, with two channels getting a second.
	perChannel := make(map[string]int)
	total := 0
	for channelName, info := range result.replicaPB.GetChannelNodeInfos() {
		nodeCount := len(info.GetRwNodes())
		perChannel[channelName] = nodeCount
		total += nodeCount
		suite.True(nodeCount >= 1 && nodeCount <= 2, "Channel %s has %d nodes", channelName, nodeCount)
	}
	suite.Equal(5, total)

	// Exactly two channels with two nodes and one channel with a single node.
	withTwo, withOne := 0, 0
	for _, nodeCount := range perChannel {
		switch nodeCount {
		case 2:
			withTwo++
		case 1:
			withOne++
		}
	}
	suite.Equal(2, withTwo)
	suite.Equal(1, withOne)
}
// TestTryBalanceNodeForChannelWithExtraNodes checks the distribution when the
// node count is not a multiple of the channel count.
func (suite *ReplicaSuite) TestTryBalanceNodeForChannelWithExtraNodes() {
	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, ChannelLevelScoreBalancerName)
	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key, "1")
	defer func() {
		paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.Balancer.Key)
		paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key)
	}()

	// 7 nodes for 3 channels = 2 nodes per channel + 1 extra
	r := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
		Nodes:         []int64{1, 2, 3, 4, 5, 6, 7},
		ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{
			"channel1": {},
			"channel2": {},
			"channel3": {},
		},
	})

	mutableReplica := r.CopyForWrite()
	mutableReplica.tryBalanceNodeForChannel()
	newR := mutableReplica.IntoReplica()

	// The single extra node lands on exactly one channel, so the only valid
	// shape is one channel with 3 nodes and two channels with 2 nodes. Which
	// channel receives the extra node is not asserted here.
	totalNodes := 0
	channelsWith2Nodes := 0
	channelsWith3Nodes := 0
	for _, channelNodeInfo := range newR.replicaPB.GetChannelNodeInfos() {
		nodeCount := len(channelNodeInfo.GetRwNodes())
		totalNodes += nodeCount
		suite.True(nodeCount >= 2 && nodeCount <= 3, "Each channel should have 2 or 3 nodes, got %d", nodeCount)
		switch nodeCount {
		case 2:
			channelsWith2Nodes++
		case 3:
			channelsWith3Nodes++
		}
	}
	suite.Equal(7, totalNodes)
	suite.Equal(2, channelsWith2Nodes)
	suite.Equal(1, channelsWith3Nodes)
}
// TestShouldEnableChannelExclusiveMode covers the three outcomes of the
// enablement check: enough nodes, too few nodes, and wrong balancer.
func (suite *ReplicaSuite) TestShouldEnableChannelExclusiveMode() {
	params := paramtable.Get()
	balancerKey := params.QueryCoordCfg.Balancer.Key
	factorKey := params.QueryCoordCfg.ChannelExclusiveNodeFactor.Key
	params.Save(balancerKey, ChannelLevelScoreBalancerName)
	params.Save(factorKey, "2")
	defer func() {
		params.Reset(balancerKey)
		params.Reset(factorKey)
	}()

	// channelsNamed builds a channel map with an empty info per name.
	channelsNamed := func(names ...string) map[string]*querypb.ChannelNodeInfo {
		infos := make(map[string]*querypb.ChannelNodeInfo, len(names))
		for _, name := range names {
			infos[name] = &querypb.ChannelNodeInfo{}
		}
		return infos
	}

	writable := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
		Nodes:         []int64{1, 2, 3, 4},
	}).CopyForWrite()

	// Four nodes, two channels, factor 2: 4 >= 2*2.
	suite.True(writable.shouldEnableChannelExclusiveMode(channelsNamed("channel1", "channel2")))

	// Four nodes, three channels, factor 2: 4 < 3*2.
	suite.False(writable.shouldEnableChannelExclusiveMode(channelsNamed("channel1", "channel2", "channel3")))

	// Enough nodes again, but the balancer is no longer channel-level.
	params.Save(balancerKey, RoundRobinBalancerName)
	suite.False(writable.shouldEnableChannelExclusiveMode(channelsNamed("channel1", "channel2")))
}
// TestClearChannelNodeInfos checks that DisableChannelExclusiveMode empties
// both the per-channel node lists and the node-to-channel index.
func (suite *ReplicaSuite) TestClearChannelNodeInfos() {
	writable := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
		Nodes:         []int64{1, 2, 3, 4},
		ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{
			"channel1": {RwNodes: []int64{1, 2}},
			"channel2": {RwNodes: []int64{3, 4}},
		},
	}).CopyForWrite()
	writable.exclusiveRWNodeToChannel = map[int64]string{
		1: "channel1",
		2: "channel1",
		3: "channel2",
		4: "channel2",
	}

	writable.DisableChannelExclusiveMode()

	// No channel may keep any node.
	for _, info := range writable.replicaPB.GetChannelNodeInfos() {
		suite.Empty(info.GetRwNodes())
	}

	// The index must have been reset.
	suite.Empty(writable.exclusiveRWNodeToChannel)
}
// TestGetAvailableNodes checks that only nodes without an exclusive
// assignment are reported as available.
func (suite *ReplicaSuite) TestGetAvailableNodes() {
	writable := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
		Nodes:         []int64{1, 2, 3, 4, 5},
	}).CopyForWrite()

	// With no assignments, every node is available.
	suite.ElementsMatch([]int64{1, 2, 3, 4, 5}, writable.getAvailableNodes())

	// Nodes 1 and 3 become exclusive and drop out of the result.
	writable.exclusiveRWNodeToChannel = map[int64]string{
		1: "channel1",
		3: "channel2",
	}
	suite.ElementsMatch([]int64{2, 4, 5}, writable.getAvailableNodes())
}
// TestAllocateNodesFromPool checks allocation from a pool that is larger
// than, smaller than, and empty relative to the requested count.
func (suite *ReplicaSuite) TestAllocateNodesFromPool() {
	writable := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
		Nodes:         []int64{1, 2, 3, 4, 5},
	}).CopyForWrite()
	writable.exclusiveRWNodeToChannel = make(map[int64]string)

	// Request three nodes from a pool of five.
	got := writable.allocateNodesFromPool([]int64{1, 2, 3, 4, 5}, 3, "channel1")
	suite.Len(got, 3)
	for _, nodeID := range got {
		suite.Equal("channel1", writable.exclusiveRWNodeToChannel[nodeID])
	}

	// Request more than the pool holds: everything in the pool is handed out.
	got = writable.allocateNodesFromPool([]int64{6, 7}, 5, "channel2")
	suite.Len(got, 2)
	suite.ElementsMatch([]int64{6, 7}, got)
	for _, nodeID := range got {
		suite.Equal("channel2", writable.exclusiveRWNodeToChannel[nodeID])
	}

	// An empty pool yields nothing.
	got = writable.allocateNodesFromPool([]int64{}, 2, "channel3")
	suite.Len(got, 0)
}
// TestGetSortedChannelsByNodeCount checks that channels come back ordered by
// descending node count.
func (suite *ReplicaSuite) TestGetSortedChannelsByNodeCount() {
	writable := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
	}).CopyForWrite()

	infos := map[string]*querypb.ChannelNodeInfo{
		"channel1": {RwNodes: []int64{1}},       // one node
		"channel2": {RwNodes: []int64{2, 3, 4}}, // three nodes
		"channel3": {RwNodes: []int64{5, 6}},    // two nodes
		"channel4": {RwNodes: []int64{}},        // no nodes
	}

	// Expected order: channel2(3), channel3(2), channel1(1), channel4(0).
	want := []string{"channel2", "channel3", "channel1", "channel4"}
	suite.Equal(want, writable.getSortedChannelsByNodeCount(infos))
}
// TestCalculateOptimalAssignments tests the assignment calculation function
// for an even split and for a split with a remainder.
func (suite *ReplicaSuite) TestCalculateOptimalAssignments() {
	r := newReplica(&querypb.Replica{
		ID:            1,
		CollectionID:  2,
		ResourceGroup: DefaultResourceGroupName,
		Nodes:         []int64{1, 2, 3, 4, 5, 6, 7},
	})

	mutableReplica := r.CopyForWrite()

	channelInfos := map[string]*querypb.ChannelNodeInfo{
		"channel1": {RwNodes: []int64{1, 2, 3}},
		"channel2": {RwNodes: []int64{4}},
		"channel3": {RwNodes: []int64{}},
	}

	// Test perfect division: 6 nodes, 3 channels = 2 nodes each.
	// The RW node set is rewritten in place. No copy is kept for a later
	// "restore": a saved set value would alias the same underlying set, so
	// each scenario simply clears the set and fills it again.
	mutableReplica.rwNodes.Clear()
	mutableReplica.rwNodes.Insert(1, 2, 3, 4, 5, 6)

	assignments := mutableReplica.calculateOptimalAssignments(channelInfos)
	suite.Equal(3, len(assignments))
	totalAssigned := 0
	for _, count := range assignments {
		totalAssigned += count
		suite.Equal(2, count, "Each channel should get exactly 2 nodes")
	}
	suite.Equal(6, totalAssigned)

	// Test with remainder: 7 nodes, 3 channels = 2 nodes each + 1 extra
	mutableReplica.rwNodes.Clear()
	mutableReplica.rwNodes.Insert(1, 2, 3, 4, 5, 6, 7)

	assignments = mutableReplica.calculateOptimalAssignments(channelInfos)
	suite.Equal(3, len(assignments))

	totalAssigned = 0
	countsOfTwo := 0
	countsOfThree := 0
	for _, count := range assignments {
		totalAssigned += count
		switch count {
		case 2:
			countsOfTwo++
		case 3:
			countsOfThree++
		}
	}
	suite.Equal(7, totalAssigned)
	suite.Equal(2, countsOfTwo)   // 2 channels get 2 nodes
	suite.Equal(1, countsOfThree) // 1 channel gets 3 nodes

	// channel1 strictly holds the most nodes, so it is first in the descending
	// order and must be the channel that receives the extra node.
	suite.Equal(3, assignments["channel1"])
}
func TestReplica(t *testing.T) {
suite.Run(t, new(ReplicaSuite))
}

View File

@ -21,21 +21,24 @@ import (
"sync"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/syncutil"
)
// check replica, find read only nodes and remove it from replica if all segment/channel has been moved
type ReplicaObserver struct {
cancel context.CancelFunc
wg sync.WaitGroup
meta *meta.Meta
distMgr *meta.DistributionManager
cancel context.CancelFunc
wg sync.WaitGroup
meta *meta.Meta
distMgr *meta.DistributionManager
targetMgr meta.TargetManagerInterface
startOnce sync.Once
stopOnce sync.Once
@ -93,26 +96,41 @@ func (ob *ReplicaObserver) waitNodeChangedOrTimeout(ctx context.Context, listene
func (ob *ReplicaObserver) checkNodesInReplica() {
ctx := context.Background()
log := log.Ctx(ctx).WithRateGroup("qcv2.replicaObserver", 1, 60)
log := log.Ctx(ctx).WithRateGroup("qcv2.checkNodesInReplica", 1, 60)
collections := ob.meta.GetAll(ctx)
for _, collectionID := range collections {
utils.RecoverReplicaOfCollection(ctx, ob.meta, collectionID)
}
balancePolicy := paramtable.Get().QueryCoordCfg.Balancer.GetValue()
enableChannelExclusiveMode := balancePolicy == meta.ChannelLevelScoreBalancerName
// check all ro nodes, remove it from replica if all segment/channel has been moved
for _, collectionID := range collections {
replicas := ob.meta.ReplicaManager.GetByCollection(ctx, collectionID)
for _, replica := range replicas {
if enableChannelExclusiveMode && !replica.IsChannelExclusiveModeEnabled() {
// register channel for enable exclusive mode
mutableReplica := replica.CopyForWrite()
channels := ob.targetMgr.GetDmChannelsByCollection(ctx, collectionID, meta.CurrentTargetFirst)
mutableReplica.TryEnableChannelExclusiveMode(lo.Keys(channels)...)
replica = mutableReplica.IntoReplica()
ob.meta.ReplicaManager.Put(ctx, replica)
}
roNodes := replica.GetRONodes()
rwNodes := replica.GetRWNodes()
if len(roNodes) == 0 {
continue
}
log.RatedInfo(10, "found ro nodes in replica",
logger := log.With(
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64s("RONodes", roNodes),
zap.Int64s("roNodes", roNodes),
zap.Int64s("rwNodes", rwNodes),
)
log.RatedInfo(10, "found ro nodes in replica")
removeNodes := make([]int64, 0, len(roNodes))
for _, node := range roNodes {
channels := ob.distMgr.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node))
@ -124,18 +142,15 @@ func (ob *ReplicaObserver) checkNodesInReplica() {
if len(removeNodes) == 0 {
continue
}
logger := log.With(
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replica.GetID()),
zap.Int64s("removedNodes", removeNodes),
zap.Int64s("roNodes", roNodes),
zap.Int64s("rwNodes", rwNodes),
)
if err := ob.meta.ReplicaManager.RemoveNode(ctx, replica.GetID(), removeNodes...); err != nil {
logger.Warn("fail to remove node from replica", zap.Error(err))
logger.Warn("fail to remove node from replica",
zap.Int64s("removedNodes", removeNodes),
zap.Error(err))
continue
}
logger.Info("all segment/channel has been removed from ro node, try to remove it from replica")
logger.Info("all segment/channel has been removed from ro node, remove it from replica",
zap.Int64s("removedNodes", removeNodes),
)
}
}
}

View File

@ -54,6 +54,7 @@ func (s *ChannelExclusiveBalanceSuit) SetupSuite() {
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, meta.ChannelLevelScoreBalancerName)
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key, "2")
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.CheckNodeInReplicaInterval.Key, "3")
// disable compaction
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false")