📝 Add docstrings to refine_task_assign-m

Docstring generation was requested by @xiaocai2333.

* https://github.com/milvus-io/milvus/pull/45226#issuecomment-3688756542

The following files were modified:

* `internal/datacoord/import_util.go`
* `internal/datacoord/task/global_scheduler.go`
* `internal/datacoord/task_index.go`
* `internal/datacoord/task_stats.go`
* `internal/datacoord/util.go`
* `internal/datanode/compactor/executor.go`
* `internal/datanode/compactor/mix_compactor.go`
* `internal/datanode/index/scheduler.go`
* `internal/datanode/index/util.go`
* `pkg/metrics/datanode_metrics.go`
Committed by coderabbitai[bot] via GitHub on 2025-12-24 09:53:16 +00:00
Parent 7c714b0035, commit b99bed4234
10 changed files with 303 additions and 98 deletions

View File: `internal/datacoord/import_util.go`

@ -283,6 +283,10 @@ func AllocImportSegment(ctx context.Context,
return segment, nil
}
// AssemblePreImportRequest builds a datapb.PreImportRequest for the given pre-import task and job.
// It fills job and task identifiers, collection, partitions, vchannels, schema, options, storage config,
// and converts the pre-import task's file stats into ImportFiles. CPU and memory slots are set from the
// task's computed slots and the plugin import context is attached to the request.
func AssemblePreImportRequest(task ImportTask, job ImportJob) *datapb.PreImportRequest {
importFiles := lo.Map(task.(*preImportTask).GetFileStats(),
func(fileStats *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
@ -298,13 +302,25 @@ func AssemblePreImportRequest(task ImportTask, job ImportJob) *datapb.PreImportR
Schema: job.GetSchema(),
ImportFiles: importFiles,
Options: job.GetOptions(),
TaskSlot: task.GetTaskSlot(),
StorageConfig: createStorageConfig(),
}
req.CpuSlot, req.MemorySlot = task.GetTaskSlot()
WrapPluginContextWithImport(task.GetCollectionID(), job.GetSchema().GetProperties(), job.GetOptions(), req)
return req
}
// AssembleImportRequest builds a datapb.ImportRequest for the given import task and job.
//
// It resolves the task's SegmentIDs to request segments, allocates a timestamp if the job has none,
// computes total rows from file stats, pre-allocates an ID range for auto IDs and log IDs using the
// allocator (applying the configured expansion factor), and populates request fields such as cluster,
// job/task/collection IDs, partitions, vchannels, schema, files, options, timestamp, IDRange,
// RequestSegments, storage configuration/version, and UseLoonFfi. The request's CpuSlot and MemorySlot
// are set from the task's calculated slots and the plugin context is attached.
//
// Returns the assembled ImportRequest, or an error if a referenced segment is missing or if any
// allocation (timestamp or IDs) fails.
func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc allocator.Allocator) (*datapb.ImportRequest, error) {
requestSegments := make([]*datapb.ImportRequestSegment, 0)
for _, segmentID := range task.(*importTask).GetSegmentIDs() {
@ -377,10 +393,11 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all
IDRange: &datapb.IDRange{Begin: idBegin, End: idEnd},
RequestSegments: requestSegments,
StorageConfig: createStorageConfig(),
TaskSlot: task.GetTaskSlot(),
StorageVersion: storageVersion,
UseLoonFfi: Params.CommonCfg.UseLoonFFI.GetAsBool(),
}
req.CpuSlot, req.MemorySlot = task.GetTaskSlot()
WrapPluginContextWithImport(task.GetCollectionID(), job.GetSchema().GetProperties(), job.GetOptions(), req)
return req, nil
}
@ -817,8 +834,12 @@ func ValidateMaxImportJobExceed(ctx context.Context, importMeta ImportMeta) erro
// The function uses a dual-constraint approach:
// 1. CPU constraint: Based on the number of files to process in parallel
// 2. Memory constraint: Based on the total buffer size required for all virtual channels and partitions
// Returns the maximum of the two constraints to ensure sufficient resources
func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) int {
// CalculateTaskSlot computes required CPU and memory slot quantities for the given import task
// based on import configuration and the associated job's characteristics.
// It returns two values: the CPU-based slots (at least 1, derived from files per CPU slot) and
// the memory-based slots expressed in gigabytes (derived from the task buffer size; uses the
// delete buffer size when the job is an L0 import).
func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) (float64, float64) {
job := importMeta.GetJob(context.TODO(), task.GetJobID())
// Calculate CPU-based slots
@ -843,16 +864,15 @@ func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) int {
// L0 import uses a fixed buffer size
taskBufferSize = paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt()
}
memoryLimitPerSlot := paramtable.Get().DataCoordCfg.ImportMemoryLimitPerSlot.GetAsInt()
memoryBasedSlots := taskBufferSize / memoryLimitPerSlot
memoryBasedSlots := float64(taskBufferSize) / 1024 / 1024 / 1024 // GB
// Return the larger value to ensure both CPU and memory constraints are satisfied
if cpuBasedSlots > memoryBasedSlots {
return cpuBasedSlots
}
return memoryBasedSlots
return float64(cpuBasedSlots), memoryBasedSlots
}
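
For intuition, here is a standalone sketch of the arithmetic the reworked `CalculateTaskSlot` performs. The file count, files-per-CPU-slot, and buffer size below are hypothetical, not the real configuration values:

```go
package main

import "fmt"

func main() {
	// Hypothetical inputs: 16 import files, 4 files per CPU slot, 6 GiB task buffer.
	fileNum := 16
	filesPerCPUSlot := 4
	taskBufferSize := int64(6 << 30) // bytes

	// CPU-based slots: number of files to process in parallel, at least 1.
	cpuBasedSlots := fileNum / filesPerCPUSlot
	if cpuBasedSlots < 1 {
		cpuBasedSlots = 1
	}

	// Memory-based slots: the task buffer size expressed in GiB.
	memoryBasedSlots := float64(taskBufferSize) / 1024 / 1024 / 1024

	fmt.Println(float64(cpuBasedSlots), memoryBasedSlots) // 4 6
}
```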
// createSortCompactionTask creates a CompactionTask to perform a sort compaction of originSegment into targetSegmentID.
// If originSegment has zero rows the segment is marked Dropped in meta and nil, nil is returned. Otherwise the function
// allocates IDs, reads collection schema and TTL, computes the expected segment size, and returns a populated
// datapb.CompactionTask ready for scheduling. Returns an error if any allocation, metadata lookup, or update fails.
func createSortCompactionTask(ctx context.Context,
originSegment *SegmentInfo,
targetSegmentID int64,
@ -913,4 +933,4 @@ func createSortCompactionTask(ctx context.Context,
log.Ctx(ctx).Info("create sort compaction task success", zap.Int64("segmentID", originSegment.GetID()),
zap.Int64("targetSegmentID", targetSegmentID), zap.Int64("num rows", originSegment.GetNumOfRows()))
return task, nil
}
}

View File: `internal/datacoord/task/global_scheduler.go`

@ -18,6 +18,7 @@ package task
import (
"context"
"sort"
"sync"
"time"
@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/lock"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
"github.com/samber/lo"
)
const NullNodeID = -1
@ -137,24 +139,6 @@ func (s *globalTaskScheduler) Stop() {
s.wg.Wait()
}
func (s *globalTaskScheduler) pickNode(workerSlots map[int64]*session.WorkerSlots, taskSlot int64) int64 {
var maxAvailable int64 = -1
var nodeID int64 = NullNodeID
for id, ws := range workerSlots {
if ws.AvailableSlots > maxAvailable && ws.AvailableSlots > 0 {
maxAvailable = ws.AvailableSlots
nodeID = id
}
}
if nodeID != NullNodeID {
workerSlots[nodeID].AvailableSlots = 0
return nodeID
}
return NullNodeID
}
func (s *globalTaskScheduler) schedule() {
pendingNum := len(s.pendingTasks.TaskIDs())
if pendingNum == 0 {
@ -169,8 +153,8 @@ func (s *globalTaskScheduler) schedule() {
if task == nil {
break
}
taskSlot := task.GetTaskSlot()
nodeID := s.pickNode(nodeSlots, taskSlot)
cpuSlot, memorySlot := task.GetTaskSlot()
nodeID := s.pickNode(nodeSlots, cpuSlot, memorySlot)
if nodeID == NullNodeID {
s.pendingTasks.Push(task)
break
@ -313,6 +297,11 @@ func (s *globalTaskScheduler) updateTaskTimeMetrics() {
}
}
// NewGlobalTaskScheduler creates a GlobalScheduler that manages global task lifecycle and scheduling across the provided cluster.
// The provided ctx is used as the parent context for the scheduler's lifecycle; canceling it will stop scheduler workers.
// The cluster parameter is used to interact with worker nodes for task creation, state checks, and drops.
// The returned scheduler is initialized with per-task key locking, a priority queue for pending tasks, a concurrent map for running tasks,
// and fixed-size execution and check worker pools (128 workers each).
func NewGlobalTaskScheduler(ctx context.Context, cluster session.Cluster) GlobalScheduler {
execPool := conc.NewPool[struct{}](128)
checkPool := conc.NewPool[struct{}](128)
@ -329,3 +318,34 @@ func NewGlobalTaskScheduler(ctx context.Context, cluster session.Cluster) Global
cluster: cluster,
}
}
func (s *globalTaskScheduler) pickNode(workerSlots map[int64]*session.WorkerSlots, cpuSlot, memorySlot float64) int64 {
var nodeID int64 = NullNodeID
if len(workerSlots) <= 0 {
return nodeID
}
workerSlotsList := lo.Values(workerSlots)
sort.Slice(workerSlotsList, func(i, j int) bool {
if workerSlotsList[i].AvailableMemorySlot == workerSlotsList[j].AvailableMemorySlot {
return workerSlotsList[i].AvailableCpuSlot > workerSlotsList[j].AvailableCpuSlot
}
return workerSlotsList[i].AvailableMemorySlot > workerSlotsList[j].AvailableMemorySlot
})
optimal := workerSlotsList[0]
if memorySlot >= optimal.AvailableMemorySlot {
if optimal.TotalMemorySlot == optimal.AvailableMemorySlot {
nodeID = optimal.NodeID
}
} else {
if cpuSlot <= optimal.AvailableCpuSlot || optimal.AvailableCpuSlot == optimal.TotalCpuSlot || cpuSlot < 4 {
nodeID = optimal.NodeID
}
}
if nodeID != NullNodeID {
workerSlots[nodeID].AvailableCpuSlot -= cpuSlot
workerSlots[nodeID].AvailableMemorySlot -= memorySlot
}
return nodeID
}
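
To make the new selection rule concrete, here is a self-contained sketch that mirrors the logic of `pickNode` above over hypothetical worker slots. The `workerSlots` struct is a simplified stand-in for `session.WorkerSlots`, the numbers are illustrative, and unlike the real scheduler this sketch does not deduct the chosen task's slots from the winner:

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-in for session.WorkerSlots.
type workerSlots struct {
	NodeID                               int64
	TotalCpuSlot, AvailableCpuSlot       float64
	TotalMemorySlot, AvailableMemorySlot float64
}

// pickNode prefers the worker with the most available memory (CPU as tie-breaker).
// Memory may only be over-committed on a fully idle worker; CPU may be over-committed
// on an idle worker or for small tasks (cpuSlot < 4).
func pickNode(workers []workerSlots, cpuSlot, memorySlot float64) int64 {
	if len(workers) == 0 {
		return -1 // NullNodeID
	}
	sort.Slice(workers, func(i, j int) bool {
		if workers[i].AvailableMemorySlot == workers[j].AvailableMemorySlot {
			return workers[i].AvailableCpuSlot > workers[j].AvailableCpuSlot
		}
		return workers[i].AvailableMemorySlot > workers[j].AvailableMemorySlot
	})
	optimal := workers[0]
	if memorySlot >= optimal.AvailableMemorySlot {
		if optimal.TotalMemorySlot == optimal.AvailableMemorySlot {
			return optimal.NodeID
		}
		return -1
	}
	if cpuSlot <= optimal.AvailableCpuSlot || optimal.AvailableCpuSlot == optimal.TotalCpuSlot || cpuSlot < 4 {
		return optimal.NodeID
	}
	return -1
}

func main() {
	workers := []workerSlots{
		{NodeID: 1, TotalCpuSlot: 8, AvailableCpuSlot: 2, TotalMemorySlot: 32, AvailableMemorySlot: 10},
		{NodeID: 2, TotalCpuSlot: 8, AvailableCpuSlot: 6, TotalMemorySlot: 32, AvailableMemorySlot: 24},
	}
	// Node 2 has the most free memory and enough free CPU, so it is picked.
	fmt.Println(pickNode(workers, 4, 8)) // 2
}
```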

View File: `internal/datacoord/task_index.go`

@ -46,7 +46,8 @@ import (
type indexBuildTask struct {
*model.SegmentIndex
taskSlot int64
cpuSlot float64
memorySlot float64
times *taskcommon.Times
@ -58,8 +59,11 @@ type indexBuildTask struct {
var _ globalTask.Task = (*indexBuildTask)(nil)
// newIndexBuildTask creates an indexBuildTask for the given SegmentIndex configured with the provided CPU and memory slots and associated managers.
// It initializes the task times and stores references to meta, handler, chunk manager, and index engine version manager.
func newIndexBuildTask(segIndex *model.SegmentIndex,
taskSlot int64,
cpuSlot float64,
memorySlot float64,
meta *meta,
handler Handler,
chunkManager storage.ChunkManager,
@ -67,7 +71,8 @@ func newIndexBuildTask(segIndex *model.SegmentIndex,
) *indexBuildTask {
return &indexBuildTask{
SegmentIndex: segIndex,
taskSlot: taskSlot,
cpuSlot: cpuSlot,
memorySlot: memorySlot,
times: taskcommon.NewTimes(),
meta: meta,
handler: handler,
@ -80,8 +85,8 @@ func (it *indexBuildTask) GetTaskID() int64 {
return it.BuildID
}
func (it *indexBuildTask) GetTaskSlot() int64 {
return it.taskSlot
func (it *indexBuildTask) GetTaskSlot() (float64, float64) {
return it.cpuSlot, it.memorySlot
}
func (it *indexBuildTask) GetTaskState() taskcommon.State {
@ -324,10 +329,11 @@ func (it *indexBuildTask) prepareJobRequest(ctx context.Context, segment *Segmen
Field: field,
PartitionKeyIsolation: partitionKeyIsolation,
StorageVersion: segment.GetStorageVersion(),
TaskSlot: it.taskSlot,
LackBinlogRows: segIndex.NumRows - totalRows,
InsertLogs: segment.GetBinlogs(),
Manifest: segment.GetManifestPath(),
CpuSlot: it.cpuSlot,
MemorySlot: it.memorySlot,
}
WrapPluginContext(segment.GetCollectionID(), schema.GetProperties(), req)
@ -428,4 +434,4 @@ func (it *indexBuildTask) tryDropTaskOnWorker(cluster session.Cluster) error {
func (it *indexBuildTask) DropTaskOnWorker(cluster session.Cluster) {
it.tryDropTaskOnWorker(cluster)
}
}

View File: `internal/datacoord/task_stats.go`

@ -39,7 +39,8 @@ import (
type statsTask struct {
*indexpb.StatsTask
taskSlot int64
cpuSlot float64
memorySlot float64
times *taskcommon.Times
@ -51,21 +52,26 @@ type statsTask struct {
var _ globalTask.Task = (*statsTask)(nil)
// newStatsTask creates a new statsTask configured with the provided StatsTask, CPU and memory slot
// allocations, metadata, handler, allocator, and index engine version manager. The returned task has
// its timing tracker initialized.
func newStatsTask(t *indexpb.StatsTask,
taskSlot int64,
cpuSlot float64,
memorySlot float64,
mt *meta,
handler Handler,
allocator allocator.Allocator,
ievm IndexEngineVersionManager,
) *statsTask {
return &statsTask{
StatsTask: t,
taskSlot: taskSlot,
times: taskcommon.NewTimes(),
meta: mt,
handler: handler,
allocator: allocator,
ievm: ievm,
StatsTask: t,
cpuSlot: cpuSlot,
memorySlot: memorySlot,
times: taskcommon.NewTimes(),
meta: mt,
handler: handler,
allocator: allocator,
ievm: ievm,
}
}
@ -81,8 +87,8 @@ func (st *statsTask) GetTaskState() taskcommon.State {
return st.GetState()
}
func (st *statsTask) GetTaskSlot() int64 {
return st.taskSlot
func (st *statsTask) GetTaskSlot() (float64, float64) {
return st.cpuSlot, st.memorySlot
}
func (st *statsTask) SetTaskTime(timeType taskcommon.TimeType, time time.Time) {
@ -334,13 +340,14 @@ func (st *statsTask) prepareJobRequest(ctx context.Context, segment *SegmentInfo
TaskVersion: st.GetVersion(),
EnableJsonKeyStats: Params.CommonCfg.EnabledJSONKeyStats.GetAsBool(),
JsonKeyStatsDataFormat: common.JSONStatsDataFormatVersion,
TaskSlot: st.taskSlot,
StorageVersion: segment.StorageVersion,
CurrentScalarIndexVersion: st.ievm.GetCurrentScalarIndexEngineVersion(),
JsonStatsMaxShreddingColumns: Params.DataCoordCfg.JSONStatsMaxShreddingColumns.GetAsInt64(),
JsonStatsShreddingRatioThreshold: Params.DataCoordCfg.JSONStatsShreddingRatioThreshold.GetAsFloat(),
JsonStatsWriteBatchSize: Params.DataCoordCfg.JSONStatsWriteBatchSize.GetAsInt64(),
ManifestPath: segment.GetManifestPath(),
CpuSlot: st.cpuSlot,
MemorySlot: st.memorySlot,
}
WrapPluginContext(segment.GetCollectionID(), collInfo.Schema.GetProperties(), req)
@ -379,4 +386,4 @@ func (st *statsTask) SetJobInfo(ctx context.Context, result *workerpb.StatsResul
zap.Int64("oldSegmentID", st.GetSegmentID()), zap.Int64("targetSegmentID", st.GetTargetSegmentID()),
zap.String("subJobType", st.GetSubJobType().String()), zap.String("state", st.GetState().String()))
return nil
}
}

View File: `internal/datacoord/util.go`

@ -366,6 +366,7 @@ func createStorageConfig() *indexpb.StorageConfig {
return storageConfig
}
// getSortStatus reports the sort state as either "sorted" or "unsorted".
func getSortStatus(sorted bool) string {
if sorted {
return "sorted"
@ -373,35 +374,60 @@ func getSortStatus(sorted bool) string {
return "unsorted"
}
func calculateIndexTaskSlot(fieldSize int64, isVectorIndex bool) int64 {
defaultSlots := Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64()
if !isVectorIndex {
defaultSlots = Params.DataCoordCfg.ScalarIndexTaskSlotUsage.GetAsInt64()
// calculateStatsTaskSlot computes CPU and memory slot values for a stats task based on the segment size.
// It returns cpuSlot and memorySlot, where cpuSlot is taken from the configured CPU factor and memorySlot equals the segment size converted to GiB multiplied by the configured memory factor.
func calculateStatsTaskSlot(segmentSize int64) (float64, float64) {
cpuSlot := Params.DataCoordCfg.StatsTaskCPUFactor.GetAsFloat()
memorySlot := float64(segmentSize) / 1024 / 1024 / 1024 * Params.DataCoordCfg.StatsTaskMemoryFactor.GetAsFloat()
return cpuSlot, memorySlot
}
// calculateIndexTaskSlot computes CPU and memory slot estimates for an index-building task.
//
// It selects CPU and memory factors from configuration for scalar or vector indexes based on
// isVectorIndex. The memory slot always scales linearly with fieldSize (interpreted in bytes and
// converted to GiB), while the CPU slot is reduced for smaller fields (full, 1/2, 1/4, or 1/8 of
// the configured factor).
//
// Parameters:
//   - fieldSize: size of the field in bytes.
//   - isVectorIndex: when true, use vector-index factors; otherwise use scalar-index factors.
//
// Returns the computed cpuSlot and memorySlot values.
func calculateIndexTaskSlot(fieldSize int64, isVectorIndex bool) (float64, float64) {
cpuSlot := Params.DataCoordCfg.ScalarIndexTaskCPUFactor.GetAsFloat()
memorySlot := float64(fieldSize) / 1024 / 1024 / 1024 * Params.DataCoordCfg.ScalarIndexTaskMemoryFactor.GetAsFloat()
if isVectorIndex {
cpuSlot = Params.DataCoordCfg.VectorIndexTaskCPUFactor.GetAsFloat()
memorySlot = float64(fieldSize) / 1024 / 1024 / 1024 * Params.DataCoordCfg.VectorIndexTaskMemoryFactor.GetAsFloat()
}
if fieldSize > 512*1024*1024 {
taskSlot := max(fieldSize/512/1024/1024, 1) * defaultSlots
return max(taskSlot, 1)
return cpuSlot, memorySlot
} else if fieldSize > 100*1024*1024 {
return max(defaultSlots/4, 1)
return cpuSlot / 2, memorySlot
} else if fieldSize > 10*1024*1024 {
return max(defaultSlots/16, 1)
return cpuSlot / 4, memorySlot
}
return max(defaultSlots/64, 1)
return cpuSlot / 8, memorySlot
}
func calculateStatsTaskSlot(segmentSize int64) int64 {
defaultSlots := Params.DataCoordCfg.StatsTaskSlotUsage.GetAsInt64()
if segmentSize > 512*1024*1024 {
taskSlot := max(segmentSize/512/1024/1024, 1) * defaultSlots
return max(taskSlot, 1)
} else if segmentSize > 100*1024*1024 {
return max(defaultSlots/2, 1)
} else if segmentSize > 10*1024*1024 {
return max(defaultSlots/4, 1)
// IsVectorField determines whether the specified field in the collection is a vector field.
// It returns `true` if the field exists and is a vector field, `false` otherwise.
func IsVectorField(meta *meta, collID, fieldID int64) bool {
coll := meta.GetCollection(collID)
for _, field := range coll.Schema.GetFields() {
if field.GetFieldID() == fieldID {
return typeutil.IsVectorType(field.GetDataType())
}
}
return max(defaultSlots/8, 1)
return false
}
// enableSortCompaction reports whether sort-based compaction is enabled.
// It returns true when both DataCoordCfg.EnableSortCompaction and DataCoordCfg.EnableCompaction are enabled in the configuration, false otherwise.
func enableSortCompaction() bool {
return paramtable.Get().DataCoordCfg.EnableSortCompaction.GetAsBool() && paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool()
}
@ -424,4 +450,4 @@ func stringifyBinlogs(binlogs []*datapb.FieldBinlog) []string {
strs = append(strs, fmt.Sprintf("f%d:%s", binlogs[0].GetFieldID(), strings.Join(fieldsStrs, "|")))
}
return strs
}
}
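
For intuition on the new slot tiering in `calculateIndexTaskSlot` above, here is a small standalone sketch. The CPU and memory factors are made-up values, not the DataCoord defaults:

```go
package main

import "fmt"

// Illustrative factors; the real values come from DataCoordCfg
// (e.g. VectorIndexTaskCPUFactor / VectorIndexTaskMemoryFactor).
const (
	cpuFactor    = 8.0
	memoryFactor = 2.0
)

// indexTaskSlot mirrors the tiering: memory scales linearly with field size,
// while CPU is halved, quartered, or eighthed below 512 MiB, 100 MiB, and 10 MiB.
func indexTaskSlot(fieldSize int64) (float64, float64) {
	memorySlot := float64(fieldSize) / 1024 / 1024 / 1024 * memoryFactor
	switch {
	case fieldSize > 512*1024*1024:
		return cpuFactor, memorySlot
	case fieldSize > 100*1024*1024:
		return cpuFactor / 2, memorySlot
	case fieldSize > 10*1024*1024:
		return cpuFactor / 4, memorySlot
	default:
		return cpuFactor / 8, memorySlot
	}
}

func main() {
	for _, size := range []int64{5 << 20, 200 << 20, 2 << 30} {
		cpu, mem := indexTaskSlot(size)
		fmt.Printf("fieldSize=%d MiB -> cpuSlot=%.2f memorySlot=%.3f\n", size>>20, cpu, mem)
	}
}
```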

View File: `internal/datanode/compactor/executor.go`

@ -39,6 +39,7 @@ type Executor interface {
Start(ctx context.Context)
Enqueue(task Compactor) (bool, error)
Slots() int64
SlotsV2() (float64, float64)
RemoveTask(planID int64) // Deprecated in 2.6
GetResults(planID int64) []*datapb.CompactionPlanResult // Deprecated in 2.6
}
@ -64,20 +65,28 @@ type executor struct {
taskCh chan Compactor
// Slot tracking for resource management
usingSlots int64
usingSlots int64
usingCpuSlots float64
usingMemorySlots float64
// Slots(Slots Cap for DataCoord), ExecPool(MaxCompactionConcurrency) are all trying to control concurrency and resource usage,
// which creates unnecessary complexity. We should use a single resource pool instead.
}
// NewExecutor creates a new executor initialized with an empty task map, a buffered task channel of capacity maxTaskQueueNum, and zeroed slot counters for total, CPU, and memory usage.
func NewExecutor() *executor {
return &executor{
tasks: make(map[int64]*taskState),
taskCh: make(chan Compactor, maxTaskQueueNum),
usingSlots: 0,
tasks: make(map[int64]*taskState),
taskCh: make(chan Compactor, maxTaskQueueNum),
usingSlots: 0,
usingCpuSlots: 0,
usingMemorySlots: 0,
}
}
// getTaskSlotUsage returns the finalized slot usage for the provided compaction task.
// It uses the task's reported GetSlotUsage value when positive; otherwise it falls back
// to the configured default slot usage for the task's compaction type and logs a warning.
func getTaskSlotUsage(task Compactor) int64 {
// Calculate slot usage
taskSlotUsage := task.GetSlotUsage()
@ -100,14 +109,39 @@ func getTaskSlotUsage(task Compactor) int64 {
return taskSlotUsage
}
func (e *executor) Enqueue(task Compactor) (bool, error) {
e.mu.Lock()
defer e.mu.Unlock()
// getTaskSlotUsageV2 returns the finalized CPU and memory slot usage for the provided compaction task.
// It uses the task's reported GetSlotUsageV2 values when both are positive; otherwise it falls back to
// the configured default slot usage for the task's compaction type and logs a warning.
// The returned values are (cpuSlotUsage, memorySlotUsage).
func getTaskSlotUsageV2(task Compactor) (float64, float64) {
// Calculate slot usage
taskCpuSlotUsage, taskMemorySlotUsage := task.GetSlotUsageV2()
// compatible for old datacoord or unexpected request
if taskCpuSlotUsage <= 0 || taskMemorySlotUsage <= 0 {
switch task.GetCompactionType() {
case datapb.CompactionType_ClusteringCompaction:
taskCpuSlotUsage, taskMemorySlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsFloat(), paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsFloat()
case datapb.CompactionType_MixCompaction:
taskCpuSlotUsage, taskMemorySlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsFloat(), paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsFloat()
case datapb.CompactionType_Level0DeleteCompaction:
taskCpuSlotUsage, taskMemorySlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsFloat(), paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsFloat()
}
illegalCpuSlot, illegalMemorySlot := task.GetSlotUsageV2()
log.Warn("illegal task slot usage, change it to a default value",
zap.Float64("illegalCpuSlotUsage", illegalCpuSlot),
zap.Float64("illegalMemorySlotUsage", illegalMemorySlot),
zap.Float64("defaultCpuSlotUsage", taskCpuSlotUsage),
zap.Float64("defaultMemorySlotUsage", taskMemorySlotUsage),
zap.String("type", task.GetCompactionType().String()))
}
return taskCpuSlotUsage, taskMemorySlotUsage
}
func (e *executor) Enqueue(task Compactor) (bool, error) {
planID := task.GetPlanID()
e.mu.Lock()
// Check for duplicate task
if _, exists := e.tasks[planID]; exists {
e.mu.Unlock()
log.Warn("duplicated compaction task",
zap.Int64("planID", planID),
zap.String("channel", task.GetChannelName()))
@ -116,12 +150,18 @@ func (e *executor) Enqueue(task Compactor) (bool, error) {
// Update slots and add task
e.usingSlots += getTaskSlotUsage(task)
cpuSlot, memorySlot := getTaskSlotUsageV2(task)
e.usingCpuSlots += cpuSlot
e.usingMemorySlots += memorySlot
e.tasks[planID] = &taskState{
compactor: task,
state: datapb.CompactionTaskState_executing,
result: nil,
}
e.mu.Unlock()
// Send to channel after releasing lock to avoid deadlock
// when channel is full and completeTask needs to acquire lock
e.taskCh <- task
return true, nil
}
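
The restructured `Enqueue` registers the task while holding the mutex but performs the blocking channel send only after unlocking, so a full `taskCh` cannot deadlock against `completeTask` (which also needs the lock). A minimal, self-contained sketch of that pattern, not the Milvus types themselves:

```go
package main

import (
	"fmt"
	"sync"
)

type queue struct {
	mu    sync.Mutex
	tasks map[int64]struct{}
	ch    chan int64
}

// enqueue records the task under the lock, then releases the lock before the
// potentially blocking send. If the send happened while the lock was held and
// the channel were full, the consumer could never acquire the lock to drain it.
func (q *queue) enqueue(id int64) bool {
	q.mu.Lock()
	if _, dup := q.tasks[id]; dup {
		q.mu.Unlock()
		return false
	}
	q.tasks[id] = struct{}{}
	q.mu.Unlock()

	q.ch <- id
	return true
}

func main() {
	q := &queue{tasks: map[int64]struct{}{}, ch: make(chan int64, 1)}
	fmt.Println(q.enqueue(7), q.enqueue(7)) // true false
}
```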
@ -133,6 +173,13 @@ func (e *executor) Slots() int64 {
return e.usingSlots
}
// SlotsV2 returns the used CPU and memory slots for compaction
func (e *executor) SlotsV2() (float64, float64) {
e.mu.RLock()
defer e.mu.RUnlock()
return e.usingCpuSlots, e.usingMemorySlots
}
// completeTask updates task state to completed and adjusts slot usage
func (e *executor) completeTask(planID int64, result *datapb.CompactionPlanResult) {
e.mu.Lock()
@ -154,6 +201,16 @@ func (e *executor) completeTask(planID int64, result *datapb.CompactionPlanResul
if e.usingSlots < 0 {
e.usingSlots = 0
}
cpuSlot, memorySlot := getTaskSlotUsageV2(task.compactor)
e.usingCpuSlots -= cpuSlot
e.usingMemorySlots -= memorySlot
if e.usingCpuSlots < 0 {
e.usingCpuSlots = 0
}
if e.usingMemorySlots < 0 {
e.usingMemorySlots = 0
}
}
}
@ -310,4 +367,4 @@ func (e *executor) getAllCompactionResults() []*datapb.CompactionPlanResult {
}
return results
}
}

View File: `internal/datanode/compactor/mix_compactor.go`

@ -436,6 +436,11 @@ func (t *mixCompactionTask) GetSlotUsage() int64 {
return t.plan.GetSlotUsage()
}
func (t *mixCompactionTask) GetSlotUsageV2() (float64, float64) {
return t.plan.GetCpuSlot(), t.plan.GetMemorySlot()
}
// It returns a slice of those field IDs, or an empty slice if no BM25 functions are present.
func GetBM25FieldIDs(coll *schemapb.CollectionSchema) []int64 {
return lo.FilterMap(coll.GetFunctions(), func(function *schemapb.FunctionSchema, _ int) (int64, bool) {
if function.GetType() == schemapb.FunctionType_BM25 {
@ -443,4 +448,4 @@ func GetBM25FieldIDs(coll *schemapb.CollectionSchema) []int64 {
}
return 0, false
})
}
}

View File: `internal/datanode/index/scheduler.go`

@ -21,7 +21,6 @@ import (
"context"
"runtime/debug"
"sync"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
@ -45,6 +44,8 @@ type TaskQueue interface {
GetTaskNum() (int, int)
GetUsingSlot() int64
GetActiveSlot() int64
GetUsingSlotV2() (float64, float64)
GetActiveSlotV2() (float64, float64)
}
// BaseTaskQueue is a basic instance of TaskQueue.
@ -60,6 +61,9 @@ type IndexTaskQueue struct {
utBufChan chan struct{} // to block scheduler
usingSlot atomic.Int64
usingCpuSlot atomic.Float64
usingMemorySlot atomic.Float64
sched *TaskScheduler
}
@ -105,6 +109,23 @@ func (queue *IndexTaskQueue) GetActiveSlot() int64 {
return slots
}
func (queue *IndexTaskQueue) GetUsingSlotV2() (float64, float64) {
return queue.usingCpuSlot.Load(), queue.usingMemorySlot.Load()
}
func (queue *IndexTaskQueue) GetActiveSlotV2() (float64, float64) {
queue.atLock.Lock()
defer queue.atLock.Unlock()
cpuSlots, memorySlot := float64(0), float64(0)
for _, t := range queue.activeTasks {
taskCpuSlot, taskMemorySlot := t.GetSlotV2()
cpuSlots += taskCpuSlot
memorySlot += taskMemorySlot
}
return cpuSlots, memorySlot
}
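
One plausible consumer of these V2 accessors (hypothetical, not part of this diff) combines them with `CalculateNodeSlotsV2` from the same package to report remaining capacity:

```go
// availableSlotsV2 is a hypothetical helper: remaining CPU/memory slots on this
// node, clamped at zero. It assumes the TaskQueue interface and
// CalculateNodeSlotsV2 introduced in this commit.
func availableSlotsV2(queue TaskQueue) (cpuAvail, memAvail float64) {
	totalCpu, totalMem := CalculateNodeSlotsV2()
	usedCpu, usedMem := queue.GetUsingSlotV2()
	cpuAvail, memAvail = totalCpu-usedCpu, totalMem-usedMem
	if cpuAvail < 0 {
		cpuAvail = 0
	}
	if memAvail < 0 {
		memAvail = 0
	}
	return cpuAvail, memAvail
}
```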
// PopUnissuedTask pops a task from tasks queue.
func (queue *IndexTaskQueue) PopUnissuedTask() Task {
queue.utLock.Lock()
@ -143,6 +164,9 @@ func (queue *IndexTaskQueue) PopActiveTask(tName string) Task {
if ok {
delete(queue.activeTasks, tName)
queue.usingSlot.Sub(t.GetSlot())
taskCpuSlot, taskMemorySlot := t.GetSlotV2()
queue.usingCpuSlot.Sub(taskCpuSlot)
queue.usingMemorySlot.Sub(taskMemorySlot)
return t
}
log.Ctx(queue.sched.ctx).Debug("task was not found in the active task list", zap.String("TaskName", tName))
@ -160,6 +184,9 @@ func (queue *IndexTaskQueue) Enqueue(t Task) error {
}
queue.usingSlot.Add(t.GetSlot())
taskCpuSlot, taskMemorySlot := t.GetSlotV2()
queue.usingCpuSlot.Add(taskCpuSlot)
queue.usingMemorySlot.Add(taskMemorySlot)
return nil
}
@ -180,7 +207,9 @@ func (queue *IndexTaskQueue) GetTaskNum() (int, int) {
return utNum, atNum
}
// NewIndexBuildTaskQueue creates a new IndexBuildTaskQueue.
// NewIndexBuildTaskQueue creates and returns a new IndexTaskQueue configured for the provided TaskScheduler.
// The returned queue has empty unissued and active task containers, a max unissued capacity of 1024,
// a buffered unissued-task channel with capacity 1024, and zeroed global, CPU and memory slot counters.
func NewIndexBuildTaskQueue(sched *TaskScheduler) *IndexTaskQueue {
return &IndexTaskQueue{
unissuedTasks: list.New(),
@ -190,7 +219,9 @@ func NewIndexBuildTaskQueue(sched *TaskScheduler) *IndexTaskQueue {
utBufChan: make(chan struct{}, 1024),
sched: sched,
usingSlot: atomic.Int64{},
usingSlot: atomic.Int64{},
usingCpuSlot: atomic.Float64{},
usingMemorySlot: atomic.Float64{},
}
}
@ -264,17 +295,9 @@ func (sched *TaskScheduler) indexBuildLoop() {
return
case <-sched.TaskQueue.utChan():
t := sched.TaskQueue.PopUnissuedTask()
for {
totalSlot := CalculateNodeSlots()
availableSlot := totalSlot - sched.TaskQueue.GetActiveSlot()
if availableSlot >= t.GetSlot() || totalSlot == availableSlot {
go func(t Task) {
sched.processTask(t, sched.TaskQueue)
}(t)
break
}
time.Sleep(time.Millisecond * 50)
}
go func(t Task) {
sched.processTask(t, sched.TaskQueue)
}(t)
}
}
}
@ -290,4 +313,4 @@ func (sched *TaskScheduler) Start() error {
func (sched *TaskScheduler) Close() {
sched.cancel()
sched.wg.Wait()
}
}

View File: `internal/datanode/index/util.go`

@ -80,6 +80,8 @@ func mapToKVPairs(m map[string]string) []*commonpb.KeyValuePair {
return kvs
}
// CalculateNodeSlots computes the number of worker slots available to a data node.
// It derives one base value from half of the CPU count and another from total memory (GiB / 8),
// takes the smaller of the two, and multiplies it by DataNodeCfg.WorkerSlotUnit and BuildParallel.
// When running in the Standalone role the result is further scaled by StandaloneSlotRatio;
// the final value is at least 1.
func CalculateNodeSlots() int64 {
cpuNum := hardware.GetCPUNum()
memory := hardware.GetMemoryCount()
@ -96,3 +98,20 @@ func CalculateNodeSlots() int64 {
}
return totalSlot
}
// CalculateNodeSlotsV2 computes two slot estimates for a data node: a CPU-based slot and a memory-based slot.
// The CPU slot is CPU count multiplied by the configured BuildParallel factor; the memory slot is total memory in GiB
// multiplied by the same BuildParallel factor. If running in the Standalone role, both slots are further scaled
// by the configured StandaloneSlotRatio.
func CalculateNodeSlotsV2() (float64, float64) {
cpuNum := hardware.GetCPUNum()
memory := hardware.GetMemoryCount()
cpuSlot := float64(cpuNum) * paramtable.Get().DataNodeCfg.BuildParallel.GetAsFloat()
memorySlot := float64(memory) / 1024 / 1024 / 1024 * paramtable.Get().DataNodeCfg.BuildParallel.GetAsFloat()
if paramtable.GetRole() == typeutil.StandaloneRole {
cpuSlot = cpuSlot * paramtable.Get().DataNodeCfg.StandaloneSlotRatio.GetAsFloat()
memorySlot = memorySlot * paramtable.Get().DataNodeCfg.StandaloneSlotRatio.GetAsFloat()
}
return cpuSlot, memorySlot
}
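
As a rough worked example of `CalculateNodeSlotsV2`, with all numbers hypothetical:

```go
package main

import "fmt"

func main() {
	// Hypothetical standalone node: 8 CPUs, 32 GiB RAM,
	// BuildParallel = 1, StandaloneSlotRatio = 0.5.
	cpuNum := 8.0
	memoryBytes := float64(32) * 1024 * 1024 * 1024
	buildParallel := 1.0
	standaloneRatio := 0.5

	cpuSlot := cpuNum * buildParallel
	memorySlot := memoryBytes / 1024 / 1024 / 1024 * buildParallel
	cpuSlot *= standaloneRatio
	memorySlot *= standaloneRatio

	fmt.Println(cpuSlot, memorySlot) // 4 16
}
```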

View File: `pkg/metrics/datanode_metrics.go`

@ -335,6 +335,22 @@ var (
Name: "slot",
Help: "number of available and used slot",
}, []string{nodeIDLabelName, "type"})
DataNodeCPUSlot = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "cpu_slot",
Help: "number of available and used cpu slot",
}, []string{nodeIDLabelName, "type"})
DataNodeMemorySlot = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "memory_slot",
Help: "number of available and used memory slot",
}, []string{nodeIDLabelName, "type"})
)
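
The new gauges use the same labels as `DataNodeSlot` (node ID plus a "type" label). A hedged sketch of how a reporter might update them; the "available"/"used" label values and the helper function are assumptions, not part of this diff:

```go
// reportSlotMetrics is a hypothetical reporting helper inside the metrics package.
func reportSlotMetrics(nodeID string, availCpu, usedCpu, availMem, usedMem float64) {
	DataNodeCPUSlot.WithLabelValues(nodeID, "available").Set(availCpu)
	DataNodeCPUSlot.WithLabelValues(nodeID, "used").Set(usedCpu)
	DataNodeMemorySlot.WithLabelValues(nodeID, "available").Set(availMem)
	DataNodeMemorySlot.WithLabelValues(nodeID, "used").Set(usedMem)
}
```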
var registerDNOnce sync.Once
@ -346,7 +362,9 @@ func RegisterDataNode(registry *prometheus.Registry) {
})
}
// registerDataNodeOnce registers DataNode metrics
// registerDataNodeOnce registers all DataNode-related Prometheus metrics with the provided registry.
// It registers metric vectors for input, in-memory, output, compaction, deprecated, and index metrics,
// and also registers logging and slot metrics. This function is intended to be executed once during initialization.
func registerDataNodeOnce(registry *prometheus.Registry) {
registry.MustRegister(DataNodeNumFlowGraphs)
// input related
@ -386,8 +404,12 @@ func registerDataNodeOnce(registry *prometheus.Registry) {
registry.MustRegister(DataNodeBuildJSONStatsLatency)
registry.MustRegister(DataNodeSlot)
RegisterLoggingMetrics(registry)
registry.MustRegister(DataNodeCPUSlot)
registry.MustRegister(DataNodeMemorySlot)
}
// CleanupDataNodeCollectionMetrics removes collection-scoped DataNode metrics for the given node and collection.
// The channel parameter is accepted for API compatibility but is ignored.
func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) {
DataNodeConsumeTimeTickLag.
Delete(
@ -423,4 +445,4 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel
DataNodeWriteDataCount.Delete(prometheus.Labels{
collectionIDLabelName: fmt.Sprint(collectionID),
})
}
}