enhance: Limit collection's normal balance speed (#34810)
issue: #34798

After removing task priority on the query coord, load/release segment tasks could be blocked by too many balance tasks. To avoid this, we limit the number of balance tasks generated in each round. At the same time, we reduce the balance interval to trigger balance more frequently.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
parent c5da4fa094
commit 166fc902b0
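The change boils down to two knobs: each round of normal balance now generates at most queryCoord.collectionBalanceSegmentBatchSize plans per collection (default 5), and the default queryCoord.checkBalanceInterval drops from 10000 ms to 3000 ms, so smaller batches are issued more often. A minimal standalone sketch of the capping pattern follows; the types and names here are illustrative stand-ins, not the actual Milvus code:

package main

import "fmt"

// assignPlan is a simplified stand-in for balance.SegmentAssignPlan.
type assignPlan struct{ segmentID int64 }

// capPlans collects at most batchSize plans per round; segments left over
// are handled by later, more frequent balance rounds.
func capPlans(segmentIDs []int64, batchSize int) []assignPlan {
	plans := make([]assignPlan, 0, batchSize)
	for _, id := range segmentIDs {
		plans = append(plans, assignPlan{segmentID: id})
		if len(plans) >= batchSize {
			break
		}
	}
	return plans
}

func main() {
	fmt.Println(capPlans([]int64{1, 2, 3, 4, 5, 6, 7}, 5)) // prints 5 plans
}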
internal/querycoordv2/balance/balance.go

@@ -26,6 +26,7 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )

 type SegmentAssignPlan struct {
@@ -82,6 +83,8 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.
 		delta1, delta2 := b.scheduler.GetSegmentTaskDelta(id1, -1), b.scheduler.GetSegmentTaskDelta(id2, -1)
 		return cnt1+delta1 < cnt2+delta2
 	})
+
+	balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
 	ret := make([]SegmentAssignPlan, 0, len(segments))
 	for i, s := range segments {
 		plan := SegmentAssignPlan{
@@ -90,6 +93,9 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.
 			To: nodesInfo[i%len(nodesInfo)].ID(),
 		}
 		ret = append(ret, plan)
+		if len(ret) > balanceBatchSize {
+			break
+		}
 	}
 	return ret
 }
internal/querycoordv2/balance/balance_test.go

@@ -27,6 +27,7 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
 	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )

 type BalanceTestSuite struct {
@@ -35,6 +36,10 @@ type BalanceTestSuite struct {
 	roundRobinBalancer *RoundRobinBalancer
 }

+func (suite *BalanceTestSuite) SetupSuite() {
+	paramtable.Init()
+}
+
 func (suite *BalanceTestSuite) SetupTest() {
 	nodeManager := session.NewNodeManager()
 	suite.mockScheduler = task.NewMockScheduler(suite.T())
internal/querycoordv2/balance/rowcount_based_balancer.go

@@ -64,6 +64,7 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me
 		return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
 	})

+	balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
 	plans := make([]SegmentAssignPlan, 0, len(segments))
 	for _, s := range segments {
 		// pick the node with the least row count and allocate to it.
@@ -74,6 +75,9 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me
 			Segment: s,
 		}
 		plans = append(plans, plan)
+		if len(plans) > balanceBatchSize {
+			break
+		}
 		// change node's priority and push back
 		p := ni.getPriority()
 		ni.setPriority(p + int(s.GetNumOfRows()))
internal/querycoordv2/balance/score_based_balancer.go

@@ -82,6 +82,7 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
 		return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
 	})

+	balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
 	plans := make([]SegmentAssignPlan, 0, len(segments))
 	for _, s := range segments {
 		func(s *meta.Segment) {
@@ -112,6 +113,10 @@ func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.
 			}
 			targetNode.setPriority(targetNode.getPriority() + priorityChange)
 		}(s)
+
+		if len(plans) > balanceBatchSize {
+			break
+		}
 	}
 	return plans
 }
@@ -295,6 +300,8 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
 		return nil
 	}

+	balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
+
 	// find the segment from the node which has more score than the average
 	segmentsToMove := make([]*meta.Segment, 0)
 	average := totalScore / len(onlineNodes)
@@ -309,6 +316,9 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
 		})
 		for _, s := range segments {
 			segmentsToMove = append(segmentsToMove, s)
+			if len(segmentsToMove) >= balanceBatchSize {
+				break
+			}
 			leftScore -= b.calculateSegmentScore(s)
 			if leftScore <= average {
 				break
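In genSegmentPlan above, the new batch cap is layered on top of the existing score-convergence stop: picking ends either when the batch is full or when the source node's remaining score falls to the cluster average, whichever comes first. A hedged sketch of that dual stop condition, where plain integer scores stand in for the real segment score calculation:

// pickSegments drains candidates until the batch cap is hit or the
// remaining score reaches the average; a sketch, not the Milvus function.
func pickSegments(scores []int, leftScore, average, batchSize int) []int {
	picked := make([]int, 0, batchSize)
	for _, score := range scores {
		picked = append(picked, score)
		if len(picked) >= batchSize {
			break
		}
		leftScore -= score
		if leftScore <= average {
			break
		}
	}
	return picked
}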
internal/querycoordv2/checkers/balance_checker.go

@@ -171,6 +171,11 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task {

 	replicasToBalance := b.replicasToBalance()
 	segmentPlans, channelPlans := b.balanceReplicas(replicasToBalance)
+	// iterate all collection to find a collection to balance
+	for len(segmentPlans) == 0 && len(channelPlans) == 0 && b.normalBalanceCollectionsCurrentRound.Len() > 0 {
+		replicasToBalance := b.replicasToBalance()
+		segmentPlans, channelPlans = b.balanceReplicas(replicasToBalance)
+	}

 	tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans)
 	task.SetPriority(task.TaskPriorityLow, tasks...)
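Since a collection can now yield an empty batch (for instance when it is already balanced), Check keeps pulling collections from the current normal-balance round until some plans appear or the round is exhausted, as the loop above shows. A simplified sketch of that control flow; the plansFor callback is an assumption standing in for balanceReplicas:

// drainRound visits the collections queued for this round and stops at
// the first one that yields plans; a sketch of the loop shape only.
func drainRound(collections []int64, plansFor func(int64) []string) []string {
	var plans []string
	for len(plans) == 0 && len(collections) > 0 {
		plans = plansFor(collections[0])
		collections = collections[1:]
	}
	return plans
}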
pkg/util/paramtable/component_param.go

@@ -1613,8 +1613,9 @@ type queryCoordConfig struct {
 	EnableStoppingBalance      ParamItem `refreshable:"true"`
 	ChannelExclusiveNodeFactor ParamItem `refreshable:"true"`

-	CollectionObserverInterval ParamItem `refreshable:"false"`
-	CheckExecutedFlagInterval  ParamItem `refreshable:"false"`
+	CollectionObserverInterval        ParamItem `refreshable:"false"`
+	CheckExecutedFlagInterval         ParamItem `refreshable:"false"`
+	CollectionBalanceSegmentBatchSize ParamItem `refreshable:"true"`
 }

 func (p *queryCoordConfig) init(base *BaseTable) {
@@ -1867,7 +1868,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
 	p.BalanceCheckInterval = ParamItem{
 		Key:          "queryCoord.checkBalanceInterval",
 		Version:      "2.3.0",
-		DefaultValue: "10000",
+		DefaultValue: "3000",
 		PanicIfEmpty: true,
 		Export:       true,
 	}
@@ -2128,6 +2129,15 @@ func (p *queryCoordConfig) init(base *BaseTable) {
 		Export: false,
 	}
 	p.CheckExecutedFlagInterval.Init(base.mgr)
+
+	p.CollectionBalanceSegmentBatchSize = ParamItem{
+		Key:          "queryCoord.collectionBalanceSegmentBatchSize",
+		Version:      "2.4.7",
+		DefaultValue: "5",
+		Doc:          "the max balance task number for collection at each round",
+		Export:       false,
+	}
+	p.CollectionBalanceSegmentBatchSize.Init(base.mgr)
 }

 // /////////////////////////////////////////////////////////////////////////////
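For reference, the new knob is read through paramtable like any other query coord setting, and since the field is tagged refreshable:"true", an updated value takes effect on a later round without a restart. A minimal accessor sketch (assumes paramtable.Init() has already run; the helper name is hypothetical):

import "github.com/milvus-io/milvus/pkg/util/paramtable"

// collectionBalanceBatchSize returns the per-round cap on balance plans
// per collection (queryCoord.collectionBalanceSegmentBatchSize, default 5).
func collectionBalanceBatchSize() int {
	return paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
}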
pkg/util/paramtable/component_param_test.go

@@ -330,6 +330,7 @@ func TestComponentParam(t *testing.T) {
 		params.Reset("queryCoord.checkExecutedFlagInterval")

 		assert.Equal(t, 0.3, Params.DelegatorMemoryOverloadFactor.GetAsFloat())
+		assert.Equal(t, 5, Params.CollectionBalanceSegmentBatchSize.GetAsInt())
 	})

 	t.Run("test queryNodeConfig", func(t *testing.T) {