enhance: erase the rpc level when wal is located at same node (#38858)

issue: #38399

- Make the wal scanner interface same with streaming scanner.
- Use wal if the wal is located at current node.
- Otherwise fallback the old logic.

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-02-05 22:25:10 +08:00 committed by GitHub
parent c1794cc490
commit 5669016af0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 429 additions and 609 deletions

View File

@ -74,11 +74,12 @@ func (rc *resumableConsumerImpl) resumeLoop() {
// consumer need to resume when error occur, so message handler shouldn't close if the internal consumer encounter failure.
nopCloseMH := nopCloseHandler{
Handler: rc.mh,
HandleInterceptor: func(ctx context.Context, msg message.ImmutableMessage, handle handleFunc) (bool, error) {
g := rc.metrics.StartConsume(msg.EstimateSize())
ok, err := handle(ctx, msg)
g.Finish()
return ok, err
HandleInterceptor: func(handleParam message.HandleParam, h message.Handler) message.HandleResult {
if handleParam.Message != nil {
g := rc.metrics.StartConsume(handleParam.Message.EstimateSize())
defer func() { g.Finish() }()
}
return h.Handle(handleParam)
},
}

View File

@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/consumer"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)
@ -22,22 +23,25 @@ func TestResumableConsumer(t *testing.T) {
ch := make(chan struct{})
c.EXPECT().Done().Return(ch)
c.EXPECT().Error().Return(errors.New("test"))
c.EXPECT().Close().Return()
c.EXPECT().Close().Return(nil)
rc := NewResumableConsumer(func(ctx context.Context, opts *handler.ConsumerOptions) (consumer.Consumer, error) {
if i == 0 {
i++
ok, err := opts.MessageHandler.Handle(context.Background(), message.NewImmutableMesasge(
walimplstest.NewTestMessageID(123),
[]byte("payload"),
map[string]string{
"key": "value",
"_t": "1",
"_tt": message.EncodeUint64(456),
"_v": "1",
"_lc": walimplstest.NewTestMessageID(123).Marshal(),
}))
assert.True(t, ok)
assert.NoError(t, err)
result := opts.MessageHandler.Handle(message.HandleParam{
Ctx: context.Background(),
Message: message.NewImmutableMesasge(
walimplstest.NewTestMessageID(123),
[]byte("payload"),
map[string]string{
"key": "value",
"_t": "1",
"_tt": message.EncodeUint64(456),
"_v": "1",
"_lc": walimplstest.NewTestMessageID(123).Marshal(),
}),
})
assert.True(t, result.MessageHandled)
assert.NoError(t, result.Error)
return c, nil
} else if i == 1 {
i++
@ -46,7 +50,7 @@ func TestResumableConsumer(t *testing.T) {
newC := mock_consumer.NewMockConsumer(t)
newC.EXPECT().Done().Return(make(<-chan struct{}))
newC.EXPECT().Error().Return(errors.New("test"))
newC.EXPECT().Close().Return()
newC.EXPECT().Close().Return(nil)
return newC, nil
}, &ConsumerOptions{
PChannel: "test",
@ -54,7 +58,7 @@ func TestResumableConsumer(t *testing.T) {
DeliverFilters: []options.DeliverFilter{
options.DeliverFilterTimeTickGT(1),
},
MessageHandler: message.ChanMessageHandler(make(chan message.ImmutableMessage, 2)),
MessageHandler: adaptor.ChanMessageHandler(make(chan message.ImmutableMessage, 2)),
})
select {
@ -76,10 +80,13 @@ func TestResumableConsumer(t *testing.T) {
func TestHandler(t *testing.T) {
ch := make(chan message.ImmutableMessage, 100)
hNop := nopCloseHandler{
Handler: message.ChanMessageHandler(ch),
Handler: adaptor.ChanMessageHandler(ch),
}
hNop.Handle(context.Background(), nil)
assert.Nil(t, <-ch)
hNop.Handle(message.HandleParam{
Ctx: context.Background(),
Message: message.NewImmutableMesasge(walimplstest.NewTestMessageID(123), []byte("payload"), nil),
})
assert.NotNil(t, <-ch)
hNop.Close()
select {
case <-ch:

View File

@ -1,25 +1,21 @@
package consumer
import (
"context"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
type handleFunc func(ctx context.Context, msg message.ImmutableMessage) (bool, error)
// nopCloseHandler is a handler that do nothing when close.
type nopCloseHandler struct {
message.Handler
HandleInterceptor func(ctx context.Context, msg message.ImmutableMessage, handle handleFunc) (bool, error)
HandleInterceptor func(handleParam message.HandleParam, h message.Handler) message.HandleResult
}
// Handle is the callback for handling message.
func (nch nopCloseHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error) {
func (nch nopCloseHandler) Handle(handleParam message.HandleParam) message.HandleResult {
if nch.HandleInterceptor != nil {
return nch.HandleInterceptor(ctx, msg, nch.Handler.Handle)
return nch.HandleInterceptor(handleParam, nch.Handler)
}
return nch.Handler.Handle(ctx, msg)
return nch.Handler.Handle(handleParam)
}
// Close is called after all messages are handled or handling is interrupted.

View File

@ -1,8 +1,6 @@
package consumer
import (
"context"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
@ -13,16 +11,20 @@ type timeTickOrderMessageHandler struct {
lastTimeTick uint64
}
func (mh *timeTickOrderMessageHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error) {
lastConfirmedMessageID := msg.LastConfirmedMessageID()
timetick := msg.TimeTick()
ok, err := mh.inner.Handle(ctx, msg)
if ok {
mh.lastConfirmedMessageID = lastConfirmedMessageID
mh.lastTimeTick = timetick
func (mh *timeTickOrderMessageHandler) Handle(handleParam message.HandleParam) message.HandleResult {
var lastConfirmedMessageID message.MessageID
var lastTimeTick uint64
if handleParam.Message != nil {
lastConfirmedMessageID = handleParam.Message.LastConfirmedMessageID()
lastTimeTick = handleParam.Message.TimeTick()
}
return ok, err
result := mh.inner.Handle(handleParam)
if result.MessageHandled {
mh.lastConfirmedMessageID = lastConfirmedMessageID
mh.lastTimeTick = lastTimeTick
}
return result
}
func (mh *timeTickOrderMessageHandler) Close() {

View File

@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -77,7 +78,7 @@ type ResumableProducer struct {
}
// Produce produce a new message to log service.
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *producer.ProduceResult, err error) {
func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage) (result *types.AppendResult, err error) {
if !p.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, errors.Wrapf(errs.ErrClosed, "produce on closed producer")
}
@ -94,7 +95,7 @@ func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMess
return nil, err
}
produceResult, err := producerHandler.Produce(ctx, msg)
produceResult, err := producerHandler.Append(ctx, msg)
if err == nil {
return produceResult, nil
}

View File

@ -14,12 +14,13 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/producer"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)
func TestResumableProducer(t *testing.T) {
p := mock_producer.NewMockProducer(t)
msgID := mock_message.NewMockMessageID(t)
p.EXPECT().Produce(mock.Anything, mock.Anything).Return(&producer.ProduceResult{
p.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{
MessageID: msgID,
TimeTick: 100,
}, nil)
@ -47,11 +48,11 @@ func TestResumableProducer(t *testing.T) {
} else if i == 2 {
p := mock_producer.NewMockProducer(t)
msgID := mock_message.NewMockMessageID(t)
p.EXPECT().Produce(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*producer.ProduceResult, error) {
p.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return &producer.ProduceResult{
return &types.AppendResult{
MessageID: msgID,
TimeTick: 100,
}, nil

View File

@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -109,7 +110,7 @@ func TestStreamingConsume(t *testing.T) {
t.Skip()
streaming.Init()
defer streaming.Release()
ch := make(message.ChanMessageHandler, 10)
ch := make(adaptor.ChanMessageHandler, 10)
s := streaming.WAL().Read(context.Background(), streaming.ReadOption{
VChannel: vChannels[0],
DeliverPolicy: options.DeliverPolicyAll(),

View File

@ -70,7 +70,7 @@ func TestWAL(t *testing.T) {
return true
}
})
p.EXPECT().Produce(mock.Anything, mock.Anything).Return(&types.AppendResult{
p.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{
MessageID: walimplstest.NewTestMessageID(1),
TimeTick: 10,
TxnCtx: &message.TxnContext{

View File

@ -242,7 +242,6 @@ func (s *Server) init() (err error) {
WithDataCoordClient(s.dataCoord).
WithSession(s.session).
WithMetaKV(s.metaKV).
WithChunkManager(s.chunkManager).
Build()
if err := s.streamingnode.Init(s.ctx); err != nil {
return errors.Wrap(err, "StreamingNode service init failed")

View File

@ -18,8 +18,21 @@ func (_m *MockConsumer) EXPECT() *MockConsumer_Expecter {
}
// Close provides a mock function with given fields:
func (_m *MockConsumer) Close() {
_m.Called()
func (_m *MockConsumer) Close() error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Close")
}
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// MockConsumer_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
@ -39,12 +52,12 @@ func (_c *MockConsumer_Close_Call) Run(run func()) *MockConsumer_Close_Call {
return _c
}
func (_c *MockConsumer_Close_Call) Return() *MockConsumer_Close_Call {
_c.Call.Return()
func (_c *MockConsumer_Close_Call) Return(_a0 error) *MockConsumer_Close_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockConsumer_Close_Call) RunAndReturn(run func()) *MockConsumer_Close_Call {
func (_c *MockConsumer_Close_Call) RunAndReturn(run func() error) *MockConsumer_Close_Call {
_c.Call.Return(run)
return _c
}

View File

@ -24,47 +24,61 @@ func (_m *MockProducer) EXPECT() *MockProducer_Expecter {
return &MockProducer_Expecter{mock: &_m.Mock}
}
// Assignment provides a mock function with given fields:
func (_m *MockProducer) Assignment() types.PChannelInfoAssigned {
ret := _m.Called()
// Append provides a mock function with given fields: ctx, msg
func (_m *MockProducer) Append(ctx context.Context, msg message.MutableMessage) (*types.AppendResult, error) {
ret := _m.Called(ctx, msg)
if len(ret) == 0 {
panic("no return value specified for Assignment")
panic("no return value specified for Append")
}
var r0 types.PChannelInfoAssigned
if rf, ok := ret.Get(0).(func() types.PChannelInfoAssigned); ok {
r0 = rf()
var r0 *types.AppendResult
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (*types.AppendResult, error)); ok {
return rf(ctx, msg)
}
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) *types.AppendResult); ok {
r0 = rf(ctx, msg)
} else {
r0 = ret.Get(0).(types.PChannelInfoAssigned)
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.AppendResult)
}
}
return r0
if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage) error); ok {
r1 = rf(ctx, msg)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockProducer_Assignment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Assignment'
type MockProducer_Assignment_Call struct {
// MockProducer_Append_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Append'
type MockProducer_Append_Call struct {
*mock.Call
}
// Assignment is a helper method to define mock.On call
func (_e *MockProducer_Expecter) Assignment() *MockProducer_Assignment_Call {
return &MockProducer_Assignment_Call{Call: _e.mock.On("Assignment")}
// Append is a helper method to define mock.On call
// - ctx context.Context
// - msg message.MutableMessage
func (_e *MockProducer_Expecter) Append(ctx interface{}, msg interface{}) *MockProducer_Append_Call {
return &MockProducer_Append_Call{Call: _e.mock.On("Append", ctx, msg)}
}
func (_c *MockProducer_Assignment_Call) Run(run func()) *MockProducer_Assignment_Call {
func (_c *MockProducer_Append_Call) Run(run func(ctx context.Context, msg message.MutableMessage)) *MockProducer_Append_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
run(args[0].(context.Context), args[1].(message.MutableMessage))
})
return _c
}
func (_c *MockProducer_Assignment_Call) Return(_a0 types.PChannelInfoAssigned) *MockProducer_Assignment_Call {
_c.Call.Return(_a0)
func (_c *MockProducer_Append_Call) Return(_a0 *types.AppendResult, _a1 error) *MockProducer_Append_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockProducer_Assignment_Call) RunAndReturn(run func() types.PChannelInfoAssigned) *MockProducer_Assignment_Call {
func (_c *MockProducer_Append_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (*types.AppendResult, error)) *MockProducer_Append_Call {
_c.Call.Return(run)
return _c
}
@ -193,65 +207,6 @@ func (_c *MockProducer_IsAvailable_Call) RunAndReturn(run func() bool) *MockProd
return _c
}
// Produce provides a mock function with given fields: ctx, msg
func (_m *MockProducer) Produce(ctx context.Context, msg message.MutableMessage) (*types.AppendResult, error) {
ret := _m.Called(ctx, msg)
if len(ret) == 0 {
panic("no return value specified for Produce")
}
var r0 *types.AppendResult
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) (*types.AppendResult, error)); ok {
return rf(ctx, msg)
}
if rf, ok := ret.Get(0).(func(context.Context, message.MutableMessage) *types.AppendResult); ok {
r0 = rf(ctx, msg)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.AppendResult)
}
}
if rf, ok := ret.Get(1).(func(context.Context, message.MutableMessage) error); ok {
r1 = rf(ctx, msg)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockProducer_Produce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Produce'
type MockProducer_Produce_Call struct {
*mock.Call
}
// Produce is a helper method to define mock.On call
// - ctx context.Context
// - msg message.MutableMessage
func (_e *MockProducer_Expecter) Produce(ctx interface{}, msg interface{}) *MockProducer_Produce_Call {
return &MockProducer_Produce_Call{Call: _e.mock.On("Produce", ctx, msg)}
}
func (_c *MockProducer_Produce_Call) Run(run func(ctx context.Context, msg message.MutableMessage)) *MockProducer_Produce_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(message.MutableMessage))
})
return _c
}
func (_c *MockProducer_Produce_Call) Return(_a0 *types.AppendResult, _a1 error) *MockProducer_Produce_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockProducer_Produce_Call) RunAndReturn(run func(context.Context, message.MutableMessage) (*types.AppendResult, error)) *MockProducer_Produce_Call {
_c.Call.Return(run)
return _c
}
// NewMockProducer creates a new instance of MockProducer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockProducer(t interface {

View File

@ -14,5 +14,5 @@ type Consumer interface {
Done() <-chan struct{}
// Close the consumer, release the underlying resources.
Close()
Close() error
}

View File

@ -107,7 +107,7 @@ type consumerImpl struct {
}
// Close close the consumer client.
func (c *consumerImpl) Close() {
func (c *consumerImpl) Close() error {
// Send the close request to server.
if err := c.grpcStreamClient.Send(&streamingpb.ConsumeRequest{
Request: &streamingpb.ConsumeRequest_Close{},
@ -118,7 +118,7 @@ func (c *consumerImpl) Close() {
if err := c.grpcStreamClient.CloseSend(); err != nil {
c.logger.Warn("close grpc stream failed", zap.Error(err))
}
<-c.finishErr.Done()
return c.finishErr.Get()
}
// Error returns the error of the consumer client.
@ -189,9 +189,12 @@ func (c *consumerImpl) recvLoop() (err error) {
if c.txnBuilder != nil {
panic("unreachable code: txn builder should be nil if we receive a non-txn message")
}
if _, err := c.msgHandler.Handle(c.ctx, newImmutableMsg); err != nil {
if result := c.msgHandler.Handle(message.HandleParam{
Ctx: c.ctx,
Message: newImmutableMsg,
}); result.Error != nil {
c.logger.Warn("message handle canceled", zap.Error(err))
return errors.Wrapf(err, "At Handler")
return errors.Wrapf(result.Error, "At Handler")
}
}
case *streamingpb.ConsumeResponse_Close:
@ -255,7 +258,10 @@ func (c *consumerImpl) handleTxnMessage(msg message.ImmutableMessage) error {
c.logger.Warn("failed to build txn message", zap.Any("messageID", commitMsg.MessageID()), zap.Error(err))
return nil
}
if _, err := c.msgHandler.Handle(c.ctx, msg); err != nil {
if result := c.msgHandler.Handle(message.HandleParam{
Ctx: c.ctx,
Message: msg,
}); result.Error != nil {
c.logger.Warn("message handle canceled at txn", zap.Error(err))
return errors.Wrap(err, "At Handler Of Txn")
}

View File

@ -14,6 +14,7 @@ import (
"github.com/milvus-io/milvus/pkg/proto/messagespb"
"github.com/milvus-io/milvus/pkg/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
@ -21,7 +22,7 @@ import (
)
func TestConsumer(t *testing.T) {
resultCh := make(message.ChanMessageHandler, 1)
resultCh := make(adaptor.ChanMessageHandler, 1)
c := newMockedConsumerImpl(t, context.Background(), resultCh)
mmsg, _ := message.NewInsertMessageBuilderV1().
@ -70,7 +71,7 @@ func TestConsumer(t *testing.T) {
}
func TestConsumerWithCancellation(t *testing.T) {
resultCh := make(message.ChanMessageHandler, 1)
resultCh := make(adaptor.ChanMessageHandler, 1)
ctx, cancel := context.WithCancel(context.Background())
c := newMockedConsumerImpl(t, ctx, resultCh)

View File

@ -11,6 +11,8 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/assignment"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/consumer"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/producer"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/registry"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/balancer/picker"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/lazygrpc"
"github.com/milvus-io/milvus/internal/util/streamingutil/service/resolver"
@ -21,7 +23,11 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var errWaitNextBackoff = errors.New("wait for next backoff")
var (
errWaitNextBackoff = errors.New("wait for next backoff")
_ producer.Producer = wal.WAL(nil)
_ consumer.Consumer = wal.Scanner(nil)
)
type handlerClientImpl struct {
lifetime *typeutil.Lifetime
@ -40,15 +46,27 @@ func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerO
}
defer hc.lifetime.Done()
p, err := hc.createHandlerAfterStreamingNodeReady(ctx, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (any, error) {
p, err := hc.createHandlerAfterStreamingNodeReady(ctx, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (*handlerCreateResult, error) {
// Check if the localWAL is assigned at local
localWAL, err := registry.GetAvailableWAL(assign.Channel)
if err == nil {
return localResult(localWAL), nil
}
if !shouldUseRemoteWAL(err) {
return nil, err
}
// Wait for handler service is ready.
handlerService, err := hc.service.GetService(ctx)
if err != nil {
return nil, err
}
return hc.newProducer(ctx, &producer.ProducerOptions{
remoteWAL, err := hc.newProducer(ctx, &producer.ProducerOptions{
Assignment: assign,
}, handlerService)
if err != nil {
return nil, err
}
return remoteResult(remoteWAL), nil
})
if err != nil {
return nil, err
@ -63,19 +81,41 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO
}
defer hc.lifetime.Done()
c, err := hc.createHandlerAfterStreamingNodeReady(ctx, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (any, error) {
c, err := hc.createHandlerAfterStreamingNodeReady(ctx, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (*handlerCreateResult, error) {
// Check if the localWAL is assigned at local
localWAL, err := registry.GetAvailableWAL(assign.Channel)
if err == nil {
localScanner, err := localWAL.Read(ctx, wal.ReadOption{
VChannel: opts.VChannel,
DeliverPolicy: opts.DeliverPolicy,
MessageFilter: opts.DeliverFilters,
MesasgeHandler: opts.MessageHandler,
})
if err != nil {
return nil, err
}
return localResult(localScanner), nil
}
if !shouldUseRemoteWAL(err) {
return nil, err
}
// Wait for handler service is ready.
handlerService, err := hc.service.GetService(ctx)
if err != nil {
return nil, err
}
return hc.newConsumer(ctx, &consumer.ConsumerOptions{
remoteScanner, err := hc.newConsumer(ctx, &consumer.ConsumerOptions{
Assignment: assign,
VChannel: opts.VChannel,
DeliverPolicy: opts.DeliverPolicy,
DeliverFilters: opts.DeliverFilters,
MessageHandler: opts.MessageHandler,
}, handlerService)
if err != nil {
return nil, err
}
return remoteResult(remoteScanner), nil
})
if err != nil {
return nil, err
@ -83,9 +123,24 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO
return c.(Consumer), nil
}
func localResult(result any) *handlerCreateResult {
return &handlerCreateResult{result: result, isLocal: true}
}
func remoteResult(result any) *handlerCreateResult {
return &handlerCreateResult{result: result, isLocal: false}
}
type handlerCreateResult struct {
result any
isLocal bool
}
type handlerCreateFunc func(ctx context.Context, assign *types.PChannelInfoAssigned) (*handlerCreateResult, error)
// createHandlerAfterStreamingNodeReady creates a handler until streaming node ready.
// If streaming node is not ready, it will block until new assignment term is coming or context timeout.
func (hc *handlerClientImpl) createHandlerAfterStreamingNodeReady(ctx context.Context, pchannel string, create func(ctx context.Context, assign *types.PChannelInfoAssigned) (any, error)) (any, error) {
func (hc *handlerClientImpl) createHandlerAfterStreamingNodeReady(ctx context.Context, pchannel string, create handlerCreateFunc) (any, error) {
logger := log.With(zap.String("pchannel", pchannel))
// TODO: backoff should be configurable.
backoff := backoff.NewExponentialBackOff()
@ -93,9 +148,10 @@ func (hc *handlerClientImpl) createHandlerAfterStreamingNodeReady(ctx context.Co
assign := hc.watcher.Get(ctx, pchannel)
if assign != nil {
// Find assignment, try to create producer on this assignment.
c, err := create(ctx, assign)
createResult, err := create(ctx, assign)
if err == nil {
return c, nil
logger.Info("create handler success", zap.Any("assignment", assign), zap.Bool("isLocal", createResult.isLocal))
return createResult.result, nil
}
logger.Warn("create handler failed", zap.Any("assignment", assign), zap.Error(err))
@ -158,3 +214,18 @@ func isPermanentFailureUntilNewAssignment(err error) bool {
streamingServiceErr := status.AsStreamingError(err)
return streamingServiceErr.IsWrongStreamingNode()
}
// shouldUseRemoteWAL checks if use remote wal when given error happens.
func shouldUseRemoteWAL(err error) bool {
if err == nil {
panic("the incoming error should never be nil")
}
// When following error happens, we should try to make a remote wal fetch.
// 1. If current node didn't deploy any streaming node.
if errors.Is(err, registry.ErrNoStreamingNodeDeployed) {
return true
}
// 2. If the wal is not exist at current streaming node.
streamingServiceErr := status.AsStreamingError(err)
return streamingServiceErr.IsWrongStreamingNode()
}

View File

@ -19,7 +19,7 @@ import (
"github.com/milvus-io/milvus/pkg/mocks/proto/mock_streamingpb"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_types"
"github.com/milvus-io/milvus/pkg/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -41,9 +41,9 @@ func TestHandlerClient(t *testing.T) {
w.EXPECT().Close().Run(func() {})
p := mock_producer.NewMockProducer(t)
p.EXPECT().Close().Run(func() {})
p.EXPECT().Close().RunAndReturn(func() {})
c := mock_consumer.NewMockConsumer(t)
c.EXPECT().Close().Run(func() {})
c.EXPECT().Close().RunAndReturn(func() error { return nil })
rebalanceTrigger := mock_types.NewMockAssignmentRebalanceTrigger(t)
rebalanceTrigger.EXPECT().ReportAssignmentError(mock.Anything, mock.Anything, mock.Anything).Return(nil)
@ -104,7 +104,7 @@ func TestHandlerClient(t *testing.T) {
options.DeliverFilterTimeTickGT(10),
options.DeliverFilterTimeTickGTE(10),
},
MessageHandler: make(message.ChanMessageHandler),
MessageHandler: make(adaptor.ChanMessageHandler),
})
assert.NoError(t, err)
assert.NotNil(t, consumer)

View File

@ -9,18 +9,13 @@ import (
var _ Producer = (*producerImpl)(nil)
type ProduceResult = types.AppendResult
// Producer is the interface that wraps the basic produce method on grpc stream.
// Producer is work on a single stream on grpc,
// so Producer cannot recover from failure because of the stream is broken.
type Producer interface {
// Assignment returns the assignment of the producer.
Assignment() types.PChannelInfoAssigned
// Produce sends the produce message to server.
// Append sends the produce message to server.
// TODO: Support Batch produce here.
Produce(ctx context.Context, msg message.MutableMessage) (*ProduceResult, error)
Append(ctx context.Context, msg message.MutableMessage) (*types.AppendResult, error)
// Check if a producer is available.
IsAvailable() bool

View File

@ -114,17 +114,12 @@ type produceRequest struct {
}
type produceResponse struct {
result *ProduceResult
result *types.AppendResult
err error
}
// Assignment returns the assignment of the producer.
func (p *producerImpl) Assignment() types.PChannelInfoAssigned {
return p.assignment
}
// Produce sends the produce message to server.
func (p *producerImpl) Produce(ctx context.Context, msg message.MutableMessage) (*ProduceResult, error) {
// Append sends the produce message to server.
func (p *producerImpl) Append(ctx context.Context, msg message.MutableMessage) (*types.AppendResult, error) {
if !p.lifetime.Add(typeutil.LifetimeStateWorking) {
return nil, status.NewOnShutdownError("producer client is shutting down")
}
@ -293,7 +288,7 @@ func (p *producerImpl) recvLoop() (err error) {
return err
}
result = produceResponse{
result: &ProduceResult{
result: &types.AppendResult{
MessageID: msgID,
TimeTick: produceResp.Result.GetTimetick(),
TxnCtx: message.NewTxnContextFromProto(produceResp.Result.GetTxnContext()),

View File

@ -61,12 +61,12 @@ func TestProducer(t *testing.T) {
ch := make(chan struct{})
go func() {
msg := message.CreateTestEmptyInsertMesage(1, nil)
msgID, err := producer.Produce(ctx, msg)
msgID, err := producer.Append(ctx, msg)
assert.Error(t, err)
assert.Nil(t, msgID)
msg = message.CreateTestEmptyInsertMesage(1, nil)
msgID, err = producer.Produce(ctx, msg)
msgID, err = producer.Append(ctx, msg)
assert.NoError(t, err)
assert.NotNil(t, msgID)
close(ch)
@ -100,7 +100,7 @@ func TestProducer(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
msg := message.CreateTestEmptyInsertMesage(1, nil)
_, err = producer.Produce(ctx, msg)
_, err = producer.Append(ctx, msg)
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.True(t, producer.IsAvailable())
producer.Close()

View File

@ -0,0 +1,44 @@
package registry
import (
"context"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var (
registry = syncutil.NewFuture[WALManager]()
ErrNoStreamingNodeDeployed = errors.New("no streaming node deployed")
)
// RegisterLocalWALManager registers the local wal manager.
// When the streaming node is started, it should call this function to register the wal manager.
func RegisterLocalWALManager(manager WALManager) {
if !paramtable.IsLocalComponentEnabled(typeutil.StreamingNodeRole) {
panic("unreachable: streaming node is not enabled but wal setup")
}
registry.Set(manager)
log.Ctx(context.Background()).Info("register local wal manager done")
}
// GetAvailableWAL returns a available wal instance for the channel.
func GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) {
if !paramtable.IsLocalComponentEnabled(typeutil.StreamingNodeRole) {
return nil, ErrNoStreamingNodeDeployed
}
return registry.Get().GetAvailableWAL(channel)
}
// WALManager is a hint type for wal manager at streaming node.
type WALManager interface {
// GetAvailableWAL returns a available wal instance for the channel.
// Return nil if the wal instance is not found.
GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error)
}

View File

@ -29,7 +29,6 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
adaptor2 "github.com/milvus-io/milvus/internal/streamingnode/server/wal/adaptor"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/proto/datapb"
@ -112,7 +111,7 @@ func (c *channelLifetime) Run() error {
// Create scanner.
policy := options.DeliverPolicyStartFrom(messageID)
handler := adaptor2.NewMsgPackAdaptorHandler()
handler := adaptor.NewMsgPackAdaptorHandler()
ro := wal.ReadOption{
VChannel: c.vchannel,
DeliverPolicy: policy,

View File

@ -5,6 +5,7 @@ import (
"google.golang.org/grpc"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/registry"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/service"
"github.com/milvus-io/milvus/internal/streamingnode/server/walmanager"
@ -66,6 +67,8 @@ func (s *Server) initBasicComponent(_ context.Context) {
if err != nil {
panic("open wal manager failed")
}
// Register the wal manager to the local registry.
registry.RegisterLocalWALManager(s.walManager)
}
// initService initializes the grpc service.

View File

@ -1,107 +0,0 @@
package adaptor
import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
)
var (
_ wal.MessageHandler = defaultMessageHandler(nil)
_ wal.MessageHandler = (*MsgPackAdaptorHandler)(nil)
)
type defaultMessageHandler chan message.ImmutableMessage
func (h defaultMessageHandler) Handle(param wal.HandleParam) wal.HandleResult {
var sendingCh chan message.ImmutableMessage
if param.Message != nil {
sendingCh = h
}
select {
case <-param.Ctx.Done():
return wal.HandleResult{Error: param.Ctx.Err()}
case msg, ok := <-param.Upstream:
if !ok {
return wal.HandleResult{Error: wal.ErrUpstreamClosed}
}
return wal.HandleResult{Incoming: msg}
case sendingCh <- param.Message:
return wal.HandleResult{MessageHandled: true}
case <-param.TimeTickChan:
return wal.HandleResult{TimeTickUpdated: true}
}
}
func (d defaultMessageHandler) Close() {
close(d)
}
// NewMsgPackAdaptorHandler create a new message pack adaptor handler.
func NewMsgPackAdaptorHandler() *MsgPackAdaptorHandler {
return &MsgPackAdaptorHandler{
base: adaptor.NewBaseMsgPackAdaptorHandler(),
}
}
type MsgPackAdaptorHandler struct {
base *adaptor.BaseMsgPackAdaptorHandler
}
// Chan is the channel for message.
func (m *MsgPackAdaptorHandler) Chan() <-chan *msgstream.MsgPack {
return m.base.Channel
}
// Handle is the callback for handling message.
func (m *MsgPackAdaptorHandler) Handle(param wal.HandleParam) wal.HandleResult {
messageHandled := false
// not handle new message if there are pending msgPack.
if param.Message != nil && m.base.PendingMsgPack.Len() == 0 {
m.base.GenerateMsgPack(param.Message)
messageHandled = true
}
for {
var sendCh chan<- *msgstream.MsgPack
if m.base.PendingMsgPack.Len() != 0 {
sendCh = m.base.Channel
}
select {
case <-param.Ctx.Done():
return wal.HandleResult{
MessageHandled: messageHandled,
Error: param.Ctx.Err(),
}
case msg, notClose := <-param.Upstream:
if !notClose {
return wal.HandleResult{
MessageHandled: messageHandled,
Error: wal.ErrUpstreamClosed,
}
}
return wal.HandleResult{
Incoming: msg,
MessageHandled: messageHandled,
}
case sendCh <- m.base.PendingMsgPack.Next():
m.base.PendingMsgPack.UnsafeAdvance()
if m.base.PendingMsgPack.Len() > 0 {
continue
}
return wal.HandleResult{MessageHandled: messageHandled}
case <-param.TimeTickChan:
return wal.HandleResult{
MessageHandled: messageHandled,
TimeTickUpdated: true,
}
}
}
}
// Close closes the handler.
func (m *MsgPackAdaptorHandler) Close() {
close(m.base.Channel)
}

View File

@ -1,93 +0,0 @@
package adaptor
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
func TestMsgPackAdaptorHandler(t *testing.T) {
messageID := rmq.NewRmqID(1)
tt := uint64(100)
msg := message.CreateTestInsertMessage(
t,
1,
1000,
tt,
messageID,
)
immutableMsg := msg.IntoImmutableMessage(messageID)
upstream := make(chan message.ImmutableMessage, 1)
ctx := context.Background()
h := NewMsgPackAdaptorHandler()
done := make(chan struct{})
go func() {
for range h.Chan() {
}
close(done)
}()
upstream <- immutableMsg
resp := h.Handle(wal.HandleParam{
Ctx: ctx,
Upstream: upstream,
Message: nil,
})
assert.Equal(t, resp.Incoming, immutableMsg)
assert.False(t, resp.MessageHandled)
assert.NoError(t, resp.Error)
resp = h.Handle(wal.HandleParam{
Ctx: ctx,
Upstream: upstream,
Message: resp.Incoming,
})
assert.NoError(t, resp.Error)
assert.Nil(t, resp.Incoming)
assert.True(t, resp.MessageHandled)
h.Close()
<-done
}
func TestDefaultHandler(t *testing.T) {
h := make(defaultMessageHandler, 1)
done := make(chan struct{})
go func() {
for range h {
}
close(done)
}()
upstream := make(chan message.ImmutableMessage, 1)
msg := mock_message.NewMockImmutableMessage(t)
upstream <- msg
resp := h.Handle(wal.HandleParam{
Ctx: context.Background(),
Upstream: upstream,
Message: nil,
})
assert.NotNil(t, resp.Incoming)
assert.NoError(t, resp.Error)
assert.False(t, resp.MessageHandled)
assert.Equal(t, resp.Incoming, msg)
resp = h.Handle(wal.HandleParam{
Ctx: context.Background(),
Upstream: upstream,
Message: resp.Incoming,
})
assert.NoError(t, resp.Error)
assert.Nil(t, resp.Incoming)
assert.True(t, resp.MessageHandled)
h.Close()
<-done
}

View File

@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
@ -32,7 +33,7 @@ func newScannerAdaptor(
panic("vchannel of scanner must be set")
}
if readOption.MesasgeHandler == nil {
readOption.MesasgeHandler = defaultMessageHandler(make(chan message.ImmutableMessage))
readOption.MesasgeHandler = adaptor.ChanMessageHandler(make(chan message.ImmutableMessage))
}
options.GetFilterFunc(readOption.MessageFilter)
logger := resource.Resource().Logger().With(
@ -79,7 +80,7 @@ func (s *scannerAdaptorImpl) Channel() types.PChannelInfo {
// Chan returns the message channel of the scanner.
func (s *scannerAdaptorImpl) Chan() <-chan message.ImmutableMessage {
return s.readOption.MesasgeHandler.(defaultMessageHandler)
return s.readOption.MesasgeHandler.(adaptor.ChanMessageHandler)
}
// Close the scanner, release the underlying resources.
@ -111,7 +112,7 @@ func (s *scannerAdaptorImpl) executeConsume() {
for {
// generate the event channel and do the event loop.
// TODO: Consume from local cache.
handleResult := s.readOption.MesasgeHandler.Handle(wal.HandleParam{
handleResult := s.readOption.MesasgeHandler.Handle(message.HandleParam{
Ctx: s.Context(),
Upstream: s.getUpstream(innerScanner),
TimeTickChan: s.getTimeTickUpdateChan(timeTickNotifier),

View File

@ -1,8 +1,6 @@
package wal
import (
"context"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@ -19,7 +17,7 @@ type ReadOption struct {
VChannel string // vchannel name
DeliverPolicy options.DeliverPolicy
MessageFilter []options.DeliverFilter
MesasgeHandler MessageHandler // message handler for message processing.
MesasgeHandler message.Handler // message handler for message processing.
// If the message handler is nil (no redundant operation need to apply),
// the default message handler will be used, and the receiver will be returned from Chan.
// Otherwise, Chan will panic.
@ -45,27 +43,3 @@ type Scanner interface {
// Return the error same with `Error`
Close() error
}
type HandleParam struct {
Ctx context.Context
Upstream <-chan message.ImmutableMessage
Message message.ImmutableMessage
TimeTickChan <-chan struct{}
}
type HandleResult struct {
Incoming message.ImmutableMessage // Not nil if upstream return new message.
MessageHandled bool // True if Message is handled successfully.
TimeTickUpdated bool // True if TimeTickChan is triggered.
Error error // Error is context is canceled.
}
// MessageHandler is used to handle message read from log.
// TODO: should be removed in future after msgstream is removed.
type MessageHandler interface {
// Handle is the callback for handling message.
Handle(param HandleParam) HandleResult
// Close is called after all messages are handled or handling is interrupted.
Close()
}

View File

@ -1,8 +1,6 @@
package adaptor
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/log"
@ -11,6 +9,32 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type ChanMessageHandler chan message.ImmutableMessage
func (h ChanMessageHandler) Handle(param message.HandleParam) message.HandleResult {
var sendingCh chan message.ImmutableMessage
if param.Message != nil {
sendingCh = h
}
select {
case <-param.Ctx.Done():
return message.HandleResult{Error: param.Ctx.Err()}
case msg, ok := <-param.Upstream:
if !ok {
return message.HandleResult{Error: message.ErrUpstreamClosed}
}
return message.HandleResult{Incoming: msg}
case sendingCh <- param.Message:
return message.HandleResult{MessageHandled: true}
case <-param.TimeTickChan:
return message.HandleResult{TimeTickUpdated: true}
}
}
func (d ChanMessageHandler) Close() {
close(d)
}
// NewMsgPackAdaptorHandler create a new message pack adaptor handler.
func NewMsgPackAdaptorHandler() *MsgPackAdaptorHandler {
return &MsgPackAdaptorHandler{
@ -18,7 +42,6 @@ func NewMsgPackAdaptorHandler() *MsgPackAdaptorHandler {
}
}
// MsgPackAdaptorHandler is the handler for message pack.
type MsgPackAdaptorHandler struct {
base *BaseMsgPackAdaptorHandler
}
@ -29,20 +52,53 @@ func (m *MsgPackAdaptorHandler) Chan() <-chan *msgstream.MsgPack {
}
// Handle is the callback for handling message.
func (m *MsgPackAdaptorHandler) Handle(ctx context.Context, msg message.ImmutableMessage) (bool, error) {
m.base.GenerateMsgPack(msg)
for m.base.PendingMsgPack.Len() > 0 {
func (m *MsgPackAdaptorHandler) Handle(param message.HandleParam) message.HandleResult {
messageHandled := false
// not handle new message if there are pending msgPack.
if param.Message != nil && m.base.PendingMsgPack.Len() == 0 {
m.base.GenerateMsgPack(param.Message)
messageHandled = true
}
for {
var sendCh chan<- *msgstream.MsgPack
if m.base.PendingMsgPack.Len() != 0 {
sendCh = m.base.Channel
}
select {
case <-ctx.Done():
return true, ctx.Err()
case m.base.Channel <- m.base.PendingMsgPack.Next():
case <-param.Ctx.Done():
return message.HandleResult{
MessageHandled: messageHandled,
Error: param.Ctx.Err(),
}
case msg, notClose := <-param.Upstream:
if !notClose {
return message.HandleResult{
MessageHandled: messageHandled,
Error: message.ErrUpstreamClosed,
}
}
return message.HandleResult{
Incoming: msg,
MessageHandled: messageHandled,
}
case sendCh <- m.base.PendingMsgPack.Next():
m.base.PendingMsgPack.UnsafeAdvance()
if m.base.PendingMsgPack.Len() > 0 {
continue
}
return message.HandleResult{MessageHandled: messageHandled}
case <-param.TimeTickChan:
return message.HandleResult{
MessageHandled: messageHandled,
TimeTickUpdated: true,
}
}
}
return true, nil
}
// Close is the callback for closing message.
// Close closes the handler.
func (m *MsgPackAdaptorHandler) Close() {
close(m.base.Channel)
}

View File

@ -3,167 +3,90 @@ package adaptor
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
func TestMsgPackAdaptorHandler(t *testing.T) {
id := rmq.NewRmqID(1)
messageID := rmq.NewRmqID(1)
tt := uint64(100)
msg := message.CreateTestInsertMessage(
t,
1,
1000,
tt,
messageID,
)
immutableMsg := msg.IntoImmutableMessage(messageID)
upstream := make(chan message.ImmutableMessage, 1)
ctx := context.Background()
h := NewMsgPackAdaptorHandler()
insertMsg := message.CreateTestInsertMessage(t, 1, 100, 10, id)
insertImmutableMessage := insertMsg.IntoImmutableMessage(id)
ch := make(chan *msgstream.MsgPack, 1)
done := make(chan struct{})
go func() {
for msgPack := range h.Chan() {
ch <- msgPack
for range h.Chan() {
}
close(ch)
close(done)
}()
ok, err := h.Handle(context.Background(), insertImmutableMessage)
assert.True(t, ok)
assert.NoError(t, err)
msgPack := <-ch
upstream <- immutableMsg
resp := h.Handle(message.HandleParam{
Ctx: ctx,
Upstream: upstream,
Message: nil,
})
assert.Equal(t, resp.Incoming, immutableMsg)
assert.False(t, resp.MessageHandled)
assert.NoError(t, resp.Error)
assert.Equal(t, uint64(10), msgPack.BeginTs)
assert.Equal(t, uint64(10), msgPack.EndTs)
for _, tsMsg := range msgPack.Msgs {
assert.Equal(t, uint64(10), tsMsg.BeginTs())
assert.Equal(t, uint64(10), tsMsg.EndTs())
for _, ts := range tsMsg.(*msgstream.InsertMsg).Timestamps {
assert.Equal(t, uint64(10), ts)
resp = h.Handle(message.HandleParam{
Ctx: ctx,
Upstream: upstream,
Message: resp.Incoming,
})
assert.NoError(t, resp.Error)
assert.Nil(t, resp.Incoming)
assert.True(t, resp.MessageHandled)
h.Close()
<-done
}
func TestDefaultHandler(t *testing.T) {
h := make(ChanMessageHandler, 1)
done := make(chan struct{})
go func() {
for range h {
}
}
close(done)
}()
deleteMsg, err := message.NewDeleteMessageBuilderV1().
WithVChannel("vchan1").
WithHeader(&message.DeleteMessageHeader{
CollectionId: 1,
}).
WithBody(&msgpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
},
CollectionID: 1,
PartitionID: 1,
Timestamps: []uint64{10},
}).
BuildMutable()
assert.NoError(t, err)
upstream := make(chan message.ImmutableMessage, 1)
msg := mock_message.NewMockImmutableMessage(t)
upstream <- msg
resp := h.Handle(message.HandleParam{
Ctx: context.Background(),
Upstream: upstream,
Message: nil,
})
assert.NotNil(t, resp.Incoming)
assert.NoError(t, resp.Error)
assert.False(t, resp.MessageHandled)
assert.Equal(t, resp.Incoming, msg)
deleteImmutableMsg := deleteMsg.
WithTimeTick(11).
WithLastConfirmedUseMessageID().
IntoImmutableMessage(id)
ok, err = h.Handle(context.Background(), deleteImmutableMsg)
assert.True(t, ok)
assert.NoError(t, err)
msgPack = <-ch
assert.Equal(t, uint64(11), msgPack.BeginTs)
assert.Equal(t, uint64(11), msgPack.EndTs)
for _, tsMsg := range msgPack.Msgs {
assert.Equal(t, uint64(11), tsMsg.BeginTs())
assert.Equal(t, uint64(11), tsMsg.EndTs())
for _, ts := range tsMsg.(*msgstream.DeleteMsg).Timestamps {
assert.Equal(t, uint64(11), ts)
}
}
// Create a txn message
msg, err := message.NewBeginTxnMessageBuilderV2().
WithVChannel("vchan1").
WithHeader(&message.BeginTxnMessageHeader{
KeepaliveMilliseconds: 1000,
}).
WithBody(&message.BeginTxnMessageBody{}).
BuildMutable()
assert.NoError(t, err)
assert.NotNil(t, msg)
txnCtx := message.TxnContext{
TxnID: 1,
Keepalive: time.Second,
}
beginImmutableMsg, err := message.AsImmutableBeginTxnMessageV2(msg.WithTimeTick(9).
WithTxnContext(txnCtx).
WithLastConfirmedUseMessageID().
IntoImmutableMessage(rmq.NewRmqID(2)))
assert.NoError(t, err)
msg, err = message.NewCommitTxnMessageBuilderV2().
WithVChannel("vchan1").
WithHeader(&message.CommitTxnMessageHeader{}).
WithBody(&message.CommitTxnMessageBody{}).
BuildMutable()
assert.NoError(t, err)
commitImmutableMsg, err := message.AsImmutableCommitTxnMessageV2(msg.WithTimeTick(12).
WithTxnContext(txnCtx).
WithTxnContext(message.TxnContext{}).
WithLastConfirmedUseMessageID().
IntoImmutableMessage(rmq.NewRmqID(3)))
assert.NoError(t, err)
txn, err := message.NewImmutableTxnMessageBuilder(beginImmutableMsg).
Add(insertMsg.WithTxnContext(txnCtx).IntoImmutableMessage(id)).
Add(deleteMsg.WithTxnContext(txnCtx).IntoImmutableMessage(id)).
Build(commitImmutableMsg)
assert.NoError(t, err)
ok, err = h.Handle(context.Background(), txn)
assert.True(t, ok)
assert.NoError(t, err)
msgPack = <-ch
assert.Equal(t, uint64(12), msgPack.BeginTs)
assert.Equal(t, uint64(12), msgPack.EndTs)
// Create flush message
msg, err = message.NewFlushMessageBuilderV2().
WithVChannel("vchan1").
WithHeader(&message.FlushMessageHeader{}).
WithBody(&message.FlushMessageBody{}).
BuildMutable()
assert.NoError(t, err)
flushMsg := msg.
WithTimeTick(13).
WithLastConfirmedUseMessageID().
IntoImmutableMessage(rmq.NewRmqID(4))
ok, err = h.Handle(context.Background(), flushMsg)
assert.True(t, ok)
assert.NoError(t, err)
msgPack = <-ch
assert.Equal(t, uint64(13), msgPack.BeginTs)
assert.Equal(t, uint64(13), msgPack.EndTs)
resp = h.Handle(message.HandleParam{
Ctx: context.Background(),
Upstream: upstream,
Message: resp.Incoming,
})
assert.NoError(t, resp.Error)
assert.Nil(t, resp.Incoming)
assert.True(t, resp.MessageHandled)
h.Close()
<-ch
}
func TestMsgPackAdaptorHandlerTimeout(t *testing.T) {
id := rmq.NewRmqID(1)
insertMsg := message.CreateTestInsertMessage(t, 1, 100, 10, id)
insertImmutableMessage := insertMsg.IntoImmutableMessage(id)
h := NewMsgPackAdaptorHandler()
ctx, cancel := context.WithCancel(context.Background())
cancel()
ok, err := h.Handle(ctx, insertImmutableMessage)
assert.True(t, ok)
assert.ErrorIs(t, err, ctx.Err())
<-done
}

View File

@ -1,6 +1,28 @@
package message
import "context"
import (
"context"
"github.com/cockroachdb/errors"
)
var ErrUpstreamClosed = errors.New("upstream closed")
// HandleParam is the parameter for handler.
type HandleParam struct {
Ctx context.Context
Upstream <-chan ImmutableMessage
Message ImmutableMessage
TimeTickChan <-chan struct{}
}
// HandleResult is the result of handler.
type HandleResult struct {
Incoming ImmutableMessage // Not nil if upstream return new message.
MessageHandled bool // True if Message is handled successfully.
TimeTickUpdated bool // True if TimeTickChan is triggered.
Error error // Error is context is canceled.
}
// Handler is used to handle message read from log.
type Handler interface {
@ -8,29 +30,9 @@ type Handler interface {
// Return true if the message is consumed, false if the message is not consumed.
// Should return error if and only if ctx is done.
// !!! It's a bad implementation for compatibility for msgstream,
// should be removed in the future.
Handle(ctx context.Context, msg ImmutableMessage) (bool, error)
// will be removed in the future.
Handle(param HandleParam) HandleResult
// Close is called after all messages are handled or handling is interrupted.
Close()
}
var _ Handler = ChanMessageHandler(nil)
// ChanMessageHandler is a handler just forward the message into a channel.
type ChanMessageHandler chan ImmutableMessage
// Handle is the callback for handling message.
func (cmh ChanMessageHandler) Handle(ctx context.Context, msg ImmutableMessage) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
case cmh <- msg:
return true, nil
}
}
// Close is called after all messages are handled or handling is interrupted.
func (cmh ChanMessageHandler) Close() {
close(cmh)
}

View File

@ -1,27 +0,0 @@
package message
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func TestMessageHandler(t *testing.T) {
ch := make(chan ImmutableMessage, 1)
h := ChanMessageHandler(ch)
ok, err := h.Handle(context.Background(), nil)
assert.NoError(t, err)
assert.True(t, ok)
ctx, cancel := context.WithCancel(context.Background())
cancel()
ok, err = h.Handle(ctx, nil)
assert.ErrorIs(t, err, ctx.Err())
assert.False(t, ok)
assert.Nil(t, <-ch)
h.Close()
_, ok = <-ch
assert.False(t, ok)
}

View File

@ -365,6 +365,7 @@ func (cluster *MiniClusterV2) Start() error {
}
if streamingutil.IsStreamingServiceEnabled() {
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)
runComponent(cluster.StreamingNode)
}