enhance: support more ddl apis to replicate according to cdc (#41678)

- issue: #41677

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2025-05-08 20:02:58 +08:00 committed by GitHub
parent f337d2989b
commit dfd2548c1b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 468 additions and 29 deletions

View File

@ -1338,6 +1338,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC
Condition: NewTaskCondition(ctx), Condition: NewTaskCondition(ctx),
AlterCollectionRequest: request, AlterCollectionRequest: request,
mixCoord: node.mixCoord, mixCoord: node.mixCoord,
replicateMsgStream: node.replicateMsgStream,
} }
log := log.Ctx(ctx).With( log := log.Ctx(ctx).With(
@ -1402,6 +1403,7 @@ func (node *Proxy) AlterCollectionField(ctx context.Context, request *milvuspb.A
Condition: NewTaskCondition(ctx), Condition: NewTaskCondition(ctx),
AlterCollectionFieldRequest: request, AlterCollectionFieldRequest: request,
mixCoord: node.mixCoord, mixCoord: node.mixCoord,
replicateMsgStream: node.replicateMsgStream,
} }
log := log.Ctx(ctx).With( log := log.Ctx(ctx).With(
@ -3895,6 +3897,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia
Condition: NewTaskCondition(ctx), Condition: NewTaskCondition(ctx),
CreateAliasRequest: request, CreateAliasRequest: request,
mixCoord: node.mixCoord, mixCoord: node.mixCoord,
replicateMsgStream: node.replicateMsgStream,
} }
method := "CreateAlias" method := "CreateAlias"
@ -4086,6 +4089,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq
Condition: NewTaskCondition(ctx), Condition: NewTaskCondition(ctx),
DropAliasRequest: request, DropAliasRequest: request,
mixCoord: node.mixCoord, mixCoord: node.mixCoord,
replicateMsgStream: node.replicateMsgStream,
} }
method := "DropAlias" method := "DropAlias"
@ -4149,6 +4153,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR
Condition: NewTaskCondition(ctx), Condition: NewTaskCondition(ctx),
AlterAliasRequest: request, AlterAliasRequest: request,
mixCoord: node.mixCoord, mixCoord: node.mixCoord,
replicateMsgStream: node.replicateMsgStream,
} }
method := "AlterAlias" method := "AlterAlias"
@ -5975,6 +5980,10 @@ func (node *Proxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCol
return merr.Status(err), err return merr.Status(err), err
} }
if merr.Ok(resp) {
SendReplicateMessagePack(ctx, node.replicateMsgStream, req)
}
return resp, nil return resp, nil
} }

View File

