enhance: support database with WAL-based DDL framework (#44822)

issue: #43897

- Database related DDL is implemented by WAL-based DDL framework now.
- Support following message type in wal CreateDatabase, AlterDatabase,
DropDatabase.
- Database DDL can be synced by new CDC now.
- Refactor some UT for Database DDL.

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-10-17 16:38:10 +08:00 committed by GitHub
parent 754997ac2b
commit 4dc75a6e2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 660 additions and 1250 deletions

View File

@ -1,246 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
type alterDatabaseTask struct {
baseTask
Req *rootcoordpb.AlterDatabaseRequest
}
func (a *alterDatabaseTask) Prepare(ctx context.Context) error {
if a.Req.GetDbName() == "" {
return errors.New("alter database failed, database name does not exists")
}
// TODO SimFG maybe it will support to alter the replica.id properties in the future when the database has no collections
// now it can't be because the latest database properties can't be notified to the querycoord and datacoord
replicateID, _ := common.GetReplicateID(a.Req.Properties)
if replicateID != "" {
colls, err := a.core.meta.ListCollections(ctx, a.Req.DbName, a.ts, true)
if err != nil {
return err
}
if len(colls) > 0 {
return errors.New("can't set replicate id on database with collections")
}
}
return nil
}
func (a *alterDatabaseTask) Execute(ctx context.Context) error {
log := log.Ctx(ctx).With(
zap.String("alterDatabaseTask", a.Req.GetDbName()),
zap.String("db", a.Req.GetDbId()),
zap.Uint64("ts", a.GetTs()))
if a.Req.GetProperties() == nil && a.Req.GetDeleteKeys() == nil {
log.Warn("alter database with empty properties and delete keys, expected to set either properties or delete keys ")
return errors.New("alter database with empty properties and delete keys, expected to set either properties or delete keys")
}
if len(a.Req.GetProperties()) > 0 && len(a.Req.GetDeleteKeys()) > 0 {
return errors.New("alter database cannot modify properties and delete keys at the same time")
}
if hookutil.ContainsCipherProperties(a.Req.GetProperties(), a.Req.GetDeleteKeys()) {
log.Info("skip to alter collection due to cipher properties were detected in the request properties")
return errors.New("can not alter cipher related properties")
}
oldDB, err := a.core.meta.GetDatabaseByName(ctx, a.Req.GetDbName(), a.GetTs())
if err != nil {
log.Warn("get database failed during changing database props")
return err
}
var newProperties []*commonpb.KeyValuePair
if (len(a.Req.GetProperties())) > 0 {
if IsSubsetOfProperties(a.Req.GetProperties(), oldDB.Properties) {
log.Info("skip to alter database due to no changes were detected in the properties")
return nil
}
newProperties = MergeProperties(oldDB.Properties, a.Req.GetProperties())
} else if (len(a.Req.GetDeleteKeys())) > 0 {
newProperties = DeleteProperties(oldDB.Properties, a.Req.GetDeleteKeys())
}
return executeAlterDatabaseTaskSteps(ctx, a.core, oldDB, oldDB.Properties, newProperties, a.GetTs())
}
func (a *alterDatabaseTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(a.Req.GetDbName(), true),
)
}
func MergeProperties(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
_, existEndTS := common.GetReplicateEndTS(updatedProps)
if existEndTS {
updatedProps = append(updatedProps, &commonpb.KeyValuePair{
Key: common.ReplicateIDKey,
Value: "",
})
}
props := make(map[string]string)
for _, prop := range oldProps {
props[prop.Key] = prop.Value
}
for _, prop := range updatedProps {
props[prop.Key] = prop.Value
}
propKV := make([]*commonpb.KeyValuePair, 0)
for key, value := range props {
propKV = append(propKV, &commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return propKV
}
func executeAlterDatabaseTaskSteps(ctx context.Context,
core *Core,
dbInfo *model.Database,
oldProperties []*commonpb.KeyValuePair,
newProperties []*commonpb.KeyValuePair,
ts Timestamp,
) error {
oldDB := dbInfo.Clone()
oldDB.Properties = oldProperties
newDB := dbInfo.Clone()
newDB.Properties = newProperties
redoTask := newBaseRedoTask(core.stepExecutor)
redoTask.AddSyncStep(&AlterDatabaseStep{
baseStep: baseStep{core: core},
oldDB: oldDB,
newDB: newDB,
ts: ts,
})
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: core},
dbName: newDB.Name,
ts: ts,
// make sure to send the "expire cache" request
// because it won't send this request when the length of collection names array is zero
collectionNames: []string{""},
opts: []proxyutil.ExpireCacheOpt{
proxyutil.SetMsgType(commonpb.MsgType_AlterDatabase),
},
})
oldReplicaNumber, _ := common.DatabaseLevelReplicaNumber(oldDB.Properties)
oldResourceGroups, _ := common.DatabaseLevelResourceGroups(oldDB.Properties)
newReplicaNumber, _ := common.DatabaseLevelReplicaNumber(newDB.Properties)
newResourceGroups, _ := common.DatabaseLevelResourceGroups(newDB.Properties)
left, right := lo.Difference(oldResourceGroups, newResourceGroups)
rgChanged := len(left) > 0 || len(right) > 0
replicaChanged := oldReplicaNumber != newReplicaNumber
if rgChanged || replicaChanged {
log.Ctx(ctx).Warn("alter database trigger update load config",
zap.Int64("dbID", oldDB.ID),
zap.Int64("oldReplicaNumber", oldReplicaNumber),
zap.Int64("newReplicaNumber", newReplicaNumber),
zap.Strings("oldResourceGroups", oldResourceGroups),
zap.Strings("newResourceGroups", newResourceGroups),
)
redoTask.AddAsyncStep(NewSimpleStep("", func(ctx context.Context) ([]nestedStep, error) {
colls, err := core.meta.ListCollections(ctx, oldDB.Name, typeutil.MaxTimestamp, true)
if err != nil {
log.Ctx(ctx).Warn("failed to trigger update load config for database", zap.Int64("dbID", oldDB.ID), zap.Error(err))
return nil, err
}
if len(colls) == 0 {
return nil, nil
}
resp, err := core.mixCoord.UpdateLoadConfig(ctx, &querypb.UpdateLoadConfigRequest{
CollectionIDs: lo.Map(colls, func(coll *model.Collection, _ int) int64 { return coll.CollectionID }),
ReplicaNumber: int32(newReplicaNumber),
ResourceGroups: newResourceGroups,
})
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Ctx(ctx).Warn("failed to trigger update load config for database", zap.Int64("dbID", oldDB.ID), zap.Error(err))
return nil, err
}
return nil, nil
}))
}
oldReplicateEnable, _ := common.IsReplicateEnabled(oldDB.Properties)
newReplicateEnable, ok := common.IsReplicateEnabled(newDB.Properties)
if ok && !newReplicateEnable && oldReplicateEnable {
replicateID, _ := common.GetReplicateID(oldDB.Properties)
redoTask.AddAsyncStep(NewSimpleStep("send replicate end msg for db", func(ctx context.Context) ([]nestedStep, error) {
msgPack := &msgstream.MsgPack{}
msg := &msgstream.ReplicateMsg{
BaseMsg: msgstream.BaseMsg{
Ctx: ctx,
BeginTimestamp: ts,
EndTimestamp: ts,
HashValues: []uint32{0},
},
ReplicateMsg: &msgpb.ReplicateMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Replicate,
Timestamp: ts,
ReplicateInfo: &commonpb.ReplicateInfo{
IsReplicate: true,
ReplicateID: replicateID,
},
},
IsEnd: true,
Database: newDB.Name,
Collection: "",
},
}
msgPack.Msgs = append(msgPack.Msgs, msg)
log.Info("send replicate end msg for db", zap.String("db", newDB.Name), zap.String("replicateID", replicateID))
return nil, core.chanTimeTick.broadcastDmlChannels(core.chanTimeTick.listDmlChannels(), msgPack)
}))
}
return redoTask.Execute(ctx)
}

