diff --git a/Makefile b/Makefile index ba8a747eec..614c900646 100644 --- a/Makefile +++ b/Makefile @@ -542,6 +542,10 @@ generate-yaml: milvus-tools @echo "Updating milvus config yaml" @$(PWD)/bin/tools/config gen-yaml && mv milvus.yaml configs/milvus.yaml +generate-message-codegen: getdeps + @echo "Generating message codegen ..." + @(cd pkg/streaming/util/message/codegen && PATH=$(PWD)/bin:$(PATH) go generate .) + MMAP_MIGRATION_PATH = $(PWD)/cmd/tools/migration/mmap/tool mmap-migration: @echo "Building migration tool ..." diff --git a/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl_test.go b/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl_test.go index 4605354fad..76cbde38fb 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl_test.go +++ b/internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl_test.go @@ -122,7 +122,7 @@ func TestFlushMsgHandler_HandlSchemaChange(t *testing.T) { handler := newMsgHandler(wbMgr) msgID := mock_message.NewMockMessageID(t) - im := message.MustAsImmutableCollectionSchemaChangeV2(msg.IntoImmutableMessage(msgID)) + im := message.MustAsImmutableSchemaChangeMessageV2(msg.IntoImmutableMessage(msgID)) err := handler.HandleSchemaChange(context.Background(), im) assert.Error(t, err) diff --git a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go index caaef123b4..95561ebfbd 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go @@ -227,7 +227,7 @@ func (impl *shardInterceptor) handleManualFlushMessage(ctx context.Context, msg // handleSchemaChange handles the schema change message. func (impl *shardInterceptor) handleSchemaChange(ctx context.Context, msg message.MutableMessage, appendOp interceptors.Append) (message.MessageID, error) { - schemaChangeMsg := message.MustAsMutableCollectionSchemaChangeV2(msg) + schemaChangeMsg := message.MustAsMutableSchemaChangeMessageV2(msg) header := schemaChangeMsg.Header() segmentIDs, err := impl.shardManager.FlushAndFenceSegmentAllocUntil(header.GetCollectionId(), msg.TimeTick()) if err != nil { diff --git a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go index 556ce54b1c..ccf2f4ba6a 100644 --- a/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go +++ b/internal/streamingnode/server/wal/recovery/recovery_storage_impl.go @@ -280,7 +280,7 @@ func (r *recoveryStorageImpl) handleMessage(msg message.ImmutableMessage) { immutableMsg := message.MustAsImmutableImportMessageV1(msg) r.handleImport(immutableMsg) case message.MessageTypeSchemaChange: - immutableMsg := message.MustAsImmutableCollectionSchemaChangeV2(msg) + immutableMsg := message.MustAsImmutableSchemaChangeMessageV2(msg) r.handleSchemaChange(immutableMsg) case message.MessageTypeTimeTick: // nothing, the time tick message make no recovery operation. diff --git a/internal/streamingnode/server/wal/recovery/vchannel_recovery_info_test.go b/internal/streamingnode/server/wal/recovery/vchannel_recovery_info_test.go index d78c8d9d83..4297aaba55 100644 --- a/internal/streamingnode/server/wal/recovery/vchannel_recovery_info_test.go +++ b/internal/streamingnode/server/wal/recovery/vchannel_recovery_info_test.go @@ -195,7 +195,7 @@ func TestNewVChannelRecoveryInfoFromCreateCollectionMessage(t *testing.T) { msgID5 := rmq.NewRmqID(5) ts += 1 immutableMsg5 := msg5.WithTimeTick(ts).WithLastConfirmed(msgID5).IntoImmutableMessage(msgID5) - info.ObserveSchemaChange(message.MustAsImmutableCollectionSchemaChangeV2(immutableMsg5)) + info.ObserveSchemaChange(message.MustAsImmutableSchemaChangeMessageV2(immutableMsg5)) idx, schema2Saved := info.GetSchema(0) assert.Equal(t, 1, idx) diff --git a/pkg/go.mod b/pkg/go.mod index 0b20da71cf..e4eca7c241 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -15,6 +15,7 @@ require ( github.com/cockroachdb/errors v1.9.1 github.com/confluentinc/confluent-kafka-go v1.9.1 github.com/containerd/cgroups/v3 v3.0.3 + github.com/dave/jennifer v1.7.1 github.com/expr-lang/expr v1.15.7 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/jolestar/go-commons-pool/v2 v2.1.2 diff --git a/pkg/go.sum b/pkg/go.sum index 43c4a1eda1..6094ae3f11 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -182,6 +182,8 @@ github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1v github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0= github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0= +github.com/dave/jennifer v1.7.1 h1:B4jJJDHelWcDhlRQxWeo0Npa/pYKBLrirAQoTN45txo= +github.com/dave/jennifer v1.7.1/go.mod h1:nXbxhEmQfOZhWml3D1cDK5M1FLnMSozpbFN/m3RmGZc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= diff --git a/pkg/mocks/streaming/util/mock_message/mock_BroadcastMutableMessage.go b/pkg/mocks/streaming/util/mock_message/mock_BroadcastMutableMessage.go index 8754887a89..f83ef10f8f 100644 --- a/pkg/mocks/streaming/util/mock_message/mock_BroadcastMutableMessage.go +++ b/pkg/mocks/streaming/util/mock_message/mock_BroadcastMutableMessage.go @@ -344,6 +344,51 @@ func (_c *MockBroadcastMutableMessage_MessageType_Call) RunAndReturn(run func() return _c } +// MessageTypeWithVersion provides a mock function with no fields +func (_m *MockBroadcastMutableMessage) MessageTypeWithVersion() message.MessageTypeWithVersion { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for MessageTypeWithVersion") + } + + var r0 message.MessageTypeWithVersion + if rf, ok := ret.Get(0).(func() message.MessageTypeWithVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(message.MessageTypeWithVersion) + } + + return r0 +} + +// MockBroadcastMutableMessage_MessageTypeWithVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MessageTypeWithVersion' +type MockBroadcastMutableMessage_MessageTypeWithVersion_Call struct { + *mock.Call +} + +// MessageTypeWithVersion is a helper method to define mock.On call +func (_e *MockBroadcastMutableMessage_Expecter) MessageTypeWithVersion() *MockBroadcastMutableMessage_MessageTypeWithVersion_Call { + return &MockBroadcastMutableMessage_MessageTypeWithVersion_Call{Call: _e.mock.On("MessageTypeWithVersion")} +} + +func (_c *MockBroadcastMutableMessage_MessageTypeWithVersion_Call) Run(run func()) *MockBroadcastMutableMessage_MessageTypeWithVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockBroadcastMutableMessage_MessageTypeWithVersion_Call) Return(_a0 message.MessageTypeWithVersion) *MockBroadcastMutableMessage_MessageTypeWithVersion_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockBroadcastMutableMessage_MessageTypeWithVersion_Call) RunAndReturn(run func() message.MessageTypeWithVersion) *MockBroadcastMutableMessage_MessageTypeWithVersion_Call { + _c.Call.Return(run) + return _c +} + // Payload provides a mock function with no fields func (_m *MockBroadcastMutableMessage) Payload() []byte { ret := _m.Called() diff --git a/pkg/mocks/streaming/util/mock_message/mock_ImmutableMessage.go b/pkg/mocks/streaming/util/mock_message/mock_ImmutableMessage.go index 7e314641a6..4022bb96b7 100644 --- a/pkg/mocks/streaming/util/mock_message/mock_ImmutableMessage.go +++ b/pkg/mocks/streaming/util/mock_message/mock_ImmutableMessage.go @@ -161,6 +161,53 @@ func (_c *MockImmutableMessage_EstimateSize_Call) RunAndReturn(run func() int) * return _c } +// IntoImmutableMessageProto provides a mock function with no fields +func (_m *MockImmutableMessage) IntoImmutableMessageProto() *messagespb.ImmutableMessage { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for IntoImmutableMessageProto") + } + + var r0 *messagespb.ImmutableMessage + if rf, ok := ret.Get(0).(func() *messagespb.ImmutableMessage); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*messagespb.ImmutableMessage) + } + } + + return r0 +} + +// MockImmutableMessage_IntoImmutableMessageProto_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IntoImmutableMessageProto' +type MockImmutableMessage_IntoImmutableMessageProto_Call struct { + *mock.Call +} + +// IntoImmutableMessageProto is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) IntoImmutableMessageProto() *MockImmutableMessage_IntoImmutableMessageProto_Call { + return &MockImmutableMessage_IntoImmutableMessageProto_Call{Call: _e.mock.On("IntoImmutableMessageProto")} +} + +func (_c *MockImmutableMessage_IntoImmutableMessageProto_Call) Run(run func()) *MockImmutableMessage_IntoImmutableMessageProto_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_IntoImmutableMessageProto_Call) Return(_a0 *messagespb.ImmutableMessage) *MockImmutableMessage_IntoImmutableMessageProto_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_IntoImmutableMessageProto_Call) RunAndReturn(run func() *messagespb.ImmutableMessage) *MockImmutableMessage_IntoImmutableMessageProto_Call { + _c.Call.Return(run) + return _c +} + // IntoMessageProto provides a mock function with no fields func (_m *MockImmutableMessage) IntoMessageProto() *messagespb.Message { ret := _m.Called() @@ -438,6 +485,51 @@ func (_c *MockImmutableMessage_MessageType_Call) RunAndReturn(run func() message return _c } +// MessageTypeWithVersion provides a mock function with no fields +func (_m *MockImmutableMessage) MessageTypeWithVersion() message.MessageTypeWithVersion { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for MessageTypeWithVersion") + } + + var r0 message.MessageTypeWithVersion + if rf, ok := ret.Get(0).(func() message.MessageTypeWithVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(message.MessageTypeWithVersion) + } + + return r0 +} + +// MockImmutableMessage_MessageTypeWithVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MessageTypeWithVersion' +type MockImmutableMessage_MessageTypeWithVersion_Call struct { + *mock.Call +} + +// MessageTypeWithVersion is a helper method to define mock.On call +func (_e *MockImmutableMessage_Expecter) MessageTypeWithVersion() *MockImmutableMessage_MessageTypeWithVersion_Call { + return &MockImmutableMessage_MessageTypeWithVersion_Call{Call: _e.mock.On("MessageTypeWithVersion")} +} + +func (_c *MockImmutableMessage_MessageTypeWithVersion_Call) Run(run func()) *MockImmutableMessage_MessageTypeWithVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableMessage_MessageTypeWithVersion_Call) Return(_a0 message.MessageTypeWithVersion) *MockImmutableMessage_MessageTypeWithVersion_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableMessage_MessageTypeWithVersion_Call) RunAndReturn(run func() message.MessageTypeWithVersion) *MockImmutableMessage_MessageTypeWithVersion_Call { + _c.Call.Return(run) + return _c +} + // Payload provides a mock function with no fields func (_m *MockImmutableMessage) Payload() []byte { ret := _m.Called() diff --git a/pkg/mocks/streaming/util/mock_message/mock_ImmutableTxnMessage.go b/pkg/mocks/streaming/util/mock_message/mock_ImmutableTxnMessage.go index 8cb0be0e8f..cae9070713 100644 --- a/pkg/mocks/streaming/util/mock_message/mock_ImmutableTxnMessage.go +++ b/pkg/mocks/streaming/util/mock_message/mock_ImmutableTxnMessage.go @@ -255,6 +255,53 @@ func (_c *MockImmutableTxnMessage_EstimateSize_Call) RunAndReturn(run func() int return _c } +// IntoImmutableMessageProto provides a mock function with no fields +func (_m *MockImmutableTxnMessage) IntoImmutableMessageProto() *messagespb.ImmutableMessage { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for IntoImmutableMessageProto") + } + + var r0 *messagespb.ImmutableMessage + if rf, ok := ret.Get(0).(func() *messagespb.ImmutableMessage); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*messagespb.ImmutableMessage) + } + } + + return r0 +} + +// MockImmutableTxnMessage_IntoImmutableMessageProto_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IntoImmutableMessageProto' +type MockImmutableTxnMessage_IntoImmutableMessageProto_Call struct { + *mock.Call +} + +// IntoImmutableMessageProto is a helper method to define mock.On call +func (_e *MockImmutableTxnMessage_Expecter) IntoImmutableMessageProto() *MockImmutableTxnMessage_IntoImmutableMessageProto_Call { + return &MockImmutableTxnMessage_IntoImmutableMessageProto_Call{Call: _e.mock.On("IntoImmutableMessageProto")} +} + +func (_c *MockImmutableTxnMessage_IntoImmutableMessageProto_Call) Run(run func()) *MockImmutableTxnMessage_IntoImmutableMessageProto_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableTxnMessage_IntoImmutableMessageProto_Call) Return(_a0 *messagespb.ImmutableMessage) *MockImmutableTxnMessage_IntoImmutableMessageProto_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableTxnMessage_IntoImmutableMessageProto_Call) RunAndReturn(run func() *messagespb.ImmutableMessage) *MockImmutableTxnMessage_IntoImmutableMessageProto_Call { + _c.Call.Return(run) + return _c +} + // IntoMessageProto provides a mock function with no fields func (_m *MockImmutableTxnMessage) IntoMessageProto() *messagespb.Message { ret := _m.Called() @@ -532,6 +579,51 @@ func (_c *MockImmutableTxnMessage_MessageType_Call) RunAndReturn(run func() mess return _c } +// MessageTypeWithVersion provides a mock function with no fields +func (_m *MockImmutableTxnMessage) MessageTypeWithVersion() message.MessageTypeWithVersion { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for MessageTypeWithVersion") + } + + var r0 message.MessageTypeWithVersion + if rf, ok := ret.Get(0).(func() message.MessageTypeWithVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(message.MessageTypeWithVersion) + } + + return r0 +} + +// MockImmutableTxnMessage_MessageTypeWithVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MessageTypeWithVersion' +type MockImmutableTxnMessage_MessageTypeWithVersion_Call struct { + *mock.Call +} + +// MessageTypeWithVersion is a helper method to define mock.On call +func (_e *MockImmutableTxnMessage_Expecter) MessageTypeWithVersion() *MockImmutableTxnMessage_MessageTypeWithVersion_Call { + return &MockImmutableTxnMessage_MessageTypeWithVersion_Call{Call: _e.mock.On("MessageTypeWithVersion")} +} + +func (_c *MockImmutableTxnMessage_MessageTypeWithVersion_Call) Run(run func()) *MockImmutableTxnMessage_MessageTypeWithVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockImmutableTxnMessage_MessageTypeWithVersion_Call) Return(_a0 message.MessageTypeWithVersion) *MockImmutableTxnMessage_MessageTypeWithVersion_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockImmutableTxnMessage_MessageTypeWithVersion_Call) RunAndReturn(run func() message.MessageTypeWithVersion) *MockImmutableTxnMessage_MessageTypeWithVersion_Call { + _c.Call.Return(run) + return _c +} + // Payload provides a mock function with no fields func (_m *MockImmutableTxnMessage) Payload() []byte { ret := _m.Called() diff --git a/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go b/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go index efe0720860..4e7e6275c2 100644 --- a/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go +++ b/pkg/mocks/streaming/util/mock_message/mock_MutableMessage.go @@ -392,6 +392,51 @@ func (_c *MockMutableMessage_MessageType_Call) RunAndReturn(run func() message.M return _c } +// MessageTypeWithVersion provides a mock function with no fields +func (_m *MockMutableMessage) MessageTypeWithVersion() message.MessageTypeWithVersion { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for MessageTypeWithVersion") + } + + var r0 message.MessageTypeWithVersion + if rf, ok := ret.Get(0).(func() message.MessageTypeWithVersion); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(message.MessageTypeWithVersion) + } + + return r0 +} + +// MockMutableMessage_MessageTypeWithVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MessageTypeWithVersion' +type MockMutableMessage_MessageTypeWithVersion_Call struct { + *mock.Call +} + +// MessageTypeWithVersion is a helper method to define mock.On call +func (_e *MockMutableMessage_Expecter) MessageTypeWithVersion() *MockMutableMessage_MessageTypeWithVersion_Call { + return &MockMutableMessage_MessageTypeWithVersion_Call{Call: _e.mock.On("MessageTypeWithVersion")} +} + +func (_c *MockMutableMessage_MessageTypeWithVersion_Call) Run(run func()) *MockMutableMessage_MessageTypeWithVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockMutableMessage_MessageTypeWithVersion_Call) Return(_a0 message.MessageTypeWithVersion) *MockMutableMessage_MessageTypeWithVersion_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMutableMessage_MessageTypeWithVersion_Call) RunAndReturn(run func() message.MessageTypeWithVersion) *MockMutableMessage_MessageTypeWithVersion_Call { + _c.Call.Return(run) + return _c +} + // Payload provides a mock function with no fields func (_m *MockMutableMessage) Payload() []byte { ret := _m.Called() diff --git a/pkg/streaming/util/message/adaptor/ts_msg_newer.go b/pkg/streaming/util/message/adaptor/ts_msg_newer.go index eee2e23977..4a1aa166be 100644 --- a/pkg/streaming/util/message/adaptor/ts_msg_newer.go +++ b/pkg/streaming/util/message/adaptor/ts_msg_newer.go @@ -132,7 +132,7 @@ func (s *SchemaChangeMessageBody) ID() msgstream.UniqueID { } func NewSchemaChangeMessageBody(msg message.ImmutableMessage) (msgstream.TsMsg, error) { - schChgMsg, err := message.AsImmutableCollectionSchemaChangeV2(msg) + schChgMsg, err := message.AsImmutableSchemaChangeMessageV2(msg) if err != nil { return nil, err } diff --git a/pkg/streaming/util/message/builder.go b/pkg/streaming/util/message/builder.go index a5da317001..553753b720 100644 --- a/pkg/streaming/util/message/builder.go +++ b/pkg/streaming/util/message/builder.go @@ -7,7 +7,6 @@ import ( "github.com/cockroachdb/errors" "google.golang.org/protobuf/proto" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" @@ -36,6 +35,20 @@ func NewBroadcastMutableMessageBeforeAppend(payload []byte, properties map[strin return m } +// NewImmutableMessageFromProto creates a new immutable message from the proto message. +// !!! Only used at server side for streaming internal service, don't use it at client side. +func NewImmutableMessageFromProto(walName string, msg *messagespb.ImmutableMessage) ImmutableMessage { + id, err := UnmarshalMessageID(walName, msg.Id.Id) + if err != nil { + panic(err) + } + return NewImmutableMesasge( + id, + msg.Payload, + msg.Properties, + ) +} + // NewImmutableMessage creates a new immutable message. // !!! Only used at server side for streaming internal service, don't use it at client side. func NewImmutableMesasge( @@ -52,48 +65,13 @@ func NewImmutableMesasge( } } -// List all type-safe mutable message builders here. -var ( - NewTimeTickMessageBuilderV1 = createNewMessageBuilderV1[*TimeTickMessageHeader, *msgpb.TimeTickMsg]() - NewInsertMessageBuilderV1 = createNewMessageBuilderV1[*InsertMessageHeader, *msgpb.InsertRequest]() - NewDeleteMessageBuilderV1 = createNewMessageBuilderV1[*DeleteMessageHeader, *msgpb.DeleteRequest]() - NewCreateCollectionMessageBuilderV1 = createNewMessageBuilderV1[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest]() - NewDropCollectionMessageBuilderV1 = createNewMessageBuilderV1[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest]() - NewCreatePartitionMessageBuilderV1 = createNewMessageBuilderV1[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest]() - NewDropPartitionMessageBuilderV1 = createNewMessageBuilderV1[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest]() - NewImportMessageBuilderV1 = createNewMessageBuilderV1[*ImportMessageHeader, *msgpb.ImportMsg]() - NewCreateSegmentMessageBuilderV2 = createNewMessageBuilderV2[*CreateSegmentMessageHeader, *CreateSegmentMessageBody]() - NewFlushMessageBuilderV2 = createNewMessageBuilderV2[*FlushMessageHeader, *FlushMessageBody]() - NewManualFlushMessageBuilderV2 = createNewMessageBuilderV2[*ManualFlushMessageHeader, *ManualFlushMessageBody]() - NewBeginTxnMessageBuilderV2 = createNewMessageBuilderV2[*BeginTxnMessageHeader, *BeginTxnMessageBody]() - NewCommitTxnMessageBuilderV2 = createNewMessageBuilderV2[*CommitTxnMessageHeader, *CommitTxnMessageBody]() - NewRollbackTxnMessageBuilderV2 = createNewMessageBuilderV2[*RollbackTxnMessageHeader, *RollbackTxnMessageBody]() - NewSchemaChangeMessageBuilderV2 = createNewMessageBuilderV2[*SchemaChangeMessageHeader, *SchemaChangeMessageBody]() - newTxnMessageBuilderV2 = createNewMessageBuilderV2[*TxnMessageHeader, *TxnMessageBody]() -) - -// createNewMessageBuilderV1 creates a new message builder with v1 marker. -func createNewMessageBuilderV1[H proto.Message, B proto.Message]() func() *mutableMesasgeBuilder[H, B] { - return func() *mutableMesasgeBuilder[H, B] { - return newMutableMessageBuilder[H, B](VersionV1) - } -} - -// List all type-safe mutable message builders here. -func createNewMessageBuilderV2[H proto.Message, B proto.Message]() func() *mutableMesasgeBuilder[H, B] { - return func() *mutableMesasgeBuilder[H, B] { - return newMutableMessageBuilder[H, B](VersionV2) - } -} - // newMutableMessageBuilder creates a new builder. // Should only used at client side. -func newMutableMessageBuilder[H proto.Message, B proto.Message](v Version) *mutableMesasgeBuilder[H, B] { - var h H - messageType := mustGetMessageTypeFromHeader(h) +func newMutableMessageBuilder[H proto.Message, B proto.Message]() *mutableMesasgeBuilder[H, B] { + messageType := MustGetMessageTypeWithVersion[H, B]() properties := make(propertiesImpl) - properties.Set(messageTypeKey, messageType.marshal()) - properties.Set(messageVersion, v.String()) + properties.Set(messageTypeKey, messageType.MessageType.marshal()) + properties.Set(messageVersion, messageType.Version.String()) return &mutableMesasgeBuilder[H, B]{ properties: properties, } @@ -116,8 +94,8 @@ func (b *mutableMesasgeBuilder[H, B]) WithHeader(h H) *mutableMesasgeBuilder[H, // WithNotPersist creates a new builder with not persisted property. func (b *mutableMesasgeBuilder[H, B]) WithNotPersisted() *mutableMesasgeBuilder[H, B] { - messageType := mustGetMessageTypeFromHeader(b.header) - if messageType != MessageTypeTimeTick { + messageType := MustGetMessageTypeWithVersion[H, B]() + if messageType.MessageType != MessageTypeTimeTick { panic("only time tick message can be not persisted") } b.WithProperty(messageNotPersisteted, "") @@ -266,8 +244,8 @@ func (b *mutableMesasgeBuilder[H, B]) build() (*messageImpl, error) { return nil, errors.Wrap(err, "failed to marshal body") } if b.cipherConfig != nil { - messageType := mustGetMessageTypeFromHeader(b.header) - if !messageType.CanEnableCipher() { + messageType := MustGetMessageTypeWithVersion[H, B]() + if !messageType.MessageType.CanEnableCipher() { panic(fmt.Sprintf("the message type cannot enable cipher, %s", messageType)) } @@ -356,7 +334,7 @@ func newImmutableTxnMesasgeFromWAL( commit ImmutableCommitTxnMessageV2, ) (ImmutableTxnMessage, error) { // combine begin and commit messages into one. - msg, err := newTxnMessageBuilderV2(). + msg, err := newMutableMessageBuilder[*TxnMessageHeader, *TxnMessageBody](). WithHeader(&TxnMessageHeader{}). WithBody(&TxnMessageBody{}). WithVChannel(begin.VChannel()). diff --git a/pkg/streaming/util/message/codegen/codegen_test.go b/pkg/streaming/util/message/codegen/codegen_test.go new file mode 100644 index 0000000000..727f3187a4 --- /dev/null +++ b/pkg/streaming/util/message/codegen/codegen_test.go @@ -0,0 +1,12 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCodegen(t *testing.T) { + _, err := codegen() + assert.NoError(t, err) +} diff --git a/pkg/streaming/util/message/codegen/main.go b/pkg/streaming/util/message/codegen/main.go new file mode 100644 index 0000000000..d16e879e3e --- /dev/null +++ b/pkg/streaming/util/message/codegen/main.go @@ -0,0 +1,355 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate go run main.go +//go:generate gofumpt -w ../reflect_info.go +package main + +import ( + "encoding/json" + "fmt" + "os" + "strings" + + "github.com/dave/jennifer/jen" +) + +const ( + messagePackage = "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" + reflectInfoFile = "reflect_info.json" + reflectInfoGo = "../reflect_info.go" +) + +type MessageSpecializedType struct { + HeaderType string `json:"HeaderType"` + BodyType string `json:"BodyType"` +} + +type MessageTypeWithVersion struct { + MessageType string `json:"MessageType"` + Version int64 `json:"Version"` +} + +// JSONMessageSpecializedType represents the JSON format for specialized types +type JSONMessageSpecializedType struct { + HeaderType string `json:"HeaderType"` + BodyType string `json:"BodyType"` +} + +// JSONMessageTypeWithVersion represents the JSON format for message with version +type JSONMessageTypeWithVersion struct { + MessageType string `json:"MessageType"` + Version int `json:"Version"` +} + +// JSONMessageReflectInfo represents the JSON format for message reflect info +type JSONMessageReflectInfo struct { + MessageSpecializedType JSONMessageSpecializedType `json:"MessageSpecializedType"` + MessageTypeWithVersion JSONMessageTypeWithVersion `json:"MessageTypeWithVersion"` + NoUtilFunctions bool `json:"NoUtilFunctions"` +} + +// JSONConfig represents the JSON configuration file format +type JSONConfig struct { + Packages map[string]string `json:"packages"` + ExtraExportTypes []string `json:"extraExportTypes"` + MessageReflectInfoTable []JSONMessageReflectInfo `json:"messageReflectInfoTable"` +} + +// Generator is the generator for message reflect info +type Generator struct { + Config JSONConfig + File *jen.File + ExportTypes map[string]string +} + +// NewGenerator creates a new generator +func NewGenerator() *Generator { + return &Generator{ + File: jen.NewFilePathName(messagePackage, "message"), + ExportTypes: make(map[string]string), + } +} + +// parseTypeString parses a type string like "msgpb.TimeTickMsg" into package and type name +func parseTypeString(typeStr string) (pkg, typeName string) { + parts := strings.Split(typeStr, ".") + if len(parts) == 2 { + return parts[0], parts[1] + } + return "", parts[0] +} + +// getMessageTypeName extracts the message type name from the string +func getMessageTypeName(messageTypeStr string) string { + if strings.HasPrefix(messageTypeStr, "MessageType") { + return messageTypeStr[11:] // Remove "MessageType" prefix + } + return messageTypeStr +} + +// getVersionSuffix converts Version to string suffix +func getVersionSuffix(v int) string { + return fmt.Sprintf("V%d", v) +} + +// getMessageTypeWithVersionId returns the id for the message type with version +func getMessageTypeWithVersionId(info JSONMessageReflectInfo) string { + msgTypeName := getMessageTypeName(info.MessageTypeWithVersion.MessageType) + versionSuffix := getVersionSuffix(info.MessageTypeWithVersion.Version) + return fmt.Sprintf("MessageType%s%s", msgTypeName, versionSuffix) +} + +// getSpecializedMessageTypeId returns the type id for the specialized message +func getSpecializedMessageTypeId(info JSONMessageReflectInfo) string { + msgTypeName := getMessageTypeName(info.MessageTypeWithVersion.MessageType) + versionSuffix := getVersionSuffix(info.MessageTypeWithVersion.Version) + return fmt.Sprintf("SpecializedType%s%s", msgTypeName, versionSuffix) +} + +func (g *Generator) getPackagePath(pkg string) string { + if importPath, exists := g.Config.Packages[pkg]; exists { + return importPath + } + panic(fmt.Sprintf("package %s not found", pkg)) +} + +// createQualifiedType creates a qualified type reference for jennifer +func (g *Generator) createQualifiedType(typeStr string) *jen.Statement { + pkg, typeName := parseTypeString(typeStr) + if pkg == "" || g.ExportTypes[typeStr] != "" { + return jen.Op("*").Id(g.ExportTypes[typeStr]) + } + return jen.Op("*").Qual(g.getPackagePath(pkg), typeName) +} + +// generateHelperFunctions generates all helper functions for a single messageReflectInfo +func (g *Generator) generateHelperFunctions(info JSONMessageReflectInfo) { + f := g.File + + msgTypeName := getMessageTypeName(info.MessageTypeWithVersion.MessageType) + versionSuffix := getVersionSuffix(info.MessageTypeWithVersion.Version) + + // Create qualified type references + headerType := g.createQualifiedType(info.MessageSpecializedType.HeaderType) + bodyType := g.createQualifiedType(info.MessageSpecializedType.BodyType) + + baseName := msgTypeName + "Message" + versionSuffix + + // Type aliases + f.Comment(fmt.Sprintf("// Type aliases for %s", baseName)) + f.Type().Id("Mutable"+baseName).Op("=").Qual(messagePackage, "specializedMutableMessage").Types(headerType, bodyType) + f.Type().Id("Immutable"+baseName).Op("=").Qual(messagePackage, "SpecializedImmutableMessage").Types(headerType, bodyType) + f.Type().Id("Broadcast"+baseName).Op("=").Qual(messagePackage, "SpecializedBroadcastMessage").Types(headerType, bodyType) + f.Line() + + // MessageTypeWithVersion constant + f.Comment(fmt.Sprintf("// MessageTypeWithVersion for %s", baseName)) + f.Var().Id(getMessageTypeWithVersionId(info)).Op("=").Qual(messagePackage, "MessageTypeWithVersion").Values(jen.Dict{ + jen.Id("MessageType"): jen.Qual(messagePackage, info.MessageTypeWithVersion.MessageType), + jen.Id("Version"): jen.Id("Version" + versionSuffix), + }) + // MessageSpecializedType constant + f.Comment(fmt.Sprintf("// MessageSpecializedType for %s", baseName)) + f.Var().Id(getSpecializedMessageTypeId(info)).Op("=").Qual(messagePackage, "MessageSpecializedType").Values(jen.Dict{ + jen.Id("HeaderType"): jen.Qual("reflect", "TypeOf").Call(jen.Parens(headerType).Parens(jen.Nil())), + jen.Id("BodyType"): jen.Qual("reflect", "TypeOf").Call(jen.Parens(bodyType).Parens(jen.Nil())), + }) + + if !info.NoUtilFunctions { + // AsMutable function + f.Comment(fmt.Sprintf("// AsMutable%s converts a BasicMessage to Mutable%s", baseName, baseName)) + f.Var().Id("AsMutable"+baseName).Op("=").Qual(messagePackage, "asSpecializedMutableMessage").Types(headerType, bodyType) + // MustAsMutable function + f.Comment(fmt.Sprintf("// MustAsMutable%s converts a BasicMessage to Mutable%s, panics on error", baseName, baseName)) + f.Var().Id("MustAsMutable"+baseName).Op("=").Qual(messagePackage, "mustAsSpecializedMutableMessage").Types(headerType, bodyType) + // AsImmutable function + f.Comment(fmt.Sprintf("// AsImmutable%s converts an ImmutableMessage to Immutable%s", baseName, baseName)) + f.Var().Id("AsImmutable"+baseName).Op("=").Qual(messagePackage, "asSpecializedImmutableMessage").Types(headerType, bodyType) + // MustAsImmutable function + f.Comment(fmt.Sprintf("// MustAsImmutable%s converts an ImmutableMessage to Immutable%s, panics on error", baseName, baseName)) + f.Var().Id("MustAsImmutable"+baseName).Op("=").Qual(messagePackage, "MustAsSpecializedImmutableMessage").Types(headerType, bodyType) + // AsBroadcast function + f.Comment(fmt.Sprintf("// AsBroadcast%s converts a BasicMessage to Broadcast%s", baseName, baseName)) + f.Var().Id("AsBroadcast"+baseName).Op("=").Qual(messagePackage, "asSpecializedBroadcastMessage").Types(headerType, bodyType) + // MustAsBroadcast function + f.Comment(fmt.Sprintf("// MustAsBroadcast%s converts a BasicMessage to Broadcast%s, panics on error", baseName, baseName)) + f.Var().Id("MustAsBroadcast"+baseName).Op("=").Qual(messagePackage, "MustAsSpecializedBroadcastMessage").Types(headerType, bodyType) + f.Line() + + // NewBuilder function + f.Comment(fmt.Sprintf("// New%sMessageBuilder%s creates a new message builder for %s", msgTypeName, versionSuffix, baseName)) + f.Var().Id("New"+msgTypeName+"MessageBuilder"+versionSuffix).Op("=").Qual(messagePackage, "newMutableMessageBuilder").Types(headerType, bodyType) + f.Line() + } +} + +// GenerateCodeFromJSON generates Go code from JSON configuration +func (g *Generator) GenerateCodeFromJSON(jsonData []byte) (string, error) { + if err := json.Unmarshal(jsonData, &g.Config); err != nil { + return "", fmt.Errorf("failed to parse JSON: %w", err) + } + + g.File.HeaderComment("// Code generated by message-codegen. DO NOT EDIT.") + + // Export types + g.exportTypes() + + // Generate functions for each messageReflectInfo + for _, info := range g.Config.MessageReflectInfoTable { + g.generateHelperFunctions(info) + } + + // Generate tables + g.generateTables() + + // Generate export AsImmutableTxnMessage + return g.File.GoString(), nil +} + +// exportTypes exports the types for the message reflect info +func (g *Generator) exportTypes() { + // Generate message types + g.File.Comment("// Export message types") + g.File.Const().Id("MessageTypeUnknown"). + Qual(messagePackage, "MessageType").Op("=").Qual(messagePackage, "MessageType"). + Call(jen.Qual(g.getPackagePath("messagespb"), "MessageType_Unknown")) + for _, info := range g.Config.MessageReflectInfoTable { + msgTypeName := getMessageTypeName(info.MessageTypeWithVersion.MessageType) + g.File.Const().Id(info.MessageTypeWithVersion.MessageType). + Qual(messagePackage, "MessageType").Op("=").Qual(messagePackage, "MessageType"). + Call(jen.Qual(g.getPackagePath("messagespb"), "MessageType_"+msgTypeName)) + } + + // Generate extra export types + g.File.Comment("// Export extra message type") + for _, extraExportType := range g.Config.ExtraExportTypes { + pkg, typeName := parseTypeString(extraExportType) + g.File.Type().Id(typeName).Op("=").Qual(g.getPackagePath(pkg), typeName) + g.ExportTypes[extraExportType] = typeName + } + g.File.Line() + + // Export message header and body types + g.File.Comment("// Export message header and body types") + for _, info := range g.Config.MessageReflectInfoTable { + headerPkg, headerName := parseTypeString(info.MessageSpecializedType.HeaderType) + bodyPkg, bodyName := parseTypeString(info.MessageSpecializedType.BodyType) + g.File.Type().Id(headerName).Op("=").Qual(g.getPackagePath(headerPkg), headerName) + g.File.Type().Id(bodyName).Op("=").Qual(g.getPackagePath(bodyPkg), bodyName) + g.ExportTypes[info.MessageSpecializedType.HeaderType] = headerName + g.ExportTypes[info.MessageSpecializedType.BodyType] = bodyName + } + g.File.Line() +} + +// generateTables generates the tables for the message reflect info +func (g *Generator) generateTables() { + // Generate messageTypeMap + g.File.Comment("// messageTypeMap make the contriants that one header type can only be used for one message type.") + g.File.Var().Id("messageTypeMap").Op("=").Map(jen.Qual("reflect", "Type")).Qual(messagePackage, "MessageType").Values( + jen.DictFunc(func(d jen.Dict) { + for _, info := range g.Config.MessageReflectInfoTable { + headerPkg, headerName := parseTypeString(info.MessageSpecializedType.HeaderType) + d[jen.Qual("reflect", "TypeOf").Call(jen.Op("&").Qual(g.getPackagePath(headerPkg), headerName).Block())] = jen.Qual(messagePackage, info.MessageTypeWithVersion.MessageType) + } + }), + ) + g.File.Line() + + // Generate MessageTypeWithVersion and MessageSpecializedType + g.File.Comment("// MessageTypeWithVersion identifies a message type and version") + g.File.Type().Id("MessageTypeWithVersion").Struct( + jen.Id("MessageType").Qual(messagePackage, "MessageType"), + jen.Id("Version").Qual(messagePackage, "Version"), + ) + g.File.Line() + + // String returns a string representation of MessageTypeWithVersion + g.File.Func().Params( + jen.Id("m").Id("MessageTypeWithVersion"), + ).Id("String").Params().String().Block( + jen.Return(jen.Qual("fmt", "Sprintf").Call( + jen.Lit("%s@v%d"), + jen.Id("m").Dot("MessageType").Dot("String").Call(), + jen.Id("m").Dot("Version"), + )), + ) + g.File.Line() + + g.File.Comment("// MessageSpecializedType contains reflection types for message headers and bodies") + g.File.Type().Id("MessageSpecializedType").Struct( + jen.Id("HeaderType").Qual("reflect", "Type"), + jen.Id("BodyType").Qual("reflect", "Type"), + ) + g.File.Line() + + // Generate mapping table + g.File.Comment("// messageTypeVersionSpecializedMap maps MessageTypeWithVersion to MessageSpecializedType") + g.File.Var().Id("messageTypeVersionSpecializedMap").Op("=").Map(jen.Id("MessageTypeWithVersion")).Id("MessageSpecializedType").Values( + jen.DictFunc(func(d jen.Dict) { + for _, info := range g.Config.MessageReflectInfoTable { + key := jen.Id(getMessageTypeWithVersionId(info)) + d[key] = jen.Id(getSpecializedMessageTypeId(info)) + } + }), + ) + g.File.Line() + + // Generate reverse mapping table + g.File.Comment("// messageSpecializedTypeVersionMap maps MessageSpecializedType to MessageTypeWithVersion") + g.File.Var().Id("messageSpecializedTypeVersionMap").Op("=").Map(jen.Id("MessageSpecializedType")).Id("MessageTypeWithVersion").Values( + jen.DictFunc(func(d jen.Dict) { + for _, info := range g.Config.MessageReflectInfoTable { + key := jen.Id(getSpecializedMessageTypeId(info)) + d[key] = jen.Id(getMessageTypeWithVersionId(info)) + } + }), + ) + g.File.Line() +} + +// codegen generates the code from the JSON file +func codegen() (string, error) { + // Read JSON file + jsonData, err := os.ReadFile(reflectInfoFile) + if err != nil { + return "", err + } + + // Generate code + g := NewGenerator() + code, err := g.GenerateCodeFromJSON(jsonData) + if err != nil { + return "", err + } + return code, nil +} + +func main() { + code, err := codegen() + if err != nil { + fmt.Fprintf(os.Stderr, "Error generating code: %v\n", err) + os.Exit(1) + } + + err = os.WriteFile(reflectInfoGo, []byte(code), 0o644) // #nosec G306 + if err != nil { + fmt.Fprintf(os.Stderr, "Error writing to file %s: %v\n", reflectInfoGo, err) + os.Exit(1) + } + fmt.Printf("Generated code written to %s\n", reflectInfoGo) +} diff --git a/pkg/streaming/util/message/codegen/reflect_info.json b/pkg/streaming/util/message/codegen/reflect_info.json new file mode 100644 index 0000000000..d0cbf17741 --- /dev/null +++ b/pkg/streaming/util/message/codegen/reflect_info.json @@ -0,0 +1,174 @@ +{ + "packages": { + "msgpb": "github.com/milvus-io/milvus-proto/go-api/v2/msgpb", + "messagespb": "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" + }, + "ExtraExportTypes": [ + "messagespb.PartitionSegmentAssignment", + "messagespb.SegmentAssignment", + "messagespb.ManualFlushExtraResponse" + ], + "messageReflectInfoTable": [ + { + "MessageSpecializedType": { + "HeaderType": "messagespb.TimeTickMessageHeader", + "BodyType": "msgpb.TimeTickMsg" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeTimeTick", + "Version": 1 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.InsertMessageHeader", + "BodyType": "msgpb.InsertRequest" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeInsert", + "Version": 1 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.DeleteMessageHeader", + "BodyType": "msgpb.DeleteRequest" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeDelete", + "Version": 1 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.CreateCollectionMessageHeader", + "BodyType": "msgpb.CreateCollectionRequest" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeCreateCollection", + "Version": 1 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.DropCollectionMessageHeader", + "BodyType": "msgpb.DropCollectionRequest" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeDropCollection", + "Version": 1 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.CreatePartitionMessageHeader", + "BodyType": "msgpb.CreatePartitionRequest" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeCreatePartition", + "Version": 1 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.DropPartitionMessageHeader", + "BodyType": "msgpb.DropPartitionRequest" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeDropPartition", + "Version": 1 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.ImportMessageHeader", + "BodyType": "msgpb.ImportMsg" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeImport", + "Version": 1 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.CreateSegmentMessageHeader", + "BodyType": "messagespb.CreateSegmentMessageBody" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeCreateSegment", + "Version": 2 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.FlushMessageHeader", + "BodyType": "messagespb.FlushMessageBody" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeFlush", + "Version": 2 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.ManualFlushMessageHeader", + "BodyType": "messagespb.ManualFlushMessageBody" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeManualFlush", + "Version": 2 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.BeginTxnMessageHeader", + "BodyType": "messagespb.BeginTxnMessageBody" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeBeginTxn", + "Version": 2 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.CommitTxnMessageHeader", + "BodyType": "messagespb.CommitTxnMessageBody" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeCommitTxn", + "Version": 2 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.RollbackTxnMessageHeader", + "BodyType": "messagespb.RollbackTxnMessageBody" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeRollbackTxn", + "Version": 2 + } + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.TxnMessageHeader", + "BodyType": "messagespb.TxnMessageBody" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeTxn", + "Version": 2 + }, + "NoUtilFunctions": true + }, + { + "MessageSpecializedType": { + "HeaderType": "messagespb.SchemaChangeMessageHeader", + "BodyType": "messagespb.SchemaChangeMessageBody" + }, + "MessageTypeWithVersion": { + "MessageType": "MessageTypeSchemaChange", + "Version": 2 + } + } + ] +} \ No newline at end of file diff --git a/pkg/streaming/util/message/marshal_log_object.go b/pkg/streaming/util/message/marshal_log_object.go index 6e1d4698ba..61e9e52a16 100644 --- a/pkg/streaming/util/message/marshal_log_object.go +++ b/pkg/streaming/util/message/marshal_log_object.go @@ -1,6 +1,7 @@ package message import ( + "fmt" "reflect" "strconv" "strings" @@ -26,7 +27,7 @@ func (m *messageImpl) MarshalLogObject(enc zapcore.ObjectEncoder) error { enc.AddInt64("broadcastID", int64(broadcast.BroadcastID)) } enc.AddInt("size", len(m.payload)) - marshalSpecializedHeader(m.MessageType(), m.properties[messageHeader], enc) + marshalSpecializedHeader(m.MessageType(), m.Version(), m.properties[messageHeader], enc) return nil } @@ -47,7 +48,7 @@ func (m *immutableMessageImpl) MarshalLogObject(enc zapcore.ObjectEncoder) error enc.AddInt64("broadcastID", int64(broadcast.BroadcastID)) } enc.AddInt("size", len(m.payload)) - marshalSpecializedHeader(m.MessageType(), m.properties[messageHeader], enc) + marshalSpecializedHeader(m.MessageType(), m.Version(), m.properties[messageHeader], enc) return nil } @@ -67,10 +68,14 @@ func (m *immutableTxnMessageImpl) MarshalLogObject(enc zapcore.ObjectEncoder) er } // marshalSpecializedHeader marshals the specialized header of the message. -func marshalSpecializedHeader(t MessageType, h string, enc zapcore.ObjectEncoder) { - typ := messageTypeToCustomHeaderMap[t] +func marshalSpecializedHeader(t MessageType, v Version, h string, enc zapcore.ObjectEncoder) { + typ, ok := GetSerializeType(NewMessageTypeWithVersion(t, v)) + if !ok { + enc.AddString("headerDecodeError", fmt.Sprintf("message type %s version %d not found", t, v)) + return + } // must be a proto type. - header := reflect.New(typ.Elem()).Interface().(proto.Message) + header := reflect.New(typ.HeaderType.Elem()).Interface().(proto.Message) if err := DecodeProto(h, header); err != nil { enc.AddString("headerDecodeError", err.Error()) return diff --git a/pkg/streaming/util/message/message.go b/pkg/streaming/util/message/message.go index 9c42ed3715..c224de56dd 100644 --- a/pkg/streaming/util/message/message.go +++ b/pkg/streaming/util/message/message.go @@ -26,6 +26,9 @@ type BasicMessage interface { // from 1: new version after streamingnode. Version() Version + // MessageTypeWithVersion returns the message type with version. + MessageTypeWithVersion() MessageTypeWithVersion + // Payload returns the message payload. // If the underlying message is encrypted, the payload will be decrypted. // !!! So if the message is encrypted, additional overhead will be paid for decryption. @@ -143,6 +146,9 @@ type ImmutableMessage interface { // Available only when the message's version greater than 0. // Otherwise, it will panic. LastConfirmedMessageID() MessageID + + // IntoImmutableMessageProto converts the message to a protobuf immutable message. + IntoImmutableMessageProto() *messagespb.ImmutableMessage } // ImmutableTxnMessage is the read-only transaction message interface. @@ -165,6 +171,25 @@ type ImmutableTxnMessage interface { Size() int } +// SpecializedBroadcastMessage is the specialized broadcast message interface. +type SpecializedBroadcastMessage[H proto.Message, B proto.Message] interface { + BasicMessage + + // MessageHeader returns the message header. + // Modifications to the returned header will be reflected in the message. + Header() H + + // Body returns the message body. + // !!! Do these will trigger a unmarshal operation, so it should be used with caution. + Body() (B, error) + + // MustBody return the message body, panic if error occurs. + MustBody() B + + // OverwriteHeader overwrites the message header. + OverwriteHeader(header H) +} + // specializedMutableMessage is the specialized mutable message interface. type specializedMutableMessage[H proto.Message, B proto.Message] interface { BasicMessage @@ -187,8 +212,8 @@ type specializedMutableMessage[H proto.Message, B proto.Message] interface { OverwriteHeader(header H) } -// specializedImmutableMessage is the specialized immutable message interface. -type specializedImmutableMessage[H proto.Message, B proto.Message] interface { +// SpecializedImmutableMessage is the specialized immutable message interface. +type SpecializedImmutableMessage[H proto.Message, B proto.Message] interface { ImmutableMessage // Header returns the message header. diff --git a/pkg/streaming/util/message/message_impl.go b/pkg/streaming/util/message/message_impl.go index 8b1defc126..3504ede35c 100644 --- a/pkg/streaming/util/message/message_impl.go +++ b/pkg/streaming/util/message/message_impl.go @@ -29,6 +29,14 @@ func (m *messageImpl) Version() Version { return newMessageVersionFromString(value) } +// MessageTypeWithVersion returns the message type with version. +func (m *messageImpl) MessageTypeWithVersion() MessageTypeWithVersion { + return MessageTypeWithVersion{ + MessageType: m.MessageType(), + Version: m.Version(), + } +} + // Payload returns payload of current message. func (m *messageImpl) Payload() []byte { if ch := m.cipherHeader(); ch != nil { @@ -361,6 +369,17 @@ func (m *immutableMessageImpl) overwriteLastConfirmedMessageID(id MessageID) { m.WithLastConfirmed(id) } +// IntoImmutableMessageProto converts the message to a protobuf immutable message. +func (m *immutableMessageImpl) IntoImmutableMessageProto() *messagespb.ImmutableMessage { + return &messagespb.ImmutableMessage{ + Id: &messagespb.MessageID{ + Id: m.id.Marshal(), + }, + Payload: m.payload, + Properties: m.properties.ToRawMap(), + } +} + // immutableTxnMessageImpl is a immutable transaction message. type immutableTxnMessageImpl struct { immutableMessageImpl diff --git a/pkg/streaming/util/message/message_test.go b/pkg/streaming/util/message/message_test.go index fd5c2404fd..efc548a62c 100644 --- a/pkg/streaming/util/message/message_test.go +++ b/pkg/streaming/util/message/message_test.go @@ -8,17 +8,18 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/pkg/v2/mocks/github.com/milvus-io/milvus-proto/go-api/v2/mock_hook" + "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" ) func TestMessageType(t *testing.T) { - s := MessageTypeUnknown.marshal() + s := MessageType(messagespb.MessageType_Unknown).marshal() assert.Equal(t, "0", s) typ := unmarshalMessageType("0") - assert.Equal(t, MessageTypeUnknown, typ) - assert.False(t, MessageTypeUnknown.Valid()) + assert.Equal(t, MessageType(messagespb.MessageType_Unknown), typ) + assert.False(t, MessageType(messagespb.MessageType_Unknown).Valid()) typ = unmarshalMessageType("882s9") - assert.Equal(t, MessageTypeUnknown, typ) + assert.Equal(t, MessageType(messagespb.MessageType_Unknown), typ) s = MessageTypeTimeTick.marshal() typ = unmarshalMessageType(s) @@ -75,7 +76,7 @@ func TestBroadcast(t *testing.T) { assert.Len(t, msgs[0].BroadcastHeader().ResourceKeys, 2) assert.ElementsMatch(t, []string{"v1", "v2"}, []string{msgs[0].VChannel(), msgs[1].VChannel()}) - MustAsMutableCreateCollectionMessageV1(msg) + MustAsBroadcastCreateCollectionMessageV1(msg) } func TestCiper(t *testing.T) { diff --git a/pkg/streaming/util/message/message_type.go b/pkg/streaming/util/message/message_type.go index f5d38a95d8..b15891c39a 100644 --- a/pkg/streaming/util/message/message_type.go +++ b/pkg/streaming/util/message/message_type.go @@ -8,49 +8,9 @@ import ( type MessageType messagespb.MessageType -const ( - MessageTypeUnknown MessageType = MessageType(messagespb.MessageType_Unknown) - MessageTypeTimeTick MessageType = MessageType(messagespb.MessageType_TimeTick) - MessageTypeInsert MessageType = MessageType(messagespb.MessageType_Insert) - MessageTypeDelete MessageType = MessageType(messagespb.MessageType_Delete) - MessageTypeCreateSegment MessageType = MessageType(messagespb.MessageType_CreateSegment) - MessageTypeFlush MessageType = MessageType(messagespb.MessageType_Flush) - MessageTypeManualFlush MessageType = MessageType(messagespb.MessageType_ManualFlush) - MessageTypeCreateCollection MessageType = MessageType(messagespb.MessageType_CreateCollection) - MessageTypeDropCollection MessageType = MessageType(messagespb.MessageType_DropCollection) - MessageTypeCreatePartition MessageType = MessageType(messagespb.MessageType_CreatePartition) - MessageTypeDropPartition MessageType = MessageType(messagespb.MessageType_DropPartition) - MessageTypeTxn MessageType = MessageType(messagespb.MessageType_Txn) - MessageTypeBeginTxn MessageType = MessageType(messagespb.MessageType_BeginTxn) - MessageTypeCommitTxn MessageType = MessageType(messagespb.MessageType_CommitTxn) - MessageTypeRollbackTxn MessageType = MessageType(messagespb.MessageType_RollbackTxn) - MessageTypeImport MessageType = MessageType(messagespb.MessageType_Import) - MessageTypeSchemaChange MessageType = MessageType(messagespb.MessageType_SchemaChange) -) - -var messageTypeName = map[MessageType]string{ - MessageTypeUnknown: "UNKNOWN", - MessageTypeTimeTick: "TIME_TICK", - MessageTypeInsert: "INSERT", - MessageTypeDelete: "DELETE", - MessageTypeFlush: "FLUSH", - MessageTypeCreateSegment: "CREATE_SEGMENT", - MessageTypeManualFlush: "MANUAL_FLUSH", - MessageTypeCreateCollection: "CREATE_COLLECTION", - MessageTypeDropCollection: "DROP_COLLECTION", - MessageTypeCreatePartition: "CREATE_PARTITION", - MessageTypeDropPartition: "DROP_PARTITION", - MessageTypeTxn: "TXN", - MessageTypeBeginTxn: "BEGIN_TXN", - MessageTypeCommitTxn: "COMMIT_TXN", - MessageTypeRollbackTxn: "ROLLBACK_TXN", - MessageTypeImport: "IMPORT", - MessageTypeSchemaChange: "SCHEMA_CHANGE", -} - // String implements fmt.Stringer interface. func (t MessageType) String() string { - return messageTypeName[t] + return messagespb.MessageType_name[int32(t)] } // marshal marshal MessageType to string. @@ -60,7 +20,8 @@ func (t MessageType) marshal() string { // Valid checks if the MessageType is valid. func (t MessageType) Valid() bool { - _, ok := messageTypeName[t] + typ := int32(t) + _, ok := messagespb.MessageType_name[typ] return t != MessageTypeUnknown && ok } diff --git a/pkg/streaming/util/message/reflect_info.go b/pkg/streaming/util/message/reflect_info.go new file mode 100644 index 0000000000..7cf893e19c --- /dev/null +++ b/pkg/streaming/util/message/reflect_info.go @@ -0,0 +1,770 @@ +// Code generated by message-codegen. DO NOT EDIT. + +package message + +import ( + "fmt" + "reflect" + + msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + messagespb "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" +) + +// Export message types +const ( + MessageTypeUnknown MessageType = MessageType(messagespb.MessageType_Unknown) + MessageTypeTimeTick MessageType = MessageType(messagespb.MessageType_TimeTick) + MessageTypeInsert MessageType = MessageType(messagespb.MessageType_Insert) + MessageTypeDelete MessageType = MessageType(messagespb.MessageType_Delete) + MessageTypeCreateCollection MessageType = MessageType(messagespb.MessageType_CreateCollection) + MessageTypeDropCollection MessageType = MessageType(messagespb.MessageType_DropCollection) + MessageTypeCreatePartition MessageType = MessageType(messagespb.MessageType_CreatePartition) + MessageTypeDropPartition MessageType = MessageType(messagespb.MessageType_DropPartition) + MessageTypeImport MessageType = MessageType(messagespb.MessageType_Import) + MessageTypeCreateSegment MessageType = MessageType(messagespb.MessageType_CreateSegment) + MessageTypeFlush MessageType = MessageType(messagespb.MessageType_Flush) + MessageTypeManualFlush MessageType = MessageType(messagespb.MessageType_ManualFlush) + MessageTypeBeginTxn MessageType = MessageType(messagespb.MessageType_BeginTxn) + MessageTypeCommitTxn MessageType = MessageType(messagespb.MessageType_CommitTxn) + MessageTypeRollbackTxn MessageType = MessageType(messagespb.MessageType_RollbackTxn) + MessageTypeTxn MessageType = MessageType(messagespb.MessageType_Txn) + MessageTypeSchemaChange MessageType = MessageType(messagespb.MessageType_SchemaChange) +) + +// Export extra message type +type ( + PartitionSegmentAssignment = messagespb.PartitionSegmentAssignment + SegmentAssignment = messagespb.SegmentAssignment + ManualFlushExtraResponse = messagespb.ManualFlushExtraResponse +) + +// Export message header and body types +type ( + TimeTickMessageHeader = messagespb.TimeTickMessageHeader + TimeTickMsg = msgpb.TimeTickMsg + InsertMessageHeader = messagespb.InsertMessageHeader + InsertRequest = msgpb.InsertRequest + DeleteMessageHeader = messagespb.DeleteMessageHeader + DeleteRequest = msgpb.DeleteRequest + CreateCollectionMessageHeader = messagespb.CreateCollectionMessageHeader + CreateCollectionRequest = msgpb.CreateCollectionRequest + DropCollectionMessageHeader = messagespb.DropCollectionMessageHeader + DropCollectionRequest = msgpb.DropCollectionRequest + CreatePartitionMessageHeader = messagespb.CreatePartitionMessageHeader + CreatePartitionRequest = msgpb.CreatePartitionRequest + DropPartitionMessageHeader = messagespb.DropPartitionMessageHeader + DropPartitionRequest = msgpb.DropPartitionRequest + ImportMessageHeader = messagespb.ImportMessageHeader + ImportMsg = msgpb.ImportMsg + CreateSegmentMessageHeader = messagespb.CreateSegmentMessageHeader + CreateSegmentMessageBody = messagespb.CreateSegmentMessageBody + FlushMessageHeader = messagespb.FlushMessageHeader + FlushMessageBody = messagespb.FlushMessageBody + ManualFlushMessageHeader = messagespb.ManualFlushMessageHeader + ManualFlushMessageBody = messagespb.ManualFlushMessageBody + BeginTxnMessageHeader = messagespb.BeginTxnMessageHeader + BeginTxnMessageBody = messagespb.BeginTxnMessageBody + CommitTxnMessageHeader = messagespb.CommitTxnMessageHeader + CommitTxnMessageBody = messagespb.CommitTxnMessageBody + RollbackTxnMessageHeader = messagespb.RollbackTxnMessageHeader + RollbackTxnMessageBody = messagespb.RollbackTxnMessageBody + TxnMessageHeader = messagespb.TxnMessageHeader + TxnMessageBody = messagespb.TxnMessageBody + SchemaChangeMessageHeader = messagespb.SchemaChangeMessageHeader + SchemaChangeMessageBody = messagespb.SchemaChangeMessageBody +) + +// Type aliases for TimeTickMessageV1 +type ( + MutableTimeTickMessageV1 = specializedMutableMessage[*TimeTickMessageHeader, *TimeTickMsg] + ImmutableTimeTickMessageV1 = SpecializedImmutableMessage[*TimeTickMessageHeader, *TimeTickMsg] + BroadcastTimeTickMessageV1 = SpecializedBroadcastMessage[*TimeTickMessageHeader, *TimeTickMsg] +) + +// MessageTypeWithVersion for TimeTickMessageV1 +var MessageTypeTimeTickV1 = MessageTypeWithVersion{ + MessageType: MessageTypeTimeTick, + Version: VersionV1, +} + +// MessageSpecializedType for TimeTickMessageV1 +var SpecializedTypeTimeTickV1 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*TimeTickMsg)(nil)), + HeaderType: reflect.TypeOf((*TimeTickMessageHeader)(nil)), +} + +// AsMutableTimeTickMessageV1 converts a BasicMessage to MutableTimeTickMessageV1 +var AsMutableTimeTickMessageV1 = asSpecializedMutableMessage[*TimeTickMessageHeader, *TimeTickMsg] + +// MustAsMutableTimeTickMessageV1 converts a BasicMessage to MutableTimeTickMessageV1, panics on error +var MustAsMutableTimeTickMessageV1 = mustAsSpecializedMutableMessage[*TimeTickMessageHeader, *TimeTickMsg] + +// AsImmutableTimeTickMessageV1 converts an ImmutableMessage to ImmutableTimeTickMessageV1 +var AsImmutableTimeTickMessageV1 = asSpecializedImmutableMessage[*TimeTickMessageHeader, *TimeTickMsg] + +// MustAsImmutableTimeTickMessageV1 converts an ImmutableMessage to ImmutableTimeTickMessageV1, panics on error +var MustAsImmutableTimeTickMessageV1 = MustAsSpecializedImmutableMessage[*TimeTickMessageHeader, *TimeTickMsg] + +// AsBroadcastTimeTickMessageV1 converts a BasicMessage to BroadcastTimeTickMessageV1 +var AsBroadcastTimeTickMessageV1 = asSpecializedBroadcastMessage[*TimeTickMessageHeader, *TimeTickMsg] + +// MustAsBroadcastTimeTickMessageV1 converts a BasicMessage to BroadcastTimeTickMessageV1, panics on error +var MustAsBroadcastTimeTickMessageV1 = MustAsSpecializedBroadcastMessage[*TimeTickMessageHeader, *TimeTickMsg] + +// NewTimeTickMessageBuilderV1 creates a new message builder for TimeTickMessageV1 +var NewTimeTickMessageBuilderV1 = newMutableMessageBuilder[*TimeTickMessageHeader, *TimeTickMsg] + +// Type aliases for InsertMessageV1 +type ( + MutableInsertMessageV1 = specializedMutableMessage[*InsertMessageHeader, *InsertRequest] + ImmutableInsertMessageV1 = SpecializedImmutableMessage[*InsertMessageHeader, *InsertRequest] + BroadcastInsertMessageV1 = SpecializedBroadcastMessage[*InsertMessageHeader, *InsertRequest] +) + +// MessageTypeWithVersion for InsertMessageV1 +var MessageTypeInsertV1 = MessageTypeWithVersion{ + MessageType: MessageTypeInsert, + Version: VersionV1, +} + +// MessageSpecializedType for InsertMessageV1 +var SpecializedTypeInsertV1 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*InsertRequest)(nil)), + HeaderType: reflect.TypeOf((*InsertMessageHeader)(nil)), +} + +// AsMutableInsertMessageV1 converts a BasicMessage to MutableInsertMessageV1 +var AsMutableInsertMessageV1 = asSpecializedMutableMessage[*InsertMessageHeader, *InsertRequest] + +// MustAsMutableInsertMessageV1 converts a BasicMessage to MutableInsertMessageV1, panics on error +var MustAsMutableInsertMessageV1 = mustAsSpecializedMutableMessage[*InsertMessageHeader, *InsertRequest] + +// AsImmutableInsertMessageV1 converts an ImmutableMessage to ImmutableInsertMessageV1 +var AsImmutableInsertMessageV1 = asSpecializedImmutableMessage[*InsertMessageHeader, *InsertRequest] + +// MustAsImmutableInsertMessageV1 converts an ImmutableMessage to ImmutableInsertMessageV1, panics on error +var MustAsImmutableInsertMessageV1 = MustAsSpecializedImmutableMessage[*InsertMessageHeader, *InsertRequest] + +// AsBroadcastInsertMessageV1 converts a BasicMessage to BroadcastInsertMessageV1 +var AsBroadcastInsertMessageV1 = asSpecializedBroadcastMessage[*InsertMessageHeader, *InsertRequest] + +// MustAsBroadcastInsertMessageV1 converts a BasicMessage to BroadcastInsertMessageV1, panics on error +var MustAsBroadcastInsertMessageV1 = MustAsSpecializedBroadcastMessage[*InsertMessageHeader, *InsertRequest] + +// NewInsertMessageBuilderV1 creates a new message builder for InsertMessageV1 +var NewInsertMessageBuilderV1 = newMutableMessageBuilder[*InsertMessageHeader, *InsertRequest] + +// Type aliases for DeleteMessageV1 +type ( + MutableDeleteMessageV1 = specializedMutableMessage[*DeleteMessageHeader, *DeleteRequest] + ImmutableDeleteMessageV1 = SpecializedImmutableMessage[*DeleteMessageHeader, *DeleteRequest] + BroadcastDeleteMessageV1 = SpecializedBroadcastMessage[*DeleteMessageHeader, *DeleteRequest] +) + +// MessageTypeWithVersion for DeleteMessageV1 +var MessageTypeDeleteV1 = MessageTypeWithVersion{ + MessageType: MessageTypeDelete, + Version: VersionV1, +} + +// MessageSpecializedType for DeleteMessageV1 +var SpecializedTypeDeleteV1 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*DeleteRequest)(nil)), + HeaderType: reflect.TypeOf((*DeleteMessageHeader)(nil)), +} + +// AsMutableDeleteMessageV1 converts a BasicMessage to MutableDeleteMessageV1 +var AsMutableDeleteMessageV1 = asSpecializedMutableMessage[*DeleteMessageHeader, *DeleteRequest] + +// MustAsMutableDeleteMessageV1 converts a BasicMessage to MutableDeleteMessageV1, panics on error +var MustAsMutableDeleteMessageV1 = mustAsSpecializedMutableMessage[*DeleteMessageHeader, *DeleteRequest] + +// AsImmutableDeleteMessageV1 converts an ImmutableMessage to ImmutableDeleteMessageV1 +var AsImmutableDeleteMessageV1 = asSpecializedImmutableMessage[*DeleteMessageHeader, *DeleteRequest] + +// MustAsImmutableDeleteMessageV1 converts an ImmutableMessage to ImmutableDeleteMessageV1, panics on error +var MustAsImmutableDeleteMessageV1 = MustAsSpecializedImmutableMessage[*DeleteMessageHeader, *DeleteRequest] + +// AsBroadcastDeleteMessageV1 converts a BasicMessage to BroadcastDeleteMessageV1 +var AsBroadcastDeleteMessageV1 = asSpecializedBroadcastMessage[*DeleteMessageHeader, *DeleteRequest] + +// MustAsBroadcastDeleteMessageV1 converts a BasicMessage to BroadcastDeleteMessageV1, panics on error +var MustAsBroadcastDeleteMessageV1 = MustAsSpecializedBroadcastMessage[*DeleteMessageHeader, *DeleteRequest] + +// NewDeleteMessageBuilderV1 creates a new message builder for DeleteMessageV1 +var NewDeleteMessageBuilderV1 = newMutableMessageBuilder[*DeleteMessageHeader, *DeleteRequest] + +// Type aliases for CreateCollectionMessageV1 +type ( + MutableCreateCollectionMessageV1 = specializedMutableMessage[*CreateCollectionMessageHeader, *CreateCollectionRequest] + ImmutableCreateCollectionMessageV1 = SpecializedImmutableMessage[*CreateCollectionMessageHeader, *CreateCollectionRequest] + BroadcastCreateCollectionMessageV1 = SpecializedBroadcastMessage[*CreateCollectionMessageHeader, *CreateCollectionRequest] +) + +// MessageTypeWithVersion for CreateCollectionMessageV1 +var MessageTypeCreateCollectionV1 = MessageTypeWithVersion{ + MessageType: MessageTypeCreateCollection, + Version: VersionV1, +} + +// MessageSpecializedType for CreateCollectionMessageV1 +var SpecializedTypeCreateCollectionV1 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*CreateCollectionRequest)(nil)), + HeaderType: reflect.TypeOf((*CreateCollectionMessageHeader)(nil)), +} + +// AsMutableCreateCollectionMessageV1 converts a BasicMessage to MutableCreateCollectionMessageV1 +var AsMutableCreateCollectionMessageV1 = asSpecializedMutableMessage[*CreateCollectionMessageHeader, *CreateCollectionRequest] + +// MustAsMutableCreateCollectionMessageV1 converts a BasicMessage to MutableCreateCollectionMessageV1, panics on error +var MustAsMutableCreateCollectionMessageV1 = mustAsSpecializedMutableMessage[*CreateCollectionMessageHeader, *CreateCollectionRequest] + +// AsImmutableCreateCollectionMessageV1 converts an ImmutableMessage to ImmutableCreateCollectionMessageV1 +var AsImmutableCreateCollectionMessageV1 = asSpecializedImmutableMessage[*CreateCollectionMessageHeader, *CreateCollectionRequest] + +// MustAsImmutableCreateCollectionMessageV1 converts an ImmutableMessage to ImmutableCreateCollectionMessageV1, panics on error +var MustAsImmutableCreateCollectionMessageV1 = MustAsSpecializedImmutableMessage[*CreateCollectionMessageHeader, *CreateCollectionRequest] + +// AsBroadcastCreateCollectionMessageV1 converts a BasicMessage to BroadcastCreateCollectionMessageV1 +var AsBroadcastCreateCollectionMessageV1 = asSpecializedBroadcastMessage[*CreateCollectionMessageHeader, *CreateCollectionRequest] + +// MustAsBroadcastCreateCollectionMessageV1 converts a BasicMessage to BroadcastCreateCollectionMessageV1, panics on error +var MustAsBroadcastCreateCollectionMessageV1 = MustAsSpecializedBroadcastMessage[*CreateCollectionMessageHeader, *CreateCollectionRequest] + +// NewCreateCollectionMessageBuilderV1 creates a new message builder for CreateCollectionMessageV1 +var NewCreateCollectionMessageBuilderV1 = newMutableMessageBuilder[*CreateCollectionMessageHeader, *CreateCollectionRequest] + +// Type aliases for DropCollectionMessageV1 +type ( + MutableDropCollectionMessageV1 = specializedMutableMessage[*DropCollectionMessageHeader, *DropCollectionRequest] + ImmutableDropCollectionMessageV1 = SpecializedImmutableMessage[*DropCollectionMessageHeader, *DropCollectionRequest] + BroadcastDropCollectionMessageV1 = SpecializedBroadcastMessage[*DropCollectionMessageHeader, *DropCollectionRequest] +) + +// MessageTypeWithVersion for DropCollectionMessageV1 +var MessageTypeDropCollectionV1 = MessageTypeWithVersion{ + MessageType: MessageTypeDropCollection, + Version: VersionV1, +} + +// MessageSpecializedType for DropCollectionMessageV1 +var SpecializedTypeDropCollectionV1 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*DropCollectionRequest)(nil)), + HeaderType: reflect.TypeOf((*DropCollectionMessageHeader)(nil)), +} + +// AsMutableDropCollectionMessageV1 converts a BasicMessage to MutableDropCollectionMessageV1 +var AsMutableDropCollectionMessageV1 = asSpecializedMutableMessage[*DropCollectionMessageHeader, *DropCollectionRequest] + +// MustAsMutableDropCollectionMessageV1 converts a BasicMessage to MutableDropCollectionMessageV1, panics on error +var MustAsMutableDropCollectionMessageV1 = mustAsSpecializedMutableMessage[*DropCollectionMessageHeader, *DropCollectionRequest] + +// AsImmutableDropCollectionMessageV1 converts an ImmutableMessage to ImmutableDropCollectionMessageV1 +var AsImmutableDropCollectionMessageV1 = asSpecializedImmutableMessage[*DropCollectionMessageHeader, *DropCollectionRequest] + +// MustAsImmutableDropCollectionMessageV1 converts an ImmutableMessage to ImmutableDropCollectionMessageV1, panics on error +var MustAsImmutableDropCollectionMessageV1 = MustAsSpecializedImmutableMessage[*DropCollectionMessageHeader, *DropCollectionRequest] + +// AsBroadcastDropCollectionMessageV1 converts a BasicMessage to BroadcastDropCollectionMessageV1 +var AsBroadcastDropCollectionMessageV1 = asSpecializedBroadcastMessage[*DropCollectionMessageHeader, *DropCollectionRequest] + +// MustAsBroadcastDropCollectionMessageV1 converts a BasicMessage to BroadcastDropCollectionMessageV1, panics on error +var MustAsBroadcastDropCollectionMessageV1 = MustAsSpecializedBroadcastMessage[*DropCollectionMessageHeader, *DropCollectionRequest] + +// NewDropCollectionMessageBuilderV1 creates a new message builder for DropCollectionMessageV1 +var NewDropCollectionMessageBuilderV1 = newMutableMessageBuilder[*DropCollectionMessageHeader, *DropCollectionRequest] + +// Type aliases for CreatePartitionMessageV1 +type ( + MutableCreatePartitionMessageV1 = specializedMutableMessage[*CreatePartitionMessageHeader, *CreatePartitionRequest] + ImmutableCreatePartitionMessageV1 = SpecializedImmutableMessage[*CreatePartitionMessageHeader, *CreatePartitionRequest] + BroadcastCreatePartitionMessageV1 = SpecializedBroadcastMessage[*CreatePartitionMessageHeader, *CreatePartitionRequest] +) + +// MessageTypeWithVersion for CreatePartitionMessageV1 +var MessageTypeCreatePartitionV1 = MessageTypeWithVersion{ + MessageType: MessageTypeCreatePartition, + Version: VersionV1, +} + +// MessageSpecializedType for CreatePartitionMessageV1 +var SpecializedTypeCreatePartitionV1 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*CreatePartitionRequest)(nil)), + HeaderType: reflect.TypeOf((*CreatePartitionMessageHeader)(nil)), +} + +// AsMutableCreatePartitionMessageV1 converts a BasicMessage to MutableCreatePartitionMessageV1 +var AsMutableCreatePartitionMessageV1 = asSpecializedMutableMessage[*CreatePartitionMessageHeader, *CreatePartitionRequest] + +// MustAsMutableCreatePartitionMessageV1 converts a BasicMessage to MutableCreatePartitionMessageV1, panics on error +var MustAsMutableCreatePartitionMessageV1 = mustAsSpecializedMutableMessage[*CreatePartitionMessageHeader, *CreatePartitionRequest] + +// AsImmutableCreatePartitionMessageV1 converts an ImmutableMessage to ImmutableCreatePartitionMessageV1 +var AsImmutableCreatePartitionMessageV1 = asSpecializedImmutableMessage[*CreatePartitionMessageHeader, *CreatePartitionRequest] + +// MustAsImmutableCreatePartitionMessageV1 converts an ImmutableMessage to ImmutableCreatePartitionMessageV1, panics on error +var MustAsImmutableCreatePartitionMessageV1 = MustAsSpecializedImmutableMessage[*CreatePartitionMessageHeader, *CreatePartitionRequest] + +// AsBroadcastCreatePartitionMessageV1 converts a BasicMessage to BroadcastCreatePartitionMessageV1 +var AsBroadcastCreatePartitionMessageV1 = asSpecializedBroadcastMessage[*CreatePartitionMessageHeader, *CreatePartitionRequest] + +// MustAsBroadcastCreatePartitionMessageV1 converts a BasicMessage to BroadcastCreatePartitionMessageV1, panics on error +var MustAsBroadcastCreatePartitionMessageV1 = MustAsSpecializedBroadcastMessage[*CreatePartitionMessageHeader, *CreatePartitionRequest] + +// NewCreatePartitionMessageBuilderV1 creates a new message builder for CreatePartitionMessageV1 +var NewCreatePartitionMessageBuilderV1 = newMutableMessageBuilder[*CreatePartitionMessageHeader, *CreatePartitionRequest] + +// Type aliases for DropPartitionMessageV1 +type ( + MutableDropPartitionMessageV1 = specializedMutableMessage[*DropPartitionMessageHeader, *DropPartitionRequest] + ImmutableDropPartitionMessageV1 = SpecializedImmutableMessage[*DropPartitionMessageHeader, *DropPartitionRequest] + BroadcastDropPartitionMessageV1 = SpecializedBroadcastMessage[*DropPartitionMessageHeader, *DropPartitionRequest] +) + +// MessageTypeWithVersion for DropPartitionMessageV1 +var MessageTypeDropPartitionV1 = MessageTypeWithVersion{ + MessageType: MessageTypeDropPartition, + Version: VersionV1, +} + +// MessageSpecializedType for DropPartitionMessageV1 +var SpecializedTypeDropPartitionV1 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*DropPartitionRequest)(nil)), + HeaderType: reflect.TypeOf((*DropPartitionMessageHeader)(nil)), +} + +// AsMutableDropPartitionMessageV1 converts a BasicMessage to MutableDropPartitionMessageV1 +var AsMutableDropPartitionMessageV1 = asSpecializedMutableMessage[*DropPartitionMessageHeader, *DropPartitionRequest] + +// MustAsMutableDropPartitionMessageV1 converts a BasicMessage to MutableDropPartitionMessageV1, panics on error +var MustAsMutableDropPartitionMessageV1 = mustAsSpecializedMutableMessage[*DropPartitionMessageHeader, *DropPartitionRequest] + +// AsImmutableDropPartitionMessageV1 converts an ImmutableMessage to ImmutableDropPartitionMessageV1 +var AsImmutableDropPartitionMessageV1 = asSpecializedImmutableMessage[*DropPartitionMessageHeader, *DropPartitionRequest] + +// MustAsImmutableDropPartitionMessageV1 converts an ImmutableMessage to ImmutableDropPartitionMessageV1, panics on error +var MustAsImmutableDropPartitionMessageV1 = MustAsSpecializedImmutableMessage[*DropPartitionMessageHeader, *DropPartitionRequest] + +// AsBroadcastDropPartitionMessageV1 converts a BasicMessage to BroadcastDropPartitionMessageV1 +var AsBroadcastDropPartitionMessageV1 = asSpecializedBroadcastMessage[*DropPartitionMessageHeader, *DropPartitionRequest] + +// MustAsBroadcastDropPartitionMessageV1 converts a BasicMessage to BroadcastDropPartitionMessageV1, panics on error +var MustAsBroadcastDropPartitionMessageV1 = MustAsSpecializedBroadcastMessage[*DropPartitionMessageHeader, *DropPartitionRequest] + +// NewDropPartitionMessageBuilderV1 creates a new message builder for DropPartitionMessageV1 +var NewDropPartitionMessageBuilderV1 = newMutableMessageBuilder[*DropPartitionMessageHeader, *DropPartitionRequest] + +// Type aliases for ImportMessageV1 +type ( + MutableImportMessageV1 = specializedMutableMessage[*ImportMessageHeader, *ImportMsg] + ImmutableImportMessageV1 = SpecializedImmutableMessage[*ImportMessageHeader, *ImportMsg] + BroadcastImportMessageV1 = SpecializedBroadcastMessage[*ImportMessageHeader, *ImportMsg] +) + +// MessageTypeWithVersion for ImportMessageV1 +var MessageTypeImportV1 = MessageTypeWithVersion{ + MessageType: MessageTypeImport, + Version: VersionV1, +} + +// MessageSpecializedType for ImportMessageV1 +var SpecializedTypeImportV1 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*ImportMsg)(nil)), + HeaderType: reflect.TypeOf((*ImportMessageHeader)(nil)), +} + +// AsMutableImportMessageV1 converts a BasicMessage to MutableImportMessageV1 +var AsMutableImportMessageV1 = asSpecializedMutableMessage[*ImportMessageHeader, *ImportMsg] + +// MustAsMutableImportMessageV1 converts a BasicMessage to MutableImportMessageV1, panics on error +var MustAsMutableImportMessageV1 = mustAsSpecializedMutableMessage[*ImportMessageHeader, *ImportMsg] + +// AsImmutableImportMessageV1 converts an ImmutableMessage to ImmutableImportMessageV1 +var AsImmutableImportMessageV1 = asSpecializedImmutableMessage[*ImportMessageHeader, *ImportMsg] + +// MustAsImmutableImportMessageV1 converts an ImmutableMessage to ImmutableImportMessageV1, panics on error +var MustAsImmutableImportMessageV1 = MustAsSpecializedImmutableMessage[*ImportMessageHeader, *ImportMsg] + +// AsBroadcastImportMessageV1 converts a BasicMessage to BroadcastImportMessageV1 +var AsBroadcastImportMessageV1 = asSpecializedBroadcastMessage[*ImportMessageHeader, *ImportMsg] + +// MustAsBroadcastImportMessageV1 converts a BasicMessage to BroadcastImportMessageV1, panics on error +var MustAsBroadcastImportMessageV1 = MustAsSpecializedBroadcastMessage[*ImportMessageHeader, *ImportMsg] + +// NewImportMessageBuilderV1 creates a new message builder for ImportMessageV1 +var NewImportMessageBuilderV1 = newMutableMessageBuilder[*ImportMessageHeader, *ImportMsg] + +// Type aliases for CreateSegmentMessageV2 +type ( + MutableCreateSegmentMessageV2 = specializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] + ImmutableCreateSegmentMessageV2 = SpecializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] + BroadcastCreateSegmentMessageV2 = SpecializedBroadcastMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] +) + +// MessageTypeWithVersion for CreateSegmentMessageV2 +var MessageTypeCreateSegmentV2 = MessageTypeWithVersion{ + MessageType: MessageTypeCreateSegment, + Version: VersionV2, +} + +// MessageSpecializedType for CreateSegmentMessageV2 +var SpecializedTypeCreateSegmentV2 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*CreateSegmentMessageBody)(nil)), + HeaderType: reflect.TypeOf((*CreateSegmentMessageHeader)(nil)), +} + +// AsMutableCreateSegmentMessageV2 converts a BasicMessage to MutableCreateSegmentMessageV2 +var AsMutableCreateSegmentMessageV2 = asSpecializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] + +// MustAsMutableCreateSegmentMessageV2 converts a BasicMessage to MutableCreateSegmentMessageV2, panics on error +var MustAsMutableCreateSegmentMessageV2 = mustAsSpecializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] + +// AsImmutableCreateSegmentMessageV2 converts an ImmutableMessage to ImmutableCreateSegmentMessageV2 +var AsImmutableCreateSegmentMessageV2 = asSpecializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] + +// MustAsImmutableCreateSegmentMessageV2 converts an ImmutableMessage to ImmutableCreateSegmentMessageV2, panics on error +var MustAsImmutableCreateSegmentMessageV2 = MustAsSpecializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] + +// AsBroadcastCreateSegmentMessageV2 converts a BasicMessage to BroadcastCreateSegmentMessageV2 +var AsBroadcastCreateSegmentMessageV2 = asSpecializedBroadcastMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] + +// MustAsBroadcastCreateSegmentMessageV2 converts a BasicMessage to BroadcastCreateSegmentMessageV2, panics on error +var MustAsBroadcastCreateSegmentMessageV2 = MustAsSpecializedBroadcastMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] + +// NewCreateSegmentMessageBuilderV2 creates a new message builder for CreateSegmentMessageV2 +var NewCreateSegmentMessageBuilderV2 = newMutableMessageBuilder[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] + +// Type aliases for FlushMessageV2 +type ( + MutableFlushMessageV2 = specializedMutableMessage[*FlushMessageHeader, *FlushMessageBody] + ImmutableFlushMessageV2 = SpecializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody] + BroadcastFlushMessageV2 = SpecializedBroadcastMessage[*FlushMessageHeader, *FlushMessageBody] +) + +// MessageTypeWithVersion for FlushMessageV2 +var MessageTypeFlushV2 = MessageTypeWithVersion{ + MessageType: MessageTypeFlush, + Version: VersionV2, +} + +// MessageSpecializedType for FlushMessageV2 +var SpecializedTypeFlushV2 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*FlushMessageBody)(nil)), + HeaderType: reflect.TypeOf((*FlushMessageHeader)(nil)), +} + +// AsMutableFlushMessageV2 converts a BasicMessage to MutableFlushMessageV2 +var AsMutableFlushMessageV2 = asSpecializedMutableMessage[*FlushMessageHeader, *FlushMessageBody] + +// MustAsMutableFlushMessageV2 converts a BasicMessage to MutableFlushMessageV2, panics on error +var MustAsMutableFlushMessageV2 = mustAsSpecializedMutableMessage[*FlushMessageHeader, *FlushMessageBody] + +// AsImmutableFlushMessageV2 converts an ImmutableMessage to ImmutableFlushMessageV2 +var AsImmutableFlushMessageV2 = asSpecializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody] + +// MustAsImmutableFlushMessageV2 converts an ImmutableMessage to ImmutableFlushMessageV2, panics on error +var MustAsImmutableFlushMessageV2 = MustAsSpecializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody] + +// AsBroadcastFlushMessageV2 converts a BasicMessage to BroadcastFlushMessageV2 +var AsBroadcastFlushMessageV2 = asSpecializedBroadcastMessage[*FlushMessageHeader, *FlushMessageBody] + +// MustAsBroadcastFlushMessageV2 converts a BasicMessage to BroadcastFlushMessageV2, panics on error +var MustAsBroadcastFlushMessageV2 = MustAsSpecializedBroadcastMessage[*FlushMessageHeader, *FlushMessageBody] + +// NewFlushMessageBuilderV2 creates a new message builder for FlushMessageV2 +var NewFlushMessageBuilderV2 = newMutableMessageBuilder[*FlushMessageHeader, *FlushMessageBody] + +// Type aliases for ManualFlushMessageV2 +type ( + MutableManualFlushMessageV2 = specializedMutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] + ImmutableManualFlushMessageV2 = SpecializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] + BroadcastManualFlushMessageV2 = SpecializedBroadcastMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] +) + +// MessageTypeWithVersion for ManualFlushMessageV2 +var MessageTypeManualFlushV2 = MessageTypeWithVersion{ + MessageType: MessageTypeManualFlush, + Version: VersionV2, +} + +// MessageSpecializedType for ManualFlushMessageV2 +var SpecializedTypeManualFlushV2 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*ManualFlushMessageBody)(nil)), + HeaderType: reflect.TypeOf((*ManualFlushMessageHeader)(nil)), +} + +// AsMutableManualFlushMessageV2 converts a BasicMessage to MutableManualFlushMessageV2 +var AsMutableManualFlushMessageV2 = asSpecializedMutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] + +// MustAsMutableManualFlushMessageV2 converts a BasicMessage to MutableManualFlushMessageV2, panics on error +var MustAsMutableManualFlushMessageV2 = mustAsSpecializedMutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] + +// AsImmutableManualFlushMessageV2 converts an ImmutableMessage to ImmutableManualFlushMessageV2 +var AsImmutableManualFlushMessageV2 = asSpecializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] + +// MustAsImmutableManualFlushMessageV2 converts an ImmutableMessage to ImmutableManualFlushMessageV2, panics on error +var MustAsImmutableManualFlushMessageV2 = MustAsSpecializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] + +// AsBroadcastManualFlushMessageV2 converts a BasicMessage to BroadcastManualFlushMessageV2 +var AsBroadcastManualFlushMessageV2 = asSpecializedBroadcastMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] + +// MustAsBroadcastManualFlushMessageV2 converts a BasicMessage to BroadcastManualFlushMessageV2, panics on error +var MustAsBroadcastManualFlushMessageV2 = MustAsSpecializedBroadcastMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] + +// NewManualFlushMessageBuilderV2 creates a new message builder for ManualFlushMessageV2 +var NewManualFlushMessageBuilderV2 = newMutableMessageBuilder[*ManualFlushMessageHeader, *ManualFlushMessageBody] + +// Type aliases for BeginTxnMessageV2 +type ( + MutableBeginTxnMessageV2 = specializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] + ImmutableBeginTxnMessageV2 = SpecializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] + BroadcastBeginTxnMessageV2 = SpecializedBroadcastMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] +) + +// MessageTypeWithVersion for BeginTxnMessageV2 +var MessageTypeBeginTxnV2 = MessageTypeWithVersion{ + MessageType: MessageTypeBeginTxn, + Version: VersionV2, +} + +// MessageSpecializedType for BeginTxnMessageV2 +var SpecializedTypeBeginTxnV2 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*BeginTxnMessageBody)(nil)), + HeaderType: reflect.TypeOf((*BeginTxnMessageHeader)(nil)), +} + +// AsMutableBeginTxnMessageV2 converts a BasicMessage to MutableBeginTxnMessageV2 +var AsMutableBeginTxnMessageV2 = asSpecializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] + +// MustAsMutableBeginTxnMessageV2 converts a BasicMessage to MutableBeginTxnMessageV2, panics on error +var MustAsMutableBeginTxnMessageV2 = mustAsSpecializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] + +// AsImmutableBeginTxnMessageV2 converts an ImmutableMessage to ImmutableBeginTxnMessageV2 +var AsImmutableBeginTxnMessageV2 = asSpecializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] + +// MustAsImmutableBeginTxnMessageV2 converts an ImmutableMessage to ImmutableBeginTxnMessageV2, panics on error +var MustAsImmutableBeginTxnMessageV2 = MustAsSpecializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] + +// AsBroadcastBeginTxnMessageV2 converts a BasicMessage to BroadcastBeginTxnMessageV2 +var AsBroadcastBeginTxnMessageV2 = asSpecializedBroadcastMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] + +// MustAsBroadcastBeginTxnMessageV2 converts a BasicMessage to BroadcastBeginTxnMessageV2, panics on error +var MustAsBroadcastBeginTxnMessageV2 = MustAsSpecializedBroadcastMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] + +// NewBeginTxnMessageBuilderV2 creates a new message builder for BeginTxnMessageV2 +var NewBeginTxnMessageBuilderV2 = newMutableMessageBuilder[*BeginTxnMessageHeader, *BeginTxnMessageBody] + +// Type aliases for CommitTxnMessageV2 +type ( + MutableCommitTxnMessageV2 = specializedMutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] + ImmutableCommitTxnMessageV2 = SpecializedImmutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] + BroadcastCommitTxnMessageV2 = SpecializedBroadcastMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] +) + +// MessageTypeWithVersion for CommitTxnMessageV2 +var MessageTypeCommitTxnV2 = MessageTypeWithVersion{ + MessageType: MessageTypeCommitTxn, + Version: VersionV2, +} + +// MessageSpecializedType for CommitTxnMessageV2 +var SpecializedTypeCommitTxnV2 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*CommitTxnMessageBody)(nil)), + HeaderType: reflect.TypeOf((*CommitTxnMessageHeader)(nil)), +} + +// AsMutableCommitTxnMessageV2 converts a BasicMessage to MutableCommitTxnMessageV2 +var AsMutableCommitTxnMessageV2 = asSpecializedMutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] + +// MustAsMutableCommitTxnMessageV2 converts a BasicMessage to MutableCommitTxnMessageV2, panics on error +var MustAsMutableCommitTxnMessageV2 = mustAsSpecializedMutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] + +// AsImmutableCommitTxnMessageV2 converts an ImmutableMessage to ImmutableCommitTxnMessageV2 +var AsImmutableCommitTxnMessageV2 = asSpecializedImmutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] + +// MustAsImmutableCommitTxnMessageV2 converts an ImmutableMessage to ImmutableCommitTxnMessageV2, panics on error +var MustAsImmutableCommitTxnMessageV2 = MustAsSpecializedImmutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] + +// AsBroadcastCommitTxnMessageV2 converts a BasicMessage to BroadcastCommitTxnMessageV2 +var AsBroadcastCommitTxnMessageV2 = asSpecializedBroadcastMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] + +// MustAsBroadcastCommitTxnMessageV2 converts a BasicMessage to BroadcastCommitTxnMessageV2, panics on error +var MustAsBroadcastCommitTxnMessageV2 = MustAsSpecializedBroadcastMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] + +// NewCommitTxnMessageBuilderV2 creates a new message builder for CommitTxnMessageV2 +var NewCommitTxnMessageBuilderV2 = newMutableMessageBuilder[*CommitTxnMessageHeader, *CommitTxnMessageBody] + +// Type aliases for RollbackTxnMessageV2 +type ( + MutableRollbackTxnMessageV2 = specializedMutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] + ImmutableRollbackTxnMessageV2 = SpecializedImmutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] + BroadcastRollbackTxnMessageV2 = SpecializedBroadcastMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] +) + +// MessageTypeWithVersion for RollbackTxnMessageV2 +var MessageTypeRollbackTxnV2 = MessageTypeWithVersion{ + MessageType: MessageTypeRollbackTxn, + Version: VersionV2, +} + +// MessageSpecializedType for RollbackTxnMessageV2 +var SpecializedTypeRollbackTxnV2 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*RollbackTxnMessageBody)(nil)), + HeaderType: reflect.TypeOf((*RollbackTxnMessageHeader)(nil)), +} + +// AsMutableRollbackTxnMessageV2 converts a BasicMessage to MutableRollbackTxnMessageV2 +var AsMutableRollbackTxnMessageV2 = asSpecializedMutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] + +// MustAsMutableRollbackTxnMessageV2 converts a BasicMessage to MutableRollbackTxnMessageV2, panics on error +var MustAsMutableRollbackTxnMessageV2 = mustAsSpecializedMutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] + +// AsImmutableRollbackTxnMessageV2 converts an ImmutableMessage to ImmutableRollbackTxnMessageV2 +var AsImmutableRollbackTxnMessageV2 = asSpecializedImmutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] + +// MustAsImmutableRollbackTxnMessageV2 converts an ImmutableMessage to ImmutableRollbackTxnMessageV2, panics on error +var MustAsImmutableRollbackTxnMessageV2 = MustAsSpecializedImmutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] + +// AsBroadcastRollbackTxnMessageV2 converts a BasicMessage to BroadcastRollbackTxnMessageV2 +var AsBroadcastRollbackTxnMessageV2 = asSpecializedBroadcastMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] + +// MustAsBroadcastRollbackTxnMessageV2 converts a BasicMessage to BroadcastRollbackTxnMessageV2, panics on error +var MustAsBroadcastRollbackTxnMessageV2 = MustAsSpecializedBroadcastMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] + +// NewRollbackTxnMessageBuilderV2 creates a new message builder for RollbackTxnMessageV2 +var NewRollbackTxnMessageBuilderV2 = newMutableMessageBuilder[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] + +// Type aliases for TxnMessageV2 +type ( + MutableTxnMessageV2 = specializedMutableMessage[*TxnMessageHeader, *TxnMessageBody] + ImmutableTxnMessageV2 = SpecializedImmutableMessage[*TxnMessageHeader, *TxnMessageBody] + BroadcastTxnMessageV2 = SpecializedBroadcastMessage[*TxnMessageHeader, *TxnMessageBody] +) + +// MessageTypeWithVersion for TxnMessageV2 +var MessageTypeTxnV2 = MessageTypeWithVersion{ + MessageType: MessageTypeTxn, + Version: VersionV2, +} + +// MessageSpecializedType for TxnMessageV2 +var SpecializedTypeTxnV2 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*TxnMessageBody)(nil)), + HeaderType: reflect.TypeOf((*TxnMessageHeader)(nil)), +} + +// Type aliases for SchemaChangeMessageV2 +type ( + MutableSchemaChangeMessageV2 = specializedMutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] + ImmutableSchemaChangeMessageV2 = SpecializedImmutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] + BroadcastSchemaChangeMessageV2 = SpecializedBroadcastMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] +) + +// MessageTypeWithVersion for SchemaChangeMessageV2 +var MessageTypeSchemaChangeV2 = MessageTypeWithVersion{ + MessageType: MessageTypeSchemaChange, + Version: VersionV2, +} + +// MessageSpecializedType for SchemaChangeMessageV2 +var SpecializedTypeSchemaChangeV2 = MessageSpecializedType{ + BodyType: reflect.TypeOf((*SchemaChangeMessageBody)(nil)), + HeaderType: reflect.TypeOf((*SchemaChangeMessageHeader)(nil)), +} + +// AsMutableSchemaChangeMessageV2 converts a BasicMessage to MutableSchemaChangeMessageV2 +var AsMutableSchemaChangeMessageV2 = asSpecializedMutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] + +// MustAsMutableSchemaChangeMessageV2 converts a BasicMessage to MutableSchemaChangeMessageV2, panics on error +var MustAsMutableSchemaChangeMessageV2 = mustAsSpecializedMutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] + +// AsImmutableSchemaChangeMessageV2 converts an ImmutableMessage to ImmutableSchemaChangeMessageV2 +var AsImmutableSchemaChangeMessageV2 = asSpecializedImmutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] + +// MustAsImmutableSchemaChangeMessageV2 converts an ImmutableMessage to ImmutableSchemaChangeMessageV2, panics on error +var MustAsImmutableSchemaChangeMessageV2 = MustAsSpecializedImmutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] + +// AsBroadcastSchemaChangeMessageV2 converts a BasicMessage to BroadcastSchemaChangeMessageV2 +var AsBroadcastSchemaChangeMessageV2 = asSpecializedBroadcastMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] + +// MustAsBroadcastSchemaChangeMessageV2 converts a BasicMessage to BroadcastSchemaChangeMessageV2, panics on error +var MustAsBroadcastSchemaChangeMessageV2 = MustAsSpecializedBroadcastMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] + +// NewSchemaChangeMessageBuilderV2 creates a new message builder for SchemaChangeMessageV2 +var NewSchemaChangeMessageBuilderV2 = newMutableMessageBuilder[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] + +// messageTypeMap make the contriants that one header type can only be used for one message type. +var messageTypeMap = map[reflect.Type]MessageType{ + reflect.TypeOf(&messagespb.BeginTxnMessageHeader{}): MessageTypeBeginTxn, + reflect.TypeOf(&messagespb.CommitTxnMessageHeader{}): MessageTypeCommitTxn, + reflect.TypeOf(&messagespb.CreateCollectionMessageHeader{}): MessageTypeCreateCollection, + reflect.TypeOf(&messagespb.CreatePartitionMessageHeader{}): MessageTypeCreatePartition, + reflect.TypeOf(&messagespb.CreateSegmentMessageHeader{}): MessageTypeCreateSegment, + reflect.TypeOf(&messagespb.DeleteMessageHeader{}): MessageTypeDelete, + reflect.TypeOf(&messagespb.DropCollectionMessageHeader{}): MessageTypeDropCollection, + reflect.TypeOf(&messagespb.DropPartitionMessageHeader{}): MessageTypeDropPartition, + reflect.TypeOf(&messagespb.FlushMessageHeader{}): MessageTypeFlush, + reflect.TypeOf(&messagespb.ImportMessageHeader{}): MessageTypeImport, + reflect.TypeOf(&messagespb.InsertMessageHeader{}): MessageTypeInsert, + reflect.TypeOf(&messagespb.ManualFlushMessageHeader{}): MessageTypeManualFlush, + reflect.TypeOf(&messagespb.RollbackTxnMessageHeader{}): MessageTypeRollbackTxn, + reflect.TypeOf(&messagespb.SchemaChangeMessageHeader{}): MessageTypeSchemaChange, + reflect.TypeOf(&messagespb.TimeTickMessageHeader{}): MessageTypeTimeTick, + reflect.TypeOf(&messagespb.TxnMessageHeader{}): MessageTypeTxn, +} + +// MessageTypeWithVersion identifies a message type and version +type MessageTypeWithVersion struct { + MessageType MessageType + Version Version +} + +func (m MessageTypeWithVersion) String() string { + return fmt.Sprintf("%s@v%d", m.MessageType.String(), m.Version) +} + +// MessageSpecializedType contains reflection types for message headers and bodies +type MessageSpecializedType struct { + HeaderType reflect.Type + BodyType reflect.Type +} + +// messageTypeVersionSpecializedMap maps MessageTypeWithVersion to MessageSpecializedType +var messageTypeVersionSpecializedMap = map[MessageTypeWithVersion]MessageSpecializedType{ + MessageTypeBeginTxnV2: SpecializedTypeBeginTxnV2, + MessageTypeCommitTxnV2: SpecializedTypeCommitTxnV2, + MessageTypeCreateCollectionV1: SpecializedTypeCreateCollectionV1, + MessageTypeCreatePartitionV1: SpecializedTypeCreatePartitionV1, + MessageTypeCreateSegmentV2: SpecializedTypeCreateSegmentV2, + MessageTypeDeleteV1: SpecializedTypeDeleteV1, + MessageTypeDropCollectionV1: SpecializedTypeDropCollectionV1, + MessageTypeDropPartitionV1: SpecializedTypeDropPartitionV1, + MessageTypeFlushV2: SpecializedTypeFlushV2, + MessageTypeImportV1: SpecializedTypeImportV1, + MessageTypeInsertV1: SpecializedTypeInsertV1, + MessageTypeManualFlushV2: SpecializedTypeManualFlushV2, + MessageTypeRollbackTxnV2: SpecializedTypeRollbackTxnV2, + MessageTypeSchemaChangeV2: SpecializedTypeSchemaChangeV2, + MessageTypeTimeTickV1: SpecializedTypeTimeTickV1, + MessageTypeTxnV2: SpecializedTypeTxnV2, +} + +// messageSpecializedTypeVersionMap maps MessageSpecializedType to MessageTypeWithVersion +var messageSpecializedTypeVersionMap = map[MessageSpecializedType]MessageTypeWithVersion{ + SpecializedTypeBeginTxnV2: MessageTypeBeginTxnV2, + SpecializedTypeCommitTxnV2: MessageTypeCommitTxnV2, + SpecializedTypeCreateCollectionV1: MessageTypeCreateCollectionV1, + SpecializedTypeCreatePartitionV1: MessageTypeCreatePartitionV1, + SpecializedTypeCreateSegmentV2: MessageTypeCreateSegmentV2, + SpecializedTypeDeleteV1: MessageTypeDeleteV1, + SpecializedTypeDropCollectionV1: MessageTypeDropCollectionV1, + SpecializedTypeDropPartitionV1: MessageTypeDropPartitionV1, + SpecializedTypeFlushV2: MessageTypeFlushV2, + SpecializedTypeImportV1: MessageTypeImportV1, + SpecializedTypeInsertV1: MessageTypeInsertV1, + SpecializedTypeManualFlushV2: MessageTypeManualFlushV2, + SpecializedTypeRollbackTxnV2: MessageTypeRollbackTxnV2, + SpecializedTypeSchemaChangeV2: MessageTypeSchemaChangeV2, + SpecializedTypeTimeTickV1: MessageTypeTimeTickV1, + SpecializedTypeTxnV2: MessageTypeTxnV2, +} diff --git a/pkg/streaming/util/message/specialized_message.go b/pkg/streaming/util/message/specialized_message.go index c596bc92b4..879041b67e 100644 --- a/pkg/streaming/util/message/specialized_message.go +++ b/pkg/streaming/util/message/specialized_message.go @@ -6,87 +6,8 @@ import ( "github.com/cockroachdb/errors" "google.golang.org/protobuf/proto" - - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" ) -type ( - SegmentAssignment = messagespb.SegmentAssignment - PartitionSegmentAssignment = messagespb.PartitionSegmentAssignment - TimeTickMessageHeader = messagespb.TimeTickMessageHeader - InsertMessageHeader = messagespb.InsertMessageHeader - DeleteMessageHeader = messagespb.DeleteMessageHeader - CreateCollectionMessageHeader = messagespb.CreateCollectionMessageHeader - DropCollectionMessageHeader = messagespb.DropCollectionMessageHeader - CreatePartitionMessageHeader = messagespb.CreatePartitionMessageHeader - DropPartitionMessageHeader = messagespb.DropPartitionMessageHeader - FlushMessageHeader = messagespb.FlushMessageHeader - CreateSegmentMessageHeader = messagespb.CreateSegmentMessageHeader - ManualFlushMessageHeader = messagespb.ManualFlushMessageHeader - BeginTxnMessageHeader = messagespb.BeginTxnMessageHeader - CommitTxnMessageHeader = messagespb.CommitTxnMessageHeader - RollbackTxnMessageHeader = messagespb.RollbackTxnMessageHeader - TxnMessageHeader = messagespb.TxnMessageHeader - ImportMessageHeader = messagespb.ImportMessageHeader - SchemaChangeMessageHeader = messagespb.SchemaChangeMessageHeader -) - -type ( - FlushMessageBody = messagespb.FlushMessageBody - CreateSegmentMessageBody = messagespb.CreateSegmentMessageBody - ManualFlushMessageBody = messagespb.ManualFlushMessageBody - BeginTxnMessageBody = messagespb.BeginTxnMessageBody - CommitTxnMessageBody = messagespb.CommitTxnMessageBody - RollbackTxnMessageBody = messagespb.RollbackTxnMessageBody - TxnMessageBody = messagespb.TxnMessageBody - SchemaChangeMessageBody = messagespb.SchemaChangeMessageBody -) - -type ( - ManualFlushExtraResponse = messagespb.ManualFlushExtraResponse -) - -// messageTypeMap maps the proto message type to the message type. -var messageTypeMap = map[reflect.Type]MessageType{ - reflect.TypeOf(&TimeTickMessageHeader{}): MessageTypeTimeTick, - reflect.TypeOf(&InsertMessageHeader{}): MessageTypeInsert, - reflect.TypeOf(&DeleteMessageHeader{}): MessageTypeDelete, - reflect.TypeOf(&CreateCollectionMessageHeader{}): MessageTypeCreateCollection, - reflect.TypeOf(&DropCollectionMessageHeader{}): MessageTypeDropCollection, - reflect.TypeOf(&CreatePartitionMessageHeader{}): MessageTypeCreatePartition, - reflect.TypeOf(&DropPartitionMessageHeader{}): MessageTypeDropPartition, - reflect.TypeOf(&CreateSegmentMessageHeader{}): MessageTypeCreateSegment, - reflect.TypeOf(&FlushMessageHeader{}): MessageTypeFlush, - reflect.TypeOf(&ManualFlushMessageHeader{}): MessageTypeManualFlush, - reflect.TypeOf(&BeginTxnMessageHeader{}): MessageTypeBeginTxn, - reflect.TypeOf(&CommitTxnMessageHeader{}): MessageTypeCommitTxn, - reflect.TypeOf(&RollbackTxnMessageHeader{}): MessageTypeRollbackTxn, - reflect.TypeOf(&TxnMessageHeader{}): MessageTypeTxn, - reflect.TypeOf(&ImportMessageHeader{}): MessageTypeImport, - reflect.TypeOf(&SchemaChangeMessageHeader{}): MessageTypeSchemaChange, -} - -// messageTypeToCustomHeaderMap maps the message type to the proto message type. -var messageTypeToCustomHeaderMap = map[MessageType]reflect.Type{ - MessageTypeTimeTick: reflect.TypeOf(&TimeTickMessageHeader{}), - MessageTypeInsert: reflect.TypeOf(&InsertMessageHeader{}), - MessageTypeDelete: reflect.TypeOf(&DeleteMessageHeader{}), - MessageTypeCreateCollection: reflect.TypeOf(&CreateCollectionMessageHeader{}), - MessageTypeDropCollection: reflect.TypeOf(&DropCollectionMessageHeader{}), - MessageTypeCreatePartition: reflect.TypeOf(&CreatePartitionMessageHeader{}), - MessageTypeDropPartition: reflect.TypeOf(&DropPartitionMessageHeader{}), - MessageTypeCreateSegment: reflect.TypeOf(&CreateSegmentMessageHeader{}), - MessageTypeFlush: reflect.TypeOf(&FlushMessageHeader{}), - MessageTypeManualFlush: reflect.TypeOf(&ManualFlushMessageHeader{}), - MessageTypeBeginTxn: reflect.TypeOf(&BeginTxnMessageHeader{}), - MessageTypeCommitTxn: reflect.TypeOf(&CommitTxnMessageHeader{}), - MessageTypeRollbackTxn: reflect.TypeOf(&RollbackTxnMessageHeader{}), - MessageTypeTxn: reflect.TypeOf(&TxnMessageHeader{}), - MessageTypeImport: reflect.TypeOf(&ImportMessageHeader{}), - MessageTypeSchemaChange: reflect.TypeOf(&SchemaChangeMessageHeader{}), -} - // A system preserved message, should not allowed to provide outside of the streaming system. var systemMessageType = map[MessageType]struct{}{ MessageTypeTimeTick: {}, @@ -110,112 +31,6 @@ var exclusiveRequiredMessageType = map[MessageType]struct{}{ MessageTypeSchemaChange: {}, } -// List all specialized message types. -type ( - MutableTimeTickMessageV1 = specializedMutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg] - MutableInsertMessageV1 = specializedMutableMessage[*InsertMessageHeader, *msgpb.InsertRequest] - MutableDeleteMessageV1 = specializedMutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest] - MutableCreateCollectionMessageV1 = specializedMutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest] - MutableDropCollectionMessageV1 = specializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] - MutableCreatePartitionMessageV1 = specializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] - MutableDropPartitionMessageV1 = specializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] - MutableImportMessageV1 = specializedMutableMessage[*ImportMessageHeader, *msgpb.ImportMsg] - MutableCreateSegmentMessageV2 = specializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] - MutableFlushMessageV2 = specializedMutableMessage[*FlushMessageHeader, *FlushMessageBody] - MutableBeginTxnMessageV2 = specializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] - MutableCommitTxnMessageV2 = specializedMutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] - MutableRollbackTxnMessageV2 = specializedMutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] - MutableSchemaChangeMessageV2 = specializedMutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] - - ImmutableTimeTickMessageV1 = specializedImmutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg] - ImmutableInsertMessageV1 = specializedImmutableMessage[*InsertMessageHeader, *msgpb.InsertRequest] - ImmutableDeleteMessageV1 = specializedImmutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest] - ImmutableCreateCollectionMessageV1 = specializedImmutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest] - ImmutableDropCollectionMessageV1 = specializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] - ImmutableCreatePartitionMessageV1 = specializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] - ImmutableDropPartitionMessageV1 = specializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] - ImmutableImportMessageV1 = specializedImmutableMessage[*ImportMessageHeader, *msgpb.ImportMsg] - ImmutableCreateSegmentMessageV2 = specializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] - ImmutableFlushMessageV2 = specializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody] - ImmutableManualFlushMessageV2 = specializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] - ImmutableBeginTxnMessageV2 = specializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] - ImmutableCommitTxnMessageV2 = specializedImmutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] - ImmutableRollbackTxnMessageV2 = specializedImmutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] - ImmutableSchemaChangeMessageV2 = specializedImmutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] -) - -// List all as functions for specialized messages. -var ( - AsMutableTimeTickMessageV1 = asSpecializedMutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg] - AsMutableInsertMessageV1 = asSpecializedMutableMessage[*InsertMessageHeader, *msgpb.InsertRequest] - AsMutableDeleteMessageV1 = asSpecializedMutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest] - AsMutableCreateCollectionMessageV1 = asSpecializedMutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest] - AsMutableDropCollectionMessageV1 = asSpecializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] - AsMutableCreatePartitionMessageV1 = asSpecializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] - AsMutableDropPartitionMessageV1 = asSpecializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] - AsMutableImportMessageV1 = asSpecializedMutableMessage[*ImportMessageHeader, *msgpb.ImportMsg] - AsMutableCreateSegmentMessageV2 = asSpecializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] - AsMutableFlushMessageV2 = asSpecializedMutableMessage[*FlushMessageHeader, *FlushMessageBody] - AsMutableManualFlushMessageV2 = asSpecializedMutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] - AsMutableBeginTxnMessageV2 = asSpecializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] - AsMutableCommitTxnMessageV2 = asSpecializedMutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] - AsMutableRollbackTxnMessageV2 = asSpecializedMutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] - - MustAsMutableTimeTickMessageV1 = mustAsSpecializedMutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg] - MustAsMutableInsertMessageV1 = mustAsSpecializedMutableMessage[*InsertMessageHeader, *msgpb.InsertRequest] - MustAsMutableDeleteMessageV1 = mustAsSpecializedMutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest] - MustAsMutableCreateCollectionMessageV1 = mustAsSpecializedMutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest] - MustAsMutableDropCollectionMessageV1 = mustAsSpecializedMutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] - MustAsMutableCreatePartitionMessageV1 = mustAsSpecializedMutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] - MustAsMutableDropPartitionMessageV1 = mustAsSpecializedMutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] - MustAsMutableImportMessageV1 = mustAsSpecializedMutableMessage[*ImportMessageHeader, *msgpb.ImportMsg] - MustAsMutableCreateSegmentMessageV2 = mustAsSpecializedMutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] - MustAsMutableFlushMessageV2 = mustAsSpecializedMutableMessage[*FlushMessageHeader, *FlushMessageBody] - MustAsMutableManualFlushMessageV2 = mustAsSpecializedMutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] - MustAsMutableBeginTxnMessageV2 = mustAsSpecializedMutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] - MustAsMutableCommitTxnMessageV2 = mustAsSpecializedMutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] - MustAsMutableRollbackTxnMessageV2 = mustAsSpecializedMutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] - MustAsMutableCollectionSchemaChangeV2 = mustAsSpecializedMutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] - - AsImmutableTimeTickMessageV1 = asSpecializedImmutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg] - AsImmutableInsertMessageV1 = asSpecializedImmutableMessage[*InsertMessageHeader, *msgpb.InsertRequest] - AsImmutableDeleteMessageV1 = asSpecializedImmutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest] - AsImmutableCreateCollectionMessageV1 = asSpecializedImmutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest] - AsImmutableDropCollectionMessageV1 = asSpecializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] - AsImmutableCreatePartitionMessageV1 = asSpecializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] - AsImmutableDropPartitionMessageV1 = asSpecializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] - AsImmutableImportMessageV1 = asSpecializedImmutableMessage[*ImportMessageHeader, *msgpb.ImportMsg] - AsImmutableCreateSegmentMessageV2 = asSpecializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] - AsImmutableFlushMessageV2 = asSpecializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody] - AsImmutableManualFlushMessageV2 = asSpecializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] - AsImmutableBeginTxnMessageV2 = asSpecializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] - AsImmutableCommitTxnMessageV2 = asSpecializedImmutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] - AsImmutableRollbackTxnMessageV2 = asSpecializedImmutableMessage[*RollbackTxnMessageHeader, *RollbackTxnMessageBody] - AsImmutableCollectionSchemaChangeV2 = asSpecializedImmutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] - - MustAsImmutableTimeTickMessageV1 = mustAsSpecializedImmutableMessage[*TimeTickMessageHeader, *msgpb.TimeTickMsg] - MustAsImmutableInsertMessageV1 = mustAsSpecializedImmutableMessage[*InsertMessageHeader, *msgpb.InsertRequest] - MustAsImmutableDeleteMessageV1 = mustAsSpecializedImmutableMessage[*DeleteMessageHeader, *msgpb.DeleteRequest] - MustAsImmutableCreateCollectionMessageV1 = mustAsSpecializedImmutableMessage[*CreateCollectionMessageHeader, *msgpb.CreateCollectionRequest] - MustAsImmutableDropCollectionMessageV1 = mustAsSpecializedImmutableMessage[*DropCollectionMessageHeader, *msgpb.DropCollectionRequest] - MustAsImmutableCreatePartitionMessageV1 = mustAsSpecializedImmutableMessage[*CreatePartitionMessageHeader, *msgpb.CreatePartitionRequest] - MustAsImmutableDropPartitionMessageV1 = mustAsSpecializedImmutableMessage[*DropPartitionMessageHeader, *msgpb.DropPartitionRequest] - MustAsImmutableImportMessageV1 = mustAsSpecializedImmutableMessage[*ImportMessageHeader, *msgpb.ImportMsg] - MustAsImmutableCreateSegmentMessageV2 = mustAsSpecializedImmutableMessage[*CreateSegmentMessageHeader, *CreateSegmentMessageBody] - MustAsImmutableFlushMessageV2 = mustAsSpecializedImmutableMessage[*FlushMessageHeader, *FlushMessageBody] - MustAsImmutableManualFlushMessageV2 = mustAsSpecializedImmutableMessage[*ManualFlushMessageHeader, *ManualFlushMessageBody] - MustAsImmutableBeginTxnMessageV2 = mustAsSpecializedImmutableMessage[*BeginTxnMessageHeader, *BeginTxnMessageBody] - MustAsImmutableCommitTxnMessageV2 = mustAsSpecializedImmutableMessage[*CommitTxnMessageHeader, *CommitTxnMessageBody] - MustAsImmutableCollectionSchemaChangeV2 = mustAsSpecializedImmutableMessage[*SchemaChangeMessageHeader, *SchemaChangeMessageBody] - AsImmutableTxnMessage = func(msg ImmutableMessage) ImmutableTxnMessage { - underlying, ok := msg.(*immutableTxnMessageImpl) - if !ok { - return nil - } - return underlying - } -) - // mustAsSpecializedMutableMessage converts a MutableMessage to a specialized MutableMessage. // It will panic if the message is not the target specialized message or failed to decode the specialized header. func mustAsSpecializedMutableMessage[H proto.Message, B proto.Message](msg BasicMessage) specializedMutableMessage[H, B] { @@ -242,8 +57,8 @@ func asSpecializedMutableMessage[H proto.Message, B proto.Message](msg BasicMess underlying := msg.(*messageImpl) var header H - msgType := mustGetMessageTypeFromHeader(header) - if underlying.MessageType() != msgType { + msgType := MustGetMessageTypeWithVersion[H, B]() + if underlying.MessageType() != msgType.MessageType { // The message type do not match the specialized header. return nil, errors.New("message type do not match specialized header") } @@ -270,9 +85,9 @@ func asSpecializedMutableMessage[H proto.Message, B proto.Message](msg BasicMess }, nil } -// mustAsSpecializedMutableMessage converts a ImmutableMutableMessage to a specialized ImmutableMutableMessage. +// MustAsSpecializedImmutableMessage converts a ImmutableMutableMessage to a specialized ImmutableMutableMessage. // It will panic if the message is not the target specialized message or failed to decode the specialized header. -func mustAsSpecializedImmutableMessage[H proto.Message, B proto.Message](msg ImmutableMessage) specializedImmutableMessage[H, B] { +func MustAsSpecializedImmutableMessage[H proto.Message, B proto.Message](msg ImmutableMessage) SpecializedImmutableMessage[H, B] { smsg, err := asSpecializedImmutableMessage[H, B](msg) if err != nil { panic( @@ -291,8 +106,8 @@ func mustAsSpecializedImmutableMessage[H proto.Message, B proto.Message](msg Imm // asSpecializedImmutableMessage converts a ImmutableMessage to a specialized ImmutableMessage. // Return nil, error if the message is the target specialized message but failed to decode the specialized header. // Return asSpecializedImmutableMessage, nil if the message is the target specialized message and successfully decoded the specialized header. -func asSpecializedImmutableMessage[H proto.Message, B proto.Message](msg ImmutableMessage) (specializedImmutableMessage[H, B], error) { - if already, ok := msg.(specializedImmutableMessage[H, B]); ok { +func asSpecializedImmutableMessage[H proto.Message, B proto.Message](msg ImmutableMessage) (SpecializedImmutableMessage[H, B], error) { + if already, ok := msg.(SpecializedImmutableMessage[H, B]); ok { return already, nil } underlying, ok := msg.(*immutableMessageImpl) @@ -302,8 +117,8 @@ func asSpecializedImmutableMessage[H proto.Message, B proto.Message](msg Immutab } var header H - msgType := mustGetMessageTypeFromHeader(header) - if underlying.MessageType() != msgType { + msgType := MustGetMessageTypeWithVersion[H, B]() + if underlying.MessageType() != msgType.MessageType { // The message type do not match the specialized header. return nil, errors.New("message type do not match specialized header") } @@ -329,14 +144,28 @@ func asSpecializedImmutableMessage[H proto.Message, B proto.Message](msg Immutab }, nil } -// mustGetMessageTypeFromMessageHeader returns the message type of the given message header. -func mustGetMessageTypeFromHeader(msg proto.Message) MessageType { - t := reflect.TypeOf(msg) - mt, ok := messageTypeMap[t] - if !ok { - panic(fmt.Sprintf("unsupported message type of proto header: %s", t.Name())) +// asSpecializedBroadcastMessage converts a BasicMessage to a specialized BroadcastMessage. +// Return nil, error if the message is not the target specialized message or failed to decode the specialized header. +// Return specializedBroadcastMessage, nil if the message is the target specialized message and successfully decoded the specialized header. +func asSpecializedBroadcastMessage[H proto.Message, B proto.Message](msg BasicMessage) (SpecializedBroadcastMessage[H, B], error) { + if already, ok := msg.(SpecializedBroadcastMessage[H, B]); ok { + return already, nil } - return mt + sm, err := asSpecializedMutableMessage[H, B](msg) + if err != nil { + return nil, err + } + return sm, nil +} + +// MustAsSpecializedBroadcastMessage converts a BasicMessage to a specialized BroadcastMessage. +// It will panic if the message is not the target specialized message or failed to decode the specialized header. +func MustAsSpecializedBroadcastMessage[H proto.Message, B proto.Message](msg BasicMessage) SpecializedBroadcastMessage[H, B] { + smsg, err := asSpecializedBroadcastMessage[H, B](msg) + if err != nil { + panic(err) + } + return smsg } // specializedMutableMessageImpl is the specialized mutable message implementation. diff --git a/pkg/streaming/util/message/utils.go b/pkg/streaming/util/message/utils.go new file mode 100644 index 0000000000..e219658bd1 --- /dev/null +++ b/pkg/streaming/util/message/utils.go @@ -0,0 +1,53 @@ +package message + +import ( + "reflect" + + "google.golang.org/protobuf/proto" +) + +// AsImmutableTxnMessage converts an ImmutableMessage to ImmutableTxnMessage +var AsImmutableTxnMessage = func(msg ImmutableMessage) ImmutableTxnMessage { + underlying, ok := msg.(*immutableTxnMessageImpl) + if !ok { + return nil + } + return underlying +} + +// NewMessageTypeWithVersion creates a new MessageTypeWithVersion. +func NewMessageTypeWithVersion(t MessageType, v Version) MessageTypeWithVersion { + return MessageTypeWithVersion{MessageType: t, Version: v} +} + +// GetSerializeType returns the specialized message type for the given message type and version. +func GetSerializeType(mv MessageTypeWithVersion) (MessageSpecializedType, bool) { + if mv.Version == VersionOld { + // There's some old messages that is coming from old arch of msgstream. + // We need to convert them to versionV1 to find the specialized type. + mv.Version = VersionV1 + } + typ, ok := messageTypeVersionSpecializedMap[mv] + return typ, ok +} + +// GetMessageTypeWithVersion returns the message type with version for the given message type and version. +func GetMessageTypeWithVersion[H proto.Message, B proto.Message]() (MessageTypeWithVersion, bool) { + var h H + var b B + styp := MessageSpecializedType{ + HeaderType: reflect.TypeOf(h), + BodyType: reflect.TypeOf(b), + } + mv, ok := messageSpecializedTypeVersionMap[styp] + return mv, ok +} + +// MustGetMessageTypeWithVersion returns the message type with version for the given message type and version, panics on error. +func MustGetMessageTypeWithVersion[H proto.Message, B proto.Message]() MessageTypeWithVersion { + mv, ok := GetMessageTypeWithVersion[H, B]() + if !ok { + panic("message type not found") + } + return mv +}