fix: Prevent import from generating small binlogs (#43132)

- Introduce dynamic buffer sizing to avoid generating small binlogs
during import
- Refactor import slot calculation based on CPU and memory constraints
- Implement dynamic pool sizing for sync manager and import tasks
according to CPU core count

issue: https://github.com/milvus-io/milvus/issues/43131

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2025-07-07 21:32:47 +08:00 committed by GitHub
parent a0ae5bccc9
commit 9cbd194c6b
30 changed files with 1823 additions and 988 deletions

View File

@@ -687,13 +687,15 @@ dataCoord:
import:
filesPerPreImportTask: 2 # The maximum number of files allowed per pre-import task.
taskRetention: 10800 # The retention period in seconds for tasks in the Completed or Failed state.
- maxSizeInMBPerImportTask: 6144 # To prevent generating of small segments, we will re-group imported files. This parameter represents the sum of file sizes in each group (each ImportTask).
+ maxSizeInMBPerImportTask: 16384 # To prevent generating of small segments, we will re-group imported files. This parameter represents the sum of file sizes in each group (each ImportTask).
scheduleInterval: 2 # The interval for scheduling import, measured in seconds.
checkIntervalHigh: 2 # The interval for checking import, measured in seconds, is set to a high frequency for the import checker.
checkIntervalLow: 120 # The interval for checking import, measured in seconds, is set to a low frequency for the import checker.
maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request.
maxImportJobNum: 1024 # Maximum number of import jobs that are executing or pending.
waitForIndex: true # Indicates whether the import operation waits for the completion of index building.
+ fileNumPerSlot: 1 # The files number per slot for pre-import/import task.
+ memoryLimitPerSlot: 160 # The memory limit (in MB) of buffer size per slot for pre-import/import task.
gracefulStopTimeout: 5 # seconds. force stop node without graceful stop
slot:
clusteringCompactionUsage: 65536 # slot usage of clustering compaction task, setting it to 65536 means it takes up a whole worker.
@@ -719,7 +721,7 @@ dataNode:
flowGraph:
maxQueueLength: 16 # Maximum length of task queue in flowgraph
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
- maxParallelSyncMgrTasks: 64 # The max concurrent sync task number of datanode sync mgr globally
+ maxParallelSyncMgrTasksPerCPUCore: 16 # The max concurrent sync task number of datanode sync mgr per CPU core
skipMode:
enable: true # Support skip some timetick message to reduce CPU usage
skipNum: 4 # Consume one for every n records skipped
@@ -751,11 +753,12 @@ dataNode:
maxChannelCheckpointsPerPRC: 128 # The maximum number of channel checkpoints per UpdateChannelCheckpoint RPC.
channelCheckpointUpdateTickInSeconds: 10 # The frequency, in seconds, at which the channel checkpoint updater executes updates.
import:
- maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
+ concurrencyPerCPUCore: 4 # The execution concurrency unit for import/pre-import tasks per CPU core.
maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
- readBufferSizeInMB: 64 # The insert buffer size (in MB) during import.
+ readBufferSizeInMB: 16 # The base insert buffer size (in MB) during import. The actual buffer size will be dynamically calculated based on the number of shards.
readDeleteBufferSizeInMB: 16 # The delete buffer size (in MB) during import.
maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
+ memoryLimitPercentage: 20 # The percentage of memory limit for import/pre-import tasks.
compaction:
levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
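
Note: the new readBufferSizeInMB comment describes the dynamic sizing rule: 16 MB is a per-shard base value, and the effective task buffer grows with the shard count (vchannels x partitions), mirroring GetTaskBufferSize later in this diff. The following is a minimal, self-contained sketch of that arithmetic; the function name and numbers are illustrative assumptions, not part of the change:

package main

import "fmt"

// dynamicBufferSizeMB sketches how the per-task insert buffer scales with
// shard count on top of the 16 MB base from readBufferSizeInMB.
func dynamicBufferSizeMB(baseMB, vchannels, partitions int) int {
    return baseMB * vchannels * partitions
}

func main() {
    // 2 vchannels x 4 partitions with the 16 MB base -> a 128 MB task buffer,
    // instead of the old fixed 64 MB regardless of shard count.
    fmt.Println(dynamicBufferSizeMB(16, 2, 4))
}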

View File

@@ -442,7 +442,7 @@ func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context,
},
},
},
- }, job, m.allocator, m.meta, m.importMeta)
+ }, job, m.allocator, m.meta, m.importMeta, paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt())
if err != nil {
log.Warn("new import tasks failed", zap.Error(err))
return err

View File

@@ -284,9 +284,9 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
return
}
- allDiskIndex := c.meta.indexMeta.AreAllDiskIndex(job.GetCollectionID(), job.GetSchema())
- groups := RegroupImportFiles(job, lacks, allDiskIndex)
- newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta, c.importMeta)
+ segmentMaxSize := GetSegmentMaxSize(job, c.meta)
+ groups := RegroupImportFiles(job, lacks, segmentMaxSize)
+ newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta, c.importMeta, segmentMaxSize)
if err != nil {
log.Warn("new import tasks failed", zap.Error(err))
return

View File

@@ -34,7 +34,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
- "github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -126,14 +125,7 @@ func (t *importTask) GetTaskNodeID() int64 {
}
func (t *importTask) GetTaskSlot() int64 {
- // Consider the following two scenarios:
- // 1. Importing a large number of small files results in
- // a small total data size, making file count unsuitable as a slot number.
- // 2. Importing a file with many shards number results in many segments and a small total data size,
- // making segment count unsuitable as a slot number.
- // Taking these factors into account, we've decided to use the
- // minimum value between segment count and file count as the slot number.
- return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs())))
+ return int64(CalculateTaskSlot(t, t.importMeta))
}
func (t *importTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {

View File

@@ -30,7 +30,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/taskcommon"
- "github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -92,7 +91,7 @@ func (p *preImportTask) GetTaskState() taskcommon.State {
}
func (p *preImportTask) GetTaskSlot() int64 {
- return int64(funcutil.Min(len(p.GetFileStats())))
+ return int64(CalculateTaskSlot(p, p.importMeta))
}
func (p *preImportTask) SetTaskTime(timeType taskcommon.TimeType, time time.Time) {

View File

@@ -95,7 +95,7 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile,
}
func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
- job ImportJob, alloc allocator.Allocator, meta *meta, importMeta ImportMeta,
+ job ImportJob, alloc allocator.Allocator, meta *meta, importMeta ImportMeta, segmentMaxSize int,
) ([]ImportTask, error) {
idBegin, _, err := alloc.AllocN(int64(len(fileGroups)))
if err != nil {
@@ -120,7 +120,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
times: taskcommon.NewTimes(),
}
task.task.Store(taskProto)
- segments, err := AssignSegments(job, task, alloc, meta)
+ segments, err := AssignSegments(job, task, alloc, meta, int64(segmentMaxSize))
if err != nil {
return nil, err
}
@@ -138,7 +138,25 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
return tasks, nil
}
- func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, meta *meta) ([]int64, error) {
+ func GetSegmentMaxSize(job ImportJob, meta *meta) int {
allDiskIndex := meta.indexMeta.AreAllDiskIndex(job.GetCollectionID(), job.GetSchema())
var segmentMaxSize int
if allDiskIndex {
// Only if all vector fields index type are DiskANN, recalc segment max size here.
segmentMaxSize = paramtable.Get().DataCoordCfg.DiskSegmentMaxSize.GetAsInt() * 1024 * 1024
} else {
// If some vector fields index type are not DiskANN, recalc segment max size using default policy.
segmentMaxSize = paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt() * 1024 * 1024
}
isL0Import := importutilv2.IsL0Import(job.GetOptions())
if isL0Import {
segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt()
}
return segmentMaxSize
}
func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, meta *meta, segmentMaxSize int64) ([]int64, error) {
pkField, err := typeutil.GetPrimaryFieldSchema(job.GetSchema())
if err != nil {
return nil, err
@@ -158,11 +176,6 @@ func AssignSegments(job ImportJob, task ImportTask, alloc allocator.Allocator, m
}
isL0Import := importutilv2.IsL0Import(job.GetOptions())
- segmentMaxSize := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
- if isL0Import {
- segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()
- }
segmentLevel := datapb.SegmentLevel_L1
if isL0Import {
segmentLevel = datapb.SegmentLevel_L0
@@ -287,6 +300,7 @@ func AssemblePreImportRequest(task ImportTask, job ImportJob) *datapb.PreImportR
ImportFiles: importFiles,
Options: job.GetOptions(),
StorageConfig: createStorageConfig(),
+ TaskSlot: task.GetTaskSlot(),
}
}
@@ -353,27 +367,15 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all
IDRange: &datapb.IDRange{Begin: idBegin, End: idEnd},
RequestSegments: requestSegments,
StorageConfig: createStorageConfig(),
+ TaskSlot: task.GetTaskSlot(),
}, nil
}
- func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats, allDiskIndex bool) [][]*datapb.ImportFileStats {
+ func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats, segmentMaxSize int) [][]*datapb.ImportFileStats {
if len(files) == 0 {
return nil
}
- var segmentMaxSize int
- if allDiskIndex {
- // Only if all vector fields index type are DiskANN, recalc segment max size here.
- segmentMaxSize = Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt() * 1024 * 1024
- } else {
- // If some vector fields index type are not DiskANN, recalc segment max size using default policy.
- segmentMaxSize = Params.DataCoordCfg.SegmentMaxSize.GetAsInt() * 1024 * 1024
- }
- isL0Import := importutilv2.IsL0Import(job.GetOptions())
- if isL0Import {
- segmentMaxSize = paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt()
- }
threshold := paramtable.Get().DataCoordCfg.MaxSizeInMBPerImportTask.GetAsInt() * 1024 * 1024
maxSizePerFileGroup := segmentMaxSize * len(job.GetPartitionIDs()) * len(job.GetVchannels())
if maxSizePerFileGroup > threshold {
@@ -751,3 +753,36 @@ func ValidateMaxImportJobExceed(ctx context.Context, importMeta ImportMeta) erro
}
return nil
}
// CalculateTaskSlot calculates the required resource slots for an import task based on CPU and memory constraints
// The function uses a dual-constraint approach:
// 1. CPU constraint: Based on the number of files to process in parallel
// 2. Memory constraint: Based on the total buffer size required for all virtual channels and partitions
// Returns the maximum of the two constraints to ensure sufficient resources
func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) int {
job := importMeta.GetJob(context.TODO(), task.GetJobID())
// Calculate CPU-based slots
fileNumPerSlot := paramtable.Get().DataCoordCfg.ImportFileNumPerSlot.GetAsInt()
cpuBasedSlots := len(task.GetFileStats()) / fileNumPerSlot
if cpuBasedSlots < 1 {
cpuBasedSlots = 1
}
// Calculate memory-based slots
baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt()
totalBufferSize := baseBufferSize * len(job.GetVchannels()) * len(job.GetPartitionIDs())
isL0Import := importutilv2.IsL0Import(job.GetOptions())
if isL0Import {
// L0 import won't hash the data by channel or partition
totalBufferSize = paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt()
}
memoryLimitPerSlot := paramtable.Get().DataCoordCfg.ImportMemoryLimitPerSlot.GetAsInt()
memoryBasedSlots := totalBufferSize / memoryLimitPerSlot
// Return the larger value to ensure both CPU and memory constraints are satisfied
if cpuBasedSlots > memoryBasedSlots {
return cpuBasedSlots
}
return memoryBasedSlots
}
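
To make the dual-constraint arithmetic concrete, here is a small, self-contained sketch using the defaults introduced above (fileNumPerSlot: 1, memoryLimitPerSlot: 160 MB, readBufferSizeInMB: 16, assuming ImportBaseBufferSize corresponds to readBufferSizeInMB). It is illustrative only and does not call the real paramtable or importMeta:

package main

import "fmt"

// taskSlots mirrors the rule in CalculateTaskSlot: take the larger of a
// CPU-based estimate (files per slot) and a memory-based estimate
// (total buffer size per slot).
func taskSlots(fileNum, vchannels, partitions int) int {
    const (
        fileNumPerSlot       = 1   // dataCoord.import.fileNumPerSlot
        memoryLimitPerSlotMB = 160 // dataCoord.import.memoryLimitPerSlot
        baseBufferSizeMB     = 16  // dataNode.import.readBufferSizeInMB
    )
    cpuBasedSlots := fileNum / fileNumPerSlot
    if cpuBasedSlots < 1 {
        cpuBasedSlots = 1
    }
    totalBufferMB := baseBufferSizeMB * vchannels * partitions
    memoryBasedSlots := totalBufferMB / memoryLimitPerSlotMB
    if cpuBasedSlots > memoryBasedSlots {
        return cpuBasedSlots
    }
    return memoryBasedSlots
}

func main() {
    // 4 files, 2 vchannels, 4 partitions: buffer = 16*2*4 = 128 MB, so the
    // memory constraint yields 0 slots and the CPU constraint (4 files) wins.
    fmt.Println(taskSlots(4, 2, 4)) // 4
    // 2 files, 16 vchannels, 4 partitions: buffer = 1024 MB -> 6 slots,
    // which exceeds the CPU-based 2 slots.
    fmt.Println(taskSlots(2, 16, 4)) // 6
}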

View File

@@ -140,7 +140,7 @@ func TestImportUtil_NewImportTasks(t *testing.T) {
meta, err := newMeta(context.TODO(), catalog, nil, broker)
assert.NoError(t, err)
- tasks, err := NewImportTasks(fileGroups, job, alloc, meta, nil)
+ tasks, err := NewImportTasks(fileGroups, job, alloc, meta, nil, 1*1024*1024*1024)
assert.NoError(t, err)
assert.Equal(t, 2, len(tasks))
for _, task := range tasks {
@@ -212,7 +212,7 @@ func TestImportUtil_NewImportTasksWithDataTt(t *testing.T) {
meta, err := newMeta(context.TODO(), catalog, nil, broker)
assert.NoError(t, err)
- tasks, err := NewImportTasks(fileGroups, job, alloc, meta, nil)
+ tasks, err := NewImportTasks(fileGroups, job, alloc, meta, nil, 1*1024*1024*1024)
assert.NoError(t, err)
assert.Equal(t, 2, len(tasks))
for _, task := range tasks {
@@ -225,6 +225,8 @@ func TestImportUtil_AssembleRequest(t *testing.T) {
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{JobID: 0, CollectionID: 1, PartitionIDs: []int64{2}, Vchannels: []string{"v0"}},
}
+ importMeta := NewMockImportMeta(t)
+ importMeta.EXPECT().GetJob(mock.Anything, mock.Anything).Return(job)
preImportTaskProto := &datapb.PreImportTask{
JobID: 0,
@@ -233,7 +235,9 @@
State: datapb.ImportTaskStateV2_Pending,
}
- var pt ImportTask = &preImportTask{}
+ var pt ImportTask = &preImportTask{
+ importMeta: importMeta,
+ }
pt.(*preImportTask).task.Store(preImportTaskProto)
preimportReq := AssemblePreImportRequest(pt, job)
assert.Equal(t, pt.GetJobID(), preimportReq.GetJobID())
@@ -248,7 +252,9 @@
CollectionID: 1,
SegmentIDs: []int64{5, 6},
}
- var task ImportTask = &importTask{}
+ var task ImportTask = &importTask{
+ importMeta: importMeta,
+ }
task.(*importTask).task.Store(importTaskProto)
catalog := mocks.NewDataCoordCatalog(t)
@@ -294,6 +300,8 @@ func TestImportUtil_AssembleRequestWithDataTt(t *testing.T) {
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{JobID: 0, CollectionID: 1, PartitionIDs: []int64{2}, Vchannels: []string{"v0"}, DataTs: 100},
}
+ importMeta := NewMockImportMeta(t)
+ importMeta.EXPECT().GetJob(mock.Anything, mock.Anything).Return(job)
preImportTaskProto := &datapb.PreImportTask{
JobID: 0,
@@ -302,7 +310,9 @@
State: datapb.ImportTaskStateV2_Pending,
}
- var pt ImportTask = &preImportTask{}
+ var pt ImportTask = &preImportTask{
+ importMeta: importMeta,
+ }
pt.(*preImportTask).task.Store(preImportTaskProto)
preimportReq := AssemblePreImportRequest(pt, job)
assert.Equal(t, pt.GetJobID(), preimportReq.GetJobID())
@@ -317,7 +327,9 @@
CollectionID: 1,
SegmentIDs: []int64{5, 6},
}
- var task ImportTask = &importTask{}
+ var task ImportTask = &importTask{
+ importMeta: importMeta,
+ }
task.(*importTask).task.Store(importTaskProto)
catalog := mocks.NewDataCoordCatalog(t)
@@ -382,7 +394,7 @@ func TestImportUtil_RegroupImportFiles(t *testing.T) {
},
}
- groups := RegroupImportFiles(job, files, false)
+ groups := RegroupImportFiles(job, files, 1*1024*1024*1024)
total := 0
for i, fs := range groups {
sum := lo.SumBy(fs, func(f *datapb.ImportFileStats) int64 {

View File

@@ -18,3 +18,4 @@ packages:
github.com/milvus-io/milvus/internal/datanode/importv2:
interfaces:
Task:
+ TaskManager:

View File

@ -0,0 +1,98 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importv2
import (
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
// MemoryAllocator handles memory allocation and deallocation for import tasks
type MemoryAllocator interface {
// TryAllocate attempts to allocate memory of the specified size
// Returns true if allocation is successful, false if insufficient memory
TryAllocate(taskID int64, size int64) bool
// Release releases memory of the specified size
Release(taskID int64, size int64)
}
type memoryAllocator struct {
systemTotalMemory int64
usedMemory int64
mutex sync.RWMutex
}
// NewMemoryAllocator creates a new MemoryAllocator instance
func NewMemoryAllocator(systemTotalMemory int64) MemoryAllocator {
log.Info("new import memory allocator", zap.Int64("systemTotalMemory", systemTotalMemory))
ma := &memoryAllocator{
systemTotalMemory: int64(systemTotalMemory),
usedMemory: 0,
}
return ma
}
// TryAllocate attempts to allocate memory of the specified size
func (ma *memoryAllocator) TryAllocate(taskID int64, size int64) bool {
ma.mutex.Lock()
defer ma.mutex.Unlock()
percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat()
memoryLimit := int64(float64(ma.systemTotalMemory) * percentage / 100.0)
if ma.usedMemory+size > memoryLimit {
log.Info("memory allocation failed, insufficient memory",
zap.Int64("taskID", taskID),
zap.Int64("requestedSize", size),
zap.Int64("usedMemory", ma.usedMemory),
zap.Int64("availableMemory", memoryLimit-ma.usedMemory))
return false
}
ma.usedMemory += size
log.Info("memory allocated successfully",
zap.Int64("taskID", taskID),
zap.Int64("allocatedSize", size),
zap.Int64("usedMemory", ma.usedMemory),
zap.Int64("availableMemory", memoryLimit-ma.usedMemory))
return true
}
// Release releases memory of the specified size
func (ma *memoryAllocator) Release(taskID int64, size int64) {
ma.mutex.Lock()
defer ma.mutex.Unlock()
ma.usedMemory -= size
if ma.usedMemory < 0 {
ma.usedMemory = 0 // Prevent negative memory usage
log.Warn("memory release resulted in negative usage, reset to 0",
zap.Int64("taskID", taskID),
zap.Int64("releaseSize", size))
}
log.Info("memory released successfully",
zap.Int64("taskID", taskID),
zap.Int64("releasedSize", size),
zap.Int64("usedMemory", ma.usedMemory))
}
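
A rough usage sketch for the allocator above; the real wiring lives in the scheduler's scheduleTasks further down in this diff. tryRun is a hypothetical helper added only for illustration, assuming the package's existing Task interface, conc.AwaitAll, and the errors package already imported by the surrounding code:

// tryRun gates a task on available import memory, runs it, and always
// returns the memory afterwards (sketch only, not part of this change).
func tryRun(alloc MemoryAllocator, task Task) error {
    size := task.GetBufferSize()
    if !alloc.TryAllocate(task.GetTaskID(), size) {
        // Not enough headroom under the configured memory limit; leave the
        // task pending so a later scheduling tick can retry it.
        return errors.New("insufficient import memory, retry later")
    }
    defer alloc.Release(task.GetTaskID(), size)
    return conc.AwaitAll(task.Execute()...)
}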

View File

@ -0,0 +1,157 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importv2
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestMemoryAllocatorBasicOperations tests basic memory allocation and release operations
func TestMemoryAllocatorBasicOperations(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Test initial state
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
// Test memory allocation for task 1
success := ma.TryAllocate(1, 50*1024*1024) // 50MB for task 1
assert.True(t, success)
assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory)
// Test memory allocation for task 2
success = ma.TryAllocate(2, 50*1024*1024) // 50MB for task 2
assert.True(t, success)
assert.Equal(t, int64(100*1024*1024), ma.(*memoryAllocator).usedMemory)
// Test memory release for task 1
ma.Release(1, 50*1024*1024)
assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory)
// Test memory release for task 2
ma.Release(2, 50*1024*1024)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
// TestMemoryAllocatorMemoryLimit tests memory limit enforcement
func TestMemoryAllocatorMemoryLimit(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Get the memory limit based on system memory and configuration percentage
memoryLimit := ma.(*memoryAllocator).systemTotalMemory
// Use a reasonable test size that should be within limits
testSize := memoryLimit / 10 // Use 10% of available memory
// Allocate memory up to the limit
success := ma.TryAllocate(1, testSize)
assert.True(t, success)
assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory)
// Try to allocate more memory than available (should fail)
success = ma.TryAllocate(2, memoryLimit)
assert.False(t, success)
assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory) // Should remain unchanged
// Release the allocated memory
ma.Release(1, testSize)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
// TestMemoryAllocatorConcurrentAccess tests concurrent memory allocation and release
func TestMemoryAllocatorConcurrentAccess(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Test concurrent memory requests
done := make(chan bool, 10)
for i := 0; i < 10; i++ {
taskID := int64(i + 1)
go func() {
success := ma.TryAllocate(taskID, 50*1024*1024) // 50MB each
if success {
ma.Release(taskID, 50*1024*1024)
}
done <- true
}()
}
// Wait for all goroutines to complete
for i := 0; i < 10; i++ {
<-done
}
// Verify final state - should be 0 or the sum of remaining allocations
// depending on memory limit and concurrent execution
finalMemory := ma.(*memoryAllocator).usedMemory
assert.GreaterOrEqual(t, finalMemory, int64(0))
}
// TestMemoryAllocatorNegativeRelease tests handling of negative memory release
func TestMemoryAllocatorNegativeRelease(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Allocate some memory
success := ma.TryAllocate(1, 100*1024*1024) // 100MB
assert.True(t, success)
assert.Equal(t, int64(100*1024*1024), ma.(*memoryAllocator).usedMemory)
// Release more than allocated (should not go negative)
ma.Release(1, 200*1024*1024) // 200MB
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory) // Should be reset to 0
}
// TestMemoryAllocatorMultipleTasks tests memory management for multiple tasks
func TestMemoryAllocatorMultipleTasks(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Allocate memory for multiple tasks with smaller sizes
taskIDs := []int64{1, 2, 3, 4, 5}
sizes := []int64{20, 30, 25, 15, 35} // Total: 125MB
for i, taskID := range taskIDs {
success := ma.TryAllocate(taskID, sizes[i]*1024*1024)
assert.True(t, success)
}
// Verify total used memory
expectedTotal := int64(0)
for _, size := range sizes {
expectedTotal += size * 1024 * 1024
}
assert.Equal(t, expectedTotal, ma.(*memoryAllocator).usedMemory)
// Release memory for specific tasks
ma.Release(2, 30*1024*1024) // Release task 2
ma.Release(4, 15*1024*1024) // Release task 4
// Verify updated memory usage
expectedTotal = (20 + 25 + 35) * 1024 * 1024 // 80MB
assert.Equal(t, expectedTotal, ma.(*memoryAllocator).usedMemory)
// Release remaining tasks
ma.Release(1, 20*1024*1024)
ma.Release(3, 25*1024*1024)
ma.Release(5, 35*1024*1024)
// Verify final state
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}

View File

@@ -150,6 +150,51 @@ func (_c *MockTask_Execute_Call) RunAndReturn(run func() []*conc.Future[interfac
return _c
}
// GetBufferSize provides a mock function with no fields
func (_m *MockTask) GetBufferSize() int64 {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetBufferSize")
}
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}
return r0
}
// MockTask_GetBufferSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBufferSize'
type MockTask_GetBufferSize_Call struct {
*mock.Call
}
// GetBufferSize is a helper method to define mock.On call
func (_e *MockTask_Expecter) GetBufferSize() *MockTask_GetBufferSize_Call {
return &MockTask_GetBufferSize_Call{Call: _e.mock.On("GetBufferSize")}
}
func (_c *MockTask_GetBufferSize_Call) Run(run func()) *MockTask_GetBufferSize_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockTask_GetBufferSize_Call) Return(_a0 int64) *MockTask_GetBufferSize_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTask_GetBufferSize_Call) RunAndReturn(run func() int64) *MockTask_GetBufferSize_Call {
_c.Call.Return(run)
return _c
}
// GetCollectionID provides a mock function with no fields
func (_m *MockTask) GetCollectionID() int64 {
ret := _m.Called()

View File

@ -0,0 +1,255 @@
// Code generated by mockery v2.53.3. DO NOT EDIT.
package importv2
import mock "github.com/stretchr/testify/mock"
// MockTaskManager is an autogenerated mock type for the TaskManager type
type MockTaskManager struct {
mock.Mock
}
type MockTaskManager_Expecter struct {
mock *mock.Mock
}
func (_m *MockTaskManager) EXPECT() *MockTaskManager_Expecter {
return &MockTaskManager_Expecter{mock: &_m.Mock}
}
// Add provides a mock function with given fields: task
func (_m *MockTaskManager) Add(task Task) {
_m.Called(task)
}
// MockTaskManager_Add_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Add'
type MockTaskManager_Add_Call struct {
*mock.Call
}
// Add is a helper method to define mock.On call
// - task Task
func (_e *MockTaskManager_Expecter) Add(task interface{}) *MockTaskManager_Add_Call {
return &MockTaskManager_Add_Call{Call: _e.mock.On("Add", task)}
}
func (_c *MockTaskManager_Add_Call) Run(run func(task Task)) *MockTaskManager_Add_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(Task))
})
return _c
}
func (_c *MockTaskManager_Add_Call) Return() *MockTaskManager_Add_Call {
_c.Call.Return()
return _c
}
func (_c *MockTaskManager_Add_Call) RunAndReturn(run func(Task)) *MockTaskManager_Add_Call {
_c.Run(run)
return _c
}
// Get provides a mock function with given fields: taskID
func (_m *MockTaskManager) Get(taskID int64) Task {
ret := _m.Called(taskID)
if len(ret) == 0 {
panic("no return value specified for Get")
}
var r0 Task
if rf, ok := ret.Get(0).(func(int64) Task); ok {
r0 = rf(taskID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(Task)
}
}
return r0
}
// MockTaskManager_Get_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Get'
type MockTaskManager_Get_Call struct {
*mock.Call
}
// Get is a helper method to define mock.On call
// - taskID int64
func (_e *MockTaskManager_Expecter) Get(taskID interface{}) *MockTaskManager_Get_Call {
return &MockTaskManager_Get_Call{Call: _e.mock.On("Get", taskID)}
}
func (_c *MockTaskManager_Get_Call) Run(run func(taskID int64)) *MockTaskManager_Get_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockTaskManager_Get_Call) Return(_a0 Task) *MockTaskManager_Get_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTaskManager_Get_Call) RunAndReturn(run func(int64) Task) *MockTaskManager_Get_Call {
_c.Call.Return(run)
return _c
}
// GetBy provides a mock function with given fields: filters
func (_m *MockTaskManager) GetBy(filters ...TaskFilter) []Task {
_va := make([]interface{}, len(filters))
for _i := range filters {
_va[_i] = filters[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for GetBy")
}
var r0 []Task
if rf, ok := ret.Get(0).(func(...TaskFilter) []Task); ok {
r0 = rf(filters...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]Task)
}
}
return r0
}
// MockTaskManager_GetBy_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBy'
type MockTaskManager_GetBy_Call struct {
*mock.Call
}
// GetBy is a helper method to define mock.On call
// - filters ...TaskFilter
func (_e *MockTaskManager_Expecter) GetBy(filters ...interface{}) *MockTaskManager_GetBy_Call {
return &MockTaskManager_GetBy_Call{Call: _e.mock.On("GetBy",
append([]interface{}{}, filters...)...)}
}
func (_c *MockTaskManager_GetBy_Call) Run(run func(filters ...TaskFilter)) *MockTaskManager_GetBy_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]TaskFilter, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(TaskFilter)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *MockTaskManager_GetBy_Call) Return(_a0 []Task) *MockTaskManager_GetBy_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTaskManager_GetBy_Call) RunAndReturn(run func(...TaskFilter) []Task) *MockTaskManager_GetBy_Call {
_c.Call.Return(run)
return _c
}
// Remove provides a mock function with given fields: taskID
func (_m *MockTaskManager) Remove(taskID int64) {
_m.Called(taskID)
}
// MockTaskManager_Remove_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Remove'
type MockTaskManager_Remove_Call struct {
*mock.Call
}
// Remove is a helper method to define mock.On call
// - taskID int64
func (_e *MockTaskManager_Expecter) Remove(taskID interface{}) *MockTaskManager_Remove_Call {
return &MockTaskManager_Remove_Call{Call: _e.mock.On("Remove", taskID)}
}
func (_c *MockTaskManager_Remove_Call) Run(run func(taskID int64)) *MockTaskManager_Remove_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockTaskManager_Remove_Call) Return() *MockTaskManager_Remove_Call {
_c.Call.Return()
return _c
}
func (_c *MockTaskManager_Remove_Call) RunAndReturn(run func(int64)) *MockTaskManager_Remove_Call {
_c.Run(run)
return _c
}
// Update provides a mock function with given fields: taskID, actions
func (_m *MockTaskManager) Update(taskID int64, actions ...UpdateAction) {
_va := make([]interface{}, len(actions))
for _i := range actions {
_va[_i] = actions[_i]
}
var _ca []interface{}
_ca = append(_ca, taskID)
_ca = append(_ca, _va...)
_m.Called(_ca...)
}
// MockTaskManager_Update_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Update'
type MockTaskManager_Update_Call struct {
*mock.Call
}
// Update is a helper method to define mock.On call
// - taskID int64
// - actions ...UpdateAction
func (_e *MockTaskManager_Expecter) Update(taskID interface{}, actions ...interface{}) *MockTaskManager_Update_Call {
return &MockTaskManager_Update_Call{Call: _e.mock.On("Update",
append([]interface{}{taskID}, actions...)...)}
}
func (_c *MockTaskManager_Update_Call) Run(run func(taskID int64, actions ...UpdateAction)) *MockTaskManager_Update_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]UpdateAction, len(args)-1)
for i, a := range args[1:] {
if a != nil {
variadicArgs[i] = a.(UpdateAction)
}
}
run(args[0].(int64), variadicArgs...)
})
return _c
}
func (_c *MockTaskManager_Update_Call) Return() *MockTaskManager_Update_Call {
_c.Call.Return()
return _c
}
func (_c *MockTaskManager_Update_Call) RunAndReturn(run func(int64, ...UpdateAction)) *MockTaskManager_Update_Call {
_c.Run(run)
return _c
}
// NewMockTaskManager creates a new instance of MockTaskManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockTaskManager(t interface {
mock.TestingT
Cleanup(func())
}) *MockTaskManager {
mock := &MockTaskManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/config"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
+ "github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@@ -36,7 +37,8 @@ var (
func initExecPool() {
pt := paramtable.Get()
- initPoolSize := paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt()
+ cpuNum := hardware.GetCPUNum()
+ initPoolSize := cpuNum * pt.DataNodeCfg.ImportConcurrencyPerCPUCore.GetAsInt()
execPool = conc.NewPool[any](
initPoolSize,
conc.WithPreAlloc(false), // pre alloc must be false to resize pool dynamically, use warmup to alloc worker here
@@ -44,14 +46,15 @@
)
conc.WarmupPool(execPool, runtime.LockOSThread)
- watchKey := pt.DataNodeCfg.MaxConcurrentImportTaskNum.Key
+ watchKey := pt.DataNodeCfg.ImportConcurrencyPerCPUCore.Key
pt.Watch(watchKey, config.NewHandler(watchKey, resizeExecPool))
log.Info("init import execution pool done", zap.Int("size", initPoolSize))
}
func resizeExecPool(evt *config.Event) {
if evt.HasUpdated {
- newSize := paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt()
+ cpuNum := hardware.GetCPUNum()
+ newSize := cpuNum * paramtable.Get().DataNodeCfg.ImportConcurrencyPerCPUCore.GetAsInt()
log := log.Ctx(context.Background()).With(zap.Int("newSize", newSize))
err := GetExecPool().Resize(newSize)
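
With this change the import execution pool is sized from the machine's core count rather than a fixed task cap. A tiny illustrative sketch of the sizing rule (runtime.NumCPU stands in for hardware.GetCPUNum, and 4 mirrors the concurrencyPerCPUCore default from milvus.yaml above):

package main

import (
    "fmt"
    "runtime"
)

func main() {
    concurrencyPerCPUCore := 4 // dataNode.import.concurrencyPerCPUCore default
    poolSize := runtime.NumCPU() * concurrencyPerCPUCore
    // e.g. an 8-core datanode now gets a 32-worker import pool
    // instead of the previous fixed 16.
    fmt.Printf("import exec pool size: %d workers\n", poolSize)
}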

View File

@@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/v2/config"
+ "github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
@@ -31,31 +32,32 @@ func TestResizePools(t *testing.T) {
pt := paramtable.Get()
defer func() {
- _ = pt.Reset(pt.DataNodeCfg.MaxConcurrentImportTaskNum.Key)
+ _ = pt.Reset(pt.DataNodeCfg.ImportConcurrencyPerCPUCore.Key)
}()
t.Run("ExecPool", func(t *testing.T) {
- expectedCap := pt.DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt()
+ cpuNum := hardware.GetCPUNum()
+ expectedCap := cpuNum * pt.DataNodeCfg.ImportConcurrencyPerCPUCore.GetAsInt()
assert.Equal(t, expectedCap, GetExecPool().Cap())
resizeExecPool(&config.Event{
HasUpdated: true,
})
assert.Equal(t, expectedCap, GetExecPool().Cap())
- _ = pt.Save(pt.DataNodeCfg.MaxConcurrentImportTaskNum.Key, fmt.Sprintf("%d", expectedCap*2))
- expectedCap = pt.DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt()
+ _ = pt.Save(pt.DataNodeCfg.ImportConcurrencyPerCPUCore.Key, fmt.Sprintf("%d", expectedCap*2))
+ expectedCap = cpuNum * pt.DataNodeCfg.ImportConcurrencyPerCPUCore.GetAsInt()
resizeExecPool(&config.Event{
HasUpdated: true,
})
assert.Equal(t, expectedCap, GetExecPool().Cap())
- _ = pt.Save(pt.DataNodeCfg.MaxConcurrentImportTaskNum.Key, "0")
+ _ = pt.Save(pt.DataNodeCfg.ImportConcurrencyPerCPUCore.Key, "0")
resizeExecPool(&config.Event{
HasUpdated: true,
})
assert.Equal(t, expectedCap, GetExecPool().Cap(), "pool shall not be resized when newSize is 0")
- _ = pt.Save(pt.DataNodeCfg.MaxConcurrentImportTaskNum.Key, "invalid")
+ _ = pt.Save(pt.DataNodeCfg.ImportConcurrencyPerCPUCore.Key, "invalid")
resizeExecPool(&config.Event{
HasUpdated: true,
})

View File

@@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
+ "github.com/milvus-io/milvus/pkg/v2/util/hardware"
)
type Scheduler interface {
@@ -36,57 +37,89 @@
}
type scheduler struct {
manager TaskManager
+ memoryAllocator MemoryAllocator
closeOnce sync.Once
closeChan chan struct{}
}
func NewScheduler(manager TaskManager) Scheduler {
+ memoryAllocator := NewMemoryAllocator(int64(hardware.GetMemoryCount()))
return &scheduler{
manager: manager,
- closeChan: make(chan struct{}),
+ memoryAllocator: memoryAllocator,
+ closeChan: make(chan struct{}),
}
}
func (s *scheduler) Start() {
log.Info("start import scheduler")
var (
exeTicker = time.NewTicker(1 * time.Second)
logTicker = time.NewTicker(10 * time.Minute)
)
defer exeTicker.Stop()
defer logTicker.Stop()
for {
select {
case <-s.closeChan:
log.Info("import scheduler exited")
return
case <-exeTicker.C:
- tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
- sort.Slice(tasks, func(i, j int) bool {
- return tasks[i].GetTaskID() < tasks[j].GetTaskID()
- })
- futures := make(map[int64][]*conc.Future[any])
- for _, task := range tasks {
- fs := task.Execute()
- futures[task.GetTaskID()] = fs
- tryFreeFutures(futures)
- }
- for taskID, fs := range futures {
- err := conc.AwaitAll(fs...)
- if err != nil {
- continue
- }
- s.manager.Update(taskID, UpdateState(datapb.ImportTaskStateV2_Completed))
- log.Info("preimport/import done", zap.Int64("taskID", taskID))
- }
+ s.scheduleTasks()
case <-logTicker.C:
LogStats(s.manager)
}
}
}
// scheduleTasks implements memory-based task scheduling using MemoryAllocator
// It selects tasks that can fit within the available memory.
func (s *scheduler) scheduleTasks() {
pendingTasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
sort.Slice(pendingTasks, func(i, j int) bool {
return pendingTasks[i].GetTaskID() < pendingTasks[j].GetTaskID()
})
selectedTasks := make([]Task, 0)
tasksBufferSize := make(map[int64]int64)
for _, task := range pendingTasks {
taskBufferSize := task.GetBufferSize()
if s.memoryAllocator.TryAllocate(task.GetTaskID(), taskBufferSize) {
selectedTasks = append(selectedTasks, task)
tasksBufferSize[task.GetTaskID()] = taskBufferSize
}
}
log.Info("processing selected tasks",
zap.Int("pending", len(pendingTasks)),
zap.Int("selected", len(selectedTasks)))
if len(selectedTasks) == 0 {
return
}
futures := make(map[int64][]*conc.Future[any])
for _, task := range selectedTasks {
fs := task.Execute()
futures[task.GetTaskID()] = fs
}
for taskID, fs := range futures {
err := conc.AwaitAll(fs...)
if err != nil {
s.memoryAllocator.Release(taskID, tasksBufferSize[taskID])
continue
}
s.manager.Update(taskID, UpdateState(datapb.ImportTaskStateV2_Completed))
s.memoryAllocator.Release(taskID, tasksBufferSize[taskID])
log.Info("preimport/import done", zap.Int64("taskID", taskID))
}
}
// Slots returns the available slots for import
func (s *scheduler) Slots() int64 {
tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress))

View File

@@ -26,6 +26,7 @@ import (
"time"
"github.com/cockroachdb/errors"
+ "github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@@ -121,6 +122,10 @@ func (s *SchedulerSuite) SetupTest() {
s.scheduler = NewScheduler(s.manager).(*scheduler)
}
+ func (s *SchedulerSuite) TearDownTest() {
+ s.scheduler.Close()
+ }
func (s *SchedulerSuite) TestScheduler_Slots() {
preimportReq := &datapb.PreImportRequest{
JobID: 1,
@@ -130,12 +135,13 @@
Vchannels: []string{"ch-0"},
Schema: s.schema,
ImportFiles: []*internalpb.ImportFile{{Paths: []string{"dummy.json"}}},
+ TaskSlot: 10,
}
preimportTask := NewPreImportTask(preimportReq, s.manager, s.cm)
s.manager.Add(preimportTask)
slots := s.scheduler.Slots()
- s.Equal(int64(1), slots)
+ s.Equal(int64(10), slots)
}
func (s *SchedulerSuite) TestScheduler_Start_Preimport() {
@@ -547,6 +553,88 @@ func (s *SchedulerSuite) TestScheduler_ImportFileWithFunction() {
s.NoError(err)
}
// TestScheduler_ScheduleTasks tests the scheduleTasks method with various scenarios
func (s *SchedulerSuite) TestScheduler_ScheduleTasks() {
// Memory limit exceeded - some tasks should be skipped
s.Run("MemoryLimitExceeded", func() {
manager := NewMockTaskManager(s.T())
s.scheduler.manager = manager
// Add tasks that exceed memory limit
tasks := make(map[int64]Task, 0)
for i := 0; i < 5; i++ {
t := NewMockTask(s.T())
t.EXPECT().GetTaskID().Return(int64(i))
t.EXPECT().GetBufferSize().Return(int64(300))
if i < 3 { // Only first 3 tasks should be allocated (900 total)
t.EXPECT().Execute().Return([]*conc.Future[any]{})
}
tasks[t.GetTaskID()] = t
}
manager.EXPECT().GetBy(mock.Anything).Return(lo.Values(tasks))
manager.EXPECT().Update(mock.Anything, mock.Anything).Return()
memAllocator := NewMemoryAllocator(1000 / 0.2)
s.scheduler.memoryAllocator = memAllocator
s.scheduler.scheduleTasks()
s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory)
})
// Task execution failure - memory should be released
s.Run("TaskExecutionFailure", func() {
manager := NewMockTaskManager(s.T())
s.scheduler.manager = manager
tasks := make(map[int64]Task, 0)
// Create a task that will fail execution
failedTask := NewMockTask(s.T())
failedTask.EXPECT().GetTaskID().Return(int64(1))
failedTask.EXPECT().GetBufferSize().Return(int64(256))
// Create a future that will fail
failedFuture := conc.Go(func() (any, error) {
return nil, errors.New("mock execution error")
})
failedTask.EXPECT().Execute().Return([]*conc.Future[any]{failedFuture})
tasks[failedTask.GetTaskID()] = failedTask
// Create a successful task
successTask := NewMockTask(s.T())
successTask.EXPECT().GetTaskID().Return(int64(2))
successTask.EXPECT().GetBufferSize().Return(int64(128))
successTask.EXPECT().Execute().Return([]*conc.Future[any]{})
tasks[successTask.GetTaskID()] = successTask
manager.EXPECT().GetBy(mock.Anything).Return(lo.Values(tasks))
manager.EXPECT().Update(mock.Anything, mock.Anything).Return()
memAllocator := NewMemoryAllocator(512 * 5)
s.scheduler.memoryAllocator = memAllocator
s.scheduler.scheduleTasks()
s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory)
})
// Empty task list
s.Run("EmptyTaskList", func() {
manager := NewMockTaskManager(s.T())
s.scheduler.manager = manager
memAllocator := NewMemoryAllocator(1024)
s.scheduler.memoryAllocator = memAllocator
manager.EXPECT().GetBy(mock.Anything).Return(nil)
// Should not panic or error
s.NotPanics(func() {
s.scheduler.scheduleTasks()
})
s.Equal(int64(0), memAllocator.(*memoryAllocator).usedMemory)
})
}
func TestScheduler(t *testing.T) {
suite.Run(t, new(SchedulerSuite))
}

View File

@@ -163,6 +163,7 @@ type Task interface {
GetReason() string
GetSchema() *schemapb.CollectionSchema
GetSlots() int64
+ GetBufferSize() int64
Cancel()
Clone() Task
}

View File

@@ -36,8 +36,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/util/conc"
- "github.com/milvus-io/milvus/pkg/v2/util/funcutil"
- "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@@ -105,14 +103,11 @@ func (t *ImportTask) GetSchema() *schemapb.CollectionSchema {
}
func (t *ImportTask) GetSlots() int64 {
- // Consider the following two scenarios:
- // 1. Importing a large number of small files results in
- // a small total data size, making file count unsuitable as a slot number.
- // 2. Importing a file with many shards number results in many segments and a small total data size,
- // making segment count unsuitable as a slot number.
- // Taking these factors into account, we've decided to use the
- // minimum value between segment count and file count as the slot number.
- return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+ return t.req.GetTaskSlot()
+ }
+
+ func (t *ImportTask) GetBufferSize() int64 {
+ return GetTaskBufferSize(t)
}
func (t *ImportTask) Cancel() {
@@ -140,9 +135,10 @@ func (t *ImportTask) Clone() Task {
}
func (t *ImportTask) Execute() []*conc.Future[any] {
- bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt()
+ bufferSize := int(t.GetBufferSize())
log.Info("start to import", WrapLogFields(t,
zap.Int("bufferSize", bufferSize),
+ zap.Int64("taskSlot", t.GetSlots()),
zap.Any("schema", t.GetSchema()))...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))

View File

@@ -99,7 +99,12 @@ func (t *L0ImportTask) GetSchema() *schemapb.CollectionSchema {
}
func (t *L0ImportTask) GetSlots() int64 {
- return 1
+ return t.req.GetTaskSlot()
+ }
+
+ // L0 import task buffer size is fixed
+ func (t *L0ImportTask) GetBufferSize() int64 {
+ return paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt64()
}
func (t *L0ImportTask) Cancel() {
@@ -127,9 +132,10 @@ func (t *L0ImportTask) Clone() Task {
}
func (t *L0ImportTask) Execute() []*conc.Future[any] {
- bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt()
+ bufferSize := int(t.GetBufferSize())
log.Info("start to import l0", WrapLogFields(t,
zap.Int("bufferSize", bufferSize),
+ zap.Int64("taskSlot", t.GetSlots()),
zap.Any("schema", t.GetSchema()))...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))

View File

@@ -45,6 +45,7 @@ type L0PreImportTask struct {
partitionIDs []int64
vchannels []string
schema *schemapb.CollectionSchema
+ req *datapb.PreImportRequest
manager TaskManager
cm storage.ChunkManager
@@ -73,6 +74,7 @@ func NewL0PreImportTask(req *datapb.PreImportRequest,
partitionIDs: req.GetPartitionIDs(),
vchannels: req.GetVchannels(),
schema: req.GetSchema(),
+ req: req,
manager: manager,
cm: cm,
}
@@ -95,7 +97,12 @@ func (t *L0PreImportTask) GetSchema() *schemapb.CollectionSchema {
}
func (t *L0PreImportTask) GetSlots() int64 {
- return 1
+ return t.req.GetTaskSlot()
+ }
+
+ // L0 preimport task buffer size is fixed
+ func (t *L0PreImportTask) GetBufferSize() int64 {
+ return paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt64()
}
func (t *L0PreImportTask) Cancel() {
@@ -115,9 +122,10 @@ func (t *L0PreImportTask) Clone() Task {
}
func (t *L0PreImportTask) Execute() []*conc.Future[any] {
- bufferSize := paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt()
+ bufferSize := int(t.GetBufferSize())
log.Info("start to preimport l0", WrapLogFields(t,
zap.Int("bufferSize", bufferSize),
+ zap.Int64("taskSlot", t.GetSlots()),
zap.Any("schema", t.GetSchema()))...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))

View File

@ -34,7 +34,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb" "github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/util/conc" "github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
@ -47,6 +46,7 @@ type PreImportTask struct {
vchannels []string vchannels []string
schema *schemapb.CollectionSchema schema *schemapb.CollectionSchema
options []*commonpb.KeyValuePair options []*commonpb.KeyValuePair
req *datapb.PreImportRequest
manager TaskManager manager TaskManager
cm storage.ChunkManager cm storage.ChunkManager
@ -81,6 +81,7 @@ func NewPreImportTask(req *datapb.PreImportRequest,
vchannels: req.GetVchannels(), vchannels: req.GetVchannels(),
schema: req.GetSchema(), schema: req.GetSchema(),
options: req.GetOptions(), options: req.GetOptions(),
req: req,
manager: manager, manager: manager,
cm: cm, cm: cm,
} }
@ -103,7 +104,11 @@ func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema {
} }
func (t *PreImportTask) GetSlots() int64 { func (t *PreImportTask) GetSlots() int64 {
return int64(funcutil.Min(len(t.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt())) return t.req.GetTaskSlot()
}
func (t *PreImportTask) GetBufferSize() int64 {
return GetTaskBufferSize(t)
} }
func (t *PreImportTask) Cancel() { func (t *PreImportTask) Cancel() {
@ -124,9 +129,10 @@ func (t *PreImportTask) Clone() Task {
} }
func (t *PreImportTask) Execute() []*conc.Future[any] { func (t *PreImportTask) Execute() []*conc.Future[any] {
bufferSize := paramtable.Get().DataNodeCfg.ImportInsertBufferSize.GetAsInt() bufferSize := int(t.GetBufferSize())
log.Info("start to preimport", WrapLogFields(t, log.Info("start to preimport", WrapLogFields(t,
zap.Int("bufferSize", bufferSize), zap.Int("bufferSize", bufferSize),
zap.Int64("taskSlot", t.GetSlots()),
zap.Any("schema", t.GetSchema()))...) zap.Any("schema", t.GetSchema()))...)
t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress)) t.manager.Update(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
files := lo.Map(t.GetFileStats(), files := lo.Map(t.GetFileStats(),

View File

@ -38,7 +38,9 @@ import (
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
@ -545,3 +547,19 @@ func NewMetaCache(req *datapb.ImportRequest) map[string]metacache.MetaCache {
} }
return metaCaches return metaCaches
} }
// GetTaskBufferSize returns the memory buffer size required by the given task, capped at the configured import memory limit
func GetTaskBufferSize(task Task) int64 {
baseBufferSize := paramtable.Get().DataNodeCfg.ImportBaseBufferSize.GetAsInt()
vchannelNum := len(task.GetVchannels())
partitionNum := len(task.GetPartitionIDs())
totalBufferSize := int64(baseBufferSize * vchannelNum * partitionNum)
percentage := paramtable.Get().DataNodeCfg.ImportMemoryLimitPercentage.GetAsFloat()
memoryLimit := int64(float64(hardware.GetMemoryCount()) * percentage / 100.0)
if totalBufferSize > memoryLimit {
return memoryLimit
}
return totalBufferSize
}
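For intuition, here is a minimal standalone sketch of the sizing rule above, using plain numbers instead of the paramtable and hardware helpers (the inputs are illustrative assumptions, not values read at runtime): the base buffer is scaled by the vchannel and partition counts, then clamped to a fraction of host memory.

package main

import "fmt"

// taskBufferSize mirrors the capping logic in GetTaskBufferSize: the base
// buffer is scaled by the shard (vchannel) and partition counts, then the
// result is clamped to limitPct percent of total host memory.
func taskBufferSize(baseBytes, vchannels, partitions, hostMemBytes int64, limitPct float64) int64 {
	total := baseBytes * vchannels * partitions
	limit := int64(float64(hostMemBytes) * limitPct / 100.0)
	if total > limit {
		return limit
	}
	return total
}

func main() {
	const mb = int64(1 << 20)
	// Assumed example: 16 MB base buffer, 2 vchannels, 4 partitions, 8 GB of
	// host memory and the default 20% limit -> 128 MB, well under the cap.
	fmt.Println(taskBufferSize(16*mb, 2, 4, 8*1024*mb, 20) / mb) // 128
	// A wide layout (8 vchannels x 16 partitions, 64 MB base) would exceed
	// the cap and be clamped to roughly 20% of 8 GB.
	fmt.Println(taskBufferSize(64*mb, 8, 16, 8*1024*mb, 20) / mb) // 1638
}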

View File

@ -286,6 +286,7 @@ func (node *DataNode) FlushChannels(ctx context.Context, req *datapb.FlushChanne
func (node *DataNode) PreImport(ctx context.Context, req *datapb.PreImportRequest) (*commonpb.Status, error) { func (node *DataNode) PreImport(ctx context.Context, req *datapb.PreImportRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(zap.Int64("taskID", req.GetTaskID()), log := log.Ctx(ctx).With(zap.Int64("taskID", req.GetTaskID()),
zap.Int64("jobID", req.GetJobID()), zap.Int64("jobID", req.GetJobID()),
zap.Int64("taskSlot", req.GetTaskSlot()),
zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64s("partitionIDs", req.GetPartitionIDs()), zap.Int64s("partitionIDs", req.GetPartitionIDs()),
zap.Strings("vchannels", req.GetVchannels()), zap.Strings("vchannels", req.GetVchannels()),
@ -321,6 +322,7 @@ func (node *DataNode) PreImport(ctx context.Context, req *datapb.PreImportReques
func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) (*commonpb.Status, error) { func (node *DataNode) ImportV2(ctx context.Context, req *datapb.ImportRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(zap.Int64("taskID", req.GetTaskID()), log := log.Ctx(ctx).With(zap.Int64("taskID", req.GetTaskID()),
zap.Int64("jobID", req.GetJobID()), zap.Int64("jobID", req.GetJobID()),
zap.Int64("taskSlot", req.GetTaskSlot()),
zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64s("partitionIDs", req.GetPartitionIDs()), zap.Int64s("partitionIDs", req.GetPartitionIDs()),
zap.Strings("vchannels", req.GetVchannels()), zap.Strings("vchannels", req.GetVchannels()),

View File

@ -19,6 +19,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/config" "github.com/milvus-io/milvus/pkg/v2/config"
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/conc" "github.com/milvus-io/milvus/pkg/v2/util/conc"
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
@ -67,9 +68,10 @@ type syncManager struct {
func NewSyncManager(chunkManager storage.ChunkManager) SyncManager { func NewSyncManager(chunkManager storage.ChunkManager) SyncManager {
params := paramtable.Get() params := paramtable.Get()
initPoolSize := params.DataNodeCfg.MaxParallelSyncMgrTasks.GetAsInt() cpuNum := hardware.GetCPUNum()
initPoolSize := cpuNum * params.DataNodeCfg.MaxParallelSyncMgrTasksPerCPUCore.GetAsInt()
dispatcher := newKeyLockDispatcher[int64](initPoolSize) dispatcher := newKeyLockDispatcher[int64](initPoolSize)
log.Info("sync manager initialized", zap.Int("initPoolSize", initPoolSize)) log.Info("sync manager initialized", zap.Int("initPoolSize", initPoolSize), zap.Int("cpuNum", cpuNum))
syncMgr := &syncManager{ syncMgr := &syncManager{
keyLockDispatcher: dispatcher, keyLockDispatcher: dispatcher,
@ -80,7 +82,7 @@ func NewSyncManager(chunkManager storage.ChunkManager) SyncManager {
// setup config update watcher // setup config update watcher
handler := config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler) handler := config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler)
syncMgr.handler = handler syncMgr.handler = handler
params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, handler) params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasksPerCPUCore.Key, handler)
return syncMgr return syncMgr
} }
@ -90,12 +92,13 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) {
zap.String("key", evt.Key), zap.String("key", evt.Key),
zap.String("value", evt.Value), zap.String("value", evt.Value),
) )
cpuNum := hardware.GetCPUNum()
size, err := strconv.ParseInt(evt.Value, 10, 64) size, err := strconv.ParseInt(evt.Value, 10, 64)
if err != nil { if err != nil {
log.Warn("failed to parse new datanode syncmgr pool size", zap.Error(err)) log.Warn("failed to parse new datanode syncmgr pool size", zap.Error(err))
return return
} }
err = mgr.keyLockDispatcher.workerPool.Resize(int(size)) err = mgr.keyLockDispatcher.workerPool.Resize(cpuNum * int(size))
if err != nil { if err != nil {
log.Warn("failed to resize datanode syncmgr pool size", zap.String("key", evt.Key), zap.String("value", evt.Value), zap.Error(err)) log.Warn("failed to resize datanode syncmgr pool size", zap.String("key", evt.Key), zap.String("value", evt.Value), zap.Error(err))
return return
@ -173,7 +176,7 @@ func (mgr *syncManager) TaskStatsJSON() string {
} }
func (mgr *syncManager) Close() error { func (mgr *syncManager) Close() error {
paramtable.Get().Unwatch(paramtable.Get().DataNodeCfg.MaxParallelSyncMgrTasks.Key, mgr.handler) paramtable.Get().Unwatch(paramtable.Get().DataNodeCfg.MaxParallelSyncMgrTasksPerCPUCore.Key, mgr.handler)
timeout := paramtable.Get().CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second) timeout := paramtable.Get().CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)
return mgr.workerPool.ReleaseTimeout(timeout) return mgr.workerPool.ReleaseTimeout(timeout)
} }
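A rough standalone sketch of the per-core sizing rule applied above (runtime.NumCPU stands in for hardware.GetCPUNum, the worker pool itself is omitted, and the numbers are assumptions):

package main

import (
	"fmt"
	"runtime"
)

// syncPoolSize reflects the rule in NewSyncManager and resizeHandler: the
// configured value is interpreted per CPU core, so the effective pool
// capacity is cpuNum * perCoreTasks.
func syncPoolSize(perCoreTasks int) int {
	if perCoreTasks < 1 {
		perCoreTasks = 16 // same fallback as the parameter's formatter
	}
	return runtime.NumCPU() * perCoreTasks
}

func main() {
	// With the new default of 16 tasks per core, an 8-core node would get a
	// pool of 128 workers; raising the config to 32 would resize it to 256.
	fmt.Println(syncPoolSize(16))
	fmt.Println(syncPoolSize(32))
}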

View File

@ -172,7 +172,8 @@ func (s *SyncManagerSuite) TestResizePool() {
s.NotZero(cap) s.NotZero(cap)
params := paramtable.Get() params := paramtable.Get()
configKey := params.DataNodeCfg.MaxParallelSyncMgrTasks.Key configKey := params.DataNodeCfg.MaxParallelSyncMgrTasksPerCPUCore.Key
oldValue := params.DataNodeCfg.MaxParallelSyncMgrTasksPerCPUCore.GetAsInt()
syncMgr.resizeHandler(&config.Event{ syncMgr.resizeHandler(&config.Event{
Key: configKey, Key: configKey,
@ -191,7 +192,7 @@ func (s *SyncManagerSuite) TestResizePool() {
syncMgr.resizeHandler(&config.Event{ syncMgr.resizeHandler(&config.Event{
Key: configKey, Key: configKey,
Value: strconv.FormatInt(int64(cap*2), 10), Value: strconv.FormatInt(int64(oldValue*2), 10),
HasUpdated: true, HasUpdated: true,
}) })
s.Equal(cap*2, syncMgr.keyLockDispatcher.workerPool.Cap()) s.Equal(cap*2, syncMgr.keyLockDispatcher.workerPool.Cap())

View File

@ -828,6 +828,7 @@ message PreImportRequest {
repeated internal.ImportFile import_files = 8; repeated internal.ImportFile import_files = 8;
repeated common.KeyValuePair options = 9; repeated common.KeyValuePair options = 9;
index.StorageConfig storage_config = 10; index.StorageConfig storage_config = 10;
int64 task_slot = 11;
} }
message IDRange { message IDRange {
@ -855,6 +856,7 @@ message ImportRequest {
IDRange ID_range = 11; IDRange ID_range = 11;
repeated ImportRequestSegment request_segments = 12; repeated ImportRequestSegment request_segments = 12;
index.StorageConfig storage_config = 13; index.StorageConfig storage_config = 13;
int64 task_slot = 14;
} }
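The new task_slot fields carry the slot budget the coordinator computes for each pre-import/import task. The actual formula is in the suppressed datacoord diff further down; purely as a hypothetical illustration of how dataCoord.import.fileNumPerSlot and memoryLimitPerSlot could jointly bound it:

package main

import "fmt"

// hypotheticalTaskSlot is NOT the Milvus formula (that code is not shown on
// this page); it only illustrates how a file-count budget and a per-slot
// memory budget could both drive the task_slot value sent to the datanode.
func hypotheticalTaskSlot(fileNum, bufferBytes, filesPerSlot, memPerSlotBytes int64) int64 {
	byFiles := (fileNum + filesPerSlot - 1) / filesPerSlot            // ceil(files / filesPerSlot)
	byMemory := (bufferBytes + memPerSlotBytes - 1) / memPerSlotBytes // ceil(buffer / memPerSlot)
	if byMemory > byFiles {
		return byMemory
	}
	return byFiles
}

func main() {
	const mb = int64(1 << 20)
	// 4 files needing an estimated 640 MB of buffer, with the defaults of
	// 1 file and 160 MB per slot, would request 4 slots.
	fmt.Println(hypotheticalTaskSlot(4, 640*mb, 1, 160*mb))
}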
message QueryPreImportRequest { message QueryPreImportRequest {

File diff suppressed because it is too large

View File

@ -4053,6 +4053,8 @@ type dataCoordConfig struct {
MaxImportJobNum ParamItem `refreshable:"true"` MaxImportJobNum ParamItem `refreshable:"true"`
WaitForIndex ParamItem `refreshable:"true"` WaitForIndex ParamItem `refreshable:"true"`
ImportPreAllocIDExpansionFactor ParamItem `refreshable:"true"` ImportPreAllocIDExpansionFactor ParamItem `refreshable:"true"`
ImportFileNumPerSlot ParamItem `refreshable:"true"`
ImportMemoryLimitPerSlot ParamItem `refreshable:"true"`
GracefulStopTimeout ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"`
@ -4899,7 +4901,7 @@ if param targetVecIndexVersion is not set, the default value is -1, which means
Version: "2.4.0", Version: "2.4.0",
Doc: "To prevent generating of small segments, we will re-group imported files. " + Doc: "To prevent generating of small segments, we will re-group imported files. " +
"This parameter represents the sum of file sizes in each group (each ImportTask).", "This parameter represents the sum of file sizes in each group (each ImportTask).",
DefaultValue: "6144", DefaultValue: "16384",
PanicIfEmpty: false, PanicIfEmpty: false,
Export: true, Export: true,
} }
@ -4973,6 +4975,30 @@ if param targetVecIndexVersion is not set, the default value is -1, which means
} }
p.ImportPreAllocIDExpansionFactor.Init(base.mgr) p.ImportPreAllocIDExpansionFactor.Init(base.mgr)
p.ImportFileNumPerSlot = ParamItem{
Key: "dataCoord.import.fileNumPerSlot",
Version: "2.5.15",
Doc: "The files number per slot for pre-import/import task.",
DefaultValue: "1",
PanicIfEmpty: false,
Export: true,
}
p.ImportFileNumPerSlot.Init(base.mgr)
p.ImportMemoryLimitPerSlot = ParamItem{
Key: "dataCoord.import.memoryLimitPerSlot",
Version: "2.5.15",
Doc: "The memory limit (in MB) of buffer size per slot for pre-import/import task.",
DefaultValue: "160",
PanicIfEmpty: false,
Export: true,
Formatter: func(value string) string {
bufferSize := getAsFloat(value)
return fmt.Sprintf("%d", int(megaBytes2Bytes(bufferSize)))
},
}
p.ImportMemoryLimitPerSlot.Init(base.mgr)
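memoryLimitPerSlot above, like readBufferSizeInMB below, is written in MB in the YAML but stored as bytes by its Formatter, which is why the tests expect 160*1024*1024 from GetAsInt. A minimal sketch of that conversion, with the internal getAsFloat/megaBytes2Bytes helpers mimicked and error handling simplified:

package main

import (
	"fmt"
	"strconv"
)

// formatMBToBytes mimics the Formatter pattern used by ImportMemoryLimitPerSlot
// and ImportBaseBufferSize: parse the configured MB value and re-emit it as a
// byte count so later GetAsInt calls already return bytes.
func formatMBToBytes(v string) string {
	mbVal, err := strconv.ParseFloat(v, 64)
	if err != nil {
		return "0" // simplified; the real helpers handle this differently
	}
	return fmt.Sprintf("%d", int(mbVal*1024*1024))
}

func main() {
	fmt.Println(formatMBToBytes("160")) // 167772160
	fmt.Println(formatMBToBytes("16"))  // 16777216
}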
p.GracefulStopTimeout = ParamItem{ p.GracefulStopTimeout = ParamItem{
Key: "dataCoord.gracefulStopTimeout", Key: "dataCoord.gracefulStopTimeout",
Version: "2.3.7", Version: "2.3.7",
@ -5155,10 +5181,10 @@ if param targetVecIndexVersion is not set, the default value is -1, which means
// ///////////////////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////////////////
// --- datanode --- // --- datanode ---
type dataNodeConfig struct { type dataNodeConfig struct {
FlowGraphMaxQueueLength ParamItem `refreshable:"false"` FlowGraphMaxQueueLength ParamItem `refreshable:"false"`
FlowGraphMaxParallelism ParamItem `refreshable:"false"` FlowGraphMaxParallelism ParamItem `refreshable:"false"`
MaxParallelSyncTaskNum ParamItem `refreshable:"false"` MaxParallelSyncTaskNum ParamItem `refreshable:"false"`
MaxParallelSyncMgrTasks ParamItem `refreshable:"true"` MaxParallelSyncMgrTasksPerCPUCore ParamItem `refreshable:"true"`
// skip mode // skip mode
FlowGraphSkipModeEnable ParamItem `refreshable:"true"` FlowGraphSkipModeEnable ParamItem `refreshable:"true"`
@ -5202,11 +5228,12 @@ type dataNodeConfig struct {
ChannelCheckpointUpdateTickInSeconds ParamItem `refreshable:"true"` ChannelCheckpointUpdateTickInSeconds ParamItem `refreshable:"true"`
// import // import
MaxConcurrentImportTaskNum ParamItem `refreshable:"true"` ImportConcurrencyPerCPUCore ParamItem `refreshable:"true"`
MaxImportFileSizeInGB ParamItem `refreshable:"true"` MaxImportFileSizeInGB ParamItem `refreshable:"true"`
ImportInsertBufferSize ParamItem `refreshable:"true"` ImportBaseBufferSize ParamItem `refreshable:"true"`
ImportDeleteBufferSize ParamItem `refreshable:"true"` ImportDeleteBufferSize ParamItem `refreshable:"true"`
MaxTaskSlotNum ParamItem `refreshable:"true"` MaxTaskSlotNum ParamItem `refreshable:"true"`
ImportMemoryLimitPercentage ParamItem `refreshable:"true"`
// Compaction // Compaction
L0BatchMemoryRatio ParamItem `refreshable:"true"` L0BatchMemoryRatio ParamItem `refreshable:"true"`
@ -5292,22 +5319,22 @@ func (p *dataNodeConfig) init(base *BaseTable) {
} }
p.MaxParallelSyncTaskNum.Init(base.mgr) p.MaxParallelSyncTaskNum.Init(base.mgr)
p.MaxParallelSyncMgrTasks = ParamItem{ p.MaxParallelSyncMgrTasksPerCPUCore = ParamItem{
Key: "dataNode.dataSync.maxParallelSyncMgrTasks", Key: "dataNode.dataSync.maxParallelSyncMgrTasksPerCPUCore",
Version: "2.3.4", Version: "2.3.4",
DefaultValue: "64", DefaultValue: "16",
Doc: "The max concurrent sync task number of datanode sync mgr globally", Doc: "The max concurrent sync task number of datanode sync mgr per CPU core",
Formatter: func(v string) string { Formatter: func(v string) string {
concurrency := getAsInt(v) concurrency := getAsInt(v)
if concurrency < 1 { if concurrency < 1 {
log.Warn("positive parallel task number, reset to default 64", zap.String("value", v)) log.Warn("positive parallel task number, reset to default 16", zap.String("value", v))
return "64" // MaxParallelSyncMgrTasks must >= 1 return "16" // MaxParallelSyncMgrTasksPerCPUCore must >= 1
} }
return strconv.FormatInt(int64(concurrency), 10) return strconv.FormatInt(int64(concurrency), 10)
}, },
Export: true, Export: true,
} }
p.MaxParallelSyncMgrTasks.Init(base.mgr) p.MaxParallelSyncMgrTasksPerCPUCore.Init(base.mgr)
p.FlushInsertBufferSize = ParamItem{ p.FlushInsertBufferSize = ParamItem{
Key: "dataNode.segment.insertBufSize", Key: "dataNode.segment.insertBufSize",
@ -5492,15 +5519,15 @@ if this parameter <= 0, will set it as 10`,
} }
p.ChannelCheckpointUpdateTickInSeconds.Init(base.mgr) p.ChannelCheckpointUpdateTickInSeconds.Init(base.mgr)
p.MaxConcurrentImportTaskNum = ParamItem{ p.ImportConcurrencyPerCPUCore = ParamItem{
Key: "dataNode.import.maxConcurrentTaskNum", Key: "dataNode.import.concurrencyPerCPUCore",
Version: "2.4.0", Version: "2.4.0",
Doc: "The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.", Doc: "The execution concurrency unit for import/pre-import tasks per CPU core.",
DefaultValue: "16", DefaultValue: "4",
PanicIfEmpty: false, PanicIfEmpty: false,
Export: true, Export: true,
} }
p.MaxConcurrentImportTaskNum.Init(base.mgr) p.ImportConcurrencyPerCPUCore.Init(base.mgr)
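The executor pool that consumes this parameter is not part of this hunk; assuming it follows the same per-core scaling as the sync manager (which the commit description suggests), the effective import/pre-import concurrency would look roughly like this sketch, with runtime.NumCPU standing in for the hardware helper:

package main

import (
	"fmt"
	"runtime"
)

// importPoolSize sketches the per-CPU-core scaling implied by
// ImportConcurrencyPerCPUCore; this is an assumption, not the real executor code.
func importPoolSize(perCore int) int {
	if perCore < 1 {
		perCore = 4 // the parameter's default
	}
	return runtime.NumCPU() * perCore
}

func main() {
	// On an 8-core datanode with the default of 4, this would allow 32
	// concurrently executing import/pre-import futures.
	fmt.Println(importPoolSize(4))
}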
p.MaxImportFileSizeInGB = ParamItem{ p.MaxImportFileSizeInGB = ParamItem{
Key: "dataNode.import.maxImportFileSizeInGB", Key: "dataNode.import.maxImportFileSizeInGB",
@ -5512,11 +5539,11 @@ if this parameter <= 0, will set it as 10`,
} }
p.MaxImportFileSizeInGB.Init(base.mgr) p.MaxImportFileSizeInGB.Init(base.mgr)
p.ImportInsertBufferSize = ParamItem{ p.ImportBaseBufferSize = ParamItem{
Key: "dataNode.import.readBufferSizeInMB", Key: "dataNode.import.readBufferSizeInMB",
Version: "2.4.0", Version: "2.4.0",
Doc: "The insert buffer size (in MB) during import.", Doc: "The base insert buffer size (in MB) during import. The actual buffer size will be dynamically calculated based on the number of shards.",
DefaultValue: "64", DefaultValue: "16",
Formatter: func(v string) string { Formatter: func(v string) string {
bufferSize := getAsFloat(v) bufferSize := getAsFloat(v)
return fmt.Sprintf("%d", int(megaBytes2Bytes(bufferSize))) return fmt.Sprintf("%d", int(megaBytes2Bytes(bufferSize)))
@ -5524,7 +5551,7 @@ if this parameter <= 0, will set it as 10`,
PanicIfEmpty: false, PanicIfEmpty: false,
Export: true, Export: true,
} }
p.ImportInsertBufferSize.Init(base.mgr) p.ImportBaseBufferSize.Init(base.mgr)
p.ImportDeleteBufferSize = ParamItem{ p.ImportDeleteBufferSize = ParamItem{
Key: "dataNode.import.readDeleteBufferSizeInMB", Key: "dataNode.import.readDeleteBufferSizeInMB",
@ -5550,6 +5577,24 @@ if this parameter <= 0, will set it as 10`,
} }
p.MaxTaskSlotNum.Init(base.mgr) p.MaxTaskSlotNum.Init(base.mgr)
p.ImportMemoryLimitPercentage = ParamItem{
Key: "dataNode.import.memoryLimitPercentage",
Version: "2.5.15",
Doc: "The percentage of memory limit for import/pre-import tasks.",
DefaultValue: "20",
PanicIfEmpty: false,
Export: true,
Formatter: func(v string) string {
percentage := getAsFloat(v)
if percentage <= 0 || percentage > 100 {
log.Warn("invalid import memory limit percentage, using default 20%")
return "20"
}
return fmt.Sprintf("%f", percentage)
},
}
p.ImportMemoryLimitPercentage.Init(base.mgr)
p.L0BatchMemoryRatio = ParamItem{ p.L0BatchMemoryRatio = ParamItem{
Key: "dataNode.compaction.levelZeroBatchMemoryRatio", Key: "dataNode.compaction.levelZeroBatchMemoryRatio",
Version: "2.4.0", Version: "2.4.0",

View File

@ -516,13 +516,15 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, false, Params.AutoUpgradeSegmentIndex.GetAsBool()) assert.Equal(t, false, Params.AutoUpgradeSegmentIndex.GetAsBool())
assert.Equal(t, 2, Params.FilesPerPreImportTask.GetAsInt()) assert.Equal(t, 2, Params.FilesPerPreImportTask.GetAsInt())
assert.Equal(t, 10800*time.Second, Params.ImportTaskRetention.GetAsDuration(time.Second)) assert.Equal(t, 10800*time.Second, Params.ImportTaskRetention.GetAsDuration(time.Second))
assert.Equal(t, 6144, Params.MaxSizeInMBPerImportTask.GetAsInt()) assert.Equal(t, 16384, Params.MaxSizeInMBPerImportTask.GetAsInt())
assert.Equal(t, 2*time.Second, Params.ImportScheduleInterval.GetAsDuration(time.Second)) assert.Equal(t, 2*time.Second, Params.ImportScheduleInterval.GetAsDuration(time.Second))
assert.Equal(t, 2*time.Second, Params.ImportCheckIntervalHigh.GetAsDuration(time.Second)) assert.Equal(t, 2*time.Second, Params.ImportCheckIntervalHigh.GetAsDuration(time.Second))
assert.Equal(t, 120*time.Second, Params.ImportCheckIntervalLow.GetAsDuration(time.Second)) assert.Equal(t, 120*time.Second, Params.ImportCheckIntervalLow.GetAsDuration(time.Second))
assert.Equal(t, 1024, Params.MaxFilesPerImportReq.GetAsInt()) assert.Equal(t, 1024, Params.MaxFilesPerImportReq.GetAsInt())
assert.Equal(t, 1024, Params.MaxImportJobNum.GetAsInt()) assert.Equal(t, 1024, Params.MaxImportJobNum.GetAsInt())
assert.Equal(t, true, Params.WaitForIndex.GetAsBool()) assert.Equal(t, true, Params.WaitForIndex.GetAsBool())
assert.Equal(t, 1, Params.ImportFileNumPerSlot.GetAsInt())
assert.Equal(t, 160*1024*1024, Params.ImportMemoryLimitPerSlot.GetAsInt())
params.Save("datacoord.gracefulStopTimeout", "100") params.Save("datacoord.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
@ -593,6 +595,10 @@ func TestComponentParam(t *testing.T) {
maxParallelSyncTaskNum := Params.MaxParallelSyncTaskNum.GetAsInt() maxParallelSyncTaskNum := Params.MaxParallelSyncTaskNum.GetAsInt()
t.Logf("maxParallelSyncTaskNum: %d", maxParallelSyncTaskNum) t.Logf("maxParallelSyncTaskNum: %d", maxParallelSyncTaskNum)
maxParallelSyncMgrTasksPerCPUCore := Params.MaxParallelSyncMgrTasksPerCPUCore.GetAsInt()
t.Logf("maxParallelSyncMgrTasksPerCPUCore: %d", maxParallelSyncMgrTasksPerCPUCore)
assert.Equal(t, 16, maxParallelSyncMgrTasksPerCPUCore)
size := Params.FlushInsertBufferSize.GetAsInt() size := Params.FlushInsertBufferSize.GetAsInt()
t.Logf("FlushInsertBufferSize: %d", size) t.Logf("FlushInsertBufferSize: %d", size)
@ -610,13 +616,12 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 128, Params.MaxChannelCheckpointsPerRPC.GetAsInt()) assert.Equal(t, 128, Params.MaxChannelCheckpointsPerRPC.GetAsInt())
assert.Equal(t, 10*time.Second, Params.ChannelCheckpointUpdateTickInSeconds.GetAsDuration(time.Second)) assert.Equal(t, 10*time.Second, Params.ChannelCheckpointUpdateTickInSeconds.GetAsDuration(time.Second))
maxConcurrentImportTaskNum := Params.MaxConcurrentImportTaskNum.GetAsInt() assert.Equal(t, 4, Params.ImportConcurrencyPerCPUCore.GetAsInt())
t.Logf("maxConcurrentImportTaskNum: %d", maxConcurrentImportTaskNum)
assert.Equal(t, 16, maxConcurrentImportTaskNum)
assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64()) assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
assert.Equal(t, 64*1024*1024, Params.ImportInsertBufferSize.GetAsInt()) assert.Equal(t, 16*1024*1024, Params.ImportBaseBufferSize.GetAsInt())
assert.Equal(t, 16*1024*1024, Params.ImportDeleteBufferSize.GetAsInt()) assert.Equal(t, 16*1024*1024, Params.ImportDeleteBufferSize.GetAsInt())
assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt()) assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt())
assert.Equal(t, 20.0, Params.ImportMemoryLimitPercentage.GetAsFloat())
params.Save("datanode.gracefulStopTimeout", "100") params.Save("datanode.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
assert.Equal(t, 16, Params.SlotCap.GetAsInt()) assert.Equal(t, 16, Params.SlotCap.GetAsInt())

View File

@ -179,7 +179,7 @@ func (s *BulkInsertSuite) runTestAutoID() {
func (s *BulkInsertSuite) TestAutoID() { func (s *BulkInsertSuite) TestAutoID() {
// make buffer size small to trigger multiple sync // make buffer size small to trigger multiple sync
revertGuard := s.Cluster.MustModifyMilvusConfig(map[string]string{ revertGuard := s.Cluster.MustModifyMilvusConfig(map[string]string{
paramtable.Get().DataNodeCfg.ImportInsertBufferSize.Key: "0.000001", paramtable.Get().DataNodeCfg.ImportBaseBufferSize.Key: "0.000001",
}) })
defer revertGuard() defer revertGuard()