From 9fbd41a97da1d02dae232d770ba2ac2ba1a3082f Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Wed, 23 Jul 2025 21:28:54 +0800
Subject: [PATCH] fix: Adjust binlog and parquet reader buffer size for import
 (#43495)

1. Modify the binlog reader to stop reading a fixed 4096 rows per call and
   instead read until the calculated bufferSize is reached, to avoid
   generating small binlogs.
2. Use a fixed bufferSize (32MB) for the Parquet reader to prevent OOM.

issue: https://github.com/milvus-io/milvus/issues/43387

---------

Signed-off-by: bigsheeper
---
 configs/milvus.yaml                              |  1 -
 internal/datanode/importv2/scheduler.go          |  8 ++++----
 internal/util/importutilv2/binlog/reader.go      | 11 ++++++++++-
 internal/util/importutilv2/binlog/reader_test.go | 10 +++++-----
 internal/util/importutilv2/parquet/reader.go     |  4 +++-
 internal/util/importutilv2/reader.go             |  2 +-
 pkg/util/paramtable/component_param.go           | 11 -----------
 pkg/util/paramtable/component_param_test.go      |  1 -
 8 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 0b01da6594..fd7a30c49e 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -754,7 +754,6 @@ dataNode:
     maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
     readBufferSizeInMB: 16 # The base insert buffer size (in MB) during import. The actual buffer size will be dynamically calculated based on the number of shards.
     readDeleteBufferSizeInMB: 16 # The delete buffer size (in MB) during import.
-    maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
     memoryLimitPercentage: 20 # The percentage of memory limit for import/pre-import tasks.
   compaction:
     levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
diff --git a/internal/datanode/importv2/scheduler.go b/internal/datanode/importv2/scheduler.go
index ddbbf2bf63..bbcd1fa583 100644
--- a/internal/datanode/importv2/scheduler.go
+++ b/internal/datanode/importv2/scheduler.go
@@ -94,14 +94,14 @@ func (s *scheduler) scheduleTasks() {
 		}
 	}
 
-	log.Info("processing selected tasks",
-		zap.Int("pending", len(pendingTasks)),
-		zap.Int("selected", len(selectedTasks)))
-
 	if len(selectedTasks) == 0 {
 		return
 	}
 
+	log.Info("processing selected tasks",
+		zap.Int("pending", len(pendingTasks)),
+		zap.Int("selected", len(selectedTasks)))
+
 	futures := make(map[int64][]*conc.Future[any])
 	for _, task := range selectedTasks {
 		fs := task.Execute()
diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go
index e00bb90a5d..79422d5f6f 100644
--- a/internal/util/importutilv2/binlog/reader.go
+++ b/internal/util/importutilv2/binlog/reader.go
@@ -41,6 +41,7 @@ type reader struct {
 	storageVersion int64
 
 	fileSize   *atomic.Int64
+	bufferSize int
 	deleteData map[any]typeutil.Timestamp // pk2ts
 	insertLogs map[int64][]string         // fieldID (or fieldGroupID if storage v2) -> binlogs
 
@@ -56,6 +57,7 @@ func NewReader(ctx context.Context,
 	paths []string,
 	tsStart,
 	tsEnd uint64,
+	bufferSize int,
 ) (*reader, error) {
 	systemFieldsAbsent := true
 	for _, field := range schema.Fields {
@@ -73,6 +75,7 @@ func NewReader(ctx context.Context,
 		schema:         schema,
 		storageVersion: storageVersion,
 		fileSize:       atomic.NewInt64(0),
+		bufferSize:     bufferSize,
 	}
 	err := r.init(paths, tsStart, tsEnd, storageConfig)
 	if err != nil {
@@ -193,7 +196,8 @@ func (r *reader) Read() (*storage.InsertData, error) {
 	if err != nil {
 		return nil, err
 	}
-	for range 4096 {
+	rowNum := 0
+	for {
 		v, err := r.dr.NextValue()
 		if err == io.EOF {
 			if insertData.GetRowNum() == 0 {
@@ -219,6 +223,11 @@ func (r *reader) Read() (*storage.InsertData, error) {
 		if err != nil {
 			return nil, err
 		}
+		rowNum++
+		if rowNum%100 == 0 && // Prevent frequent memory check
+			insertData.GetMemorySize() >= r.bufferSize {
+			break
+		}
 	}
 	insertData, err = r.filter(insertData)
 	if err != nil {
diff --git a/internal/util/importutilv2/binlog/reader_test.go b/internal/util/importutilv2/binlog/reader_test.go
index b5559ddeca..937d054658 100644
--- a/internal/util/importutilv2/binlog/reader_test.go
+++ b/internal/util/importutilv2/binlog/reader_test.go
@@ -334,7 +334,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 	cm, originalInsertData := suite.createMockChunk(schema, insertBinlogs, true)
 	cm.EXPECT().Size(mock.Anything, mock.Anything).Return(128, nil)
 
-	reader, err := NewReader(context.Background(), cm, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd)
+	reader, err := NewReader(context.Background(), cm, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd, 64*1024*1024)
 	suite.NoError(err)
 	insertData, err := reader.Read()
 	suite.NoError(err)
@@ -531,18 +531,18 @@ func (suite *ReaderSuite) TestVerify() {
 
 	checkFunc := func() {
 		cm, _ := suite.createMockChunk(schema, insertBinlogs, false)
-		reader, err := NewReader(context.Background(), cm, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd)
+		reader, err := NewReader(context.Background(), cm, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd, 64*1024*1024)
 		suite.Error(err)
 		suite.Nil(reader)
 	}
 
 	// no insert binlogs to import
-	reader, err := NewReader(context.Background(), nil, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{}, suite.tsStart, suite.tsEnd)
+	reader, err := NewReader(context.Background(), nil, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{}, suite.tsStart, suite.tsEnd, 64*1024*1024)
 	suite.Error(err)
 	suite.Nil(reader)
 
 	// too many input paths
-	reader, err = NewReader(context.Background(), nil, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix, "dummy"}, suite.tsStart, suite.tsEnd)
+	reader, err = NewReader(context.Background(), nil, schema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix, "dummy"}, suite.tsStart, suite.tsEnd, 64*1024*1024)
 	suite.Error(err)
 	suite.Nil(reader)
 
@@ -695,7 +695,7 @@ func (suite *ReaderSuite) TestZeroDeltaRead() {
 	checkFunc := func(targetSchema *schemapb.CollectionSchema, expectReadBinlogs map[int64][]string) {
 		cm := mockChunkFunc(sourceSchema, expectReadBinlogs)
 
-		reader, err := NewReader(context.Background(), cm, targetSchema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd)
+		reader, err := NewReader(context.Background(), cm, targetSchema, &indexpb.StorageConfig{}, storage.StorageV1, []string{insertPrefix, deltaPrefix}, suite.tsStart, suite.tsEnd, 64*1024*1024)
 		suite.NoError(err)
 		suite.NotNil(reader)
 
diff --git a/internal/util/importutilv2/parquet/reader.go b/internal/util/importutilv2/parquet/reader.go
index 1e324f2fdf..643bb087a0 100644
--- a/internal/util/importutilv2/parquet/reader.go
+++ b/internal/util/importutilv2/parquet/reader.go
@@ -35,6 +35,8 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
 )
 
+const fileReaderBufferSize = int64(32 * 1024 * 1024)
+
 type reader struct {
 	ctx context.Context
 	cm  storage.ChunkManager
@@ -57,7 +59,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
 		return nil, err
 	}
 	r, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{
-		BufferSize:            int64(bufferSize),
+		BufferSize:            fileReaderBufferSize,
 		BufferedStreamEnabled: true,
 	}))
 	if err != nil {
diff --git a/internal/util/importutilv2/reader.go b/internal/util/importutilv2/reader.go
index eb53febcc2..8fe36d24e7 100644
--- a/internal/util/importutilv2/reader.go
+++ b/internal/util/importutilv2/reader.go
@@ -63,7 +63,7 @@ func NewReader(ctx context.Context,
 		if err != nil {
 			return nil, err
 		}
-		return binlog.NewReader(ctx, cm, schema, storageConfig, storageVersion, paths, tsStart, tsEnd)
+		return binlog.NewReader(ctx, cm, schema, storageConfig, storageVersion, paths, tsStart, tsEnd, bufferSize)
 	}
 
 	fileType, err := GetFileType(importFile)
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 37a06c3f56..12e680d157 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -5217,7 +5217,6 @@ type dataNodeConfig struct {
 	MaxImportFileSizeInGB       ParamItem `refreshable:"true"`
 	ImportBaseBufferSize        ParamItem `refreshable:"true"`
 	ImportDeleteBufferSize      ParamItem `refreshable:"true"`
-	MaxTaskSlotNum              ParamItem `refreshable:"true"`
 	ImportMemoryLimitPercentage ParamItem `refreshable:"true"`
 
 	// Compaction
@@ -5552,16 +5551,6 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.ImportDeleteBufferSize.Init(base.mgr)
 
-	p.MaxTaskSlotNum = ParamItem{
-		Key:          "dataNode.import.maxTaskSlotNum",
-		Version:      "2.4.13",
-		Doc:          "The maximum number of slots occupied by each import/pre-import task.",
-		DefaultValue: "16",
-		PanicIfEmpty: false,
-		Export:       true,
-	}
-	p.MaxTaskSlotNum.Init(base.mgr)
-
 	p.ImportMemoryLimitPercentage = ParamItem{
 		Key:     "dataNode.import.memoryLimitPercentage",
 		Version: "2.5.15",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 2e0b1dd77f..f3428f9ad7 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -623,7 +623,6 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
 		assert.Equal(t, 16*1024*1024, Params.ImportBaseBufferSize.GetAsInt())
 		assert.Equal(t, 16*1024*1024, Params.ImportDeleteBufferSize.GetAsInt())
-		assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt())
 		assert.Equal(t, 20.0, Params.ImportMemoryLimitPercentage.GetAsFloat())
 		params.Save("datanode.gracefulStopTimeout", "100")
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
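
The loop change in internal/util/importutilv2/binlog/reader.go replaces the fixed 4096-row batch with one bounded by the task's calculated bufferSize, re-checking the estimated batch memory only every 100 rows so the size estimate stays cheap. The standalone Go sketch below illustrates that pattern outside of Milvus; rowSource, batch, readBatch, sliceSource, and checkInterval are hypothetical names invented for the illustration, not code from this patch.

// Sketch: read rows until the accumulated batch reaches bufferSize instead of
// stopping at a fixed row count, checking the size only every checkInterval rows.
package main

import (
	"errors"
	"fmt"
	"io"
)

const checkInterval = 100 // how often to re-estimate the batch's memory usage

// rowSource is a hypothetical row iterator; NextValue returns io.EOF when drained.
type rowSource interface {
	NextValue() (map[int64]any, error)
}

// batch is a hypothetical stand-in for an insert buffer such as storage.InsertData.
type batch struct {
	rows []map[int64]any
	size int
}

func (b *batch) append(row map[int64]any) {
	b.rows = append(b.rows, row)
	b.size += 64 * len(row) // crude per-row size estimate, good enough for the sketch
}

// readBatch drains rows from src until EOF or until the batch's estimated
// memory footprint reaches bufferSize.
func readBatch(src rowSource, bufferSize int) (*batch, error) {
	out := &batch{}
	rowNum := 0
	for {
		row, err := src.NextValue()
		if errors.Is(err, io.EOF) {
			if len(out.rows) == 0 {
				return nil, io.EOF // nothing left to read
			}
			break
		}
		if err != nil {
			return nil, err
		}
		out.append(row)
		rowNum++
		// Check the size only every checkInterval rows to keep the check cheap.
		if rowNum%checkInterval == 0 && out.size >= bufferSize {
			break
		}
	}
	return out, nil
}

// sliceSource is a trivial in-memory rowSource used to exercise readBatch.
type sliceSource struct {
	rows []map[int64]any
	i    int
}

func (s *sliceSource) NextValue() (map[int64]any, error) {
	if s.i >= len(s.rows) {
		return nil, io.EOF
	}
	r := s.rows[s.i]
	s.i++
	return r, nil
}

func main() {
	src := &sliceSource{rows: make([]map[int64]any, 1000)}
	for i := range src.rows {
		src.rows[i] = map[int64]any{0: int64(i)}
	}
	b, err := readBatch(src, 16*1024) // 16 KiB buffer for the demo
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Printf("read %d rows (~%d bytes)\n", len(b.rows), b.size)
}

With a small bufferSize the loop stops after a few hundred rows; with a large one it keeps accumulating until EOF, which is exactly what lets the import avoid emitting many small binlogs.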
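The Parquet change pins the file reader's BufferSize to a 32 MB constant instead of the per-task computed bufferSize: with BufferedStreamEnabled, each buffered column stream reads through a buffer of up to BufferSize bytes, so a large computed value can multiply across columns and concurrent readers into an OOM. Below is a minimal sketch of opening a Parquet file with such a bounded buffer via the arrow-go parquet packages; the v17 import paths and the example.parquet filename are assumptions for illustration, not taken from this patch.

// Sketch: open a Parquet file with a fixed, bounded read buffer.
package main

import (
	"fmt"
	"log"

	"github.com/apache/arrow/go/v17/arrow/memory"
	"github.com/apache/arrow/go/v17/parquet"
	"github.com/apache/arrow/go/v17/parquet/file"
)

// A fixed cap keeps each buffered column stream bounded regardless of how
// large the per-task insert buffer was computed to be.
const fileReaderBufferSize = int64(32 * 1024 * 1024) // 32 MB

func main() {
	props := parquet.NewReaderProperties(memory.DefaultAllocator)
	props.BufferSize = fileReaderBufferSize
	props.BufferedStreamEnabled = true // stream column chunks instead of materializing whole row groups

	// "example.parquet" is a placeholder path for the sketch.
	rdr, err := file.OpenParquetFile("example.parquet", false, file.WithReadProps(props))
	if err != nil {
		log.Fatal(err)
	}
	defer rdr.Close()

	fmt.Println("row groups:", rdr.NumRowGroups(), "rows:", rdr.NumRows())
}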