mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
fix: catchup cannot work if using StartAfter (#41201)
issue: #41062 Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
793fdeafe1
commit
224728c2d2
@ -89,7 +89,9 @@ func (rc *resumableConsumerImpl) resumeLoop() {
|
|||||||
// Consume ordering is always time tick order now.
|
// Consume ordering is always time tick order now.
|
||||||
if rc.mh.lastConfirmedMessageID != nil {
|
if rc.mh.lastConfirmedMessageID != nil {
|
||||||
// set the deliver policy start after the last message id.
|
// set the deliver policy start after the last message id.
|
||||||
deliverPolicy = options.DeliverPolicyStartAfter(rc.mh.lastConfirmedMessageID)
|
// !!! we always set the deliver policy to start from the last confirmed message id.
|
||||||
|
// because the catchup scanner at the streamingnode server must see the last confirmed message id if it's the last timetick.
|
||||||
|
deliverPolicy = options.DeliverPolicyStartFrom(rc.mh.lastConfirmedMessageID)
|
||||||
newDeliverFilters := make([]options.DeliverFilter, 0, len(deliverFilters)+1)
|
newDeliverFilters := make([]options.DeliverFilter, 0, len(deliverFilters)+1)
|
||||||
for _, filter := range deliverFilters {
|
for _, filter := range deliverFilters {
|
||||||
if !options.IsDeliverFilterTimeTick(filter) {
|
if !options.IsDeliverFilterTimeTick(filter) {
|
||||||
|
|||||||
@ -177,7 +177,9 @@ func (impl *WALFlusherImpl) generateScanner(ctx context.Context, l wal.WAL) (wal
|
|||||||
}
|
}
|
||||||
if startMessageID := impl.flusherComponents.StartMessageID(); startMessageID != nil {
|
if startMessageID := impl.flusherComponents.StartMessageID(); startMessageID != nil {
|
||||||
impl.logger.Info("wal start to scan from minimum checkpoint", zap.Stringer("startMessageID", startMessageID))
|
impl.logger.Info("wal start to scan from minimum checkpoint", zap.Stringer("startMessageID", startMessageID))
|
||||||
readOpt.DeliverPolicy = options.DeliverPolicyStartAfter(startMessageID)
|
// !!! we always set the deliver policy to start from the last confirmed message id.
|
||||||
|
// because the catchup scanner at the streamingnode server must see the last confirmed message id if it's the last timetick.
|
||||||
|
readOpt.DeliverPolicy = options.DeliverPolicyStartFrom(startMessageID)
|
||||||
}
|
}
|
||||||
impl.logger.Info("wal start to scan from the beginning")
|
impl.logger.Info("wal start to scan from the beginning")
|
||||||
return l.Read(ctx, readOpt)
|
return l.Read(ctx, readOpt)
|
||||||
|
|||||||
@ -145,7 +145,8 @@ func (s *catchupScanner) consumeWithScanner(ctx context.Context, scanner walimpl
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if msg.MessageType() != message.MessageTypeTimeTick || s.writeAheadBuffer == nil {
|
if msg.MessageType() != message.MessageTypeTimeTick || s.writeAheadBuffer == nil {
|
||||||
// If there's no write ahead buffer, we cannot switch into tailing mode, so skip the checking.
|
// Only timetick message is keep the same order with the write ahead buffer.
|
||||||
|
// So we can only use the timetick message to catchup the write ahead buffer.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Here's a timetick message from the scanner, make tailing read if we catch up the writeahead buffer.
|
// Here's a timetick message from the scanner, make tailing read if we catch up the writeahead buffer.
|
||||||
|
|||||||
@ -74,6 +74,11 @@ func (q *pendingQueue) Push(msgs []message.ImmutableMessage) {
|
|||||||
q.evict(now)
|
q.evict(now)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LastTimeTick returns the last time tick of the buffer.
|
||||||
|
func (q *pendingQueue) LastTimeTick() uint64 {
|
||||||
|
return q.lastTimeTick
|
||||||
|
}
|
||||||
|
|
||||||
// Evict removes messages that have been in the buffer for longer than the keepAlive duration.
|
// Evict removes messages that have been in the buffer for longer than the keepAlive duration.
|
||||||
func (q *pendingQueue) Evict() {
|
func (q *pendingQueue) Evict() {
|
||||||
q.evict(time.Now())
|
q.evict(time.Now())
|
||||||
|
|||||||
@ -160,6 +160,11 @@ func (w *WriteAheadBuffer) createSnapshotFromTimeTick(ctx context.Context, timeT
|
|||||||
|
|
||||||
// error is eof, which means that the time tick is behind the message buffer.
|
// error is eof, which means that the time tick is behind the message buffer.
|
||||||
// The lastTimeTickMessage should always be greater or equal to the lastTimeTick in the pending queue.
|
// The lastTimeTickMessage should always be greater or equal to the lastTimeTick in the pending queue.
|
||||||
|
if w.pendingMessages.LastTimeTick() == timeTick {
|
||||||
|
offset := w.pendingMessages.CurrentOffset() + 1
|
||||||
|
w.cond.L.Unlock()
|
||||||
|
return nil, offset, nil
|
||||||
|
}
|
||||||
|
|
||||||
if w.lastTimeTickMessage.TimeTick() > timeTick {
|
if w.lastTimeTickMessage.TimeTick() > timeTick {
|
||||||
// check if the last time tick is greater than the given time tick, return it to update the timetick.
|
// check if the last time tick is greater than the given time tick, return it to update the timetick.
|
||||||
@ -170,11 +175,6 @@ func (w *WriteAheadBuffer) createSnapshotFromTimeTick(ctx context.Context, timeT
|
|||||||
w.cond.L.Unlock()
|
w.cond.L.Unlock()
|
||||||
return []messageWithOffset{msg}, msg.Offset, nil
|
return []messageWithOffset{msg}, msg.Offset, nil
|
||||||
}
|
}
|
||||||
if w.lastTimeTickMessage.TimeTick() == timeTick {
|
|
||||||
offset := w.pendingMessages.CurrentOffset() + 1
|
|
||||||
w.cond.L.Unlock()
|
|
||||||
return nil, offset, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := w.cond.Wait(ctx); err != nil {
|
if err := w.cond.Wait(ctx); err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user