enhance: [2.5] Adjust default import buffer size (#42542)

Increase the default import insert buffer size from 16 MB to 64 MB, and add a
separate readDeleteBufferSizeInMB parameter that keeps the delete buffer size at 16 MB.
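
As a rough sizing bound (an estimate, not a measured peak): a datanode runs up
to maxConcurrentTaskNum = 16 import/pre-import tasks concurrently, so if every
task held one full insert buffer, import could buffer about 16 x 64 MB = 1 GB
per datanode, up from roughly 256 MB before this change.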

issue: https://github.com/milvus-io/milvus/issues/42518

pr: https://github.com/milvus-io/milvus/pull/42541

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2025-06-05 18:46:33 +08:00 committed by GitHub
parent 72a8777c9d
commit 28aa364bf7
7 changed files with 23 additions and 10 deletions


@@ -707,7 +707,8 @@ dataNode:
   import:
     maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
     maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
-    readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
+    readBufferSizeInMB: 64 # The insert buffer size (in MB) during import.
+    readDeleteBufferSizeInMB: 16 # The delete buffer size (in MB) during import.
   compaction:
     levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
     levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
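
Two details worth noting in the config above: the insert buffer keeps the
pre-existing dataNode.import.readBufferSizeInMB key (only its default and
description change), so overrides in existing deployments keep working, while
the delete buffer is read from the new dataNode.import.readDeleteBufferSizeInMB
key. Both parameters are declared refreshable, so they can also be changed at
runtime. Below is a minimal sketch of reading and overriding them through the
paramtable package, modeled on the test at the end of this diff; the import
path is an assumption and may differ between Milvus versions.

package main

import (
	"fmt"

	// Assumed import path; the paramtable package location differs across Milvus versions.
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init()
	params := paramtable.Get()

	// Defaults after this change: 64 MB insert buffer, 16 MB delete buffer.
	fmt.Println(params.DataNodeCfg.ImportInsertBufferSize.GetAsInt()) // 64
	fmt.Println(params.DataNodeCfg.ImportDeleteBufferSize.GetAsInt()) // 16

	// Both items are tagged refreshable:"true", so an override takes effect
	// without restarting the datanode.
	params.Save(params.DataNodeCfg.ImportInsertBufferSize.Key, "128")
	fmt.Println(params.DataNodeCfg.ImportInsertBufferSize.GetAsInt()) // 128
}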


@@ -143,7 +143,7 @@ func (t *ImportTask) Clone() Task {
 }
 
 func (t *ImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024
 	log.Info("start to import", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Any("schema", t.GetSchema()))...)


@@ -127,7 +127,7 @@ func (t *L0ImportTask) Clone() Task {
 }
 
 func (t *L0ImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024
 	log.Info("start to import l0", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Any("schema", t.GetSchema()))...)


@@ -115,7 +115,7 @@ func (t *L0PreImportTask) Clone() Task {
 }
 
 func (t *L0PreImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024
 	log.Info("start to preimport l0", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Any("schema", t.GetSchema()))...)


@@ -127,7 +127,7 @@ func (t *PreImportTask) Clone() Task {
 }
 
 func (t *PreImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024
 	log.Info("start to preimport", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Any("schema", t.GetSchema()))...)


@@ -4749,7 +4749,8 @@ type dataNodeConfig struct {
 	// import
 	MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
 	MaxImportFileSizeInGB      ParamItem `refreshable:"true"`
-	ReadBufferSizeInMB         ParamItem `refreshable:"true"`
+	ImportInsertBufferSize     ParamItem `refreshable:"true"`
+	ImportDeleteBufferSize     ParamItem `refreshable:"true"`
 
 	// Compaction
 	L0BatchMemoryRatio ParamItem `refreshable:"true"`
@@ -5049,15 +5050,25 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.MaxImportFileSizeInGB.Init(base.mgr)
 
-	p.ReadBufferSizeInMB = ParamItem{
+	p.ImportInsertBufferSize = ParamItem{
 		Key:          "dataNode.import.readBufferSizeInMB",
 		Version:      "2.4.0",
-		Doc:          "The data block size (in MB) read from chunk manager by the datanode during import.",
+		Doc:          "The insert buffer size (in MB) during import.",
+		DefaultValue: "64",
+		PanicIfEmpty: false,
+		Export:       true,
+	}
+	p.ImportInsertBufferSize.Init(base.mgr)
+
+	p.ImportDeleteBufferSize = ParamItem{
+		Key:          "dataNode.import.readDeleteBufferSizeInMB",
+		Version:      "2.5.14",
+		Doc:          "The delete buffer size (in MB) during import.",
 		DefaultValue: "16",
 		PanicIfEmpty: false,
 		Export:       true,
 	}
-	p.ReadBufferSizeInMB.Init(base.mgr)
+	p.ImportDeleteBufferSize.Init(base.mgr)
 
 	p.L0BatchMemoryRatio = ParamItem{
 		Key: "dataNode.compaction.levelZeroBatchMemoryRatio",


@@ -602,7 +602,8 @@ func TestComponentParam(t *testing.T) {
 		t.Logf("maxConcurrentImportTaskNum: %d", maxConcurrentImportTaskNum)
 		assert.Equal(t, 16, maxConcurrentImportTaskNum)
 		assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
-		assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt())
+		assert.Equal(t, 64, Params.ImportInsertBufferSize.GetAsInt())
+		assert.Equal(t, 16, Params.ImportDeleteBufferSize.GetAsInt())
 		params.Save("datanode.gracefulStopTimeout", "100")
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
 		assert.Equal(t, 16, Params.SlotCap.GetAsInt())