mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: add more metrics for DDL framework (#45559)
issue: #43897 pr: #45558 --------- Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
679d248c59
commit
601687d8d2
@ -158,9 +158,12 @@ func (s *ackCallbackScheduler) doAckCallback(bt *broadcastTask, g *lockGuards) (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// call the ack callback until done.
|
// call the ack callback until done.
|
||||||
|
bt.ObserveAckCallbackBegin()
|
||||||
if err := s.callMessageAckCallbackUntilDone(s.notifier.Context(), msg, makeMap); err != nil {
|
if err := s.callMessageAckCallbackUntilDone(s.notifier.Context(), msg, makeMap); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
bt.ObserveAckCallbackDone()
|
||||||
|
|
||||||
logger.Debug("ack callback done")
|
logger.Debug("ack callback done")
|
||||||
if err := bt.MarkAckCallbackDone(s.notifier.Context()); err != nil {
|
if err := bt.MarkAckCallbackDone(s.notifier.Context()); err != nil {
|
||||||
// The catalog is reliable to write, so we can mark the ack callback done without retrying.
|
// The catalog is reliable to write, so we can mark the ack callback done without retrying.
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package broadcaster
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -112,6 +113,7 @@ func (bm *broadcastTaskManager) WithResourceKeys(ctx context.Context, resourceKe
|
|||||||
return nil, errors.Wrapf(err, "allocate new id failed")
|
return nil, errors.Wrapf(err, "allocate new id failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
startLockInstant := time.Now()
|
||||||
resourceKeys = bm.appendSharedClusterRK(resourceKeys...)
|
resourceKeys = bm.appendSharedClusterRK(resourceKeys...)
|
||||||
guards := bm.resourceKeyLocker.Lock(resourceKeys...)
|
guards := bm.resourceKeyLocker.Lock(resourceKeys...)
|
||||||
|
|
||||||
@ -120,6 +122,8 @@ func (bm *broadcastTaskManager) WithResourceKeys(ctx context.Context, resourceKe
|
|||||||
guards.Unlock()
|
guards.Unlock()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
bm.metrics.ObserveAcquireLockDuration(startLockInstant, guards.ResourceKeys())
|
||||||
|
|
||||||
return &broadcasterWithRK{
|
return &broadcasterWithRK{
|
||||||
broadcaster: bm,
|
broadcaster: bm,
|
||||||
broadcastID: id,
|
broadcastID: id,
|
||||||
@ -129,6 +133,9 @@ func (bm *broadcastTaskManager) WithResourceKeys(ctx context.Context, resourceKe
|
|||||||
|
|
||||||
// checkClusterRole checks if the cluster status is primary, and returns an error otherwise.
|
// checkClusterRole checks if the cluster status is primary, and returns an error otherwise.
|
||||||
func (bm *broadcastTaskManager) checkClusterRole(ctx context.Context) error {
|
func (bm *broadcastTaskManager) checkClusterRole(ctx context.Context) error {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
// Check if the cluster status is primary, otherwise return error.
|
// Check if the cluster status is primary, otherwise return error.
|
||||||
b, err := balance.GetWithContext(ctx)
|
b, err := balance.GetWithContext(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -18,14 +18,14 @@ import (
|
|||||||
|
|
||||||
// newBroadcastTaskFromProto creates a new broadcast task from the proto.
|
// newBroadcastTaskFromProto creates a new broadcast task from the proto.
|
||||||
func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask, metrics *broadcasterMetrics, ackCallbackScheduler *ackCallbackScheduler) *broadcastTask {
|
func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask, metrics *broadcasterMetrics, ackCallbackScheduler *ackCallbackScheduler) *broadcastTask {
|
||||||
m := metrics.NewBroadcastTask(proto.GetState())
|
|
||||||
msg := message.NewBroadcastMutableMessageBeforeAppend(proto.Message.Payload, proto.Message.Properties)
|
msg := message.NewBroadcastMutableMessageBeforeAppend(proto.Message.Payload, proto.Message.Properties)
|
||||||
|
m := metrics.NewBroadcastTask(msg.MessageType(), proto.GetState(), msg.BroadcastHeader().ResourceKeys.Collect())
|
||||||
bt := &broadcastTask{
|
bt := &broadcastTask{
|
||||||
mu: sync.Mutex{},
|
mu: sync.Mutex{},
|
||||||
|
taskMetricsGuard: m,
|
||||||
msg: msg,
|
msg: msg,
|
||||||
task: proto,
|
task: proto,
|
||||||
dirty: true, // the task is recovered from the recovery info, so it's persisted.
|
dirty: true, // the task is recovered from the recovery info, so it's persisted.
|
||||||
metrics: m,
|
|
||||||
ackCallbackScheduler: ackCallbackScheduler,
|
ackCallbackScheduler: ackCallbackScheduler,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
allAcked: make(chan struct{}),
|
allAcked: make(chan struct{}),
|
||||||
@ -41,12 +41,13 @@ func newBroadcastTaskFromProto(proto *streamingpb.BroadcastTask, metrics *broadc
|
|||||||
|
|
||||||
// newBroadcastTaskFromBroadcastMessage creates a new broadcast task from the broadcast message.
|
// newBroadcastTaskFromBroadcastMessage creates a new broadcast task from the broadcast message.
|
||||||
func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage, metrics *broadcasterMetrics, ackCallbackScheduler *ackCallbackScheduler) *broadcastTask {
|
func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage, metrics *broadcasterMetrics, ackCallbackScheduler *ackCallbackScheduler) *broadcastTask {
|
||||||
m := metrics.NewBroadcastTask(streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_PENDING)
|
m := metrics.NewBroadcastTask(msg.MessageType(), streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_PENDING, msg.BroadcastHeader().ResourceKeys.Collect())
|
||||||
header := msg.BroadcastHeader()
|
header := msg.BroadcastHeader()
|
||||||
bt := &broadcastTask{
|
bt := &broadcastTask{
|
||||||
Binder: log.Binder{},
|
Binder: log.Binder{},
|
||||||
mu: sync.Mutex{},
|
taskMetricsGuard: m,
|
||||||
msg: msg,
|
mu: sync.Mutex{},
|
||||||
|
msg: msg,
|
||||||
task: &streamingpb.BroadcastTask{
|
task: &streamingpb.BroadcastTask{
|
||||||
Message: msg.IntoMessageProto(),
|
Message: msg.IntoMessageProto(),
|
||||||
State: streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_PENDING,
|
State: streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_PENDING,
|
||||||
@ -54,7 +55,6 @@ func newBroadcastTaskFromBroadcastMessage(msg message.BroadcastMutableMessage, m
|
|||||||
AckedCheckpoints: make([]*streamingpb.AckedCheckpoint, len(header.VChannels)),
|
AckedCheckpoints: make([]*streamingpb.AckedCheckpoint, len(header.VChannels)),
|
||||||
},
|
},
|
||||||
dirty: false,
|
dirty: false,
|
||||||
metrics: m,
|
|
||||||
ackCallbackScheduler: ackCallbackScheduler,
|
ackCallbackScheduler: ackCallbackScheduler,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
allAcked: make(chan struct{}),
|
allAcked: make(chan struct{}),
|
||||||
@ -68,18 +68,19 @@ func newBroadcastTaskFromImmutableMessage(msg message.ImmutableMessage, metrics
|
|||||||
task := newBroadcastTaskFromBroadcastMessage(broadcastMsg, metrics, ackCallbackScheduler)
|
task := newBroadcastTaskFromBroadcastMessage(broadcastMsg, metrics, ackCallbackScheduler)
|
||||||
// if the task is created from the immutable message, it has already been broadcast, so transfer its state into replicated.
|
// if the task is created from the immutable message, it has already been broadcast, so transfer its state into replicated.
|
||||||
task.task.State = streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_REPLICATED
|
task.task.State = streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_REPLICATED
|
||||||
task.metrics.ToState(streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_REPLICATED)
|
task.ObserveStateChanged(streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_REPLICATED)
|
||||||
return task
|
return task
|
||||||
}
|
}
|
||||||
|
|
||||||
// broadcastTask is the state of the broadcast task.
|
// broadcastTask is the state of the broadcast task.
|
||||||
type broadcastTask struct {
|
type broadcastTask struct {
|
||||||
log.Binder
|
log.Binder
|
||||||
|
*taskMetricsGuard
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
msg message.BroadcastMutableMessage
|
msg message.BroadcastMutableMessage
|
||||||
task *streamingpb.BroadcastTask
|
task *streamingpb.BroadcastTask
|
||||||
dirty bool // a flag to indicate that the task has been modified and needs to be saved into the recovery info.
|
dirty bool // a flag to indicate that the task has been modified and needs to be saved into the recovery info.
|
||||||
metrics *taskMetricsGuard
|
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
allAcked chan struct{}
|
allAcked chan struct{}
|
||||||
guards *lockGuards
|
guards *lockGuards
|
||||||
@ -248,7 +249,6 @@ func (b *broadcastTask) ack(ctx context.Context, msgs ...message.ImmutableMessag
|
|||||||
}
|
}
|
||||||
if allDone {
|
if allDone {
|
||||||
close(b.allAcked)
|
close(b.allAcked)
|
||||||
b.metrics.ObserveAckAll()
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -338,11 +338,10 @@ func findIdxOfVChannel(vchannel string, vchannels []string) (int, error) {
|
|||||||
// FastAck triggers a fast ack operation when the broadcast operation is done.
|
// FastAck triggers a fast ack operation when the broadcast operation is done.
|
||||||
func (b *broadcastTask) FastAck(ctx context.Context, broadcastResult map[string]*types.AppendResult) error {
|
func (b *broadcastTask) FastAck(ctx context.Context, broadcastResult map[string]*types.AppendResult) error {
|
||||||
// Broadcast operation is done.
|
// Broadcast operation is done.
|
||||||
b.metrics.ObserveBroadcastDone()
|
|
||||||
|
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
defer b.mu.Unlock()
|
defer b.mu.Unlock()
|
||||||
|
|
||||||
|
b.ObserveBroadcastDone()
|
||||||
// because we need to wait for the streamingnode to ack the message,
|
// because we need to wait for the streamingnode to ack the message,
|
||||||
// however, if the message is already written into the wal, the message is determined,
|
// however, if the message is already written into the wal, the message is determined,
|
||||||
// so we can make a fast ack operation here to speed up the ack operation.
|
// so we can make a fast ack operation here to speed up the ack operation.
|
||||||
@ -421,7 +420,7 @@ func (b *broadcastTask) saveTaskIfDirty(ctx context.Context, logger *log.MLogger
|
|||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
b.metrics.ToState(b.task.State)
|
b.ObserveStateChanged(b.task.State)
|
||||||
logger.Info("save broadcast task done")
|
logger.Info("save broadcast task done")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,13 +1,14 @@
|
|||||||
package broadcaster
|
package broadcaster
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
|
|
||||||
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
|
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
|
||||||
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
||||||
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -17,68 +18,91 @@ func newBroadcasterMetrics() *broadcasterMetrics {
|
|||||||
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
|
metrics.NodeIDLabelName: paramtable.GetStringNodeID(),
|
||||||
}
|
}
|
||||||
return &broadcasterMetrics{
|
return &broadcasterMetrics{
|
||||||
taskTotal: metrics.StreamingCoordBroadcasterTaskTotal.MustCurryWith(constLabel),
|
taskTotal: metrics.StreamingCoordBroadcasterTaskTotal.MustCurryWith(constLabel),
|
||||||
resourceKeyTotal: metrics.StreamingCoordResourceKeyTotal.MustCurryWith(constLabel),
|
executionDuration: metrics.StreamingCoordBroadcasterTaskExecutionDurationSeconds.MustCurryWith(constLabel),
|
||||||
broadcastDuration: metrics.StreamingCoordBroadcastDurationSeconds.With(constLabel),
|
broadcastDuration: metrics.StreamingCoordBroadcasterTaskBroadcastDurationSeconds.MustCurryWith(constLabel),
|
||||||
ackAllDuration: metrics.StreamingCoordBroadcasterAckAllDurationSeconds.With(constLabel),
|
ackCallbackDuration: metrics.StreamingCoordBroadcasterTaskAckCallbackDurationSeconds.MustCurryWith(constLabel),
|
||||||
|
acquireLockDuration: metrics.StreamingCoordBroadcasterTaskAcquireLockDurationSeconds.MustCurryWith(constLabel),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// broadcasterMetrics is the metrics of the broadcaster.
|
// broadcasterMetrics is the metrics of the broadcaster.
|
||||||
type broadcasterMetrics struct {
|
type broadcasterMetrics struct {
|
||||||
taskTotal *prometheus.GaugeVec
|
taskTotal *prometheus.GaugeVec
|
||||||
resourceKeyTotal *prometheus.GaugeVec
|
executionDuration prometheus.ObserverVec
|
||||||
broadcastDuration prometheus.Observer
|
broadcastDuration prometheus.ObserverVec
|
||||||
ackAllDuration prometheus.Observer
|
ackCallbackDuration prometheus.ObserverVec
|
||||||
|
acquireLockDuration prometheus.ObserverVec
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObserveAcquireLockDuration observes the acquire lock duration.
|
||||||
|
func (m *broadcasterMetrics) ObserveAcquireLockDuration(from time.Time, rks []message.ResourceKey) {
|
||||||
|
m.acquireLockDuration.WithLabelValues(formatResourceKeys(rks)).Observe(time.Since(from).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
// fromStateToState updates the metrics when the state of the broadcast task changes.
|
// fromStateToState updates the metrics when the state of the broadcast task changes.
|
||||||
func (m *broadcasterMetrics) fromStateToState(from streamingpb.BroadcastTaskState, to streamingpb.BroadcastTaskState) {
|
func (m *broadcasterMetrics) fromStateToState(msgType message.MessageType, from streamingpb.BroadcastTaskState, to streamingpb.BroadcastTaskState) {
|
||||||
if from != streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_UNKNOWN {
|
if from != streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_UNKNOWN {
|
||||||
m.taskTotal.WithLabelValues(from.String()).Dec()
|
m.taskTotal.WithLabelValues(msgType.String(), from.String()).Dec()
|
||||||
}
|
}
|
||||||
if to != streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_DONE {
|
if to != streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_DONE {
|
||||||
m.taskTotal.WithLabelValues(to.String()).Inc()
|
m.taskTotal.WithLabelValues(msgType.String(), to.String()).Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBroadcastTask creates a new broadcast task.
|
// NewBroadcastTask creates a new broadcast task.
|
||||||
func (m *broadcasterMetrics) NewBroadcastTask(state streamingpb.BroadcastTaskState) *taskMetricsGuard {
|
func (m *broadcasterMetrics) NewBroadcastTask(msgType message.MessageType, state streamingpb.BroadcastTaskState, rks []message.ResourceKey) *taskMetricsGuard {
|
||||||
|
rks = uniqueSortResourceKeys(rks)
|
||||||
g := &taskMetricsGuard{
|
g := &taskMetricsGuard{
|
||||||
start: time.Now(),
|
start: time.Now(),
|
||||||
|
ackCallbackBegin: time.Now(),
|
||||||
state: state,
|
state: state,
|
||||||
|
resourceKeys: formatResourceKeys(rks),
|
||||||
broadcasterMetrics: m,
|
broadcasterMetrics: m,
|
||||||
|
messageType: msgType,
|
||||||
}
|
}
|
||||||
g.broadcasterMetrics.fromStateToState(streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_UNKNOWN, state)
|
g.broadcasterMetrics.fromStateToState(msgType, streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_UNKNOWN, state)
|
||||||
return g
|
return g
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *broadcasterMetrics) IncomingResourceKey(domain messagespb.ResourceDomain) {
|
|
||||||
m.resourceKeyTotal.WithLabelValues(domain.String()).Inc()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *broadcasterMetrics) GoneResourceKey(domain messagespb.ResourceDomain) {
|
|
||||||
m.resourceKeyTotal.WithLabelValues(domain.String()).Dec()
|
|
||||||
}
|
|
||||||
|
|
||||||
type taskMetricsGuard struct {
|
type taskMetricsGuard struct {
|
||||||
start time.Time
|
start time.Time
|
||||||
state streamingpb.BroadcastTaskState
|
ackCallbackBegin time.Time
|
||||||
|
state streamingpb.BroadcastTaskState
|
||||||
|
resourceKeys string
|
||||||
|
messageType message.MessageType
|
||||||
*broadcasterMetrics
|
*broadcasterMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
// ToState updates the state of the broadcast task.
|
// ObserveStateChanged updates the state of the broadcast task.
|
||||||
func (g *taskMetricsGuard) ToState(state streamingpb.BroadcastTaskState) {
|
func (g *taskMetricsGuard) ObserveStateChanged(state streamingpb.BroadcastTaskState) {
|
||||||
g.broadcasterMetrics.fromStateToState(g.state, state)
|
g.broadcasterMetrics.fromStateToState(g.messageType, g.state, state)
|
||||||
|
if state == streamingpb.BroadcastTaskState_BROADCAST_TASK_STATE_TOMBSTONE {
|
||||||
|
g.executionDuration.WithLabelValues(g.messageType.String()).Observe(time.Since(g.start).Seconds())
|
||||||
|
}
|
||||||
g.state = state
|
g.state = state
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObserveBroadcastDone observes the broadcast done.
|
// ObserveBroadcastDone observes the broadcast done.
|
||||||
func (g *taskMetricsGuard) ObserveBroadcastDone() {
|
func (g *taskMetricsGuard) ObserveBroadcastDone() {
|
||||||
g.broadcastDuration.Observe(time.Since(g.start).Seconds())
|
g.broadcastDuration.WithLabelValues(g.messageType.String()).Observe(time.Since(g.start).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObserverAckOne observes the ack all.
|
// ObserveAckCallbackBegin observes the ack callback begin.
|
||||||
func (g *taskMetricsGuard) ObserveAckAll() {
|
func (g *taskMetricsGuard) ObserveAckCallbackBegin() {
|
||||||
g.ackAllDuration.Observe(time.Since(g.start).Seconds())
|
g.ackCallbackBegin = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObserveAckCallbackDone observes the ack callback done.
|
||||||
|
func (g *taskMetricsGuard) ObserveAckCallbackDone() {
|
||||||
|
g.ackCallbackDuration.WithLabelValues(g.messageType.String()).Observe(time.Since(g.ackCallbackBegin).Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
// formatResourceKeys formats the resource keys.
|
||||||
|
func formatResourceKeys(rks []message.ResourceKey) string {
|
||||||
|
keys := make([]string, 0, len(rks))
|
||||||
|
for _, rk := range rks {
|
||||||
|
keys = append(keys, rk.ShortString())
|
||||||
|
}
|
||||||
|
return strings.Join(keys, "|")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,7 +22,7 @@ const (
|
|||||||
WALStatusError = "error"
|
WALStatusError = "error"
|
||||||
|
|
||||||
BroadcasterTaskStateLabelName = "state"
|
BroadcasterTaskStateLabelName = "state"
|
||||||
ResourceKeyDomainLabelName = "domain"
|
ResourceKeyLockLabelName = "rk_lock"
|
||||||
WALAccessModelLabelName = "access_model"
|
WALAccessModelLabelName = "access_model"
|
||||||
WALScannerModelLabelName = "scanner_model"
|
WALScannerModelLabelName = "scanner_model"
|
||||||
TimeTickSyncTypeLabelName = "type"
|
TimeTickSyncTypeLabelName = "type"
|
||||||
@ -126,24 +126,31 @@ var (
|
|||||||
StreamingCoordBroadcasterTaskTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
|
StreamingCoordBroadcasterTaskTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
|
||||||
Name: "broadcaster_task_total",
|
Name: "broadcaster_task_total",
|
||||||
Help: "Total of broadcaster task",
|
Help: "Total of broadcaster task",
|
||||||
}, BroadcasterTaskStateLabelName)
|
}, WALMessageTypeLabelName, BroadcasterTaskStateLabelName)
|
||||||
|
|
||||||
StreamingCoordBroadcastDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{
|
StreamingCoordBroadcasterTaskExecutionDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{
|
||||||
Name: "broadcaster_broadcast_duration_seconds",
|
Name: "broadcaster_task_execution_duration_seconds",
|
||||||
Help: "Duration of broadcast",
|
Help: "Duration of broadcast execution, including broadcast message into wal and ack callback, without lock acquisition duration",
|
||||||
Buckets: secondsBuckets,
|
Buckets: secondsBuckets,
|
||||||
})
|
}, WALMessageTypeLabelName)
|
||||||
|
|
||||||
StreamingCoordBroadcasterAckAllDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{
|
StreamingCoordBroadcasterTaskBroadcastDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{
|
||||||
Name: "broadcaster_ack_all_duration_seconds",
|
Name: "broadcaster_task_broadcast_duration_seconds",
|
||||||
Help: "Duration of acknowledge all message",
|
Help: "Duration of broadcast message into wal",
|
||||||
Buckets: secondsBuckets,
|
Buckets: secondsBuckets,
|
||||||
})
|
}, WALMessageTypeLabelName)
|
||||||
|
|
||||||
StreamingCoordResourceKeyTotal = newStreamingCoordGaugeVec(prometheus.GaugeOpts{
|
StreamingCoordBroadcasterTaskAcquireLockDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{
|
||||||
Name: "resource_key_total",
|
Name: "broadcaster_task_acquire_lock_duration_seconds",
|
||||||
Help: "Total of resource key hold at streaming coord",
|
Help: "Duration of acquire lock of resource key",
|
||||||
}, ResourceKeyDomainLabelName)
|
Buckets: secondsBuckets,
|
||||||
|
}, ResourceKeyLockLabelName)
|
||||||
|
|
||||||
|
StreamingCoordBroadcasterTaskAckCallbackDurationSeconds = newStreamingCoordHistogramVec(prometheus.HistogramOpts{
|
||||||
|
Name: "broadcaster_task_ack_callback_duration_seconds",
|
||||||
|
Help: "Duration of ack callback handler execution duration",
|
||||||
|
Buckets: secondsBuckets,
|
||||||
|
}, WALMessageTypeLabelName)
|
||||||
|
|
||||||
// StreamingNode Producer Server Metrics.
|
// StreamingNode Producer Server Metrics.
|
||||||
StreamingNodeProducerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
|
StreamingNodeProducerTotal = newStreamingNodeGaugeVec(prometheus.GaugeOpts{
|
||||||
@ -483,9 +490,10 @@ func registerStreamingCoord(registry *prometheus.Registry) {
|
|||||||
registry.MustRegister(StreamingCoordAssignmentVersion)
|
registry.MustRegister(StreamingCoordAssignmentVersion)
|
||||||
registry.MustRegister(StreamingCoordAssignmentListenerTotal)
|
registry.MustRegister(StreamingCoordAssignmentListenerTotal)
|
||||||
registry.MustRegister(StreamingCoordBroadcasterTaskTotal)
|
registry.MustRegister(StreamingCoordBroadcasterTaskTotal)
|
||||||
registry.MustRegister(StreamingCoordBroadcastDurationSeconds)
|
registry.MustRegister(StreamingCoordBroadcasterTaskExecutionDurationSeconds)
|
||||||
registry.MustRegister(StreamingCoordBroadcasterAckAllDurationSeconds)
|
registry.MustRegister(StreamingCoordBroadcasterTaskBroadcastDurationSeconds)
|
||||||
registry.MustRegister(StreamingCoordResourceKeyTotal)
|
registry.MustRegister(StreamingCoordBroadcasterTaskAcquireLockDurationSeconds)
|
||||||
|
registry.MustRegister(StreamingCoordBroadcasterTaskAckCallbackDurationSeconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterStreamingNode registers streaming node metrics
|
// RegisterStreamingNode registers streaming node metrics
|
||||||
|
|||||||
@ -47,6 +47,14 @@ func (r ResourceKey) String() string {
|
|||||||
return fmt.Sprintf("%s:%s@X", domain, r.Key)
|
return fmt.Sprintf("%s:%s@X", domain, r.Key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r ResourceKey) ShortString() string {
|
||||||
|
domain, _ := strings.CutPrefix(r.Domain.String(), "ResourceDomain")
|
||||||
|
if r.Shared {
|
||||||
|
return fmt.Sprintf("%s@R", domain)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%s@X", domain)
|
||||||
|
}
|
||||||
|
|
||||||
// NewSharedClusterResourceKey creates a shared cluster resource key.
|
// NewSharedClusterResourceKey creates a shared cluster resource key.
|
||||||
func NewSharedClusterResourceKey() ResourceKey {
|
func NewSharedClusterResourceKey() ResourceKey {
|
||||||
return ResourceKey{
|
return ResourceKey{
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user