From d2bc4a53be14c5a6f5afdd3603872421749d2ca5 Mon Sep 17 00:00:00 2001 From: chyezh Date: Thu, 27 Jun 2024 15:11:05 +0800 Subject: [PATCH] enhance: implement rmq and pulsar as wal (#34046) issue: #33285 - use reader but not consumer for pulsar - advanced test framework - move some streaming related package into pkg --------- Signed-off-by: chyezh --- .../server/mock_wal/mock_Scanner.go | 2 +- .../streamingnode/server/mock_wal/mock_WAL.go | 2 +- internal/proto/streaming.proto | 7 +- internal/streamingnode/server/wal/RAEDME.md | 26 ++- .../server/wal/adaptor/builder.go | 2 +- internal/streamingnode/server/wal/builder.go | 2 +- .../server/wal/helper/wal_helper_test.go | 24 --- .../server/wal/registry/registry.go | 26 +-- internal/streamingnode/server/wal/scanner.go | 4 +- internal/streamingnode/server/wal/wal.go | 7 +- .../server/wal/walimplstest/wal.go | 52 ------ internal/streamingservice/.mockery.yaml | 17 +- .../util/streamingutil/options/deliver.go | 45 ----- pkg/Makefile | 5 +- pkg/go.mod | 1 + pkg/go.sum | 2 + .../mock_walimpls/mock_Interceptor.go | 2 +- .../mock_walimpls/mock_InterceptorBuilder.go | 2 +- .../mock_InterceptorWithReady.go | 2 +- .../mock_walimpls/mock_OpenerBuilderImpls.go | 2 +- .../mock_walimpls/mock_OpenerImpls.go | 2 +- .../mock_walimpls/mock_ScannerImpls.go | 2 +- .../streaming}/mock_walimpls/mock_WALImpls.go | 18 +- .../mock_message/mock_ImmutableMessage.go | 2 +- .../util}/mock_message/mock_MessageID.go | 2 +- .../util}/mock_message/mock_MutableMessage.go | 2 +- .../util}/mock_message/mock_RProperties.go | 0 pkg/mq/mqimpl/rocksmq/client/client_impl.go | 22 +-- pkg/streaming/.mockery.yaml | 22 +++ .../streaming/util}/message/builder.go | 0 .../streaming/util}/message/message.go | 0 .../util}/message/message_builder_test.go | 4 +- .../util}/message/message_handler.go | 0 .../util}/message/message_handler_test.go | 0 .../streaming/util}/message/message_id.go | 0 .../util}/message/message_id_test.go | 4 +- .../streaming/util}/message/message_impl.go | 0 .../streaming/util}/message/message_test.go | 0 .../streaming/util}/message/message_type.go | 0 .../streaming/util}/message/properties.go | 0 .../streaming/util}/message/version.go | 0 pkg/streaming/util/options/deliver.go | 87 +++++++++ pkg/streaming/util/types/pchannel_info.go | 8 + .../wal => pkg/streaming}/walimpls/builder.go | 0 .../walimpls}/helper/scanner_helper.go | 0 .../walimpls}/helper/scanner_helper_test.go | 0 .../streaming/walimpls}/helper/wal_helper.go | 8 +- .../walimpls/helper/wal_helper_test.go | 23 +++ .../walimpls/impls/pulsar/builder.go | 67 +++++++ .../walimpls/impls/pulsar/message_id.go | 66 +++++++ .../walimpls/impls/pulsar/message_id_test.go | 75 ++++++++ pkg/streaming/walimpls/impls/pulsar/opener.go | 38 ++++ .../walimpls/impls/pulsar/pulsar_test.go | 32 ++++ .../walimpls/impls/pulsar/scanner.go | 73 ++++++++ pkg/streaming/walimpls/impls/pulsar/wal.go | 65 +++++++ pkg/streaming/walimpls/impls/rmq/builder.go | 41 +++++ .../walimpls/impls/rmq/message_id.go | 59 ++++++ .../walimpls/impls/rmq/message_id_test.go | 25 +++ pkg/streaming/walimpls/impls/rmq/opener.go | 36 ++++ pkg/streaming/walimpls/impls/rmq/rmq_test.go | 39 ++++ pkg/streaming/walimpls/impls/rmq/scanner.go | 77 ++++++++ pkg/streaming/walimpls/impls/rmq/wal.go | 95 ++++++++++ .../walimpls/impls}/walimplstest/builder.go | 6 +- .../impls}/walimplstest/message_id.go | 2 +- .../impls}/walimplstest/message_log.go | 2 +- .../walimpls/impls}/walimplstest/opener.go | 6 +- .../walimpls/impls}/walimplstest/scanner.go | 6 +- .../walimpls/impls/walimplstest/wal.go | 44 +++++ .../walimpls/impls}/walimplstest/wal_test.go | 2 +- .../streaming}/walimpls/interceptor.go | 2 +- .../wal => pkg/streaming}/walimpls/opener.go | 4 +- pkg/streaming/walimpls/registry/registry.go | 30 +++ .../streaming/walimpls}/registry/wal_test.go | 2 +- .../wal => pkg/streaming}/walimpls/scanner.go | 13 +- .../streaming}/walimpls/test_framework.go | 174 +++++++++++++----- .../wal => pkg/streaming}/walimpls/wal.go | 6 +- 76 files changed, 1234 insertions(+), 291 deletions(-) delete mode 100644 internal/streamingnode/server/wal/helper/wal_helper_test.go delete mode 100644 internal/streamingnode/server/wal/walimplstest/wal.go delete mode 100644 internal/util/streamingutil/options/deliver.go rename {internal/mocks/streamingnode/server/wal => pkg/mocks/streaming}/mock_walimpls/mock_Interceptor.go (98%) rename {internal/mocks/streamingnode/server/wal => pkg/mocks/streaming}/mock_walimpls/mock_InterceptorBuilder.go (96%) rename {internal/mocks/streamingnode/server/wal => pkg/mocks/streaming}/mock_walimpls/mock_InterceptorWithReady.go (98%) rename {internal/mocks/streamingnode/server/wal => pkg/mocks/streaming}/mock_walimpls/mock_OpenerBuilderImpls.go (97%) rename {internal/mocks/streamingnode/server/wal => pkg/mocks/streaming}/mock_walimpls/mock_OpenerImpls.go (97%) rename {internal/mocks/streamingnode/server/wal => pkg/mocks/streaming}/mock_walimpls/mock_ScannerImpls.go (98%) rename {internal/mocks/streamingnode/server/wal => pkg/mocks/streaming}/mock_walimpls/mock_WALImpls.go (90%) rename {internal/mocks/util/streamingutil => pkg/mocks/streaming/util}/mock_message/mock_ImmutableMessage.go (99%) rename {internal/mocks/util/streamingutil => pkg/mocks/streaming/util}/mock_message/mock_MessageID.go (98%) rename {internal/mocks/util/streamingutil => pkg/mocks/streaming/util}/mock_message/mock_MutableMessage.go (99%) rename {internal/mocks/util/streamingutil => pkg/mocks/streaming/util}/mock_message/mock_RProperties.go (100%) create mode 100644 pkg/streaming/.mockery.yaml rename {internal/util/streamingutil => pkg/streaming/util}/message/builder.go (100%) rename {internal/util/streamingutil => pkg/streaming/util}/message/message.go (100%) rename {internal/util/streamingutil => pkg/streaming/util}/message/message_builder_test.go (95%) rename {internal/util/streamingutil => pkg/streaming/util}/message/message_handler.go (100%) rename {internal/util/streamingutil => pkg/streaming/util}/message/message_handler_test.go (100%) rename {internal/util/streamingutil => pkg/streaming/util}/message/message_id.go (100%) rename {internal/util/streamingutil => pkg/streaming/util}/message/message_id_test.go (86%) rename {internal/util/streamingutil => pkg/streaming/util}/message/message_impl.go (100%) rename {internal/util/streamingutil => pkg/streaming/util}/message/message_test.go (100%) rename {internal/util/streamingutil => pkg/streaming/util}/message/message_type.go (100%) rename {internal/util/streamingutil => pkg/streaming/util}/message/properties.go (100%) rename {internal/util/streamingutil => pkg/streaming/util}/message/version.go (100%) create mode 100644 pkg/streaming/util/options/deliver.go create mode 100644 pkg/streaming/util/types/pchannel_info.go rename {internal/streamingnode/server/wal => pkg/streaming}/walimpls/builder.go (100%) rename {internal/streamingnode/server/wal => pkg/streaming/walimpls}/helper/scanner_helper.go (100%) rename {internal/streamingnode/server/wal => pkg/streaming/walimpls}/helper/scanner_helper_test.go (100%) rename {internal/streamingnode/server/wal => pkg/streaming/walimpls}/helper/wal_helper.go (70%) create mode 100644 pkg/streaming/walimpls/helper/wal_helper_test.go create mode 100644 pkg/streaming/walimpls/impls/pulsar/builder.go create mode 100644 pkg/streaming/walimpls/impls/pulsar/message_id.go create mode 100644 pkg/streaming/walimpls/impls/pulsar/message_id_test.go create mode 100644 pkg/streaming/walimpls/impls/pulsar/opener.go create mode 100644 pkg/streaming/walimpls/impls/pulsar/pulsar_test.go create mode 100644 pkg/streaming/walimpls/impls/pulsar/scanner.go create mode 100644 pkg/streaming/walimpls/impls/pulsar/wal.go create mode 100644 pkg/streaming/walimpls/impls/rmq/builder.go create mode 100644 pkg/streaming/walimpls/impls/rmq/message_id.go create mode 100644 pkg/streaming/walimpls/impls/rmq/message_id_test.go create mode 100644 pkg/streaming/walimpls/impls/rmq/opener.go create mode 100644 pkg/streaming/walimpls/impls/rmq/rmq_test.go create mode 100644 pkg/streaming/walimpls/impls/rmq/scanner.go create mode 100644 pkg/streaming/walimpls/impls/rmq/wal.go rename {internal/streamingnode/server/wal => pkg/streaming/walimpls/impls}/walimplstest/builder.go (69%) rename {internal/streamingnode/server/wal => pkg/streaming/walimpls/impls}/walimplstest/message_id.go (95%) rename {internal/streamingnode/server/wal => pkg/streaming/walimpls/impls}/walimplstest/message_log.go (95%) rename {internal/streamingnode/server/wal => pkg/streaming/walimpls/impls}/walimplstest/opener.go (64%) rename {internal/streamingnode/server/wal => pkg/streaming/walimpls/impls}/walimplstest/scanner.go (80%) create mode 100644 pkg/streaming/walimpls/impls/walimplstest/wal.go rename {internal/streamingnode/server/wal => pkg/streaming/walimpls/impls}/walimplstest/wal_test.go (67%) rename {internal/streamingnode/server/wal => pkg/streaming}/walimpls/interceptor.go (96%) rename {internal/streamingnode/server/wal => pkg/streaming}/walimpls/opener.go (76%) create mode 100644 pkg/streaming/walimpls/registry/registry.go rename {internal/streamingnode/server/wal => pkg/streaming/walimpls}/registry/wal_test.go (91%) rename {internal/streamingnode/server/wal => pkg/streaming}/walimpls/scanner.go (55%) rename {internal/streamingnode/server/wal => pkg/streaming}/walimpls/test_framework.go (53%) rename {internal/streamingnode/server/wal => pkg/streaming}/walimpls/wal.go (73%) diff --git a/internal/mocks/streamingnode/server/mock_wal/mock_Scanner.go b/internal/mocks/streamingnode/server/mock_wal/mock_Scanner.go index 1ea03c53cc..be26d70e76 100644 --- a/internal/mocks/streamingnode/server/mock_wal/mock_Scanner.go +++ b/internal/mocks/streamingnode/server/mock_wal/mock_Scanner.go @@ -3,7 +3,7 @@ package mock_wal import ( - message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + message "github.com/milvus-io/milvus/pkg/streaming/util/message" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go b/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go index b2f2f49517..2f7a250d35 100644 --- a/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go +++ b/internal/mocks/streamingnode/server/mock_wal/mock_WAL.go @@ -5,7 +5,7 @@ package mock_wal import ( context "context" - message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + message "github.com/milvus-io/milvus/pkg/streaming/util/message" mock "github.com/stretchr/testify/mock" streamingpb "github.com/milvus-io/milvus/internal/proto/streamingpb" diff --git a/internal/proto/streaming.proto b/internal/proto/streaming.proto index 8be9a14796..5ad393b254 100644 --- a/internal/proto/streaming.proto +++ b/internal/proto/streaming.proto @@ -26,8 +26,7 @@ message Message { message PChannelInfo { string name = 1; // channel name int64 term = 2; // A monotonic increasing term, every time the channel is recovered or moved to another streamingnode, the term will increase by meta server. - int64 serverID = 3; // The log node id address of the channel. - repeated VChannelInfo vChannelInfos = 4; // PChannel related vchannels. + int64 server_id = 3; // The log node id address of the channel. } // VChannelInfo is the information of a vchannel info. @@ -39,7 +38,7 @@ message DeliverPolicy { oneof policy { google.protobuf.Empty all = 1; // deliver all messages. google.protobuf.Empty latest = 2; // deliver the latest message. - MessageID startFrom = 3; // deliver message from this message id. [startFrom, ...] - MessageID startAfter = 4; // deliver message after this message id. (startAfter, ...] + MessageID start_from = 3; // deliver message from this message id. [startFrom, ...] + MessageID start_after = 4; // deliver message after this message id. (startAfter, ...] } } diff --git a/internal/streamingnode/server/wal/RAEDME.md b/internal/streamingnode/server/wal/RAEDME.md index 53a94dc0f8..6ca5247849 100644 --- a/internal/streamingnode/server/wal/RAEDME.md +++ b/internal/streamingnode/server/wal/RAEDME.md @@ -1,15 +1,19 @@ # WAL `wal` package is the basic defination of wal interface of milvus streamingnode. +`wal` use `github.com/milvus-io/milvus/pkg/streaming/walimpls` to implement the final wal service. ## Project arrangement -- `/`: only define exposed interfaces. -- `/walimpls/`: define the underlying message system interfaces need to be implemented. -- `/registry/`: A static lifetime registry to regsiter new implementation for inverting dependency. -- `/adaptor/`: adaptors to implement `wal` interface from `walimpls` interface -- `/helper/`: A utility used to help developer to implement `walimpls` conveniently. -- `/utility/`: A utility code for common logic or data structure. +- `wal` + - `/`: only define exposed interfaces. + - `/adaptor/`: adaptors to implement `wal` interface from `walimpls` interface + - `/utility/`: A utility code for common logic or data structure. +- `github.com/milvus-io/milvus/pkg/streaming/walimpls` + - `/`: define the underlying message system interfaces need to be implemented. + - `/registry/`: A static lifetime registry to regsiter new implementation for inverting dependency. + - `/helper/`: A utility used to help developer to implement `walimpls` conveniently. + - `/impls/`: A official implemented walimpls sets. ## Lifetime Of Interfaces @@ -20,7 +24,7 @@ ## Add New Implemetation Of WAL -developper who want to add a new implementation of `wal` should implements the `walimpls` package interfaces. following interfaces is required: +developper who want to add a new implementation of `wal` should implements the `github.com/milvus-io/milvus/pkg/streaming/walimpls` package interfaces. following interfaces is required: - `walimpls.OpenerBuilderImpls` - `walimpls.OpenerImpls` @@ -28,9 +32,11 @@ developper who want to add a new implementation of `wal` should implements the ` - `walimpls.WALImpls` `OpenerBuilderImpls` create `OpenerImpls`; `OpenerImpls` creates `WALImpls`; `WALImpls` create `ScannerImpls`. -Then register the implmentation of `walimpls.OpenerBuilderImpls` into `registry` package. +Then register the implmentation of `walimpls.OpenerBuilderImpls` into `github.com/milvus-io/milvus/pkg/streaming/walimpls/registry` package. ``` +import "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" + var _ OpenerBuilderImpls = b{}; registry.RegisterBuilder(b{}) ``` @@ -40,8 +46,10 @@ All things have been done. ## Use WAL ``` +import "github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry" + name := "your builder name" -var yourCh *streamingpb.PChannelInfo +var yourCh *options.PChannelInfo opener, err := registry.MustGetBuilder(name).Build() if err != nil { diff --git a/internal/streamingnode/server/wal/adaptor/builder.go b/internal/streamingnode/server/wal/adaptor/builder.go index 2dbfa0bbb1..ca4cf7e2b7 100644 --- a/internal/streamingnode/server/wal/adaptor/builder.go +++ b/internal/streamingnode/server/wal/adaptor/builder.go @@ -2,7 +2,7 @@ package adaptor import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" ) var _ wal.OpenerBuilder = (*builderAdaptorImpl)(nil) diff --git a/internal/streamingnode/server/wal/builder.go b/internal/streamingnode/server/wal/builder.go index afb20f2238..30909f3ae8 100644 --- a/internal/streamingnode/server/wal/builder.go +++ b/internal/streamingnode/server/wal/builder.go @@ -4,7 +4,7 @@ import ( "context" "github.com/milvus-io/milvus/internal/proto/streamingpb" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" ) // OpenerBuilder is the interface for build wal opener. diff --git a/internal/streamingnode/server/wal/helper/wal_helper_test.go b/internal/streamingnode/server/wal/helper/wal_helper_test.go deleted file mode 100644 index 497bfbafef..0000000000 --- a/internal/streamingnode/server/wal/helper/wal_helper_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package helper - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/internal/proto/streamingpb" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" -) - -func TestWALHelper(t *testing.T) { - h := NewWALHelper(&walimpls.OpenOption{ - Channel: &streamingpb.PChannelInfo{ - Name: "test", - Term: 1, - ServerID: 1, - VChannelInfos: []*streamingpb.VChannelInfo{}, - }, - }) - assert.NotNil(t, h.Channel()) - assert.Equal(t, h.Channel().Name, "test") - assert.NotNil(t, h.Log()) -} diff --git a/internal/streamingnode/server/wal/registry/registry.go b/internal/streamingnode/server/wal/registry/registry.go index 5594e09c4a..32798228ff 100644 --- a/internal/streamingnode/server/wal/registry/registry.go +++ b/internal/streamingnode/server/wal/registry/registry.go @@ -3,31 +3,11 @@ package registry import ( "github.com/milvus-io/milvus/internal/streamingnode/server/wal" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/adaptor" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" - "github.com/milvus-io/milvus/pkg/util/typeutil" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" ) -// builders is a map of registered wal builders. -var builders typeutil.ConcurrentMap[string, wal.OpenerBuilder] - -// Register registers the wal builder. -// -// NOTE: this function must only be called during initialization time (i.e. in -// an init() function), name of builder is lowercase. If multiple Builder are -// registered with the same name, panic will occur. -func RegisterBuilder(b walimpls.OpenerBuilderImpls) { - bb := adaptor.AdaptImplsToBuilder(b) - _, loaded := builders.GetOrInsert(bb.Name(), bb) - if loaded { - panic("wal builder already registered: " + b.Name()) - } -} - // MustGetBuilder returns the wal builder by name. func MustGetBuilder(name string) wal.OpenerBuilder { - b, ok := builders.Get(name) - if !ok { - panic("wal builder not found: " + name) - } - return b + b := registry.MustGetBuilder(name) + return adaptor.AdaptImplsToBuilder(b) } diff --git a/internal/streamingnode/server/wal/scanner.go b/internal/streamingnode/server/wal/scanner.go index 761b3e93b1..64545cbde9 100644 --- a/internal/streamingnode/server/wal/scanner.go +++ b/internal/streamingnode/server/wal/scanner.go @@ -1,8 +1,8 @@ package wal import ( - "github.com/milvus-io/milvus/internal/util/streamingutil/message" - "github.com/milvus-io/milvus/internal/util/streamingutil/options" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/options" ) // ReadOption is the option for reading records from the wal. diff --git a/internal/streamingnode/server/wal/wal.go b/internal/streamingnode/server/wal/wal.go index 904976b41a..b9f624c877 100644 --- a/internal/streamingnode/server/wal/wal.go +++ b/internal/streamingnode/server/wal/wal.go @@ -3,16 +3,15 @@ package wal import ( "context" - "github.com/milvus-io/milvus/internal/proto/streamingpb" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/types" ) // WAL is the WAL framework interface. // !!! Don't implement it directly, implement walimpls.WAL instead. type WAL interface { // Channel returns the channel assignment info of the wal. - // Should be read-only. - Channel() *streamingpb.PChannelInfo + Channel() types.PChannelInfo // Append writes a record to the log. Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) diff --git a/internal/streamingnode/server/wal/walimplstest/wal.go b/internal/streamingnode/server/wal/walimplstest/wal.go deleted file mode 100644 index 47b1a887af..0000000000 --- a/internal/streamingnode/server/wal/walimplstest/wal.go +++ /dev/null @@ -1,52 +0,0 @@ -//go:build test -// +build test - -package walimplstest - -import ( - "context" - - "github.com/milvus-io/milvus/internal/proto/streamingpb" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/helper" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" -) - -var _ walimpls.WALImpls = &walImpls{} - -type walImpls struct { - helper.WALHelper - datas *messageLog -} - -func (w *walImpls) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { - return w.datas.Append(ctx, msg) -} - -func (w *walImpls) Read(ctx context.Context, opts walimpls.ReadOption) (walimpls.ScannerImpls, error) { - offset := int64(0) - switch policy := opts.DeliverPolicy.Policy.(type) { - case *streamingpb.DeliverPolicy_All: - offset = 0 - case *streamingpb.DeliverPolicy_Latest: - offset = w.datas.Len() - case *streamingpb.DeliverPolicy_StartFrom: - id, err := unmarshalTestMessageID(policy.StartFrom.Id) - if err != nil { - return nil, err - } - offset = int64(id) - case *streamingpb.DeliverPolicy_StartAfter: - id, err := unmarshalTestMessageID(policy.StartAfter.Id) - if err != nil { - return nil, err - } - offset = int64(id + 1) - } - return newScannerImpls( - opts, w.datas, int(offset), - ), nil -} - -func (w *walImpls) Close() { -} diff --git a/internal/streamingservice/.mockery.yaml b/internal/streamingservice/.mockery.yaml index 9b4b311a99..feff9bf14f 100644 --- a/internal/streamingservice/.mockery.yaml +++ b/internal/streamingservice/.mockery.yaml @@ -5,24 +5,9 @@ dir: "internal/mocks/{{trimPrefix .PackagePath \"github.com/milvus-io/milvus/int mockname: "Mock{{.InterfaceName}}" outpkg: "mock_{{.PackageName}}" packages: - github.com/milvus-io/milvus/internal/util/streamingutil/message: - interfaces: - MessageID: - ImmutableMessage: - MutableMessage: - RProperties: github.com/milvus-io/milvus/internal/streamingnode/server/wal: interfaces: OpenerBuilder: Opener: Scanner: - WAL: - github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls: - interfaces: - OpenerBuilderImpls: - OpenerImpls: - ScannerImpls: - WALImpls: - Interceptor: - InterceptorWithReady: - InterceptorBuilder: \ No newline at end of file + WAL: \ No newline at end of file diff --git a/internal/util/streamingutil/options/deliver.go b/internal/util/streamingutil/options/deliver.go deleted file mode 100644 index 46e59524b2..0000000000 --- a/internal/util/streamingutil/options/deliver.go +++ /dev/null @@ -1,45 +0,0 @@ -package options - -import ( - "github.com/milvus-io/milvus/internal/proto/streamingpb" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" -) - -const ( - deliverOrderTimetick DeliverOrder = 1 -) - -// DeliverOrder is the order of delivering messages. -type ( - DeliverOrder int - DeliverPolicy *streamingpb.DeliverPolicy -) - -// DeliverPolicyAll delivers all messages. -func DeliverPolicyAll() DeliverPolicy { - return streamingpb.NewDeliverAll() -} - -// DeliverLatest delivers the latest message. -func DeliverPolicyLatest() DeliverPolicy { - return streamingpb.NewDeliverLatest() -} - -// DeliverEarliest delivers the earliest message. -func DeliverPolicyStartFrom(messageID message.MessageID) DeliverPolicy { - return streamingpb.NewDeliverStartFrom(&streamingpb.MessageID{ - Id: messageID.Marshal(), - }) -} - -// DeliverPolicyStartAfter delivers the message after the specified message. -func DeliverPolicyStartAfter(messageID message.MessageID) DeliverPolicy { - return streamingpb.NewDeliverStartAfter(&streamingpb.MessageID{ - Id: messageID.Marshal(), - }) -} - -// DeliverOrderTimeTick delivers messages by time tick. -func DeliverOrderTimeTick() DeliverOrder { - return deliverOrderTimetick -} diff --git a/pkg/Makefile b/pkg/Makefile index 639bf54f17..eb3af3a7b1 100644 --- a/pkg/Makefile +++ b/pkg/Makefile @@ -11,9 +11,12 @@ INSTALL_PATH := $(ROOTPATH)/bin getdeps: $(MAKE) -C $(ROOTPATH) getdeps -generate-mockery: getdeps +generate-mockery: getdeps generate-mockery-streaming $(INSTALL_PATH)/mockery --name=MsgStream --dir=$(PWD)/mq/msgstream --output=$(PWD)/mq/msgstream --filename=mock_msgstream.go --with-expecter --structname=MockMsgStream --outpkg=msgstream --inpackage $(INSTALL_PATH)/mockery --name=Factory --dir=$(PWD)/mq/msgstream --output=$(PWD)/mq/msgstream --filename=mock_msgstream_factory.go --with-expecter --structname=MockFactory --outpkg=msgstream --inpackage $(INSTALL_PATH)/mockery --name=Client --dir=$(PWD)/mq/msgdispatcher --output=$(PWD)/mq/msgsdispatcher --filename=mock_client.go --with-expecter --structname=MockClient --outpkg=msgdispatcher --inpackage $(INSTALL_PATH)/mockery --name=Logger --dir=$(PWD)/eventlog --output=$(PWD)/eventlog --filename=mock_logger.go --with-expecter --structname=MockLogger --outpkg=eventlog --inpackage $(INSTALL_PATH)/mockery --name=MessageID --dir=$(PWD)/mq/msgstream/mqwrapper --output=$(PWD)/mq/msgstream/mqwrapper --filename=mock_id.go --with-expecter --structname=MockMessageID --outpkg=mqwrapper --inpackage + +generate-mockery-streaming: getdeps + $(INSTALL_PATH)/mockery --config $(PWD)/streaming/.mockery.yaml diff --git a/pkg/go.mod b/pkg/go.mod index 7dc2b6e37c..c50b810f2c 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -20,6 +20,7 @@ require ( github.com/panjf2000/ants/v2 v2.7.2 github.com/prometheus/client_golang v1.14.0 github.com/quasilyte/go-ruleguard/dsl v0.3.22 + github.com/remeh/sizedwaitgroup v1.0.0 github.com/samber/lo v1.27.0 github.com/sasha-s/go-deadlock v0.3.1 github.com/shirou/gopsutil/v3 v3.22.9 diff --git a/pkg/go.sum b/pkg/go.sum index fee5cb0c65..a6feff9356 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -638,6 +638,8 @@ github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/quasilyte/go-ruleguard/dsl v0.3.22 h1:wd8zkOhSNr+I+8Qeciml08ivDt1pSXe60+5DqOpCjPE= github.com/quasilyte/go-ruleguard/dsl v0.3.22/go.mod h1:KeCP03KrjuSO0H1kTuZQCWlQPulDV6YMIXmpQss17rU= +github.com/remeh/sizedwaitgroup v1.0.0 h1:VNGGFwNo/R5+MJBf6yrsr110p0m4/OX4S3DCy7Kyl5E= +github.com/remeh/sizedwaitgroup v1.0.0/go.mod h1:3j2R4OIe/SeS6YDhICBy22RWjJC5eNCJ1V+9+NVNYlo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/clock v0.0.0-20190514195947-2896927a307a/go.mod h1:4r5QyqhjIWCcK8DO4KMclc5Iknq5qVBAlbYYzAbUScQ= diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_Interceptor.go b/pkg/mocks/streaming/mock_walimpls/mock_Interceptor.go similarity index 98% rename from internal/mocks/streamingnode/server/wal/mock_walimpls/mock_Interceptor.go rename to pkg/mocks/streaming/mock_walimpls/mock_Interceptor.go index 35eb067f5c..40a4980d66 100644 --- a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_Interceptor.go +++ b/pkg/mocks/streaming/mock_walimpls/mock_Interceptor.go @@ -5,7 +5,7 @@ package mock_walimpls import ( context "context" - message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + message "github.com/milvus-io/milvus/pkg/streaming/util/message" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorBuilder.go b/pkg/mocks/streaming/mock_walimpls/mock_InterceptorBuilder.go similarity index 96% rename from internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorBuilder.go rename to pkg/mocks/streaming/mock_walimpls/mock_InterceptorBuilder.go index 9a25052470..89b711b102 100644 --- a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorBuilder.go +++ b/pkg/mocks/streaming/mock_walimpls/mock_InterceptorBuilder.go @@ -3,7 +3,7 @@ package mock_walimpls import ( - walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + walimpls "github.com/milvus-io/milvus/pkg/streaming/walimpls" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorWithReady.go b/pkg/mocks/streaming/mock_walimpls/mock_InterceptorWithReady.go similarity index 98% rename from internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorWithReady.go rename to pkg/mocks/streaming/mock_walimpls/mock_InterceptorWithReady.go index 5744967b56..5715e3b595 100644 --- a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_InterceptorWithReady.go +++ b/pkg/mocks/streaming/mock_walimpls/mock_InterceptorWithReady.go @@ -5,7 +5,7 @@ package mock_walimpls import ( context "context" - message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + message "github.com/milvus-io/milvus/pkg/streaming/util/message" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerBuilderImpls.go b/pkg/mocks/streaming/mock_walimpls/mock_OpenerBuilderImpls.go similarity index 97% rename from internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerBuilderImpls.go rename to pkg/mocks/streaming/mock_walimpls/mock_OpenerBuilderImpls.go index 34fa8c45e3..c99c0a7e74 100644 --- a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerBuilderImpls.go +++ b/pkg/mocks/streaming/mock_walimpls/mock_OpenerBuilderImpls.go @@ -3,7 +3,7 @@ package mock_walimpls import ( - walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + walimpls "github.com/milvus-io/milvus/pkg/streaming/walimpls" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerImpls.go b/pkg/mocks/streaming/mock_walimpls/mock_OpenerImpls.go similarity index 97% rename from internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerImpls.go rename to pkg/mocks/streaming/mock_walimpls/mock_OpenerImpls.go index b4f8c62190..1cc66433fd 100644 --- a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_OpenerImpls.go +++ b/pkg/mocks/streaming/mock_walimpls/mock_OpenerImpls.go @@ -5,7 +5,7 @@ package mock_walimpls import ( context "context" - walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + walimpls "github.com/milvus-io/milvus/pkg/streaming/walimpls" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_ScannerImpls.go b/pkg/mocks/streaming/mock_walimpls/mock_ScannerImpls.go similarity index 98% rename from internal/mocks/streamingnode/server/wal/mock_walimpls/mock_ScannerImpls.go rename to pkg/mocks/streaming/mock_walimpls/mock_ScannerImpls.go index 06613efa2e..14feb42926 100644 --- a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_ScannerImpls.go +++ b/pkg/mocks/streaming/mock_walimpls/mock_ScannerImpls.go @@ -3,7 +3,7 @@ package mock_walimpls import ( - message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + message "github.com/milvus-io/milvus/pkg/streaming/util/message" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_WALImpls.go b/pkg/mocks/streaming/mock_walimpls/mock_WALImpls.go similarity index 90% rename from internal/mocks/streamingnode/server/wal/mock_walimpls/mock_WALImpls.go rename to pkg/mocks/streaming/mock_walimpls/mock_WALImpls.go index 54eb4853e0..0e3ad28081 100644 --- a/internal/mocks/streamingnode/server/wal/mock_walimpls/mock_WALImpls.go +++ b/pkg/mocks/streaming/mock_walimpls/mock_WALImpls.go @@ -5,12 +5,12 @@ package mock_walimpls import ( context "context" - message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + message "github.com/milvus-io/milvus/pkg/streaming/util/message" mock "github.com/stretchr/testify/mock" - streamingpb "github.com/milvus-io/milvus/internal/proto/streamingpb" + types "github.com/milvus-io/milvus/pkg/streaming/util/types" - walimpls "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + walimpls "github.com/milvus-io/milvus/pkg/streaming/walimpls" ) // MockWALImpls is an autogenerated mock type for the WALImpls type @@ -82,15 +82,15 @@ func (_c *MockWALImpls_Append_Call) RunAndReturn(run func(context.Context, messa } // Channel provides a mock function with given fields: -func (_m *MockWALImpls) Channel() *streamingpb.PChannelInfo { +func (_m *MockWALImpls) Channel() *types.PChannelInfo { ret := _m.Called() - var r0 *streamingpb.PChannelInfo - if rf, ok := ret.Get(0).(func() *streamingpb.PChannelInfo); ok { + var r0 *types.PChannelInfo + if rf, ok := ret.Get(0).(func() *types.PChannelInfo); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*streamingpb.PChannelInfo) + r0 = ret.Get(0).(*types.PChannelInfo) } } @@ -114,12 +114,12 @@ func (_c *MockWALImpls_Channel_Call) Run(run func()) *MockWALImpls_Channel_Call return _c } -func (_c *MockWALImpls_Channel_Call) Return(_a0 *streamingpb.PChannelInfo) *MockWALImpls_Channel_Call { +func (_c *MockWALImpls_Channel_Call) Return(_a0 *types.PChannelInfo) *MockWALImpls_Channel_Call { _c.Call.Return(_a0) return _c } -func (_c *MockWALImpls_Channel_Call) RunAndReturn(run func() *streamingpb.PChannelInfo) *MockWALImpls_Channel_Call { +func (_c *MockWALImpls_Channel_Call) RunAndReturn(run func() *types.PChannelInfo) *MockWALImpls_Channel_Call { _c.Call.Return(run) return _c } diff --git a/internal/mocks/util/streamingutil/mock_message/mock_ImmutableMessage.go b/pkg/mocks/streaming/util/mock_message/mock_ImmutableMessage.go similarity index 99% rename from internal/mocks/util/streamingutil/mock_message/mock_ImmutableMessage.go rename to pkg/mocks/streaming/util/mock_message/mock_ImmutableMessage.go index df2dae9940..0411d17eec 100644 --- a/internal/mocks/util/streamingutil/mock_message/mock_ImmutableMessage.go +++ b/pkg/mocks/streaming/util/mock_message/mock_ImmutableMessage.go @@ -3,7 +3,7 @@ package mock_message import ( - message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + message "github.com/milvus-io/milvus/pkg/streaming/util/message" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/util/streamingutil/mock_message/mock_MessageID.go b/pkg/mocks/streaming/util/mock_message/mock_MessageID.go similarity index 98% rename from internal/mocks/util/streamingutil/mock_message/mock_MessageID.go rename to pkg/mocks/streaming/util/mock_message/mock_MessageID.go index f160d324a5..d4371e2b3c 100644 --- a/internal/mocks/util/streamingutil/mock_message/mock_MessageID.go +++ b/pkg/mocks/streaming/util/mock_message/mock_MessageID.go @@ -3,7 +3,7 @@ package mock_message import ( - message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + message "github.com/milvus-io/milvus/pkg/streaming/util/message" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/util/streamingutil/mock_message/mock_MutableMessage.go b/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go similarity index 99% rename from internal/mocks/util/streamingutil/mock_message/mock_MutableMessage.go rename to pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go index 5a718756e2..0b353e3a99 100644 --- a/internal/mocks/util/streamingutil/mock_message/mock_MutableMessage.go +++ b/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go @@ -3,7 +3,7 @@ package mock_message import ( - message "github.com/milvus-io/milvus/internal/util/streamingutil/message" + message "github.com/milvus-io/milvus/pkg/streaming/util/message" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mocks/util/streamingutil/mock_message/mock_RProperties.go b/pkg/mocks/streaming/util/mock_message/mock_RProperties.go similarity index 100% rename from internal/mocks/util/streamingutil/mock_message/mock_RProperties.go rename to pkg/mocks/streaming/util/mock_message/mock_RProperties.go diff --git a/pkg/mq/mqimpl/rocksmq/client/client_impl.go b/pkg/mq/mqimpl/rocksmq/client/client_impl.go index 06ab4717b0..3b540f1911 100644 --- a/pkg/mq/mqimpl/rocksmq/client/client_impl.go +++ b/pkg/mq/mqimpl/rocksmq/client/client_impl.go @@ -23,12 +23,10 @@ import ( ) type client struct { - server RocksMQ - producerOptions []ProducerOptions - consumerOptions []ConsumerOptions - wg *sync.WaitGroup - closeCh chan struct{} - closeOnce sync.Once + server RocksMQ + wg *sync.WaitGroup + closeCh chan struct{} + closeOnce sync.Once } func newClient(options Options) (*client, error) { @@ -37,10 +35,9 @@ func newClient(options Options) (*client, error) { } c := &client{ - server: options.Server, - producerOptions: []ProducerOptions{}, - wg: &sync.WaitGroup{}, - closeCh: make(chan struct{}), + server: options.Server, + wg: &sync.WaitGroup{}, + closeCh: make(chan struct{}), } return c, nil } @@ -61,7 +58,6 @@ func (c *client) CreateProducer(options ProducerOptions) (Producer, error) { if err != nil { return nil, err } - c.producerOptions = append(c.producerOptions, options) return producer, nil } @@ -117,10 +113,6 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) { } } - // Take messages from RocksDB and put it into consumer.Chan(), - // trigger by consumer.MsgMutex which trigger by producer - c.consumerOptions = append(c.consumerOptions, options) - return consumer, nil } diff --git a/pkg/streaming/.mockery.yaml b/pkg/streaming/.mockery.yaml new file mode 100644 index 0000000000..21b67cc759 --- /dev/null +++ b/pkg/streaming/.mockery.yaml @@ -0,0 +1,22 @@ +quiet: False +with-expecter: True +filename: "mock_{{.InterfaceName}}.go" +dir: "mocks/{{trimPrefix .PackagePath \"github.com/milvus-io/milvus/pkg\" | dir }}/mock_{{.PackageName}}" +mockname: "Mock{{.InterfaceName}}" +outpkg: "mock_{{.PackageName}}" +packages: + github.com/milvus-io/milvus/pkg/streaming/util/message: + interfaces: + MessageID: + ImmutableMessage: + MutableMessage: + RProperties: + github.com/milvus-io/milvus/pkg/streaming/walimpls: + interfaces: + OpenerBuilderImpls: + OpenerImpls: + ScannerImpls: + WALImpls: + Interceptor: + InterceptorWithReady: + InterceptorBuilder: \ No newline at end of file diff --git a/internal/util/streamingutil/message/builder.go b/pkg/streaming/util/message/builder.go similarity index 100% rename from internal/util/streamingutil/message/builder.go rename to pkg/streaming/util/message/builder.go diff --git a/internal/util/streamingutil/message/message.go b/pkg/streaming/util/message/message.go similarity index 100% rename from internal/util/streamingutil/message/message.go rename to pkg/streaming/util/message/message.go diff --git a/internal/util/streamingutil/message/message_builder_test.go b/pkg/streaming/util/message/message_builder_test.go similarity index 95% rename from internal/util/streamingutil/message/message_builder_test.go rename to pkg/streaming/util/message/message_builder_test.go index b5911d9ae1..f6b637924d 100644 --- a/internal/util/streamingutil/message/message_builder_test.go +++ b/pkg/streaming/util/message/message_builder_test.go @@ -7,8 +7,8 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus/internal/mocks/util/streamingutil/mock_message" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" ) func TestMessage(t *testing.T) { diff --git a/internal/util/streamingutil/message/message_handler.go b/pkg/streaming/util/message/message_handler.go similarity index 100% rename from internal/util/streamingutil/message/message_handler.go rename to pkg/streaming/util/message/message_handler.go diff --git a/internal/util/streamingutil/message/message_handler_test.go b/pkg/streaming/util/message/message_handler_test.go similarity index 100% rename from internal/util/streamingutil/message/message_handler_test.go rename to pkg/streaming/util/message/message_handler_test.go diff --git a/internal/util/streamingutil/message/message_id.go b/pkg/streaming/util/message/message_id.go similarity index 100% rename from internal/util/streamingutil/message/message_id.go rename to pkg/streaming/util/message/message_id.go diff --git a/internal/util/streamingutil/message/message_id_test.go b/pkg/streaming/util/message/message_id_test.go similarity index 86% rename from internal/util/streamingutil/message/message_id_test.go rename to pkg/streaming/util/message/message_id_test.go index 4a4bcf806a..b93ce2924c 100644 --- a/internal/util/streamingutil/message/message_id_test.go +++ b/pkg/streaming/util/message/message_id_test.go @@ -7,8 +7,8 @@ import ( "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus/internal/mocks/util/streamingutil/mock_message" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/pkg/mocks/streaming/util/mock_message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" ) func TestRegisterMessageIDUnmarshaler(t *testing.T) { diff --git a/internal/util/streamingutil/message/message_impl.go b/pkg/streaming/util/message/message_impl.go similarity index 100% rename from internal/util/streamingutil/message/message_impl.go rename to pkg/streaming/util/message/message_impl.go diff --git a/internal/util/streamingutil/message/message_test.go b/pkg/streaming/util/message/message_test.go similarity index 100% rename from internal/util/streamingutil/message/message_test.go rename to pkg/streaming/util/message/message_test.go diff --git a/internal/util/streamingutil/message/message_type.go b/pkg/streaming/util/message/message_type.go similarity index 100% rename from internal/util/streamingutil/message/message_type.go rename to pkg/streaming/util/message/message_type.go diff --git a/internal/util/streamingutil/message/properties.go b/pkg/streaming/util/message/properties.go similarity index 100% rename from internal/util/streamingutil/message/properties.go rename to pkg/streaming/util/message/properties.go diff --git a/internal/util/streamingutil/message/version.go b/pkg/streaming/util/message/version.go similarity index 100% rename from internal/util/streamingutil/message/version.go rename to pkg/streaming/util/message/version.go diff --git a/pkg/streaming/util/options/deliver.go b/pkg/streaming/util/options/deliver.go new file mode 100644 index 0000000000..ebdd7554cf --- /dev/null +++ b/pkg/streaming/util/options/deliver.go @@ -0,0 +1,87 @@ +package options + +import ( + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +const ( + deliverOrderTimetick DeliverOrder = 1 + + DeliverPolicyTypeAll DeliverPolicyType = 1 + DeliverPolicyTypeLatest DeliverPolicyType = 2 + DeliverPolicyTypeStartFrom DeliverPolicyType = 3 + DeliverPolicyTypeStartAfter DeliverPolicyType = 4 +) + +// DeliverOrder is the order of delivering messages. +type ( + DeliverOrder int + DeliverPolicyType int +) + +// DeliverPolicy is the policy of delivering messages. +type DeliverPolicy interface { + Policy() DeliverPolicyType + + MessageID() message.MessageID +} + +type deliverPolicyWithoutMessageID struct { + policy DeliverPolicyType +} + +func (d *deliverPolicyWithoutMessageID) Policy() DeliverPolicyType { + return d.policy +} + +func (d *deliverPolicyWithoutMessageID) MessageID() message.MessageID { + panic("not implemented") +} + +type deliverPolicyWithMessageID struct { + policy DeliverPolicyType + messageID message.MessageID +} + +func (d *deliverPolicyWithMessageID) Policy() DeliverPolicyType { + return d.policy +} + +func (d *deliverPolicyWithMessageID) MessageID() message.MessageID { + return d.messageID +} + +// DeliverPolicyAll delivers all messages. +func DeliverPolicyAll() DeliverPolicy { + return &deliverPolicyWithoutMessageID{ + policy: DeliverPolicyTypeAll, + } +} + +// DeliverLatest delivers the latest message. +func DeliverPolicyLatest() DeliverPolicy { + return &deliverPolicyWithoutMessageID{ + policy: DeliverPolicyTypeLatest, + } +} + +// DeliverEarliest delivers the earliest message. +func DeliverPolicyStartFrom(messageID message.MessageID) DeliverPolicy { + return &deliverPolicyWithMessageID{ + policy: DeliverPolicyTypeStartFrom, + messageID: messageID, + } +} + +// DeliverPolicyStartAfter delivers the message after the specified message. +func DeliverPolicyStartAfter(messageID message.MessageID) DeliverPolicy { + return &deliverPolicyWithMessageID{ + policy: DeliverPolicyTypeStartAfter, + messageID: messageID, + } +} + +// DeliverOrderTimeTick delivers messages by time tick. +func DeliverOrderTimeTick() DeliverOrder { + return deliverOrderTimetick +} diff --git a/pkg/streaming/util/types/pchannel_info.go b/pkg/streaming/util/types/pchannel_info.go new file mode 100644 index 0000000000..952a69581c --- /dev/null +++ b/pkg/streaming/util/types/pchannel_info.go @@ -0,0 +1,8 @@ +package types + +// PChannelInfo is the struct for pchannel info. +type PChannelInfo struct { + Name string // name of pchannel. + Term int64 // term of pchannel. + ServerID int64 // assigned streaming node server id of pchannel. +} diff --git a/internal/streamingnode/server/wal/walimpls/builder.go b/pkg/streaming/walimpls/builder.go similarity index 100% rename from internal/streamingnode/server/wal/walimpls/builder.go rename to pkg/streaming/walimpls/builder.go diff --git a/internal/streamingnode/server/wal/helper/scanner_helper.go b/pkg/streaming/walimpls/helper/scanner_helper.go similarity index 100% rename from internal/streamingnode/server/wal/helper/scanner_helper.go rename to pkg/streaming/walimpls/helper/scanner_helper.go diff --git a/internal/streamingnode/server/wal/helper/scanner_helper_test.go b/pkg/streaming/walimpls/helper/scanner_helper_test.go similarity index 100% rename from internal/streamingnode/server/wal/helper/scanner_helper_test.go rename to pkg/streaming/walimpls/helper/scanner_helper_test.go diff --git a/internal/streamingnode/server/wal/helper/wal_helper.go b/pkg/streaming/walimpls/helper/wal_helper.go similarity index 70% rename from internal/streamingnode/server/wal/helper/wal_helper.go rename to pkg/streaming/walimpls/helper/wal_helper.go index 2700da56ce..f590ba3f61 100644 --- a/internal/streamingnode/server/wal/helper/wal_helper.go +++ b/pkg/streaming/walimpls/helper/wal_helper.go @@ -3,9 +3,9 @@ package helper import ( "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/proto/streamingpb" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" ) // NewWALHelper creates a new WALHelper. @@ -19,11 +19,11 @@ func NewWALHelper(opt *walimpls.OpenOption) *WALHelper { // WALHelper is a helper for WAL implementation. type WALHelper struct { logger *log.MLogger - channel *streamingpb.PChannelInfo + channel types.PChannelInfo } // Channel returns the channel of the WAL. -func (w *WALHelper) Channel() *streamingpb.PChannelInfo { +func (w *WALHelper) Channel() types.PChannelInfo { return w.channel } diff --git a/pkg/streaming/walimpls/helper/wal_helper_test.go b/pkg/streaming/walimpls/helper/wal_helper_test.go new file mode 100644 index 0000000000..7917a7adde --- /dev/null +++ b/pkg/streaming/walimpls/helper/wal_helper_test.go @@ -0,0 +1,23 @@ +package helper + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/streaming/util/types" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" +) + +func TestWALHelper(t *testing.T) { + h := NewWALHelper(&walimpls.OpenOption{ + Channel: types.PChannelInfo{ + Name: "test", + Term: 1, + ServerID: 1, + }, + }) + assert.NotNil(t, h.Channel()) + assert.Equal(t, h.Channel().Name, "test") + assert.NotNil(t, h.Log()) +} diff --git a/pkg/streaming/walimpls/impls/pulsar/builder.go b/pkg/streaming/walimpls/impls/pulsar/builder.go new file mode 100644 index 0000000000..0a0be9074d --- /dev/null +++ b/pkg/streaming/walimpls/impls/pulsar/builder.go @@ -0,0 +1,67 @@ +package pulsar + +import ( + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +const ( + walName = "pulsar" +) + +func init() { + // register the builder to the wal registry. + registry.RegisterBuilder(&builderImpl{}) + // register the unmarshaler to the message registry. + message.RegisterMessageIDUnmsarshaler(walName, UnmarshalMessageID) +} + +// builderImpl is the builder for pulsar wal. +type builderImpl struct{} + +// Name returns the name of the wal. +func (b *builderImpl) Name() string { + return walName +} + +// Build build a wal instance. +func (b *builderImpl) Build() (walimpls.OpenerImpls, error) { + options, err := b.getPulsarClientOptions() + if err != nil { + return nil, errors.Wrapf(err, "build pulsar client options failed") + } + c, err := pulsar.NewClient(options) + if err != nil { + return nil, err + } + return &openerImpl{ + c: c, + }, nil +} + +// getPulsarClientOptions gets the pulsar client options from the config. +func (b *builderImpl) getPulsarClientOptions() (pulsar.ClientOptions, error) { + cfg := ¶mtable.Get().PulsarCfg + auth, err := pulsar.NewAuthentication(cfg.AuthPlugin.GetValue(), cfg.AuthParams.GetValue()) + if err != nil { + return pulsar.ClientOptions{}, errors.New("build authencation from config failed") + } + options := pulsar.ClientOptions{ + URL: cfg.Address.GetValue(), + OperationTimeout: cfg.RequestTimeout.GetAsDuration(time.Second), + Authentication: auth, + } + if cfg.EnableClientMetrics.GetAsBool() { + // Enable client metrics if config.EnableClientMetrics is true, use pkg-defined registerer. + options.MetricsRegisterer = metrics.GetRegisterer() + } + return options, nil +} diff --git a/pkg/streaming/walimpls/impls/pulsar/message_id.go b/pkg/streaming/walimpls/impls/pulsar/message_id.go new file mode 100644 index 0000000000..3214dd2959 --- /dev/null +++ b/pkg/streaming/walimpls/impls/pulsar/message_id.go @@ -0,0 +1,66 @@ +package pulsar + +import ( + "github.com/apache/pulsar-client-go/pulsar" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +var _ message.MessageID = pulsarID{} + +func UnmarshalMessageID(data []byte) (message.MessageID, error) { + id, err := unmarshalMessageID(data) + if err != nil { + return nil, err + } + return id, nil +} + +func unmarshalMessageID(data []byte) (pulsarID, error) { + msgID, err := pulsar.DeserializeMessageID(data) + if err != nil { + return pulsarID{nil}, err + } + return pulsarID{msgID}, nil +} + +type pulsarID struct { + pulsar.MessageID +} + +func (id pulsarID) WALName() string { + return walName +} + +func (id pulsarID) LT(other message.MessageID) bool { + id2 := other.(pulsarID) + if id.LedgerID() != id2.LedgerID() { + return id.LedgerID() < id2.LedgerID() + } + if id.EntryID() != id2.EntryID() { + return id.EntryID() < id2.EntryID() + } + return id.BatchIdx() < id2.BatchIdx() +} + +func (id pulsarID) LTE(other message.MessageID) bool { + id2 := other.(pulsarID) + if id.LedgerID() != id2.LedgerID() { + return id.LedgerID() < id2.LedgerID() + } + if id.EntryID() != id2.EntryID() { + return id.EntryID() < id2.EntryID() + } + return id.BatchIdx() <= id2.BatchIdx() +} + +func (id pulsarID) EQ(other message.MessageID) bool { + id2 := other.(pulsarID) + return id.LedgerID() == id2.LedgerID() && + id.EntryID() == id2.EntryID() && + id.BatchIdx() == id2.BatchIdx() +} + +func (id pulsarID) Marshal() []byte { + return id.Serialize() +} diff --git a/pkg/streaming/walimpls/impls/pulsar/message_id_test.go b/pkg/streaming/walimpls/impls/pulsar/message_id_test.go new file mode 100644 index 0000000000..599014480f --- /dev/null +++ b/pkg/streaming/walimpls/impls/pulsar/message_id_test.go @@ -0,0 +1,75 @@ +package pulsar + +import ( + "testing" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" +) + +func TestMessageID(t *testing.T) { + ids := []pulsarID{ + newMessageIDOfPulsar(0, 0, 0), + newMessageIDOfPulsar(0, 0, 1), + newMessageIDOfPulsar(0, 0, 1000), + newMessageIDOfPulsar(0, 1, 0), + newMessageIDOfPulsar(0, 1, 1000), + newMessageIDOfPulsar(0, 1000, 0), + newMessageIDOfPulsar(1, 0, 0), + newMessageIDOfPulsar(1, 1000, 0), + newMessageIDOfPulsar(2, 0, 0), + } + + for x, idx := range ids { + for y, idy := range ids { + assert.Equal(t, idx.EQ(idy), x == y) + assert.Equal(t, idy.EQ(idx), x == y) + assert.Equal(t, idy.LT(idx), x > y) + assert.Equal(t, idy.LTE(idx), x >= y) + assert.Equal(t, idx.LT(idy), x < y) + assert.Equal(t, idx.LTE(idy), x <= y) + } + } + + msgID, err := UnmarshalMessageID(pulsarID{newMessageIDOfPulsar(1, 2, 3)}.Marshal()) + assert.NoError(t, err) + assert.True(t, msgID.EQ(pulsarID{newMessageIDOfPulsar(1, 2, 3)})) + + _, err = UnmarshalMessageID([]byte{0x01, 0x02, 0x03, 0x04}) + assert.Error(t, err) +} + +// only for pulsar id unittest. +type MessageIdData struct { + LedgerId *uint64 `protobuf:"varint,1,req,name=ledgerId" json:"ledgerId,omitempty"` + EntryId *uint64 `protobuf:"varint,2,req,name=entryId" json:"entryId,omitempty"` + Partition *int32 `protobuf:"varint,3,opt,name=partition,def=-1" json:"partition,omitempty"` + BatchIndex *int32 `protobuf:"varint,4,opt,name=batch_index,json=batchIndex,def=-1" json:"batch_index,omitempty"` +} + +func (m *MessageIdData) Reset() { *m = MessageIdData{} } +func (m *MessageIdData) String() string { return proto.CompactTextString(m) } + +func (*MessageIdData) ProtoMessage() {} + +// newMessageIDOfPulsar only for test. +func newMessageIDOfPulsar(ledgerID uint64, entryID uint64, batchIdx int32) pulsarID { + id := &MessageIdData{ + LedgerId: &ledgerID, + EntryId: &entryID, + BatchIndex: &batchIdx, + } + msg, err := proto.Marshal(id) + if err != nil { + panic(err) + } + msgID, err := pulsar.DeserializeMessageID(msg) + if err != nil { + panic(err) + } + + return pulsarID{ + msgID, + } +} diff --git a/pkg/streaming/walimpls/impls/pulsar/opener.go b/pkg/streaming/walimpls/impls/pulsar/opener.go new file mode 100644 index 0000000000..5025cf7f37 --- /dev/null +++ b/pkg/streaming/walimpls/impls/pulsar/opener.go @@ -0,0 +1,38 @@ +package pulsar + +import ( + "context" + + "github.com/apache/pulsar-client-go/pulsar" + + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" +) + +var _ walimpls.OpenerImpls = (*openerImpl)(nil) + +// openerImpl is the opener for pulsar wal. +type openerImpl struct { + c pulsar.Client +} + +// Open opens a wal instance. +func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) { + p, err := o.c.CreateProducer(pulsar.ProducerOptions{ + // TODO: configurations. + Topic: opt.Channel.Name, + }) + if err != nil { + return nil, err + } + return &walImpl{ + WALHelper: helper.NewWALHelper(opt), + p: p, + c: o.c, + }, nil +} + +// Close closes the opener resources. +func (o *openerImpl) Close() { + o.c.Close() +} diff --git a/pkg/streaming/walimpls/impls/pulsar/pulsar_test.go b/pkg/streaming/walimpls/impls/pulsar/pulsar_test.go new file mode 100644 index 0000000000..7f9b812de5 --- /dev/null +++ b/pkg/streaming/walimpls/impls/pulsar/pulsar_test.go @@ -0,0 +1,32 @@ +package pulsar + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestMain(m *testing.M) { + paramtable.Init() + m.Run() +} + +func TestRegistry(t *testing.T) { + registeredB := registry.MustGetBuilder(walName) + assert.NotNil(t, registeredB) + assert.Equal(t, walName, registeredB.Name()) + + id, err := message.UnmarshalMessageID(walName, + newMessageIDOfPulsar(1, 2, 3).Marshal()) + assert.NoError(t, err) + assert.True(t, id.EQ(newMessageIDOfPulsar(1, 2, 3))) +} + +func TestPulsar(t *testing.T) { + walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run() +} diff --git a/pkg/streaming/walimpls/impls/pulsar/scanner.go b/pkg/streaming/walimpls/impls/pulsar/scanner.go new file mode 100644 index 0000000000..c0afde5659 --- /dev/null +++ b/pkg/streaming/walimpls/impls/pulsar/scanner.go @@ -0,0 +1,73 @@ +package pulsar + +import ( + "context" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" +) + +var _ walimpls.ScannerImpls = (*scannerImpl)(nil) + +func newScanner( + scannerName string, + reader pulsar.Reader, +) *scannerImpl { + s := &scannerImpl{ + ScannerHelper: helper.NewScannerHelper(scannerName), + reader: reader, + msgChannel: make(chan message.ImmutableMessage, 1), + } + go s.executeConsume() + return s +} + +type scannerImpl struct { + *helper.ScannerHelper + reader pulsar.Reader + msgChannel chan message.ImmutableMessage +} + +// Chan returns the channel of message. +func (s *scannerImpl) Chan() <-chan message.ImmutableMessage { + return s.msgChannel +} + +// Close the scanner, release the underlying resources. +// Return the error same with `Error` +func (s *scannerImpl) Close() error { + err := s.ScannerHelper.Close() + s.reader.Close() + return err +} + +func (s *scannerImpl) executeConsume() { + defer close(s.msgChannel) + for { + msg, err := s.reader.Next(s.Context()) + if err != nil { + if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) { + s.Finish(nil) + return + } + s.Finish(err) + return + } + newImmutableMessage := message.NewBuilder(). + WithMessageID(pulsarID{msg.ID()}). + WithPayload(msg.Payload()). + WithProperties(msg.Properties()). + BuildImmutable() + + select { + case <-s.Context().Done(): + s.Finish(nil) + return + case s.msgChannel <- newImmutableMessage: + } + } +} diff --git a/pkg/streaming/walimpls/impls/pulsar/wal.go b/pkg/streaming/walimpls/impls/pulsar/wal.go new file mode 100644 index 0000000000..54e073f8f9 --- /dev/null +++ b/pkg/streaming/walimpls/impls/pulsar/wal.go @@ -0,0 +1,65 @@ +package pulsar + +import ( + "context" + + "github.com/apache/pulsar-client-go/pulsar" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/options" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" +) + +var _ walimpls.WALImpls = (*walImpl)(nil) + +type walImpl struct { + *helper.WALHelper + c pulsar.Client + p pulsar.Producer +} + +func (w *walImpl) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + id, err := w.p.Send(ctx, &pulsar.ProducerMessage{ + Payload: msg.Payload(), + Properties: msg.Properties().ToRawMap(), + }) + if err != nil { + w.Log().RatedWarn(1, "send message to pulsar failed", zap.Error(err)) + return nil, err + } + return pulsarID{id}, nil +} + +func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (s walimpls.ScannerImpls, err error) { + ch := make(chan pulsar.ReaderMessage, 1) + readerOpt := pulsar.ReaderOptions{ + Topic: w.Channel().Name, + Name: opt.Name, + MessageChannel: ch, + ReceiverQueueSize: opt.ReadAheadBufferSize, + } + + switch opt.DeliverPolicy.Policy() { + case options.DeliverPolicyTypeAll: + readerOpt.StartMessageID = pulsar.EarliestMessageID() + case options.DeliverPolicyTypeLatest: + readerOpt.StartMessageID = pulsar.LatestMessageID() + case options.DeliverPolicyTypeStartFrom: + readerOpt.StartMessageID = opt.DeliverPolicy.MessageID().(pulsarID).MessageID + readerOpt.StartMessageIDInclusive = true + case options.DeliverPolicyTypeStartAfter: + readerOpt.StartMessageID = opt.DeliverPolicy.MessageID().(pulsarID).MessageID + readerOpt.StartMessageIDInclusive = false + } + reader, err := w.c.CreateReader(readerOpt) + if err != nil { + return nil, err + } + return newScanner(opt.Name, reader), nil +} + +func (w *walImpl) Close() { + w.p.Close() // close all producer +} diff --git a/pkg/streaming/walimpls/impls/rmq/builder.go b/pkg/streaming/walimpls/impls/rmq/builder.go new file mode 100644 index 0000000000..2fd66a04ce --- /dev/null +++ b/pkg/streaming/walimpls/impls/rmq/builder.go @@ -0,0 +1,41 @@ +package rmq + +import ( + "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/client" + "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" +) + +const ( + walName = "rmq" +) + +func init() { + // register the builder to the registry. + registry.RegisterBuilder(&builderImpl{}) + // register the unmarshaler to the message registry. + message.RegisterMessageIDUnmsarshaler(walName, UnmarshalMessageID) +} + +// builderImpl is the builder for rmq opener. +type builderImpl struct{} + +// Name of the wal builder, should be a lowercase string. +func (b *builderImpl) Name() string { + return walName +} + +// Build build a wal instance. +func (b *builderImpl) Build() (walimpls.OpenerImpls, error) { + c, err := client.NewClient(client.Options{ + Server: server.Rmq, + }) + if err != nil { + return nil, err + } + return &openerImpl{ + c: c, + }, nil +} diff --git a/pkg/streaming/walimpls/impls/rmq/message_id.go b/pkg/streaming/walimpls/impls/rmq/message_id.go new file mode 100644 index 0000000000..59c7773387 --- /dev/null +++ b/pkg/streaming/walimpls/impls/rmq/message_id.go @@ -0,0 +1,59 @@ +package rmq + +import ( + "encoding/base64" + + "github.com/cockroachdb/errors" + "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/encoding/protowire" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" +) + +var _ message.MessageID = rmqID(0) + +// UnmarshalMessageID unmarshal the message id. +func UnmarshalMessageID(data []byte) (message.MessageID, error) { + id, err := unmarshalMessageID(data) + if err != nil { + return nil, err + } + return id, nil +} + +// unmashalMessageID unmarshal the message id. +func unmarshalMessageID(data []byte) (rmqID, error) { + v, n := proto.DecodeVarint(data) + if n <= 0 || n != len(data) { + return 0, errors.Wrapf(message.ErrInvalidMessageID, "rmqID: %s", base64.RawStdEncoding.EncodeToString(data)) + } + return rmqID(protowire.DecodeZigZag(v)), nil +} + +// rmqID is the message id for rmq. +type rmqID int64 + +// WALName returns the name of message id related wal. +func (id rmqID) WALName() string { + return walName +} + +// LT less than. +func (id rmqID) LT(other message.MessageID) bool { + return id < other.(rmqID) +} + +// LTE less than or equal to. +func (id rmqID) LTE(other message.MessageID) bool { + return id <= other.(rmqID) +} + +// EQ Equal to. +func (id rmqID) EQ(other message.MessageID) bool { + return id == other.(rmqID) +} + +// Marshal marshal the message id. +func (id rmqID) Marshal() []byte { + return proto.EncodeVarint(protowire.EncodeZigZag(int64(id))) +} diff --git a/pkg/streaming/walimpls/impls/rmq/message_id_test.go b/pkg/streaming/walimpls/impls/rmq/message_id_test.go new file mode 100644 index 0000000000..9e38751918 --- /dev/null +++ b/pkg/streaming/walimpls/impls/rmq/message_id_test.go @@ -0,0 +1,25 @@ +package rmq + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMessageID(t *testing.T) { + assert.True(t, rmqID(1).LT(rmqID(2))) + assert.True(t, rmqID(1).EQ(rmqID(1))) + assert.True(t, rmqID(1).LTE(rmqID(1))) + assert.True(t, rmqID(1).LTE(rmqID(2))) + assert.False(t, rmqID(2).LT(rmqID(1))) + assert.False(t, rmqID(2).EQ(rmqID(1))) + assert.False(t, rmqID(2).LTE(rmqID(1))) + assert.True(t, rmqID(2).LTE(rmqID(2))) + + msgID, err := UnmarshalMessageID(rmqID(1).Marshal()) + assert.NoError(t, err) + assert.Equal(t, rmqID(1), msgID) + + _, err = UnmarshalMessageID([]byte{0x01, 0x02, 0x03, 0x04}) + assert.Error(t, err) +} diff --git a/pkg/streaming/walimpls/impls/rmq/opener.go b/pkg/streaming/walimpls/impls/rmq/opener.go new file mode 100644 index 0000000000..a1fa63777a --- /dev/null +++ b/pkg/streaming/walimpls/impls/rmq/opener.go @@ -0,0 +1,36 @@ +package rmq + +import ( + "context" + + "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/client" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" +) + +var _ walimpls.OpenerImpls = (*openerImpl)(nil) + +// openerImpl is the implementation of walimpls.Opener interface. +type openerImpl struct { + c client.Client +} + +// Open opens a new wal. +func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) { + p, err := o.c.CreateProducer(client.ProducerOptions{ + Topic: opt.Channel.Name, + }) + if err != nil { + return nil, err + } + return &walImpl{ + WALHelper: helper.NewWALHelper(opt), + p: p, + c: o.c, + }, nil +} + +// Close closes the opener resources. +func (o *openerImpl) Close() { + o.c.Close() +} diff --git a/pkg/streaming/walimpls/impls/rmq/rmq_test.go b/pkg/streaming/walimpls/impls/rmq/rmq_test.go new file mode 100644 index 0000000000..a8fc81d209 --- /dev/null +++ b/pkg/streaming/walimpls/impls/rmq/rmq_test.go @@ -0,0 +1,39 @@ +package rmq + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func TestMain(m *testing.M) { + paramtable.Init() + tmpPath, err := os.MkdirTemp("", "rocksdb_test") + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpPath) + server.InitRocksMQ(tmpPath) + defer server.CloseRocksMQ() + m.Run() +} + +func TestRegistry(t *testing.T) { + registeredB := registry.MustGetBuilder(walName) + assert.NotNil(t, registeredB) + assert.Equal(t, walName, registeredB.Name()) + + id, err := message.UnmarshalMessageID(walName, rmqID(1).Marshal()) + assert.NoError(t, err) + assert.True(t, id.EQ(rmqID(1))) +} + +func TestWAL(t *testing.T) { + // walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run() +} diff --git a/pkg/streaming/walimpls/impls/rmq/scanner.go b/pkg/streaming/walimpls/impls/rmq/scanner.go new file mode 100644 index 0000000000..0c727e2931 --- /dev/null +++ b/pkg/streaming/walimpls/impls/rmq/scanner.go @@ -0,0 +1,77 @@ +package rmq + +import ( + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/client" + "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" +) + +var _ walimpls.ScannerImpls = (*scannerImpl)(nil) + +// newScanner creates a new scanner. +func newScanner( + scannerName string, + exclude *rmqID, + consumer client.Consumer, +) *scannerImpl { + s := &scannerImpl{ + ScannerHelper: helper.NewScannerHelper(scannerName), + exclude: exclude, + consumer: consumer, + msgChannel: make(chan message.ImmutableMessage, 1), + } + go s.executeConsume() + return s +} + +// scannerImpl is the implementation of ScannerImpls for rmq. +type scannerImpl struct { + *helper.ScannerHelper + exclude *rmqID + consumer client.Consumer + msgChannel chan message.ImmutableMessage +} + +// Chan returns the channel of message. +func (s *scannerImpl) Chan() <-chan message.ImmutableMessage { + return s.msgChannel +} + +// Close the scanner, release the underlying resources. +// Return the error same with `Error` +func (s *scannerImpl) Close() error { + err := s.ScannerHelper.Close() + s.consumer.Close() + return err +} + +// executeConsume consumes the message from the consumer. +func (s *scannerImpl) executeConsume() { + defer close(s.msgChannel) + for { + select { + case <-s.Context().Done(): + s.Finish(nil) + return + case msg, ok := <-s.consumer.Chan(): + if !ok { + s.Finish(errors.New("mq consumer unexpected channel closed")) + return + } + msgID := rmqID(msg.ID().(*server.RmqID).MessageID) + // record the last message id to avoid repeated consume message. + // and exclude message id should be filterred. + if s.exclude == nil || !s.exclude.EQ(msgID) { + s.msgChannel <- message.NewBuilder(). + WithMessageID(msgID). + WithPayload(msg.Payload()). + WithProperties(msg.Properties()). + BuildImmutable() + } + } + } +} diff --git a/pkg/streaming/walimpls/impls/rmq/wal.go b/pkg/streaming/walimpls/impls/rmq/wal.go new file mode 100644 index 0000000000..a00b9ef043 --- /dev/null +++ b/pkg/streaming/walimpls/impls/rmq/wal.go @@ -0,0 +1,95 @@ +package rmq + +import ( + "context" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/mq/common" + "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/client" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/options" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" +) + +const defaultReadAheadBufferSize = 1024 + +var _ walimpls.WALImpls = (*walImpl)(nil) + +// walImpl is the implementation of walimpls.WAL interface. +type walImpl struct { + *helper.WALHelper + p client.Producer + c client.Client +} + +// Append appends a message to the wal. +func (w *walImpl) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + id, err := w.p.Send(&common.ProducerMessage{ + Payload: msg.Payload(), + Properties: msg.Properties().ToRawMap(), + }) + if err != nil { + w.Log().RatedWarn(1, "send message to rmq failed", zap.Error(err)) + return nil, err + } + return rmqID(id), nil +} + +// Read create a scanner to read the wal. +func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (s walimpls.ScannerImpls, err error) { + scannerName := opt.Name + if opt.ReadAheadBufferSize == 0 { + opt.ReadAheadBufferSize = defaultReadAheadBufferSize + } + receiveChannel := make(chan common.Message, opt.ReadAheadBufferSize) + consumerOption := client.ConsumerOptions{ + Topic: w.Channel().Name, + SubscriptionName: scannerName, + SubscriptionInitialPosition: common.SubscriptionPositionUnknown, + MessageChannel: receiveChannel, + } + switch opt.DeliverPolicy.Policy() { + case options.DeliverPolicyTypeAll: + consumerOption.SubscriptionInitialPosition = common.SubscriptionPositionEarliest + case options.DeliverPolicyTypeLatest: + consumerOption.SubscriptionInitialPosition = common.SubscriptionPositionLatest + } + + // Subscribe the MQ consumer. + consumer, err := w.c.Subscribe(consumerOption) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + // release the subscriber if following operation is failure. + // to avoid resource leak. + consumer.Close() + } + }() + + // Seek the MQ consumer. + var exclude *rmqID + switch opt.DeliverPolicy.Policy() { + case options.DeliverPolicyTypeStartFrom: + id := opt.DeliverPolicy.MessageID().(rmqID) + // Do a inslusive seek. + if err = consumer.Seek(int64(id)); err != nil { + return nil, err + } + case options.DeliverPolicyTypeStartAfter: + id := opt.DeliverPolicy.MessageID().(rmqID) + exclude = &id + if err = consumer.Seek(int64(id)); err != nil { + return nil, err + } + } + return newScanner(scannerName, exclude, consumer), nil +} + +// Close closes the wal. +func (w *walImpl) Close() { + w.p.Close() // close all producer +} diff --git a/internal/streamingnode/server/wal/walimplstest/builder.go b/pkg/streaming/walimpls/impls/walimplstest/builder.go similarity index 69% rename from internal/streamingnode/server/wal/walimplstest/builder.go rename to pkg/streaming/walimpls/impls/walimplstest/builder.go index 0442d7f38f..bf628542c1 100644 --- a/internal/streamingnode/server/wal/walimplstest/builder.go +++ b/pkg/streaming/walimpls/impls/walimplstest/builder.go @@ -4,9 +4,9 @@ package walimplstest import ( - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/registry" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" ) const ( diff --git a/internal/streamingnode/server/wal/walimplstest/message_id.go b/pkg/streaming/walimpls/impls/walimplstest/message_id.go similarity index 95% rename from internal/streamingnode/server/wal/walimplstest/message_id.go rename to pkg/streaming/walimpls/impls/walimplstest/message_id.go index 82a086a2f0..cf028942a3 100644 --- a/internal/streamingnode/server/wal/walimplstest/message_id.go +++ b/pkg/streaming/walimpls/impls/walimplstest/message_id.go @@ -6,7 +6,7 @@ package walimplstest import ( "strconv" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" ) var _ message.MessageID = testMessageID(0) diff --git a/internal/streamingnode/server/wal/walimplstest/message_log.go b/pkg/streaming/walimpls/impls/walimplstest/message_log.go similarity index 95% rename from internal/streamingnode/server/wal/walimplstest/message_log.go rename to pkg/streaming/walimpls/impls/walimplstest/message_log.go index 6556d9a5d5..82c713e8c8 100644 --- a/internal/streamingnode/server/wal/walimplstest/message_log.go +++ b/pkg/streaming/walimpls/impls/walimplstest/message_log.go @@ -7,7 +7,7 @@ import ( "context" "sync" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) diff --git a/internal/streamingnode/server/wal/walimplstest/opener.go b/pkg/streaming/walimpls/impls/walimplstest/opener.go similarity index 64% rename from internal/streamingnode/server/wal/walimplstest/opener.go rename to pkg/streaming/walimpls/impls/walimplstest/opener.go index 69f9f0bf8f..78a9be1ab3 100644 --- a/internal/streamingnode/server/wal/walimplstest/opener.go +++ b/pkg/streaming/walimpls/impls/walimplstest/opener.go @@ -6,8 +6,8 @@ package walimplstest import ( "context" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/helper" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" ) var _ walimpls.OpenerImpls = &opener{} @@ -15,7 +15,7 @@ var _ walimpls.OpenerImpls = &opener{} type opener struct{} func (*opener) Open(ctx context.Context, opt *walimpls.OpenOption) (walimpls.WALImpls, error) { - l := getOrCreateLogs(opt.Channel.GetName()) + l := getOrCreateLogs(opt.Channel.Name) return &walImpls{ WALHelper: *helper.NewWALHelper(opt), datas: l, diff --git a/internal/streamingnode/server/wal/walimplstest/scanner.go b/pkg/streaming/walimpls/impls/walimplstest/scanner.go similarity index 80% rename from internal/streamingnode/server/wal/walimplstest/scanner.go rename to pkg/streaming/walimpls/impls/walimplstest/scanner.go index 75be646192..0059933bd7 100644 --- a/internal/streamingnode/server/wal/walimplstest/scanner.go +++ b/pkg/streaming/walimpls/impls/walimplstest/scanner.go @@ -4,9 +4,9 @@ package walimplstest import ( - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/helper" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" ) var _ walimpls.ScannerImpls = &scannerImpls{} diff --git a/pkg/streaming/walimpls/impls/walimplstest/wal.go b/pkg/streaming/walimpls/impls/walimplstest/wal.go new file mode 100644 index 0000000000..595ad8aec3 --- /dev/null +++ b/pkg/streaming/walimpls/impls/walimplstest/wal.go @@ -0,0 +1,44 @@ +//go:build test +// +build test + +package walimplstest + +import ( + "context" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/options" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" +) + +var _ walimpls.WALImpls = &walImpls{} + +type walImpls struct { + helper.WALHelper + datas *messageLog +} + +func (w *walImpls) Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error) { + return w.datas.Append(ctx, msg) +} + +func (w *walImpls) Read(ctx context.Context, opts walimpls.ReadOption) (walimpls.ScannerImpls, error) { + offset := int64(0) + switch opts.DeliverPolicy.Policy() { + case options.DeliverPolicyTypeAll: + offset = 0 + case options.DeliverPolicyTypeLatest: + offset = w.datas.Len() + case options.DeliverPolicyTypeStartFrom: + offset = int64(opts.DeliverPolicy.MessageID().(testMessageID)) + case options.DeliverPolicyTypeStartAfter: + offset = int64(opts.DeliverPolicy.MessageID().(testMessageID)) + 1 + } + return newScannerImpls( + opts, w.datas, int(offset), + ), nil +} + +func (w *walImpls) Close() { +} diff --git a/internal/streamingnode/server/wal/walimplstest/wal_test.go b/pkg/streaming/walimpls/impls/walimplstest/wal_test.go similarity index 67% rename from internal/streamingnode/server/wal/walimplstest/wal_test.go rename to pkg/streaming/walimpls/impls/walimplstest/wal_test.go index 57a18ac4c4..88d2841562 100644 --- a/internal/streamingnode/server/wal/walimplstest/wal_test.go +++ b/pkg/streaming/walimpls/impls/walimplstest/wal_test.go @@ -3,7 +3,7 @@ package walimplstest import ( "testing" - "github.com/milvus-io/milvus/internal/streamingnode/server/wal/walimpls" + "github.com/milvus-io/milvus/pkg/streaming/walimpls" ) func TestWALImplsTest(t *testing.T) { diff --git a/internal/streamingnode/server/wal/walimpls/interceptor.go b/pkg/streaming/walimpls/interceptor.go similarity index 96% rename from internal/streamingnode/server/wal/walimpls/interceptor.go rename to pkg/streaming/walimpls/interceptor.go index 8e7d2e0ff4..3e5013e6fe 100644 --- a/internal/streamingnode/server/wal/walimpls/interceptor.go +++ b/pkg/streaming/walimpls/interceptor.go @@ -3,7 +3,7 @@ package walimpls import ( "context" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" ) type ( diff --git a/internal/streamingnode/server/wal/walimpls/opener.go b/pkg/streaming/walimpls/opener.go similarity index 76% rename from internal/streamingnode/server/wal/walimpls/opener.go rename to pkg/streaming/walimpls/opener.go index 835fd886d8..0684ecf660 100644 --- a/internal/streamingnode/server/wal/walimpls/opener.go +++ b/pkg/streaming/walimpls/opener.go @@ -3,12 +3,12 @@ package walimpls import ( "context" - "github.com/milvus-io/milvus/internal/proto/streamingpb" + "github.com/milvus-io/milvus/pkg/streaming/util/types" ) // OpenOption is the option for allocating wal impls instance. type OpenOption struct { - Channel *streamingpb.PChannelInfo // Channel to open. + Channel types.PChannelInfo // Channel to open. } // OpenerImpls is the interface for build WALImpls instance. diff --git a/pkg/streaming/walimpls/registry/registry.go b/pkg/streaming/walimpls/registry/registry.go new file mode 100644 index 0000000000..af5166b431 --- /dev/null +++ b/pkg/streaming/walimpls/registry/registry.go @@ -0,0 +1,30 @@ +package registry + +import ( + "github.com/milvus-io/milvus/pkg/streaming/walimpls" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// builders is a map of registered wal builders. +var builders typeutil.ConcurrentMap[string, walimpls.OpenerBuilderImpls] + +// Register registers the wal builder. +// +// NOTE: this function must only be called during initialization time (i.e. in +// an init() function), name of builder is lowercase. If multiple Builder are +// registered with the same name, panic will occur. +func RegisterBuilder(b walimpls.OpenerBuilderImpls) { + _, loaded := builders.GetOrInsert(b.Name(), b) + if loaded { + panic("walimpls builder already registered: " + b.Name()) + } +} + +// MustGetBuilder returns the walimpls builder by name. +func MustGetBuilder(name string) walimpls.OpenerBuilderImpls { + b, ok := builders.Get(name) + if !ok { + panic("walimpls builder not found: " + name) + } + return b +} diff --git a/internal/streamingnode/server/wal/registry/wal_test.go b/pkg/streaming/walimpls/registry/wal_test.go similarity index 91% rename from internal/streamingnode/server/wal/registry/wal_test.go rename to pkg/streaming/walimpls/registry/wal_test.go index 5a3ec9ce4a..778a92455d 100644 --- a/internal/streamingnode/server/wal/registry/wal_test.go +++ b/pkg/streaming/walimpls/registry/wal_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus/internal/mocks/streamingnode/server/wal/mock_walimpls" + "github.com/milvus-io/milvus/pkg/mocks/streaming/mock_walimpls" ) func TestRegister(t *testing.T) { diff --git a/internal/streamingnode/server/wal/walimpls/scanner.go b/pkg/streaming/walimpls/scanner.go similarity index 55% rename from internal/streamingnode/server/wal/walimpls/scanner.go rename to pkg/streaming/walimpls/scanner.go index 7152eda1cf..3c416c12ee 100644 --- a/internal/streamingnode/server/wal/walimpls/scanner.go +++ b/pkg/streaming/walimpls/scanner.go @@ -1,12 +1,19 @@ package walimpls import ( - "github.com/milvus-io/milvus/internal/util/streamingutil/message" - "github.com/milvus-io/milvus/internal/util/streamingutil/options" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/options" ) type ReadOption struct { - Name string + // The name of the reader. + Name string + // ReadAheadBufferSize sets the size of scanner read ahead queue size. + // Control how many messages can be read ahead by the scanner. + // Higher value could potentially increase the scanner throughput but bigger memory utilization. + // 0 is the default value determined by the underlying wal implementation. + ReadAheadBufferSize int + // DeliverPolicy sets the deliver policy of the reader. DeliverPolicy options.DeliverPolicy } diff --git a/internal/streamingnode/server/wal/walimpls/test_framework.go b/pkg/streaming/walimpls/test_framework.go similarity index 53% rename from internal/streamingnode/server/wal/walimpls/test_framework.go rename to pkg/streaming/walimpls/test_framework.go index bcb8782a56..617d923a6c 100644 --- a/internal/streamingnode/server/wal/walimpls/test_framework.go +++ b/pkg/streaming/walimpls/test_framework.go @@ -8,6 +8,7 @@ import ( "fmt" "math/rand" "sort" + "strconv" "strings" "sync" "testing" @@ -18,9 +19,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus/internal/proto/streamingpb" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" - "github.com/milvus-io/milvus/internal/util/streamingutil/options" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/options" + "github.com/milvus-io/milvus/pkg/streaming/util/types" ) var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") @@ -56,38 +57,78 @@ func (f walImplsTestFramework) Run() { assert.NotNil(f.t, o) defer o.Close() - // construct pChannel - name := "test_" + randString(4) - pChannel := &streamingpb.PChannelInfo{ - Name: name, - Term: 1, - ServerID: 1, - VChannelInfos: []*streamingpb.VChannelInfo{}, + // Test on multi pchannels + wg := sync.WaitGroup{} + pchannelCnt := 3 + wg.Add(pchannelCnt) + for i := 0; i < pchannelCnt; i++ { + // construct pChannel + name := fmt.Sprintf("test_%d_%s", i, randString(4)) + go func(name string) { + defer wg.Done() + newTestOneWALImpls(f.t, o, name, f.messageCount).Run() + }(name) } - ctx := context.Background() - - // create a wal. - w, err := o.Open(ctx, &OpenOption{ - Channel: pChannel, - }) - assert.NoError(f.t, err) - assert.NotNil(f.t, w) - defer w.Close() - - f.testReadAndWrite(ctx, w) + wg.Wait() } -func (f walImplsTestFramework) testReadAndWrite(ctx context.Context, w WALImpls) { +func newTestOneWALImpls(t *testing.T, opener OpenerImpls, pchannel string, messageCount int) *testOneWALImplsFramework { + return &testOneWALImplsFramework{ + t: t, + opener: opener, + pchannel: pchannel, + written: make([]message.ImmutableMessage, 0), + messageCount: messageCount, + term: 1, + } +} + +type testOneWALImplsFramework struct { + t *testing.T + opener OpenerImpls + written []message.ImmutableMessage + pchannel string + messageCount int + term int +} + +func (f *testOneWALImplsFramework) Run() { + ctx := context.Background() + + // test a read write loop + for ; f.term <= 3; f.term++ { + pChannel := types.PChannelInfo{ + Name: f.pchannel, + Term: int64(f.term), + ServerID: 1, + } + // create a wal. + w, err := f.opener.Open(ctx, &OpenOption{ + Channel: pChannel, + }) + assert.NoError(f.t, err) + assert.NotNil(f.t, w) + assert.Equal(f.t, pChannel.Name, w.Channel().Name) + assert.Equal(f.t, pChannel.ServerID, w.Channel().ServerID) + assert.Equal(f.t, pChannel.Term, w.Channel().Term) + + f.testReadAndWrite(ctx, w) + // close the wal + w.Close() + } +} + +func (f *testOneWALImplsFramework) testReadAndWrite(ctx context.Context, w WALImpls) { // Test read and write. wg := sync.WaitGroup{} wg.Add(3) - var written []message.ImmutableMessage + var newWritten []message.ImmutableMessage var read1, read2 []message.ImmutableMessage go func() { defer wg.Done() var err error - written, err = f.testAppend(ctx, w) + newWritten, err = f.testAppend(ctx, w) assert.NoError(f.t, err) }() go func() { @@ -107,23 +148,25 @@ func (f walImplsTestFramework) testReadAndWrite(ctx context.Context, w WALImpls) f.assertSortedMessageList(read1) f.assertSortedMessageList(read2) - sort.Sort(sortByMessageID(written)) - f.assertEqualMessageList(written, read1) - f.assertEqualMessageList(written, read2) + sort.Sort(sortByMessageID(newWritten)) + f.written = append(f.written, newWritten...) + f.assertSortedMessageList(f.written) + f.assertEqualMessageList(f.written, read1) + f.assertEqualMessageList(f.written, read2) // Test different scan policy, StartFrom. - readFromIdx := len(read1) / 2 - readFromMsgID := read1[readFromIdx].MessageID() + readFromIdx := len(f.written) / 2 + readFromMsgID := f.written[readFromIdx].MessageID() s, err := w.Read(ctx, ReadOption{ Name: "scanner_deliver_start_from", DeliverPolicy: options.DeliverPolicyStartFrom(readFromMsgID), }) assert.NoError(f.t, err) - for i := readFromIdx; i < len(read1); i++ { + for i := readFromIdx; i < len(f.written); i++ { msg, ok := <-s.Chan() assert.NotNil(f.t, msg) assert.True(f.t, ok) - assert.True(f.t, msg.MessageID().EQ(read1[i].MessageID())) + assert.True(f.t, msg.MessageID().EQ(f.written[i].MessageID())) } s.Close() @@ -133,11 +176,11 @@ func (f walImplsTestFramework) testReadAndWrite(ctx context.Context, w WALImpls) DeliverPolicy: options.DeliverPolicyStartAfter(readFromMsgID), }) assert.NoError(f.t, err) - for i := readFromIdx + 1; i < len(read1); i++ { + for i := readFromIdx + 1; i < len(f.written); i++ { msg, ok := <-s.Chan() assert.NotNil(f.t, msg) assert.True(f.t, ok) - assert.True(f.t, msg.MessageID().EQ(read1[i].MessageID())) + assert.True(f.t, msg.MessageID().EQ(f.written[i].MessageID())) } s.Close() @@ -156,15 +199,14 @@ func (f walImplsTestFramework) testReadAndWrite(ctx context.Context, w WALImpls) s.Close() } -func (f walImplsTestFramework) assertSortedMessageList(msgs []message.ImmutableMessage) { +func (f *testOneWALImplsFramework) assertSortedMessageList(msgs []message.ImmutableMessage) { for i := 1; i < len(msgs); i++ { assert.True(f.t, msgs[i-1].MessageID().LT(msgs[i].MessageID())) } } -func (f walImplsTestFramework) assertEqualMessageList(msgs1 []message.ImmutableMessage, msgs2 []message.ImmutableMessage) { - assert.Equal(f.t, f.messageCount, len(msgs1)) - assert.Equal(f.t, f.messageCount, len(msgs2)) +func (f *testOneWALImplsFramework) assertEqualMessageList(msgs1 []message.ImmutableMessage, msgs2 []message.ImmutableMessage) { + assert.Equal(f.t, len(msgs2), len(msgs1)) for i := 0; i < len(msgs1); i++ { assert.True(f.t, msgs1[i].MessageID().EQ(msgs2[i].MessageID())) // assert.True(f.t, bytes.Equal(msgs1[i].Payload(), msgs2[i].Payload())) @@ -181,10 +223,10 @@ func (f walImplsTestFramework) assertEqualMessageList(msgs1 []message.ImmutableM } } -func (f walImplsTestFramework) testAppend(ctx context.Context, w WALImpls) ([]message.ImmutableMessage, error) { +func (f *testOneWALImplsFramework) testAppend(ctx context.Context, w WALImpls) ([]message.ImmutableMessage, error) { ids := make([]message.ImmutableMessage, f.messageCount) swg := sizedwaitgroup.New(5) - for i := 0; i < f.messageCount; i++ { + for i := 0; i < f.messageCount-1; i++ { swg.Add() go func(i int) { defer swg.Done() @@ -222,24 +264,68 @@ func (f walImplsTestFramework) testAppend(ctx context.Context, w WALImpls) ([]me }(i) } swg.Wait() + // send a final hint message + header := commonpb.MsgHeader{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Insert, + MsgID: int64(f.messageCount - 1), + }, + } + payload, err := proto.Marshal(&header) + if err != nil { + panic(err) + } + properties := map[string]string{ + "id": fmt.Sprintf("%d", f.messageCount-1), + "const": "t", + "term": strconv.FormatInt(int64(f.term), 10), + } + msg := message.NewBuilder(). + WithPayload(payload). + WithProperties(properties). + WithMessageType(message.MessageTypeTimeTick). + BuildMutable() + id, err := w.Append(ctx, msg) + assert.NoError(f.t, err) + ids[f.messageCount-1] = message.NewBuilder(). + WithPayload(payload). + WithProperties(properties). + WithMessageID(id). + WithMessageType(message.MessageTypeTimeTick). + BuildImmutable() return ids, nil } -func (f walImplsTestFramework) testRead(ctx context.Context, w WALImpls, name string) ([]message.ImmutableMessage, error) { +func (f *testOneWALImplsFramework) testRead(ctx context.Context, w WALImpls, name string) ([]message.ImmutableMessage, error) { s, err := w.Read(ctx, ReadOption{ - Name: name, - DeliverPolicy: options.DeliverPolicyAll(), + Name: name, + DeliverPolicy: options.DeliverPolicyAll(), + ReadAheadBufferSize: 128, }) assert.NoError(f.t, err) assert.Equal(f.t, name, s.Name()) defer s.Close() - msgs := make([]message.ImmutableMessage, 0, f.messageCount) - for i := 0; i < f.messageCount; i++ { + expectedCnt := f.messageCount + len(f.written) + msgs := make([]message.ImmutableMessage, 0, expectedCnt) + for { msg, ok := <-s.Chan() assert.NotNil(f.t, msg) assert.True(f.t, ok) msgs = append(msgs, msg) + if msg.MessageType() == message.MessageTypeTimeTick { + termString, ok := msg.Properties().Get("term") + if !ok { + panic("lost term properties") + } + term, err := strconv.ParseInt(termString, 10, 64) + if err != nil { + panic(err) + } + if int(term) == f.term { + break + } + } } return msgs, nil } diff --git a/internal/streamingnode/server/wal/walimpls/wal.go b/pkg/streaming/walimpls/wal.go similarity index 73% rename from internal/streamingnode/server/wal/walimpls/wal.go rename to pkg/streaming/walimpls/wal.go index 40e8e47523..6d65fd1606 100644 --- a/internal/streamingnode/server/wal/walimpls/wal.go +++ b/pkg/streaming/walimpls/wal.go @@ -3,14 +3,14 @@ package walimpls import ( "context" - "github.com/milvus-io/milvus/internal/proto/streamingpb" - "github.com/milvus-io/milvus/internal/util/streamingutil/message" + "github.com/milvus-io/milvus/pkg/streaming/util/message" + "github.com/milvus-io/milvus/pkg/streaming/util/types" ) type WALImpls interface { // Channel returns the channel assignment info of the wal. // Should be read-only. - Channel() *streamingpb.PChannelInfo + Channel() types.PChannelInfo // Append writes a record to the log. Append(ctx context.Context, msg message.MutableMessage) (message.MessageID, error)