diff --git a/internal/datanode/compactor/executor.go b/internal/datanode/compactor/executor.go index 830fe71799..29e7c2d2ae 100644 --- a/internal/datanode/compactor/executor.go +++ b/internal/datanode/compactor/executor.go @@ -29,7 +29,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" - "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) const ( @@ -40,94 +39,137 @@ type Executor interface { Start(ctx context.Context) Enqueue(task Compactor) (bool, error) Slots() int64 - RemoveTask(planID int64) - GetResults(planID int64) []*datapb.CompactionPlanResult + RemoveTask(planID int64) // Deprecated in 2.6 + GetResults(planID int64) []*datapb.CompactionPlanResult // Deprecated in 2.6 +} + +// taskState represents the state of a compaction task +// State transitions: +// - executing -> completed (success) +// - executing -> failed (error) +// +// Once a task reaches completed/failed state, it stays there until removed +type taskState struct { + compactor Compactor + state datapb.CompactionTaskState + result *datapb.CompactionPlanResult } type executor struct { - executing *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor - completedCompactor *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor - completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult - taskCh chan Compactor - dropped *typeutil.ConcurrentSet[string] // vchannel dropped - usingSlots int64 - slotMu sync.RWMutex + mu sync.RWMutex - // To prevent concurrency of release channel and compaction get results - // all released channel's compaction tasks will be discarded - resultGuard sync.RWMutex + tasks map[int64]*taskState // planID -> task state + + // Task queue for pending work + taskCh chan Compactor + + // Slot tracking for resource management + usingSlots int64 + + // Slots(Slots Cap for DataCoord), ExecPool(MaxCompactionConcurrency) are all trying to control concurrency and resource usage, + // which creates unnecessary complexity. We should use a single resource pool instead. } func NewExecutor() *executor { return &executor{ - executing: typeutil.NewConcurrentMap[int64, Compactor](), - completedCompactor: typeutil.NewConcurrentMap[int64, Compactor](), - completed: typeutil.NewConcurrentMap[int64, *datapb.CompactionPlanResult](), - taskCh: make(chan Compactor, maxTaskQueueNum), - dropped: typeutil.NewConcurrentSet[string](), - usingSlots: 0, + tasks: make(map[int64]*taskState), + taskCh: make(chan Compactor, maxTaskQueueNum), + usingSlots: 0, } } -func (e *executor) Enqueue(task Compactor) (bool, error) { - e.slotMu.Lock() - defer e.slotMu.Unlock() - newSlotUsage := task.GetSlotUsage() +func getTaskSlotUsage(task Compactor) int64 { + // Calculate slot usage + taskSlotUsage := task.GetSlotUsage() // compatible for old datacoord or unexpected request - if task.GetSlotUsage() <= 0 { + if taskSlotUsage <= 0 { switch task.GetCompactionType() { case datapb.CompactionType_ClusteringCompaction: - newSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64() + taskSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64() case datapb.CompactionType_MixCompaction: - newSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64() + taskSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64() case datapb.CompactionType_Level0DeleteCompaction: - newSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64() + taskSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64() } - log.Warn("illegal task slot usage, change it to a default value", zap.Int64("illegalSlotUsage", task.GetSlotUsage()), zap.Int64("newSlotUsage", newSlotUsage)) + log.Warn("illegal task slot usage, change it to a default value", + zap.Int64("illegalSlotUsage", task.GetSlotUsage()), + zap.Int64("defaultSlotUsage", taskSlotUsage), + zap.String("type", task.GetCompactionType().String())) } - e.usingSlots = e.usingSlots + newSlotUsage - _, ok := e.executing.GetOrInsert(task.GetPlanID(), task) - if ok { + + return taskSlotUsage +} + +func (e *executor) Enqueue(task Compactor) (bool, error) { + e.mu.Lock() + defer e.mu.Unlock() + + planID := task.GetPlanID() + + // Check for duplicate task + if _, exists := e.tasks[planID]; exists { log.Warn("duplicated compaction task", - zap.Int64("planID", task.GetPlanID()), + zap.Int64("planID", planID), zap.String("channel", task.GetChannelName())) return false, merr.WrapErrDuplicatedCompactionTask() } + + // Update slots and add task + e.usingSlots += getTaskSlotUsage(task) + e.tasks[planID] = &taskState{ + compactor: task, + state: datapb.CompactionTaskState_executing, + result: nil, + } + e.taskCh <- task return true, nil } -// Slots returns the available slots for compaction +// Slots returns the used slots for compaction func (e *executor) Slots() int64 { - return e.getUsingSlots() -} - -func (e *executor) getUsingSlots() int64 { - e.slotMu.RLock() - defer e.slotMu.RUnlock() + e.mu.RLock() + defer e.mu.RUnlock() return e.usingSlots } -func (e *executor) toCompleteState(task Compactor) { - task.Complete() - e.getAndRemoveExecuting(task.GetPlanID()) -} +// completeTask updates task state to completed and adjusts slot usage +func (e *executor) completeTask(planID int64, result *datapb.CompactionPlanResult) { + e.mu.Lock() + defer e.mu.Unlock() -func (e *executor) getAndRemoveExecuting(planID typeutil.UniqueID) (Compactor, bool) { - task, ok := e.executing.GetAndRemove(planID) - if ok { - e.slotMu.Lock() - e.usingSlots = e.usingSlots - task.GetSlotUsage() - e.slotMu.Unlock() + if task, exists := e.tasks[planID]; exists { + task.compactor.Complete() + + // Update state based on result + if result != nil { + task.state = datapb.CompactionTaskState_completed + task.result = result + } else { + task.state = datapb.CompactionTaskState_failed + } + + // Adjust slot usage + e.usingSlots -= getTaskSlotUsage(task.compactor) + if e.usingSlots < 0 { + e.usingSlots = 0 + } } - return task, ok } func (e *executor) RemoveTask(planID int64) { - e.completed.GetAndRemove(planID) - task, loaded := e.completedCompactor.GetAndRemove(planID) - if loaded { - log.Info("Compaction task removed", zap.Int64("planID", planID), zap.String("channel", task.GetChannelName())) + e.mu.Lock() + defer e.mu.Unlock() + + if task, exists := e.tasks[planID]; exists { + // Only remove completed/failed tasks, not executing ones + if task.state != datapb.CompactionTaskState_executing { + log.Info("Compaction task removed", + zap.Int64("planID", planID), + zap.String("channel", task.compactor.GetChannelName()), + zap.String("state", task.state.String())) + delete(e.tasks, planID) + } } } @@ -148,24 +190,24 @@ func (e *executor) Start(ctx context.Context) { func (e *executor) executeTask(task Compactor) { log := log.With( zap.Int64("planID", task.GetPlanID()), - zap.Int64("Collection", task.GetCollection()), + zap.Int64("collection", task.GetCollection()), zap.String("channel", task.GetChannelName()), + zap.String("type", task.GetCompactionType().String()), ) - defer func() { - e.toCompleteState(task) - }() - log.Info("start to execute compaction") result, err := task.Compact() if err != nil { log.Warn("compaction task failed", zap.Error(err)) + e.completeTask(task.GetPlanID(), nil) return } - e.completed.Insert(result.GetPlanID(), result) - e.completedCompactor.Insert(result.GetPlanID(), task) + // Update task with result + e.completeTask(task.GetPlanID(), result) + + // Emit metrics getDataCount := func(binlogs []*datapb.FieldBinlog) int64 { count := int64(0) for _, binlog := range binlogs { @@ -195,14 +237,6 @@ func (e *executor) executeTask(task Compactor) { log.Info("end to execute compaction") } -func (e *executor) stopTask(planID int64) { - task, loaded := e.getAndRemoveExecuting(planID) - if loaded { - log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.GetChannelName())) - task.Stop() - } -} - func (e *executor) GetResults(planID int64) []*datapb.CompactionPlanResult { if planID != 0 { result := e.getCompactionResult(planID) @@ -212,69 +246,60 @@ func (e *executor) GetResults(planID int64) []*datapb.CompactionPlanResult { } func (e *executor) getCompactionResult(planID int64) *datapb.CompactionPlanResult { - e.resultGuard.RLock() - defer e.resultGuard.RUnlock() - _, ok := e.executing.Get(planID) - if ok { - result := &datapb.CompactionPlanResult{ - State: datapb.CompactionTaskState_executing, - PlanID: planID, + e.mu.RLock() + defer e.mu.RUnlock() + + if task, exists := e.tasks[planID]; exists { + if task.result != nil { + return task.result } - return result - } - result, ok2 := e.completed.Get(planID) - if !ok2 { return &datapb.CompactionPlanResult{ + State: task.state, PlanID: planID, - State: datapb.CompactionTaskState_failed, } } - return result + + // Task not found, return failed state + return &datapb.CompactionPlanResult{ + PlanID: planID, + State: datapb.CompactionTaskState_failed, + } } func (e *executor) getAllCompactionResults() []*datapb.CompactionPlanResult { - e.resultGuard.RLock() - defer e.resultGuard.RUnlock() + e.mu.Lock() + defer e.mu.Unlock() + var ( executing []int64 completed []int64 completedLevelZero []int64 ) - var executingResults []*datapb.CompactionPlanResult + results := make([]*datapb.CompactionPlanResult, 0) - // get executing results - e.executing.Range(func(planID int64, task Compactor) bool { - executing = append(executing, planID) - executingResults = append(executingResults, &datapb.CompactionPlanResult{ - State: datapb.CompactionTaskState_executing, - PlanID: planID, - }) - return true - }) - // get completed results - e.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool { - completed = append(completed, planID) - results = append(results, result) + // Collect results from all tasks + for planID, task := range e.tasks { + if task.state == datapb.CompactionTaskState_executing { + executing = append(executing, planID) + results = append(results, &datapb.CompactionPlanResult{ + State: datapb.CompactionTaskState_executing, + PlanID: planID, + }) + } else if task.result != nil { + completed = append(completed, planID) + results = append(results, task.result) - if result.GetType() == datapb.CompactionType_Level0DeleteCompaction { - completedLevelZero = append(completedLevelZero, planID) + if task.result.GetType() == datapb.CompactionType_Level0DeleteCompaction { + completedLevelZero = append(completedLevelZero, planID) + } } - return true - }) + } - // quick fix for task id may appear in both executing and completed - // TODO: make sure task id only has one state - completedIDs := typeutil.NewSet(completed...) - results = append(results, lo.Filter(executingResults, func(result *datapb.CompactionPlanResult, _ int) bool { - return !completedIDs.Contain(result.GetPlanID()) - })...) - - // remove level zero results - lo.ForEach(completedLevelZero, func(planID int64, _ int) { - e.completed.Remove(planID) - e.completedCompactor.Remove(planID) - }) + // Remove completed level zero compaction tasks + for _, planID := range completedLevelZero { + delete(e.tasks, planID) + } if len(results) > 0 { log.Info("DataNode Compaction results", diff --git a/internal/datanode/compactor/executor_test.go b/internal/datanode/compactor/executor_test.go index 3b4fc42c1f..b536fce9ff 100644 --- a/internal/datanode/compactor/executor_test.go +++ b/internal/datanode/compactor/executor_test.go @@ -18,7 +18,9 @@ package compactor import ( "context" + "sync" "testing" + "time" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -30,155 +32,446 @@ import ( ) func TestCompactionExecutor(t *testing.T) { - t.Run("Test execute", func(t *testing.T) { - paramtable.Get().Init(paramtable.NewBaseTable()) - planID := int64(1) - mockC := NewMockCompactor(t) - mockC.EXPECT().GetPlanID().Return(planID) - mockC.EXPECT().GetChannelName().Return("ch1") - mockC.EXPECT().GetSlotUsage().Return(8) - executor := NewExecutor() - succeed, err := executor.Enqueue(mockC) - assert.Equal(t, true, succeed) - assert.NoError(t, err) - assert.EqualValues(t, 1, len(executor.taskCh)) - assert.EqualValues(t, 1, executor.executing.Len()) + paramtable.Get().Init(paramtable.NewBaseTable()) - mockC.EXPECT().Stop().Return().Once() - executor.stopTask(planID) - }) - - t.Run("Test deplicate execute", func(t *testing.T) { - paramtable.Get().Init(paramtable.NewBaseTable()) - planID := int64(1) - mockC := NewMockCompactor(t) - mockC.EXPECT().GetPlanID().Return(planID) - mockC.EXPECT().GetChannelName().Return("ch1") - mockC.EXPECT().GetSlotUsage().Return(8) - executor := NewExecutor() - succeed, err := executor.Enqueue(mockC) - assert.Equal(t, true, succeed) - assert.NoError(t, err) - - succeed2, err2 := executor.Enqueue(mockC) - assert.Equal(t, false, succeed2) - assert.Error(t, err2) - assert.True(t, errors.Is(err2, merr.ErrDuplicatedCompactionTask)) - - assert.EqualValues(t, 1, len(executor.taskCh)) - assert.EqualValues(t, 1, executor.executing.Len()) - - mockC.EXPECT().Stop().Return().Once() - executor.stopTask(planID) - }) - - t.Run("Test execute task with slot=0", func(t *testing.T) { - paramtable.Get().Init(paramtable.NewBaseTable()) - planID := int64(1) - mockC := NewMockCompactor(t) - mockC.EXPECT().GetPlanID().Return(planID) - mockC.EXPECT().GetChannelName().Return("ch1") - mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) - mockC.EXPECT().GetSlotUsage().Return(0) - executor := NewExecutor() - - succeed, err := executor.Enqueue(mockC) - assert.Equal(t, true, succeed) - assert.NoError(t, err) - assert.Equal(t, int64(4), executor.Slots()) - assert.Equal(t, int64(4), executor.usingSlots) - - assert.EqualValues(t, 1, len(executor.taskCh)) - assert.EqualValues(t, 1, executor.executing.Len()) - - mockC.EXPECT().Stop().Return().Once() - executor.stopTask(planID) - }) - - t.Run("Test Start", func(t *testing.T) { + t.Run("Test_Enqueue_Success", func(t *testing.T) { ex := NewExecutor() - ctx, cancel := context.WithCancel(context.TODO()) - cancel() - go ex.Start(ctx) + mockC := NewMockCompactor(t) + mockC.EXPECT().GetPlanID().Return(int64(1)) + mockC.EXPECT().GetSlotUsage().Return(int64(8)) + + succeed, err := ex.Enqueue(mockC) + assert.True(t, succeed) + assert.NoError(t, err) + assert.Equal(t, 1, len(ex.taskCh)) + assert.Equal(t, int64(8), ex.Slots()) + + ex.mu.RLock() + task, exists := ex.tasks[1] + ex.mu.RUnlock() + assert.True(t, exists) + assert.Equal(t, datapb.CompactionTaskState_executing, task.state) }) - t.Run("Test executeTask", func(t *testing.T) { - tests := []struct { - isvalid bool + t.Run("Test_Enqueue_Duplicate", func(t *testing.T) { + ex := NewExecutor() + mockC := NewMockCompactor(t) + mockC.EXPECT().GetPlanID().Return(int64(1)).Times(2) + mockC.EXPECT().GetSlotUsage().Return(int64(8)) + mockC.EXPECT().GetChannelName().Return("ch1") - description string + succeed, err := ex.Enqueue(mockC) + assert.True(t, succeed) + assert.NoError(t, err) + + succeed, err = ex.Enqueue(mockC) + assert.False(t, succeed) + assert.Error(t, err) + assert.True(t, errors.Is(err, merr.ErrDuplicatedCompactionTask)) + assert.Equal(t, 1, len(ex.taskCh)) + }) + + t.Run("Test_Enqueue_DefaultSlotUsage", func(t *testing.T) { + ex := NewExecutor() + + testCases := []struct { + name string + compactionType datapb.CompactionType + expectedSlotUsage int64 }{ - {true, "compact success"}, - {false, "compact return error"}, + { + name: "MixCompaction", + compactionType: datapb.CompactionType_MixCompaction, + expectedSlotUsage: paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(), + }, + { + name: "Level0DeleteCompaction", + compactionType: datapb.CompactionType_Level0DeleteCompaction, + expectedSlotUsage: paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(), + }, + { + name: "ClusteringCompaction", + compactionType: datapb.CompactionType_ClusteringCompaction, + expectedSlotUsage: paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(), + }, } - ex := NewExecutor() - for _, test := range tests { - t.Run(test.description, func(t *testing.T) { + for i, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { mockC := NewMockCompactor(t) - mockC.EXPECT().GetPlanID().Return(int64(1)) - mockC.EXPECT().GetCollection().Return(int64(1)) - mockC.EXPECT().GetChannelName().Return("ch1") - mockC.EXPECT().Complete().Return().Maybe() - signal := make(chan struct{}) - if test.isvalid { - mockC.EXPECT().Compact().RunAndReturn( - func() (*datapb.CompactionPlanResult, error) { - signal <- struct{}{} - return &datapb.CompactionPlanResult{PlanID: 1}, nil - }).Once() - go ex.executeTask(mockC) - <-signal - } else { - mockC.EXPECT().Compact().RunAndReturn( - func() (*datapb.CompactionPlanResult, error) { - signal <- struct{}{} - return nil, errors.New("mock error") - }).Once() - go ex.executeTask(mockC) - <-signal - } + mockC.EXPECT().GetPlanID().Return(int64(i + 10)) + mockC.EXPECT().GetSlotUsage().Return(int64(0)).Times(2) + mockC.EXPECT().GetCompactionType().Return(tc.compactionType) + + succeed, err := ex.Enqueue(mockC) + assert.True(t, succeed) + assert.NoError(t, err) }) } }) - t.Run("test GetAllCompactionResults", func(t *testing.T) { + t.Run("Test_ExecuteTask_Success", func(t *testing.T) { ex := NewExecutor() - mockC := NewMockCompactor(t) - ex.executing.Insert(int64(1), mockC) - ex.completedCompactor.Insert(int64(2), mockC) - ex.completed.Insert(int64(2), &datapb.CompactionPlanResult{ - PlanID: 2, + planID := int64(1) + result := &datapb.CompactionPlanResult{ + PlanID: planID, State: datapb.CompactionTaskState_completed, - Type: datapb.CompactionType_MixCompaction, - }) - - ex.completedCompactor.Insert(int64(3), mockC) - ex.completed.Insert(int64(3), &datapb.CompactionPlanResult{ - PlanID: 3, - State: datapb.CompactionTaskState_completed, - Type: datapb.CompactionType_Level0DeleteCompaction, - }) - - require.Equal(t, 2, ex.completed.Len()) - require.Equal(t, 2, ex.completedCompactor.Len()) - require.Equal(t, 1, ex.executing.Len()) - - result := ex.GetResults(0) - assert.Equal(t, 3, len(result)) - - for _, res := range result { - if res.PlanID == int64(1) { - assert.Equal(t, res.GetState(), datapb.CompactionTaskState_executing) - } else { - assert.Equal(t, res.GetState(), datapb.CompactionTaskState_completed) - } + Segments: []*datapb.CompactionSegment{ + { + SegmentID: 100, + NumOfRows: 1000, + InsertLogs: nil, + Deltalogs: nil, + }, + }, } - assert.Equal(t, 1, ex.completed.Len()) - require.Equal(t, 1, ex.completedCompactor.Len()) - require.Equal(t, 1, ex.executing.Len()) + mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) + mockC.EXPECT().GetPlanID().Return(planID).Times(3) + mockC.EXPECT().GetCollection().Return(int64(1)) + mockC.EXPECT().GetChannelName().Return("ch1") + mockC.EXPECT().GetSlotUsage().Return(int64(8)).Times(2) + mockC.EXPECT().Compact().Return(result, nil) + mockC.EXPECT().Complete().Return() + + succeed, err := ex.Enqueue(mockC) + assert.True(t, succeed) + assert.NoError(t, err) + + ex.executeTask(mockC) + + ex.mu.RLock() + task, exists := ex.tasks[planID] + ex.mu.RUnlock() + assert.True(t, exists) + assert.Equal(t, datapb.CompactionTaskState_completed, task.state) + assert.Equal(t, result, task.result) + assert.Equal(t, int64(0), ex.Slots()) + }) + + t.Run("Test_ExecuteTask_Failure", func(t *testing.T) { + ex := NewExecutor() + mockC := NewMockCompactor(t) + + planID := int64(2) + mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) + mockC.EXPECT().GetPlanID().Return(planID).Times(3) + mockC.EXPECT().GetCollection().Return(int64(1)) + mockC.EXPECT().GetChannelName().Return("ch1") + mockC.EXPECT().GetSlotUsage().Return(int64(8)).Times(2) + mockC.EXPECT().Compact().Return(nil, errors.New("compaction failed")) + mockC.EXPECT().Complete().Return() + + succeed, err := ex.Enqueue(mockC) + assert.True(t, succeed) + assert.NoError(t, err) + + ex.executeTask(mockC) + + ex.mu.RLock() + task, exists := ex.tasks[planID] + ex.mu.RUnlock() + assert.True(t, exists) + assert.Equal(t, datapb.CompactionTaskState_failed, task.state) + assert.Nil(t, task.result) + assert.Equal(t, int64(0), ex.Slots()) + }) + + t.Run("Test_RemoveTask", func(t *testing.T) { + ex := NewExecutor() + + completedTask := &taskState{ + compactor: NewMockCompactor(t), + state: datapb.CompactionTaskState_completed, + result: &datapb.CompactionPlanResult{PlanID: 1}, + } + + executingTask := &taskState{ + compactor: NewMockCompactor(t), + state: datapb.CompactionTaskState_executing, + result: nil, + } + + failedTask := &taskState{ + compactor: NewMockCompactor(t), + state: datapb.CompactionTaskState_failed, + result: nil, + } + + completedTask.compactor.(*MockCompactor).EXPECT().GetChannelName().Return("ch1").Maybe() + executingTask.compactor.(*MockCompactor).EXPECT().GetChannelName().Return("ch2").Maybe() + failedTask.compactor.(*MockCompactor).EXPECT().GetChannelName().Return("ch3").Maybe() + + ex.tasks[1] = completedTask + ex.tasks[2] = executingTask + ex.tasks[3] = failedTask + + ex.RemoveTask(1) + assert.Equal(t, 2, len(ex.tasks)) + + ex.RemoveTask(2) + assert.Equal(t, 2, len(ex.tasks)) + + ex.RemoveTask(3) + assert.Equal(t, 1, len(ex.tasks)) + + _, exists := ex.tasks[2] + assert.True(t, exists) + }) + + t.Run("Test_GetResults_SinglePlan", func(t *testing.T) { + ex := NewExecutor() + + result := &datapb.CompactionPlanResult{ + PlanID: 1, + State: datapb.CompactionTaskState_completed, + } + + ex.tasks[1] = &taskState{ + compactor: NewMockCompactor(t), + state: datapb.CompactionTaskState_completed, + result: result, + } + + results := ex.GetResults(1) + assert.Equal(t, 1, len(results)) + assert.Equal(t, result, results[0]) + }) + + t.Run("Test_GetResults_NonExistentPlan", func(t *testing.T) { + ex := NewExecutor() + + results := ex.GetResults(999) + assert.Equal(t, 1, len(results)) + assert.Equal(t, int64(999), results[0].PlanID) + assert.Equal(t, datapb.CompactionTaskState_failed, results[0].State) + }) + + t.Run("Test_GetResults_All", func(t *testing.T) { + ex := NewExecutor() + + mockC1 := NewMockCompactor(t) + ex.tasks[1] = &taskState{ + compactor: mockC1, + state: datapb.CompactionTaskState_executing, + result: nil, + } + + mockC2 := NewMockCompactor(t) + ex.tasks[2] = &taskState{ + compactor: mockC2, + state: datapb.CompactionTaskState_completed, + result: &datapb.CompactionPlanResult{ + PlanID: 2, + State: datapb.CompactionTaskState_completed, + Type: datapb.CompactionType_MixCompaction, + }, + } + + mockC3 := NewMockCompactor(t) + ex.tasks[3] = &taskState{ + compactor: mockC3, + state: datapb.CompactionTaskState_completed, + result: &datapb.CompactionPlanResult{ + PlanID: 3, + State: datapb.CompactionTaskState_completed, + Type: datapb.CompactionType_Level0DeleteCompaction, + }, + } + + results := ex.GetResults(0) + assert.Equal(t, 3, len(results)) + + planIDs := make(map[int64]bool) + for _, r := range results { + planIDs[r.PlanID] = true + } + assert.True(t, planIDs[1]) + assert.True(t, planIDs[2]) + assert.True(t, planIDs[3]) + + assert.Equal(t, 2, len(ex.tasks)) + _, exists := ex.tasks[3] + assert.False(t, exists) + }) + + t.Run("Test_Start_Context_Cancel", func(t *testing.T) { + ex := NewExecutor() + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan bool) + go func() { + ex.Start(ctx) + done <- true + }() + + cancel() + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + t.Fatal("Start didn't return after context cancel") + } + }) + + t.Run("Test_Concurrent_Operations", func(t *testing.T) { + ex := NewExecutor() + numTasks := 20 + var wg sync.WaitGroup + + for i := 0; i < numTasks; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + mockC := NewMockCompactor(t) + mockC.EXPECT().GetPlanID().Return(int64(id)) + mockC.EXPECT().GetSlotUsage().Return(int64(1)) + mockC.EXPECT().GetChannelName().Return("ch1").Maybe() + + ex.Enqueue(mockC) + }(i) + } + + wg.Wait() + + assert.Equal(t, numTasks, len(ex.tasks)) + assert.Equal(t, int64(numTasks), ex.Slots()) + }) + + t.Run("Test_CompleteTask_SlotAdjustment", func(t *testing.T) { + ex := NewExecutor() + mockC := NewMockCompactor(t) + + planID := int64(1) + slotUsage := int64(10) + + mockC.EXPECT().GetPlanID().Return(planID) + mockC.EXPECT().GetSlotUsage().Return(slotUsage).Times(2) + mockC.EXPECT().Complete().Return() + + ex.Enqueue(mockC) + assert.Equal(t, slotUsage, ex.Slots()) + + result := &datapb.CompactionPlanResult{PlanID: planID} + ex.completeTask(planID, result) + + assert.Equal(t, int64(0), ex.Slots()) + + ex.mu.RLock() + task := ex.tasks[planID] + ex.mu.RUnlock() + assert.Equal(t, datapb.CompactionTaskState_completed, task.state) + assert.Equal(t, result, task.result) + }) + + t.Run("Test_CompleteTask_NegativeSlotProtection", func(t *testing.T) { + ex := NewExecutor() + + ex.usingSlots = -5 + + mockC := NewMockCompactor(t) + mockC.EXPECT().GetSlotUsage().Return(int64(10)) + mockC.EXPECT().Complete().Return() + + ex.tasks[1] = &taskState{ + compactor: mockC, + state: datapb.CompactionTaskState_executing, + } + + ex.completeTask(1, nil) + + assert.Equal(t, int64(0), ex.Slots()) + }) + + t.Run("Test_Task_State_Transitions", func(t *testing.T) { + ex := NewExecutor() + mockC := NewMockCompactor(t) + + planID := int64(1) + mockC.EXPECT().GetPlanID().Return(planID).Times(3) + mockC.EXPECT().GetSlotUsage().Return(int64(5)).Times(2) + mockC.EXPECT().GetCollection().Return(int64(1)) + mockC.EXPECT().GetChannelName().Return("ch1") + mockC.EXPECT().Complete().Return() + mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) + + ex.Enqueue(mockC) + ex.mu.RLock() + assert.Equal(t, datapb.CompactionTaskState_executing, ex.tasks[planID].state) + ex.mu.RUnlock() + + mockC.EXPECT().Compact().Return(&datapb.CompactionPlanResult{ + PlanID: planID, + State: datapb.CompactionTaskState_completed, + }, nil).Once() + + ex.executeTask(mockC) + + ex.mu.RLock() + assert.Equal(t, datapb.CompactionTaskState_completed, ex.tasks[planID].state) + ex.mu.RUnlock() + }) + + t.Run("Test_GetResults_ExecutingTask", func(t *testing.T) { + ex := NewExecutor() + + ex.tasks[1] = &taskState{ + compactor: NewMockCompactor(t), + state: datapb.CompactionTaskState_executing, + result: nil, + } + + results := ex.GetResults(1) + assert.Equal(t, 1, len(results)) + assert.Equal(t, int64(1), results[0].PlanID) + assert.Equal(t, datapb.CompactionTaskState_executing, results[0].State) + }) + + t.Run("Test_Multiple_ExecuteTask_WithMetrics", func(t *testing.T) { + ex := NewExecutor() + + planIDs := []int64{1, 2, 3} + for _, planID := range planIDs { + mockC := NewMockCompactor(t) + mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) + mockC.EXPECT().GetPlanID().Return(planID).Times(3) + mockC.EXPECT().GetCollection().Return(int64(100)) + mockC.EXPECT().GetChannelName().Return("ch1") + mockC.EXPECT().GetSlotUsage().Return(int64(4)).Times(2) + mockC.EXPECT().Complete().Return() + + result := &datapb.CompactionPlanResult{ + PlanID: planID, + State: datapb.CompactionTaskState_completed, + Segments: []*datapb.CompactionSegment{ + { + SegmentID: planID * 100, + NumOfRows: planID * 1000, + Deltalogs: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {EntriesNum: 10}, + }, + }, + }, + }, + }, + } + mockC.EXPECT().Compact().Return(result, nil) + + succeed, err := ex.Enqueue(mockC) + require.True(t, succeed) + require.NoError(t, err) + + ex.executeTask(mockC) + } + + results := ex.GetResults(0) + assert.Equal(t, 3, len(results)) + for _, result := range results { + assert.Equal(t, datapb.CompactionTaskState_completed, result.State) + } }) } diff --git a/internal/datanode/compactor/mix_compactor_test.go b/internal/datanode/compactor/mix_compactor_test.go index 1e63377467..0339c6eb95 100644 --- a/internal/datanode/compactor/mix_compactor_test.go +++ b/internal/datanode/compactor/mix_compactor_test.go @@ -100,7 +100,7 @@ func (s *MixCompactionTaskStorageV1Suite) setupTest() { func (s *MixCompactionTaskStorageV1Suite) SetupTest() { s.setupTest() - paramtable.Get().Save("common.storage.enableV2", "false") + paramtable.Get().Save("common.storage.enablev2", "false") } func (s *MixCompactionTaskStorageV1Suite) SetupBM25() { @@ -140,7 +140,7 @@ func (s *MixCompactionTaskStorageV1Suite) SetupSubTest() { func (s *MixCompactionTaskStorageV1Suite) TearDownTest() { paramtable.Get().Reset(paramtable.Get().CommonCfg.EntityExpirationTTL.Key) paramtable.Get().Reset("common.storageType") - paramtable.Get().Reset("common.storage.enableV2") + paramtable.Get().Reset("common.storage.enablev2") } func getMilvusBirthday() time.Time { diff --git a/internal/datanode/services.go b/internal/datanode/services.go index b5aa04a89d..0dcb7f7a53 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -263,8 +263,8 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl } } -// GetCompactionState called by DataCoord -// return status of all compaction plans +// GetCompactionState called by DataCoord return status of all compaction plans +// Deprecated after v2.6.0 func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) { if err := merr.CheckHealthy(node.GetStateCode()); err != nil { log.Ctx(ctx).Warn("DataNode.GetCompactionState failed", zap.Int64("nodeId", node.GetNodeID()), zap.Error(err)) @@ -522,6 +522,7 @@ func (node *DataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotReques }, nil } +// Not in used now func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return merr.Status(err), nil diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index b557b8d6a5..173a5fd712 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -158,6 +158,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() { ) mockC := compactor.NewMockCompactor(s.T()) + mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) mockC.EXPECT().GetPlanID().Return(int64(1)) mockC.EXPECT().GetCollection().Return(collection) mockC.EXPECT().GetChannelName().Return(channel) @@ -170,6 +171,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() { s.node.compactionExecutor.Enqueue(mockC) mockC2 := compactor.NewMockCompactor(s.T()) + mockC2.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) mockC2.EXPECT().GetPlanID().Return(int64(2)) mockC2.EXPECT().GetCollection().Return(collection) mockC2.EXPECT().GetChannelName().Return(channel) diff --git a/tests/integration/hellomilvus/hello_streaming_test.go b/tests/integration/hellomilvus/hello_streaming_test.go index 3027aa6461..eacc08c094 100644 --- a/tests/integration/hellomilvus/hello_streaming_test.go +++ b/tests/integration/hellomilvus/hello_streaming_test.go @@ -91,15 +91,6 @@ func (s *HelloMilvusSuite) TestHelloStreaming() { s.NoError(err) s.Equal(int64(rowNum), insertResult.GetInsertCnt()) - // delete - deleteResult, err := c.MilvusClient.Delete(ctx, &milvuspb.DeleteRequest{ - DbName: dbName, - CollectionName: collectionName, - Expr: integration.Int64Field + " in [1, 2]", - }) - err = merr.CheckRPCCall(deleteResult, err) - s.NoError(err) - // flush flushResp, err := c.MilvusClient.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, @@ -133,7 +124,7 @@ func (s *HelloMilvusSuite) TestHelloStreaming() { flushedSegment := lo.Filter(segments, func(info *datapb.SegmentInfo, i int) bool { return info.GetState() == commonpb.SegmentState_Flushed || info.GetState() == commonpb.SegmentState_Flushing }) - s.Equal(2, len(flushedSegment)) + s.Equal(1, len(flushedSegment)) s.Equal(int64(rowNum), segments[0].GetNumOfRows()) // load @@ -159,6 +150,16 @@ func (s *HelloMilvusSuite) TestHelloStreaming() { s.NoError(err) s.Equal(nq*topk, len(searchResult.GetResults().GetScores())) + // delete before query + // avoid L0 Compaction causing segment number unstable + deleteResult, err := c.MilvusClient.Delete(ctx, &milvuspb.DeleteRequest{ + DbName: dbName, + CollectionName: collectionName, + Expr: integration.Int64Field + " in [1, 2]", + }) + err = merr.CheckRPCCall(deleteResult, err) + s.NoError(err) + // query queryResult, err := c.MilvusClient.Query(ctx, &milvuspb.QueryRequest{ DbName: dbName,