From 4dc75a6e2c4f88ad6b8ece863bd826744f652211 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Fri, 17 Oct 2025 16:38:10 +0800 Subject: [PATCH] enhance: support database with WAL-based DDL framework (#44822) issue: #43897 - Database related DDL is implemented by WAL-based DDL framework now. - Support following message type in wal CreateDatabase, AlterDatabase, DropDatabase. - Database DDL can be synced by new CDC now. - Refactor some UT for Database DDL. Signed-off-by: chyezh --- internal/rootcoord/alter_database_task.go | 246 ------------ .../rootcoord/alter_database_task_test.go | 379 ------------------ internal/rootcoord/create_db_task.go | 68 ---- internal/rootcoord/create_db_task_test.go | 124 ------ internal/rootcoord/ddl_callbacks.go | 18 +- .../rootcoord/ddl_callbacks_alter_database.go | 180 +++++++++ .../ddl_callbacks_create_database.go | 84 ++++ .../rootcoord/ddl_callbacks_database_test.go | 109 +++++ .../rootcoord/ddl_callbacks_drop_database.go | 73 ++++ internal/rootcoord/drop_db_task.go | 75 ---- internal/rootcoord/drop_db_task_test.go | 98 ----- internal/rootcoord/meta_table.go | 59 ++- internal/rootcoord/meta_table_test.go | 79 ++-- internal/rootcoord/mocks/meta_table.go | 115 +++++- internal/rootcoord/root_coord.go | 73 +--- internal/rootcoord/root_coord_test.go | 87 ---- internal/rootcoord/step.go | 16 - internal/rootcoord/task_test.go | 27 -- 18 files changed, 660 insertions(+), 1250 deletions(-) delete mode 100644 internal/rootcoord/alter_database_task.go delete mode 100644 internal/rootcoord/alter_database_task_test.go delete mode 100644 internal/rootcoord/create_db_task.go delete mode 100644 internal/rootcoord/create_db_task_test.go create mode 100644 internal/rootcoord/ddl_callbacks_alter_database.go create mode 100644 internal/rootcoord/ddl_callbacks_create_database.go create mode 100644 internal/rootcoord/ddl_callbacks_database_test.go create mode 100644 internal/rootcoord/ddl_callbacks_drop_database.go delete mode 100644 internal/rootcoord/drop_db_task.go delete mode 100644 internal/rootcoord/drop_db_task_test.go diff --git a/internal/rootcoord/alter_database_task.go b/internal/rootcoord/alter_database_task.go deleted file mode 100644 index a64ce54c46..0000000000 --- a/internal/rootcoord/alter_database_task.go +++ /dev/null @@ -1,246 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - - "github.com/cockroachdb/errors" - "github.com/samber/lo" - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/internal/metastore/model" - "github.com/milvus-io/milvus/internal/util/hookutil" - "github.com/milvus-io/milvus/internal/util/proxyutil" - "github.com/milvus-io/milvus/pkg/v2/common" - "github.com/milvus-io/milvus/pkg/v2/log" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" - "github.com/milvus-io/milvus/pkg/v2/proto/querypb" - "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" - "github.com/milvus-io/milvus/pkg/v2/util/merr" - "github.com/milvus-io/milvus/pkg/v2/util/typeutil" -) - -type alterDatabaseTask struct { - baseTask - Req *rootcoordpb.AlterDatabaseRequest -} - -func (a *alterDatabaseTask) Prepare(ctx context.Context) error { - if a.Req.GetDbName() == "" { - return errors.New("alter database failed, database name does not exists") - } - - // TODO SimFG maybe it will support to alter the replica.id properties in the future when the database has no collections - // now it can't be because the latest database properties can't be notified to the querycoord and datacoord - replicateID, _ := common.GetReplicateID(a.Req.Properties) - if replicateID != "" { - colls, err := a.core.meta.ListCollections(ctx, a.Req.DbName, a.ts, true) - if err != nil { - return err - } - if len(colls) > 0 { - return errors.New("can't set replicate id on database with collections") - } - } - - return nil -} - -func (a *alterDatabaseTask) Execute(ctx context.Context) error { - log := log.Ctx(ctx).With( - zap.String("alterDatabaseTask", a.Req.GetDbName()), - zap.String("db", a.Req.GetDbId()), - zap.Uint64("ts", a.GetTs())) - - if a.Req.GetProperties() == nil && a.Req.GetDeleteKeys() == nil { - log.Warn("alter database with empty properties and delete keys, expected to set either properties or delete keys ") - return errors.New("alter database with empty properties and delete keys, expected to set either properties or delete keys") - } - - if len(a.Req.GetProperties()) > 0 && len(a.Req.GetDeleteKeys()) > 0 { - return errors.New("alter database cannot modify properties and delete keys at the same time") - } - - if hookutil.ContainsCipherProperties(a.Req.GetProperties(), a.Req.GetDeleteKeys()) { - log.Info("skip to alter collection due to cipher properties were detected in the request properties") - return errors.New("can not alter cipher related properties") - } - - oldDB, err := a.core.meta.GetDatabaseByName(ctx, a.Req.GetDbName(), a.GetTs()) - if err != nil { - log.Warn("get database failed during changing database props") - return err - } - - var newProperties []*commonpb.KeyValuePair - if (len(a.Req.GetProperties())) > 0 { - if IsSubsetOfProperties(a.Req.GetProperties(), oldDB.Properties) { - log.Info("skip to alter database due to no changes were detected in the properties") - return nil - } - newProperties = MergeProperties(oldDB.Properties, a.Req.GetProperties()) - } else if (len(a.Req.GetDeleteKeys())) > 0 { - newProperties = DeleteProperties(oldDB.Properties, a.Req.GetDeleteKeys()) - } - - return executeAlterDatabaseTaskSteps(ctx, a.core, oldDB, oldDB.Properties, newProperties, a.GetTs()) -} - -func (a *alterDatabaseTask) GetLockerKey() LockerKey { - return NewLockerKeyChain( - NewClusterLockerKey(false), - NewDatabaseLockerKey(a.Req.GetDbName(), true), - ) -} - -func MergeProperties(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { - _, existEndTS := common.GetReplicateEndTS(updatedProps) - if existEndTS { - updatedProps = append(updatedProps, &commonpb.KeyValuePair{ - Key: common.ReplicateIDKey, - Value: "", - }) - } - - props := make(map[string]string) - for _, prop := range oldProps { - props[prop.Key] = prop.Value - } - - for _, prop := range updatedProps { - props[prop.Key] = prop.Value - } - - propKV := make([]*commonpb.KeyValuePair, 0) - - for key, value := range props { - propKV = append(propKV, &commonpb.KeyValuePair{ - Key: key, - Value: value, - }) - } - - return propKV -} - -func executeAlterDatabaseTaskSteps(ctx context.Context, - core *Core, - dbInfo *model.Database, - oldProperties []*commonpb.KeyValuePair, - newProperties []*commonpb.KeyValuePair, - ts Timestamp, -) error { - oldDB := dbInfo.Clone() - oldDB.Properties = oldProperties - newDB := dbInfo.Clone() - newDB.Properties = newProperties - redoTask := newBaseRedoTask(core.stepExecutor) - redoTask.AddSyncStep(&AlterDatabaseStep{ - baseStep: baseStep{core: core}, - oldDB: oldDB, - newDB: newDB, - ts: ts, - }) - - redoTask.AddSyncStep(&expireCacheStep{ - baseStep: baseStep{core: core}, - dbName: newDB.Name, - ts: ts, - // make sure to send the "expire cache" request - // because it won't send this request when the length of collection names array is zero - collectionNames: []string{""}, - opts: []proxyutil.ExpireCacheOpt{ - proxyutil.SetMsgType(commonpb.MsgType_AlterDatabase), - }, - }) - - oldReplicaNumber, _ := common.DatabaseLevelReplicaNumber(oldDB.Properties) - oldResourceGroups, _ := common.DatabaseLevelResourceGroups(oldDB.Properties) - newReplicaNumber, _ := common.DatabaseLevelReplicaNumber(newDB.Properties) - newResourceGroups, _ := common.DatabaseLevelResourceGroups(newDB.Properties) - left, right := lo.Difference(oldResourceGroups, newResourceGroups) - rgChanged := len(left) > 0 || len(right) > 0 - replicaChanged := oldReplicaNumber != newReplicaNumber - if rgChanged || replicaChanged { - log.Ctx(ctx).Warn("alter database trigger update load config", - zap.Int64("dbID", oldDB.ID), - zap.Int64("oldReplicaNumber", oldReplicaNumber), - zap.Int64("newReplicaNumber", newReplicaNumber), - zap.Strings("oldResourceGroups", oldResourceGroups), - zap.Strings("newResourceGroups", newResourceGroups), - ) - redoTask.AddAsyncStep(NewSimpleStep("", func(ctx context.Context) ([]nestedStep, error) { - colls, err := core.meta.ListCollections(ctx, oldDB.Name, typeutil.MaxTimestamp, true) - if err != nil { - log.Ctx(ctx).Warn("failed to trigger update load config for database", zap.Int64("dbID", oldDB.ID), zap.Error(err)) - return nil, err - } - if len(colls) == 0 { - return nil, nil - } - - resp, err := core.mixCoord.UpdateLoadConfig(ctx, &querypb.UpdateLoadConfigRequest{ - CollectionIDs: lo.Map(colls, func(coll *model.Collection, _ int) int64 { return coll.CollectionID }), - ReplicaNumber: int32(newReplicaNumber), - ResourceGroups: newResourceGroups, - }) - if err := merr.CheckRPCCall(resp, err); err != nil { - log.Ctx(ctx).Warn("failed to trigger update load config for database", zap.Int64("dbID", oldDB.ID), zap.Error(err)) - return nil, err - } - return nil, nil - })) - } - - oldReplicateEnable, _ := common.IsReplicateEnabled(oldDB.Properties) - newReplicateEnable, ok := common.IsReplicateEnabled(newDB.Properties) - if ok && !newReplicateEnable && oldReplicateEnable { - replicateID, _ := common.GetReplicateID(oldDB.Properties) - redoTask.AddAsyncStep(NewSimpleStep("send replicate end msg for db", func(ctx context.Context) ([]nestedStep, error) { - msgPack := &msgstream.MsgPack{} - msg := &msgstream.ReplicateMsg{ - BaseMsg: msgstream.BaseMsg{ - Ctx: ctx, - BeginTimestamp: ts, - EndTimestamp: ts, - HashValues: []uint32{0}, - }, - ReplicateMsg: &msgpb.ReplicateMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Replicate, - Timestamp: ts, - ReplicateInfo: &commonpb.ReplicateInfo{ - IsReplicate: true, - ReplicateID: replicateID, - }, - }, - IsEnd: true, - Database: newDB.Name, - Collection: "", - }, - } - msgPack.Msgs = append(msgPack.Msgs, msg) - log.Info("send replicate end msg for db", zap.String("db", newDB.Name), zap.String("replicateID", replicateID)) - return nil, core.chanTimeTick.broadcastDmlChannels(core.chanTimeTick.listDmlChannels(), msgPack) - })) - } - - return redoTask.Execute(ctx) -} diff --git a/internal/rootcoord/alter_database_task_test.go b/internal/rootcoord/alter_database_task_test.go deleted file mode 100644 index e9428476c9..0000000000 --- a/internal/rootcoord/alter_database_task_test.go +++ /dev/null @@ -1,379 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "testing" - "time" - - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus/internal/metastore/model" - mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" - "github.com/milvus-io/milvus/pkg/v2/common" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" - "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" - "github.com/milvus-io/milvus/pkg/v2/util/funcutil" -) - -func Test_alterDatabaseTask_Prepare(t *testing.T) { - t.Run("invalid collectionID", func(t *testing.T) { - task := &alterDatabaseTask{Req: &rootcoordpb.AlterDatabaseRequest{}} - err := task.Prepare(context.Background()) - assert.Error(t, err) - }) - - t.Run("normal case", func(t *testing.T) { - task := &alterDatabaseTask{ - Req: &rootcoordpb.AlterDatabaseRequest{ - DbName: "cn", - }, - } - err := task.Prepare(context.Background()) - assert.NoError(t, err) - }) - - t.Run("replicate id", func(t *testing.T) { - { - // no collections - meta := mockrootcoord.NewIMetaTable(t) - core := newTestCore(withMeta(meta)) - meta.EXPECT(). - ListCollections(mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return([]*model.Collection{}, nil). - Once() - task := &alterDatabaseTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &rootcoordpb.AlterDatabaseRequest{ - DbName: "cn", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, - }, - } - err := task.Prepare(context.Background()) - assert.NoError(t, err) - } - { - meta := mockrootcoord.NewIMetaTable(t) - core := newTestCore(withMeta(meta)) - meta.EXPECT().ListCollections(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*model.Collection{ - { - Name: "foo", - }, - }, nil).Once() - task := &alterDatabaseTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &rootcoordpb.AlterDatabaseRequest{ - DbName: "cn", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, - }, - } - err := task.Prepare(context.Background()) - assert.Error(t, err) - } - { - meta := mockrootcoord.NewIMetaTable(t) - core := newTestCore(withMeta(meta)) - meta.EXPECT().ListCollections(mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(nil, errors.New("err")). - Once() - task := &alterDatabaseTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &rootcoordpb.AlterDatabaseRequest{ - DbName: "cn", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, - }, - } - err := task.Prepare(context.Background()) - assert.Error(t, err) - } - }) -} - -func Test_alterDatabaseTask_Execute(t *testing.T) { - properties := []*commonpb.KeyValuePair{ - { - Key: common.CollectionTTLConfigKey, - Value: "3600", - }, - } - - t.Run("properties is empty", func(t *testing.T) { - task := &alterDatabaseTask{Req: &rootcoordpb.AlterDatabaseRequest{}} - err := task.Execute(context.Background()) - assert.Error(t, err) - }) - - t.Run("failed to create alias", func(t *testing.T) { - core := newTestCore(withInvalidMeta()) - task := &alterDatabaseTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &rootcoordpb.AlterDatabaseRequest{ - DbName: "cn", - Properties: properties, - }, - } - err := task.Execute(context.Background()) - assert.Error(t, err) - }) - - meta := mockrootcoord.NewIMetaTable(t) - properties = append(properties, &commonpb.KeyValuePair{Key: common.DatabaseForceDenyReadingKey, Value: "true"}) - - meta.On("GetDatabaseByName", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(&model.Database{ID: int64(1), Properties: properties}, nil).Maybe() - meta.On("AlterDatabase", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(nil).Maybe() - - t.Run("alter skip due to no change", func(t *testing.T) { - core := newTestCore(withMeta(meta)) - task := &alterDatabaseTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &rootcoordpb.AlterDatabaseRequest{ - DbName: "cn", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.DatabaseForceDenyReadingKey, - Value: "true", - }, - }, - }, - } - - err := task.Execute(context.Background()) - assert.NoError(t, err) - }) - - t.Run("alter step failed", func(t *testing.T) { - meta := mockrootcoord.NewIMetaTable(t) - meta.On("GetDatabaseByName", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(&model.Database{ID: int64(1)}, nil) - meta.On("AlterDatabase", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(errors.New("err")) - - core := newTestCore(withMeta(meta)) - task := &alterDatabaseTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &rootcoordpb.AlterDatabaseRequest{ - DbName: "cn", - Properties: properties, - }, - } - - err := task.Execute(context.Background()) - assert.Error(t, err) - }) - - t.Run("alter successfully", func(t *testing.T) { - meta := mockrootcoord.NewIMetaTable(t) - meta.On("GetDatabaseByName", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(&model.Database{ - ID: int64(1), - Name: "cn", - Properties: []*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - }, - }, nil) - meta.On("AlterDatabase", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(nil) - // the chan length should larger than 4, because newChanTimeTickSync will send 4 ts messages when execute the `broadcast` step - packChan := make(chan *msgstream.ConsumeMsgPack, 10) - ticker := newChanTimeTickSync(packChan) - ticker.addDmlChannels("by-dev-rootcoord-dml_1") - - core := newTestCore(withMeta(meta), withValidProxyManager(), withTtSynchronizer(ticker)) - newPros := append(properties, - &commonpb.KeyValuePair{Key: common.ReplicateEndTSKey, Value: "1000"}, - ) - task := &alterDatabaseTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &rootcoordpb.AlterDatabaseRequest{ - DbName: "cn", - Properties: newPros, - }, - } - - unmarshalFactory := &msgstream.ProtoUDFactory{} - unmarshalDispatcher := unmarshalFactory.NewUnmarshalDispatcher() - - err := task.Execute(context.Background()) - assert.NoError(t, err) - time.Sleep(time.Second) - select { - case pack := <-packChan: - assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].GetType()) - - tsMsg, err := pack.Msgs[0].Unmarshal(unmarshalDispatcher) - require.NoError(t, err) - replicateMsg := tsMsg.(*msgstream.ReplicateMsg) - assert.Equal(t, "cn", replicateMsg.ReplicateMsg.GetDatabase()) - assert.True(t, replicateMsg.ReplicateMsg.GetIsEnd()) - default: - assert.Fail(t, "no message sent") - } - }) - - t.Run("test update collection props", func(t *testing.T) { - oldProps := []*commonpb.KeyValuePair{ - { - Key: common.CollectionTTLConfigKey, - Value: "1", - }, - } - - updateProps1 := []*commonpb.KeyValuePair{ - { - Key: common.CollectionAutoCompactionKey, - Value: "true", - }, - } - - ret := MergeProperties(oldProps, updateProps1) - - assert.Contains(t, ret, &commonpb.KeyValuePair{ - Key: common.CollectionTTLConfigKey, - Value: "1", - }) - - assert.Contains(t, ret, &commonpb.KeyValuePair{ - Key: common.CollectionAutoCompactionKey, - Value: "true", - }) - - updateProps2 := []*commonpb.KeyValuePair{ - { - Key: common.CollectionTTLConfigKey, - Value: "2", - }, - } - ret2 := MergeProperties(ret, updateProps2) - - assert.Contains(t, ret2, &commonpb.KeyValuePair{ - Key: common.CollectionTTLConfigKey, - Value: "2", - }) - - assert.Contains(t, ret2, &commonpb.KeyValuePair{ - Key: common.CollectionAutoCompactionKey, - Value: "true", - }) - }) - - t.Run("test delete collection props", func(t *testing.T) { - oldProps := []*commonpb.KeyValuePair{ - { - Key: common.CollectionTTLConfigKey, - Value: "1", - }, - } - - deleteKeys := []string{ - common.CollectionAutoCompactionKey, - } - - ret := DeleteProperties(oldProps, deleteKeys) - - assert.Contains(t, ret, &commonpb.KeyValuePair{ - Key: common.CollectionTTLConfigKey, - Value: "1", - }) - - oldProps2 := []*commonpb.KeyValuePair{ - { - Key: common.CollectionTTLConfigKey, - Value: "1", - }, - } - - deleteKeys2 := []string{ - common.CollectionTTLConfigKey, - } - - ret2 := DeleteProperties(oldProps2, deleteKeys2) - - assert.Empty(t, ret2) - }) -} - -func TestMergeProperties(t *testing.T) { - p := MergeProperties([]*commonpb.KeyValuePair{ - { - Key: common.ReplicateIDKey, - Value: "local-test", - }, - { - Key: "foo", - Value: "xxx", - }, - }, []*commonpb.KeyValuePair{ - { - Key: common.ReplicateEndTSKey, - Value: "1001", - }, - }) - assert.Len(t, p, 3) - m := funcutil.KeyValuePair2Map(p) - assert.Equal(t, "", m[common.ReplicateIDKey]) - assert.Equal(t, "1001", m[common.ReplicateEndTSKey]) - assert.Equal(t, "xxx", m["foo"]) -} diff --git a/internal/rootcoord/create_db_task.go b/internal/rootcoord/create_db_task.go deleted file mode 100644 index 7e64ce7c3e..0000000000 --- a/internal/rootcoord/create_db_task.go +++ /dev/null @@ -1,68 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/metastore/model" - "github.com/milvus-io/milvus/internal/util/hookutil" - "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" - "github.com/milvus-io/milvus/pkg/v2/util/merr" -) - -type createDatabaseTask struct { - baseTask - Req *milvuspb.CreateDatabaseRequest - dbID UniqueID -} - -func (t *createDatabaseTask) Prepare(ctx context.Context) error { - dbs, err := t.core.meta.ListDatabases(ctx, t.GetTs()) - if err != nil { - return err - } - - cfgMaxDatabaseNum := Params.RootCoordCfg.MaxDatabaseNum.GetAsInt() - if len(dbs) >= cfgMaxDatabaseNum { - return merr.WrapErrDatabaseNumLimitExceeded(cfgMaxDatabaseNum) - } - - t.dbID, err = t.core.idAllocator.AllocOne() - if err != nil { - return err - } - - // Use dbID as ezID because the dbID is unqiue - properties, err := hookutil.TidyDBCipherProperties(t.dbID, t.Req.Properties) - if err != nil { - return err - } - - t.Req.Properties = properties - return nil -} - -func (t *createDatabaseTask) Execute(ctx context.Context) error { - db := model.NewDatabase(t.dbID, t.Req.GetDbName(), etcdpb.DatabaseState_DatabaseCreated, t.Req.GetProperties()) - return t.core.meta.CreateDatabase(ctx, db, t.GetTs()) -} - -func (t *createDatabaseTask) GetLockerKey() LockerKey { - return NewLockerKeyChain(NewClusterLockerKey(true)) -} diff --git a/internal/rootcoord/create_db_task_test.go b/internal/rootcoord/create_db_task_test.go deleted file mode 100644 index 1507d53bf8..0000000000 --- a/internal/rootcoord/create_db_task_test.go +++ /dev/null @@ -1,124 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "strconv" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/metastore/model" - mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" - "github.com/milvus-io/milvus/pkg/v2/util/merr" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" -) - -func Test_CreateDBTask_Prepare(t *testing.T) { - paramtable.Init() - t.Run("list database fail", func(t *testing.T) { - core := newTestCore(withInvalidMeta()) - task := &createDatabaseTask{ - baseTask: newBaseTask(context.TODO(), core), - Req: &milvuspb.CreateDatabaseRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreateDatabase, - }, - DbName: "db", - }, - } - err := task.Prepare(context.Background()) - assert.Error(t, err) - }) - - t.Run("check database number fail", func(t *testing.T) { - meta := mockrootcoord.NewIMetaTable(t) - cfgMaxDatabaseNum := Params.RootCoordCfg.MaxDatabaseNum.GetAsInt() - dbs := make([]*model.Database, 0, cfgMaxDatabaseNum) - for i := 0; i < cfgMaxDatabaseNum; i++ { - dbs = append(dbs, model.NewDefaultDatabase(nil)) - } - meta.On("ListDatabases", - mock.Anything, - mock.Anything). - Return(dbs, nil) - - core := newTestCore(withMeta(meta)) - - task := &createDatabaseTask{ - baseTask: newBaseTask(context.TODO(), core), - Req: &milvuspb.CreateDatabaseRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreateDatabase, - }, - DbName: "db", - }, - } - err := task.Prepare(context.Background()) - assert.ErrorIs(t, err, merr.ErrDatabaseNumLimitExceeded) - }) - - t.Run("ok", func(t *testing.T) { - meta := mockrootcoord.NewIMetaTable(t) - meta.On("ListDatabases", - mock.Anything, - mock.Anything). - Return([]*model.Database{model.NewDefaultDatabase(nil)}, nil) - - core := newTestCore(withMeta(meta), withValidIDAllocator()) - paramtable.Get().Save(Params.RootCoordCfg.MaxDatabaseNum.Key, strconv.Itoa(10)) - defer paramtable.Get().Reset(Params.RootCoordCfg.MaxDatabaseNum.Key) - - task := &createDatabaseTask{ - baseTask: newBaseTask(context.TODO(), core), - Req: &milvuspb.CreateDatabaseRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreateDatabase, - }, - DbName: "db", - }, - } - err := task.Prepare(context.Background()) - assert.NoError(t, err) - }) -} - -func Test_CreateDBTask_Execute(t *testing.T) { - meta := mockrootcoord.NewIMetaTable(t) - meta.On("CreateDatabase", - mock.Anything, - mock.Anything, - mock.Anything). - Return(nil) - - core := newTestCore(withMeta(meta)) - task := &createDatabaseTask{ - baseTask: newBaseTask(context.TODO(), core), - Req: &milvuspb.CreateDatabaseRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_CreateDatabase, - }, - DbName: "db", - }, - } - err := task.Execute(context.Background()) - assert.NoError(t, err) -} diff --git a/internal/rootcoord/ddl_callbacks.go b/internal/rootcoord/ddl_callbacks.go index b5d23dd803..286a72b380 100644 --- a/internal/rootcoord/ddl_callbacks.go +++ b/internal/rootcoord/ddl_callbacks.go @@ -36,8 +36,8 @@ func RegisterDDLCallbacks(core *Core) { ddlCallback := &DDLCallback{ Core: core, } - // RBAC ddlCallback.registerRBACCallbacks() + ddlCallback.registerDatabaseCallbacks() } // registerRBACCallbacks registers the rbac callbacks. @@ -55,6 +55,13 @@ func (c *DDLCallback) registerRBACCallbacks() { registry.RegisterRestoreRBACV2AckCallback(c.restoreRBACV2AckCallback) } +// registerDatabaseCallbacks registers the database callbacks. +func (c *DDLCallback) registerDatabaseCallbacks() { + registry.RegisterCreateDatabaseV2AckCallback(c.createDatabaseV1AckCallback) + registry.RegisterAlterDatabaseV2AckCallback(c.alterDatabaseV1AckCallback) + registry.RegisterDropDatabaseV2AckCallback(c.dropDatabaseV1AckCallback) +} + // DDLCallback is the callback of ddl. type DDLCallback struct { *Core @@ -102,3 +109,12 @@ func startBroadcastWithRBACLock(ctx context.Context) (broadcaster.BroadcastAPI, } return api, nil } + +// startBroadcastWithDatabaseLock starts a broadcast with database lock. +func startBroadcastWithDatabaseLock(ctx context.Context, dbName string) (broadcaster.BroadcastAPI, error) { + broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx, message.NewExclusiveDBNameResourceKey(dbName)) + if err != nil { + return nil, errors.Wrap(err, "failed to start broadcast with database lock") + } + return broadcaster, nil +} diff --git a/internal/rootcoord/ddl_callbacks_alter_database.go b/internal/rootcoord/ddl_callbacks_alter_database.go new file mode 100644 index 0000000000..637afdd8b2 --- /dev/null +++ b/internal/rootcoord/ddl_callbacks_alter_database.go @@ -0,0 +1,180 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "strings" + + "github.com/cockroachdb/errors" + "github.com/samber/lo" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/distributed/streaming" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/util/hookutil" + "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" + "github.com/milvus-io/milvus/pkg/v2/proto/querypb" + "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce" + "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +func (c *Core) broadcastAlterDatabase(ctx context.Context, req *rootcoordpb.AlterDatabaseRequest) error { + req.DbName = strings.TrimSpace(req.DbName) + if req.GetProperties() == nil && req.GetDeleteKeys() == nil { + return errors.New("alter database with empty properties and delete keys, expected to set either properties or delete keys") + } + + if len(req.GetProperties()) > 0 && len(req.GetDeleteKeys()) > 0 { + return errors.New("alter database cannot modify properties and delete keys at the same time") + } + + if hookutil.ContainsCipherProperties(req.GetProperties(), req.GetDeleteKeys()) { + return errors.New("can not alter cipher related properties") + } + + broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName()) + if err != nil { + return err + } + defer broadcaster.Close() + + oldDB, err := c.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp) + if err != nil { + return errors.Wrap(err, "failed to get database by name") + } + alterLoadConfig, err := c.getAlterLoadConfigOfAlterDatabase(ctx, req.GetDbName(), oldDB.Properties, req.GetProperties()) + if err != nil { + return errors.Wrap(err, "failed to get alter load config of alter database") + } + + // We only allow to alter or delete properties, not both. + var newProperties []*commonpb.KeyValuePair + if (len(req.GetProperties())) > 0 { + if IsSubsetOfProperties(req.GetProperties(), oldDB.Properties) { + log.Info("skip to alter database due to no changes were detected in the properties") + return nil + } + newProperties = MergeProperties(oldDB.Properties, req.GetProperties()) + } else if (len(req.GetDeleteKeys())) > 0 { + newProperties = DeleteProperties(oldDB.Properties, req.GetDeleteKeys()) + } + + msg := message.NewAlterDatabaseMessageBuilderV2(). + WithHeader(&message.AlterDatabaseMessageHeader{ + DbName: req.GetDbName(), + DbId: oldDB.ID, + }). + WithBody(&message.AlterDatabaseMessageBody{ + Properties: newProperties, + AlterLoadConfig: alterLoadConfig, + }). + WithBroadcast([]string{streaming.WAL().ControlChannel()}). + MustBuildBroadcast() + _, err = broadcaster.Broadcast(ctx, msg) + return err +} + +// getAlterLoadConfigOfAlterDatabase gets the put load config of put database. +func (c *Core) getAlterLoadConfigOfAlterDatabase(ctx context.Context, dbName string, oldProps []*commonpb.KeyValuePair, newProps []*commonpb.KeyValuePair) (*message.AlterLoadConfigOfAlterDatabase, error) { + oldReplicaNumber, _ := common.DatabaseLevelReplicaNumber(oldProps) + oldResourceGroups, _ := common.DatabaseLevelResourceGroups(oldProps) + newReplicaNumber, _ := common.DatabaseLevelReplicaNumber(newProps) + newResourceGroups, _ := common.DatabaseLevelResourceGroups(newProps) + left, right := lo.Difference(oldResourceGroups, newResourceGroups) + rgChanged := len(left) > 0 || len(right) > 0 + replicaChanged := oldReplicaNumber != newReplicaNumber + if !rgChanged && !replicaChanged { + return nil, nil + } + + colls, err := c.meta.ListCollections(ctx, dbName, typeutil.MaxTimestamp, true) + if err != nil { + return nil, err + } + if len(colls) == 0 { + return nil, nil + } + return &message.AlterLoadConfigOfAlterDatabase{ + CollectionIds: lo.Map(colls, func(coll *model.Collection, _ int) int64 { return coll.CollectionID }), + ReplicaNumber: int32(newReplicaNumber), + ResourceGroups: newResourceGroups, + }, nil +} + +func (c *DDLCallback) alterDatabaseV1AckCallback(ctx context.Context, result message.BroadcastResultAlterDatabaseMessageV2) error { + header := result.Message.Header() + body := result.Message.MustBody() + + db := model.NewDatabase(header.DbId, header.DbName, etcdpb.DatabaseState_DatabaseCreated, result.Message.MustBody().Properties) + if err := c.meta.AlterDatabase(ctx, db, result.GetControlChannelResult().TimeTick); err != nil { + return errors.Wrap(err, "failed to alter database") + } + if err := c.ExpireCaches(ctx, ce.NewBuilder(). + WithLegacyProxyCollectionMetaCache( + ce.OptLPCMDBName(header.DbName), + ce.OptLPCMMsgType(commonpb.MsgType_AlterDatabase), + ), + result.GetControlChannelResult().TimeTick); err != nil { + return errors.Wrap(err, "failed to expire caches") + } + if body.AlterLoadConfig != nil { + // TODO: should replaced with calling AlterLoadConfig message ack callback. + resp, err := c.mixCoord.UpdateLoadConfig(ctx, &querypb.UpdateLoadConfigRequest{ + CollectionIDs: body.AlterLoadConfig.CollectionIds, + ReplicaNumber: body.AlterLoadConfig.ReplicaNumber, + ResourceGroups: body.AlterLoadConfig.ResourceGroups, + }) + return merr.CheckRPCCall(resp, err) + } + return nil +} + +func MergeProperties(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair { + _, existEndTS := common.GetReplicateEndTS(updatedProps) + if existEndTS { + updatedProps = append(updatedProps, &commonpb.KeyValuePair{ + Key: common.ReplicateIDKey, + Value: "", + }) + } + + props := make(map[string]string) + for _, prop := range oldProps { + props[prop.Key] = prop.Value + } + + for _, prop := range updatedProps { + props[prop.Key] = prop.Value + } + + propKV := make([]*commonpb.KeyValuePair, 0) + + for key, value := range props { + propKV = append(propKV, &commonpb.KeyValuePair{ + Key: key, + Value: value, + }) + } + + return propKV +} diff --git a/internal/rootcoord/ddl_callbacks_create_database.go b/internal/rootcoord/ddl_callbacks_create_database.go new file mode 100644 index 0000000000..2f46efcf0f --- /dev/null +++ b/internal/rootcoord/ddl_callbacks_create_database.go @@ -0,0 +1,84 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "strings" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/distributed/streaming" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/util/hookutil" + "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce" +) + +func (c *Core) broadcastCreateDatabase(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error { + req.DbName = strings.TrimSpace(req.DbName) + broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.DbName) + if err != nil { + return err + } + defer broadcaster.Close() + + if err := c.meta.CheckIfDatabaseCreatable(ctx, req); err != nil { + return err + } + + dbID, err := c.idAllocator.AllocOne() + if err != nil { + return errors.Wrap(err, "failed to allocate database id") + } + + // Use dbID as ezID because the dbID is unqiue + properties, err := hookutil.TidyDBCipherProperties(dbID, req.GetProperties()) + if err != nil { + return errors.Wrap(err, "failed to tidy database cipher properties") + } + + msg := message.NewCreateDatabaseMessageBuilderV2(). + WithHeader(&message.CreateDatabaseMessageHeader{ + DbName: req.GetDbName(), + DbId: dbID, + }). + WithBody(&message.CreateDatabaseMessageBody{ + Properties: properties, + }). + WithBroadcast([]string{streaming.WAL().ControlChannel()}). + MustBuildBroadcast() + _, err = broadcaster.Broadcast(ctx, msg) + return err +} + +func (c *DDLCallback) createDatabaseV1AckCallback(ctx context.Context, result message.BroadcastResultCreateDatabaseMessageV2) error { + header := result.Message.Header() + db := model.NewDatabase(header.DbId, header.DbName, etcdpb.DatabaseState_DatabaseCreated, result.Message.MustBody().Properties) + if err := c.meta.CreateDatabase(ctx, db, result.GetControlChannelResult().TimeTick); err != nil { + return errors.Wrap(err, "failed to create database") + } + return c.ExpireCaches(ctx, ce.NewBuilder(). + WithLegacyProxyCollectionMetaCache( + ce.OptLPCMDBName(header.DbName), + ce.OptLPCMMsgType(commonpb.MsgType_DropDatabase), + ), + result.GetControlChannelResult().TimeTick) +} diff --git a/internal/rootcoord/ddl_callbacks_database_test.go b/internal/rootcoord/ddl_callbacks_database_test.go new file mode 100644 index 0000000000..5310c5f990 --- /dev/null +++ b/internal/rootcoord/ddl_callbacks_database_test.go @@ -0,0 +1,109 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" + kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" + "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" + "github.com/milvus-io/milvus/pkg/v2/util/funcutil" + "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +func TestDDLCallbacksDatabaseDDL(t *testing.T) { + initStreamingSystem() + + kv, _ := kvfactory.GetEtcdAndPath() + path := funcutil.RandomString(10) + catalogKV := etcdkv.NewEtcdKV(kv, path) + + ss, err := rootcoord.NewSuffixSnapshot(catalogKV, rootcoord.SnapshotsSep, path, rootcoord.SnapshotPrefix) + require.NoError(t, err) + core := newTestCore(withHealthyCode(), + withMeta(&MetaTable{ + catalog: rootcoord.NewCatalog(catalogKV, ss), + names: newNameDb(), + aliases: newNameDb(), + dbName2Meta: make(map[string]*model.Database), + }), + withValidProxyManager(), + withValidIDAllocator(), + ) + registry.ResetRegistration() + RegisterDDLCallbacks(core) + + // Create a new database + status, err := core.CreateDatabase(context.Background(), &milvuspb.CreateDatabaseRequest{ + DbName: "test", + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + db, err := core.meta.GetDatabaseByName(context.Background(), "test", typeutil.MaxTimestamp) + require.NoError(t, merr.CheckRPCCall(status, err)) + require.Equal(t, db.Name, "test") + require.Empty(t, db.Properties) + + // Alter a database to add properties + status, err = core.AlterDatabase(context.Background(), &rootcoordpb.AlterDatabaseRequest{ + DbName: "test", + Properties: []*commonpb.KeyValuePair{ + { + Key: "key", + Value: "value", + }, + { + Key: "key2", + Value: "value2", + }, + }, + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + db, err = core.meta.GetDatabaseByName(context.Background(), "test", typeutil.MaxTimestamp) + require.NoError(t, merr.CheckRPCCall(status, err)) + require.Equal(t, db.Name, "test") + require.Len(t, db.Properties, 2) + + // Drop a property + status, err = core.AlterDatabase(context.Background(), &rootcoordpb.AlterDatabaseRequest{ + DbName: "test", + DeleteKeys: []string{"key"}, + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + db, err = core.meta.GetDatabaseByName(context.Background(), "test", typeutil.MaxTimestamp) + require.NoError(t, merr.CheckRPCCall(status, err)) + require.Equal(t, db.Name, "test") + require.Len(t, db.Properties, 1) + + // Drop a database + status, err = core.DropDatabase(context.Background(), &milvuspb.DropDatabaseRequest{ + DbName: "test", + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + db, err = core.meta.GetDatabaseByName(context.Background(), "test", typeutil.MaxTimestamp) + require.Error(t, err) + require.Nil(t, db) +} diff --git a/internal/rootcoord/ddl_callbacks_drop_database.go b/internal/rootcoord/ddl_callbacks_drop_database.go new file mode 100644 index 0000000000..cf769f2315 --- /dev/null +++ b/internal/rootcoord/ddl_callbacks_drop_database.go @@ -0,0 +1,73 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "strings" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/distributed/streaming" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +func (c *Core) broadcastDropDatabase(ctx context.Context, req *milvuspb.DropDatabaseRequest) error { + req.DbName = strings.TrimSpace(req.DbName) + broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName()) + if err != nil { + return err + } + defer broadcaster.Close() + + if err := c.meta.CheckIfDatabaseDroppable(ctx, req); err != nil { + return err + } + + db, err := c.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp) + if err != nil { + return errors.Wrap(err, "failed to get database name") + } + + msg := message.NewDropDatabaseMessageBuilderV2(). + WithHeader(&message.DropDatabaseMessageHeader{ + DbName: req.GetDbName(), + DbId: db.ID, + }). + WithBody(&message.DropDatabaseMessageBody{}). + WithBroadcast([]string{streaming.WAL().ControlChannel()}). + MustBuildBroadcast() + _, err = broadcaster.Broadcast(ctx, msg) + return err +} + +func (c *DDLCallback) dropDatabaseV1AckCallback(ctx context.Context, result message.BroadcastResultDropDatabaseMessageV2) error { + header := result.Message.Header() + if err := c.meta.DropDatabase(ctx, header.DbName, result.GetControlChannelResult().TimeTick); err != nil { + return errors.Wrap(err, "failed to drop database") + } + return c.ExpireCaches(ctx, ce.NewBuilder(). + WithLegacyProxyCollectionMetaCache( + ce.OptLPCMDBName(header.DbName), + ce.OptLPCMMsgType(commonpb.MsgType_DropDatabase), + ), + result.GetControlChannelResult().TimeTick) +} diff --git a/internal/rootcoord/drop_db_task.go b/internal/rootcoord/drop_db_task.go deleted file mode 100644 index cd6832b769..0000000000 --- a/internal/rootcoord/drop_db_task.go +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - - "github.com/cockroachdb/errors" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/util/proxyutil" - "github.com/milvus-io/milvus/pkg/v2/util" -) - -type dropDatabaseTask struct { - baseTask - Req *milvuspb.DropDatabaseRequest -} - -func (t *dropDatabaseTask) Prepare(ctx context.Context) error { - if t.Req.GetDbName() == util.DefaultDBName { - return errors.New("can not drop default database") - } - return nil -} - -func (t *dropDatabaseTask) Execute(ctx context.Context) error { - dbName := t.Req.GetDbName() - ts := t.GetTs() - return executeDropDatabaseTaskSteps(ctx, t.core, dbName, ts) -} - -func (t *dropDatabaseTask) GetLockerKey() LockerKey { - return NewLockerKeyChain(NewClusterLockerKey(true)) -} - -func executeDropDatabaseTaskSteps(ctx context.Context, - core *Core, - dbName string, - ts Timestamp, -) error { - redoTask := newBaseRedoTask(core.stepExecutor) - redoTask.AddSyncStep(&deleteDatabaseMetaStep{ - baseStep: baseStep{core: core}, - databaseName: dbName, - ts: ts, - }) - redoTask.AddSyncStep(&expireCacheStep{ - baseStep: baseStep{core: core}, - dbName: dbName, - ts: ts, - // make sure to send the "expire cache" request - // because it won't send this request when the length of collection names array is zero - collectionNames: []string{""}, - opts: []proxyutil.ExpireCacheOpt{ - proxyutil.SetMsgType(commonpb.MsgType_DropDatabase), - }, - }) - return redoTask.Execute(ctx) -} diff --git a/internal/rootcoord/drop_db_task_test.go b/internal/rootcoord/drop_db_task_test.go deleted file mode 100644 index 42f96f9a40..0000000000 --- a/internal/rootcoord/drop_db_task_test.go +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "testing" - - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" - "github.com/milvus-io/milvus/pkg/v2/util" -) - -func Test_DropDBTask(t *testing.T) { - t.Run("normal", func(t *testing.T) { - meta := mockrootcoord.NewIMetaTable(t) - meta.On("DropDatabase", - mock.Anything, - mock.Anything, - mock.Anything). - Return(nil) - - core := newTestCore(withMeta(meta), withValidProxyManager()) - task := &dropDatabaseTask{ - baseTask: newBaseTask(context.TODO(), core), - Req: &milvuspb.DropDatabaseRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DropDatabase, - }, - DbName: "db", - }, - } - - err := task.Prepare(context.Background()) - assert.NoError(t, err) - - err = task.Execute(context.Background()) - assert.NoError(t, err) - }) - - t.Run("default db", func(t *testing.T) { - meta := mockrootcoord.NewIMetaTable(t) - core := newTestCore(withMeta(meta)) - task := &dropDatabaseTask{ - baseTask: newBaseTask(context.TODO(), core), - Req: &milvuspb.DropDatabaseRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DropDatabase, - }, - DbName: util.DefaultDBName, - }, - } - err := task.Prepare(context.Background()) - assert.Error(t, err) - }) - - t.Run("drop db fail", func(t *testing.T) { - meta := mockrootcoord.NewIMetaTable(t) - meta.EXPECT().DropDatabase( - mock.Anything, - mock.Anything, - mock.Anything). - Return(errors.New("mock drop db error")) - - core := newTestCore(withMeta(meta)) - task := &dropDatabaseTask{ - baseTask: newBaseTask(context.TODO(), core), - Req: &milvuspb.DropDatabaseRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DropDatabase, - }, - DbName: "db", - }, - } - - err := task.Execute(context.Background()) - assert.Error(t, err) - }) -} diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 4f54e81354..7211e35b2b 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -52,6 +52,9 @@ import ( type MetaTableChecker interface { RBACChecker + + CheckIfDatabaseCreatable(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error + CheckIfDatabaseDroppable(ctx context.Context, req *milvuspb.DropDatabaseRequest) error } //go:generate mockery --name=IMetaTable --structname=MockIMetaTable --output=./ --filename=mock_meta_table.go --with-expecter --inpackage @@ -63,7 +66,7 @@ type IMetaTable interface { CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) - AlterDatabase(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts typeutil.Timestamp) error + AlterDatabase(ctx context.Context, newDB *model.Database, ts typeutil.Timestamp) error AddCollection(ctx context.Context, coll *model.Collection) error ChangeCollectionState(ctx context.Context, collectionID UniqueID, state pb.CollectionState, ts Timestamp) error @@ -325,6 +328,23 @@ func (mt *MetaTable) createDefaultDb() error { return mt.createDatabasePrivate(mt.ctx, model.NewDefaultDatabase(defaultProperties), ts) } +func (mt *MetaTable) CheckIfDatabaseCreatable(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error { + dbName := req.GetDbName() + + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() + + if _, ok := mt.dbName2Meta[dbName]; ok || mt.aliases.exist(dbName) || mt.names.exist(dbName) { + return fmt.Errorf("database already exist: %s", dbName) + } + + cfgMaxDatabaseNum := Params.RootCoordCfg.MaxDatabaseNum.GetAsInt() + if len(mt.dbName2Meta) > cfgMaxDatabaseNum { // not include default database so use > instead of >= here. + return merr.WrapErrDatabaseNumLimitExceeded(cfgMaxDatabaseNum) + } + return nil +} + func (mt *MetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() @@ -338,10 +358,6 @@ func (mt *MetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts func (mt *MetaTable) createDatabasePrivate(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error { dbName := db.Name - if mt.names.exist(dbName) || mt.aliases.exist(dbName) { - return fmt.Errorf("database already exist: %s", dbName) - } - if err := mt.catalog.CreateDatabase(ctx, db, ts); err != nil { return err } @@ -359,35 +375,31 @@ func (mt *MetaTable) createDatabasePrivate(ctx context.Context, db *model.Databa return nil } -func (mt *MetaTable) AlterDatabase(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts typeutil.Timestamp) error { +func (mt *MetaTable) AlterDatabase(ctx context.Context, newDB *model.Database, ts typeutil.Timestamp) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() - if oldDB.Name != newDB.Name || oldDB.ID != newDB.ID || oldDB.State != newDB.State { - return errors.New("alter database name/id is not supported!") - } - ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue()) if err := mt.catalog.AlterDatabase(ctx1, newDB, ts); err != nil { return err } - mt.dbName2Meta[oldDB.Name] = newDB - log.Ctx(ctx).Info("alter database finished", zap.String("dbName", oldDB.Name), zap.Uint64("ts", ts)) + mt.dbName2Meta[newDB.Name] = newDB + log.Ctx(ctx).Info("alter database finished", zap.String("dbName", newDB.Name), zap.Uint64("ts", ts)) return nil } -func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error { - mt.ddLock.Lock() - defer mt.ddLock.Unlock() +func (mt *MetaTable) CheckIfDatabaseDroppable(ctx context.Context, req *milvuspb.DropDatabaseRequest) error { + dbName := req.GetDbName() + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() if dbName == util.DefaultDBName { return errors.New("can not drop default database") } - db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp) - if err != nil { + if _, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp); err != nil { log.Ctx(ctx).Warn("not found database", zap.String("db", dbName)) - return nil + return err } colls, err := mt.listCollectionFromCache(ctx, dbName, true) @@ -397,7 +409,18 @@ func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeuti if len(colls) > 0 { return fmt.Errorf("database:%s not empty, must drop all collections before drop database", dbName) } + return nil +} +func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error { + mt.ddLock.Lock() + defer mt.ddLock.Unlock() + + db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp) + if err != nil { + log.Ctx(ctx).Warn("not found database", zap.String("db", dbName)) + return nil + } if err := mt.catalog.DropDatabase(ctx, db.ID, ts); err != nil { return err } diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index 0bd824b4c3..07d5ac58dd 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -1944,10 +1944,15 @@ func TestMetaTable_CreateDatabase(t *testing.T) { db := model.NewDatabase(1, "exist", pb.DatabaseState_DatabaseCreated, nil) t.Run("database already exist", func(t *testing.T) { meta := &MetaTable{ - names: newNameDb(), + names: newNameDb(), + aliases: newNameDb(), + dbName2Meta: make(map[string]*model.Database), } meta.names.insert("exist", "collection", 100) - err := meta.CreateDatabase(context.TODO(), db, 10000) + + err := meta.CheckIfDatabaseCreatable(context.TODO(), &milvuspb.CreateDatabaseRequest{ + DbName: "exist", + }) assert.Error(t, err) }) @@ -1975,14 +1980,16 @@ func TestMetaTable_CreateDatabase(t *testing.T) { mock.Anything, ).Return(nil) meta := &MetaTable{ - dbName2Meta: map[string]*model.Database{ - "exist": db, - }, - names: newNameDb(), - aliases: newNameDb(), - catalog: catalog, + dbName2Meta: make(map[string]*model.Database), + names: newNameDb(), + aliases: newNameDb(), + catalog: catalog, } - err := meta.CreateDatabase(context.TODO(), db, 10000) + err := meta.CheckIfDatabaseCreatable(context.TODO(), &milvuspb.CreateDatabaseRequest{ + DbName: "exist", + }) + assert.NoError(t, err) + err = meta.CreateDatabase(context.TODO(), db, 10000) assert.NoError(t, err) assert.True(t, meta.names.exist("exist")) assert.True(t, meta.aliases.exist("exist")) @@ -2017,7 +2024,7 @@ func TestAlterDatabase(t *testing.T) { Value: "value1", }, } - err := meta.AlterDatabase(context.TODO(), db, newDB, typeutil.ZeroTimestamp) + err := meta.AlterDatabase(context.TODO(), newDB, typeutil.ZeroTimestamp) assert.NoError(t, err) }) @@ -2047,33 +2054,9 @@ func TestAlterDatabase(t *testing.T) { Value: "value1", }, } - err := meta.AlterDatabase(context.TODO(), db, newDB, typeutil.ZeroTimestamp) + err := meta.AlterDatabase(context.TODO(), newDB, typeutil.ZeroTimestamp) assert.ErrorIs(t, err, mockErr) }) - - t.Run("alter database name", func(t *testing.T) { - catalog := mocks.NewRootCoordCatalog(t) - db := model.NewDatabase(1, "db1", pb.DatabaseState_DatabaseCreated, nil) - - meta := &MetaTable{ - dbName2Meta: map[string]*model.Database{ - "db1": db, - }, - names: newNameDb(), - aliases: newNameDb(), - catalog: catalog, - } - newDB := db.Clone() - newDB.Name = "db2" - db.Properties = []*commonpb.KeyValuePair{ - { - Key: "key1", - Value: "value1", - }, - } - err := meta.AlterDatabase(context.TODO(), db, newDB, typeutil.ZeroTimestamp) - assert.Error(t, err) - }) } func TestMetaTable_EmtpyDatabaseName(t *testing.T) { @@ -2171,7 +2154,9 @@ func TestMetaTable_EmtpyDatabaseName(t *testing.T) { func TestMetaTable_DropDatabase(t *testing.T) { t.Run("can't drop default database", func(t *testing.T) { mt := &MetaTable{} - err := mt.DropDatabase(context.TODO(), "default", 10000) + err := mt.CheckIfDatabaseDroppable(context.TODO(), &milvuspb.DropDatabaseRequest{ + DbName: util.DefaultDBName, + }) assert.Error(t, err) }) @@ -2180,8 +2165,10 @@ func TestMetaTable_DropDatabase(t *testing.T) { names: newNameDb(), aliases: newNameDb(), } - err := mt.DropDatabase(context.TODO(), "not_exist", 10000) - assert.NoError(t, err) + err := mt.CheckIfDatabaseDroppable(context.TODO(), &milvuspb.DropDatabaseRequest{ + DbName: "not_exist", + }) + assert.True(t, errors.Is(err, merr.ErrDatabaseNotFound)) }) t.Run("database not empty", func(t *testing.T) { @@ -2200,7 +2187,9 @@ func TestMetaTable_DropDatabase(t *testing.T) { }, } mt.names.insert("not_empty", "collection", 10000000) - err := mt.DropDatabase(context.TODO(), "not_empty", 10000) + err := mt.CheckIfDatabaseDroppable(context.TODO(), &milvuspb.DropDatabaseRequest{ + DbName: "not_empty", + }) assert.Error(t, err) }) @@ -2221,7 +2210,11 @@ func TestMetaTable_DropDatabase(t *testing.T) { } mt.names.createDbIfNotExist("not_commit") mt.aliases.createDbIfNotExist("not_commit") - err := mt.DropDatabase(context.TODO(), "not_commit", 10000) + err := mt.CheckIfDatabaseDroppable(context.TODO(), &milvuspb.DropDatabaseRequest{ + DbName: "not_commit", + }) + assert.NoError(t, err) + err = mt.DropDatabase(context.TODO(), "not_commit", 10000) assert.Error(t, err) }) @@ -2242,7 +2235,11 @@ func TestMetaTable_DropDatabase(t *testing.T) { } mt.names.createDbIfNotExist("not_commit") mt.aliases.createDbIfNotExist("not_commit") - err := mt.DropDatabase(context.TODO(), "not_commit", 10000) + err := mt.CheckIfDatabaseDroppable(context.TODO(), &milvuspb.DropDatabaseRequest{ + DbName: "not_commit", + }) + assert.NoError(t, err) + err = mt.DropDatabase(context.TODO(), "not_commit", 10000) assert.NoError(t, err) assert.False(t, mt.names.exist("not_commit")) assert.False(t, mt.aliases.exist("not_commit")) diff --git a/internal/rootcoord/mocks/meta_table.go b/internal/rootcoord/mocks/meta_table.go index 214a672287..3e75f5a127 100644 --- a/internal/rootcoord/mocks/meta_table.go +++ b/internal/rootcoord/mocks/meta_table.go @@ -275,17 +275,17 @@ func (_c *IMetaTable_AlterCredential_Call) RunAndReturn(run func(context.Context return _c } -// AlterDatabase provides a mock function with given fields: ctx, oldDB, newDB, ts -func (_m *IMetaTable) AlterDatabase(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts uint64) error { - ret := _m.Called(ctx, oldDB, newDB, ts) +// AlterDatabase provides a mock function with given fields: ctx, newDB, ts +func (_m *IMetaTable) AlterDatabase(ctx context.Context, newDB *model.Database, ts uint64) error { + ret := _m.Called(ctx, newDB, ts) if len(ret) == 0 { panic("no return value specified for AlterDatabase") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *model.Database, *model.Database, uint64) error); ok { - r0 = rf(ctx, oldDB, newDB, ts) + if rf, ok := ret.Get(0).(func(context.Context, *model.Database, uint64) error); ok { + r0 = rf(ctx, newDB, ts) } else { r0 = ret.Error(0) } @@ -300,16 +300,15 @@ type IMetaTable_AlterDatabase_Call struct { // AlterDatabase is a helper method to define mock.On call // - ctx context.Context -// - oldDB *model.Database // - newDB *model.Database // - ts uint64 -func (_e *IMetaTable_Expecter) AlterDatabase(ctx interface{}, oldDB interface{}, newDB interface{}, ts interface{}) *IMetaTable_AlterDatabase_Call { - return &IMetaTable_AlterDatabase_Call{Call: _e.mock.On("AlterDatabase", ctx, oldDB, newDB, ts)} +func (_e *IMetaTable_Expecter) AlterDatabase(ctx interface{}, newDB interface{}, ts interface{}) *IMetaTable_AlterDatabase_Call { + return &IMetaTable_AlterDatabase_Call{Call: _e.mock.On("AlterDatabase", ctx, newDB, ts)} } -func (_c *IMetaTable_AlterDatabase_Call) Run(run func(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts uint64)) *IMetaTable_AlterDatabase_Call { +func (_c *IMetaTable_AlterDatabase_Call) Run(run func(ctx context.Context, newDB *model.Database, ts uint64)) *IMetaTable_AlterDatabase_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*model.Database), args[2].(*model.Database), args[3].(uint64)) + run(args[0].(context.Context), args[1].(*model.Database), args[2].(uint64)) }) return _c } @@ -319,7 +318,7 @@ func (_c *IMetaTable_AlterDatabase_Call) Return(_a0 error) *IMetaTable_AlterData return _c } -func (_c *IMetaTable_AlterDatabase_Call) RunAndReturn(run func(context.Context, *model.Database, *model.Database, uint64) error) *IMetaTable_AlterDatabase_Call { +func (_c *IMetaTable_AlterDatabase_Call) RunAndReturn(run func(context.Context, *model.Database, uint64) error) *IMetaTable_AlterDatabase_Call { _c.Call.Return(run) return _c } @@ -576,6 +575,100 @@ func (_c *IMetaTable_CheckIfCreateRole_Call) RunAndReturn(run func(context.Conte return _c } +// CheckIfDatabaseCreatable provides a mock function with given fields: ctx, req +func (_m *IMetaTable) CheckIfDatabaseCreatable(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error { + ret := _m.Called(ctx, req) + + if len(ret) == 0 { + panic("no return value specified for CheckIfDatabaseCreatable") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CreateDatabaseRequest) error); ok { + r0 = rf(ctx, req) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// IMetaTable_CheckIfDatabaseCreatable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfDatabaseCreatable' +type IMetaTable_CheckIfDatabaseCreatable_Call struct { + *mock.Call +} + +// CheckIfDatabaseCreatable is a helper method to define mock.On call +// - ctx context.Context +// - req *milvuspb.CreateDatabaseRequest +func (_e *IMetaTable_Expecter) CheckIfDatabaseCreatable(ctx interface{}, req interface{}) *IMetaTable_CheckIfDatabaseCreatable_Call { + return &IMetaTable_CheckIfDatabaseCreatable_Call{Call: _e.mock.On("CheckIfDatabaseCreatable", ctx, req)} +} + +func (_c *IMetaTable_CheckIfDatabaseCreatable_Call) Run(run func(ctx context.Context, req *milvuspb.CreateDatabaseRequest)) *IMetaTable_CheckIfDatabaseCreatable_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CreateDatabaseRequest)) + }) + return _c +} + +func (_c *IMetaTable_CheckIfDatabaseCreatable_Call) Return(_a0 error) *IMetaTable_CheckIfDatabaseCreatable_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *IMetaTable_CheckIfDatabaseCreatable_Call) RunAndReturn(run func(context.Context, *milvuspb.CreateDatabaseRequest) error) *IMetaTable_CheckIfDatabaseCreatable_Call { + _c.Call.Return(run) + return _c +} + +// CheckIfDatabaseDroppable provides a mock function with given fields: ctx, req +func (_m *IMetaTable) CheckIfDatabaseDroppable(ctx context.Context, req *milvuspb.DropDatabaseRequest) error { + ret := _m.Called(ctx, req) + + if len(ret) == 0 { + panic("no return value specified for CheckIfDatabaseDroppable") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.DropDatabaseRequest) error); ok { + r0 = rf(ctx, req) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// IMetaTable_CheckIfDatabaseDroppable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfDatabaseDroppable' +type IMetaTable_CheckIfDatabaseDroppable_Call struct { + *mock.Call +} + +// CheckIfDatabaseDroppable is a helper method to define mock.On call +// - ctx context.Context +// - req *milvuspb.DropDatabaseRequest +func (_e *IMetaTable_Expecter) CheckIfDatabaseDroppable(ctx interface{}, req interface{}) *IMetaTable_CheckIfDatabaseDroppable_Call { + return &IMetaTable_CheckIfDatabaseDroppable_Call{Call: _e.mock.On("CheckIfDatabaseDroppable", ctx, req)} +} + +func (_c *IMetaTable_CheckIfDatabaseDroppable_Call) Run(run func(ctx context.Context, req *milvuspb.DropDatabaseRequest)) *IMetaTable_CheckIfDatabaseDroppable_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.DropDatabaseRequest)) + }) + return _c +} + +func (_c *IMetaTable_CheckIfDatabaseDroppable_Call) Return(_a0 error) *IMetaTable_CheckIfDatabaseDroppable_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *IMetaTable_CheckIfDatabaseDroppable_Call) RunAndReturn(run func(context.Context, *milvuspb.DropDatabaseRequest) error) *IMetaTable_CheckIfDatabaseDroppable_Call { + _c.Call.Return(run) + return _c +} + // CheckIfDeleteCredential provides a mock function with given fields: ctx, req func (_m *IMetaTable) CheckIfDeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) error { ret := _m.Called(ctx, req) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 32fdf90660..5c385ccbbb 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -807,28 +807,12 @@ func (c *Core) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRe log.Ctx(ctx).Info("received request to create database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID())) - t := &createDatabaseTask{ - baseTask: newBaseTask(ctx, c), - Req: in, - } - - if err := c.scheduler.AddTask(t); err != nil { - log.Ctx(ctx).Info("failed to enqueue request to create database", - zap.String("role", typeutil.RootCoordRole), - zap.Error(err), - zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID())) - - metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() - return merr.Status(err), nil - } - - if err := t.WaitToFinish(); err != nil { + if err := c.broadcastCreateDatabase(ctx, in); err != nil { log.Ctx(ctx).Info("failed to create database", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("dbName", in.GetDbName()), - zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs())) - + zap.Int64("msgID", in.GetBase().GetMsgID())) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.Status(err), nil } @@ -837,7 +821,7 @@ func (c *Core) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRe metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Ctx(ctx).Info("done to create database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), - zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs())) + zap.Int64("msgID", in.GetBase().GetMsgID())) return merr.Success(), nil } @@ -853,26 +837,15 @@ func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseReques log.Ctx(ctx).Info("received request to drop database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID())) - t := &dropDatabaseTask{ - baseTask: newBaseTask(ctx, c), - Req: in, - } - - if err := c.scheduler.AddTask(t); err != nil { - log.Ctx(ctx).Info("failed to enqueue request to drop database", zap.String("role", typeutil.RootCoordRole), - zap.Error(err), - zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID())) - - metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() - return merr.Status(err), nil - } - - if err := t.WaitToFinish(); err != nil { + if err := c.broadcastDropDatabase(ctx, in); err != nil { + if errors.Is(err, merr.ErrDatabaseNotFound) { + log.Ctx(ctx).Info("drop a database that not found, ignore it", zap.String("dbName", in.GetDbName())) + return merr.Success(), nil + } log.Ctx(ctx).Info("failed to drop database", zap.String("role", typeutil.RootCoordRole), zap.Error(err), zap.String("dbName", in.GetDbName()), - zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs())) - + zap.Int64("msgID", in.GetBase().GetMsgID())) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.Status(err), nil } @@ -881,8 +854,7 @@ func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseReques metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.CleanupRootCoordDBMetrics(in.GetDbName()) log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole), - zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), - zap.Uint64("ts", t.GetTs())) + zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID())) return merr.Success(), nil } @@ -1488,27 +1460,11 @@ func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseR zap.String("name", in.GetDbName()), zap.Any("props", in.Properties)) - t := &alterDatabaseTask{ - baseTask: newBaseTask(ctx, c), - Req: in, - } - - if err := c.scheduler.AddTask(t); err != nil { - log.Warn("failed to enqueue request to alter database", - zap.String("role", typeutil.RootCoordRole), - zap.String("name", in.GetDbName()), - zap.Error(err)) - - metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() - return merr.Status(err), nil - } - - if err := t.WaitToFinish(); err != nil { + if err := c.broadcastAlterDatabase(ctx, in); err != nil { log.Warn("failed to alter database", zap.String("role", typeutil.RootCoordRole), zap.Error(err), - zap.String("name", in.GetDbName()), - zap.Uint64("ts", t.GetTs())) + zap.String("name", in.GetDbName())) metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() return merr.Status(err), nil @@ -1516,12 +1472,11 @@ func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseR metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(t.queueDur.Milliseconds())) + // metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(t.queueDur.Milliseconds())) log.Ctx(ctx).Info("done to alter database", zap.String("role", typeutil.RootCoordRole), - zap.String("name", in.GetDbName()), - zap.Uint64("ts", t.GetTs())) + zap.String("name", in.GetDbName())) return merr.Success(), nil } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 23f612c552..c5a080ffd8 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -99,35 +99,6 @@ func TestRootCoord_CreateDatabase(t *testing.T) { assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.GetErrorCode()) }) - - t.Run("failed to add task", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withInvalidScheduler()) - - ctx := context.Background() - resp, err := c.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("failed to execute", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withTaskFailScheduler()) - - ctx := context.Background() - resp, err := c.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("ok", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withValidScheduler()) - ctx := context.Background() - resp, err := c.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{}) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) } func TestRootCoord_DropDatabase(t *testing.T) { @@ -138,35 +109,6 @@ func TestRootCoord_DropDatabase(t *testing.T) { assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.GetErrorCode()) }) - - t.Run("failed to add task", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withInvalidScheduler()) - - ctx := context.Background() - resp, err := c.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("failed to execute", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withTaskFailScheduler()) - - ctx := context.Background() - resp, err := c.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("ok", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withValidScheduler()) - ctx := context.Background() - resp, err := c.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{}) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) } func TestRootCoord_ListDatabases(t *testing.T) { @@ -216,35 +158,6 @@ func TestRootCoord_AlterDatabase(t *testing.T) { assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.GetErrorCode()) }) - - t.Run("failed to add task", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withInvalidScheduler()) - - ctx := context.Background() - resp, err := c.AlterDatabase(ctx, &rootcoordpb.AlterDatabaseRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("failed to execute", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withTaskFailScheduler()) - - ctx := context.Background() - resp, err := c.AlterDatabase(ctx, &rootcoordpb.AlterDatabaseRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("ok", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withValidScheduler()) - ctx := context.Background() - resp, err := c.AlterDatabase(ctx, &rootcoordpb.AlterDatabaseRequest{}) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) } func TestRootCoord_CreateCollection(t *testing.T) { diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index ff45991162..3ea759b37b 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -553,22 +553,6 @@ func (s *WriteSchemaChangeWALStep) Desc() string { return fmt.Sprintf("write schema change WALcollectionID: %d, ts: %d", s.collection.CollectionID, s.ts) } -type AlterDatabaseStep struct { - baseStep - oldDB *model.Database - newDB *model.Database - ts Timestamp -} - -func (a *AlterDatabaseStep) Execute(ctx context.Context) ([]nestedStep, error) { - err := a.core.meta.AlterDatabase(ctx, a.oldDB, a.newDB, a.ts) - return nil, err -} - -func (a *AlterDatabaseStep) Desc() string { - return fmt.Sprintf("alter database, databaseID: %d, databaseName: %s, ts: %d", a.oldDB.ID, a.oldDB.Name, a.ts) -} - type renameCollectionStep struct { baseStep dbName string diff --git a/internal/rootcoord/task_test.go b/internal/rootcoord/task_test.go index fde3f91047..aa82dac2c6 100644 --- a/internal/rootcoord/task_test.go +++ b/internal/rootcoord/task_test.go @@ -124,15 +124,6 @@ func TestGetLockerKey(t *testing.T) { key := tt.GetLockerKey() assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|111-2-true") }) - t.Run("alter database task locker key", func(t *testing.T) { - tt := &alterDatabaseTask{ - Req: &rootcoordpb.AlterDatabaseRequest{ - DbName: "foo", - }, - } - key := tt.GetLockerKey() - assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-true") - }) t.Run("create alias task locker key", func(t *testing.T) { metaMock := mockrootcoord.NewIMetaTable(t) c := &Core{ @@ -161,15 +152,6 @@ func TestGetLockerKey(t *testing.T) { key := tt.GetLockerKey() assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|10-2-true") }) - t.Run("create database task locker key", func(t *testing.T) { - tt := &createDatabaseTask{ - Req: &milvuspb.CreateDatabaseRequest{ - DbName: "foo", - }, - } - key := tt.GetLockerKey() - assert.Equal(t, GetLockerKeyString(key), "$-0-true") - }) t.Run("create partition task locker key", func(t *testing.T) { metaMock := mockrootcoord.NewIMetaTable(t) metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything). @@ -281,15 +263,6 @@ func TestGetLockerKey(t *testing.T) { key := tt.GetLockerKey() assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|111-2-true") }) - t.Run("drop database task locker key", func(t *testing.T) { - tt := &dropDatabaseTask{ - Req: &milvuspb.DropDatabaseRequest{ - DbName: "foo", - }, - } - key := tt.GetLockerKey() - assert.Equal(t, GetLockerKeyString(key), "$-0-true") - }) t.Run("drop partition task locker key", func(t *testing.T) { metaMock := mockrootcoord.NewIMetaTable(t) metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).