From fb61344dc9b5ef241be895fffc40209518fc302e Mon Sep 17 00:00:00 2001 From: wayblink Date: Wed, 3 Jul 2024 10:32:08 +0800 Subject: [PATCH] fix: Revert a optimize in clustering compaction (#34299) #30633 Signed-off-by: wayblink --- .../datacoord/compaction_task_clustering.go | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 70ad1b7abb..4dc446a77c 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -410,24 +410,27 @@ func (t *clusteringCompactionTask) doCompact() error { if t.NeedReAssignNodeID() { return errors.New("not assign nodeID") } + + // todo refine this logic: GetCompactionPlanResult return a fail result when this is no compaction in datanode which is weird // check whether the compaction plan is already submitted considering // datacoord may crash between call sessions.Compaction and updateTaskState to executing - result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID()) - if err != nil { - if errors.Is(err, merr.ErrNodeNotFound) { - log.Warn("GetCompactionPlanResult fail", zap.Error(err)) - // setNodeID(NullNodeID) to trigger reassign node ID - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) - return nil - } - return merr.WrapErrGetCompactionPlanResultFail(err) - } - if result != nil { - log.Info("compaction already submitted") - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) - return nil - } + // result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID()) + // if err != nil { + // if errors.Is(err, merr.ErrNodeNotFound) { + // log.Warn("GetCompactionPlanResult fail", zap.Error(err)) + // // setNodeID(NullNodeID) to trigger reassign node ID + // t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + // return nil + // } + // return merr.WrapErrGetCompactionPlanResultFail(err) + // } + // if result != nil { + // log.Info("compaction already submitted") + // t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) + // return nil + // } + var err error t.plan, err = t.BuildCompactionRequest() if err != nil { log.Warn("Failed to BuildCompactionRequest", zap.Error(err))