diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index 3fbb31c176..099cde2786 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/errors"
+	"github.com/samber/lo"
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -30,6 +31,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // TODO this num should be determined by resources of datanode, for now, we set to a fixed value for simple
@@ -336,67 +338,88 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 	// for DC might add new task while GetCompactionState.
 	executingTasks := c.getTasksByState(executing)
 	timeoutTasks := c.getTasksByState(timeout)
-	planStates := c.sessions.GetCompactionState()
+	planStates := c.sessions.GetCompactionPlanResults()
+	cachedPlans := []int64{}
 
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	for _, task := range executingTasks {
-		stateResult, ok := planStates[task.plan.PlanID]
-		state := stateResult.GetState()
+		log := log.With(zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
 		planID := task.plan.PlanID
-		// check whether the state of CompactionPlan is working
-		if ok {
-			if state == commonpb.CompactionState_Completed {
-				log.Info("compaction completed", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID))
-				err := c.completeCompaction(stateResult.GetResult())
-				if err != nil {
-					log.Warn("fail to complete compaction", zap.Int64("planID", planID), zap.Int64("nodeID", task.dataNodeID), zap.Error(err))
-				}
-				continue
-			}
-			// check wether the CompactionPlan is timeout
-			if state == commonpb.CompactionState_Executing && !c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
-				continue
-			}
-			log.Warn("compaction timeout",
-				zap.Int64("planID", task.plan.PlanID),
-				zap.Int64("nodeID", task.dataNodeID),
-				zap.Uint64("startTime", task.plan.GetStartTime()),
-				zap.Uint64("now", ts),
-			)
-			c.plans[planID] = c.plans[planID].shadowClone(setState(timeout))
-			continue
-		}
+		cachedPlans = append(cachedPlans, planID)
+		if nodePlan, ok := planStates[planID]; ok {
+			planResult := nodePlan.B
 
-		log.Info("compaction failed", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
-		c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
-		c.setSegmentsCompacting(task.plan, false)
-		c.executingTaskNum--
-		c.releaseQueue(task.dataNodeID)
+			switch planResult.GetState() {
+			case commonpb.CompactionState_Completed:
+				log.Info("start to complete compaction")
+				if err := c.completeCompaction(planResult.GetResult()); err != nil {
+					log.Warn("fail to complete compaction", zap.Error(err))
+				}
+			case commonpb.CompactionState_Executing:
+				if c.isTimeout(ts, task.plan.GetStartTime(), task.plan.GetTimeoutInSeconds()) {
+					log.Warn("compaction timeout",
+						zap.Int32("timeout in seconds", task.plan.GetTimeoutInSeconds()),
+						zap.Uint64("startTime", task.plan.GetStartTime()),
+						zap.Uint64("now", ts),
+					)
+					c.plans[planID] = c.plans[planID].shadowClone(setState(timeout))
+				}
+			}
+		} else {
+			log.Info("compaction failed")
+			c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
+			c.setSegmentsCompacting(task.plan, false)
+			c.executingTaskNum--
+			c.releaseQueue(task.dataNodeID)
+		}
 	}
 
 	// Timeout tasks will be timeout and failed in DataNode
 	// need to wait for DataNode reporting failure and
 	// clean the status.
 	for _, task := range timeoutTasks {
-		stateResult, ok := planStates[task.plan.PlanID]
-		planID := task.plan.PlanID
+		log := log.With(zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
 
-		if !ok {
-			log.Info("compaction failed for timeout", zap.Int64("planID", task.plan.PlanID), zap.Int64("nodeID", task.dataNodeID))
+		planID := task.plan.PlanID
+		cachedPlans = append(cachedPlans, planID)
+		if nodePlan, ok := planStates[task.plan.PlanID]; ok {
+			if nodePlan.B.GetState() == commonpb.CompactionState_Executing {
+				log.RatedInfo(1, "compaction timeout in DataCoord yet DataNode is still running")
+			}
+		} else {
+			// a compaction task in DC that is not found in DN means the compaction plan has failed
+			log.Info("compaction failed for timeout")
 			c.plans[planID] = c.plans[planID].shadowClone(setState(failed))
 			c.setSegmentsCompacting(task.plan, false)
 			c.executingTaskNum--
 			c.releaseQueue(task.dataNodeID)
 		}
+	}
 
-		// DataNode will check if plan's are timeout but not as sensitive as DataCoord,
-		// just wait another round.
-		if ok && stateResult.GetState() == commonpb.CompactionState_Executing {
-			log.Info("compaction timeout in DataCoord yet DataNode is still running",
-				zap.Int64("planID", planID),
-				zap.Int64("nodeID", task.dataNodeID))
-			continue
+	// Compaction plans known to the DN but not to the DC are unknown plans; notify the DN to clear them.
+	// No locks needed, because nothing in DC memory changes.
+	completedPlans := lo.PickBy(planStates, func(planID int64, planState *typeutil.Pair[int64, *datapb.CompactionStateResult]) bool {
+		return planState.B.GetState() == commonpb.CompactionState_Completed
+	})
+
+	unknownPlansInWorker, _ := lo.Difference(lo.Keys(completedPlans), cachedPlans)
+	for _, planID := range unknownPlansInWorker {
+		if nodeUnknownPlan, ok := completedPlans[planID]; ok {
+			nodeID := nodeUnknownPlan.A
+			log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
+
+			// Sync segments without CompactionFrom segment IDs to make sure the DN clears the task
+			// without changing the meta.
+			req := &datapb.SyncSegmentsRequest{
+				PlanID: planID,
+			}
+
+			log.Info("compaction syncing unknown plan with node")
+			if err := c.sessions.SyncSegments(nodeID, req); err != nil {
+				log.Warn("compaction failed to sync segments with node", zap.Error(err))
+				return err
+			}
 		}
 	}
 	return nil
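Reviewer note on the `updateCompaction` rewrite above: each executing plan now resolves to exactly one of four outcomes, depending on what the worker reported. Below is a minimal, self-contained sketch of that decision table, assuming nothing from the Milvus codebase (`reconcile`, `planState`, and the string outcomes are illustrative stand-ins, not the real types or APIs):

```go
package main

import "fmt"

// planState mirrors the subset of commonpb.CompactionState used here.
type planState int

const (
	executing planState = iota
	completed
)

// reconcile returns the coordinator-side action for one executing plan,
// given whether the worker reported it and, if so, in which state.
// This reduces the branching of updateCompaction; it is not the real API.
func reconcile(reported bool, state planState, timedOut bool) string {
	if !reported {
		// The plan vanished from the worker: mark it failed and release resources.
		return "fail"
	}
	switch state {
	case completed:
		return "complete" // persist the result via completeCompaction
	case executing:
		if timedOut {
			return "timeout" // shadowClone(setState(timeout))
		}
	}
	return "wait" // still running within its deadline
}

func main() {
	fmt.Println(reconcile(false, executing, false)) // fail
	fmt.Println(reconcile(true, completed, false))  // complete
	fmt.Println(reconcile(true, executing, true))   // timeout
	fmt.Println(reconcile(true, executing, false))  // wait
}
```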
diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index 82242a739b..52d014cbf3 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -795,6 +795,7 @@ func Test_compactionPlanHandler_updateCompaction(t *testing.T) {
 					{PlanID: 3, State: commonpb.CompactionState_Completed, Result: &datapb.CompactionResult{PlanID: 3}},
 					{PlanID: 4, State: commonpb.CompactionState_Executing},
 					{PlanID: 6, State: commonpb.CompactionState_Executing},
+					{PlanID: 7, State: commonpb.CompactionState_Completed, Result: &datapb.CompactionResult{PlanID: 7}},
 				},
 			},
 		}},
diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go
index a40250f9d7..2627e91854 100644
--- a/internal/datacoord/session_manager.go
+++ b/internal/datacoord/session_manager.go
@@ -58,7 +58,7 @@ type SessionManager interface {
 	SyncSegments(nodeID int64, req *datapb.SyncSegmentsRequest) error
 	Import(ctx context.Context, nodeID int64, itr *datapb.ImportTaskRequest)
 	AddImportSegment(ctx context.Context, nodeID int64, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error)
-	GetCompactionState() map[int64]*datapb.CompactionStateResult
+	GetCompactionPlanResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionStateResult]
 	CheckHealth(ctx context.Context) error
 	Close()
 }
@@ -260,11 +260,11 @@ func (c *SessionManagerImpl) execImport(ctx context.Context, nodeID int64, itr *
 	log.Info("success to import", zap.Int64("node", nodeID), zap.Any("import task", itr))
 }
 
-func (c *SessionManagerImpl) GetCompactionState() map[int64]*datapb.CompactionStateResult {
+func (c *SessionManagerImpl) GetCompactionPlanResults() map[int64]*typeutil.Pair[int64, *datapb.CompactionStateResult] {
 	wg := sync.WaitGroup{}
 	ctx := context.Background()
 
-	plans := typeutil.NewConcurrentMap[int64, *datapb.CompactionStateResult]()
+	plans := typeutil.NewConcurrentMap[int64, *typeutil.Pair[int64, *datapb.CompactionStateResult]]()
 	c.sessions.RLock()
 	for nodeID, s := range c.sessions.data {
 		wg.Add(1)
@@ -293,15 +293,16 @@ func (c *SessionManagerImpl) GetCompactionState() map[int64]*datapb.CompactionSt
 			return
 		}
 		for _, rst := range resp.GetResults() {
-			plans.Insert(rst.PlanID, rst)
+			nodeRst := typeutil.NewPair(nodeID, rst)
+			plans.Insert(rst.PlanID, &nodeRst)
 		}
 		}(nodeID, s)
 	}
 	c.sessions.RUnlock()
 	wg.Wait()
 
-	rst := make(map[int64]*datapb.CompactionStateResult)
-	plans.Range(func(planID int64, result *datapb.CompactionStateResult) bool {
+	rst := make(map[int64]*typeutil.Pair[int64, *datapb.CompactionStateResult])
+	plans.Range(func(planID int64, result *typeutil.Pair[int64, *datapb.CompactionStateResult]) bool {
 		rst[planID] = result
 		return true
 	})
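Reviewer note on `GetCompactionPlanResults`: returning a pair of (reporting nodeID, state result) per plan is what lets the coordinator later address a `SyncSegments` call to the node that owns an unknown plan. A hedged sketch of the same pattern with a hand-rolled generic pair (the real code uses `pkg/util/typeutil.Pair`; everything below is illustrative):

```go
package main

import "fmt"

// pair is a minimal stand-in for typeutil.Pair[A, B].
type pair[A, B any] struct {
	A A
	B B
}

// stateResult models just the fields this example needs from
// datapb.CompactionStateResult.
type stateResult struct {
	PlanID int64
	State  string
}

func main() {
	// Keyed by planID, valued by (nodeID, result), as in GetCompactionPlanResults.
	plans := map[int64]pair[int64, *stateResult]{}

	// A worker (nodeID 5) reports plan 42 as completed.
	plans[42] = pair[int64, *stateResult]{A: 5, B: &stateResult{PlanID: 42, State: "Completed"}}

	// The coordinator can now route a follow-up RPC to the reporting node.
	if p, ok := plans[42]; ok {
		fmt.Printf("sync plan %d on node %d (state %s)\n", p.B.PlanID, p.A, p.B.State)
	}
}
```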
diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index 1a568ee360..32a6e29d85 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -341,20 +341,22 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
 func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegmentsRequest) (*commonpb.Status, error) {
 	log := log.Ctx(ctx).With(
 		zap.Int64("planID", req.GetPlanID()),
+		zap.Int64("nodeID", paramtable.GetNodeID()),
 		zap.Int64("targetSegmentID", req.GetCompactedTo()),
 		zap.Int64s("compactedFrom", req.GetCompactedFrom()),
-	)
-	log.Info("DataNode receives SyncSegments",
 		zap.Int64("numOfRows", req.GetNumOfRows()),
 	)
+	log.Info("DataNode receives SyncSegments")
 
 	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
-		log.Warn("DataNode.SyncSegments failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
+		log.Warn("DataNode.SyncSegments failed", zap.Error(err))
 		return merr.Status(err), nil
 	}
 
 	if len(req.GetCompactedFrom()) <= 0 {
-		return merr.Status(merr.WrapErrParameterInvalid(">0", "0", "compacted from segments shouldn't be empty")), nil
+		log.Info("SyncSegments with empty compactedFrom, clearing the plan")
+		node.compactionExecutor.injectDone(req.GetPlanID(), false)
+		return merr.Success(), nil
 	}
 
 	var (
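Reviewer note on the `SyncSegments` change: an empty `compactedFrom` is now a cleanup signal rather than an invalid argument, which is what allows DataCoord to clear unknown plans with a bare `SyncSegmentsRequest{PlanID: ...}`. A sketch of just that branch's contract; `executor` and `syncSegments` are simplified stand-ins for the real `compactionExecutor` and handler:

```go
package main

import "fmt"

// executor is a stand-in for the DataNode's compactionExecutor.
type executor struct {
	done map[int64]bool
}

// injectDone mimics clearing a plan's bookkeeping; the boolean records
// whether the plan's segments were actually compacted.
func (e *executor) injectDone(planID int64, success bool) {
	e.done[planID] = success
}

// syncSegments sketches the new control flow: an empty compactedFrom
// clears the plan and succeeds instead of returning an
// invalid-parameter error.
func syncSegments(e *executor, planID int64, compactedFrom []int64) string {
	if len(compactedFrom) == 0 {
		e.injectDone(planID, false)
		return "ok (plan cleared)"
	}
	return "ok (segments synced)"
}

func main() {
	e := &executor{done: map[int64]bool{}}
	fmt.Println(syncSegments(e, 7, nil))           // ok (plan cleared)
	fmt.Println(syncSegments(e, 8, []int64{1, 2})) // ok (segments synced)
}
```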
diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go
index c81cf02245..ac07f5cd41 100644
--- a/internal/datanode/services_test.go
+++ b/internal/datanode/services_test.go
@@ -760,19 +760,24 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 		s3.segmentID: &s3,
 	}
 
-	s.Run("invalid compacted from", func() {
+	s.Run("empty compactedFrom", func() {
 		req := &datapb.SyncSegmentsRequest{
 			CompactedTo: 400,
 			NumOfRows:   100,
 		}
-
-		req.CompactedFrom = []UniqueID{}
 		status, err := s.node.SyncSegments(s.ctx, req)
-		s.Assert().NoError(err)
-		s.Assert().False(merr.Ok(status))
+		s.NoError(err)
+		s.True(merr.Ok(status))
+	})
 
-		req.CompactedFrom = []UniqueID{101, 201}
-		status, err = s.node.SyncSegments(s.ctx, req)
+	s.Run("invalid compacted from", func() {
+		req := &datapb.SyncSegmentsRequest{
+			CompactedTo:   400,
+			NumOfRows:     100,
+			CompactedFrom: []UniqueID{101, 201},
+		}
+
+		status, err := s.node.SyncSegments(s.ctx, req)
 		s.Assert().NoError(err)
 		s.Assert().True(merr.Ok(status))
 	})
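Reviewer note on the test changes: the two subtests pin the new contract from the caller's side, namely that both an empty and a non-empty `compactedFrom` now return a success status. An equivalent standalone, table-driven sketch using only the standard `testing` package (`syncSegments` here is a stub modeling the asserted behavior, not the real DataNode):

```go
package contract // illustrative package, not part of the Milvus tree

import "testing"

// result models just what the assertions below need.
type result struct{ ok bool }

// syncSegments is a stub with the same observable contract the two
// subtests assert: both request shapes succeed.
func syncSegments(compactedFrom []int64) (result, error) {
	return result{ok: true}, nil
}

func TestSyncSegmentsContract(t *testing.T) {
	cases := []struct {
		name          string
		compactedFrom []int64
	}{
		{"empty compactedFrom", nil},
		{"invalid compacted from", []int64{101, 201}},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			status, err := syncSegments(tc.compactedFrom)
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			if !status.ok {
				t.Fatalf("expected ok status, got %+v", status)
			}
		})
	}
}
```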