diff --git a/internal/querycoordv2/balance/multi_target_balance.go b/internal/querycoordv2/balance/multi_target_balance.go
index 82f09d48e6..8ed1973c8c 100644
--- a/internal/querycoordv2/balance/multi_target_balance.go
+++ b/internal/querycoordv2/balance/multi_target_balance.go
@@ -493,6 +493,11 @@ func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAs
 	// print current distribution before generating plans
 	segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
 	if len(offlineNodes) != 0 {
+		if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+			log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", offlineNodes))
+			return nil, nil
+		}
+
 		log.Info("Handle stopping nodes",
 			zap.Any("stopping nodes", offlineNodes),
 			zap.Any("available nodes", onlineNodes),
diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go
index 799a51a3f7..1dfc1aa115 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -179,6 +179,11 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) ([]Segment
 
 	segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
 	if len(offlineNodes) != 0 {
+		if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+			log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", offlineNodes))
+			return nil, nil
+		}
+
 		log.Info("Handle stopping nodes",
 			zap.Any("stopping nodes", offlineNodes),
 			zap.Any("available nodes", onlineNodes),
diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go
index 416092069c..8369728e3d 100644
--- a/internal/querycoordv2/balance/score_based_balancer.go
+++ b/internal/querycoordv2/balance/score_based_balancer.go
@@ -213,6 +213,11 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
 	// print current distribution before generating plans
 	segmentPlans, channelPlans := make([]SegmentAssignPlan, 0), make([]ChannelAssignPlan, 0)
 	if len(offlineNodes) != 0 {
+		if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+			log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", offlineNodes))
+			return nil, nil
+		}
+
 		log.Info("Handle stopping nodes",
 			zap.Any("stopping nodes", offlineNodes),
 			zap.Any("available nodes", onlineNodes),
diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go
index 8b710ecdb3..2aa55a20c8 100644
--- a/internal/querycoordv2/checkers/balance_checker.go
+++ b/internal/querycoordv2/checkers/balance_checker.go
@@ -32,6 +32,7 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
 	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -90,27 +91,29 @@ func (b *BalanceChecker) replicasToBalance() []int64 {
 		return loadedCollections[i] < loadedCollections[j]
 	})
 
-	// balance collections influenced by stopping nodes
-	stoppingReplicas := make([]int64, 0)
-	for _, cid := range loadedCollections {
-		// if target and meta isn't ready, skip balance this collection
-		if !b.readyToCheck(cid) {
-			continue
-		}
-		replicas := b.meta.ReplicaManager.GetByCollection(cid)
-		for _, replica := range replicas {
-			for _, nodeID := range replica.GetNodes() {
-				isStopping, _ := b.nodeManager.IsStoppingNode(nodeID)
-				if isStopping {
-					stoppingReplicas = append(stoppingReplicas, replica.GetID())
-					break
+	if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
+		// balance collections influenced by stopping nodes
+		stoppingReplicas := make([]int64, 0)
+		for _, cid := range loadedCollections {
+			// if target and meta isn't ready, skip balance this collection
+			if !b.readyToCheck(cid) {
+				continue
+			}
+			replicas := b.meta.ReplicaManager.GetByCollection(cid)
+			for _, replica := range replicas {
+				for _, nodeID := range replica.GetNodes() {
+					isStopping, _ := b.nodeManager.IsStoppingNode(nodeID)
+					if isStopping {
+						stoppingReplicas = append(stoppingReplicas, replica.GetID())
+						break
+					}
 				}
 			}
 		}
-	}
 
-	// do stopping balance only in this round
-	if len(stoppingReplicas) > 0 {
-		return stoppingReplicas
+		// do stopping balance only in this round
+		if len(stoppingReplicas) > 0 {
+			return stoppingReplicas
+		}
 	}
 
 	// no stopping balance and auto balance is disabled, return empty collections for balance
diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go
index c629be0817..3b980cc806 100644
--- a/internal/querycoordv2/checkers/segment_checker.go
+++ b/internal/querycoordv2/checkers/segment_checker.go
@@ -105,22 +105,8 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
 }
 
 func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica) []task.Task {
-	log := log.Ctx(ctx).WithRateGroup("qcv2.SegmentChecker", 1, 60).With(
-		zap.Int64("collectionID", replica.CollectionID),
-		zap.Int64("replicaID", replica.ID))
 	ret := make([]task.Task, 0)
 
-	// get channel dist by replica (ch -> node list), cause more then one delegator may exists during channel balance.
-	// if more than one delegator exist, load/release segment may causes chaos, so we can skip it until channel balance finished.
-	dist := c.dist.ChannelDistManager.GetChannelDistByReplica(replica)
-	for ch, nodes := range dist {
-		if len(nodes) > 1 {
-			log.Info("skip check segment due to two shard leader exists",
-				zap.String("channelName", ch))
-			return ret
-		}
-	}
-
 	// compare with targets to find the lack and redundancy of segments
 	lacks, redundancies := c.getSealedSegmentDiff(replica.GetCollectionID(), replica.GetID())
 	// loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan)
diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go
index fe3928ebe9..21bd7e458a 100644
--- a/internal/querycoordv2/checkers/segment_checker_test.go
+++ b/internal/querycoordv2/checkers/segment_checker_test.go
@@ -196,46 +196,6 @@ func (suite *SegmentCheckerTestSuite) TestSkipLoadSegments() {
 	suite.Len(tasks, 0)
 }
 
-func (suite *SegmentCheckerTestSuite) TestSkipCheckReplica() {
-	checker := suite.checker
-	// set meta
-	checker.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
-	checker.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
-	checker.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
-	suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
-	suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
-	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
-	checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)
-
-	// set target
-	segments := []*datapb.SegmentInfo{
-		{
-			ID:            1,
-			PartitionID:   1,
-			InsertChannel: "test-insert-channel",
-		},
-	}
-
-	channels := []*datapb.VchannelInfo{
-		{
-			CollectionID: 1,
-			ChannelName:  "test-insert-channel",
-		},
-	}
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
-		channels, segments, nil)
-	checker.targetMgr.UpdateCollectionNextTarget(int64(1))
-
-	// set dist
-	checker.dist.ChannelDistManager.Update(1, utils.CreateTestChannel(1, 1, 1, "test-insert-channel"))
-	checker.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 2, "test-insert-channel"))
-	checker.dist.SegmentDistManager.Update(2, utils.CreateTestSegment(1, 1, 11, 1, 1, "test-insert-channel"))
-	checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}))
-
-	tasks := checker.Check(context.TODO())
-	suite.Len(tasks, 0)
-}
-
 func (suite *SegmentCheckerTestSuite) TestReleaseSegments() {
 	checker := suite.checker
 	// set meta
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 2de2f87ddf..3609b6b975 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -1467,6 +1467,7 @@ type queryCoordConfig struct {
 	CheckAutoBalanceConfigInterval ParamItem `refreshable:"false"`
 	CheckNodeSessionInterval       ParamItem `refreshable:"false"`
 	GracefulStopTimeout            ParamItem `refreshable:"true"`
+	EnableStoppingBalance          ParamItem `refreshable:"true"`
 }
 
 func (p *queryCoordConfig) init(base *BaseTable) {
@@ -1934,6 +1935,15 @@ func (p *queryCoordConfig) init(base *BaseTable) {
 		Export:       true,
 	}
 	p.GracefulStopTimeout.Init(base.mgr)
+
+	p.EnableStoppingBalance = ParamItem{
+		Key:          "queryCoord.enableStoppingBalance",
+		Version:      "2.3.13",
+		DefaultValue: "true",
+		Doc:          "whether enable stopping balance",
+		Export:       true,
+	}
+	p.EnableStoppingBalance.Init(base.mgr)
 }
 
 // /////////////////////////////////////////////////////////////////////////////
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 17d68f8325..468a578c2e 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -294,6 +294,7 @@ func TestComponentParam(t *testing.T) {
 
 		params.Save("queryCoord.gracefulStopTimeout", "100")
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
+		assert.Equal(t, true, Params.EnableStoppingBalance.GetAsBool())
 	})
 
 	t.Run("test queryNodeConfig", func(t *testing.T) {
diff --git a/tests/integration/balance/balance_test.go b/tests/integration/balance/balance_test.go
index c93b9f4531..893e4644a6 100644
--- a/tests/integration/balance/balance_test.go
+++ b/tests/integration/balance/balance_test.go
@@ -233,6 +233,86 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
 	}, 10*time.Second, 1*time.Second)
 }
 
+func (s *BalanceTestSuit) TestNodeDown() {
+	ctx := context.Background()
+
+	// disable compact
+	s.Cluster.DataCoord.GcControl(ctx, &datapb.GcControlRequest{
+		Base:    commonpbutil.NewMsgBase(),
+		Command: datapb.GcCommand_Pause,
+		Params: []*commonpb.KeyValuePair{
+			{Key: "duration", Value: "3600"},
+		},
+	})
+	defer s.Cluster.DataCoord.GcControl(ctx, &datapb.GcControlRequest{
+		Base:    commonpbutil.NewMsgBase(),
+		Command: datapb.GcCommand_Resume,
+	})
+
+	// disable balance channel
+	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceChannel.Key, "false")
+	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.EnableStoppingBalance.Key, "false")
+
+	// init collection with 3 channel, each channel has 15 segment, each segment has 2000 row
+	// and load it with 2 replicas on 2 nodes.
+	name := "test_balance_" + funcutil.GenRandomStr()
+	s.initCollection(name, 1, 2, 15, 2000)
+
+	// then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments
+	qn1 := s.Cluster.AddQueryNode()
+	qn2 := s.Cluster.AddQueryNode()
+
+	// check segment num on new query node
+	s.Eventually(func() bool {
+		resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
+		s.NoError(err)
+		s.True(merr.Ok(resp.GetStatus()))
+		log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
+		return len(resp.Channels) == 0 && len(resp.Segments) == 10
+	}, 30*time.Second, 1*time.Second)
+
+	s.Eventually(func() bool {
+		resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
+		s.NoError(err)
+		s.True(merr.Ok(resp.GetStatus()))
+		log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
+		return len(resp.Channels) == 0 && len(resp.Segments) == 10
+	}, 30*time.Second, 1*time.Second)
+
+	// then we force stop qn1 and resume balance channel, let balance channel and load segment happens concurrently on qn2
+	paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.AutoBalanceChannel.Key)
+	time.Sleep(1 * time.Second)
+	qn1.Stop()
+
+	info, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
+		Base:           commonpbutil.NewMsgBase(),
+		CollectionName: name,
+	})
+	s.NoError(err)
+	s.True(merr.Ok(info.GetStatus()))
+	collectionID := info.GetCollectionID()
+
+	// expected channel and segment concurrent move to qn2
+	s.Eventually(func() bool {
+		resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
+		s.NoError(err)
+		s.True(merr.Ok(resp.GetStatus()))
+		log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments))
+		return len(resp.Channels) == 1 && len(resp.Segments) == 15
+	}, 30*time.Second, 1*time.Second)
+
+	// expect all delegator will recover to healthy
+	s.Eventually(func() bool {
+		resp, err := s.Cluster.QueryCoord.GetShardLeaders(ctx, &querypb.GetShardLeadersRequest{
+			Base:         commonpbutil.NewMsgBase(),
+			CollectionID: collectionID,
+		})
+		s.NoError(err)
+		s.True(merr.Ok(resp.GetStatus()))
+		return len(resp.Shards) == 2
+	}, 30*time.Second, 1*time.Second)
+}
+
 func TestBalance(t *testing.T) {
 	suite.Run(t, new(BalanceTestSuit))
 }
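
Reviewer note (not part of the patch): the new `queryCoord.enableStoppingBalance` item defaults to "true" and is declared refreshable, so it can be flipped at runtime through paramtable without restarting QueryCoord, using the same Save/Reset pattern the integration test above relies on. A minimal sketch under that assumption; the standalone main wrapper is illustrative only.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	// Initialize the global param table (normally done by the Milvus component itself).
	paramtable.Init()
	pt := paramtable.Get()

	// DefaultValue is "true", so stopping balance is enabled out of the box.
	fmt.Println(pt.QueryCoordCfg.EnableStoppingBalance.GetAsBool()) // true

	// Temporarily disable stopping balance, e.g. while query nodes are taken down on purpose;
	// the balancers and BalanceChecker then skip generating plans for stopping nodes.
	pt.Save(pt.QueryCoordCfg.EnableStoppingBalance.Key, "false")
	fmt.Println(pt.QueryCoordCfg.EnableStoppingBalance.GetAsBool()) // false

	// Restore the default behavior.
	pt.Reset(pt.QueryCoordCfg.EnableStoppingBalance.Key)
}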