fix: [StorageV2] Correct read and write buffer size (#43335)

Correct read and buffer size to 64MB to prevent OOM during clustering
compaction.

issue: https://github.com/milvus-io/milvus/issues/43310

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2025-07-16 14:28:52 +08:00 committed by GitHub
parent 003c348d6d
commit b69e601fe1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 34 additions and 21 deletions

View File

@ -90,8 +90,10 @@ type clusteringCompactionTask struct {
clusteringKeyField *schemapb.FieldSchema clusteringKeyField *schemapb.FieldSchema
primaryKeyField *schemapb.FieldSchema primaryKeyField *schemapb.FieldSchema
memoryBufferSize int64 memoryLimit int64
clusterBuffers []*ClusterBuffer bufferSize int64
clusterBuffers []*ClusterBuffer
// scalar // scalar
keyToBufferFunc func(interface{}) *ClusterBuffer keyToBufferFunc func(interface{}) *ClusterBuffer
// vector // vector
@ -235,11 +237,12 @@ func (t *clusteringCompactionTask) init() error {
t.primaryKeyField = pkField t.primaryKeyField = pkField
t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType) t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType)
t.currentTime = time.Now() t.currentTime = time.Now()
t.memoryBufferSize = t.getMemoryBufferSize() t.memoryLimit = t.getMemoryLimit()
t.bufferSize = int64(t.compactionParams.BinLogMaxSize) // Use binlog max size as read and write buffer size
workerPoolSize := t.getWorkerPoolSize() workerPoolSize := t.getWorkerPoolSize()
t.mappingPool = conc.NewPool[any](workerPoolSize) t.mappingPool = conc.NewPool[any](workerPoolSize)
t.flushPool = conc.NewPool[any](workerPoolSize) t.flushPool = conc.NewPool[any](workerPoolSize)
log.Info("clustering compaction task initialed", zap.Int64("memory_buffer_size", t.memoryBufferSize), zap.Int("worker_pool_size", workerPoolSize)) log.Info("clustering compaction task initialed", zap.Int64("memory_buffer_size", t.memoryLimit), zap.Int("worker_pool_size", workerPoolSize))
return nil return nil
} }
@ -331,7 +334,11 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
} }
alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc) alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize), storage.WithStorageConfig(t.compactionParams.StorageConfig)) writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc,
t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows,
t.partitionID, t.collectionID, t.plan.Channel, 100,
storage.WithBufferSize(t.bufferSize),
storage.WithStorageConfig(t.compactionParams.StorageConfig))
if err != nil { if err != nil {
return err return err
} }
@ -350,7 +357,11 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
} }
alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc) alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize), storage.WithStorageConfig(t.compactionParams.StorageConfig)) writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc,
t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows,
t.partitionID, t.collectionID, t.plan.Channel, 100,
storage.WithBufferSize(t.bufferSize),
storage.WithStorageConfig(t.compactionParams.StorageConfig))
if err != nil { if err != nil {
return err return err
} }
@ -406,7 +417,11 @@ func (t *clusteringCompactionTask) generatedVectorPlan(ctx context.Context, buff
fieldStats.SetVectorCentroids(centroidValues...) fieldStats.SetVectorCentroids(centroidValues...)
alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc) alloc := NewCompactionAllocator(t.segIDAlloc, t.logIDAlloc)
writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc, t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows, t.partitionID, t.collectionID, t.plan.Channel, 100, storage.WithBufferSize(t.memoryBufferSize), storage.WithStorageConfig(t.compactionParams.StorageConfig)) writer, err := NewMultiSegmentWriter(ctx, t.binlogIO, alloc,
t.plan.GetMaxSize(), t.plan.GetSchema(), t.compactionParams, t.plan.MaxSegmentRows,
t.partitionID, t.collectionID, t.plan.Channel, 100,
storage.WithBufferSize(t.bufferSize),
storage.WithStorageConfig(t.compactionParams.StorageConfig))
if err != nil { if err != nil {
return err return err
} }
@ -423,7 +438,7 @@ func (t *clusteringCompactionTask) generatedVectorPlan(ctx context.Context, buff
func (t *clusteringCompactionTask) switchPolicyForVectorPlan(ctx context.Context, centroids *clusteringpb.ClusteringCentroidsStats) error { func (t *clusteringCompactionTask) switchPolicyForVectorPlan(ctx context.Context, centroids *clusteringpb.ClusteringCentroidsStats) error {
bufferNum := len(centroids.GetCentroids()) bufferNum := len(centroids.GetCentroids())
bufferNumByMemory := int(t.memoryBufferSize / expectedBinlogSize) bufferNumByMemory := int(t.memoryLimit / expectedBinlogSize)
if bufferNumByMemory < bufferNum { if bufferNumByMemory < bufferNum {
bufferNum = bufferNumByMemory bufferNum = bufferNumByMemory
} }
@ -591,7 +606,7 @@ func (t *clusteringCompactionTask) mappingSegment(
return t.binlogIO.Download(ctx, paths) return t.binlogIO.Download(ctx, paths)
}), }),
storage.WithVersion(segment.StorageVersion), storage.WithVersion(segment.StorageVersion),
storage.WithBufferSize(t.memoryBufferSize), storage.WithBufferSize(t.bufferSize),
storage.WithStorageConfig(t.compactionParams.StorageConfig), storage.WithStorageConfig(t.compactionParams.StorageConfig),
) )
if err != nil { if err != nil {
@ -674,17 +689,17 @@ func (t *clusteringCompactionTask) getWorkerPoolSize() int {
return int(math.Max(float64(paramtable.Get().DataNodeCfg.ClusteringCompactionWorkerPoolSize.GetAsInt()), 1.0)) return int(math.Max(float64(paramtable.Get().DataNodeCfg.ClusteringCompactionWorkerPoolSize.GetAsInt()), 1.0))
} }
// getMemoryBufferSize return memoryBufferSize // getMemoryLimit returns the maximum memory that a clustering compaction task is allowed to use
func (t *clusteringCompactionTask) getMemoryBufferSize() int64 { func (t *clusteringCompactionTask) getMemoryLimit() int64 {
return int64(float64(hardware.GetMemoryCount()) * paramtable.Get().DataNodeCfg.ClusteringCompactionMemoryBufferRatio.GetAsFloat()) return int64(float64(hardware.GetMemoryCount()) * paramtable.Get().DataNodeCfg.ClusteringCompactionMemoryBufferRatio.GetAsFloat())
} }
func (t *clusteringCompactionTask) getMemoryBufferLowWatermark() int64 { func (t *clusteringCompactionTask) getMemoryBufferLowWatermark() int64 {
return int64(float64(t.memoryBufferSize) * 0.3) return int64(float64(t.memoryLimit) * 0.3)
} }
func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 { func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 {
return int64(float64(t.memoryBufferSize) * 0.7) return int64(float64(t.memoryLimit) * 0.7)
} }
func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) error { func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) error {
@ -867,7 +882,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
return t.binlogIO.Download(ctx, paths) return t.binlogIO.Download(ctx, paths)
}), }),
storage.WithVersion(segment.StorageVersion), storage.WithVersion(segment.StorageVersion),
storage.WithBufferSize(t.memoryBufferSize), storage.WithBufferSize(t.bufferSize),
storage.WithStorageConfig(t.compactionParams.StorageConfig), storage.WithStorageConfig(t.compactionParams.StorageConfig),
) )
if err != nil { if err != nil {
@ -971,7 +986,7 @@ func (t *clusteringCompactionTask) generatedScalarPlan(maxRows, preferRows int64
func (t *clusteringCompactionTask) switchPolicyForScalarPlan(totalRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} { func (t *clusteringCompactionTask) switchPolicyForScalarPlan(totalRows int64, keys []interface{}, dict map[interface{}]int64) [][]interface{} {
bufferNumBySegmentMaxRows := totalRows / t.plan.MaxSegmentRows bufferNumBySegmentMaxRows := totalRows / t.plan.MaxSegmentRows
bufferNumByMemory := t.memoryBufferSize / expectedBinlogSize bufferNumByMemory := t.memoryLimit / expectedBinlogSize
log.Info("switchPolicyForScalarPlan", zap.Int64("totalRows", totalRows), log.Info("switchPolicyForScalarPlan", zap.Int64("totalRows", totalRows),
zap.Int64("bufferNumBySegmentMaxRows", bufferNumBySegmentMaxRows), zap.Int64("bufferNumBySegmentMaxRows", bufferNumBySegmentMaxRows),
zap.Int64("bufferNumByMemory", bufferNumByMemory)) zap.Int64("bufferNumByMemory", bufferNumByMemory))

View File

@ -179,7 +179,7 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() {
s.Require().NoError(err) s.Require().NoError(err)
s.Equal(s.task.primaryKeyField, s.task.plan.Schema.Fields[2]) s.Equal(s.task.primaryKeyField, s.task.plan.Schema.Fields[2])
s.Equal(false, s.task.isVectorClusteringKey) s.Equal(false, s.task.isVectorClusteringKey)
s.Equal(true, s.task.memoryBufferSize > 0) s.Equal(true, s.task.memoryLimit > 0)
s.Equal(8, s.task.getWorkerPoolSize()) s.Equal(8, s.task.getWorkerPoolSize())
s.Equal(8, s.task.mappingPool.Cap()) s.Equal(8, s.task.mappingPool.Cap())
s.Equal(8, s.task.flushPool.Cap()) s.Equal(8, s.task.flushPool.Cap())
@ -305,7 +305,7 @@ func (s *ClusteringCompactionTaskSuite) prepareScalarCompactionNormalByMemoryLim
func(ctx context.Context, strings []string) ([][]byte, error) { func(ctx context.Context, strings []string) ([][]byte, error) {
// 32m, only two buffers can be generated // 32m, only two buffers can be generated
one.Do(func() { one.Do(func() {
s.task.memoryBufferSize = 32 * 1024 * 1024 s.task.memoryLimit = 32 * 1024 * 1024
}) })
return lo.Values(kvs), nil return lo.Values(kvs), nil
}) })

View File

@ -277,7 +277,7 @@ func NewBinlogRecordWriter(ctx context.Context, collectionID, partitionID, segme
) )
case StorageV2: case StorageV2:
return newPackedBinlogRecordWriter(collectionID, partitionID, segmentID, schema, return newPackedBinlogRecordWriter(collectionID, partitionID, segmentID, schema,
blobsWriter, allocator, chunkSize, maxRowNum, blobsWriter, allocator, maxRowNum,
rwOptions.bufferSize, rwOptions.multiPartUploadSize, rwOptions.columnGroups, rwOptions.bufferSize, rwOptions.multiPartUploadSize, rwOptions.columnGroups,
rwOptions.storageConfig, rwOptions.storageConfig,
) )

View File

@ -276,7 +276,6 @@ type PackedBinlogRecordWriter struct {
schema *schemapb.CollectionSchema schema *schemapb.CollectionSchema
BlobsWriter ChunkedBlobsWriter BlobsWriter ChunkedBlobsWriter
allocator allocator.Interface allocator allocator.Interface
chunkSize uint64
maxRowNum int64 maxRowNum int64
arrowSchema *arrow.Schema arrowSchema *arrow.Schema
bufferSize int64 bufferSize int64
@ -531,7 +530,7 @@ func (pw *PackedBinlogRecordWriter) GetBufferUncompressed() uint64 {
} }
func newPackedBinlogRecordWriter(collectionID, partitionID, segmentID UniqueID, schema *schemapb.CollectionSchema, func newPackedBinlogRecordWriter(collectionID, partitionID, segmentID UniqueID, schema *schemapb.CollectionSchema,
blobsWriter ChunkedBlobsWriter, allocator allocator.Interface, chunkSize uint64, maxRowNum int64, bufferSize, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, blobsWriter ChunkedBlobsWriter, allocator allocator.Interface, maxRowNum int64, bufferSize, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup,
storageConfig *indexpb.StorageConfig, storageConfig *indexpb.StorageConfig,
) (*PackedBinlogRecordWriter, error) { ) (*PackedBinlogRecordWriter, error) {
arrowSchema, err := ConvertToArrowSchema(schema.Fields) arrowSchema, err := ConvertToArrowSchema(schema.Fields)
@ -566,7 +565,6 @@ func newPackedBinlogRecordWriter(collectionID, partitionID, segmentID UniqueID,
arrowSchema: arrowSchema, arrowSchema: arrowSchema,
BlobsWriter: blobsWriter, BlobsWriter: blobsWriter,
allocator: allocator, allocator: allocator,
chunkSize: chunkSize,
maxRowNum: maxRowNum, maxRowNum: maxRowNum,
bufferSize: bufferSize, bufferSize: bufferSize,
multiPartUploadSize: multiPartUploadSize, multiPartUploadSize: multiPartUploadSize,