diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index c4ae60b117..dd921d97c1 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -549,12 +549,20 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e log := log.With(zap.Int64("planID", task.GetPlanID()), zap.Int64("triggerID", task.GetTriggerID()), zap.Int64("collectionID", task.GetCollectionID()), zap.String("type", task.GetType().String())) t, err := c.createCompactTask(task) if err != nil { + // Conflict is normal + if errors.Is(err, merr.ErrCompactionPlanConflict) { + log.RatedInfo(60, "Failed to create compaction task, compaction plan conflict", zap.Error(err)) + } else { + log.Warn("Failed to create compaction task, unable to create compaction task", zap.Error(err)) + } return err } + t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix()))) err = t.SaveTaskMeta() if err != nil { c.meta.SetSegmentsCompacting(t.GetInputSegments(), false) + log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err)) return err } c.submitTask(t) @@ -608,7 +616,7 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { for _, t := range tasks { nodeID := c.pickAnyNode(slots) if nodeID == NullNodeID { - log.Info("cannot find datanode for compaction task", + log.Info("compactionHandler cannot find datanode for compaction task", zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel())) continue } diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 77fe6b0bd3..355ace014c 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -44,6 +44,8 @@ type l0CompactionTask struct { meta CompactionMeta } +// Note: return True means exit this state machine. +// ONLY return True for processCompleted or processFailed func (t *l0CompactionTask) Process() bool { switch t.GetState() { case datapb.CompactionTaskState_pipelining: @@ -275,7 +277,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err for _, segInfo := range sealedSegments { // TODO should allow parallel executing of l0 compaction if segInfo.isCompacting { - log.Info("l0 compaction candidate segment is compacting", zap.Int64("segmentID", segInfo.GetID())) + log.Warn("l0CompactionTask candidate segment is compacting", zap.Int64("segmentID", segInfo.GetID())) return nil, merr.WrapErrCompactionPlanConflict(fmt.Sprintf("segment %d is compacting", segInfo.GetID())) } } @@ -292,7 +294,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err }) plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...) - log.Info("Compaction handler refreshed level zero compaction plan", + log.Info("l0CompactionTask refreshed level zero compaction plan", zap.Any("target position", t.GetPos()), zap.Any("target segments count", len(sealedSegBinlogs))) return plan, nil diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 8806ce9852..1957b71bbe 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -30,29 +30,38 @@ func (t *mixCompactionTask) processPipelining() bool { if t.NeedReAssignNodeID() { return false } + + log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("nodeID", t.GetNodeID())) var err error t.plan, err = t.BuildCompactionRequest() - // Segment not found if err != nil { - err2 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error())) - return err2 == nil + log.Warn("mixCompactionTask failed to build compaction request", zap.Error(err)) + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error())) + if err != nil { + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + return false + } + return t.processFailed() } - err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) + + err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan()) if err != nil { - log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) + log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return false } + t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) return false } func (t *mixCompactionTask) processMetaSaved() bool { - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) - if err == nil { - return t.processCompleted() + if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil { + log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err)) + return false } - return false + + return t.processCompleted() } func (t *mixCompactionTask) processExecuting() bool { @@ -62,44 +71,49 @@ func (t *mixCompactionTask) processExecuting() bool { if errors.Is(err, merr.ErrNodeNotFound) { t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) } + log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err)) return false } switch result.GetState() { case datapb.CompactionTaskState_executing: if t.checkTimeout() { err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) - if err == nil { - return t.processTimeout() + if err != nil { + log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) + return false } + return t.processTimeout() } - return false case datapb.CompactionTaskState_completed: t.result = result if len(result.GetSegments()) == 0 || len(result.GetSegments()) > 1 { log.Info("illegal compaction results") err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) if err != nil { + log.Warn("mixCompactionTask failed to setState failed", zap.Error(err)) return false } return t.processFailed() } - err2 := t.saveSegmentMeta() - if err2 != nil { - if errors.Is(err2, merr.ErrIllegalCompactionPlan) { - err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) - if err3 != nil { - log.Warn("fail to updateAndSaveTaskMeta") + if err := t.saveSegmentMeta(); err != nil { + log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err)) + if errors.Is(err, merr.ErrIllegalCompactionPlan) { + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) + if err != nil { + log.Warn("mixCompactionTask failed to setState failed", zap.Error(err)) + return false } - return true + return t.processFailed() } return false } segments := []UniqueID{t.newSegment.GetID()} - err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments)) - if err3 == nil { - return t.processMetaSaved() + err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments)) + if err != nil { + log.Warn("mixCompaction failed to setState meta saved", zap.Error(err)) + return false } - return false + return t.processMetaSaved() case datapb.CompactionTaskState_failed: err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) if err != nil { @@ -132,6 +146,8 @@ func (t *mixCompactionTask) saveSegmentMeta() error { return nil } +// Note: return True means exit this state machine. +// ONLY return True for processCompleted or processFailed func (t *mixCompactionTask) Process() bool { switch t.GetState() { case datapb.CompactionTaskState_pipelining: