// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compactor

import (
	"context"
	"fmt"
	"sync"

	"github.com/samber/lo"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/metrics"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

const (
	maxTaskQueueNum = 1024
)
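
// Executor schedules compaction tasks on the DataNode: it queues tasks,
// reports slot usage, and tracks task states and results by planID.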
type Executor interface {
	Start(ctx context.Context)
	Enqueue(task Compactor) (bool, error)
	Slots() int64
	RemoveTask(planID int64)                                 // Deprecated in 2.6
	GetResults(planID int64) []*datapb.CompactionPlanResult // Deprecated in 2.6
}

// taskState represents the state of a compaction task.
// State transitions:
// - executing -> completed (success)
// - executing -> failed (error)
//
// Once a task reaches the completed/failed state, it stays there until removed.
type taskState struct {
	compactor Compactor
	state     datapb.CompactionTaskState
	result    *datapb.CompactionPlanResult
}

type executor struct {
	mu sync.RWMutex

	tasks map[int64]*taskState // planID -> task state

	// Task queue for pending work
	taskCh chan Compactor

	// Slot tracking for resource management
	usingSlots int64

	// Slots (the slot cap reported to DataCoord) and ExecPool (MaxCompactionConcurrency)
	// both try to control concurrency and resource usage, which creates unnecessary
	// complexity. We should use a single resource pool instead.
}
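
// NewExecutor creates an executor with an empty task-state map and a
// buffered task queue of maxTaskQueueNum entries.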
func NewExecutor() *executor {
	return &executor{
		tasks:      make(map[int64]*taskState),
		taskCh:     make(chan Compactor, maxTaskQueueNum),
		usingSlots: 0,
	}
}
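
// getTaskSlotUsage returns the slot usage reported by the task, falling back
// to the per-type default from paramtable when the reported value is not positive.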
func getTaskSlotUsage(task Compactor) int64 {
	// Calculate slot usage
	taskSlotUsage := task.GetSlotUsage()
	// Be compatible with an old DataCoord or an unexpected request
	if taskSlotUsage <= 0 {
		switch task.GetCompactionType() {
		case datapb.CompactionType_ClusteringCompaction:
			taskSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
		case datapb.CompactionType_MixCompaction:
			taskSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
		case datapb.CompactionType_Level0DeleteCompaction:
			taskSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
		}
		log.Warn("illegal task slot usage, change it to a default value",
			zap.Int64("illegalSlotUsage", task.GetSlotUsage()),
			zap.Int64("defaultSlotUsage", taskSlotUsage),
			zap.String("type", task.GetCompactionType().String()))
	}

	return taskSlotUsage
}
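
// Enqueue registers a compaction task, reserves its slot usage, and pushes it
// onto the task queue. It rejects tasks whose planID is already tracked.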
func (e *executor) Enqueue(task Compactor) (bool, error) {
	e.mu.Lock()
	defer e.mu.Unlock()

	planID := task.GetPlanID()

	// Check for duplicate task
	if _, exists := e.tasks[planID]; exists {
		log.Warn("duplicated compaction task",
			zap.Int64("planID", planID),
			zap.String("channel", task.GetChannelName()))
		return false, merr.WrapErrDuplicatedCompactionTask()
	}

	// Update slots and add task
	e.usingSlots += getTaskSlotUsage(task)
	e.tasks[planID] = &taskState{
		compactor: task,
		state:     datapb.CompactionTaskState_executing,
		result:    nil,
	}

	e.taskCh <- task
	return true, nil
}

// Slots returns the used slots for compaction
func (e *executor) Slots() int64 {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.usingSlots
}

// completeTask updates task state to completed and adjusts slot usage
func (e *executor) completeTask(planID int64, result *datapb.CompactionPlanResult) {
	e.mu.Lock()
	defer e.mu.Unlock()

	if task, exists := e.tasks[planID]; exists {
		task.compactor.Complete()

		// Update state based on result
		if result != nil {
			task.state = datapb.CompactionTaskState_completed
			task.result = result
		} else {
			task.state = datapb.CompactionTaskState_failed
		}

		// Adjust slot usage
		e.usingSlots -= getTaskSlotUsage(task.compactor)
		if e.usingSlots < 0 {
			e.usingSlots = 0
		}
	}
}
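
// RemoveTask deletes a completed or failed task from the state map.
// Tasks that are still executing are left untouched.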
func (e *executor) RemoveTask(planID int64) {
	e.mu.Lock()
	defer e.mu.Unlock()

	if task, exists := e.tasks[planID]; exists {
		// Only remove completed/failed tasks, not executing ones
		if task.state != datapb.CompactionTaskState_executing {
			log.Info("Compaction task removed",
				zap.Int64("planID", planID),
				zap.String("channel", task.compactor.GetChannelName()),
				zap.String("state", task.state.String()))
			delete(e.tasks, planID)
		}
	}
}
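
// Start consumes tasks from the queue and submits them to the shared exec pool
// until the context is cancelled.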
func (e *executor) Start(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case task := <-e.taskCh:
			GetExecPool().Submit(func() (any, error) {
				e.executeTask(task)
				return nil, nil
			})
		}
	}
}
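
// executeTask runs a single compaction task, records its result (or failure),
// and emits insert/delete write-count metrics for the produced segments.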
func (e *executor) executeTask(task Compactor) {
	log := log.With(
		zap.Int64("planID", task.GetPlanID()),
		zap.Int64("collection", task.GetCollection()),
		zap.String("channel", task.GetChannelName()),
		zap.String("type", task.GetCompactionType().String()),
	)

	log.Info("start to execute compaction")

	result, err := task.Compact()
	if err != nil {
		log.Warn("compaction task failed", zap.Error(err))
		e.completeTask(task.GetPlanID(), nil)
		return
	}

	// Update task with result
	e.completeTask(task.GetPlanID(), result)

	// Emit metrics
	getDataCount := func(binlogs []*datapb.FieldBinlog) int64 {
		count := int64(0)
		for _, binlog := range binlogs {
			for _, fbinlog := range binlog.GetBinlogs() {
				count += fbinlog.GetEntriesNum()
			}
		}
		return count
	}

	var entityCount int64
	var deleteCount int64
	lo.ForEach(result.Segments, func(seg *datapb.CompactionSegment, _ int) {
		entityCount += seg.GetNumOfRows()
		deleteCount += getDataCount(seg.GetDeltalogs())
	})
	metrics.DataNodeWriteDataCount.WithLabelValues(
		fmt.Sprint(paramtable.GetNodeID()),
		metrics.CompactionDataSourceLabel,
		metrics.InsertLabel,
		fmt.Sprint(task.GetCollection())).Add(float64(entityCount))
	metrics.DataNodeWriteDataCount.WithLabelValues(
		fmt.Sprint(paramtable.GetNodeID()),
		metrics.CompactionDataSourceLabel,
		metrics.DeleteLabel,
		fmt.Sprint(task.GetCollection())).Add(float64(deleteCount))
	log.Info("end to execute compaction")
}
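
// GetResults returns the result for a single plan when planID is non-zero,
// or the results of all tracked tasks otherwise.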
func (e *executor) GetResults(planID int64) []*datapb.CompactionPlanResult {
	if planID != 0 {
		result := e.getCompactionResult(planID)
		return []*datapb.CompactionPlanResult{result}
	}
	return e.getAllCompactionResults()
}
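
// getCompactionResult returns the stored result for a plan, a state-only result
// if the task has not produced one yet, or a failed result if the plan is unknown.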
func (e *executor) getCompactionResult(planID int64) *datapb.CompactionPlanResult {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if task, exists := e.tasks[planID]; exists {
		if task.result != nil {
			return task.result
		}
		return &datapb.CompactionPlanResult{
			State:  task.state,
			PlanID: planID,
		}
	}

	// Task not found, return failed state
	return &datapb.CompactionPlanResult{
		PlanID: planID,
		State:  datapb.CompactionTaskState_failed,
	}
}
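
// getAllCompactionResults collects results for all tracked tasks and prunes
// completed level-zero compaction tasks from the state map.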
func (e *executor) getAllCompactionResults() []*datapb.CompactionPlanResult {
	e.mu.Lock()
	defer e.mu.Unlock()

	var (
		executing          []int64
		completed          []int64
		completedLevelZero []int64
	)

	results := make([]*datapb.CompactionPlanResult, 0)

	// Collect results from all tasks
	for planID, task := range e.tasks {
		if task.state == datapb.CompactionTaskState_executing {
			executing = append(executing, planID)
			results = append(results, &datapb.CompactionPlanResult{
				State:  datapb.CompactionTaskState_executing,
				PlanID: planID,
			})
		} else if task.result != nil {
			completed = append(completed, planID)
			results = append(results, task.result)

			if task.result.GetType() == datapb.CompactionType_Level0DeleteCompaction {
				completedLevelZero = append(completedLevelZero, planID)
			}
		}
	}

	// Remove completed level zero compaction tasks
	for _, planID := range completedLevelZero {
		delete(e.tasks, planID)
	}

	if len(results) > 0 {
		log.Info("DataNode Compaction results",
			zap.Int64s("executing", executing),
			zap.Int64s("completed", completed),
			zap.Int64s("completed levelzero", completedLevelZero),
		)
	}

	return results
}