diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index bd8e38a8f9..3ec3008100 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -283,6 +283,10 @@ func AllocImportSegment(ctx context.Context, return segment, nil } +// AssemblePreImportRequest builds a datapb.PreImportRequest for the given pre-import task and job. +// It fills job and task identifiers, collection, partitions, vchannels, schema, options, storage config, +// and converts the pre-import task's file stats into ImportFiles. CPU and memory slots are set from the +// task's computed slots and the plugin import context is attached to the request. func AssemblePreImportRequest(task ImportTask, job ImportJob) *datapb.PreImportRequest { importFiles := lo.Map(task.(*preImportTask).GetFileStats(), func(fileStats *datapb.ImportFileStats, _ int) *internalpb.ImportFile { @@ -298,13 +302,25 @@ func AssemblePreImportRequest(task ImportTask, job ImportJob) *datapb.PreImportR Schema: job.GetSchema(), ImportFiles: importFiles, Options: job.GetOptions(), - TaskSlot: task.GetTaskSlot(), StorageConfig: createStorageConfig(), } + + req.CpuSlot, req.MemorySlot = task.GetTaskSlot() WrapPluginContextWithImport(task.GetCollectionID(), job.GetSchema().GetProperties(), job.GetOptions(), req) return req } +// AssembleImportRequest builds a datapb.ImportRequest for the given import task and job. +// +// It resolves the task's SegmentIDs to request segments, allocates a timestamp if the job has none, +// computes total rows from file stats, pre-allocates an ID range for auto IDs and log IDs using the +// allocator (applying the configured expansion factor), and populates request fields such as cluster, +// job/task/collection IDs, partitions, vchannels, schema, files, options, timestamp, IDRange, +// RequestSegments, storage configuration/version, and UseLoonFfi. The request's CpuSlot and MemorySlot +// are set from the task's calculated slots and the plugin context is attached. +// +// Returns the assembled ImportRequest, or an error if a referenced segment is missing or if any +// allocation (timestamp or IDs) fails. func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc allocator.Allocator) (*datapb.ImportRequest, error) { requestSegments := make([]*datapb.ImportRequestSegment, 0) for _, segmentID := range task.(*importTask).GetSegmentIDs() { @@ -377,10 +393,11 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all IDRange: &datapb.IDRange{Begin: idBegin, End: idEnd}, RequestSegments: requestSegments, StorageConfig: createStorageConfig(), - TaskSlot: task.GetTaskSlot(), StorageVersion: storageVersion, UseLoonFfi: Params.CommonCfg.UseLoonFFI.GetAsBool(), } + + req.CpuSlot, req.MemorySlot = task.GetTaskSlot() WrapPluginContextWithImport(task.GetCollectionID(), job.GetSchema().GetProperties(), job.GetOptions(), req) return req, nil } @@ -817,8 +834,12 @@ func ValidateMaxImportJobExceed(ctx context.Context, importMeta ImportMeta) erro // The function uses a dual-constraint approach: // 1. CPU constraint: Based on the number of files to process in parallel // 2. Memory constraint: Based on the total buffer size required for all virtual channels and partitions -// Returns the maximum of the two constraints to ensure sufficient resources -func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) int { +// CalculateTaskSlot computes required CPU and memory slot quantities for the given import task +// based on import configuration and the associated job's characteristics. +// It returns two values: the CPU-based slots (at least 1, derived from files per CPU slot) and +// the memory-based slots expressed in gigabytes (derived from the task buffer size; uses the +// delete buffer size when the job is an L0 import). +func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) (float64, float64) { job := importMeta.GetJob(context.TODO(), task.GetJobID()) // Calculate CPU-based slots @@ -843,16 +864,15 @@ func CalculateTaskSlot(task ImportTask, importMeta ImportMeta) int { // L0 import use fixed buffer size taskBufferSize = paramtable.Get().DataNodeCfg.ImportDeleteBufferSize.GetAsInt() } - memoryLimitPerSlot := paramtable.Get().DataCoordCfg.ImportMemoryLimitPerSlot.GetAsInt() - memoryBasedSlots := taskBufferSize / memoryLimitPerSlot + memoryBasedSlots := float64(taskBufferSize) / 1024 / 1024 / 1024 // GB - // Return the larger value to ensure both CPU and memory constraints are satisfied - if cpuBasedSlots > memoryBasedSlots { - return cpuBasedSlots - } - return memoryBasedSlots + return float64(cpuBasedSlots), memoryBasedSlots } +// createSortCompactionTask creates a CompactionTask to perform a sort compaction of originSegment into targetSegmentID. +// If originSegment has zero rows the segment is marked Dropped in meta and nil, nil is returned. Otherwise the function +// allocates IDs, reads collection schema and TTL, computes the expected segment size, and returns a populated +// datapb.CompactionTask ready for scheduling. Returns an error if any allocation, metadata lookup, or update fails. func createSortCompactionTask(ctx context.Context, originSegment *SegmentInfo, targetSegmentID int64, @@ -913,4 +933,4 @@ func createSortCompactionTask(ctx context.Context, log.Ctx(ctx).Info("create sort compaction task success", zap.Int64("segmentID", originSegment.GetID()), zap.Int64("targetSegmentID", targetSegmentID), zap.Int64("num rows", originSegment.GetNumOfRows())) return task, nil -} +} \ No newline at end of file diff --git a/internal/datacoord/task/global_scheduler.go b/internal/datacoord/task/global_scheduler.go index 3749d0bdb1..bcbe7f3488 100644 --- a/internal/datacoord/task/global_scheduler.go +++ b/internal/datacoord/task/global_scheduler.go @@ -18,6 +18,7 @@ package task import ( "context" + "sort" "sync" "time" @@ -31,6 +32,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/lock" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" + "github.com/samber/lo" ) const NullNodeID = -1 @@ -137,24 +139,6 @@ func (s *globalTaskScheduler) Stop() { s.wg.Wait() } -func (s *globalTaskScheduler) pickNode(workerSlots map[int64]*session.WorkerSlots, taskSlot int64) int64 { - var maxAvailable int64 = -1 - var nodeID int64 = NullNodeID - - for id, ws := range workerSlots { - if ws.AvailableSlots > maxAvailable && ws.AvailableSlots > 0 { - maxAvailable = ws.AvailableSlots - nodeID = id - } - } - - if nodeID != NullNodeID { - workerSlots[nodeID].AvailableSlots = 0 - return nodeID - } - return NullNodeID -} - func (s *globalTaskScheduler) schedule() { pendingNum := len(s.pendingTasks.TaskIDs()) if pendingNum == 0 { @@ -169,8 +153,8 @@ func (s *globalTaskScheduler) schedule() { if task == nil { break } - taskSlot := task.GetTaskSlot() - nodeID := s.pickNode(nodeSlots, taskSlot) + cpuSlot, memorySlot := task.GetTaskSlot() + nodeID := s.pickNode(nodeSlots, cpuSlot, memorySlot) if nodeID == NullNodeID { s.pendingTasks.Push(task) break @@ -313,6 +297,11 @@ func (s *globalTaskScheduler) updateTaskTimeMetrics() { } } +// NewGlobalTaskScheduler creates a GlobalScheduler that manages global task lifecycle and scheduling across the provided cluster. +// The provided ctx is used as the parent context for the scheduler's lifecycle; canceling it will stop scheduler workers. +// The cluster parameter is used to interact with worker nodes for task creation, state checks, and drops. +// The returned scheduler is initialized with per-task key locking, a priority queue for pending tasks, a concurrent map for running tasks, +// and fixed-size execution and check worker pools (128 workers each). func NewGlobalTaskScheduler(ctx context.Context, cluster session.Cluster) GlobalScheduler { execPool := conc.NewPool[struct{}](128) checkPool := conc.NewPool[struct{}](128) @@ -329,3 +318,34 @@ func NewGlobalTaskScheduler(ctx context.Context, cluster session.Cluster) Global cluster: cluster, } } + +func (s *globalTaskScheduler) pickNode(workerSlots map[int64]*session.WorkerSlots, cpuSlot, memorySlot float64) int64 { + var nodeID int64 = NullNodeID + if len(workerSlots) <= 0 { + return nodeID + } + + workerSlotsList := lo.Values(workerSlots) + sort.Slice(workerSlotsList, func(i, j int) bool { + if workerSlotsList[i].AvailableMemorySlot == workerSlotsList[j].AvailableMemorySlot { + return workerSlotsList[i].AvailableCpuSlot > workerSlotsList[j].AvailableCpuSlot + } + return workerSlotsList[i].AvailableMemorySlot > workerSlotsList[j].AvailableMemorySlot + }) + optimal := workerSlotsList[0] + if memorySlot >= optimal.AvailableMemorySlot { + if optimal.TotalMemorySlot == optimal.AvailableMemorySlot { + nodeID = optimal.NodeID + } + } else { + if cpuSlot <= optimal.AvailableCpuSlot || optimal.AvailableCpuSlot == optimal.TotalCpuSlot || cpuSlot < 4 { + nodeID = optimal.NodeID + } + } + + if nodeID != NullNodeID { + workerSlots[nodeID].AvailableCpuSlot -= cpuSlot + workerSlots[nodeID].AvailableMemorySlot -= memorySlot + } + return nodeID +} \ No newline at end of file diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index 1812716b54..f7419bc6f9 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -46,7 +46,8 @@ import ( type indexBuildTask struct { *model.SegmentIndex - taskSlot int64 + cpuSlot float64 + memorySlot float64 times *taskcommon.Times @@ -58,8 +59,11 @@ type indexBuildTask struct { var _ globalTask.Task = (*indexBuildTask)(nil) +// newIndexBuildTask creates an indexBuildTask for the given SegmentIndex configured with the provided CPU and memory slots and associated managers. +// It initializes the task times and stores references to meta, handler, chunk manager, and index engine version manager. func newIndexBuildTask(segIndex *model.SegmentIndex, - taskSlot int64, + cpuSlot float64, + memorySlot float64, meta *meta, handler Handler, chunkManager storage.ChunkManager, @@ -67,7 +71,8 @@ func newIndexBuildTask(segIndex *model.SegmentIndex, ) *indexBuildTask { return &indexBuildTask{ SegmentIndex: segIndex, - taskSlot: taskSlot, + cpuSlot: cpuSlot, + memorySlot: memorySlot, times: taskcommon.NewTimes(), meta: meta, handler: handler, @@ -80,8 +85,8 @@ func (it *indexBuildTask) GetTaskID() int64 { return it.BuildID } -func (it *indexBuildTask) GetTaskSlot() int64 { - return it.taskSlot +func (it *indexBuildTask) GetTaskSlot() (float64, float64) { + return it.cpuSlot, it.memorySlot } func (it *indexBuildTask) GetTaskState() taskcommon.State { @@ -324,10 +329,11 @@ func (it *indexBuildTask) prepareJobRequest(ctx context.Context, segment *Segmen Field: field, PartitionKeyIsolation: partitionKeyIsolation, StorageVersion: segment.GetStorageVersion(), - TaskSlot: it.taskSlot, LackBinlogRows: segIndex.NumRows - totalRows, InsertLogs: segment.GetBinlogs(), Manifest: segment.GetManifestPath(), + CpuSlot: it.cpuSlot, + MemorySlot: it.memorySlot, } WrapPluginContext(segment.GetCollectionID(), schema.GetProperties(), req) @@ -428,4 +434,4 @@ func (it *indexBuildTask) tryDropTaskOnWorker(cluster session.Cluster) error { func (it *indexBuildTask) DropTaskOnWorker(cluster session.Cluster) { it.tryDropTaskOnWorker(cluster) -} +} \ No newline at end of file diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go index ee5c12ac2f..1ae5bf9ca4 100644 --- a/internal/datacoord/task_stats.go +++ b/internal/datacoord/task_stats.go @@ -39,7 +39,8 @@ import ( type statsTask struct { *indexpb.StatsTask - taskSlot int64 + cpuSlot float64 + memorySlot float64 times *taskcommon.Times @@ -51,21 +52,26 @@ type statsTask struct { var _ globalTask.Task = (*statsTask)(nil) +// newStatsTask creates a new statsTask configured with the provided StatsTask, CPU and memory slot +// allocations, metadata, handler, allocator, and index engine version manager. The returned task has +// its timing tracker initialized. func newStatsTask(t *indexpb.StatsTask, - taskSlot int64, + cpuSlot float64, + memorySlot float64, mt *meta, handler Handler, allocator allocator.Allocator, ievm IndexEngineVersionManager, ) *statsTask { return &statsTask{ - StatsTask: t, - taskSlot: taskSlot, - times: taskcommon.NewTimes(), - meta: mt, - handler: handler, - allocator: allocator, - ievm: ievm, + StatsTask: t, + cpuSlot: cpuSlot, + memorySlot: memorySlot, + times: taskcommon.NewTimes(), + meta: mt, + handler: handler, + allocator: allocator, + ievm: ievm, } } @@ -81,8 +87,8 @@ func (st *statsTask) GetTaskState() taskcommon.State { return st.GetState() } -func (st *statsTask) GetTaskSlot() int64 { - return st.taskSlot +func (st *statsTask) GetTaskSlot() (float64, float64) { + return st.cpuSlot, st.memorySlot } func (st *statsTask) SetTaskTime(timeType taskcommon.TimeType, time time.Time) { @@ -334,13 +340,14 @@ func (st *statsTask) prepareJobRequest(ctx context.Context, segment *SegmentInfo TaskVersion: st.GetVersion(), EnableJsonKeyStats: Params.CommonCfg.EnabledJSONKeyStats.GetAsBool(), JsonKeyStatsDataFormat: common.JSONStatsDataFormatVersion, - TaskSlot: st.taskSlot, StorageVersion: segment.StorageVersion, CurrentScalarIndexVersion: st.ievm.GetCurrentScalarIndexEngineVersion(), JsonStatsMaxShreddingColumns: Params.DataCoordCfg.JSONStatsMaxShreddingColumns.GetAsInt64(), JsonStatsShreddingRatioThreshold: Params.DataCoordCfg.JSONStatsShreddingRatioThreshold.GetAsFloat(), JsonStatsWriteBatchSize: Params.DataCoordCfg.JSONStatsWriteBatchSize.GetAsInt64(), ManifestPath: segment.GetManifestPath(), + CpuSlot: st.cpuSlot, + MemorySlot: st.memorySlot, } WrapPluginContext(segment.GetCollectionID(), collInfo.Schema.GetProperties(), req) @@ -379,4 +386,4 @@ func (st *statsTask) SetJobInfo(ctx context.Context, result *workerpb.StatsResul zap.Int64("oldSegmentID", st.GetSegmentID()), zap.Int64("targetSegmentID", st.GetTargetSegmentID()), zap.String("subJobType", st.GetSubJobType().String()), zap.String("state", st.GetState().String())) return nil -} +} \ No newline at end of file diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 383f711712..ddffa33f2a 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -366,6 +366,7 @@ func createStorageConfig() *indexpb.StorageConfig { return storageConfig } +// getSortStatus reports the sort state as either "sorted" or "unsorted". func getSortStatus(sorted bool) string { if sorted { return "sorted" @@ -373,35 +374,60 @@ func getSortStatus(sorted bool) string { return "unsorted" } -func calculateIndexTaskSlot(fieldSize int64, isVectorIndex bool) int64 { - defaultSlots := Params.DataCoordCfg.IndexTaskSlotUsage.GetAsInt64() - if !isVectorIndex { - defaultSlots = Params.DataCoordCfg.ScalarIndexTaskSlotUsage.GetAsInt64() +// calculateStatsTaskSlot computes CPU and memory slot values for a stats task based on the segment size. +// It returns cpuSlot and memorySlot, where cpuSlot is taken from the configured CPU factor and memorySlot equals the segment size converted to GiB multiplied by the configured memory factor. +func calculateStatsTaskSlot(segmentSize int64) (float64, float64) { + cpuSlot := Params.DataCoordCfg.StatsTaskCPUFactor.GetAsFloat() + memorySlot := float64(segmentSize) / 1024 / 1024 / 1024 * Params.DataCoordCfg.StatsTaskMemoryFactor.GetAsFloat() + return cpuSlot, memorySlot +} + +// CalculateIndexTaskSlot computes CPU and memory slot estimates for an index-building task. +// +// CalculateIndexTaskSlot selects CPU and memory factors from configuration for scalar or vector +// indexes based on isVectorIndex. The memory slot scales linearly with fieldSize (interpreted in bytes +// and converted to GiB). The returned cpuSlot is reduced for smaller fields (full, 1/2, 1/4, 1/8), +// while memorySlot is always proportional to fieldSize * configured memory factor. +// +// Parameters: +// - fieldSize: size of the field in bytes. +// - isVectorIndex: when true, use vector-index factors; otherwise use scalar-index factors. +// +// Returns two floats: cpuSlot and memorySlot. cpuSlot is the computed CPU slot value; memorySlot is +// the computed memory slot value in the same unit as the configured memory factor. +func calculateIndexTaskSlot(fieldSize int64, isVectorIndex bool) (float64, float64) { + cpuSlot := Params.DataCoordCfg.ScalarIndexTaskCPUFactor.GetAsFloat() + memorySlot := float64(fieldSize) / 1024 / 1024 / 1024 * Params.DataCoordCfg.ScalarIndexTaskMemoryFactor.GetAsFloat() + + if isVectorIndex { + cpuSlot = Params.DataCoordCfg.VectorIndexTaskCPUFactor.GetAsFloat() + memorySlot = float64(fieldSize) / 1024 / 1024 / 1024 * Params.DataCoordCfg.VectorIndexTaskMemoryFactor.GetAsFloat() } + if fieldSize > 512*1024*1024 { - taskSlot := max(fieldSize/512/1024/1024, 1) * defaultSlots - return max(taskSlot, 1) + return cpuSlot, memorySlot } else if fieldSize > 100*1024*1024 { - return max(defaultSlots/4, 1) + return cpuSlot / 2, memorySlot } else if fieldSize > 10*1024*1024 { - return max(defaultSlots/16, 1) + return cpuSlot / 4, memorySlot } - return max(defaultSlots/64, 1) + return cpuSlot / 8, memorySlot } -func calculateStatsTaskSlot(segmentSize int64) int64 { - defaultSlots := Params.DataCoordCfg.StatsTaskSlotUsage.GetAsInt64() - if segmentSize > 512*1024*1024 { - taskSlot := max(segmentSize/512/1024/1024, 1) * defaultSlots - return max(taskSlot, 1) - } else if segmentSize > 100*1024*1024 { - return max(defaultSlots/2, 1) - } else if segmentSize > 10*1024*1024 { - return max(defaultSlots/4, 1) +// IsVectorField determines whether the specified field in the collection is a vector field. +// It returns `true` if the field exists and is a vector field, `false` otherwise. +func IsVectorField(meta *meta, collID, fieldID int64) bool { + coll := meta.GetCollection(collID) + for _, field := range coll.Schema.GetFields() { + if field.GetFieldID() == fieldID { + return typeutil.IsVectorType(field.GetDataType()) + } } - return max(defaultSlots/8, 1) + return false } +// enableSortCompaction reports whether sort-based compaction is enabled. +// It returns true when both DataCoordCfg.EnableSortCompaction and DataCoordCfg.EnableCompaction are enabled in the configuration, false otherwise. func enableSortCompaction() bool { return paramtable.Get().DataCoordCfg.EnableSortCompaction.GetAsBool() && paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() } @@ -424,4 +450,4 @@ func stringifyBinlogs(binlogs []*datapb.FieldBinlog) []string { strs = append(strs, fmt.Sprintf("f%d:%s", binlogs[0].GetFieldID(), strings.Join(fieldsStrs, "|"))) } return strs -} +} \ No newline at end of file diff --git a/internal/datanode/compactor/executor.go b/internal/datanode/compactor/executor.go index 29e7c2d2ae..16b738ed4d 100644 --- a/internal/datanode/compactor/executor.go +++ b/internal/datanode/compactor/executor.go @@ -39,6 +39,7 @@ type Executor interface { Start(ctx context.Context) Enqueue(task Compactor) (bool, error) Slots() int64 + SlotsV2() (float64, float64) RemoveTask(planID int64) // Deprecated in 2.6 GetResults(planID int64) []*datapb.CompactionPlanResult // Deprecated in 2.6 } @@ -64,20 +65,28 @@ type executor struct { taskCh chan Compactor // Slot tracking for resource management - usingSlots int64 + usingSlots int64 + usingCpuSlots float64 + usingMemorySlots float64 // Slots(Slots Cap for DataCoord), ExecPool(MaxCompactionConcurrency) are all trying to control concurrency and resource usage, // which creates unnecessary complexity. We should use a single resource pool instead. } +// NewExecutor creates a new executor initialized with an empty task map, a buffered task channel of capacity maxTaskQueueNum, and zeroed slot counters for total, CPU, and memory usage. func NewExecutor() *executor { return &executor{ - tasks: make(map[int64]*taskState), - taskCh: make(chan Compactor, maxTaskQueueNum), - usingSlots: 0, + tasks: make(map[int64]*taskState), + taskCh: make(chan Compactor, maxTaskQueueNum), + usingSlots: 0, + usingCpuSlots: 0, + usingMemorySlots: 0, } } +// getTaskSlotUsage returns the finalized slot usage for the provided compaction task. +// It uses the task's reported GetSlotUsage value when positive; otherwise it falls back +// to the configured default slot usage for the task's compaction type and logs a warning. func getTaskSlotUsage(task Compactor) int64 { // Calculate slot usage taskSlotUsage := task.GetSlotUsage() @@ -100,14 +109,39 @@ func getTaskSlotUsage(task Compactor) int64 { return taskSlotUsage } -func (e *executor) Enqueue(task Compactor) (bool, error) { - e.mu.Lock() - defer e.mu.Unlock() +// The returned values are (cpuSlotUsage, memorySlotUsage). +func getTaskSlotUsageV2(task Compactor) (float64, float64) { + // Calculate slot usage + taskCpuSlotUsage, taskMemorySlotUsage := task.GetSlotUsageV2() + // compatible for old datacoord or unexpected request + if taskCpuSlotUsage <= 0 || taskMemorySlotUsage <= 0 { + switch task.GetCompactionType() { + case datapb.CompactionType_ClusteringCompaction: + taskCpuSlotUsage, taskMemorySlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsFloat(), paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsFloat() + case datapb.CompactionType_MixCompaction: + taskCpuSlotUsage, taskMemorySlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsFloat(), paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsFloat() + case datapb.CompactionType_Level0DeleteCompaction: + taskCpuSlotUsage, taskMemorySlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsFloat(), paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsFloat() + } + illegalCpuSlot, illegalMemorySlot := task.GetSlotUsageV2() + log.Warn("illegal task slot usage, change it to a default value", + zap.Float64("illegalCpuSlotUsage", illegalCpuSlot), + zap.Float64("illegalMemorySlotUsage", illegalMemorySlot), + zap.Float64("defaultCpuSlotUsage", taskCpuSlotUsage), + zap.Float64("defaultMemorySlotUsage", taskMemorySlotUsage), + zap.String("type", task.GetCompactionType().String())) + } + return taskCpuSlotUsage, taskMemorySlotUsage +} + +func (e *executor) Enqueue(task Compactor) (bool, error) { planID := task.GetPlanID() + e.mu.Lock() // Check for duplicate task if _, exists := e.tasks[planID]; exists { + e.mu.Unlock() log.Warn("duplicated compaction task", zap.Int64("planID", planID), zap.String("channel", task.GetChannelName())) @@ -116,12 +150,18 @@ func (e *executor) Enqueue(task Compactor) (bool, error) { // Update slots and add task e.usingSlots += getTaskSlotUsage(task) + cpuSlot, memorySlot := getTaskSlotUsageV2(task) + e.usingCpuSlots += cpuSlot + e.usingMemorySlots += memorySlot e.tasks[planID] = &taskState{ compactor: task, state: datapb.CompactionTaskState_executing, result: nil, } + e.mu.Unlock() + // Send to channel after releasing lock to avoid deadlock + // when channel is full and completeTask needs to acquire lock e.taskCh <- task return true, nil } @@ -133,6 +173,13 @@ func (e *executor) Slots() int64 { return e.usingSlots } +// Slots returns the used slots for compaction +func (e *executor) SlotsV2() (float64, float64) { + e.mu.RLock() + defer e.mu.RUnlock() + return e.usingCpuSlots, e.usingMemorySlots +} + // completeTask updates task state to completed and adjusts slot usage func (e *executor) completeTask(planID int64, result *datapb.CompactionPlanResult) { e.mu.Lock() @@ -154,6 +201,16 @@ func (e *executor) completeTask(planID int64, result *datapb.CompactionPlanResul if e.usingSlots < 0 { e.usingSlots = 0 } + + cpuSlot, memorySlot := getTaskSlotUsageV2(task.compactor) + e.usingCpuSlots -= cpuSlot + e.usingMemorySlots -= memorySlot + if e.usingCpuSlots < 0 { + e.usingCpuSlots = 0 + } + if e.usingMemorySlots < 0 { + e.usingMemorySlots = 0 + } } } @@ -310,4 +367,4 @@ func (e *executor) getAllCompactionResults() []*datapb.CompactionPlanResult { } return results -} +} \ No newline at end of file diff --git a/internal/datanode/compactor/mix_compactor.go b/internal/datanode/compactor/mix_compactor.go index 083bf06ec8..bdab9b18d7 100644 --- a/internal/datanode/compactor/mix_compactor.go +++ b/internal/datanode/compactor/mix_compactor.go @@ -436,6 +436,11 @@ func (t *mixCompactionTask) GetSlotUsage() int64 { return t.plan.GetSlotUsage() } +func (t *mixCompactionTask) GetSlotUsageV2() (float64, float64) { + return t.plan.GetCpuSlot(), t.plan.GetMemorySlot() +} + +// It returns a slice of those field IDs, or an empty slice if no BM25 functions are present. func GetBM25FieldIDs(coll *schemapb.CollectionSchema) []int64 { return lo.FilterMap(coll.GetFunctions(), func(function *schemapb.FunctionSchema, _ int) (int64, bool) { if function.GetType() == schemapb.FunctionType_BM25 { @@ -443,4 +448,4 @@ func GetBM25FieldIDs(coll *schemapb.CollectionSchema) []int64 { } return 0, false }) -} +} \ No newline at end of file diff --git a/internal/datanode/index/scheduler.go b/internal/datanode/index/scheduler.go index ce175c8764..0dd0c52b69 100644 --- a/internal/datanode/index/scheduler.go +++ b/internal/datanode/index/scheduler.go @@ -21,7 +21,6 @@ import ( "context" "runtime/debug" "sync" - "time" "github.com/cockroachdb/errors" "go.uber.org/atomic" @@ -45,6 +44,8 @@ type TaskQueue interface { GetTaskNum() (int, int) GetUsingSlot() int64 GetActiveSlot() int64 + GetUsingSlotV2() (float64, float64) + GetActiveSlotV2() (float64, float64) } // BaseTaskQueue is a basic instance of TaskQueue. @@ -60,6 +61,9 @@ type IndexTaskQueue struct { utBufChan chan struct{} // to block scheduler usingSlot atomic.Int64 + usingCpuSlot atomic.Float64 + usingMemorySlot atomic.Float64 + sched *TaskScheduler } @@ -105,6 +109,23 @@ func (queue *IndexTaskQueue) GetActiveSlot() int64 { return slots } +func (queue *IndexTaskQueue) GetUsingSlotV2() (float64, float64) { + return queue.usingCpuSlot.Load(), queue.usingMemorySlot.Load() +} + +func (queue *IndexTaskQueue) GetActiveSlotV2() (float64, float64) { + queue.atLock.Lock() + defer queue.atLock.Unlock() + + cpuSlots, memorySlot := float64(0), float64(0) + for _, t := range queue.activeTasks { + taskCpuSlot, taskMemorySlot := t.GetSlotV2() + cpuSlots += taskCpuSlot + memorySlot += taskMemorySlot + } + return cpuSlots, memorySlot +} + // PopUnissuedTask pops a task from tasks queue. func (queue *IndexTaskQueue) PopUnissuedTask() Task { queue.utLock.Lock() @@ -143,6 +164,9 @@ func (queue *IndexTaskQueue) PopActiveTask(tName string) Task { if ok { delete(queue.activeTasks, tName) queue.usingSlot.Sub(t.GetSlot()) + taskCpuSlot, taskMemorySlot := t.GetSlotV2() + queue.usingCpuSlot.Sub(taskCpuSlot) + queue.usingMemorySlot.Sub(taskMemorySlot) return t } log.Ctx(queue.sched.ctx).Debug("task was not found in the active task list", zap.String("TaskName", tName)) @@ -160,6 +184,9 @@ func (queue *IndexTaskQueue) Enqueue(t Task) error { } queue.usingSlot.Add(t.GetSlot()) + taskCpuSlot, taskMemorySlot := t.GetSlotV2() + queue.usingCpuSlot.Add(taskCpuSlot) + queue.usingMemorySlot.Add(taskMemorySlot) return nil } @@ -180,7 +207,9 @@ func (queue *IndexTaskQueue) GetTaskNum() (int, int) { return utNum, atNum } -// NewIndexBuildTaskQueue creates a new IndexBuildTaskQueue. +// NewIndexBuildTaskQueue creates and returns a new IndexTaskQueue configured for the provided TaskScheduler. +// The returned queue has empty unissued and active task containers, a max unissued capacity of 1024, +// a buffered unissued-task channel with capacity 1024, and zeroed global, CPU and memory slot counters. func NewIndexBuildTaskQueue(sched *TaskScheduler) *IndexTaskQueue { return &IndexTaskQueue{ unissuedTasks: list.New(), @@ -190,7 +219,9 @@ func NewIndexBuildTaskQueue(sched *TaskScheduler) *IndexTaskQueue { utBufChan: make(chan struct{}, 1024), sched: sched, - usingSlot: atomic.Int64{}, + usingSlot: atomic.Int64{}, + usingCpuSlot: atomic.Float64{}, + usingMemorySlot: atomic.Float64{}, } } @@ -264,17 +295,9 @@ func (sched *TaskScheduler) indexBuildLoop() { return case <-sched.TaskQueue.utChan(): t := sched.TaskQueue.PopUnissuedTask() - for { - totalSlot := CalculateNodeSlots() - availableSlot := totalSlot - sched.TaskQueue.GetActiveSlot() - if availableSlot >= t.GetSlot() || totalSlot == availableSlot { - go func(t Task) { - sched.processTask(t, sched.TaskQueue) - }(t) - break - } - time.Sleep(time.Millisecond * 50) - } + go func(t Task) { + sched.processTask(t, sched.TaskQueue) + }(t) } } } @@ -290,4 +313,4 @@ func (sched *TaskScheduler) Start() error { func (sched *TaskScheduler) Close() { sched.cancel() sched.wg.Wait() -} +} \ No newline at end of file diff --git a/internal/datanode/index/util.go b/internal/datanode/index/util.go index bc64dc8a21..ed94336be4 100644 --- a/internal/datanode/index/util.go +++ b/internal/datanode/index/util.go @@ -80,6 +80,8 @@ func mapToKVPairs(m map[string]string) []*commonpb.KeyValuePair { return kvs } +// CalculateNodeSlots computes the number of worker slots available to a data node. +// It derives a base slot from half of the CPU count and from total memory (GiB / 8), takes the smaller of the two, multiplies that by DataNodeCfg.WorkerSlotUnit and BuildParallel, and if running in Standalone role scales the result by StandaloneSlotRatio; the final value is at least 1. func CalculateNodeSlots() int64 { cpuNum := hardware.GetCPUNum() memory := hardware.GetMemoryCount() @@ -96,3 +98,20 @@ func CalculateNodeSlots() int64 { } return totalSlot } + +// CalculateNodeSlotsV2 computes two slot estimates for a data node: a CPU-based slot and a memory-based slot. +// The CPU slot is CPU count multiplied by the configured BuildParallel factor; the memory slot is total memory in GiB +// multiplied by the same BuildParallel factor. If running in the Standalone role, both slots are further scaled +// by the configured StandaloneSlotRatio. +func CalculateNodeSlotsV2() (float64, float64) { + cpuNum := hardware.GetCPUNum() + memory := hardware.GetMemoryCount() + + cpuSlot := float64(cpuNum) * paramtable.Get().DataNodeCfg.BuildParallel.GetAsFloat() + memorySlot := float64(memory) / 1024 / 1024 / 1024 * paramtable.Get().DataNodeCfg.BuildParallel.GetAsFloat() + if paramtable.GetRole() == typeutil.StandaloneRole { + cpuSlot = cpuSlot * paramtable.Get().DataNodeCfg.StandaloneSlotRatio.GetAsFloat() + memorySlot = memorySlot * paramtable.Get().DataNodeCfg.StandaloneSlotRatio.GetAsFloat() + } + return cpuSlot, memorySlot +} \ No newline at end of file diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go index cbcf164912..395d40d318 100644 --- a/pkg/metrics/datanode_metrics.go +++ b/pkg/metrics/datanode_metrics.go @@ -335,6 +335,22 @@ var ( Name: "slot", Help: "number of available and used slot", }, []string{nodeIDLabelName, "type"}) + + DataNodeCPUSlot = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataNodeRole, + Name: "cpu_slot", + Help: "number of available and used cpu slot", + }, []string{nodeIDLabelName, "type"}) + + DataNodeMemorySlot = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataNodeRole, + Name: "memory_slot", + Help: "number of available and used memory slot", + }, []string{nodeIDLabelName, "type"}) ) var registerDNOnce sync.Once @@ -346,7 +362,9 @@ func RegisterDataNode(registry *prometheus.Registry) { }) } -// registerDataNodeOnce registers DataNode metrics +// registerDataNodeOnce registers all DataNode-related Prometheus metrics with the provided registry. +// It registers metric vectors for input, in-memory, output, compaction, deprecated, and index metrics, +// and also registers logging and slot metrics. This function is intended to be executed once during initialization. func registerDataNodeOnce(registry *prometheus.Registry) { registry.MustRegister(DataNodeNumFlowGraphs) // input related @@ -386,8 +404,12 @@ func registerDataNodeOnce(registry *prometheus.Registry) { registry.MustRegister(DataNodeBuildJSONStatsLatency) registry.MustRegister(DataNodeSlot) RegisterLoggingMetrics(registry) + registry.MustRegister(DataNodeCPUSlot) + registry.MustRegister(DataNodeMemorySlot) } +// CleanupDataNodeCollectionMetrics removes collection-scoped DataNode metrics for the given node and collection. +// The channel parameter is accepted for API compatibility but is ignored. func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel string) { DataNodeConsumeTimeTickLag. Delete( @@ -423,4 +445,4 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel DataNodeWriteDataCount.Delete(prometheus.Labels{ collectionIDLabelName: fmt.Sprint(collectionID), }) -} +} \ No newline at end of file