From 39d988cf8d9c307699aa77de557c22cd12a560c5 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Mon, 8 Apr 2024 21:05:17 +0800
Subject: [PATCH] enhance: Use an individual buffer size parameter for imports (#31833) (#31937)

Use an individual buffer size parameter for imports and set the buffer
size to 16MB.

issue: https://github.com/milvus-io/milvus/issues/28521
pr: https://github.com/milvus-io/milvus/pull/31833

Signed-off-by: bigsheeper
---
 configs/milvus.yaml                         |  1 +
 internal/datanode/importv2/scheduler.go     |  4 ++--
 pkg/util/paramtable/component_param.go      | 11 +++++++++++
 pkg/util/paramtable/component_param_test.go |  1 +
 4 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 93ddcd6b21..da9cf325f9 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -504,6 +504,7 @@ dataNode:
   import:
     maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
     maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
+    readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
 
 # Configures the system log output.
 log:
diff --git a/internal/datanode/importv2/scheduler.go b/internal/datanode/importv2/scheduler.go
index d681344888..9e9b8e61d4 100644
--- a/internal/datanode/importv2/scheduler.go
+++ b/internal/datanode/importv2/scheduler.go
@@ -150,7 +150,7 @@ func (s *scheduler) handleErr(task Task, err error, msg string) {
 }
 
 func (s *scheduler) PreImport(task Task) []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt()
+	bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
 	log.Info("start to preimport", WrapLogFields(task,
 		zap.Int("bufferSize", bufferSize),
 		zap.Any("schema", task.GetSchema()))...)
@@ -241,7 +241,7 @@ func (s *scheduler) readFileStat(reader importutilv2.Reader, task Task, fileIdx
 }
 
 func (s *scheduler) Import(task Task) []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt()
+	bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
 	log.Info("start to import", WrapLogFields(task,
 		zap.Int("bufferSize", bufferSize),
 		zap.Any("schema", task.GetSchema()))...)
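Note on the scheduler hunks above: the old FlushInsertBufferSize value is
denominated in bytes, while the new ReadBufferSizeInMB is denominated in
megabytes, hence the added `* 1024 * 1024` scaling. A minimal standalone
sketch of that conversion follows; the constant merely stands in for the
paramtable lookup, and the hard-coded value mirrors the default registered
by this patch (illustrative names only, not part of the patch):

package main

import "fmt"

// readBufferSizeInMB stands in for
// paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt();
// 16 mirrors the DefaultValue registered by this patch.
const readBufferSizeInMB = 16

func main() {
	// Scale the MB-denominated setting to bytes, as PreImport and
	// Import now do before handing the buffer size to the reader.
	bufferSize := readBufferSizeInMB * 1024 * 1024
	fmt.Printf("import read buffer: %dMB = %d bytes\n", readBufferSizeInMB, bufferSize)
}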
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index f5237260ee..e5c09bb108 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -3169,6 +3169,7 @@ type dataNodeConfig struct {
 	// import
 	MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
 	MaxImportFileSizeInGB      ParamItem `refreshable:"true"`
+	ReadBufferSizeInMB         ParamItem `refreshable:"true"`
 
 	// Compaction
 	L0BatchMemoryRatio ParamItem `refreshable:"true"`
@@ -3437,6 +3438,16 @@ func (p *dataNodeConfig) init(base *BaseTable) {
 	}
 	p.MaxImportFileSizeInGB.Init(base.mgr)
 
+	p.ReadBufferSizeInMB = ParamItem{
+		Key:          "datanode.import.readBufferSizeInMB",
+		Version:      "2.4.0",
+		Doc:          "The data block size (in MB) read from chunk manager by the datanode during import.",
+		DefaultValue: "16",
+		PanicIfEmpty: false,
+		Export:       true,
+	}
+	p.ReadBufferSizeInMB.Init(base.mgr)
+
 	p.L0BatchMemoryRatio = ParamItem{
 		Key:     "datanode.compaction.levelZeroBatchMemoryRatio",
 		Version: "2.4.0",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 9aad84b749..effba260a1 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -437,6 +437,7 @@ func TestComponentParam(t *testing.T) {
 		t.Logf("maxConcurrentImportTaskNum: %d", maxConcurrentImportTaskNum)
 		assert.Equal(t, 16, maxConcurrentImportTaskNum)
 		assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
+		assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt())
 		params.Save("datanode.gracefulStopTimeout", "100")
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
 	})
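Because the new item is registered with `refreshable:"true"`, its value can
be changed at runtime through the param table rather than only at startup.
Below is a hypothetical test in the spirit of component_param_test.go that
shows the default and a runtime override; the test name and the harness
setup (NewBaseTable/ComponentParam.Init) are assumptions based on the
surrounding paramtable code, not part of this patch:

package paramtable

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// TestReadBufferSizeOverride is a hypothetical test (not part of the
// patch) illustrating that the item is refreshable: a value saved at
// runtime is picked up by the next GetAsInt() without a restart.
func TestReadBufferSizeOverride(t *testing.T) {
	bt := NewBaseTable(SkipRemote(true))
	params := ComponentParam{}
	params.Init(bt)

	// Default declared in dataNodeConfig.init() above: 16 MB.
	assert.Equal(t, 16, params.DataNodeCfg.ReadBufferSizeInMB.GetAsInt())

	// Override under the key registered above; because the field is
	// tagged `refreshable:"true"`, the new value takes effect on the
	// next read.
	bt.Save("datanode.import.readBufferSizeInMB", "64")
	assert.Equal(t, 64, params.DataNodeCfg.ReadBufferSizeInMB.GetAsInt())
}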