From cf701a9bf099d71cd314b57626336ae7da2672f7 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Mon, 15 Jul 2024 20:51:46 +0800
Subject: [PATCH] enhance: Preserve fixed-size memory in delegator node for
 growing segment (#34600)

issue: #34595
pr: #34596

When the delegator node consumes insert data, QueryCoord moves some sealed
segments off the node to manage its memory usage. After the growing segments
are flushed, sealed segments from other workers are moved back onto the
delegator node.

To avoid this frequent movement of segments, we estimate the maximum growing
row count and reserve a fixed amount of memory on the delegator node.

---------

Signed-off-by: Wei Liu
---
 .../channel_level_score_balancer_test.go     |  9 ++++-----
 .../balance/score_based_balancer.go          | 10 ++++++++--
 .../balance/score_based_balancer_test.go     | 19 ++++++++++++++-----
 pkg/util/paramtable/component_param.go       | 11 +++++++++++
 pkg/util/paramtable/component_param_test.go  |  2 ++
 .../replicas/balance/replica_test.go         |  1 -
 6 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/internal/querycoordv2/balance/channel_level_score_balancer_test.go b/internal/querycoordv2/balance/channel_level_score_balancer_test.go
index 84256187a3..c9a3ea0bb1 100644
--- a/internal/querycoordv2/balance/channel_level_score_balancer_test.go
+++ b/internal/querycoordv2/balance/channel_level_score_balancer_test.go
@@ -305,10 +305,10 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegmentWithGrowing()
 
 	distributions := map[int64][]*meta.Segment{
 		1: {
-			{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20, CollectionID: 1}, Node: 1},
+			{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 100, CollectionID: 1}, Node: 1},
 		},
 		2: {
-			{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 20, CollectionID: 1}, Node: 2},
+			{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 100, CollectionID: 1}, Node: 2},
 		},
 	}
 	for node, s := range distributions {
@@ -333,9 +333,8 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegmentWithGrowing()
 
 	// mock 50 growing row count in node 1, which is delegator, expect all segment assign to node 2
 	leaderView := &meta.LeaderView{
-		ID:               1,
-		CollectionID:     1,
-		NumOfGrowingRows: 50,
+		ID:           1,
+		CollectionID: 1,
 	}
 	suite.balancer.dist.LeaderViewManager.Update(1, leaderView)
 	plans := balancer.AssignSegment(1, toAssign, lo.Keys(distributions), false)
diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go
index 93ffdd15d2..b1e2b23f6b 100644
--- a/internal/querycoordv2/balance/score_based_balancer.go
+++ b/internal/querycoordv2/balance/score_based_balancer.go
@@ -150,17 +150,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in
 }
 
 func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
+	delegatorOverloadFactor := params.Params.QueryCoordCfg.DelegatorMemoryOverloadFactor.GetAsFloat()
+
 	nodeRowCount := 0
+	nodeCollectionRowCount := make(map[int64]int)
 	// calculate global sealed segment row count
 	globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID))
 	for _, s := range globalSegments {
 		nodeRowCount += int(s.GetNumOfRows())
+		nodeCollectionRowCount[s.CollectionID] += int(s.GetNumOfRows())
 	}
 
 	// calculate global growing segment row count
 	views := b.dist.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(nodeID))
 	for _, view := range views {
-		nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
+		nodeRowCount += int(float64(view.NumOfGrowingRows))
+		nodeRowCount += int(float64(nodeCollectionRowCount[view.CollectionID]) * delegatorOverloadFactor)
 	}
 
 	// calculate executing task cost in scheduler
@@ -176,7 +181,8 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
 	// calculate collection growing segment row count
 	collectionViews := b.dist.LeaderViewManager.GetByFilter(meta.WithCollectionID2LeaderView(collectionID), meta.WithNodeID2LeaderView(nodeID))
 	for _, view := range collectionViews {
-		collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
+		collectionRowCount += int(float64(view.NumOfGrowingRows))
+		collectionRowCount += int(float64(collectionRowCount) * delegatorOverloadFactor)
 	}
 
 	// calculate executing task cost in scheduler
diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go
index 20ba2e5839..27382dc7e0 100644
--- a/internal/querycoordv2/balance/score_based_balancer_test.go
+++ b/internal/querycoordv2/balance/score_based_balancer_test.go
@@ -303,12 +303,22 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() {
 	defer suite.TearDownTest()
 	balancer := suite.balancer
 
+	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.DelegatorMemoryOverloadFactor.Key, "0.3")
+	suite.balancer.meta.PutCollection(&meta.Collection{
+		CollectionLoadInfo: &querypb.CollectionLoadInfo{
+			CollectionID: 1,
+		},
+	}, &meta.Partition{
+		PartitionLoadInfo: &querypb.PartitionLoadInfo{
+			PartitionID: 1,
+		},
+	})
 	distributions := map[int64][]*meta.Segment{
 		1: {
-			{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20, CollectionID: 1}, Node: 1},
+			{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 100, CollectionID: 1}, Node: 1},
 		},
 		2: {
-			{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 20, CollectionID: 1}, Node: 2},
+			{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 100, CollectionID: 1}, Node: 2},
 		},
 	}
 	for node, s := range distributions {
@@ -333,9 +333,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() {
 
 	// mock 50 growing row count in node 1, which is delegator, expect all segment assign to node 2
 	leaderView := &meta.LeaderView{
-		ID:               1,
-		CollectionID:     1,
-		NumOfGrowingRows: 50,
+		ID:           1,
+		CollectionID: 1,
 	}
 	suite.balancer.dist.LeaderViewManager.Update(1, leaderView)
 	plans := balancer.AssignSegment(1, toAssign, lo.Keys(distributions), false)
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index f810ae0667..09d717b1fc 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -1540,6 +1540,7 @@ type queryCoordConfig struct {
 	RowCountMaxSteps              ParamItem `refreshable:"true"`
 	RandomMaxSteps                ParamItem `refreshable:"true"`
 	GrowingRowCountWeight         ParamItem `refreshable:"true"`
+	DelegatorMemoryOverloadFactor ParamItem `refreshable:"true"`
 	BalanceCostThreshold          ParamItem `refreshable:"true"`
 	SegmentCheckInterval          ParamItem `refreshable:"true"`
 
@@ -1774,6 +1775,16 @@ func (p *queryCoordConfig) init(base *BaseTable) {
 	}
 	p.GrowingRowCountWeight.Init(base.mgr)
 
+	p.DelegatorMemoryOverloadFactor = ParamItem{
+		Key:          "queryCoord.delegatorMemoryOverloadFactor",
+		Version:      "2.3.19",
+		DefaultValue: "0.3",
+		PanicIfEmpty: true,
+		Doc:          "the factor of delegator overloaded memory",
+		Export:       true,
+	}
+	p.DelegatorMemoryOverloadFactor.Init(base.mgr)
+
 	p.BalanceCostThreshold = ParamItem{
 		Key:          "queryCoord.balanceCostThreshold",
"queryCoord.balanceCostThreshold", Version: "2.4.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 672aea18d0..5489a9f620 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -319,6 +319,8 @@ func TestComponentParam(t *testing.T) { params.Save("queryCoord.checkExecutedFlagInterval", "200") assert.Equal(t, 200, Params.CheckExecutedFlagInterval.GetAsInt()) params.Reset("queryCoord.checkExecutedFlagInterval") + + assert.Equal(t, 0.3, Params.DelegatorMemoryOverloadFactor.GetAsFloat()) }) t.Run("test queryNodeConfig", func(t *testing.T) { diff --git a/tests/integration/replicas/balance/replica_test.go b/tests/integration/replicas/balance/replica_test.go index de7bb10b3f..3ec17d889e 100644 --- a/tests/integration/replicas/balance/replica_test.go +++ b/tests/integration/replicas/balance/replica_test.go @@ -50,7 +50,6 @@ func (s *ReplicaTestSuit) SetupSuite() { paramtable.Init() paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000") paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1") - s.Require().NoError(s.SetupEmbedEtcd()) }