diff --git a/internal/cdc/replication/replicatestream/metrics.go b/internal/cdc/replication/replicatestream/metrics.go index b323f02c08..a11f813555 100644 --- a/internal/cdc/replication/replicatestream/metrics.go +++ b/internal/cdc/replication/replicatestream/metrics.go @@ -28,6 +28,7 @@ import ( ) type ReplicateMetrics interface { + UpdateLastReplicatedTimeTick(ts uint64) StartReplicate(msg message.ImmutableMessage) OnSent(msg message.ImmutableMessage) OnConfirmed(msg message.ImmutableMessage) @@ -53,6 +54,13 @@ func NewReplicateMetrics(replicateInfo *streamingpb.ReplicatePChannelMeta) Repli } } +func (m *replicateMetrics) UpdateLastReplicatedTimeTick(ts uint64) { + metrics.CDCLastReplicatedTimeTick.WithLabelValues( + m.replicateInfo.GetSourceChannelName(), + m.replicateInfo.GetTargetChannelName(), + ).Set(tsoutil.PhysicalTimeSeconds(ts)) +} + func (m *replicateMetrics) StartReplicate(msg message.ImmutableMessage) { msgID := msg.MessageID().String() m.msgsMetrics.Insert(msgID, msgMetrics{ @@ -88,10 +96,7 @@ func (m *replicateMetrics) OnConfirmed(msg message.ImmutableMessage) { m.replicateInfo.GetTargetChannelName(), ).Observe(float64(replicateDuration.Milliseconds())) - metrics.CDCLastReplicatedTimeTick.WithLabelValues( - m.replicateInfo.GetSourceChannelName(), - m.replicateInfo.GetTargetChannelName(), - ).Set(tsoutil.PhysicalTimeSeconds(msg.TimeTick())) + m.UpdateLastReplicatedTimeTick(msg.TimeTick()) } func (m *replicateMetrics) OnInitiate() { diff --git a/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go b/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go index cce4e4612f..71a61294ce 100644 --- a/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go +++ b/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go @@ -168,8 +168,11 @@ func (r *replicateStreamClient) Replicate(msg message.ImmutableMessage) error { case <-r.ctx.Done(): return nil default: - // TODO: Should be done at streamingnode, but after move it into streamingnode, the metric need to be adjusted. if msg.MessageType().IsSelfControlled() { + // If no messages are being replicated, update the last replicated time tick. + if r.pendingMessages.Len() == 0 { + r.metrics.UpdateLastReplicatedTimeTick(msg.TimeTick()) + } return ErrReplicateIgnored } r.metrics.StartReplicate(msg)