mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
fix: Fix potential panic when DeleteCheckpoint is nil (#42664)
issue: #42663 Fix panic issue when processing VchannelInfo messages from older coordinator versions that don't have DeleteCheckpoint field. Changes: - Add null safety check for DeleteCheckpoint before accessing methods - Maintain backward compatibility with legacy message formats - Improve seek position selection logic for both old and new versions --------- Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
cbed31933a
commit
78c39edbce
@ -324,27 +324,41 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
||||
}
|
||||
|
||||
var position *msgpb.MsgPosition
|
||||
if channel.GetSeekPosition().GetTimestamp() > channel.GetDeleteCheckpoint().GetTimestamp() {
|
||||
msg := "channel seek position is greater than delete checkpoint, use delete checkpoint to seek"
|
||||
log.Info(msg,
|
||||
zap.Time("seekPosition", tsoutil.PhysicalTime(channel.GetSeekPosition().GetTimestamp())),
|
||||
zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(channel.GetDeleteCheckpoint().GetTimestamp())),
|
||||
deleteCheckpoint := channel.GetDeleteCheckpoint()
|
||||
channelCheckpoint := channel.GetSeekPosition()
|
||||
if deleteCheckpoint == nil {
|
||||
// for compatibility with old version coord, which doesn't have delete checkpoint in VchannelInfo
|
||||
log.Info("no delete checkpoint found, use seek position to seek",
|
||||
zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())),
|
||||
)
|
||||
position = &msgpb.MsgPosition{
|
||||
ChannelName: channel.DeleteCheckpoint.ChannelName,
|
||||
MsgID: channel.DeleteCheckpoint.MsgID,
|
||||
Timestamp: channel.DeleteCheckpoint.Timestamp,
|
||||
ChannelName: channelCheckpoint.GetChannelName(),
|
||||
MsgID: channelCheckpoint.GetMsgID(),
|
||||
Timestamp: channelCheckpoint.GetTimestamp(),
|
||||
}
|
||||
} else {
|
||||
msg := "channel seek position is smaller than delete checkpoint, use seek position to seek"
|
||||
log.Info(msg,
|
||||
zap.Time("seekPosition", tsoutil.PhysicalTime(channel.GetSeekPosition().GetTimestamp())),
|
||||
zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(channel.GetDeleteCheckpoint().GetTimestamp())),
|
||||
)
|
||||
position = &msgpb.MsgPosition{
|
||||
ChannelName: channel.SeekPosition.ChannelName,
|
||||
MsgID: channel.SeekPosition.MsgID,
|
||||
Timestamp: channel.SeekPosition.Timestamp,
|
||||
if channelCheckpoint.GetTimestamp() > deleteCheckpoint.GetTimestamp() {
|
||||
msg := "channel seek position is greater than delete checkpoint, use delete checkpoint to seek"
|
||||
log.Info(msg,
|
||||
zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())),
|
||||
zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(deleteCheckpoint.GetTimestamp())),
|
||||
)
|
||||
position = &msgpb.MsgPosition{
|
||||
ChannelName: deleteCheckpoint.GetChannelName(),
|
||||
MsgID: deleteCheckpoint.GetMsgID(),
|
||||
Timestamp: deleteCheckpoint.GetTimestamp(),
|
||||
}
|
||||
} else {
|
||||
msg := "channel seek position is smaller than delete checkpoint, use seek position to seek"
|
||||
log.Info(msg,
|
||||
zap.Time("seekPosition", tsoutil.PhysicalTime(channelCheckpoint.GetTimestamp())),
|
||||
zap.Time("deleteCheckpoint", tsoutil.PhysicalTime(deleteCheckpoint.GetTimestamp())),
|
||||
)
|
||||
position = &msgpb.MsgPosition{
|
||||
ChannelName: channelCheckpoint.GetChannelName(),
|
||||
MsgID: channelCheckpoint.GetMsgID(),
|
||||
Timestamp: channelCheckpoint.GetTimestamp(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user