mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: Use delete checkpoint to prevent delete record loss in L0 refactoring (#42628)
issue: #39333 #41570 Fix delete record missing issue introduced in PR #39552 L0 refactoring: - Use delete checkpoint as consume start position when deleteCP < channelCP - Add logging when delete checkpoint is used instead of seek position - Prevent delete record loss when deleteCP is earlier than default channelCP Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
36a4b74fc0
commit
f3fe117840
@ -322,11 +322,32 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
||||
log.Warn(msg, zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
position := &msgpb.MsgPosition{
|
||||
|
||||
var position *msgpb.MsgPosition
|
||||
if channel.GetSeekPosition().GetTimestamp() > channel.GetDeleteCheckpoint().GetTimestamp() {
|
||||
msg := "channel seek position is greater than delete checkpoint, use delete checkpoint to seek"
|
||||
log.Info(msg,
|
||||
zap.Time("seekPosition", tsoutil.PhysicalTime(channel.GetSeekPosition().GetTimestamp())),
|
||||
zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(channel.GetDeleteCheckpoint().GetTimestamp())),
|
||||
)
|
||||
position = &msgpb.MsgPosition{
|
||||
ChannelName: channel.DeleteCheckpoint.ChannelName,
|
||||
MsgID: channel.DeleteCheckpoint.MsgID,
|
||||
Timestamp: channel.DeleteCheckpoint.Timestamp,
|
||||
}
|
||||
} else {
|
||||
msg := "channel seek position is smaller than delete checkpoint, use seek position to seek"
|
||||
log.Info(msg,
|
||||
zap.Time("seekPosition", tsoutil.PhysicalTime(channel.GetSeekPosition().GetTimestamp())),
|
||||
zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(channel.GetDeleteCheckpoint().GetTimestamp())),
|
||||
)
|
||||
position = &msgpb.MsgPosition{
|
||||
ChannelName: channel.SeekPosition.ChannelName,
|
||||
MsgID: channel.SeekPosition.MsgID,
|
||||
Timestamp: channel.SeekPosition.Timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
err = pipeline.ConsumeMsgStream(ctx, position)
|
||||
if err != nil {
|
||||
err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user