View File

@ -1,379 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/model"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
)
func Test_alterDatabaseTask_Prepare(t *testing.T) {
t.Run("invalid collectionID", func(t *testing.T) {
task := &alterDatabaseTask{Req: &rootcoordpb.AlterDatabaseRequest{}}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
task := &alterDatabaseTask{
Req: &rootcoordpb.AlterDatabaseRequest{
DbName: "cn",
},
}
err := task.Prepare(context.Background())
assert.NoError(t, err)
})
t.Run("replicate id", func(t *testing.T) {
{
// no collections
meta := mockrootcoord.NewIMetaTable(t)
core := newTestCore(withMeta(meta))
meta.EXPECT().
ListCollections(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return([]*model.Collection{}, nil).
Once()
task := &alterDatabaseTask{
baseTask: newBaseTask(context.Background(), core),
Req: &rootcoordpb.AlterDatabaseRequest{
DbName: "cn",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
},
}
err := task.Prepare(context.Background())
assert.NoError(t, err)
}
{
meta := mockrootcoord.NewIMetaTable(t)
core := newTestCore(withMeta(meta))
meta.EXPECT().ListCollections(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*model.Collection{
{
Name: "foo",
},
}, nil).Once()
task := &alterDatabaseTask{
baseTask: newBaseTask(context.Background(), core),
Req: &rootcoordpb.AlterDatabaseRequest{
DbName: "cn",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
},
}
err := task.Prepare(context.Background())
assert.Error(t, err)
}
{
meta := mockrootcoord.NewIMetaTable(t)
core := newTestCore(withMeta(meta))
meta.EXPECT().ListCollections(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil, errors.New("err")).
Once()
task := &alterDatabaseTask{
baseTask: newBaseTask(context.Background(), core),
Req: &rootcoordpb.AlterDatabaseRequest{
DbName: "cn",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
},
}
err := task.Prepare(context.Background())
assert.Error(t, err)
}
})
}
func Test_alterDatabaseTask_Execute(t *testing.T) {
properties := []*commonpb.KeyValuePair{
{
Key: common.CollectionTTLConfigKey,
Value: "3600",
},
}
t.Run("properties is empty", func(t *testing.T) {
task := &alterDatabaseTask{Req: &rootcoordpb.AlterDatabaseRequest{}}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("failed to create alias", func(t *testing.T) {
core := newTestCore(withInvalidMeta())
task := &alterDatabaseTask{
baseTask: newBaseTask(context.Background(), core),
Req: &rootcoordpb.AlterDatabaseRequest{
DbName: "cn",
Properties: properties,
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
meta := mockrootcoord.NewIMetaTable(t)
properties = append(properties, &commonpb.KeyValuePair{Key: common.DatabaseForceDenyReadingKey, Value: "true"})
meta.On("GetDatabaseByName",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(&model.Database{ID: int64(1), Properties: properties}, nil).Maybe()
meta.On("AlterDatabase",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(nil).Maybe()
t.Run("alter skip due to no change", func(t *testing.T) {
core := newTestCore(withMeta(meta))
task := &alterDatabaseTask{
baseTask: newBaseTask(context.Background(), core),
Req: &rootcoordpb.AlterDatabaseRequest{
DbName: "cn",
Properties: []*commonpb.KeyValuePair{
{
Key: common.DatabaseForceDenyReadingKey,
Value: "true",
},
},
},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
})
t.Run("alter step failed", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetDatabaseByName",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(&model.Database{ID: int64(1)}, nil)
meta.On("AlterDatabase",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(errors.New("err"))
core := newTestCore(withMeta(meta))
task := &alterDatabaseTask{
baseTask: newBaseTask(context.Background(), core),
Req: &rootcoordpb.AlterDatabaseRequest{
DbName: "cn",
Properties: properties,
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("alter successfully", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetDatabaseByName",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(&model.Database{
ID: int64(1),
Name: "cn",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
},
}, nil)
meta.On("AlterDatabase",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(nil)
// the chan length should larger than 4, because newChanTimeTickSync will send 4 ts messages when execute the `broadcast` step
packChan := make(chan *msgstream.ConsumeMsgPack, 10)
ticker := newChanTimeTickSync(packChan)
ticker.addDmlChannels("by-dev-rootcoord-dml_1")
core := newTestCore(withMeta(meta), withValidProxyManager(), withTtSynchronizer(ticker))
newPros := append(properties,
&commonpb.KeyValuePair{Key: common.ReplicateEndTSKey, Value: "1000"},
)
task := &alterDatabaseTask{
baseTask: newBaseTask(context.Background(), core),
Req: &rootcoordpb.AlterDatabaseRequest{
DbName: "cn",
Properties: newPros,
},
}
unmarshalFactory := &msgstream.ProtoUDFactory{}
unmarshalDispatcher := unmarshalFactory.NewUnmarshalDispatcher()
err := task.Execute(context.Background())
assert.NoError(t, err)
time.Sleep(time.Second)
select {
case pack := <-packChan:
assert.Equal(t, commonpb.MsgType_Replicate, pack.Msgs[0].GetType())
tsMsg, err := pack.Msgs[0].Unmarshal(unmarshalDispatcher)
require.NoError(t, err)
replicateMsg := tsMsg.(*msgstream.ReplicateMsg)
assert.Equal(t, "cn", replicateMsg.ReplicateMsg.GetDatabase())
assert.True(t, replicateMsg.ReplicateMsg.GetIsEnd())
default:
assert.Fail(t, "no message sent")
}
})
t.Run("test update collection props", func(t *testing.T) {
oldProps := []*commonpb.KeyValuePair{
{
Key: common.CollectionTTLConfigKey,
Value: "1",
},
}
updateProps1 := []*commonpb.KeyValuePair{
{
Key: common.CollectionAutoCompactionKey,
Value: "true",
},
}
ret := MergeProperties(oldProps, updateProps1)
assert.Contains(t, ret, &commonpb.KeyValuePair{
Key: common.CollectionTTLConfigKey,
Value: "1",
})
assert.Contains(t, ret, &commonpb.KeyValuePair{
Key: common.CollectionAutoCompactionKey,
Value: "true",
})
updateProps2 := []*commonpb.KeyValuePair{
{
Key: common.CollectionTTLConfigKey,
Value: "2",
},
}
ret2 := MergeProperties(ret, updateProps2)
assert.Contains(t, ret2, &commonpb.KeyValuePair{
Key: common.CollectionTTLConfigKey,
Value: "2",
})
assert.Contains(t, ret2, &commonpb.KeyValuePair{
Key: common.CollectionAutoCompactionKey,
Value: "true",
})
})
t.Run("test delete collection props", func(t *testing.T) {
oldProps := []*commonpb.KeyValuePair{
{
Key: common.CollectionTTLConfigKey,
Value: "1",
},
}
deleteKeys := []string{
common.CollectionAutoCompactionKey,
}
ret := DeleteProperties(oldProps, deleteKeys)
assert.Contains(t, ret, &commonpb.KeyValuePair{
Key: common.CollectionTTLConfigKey,
Value: "1",
})
oldProps2 := []*commonpb.KeyValuePair{
{
Key: common.CollectionTTLConfigKey,
Value: "1",
},
}
deleteKeys2 := []string{
common.CollectionTTLConfigKey,
}
ret2 := DeleteProperties(oldProps2, deleteKeys2)
assert.Empty(t, ret2)
})
}
func TestMergeProperties(t *testing.T) {
p := MergeProperties([]*commonpb.KeyValuePair{
{
Key: common.ReplicateIDKey,
Value: "local-test",
},
{
Key: "foo",
Value: "xxx",
},
}, []*commonpb.KeyValuePair{
{
Key: common.ReplicateEndTSKey,
Value: "1001",
},
})
assert.Len(t, p, 3)
m := funcutil.KeyValuePair2Map(p)
assert.Equal(t, "", m[common.ReplicateIDKey])
assert.Equal(t, "1001", m[common.ReplicateEndTSKey])
assert.Equal(t, "xxx", m["foo"])
}

View File

@ -1,68 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)
type createDatabaseTask struct {
baseTask
Req *milvuspb.CreateDatabaseRequest
dbID UniqueID
}
func (t *createDatabaseTask) Prepare(ctx context.Context) error {
dbs, err := t.core.meta.ListDatabases(ctx, t.GetTs())
if err != nil {
return err
}
cfgMaxDatabaseNum := Params.RootCoordCfg.MaxDatabaseNum.GetAsInt()
if len(dbs) >= cfgMaxDatabaseNum {
return merr.WrapErrDatabaseNumLimitExceeded(cfgMaxDatabaseNum)
}
t.dbID, err = t.core.idAllocator.AllocOne()
if err != nil {
return err
}
// Use dbID as ezID because the dbID is unqiue
properties, err := hookutil.TidyDBCipherProperties(t.dbID, t.Req.Properties)
if err != nil {
return err
}
t.Req.Properties = properties
return nil
}
func (t *createDatabaseTask) Execute(ctx context.Context) error {
db := model.NewDatabase(t.dbID, t.Req.GetDbName(), etcdpb.DatabaseState_DatabaseCreated, t.Req.GetProperties())
return t.core.meta.CreateDatabase(ctx, db, t.GetTs())
}
func (t *createDatabaseTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(NewClusterLockerKey(true))
}

View File

@ -1,124 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
func Test_CreateDBTask_Prepare(t *testing.T) {
paramtable.Init()
t.Run("list database fail", func(t *testing.T) {
core := newTestCore(withInvalidMeta())
task := &createDatabaseTask{
baseTask: newBaseTask(context.TODO(), core),
Req: &milvuspb.CreateDatabaseRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateDatabase,
},
DbName: "db",
},
}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("check database number fail", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
cfgMaxDatabaseNum := Params.RootCoordCfg.MaxDatabaseNum.GetAsInt()
dbs := make([]*model.Database, 0, cfgMaxDatabaseNum)
for i := 0; i < cfgMaxDatabaseNum; i++ {
dbs = append(dbs, model.NewDefaultDatabase(nil))
}
meta.On("ListDatabases",
mock.Anything,
mock.Anything).
Return(dbs, nil)
core := newTestCore(withMeta(meta))
task := &createDatabaseTask{
baseTask: newBaseTask(context.TODO(), core),
Req: &milvuspb.CreateDatabaseRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateDatabase,
},
DbName: "db",
},
}
err := task.Prepare(context.Background())
assert.ErrorIs(t, err, merr.ErrDatabaseNumLimitExceeded)
})
t.Run("ok", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("ListDatabases",
mock.Anything,
mock.Anything).
Return([]*model.Database{model.NewDefaultDatabase(nil)}, nil)
core := newTestCore(withMeta(meta), withValidIDAllocator())
paramtable.Get().Save(Params.RootCoordCfg.MaxDatabaseNum.Key, strconv.Itoa(10))
defer paramtable.Get().Reset(Params.RootCoordCfg.MaxDatabaseNum.Key)
task := &createDatabaseTask{
baseTask: newBaseTask(context.TODO(), core),
Req: &milvuspb.CreateDatabaseRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateDatabase,
},
DbName: "db",
},
}
err := task.Prepare(context.Background())
assert.NoError(t, err)
})
}
func Test_CreateDBTask_Execute(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("CreateDatabase",
mock.Anything,
mock.Anything,
mock.Anything).
Return(nil)
core := newTestCore(withMeta(meta))
task := &createDatabaseTask{
baseTask: newBaseTask(context.TODO(), core),
Req: &milvuspb.CreateDatabaseRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreateDatabase,
},
DbName: "db",
},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
}

View File

@ -36,8 +36,8 @@ func RegisterDDLCallbacks(core *Core) {
ddlCallback := &DDLCallback{
Core: core,
}
// RBAC
ddlCallback.registerRBACCallbacks()
ddlCallback.registerDatabaseCallbacks()
}
// registerRBACCallbacks registers the rbac callbacks.
@ -55,6 +55,13 @@ func (c *DDLCallback) registerRBACCallbacks() {
registry.RegisterRestoreRBACV2AckCallback(c.restoreRBACV2AckCallback)
}
// registerDatabaseCallbacks registers the database callbacks.
func (c *DDLCallback) registerDatabaseCallbacks() {
registry.RegisterCreateDatabaseV2AckCallback(c.createDatabaseV1AckCallback)
registry.RegisterAlterDatabaseV2AckCallback(c.alterDatabaseV1AckCallback)
registry.RegisterDropDatabaseV2AckCallback(c.dropDatabaseV1AckCallback)
}
// DDLCallback is the callback of ddl.
type DDLCallback struct {
*Core
@ -102,3 +109,12 @@ func startBroadcastWithRBACLock(ctx context.Context) (broadcaster.BroadcastAPI,
}
return api, nil
}
// startBroadcastWithDatabaseLock starts a broadcast with database lock.
func startBroadcastWithDatabaseLock(ctx context.Context, dbName string) (broadcaster.BroadcastAPI, error) {
broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx, message.NewExclusiveDBNameResourceKey(dbName))
if err != nil {
return nil, errors.Wrap(err, "failed to start broadcast with database lock")
}
return broadcaster, nil
}

View File

@ -0,0 +1,180 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"strings"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func (c *Core) broadcastAlterDatabase(ctx context.Context, req *rootcoordpb.AlterDatabaseRequest) error {
req.DbName = strings.TrimSpace(req.DbName)
if req.GetProperties() == nil && req.GetDeleteKeys() == nil {
return errors.New("alter database with empty properties and delete keys, expected to set either properties or delete keys")
}
if len(req.GetProperties()) > 0 && len(req.GetDeleteKeys()) > 0 {
return errors.New("alter database cannot modify properties and delete keys at the same time")
}
if hookutil.ContainsCipherProperties(req.GetProperties(), req.GetDeleteKeys()) {
return errors.New("can not alter cipher related properties")
}
broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName())
if err != nil {
return err
}
defer broadcaster.Close()
oldDB, err := c.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp)
if err != nil {
return errors.Wrap(err, "failed to get database by name")
}
alterLoadConfig, err := c.getAlterLoadConfigOfAlterDatabase(ctx, req.GetDbName(), oldDB.Properties, req.GetProperties())
if err != nil {
return errors.Wrap(err, "failed to get alter load config of alter database")
}
// We only allow to alter or delete properties, not both.
var newProperties []*commonpb.KeyValuePair
if (len(req.GetProperties())) > 0 {
if IsSubsetOfProperties(req.GetProperties(), oldDB.Properties) {
log.Info("skip to alter database due to no changes were detected in the properties")
return nil
}
newProperties = MergeProperties(oldDB.Properties, req.GetProperties())
} else if (len(req.GetDeleteKeys())) > 0 {
newProperties = DeleteProperties(oldDB.Properties, req.GetDeleteKeys())
}
msg := message.NewAlterDatabaseMessageBuilderV2().
WithHeader(&message.AlterDatabaseMessageHeader{
DbName: req.GetDbName(),
DbId: oldDB.ID,
}).
WithBody(&message.AlterDatabaseMessageBody{
Properties: newProperties,
AlterLoadConfig: alterLoadConfig,
}).
WithBroadcast([]string{streaming.WAL().ControlChannel()}).
MustBuildBroadcast()
_, err = broadcaster.Broadcast(ctx, msg)
return err
}
// getAlterLoadConfigOfAlterDatabase gets the put load config of put database.
func (c *Core) getAlterLoadConfigOfAlterDatabase(ctx context.Context, dbName string, oldProps []*commonpb.KeyValuePair, newProps []*commonpb.KeyValuePair) (*message.AlterLoadConfigOfAlterDatabase, error) {
oldReplicaNumber, _ := common.DatabaseLevelReplicaNumber(oldProps)
oldResourceGroups, _ := common.DatabaseLevelResourceGroups(oldProps)
newReplicaNumber, _ := common.DatabaseLevelReplicaNumber(newProps)
newResourceGroups, _ := common.DatabaseLevelResourceGroups(newProps)
left, right := lo.Difference(oldResourceGroups, newResourceGroups)
rgChanged := len(left) > 0 || len(right) > 0
replicaChanged := oldReplicaNumber != newReplicaNumber
if !rgChanged && !replicaChanged {
return nil, nil
}
colls, err := c.meta.ListCollections(ctx, dbName, typeutil.MaxTimestamp, true)
if err != nil {
return nil, err
}
if len(colls) == 0 {
return nil, nil
}
return &message.AlterLoadConfigOfAlterDatabase{
CollectionIds: lo.Map(colls, func(coll *model.Collection, _ int) int64 { return coll.CollectionID }),
ReplicaNumber: int32(newReplicaNumber),
ResourceGroups: newResourceGroups,
}, nil
}
func (c *DDLCallback) alterDatabaseV1AckCallback(ctx context.Context, result message.BroadcastResultAlterDatabaseMessageV2) error {
header := result.Message.Header()
body := result.Message.MustBody()
db := model.NewDatabase(header.DbId, header.DbName, etcdpb.DatabaseState_DatabaseCreated, result.Message.MustBody().Properties)
if err := c.meta.AlterDatabase(ctx, db, result.GetControlChannelResult().TimeTick); err != nil {
return errors.Wrap(err, "failed to alter database")
}
if err := c.ExpireCaches(ctx, ce.NewBuilder().
WithLegacyProxyCollectionMetaCache(
ce.OptLPCMDBName(header.DbName),
ce.OptLPCMMsgType(commonpb.MsgType_AlterDatabase),
),
result.GetControlChannelResult().TimeTick); err != nil {
return errors.Wrap(err, "failed to expire caches")
}
if body.AlterLoadConfig != nil {
// TODO: should replaced with calling AlterLoadConfig message ack callback.
resp, err := c.mixCoord.UpdateLoadConfig(ctx, &querypb.UpdateLoadConfigRequest{
CollectionIDs: body.AlterLoadConfig.CollectionIds,
ReplicaNumber: body.AlterLoadConfig.ReplicaNumber,
ResourceGroups: body.AlterLoadConfig.ResourceGroups,
})
return merr.CheckRPCCall(resp, err)
}
return nil
}
func MergeProperties(oldProps, updatedProps []*commonpb.KeyValuePair) []*commonpb.KeyValuePair {
_, existEndTS := common.GetReplicateEndTS(updatedProps)
if existEndTS {
updatedProps = append(updatedProps, &commonpb.KeyValuePair{
Key: common.ReplicateIDKey,
Value: "",
})
}
props := make(map[string]string)
for _, prop := range oldProps {
props[prop.Key] = prop.Value
}
for _, prop := range updatedProps {
props[prop.Key] = prop.Value
}
propKV := make([]*commonpb.KeyValuePair, 0)
for key, value := range props {
propKV = append(propKV, &commonpb.KeyValuePair{
Key: key,
Value: value,
})
}
return propKV
}

