diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index d99532360d..449e9121d3 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -707,7 +707,8 @@ dataNode:
   import:
     maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
     maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
-    readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
+    readBufferSizeInMB: 64 # The insert buffer size (in MB) during import.
+    readDeleteBufferSizeInMB: 16 # The delete buffer size (in MB) during import.
   compaction:
     levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
     levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go
index a6076943e3..cbdcd86a55 100644
--- a/internal/datanode/importv2/task_import.go
+++ b/internal/datanode/importv2/task_import.go
@@ -143,7 +143,7 @@ func (t *ImportTask) Clone() Task {
 }
 
 func (t *ImportTask) Execute() []*conc.Future[any] {
-    bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+    bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024
     log.Info("start to import", WrapLogFields(t,
         zap.Int("bufferSize", bufferSize),
         zap.Any("schema", t.GetSchema()))...)
diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go
index 4fa7c67d77..30b639404e 100644
--- a/internal/datanode/importv2/task_l0_import.go
+++ b/internal/datanode/importv2/task_l0_import.go
@@ -127,7 +127,7 @@ func (t *L0ImportTask) Clone() Task {
 }
 
 func (t *L0ImportTask) Execute() []*conc.Future[any] {
-    bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+    bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024
     log.Info("start to import l0", WrapLogFields(t,
         zap.Int("bufferSize", bufferSize),
         zap.Any("schema", t.GetSchema()))...)
diff --git a/internal/datanode/importv2/task_l0_preimport.go b/internal/datanode/importv2/task_l0_preimport.go
index b49f2dab77..688830a457 100644
--- a/internal/datanode/importv2/task_l0_preimport.go
+++ b/internal/datanode/importv2/task_l0_preimport.go
@@ -115,7 +115,7 @@ func (t *L0PreImportTask) Clone() Task {
 }
 
 func (t *L0PreImportTask) Execute() []*conc.Future[any] {
-    bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+    bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024
     log.Info("start to preimport l0", WrapLogFields(t,
         zap.Int("bufferSize", bufferSize),
         zap.Any("schema", t.GetSchema()))...)
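The hunks above split the single import read buffer into two: ImportTask and PreImportTask size their buffer from the insert-buffer parameter, while the L0 tasks (which carry delete data) use the new delete-buffer parameter. A minimal standalone sketch of that selection, using the default sizes from the configs/milvus.yaml hunk (64 MB insert, 16 MB delete); the taskKind type and bufferSizeBytes helper are illustrative stand-ins, not Milvus code:

```go
package main

import "fmt"

// taskKind is a hypothetical stand-in for the import task types touched above.
type taskKind int

const (
	regularImport taskKind = iota // ImportTask / PreImportTask: buffers insert data
	l0Import                      // L0ImportTask / L0PreImportTask: buffers delete data
)

// bufferSizeBytes mirrors the MB-to-bytes conversion done in Execute(),
// picking the buffer that matches the task kind (defaults from the
// configs/milvus.yaml hunk above).
func bufferSizeBytes(k taskKind) int {
	const (
		mb                 = 1024 * 1024
		insertBufferSizeMB = 64 // dataNode.import.readBufferSizeInMB
		deleteBufferSizeMB = 16 // dataNode.import.readDeleteBufferSizeInMB
	)
	if k == l0Import {
		return deleteBufferSizeMB * mb
	}
	return insertBufferSizeMB * mb
}

func main() {
	fmt.Println("insert task buffer:", bufferSizeBytes(regularImport)) // 67108864
	fmt.Println("l0 task buffer:", bufferSizeBytes(l0Import))          // 16777216
}
```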
diff --git a/internal/datanode/importv2/task_preimport.go b/internal/datanode/importv2/task_preimport.go
index 055915ab89..b3225d063d 100644
--- a/internal/datanode/importv2/task_preimport.go
+++ b/internal/datanode/importv2/task_preimport.go
@@ -127,7 +127,7 @@ func (t *PreImportTask) Clone() Task {
 }
 
 func (t *PreImportTask) Execute() []*conc.Future[any] {
-    bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+    bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024
     log.Info("start to preimport", WrapLogFields(t,
         zap.Int("bufferSize", bufferSize),
         zap.Any("schema", t.GetSchema()))...)
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 75269a9376..6aa9d76628 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -4749,7 +4749,8 @@ type dataNodeConfig struct {
     // import
     MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
     MaxImportFileSizeInGB      ParamItem `refreshable:"true"`
-    ReadBufferSizeInMB         ParamItem `refreshable:"true"`
+    ImportInsertBufferSize     ParamItem `refreshable:"true"`
+    ImportDeleteBufferSize     ParamItem `refreshable:"true"`
 
     // Compaction
     L0BatchMemoryRatio ParamItem `refreshable:"true"`
@@ -5049,15 +5050,25 @@ if this parameter <= 0, will set it as 10`,
     }
     p.MaxImportFileSizeInGB.Init(base.mgr)
 
-    p.ReadBufferSizeInMB = ParamItem{
+    p.ImportInsertBufferSize = ParamItem{
         Key:          "dataNode.import.readBufferSizeInMB",
         Version:      "2.4.0",
-        Doc:          "The data block size (in MB) read from chunk manager by the datanode during import.",
+        Doc:          "The insert buffer size (in MB) during import.",
+        DefaultValue: "64",
+        PanicIfEmpty: false,
+        Export:       true,
+    }
+    p.ImportInsertBufferSize.Init(base.mgr)
+
+    p.ImportDeleteBufferSize = ParamItem{
+        Key:          "dataNode.import.readDeleteBufferSizeInMB",
+        Version:      "2.5.14",
+        Doc:          "The delete buffer size (in MB) during import.",
         DefaultValue: "16",
         PanicIfEmpty: false,
         Export:       true,
     }
-    p.ReadBufferSizeInMB.Init(base.mgr)
+    p.ImportDeleteBufferSize.Init(base.mgr)
 
     p.L0BatchMemoryRatio = ParamItem{
         Key: "dataNode.compaction.levelZeroBatchMemoryRatio",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 3fed122211..2535ca717c 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -602,7 +602,8 @@ func TestComponentParam(t *testing.T) {
         t.Logf("maxConcurrentImportTaskNum: %d", maxConcurrentImportTaskNum)
         assert.Equal(t, 16, maxConcurrentImportTaskNum)
         assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
-        assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt())
+        assert.Equal(t, 64, Params.ImportInsertBufferSize.GetAsInt())
+        assert.Equal(t, 16, Params.ImportDeleteBufferSize.GetAsInt())
         params.Save("datanode.gracefulStopTimeout", "100")
         assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
         assert.Equal(t, 16, Params.SlotCap.GetAsInt())
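For completeness, a hedged sketch of reading the two parameters back after paramtable initialization, following the GetAsInt pattern used by the Execute() methods and the test above. The import path and the paramtable.Init() call are assumptions about the surrounding module layout, not something this patch introduces:

```go
package main

import (
	"fmt"

	// Assumed import path for the package touched in this diff; adjust to the
	// module layout of the branch being patched.
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init() // load defaults, including the two keys added above

	// MB values from dataNode.import.readBufferSizeInMB and
	// dataNode.import.readDeleteBufferSizeInMB, converted to bytes the same
	// way the Execute() methods in this diff do.
	insertBytes := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024
	deleteBytes := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024

	fmt.Println("import insert buffer bytes:", insertBytes) // 64 MB by default
	fmt.Println("import delete buffer bytes:", deleteBytes) // 16 MB by default
}
```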