From 0c200ff78114b11b96787c4974283f0d04973716 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 7 Jan 2026 15:47:25 +0800 Subject: [PATCH] enhance:Limit the number of concurrent vector index builds per worker (#46773) issue: #46772 --------- Signed-off-by: Cai Zhang --- internal/datanode/index/pool.go | 65 +++++++++++++++++++++ internal/datanode/index/pool_test.go | 64 ++++++++++++++++++++ internal/datanode/index/scheduler.go | 22 ++++--- internal/datanode/index/scheduler_test.go | 4 ++ internal/datanode/index/task.go | 1 + internal/datanode/index/task_analyze.go | 4 ++ internal/datanode/index/task_index.go | 5 ++ internal/datanode/index/task_stats.go | 4 ++ internal/datanode/index/util.go | 9 +++ pkg/util/paramtable/component_param.go | 11 +++- pkg/util/paramtable/component_param_test.go | 2 + 11 files changed, 178 insertions(+), 13 deletions(-) create mode 100644 internal/datanode/index/pool.go create mode 100644 internal/datanode/index/pool_test.go diff --git a/internal/datanode/index/pool.go b/internal/datanode/index/pool.go new file mode 100644 index 0000000000..dc4288dd7e --- /dev/null +++ b/internal/datanode/index/pool.go @@ -0,0 +1,65 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package index
+
+import (
+	"context"
+	"sync"
+
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/pkg/v2/config"
+	"github.com/milvus-io/milvus/pkg/v2/log"
+	"github.com/milvus-io/milvus/pkg/v2/util/conc"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
+)
+
+var (
+	vecIndexBuildPool         *conc.Pool[any] // pool that vector index builds are submitted to
+	vecIndexBuildPoolInitOnce sync.Once       // guards the one-time initVecIndexBuildPool call
+)
+
+// initVecIndexBuildPool creates the pool sized by MaxVecIndexBuildConcurrency and watches that key for updates.
+func initVecIndexBuildPool() {
+	params := paramtable.Get()
+	size := params.DataNodeCfg.MaxVecIndexBuildConcurrency.GetAsInt()
+	vecIndexBuildPool = conc.NewPool[any](size)
+
+	key := params.DataNodeCfg.MaxVecIndexBuildConcurrency.Key
+	params.Watch(key, config.NewHandler(key, resizeVecIndexBuildPool))
+	log.Info("init vector index building pool done", zap.Int("size", size))
+}
+
+// resizeVecIndexBuildPool applies the currently configured size to the pool on an update event and logs the result.
+func resizeVecIndexBuildPool(evt *config.Event) {
+	if !evt.HasUpdated {
+		return
+	}
+	size := paramtable.Get().DataNodeCfg.MaxVecIndexBuildConcurrency.GetAsInt()
+	logger := log.Ctx(context.Background()).With(zap.Int("newSize", size))
+	if err := GetVecIndexBuildPool().Resize(size); err != nil {
+		logger.Warn("failed to resize pool", zap.Error(err))
+		return
+	}
+	logger.Info("vector index building pool resize successfully")
+}
+
+// GetVecIndexBuildPool returns the shared vector index build pool, initializing it on first use.
+func GetVecIndexBuildPool() *conc.Pool[any] {
+	vecIndexBuildPoolInitOnce.Do(initVecIndexBuildPool)
+	return vecIndexBuildPool
+}
diff --git a/internal/datanode/index/pool_test.go b/internal/datanode/index/pool_test.go
new file mode 100644
index 0000000000..1ab49aa00c
--- /dev/null
+++ b/internal/datanode/index/pool_test.go
@@ -0,0 +1,64 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package index
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/pkg/v2/config"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
+)
+
+// TestResizePools checks that the vector index build pool follows valid
+// updates of MaxVecIndexBuildConcurrency and keeps its size on invalid ones.
+func TestResizePools(t *testing.T) {
+	paramtable.Get().Init(paramtable.NewBaseTable(paramtable.SkipRemote(true)))
+	params := paramtable.Get()
+	concurrencyKey := params.DataNodeCfg.MaxVecIndexBuildConcurrency.Key
+	defer func() { _ = params.Reset(concurrencyKey) }()
+
+	// applyConfig stores raw under the concurrency key and delivers the update event.
+	applyConfig := func(raw string) {
+		_ = params.Save(concurrencyKey, raw)
+		resizeVecIndexBuildPool(&config.Event{HasUpdated: true})
+	}
+
+	t.Run("GetVecIndexBuildPool", func(t *testing.T) {
+		want := params.DataNodeCfg.MaxVecIndexBuildConcurrency.GetAsInt()
+		assert.Equal(t, want, GetVecIndexBuildPool().Cap())
+
+		// An update event without a config change keeps the capacity.
+		resizeVecIndexBuildPool(&config.Event{HasUpdated: true})
+		assert.Equal(t, want, GetVecIndexBuildPool().Cap())
+
+		// Doubling the configured value resizes the pool accordingly.
+		applyConfig(fmt.Sprintf("%d", want*2))
+		want = params.DataNodeCfg.MaxVecIndexBuildConcurrency.GetAsInt()
+		assert.Equal(t, want, GetVecIndexBuildPool().Cap())
+
+		// Zero is not an acceptable size, so the capacity stays as it was.
+		applyConfig("0")
+		assert.Equal(t, want, GetVecIndexBuildPool().Cap(), "pool shall not be resized when newSize is 0")
+
+		// A non-numeric value must leave the capacity untouched as well.
+		applyConfig("invalid")
+		assert.Equal(t, want, GetVecIndexBuildPool().Cap())
+	})
+}
diff --git a/internal/datanode/index/scheduler.go b/internal/datanode/index/scheduler.go
index ce175c8764..7d48973efe 100644
--- a/internal/datanode/index/scheduler.go
+++ b/internal/datanode/index/scheduler.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"runtime/debug"
 	"sync"