View File

@ -0,0 +1,84 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"strings"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
)
func (c *Core) broadcastCreateDatabase(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error {
req.DbName = strings.TrimSpace(req.DbName)
broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.DbName)
if err != nil {
return err
}
defer broadcaster.Close()
if err := c.meta.CheckIfDatabaseCreatable(ctx, req); err != nil {
return err
}
dbID, err := c.idAllocator.AllocOne()
if err != nil {
return errors.Wrap(err, "failed to allocate database id")
}
// Use dbID as ezID because the dbID is unqiue
properties, err := hookutil.TidyDBCipherProperties(dbID, req.GetProperties())
if err != nil {
return errors.Wrap(err, "failed to tidy database cipher properties")
}
msg := message.NewCreateDatabaseMessageBuilderV2().
WithHeader(&message.CreateDatabaseMessageHeader{
DbName: req.GetDbName(),
DbId: dbID,
}).
WithBody(&message.CreateDatabaseMessageBody{
Properties: properties,
}).
WithBroadcast([]string{streaming.WAL().ControlChannel()}).
MustBuildBroadcast()
_, err = broadcaster.Broadcast(ctx, msg)
return err
}
func (c *DDLCallback) createDatabaseV1AckCallback(ctx context.Context, result message.BroadcastResultCreateDatabaseMessageV2) error {
header := result.Message.Header()
db := model.NewDatabase(header.DbId, header.DbName, etcdpb.DatabaseState_DatabaseCreated, result.Message.MustBody().Properties)
if err := c.meta.CreateDatabase(ctx, db, result.GetControlChannelResult().TimeTick); err != nil {
return errors.Wrap(err, "failed to create database")
}
return c.ExpireCaches(ctx, ce.NewBuilder().
WithLegacyProxyCollectionMetaCache(
ce.OptLPCMDBName(header.DbName),
ce.OptLPCMMsgType(commonpb.MsgType_DropDatabase),
),
result.GetControlChannelResult().TimeTick)
}

