enhance: [cp26]Unify compaction executor task state management (#44722)

Remove stopTask.
Replace multiple task tracking maps with single unified taskState map.
Fix slot tracking, improve state transitions, and add comprehensive tests.

See also: #44714
pr: #44721

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2025-10-11 18:30:08 +08:00 committed by GitHub
parent 9af220e211
commit 69a5ff5518
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 581 additions and 259 deletions

View File

@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
const (
@ -40,94 +39,137 @@ type Executor interface {
Start(ctx context.Context)
Enqueue(task Compactor) (bool, error)
Slots() int64
RemoveTask(planID int64)
GetResults(planID int64) []*datapb.CompactionPlanResult
RemoveTask(planID int64) // Deprecated in 2.6
GetResults(planID int64) []*datapb.CompactionPlanResult // Deprecated in 2.6
}
// taskState represents the state of a compaction task tracked by the executor.
// One taskState is stored per planID in the executor's tasks map.
//
// State transitions:
// - executing -> completed (success)
// - executing -> failed (error)
//
// Once a task reaches completed/failed state, it stays there until removed
type taskState struct {
// compactor is the task that was enqueued; it is used to call Complete(),
// to compute slot usage, and to read the channel name for logging.
compactor Compactor
// state is set to executing on Enqueue and later switched to completed
// or failed when the task finishes.
state datapb.CompactionTaskState
// result is nil while executing and for failed tasks; it holds the
// compaction result once the task has completed successfully.
result *datapb.CompactionPlanResult
}
type executor struct {
executing *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor
completedCompactor *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor
completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult
taskCh chan Compactor
dropped *typeutil.ConcurrentSet[string] // vchannel dropped
usingSlots int64
slotMu sync.RWMutex
mu sync.RWMutex
// To prevent concurrency of release channel and compaction get results
// all released channel's compaction tasks will be discarded
resultGuard sync.RWMutex
tasks map[int64]*taskState // planID -> task state
// Task queue for pending work
taskCh chan Compactor
// Slot tracking for resource management
usingSlots int64
// NOTE: Slots (the slots cap for DataCoord) and ExecPool (MaxCompactionConcurrency) both try to control
// concurrency and resource usage, which creates unnecessary complexity. A single resource pool should be used instead.
}
func NewExecutor() *executor {
return &executor{
executing: typeutil.NewConcurrentMap[int64, Compactor](),
completedCompactor: typeutil.NewConcurrentMap[int64, Compactor](),
completed: typeutil.NewConcurrentMap[int64, *datapb.CompactionPlanResult](),
tasks: make(map[int64]*taskState),
taskCh: make(chan Compactor, maxTaskQueueNum),
dropped: typeutil.NewConcurrentSet[string](),
usingSlots: 0,
}
}
func (e *executor) Enqueue(task Compactor) (bool, error) {
e.slotMu.Lock()
defer e.slotMu.Unlock()
newSlotUsage := task.GetSlotUsage()
func getTaskSlotUsage(task Compactor) int64 {
// Calculate slot usage
taskSlotUsage := task.GetSlotUsage()
// compatible for old datacoord or unexpected request
if task.GetSlotUsage() <= 0 {
if taskSlotUsage <= 0 {
switch task.GetCompactionType() {
case datapb.CompactionType_ClusteringCompaction:
newSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
taskSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_MixCompaction:
newSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
taskSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_Level0DeleteCompaction:
newSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
taskSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
}
log.Warn("illegal task slot usage, change it to a default value", zap.Int64("illegalSlotUsage", task.GetSlotUsage()), zap.Int64("newSlotUsage", newSlotUsage))
log.Warn("illegal task slot usage, change it to a default value",
zap.Int64("illegalSlotUsage", task.GetSlotUsage()),
zap.Int64("defaultSlotUsage", taskSlotUsage),
zap.String("type", task.GetCompactionType().String()))
}
e.usingSlots = e.usingSlots + newSlotUsage
_, ok := e.executing.GetOrInsert(task.GetPlanID(), task)
if ok {
return taskSlotUsage
}
func (e *executor) Enqueue(task Compactor) (bool, error) {
e.mu.Lock()
defer e.mu.Unlock()
planID := task.GetPlanID()
// Check for duplicate task
if _, exists := e.tasks[planID]; exists {
log.Warn("duplicated compaction task",
zap.Int64("planID", task.GetPlanID()),
zap.Int64("planID", planID),
zap.String("channel", task.GetChannelName()))
return false, merr.WrapErrDuplicatedCompactionTask()
}
// Update slots and add task
e.usingSlots += getTaskSlotUsage(task)
e.tasks[planID] = &taskState{
compactor: task,
state: datapb.CompactionTaskState_executing,
result: nil,
}
e.taskCh <- task
return true, nil
}
// Slots returns the available slots for compaction
// Slots returns the used slots for compaction
func (e *executor) Slots() int64 {
return e.getUsingSlots()
}
func (e *executor) getUsingSlots() int64 {
e.slotMu.RLock()
defer e.slotMu.RUnlock()
e.mu.RLock()
defer e.mu.RUnlock()
return e.usingSlots
}
func (e *executor) toCompleteState(task Compactor) {
task.Complete()
e.getAndRemoveExecuting(task.GetPlanID())
// completeTask updates task state to completed and adjusts slot usage
func (e *executor) completeTask(planID int64, result *datapb.CompactionPlanResult) {
e.mu.Lock()
defer e.mu.Unlock()
if task, exists := e.tasks[planID]; exists {
task.compactor.Complete()
// Update state based on result
if result != nil {
task.state = datapb.CompactionTaskState_completed
task.result = result
} else {
task.state = datapb.CompactionTaskState_failed
}
func (e *executor) getAndRemoveExecuting(planID typeutil.UniqueID) (Compactor, bool) {
task, ok := e.executing.GetAndRemove(planID)
if ok {
e.slotMu.Lock()
e.usingSlots = e.usingSlots - task.GetSlotUsage()
e.slotMu.Unlock()
// Adjust slot usage
e.usingSlots -= getTaskSlotUsage(task.compactor)
if e.usingSlots < 0 {
e.usingSlots = 0
}
}
return task, ok
}
func (e *executor) RemoveTask(planID int64) {
e.completed.GetAndRemove(planID)
task, loaded := e.completedCompactor.GetAndRemove(planID)
if loaded {
log.Info("Compaction task removed", zap.Int64("planID", planID), zap.String("channel", task.GetChannelName()))
e.mu.Lock()
defer e.mu.Unlock()
if task, exists := e.tasks[planID]; exists {
// Only remove completed/failed tasks, not executing ones
if task.state != datapb.CompactionTaskState_executing {
log.Info("Compaction task removed",
zap.Int64("planID", planID),
zap.String("channel", task.compactor.GetChannelName()),
zap.String("state", task.state.String()))
delete(e.tasks, planID)
}
}
}
@ -148,24 +190,24 @@ func (e *executor) Start(ctx context.Context) {
func (e *executor) executeTask(task Compactor) {
log := log.With(
zap.Int64("planID", task.GetPlanID()),
zap.Int64("Collection", task.GetCollection()),
zap.Int64("collection", task.GetCollection()),
zap.String("channel", task.GetChannelName()),
zap.String("type", task.GetCompactionType().String()),
)
defer func() {
e.toCompleteState(task)
}()
log.Info("start to execute compaction")
result, err := task.Compact()
if err != nil {
log.Warn("compaction task failed", zap.Error(err))
e.completeTask(task.GetPlanID(), nil)
return
}
e.completed.Insert(result.GetPlanID(), result)
e.completedCompactor.Insert(result.GetPlanID(), task)
// Update task with result
e.completeTask(task.GetPlanID(), result)
// Emit metrics
getDataCount := func(binlogs []*datapb.FieldBinlog) int64 {
count := int64(0)
for _, binlog := range binlogs {
@ -195,14 +237,6 @@ func (e *executor) executeTask(task Compactor) {
log.Info("end to execute compaction")
}
func (e *executor) stopTask(planID int64) {
task, loaded := e.getAndRemoveExecuting(planID)
if loaded {
log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.GetChannelName()))
task.Stop()
}
}
func (e *executor) GetResults(planID int64) []*datapb.CompactionPlanResult {
if planID != 0 {
result := e.getCompactionResult(planID)
@ -212,69 +246,60 @@ func (e *executor) GetResults(planID int64) []*datapb.CompactionPlanResult {
}
func (e *executor) getCompactionResult(planID int64) *datapb.CompactionPlanResult {
e.resultGuard.RLock()
defer e.resultGuard.RUnlock()
_, ok := e.executing.Get(planID)
if ok {
result := &datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_executing,
e.mu.RLock()
defer e.mu.RUnlock()
if task, exists := e.tasks[planID]; exists {
if task.result != nil {
return task.result
}
return &datapb.CompactionPlanResult{
State: task.state,
PlanID: planID,
}
return result
}
result, ok2 := e.completed.Get(planID)
if !ok2 {
// Task not found, return failed state
return &datapb.CompactionPlanResult{
PlanID: planID,
State: datapb.CompactionTaskState_failed,
}
}
return result
}
func (e *executor) getAllCompactionResults() []*datapb.CompactionPlanResult {
e.resultGuard.RLock()
defer e.resultGuard.RUnlock()
e.mu.Lock()
defer e.mu.Unlock()
var (
executing []int64
completed []int64
completedLevelZero []int64
)
var executingResults []*datapb.CompactionPlanResult
results := make([]*datapb.CompactionPlanResult, 0)
// get executing results
e.executing.Range(func(planID int64, task Compactor) bool {
// Collect results from all tasks
for planID, task := range e.tasks {
if task.state == datapb.CompactionTaskState_executing {
executing = append(executing, planID)
executingResults = append(executingResults, &datapb.CompactionPlanResult{
results = append(results, &datapb.CompactionPlanResult{
State: datapb.CompactionTaskState_executing,
PlanID: planID,
})
return true
})
// get completed results
e.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool {
} else if task.result != nil {
completed = append(completed, planID)
results = append(results, result)
results = append(results, task.result)
if result.GetType() == datapb.CompactionType_Level0DeleteCompaction {
if task.result.GetType() == datapb.CompactionType_Level0DeleteCompaction {
completedLevelZero = append(completedLevelZero, planID)
}
return true
})
}
}
// quick fix for task id may appear in both executing and completed
// TODO: make sure task id only has one state
completedIDs := typeutil.NewSet(completed...)
results = append(results, lo.Filter(executingResults, func(result *datapb.CompactionPlanResult, _ int) bool {
return !completedIDs.Contain(result.GetPlanID())
})...)
// remove level zero results
lo.ForEach(completedLevelZero, func(planID int64, _ int) {
e.completed.Remove(planID)
e.completedCompactor.Remove(planID)
})
// Remove completed level zero compaction tasks
for _, planID := range completedLevelZero {
delete(e.tasks, planID)
}
if len(results) > 0 {
log.Info("DataNode Compaction results",

View File

@ -18,7 +18,9 @@ package compactor
import (
"context"
"sync"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
@ -30,155 +32,446 @@ import (
)
func TestCompactionExecutor(t *testing.T) {
t.Run("Test execute", func(t *testing.T) {
paramtable.Get().Init(paramtable.NewBaseTable())
planID := int64(1)
mockC := NewMockCompactor(t)
mockC.EXPECT().GetPlanID().Return(planID)
mockC.EXPECT().GetChannelName().Return("ch1")
mockC.EXPECT().GetSlotUsage().Return(8)
executor := NewExecutor()
succeed, err := executor.Enqueue(mockC)
assert.Equal(t, true, succeed)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(executor.taskCh))
assert.EqualValues(t, 1, executor.executing.Len())
mockC.EXPECT().Stop().Return().Once()
executor.stopTask(planID)
})
t.Run("Test deplicate execute", func(t *testing.T) {
paramtable.Get().Init(paramtable.NewBaseTable())
planID := int64(1)
mockC := NewMockCompactor(t)
mockC.EXPECT().GetPlanID().Return(planID)
mockC.EXPECT().GetChannelName().Return("ch1")
mockC.EXPECT().GetSlotUsage().Return(8)
executor := NewExecutor()
succeed, err := executor.Enqueue(mockC)
assert.Equal(t, true, succeed)
assert.NoError(t, err)
succeed2, err2 := executor.Enqueue(mockC)
assert.Equal(t, false, succeed2)
assert.Error(t, err2)
assert.True(t, errors.Is(err2, merr.ErrDuplicatedCompactionTask))
assert.EqualValues(t, 1, len(executor.taskCh))
assert.EqualValues(t, 1, executor.executing.Len())
mockC.EXPECT().Stop().Return().Once()
executor.stopTask(planID)
})
t.Run("Test execute task with slot=0", func(t *testing.T) {
paramtable.Get().Init(paramtable.NewBaseTable())
planID := int64(1)
mockC := NewMockCompactor(t)
mockC.EXPECT().GetPlanID().Return(planID)
mockC.EXPECT().GetChannelName().Return("ch1")
mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction)
mockC.EXPECT().GetSlotUsage().Return(0)
executor := NewExecutor()
succeed, err := executor.Enqueue(mockC)
assert.Equal(t, true, succeed)
assert.NoError(t, err)
assert.Equal(t, int64(4), executor.Slots())
assert.Equal(t, int64(4), executor.usingSlots)
assert.EqualValues(t, 1, len(executor.taskCh))
assert.EqualValues(t, 1, executor.executing.Len())
mockC.EXPECT().Stop().Return().Once()
executor.stopTask(planID)
})
t.Run("Test Start", func(t *testing.T) {
t.Run("Test_Enqueue_Success", func(t *testing.T) {
ex := NewExecutor()
ctx, cancel := context.WithCancel(context.TODO())
cancel()
go ex.Start(ctx)
})
t.Run("Test executeTask", func(t *testing.T) {
tests := []struct {
isvalid bool
description string
}{
{true, "compact success"},
{false, "compact return error"},
}
ex := NewExecutor()
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
mockC := NewMockCompactor(t)
mockC.EXPECT().GetPlanID().Return(int64(1))
mockC.EXPECT().GetCollection().Return(int64(1))
mockC.EXPECT().GetChannelName().Return("ch1")
mockC.EXPECT().Complete().Return().Maybe()
signal := make(chan struct{})
if test.isvalid {
mockC.EXPECT().Compact().RunAndReturn(
func() (*datapb.CompactionPlanResult, error) {
signal <- struct{}{}
return &datapb.CompactionPlanResult{PlanID: 1}, nil
}).Once()
go ex.executeTask(mockC)
<-signal
} else {
mockC.EXPECT().Compact().RunAndReturn(
func() (*datapb.CompactionPlanResult, error) {
signal <- struct{}{}
return nil, errors.New("mock error")
}).Once()
go ex.executeTask(mockC)
<-signal
}
})
}
mockC.EXPECT().GetSlotUsage().Return(int64(8))
succeed, err := ex.Enqueue(mockC)
assert.True(t, succeed)
assert.NoError(t, err)
assert.Equal(t, 1, len(ex.taskCh))
assert.Equal(t, int64(8), ex.Slots())
ex.mu.RLock()
task, exists := ex.tasks[1]
ex.mu.RUnlock()
assert.True(t, exists)
assert.Equal(t, datapb.CompactionTaskState_executing, task.state)
})
t.Run("test GetAllCompactionResults", func(t *testing.T) {
t.Run("Test_Enqueue_Duplicate", func(t *testing.T) {
ex := NewExecutor()
mockC := NewMockCompactor(t)
mockC.EXPECT().GetPlanID().Return(int64(1)).Times(2)
mockC.EXPECT().GetSlotUsage().Return(int64(8))
mockC.EXPECT().GetChannelName().Return("ch1")
succeed, err := ex.Enqueue(mockC)
assert.True(t, succeed)
assert.NoError(t, err)
succeed, err = ex.Enqueue(mockC)
assert.False(t, succeed)
assert.Error(t, err)
assert.True(t, errors.Is(err, merr.ErrDuplicatedCompactionTask))
assert.Equal(t, 1, len(ex.taskCh))
})
t.Run("Test_Enqueue_DefaultSlotUsage", func(t *testing.T) {
ex := NewExecutor()
mockC := NewMockCompactor(t)
ex.executing.Insert(int64(1), mockC)
testCases := []struct {
name string
compactionType datapb.CompactionType
expectedSlotUsage int64
}{
{
name: "MixCompaction",
compactionType: datapb.CompactionType_MixCompaction,
expectedSlotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
},
{
name: "Level0DeleteCompaction",
compactionType: datapb.CompactionType_Level0DeleteCompaction,
expectedSlotUsage: paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(),
},
{
name: "ClusteringCompaction",
compactionType: datapb.CompactionType_ClusteringCompaction,
expectedSlotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
},
}
ex.completedCompactor.Insert(int64(2), mockC)
ex.completed.Insert(int64(2), &datapb.CompactionPlanResult{
for i, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockC := NewMockCompactor(t)
mockC.EXPECT().GetPlanID().Return(int64(i + 10))
mockC.EXPECT().GetSlotUsage().Return(int64(0)).Times(2)
mockC.EXPECT().GetCompactionType().Return(tc.compactionType)
succeed, err := ex.Enqueue(mockC)
assert.True(t, succeed)
assert.NoError(t, err)
})
}
})
t.Run("Test_ExecuteTask_Success", func(t *testing.T) {
ex := NewExecutor()
mockC := NewMockCompactor(t)
planID := int64(1)
result := &datapb.CompactionPlanResult{
PlanID: planID,
State: datapb.CompactionTaskState_completed,
Segments: []*datapb.CompactionSegment{
{
SegmentID: 100,
NumOfRows: 1000,
InsertLogs: nil,
Deltalogs: nil,
},
},
}
mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction)
mockC.EXPECT().GetPlanID().Return(planID).Times(3)
mockC.EXPECT().GetCollection().Return(int64(1))
mockC.EXPECT().GetChannelName().Return("ch1")
mockC.EXPECT().GetSlotUsage().Return(int64(8)).Times(2)
mockC.EXPECT().Compact().Return(result, nil)
mockC.EXPECT().Complete().Return()
succeed, err := ex.Enqueue(mockC)
assert.True(t, succeed)
assert.NoError(t, err)
ex.executeTask(mockC)
ex.mu.RLock()
task, exists := ex.tasks[planID]
ex.mu.RUnlock()
assert.True(t, exists)
assert.Equal(t, datapb.CompactionTaskState_completed, task.state)
assert.Equal(t, result, task.result)
assert.Equal(t, int64(0), ex.Slots())
})
t.Run("Test_ExecuteTask_Failure", func(t *testing.T) {
ex := NewExecutor()
mockC := NewMockCompactor(t)
planID := int64(2)
mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction)
mockC.EXPECT().GetPlanID().Return(planID).Times(3)
mockC.EXPECT().GetCollection().Return(int64(1))
mockC.EXPECT().GetChannelName().Return("ch1")
mockC.EXPECT().GetSlotUsage().Return(int64(8)).Times(2)
mockC.EXPECT().Compact().Return(nil, errors.New("compaction failed"))
mockC.EXPECT().Complete().Return()
succeed, err := ex.Enqueue(mockC)
assert.True(t, succeed)
assert.NoError(t, err)
ex.executeTask(mockC)
ex.mu.RLock()
task, exists := ex.tasks[planID]
ex.mu.RUnlock()
assert.True(t, exists)
assert.Equal(t, datapb.CompactionTaskState_failed, task.state)
assert.Nil(t, task.result)
assert.Equal(t, int64(0), ex.Slots())
})
t.Run("Test_RemoveTask", func(t *testing.T) {
ex := NewExecutor()
completedTask := &taskState{
compactor: NewMockCompactor(t),
state: datapb.CompactionTaskState_completed,
result: &datapb.CompactionPlanResult{PlanID: 1},
}
executingTask := &taskState{
compactor: NewMockCompactor(t),
state: datapb.CompactionTaskState_executing,
result: nil,
}
failedTask := &taskState{
compactor: NewMockCompactor(t),
state: datapb.CompactionTaskState_failed,
result: nil,
}
completedTask.compactor.(*MockCompactor).EXPECT().GetChannelName().Return("ch1").Maybe()
executingTask.compactor.(*MockCompactor).EXPECT().GetChannelName().Return("ch2").Maybe()
failedTask.compactor.(*MockCompactor).EXPECT().GetChannelName().Return("ch3").Maybe()
ex.tasks[1] = completedTask
ex.tasks[2] = executingTask
ex.tasks[3] = failedTask
ex.RemoveTask(1)
assert.Equal(t, 2, len(ex.tasks))
ex.RemoveTask(2)
assert.Equal(t, 2, len(ex.tasks))
ex.RemoveTask(3)
assert.Equal(t, 1, len(ex.tasks))
_, exists := ex.tasks[2]
assert.True(t, exists)
})
t.Run("Test_GetResults_SinglePlan", func(t *testing.T) {
ex := NewExecutor()
result := &datapb.CompactionPlanResult{
PlanID: 1,
State: datapb.CompactionTaskState_completed,
}
ex.tasks[1] = &taskState{
compactor: NewMockCompactor(t),
state: datapb.CompactionTaskState_completed,
result: result,
}
results := ex.GetResults(1)
assert.Equal(t, 1, len(results))
assert.Equal(t, result, results[0])
})
t.Run("Test_GetResults_NonExistentPlan", func(t *testing.T) {
ex := NewExecutor()
results := ex.GetResults(999)
assert.Equal(t, 1, len(results))
assert.Equal(t, int64(999), results[0].PlanID)
assert.Equal(t, datapb.CompactionTaskState_failed, results[0].State)
})
t.Run("Test_GetResults_All", func(t *testing.T) {
ex := NewExecutor()
mockC1 := NewMockCompactor(t)
ex.tasks[1] = &taskState{
compactor: mockC1,
state: datapb.CompactionTaskState_executing,
result: nil,
}
mockC2 := NewMockCompactor(t)
ex.tasks[2] = &taskState{
compactor: mockC2,
state: datapb.CompactionTaskState_completed,
result: &datapb.CompactionPlanResult{
PlanID: 2,
State: datapb.CompactionTaskState_completed,
Type: datapb.CompactionType_MixCompaction,
})
},
}
ex.completedCompactor.Insert(int64(3), mockC)
ex.completed.Insert(int64(3), &datapb.CompactionPlanResult{
mockC3 := NewMockCompactor(t)
ex.tasks[3] = &taskState{
compactor: mockC3,
state: datapb.CompactionTaskState_completed,
result: &datapb.CompactionPlanResult{
PlanID: 3,
State: datapb.CompactionTaskState_completed,
Type: datapb.CompactionType_Level0DeleteCompaction,
},
}
results := ex.GetResults(0)
assert.Equal(t, 3, len(results))
planIDs := make(map[int64]bool)
for _, r := range results {
planIDs[r.PlanID] = true
}
assert.True(t, planIDs[1])
assert.True(t, planIDs[2])
assert.True(t, planIDs[3])
assert.Equal(t, 2, len(ex.tasks))
_, exists := ex.tasks[3]
assert.False(t, exists)
})
require.Equal(t, 2, ex.completed.Len())
require.Equal(t, 2, ex.completedCompactor.Len())
require.Equal(t, 1, ex.executing.Len())
t.Run("Test_Start_Context_Cancel", func(t *testing.T) {
ex := NewExecutor()
ctx, cancel := context.WithCancel(context.Background())
result := ex.GetResults(0)
assert.Equal(t, 3, len(result))
done := make(chan bool)
go func() {
ex.Start(ctx)
done <- true
}()
for _, res := range result {
if res.PlanID == int64(1) {
assert.Equal(t, res.GetState(), datapb.CompactionTaskState_executing)
} else {
assert.Equal(t, res.GetState(), datapb.CompactionTaskState_completed)
cancel()
select {
case <-done:
case <-time.After(100 * time.Millisecond):
t.Fatal("Start didn't return after context cancel")
}
})
t.Run("Test_Concurrent_Operations", func(t *testing.T) {
ex := NewExecutor()
numTasks := 20
var wg sync.WaitGroup
for i := 0; i < numTasks; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
mockC := NewMockCompactor(t)
mockC.EXPECT().GetPlanID().Return(int64(id))
mockC.EXPECT().GetSlotUsage().Return(int64(1))
mockC.EXPECT().GetChannelName().Return("ch1").Maybe()
ex.Enqueue(mockC)
}(i)
}
assert.Equal(t, 1, ex.completed.Len())
require.Equal(t, 1, ex.completedCompactor.Len())
require.Equal(t, 1, ex.executing.Len())
wg.Wait()
assert.Equal(t, numTasks, len(ex.tasks))
assert.Equal(t, int64(numTasks), ex.Slots())
})
t.Run("Test_CompleteTask_SlotAdjustment", func(t *testing.T) {
ex := NewExecutor()
mockC := NewMockCompactor(t)
planID := int64(1)
slotUsage := int64(10)
mockC.EXPECT().GetPlanID().Return(planID)
mockC.EXPECT().GetSlotUsage().Return(slotUsage).Times(2)
mockC.EXPECT().Complete().Return()
ex.Enqueue(mockC)
assert.Equal(t, slotUsage, ex.Slots())
result := &datapb.CompactionPlanResult{PlanID: planID}
ex.completeTask(planID, result)
assert.Equal(t, int64(0), ex.Slots())
ex.mu.RLock()
task := ex.tasks[planID]
ex.mu.RUnlock()
assert.Equal(t, datapb.CompactionTaskState_completed, task.state)
assert.Equal(t, result, task.result)
})
t.Run("Test_CompleteTask_NegativeSlotProtection", func(t *testing.T) {
ex := NewExecutor()
ex.usingSlots = -5
mockC := NewMockCompactor(t)
mockC.EXPECT().GetSlotUsage().Return(int64(10))
mockC.EXPECT().Complete().Return()
ex.tasks[1] = &taskState{
compactor: mockC,
state: datapb.CompactionTaskState_executing,
}
ex.completeTask(1, nil)
assert.Equal(t, int64(0), ex.Slots())
})
t.Run("Test_Task_State_Transitions", func(t *testing.T) {
ex := NewExecutor()
mockC := NewMockCompactor(t)
planID := int64(1)
mockC.EXPECT().GetPlanID().Return(planID).Times(3)
mockC.EXPECT().GetSlotUsage().Return(int64(5)).Times(2)
mockC.EXPECT().GetCollection().Return(int64(1))
mockC.EXPECT().GetChannelName().Return("ch1")
mockC.EXPECT().Complete().Return()
mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction)
ex.Enqueue(mockC)
ex.mu.RLock()
assert.Equal(t, datapb.CompactionTaskState_executing, ex.tasks[planID].state)
ex.mu.RUnlock()
mockC.EXPECT().Compact().Return(&datapb.CompactionPlanResult{
PlanID: planID,
State: datapb.CompactionTaskState_completed,
}, nil).Once()
ex.executeTask(mockC)
ex.mu.RLock()
assert.Equal(t, datapb.CompactionTaskState_completed, ex.tasks[planID].state)
ex.mu.RUnlock()
})
t.Run("Test_GetResults_ExecutingTask", func(t *testing.T) {
ex := NewExecutor()
ex.tasks[1] = &taskState{
compactor: NewMockCompactor(t),
state: datapb.CompactionTaskState_executing,
result: nil,
}
results := ex.GetResults(1)
assert.Equal(t, 1, len(results))
assert.Equal(t, int64(1), results[0].PlanID)
assert.Equal(t, datapb.CompactionTaskState_executing, results[0].State)
})
t.Run("Test_Multiple_ExecuteTask_WithMetrics", func(t *testing.T) {
ex := NewExecutor()
planIDs := []int64{1, 2, 3}
for _, planID := range planIDs {
mockC := NewMockCompactor(t)
mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction)
mockC.EXPECT().GetPlanID().Return(planID).Times(3)
mockC.EXPECT().GetCollection().Return(int64(100))
mockC.EXPECT().GetChannelName().Return("ch1")
mockC.EXPECT().GetSlotUsage().Return(int64(4)).Times(2)
mockC.EXPECT().Complete().Return()
result := &datapb.CompactionPlanResult{
PlanID: planID,
State: datapb.CompactionTaskState_completed,
Segments: []*datapb.CompactionSegment{
{
SegmentID: planID * 100,
NumOfRows: planID * 1000,
Deltalogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 10},
},
},
},
},
},
}
mockC.EXPECT().Compact().Return(result, nil)
succeed, err := ex.Enqueue(mockC)
require.True(t, succeed)
require.NoError(t, err)
ex.executeTask(mockC)
}
results := ex.GetResults(0)
assert.Equal(t, 3, len(results))
for _, result := range results {
assert.Equal(t, datapb.CompactionTaskState_completed, result.State)
}
})
}

View File

@ -100,7 +100,7 @@ func (s *MixCompactionTaskStorageV1Suite) setupTest() {
func (s *MixCompactionTaskStorageV1Suite) SetupTest() {
s.setupTest()
paramtable.Get().Save("common.storage.enableV2", "false")
paramtable.Get().Save("common.storage.enablev2", "false")
}
func (s *MixCompactionTaskStorageV1Suite) SetupBM25() {
@ -140,7 +140,7 @@ func (s *MixCompactionTaskStorageV1Suite) SetupSubTest() {
func (s *MixCompactionTaskStorageV1Suite) TearDownTest() {
paramtable.Get().Reset(paramtable.Get().CommonCfg.EntityExpirationTTL.Key)
paramtable.Get().Reset("common.storageType")
paramtable.Get().Reset("common.storage.enableV2")
paramtable.Get().Reset("common.storage.enablev2")
}
func getMilvusBirthday() time.Time {

View File

@ -263,8 +263,8 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl
}
}
// GetCompactionState called by DataCoord
// return status of all compaction plans
// GetCompactionState is called by DataCoord and returns the status of all compaction plans.
// Deprecated after v2.6.0
func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
log.Ctx(ctx).Warn("DataNode.GetCompactionState failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err))
@ -522,6 +522,7 @@ func (node *DataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotReques
}, nil
}
// Not in use now
func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return merr.Status(err), nil

View File

@ -158,6 +158,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() {
)
mockC := compactor.NewMockCompactor(s.T())
mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction)
mockC.EXPECT().GetPlanID().Return(int64(1))
mockC.EXPECT().GetCollection().Return(collection)
mockC.EXPECT().GetChannelName().Return(channel)
@ -170,6 +171,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() {
s.node.compactionExecutor.Enqueue(mockC)
mockC2 := compactor.NewMockCompactor(s.T())
mockC2.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction)
mockC2.EXPECT().GetPlanID().Return(int64(2))
mockC2.EXPECT().GetCollection().Return(collection)
mockC2.EXPECT().GetChannelName().Return(channel)

View File

@ -91,15 +91,6 @@ func (s *HelloMilvusSuite) TestHelloStreaming() {
s.NoError(err)
s.Equal(int64(rowNum), insertResult.GetInsertCnt())
// delete
deleteResult, err := c.MilvusClient.Delete(ctx, &milvuspb.DeleteRequest{
DbName: dbName,
CollectionName: collectionName,
Expr: integration.Int64Field + " in [1, 2]",
})
err = merr.CheckRPCCall(deleteResult, err)
s.NoError(err)
// flush
flushResp, err := c.MilvusClient.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
@ -133,7 +124,7 @@ func (s *HelloMilvusSuite) TestHelloStreaming() {
flushedSegment := lo.Filter(segments, func(info *datapb.SegmentInfo, i int) bool {
return info.GetState() == commonpb.SegmentState_Flushed || info.GetState() == commonpb.SegmentState_Flushing
})
s.Equal(2, len(flushedSegment))
s.Equal(1, len(flushedSegment))
s.Equal(int64(rowNum), segments[0].GetNumOfRows())
// load
@ -159,6 +150,16 @@ func (s *HelloMilvusSuite) TestHelloStreaming() {
s.NoError(err)
s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
// delete before query
// to avoid L0 compaction making the number of segments unstable
deleteResult, err := c.MilvusClient.Delete(ctx, &milvuspb.DeleteRequest{
DbName: dbName,
CollectionName: collectionName,
Expr: integration.Int64Field + " in [1, 2]",
})
err = merr.CheckRPCCall(deleteResult, err)
s.NoError(err)
// query
queryResult, err := c.MilvusClient.Query(ctx, &milvuspb.QueryRequest{
DbName: dbName,