From 224728c2d22dd348acde81c16ab6919900ffc561 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Thu, 10 Apr 2025 19:04:27 +0800 Subject: [PATCH] fix: catchup cannot work if using StartAfter (#41201) issue: #41062 Signed-off-by: chyezh --- .../streaming/internal/consumer/consumer_impl.go | 4 +++- .../server/flusher/flusherimpl/wal_flusher.go | 4 +++- .../server/wal/adaptor/scanner_switchable.go | 3 ++- .../server/wal/interceptors/wab/pending_queue.go | 5 +++++ .../server/wal/interceptors/wab/write_ahead_buffer.go | 10 +++++----- 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/internal/distributed/streaming/internal/consumer/consumer_impl.go b/internal/distributed/streaming/internal/consumer/consumer_impl.go index aa7fbf2b1c..a9d177273c 100644 --- a/internal/distributed/streaming/internal/consumer/consumer_impl.go +++ b/internal/distributed/streaming/internal/consumer/consumer_impl.go @@ -89,7 +89,9 @@ func (rc *resumableConsumerImpl) resumeLoop() { // Consume ordering is always time tick order now. if rc.mh.lastConfirmedMessageID != nil { // set the deliver policy start after the last message id. - deliverPolicy = options.DeliverPolicyStartAfter(rc.mh.lastConfirmedMessageID) + // !!! we always set the deliver policy to start from the last confirmed message id. + // because the catchup scanner at the streamingnode server must see the last confirmed message id if it's the last timetick. 
+ deliverPolicy = options.DeliverPolicyStartFrom(rc.mh.lastConfirmedMessageID) newDeliverFilters := make([]options.DeliverFilter, 0, len(deliverFilters)+1) for _, filter := range deliverFilters { if !options.IsDeliverFilterTimeTick(filter) { diff --git a/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go b/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go index 60222bfdee..8e210013dd 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go +++ b/internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go @@ -177,7 +177,9 @@ func (impl *WALFlusherImpl) generateScanner(ctx context.Context, l wal.WAL) (wal } if startMessageID := impl.flusherComponents.StartMessageID(); startMessageID != nil { impl.logger.Info("wal start to scan from minimum checkpoint", zap.Stringer("startMessageID", startMessageID)) - readOpt.DeliverPolicy = options.DeliverPolicyStartAfter(startMessageID) + // !!! we always set the deliver policy to start from the last confirmed message id. + // because the catchup scanner at the streamingnode server must see the last confirmed message id if it's the last timetick. + readOpt.DeliverPolicy = options.DeliverPolicyStartFrom(startMessageID) } impl.logger.Info("wal start to scan from the beginning") return l.Read(ctx, readOpt) diff --git a/internal/streamingnode/server/wal/adaptor/scanner_switchable.go b/internal/streamingnode/server/wal/adaptor/scanner_switchable.go index b776ca9e75..6ea49c4bb1 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_switchable.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_switchable.go @@ -145,7 +145,8 @@ func (s *catchupScanner) consumeWithScanner(ctx context.Context, scanner walimpl return nil, err } if msg.MessageType() != message.MessageTypeTimeTick || s.writeAheadBuffer == nil { - // If there's no write ahead buffer, we cannot switch into tailing mode, so skip the checking. 
+ // Only timetick messages keep the same order as the write ahead buffer. + // So we can only use the timetick message to catch up with the write ahead buffer. continue } // Here's a timetick message from the scanner, make tailing read if we catch up the writeahead buffer. diff --git a/internal/streamingnode/server/wal/interceptors/wab/pending_queue.go b/internal/streamingnode/server/wal/interceptors/wab/pending_queue.go index a0edb88b41..c03a7cf474 100644 --- a/internal/streamingnode/server/wal/interceptors/wab/pending_queue.go +++ b/internal/streamingnode/server/wal/interceptors/wab/pending_queue.go @@ -74,6 +74,11 @@ func (q *pendingQueue) Push(msgs []message.ImmutableMessage) { q.evict(now) } +// LastTimeTick returns the last time tick of the buffer. +func (q *pendingQueue) LastTimeTick() uint64 { + return q.lastTimeTick +} + // Evict removes messages that have been in the buffer for longer than the keepAlive duration. func (q *pendingQueue) Evict() { q.evict(time.Now()) diff --git a/internal/streamingnode/server/wal/interceptors/wab/write_ahead_buffer.go b/internal/streamingnode/server/wal/interceptors/wab/write_ahead_buffer.go index 4a4eb82e30..fcf0cc5008 100644 --- a/internal/streamingnode/server/wal/interceptors/wab/write_ahead_buffer.go +++ b/internal/streamingnode/server/wal/interceptors/wab/write_ahead_buffer.go @@ -160,6 +160,11 @@ func (w *WriteAheadBuffer) createSnapshotFromTimeTick(ctx context.Context, timeT // error is eof, which means that the time tick is behind the message buffer. // The lastTimeTickMessage should always be greater or equal to the lastTimeTick in the pending queue. + if w.pendingMessages.LastTimeTick() == timeTick { + offset := w.pendingMessages.CurrentOffset() + 1 + w.cond.L.Unlock() + return nil, offset, nil + } if w.lastTimeTickMessage.TimeTick() > timeTick { // check if the last time tick is greater than the given time tick, return it to update the timetick. 
@@ -170,11 +175,6 @@ func (w *WriteAheadBuffer) createSnapshotFromTimeTick(ctx context.Context, timeT w.cond.L.Unlock() return []messageWithOffset{msg}, msg.Offset, nil } - if w.lastTimeTickMessage.TimeTick() == timeTick { - offset := w.pendingMessages.CurrentOffset() + 1 - w.cond.L.Unlock() - return nil, offset, nil - } if err := w.cond.Wait(ctx); err != nil { return nil, 0, err