From e0fd091d412d3053d86e2ec4c3c70b11a06e32fd Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Fri, 26 Dec 2025 18:13:19 +0800 Subject: [PATCH] fix: Fix replicate lag when server is idle (#46574) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit issue: https://github.com/milvus-io/milvus/issues/46116 - Core invariant: the metric CDCLastReplicatedTimeTick must reflect the most recent time-tick when replication has effectively processed all pending messages (including idle periods), so the reported replicate lag (confirmed WAL tick − last replicated tick) can reach zero when the server is idle. - Exact fix (bug): addresses issue #46116 by ensuring the last-replicated metric is updated when the server is idle. Concretely, a new ReplicateMetrics.UpdateLastReplicatedTimeTick(ts uint64) was added and is called from OnConfirmed (OnConfirmed now delegates to UpdateLastReplicatedTimeTick(msg.TimeTick())), and from Replicate’s self-controlled-message path when the pending queue is empty — so the code records the time tick before returning ErrReplicateIgnored. - Logic simplified / removed: direct, ad-hoc metric writes in OnConfirmed were replaced by a single UpdateLastReplicatedTimeTick helper on the metrics implementation. The scattered manual setting of CDCLastReplicatedTimeTick is consolidated into one method, removing redundant direct metric manipulations and centralizing timestamp conversion (tsoutil.PhysicalTimeSeconds). - No data loss / no behavior regression: this change only updates monitoring metrics and does not alter replication control flow or message processing. Replicate still returns ErrReplicateIgnored for self-controlled messages and does not change message persistence or acknowledgement paths; OnConfirmed continues to be invoked on confirmed messages but now delegates metric recording to the new method. Therefore no replication state, message ordering, or persistence semantics are modified. 
Signed-off-by: bigsheeper --- internal/cdc/replication/replicatestream/metrics.go | 13 +++++++++---- .../replicatestream/replicate_stream_client_impl.go | 5 ++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/internal/cdc/replication/replicatestream/metrics.go b/internal/cdc/replication/replicatestream/metrics.go index b323f02c08..a11f813555 100644 --- a/internal/cdc/replication/replicatestream/metrics.go +++ b/internal/cdc/replication/replicatestream/metrics.go @@ -28,6 +28,7 @@ import ( ) type ReplicateMetrics interface { + UpdateLastReplicatedTimeTick(ts uint64) StartReplicate(msg message.ImmutableMessage) OnSent(msg message.ImmutableMessage) OnConfirmed(msg message.ImmutableMessage) @@ -53,6 +54,13 @@ func NewReplicateMetrics(replicateInfo *streamingpb.ReplicatePChannelMeta) Repli } } +func (m *replicateMetrics) UpdateLastReplicatedTimeTick(ts uint64) { + metrics.CDCLastReplicatedTimeTick.WithLabelValues( + m.replicateInfo.GetSourceChannelName(), + m.replicateInfo.GetTargetChannelName(), + ).Set(tsoutil.PhysicalTimeSeconds(ts)) +} + func (m *replicateMetrics) StartReplicate(msg message.ImmutableMessage) { msgID := msg.MessageID().String() m.msgsMetrics.Insert(msgID, msgMetrics{ @@ -88,10 +96,7 @@ func (m *replicateMetrics) OnConfirmed(msg message.ImmutableMessage) { m.replicateInfo.GetTargetChannelName(), ).Observe(float64(replicateDuration.Milliseconds())) - metrics.CDCLastReplicatedTimeTick.WithLabelValues( - m.replicateInfo.GetSourceChannelName(), - m.replicateInfo.GetTargetChannelName(), - ).Set(tsoutil.PhysicalTimeSeconds(msg.TimeTick())) + m.UpdateLastReplicatedTimeTick(msg.TimeTick()) } func (m *replicateMetrics) OnInitiate() { diff --git a/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go b/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go index cce4e4612f..71a61294ce 100644 --- a/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go +++ 
b/internal/cdc/replication/replicatestream/replicate_stream_client_impl.go @@ -168,8 +168,11 @@ func (r *replicateStreamClient) Replicate(msg message.ImmutableMessage) error { case <-r.ctx.Done(): return nil default: - // TODO: Should be done at streamingnode, but after move it into streamingnode, the metric need to be adjusted. if msg.MessageType().IsSelfControlled() { + // If no messages are being replicated, update the last replicated time tick. + if r.pendingMessages.Len() == 0 { + r.metrics.UpdateLastReplicatedTimeTick(msg.TimeTick()) + } return ErrReplicateIgnored } r.metrics.StartReplicate(msg)