@ -994,6 +994,7 @@ type alterCollectionTask struct {
ctx context.Context ctx context.Context
mixCoord types.MixCoordClient mixCoord types.MixCoordClient
result *commonpb.Status result *commonpb.Status
replicateMsgStream msgstream.MsgStream
} }
func (t *alterCollectionTask) TraceCtx() context.Context { func (t *alterCollectionTask) TraceCtx() context.Context {
@ -1206,7 +1207,11 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
func (t *alterCollectionTask) Execute(ctx context.Context) error { func (t *alterCollectionTask) Execute(ctx context.Context) error {
var err error var err error
t.result, err = t.mixCoord.AlterCollection(ctx, t.AlterCollectionRequest) t.result, err = t.mixCoord.AlterCollection(ctx, t.AlterCollectionRequest)
return merr.CheckRPCCall(t.result, err) if err = merr.CheckRPCCall(t.result, err); err != nil {
return err
}
SendReplicateMessagePack(ctx, t.replicateMsgStream, t.AlterCollectionRequest)
return nil
} }
func (t *alterCollectionTask) PostExecute(ctx context.Context) error { func (t *alterCollectionTask) PostExecute(ctx context.Context) error {
@ -1220,6 +1225,7 @@ type alterCollectionFieldTask struct {
ctx context.Context ctx context.Context
mixCoord types.MixCoordClient mixCoord types.MixCoordClient
result *commonpb.Status result *commonpb.Status
replicateMsgStream msgstream.MsgStream
} }
func (t *alterCollectionFieldTask) TraceCtx() context.Context { func (t *alterCollectionFieldTask) TraceCtx() context.Context {
@ -1382,7 +1388,11 @@ func (t *alterCollectionFieldTask) PreExecute(ctx context.Context) error {
func (t *alterCollectionFieldTask) Execute(ctx context.Context) error { func (t *alterCollectionFieldTask) Execute(ctx context.Context) error {
var err error var err error
t.result, err = t.mixCoord.AlterCollectionField(ctx, t.AlterCollectionFieldRequest) t.result, err = t.mixCoord.AlterCollectionField(ctx, t.AlterCollectionFieldRequest)
return merr.CheckRPCCall(t.result, err) if err = merr.CheckRPCCall(t.result, err); err != nil {
return err
}
SendReplicateMessagePack(ctx, t.replicateMsgStream, t.AlterCollectionFieldRequest)
return nil
} }
func (t *alterCollectionFieldTask) PostExecute(ctx context.Context) error { func (t *alterCollectionFieldTask) PostExecute(ctx context.Context) error {

View File

@ -22,6 +22,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -34,6 +35,7 @@ type CreateAliasTask struct {
*milvuspb.CreateAliasRequest *milvuspb.CreateAliasRequest
ctx context.Context ctx context.Context
mixCoord types.MixCoordClient mixCoord types.MixCoordClient
replicateMsgStream msgstream.MsgStream
result *commonpb.Status result *commonpb.Status
} }
@ -106,7 +108,11 @@ func (t *CreateAliasTask) PreExecute(ctx context.Context) error {
func (t *CreateAliasTask) Execute(ctx context.Context) error { func (t *CreateAliasTask) Execute(ctx context.Context) error {
var err error var err error
t.result, err = t.mixCoord.CreateAlias(ctx, t.CreateAliasRequest) t.result, err = t.mixCoord.CreateAlias(ctx, t.CreateAliasRequest)
return merr.CheckRPCCall(t.result, err) if err = merr.CheckRPCCall(t.result, err); err != nil {
return err
}
SendReplicateMessagePack(ctx, t.replicateMsgStream, t.CreateAliasRequest)
return nil
} }
// PostExecute defines the post execution, do nothing for create alias // PostExecute defines the post execution, do nothing for create alias
@ -121,6 +127,7 @@ type DropAliasTask struct {
*milvuspb.DropAliasRequest *milvuspb.DropAliasRequest
ctx context.Context ctx context.Context
mixCoord types.MixCoordClient mixCoord types.MixCoordClient
replicateMsgStream msgstream.MsgStream
result *commonpb.Status result *commonpb.Status
} }
@ -180,7 +187,11 @@ func (t *DropAliasTask) PreExecute(ctx context.Context) error {
func (t *DropAliasTask) Execute(ctx context.Context) error { func (t *DropAliasTask) Execute(ctx context.Context) error {
var err error var err error
t.result, err = t.mixCoord.DropAlias(ctx, t.DropAliasRequest) t.result, err = t.mixCoord.DropAlias(ctx, t.DropAliasRequest)
return merr.CheckRPCCall(t.result, err) if err = merr.CheckRPCCall(t.result, err); err != nil {
return err
}
SendReplicateMessagePack(ctx, t.replicateMsgStream, t.DropAliasRequest)
return nil
} }
func (t *DropAliasTask) PostExecute(ctx context.Context) error { func (t *DropAliasTask) PostExecute(ctx context.Context) error {
@ -194,6 +205,7 @@ type AlterAliasTask struct {
*milvuspb.AlterAliasRequest *milvuspb.AlterAliasRequest
ctx context.Context ctx context.Context
mixCoord types.MixCoordClient mixCoord types.MixCoordClient
replicateMsgStream msgstream.MsgStream
result *commonpb.Status result *commonpb.Status
} }
@ -256,7 +268,11 @@ func (t *AlterAliasTask) PreExecute(ctx context.Context) error {
func (t *AlterAliasTask) Execute(ctx context.Context) error { func (t *AlterAliasTask) Execute(ctx context.Context) error {
var err error var err error
t.result, err = t.mixCoord.AlterAlias(ctx, t.AlterAliasRequest) t.result, err = t.mixCoord.AlterAlias(ctx, t.AlterAliasRequest)
return merr.CheckRPCCall(t.result, err) if err = merr.CheckRPCCall(t.result, err); err != nil {
return err
}
SendReplicateMessagePack(ctx, t.replicateMsgStream, t.AlterAliasRequest)
return nil
} }
func (t *AlterAliasTask) PostExecute(ctx context.Context) error { func (t *AlterAliasTask) PostExecute(ctx context.Context) error {

View File

@ -2135,6 +2135,21 @@ func SendReplicateMessagePack(ctx context.Context, replicateMsgStream msgstream.
var tsMsg msgstream.TsMsg var tsMsg msgstream.TsMsg
switch r := request.(type) { switch r := request.(type) {
case *milvuspb.AlterCollectionRequest:
tsMsg = &msgstream.AlterCollectionMsg{
BaseMsg: getBaseMsg(ctx, ts),
AlterCollectionRequest: r,
}
case *milvuspb.AlterCollectionFieldRequest:
tsMsg = &msgstream.AlterCollectionFieldMsg{
BaseMsg: getBaseMsg(ctx, ts),
AlterCollectionFieldRequest: r,
}
case *milvuspb.RenameCollectionRequest:
tsMsg = &msgstream.RenameCollectionMsg{
BaseMsg: getBaseMsg(ctx, ts),
RenameCollectionRequest: r,
}
case *milvuspb.CreateDatabaseRequest: case *milvuspb.CreateDatabaseRequest:
tsMsg = &msgstream.CreateDatabaseMsg{ tsMsg = &msgstream.CreateDatabaseMsg{
BaseMsg: getBaseMsg(ctx, ts), BaseMsg: getBaseMsg(ctx, ts),
@ -2230,6 +2245,21 @@ func SendReplicateMessagePack(ctx context.Context, replicateMsgStream msgstream.
BaseMsg: getBaseMsg(ctx, ts), BaseMsg: getBaseMsg(ctx, ts),
OperatePrivilegeV2Request: r, OperatePrivilegeV2Request: r,
} }
case *milvuspb.CreateAliasRequest:
tsMsg = &msgstream.CreateAliasMsg{
BaseMsg: getBaseMsg(ctx, ts),
CreateAliasRequest: r,
}
case *milvuspb.DropAliasRequest:
tsMsg = &msgstream.DropAliasMsg{
BaseMsg: getBaseMsg(ctx, ts),
DropAliasRequest: r,
}
case *milvuspb.AlterAliasRequest:
tsMsg = &msgstream.AlterAliasMsg{
BaseMsg: getBaseMsg(ctx, ts),
AlterAliasRequest: r,
}
default: default:
log.Warn("unknown request", zap.Any("request", request)) log.Warn("unknown request", zap.Any("request", request))
return return

View File

@ -2447,6 +2447,9 @@ func TestSendReplicateMessagePack(t *testing.T) {
t.Run("normal case", func(t *testing.T) { t.Run("normal case", func(t *testing.T) {
mockStream.EXPECT().Produce(mock.Anything, mock.Anything).Return(nil) mockStream.EXPECT().Produce(mock.Anything, mock.Anything).Return(nil)
SendReplicateMessagePack(ctx, mockStream, &milvuspb.AlterCollectionRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.AlterCollectionFieldRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.RenameCollectionRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.CreateDatabaseRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.CreateDatabaseRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropDatabaseRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropDatabaseRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.FlushRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.FlushRequest{})
@ -2456,6 +2459,15 @@ func TestSendReplicateMessagePack(t *testing.T) {
SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropIndexRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropIndexRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.LoadPartitionsRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.LoadPartitionsRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.ReleasePartitionsRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.ReleasePartitionsRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.CreateCredentialRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.DeleteCredentialRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.CreateRoleRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropRoleRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.OperateUserRoleRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.OperatePrivilegeRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.CreateAliasRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropAliasRequest{})
SendReplicateMessagePack(ctx, mockStream, &milvuspb.AlterAliasRequest{})
}) })
} }

View File

@ -249,9 +249,10 @@ func (d *Dispatcher) work() {
for vchannel, p := range targetPacks { for vchannel, p := range targetPacks {
var err error var err error
t, _ := d.targets.Get(vchannel) t, _ := d.targets.Get(vchannel)
isReplicateChannel := strings.Contains(vchannel, paramtable.Get().CommonCfg.ReplicateMsgChannel.GetValue())
// The dispatcher seeks from the oldest target, // The dispatcher seeks from the oldest target,
// so for each target, msg before the target position must be filtered out. // so for each target, msg before the target position must be filtered out.
if p.EndTs <= t.pos.GetTimestamp() { if p.EndTs <= t.pos.GetTimestamp() && !isReplicateChannel {
log.Info("skip msg", log.Info("skip msg",
zap.String("vchannel", vchannel), zap.String("vchannel", vchannel),
zap.Int("msgCount", len(p.Msgs)), zap.Int("msgCount", len(p.Msgs)),

View File

@ -0,0 +1,184 @@
/*
* Licensed to the LF AI & Data foundation under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package msgstream
import (
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
)
type CreateAliasMsg struct {
BaseMsg
*milvuspb.CreateAliasRequest
}
var _ TsMsg = &CreateAliasMsg{}
func (c *CreateAliasMsg) ID() UniqueID {
return c.Base.MsgID
}
func (c *CreateAliasMsg) SetID(id UniqueID) {
c.Base.MsgID = id
}
func (c *CreateAliasMsg) Type() MsgType {
return c.Base.MsgType
}
func (c *CreateAliasMsg) SourceID() int64 {
return c.Base.SourceID
}
func (c *CreateAliasMsg) Marshal(input TsMsg) (MarshalType, error) {
createAliasMsg := input.(*CreateAliasMsg)
createAliasRequest := createAliasMsg.CreateAliasRequest
mb, err := proto.Marshal(createAliasRequest)
if err != nil {
return nil, err
}
return mb, nil
}
func (c *CreateAliasMsg) Unmarshal(input MarshalType) (TsMsg, error) {
createAliasRequest := &milvuspb.CreateAliasRequest{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, createAliasRequest)
if err != nil {
return nil, err
}
createAliasMsg := &CreateAliasMsg{CreateAliasRequest: createAliasRequest}
createAliasMsg.BeginTimestamp = createAliasRequest.GetBase().GetTimestamp()
createAliasMsg.EndTimestamp = createAliasRequest.GetBase().GetTimestamp()
return createAliasMsg, nil
}
func (c *CreateAliasMsg) Size() int {
return proto.Size(c.CreateAliasRequest)
}
type DropAliasMsg struct {
BaseMsg
*milvuspb.DropAliasRequest
}
var _ TsMsg = &DropAliasMsg{}
func (d *DropAliasMsg) ID() UniqueID {
return d.Base.MsgID
}
func (d *DropAliasMsg) SetID(id UniqueID) {
d.Base.MsgID = id
}
func (d *DropAliasMsg) Type() MsgType {
return d.Base.MsgType
}
func (d *DropAliasMsg) SourceID() int64 {
return d.Base.SourceID
}
func (d *DropAliasMsg) Marshal(input TsMsg) (MarshalType, error) {
dropAliasMsg := input.(*DropAliasMsg)
dropAliasRequest := dropAliasMsg.DropAliasRequest
mb, err := proto.Marshal(dropAliasRequest)
if err != nil {
return nil, err
}
return mb, nil
}
func (d *DropAliasMsg) Unmarshal(input MarshalType) (TsMsg, error) {
dropAliasRequest := &milvuspb.DropAliasRequest{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, dropAliasRequest)
if err != nil {
return nil, err
}
dropAliasMsg := &DropAliasMsg{DropAliasRequest: dropAliasRequest}
dropAliasMsg.BeginTimestamp = dropAliasRequest.GetBase().GetTimestamp()
dropAliasMsg.EndTimestamp = dropAliasRequest.GetBase().GetTimestamp()
return dropAliasMsg, nil
}
func (d *DropAliasMsg) Size() int {
return proto.Size(d.DropAliasRequest)
}
type AlterAliasMsg struct {
BaseMsg
*milvuspb.AlterAliasRequest
}
var _ TsMsg = &AlterAliasMsg{}
func (a *AlterAliasMsg) ID() UniqueID {
return a.Base.MsgID
}
func (a *AlterAliasMsg) SetID(id UniqueID) {
a.Base.MsgID = id
}
func (a *AlterAliasMsg) Type() MsgType {
return a.Base.MsgType
}
func (a *AlterAliasMsg) SourceID() int64 {
return a.Base.SourceID
}
func (a *AlterAliasMsg) Marshal(input TsMsg) (MarshalType, error) {
alterAliasMsg := input.(*AlterAliasMsg)
alterAliasRequest := alterAliasMsg.AlterAliasRequest
mb, err := proto.Marshal(alterAliasRequest)
if err != nil {
return nil, err
}
return mb, nil
}
func (a *AlterAliasMsg) Unmarshal(input MarshalType) (TsMsg, error) {
alterAliasRequest := &milvuspb.AlterAliasRequest{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, alterAliasRequest)
if err != nil {
return nil, err
}
alterAliasMsg := &AlterAliasMsg{AlterAliasRequest: alterAliasRequest}
alterAliasMsg.BeginTimestamp = alterAliasRequest.GetBase().GetTimestamp()
alterAliasMsg.EndTimestamp = alterAliasRequest.GetBase().GetTimestamp()
return alterAliasMsg, nil
}
func (a *AlterAliasMsg) Size() int {
return proto.Size(a.AlterAliasRequest)
}

View File

@ -188,3 +188,166 @@ func (f *FlushMsg) Unmarshal(input MarshalType) (TsMsg, error) {
func (f *FlushMsg) Size() int { func (f *FlushMsg) Size() int {
return proto.Size(f.FlushRequest) return proto.Size(f.FlushRequest)
} }
type AlterCollectionMsg struct {
BaseMsg
*milvuspb.AlterCollectionRequest
}
var _ TsMsg = &AlterCollectionMsg{}
func (a *AlterCollectionMsg) ID() UniqueID {
return a.Base.MsgID
}
func (a *AlterCollectionMsg) SetID(id UniqueID) {
a.Base.MsgID = id
}
func (a *AlterCollectionMsg) Type() MsgType {
return a.Base.MsgType
}
func (a *AlterCollectionMsg) SourceID() int64 {
return a.Base.SourceID
}
func (a *AlterCollectionMsg) Marshal(input TsMsg) (MarshalType, error) {
alterCollectionMsg := input.(*AlterCollectionMsg)
alterCollectionRequest := alterCollectionMsg.AlterCollectionRequest
mb, err := proto.Marshal(alterCollectionRequest)
if err != nil {
return nil, err
}
return mb, nil
}
func (a *AlterCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) {
alterCollectionRequest := &milvuspb.AlterCollectionRequest{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, alterCollectionRequest)
if err != nil {
return nil, err
}
alterCollectionMsg := &AlterCollectionMsg{AlterCollectionRequest: alterCollectionRequest}
alterCollectionMsg.BeginTimestamp = alterCollectionMsg.GetBase().GetTimestamp()
alterCollectionMsg.EndTimestamp = alterCollectionMsg.GetBase().GetTimestamp()
return alterCollectionMsg, nil
}
func (a *AlterCollectionMsg) Size() int {
return proto.Size(a.AlterCollectionRequest)
}
type AlterCollectionFieldMsg struct {
BaseMsg
*milvuspb.AlterCollectionFieldRequest
}
var _ TsMsg = &AlterCollectionFieldMsg{}
func (a *AlterCollectionFieldMsg) ID() UniqueID {
return a.Base.MsgID
}
func (a *AlterCollectionFieldMsg) SetID(id UniqueID) {
a.Base.MsgID = id
}
func (a *AlterCollectionFieldMsg) Type() MsgType {
return a.Base.MsgType
}
func (a *AlterCollectionFieldMsg) SourceID() int64 {
return a.Base.SourceID
}
func (a *AlterCollectionFieldMsg) Marshal(input TsMsg) (MarshalType, error) {
alterCollectionFieldMsg := input.(*AlterCollectionFieldMsg)
alterCollectionFieldRequest := alterCollectionFieldMsg.AlterCollectionFieldRequest
mb, err := proto.Marshal(alterCollectionFieldRequest)
if err != nil {
return nil, err
}
return mb, nil
}
func (a *AlterCollectionFieldMsg) Unmarshal(input MarshalType) (TsMsg, error) {
alterCollectionFieldRequest := &milvuspb.AlterCollectionFieldRequest{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, alterCollectionFieldRequest)
if err != nil {
return nil, err
}
alterCollectionFieldMsg := &AlterCollectionFieldMsg{AlterCollectionFieldRequest: alterCollectionFieldRequest}
alterCollectionFieldMsg.BeginTimestamp = alterCollectionFieldMsg.GetBase().GetTimestamp()
alterCollectionFieldMsg.EndTimestamp = alterCollectionFieldMsg.GetBase().GetTimestamp()
return alterCollectionFieldMsg, nil
}
func (a *AlterCollectionFieldMsg) Size() int {
return proto.Size(a.AlterCollectionFieldRequest)
}
// TODO fubang maybe it will break the cdc replication
type RenameCollectionMsg struct {
BaseMsg
*milvuspb.RenameCollectionRequest
}
var _ TsMsg = &RenameCollectionMsg{}
func (r *RenameCollectionMsg) ID() UniqueID {
return r.Base.MsgID
}
func (r *RenameCollectionMsg) SetID(id UniqueID) {
r.Base.MsgID = id
}
func (r *RenameCollectionMsg) Type() MsgType {
return r.Base.MsgType
}
func (r *RenameCollectionMsg) SourceID() int64 {
return r.Base.SourceID
}
func (r *RenameCollectionMsg) Marshal(input TsMsg) (MarshalType, error) {
renameCollectionMsg := input.(*RenameCollectionMsg)
renameCollectionRequest := renameCollectionMsg.RenameCollectionRequest
mb, err := proto.Marshal(renameCollectionRequest)
if err != nil {
return nil, err
}
return mb, nil
}
func (r *RenameCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) {
renameCollectionRequest := &milvuspb.RenameCollectionRequest{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, renameCollectionRequest)
if err != nil {
return nil, err
}
renameCollectionMsg := &RenameCollectionMsg{RenameCollectionRequest: renameCollectionRequest}
renameCollectionMsg.BeginTimestamp = renameCollectionMsg.GetBase().GetTimestamp()
renameCollectionMsg.EndTimestamp = renameCollectionMsg.GetBase().GetTimestamp()
return renameCollectionMsg, nil
}
func (r *RenameCollectionMsg) Size() int {
return proto.Size(r.RenameCollectionRequest)
}

View File

@ -59,6 +59,10 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
timeTickMsg := TimeTickMsg{} timeTickMsg := TimeTickMsg{}
createCollectionMsg := CreateCollectionMsg{} createCollectionMsg := CreateCollectionMsg{}
dropCollectionMsg := DropCollectionMsg{} dropCollectionMsg := DropCollectionMsg{}
alterCollectionMsg := AlterCollectionMsg{}
alterCollectionFieldMsg := AlterCollectionFieldMsg{}
renameCollectionMsg := RenameCollectionMsg{}
createPartitionMsg := CreatePartitionMsg{} createPartitionMsg := CreatePartitionMsg{}
dropPartitionMsg := DropPartitionMsg{} dropPartitionMsg := DropPartitionMsg{}
dataNodeTtMsg := DataNodeTtMsg{} dataNodeTtMsg := DataNodeTtMsg{}
@ -88,6 +92,10 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
replicateMsg := ReplicateMsg{} replicateMsg := ReplicateMsg{}
importMsg := ImportMsg{} importMsg := ImportMsg{}
createAliasMsg := CreateAliasMsg{}
dropAliasMsg := DropAliasMsg{}
alterAliasMsg := AlterAliasMsg{}
p := &ProtoUnmarshalDispatcher{} p := &ProtoUnmarshalDispatcher{}
p.TempMap = make(map[commonpb.MsgType]UnmarshalFunc) p.TempMap = make(map[commonpb.MsgType]UnmarshalFunc)
p.TempMap[commonpb.MsgType_Insert] = insertMsg.Unmarshal p.TempMap[commonpb.MsgType_Insert] = insertMsg.Unmarshal
@ -96,6 +104,9 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
p.TempMap[commonpb.MsgType_CreateCollection] = createCollectionMsg.Unmarshal p.TempMap[commonpb.MsgType_CreateCollection] = createCollectionMsg.Unmarshal
p.TempMap[commonpb.MsgType_DropCollection] = dropCollectionMsg.Unmarshal p.TempMap[commonpb.MsgType_DropCollection] = dropCollectionMsg.Unmarshal
p.TempMap[commonpb.MsgType_CreatePartition] = createPartitionMsg.Unmarshal p.TempMap[commonpb.MsgType_CreatePartition] = createPartitionMsg.Unmarshal
p.TempMap[commonpb.MsgType_AlterCollection] = alterCollectionMsg.Unmarshal
p.TempMap[commonpb.MsgType_AlterCollectionField] = alterCollectionFieldMsg.Unmarshal
p.TempMap[commonpb.MsgType_RenameCollection] = renameCollectionMsg.Unmarshal
p.TempMap[commonpb.MsgType_DropPartition] = dropPartitionMsg.Unmarshal p.TempMap[commonpb.MsgType_DropPartition] = dropPartitionMsg.Unmarshal
p.TempMap[commonpb.MsgType_DataNodeTt] = dataNodeTtMsg.Unmarshal p.TempMap[commonpb.MsgType_DataNodeTt] = dataNodeTtMsg.Unmarshal
p.TempMap[commonpb.MsgType_CreateIndex] = createIndexMsg.Unmarshal p.TempMap[commonpb.MsgType_CreateIndex] = createIndexMsg.Unmarshal
@ -119,6 +130,9 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
p.TempMap[commonpb.MsgType_OperatePrivilegeV2] = operatePrivilegeV2Msg.Unmarshal p.TempMap[commonpb.MsgType_OperatePrivilegeV2] = operatePrivilegeV2Msg.Unmarshal
p.TempMap[commonpb.MsgType_Replicate] = replicateMsg.Unmarshal p.TempMap[commonpb.MsgType_Replicate] = replicateMsg.Unmarshal
p.TempMap[commonpb.MsgType_Import] = importMsg.Unmarshal p.TempMap[commonpb.MsgType_Import] = importMsg.Unmarshal
p.TempMap[commonpb.MsgType_CreateAlias] = createAliasMsg.Unmarshal
p.TempMap[commonpb.MsgType_DropAlias] = dropAliasMsg.Unmarshal
p.TempMap[commonpb.MsgType_AlterAlias] = alterAliasMsg.Unmarshal
return p return p
} }