mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 22:45:26 +08:00
issue: #43897 also for issue: #46166 add ack_sync_up flag into broadcast message header, which indicates that whether the broadcast operation is need to be synced up between the streaming node and the coordinator. If the ack_sync_up is false, the broadcast operation will be acked once the recovery storage see the message at current vchannel, the fast ack operation can be applied to speed up the broadcast operation. If the ack_sync_up is true, the broadcast operation will be acked after the checkpoint of current vchannel reach current message. The fast ack operation can not be applied to speed up the broadcast operation, because the ack operation need to be synced up with streaming node. e.g. if truncate collection operation want to call ack once callback after the all segment are flushed at current vchannel, it should set the ack_sync_up to be true. TODO: current implementation doesn't promise the ack sync up semantic, it only promise FastAck operation will not be applied, wait for 3.0 to implement the ack sync up semantic. only for truncate api now. --------- Signed-off-by: chyezh <chyezh@outlook.com>
429 lines
14 KiB
Go
429 lines
14 KiB
Go
package message
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"reflect"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
// NewMutableMessageBeforeAppend creates a new mutable message.
|
|
// !!! Only used at server side for streamingnode internal service, don't use it at client side.
|
|
func NewMutableMessageBeforeAppend(payload []byte, properties map[string]string) MutableMessage {
|
|
m := &messageImpl{
|
|
payload: payload,
|
|
properties: properties,
|
|
}
|
|
return m
|
|
}
|
|
|
|
// NewBroadcastMutableMessageBeforeAppend creates a new broadcast mutable message.
|
|
// !!! Only used at server side for streamingcoord internal service, don't use it at client side.
|
|
func NewBroadcastMutableMessageBeforeAppend(payload []byte, properties map[string]string) BroadcastMutableMessage {
|
|
m := &messageImpl{
|
|
payload: payload,
|
|
properties: properties,
|
|
}
|
|
if !m.properties.Exist(messageBroadcastHeader) {
|
|
panic("current message is not a broadcast message")
|
|
}
|
|
return m
|
|
}
|
|
|
|
// NewImmutableMessage creates a new immutable message.
|
|
// !!! Only used at server side for streaming internal service, don't use it at client side.
|
|
func NewImmutableMesasge(
|
|
id MessageID,
|
|
payload []byte,
|
|
properties map[string]string,
|
|
) ImmutableMessage {
|
|
return &immutableMessageImpl{
|
|
id: id,
|
|
messageImpl: messageImpl{
|
|
payload: payload,
|
|
properties: properties,
|
|
},
|
|
}
|
|
}
|
|
|
|
// MustNewReplicateMessage creates a new replicate message.
|
|
func MustNewReplicateMessage(clustrID string, im *commonpb.ImmutableMessage) ReplicateMutableMessage {
|
|
m, err := NewReplicateMessage(clustrID, im)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return m
|
|
}
|
|
|
|
// NewReplicateMessage creates a new replicate message.
|
|
func NewReplicateMessage(clustrID string, im *commonpb.ImmutableMessage) (ReplicateMutableMessage, error) {
|
|
messageID := MustUnmarshalMessageID(im.GetId())
|
|
msg := NewImmutableMesasge(messageID, im.GetPayload(), im.GetProperties()).(*immutableMessageImpl)
|
|
if msg.ReplicateHeader() != nil {
|
|
return nil, errors.New("message is already a replicate message")
|
|
}
|
|
|
|
m := &messageImpl{
|
|
payload: msg.payload,
|
|
properties: msg.properties.Clone(),
|
|
}
|
|
m.properties.Delete(messageLastConfirmedIDSameWithMessageID)
|
|
m.properties.Delete(messageTimeTick)
|
|
m.properties.Delete(messageLastConfirmed)
|
|
m.properties.Delete(messageWALTerm)
|
|
m.WithReplicateHeader(&ReplicateHeader{
|
|
ClusterID: clustrID,
|
|
MessageID: msg.MessageID(),
|
|
LastConfirmedMessageID: msg.LastConfirmedMessageID(),
|
|
TimeTick: msg.TimeTick(),
|
|
VChannel: msg.VChannel(),
|
|
})
|
|
return m, nil
|
|
}
|
|
|
|
func MilvusMessageToImmutableMessage(im *commonpb.ImmutableMessage) ImmutableMessage {
|
|
messageID := MustUnmarshalMessageID(im.GetId())
|
|
msg := NewImmutableMesasge(messageID, im.GetPayload(), im.GetProperties())
|
|
return msg
|
|
}
|
|
|
|
func ImmutableMessageToMilvusMessage(walName string, im ImmutableMessage) *commonpb.ImmutableMessage {
|
|
msg := im.IntoImmutableMessageProto()
|
|
return &commonpb.ImmutableMessage{
|
|
Id: im.MessageID().IntoProto(),
|
|
Payload: msg.GetPayload(),
|
|
Properties: msg.GetProperties(),
|
|
}
|
|
}
|
|
|
|
// newMutableMessageBuilder creates a new builder.
|
|
// Should only used at client side.
|
|
func newMutableMessageBuilder[H proto.Message, B proto.Message]() *mutableMesasgeBuilder[H, B] {
|
|
messageType := MustGetMessageTypeWithVersion[H, B]()
|
|
properties := make(propertiesImpl)
|
|
properties.Set(messageTypeKey, messageType.MessageType.marshal())
|
|
properties.Set(messageVersion, messageType.Version.String())
|
|
return &mutableMesasgeBuilder[H, B]{
|
|
properties: properties,
|
|
}
|
|
}
|
|
|
|
// mutableMesasgeBuilder is the builder for message.
|
|
type mutableMesasgeBuilder[H proto.Message, B proto.Message] struct {
|
|
header H
|
|
body B
|
|
properties propertiesImpl
|
|
cipherConfig *CipherConfig
|
|
allVChannel bool
|
|
}
|
|
|
|
// WithMessageHeader creates a new builder with determined message type.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithHeader(h H) *mutableMesasgeBuilder[H, B] {
|
|
b.header = h
|
|
return b
|
|
}
|
|
|
|
// WithNotPersist creates a new builder with not persisted property.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithNotPersisted() *mutableMesasgeBuilder[H, B] {
|
|
messageType := MustGetMessageTypeWithVersion[H, B]()
|
|
if messageType.MessageType != MessageTypeTimeTick {
|
|
panic("only time tick message can be not persisted")
|
|
}
|
|
b.WithProperty(messageNotPersisteted, "")
|
|
return b
|
|
}
|
|
|
|
// WithBody creates a new builder with message body.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithBody(body B) *mutableMesasgeBuilder[H, B] {
|
|
b.body = body
|
|
return b
|
|
}
|
|
|
|
// WithVChannel creates a new builder with virtual channel.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithVChannel(vchannel string) *mutableMesasgeBuilder[H, B] {
|
|
if b.allVChannel {
|
|
panic("a all vchannel message cannot set up vchannel property")
|
|
}
|
|
b.WithProperty(messageVChannel, vchannel)
|
|
return b
|
|
}
|
|
|
|
// OptBuildBroadcast is the option for building broadcast message.
|
|
type OptBuildBroadcast func(*messagespb.BroadcastHeader)
|
|
|
|
// OptBuildBroadcastAckSyncUp sets the ack sync up of the broadcast message.
|
|
// Whether the broadcast operation is need to be synced up between the streaming node and the coordinator.
|
|
// If set, the broadcast operation will be acked after the checkpoint of current vchannel reach current message.
|
|
// the fast ack operation can not be applied to speed up the broadcast operation, because the ack operation need to be synced up with streaming node.
|
|
// TODO: current implementation doesn't promise the ack sync up semantic,
|
|
// it only promise FastAck operation will not be applied, wait for 3.0 to implement the ack sync up semantic.
|
|
// only for truncate api now.
|
|
func OptBuildBroadcastAckSyncUp() OptBuildBroadcast {
|
|
return func(bh *messagespb.BroadcastHeader) {
|
|
bh.AckSyncUp = true
|
|
}
|
|
}
|
|
|
|
// WithBroadcast creates a new builder with broadcast property.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithBroadcast(vchannels []string, opts ...OptBuildBroadcast) *mutableMesasgeBuilder[H, B] {
|
|
if len(vchannels) < 1 {
|
|
panic("broadcast message must have at least one vchannel")
|
|
}
|
|
if b.allVChannel {
|
|
panic("a all vchannel message cannot set up vchannel property")
|
|
}
|
|
if b.properties.Exist(messageVChannel) {
|
|
panic("a broadcast message cannot set up vchannel property")
|
|
}
|
|
deduplicated := typeutil.NewSet(vchannels...)
|
|
bhpb := &messagespb.BroadcastHeader{
|
|
Vchannels: deduplicated.Collect(),
|
|
}
|
|
for _, opt := range opts {
|
|
opt(bhpb)
|
|
}
|
|
|
|
bh, err := EncodeProto(bhpb)
|
|
if err != nil {
|
|
panic("failed to encode vchannels")
|
|
}
|
|
b.properties.Set(messageBroadcastHeader, bh)
|
|
return b
|
|
}
|
|
|
|
// WithAllVChannel creates a new builder with all vchannel property.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithAllVChannel() *mutableMesasgeBuilder[H, B] {
|
|
if b.properties.Exist(messageVChannel) || b.properties.Exist(messageBroadcastHeader) {
|
|
panic("a vchannel or broadcast message cannot set up all vchannel property")
|
|
}
|
|
b.allVChannel = true
|
|
return b
|
|
}
|
|
|
|
// WithProperty creates a new builder with message property.
|
|
// A key started with '_' is reserved for streaming system, should never used at user of client.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithProperty(key string, val string) *mutableMesasgeBuilder[H, B] {
|
|
b.properties.Set(key, val)
|
|
return b
|
|
}
|
|
|
|
// WithProperties creates a new builder with message properties.
|
|
// A key started with '_' is reserved for streaming system, should never used at user of client.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithProperties(kvs map[string]string) *mutableMesasgeBuilder[H, B] {
|
|
for key, val := range kvs {
|
|
b.properties.Set(key, val)
|
|
}
|
|
return b
|
|
}
|
|
|
|
// WithCipher creates a new builder with cipher property.
|
|
func (b *mutableMesasgeBuilder[H, B]) WithCipher(cipherConfig *CipherConfig) *mutableMesasgeBuilder[H, B] {
|
|
b.cipherConfig = cipherConfig
|
|
return b
|
|
}
|
|
|
|
// BuildMutable builds a mutable message.
|
|
// Panic if not set payload and message type.
|
|
// should only used at client side.
|
|
func (b *mutableMesasgeBuilder[H, B]) BuildMutable() (MutableMessage, error) {
|
|
if !b.allVChannel && !b.properties.Exist(messageVChannel) {
|
|
panic("a non broadcast message builder not ready for vchannel field")
|
|
}
|
|
|
|
msg, err := b.build()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return msg, nil
|
|
}
|
|
|
|
// MustBuildMutable builds a mutable message.
|
|
// Panics if build failed.
|
|
func (b *mutableMesasgeBuilder[H, B]) MustBuildMutable() MutableMessage {
|
|
msg, err := b.BuildMutable()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return msg
|
|
}
|
|
|
|
// BuildBroadcast builds a broad mutable message.
|
|
// Panic if not set payload and message type.
|
|
// should only used at client side.
|
|
func (b *mutableMesasgeBuilder[H, B]) BuildBroadcast() (BroadcastMutableMessage, error) {
|
|
if !b.properties.Exist(messageBroadcastHeader) {
|
|
panic("a broadcast message builder not ready for vchannel field")
|
|
}
|
|
|
|
msg, err := b.build()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return msg, nil
|
|
}
|
|
|
|
// MustBuildBroadcast build broadcast message
|
|
// Panics if build failed.
|
|
func (b *mutableMesasgeBuilder[H, B]) MustBuildBroadcast() BroadcastMutableMessage {
|
|
msg, err := b.BuildBroadcast()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return msg
|
|
}
|
|
|
|
// build builds a message.
|
|
func (b *mutableMesasgeBuilder[H, B]) build() (*messageImpl, error) {
|
|
// payload and header must be a pointer
|
|
if reflect.ValueOf(b.header).IsNil() {
|
|
panic("message builder not ready for header field")
|
|
}
|
|
if reflect.ValueOf(b.body).IsNil() {
|
|
panic("message builder not ready for body field")
|
|
}
|
|
|
|
// setup header.
|
|
sp, err := EncodeProto(b.header)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to encode header")
|
|
}
|
|
b.properties.Set(messageHeader, sp)
|
|
|
|
payload, err := proto.Marshal(b.body)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to marshal body")
|
|
}
|
|
if b.cipherConfig != nil {
|
|
messageType := MustGetMessageTypeWithVersion[H, B]()
|
|
if !messageType.MessageType.CanEnableCipher() {
|
|
panic(fmt.Sprintf("the message type cannot enable cipher, %s", messageType))
|
|
}
|
|
|
|
cipher := mustGetCipher()
|
|
encryptor, safeKey, err := cipher.GetEncryptor(b.cipherConfig.EzID, b.cipherConfig.CollectionID)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to get encryptor")
|
|
}
|
|
payloadBytes := len(payload)
|
|
if payload, err = encryptor.Encrypt(payload); err != nil {
|
|
return nil, errors.Wrap(err, "failed to encrypt payload")
|
|
}
|
|
ch, err := EncodeProto(&messagespb.CipherHeader{
|
|
EzId: b.cipherConfig.EzID,
|
|
CollectionId: b.cipherConfig.CollectionID,
|
|
SafeKey: safeKey,
|
|
PayloadBytes: int64(payloadBytes),
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to encode cipher header")
|
|
}
|
|
b.properties.Set(messageCipherHeader, ch)
|
|
}
|
|
return &messageImpl{
|
|
payload: payload,
|
|
properties: b.properties,
|
|
}, nil
|
|
}
|
|
|
|
// NewImmutableTxnMessageBuilder creates a new txn builder.
|
|
func NewImmutableTxnMessageBuilder(begin ImmutableBeginTxnMessageV2) *ImmutableTxnMessageBuilder {
|
|
return &ImmutableTxnMessageBuilder{
|
|
txnCtx: *begin.TxnContext(),
|
|
begin: begin,
|
|
messages: make([]ImmutableMessage, 0),
|
|
}
|
|
}
|
|
|
|
// ImmutableTxnMessageBuilder is a builder for txn message.
|
|
type ImmutableTxnMessageBuilder struct {
|
|
txnCtx TxnContext
|
|
begin ImmutableBeginTxnMessageV2
|
|
messages []ImmutableMessage
|
|
}
|
|
|
|
// ExpiredTimeTick returns the expired time tick of the txn.
|
|
func (b *ImmutableTxnMessageBuilder) ExpiredTimeTick() uint64 {
|
|
if b.txnCtx.Keepalive == TxnKeepaliveInfinite {
|
|
return math.MaxUint64
|
|
}
|
|
if len(b.messages) > 0 {
|
|
return tsoutil.AddPhysicalDurationOnTs(b.messages[len(b.messages)-1].TimeTick(), b.txnCtx.Keepalive)
|
|
}
|
|
return tsoutil.AddPhysicalDurationOnTs(b.begin.TimeTick(), b.txnCtx.Keepalive)
|
|
}
|
|
|
|
// Push pushes a message into the txn builder.
|
|
func (b *ImmutableTxnMessageBuilder) Add(msg ImmutableMessage) *ImmutableTxnMessageBuilder {
|
|
b.messages = append(b.messages, msg)
|
|
return b
|
|
}
|
|
|
|
// EstimateSize estimates the size of the txn message.
|
|
func (b *ImmutableTxnMessageBuilder) EstimateSize() int {
|
|
size := b.begin.EstimateSize()
|
|
for _, m := range b.messages {
|
|
size += m.EstimateSize()
|
|
}
|
|
return size
|
|
}
|
|
|
|
// LastConfirmedMessageID returns the last confirmed message id of the txn.
|
|
func (b *ImmutableTxnMessageBuilder) LastConfirmedMessageID() MessageID {
|
|
return b.begin.LastConfirmedMessageID()
|
|
}
|
|
|
|
// Messages returns the begin message and body messages.
|
|
func (b *ImmutableTxnMessageBuilder) Messages() (ImmutableBeginTxnMessageV2, []ImmutableMessage) {
|
|
return b.begin, b.messages
|
|
}
|
|
|
|
// Build builds a txn message.
|
|
func (b *ImmutableTxnMessageBuilder) Build(commit ImmutableCommitTxnMessageV2) (ImmutableTxnMessage, error) {
|
|
msg, err := newImmutableTxnMesasgeFromWAL(b.begin, b.messages, commit)
|
|
b.begin = nil
|
|
b.messages = nil
|
|
return msg, err
|
|
}
|
|
|
|
// newImmutableTxnMesasgeFromWAL creates a new immutable transaction message.
|
|
func newImmutableTxnMesasgeFromWAL(
|
|
begin ImmutableBeginTxnMessageV2,
|
|
body []ImmutableMessage,
|
|
commit ImmutableCommitTxnMessageV2,
|
|
) (ImmutableTxnMessage, error) {
|
|
// combine begin and commit messages into one.
|
|
msg, err := newMutableMessageBuilder[*TxnMessageHeader, *TxnMessageBody]().
|
|
WithHeader(&TxnMessageHeader{}).
|
|
WithBody(&TxnMessageBody{}).
|
|
WithVChannel(begin.VChannel()).
|
|
BuildMutable()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// begin message will be used to replicate, so we also need to set it timetick and last confirmed message id into committed message.
|
|
var beginImmutable ImmutableMessage = begin
|
|
beginImmutable = beginImmutable.(*specializedImmutableMessageImpl[*BeginTxnMessageHeader, *BeginTxnMessageBody]).cloneForTxnBody(commit.TimeTick(), commit.LastConfirmedMessageID())
|
|
for idx, m := range body {
|
|
body[idx] = m.(*immutableMessageImpl).cloneForTxnBody(commit.TimeTick(), commit.LastConfirmedMessageID())
|
|
}
|
|
immutableMessage := msg.WithTimeTick(commit.TimeTick()).
|
|
WithLastConfirmed(commit.LastConfirmedMessageID()).
|
|
WithTxnContext(*commit.TxnContext()).
|
|
WithReplicateHeader(commit.ReplicateHeader()).
|
|
IntoImmutableMessage(commit.MessageID())
|
|
return &immutableTxnMessageImpl{
|
|
immutableMessageImpl: *immutableMessage.(*immutableMessageImpl),
|
|
begin: MustAsImmutableBeginTxnMessageV2(beginImmutable),
|
|
messages: body,
|
|
commit: commit,
|
|
}, nil
|
|
}
|