From a2c19af3ed58e0dd1399845c752a40e1c5a376b6 Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 29 Jul 2024 11:33:50 +0800 Subject: [PATCH] enhance: Solve channel unbalance on datanode (#34984) (#35033) issue: #33583 pr: #34984 The old policy permitted a datanode to hold at most 2 more channels than another datanode, so if Milvus has 2 datanodes and 2 channels, both channels would be assigned to one datanode, leaving the other datanode empty. This PR refines the balance policy to resolve the channel imbalance across datanodes. --------- Signed-off-by: Wei Liu --- internal/datacoord/policy.go | 21 +++++++++++++++++-- internal/datacoord/policy_test.go | 35 +++++++++++++++++++++++++++---- 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index bdb32a545e..b764512f8f 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -332,17 +332,32 @@ func AvgBalanceChannelPolicy(cluster Assignments) *ChannelOpSet { totalChannelNum += len(nodeChs.Channels) } channelCountPerNode := totalChannelNum / avaNodeNum + maxChannelCountPerNode := channelCountPerNode + remainder := totalChannelNum % avaNodeNum + if remainder > 0 { + maxChannelCountPerNode += 1 + } for _, nChannels := range cluster { chCount := len(nChannels.Channels) - if chCount <= channelCountPerNode+1 { + if chCount == 0 { + continue + } + + toReleaseCount := chCount - channelCountPerNode + if remainder > 0 && chCount >= maxChannelCountPerNode { + remainder -= 1 + toReleaseCount = chCount - maxChannelCountPerNode + } + + if toReleaseCount == 0 { log.Info("node channel count is not much larger than average, skip reallocate", zap.Int64("nodeID", nChannels.NodeID), zap.Int("channelCount", chCount), zap.Int("channelCountPerNode", channelCountPerNode)) continue } + reallocate := NewNodeChannelInfo(nChannels.NodeID) - toReleaseCount := chCount - channelCountPerNode - 1 for _, ch := range nChannels.Channels { reallocate.AddChannel(ch) toReleaseCount-- @@ -377,6 
+392,7 @@ func AvgAssignByCountPolicy(currentCluster Assignments, toAssign *NodeChannelInf fromCluster = append(fromCluster, info) channelNum += len(info.Channels) nodeToAvg.Insert(info.NodeID) + return } // Get toCluster by filtering out execlusive nodes @@ -385,6 +401,7 @@ func AvgAssignByCountPolicy(currentCluster Assignments, toAssign *NodeChannelInf } toCluster = append(toCluster, info) + channelNum += len(info.Channels) nodeToAvg.Insert(info.NodeID) }) diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go index 15e85f204d..a74fdcc786 100644 --- a/internal/datacoord/policy_test.go +++ b/internal/datacoord/policy_test.go @@ -339,16 +339,16 @@ func (s *PolicySuite) TestAvgBalanceChannelPolicy() { s.Nil(opSet) }) s.Run("test uneven with conservative effect", func() { - // as we deem that the node having only one channel more than average as even, so there's no reallocation - // for this test case - // even distribution should have not results uneven := []*NodeChannelInfo{ {100, getChannels(map[string]int64{"ch1": 1, "ch2": 1})}, {NodeID: 101}, } opSet := AvgBalanceChannelPolicy(uneven) - s.Nil(opSet) + s.Equal(opSet.Len(), 1) + for _, op := range opSet.Collect() { + s.True(lo.Contains([]string{"ch1", "ch2"}, op.GetChannelNames()[0])) + } }) s.Run("test uneven with zero", func() { uneven := []*NodeChannelInfo{ @@ -639,4 +639,31 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() { }) s.ElementsMatch([]int64{3, 1}, nodeIDs) }) + + s.Run("assign to reach average", func() { + curCluster := []*NodeChannelInfo{ + {1, getChannels(map[string]int64{"ch-1": 1, "ch-2": 1, "ch-3": 1})}, + {2, getChannels(map[string]int64{"ch-4": 1, "ch-5": 1, "ch-6": 4, "ch-7": 4, "ch-8": 4})}, + } + unassigned := NewNodeChannelInfo(bufferID, + getChannel("new-ch-1", 1), + getChannel("new-ch-2", 1), + getChannel("new-ch-3", 1), + ) + + opSet := AvgAssignByCountPolicy(curCluster, unassigned, nil) + s.NotNil(opSet) + + s.Equal(3, 
opSet.GetChannelNumber()) + s.Equal(2, opSet.Len()) + for _, op := range opSet.Collect() { + if op.Type == Delete { + s.Equal(int64(bufferID), op.NodeID) + } + + if op.Type == Watch { + s.Equal(int64(1), op.NodeID) + } + } + }) }