diff --git a/internal/querycoordv2/balance/channel_level_score_balancer_test.go b/internal/querycoordv2/balance/channel_level_score_balancer_test.go index 84256187a3..c9a3ea0bb1 100644 --- a/internal/querycoordv2/balance/channel_level_score_balancer_test.go +++ b/internal/querycoordv2/balance/channel_level_score_balancer_test.go @@ -305,10 +305,10 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegmentWithGrowing() distributions := map[int64][]*meta.Segment{ 1: { - {SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20, CollectionID: 1}, Node: 1}, + {SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 100, CollectionID: 1}, Node: 1}, }, 2: { - {SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 20, CollectionID: 1}, Node: 2}, + {SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 100, CollectionID: 1}, Node: 2}, }, } for node, s := range distributions { @@ -333,9 +333,8 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegmentWithGrowing() // mock 50 growing row count in node 1, which is delegator, expect all segment assign to node 2 leaderView := &meta.LeaderView{ - ID: 1, - CollectionID: 1, - NumOfGrowingRows: 50, + ID: 1, + CollectionID: 1, } suite.balancer.dist.LeaderViewManager.Update(1, leaderView) plans := balancer.AssignSegment(1, toAssign, lo.Keys(distributions), false) diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 93ffdd15d2..b1e2b23f6b 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -150,17 +150,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in } func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { + delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat() + nodeRowCount := 0 + nodeCollectionRowCount := make(map[int64]int) // calculate global sealed segment row count globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID)) for _, s := range globalSegments { nodeRowCount += int(s.GetNumOfRows()) + nodeCollectionRowCount[s.CollectionID] += int(s.GetNumOfRows()) } // calculate global growing segment row count views := b.dist.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(nodeID)) for _, view := range views { - nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat()) + nodeRowCount += int(float64(view.NumOfGrowingRows)) + nodeRowCount += int(float64(nodeCollectionRowCount[view.CollectionID]) * delegatorOverloadFactor) } // calculate executing task cost in scheduler @@ -176,7 +181,8 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { // calculate collection growing segment row count collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(nodeID)) for _, view := range collectionViews { - collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat()) + collectionRowCount += int(float64(view.NumOfGrowingRows)) + collectionRowCount += int(float64(collectionRowCount) * delegatorOverloadFactor) } // calculate executing task cost in scheduler diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index 20ba2e5839..27382dc7e0 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -303,12 +303,22 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() { defer suite.TearDownTest() balancer := suite.balancer + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "0.3") + suite.balancer.meta.PutCollection(&meta.Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: 1, + }, + }, &meta.Partition{ + PartitionLoadInfo: &querypb.PartitionLoadInfo{ + PartitionID: 1, + }, + }) distributions := map[int64][]*meta.Segment{ 1: { - {SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20, CollectionID: 1}, Node: 1}, + {SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 100, CollectionID: 1}, Node: 1}, }, 2: { - {SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 20, CollectionID: 1}, Node: 2}, + {SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 100, CollectionID: 1}, Node: 2}, }, } for node, s := range distributions { @@ -333,9 +343,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() { // mock 50 growing row count in node 1, which is delegator, expect all segment assign to node 2 leaderView := &meta.LeaderView{ - ID: 1, - CollectionID: 1, - NumOfGrowingRows: 50, + ID: 1, + CollectionID: 1, } suite.balancer.dist.LeaderViewManager.Update(1, leaderView) plans := balancer.AssignSegment(1, toAssign, lo.Keys(distributions), false) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f810ae0667..09d717b1fc 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1540,6 +1540,7 @@ type queryCoordConfig struct { RowCountMaxSteps ParamItem `refreshable:"true"` RandomMaxSteps ParamItem `refreshable:"true"` GrowingRowCountWeight ParamItem `refreshable:"true"` + DelegatorMemoryOverloadFactor ParamItem `refreshable:"true` BalanceCostThreshold ParamItem `refreshable:"true"` SegmentCheckInterval ParamItem `refreshable:"true"` @@ -1774,6 +1775,16 @@ func (p *queryCoordConfig) init(base *BaseTable) { } p.GrowingRowCountWeight.Init(base.mgr) + p.DelegatorMemoryOverloadFactor = ParamItem{ + Key: "queryCoord.delegatorMemoryOverloadFactor", + Version: "2.3.19", + DefaultValue: "0.3", + PanicIfEmpty: true, + Doc: "the factor of delegator overloaded memory", + Export: true, + } + p.DelegatorMemoryOverloadFactor.Init(base.mgr) + p.BalanceCostThreshold = ParamItem{ Key: "queryCoord.balanceCostThreshold", Version: "2.4.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 672aea18d0..5489a9f620 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -319,6 +319,8 @@ func TestComponentParam(t *testing.T) { params.Save("queryCoord.checkExecutedFlagInterval", "200") assert.Equal(t, 200, Params.CheckExecutedFlagInterval.GetAsInt()) params.Reset("queryCoord.checkExecutedFlagInterval") + + assert.Equal(t, 0.3, Params.DelegatorMemoryOverloadFactor.GetAsFloat()) }) t.Run("test queryNodeConfig", func(t *testing.T) { diff --git a/tests/integration/replicas/balance/replica_test.go b/tests/integration/replicas/balance/replica_test.go index de7bb10b3f..3ec17d889e 100644 --- a/tests/integration/replicas/balance/replica_test.go +++ b/tests/integration/replicas/balance/replica_test.go @@ -50,7 +50,6 @@ func (s *ReplicaTestSuit) SetupSuite() { paramtable.Init() paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000") paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1") - s.Require().NoError(s.SetupEmbedEtcd()) }