enhance: add message and msgstream msgpack adaptor (#34874)

issue: #33285

- make message builder and message conversion type safe
- add adaptor type and function to adapt old msgstream msgpack and
message interface

---------

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
chyezh 2024-07-22 20:59:42 +08:00 committed by GitHub
parent 6c19f9baf8
commit 39c7e06bc5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 1759 additions and 662 deletions

1
.gitignore vendored
View File

@ -103,4 +103,5 @@ cwrapper_rocksdb_build/
internal/proto/**/*.pb.go internal/proto/**/*.pb.go
internal/core/src/pb/*.pb.h internal/core/src/pb/*.pb.h
internal/core/src/pb/*.pb.cc internal/core/src/pb/*.pb.cc
*.pb.go
**/legacypb/*.pb.go **/legacypb/*.pb.go

View File

@ -59,16 +59,13 @@ func TestProducer(t *testing.T) {
assert.NotNil(t, producer) assert.NotNil(t, producer)
ch := make(chan struct{}) ch := make(chan struct{})
go func() { go func() {
msgID, err := producer.Produce(ctx, message.NewMutableMessageBuilder(). msg := message.CreateTestEmptyInsertMesage(1, nil)
WithMessageType(message.MessageTypeUnknown). msgID, err := producer.Produce(ctx, msg)
WithPayload([]byte{}).
BuildMutable())
assert.Error(t, err) assert.Error(t, err)
assert.Nil(t, msgID) assert.Nil(t, msgID)
msgID, err = producer.Produce(ctx, message.NewMutableMessageBuilder().
WithMessageType(message.MessageTypeUnknown). msg = message.CreateTestEmptyInsertMesage(1, nil)
WithPayload([]byte{}). msgID, err = producer.Produce(ctx, msg)
BuildMutable())
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, msgID) assert.NotNil(t, msgID)
close(ch) close(ch)
@ -101,10 +98,8 @@ func TestProducer(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel() defer cancel()
_, err = producer.Produce(ctx, message.NewMutableMessageBuilder(). msg := message.CreateTestEmptyInsertMesage(1, nil)
WithMessageType(message.MessageTypeUnknown). _, err = producer.Produce(ctx, msg)
WithPayload([]byte{}).
BuildMutable())
assert.ErrorIs(t, err, context.DeadlineExceeded) assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.True(t, producer.IsAvailable()) assert.True(t, producer.IsAvailable())
producer.Close() producer.Close()

View File

@ -143,11 +143,7 @@ func (p *ProduceServer) recvLoop() (err error) {
// handleProduce handles the produce message request. // handleProduce handles the produce message request.
func (p *ProduceServer) handleProduce(req *streamingpb.ProduceMessageRequest) { func (p *ProduceServer) handleProduce(req *streamingpb.ProduceMessageRequest) {
p.logger.Debug("recv produce message from client", zap.Int64("requestID", req.RequestId)) p.logger.Debug("recv produce message from client", zap.Int64("requestID", req.RequestId))
msg := message.NewMutableMessageBuilder(). msg := message.NewMutableMessage(req.GetMessage().GetPayload(), req.GetMessage().GetProperties())
WithPayload(req.GetMessage().GetPayload()).
WithProperties(req.GetMessage().GetProperties()).
BuildMutable()
if err := p.validateMessage(msg); err != nil { if err := p.validateMessage(msg); err != nil {
p.logger.Warn("produce message validation failed", zap.Int64("requestID", req.RequestId), zap.Error(err)) p.logger.Warn("produce message validation failed", zap.Int64("requestID", req.RequestId), zap.Error(err))
p.sendProduceResult(req.RequestId, nil, err) p.sendProduceResult(req.RequestId, nil, err)

View File

@ -0,0 +1,131 @@
package adaptor
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type defaultMessageHandler chan message.ImmutableMessage
func (h defaultMessageHandler) Handle(ctx context.Context, upstream <-chan message.ImmutableMessage, msg message.ImmutableMessage) (incoming message.ImmutableMessage, ok bool, err error) {
var sendingCh chan message.ImmutableMessage
if msg != nil {
sendingCh = h
}
select {
case <-ctx.Done():
return nil, false, ctx.Err()
case msg, ok := <-upstream:
if !ok {
return nil, false, wal.ErrUpstreamClosed
}
return msg, false, nil
case sendingCh <- msg:
return nil, true, nil
}
}
func (d defaultMessageHandler) Close() {
close(d)
}
// NewMsgPackAdaptorHandler create a new message pack adaptor handler.
func NewMsgPackAdaptorHandler() *MsgPackAdaptorHandler {
return &MsgPackAdaptorHandler{
logger: log.With(),
channel: make(chan *msgstream.MsgPack),
pendings: make([]message.ImmutableMessage, 0),
pendingMsgPack: typeutil.NewMultipartQueue[*msgstream.MsgPack](),
}
}
// MsgPackAdaptorHandler is the handler for message pack.
type MsgPackAdaptorHandler struct {
logger *log.MLogger
channel chan *msgstream.MsgPack
pendings []message.ImmutableMessage // pendings hold the vOld message which has same time tick.
pendingMsgPack *typeutil.MultipartQueue[*msgstream.MsgPack] // pendingMsgPack hold unsent msgPack.
}
// Chan is the channel for message.
func (m *MsgPackAdaptorHandler) Chan() <-chan *msgstream.MsgPack {
return m.channel
}
// Handle is the callback for handling message.
func (m *MsgPackAdaptorHandler) Handle(ctx context.Context, upstream <-chan message.ImmutableMessage, msg message.ImmutableMessage) (incoming message.ImmutableMessage, ok bool, err error) {
// not handle new message if there are pending msgPack.
if msg != nil && m.pendingMsgPack.Len() == 0 {
m.generateMsgPack(msg)
ok = true
}
for {
var sendCh chan<- *msgstream.MsgPack
if m.pendingMsgPack.Len() != 0 {
sendCh = m.channel
}
select {
case <-ctx.Done():
return nil, ok, ctx.Err()
case msg, notClose := <-upstream:
if !notClose {
return nil, ok, wal.ErrUpstreamClosed
}
return msg, ok, nil
case sendCh <- m.pendingMsgPack.Next():
m.pendingMsgPack.UnsafeAdvance()
if m.pendingMsgPack.Len() > 0 {
continue
}
return nil, ok, nil
}
}
}
// generateMsgPack generate msgPack from message.
func (m *MsgPackAdaptorHandler) generateMsgPack(msg message.ImmutableMessage) {
switch msg.Version() {
case message.VersionOld:
if len(m.pendings) != 0 {
if msg.TimeTick() > m.pendings[0].TimeTick() {
m.addMsgPackIntoPending(m.pendings...)
m.pendings = nil
}
}
m.pendings = append(m.pendings, msg)
case message.VersionV1:
if len(m.pendings) != 0 { // all previous message should be vOld.
m.addMsgPackIntoPending(m.pendings...)
m.pendings = nil
}
m.addMsgPackIntoPending(msg)
default:
panic("unsupported message version")
}
}
// addMsgPackIntoPending add message into pending msgPack.
func (m *MsgPackAdaptorHandler) addMsgPackIntoPending(msgs ...message.ImmutableMessage) {
newPack, err := adaptor.NewMsgPackFromMessage(msgs...)
if err != nil {
m.logger.Warn("failed to convert message to msgpack", zap.Error(err))
}
if newPack != nil {
m.pendingMsgPack.AddOne(newPack)
}
}
// Close close the handler.
func (m *MsgPackAdaptorHandler) Close() {
close(m.channel)
}

View File

@ -0,0 +1,76 @@
package adaptor
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
func TestMsgPackAdaptorHandler(t *testing.T) {
messageID := rmq.NewRmqID(1)
tt := uint64(100)
msg := message.CreateTestInsertMessage(
t,
1,
1000,
tt,
messageID,
)
immutableMsg := msg.IntoImmutableMessage(messageID)
upstream := make(chan message.ImmutableMessage, 1)
ctx := context.Background()
h := NewMsgPackAdaptorHandler()
done := make(chan struct{})
go func() {
for range h.Chan() {
}
close(done)
}()
upstream <- immutableMsg
newMsg, ok, err := h.Handle(ctx, upstream, nil)
assert.Equal(t, newMsg, immutableMsg)
assert.False(t, ok)
assert.NoError(t, err)
newMsg, ok, err = h.Handle(ctx, upstream, newMsg)
assert.NoError(t, err)
assert.Nil(t, newMsg)
assert.True(t, ok)
h.Close()
<-done
}
func TestDefaultHandler(t *testing.T) {
h := make(defaultMessageHandler, 1)
done := make(chan struct{})
go func() {
for range h {
}
close(done)
}()
upstream := make(chan message.ImmutableMessage, 1)
msg := mock_message.NewMockImmutableMessage(t)
upstream <- msg
newMsg, ok, err := h.Handle(context.Background(), upstream, nil)
assert.NotNil(t, newMsg)
assert.NoError(t, err)
assert.False(t, ok)
assert.Equal(t, newMsg, msg)
newMsg, ok, err = h.Handle(context.Background(), upstream, newMsg)
assert.NoError(t, err)
assert.Nil(t, newMsg)
assert.True(t, ok)
h.Close()
<-done
}

View File

@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
var _ wal.Scanner = (*scannerAdaptorImpl)(nil) var _ wal.Scanner = (*scannerAdaptorImpl)(nil)
@ -21,13 +22,15 @@ func newScannerAdaptor(
readOption wal.ReadOption, readOption wal.ReadOption,
cleanup func(), cleanup func(),
) wal.Scanner { ) wal.Scanner {
if readOption.MesasgeHandler == nil {
readOption.MesasgeHandler = defaultMessageHandler(make(chan message.ImmutableMessage))
}
s := &scannerAdaptorImpl{ s := &scannerAdaptorImpl{
logger: log.With(zap.String("name", name), zap.String("channel", l.Channel().Name)), logger: log.With(zap.String("name", name), zap.String("channel", l.Channel().Name)),
innerWAL: l, innerWAL: l,
readOption: readOption, readOption: readOption,
sendingCh: make(chan message.ImmutableMessage, 1),
reorderBuffer: utility.NewReOrderBuffer(), reorderBuffer: utility.NewReOrderBuffer(),
pendingQueue: utility.NewImmutableMessageQueue(), pendingQueue: typeutil.NewMultipartQueue[message.ImmutableMessage](),
cleanup: cleanup, cleanup: cleanup,
ScannerHelper: helper.NewScannerHelper(name), ScannerHelper: helper.NewScannerHelper(name),
} }
@ -41,9 +44,8 @@ type scannerAdaptorImpl struct {
logger *log.MLogger logger *log.MLogger
innerWAL walimpls.WALImpls innerWAL walimpls.WALImpls
readOption wal.ReadOption readOption wal.ReadOption
sendingCh chan message.ImmutableMessage
reorderBuffer *utility.ReOrderByTimeTickBuffer // only support time tick reorder now. reorderBuffer *utility.ReOrderByTimeTickBuffer // only support time tick reorder now.
pendingQueue *utility.ImmutableMessageQueue // pendingQueue *typeutil.MultipartQueue[message.ImmutableMessage] //
cleanup func() cleanup func()
} }
@ -52,9 +54,9 @@ func (s *scannerAdaptorImpl) Channel() types.PChannelInfo {
return s.innerWAL.Channel() return s.innerWAL.Channel()
} }
// Chan returns the channel of message. // Chan returns the message channel of the scanner.
func (s *scannerAdaptorImpl) Chan() <-chan message.ImmutableMessage { func (s *scannerAdaptorImpl) Chan() <-chan message.ImmutableMessage {
return s.sendingCh return s.readOption.MesasgeHandler.(defaultMessageHandler)
} }
// Close the scanner, release the underlying resources. // Close the scanner, release the underlying resources.
@ -68,7 +70,7 @@ func (s *scannerAdaptorImpl) Close() error {
} }
func (s *scannerAdaptorImpl) executeConsume() { func (s *scannerAdaptorImpl) executeConsume() {
defer close(s.sendingCh) defer s.readOption.MesasgeHandler.Close()
innerScanner, err := s.innerWAL.Read(s.Context(), walimpls.ReadOption{ innerScanner, err := s.innerWAL.Read(s.Context(), walimpls.ReadOption{
Name: s.Name(), Name: s.Name(),
@ -83,36 +85,29 @@ func (s *scannerAdaptorImpl) executeConsume() {
for { for {
// generate the event channel and do the event loop. // generate the event channel and do the event loop.
// TODO: Consume from local cache. // TODO: Consume from local cache.
upstream, sending := s.getEventCh(innerScanner) upstream := s.getUpstream(innerScanner)
select {
case <-s.Context().Done(): msg, ok, err := s.readOption.MesasgeHandler.Handle(s.Context(), upstream, s.pendingQueue.Next())
if err != nil {
s.Finish(err) s.Finish(err)
return return
case msg, ok := <-upstream:
if !ok {
s.Finish(innerScanner.Error())
return
} }
s.handleUpstream(msg) if ok {
case sending <- s.pendingQueue.Next():
s.pendingQueue.UnsafeAdvance() s.pendingQueue.UnsafeAdvance()
} }
if msg != nil {
s.handleUpstream(msg)
}
} }
} }
func (s *scannerAdaptorImpl) getEventCh(scanner walimpls.ScannerImpls) (<-chan message.ImmutableMessage, chan<- message.ImmutableMessage) { func (s *scannerAdaptorImpl) getUpstream(scanner walimpls.ScannerImpls) <-chan message.ImmutableMessage {
if s.pendingQueue.Len() == 0 {
// If pending queue is empty,
// no more message can be sent,
// we always need to recv message from upstream to avoid starve.
return scanner.Chan(), nil
}
// TODO: configurable pending buffer count. // TODO: configurable pending buffer count.
// If the pending queue is full, we need to wait until it's consumed to avoid scanner overloading. // If the pending queue is full, we need to wait until it's consumed to avoid scanner overloading.
if s.pendingQueue.Len() > 16 { if s.pendingQueue.Len() > 16 {
return nil, s.sendingCh return nil
} }
return scanner.Chan(), s.sendingCh return scanner.Chan()
} }
func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {

View File

@ -10,11 +10,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/golang/protobuf/proto"
"github.com/remeh/sizedwaitgroup" "github.com/remeh/sizedwaitgroup"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource/idalloc" "github.com/milvus-io/milvus/internal/streamingnode/server/resource/idalloc"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal"
@ -154,26 +152,10 @@ func (f *testOneWALFramework) testAppend(ctx context.Context, w wal.WAL) ([]mess
time.Sleep(time.Duration(5+rand.Int31n(10)) * time.Millisecond) time.Sleep(time.Duration(5+rand.Int31n(10)) * time.Millisecond)
// ...rocksmq has a dirty implement of properties, // ...rocksmq has a dirty implement of properties,
// without commonpb.MsgHeader, it can not work. // without commonpb.MsgHeader, it can not work.
header := commonpb.MsgHeader{ msg := message.CreateTestEmptyInsertMesage(int64(i), map[string]string{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: int64(i),
},
}
payload, err := proto.Marshal(&header)
if err != nil {
panic(err)
}
properties := map[string]string{
"id": fmt.Sprintf("%d", i), "id": fmt.Sprintf("%d", i),
"const": "t", "const": "t",
} })
typ := message.MessageTypeUnknown
msg := message.NewMutableMessageBuilder().
WithMessageType(typ).
WithPayload(payload).
WithProperties(properties).
BuildMutable()
id, err := w.Append(ctx, msg) id, err := w.Append(ctx, msg)
assert.NoError(f.t, err) assert.NoError(f.t, err)
assert.NotNil(f.t, id) assert.NotNil(f.t, id)
@ -181,27 +163,12 @@ func (f *testOneWALFramework) testAppend(ctx context.Context, w wal.WAL) ([]mess
}(i) }(i)
} }
swg.Wait() swg.Wait()
// send a final hint message
header := commonpb.MsgHeader{ msg := message.CreateTestEmptyInsertMesage(int64(f.messageCount-1), map[string]string{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: int64(f.messageCount - 1),
},
}
payload, err := proto.Marshal(&header)
if err != nil {
panic(err)
}
properties := map[string]string{
"id": fmt.Sprintf("%d", f.messageCount-1), "id": fmt.Sprintf("%d", f.messageCount-1),
"const": "t", "const": "t",
"term": strconv.FormatInt(int64(f.term), 10), "term": strconv.FormatInt(int64(f.term), 10),
} })
msg := message.NewMutableMessageBuilder().
WithPayload(payload).
WithProperties(properties).
WithMessageType(message.MessageTypeUnknown).
BuildMutable()
id, err := w.Append(ctx, msg) id, err := w.Append(ctx, msg)
assert.NoError(f.t, err) assert.NoError(f.t, err)
messages[f.messageCount-1] = msg.IntoImmutableMessage(id) messages[f.messageCount-1] = msg.IntoImmutableMessage(id)

View File

@ -1,48 +1,28 @@
package timetick package timetick
import ( import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/commonpbutil"
) )
func newTimeTickMsg(ts uint64, sourceID int64) (message.MutableMessage, error) { func newTimeTickMsg(ts uint64, sourceID int64) (message.MutableMessage, error) {
// TODO: time tick should be put on properties, for compatibility, we put it on message body now. // TODO: time tick should be put on properties, for compatibility, we put it on message body now.
msgstreamMsg := &msgstream.TimeTickMsg{ // Common message's time tick is set on interceptor.
BaseMsg: msgstream.BaseMsg{ // TimeTickMsg's time tick should be set here.
BeginTimestamp: ts, msg, err := message.NewTimeTickMessageBuilderV1().
EndTimestamp: ts, WithMessageHeader(&message.TimeTickMessageHeader{}).
HashValues: []uint32{0}, WithPayload(&msgpb.TimeTickMsg{
},
TimeTickMsg: msgpb.TimeTickMsg{
Base: commonpbutil.NewMsgBase( Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_TimeTick), commonpbutil.WithMsgType(commonpb.MsgType_TimeTick),
commonpbutil.WithMsgID(0), commonpbutil.WithMsgID(0),
commonpbutil.WithTimeStamp(ts), commonpbutil.WithTimeStamp(ts),
commonpbutil.WithSourceID(sourceID), commonpbutil.WithSourceID(sourceID),
), ),
}, }).BuildMutable()
}
bytes, err := msgstreamMsg.Marshal(msgstreamMsg)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "marshal time tick message failed") return nil, err
} }
return msg.WithTimeTick(ts), nil
payload, ok := bytes.([]byte)
if !ok {
return nil, errors.New("marshal time tick message as []byte failed")
}
// Common message's time tick is set on interceptor.
// TimeTickMsg's time tick should be set here.
msg := message.NewMutableMessageBuilder().
WithMessageType(message.MessageTypeTimeTick).
WithPayload(payload).
BuildMutable().
WithTimeTick(ts)
return msg, nil
} }

View File

@ -1,6 +1,10 @@
package wal package wal
import ( import (
"context"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options" "github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/streaming/util/types"
@ -8,15 +12,22 @@ import (
type MessageFilter = func(message.ImmutableMessage) bool type MessageFilter = func(message.ImmutableMessage) bool
var ErrUpstreamClosed = errors.New("upstream closed")
// ReadOption is the option for reading records from the wal. // ReadOption is the option for reading records from the wal.
type ReadOption struct { type ReadOption struct {
DeliverPolicy options.DeliverPolicy DeliverPolicy options.DeliverPolicy
MessageFilter MessageFilter MessageFilter MessageFilter
MesasgeHandler MessageHandler // message handler for message processing.
// If the message handler is nil (no redundant operation need to apply),
// the default message handler will be used, and the receiver will be returned from Chan.
// Otherwise, Chan will panic.
// vaild every message will be passed to this handler before being delivered to the consumer.
} }
// Scanner is the interface for reading records from the wal. // Scanner is the interface for reading records from the wal.
type Scanner interface { type Scanner interface {
// Chan returns the channel of message. // Chan returns the channel of message if Option.MessageHandler is nil.
Chan() <-chan message.ImmutableMessage Chan() <-chan message.ImmutableMessage
// Channel returns the channel assignment info of the wal. // Channel returns the channel assignment info of the wal.
@ -33,3 +44,19 @@ type Scanner interface {
// Return the error same with `Error` // Return the error same with `Error`
Close() error Close() error
} }
// MessageHandler is used to handle message read from log.
// TODO: should be removed in future after msgstream is removed.
type MessageHandler interface {
// Handle is the callback for handling message.
// The message will be passed to the handler for processing.
// Handle operation can be blocked, but should listen to the context.Done() and upstream.
// If the context is canceled, the handler should return immediately with ctx.Err.
// If the upstream is closed, the handler should return immediately with ErrUpstreamClosed.
// If the upstream recv a message, the handler should return the incoming message.
// If the handler handle the message successfully, it should return the ok=true.
Handle(ctx context.Context, upstream <-chan message.ImmutableMessage, msg message.ImmutableMessage) (incoming message.ImmutableMessage, ok bool, err error)
// Close is called after all messages are handled or handling is interrupted.
Close()
}

View File

@ -1,51 +0,0 @@
package utility
import "github.com/milvus-io/milvus/pkg/streaming/util/message"
// NewImmutableMessageQueue create a new immutable message queue.
func NewImmutableMessageQueue() *ImmutableMessageQueue {
return &ImmutableMessageQueue{
pendings: make([][]message.ImmutableMessage, 0),
cnt: 0,
}
}
// ImmutableMessageQueue is a queue of messages.
type ImmutableMessageQueue struct {
pendings [][]message.ImmutableMessage
cnt int
}
// Len return the queue size.
func (pq *ImmutableMessageQueue) Len() int {
return pq.cnt
}
// Add add a slice of message as pending one
func (pq *ImmutableMessageQueue) Add(msgs []message.ImmutableMessage) {
if len(msgs) == 0 {
return
}
pq.pendings = append(pq.pendings, msgs)
pq.cnt += len(msgs)
}
// Next return the next message in pending queue.
func (pq *ImmutableMessageQueue) Next() message.ImmutableMessage {
if len(pq.pendings) != 0 && len(pq.pendings[0]) != 0 {
return pq.pendings[0][0]
}
return nil
}
// UnsafeAdvance do a advance without check.
// !!! Should only be called `Next` do not return nil.
func (pq *ImmutableMessageQueue) UnsafeAdvance() {
if len(pq.pendings[0]) == 1 {
pq.pendings = pq.pendings[1:]
pq.cnt--
return
}
pq.pendings[0] = pq.pendings[0][1:]
pq.cnt--
}

View File

@ -1,25 +0,0 @@
package utility
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
func TestImmutableMessageQueue(t *testing.T) {
q := NewImmutableMessageQueue()
for i := 0; i < 100; i++ {
q.Add([]message.ImmutableMessage{
mock_message.NewMockImmutableMessage(t),
})
assert.Equal(t, i+1, q.Len())
}
for i := 100; i > 0; i-- {
assert.NotNil(t, q.Next())
q.UnsafeAdvance()
assert.Equal(t, i-1, q.Len())
}
}

View File

@ -45,6 +45,9 @@ func (bm *MockMsg) SetID(id msgstream.UniqueID) {
// do nothing // do nothing
} }
func (bm *MockMsg) SetTs(ts uint64) {
}
func (bm *MockMsg) BeginTs() Timestamp { func (bm *MockMsg) BeginTs() Timestamp {
return 0 return 0
} }

View File

@ -18,3 +18,6 @@ generate-mockery: getdeps
$(INSTALL_PATH)/mockery --name=Client --dir=$(PWD)/mq/msgdispatcher --output=$(PWD)/mq/msgsdispatcher --filename=mock_client.go --with-expecter --structname=MockClient --outpkg=msgdispatcher --inpackage $(INSTALL_PATH)/mockery --name=Client --dir=$(PWD)/mq/msgdispatcher --output=$(PWD)/mq/msgsdispatcher --filename=mock_client.go --with-expecter --structname=MockClient --outpkg=msgdispatcher --inpackage
$(INSTALL_PATH)/mockery --name=Logger --dir=$(PWD)/eventlog --output=$(PWD)/eventlog --filename=mock_logger.go --with-expecter --structname=MockLogger --outpkg=eventlog --inpackage $(INSTALL_PATH)/mockery --name=Logger --dir=$(PWD)/eventlog --output=$(PWD)/eventlog --filename=mock_logger.go --with-expecter --structname=MockLogger --outpkg=eventlog --inpackage
$(INSTALL_PATH)/mockery --name=MessageID --dir=$(PWD)/mq/msgstream/mqwrapper --output=$(PWD)/mq/msgstream/mqwrapper --filename=mock_id.go --with-expecter --structname=MockMessageID --outpkg=mqwrapper --inpackage $(INSTALL_PATH)/mockery --name=MessageID --dir=$(PWD)/mq/msgstream/mqwrapper --output=$(PWD)/mq/msgstream/mqwrapper --filename=mock_id.go --with-expecter --structname=MockMessageID --outpkg=mqwrapper --inpackage
generate-proto:
$(ROOTPATH)/cmake_build/bin/protoc --proto_path=$(PWD)/streaming/util/message/messagepb --go_out=plugins=grpc,paths=source_relative:./streaming/util/message/messagepb $(PWD)/streaming/util/message/messagepb/message.proto

View File

@ -384,6 +384,11 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) (map[string][]MessageID, erro
} }
func (ms *mqMsgStream) getTsMsgFromConsumerMsg(msg common.Message) (TsMsg, error) { func (ms *mqMsgStream) getTsMsgFromConsumerMsg(msg common.Message) (TsMsg, error) {
return GetTsMsgFromConsumerMsg(ms.unmarshal, msg)
}
// GetTsMsgFromConsumerMsg get TsMsg from consumer message
func GetTsMsgFromConsumerMsg(unmarshalDispatcher UnmarshalDispatcher, msg common.Message) (TsMsg, error) {
header := commonpb.MsgHeader{} header := commonpb.MsgHeader{}
if msg.Payload() == nil { if msg.Payload() == nil {
return nil, fmt.Errorf("failed to unmarshal message header, payload is empty") return nil, fmt.Errorf("failed to unmarshal message header, payload is empty")
@ -395,7 +400,7 @@ func (ms *mqMsgStream) getTsMsgFromConsumerMsg(msg common.Message) (TsMsg, error
if header.Base == nil { if header.Base == nil {
return nil, fmt.Errorf("failed to unmarshal message, header is uncomplete") return nil, fmt.Errorf("failed to unmarshal message, header is uncomplete")
} }
tsMsg, err := ms.unmarshal.Unmarshal(msg.Payload(), header.Base.MsgType) tsMsg, err := unmarshalDispatcher.Unmarshal(msg.Payload(), header.Base.MsgType)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to unmarshal tsMsg, err %s", err.Error()) return nil, fmt.Errorf("failed to unmarshal tsMsg, err %s", err.Error())
} }

View File

@ -24,6 +24,13 @@ import (
"github.com/milvus-io/milvus/pkg/mq/common" "github.com/milvus-io/milvus/pkg/mq/common"
) )
// NewPulsarID creates a new pulsarID
func NewPulsarID(id pulsar.MessageID) *pulsarID {
return &pulsarID{
messageID: id,
}
}
type pulsarID struct { type pulsarID struct {
messageID pulsar.MessageID messageID pulsar.MessageID
} }
@ -31,6 +38,10 @@ type pulsarID struct {
// Check if pulsarID implements and MessageID interface // Check if pulsarID implements and MessageID interface
var _ common.MessageID = &pulsarID{} var _ common.MessageID = &pulsarID{}
func (pid *pulsarID) PulsarID() pulsar.MessageID {
return pid.messageID
}
func (pid *pulsarID) Serialize() []byte { func (pid *pulsarID) Serialize() []byte {
return pid.messageID.Serialize() return pid.messageID.Serialize()
} }

View File

@ -53,6 +53,7 @@ type TsMsg interface {
Unmarshal(MarshalType) (TsMsg, error) Unmarshal(MarshalType) (TsMsg, error)
Position() *MsgPosition Position() *MsgPosition
SetPosition(*MsgPosition) SetPosition(*MsgPosition)
SetTs(ts uint64)
Size() int Size() int
} }
@ -111,6 +112,11 @@ func (bm *BaseMsg) SetPosition(position *MsgPosition) {
bm.MsgPosition = position bm.MsgPosition = position
} }
func (bm *BaseMsg) SetTs(ts uint64) {
bm.BeginTimestamp = ts
bm.EndTimestamp = ts
}
func convertToByteArray(input interface{}) ([]byte, error) { func convertToByteArray(input interface{}) ([]byte, error) {
switch output := input.(type) { switch output := input.(type) {
case []byte: case []byte:

5
pkg/streaming/OWNERS Normal file
View File

@ -0,0 +1,5 @@
reviewers:
- chyezh
approvers:
- maintainers

View File

@ -0,0 +1,124 @@
package adaptor
import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
var unmashalerDispatcher = (&msgstream.ProtoUDFactory{}).NewUnmarshalDispatcher()
// FromMessageToMsgPack converts message to msgpack.
// Same TimeTick must be sent with same msgpack.
// !!! Msgs must be keep same time tick.
// TODO: remove this function after remove the msgstream implementation.
func NewMsgPackFromMessage(msgs ...message.ImmutableMessage) (*msgstream.MsgPack, error) {
if len(msgs) == 0 {
return nil, nil
}
allTsMsgs := make([]msgstream.TsMsg, 0, len(msgs))
var finalErr error
for _, msg := range msgs {
var tsMsg msgstream.TsMsg
var err error
switch msg.Version() {
case message.VersionOld:
tsMsg, err = fromMessageToTsMsgVOld(msg)
case message.VersionV1:
tsMsg, err = fromMessageToTsMsgV1(msg)
default:
panic("unsupported message version")
}
if err != nil {
finalErr = errors.CombineErrors(finalErr, errors.Wrapf(err, "Failed to convert message to msgpack, %v", msg.MessageID()))
continue
}
allTsMsgs = append(allTsMsgs, tsMsg)
}
if len(allTsMsgs) == 0 {
return nil, finalErr
}
// msgs is sorted by time tick.
// Postition use the last confirmed message id.
// 1. So use the first tsMsgs's Position can read all messages which timetick is greater or equal than the first tsMsgs's BeginTs.
// In other words, from the StartPositions, you can read the full msgPack.
// 2. Use the last tsMsgs's Position as the EndPosition, you can read all messages following the msgPack.
return &msgstream.MsgPack{
BeginTs: allTsMsgs[0].BeginTs(),
EndTs: allTsMsgs[len(allTsMsgs)-1].EndTs(),
Msgs: allTsMsgs,
StartPositions: []*msgstream.MsgPosition{allTsMsgs[0].Position()},
EndPositions: []*msgstream.MsgPosition{allTsMsgs[len(allTsMsgs)-1].Position()},
}, finalErr
}
func fromMessageToTsMsgVOld(msg message.ImmutableMessage) (msgstream.TsMsg, error) {
panic("Not implemented")
}
// fromMessageToTsMsgV1 converts message to ts message.
func fromMessageToTsMsgV1(msg message.ImmutableMessage) (msgstream.TsMsg, error) {
tsMsg, err := unmashalerDispatcher.Unmarshal(msg.Payload(), commonpb.MsgType(msg.MessageType()))
if err != nil {
return nil, errors.Wrap(err, "Failed to unmarshal message")
}
tsMsg.SetTs(msg.TimeTick())
tsMsg.SetPosition(&msgpb.MsgPosition{
ChannelName: msg.VChannel(),
// from the last confirmed message id, you can read all messages which timetick is greater or equal than current message id.
MsgID: MustGetMQWrapperIDFromMessage(msg.LastConfirmedMessageID()).Serialize(),
MsgGroup: "", // Not important any more.
Timestamp: msg.TimeTick(),
})
return recoverMessageFromHeader(tsMsg, msg)
}
// recoverMessageFromHeader recovers message from header.
func recoverMessageFromHeader(tsMsg msgstream.TsMsg, msg message.ImmutableMessage) (msgstream.TsMsg, error) {
switch msg.MessageType() {
case message.MessageTypeInsert:
insertMessage, err := message.AsImmutableInsertMessage(msg)
if err != nil {
return nil, errors.Wrap(err, "Failed to convert message to insert message")
}
// insertMsg has multiple partition and segment assignment is done by insert message header.
// so recover insert message from header before send it.
return recoverInsertMsgFromHeader(tsMsg.(*msgstream.InsertMsg), insertMessage.MessageHeader(), msg.TimeTick())
default:
return tsMsg, nil
}
}
// recoverInsertMsgFromHeader recovers insert message from header.
func recoverInsertMsgFromHeader(insertMsg *msgstream.InsertMsg, header *message.InsertMessageHeader, timetick uint64) (msgstream.TsMsg, error) {
if insertMsg.GetCollectionID() != header.GetCollectionId() {
panic("unreachable code, collection id is not equal")
}
// header promise a batch insert on vchannel in future, so header has multiple partition.
var assignment *message.PartitionSegmentAssignment
for _, p := range header.Partitions {
if p.GetPartitionId() == insertMsg.GetPartitionID() {
assignment = p
break
}
}
if assignment.GetSegmentAssignment().GetSegmentId() == 0 {
panic("unreachable code, partition id is not exist")
}
insertMsg.SegmentID = assignment.GetSegmentAssignment().GetSegmentId()
// timetick should has been assign at streaming node.
// so overwrite the timetick on insertRequest.
timestamps := make([]uint64, insertMsg.GetNumRows())
for i := 0; i < len(timestamps); i++ {
timestamps[i] = timetick
}
insertMsg.Timestamps = timestamps
return insertMsg, nil
}

View File

@ -0,0 +1,34 @@
package adaptor
import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server"
mqpulsar "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/pulsar"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
msgpulsar "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
// MustGetMQWrapperIDFromMessage converts message.MessageID to common.MessageID
// TODO: should be removed in future after common.MessageID is removed
func MustGetMQWrapperIDFromMessage(messageID message.MessageID) common.MessageID {
if id, ok := messageID.(interface{ PulsarID() pulsar.MessageID }); ok {
return mqpulsar.NewPulsarID(id.PulsarID())
} else if id, ok := messageID.(interface{ RmqID() int64 }); ok {
return &server.RmqID{MessageID: id.RmqID()}
}
panic("unsupported now")
}
// MustGetMessageIDFromMQWrapperID converts common.MessageID to message.MessageID
// TODO: should be removed in future after common.MessageID is removed
func MustGetMessageIDFromMQWrapperID(commonMessageID common.MessageID) message.MessageID {
if id, ok := commonMessageID.(interface{ PulsarID() pulsar.MessageID }); ok {
return msgpulsar.NewPulsarID(id.PulsarID())
} else if id, ok := commonMessageID.(*server.RmqID); ok {
return rmq.NewRmqID(id.MessageID)
}
return nil
}

View File

@ -0,0 +1,20 @@
package adaptor
import (
"testing"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"
msgpulsar "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
func TestIDCoversion(t *testing.T) {
id := MustGetMessageIDFromMQWrapperID(MustGetMQWrapperIDFromMessage(rmq.NewRmqID(1)))
assert.True(t, id.EQ(rmq.NewRmqID(1)))
msgID := pulsar.EarliestMessageID()
id = MustGetMessageIDFromMQWrapperID(MustGetMQWrapperIDFromMessage(msgpulsar.NewPulsarID(msgID)))
assert.True(t, id.EQ(msgpulsar.NewPulsarID(msgID)))
}

View File

@ -0,0 +1,72 @@
package adaptor
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
func TestNewMsgPackFromInsertMessage(t *testing.T) {
id := rmq.NewRmqID(1)
fieldCount := map[int64]int{
3: 1000,
4: 2000,
5: 3000,
6: 5000,
}
tt := uint64(time.Now().UnixNano())
immutableMessages := make([]message.ImmutableMessage, 0, len(fieldCount))
for segmentID, rowNum := range fieldCount {
insertMsg := message.CreateTestInsertMessage(t, segmentID, rowNum, tt, id)
immutableMessage := insertMsg.IntoImmutableMessage(id)
immutableMessages = append(immutableMessages, immutableMessage)
}
pack, err := NewMsgPackFromMessage(immutableMessages...)
assert.NoError(t, err)
assert.NotNil(t, pack)
assert.Equal(t, tt, pack.BeginTs)
assert.Equal(t, tt, pack.EndTs)
assert.Len(t, pack.Msgs, len(fieldCount))
for _, msg := range pack.Msgs {
insertMsg := msg.(*msgstream.InsertMsg)
rowNum, ok := fieldCount[insertMsg.GetSegmentID()]
assert.True(t, ok)
assert.Len(t, insertMsg.Timestamps, rowNum)
assert.Len(t, insertMsg.RowIDs, rowNum)
assert.Len(t, insertMsg.FieldsData, 2)
for _, fieldData := range insertMsg.FieldsData {
if data := fieldData.GetScalars().GetBoolData(); data != nil {
assert.Len(t, data.Data, rowNum)
} else if data := fieldData.GetScalars().GetIntData(); data != nil {
assert.Len(t, data.Data, rowNum)
}
}
for _, ts := range insertMsg.Timestamps {
assert.Equal(t, ts, tt)
}
}
}
func TestNewMsgPackFromCreateCollectionMessage(t *testing.T) {
id := rmq.NewRmqID(1)
tt := uint64(time.Now().UnixNano())
msg := message.CreateTestCreateCollectionMessage(t, 1, tt, id)
immutableMessage := msg.IntoImmutableMessage(id)
pack, err := NewMsgPackFromMessage(immutableMessage)
assert.NoError(t, err)
assert.NotNil(t, pack)
assert.Equal(t, tt, pack.BeginTs)
assert.Equal(t, tt, pack.EndTs)
}

View File

@ -1,5 +1,24 @@
package message package message
import (
"fmt"
"reflect"
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
)
// NewMutableMessage creates a new mutable message.
// Only used at server side for streamingnode internal service, don't use it at client side.
func NewMutableMessage(payload []byte, properties map[string]string) MutableMessage {
return &messageImpl{
payload: payload,
properties: properties,
}
}
// NewImmutableMessage creates a new immutable message. // NewImmutableMessage creates a new immutable message.
func NewImmutableMesasge( func NewImmutableMesasge(
id MessageID, id MessageID,
@ -15,43 +34,69 @@ func NewImmutableMesasge(
} }
} }
// NewMutableMessageBuilder creates a new builder. // List all type-safe mutable message builders here.
// Should only used at client side. var (
func NewMutableMessageBuilder() *MutableMesasgeBuilder { NewTimeTickMessageBuilderV1 = createNewMessageBuilderV1[*TimeTickMessageHeader, *msgpb.TimeTickMsg]()
return &MutableMesasgeBuilder{ NewInsertMessageBuilderV1 = createNewMessageBuilderV1[*InsertMessageHeader, *msgpb.InsertRequest]()
payload: nil, NewDeleteMessageBuilderV1 = createNewMessageBuilderV1[*DeleteMessageHeader, *msgpb.DeleteRequest]()
properties: make(propertiesImpl), NewCreateCollectionMessageBuilderV1 = createNewMessageBuilderV1[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]()
NewDropCollectionMessageBuilderV1 = createNewMessageBuilderV1[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]()
NewCreatePartitionMessageBuilderV1 = createNewMessageBuilderV1[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]()
NewDropPartitionMessageBuilderV1 = createNewMessageBuilderV1[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]()
)
// createNewMessageBuilderV1 creates a new message builder with v1 marker.
func createNewMessageBuilderV1[H proto.Message, P proto.Message]() func() *mutableMesasgeBuilder[H, P] {
return func() *mutableMesasgeBuilder[H, P] {
return newMutableMessageBuilder[H, P](VersionV1)
} }
} }
// MutableMesasgeBuilder is the builder for message. // newMutableMessageBuilder creates a new builder.
type MutableMesasgeBuilder struct { // Should only used at client side.
payload []byte func newMutableMessageBuilder[H proto.Message, P proto.Message](v Version) *mutableMesasgeBuilder[H, P] {
var h H
messageType := mustGetMessageTypeFromMessageHeader(h)
properties := make(propertiesImpl)
properties.Set(messageTypeKey, messageType.marshal())
properties.Set(messageVersion, v.String())
return &mutableMesasgeBuilder[H, P]{
properties: properties,
}
}
// mutableMesasgeBuilder is the builder for message.
type mutableMesasgeBuilder[H proto.Message, P proto.Message] struct {
header H
payload P
properties propertiesImpl properties propertiesImpl
} }
func (b *MutableMesasgeBuilder) WithMessageType(t MessageType) *MutableMesasgeBuilder { // WithMessageHeader creates a new builder with determined message type.
b.properties.Set(messageTypeKey, t.marshal()) func (b *mutableMesasgeBuilder[H, P]) WithMessageHeader(h H) *mutableMesasgeBuilder[H, P] {
b.header = h
return b return b
} }
// WithPayload creates a new builder with message payload. // WithPayload creates a new builder with message payload.
// The MessageType is required to indicate which message type payload is. func (b *mutableMesasgeBuilder[H, P]) WithPayload(p P) *mutableMesasgeBuilder[H, P] {
func (b *MutableMesasgeBuilder) WithPayload(payload []byte) *MutableMesasgeBuilder { b.payload = p
b.payload = payload
return b return b
} }
// WithProperty creates a new builder with message property. // WithProperty creates a new builder with message property.
// A key started with '_' is reserved for streaming system, should never used at user of client. // A key started with '_' is reserved for streaming system, should never used at user of client.
func (b *MutableMesasgeBuilder) WithProperty(key string, val string) *MutableMesasgeBuilder { func (b *mutableMesasgeBuilder[H, P]) WithProperty(key string, val string) *mutableMesasgeBuilder[H, P] {
if b.properties.Exist(key) {
panic(fmt.Sprintf("message builder already set property field, key = %s", key))
}
b.properties.Set(key, val) b.properties.Set(key, val)
return b return b
} }
// WithProperties creates a new builder with message properties. // WithProperties creates a new builder with message properties.
// A key started with '_' is reserved for streaming system, should never used at user of client. // A key started with '_' is reserved for streaming system, should never used at user of client.
func (b *MutableMesasgeBuilder) WithProperties(kvs map[string]string) *MutableMesasgeBuilder { func (b *mutableMesasgeBuilder[H, P]) WithProperties(kvs map[string]string) *mutableMesasgeBuilder[H, P] {
for key, val := range kvs { for key, val := range kvs {
b.properties.Set(key, val) b.properties.Set(key, val)
} }
@ -61,17 +106,28 @@ func (b *MutableMesasgeBuilder) WithProperties(kvs map[string]string) *MutableMe
// BuildMutable builds a mutable message. // BuildMutable builds a mutable message.
// Panic if not set payload and message type. // Panic if not set payload and message type.
// should only used at client side. // should only used at client side.
func (b *MutableMesasgeBuilder) BuildMutable() MutableMessage { func (b *mutableMesasgeBuilder[H, P]) BuildMutable() (MutableMessage, error) {
if b.payload == nil { // payload and header must be a pointer
if reflect.ValueOf(b.header).IsNil() {
panic("message builder not ready for header field")
}
if reflect.ValueOf(b.payload).IsNil() {
panic("message builder not ready for payload field") panic("message builder not ready for payload field")
} }
if !b.properties.Exist(messageTypeKey) {
panic("message builder not ready for message type field") // setup header.
sp, err := EncodeProto(b.header)
if err != nil {
return nil, errors.Wrap(err, "failed to encode header")
}
b.properties.Set(messageSpecialiedHeader, sp)
payload, err := proto.Marshal(b.payload)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal payload")
} }
// Set message version.
b.properties.Set(messageVersion, VersionV1.String())
return &messageImpl{ return &messageImpl{
payload: b.payload, payload: payload,
properties: b.properties, properties: b.properties,
} }, nil
} }

View File

@ -1,6 +1,12 @@
package message package message
import "strconv" import (
"encoding/base64"
"strconv"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)
const base = 36 const base = 36
@ -23,3 +29,21 @@ func DecodeUint64(value string) (uint64, error) {
func DecodeInt64(value string) (int64, error) { func DecodeInt64(value string) (int64, error) {
return strconv.ParseInt(value, base, 64) return strconv.ParseInt(value, base, 64)
} }
// EncodeProto encodes proto message to json string.
func EncodeProto(m proto.Message) (string, error) {
result, err := proto.Marshal(m)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(result), nil
}
// DecodeProto
func DecodeProto(data string, m proto.Message) error {
val, err := base64.StdEncoding.DecodeString(data)
if err != nil {
return errors.Wrap(err, "failed to decode base64")
}
return proto.Unmarshal(val, m)
}

View File

@ -0,0 +1,27 @@
package message
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestEncoder(t *testing.T) {
result, err := DecodeInt64(EncodeInt64(1))
assert.NoError(t, err)
assert.Equal(t, int64(1), result)
result2, err := DecodeUint64(EncodeUint64(1))
assert.NoError(t, err)
assert.Equal(t, uint64(1), result2)
result3, err := EncodeProto(&InsertMessageHeader{
CollectionId: 1,
})
assert.NoError(t, err)
var result4 InsertMessageHeader
err = DecodeProto(result3, &result4)
assert.NoError(t, err)
assert.Equal(t, result4.CollectionId, int64(1))
}

View File

@ -1,5 +1,7 @@
package message package message
import "github.com/golang/protobuf/proto"
var ( var (
_ BasicMessage = (*messageImpl)(nil) _ BasicMessage = (*messageImpl)(nil)
_ MutableMessage = (*messageImpl)(nil) _ MutableMessage = (*messageImpl)(nil)
@ -76,3 +78,30 @@ type ImmutableMessage interface {
// MessageID returns the message id of current message. // MessageID returns the message id of current message.
MessageID() MessageID MessageID() MessageID
} }
// specializedMutableMessage is the specialized mutable message interface.
type specializedMutableMessage[H proto.Message] interface {
BasicMessage
// VChannel returns the vchannel of the message.
VChannel() string
// TimeTick returns the time tick of the message.
TimeTick() uint64
// MessageHeader returns the message header.
// Modifications to the returned header will be reflected in the message.
MessageHeader() H
// OverwriteMessageHeader overwrites the message header.
OverwriteMessageHeader(header H)
}
// specializedImmutableMessage is the specialized immutable message interface.
type specializedImmutableMessage[H proto.Message] interface {
ImmutableMessage
// MessageHeader returns the message header.
// Modifications to the returned header will be reflected in the message.
MessageHeader() H
}

View File

@ -1,30 +1,35 @@
package message_test package message_test
import ( import (
"bytes"
"fmt" "fmt"
"testing" "testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message" "github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/message"
) )
func TestMessage(t *testing.T) { func TestMessage(t *testing.T) {
b := message.NewMutableMessageBuilder() b := message.NewTimeTickMessageBuilderV1()
mutableMessage := b. mutableMessage, err := b.WithMessageHeader(&message.TimeTickMessageHeader{}).
WithMessageType(message.MessageTypeTimeTick).
WithPayload([]byte("payload")).
WithProperties(map[string]string{"key": "value"}). WithProperties(map[string]string{"key": "value"}).
BuildMutable() WithPayload(&msgpb.TimeTickMsg{}).BuildMutable()
assert.NoError(t, err)
assert.Equal(t, "payload", string(mutableMessage.Payload())) payload, err := proto.Marshal(&message.TimeTickMessageHeader{})
assert.NoError(t, err)
assert.True(t, bytes.Equal(payload, mutableMessage.Payload()))
assert.True(t, mutableMessage.Properties().Exist("key")) assert.True(t, mutableMessage.Properties().Exist("key"))
v, ok := mutableMessage.Properties().Get("key") v, ok := mutableMessage.Properties().Get("key")
assert.Equal(t, "value", v) assert.Equal(t, "value", v)
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, message.MessageTypeTimeTick, mutableMessage.MessageType()) assert.Equal(t, message.MessageTypeTimeTick, mutableMessage.MessageType())
assert.Equal(t, 24, mutableMessage.EstimateSize()) assert.Equal(t, 20, mutableMessage.EstimateSize())
mutableMessage.WithTimeTick(123) mutableMessage.WithTimeTick(123)
v, ok = mutableMessage.Properties().Get("_tt") v, ok = mutableMessage.Properties().Get("_tt")
assert.True(t, ok) assert.True(t, ok)
@ -96,6 +101,6 @@ func TestMessage(t *testing.T) {
}) })
assert.Panics(t, func() { assert.Panics(t, func() {
message.NewMutableMessageBuilder().BuildMutable() message.NewTimeTickMessageBuilderV1().BuildMutable()
}) })
} }

View File

@ -69,6 +69,28 @@ func (m *messageImpl) IntoImmutableMessage(id MessageID) ImmutableMessage {
} }
} }
// TimeTick returns the time tick of current message.
func (m *messageImpl) TimeTick() uint64 {
value, ok := m.properties.Get(messageTimeTick)
if !ok {
panic(fmt.Sprintf("there's a bug in the message codes, timetick lost in properties of message"))
}
tt, err := DecodeUint64(value)
if err != nil {
panic(fmt.Sprintf("there's a bug in the message codes, dirty timetick %s in properties of message", value))
}
return tt
}
// VChannel returns the vchannel of current message.
func (m *messageImpl) VChannel() string {
value, ok := m.properties.Get(messageVChannel)
if !ok {
panic(fmt.Sprintf("there's a bug in the message codes, vchannel lost in properties of message"))
}
return value
}
type immutableMessageImpl struct { type immutableMessageImpl struct {
messageImpl messageImpl
id MessageID id MessageID
@ -92,6 +114,14 @@ func (m *immutableMessageImpl) TimeTick() uint64 {
return tt return tt
} }
func (m *immutableMessageImpl) VChannel() string {
value, ok := m.properties.Get(messageVChannel)
if !ok {
panic(fmt.Sprintf("there's a bug in the message codes, vchannel lost in properties of message, id: %+v", m.id))
}
return value
}
func (m *immutableMessageImpl) LastConfirmedMessageID() MessageID { func (m *immutableMessageImpl) LastConfirmedMessageID() MessageID {
value, ok := m.properties.Get(messageLastConfirmed) value, ok := m.properties.Get(messageLastConfirmed)
if !ok { if !ok {
@ -108,11 +138,3 @@ func (m *immutableMessageImpl) LastConfirmedMessageID() MessageID {
func (m *immutableMessageImpl) MessageID() MessageID { func (m *immutableMessageImpl) MessageID() MessageID {
return m.id return m.id
} }
func (m *immutableMessageImpl) VChannel() string {
value, ok := m.properties.Get(messageVChannel)
if !ok {
panic(fmt.Sprintf("there's a bug in the message codes, vchannel lost in properties of message, id: %+v", m.id))
}
return value
}

View File

@ -0,0 +1,84 @@
syntax = "proto3";
package milvus.proto.message;
option go_package = "github.com/milvus-io/milvus/pkg/streaming/util/message/messagepb";
///
/// Message Payload Definitions
/// Some message payload is defined at msg.proto at milvus-proto for
/// compatibility.
/// 1. InsertRequest
/// 2. DeleteRequest
/// 3. TimeTickRequest
/// 4. CreateCollectionRequest
/// 5. DropCollectionRequest
/// 6. CreatePartitionRequest
/// 7. DropPartitionRequest
///
// FlushMessagePayload is the payload of flush message.
message FlushMessagePayload {
int64 collection_id =
1; // indicate which the collection that segment belong to.
repeated int64 segment_id = 2; // indicate which segment to flush.
}
///
/// Message Header Definitions
/// Used to fast handling at streaming node write ahead.
/// The header should be simple and light enough to be parsed.
/// Do not put too much information in the header if unnecessary.
///
// TimeTickMessageHeader just nothing.
message TimeTickMessageHeader {}
// InsertMessageHeader is the header of insert message.
message InsertMessageHeader {
int64 collection_id = 1;
repeated PartitionSegmentAssignment partitions = 2;
}
// PartitionSegmentAssignment is the segment assignment of a partition.
message PartitionSegmentAssignment {
int64 partition_id = 1;
uint64 rows = 2;
uint64 binary_size = 3;
SegmentAssignment segment_assignment = 4;
}
// SegmentAssignment is the assignment of a segment.
message SegmentAssignment {
int64 segment_id = 1;
}
// DeleteMessageHeader
message DeleteMessageHeader {
int64 collection_id = 1;
}
// FlushMessageHeader just nothing.
message FlushMessageHeader {}
// CreateCollectionMessageHeader is the header of create collection message.
message CreateCollectionMessageHeader {
int64 collection_id = 1;
}
// DropCollectionMessageHeader is the header of drop collection message.
message DropCollectionMessageHeader {
int64 collection_id = 1;
}
// CreatePartitionMessageHeader is the header of create partition message.
message CreatePartitionMessageHeader {
int64 collection_id = 1;
int64 partition_id = 2;
}
// DropPartitionMessageHeader is the header of drop partition message.
message DropPartitionMessageHeader {
int64 collection_id = 1;
int64 partition_id = 2;
}

View File

@ -7,6 +7,7 @@ const (
messageTimeTick = "_tt" // message time tick. messageTimeTick = "_tt" // message time tick.
messageLastConfirmed = "_lc" // message last confirmed message id. messageLastConfirmed = "_lc" // message last confirmed message id.
messageVChannel = "_vc" // message virtual channel. messageVChannel = "_vc" // message virtual channel.
messageSpecialiedHeader = "_sh" // specialized message header.
) )
var ( var (

View File

@ -0,0 +1,186 @@
package message
import (
"fmt"
"reflect"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/milvus-io/milvus/pkg/streaming/util/message/messagepb"
)
type (
SegmentAssignment = messagepb.SegmentAssignment
PartitionSegmentAssignment = messagepb.PartitionSegmentAssignment
TimeTickMessageHeader = messagepb.TimeTickMessageHeader
InsertMessageHeader = messagepb.InsertMessageHeader
DeleteMessageHeader = messagepb.DeleteMessageHeader
CreateCollectionMessageHeader = messagepb.CreateCollectionMessageHeader
DropCollectionMessageHeader = messagepb.DropCollectionMessageHeader
CreatePartitionMessageHeader = messagepb.CreatePartitionMessageHeader
DropPartitionMessageHeader = messagepb.DropPartitionMessageHeader
)
// messageTypeMap maps the proto message type to the message type.
var messageTypeMap = map[reflect.Type]MessageType{
reflect.TypeOf(&TimeTickMessageHeader{}): MessageTypeTimeTick,
reflect.TypeOf(&InsertMessageHeader{}): MessageTypeInsert,
reflect.TypeOf(&DeleteMessageHeader{}): MessageTypeDelete,
reflect.TypeOf(&CreateCollectionMessageHeader{}): MessageTypeCreateCollection,
reflect.TypeOf(&DropCollectionMessageHeader{}): MessageTypeDropCollection,
reflect.TypeOf(&CreatePartitionMessageHeader{}): MessageTypeCreatePartition,
reflect.TypeOf(&DropPartitionMessageHeader{}): MessageTypeDropPartition,
}
// List all specialized message types.
type (
MutableTimeTickMessage = specializedMutableMessage[*TimeTickMessageHeader]
MutableInsertMessage = specializedMutableMessage[*InsertMessageHeader]
MutableDeleteMessage = specializedMutableMessage[*DeleteMessageHeader]
MutableCreateCollection = specializedMutableMessage[*CreateCollectionMessageHeader]
MutableDropCollection = specializedMutableMessage[*DropCollectionMessageHeader]
MutableCreatePartition = specializedMutableMessage[*CreatePartitionMessageHeader]
MutableDropPartition = specializedMutableMessage[*DropPartitionMessageHeader]
ImmutableTimeTickMessage = specializedImmutableMessage[*TimeTickMessageHeader]
ImmutableInsertMessage = specializedImmutableMessage[*InsertMessageHeader]
ImmutableDeleteMessage = specializedImmutableMessage[*DeleteMessageHeader]
ImmutableCreateCollection = specializedImmutableMessage[*CreateCollectionMessageHeader]
ImmutableDropCollection = specializedImmutableMessage[*DropCollectionMessageHeader]
ImmutableCreatePartition = specializedImmutableMessage[*CreatePartitionMessageHeader]
ImmutableDropPartition = specializedImmutableMessage[*DropPartitionMessageHeader]
)
// List all as functions for specialized messages.
var (
AsMutableTimeTickMessage = asSpecializedMutableMessage[*TimeTickMessageHeader]
AsMutableInsertMessage = asSpecializedMutableMessage[*InsertMessageHeader]
AsMutableDeleteMessage = asSpecializedMutableMessage[*DeleteMessageHeader]
AsMutableCreateCollection = asSpecializedMutableMessage[*CreateCollectionMessageHeader]
AsMutableDropCollection = asSpecializedMutableMessage[*DropCollectionMessageHeader]
AsMutableCreatePartition = asSpecializedMutableMessage[*CreatePartitionMessageHeader]
AsMutableDropPartition = asSpecializedMutableMessage[*DropPartitionMessageHeader]
AsImmutableTimeTickMessage = asSpecializedImmutableMessage[*TimeTickMessageHeader]
AsImmutableInsertMessage = asSpecializedImmutableMessage[*InsertMessageHeader]
AsImmutableDeleteMessage = asSpecializedImmutableMessage[*DeleteMessageHeader]
AsImmutableCreateCollection = asSpecializedImmutableMessage[*CreateCollectionMessageHeader]
AsImmutableDropCollection = asSpecializedImmutableMessage[*DropCollectionMessageHeader]
AsImmutableCreatePartition = asSpecializedImmutableMessage[*CreatePartitionMessageHeader]
AsImmutableDropPartition = asSpecializedImmutableMessage[*DropPartitionMessageHeader]
)
// asSpecializedMutableMessage converts a MutableMessage to a specialized MutableMessage.
// Return nil, nil if the message is not the target specialized message.
// Return nil, error if the message is the target specialized message but failed to decode the specialized header.
// Return specializedMutableMessage, nil if the message is the target specialized message and successfully decoded the specialized header.
func asSpecializedMutableMessage[H proto.Message](msg MutableMessage) (specializedMutableMessage[H], error) {
underlying := msg.(*messageImpl)
var header H
msgType := mustGetMessageTypeFromMessageHeader(header)
if underlying.MessageType() != msgType {
// The message type do not match the specialized header.
return nil, nil
}
// Get the specialized header from the message.
val, ok := underlying.properties.Get(messageSpecialiedHeader)
if !ok {
return nil, errors.Errorf("lost specialized header, %s", msgType.String())
}
// Decode the specialized header.
// Must be pointer type.
t := reflect.TypeOf(header)
t.Elem()
header = reflect.New(t.Elem()).Interface().(H)
// must be a pointer to a proto message
if err := DecodeProto(val, header); err != nil {
return nil, errors.Wrap(err, "failed to decode specialized header")
}
return &specializedMutableMessageImpl[H]{
header: header,
messageImpl: underlying,
}, nil
}
// asSpecializedImmutableMessage converts a ImmutableMessage to a specialized ImmutableMessage.
// Return nil, nil if the message is not the target specialized message.
// Return nil, error if the message is the target specialized message but failed to decode the specialized header.
// Return asSpecializedImmutableMessage, nil if the message is the target specialized message and successfully decoded the specialized header.
func asSpecializedImmutableMessage[H proto.Message](msg ImmutableMessage) (specializedImmutableMessage[H], error) {
underlying := msg.(*immutableMessageImpl)
var header H
msgType := mustGetMessageTypeFromMessageHeader(header)
if underlying.MessageType() != msgType {
// The message type do not match the specialized header.
return nil, nil
}
// Get the specialized header from the message.
val, ok := underlying.properties.Get(messageSpecialiedHeader)
if !ok {
return nil, errors.Errorf("lost specialized header, %s", msgType.String())
}
// Decode the specialized header.
// Must be pointer type.
t := reflect.TypeOf(header)
t.Elem()
header = reflect.New(t.Elem()).Interface().(H)
// must be a pointer to a proto message
if err := DecodeProto(val, header); err != nil {
return nil, errors.Wrap(err, "failed to decode specialized header")
}
return &specializedImmutableMessageImpl[H]{
header: header,
immutableMessageImpl: underlying,
}, nil
}
// mustGetMessageTypeFromMessageHeader returns the message type of the given message header.
func mustGetMessageTypeFromMessageHeader(msg proto.Message) MessageType {
t := reflect.TypeOf(msg)
mt, ok := messageTypeMap[t]
if !ok {
panic(fmt.Sprintf("unsupported message type of proto header: %s", t.Name()))
}
return mt
}
// specializedMutableMessageImpl is the specialized mutable message implementation.
type specializedMutableMessageImpl[H proto.Message] struct {
header H
*messageImpl
}
// MessageHeader returns the message header.
func (m *specializedMutableMessageImpl[H]) MessageHeader() H {
return m.header
}
// OverwriteMessageHeader overwrites the message header.
func (m *specializedMutableMessageImpl[H]) OverwriteMessageHeader(header H) {
m.header = header
newHeader, err := EncodeProto(m.header)
if err != nil {
panic(fmt.Sprintf("failed to encode insert header, there's a bug, %+v, %s", m.header, err.Error()))
}
m.messageImpl.properties.Set(messageSpecialiedHeader, newHeader)
}
// specializedImmutableMessageImpl is the specialized immmutable message implementation.
type specializedImmutableMessageImpl[H proto.Message] struct {
header H
*immutableMessageImpl
}
// MessageHeader returns the message header.
func (m *specializedImmutableMessageImpl[H]) MessageHeader() H {
return m.header
}

View File

@ -0,0 +1,54 @@
package message_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
func TestAsSpecializedMessage(t *testing.T) {
m, err := message.NewInsertMessageBuilderV1().
WithMessageHeader(&message.InsertMessageHeader{
CollectionId: 1,
Partitions: []*message.PartitionSegmentAssignment{
{
PartitionId: 1,
Rows: 100,
BinarySize: 1000,
},
},
}).
WithPayload(&msgpb.InsertRequest{}).BuildMutable()
assert.NoError(t, err)
insertMsg, err := message.AsMutableInsertMessage(m)
assert.NoError(t, err)
assert.NotNil(t, insertMsg)
assert.Equal(t, int64(1), insertMsg.MessageHeader().CollectionId)
h := insertMsg.MessageHeader()
h.Partitions[0].SegmentAssignment = &message.SegmentAssignment{
SegmentId: 1,
}
insertMsg.OverwriteMessageHeader(h)
createColMsg, err := message.AsMutableCreateCollection(m)
assert.NoError(t, err)
assert.Nil(t, createColMsg)
m2 := m.IntoImmutableMessage(mock_message.NewMockMessageID(t))
insertMsg2, err := message.AsImmutableInsertMessage(m2)
assert.NoError(t, err)
assert.NotNil(t, insertMsg2)
assert.Equal(t, int64(1), insertMsg2.MessageHeader().CollectionId)
assert.Equal(t, insertMsg2.MessageHeader().Partitions[0].SegmentAssignment.SegmentId, int64(1))
createColMsg2, err := message.AsMutableCreateCollection(m)
assert.NoError(t, err)
assert.Nil(t, createColMsg2)
}

View File

@ -0,0 +1,148 @@
//go:build test
// +build test
package message
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)
func CreateTestInsertMessage(t *testing.T, segmentID int64, totalRows int, timetick uint64, messageID MessageID) MutableMessage {
timestamps := make([]uint64, 0, totalRows)
for i := 0; i < totalRows; i++ {
timestamps = append(timestamps, uint64(0))
}
rowIDs := make([]int64, 0, totalRows)
for i := 0; i < totalRows; i++ {
rowIDs = append(rowIDs, int64(i))
}
intFieldArray := make([]int32, 0, totalRows)
for i := 0; i < totalRows; i++ {
intFieldArray = append(intFieldArray, int32(i))
}
boolFieldArray := make([]bool, 0, totalRows)
for i := 0; i < totalRows; i++ {
boolFieldArray = append(boolFieldArray, i%2 == 0)
}
fieldsData := []*schemapb.FieldData{
{
Type: schemapb.DataType_Int64,
FieldName: "f1",
FieldId: 1,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: intFieldArray,
},
},
},
},
},
{
Type: schemapb.DataType_Bool,
FieldName: "f2",
FieldId: 2,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{
Data: boolFieldArray,
},
},
},
},
},
}
msg, err := NewInsertMessageBuilderV1().
WithMessageHeader(&InsertMessageHeader{
CollectionId: 1,
Partitions: []*PartitionSegmentAssignment{
{
PartitionId: 2,
Rows: uint64(totalRows),
BinarySize: 10000,
SegmentAssignment: &SegmentAssignment{SegmentId: segmentID},
},
},
}).
WithPayload(&msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
Timestamp: 100,
SourceID: 1,
},
ShardName: "v1",
DbName: "test_name",
CollectionName: "test_name",
PartitionName: "test_name",
DbID: 1,
CollectionID: 1,
PartitionID: 2,
SegmentID: 0,
Version: msgpb.InsertDataVersion_ColumnBased,
FieldsData: fieldsData,
RowIDs: rowIDs,
Timestamps: timestamps,
NumRows: uint64(totalRows),
}).BuildMutable()
if err != nil {
panic(err)
}
msg.WithVChannel("v1")
msg.WithTimeTick(timetick)
msg.WithLastConfirmed(messageID)
return msg
}
func CreateTestCreateCollectionMessage(t *testing.T, collectionID int64, timetick uint64, messageID MessageID) MutableMessage {
header := &CreateCollectionMessageHeader{
CollectionId: collectionID,
}
payload := &msgpb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateCollection,
MsgID: collectionID,
Timestamp: 100,
},
DbName: "db",
CollectionName: "collection",
PartitionName: "partition",
DbID: 1,
CollectionID: collectionID,
}
msg, err := NewCreateCollectionMessageBuilderV1().
WithMessageHeader(header).
WithPayload(payload).
BuildMutable()
assert.NoError(t, err)
msg.WithVChannel("v1")
msg.WithTimeTick(timetick)
msg.WithLastConfirmed(messageID)
return msg
}
// CreateTestEmptyInsertMesage creates an empty insert message for testing
func CreateTestEmptyInsertMesage(msgID int64, extraProperties map[string]string) MutableMessage {
msg, err := NewInsertMessageBuilderV1().
WithMessageHeader(&InsertMessageHeader{}).
WithPayload(&msgpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: msgID,
},
}).
WithProperties(extraProperties).
BuildMutable()
if err != nil {
panic(err)
}
return msg
}

View File

@ -1,7 +1,7 @@
package pulsar package pulsar
import ( import (
"encoding/hex" "encoding/base64"
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
@ -11,6 +11,12 @@ import (
var _ message.MessageID = pulsarID{} var _ message.MessageID = pulsarID{}
// NewPulsarID creates a new pulsarID
// TODO: remove in future.
func NewPulsarID(id pulsar.MessageID) message.MessageID {
return pulsarID{id}
}
func UnmarshalMessageID(data string) (message.MessageID, error) { func UnmarshalMessageID(data string) (message.MessageID, error) {
id, err := unmarshalMessageID(data) id, err := unmarshalMessageID(data)
if err != nil { if err != nil {
@ -20,9 +26,9 @@ func UnmarshalMessageID(data string) (message.MessageID, error) {
} }
func unmarshalMessageID(data string) (pulsarID, error) { func unmarshalMessageID(data string) (pulsarID, error) {
val, err := hex.DecodeString(data) val, err := base64.StdEncoding.DecodeString(data)
if err != nil { if err != nil {
return pulsarID{nil}, errors.Wrapf(message.ErrInvalidMessageID, "decode pulsar fail when decode hex with err: %s, id: %s", err.Error(), data) return pulsarID{nil}, errors.Wrapf(message.ErrInvalidMessageID, "decode pulsar fail when decode base64 with err: %s, id: %s", err.Error(), data)
} }
msgID, err := pulsar.DeserializeMessageID(val) msgID, err := pulsar.DeserializeMessageID(val)
if err != nil { if err != nil {
@ -35,6 +41,13 @@ type pulsarID struct {
pulsar.MessageID pulsar.MessageID
} }
// PulsarID returns the pulsar message id.
// Don't delete this function until conversion logic removed.
// TODO: remove in future.
func (id pulsarID) PulsarID() pulsar.MessageID {
return id.MessageID
}
func (id pulsarID) WALName() string { func (id pulsarID) WALName() string {
return walName return walName
} }
@ -69,5 +82,5 @@ func (id pulsarID) EQ(other message.MessageID) bool {
} }
func (id pulsarID) Marshal() string { func (id pulsarID) Marshal() string {
return hex.EncodeToString(id.Serialize()) return base64.StdEncoding.EncodeToString(id.Serialize())
} }

View File

@ -8,6 +8,12 @@ import (
var _ message.MessageID = rmqID(0) var _ message.MessageID = rmqID(0)
// NewRmqID creates a new rmqID.
// TODO: remove in future.
func NewRmqID(id int64) message.MessageID {
return rmqID(id)
}
// UnmarshalMessageID unmarshal the message id. // UnmarshalMessageID unmarshal the message id.
func UnmarshalMessageID(data string) (message.MessageID, error) { func UnmarshalMessageID(data string) (message.MessageID, error) {
id, err := unmarshalMessageID(data) id, err := unmarshalMessageID(data)
@ -29,6 +35,13 @@ func unmarshalMessageID(data string) (rmqID, error) {
// rmqID is the message id for rmq. // rmqID is the message id for rmq.
type rmqID int64 type rmqID int64
// RmqID returns the message id for conversion
// Don't delete this function until conversion logic removed.
// TODO: remove in future.
func (id rmqID) RmqID() int64 {
return int64(id)
}
// WALName returns the name of message id related wal. // WALName returns the name of message id related wal.
func (id rmqID) WALName() string { func (id rmqID) WALName() string {
return walName return walName

View File

@ -14,11 +14,11 @@ import (
"testing" "testing"
"time" "time"
"github.com/golang/protobuf/proto"
"github.com/remeh/sizedwaitgroup" "github.com/remeh/sizedwaitgroup"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options" "github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/streaming/util/types"
@ -230,26 +230,11 @@ func (f *testOneWALImplsFramework) testAppend(ctx context.Context, w WALImpls) (
defer swg.Done() defer swg.Done()
// ...rocksmq has a dirty implement of properties, // ...rocksmq has a dirty implement of properties,
// without commonpb.MsgHeader, it can not work. // without commonpb.MsgHeader, it can not work.
header := commonpb.MsgHeader{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: int64(i),
},
}
payload, err := proto.Marshal(&header)
if err != nil {
panic(err)
}
properties := map[string]string{ properties := map[string]string{
"id": fmt.Sprintf("%d", i), "id": fmt.Sprintf("%d", i),
"const": "t", "const": "t",
} }
typ := message.MessageTypeUnknown msg := message.CreateTestEmptyInsertMesage(int64(i), properties)
msg := message.NewMutableMessageBuilder().
WithMessageType(typ).
WithPayload(payload).
WithProperties(properties).
BuildMutable()
id, err := w.Append(ctx, msg) id, err := w.Append(ctx, msg)
assert.NoError(f.t, err) assert.NoError(f.t, err)
assert.NotNil(f.t, id) assert.NotNil(f.t, id)
@ -257,27 +242,20 @@ func (f *testOneWALImplsFramework) testAppend(ctx context.Context, w WALImpls) (
}(i) }(i)
} }
swg.Wait() swg.Wait()
// send a final hint message
header := commonpb.MsgHeader{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: int64(f.messageCount - 1),
},
}
payload, err := proto.Marshal(&header)
if err != nil {
panic(err)
}
properties := map[string]string{ properties := map[string]string{
"id": fmt.Sprintf("%d", f.messageCount-1), "id": fmt.Sprintf("%d", f.messageCount-1),
"const": "t", "const": "t",
"term": strconv.FormatInt(int64(f.term), 10), "term": strconv.FormatInt(int64(f.term), 10),
} }
msg := message.NewMutableMessageBuilder(). msg, err := message.NewTimeTickMessageBuilderV1().WithMessageHeader(&message.TimeTickMessageHeader{}).WithPayload(&msgpb.TimeTickMsg{
WithPayload(payload). Base: &commonpb.MsgBase{
WithProperties(properties). MsgType: commonpb.MsgType_TimeTick,
WithMessageType(message.MessageTypeTimeTick). MsgID: int64(f.messageCount - 1),
BuildMutable() },
}).WithProperties(properties).BuildMutable()
assert.NoError(f.t, err)
id, err := w.Append(ctx, msg) id, err := w.Append(ctx, msg)
assert.NoError(f.t, err) assert.NoError(f.t, err)
ids[f.messageCount-1] = msg.IntoImmutableMessage(id) ids[f.messageCount-1] = msg.IntoImmutableMessage(id)

View File

@ -0,0 +1,55 @@
package typeutil
// NewMultipartQueue create a new multi-part queue.
func NewMultipartQueue[T any]() *MultipartQueue[T] {
return &MultipartQueue[T]{
pendings: make([][]T, 0),
cnt: 0,
}
}
// MultipartQueue is a multi-part queue.
type MultipartQueue[T any] struct {
pendings [][]T
cnt int
}
// Len return the queue size.
func (pq *MultipartQueue[T]) Len() int {
return pq.cnt
}
// AddOne add a message as pending one
func (pq *MultipartQueue[T]) AddOne(msg T) {
pq.Add([]T{msg})
}
// Add add a slice of message as pending one
func (pq *MultipartQueue[T]) Add(msgs []T) {
if len(msgs) == 0 {
return
}
pq.pendings = append(pq.pendings, msgs)
pq.cnt += len(msgs)
}
// Next return the next message in pending queue.
func (pq *MultipartQueue[T]) Next() T {
if len(pq.pendings) != 0 && len(pq.pendings[0]) != 0 {
return pq.pendings[0][0]
}
var val T
return val
}
// UnsafeAdvance do a advance without check.
// !!! Should only be called `Next` do not return nil.
func (pq *MultipartQueue[T]) UnsafeAdvance() {
if len(pq.pendings[0]) == 1 {
pq.pendings = pq.pendings[1:]
pq.cnt--
return
}
pq.pendings[0] = pq.pendings[0][1:]
pq.cnt--
}

View File

@ -0,0 +1,20 @@
package typeutil
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMultipartQueue(t *testing.T) {
q := NewMultipartQueue[int]()
for i := 0; i < 100; i++ {
q.AddOne(i)
assert.Equal(t, i+1, q.Len())
}
for i := 100; i > 0; i-- {
assert.NotNil(t, q.Next())
q.UnsafeAdvance()
assert.Equal(t, i-1, q.Len())
}
}

View File

@ -89,3 +89,10 @@ ${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb cgo_msg.proto|| { echo 'generate cgo
${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb plan.proto|| { echo 'generate plan.proto failed'; exit 1; } ${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb plan.proto|| { echo 'generate plan.proto failed'; exit 1; }
popd popd
pushd $ROOT_DIR/pkg/streaming/util/message/messagepb
# streaming node message protobuf
${PROTOC_BIN} --proto_path=. --go_out=plugins=grpc,paths=source_relative:. message.proto
popd