From b64bb63e775c2aa9fc666352bde5f6cd508c23f3 Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 27 Mar 2025 16:40:23 +0800 Subject: [PATCH] enhance: [2.5] Add trigger interval config for auto balance (#39154) (#39918) issue: #39156 pr: #39154 Signed-off-by: Wei Liu --- configs/milvus.yaml | 1 + .../querycoordv2/checkers/balance_checker.go | 59 ++++++++----- .../checkers/balance_checker_test.go | 88 +++++++++++++++++-- pkg/util/paramtable/component_param.go | 11 +++ pkg/util/paramtable/component_param_test.go | 1 + 5 files changed, 132 insertions(+), 28 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 55cebd318e..602039eced 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -359,6 +359,7 @@ queryCoord: checkSegmentInterval: 1000 checkChannelInterval: 1000 checkBalanceInterval: 3000 + autoBalanceInterval: 3000 # the interval for triggerauto balance checkIndexInterval: 10000 channelTaskTimeout: 60000 # 1 minute segmentTaskTimeout: 120000 # 2 minute diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 6fd07d6999..54b43ccc30 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -46,6 +46,9 @@ type BalanceChecker struct { scheduler task.Scheduler targetMgr meta.TargetManagerInterface getBalancerFunc GetBalancerFunc + + // record auto balance ts + autoBalanceTs time.Time } func NewBalanceChecker(meta *meta.Meta, @@ -80,22 +83,12 @@ func (b *BalanceChecker) readyToCheck(ctx context.Context, collectionID int64) b return metaExist && targetExist } -func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 { +func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int64 { ids := b.meta.GetAll(ctx) - - // all replicas belonging to loading collection will be skipped - loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool { - collection := b.meta.GetCollection(ctx, cid) - return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded - }) - sort.Slice(loadedCollections, func(i, j int) bool { - return loadedCollections[i] < loadedCollections[j] - }) - if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { // balance collections influenced by stopping nodes stoppingReplicas := make([]int64, 0) - for _, cid := range loadedCollections { + for _, cid := range ids { // if target and meta isn't ready, skip balance this collection if !b.readyToCheck(ctx, cid) { continue @@ -113,12 +106,27 @@ func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 { } } + return nil +} + +func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 { // 1. no stopping balance and auto balance is disabled, return empty collections for balance // 2. when balancer isn't active, skip auto balance if !Params.QueryCoordCfg.AutoBalance.GetAsBool() || !b.IsActive() { return nil } + ids := b.meta.GetAll(ctx) + + // all replicas belonging to loading collection will be skipped + loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool { + collection := b.meta.GetCollection(ctx, cid) + return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded + }) + sort.Slice(loadedCollections, func(i, j int) bool { + return loadedCollections[i] < loadedCollections[j] + }) + // Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections. // If any collection has unready info, skip the balance operation to avoid inconsistencies. notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool { @@ -173,16 +181,27 @@ func (b *BalanceChecker) balanceReplicas(ctx context.Context, replicaIDs []int64 } func (b *BalanceChecker) Check(ctx context.Context) []task.Task { - ret := make([]task.Task, 0) - - replicasToBalance := b.replicasToBalance(ctx) - segmentPlans, channelPlans := b.balanceReplicas(ctx, replicasToBalance) - // iterate all collection to find a collection to balance - for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 { - replicasToBalance := b.replicasToBalance(ctx) - segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance) + var segmentPlans []balance.SegmentAssignPlan + var channelPlans []balance.ChannelAssignPlan + stoppingReplicas := b.getReplicaForStoppingBalance(ctx) + if len(stoppingReplicas) > 0 { + // check for stopping balance first + segmentPlans, channelPlans = b.balanceReplicas(ctx, stoppingReplicas) + } else { + // then check for auto balance + if time.Since(b.autoBalanceTs) > paramtable.Get().QueryCoordCfg.AutoBalanceInterval.GetAsDuration(time.Millisecond) { + b.autoBalanceTs = time.Now() + replicasToBalance := b.getReplicaForNormalBalance(ctx) + segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance) + // iterate all collection to find a collection to balance + for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 { + replicasToBalance := b.getReplicaForNormalBalance(ctx) + segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance) + } + } } + ret := make([]task.Task, 0) tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans) task.SetPriority(task.TaskPriorityLow, tasks...) task.SetReason("segment unbalanced", tasks...) diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index 15d40b0951..8b62b6c781 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -19,9 +19,11 @@ package checkers import ( "context" "testing" + "time" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "go.uber.org/atomic" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" @@ -144,7 +146,7 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() { suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int { return 0 }) - replicasToBalance := suite.checker.replicasToBalance(ctx) + replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx) suite.Empty(replicasToBalance) segPlans, _ := suite.checker.balanceReplicas(ctx, replicasToBalance) suite.Empty(segPlans) @@ -152,14 +154,14 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() { // test enable auto balance paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true") idsToBalance := []int64{int64(replicaID1)} - replicasToBalance = suite.checker.replicasToBalance(ctx) + replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx) suite.ElementsMatch(idsToBalance, replicasToBalance) // next round idsToBalance = []int64{int64(replicaID2)} - replicasToBalance = suite.checker.replicasToBalance(ctx) + replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx) suite.ElementsMatch(idsToBalance, replicasToBalance) // final round - replicasToBalance = suite.checker.replicasToBalance(ctx) + replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx) suite.Empty(replicasToBalance) } @@ -221,7 +223,7 @@ func (suite *BalanceCheckerTestSuite) TestBusyScheduler() { suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int { return 1 }) - replicasToBalance := suite.checker.replicasToBalance(ctx) + replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx) suite.Len(replicasToBalance, 1) } @@ -289,7 +291,7 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() { // test stopping balance idsToBalance := []int64{int64(replicaID1), int64(replicaID2)} - replicasToBalance := suite.checker.replicasToBalance(ctx) + replicasToBalance := suite.checker.getReplicaForStoppingBalance(ctx) suite.ElementsMatch(idsToBalance, replicasToBalance) // checker check @@ -347,7 +349,7 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { // test normal balance when one collection has unready target mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true) mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false) - replicasToBalance := suite.checker.replicasToBalance(ctx) + replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx) suite.Len(replicasToBalance, 0) // test stopping balance with target not ready @@ -364,10 +366,80 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() { suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica()) idsToBalance := []int64{int64(replicaID1)} - replicasToBalance = suite.checker.replicasToBalance(ctx) + replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx) suite.ElementsMatch(idsToBalance, replicasToBalance) } +func (suite *BalanceCheckerTestSuite) TestAutoBalanceInterval() { + ctx := context.Background() + // set up nodes info + nodeID1, nodeID2 := 1, 2 + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(nodeID1), + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(nodeID2), + Address: "localhost", + Hostname: "localhost", + })) + suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID1)) + suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID2)) + + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + { + ID: 2, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + channels := []*datapb.VchannelInfo{ + { + CollectionID: 1, + ChannelName: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil) + + // set collections meta + cid1, replicaID1, partitionID1 := 1, 1, 1 + collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1)) + collection1.Status = querypb.LoadStatus_Loaded + replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)}) + partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1)) + suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1) + suite.checker.meta.ReplicaManager.Put(ctx, replica1) + suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1)) + suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1)) + + funcCallCounter := atomic.NewInt64(0) + suite.balancer.EXPECT().BalanceReplica(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, r *meta.Replica) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) { + funcCallCounter.Inc() + return nil, nil + }) + + // first auto balance should be triggered + suite.checker.Check(ctx) + suite.Equal(funcCallCounter.Load(), int64(1)) + + // second auto balance won't be triggered due to autoBalanceInterval == 3s + suite.checker.Check(ctx) + suite.Equal(funcCallCounter.Load(), int64(1)) + + // set autoBalanceInterval == 1, sleep 1s, auto balance should be triggered + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key, "1000") + paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key) + time.Sleep(1 * time.Second) + suite.checker.Check(ctx) + suite.Equal(funcCallCounter.Load(), int64(1)) +} + func TestBalanceCheckerSuite(t *testing.T) { suite.Run(t, new(BalanceCheckerTestSuite)) } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index bb89c21ce9..71addba83a 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1894,6 +1894,7 @@ type queryCoordConfig struct { SegmentCheckInterval ParamItem `refreshable:"true"` ChannelCheckInterval ParamItem `refreshable:"true"` BalanceCheckInterval ParamItem `refreshable:"true"` + AutoBalanceInterval ParamItem `refreshable:"true"` IndexCheckInterval ParamItem `refreshable:"true"` ChannelTaskTimeout ParamItem `refreshable:"true"` SegmentTaskTimeout ParamItem `refreshable:"true"` @@ -2509,6 +2510,16 @@ If this parameter is set false, Milvus simply searches the growing segments with Export: false, } p.ClusterLevelLoadResourceGroups.Init(base.mgr) + + p.AutoBalanceInterval = ParamItem{ + Key: "queryCoord.autoBalanceInterval", + Version: "2.5.3", + DefaultValue: "3000", + Doc: "the interval for triggerauto balance", + PanicIfEmpty: true, + Export: true, + } + p.AutoBalanceInterval.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index ee156c33fe..9b7b30f9e1 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -379,6 +379,7 @@ func TestComponentParam(t *testing.T) { assert.Len(t, Params.ClusterLevelLoadResourceGroups.GetAsStrings(), 0) assert.Equal(t, 10, Params.CollectionChannelCountFactor.GetAsInt()) + assert.Equal(t, 3000, Params.AutoBalanceInterval.GetAsInt()) }) t.Run("test queryNodeConfig", func(t *testing.T) {