From 86a36b105a041cbdeb6a49ffe995ad9c503c94df Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Fri, 14 Jun 2024 14:34:01 +0800 Subject: [PATCH] enhance: Tidy compaction executor (#33778) Move compaction executor to compaction pacakge. issue: https://github.com/milvus-io/milvus/issues/32451 Signed-off-by: bigsheeper --- .../executor.go} | 134 ++++++++++-------- .../executor_test.go} | 39 +++-- internal/datanode/data_node.go | 9 +- internal/datanode/data_sync_service.go | 3 +- internal/datanode/flow_graph_dd_node.go | 9 +- internal/datanode/flow_graph_dd_node_test.go | 7 +- internal/datanode/services.go | 16 +-- internal/datanode/services_test.go | 71 +++++----- 8 files changed, 156 insertions(+), 132 deletions(-) rename internal/datanode/{compaction_executor.go => compaction/executor.go} (57%) rename internal/datanode/{compaction_executor_test.go => compaction/executor_test.go} (85%) diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction/executor.go similarity index 57% rename from internal/datanode/compaction_executor.go rename to internal/datanode/compaction/executor.go index c044311733..f0e4a427de 100644 --- a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction/executor.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package datanode +package compaction import ( "context" @@ -24,9 +24,9 @@ import ( "go.uber.org/zap" "golang.org/x/sync/semaphore" - "github.com/milvus-io/milvus/internal/datanode/compaction" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -35,11 +35,21 @@ const ( maxParallelTaskNum = 10 ) -type compactionExecutor struct { - executing *typeutil.ConcurrentMap[int64, compaction.Compactor] // planID to compactor - completedCompactor *typeutil.ConcurrentMap[int64, compaction.Compactor] // planID to compactor +type Executor interface { + Start(ctx context.Context) + Execute(task Compactor) + Slots() int64 + RemoveTask(planID int64) + GetResults(planID int64) []*datapb.CompactionPlanResult + DiscardByDroppedChannel(channel string) + DiscardPlan(channel string) +} + +type executor struct { + executing *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor + completedCompactor *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult - taskCh chan compaction.Compactor + taskCh chan Compactor taskSem *semaphore.Weighted dropped *typeutil.ConcurrentSet[string] // vchannel dropped @@ -48,58 +58,62 @@ type compactionExecutor struct { resultGuard sync.RWMutex } -func newCompactionExecutor() *compactionExecutor { - return &compactionExecutor{ - executing: typeutil.NewConcurrentMap[int64, compaction.Compactor](), - completedCompactor: typeutil.NewConcurrentMap[int64, compaction.Compactor](), +func NewExecutor() *executor { + return &executor{ + executing: typeutil.NewConcurrentMap[int64, Compactor](), + completedCompactor: typeutil.NewConcurrentMap[int64, Compactor](), completed: typeutil.NewConcurrentMap[int64, *datapb.CompactionPlanResult](), - taskCh: make(chan compaction.Compactor, maxTaskQueueNum), + taskCh: make(chan Compactor, maxTaskQueueNum), taskSem: semaphore.NewWeighted(maxParallelTaskNum), dropped: typeutil.NewConcurrentSet[string](), } } -func (c *compactionExecutor) execute(task compaction.Compactor) { - c.taskCh <- task - c.toExecutingState(task) +func (e *executor) Execute(task Compactor) { + e.taskCh <- task + e.toExecutingState(task) } -func (c *compactionExecutor) toExecutingState(task compaction.Compactor) { - c.executing.Insert(task.GetPlanID(), task) +func (e *executor) Slots() int64 { + return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - int64(e.executing.Len()) } -func (c *compactionExecutor) toCompleteState(task compaction.Compactor) { +func (e *executor) toExecutingState(task Compactor) { + e.executing.Insert(task.GetPlanID(), task) +} + +func (e *executor) toCompleteState(task Compactor) { task.Complete() - c.executing.GetAndRemove(task.GetPlanID()) + e.executing.GetAndRemove(task.GetPlanID()) } -func (c *compactionExecutor) removeTask(planID UniqueID) { - c.completed.GetAndRemove(planID) - task, loaded := c.completedCompactor.GetAndRemove(planID) +func (e *executor) RemoveTask(planID int64) { + e.completed.GetAndRemove(planID) + task, loaded := e.completedCompactor.GetAndRemove(planID) if loaded { log.Info("Compaction task removed", zap.Int64("planID", planID), zap.String("channel", task.GetChannelName())) } } -func (c *compactionExecutor) start(ctx context.Context) { +func (e *executor) Start(ctx context.Context) { for { select { case <-ctx.Done(): return - case task := <-c.taskCh: - err := c.taskSem.Acquire(ctx, 1) + case task := <-e.taskCh: + err := e.taskSem.Acquire(ctx, 1) if err != nil { return } go func() { - defer c.taskSem.Release(1) - c.executeTask(task) + defer e.taskSem.Release(1) + e.executeTask(task) }() } } } -func (c *compactionExecutor) executeTask(task compaction.Compactor) { +func (e *executor) executeTask(task Compactor) { log := log.With( zap.Int64("planID", task.GetPlanID()), zap.Int64("Collection", task.GetCollection()), @@ -107,7 +121,7 @@ func (c *compactionExecutor) executeTask(task compaction.Compactor) { ) defer func() { - c.toCompleteState(task) + e.toCompleteState(task) }() log.Info("start to execute compaction") @@ -117,45 +131,45 @@ func (c *compactionExecutor) executeTask(task compaction.Compactor) { log.Warn("compaction task failed", zap.Error(err)) return } - c.completed.Insert(result.GetPlanID(), result) - c.completedCompactor.Insert(result.GetPlanID(), task) + e.completed.Insert(result.GetPlanID(), result) + e.completedCompactor.Insert(result.GetPlanID(), task) log.Info("end to execute compaction") } -func (c *compactionExecutor) stopTask(planID UniqueID) { - task, loaded := c.executing.GetAndRemove(planID) +func (e *executor) stopTask(planID int64) { + task, loaded := e.executing.GetAndRemove(planID) if loaded { log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.GetChannelName())) task.Stop() } } -func (c *compactionExecutor) isValidChannel(channel string) bool { +func (e *executor) isValidChannel(channel string) bool { // if vchannel marked dropped, compaction should not proceed - return !c.dropped.Contain(channel) + return !e.dropped.Contain(channel) } -func (c *compactionExecutor) discardByDroppedChannel(channel string) { - c.dropped.Insert(channel) - c.discardPlan(channel) +func (e *executor) DiscardByDroppedChannel(channel string) { + e.dropped.Insert(channel) + e.DiscardPlan(channel) } -func (c *compactionExecutor) discardPlan(channel string) { - c.resultGuard.Lock() - defer c.resultGuard.Unlock() +func (e *executor) DiscardPlan(channel string) { + e.resultGuard.Lock() + defer e.resultGuard.Unlock() - c.executing.Range(func(planID int64, task compaction.Compactor) bool { + e.executing.Range(func(planID int64, task Compactor) bool { if task.GetChannelName() == channel { - c.stopTask(planID) + e.stopTask(planID) } return true }) // remove all completed plans of channel - c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool { + e.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool { if result.GetChannel() == channel { - c.removeTask(planID) + e.RemoveTask(planID) log.Info("remove compaction plan and results", zap.String("channel", channel), zap.Int64("planID", planID)) @@ -164,10 +178,18 @@ func (c *compactionExecutor) discardPlan(channel string) { }) } -func (c *compactionExecutor) getCompactionResult(planID int64) *datapb.CompactionPlanResult { - c.resultGuard.RLock() - defer c.resultGuard.RUnlock() - _, ok := c.executing.Get(planID) +func (e *executor) GetResults(planID int64) []*datapb.CompactionPlanResult { + if planID != 0 { + result := e.getCompactionResult(planID) + return []*datapb.CompactionPlanResult{result} + } + return e.getAllCompactionResults() +} + +func (e *executor) getCompactionResult(planID int64) *datapb.CompactionPlanResult { + e.resultGuard.RLock() + defer e.resultGuard.RUnlock() + _, ok := e.executing.Get(planID) if ok { result := &datapb.CompactionPlanResult{ State: datapb.CompactionTaskState_executing, @@ -175,7 +197,7 @@ func (c *compactionExecutor) getCompactionResult(planID int64) *datapb.Compactio } return result } - result, ok2 := c.completed.Get(planID) + result, ok2 := e.completed.Get(planID) if !ok2 { return &datapb.CompactionPlanResult{ PlanID: planID, @@ -185,9 +207,9 @@ func (c *compactionExecutor) getCompactionResult(planID int64) *datapb.Compactio return result } -func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanResult { - c.resultGuard.RLock() - defer c.resultGuard.RUnlock() +func (e *executor) getAllCompactionResults() []*datapb.CompactionPlanResult { + e.resultGuard.RLock() + defer e.resultGuard.RUnlock() var ( executing []int64 completed []int64 @@ -195,7 +217,7 @@ func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanR ) results := make([]*datapb.CompactionPlanResult, 0) // get executing results - c.executing.Range(func(planID int64, task compaction.Compactor) bool { + e.executing.Range(func(planID int64, task Compactor) bool { executing = append(executing, planID) results = append(results, &datapb.CompactionPlanResult{ State: datapb.CompactionTaskState_executing, @@ -205,7 +227,7 @@ func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanR }) // get completed results - c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool { + e.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool { completed = append(completed, planID) results = append(results, result) @@ -217,8 +239,8 @@ func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanR // remove level zero results lo.ForEach(completedLevelZero, func(planID int64, _ int) { - c.completed.Remove(planID) - c.completedCompactor.Remove(planID) + e.completed.Remove(planID) + e.completedCompactor.Remove(planID) }) if len(results) > 0 { diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction/executor_test.go similarity index 85% rename from internal/datanode/compaction_executor_test.go rename to internal/datanode/compaction/executor_test.go index fe1e164afb..164852c8e8 100644 --- a/internal/datanode/compaction_executor_test.go +++ b/internal/datanode/compaction/executor_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package datanode +package compaction import ( "context" @@ -24,18 +24,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/milvus-io/milvus/internal/datanode/compaction" "github.com/milvus-io/milvus/internal/proto/datapb" ) func TestCompactionExecutor(t *testing.T) { t.Run("Test execute", func(t *testing.T) { planID := int64(1) - mockC := compaction.NewMockCompactor(t) + mockC := NewMockCompactor(t) mockC.EXPECT().GetPlanID().Return(planID).Once() mockC.EXPECT().GetChannelName().Return("ch1").Once() - executor := newCompactionExecutor() - executor.execute(mockC) + executor := NewExecutor() + executor.Execute(mockC) assert.EqualValues(t, 1, len(executor.taskCh)) assert.EqualValues(t, 1, executor.executing.Len()) @@ -44,11 +43,11 @@ func TestCompactionExecutor(t *testing.T) { executor.stopTask(planID) }) - t.Run("Test start", func(t *testing.T) { - ex := newCompactionExecutor() + t.Run("Test Start", func(t *testing.T) { + ex := NewExecutor() ctx, cancel := context.WithCancel(context.TODO()) cancel() - go ex.start(ctx) + go ex.Start(ctx) }) t.Run("Test executeTask", func(t *testing.T) { @@ -61,10 +60,10 @@ func TestCompactionExecutor(t *testing.T) { {false, "compact return error"}, } - ex := newCompactionExecutor() + ex := NewExecutor() for _, test := range tests { t.Run(test.description, func(t *testing.T) { - mockC := compaction.NewMockCompactor(t) + mockC := NewMockCompactor(t) mockC.EXPECT().GetPlanID().Return(int64(1)) mockC.EXPECT().GetCollection().Return(int64(1)) mockC.EXPECT().GetChannelName().Return("ch1") @@ -100,8 +99,8 @@ func TestCompactionExecutor(t *testing.T) { {expected: true, channel: "ch1", desc: "no in dropped"}, {expected: false, channel: "ch2", desc: "in dropped"}, } - ex := newCompactionExecutor() - ex.discardByDroppedChannel("ch2") + ex := NewExecutor() + ex.DiscardByDroppedChannel("ch2") for _, test := range tests { t.Run(test.desc, func(t *testing.T) { assert.Equal(t, test.expected, ex.isValidChannel(test.channel)) @@ -110,26 +109,26 @@ func TestCompactionExecutor(t *testing.T) { }) t.Run("test stop vchannel tasks", func(t *testing.T) { - ex := newCompactionExecutor() - mc := compaction.NewMockCompactor(t) + ex := NewExecutor() + mc := NewMockCompactor(t) mc.EXPECT().GetPlanID().Return(int64(1)) mc.EXPECT().GetChannelName().Return("mock") mc.EXPECT().Compact().Return(&datapb.CompactionPlanResult{PlanID: 1}, nil).Maybe() mc.EXPECT().Stop().Return().Once() - ex.execute(mc) + ex.Execute(mc) require.True(t, ex.executing.Contain(int64(1))) - ex.discardByDroppedChannel("mock") + ex.DiscardByDroppedChannel("mock") assert.True(t, ex.dropped.Contain("mock")) assert.False(t, ex.executing.Contain(int64(1))) }) - t.Run("test getAllCompactionResults", func(t *testing.T) { - ex := newCompactionExecutor() + t.Run("test GetAllCompactionResults", func(t *testing.T) { + ex := NewExecutor() - mockC := compaction.NewMockCompactor(t) + mockC := NewMockCompactor(t) ex.executing.Insert(int64(1), mockC) ex.completedCompactor.Insert(int64(2), mockC) @@ -150,7 +149,7 @@ func TestCompactionExecutor(t *testing.T) { require.Equal(t, 2, ex.completedCompactor.Len()) require.Equal(t, 1, ex.executing.Len()) - result := ex.getAllCompactionResults() + result := ex.GetResults(0) assert.Equal(t, 3, len(result)) for _, res := range result { diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 922ba5cd70..23e968db51 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/broker" + "github.com/milvus-io/milvus/internal/datanode/compaction" "github.com/milvus-io/milvus/internal/datanode/importv2" "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/datanode/writebuffer" @@ -99,7 +100,7 @@ type DataNode struct { clearSignal chan string // vchannel name segmentCache *Cache - compactionExecutor *compactionExecutor + compactionExecutor compaction.Executor timeTickSender *timeTickSender channelCheckpointUpdater *channelCheckpointUpdater @@ -142,7 +143,7 @@ func NewDataNode(ctx context.Context, factory dependency.Factory) *DataNode { dataCoord: nil, factory: factory, segmentCache: newCache(), - compactionExecutor: newCompactionExecutor(), + compactionExecutor: compaction.NewExecutor(), clearSignal: make(chan string, 100), @@ -327,7 +328,7 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) { func (node *DataNode) tryToReleaseFlowgraph(channel string) { log.Info("try to release flowgraph", zap.String("channel", channel)) if node.compactionExecutor != nil { - node.compactionExecutor.discardPlan(channel) + node.compactionExecutor.DiscardPlan(channel) } if node.flowgraphManager != nil { node.flowgraphManager.RemoveFlowgraph(channel) @@ -396,7 +397,7 @@ func (node *DataNode) Start() error { node.stopWaiter.Add(1) go node.BackGroundGC(node.clearSignal) - go node.compactionExecutor.start(node.ctx) + go node.compactionExecutor.Start(node.ctx) go node.importScheduler.Start() diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 73dcd166e5..10c2534f17 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/broker" + "github.com/milvus-io/milvus/internal/datanode/compaction" "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" @@ -65,7 +66,7 @@ type dataSyncService struct { flushCh chan flushMsg resendTTCh chan resendTTMsg // chan to ask for resending DataNode time tick message. timetickSender *timeTickSender // reference to timeTickSender - compactor *compactionExecutor // reference to compaction executor + compactor compaction.Executor // reference to compaction executor flushingSegCache *Cache // a guarding cache stores currently flushing segment ids clearSignal chan<- string // signal channel to notify flowgraph close for collection/partition drop msg consumed diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index db78853f95..f829f1c2ae 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -28,6 +28,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/datanode/compaction" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/pkg/log" @@ -64,7 +65,7 @@ type ddNode struct { vChannelName string dropMode atomic.Value - compactionExecutor *compactionExecutor + compactionExecutor compaction.Executor // for recovery growingSegInfo map[UniqueID]*datapb.SegmentInfo // segmentID @@ -149,7 +150,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { ddn.dropMode.Store(true) log.Info("Stop compaction for dropped channel", zap.String("channel", ddn.vChannelName)) - ddn.compactionExecutor.discardByDroppedChannel(ddn.vChannelName) + ddn.compactionExecutor.DiscardByDroppedChannel(ddn.vChannelName) fgMsg.dropCollection = true } @@ -280,7 +281,7 @@ func (ddn *ddNode) Close() { } func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppedSegmentIDs []UniqueID, - sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, compactor *compactionExecutor, + sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, executor compaction.Executor, ) (*ddNode, error) { baseNode := BaseNode{} baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()) @@ -294,7 +295,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppe growingSegInfo: make(map[UniqueID]*datapb.SegmentInfo, len(growingSegments)), droppedSegmentIDs: droppedSegmentIDs, vChannelName: vChannelName, - compactionExecutor: compactor, + compactionExecutor: executor, } dd.dropMode.Store(false) diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index f191c34e8e..fc80d5f49b 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/datanode/compaction" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -76,7 +77,7 @@ func TestFlowGraph_DDNode_newDDNode(t *testing.T) { droppedSegIDs, test.inSealedSegs, test.inGrowingSegs, - newCompactionExecutor(), + compaction.NewExecutor(), ) require.NoError(t, err) require.NotNil(t, ddNode) @@ -141,7 +142,7 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { ctx: context.Background(), collectionID: test.ddnCollID, vChannelName: "ddn_drop_msg", - compactionExecutor: newCompactionExecutor(), + compactionExecutor: compaction.NewExecutor(), } var dropCollMsg msgstream.TsMsg = &msgstream.DropCollectionMsg{ @@ -194,7 +195,7 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { ctx: context.Background(), collectionID: test.ddnCollID, vChannelName: "ddn_drop_msg", - compactionExecutor: newCompactionExecutor(), + compactionExecutor: compaction.NewExecutor(), } var dropPartMsg msgstream.TsMsg = &msgstream.DropPartitionMsg{ diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 0014beb649..33a512a8f8 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -250,7 +250,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan return merr.Status(merr.WrapErrParameterInvalidMsg("Unknown compaction type: %v", req.GetType().String())), nil } - node.compactionExecutor.execute(task) + node.compactionExecutor.Execute(task) return merr.Success(), nil } @@ -264,13 +264,7 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac }, nil } - results := make([]*datapb.CompactionPlanResult, 0) - if req.GetPlanID() != 0 { - result := node.compactionExecutor.getCompactionResult(req.GetPlanID()) - results = append(results, result) - } else { - results = node.compactionExecutor.getAllCompactionResults() - } + results := node.compactionExecutor.GetResults(req.GetPlanID()) return &datapb.CompactionStateResponse{ Status: merr.Success(), Results: results, @@ -301,7 +295,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName()) if !ok { - node.compactionExecutor.discardPlan(req.GetChannelName()) + node.compactionExecutor.DiscardPlan(req.GetChannelName()) err := merr.WrapErrChannelNotFound(req.GetChannelName()) log.Warn("failed to get flow graph service", zap.Error(err)) return merr.Status(err), nil @@ -562,7 +556,7 @@ func (node *DataNode) QuerySlot(ctx context.Context, req *datapb.QuerySlotReques return &datapb.QuerySlotResponse{ Status: merr.Success(), - NumSlots: Params.DataNodeCfg.SlotCap.GetAsInt64() - int64(node.compactionExecutor.executing.Len()), + NumSlots: node.compactionExecutor.Slots(), }, nil } @@ -571,7 +565,7 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo return merr.Status(err), nil } - node.compactionExecutor.removeTask(req.GetPlanID()) + node.compactionExecutor.RemoveTask(req.GetPlanID()) log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID())) return merr.Success(), nil } diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 5f79aa0186..904154c904 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -19,8 +19,8 @@ package datanode import ( "context" "math/rand" - "sync" "testing" + "time" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -160,44 +160,49 @@ func (s *DataNodeServicesSuite) TestGetComponentStates() { func (s *DataNodeServicesSuite) TestGetCompactionState() { s.Run("success", func() { + const ( + collection = int64(100) + channel = "ch-0" + ) + mockC := compaction.NewMockCompactor(s.T()) - s.node.compactionExecutor.executing.Insert(int64(3), mockC) - - mockC2 := compaction.NewMockCompactor(s.T()) - s.node.compactionExecutor.executing.Insert(int64(2), mockC2) - - s.node.compactionExecutor.completed.Insert(int64(1), &datapb.CompactionPlanResult{ + mockC.EXPECT().GetPlanID().Return(int64(1)) + mockC.EXPECT().GetCollection().Return(collection) + mockC.EXPECT().GetChannelName().Return(channel) + mockC.EXPECT().Complete().Return() + mockC.EXPECT().Compact().Return(&datapb.CompactionPlanResult{ PlanID: 1, State: datapb.CompactionTaskState_completed, - Segments: []*datapb.CompactionSegment{ - {SegmentID: 10}, - }, - }) + }, nil) + s.node.compactionExecutor.Execute(mockC) - s.node.compactionExecutor.completed.Insert(int64(4), &datapb.CompactionPlanResult{ - PlanID: 4, - Type: datapb.CompactionType_Level0DeleteCompaction, - State: datapb.CompactionTaskState_completed, - }) + mockC2 := compaction.NewMockCompactor(s.T()) + mockC2.EXPECT().GetPlanID().Return(int64(2)) + mockC2.EXPECT().GetCollection().Return(collection) + mockC2.EXPECT().GetChannelName().Return(channel) + mockC2.EXPECT().Complete().Return() + mockC2.EXPECT().Compact().Return(&datapb.CompactionPlanResult{ + PlanID: 2, + State: datapb.CompactionTaskState_failed, + }, nil) + s.node.compactionExecutor.Execute(mockC2) - stat, err := s.node.GetCompactionState(s.ctx, nil) - s.Assert().NoError(err) - s.Assert().Equal(4, len(stat.GetResults())) - - var mu sync.RWMutex - cnt := 0 - for _, v := range stat.GetResults() { - if v.GetState() == datapb.CompactionTaskState_completed { - mu.Lock() - cnt++ - mu.Unlock() + s.Eventually(func() bool { + stat, err := s.node.GetCompactionState(s.ctx, nil) + s.Assert().NoError(err) + s.Assert().Equal(2, len(stat.GetResults())) + doneCnt := 0 + failCnt := 0 + for _, res := range stat.GetResults() { + if res.GetState() == datapb.CompactionTaskState_completed { + doneCnt++ + } + if res.GetState() == datapb.CompactionTaskState_failed { + failCnt++ + } } - } - mu.Lock() - s.Assert().Equal(2, cnt) - mu.Unlock() - - s.Assert().Equal(1, s.node.compactionExecutor.completed.Len()) + return doneCnt == 1 && failCnt == 1 + }, 5*time.Second, 10*time.Millisecond) }) s.Run("unhealthy", func() {