From b517bc9e6a6627bca4c97f3c80b635d98daa559c Mon Sep 17 00:00:00 2001
From: MrPresent-Han <116052805+MrPresent-Han@users.noreply.github.com>
Date: Thu, 4 May 2023 12:22:40 +0800
Subject: [PATCH] refine balance mechanism, including: (#23454) (#23763) (#23791)

1. balance at replica granularity to avoid influencing unrelated replicas
2. avoid balancing back and forth

Signed-off-by: MrPresent-Han
---
 configs/milvus.yaml                           |   4 +
 internal/datacoord/compaction.go              |   2 +-
 internal/datacoord/meta.go                    |   2 +-
 internal/datacoord/server.go                  |   2 +-
 internal/datacoord/server_test.go             |   2 +-
 internal/datanode/flow_graph_manager_test.go  |   2 -
 internal/querycoordv2/balance/balance.go      |  12 +-
 .../querycoordv2/balance/mock_balancer.go     |  31 +--
 .../balance/rowcount_based_balancer.go        |  31 +--
 .../balance/rowcount_based_balancer_test.go   |  21 +-
 .../balance/score_based_balancer.go           | 100 ++-------
 .../balance/score_based_balancer_test.go      | 154 ++++++-------
 internal/querycoordv2/balance/utils.go        |  38 ++--
 .../querycoordv2/checkers/balance_checker.go  | 107 ++++++++-
 .../checkers/balance_checker_test.go          | 209 ++++++++++++++++++
 internal/querycoordv2/checkers/controller.go  |   2 +-
 internal/querycoordv2/task/executor.go        |   2 +-
 internal/querycoordv2/task/scheduler.go       |   6 +-
 pkg/util/paramtable/component_param.go        |  15 +-
 pkg/util/paramtable/component_param_test.go   |  12 +
 20 files changed, 497 insertions(+), 257 deletions(-)
 create mode 100644 internal/querycoordv2/checkers/balance_checker_test.go

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index cd14a7b659..f70da23261 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -170,6 +170,10 @@ proxy:
 queryCoord:
   autoHandoff: true # Enable auto handoff
   autoBalance: true # Enable auto balance
+  balancer: RowCountBasedBalancer # Balancer to use
+  globalRowCountFactor: 0.1 # expert parameters, only used by scoreBasedBalancer
+  scoreUnbalanceTolerationFactor: 0.05 # expert parameters, only used by scoreBasedBalancer
+  reverseUnBalanceTolerationFactor: 1.3 # expert parameters, only used by scoreBasedBalancer
   overloadedMemoryThresholdPercentage: 90 # The threshold percentage that memory overload
   balanceIntervalSeconds: 60
   memoryUsageMaxDifferencePercentage: 30
diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index 5680d11b02..7d9920fd6a 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -308,7 +308,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 		stateResult, ok := planStates[task.plan.PlanID]
 		state := stateResult.GetState()
 		planID := task.plan.PlanID
-		// check wether the state of CompactionPlan is working
+		// check whether the state of CompactionPlan is working
 		if ok {
 			if state == commonpb.CompactionState_Completed {
 				log.Info("compaction completed", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index 4f906ca859..aff41ecd2e 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -1329,7 +1329,7 @@ func isFlushState(state commonpb.SegmentState) bool {
 // updateSegStateAndPrepareMetrics updates a segment's in-memory state and prepare for the corresponding metric update.
func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commonpb.SegmentState, metricMutation *segMetricMutation) { log.Debug("updating segment state and updating metrics", - zap.Int64("segment ID", segToUpdate.GetID()), + zap.Int64("segmentID", segToUpdate.GetID()), zap.String("old state", segToUpdate.GetState().String()), zap.String("new state", targetState.String()), zap.Int64("# of rows", segToUpdate.GetNumOfRows())) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 7998adaaaf..063438f32c 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -880,7 +880,7 @@ func (s *Server) startFlushLoop(ctx context.Context) { func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error { segment := s.meta.GetHealthySegment(segmentID) if segment == nil { - return errors.New("segment not found, might be a faked segemnt, ignore post flush") + return errors.New("segment not found, might be a faked segment, ignore post flush") } // set segment to SegmentState_Flushed if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 7567f9d3eb..9e2afb4838 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3305,7 +3305,7 @@ func TestPostFlush(t *testing.T) { defer closeTestServer(t, svr) err := svr.postFlush(context.Background(), 1) - assert.EqualValues(t, "segment not found, might be a faked segemnt, ignore post flush", err.Error()) + assert.EqualValues(t, errors.New("segment not found, might be a faked segment, ignore post flush"), err) }) t.Run("success post flush", func(t *testing.T) { diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index 3e494406e4..c72c538040 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -50,8 +50,6 @@ func TestFlowGraphManager(t *testing.T) { node.SetEtcdClient(etcdCli) err = node.Init() require.Nil(t, err) - err = node.Start() - require.Nil(t, err) fm := newFlowgraphManager() defer func() { diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index e342ea586e..c853d00f6d 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -58,7 +58,7 @@ var ( type Balance interface { AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64) []SegmentAssignPlan AssignChannel(channels []*meta.DmChannel, nodes []int64) []ChannelAssignPlan - Balance() ([]SegmentAssignPlan, []ChannelAssignPlan) + BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) } type RoundRobinBalancer struct { @@ -112,6 +112,11 @@ func (b *RoundRobinBalancer) AssignChannel(channels []*meta.DmChannel, nodes []i return ret } +func (b *RoundRobinBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { + //TODO by chun.han + return nil, nil +} + func (b *RoundRobinBalancer) getNodes(nodes []int64) []*session.NodeInfo { ret := make([]*session.NodeInfo, 0, len(nodes)) for _, n := range nodes { @@ -123,11 +128,6 @@ func (b *RoundRobinBalancer) getNodes(nodes []int64) []*session.NodeInfo { return ret } -func (b *RoundRobinBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignPlan) { - // TODO(sunby) - return nil, nil -} - func NewRoundRobinBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager) 
*RoundRobinBalancer { return &RoundRobinBalancer{ scheduler: scheduler, diff --git a/internal/querycoordv2/balance/mock_balancer.go b/internal/querycoordv2/balance/mock_balancer.go index 5d1c10aa40..b5e6504183 100644 --- a/internal/querycoordv2/balance/mock_balancer.go +++ b/internal/querycoordv2/balance/mock_balancer.go @@ -101,13 +101,13 @@ func (_c *MockBalancer_AssignSegment_Call) Return(_a0 []SegmentAssignPlan) *Mock return _c } -// Balance provides a mock function with given fields: -func (_m *MockBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignPlan) { - ret := _m.Called() +// BalanceReplica provides a mock function with given fields: replica +func (_m *MockBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { + ret := _m.Called(replica) var r0 []SegmentAssignPlan - if rf, ok := ret.Get(0).(func() []SegmentAssignPlan); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(*meta.Replica) []SegmentAssignPlan); ok { + r0 = rf(replica) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]SegmentAssignPlan) @@ -115,8 +115,8 @@ func (_m *MockBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignPlan) { } var r1 []ChannelAssignPlan - if rf, ok := ret.Get(1).(func() []ChannelAssignPlan); ok { - r1 = rf() + if rf, ok := ret.Get(1).(func(*meta.Replica) []ChannelAssignPlan); ok { + r1 = rf(replica) } else { if ret.Get(1) != nil { r1 = ret.Get(1).([]ChannelAssignPlan) @@ -126,24 +126,25 @@ func (_m *MockBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignPlan) { return r0, r1 } -// MockBalancer_Balance_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Balance' -type MockBalancer_Balance_Call struct { +// MockBalancer_BalanceReplica_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BalanceReplica' +type MockBalancer_BalanceReplica_Call struct { *mock.Call } -// Balance is a helper method to define mock.On call -func (_e *MockBalancer_Expecter) Balance() *MockBalancer_Balance_Call { - return &MockBalancer_Balance_Call{Call: _e.mock.On("Balance")} +// BalanceReplica is a helper method to define mock.On call +// - replica *meta.Replica +func (_e *MockBalancer_Expecter) BalanceReplica(replica interface{}) *MockBalancer_BalanceReplica_Call { + return &MockBalancer_BalanceReplica_Call{Call: _e.mock.On("BalanceReplica", replica)} } -func (_c *MockBalancer_Balance_Call) Run(run func()) *MockBalancer_Balance_Call { +func (_c *MockBalancer_BalanceReplica_Call) Run(run func(replica *meta.Replica)) *MockBalancer_BalanceReplica_Call { _c.Call.Run(func(args mock.Arguments) { - run() + run(args[0].(*meta.Replica)) }) return _c } -func (_c *MockBalancer_Balance_Call) Return(_a0 []SegmentAssignPlan, _a1 []ChannelAssignPlan) *MockBalancer_Balance_Call { +func (_c *MockBalancer_BalanceReplica_Call) Return(_a0 []SegmentAssignPlan, _a1 []ChannelAssignPlan) *MockBalancer_BalanceReplica_Call { _c.Call.Return(_a0, _a1) return _c } diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index bf27c0637d..383b42856f 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -17,13 +17,11 @@ package balance import ( - "context" "sort" "github.com/samber/lo" "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/session" 
"github.com/milvus-io/milvus/internal/querycoordv2/task" @@ -85,34 +83,7 @@ func (b *RowCountBasedBalancer) convertToNodeItems(nodeIDs []int64) []*nodeItem return ret } -func (b *RowCountBasedBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignPlan) { - ids := b.meta.CollectionManager.GetAll() - - // loading collection should skip balance - loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool { - return b.meta.CalculateLoadStatus(cid) == querypb.LoadStatus_Loaded - }) - - segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) - for _, cid := range loadedCollections { - replicas := b.meta.ReplicaManager.GetByCollection(cid) - for _, replica := range replicas { - splans, cplans := b.balanceReplica(replica) - if len(splans) > 0 || len(cplans) > 0 { - log.Debug("nodes info in replica", - zap.Int64("collection", replica.CollectionID), - zap.Int64("replica", replica.ID), - zap.Int64s("nodes", replica.GetNodes())) - } - segmentPlans = append(segmentPlans, splans...) - channelPlans = append(channelPlans, cplans...) - } - } - return segmentPlans, channelPlans -} - -func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { - log := log.Ctx(context.Background()).WithRateGroup("qcv2.rowCountBalancer", 1.0, 60.0) +func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { nodes := replica.GetNodes() if len(nodes) < 2 { return nil, nil diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index 53f3ec1d75..d8cd0db3f0 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -387,7 +387,8 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { suite.balancer.nodeManager.Add(nodeInfo) suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, c.nodes[i]) } - segmentPlans, channelPlans := balancer.Balance() + + segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) suite.ElementsMatch(c.expectChannelPlans, channelPlans) suite.ElementsMatch(c.expectPlans, segmentPlans) }) @@ -585,7 +586,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() { suite.balancer.nodeManager.Add(nodeInfo) suite.balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, c.nodes[i]) } - segmentPlans, channelPlans := balancer.Balance() + segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) suite.ElementsMatch(c.expectChannelPlans, channelPlans) suite.ElementsMatch(c.expectPlans, segmentPlans) }) @@ -697,7 +698,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() { suite.NoError(err) err = balancer.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) suite.NoError(err) - segmentPlans, channelPlans := balancer.Balance() + segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) suite.ElementsMatch(c.expectChannelPlans, channelPlans) suite.ElementsMatch(c.expectPlans, segmentPlans) }) @@ -739,7 +740,7 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnLoadingCollection() { for node, s := range c.distributions { balancer.dist.SegmentDistManager.Update(node, s...) 
} - segmentPlans, channelPlans := balancer.Balance() + segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, 1) suite.Empty(channelPlans) suite.ElementsMatch(c.expectPlans, segmentPlans) }) @@ -747,6 +748,18 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnLoadingCollection() { } +func (suite *RowCountBasedBalancerTestSuite) getCollectionBalancePlans(balancer *RowCountBasedBalancer, + collectionID int64) ([]SegmentAssignPlan, []ChannelAssignPlan) { + replicas := balancer.meta.ReplicaManager.GetByCollection(collectionID) + segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) + for _, replica := range replicas { + sPlans, cPlans := balancer.BalanceReplica(replica) + segmentPlans = append(segmentPlans, sPlans...) + channelPlans = append(channelPlans, cPlans...) + } + return segmentPlans, channelPlans +} + func TestRowCountBasedBalancerSuite(t *testing.T) { suite.Run(t, new(RowCountBasedBalancerTestSuite)) } diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 0e450a1c0f..428ae14604 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -23,7 +23,6 @@ import ( "go.uber.org/zap" "golang.org/x/exp/maps" - "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" @@ -34,7 +33,6 @@ import ( type ScoreBasedBalancer struct { *RowCountBasedBalancer - balancedCollectionsCurrentRound typeutil.UniqueSet } func NewScoreBasedBalancer(scheduler task.Scheduler, @@ -43,8 +41,7 @@ func NewScoreBasedBalancer(scheduler task.Scheduler, meta *meta.Meta, targetMgr *meta.TargetManager) *ScoreBasedBalancer { return &ScoreBasedBalancer{ - RowCountBasedBalancer: NewRowCountBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr), - balancedCollectionsCurrentRound: typeutil.NewUniqueSet(), + RowCountBasedBalancer: NewRowCountBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr), } } @@ -109,50 +106,7 @@ func (b *ScoreBasedBalancer) calculatePriority(collectionID, nodeID int64) int { params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()) } -func (b *ScoreBasedBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignPlan) { - ids := b.meta.CollectionManager.GetAll() - - // loading collection should skip balance - loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool { - return b.meta.GetCollection(cid).Status == querypb.LoadStatus_Loaded - }) - - sort.Slice(loadedCollections, func(i, j int) bool { - return loadedCollections[i] < loadedCollections[j] - }) - - segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) - hasUnBalancedCollections := false - for _, cid := range loadedCollections { - if b.balancedCollectionsCurrentRound.Contain(cid) { - log.Debug("ScoreBasedBalancer has balanced collection, skip balancing in this round", - zap.Int64("collectionID", cid)) - continue - } - hasUnBalancedCollections = true - replicas := b.meta.ReplicaManager.GetByCollection(cid) - for _, replica := range replicas { - sPlans, cPlans := b.balanceReplica(replica) - PrintNewBalancePlans(cid, replica.GetID(), sPlans, cPlans) - segmentPlans = append(segmentPlans, sPlans...) - channelPlans = append(channelPlans, cPlans...) 
- } - b.balancedCollectionsCurrentRound.Insert(cid) - if len(segmentPlans) != 0 || len(channelPlans) != 0 { - log.Debug("ScoreBasedBalancer has generated balance plans for", zap.Int64("collectionID", cid)) - break - } - } - if !hasUnBalancedCollections { - b.balancedCollectionsCurrentRound.Clear() - log.Debug("ScoreBasedBalancer has balanced all " + - "collections in one round, clear collectionIDs for this round") - } - - return segmentPlans, channelPlans -} - -func (b *ScoreBasedBalancer) balanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { +func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) { nodes := replica.GetNodes() if len(nodes) == 0 { return nil, nil @@ -209,7 +163,7 @@ func (b *ScoreBasedBalancer) balanceReplica(replica *meta.Replica) ([]SegmentAss return nil, nil } //print current distribution before generating plans - PrintCurrentReplicaDist(replica, stoppingNodesSegments, nodesSegments, b.dist.ChannelDistManager) + segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) if len(stoppingNodesSegments) != 0 { log.Info("Handle stopping nodes", zap.Int64("collection", replica.CollectionID), @@ -219,11 +173,18 @@ func (b *ScoreBasedBalancer) balanceReplica(replica *meta.Replica) ([]SegmentAss zap.Any("available nodes", maps.Keys(nodesSegments)), ) // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score - return b.getStoppedSegmentPlan(replica, nodesSegments, stoppingNodesSegments), b.getStoppedChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments)) + segmentPlans = append(segmentPlans, b.getStoppedSegmentPlan(replica, nodesSegments, stoppingNodesSegments)...) + channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments), lo.Keys(stoppingNodesSegments))...) + } else { + // normal balance, find segments from largest score nodes and transfer to smallest score nodes. + segmentPlans = append(segmentPlans, b.getNormalSegmentPlan(replica, nodesSegments)...) + channelPlans = append(channelPlans, b.genChannelPlan(replica, lo.Keys(nodesSegments), nil)...) + } + if len(segmentPlans) != 0 || len(channelPlans) != 0 { + PrintCurrentReplicaDist(replica, stoppingNodesSegments, nodesSegments, b.dist.ChannelDistManager, b.dist.SegmentDistManager) } - // normal balance, find segments from largest score nodes and transfer to smallest score nodes. - return b.getNormalSegmentPlan(replica, nodesSegments), b.getNormalChannelPlan(replica, lo.Keys(nodesSegments)) + return segmentPlans, channelPlans } func (b *ScoreBasedBalancer) getStoppedSegmentPlan(replica *meta.Replica, nodesSegments map[int64][]*meta.Segment, stoppingNodesSegments map[int64][]*meta.Segment) []SegmentAssignPlan { @@ -269,25 +230,7 @@ func (b *ScoreBasedBalancer) getStoppedSegmentPlan(replica *meta.Replica, nodesS return segmentPlans } -func (b *ScoreBasedBalancer) getStoppedChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan { - channelPlans := make([]ChannelAssignPlan, 0) - for _, nodeID := range offlineNodes { - dmChannels := b.dist.ChannelDistManager.GetByCollectionAndNode(replica.GetCollectionID(), nodeID) - plans := b.AssignChannel(dmChannels, onlineNodes) - for i := range plans { - plans[i].From = nodeID - plans[i].ReplicaID = replica.ID - } - channelPlans = append(channelPlans, plans...) 
- } - return channelPlans -} - func (b *ScoreBasedBalancer) getNormalSegmentPlan(replica *meta.Replica, nodesSegments map[int64][]*meta.Segment) []SegmentAssignPlan { - if b.scheduler.GetSegmentTaskNum() != 0 { - // scheduler is handling segment task, skip - return nil - } segmentPlans := make([]SegmentAssignPlan, 0) // generate candidates @@ -302,6 +245,13 @@ func (b *ScoreBasedBalancer) getNormalSegmentPlan(replica *meta.Replica, nodesSe toNode := nodeItems[0] fromNode := nodeItems[lastIdx] + fromPriority := fromNode.priority + toPriority := toNode.priority + unbalance := float64(fromPriority - toPriority) + if unbalance < float64(toPriority)*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat() { + break + } + // sort the segments in asc order, try to mitigate to-from-unbalance // TODO: segment infos inside dist manager may change in the process of making balance plan fromSegments := b.dist.SegmentDistManager.GetByCollectionAndNode(replica.CollectionID, fromNode.nodeID) @@ -322,9 +272,6 @@ func (b *ScoreBasedBalancer) getNormalSegmentPlan(replica *meta.Replica, nodesSe break } - fromPriority := fromNode.priority - toPriority := toNode.priority - unbalance := fromPriority - toPriority nextFromPriority := fromPriority - int(targetSegmentToMove.GetNumOfRows()) - int(float64(targetSegmentToMove.GetNumOfRows())* params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()) nextToPriority := toPriority + int(targetSegmentToMove.GetNumOfRows()) + int(float64(targetSegmentToMove.GetNumOfRows())* @@ -344,7 +291,7 @@ func (b *ScoreBasedBalancer) getNormalSegmentPlan(replica *meta.Replica, nodesSe //only trigger following balance when the generated reverted balance //is far smaller than the original unbalance nextUnbalance := nextToPriority - nextFromPriority - if int(float64(nextUnbalance)*params.Params.QueryCoordCfg.ScoreUnbalanceTolerationFactor.GetAsFloat()) < unbalance { + if float64(nextUnbalance)*params.Params.QueryCoordCfg.ReverseUnbalanceTolerationFactor.GetAsFloat() < unbalance { plan := SegmentAssignPlan{ ReplicaID: replica.GetID(), From: fromNode.nodeID, @@ -368,8 +315,3 @@ func (b *ScoreBasedBalancer) getNormalSegmentPlan(replica *meta.Replica, nodesSe } return segmentPlans } - -func (b *ScoreBasedBalancer) getNormalChannelPlan(replica *meta.Replica, onlineNodes []int64) []ChannelAssignPlan { - // TODO - return make([]ChannelAssignPlan, 0) -} diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index cb29979446..b512bdd441 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -229,10 +229,9 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { cases := []struct { name string nodes []int64 - notExistedNodes []int64 - collectionIDs []int64 - replicaIDs []int64 - collectionsSegments [][]*datapb.SegmentInfo + collectionID int64 + replicaID int64 + collectionsSegments []*datapb.SegmentInfo states []session.State shouldMock bool distributions map[int64][]*meta.Segment @@ -241,16 +240,12 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { expectChannelPlans []ChannelAssignPlan }{ { - name: "normal balance for one collection only", - nodes: []int64{1, 2}, - collectionIDs: []int64{1}, - replicaIDs: []int64{1}, - collectionsSegments: [][]*datapb.SegmentInfo{ - { - {ID: 1, PartitionID: 1}, - {ID: 2, PartitionID: 1}, - {ID: 3, PartitionID: 1}, - }, + name: "normal balance for one 
collection only", + nodes: []int64{1, 2}, + collectionID: 1, + replicaID: 1, + collectionsSegments: []*datapb.SegmentInfo{ + {ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1}, }, states: []session.State{session.NodeStateNormal, session.NodeStateNormal}, distributions: map[int64][]*meta.Segment{ @@ -266,16 +261,12 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { expectChannelPlans: []ChannelAssignPlan{}, }, { - name: "already balanced for one collection only", - nodes: []int64{1, 2}, - collectionIDs: []int64{1}, - replicaIDs: []int64{1}, - collectionsSegments: [][]*datapb.SegmentInfo{ - { - {ID: 1, PartitionID: 1}, - {ID: 2, PartitionID: 1}, - {ID: 3, PartitionID: 1}, - }, + name: "already balanced for one collection only", + nodes: []int64{1, 2}, + collectionID: 1, + replicaID: 1, + collectionsSegments: []*datapb.SegmentInfo{ + {ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1}, }, states: []session.State{session.NodeStateNormal, session.NodeStateNormal}, distributions: map[int64][]*meta.Segment{ @@ -292,7 +283,6 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { }, } - suite.mockScheduler.EXPECT().GetSegmentTaskNum().Return(0) for _, c := range cases { suite.Run(c.name, func() { suite.SetupSuite() @@ -300,21 +290,16 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { balancer := suite.balancer //1. set up target for multi collections - collections := make([]*meta.Collection, 0, len(c.collectionIDs)) - for i := range c.collectionIDs { - collection := utils.CreateTestCollection(c.collectionIDs[i], int32(c.replicaIDs[i])) - collections = append(collections, collection) - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionIDs[i]).Return( - nil, c.collectionsSegments[i], nil) - balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(c.collectionIDs[i], c.collectionIDs[i]) - balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionIDs[i], c.collectionIDs[i]) - collection.LoadPercentage = 100 - collection.Status = querypb.LoadStatus_Loaded - collection.LoadType = querypb.LoadType_LoadCollection - balancer.meta.CollectionManager.PutCollection(collection) - balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaIDs[i], c.collectionIDs[i], - append(c.nodes, c.notExistedNodes...))) - } + collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID)) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return( + nil, c.collectionsSegments, nil) + suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe() + balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(c.collectionID, c.collectionID) + balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID, c.collectionID) + collection.LoadPercentage = 100 + collection.Status = querypb.LoadStatus_Loaded + balancer.meta.CollectionManager.PutCollection(collection) + balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes)) //2. set up target for distribution for multi collections for node, s := range c.distributions { @@ -334,7 +319,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { } //4. 
balance and verify result - segmentPlans, channelPlans := balancer.Balance() + segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID) suite.ElementsMatch(c.expectChannelPlans, channelPlans) suite.ElementsMatch(c.expectPlans, segmentPlans) }) @@ -407,7 +392,6 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() { defer suite.TearDownTest() balancer := suite.balancer - suite.mockScheduler.EXPECT().GetSegmentTaskNum().Return(0) //1. set up target for multi collections collections := make([]*meta.Collection, 0, len(balanceCase.collectionIDs)) for i := range balanceCase.collectionIDs { @@ -439,7 +423,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() { } //4. first round balance - segmentPlans, _ := balancer.Balance() + segmentPlans, _ := suite.getCollectionBalancePlans(balancer, balanceCase.collectionIDs[0]) suite.ElementsMatch(balanceCase.expectPlans[0], segmentPlans) //5. update segment distribution to simulate balance effect @@ -448,11 +432,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() { } //6. balance again - segmentPlans, _ = balancer.Balance() - suite.ElementsMatch(balanceCase.expectPlans[1], segmentPlans) - - //6. balance one more and finish this round - segmentPlans, _ = balancer.Balance() + segmentPlans, _ = suite.getCollectionBalancePlans(balancer, balanceCase.collectionIDs[1]) suite.ElementsMatch(balanceCase.expectPlans[1], segmentPlans) } @@ -461,10 +441,9 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { name string nodes []int64 outBoundNodes []int64 - notExistedNodes []int64 - collectionIDs []int64 - replicaIDs []int64 - collectionsSegments [][]*datapb.SegmentInfo + collectionID int64 + replicaID int64 + collectionsSegments []*datapb.SegmentInfo states []session.State shouldMock bool distributions map[int64][]*meta.Segment @@ -476,14 +455,10 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { name: "stopped balance for one collection", nodes: []int64{1, 2, 3}, outBoundNodes: []int64{}, - collectionIDs: []int64{1}, - replicaIDs: []int64{1}, - collectionsSegments: [][]*datapb.SegmentInfo{ - { - {ID: 1, PartitionID: 1}, - {ID: 2, PartitionID: 1}, - {ID: 3, PartitionID: 1}, - }, + collectionID: 1, + replicaID: 1, + collectionsSegments: []*datapb.SegmentInfo{ + {ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1}, }, states: []session.State{session.NodeStateStopping, session.NodeStateNormal, session.NodeStateNormal}, distributions: map[int64][]*meta.Segment{ @@ -507,12 +482,10 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { name: "all nodes stopping", nodes: []int64{1, 2, 3}, outBoundNodes: []int64{}, - collectionIDs: []int64{1}, - replicaIDs: []int64{1}, - collectionsSegments: [][]*datapb.SegmentInfo{ - { - {ID: 1}, {ID: 2}, {ID: 3}, - }, + collectionID: 1, + replicaID: 1, + collectionsSegments: []*datapb.SegmentInfo{ + {ID: 1}, {ID: 2}, {ID: 3}, }, states: []session.State{session.NodeStateStopping, session.NodeStateStopping, session.NodeStateStopping}, distributions: map[int64][]*meta.Segment{ @@ -531,12 +504,10 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { name: "all nodes outbound", nodes: []int64{1, 2, 3}, outBoundNodes: []int64{1, 2, 3}, - collectionIDs: []int64{1}, - replicaIDs: []int64{1}, - collectionsSegments: [][]*datapb.SegmentInfo{ - { - {ID: 1}, {ID: 2}, {ID: 3}, - }, + collectionID: 1, + replicaID: 1, + collectionsSegments: []*datapb.SegmentInfo{ + {ID: 1}, {ID: 2}, {ID: 3}, }, 
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}, distributions: map[int64][]*meta.Segment{ @@ -562,21 +533,16 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { balancer := suite.balancer //1. set up target for multi collections - collections := make([]*meta.Collection, 0, len(c.collectionIDs)) - for i := range c.collectionIDs { - collection := utils.CreateTestCollection(c.collectionIDs[i], int32(c.replicaIDs[i])) - collections = append(collections, collection) - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionIDs[i]).Return( - nil, c.collectionsSegments[i], nil) - balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(c.collectionIDs[i], c.collectionIDs[i]) - balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionIDs[i], c.collectionIDs[i]) - collection.LoadPercentage = 100 - collection.Status = querypb.LoadStatus_Loaded - collection.LoadType = querypb.LoadType_LoadCollection - balancer.meta.CollectionManager.PutCollection(collection) - balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaIDs[i], c.collectionIDs[i], - append(c.nodes, c.notExistedNodes...))) - } + collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID)) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return( + nil, c.collectionsSegments, nil) + suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe() + balancer.targetMgr.UpdateCollectionNextTargetWithPartitions(c.collectionID, c.collectionID) + balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID, c.collectionID) + collection.LoadPercentage = 100 + collection.Status = querypb.LoadStatus_Loaded + balancer.meta.CollectionManager.PutCollection(collection) + balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes)) //2. set up target for distribution for multi collections for node, s := range c.distributions { @@ -600,7 +566,7 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { } //4. balance and verify result - segmentPlans, channelPlans := balancer.Balance() + segmentPlans, channelPlans := suite.getCollectionBalancePlans(suite.balancer, c.collectionID) suite.ElementsMatch(c.expectChannelPlans, channelPlans) suite.ElementsMatch(c.expectPlans, segmentPlans) }) @@ -610,3 +576,15 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { func TestScoreBasedBalancerSuite(t *testing.T) { suite.Run(t, new(ScoreBasedBalancerTestSuite)) } + +func (suite *ScoreBasedBalancerTestSuite) getCollectionBalancePlans(balancer *ScoreBasedBalancer, + collectionID int64) ([]SegmentAssignPlan, []ChannelAssignPlan) { + replicas := balancer.meta.ReplicaManager.GetByCollection(collectionID) + segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0) + for _, replica := range replicas { + sPlans, cPlans := balancer.BalanceReplica(replica) + segmentPlans = append(segmentPlans, sPlans...) + channelPlans = append(channelPlans, cPlans...) 
+ } + return segmentPlans, channelPlans +} diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index 77c2a97c38..edec10d503 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -29,7 +29,8 @@ import ( ) const ( - InfoPrefix = "Balance-Info:" + PlanInfoPrefix = "Balance-Plans:" + DistInfoPrefix = "Balance-Dists:" ) func CreateSegmentTasksFromPlans(ctx context.Context, checkerID int64, timeout time.Duration, plans []SegmentAssignPlan) []task.Task { @@ -121,7 +122,7 @@ func CreateChannelTasksFromPlans(ctx context.Context, checkerID int64, timeout t func PrintNewBalancePlans(collectionID int64, replicaID int64, segmentPlans []SegmentAssignPlan, channelPlans []ChannelAssignPlan) { - balanceInfo := fmt.Sprintf("%s{collectionID:%d, replicaID:%d, ", InfoPrefix, collectionID, replicaID) + balanceInfo := fmt.Sprintf("%s new plans:{collectionID:%d, replicaID:%d, ", PlanInfoPrefix, collectionID, replicaID) for _, segmentPlan := range segmentPlans { balanceInfo += segmentPlan.ToString() } @@ -134,8 +135,8 @@ func PrintNewBalancePlans(collectionID int64, replicaID int64, segmentPlans []Se func PrintCurrentReplicaDist(replica *meta.Replica, stoppingNodesSegments map[int64][]*meta.Segment, nodeSegments map[int64][]*meta.Segment, - channelManager *meta.ChannelDistManager) { - distInfo := fmt.Sprintf("%s {collectionID:%d, replicaID:%d, ", InfoPrefix, replica.CollectionID, replica.GetID()) + channelManager *meta.ChannelDistManager, segmentDistMgr *meta.SegmentDistManager) { + distInfo := fmt.Sprintf("%s {collectionID:%d, replicaID:%d, ", DistInfoPrefix, replica.CollectionID, replica.GetID()) //1. print stopping nodes segment distribution distInfo += "[stoppingNodesSegmentDist:" for stoppingNodeID, stoppedSegments := range stoppingNodesSegments { @@ -146,26 +147,31 @@ func PrintCurrentReplicaDist(replica *meta.Replica, } distInfo += "]]" } - distInfo += "]\n" + distInfo += "]" //2. print normal nodes segment distribution distInfo += "[normalNodesSegmentDist:" - for normalNodeID, normalNodeSegments := range nodeSegments { + for normalNodeID, normalNodeCollectionSegments := range nodeSegments { distInfo += fmt.Sprintf("[nodeID:%d, ", normalNodeID) distInfo += "loaded-segments:[" nodeRowSum := int64(0) - for _, normalSegment := range normalNodeSegments { - distInfo += fmt.Sprintf("[segmentID: %d, rowCount: %d] ", - normalSegment.GetID(), normalSegment.GetNumOfRows()) - nodeRowSum += normalSegment.GetNumOfRows() + normalNodeSegments := segmentDistMgr.GetByNode(normalNodeID) + for _, normalNodeSegment := range normalNodeSegments { + nodeRowSum += normalNodeSegment.GetNumOfRows() } - distInfo += fmt.Sprintf("] nodeRowSum:%d]", nodeRowSum) + nodeCollectionRowSum := int64(0) + for _, normalCollectionSegment := range normalNodeCollectionSegments { + distInfo += fmt.Sprintf("[segmentID: %d, rowCount: %d] ", + normalCollectionSegment.GetID(), normalCollectionSegment.GetNumOfRows()) + nodeCollectionRowSum += normalCollectionSegment.GetNumOfRows() + } + distInfo += fmt.Sprintf("] nodeRowSum:%d nodeCollectionRowSum:%d]", nodeRowSum, nodeCollectionRowSum) } - distInfo += "]\n" + distInfo += "]" //3. 
print stopping nodes channel distribution distInfo += "[stoppingNodesChannelDist:" for stoppingNodeID := range stoppingNodesSegments { - stoppingNodeChannels := channelManager.GetByNode(stoppingNodeID) + stoppingNodeChannels := channelManager.GetByCollectionAndNode(replica.GetCollectionID(), stoppingNodeID) distInfo += fmt.Sprintf("[nodeID:%d, count:%d,", stoppingNodeID, len(stoppingNodeChannels)) distInfo += "channels:[" for _, stoppingChan := range stoppingNodeChannels { @@ -173,12 +179,12 @@ func PrintCurrentReplicaDist(replica *meta.Replica, } distInfo += "]]" } - distInfo += "]\n" + distInfo += "]" //4. print normal nodes channel distribution distInfo += "[normalNodesChannelDist:" for normalNodeID := range nodeSegments { - normalNodeChannels := channelManager.GetByNode(normalNodeID) + normalNodeChannels := channelManager.GetByCollectionAndNode(replica.GetCollectionID(), normalNodeID) distInfo += fmt.Sprintf("[nodeID:%d, count:%d,", normalNodeID, len(normalNodeChannels)) distInfo += "channels:[" for _, normalNodeChan := range normalNodeChannels { @@ -186,7 +192,7 @@ func PrintCurrentReplicaDist(replica *meta.Replica, } distInfo += "]]" } - distInfo += "]\n" + distInfo += "]" log.Info(distInfo) } diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index b05dd111b2..2e08b9f879 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -18,22 +18,39 @@ package checkers import ( "context" + "sort" "time" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/balance" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" . "github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/typeutil" + + "github.com/samber/lo" + "go.uber.org/zap" ) // BalanceChecker checks the cluster distribution and generates balance tasks. 
type BalanceChecker struct { baseChecker balance.Balance + meta *meta.Meta + nodeManager *session.NodeManager + normalBalanceCollectionsCurrentRound typeutil.UniqueSet + scheduler task.Scheduler } -func NewBalanceChecker(balancer balance.Balance) *BalanceChecker { +func NewBalanceChecker(meta *meta.Meta, balancer balance.Balance, nodeMgr *session.NodeManager, scheduler task.Scheduler) *BalanceChecker { return &BalanceChecker{ - Balance: balancer, + Balance: balancer, + meta: meta, + nodeManager: nodeMgr, + normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(), + scheduler: scheduler, } } @@ -41,12 +58,90 @@ func (b *BalanceChecker) Description() string { return "BalanceChecker checks the cluster distribution and generates balance tasks" } +func (b *BalanceChecker) replicasToBalance() []int64 { + ids := b.meta.GetAll() + + // all replicas belonging to loading collection will be skipped + loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool { + collection := b.meta.GetCollection(cid) + return collection != nil && b.meta.GetCollection(cid).Status == querypb.LoadStatus_Loaded + }) + sort.Slice(loadedCollections, func(i, j int) bool { + return loadedCollections[i] < loadedCollections[j] + }) + + // balance collections influenced by stopping nodes + stoppingReplicas := make([]int64, 0) + for _, cid := range loadedCollections { + replicas := b.meta.ReplicaManager.GetByCollection(cid) + for _, replica := range replicas { + for _, nodeID := range replica.GetNodes() { + isStopping, _ := b.nodeManager.IsStoppingNode(nodeID) + if isStopping { + stoppingReplicas = append(stoppingReplicas, replica.GetID()) + break + } + } + } + } + //do stopping balance only in this round + if len(stoppingReplicas) > 0 { + return stoppingReplicas + } + + //no stopping balance and auto balance is disabled, return empty collections for balance + if !Params.QueryCoordCfg.AutoBalance.GetAsBool() { + return nil + } + // scheduler is handling segment task, skip + if b.scheduler.GetSegmentTaskNum() != 0 { + return nil + } + + //iterator one normal collection in one round + normalReplicasToBalance := make([]int64, 0) + hasUnBalancedCollections := false + for _, cid := range loadedCollections { + if b.normalBalanceCollectionsCurrentRound.Contain(cid) { + log.Debug("ScoreBasedBalancer has balanced collection, skip balancing in this round", + zap.Int64("collectionID", cid)) + continue + } + hasUnBalancedCollections = true + b.normalBalanceCollectionsCurrentRound.Insert(cid) + for _, replica := range b.meta.ReplicaManager.GetByCollection(cid) { + normalReplicasToBalance = append(normalReplicasToBalance, replica.GetID()) + } + break + } + + if !hasUnBalancedCollections { + b.normalBalanceCollectionsCurrentRound.Clear() + log.Debug("ScoreBasedBalancer has balanced all " + + "collections in one round, clear collectionIDs for this round") + } + return normalReplicasToBalance +} + +func (b *BalanceChecker) balanceReplicas(replicaIDs []int64) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) { + segmentPlans, channelPlans := make([]balance.SegmentAssignPlan, 0), make([]balance.ChannelAssignPlan, 0) + for _, rid := range replicaIDs { + replica := b.meta.ReplicaManager.Get(rid) + sPlans, cPlans := b.Balance.BalanceReplica(replica) + segmentPlans = append(segmentPlans, sPlans...) + channelPlans = append(channelPlans, cPlans...) 
+ if len(segmentPlans) != 0 || len(channelPlans) != 0 { + balance.PrintNewBalancePlans(replica.GetCollectionID(), replica.GetID(), sPlans, cPlans) + } + } + return segmentPlans, channelPlans +} + func (b *BalanceChecker) Check(ctx context.Context) []task.Task { ret := make([]task.Task, 0) - if !Params.QueryCoordCfg.AutoBalance.GetAsBool() { - return ret - } - segmentPlans, channelPlans := b.Balance.Balance() + + replicasToBalance := b.replicasToBalance() + segmentPlans, channelPlans := b.balanceReplicas(replicasToBalance) tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans) task.SetPriority(task.TaskPriorityLow, tasks...) diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go new file mode 100644 index 0000000000..8331baa7c3 --- /dev/null +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -0,0 +1,209 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package checkers + +import ( + "context" + "testing" + + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/balance" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/paramtable" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type BalanceCheckerTestSuite struct { + suite.Suite + kv *etcdkv.EtcdKV + checker *BalanceChecker + balancer *balance.MockBalancer + meta *meta.Meta + broker *meta.MockBroker + nodeMgr *session.NodeManager + scheduler *task.MockScheduler +} + +func (suite *BalanceCheckerTestSuite) SetupSuite() { + Params.Init() +} + +func (suite *BalanceCheckerTestSuite) SetupTest() { + var err error + config := GenerateEtcdConfig() + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd.GetAsBool(), + config.EtcdUseSSL.GetAsBool(), + config.Endpoints.GetAsStrings(), + config.EtcdTLSCert.GetValue(), + config.EtcdTLSKey.GetValue(), + config.EtcdTLSCACert.GetValue(), + config.EtcdTLSMinVersion.GetValue()) + suite.Require().NoError(err) + suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) + + // meta + store := meta.NewMetaStore(suite.kv) + idAllocator := RandomIncrementIDAllocator() + suite.nodeMgr = session.NewNodeManager() + suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) + suite.broker = meta.NewMockBroker(suite.T()) + suite.scheduler = task.NewMockScheduler(suite.T()) + + suite.balancer = balance.NewMockBalancer(suite.T()) + suite.checker = NewBalanceChecker(suite.meta, suite.balancer, suite.nodeMgr, suite.scheduler) +} + +func (suite *BalanceCheckerTestSuite) TearDownTest() { + suite.kv.Close() +} + +func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() { + //set up nodes info + nodeID1, nodeID2 := 1, 2 + suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost")) + suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1)) + suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2)) + + // set collections meta + cid1, replicaID1 := 1, 1 + collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1)) + collection1.Status = querypb.LoadStatus_Loaded + replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)}) + suite.checker.meta.CollectionManager.PutCollection(collection1) + suite.checker.meta.ReplicaManager.Put(replica1) + + cid2, replicaID2 := 2, 2 + collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2)) + collection2.Status = querypb.LoadStatus_Loaded + replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)}) + suite.checker.meta.CollectionManager.PutCollection(collection2) + suite.checker.meta.ReplicaManager.Put(replica2) + + //test disable auto balance + paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "false") + suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int { + return 0 + }) + replicasToBalance := suite.checker.replicasToBalance() + suite.Empty(replicasToBalance) + segPlans, _ := suite.checker.balanceReplicas(replicasToBalance) + suite.Empty(segPlans) + + //test enable auto balance + paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true") + idsToBalance := []int64{int64(replicaID1)} + replicasToBalance = 
suite.checker.replicasToBalance() + suite.ElementsMatch(idsToBalance, replicasToBalance) + //next round + idsToBalance = []int64{int64(replicaID2)} + replicasToBalance = suite.checker.replicasToBalance() + suite.ElementsMatch(idsToBalance, replicasToBalance) + //final round + replicasToBalance = suite.checker.replicasToBalance() + suite.Empty(replicasToBalance) +} + +func (suite *BalanceCheckerTestSuite) TestBusyScheduler() { + //set up nodes info + nodeID1, nodeID2 := 1, 2 + suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost")) + suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1)) + suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2)) + + // set collections meta + cid1, replicaID1 := 1, 1 + collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1)) + collection1.Status = querypb.LoadStatus_Loaded + replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)}) + suite.checker.meta.CollectionManager.PutCollection(collection1) + suite.checker.meta.ReplicaManager.Put(replica1) + + cid2, replicaID2 := 2, 2 + collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2)) + collection2.Status = querypb.LoadStatus_Loaded + replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)}) + suite.checker.meta.CollectionManager.PutCollection(collection2) + suite.checker.meta.ReplicaManager.Put(replica2) + + //test scheduler busy + paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true") + suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int { + return 1 + }) + replicasToBalance := suite.checker.replicasToBalance() + suite.Empty(replicasToBalance) + segPlans, _ := suite.checker.balanceReplicas(replicasToBalance) + suite.Empty(segPlans) +} + +func (suite *BalanceCheckerTestSuite) TestStoppingBalance() { + //set up nodes info, stopping node1 + nodeID1, nodeID2 := 1, 2 + suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID1), "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(int64(nodeID2), "localhost")) + suite.nodeMgr.Stopping(int64(nodeID1)) + suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID1)) + suite.checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, int64(nodeID2)) + + // set collections meta + cid1, replicaID1 := 1, 1 + collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1)) + collection1.Status = querypb.LoadStatus_Loaded + replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)}) + suite.checker.meta.CollectionManager.PutCollection(collection1) + suite.checker.meta.ReplicaManager.Put(replica1) + + cid2, replicaID2 := 2, 2 + collection2 := utils.CreateTestCollection(int64(cid2), int32(replicaID2)) + collection2.Status = querypb.LoadStatus_Loaded + replica2 := utils.CreateTestReplica(int64(replicaID2), int64(cid2), []int64{int64(nodeID1), int64(nodeID2)}) + suite.checker.meta.CollectionManager.PutCollection(collection2) + suite.checker.meta.ReplicaManager.Put(replica2) + + //test stopping balance + idsToBalance := []int64{int64(replicaID1), int64(replicaID2)} + replicasToBalance := suite.checker.replicasToBalance() + suite.ElementsMatch(idsToBalance, replicasToBalance) + + //checker check + segPlans, chanPlans := make([]balance.SegmentAssignPlan, 0), 
make([]balance.ChannelAssignPlan, 0) + mockPlan := balance.SegmentAssignPlan{ + Segment: utils.CreateTestSegment(1, 1, 1, 1, 1, "1"), + ReplicaID: 1, + From: 1, + To: 2, + } + segPlans = append(segPlans, mockPlan) + suite.balancer.EXPECT().BalanceReplica(mock.Anything).Return(segPlans, chanPlans) + tasks := suite.checker.Check(context.TODO()) + suite.Len(tasks, 2) +} + +func TestBalanceCheckerSuite(t *testing.T) { + suite.Run(t, new(BalanceCheckerTestSuite)) +} diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index 90addbf1b9..9df4fc652c 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -62,7 +62,7 @@ func NewCheckerController( checkers := []Checker{ NewChannelChecker(meta, dist, targetMgr, balancer), NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr), - NewBalanceChecker(balancer), + NewBalanceChecker(meta, balancer, nodeMgr, scheduler), } for i, checker := range checkers { checker.SetID(int64(i + 1)) diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index f0b34974da..5819931b65 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -201,7 +201,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { func (ex *Executor) removeTask(task Task, step int) { if task.Err() != nil { - log.Info("excute action done, remove it", + log.Info("execute action done, remove it", zap.Int64("taskID", task.ID()), zap.Int("step", step), zap.Error(task.Err())) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 9aee83dc9e..0529023abc 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -248,7 +248,7 @@ func (scheduler *taskScheduler) Add(task Task) error { return nil } -// check checks whether the task is valid to add, +// check whether the task is valid to add, // must hold lock func (scheduler *taskScheduler) preAdd(task Task) error { switch task := task.(type) { @@ -736,7 +736,7 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error { case ActionTypeReduce: // Do nothing here, - // the task should succeeded if the segment not exists + // the task should succeed if the segment not exists } } return nil @@ -761,7 +761,7 @@ func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error { case ActionTypeReduce: // Do nothing here, - // the task should succeeded if the channel not exists + // the task should succeed if the channel not exists } } return nil diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 6e61e71cbd..c4395e2259 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1109,6 +1109,7 @@ type queryCoordConfig struct { Balancer ParamItem `refreshable:"true"` GlobalRowCountFactor ParamItem `refreshable:"true"` ScoreUnbalanceTolerationFactor ParamItem `refreshable:"true"` + ReverseUnbalanceTolerationFactor ParamItem `refreshable:"true"` OverloadedMemoryThresholdPercentage ParamItem `refreshable:"true"` BalanceIntervalSeconds ParamItem `refreshable:"true"` MemoryUsageMaxDifferencePercentage ParamItem `refreshable:"true"` @@ -1206,13 +1207,23 @@ func (p *queryCoordConfig) init(base *BaseTable) { p.ScoreUnbalanceTolerationFactor = ParamItem{ Key: "queryCoord.scoreUnbalanceTolerationFactor", Version: "2.0.0", - DefaultValue: "1.3", + DefaultValue: 
"0.05", PanicIfEmpty: true, - Doc: "the largest value for unbalanced extent between from and to nodes when doing balance", + Doc: "the least value for unbalanced extent between from and to nodes when doing balance", Export: true, } p.ScoreUnbalanceTolerationFactor.Init(base.mgr) + p.ReverseUnbalanceTolerationFactor = ParamItem{ + Key: "queryCoord.reverseUnBalanceTolerationFactor", + Version: "2.0.0", + DefaultValue: "1.3", + PanicIfEmpty: true, + Doc: "the largest value for unbalanced extent between from and to nodes after doing balance", + Export: true, + } + p.ReverseUnbalanceTolerationFactor.Init(base.mgr) + p.OverloadedMemoryThresholdPercentage = ParamItem{ Key: "queryCoord.overloadedMemoryThresholdPercentage", Version: "2.0.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 9264aaf6c4..f7a5f8da9a 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -272,6 +272,18 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 3000, checkHealthInterval) checkHealthRPCTimeout := Params.CheckHealthRPCTimeout.GetAsInt() assert.Equal(t, 100, checkHealthRPCTimeout) + + assert.Equal(t, 0.1, Params.GlobalRowCountFactor.GetAsFloat()) + params.Save("queryCoord.globalRowCountFactor", "0.4") + assert.Equal(t, 0.4, Params.GlobalRowCountFactor.GetAsFloat()) + + assert.Equal(t, 0.05, Params.ScoreUnbalanceTolerationFactor.GetAsFloat()) + params.Save("queryCoord.scoreUnbalanceTolerationFactor", "0.4") + assert.Equal(t, 0.4, Params.ScoreUnbalanceTolerationFactor.GetAsFloat()) + + assert.Equal(t, 1.3, Params.ReverseUnbalanceTolerationFactor.GetAsFloat()) + params.Save("queryCoord.reverseUnBalanceTolerationFactor", "1.5") + assert.Equal(t, 1.5, Params.ReverseUnbalanceTolerationFactor.GetAsFloat()) }) t.Run("test queryNodeConfig", func(t *testing.T) {