diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index 17228d6bb0..117c895efb 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type SegmentAssignPlan struct { @@ -82,6 +83,8 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta. delta1, delta2 := b.scheduler.GetSegmentTaskDelta(id1, -1), b.scheduler.GetSegmentTaskDelta(id2, -1) return cnt1+delta1 < cnt2+delta2 }) + + balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() ret := make([]SegmentAssignPlan, 0, len(segments)) for i, s := range segments { plan := SegmentAssignPlan{ @@ -90,6 +93,9 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta. To: nodesInfo[i%len(nodesInfo)].ID(), } ret = append(ret, plan) + if len(ret) > balanceBatchSize { + break + } } return ret } diff --git a/internal/querycoordv2/balance/balance_test.go b/internal/querycoordv2/balance/balance_test.go index 543c04c534..b99e1272cf 100644 --- a/internal/querycoordv2/balance/balance_test.go +++ b/internal/querycoordv2/balance/balance_test.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type BalanceTestSuite struct { @@ -35,6 +36,10 @@ type BalanceTestSuite struct { roundRobinBalancer *RoundRobinBalancer } +func (suite *BalanceTestSuite) SetupSuite() { + paramtable.Init() +} + func (suite *BalanceTestSuite) SetupTest() { nodeManager := session.NewNodeManager() suite.mockScheduler = task.NewMockScheduler(suite.T()) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index d36815432c..05c4f57a5f 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -64,6 +64,7 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me return segments[i].GetNumOfRows() > segments[j].GetNumOfRows() }) + balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() plans := make([]SegmentAssignPlan, 0, len(segments)) for _, s := range segments { // pick the node with the least row count and allocate to it. @@ -74,6 +75,9 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me Segment: s, } plans = append(plans, plan) + if len(plans) > balanceBatchSize { + break + } // change node's priority and push back p := ni.getPriority() ni.setPriority(p + int(s.GetNumOfRows())) diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 87d7c6b5e0..eb34841f2d 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -82,6 +82,7 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta. return segments[i].GetNumOfRows() > segments[j].GetNumOfRows() }) + balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() plans := make([]SegmentAssignPlan, 0, len(segments)) for _, s := range segments { func(s *meta.Segment) { @@ -112,6 +113,10 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta. } targetNode.setPriority(targetNode.getPriority() + priorityChange) }(s) + + if len(plans) > balanceBatchSize { + break + } } return plans } @@ -295,6 +300,8 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ return nil } + balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt() + // find the segment from the node which has more score than the average segmentsToMove := make([]*meta.Segment, 0) average := totalScore / len(onlineNodes) @@ -309,6 +316,9 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [ }) for _, s := range segments { segmentsToMove = append(segmentsToMove, s) + if len(segmentsToMove) >= balanceBatchSize { + break + } leftScore -= b.calculateSegmentScore(s) if leftScore <= average { break diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 86cfb06453..7d946690ed 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -171,6 +171,11 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task { replicasToBalance := b.replicasToBalance() segmentPlans, channelPlans := b.balanceReplicas(replicasToBalance) + // iterate all collection to find a collection to balance + for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 { + replicasToBalance := b.replicasToBalance() + segmentPlans, channelPlans = b.balanceReplicas(replicasToBalance) + } tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans) task.SetPriority(task.TaskPriorityLow, tasks...) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f213d294ad..bccc1842ff 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1638,6 +1638,7 @@ type queryCoordConfig struct { CollectionObserverInterval ParamItem `refreshable:"false"` CheckExecutedFlagInterval ParamItem `refreshable:"false"` UpdateCollectionLoadStatusInterval ParamItem `refreshable:"false"` + CollectionBalanceSegmentBatchSize ParamItem `refreshable true` } func (p *queryCoordConfig) init(base *BaseTable) { @@ -1890,7 +1891,7 @@ func (p *queryCoordConfig) init(base *BaseTable) { p.BalanceCheckInterval = ParamItem{ Key: "queryCoord.checkBalanceInterval", Version: "2.3.0", - DefaultValue: "10000", + DefaultValue: "3000", PanicIfEmpty: true, Export: true, } @@ -2162,6 +2163,15 @@ func (p *queryCoordConfig) init(base *BaseTable) { Export: false, } p.CheckExecutedFlagInterval.Init(base.mgr) + + p.CollectionBalanceSegmentBatchSize = ParamItem{ + Key: "queryCoord.collectionBalanceSegmentBatchSize", + Version: "2.4.7", + DefaultValue: "5", + Doc: "the max balance task number for collection at each round", + Export: false, + } + p.CollectionBalanceSegmentBatchSize.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 7e574ae81d..70af23bbc8 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -339,6 +339,7 @@ func TestComponentParam(t *testing.T) { params.Reset("queryCoord.checkExecutedFlagInterval") assert.Equal(t, 0.3, Params.DelegatorMemoryOverloadFactor.GetAsFloat()) + assert.Equal(t, 5, Params.CollectionBalanceSegmentBatchSize.GetAsInt()) }) t.Run("test queryNodeConfig", func(t *testing.T) {