From ad0bf9cad8f48a2d959ab3ebfe8a8dc6573301ab Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 21 Jul 2025 17:04:54 +0800 Subject: [PATCH] enhance: Optimize channel node balancing for uneven QN distribution (#42786) (#43423) issue: #42860 pr: #42786 Fix channel node allocation when QueryNode count is not a multiple of channel count. The previous algorithm used simple division which caused uneven distribution with remainders. Key improvements: - Implement smart remainder distribution algorithm - Refactor large function into focused helper functions - Support two-phase rebalancing (release then allocate) - Handle edge cases like insufficient nodes gracefully --------- Signed-off-by: Wei Liu --- internal/querycoordv2/meta/replica.go | 215 +++++++-- internal/querycoordv2/meta/replica_manager.go | 24 +- internal/querycoordv2/meta/replica_test.go | 443 ++++++++++++++++++ .../observers/replica_observer.go | 47 +- .../balance/channel_exclusive_balance_test.go | 1 + 5 files changed, 670 insertions(+), 60 deletions(-) diff --git a/internal/querycoordv2/meta/replica.go b/internal/querycoordv2/meta/replica.go index bee254aebd..8f134e1aad 100644 --- a/internal/querycoordv2/meta/replica.go +++ b/internal/querycoordv2/meta/replica.go @@ -1,8 +1,12 @@ package meta import ( + "sort" + + "go.uber.org/zap" "google.golang.org/protobuf/proto" + "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -159,6 +163,10 @@ func (replica *Replica) CopyForWrite() *mutableReplica { } } +func (replica *Replica) IsChannelExclusiveModeEnabled() bool { + return replica.replicaPB.ChannelNodeInfos != nil && len(replica.replicaPB.ChannelNodeInfos) > 0 +} + // mutableReplica is a mutable type (COW) for manipulating replica meta info for replica manager. type mutableReplica struct { *Replica @@ -233,56 +241,197 @@ func (replica *mutableReplica) removeChannelExclusiveNodes(nodes ...int64) { } } +func (replica *mutableReplica) TryEnableChannelExclusiveMode(channelNames ...string) { + if replica.replicaPB.ChannelNodeInfos == nil { + replica.replicaPB.ChannelNodeInfos = make(map[string]*querypb.ChannelNodeInfo) + for _, channelName := range channelNames { + replica.replicaPB.ChannelNodeInfos[channelName] = &querypb.ChannelNodeInfo{} + } + } + if replica.exclusiveRWNodeToChannel == nil { + replica.exclusiveRWNodeToChannel = make(map[int64]string) + } +} + +func (replica *mutableReplica) DisableChannelExclusiveMode() { + if replica.replicaPB.ChannelNodeInfos != nil { + channelNodeInfos := make(map[string]*querypb.ChannelNodeInfo) + for channelName := range replica.replicaPB.ChannelNodeInfos { + channelNodeInfos[channelName] = &querypb.ChannelNodeInfo{} + } + replica.replicaPB.ChannelNodeInfos = channelNodeInfos + } + replica.exclusiveRWNodeToChannel = make(map[int64]string) +} + +// tryBalanceNodeForChannel attempts to balance nodes across channels using an improved algorithm func (replica *mutableReplica) tryBalanceNodeForChannel() { channelNodeInfos := replica.replicaPB.GetChannelNodeInfos() if len(channelNodeInfos) == 0 { return } - balancePolicy := paramtable.Get().QueryCoordCfg.Balancer.GetValue() - enableChannelExclusiveMode := balancePolicy == ChannelLevelScoreBalancerName - channelExclusiveFactor := paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.GetAsInt() - // if balance policy or node count doesn't match condition, clean up channel node info - if !enableChannelExclusiveMode || len(replica.rwNodes) < len(channelNodeInfos)*channelExclusiveFactor { - for name := range replica.replicaPB.GetChannelNodeInfos() { - replica.replicaPB.ChannelNodeInfos[name] = &querypb.ChannelNodeInfo{} - } + // Check if channel exclusive mode should be enabled + if !replica.shouldEnableChannelExclusiveMode(channelNodeInfos) { + replica.DisableChannelExclusiveMode() return } - if channelNodeInfos != nil { - average := replica.RWNodesCount() / len(channelNodeInfos) + // Calculate optimal node assignments + targetAssignments := replica.calculateOptimalAssignments(channelNodeInfos) - // release node in channel - for channelName, channelNodeInfo := range channelNodeInfos { - currentNodes := channelNodeInfo.GetRwNodes() - if len(currentNodes) > average { - replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = currentNodes[:average] - for _, nodeID := range currentNodes[average:] { - delete(replica.exclusiveRWNodeToChannel, nodeID) - } - } + // Apply the rebalancing with minimal node movement + replica.rebalanceChannelNodes(channelNodeInfos, targetAssignments) +} + +// shouldEnableChannelExclusiveMode determines if channel exclusive mode should be enabled +func (replica *mutableReplica) shouldEnableChannelExclusiveMode(channelInfos map[string]*querypb.ChannelNodeInfo) bool { + balancePolicy := paramtable.Get().QueryCoordCfg.Balancer.GetValue() + channelExclusiveFactor := paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.GetAsInt() + return balancePolicy == ChannelLevelScoreBalancerName && + replica.RWNodesCount() >= len(channelInfos)*channelExclusiveFactor +} + +// calculateOptimalAssignments calculates the optimal node count for each channel +func (replica *mutableReplica) calculateOptimalAssignments(channelInfos map[string]*querypb.ChannelNodeInfo) map[string]int { + channelCount := len(channelInfos) + totalNodes := replica.RWNodesCount() + + // Get channels sorted by current node count (descending) + sortedChannels := replica.getSortedChannelsByNodeCount(channelInfos) + // Calculate base assignment: average nodes per channel + assignments := make(map[string]int, channelCount) + baseNodes := totalNodes / channelCount + extraNodes := totalNodes % channelCount + + // Distribute extra nodes to channels with fewer current nodes first + for i, channel := range sortedChannels { + nodeCount := baseNodes + if i < extraNodes { + nodeCount++ } + assignments[channel] = nodeCount + } - // acquire node in channel - for channelName, channelNodeInfo := range channelNodeInfos { - currentNodes := channelNodeInfo.GetRwNodes() - if len(currentNodes) < average { - for _, nodeID := range replica.rwNodes.Collect() { - if _, ok := replica.exclusiveRWNodeToChannel[nodeID]; !ok { - currentNodes = append(currentNodes, nodeID) - replica.exclusiveRWNodeToChannel[nodeID] = channelName - if len(currentNodes) == average { - break - } - } - } - replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = currentNodes + return assignments +} + +// getSortedChannelsByNodeCount returns channels sorted by current node count (descending) +func (replica *mutableReplica) getSortedChannelsByNodeCount(channelInfos map[string]*querypb.ChannelNodeInfo) []string { + // channelNodeAssignment represents a channel's node assignment + type channelNodeAssignment struct { + name string + nodes []int64 + } + + assignments := make([]channelNodeAssignment, 0, len(channelInfos)) + for name, channelNodeInfo := range channelInfos { + assignments = append(assignments, channelNodeAssignment{ + name: name, + nodes: channelNodeInfo.GetRwNodes(), + }) + } + + // Sort by node count (descending) to prioritize channels with more nodes for reduction + sort.Slice(assignments, func(i, j int) bool { + return len(assignments[i].nodes) > len(assignments[j].nodes) + }) + + channels := make([]string, len(assignments)) + for i, assignment := range assignments { + channels[i] = assignment.name + } + + return channels +} + +// rebalanceChannelNodes performs the actual node rebalancing +func (replica *mutableReplica) rebalanceChannelNodes(channelInfos map[string]*querypb.ChannelNodeInfo, targetAssignments map[string]int) { + // Phase 1: Release excess nodes from over-allocated channels + replica.releaseExcessNodes(channelInfos, targetAssignments) + + // Phase 2: Allocate nodes to under-allocated channels + replica.allocateInsufficientNodes(channelInfos, targetAssignments) +} + +// releaseExcessNodes releases nodes from channels that have more than their target allocation +func (replica *mutableReplica) releaseExcessNodes(channelInfos map[string]*querypb.ChannelNodeInfo, targetAssignments map[string]int) { + for channelName, channelNodeInfo := range channelInfos { + currentNodes := channelNodeInfo.GetRwNodes() + targetCount := targetAssignments[channelName] + + if len(currentNodes) > targetCount { + // Keep the first targetCount nodes, release the rest + replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = currentNodes[:targetCount] + + // Remove released nodes from the exclusive mapping + for _, nodeID := range currentNodes[targetCount:] { + delete(replica.exclusiveRWNodeToChannel, nodeID) } } } } +// allocateInsufficientNodes allocates nodes to channels that need more nodes +func (replica *mutableReplica) allocateInsufficientNodes(channelInfos map[string]*querypb.ChannelNodeInfo, targetAssignments map[string]int) { + // Get available nodes (not exclusively assigned to any channel) + availableNodes := replica.getAvailableNodes() + + for channelName, channelNodeInfo := range channelInfos { + currentNodes := channelNodeInfo.GetRwNodes() + targetCount := targetAssignments[channelName] + + if len(currentNodes) < targetCount { + neededCount := targetCount - len(currentNodes) + allocatedNodes := replica.allocateNodesFromPool(availableNodes, neededCount, channelName) + + // Update channel's node list + updatedNodes := make([]int64, 0, len(currentNodes)+len(allocatedNodes)) + updatedNodes = append(updatedNodes, currentNodes...) + updatedNodes = append(updatedNodes, allocatedNodes...) + replica.replicaPB.ChannelNodeInfos[channelName].RwNodes = updatedNodes + } + log.Info("channel exclusive node list", + zap.String("channelName", channelName), + zap.Int64s("nodes", replica.replicaPB.ChannelNodeInfos[channelName].RwNodes)) + } +} + +// getAvailableNodes returns nodes that are not exclusively assigned to any channel +func (replica *mutableReplica) getAvailableNodes() []int64 { + allNodes := replica.rwNodes.Collect() + availableNodes := make([]int64, 0, len(allNodes)) + + for _, nodeID := range allNodes { + if _, isExclusive := replica.exclusiveRWNodeToChannel[nodeID]; !isExclusive { + availableNodes = append(availableNodes, nodeID) + } + } + + return availableNodes +} + +// allocateNodesFromPool allocates nodes from the available pool to a channel +func (replica *mutableReplica) allocateNodesFromPool(availableNodes []int64, neededCount int, channelName string) []int64 { + allocatedCount := 0 + allocatedNodes := make([]int64, 0, neededCount) + + for _, nodeID := range availableNodes { + if allocatedCount >= neededCount { + break + } + + // Check if node is still available (not assigned since we got the list) + if _, isExclusive := replica.exclusiveRWNodeToChannel[nodeID]; !isExclusive { + allocatedNodes = append(allocatedNodes, nodeID) + replica.exclusiveRWNodeToChannel[nodeID] = channelName + allocatedCount++ + } + } + + return allocatedNodes +} + // IntoReplica returns the immutable replica, After calling this method, the mutable replica should not be used again. func (replica *mutableReplica) IntoReplica() *Replica { r := replica.Replica diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index fdd867aa30..75a47c6a2a 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -140,18 +140,17 @@ func (m *ReplicaManager) Spawn(ctx context.Context, collection int64, replicaNum return nil, err } - channelExclusiveNodeInfo := make(map[string]*querypb.ChannelNodeInfo) + replica := newReplica(&querypb.Replica{ + ID: id, + CollectionID: collection, + ResourceGroup: rgName, + }) if enableChannelExclusiveMode { - for _, channel := range channels { - channelExclusiveNodeInfo[channel] = &querypb.ChannelNodeInfo{} - } + mutableReplica := replica.CopyForWrite() + mutableReplica.TryEnableChannelExclusiveMode(channels...) + replica = mutableReplica.IntoReplica() } - replicas = append(replicas, newReplica(&querypb.Replica{ - ID: id, - CollectionID: collection, - ResourceGroup: rgName, - ChannelNodeInfos: channelExclusiveNodeInfo, - })) + replicas = append(replicas, replica) } } if err := m.put(ctx, replicas...); err != nil { @@ -438,7 +437,10 @@ func (m *ReplicaManager) RecoverNodesInCollection(ctx context.Context, collectio zap.Int64("replicaID", assignment.GetReplicaID()), zap.Int64s("newRONodes", roNodes), zap.Int64s("roToRWNodes", recoverableNodes), - zap.Int64s("newIncomingNodes", incomingNode)) + zap.Int64s("newIncomingNodes", incomingNode), + zap.Bool("enableChannelExclusiveMode", mutableReplica.IsChannelExclusiveModeEnabled()), + zap.Any("channelNodeInfos", mutableReplica.replicaPB.GetChannelNodeInfos()), + ) modifiedReplicas = append(modifiedReplicas, mutableReplica.IntoReplica()) }) }) diff --git a/internal/querycoordv2/meta/replica_test.go b/internal/querycoordv2/meta/replica_test.go index eb69c584e9..924e3db956 100644 --- a/internal/querycoordv2/meta/replica_test.go +++ b/internal/querycoordv2/meta/replica_test.go @@ -236,6 +236,449 @@ func (suite *ReplicaSuite) TestChannelExclusiveMode() { } } +// TestTryBalanceNodeForChannelEmptyChannels tests behavior when no channels exist +func (suite *ReplicaSuite) TestTryBalanceNodeForChannelEmptyChannels() { + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2, 3, 4}, + ChannelNodeInfos: make(map[string]*querypb.ChannelNodeInfo), + }) + + mutableReplica := r.CopyForWrite() + // Should not panic and should return early + mutableReplica.tryBalanceNodeForChannel() + + // Verify no changes were made + newR := mutableReplica.IntoReplica() + suite.Equal(0, len(newR.replicaPB.GetChannelNodeInfos())) +} + +// TestTryBalanceNodeForChannelDisabledMode tests when channel exclusive mode is disabled +func (suite *ReplicaSuite) TestTryBalanceNodeForChannelDisabledMode() { + // Set balance policy to non-ChannelLevelScoreBalancer + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, RoundRobinBalancerName) + defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.Balancer.Key) + + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2, 3, 4}, + ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{ + "channel1": {RwNodes: []int64{1, 2}}, + "channel2": {RwNodes: []int64{3, 4}}, + }, + }) + + mutableReplica := r.CopyForWrite() + mutableReplica.tryBalanceNodeForChannel() + + newR := mutableReplica.IntoReplica() + // Channel node infos should be cleared when exclusive mode is disabled + for _, channelNodeInfo := range newR.replicaPB.GetChannelNodeInfos() { + suite.Equal(0, len(channelNodeInfo.GetRwNodes())) + } + // exclusiveRWNodeToChannel should be reset + suite.Equal(0, len(mutableReplica.exclusiveRWNodeToChannel)) +} + +// TestTryBalanceNodeForChannelInsufficientNodes tests when there are not enough nodes +func (suite *ReplicaSuite) TestTryBalanceNodeForChannelInsufficientNodes() { + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, ChannelLevelScoreBalancerName) + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key, "2") + defer func() { + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.Balancer.Key) + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key) + }() + + // 2 nodes for 2 channels, but factor is 2, so need 4 nodes minimum + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2}, + ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{ + "channel1": {RwNodes: []int64{1}}, + "channel2": {RwNodes: []int64{2}}, + }, + }) + + mutableReplica := r.CopyForWrite() + mutableReplica.tryBalanceNodeForChannel() + + newR := mutableReplica.IntoReplica() + // Should clear channel node infos due to insufficient nodes + for _, channelNodeInfo := range newR.replicaPB.GetChannelNodeInfos() { + suite.Equal(0, len(channelNodeInfo.GetRwNodes())) + } +} + +// TestTryBalanceNodeForChannelPerfectBalance tests perfect node distribution +func (suite *ReplicaSuite) TestTryBalanceNodeForChannelPerfectBalance() { + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, ChannelLevelScoreBalancerName) + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key, "1") + defer func() { + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.Balancer.Key) + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key) + }() + + // 6 nodes for 3 channels = 2 nodes per channel + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2, 3, 4, 5, 6}, + ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{ + "channel1": {}, + "channel2": {}, + "channel3": {}, + }, + }) + + mutableReplica := r.CopyForWrite() + mutableReplica.tryBalanceNodeForChannel() + + newR := mutableReplica.IntoReplica() + + // Each channel should have exactly 2 nodes + totalAssignedNodes := 0 + for _, channelNodeInfo := range newR.replicaPB.GetChannelNodeInfos() { + suite.Equal(2, len(channelNodeInfo.GetRwNodes())) + totalAssignedNodes += len(channelNodeInfo.GetRwNodes()) + } + suite.Equal(6, totalAssignedNodes) + + // All nodes should be assigned exclusively + suite.Equal(6, len(mutableReplica.exclusiveRWNodeToChannel)) +} + +// TestTryBalanceNodeForChannelUnbalancedToBalanced tests rebalancing from unbalanced state +func (suite *ReplicaSuite) TestTryBalanceNodeForChannelUnbalancedToBalanced() { + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, ChannelLevelScoreBalancerName) + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key, "1") + defer func() { + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.Balancer.Key) + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key) + }() + + // Start with unbalanced distribution: channel1 has 4 nodes, channel2 has 1 node, channel3 has 0 nodes + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2, 3, 4, 5}, + ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{ + "channel1": {RwNodes: []int64{1, 2, 3, 4}}, + "channel2": {RwNodes: []int64{5}}, + "channel3": {}, + }, + }) + + mutableReplica := r.CopyForWrite() + // Initialize exclusiveRWNodeToChannel to simulate existing assignments + mutableReplica.exclusiveRWNodeToChannel = map[int64]string{ + 1: "channel1", 2: "channel1", 3: "channel1", 4: "channel1", 5: "channel2", + } + + mutableReplica.tryBalanceNodeForChannel() + + newR := mutableReplica.IntoReplica() + + // Should be rebalanced: 5 nodes / 3 channels = 1 node each, with 2 channels getting 2 nodes + nodeCountPerChannel := make(map[string]int) + totalNodes := 0 + for channelName, channelNodeInfo := range newR.replicaPB.GetChannelNodeInfos() { + nodeCount := len(channelNodeInfo.GetRwNodes()) + nodeCountPerChannel[channelName] = nodeCount + totalNodes += nodeCount + // Each channel should have 1 or 2 nodes + suite.True(nodeCount >= 1 && nodeCount <= 2, "Channel %s has %d nodes", channelName, nodeCount) + } + + suite.Equal(5, totalNodes) + // Two channels should have 2 nodes, one should have 1 node + countOfChannelsWith2Nodes := 0 + countOfChannelsWith1Node := 0 + for _, count := range nodeCountPerChannel { + if count == 2 { + countOfChannelsWith2Nodes++ + } else if count == 1 { + countOfChannelsWith1Node++ + } + } + suite.Equal(2, countOfChannelsWith2Nodes) + suite.Equal(1, countOfChannelsWith1Node) +} + +// TestTryBalanceNodeForChannelWithExtraNodes tests distribution with extra nodes +func (suite *ReplicaSuite) TestTryBalanceNodeForChannelWithExtraNodes() { + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, ChannelLevelScoreBalancerName) + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key, "1") + defer func() { + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.Balancer.Key) + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key) + }() + + // 7 nodes for 3 channels = 2 nodes per channel + 1 extra + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2, 3, 4, 5, 6, 7}, + ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{ + "channel1": {}, + "channel2": {}, + "channel3": {}, + }, + }) + + mutableReplica := r.CopyForWrite() + mutableReplica.tryBalanceNodeForChannel() + + newR := mutableReplica.IntoReplica() + + // Should distribute extra node: 2 channels get 3 nodes, 1 channel gets 2 nodes + // Or: 1 channel gets 3 nodes, 2 channels get 2 nodes + nodeCountPerChannel := make([]int, 0, 3) + totalNodes := 0 + for _, channelNodeInfo := range newR.replicaPB.GetChannelNodeInfos() { + nodeCount := len(channelNodeInfo.GetRwNodes()) + nodeCountPerChannel = append(nodeCountPerChannel, nodeCount) + totalNodes += nodeCount + suite.True(nodeCount >= 2 && nodeCount <= 3, "Each channel should have 2 or 3 nodes, got %d", nodeCount) + } + + suite.Equal(7, totalNodes) + + // Sum should be 7 (2+2+3 or 2+3+2 or 3+2+2) + sum := 0 + for _, count := range nodeCountPerChannel { + sum += count + } + suite.Equal(7, sum) +} + +// TestShouldEnableChannelExclusiveMode tests the condition checking function +func (suite *ReplicaSuite) TestShouldEnableChannelExclusiveMode() { + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, ChannelLevelScoreBalancerName) + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key, "2") + defer func() { + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.Balancer.Key) + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key) + }() + + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2, 3, 4}, + }) + mutableReplica := r.CopyForWrite() + + // Test with sufficient nodes (4 nodes, 2 channels, factor 2: 4 >= 2*2) + channelInfos := map[string]*querypb.ChannelNodeInfo{ + "channel1": {}, + "channel2": {}, + } + suite.True(mutableReplica.shouldEnableChannelExclusiveMode(channelInfos)) + + // Test with insufficient nodes (4 nodes, 3 channels, factor 2: 4 < 3*2) + channelInfos = map[string]*querypb.ChannelNodeInfo{ + "channel1": {}, + "channel2": {}, + "channel3": {}, + } + suite.False(mutableReplica.shouldEnableChannelExclusiveMode(channelInfos)) + + // Test with disabled balancer + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, RoundRobinBalancerName) + channelInfos = map[string]*querypb.ChannelNodeInfo{ + "channel1": {}, + "channel2": {}, + } + suite.False(mutableReplica.shouldEnableChannelExclusiveMode(channelInfos)) +} + +// TestClearChannelNodeInfos tests the channel clearing function +func (suite *ReplicaSuite) TestClearChannelNodeInfos() { + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2, 3, 4}, + ChannelNodeInfos: map[string]*querypb.ChannelNodeInfo{ + "channel1": {RwNodes: []int64{1, 2}}, + "channel2": {RwNodes: []int64{3, 4}}, + }, + }) + + mutableReplica := r.CopyForWrite() + mutableReplica.exclusiveRWNodeToChannel = map[int64]string{ + 1: "channel1", 2: "channel1", 3: "channel2", 4: "channel2", + } + + mutableReplica.DisableChannelExclusiveMode() + + // All channel node infos should be cleared + for _, channelNodeInfo := range mutableReplica.replicaPB.GetChannelNodeInfos() { + suite.Equal(0, len(channelNodeInfo.GetRwNodes())) + } + + // exclusiveRWNodeToChannel should be reset + suite.Equal(0, len(mutableReplica.exclusiveRWNodeToChannel)) +} + +// TestGetAvailableNodes tests the available nodes retrieval function +func (suite *ReplicaSuite) TestGetAvailableNodes() { + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2, 3, 4, 5}, + }) + + mutableReplica := r.CopyForWrite() + + // Initially all nodes should be available + availableNodes := mutableReplica.getAvailableNodes() + suite.ElementsMatch([]int64{1, 2, 3, 4, 5}, availableNodes) + + // Mark some nodes as exclusively assigned + mutableReplica.exclusiveRWNodeToChannel = map[int64]string{ + 1: "channel1", + 3: "channel2", + } + + availableNodes = mutableReplica.getAvailableNodes() + suite.ElementsMatch([]int64{2, 4, 5}, availableNodes) +} + +// TestAllocateNodesFromPool tests the node allocation function +func (suite *ReplicaSuite) TestAllocateNodesFromPool() { + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2, 3, 4, 5}, + }) + + mutableReplica := r.CopyForWrite() + mutableReplica.exclusiveRWNodeToChannel = make(map[int64]string) + + // Test allocating 3 nodes from pool of 5 + availableNodes := []int64{1, 2, 3, 4, 5} + allocatedNodes := mutableReplica.allocateNodesFromPool(availableNodes, 3, "channel1") + + suite.Equal(3, len(allocatedNodes)) + for _, nodeID := range allocatedNodes { + suite.Equal("channel1", mutableReplica.exclusiveRWNodeToChannel[nodeID]) + } + + // Test allocating more nodes than available + availableNodes = []int64{6, 7} + allocatedNodes = mutableReplica.allocateNodesFromPool(availableNodes, 5, "channel2") + + suite.Equal(2, len(allocatedNodes)) + suite.ElementsMatch([]int64{6, 7}, allocatedNodes) + for _, nodeID := range allocatedNodes { + suite.Equal("channel2", mutableReplica.exclusiveRWNodeToChannel[nodeID]) + } + + // Test allocating from empty pool + availableNodes = []int64{} + allocatedNodes = mutableReplica.allocateNodesFromPool(availableNodes, 2, "channel3") + + suite.Equal(0, len(allocatedNodes)) +} + +// TestGetSortedChannelsByNodeCount tests the channel sorting function +func (suite *ReplicaSuite) TestGetSortedChannelsByNodeCount() { + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + }) + + mutableReplica := r.CopyForWrite() + + channelInfos := map[string]*querypb.ChannelNodeInfo{ + "channel1": {RwNodes: []int64{1}}, // 1 node + "channel2": {RwNodes: []int64{2, 3, 4}}, // 3 nodes + "channel3": {RwNodes: []int64{5, 6}}, // 2 nodes + "channel4": {RwNodes: []int64{}}, // 0 nodes + } + + sortedChannels := mutableReplica.getSortedChannelsByNodeCount(channelInfos) + + // Should be sorted by node count descending: channel2(3), channel3(2), channel1(1), channel4(0) + suite.Equal(4, len(sortedChannels)) + suite.Equal("channel2", sortedChannels[0]) + suite.Equal("channel3", sortedChannels[1]) + suite.Equal("channel1", sortedChannels[2]) + suite.Equal("channel4", sortedChannels[3]) +} + +// TestCalculateOptimalAssignments tests the assignment calculation function +func (suite *ReplicaSuite) TestCalculateOptimalAssignments() { + r := newReplica(&querypb.Replica{ + ID: 1, + CollectionID: 2, + ResourceGroup: DefaultResourceGroupName, + Nodes: []int64{1, 2, 3, 4, 5, 6, 7}, + }) + + mutableReplica := r.CopyForWrite() + + // Test perfect division: 6 nodes, 3 channels = 2 nodes each + channelInfos := map[string]*querypb.ChannelNodeInfo{ + "channel1": {RwNodes: []int64{1, 2, 3}}, + "channel2": {RwNodes: []int64{4}}, + "channel3": {RwNodes: []int64{}}, + } + + // Mock RWNodesCount to return 6 for this test + originalNodes := mutableReplica.rwNodes + mutableReplica.rwNodes.Clear() + mutableReplica.rwNodes.Insert(1, 2, 3, 4, 5, 6) + + assignments := mutableReplica.calculateOptimalAssignments(channelInfos) + + suite.Equal(3, len(assignments)) + totalAssigned := 0 + for _, count := range assignments { + totalAssigned += count + suite.True(count >= 2 && count <= 2, "Each channel should get exactly 2 nodes") + } + suite.Equal(6, totalAssigned) + + // Restore original nodes + mutableReplica.rwNodes = originalNodes + + // Test with remainder: 7 nodes, 3 channels = 2 nodes each + 1 extra + mutableReplica.rwNodes.Clear() + mutableReplica.rwNodes.Insert(1, 2, 3, 4, 5, 6, 7) + + assignments = mutableReplica.calculateOptimalAssignments(channelInfos) + + suite.Equal(3, len(assignments)) + totalAssigned = 0 + countsOfTwo := 0 + countsOfThree := 0 + for _, count := range assignments { + totalAssigned += count + if count == 2 { + countsOfTwo++ + } else if count == 3 { + countsOfThree++ + } + } + suite.Equal(7, totalAssigned) + suite.Equal(2, countsOfTwo) // 2 channels get 2 nodes + suite.Equal(1, countsOfThree) // 1 channel gets 3 nodes +} + func TestReplica(t *testing.T) { suite.Run(t, new(ReplicaSuite)) } diff --git a/internal/querycoordv2/observers/replica_observer.go b/internal/querycoordv2/observers/replica_observer.go index d057b01850..f9c7523d69 100644 --- a/internal/querycoordv2/observers/replica_observer.go +++ b/internal/querycoordv2/observers/replica_observer.go @@ -21,21 +21,24 @@ import ( "sync" "time" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/syncutil" ) // check replica, find read only nodes and remove it from replica if all segment/channel has been moved type ReplicaObserver struct { - cancel context.CancelFunc - wg sync.WaitGroup - meta *meta.Meta - distMgr *meta.DistributionManager + cancel context.CancelFunc + wg sync.WaitGroup + meta *meta.Meta + distMgr *meta.DistributionManager + targetMgr meta.TargetManagerInterface startOnce sync.Once stopOnce sync.Once @@ -93,26 +96,41 @@ func (ob *ReplicaObserver) waitNodeChangedOrTimeout(ctx context.Context, listene func (ob *ReplicaObserver) checkNodesInReplica() { ctx := context.Background() - log := log.Ctx(ctx).WithRateGroup("qcv2.replicaObserver", 1, 60) + log := log.Ctx(ctx).WithRateGroup("qcv2.checkNodesInReplica", 1, 60) collections := ob.meta.GetAll(ctx) for _, collectionID := range collections { utils.RecoverReplicaOfCollection(ctx, ob.meta, collectionID) } + balancePolicy := paramtable.Get().QueryCoordCfg.Balancer.GetValue() + enableChannelExclusiveMode := balancePolicy == meta.ChannelLevelScoreBalancerName + // check all ro nodes, remove it from replica if all segment/channel has been moved for _, collectionID := range collections { replicas := ob.meta.ReplicaManager.GetByCollection(ctx, collectionID) for _, replica := range replicas { + if enableChannelExclusiveMode && !replica.IsChannelExclusiveModeEnabled() { + // register channel for enable exclusive mode + mutableReplica := replica.CopyForWrite() + channels := ob.targetMgr.GetDmChannelsByCollection(ctx, collectionID, meta.CurrentTargetFirst) + mutableReplica.TryEnableChannelExclusiveMode(lo.Keys(channels)...) + replica = mutableReplica.IntoReplica() + ob.meta.ReplicaManager.Put(ctx, replica) + } + roNodes := replica.GetRONodes() rwNodes := replica.GetRWNodes() if len(roNodes) == 0 { continue } - log.RatedInfo(10, "found ro nodes in replica", + logger := log.With( zap.Int64("collectionID", replica.GetCollectionID()), zap.Int64("replicaID", replica.GetID()), - zap.Int64s("RONodes", roNodes), + zap.Int64s("roNodes", roNodes), + zap.Int64s("rwNodes", rwNodes), ) + + log.RatedInfo(10, "found ro nodes in replica") removeNodes := make([]int64, 0, len(roNodes)) for _, node := range roNodes { channels := ob.distMgr.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node)) @@ -124,18 +142,15 @@ func (ob *ReplicaObserver) checkNodesInReplica() { if len(removeNodes) == 0 { continue } - logger := log.With( - zap.Int64("collectionID", replica.GetCollectionID()), - zap.Int64("replicaID", replica.GetID()), - zap.Int64s("removedNodes", removeNodes), - zap.Int64s("roNodes", roNodes), - zap.Int64s("rwNodes", rwNodes), - ) if err := ob.meta.ReplicaManager.RemoveNode(ctx, replica.GetID(), removeNodes...); err != nil { - logger.Warn("fail to remove node from replica", zap.Error(err)) + logger.Warn("fail to remove node from replica", + zap.Int64s("removedNodes", removeNodes), + zap.Error(err)) continue } - logger.Info("all segment/channel has been removed from ro node, try to remove it from replica") + logger.Info("all segment/channel has been removed from ro node, remove it from replica", + zap.Int64s("removedNodes", removeNodes), + ) } } } diff --git a/tests/integration/balance/channel_exclusive_balance_test.go b/tests/integration/balance/channel_exclusive_balance_test.go index e8fec558ea..b6f794db58 100644 --- a/tests/integration/balance/channel_exclusive_balance_test.go +++ b/tests/integration/balance/channel_exclusive_balance_test.go @@ -54,6 +54,7 @@ func (s *ChannelExclusiveBalanceSuit) SetupSuite() { paramtable.Get().Save(paramtable.Get().QueryCoordCfg.Balancer.Key, meta.ChannelLevelScoreBalancerName) paramtable.Get().Save(paramtable.Get().QueryCoordCfg.ChannelExclusiveNodeFactor.Key, "2") + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.CheckNodeInReplicaInterval.Key, "3") // disable compaction paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false")