From 0ccb95303ea7cd64ea9c7c2aa8e4855a76e3bbca Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 29 Jul 2025 17:17:36 +0800 Subject: [PATCH] feat: [CMEK] Add utils to load plugins (#42986) See also: #40321 --------- Signed-off-by: yangxuan --- .../flushcommon/syncmgr/storage_serializer.go | 1 + .../syncmgr/storage_serializer_test.go | 1 + internal/util/hookutil/cipher.go | 244 ++++++++++++++++++ internal/util/hookutil/cipher_test.go | 161 ++++++++++++ pkg/util/paramtable/cipher_config.go | 42 +++ pkg/util/paramtable/hook_config.go | 4 + pkg/util/paramtable/runtime.go | 12 +- 7 files changed, 464 insertions(+), 1 deletion(-) create mode 100644 internal/util/hookutil/cipher.go create mode 100644 internal/util/hookutil/cipher_test.go create mode 100644 pkg/util/paramtable/cipher_config.go diff --git a/internal/flushcommon/syncmgr/storage_serializer.go b/internal/flushcommon/syncmgr/storage_serializer.go index 7b00803c7d..dec1ab9a6c 100644 --- a/internal/flushcommon/syncmgr/storage_serializer.go +++ b/internal/flushcommon/syncmgr/storage_serializer.go @@ -47,6 +47,7 @@ func NewStorageSerializer(metacache metacache.MetaCache, schema *schemapb.Collec return nil, merr.WrapErrServiceInternal("cannot find pk field") } meta := &etcdpb.CollectionMeta{ + ID: metacache.Collection(), Schema: schema, } inCodec := storage.NewInsertCodecWithSchema(meta) diff --git a/internal/flushcommon/syncmgr/storage_serializer_test.go b/internal/flushcommon/syncmgr/storage_serializer_test.go index 71b68cb5fe..4da6bf128c 100644 --- a/internal/flushcommon/syncmgr/storage_serializer_test.go +++ b/internal/flushcommon/syncmgr/storage_serializer_test.go @@ -93,6 +93,7 @@ func (s *StorageV1SerializerSuite) SetupSuite() { func (s *StorageV1SerializerSuite) SetupTest() { s.mockCache.EXPECT().GetSchema(mock.Anything).Return(s.schema).Maybe() + s.mockCache.EXPECT().Collection().Return(s.collectionID).Once() var err error s.serializer, err = NewStorageSerializer(s.mockCache, s.schema) diff --git 
a/internal/util/hookutil/cipher.go b/internal/util/hookutil/cipher.go
new file mode 100644
index 0000000000..02448420c9
--- /dev/null
+++ b/internal/util/hookutil/cipher.go
@@ -0,0 +1,286 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package hookutil
+
+import (
+	"bytes"
+	"fmt"
+	"plugin"
+	"strconv"
+	"sync"
+
+	"github.com/cockroachdb/errors"
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/hook"
+	"github.com/milvus-io/milvus/pkg/v2/log"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
+)
+
+// Package-level cipher state. Once InitOnceCipher has run, Cipher always holds a
+// cipherContainer; read it through GetCipher instead of loading it directly.
+var (
+	Cipher                 atomic.Value
+	initCipherOnce         sync.Once
+	ErrCipherPluginMissing = errors.New("cipher plugin is missing")
+)
+
+// GetCipher returns singleton hook.Cipher instance, initializing it on first use.
+// If no cipher plugin is configured, it will return nil
+// If a cipher plugin is loaded, it will return hook.Cipher
+func GetCipher() hook.Cipher {
+	InitOnceCipher()
+	return Cipher.Load().(cipherContainer).cipher
+}
+
+// IsClusterEncyptionEnabled reports whether a cipher is available, i.e. GetCipher is non-nil.
+func IsClusterEncyptionEnabled() bool {
+	return GetCipher() != nil
+}
+
+// Keys of cipher-related properties and of the configs handed to the cipher plugin.
+const (
+	EncryptionEnabledKey = "cipher.enabled"
+	EncryptionRootKeyKey = "cipher.key"
+	EncryptionEzIDKey    = "cipher.ezID"
+
+	CipherConfigCreateEZ       = "cipher.ez.create"
+	CipherConfigRemoveEZ       = "cipher.ez.remove"
+	CipherConfigMilvusRoleName = "cipher.milvusRoleName"
+	CipherConfigKeyKmsKeyArn   = "cipher.kmsKeyArn"
+)
+
+// EZ pairs an encryption zone ID with a collection ID.
+type EZ struct {
+	EzID         int64
+	CollectionID int64
+}
+
+// CipherContext bundles an EZ with key bytes.
+type CipherContext struct {
+	EZ
+	key []byte
+}
+
+// GetEzByCollProperties builds the EZ of a collection from the EncryptionEzIDKey
+// entry of collProperties, and returns nil when there is no such entry.
+// A value that is not a valid base-10 int64 is logged; the EZ is still returned
+// with whatever strconv.ParseInt produced (0 on a syntax error), so the return
+// contract is unchanged while the malformed property becomes visible.
+func GetEzByCollProperties(collProperties []*commonpb.KeyValuePair, collectionID int64) *EZ {
+	if len(collProperties) == 0 {
+		log.Warn("GetEzByCollProperties empty properties",
+			zap.Any("insertCodec collID", collectionID),
+			zap.Any("properties", collProperties),
+		)
+	}
+	for _, property := range collProperties {
+		if property.Key != EncryptionEzIDKey {
+			continue
+		}
+		ezID, err := strconv.ParseInt(property.Value, 10, 64)
+		if err != nil {
+			log.Warn("GetEzByCollProperties invalid ezID property",
+				zap.Int64("collectionID", collectionID),
+				zap.String("value", property.Value),
+				zap.Error(err),
+			)
+		}
+		return &EZ{
+			EzID:         ezID,
+			CollectionID: collectionID,
+		}
+	}
+	return nil
+}
+
+// TidyDBCipherProperties checks and completes the cipher properties of a database.
+// Properties without encryption enabled are returned untouched. With encryption
+// enabled, ErrCipherPluginMissing is returned if no cipher is available; otherwise
+// an EncryptionRootKeyKey entry holding the configured default root key is
+// appended unless the properties already contain one.
+func TidyDBCipherProperties(dbProperties []*commonpb.KeyValuePair) ([]*commonpb.KeyValuePair, error) {
+	if !IsDBEncyptionEnabled(dbProperties) {
+		return dbProperties, nil
+	}
+	if !IsClusterEncyptionEnabled() {
+		return nil, ErrCipherPluginMissing
+	}
+	for _, property := range dbProperties {
+		if property.Key == EncryptionRootKeyKey {
+			return dbProperties, nil
+		}
+	}
+
+	// set default root key from config if EncryptionRootKeyKey is absent
+	dbProperties = append(dbProperties, &commonpb.KeyValuePair{
+		Key:   EncryptionRootKeyKey,
+		Value: paramtable.GetCipherParams().DefaultRootKey.GetValue(),
+	})
+	return dbProperties, nil
+}
+
+// IsDBEncyptionEnabled reports whether dbProperties contain an EncryptionEnabledKey
+// entry whose value strconv.ParseBool accepts as true ("true", "1", "T", ...).
+// The value is checked so that an explicit "false" does not enable encryption.
+func IsDBEncyptionEnabled(dbProperties []*commonpb.KeyValuePair) bool {
+	for _, property := range dbProperties {
+		if property.Key != EncryptionEnabledKey {
+			continue
+		}
+		if enabled, err := strconv.ParseBool(property.Value); err == nil && enabled {
+			return true
+		}
+	}
+	return false
+}
+
+// GetEZRootKeyByDBProperties returns the value of the first EncryptionRootKeyKey
+// entry in dbProperties, or the configured default root key if there is none.
+func GetEZRootKeyByDBProperties(dbProperties []*commonpb.KeyValuePair) string {
+	for _, property := range dbProperties {
+		if property.Key == EncryptionRootKeyKey {
+			return property.Value
+		}
+	}
+	return paramtable.GetCipherParams().DefaultRootKey.GetValue()
+}
+
+// InitTestCipher installs testCipher as the process-wide cipher.
+// For test only
+func InitTestCipher() {
+	InitOnceCipher()
+	storeCipher(testCipher{})
+}
+
+// cipherContainer is Container to wrap hook.Cipher interface
+// this struct is used to be stored in atomic.Value
+// since different type stored in it will cause panicking.
+type cipherContainer struct {
+	cipher hook.Cipher
+}
+
+// storeCipher publishes cipher (possibly nil) wrapped in a cipherContainer.
+func storeCipher(cipher hook.Cipher) {
+	Cipher.Store(cipherContainer{cipher: cipher})
+}
+
+// initCipher loads the Go cipher plugin and stores it as the process-wide cipher.
+// A nil cipher is stored first, so GetCipher yields nil when loading is skipped or fails.
+// Loading is skipped without error when either the Go or the Cpp so path is empty.
+func initCipher() error {
+	storeCipher(nil)
+
+	pathGo := paramtable.GetCipherParams().SoPathGo.GetValue()
+	if pathGo == "" {
+		log.Info("empty so path for go plugin, skip to load cipher plugin")
+		return nil
+	}
+
+	pathCpp := paramtable.GetCipherParams().SoPathCpp.GetValue()
+	if pathCpp == "" {
+		log.Info("empty so path for cpp plugin, skip to load cipher plugin")
+		return nil
+	}
+
+	log.Info("start to load cipher plugin", zap.String("path", pathGo))
+	p, err := plugin.Open(pathGo)
+	if err != nil {
+		return fmt.Errorf("fail to open the cipher plugin, error: %w", err)
+	}
+	log.Info("cipher plugin opened", zap.String("path", pathGo))
+
+	h, err := p.Lookup("CipherPlugin")
+	if err != nil {
+		return fmt.Errorf("fail to find the 'CipherPlugin' object in the plugin, error: %w", err)
+	}
+
+	cipherVal, ok := h.(hook.Cipher)
+	if !ok {
+		return errors.New("fail to convert the `CipherPlugin` interface")
+	}
+
+	initConfigs := paramtable.Get().EtcdCfg.GetAll()
+	initConfigs[CipherConfigMilvusRoleName] = paramtable.GetRole()
+	if err = cipherVal.Init(initConfigs); err != nil {
+		return fmt.Errorf("fail to init configs for the cipher plugin, error: %w", err)
+	}
+	storeCipher(cipherVal)
+	return nil
+}
+
+// InitOnceCipher runs initCipher at most once and reports a failure through log.Panic.
+func InitOnceCipher() {
+	initCipherOnce.Do(func() {
+		err := initCipher()
+		if err != nil {
+			log.Panic("fail to init cipher plugin",
+				zap.String("Go so path", paramtable.GetCipherParams().SoPathGo.GetValue()),
+				zap.String("Cpp so path", paramtable.GetCipherParams().SoPathCpp.GetValue()),
+				zap.Error(err))
+		}
+	})
+}
+
+// testCipher encryption will append magicStr to plainText, magicStr is str of ezID and collectionID
+type testCipher struct{}
+
+var (
+	_ hook.Cipher    = (*testCipher)(nil)
+	_ hook.Encryptor = (*testCryptoImpl)(nil)
+	_ hook.Decryptor = (*testCryptoImpl)(nil)
+)
+
+func (d testCipher) Init(params map[string]string) error {
+	return nil
+}
+
+func (d testCipher) GetEncryptor(ezID, collectionID int64) (encryptor hook.Encryptor, safeKey []byte, err error) {
+	return createTestCryptoImpl(ezID, collectionID), []byte("safe key"), nil
+}
+
+func (d testCipher) GetDecryptor(ezID, collectionID int64, safeKey []byte) (hook.Decryptor, error) {
+	return createTestCryptoImpl(ezID, collectionID), nil
+}
+
+func (d testCipher) GetUnsafeKey(ezID, collectionID int64) []byte {
+	return []byte("unsafe key")
+}
+
+// append magicStr to plainText
+type testCryptoImpl struct {
+	magicStr string
+}
+
+func createTestCryptoImpl(ezID, collectionID int64) testCryptoImpl {
+	return testCryptoImpl{fmt.Sprintf("%d%d", ezID, collectionID)}
+}
+
+// Encrypt returns plainText followed by magicStr in a freshly allocated slice,
+// so the caller's backing array is never written to.
+func (c testCryptoImpl) Encrypt(plainText []byte) (cipherText []byte, err error) {
+	cipherText = make([]byte, 0, len(plainText)+len(c.magicStr))
+	cipherText = append(cipherText, plainText...)
+	return append(cipherText, c.magicStr...), nil
+}
+
+// Decrypt strips a trailing magicStr from cipherText, if present.
+func (c testCryptoImpl) Decrypt(cipherText []byte) (plainText []byte, err error) {
+	return bytes.TrimSuffix(cipherText, []byte(c.magicStr)), nil
+}
diff --git a/internal/util/hookutil/cipher_test.go b/internal/util/hookutil/cipher_test.go
new file mode 100644
index 0000000000..d677d2d82a
--- /dev/null
+++ b/internal/util/hookutil/cipher_test.go
@@ -0,0 +1,161 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license
agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package hookutil
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/suite"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
+)
+
+type CipherSuite struct {
+	suite.Suite
+}
+
+func TestCipherSuite(t *testing.T) {
+	suite.Run(t, new(CipherSuite))
+}
+
+func (s *CipherSuite) SetupSuite() {
+	paramtable.Init()
+}
+
+func (s *CipherSuite) TestGetCipherNil() {
+	s.Nil(GetCipher())
+}
+
+func (s *CipherSuite) TestGetTestCipher() {
+	InitTestCipher()
+	cipher := GetCipher()
+	s.NotNil(cipher)
+	s.IsType(testCipher{}, cipher)
+
+	ezID, collectionID := int64(1), int64(2)
+	encryptor, safeKey, err := GetCipher().GetEncryptor(ezID, collectionID)
+	s.NoError(err)
+	s.Equal([]byte("safe key"), safeKey)
+
+	plainText := []byte("test plain text")
+	cipherText, err := encryptor.Encrypt(plainText)
+	s.NoError(err)
+	s.Equal(append(plainText, []byte(fmt.Sprintf("%d%d", ezID, collectionID))...), cipherText)
+
+	decryptor, err := GetCipher().GetDecryptor(ezID, collectionID, safeKey)
+	s.NoError(err)
+	s.NotNil(decryptor)
+	gotPlainText, err := decryptor.Decrypt(cipherText)
+	s.NoError(err)
+	s.Equal(plainText, gotPlainText)
+
+	// test GetUnsafeKey
+	s.Equal([]byte("unsafe key"), GetCipher().GetUnsafeKey(1, 2))
+}
+
+func (s *CipherSuite) TestGetEzByCollProperties() {
+	collProperties := []*commonpb.KeyValuePair{
+		{Key: EncryptionEzIDKey, Value: "123"},
+	}
+	result := GetEzByCollProperties(collProperties, 456)
+	s.NotNil(result)
+	s.Equal(int64(123), result.EzID)
+	s.Equal(int64(456), result.CollectionID)
+
+	emptyResult := GetEzByCollProperties([]*commonpb.KeyValuePair{}, 456)
+	s.Nil(emptyResult)
+}
+
+func (s *CipherSuite) TestTidyDBCipherProperties() {
+	// Test with encryption enabled and root key already present; install the test cipher first so this does not rely on test order
+	InitTestCipher()
+	dbPropertiesWithRootKey := []*commonpb.KeyValuePair{
+		{Key: EncryptionEnabledKey, Value: "true"},
+		{Key: EncryptionRootKeyKey, Value: "existing-root-key"},
+	}
+	result, err := TidyDBCipherProperties(dbPropertiesWithRootKey)
+	s.NoError(err)
+	s.Equal(dbPropertiesWithRootKey, result)
+
+	// Test with encryption enabled, test cipher available and no root key given
+	dbPropertiesWithoutRootKey := []*commonpb.KeyValuePair{
+		{Key: EncryptionEnabledKey, Value: "true"},
+	}
+	result, err = TidyDBCipherProperties(dbPropertiesWithoutRootKey)
+	s.NoError(err)
+	s.Len(result, 2) // should have EncryptionEnabledKey + added default root key
+	s.Equal(EncryptionEnabledKey, result[0].Key)
+	s.Equal(EncryptionRootKeyKey, result[1].Key)
+
+	// Test without encryption enabled
+	dbPropertiesWithoutEncryption := []*commonpb.KeyValuePair{}
+	result, err = TidyDBCipherProperties(dbPropertiesWithoutEncryption)
+	s.NoError(err)
+	s.NotNil(result)
+	s.Equal(dbPropertiesWithoutEncryption, result)
+}
+
+func (s *CipherSuite) TestIsDBEncyptionEnabled() {
+	dbProperties := []*commonpb.KeyValuePair{
+		{Key: EncryptionEnabledKey, Value: "true"},
+	}
+	s.True(IsDBEncyptionEnabled(dbProperties))
+
+	dbProperties = []*commonpb.KeyValuePair{}
+	s.False(IsDBEncyptionEnabled(dbProperties))
+}
+
+func (s *CipherSuite) TestGetEZRootKeyByDBProperties() {
+	dbProperties := []*commonpb.KeyValuePair{
+		{Key: EncryptionRootKeyKey, Value: "rootKey"},
+	}
+	rootKey := GetEZRootKeyByDBProperties(dbProperties)
+	s.Equal("rootKey", rootKey)
+
+	emptyProperties := []*commonpb.KeyValuePair{}
+	defaultRootKey := GetEZRootKeyByDBProperties(emptyProperties)
+	s.Equal(paramtable.GetCipherParams().DefaultRootKey.GetValue(), defaultRootKey)
+}
+
+func (s *CipherSuite) TestTidyDBCipherPropertiesError() {
+	// Reset cipher to nil to test error case
+	storeCipher(nil)
+	dbProperties := []*commonpb.KeyValuePair{
+		{Key: EncryptionEnabledKey, Value: "true"},
+	}
+	_, err := TidyDBCipherProperties(dbProperties)
+	s.Error(err)
+	s.Equal(ErrCipherPluginMissing, err)
+}
+
+func (s *CipherSuite) TestTestCipherInit() {
+	cipher := testCipher{}
+	err := cipher.Init(map[string]string{"key": "value"})
+	s.NoError(err)
+}
+
+func (s *CipherSuite) TestIsClusterEncyptionEnabled() {
+	// Test when cipher is nil
+	storeCipher(nil)
+	s.False(IsClusterEncyptionEnabled())
+
+	// Test when cipher is not nil
+	InitTestCipher()
+	s.True(IsClusterEncyptionEnabled())
+}
diff --git a/pkg/util/paramtable/cipher_config.go b/pkg/util/paramtable/cipher_config.go
new file mode 100644
index 0000000000..62491c10b9
--- /dev/null
+++ b/pkg/util/paramtable/cipher_config.go
@@ -0,0 +1,46 @@
+package paramtable
+
+import (
+	"github.com/milvus-io/milvus/pkg/v2/log"
+)
+
+// cipherYamlFile is the yaml file name passed to NewBaseTableFromYamlOnly for the cipher params.
+const cipherYamlFile = "cipher.yaml"
+
+// cipherConfig holds the params of the cipher plugin; none of them is refreshable.
+type cipherConfig struct {
+	cipherBase *BaseTable
+
+	SoPathGo       ParamItem `refreshable:"false"`
+	SoPathCpp      ParamItem `refreshable:"false"`
+	DefaultRootKey ParamItem `refreshable:"false"`
+}
+
+// init remembers base and initializes every ParamItem against base.mgr.
+func (c *cipherConfig) init(base *BaseTable) {
+	c.cipherBase = base
+	log.Info("init cipher config")
+
+	c.SoPathGo = ParamItem{
+		Key:     "cipherPlugin.soPathGo",
+		Version: "2.6.0",
+	}
+	c.SoPathGo.Init(base.mgr)
+
+	c.SoPathCpp = ParamItem{
+		Key:     "cipherPlugin.soPathCpp",
+		Version: "2.6.0",
+	}
+	c.SoPathCpp.Init(base.mgr)
+
+	c.DefaultRootKey = ParamItem{
+		Key:     "cipherPlugin.defaultKmsKeyArn",
+		Version: "2.6.0",
+	}
+	c.DefaultRootKey.Init(base.mgr)
+}
+
+// Save forwards key and value to the Save method of the cipher BaseTable.
+func (c *cipherConfig) Save(key string, value string) error {
+	return c.cipherBase.Save(key, value)
+}
diff --git a/pkg/util/paramtable/hook_config.go b/pkg/util/paramtable/hook_config.go
index bdc4bfc553..5fbe95469d 100644
--- a/pkg/util/paramtable/hook_config.go
+++ b/pkg/util/paramtable/hook_config.go
@@ -41,3 +41,8 @@ func (h *hookConfig) WatchHookWithPrefix(ident string, keyPrefix string, onEvent
 func (h *hookConfig) GetAll() map[string]string {
 	return h.hookBase.mgr.GetConfigs()
 }
+
+// Save forwards key and value to the Save method of the hook BaseTable.
+func (h *hookConfig) Save(key string, value string) error {
+	return h.hookBase.Save(key, value)
+}
diff --git a/pkg/util/paramtable/runtime.go b/pkg/util/paramtable/runtime.go
index 1ed48a4e9d..13b2db3d00 100644
--- a/pkg/util/paramtable/runtime.go
+++ b/pkg/util/paramtable/runtime.go
@@ -38,7 +38,8 @@ var (
 	runtimeParam = runtimeConfig{
 		components: typeutil.ConcurrentSet[string]{},
 	}
-	hookParams hookConfig
+	hookParams   hookConfig
+	cipherParams cipherConfig
 )
 
 func Init() {
@@ -54,6 +55,9 @@ func Init() {
 		params.Init(baseTable)
 		hookBaseTable := NewBaseTableFromYamlOnly(hookYamlFile)
 		hookParams.init(hookBaseTable)
+
+		cipherBaseTable := NewBaseTableFromYamlOnly(cipherYamlFile)
+		cipherParams.init(cipherBaseTable)
 	})
 }
 
@@ -62,6 +66,8 @@ func InitWithBaseTable(baseTable *BaseTable) {
 		params.Init(baseTable)
 		hookBaseTable := NewBaseTableFromYamlOnly(hookYamlFile)
 		hookParams.init(hookBaseTable)
+		cipherBaseTable := NewBaseTableFromYamlOnly(cipherYamlFile)
+		cipherParams.init(cipherBaseTable)
 	})
 }
 
@@ -78,6 +84,11 @@ func GetHookParams() *hookConfig {
 	return &hookParams
 }
 
+// GetCipherParams returns a pointer to the package-level cipher params.
+func GetCipherParams() *cipherConfig {
+	return &cipherParams
+}
+
 func SetNodeID(newID UniqueID) {
 	runtimeParam.nodeID.Store(newID)
 }