From b3bc7f3985d8e1ab9b476c14bcc034fa7437ac63 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 26 Jul 2024 10:13:46 +0800 Subject: [PATCH] enhance: Limit collection's normal balance speed (#34810) (#34987) issue: #34798 pr: #34810 After the task priority was removed from query coord, too many balance tasks can block load/release segment tasks. To avoid this, limit the number of balance tasks generated for a collection in each round. At the same time, reduce the balance check interval so that balance is triggered more frequently. Signed-off-by: Wei Liu --- internal/querycoordv2/balance/balance.go | 6 ++++++ internal/querycoordv2/balance/balance_test.go | 5 +++++ .../querycoordv2/balance/rowcount_based_balancer.go | 4 ++++ .../querycoordv2/balance/score_based_balancer.go | 10 ++++++++++ internal/querycoordv2/checkers/balance_checker.go | 5 +++++ pkg/util/paramtable/component_param.go | 12 +++++++++++- pkg/util/paramtable/component_param_test.go | 1 + 7 files changed, 42 insertions(+), 1 deletion(-) diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index 17228d6bb0..117c895efb 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type SegmentAssignPlan struct { @@ -82,6 +83,8 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta. 
delta1, delta2 := b.scheduler.GetSegmentTaskDelta(id1, -1), b.scheduler.GetSegmentTaskDelta(id2, -1) return cnt1+delta1 < cnt2+delta2 }) + + balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() ret := make([]SegmentAssignPlan, 0, len(segments)) for i, s := range segments { plan := SegmentAssignPlan{ @@ -90,6 +93,9 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta. To: nodesInfo[i%len(nodesInfo)].ID(), } ret = append(ret, plan) + if len(ret) >= balanceBatchSize { + break + } } return ret } diff --git a/internal/querycoordv2/balance/balance_test.go b/internal/querycoordv2/balance/balance_test.go index 543c04c534..b99e1272cf 100644 --- a/internal/querycoordv2/balance/balance_test.go +++ b/internal/querycoordv2/balance/balance_test.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type BalanceTestSuite struct { @@ -35,6 +36,10 @@ type BalanceTestSuite struct { roundRobinBalancer *RoundRobinBalancer } +func (suite *BalanceTestSuite) SetupSuite() { + paramtable.Init() +} + func (suite *BalanceTestSuite) SetupTest() { nodeManager := session.NewNodeManager() suite.mockScheduler = task.NewMockScheduler(suite.T()) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index d36815432c..05c4f57a5f 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -64,6 +64,7 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me return segments[i].GetNumOfRows() > segments[j].GetNumOfRows() }) + balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() plans := make([]SegmentAssignPlan, 0, len(segments)) for 
_, s := range segments { // pick the node with the least row count and allocate to it. @@ -74,6 +75,9 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me Segment: s, } plans = append(plans, plan) + if len(plans) >= balanceBatchSize { + break + } // change node's priority and push back p := ni.getPriority() ni.setPriority(p + int(s.GetNumOfRows())) diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 87d7c6b5e0..eb34841f2d 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -82,6 +82,7 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta. return segments[i].GetNumOfRows() > segments[j].GetNumOfRows() }) + balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() plans := make([]SegmentAssignPlan, 0, len(segments)) for _, s := range segments { func(s *meta.Segment) { @@ -112,6 +113,10 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta. 
} targetNode.setPriority(targetNode.getPriority() + priorityChange) }(s) + + if len(plans) >= balanceBatchSize { + break + } } return plans } @@ -295,6 +300,8 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ return nil } + balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() + // find the segment from the node which has more score than the average segmentsToMove := make([]*meta.Segment, 0) average := totalScore / len(onlineNodes) @@ -309,6 +316,9 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ }) for _, s := range segments { segmentsToMove = append(segmentsToMove, s) + if len(segmentsToMove) >= balanceBatchSize { + break + } leftScore -= b.calculateSegmentScore(s) if leftScore <= average { break diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 86cfb06453..7d946690ed 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -171,6 +171,11 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task { replicasToBalance := b.replicasToBalance() segmentPlans, channelPlans := b.balanceReplicas(replicasToBalance) + // iterate all collection to find a collection to balance + for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 { + replicasToBalance := b.replicasToBalance() + segmentPlans, channelPlans = b.balanceReplicas(replicasToBalance) + } tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans) task.SetPriority(task.TaskPriorityLow, tasks...) 
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f213d294ad..bccc1842ff 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1638,6 +1638,7 @@ type queryCoordConfig struct { CollectionObserverInterval ParamItem `refreshable:"false"` CheckExecutedFlagInterval ParamItem `refreshable:"false"` UpdateCollectionLoadStatusInterval ParamItem `refreshable:"false"` + CollectionBalanceSegmentBatchSize ParamItem `refreshable:"true"` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -1890,7 +1891,7 @@ func (p *queryCoordConfig) init(base *BaseTable) { p.BalanceCheckInterval = ParamItem{ Key: "queryCoord.checkBalanceInterval", Version: "2.3.0", - DefaultValue: "10000", + DefaultValue: "3000", PanicIfEmpty: true, Export: true, } @@ -2162,6 +2163,15 @@ func (p *queryCoordConfig) init(base *BaseTable) { Export: false, } p.CheckExecutedFlagInterval.Init(base.mgr) + + p.CollectionBalanceSegmentBatchSize = ParamItem{ + Key: "queryCoord.collectionBalanceSegmentBatchSize", + Version: "2.4.7", + DefaultValue: "5", + Doc: "the max balance task number for collection at each round", + Export: false, + } + p.CollectionBalanceSegmentBatchSize.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 7e574ae81d..70af23bbc8 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -339,6 +339,7 @@ func TestComponentParam(t *testing.T) { params.Reset("queryCoord.checkExecutedFlagInterval") assert.Equal(t, 0.3, Params.DelegatorMemoryOverloadFactor.GetAsFloat()) + assert.Equal(t, 5, Params.CollectionBalanceSegmentBatchSize.GetAsInt()) }) t.Run("test queryNodeConfig", func(t *testing.T) {