enhance: simplify seek position selection in WatchDmChannels (#46567)

issue: #46566
Remove the complex comparison logic between seekPosition and
deleteCheckpoint. Use seekPosition directly since:

- L0 segments are loaded before consuming message stream, which contain
delete records from [deleteCheckpoint, L0.endPosition]
- DataCoord ensures seekPosition is based on channel checkpoint, updated
after data (including deletes) is flushed
- L0 segments should cover up to seekPosition, avoiding data loss
- This eliminates redundant message consumption when seekPosition >
deleteCheckpoint

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant: L0 segments are loaded before consuming the DM channel
stream and contain delete records for range [deleteCheckpoint,
L0.endPosition]; DataCoord guarantees channel.GetSeekPosition() is
derived from the channel checkpoint after data (including deletes) is
flushed, so L0 segments collectively cover up to that seekPosition.
- Change made: removed the prior branching that built a synthetic seek
position from deleteCheckpoint vs. channel checkpoint and instead always
calls channel.GetSeekPosition() (used directly in ConsumeMsgStream).
Added an informational log comparing seekPosition and deleteCheckpoint.
- Why the removed logic was redundant: deleteCheckpoint represented the
smallest start position of L0 segments and was used to avoid
re-consuming delete messages already present in loaded L0 segments.
Because L0 segments already include deletes up to the channel checkpoint
and DataCoord updates the channel checkpoint after flush, using
deleteCheckpoint to alter the seek introduces duplicate consumption
without benefit.
- Why this is safe (no data loss/regression): L0 segments are guaranteed
to be loaded before consumption, so deletes present in L0 cover the
range up to channel.GetSeekPosition(); delete records earlier than
deleteCheckpoint have been compacted to L1 and can be evicted from the
delete buffer. The code path still calls ConsumeMsgStream with the
channel seek position, preserving original consumption/error handling,
so no messages are skipped and no additional delete application occurs
beyond what L0/L1 already cover.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2025-12-29 10:53:22 +08:00 committed by GitHub
parent 3c2cf2c066
commit 0d70d2b98c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -335,46 +335,25 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
return merr.Status(err), nil
}
var position *msgpb.MsgPosition
deleteCheckpoint := channel.GetDeleteCheckpoint()
channelCheckpoint := channel.GetSeekPosition()
if deleteCheckpoint == nil {
// for compatibility with old version coord, which doesn't have delete checkpoint in VchannelInfo
log.Info("no delete checkpoint found, use seek position to seek",
zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())),
)
position = &msgpb.MsgPosition{
ChannelName: channelCheckpoint.GetChannelName(),
MsgID: channelCheckpoint.GetMsgID(),
Timestamp: channelCheckpoint.GetTimestamp(),
}
} else {
if channelCheckpoint.GetTimestamp() > deleteCheckpoint.GetTimestamp() {
msg := "channel seek position is greater than delete checkpoint, use delete checkpoint to seek"
log.Info(msg,
zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())),
zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(deleteCheckpoint.GetTimestamp())),
)
position = &msgpb.MsgPosition{
ChannelName: deleteCheckpoint.GetChannelName(),
MsgID: deleteCheckpoint.GetMsgID(),
Timestamp: deleteCheckpoint.GetTimestamp(),
}
} else {
msg := "channel seek position is smaller than delete checkpoint, use seek position to seek"
log.Info(msg,
zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())),
zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(deleteCheckpoint.GetTimestamp())),
)
position = &msgpb.MsgPosition{
ChannelName: channelCheckpoint.GetChannelName(),
MsgID: channelCheckpoint.GetMsgID(),
Timestamp: channelCheckpoint.GetTimestamp(),
}
}
}
err = pipeline.ConsumeMsgStream(ctx, position)
// Use seekPosition directly to start consuming the message stream.
//
// Background:
// - seekPosition: channel checkpoint from DataCoord, represents the position where data has been persisted
// - deleteCheckpoint: the minimum startPosition among all L0 segments, indicates where unpersisted
// delete records begin
//
// Why we can use seekPosition directly:
// - L0 segments have already been loaded above (loadL0Segments), which contain delete records
// from [deleteCheckpoint, L0.endPosition]
// - The message stream will capture new delete records from [seekPosition, ∞)
// - DataCoord ensures that seekPosition is calculated based on channel checkpoint, which is updated
// after data (including deletes) is flushed, so L0 segments should cover up to seekPosition
// - Using seekPosition avoids redundant message consumption when seekPosition > deleteCheckpoint
log.Info("use channel seek position to seek",
zap.Time("seekPosition", tsoutil.PhysicalTime(channel.GetSeekPosition().GetTimestamp())),
zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(channel.GetDeleteCheckpoint().GetTimestamp())),
)
err = pipeline.ConsumeMsgStream(ctx, channel.GetSeekPosition())
if err != nil {
err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed")
log.Warn(err.Error(),