diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index fd7a30c49e..6d1c3f3874 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -754,7 +754,7 @@ dataNode:
     maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
    readBufferSizeInMB: 16 # The base insert buffer size (in MB) during import. The actual buffer size will be dynamically calculated based on the number of shards.
    readDeleteBufferSizeInMB: 16 # The delete buffer size (in MB) during import.
-    memoryLimitPercentage: 20 # The percentage of memory limit for import/pre-import tasks.
+    memoryLimitPercentage: 10 # The percentage of memory limit for import/pre-import tasks.
  compaction:
    levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
    levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go
index 761abf9e32..77d2c85ad2 100644
--- a/internal/datacoord/import_util.go
+++ b/internal/datacoord/import_util.go
@@ -819,15 +819,22 @@ func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) int {
 	}
 
 	// Calculate memory-based slots
+	var taskBufferSize int
 	baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt()
-	totalBufferSize := baseBufferSize * len(job.GetVchannels()) * len(job.GetPartitionIDs())
+	if task.GetType() == ImportTaskType {
+		// ImportTask uses a dynamic buffer size calculated from vchannels and partitions
+		taskBufferSize = baseBufferSize * len(job.GetVchannels()) * len(job.GetPartitionIDs())
+	} else {
+		// PreImportTask uses a fixed buffer size
+		taskBufferSize = baseBufferSize
+	}
 	isL0Import := importutilv2.IsL0Import(job.GetOptions())
 	if isL0Import {
-		// L0 import won't hash the data by channel or partition
-		totalBufferSize = paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt()
+		// L0 import uses a fixed buffer size
+		taskBufferSize = paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt()
 	}
 	memoryLimitPerSlot := paramtable.Get().DataCoordCfg.ImportMemoryLimitPerSlot.GetAsInt()
-	memoryBasedSlots := totalBufferSize / memoryLimitPerSlot
+	memoryBasedSlots := taskBufferSize / memoryLimitPerSlot
 
 	// Return the larger value to ensure both CPU and memory constraints are satisfied
 	if cpuBasedSlots > memoryBasedSlots {
diff --git a/internal/datanode/importv2/memory_allocator.go b/internal/datanode/importv2/memory_allocator.go
index d3341fa70f..0b9855bb45 100644
--- a/internal/datanode/importv2/memory_allocator.go
+++ b/internal/datanode/importv2/memory_allocator.go
@@ -22,14 +22,28 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus/pkg/v2/log"
+	"github.com/milvus-io/milvus/pkg/v2/util/hardware"
 	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 )
 
+var (
+	globalMemoryAllocator     MemoryAllocator
+	globalMemoryAllocatorOnce sync.Once
+)
+
+// GetMemoryAllocator returns the global memory allocator instance
+func GetMemoryAllocator() MemoryAllocator {
+	globalMemoryAllocatorOnce.Do(func() {
+		globalMemoryAllocator = NewMemoryAllocator(int64(hardware.GetMemoryCount()))
+	})
+	return globalMemoryAllocator
+}
+
 // MemoryAllocator handles memory allocation and deallocation for import tasks
 type MemoryAllocator interface {
-	// TryAllocate attempts to allocate memory of the specified size
-	// Returns true if allocation is successful, false if insufficient memory
-	TryAllocate(taskID int64, size int64) bool
+	// BlockingAllocate blocks until memory is available and then allocates.
+	// It never fails; the call returns only once the requested size has been reserved.
+	BlockingAllocate(taskID int64, size int64)
 
 	// Release releases memory of the specified size
 	Release(taskID int64, size int64)
@@ -39,43 +53,46 @@ type memoryAllocator struct {
 	systemTotalMemory int64
 	usedMemory        int64
 	mutex             sync.RWMutex
+	cond              *sync.Cond
 }
 
 // NewMemoryAllocator creates a new MemoryAllocator instance
 func NewMemoryAllocator(systemTotalMemory int64) MemoryAllocator {
 	log.Info("new import memory allocator", zap.Int64("systemTotalMemory", systemTotalMemory))
 	ma := &memoryAllocator{
-		systemTotalMemory: int64(systemTotalMemory),
+		systemTotalMemory: systemTotalMemory,
 		usedMemory:        0,
 	}
+	ma.cond = sync.NewCond(&ma.mutex)
 	return ma
 }
 
-// TryAllocate attempts to allocate memory of the specified size
-func (ma *memoryAllocator) TryAllocate(taskID int64, size int64) bool {
+// BlockingAllocate blocks until memory is available and then allocates
+func (ma *memoryAllocator) BlockingAllocate(taskID int64, size int64) {
 	ma.mutex.Lock()
 	defer ma.mutex.Unlock()
 
 	percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat()
 	memoryLimit := int64(float64(ma.systemTotalMemory) * percentage / 100.0)
 
-	if ma.usedMemory+size > memoryLimit {
-		log.Info("memory allocation failed, insufficient memory",
+	// Wait until enough memory is available
+	for ma.usedMemory+size > memoryLimit {
+		log.Warn("task waiting for memory allocation...",
 			zap.Int64("taskID", taskID),
 			zap.Int64("requestedSize", size),
 			zap.Int64("usedMemory", ma.usedMemory),
 			zap.Int64("availableMemory", memoryLimit-ma.usedMemory))
-		return false
+
+		ma.cond.Wait()
 	}
 
+	// Allocate memory
 	ma.usedMemory += size
 	log.Info("memory allocated successfully",
 		zap.Int64("taskID", taskID),
 		zap.Int64("allocatedSize", size),
 		zap.Int64("usedMemory", ma.usedMemory),
 		zap.Int64("availableMemory", memoryLimit-ma.usedMemory))
-
-	return true
 }
 
 // Release releases memory of the specified size
@@ -95,4 +112,7 @@ func (ma *memoryAllocator) Release(taskID int64, size int64) {
 		zap.Int64("taskID", taskID),
 		zap.Int64("releasedSize", size),
 		zap.Int64("usedMemory", ma.usedMemory))
+
+	// Wake up waiting tasks after memory is released
+	ma.cond.Broadcast()
 }
diff --git a/internal/datanode/importv2/memory_allocator_test.go b/internal/datanode/importv2/memory_allocator_test.go
index da2b161adb..d692a2145e 100644
--- a/internal/datanode/importv2/memory_allocator_test.go
+++ b/internal/datanode/importv2/memory_allocator_test.go
@@ -17,7 +17,10 @@ package importv2
 
 import (
+	"math/rand"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -30,14 +33,12 @@ func TestMemoryAllocatorBasicOperations(t *testing.T) {
 	// Test initial state
 	assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
 
-	// Test memory allocation for task 1
-	success := ma.TryAllocate(1, 50*1024*1024) // 50MB for task 1
-	assert.True(t, success)
+	// Test memory allocation for task 1 using BlockingAllocate
+	ma.BlockingAllocate(1, 50*1024*1024) // 50MB for task 1
 	assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory)
 
 	// Test memory allocation for task 2
-	success = ma.TryAllocate(2, 50*1024*1024) // 50MB for task 2
-	assert.True(t, success)
+	ma.BlockingAllocate(2, 50*1024*1024) // 50MB for task 2
 	assert.Equal(t, int64(100*1024*1024), ma.(*memoryAllocator).usedMemory)
 
 	// Test memory release for task 1
@@ -60,17 +61,25 @@ func TestMemoryAllocatorMemoryLimit(t *testing.T) {
 	testSize := memoryLimit / 10 // Use 10% of available memory
 
 	// Allocate memory up to the limit
-	success := ma.TryAllocate(1, testSize)
-	assert.True(t, success)
+	ma.BlockingAllocate(1, testSize)
 	assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory)
 
-	// Try to allocate more memory than available (should fail)
-	success = ma.TryAllocate(2, memoryLimit)
-	assert.False(t, success)
-	assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory) // Should remain unchanged
+	// Try to allocate more memory than available (this will block, so we test in a goroutine)
+	done := make(chan bool)
+	go func() {
+		ma.BlockingAllocate(2, memoryLimit)
+		done <- true
+	}()
 
-	// Release the allocated memory
+	// Release the allocated memory to unblock the waiting allocation
 	ma.Release(1, testSize)
+	<-done
+
+	// Verify that the second allocation succeeded after release
+	assert.Equal(t, memoryLimit, ma.(*memoryAllocator).usedMemory)
+
+	// Release the second allocation
+	ma.Release(2, memoryLimit)
 	assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
 }
@@ -84,10 +93,8 @@ func TestMemoryAllocatorConcurrentAccess(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		taskID := int64(i + 1)
 		go func() {
-			success := ma.TryAllocate(taskID, 50*1024*1024) // 50MB each
-			if success {
-				ma.Release(taskID, 50*1024*1024)
-			}
+			ma.BlockingAllocate(taskID, 50*1024*1024) // 50MB each
+			ma.Release(taskID, 50*1024*1024)
 			done <- true
 		}()
 	}
@@ -97,10 +104,9 @@ func TestMemoryAllocatorConcurrentAccess(t *testing.T) {
 		<-done
 	}
 
-	// Verify final state - should be 0 or the sum of remaining allocations
-	// depending on memory limit and concurrent execution
+	// Verify final state - should be 0 since all allocations were released
 	finalMemory := ma.(*memoryAllocator).usedMemory
-	assert.GreaterOrEqual(t, finalMemory, int64(0))
+	assert.Equal(t, int64(0), finalMemory)
 }
 
 // TestMemoryAllocatorNegativeRelease tests handling of negative memory release
@@ -109,8 +115,7 @@ func TestMemoryAllocatorNegativeRelease(t *testing.T) {
 	ma := NewMemoryAllocator(1024 * 1024 * 1024)
 
 	// Allocate some memory
-	success := ma.TryAllocate(1, 100*1024*1024) // 100MB
-	assert.True(t, success)
+	ma.BlockingAllocate(1, 100*1024*1024) // 100MB
 	assert.Equal(t, int64(100*1024*1024), ma.(*memoryAllocator).usedMemory)
 
 	// Release more than allocated (should not go negative)
@@ -121,15 +126,14 @@ func TestMemoryAllocatorNegativeRelease(t *testing.T) {
 // TestMemoryAllocatorMultipleTasks tests memory management for multiple tasks
 func TestMemoryAllocatorMultipleTasks(t *testing.T) {
-	// Create memory allocator with 1GB system memory
-	ma := NewMemoryAllocator(1024 * 1024 * 1024)
+	// Create memory allocator with 2GB system memory
+	ma := NewMemoryAllocator(1024 * 1024 * 1024 * 2)
 
 	// Allocate memory for multiple tasks with smaller sizes
 	taskIDs := []int64{1, 2, 3, 4, 5}
 	sizes := []int64{20, 30, 25, 15, 35} // Total: 125MB
 
 	for i, taskID := range taskIDs {
-		success := ma.TryAllocate(taskID, sizes[i]*1024*1024)
-		assert.True(t, success)
+		ma.BlockingAllocate(taskID, sizes[i]*1024*1024)
 	}
 
 	// Verify total used memory
@@ -155,3 +159,70 @@ func TestMemoryAllocatorMultipleTasks(t *testing.T) {
 	// Verify final state
 	assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
 }
+
+// TestMemoryAllocatorZeroSize tests handling of zero size allocations
+func TestMemoryAllocatorZeroSize(t *testing.T) {
+	// Create memory allocator
+	ma := NewMemoryAllocator(1024 * 1024 * 1024)
+
+	// Test zero size allocation
+	ma.BlockingAllocate(1, 0)
+	assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
+
+	// Test zero size release
+	ma.Release(1, 0)
+	assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
+}
+
+// TestMemoryAllocatorSimple tests basic functionality without external dependencies
+func TestMemoryAllocatorSimple(t *testing.T) {
+	// Create memory allocator with 1GB system memory
+	ma := NewMemoryAllocator(1024 * 1024 * 1024)
+
+	// Test initial state
+	assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
+
+	// Test memory allocation
+	ma.BlockingAllocate(1, 50*1024*1024) // 50MB
+	assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory)
+
+	// Test memory release
+	ma.Release(1, 50*1024*1024)
+	assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
+}
+
+// TestMemoryAllocatorMassiveConcurrency tests massive concurrent memory allocation and release
+func TestMemoryAllocatorMassiveConcurrency(t *testing.T) {
+	// Create memory allocator with 16GB system memory (the 10% limit yields a ~1.6GB budget)
+	totalMemory := int64(16 * 1024 * 1024 * 1024)
+	ma := NewMemoryAllocator(totalMemory)
+
+	const numTasks = 200
+
+	var wg sync.WaitGroup
+	wg.Add(numTasks)
+	// Start concurrent allocation and release
+	for i := 0; i < numTasks; i++ {
+		taskID := int64(i + 1)
+		var memorySize int64
+		// 10% chance to allocate 1.6GB, 90% chance to allocate 128MB-1536MB
+		if rand.Float64() < 0.1 {
+			memorySize = int64(1600 * 1024 * 1024)
+		} else {
+			multiple := rand.Intn(12) + 1
+			memorySize = int64(multiple) * 128 * 1024 * 1024 // 128MB to 1536MB
+		}
+
+		go func(id int64, size int64) {
+			defer wg.Done()
+			ma.BlockingAllocate(id, size)
+			time.Sleep(1 * time.Millisecond)
+			ma.Release(id, size)
+		}(taskID, memorySize)
+	}
+	wg.Wait()
+
+	// Assert that all memory is released
+	finalMemory := ma.(*memoryAllocator).usedMemory
+	assert.Equal(t, int64(0), finalMemory, "All memory should be released")
+}
diff --git a/internal/datanode/importv2/scheduler.go b/internal/datanode/importv2/scheduler.go
index bbcd1fa583..9262ce2663 100644
--- a/internal/datanode/importv2/scheduler.go
+++ b/internal/datanode/importv2/scheduler.go
@@ -27,7 +27,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/log"
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/util/conc"
-	"github.com/milvus-io/milvus/pkg/v2/util/hardware"
 )
 
 type Scheduler interface {
@@ -37,19 +36,16 @@ type Scheduler interface {
 }
 
 type scheduler struct {
-	manager         TaskManager
-	memoryAllocator MemoryAllocator
+	manager TaskManager
 
 	closeOnce sync.Once
 	closeChan chan struct{}
 }
 
 func NewScheduler(manager TaskManager) Scheduler {
-	memoryAllocator := NewMemoryAllocator(int64(hardware.GetMemoryCount()))
 	return &scheduler{
-		manager:         manager,
-		memoryAllocator: memoryAllocator,
-		closeChan:       make(chan struct{}),
+		manager:   manager,
+		closeChan: make(chan struct{}),
 	}
 }
@@ -76,34 +72,23 @@ func (s *scheduler) Start() {
 	}
 }
 
-// scheduleTasks implements memory-based task scheduling using MemoryAllocator
-// It selects tasks that can fit within the available memory.
 func (s *scheduler) scheduleTasks() {
-	pendingTasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
-	sort.Slice(pendingTasks, func(i, j int) bool {
-		return pendingTasks[i].GetTaskID() < pendingTasks[j].GetTaskID()
+	tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
+	sort.Slice(tasks, func(i, j int) bool {
+		return tasks[i].GetTaskID() < tasks[j].GetTaskID()
 	})
 
-	selectedTasks := make([]Task, 0)
-	tasksBufferSize := make(map[int64]int64)
-	for _, task := range pendingTasks {
-		taskBufferSize := task.GetBufferSize()
-		if s.memoryAllocator.TryAllocate(task.GetTaskID(), taskBufferSize) {
-			selectedTasks = append(selectedTasks, task)
-			tasksBufferSize[task.GetTaskID()] = taskBufferSize
-		}
-	}
-
-	if len(selectedTasks) == 0 {
+	if len(tasks) == 0 {
 		return
 	}
 
-	log.Info("processing selected tasks",
-		zap.Int("pending", len(pendingTasks)),
-		zap.Int("selected", len(selectedTasks)))
+	taskIDs := lo.Map(tasks, func(t Task, _ int) int64 {
+		return t.GetTaskID()
+	})
+	log.Info("processing tasks...", zap.Int64s("taskIDs", taskIDs))
 
 	futures := make(map[int64][]*conc.Future[any])
-	for _, task := range selectedTasks {
+	for _, task := range tasks {
 		fs := task.Execute()
 		futures[task.GetTaskID()] = fs
 	}
@@ -111,16 +96,16 @@ func (s *scheduler) scheduleTasks() {
 	for taskID, fs := range futures {
 		err := conc.AwaitAll(fs...)
 		if err != nil {
-			s.memoryAllocator.Release(taskID, tasksBufferSize[taskID])
 			continue
 		}
 		s.manager.Update(taskID, UpdateState(datapb.ImportTaskStateV2_Completed))
-		s.memoryAllocator.Release(taskID, tasksBufferSize[taskID])
 		log.Info("preimport/import done", zap.Int64("taskID", taskID))
 	}
+
+	log.Info("all tasks completed", zap.Int64s("taskIDs", taskIDs))
 }
 
-// Slots returns the available slots for import
+// Slots returns the used slots for import
 func (s *scheduler) Slots() int64 {
 	tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress))
 	used := lo.SumBy(tasks, func(t Task) int64 {
diff --git a/internal/datanode/importv2/scheduler_test.go b/internal/datanode/importv2/scheduler_test.go
index 400e3755aa..ba6d976558 100644
--- a/internal/datanode/importv2/scheduler_test.go
+++ b/internal/datanode/importv2/scheduler_test.go
@@ -26,7 +26,6 @@ import (
 	"time"
 
 	"github.com/cockroachdb/errors"
-	"github.com/samber/lo"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 
@@ -553,88 +552,6 @@ func (s *SchedulerSuite) TestScheduler_ImportFileWithFunction() {
 	s.NoError(err)
 }
 
-// TestScheduler_ScheduleTasks tests the scheduleTasks method with various scenarios
-func (s *SchedulerSuite) TestScheduler_ScheduleTasks() {
-	// Memory limit exceeded - some tasks should be skipped
-	s.Run("MemoryLimitExceeded", func() {
-		manager := NewMockTaskManager(s.T())
-		s.scheduler.manager = manager
-
-		// Add tasks that exceed memory limit
-		tasks := make(map[int64]Task, 0)
-		for i := 0; i < 5; i++ {
-			t := NewMockTask(s.T())
-			t.EXPECT().GetTaskID().Return(int64(i))
-			t.EXPECT().GetBufferSize().Return(int64(300))
-			if i < 3 { // Only first 3 tasks should be allocated (900 total)
-				t.EXPECT().Execute().Return([]*conc.Future[any]{})
-			}
-			tasks[t.GetTaskID()] = t
-		}
-
-		manager.EXPECT().GetBy(mock.Anything).Return(lo.Values(tasks))
-		manager.EXPECT().Update(mock.Anything, mock.Anything).Return()
-
-		memAllocator := NewMemoryAllocator(1000 / 0.2)
-		s.scheduler.memoryAllocator = memAllocator
-
-		s.scheduler.scheduleTasks()
-		s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory)
-	})
-
-	// Task execution failure - memory should be released
-	s.Run("TaskExecutionFailure", func() {
-		manager := NewMockTaskManager(s.T())
-		s.scheduler.manager = manager
-
-		tasks := make(map[int64]Task, 0)
-		// Create a task that will fail execution
-		failedTask := NewMockTask(s.T())
-		failedTask.EXPECT().GetTaskID().Return(int64(1))
-		failedTask.EXPECT().GetBufferSize().Return(int64(256))
-
-		// Create a future that will fail
-		failedFuture := conc.Go(func() (any, error) {
-			return nil, errors.New("mock execution error")
-		})
-		failedTask.EXPECT().Execute().Return([]*conc.Future[any]{failedFuture})
-		tasks[failedTask.GetTaskID()] = failedTask
-
-		// Create a successful task
-		successTask := NewMockTask(s.T())
-		successTask.EXPECT().GetTaskID().Return(int64(2))
-		successTask.EXPECT().GetBufferSize().Return(int64(128))
-		successTask.EXPECT().Execute().Return([]*conc.Future[any]{})
-		tasks[successTask.GetTaskID()] = successTask
-
-		manager.EXPECT().GetBy(mock.Anything).Return(lo.Values(tasks))
-		manager.EXPECT().Update(mock.Anything, mock.Anything).Return()
-
-		memAllocator := NewMemoryAllocator(512 * 5)
-		s.scheduler.memoryAllocator = memAllocator
-
-		s.scheduler.scheduleTasks()
-		s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory)
-	})
-
-	// Empty task list
-	s.Run("EmptyTaskList", func() {
-		manager := NewMockTaskManager(s.T())
-		s.scheduler.manager = manager
-
-		memAllocator := NewMemoryAllocator(1024)
-		s.scheduler.memoryAllocator = memAllocator
-
-		manager.EXPECT().GetBy(mock.Anything).Return(nil)
-
-		// Should not panic or error
-		s.NotPanics(func() {
-			s.scheduler.scheduleTasks()
-		})
-		s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory)
-	})
-}
-
 func TestScheduler(t *testing.T) {
 	suite.Run(t, new(SchedulerSuite))
 }
diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go
index bdf8c4aa6e..0d9fcd7102 100644
--- a/internal/datanode/importv2/task_import.go
+++ b/internal/datanode/importv2/task_import.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"runtime/debug"
 	"time"
 
 	"github.com/cockroachdb/errors"
@@ -36,6 +37,8 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/conc"
+	"github.com/milvus-io/milvus/pkg/v2/util/hardware"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
@@ -107,7 +110,27 @@ func (t *ImportTask) GetSlots() int64 {
 }
 
 func (t *ImportTask) GetBufferSize() int64 {
-	return GetTaskBufferSize(t)
+	// Calculate the task buffer size based on the number of vchannels and partitions
+	baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt()
+	vchannelNum := len(t.GetVchannels())
+	partitionNum := len(t.GetPartitionIDs())
+	taskBufferSize := int64(baseBufferSize * vchannelNum * partitionNum)
+
+	// If the largest file is smaller than the task buffer size, use the file size instead
+	fileSize := lo.MaxBy(t.GetFileStats(), func(a, b *datapb.ImportFileStats) bool {
+		return a.GetTotalMemorySize() > b.GetTotalMemorySize()
+	}).GetTotalMemorySize()
+	if fileSize != 0 && fileSize < taskBufferSize {
+		taskBufferSize = fileSize
+	}
+
+	// The task buffer size must not exceed the memory limit, or BlockingAllocate could never succeed
+	percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat()
+	memoryLimit := int64(float64(hardware.GetMemoryCount()) * percentage / 100.0)
+	if taskBufferSize > memoryLimit {
+		return memoryLimit
+	}
+	return taskBufferSize
 }
 
 func (t *ImportTask) Cancel() {
@@ -139,11 +162,11 @@ func (t *ImportTask) Clone() Task {
 }
 
 func (t *ImportTask) Execute() []*conc.Future[any] {
-	bufferSize := int(t.GetBufferSize())
+	bufferSize := t.GetBufferSize()
 	log.Info("start to import", WrapLogFields(t,
-		zap.Int("bufferSize", bufferSize),
+		zap.Int64("bufferSize", bufferSize),
 		zap.Int64("taskSlot", t.GetSlots()),
-		zap.Any("files", t.GetFileStats()),
+		zap.Any("files", t.req.GetFiles()),
 		zap.Any("schema", t.GetSchema()),
 	)...)
 	t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
@@ -151,7 +174,7 @@ func (t *ImportTask) Execute() []*conc.Future[any] {
 	req := t.req
 
 	fn := func(file *internalpb.ImportFile) error {
-		reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, req.GetOptions(), bufferSize, t.req.GetStorageConfig())
+		reader, err := importutilv2.NewReader(t.ctx, t.cm, t.GetSchema(), file, req.GetOptions(), int(bufferSize), t.req.GetStorageConfig())
 		if err != nil {
 			log.Warn("new reader failed", WrapLogFields(t, zap.String("file", file.String()), zap.Error(err))...)
 			reason := fmt.Sprintf("error: %v, file: %s", err, file.String())
@@ -176,6 +199,12 @@ func (t *ImportTask) Execute() []*conc.Future[any] {
 	for _, file := range req.GetFiles() {
 		file := file
 		f := GetExecPool().Submit(func() (any, error) {
+			// Use blocking allocation - this will wait until memory is available
+			GetMemoryAllocator().BlockingAllocate(t.GetTaskID(), bufferSize)
+			defer func() {
+				GetMemoryAllocator().Release(t.GetTaskID(), bufferSize)
+				debug.FreeOSMemory()
+			}()
 			err := fn(file)
 			return err, err
 		})
diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go
index ffc96c13cc..d36a88ecbd 100644
--- a/internal/datanode/importv2/task_l0_import.go
+++ b/internal/datanode/importv2/task_l0_import.go
@@ -140,7 +140,7 @@ func (t *L0ImportTask) Execute() []*conc.Future[any] {
 	log.Info("start to import l0", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Int64("taskSlot", t.GetSlots()),
-		zap.Any("files", t.GetFileStats()),
+		zap.Any("files", t.req.GetFiles()),
 		zap.Any("schema", t.GetSchema()),
 	)...)
 	t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
diff --git a/internal/datanode/importv2/task_l0_preimport.go b/internal/datanode/importv2/task_l0_preimport.go
index 8f085208f5..8e3dab269d 100644
--- a/internal/datanode/importv2/task_l0_preimport.go
+++ b/internal/datanode/importv2/task_l0_preimport.go
@@ -128,7 +128,7 @@ func (t *L0PreImportTask) Execute() []*conc.Future[any] {
 	log.Info("start to preimport l0", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Int64("taskSlot", t.GetSlots()),
-		zap.Any("files", t.GetFileStats()),
+		zap.Any("files", t.req.GetImportFiles()),
 		zap.Any("schema", t.GetSchema()),
 	)...)
 	t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
diff --git a/internal/datanode/importv2/task_preimport.go b/internal/datanode/importv2/task_preimport.go
index bdc04132e4..e7880e708d 100644
--- a/internal/datanode/importv2/task_preimport.go
+++ b/internal/datanode/importv2/task_preimport.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"runtime/debug"
 	"time"
 
 	"github.com/cockroachdb/errors"
@@ -107,8 +108,9 @@ func (t *PreImportTask) GetSlots() int64 {
 	return t.req.GetTaskSlot()
 }
 
+// PreImportTask buffer size is fixed
 func (t *PreImportTask) GetBufferSize() int64 {
-	return GetTaskBufferSize(t)
+	return paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt64()
 }
 
 func (t *PreImportTask) Cancel() {
@@ -136,7 +138,7 @@ func (t *PreImportTask) Execute() []*conc.Future[any] {
 	log.Info("start to preimport", WrapLogFields(t,
 		zap.Int("bufferSize", bufferSize),
 		zap.Int64("taskSlot", t.GetSlots()),
-		zap.Any("files", t.GetFileStats()),
+		zap.Any("files", t.req.GetImportFiles()),
 		zap.Any("schema", t.GetSchema()),
 	)...)
 	t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
@@ -172,6 +174,9 @@ func (t *PreImportTask) Execute() []*conc.Future[any] {
 		i := i
 		file := file
 		f := GetExecPool().Submit(func() (any, error) {
+			defer func() {
+				debug.FreeOSMemory()
+			}()
 			err := fn(i, file)
 			return err, err
 		})
diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go
index 78541b3472..7fe7c090cc 100644
--- a/internal/datanode/importv2/util.go
+++ b/internal/datanode/importv2/util.go
@@ -39,9 +39,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/metrics"
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
-	"github.com/milvus-io/milvus/pkg/v2/util/hardware"
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
-	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
@@ -558,19 +556,3 @@ func NewMetaCache(req *datapb.ImportRequest) map[string]metacache.MetaCache {
 	}
 	return metaCaches
 }
-
-// GetBufferSize returns the memory buffer size required by this task
-func GetTaskBufferSize(task Task) int64 {
-	baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt()
-	vchannelNum := len(task.GetVchannels())
-	partitionNum := len(task.GetPartitionIDs())
-	totalBufferSize := int64(baseBufferSize * vchannelNum * partitionNum)
-
-	percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat()
-	memoryLimit := int64(float64(hardware.GetMemoryCount()) * percentage / 100.0)
-
-	if totalBufferSize > memoryLimit {
-		return memoryLimit
-	}
-	return totalBufferSize
-}
diff --git a/internal/util/importutilv2/parquet/reader.go b/internal/util/importutilv2/parquet/reader.go
index 643bb087a0..91859f7e31 100644
--- a/internal/util/importutilv2/parquet/reader.go
+++ b/internal/util/importutilv2/parquet/reader.go
@@ -68,7 +68,15 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
 	log.Info("parquet file info", zap.Int("row group num", r.NumRowGroups()),
 		zap.Int64("num rows", r.NumRows()))
 
-	fileReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
+	count, err := common.EstimateReadCountPerBatch(bufferSize, schema)
+	if err != nil {
+		return nil, err
+	}
+
+	readProps := pqarrow.ArrowReadProperties{
+		BatchSize: count,
+	}
+	fileReader, err := pqarrow.NewFileReader(r, readProps, memory.DefaultAllocator)
 	if err != nil {
 		return nil, merr.WrapErrImportFailed(fmt.Sprintf("new parquet file reader failed, err=%v", err))
 	}
@@ -77,10 +85,6 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co
 	if err != nil {
 		return nil, err
 	}
-	count, err := common.EstimateReadCountPerBatch(bufferSize, schema)
-	if err != nil {
-		return nil, err
-	}
 	return &reader{
 		ctx:    ctx,
 		cm:     cm,
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 12e680d157..58dfe8809b 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -5555,14 +5555,14 @@ if this parameter <= 0, will set it as 10`,
 		Key:          "dataNode.import.memoryLimitPercentage",
 		Version:      "2.5.15",
 		Doc:          "The percentage of memory limit for import/pre-import tasks.",
-		DefaultValue: "20",
+		DefaultValue: "10",
 		PanicIfEmpty: false,
 		Export:       true,
 		Formatter: func(v string) string {
 			percentage := getAsFloat(v)
 			if percentage <= 0 || percentage > 100 {
-				log.Warn("invalid import memory limit percentage, using default 20%")
-				return "20"
+				log.Warn("invalid import memory limit percentage, using default 10%")
+				return "10"
 			}
 			return fmt.Sprintf("%f", percentage)
 		},
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index f3428f9ad7..6f224e165f 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -623,7 +623,7 @@ func TestComponentParam(t *testing.T) {
 	assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
 	assert.Equal(t, 16*1024*1024, Params.ImportBaseBufferSize.GetAsInt())
 	assert.Equal(t, 16*1024*1024, Params.ImportDeleteBufferSize.GetAsInt())
-	assert.Equal(t, 20.0, Params.ImportMemoryLimitPercentage.GetAsFloat())
+	assert.Equal(t, 10.0, Params.ImportMemoryLimitPercentage.GetAsFloat())
 	params.Save("datanode.gracefulStopTimeout", "100")
 	assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
 	assert.Equal(t, 16, Params.SlotCap.GetAsInt())
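
Reviewer note on the new allocator contract: `BlockingAllocate` never fails; it parks the caller on a `sync.Cond` until a `Release` from another task broadcasts. A minimal sketch of the intended acquire/release pairing follows (`processOneFile` is a hypothetical stand-in for the per-file closure submitted to `GetExecPool` in `ImportTask.Execute`):

```go
// Sketch only: assumes the GetMemoryAllocator/BlockingAllocate/Release API
// introduced in this patch; read is a hypothetical stand-in for the reader loop.
func processOneFile(taskID, bufferSize int64, read func() error) error {
	ma := GetMemoryAllocator()
	ma.BlockingAllocate(taskID, bufferSize) // parks on the cond until the budget has room
	defer ma.Release(taskID, bufferSize)    // Release broadcasts to wake parked tasks
	return read()
}
```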
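For reference, a back-of-envelope check of the new memory-based slot math in `CalculateTaskSlot`. The `memoryLimitPerSlot` value below is assumed for illustration only; the default of `dataCoord.import.memoryLimitPerSlot` is not shown in this patch:

```go
// Hypothetical numbers, not defaults from this patch.
const (
	baseBufferSize     = 16 << 20  // dataNode.import.readBufferSizeInMB = 16
	memoryLimitPerSlot = 256 << 20 // assumed, for illustration only
)

// memoryBasedSlots mirrors the branch added to CalculateTaskSlot.
func memoryBasedSlots(vchannels, partitions int, isImport bool) int {
	taskBufferSize := baseBufferSize // PreImportTask: fixed buffer
	if isImport {
		taskBufferSize = baseBufferSize * vchannels * partitions // ImportTask: dynamic buffer
	}
	return taskBufferSize / memoryLimitPerSlot
}
```

Under these assumed values, an ImportTask with 2 vchannels and 16 partitions buffers 512 MB, i.e. 2 memory-based slots; `CalculateTaskSlot` then returns the max of this and the CPU-based value.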
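The `memoryLimit` clamp in the new `ImportTask.GetBufferSize` is load-bearing: `BlockingAllocate`'s wait condition `usedMemory+size > memoryLimit` can never clear if a single request exceeds the limit, so every request must be capped at the limit. A condensed restatement of the clamping order (names are local to this sketch):

```go
// Condensed restatement of ImportTask.GetBufferSize; all sizes in bytes.
func clampBufferSize(dynamicSize, largestFileSize, memoryLimit int64) int64 {
	size := dynamicSize // baseBufferSize * vchannels * partitions
	if largestFileSize != 0 && largestFileSize < size {
		size = largestFileSize // small imports never reserve the full dynamic buffer
	}
	if size > memoryLimit {
		size = memoryLimit // keeps every request satisfiable by BlockingAllocate
	}
	return size
}
```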
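In the parquet reader, moving `EstimateReadCountPerBatch` ahead of `pqarrow.NewFileReader` lets the reader cap rows per record batch via `ArrowReadProperties.BatchSize`, so peak decode memory tracks `bufferSize` rather than row-group size. Rough arithmetic, assuming `EstimateReadCountPerBatch` approximates `bufferSize` divided by the estimated row width (its exact formula is not shown in this patch):

```go
// Illustrative only; rowWidth is a made-up per-row estimate.
func exampleBatchSize() int64 {
	bufferSize := int64(16 << 20) // dataNode.import.readBufferSizeInMB = 16
	rowWidth := int64(1 << 10)    // assumed ~1KiB per row for this schema
	return bufferSize / rowWidth  // => 16384 rows per Arrow record batch
}
```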
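One cost to keep in mind: `debug.FreeOSMemory` forces a full garbage collection plus an attempt to return freed heap to the OS, and this patch invokes it after every file in both the import and pre-import paths. A possible follow-up (a suggestion only, not part of this patch) is to rate-limit the call so back-to-back small files don't each pay for a forced GC cycle:

```go
// Suggestion sketch, not part of this patch: rate-limit FreeOSMemory.
import (
	"runtime/debug"
	"sync/atomic"
	"time"
)

var lastScavengeUnix atomic.Int64

func maybeFreeOSMemory(minInterval time.Duration) {
	now := time.Now().Unix()
	last := lastScavengeUnix.Load()
	// Only one goroutine wins the CAS per interval; the rest skip the forced GC.
	if now-last >= int64(minInterval.Seconds()) &&
		lastScavengeUnix.CompareAndSwap(last, now) {
		debug.FreeOSMemory()
	}
}
```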