View File

@ -0,0 +1,109 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func TestDDLCallbacksDatabaseDDL(t *testing.T) {
initStreamingSystem()
kv, _ := kvfactory.GetEtcdAndPath()
path := funcutil.RandomString(10)
catalogKV := etcdkv.NewEtcdKV(kv, path)
ss, err := rootcoord.NewSuffixSnapshot(catalogKV, rootcoord.SnapshotsSep, path, rootcoord.SnapshotPrefix)
require.NoError(t, err)
core := newTestCore(withHealthyCode(),
withMeta(&MetaTable{
catalog: rootcoord.NewCatalog(catalogKV, ss),
names: newNameDb(),
aliases: newNameDb(),
dbName2Meta: make(map[string]*model.Database),
}),
withValidProxyManager(),
withValidIDAllocator(),
)
registry.ResetRegistration()
RegisterDDLCallbacks(core)
// Create a new database
status, err := core.CreateDatabase(context.Background(), &milvuspb.CreateDatabaseRequest{
DbName: "test",
})
require.NoError(t, merr.CheckRPCCall(status, err))
db, err := core.meta.GetDatabaseByName(context.Background(), "test", typeutil.MaxTimestamp)
require.NoError(t, merr.CheckRPCCall(status, err))
require.Equal(t, db.Name, "test")
require.Empty(t, db.Properties)
// Alter a database to add properties
status, err = core.AlterDatabase(context.Background(), &rootcoordpb.AlterDatabaseRequest{
DbName: "test",
Properties: []*commonpb.KeyValuePair{
{
Key: "key",
Value: "value",
},
{
Key: "key2",
Value: "value2",
},
},
})
require.NoError(t, merr.CheckRPCCall(status, err))
db, err = core.meta.GetDatabaseByName(context.Background(), "test", typeutil.MaxTimestamp)
require.NoError(t, merr.CheckRPCCall(status, err))
require.Equal(t, db.Name, "test")
require.Len(t, db.Properties, 2)
// Drop a property
status, err = core.AlterDatabase(context.Background(), &rootcoordpb.AlterDatabaseRequest{
DbName: "test",
DeleteKeys: []string{"key"},
})
require.NoError(t, merr.CheckRPCCall(status, err))
db, err = core.meta.GetDatabaseByName(context.Background(), "test", typeutil.MaxTimestamp)
require.NoError(t, merr.CheckRPCCall(status, err))
require.Equal(t, db.Name, "test")
require.Len(t, db.Properties, 1)
// Drop a database
status, err = core.DropDatabase(context.Background(), &milvuspb.DropDatabaseRequest{
DbName: "test",
})
require.NoError(t, merr.CheckRPCCall(status, err))
db, err = core.meta.GetDatabaseByName(context.Background(), "test", typeutil.MaxTimestamp)
require.Error(t, err)
require.Nil(t, db)
}

