mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
enhance: Enable properites in database meta (#31394)
issue: #30040 This PR add properties in database meta, so we can store some database level info. Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
68af832405
commit
e6d50def4f
@ -16,6 +16,7 @@ type RootCoordCatalog interface {
|
|||||||
CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error
|
CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error
|
||||||
DropDatabase(ctx context.Context, dbID int64, ts typeutil.Timestamp) error
|
DropDatabase(ctx context.Context, dbID int64, ts typeutil.Timestamp) error
|
||||||
ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error)
|
ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error)
|
||||||
|
AlterDatabase(ctx context.Context, newDB *model.Database, ts typeutil.Timestamp) error
|
||||||
|
|
||||||
CreateCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error
|
CreateCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error
|
||||||
GetCollectionByID(ctx context.Context, dbID int64, ts typeutil.Timestamp, collectionID typeutil.UniqueID) (*model.Collection, error)
|
GetCollectionByID(ctx context.Context, dbID int64, ts typeutil.Timestamp, collectionID typeutil.UniqueID) (*model.Collection, error)
|
||||||
|
|||||||
@ -110,6 +110,16 @@ func (kc *Catalog) CreateDatabase(ctx context.Context, db *model.Database, ts ty
|
|||||||
return kc.Snapshot.Save(key, string(v), ts)
|
return kc.Snapshot.Save(key, string(v), ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kc *Catalog) AlterDatabase(ctx context.Context, newColl *model.Database, ts typeutil.Timestamp) error {
|
||||||
|
key := BuildDatabaseKey(newColl.ID)
|
||||||
|
dbInfo := model.MarshalDatabaseModel(newColl)
|
||||||
|
v, err := proto.Marshal(dbInfo)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return kc.Snapshot.Save(key, string(v), ts)
|
||||||
|
}
|
||||||
|
|
||||||
func (kc *Catalog) DropDatabase(ctx context.Context, dbID int64, ts typeutil.Timestamp) error {
|
func (kc *Catalog) DropDatabase(ctx context.Context, dbID int64, ts typeutil.Timestamp) error {
|
||||||
key := BuildDatabaseKey(dbID)
|
key := BuildDatabaseKey(dbID)
|
||||||
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{key}, ts)
|
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{key}, ts)
|
||||||
|
|||||||
@ -2524,3 +2524,30 @@ func TestRBAC_Grant(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCatalog_AlterDatabase(t *testing.T) {
|
||||||
|
kvmock := mocks.NewSnapShotKV(t)
|
||||||
|
c := &Catalog{Snapshot: kvmock}
|
||||||
|
db := model.NewDatabase(1, "db", pb.DatabaseState_DatabaseCreated)
|
||||||
|
|
||||||
|
kvmock.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// test alter database success
|
||||||
|
newDB := db.Clone()
|
||||||
|
db.Properties = []*commonpb.KeyValuePair{
|
||||||
|
{
|
||||||
|
Key: "key1",
|
||||||
|
Value: "value1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := c.AlterDatabase(ctx, newDB, typeutil.ZeroTimestamp)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// test alter database fail
|
||||||
|
mockErr := errors.New("access kv store error")
|
||||||
|
kvmock.ExpectedCalls = nil
|
||||||
|
kvmock.EXPECT().Save(mock.Anything, mock.Anything, mock.Anything).Return(mockErr)
|
||||||
|
err = c.AlterDatabase(ctx, newDB, typeutil.ZeroTimestamp)
|
||||||
|
assert.ErrorIs(t, err, mockErr)
|
||||||
|
}
|
||||||
|
|||||||
@ -159,6 +159,50 @@ func (_c *RootCoordCatalog_AlterCredential_Call) RunAndReturn(run func(context.C
|
|||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AlterDatabase provides a mock function with given fields: ctx, newDB, ts
|
||||||
|
func (_m *RootCoordCatalog) AlterDatabase(ctx context.Context, newDB *model.Database, ts uint64) error {
|
||||||
|
ret := _m.Called(ctx, newDB, ts)
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if rf, ok := ret.Get(0).(func(context.Context, *model.Database, uint64) error); ok {
|
||||||
|
r0 = rf(ctx, newDB, ts)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
|
}
|
||||||
|
|
||||||
|
// RootCoordCatalog_AlterDatabase_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterDatabase'
|
||||||
|
type RootCoordCatalog_AlterDatabase_Call struct {
|
||||||
|
*mock.Call
|
||||||
|
}
|
||||||
|
|
||||||
|
// AlterDatabase is a helper method to define mock.On call
|
||||||
|
// - ctx context.Context
|
||||||
|
// - newDB *model.Database
|
||||||
|
// - ts uint64
|
||||||
|
func (_e *RootCoordCatalog_Expecter) AlterDatabase(ctx interface{}, newDB interface{}, ts interface{}) *RootCoordCatalog_AlterDatabase_Call {
|
||||||
|
return &RootCoordCatalog_AlterDatabase_Call{Call: _e.mock.On("AlterDatabase", ctx, newDB, ts)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *RootCoordCatalog_AlterDatabase_Call) Run(run func(ctx context.Context, newDB *model.Database, ts uint64)) *RootCoordCatalog_AlterDatabase_Call {
|
||||||
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
|
run(args[0].(context.Context), args[1].(*model.Database), args[2].(uint64))
|
||||||
|
})
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *RootCoordCatalog_AlterDatabase_Call) Return(_a0 error) *RootCoordCatalog_AlterDatabase_Call {
|
||||||
|
_c.Call.Return(_a0)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_c *RootCoordCatalog_AlterDatabase_Call) RunAndReturn(run func(context.Context, *model.Database, uint64) error) *RootCoordCatalog_AlterDatabase_Call {
|
||||||
|
_c.Call.Return(run)
|
||||||
|
return _c
|
||||||
|
}
|
||||||
|
|
||||||
// AlterGrant provides a mock function with given fields: ctx, tenant, entity, operateType
|
// AlterGrant provides a mock function with given fields: ctx, tenant, entity, operateType
|
||||||
func (_m *RootCoordCatalog) AlterGrant(ctx context.Context, tenant string, entity *milvuspb.GrantEntity, operateType milvuspb.OperatePrivilegeType) error {
|
func (_m *RootCoordCatalog) AlterGrant(ctx context.Context, tenant string, entity *milvuspb.GrantEntity, operateType milvuspb.OperatePrivilegeType) error {
|
||||||
ret := _m.Called(ctx, tenant, entity, operateType)
|
ret := _m.Called(ctx, tenant, entity, operateType)
|
||||||
|
|||||||
@ -3,7 +3,9 @@ package model
|
|||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||||
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/util"
|
"github.com/milvus-io/milvus/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -13,14 +15,16 @@ type Database struct {
|
|||||||
Name string
|
Name string
|
||||||
State pb.DatabaseState
|
State pb.DatabaseState
|
||||||
CreatedTime uint64
|
CreatedTime uint64
|
||||||
|
Properties []*commonpb.KeyValuePair
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDatabase(id int64, name string, sate pb.DatabaseState) *Database {
|
func NewDatabase(id int64, name string, state pb.DatabaseState) *Database {
|
||||||
return &Database{
|
return &Database{
|
||||||
ID: id,
|
ID: id,
|
||||||
Name: name,
|
Name: name,
|
||||||
State: sate,
|
State: state,
|
||||||
CreatedTime: uint64(time.Now().UnixNano()),
|
CreatedTime: uint64(time.Now().UnixNano()),
|
||||||
|
Properties: make([]*commonpb.KeyValuePair, 0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,6 +43,7 @@ func (c *Database) Clone() *Database {
|
|||||||
Name: c.Name,
|
Name: c.Name,
|
||||||
State: c.State,
|
State: c.State,
|
||||||
CreatedTime: c.CreatedTime,
|
CreatedTime: c.CreatedTime,
|
||||||
|
Properties: common.CloneKeyValuePairs(c.Properties),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -47,7 +52,8 @@ func (c *Database) Equal(other Database) bool {
|
|||||||
c.Name == other.Name &&
|
c.Name == other.Name &&
|
||||||
c.ID == other.ID &&
|
c.ID == other.ID &&
|
||||||
c.State == other.State &&
|
c.State == other.State &&
|
||||||
c.CreatedTime == other.CreatedTime
|
c.CreatedTime == other.CreatedTime &&
|
||||||
|
checkParamsEqual(c.Properties, other.Properties)
|
||||||
}
|
}
|
||||||
|
|
||||||
func MarshalDatabaseModel(db *Database) *pb.DatabaseInfo {
|
func MarshalDatabaseModel(db *Database) *pb.DatabaseInfo {
|
||||||
@ -61,6 +67,7 @@ func MarshalDatabaseModel(db *Database) *pb.DatabaseInfo {
|
|||||||
Name: db.Name,
|
Name: db.Name,
|
||||||
State: db.State,
|
State: db.State,
|
||||||
CreatedTime: db.CreatedTime,
|
CreatedTime: db.CreatedTime,
|
||||||
|
Properties: db.Properties,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,5 +82,6 @@ func UnmarshalDatabaseModel(info *pb.DatabaseInfo) *Database {
|
|||||||
CreatedTime: info.GetCreatedTime(),
|
CreatedTime: info.GetCreatedTime(),
|
||||||
State: info.GetState(),
|
State: info.GetState(),
|
||||||
TenantID: info.GetTenantId(),
|
TenantID: info.GetTenantId(),
|
||||||
|
Properties: info.GetProperties(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,16 +5,28 @@ import (
|
|||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
properties = []*commonpb.KeyValuePair{
|
||||||
|
{
|
||||||
|
Key: "key1",
|
||||||
|
Value: "value1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: "key2",
|
||||||
|
Value: "value2",
|
||||||
|
},
|
||||||
|
}
|
||||||
dbPB = &etcdpb.DatabaseInfo{
|
dbPB = &etcdpb.DatabaseInfo{
|
||||||
TenantId: "1",
|
TenantId: "1",
|
||||||
Name: "test",
|
Name: "test",
|
||||||
Id: 1,
|
Id: 1,
|
||||||
CreatedTime: 1,
|
CreatedTime: 1,
|
||||||
State: etcdpb.DatabaseState_DatabaseCreated,
|
State: etcdpb.DatabaseState_DatabaseCreated,
|
||||||
|
Properties: properties,
|
||||||
}
|
}
|
||||||
|
|
||||||
dbModel = &Database{
|
dbModel = &Database{
|
||||||
@ -23,6 +35,7 @@ var (
|
|||||||
ID: 1,
|
ID: 1,
|
||||||
CreatedTime: 1,
|
CreatedTime: 1,
|
||||||
State: etcdpb.DatabaseState_DatabaseCreated,
|
State: etcdpb.DatabaseState_DatabaseCreated,
|
||||||
|
Properties: properties,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -41,6 +54,7 @@ func TestUnmarshalDatabaseModel(t *testing.T) {
|
|||||||
func TestDatabaseCloneAndEqual(t *testing.T) {
|
func TestDatabaseCloneAndEqual(t *testing.T) {
|
||||||
clone := dbModel.Clone()
|
clone := dbModel.Clone()
|
||||||
assert.Equal(t, dbModel, clone)
|
assert.Equal(t, dbModel, clone)
|
||||||
|
assert.True(t, dbModel.Equal(*clone))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDatabaseAvailable(t *testing.T) {
|
func TestDatabaseAvailable(t *testing.T) {
|
||||||
|
|||||||
@ -93,6 +93,7 @@ message DatabaseInfo {
|
|||||||
int64 id = 3;
|
int64 id = 3;
|
||||||
DatabaseState state = 4;
|
DatabaseState state = 4;
|
||||||
uint64 created_time = 5;
|
uint64 created_time = 5;
|
||||||
|
repeated common.KeyValuePair properties = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SegmentIndexInfo {
|
message SegmentIndexInfo {
|
||||||
|
|||||||
@ -276,6 +276,23 @@ func (mt *MetaTable) createDatabasePrivate(ctx context.Context, db *model.Databa
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mt *MetaTable) AlterDatabase(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts typeutil.Timestamp) error {
|
||||||
|
mt.ddLock.Lock()
|
||||||
|
defer mt.ddLock.Unlock()
|
||||||
|
|
||||||
|
if oldDB.Name != newDB.Name || oldDB.ID != newDB.ID || oldDB.State != newDB.State {
|
||||||
|
return fmt.Errorf("alter database name/id is not supported!")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
|
||||||
|
if err := mt.catalog.AlterDatabase(ctx1, newDB, ts); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
mt.dbName2Meta[oldDB.Name] = newDB
|
||||||
|
log.Info("alter database finished", zap.String("dbName", oldDB.Name), zap.Uint64("ts", ts))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error {
|
func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error {
|
||||||
mt.ddLock.Lock()
|
mt.ddLock.Lock()
|
||||||
defer mt.ddLock.Unlock()
|
defer mt.ddLock.Unlock()
|
||||||
|
|||||||
@ -26,6 +26,7 @@ import (
|
|||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
memkv "github.com/milvus-io/milvus/internal/kv/mem"
|
memkv "github.com/milvus-io/milvus/internal/kv/mem"
|
||||||
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
|
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
|
||||||
@ -1722,6 +1723,91 @@ func TestMetaTable_CreateDatabase(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAlterDatabase(t *testing.T) {
|
||||||
|
t.Run("normal case", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
catalog.On("AlterDatabase",
|
||||||
|
mock.Anything,
|
||||||
|
mock.Anything,
|
||||||
|
mock.Anything,
|
||||||
|
).Return(nil)
|
||||||
|
|
||||||
|
db := model.NewDatabase(1, "db1", pb.DatabaseState_DatabaseCreated)
|
||||||
|
|
||||||
|
meta := &MetaTable{
|
||||||
|
dbName2Meta: map[string]*model.Database{
|
||||||
|
"db1": db,
|
||||||
|
},
|
||||||
|
names: newNameDb(),
|
||||||
|
aliases: newNameDb(),
|
||||||
|
catalog: catalog,
|
||||||
|
}
|
||||||
|
newDB := db.Clone()
|
||||||
|
db.Properties = []*commonpb.KeyValuePair{
|
||||||
|
{
|
||||||
|
Key: "key1",
|
||||||
|
Value: "value1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.AlterDatabase(context.TODO(), db, newDB, typeutil.ZeroTimestamp)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("access catalog failed", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
mockErr := errors.New("access catalog failed")
|
||||||
|
catalog.On("AlterDatabase",
|
||||||
|
mock.Anything,
|
||||||
|
mock.Anything,
|
||||||
|
mock.Anything,
|
||||||
|
).Return(mockErr)
|
||||||
|
|
||||||
|
db := model.NewDatabase(1, "db1", pb.DatabaseState_DatabaseCreated)
|
||||||
|
|
||||||
|
meta := &MetaTable{
|
||||||
|
dbName2Meta: map[string]*model.Database{
|
||||||
|
"db1": db,
|
||||||
|
},
|
||||||
|
names: newNameDb(),
|
||||||
|
aliases: newNameDb(),
|
||||||
|
catalog: catalog,
|
||||||
|
}
|
||||||
|
newDB := db.Clone()
|
||||||
|
db.Properties = []*commonpb.KeyValuePair{
|
||||||
|
{
|
||||||
|
Key: "key1",
|
||||||
|
Value: "value1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.AlterDatabase(context.TODO(), db, newDB, typeutil.ZeroTimestamp)
|
||||||
|
assert.ErrorIs(t, err, mockErr)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("alter database name", func(t *testing.T) {
|
||||||
|
catalog := mocks.NewRootCoordCatalog(t)
|
||||||
|
db := model.NewDatabase(1, "db1", pb.DatabaseState_DatabaseCreated)
|
||||||
|
|
||||||
|
meta := &MetaTable{
|
||||||
|
dbName2Meta: map[string]*model.Database{
|
||||||
|
"db1": db,
|
||||||
|
},
|
||||||
|
names: newNameDb(),
|
||||||
|
aliases: newNameDb(),
|
||||||
|
catalog: catalog,
|
||||||
|
}
|
||||||
|
newDB := db.Clone()
|
||||||
|
newDB.Name = "db2"
|
||||||
|
db.Properties = []*commonpb.KeyValuePair{
|
||||||
|
{
|
||||||
|
Key: "key1",
|
||||||
|
Value: "value1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := meta.AlterDatabase(context.TODO(), db, newDB, typeutil.ZeroTimestamp)
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestMetaTable_EmtpyDatabaseName(t *testing.T) {
|
func TestMetaTable_EmtpyDatabaseName(t *testing.T) {
|
||||||
t.Run("getDatabaseByNameInternal with empty db", func(t *testing.T) {
|
t.Run("getDatabaseByNameInternal with empty db", func(t *testing.T) {
|
||||||
mt := &MetaTable{
|
mt := &MetaTable{
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user