fix: Negative -1 executing compaction tasks (#39954)

See also: #39675

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2025-02-25 18:07:55 +08:00 committed by GitHub
parent 84df80b5e4
commit 315cfb7f32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 33 additions and 9 deletions

View File

@ -722,7 +722,7 @@ func (c *compactionPlanHandler) checkCompaction(assigner NodeAssigner) error {
if ok := assigner.assign(t); !ok { if ok := assigner.assign(t); !ok {
break break
} }
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec() metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc() metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
} }
} }

View File

@ -700,12 +700,18 @@ func (t *clusteringCompactionTask) doCompact() error {
} }
err = t.sessions.Compaction(context.Background(), t.GetTaskProto().GetNodeID(), t.GetPlan()) err = t.sessions.Compaction(context.Background(), t.GetTaskProto().GetNodeID(), t.GetPlan())
if err != nil { if err != nil {
if errors.Is(err, merr.ErrDataNodeSlotExhausted) { originNodeID := t.GetTaskProto().GetNodeID()
log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted") log.Warn("Failed to notify compaction tasks to DataNode",
return t.updateAndSaveTaskMeta(setNodeID(NullNodeID)) zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.Int64("nodeID", originNodeID),
zap.Error(err))
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
if err != nil {
log.Warn("updateAndSaveTaskMeta fail", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return err
} }
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", originNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc()
} }
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
} }

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -103,8 +104,18 @@ func (t *l0CompactionTask) processPipelining() bool {
err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), plan) err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), plan)
if err != nil { if err != nil {
log.Warn("l0CompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) originNodeID := t.GetTaskProto().GetNodeID()
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) log.Warn("l0CompactionTask failed to notify compaction tasks to DataNode",
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.Int64("nodeID", originNodeID),
zap.Error(err))
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
if err != nil {
log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return false
}
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", originNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc()
return false return false
} }

View File

@ -14,6 +14,7 @@ import (
"github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -70,11 +71,17 @@ func (t *mixCompactionTask) processPipelining() bool {
// Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset // Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset
// to enable a retry in compaction.checkCompaction(). // to enable a retry in compaction.checkCompaction().
// This is tricky, we should remove the reassignment here. // This is tricky, we should remove the reassignment here.
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) originNodeID := t.GetTaskProto().GetNodeID()
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode",
zap.Int64("planID", t.GetTaskProto().GetPlanID()),
zap.Int64("nodeID", originNodeID),
zap.Error(err))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
if err != nil { if err != nil {
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
} }
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", originNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc()
return false return false
} }
log.Info("mixCompactionTask notify compaction tasks to DataNode") log.Info("mixCompactionTask notify compaction tasks to DataNode")