From 7537dbfa379ea8db41be10b534d3f2bfbc420ff7 Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 10 Nov 2022 17:53:04 +0800 Subject: [PATCH] skip balance on loading collection (#20483) Signed-off-by: Wei Liu Signed-off-by: Wei Liu --- .../balance/rowcount_based_balancer.go | 9 +++- .../balance/rowcount_based_balancer_test.go | 48 ++++++++++++++++++- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 31f25a3b73..13ed1553b9 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -19,9 +19,11 @@ package balance import ( "sort" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/samber/lo" ) type RowCountBasedBalancer struct { @@ -81,8 +83,13 @@ func (b *RowCountBasedBalancer) convertToNodeItems(nodeIDs []int64) []*nodeItem func (b *RowCountBasedBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignPlan) { ids := b.meta.CollectionManager.GetAll() + // loading collection should skip balance + loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool { + return b.meta.GetStatus(cid) == querypb.LoadStatus_Loaded + }) + segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) - for _, cid := range ids { + for _, cid := range loadedCollections { replicas := b.meta.ReplicaManager.GetByCollection(cid) for _, replica := range replicas { splans, cplans := b.balanceReplica(replica) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index e4943525c6..6d62b89212 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -21,6 +21,7 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" @@ -144,7 +145,52 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { suite.SetupSuite() defer suite.TearDownTest() balancer := suite.balancer - balancer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + collection := utils.CreateTestCollection(1, 1) + collection.LoadPercentage = 100 + collection.Status = querypb.LoadStatus_Loaded + balancer.meta.CollectionManager.PutCollection(collection) + balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, c.nodes)) + for node, s := range c.distributions { + balancer.dist.SegmentDistManager.Update(node, s...) + } + segmentPlans, channelPlans := balancer.Balance() + suite.Empty(channelPlans) + suite.ElementsMatch(c.expectPlans, segmentPlans) + }) + } + +} + +func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnLoadingCollection() { + cases := []struct { + name string + nodes []int64 + distributions map[int64][]*meta.Segment + expectPlans []SegmentAssignPlan + }{ + { + name: "normal balance", + nodes: []int64{1, 2}, + distributions: map[int64][]*meta.Segment{ + 1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}}, + 2: { + {SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 20}, Node: 2}, + {SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 30}, Node: 2}, + }, + }, + expectPlans: []SegmentAssignPlan{}, + }, + } + + for _, c := range cases { + suite.Run(c.name, func() { + suite.SetupSuite() + defer suite.TearDownTest() + balancer := suite.balancer + collection := utils.CreateTestCollection(1, 1) + collection.LoadPercentage = 100 + collection.Status = querypb.LoadStatus_Loading + balancer.meta.CollectionManager.PutCollection(collection) balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, c.nodes)) for node, s := range c.distributions { balancer.dist.SegmentDistManager.Update(node, s...)