enhance: Implement OperatePrivilegeV2 message handling and unmarshal support (#41355)

- issue: #41353

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2025-04-30 12:02:53 +08:00 committed by GitHub
parent 6c377b6e86
commit 3bd6268d3c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 107 additions and 2 deletions

View File

@ -5646,9 +5646,12 @@ func (node *Proxy) OperatePrivilegeV2(ctx context.Context, req *milvuspb.Operate
if err != nil {
return merr.Status(err), nil
}
if req.Base == nil {
req.Base = &commonpb.MsgBase{}
}
req.Base.MsgType = commonpb.MsgType_OperatePrivilegeV2
req.Grantor.User = &milvuspb.UserEntity{Name: curUser}
request := &milvuspb.OperatePrivilegeRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_OperatePrivilegeV2},
Entity: &milvuspb.GrantEntity{
Role: req.Role,
Object: &milvuspb.ObjectEntity{Name: commonpb.ObjectType_Global.String()},
@ -5659,7 +5662,7 @@ func (node *Proxy) OperatePrivilegeV2(ctx context.Context, req *milvuspb.Operate
Type: req.Type,
Version: "v2",
}
req.Grantor.User = &milvuspb.UserEntity{Name: curUser}
request.Base = req.Base
result, err := node.mixCoord.OperatePrivilege(ctx, request)
if err != nil {
log.Warn("fail to operate privilege", zap.Error(err))

View File

@ -2225,6 +2225,11 @@ func SendReplicateMessagePack(ctx context.Context, replicateMsgStream msgstream.
BaseMsg: getBaseMsg(ctx, ts),
OperatePrivilegeRequest: r,
}
case *milvuspb.OperatePrivilegeV2Request:
tsMsg = &msgstream.OperatePrivilegeV2Msg{
BaseMsg: getBaseMsg(ctx, ts),
OperatePrivilegeV2Request: r,
}
default:
log.Warn("unknown request", zap.Any("request", request))
return

View File

@ -394,3 +394,56 @@ func (c *OperatePrivilegeMsg) Unmarshal(input MarshalType) (TsMsg, error) {
func (c *OperatePrivilegeMsg) Size() int {
return proto.Size(c.OperatePrivilegeRequest)
}
type OperatePrivilegeV2Msg struct {
BaseMsg
*milvuspb.OperatePrivilegeV2Request
}
var _ TsMsg = &OperatePrivilegeV2Msg{}
func (c *OperatePrivilegeV2Msg) ID() UniqueID {
return c.Base.MsgID
}
func (c *OperatePrivilegeV2Msg) SetID(id UniqueID) {
c.Base.MsgID = id
}
func (c *OperatePrivilegeV2Msg) Type() MsgType {
return c.Base.MsgType
}
func (c *OperatePrivilegeV2Msg) SourceID() int64 {
return c.Base.SourceID
}
func (c *OperatePrivilegeV2Msg) Marshal(input TsMsg) (MarshalType, error) {
operatePrivilegeV2Msg := input.(*OperatePrivilegeV2Msg)
operatePrivilegeV2Request := operatePrivilegeV2Msg.OperatePrivilegeV2Request
mb, err := proto.Marshal(operatePrivilegeV2Request)
if err != nil {
return nil, err
}
return mb, nil
}
func (c *OperatePrivilegeV2Msg) Unmarshal(input MarshalType) (TsMsg, error) {
operatePrivilegeV2Request := &milvuspb.OperatePrivilegeV2Request{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, operatePrivilegeV2Request)
if err != nil {
return nil, err
}
operatePrivilegeV2Msg := &OperatePrivilegeV2Msg{OperatePrivilegeV2Request: operatePrivilegeV2Request}
operatePrivilegeV2Msg.BeginTimestamp = operatePrivilegeV2Msg.GetBase().GetTimestamp()
operatePrivilegeV2Msg.EndTimestamp = operatePrivilegeV2Msg.GetBase().GetTimestamp()
return operatePrivilegeV2Msg, nil
}
func (c *OperatePrivilegeV2Msg) Size() int {
return proto.Size(c.OperatePrivilegeV2Request)
}

View File

@ -312,3 +312,45 @@ func TestOperatePrivilege(t *testing.T) {
assert.True(t, msg.Size() > 0)
}
func TestOperatePrivilegeV2(t *testing.T) {
var msg TsMsg = &OperatePrivilegeV2Msg{
OperatePrivilegeV2Request: &milvuspb.OperatePrivilegeV2Request{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_OperatePrivilegeV2,
MsgID: 100,
Timestamp: 1000,
SourceID: 10000,
TargetID: 100000,
ReplicateInfo: nil,
},
Grantor: &milvuspb.GrantorEntity{
User: &milvuspb.UserEntity{Name: "unit_user"},
Privilege: &milvuspb.PrivilegeEntity{
Name: "unit_privilege",
},
},
Type: milvuspb.OperatePrivilegeType_Grant,
},
}
assert.EqualValues(t, 100, msg.ID())
msg.SetID(200)
assert.EqualValues(t, 200, msg.ID())
assert.Equal(t, commonpb.MsgType_OperatePrivilegeV2, msg.Type())
assert.EqualValues(t, 10000, msg.SourceID())
msgBytes, err := msg.Marshal(msg)
assert.NoError(t, err)
var newMsg TsMsg = &OperatePrivilegeV2Msg{}
_, err = newMsg.Unmarshal("1")
assert.Error(t, err)
newMsg, err = newMsg.Unmarshal(msgBytes)
assert.NoError(t, err)
assert.EqualValues(t, 200, newMsg.ID())
assert.EqualValues(t, 1000, newMsg.BeginTs())
assert.EqualValues(t, 1000, newMsg.EndTs())
assert.EqualValues(t, "unit_user", newMsg.(*OperatePrivilegeV2Msg).GetGrantor().GetUser().GetName())
assert.EqualValues(t, "unit_privilege", newMsg.(*OperatePrivilegeV2Msg).GetGrantor().GetPrivilege().GetName())
}

View File

@ -84,6 +84,7 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
dropRoleMsg := DropRoleMsg{}
operateUserRoleMsg := OperateUserRoleMsg{}
operatePrivilegeMsg := OperatePrivilegeMsg{}
operatePrivilegeV2Msg := OperatePrivilegeV2Msg{}
replicateMsg := ReplicateMsg{}
importMsg := ImportMsg{}
@ -115,6 +116,7 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
p.TempMap[commonpb.MsgType_DropRole] = dropRoleMsg.Unmarshal
p.TempMap[commonpb.MsgType_OperateUserRole] = operateUserRoleMsg.Unmarshal
p.TempMap[commonpb.MsgType_OperatePrivilege] = operatePrivilegeMsg.Unmarshal
p.TempMap[commonpb.MsgType_OperatePrivilegeV2] = operatePrivilegeV2Msg.Unmarshal
p.TempMap[commonpb.MsgType_Replicate] = replicateMsg.Unmarshal
p.TempMap[commonpb.MsgType_Import] = importMsg.Unmarshal