View File

@ -0,0 +1,73 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"strings"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func (c *Core) broadcastDropDatabase(ctx context.Context, req *milvuspb.DropDatabaseRequest) error {
req.DbName = strings.TrimSpace(req.DbName)
broadcaster, err := startBroadcastWithDatabaseLock(ctx, req.GetDbName())
if err != nil {
return err
}
defer broadcaster.Close()
if err := c.meta.CheckIfDatabaseDroppable(ctx, req); err != nil {
return err
}
db, err := c.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp)
if err != nil {
return errors.Wrap(err, "failed to get database name")
}
msg := message.NewDropDatabaseMessageBuilderV2().
WithHeader(&message.DropDatabaseMessageHeader{
DbName: req.GetDbName(),
DbId: db.ID,
}).
WithBody(&message.DropDatabaseMessageBody{}).
WithBroadcast([]string{streaming.WAL().ControlChannel()}).
MustBuildBroadcast()
_, err = broadcaster.Broadcast(ctx, msg)
return err
}
func (c *DDLCallback) dropDatabaseV1AckCallback(ctx context.Context, result message.BroadcastResultDropDatabaseMessageV2) error {
header := result.Message.Header()
if err := c.meta.DropDatabase(ctx, header.DbName, result.GetControlChannelResult().TimeTick); err != nil {
return errors.Wrap(err, "failed to drop database")
}
return c.ExpireCaches(ctx, ce.NewBuilder().
WithLegacyProxyCollectionMetaCache(
ce.OptLPCMDBName(header.DbName),
ce.OptLPCMMsgType(commonpb.MsgType_DropDatabase),
),
result.GetControlChannelResult().TimeTick)
}

View File

@ -1,75 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/v2/util"
)
type dropDatabaseTask struct {
baseTask
Req *milvuspb.DropDatabaseRequest
}
func (t *dropDatabaseTask) Prepare(ctx context.Context) error {
if t.Req.GetDbName() == util.DefaultDBName {
return errors.New("can not drop default database")
}
return nil
}
func (t *dropDatabaseTask) Execute(ctx context.Context) error {
dbName := t.Req.GetDbName()
ts := t.GetTs()
return executeDropDatabaseTaskSteps(ctx, t.core, dbName, ts)
}
func (t *dropDatabaseTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(NewClusterLockerKey(true))
}
func executeDropDatabaseTaskSteps(ctx context.Context,
core *Core,
dbName string,
ts Timestamp,
) error {
redoTask := newBaseRedoTask(core.stepExecutor)
redoTask.AddSyncStep(&deleteDatabaseMetaStep{
baseStep: baseStep{core: core},
databaseName: dbName,
ts: ts,
})
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: core},
dbName: dbName,
ts: ts,
// make sure to send the "expire cache" request
// because it won't send this request when the length of collection names array is zero
collectionNames: []string{""},
opts: []proxyutil.ExpireCacheOpt{
proxyutil.SetMsgType(commonpb.MsgType_DropDatabase),
},
})
return redoTask.Execute(ctx)
}

View File

@ -1,98 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/pkg/v2/util"
)
func Test_DropDBTask(t *testing.T) {
t.Run("normal", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("DropDatabase",
mock.Anything,
mock.Anything,
mock.Anything).
Return(nil)
core := newTestCore(withMeta(meta), withValidProxyManager())
task := &dropDatabaseTask{
baseTask: newBaseTask(context.TODO(), core),
Req: &milvuspb.DropDatabaseRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropDatabase,
},
DbName: "db",
},
}
err := task.Prepare(context.Background())
assert.NoError(t, err)
err = task.Execute(context.Background())
assert.NoError(t, err)
})
t.Run("default db", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
core := newTestCore(withMeta(meta))
task := &dropDatabaseTask{
baseTask: newBaseTask(context.TODO(), core),
Req: &milvuspb.DropDatabaseRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropDatabase,
},
DbName: util.DefaultDBName,
},
}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("drop db fail", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.EXPECT().DropDatabase(
mock.Anything,
mock.Anything,
mock.Anything).
Return(errors.New("mock drop db error"))
core := newTestCore(withMeta(meta))
task := &dropDatabaseTask{
baseTask: newBaseTask(context.TODO(), core),
Req: &milvuspb.DropDatabaseRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropDatabase,
},
DbName: "db",
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
}

View File

