mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: add ddl and dcl concurrency to avoid competition (#37672)
issue: #37166 Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
65d3c6622a
commit
81fa7dd52c
@ -286,6 +286,8 @@ proxy:
|
||||
ginLogging: true
|
||||
ginLogSkipPaths: / # skip url path for gin log
|
||||
maxTaskNum: 1024 # The maximum number of tasks in the task queue of the proxy.
|
||||
ddlConcurrency: 16 # The concurrent execution number of DDL at proxy.
|
||||
dclConcurrency: 16 # The concurrent execution number of DCL at proxy.
|
||||
mustUsePartitionKey: false # switch for whether proxy must use partition key for the collection
|
||||
accessLog:
|
||||
enable: false # Whether to enable the access log feature.
|
||||
|
||||
@ -493,6 +493,8 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) {
|
||||
// definitionLoop schedules the ddl tasks.
|
||||
func (sched *taskScheduler) definitionLoop() {
|
||||
defer sched.wg.Done()
|
||||
|
||||
pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.DDLConcurrency.GetAsInt(), conc.WithExpiryDuration(time.Minute))
|
||||
for {
|
||||
select {
|
||||
case <-sched.ctx.Done():
|
||||
@ -500,7 +502,10 @@ func (sched *taskScheduler) definitionLoop() {
|
||||
case <-sched.ddQueue.utChan():
|
||||
if !sched.ddQueue.utEmpty() {
|
||||
t := sched.scheduleDdTask()
|
||||
sched.processTask(t, sched.ddQueue)
|
||||
pool.Submit(func() (struct{}, error) {
|
||||
sched.processTask(t, sched.ddQueue)
|
||||
return struct{}{}, nil
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -509,6 +514,8 @@ func (sched *taskScheduler) definitionLoop() {
|
||||
// controlLoop schedule the data control operation, such as flush
|
||||
func (sched *taskScheduler) controlLoop() {
|
||||
defer sched.wg.Done()
|
||||
|
||||
pool := conc.NewPool[struct{}](paramtable.Get().ProxyCfg.DCLConcurrency.GetAsInt(), conc.WithExpiryDuration(time.Minute))
|
||||
for {
|
||||
select {
|
||||
case <-sched.ctx.Done():
|
||||
@ -516,7 +523,10 @@ func (sched *taskScheduler) controlLoop() {
|
||||
case <-sched.dcQueue.utChan():
|
||||
if !sched.dcQueue.utEmpty() {
|
||||
t := sched.scheduleDcTask()
|
||||
sched.processTask(t, sched.dcQueue)
|
||||
pool.Submit(func() (struct{}, error) {
|
||||
sched.processTask(t, sched.dcQueue)
|
||||
return struct{}{}, nil
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1241,6 +1241,8 @@ type proxyConfig struct {
|
||||
MaxUserNum ParamItem `refreshable:"true"`
|
||||
MaxRoleNum ParamItem `refreshable:"true"`
|
||||
MaxTaskNum ParamItem `refreshable:"false"`
|
||||
DDLConcurrency ParamItem `refreshable:"true"`
|
||||
DCLConcurrency ParamItem `refreshable:"true"`
|
||||
ShardLeaderCacheInterval ParamItem `refreshable:"false"`
|
||||
ReplicaSelectionPolicy ParamItem `refreshable:"false"`
|
||||
CheckQueryNodeHealthInterval ParamItem `refreshable:"false"`
|
||||
@ -1387,6 +1389,24 @@ func (p *proxyConfig) init(base *BaseTable) {
|
||||
}
|
||||
p.MaxTaskNum.Init(base.mgr)
|
||||
|
||||
p.DDLConcurrency = ParamItem{
|
||||
Key: "proxy.ddlConcurrency",
|
||||
Version: "2.5.0",
|
||||
DefaultValue: "16",
|
||||
Doc: "The concurrent execution number of DDL at proxy.",
|
||||
Export: true,
|
||||
}
|
||||
p.DDLConcurrency.Init(base.mgr)
|
||||
|
||||
p.DCLConcurrency = ParamItem{
|
||||
Key: "proxy.dclConcurrency",
|
||||
Version: "2.5.0",
|
||||
DefaultValue: "16",
|
||||
Doc: "The concurrent execution number of DCL at proxy.",
|
||||
Export: true,
|
||||
}
|
||||
p.DCLConcurrency.Init(base.mgr)
|
||||
|
||||
p.GinLogging = ParamItem{
|
||||
Key: "proxy.ginLogging",
|
||||
Version: "2.2.0",
|
||||
|
||||
@ -211,6 +211,9 @@ func TestComponentParam(t *testing.T) {
|
||||
|
||||
assert.Equal(t, int64(10), Params.CheckWorkloadRequestNum.GetAsInt64())
|
||||
assert.Equal(t, float64(0.1), Params.WorkloadToleranceFactor.GetAsFloat())
|
||||
|
||||
assert.Equal(t, int64(16), Params.DDLConcurrency.GetAsInt64())
|
||||
assert.Equal(t, int64(16), Params.DCLConcurrency.GetAsInt64())
|
||||
})
|
||||
|
||||
// t.Run("test proxyConfig panic", func(t *testing.T) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user