From d7b29063184e085f81e612919cf6180beffe5b05 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Fri, 25 Oct 2024 16:51:29 +0800
Subject: [PATCH] enhance: Make dataNode.import.maxConcurrentTaskNum dynamic (#37102)

Resize the import execution pool when the config
`dataNode.import.maxConcurrentTaskNum` is updated.

issue: https://github.com/milvus-io/milvus/issues/37095

Signed-off-by: bigsheeper
---
 internal/datanode/importv2/pool.go      | 39 ++++++++++++++-
 internal/datanode/importv2/pool_test.go | 63 +++++++++++++++++++++++++
 2 files changed, 100 insertions(+), 2 deletions(-)
 create mode 100644 internal/datanode/importv2/pool_test.go

diff --git a/internal/datanode/importv2/pool.go b/internal/datanode/importv2/pool.go
index 3558477773..121aae28d8 100644
--- a/internal/datanode/importv2/pool.go
+++ b/internal/datanode/importv2/pool.go
@@ -17,8 +17,14 @@
 package importv2
 
 import (
+	"context"
+	"runtime"
 	"sync"
 
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/pkg/config"
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
@@ -29,10 +35,39 @@ var (
 )
 
+// initExecPool creates the import execution pool sized by
+// MaxConcurrentImportTaskNum, warms it up, and registers resizeExecPool as
+// the watch handler for that config key.
 func initExecPool() {
+	pt := paramtable.Get()
+	initPoolSize := pt.DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt()
 	execPool = conc.NewPool[any](
-		paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt(),
-		conc.WithPreAlloc(true),
+		initPoolSize,
+		conc.WithPreAlloc(false), // pre alloc must be false to resize pool dynamically, use warmup to alloc worker here
+		conc.WithDisablePurge(true),
 	)
+	conc.WarmupPool(execPool, runtime.LockOSThread)
+
+	watchKey := pt.DataNodeCfg.MaxConcurrentImportTaskNum.Key
+	pt.Watch(watchKey, config.NewHandler(watchKey, resizeExecPool))
+	log.Info("init import execution pool done", zap.Int("size", initPoolSize))
+}
+
+// resizeExecPool is the config watch handler registered by initExecPool.
+// On an updated event it re-reads MaxConcurrentImportTaskNum and resizes the
+// execution pool to that value; nil or non-update events are ignored, and a
+// resize error is only logged.
+func resizeExecPool(evt *config.Event) {
+	if evt == nil || !evt.HasUpdated {
+		return
+	}
+	newSize := paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt()
+	logger := log.Ctx(context.Background()).With(zap.Int("newSize", newSize))
+
+	if err := GetExecPool().Resize(newSize); err != nil {
+		logger.Warn("failed to resize pool", zap.Error(err))
+		return
+	}
+	logger.Info("pool resize successfully")
 }
 
 func GetExecPool() *conc.Pool[any] {
diff --git a/internal/datanode/importv2/pool_test.go b/internal/datanode/importv2/pool_test.go
new file mode 100644
index 0000000000..06873c6d31
--- /dev/null
+++ b/internal/datanode/importv2/pool_test.go
@@ -0,0 +1,63 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importv2
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/milvus-io/milvus/pkg/config"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/stretchr/testify/assert"
+)
+
+// TestResizePools drives resizeExecPool directly with update events and
+// checks the execution pool capacity after each config change.
+func TestResizePools(t *testing.T) {
+	paramtable.Get().Init(paramtable.NewBaseTable(paramtable.SkipRemote(true)))
+	pt := paramtable.Get()
+	sizeKey := pt.DataNodeCfg.MaxConcurrentImportTaskNum.Key
+
+	// Reset the size key when the test returns.
+	defer func() { _ = pt.Reset(sizeKey) }()
+
+	// Every step signals the handler with the same "updated" event.
+	notifyUpdated := func() { resizeExecPool(&config.Event{HasUpdated: true}) }
+
+	t.Run("ExecPool", func(t *testing.T) {
+		// With no config change the capacity equals the configured size.
+		wantCap := pt.DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt()
+		assert.Equal(t, wantCap, GetExecPool().Cap())
+		notifyUpdated()
+		assert.Equal(t, wantCap, GetExecPool().Cap())
+
+		// Double the configured size; the pool is expected to follow.
+		_ = pt.Save(sizeKey, fmt.Sprintf("%d", wantCap*2))
+		wantCap = pt.DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt()
+		notifyUpdated()
+		assert.Equal(t, wantCap, GetExecPool().Cap())
+
+		// Neither "0" nor a non-numeric value may change the capacity.
+		_ = pt.Save(sizeKey, "0")
+		notifyUpdated()
+		assert.Equal(t, wantCap, GetExecPool().Cap(), "pool shall not be resized when newSize is 0")
+
+		_ = pt.Save(sizeKey, "invalid")
+		notifyUpdated()
+		assert.Equal(t, wantCap, GetExecPool().Cap())
+	})
+}