diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 9d16959204..3ef9b47529 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -335,46 +335,25 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm return merr.Status(err), nil } - var position *msgpb.MsgPosition - deleteCheckpoint := channel.GetDeleteCheckpoint() - channelCheckpoint := channel.GetSeekPosition() - if deleteCheckpoint == nil { - // for compatibility with old version coord, which doesn't have delete checkpoint in VchannelInfo - log.Info("no delete checkpoint found, use seek position to seek", - zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())), - ) - position = &msgpb.MsgPosition{ - ChannelName: channelCheckpoint.GetChannelName(), - MsgID: channelCheckpoint.GetMsgID(), - Timestamp: channelCheckpoint.GetTimestamp(), - } - } else { - if channelCheckpoint.GetTimestamp() > deleteCheckpoint.GetTimestamp() { - msg := "channel seek position is greater than delete checkpoint, use delete checkpoint to seek" - log.Info(msg, - zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())), - zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(deleteCheckpoint.GetTimestamp())), - ) - position = &msgpb.MsgPosition{ - ChannelName: deleteCheckpoint.GetChannelName(), - MsgID: deleteCheckpoint.GetMsgID(), - Timestamp: deleteCheckpoint.GetTimestamp(), - } - } else { - msg := "channel seek position is smaller than delete checkpoint, use seek position to seek" - log.Info(msg, - zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())), - zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(deleteCheckpoint.GetTimestamp())), - ) - position = &msgpb.MsgPosition{ - ChannelName: channelCheckpoint.GetChannelName(), - MsgID: channelCheckpoint.GetMsgID(), - Timestamp: channelCheckpoint.GetTimestamp(), - } - } - } - - err = pipeline.ConsumeMsgStream(ctx, position) + // Use seekPosition directly to start consuming the message stream. + // + // Background: + // - seekPosition: channel checkpoint from DataCoord, represents the position where data has been persisted + // - deleteCheckpoint: the minimum startPosition among all L0 segments, indicates where unpersisted + // delete records begin + // + // Why we can use seekPosition directly: + // - L0 segments have already been loaded above (loadL0Segments), which contain delete records + // from [deleteCheckpoint, L0.endPosition] + // - The message stream will capture new delete records from [seekPosition, ∞) + // - DataCoord ensures that seekPosition is calculated based on channel checkpoint, which is updated + // after data (including deletes) is flushed, so L0 segments should cover up to seekPosition + // - Using seekPosition avoids redundant message consumption when seekPosition > deleteCheckpoint + log.Info("use channel seek position to seek", + zap.Time("seekPosition", tsoutil.PhysicalTime(channel.GetSeekPosition().GetTimestamp())), + zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(channel.GetDeleteCheckpoint().GetTimestamp())), + ) + err = pipeline.ConsumeMsgStream(ctx, channel.GetSeekPosition()) if err != nil { err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed") log.Warn(err.Error(),