diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 729dd5385f..a4d8b4a293 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -361,7 +361,7 @@ queryCoord: balanceCostThreshold: 0.001 # the threshold of balance cost, if the difference of cluster's cost after executing the balance plan is less than this value, the plan will not be executed checkSegmentInterval: 1000 checkChannelInterval: 1000 - checkBalanceInterval: 3000 + checkBalanceInterval: 300 autoBalanceInterval: 3000 # the interval for triggerauto balance checkIndexInterval: 10000 channelTaskTimeout: 60000 # 1 minute diff --git a/internal/datanode/compactor/clustering_compactor_storage_v2_test.go b/internal/datanode/compactor/clustering_compactor_storage_v2_test.go index 9667448064..dce9db50eb 100644 --- a/internal/datanode/compactor/clustering_compactor_storage_v2_test.go +++ b/internal/datanode/compactor/clustering_compactor_storage_v2_test.go @@ -23,6 +23,10 @@ import ( "testing" "time" + "github.com/samber/lo" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle" @@ -34,9 +38,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" - "github.com/samber/lo" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" ) func TestClusteringCompactionTaskStorageV2Suite(t *testing.T) { diff --git a/internal/datanode/compactor/mix_compactor_storage_v2_test.go b/internal/datanode/compactor/mix_compactor_storage_v2_test.go index 459d3e8616..20382cf618 100644 --- a/internal/datanode/compactor/mix_compactor_storage_v2_test.go +++ b/internal/datanode/compactor/mix_compactor_storage_v2_test.go @@ -24,6 +24,10 @@ import ( "testing" "time" + "github.com/samber/lo" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/flushcommon/metacache" @@ -38,9 +42,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" - "github.com/samber/lo" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" ) func TestMixCompactionTaskStorageV2Suite(t *testing.T) { diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 52d17e2653..ea27820485 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -41,12 +41,14 @@ import ( // BalanceChecker checks the cluster distribution and generates balance tasks. type BalanceChecker struct { *checkerActivation - meta *meta.Meta - nodeManager *session.NodeManager - normalBalanceCollectionsCurrentRound typeutil.UniqueSet - scheduler task.Scheduler - targetMgr meta.TargetManagerInterface - getBalancerFunc GetBalancerFunc + meta *meta.Meta + nodeManager *session.NodeManager + scheduler task.Scheduler + targetMgr meta.TargetManagerInterface + getBalancerFunc GetBalancerFunc + + normalBalanceCollectionsCurrentRound typeutil.UniqueSet + stoppingBalanceCollectionsCurrentRound typeutil.UniqueSet // record auto balance ts autoBalanceTs time.Time @@ -59,13 +61,14 @@ func NewBalanceChecker(meta *meta.Meta, getBalancerFunc GetBalancerFunc, ) *BalanceChecker { return &BalanceChecker{ - checkerActivation: newCheckerActivation(), - meta: meta, - targetMgr: targetMgr, - nodeManager: nodeMgr, - normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(), - scheduler: scheduler, - getBalancerFunc: getBalancerFunc, + checkerActivation: newCheckerActivation(), + meta: meta, + targetMgr: targetMgr, + nodeManager: nodeMgr, + normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(), + stoppingBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(), + scheduler: scheduler, + getBalancerFunc: getBalancerFunc, } } @@ -91,11 +94,24 @@ func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int ids = b.sortCollections(ctx, ids) if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { + hasUnbalancedCollection := false + defer func() { + if !hasUnbalancedCollection { + b.stoppingBalanceCollectionsCurrentRound.Clear() + log.RatedDebug(10, "BalanceChecker has triggered stopping balance for all "+ + "collections in one round, clear collectionIDs for this round") + } + }() for _, cid := range ids { // if target and meta isn't ready, skip balance this collection if !b.readyToCheck(ctx, cid) { continue } + if b.stoppingBalanceCollectionsCurrentRound.Contain(cid) { + log.RatedDebug(10, "BalanceChecker is balancing this collection, skip balancing in this round", + zap.Int64("collectionID", cid)) + continue + } replicas := b.meta.ReplicaManager.GetByCollection(ctx, cid) stoppingReplicas := make([]int64, 0) for _, replica := range replicas { @@ -104,6 +120,8 @@ func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int } } if len(stoppingReplicas) > 0 { + hasUnbalancedCollection = true + b.stoppingBalanceCollectionsCurrentRound.Insert(cid) return stoppingReplicas } } @@ -146,7 +164,7 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 hasUnbalancedCollection := false for _, cid := range loadedCollections { if b.normalBalanceCollectionsCurrentRound.Contain(cid) { - log.RatedDebug(10, "ScoreBasedBalancer is balancing this collection, skip balancing in this round", + log.RatedDebug(10, "BalanceChecker is balancing this collection, skip balancing in this round", zap.Int64("collectionID", cid)) continue } @@ -160,7 +178,7 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 if !hasUnbalancedCollection { b.normalBalanceCollectionsCurrentRound.Clear() - log.RatedDebug(10, "ScoreBasedBalancer has balanced all "+ + log.RatedDebug(10, "BalanceChecker has triggered normal balance for all "+ "collections in one round, clear collectionIDs for this round") } return normalReplicasToBalance @@ -191,7 +209,7 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task { // check for stopping balance first segmentPlans, channelPlans = b.balanceReplicas(ctx, stoppingReplicas) // iterate all collection to find a collection to balance - for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 { + for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.stoppingBalanceCollectionsCurrentRound.Len() > 0 { replicasToBalance := b.getReplicaForStoppingBalance(ctx) segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance) } diff --git a/internal/querycoordv2/checkers/balance_checker_test.go b/internal/querycoordv2/checkers/balance_checker_test.go index 97e76e1152..088d97059e 100644 --- a/internal/querycoordv2/checkers/balance_checker_test.go +++ b/internal/querycoordv2/checkers/balance_checker_test.go @@ -290,9 +290,31 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() { suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica()) // test stopping balance + // First round: check replica1 idsToBalance := []int64{int64(replicaID1)} replicasToBalance := suite.checker.getReplicaForStoppingBalance(ctx) suite.ElementsMatch(idsToBalance, replicasToBalance) + suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1))) + suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2))) + + // Second round: should skip replica1, check replica2 + idsToBalance = []int64{int64(replicaID2)} + replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx) + suite.ElementsMatch(idsToBalance, replicasToBalance) + suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1))) + suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2))) + + // Third round: all collections checked, should return nil and clear the set + replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx) + suite.Empty(replicasToBalance) + suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1))) + suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2))) + + // reset meta for Check test + suite.checker.stoppingBalanceCollectionsCurrentRound.Clear() + mr1 = replica1.CopyForWrite() + mr1.AddRONode(1) + suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica()) // checker check segPlans, chanPlans := make([]balance.SegmentAssignPlan, 0), make([]balance.ChannelAssignPlan, 0) @@ -730,6 +752,7 @@ func (suite *BalanceCheckerTestSuite) TestBalanceTriggerOrder() { replicas = suite.checker.getReplicaForNormalBalance(ctx) suite.Contains(replicas, replicaID1, "Should balance collection with lowest ID first") + suite.checker.stoppingBalanceCollectionsCurrentRound.Clear() // Stopping balance should also pick the collection with lowest ID first replicas = suite.checker.getReplicaForStoppingBalance(ctx) suite.Contains(replicas, replicaID1, "Stopping balance should prioritize collection with lowest ID") diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 70da653358..e13eb273eb 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -127,7 +127,7 @@ func (s *Server) balanceSegments(ctx context.Context, actions = append(actions, releaseAction) } - t, err := task.NewSegmentTask(ctx, + t, err := task.NewSegmentTask(s.ctx, Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), utils.ManualBalance, collectionID, diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 2513718b82..72ecbfaf2b 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2308,7 +2308,7 @@ If this parameter is set false, Milvus simply searches the growing segments with p.BalanceCheckInterval = ParamItem{ Key: "queryCoord.checkBalanceInterval", Version: "2.3.0", - DefaultValue: "3000", + DefaultValue: "300", PanicIfEmpty: true, Export: true, } diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 6d71c17ccd..af51410031 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -347,6 +347,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 1000, Params.SegmentCheckInterval.GetAsInt()) assert.Equal(t, 1000, Params.ChannelCheckInterval.GetAsInt()) + assert.Equal(t, 300, Params.BalanceCheckInterval.GetAsInt()) params.Save(Params.BalanceCheckInterval.Key, "3000") assert.Equal(t, 3000, Params.BalanceCheckInterval.GetAsInt()) assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt())