enhance: refactor the consumer grpc proto for reusing grpc stream for multi-consumer (#37564)

issue: #33285

- Modify the proto of consumer of streaming service.
- Make VChannel as a required option for streaming

---------

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2024-11-11 17:24:29 +08:00 committed by GitHub
parent 5e6c3df253
commit 1b6edd0b4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 252 additions and 102 deletions

View File

@ -952,6 +952,12 @@ func (s *Server) GetChannelRecoveryInfo(ctx context.Context, req *datapb.GetChan
channel := NewRWChannel(req.GetVchannel(), collectionID, nil, collection.Schema, 0) // TODO: remove RWChannel, just use vchannel + collectionID
channelInfo := s.handler.GetDataVChanPositions(channel, allPartitionID)
if channelInfo.SeekPosition == nil {
log.Warn("channel recovery start position is not found, may collection is on creating")
resp.Status = merr.Status(merr.WrapErrChannelNotAvailable(req.GetVchannel(), "start position is nil"))
return resp, nil
}
log.Info("datacoord get channel recovery info",
zap.String("channel", channelInfo.GetChannelName()),
zap.Int("# of unflushed segments", len(channelInfo.GetUnflushedSegmentIds())),

View File

@ -1533,7 +1533,7 @@ func TestGetChannelRecoveryInfo(t *testing.T) {
channelInfo := &datapb.VchannelInfo{
CollectionID: 0,
ChannelName: "ch-1",
SeekPosition: nil,
SeekPosition: &msgpb.MsgPosition{Timestamp: 10},
UnflushedSegmentIds: []int64{1},
FlushedSegmentIds: []int64{2},
DroppedSegmentIds: []int64{3},

View File

@ -34,13 +34,18 @@ func (w *walAccesserImpl) getProducer(pchannel string) *producer.ResumableProduc
return p
}
// assertNoSystemMessage asserts the message is not system message.
func assertNoSystemMessage(msgs ...message.MutableMessage) {
// assertValidMessage asserts the message is not system message.
func assertValidMessage(msgs ...message.MutableMessage) {
for _, msg := range msgs {
if msg.MessageType().IsSystem() {
panic("system message is not allowed to append from client")
}
}
for _, msg := range msgs {
if msg.VChannel() == "" {
panic("vchannel is empty")
}
}
}
// We only support delete and insert message for txn now.

View File

@ -12,6 +12,9 @@ type ConsumerOptions struct {
// PChannel is the pchannel of the consumer.
PChannel string
// VChannel is the vchannel of the consumer.
VChannel string
// DeliverPolicy is the deliver policy of the consumer.
DeliverPolicy options.DeliverPolicy

View File

@ -100,6 +100,7 @@ func (rc *resumableConsumerImpl) resumeLoop() {
}
opts := &handler.ConsumerOptions{
PChannel: rc.opts.PChannel,
VChannel: rc.opts.VChannel,
DeliverPolicy: deliverPolicy,
DeliverFilters: deliverFilters,
MessageHandler: nopCloseMH,

View File

@ -51,6 +51,7 @@ type TxnOption struct {
type ReadOption struct {
// VChannel is the target vchannel to read.
// It must be set to read message from a vchannel.
VChannel string
// DeliverPolicy is the deliver policy of the consumer.

View File

@ -23,7 +23,7 @@ type txnImpl struct {
// Append writes records to the log.
func (t *txnImpl) Append(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) error {
assertNoSystemMessage(msg)
assertValidMessage(msg)
assertIsDmlMessage(msg)
t.mu.Lock()

View File

@ -14,7 +14,7 @@ import (
// Otherwise, it will be sent as individual messages.
// !!! This function do not promise the atomicity and deliver order of the messages appending.
func (u *walAccesserImpl) AppendMessages(ctx context.Context, msgs ...message.MutableMessage) AppendResponses {
assertNoSystemMessage(msgs...)
assertValidMessage(msgs...)
// dispatch the messages into different vchannel.
dispatchedMessages, indexes := u.dispatchMessages(msgs...)

View File

@ -13,7 +13,6 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@ -55,7 +54,7 @@ type walAccesserImpl struct {
// RawAppend writes a record to the log.
func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) {
assertNoSystemMessage(msg)
assertValidMessage(msg)
if err := w.lifetime.Add(lifetime.IsWorking); err != nil {
return nil, status.NewOnShutdownError("wal accesser closed, %s", err.Error())
}
@ -71,14 +70,17 @@ func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner {
newErrScanner(status.NewOnShutdownError("wal accesser closed, %s", err.Error()))
}
defer w.lifetime.Done()
if opts.VChannel == "" {
return newErrScanner(status.NewInvaildArgument("vchannel is required"))
}
// TODO: optimize the consumer into pchannel level.
pchannel := funcutil.ToPhysicalChannel(opts.VChannel)
filters := append(opts.DeliverFilters, options.DeliverFilterVChannel(opts.VChannel))
rc := consumer.NewResumableConsumer(w.handlerClient.CreateConsumer, &consumer.ConsumerOptions{
PChannel: pchannel,
VChannel: opts.VChannel,
DeliverPolicy: opts.DeliverPolicy,
DeliverFilters: filters,
DeliverFilters: opts.DeliverFilters,
MessageHandler: opts.MessageHandler,
})
return rc

View File

@ -24,6 +24,9 @@ type ConsumerOptions struct {
// The cosume target
Assignment *types.PChannelInfoAssigned
// VChannel is the vchannel of the consumer.
VChannel string
// DeliverPolicy is the deliver policy of the consumer.
DeliverPolicy options.DeliverPolicy
@ -66,7 +69,7 @@ func CreateConsumer(
cli := &consumerImpl{
ctx: ctx,
walName: createResp.GetWalName(),
assignment: *opts.Assignment,
opts: opts,
grpcStreamClient: streamClient,
handlerClient: handlerClient,
logger: log.With(
@ -87,16 +90,14 @@ func createConsumeRequest(ctx context.Context, opts *ConsumerOptions) (context.C
ctx = contextutil.WithPickServerID(ctx, opts.Assignment.Node.ServerID)
// create the consumer request.
return contextutil.WithCreateConsumer(ctx, &streamingpb.CreateConsumerRequest{
Pchannel: types.NewProtoFromPChannelInfo(opts.Assignment.Channel),
DeliverPolicy: opts.DeliverPolicy,
DeliverFilters: opts.DeliverFilters,
Pchannel: types.NewProtoFromPChannelInfo(opts.Assignment.Channel),
}), nil
}
type consumerImpl struct {
ctx context.Context // TODO: the cancel method of consumer should be managed by consumerImpl, fix it in future.
walName string
assignment types.PChannelInfoAssigned
opts *ConsumerOptions
grpcStreamClient streamingpb.StreamingNodeHandlerService_ConsumeClient
handlerClient streamingpb.StreamingNodeHandlerServiceClient
logger *log.MLogger
@ -158,7 +159,9 @@ func (c *consumerImpl) recvLoop() (err error) {
c.finishErr.Set(err)
c.msgHandler.Close()
}()
if err := c.createVChannelConsumer(); err != nil {
return err
}
for {
resp, err := c.grpcStreamClient.Recv()
if errors.Is(err, io.EOF) {
@ -200,6 +203,30 @@ func (c *consumerImpl) recvLoop() (err error) {
}
}
func (c *consumerImpl) createVChannelConsumer() error {
// Create the vchannel client.
if err := c.grpcStreamClient.Send(&streamingpb.ConsumeRequest{
Request: &streamingpb.ConsumeRequest_CreateVchannelConsumer{
CreateVchannelConsumer: &streamingpb.CreateVChannelConsumerRequest{
Vchannel: c.opts.VChannel,
DeliverPolicy: c.opts.DeliverPolicy,
DeliverFilters: c.opts.DeliverFilters,
},
},
}); err != nil {
return err
}
resp, err := c.grpcStreamClient.Recv()
if err != nil {
return err
}
createVChannelResp := resp.GetCreateVchannel()
if createVChannelResp == nil {
return status.NewInvalidRequestSeq("expect create vchannel response")
}
return nil
}
func (c *consumerImpl) handleTxnMessage(msg message.ImmutableMessage) error {
switch msg.MessageType() {
case message.MessageTypeBeginTxn:

View File

@ -139,13 +139,13 @@ func newMockedConsumerImpl(t *testing.T, ctx context.Context, h message.Handler)
})
opts := &ConsumerOptions{
VChannel: "test-1",
Assignment: &types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "test", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost"},
},
DeliverPolicy: options.DeliverPolicyAll(),
DeliverFilters: []options.DeliverFilter{
options.DeliverFilterVChannel("test-1"),
options.DeliverFilterTimeTickGT(100),
},
MessageHandler: h,
@ -158,6 +158,15 @@ func newMockedConsumerImpl(t *testing.T, ctx context.Context, h message.Handler)
},
},
}
recvCh <- &streamingpb.ConsumeResponse{
Response: &streamingpb.ConsumeResponse_CreateVchannel{
CreateVchannel: &streamingpb.CreateVChannelConsumerResponse{
Response: &streamingpb.CreateVChannelConsumerResponse_ConsumerId{
ConsumerId: 1,
},
},
},
}
consumer, err := CreateConsumer(ctx, opts, c)
if err != nil {
panic(err)

View File

@ -49,6 +49,9 @@ type ConsumerOptions struct {
// PChannel is the pchannel of the consumer.
PChannel string
// VChannel is the vchannel of the consumer.
VChannel string
// DeliverPolicy is the deliver policy of the consumer.
DeliverPolicy options.DeliverPolicy

View File

@ -71,6 +71,7 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO
}
return hc.newConsumer(ctx, &consumer.ConsumerOptions{
Assignment: assign,
VChannel: opts.VChannel,
DeliverPolicy: opts.DeliverPolicy,
DeliverFilters: opts.DeliverFilters,
MessageHandler: opts.MessageHandler,

View File

@ -98,11 +98,11 @@ func TestHandlerClient(t *testing.T) {
consumer, err := handler.CreateConsumer(ctx, &ConsumerOptions{
PChannel: "pchannel",
VChannel: "vchannel",
DeliverPolicy: options.DeliverPolicyAll(),
DeliverFilters: []options.DeliverFilter{
options.DeliverFilterTimeTickGT(10),
options.DeliverFilterTimeTickGTE(10),
options.DeliverFilterVChannel("vchannel"),
},
MessageHandler: make(message.ChanMessageHandler),
})

View File

@ -105,10 +105,8 @@ func (c *channelLifetime) Run() error {
policy := options.DeliverPolicyStartFrom(messageID)
handler := adaptor2.NewMsgPackAdaptorHandler()
ro := wal.ReadOption{
DeliverPolicy: policy,
MessageFilter: []options.DeliverFilter{
options.DeliverFilterVChannel(c.vchannel),
},
VChannel: c.vchannel,
DeliverPolicy: policy,
MesasgeHandler: handler,
}
scanner, err := c.wal.Read(ctx, ro)

View File

@ -25,6 +25,15 @@ func (p *consumeGrpcServerHelper) SendCreated(resp *streamingpb.CreateConsumerRe
})
}
// SendCreated sends the create response to client.
func (p *consumeGrpcServerHelper) SendCreateVChannelConsumer(resp *streamingpb.CreateVChannelConsumerResponse) error {
return p.Send(&streamingpb.ConsumeResponse{
Response: &streamingpb.ConsumeResponse_CreateVchannel{
CreateVchannel: resp,
},
})
}
// SendClosed sends the close response to client.
// no more message should be sent after sending close response.
func (p *consumeGrpcServerHelper) SendClosed() error {

View File

@ -20,40 +20,73 @@ import (
// CreateConsumeServer create a new consumer.
// Expected message sequence:
// CreateConsumeServer:
// -> ConsumeResponse 1
// -> ConsumeResponse 2
// -> ConsumeResponse 3
// <- CreateVChannelConsumer 1
// -> CreateVChannelConsuemr 1
// -> ConsumeMessage 1.1
// <- CreateVChannelConsumer 2
// -> ConsumeMessage 1.2
// -> CreateVChannelConsumer 2
// -> ConsumeMessage 2.1
// -> ConsumeMessage 2.2
// -> ConsumeMessage 1.3
// <- CloseVChannelConsumer 1
// -> CloseVChannelConsumer 1
// -> ConsumeMessage 2.3
// <- CloseVChannelConsumer 2
// -> CloseVChannelConsumer 2
// CloseConsumer:
func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb.StreamingNodeHandlerService_ConsumeServer) (*ConsumeServer, error) {
createReq, err := contextutil.GetCreateConsumer(streamServer.Context())
if err != nil {
return nil, status.NewInvaildArgument("create consumer request is required")
}
l, err := walManager.GetAvailableWAL(types.NewPChannelInfoFromProto(createReq.GetPchannel()))
if err != nil {
return nil, err
}
scanner, err := l.Read(streamServer.Context(), wal.ReadOption{
DeliverPolicy: createReq.GetDeliverPolicy(),
MessageFilter: createReq.DeliverFilters,
})
if err != nil {
return nil, err
}
consumeServer := &consumeGrpcServerHelper{
StreamingNodeHandlerService_ConsumeServer: streamServer,
}
if err := consumeServer.SendCreated(&streamingpb.CreateConsumerResponse{
WalName: l.WALName(),
}); err != nil {
return nil, errors.Wrap(err, "at send created")
}
req, err := streamServer.Recv()
if err != nil {
return nil, errors.New("receive create consumer request failed")
}
createVChannelReq := req.GetCreateVchannelConsumer()
if createVChannelReq == nil {
return nil, errors.New("The first message must be create vchannel consumer request")
}
scanner, err := l.Read(streamServer.Context(), wal.ReadOption{
VChannel: createVChannelReq.GetVchannel(),
DeliverPolicy: createVChannelReq.GetDeliverPolicy(),
MessageFilter: createVChannelReq.GetDeliverFilters(),
})
if err != nil {
return nil, err
}
// TODO: consumerID should be generated after we enabling multi-vchannel consuming on same grpc stream.
consumerID := int64(1)
if err := consumeServer.SendCreateVChannelConsumer(&streamingpb.CreateVChannelConsumerResponse{
Response: &streamingpb.CreateVChannelConsumerResponse_ConsumerId{
ConsumerId: consumerID,
},
}); err != nil {
// release the scanner to avoid resource leak.
if err := scanner.Close(); err != nil {
log.Warn("close scanner failed at create consume server", zap.Error(err))
}
return nil, errors.Wrap(err, "at send created")
return nil, err
}
metrics := newConsumerMetrics(l.Channel().Name)
return &ConsumeServer{
consumerID: 1,
scanner: scanner,
consumeServer: consumeServer,
logger: log.With(zap.String("channel", l.Channel().Name), zap.Int64("term", l.Channel().Term)), // Add trace info for all log.
@ -64,6 +97,7 @@ func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb
// ConsumeServer is a ConsumeServer of log messages.
type ConsumeServer struct {
consumerID int64
scanner wal.Scanner
consumeServer *consumeGrpcServerHelper
logger *log.MLogger
@ -151,6 +185,7 @@ func (c *ConsumeServer) sendImmutableMessage(msg message.ImmutableMessage) (err
// Send Consumed message to client and do metrics.
if err := c.consumeServer.SendConsumeMessage(&streamingpb.ConsumeMessageReponse{
ConsumerId: c.consumerID,
Message: &messagespb.ImmutableMessage{
Id: &messagespb.MessageID{
Id: msg.MessageID().Marshal(),

View File

@ -45,9 +45,6 @@ func TestCreateConsumeServer(t *testing.T) {
Name: "test",
Term: 1,
},
DeliverPolicy: &streamingpb.DeliverPolicy{
Policy: &streamingpb.DeliverPolicy_All{},
},
}))
ctx := metadata.NewIncomingContext(context.Background(), meta)
grpcConsumeServer.ExpectedCalls = nil
@ -55,8 +52,30 @@ func TestCreateConsumeServer(t *testing.T) {
manager.EXPECT().GetAvailableWAL(types.PChannelInfo{Name: "test", Term: int64(1)}).Return(nil, errors.New("wal not exist"))
assertCreateConsumeServerFail(t, manager, grpcConsumeServer)
// Return error if create scanner failed.
// Return error if send created failed.
l := mock_wal.NewMockWAL(t)
manager.ExpectedCalls = nil
l.EXPECT().WALName().Return("test")
manager.EXPECT().GetAvailableWAL(types.PChannelInfo{Name: "test", Term: int64(1)}).Return(l, nil)
grpcConsumeServer.EXPECT().Send(mock.Anything).Return(errors.New("send created failed"))
assertCreateConsumeServerFail(t, manager, grpcConsumeServer)
// Return error if recv failed.
grpcConsumeServer.EXPECT().Send(mock.Anything).Unset()
grpcConsumeServer.EXPECT().Send(mock.Anything).Return(nil)
grpcConsumeServer.EXPECT().Recv().Return(nil, io.ErrUnexpectedEOF)
assertCreateConsumeServerFail(t, manager, grpcConsumeServer)
// Return error if create scanner failed.
grpcConsumeServer.EXPECT().Recv().Unset()
grpcConsumeServer.EXPECT().Recv().Return(&streamingpb.ConsumeRequest{
Request: &streamingpb.ConsumeRequest_CreateVchannelConsumer{
CreateVchannelConsumer: &streamingpb.CreateVChannelConsumerRequest{
Vchannel: "test",
},
},
}, nil)
l = mock_wal.NewMockWAL(t)
l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, errors.New("create scanner failed"))
l.EXPECT().WALName().Return("test")
manager.ExpectedCalls = nil
@ -64,7 +83,15 @@ func TestCreateConsumeServer(t *testing.T) {
assertCreateConsumeServerFail(t, manager, grpcConsumeServer)
// Return error if send created failed.
grpcConsumeServer.EXPECT().Send(mock.Anything).Return(errors.New("send created failed"))
grpcConsumeServer.EXPECT().Send(mock.Anything).Unset()
i := 0
grpcConsumeServer.EXPECT().Send(mock.Anything).RunAndReturn(func(cr *streamingpb.ConsumeResponse) error {
if i >= 1 {
return io.ErrUnexpectedEOF
}
i++
return nil
})
l.EXPECT().Read(mock.Anything, mock.Anything).Unset()
s := mock_wal.NewMockScanner(t)
s.EXPECT().Close().Return(nil)

View File

@ -28,6 +28,9 @@ func newScannerAdaptor(
scanMetrics *metricsutil.ScannerMetrics,
cleanup func(),
) wal.Scanner {
if readOption.VChannel == "" {
panic("vchannel of scanner must be set")
}
if readOption.MesasgeHandler == nil {
readOption.MesasgeHandler = defaultMessageHandler(make(chan message.ImmutableMessage))
}
@ -170,6 +173,12 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
return
}
// Filtering the vchannel
// If the message is not belong to any vchannel, it should be broadcasted to all vchannels.
// Otherwise, it should be filtered by vchannel.
if msg.VChannel() != "" && s.readOption.VChannel != msg.VChannel() {
return
}
// Filtering the message if needed.
// System message should never be filtered.
if s.filterFunc != nil && !s.filterFunc(msg) {

View File

@ -20,16 +20,25 @@ func TestScannerAdaptorReadError(t *testing.T) {
l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err)
l.EXPECT().Channel().Return(types.PChannelInfo{})
s := newScannerAdaptor("scanner",
l,
assert.Panics(t, func() {
s := newScannerAdaptor("scanner", l,
wal.ReadOption{
DeliverPolicy: options.DeliverPolicyAll(),
MessageFilter: nil,
},
metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(),
func() {})
defer s.Close()
})
s := newScannerAdaptor("scanner", l,
wal.ReadOption{
VChannel: "test",
DeliverPolicy: options.DeliverPolicyAll(),
MessageFilter: nil,
},
metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(),
func() {})
defer s.Close()
<-s.Chan()
<-s.Done()
assert.ErrorIs(t, s.Error(), err)

View File

@ -36,7 +36,9 @@ func TestWalAdaptorReadFail(t *testing.T) {
})
lAdapted := adaptImplsToWAL(l, nil, func() {})
scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{})
scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{
VChannel: "test",
})
assert.NoError(t, err)
assert.NotNil(t, scanner)
assert.ErrorIs(t, scanner.Error(), expectedErr)
@ -91,7 +93,7 @@ func TestWALAdaptor(t *testing.T) {
go func(i int) {
defer wg.Done()
scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{})
scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{VChannel: "test"})
if err != nil {
assertShutdownError(t, err)
return
@ -121,7 +123,7 @@ func TestWALAdaptor(t *testing.T) {
lAdapted.AppendAsync(context.Background(), msg, func(mi *wal.AppendResult, err error) {
assertShutdownError(t, err)
})
_, err = lAdapted.Read(context.Background(), wal.ReadOption{})
_, err = lAdapted.Read(context.Background(), wal.ReadOption{VChannel: "test"})
assertShutdownError(t, err)
}

View File

@ -30,6 +30,8 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)
const testVChannel = "v1"
type walTestFramework struct {
b wal.OpenerBuilder
t *testing.T
@ -184,7 +186,7 @@ func (f *testOneWALFramework) testSendCreateCollection(ctx context.Context, w wa
PartitionIds: []int64{2},
}).
WithBody(&msgpb.CreateCollectionRequest{}).
WithVChannel("v1").
WithVChannel(testVChannel).
BuildMutable()
assert.NoError(f.t, err)
@ -203,7 +205,7 @@ func (f *testOneWALFramework) testSendDropCollection(ctx context.Context, w wal.
CollectionId: 1,
}).
WithBody(&msgpb.DropCollectionRequest{}).
WithVChannel("v1").
WithVChannel(testVChannel).
BuildMutable()
assert.NoError(f.t, err)
@ -223,7 +225,7 @@ func (f *testOneWALFramework) testAppend(ctx context.Context, w wal.WAL) ([]mess
createPartOfTxn := func() (*message.ImmutableTxnMessageBuilder, *message.TxnContext) {
msg, err := message.NewBeginTxnMessageBuilderV2().
WithVChannel("v1").
WithVChannel(testVChannel).
WithHeader(&message.BeginTxnMessageHeader{
KeepaliveMilliseconds: 1000,
}).
@ -322,6 +324,7 @@ func (f *testOneWALFramework) testAppend(ctx context.Context, w wal.WAL) ([]mess
func (f *testOneWALFramework) testRead(ctx context.Context, w wal.WAL) ([]message.ImmutableMessage, error) {
s, err := w.Read(ctx, wal.ReadOption{
VChannel: testVChannel,
DeliverPolicy: options.DeliverPolicyAll(),
MessageFilter: []options.DeliverFilter{
options.DeliverFilterMessageType(message.MessageTypeInsert),
@ -367,6 +370,7 @@ func (f *testOneWALFramework) testReadWithOption(ctx context.Context, w wal.WAL)
// Test start from some message and timetick is gte than it.
readFromMsg := f.written[idx]
s, err := w.Read(ctx, wal.ReadOption{
VChannel: testVChannel,
DeliverPolicy: options.DeliverPolicyStartFrom(readFromMsg.LastConfirmedMessageID()),
MessageFilter: []options.DeliverFilter{
options.DeliverFilterTimeTickGTE(readFromMsg.TimeTick()),

View File

@ -16,6 +16,7 @@ var ErrUpstreamClosed = errors.New("upstream closed")
// ReadOption is the option for reading records from the wal.
type ReadOption struct {
VChannel string // vchannel name
DeliverPolicy options.DeliverPolicy
MessageFilter []options.DeliverFilter
MesasgeHandler MessageHandler // message handler for message processing.

View File

@ -17,9 +17,6 @@ func TestWithCreateConsumer(t *testing.T) {
Name: "test",
Term: 1,
},
DeliverPolicy: &streamingpb.DeliverPolicy{
Policy: &streamingpb.DeliverPolicy_All{},
},
}
ctx := WithCreateConsumer(context.Background(), req)
@ -32,7 +29,6 @@ func TestWithCreateConsumer(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, req.Pchannel.Name, req2.Pchannel.Name)
assert.Equal(t, req.Pchannel.Term, req2.Pchannel.Term)
assert.Equal(t, req.DeliverPolicy.String(), req2.DeliverPolicy.String())
// panic case.
assert.NotPanics(t, func() { WithCreateConsumer(context.Background(), nil) })

View File

@ -156,8 +156,7 @@ message DeliverFilter {
oneof filter {
DeliverFilterTimeTickGT time_tick_gt = 1;
DeliverFilterTimeTickGTE time_tick_gte = 2;
DeliverFilterVChannel vchannel = 3;
DeliverFilterMessageType message_type = 4;
DeliverFilterMessageType message_type = 3;
}
}
@ -175,11 +174,6 @@ message DeliverFilterTimeTickGTE {
// equal to this value.
}
// DeliverFilterVChannel is the filter to deliver message with vchannel name.
message DeliverFilterVChannel {
string vchannel = 1; // deliver message with vchannel name.
}
message DeliverFilterMessageType {
// deliver message with message type.
repeated messages.MessageType message_types = 1;
@ -273,8 +267,8 @@ message ProduceResponse {
// CreateProducerResponse is the result of the CreateProducer RPC.
message CreateProducerResponse {
string wal_name = 1; // wal name at server side.
int64 producer_id = 2; // A unique producer id on streamingnode for this
// producer in streamingnode lifetime.
int64 producer_server_id = 2; // A unique producer server id on streamingnode
// for this producer in streamingnode lifetime.
// Is used to identify the producer in streamingnode for other unary grpc
// call at producer level.
}
@ -304,7 +298,11 @@ message CloseProducerResponse {}
// Add more control block in future.
message ConsumeRequest {
oneof request {
CloseConsumerRequest close = 1;
CreateVChannelConsumerRequest create_vchannel_consumer = 1;
CreateVChannelConsumersRequest create_vchannel_consumers =
2; // Create multiple vchannel consumers, used for recovery in future.
CloseVChannelConsumerRequest close_vchannel = 3;
CloseConsumerRequest close = 4;
}
}
@ -316,25 +314,67 @@ message CloseConsumerRequest {}
// CreateConsumerRequest is passed in the header of stream.
message CreateConsumerRequest {
PChannelInfo pchannel = 1;
}
message CreateVChannelConsumersRequest {
repeated CreateVChannelConsumerRequest create_vchannels = 1;
}
// CreateVChannelConsumerRequest is the request of the CreateVChannelConsumer
// RPC. It's used to create a new vchannel consumer at server side.
message CreateVChannelConsumerRequest {
string vchannel = 1;
DeliverPolicy deliver_policy = 2; // deliver policy.
repeated DeliverFilter deliver_filters = 3; // deliver filter.
}
// ConsumeMessageRequest is the request of the Consume RPC.
message CreateVChannelConsumersResponse {
repeated CreateVChannelConsumerResponse create_vchannels = 1;
}
// CreateVChannelConsumerResponse is the response of the CreateVChannelConsumer
// RPC.
message CreateVChannelConsumerResponse {
oneof response {
int64 consumer_id = 1;
StreamingError error = 2;
}
}
// CloseVChannelConsumerRequest is the request of the CloseVChannelConsumer RPC.
message CloseVChannelConsumerRequest {
int64 consumer_id = 1;
}
// CloseVChannelConsumerResponse is the response of the CloseVChannelConsumer
// RPC.
message CloseVChannelConsumerResponse {
int64 consumer_id = 1;
}
// ConsumeResponse is the reponse of the Consume RPC.
message ConsumeResponse {
oneof response {
CreateConsumerResponse create = 1;
ConsumeMessageReponse consume = 2;
CloseConsumerResponse close = 3;
CreateVChannelConsumerResponse create_vchannel = 3;
CreateVChannelConsumersResponse create_vchannels = 4;
CloseVChannelConsumerResponse close_vchannel = 5;
CloseConsumerResponse close = 6;
}
}
message CreateConsumerResponse {
string wal_name = 1; // wal name at server side.
// A unique consumer id on streamingnode for this
// consumer in streamingnode lifetime.
int64 consumer_server_id = 2;
}
message ConsumeMessageReponse {
messages.ImmutableMessage message = 1;
int64 consumer_id = 1;
messages.ImmutableMessage message = 2;
}
message CloseConsumerResponse {}

View File

@ -16,8 +16,7 @@ const (
DeliverFilterTypeTimeTickGT deliverFilterType = 1
DeliverFilterTypeTimeTickGTE deliverFilterType = 2
DeliverFilterTypeVChannel deliverFilterType = 3
DeliverFilterTypeMessageType deliverFilterType = 4
DeliverFilterTypeMessageType deliverFilterType = 3
)
type (
@ -91,17 +90,6 @@ func DeliverFilterTimeTickGTE(timeTick uint64) DeliverFilter {
}
}
// DeliverFilterVChannel delivers messages filtered by vchannel.
func DeliverFilterVChannel(vchannel string) DeliverFilter {
return &streamingpb.DeliverFilter{
Filter: &streamingpb.DeliverFilter_Vchannel{
Vchannel: &streamingpb.DeliverFilterVChannel{
Vchannel: vchannel,
},
},
}
}
// DeliverFilterMessageType delivers messages filtered by message type.
func DeliverFilterMessageType(messageType ...message.MessageType) DeliverFilter {
messageTypes := make([]messagespb.MessageType, 0, len(messageType))
@ -154,11 +142,6 @@ func GetFilterFunc(filters []DeliverFilter) func(message.ImmutableMessage) bool
}
return true
})
case *streamingpb.DeliverFilter_Vchannel:
filterFuncs = append(filterFuncs, func(im message.ImmutableMessage) bool {
// vchannel == "" is a broadcast operation.
return im.VChannel() == "" || im.VChannel() == filter.GetVchannel().Vchannel
})
case *streamingpb.DeliverFilter_MessageType:
filterFuncs = append(filterFuncs, func(im message.ImmutableMessage) bool {
// system message cannot be filterred.

View File

@ -33,9 +33,6 @@ func TestDeliverFilter(t *testing.T) {
filter = DeliverFilterTimeTickGTE(2)
_ = filter.GetFilter().(*streamingpb.DeliverFilter_TimeTickGte)
filter = DeliverFilterVChannel("vchannel")
_ = filter.GetFilter().(*streamingpb.DeliverFilter_Vchannel)
filter = DeliverFilterMessageType(message.MessageTypeDelete)
_ = filter.GetFilter().(*streamingpb.DeliverFilter_MessageType)
}
@ -43,45 +40,33 @@ func TestDeliverFilter(t *testing.T) {
func TestNewMessageFilter(t *testing.T) {
filters := []DeliverFilter{
DeliverFilterTimeTickGT(1),
DeliverFilterVChannel("test"),
}
filterFunc := GetFilterFunc(filters)
msg := mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(2).Maybe()
msg.EXPECT().VChannel().Return("test2").Maybe()
msg.EXPECT().TimeTick().Return(1).Maybe()
msg.EXPECT().TxnContext().Return(nil).Maybe()
assert.False(t, filterFunc(msg))
msg = mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(1).Maybe()
msg.EXPECT().VChannel().Return("test").Maybe()
msg.EXPECT().TxnContext().Return(nil).Maybe()
assert.False(t, filterFunc(msg))
msg = mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(1).Maybe()
msg.EXPECT().VChannel().Return("").Maybe() // vchannel == "" should not be filtered.
msg.EXPECT().TxnContext().Return(nil).Maybe()
assert.False(t, filterFunc(msg))
msg = mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(2).Maybe()
msg.EXPECT().VChannel().Return("test").Maybe()
msg.EXPECT().TxnContext().Return(nil).Maybe()
assert.True(t, filterFunc(msg))
// if message is a txn message, it should be only filterred by time tick when the message type is commit.
msg = mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(1).Maybe()
msg.EXPECT().VChannel().Return("test").Maybe()
msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeCommitTxn).Maybe()
assert.False(t, filterFunc(msg))
msg = mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(1).Maybe()
msg.EXPECT().VChannel().Return("test").Maybe()
msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe()
assert.True(t, filterFunc(msg))
@ -89,41 +74,35 @@ func TestNewMessageFilter(t *testing.T) {
// if message is a txn message, it should be only filterred by time tick when the message type is commit.
msg = mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(1).Maybe()
msg.EXPECT().VChannel().Return("test").Maybe()
msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeCommitTxn).Maybe()
assert.False(t, filterFunc(msg))
msg = mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(1).Maybe()
msg.EXPECT().VChannel().Return("test").Maybe()
msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe()
assert.True(t, filterFunc(msg))
filters = []*streamingpb.DeliverFilter{
DeliverFilterTimeTickGTE(1),
DeliverFilterVChannel("test"),
}
filterFunc = GetFilterFunc(filters)
msg = mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(1).Maybe()
msg.EXPECT().VChannel().Return("test").Maybe()
msg.EXPECT().TxnContext().Return(nil).Maybe()
assert.True(t, filterFunc(msg))
// if message is a txn message, it should be only filterred by time tick when the message type is commit.
msg = mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(1).Maybe()
msg.EXPECT().VChannel().Return("test").Maybe()
msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeCommitTxn).Maybe()
assert.True(t, filterFunc(msg))
msg = mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(1).Maybe()
msg.EXPECT().VChannel().Return("test").Maybe()
msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe()
assert.True(t, filterFunc(msg))