From bb2de0d964f30c934a044b2c535dfdcf3b327377 Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Mon, 4 Mar 2024 16:52:59 +0800
Subject: [PATCH] fix: [cherry-pick] Clear DN unknown compaction tasks (#30972)

If DC restarts, those unknown compaction tasks will never get a callback
in DN, so the segments in those compaction tasks stay locked, unable to
sync or be compacted again, blocking checkpoint (cp) advance and
compaction execution.

See also: #30137
pr: #30850

Signed-off-by: yangxuan
---
 internal/datacoord/compaction.go      | 109 ++++++++++++++++----
 internal/datacoord/compaction_test.go |   1 +
 internal/datacoord/session_manager.go |  13 +--
 internal/datanode/services.go         |  10 ++-
 internal/datanode/services_test.go    |  19 +++--
 5 files changed, 93 insertions(+), 59 deletions(-)

diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index 3fbb31c176..099cde2786 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/errors"
+	"github.com/samber/lo"
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -30,6 +31,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // TODO this num should be determined by resources of datanode, for now, we set to a fixed value for simple
@@ -336,67 +338,90 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 	// for DC might add new task while GetCompactionState.
 	executingTasks := c.getTasksByState(executing)
 	timeoutTasks := c.getTasksByState(timeout)
-	planStates := c.sessions.GetCompactionState()
+	planStates := c.sessions.GetCompactionPlanResults()
+	cachedPlans := []int64{}
 
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	for _, task := range executingTasks {
-		stateResult, ok := planStates[task.plan.PlanID]
-		state := stateResult.GetState()
+		log := log.With(zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
+		// stateResult, ok := planStates[task.plan.PlanID]
+		// state := stateResult.GetState()
 		planID := task.plan.PlanID
-		// check whether the state of CompactionPlan is working
-		if ok {
-			if state == commonpb.CompactionState_Completed {
-				log.Info("compaction completed", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
-				err := c.completeCompaction(stateResult.GetResult())
-				if err != nil {
-					log.Warn("fail to complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID), zap.Error(err))
-				}
-				continue
-			}
-			// check wether the CompactionPlan is timeout
-			if state == commonpb.CompactionState_Executing && !c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
-				continue
-			}
-			log.Warn("compaction timeout",
-				zap.Int64("planID", task.plan.PlanID),
-				zap.Int64("nodeID", task.dataNodeID),
-				zap.Uint64("startTime", task.plan.GetStartTime()),
-				zap.Uint64("now", ts),
-			)
-			c.plans[planID] = c.plans[planID].shadowClone(setState(timeout))
-			continue
-		}
+		cachedPlans = append(cachedPlans, planID)
+		if nodePlan, ok := planStates[planID]; ok {
+			planResult := nodePlan.B
 
-		log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
-		c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
-		c.setSegmentsCompacting(task.plan, false)
-		c.executingTaskNum--
-		c.releaseQueue(task.dataNodeID)
+			switch planResult.GetState() {
+			case commonpb.CompactionState_Completed:
+				log.Info("start to complete compaction")
+				if err := c.completeCompaction(planResult.GetResult()); err != nil {
+					log.Warn("fail to complete compaction", zap.Error(err))
+				}
+			case commonpb.CompactionState_Executing:
+				if c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
+					log.Warn("compaction timeout",
+						zap.Int32("timeout in seconds", task.plan.GetTimeoutInSeconds()),
+						zap.Uint64("startTime", task.plan.GetStartTime()),
+						zap.Uint64("now", ts),
+					)
+					c.plans[planID] = c.plans[planID].shadowClone(setState(timeout))
+				}
+			}
+		} else {
+			log.Info("compaction failed")
+			c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
+			c.setSegmentsCompacting(task.plan, false)
+			c.executingTaskNum--
+			c.releaseQueue(task.dataNodeID)
+		}
 	}
 
 	// Timeout tasks will be timeout and failed in DataNode
 	// need to wait for DataNode reporting failure and
 	// clean the status.
 	for _, task := range timeoutTasks {
-		stateResult, ok := planStates[task.plan.PlanID]
-		planID := task.plan.PlanID
+		log := log.With(zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
 
-		if !ok {
-			log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
+		planID := task.plan.PlanID
+		cachedPlans = append(cachedPlans, planID)
+		if nodePlan, ok := planStates[task.plan.PlanID]; ok {
+			if nodePlan.B.GetState() == commonpb.CompactionState_Executing {
+				log.RatedInfo(1, "compaction timeout in DataCoord yet DataNode is still running")
+			}
+		} else {
+			// a compaction task in DC but not found in DN means the compaction plan has failed
+			log.Info("compaction failed for timeout")
 			c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
 			c.setSegmentsCompacting(task.plan, false)
 			c.executingTaskNum--
 			c.releaseQueue(task.dataNodeID)
 		}
+	}
 
-		// DataNode will check if plan's are timeout but not as sensitive as DataCoord,
-		// just wait another round.
-		if ok && stateResult.GetState() == commonpb.CompactionState_Executing {
-			log.Info("compaction timeout in DataCoord yet DataNode is still running",
-				zap.Int64("planID", planID),
-				zap.Int64("nodeID", task.dataNodeID))
-			continue
+	// Compaction plans in DN but not in DC are unknown plans, need to notify DN to clear them.
+	// No locks needed, because nothing changes in DC memory
+	completedPlans := lo.PickBy(planStates, func(planID int64, planState *typeutil.Pair[int64, *datapb.CompactionStateResult]) bool {
+		return planState.B.GetState() == commonpb.CompactionState_Completed
+	})
+
+	unkonwnPlansInWorker, _ := lo.Difference(lo.Keys(completedPlans), cachedPlans)
+	for _, planID := range unkonwnPlansInWorker {
+		if nodeUnKnownPlan, ok := completedPlans[planID]; ok {
+			nodeID := nodeUnKnownPlan.A
+			log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
+
+			// Sync segments without CompactedFrom segment IDs to make sure DN clears the task
+			// without changing the meta
+			req := &datapb.SyncSegmentsRequest{
+				PlanID: planID,
+			}
+
+			log.Info("compaction syncing unknown plan with node")
+			if err := c.sessions.SyncSegments(nodeID, req); err != nil {
+				log.Warn("compaction failed to sync segments with node", zap.Error(err))
+				return err
+			}
+		}
+	}
 	return nil
diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index 82242a739b..52d014cbf3 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -795,6 +795,7 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
 						{PlanID: 3, State: commonpb.CompactionState_Completed, Result: &datapb.CompactionResult{PlanID: 3}},
 						{PlanID: 4, State: commonpb.CompactionState_Executing},
 						{PlanID: 6, State: commonpb.CompactionState_Executing},
+						{PlanID: 7, State: commonpb.CompactionState_Completed, Result: &datapb.CompactionResult{PlanID: 7}},
 					},
 				},
 			}},
diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go
index a40250f9d7..2627e91854 100644
--- a/internal/datacoord/session_manager.go
+++ b/internal/datacoord/session_manager.go
@@ -58,7 +58,7 @@ type SessionManager interface {
 	SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error
 	Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest)
 	AddImportSegment(ctx context.Context, nodeID int64, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error)
-	GetCompactionState() map[int64]*datapb.CompactionStateResult
+	GetCompactionPlanResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionStateResult]
 	CheckHealth(ctx context.Context) error
 	Close()
 }
@@ -260,11 +260,11 @@ func (c *SessionManagerImpl) execImport(ctx context.Context, nodeID int64, itr *
 	log.Info("success to import", zap.Int64("node", nodeID), zap.Any("import task", itr))
 }
 
-func (c *SessionManagerImpl) GetCompactionState() map[int64]*datapb.CompactionStateResult {
+func (c *SessionManagerImpl) GetCompactionPlanResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionStateResult] {
 	wg := sync.WaitGroup{}
 	ctx := context.Background()
 
-	plans := typeutil.NewConcurrentMap[int64, *datapb.CompactionStateResult]()
+	plans := typeutil.NewConcurrentMap[int64, *typeutil.Pair[int64, *datapb.CompactionStateResult]]()
 	c.sessions.RLock()
 	for nodeID, s := range c.sessions.data {
 		wg.Add(1)
@@ -293,15 +293,16 @@ func (c *SessionManagerImpl) GetCompactionSt
 				return
 			}
 			for _, rst := range resp.GetResults() {
-				plans.Insert(rst.PlanID, rst)
+				nodeRst := typeutil.NewPair(nodeID, rst)
+				plans.Insert(rst.PlanID, &nodeRst)
 			}
 		}(nodeID, s)
 	}
 	c.sessions.RUnlock()
 	wg.Wait()
 
-	rst := make(map[int64]*datapb.CompactionStateResult)
-	plans.Range(func(planID int64, result *datapb.CompactionStateResult) bool {
+	rst := make(map[int64]*typeutil.Pair[int64, *datapb.CompactionStateResult])
+	plans.Range(func(planID int64, result *typeutil.Pair[int64, *datapb.CompactionStateResult]) bool {
 		rst[planID] = result
 		return true
 	})
diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index 1a568ee360..32a6e29d85 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -341,20 +341,22 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
 func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegmentsRequest) (*commonpb.Status, error) {
 	log := log.Ctx(ctx).With(
 		zap.Int64("planID", req.GetPlanID()),
+		zap.Int64("nodeID", paramtable.GetNodeID()),
 		zap.Int64("targetSegmentID", req.GetCompactedTo()),
 		zap.Int64s("compactedFrom", req.GetCompactedFrom()),
-	)
-	log.Info("DataNode receives SyncSegments",
 		zap.Int64("numOfRows", req.GetNumOfRows()),
 	)
+	log.Info("DataNode receives SyncSegments")
 
 	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
-		log.Warn("DataNode.SyncSegments failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
+		log.Warn("DataNode.SyncSegments failed", zap.Error(err))
 		return merr.Status(err), nil
 	}
 
 	if len(req.GetCompactedFrom()) <= 0 {
-		return merr.Status(merr.WrapErrParameterInvalid(">0", "0", "compacted from segments shouldn't be empty")), nil
+		log.Info("SyncSegments with empty compactedFrom, clearing the plan")
+		node.compactionExecutor.injectDone(req.GetPlanID(), false)
+		return merr.Success(), nil
 	}
 
 	var (
diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go
index c81cf02245..ac07f5cd41 100644
--- a/internal/datanode/services_test.go
+++ b/internal/datanode/services_test.go
@@ -760,19 +760,24 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 		s3.segmentID: &s3,
 	}
 
-	s.Run("invalid compacted from", func() {
+	s.Run("empty compactedFrom", func() {
 		req := &datapb.SyncSegmentsRequest{
 			CompactedTo: 400,
 			NumOfRows:   100,
 		}
-
-		req.CompactedFrom = []UniqueID{}
 		status, err := s.node.SyncSegments(s.ctx, req)
-		s.Assert().NoError(err)
-		s.Assert().False(merr.Ok(status))
+		s.NoError(err)
+		s.True(merr.Ok(status))
+	})
 
-		req.CompactedFrom = []UniqueID{101, 201}
-		status, err = s.node.SyncSegments(s.ctx, req)
+	s.Run("invalid compacted from", func() {
+		req := &datapb.SyncSegmentsRequest{
+			CompactedTo:   400,
+			NumOfRows:     100,
+			CompactedFrom: []UniqueID{101, 201},
+		}
+
+		status, err := s.node.SyncSegments(s.ctx, req)
 		s.Assert().NoError(err)
 		s.Assert().True(merr.Ok(status))
 	})
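
The DataCoord-side cleanup above boils down to a set difference: take the plans the DataNodes report as completed, subtract the plans DataCoord still caches, and send each leftover plan a SyncSegmentsRequest that carries only the PlanID (an empty CompactedFrom), so the node drops the plan without touching any meta. A standalone sketch of that idea using the same samber/lo helpers follows; plansOnWorker and plansInCoord are illustrative names, not identifiers from this patch.

package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// Completed plans reported by workers, keyed planID -> nodeID (illustrative data).
	plansOnWorker := map[int64]int64{3: 2, 7: 2}
	// Plans DataCoord still tracks, e.g. after a restart.
	plansInCoord := []int64{3}

	// Plans known to a worker but unknown to DataCoord are stale and must be cleared.
	unknown, _ := lo.Difference(lo.Keys(plansOnWorker), plansInCoord)
	for _, planID := range unknown {
		// In the patch this is c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{PlanID: planID}).
		fmt.Printf("would ask node %d to clear plan %d\n", plansOnWorker[planID], planID)
	}
}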