From 40e20427280b72db21e38a644a07f527bf5acb84 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Fri, 14 Nov 2025 15:19:37 +0800 Subject: [PATCH] enhance: add more metrics for DDL framework (#45558) issue: #43897 --------- Signed-off-by: chyezh --- .../broadcaster/ack_callback_scheduler.go | 3 + .../server/broadcaster/broadcast_manager.go | 7 ++ .../server/broadcaster/broadcast_task.go | 25 +++--- .../server/broadcaster/metrics.go | 86 ++++++++++++------- pkg/metrics/streaming_service_metrics.go | 42 +++++---- pkg/streaming/util/message/resource_key.go | 8 ++ 6 files changed, 110 insertions(+), 61 deletions(-) diff --git a/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go b/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go index f96b220975..69ae6db46c 100644 --- a/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go +++ b/internal/streamingcoord/server/broadcaster/ack_callback_scheduler.go @@ -158,9 +158,12 @@ func (s *ackCallbackScheduler) doAckCallback(bt *broadcastTask, g *lockGuards) ( } } // call the ack callback until done. + bt.ObserveAckCallbackBegin() if err := s.callMessageAckCallbackUntilDone(s.notifier.Context(), msg, makeMap); err != nil { return err } + bt.ObserveAckCallbackDone() + logger.Debug("ack callback done") if err := bt.MarkAckCallbackDone(s.notifier.Context()); err != nil { // The catalog is reliable to write, so we can mark the ack callback done without retrying. diff --git a/internal/streamingcoord/server/broadcaster/broadcast_manager.go b/internal/streamingcoord/server/broadcaster/broadcast_manager.go index 2690c5c771..6545930b8e 100644 --- a/internal/streamingcoord/server/broadcaster/broadcast_manager.go +++ b/internal/streamingcoord/server/broadcaster/broadcast_manager.go @@ -3,6 +3,7 @@ package broadcaster import ( "context" "sync" + "time" "github.com/cockroachdb/errors" "go.uber.org/zap" @@ -112,6 +113,7 @@ func (bm *broadcastTaskManager) WithResourceKeys(ctx context.Context, resourceKe return nil, errors.Wrapf(err, "allocate new id failed") } + startLockInstant := time.Now() resourceKeys = bm.appendSharedClusterRK(resourceKeys...) guards := bm.resourceKeyLocker.Lock(resourceKeys...) @@ -120,6 +122,8 @@ func (bm *broadcastTaskManager) WithResourceKeys(ctx context.Context, resourceKe guards.Unlock() return nil, err } + bm.metrics.ObserveAcquireLockDuration(startLockInstant, guards.ResourceKeys()) + return &broadcasterWithRK{ broadcaster: bm, broadcastID: id, @@ -129,6 +133,9 @@ func (bm *broadcastTaskManager) WithResourceKeys(ctx context.Context, resourceKe // checkClusterRole checks if the cluster status is primary, otherwise return error. func (bm *broadcastTaskManager) checkClusterRole(ctx context.Context) error { + if ctx.Err() != nil { + return ctx.Err() + } // Check if the cluster status is primary, otherwise return error. b, err := balance.GetWithContext(ctx) if err != nil { diff --git a/internal/streamingcoord/server/broadcaster/broadcast_task.go b/internal/streamingcoord/server/broadcaster/broadcast_task.go index 154e926212..c1836adec3 100644 --- a/internal/streamingcoord/server/broadcaster/broadcast_task.go +++ b/internal/streamingcoord/server/broadcaster/broadcast_task.go @@ -18,14 +18,14 @@ import ( // newBroadcastTaskFromProto creates a new broadcast task from the proto. func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask, metrics *broadcasterMetrics, ackCallbackScheduler *ackCallbackScheduler) *broadcastTask { - m := metrics.NewBroadcastTask(proto.GetState()) msg := message.NewBroadcastMutableMessageBeforeAppend(proto.Message.Payload, proto.Message.Properties) + m := metrics.NewBroadcastTask(msg.MessageType(), proto.GetState(), msg.BroadcastHeader().ResourceKeys.Collect()) bt := &broadcastTask{ mu: sync.Mutex{}, + taskMetricsGuard: m, msg: msg, task: proto, dirty: true, // the task is recovered from the recovery info, so it's persisted. - metrics: m, ackCallbackScheduler: ackCallbackScheduler, done: make(chan struct{}), allAcked: make(chan struct{}), @@ -41,12 +41,13 @@ func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask, metrics *broadc // newBroadcastTaskFromBroadcastMessage creates a new broadcast task from the broadcast message. func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage, metrics *broadcasterMetrics, ackCallbackScheduler *ackCallbackScheduler) *broadcastTask { - m := metrics.NewBroadcastTask(streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_PENDING) + m := metrics.NewBroadcastTask(msg.MessageType(), streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_PENDING, msg.BroadcastHeader().ResourceKeys.Collect()) header := msg.BroadcastHeader() bt := &broadcastTask{ - Binder: log.Binder{}, - mu: sync.Mutex{}, - msg: msg, + Binder: log.Binder{}, + taskMetricsGuard: m, + mu: sync.Mutex{}, + msg: msg, task: &streamingpb.BroadcastTask{ Message: msg.IntoMessageProto(), State: streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_PENDING, @@ -54,7 +55,6 @@ func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage, m AckedCheckpoints: make([]*streamingpb.AckedCheckpoint, len(header.VChannels)), }, dirty: false, - metrics: m, ackCallbackScheduler: ackCallbackScheduler, done: make(chan struct{}), allAcked: make(chan struct{}), @@ -68,18 +68,19 @@ func newBroadcastTaskFromImmutableMessage(msg message.ImmutableMessage, metrics task := newBroadcastTaskFromBroadcastMessage(broadcastMsg, metrics, ackCallbackScheduler) // if the task is created from the immutable message, it already has been broadcasted, so transfer its state into recovered. task.task.State = streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_REPLICATED - task.metrics.ToState(streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_REPLICATED) + task.ObserveStateChanged(streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_REPLICATED) return task } // broadcastTask is the state of the broadcast task. type broadcastTask struct { log.Binder + *taskMetricsGuard + mu sync.Mutex msg message.BroadcastMutableMessage task *streamingpb.BroadcastTask dirty bool // a flag to indicate that the task has been modified and needs to be saved into the recovery info. - metrics *taskMetricsGuard done chan struct{} allAcked chan struct{} guards *lockGuards @@ -248,7 +249,6 @@ func (b *broadcastTask) ack(ctx context.Context, msgs ...message.ImmutableMessag } if allDone { close(b.allAcked) - b.metrics.ObserveAckAll() } return nil } @@ -338,11 +338,10 @@ func findIdxOfVChannel(vchannel string, vchannels []string) (int, error) { // FastAck trigger a fast ack operation when the broadcast operation is done. func (b *broadcastTask) FastAck(ctx context.Context, broadcastResult map[string]*types.AppendResult) error { // Broadcast operation is done. - b.metrics.ObserveBroadcastDone() - b.mu.Lock() defer b.mu.Unlock() + b.ObserveBroadcastDone() // because we need to wait for the streamingnode to ack the message, // however, if the message is already write into wal, the message is determined, // so we can make a fast ack operation here to speed up the ack operation. @@ -421,7 +420,7 @@ func (b *broadcastTask) saveTaskIfDirty(ctx context.Context, logger *log.MLogger } return err } - b.metrics.ToState(b.task.State) + b.ObserveStateChanged(b.task.State) logger.Info("save broadcast task done") return nil } diff --git a/internal/streamingcoord/server/broadcaster/metrics.go b/internal/streamingcoord/server/broadcaster/metrics.go index cb64fab6d5..72c22ad0b4 100644 --- a/internal/streamingcoord/server/broadcaster/metrics.go +++ b/internal/streamingcoord/server/broadcaster/metrics.go @@ -1,13 +1,14 @@ package broadcaster import ( + "strings" "time" "github.com/prometheus/client_golang/prometheus" "github.com/milvus-io/milvus/pkg/v2/metrics" - "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) @@ -17,68 +18,91 @@ func newBroadcasterMetrics() *broadcasterMetrics { metrics.NodeIDLabelName: paramtable.GetStringNodeID(), } return &broadcasterMetrics{ - taskTotal: metrics.StreamingCoordBroadcasterTaskTotal.MustCurryWith(constLabel), - resourceKeyTotal: metrics.StreamingCoordResourceKeyTotal.MustCurryWith(constLabel), - broadcastDuration: metrics.StreamingCoordBroadcastDurationSeconds.With(constLabel), - ackAllDuration: metrics.StreamingCoordBroadcasterAckAllDurationSeconds.With(constLabel), + taskTotal: metrics.StreamingCoordBroadcasterTaskTotal.MustCurryWith(constLabel), + executionDuration: metrics.StreamingCoordBroadcasterTaskExecutionDurationSeconds.MustCurryWith(constLabel), + broadcastDuration: metrics.StreamingCoordBroadcasterTaskBroadcastDurationSeconds.MustCurryWith(constLabel), + ackCallbackDuration: metrics.StreamingCoordBroadcasterTaskAckCallbackDurationSeconds.MustCurryWith(constLabel), + acquireLockDuration: metrics.StreamingCoordBroadcasterTaskAcquireLockDurationSeconds.MustCurryWith(constLabel), } } // broadcasterMetrics is the metrics of the broadcaster. type broadcasterMetrics struct { - taskTotal *prometheus.GaugeVec - resourceKeyTotal *prometheus.GaugeVec - broadcastDuration prometheus.Observer - ackAllDuration prometheus.Observer + taskTotal *prometheus.GaugeVec + executionDuration prometheus.ObserverVec + broadcastDuration prometheus.ObserverVec + ackCallbackDuration prometheus.ObserverVec + acquireLockDuration prometheus.ObserverVec +} + +// ObserveAcquireLockDuration observes the acquire lock duration. +func (m *broadcasterMetrics) ObserveAcquireLockDuration(from time.Time, rks []message.ResourceKey) { + m.acquireLockDuration.WithLabelValues(formatResourceKeys(rks)).Observe(time.Since(from).Seconds()) } // fromStateToState updates the metrics when the state of the broadcast task changes. -func (m *broadcasterMetrics) fromStateToState(from streamingpb.BroadcastTaskState, to streamingpb.BroadcastTaskState) { +func (m *broadcasterMetrics) fromStateToState(msgType message.MessageType, from streamingpb.BroadcastTaskState, to streamingpb.BroadcastTaskState) { if from != streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_UNKNOWN { - m.taskTotal.WithLabelValues(from.String()).Dec() + m.taskTotal.WithLabelValues(msgType.String(), from.String()).Dec() } if to != streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_DONE { - m.taskTotal.WithLabelValues(to.String()).Inc() + m.taskTotal.WithLabelValues(msgType.String(), to.String()).Inc() } } // NewBroadcastTask creates a new broadcast task. -func (m *broadcasterMetrics) NewBroadcastTask(state streamingpb.BroadcastTaskState) *taskMetricsGuard { +func (m *broadcasterMetrics) NewBroadcastTask(msgType message.MessageType, state streamingpb.BroadcastTaskState, rks []message.ResourceKey) *taskMetricsGuard { + rks = uniqueSortResourceKeys(rks) g := &taskMetricsGuard{ start: time.Now(), + ackCallbackBegin: time.Now(), state: state, + resourceKeys: formatResourceKeys(rks), broadcasterMetrics: m, + messageType: msgType, } - g.broadcasterMetrics.fromStateToState(streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_UNKNOWN, state) + g.broadcasterMetrics.fromStateToState(msgType, streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_UNKNOWN, state) return g } -func (m *broadcasterMetrics) IncomingResourceKey(domain messagespb.ResourceDomain) { - m.resourceKeyTotal.WithLabelValues(domain.String()).Inc() -} - -func (m *broadcasterMetrics) GoneResourceKey(domain messagespb.ResourceDomain) { - m.resourceKeyTotal.WithLabelValues(domain.String()).Dec() -} - type taskMetricsGuard struct { - start time.Time - state streamingpb.BroadcastTaskState + start time.Time + ackCallbackBegin time.Time + state streamingpb.BroadcastTaskState + resourceKeys string + messageType message.MessageType *broadcasterMetrics } -// ToState updates the state of the broadcast task. -func (g *taskMetricsGuard) ToState(state streamingpb.BroadcastTaskState) { - g.broadcasterMetrics.fromStateToState(g.state, state) +// ObserveStateChanged updates the state of the broadcast task. +func (g *taskMetricsGuard) ObserveStateChanged(state streamingpb.BroadcastTaskState) { + g.broadcasterMetrics.fromStateToState(g.messageType, g.state, state) + if state == streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_TOMBSTONE { + g.executionDuration.WithLabelValues(g.messageType.String()).Observe(time.Since(g.start).Seconds()) + } g.state = state } // ObserveBroadcastDone observes the broadcast done. func (g *taskMetricsGuard) ObserveBroadcastDone() { - g.broadcastDuration.Observe(time.Since(g.start).Seconds()) + g.broadcastDuration.WithLabelValues(g.messageType.String()).Observe(time.Since(g.start).Seconds()) } -// ObserverAckOne observes the ack all. -func (g *taskMetricsGuard) ObserveAckAll() { - g.ackAllDuration.Observe(time.Since(g.start).Seconds()) +// ObserveAckCallbackBegin observes the ack callback begin. +func (g *taskMetricsGuard) ObserveAckCallbackBegin() { + g.ackCallbackBegin = time.Now() +} + +// ObserveAckCallbackDone observes the ack callback done. +func (g *taskMetricsGuard) ObserveAckCallbackDone() { + g.ackCallbackDuration.WithLabelValues(g.messageType.String()).Observe(time.Since(g.ackCallbackBegin).Seconds()) +} + +// formatResourceKeys formats the resource keys. +func formatResourceKeys(rks []message.ResourceKey) string { + keys := make([]string, 0, len(rks)) + for _, rk := range rks { + keys = append(keys, rk.ShortString()) + } + return strings.Join(keys, "|") } diff --git a/pkg/metrics/streaming_service_metrics.go b/pkg/metrics/streaming_service_metrics.go index db0797b286..e224695f0d 100644 --- a/pkg/metrics/streaming_service_metrics.go +++ b/pkg/metrics/streaming_service_metrics.go @@ -22,7 +22,7 @@ const ( WALStatusError = "error" BroadcasterTaskStateLabelName = "state" - ResourceKeyDomainLabelName = "domain" + ResourceKeyLockLabelName = "rk_lock" WALAccessModelLabelName = "access_model" WALScannerModelLabelName = "scanner_model" TimeTickSyncTypeLabelName = "type" @@ -126,24 +126,31 @@ var ( StreamingCoordBroadcasterTaskTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{ Name: "broadcaster_task_total", Help: "Total of broadcaster task", - }, BroadcasterTaskStateLabelName) + }, WALMessageTypeLabelName, BroadcasterTaskStateLabelName) - StreamingCoordBroadcastDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{ - Name: "broadcaster_broadcast_duration_seconds", - Help: "Duration of broadcast", + StreamingCoordBroadcasterTaskExecutionDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{ + Name: "broadcaster_task_execution_duration_seconds", + Help: "Duration of broadcast execution, including broadcast message into wal and ack callback, without lock acquisition duration", Buckets: secondsBuckets, - }) + }, WALMessageTypeLabelName) - StreamingCoordBroadcasterAckAllDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{ - Name: "broadcaster_ack_all_duration_seconds", - Help: "Duration of acknowledge all message", + StreamingCoordBroadcasterTaskBroadcastDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{ + Name: "broadcaster_task_broadcast_duration_seconds", + Help: "Duration of broadcast message into wal", Buckets: secondsBuckets, - }) + }, WALMessageTypeLabelName) - StreamingCoordResourceKeyTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{ - Name: "resource_key_total", - Help: "Total of resource key hold at streaming coord", - }, ResourceKeyDomainLabelName) + StreamingCoordBroadcasterTaskAcquireLockDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{ + Name: "broadcaster_task_acquire_lock_duration_seconds", + Help: "Duration of acquire lock of resource key", + Buckets: secondsBuckets, + }, ResourceKeyLockLabelName) + + StreamingCoordBroadcasterTaskAckCallbackDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{ + Name: "broadcaster_task_ack_callback_duration_seconds", + Help: "Duration of ack callback handler execution duration", + Buckets: secondsBuckets, + }, WALMessageTypeLabelName) // StreamingNode Producer Server Metrics. StreamingNodeProducerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{ @@ -483,9 +490,10 @@ func registerStreamingCoord(registry *prometheus.Registry) { registry.MustRegister(StreamingCoordAssignmentVersion) registry.MustRegister(StreamingCoordAssignmentListenerTotal) registry.MustRegister(StreamingCoordBroadcasterTaskTotal) - registry.MustRegister(StreamingCoordBroadcastDurationSeconds) - registry.MustRegister(StreamingCoordBroadcasterAckAllDurationSeconds) - registry.MustRegister(StreamingCoordResourceKeyTotal) + registry.MustRegister(StreamingCoordBroadcasterTaskExecutionDurationSeconds) + registry.MustRegister(StreamingCoordBroadcasterTaskBroadcastDurationSeconds) + registry.MustRegister(StreamingCoordBroadcasterTaskAcquireLockDurationSeconds) + registry.MustRegister(StreamingCoordBroadcasterTaskAckCallbackDurationSeconds) } // RegisterStreamingNode registers streaming node metrics diff --git a/pkg/streaming/util/message/resource_key.go b/pkg/streaming/util/message/resource_key.go index df77d4e3e3..de5c8c03d6 100644 --- a/pkg/streaming/util/message/resource_key.go +++ b/pkg/streaming/util/message/resource_key.go @@ -47,6 +47,14 @@ func (r ResourceKey) String() string { return fmt.Sprintf("%s:%s@X", domain, r.Key) } +func (r ResourceKey) ShortString() string { + domain, _ := strings.CutPrefix(r.Domain.String(), "ResourceDomain") + if r.Shared { + return fmt.Sprintf("%s@R", domain) + } + return fmt.Sprintf("%s@X", domain) +} + // NewSharedClusterResourceKey creates a shared cluster resource key. func NewSharedClusterResourceKey() ResourceKey { return ResourceKey{