From 055dd7ea1d3aeabfdc59db43bce7e890bd5a4852 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 29 Mar 2024 10:53:12 +0800 Subject: [PATCH] fix: Clear compaction tasks when release channel (#31694) See also: #31648 pr: #31666 Signed-off-by: yangxuan --- internal/datanode/compaction_executor.go | 69 +++++++++++++++++-- internal/datanode/compaction_executor_test.go | 4 +- internal/datanode/data_node.go | 9 +-- internal/datanode/flow_graph_dd_node.go | 2 +- internal/datanode/services.go | 25 +------ 5 files changed, 71 insertions(+), 38 deletions(-) diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go index 272edef586..5a25187223 100644 --- a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction_executor.go @@ -18,9 +18,11 @@ package datanode import ( "context" + "sync" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -36,6 +38,10 @@ type compactionExecutor struct { completed *typeutil.ConcurrentMap[int64, *datapb.CompactionResult] // planID to CompactionResult taskCh chan compactor dropped *typeutil.ConcurrentSet[string] // vchannel dropped + + // To prevent concurrency of release channel and compaction get results + // all released channel's compaction tasks will be discarded + resultGuard sync.RWMutex } func newCompactionExecutor() *compactionExecutor { @@ -120,22 +126,71 @@ func (c *compactionExecutor) channelValidateForCompaction(vChannelName string) b return !c.dropped.Contain(vChannelName) } -func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string) { - c.dropped.Insert(vChannelName) +func (c *compactionExecutor) discardByDroppedChannel(channel string) { + c.dropped.Insert(channel) + c.discardPlan(channel) +} + +func (c *compactionExecutor) discardPlan(channel string) { + c.resultGuard.Lock() + defer c.resultGuard.Unlock() + c.executing.Range(func(planID int64, task compactor) bool { - if task.getChannelName() == vChannelName { + if task.getChannelName() == channel { c.stopTask(planID) } return true }) - // remove all completed plans for vChannelName + + // remove all completed plans for channel c.completed.Range(func(planID int64, result *datapb.CompactionResult) bool { - if result.GetChannel() == vChannelName { + if result.GetChannel() == channel { c.injectDone(planID, true) - log.Info("remove compaction results for dropped channel", - zap.String("channel", vChannelName), + log.Info("remove compaction plan", + zap.String("channel", channel), zap.Int64("planID", planID)) } return true }) } + +func (c *compactionExecutor) getAllCompactionResults() []*datapb.CompactionStateResult { + c.resultGuard.RLock() + defer c.resultGuard.RUnlock() + + var ( + executing []int64 + completed []int64 + ) + results := make([]*datapb.CompactionStateResult, 0) + // get executing results + c.executing.Range(func(planID int64, task compactor) bool { + executing = append(executing, planID) + results = append(results, &datapb.CompactionStateResult{ + State: commonpb.CompactionState_Executing, + PlanID: planID, + }) + return true + }) + + // get completed results + c.completed.Range(func(planID int64, result *datapb.CompactionResult) bool { + completed = append(completed, planID) + results = append(results, &datapb.CompactionStateResult{ + State: commonpb.CompactionState_Completed, + PlanID: planID, + Result: result, + }) + + return true + }) + + if len(results) > 0 { + log.Info("DataNode Compaction results", + zap.Int64s("executing", executing), + zap.Int64s("completed", completed), + ) + } + + return results +} diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction_executor_test.go index 107eddcd16..bf1a9add4a 100644 --- a/internal/datanode/compaction_executor_test.go +++ b/internal/datanode/compaction_executor_test.go @@ -83,7 +83,7 @@ func TestCompactionExecutor(t *testing.T) { {expected: false, channel: "ch2", desc: "in dropped"}, } ex := newCompactionExecutor() - ex.stopExecutingtaskByVChannelName("ch2") + ex.discardByDroppedChannel("ch2") for _, test := range tests { t.Run(test.desc, func(t *testing.T) { assert.Equal(t, test.expected, ex.channelValidateForCompaction(test.channel)) @@ -107,7 +107,7 @@ func TestCompactionExecutor(t *testing.T) { found = ex.executing.Contain(mc.getPlanID()) } - ex.stopExecutingtaskByVChannelName("mock") + ex.discardByDroppedChannel("mock") select { case <-mc.ctx.Done(): diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 15a4a2f8c4..10d283dbd9 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -278,10 +278,11 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) { node.handleWatchInfo(e, string(evt.Kv.Key), evt.Kv.Value) } -// tryToReleaseFlowgraph tries to release a flowgraph -func (node *DataNode) tryToReleaseFlowgraph(vChanName string) { - log.Info("try to release flowgraph", zap.String("vChanName", vChanName)) - node.flowgraphManager.release(vChanName) +// tryToReleaseFlowgraph tries to release a flowgraph and tidy channel related meta +func (node *DataNode) tryToReleaseFlowgraph(channel string) { + log.Info("try to release flowgraph", zap.String("channel", channel)) + node.compactionExecutor.discardPlan(channel) + node.flowgraphManager.release(channel) } // BackGroundGC runs in background to release datanode resources diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index f110f3500a..0ab94d71fe 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -154,7 +154,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { ddn.dropMode.Store(true) log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vChannelName)) - ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vChannelName) + ddn.compactionExecutor.discardByDroppedChannel(ddn.vChannelName) fgMsg.dropCollection = true pChan := funcutil.ToPhysicalChannel(ddn.vChannelName) diff --git a/internal/datanode/services.go b/internal/datanode/services.go index d9f7ac9097..943da13d90 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -27,7 +27,6 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -308,29 +307,7 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac Status: merr.Status(err), }, nil } - results := make([]*datapb.CompactionStateResult, 0) - node.compactionExecutor.executing.Range(func(planID int64, task compactor) bool { - results = append(results, &datapb.CompactionStateResult{ - State: commonpb.CompactionState_Executing, - PlanID: planID, - }) - return true - }) - node.compactionExecutor.completed.Range(func(planID int64, result *datapb.CompactionResult) bool { - results = append(results, &datapb.CompactionStateResult{ - State: commonpb.CompactionState_Completed, - PlanID: planID, - Result: result, - }) - return true - }) - - if len(results) > 0 { - planIDs := lo.Map(results, func(result *datapb.CompactionStateResult, i int) UniqueID { - return result.GetPlanID() - }) - log.Info("Compaction results", zap.Int64s("planIDs", planIDs)) - } + results := node.compactionExecutor.getAllCompactionResults() return &datapb.CompactionStateResponse{ Status: merr.Success(), Results: results,