mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
enhance: Add trigger interval config for auto balance (#39154)
issue: #39156 Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
f7d9587720
commit
b9e3ec7175
@ -362,6 +362,7 @@ queryCoord:
|
|||||||
checkSegmentInterval: 1000
|
checkSegmentInterval: 1000
|
||||||
checkChannelInterval: 1000
|
checkChannelInterval: 1000
|
||||||
checkBalanceInterval: 3000
|
checkBalanceInterval: 3000
|
||||||
|
autoBalanceInterval: 3000 # the interval for triggering auto balance
|
||||||
checkIndexInterval: 10000
|
checkIndexInterval: 10000
|
||||||
channelTaskTimeout: 60000 # 1 minute
|
channelTaskTimeout: 60000 # 1 minute
|
||||||
segmentTaskTimeout: 120000 # 2 minute
|
segmentTaskTimeout: 120000 # 2 minute
|
||||||
|
|||||||
@ -46,6 +46,9 @@ type BalanceChecker struct {
|
|||||||
scheduler task.Scheduler
|
scheduler task.Scheduler
|
||||||
targetMgr meta.TargetManagerInterface
|
targetMgr meta.TargetManagerInterface
|
||||||
getBalancerFunc GetBalancerFunc
|
getBalancerFunc GetBalancerFunc
|
||||||
|
|
||||||
|
// autoBalanceTs records the time at which auto balance was last triggered
|
||||||
|
autoBalanceTs time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBalanceChecker(meta *meta.Meta,
|
func NewBalanceChecker(meta *meta.Meta,
|
||||||
@ -80,22 +83,12 @@ func (b *BalanceChecker) readyToCheck(ctx context.Context, collectionID int64) b
|
|||||||
return metaExist && targetExist
|
return metaExist && targetExist
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 {
|
func (b *BalanceChecker) getReplicaForStoppingBalance(ctx context.Context) []int64 {
|
||||||
ids := b.meta.GetAll(ctx)
|
ids := b.meta.GetAll(ctx)
|
||||||
|
|
||||||
// all replicas belonging to loading collection will be skipped
|
|
||||||
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
|
|
||||||
collection := b.meta.GetCollection(ctx, cid)
|
|
||||||
return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded
|
|
||||||
})
|
|
||||||
sort.Slice(loadedCollections, func(i, j int) bool {
|
|
||||||
return loadedCollections[i] < loadedCollections[j]
|
|
||||||
})
|
|
||||||
|
|
||||||
if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
|
if paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
|
||||||
// balance collections influenced by stopping nodes
|
// balance collections influenced by stopping nodes
|
||||||
stoppingReplicas := make([]int64, 0)
|
stoppingReplicas := make([]int64, 0)
|
||||||
for _, cid := range loadedCollections {
|
for _, cid := range ids {
|
||||||
// if target and meta isn't ready, skip balance this collection
|
// if target and meta isn't ready, skip balance this collection
|
||||||
if !b.readyToCheck(ctx, cid) {
|
if !b.readyToCheck(ctx, cid) {
|
||||||
continue
|
continue
|
||||||
@ -113,12 +106,27 @@ func (b *BalanceChecker) replicasToBalance(ctx context.Context) []int64 {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BalanceChecker) getReplicaForNormalBalance(ctx context.Context) []int64 {
|
||||||
// 1. no stopping balance and auto balance is disabled, return empty collections for balance
|
// 1. no stopping balance and auto balance is disabled, return empty collections for balance
|
||||||
// 2. when balancer isn't active, skip auto balance
|
// 2. when balancer isn't active, skip auto balance
|
||||||
if !Params.QueryCoordCfg.AutoBalance.GetAsBool() || !b.IsActive() {
|
if !Params.QueryCoordCfg.AutoBalance.GetAsBool() || !b.IsActive() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ids := b.meta.GetAll(ctx)
|
||||||
|
|
||||||
|
// all replicas belonging to loading collection will be skipped
|
||||||
|
loadedCollections := lo.Filter(ids, func(cid int64, _ int) bool {
|
||||||
|
collection := b.meta.GetCollection(ctx, cid)
|
||||||
|
return collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded
|
||||||
|
})
|
||||||
|
sort.Slice(loadedCollections, func(i, j int) bool {
|
||||||
|
return loadedCollections[i] < loadedCollections[j]
|
||||||
|
})
|
||||||
|
|
||||||
// Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections.
|
// Before performing balancing, check the CurrentTarget/LeaderView/Distribution for all collections.
|
||||||
// If any collection has unready info, skip the balance operation to avoid inconsistencies.
|
// If any collection has unready info, skip the balance operation to avoid inconsistencies.
|
||||||
notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool {
|
notReadyCollections := lo.Filter(loadedCollections, func(cid int64, _ int) bool {
|
||||||
@ -173,16 +181,27 @@ func (b *BalanceChecker) balanceReplicas(ctx context.Context, replicaIDs []int64
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
|
func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
|
||||||
ret := make([]task.Task, 0)
|
var segmentPlans []balance.SegmentAssignPlan
|
||||||
|
var channelPlans []balance.ChannelAssignPlan
|
||||||
replicasToBalance := b.replicasToBalance(ctx)
|
stoppingReplicas := b.getReplicaForStoppingBalance(ctx)
|
||||||
segmentPlans, channelPlans := b.balanceReplicas(ctx, replicasToBalance)
|
if len(stoppingReplicas) > 0 {
|
||||||
|
// check for stopping balance first
|
||||||
|
segmentPlans, channelPlans = b.balanceReplicas(ctx, stoppingReplicas)
|
||||||
|
} else {
|
||||||
|
// then check for auto balance
|
||||||
|
if time.Since(b.autoBalanceTs) > paramtable.Get().QueryCoordCfg.AutoBalanceInterval.GetAsDuration(time.Millisecond) {
|
||||||
|
b.autoBalanceTs = time.Now()
|
||||||
|
replicasToBalance := b.getReplicaForNormalBalance(ctx)
|
||||||
|
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
|
||||||
// iterate all collection to find a collection to balance
|
// iterate all collection to find a collection to balance
|
||||||
for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 {
|
for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 {
|
||||||
replicasToBalance := b.replicasToBalance(ctx)
|
replicasToBalance := b.getReplicaForNormalBalance(ctx)
|
||||||
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
|
segmentPlans, channelPlans = b.balanceReplicas(ctx, replicasToBalance)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ret := make([]task.Task, 0)
|
||||||
tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans)
|
tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans)
|
||||||
task.SetPriority(task.TaskPriorityLow, tasks...)
|
task.SetPriority(task.TaskPriorityLow, tasks...)
|
||||||
task.SetReason("segment unbalanced", tasks...)
|
task.SetReason("segment unbalanced", tasks...)
|
||||||
|
|||||||
@ -19,9 +19,11 @@ package checkers
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||||
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
|
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
|
||||||
@ -144,7 +146,7 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
|
|||||||
suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
|
suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
|
||||||
return 0
|
return 0
|
||||||
})
|
})
|
||||||
replicasToBalance := suite.checker.replicasToBalance(ctx)
|
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
|
||||||
suite.Empty(replicasToBalance)
|
suite.Empty(replicasToBalance)
|
||||||
segPlans, _ := suite.checker.balanceReplicas(ctx, replicasToBalance)
|
segPlans, _ := suite.checker.balanceReplicas(ctx, replicasToBalance)
|
||||||
suite.Empty(segPlans)
|
suite.Empty(segPlans)
|
||||||
@ -152,14 +154,14 @@ func (suite *BalanceCheckerTestSuite) TestAutoBalanceConf() {
|
|||||||
// test enable auto balance
|
// test enable auto balance
|
||||||
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
|
paramtable.Get().Save(Params.QueryCoordCfg.AutoBalance.Key, "true")
|
||||||
idsToBalance := []int64{int64(replicaID1)}
|
idsToBalance := []int64{int64(replicaID1)}
|
||||||
replicasToBalance = suite.checker.replicasToBalance(ctx)
|
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
|
||||||
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
||||||
// next round
|
// next round
|
||||||
idsToBalance = []int64{int64(replicaID2)}
|
idsToBalance = []int64{int64(replicaID2)}
|
||||||
replicasToBalance = suite.checker.replicasToBalance(ctx)
|
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
|
||||||
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
||||||
// final round
|
// final round
|
||||||
replicasToBalance = suite.checker.replicasToBalance(ctx)
|
replicasToBalance = suite.checker.getReplicaForNormalBalance(ctx)
|
||||||
suite.Empty(replicasToBalance)
|
suite.Empty(replicasToBalance)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -221,7 +223,7 @@ func (suite *BalanceCheckerTestSuite) TestBusyScheduler() {
|
|||||||
suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
|
suite.scheduler.EXPECT().GetSegmentTaskNum().Maybe().Return(func() int {
|
||||||
return 1
|
return 1
|
||||||
})
|
})
|
||||||
replicasToBalance := suite.checker.replicasToBalance(ctx)
|
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
|
||||||
suite.Len(replicasToBalance, 1)
|
suite.Len(replicasToBalance, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -289,7 +291,7 @@ func (suite *BalanceCheckerTestSuite) TestStoppingBalance() {
|
|||||||
|
|
||||||
// test stopping balance
|
// test stopping balance
|
||||||
idsToBalance := []int64{int64(replicaID1), int64(replicaID2)}
|
idsToBalance := []int64{int64(replicaID1), int64(replicaID2)}
|
||||||
replicasToBalance := suite.checker.replicasToBalance(ctx)
|
replicasToBalance := suite.checker.getReplicaForStoppingBalance(ctx)
|
||||||
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
||||||
|
|
||||||
// checker check
|
// checker check
|
||||||
@ -347,7 +349,7 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
|
|||||||
// test normal balance when one collection has unready target
|
// test normal balance when one collection has unready target
|
||||||
mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true)
|
mockTarget.EXPECT().IsNextTargetExist(mock.Anything, mock.Anything).Return(true)
|
||||||
mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false)
|
mockTarget.EXPECT().IsCurrentTargetReady(mock.Anything, mock.Anything).Return(false)
|
||||||
replicasToBalance := suite.checker.replicasToBalance(ctx)
|
replicasToBalance := suite.checker.getReplicaForNormalBalance(ctx)
|
||||||
suite.Len(replicasToBalance, 0)
|
suite.Len(replicasToBalance, 0)
|
||||||
|
|
||||||
// test stopping balance with target not ready
|
// test stopping balance with target not ready
|
||||||
@ -364,10 +366,80 @@ func (suite *BalanceCheckerTestSuite) TestTargetNotReady() {
|
|||||||
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
|
suite.checker.meta.ReplicaManager.Put(ctx, mr2.IntoReplica())
|
||||||
|
|
||||||
idsToBalance := []int64{int64(replicaID1)}
|
idsToBalance := []int64{int64(replicaID1)}
|
||||||
replicasToBalance = suite.checker.replicasToBalance(ctx)
|
replicasToBalance = suite.checker.getReplicaForStoppingBalance(ctx)
|
||||||
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
suite.ElementsMatch(idsToBalance, replicasToBalance)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *BalanceCheckerTestSuite) TestAutoBalanceInterval() {
|
||||||
|
ctx := context.Background()
|
||||||
|
// set up nodes info
|
||||||
|
nodeID1, nodeID2 := 1, 2
|
||||||
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||||
|
NodeID: int64(nodeID1),
|
||||||
|
Address: "localhost",
|
||||||
|
Hostname: "localhost",
|
||||||
|
}))
|
||||||
|
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
|
||||||
|
NodeID: int64(nodeID2),
|
||||||
|
Address: "localhost",
|
||||||
|
Hostname: "localhost",
|
||||||
|
}))
|
||||||
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID1))
|
||||||
|
suite.checker.meta.ResourceManager.HandleNodeUp(ctx, int64(nodeID2))
|
||||||
|
|
||||||
|
segments := []*datapb.SegmentInfo{
|
||||||
|
{
|
||||||
|
ID: 1,
|
||||||
|
PartitionID: 1,
|
||||||
|
InsertChannel: "test-insert-channel",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: 2,
|
||||||
|
PartitionID: 1,
|
||||||
|
InsertChannel: "test-insert-channel",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
channels := []*datapb.VchannelInfo{
|
||||||
|
{
|
||||||
|
CollectionID: 1,
|
||||||
|
ChannelName: "test-insert-channel",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(channels, segments, nil)
|
||||||
|
|
||||||
|
// set collections meta
|
||||||
|
cid1, replicaID1, partitionID1 := 1, 1, 1
|
||||||
|
collection1 := utils.CreateTestCollection(int64(cid1), int32(replicaID1))
|
||||||
|
collection1.Status = querypb.LoadStatus_Loaded
|
||||||
|
replica1 := utils.CreateTestReplica(int64(replicaID1), int64(cid1), []int64{int64(nodeID1), int64(nodeID2)})
|
||||||
|
partition1 := utils.CreateTestPartition(int64(cid1), int64(partitionID1))
|
||||||
|
suite.checker.meta.CollectionManager.PutCollection(ctx, collection1, partition1)
|
||||||
|
suite.checker.meta.ReplicaManager.Put(ctx, replica1)
|
||||||
|
suite.targetMgr.UpdateCollectionNextTarget(ctx, int64(cid1))
|
||||||
|
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, int64(cid1))
|
||||||
|
|
||||||
|
funcCallCounter := atomic.NewInt64(0)
|
||||||
|
suite.balancer.EXPECT().BalanceReplica(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, r *meta.Replica) ([]balance.SegmentAssignPlan, []balance.ChannelAssignPlan) {
|
||||||
|
funcCallCounter.Inc()
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// first auto balance should be triggered
|
||||||
|
suite.checker.Check(ctx)
|
||||||
|
suite.Equal(funcCallCounter.Load(), int64(1))
|
||||||
|
|
||||||
|
// second auto balance won't be triggered due to autoBalanceInterval == 3s
|
||||||
|
suite.checker.Check(ctx)
|
||||||
|
suite.Equal(funcCallCounter.Load(), int64(1))
|
||||||
|
|
||||||
|
// set autoBalanceInterval == 1, sleep 1s, auto balance should be triggered
|
||||||
|
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key, "1000")
|
||||||
|
paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.AutoBalanceInterval.Key)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
suite.checker.Check(ctx)
|
||||||
|
suite.Equal(funcCallCounter.Load(), int64(1))
|
||||||
|
}
|
||||||
|
|
||||||
func TestBalanceCheckerSuite(t *testing.T) {
|
func TestBalanceCheckerSuite(t *testing.T) {
|
||||||
suite.Run(t, new(BalanceCheckerTestSuite))
|
suite.Run(t, new(BalanceCheckerTestSuite))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1864,6 +1864,7 @@ type queryCoordConfig struct {
|
|||||||
SegmentCheckInterval ParamItem `refreshable:"true"`
|
SegmentCheckInterval ParamItem `refreshable:"true"`
|
||||||
ChannelCheckInterval ParamItem `refreshable:"true"`
|
ChannelCheckInterval ParamItem `refreshable:"true"`
|
||||||
BalanceCheckInterval ParamItem `refreshable:"true"`
|
BalanceCheckInterval ParamItem `refreshable:"true"`
|
||||||
|
AutoBalanceInterval ParamItem `refreshable:"true"`
|
||||||
IndexCheckInterval ParamItem `refreshable:"true"`
|
IndexCheckInterval ParamItem `refreshable:"true"`
|
||||||
ChannelTaskTimeout ParamItem `refreshable:"true"`
|
ChannelTaskTimeout ParamItem `refreshable:"true"`
|
||||||
SegmentTaskTimeout ParamItem `refreshable:"true"`
|
SegmentTaskTimeout ParamItem `refreshable:"true"`
|
||||||
@ -2479,6 +2480,16 @@ If this parameter is set false, Milvus simply searches the growing segments with
|
|||||||
Export: false,
|
Export: false,
|
||||||
}
|
}
|
||||||
p.ClusterLevelLoadResourceGroups.Init(base.mgr)
|
p.ClusterLevelLoadResourceGroups.Init(base.mgr)
|
||||||
|
|
||||||
|
p.AutoBalanceInterval = ParamItem{
|
||||||
|
Key: "queryCoord.autoBalanceInterval",
|
||||||
|
Version: "2.5.3",
|
||||||
|
DefaultValue: "3000",
|
||||||
|
Doc: "the interval for triggerauto balance",
|
||||||
|
PanicIfEmpty: true,
|
||||||
|
Export: true,
|
||||||
|
}
|
||||||
|
p.AutoBalanceInterval.Init(base.mgr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// /////////////////////////////////////////////////////////////////////////////
|
// /////////////////////////////////////////////////////////////////////////////
|
||||||
|
|||||||
@ -379,6 +379,7 @@ func TestComponentParam(t *testing.T) {
|
|||||||
assert.Len(t, Params.ClusterLevelLoadResourceGroups.GetAsStrings(), 0)
|
assert.Len(t, Params.ClusterLevelLoadResourceGroups.GetAsStrings(), 0)
|
||||||
|
|
||||||
assert.Equal(t, 10, Params.CollectionChannelCountFactor.GetAsInt())
|
assert.Equal(t, 10, Params.CollectionChannelCountFactor.GetAsInt())
|
||||||
|
assert.Equal(t, 3000, Params.AutoBalanceInterval.GetAsInt())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test queryNodeConfig", func(t *testing.T) {
|
t.Run("test queryNodeConfig", func(t *testing.T) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user