fix: Improve import memory management to prevent OOM (#43568)

1. Use blocking memory allocation to wait until memory becomes available
2. Perform memory allocation at the file level instead of per task
3. Limit Parquet file reader batch size to prevent excessive memory
consumption
4. Reduce the import memory limit from 20% to 10% of total memory

issue: https://github.com/milvus-io/milvus/issues/43387,
https://github.com/milvus-io/milvus/issues/43131
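
In code terms, changes 1 and 2 mean each file-read job inside ImportTask.Execute now wraps its work in a blocking allocate/release pair around the per-task buffer size. A simplified sketch of that pattern (taken from the diff below, with error handling and task-state updates omitted):

    f := GetExecPool().Submit(func() (any, error) {
        // Block until the global allocator can grant bufferSize bytes, then
        // guarantee they are returned (and freed pages handed back to the OS)
        // once this file has been read.
        GetMemoryAllocator().BlockingAllocate(t.GetTaskID(), bufferSize)
        defer func() {
            GetMemoryAllocator().Release(t.GetTaskID(), bufferSize)
            debug.FreeOSMemory()
        }()
        err := fn(file)
        return err, err
    })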

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2025-07-28 21:25:35 +08:00 committed by GitHub
parent 5b9b895cb0
commit a29b3272b0
14 changed files with 209 additions and 189 deletions

View File

@ -754,7 +754,7 @@ dataNode:
maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
readBufferSizeInMB: 16 # The base insert buffer size (in MB) during import. The actual buffer size will be dynamically calculated based on the number of shards.
readDeleteBufferSizeInMB: 16 # The delete buffer size (in MB) during import.
memoryLimitPercentage: 20 # The percentage of memory limit for import/pre-import tasks.
memoryLimitPercentage: 10 # The percentage of memory limit for import/pre-import tasks.
compaction:
levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.

View File

@ -819,15 +819,22 @@ func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) int {
}
// Calculate memory-based slots
var taskBufferSize int
baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt()
totalBufferSize := baseBufferSize * len(job.GetVchannels()) * len(job.GetPartitionIDs())
if task.GetType() == ImportTaskType {
// ImportTask uses a dynamic buffer size calculated from vchannels and partitions
taskBufferSize = int(baseBufferSize) * len(job.GetVchannels()) * len(job.GetPartitionIDs())
} else {
// PreImportTask uses a fixed buffer size
taskBufferSize = baseBufferSize
}
isL0Import := importutilv2.IsL0Import(job.GetOptions())
if isL0Import {
// L0 import won't hash the data by channel or partition
totalBufferSize = paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt()
// L0 import uses a fixed buffer size
taskBufferSize = paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt()
}
memoryLimitPerSlot := paramtable.Get().DataCoordCfg.ImportMemoryLimitPerSlot.GetAsInt()
memoryBasedSlots := totalBufferSize / memoryLimitPerSlot
memoryBasedSlots := taskBufferSize / memoryLimitPerSlot
// Return the larger value to ensure both CPU and memory constraints are satisfied
if cpuBasedSlots > memoryBasedSlots {

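A worked example of the slot math above, with illustrative numbers only: baseBufferSize matches the 16 MB readBufferSizeInMB default shown earlier, while memoryLimitPerSlot is a hypothetical stand-in for DataCoord's ImportMemoryLimitPerSlot.

    func exampleMemorySlots() (importSlots, preImportSlots int) {
        const (
            baseBufferSize     = 16 << 20 // 16 MB, per readBufferSizeInMB
            memoryLimitPerSlot = 64 << 20 // hypothetical value for illustration
        )
        importBuffer := baseBufferSize * 2 * 4               // 2 vchannels x 4 partitions = 128 MB
        importSlots = importBuffer / memoryLimitPerSlot      // 2 memory-based slots
        preImportSlots = baseBufferSize / memoryLimitPerSlot // 0, so cpuBasedSlots wins the max
        return
    }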
View File

@ -22,14 +22,28 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
var (
globalMemoryAllocator MemoryAllocator
globalMemoryAllocatorOnce sync.Once
)
// GetMemoryAllocator returns the global memory allocator instance
func GetMemoryAllocator() MemoryAllocator {
globalMemoryAllocatorOnce.Do(func() {
globalMemoryAllocator = NewMemoryAllocator(int64(hardware.GetMemoryCount()))
})
return globalMemoryAllocator
}
// MemoryAllocator handles memory allocation and deallocation for import tasks
type MemoryAllocator interface {
// TryAllocate attempts to allocate memory of the specified size
// Returns true if allocation is successful, false if insufficient memory
TryAllocate(taskID int64, size int64) bool
// BlockingAllocate blocks until memory is available and then allocates
// This method will block until memory becomes available
BlockingAllocate(taskID int64, size int64)
// Release releases memory of the specified size
Release(taskID int64, size int64)
@ -39,43 +53,46 @@ type memoryAllocator struct {
systemTotalMemory int64
usedMemory int64
mutex sync.RWMutex
cond *sync.Cond
}
// NewMemoryAllocator creates a new MemoryAllocator instance
func NewMemoryAllocator(systemTotalMemory int64) MemoryAllocator {
log.Info("new import memory allocator", zap.Int64("systemTotalMemory", systemTotalMemory))
ma := &memoryAllocator{
systemTotalMemory: int64(systemTotalMemory),
systemTotalMemory: systemTotalMemory,
usedMemory: 0,
}
ma.cond = sync.NewCond(&ma.mutex)
return ma
}
// TryAllocate attempts to allocate memory of the specified size
func (ma *memoryAllocator) TryAllocate(taskID int64, size int64) bool {
// BlockingAllocate blocks until memory is available and then allocates
func (ma *memoryAllocator) BlockingAllocate(taskID int64, size int64) {
ma.mutex.Lock()
defer ma.mutex.Unlock()
percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat()
memoryLimit := int64(float64(ma.systemTotalMemory) * percentage / 100.0)
if ma.usedMemory+size > memoryLimit {
log.Info("memory allocation failed, insufficient memory",
// Wait until enough memory is available
for ma.usedMemory+size > memoryLimit {
log.Warn("task waiting for memory allocation...",
zap.Int64("taskID", taskID),
zap.Int64("requestedSize", size),
zap.Int64("usedMemory", ma.usedMemory),
zap.Int64("availableMemory", memoryLimit-ma.usedMemory))
return false
ma.cond.Wait()
}
// Allocate memory
ma.usedMemory += size
log.Info("memory allocated successfully",
zap.Int64("taskID", taskID),
zap.Int64("allocatedSize", size),
zap.Int64("usedMemory", ma.usedMemory),
zap.Int64("availableMemory", memoryLimit-ma.usedMemory))
return true
}
// Release releases memory of the specified size
@ -95,4 +112,7 @@ func (ma *memoryAllocator) Release(taskID int64, size int64) {
zap.Int64("taskID", taskID),
zap.Int64("releasedSize", size),
zap.Int64("usedMemory", ma.usedMemory))
// Wake up waiting tasks after memory is released
ma.cond.Broadcast()
}
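
For reference, the ceiling the allocator enforces is recomputed from paramtable on every BlockingAllocate call, so it always tracks the current dataNode.import.memoryLimitPercentage. With the new 10% default and an assumed 64 GB host (a fragment, illustrative numbers only):

    systemTotal := int64(64 << 30)                                  // assumed 64 GB host
    percentage := 10.0                                              // new default
    memoryLimit := int64(float64(systemTotal) * percentage / 100.0) // ~6.4 GB for import buffers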

View File

@ -17,7 +17,10 @@
package importv2
import (
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
@ -30,14 +33,12 @@ func TestMemoryAllocatorBasicOperations(t *testing.T) {
// Test initial state
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
// Test memory allocation for task 1
success := ma.TryAllocate(1, 50*1024*1024) // 50MB for task 1
assert.True(t, success)
// Test memory allocation for task 1 using BlockingAllocate
ma.BlockingAllocate(1, 50*1024*1024) // 50MB for task 1
assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory)
// Test memory allocation for task 2
success = ma.TryAllocate(2, 50*1024*1024) // 50MB for task 2
assert.True(t, success)
ma.BlockingAllocate(2, 50*1024*1024) // 50MB for task 2
assert.Equal(t, int64(100*1024*1024), ma.(*memoryAllocator).usedMemory)
// Test memory release for task 1
@ -60,17 +61,25 @@ func TestMemoryAllocatorMemoryLimit(t *testing.T) {
testSize := memoryLimit / 10 // Use 10% of available memory
// Allocate memory up to the limit
success := ma.TryAllocate(1, testSize)
assert.True(t, success)
ma.BlockingAllocate(1, testSize)
assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory)
// Try to allocate more memory than available (should fail)
success = ma.TryAllocate(2, memoryLimit)
assert.False(t, success)
assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory) // Should remain unchanged
// Try to allocate more memory than available (this will block, so we test in a goroutine)
done := make(chan bool)
go func() {
ma.BlockingAllocate(2, testSize)
done <- true
}()
// Release the allocated memory
// Release the allocated memory to unblock the waiting allocation
ma.Release(1, testSize)
<-done
// Verify that the second allocation succeeded after release
assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory)
// Release the second allocation
ma.Release(2, testSize)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
@ -84,10 +93,8 @@ func TestMemoryAllocatorConcurrentAccess(t *testing.T) {
for i := 0; i < 10; i++ {
taskID := int64(i + 1)
go func() {
success := ma.TryAllocate(taskID, 50*1024*1024) // 50MB each
if success {
ma.BlockingAllocate(taskID, 50*1024*1024) // 50MB each
ma.Release(taskID, 50*1024*1024)
}
done <- true
}()
}
@ -97,10 +104,9 @@ func TestMemoryAllocatorConcurrentAccess(t *testing.T) {
<-done
}
// Verify final state - should be 0 or the sum of remaining allocations
// depending on memory limit and concurrent execution
// Verify final state - should be 0 since all allocations were released
finalMemory := ma.(*memoryAllocator).usedMemory
assert.GreaterOrEqual(t, finalMemory, int64(0))
assert.Equal(t, int64(0), finalMemory)
}
// TestMemoryAllocatorNegativeRelease tests handling of negative memory release
@ -109,8 +115,7 @@ func TestMemoryAllocatorNegativeRelease(t *testing.T) {
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Allocate some memory
success := ma.TryAllocate(1, 100*1024*1024) // 100MB
assert.True(t, success)
ma.BlockingAllocate(1, 100*1024*1024) // 100MB
assert.Equal(t, int64(100*1024*1024), ma.(*memoryAllocator).usedMemory)
// Release more than allocated (should not go negative)
@ -121,15 +126,14 @@ func TestMemoryAllocatorNegativeRelease(t *testing.T) {
// TestMemoryAllocatorMultipleTasks tests memory management for multiple tasks
func TestMemoryAllocatorMultipleTasks(t *testing.T) {
// Create memory allocator with 2GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
ma := NewMemoryAllocator(1024 * 1024 * 1024 * 2)
// Allocate memory for multiple tasks with smaller sizes
taskIDs := []int64{1, 2, 3, 4, 5}
sizes := []int64{20, 30, 25, 15, 35} // Total: 125MB
for i, taskID := range taskIDs {
success := ma.TryAllocate(taskID, sizes[i]*1024*1024)
assert.True(t, success)
ma.BlockingAllocate(taskID, sizes[i]*1024*1024)
}
// Verify total used memory
@ -155,3 +159,70 @@ func TestMemoryAllocatorMultipleTasks(t *testing.T) {
// Verify final state
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
// TestMemoryAllocatorZeroSize tests handling of zero size allocations
func TestMemoryAllocatorZeroSize(t *testing.T) {
// Create memory allocator
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Test zero size allocation
ma.BlockingAllocate(1, 0)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
// Test zero size release
ma.Release(1, 0)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
// TestMemoryAllocatorSimple tests basic functionality without external dependencies
func TestMemoryAllocatorSimple(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Test initial state
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
// Test memory allocation
ma.BlockingAllocate(1, 50*1024*1024) // 50MB
assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory)
// Test memory release
ma.Release(1, 50*1024*1024)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
// TestMemoryAllocatorMassiveConcurrency tests massive concurrent memory allocation and release
func TestMemoryAllocatorMassiveConcurrency(t *testing.T) {
// Create memory allocator with 16GB system memory (~1.6GB effective limit at the 10% default)
totalMemory := int64(16 * 1024 * 1024 * 1024) // 16GB
ma := NewMemoryAllocator(totalMemory)
const numTasks = 200
var wg sync.WaitGroup
wg.Add(numTasks)
// Start concurrent allocation and release
for i := 0; i < numTasks; i++ {
taskID := int64(i + 1)
var memorySize int64
// 10% chance to allocate 1.6GB, 90% chance to allocate 128MB-1536MB
if rand.Float64() < 0.1 {
memorySize = int64(1600 * 1024 * 1024)
} else {
multiple := rand.Intn(12) + 1
memorySize = int64(multiple * 128 * 1024 * 1024) // 128MB to 1536MB
}
go func(id int64, size int64) {
defer wg.Done()
ma.BlockingAllocate(id, size)
time.Sleep(1 * time.Millisecond)
ma.Release(id, size)
}(taskID, memorySize)
}
wg.Wait()
// Assert that all memory is released
finalMemory := ma.(*memoryAllocator).usedMemory
assert.Equal(t, int64(0), finalMemory, "All memory should be released")
}

View File

@ -27,7 +27,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
)
type Scheduler interface {
@ -38,17 +37,14 @@ type Scheduler interface {
type scheduler struct {
manager TaskManager
memoryAllocator MemoryAllocator
closeOnce sync.Once
closeChan chan struct{}
}
func NewScheduler(manager TaskManager) Scheduler {
memoryAllocator := NewMemoryAllocator(int64(hardware.GetMemoryCount()))
return &scheduler{
manager: manager,
memoryAllocator: memoryAllocator,
closeChan: make(chan struct{}),
}
}
@ -76,34 +72,23 @@ func (s *scheduler) Start() {
}
}
// scheduleTasks implements memory-based task scheduling using MemoryAllocator
// It selects tasks that can fit within the available memory.
func (s *scheduler) scheduleTasks() {
pendingTasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
sort.Slice(pendingTasks, func(i, j int) bool {
return pendingTasks[i].GetTaskID() < pendingTasks[j].GetTaskID()
tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].GetTaskID() < tasks[j].GetTaskID()
})
selectedTasks := make([]Task, 0)
tasksBufferSize := make(map[int64]int64)
for _, task := range pendingTasks {
taskBufferSize := task.GetBufferSize()
if s.memoryAllocator.TryAllocate(task.GetTaskID(), taskBufferSize) {
selectedTasks = append(selectedTasks, task)
tasksBufferSize[task.GetTaskID()] = taskBufferSize
}
}
if len(selectedTasks) == 0 {
if len(tasks) == 0 {
return
}
log.Info("processing selected tasks",
zap.Int("pending", len(pendingTasks)),
zap.Int("selected", len(selectedTasks)))
taskIDs := lo.Map(tasks, func(t Task, _ int) int64 {
return t.GetTaskID()
})
log.Info("processing tasks...", zap.Int64s("taskIDs", taskIDs))
futures := make(map[int64][]*conc.Future[any])
for _, task := range selectedTasks {
for _, task := range tasks {
fs := task.Execute()
futures[task.GetTaskID()] = fs
}
@ -111,16 +96,16 @@ func (s *scheduler) scheduleTasks() {
for taskID, fs := range futures {
err := conc.AwaitAll(fs...)
if err != nil {
s.memoryAllocator.Release(taskID, tasksBufferSize[taskID])
continue
}
s.manager.Update(taskID, UpdateState(datapb.ImportTaskStateV2_Completed))
s.memoryAllocator.Release(taskID, tasksBufferSize[taskID])
log.Info("preimport/import done", zap.Int64("taskID", taskID))
}
log.Info("all tasks completed", zap.Int64s("taskIDs", taskIDs))
}
// Slots returns the available slots for import
// Slots returns the used slots for import
func (s *scheduler) Slots() int64 {
tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress))
used := lo.SumBy(tasks, func(t Task) int64 {

View File

@ -26,7 +26,6 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -553,88 +552,6 @@ func (s *SchedulerSuite) TestScheduler_ImportFileWithFunction() {
s.NoError(err)
}
// TestScheduler_ScheduleTasks tests the scheduleTasks method with various scenarios
func (s *SchedulerSuite) TestScheduler_ScheduleTasks() {
// Memory limit exceeded - some tasks should be skipped
s.Run("MemoryLimitExceeded", func() {
manager := NewMockTaskManager(s.T())
s.scheduler.manager = manager
// Add tasks that exceed memory limit
tasks := make(map[int64]Task, 0)
for i := 0; i < 5; i++ {
t := NewMockTask(s.T())
t.EXPECT().GetTaskID().Return(int64(i))
t.EXPECT().GetBufferSize().Return(int64(300))
if i < 3 { // Only first 3 tasks should be allocated (900 total)
t.EXPECT().Execute().Return([]*conc.Future[any]{})
}
tasks[t.GetTaskID()] = t
}
manager.EXPECT().GetBy(mock.Anything).Return(lo.Values(tasks))
manager.EXPECT().Update(mock.Anything, mock.Anything).Return()
memAllocator := NewMemoryAllocator(1000 / 0.2)
s.scheduler.memoryAllocator = memAllocator
s.scheduler.scheduleTasks()
s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory)
})
// Task execution failure - memory should be released
s.Run("TaskExecutionFailure", func() {
manager := NewMockTaskManager(s.T())
s.scheduler.manager = manager
tasks := make(map[int64]Task, 0)
// Create a task that will fail execution
failedTask := NewMockTask(s.T())
failedTask.EXPECT().GetTaskID().Return(int64(1))
failedTask.EXPECT().GetBufferSize().Return(int64(256))
// Create a future that will fail
failedFuture := conc.Go(func() (any, error) {
return nil, errors.New("mock execution error")
})
failedTask.EXPECT().Execute().Return([]*conc.Future[any]{failedFuture})
tasks[failedTask.GetTaskID()] = failedTask
// Create a successful task
successTask := NewMockTask(s.T())
successTask.EXPECT().GetTaskID().Return(int64(2))
successTask.EXPECT().GetBufferSize().Return(int64(128))
successTask.EXPECT().Execute().Return([]*conc.Future[any]{})
tasks[successTask.GetTaskID()] = successTask
manager.EXPECT().GetBy(mock.Anything).Return(lo.Values(tasks))
manager.EXPECT().Update(mock.Anything, mock.Anything).Return()
memAllocator := NewMemoryAllocator(512 * 5)
s.scheduler.memoryAllocator = memAllocator
s.scheduler.scheduleTasks()
s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory)
})
// Empty task list
s.Run("EmptyTaskList", func() {
manager := NewMockTaskManager(s.T())
s.scheduler.manager = manager
memAllocator := NewMemoryAllocator(1024)
s.scheduler.memoryAllocator = memAllocator
manager.EXPECT().GetBy(mock.Anything).Return(nil)
// Should not panic or error
s.NotPanics(func() {
s.scheduler.scheduleTasks()
})
s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory)
})
}
func TestScheduler(t *testing.T) {
suite.Run(t, new(SchedulerSuite))
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"runtime/debug"
"time"
"github.com/cockroachdb/errors"
@ -36,6 +37,8 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -107,7 +110,27 @@ func (t *ImportTask) GetSlots() int64 {
}
func (t *ImportTask) GetBufferSize() int64 {
return GetTaskBufferSize(t)
// Calculate the task buffer size based on the number of vchannels and partitions
baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt()
vchannelNum := len(t.GetVchannels())
partitionNum := len(t.GetPartitionIDs())
taskBufferSize := int64(baseBufferSize * vchannelNum * partitionNum)
// If the file size is smaller than the task buffer size, use the file size
fileSize := lo.MaxBy(t.GetFileStats(), func(a, b *datapb.ImportFileStats) bool {
return a.GetTotalMemorySize() > b.GetTotalMemorySize()
}).GetTotalMemorySize()
if fileSize != 0 && fileSize < taskBufferSize {
taskBufferSize = fileSize
}
// Task buffer size should not exceed the memory limit
percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat()
memoryLimit := int64(float64(hardware.GetMemoryCount()) * percentage / 100.0)
if taskBufferSize > memoryLimit {
return memoryLimit
}
return taskBufferSize
}
func (t *ImportTask) Cancel() {
@ -139,11 +162,11 @@ func (t *ImportTask) Clone() Task {
}
func (t *ImportTask) Execute() []*conc.Future[any] {
bufferSize := int(t.GetBufferSize())
bufferSize := t.GetBufferSize()
log.Info("start to import", WrapLogFields(t,
zap.Int("bufferSize", bufferSize),
zap.Int64("bufferSize", bufferSize),
zap.Int64("taskSlot", t.GetSlots()),
zap.Any("files", t.GetFileStats()),
zap.Any("files", t.req.GetFiles()),
zap.Any("schema", t.GetSchema()),
)...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
@ -151,7 +174,7 @@ func (t *ImportTask) Execute() []*conc.Future[any] {
req := t.req
fn := func(file *internalpb.ImportFile) error {
reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, req.GetOptions(), bufferSize, t.req.GetStorageConfig())
reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, req.GetOptions(), int(bufferSize), t.req.GetStorageConfig())
if err != nil {
log.Warn("new reader failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
reason := fmt.Sprintf("error: %v, file: %s", err, file.String())
@ -176,6 +199,12 @@ func (t *ImportTask) Execute() []*conc.Future[any] {
for _, file := range req.GetFiles() {
file := file
f := GetExecPool().Submit(func() (any, error) {
// Use blocking allocation - this will wait until memory is available
GetMemoryAllocator().BlockingAllocate(t.GetTaskID(), bufferSize)
defer func() {
GetMemoryAllocator().Release(t.GetTaskID(), bufferSize)
debug.FreeOSMemory()
}()
err := fn(file)
return err, err
})

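Concretely, the new ImportTask.GetBufferSize is the vchannel x partition product, capped first by the largest import file and then by the 10% memory limit. A sketch with hypothetical inputs (2 vchannels, 4 partitions, a 96 MB largest file, a 64 GB host):

    base := int64(16 << 20)                    // ImportBaseBufferSize (16 MB)
    bufferSize := base * 2 * 4                 // 128 MB before any cap
    if largestFile := int64(96 << 20); largestFile != 0 && largestFile < bufferSize {
        bufferSize = largestFile               // never buffer more than the biggest file holds
    }
    if limit := int64(float64(64<<30) * 10 / 100.0); bufferSize > limit {
        bufferSize = limit                     // cap at 10% of system memory (~6.4 GB)
    }
    // bufferSize ends up at 96 MB in this example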
View File

@ -140,7 +140,7 @@ func (t *L0ImportTask) Execute() []*conc.Future[any] {
log.Info("start to import l0", WrapLogFields(t,
zap.Int("bufferSize", bufferSize),
zap.Int64("taskSlot", t.GetSlots()),
zap.Any("files", t.GetFileStats()),
zap.Any("files", t.req.GetFiles()),
zap.Any("schema", t.GetSchema()),
)...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))

View File

@ -128,7 +128,7 @@ func (t *L0PreImportTask) Execute() []*conc.Future[any] {
log.Info("start to preimport l0", WrapLogFields(t,
zap.Int("bufferSize", bufferSize),
zap.Int64("taskSlot", t.GetSlots()),
zap.Any("files", t.GetFileStats()),
zap.Any("files", t.req.GetImportFiles()),
zap.Any("schema", t.GetSchema()),
)...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"runtime/debug"
"time"
"github.com/cockroachdb/errors"
@ -107,8 +108,9 @@ func (t *PreImportTask) GetSlots() int64 {
return t.req.GetTaskSlot()
}
// PreImportTask uses a fixed buffer size
func (t *PreImportTask) GetBufferSize() int64 {
return GetTaskBufferSize(t)
return paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt64()
}
func (t *PreImportTask) Cancel() {
@ -136,7 +138,7 @@ func (t *PreImportTask) Execute() []*conc.Future[any] {
log.Info("start to preimport", WrapLogFields(t,
zap.Int("bufferSize", bufferSize),
zap.Int64("taskSlot", t.GetSlots()),
zap.Any("files", t.GetFileStats()),
zap.Any("files", t.req.GetImportFiles()),
zap.Any("schema", t.GetSchema()),
)...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
@ -172,6 +174,9 @@ func (t *PreImportTask) Execute() []*conc.Future[any] {
i := i
file := file
f := GetExecPool().Submit(func() (any, error) {
defer func() {
debug.FreeOSMemory()
}()
err := fn(i, file)
return err, err
})

View File

@ -39,9 +39,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -558,19 +556,3 @@ func NewMetaCache(req *datapb.ImportRequest) map[string]metacache.MetaCache {
}
return metaCaches
}
// GetBufferSize returns the memory buffer size required by this task
func GetTaskBufferSize(task Task) int64 {
baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt()
vchannelNum := len(task.GetVchannels())
partitionNum := len(task.GetPartitionIDs())
totalBufferSize := int64(baseBufferSize * vchannelNum * partitionNum)
percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat()
memoryLimit := int64(float64(hardware.GetMemoryCount()) * percentage / 100.0)
if totalBufferSize > memoryLimit {
return memoryLimit
}
return totalBufferSize
}

View File

@ -68,7 +68,15 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
log.Info("parquet file info", zap.Int("row group num", r.NumRowGroups()),
zap.Int64("num rows", r.NumRows()))
fileReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
count, err := common.EstimateReadCountPerBatch(bufferSize, schema)
if err != nil {
return nil, err
}
readProps := pqarrow.ArrowReadProperties{
BatchSize: count,
}
fileReader, err := pqarrow.NewFileReader(r, readProps, memory.DefaultAllocator)
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("new parquet file reader failed, err=%v", err))
}
@ -77,10 +85,6 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
if err != nil {
return nil, err
}
count, err := common.EstimateReadCountPerBatch(bufferSize, schema)
if err != nil {
return nil, err
}
return &reader{
ctx: ctx,
cm: cm,

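The reader change above derives a per-batch row count from the import buffer size before opening the pqarrow reader, so each Arrow record batch is sized to fit within the read buffer. A minimal sketch, with avgRowSize as a hypothetical stand-in for what common.EstimateReadCountPerBatch derives from the schema:

    avgRowSize := int64(4 << 10)                   // assume ~4 KB per row
    readProps := pqarrow.ArrowReadProperties{
        BatchSize: int64(bufferSize) / avgRowSize, // rows per Arrow record batch
    }
    fileReader, err := pqarrow.NewFileReader(r, readProps, memory.DefaultAllocator)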
View File

@ -5555,14 +5555,14 @@ if this parameter <= 0, will set it as 10`,
Key: "dataNode.import.memoryLimitPercentage",
Version: "2.5.15",
Doc: "The percentage of memory limit for import/pre-import tasks.",
DefaultValue: "20",
DefaultValue: "10",
PanicIfEmpty: false,
Export: true,
Formatter: func(v string) string {
percentage := getAsFloat(v)
if percentage <= 0 || percentage > 100 {
log.Warn("invalid import memory limit percentage, using default 20%")
return "20"
log.Warn("invalid import memory limit percentage, using default 10%")
return "10"
}
return fmt.Sprintf("%f", percentage)
},

View File

@ -623,7 +623,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
assert.Equal(t, 16*1024*1024, Params.ImportBaseBufferSize.GetAsInt())
assert.Equal(t, 16*1024*1024, Params.ImportDeleteBufferSize.GetAsInt())
assert.Equal(t, 20.0, Params.ImportMemoryLimitPercentage.GetAsFloat())
assert.Equal(t, 10.0, Params.ImportMemoryLimitPercentage.GetAsFloat())
params.Save("datanode.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
assert.Equal(t, 16, Params.SlotCap.GetAsInt())