From a29b3272b0792c92fe528024e3aa1c5edb1b4fb3 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Mon, 28 Jul 2025 21:25:35 +0800 Subject: [PATCH] fix: Improve import memory management to prevent OOM (#43568) 1. Use blocking memory allocation to wait until memory becomes available 2. Perform memory allocation at the file level instead of per task 3. Limit Parquet file reader batch size to prevent excessive memory consumption 4. Limit import buffer size from 20% to 10% of total memory issue: https://github.com/milvus-io/milvus/issues/43387, https://github.com/milvus-io/milvus/issues/43131 --------- Signed-off-by: bigsheeper --- configs/milvus.yaml | 2 +- internal/datacoord/import_util.go | 15 ++- .../datanode/importv2/memory_allocator.go | 42 +++++-- .../importv2/memory_allocator_test.go | 119 ++++++++++++++---- internal/datanode/importv2/scheduler.go | 45 +++---- internal/datanode/importv2/scheduler_test.go | 83 ------------ internal/datanode/importv2/task_import.go | 39 +++++- internal/datanode/importv2/task_l0_import.go | 2 +- .../datanode/importv2/task_l0_preimport.go | 2 +- internal/datanode/importv2/task_preimport.go | 9 +- internal/datanode/importv2/util.go | 18 --- internal/util/importutilv2/parquet/reader.go | 14 ++- pkg/util/paramtable/component_param.go | 6 +- pkg/util/paramtable/component_param_test.go | 2 +- 14 files changed, 209 insertions(+), 189 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index fd7a30c49e..6d1c3f3874 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -754,7 +754,7 @@ dataNode: maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files. readBufferSizeInMB: 16 # The base insert buffer size (in MB) during import. The actual buffer size will be dynamically calculated based on the number of shards. readDeleteBufferSizeInMB: 16 # The delete buffer size (in MB) during import. - memoryLimitPercentage: 20 # The percentage of memory limit for import/pre-import tasks. + memoryLimitPercentage: 10 # The percentage of memory limit for import/pre-import tasks. compaction: levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1. 
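Before the per-file hunks, a condensed illustration of change (1): TryAllocate is replaced by a blocking allocate built on sync.Cond, so a caller waits until enough budget is free and Release wakes the waiters. The sketch below is a self-contained stand-in, not the Milvus implementation — the blockingAllocator type, the fixed limit, and main are illustrative, while the real allocator in internal/datanode/importv2/memory_allocator.go derives its limit from dataNode.import.memoryLimitPercentage and the host memory reported by hardware.GetMemoryCount().

package main

import (
	"fmt"
	"sync"
)

// blockingAllocator is an illustrative stand-in for the import memory allocator:
// a fixed budget, a mutex-protected counter, and a condition variable for waiters.
type blockingAllocator struct {
	limit int64
	used  int64
	mu    sync.Mutex
	cond  *sync.Cond
}

func newBlockingAllocator(limit int64) *blockingAllocator {
	a := &blockingAllocator{limit: limit}
	a.cond = sync.NewCond(&a.mu)
	return a
}

// BlockingAllocate waits until the request fits under the limit, then reserves it.
func (a *blockingAllocator) BlockingAllocate(taskID, size int64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	for a.used+size > a.limit {
		a.cond.Wait() // releases the lock while waiting, reacquires it on wake-up
	}
	a.used += size
}

// Release returns the reservation and wakes every waiting allocation.
func (a *blockingAllocator) Release(taskID, size int64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.used -= size
	if a.used < 0 {
		a.used = 0 // never go negative, mirroring the real allocator
	}
	a.cond.Broadcast()
}

func main() {
	a := newBlockingAllocator(100)
	var wg sync.WaitGroup
	for i := int64(1); i <= 4; i++ {
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			a.BlockingAllocate(id, 60) // only one 60-unit reservation fits at a time
			a.Release(id, 60)
		}(i)
	}
	wg.Wait()
	fmt.Println("used after all releases:", a.used)
}

Run against the 100-unit limit, the four 60-unit reservations are forced to take turns, which is the back-pressure the scheduler now relies on instead of skipping tasks.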
diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 761abf9e32..77d2c85ad2 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -819,15 +819,22 @@ func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) int { } // Calculate memory-based slots + var taskBufferSize int baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt() - totalBufferSize := baseBufferSize * len(job.GetVchannels()) * len(job.GetPartitionIDs()) + if task.GetType() == ImportTaskType { + // ImportTask uses a dynamic buffer size calculated from vchannels and partitions + taskBufferSize = int(baseBufferSize) * len(job.GetVchannels()) * len(job.GetPartitionIDs()) + } else { + // PreImportTask uses a fixed buffer size + taskBufferSize = baseBufferSize + } isL0Import := importutilv2.IsL0Import(job.GetOptions()) if isL0Import { - // L0 import won't hash the data by channel or partition - totalBufferSize = paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() + // L0 import uses a fixed buffer size + taskBufferSize = paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() } memoryLimitPerSlot := paramtable.Get().DataCoordCfg.ImportMemoryLimitPerSlot.GetAsInt() - memoryBasedSlots := totalBufferSize / memoryLimitPerSlot + memoryBasedSlots := taskBufferSize / memoryLimitPerSlot // Return the larger value to ensure both CPU and memory constraints are satisfied if cpuBasedSlots > memoryBasedSlots { diff --git a/internal/datanode/importv2/memory_allocator.go b/internal/datanode/importv2/memory_allocator.go index d3341fa70f..0b9855bb45 100644 --- a/internal/datanode/importv2/memory_allocator.go +++ b/internal/datanode/importv2/memory_allocator.go @@ -22,14 +22,28 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/hardware" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) +var ( + globalMemoryAllocator MemoryAllocator + globalMemoryAllocatorOnce sync.Once +) + +// GetMemoryAllocator returns the global memory allocator instance +func GetMemoryAllocator() MemoryAllocator { + globalMemoryAllocatorOnce.Do(func() { + globalMemoryAllocator = NewMemoryAllocator(int64(hardware.GetMemoryCount())) + }) + return globalMemoryAllocator +} + // MemoryAllocator handles memory allocation and deallocation for import tasks type MemoryAllocator interface { - // TryAllocate attempts to allocate memory of the specified size - // Returns true if allocation is successful, false if insufficient memory - TryAllocate(taskID int64, size int64) bool + // BlockingAllocate blocks until memory is available and then allocates + // This method will block until memory becomes available + BlockingAllocate(taskID int64, size int64) // Release releases memory of the specified size Release(taskID int64, size int64) @@ -39,43 +53,46 @@ type memoryAllocator struct { systemTotalMemory int64 usedMemory int64 mutex sync.RWMutex + cond *sync.Cond } // NewMemoryAllocator creates a new MemoryAllocator instance func NewMemoryAllocator(systemTotalMemory int64) MemoryAllocator { log.Info("new import memory allocator", zap.Int64("systemTotalMemory", systemTotalMemory)) ma := &memoryAllocator{ - systemTotalMemory: int64(systemTotalMemory), + systemTotalMemory: systemTotalMemory, usedMemory: 0, } + ma.cond = sync.NewCond(&ma.mutex) return ma } -// TryAllocate attempts to allocate memory of the specified size -func (ma *memoryAllocator) TryAllocate(taskID int64, size int64) bool { +// BlockingAllocate blocks until 
memory is available and then allocates +func (ma *memoryAllocator) BlockingAllocate(taskID int64, size int64) { ma.mutex.Lock() defer ma.mutex.Unlock() percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat() memoryLimit := int64(float64(ma.systemTotalMemory) * percentage / 100.0) - if ma.usedMemory+size > memoryLimit { - log.Info("memory allocation failed, insufficient memory", + // Wait until enough memory is available + for ma.usedMemory+size > memoryLimit { + log.Warn("task waiting for memory allocation...", zap.Int64("taskID", taskID), zap.Int64("requestedSize", size), zap.Int64("usedMemory", ma.usedMemory), zap.Int64("availableMemory", memoryLimit-ma.usedMemory)) - return false + + ma.cond.Wait() } + // Allocate memory ma.usedMemory += size log.Info("memory allocated successfully", zap.Int64("taskID", taskID), zap.Int64("allocatedSize", size), zap.Int64("usedMemory", ma.usedMemory), zap.Int64("availableMemory", memoryLimit-ma.usedMemory)) - - return true } // Release releases memory of the specified size @@ -95,4 +112,7 @@ func (ma *memoryAllocator) Release(taskID int64, size int64) { zap.Int64("taskID", taskID), zap.Int64("releasedSize", size), zap.Int64("usedMemory", ma.usedMemory)) + + // Wake up waiting tasks after memory is released + ma.cond.Broadcast() } diff --git a/internal/datanode/importv2/memory_allocator_test.go b/internal/datanode/importv2/memory_allocator_test.go index da2b161adb..d692a2145e 100644 --- a/internal/datanode/importv2/memory_allocator_test.go +++ b/internal/datanode/importv2/memory_allocator_test.go @@ -17,7 +17,10 @@ package importv2 import ( + "math/rand" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -30,14 +33,12 @@ func TestMemoryAllocatorBasicOperations(t *testing.T) { // Test initial state assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory) - // Test memory allocation for task 1 - success := ma.TryAllocate(1, 50*1024*1024) // 50MB for task 1 - assert.True(t, success) + // Test memory allocation for task 1 using BlockingAllocate + ma.BlockingAllocate(1, 50*1024*1024) // 50MB for task 1 assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory) // Test memory allocation for task 2 - success = ma.TryAllocate(2, 50*1024*1024) // 50MB for task 2 - assert.True(t, success) + ma.BlockingAllocate(2, 50*1024*1024) // 50MB for task 2 assert.Equal(t, int64(100*1024*1024), ma.(*memoryAllocator).usedMemory) // Test memory release for task 1 @@ -60,17 +61,25 @@ func TestMemoryAllocatorMemoryLimit(t *testing.T) { testSize := memoryLimit / 10 // Use 10% of available memory // Allocate memory up to the limit - success := ma.TryAllocate(1, testSize) - assert.True(t, success) + ma.BlockingAllocate(1, testSize) assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory) - // Try to allocate more memory than available (should fail) - success = ma.TryAllocate(2, memoryLimit) - assert.False(t, success) - assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory) // Should remain unchanged + // Try to allocate more memory than available (this will block, so we test in a goroutine) + done := make(chan bool) + go func() { + ma.BlockingAllocate(2, testSize) + done <- true + }() - // Release the allocated memory + // Release the allocated memory to unblock the waiting allocation ma.Release(1, testSize) + <-done + + // Verify that the second allocation succeeded after release + assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory) + + // Release the second allocation + ma.Release(2, testSize) 
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory) } @@ -84,10 +93,8 @@ func TestMemoryAllocatorConcurrentAccess(t *testing.T) { for i := 0; i < 10; i++ { taskID := int64(i + 1) go func() { - success := ma.TryAllocate(taskID, 50*1024*1024) // 50MB each - if success { - ma.Release(taskID, 50*1024*1024) - } + ma.BlockingAllocate(taskID, 50*1024*1024) // 50MB each + ma.Release(taskID, 50*1024*1024) done <- true }() } @@ -97,10 +104,9 @@ func TestMemoryAllocatorConcurrentAccess(t *testing.T) { <-done } - // Verify final state - should be 0 or the sum of remaining allocations - // depending on memory limit and concurrent execution + // Verify final state - should be 0 since all allocations were released finalMemory := ma.(*memoryAllocator).usedMemory - assert.GreaterOrEqual(t, finalMemory, int64(0)) + assert.Equal(t, int64(0), finalMemory) } // TestMemoryAllocatorNegativeRelease tests handling of negative memory release @@ -109,8 +115,7 @@ func TestMemoryAllocatorNegativeRelease(t *testing.T) { ma := NewMemoryAllocator(1024 * 1024 * 1024) // Allocate some memory - success := ma.TryAllocate(1, 100*1024*1024) // 100MB - assert.True(t, success) + ma.BlockingAllocate(1, 100*1024*1024) // 100MB assert.Equal(t, int64(100*1024*1024), ma.(*memoryAllocator).usedMemory) // Release more than allocated (should not go negative) @@ -121,15 +126,14 @@ func TestMemoryAllocatorNegativeRelease(t *testing.T) { // TestMemoryAllocatorMultipleTasks tests memory management for multiple tasks func TestMemoryAllocatorMultipleTasks(t *testing.T) { // Create memory allocator with 1GB system memory - ma := NewMemoryAllocator(1024 * 1024 * 1024) + ma := NewMemoryAllocator(1024 * 1024 * 1024 * 2) // Allocate memory for multiple tasks with smaller sizes taskIDs := []int64{1, 2, 3, 4, 5} sizes := []int64{20, 30, 25, 15, 35} // Total: 125MB for i, taskID := range taskIDs { - success := ma.TryAllocate(taskID, sizes[i]*1024*1024) - assert.True(t, success) + ma.BlockingAllocate(taskID, sizes[i]*1024*1024) } // Verify total used memory @@ -155,3 +159,70 @@ func TestMemoryAllocatorMultipleTasks(t *testing.T) { // Verify final state assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory) } + +// TestMemoryAllocatorZeroSize tests handling of zero size allocations +func TestMemoryAllocatorZeroSize(t *testing.T) { + // Create memory allocator + ma := NewMemoryAllocator(1024 * 1024 * 1024) + + // Test zero size allocation + ma.BlockingAllocate(1, 0) + assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory) + + // Test zero size release + ma.Release(1, 0) + assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory) +} + +// TestMemoryAllocatorSimple tests basic functionality without external dependencies +func TestMemoryAllocatorSimple(t *testing.T) { + // Create memory allocator with 1GB system memory + ma := NewMemoryAllocator(1024 * 1024 * 1024) + + // Test initial state + assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory) + + // Test memory allocation + ma.BlockingAllocate(1, 50*1024*1024) // 50MB + assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory) + + // Test memory release + ma.Release(1, 50*1024*1024) + assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory) +} + +// TestMemoryAllocatorMassiveConcurrency tests massive concurrent memory allocation and release +func TestMemoryAllocatorMassiveConcurrency(t *testing.T) { + // Create memory allocator with 16GB system memory (1.6GB effective limit at the default 10%) + totalMemory := int64(16 * 1024 * 1024 * 1024) // 16GB total; 10% limit = 1.6GB + ma := NewMemoryAllocator(totalMemory) + + const 
numTasks = 200 + + var wg sync.WaitGroup + wg.Add(numTasks) + // Start concurrent allocation and release + for i := 0; i < numTasks; i++ { + taskID := int64(i + 1) + var memorySize int64 + // 10% chance to allocate 1.6GB, 90% chance to allocate 128MB-1536MB + if rand.Float64() < 0.1 { + memorySize = int64(1600 * 1024 * 1024) + } else { + multiple := rand.Intn(12) + 1 + memorySize = int64(multiple * 128 * 1024 * 1024) // 128MB to 1536MB + } + + go func(id int64, size int64) { + defer wg.Done() + ma.BlockingAllocate(id, size) + time.Sleep(1 * time.Millisecond) + ma.Release(id, size) + }(taskID, memorySize) + } + wg.Wait() + + // Assert that all memory is released + finalMemory := ma.(*memoryAllocator).usedMemory + assert.Equal(t, int64(0), finalMemory, "All memory should be released") +} diff --git a/internal/datanode/importv2/scheduler.go b/internal/datanode/importv2/scheduler.go index bbcd1fa583..9262ce2663 100644 --- a/internal/datanode/importv2/scheduler.go +++ b/internal/datanode/importv2/scheduler.go @@ -27,7 +27,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/conc" - "github.com/milvus-io/milvus/pkg/v2/util/hardware" ) type Scheduler interface { @@ -37,19 +36,16 @@ type Scheduler interface { } type scheduler struct { - manager TaskManager - memoryAllocator MemoryAllocator + manager TaskManager closeOnce sync.Once closeChan chan struct{} } func NewScheduler(manager TaskManager) Scheduler { - memoryAllocator := NewMemoryAllocator(int64(hardware.GetMemoryCount())) return &scheduler{ - manager: manager, - memoryAllocator: memoryAllocator, - closeChan: make(chan struct{}), + manager: manager, + closeChan: make(chan struct{}), } } @@ -76,34 +72,23 @@ func (s *scheduler) Start() { } } -// scheduleTasks implements memory-based task scheduling using MemoryAllocator -// It selects tasks that can fit within the available memory. func (s *scheduler) scheduleTasks() { - pendingTasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending)) - sort.Slice(pendingTasks, func(i, j int) bool { - return pendingTasks[i].GetTaskID() < pendingTasks[j].GetTaskID() + tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending)) + sort.Slice(tasks, func(i, j int) bool { + return tasks[i].GetTaskID() < tasks[j].GetTaskID() }) - selectedTasks := make([]Task, 0) - tasksBufferSize := make(map[int64]int64) - for _, task := range pendingTasks { - taskBufferSize := task.GetBufferSize() - if s.memoryAllocator.TryAllocate(task.GetTaskID(), taskBufferSize) { - selectedTasks = append(selectedTasks, task) - tasksBufferSize[task.GetTaskID()] = taskBufferSize - } - } - - if len(selectedTasks) == 0 { + if len(tasks) == 0 { return } - log.Info("processing selected tasks", - zap.Int("pending", len(pendingTasks)), - zap.Int("selected", len(selectedTasks))) + taskIDs := lo.Map(tasks, func(t Task, _ int) int64 { + return t.GetTaskID() + }) + log.Info("processing tasks...", zap.Int64s("taskIDs", taskIDs)) futures := make(map[int64][]*conc.Future[any]) - for _, task := range selectedTasks { + for _, task := range tasks { fs := task.Execute() futures[task.GetTaskID()] = fs } @@ -111,16 +96,16 @@ func (s *scheduler) scheduleTasks() { for taskID, fs := range futures { err := conc.AwaitAll(fs...) 
if err != nil { - s.memoryAllocator.Release(taskID, tasksBufferSize[taskID]) continue } s.manager.Update(taskID, UpdateState(datapb.ImportTaskStateV2_Completed)) - s.memoryAllocator.Release(taskID, tasksBufferSize[taskID]) log.Info("preimport/import done", zap.Int64("taskID", taskID)) } + + log.Info("all tasks completed", zap.Int64s("taskIDs", taskIDs)) } -// Slots returns the available slots for import +// Slots returns the used slots for import func (s *scheduler) Slots() int64 { tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress)) used := lo.SumBy(tasks, func(t Task) int64 { diff --git a/internal/datanode/importv2/scheduler_test.go b/internal/datanode/importv2/scheduler_test.go index 400e3755aa..ba6d976558 100644 --- a/internal/datanode/importv2/scheduler_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -26,7 +26,6 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -553,88 +552,6 @@ func (s *SchedulerSuite) TestScheduler_ImportFileWithFunction() { s.NoError(err) } -// TestScheduler_ScheduleTasks tests the scheduleTasks method with various scenarios -func (s *SchedulerSuite) TestScheduler_ScheduleTasks() { - // Memory limit exceeded - some tasks should be skipped - s.Run("MemoryLimitExceeded", func() { - manager := NewMockTaskManager(s.T()) - s.scheduler.manager = manager - - // Add tasks that exceed memory limit - tasks := make(map[int64]Task, 0) - for i := 0; i < 5; i++ { - t := NewMockTask(s.T()) - t.EXPECT().GetTaskID().Return(int64(i)) - t.EXPECT().GetBufferSize().Return(int64(300)) - if i < 3 { // Only first 3 tasks should be allocated (900 total) - t.EXPECT().Execute().Return([]*conc.Future[any]{}) - } - tasks[t.GetTaskID()] = t - } - - manager.EXPECT().GetBy(mock.Anything).Return(lo.Values(tasks)) - manager.EXPECT().Update(mock.Anything, mock.Anything).Return() - - memAllocator := NewMemoryAllocator(1000 / 0.2) - s.scheduler.memoryAllocator = memAllocator - - s.scheduler.scheduleTasks() - s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory) - }) - - // Task execution failure - memory should be released - s.Run("TaskExecutionFailure", func() { - manager := NewMockTaskManager(s.T()) - s.scheduler.manager = manager - - tasks := make(map[int64]Task, 0) - // Create a task that will fail execution - failedTask := NewMockTask(s.T()) - failedTask.EXPECT().GetTaskID().Return(int64(1)) - failedTask.EXPECT().GetBufferSize().Return(int64(256)) - - // Create a future that will fail - failedFuture := conc.Go(func() (any, error) { - return nil, errors.New("mock execution error") - }) - failedTask.EXPECT().Execute().Return([]*conc.Future[any]{failedFuture}) - tasks[failedTask.GetTaskID()] = failedTask - - // Create a successful task - successTask := NewMockTask(s.T()) - successTask.EXPECT().GetTaskID().Return(int64(2)) - successTask.EXPECT().GetBufferSize().Return(int64(128)) - successTask.EXPECT().Execute().Return([]*conc.Future[any]{}) - tasks[successTask.GetTaskID()] = successTask - - manager.EXPECT().GetBy(mock.Anything).Return(lo.Values(tasks)) - manager.EXPECT().Update(mock.Anything, mock.Anything).Return() - - memAllocator := NewMemoryAllocator(512 * 5) - s.scheduler.memoryAllocator = memAllocator - - s.scheduler.scheduleTasks() - s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory) - }) - - // Empty task list - s.Run("EmptyTaskList", func() { - manager := NewMockTaskManager(s.T()) - s.scheduler.manager = 
manager - - memAllocator := NewMemoryAllocator(1024) - s.scheduler.memoryAllocator = memAllocator - - manager.EXPECT().GetBy(mock.Anything).Return(nil) - - // Should not panic or error - s.NotPanics(func() { - s.scheduler.scheduleTasks() - }) - s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory) - }) -} - func TestScheduler(t *testing.T) { suite.Run(t, new(SchedulerSuite)) } diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go index bdf8c4aa6e..0d9fcd7102 100644 --- a/internal/datanode/importv2/task_import.go +++ b/internal/datanode/importv2/task_import.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "runtime/debug" "time" "github.com/cockroachdb/errors" @@ -36,6 +37,8 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/util/conc" + "github.com/milvus-io/milvus/pkg/v2/util/hardware" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -107,7 +110,27 @@ func (t *ImportTask) GetSlots() int64 { } func (t *ImportTask) GetBufferSize() int64 { - return GetTaskBufferSize(t) + // Calculate the task buffer size based on the number of vchannels and partitions + baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt() + vchannelNum := len(t.GetVchannels()) + partitionNum := len(t.GetPartitionIDs()) + taskBufferSize := int64(baseBufferSize * vchannelNum * partitionNum) + + // If the file size is smaller than the task buffer size, use the file size + fileSize := lo.MaxBy(t.GetFileStats(), func(a, b *datapb.ImportFileStats) bool { + return a.GetTotalMemorySize() > b.GetTotalMemorySize() + }).GetTotalMemorySize() + if fileSize != 0 && fileSize < taskBufferSize { + taskBufferSize = fileSize + } + + // Task buffer size should not exceed the memory limit + percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat() + memoryLimit := int64(float64(hardware.GetMemoryCount()) * percentage / 100.0) + if taskBufferSize > memoryLimit { + return memoryLimit + } + return taskBufferSize } func (t *ImportTask) Cancel() { @@ -139,11 +162,11 @@ func (t *ImportTask) Clone() Task { } func (t *ImportTask) Execute() []*conc.Future[any] { - bufferSize := int(t.GetBufferSize()) + bufferSize := t.GetBufferSize() log.Info("start to import", WrapLogFields(t, - zap.Int("bufferSize", bufferSize), + zap.Int64("bufferSize", bufferSize), zap.Int64("taskSlot", t.GetSlots()), - zap.Any("files", t.GetFileStats()), + zap.Any("files", t.req.GetFiles()), zap.Any("schema", t.GetSchema()), )...) t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) @@ -151,7 +174,7 @@ func (t *ImportTask) Execute() []*conc.Future[any] { req := t.req fn := func(file *internalpb.ImportFile) error { - reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, req.GetOptions(), bufferSize, t.req.GetStorageConfig()) + reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, req.GetOptions(), int(bufferSize), t.req.GetStorageConfig()) if err != nil { log.Warn("new reader failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...) 
reason := fmt.Sprintf("error: %v, file: %s", err, file.String()) @@ -176,6 +199,12 @@ func (t *ImportTask) Execute() []*conc.Future[any] { for _, file := range req.GetFiles() { file := file f := GetExecPool().Submit(func() (any, error) { + // Use blocking allocation - this will wait until memory is available + GetMemoryAllocator().BlockingAllocate(t.GetTaskID(), bufferSize) + defer func() { + GetMemoryAllocator().Release(t.GetTaskID(), bufferSize) + debug.FreeOSMemory() + }() err := fn(file) return err, err }) diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go index ffc96c13cc..d36a88ecbd 100644 --- a/internal/datanode/importv2/task_l0_import.go +++ b/internal/datanode/importv2/task_l0_import.go @@ -140,7 +140,7 @@ func (t *L0ImportTask) Execute() []*conc.Future[any] { log.Info("start to import l0", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Int64("taskSlot", t.GetSlots()), - zap.Any("files", t.GetFileStats()), + zap.Any("files", t.req.GetFiles()), zap.Any("schema", t.GetSchema()), )...) t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) diff --git a/internal/datanode/importv2/task_l0_preimport.go b/internal/datanode/importv2/task_l0_preimport.go index 8f085208f5..8e3dab269d 100644 --- a/internal/datanode/importv2/task_l0_preimport.go +++ b/internal/datanode/importv2/task_l0_preimport.go @@ -128,7 +128,7 @@ func (t *L0PreImportTask) Execute() []*conc.Future[any] { log.Info("start to preimport l0", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Int64("taskSlot", t.GetSlots()), - zap.Any("files", t.GetFileStats()), + zap.Any("files", t.req.GetImportFiles()), zap.Any("schema", t.GetSchema()), )...) t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) diff --git a/internal/datanode/importv2/task_preimport.go b/internal/datanode/importv2/task_preimport.go index bdc04132e4..e7880e708d 100644 --- a/internal/datanode/importv2/task_preimport.go +++ b/internal/datanode/importv2/task_preimport.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "runtime/debug" "time" "github.com/cockroachdb/errors" @@ -107,8 +108,9 @@ func (t *PreImportTask) GetSlots() int64 { return t.req.GetTaskSlot() } +// PreImportTask buffer size is fixed func (t *PreImportTask) GetBufferSize() int64 { - return GetTaskBufferSize(t) + return paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt64() } func (t *PreImportTask) Cancel() { @@ -136,7 +138,7 @@ func (t *PreImportTask) Execute() []*conc.Future[any] { log.Info("start to preimport", WrapLogFields(t, zap.Int("bufferSize", bufferSize), zap.Int64("taskSlot", t.GetSlots()), - zap.Any("files", t.GetFileStats()), + zap.Any("files", t.req.GetImportFiles()), zap.Any("schema", t.GetSchema()), )...) 
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) @@ -172,6 +174,9 @@ func (t *PreImportTask) Execute() []*conc.Future[any] { i := i file := file f := GetExecPool().Submit(func() (any, error) { + defer func() { + debug.FreeOSMemory() + }() err := fn(i, file) return err, err }) diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index 78541b3472..7fe7c090cc 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -39,9 +39,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" - "github.com/milvus-io/milvus/pkg/v2/util/hardware" "github.com/milvus-io/milvus/pkg/v2/util/merr" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -558,19 +556,3 @@ func NewMetaCache(req *datapb.ImportRequest) map[string]metacache.MetaCache { } return metaCaches } - -// GetBufferSize returns the memory buffer size required by this task -func GetTaskBufferSize(task Task) int64 { - baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt() - vchannelNum := len(task.GetVchannels()) - partitionNum := len(task.GetPartitionIDs()) - totalBufferSize := int64(baseBufferSize * vchannelNum * partitionNum) - - percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat() - memoryLimit := int64(float64(hardware.GetMemoryCount()) * percentage / 100.0) - - if totalBufferSize > memoryLimit { - return memoryLimit - } - return totalBufferSize -} diff --git a/internal/util/importutilv2/parquet/reader.go b/internal/util/importutilv2/parquet/reader.go index 643bb087a0..91859f7e31 100644 --- a/internal/util/importutilv2/parquet/reader.go +++ b/internal/util/importutilv2/parquet/reader.go @@ -68,7 +68,15 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co log.Info("parquet file info", zap.Int("row group num", r.NumRowGroups()), zap.Int64("num rows", r.NumRows())) - fileReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) + count, err := common.EstimateReadCountPerBatch(bufferSize, schema) + if err != nil { + return nil, err + } + + readProps := pqarrow.ArrowReadProperties{ + BatchSize: count, + } + fileReader, err := pqarrow.NewFileReader(r, readProps, memory.DefaultAllocator) if err != nil { return nil, merr.WrapErrImportFailed(fmt.Sprintf("new parquet file reader failed, err=%v", err)) } @@ -77,10 +85,6 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co if err != nil { return nil, err } - count, err := common.EstimateReadCountPerBatch(bufferSize, schema) - if err != nil { - return nil, err - } return &reader{ ctx: ctx, cm: cm, diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 12e680d157..58dfe8809b 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -5555,14 +5555,14 @@ if this parameter <= 0, will set it as 10`, Key: "dataNode.import.memoryLimitPercentage", Version: "2.5.15", Doc: "The percentage of memory limit for import/pre-import tasks.", - DefaultValue: "20", + DefaultValue: "10", PanicIfEmpty: false, Export: true, Formatter: func(v string) string { percentage := getAsFloat(v) if percentage <= 0 || percentage > 100 { - log.Warn("invalid import memory limit percentage, using default 20%") - return "20" + log.Warn("invalid 
import memory limit percentage, using default 10%") + return "10" } return fmt.Sprintf("%f", percentage) }, diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index f3428f9ad7..6f224e165f 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -623,7 +623,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64()) assert.Equal(t, 16*1024*1024, Params.ImportBaseBufferSize.GetAsInt()) assert.Equal(t, 16*1024*1024, Params.ImportDeleteBufferSize.GetAsInt()) - assert.Equal(t, 20.0, Params.ImportMemoryLimitPercentage.GetAsFloat()) + assert.Equal(t, 10.0, Params.ImportMemoryLimitPercentage.GetAsFloat()) params.Save("datanode.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) assert.Equal(t, 16, Params.SlotCap.GetAsInt())
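As a closing note on sizing, the combined effect of the GetBufferSize and paramtable changes above is that an ImportTask's buffer budget is baseBufferSize × vchannels × partitions, clamped by its largest input file and by memoryLimitPercentage of total host memory. A rough worked sketch of that arithmetic follows; taskBufferSize and the constants (16 MB base buffer, 10% limit, 16 GB host) are illustrative values matching the defaults in configs/milvus.yaml, not the actual Milvus helpers.

package main

import "fmt"

// taskBufferSize mirrors the logic the patch adds to ImportTask.GetBufferSize:
// scale the base buffer by shard count, never exceed the largest input file,
// and never exceed the percentage-based memory limit.
func taskBufferSize(base, vchannels, partitions, maxFileSize, totalMemory int64, limitPercent float64) int64 {
	size := base * vchannels * partitions
	if maxFileSize != 0 && maxFileSize < size {
		size = maxFileSize // a task never needs more buffer than its largest file
	}
	limit := int64(float64(totalMemory) * limitPercent / 100.0)
	if size > limit {
		return limit
	}
	return size
}

func main() {
	const mb = int64(1024 * 1024)
	const gb = 1024 * mb
	// 16 MB base x 2 vchannels x 4 partitions = 128 MB, clamped to a 100 MB
	// largest file; well under the 1.6 GB limit (10% of 16 GB).
	fmt.Println(taskBufferSize(16*mb, 2, 4, 100*mb, 16*gb, 10)/mb, "MB")
}

With two vchannels and four partitions the shard-based budget is 128 MB, the 100 MB largest file clamps it to 100 MB, and the 1.6 GB limit never comes into play, so the sketch prints "100 MB".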