enhance: enable write ahead buffer for streaming service (#39771)

issue: #38399

- Add a timetick-commit-based write ahead buffer on the write side.
- Add a switchable scanner on the read side to switch between catchup and
tailing reads.
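
The read-side contract these two pieces establish can be sketched briefly. This is a hedged illustration only: it assumes a wab.ROWriteAheadBuffer obtained from the time tick sync operator, tailFromBuffer and handle are hypothetical names, and the real switching logic lives in scanner_switchable.go below.

package example // sketch only, not part of this commit

import (
    "context"

    "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
    "github.com/milvus-io/milvus/pkg/streaming/util/message"
)

// tailFromBuffer tails the write ahead buffer starting after lastTimeTick.
// ReadFromExclusiveTimeTick blocks until the buffer reaches that time tick;
// Next returns wab.ErrEvicted when the reader falls behind, at which point
// the caller must re-enter catchup mode against the underlying wal.
func tailFromBuffer(ctx context.Context, wb wab.ROWriteAheadBuffer, lastTimeTick uint64, handle func(message.ImmutableMessage)) error {
    reader, err := wb.ReadFromExclusiveTimeTick(ctx, lastTimeTick)
    if err != nil {
        return err
    }
    for {
        msg, err := reader.Next(ctx)
        if err != nil {
            return err // wab.ErrEvicted, context cancellation, ...
        }
        handle(msg)
    }
}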

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2025-02-12 20:38:46 +08:00 committed by GitHub
parent 28c2558f5d
commit 0988807160
44 changed files with 1614 additions and 412 deletions

View File

@ -1131,6 +1131,9 @@ streaming:
concurrencyRatio: 1 # The concurrency ratio based on number of CPU for wal broadcaster, 1 by default.
txn:
defaultKeepaliveTimeout: 10s # The default keepalive timeout for wal txn, 10s by default
walWriteAheadBuffer:
capacity: 64m # The capacity of write ahead buffer of each wal, 64M by default
keepalive: 30s # The keepalive duration for entries in write ahead buffer of each wal, 30s by default
# Any configuration related to the knowhere vector search engine
knowhere:
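
The two new knobs above are consumed through paramtable later in this diff (in timeTickSyncOperator). A minimal hedged sketch, assuming paramtable has been initialized; writeAheadBufferConfig is a hypothetical helper:

package example // sketch only, not part of this commit

import (
    "time"

    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

// writeAheadBufferConfig reads the new knobs the same way the time tick
// sync operator in this diff does; paramtable.Init() must have been called.
func writeAheadBufferConfig() (int, time.Duration) {
    cfg := paramtable.Get().StreamingCfg
    capacity := int(cfg.WALWriteAheadBufferCapacity.GetAsSize())         // e.g. "64m" -> bytes
    keepalive := cfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse() // e.g. "30s"
    return capacity, keepalive
}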

View File

@ -58,6 +58,9 @@ packages:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
interfaces:
TimeTickSyncOperator:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab:
interfaces:
ROWriteAheadBuffer:
google.golang.org/grpc:
interfaces:
ClientStream:

View File

@ -0,0 +1,96 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
package mock_wab
import (
context "context"
wab "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
mock "github.com/stretchr/testify/mock"
)
// MockROWriteAheadBuffer is an autogenerated mock type for the ROWriteAheadBuffer type
type MockROWriteAheadBuffer struct {
mock.Mock
}
type MockROWriteAheadBuffer_Expecter struct {
mock *mock.Mock
}
func (_m *MockROWriteAheadBuffer) EXPECT() *MockROWriteAheadBuffer_Expecter {
return &MockROWriteAheadBuffer_Expecter{mock: &_m.Mock}
}
// ReadFromExclusiveTimeTick provides a mock function with given fields: ctx, timetick
func (_m *MockROWriteAheadBuffer) ReadFromExclusiveTimeTick(ctx context.Context, timetick uint64) (*wab.WriteAheadBufferReader, error) {
ret := _m.Called(ctx, timetick)
if len(ret) == 0 {
panic("no return value specified for ReadFromExclusiveTimeTick")
}
var r0 *wab.WriteAheadBufferReader
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, uint64) (*wab.WriteAheadBufferReader, error)); ok {
return rf(ctx, timetick)
}
if rf, ok := ret.Get(0).(func(context.Context, uint64) *wab.WriteAheadBufferReader); ok {
r0 = rf(ctx, timetick)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*wab.WriteAheadBufferReader)
}
}
if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok {
r1 = rf(ctx, timetick)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadFromExclusiveTimeTick'
type MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call struct {
*mock.Call
}
// ReadFromExclusiveTimeTick is a helper method to define mock.On call
// - ctx context.Context
// - timetick uint64
func (_e *MockROWriteAheadBuffer_Expecter) ReadFromExclusiveTimeTick(ctx interface{}, timetick interface{}) *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call {
return &MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call{Call: _e.mock.On("ReadFromExclusiveTimeTick", ctx, timetick)}
}
func (_c *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call) Run(run func(ctx context.Context, timetick uint64)) *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(uint64))
})
return _c
}
func (_c *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call) Return(_a0 *wab.WriteAheadBufferReader, _a1 error) *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call) RunAndReturn(run func(context.Context, uint64) (*wab.WriteAheadBufferReader, error)) *MockROWriteAheadBuffer_ReadFromExclusiveTimeTick_Call {
_c.Call.Return(run)
return _c
}
// NewMockROWriteAheadBuffer creates a new instance of MockROWriteAheadBuffer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockROWriteAheadBuffer(t interface {
mock.TestingT
Cleanup(func())
}) *MockROWriteAheadBuffer {
mock := &MockROWriteAheadBuffer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -5,10 +5,11 @@ package mock_inspector
import (
context "context"
inspector "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector"
mock "github.com/stretchr/testify/mock"
types "github.com/milvus-io/milvus/pkg/streaming/util/types"
wab "github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
)
// MockTimeTickSyncOperator is an autogenerated mock type for the TimeTickSyncOperator type
@ -102,49 +103,60 @@ func (_c *MockTimeTickSyncOperator_Sync_Call) RunAndReturn(run func(context.Cont
return _c
}
// TimeTickNotifier provides a mock function with given fields:
func (_m *MockTimeTickSyncOperator) TimeTickNotifier() *inspector.TimeTickNotifier {
ret := _m.Called()
// WriteAheadBuffer provides a mock function with given fields: ctx
func (_m *MockTimeTickSyncOperator) WriteAheadBuffer(ctx context.Context) (wab.ROWriteAheadBuffer, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for TimeTickNotifier")
panic("no return value specified for WriteAheadBuffer")
}
var r0 *inspector.TimeTickNotifier
if rf, ok := ret.Get(0).(func() *inspector.TimeTickNotifier); ok {
r0 = rf()
var r0 wab.ROWriteAheadBuffer
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (wab.ROWriteAheadBuffer, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) wab.ROWriteAheadBuffer); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*inspector.TimeTickNotifier)
r0 = ret.Get(0).(wab.ROWriteAheadBuffer)
}
}
return r0
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockTimeTickSyncOperator_TimeTickNotifier_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TimeTickNotifier'
type MockTimeTickSyncOperator_TimeTickNotifier_Call struct {
// MockTimeTickSyncOperator_WriteAheadBuffer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteAheadBuffer'
type MockTimeTickSyncOperator_WriteAheadBuffer_Call struct {
*mock.Call
}
// TimeTickNotifier is a helper method to define mock.On call
func (_e *MockTimeTickSyncOperator_Expecter) TimeTickNotifier() *MockTimeTickSyncOperator_TimeTickNotifier_Call {
return &MockTimeTickSyncOperator_TimeTickNotifier_Call{Call: _e.mock.On("TimeTickNotifier")}
// WriteAheadBuffer is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockTimeTickSyncOperator_Expecter) WriteAheadBuffer(ctx interface{}) *MockTimeTickSyncOperator_WriteAheadBuffer_Call {
return &MockTimeTickSyncOperator_WriteAheadBuffer_Call{Call: _e.mock.On("WriteAheadBuffer", ctx)}
}
func (_c *MockTimeTickSyncOperator_TimeTickNotifier_Call) Run(run func()) *MockTimeTickSyncOperator_TimeTickNotifier_Call {
func (_c *MockTimeTickSyncOperator_WriteAheadBuffer_Call) Run(run func(ctx context.Context)) *MockTimeTickSyncOperator_WriteAheadBuffer_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
run(args[0].(context.Context))
})
return _c
}
func (_c *MockTimeTickSyncOperator_TimeTickNotifier_Call) Return(_a0 *inspector.TimeTickNotifier) *MockTimeTickSyncOperator_TimeTickNotifier_Call {
_c.Call.Return(_a0)
func (_c *MockTimeTickSyncOperator_WriteAheadBuffer_Call) Return(_a0 wab.ROWriteAheadBuffer, _a1 error) *MockTimeTickSyncOperator_WriteAheadBuffer_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockTimeTickSyncOperator_TimeTickNotifier_Call) RunAndReturn(run func() *inspector.TimeTickNotifier) *MockTimeTickSyncOperator_TimeTickNotifier_Call {
func (_c *MockTimeTickSyncOperator_WriteAheadBuffer_Call) RunAndReturn(run func(context.Context) (wab.ROWriteAheadBuffer, error)) *MockTimeTickSyncOperator_WriteAheadBuffer_Call {
_c.Call.Return(run)
return _c
}

View File

@ -18,8 +18,6 @@ import (
func TestAssignChannelToWALLocatedFirst(t *testing.T) {
balancer := mock_balancer.NewMockBalancer(t)
snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer)
balancer.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error {
versions := []typeutil.VersionInt64Pair{
{Global: 1, Local: 2},
@ -46,6 +44,7 @@ func TestAssignChannelToWALLocatedFirst(t *testing.T) {
<-ctx.Done()
return context.Cause(ctx)
})
snmanager.StaticStreamingNodeManager.SetBalancerReady(balancer)
channels := []*meta.DmChannel{
{VchannelInfo: &datapb.VchannelInfo{ChannelName: "pchannel_v1"}},

View File

@ -173,19 +173,12 @@ func (impl *flusherComponents) recover(ctx context.Context, recoverInfos map[str
futures[vchannel] = future
}
dataServices := make(map[string]*dataSyncServiceWrapper, len(futures))
var firstErr error
for vchannel, future := range futures {
ds, err := future.Await()
if err == nil {
dataServices[vchannel] = ds.(*dataSyncServiceWrapper)
continue
if err != nil {
return err
}
if firstErr == nil {
firstErr = err
}
}
if firstErr != nil {
return firstErr
dataServices[vchannel] = ds.(*dataSyncServiceWrapper)
}
impl.dataServices = dataServices
for vchannel, ds := range dataServices {

View File

@ -97,7 +97,7 @@ func (impl *WALFlusherImpl) getRecoveryInfo(ctx context.Context, vchannel string
return err
}
// The channel has been dropped, skip to recover it.
if len(resp.GetInfo().GetSeekPosition().GetMsgID()) == 0 && resp.GetInfo().GetSeekPosition().GetTimestamp() == math.MaxUint64 {
if isDroppedChannel(resp) {
impl.logger.Info("channel has been dropped, the vchannel can not be recovered", zap.String("vchannel", vchannel))
return retry.Unrecoverable(errChannelLifetimeUnrecoverable)
}
@ -105,3 +105,7 @@ func (impl *WALFlusherImpl) getRecoveryInfo(ctx context.Context, vchannel string
}, retry.AttemptAlways())
return resp, err
}
func isDroppedChannel(resp *datapb.GetChannelRecoveryInfoResponse) bool {
return len(resp.GetInfo().GetSeekPosition().GetMsgID()) == 0 && resp.GetInfo().GetSeekPosition().GetTimestamp() == math.MaxUint64
}

View File

@ -1,12 +1,13 @@
package adaptor
import (
"context"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/log"
@ -16,7 +17,6 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/helper"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var _ wal.Scanner = (*scannerAdaptorImpl)(nil)
@ -39,35 +39,33 @@ func newScannerAdaptor(
zap.String("channel", l.Channel().Name),
)
s := &scannerAdaptorImpl{
logger: logger,
innerWAL: l,
readOption: readOption,
filterFunc: options.GetFilterFunc(readOption.MessageFilter),
reorderBuffer: utility.NewReOrderBuffer(),
pendingQueue: utility.NewPendingQueue(),
txnBuffer: utility.NewTxnBuffer(logger, scanMetrics),
cleanup: cleanup,
ScannerHelper: helper.NewScannerHelper(name),
lastTimeTickInfo: inspector.TimeTickInfo{},
metrics: scanMetrics,
logger: logger,
innerWAL: l,
readOption: readOption,
filterFunc: options.GetFilterFunc(readOption.MessageFilter),
reorderBuffer: utility.NewReOrderBuffer(),
pendingQueue: utility.NewPendingQueue(),
txnBuffer: utility.NewTxnBuffer(logger, scanMetrics),
cleanup: cleanup,
ScannerHelper: helper.NewScannerHelper(name),
metrics: scanMetrics,
}
go s.executeConsume()
go s.execute()
return s
}
// scannerAdaptorImpl is a wrapper of ScannerImpls to extend it into a Scanner interface.
type scannerAdaptorImpl struct {
*helper.ScannerHelper
logger *log.MLogger
innerWAL walimpls.WALImpls
readOption wal.ReadOption
filterFunc func(message.ImmutableMessage) bool
reorderBuffer *utility.ReOrderByTimeTickBuffer // only support time tick reorder now.
pendingQueue *utility.PendingQueue
txnBuffer *utility.TxnBuffer // txn buffer for txn message.
cleanup func()
lastTimeTickInfo inspector.TimeTickInfo
metrics *metricsutil.ScannerMetrics
logger *log.MLogger
innerWAL walimpls.WALImpls
readOption wal.ReadOption
filterFunc func(message.ImmutableMessage) bool
reorderBuffer *utility.ReOrderByTimeTickBuffer // support time tick reorder.
pendingQueue *utility.PendingQueue
txnBuffer *utility.TxnBuffer // txn buffer for txn message.
cleanup func()
metrics *metricsutil.ScannerMetrics
}
// Channel returns the channel assignment info of the wal.
@ -91,33 +89,73 @@ func (s *scannerAdaptorImpl) Close() error {
return err
}
func (s *scannerAdaptorImpl) executeConsume() {
defer s.readOption.MesasgeHandler.Close()
func (s *scannerAdaptorImpl) execute() {
defer func() {
s.readOption.MesasgeHandler.Close()
s.Finish(nil)
s.logger.Info("scanner is closed")
}()
s.logger.Info("scanner start background task")
innerScanner, err := s.innerWAL.Read(s.Context(), walimpls.ReadOption{
Name: s.Name(),
DeliverPolicy: s.readOption.DeliverPolicy,
})
if err != nil {
s.Finish(err)
msgChan := make(chan message.ImmutableMessage)
ch := make(chan struct{})
// TODO: optimize the extra goroutine here after msgstream is removed.
go func() {
defer close(ch)
err := s.produceEventLoop(msgChan)
if errors.Is(err, context.Canceled) {
s.logger.Info("the produce event loop of scanner is closed")
return
}
s.logger.Warn("the produce event loop of scanner is closed with unexpected error", zap.Error(err))
}()
err := s.consumeEventLoop(msgChan)
if errors.Is(err, context.Canceled) {
s.logger.Info("the consuming event loop of scanner is closed")
return
}
defer innerScanner.Close()
s.logger.Warn("the consuming event loop of scanner is closed with unexpected error", zap.Error(err))
timeTickNotifier := resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).TimeTickNotifier()
// waiting for the produce event loop to close.
<-ch
}
// produceEventLoop produces messages from the wal and the write ahead buffer.
func (s *scannerAdaptorImpl) produceEventLoop(msgChan chan<- message.ImmutableMessage) error {
wb, err := resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).WriteAheadBuffer(s.Context())
if err != nil {
return err
}
scanner := newSwithableScanner(s.Name(), s.logger, s.innerWAL, wb, s.readOption.DeliverPolicy, msgChan)
s.logger.Info("start produce loop of scanner at mode", zap.String("mode", scanner.Mode()))
for {
if scanner, err = scanner.Do(s.Context()); err != nil {
return err
}
s.logger.Info("switch scanner mode", zap.String("mode", scanner.Mode()))
}
}
// consumeEventLoop consumes messages from the message channel and handles them.
func (s *scannerAdaptorImpl) consumeEventLoop(msgChan <-chan message.ImmutableMessage) error {
for {
var upstream <-chan message.ImmutableMessage
if s.pendingQueue.Len() > 16 {
// If the pending queue is full, we need to wait until it's consumed to avoid scanner overloading.
upstream = nil
} else {
upstream = msgChan
}
// generate the event channel and do the event loop.
// TODO: Consume from local cache.
handleResult := s.readOption.MesasgeHandler.Handle(message.HandleParam{
Ctx: s.Context(),
Upstream: s.getUpstream(innerScanner),
TimeTickChan: s.getTimeTickUpdateChan(timeTickNotifier),
Message: s.pendingQueue.Next(),
Ctx: s.Context(),
Upstream: upstream,
Message: s.pendingQueue.Next(),
})
if handleResult.Error != nil {
s.Finish(handleResult.Error)
return
return handleResult.Error
}
if handleResult.MessageHandled {
s.pendingQueue.UnsafeAdvance()
@ -126,30 +164,10 @@ func (s *scannerAdaptorImpl) executeConsume() {
if handleResult.Incoming != nil {
s.handleUpstream(handleResult.Incoming)
}
// If the timetick just updated with a non persist operation,
// we just make a fake message to keep timetick sync if there are no more pending message.
if handleResult.TimeTickUpdated {
s.handleTimeTickUpdated(timeTickNotifier)
}
}
}
func (s *scannerAdaptorImpl) getTimeTickUpdateChan(timeTickNotifier *inspector.TimeTickNotifier) <-chan struct{} {
if s.pendingQueue.Len() == 0 && s.reorderBuffer.Len() == 0 && !s.lastTimeTickInfo.IsZero() {
return timeTickNotifier.WatchAtMessageID(s.lastTimeTickInfo.MessageID, s.lastTimeTickInfo.TimeTick)
}
return nil
}
func (s *scannerAdaptorImpl) getUpstream(scanner walimpls.ScannerImpls) <-chan message.ImmutableMessage {
// TODO: configurable pending buffer count.
// If the pending queue is full, we need to wait until it's consumed to avoid scanner overloading.
if s.pendingQueue.Len() > 16 {
return nil
}
return scanner.Chan()
}
// handleUpstream handles the incoming message from the upstream.
func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
// Observe the message.
s.metrics.ObserveMessage(msg.MessageType(), msg.EstimateSize())
@ -163,15 +181,17 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
msgs := s.txnBuffer.HandleImmutableMessages(messages, msg.TimeTick())
s.metrics.UpdateTxnBufSize(s.txnBuffer.Bytes())
// Push the confirmed messages into pending queue for consuming.
// and push forward timetick info.
s.pendingQueue.Add(msgs)
s.metrics.UpdatePendingQueueSize(s.pendingQueue.Bytes())
s.lastTimeTickInfo = inspector.TimeTickInfo{
MessageID: msg.MessageID(),
TimeTick: msg.TimeTick(),
LastConfirmedMessageID: msg.LastConfirmedMessageID(),
if len(msgs) > 0 {
// Push the confirmed messages into pending queue for consuming.
s.pendingQueue.Add(msgs)
} else if s.pendingQueue.Len() == 0 {
// If there's no new message incoming and no pending message in the queue,
// add the current timetick message into the pending queue to push the timetick forward.
// TODO: current milvus can only run on timetick pushing;
// after qview is applied, these trivial time tick messages can be erased.
s.pendingQueue.Add([]message.ImmutableMessage{msg})
}
s.metrics.UpdatePendingQueueSize(s.pendingQueue.Bytes())
return
}
@ -188,7 +208,9 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
}
// otherwise add message into reorder buffer directly.
if err := s.reorderBuffer.Push(msg); err != nil {
s.metrics.ObserveTimeTickViolation(msg.MessageType())
if errors.Is(err, utility.ErrTimeTickVoilation) {
s.metrics.ObserveTimeTickViolation(msg.MessageType())
}
s.logger.Warn("failed to push message into reorder buffer",
zap.Any("msgID", msg.MessageID()),
zap.Uint64("timetick", msg.TimeTick()),
@ -199,21 +221,3 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) {
s.metrics.UpdateTimeTickBufSize(s.reorderBuffer.Bytes())
s.metrics.ObserveFilteredMessage(msg.MessageType(), msg.EstimateSize())
}
func (s *scannerAdaptorImpl) handleTimeTickUpdated(timeTickNotifier *inspector.TimeTickNotifier) {
timeTickInfo := timeTickNotifier.Get()
if timeTickInfo.MessageID.EQ(s.lastTimeTickInfo.MessageID) && timeTickInfo.TimeTick > s.lastTimeTickInfo.TimeTick {
s.lastTimeTickInfo.TimeTick = timeTickInfo.TimeTick
msg, err := timetick.NewTimeTickMsg(
s.lastTimeTickInfo.TimeTick,
s.lastTimeTickInfo.LastConfirmedMessageID,
paramtable.GetNodeID(),
)
if err != nil {
s.logger.Warn("unreachable: a marshal timetick operation must be success")
return
}
s.pendingQueue.AddOne(msg.IntoImmutableMessage(s.lastTimeTickInfo.MessageID))
s.metrics.UpdatePendingQueueSize(s.pendingQueue.Bytes())
}
}

View File

@ -2,11 +2,15 @@ package adaptor
import (
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/mock_wab"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls"
@ -15,6 +19,15 @@ import (
)
func TestScannerAdaptorReadError(t *testing.T) {
resource.InitForTest(t)
operator := mock_inspector.NewMockTimeTickSyncOperator(t)
operator.EXPECT().Channel().Return(types.PChannelInfo{})
operator.EXPECT().Sync(mock.Anything).Return()
wb := mock_wab.NewMockROWriteAheadBuffer(t)
operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(wb, nil)
resource.Resource().TimeTickInspector().RegisterSyncOperator(operator)
err := errors.New("read error")
l := mock_walimpls.NewMockWALImpls(t)
l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err)
@ -28,8 +41,9 @@ func TestScannerAdaptorReadError(t *testing.T) {
},
metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(),
func() {})
defer s.Close()
time.Sleep(200 * time.Millisecond)
s.Close()
<-s.Chan()
<-s.Done()
assert.ErrorIs(t, s.Error(), err)
assert.NoError(t, s.Error())
}

View File

@ -0,0 +1,213 @@
package adaptor
import (
"context"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var (
_ switchableScanner = (*tailingScanner)(nil)
_ switchableScanner = (*catchupScanner)(nil)
)
// newSwitchableScanner creates a new switchable scanner.
func newSwithableScanner(
scannerName string,
logger *log.MLogger,
innerWAL walimpls.WALImpls,
writeAheadBuffer wab.ROWriteAheadBuffer,
deliverPolicy options.DeliverPolicy,
msgChan chan<- message.ImmutableMessage,
) switchableScanner {
return &catchupScanner{
switchableScannerImpl: switchableScannerImpl{
scannerName: scannerName,
logger: logger,
innerWAL: innerWAL,
msgChan: msgChan,
writeAheadBuffer: writeAheadBuffer,
},
deliverPolicy: deliverPolicy,
exclusiveStartTimeTick: 0,
}
}
// switchableScanner is a scanner that can switch between Catchup and Tailing mode
type switchableScanner interface {
// Mode is Catchup or Tailing
Mode() string
// Do makes the scanner work in the background.
// When the scanner wants to change its mode, it returns a new scanner with the new mode.
// When an error is returned, the scanner is canceled and unrecoverable.
Do(ctx context.Context) (switchableScanner, error)
}
type switchableScannerImpl struct {
scannerName string
logger *log.MLogger
innerWAL walimpls.WALImpls
msgChan chan<- message.ImmutableMessage
writeAheadBuffer wab.ROWriteAheadBuffer
}
func (s *switchableScannerImpl) HandleMessage(ctx context.Context, msg message.ImmutableMessage) error {
select {
case <-ctx.Done():
return ctx.Err()
case s.msgChan <- msg:
return nil
}
}
// catchupScanner reads from the underlying wal and tries to catch up with the write ahead buffer, then switches to tailing mode.
type catchupScanner struct {
switchableScannerImpl
deliverPolicy options.DeliverPolicy
exclusiveStartTimeTick uint64 // the scanner should filter out messages whose time tick is less than or equal to this time tick.
}
func (s *catchupScanner) Mode() string {
return "Catchup"
}
func (s *catchupScanner) Do(ctx context.Context) (switchableScanner, error) {
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
scanner, err := s.createReaderWithBackoff(ctx, s.deliverPolicy)
if err != nil {
// Only the cancellation error will be returned; other errors keep backing off.
return nil, err
}
switchedScanner, err := s.consumeWithScanner(ctx, scanner)
if err != nil {
s.logger.Warn("scanner consuming was interrpurted with error, start a backoff", zap.Error(err))
continue
}
return switchedScanner, nil
}
}
func (s *catchupScanner) consumeWithScanner(ctx context.Context, scanner walimpls.ScannerImpls) (switchableScanner, error) {
defer scanner.Close()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case msg, ok := <-scanner.Chan():
if !ok {
return nil, scanner.Error()
}
if msg.TimeTick() <= s.exclusiveStartTimeTick {
// filter out messages whose time tick is less than or equal to the exclusive start time tick
// to remove duplicates when we switch from tailing mode back to catchup mode.
continue
}
if err := s.HandleMessage(ctx, msg); err != nil {
return nil, err
}
if msg.MessageType() != message.MessageTypeTimeTick {
continue
}
// Here's a timetick message from the scanner; switch to tailing read if we have caught up with the write ahead buffer.
if reader, err := s.writeAheadBuffer.ReadFromExclusiveTimeTick(ctx, msg.TimeTick()); err == nil {
s.logger.Info(
"scanner consuming was interrpted because catup done",
zap.Uint64("timetick", msg.TimeTick()),
zap.Stringer("messageID", msg.MessageID()),
zap.Stringer("lastConfirmedMessageID", msg.LastConfirmedMessageID()),
)
return &tailingScanner{
switchableScannerImpl: s.switchableScannerImpl,
reader: reader,
lastConsumedMessage: msg,
}, nil
}
}
}
}
func (s *catchupScanner) createReaderWithBackoff(ctx context.Context, deliverPolicy options.DeliverPolicy) (walimpls.ScannerImpls, error) {
backoffTimer := typeutil.NewBackoffTimer(typeutil.BackoffTimerConfig{
Default: 5 * time.Second,
Backoff: typeutil.BackoffConfig{
InitialInterval: 100 * time.Millisecond,
Multiplier: 2.0,
MaxInterval: 5 * time.Second,
},
})
backoffTimer.EnableBackoff()
for {
innerScanner, err := s.innerWAL.Read(ctx, walimpls.ReadOption{
Name: s.scannerName,
DeliverPolicy: deliverPolicy,
})
if err == nil {
return innerScanner, nil
}
if ctx.Err() != nil {
// The scanner is closing, so stop the backoff.
return nil, ctx.Err()
}
waker, nextInterval := backoffTimer.NextTimer()
s.logger.Warn("create underlying scanner for wal scanner, start a backoff",
zap.Duration("nextInterval", nextInterval),
zap.Error(err),
)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-waker:
}
}
}
// tailingScanner performs a tailing read from the write ahead buffer of the wal.
type tailingScanner struct {
switchableScannerImpl
reader *wab.WriteAheadBufferReader
lastConsumedMessage message.ImmutableMessage
}
func (s *tailingScanner) Mode() string {
return "Tailing"
}
func (s *tailingScanner) Do(ctx context.Context) (switchableScanner, error) {
for {
msg, err := s.reader.Next(ctx)
if errors.Is(err, wab.ErrEvicted) {
// The tailing read failed, switch into catchup mode.
s.logger.Info(
"scanner consuming was interrupted because of tailing eviction",
zap.Uint64("timetick", s.lastConsumedMessage.TimeTick()),
zap.Stringer("messageID", s.lastConsumedMessage.MessageID()),
zap.Stringer("lastConfirmedMessageID", s.lastConsumedMessage.LastConfirmedMessageID()),
)
return &catchupScanner{
switchableScannerImpl: s.switchableScannerImpl,
deliverPolicy: options.DeliverPolicyStartFrom(s.lastConsumedMessage.LastConfirmedMessageID()),
exclusiveStartTimeTick: s.lastConsumedMessage.TimeTick(),
}, nil
}
if err != nil {
return nil, err
}
if err := s.HandleMessage(ctx, msg); err != nil {
return nil, err
}
s.lastConsumedMessage = msg
}
}
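
Taken together, the two modes form a small state machine: Catchup hands off to Tailing once a timetick message shows the write ahead buffer has been reached, and Tailing hands back to Catchup when its reader is evicted. A hedged sketch of the driving loop, essentially what produceEventLoop in scanner_adaptor.go above does; driveScanner is a hypothetical name:

package adaptor // sketch only, not part of this commit

import "context"

// driveScanner runs a switchable scanner until its context is canceled,
// mirroring produceEventLoop in scanner_adaptor.go from this diff.
func driveScanner(ctx context.Context, scanner switchableScanner) error {
    for {
        next, err := scanner.Do(ctx) // blocks until the mode switches or an error occurs
        if err != nil {
            return err // context cancellation or an unrecoverable error
        }
        scanner = next // Catchup <-> Tailing handoff
    }
}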

View File

@ -111,7 +111,6 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage)
func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) {
if notPersistHint := utility.GetNotPersisted(ctx); notPersistHint != nil {
// do not persist the message if the hint is set.
// only used by time tick sync operator.
return notPersistHint.MessageID, nil
}
metricsGuard.StartWALImplAppend()

View File

@ -9,14 +9,15 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/internal/mocks/mock_metastore"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/mock_wab"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/interceptors/timetick/mock_inspector"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
@ -27,14 +28,32 @@ import (
)
func TestWalAdaptorReadFail(t *testing.T) {
resource.InitForTest(t)
l := mock_walimpls.NewMockWALImpls(t)
expectedErr := errors.New("test")
l.EXPECT().WALName().Return("test")
l.EXPECT().Channel().Return(types.PChannelInfo{})
cnt := atomic.NewInt64(2)
l.EXPECT().Read(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, ro walimpls.ReadOption) (walimpls.ScannerImpls, error) {
if cnt.Dec() < 0 {
s := mock_walimpls.NewMockScannerImpls(t)
s.EXPECT().Chan().Return(make(chan message.ImmutableMessage, 1))
s.EXPECT().Close().Return(nil)
return s, nil
}
return nil, expectedErr
})
}).Maybe()
writeAheadBuffer := mock_wab.NewMockROWriteAheadBuffer(t)
operator := mock_inspector.NewMockTimeTickSyncOperator(t)
operator.EXPECT().Channel().Return(types.PChannelInfo{}).Maybe()
operator.EXPECT().Sync(mock.Anything).Return().Maybe()
operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(writeAheadBuffer, nil).Maybe()
resource.Resource().TimeTickInspector().RegisterSyncOperator(
operator,
)
lAdapted := adaptImplsToWAL(l, nil, func() {})
scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{
@ -42,7 +61,8 @@ func TestWalAdaptorReadFail(t *testing.T) {
})
assert.NoError(t, err)
assert.NotNil(t, scanner)
assert.ErrorIs(t, scanner.Error(), expectedErr)
time.Sleep(time.Second)
scanner.Close()
}
func TestWALAdaptor(t *testing.T) {
@ -52,9 +72,10 @@ func TestWALAdaptor(t *testing.T) {
resource.InitForTest(t, resource.OptStreamingNodeCatalog(snMeta))
operator := mock_inspector.NewMockTimeTickSyncOperator(t)
operator.EXPECT().TimeTickNotifier().Return(inspector.NewTimeTickNotifier())
operator.EXPECT().Channel().Return(types.PChannelInfo{})
operator.EXPECT().Sync(mock.Anything).Return()
buffer := mock_wab.NewMockROWriteAheadBuffer(t)
operator.EXPECT().WriteAheadBuffer(mock.Anything).Return(buffer, nil)
resource.Resource().TimeTickInspector().RegisterSyncOperator(operator)
// Create a mock WAL implementation

View File

@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
@ -60,6 +61,11 @@ func TestWAL(t *testing.T) {
}
func initResourceForTest(t *testing.T) {
paramtable.Init()
params := paramtable.Get()
params.Save(params.StreamingCfg.WALWriteAheadBufferKeepalive.Key, "500ms")
params.Save(params.StreamingCfg.WALWriteAheadBufferCapacity.Key, "10k")
rc := idalloc.NewMockRootCoordClient(t)
rc.EXPECT().GetPChannelInfo(mock.Anything, mock.Anything).Return(&rootcoordpb.GetPChannelInfoResponse{}, nil)
@ -345,8 +351,14 @@ func (f *testOneWALFramework) testRead(ctx context.Context, w wal.WAL) ([]messag
expectedCnt := f.messageCount + len(f.written)
msgs := make([]message.ImmutableMessage, 0, expectedCnt)
cnt := 5
for {
msg, ok := <-s.Chan()
// make a random slowdown to trigger cache expiration.
if rand.Int31n(10) == 0 && cnt > 0 {
cnt--
time.Sleep(time.Duration(rand.Int31n(500)+100) * time.Millisecond)
}
if msg.MessageType() != message.MessageTypeInsert && msg.MessageType() != message.MessageTypeTxn {
continue
}

View File

@ -17,6 +17,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -200,8 +201,10 @@ func TestAckManager(t *testing.T) {
time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond)
id, err := resource.Resource().TSOAllocator().Allocate(ctx)
assert.NoError(t, err)
msg := mock_message.NewMockImmutableMessage(t)
msg.EXPECT().MessageID().Return(walimplstest.NewTestMessageID(int64(id))).Maybe()
ts.Ack(
OptMessageID(walimplstest.NewTestMessageID(int64(id))),
OptImmutableMessage(msg),
)
}()
}
@ -216,9 +219,9 @@ func TestAckManager(t *testing.T) {
time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond)
id, err := resource.Resource().TSOAllocator().Allocate(ctx)
assert.NoError(t, err)
ts.Ack(
OptMessageID(walimplstest.NewTestMessageID(int64(id))),
)
msg := mock_message.NewMockImmutableMessage(t)
msg.EXPECT().MessageID().Return(walimplstest.NewTestMessageID(int64(id))).Maybe()
ts.Ack(OptImmutableMessage(msg))
}(i)
}
wg.Wait()

View File

@ -26,7 +26,7 @@ type AckDetail struct {
EndTimestamp uint64 // the timestamp when acker is acknowledged.
// for avoiding allocation of timestamp failure, the timestamp will use the ack manager last allocated timestamp.
LastConfirmedMessageID message.MessageID
MessageID message.MessageID
Message message.ImmutableMessage
TxnSession *txn.TxnSession
IsSync bool
Err error
@ -49,10 +49,10 @@ func OptError(err error) AckOption {
}
}
// OptMessageID marks the message id for acker.
func OptMessageID(messageID message.MessageID) AckOption {
// OptImmutableMessage sets the immutable message for the acker.
func OptImmutableMessage(msg message.ImmutableMessage) AckOption {
return func(detail *AckDetail) {
detail.MessageID = messageID
detail.Message = msg
}
}

View File

@ -28,8 +28,9 @@ func TestDetail(t *testing.T) {
OptError(errors.New("test"))(ackDetail)
assert.Error(t, ackDetail.Err)
OptMessageID(walimplstest.NewTestMessageID(1))(ackDetail)
assert.NotNil(t, ackDetail.MessageID)
msg := mock_message.NewMockImmutableMessage(t)
OptImmutableMessage(msg)(ackDetail)
assert.NotNil(t, ackDetail.Message)
OptTxnSession(&txn.TxnSession{})(ackDetail)
assert.NotNil(t, ackDetail.TxnSession)

View File

@ -33,7 +33,7 @@ func (m *lastConfirmedManager) AddConfirmedDetails(details sortedDetails, ts uin
}
m.notDoneTxnMessage.Push(&uncommittedTxnInfo{
session: detail.TxnSession,
messageID: detail.MessageID,
messageID: detail.Message.MessageID(),
})
}
m.updateLastConfirmedMessageID(ts)

View File

@ -3,15 +3,17 @@ package inspector
import (
"context"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
)
type TimeTickSyncOperator interface {
TimeTickNotifier() *TimeTickNotifier
// Channel returns the pchannel info.
Channel() types.PChannelInfo
// WriteAheadBuffer gets the related write ahead buffer.
WriteAheadBuffer(ctx context.Context) (wab.ROWriteAheadBuffer, error)
// Sync triggers a sync operation and tries to send the timetick message into the wal.
// Sync is a blocking operation, not thread-safe, and should only be called in one goroutine.
Sync(ctx context.Context)

View File

@ -3,7 +3,6 @@ package inspector
import (
"sync"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -50,81 +49,3 @@ func (n *syncNotifier) Get() typeutil.Set[types.PChannelInfo] {
n.cond.L.Unlock()
return signal
}
// TimeTickInfo records the information of time tick.
type TimeTickInfo struct {
MessageID message.MessageID // the message id.
TimeTick uint64 // the time tick.
LastConfirmedMessageID message.MessageID // the last confirmed message id.
// The time tick may be updated, without last timetickMessage
}
// IsZero returns true if the time tick info is zero.
func (t *TimeTickInfo) IsZero() bool {
return t.TimeTick == 0
}
// NewTimeTickNotifier creates a new time tick info listener.
func NewTimeTickNotifier() *TimeTickNotifier {
return &TimeTickNotifier{
cond: syncutil.NewContextCond(&sync.Mutex{}),
info: TimeTickInfo{},
}
}
// TimeTickNotifier is a listener for time tick info.
type TimeTickNotifier struct {
cond *syncutil.ContextCond
info TimeTickInfo
}
// Update only update the time tick info, but not notify the waiter.
func (l *TimeTickNotifier) Update(info TimeTickInfo) {
l.cond.L.Lock()
if l.info.IsZero() || l.info.MessageID.LT(info.MessageID) {
l.info = info
}
l.cond.L.Unlock()
}
// OnlyUpdateTs only updates the time tick, and notify the waiter.
func (l *TimeTickNotifier) OnlyUpdateTs(timetick uint64) {
l.cond.LockAndBroadcast()
if !l.info.IsZero() && l.info.TimeTick < timetick {
l.info.TimeTick = timetick
}
l.cond.L.Unlock()
}
// WatchAtMessageID watch the message id.
// If the message id is not equal to the last message id, return nil channel.
// Or if the time tick is less than the last time tick, return channel.
func (l *TimeTickNotifier) WatchAtMessageID(messageID message.MessageID, ts uint64) <-chan struct{} {
l.cond.L.Lock()
// If incoming messageID is less than the producer messageID,
// the consumer can read the new greater messageID from wal,
// so the watch operation is not necessary.
if l.info.IsZero() || messageID.LT(l.info.MessageID) {
l.cond.L.Unlock()
return nil
}
// messageID may be greater than MessageID in notifier.
// because consuming operation is fast than produce operation.
// so doing a listening here.
if ts < l.info.TimeTick {
ch := make(chan struct{})
close(ch)
l.cond.L.Unlock()
return ch
}
return l.cond.WaitChan()
}
// Get gets the time tick info.
func (l *TimeTickNotifier) Get() TimeTickInfo {
l.cond.L.Lock()
info := l.info
l.cond.L.Unlock()
return info
}

View File

@ -6,7 +6,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)
func TestSyncNotifier(t *testing.T) {
@ -40,36 +39,3 @@ func shouldBeBlocked(ch <-chan struct{}) {
default:
}
}
func TestTimeTickNotifier(t *testing.T) {
n := NewTimeTickNotifier()
info := n.Get()
assert.True(t, info.IsZero())
msgID := walimplstest.NewTestMessageID(1)
assert.Nil(t, n.WatchAtMessageID(msgID, 0))
n.Update(TimeTickInfo{
MessageID: msgID,
TimeTick: 2,
LastConfirmedMessageID: walimplstest.NewTestMessageID(0),
})
ch := n.WatchAtMessageID(msgID, 0)
assert.NotNil(t, ch)
<-ch // should not block.
ch = n.WatchAtMessageID(msgID, 2)
assert.NotNil(t, ch)
shouldBeBlocked(ch) // should block.
n.OnlyUpdateTs(3)
<-ch // should not block.
info = n.Get()
assert.Equal(t, uint64(3), info.TimeTick)
ch = n.WatchAtMessageID(msgID, 3)
n.Update(TimeTickInfo{
MessageID: walimplstest.NewTestMessageID(3),
TimeTick: 4,
})
shouldBeBlocked(ch)
}

View File

@ -59,7 +59,7 @@ func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message
return
}
acker.Ack(
ack.OptMessageID(msgID),
ack.OptImmutableMessage(msg.IntoImmutableMessage(msgID)),
ack.OptTxnSession(txnSession),
)
}()
@ -204,7 +204,6 @@ func (impl *timeTickAppendInterceptor) appendMsg(
if err != nil {
return nil, err
}
utility.ReplaceAppendResultTimeTick(ctx, msg.TimeTick())
utility.ReplaceAppendResultTxnContext(ctx, msg.TxnContext())
return msgID, nil

View File

@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/log"
@ -39,7 +40,6 @@ func newTimeTickSyncOperator(param interceptors.InterceptorBuildParam) *timeTick
ackManager: nil,
ackDetails: ack.NewAckDetails(),
sourceID: paramtable.GetNodeID(),
timeTickNotifier: inspector.NewTimeTickNotifier(),
metrics: metricsutil.NewTimeTickMetrics(param.WALImpls.Channel().Name),
}
}
@ -56,20 +56,28 @@ type timeTickSyncOperator struct {
ackManager *ack.AckManager // ack manager.
ackDetails *ack.AckDetails // all acknowledged details, all acked messages but not sent to wal will be kept here.
sourceID int64 // the current node id.
timeTickNotifier *inspector.TimeTickNotifier // used to notify the time tick change.
writeAheadBuffer *wab.WriteAheadBuffer // write ahead buffer.
metrics *metricsutil.TimeTickMetrics
}
// WriteAheadBuffer returns the write ahead buffer.
func (impl *timeTickSyncOperator) WriteAheadBuffer(ctx context.Context) (wab.ROWriteAheadBuffer, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-impl.ready:
}
if impl.writeAheadBuffer == nil {
panic("unreachable write ahead buffer is not ready")
}
return impl.writeAheadBuffer, nil
}
// Channel returns the pchannel info.
func (impl *timeTickSyncOperator) Channel() types.PChannelInfo {
return impl.pchannel
}
// TimeTickNotifier returns the time tick notifier.
func (impl *timeTickSyncOperator) TimeTickNotifier() *inspector.TimeTickNotifier {
return impl.timeTickNotifier
}
// Sync triggers a sync operation.
// Sync is not thread-safe, so call it in a single goroutine.
func (impl *timeTickSyncOperator) Sync(ctx context.Context) {
@ -143,7 +151,12 @@ func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error {
lastErr = errors.Wrap(err, "allocate timestamp failed")
continue
}
msgID, err := impl.sendPersistentTsMsg(impl.ctx, ts, nil, underlyingWALImpls.Append)
msg, err := NewTimeTickMsg(ts, nil, impl.sourceID)
if err != nil {
lastErr = errors.Wrap(err, "at build time tick msg")
continue
}
msgID, err := underlyingWALImpls.Append(impl.ctx, msg)
if err != nil {
lastErr = errors.Wrap(err, "send first timestamp message failed")
continue
@ -153,7 +166,15 @@ func (impl *timeTickSyncOperator) blockUntilSyncTimeTickReady() error {
impl.logger.Info(
"send first time tick success",
zap.Uint64("timestamp", ts),
zap.String("messageID", msgID.String()))
zap.Stringer("messageID", msgID))
capacity := int(paramtable.Get().StreamingCfg.WALWriteAheadBufferCapacity.GetAsSize())
keepalive := paramtable.Get().StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse()
impl.writeAheadBuffer = wab.NewWirteAheadBuffer(
impl.logger,
capacity,
keepalive,
msg.IntoImmutableMessage(msgID),
)
break
}
// interceptor is ready now.
@ -202,31 +223,36 @@ func (impl *timeTickSyncOperator) sendTsMsg(ctx context.Context, appender func(c
// Construct time tick message.
ts := impl.ackDetails.LastAllAcknowledgedTimestamp()
lastConfirmedMessageID := impl.ackDetails.EarliestLastConfirmedMessageID()
persist := !impl.ackDetails.IsNoPersistedMessage()
if impl.ackDetails.IsNoPersistedMessage() {
// there's no persisted message, so no need to send persistent time tick message.
return impl.sendNoPersistentTsMsg(ctx, ts, appender)
}
// otherwise, send persistent time tick message.
_, err := impl.sendPersistentTsMsg(ctx, ts, lastConfirmedMessageID, appender)
return err
return impl.sendTsMsgToWAL(ctx, ts, lastConfirmedMessageID, persist, appender)
}
// sendTsMsgToWAL sends the time tick message to the wal, persistent or not.
func (impl *timeTickSyncOperator) sendPersistentTsMsg(ctx context.Context,
func (impl *timeTickSyncOperator) sendTsMsgToWAL(ctx context.Context,
ts uint64,
lastConfirmedMessageID message.MessageID,
persist bool,
appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error),
) (message.MessageID, error) {
) error {
msg, err := NewTimeTickMsg(ts, lastConfirmedMessageID, impl.sourceID)
if err != nil {
return nil, errors.Wrap(err, "at build time tick msg")
return errors.Wrap(err, "at build time tick msg")
}
if !persist {
// there's no persisted message, so no need to send persistent time tick message.
// With the hint of not persisted message, the underlying wal will not persist it.
// but the interceptors will still be triggered.
ctx = utility.WithNotPersisted(ctx, &utility.NotPersistedHint{
MessageID: lastConfirmedMessageID,
})
}
// Append it to wal.
msgID, err := appender(ctx, msg)
if err != nil {
return nil, errors.Wrapf(err,
return errors.Wrapf(err,
"append time tick msg to wal failed, timestamp: %d, previous message counter: %d",
impl.ackDetails.LastAllAcknowledgedTimestamp(),
impl.ackDetails.Len(),
@ -234,54 +260,20 @@ func (impl *timeTickSyncOperator) sendPersistentTsMsg(ctx context.Context,
}
// metrics updates
impl.metrics.CountPersistentTimeTickSync(ts)
impl.metrics.CountTimeTickSync(ts, persist)
msgs := make([]message.ImmutableMessage, 0, impl.ackDetails.Len())
impl.ackDetails.Range(func(detail *ack.AckDetail) bool {
impl.metrics.CountSyncTimeTick(detail.IsSync)
if !detail.IsSync && detail.Err == nil {
msgs = append(msgs, detail.Message)
}
return true
})
// Ack details has been committed to wal, clear it.
impl.ackDetails.Clear()
// Update last time tick message id and time tick.
impl.timeTickNotifier.Update(inspector.TimeTickInfo{
MessageID: msgID,
TimeTick: ts,
})
return msgID, nil
}
// sendNoPersistentTsMsg sends no persistent time tick message to wal.
func (impl *timeTickSyncOperator) sendNoPersistentTsMsg(ctx context.Context, ts uint64, appender func(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)) error {
msg, err := NewTimeTickMsg(ts, nil, impl.sourceID)
if err != nil {
return errors.Wrap(err, "at build time tick msg when send no persist msg")
}
// with the hint of not persisted message, the underlying wal will not persist it.
// but the interceptors will still be triggered.
ctx = utility.WithNotPersisted(ctx, &utility.NotPersistedHint{
MessageID: impl.timeTickNotifier.Get().MessageID,
})
// Append it to wal.
_, err = appender(ctx, msg)
if err != nil {
return errors.Wrapf(err,
"append no persist time tick msg to wal failed, timestamp: %d, previous message counter: %d",
impl.ackDetails.LastAllAcknowledgedTimestamp(),
impl.ackDetails.Len(),
)
}
// metrics updates.
impl.metrics.CountMemoryTimeTickSync(ts)
impl.ackDetails.Range(func(detail *ack.AckDetail) bool {
impl.metrics.CountSyncTimeTick(detail.IsSync)
return true
})
// Ack details has been committed to wal, clear it.
impl.ackDetails.Clear()
// Only update time tick.
impl.timeTickNotifier.OnlyUpdateTs(ts)
tsMsg := msg.IntoImmutableMessage(msgID)
// Add it into write ahead buffer.
impl.writeAheadBuffer.Append(msgs, tsMsg)
return nil
}

View File

@ -12,7 +12,6 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/ack"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
"github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
@ -66,40 +65,23 @@ func TestTimeTickSyncOperator(t *testing.T) {
ctx := context.Background()
ts, err := resource.Resource().TSOAllocator().Allocate(ctx)
assert.NoError(t, err)
ch := operator.TimeTickNotifier().WatchAtMessageID(msgID, ts)
shouldBlock(ch)
wb, err := operator.WriteAheadBuffer(ctx)
assert.NoError(t, err)
ctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
defer cancel()
r, err := wb.ReadFromExclusiveTimeTick(ctx, ts)
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.Nil(t, r)
// should not trigger any wal operation, but only update the timetick.
operator.Sync(ctx)
// should not block because timetick updates.
<-ch
// Test alloc a real message but not ack.
// because the timetick message id is updated, so the old watcher should be invalidated.
ch = operator.TimeTickNotifier().WatchAtMessageID(msgID, operator.TimeTickNotifier().Get().TimeTick)
shouldBlock(ch)
acker, err := operator.AckManager().Allocate(ctx)
r, err = wb.ReadFromExclusiveTimeTick(context.Background(), ts)
assert.NoError(t, err)
// should block timetick notifier.
ts, _ = resource.Resource().TSOAllocator().Allocate(ctx)
ch = operator.TimeTickNotifier().WatchAtMessageID(walimplstest.NewTestMessageID(2), ts)
shouldBlock(ch)
// sync operation just do nothing, so there's no wal operation triggered.
operator.Sync(ctx)
// After ack, a wal operation will be trigger.
acker.Ack(ack.OptMessageID(msgID), ack.OptTxnSession(nil))
l.EXPECT().Append(mock.Anything, mock.Anything).Unset()
l.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) {
ts, _ := resource.Resource().TSOAllocator().Allocate(ctx)
return &types.AppendResult{
MessageID: walimplstest.NewTestMessageID(2),
TimeTick: ts,
}, nil
})
// should trigger a wal operation.
operator.Sync(ctx)
// ch should still be blocked, because the timetick message id is updated, old message id watch is not notified.
shouldBlock(ch)
// should not block because timetick updates.
msg, err := r.Next(context.Background())
assert.NoError(t, err)
assert.NotNil(t, msg)
assert.Greater(t, msg.TimeTick(), ts)
}
func shouldBlock(ch <-chan struct{}) {

View File

@ -0,0 +1,176 @@
package wab
import (
"io"
"time"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
// ErrEvicted is returned when the expected message has been evicted.
var ErrEvicted = errors.New("message has been evicted")
// messageWithOffset is a message with an offset as a unique continuous identifier.
type messageWithOffset struct {
Message message.ImmutableMessage
Offset int
Eviction time.Time
}
// newPendingQueue creates a new pendingQueue with given configuration
func newPendingQueue(capacity int, keepAlive time.Duration, lastConfirmedMessage message.ImmutableMessage) *pendingQueue {
pq := &pendingQueue{
lastTimeTick: 0,
latestOffset: -1,
buf: make([]messageWithOffset, 0, 10),
size: 0,
capacity: capacity,
keepAlive: keepAlive,
}
pq.Push([]message.ImmutableMessage{lastConfirmedMessage})
return pq
}
// pendingQueue is a buffer that stores messages in order of time tick.
// pendingQueue only keeps the persisted messages in the buffer.
type pendingQueue struct {
lastTimeTick uint64
latestOffset int
buf []messageWithOffset
size int
capacity int
keepAlive time.Duration
}
// Push adds messages to the buffer.
func (q *pendingQueue) Push(msgs []message.ImmutableMessage) {
now := time.Now()
for _, msg := range msgs {
q.pushOne(msg, now)
}
q.evict(now)
}
// Evict removes messages that have been in the buffer for longer than the keepAlive duration.
func (q *pendingQueue) Evict() {
q.evict(time.Now())
}
// CurrentOffset returns the latest offset assigned in the buffer.
func (q *pendingQueue) CurrentOffset() int {
return q.latestOffset
}
// pushOne adds a message to the buffer.
func (q *pendingQueue) pushOne(msg message.ImmutableMessage, now time.Time) {
if msg.Version().EQ(message.VersionOld) {
panic("old message version is not supported")
}
if (msg.MessageType() == message.MessageTypeTimeTick && msg.TimeTick() < q.lastTimeTick) ||
(msg.MessageType() != message.MessageTypeTimeTick && msg.TimeTick() <= q.lastTimeTick) {
// only a timetick message may repeat the last time tick.
panic("message time tick is not in ascending order")
}
q.latestOffset++
q.buf = append(q.buf, messageWithOffset{
Offset: q.latestOffset,
Message: msg,
Eviction: now.Add(q.keepAlive),
})
q.size += msg.EstimateSize()
q.lastTimeTick = msg.TimeTick()
}
// CreateSnapshotFromOffset creates a snapshot of the buffer from the given offset.
// The continuous slice of messages after [offset, ...] will be returned.
func (q *pendingQueue) CreateSnapshotFromOffset(offset int) ([]messageWithOffset, error) {
if offset > q.latestOffset {
if offset != q.latestOffset+1 {
panic("unreachable: bug here, the offset is not continuous")
}
// If the given offset has not been generated yet, we reach the end of the buffer.
// Return io.EOF to perform a block operation.
return nil, io.EOF
}
if len(q.buf) == 0 || offset < q.buf[0].Offset {
// The expected offset is out of range; the expected messages have been evicted.
// So return ErrEvicted to indicate an unrecoverable operation.
return nil, ErrEvicted
}
// Find the index of the expected offset in the buffer.
idx := offset - q.buf[0].Offset
return q.makeSnapshot(idx), nil
}
// CreateSnapshotFromExclusiveTimeTick creates a snapshot of the buffer from the given timetick.
// The continuous slice of messages after (timeTick, ...] will be returned.
func (q *pendingQueue) CreateSnapshotFromExclusiveTimeTick(timeTick uint64) ([]messageWithOffset, error) {
if timeTick >= q.lastTimeTick {
// If the given timetick is a timetick that has not been generated yet, we reach the end of the buffer.
// Return io.EOF to perform a block operation.
return nil, io.EOF
}
if len(q.buf) == 0 || timeTick < q.buf[0].Message.TimeTick() {
// The expected timetick is out of range; the expected messages may have been evicted.
// So return ErrEvicted to indicate an unrecoverable operation.
return nil, ErrEvicted
}
// Find the index of the expected timetick in the buffer.
idx := lowerboundOfMessageList(q.buf, timeTick)
return q.makeSnapshot(idx), nil
}
// makeSnapshot creates a snapshot of the buffer from the given offset.
func (q *pendingQueue) makeSnapshot(idx int) []messageWithOffset {
snapshot := make([]messageWithOffset, len(q.buf)-idx) // copy the tail of the buffer starting at idx.
copy(snapshot, q.buf[idx:])
return snapshot
}
// evict removes messages that have been in the buffer for longer than the keepAlive duration.
func (q *pendingQueue) evict(now time.Time) {
releaseUntilIdx := -1
needRelease := 0
if q.size > q.capacity {
needRelease = q.size - q.capacity
}
for i := 0; i < len(q.buf); i++ {
if q.buf[i].Eviction.Before(now) || needRelease > 0 {
releaseUntilIdx = i
needRelease -= q.buf[i].Message.EstimateSize()
} else {
break
}
}
preservedIdx := releaseUntilIdx + 1
if preservedIdx > 0 {
for i := 0; i < preservedIdx; i++ {
// reset the message as zero to release the resource.
q.size -= q.buf[i].Message.EstimateSize()
q.buf[i] = messageWithOffset{}
}
q.buf = q.buf[preservedIdx:]
}
}
// lowerboundOfMessageList returns the index of the first message whose time tick is greater than the given timetick, or -1 if there is none.
func lowerboundOfMessageList(data []messageWithOffset, timetick uint64) int {
// perform a lowerbound search here.
left, right := 0, len(data)-1
result := -1
for left <= right {
mid := (left + right) / 2
if data[mid].Message.TimeTick() > timetick {
result = mid
right = mid - 1
} else {
left = mid + 1
}
}
return result
}

View File

@ -0,0 +1,138 @@
package wab
import (
"io"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
func TestPendingQueue(t *testing.T) {
pq := newPendingQueue(100, 5*time.Second, newImmutableTimeTickMessage(t, 99))
snapshot, err := pq.CreateSnapshotFromOffset(0)
assert.NoError(t, err)
assert.Len(t, snapshot, 1)
snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(100)
assert.ErrorIs(t, err, io.EOF)
assert.Nil(t, snapshot)
pq.Push([]message.ImmutableMessage{
newImmutableMessage(t, 100, 10),
newImmutableMessage(t, 101, 20),
newImmutableMessage(t, 103, 30),
newImmutableMessage(t, 104, 40),
})
assert.Equal(t, pq.CurrentOffset(), 4)
assert.Len(t, pq.buf, 5)
snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(100)
assert.NoError(t, err)
assert.Len(t, snapshot, 3)
assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(101))
assert.Equal(t, snapshot[2].Message.TimeTick(), uint64(104))
snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(98)
assert.ErrorIs(t, err, ErrEvicted)
assert.Nil(t, snapshot)
snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(102)
assert.NoError(t, err)
assert.Len(t, snapshot, 2)
assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(103))
assert.Equal(t, snapshot[1].Message.TimeTick(), uint64(104))
snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(104)
assert.ErrorIs(t, err, io.EOF)
assert.Nil(t, snapshot)
snapshot, err = pq.CreateSnapshotFromExclusiveTimeTick(105)
assert.ErrorIs(t, err, io.EOF)
assert.Nil(t, snapshot)
snapshot, err = pq.CreateSnapshotFromOffset(1)
assert.NoError(t, err)
assert.Len(t, snapshot, 4)
assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(100))
assert.Equal(t, snapshot[3].Message.TimeTick(), uint64(104))
snapshot, err = pq.CreateSnapshotFromOffset(3)
assert.NoError(t, err)
assert.Len(t, snapshot, 2)
assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(103))
assert.Equal(t, snapshot[1].Message.TimeTick(), uint64(104))
snapshot, err = pq.CreateSnapshotFromOffset(4)
assert.NoError(t, err)
assert.Len(t, snapshot, 1)
assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(104))
snapshot, err = pq.CreateSnapshotFromOffset(5)
assert.ErrorIs(t, err, io.EOF)
assert.Nil(t, snapshot)
// Take a snapshot first; pushing a new item will trigger eviction.
snapshot, err = pq.CreateSnapshotFromOffset(1)
assert.NoError(t, err)
assert.Len(t, snapshot, 4)
pq.Push([]message.ImmutableMessage{
newImmutableMessage(t, 105, 60),
})
assert.Equal(t, pq.CurrentOffset(), 5)
assert.Len(t, pq.buf, 2)
// Previous snapshot should be available.
assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(100))
assert.Equal(t, snapshot[1].Message.TimeTick(), uint64(101))
assert.Equal(t, snapshot[2].Message.TimeTick(), uint64(103))
assert.Equal(t, snapshot[3].Message.TimeTick(), uint64(104))
// Offset 3 has been evicted.
snapshot, err = pq.CreateSnapshotFromOffset(3)
assert.ErrorIs(t, err, ErrEvicted)
assert.Nil(t, snapshot)
// Offset 4 should still be readable.
snapshot, err = pq.CreateSnapshotFromOffset(4)
assert.NoError(t, err)
assert.Len(t, snapshot, 2)
assert.Equal(t, snapshot[0].Message.TimeTick(), uint64(104))
assert.Equal(t, snapshot[1].Message.TimeTick(), uint64(105))
// Test time-based eviction.
pq = newPendingQueue(100, 10*time.Millisecond, newImmutableTimeTickMessage(t, 99))
pq.Push([]message.ImmutableMessage{
newImmutableMessage(t, 100, 10),
})
assert.Equal(t, pq.CurrentOffset(), 1)
assert.Len(t, pq.buf, 2)
time.Sleep(20 * time.Millisecond)
pq.Evict()
assert.Len(t, pq.buf, 0)
assert.Panics(t, func() {
pq.Push([]message.ImmutableMessage{newImmutableMessage(t, 99, 10)})
})
}
func newImmutableMessage(t *testing.T, timetick uint64, estimateSize int) message.ImmutableMessage {
msg := mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(timetick).Maybe()
msg.EXPECT().EstimateSize().Return(estimateSize).Maybe()
msg.EXPECT().Version().Return(message.VersionV1).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe()
return msg
}
func newImmutableTimeTickMessage(t *testing.T, timetick uint64) message.ImmutableMessage {
msg := mock_message.NewMockImmutableMessage(t)
msg.EXPECT().TimeTick().Return(timetick).Maybe()
msg.EXPECT().EstimateSize().Return(0).Maybe()
msg.EXPECT().MessageType().Return(message.MessageTypeTimeTick).Maybe()
msg.EXPECT().Version().Return(message.VersionV1).Maybe()
return msg
}

View File

@ -0,0 +1,46 @@
package wab
import (
"context"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
// WriteAheadBufferReader is used to read messages from WriteAheadBuffer.
type WriteAheadBufferReader struct {
nextOffset int
lastTimeTick uint64
snapshot []messageWithOffset
underlyingBuf *WriteAheadBuffer
}
// Next returns the next message in the buffer.
func (r *WriteAheadBufferReader) Next(ctx context.Context) (message.ImmutableMessage, error) {
// Consume snapshot first.
if msg := r.nextFromSnapshot(); msg != nil {
return msg, nil
}
snapshot, err := r.underlyingBuf.createSnapshotFromOffset(ctx, r.nextOffset, r.lastTimeTick)
if err != nil {
return nil, err
}
r.snapshot = snapshot
return r.nextFromSnapshot(), nil
}
// nextFromSnapshot returns the next message from the snapshot.
func (r *WriteAheadBufferReader) nextFromSnapshot() message.ImmutableMessage {
if len(r.snapshot) == 0 {
return nil
}
nextMsg := r.snapshot[0]
newNextOffset := nextMsg.Offset + 1
if newNextOffset < r.nextOffset {
panic("unreachable: next offset should be monotonically increasing")
}
r.nextOffset = newNextOffset
r.lastTimeTick = nextMsg.Message.TimeTick()
r.snapshot = r.snapshot[1:]
return nextMsg.Message
}
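A minimal tailing-read sketch built on the reader above, assuming a *WriteAheadBuffer named wb that the write path keeps appending to; handle is a placeholder for the downstream consumer:
func tailRead(ctx context.Context, wb *WriteAheadBuffer, fromTimeTick uint64, handle func(message.ImmutableMessage)) error {
	reader, err := wb.ReadFromExclusiveTimeTick(ctx, fromTimeTick)
	if err != nil {
		return err // e.g. ErrEvicted: the time tick is no longer buffered, so a catch-up read is required.
	}
	for {
		msg, err := reader.Next(ctx) // Blocks until a newer message or time tick arrives.
		if err != nil {
			return err
		}
		handle(msg)
	}
}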

View File

@ -0,0 +1,158 @@
package wab
import (
"context"
"io"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
var _ ROWriteAheadBuffer = (*WriteAheadBuffer)(nil)
// ROWriteAheadBuffer is the interface of the read-only write-ahead buffer.
type ROWriteAheadBuffer interface {
// ReadFromExclusiveTimeTick reads messages from the buffer from the exclusive time tick.
// A reader is returned if the time tick can be consumed from the write-ahead buffer; otherwise an error is returned.
ReadFromExclusiveTimeTick(ctx context.Context, timetick uint64) (*WriteAheadBufferReader, error)
}
// NewWriteAheadBuffer creates a new WriteAheadBuffer.
func NewWriteAheadBuffer(
logger *log.MLogger,
capacity int,
keepalive time.Duration,
lastConfirmedTimeTickMessage message.ImmutableMessage,
) *WriteAheadBuffer {
return &WriteAheadBuffer{
logger: logger,
cond: syncutil.NewContextCond(&sync.Mutex{}),
pendingMessages: newPendingQueue(capacity, keepalive, lastConfirmedTimeTickMessage),
lastTimeTickMessage: lastConfirmedTimeTickMessage,
}
}
// WriteAheadBuffer is a buffer that stores messages in order of time tick.
type WriteAheadBuffer struct {
logger *log.MLogger
cond *syncutil.ContextCond
pendingMessages *pendingQueue // Pending messages are always sorted by time tick in ascending order;
// only persisted messages are kept in the buffer.
lastTimeTickMessage message.ImmutableMessage
}
// Append appends a message to the buffer.
func (w *WriteAheadBuffer) Append(msgs []message.ImmutableMessage, tsMsg message.ImmutableMessage) {
w.cond.LockAndBroadcast()
defer w.cond.L.Unlock()
if tsMsg.MessageType() != message.MessageTypeTimeTick {
panic("the message is not a time tick message")
}
if tsMsg.TimeTick() <= w.lastTimeTickMessage.TimeTick() {
panic("the time tick of the message is less or equal than the last time tick message")
}
if len(msgs) > 0 {
if msgs[0].TimeTick() <= w.lastTimeTickMessage.TimeTick() {
panic("the time tick of the message is less than or equal to the last time tick message")
}
if msgs[len(msgs)-1].TimeTick() > tsMsg.TimeTick() {
panic("the time tick of the message is greater than the time tick message")
}
// If len(msgs) > 0, the tsMsg is a persisted message.
w.pendingMessages.Push(msgs)
w.pendingMessages.Push([]message.ImmutableMessage{tsMsg})
} else {
w.pendingMessages.Evict()
}
w.lastTimeTickMessage = tsMsg
}
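A hedged sketch of the resulting write-side contract (wb, msgs, tsMsg, and nextTsMsg are illustrative):
// Batch append: msgs must be sorted by time tick and bounded above by
// tsMsg.TimeTick(); violating either invariant panics inside Append.
wb.Append(msgs, tsMsg)
// Trivial sync: no payload, just advance the time tick; Append evicts expired entries.
wb.Append(nil, nextTsMsg)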
// ReadFromExclusiveTimeTick reads messages from the buffer from the exclusive time tick.
func (w *WriteAheadBuffer) ReadFromExclusiveTimeTick(ctx context.Context, timetick uint64) (*WriteAheadBufferReader, error) {
snapshot, nextOffset, err := w.createSnapshotFromTimeTick(ctx, timetick)
if err != nil {
return nil, err
}
return &WriteAheadBufferReader{
nextOffset: nextOffset,
snapshot: snapshot,
underlyingBuf: w,
}, nil
}
// createSnapshotFromOffset creates a snapshot of the buffer from the given offset.
func (w *WriteAheadBuffer) createSnapshotFromOffset(ctx context.Context, offset int, timeTick uint64) ([]messageWithOffset, error) {
w.cond.L.Lock()
for {
msgs, err := w.pendingMessages.CreateSnapshotFromOffset(offset)
if err == nil {
w.cond.L.Unlock()
return msgs, nil
}
if !errors.Is(err, io.EOF) {
w.cond.L.Unlock()
return nil, err
}
// The error is io.EOF, which means the requested offset is at the tail of the buffer.
// Check whether the last time tick is greater than the given time tick;
// if so, return it so the reader can advance its time tick.
// lastTimeTickMessage is never nil when this API is called.
if w.lastTimeTickMessage.TimeTick() > timeTick {
msg := messageWithOffset{
Message: w.lastTimeTickMessage,
Offset: w.pendingMessages.CurrentOffset(),
}
w.cond.L.Unlock()
return []messageWithOffset{msg}, nil
}
// Block until the buffer updates.
if err := w.cond.Wait(ctx); err != nil {
return nil, err
}
}
}
// createSnapshotFromTimeTick creates a snapshot of the buffer from the given time tick.
func (w *WriteAheadBuffer) createSnapshotFromTimeTick(ctx context.Context, timeTick uint64) ([]messageWithOffset, int, error) {
w.cond.L.Lock()
for {
msgs, err := w.pendingMessages.CreateSnapshotFromExclusiveTimeTick(timeTick)
if err == nil {
w.cond.L.Unlock()
return msgs, msgs[0].Offset, nil
}
if !errors.Is(err, io.EOF) {
w.cond.L.Unlock()
return nil, 0, err
}
// The error is io.EOF, which means the given time tick is at the tail of the buffer.
// The lastTimeTickMessage is always greater than or equal to the last time tick in the pending queue.
if w.lastTimeTickMessage.TimeTick() > timeTick {
// check if the last time tick is greater than the given time tick, return it to update the timetick.
msg := messageWithOffset{
Message: w.lastTimeTickMessage,
Offset: w.pendingMessages.CurrentOffset(), // We add an extra time tick message, so reuse the current offset.
}
w.cond.L.Unlock()
return []messageWithOffset{msg}, msg.Offset, nil
}
if w.lastTimeTickMessage.TimeTick() == timeTick {
offset := w.pendingMessages.CurrentOffset() + 1
w.cond.L.Unlock()
return nil, offset, nil
}
if err := w.cond.Wait(ctx); err != nil {
return nil, 0, err
}
}
}

View File

@ -0,0 +1,254 @@
package wab
import (
"context"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
func TestWriteAheadBufferWithOnlyTrivialTimeTick(t *testing.T) {
ctx := context.Background()
wb := NewWriteAheadBuffer(log.With(), 5*1024*1024, 30*time.Second, createTimeTickMessage(0))
// Test timeout
ctx, cancel := context.WithTimeout(ctx, 1*time.Millisecond)
defer cancel()
r, err := wb.ReadFromExclusiveTimeTick(ctx, 100)
assert.Nil(t, r)
assert.ErrorIs(t, err, context.DeadlineExceeded)
readErr := syncutil.NewFuture[struct{}]()
expectedLastTimeTick := uint64(10000)
go func() {
r, err := wb.ReadFromExclusiveTimeTick(context.Background(), 100)
assert.NoError(t, err)
assert.NotNil(t, r)
lastTimeTick := uint64(0)
for {
msg, err := r.Next(context.Background())
assert.NoError(t, err)
assert.NotNil(t, msg)
assert.Greater(t, msg.TimeTick(), lastTimeTick)
lastTimeTick = msg.TimeTick()
if msg.TimeTick() > expectedLastTimeTick {
break
}
}
// No more messages arrive after this point, so the Next operation should block until the context deadline.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()
msg, err := r.Next(ctx)
assert.Nil(t, msg)
assert.ErrorIs(t, err, context.DeadlineExceeded)
readErr.Set(struct{}{})
}()
// The cached last time tick is now pushed to 100, but the read at 100 is exclusive,
// so the read operation should still be blocked.
wb.Append(nil, createTimeTickMessage(100))
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()
_, err = readErr.GetWithContext(ctx)
assert.ErrorIs(t, err, context.DeadlineExceeded)
nextTimeTick := uint64(100)
for {
nextTimeTick += uint64(rand.Int31n(1000))
wb.Append(nil, createTimeTickMessage(nextTimeTick))
if nextTimeTick > expectedLastTimeTick {
break
}
}
readErr.Get()
r, err = wb.ReadFromExclusiveTimeTick(context.Background(), 0)
assert.NoError(t, err)
msg, err := r.Next(context.Background())
assert.NoError(t, err)
assert.Equal(t, message.MessageTypeTimeTick, msg.MessageType())
assert.Equal(t, nextTimeTick, msg.TimeTick())
}
func TestWriteAheadBuffer(t *testing.T) {
// Concurrently append messages into the buffer while syncing time ticks.
// The reader should never lose any message as long as no eviction happens.
wb := NewWriteAheadBuffer(log.With(), 5*1024*1024, 30*time.Second, createTimeTickMessage(1))
expectedLastTimeTick := uint64(10000)
ch := make(chan struct{})
totalCnt := 0
go func() {
defer close(ch)
nextTimeTick := uint64(100)
for {
msgs := make([]message.ImmutableMessage, 0)
for i := 0; i < int(rand.Int31n(10))+1; i++ {
nextTimeTick += uint64(rand.Int31n(100) + 1)
msgs = append(msgs, createInsertMessage(nextTimeTick))
if nextTimeTick > expectedLastTimeTick {
break
}
}
wb.Append(msgs, createTimeTickMessage(msgs[len(msgs)-1].TimeTick()))
totalCnt += (len(msgs) + 1)
if nextTimeTick > expectedLastTimeTick {
break
}
}
}()
r1, err := wb.ReadFromExclusiveTimeTick(context.Background(), 1)
assert.NoError(t, err)
assert.NotNil(t, r1)
lastTimeTick := uint64(0)
timeticks := make([]uint64, 0)
for {
msg, err := r1.Next(context.Background())
assert.NoError(t, err)
if msg.MessageType() == message.MessageTypeTimeTick {
assert.GreaterOrEqual(t, msg.TimeTick(), lastTimeTick)
} else {
assert.Greater(t, msg.TimeTick(), lastTimeTick)
}
lastTimeTick = msg.TimeTick()
timeticks = append(timeticks, msg.TimeTick())
if msg.TimeTick() > expectedLastTimeTick {
break
}
}
msg, err := r1.Next(context.Background())
// There should be a time tick message.
assert.NoError(t, err)
assert.Equal(t, message.MessageTypeTimeTick, msg.MessageType())
// Read again starting from the middle of the recorded time ticks.
<-ch
assert.Equal(t, totalCnt, len(timeticks))
targetTimeTickIdx := len(timeticks) / 2
for targetTimeTickIdx+1 < len(timeticks) && timeticks[targetTimeTickIdx+1] == timeticks[targetTimeTickIdx] {
targetTimeTickIdx++
}
targetTimeTick := timeticks[targetTimeTickIdx]
r2, err := wb.ReadFromExclusiveTimeTick(context.Background(), targetTimeTick)
assert.NoError(t, err)
assert.NotNil(t, r2)
lastTimeTick = uint64(0)
for i := 1; ; i++ {
msg, err := r2.Next(context.Background())
assert.NoError(t, err)
if msg.MessageType() == message.MessageTypeTimeTick {
assert.GreaterOrEqual(t, msg.TimeTick(), lastTimeTick)
} else {
assert.Greater(t, msg.TimeTick(), lastTimeTick)
}
lastTimeTick = msg.TimeTick()
assert.Equal(t, timeticks[targetTimeTickIdx+i], msg.TimeTick())
if msg.TimeTick() > expectedLastTimeTick {
break
}
}
msg, err = r2.Next(context.Background())
// There should be a time tick message.
assert.NoError(t, err)
assert.Equal(t, message.MessageTypeTimeTick, msg.MessageType())
rEvicted, err := wb.ReadFromExclusiveTimeTick(context.Background(), 0)
assert.Nil(t, rEvicted)
assert.ErrorIs(t, err, ErrEvicted)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
_, err = r1.Next(ctx)
assert.ErrorIs(t, err, context.DeadlineExceeded)
ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond)
defer cancel()
_, err = r2.Next(ctx)
assert.ErrorIs(t, err, context.DeadlineExceeded)
wb.Append(nil, createTimeTickMessage(timeticks[len(timeticks)-1]+1))
msg, err = r1.Next(ctx)
assert.Equal(t, message.MessageTypeTimeTick, msg.MessageType())
assert.NoError(t, err)
msg, err = r2.Next(ctx)
assert.Equal(t, message.MessageTypeTimeTick, msg.MessageType())
assert.NoError(t, err)
}
func TestWriteAheadBufferEviction(t *testing.T) {
wb := NewWriteAheadBuffer(log.With(), 5*1024*1024, 50*time.Millisecond, createTimeTickMessage(0))
msgs := make([]message.ImmutableMessage, 0)
for i := 1; i < 100; i++ {
msgs = append(msgs, createInsertMessage(uint64(i)))
}
wb.Append(msgs, createTimeTickMessage(99))
// We can read all messages after the exclusive time tick 0.
r, err := wb.ReadFromExclusiveTimeTick(context.Background(), 0)
assert.NoError(t, err)
assert.NotNil(t, r)
msg, err := r.Next(context.Background())
assert.NoError(t, err)
assert.Equal(t, msg.TimeTick(), uint64(1))
msgs = make([]message.ImmutableMessage, 0)
for i := 100; i < 200; i++ {
msgs = append(msgs, createInsertMessage(uint64(i)))
}
wb.Append(msgs, createTimeTickMessage(199))
time.Sleep(60 * time.Millisecond) // Wait for the keepalive to expire.
wb.Append(nil, createTimeTickMessage(200)) // The trivial sync triggers eviction of the expired entries.
lastTimeTick := uint64(0)
for {
msg, err := r.Next(context.Background())
if err != nil {
assert.ErrorIs(t, err, ErrEvicted)
break
}
if msg.MessageType() == message.MessageTypeTimeTick {
assert.GreaterOrEqual(t, msg.TimeTick(), lastTimeTick)
} else {
assert.Greater(t, msg.TimeTick(), lastTimeTick)
}
lastTimeTick = msg.TimeTick()
}
assert.Equal(t, uint64(99), lastTimeTick)
}
func createTimeTickMessage(timetick uint64) message.ImmutableMessage {
msg, err := message.NewTimeTickMessageBuilderV1().
WithAllVChannel().
WithHeader(&message.TimeTickMessageHeader{}).
WithBody(&msgpb.TimeTickMsg{}).
BuildMutable()
if err != nil {
panic(err)
}
return msg.WithTimeTick(timetick).IntoImmutableMessage(
walimplstest.NewTestMessageID(1),
)
}
func createInsertMessage(timetick uint64) message.ImmutableMessage {
msg, err := message.NewInsertMessageBuilderV1().
WithVChannel("vchannel").
WithHeader(&message.InsertMessageHeader{}).
WithBody(&msgpb.InsertRequest{}).
BuildMutable()
if err != nil {
panic(err)
}
return msg.WithTimeTick(timetick).IntoImmutableMessage(
walimplstest.NewTestMessageID(1),
)
}

View File

@ -82,21 +82,17 @@ func (m *TimeTickMetrics) CountSyncTimeTick(isSync bool) {
m.mu.Unlock()
}
func (m *TimeTickMetrics) CountMemoryTimeTickSync(ts uint64) {
func (m *TimeTickMetrics) CountTimeTickSync(ts uint64, persist bool) {
if !m.mu.LockIfNotClosed() {
return
}
m.nonPersistentTimeTickSyncCounter.Inc()
m.nonPersistentTimeTickSync.Set(tsoutil.PhysicalTimeSeconds(ts))
m.mu.Unlock()
}
func (m *TimeTickMetrics) CountPersistentTimeTickSync(ts uint64) {
if !m.mu.LockIfNotClosed() {
return
if persist {
m.persistentTimeTickSyncCounter.Inc()
m.persistentTimeTickSync.Set(tsoutil.PhysicalTimeSeconds(ts))
} else {
m.nonPersistentTimeTickSyncCounter.Inc()
m.nonPersistentTimeTickSync.Set(tsoutil.PhysicalTimeSeconds(ts))
}
m.persistentTimeTickSyncCounter.Inc()
m.persistentTimeTickSync.Set(tsoutil.PhysicalTimeSeconds(ts))
m.mu.Unlock()
}
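A hedged call-site sketch for the consolidated metric; the boolean mirrors whether the synced time tick was persisted into the WAL:
m.CountTimeTickSync(ts, true)  // Persisted time tick sync.
m.CountTimeTickSync(ts, false) // In-memory (non-persistent) time tick sync.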

View File

@ -0,0 +1,84 @@
package utility
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/anypb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)
func TestWithNotPersisted(t *testing.T) {
ctx := context.Background()
hint := &NotPersistedHint{MessageID: walimplstest.NewTestMessageID(1)}
ctx = WithNotPersisted(ctx, hint)
retrievedHint := GetNotPersisted(ctx)
assert.NotNil(t, retrievedHint)
assert.True(t, retrievedHint.MessageID.EQ(hint.MessageID))
}
func TestWithExtraAppendResult(t *testing.T) {
ctx := context.Background()
extra := &anypb.Any{}
txnCtx := &message.TxnContext{
TxnID: 1,
}
result := &ExtraAppendResult{TimeTick: 123, TxnCtx: txnCtx, Extra: extra}
ctx = WithExtraAppendResult(ctx, result)
retrievedResult := ctx.Value(extraAppendResultValue).(*ExtraAppendResult)
assert.NotNil(t, retrievedResult)
assert.Equal(t, uint64(123), retrievedResult.TimeTick)
assert.Equal(t, txnCtx.TxnID, retrievedResult.TxnCtx.TxnID)
assert.Equal(t, extra, retrievedResult.Extra)
}
func TestModifyAppendResultExtra(t *testing.T) {
ctx := context.Background()
extra := &anypb.Any{}
result := &ExtraAppendResult{Extra: extra}
ctx = WithExtraAppendResult(ctx, result)
modifier := func(old *anypb.Any) *anypb.Any {
return &anypb.Any{TypeUrl: "modified"}
}
ModifyAppendResultExtra(ctx, modifier)
retrievedResult := ctx.Value(extraAppendResultValue).(*ExtraAppendResult)
assert.Equal(t, retrievedResult.Extra.(*anypb.Any).TypeUrl, "modified")
ModifyAppendResultExtra(ctx, func(old *anypb.Any) *anypb.Any {
return nil
})
retrievedResult = ctx.Value(extraAppendResultValue).(*ExtraAppendResult)
assert.Nil(t, retrievedResult.Extra)
}
func TestReplaceAppendResultTimeTick(t *testing.T) {
ctx := context.Background()
result := &ExtraAppendResult{TimeTick: 1}
ctx = WithExtraAppendResult(ctx, result)
ReplaceAppendResultTimeTick(ctx, 2)
retrievedResult := ctx.Value(extraAppendResultValue).(*ExtraAppendResult)
assert.Equal(t, retrievedResult.TimeTick, uint64(2))
}
func TestReplaceAppendResultTxnContext(t *testing.T) {
ctx := context.Background()
txnCtx := &message.TxnContext{}
result := &ExtraAppendResult{TxnCtx: txnCtx}
ctx = WithExtraAppendResult(ctx, result)
newTxnCtx := &message.TxnContext{TxnID: 2}
ReplaceAppendResultTxnContext(ctx, newTxnCtx)
retrievedResult := ctx.Value(extraAppendResultValue).(*ExtraAppendResult)
assert.Equal(t, retrievedResult.TxnCtx.TxnID, newTxnCtx.TxnID)
}

View File

@ -0,0 +1,37 @@
package utility
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
)
func TestPendingQueue(t *testing.T) {
pq := NewPendingQueue()
// Test initial state
assert.Equal(t, 0, pq.Bytes())
// Test AddOne
msg1 := mock_message.NewMockImmutableMessage(t)
msg1.EXPECT().EstimateSize().Return(1)
pq.AddOne(msg1)
assert.Equal(t, msg1.EstimateSize(), pq.Bytes())
// Test Add
msg2 := mock_message.NewMockImmutableMessage(t)
msg2.EXPECT().EstimateSize().Return(2)
msg3 := mock_message.NewMockImmutableMessage(t)
msg3.EXPECT().EstimateSize().Return(3)
pq.Add([]message.ImmutableMessage{msg2, msg3})
expectedBytes := msg1.EstimateSize() + msg2.EstimateSize() + msg3.EstimateSize()
assert.Equal(t, expectedBytes, pq.Bytes())
// Test UnsafeAdvance
pq.UnsafeAdvance()
expectedBytes -= msg1.EstimateSize()
assert.Equal(t, expectedBytes, pq.Bytes())
}

View File

@ -7,8 +7,14 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var ErrTimeTickViolation = errors.New("time tick violation")
// ReOrderByTimeTickBuffer is a buffer that stores messages and pops them in order of time tick.
type ReOrderByTimeTickBuffer struct {
messageIDs typeutil.Set[string] // After enabling the write-ahead buffer, there are two streams to consume:
// the write-ahead buffer works in time tick order, while the wal scanner works in message order,
// so duplicated messages may be produced when switching between the two streams.
// This set is used to deduplicate them by message ID.
messageHeap typeutil.Heap[message.ImmutableMessage]
lastPopTimeTick uint64
bytes int
@ -17,6 +23,7 @@ type ReOrderByTimeTickBuffer struct {
// NewReOrderBuffer creates a new ReOrderBuffer.
func NewReOrderBuffer() *ReOrderByTimeTickBuffer {
return &ReOrderByTimeTickBuffer{
messageIDs: typeutil.NewSet[string](),
messageHeap: typeutil.NewHeap[message.ImmutableMessage](&immutableMessageHeap{}),
}
}
@ -26,9 +33,14 @@ func (r *ReOrderByTimeTickBuffer) Push(msg message.ImmutableMessage) error {
// !!! Drop the unexpected broken timetick rule message.
// It is not enabled until the first time tick arrives.
if msg.TimeTick() < r.lastPopTimeTick {
return errors.Errorf("message time tick is less than last pop time tick: %d", r.lastPopTimeTick)
return errors.Wrapf(ErrTimeTickViolation, "message time tick is less than last pop time tick: %d", r.lastPopTimeTick)
}
msgID := msg.MessageID().Marshal()
if r.messageIDs.Contain(msgID) {
return errors.Errorf("message is duplicated: %s", msgID)
}
r.messageHeap.Push(msg)
r.messageIDs.Insert(msgID)
r.bytes += msg.EstimateSize()
return nil
}
@ -39,6 +51,7 @@ func (r *ReOrderByTimeTickBuffer) PopUtilTimeTick(timetick uint64) []message.Imm
var res []message.ImmutableMessage
for r.messageHeap.Len() > 0 && r.messageHeap.Peek().TimeTick() <= timetick {
r.bytes -= r.messageHeap.Peek().EstimateSize()
r.messageIDs.Remove(r.messageHeap.Peek().MessageID().Marshal())
res = append(res, r.messageHeap.Pop())
}
r.lastPopTimeTick = timetick

View File

@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
)
func TestReOrderByTimeTickBuffer(t *testing.T) {
@ -15,6 +16,7 @@ func TestReOrderByTimeTickBuffer(t *testing.T) {
for i, timetick := range timeticks {
msg := mock_message.NewMockImmutableMessage(t)
msg.EXPECT().EstimateSize().Return(1)
msg.EXPECT().MessageID().Return(walimplstest.NewTestMessageID(int64(i)))
msg.EXPECT().TimeTick().Return(uint64(timetick + 1))
buf.Push(msg)
assert.Equal(t, i+1, buf.Len())

View File

@ -21,13 +21,11 @@ func (h ChanMessageHandler) Handle(param message.HandleParam) message.HandleResu
return message.HandleResult{Error: param.Ctx.Err()}
case msg, ok := <-param.Upstream:
if !ok {
return message.HandleResult{Error: message.ErrUpstreamClosed}
panic("unreachable code: upstream should never closed")
}
return message.HandleResult{Incoming: msg}
case sendingCh <- param.Message:
return message.HandleResult{MessageHandled: true}
case <-param.TimeTickChan:
return message.HandleResult{TimeTickUpdated: true}
}
}
@ -74,12 +72,9 @@ func (m *MsgPackAdaptorHandler) Handle(param message.HandleParam) message.Handle
MessageHandled: messageHandled,
Error: param.Ctx.Err(),
}
case msg, notClose := <-param.Upstream:
if !notClose {
return message.HandleResult{
MessageHandled: messageHandled,
Error: message.ErrUpstreamClosed,
}
case msg, ok := <-param.Upstream:
if !ok {
panic("unreachable code: upstream should never closed")
}
return message.HandleResult{
Incoming: msg,
@ -91,11 +86,6 @@ func (m *MsgPackAdaptorHandler) Handle(param message.HandleParam) message.Handle
continue
}
return message.HandleResult{MessageHandled: messageHandled}
case <-param.TimeTickChan:
return message.HandleResult{
MessageHandled: messageHandled,
TimeTickUpdated: true,
}
}
}
}

View File

@ -294,16 +294,16 @@ func newImmutableTxnMesasgeFromWAL(
return nil, err
}
// We don't need to modify the begin message's time tick, but we overwrite the time tick of every body message.
for _, m := range body {
m.(*immutableMessageImpl).overwriteTimeTick(commit.TimeTick())
m.(*immutableMessageImpl).overwriteLastConfirmedMessageID(commit.LastConfirmedMessageID())
for idx, m := range body {
body[idx] = m.(*immutableMessageImpl).cloneForTxnBody(commit.TimeTick(), commit.LastConfirmedMessageID())
}
immutableMsg := msg.WithTimeTick(commit.TimeTick()).
immutableMessage := msg.WithTimeTick(commit.TimeTick()).
WithLastConfirmed(commit.LastConfirmedMessageID()).
WithTxnContext(*commit.TxnContext()).
IntoImmutableMessage(commit.MessageID())
return &immutableTxnMessageImpl{
immutableMessageImpl: *immutableMsg.(*immutableMessageImpl),
immutableMessageImpl: *immutableMessage.(*immutableMessageImpl),
begin: begin,
messages: body,
commit: commit,

View File

@ -2,26 +2,20 @@ package message
import (
"context"
"github.com/cockroachdb/errors"
)
var ErrUpstreamClosed = errors.New("upstream closed")
// HandleParam is the parameter for handler.
type HandleParam struct {
Ctx context.Context
Upstream <-chan ImmutableMessage
Message ImmutableMessage
TimeTickChan <-chan struct{}
Ctx context.Context
Upstream <-chan ImmutableMessage
Message ImmutableMessage
}
// HandleResult is the result of handler.
type HandleResult struct {
Incoming ImmutableMessage // Not nil if upstream return new message.
MessageHandled bool // True if Message is handled successfully.
TimeTickUpdated bool // True if TimeTickChan is triggered.
Error error // Error is context is canceled.
Incoming ImmutableMessage // Not nil if upstream return new message.
MessageHandled bool // True if Message is handled successfully.
Error error // Error is context is canceled.
}
// Handler is used to handle message read from log.

View File

@ -113,9 +113,14 @@ func (m *messageImpl) WithBroadcastID(id uint64) BroadcastMutableMessage {
// IntoImmutableMessage converts current message to immutable message.
func (m *messageImpl) IntoImmutableMessage(id MessageID) ImmutableMessage {
// Payload and id are always immutable, so cloning only the properties here is enough.
prop := m.properties.Clone()
return &immutableMessageImpl{
messageImpl: *m,
id: id,
id: id,
messageImpl: messageImpl{
payload: m.payload,
properties: prop,
},
}
}
@ -260,6 +265,26 @@ func (m *immutableMessageImpl) LastConfirmedMessageID() MessageID {
return id
}
// cloneForTxnBody clones the message and overwrites its time tick and last confirmed message ID.
func (m *immutableMessageImpl) cloneForTxnBody(timetick uint64, lastConfirmedMessageID MessageID) *immutableMessageImpl {
newMsg := m.clone()
newMsg.overwriteTimeTick(timetick)
newMsg.overwriteLastConfirmedMessageID(lastConfirmedMessageID)
return newMsg
}
// clone clones the current message.
func (m *immutableMessageImpl) clone() *immutableMessageImpl {
// Payload and message id are always immutable, so cloning only the properties here is enough.
return &immutableMessageImpl{
id: m.id,
messageImpl: messageImpl{
payload: m.payload,
properties: m.properties.Clone(),
},
}
}
// overwriteTimeTick overwrites the time tick of current message.
func (m *immutableMessageImpl) overwriteTimeTick(timetick uint64) {
m.properties.Delete(messageTimeTick)

View File

@ -65,6 +65,14 @@ func (prop propertiesImpl) ToRawMap() map[string]string {
return map[string]string(prop)
}
func (prop propertiesImpl) Clone() propertiesImpl {
cloned := make(map[string]string, len(prop))
for k, v := range prop {
cloned[k] = v
}
return cloned
}
// EstimateSize returns the estimated size of properties.
func (prop propertiesImpl) EstimateSize() int {
size := 0

View File

@ -28,3 +28,7 @@ func (v Version) String() string {
func (v Version) GT(v2 Version) bool {
return v > v2
}
func (v Version) EQ(v2 Version) bool {
return v == v2
}

View File

@ -22,7 +22,7 @@ func newScanner(
ScannerHelper: helper.NewScannerHelper(scannerName),
exclude: exclude,
consumer: consumer,
msgChannel: make(chan message.ImmutableMessage, 1),
msgChannel: make(chan message.ImmutableMessage),
}
go s.executeConsume()
return s
@ -50,27 +50,30 @@ func (s *scannerImpl) Close() error {
}
// executeConsume consumes the message from the consumer.
func (s *scannerImpl) executeConsume() {
defer close(s.msgChannel)
func (s *scannerImpl) executeConsume() (err error) {
defer func() {
s.Finish(err)
close(s.msgChannel)
}()
for {
select {
case <-s.Context().Done():
s.Finish(nil)
return
return nil
case msg, ok := <-s.consumer.Chan():
if !ok {
s.Finish(errors.New("mq consumer unexpected channel closed"))
return
return errors.New("mq consumer unexpected channel closed")
}
msgID := rmqID(msg.ID().(*server.RmqID).MessageID)
// Record the last message ID to avoid consuming a message repeatedly,
// and filter out the excluded message ID.
if s.exclude == nil || !s.exclude.EQ(msgID) {
s.msgChannel <- message.NewImmutableMesasge(
msgID,
msg.Payload(),
msg.Properties(),
)
msg := message.NewImmutableMesasge(msgID, msg.Payload(), msg.Properties())
select {
case <-s.Context().Done():
return nil
case s.msgChannel <- msg:
}
}
}
}

View File

@ -30,14 +30,20 @@ type scannerImpls struct {
}
func (s *scannerImpls) executeConsume() {
defer close(s.ch)
defer func() {
close(s.ch)
s.Finish(nil)
}()
for {
msg, err := s.datas.ReadAt(s.Context(), s.offset)
if err != nil {
s.Finish(nil)
return
}
s.ch <- msg
select {
case <-s.Context().Done():
return
case s.ch <- msg:
}
s.offset++
}
}

View File

@ -23,6 +23,8 @@ type ScannerImpls interface {
Name() string
// Chan returns the channel of message.
// If the scanner fails, the channel will be closed
// and the error will be returned by Error().
Chan() <-chan message.ImmutableMessage
// Error returns the error of scanner failed.

View File

@ -4895,6 +4895,10 @@ type streamingConfig struct {
// txn
TxnDefaultKeepaliveTimeout ParamItem `refreshable:"true"`
// write ahead buffer
WALWriteAheadBufferCapacity ParamItem `refreshable:"true"`
WALWriteAheadBufferKeepalive ParamItem `refreshable:"true"`
}
func (p *streamingConfig) init(base *BaseTable) {
@ -4944,6 +4948,23 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura
Export: true,
}
p.TxnDefaultKeepaliveTimeout.Init(base.mgr)
p.WALWriteAheadBufferCapacity = ParamItem{
Key: "streaming.walWriteAheadBuffer.capacity",
Version: "2.6.0",
Doc: "The capacity of write ahead buffer of each wal, 64M by default",
DefaultValue: "64m",
Export: true,
}
p.WALWriteAheadBufferCapacity.Init(base.mgr)
p.WALWriteAheadBufferKeepalive = ParamItem{
Key: "streaming.walWriteAheadBuffer.keepalive",
Version: "2.6.0",
Doc: "The keepalive duration for entries in write ahead buffer of each wal, 30s by default",
DefaultValue: "30s",
Export: true,
}
p.WALWriteAheadBufferKeepalive.Init(base.mgr)
}
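A hedged sketch of wiring the new knobs into the buffer constructor; logger and lastConfirmed are assumed to come from the surrounding wal setup:
params := paramtable.Get()
capacity := params.StreamingCfg.WALWriteAheadBufferCapacity.GetAsSize()              // Bytes; 64 MiB by default.
keepalive := params.StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse() // 30s by default.
wab := NewWriteAheadBuffer(logger, int(capacity), keepalive, lastConfirmed)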
// runtimeConfig is just a private environment value table.

View File

@ -618,16 +618,22 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 2.0, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat())
assert.Equal(t, 1.0, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat())
assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
assert.Equal(t, 30*time.Second, params.StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse())
assert.Equal(t, int64(64*1024*1024), params.StreamingCfg.WALWriteAheadBufferCapacity.GetAsSize())
params.Save(params.StreamingCfg.WALBalancerTriggerInterval.Key, "50s")
params.Save(params.StreamingCfg.WALBalancerBackoffInitialInterval.Key, "50s")
params.Save(params.StreamingCfg.WALBalancerBackoffMultiplier.Key, "3.5")
params.Save(params.StreamingCfg.WALBroadcasterConcurrencyRatio.Key, "1.5")
params.Save(params.StreamingCfg.TxnDefaultKeepaliveTimeout.Key, "3500ms")
params.Save(params.StreamingCfg.WALWriteAheadBufferKeepalive.Key, "10s")
params.Save(params.StreamingCfg.WALWriteAheadBufferCapacity.Key, "128k")
assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 3.5, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat())
assert.Equal(t, 1.5, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat())
assert.Equal(t, 3500*time.Millisecond, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
assert.Equal(t, 10*time.Second, params.StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse())
assert.Equal(t, int64(128*1024), params.StreamingCfg.WALWriteAheadBufferCapacity.GetAsSize())
})
t.Run("channel config priority", func(t *testing.T) {