@ -52,6 +52,9 @@ import (
type MetaTableChecker interface {
RBACChecker
CheckIfDatabaseCreatable(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error
CheckIfDatabaseDroppable(ctx context.Context, req *milvuspb.DropDatabaseRequest) error
}
//go:generate mockery --name=IMetaTable --structname=MockIMetaTable --output=./ --filename=mock_meta_table.go --with-expecter --inpackage
@ -63,7 +66,7 @@ type IMetaTable interface {
CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error
DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error
ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error)
AlterDatabase(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts typeutil.Timestamp) error
AlterDatabase(ctx context.Context, newDB *model.Database, ts typeutil.Timestamp) error
AddCollection(ctx context.Context, coll *model.Collection) error
ChangeCollectionState(ctx context.Context, collectionID UniqueID, state pb.CollectionState, ts Timestamp) error
@ -325,6 +328,23 @@ func (mt *MetaTable) createDefaultDb() error {
return mt.createDatabasePrivate(mt.ctx, model.NewDefaultDatabase(defaultProperties), ts)
}
func (mt *MetaTable) CheckIfDatabaseCreatable(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error {
dbName := req.GetDbName()
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
if _, ok := mt.dbName2Meta[dbName]; ok || mt.aliases.exist(dbName) || mt.names.exist(dbName) {
return fmt.Errorf("database already exist: %s", dbName)
}
cfgMaxDatabaseNum := Params.RootCoordCfg.MaxDatabaseNum.GetAsInt()
if len(mt.dbName2Meta) > cfgMaxDatabaseNum { // not include default database so use > instead of >= here.
return merr.WrapErrDatabaseNumLimitExceeded(cfgMaxDatabaseNum)
}
return nil
}
func (mt *MetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -338,10 +358,6 @@ func (mt *MetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts
func (mt *MetaTable) createDatabasePrivate(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error {
dbName := db.Name
if mt.names.exist(dbName) || mt.aliases.exist(dbName) {
return fmt.Errorf("database already exist: %s", dbName)
}
if err := mt.catalog.CreateDatabase(ctx, db, ts); err != nil {
return err
}
@ -359,35 +375,31 @@ func (mt *MetaTable) createDatabasePrivate(ctx context.Context, db *model.Databa
return nil
}
func (mt *MetaTable) AlterDatabase(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts typeutil.Timestamp) error {
func (mt *MetaTable) AlterDatabase(ctx context.Context, newDB *model.Database, ts typeutil.Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
if oldDB.Name != newDB.Name || oldDB.ID != newDB.ID || oldDB.State != newDB.State {
return errors.New("alter database name/id is not supported!")
}
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
if err := mt.catalog.AlterDatabase(ctx1, newDB, ts); err != nil {
return err
}
mt.dbName2Meta[oldDB.Name] = newDB
log.Ctx(ctx).Info("alter database finished", zap.String("dbName", oldDB.Name), zap.Uint64("ts", ts))
mt.dbName2Meta[newDB.Name] = newDB
log.Ctx(ctx).Info("alter database finished", zap.String("dbName", newDB.Name), zap.Uint64("ts", ts))
return nil
}
func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
func (mt *MetaTable) CheckIfDatabaseDroppable(ctx context.Context, req *milvuspb.DropDatabaseRequest) error {
dbName := req.GetDbName()
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
if dbName == util.DefaultDBName {
return errors.New("can not drop default database")
}
db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp)
if err != nil {
if _, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp); err != nil {
log.Ctx(ctx).Warn("not found database", zap.String("db", dbName))
return nil
return err
}
colls, err := mt.listCollectionFromCache(ctx, dbName, true)
@ -397,7 +409,18 @@ func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeuti
if len(colls) > 0 {
return fmt.Errorf("database:%s not empty, must drop all collections before drop database", dbName)
}
return nil
}
func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeutil.Timestamp) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp)
if err != nil {
log.Ctx(ctx).Warn("not found database", zap.String("db", dbName))
return nil
}
if err := mt.catalog.DropDatabase(ctx, db.ID, ts); err != nil {
return err
}

View File

@ -1944,10 +1944,15 @@ func TestMetaTable_CreateDatabase(t *testing.T) {
db := model.NewDatabase(1, "exist", pb.DatabaseState_DatabaseCreated, nil)
t.Run("database already exist", func(t *testing.T) {
meta := &MetaTable{
names: newNameDb(),
names: newNameDb(),
aliases: newNameDb(),
dbName2Meta: make(map[string]*model.Database),
}
meta.names.insert("exist", "collection", 100)
err := meta.CreateDatabase(context.TODO(), db, 10000)
err := meta.CheckIfDatabaseCreatable(context.TODO(), &milvuspb.CreateDatabaseRequest{
DbName: "exist",
})
assert.Error(t, err)
})
@ -1975,14 +1980,16 @@ func TestMetaTable_CreateDatabase(t *testing.T) {
mock.Anything,
).Return(nil)
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
"exist": db,
},
names: newNameDb(),
aliases: newNameDb(),
catalog: catalog,
dbName2Meta: make(map[string]*model.Database),
names: newNameDb(),
aliases: newNameDb(),
catalog: catalog,
}
err := meta.CreateDatabase(context.TODO(), db, 10000)
err := meta.CheckIfDatabaseCreatable(context.TODO(), &milvuspb.CreateDatabaseRequest{
DbName: "exist",
})
assert.NoError(t, err)
err = meta.CreateDatabase(context.TODO(), db, 10000)
assert.NoError(t, err)
assert.True(t, meta.names.exist("exist"))
assert.True(t, meta.aliases.exist("exist"))
@ -2017,7 +2024,7 @@ func TestAlterDatabase(t *testing.T) {
Value: "value1",
},
}
err := meta.AlterDatabase(context.TODO(), db, newDB, typeutil.ZeroTimestamp)
err := meta.AlterDatabase(context.TODO(), newDB, typeutil.ZeroTimestamp)
assert.NoError(t, err)
})
@ -2047,33 +2054,9 @@ func TestAlterDatabase(t *testing.T) {
Value: "value1",
},
}
err := meta.AlterDatabase(context.TODO(), db, newDB, typeutil.ZeroTimestamp)
err := meta.AlterDatabase(context.TODO(), newDB, typeutil.ZeroTimestamp)
assert.ErrorIs(t, err, mockErr)
})
t.Run("alter database name", func(t *testing.T) {
catalog := mocks.NewRootCoordCatalog(t)
db := model.NewDatabase(1, "db1", pb.DatabaseState_DatabaseCreated, nil)
meta := &MetaTable{
dbName2Meta: map[string]*model.Database{
"db1": db,
},
names: newNameDb(),
aliases: newNameDb(),
catalog: catalog,
}
newDB := db.Clone()
newDB.Name = "db2"
db.Properties = []*commonpb.KeyValuePair{
{
Key: "key1",
Value: "value1",
},
}
err := meta.AlterDatabase(context.TODO(), db, newDB, typeutil.ZeroTimestamp)
assert.Error(t, err)
})
}
func TestMetaTable_EmtpyDatabaseName(t *testing.T) {
@ -2171,7 +2154,9 @@ func TestMetaTable_EmtpyDatabaseName(t *testing.T) {
func TestMetaTable_DropDatabase(t *testing.T) {
t.Run("can't drop default database", func(t *testing.T) {
mt := &MetaTable{}
err := mt.DropDatabase(context.TODO(), "default", 10000)
err := mt.CheckIfDatabaseDroppable(context.TODO(), &milvuspb.DropDatabaseRequest{
DbName: util.DefaultDBName,
})
assert.Error(t, err)
})
@ -2180,8 +2165,10 @@ func TestMetaTable_DropDatabase(t *testing.T) {
names: newNameDb(),
aliases: newNameDb(),
}
err := mt.DropDatabase(context.TODO(), "not_exist", 10000)
assert.NoError(t, err)
err := mt.CheckIfDatabaseDroppable(context.TODO(), &milvuspb.DropDatabaseRequest{
DbName: "not_exist",
})
assert.True(t, errors.Is(err, merr.ErrDatabaseNotFound))
})
t.Run("database not empty", func(t *testing.T) {
@ -2200,7 +2187,9 @@ func TestMetaTable_DropDatabase(t *testing.T) {
},
}
mt.names.insert("not_empty", "collection", 10000000)
err := mt.DropDatabase(context.TODO(), "not_empty", 10000)
err := mt.CheckIfDatabaseDroppable(context.TODO(), &milvuspb.DropDatabaseRequest{
DbName: "not_empty",
})
assert.Error(t, err)
})
@ -2221,7 +2210,11 @@ func TestMetaTable_DropDatabase(t *testing.T) {
}
mt.names.createDbIfNotExist("not_commit")
mt.aliases.createDbIfNotExist("not_commit")
err := mt.DropDatabase(context.TODO(), "not_commit", 10000)
err := mt.CheckIfDatabaseDroppable(context.TODO(), &milvuspb.DropDatabaseRequest{
DbName: "not_commit",
})
assert.NoError(t, err)
err = mt.DropDatabase(context.TODO(), "not_commit", 10000)
assert.Error(t, err)
})
@ -2242,7 +2235,11 @@ func TestMetaTable_DropDatabase(t *testing.T) {
}
mt.names.createDbIfNotExist("not_commit")
mt.aliases.createDbIfNotExist("not_commit")
err := mt.DropDatabase(context.TODO(), "not_commit", 10000)
err := mt.CheckIfDatabaseDroppable(context.TODO(), &milvuspb.DropDatabaseRequest{
DbName: "not_commit",
})
assert.NoError(t, err)
err = mt.DropDatabase(context.TODO(), "not_commit", 10000)
assert.NoError(t, err)
assert.False(t, mt.names.exist("not_commit"))
assert.False(t, mt.aliases.exist("not_commit"))

