mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 10:08:42 +08:00
Pass endPosition with vchannel in datanode flowgraph (#20589)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
c5dc076b04
commit
db33ffa518
@ -126,12 +126,14 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
|
|||||||
pos.ChannelName = ibNode.channelName
|
pos.ChannelName = ibNode.channelName
|
||||||
startPositions = append(startPositions, pos)
|
startPositions = append(startPositions, pos)
|
||||||
}
|
}
|
||||||
|
fgMsg.startPositions = startPositions
|
||||||
endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions))
|
endPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.endPositions))
|
||||||
for idx := range fgMsg.endPositions {
|
for idx := range fgMsg.endPositions {
|
||||||
pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition)
|
pos := proto.Clone(fgMsg.endPositions[idx]).(*internalpb.MsgPosition)
|
||||||
pos.ChannelName = ibNode.channelName
|
pos.ChannelName = ibNode.channelName
|
||||||
endPositions = append(endPositions, pos)
|
endPositions = append(endPositions, pos)
|
||||||
}
|
}
|
||||||
|
fgMsg.endPositions = endPositions
|
||||||
|
|
||||||
if startPositions[0].Timestamp < ibNode.lastTimestamp {
|
if startPositions[0].Timestamp < ibNode.lastTimestamp {
|
||||||
// message stream should guarantee that this should not happen
|
// message stream should guarantee that this should not happen
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user