From 0d70d2b98c4ceb35bbe635524b135796b3252e76 Mon Sep 17 00:00:00 2001 From: wei liu Date: Mon, 29 Dec 2025 10:53:22 +0800 Subject: [PATCH] enhance: simplify seek position selection in WatchDmChannels (#46567) issue: #46566 Remove the complex comparison logic between seekPosition and deleteCheckpoint. Use seekPosition directly since: - L0 segments, which contain delete records from [deleteCheckpoint, L0.endPosition], are loaded before the message stream is consumed - DataCoord ensures seekPosition is based on channel checkpoint, updated after data (including deletes) is flushed - L0 segments should cover up to seekPosition, avoiding data loss - This eliminates redundant message consumption when seekPosition > deleteCheckpoint - Core invariant: L0 segments are loaded before consuming the DM channel stream and contain delete records for range [deleteCheckpoint, L0.endPosition]; DataCoord guarantees channel.GetSeekPosition() is derived from the channel checkpoint after data (including deletes) is flushed, so L0 segments collectively cover up to that seekPosition. - Change made: removed the prior branching that built a synthetic seek position from deleteCheckpoint vs. channel checkpoint and instead always calls channel.GetSeekPosition() (used directly in ConsumeMsgStream). Added an informational log comparing seekPosition and deleteCheckpoint. - Why the removed logic was redundant: deleteCheckpoint represented the smallest start position of L0 segments, and the removed code sought to whichever of the channel seek position and deleteCheckpoint was earlier, so the stream re-consumed delete messages already present in loaded L0 segments. Because L0 segments already include deletes up to the channel checkpoint and DataCoord updates the channel checkpoint after flush, using deleteCheckpoint to alter the seek introduces duplicate consumption without benefit. 
- Why this is safe (no data loss/regression): L0 segments are guaranteed to be loaded before consumption, so deletes present in L0 cover the range up to channel.GetSeekPosition(); delete records earlier than deleteCheckpoint have been compacted to L1 and can be evicted from the delete buffer. The code path still calls ConsumeMsgStream with the channel seek position, preserving original consumption/error handling, so no messages are skipped and no additional delete application occurs beyond what L0/L1 already cover. Signed-off-by: Wei Liu --- internal/querynodev2/services.go | 59 ++++++++++---------------------- 1 file changed, 19 insertions(+), 40 deletions(-) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 9d16959204..3ef9b47529 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -335,46 +335,25 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm return merr.Status(err), nil } - var position *msgpb.MsgPosition - deleteCheckpoint := channel.GetDeleteCheckpoint() - channelCheckpoint := channel.GetSeekPosition() - if deleteCheckpoint == nil { - // for compatibility with old version coord, which doesn't have delete checkpoint in VchannelInfo - log.Info("no delete checkpoint found, use seek position to seek", - zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())), - ) - position = &msgpb.MsgPosition{ - ChannelName: channelCheckpoint.GetChannelName(), - MsgID: channelCheckpoint.GetMsgID(), - Timestamp: channelCheckpoint.GetTimestamp(), - } - } else { - if channelCheckpoint.GetTimestamp() > deleteCheckpoint.GetTimestamp() { - msg := "channel seek position is greater than delete checkpoint, use delete checkpoint to seek" - log.Info(msg, - zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())), - zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(deleteCheckpoint.GetTimestamp())), - ) - position = 
&msgpb.MsgPosition{ - ChannelName: deleteCheckpoint.GetChannelName(), - MsgID: deleteCheckpoint.GetMsgID(), - Timestamp: deleteCheckpoint.GetTimestamp(), - } - } else { - msg := "channel seek position is smaller than delete checkpoint, use seek position to seek" - log.Info(msg, - zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())), - zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(deleteCheckpoint.GetTimestamp())), - ) - position = &msgpb.MsgPosition{ - ChannelName: channelCheckpoint.GetChannelName(), - MsgID: channelCheckpoint.GetMsgID(), - Timestamp: channelCheckpoint.GetTimestamp(), - } - } - } - - err = pipeline.ConsumeMsgStream(ctx, position) + // Use seekPosition directly to start consuming the message stream. + // + // Background: + // - seekPosition: channel checkpoint from DataCoord, represents the position where data has been persisted + // - deleteCheckpoint: the minimum startPosition among all L0 segments, indicates where unpersisted + // delete records begin + // + // Why we can use seekPosition directly: + // - L0 segments have already been loaded above (loadL0Segments), which contain delete records + // from [deleteCheckpoint, L0.endPosition] + // - The message stream will capture new delete records from [seekPosition, ∞) + // - DataCoord ensures that seekPosition is calculated based on channel checkpoint, which is updated + // after data (including deletes) is flushed, so L0 segments should cover up to seekPosition + // - Using seekPosition avoids redundant message consumption when seekPosition > deleteCheckpoint + log.Info("use channel seek position to seek", + zap.Time("seekPosition", tsoutil.PhysicalTime(channel.GetSeekPosition().GetTimestamp())), + zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(channel.GetDeleteCheckpoint().GetTimestamp())), + ) + err = pipeline.ConsumeMsgStream(ctx, channel.GetSeekPosition()) if err != nil { err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed") 
log.Warn(err.Error(),