From 315cfb7f321696cb3a2196d8f90ea2eb2a8ba947 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 25 Feb 2025 18:07:55 +0800 Subject: [PATCH] fix: Negative -1 executing compaction tasks (#39954) See also: #39675 --------- Signed-off-by: yangxuan --- internal/datacoord/compaction.go | 2 +- internal/datacoord/compaction_task_clustering.go | 16 +++++++++++----- internal/datacoord/compaction_task_l0.go | 15 +++++++++++++-- internal/datacoord/compaction_task_mix.go | 9 ++++++++- 4 files changed, 33 insertions(+), 9 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 9013ccf5d7..2c0622740b 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -722,7 +722,7 @@ func (c *compactionPlanHandler) checkCompaction(assigner NodeAssigner) error { if ok := assigner.assign(t); !ok { break } - metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Dec() metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc() } } diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 0888539e18..e272cc3c65 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -700,12 +700,18 @@ func (t *clusteringCompactionTask) doCompact() error { } err = t.sessions.Compaction(context.Background(), t.GetTaskProto().GetNodeID(), t.GetPlan()) if err != nil { - if errors.Is(err, merr.ErrDataNodeSlotExhausted) { - log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted") - return t.updateAndSaveTaskMeta(setNodeID(NullNodeID)) + originNodeID := t.GetTaskProto().GetNodeID() + log.Warn("Failed to notify compaction tasks to DataNode", + zap.Int64("planID", t.GetTaskProto().GetPlanID()), + zap.Int64("nodeID", originNodeID), + zap.Error(err)) + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + if err != nil { + log.Warn("updateAndSaveTaskMeta fail", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) + return err } - log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) - return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", originNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc() } return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) } diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 42bf2d099d..b30b85b7fb 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -103,8 +104,18 @@ func (t *l0CompactionTask) processPipelining() bool { err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), plan) if err != nil { - log.Warn("l0CompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + originNodeID := t.GetTaskProto().GetNodeID() + log.Warn("l0CompactionTask failed to notify compaction tasks to DataNode", + zap.Int64("planID", t.GetTaskProto().GetPlanID()), + zap.Int64("nodeID", originNodeID), + zap.Error(err)) + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + if err != nil { + log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) + return false + } + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", originNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc() return false } diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 7dea339d95..f812fbf06a 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -14,6 +14,7 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -70,11 +71,17 @@ func (t *mixCompactionTask) processPipelining() bool { // Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset // to enable a retry in compaction.checkCompaction(). // This is tricky, we should remove the reassignment here. - log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) + originNodeID := t.GetTaskProto().GetNodeID() + log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", + zap.Int64("planID", t.GetTaskProto().GetPlanID()), + zap.Int64("nodeID", originNodeID), + zap.Error(err)) err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) if err != nil { log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) } + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", originNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc() return false } log.Info("mixCompactionTask notify compaction tasks to DataNode")