-	"time"
 
 	"github.com/cockroachdb/errors"
 	"go.uber.org/atomic"
@@ -227,7 +226,7 @@ func getStateFromError(err error) indexpb.JobState {
 	return indexpb.JobState_JobStateRetry
 }
 
-func (sched *TaskScheduler) processTask(t Task, q TaskQueue) {
+func (sched *TaskScheduler) processTask(t Task) {
 	wrap := func(fn func(ctx context.Context) error) error {
 		select {
 		case <-t.Ctx().Done():
@@ -264,17 +263,16 @@ func (sched *TaskScheduler) indexBuildLoop() {
 			return
 		case <-sched.TaskQueue.utChan():
 			t := sched.TaskQueue.PopUnissuedTask()
-			for {
-				totalSlot := CalculateNodeSlots()
-				availableSlot := totalSlot - sched.TaskQueue.GetActiveSlot()
-				if availableSlot >= t.GetSlot() || totalSlot == availableSlot {
-					go func(t Task) {
-						sched.processTask(t, sched.TaskQueue)
-					}(t)
-					break
+			go func(t Task) { // each popped task is handled in its own goroutine
+				if t.IsVectorIndex() { // vector index builds are submitted to the shared vector build pool
+					GetVecIndexBuildPool().Submit(func() (any, error) {
+						sched.processTask(t)
+						return nil, nil // the value returned by Submit is discarded, so nothing is reported here
+					})
+				} else { // every other task is processed right away in this goroutine
+					sched.processTask(t)
 				}
-				time.Sleep(time.Millisecond * 50)
-			}
+			}(t)
 		}
 	}
 }
diff --git a/internal/datanode/index/scheduler_test.go b/internal/datanode/index/scheduler_test.go
index 501484bc24..dd2c051545 100644
--- a/internal/datanode/index/scheduler_test.go
+++ b/internal/datanode/index/scheduler_test.go
@@ -152,6 +152,10 @@ func (t *fakeTask) GetState() indexpb.JobState {
 	return t.retstate
 }
 
+func (t *fakeTask) IsVectorIndex() bool {
+	return false // the fake task is never reported as a vector index build
+}
+
 var (
 	idLock sync.Mutex
 	id     = 0
diff --git a/internal/datanode/index/task.go b/internal/datanode/index/task.go
index 6f96cc339b..6d82006780 100644
--- a/internal/datanode/index/task.go
+++ b/internal/datanode/index/task.go
@@ -46,4 +46,5 @@ type Task interface {
 	PostExecute(context.Context) error
 	Reset()
 	GetSlot() int64
+	IsVectorIndex() bool // whether the task is a vector index build; the scheduler picks the execution path from it
 }
diff --git a/internal/datanode/index/task_analyze.go
b/internal/datanode/index/task_analyze.go
index 259fe6a1ab..55e7ec658e 100644
--- a/internal/datanode/index/task_analyze.go
+++ b/internal/datanode/index/task_analyze.go
@@ -75,6 +75,10 @@ func (at *analyzeTask) GetSlot() int64 {
 	return at.req.GetTaskSlot()
 }
 
+func (at *analyzeTask) IsVectorIndex() bool {
+	return false // analyze tasks are never reported as vector index builds
+}
+
 func (at *analyzeTask) PreExecute(ctx context.Context) error {
 	at.queueDur = at.tr.RecordSpan()
 	log := log.Ctx(ctx).With(zap.String("clusterID", at.req.GetClusterID()),
diff --git a/internal/datanode/index/task_index.go b/internal/datanode/index/task_index.go
index 098170ff7a..39f0a01ac8 100644
--- a/internal/datanode/index/task_index.go
+++ b/internal/datanode/index/task_index.go
@@ -143,6 +143,11 @@ func (it *indexBuildTask) GetSlot() int64 {
 	return it.req.GetTaskSlot()
 }
 
+func (it *indexBuildTask) IsVectorIndex() bool {
+	indexType := GetIndexType(it.req.GetIndexParams()) // index type named in the request's index params ("" if absent)
+	return vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) // the vector index manager classifies the type
+}
+
 func (it *indexBuildTask) PreExecute(ctx context.Context) error {
 	it.queueDur = it.tr.RecordSpan()
 	log.Ctx(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.req.GetBuildID()),
diff --git a/internal/datanode/index/task_stats.go b/internal/datanode/index/task_stats.go
index 8259edc0b3..c3cf8de1df 100644
--- a/internal/datanode/index/task_stats.go
+++ b/internal/datanode/index/task_stats.go
@@ -136,6 +136,10 @@ func (st *statsTask) GetSlot() int64 {
 	return st.req.GetTaskSlot()
 }
 
+func (st *statsTask) IsVectorIndex() bool {
+	return false // stats tasks are never reported as vector index builds
+}
+
 func (st *statsTask) PreExecute(ctx context.Context) error {
 	ctx, span := otel.Tracer(typeutil.IndexNodeRole).Start(ctx, fmt.Sprintf("Stats-PreExecute-%s-%d", st.req.GetClusterID(), st.req.GetTaskID()))
 	defer span.End()
diff --git a/internal/datanode/index/util.go b/internal/datanode/index/util.go
index bc64dc8a21..59c32e6a15 100644
--- a/internal/datanode/index/util.go
+++ b/internal/datanode/index/util.go
@@ -96,3 +96,12 @@ func
CalculateNodeSlots() int64 {
 	}
 	return totalSlot
 }
+
+func GetIndexType(indexParams []*commonpb.KeyValuePair) string {
+	for _, param := range indexParams { // the first entry keyed by common.IndexTypeKey wins
+		if param.GetKey() == common.IndexTypeKey { // generated getters tolerate a nil entry, unlike direct field access
+			return param.GetValue()
+		}
+	}
+	return "" // no index-type entry present
+}
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index d356063d6a..24aa99a2cb 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -6028,7 +6028,8 @@ type dataNodeConfig struct {
 	DeltalogFormat ParamItem `refreshable:"false"`
 
 	// index services config
-	BuildParallel ParamItem `refreshable:"false"`
+	BuildParallel               ParamItem `refreshable:"false"`
+	MaxVecIndexBuildConcurrency ParamItem `refreshable:"true"` // size of the datanode's vector index build pool
 
 	WorkerSlotUnit      ParamItem `refreshable:"true"`
 	StandaloneSlotRatio ParamItem `refreshable:"false"`
@@ -6469,6 +6470,14 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.BuildParallel.Init(base.mgr)
 
+	p.MaxVecIndexBuildConcurrency = ParamItem{
+		Key:          "dataNode.index.maxVecIndexBuildConcurrency",
+		Version:      "2.6.9",
+		DefaultValue: "4",
+		Export:       false,
+	}
+	p.MaxVecIndexBuildConcurrency.Init(base.mgr)
+
 	p.WorkerSlotUnit = ParamItem{
 		Key:          "dataNode.workerSlotUnit",
 		Version:      "2.5.7",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 86fefd5251..a535cfb698 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -669,6 +669,8 @@ func TestComponentParam(t *testing.T) {
 		// compaction
 		assert.Equal(t, 10, Params.MaxCompactionConcurrency.GetAsInt())
 
+		assert.Equal(t, 4, Params.MaxVecIndexBuildConcurrency.GetAsInt())
+
 		// clustering compaction
 		params.Save("datanode.clusteringCompaction.memoryBufferRatio", "0.1")
 		assert.Equal(t, 0.1, Params.ClusteringCompactionMemoryBufferRatio.GetAsFloat())