diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 5ce6566883..eb6b599326 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1338,6 +1338,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC Condition: NewTaskCondition(ctx), AlterCollectionRequest: request, mixCoord: node.mixCoord, + replicateMsgStream: node.replicateMsgStream, } log := log.Ctx(ctx).With( @@ -1402,6 +1403,7 @@ func (node *Proxy) AlterCollectionField(ctx context.Context, request *milvuspb.A Condition: NewTaskCondition(ctx), AlterCollectionFieldRequest: request, mixCoord: node.mixCoord, + replicateMsgStream: node.replicateMsgStream, } log := log.Ctx(ctx).With( @@ -3895,6 +3897,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia Condition: NewTaskCondition(ctx), CreateAliasRequest: request, mixCoord: node.mixCoord, + replicateMsgStream: node.replicateMsgStream, } method := "CreateAlias" @@ -4082,10 +4085,11 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq defer sp.End() dat := &DropAliasTask{ - ctx: ctx, - Condition: NewTaskCondition(ctx), - DropAliasRequest: request, - mixCoord: node.mixCoord, + ctx: ctx, + Condition: NewTaskCondition(ctx), + DropAliasRequest: request, + mixCoord: node.mixCoord, + replicateMsgStream: node.replicateMsgStream, } method := "DropAlias" @@ -4145,10 +4149,11 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR defer sp.End() aat := &AlterAliasTask{ - ctx: ctx, - Condition: NewTaskCondition(ctx), - AlterAliasRequest: request, - mixCoord: node.mixCoord, + ctx: ctx, + Condition: NewTaskCondition(ctx), + AlterAliasRequest: request, + mixCoord: node.mixCoord, + replicateMsgStream: node.replicateMsgStream, } method := "AlterAlias" @@ -5975,6 +5980,10 @@ func (node *Proxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCol return merr.Status(err), err } + if merr.Ok(resp) { + SendReplicateMessagePack(ctx, node.replicateMsgStream, req) + } + return resp, nil } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index bd7ff2a7ac..5dcbe6d3ec 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -991,9 +991,10 @@ type alterCollectionTask struct { baseTask Condition *milvuspb.AlterCollectionRequest - ctx context.Context - mixCoord types.MixCoordClient - result *commonpb.Status + ctx context.Context + mixCoord types.MixCoordClient + result *commonpb.Status + replicateMsgStream msgstream.MsgStream } func (t *alterCollectionTask) TraceCtx() context.Context { @@ -1206,7 +1207,11 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error { func (t *alterCollectionTask) Execute(ctx context.Context) error { var err error t.result, err = t.mixCoord.AlterCollection(ctx, t.AlterCollectionRequest) - return merr.CheckRPCCall(t.result, err) + if err = merr.CheckRPCCall(t.result, err); err != nil { + return err + } + SendReplicateMessagePack(ctx, t.replicateMsgStream, t.AlterCollectionRequest) + return nil } func (t *alterCollectionTask) PostExecute(ctx context.Context) error { @@ -1217,9 +1222,10 @@ type alterCollectionFieldTask struct { baseTask Condition *milvuspb.AlterCollectionFieldRequest - ctx context.Context - mixCoord types.MixCoordClient - result *commonpb.Status + ctx context.Context + mixCoord types.MixCoordClient + result *commonpb.Status + replicateMsgStream msgstream.MsgStream } func (t *alterCollectionFieldTask) TraceCtx() context.Context { @@ -1382,7 +1388,11 @@ func (t *alterCollectionFieldTask) PreExecute(ctx context.Context) error { func (t *alterCollectionFieldTask) Execute(ctx context.Context) error { var err error t.result, err = t.mixCoord.AlterCollectionField(ctx, t.AlterCollectionFieldRequest) - return merr.CheckRPCCall(t.result, err) + if err = merr.CheckRPCCall(t.result, err); err != nil { + return err + } + SendReplicateMessagePack(ctx, t.replicateMsgStream, t.AlterCollectionFieldRequest) + return nil } func (t *alterCollectionFieldTask) PostExecute(ctx context.Context) error { diff --git a/internal/proxy/task_alias.go b/internal/proxy/task_alias.go index cf164e7754..3b33507f92 100644 --- a/internal/proxy/task_alias.go +++ b/internal/proxy/task_alias.go @@ -22,6 +22,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" "github.com/milvus-io/milvus/pkg/v2/util/commonpbutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -32,9 +33,10 @@ type CreateAliasTask struct { baseTask Condition *milvuspb.CreateAliasRequest - ctx context.Context - mixCoord types.MixCoordClient - result *commonpb.Status + ctx context.Context + mixCoord types.MixCoordClient + replicateMsgStream msgstream.MsgStream + result *commonpb.Status } // TraceCtx returns the trace context of the task. @@ -106,7 +108,11 @@ func (t *CreateAliasTask) PreExecute(ctx context.Context) error { func (t *CreateAliasTask) Execute(ctx context.Context) error { var err error t.result, err = t.mixCoord.CreateAlias(ctx, t.CreateAliasRequest) - return merr.CheckRPCCall(t.result, err) + if err = merr.CheckRPCCall(t.result, err); err != nil { + return err + } + SendReplicateMessagePack(ctx, t.replicateMsgStream, t.CreateAliasRequest) + return nil } // PostExecute defines the post execution, do nothing for create alias @@ -119,9 +125,10 @@ type DropAliasTask struct { baseTask Condition *milvuspb.DropAliasRequest - ctx context.Context - mixCoord types.MixCoordClient - result *commonpb.Status + ctx context.Context + mixCoord types.MixCoordClient + replicateMsgStream msgstream.MsgStream + result *commonpb.Status } // TraceCtx returns the context for trace @@ -180,7 +187,11 @@ func (t *DropAliasTask) PreExecute(ctx context.Context) error { func (t *DropAliasTask) Execute(ctx context.Context) error { var err error t.result, err = t.mixCoord.DropAlias(ctx, t.DropAliasRequest) - return merr.CheckRPCCall(t.result, err) + if err = merr.CheckRPCCall(t.result, err); err != nil { + return err + } + SendReplicateMessagePack(ctx, t.replicateMsgStream, t.DropAliasRequest) + return nil } func (t *DropAliasTask) PostExecute(ctx context.Context) error { @@ -192,9 +203,10 @@ type AlterAliasTask struct { baseTask Condition *milvuspb.AlterAliasRequest - ctx context.Context - mixCoord types.MixCoordClient - result *commonpb.Status + ctx context.Context + mixCoord types.MixCoordClient + replicateMsgStream msgstream.MsgStream + result *commonpb.Status } func (t *AlterAliasTask) TraceCtx() context.Context { @@ -256,7 +268,11 @@ func (t *AlterAliasTask) PreExecute(ctx context.Context) error { func (t *AlterAliasTask) Execute(ctx context.Context) error { var err error t.result, err = t.mixCoord.AlterAlias(ctx, t.AlterAliasRequest) - return merr.CheckRPCCall(t.result, err) + if err = merr.CheckRPCCall(t.result, err); err != nil { + return err + } + SendReplicateMessagePack(ctx, t.replicateMsgStream, t.AlterAliasRequest) + return nil } func (t *AlterAliasTask) PostExecute(ctx context.Context) error { diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 4355a06981..0dc5b89bc2 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -2135,6 +2135,21 @@ func SendReplicateMessagePack(ctx context.Context, replicateMsgStream msgstream. var tsMsg msgstream.TsMsg switch r := request.(type) { + case *milvuspb.AlterCollectionRequest: + tsMsg = &msgstream.AlterCollectionMsg{ + BaseMsg: getBaseMsg(ctx, ts), + AlterCollectionRequest: r, + } + case *milvuspb.AlterCollectionFieldRequest: + tsMsg = &msgstream.AlterCollectionFieldMsg{ + BaseMsg: getBaseMsg(ctx, ts), + AlterCollectionFieldRequest: r, + } + case *milvuspb.RenameCollectionRequest: + tsMsg = &msgstream.RenameCollectionMsg{ + BaseMsg: getBaseMsg(ctx, ts), + RenameCollectionRequest: r, + } case *milvuspb.CreateDatabaseRequest: tsMsg = &msgstream.CreateDatabaseMsg{ BaseMsg: getBaseMsg(ctx, ts), @@ -2230,6 +2245,21 @@ func SendReplicateMessagePack(ctx context.Context, replicateMsgStream msgstream. BaseMsg: getBaseMsg(ctx, ts), OperatePrivilegeV2Request: r, } + case *milvuspb.CreateAliasRequest: + tsMsg = &msgstream.CreateAliasMsg{ + BaseMsg: getBaseMsg(ctx, ts), + CreateAliasRequest: r, + } + case *milvuspb.DropAliasRequest: + tsMsg = &msgstream.DropAliasMsg{ + BaseMsg: getBaseMsg(ctx, ts), + DropAliasRequest: r, + } + case *milvuspb.AlterAliasRequest: + tsMsg = &msgstream.AlterAliasMsg{ + BaseMsg: getBaseMsg(ctx, ts), + AlterAliasRequest: r, + } default: log.Warn("unknown request", zap.Any("request", request)) return diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index 4b46271bf5..0275dca113 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -2447,6 +2447,9 @@ func TestSendReplicateMessagePack(t *testing.T) { t.Run("normal case", func(t *testing.T) { mockStream.EXPECT().Produce(mock.Anything, mock.Anything).Return(nil) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.AlterCollectionRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.AlterCollectionFieldRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.RenameCollectionRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.CreateDatabaseRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropDatabaseRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.FlushRequest{}) @@ -2456,6 +2459,15 @@ func TestSendReplicateMessagePack(t *testing.T) { SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropIndexRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.LoadPartitionsRequest{}) SendReplicateMessagePack(ctx, mockStream, &milvuspb.ReleasePartitionsRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.CreateCredentialRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.DeleteCredentialRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.CreateRoleRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropRoleRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.OperateUserRoleRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.OperatePrivilegeRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.CreateAliasRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropAliasRequest{}) + SendReplicateMessagePack(ctx, mockStream, &milvuspb.AlterAliasRequest{}) }) } diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go index b83473f92f..e3e2bfb32c 100644 --- a/pkg/mq/msgdispatcher/dispatcher.go +++ b/pkg/mq/msgdispatcher/dispatcher.go @@ -249,9 +249,10 @@ func (d *Dispatcher) work() { for vchannel, p := range targetPacks { var err error t, _ := d.targets.Get(vchannel) + isReplicateChannel := strings.Contains(vchannel, paramtable.Get().CommonCfg.ReplicateMsgChannel.GetValue()) // The dispatcher seeks from the oldest target, // so for each target, msg before the target position must be filtered out. - if p.EndTs <= t.pos.GetTimestamp() { + if p.EndTs <= t.pos.GetTimestamp() && !isReplicateChannel { log.Info("skip msg", zap.String("vchannel", vchannel), zap.Int("msgCount", len(p.Msgs)), diff --git a/pkg/mq/msgstream/msg_for_alias.go b/pkg/mq/msgstream/msg_for_alias.go new file mode 100644 index 0000000000..3587aeba8a --- /dev/null +++ b/pkg/mq/msgstream/msg_for_alias.go @@ -0,0 +1,184 @@ +/* + * Licensed to the LF AI & Data foundation under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package msgstream + +import ( + "google.golang.org/protobuf/proto" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" +) + +type CreateAliasMsg struct { + BaseMsg + *milvuspb.CreateAliasRequest +} + +var _ TsMsg = &CreateAliasMsg{} + +func (c *CreateAliasMsg) ID() UniqueID { + return c.Base.MsgID +} + +func (c *CreateAliasMsg) SetID(id UniqueID) { + c.Base.MsgID = id +} + +func (c *CreateAliasMsg) Type() MsgType { + return c.Base.MsgType +} + +func (c *CreateAliasMsg) SourceID() int64 { + return c.Base.SourceID +} + +func (c *CreateAliasMsg) Marshal(input TsMsg) (MarshalType, error) { + createAliasMsg := input.(*CreateAliasMsg) + createAliasRequest := createAliasMsg.CreateAliasRequest + mb, err := proto.Marshal(createAliasRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (c *CreateAliasMsg) Unmarshal(input MarshalType) (TsMsg, error) { + createAliasRequest := &milvuspb.CreateAliasRequest{} + in, err := convertToByteArray(input) + if err != nil { + return nil, err + } + err = proto.Unmarshal(in, createAliasRequest) + if err != nil { + return nil, err + } + createAliasMsg := &CreateAliasMsg{CreateAliasRequest: createAliasRequest} + createAliasMsg.BeginTimestamp = createAliasRequest.GetBase().GetTimestamp() + createAliasMsg.EndTimestamp = createAliasRequest.GetBase().GetTimestamp() + return createAliasMsg, nil +} + +func (c *CreateAliasMsg) Size() int { + return proto.Size(c.CreateAliasRequest) +} + +type DropAliasMsg struct { + BaseMsg + *milvuspb.DropAliasRequest +} + +var _ TsMsg = &DropAliasMsg{} + +func (d *DropAliasMsg) ID() UniqueID { + return d.Base.MsgID +} + +func (d *DropAliasMsg) SetID(id UniqueID) { + d.Base.MsgID = id +} + +func (d *DropAliasMsg) Type() MsgType { + return d.Base.MsgType +} + +func (d *DropAliasMsg) SourceID() int64 { + return d.Base.SourceID +} + +func (d *DropAliasMsg) Marshal(input TsMsg) (MarshalType, error) { + dropAliasMsg := input.(*DropAliasMsg) + dropAliasRequest := dropAliasMsg.DropAliasRequest + mb, err := proto.Marshal(dropAliasRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (d *DropAliasMsg) Unmarshal(input MarshalType) (TsMsg, error) { + dropAliasRequest := &milvuspb.DropAliasRequest{} + in, err := convertToByteArray(input) + if err != nil { + return nil, err + } + err = proto.Unmarshal(in, dropAliasRequest) + if err != nil { + return nil, err + } + dropAliasMsg := &DropAliasMsg{DropAliasRequest: dropAliasRequest} + dropAliasMsg.BeginTimestamp = dropAliasRequest.GetBase().GetTimestamp() + dropAliasMsg.EndTimestamp = dropAliasRequest.GetBase().GetTimestamp() + return dropAliasMsg, nil +} + +func (d *DropAliasMsg) Size() int { + return proto.Size(d.DropAliasRequest) +} + +type AlterAliasMsg struct { + BaseMsg + *milvuspb.AlterAliasRequest +} + +var _ TsMsg = &AlterAliasMsg{} + +func (a *AlterAliasMsg) ID() UniqueID { + return a.Base.MsgID +} + +func (a *AlterAliasMsg) SetID(id UniqueID) { + a.Base.MsgID = id +} + +func (a *AlterAliasMsg) Type() MsgType { + return a.Base.MsgType +} + +func (a *AlterAliasMsg) SourceID() int64 { + return a.Base.SourceID +} + +func (a *AlterAliasMsg) Marshal(input TsMsg) (MarshalType, error) { + alterAliasMsg := input.(*AlterAliasMsg) + alterAliasRequest := alterAliasMsg.AlterAliasRequest + mb, err := proto.Marshal(alterAliasRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (a *AlterAliasMsg) Unmarshal(input MarshalType) (TsMsg, error) { + alterAliasRequest := &milvuspb.AlterAliasRequest{} + in, err := convertToByteArray(input) + if err != nil { + return nil, err + } + err = proto.Unmarshal(in, alterAliasRequest) + if err != nil { + return nil, err + } + alterAliasMsg := &AlterAliasMsg{AlterAliasRequest: alterAliasRequest} + alterAliasMsg.BeginTimestamp = alterAliasRequest.GetBase().GetTimestamp() + alterAliasMsg.EndTimestamp = alterAliasRequest.GetBase().GetTimestamp() + return alterAliasMsg, nil +} + +func (a *AlterAliasMsg) Size() int { + return proto.Size(a.AlterAliasRequest) +} diff --git a/pkg/mq/msgstream/msg_for_collection.go b/pkg/mq/msgstream/msg_for_collection.go index ae08317fe7..15d149ad29 100644 --- a/pkg/mq/msgstream/msg_for_collection.go +++ b/pkg/mq/msgstream/msg_for_collection.go @@ -188,3 +188,166 @@ func (f *FlushMsg) Unmarshal(input MarshalType) (TsMsg, error) { func (f *FlushMsg) Size() int { return proto.Size(f.FlushRequest) } + +type AlterCollectionMsg struct { + BaseMsg + *milvuspb.AlterCollectionRequest +} + +var _ TsMsg = &AlterCollectionMsg{} + +func (a *AlterCollectionMsg) ID() UniqueID { + return a.Base.MsgID +} + +func (a *AlterCollectionMsg) SetID(id UniqueID) { + a.Base.MsgID = id +} + +func (a *AlterCollectionMsg) Type() MsgType { + return a.Base.MsgType +} + +func (a *AlterCollectionMsg) SourceID() int64 { + return a.Base.SourceID +} + +func (a *AlterCollectionMsg) Marshal(input TsMsg) (MarshalType, error) { + alterCollectionMsg := input.(*AlterCollectionMsg) + alterCollectionRequest := alterCollectionMsg.AlterCollectionRequest + mb, err := proto.Marshal(alterCollectionRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (a *AlterCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) { + alterCollectionRequest := &milvuspb.AlterCollectionRequest{} + in, err := convertToByteArray(input) + if err != nil { + return nil, err + } + err = proto.Unmarshal(in, alterCollectionRequest) + if err != nil { + return nil, err + } + alterCollectionMsg := &AlterCollectionMsg{AlterCollectionRequest: alterCollectionRequest} + alterCollectionMsg.BeginTimestamp = alterCollectionMsg.GetBase().GetTimestamp() + alterCollectionMsg.EndTimestamp = alterCollectionMsg.GetBase().GetTimestamp() + + return alterCollectionMsg, nil +} + +func (a *AlterCollectionMsg) Size() int { + return proto.Size(a.AlterCollectionRequest) +} + +type AlterCollectionFieldMsg struct { + BaseMsg + *milvuspb.AlterCollectionFieldRequest +} + +var _ TsMsg = &AlterCollectionFieldMsg{} + +func (a *AlterCollectionFieldMsg) ID() UniqueID { + return a.Base.MsgID +} + +func (a *AlterCollectionFieldMsg) SetID(id UniqueID) { + a.Base.MsgID = id +} + +func (a *AlterCollectionFieldMsg) Type() MsgType { + return a.Base.MsgType +} + +func (a *AlterCollectionFieldMsg) SourceID() int64 { + return a.Base.SourceID +} + +func (a *AlterCollectionFieldMsg) Marshal(input TsMsg) (MarshalType, error) { + alterCollectionFieldMsg := input.(*AlterCollectionFieldMsg) + alterCollectionFieldRequest := alterCollectionFieldMsg.AlterCollectionFieldRequest + mb, err := proto.Marshal(alterCollectionFieldRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (a *AlterCollectionFieldMsg) Unmarshal(input MarshalType) (TsMsg, error) { + alterCollectionFieldRequest := &milvuspb.AlterCollectionFieldRequest{} + in, err := convertToByteArray(input) + if err != nil { + return nil, err + } + err = proto.Unmarshal(in, alterCollectionFieldRequest) + if err != nil { + return nil, err + } + alterCollectionFieldMsg := &AlterCollectionFieldMsg{AlterCollectionFieldRequest: alterCollectionFieldRequest} + alterCollectionFieldMsg.BeginTimestamp = alterCollectionFieldMsg.GetBase().GetTimestamp() + alterCollectionFieldMsg.EndTimestamp = alterCollectionFieldMsg.GetBase().GetTimestamp() + + return alterCollectionFieldMsg, nil +} + +func (a *AlterCollectionFieldMsg) Size() int { + return proto.Size(a.AlterCollectionFieldRequest) +} + +// TODO fubang maybe it will break the cdc replication +type RenameCollectionMsg struct { + BaseMsg + *milvuspb.RenameCollectionRequest +} + +var _ TsMsg = &RenameCollectionMsg{} + +func (r *RenameCollectionMsg) ID() UniqueID { + return r.Base.MsgID +} + +func (r *RenameCollectionMsg) SetID(id UniqueID) { + r.Base.MsgID = id +} + +func (r *RenameCollectionMsg) Type() MsgType { + return r.Base.MsgType +} + +func (r *RenameCollectionMsg) SourceID() int64 { + return r.Base.SourceID +} + +func (r *RenameCollectionMsg) Marshal(input TsMsg) (MarshalType, error) { + renameCollectionMsg := input.(*RenameCollectionMsg) + renameCollectionRequest := renameCollectionMsg.RenameCollectionRequest + mb, err := proto.Marshal(renameCollectionRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (r *RenameCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) { + renameCollectionRequest := &milvuspb.RenameCollectionRequest{} + in, err := convertToByteArray(input) + if err != nil { + return nil, err + } + err = proto.Unmarshal(in, renameCollectionRequest) + if err != nil { + return nil, err + } + renameCollectionMsg := &RenameCollectionMsg{RenameCollectionRequest: renameCollectionRequest} + renameCollectionMsg.BeginTimestamp = renameCollectionMsg.GetBase().GetTimestamp() + renameCollectionMsg.EndTimestamp = renameCollectionMsg.GetBase().GetTimestamp() + + return renameCollectionMsg, nil +} + +func (r *RenameCollectionMsg) Size() int { + return proto.Size(r.RenameCollectionRequest) +} diff --git a/pkg/mq/msgstream/unmarshal.go b/pkg/mq/msgstream/unmarshal.go index 158616177a..4ac0888c6d 100644 --- a/pkg/mq/msgstream/unmarshal.go +++ b/pkg/mq/msgstream/unmarshal.go @@ -59,6 +59,10 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher { timeTickMsg := TimeTickMsg{} createCollectionMsg := CreateCollectionMsg{} dropCollectionMsg := DropCollectionMsg{} + alterCollectionMsg := AlterCollectionMsg{} + alterCollectionFieldMsg := AlterCollectionFieldMsg{} + renameCollectionMsg := RenameCollectionMsg{} + createPartitionMsg := CreatePartitionMsg{} dropPartitionMsg := DropPartitionMsg{} dataNodeTtMsg := DataNodeTtMsg{} @@ -88,6 +92,10 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher { replicateMsg := ReplicateMsg{} importMsg := ImportMsg{} + createAliasMsg := CreateAliasMsg{} + dropAliasMsg := DropAliasMsg{} + alterAliasMsg := AlterAliasMsg{} + p := &ProtoUnmarshalDispatcher{} p.TempMap = make(map[commonpb.MsgType]UnmarshalFunc) p.TempMap[commonpb.MsgType_Insert] = insertMsg.Unmarshal @@ -96,6 +104,9 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher { p.TempMap[commonpb.MsgType_CreateCollection] = createCollectionMsg.Unmarshal p.TempMap[commonpb.MsgType_DropCollection] = dropCollectionMsg.Unmarshal p.TempMap[commonpb.MsgType_CreatePartition] = createPartitionMsg.Unmarshal + p.TempMap[commonpb.MsgType_AlterCollection] = alterCollectionMsg.Unmarshal + p.TempMap[commonpb.MsgType_AlterCollectionField] = alterCollectionFieldMsg.Unmarshal + p.TempMap[commonpb.MsgType_RenameCollection] = renameCollectionMsg.Unmarshal p.TempMap[commonpb.MsgType_DropPartition] = dropPartitionMsg.Unmarshal p.TempMap[commonpb.MsgType_DataNodeTt] = dataNodeTtMsg.Unmarshal p.TempMap[commonpb.MsgType_CreateIndex] = createIndexMsg.Unmarshal @@ -119,6 +130,9 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher { p.TempMap[commonpb.MsgType_OperatePrivilegeV2] = operatePrivilegeV2Msg.Unmarshal p.TempMap[commonpb.MsgType_Replicate] = replicateMsg.Unmarshal p.TempMap[commonpb.MsgType_Import] = importMsg.Unmarshal + p.TempMap[commonpb.MsgType_CreateAlias] = createAliasMsg.Unmarshal + p.TempMap[commonpb.MsgType_DropAlias] = dropAliasMsg.Unmarshal + p.TempMap[commonpb.MsgType_AlterAlias] = alterAliasMsg.Unmarshal return p }