fix: Address manual balance and balance check issues (#41037)

issue: #37651
- Fix context propagation when creating manual balance segment tasks
(issue originating from PR #38080).
- Optimize stopping balance by skipping collections that were already
checked in the current round, addressing the performance regression from PR #40297.
- Decrease default `checkBalanceInterval` from 3000ms to 300ms.
- Correct minor log messages in `BalanceChecker`.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2025-04-03 15:48:27 +08:00 committed by GitHub
parent afb4621012
commit bf8547578f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 69 additions and 25 deletions

View File

@ -361,7 +361,7 @@ queryCoord:
balanceCostThreshold: 0.001 # the threshold of balance cost, if the difference of cluster's cost after executing the balance plan is less than this value, the plan will not be executed balanceCostThreshold: 0.001 # the threshold of balance cost, if the difference of cluster's cost after executing the balance plan is less than this value, the plan will not be executed
checkSegmentInterval: 1000 checkSegmentInterval: 1000
checkChannelInterval: 1000 checkChannelInterval: 1000
checkBalanceInterval: 3000 checkBalanceInterval: 300
autoBalanceInterval: 3000 # the interval for triggering auto balance autoBalanceInterval: 3000 # the interval for triggering auto balance
checkIndexInterval: 10000 checkIndexInterval: 10000
channelTaskTimeout: 60000 # 1 minute channelTaskTimeout: 60000 # 1 minute

View File

@ -23,6 +23,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle" "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
@ -34,9 +38,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
) )
func TestClusteringCompactionTaskStorageV2Suite(t *testing.T) { func TestClusteringCompactionTaskStorageV2Suite(t *testing.T) {

View File

@ -24,6 +24,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/flushcommon/metacache"
@ -38,9 +42,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
) )
func TestMixCompactionTaskStorageV2Suite(t *testing.T) { func TestMixCompactionTaskStorageV2Suite(t *testing.T) {

View File

@ -43,11 +43,13 @@ type BalanceChecker struct {
*checkerActivation *checkerActivation
meta *meta.Meta meta *meta.Meta
nodeManager *session.NodeManager nodeManager *session.NodeManager
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
scheduler task.Scheduler scheduler task.Scheduler
targetMgr meta.TargetManagerInterface targetMgr meta.TargetManagerInterface
getBalancerFunc GetBalancerFunc getBalancerFunc GetBalancerFunc
normalBalanceCollectionsCurrentRound typeutil.UniqueSet
stoppingBalanceCollectionsCurrentRound typeutil.UniqueSet
// record auto balance ts // record auto balance ts
autoBalanceTs time.Time autoBalanceTs time.Time
} }
@ -64,6 +66,7 @@ func NewBalanceChecker(meta *meta.Meta,
targetMgr: targetMgr, targetMgr: targetMgr,
nodeManager: nodeMgr, nodeManager: nodeMgr,
normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(), normalBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
stoppingBalanceCollectionsCurrentRound: typeutil.NewUniqueSet(),
scheduler: scheduler, scheduler: scheduler,
getBalancerFunc: getBalancerFunc, getBalancerFunc: getBalancerFunc,
} }
@ -91,11 +94,24 @@ func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int
ids = b.sortCollections(ctx, ids) ids = b.sortCollections(ctx, ids)
if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
hasUnbalancedCollection := false
defer func() {
if !hasUnbalancedCollection {
b.stoppingBalanceCollectionsCurrentRound.Clear()
log.RatedDebug(10, "BalanceChecker has triggered stopping balance for all "+
"collections in one round, clear collectionIDs for this round")
}
}()
for _, cid := range ids { for _, cid := range ids {
// if target and meta isn't ready, skip balance this collection // if target and meta isn't ready, skip balance this collection
if !b.readyToCheck(ctx, cid) { if !b.readyToCheck(ctx, cid) {
continue continue
} }
if b.stoppingBalanceCollectionsCurrentRound.Contain(cid) {
log.RatedDebug(10, "BalanceChecker is balancing this collection, skip balancing in this round",
zap.Int64("collectionID", cid))
continue
}
replicas := b.meta.ReplicaManager.GetByCollection(ctx, cid) replicas := b.meta.ReplicaManager.GetByCollection(ctx, cid)
stoppingReplicas := make([]int64, 0) stoppingReplicas := make([]int64, 0)
for _, replica := range replicas { for _, replica := range replicas {
@ -104,6 +120,8 @@ func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int
} }
} }
if len(stoppingReplicas) > 0 { if len(stoppingReplicas) > 0 {
hasUnbalancedCollection = true
b.stoppingBalanceCollectionsCurrentRound.Insert(cid)
return stoppingReplicas return stoppingReplicas
} }
} }
@ -146,7 +164,7 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64
hasUnbalancedCollection := false hasUnbalancedCollection := false
for _, cid := range loadedCollections { for _, cid := range loadedCollections {
if b.normalBalanceCollectionsCurrentRound.Contain(cid) { if b.normalBalanceCollectionsCurrentRound.Contain(cid) {
log.RatedDebug(10, "ScoreBasedBalancer is balancing this collection, skip balancing in this round", log.RatedDebug(10, "BalanceChecker is balancing this collection, skip balancing in this round",
zap.Int64("collectionID", cid)) zap.Int64("collectionID", cid))
continue continue
} }
@ -160,7 +178,7 @@ func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64
if !hasUnbalancedCollection { if !hasUnbalancedCollection {
b.normalBalanceCollectionsCurrentRound.Clear() b.normalBalanceCollectionsCurrentRound.Clear()
log.RatedDebug(10, "ScoreBasedBalancer has balanced all "+ log.RatedDebug(10, "BalanceChecker has triggered normal balance for all "+
"collections in one round, clear collectionIDs for this round") "collections in one round, clear collectionIDs for this round")
} }
return normalReplicasToBalance return normalReplicasToBalance
@ -191,7 +209,7 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
// check for stopping balance first // check for stopping balance first
segmentPlans, channelPlans = b.balanceReplicas(ctx, stoppingReplicas) segmentPlans, channelPlans = b.balanceReplicas(ctx, stoppingReplicas)
// iterate all collection to find a collection to balance // iterate all collection to find a collection to balance
for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 { for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.stoppingBalanceCollectionsCurrentRound.Len() > 0 {
replicasToBalance := b.getReplicaForStoppingBalance(ctx) replicasToBalance := b.getReplicaForStoppingBalance(ctx)
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance) segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
} }

View File

@ -290,9 +290,31 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica()) suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
// test stopping balance // test stopping balance
// First round: check replica1
idsToBalance := []int64{int64(replicaID1)} idsToBalance := []int64{int64(replicaID1)}
replicasToBalance := suite.checker.getReplicaForStoppingBalance(ctx) replicasToBalance := suite.checker.getReplicaForStoppingBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance) suite.ElementsMatch(idsToBalance, replicasToBalance)
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1)))
suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2)))
// Second round: should skip replica1, check replica2
idsToBalance = []int64{int64(replicaID2)}
replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
suite.ElementsMatch(idsToBalance, replicasToBalance)
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1)))
suite.True(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2)))
// Third round: all collections checked, should return nil and clear the set
replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
suite.Empty(replicasToBalance)
suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid1)))
suite.False(suite.checker.stoppingBalanceCollectionsCurrentRound.Contain(int64(cid2)))
// reset meta for Check test
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
mr1 = replica1.CopyForWrite()
mr1.AddRONode(1)
suite.checker.meta.ReplicaManager.Put(ctx, mr1.IntoReplica())
// checker check // checker check
segPlans, chanPlans := make([]balance.SegmentAssignPlan, 0), make([]balance.ChannelAssignPlan, 0) segPlans, chanPlans := make([]balance.SegmentAssignPlan, 0), make([]balance.ChannelAssignPlan, 0)
@ -730,6 +752,7 @@ func (suite *BalanceCheckerTestSuite) TestBalanceTriggerOrder() {
replicas = suite.checker.getReplicaForNormalBalance(ctx) replicas = suite.checker.getReplicaForNormalBalance(ctx)
suite.Contains(replicas, replicaID1, "Should balance collection with lowest ID first") suite.Contains(replicas, replicaID1, "Should balance collection with lowest ID first")
suite.checker.stoppingBalanceCollectionsCurrentRound.Clear()
// Stopping balance should also pick the collection with lowest ID first // Stopping balance should also pick the collection with lowest ID first
replicas = suite.checker.getReplicaForStoppingBalance(ctx) replicas = suite.checker.getReplicaForStoppingBalance(ctx)
suite.Contains(replicas, replicaID1, "Stopping balance should prioritize collection with lowest ID") suite.Contains(replicas, replicaID1, "Stopping balance should prioritize collection with lowest ID")

View File

@ -127,7 +127,7 @@ func (s *Server) balanceSegments(ctx context.Context,
actions = append(actions, releaseAction) actions = append(actions, releaseAction)
} }
t, err := task.NewSegmentTask(ctx, t, err := task.NewSegmentTask(s.ctx,
Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
utils.ManualBalance, utils.ManualBalance,
collectionID, collectionID,

View File

@ -2308,7 +2308,7 @@ If this parameter is set false, Milvus simply searches the growing segments with
p.BalanceCheckInterval = ParamItem{ p.BalanceCheckInterval = ParamItem{
Key: "queryCoord.checkBalanceInterval", Key: "queryCoord.checkBalanceInterval",
Version: "2.3.0", Version: "2.3.0",
DefaultValue: "3000", DefaultValue: "300",
PanicIfEmpty: true, PanicIfEmpty: true,
Export: true, Export: true,
} }

View File

@ -347,6 +347,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 1000, Params.SegmentCheckInterval.GetAsInt()) assert.Equal(t, 1000, Params.SegmentCheckInterval.GetAsInt())
assert.Equal(t, 1000, Params.ChannelCheckInterval.GetAsInt()) assert.Equal(t, 1000, Params.ChannelCheckInterval.GetAsInt())
assert.Equal(t, 300, Params.BalanceCheckInterval.GetAsInt())
params.Save(Params.BalanceCheckInterval.Key, "3000") params.Save(Params.BalanceCheckInterval.Key, "3000")
assert.Equal(t, 3000, Params.BalanceCheckInterval.GetAsInt()) assert.Equal(t, 3000, Params.BalanceCheckInterval.GetAsInt())
assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt()) assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt())