chyezh c725416288
enhance: move streaming proto into pkg (#35284)
issue: #33285

- move streaming related proto into pkg.
- add v2 message type and change flush message into v2 message.

Signed-off-by: chyezh <chyezh@outlook.com>
2024-08-07 10:34:16 +08:00

150 lines
5.4 KiB
Go

package adaptor
import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
var unmashalerDispatcher = (&msgstream.ProtoUDFactory{}).NewUnmarshalDispatcher()
// FromMessageToMsgPack converts message to msgpack.
// Same TimeTick must be sent with same msgpack.
// !!! Msgs must be keep same time tick.
// TODO: remove this function after remove the msgstream implementation.
func NewMsgPackFromMessage(msgs ...message.ImmutableMessage) (*msgstream.MsgPack, error) {
if len(msgs) == 0 {
return nil, nil
}
allTsMsgs := make([]msgstream.TsMsg, 0, len(msgs))
var finalErr error
for _, msg := range msgs {
var tsMsg msgstream.TsMsg
var err error
switch msg.Version() {
case message.VersionOld:
tsMsg, err = fromMessageToTsMsgVOld(msg)
case message.VersionV1:
tsMsg, err = fromMessageToTsMsgV1(msg)
case message.VersionV2:
tsMsg, err = fromMessageToTsMsgV2(msg)
default:
panic("unsupported message version")
}
if err != nil {
finalErr = errors.CombineErrors(finalErr, errors.Wrapf(err, "Failed to convert message to msgpack, %v", msg.MessageID()))
continue
}
allTsMsgs = append(allTsMsgs, tsMsg)
}
if len(allTsMsgs) == 0 {
return nil, finalErr
}
// msgs is sorted by time tick.
// Postition use the last confirmed message id.
// 1. So use the first tsMsgs's Position can read all messages which timetick is greater or equal than the first tsMsgs's BeginTs.
// In other words, from the StartPositions, you can read the full msgPack.
// 2. Use the last tsMsgs's Position as the EndPosition, you can read all messages following the msgPack.
return &msgstream.MsgPack{
BeginTs: allTsMsgs[0].BeginTs(),
EndTs: allTsMsgs[len(allTsMsgs)-1].EndTs(),
Msgs: allTsMsgs,
StartPositions: []*msgstream.MsgPosition{allTsMsgs[0].Position()},
EndPositions: []*msgstream.MsgPosition{allTsMsgs[len(allTsMsgs)-1].Position()},
}, finalErr
}
func fromMessageToTsMsgVOld(msg message.ImmutableMessage) (msgstream.TsMsg, error) {
panic("Not implemented")
}
// fromMessageToTsMsgV1 converts message to ts message.
func fromMessageToTsMsgV1(msg message.ImmutableMessage) (msgstream.TsMsg, error) {
tsMsg, err := unmashalerDispatcher.Unmarshal(msg.Payload(), MustGetCommonpbMsgTypeFromMessageType(msg.MessageType()))
if err != nil {
return nil, errors.Wrap(err, "Failed to unmarshal message")
}
tsMsg.SetTs(msg.TimeTick())
tsMsg.SetPosition(&msgpb.MsgPosition{
ChannelName: msg.VChannel(),
// from the last confirmed message id, you can read all messages which timetick is greater or equal than current message id.
MsgID: MustGetMQWrapperIDFromMessage(msg.LastConfirmedMessageID()).Serialize(),
MsgGroup: "", // Not important any more.
Timestamp: msg.TimeTick(),
})
return recoverMessageFromHeader(tsMsg, msg)
}
// fromMessageToTsMsgV2 converts message to ts message.
func fromMessageToTsMsgV2(msg message.ImmutableMessage) (msgstream.TsMsg, error) {
var tsMsg msgstream.TsMsg
var err error
switch msg.MessageType() {
case message.MessageTypeFlush:
tsMsg, err = NewFlushMessageBody(msg)
default:
panic("unsupported message type")
}
if err != nil {
return nil, err
}
tsMsg.SetTs(msg.TimeTick())
tsMsg.SetPosition(&msgpb.MsgPosition{
ChannelName: msg.VChannel(),
// from the last confirmed message id, you can read all messages which timetick is greater or equal than current message id.
MsgID: MustGetMQWrapperIDFromMessage(msg.LastConfirmedMessageID()).Serialize(),
MsgGroup: "", // Not important any more.
Timestamp: msg.TimeTick(),
})
return tsMsg, nil
}
// recoverMessageFromHeader recovers message from header.
func recoverMessageFromHeader(tsMsg msgstream.TsMsg, msg message.ImmutableMessage) (msgstream.TsMsg, error) {
switch msg.MessageType() {
case message.MessageTypeInsert:
insertMessage, err := message.AsImmutableInsertMessageV1(msg)
if err != nil {
return nil, errors.Wrap(err, "Failed to convert message to insert message")
}
// insertMsg has multiple partition and segment assignment is done by insert message header.
// so recover insert message from header before send it.
return recoverInsertMsgFromHeader(tsMsg.(*msgstream.InsertMsg), insertMessage.Header(), msg.TimeTick())
default:
return tsMsg, nil
}
}
// recoverInsertMsgFromHeader recovers insert message from header.
func recoverInsertMsgFromHeader(insertMsg *msgstream.InsertMsg, header *message.InsertMessageHeader, timetick uint64) (msgstream.TsMsg, error) {
if insertMsg.GetCollectionID() != header.GetCollectionId() {
panic("unreachable code, collection id is not equal")
}
// header promise a batch insert on vchannel in future, so header has multiple partition.
var assignment *message.PartitionSegmentAssignment
for _, p := range header.Partitions {
if p.GetPartitionId() == insertMsg.GetPartitionID() {
assignment = p
break
}
}
if assignment.GetSegmentAssignment().GetSegmentId() == 0 {
panic("unreachable code, partition id is not exist")
}
insertMsg.SegmentID = assignment.GetSegmentAssignment().GetSegmentId()
// timetick should has been assign at streaming node.
// so overwrite the timetick on insertRequest.
timestamps := make([]uint64, insertMsg.GetNumRows())
for i := 0; i < len(timestamps); i++ {
timestamps[i] = timetick
}
insertMsg.Timestamps = timestamps
return insertMsg, nil
}