diff --git a/pkg/mq/msgstream/mq_msgstream.go b/pkg/mq/msgstream/mq_msgstream.go index e70d7c1c49..2d4706ef1a 100644 --- a/pkg/mq/msgstream/mq_msgstream.go +++ b/pkg/mq/msgstream/mq_msgstream.go @@ -398,6 +398,8 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) { Msgs: []TsMsg{tsMsg}, StartPositions: []*msgpb.MsgPosition{tsMsg.Position()}, EndPositions: []*msgpb.MsgPosition{tsMsg.Position()}, + BeginTs: tsMsg.BeginTs(), + EndTs: tsMsg.EndTs(), } select { case ms.receiveBuf <- &msgPack: