enhance: Adjust default import buffer size (#42541)
Increase insert buffer size from 16MB to 64MB, while keeping delete buffer size at 16MB.

issue: https://github.com/milvus-io/milvus/issues/42518

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
parent c40ea7403d
commit 837349dead
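Rough sizing, under the assumption (not stated in the diff) that each concurrent import task holds one such buffer: with the default maxConcurrentTaskNum of 16, worst-case read-buffer memory on a datanode goes from about 16 × 16 MB = 256 MB to 16 × 64 MB = 1 GB when every slot runs a regular import.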
@@ -743,7 +743,8 @@ dataNode:
   import:
     maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
     maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
-    readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
+    readBufferSizeInMB: 64 # The insert buffer size (in MB) during import.
+    readDeleteBufferSizeInMB: 16 # The delete buffer size (in MB) during import.
     maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
   compaction:
     levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
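For orientation, a minimal sketch of how these YAML keys surface in Go. paramtable.Init, paramtable.Get, and the DataNodeCfg field names follow the diff and the Milvus paramtable package; the standalone main wrapper is illustrative only.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init() // load defaults, then any milvus.yaml overrides

	cfg := &paramtable.Get().DataNodeCfg
	insertBuf := cfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024 // 64 MB -> 67108864 bytes
	deleteBuf := cfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024 // 16 MB -> 16777216 bytes
	fmt.Println(insertBuf, deleteBuf)
}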
@@ -139,7 +139,7 @@ func (t *ImportTask) Clone() Task {
 }
 
 func (t *ImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024
 	log.Info("start to import", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Any("schema", t.GetSchema()))...)
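A plausible rationale, not spelled out in the diff: regular imports stream full rows through the insert buffer, so a larger buffer means fewer, bigger writes, while L0 (delete) imports carry much smaller records and keep the old 16 MB, as the next hunks show.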
@@ -127,7 +127,7 @@ func (t *L0ImportTask) Clone() Task {
 }
 
 func (t *L0ImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024
 	log.Info("start to import l0", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Any("schema", t.GetSchema()))...)
@@ -115,7 +115,7 @@ func (t *L0PreImportTask) Clone() Task {
 }
 
 func (t *L0PreImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024
 	log.Info("start to preimport l0", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Any("schema", t.GetSchema()))...)
@@ -124,7 +124,7 @@ func (t *PreImportTask) Clone() Task {
 }
 
 func (t *PreImportTask) Execute() []*conc.Future[any] {
-	bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
+	bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024
 	log.Info("start to preimport", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Any("schema", t.GetSchema()))...)
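Taken together, the four Execute changes implement one selection rule: insert-oriented tasks (ImportTask, PreImportTask) size their buffer from ImportInsertBufferSize, and L0 delete-oriented tasks (L0ImportTask, L0PreImportTask) from ImportDeleteBufferSize. A condensed sketch of that rule; bufferSizeFor and its isL0 flag are hypothetical, the config fields are from the diff.

package importv2 // hypothetical placement alongside the task files

import "github.com/milvus-io/milvus/pkg/util/paramtable"

// bufferSizeFor returns the per-task buffer size in bytes: L0 tasks read
// the delete buffer param, all other import tasks read the insert buffer
// param. Both params are configured in MB.
func bufferSizeFor(isL0 bool) int {
	cfg := &paramtable.Get().DataNodeCfg
	if isL0 {
		return cfg.ImportDeleteBufferSize.GetAsInt() * 1024 * 1024
	}
	return cfg.ImportInsertBufferSize.GetAsInt() * 1024 * 1024
}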
@@ -5131,7 +5131,8 @@ type dataNodeConfig struct {
 	// import
 	MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
 	MaxImportFileSizeInGB      ParamItem `refreshable:"true"`
-	ReadBufferSizeInMB         ParamItem `refreshable:"true"`
+	ImportInsertBufferSize     ParamItem `refreshable:"true"`
+	ImportDeleteBufferSize     ParamItem `refreshable:"true"`
 	MaxTaskSlotNum             ParamItem `refreshable:"true"`
 
 	// Compaction
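Both new fields carry the refreshable:"true" struct tag, which paramtable uses to mark values that may change at runtime. A self-contained sketch of how such a tag can be inspected; the types here are stand-ins, not Milvus code.

package main

import (
	"fmt"
	"reflect"
)

type ParamItem struct{} // stand-in for paramtable.ParamItem

type dataNodeConfig struct {
	ImportInsertBufferSize ParamItem `refreshable:"true"`
	ImportDeleteBufferSize ParamItem `refreshable:"true"`
}

func main() {
	t := reflect.TypeOf(dataNodeConfig{})
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		fmt.Printf("%s refreshable=%s\n", f.Name, f.Tag.Get("refreshable"))
	}
}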
@@ -5437,15 +5438,25 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.MaxImportFileSizeInGB.Init(base.mgr)
 
-	p.ReadBufferSizeInMB = ParamItem{
+	p.ImportInsertBufferSize = ParamItem{
 		Key:          "dataNode.import.readBufferSizeInMB",
 		Version:      "2.4.0",
-		Doc:          "The data block size (in MB) read from chunk manager by the datanode during import.",
+		Doc:          "The insert buffer size (in MB) during import.",
+		DefaultValue: "64",
+		PanicIfEmpty: false,
+		Export:       true,
+	}
+	p.ImportInsertBufferSize.Init(base.mgr)
+
+	p.ImportDeleteBufferSize = ParamItem{
+		Key:          "dataNode.import.readDeleteBufferSizeInMB",
+		Version:      "2.5.14",
+		Doc:          "The delete buffer size (in MB) during import.",
 		DefaultValue: "16",
 		PanicIfEmpty: false,
 		Export:       true,
 	}
-	p.ReadBufferSizeInMB.Init(base.mgr)
+	p.ImportDeleteBufferSize.Init(base.mgr)
 
 	p.MaxTaskSlotNum = ParamItem{
 		Key:          "dataNode.import.maxTaskSlotNum",
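Note the backward-compatible choice here: ImportInsertBufferSize keeps the existing key dataNode.import.readBufferSizeInMB at Version 2.4.0, so deployments that already set that key are unaffected and only the default moves to 64; the delete buffer is a genuinely new key introduced at Version 2.5.14. And since both items are refreshable, an override along these lines should take effect without a restart; params.Save is the same call the existing test below uses, the rest is a sketch.

params := paramtable.Get()
params.Save("dataNode.import.readBufferSizeInMB", "128")
// later reads observe the new value
_ = params.DataNodeCfg.ImportInsertBufferSize.GetAsInt() // 128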
@@ -603,7 +603,8 @@ func TestComponentParam(t *testing.T) {
 	t.Logf("maxConcurrentImportTaskNum: %d", maxConcurrentImportTaskNum)
 	assert.Equal(t, 16, maxConcurrentImportTaskNum)
 	assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
-	assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt())
+	assert.Equal(t, 64, Params.ImportInsertBufferSize.GetAsInt())
+	assert.Equal(t, 16, Params.ImportDeleteBufferSize.GetAsInt())
 	assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt())
 	params.Save("datanode.gracefulStopTimeout", "100")
 	assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
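A natural companion check, sketched in the style of the surrounding test (the Save-then-read pattern already appears just above for gracefulStopTimeout); treat it as an assumption about refresh behavior rather than an existing assertion.

params.Save("dataNode.import.readDeleteBufferSizeInMB", "32")
assert.Equal(t, 32, Params.ImportDeleteBufferSize.GetAsInt())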