diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go index 9890310f2d..cf6300f8a2 100644 --- a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction_executor.go @@ -59,11 +59,11 @@ func (c *compactionExecutor) toCompleteState(task compactor) { c.executing.Delete(task.getPlanID()) } -func (c *compactionExecutor) injectDone(planID UniqueID) { +func (c *compactionExecutor) injectDone(planID UniqueID, success bool) { c.completed.Delete(planID) task, loaded := c.completedCompactor.LoadAndDelete(planID) if loaded { - task.(compactor).injectDone() + task.(compactor).injectDone(success) } } @@ -129,7 +129,7 @@ func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string // remove all completed plans for vChannelName c.completed.Range(func(key interface{}, value interface{}) bool { if value.(*datapb.CompactionResult).GetChannel() == vChannelName { - c.injectDone(key.(UniqueID)) + c.injectDone(key.(UniqueID), true) log.Info("remove compaction results for dropped channel", zap.String("channel", vChannelName), zap.Int64("planID", key.(UniqueID))) diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction_executor_test.go index 020fa37f39..5e0e4e2e1f 100644 --- a/internal/datanode/compaction_executor_test.go +++ b/internal/datanode/compaction_executor_test.go @@ -20,8 +20,9 @@ import ( "context" "testing" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/proto/datapb" ) func TestCompactionExecutor(t *testing.T) { @@ -41,6 +42,20 @@ func TestCompactionExecutor(t *testing.T) { ex.stopTask(UniqueID(1)) }) + t.Run("Test injectDone", func(t *testing.T) { + ex := newCompactionExecutor() + mc := newMockCompactor(true) + ex.completed.Store(UniqueID(1), &datapb.CompactionResult{}) + ex.completedCompactor.Store(UniqueID(1), mc) + + ex.injectDone(UniqueID(1), false) + + _, ok := ex.completed.Load(UniqueID(1)) + assert.False(t, ok) + _, ok = ex.completedCompactor.Load(UniqueID(1)) + assert.False(t, ok) + }) + t.Run("Test start", func(t *testing.T) { ex := newCompactionExecutor() ctx, cancel := context.WithCancel(context.TODO()) @@ -153,7 +168,7 @@ func (mc *mockCompactor) compact() (*datapb.CompactionResult, error) { return nil, nil } -func (mc *mockCompactor) injectDone() { +func (mc *mockCompactor) injectDone(success bool) { } diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index d4e90c24c5..be033e40bf 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -57,7 +57,7 @@ type iterator = storage.Iterator type compactor interface { complete() compact() (*datapb.CompactionResult, error) - injectDone() + injectDone(success bool) stop() getPlanID() UniqueID getCollection() UniqueID @@ -123,7 +123,7 @@ func (t *compactionTask) complete() { func (t *compactionTask) stop() { t.cancel() <-t.done - t.injectDone() + t.injectDone(true) } func (t *compactionTask) getPlanID() UniqueID { @@ -689,10 +689,10 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { return pack, nil } -func (t *compactionTask) injectDone() { +func (t *compactionTask) injectDone(success bool) { if t.inject != nil { uninjectStart := time.Now() - t.inject.injectDone(true) + t.inject.injectDone(success) uninjectEnd := time.Now() log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart)))) } diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index a841c26b8c..68bda34388 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -767,7 +767,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { assert.NotEmpty(t, result.Field2StatslogPaths) assert.Equal(t, 0, mockfm.injectCount()) - task.injectDone() + task.injectDone(true) time.Sleep(500 * time.Millisecond) assert.Equal(t, 1, mockfm.injectCount()) } @@ -855,7 +855,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { assert.NotEmpty(t, result.Field2StatslogPaths) assert.Equal(t, 0, mockfm.injectCount()) - task.injectDone() + task.injectDone(true) time.Sleep(500 * time.Millisecond) assert.Equal(t, 1, mockfm.injectCount()) }) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 5311195f84..26db33fb8a 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -954,9 +954,10 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments if err = channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil { log.Ctx(ctx).Warn("fail to merge flushed segments", zap.Error(err)) status.Reason = err.Error() + node.compactionExecutor.injectDone(req.GetPlanID(), false) return status, nil } - node.compactionExecutor.injectDone(req.GetPlanID()) + node.compactionExecutor.injectDone(req.GetPlanID(), true) status.ErrorCode = commonpb.ErrorCode_Success return status, nil }