fix: Adjust binlog and parquet reader buffer size for import (#43495)

1. Modify the binlog reader to stop reading a fixed 4096 rows per batch and
instead keep reading until the calculated bufferSize is reached, to avoid
generating small binlogs (see the sketch below).
2. Use a fixed bufferSize (32MB) for the Parquet reader to prevent OOM.
3. Remove the dataNode.import.maxTaskSlotNum config item and its
MaxTaskSlotNum param.
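
A minimal sketch of the size-bounded batching behind change 1, using stand-in
Row/RowSource types rather than the actual storage.InsertData and
deserialize-reader APIs:

package sketch

import "io"

// Row stands in for one deserialized record.
type Row []byte

// RowSource abstracts the deserialize reader; Next returns io.EOF once the
// binlogs are exhausted.
type RowSource interface {
	Next() (Row, error)
}

// readBatch accumulates rows until the batch's in-memory size reaches
// bufferSize, instead of stopping after a fixed 4096 rows.
func readBatch(src RowSource, bufferSize int) ([]Row, error) {
	var batch []Row
	size, rowNum := 0, 0
	for {
		row, err := src.Next()
		if err == io.EOF {
			break // source exhausted; return whatever was accumulated
		}
		if err != nil {
			return nil, err
		}
		batch = append(batch, row)
		size += len(row)
		rowNum++
		// Only check the size every 100 rows to keep the check cheap,
		// mirroring the rowNum%100 guard in the real reader.
		if rowNum%100 == 0 && size >= bufferSize {
			break
		}
	}
	return batch, nil
}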

issue: https://github.com/milvus-io/milvus/issues/43387

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2025-07-23 21:28:54 +08:00 committed by GitHub
parent ed57650b52
commit 9fbd41a97d
8 changed files with 23 additions and 25 deletions


@@ -754,7 +754,6 @@ dataNode:
     maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
     readBufferSizeInMB: 16 # The base insert buffer size (in MB) during import. The actual buffer size will be dynamically calculated based on the number of shards.
     readDeleteBufferSizeInMB: 16 # The delete buffer size (in MB) during import.
-    maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
     memoryLimitPercentage: 20 # The percentage of memory limit for import/pre-import tasks.
   compaction:
     levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
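
The readBufferSizeInMB comment above notes that the effective buffer is
derived from the base size and the shard count. A rough, explicitly
hypothetical sketch of such a derivation (the real calculation is not shown
in this diff and may differ):

package sketch

// importBufferSize is a hypothetical illustration of "calculated based on
// the number of shards": give each shard one base-sized insert buffer and
// shrink proportionally if the total would exceed the import memory cap.
func importBufferSize(baseSizeBytes, shardNum int, totalMemory uint64, memoryLimitPct float64) int {
	capBytes := int(float64(totalMemory) * memoryLimitPct / 100.0)
	total := baseSizeBytes * shardNum // one buffer per shard
	if total > capBytes && shardNum > 0 {
		return capBytes / shardNum // fit all shard buffers under the cap
	}
	return baseSizeBytes
}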


@@ -94,14 +94,14 @@ func (s *scheduler) scheduleTasks() {
 		}
 	}
-	log.Info("processing selected tasks",
-		zap.Int("pending", len(pendingTasks)),
-		zap.Int("selected", len(selectedTasks)))
 	if len(selectedTasks) == 0 {
 		return
 	}
+	log.Info("processing selected tasks",
+		zap.Int("pending", len(pendingTasks)),
+		zap.Int("selected", len(selectedTasks)))
 	futures := make(map[int64][]*conc.Future[any])
 	for _, task := range selectedTasks {
 		fs := task.Execute()


@@ -41,6 +41,7 @@ type reader struct {
 	storageVersion int64
 	fileSize       *atomic.Int64
+	bufferSize     int
 	deleteData map[any]typeutil.Timestamp // pk2ts
 	insertLogs map[int64][]string         // fieldID (or fieldGroupID if storage v2) -> binlogs
@@ -56,6 +57,7 @@ func NewReader(ctx context.Context,
 	paths []string,
 	tsStart,
 	tsEnd uint64,
+	bufferSize int,
 ) (*reader, error) {
 	systemFieldsAbsent := true
 	for _, field := range schema.Fields {
@@ -73,6 +75,7 @@ func NewReader(ctx context.Context,
 		schema:         schema,
 		storageVersion: storageVersion,
 		fileSize:       atomic.NewInt64(0),
+		bufferSize:     bufferSize,
 	}
 	err := r.init(paths, tsStart, tsEnd, storageConfig)
 	if err != nil {
@@ -193,7 +196,8 @@ func (r *reader) Read() (*storage.InsertData, error) {
 	if err != nil {
 		return nil, err
 	}
-	for range 4096 {
+	rowNum := 0
+	for {
 		v, err := r.dr.NextValue()
 		if err == io.EOF {
 			if insertData.GetRowNum() == 0 {
@@ -219,6 +223,11 @@ func (r *reader) Read() (*storage.InsertData, error) {
 		if err != nil {
 			return nil, err
 		}
+		rowNum++
+		if rowNum%100 == 0 && // Prevent frequent memory check
+			insertData.GetMemorySize() >= r.bufferSize {
+			break
+		}
 	}
 	insertData, err = r.filter(insertData)


@@ -334,7 +334,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 	cm, originalInsertData := suite.createMockChunk(schema, insertBinlogs, true)
 	cm.EXPECT().Size(mock.Anything, mock.Anything).Return(128, nil)
-	reader, err := NewReader(context.Background(), cm, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd)
+	reader, err := NewReader(context.Background(), cm, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd, 64*1024*1024)
 	suite.NoError(err)
 	insertData, err := reader.Read()
 	suite.NoError(err)
@@ -531,18 +531,18 @@ func (suite *ReaderSuite) TestVerify() {
 	checkFunc := func() {
 		cm, _ := suite.createMockChunk(schema, insertBinlogs, false)
-		reader, err := NewReader(context.Background(), cm, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd)
+		reader, err := NewReader(context.Background(), cm, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd, 64*1024*1024)
 		suite.Error(err)
 		suite.Nil(reader)
 	}
 	// no insert binlogs to import
-	reader, err := NewReader(context.Background(), nil, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{}, suite.tsStart, suite.tsEnd)
+	reader, err := NewReader(context.Background(), nil, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{}, suite.tsStart, suite.tsEnd, 64*1024*1024)
 	suite.Error(err)
 	suite.Nil(reader)
 	// too many input paths
-	reader, err = NewReader(context.Background(), nil, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix, "dummy"}, suite.tsStart, suite.tsEnd)
+	reader, err = NewReader(context.Background(), nil, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix, "dummy"}, suite.tsStart, suite.tsEnd, 64*1024*1024)
 	suite.Error(err)
 	suite.Nil(reader)
@@ -695,7 +695,7 @@ func (suite *ReaderSuite) TestZeroDeltaRead() {
 	checkFunc := func(targetSchema *schemapb.CollectionSchema, expectReadBinlogs map[int64][]string) {
 		cm := mockChunkFunc(sourceSchema, expectReadBinlogs)
-		reader, err := NewReader(context.Background(), cm, targetSchema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd)
+		reader, err := NewReader(context.Background(), cm, targetSchema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd, 64*1024*1024)
 		suite.NoError(err)
 		suite.NotNil(reader)


@@ -35,6 +35,8 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
 )
+const fileReaderBufferSize = int64(32 * 1024 * 1024)
 type reader struct {
 	ctx context.Context
 	cm  storage.ChunkManager
@@ -57,7 +59,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
 		return nil, err
 	}
 	r, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{
-		BufferSize:            int64(bufferSize),
+		BufferSize:            fileReaderBufferSize,
 		BufferedStreamEnabled: true,
 	}))
 	if err != nil {
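
With BufferedStreamEnabled, the reader streams column chunks through a buffer
of BufferSize bytes rather than loading them whole, so a large dynamically
calculated bufferSize would multiply across columns; the 32MB constant bounds
that cost. A minimal usage sketch, assuming the arrow/go v12 module paths
match what the repo imports:

package sketch

import (
	"os"

	"github.com/apache/arrow/go/v12/parquet"
	"github.com/apache/arrow/go/v12/parquet/file"
)

// fileReaderBufferSize mirrors the fixed 32MB buffer introduced above.
const fileReaderBufferSize = int64(32 * 1024 * 1024)

// openParquet opens a local parquet file with a fixed-size buffered stream,
// keeping the reader's footprint bounded regardless of the import buffer size.
func openParquet(path string) (*file.Reader, error) {
	f, err := os.Open(path) // *os.File satisfies the reader-at/seeker requirement
	if err != nil {
		return nil, err
	}
	return file.NewParquetReader(f, file.WithReadProps(&parquet.ReaderProperties{
		BufferSize:            fileReaderBufferSize,
		BufferedStreamEnabled: true,
	}))
}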


@@ -63,7 +63,7 @@ func NewReader(ctx context.Context,
 		if err != nil {
 			return nil, err
 		}
-		return binlog.NewReader(ctx, cm, schema, storageConfig, storageVersion, paths, tsStart, tsEnd)
+		return binlog.NewReader(ctx, cm, schema, storageConfig, storageVersion, paths, tsStart, tsEnd, bufferSize)
 	}
 	fileType, err := GetFileType(importFile)


@@ -5217,7 +5217,6 @@ type dataNodeConfig struct {
 	MaxImportFileSizeInGB       ParamItem `refreshable:"true"`
 	ImportBaseBufferSize        ParamItem `refreshable:"true"`
 	ImportDeleteBufferSize      ParamItem `refreshable:"true"`
-	MaxTaskSlotNum              ParamItem `refreshable:"true"`
 	ImportMemoryLimitPercentage ParamItem `refreshable:"true"`
 	// Compaction
@@ -5552,16 +5551,6 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.ImportDeleteBufferSize.Init(base.mgr)
-	p.MaxTaskSlotNum = ParamItem{
-		Key:          "dataNode.import.maxTaskSlotNum",
-		Version:      "2.4.13",
-		Doc:          "The maximum number of slots occupied by each import/pre-import task.",
-		DefaultValue: "16",
-		PanicIfEmpty: false,
-		Export:       true,
-	}
-	p.MaxTaskSlotNum.Init(base.mgr)
 	p.ImportMemoryLimitPercentage = ParamItem{
 		Key:     "dataNode.import.memoryLimitPercentage",
 		Version: "2.5.15",


@@ -623,7 +623,6 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
 		assert.Equal(t, 16*1024*1024, Params.ImportBaseBufferSize.GetAsInt())
 		assert.Equal(t, 16*1024*1024, Params.ImportDeleteBufferSize.GetAsInt())
-		assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt())
 		assert.Equal(t, 20.0, Params.ImportMemoryLimitPercentage.GetAsFloat())
 		params.Save("datanode.gracefulStopTimeout", "100")
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))