diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 42fd1b30ca..70c0c9482c 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -460,6 +460,10 @@ func (m *mockRootCoordClient) ListDatabases(ctx context.Context, in *milvuspb.Li panic("not implemented") // TODO: Implement } +func (m *mockRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + panic("not implemented") // TODO: Implement +} + func (m *mockRootCoordClient) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { panic("not implemented") // TODO: Implement } diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 13c70b5a00..1379df5ce5 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -659,3 +659,14 @@ func (c *Client) DescribeDatabase(ctx context.Context, req *rootcoordpb.Describe return client.DescribeDatabase(ctx, req) }) } + +func (c *Client) AlterDatabase(ctx context.Context, request *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + request = typeutil.Clone(request) + commonpbutil.UpdateMsgBase( + request.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())), + ) + return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) { + return client.AlterDatabase(ctx, request) + }) +} diff --git a/internal/distributed/rootcoord/client/client_test.go b/internal/distributed/rootcoord/client/client_test.go index f5a0a8c688..07c092e687 100644 --- a/internal/distributed/rootcoord/client/client_test.go +++ b/internal/distributed/rootcoord/client/client_test.go @@ -236,6 +236,10 @@ func Test_NewClient(t *testing.T) { r, err := client.ListDatabases(ctx, nil) retCheck(retNotNil, r, err) } + { + r, err := client.AlterDatabase(ctx, nil) + retCheck(retNotNil, r, err) + } } client.(*Client).grpcClient = &mock.GRPCClientBase[rootcoordpb.RootCoordClient]{ diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index 2d6e3f6155..42b326fab5 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -93,6 +93,10 @@ func (s *Server) ListDatabases(ctx context.Context, request *milvuspb.ListDataba return s.rootCoord.ListDatabases(ctx, request) } +func (s *Server) AlterDatabase(ctx context.Context, request *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { + return s.rootCoord.AlterDatabase(ctx, request) +} + func (s *Server) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { return s.rootCoord.CheckHealth(ctx, request) } diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index 4df96aeafe..4ba6fc3b3d 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -31,11 +31,13 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/mocks" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/types" kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tikv" ) @@ -58,6 +60,10 @@ func (m *mockCore) ListDatabases(ctx context.Context, request *milvuspb.ListData }, nil } +func (m *mockCore) AlterDatabase(ctx context.Context, request *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil +} + func (m *mockCore) RenameCollection(ctx context.Context, request *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) { return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil } @@ -197,6 +203,13 @@ func TestRun(t *testing.T) { assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, ret.GetStatus().GetErrorCode()) }) + + t.Run("AlterDatabase", func(t *testing.T) { + ret, err := svr.AlterDatabase(ctx, nil) + assert.Nil(t, err) + assert.True(t, merr.Ok(ret)) + }) + err = svr.Stop() assert.NoError(t, err) } diff --git a/internal/mocks/mock_rootcoord.go b/internal/mocks/mock_rootcoord.go index 34dc55c0ed..d41d2cbd9a 100644 --- a/internal/mocks/mock_rootcoord.go +++ b/internal/mocks/mock_rootcoord.go @@ -256,6 +256,61 @@ func (_c *RootCoord_AlterCollection_Call) RunAndReturn(run func(context.Context, return _c } +// AlterDatabase provides a mock function with given fields: _a0, _a1 +func (_m *RootCoord) AlterDatabase(_a0 context.Context, _a1 *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { + ret := _m.Called(_a0, _a1) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.AlterDatabaseRequest) *commonpb.Status); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *rootcoordpb.AlterDatabaseRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RootCoord_AlterDatabase_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterDatabase' +type RootCoord_AlterDatabase_Call struct { + *mock.Call +} + +// AlterDatabase is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *rootcoordpb.AlterDatabaseRequest +func (_e *RootCoord_Expecter) AlterDatabase(_a0 interface{}, _a1 interface{}) *RootCoord_AlterDatabase_Call { + return &RootCoord_AlterDatabase_Call{Call: _e.mock.On("AlterDatabase", _a0, _a1)} +} + +func (_c *RootCoord_AlterDatabase_Call) Run(run func(_a0 context.Context, _a1 *rootcoordpb.AlterDatabaseRequest)) *RootCoord_AlterDatabase_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*rootcoordpb.AlterDatabaseRequest)) + }) + return _c +} + +func (_c *RootCoord_AlterDatabase_Call) Return(_a0 *commonpb.Status, _a1 error) *RootCoord_AlterDatabase_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *RootCoord_AlterDatabase_Call) RunAndReturn(run func(context.Context, *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error)) *RootCoord_AlterDatabase_Call { + _c.Call.Return(run) + return _c +} + // CheckHealth provides a mock function with given fields: _a0, _a1 func (_m *RootCoord) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_rootcoord_client.go b/internal/mocks/mock_rootcoord_client.go index 83852dc4fc..4a1ba8099b 100644 --- a/internal/mocks/mock_rootcoord_client.go +++ b/internal/mocks/mock_rootcoord_client.go @@ -313,6 +313,76 @@ func (_c *MockRootCoordClient_AlterCollection_Call) RunAndReturn(run func(contex return _c } +// AlterDatabase provides a mock function with given fields: ctx, in, opts +func (_m *MockRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *commonpb.Status + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.AlterDatabaseRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *rootcoordpb.AlterDatabaseRequest, ...grpc.CallOption) *commonpb.Status); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*commonpb.Status) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *rootcoordpb.AlterDatabaseRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockRootCoordClient_AlterDatabase_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterDatabase' +type MockRootCoordClient_AlterDatabase_Call struct { + *mock.Call +} + +// AlterDatabase is a helper method to define mock.On call +// - ctx context.Context +// - in *rootcoordpb.AlterDatabaseRequest +// - opts ...grpc.CallOption +func (_e *MockRootCoordClient_Expecter) AlterDatabase(ctx interface{}, in interface{}, opts ...interface{}) *MockRootCoordClient_AlterDatabase_Call { + return &MockRootCoordClient_AlterDatabase_Call{Call: _e.mock.On("AlterDatabase", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockRootCoordClient_AlterDatabase_Call) Run(run func(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption)) *MockRootCoordClient_AlterDatabase_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*rootcoordpb.AlterDatabaseRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockRootCoordClient_AlterDatabase_Call) Return(_a0 *commonpb.Status, _a1 error) *MockRootCoordClient_AlterDatabase_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockRootCoordClient_AlterDatabase_Call) RunAndReturn(run func(context.Context, *rootcoordpb.AlterDatabaseRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockRootCoordClient_AlterDatabase_Call { + _c.Call.Return(run) + return _c +} + // CheckHealth provides a mock function with given fields: ctx, in, opts func (_m *MockRootCoordClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/root_coord.proto b/internal/proto/root_coord.proto index f197d23b06..eb0605273a 100644 --- a/internal/proto/root_coord.proto +++ b/internal/proto/root_coord.proto @@ -141,6 +141,7 @@ service RootCoord { rpc DropDatabase(milvus.DropDatabaseRequest) returns (common.Status) {} rpc ListDatabases(milvus.ListDatabasesRequest) returns (milvus.ListDatabasesResponse) {} rpc DescribeDatabase(DescribeDatabaseRequest) returns(DescribeDatabaseResponse){} + rpc AlterDatabase(AlterDatabaseRequest) returns(common.Status){} } message AllocTimestampRequest { @@ -218,3 +219,10 @@ message DescribeDatabaseResponse { int64 dbID = 3; uint64 created_timestamp = 4; } + +message AlterDatabaseRequest { + common.MsgBase base = 1; + string db_name = 2; + string db_id = 3; + repeated common.KeyValuePair properties = 4; +} diff --git a/internal/proxy/rootcoord_mock_test.go b/internal/proxy/rootcoord_mock_test.go index 53d639e91c..4d6d4a4392 100644 --- a/internal/proxy/rootcoord_mock_test.go +++ b/internal/proxy/rootcoord_mock_test.go @@ -1115,6 +1115,10 @@ func (coord *RootCoordMock) DescribeDatabase(ctx context.Context, in *rootcoordp return &rootcoordpb.DescribeDatabaseResponse{}, nil } +func (coord *RootCoordMock) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, nil +} + type DescribeCollectionFunc func(ctx context.Context, request *milvuspb.DescribeCollectionRequest, opts ...grpc.CallOption) (*milvuspb.DescribeCollectionResponse, error) type ShowPartitionsFunc func(ctx context.Context, request *milvuspb.ShowPartitionsRequest, opts ...grpc.CallOption) (*milvuspb.ShowPartitionsResponse, error) diff --git a/internal/rootcoord/alter_database_task.go b/internal/rootcoord/alter_database_task.go new file mode 100644 index 0000000000..7f7340dfc5 --- /dev/null +++ b/internal/rootcoord/alter_database_task.go @@ -0,0 +1,93 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "fmt" + + "github.com/cockroachdb/errors" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + "github.com/milvus-io/milvus/pkg/log" +) + +type alterDatabaseTask struct { + baseTask + Req *rootcoordpb.AlterDatabaseRequest +} + +func (a *alterDatabaseTask) Prepare(ctx context.Context) error { + if a.Req.GetDbName() == "" { + return fmt.Errorf("alter database failed, database name does not exists") + } + + return nil +} + +func (a *alterDatabaseTask) Execute(ctx context.Context) error { + // Now we only support alter properties of database + if a.Req.GetProperties() == nil { + return errors.New("only support alter database properties, but database properties is empty") + } + + oldDB, err := a.core.meta.GetDatabaseByName(ctx, a.Req.GetDbName(), a.ts) + if err != nil { + log.Ctx(ctx).Warn("get database failed during changing database props", + zap.String("databaseName", a.Req.GetDbName()), zap.Uint64("ts", a.ts)) + return err + } + + newDB := oldDB.Clone() + ret := updateProperties(oldDB.Properties, a.Req.GetProperties()) + newDB.Properties = ret + + ts := a.GetTs() + redoTask := newBaseRedoTask(a.core.stepExecutor) + redoTask.AddSyncStep(&AlterDatabaseStep{ + baseStep: baseStep{core: a.core}, + oldDB: oldDB, + newDB: newDB, + ts: ts, + }) + + return redoTask.Execute(ctx) +} + +func updateProperties(oldProps []*commonpb.KeyValuePair, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { + props := make(map[string]string) + for _, prop := range oldProps { + props[prop.Key] = prop.Value + } + + for _, prop := range updatedProps { + props[prop.Key] = prop.Value + } + + propKV := make([]*commonpb.KeyValuePair, 0) + + for key, value := range props { + propKV = append(propKV, &commonpb.KeyValuePair{ + Key: key, + Value: value, + }) + } + + return propKV +} diff --git a/internal/rootcoord/alter_database_task_test.go b/internal/rootcoord/alter_database_task_test.go new file mode 100644 index 0000000000..31b0ce3e6e --- /dev/null +++ b/internal/rootcoord/alter_database_task_test.go @@ -0,0 +1,180 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" + mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" + "github.com/milvus-io/milvus/pkg/common" +) + +func Test_alterDatabaseTask_Prepare(t *testing.T) { + t.Run("invalid collectionID", func(t *testing.T) { + task := &alterDatabaseTask{Req: &rootcoordpb.AlterDatabaseRequest{}} + err := task.Prepare(context.Background()) + assert.Error(t, err) + }) + + t.Run("normal case", func(t *testing.T) { + task := &alterDatabaseTask{ + Req: &rootcoordpb.AlterDatabaseRequest{ + DbName: "cn", + }, + } + err := task.Prepare(context.Background()) + assert.NoError(t, err) + }) +} + +func Test_alterDatabaseTask_Execute(t *testing.T) { + properties := []*commonpb.KeyValuePair{ + { + Key: common.CollectionTTLConfigKey, + Value: "3600", + }, + } + + t.Run("properties is empty", func(t *testing.T) { + task := &alterDatabaseTask{Req: &rootcoordpb.AlterDatabaseRequest{}} + err := task.Execute(context.Background()) + assert.Error(t, err) + }) + + t.Run("failed to create alias", func(t *testing.T) { + core := newTestCore(withInvalidMeta()) + task := &alterDatabaseTask{ + baseTask: newBaseTask(context.Background(), core), + Req: &rootcoordpb.AlterDatabaseRequest{ + DbName: "cn", + Properties: properties, + }, + } + err := task.Execute(context.Background()) + assert.Error(t, err) + }) + + t.Run("alter step failed", func(t *testing.T) { + meta := mockrootcoord.NewIMetaTable(t) + meta.On("GetDatabaseByName", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(&model.Database{ID: int64(1)}, nil) + meta.On("AlterDatabase", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(errors.New("err")) + + core := newTestCore(withMeta(meta)) + task := &alterDatabaseTask{ + baseTask: newBaseTask(context.Background(), core), + Req: &rootcoordpb.AlterDatabaseRequest{ + DbName: "cn", + Properties: properties, + }, + } + + err := task.Execute(context.Background()) + assert.Error(t, err) + }) + + t.Run("alter successfully", func(t *testing.T) { + meta := mockrootcoord.NewIMetaTable(t) + meta.On("GetDatabaseByName", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(&model.Database{ID: int64(1)}, nil) + meta.On("AlterDatabase", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(nil) + + core := newTestCore(withMeta(meta)) + task := &alterDatabaseTask{ + baseTask: newBaseTask(context.Background(), core), + Req: &rootcoordpb.AlterDatabaseRequest{ + DbName: "cn", + Properties: properties, + }, + } + + err := task.Execute(context.Background()) + assert.NoError(t, err) + }) + + t.Run("test update collection props", func(t *testing.T) { + oldProps := []*commonpb.KeyValuePair{ + { + Key: common.CollectionTTLConfigKey, + Value: "1", + }, + } + + updateProps1 := []*commonpb.KeyValuePair{ + { + Key: common.CollectionAutoCompactionKey, + Value: "true", + }, + } + + ret := updateProperties(oldProps, updateProps1) + + assert.Contains(t, ret, &commonpb.KeyValuePair{ + Key: common.CollectionTTLConfigKey, + Value: "1", + }) + + assert.Contains(t, ret, &commonpb.KeyValuePair{ + Key: common.CollectionAutoCompactionKey, + Value: "true", + }) + + updateProps2 := []*commonpb.KeyValuePair{ + { + Key: common.CollectionTTLConfigKey, + Value: "2", + }, + } + ret2 := updateProperties(ret, updateProps2) + + assert.Contains(t, ret2, &commonpb.KeyValuePair{ + Key: common.CollectionTTLConfigKey, + Value: "2", + }) + + assert.Contains(t, ret2, &commonpb.KeyValuePair{ + Key: common.CollectionAutoCompactionKey, + Value: "true", + }) + }) +} diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 0ec6489ba6..3054276271 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -49,6 +49,7 @@ type IMetaTable interface { CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) + AlterDatabase(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts typeutil.Timestamp) error AddCollection(ctx context.Context, coll *model.Collection) error ChangeCollectionState(ctx context.Context, collectionID UniqueID, state pb.CollectionState, ts Timestamp) error diff --git a/internal/rootcoord/mocks/meta_table.go b/internal/rootcoord/mocks/meta_table.go index 26ee021238..f67bc65c8f 100644 --- a/internal/rootcoord/mocks/meta_table.go +++ b/internal/rootcoord/mocks/meta_table.go @@ -289,6 +289,51 @@ func (_c *IMetaTable_AlterCredential_Call) RunAndReturn(run func(*internalpb.Cre return _c } +// AlterDatabase provides a mock function with given fields: ctx, oldDB, newDB, ts +func (_m *IMetaTable) AlterDatabase(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts uint64) error { + ret := _m.Called(ctx, oldDB, newDB, ts) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *model.Database, *model.Database, uint64) error); ok { + r0 = rf(ctx, oldDB, newDB, ts) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// IMetaTable_AlterDatabase_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterDatabase' +type IMetaTable_AlterDatabase_Call struct { + *mock.Call +} + +// AlterDatabase is a helper method to define mock.On call +// - ctx context.Context +// - oldDB *model.Database +// - newDB *model.Database +// - ts uint64 +func (_e *IMetaTable_Expecter) AlterDatabase(ctx interface{}, oldDB interface{}, newDB interface{}, ts interface{}) *IMetaTable_AlterDatabase_Call { + return &IMetaTable_AlterDatabase_Call{Call: _e.mock.On("AlterDatabase", ctx, oldDB, newDB, ts)} +} + +func (_c *IMetaTable_AlterDatabase_Call) Run(run func(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts uint64)) *IMetaTable_AlterDatabase_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*model.Database), args[2].(*model.Database), args[3].(uint64)) + }) + return _c +} + +func (_c *IMetaTable_AlterDatabase_Call) Return(_a0 error) *IMetaTable_AlterDatabase_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *IMetaTable_AlterDatabase_Call) RunAndReturn(run func(context.Context, *model.Database, *model.Database, uint64) error) *IMetaTable_AlterDatabase_Call { + _c.Call.Return(run) + return _c +} + // ChangeCollectionState provides a mock function with given fields: ctx, collectionID, state, ts func (_m *IMetaTable) ChangeCollectionState(ctx context.Context, collectionID int64, state etcdpb.CollectionState, ts uint64) error { ret := _m.Called(ctx, collectionID, state, ts) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 7be954d95b..cc330fa3c9 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1290,6 +1290,58 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection return merr.Success(), nil } +func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest) (*commonpb.Status, error) { + if err := merr.CheckHealthy(c.GetStateCode()); err != nil { + return merr.Status(err), nil + } + + method := "AlterDatabase" + + metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.TotalLabel).Inc() + tr := timerecord.NewTimeRecorder(method) + + log.Ctx(ctx).Info("received request to alter database", + zap.String("role", typeutil.RootCoordRole), + zap.String("name", in.GetDbName()), + zap.Any("props", in.Properties)) + + t := &alterDatabaseTask{ + baseTask: newBaseTask(ctx, c), + Req: in, + } + + if err := c.scheduler.AddTask(t); err != nil { + log.Warn("failed to enqueue request to alter database", + zap.String("role", typeutil.RootCoordRole), + zap.String("name", in.GetDbName()), + zap.Error(err)) + + metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() + return merr.Status(err), nil + } + + if err := t.WaitToFinish(); err != nil { + log.Warn("failed to alter database", + zap.String("role", typeutil.RootCoordRole), + zap.Error(err), + zap.String("name", in.GetDbName()), + zap.Uint64("ts", t.GetTs())) + + metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() + return merr.Status(err), nil + } + + metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() + metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(t.queueDur.Milliseconds())) + + log.Ctx(ctx).Info("done to alter database", + zap.String("role", typeutil.RootCoordRole), + zap.String("name", in.GetDbName()), + zap.Uint64("ts", t.GetTs())) + return merr.Success(), nil +} + // CreatePartition create partition func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { if err := merr.CheckHealthy(c.GetStateCode()); err != nil { diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 570f5d6f6b..832526d61b 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -179,6 +179,45 @@ func TestRootCoord_ListDatabases(t *testing.T) { }) } +func TestRootCoord_AlterDatabase(t *testing.T) { + t.Run("not healthy", func(t *testing.T) { + c := newTestCore(withAbnormalCode()) + ctx := context.Background() + resp, err := c.AlterDatabase(ctx, &rootcoordpb.AlterDatabaseRequest{}) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.GetErrorCode()) + }) + + t.Run("failed to add task", func(t *testing.T) { + c := newTestCore(withHealthyCode(), + withInvalidScheduler()) + + ctx := context.Background() + resp, err := c.AlterDatabase(ctx, &rootcoordpb.AlterDatabaseRequest{}) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) + }) + + t.Run("failed to execute", func(t *testing.T) { + c := newTestCore(withHealthyCode(), + withTaskFailScheduler()) + + ctx := context.Background() + resp, err := c.AlterDatabase(ctx, &rootcoordpb.AlterDatabaseRequest{}) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) + }) + + t.Run("ok", func(t *testing.T) { + c := newTestCore(withHealthyCode(), + withValidScheduler()) + ctx := context.Background() + resp, err := c.AlterDatabase(ctx, &rootcoordpb.AlterDatabaseRequest{}) + assert.NoError(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) + }) +} + func TestRootCoord_CreateCollection(t *testing.T) { t.Run("not healthy", func(t *testing.T) { c := newTestCore(withAbnormalCode()) diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index 9f9da39da0..7c76715029 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -459,6 +459,22 @@ func (b *BroadcastAlteredCollectionStep) Desc() string { return fmt.Sprintf("broadcast altered collection, collectionID: %d", b.req.CollectionID) } +type AlterDatabaseStep struct { + baseStep + oldDB *model.Database + newDB *model.Database + ts Timestamp +} + +func (a *AlterDatabaseStep) Execute(ctx context.Context) ([]nestedStep, error) { + err := a.core.meta.AlterDatabase(ctx, a.oldDB, a.newDB, a.ts) + return nil, err +} + +func (a *AlterDatabaseStep) Desc() string { + return fmt.Sprintf("alter database, databaseID: %d, databaseName: %s, ts: %d", a.oldDB.ID, a.oldDB.Name, a.ts) +} + var ( confirmGCInterval = time.Minute * 20 allPartition UniqueID = -1 diff --git a/internal/util/mock/grpc_rootcoord_client.go b/internal/util/mock/grpc_rootcoord_client.go index e14c6da179..097303b651 100644 --- a/internal/util/mock/grpc_rootcoord_client.go +++ b/internal/util/mock/grpc_rootcoord_client.go @@ -258,6 +258,10 @@ func (m *GrpcRootCoordClient) AlterCollection(ctx context.Context, in *milvuspb. return &commonpb.Status{}, m.Err } +func (m *GrpcRootCoordClient) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + return &commonpb.Status{}, m.Err +} + func (m *GrpcRootCoordClient) Close() error { return nil }