From 321c5c32e38df73c881d3ac8fbe91fb2cacff758 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Wed, 29 Nov 2023 10:50:29 +0800 Subject: [PATCH] fix: Separate schedule and check results loop (#28692) This PR: - Separates compaction scheduler and check results loop So that slow in check-loop doesn't influence execution. - Cleans compaction tasks when drop a vchannel so dropped-channel's compaction tasks won't be checked over and over again. - Skips meta change when meta's already changed, avoid panic - Remove not inuse injectDone(bool) parameter See also: #28628, #28209 --------- Signed-off-by: yangxuan --- internal/datacoord/compaction.go | 112 ++++++++++++------ internal/datacoord/compaction_scheduler.go | 2 +- internal/datacoord/compaction_trigger_test.go | 4 + .../datacoord/mock_compaction_plan_context.go | 33 ++++++ internal/datacoord/mock_test.go | 84 ------------- internal/datacoord/server_test.go | 78 +++++------- internal/datacoord/services.go | 1 + internal/datanode/compaction_executor.go | 28 +++-- internal/datanode/compaction_executor_test.go | 9 +- internal/datanode/compactor.go | 6 +- internal/datanode/flow_graph_dd_node.go | 2 +- internal/datanode/services.go | 7 +- 12 files changed, 173 insertions(+), 193 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 0a814c691e..97810d2521 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -52,6 +52,7 @@ type compactionPlanContext interface { isFull() bool // get compaction tasks by signal id getCompactionTasksBySignalID(signalID int64) []*compactionTask + removeTasksByChannel(channel string) } type compactionTaskState int8 @@ -93,15 +94,18 @@ func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask var _ compactionPlanContext = (*compactionPlanHandler)(nil) type compactionPlanHandler struct { - plans map[int64]*compactionTask // planID -> task - sessions *SessionManager + mu sync.RWMutex + plans map[int64]*compactionTask // planID -> task + meta *meta - chManager *ChannelManager - mu sync.RWMutex allocator allocator - quit chan struct{} - wg sync.WaitGroup + chManager *ChannelManager + sessions *SessionManager scheduler *CompactionScheduler + + stopCh chan struct{} + stopOnce sync.Once + stopWg sync.WaitGroup } func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta *meta, allocator allocator, @@ -118,20 +122,18 @@ func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta func (c *compactionPlanHandler) start() { interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second) - c.quit = make(chan struct{}) - c.wg.Add(1) + c.stopCh = make(chan struct{}) + c.stopWg.Add(2) go func() { - defer c.wg.Done() + defer c.stopWg.Done() checkResultTicker := time.NewTicker(interval) - scheduleTicker := time.NewTicker(200 * time.Millisecond) - log.Info("compaction handler start", zap.Any("check result interval", interval)) + log.Info("Compaction handler check result loop start", zap.Any("check result interval", interval)) defer checkResultTicker.Stop() - defer scheduleTicker.Stop() for { select { - case <-c.quit: - log.Info("compaction handler quit") + case <-c.stopCh: + log.Info("compaction handler check result loop quit") return case <-checkResultTicker.C: // deal results @@ -144,6 +146,22 @@ func (c *compactionPlanHandler) start() { } cancel() _ = c.updateCompaction(ts) + } + } + }() + + // saperate check results and schedule goroutine so that check results doesn't + // influence the schedule + go func() { + defer c.stopWg.Done() + scheduleTicker := time.NewTicker(200 * time.Millisecond) + defer scheduleTicker.Stop() + log.Info("compaction handler start schedule") + for { + select { + case <-c.stopCh: + log.Info("Compaction handler quit schedule") + return case <-scheduleTicker.C: // schedule queuing tasks @@ -159,8 +177,21 @@ func (c *compactionPlanHandler) start() { } func (c *compactionPlanHandler) stop() { - close(c.quit) - c.wg.Wait() + c.stopOnce.Do(func() { + close(c.stopCh) + }) + c.stopWg.Wait() +} + +func (c *compactionPlanHandler) removeTasksByChannel(channel string) { + c.mu.Lock() + defer c.mu.Unlock() + for id, task := range c.plans { + if task.triggerInfo.channel == channel { + c.scheduler.finish(task.dataNodeID, task.plan.PlanID) + delete(c.plans, id) + } + } } func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskOpt) { @@ -298,38 +329,53 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionPlan } func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) error { - // Also prepare metric updates. - _, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan, result) - if err != nil { - return err - } log := log.With(zap.Int64("planID", plan.GetPlanID())) + if len(result.GetSegments()) == 0 || len(result.GetSegments()) > 1 { + // should never happen + log.Warn("illegal compaction results") + return fmt.Errorf("Illegal compaction results: %v", result) + } - if err := c.meta.alterMetaStoreAfterCompaction(newSegment, modSegments); err != nil { - log.Warn("fail to alert meta store", zap.Error(err)) - return err + // Merge compaction has one and only one segment + newSegmentInfo := c.meta.GetHealthySegment(result.GetSegments()[0].SegmentID) + if newSegmentInfo != nil { + log.Info("meta has already been changed, skip meta change and retry sync segments") + } else { + // Also prepare metric updates. + _, modSegments, newSegment, metricMutation, err := c.meta.PrepareCompleteCompactionMutation(plan, result) + if err != nil { + return err + } + + if err := c.meta.alterMetaStoreAfterCompaction(newSegment, modSegments); err != nil { + log.Warn("fail to alert meta store", zap.Error(err)) + return err + } + + // Apply metrics after successful meta update. + metricMutation.commit() + + newSegmentInfo = newSegment } nodeID := c.plans[plan.GetPlanID()].dataNodeID req := &datapb.SyncSegmentsRequest{ PlanID: plan.PlanID, - CompactedTo: newSegment.GetID(), - CompactedFrom: newSegment.GetCompactionFrom(), - NumOfRows: newSegment.GetNumOfRows(), - StatsLogs: newSegment.GetStatslogs(), + CompactedTo: newSegmentInfo.GetID(), + CompactedFrom: newSegmentInfo.GetCompactionFrom(), + NumOfRows: newSegmentInfo.GetNumOfRows(), + StatsLogs: newSegmentInfo.GetStatslogs(), ChannelName: plan.GetChannel(), - PartitionId: newSegment.GetPartitionID(), - CollectionId: newSegment.GetCollectionID(), + PartitionId: newSegmentInfo.GetPartitionID(), + CollectionId: newSegmentInfo.GetCollectionID(), } log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID)) if err := c.sessions.SyncSegments(nodeID, req); err != nil { - log.Warn("handleCompactionResult: fail to sync segments with node, reverting metastore", + log.Warn("handleCompactionResult: fail to sync segments with node", zap.Int64("nodeID", nodeID), zap.Error(err)) return err } - // Apply metrics after successful meta update. - metricMutation.commit() log.Info("handleCompactionResult: success to handle merge compaction result") return nil diff --git a/internal/datacoord/compaction_scheduler.go b/internal/datacoord/compaction_scheduler.go index 2fade05264..c6f35e7ccf 100644 --- a/internal/datacoord/compaction_scheduler.go +++ b/internal/datacoord/compaction_scheduler.go @@ -24,7 +24,7 @@ type Scheduler interface { type CompactionScheduler struct { taskNumber *atomic.Int32 queuingTasks []*compactionTask - parallelTasks map[int64][]*compactionTask + parallelTasks map[int64][]*compactionTask // parallel by nodeID mu sync.RWMutex planHandler *compactionPlanHandler diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 99051242a3..bb5feb069e 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -38,6 +38,10 @@ type spyCompactionHandler struct { spyChan chan *datapb.CompactionPlan } +var _ compactionPlanContext = (*spyCompactionHandler)(nil) + +func (h *spyCompactionHandler) removeTasksByChannel(channel string) {} + // execCompactionPlan start to execute plan and return immediately func (h *spyCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { h.spyChan <- plan diff --git a/internal/datacoord/mock_compaction_plan_context.go b/internal/datacoord/mock_compaction_plan_context.go index e217ac9f17..b22041fb7f 100644 --- a/internal/datacoord/mock_compaction_plan_context.go +++ b/internal/datacoord/mock_compaction_plan_context.go @@ -192,6 +192,39 @@ func (_c *MockCompactionPlanContext_isFull_Call) RunAndReturn(run func() bool) * return _c } +// removeTasksByChannel provides a mock function with given fields: channel +func (_m *MockCompactionPlanContext) removeTasksByChannel(channel string) { + _m.Called(channel) +} + +// MockCompactionPlanContext_removeTasksByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'removeTasksByChannel' +type MockCompactionPlanContext_removeTasksByChannel_Call struct { + *mock.Call +} + +// removeTasksByChannel is a helper method to define mock.On call +// - channel string +func (_e *MockCompactionPlanContext_Expecter) removeTasksByChannel(channel interface{}) *MockCompactionPlanContext_removeTasksByChannel_Call { + return &MockCompactionPlanContext_removeTasksByChannel_Call{Call: _e.mock.On("removeTasksByChannel", channel)} +} + +func (_c *MockCompactionPlanContext_removeTasksByChannel_Call) Run(run func(channel string)) *MockCompactionPlanContext_removeTasksByChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *MockCompactionPlanContext_removeTasksByChannel_Call) Return() *MockCompactionPlanContext_removeTasksByChannel_Call { + _c.Call.Return() + return _c +} + +func (_c *MockCompactionPlanContext_removeTasksByChannel_Call) RunAndReturn(run func(string)) *MockCompactionPlanContext_removeTasksByChannel_Call { + _c.Call.Return(run) + return _c +} + // start provides a mock function with given fields: func (_m *MockCompactionPlanContext) start() { _m.Called() diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 3a0a932bdb..685e49ff6e 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -572,90 +572,6 @@ func (m *mockRootCoordClient) ReportImport(ctx context.Context, req *rootcoordpb return merr.Success(), nil } -type mockCompactionHandler struct { - methods map[string]interface{} -} - -func (h *mockCompactionHandler) start() { - if f, ok := h.methods["start"]; ok { - if ff, ok := f.(func()); ok { - ff() - return - } - } - panic("not implemented") -} - -func (h *mockCompactionHandler) stop() { - if f, ok := h.methods["stop"]; ok { - if ff, ok := f.(func()); ok { - ff() - return - } - } - panic("not implemented") -} - -// execCompactionPlan start to execute plan and return immediately -func (h *mockCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { - if f, ok := h.methods["execCompactionPlan"]; ok { - if ff, ok := f.(func(signal *compactionSignal, plan *datapb.CompactionPlan) error); ok { - return ff(signal, plan) - } - } - panic("not implemented") -} - -// // completeCompaction record the result of a compaction -// func (h *mockCompactionHandler) completeCompaction(result *datapb.CompactionResult) error { -// if f, ok := h.methods["completeCompaction"]; ok { -// if ff, ok := f.(func(result *datapb.CompactionResult) error); ok { -// return ff(result) -// } -// } -// panic("not implemented") -// } - -// getCompaction return compaction task. If planId does not exist, return nil. -func (h *mockCompactionHandler) getCompaction(planID int64) *compactionTask { - if f, ok := h.methods["getCompaction"]; ok { - if ff, ok := f.(func(planID int64) *compactionTask); ok { - return ff(planID) - } - } - panic("not implemented") -} - -// expireCompaction set the compaction state to expired -func (h *mockCompactionHandler) updateCompaction(ts Timestamp) error { - if f, ok := h.methods["expireCompaction"]; ok { - if ff, ok := f.(func(ts Timestamp) error); ok { - return ff(ts) - } - } - panic("not implemented") -} - -// isFull return true if the task pool is full -func (h *mockCompactionHandler) isFull() bool { - if f, ok := h.methods["isFull"]; ok { - if ff, ok := f.(func() bool); ok { - return ff() - } - } - panic("not implemented") -} - -// get compaction tasks by signal id -func (h *mockCompactionHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask { - if f, ok := h.methods["getCompactionTasksBySignalID"]; ok { - if ff, ok := f.(func(signalID int64) []*compactionTask); ok { - return ff(signalID) - } - } - panic("not implemented") -} - type mockCompactionTrigger struct { methods map[string]interface{} } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 37f306aa37..0315641611 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3063,15 +3063,10 @@ func TestGetCompactionState(t *testing.T) { svr := &Server{} svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.compactionHandler = &mockCompactionHandler{ - methods: map[string]interface{}{ - "getCompactionTasksBySignalID": func(signalID int64) []*compactionTask { - return []*compactionTask{ - {state: completed}, - } - }, - }, - } + mockHandler := NewMockCompactionPlanContext(t) + mockHandler.EXPECT().getCompactionTasksBySignalID(mock.Anything).Return( + []*compactionTask{{state: completed}}) + svr.compactionHandler = mockHandler resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{}) assert.NoError(t, err) @@ -3082,24 +3077,21 @@ func TestGetCompactionState(t *testing.T) { svr := &Server{} svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.compactionHandler = &mockCompactionHandler{ - methods: map[string]interface{}{ - "getCompactionTasksBySignalID": func(signalID int64) []*compactionTask { - return []*compactionTask{ - {state: executing}, - {state: executing}, - {state: executing}, - {state: completed}, - {state: completed}, - {state: failed, plan: &datapb.CompactionPlan{PlanID: 1}}, - {state: timeout, plan: &datapb.CompactionPlan{PlanID: 2}}, - {state: timeout}, - {state: timeout}, - {state: timeout}, - } - }, - }, - } + mockHandler := NewMockCompactionPlanContext(t) + mockHandler.EXPECT().getCompactionTasksBySignalID(mock.Anything).Return( + []*compactionTask{ + {state: executing}, + {state: executing}, + {state: executing}, + {state: completed}, + {state: completed}, + {state: failed, plan: &datapb.CompactionPlan{PlanID: 1}}, + {state: timeout, plan: &datapb.CompactionPlan{PlanID: 2}}, + {state: timeout}, + {state: timeout}, + {state: timeout}, + }) + svr.compactionHandler = mockHandler resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{CompactionID: 1}) assert.NoError(t, err) @@ -3187,18 +3179,15 @@ func TestGetCompactionStateWithPlans(t *testing.T) { svr := &Server{} svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.compactionHandler = &mockCompactionHandler{ - methods: map[string]interface{}{ - "getCompactionTasksBySignalID": func(signalID int64) []*compactionTask { - return []*compactionTask{ - { - triggerInfo: &compactionSignal{id: 1}, - state: executing, - }, - } + mockHandler := NewMockCompactionPlanContext(t) + mockHandler.EXPECT().getCompactionTasksBySignalID(mock.Anything).Return( + []*compactionTask{ + { + triggerInfo: &compactionSignal{id: 1}, + state: executing, }, - }, - } + }) + svr.compactionHandler = mockHandler resp, err := svr.GetCompactionStateWithPlans(context.TODO(), &milvuspb.GetCompactionPlansRequest{ CompactionID: 1, @@ -3211,19 +3200,6 @@ func TestGetCompactionStateWithPlans(t *testing.T) { t.Run("test get compaction state with closed server", func(t *testing.T) { svr := &Server{} svr.stateCode.Store(commonpb.StateCode_Abnormal) - svr.compactionHandler = &mockCompactionHandler{ - methods: map[string]interface{}{ - "getCompactionTasksBySignalID": func(signalID int64) []*compactionTask { - return []*compactionTask{ - { - triggerInfo: &compactionSignal{id: 1}, - state: executing, - }, - } - }, - }, - } - resp, err := svr.GetCompactionStateWithPlans(context.TODO(), &milvuspb.GetCompactionPlansRequest{ CompactionID: 1, }) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 3f0d743fd6..9b2566dbf6 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -589,6 +589,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual log.Warn("DropVChannel failed to ReleaseAndRemove", zap.String("channel", channel), zap.Error(err)) } s.segmentManager.DropSegmentsOfChannel(ctx, channel) + s.compactionHandler.removeTasksByChannel(channel) metrics.CleanupDataCoordNumStoredRows(collectionID) metrics.DataCoordCheckpointLag.DeleteLabelValues(fmt.Sprint(paramtable.GetNodeID()), channel) diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go index ed36a5d93b..b70bd2988b 100644 --- a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction_executor.go @@ -63,11 +63,11 @@ func (c *compactionExecutor) toCompleteState(task compactor) { c.executing.GetAndRemove(task.getPlanID()) } -func (c *compactionExecutor) injectDone(planID UniqueID, success bool) { +func (c *compactionExecutor) injectDone(planID UniqueID) { c.completed.GetAndRemove(planID) task, loaded := c.completedCompactor.GetAndRemove(planID) if loaded { - task.injectDone(success) + task.injectDone() } } @@ -116,34 +116,37 @@ func (c *compactionExecutor) stopTask(planID UniqueID) { } } -func (c *compactionExecutor) channelValidateForCompaction(vChannelName string) bool { +func (c *compactionExecutor) isValidChannel(channel string) bool { // if vchannel marked dropped, compaction should not proceed - return !c.dropped.Contain(vChannelName) + return !c.dropped.Contain(channel) } -func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string) { - c.dropped.Insert(vChannelName) +func (c *compactionExecutor) clearTasksByChannel(channel string) { + c.dropped.Insert(channel) + + // stop executing tasks of channel c.executing.Range(func(planID int64, task compactor) bool { - if task.getChannelName() == vChannelName { + if task.getChannelName() == channel { c.stopTask(planID) } return true }) - // remove all completed plans for vChannelName + // remove all completed plans of channel c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool { - if result.GetChannel() == vChannelName { - c.injectDone(planID, true) + if result.GetChannel() == channel { + c.injectDone(planID) log.Info("remove compaction results for dropped channel", - zap.String("channel", vChannelName), + zap.String("channel", channel), zap.Int64("planID", planID)) } return true }) } -func (c *compactionExecutor) getAllCompactionPlanResult() []*datapb.CompactionPlanResult { +func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionPlanResult { results := make([]*datapb.CompactionPlanResult, 0) + // get executing results c.executing.Range(func(planID int64, task compactor) bool { results = append(results, &datapb.CompactionPlanResult{ State: commonpb.CompactionState_Executing, @@ -152,6 +155,7 @@ func (c *compactionExecutor) getAllCompactionPlanResult() []*datapb.CompactionPl return true }) + // get completed results c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool { results = append(results, result) return true diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction_executor_test.go index cbf7b674a1..c8055b8d6f 100644 --- a/internal/datanode/compaction_executor_test.go +++ b/internal/datanode/compaction_executor_test.go @@ -83,10 +83,10 @@ func TestCompactionExecutor(t *testing.T) { {expected: false, channel: "ch2", desc: "in dropped"}, } ex := newCompactionExecutor() - ex.stopExecutingtaskByVChannelName("ch2") + ex.clearTasksByChannel("ch2") for _, test := range tests { t.Run(test.desc, func(t *testing.T) { - assert.Equal(t, test.expected, ex.channelValidateForCompaction(test.channel)) + assert.Equal(t, test.expected, ex.isValidChannel(test.channel)) }) } }) @@ -107,7 +107,7 @@ func TestCompactionExecutor(t *testing.T) { found = ex.executing.Contain(mc.getPlanID()) } - ex.stopExecutingtaskByVChannelName("mock") + ex.clearTasksByChannel("mock") select { case <-mc.ctx.Done(): @@ -142,8 +142,7 @@ func (mc *mockCompactor) complete() { mc.done <- struct{}{} } -func (mc *mockCompactor) injectDone(success bool) { -} +func (mc *mockCompactor) injectDone() {} func (mc *mockCompactor) compact() (*datapb.CompactionPlanResult, error) { if !mc.isvalid { diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index a7295dbc2f..ab137c9b05 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -56,7 +56,7 @@ type compactor interface { complete() // compact() (*datapb.CompactionResult, error) compact() (*datapb.CompactionPlanResult, error) - injectDone(success bool) + injectDone() stop() getPlanID() UniqueID getCollection() UniqueID @@ -118,7 +118,7 @@ func (t *compactionTask) complete() { func (t *compactionTask) stop() { t.cancel() <-t.done - t.injectDone(true) + t.injectDone() } func (t *compactionTask) getPlanID() UniqueID { @@ -629,7 +629,7 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) { return planResult, nil } -func (t *compactionTask) injectDone(success bool) { +func (t *compactionTask) injectDone() { for _, binlog := range t.plan.SegmentBinlogs { t.syncMgr.Unblock(binlog.SegmentID) } diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 6c8aac66c4..b79ea60cee 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -154,7 +154,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { ddn.dropMode.Store(true) log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vChannelName)) - ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vChannelName) + ddn.compactionExecutor.clearTasksByChannel(ddn.vChannelName) fgMsg.dropCollection = true pChan := funcutil.ToPhysicalChannel(ddn.vChannelName) diff --git a/internal/datanode/services.go b/internal/datanode/services.go index c27e0cbdfc..a048ccb6b4 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -245,7 +245,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "illegel compaction plan")), nil } - if !node.compactionExecutor.channelValidateForCompaction(req.GetChannel()) { + if !node.compactionExecutor.isValidChannel(req.GetChannel()) { log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channelName", req.GetChannel())) return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil } @@ -275,7 +275,7 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac Status: merr.Status(err), }, nil } - results := node.compactionExecutor.getAllCompactionPlanResult() + results := node.compactionExecutor.getAllCompactionResults() if len(results) > 0 { planIDs := lo.Map(results, func(result *datapb.CompactionPlanResult, i int) UniqueID { @@ -310,6 +310,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannelName()) if !ok { + node.compactionExecutor.clearTasksByChannel(req.GetChannelName()) err := merr.WrapErrChannelNotFound(req.GetChannelName()) log.Warn("failed to sync segments", zap.Error(err)) return merr.Status(err), nil @@ -322,7 +323,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments } bfs := metacache.NewBloomFilterSet(pks...) ds.metacache.CompactSegments(req.GetCompactedTo(), req.GetPartitionId(), req.GetNumOfRows(), bfs, req.GetCompactedFrom()...) - node.compactionExecutor.injectDone(req.GetPlanID(), true) + node.compactionExecutor.injectDone(req.GetPlanID()) return merr.Success(), nil }