View File

@ -275,17 +275,17 @@ func (_c *IMetaTable_AlterCredential_Call) RunAndReturn(run func(context.Context
return _c
}
// AlterDatabase provides a mock function with given fields: ctx, oldDB, newDB, ts
func (_m *IMetaTable) AlterDatabase(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts uint64) error {
ret := _m.Called(ctx, oldDB, newDB, ts)
// AlterDatabase provides a mock function with given fields: ctx, newDB, ts
func (_m *IMetaTable) AlterDatabase(ctx context.Context, newDB *model.Database, ts uint64) error {
ret := _m.Called(ctx, newDB, ts)
if len(ret) == 0 {
panic("no return value specified for AlterDatabase")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *model.Database, *model.Database, uint64) error); ok {
r0 = rf(ctx, oldDB, newDB, ts)
if rf, ok := ret.Get(0).(func(context.Context, *model.Database, uint64) error); ok {
r0 = rf(ctx, newDB, ts)
} else {
r0 = ret.Error(0)
}
@ -300,16 +300,15 @@ type IMetaTable_AlterDatabase_Call struct {
// AlterDatabase is a helper method to define mock.On call
// - ctx context.Context
// - oldDB *model.Database
// - newDB *model.Database
// - ts uint64
func (_e *IMetaTable_Expecter) AlterDatabase(ctx interface{}, oldDB interface{}, newDB interface{}, ts interface{}) *IMetaTable_AlterDatabase_Call {
return &IMetaTable_AlterDatabase_Call{Call: _e.mock.On("AlterDatabase", ctx, oldDB, newDB, ts)}
func (_e *IMetaTable_Expecter) AlterDatabase(ctx interface{}, newDB interface{}, ts interface{}) *IMetaTable_AlterDatabase_Call {
return &IMetaTable_AlterDatabase_Call{Call: _e.mock.On("AlterDatabase", ctx, newDB, ts)}
}
func (_c *IMetaTable_AlterDatabase_Call) Run(run func(ctx context.Context, oldDB *model.Database, newDB *model.Database, ts uint64)) *IMetaTable_AlterDatabase_Call {
func (_c *IMetaTable_AlterDatabase_Call) Run(run func(ctx context.Context, newDB *model.Database, ts uint64)) *IMetaTable_AlterDatabase_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*model.Database), args[2].(*model.Database), args[3].(uint64))
run(args[0].(context.Context), args[1].(*model.Database), args[2].(uint64))
})
return _c
}
@ -319,7 +318,7 @@ func (_c *IMetaTable_AlterDatabase_Call) Return(_a0 error) *IMetaTable_AlterData
return _c
}
func (_c *IMetaTable_AlterDatabase_Call) RunAndReturn(run func(context.Context, *model.Database, *model.Database, uint64) error) *IMetaTable_AlterDatabase_Call {
func (_c *IMetaTable_AlterDatabase_Call) RunAndReturn(run func(context.Context, *model.Database, uint64) error) *IMetaTable_AlterDatabase_Call {
_c.Call.Return(run)
return _c
}
@ -576,6 +575,100 @@ func (_c *IMetaTable_CheckIfCreateRole_Call) RunAndReturn(run func(context.Conte
return _c
}
// CheckIfDatabaseCreatable provides a mock function with given fields: ctx, req
func (_m *IMetaTable) CheckIfDatabaseCreatable(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error {
ret := _m.Called(ctx, req)
if len(ret) == 0 {
panic("no return value specified for CheckIfDatabaseCreatable")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CreateDatabaseRequest) error); ok {
r0 = rf(ctx, req)
} else {
r0 = ret.Error(0)
}
return r0
}
// IMetaTable_CheckIfDatabaseCreatable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfDatabaseCreatable'
type IMetaTable_CheckIfDatabaseCreatable_Call struct {
*mock.Call
}
// CheckIfDatabaseCreatable is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.CreateDatabaseRequest
func (_e *IMetaTable_Expecter) CheckIfDatabaseCreatable(ctx interface{}, req interface{}) *IMetaTable_CheckIfDatabaseCreatable_Call {
return &IMetaTable_CheckIfDatabaseCreatable_Call{Call: _e.mock.On("CheckIfDatabaseCreatable", ctx, req)}
}
func (_c *IMetaTable_CheckIfDatabaseCreatable_Call) Run(run func(ctx context.Context, req *milvuspb.CreateDatabaseRequest)) *IMetaTable_CheckIfDatabaseCreatable_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.CreateDatabaseRequest))
})
return _c
}
func (_c *IMetaTable_CheckIfDatabaseCreatable_Call) Return(_a0 error) *IMetaTable_CheckIfDatabaseCreatable_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *IMetaTable_CheckIfDatabaseCreatable_Call) RunAndReturn(run func(context.Context, *milvuspb.CreateDatabaseRequest) error) *IMetaTable_CheckIfDatabaseCreatable_Call {
_c.Call.Return(run)
return _c
}
// CheckIfDatabaseDroppable provides a mock function with given fields: ctx, req
func (_m *IMetaTable) CheckIfDatabaseDroppable(ctx context.Context, req *milvuspb.DropDatabaseRequest) error {
ret := _m.Called(ctx, req)
if len(ret) == 0 {
panic("no return value specified for CheckIfDatabaseDroppable")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.DropDatabaseRequest) error); ok {
r0 = rf(ctx, req)
} else {
r0 = ret.Error(0)
}
return r0
}
// IMetaTable_CheckIfDatabaseDroppable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfDatabaseDroppable'
type IMetaTable_CheckIfDatabaseDroppable_Call struct {
*mock.Call
}
// CheckIfDatabaseDroppable is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.DropDatabaseRequest
func (_e *IMetaTable_Expecter) CheckIfDatabaseDroppable(ctx interface{}, req interface{}) *IMetaTable_CheckIfDatabaseDroppable_Call {
return &IMetaTable_CheckIfDatabaseDroppable_Call{Call: _e.mock.On("CheckIfDatabaseDroppable", ctx, req)}
}
func (_c *IMetaTable_CheckIfDatabaseDroppable_Call) Run(run func(ctx context.Context, req *milvuspb.DropDatabaseRequest)) *IMetaTable_CheckIfDatabaseDroppable_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.DropDatabaseRequest))
})
return _c
}
func (_c *IMetaTable_CheckIfDatabaseDroppable_Call) Return(_a0 error) *IMetaTable_CheckIfDatabaseDroppable_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *IMetaTable_CheckIfDatabaseDroppable_Call) RunAndReturn(run func(context.Context, *milvuspb.DropDatabaseRequest) error) *IMetaTable_CheckIfDatabaseDroppable_Call {
_c.Call.Return(run)
return _c
}
// CheckIfDeleteCredential provides a mock function with given fields: ctx, req
func (_m *IMetaTable) CheckIfDeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) error {
ret := _m.Called(ctx, req)

View File

@ -807,28 +807,12 @@ func (c *Core) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRe
log.Ctx(ctx).Info("received request to create database", zap.String("role", typeutil.RootCoordRole),
zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
t := &createDatabaseTask{
baseTask: newBaseTask(ctx, c),
Req: in,
}
if err := c.scheduler.AddTask(t); err != nil {
log.Ctx(ctx).Info("failed to enqueue request to create database",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if err := t.WaitToFinish(); err != nil {
if err := c.broadcastCreateDatabase(ctx, in); err != nil {
log.Ctx(ctx).Info("failed to create database",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("dbName", in.GetDbName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.Int64("msgID", in.GetBase().GetMsgID()))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return merr.Status(err), nil
}
@ -837,7 +821,7 @@ func (c *Core) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRe
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Ctx(ctx).Info("done to create database", zap.String("role", typeutil.RootCoordRole),
zap.String("dbName", in.GetDbName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.Int64("msgID", in.GetBase().GetMsgID()))
return merr.Success(), nil
}
@ -853,26 +837,15 @@ func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseReques
log.Ctx(ctx).Info("received request to drop database", zap.String("role", typeutil.RootCoordRole),
zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
t := &dropDatabaseTask{
baseTask: newBaseTask(ctx, c),
Req: in,
}
if err := c.scheduler.AddTask(t); err != nil {
log.Ctx(ctx).Info("failed to enqueue request to drop database", zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if err := t.WaitToFinish(); err != nil {
if err := c.broadcastDropDatabase(ctx, in); err != nil {
if errors.Is(err, merr.ErrDatabaseNotFound) {
log.Ctx(ctx).Info("drop a database that not found, ignore it", zap.String("dbName", in.GetDbName()))
return merr.Success(), nil
}
log.Ctx(ctx).Info("failed to drop database", zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("dbName", in.GetDbName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
zap.Int64("msgID", in.GetBase().GetMsgID()))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return merr.Status(err), nil
}
@ -881,8 +854,7 @@ func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseReques
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.CleanupRootCoordDBMetrics(in.GetDbName())
log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole),
zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()),
zap.Uint64("ts", t.GetTs()))
zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()))
return merr.Success(), nil
}
@ -1488,27 +1460,11 @@ func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseR
zap.String("name", in.GetDbName()),
zap.Any("props", in.Properties))
t := &alterDatabaseTask{
baseTask: newBaseTask(ctx, c),
Req: in,
}
if err := c.scheduler.AddTask(t); err != nil {
log.Warn("failed to enqueue request to alter database",
zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetDbName()),
zap.Error(err))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if err := t.WaitToFinish(); err != nil {
if err := c.broadcastAlterDatabase(ctx, in); err != nil {
log.Warn("failed to alter database",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("name", in.GetDbName()),
zap.Uint64("ts", t.GetTs()))
zap.String("name", in.GetDbName()))
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
return merr.Status(err), nil
@ -1516,12 +1472,11 @@ func (c *Core) AlterDatabase(ctx context.Context, in *rootcoordpb.AlterDatabaseR
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(t.queueDur.Milliseconds()))
// metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues(method).Observe(float64(t.queueDur.Milliseconds()))
log.Ctx(ctx).Info("done to alter database",
zap.String("role", typeutil.RootCoordRole),
zap.String("name", in.GetDbName()),
zap.Uint64("ts", t.GetTs()))
zap.String("name", in.GetDbName()))
return merr.Success(), nil
}

View File

@ -99,35 +99,6 @@ func TestRootCoord_CreateDatabase(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.GetErrorCode())
})
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
}
func TestRootCoord_DropDatabase(t *testing.T) {
@ -138,35 +109,6 @@ func TestRootCoord_DropDatabase(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.GetErrorCode())
})
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.DropDatabase(ctx, &milvuspb.DropDatabaseRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
}
func TestRootCoord_ListDatabases(t *testing.T) {
@ -216,35 +158,6 @@ func TestRootCoord_AlterDatabase(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.GetErrorCode())
})
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.AlterDatabase(ctx, &rootcoordpb.AlterDatabaseRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.AlterDatabase(ctx, &rootcoordpb.AlterDatabaseRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.AlterDatabase(ctx, &rootcoordpb.AlterDatabaseRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
}
func TestRootCoord_CreateCollection(t *testing.T) {

View File

@ -553,22 +553,6 @@ func (s *WriteSchemaChangeWALStep) Desc() string {
return fmt.Sprintf("write schema change WALcollectionID: %d, ts: %d", s.collection.CollectionID, s.ts)
}
type AlterDatabaseStep struct {
baseStep
oldDB *model.Database
newDB *model.Database
ts Timestamp
}
func (a *AlterDatabaseStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := a.core.meta.AlterDatabase(ctx, a.oldDB, a.newDB, a.ts)
return nil, err
}
func (a *AlterDatabaseStep) Desc() string {
return fmt.Sprintf("alter database, databaseID: %d, databaseName: %s, ts: %d", a.oldDB.ID, a.oldDB.Name, a.ts)
}
type renameCollectionStep struct {
baseStep
dbName string

View File

@ -124,15 +124,6 @@ func TestGetLockerKey(t *testing.T) {
key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|111-2-true")
})
t.Run("alter database task locker key", func(t *testing.T) {
tt := &alterDatabaseTask{
Req: &rootcoordpb.AlterDatabaseRequest{
DbName: "foo",
},
}
key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-true")
})
t.Run("create alias task locker key", func(t *testing.T) {
metaMock := mockrootcoord.NewIMetaTable(t)
c := &Core{
@ -161,15 +152,6 @@ func TestGetLockerKey(t *testing.T) {
key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|10-2-true")
})
t.Run("create database task locker key", func(t *testing.T) {
tt := &createDatabaseTask{
Req: &milvuspb.CreateDatabaseRequest{
DbName: "foo",
},
}
key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-true")
})
t.Run("create partition task locker key", func(t *testing.T) {
metaMock := mockrootcoord.NewIMetaTable(t)
metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
@ -281,15 +263,6 @@ func TestGetLockerKey(t *testing.T) {
key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|111-2-true")
})
t.Run("drop database task locker key", func(t *testing.T) {
tt := &dropDatabaseTask{
Req: &milvuspb.DropDatabaseRequest{
DbName: "foo",
},
}
key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-true")
})
t.Run("drop partition task locker key", func(t *testing.T) {
metaMock := mockrootcoord.NewIMetaTable(t)
metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).