From 496331ffa8f003a82833616ff52df185301432e5 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Sat, 18 Oct 2025 15:12:01 +0800 Subject: [PATCH] enhance: support alias with WAL-based DDL framework (#44865) issue: #43897 - Alias related DDL is implemented by WAL-based DDL framework now. - Support following message type in wal AlterAlias, DropAlias. - Alias DDL can be synced by new CDC now. - Refactor some UT for Alias DDL. Signed-off-by: chyezh --- internal/rootcoord/alter_alias_task.go | 54 ---- internal/rootcoord/alter_alias_task_test.go | 78 ------ internal/rootcoord/create_alias_task.go | 98 ------- internal/rootcoord/create_alias_task_test.go | 99 ------- internal/rootcoord/ddl_callbacks.go | 20 ++ .../rootcoord/ddl_callbacks_alias_test.go | 186 +++++++++++++ .../rootcoord/ddl_callbacks_alter_alias.go | 117 +++++++++ .../rootcoord/ddl_callbacks_drop_alias.go | 77 ++++++ internal/rootcoord/drop_alias_task.go | 55 ---- internal/rootcoord/drop_alias_task_test.go | 104 -------- internal/rootcoord/meta_table.go | 136 +++++----- internal/rootcoord/meta_table_test.go | 12 +- internal/rootcoord/mock_test.go | 24 +- internal/rootcoord/mocks/meta_table.go | 245 ++++++++++++------ internal/rootcoord/root_coord.go | 135 +++------- internal/rootcoord/root_coord_test.go | 88 +------ internal/rootcoord/task_test.go | 49 ---- .../server/broadcaster/resource_key_locker.go | 12 +- .../broadcaster/resource_key_locker_test.go | 36 ++- 19 files changed, 723 insertions(+), 902 deletions(-) delete mode 100644 internal/rootcoord/alter_alias_task.go delete mode 100644 internal/rootcoord/alter_alias_task_test.go delete mode 100644 internal/rootcoord/create_alias_task.go delete mode 100644 internal/rootcoord/create_alias_task_test.go create mode 100644 internal/rootcoord/ddl_callbacks_alias_test.go create mode 100644 internal/rootcoord/ddl_callbacks_alter_alias.go create mode 100644 internal/rootcoord/ddl_callbacks_drop_alias.go delete mode 100644 internal/rootcoord/drop_alias_task.go delete mode 100644 internal/rootcoord/drop_alias_task_test.go diff --git a/internal/rootcoord/alter_alias_task.go b/internal/rootcoord/alter_alias_task.go deleted file mode 100644 index 080e18605f..0000000000 --- a/internal/rootcoord/alter_alias_task.go +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/util/proxyutil" -) - -type alterAliasTask struct { - baseTask - Req *milvuspb.AlterAliasRequest -} - -func (t *alterAliasTask) Prepare(ctx context.Context) error { - if err := CheckMsgType(t.Req.GetBase().GetMsgType(), commonpb.MsgType_AlterAlias); err != nil { - return err - } - return nil -} - -func (t *alterAliasTask) Execute(ctx context.Context) error { - collID := t.core.meta.GetCollectionID(ctx, t.Req.GetDbName(), t.Req.GetCollectionName()) - if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, collID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_AlterAlias)); err != nil { - return err - } - // alter alias is atomic enough. - return t.core.meta.AlterAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs()) -} - -func (t *alterAliasTask) GetLockerKey() LockerKey { - return NewLockerKeyChain( - NewClusterLockerKey(false), - NewDatabaseLockerKey(t.Req.GetDbName(), false), - NewCollectionLockerKey(t.Req.GetCollectionName(), true), - ) -} diff --git a/internal/rootcoord/alter_alias_task_test.go b/internal/rootcoord/alter_alias_task_test.go deleted file mode 100644 index a9579afe25..0000000000 --- a/internal/rootcoord/alter_alias_task_test.go +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "testing" - - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" -) - -func Test_alterAliasTask_Prepare(t *testing.T) { - t.Run("invalid msg type", func(t *testing.T) { - task := &alterAliasTask{Req: &milvuspb.AlterAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}}} - err := task.Prepare(context.Background()) - assert.Error(t, err) - }) - - t.Run("normal case", func(t *testing.T) { - task := &alterAliasTask{Req: &milvuspb.AlterAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterAlias}}} - err := task.Prepare(context.Background()) - assert.NoError(t, err) - }) -} - -func Test_alterAliasTask_Execute(t *testing.T) { - t.Run("failed to expire cache", func(t *testing.T) { - mockMeta := mockrootcoord.NewIMetaTable(t) - mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) - core := newTestCore(withInvalidProxyManager(), withMeta(mockMeta)) - task := &alterAliasTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &milvuspb.AlterAliasRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterAlias}, - Alias: "test", - }, - } - err := task.Execute(context.Background()) - assert.Error(t, err) - }) - - t.Run("failed to alter alias", func(t *testing.T) { - mockMeta := mockrootcoord.NewIMetaTable(t) - mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) - mockMeta.EXPECT().AlterAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(errors.New("failed to alter alias")) - core := newTestCore(withValidProxyManager(), withMeta(mockMeta)) - task := &alterAliasTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &milvuspb.AlterAliasRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterAlias}, - Alias: "test", - }, - } - err := task.Execute(context.Background()) - assert.Error(t, err) - }) -} diff --git a/internal/rootcoord/create_alias_task.go b/internal/rootcoord/create_alias_task.go deleted file mode 100644 index 9527487c5d..0000000000 --- a/internal/rootcoord/create_alias_task.go +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "fmt" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/metastore/model" - "github.com/milvus-io/milvus/internal/util/proxyutil" - "github.com/milvus-io/milvus/pkg/v2/log" -) - -type createAliasTask struct { - baseTask - Req *milvuspb.CreateAliasRequest -} - -func (t *createAliasTask) Prepare(ctx context.Context) error { - if err := CheckMsgType(t.Req.GetBase().GetMsgType(), commonpb.MsgType_CreateAlias); err != nil { - return err - } - return nil -} - -func (t *createAliasTask) Execute(ctx context.Context) error { - oldColl, err := t.core.meta.GetCollectionByName(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), t.ts) - if err != nil { - log.Ctx(ctx).Warn("get collection failed during create alias", - zap.String("collectionName", t.Req.GetCollectionName()), zap.Uint64("ts", t.ts)) - return err - } - - return executeCreateAliasTaskSteps(ctx, t.core, t.Req, oldColl, t.GetTs()) -} - -func (t *createAliasTask) GetLockerKey() LockerKey { - return NewLockerKeyChain( - NewClusterLockerKey(false), - NewDatabaseLockerKey(t.Req.GetDbName(), false), - NewCollectionLockerKey(t.Req.GetCollectionName(), true), - ) -} - -func executeCreateAliasTaskSteps(ctx context.Context, core *Core, req *milvuspb.CreateAliasRequest, oldColl *model.Collection, ts Timestamp) error { - redoTask := newBaseRedoTask(core.stepExecutor) - - // properties needs to be refreshed in the cache - aliases := core.meta.ListAliasesByID(ctx, oldColl.CollectionID) - redoTask.AddSyncStep(&expireCacheStep{ - baseStep: baseStep{core: core}, - dbName: req.GetDbName(), - collectionNames: append(aliases, req.GetCollectionName()), - collectionID: oldColl.CollectionID, - opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_CreateAlias)}, - }) - - redoTask.AddSyncStep(&CreateAliasStep{ - baseStep: baseStep{core: core}, - Req: req, - ts: ts, - }) - - return redoTask.Execute(ctx) -} - -type CreateAliasStep struct { - baseStep - Req *milvuspb.CreateAliasRequest - ts Timestamp -} - -func (c *CreateAliasStep) Execute(ctx context.Context) ([]nestedStep, error) { - err := c.core.meta.CreateAlias(ctx, c.Req.GetDbName(), c.Req.GetAlias(), c.Req.GetCollectionName(), c.ts) - return nil, err -} - -func (c *CreateAliasStep) Desc() string { - return fmt.Sprintf("create alias %s, for collection %s", c.Req.GetAlias(), c.Req.GetCollectionName()) -} diff --git a/internal/rootcoord/create_alias_task_test.go b/internal/rootcoord/create_alias_task_test.go deleted file mode 100644 index e435fde21e..0000000000 --- a/internal/rootcoord/create_alias_task_test.go +++ /dev/null @@ -1,99 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "testing" - - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/metastore/model" - mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" -) - -func Test_createAliasTask_Prepare(t *testing.T) { - t.Run("invalid msg type", func(t *testing.T) { - task := &createAliasTask{Req: &milvuspb.CreateAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}}} - err := task.Prepare(context.Background()) - assert.Error(t, err) - }) - - t.Run("normal case", func(t *testing.T) { - task := &createAliasTask{Req: &milvuspb.CreateAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias}}} - err := task.Prepare(context.Background()) - assert.NoError(t, err) - }) -} - -func Test_createAliasTask_Execute(t *testing.T) { - t.Run("failed_to_describe_collection", func(t *testing.T) { - mockMeta := mockrootcoord.NewIMetaTable(t) - mockMeta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mocked")) - - core := newTestCore(withMeta(mockMeta)) - task := &createAliasTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &milvuspb.CreateAliasRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias}, - Alias: "test", - }, - } - err := task.Execute(context.Background()) - assert.Error(t, err) - }) - - t.Run("failed_to_invalidate_cache", func(t *testing.T) { - mockMeta := mockrootcoord.NewIMetaTable(t) - mockMeta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.Collection{CollectionID: 111}, nil) - mockMeta.EXPECT().ListAliasesByID(mock.Anything, mock.Anything).Return([]string{}) - - core := newTestCore(withMeta(mockMeta), withInvalidProxyManager()) - task := &createAliasTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &milvuspb.CreateAliasRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias}, - CollectionName: "coll_test", - Alias: "test", - }, - } - err := task.Execute(context.Background()) - assert.Error(t, err) - }) - - t.Run("failed_to_create_alias", func(t *testing.T) { - mockMeta := mockrootcoord.NewIMetaTable(t) - mockMeta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.Collection{CollectionID: 111}, nil) - mockMeta.EXPECT().ListAliasesByID(mock.Anything, mock.Anything).Return([]string{}) - mockMeta.EXPECT().CreateAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mocked")) - core := newTestCore(withMeta(mockMeta), withValidProxyManager()) - task := &createAliasTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &milvuspb.CreateAliasRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias}, - CollectionName: "coll_test", - Alias: "test", - }, - } - err := task.Execute(context.Background()) - assert.Error(t, err) - }) -} diff --git a/internal/rootcoord/ddl_callbacks.go b/internal/rootcoord/ddl_callbacks.go index 286a72b380..8bfafcd72d 100644 --- a/internal/rootcoord/ddl_callbacks.go +++ b/internal/rootcoord/ddl_callbacks.go @@ -38,6 +38,7 @@ func RegisterDDLCallbacks(core *Core) { } ddlCallback.registerRBACCallbacks() ddlCallback.registerDatabaseCallbacks() + ddlCallback.registerAliasCallbacks() } // registerRBACCallbacks registers the rbac callbacks. @@ -62,6 +63,12 @@ func (c *DDLCallback) registerDatabaseCallbacks() { registry.RegisterDropDatabaseV2AckCallback(c.dropDatabaseV1AckCallback) } +// registerAliasCallbacks registers the alias callbacks. +func (c *DDLCallback) registerAliasCallbacks() { + registry.RegisterAlterAliasV2AckCallback(c.alterAliasV2AckCallback) + registry.RegisterDropAliasV2AckCallback(c.dropAliasV2AckCallback) +} + // DDLCallback is the callback of ddl. type DDLCallback struct { *Core @@ -118,3 +125,16 @@ func startBroadcastWithDatabaseLock(ctx context.Context, dbName string) (broadca } return broadcaster, nil } + +// startBroadcastWithAlterAliasLock starts a broadcast with alter alias lock. +func startBroadcastWithAlterAliasLock(ctx context.Context, dbName string, collectionName string, alias string) (broadcaster.BroadcastAPI, error) { + broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx, + message.NewSharedDBNameResourceKey(dbName), + message.NewExclusiveCollectionNameResourceKey(dbName, collectionName), + message.NewExclusiveCollectionNameResourceKey(dbName, alias), + ) + if err != nil { + return nil, errors.Wrap(err, "failed to start broadcast with alter alias lock") + } + return broadcaster, nil +} diff --git a/internal/rootcoord/ddl_callbacks_alias_test.go b/internal/rootcoord/ddl_callbacks_alias_test.go new file mode 100644 index 0000000000..d2cf9ce4e3 --- /dev/null +++ b/internal/rootcoord/ddl_callbacks_alias_test.go @@ -0,0 +1,186 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" + "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry" + kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" + pb "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" + "github.com/milvus-io/milvus/pkg/v2/util/funcutil" + "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +func TestDDLCallbacksAliasDDL(t *testing.T) { + initStreamingSystem() + + kv, _ := kvfactory.GetEtcdAndPath() + path := funcutil.RandomString(10) + catalogKV := etcdkv.NewEtcdKV(kv, path) + + ss, err := rootcoord.NewSuffixSnapshot(catalogKV, rootcoord.SnapshotsSep, path, rootcoord.SnapshotPrefix) + require.NoError(t, err) + + testDB := newNameDb() + collID2Meta := make(map[typeutil.UniqueID]*model.Collection) + core := newTestCore(withHealthyCode(), + withMeta(&MetaTable{ + catalog: rootcoord.NewCatalog(catalogKV, ss), + names: testDB, + aliases: newNameDb(), + dbName2Meta: make(map[string]*model.Database), + collID2Meta: collID2Meta, + }), + withValidProxyManager(), + withValidIDAllocator(), + ) + registry.ResetRegistration() + RegisterDDLCallbacks(core) + + status, err := core.CreateDatabase(context.Background(), &milvuspb.CreateDatabaseRequest{ + DbName: "test", + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + // TODO: after refactor create collection, we can use CreateCollection to create a collection directly. + testDB.insert("test", "test_collection", 1) + testDB.insert("test", "test_collection2", 2) + collID2Meta[1] = &model.Collection{ + CollectionID: 1, + Name: "test_collection", + State: pb.CollectionState_CollectionCreated, + } + collID2Meta[2] = &model.Collection{ + CollectionID: 2, + Name: "test_collection2", + State: pb.CollectionState_CollectionCreated, + } + + // create an alias with a not-exist database. + status, err = core.CreateAlias(context.Background(), &milvuspb.CreateAliasRequest{ + DbName: "test2", + CollectionName: "test_collection", + Alias: "test_alias", + }) + require.Error(t, merr.CheckRPCCall(status, err)) + + // create an alias with a not-exist collection. + status, err = core.CreateAlias(context.Background(), &milvuspb.CreateAliasRequest{ + DbName: "test", + CollectionName: "test_collection3", + Alias: "test_alias", + }) + require.Error(t, merr.CheckRPCCall(status, err)) + + // create an alias + status, err = core.CreateAlias(context.Background(), &milvuspb.CreateAliasRequest{ + DbName: "test", + CollectionName: "test_collection", + Alias: "test_alias", + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + + coll, err := core.meta.GetCollectionByName(context.Background(), "test", "test_alias", typeutil.MaxTimestamp) + require.NoError(t, err) + require.Equal(t, int64(1), coll.CollectionID) + require.Equal(t, "test_collection", coll.Name) + + // create an alias already created on current collection should be ok. + status, err = core.CreateAlias(context.Background(), &milvuspb.CreateAliasRequest{ + DbName: "test", + CollectionName: "test_collection", + Alias: "test_alias", + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + + // create an alias already created on another collection should be error. + status, err = core.CreateAlias(context.Background(), &milvuspb.CreateAliasRequest{ + DbName: "test", + CollectionName: "test_collection2", + Alias: "test_alias", + }) + require.Error(t, merr.CheckRPCCall(status, err)) + + // test alter alias already created on current collection should be ok. + status, err = core.AlterAlias(context.Background(), &milvuspb.AlterAliasRequest{ + DbName: "test", + CollectionName: "test_collection", + Alias: "test_alias", + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + + // test alter alias to another collection should be ok. + status, err = core.AlterAlias(context.Background(), &milvuspb.AlterAliasRequest{ + DbName: "test", + CollectionName: "test_collection2", + Alias: "test_alias", + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + + // alter alias to a not-exist database should be error. + status, err = core.AlterAlias(context.Background(), &milvuspb.AlterAliasRequest{ + DbName: "test2", + CollectionName: "test_collection2", + Alias: "test_alias", + }) + require.Error(t, merr.CheckRPCCall(status, err)) + + // alter alias to a not-exist collection should be error. + status, err = core.AlterAlias(context.Background(), &milvuspb.AlterAliasRequest{ + DbName: "test", + CollectionName: "test_collection3", + Alias: "test_alias", + }) + require.Error(t, merr.CheckRPCCall(status, err)) + + // alter alias to a not exist alias should be error. + status, err = core.AlterAlias(context.Background(), &milvuspb.AlterAliasRequest{ + DbName: "test", + CollectionName: "test_collection2", + Alias: "test_alias2", + }) + require.Error(t, merr.CheckRPCCall(status, err)) + + // drop a not exist alias should be ok. + status, err = core.DropAlias(context.Background(), &milvuspb.DropAliasRequest{ + DbName: "test", + Alias: "test_alias2", + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + + // drop a alias exist should be ok. + status, err = core.DropAlias(context.Background(), &milvuspb.DropAliasRequest{ + DbName: "test", + Alias: "test_alias", + }) + require.NoError(t, merr.CheckRPCCall(status, err)) + + // drop a alias already dropped should be ok. + status, err = core.DropAlias(context.Background(), &milvuspb.DropAliasRequest{ + DbName: "test", + Alias: "test_alias", + }) + require.NoError(t, merr.CheckRPCCall(status, err)) +} diff --git a/internal/rootcoord/ddl_callbacks_alter_alias.go b/internal/rootcoord/ddl_callbacks_alter_alias.go new file mode 100644 index 0000000000..6e5f564050 --- /dev/null +++ b/internal/rootcoord/ddl_callbacks_alter_alias.go @@ -0,0 +1,117 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "strings" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/distributed/streaming" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +func (c *Core) broadcastCreateAlias(ctx context.Context, req *milvuspb.CreateAliasRequest) error { + req.DbName = strings.TrimSpace(req.DbName) + req.Alias = strings.TrimSpace(req.Alias) + req.CollectionName = strings.TrimSpace(req.CollectionName) + broadcaster, err := startBroadcastWithAlterAliasLock(ctx, req.GetDbName(), req.GetCollectionName(), req.GetAlias()) + if err != nil { + return err + } + defer broadcaster.Close() + + if err := c.meta.CheckIfAliasCreatable(ctx, req.GetDbName(), req.GetAlias(), req.GetCollectionName()); err != nil { + return err + } + + db, err := c.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp) + if err != nil { + return err + } + collection, err := c.meta.GetCollectionByName(ctx, req.GetDbName(), req.GetCollectionName(), typeutil.MaxTimestamp) + if err != nil { + return err + } + + msg := message.NewAlterAliasMessageBuilderV2(). + WithHeader(&message.AlterAliasMessageHeader{ + DbId: db.ID, + DbName: req.GetDbName(), + CollectionId: collection.CollectionID, + Alias: req.GetAlias(), + CollectionName: req.GetCollectionName(), + }). + WithBody(&message.AlterAliasMessageBody{}). + WithBroadcast([]string{streaming.WAL().ControlChannel()}). + MustBuildBroadcast() + _, err = broadcaster.Broadcast(ctx, msg) + return err +} + +func (c *Core) broadcastAlterAlias(ctx context.Context, req *milvuspb.AlterAliasRequest) error { + req.DbName = strings.TrimSpace(req.DbName) + req.Alias = strings.TrimSpace(req.Alias) + req.CollectionName = strings.TrimSpace(req.CollectionName) + broadcaster, err := startBroadcastWithAlterAliasLock(ctx, req.GetDbName(), req.GetCollectionName(), req.GetAlias()) + if err != nil { + return err + } + defer broadcaster.Close() + + if err := c.meta.CheckIfAliasAlterable(ctx, req.GetDbName(), req.GetAlias(), req.GetCollectionName()); err != nil { + return err + } + + db, err := c.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp) + if err != nil { + return err + } + collection, err := c.meta.GetCollectionByName(ctx, req.GetDbName(), req.GetCollectionName(), typeutil.MaxTimestamp) + if err != nil { + return err + } + + msg := message.NewAlterAliasMessageBuilderV2(). + WithHeader(&message.AlterAliasMessageHeader{ + DbId: db.ID, + DbName: req.GetDbName(), + CollectionId: collection.CollectionID, + Alias: req.GetAlias(), + CollectionName: req.GetCollectionName(), + }). + WithBody(&message.AlterAliasMessageBody{}). + WithBroadcast([]string{streaming.WAL().ControlChannel()}). + MustBuildBroadcast() + _, err = broadcaster.Broadcast(ctx, msg) + return err +} + +func (c *DDLCallback) alterAliasV2AckCallback(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error { + if err := c.meta.AlterAlias(ctx, result); err != nil { + return err + } + return c.ExpireCaches(ctx, + ce.NewBuilder().WithLegacyProxyCollectionMetaCache( + ce.OptLPCMDBName(result.Message.Header().DbName), + ce.OptLPCMCollectionName(result.Message.Header().Alias), + ce.OptLPCMMsgType(commonpb.MsgType_AlterAlias)), + result.GetControlChannelResult().TimeTick) +} diff --git a/internal/rootcoord/ddl_callbacks_drop_alias.go b/internal/rootcoord/ddl_callbacks_drop_alias.go new file mode 100644 index 0000000000..18ba709968 --- /dev/null +++ b/internal/rootcoord/ddl_callbacks_drop_alias.go @@ -0,0 +1,77 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "strings" + + "github.com/cockroachdb/errors" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/distributed/streaming" + "github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/broadcast" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +func (c *Core) broadcastDropAlias(ctx context.Context, req *milvuspb.DropAliasRequest) error { + req.DbName = strings.TrimSpace(req.DbName) + req.Alias = strings.TrimSpace(req.Alias) + broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx, + message.NewSharedDBNameResourceKey(req.GetDbName()), + message.NewExclusiveCollectionNameResourceKey(req.GetDbName(), req.GetAlias())) + if err != nil { + return err + } + defer broadcaster.Close() + + db, err := c.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp) + if err != nil { + return err + } + if err := c.meta.CheckIfAliasDroppable(ctx, req.GetDbName(), req.GetAlias()); err != nil { + return err + } + msg := message.NewDropAliasMessageBuilderV2(). + WithHeader(&message.DropAliasMessageHeader{ + DbId: db.ID, + DbName: req.GetDbName(), + Alias: req.GetAlias(), + }). + WithBody(&message.DropAliasMessageBody{}). + WithBroadcast([]string{streaming.WAL().ControlChannel()}). + MustBuildBroadcast() + _, err = broadcaster.Broadcast(ctx, msg) + return err +} + +func (c *DDLCallback) dropAliasV2AckCallback(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error { + if err := c.meta.DropAlias(ctx, result); err != nil { + return errors.Wrap(err, "failed to drop alias") + } + return c.ExpireCaches(ctx, + ce.NewBuilder().WithLegacyProxyCollectionMetaCache( + ce.OptLPCMDBName(result.Message.Header().DbName), + ce.OptLPCMCollectionName(result.Message.Header().Alias), + ce.OptLPCMMsgType(commonpb.MsgType_DropAlias), + ), + result.GetControlChannelResult().TimeTick, + ) +} diff --git a/internal/rootcoord/drop_alias_task.go b/internal/rootcoord/drop_alias_task.go deleted file mode 100644 index 18d390b6b2..0000000000 --- a/internal/rootcoord/drop_alias_task.go +++ /dev/null @@ -1,55 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/util/proxyutil" -) - -type dropAliasTask struct { - baseTask - Req *milvuspb.DropAliasRequest -} - -func (t *dropAliasTask) Prepare(ctx context.Context) error { - if err := CheckMsgType(t.Req.GetBase().GetMsgType(), commonpb.MsgType_DropAlias); err != nil { - return err - } - return nil -} - -func (t *dropAliasTask) Execute(ctx context.Context) error { - collID := t.core.meta.GetCollectionID(ctx, t.Req.GetDbName(), t.Req.GetAlias()) - // drop alias is atomic enough. - if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, collID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_DropAlias)); err != nil { - return err - } - return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs()) -} - -func (t *dropAliasTask) GetLockerKey() LockerKey { - collection := t.core.getCollectionIDStr(t.ctx, t.Req.GetDbName(), t.Req.GetAlias(), 0) - return NewLockerKeyChain( - NewClusterLockerKey(false), - NewDatabaseLockerKey(t.Req.GetDbName(), false), - NewCollectionLockerKey(collection, true), - ) -} diff --git a/internal/rootcoord/drop_alias_task_test.go b/internal/rootcoord/drop_alias_task_test.go deleted file mode 100644 index 6e8c93be09..0000000000 --- a/internal/rootcoord/drop_alias_task_test.go +++ /dev/null @@ -1,104 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rootcoord - -import ( - "context" - "testing" - - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" - "github.com/milvus-io/milvus/pkg/v2/util/funcutil" -) - -func Test_dropAliasTask_Prepare(t *testing.T) { - t.Run("invalid msg type", func(t *testing.T) { - task := &dropAliasTask{ - Req: &milvuspb.DropAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}}, - } - err := task.Prepare(context.Background()) - assert.Error(t, err) - }) - - t.Run("normal case", func(t *testing.T) { - task := &dropAliasTask{ - Req: &milvuspb.DropAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias}}, - } - err := task.Prepare(context.Background()) - assert.NoError(t, err) - }) -} - -func Test_dropAliasTask_Execute(t *testing.T) { - t.Run("failed to expire cache", func(t *testing.T) { - mockMeta := mockrootcoord.NewIMetaTable(t) - mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) - alias := funcutil.GenRandomStr() - core := newTestCore(withInvalidProxyManager(), withMeta(mockMeta)) - task := &dropAliasTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &milvuspb.DropAliasRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias}, - Alias: alias, - }, - } - err := task.Execute(context.Background()) - assert.Error(t, err) - }) - - t.Run("failed to drop alias", func(t *testing.T) { - mockMeta := mockrootcoord.NewIMetaTable(t) - mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) - mockMeta.EXPECT().DropAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(errors.New("failed to alter alias")) - core := newTestCore(withValidProxyManager(), withMeta(mockMeta)) - alias := funcutil.GenRandomStr() - task := &dropAliasTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &milvuspb.DropAliasRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias}, - Alias: alias, - }, - } - err := task.Execute(context.Background()) - assert.Error(t, err) - }) - - t.Run("normal case", func(t *testing.T) { - mockMeta := mockrootcoord.NewIMetaTable(t) - mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111) - mockMeta.EXPECT().DropAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(nil) - - core := newTestCore(withValidProxyManager(), withMeta(mockMeta)) - alias := funcutil.GenRandomStr() - task := &dropAliasTask{ - baseTask: newBaseTask(context.Background(), core), - Req: &milvuspb.DropAliasRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias}, - Alias: alias, - }, - } - err := task.Execute(context.Background()) - assert.NoError(t, err) - }) -} diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 7211e35b2b..41bf6d09e3 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -50,11 +50,17 @@ import ( "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) +var errIgnoredAlterAlias = errors.New("ignored alter alias") // alias already created on current collection, so it can be ignored. + type MetaTableChecker interface { RBACChecker CheckIfDatabaseCreatable(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error CheckIfDatabaseDroppable(ctx context.Context, req *milvuspb.DropDatabaseRequest) error + + CheckIfAliasCreatable(ctx context.Context, dbName string, alias string, collectionName string) error + CheckIfAliasAlterable(ctx context.Context, dbName string, alias string, collectionName string) error + CheckIfAliasDroppable(ctx context.Context, dbName string, alias string) error } //go:generate mockery --name=IMetaTable --structname=MockIMetaTable --output=./ --filename=mock_meta_table.go --with-expecter --inpackage @@ -89,11 +95,13 @@ type IMetaTable interface { AddPartition(ctx context.Context, partition *model.Partition) error ChangePartitionState(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error RemovePartition(ctx context.Context, dbID int64, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error - CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error - DropAlias(ctx context.Context, dbName string, alias string, ts Timestamp) error - AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error + + // Alias + AlterAlias(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error + DropAlias(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error DescribeAlias(ctx context.Context, dbName string, alias string, ts Timestamp) (string, error) ListAliases(ctx context.Context, dbName string, collectionName string, ts Timestamp) ([]string, error) + AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, ts Timestamp, fieldModify bool) error RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error GetGeneralCount(ctx context.Context) int @@ -1113,9 +1121,9 @@ func (mt *MetaTable) RemovePartition(ctx context.Context, dbID int64, collection return nil } -func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error { - mt.ddLock.Lock() - defer mt.ddLock.Unlock() +func (mt *MetaTable) CheckIfAliasCreatable(ctx context.Context, dbName string, alias string, collectionName string) error { + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() // backward compatibility for rolling upgrade if dbName == "" { log.Ctx(ctx).Warn("db name is empty", zap.String("alias", alias), zap.String("collection", collectionName)) @@ -1149,8 +1157,8 @@ func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias strin // check if alias exists. aliasedCollectionID, ok := mt.aliases.get(dbName, alias) if ok && aliasedCollectionID == collectionID { - log.Ctx(ctx).Warn("add duplicate alias", zap.String("alias", alias), zap.String("collection", collectionName), zap.Uint64("ts", ts)) - return nil + log.Ctx(ctx).Warn("add duplicate alias", zap.String("alias", alias), zap.String("collection", collectionName)) + return errIgnoredAlterAlias } else if ok { // TODO: better to check if aliasedCollectionID exist or is available, though not very possible. aliasedColl := mt.collID2Meta[aliasedCollectionID] @@ -1164,63 +1172,69 @@ func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias strin // you cannot alias to a non-existent collection. return merr.WrapErrCollectionNotFoundWithDB(dbName, collectionName) } - - ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue()) - if err := mt.catalog.CreateAlias(ctx1, &model.Alias{ - Name: alias, - CollectionID: collectionID, - CreatedTime: ts, - State: pb.AliasState_AliasCreated, - DbID: coll.DBID, - }, ts); err != nil { - return err - } - - mt.aliases.insert(dbName, alias, collectionID) - - log.Ctx(ctx).Info("create alias", - zap.String("db", dbName), - zap.String("alias", alias), - zap.String("collection", collectionName), - zap.Int64("id", coll.CollectionID), - zap.Uint64("ts", ts), - ) - return nil } -func (mt *MetaTable) DropAlias(ctx context.Context, dbName string, alias string, ts Timestamp) error { +func (mt *MetaTable) CheckIfAliasDroppable(ctx context.Context, dbName string, alias string) error { + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() + + if _, ok := mt.aliases.get(dbName, alias); !ok { + return merr.WrapErrAliasNotFound(dbName, alias) + } + return nil +} + +func (mt *MetaTable) DropAlias(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() - // backward compatibility for rolling upgrade - if dbName == "" { - log.Ctx(ctx).Warn("db name is empty", zap.String("alias", alias), zap.Uint64("ts", ts)) - dbName = util.DefaultDBName - } + + header := result.Message.Header() ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue()) - db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp) - if err != nil { + if err := mt.catalog.DropAlias(ctx1, header.DbId, header.Alias, result.GetControlChannelResult().TimeTick); err != nil { return err } - if err := mt.catalog.DropAlias(ctx1, db.ID, alias, ts); err != nil { - return err - } - - mt.aliases.remove(dbName, alias) + mt.aliases.remove(header.DbName, header.Alias) log.Ctx(ctx).Info("drop alias", - zap.String("db", dbName), - zap.String("alias", alias), - zap.Uint64("ts", ts), + zap.String("db", header.DbName), + zap.String("alias", header.Alias), + zap.Uint64("ts", result.GetControlChannelResult().TimeTick), ) - return nil } -func (mt *MetaTable) AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error { +func (mt *MetaTable) AlterAlias(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error { mt.ddLock.Lock() defer mt.ddLock.Unlock() + + header := result.Message.Header() + if err := mt.catalog.AlterAlias(ctx, &model.Alias{ + Name: header.Alias, + CollectionID: header.CollectionId, + CreatedTime: result.GetControlChannelResult().TimeTick, + State: pb.AliasState_AliasCreated, + DbID: header.DbId, + }, result.GetControlChannelResult().TimeTick); err != nil { + return err + } + + // alias switch to another collection anyway. + mt.aliases.insert(header.DbName, header.Alias, header.CollectionId) + + log.Ctx(ctx).Info("alter alias", + zap.String("db", header.DbName), + zap.String("alias", header.Alias), + zap.String("collection", header.CollectionName), + zap.Uint64("ts", result.GetControlChannelResult().TimeTick), + ) + return nil +} + +func (mt *MetaTable) CheckIfAliasAlterable(ctx context.Context, dbName string, alias string, collectionName string) error { + mt.ddLock.RLock() + defer mt.ddLock.RUnlock() // backward compatibility for rolling upgrade if dbName == "" { log.Ctx(ctx).Warn("db name is empty", zap.String("alias", alias), zap.String("collection", collectionName)) @@ -1255,33 +1269,13 @@ func (mt *MetaTable) AlterAlias(ctx context.Context, dbName string, alias string } // check if alias exists. - _, ok = mt.aliases.get(dbName, alias) + existAliasCollectionID, ok := mt.aliases.get(dbName, alias) if !ok { - // return merr.WrapErrAliasNotFound(dbName, alias) } - - ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue()) - if err := mt.catalog.AlterAlias(ctx1, &model.Alias{ - Name: alias, - CollectionID: collectionID, - CreatedTime: ts, - State: pb.AliasState_AliasCreated, - DbID: coll.DBID, - }, ts); err != nil { - return err + if existAliasCollectionID == collectionID { + return errIgnoredAlterAlias } - - // alias switch to another collection anyway. - mt.aliases.insert(dbName, alias, collectionID) - - log.Ctx(ctx).Info("alter alias", - zap.String("db", dbName), - zap.String("alias", alias), - zap.String("collection", collectionName), - zap.Uint64("ts", ts), - ) - return nil } diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index 07d5ac58dd..62ad72bf57 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -2136,17 +2136,7 @@ func TestMetaTable_EmtpyDatabaseName(t *testing.T) { } mt.names.insert(util.DefaultDBName, "name", 1) - err := mt.CreateAlias(context.TODO(), "", "name", "name", typeutil.MaxTimestamp) - assert.Error(t, err) - }) - - t.Run("DropAlias with empty db", func(t *testing.T) { - mt := &MetaTable{ - names: newNameDb(), - } - - mt.names.insert(util.DefaultDBName, "name", 1) - err := mt.DropAlias(context.TODO(), "", "name", typeutil.MaxTimestamp) + err := mt.CheckIfAliasCreatable(context.TODO(), "", "name", "name") assert.Error(t, err) }) } diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index a1ce921a5b..d484bec7bb 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -72,9 +72,8 @@ type mockMetaTable struct { AddPartitionFunc func(ctx context.Context, partition *model.Partition) error ChangePartitionStateFunc func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error RemovePartitionFunc func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error - CreateAliasFunc func(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error - AlterAliasFunc func(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error - DropAliasFunc func(ctx context.Context, dbName string, alias string, ts Timestamp) error + AlterAliasFunc func(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error + DropAliasFunc func(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error IsAliasFunc func(ctx context.Context, dbName, name string) bool DescribeAliasFunc func(ctx context.Context, dbName, alias string, ts Timestamp) (string, error) ListAliasesFunc func(ctx context.Context, dbName, collectionName string, ts Timestamp) ([]string, error) @@ -152,16 +151,12 @@ func (m mockMetaTable) RemovePartition(ctx context.Context, dbID int64, collecti return m.RemovePartitionFunc(ctx, collectionID, partitionID, ts) } -func (m mockMetaTable) CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error { - return m.CreateAliasFunc(ctx, dbName, alias, collectionName, ts) +func (m mockMetaTable) AlterAlias(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error { + return m.AlterAliasFunc(ctx, result) } -func (m mockMetaTable) AlterAlias(ctx context.Context, dbName, alias string, collectionName string, ts Timestamp) error { - return m.AlterAliasFunc(ctx, dbName, alias, collectionName, ts) -} - -func (m mockMetaTable) DropAlias(ctx context.Context, dbName, alias string, ts Timestamp) error { - return m.DropAliasFunc(ctx, dbName, alias, ts) +func (m mockMetaTable) DropAlias(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error { + return m.DropAliasFunc(ctx, result) } func (m mockMetaTable) IsAlias(ctx context.Context, dbName, name string) bool { @@ -511,13 +506,10 @@ func withInvalidMeta() Opt { meta.ChangePartitionStateFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error { return errors.New("error mock ChangePartitionState") } - meta.CreateAliasFunc = func(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error { - return errors.New("error mock CreateAlias") - } - meta.AlterAliasFunc = func(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error { + meta.AlterAliasFunc = func(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error { return errors.New("error mock AlterAlias") } - meta.DropAliasFunc = func(ctx context.Context, dbName string, alias string, ts Timestamp) error { + meta.DropAliasFunc = func(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error { return errors.New("error mock DropAlias") } meta.AddCredentialFunc = func(ctx context.Context, credInfo *internalpb.CredentialInfo) error { diff --git a/internal/rootcoord/mocks/meta_table.go b/internal/rootcoord/mocks/meta_table.go index 3e75f5a127..75c3021a86 100644 --- a/internal/rootcoord/mocks/meta_table.go +++ b/internal/rootcoord/mocks/meta_table.go @@ -128,17 +128,17 @@ func (_c *IMetaTable_AddPartition_Call) RunAndReturn(run func(context.Context, * return _c } -// AlterAlias provides a mock function with given fields: ctx, dbName, alias, collectionName, ts -func (_m *IMetaTable) AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts uint64) error { - ret := _m.Called(ctx, dbName, alias, collectionName, ts) +// AlterAlias provides a mock function with given fields: ctx, result +func (_m *IMetaTable) AlterAlias(ctx context.Context, result message.BroadcastResult[*messagespb.AlterAliasMessageHeader, *messagespb.AlterAliasMessageBody]) error { + ret := _m.Called(ctx, result) if len(ret) == 0 { panic("no return value specified for AlterAlias") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, string, uint64) error); ok { - r0 = rf(ctx, dbName, alias, collectionName, ts) + if rf, ok := ret.Get(0).(func(context.Context, message.BroadcastResult[*messagespb.AlterAliasMessageHeader, *messagespb.AlterAliasMessageBody]) error); ok { + r0 = rf(ctx, result) } else { r0 = ret.Error(0) } @@ -153,17 +153,14 @@ type IMetaTable_AlterAlias_Call struct { // AlterAlias is a helper method to define mock.On call // - ctx context.Context -// - dbName string -// - alias string -// - collectionName string -// - ts uint64 -func (_e *IMetaTable_Expecter) AlterAlias(ctx interface{}, dbName interface{}, alias interface{}, collectionName interface{}, ts interface{}) *IMetaTable_AlterAlias_Call { - return &IMetaTable_AlterAlias_Call{Call: _e.mock.On("AlterAlias", ctx, dbName, alias, collectionName, ts)} +// - result message.BroadcastResult[*messagespb.AlterAliasMessageHeader,*messagespb.AlterAliasMessageBody] +func (_e *IMetaTable_Expecter) AlterAlias(ctx interface{}, result interface{}) *IMetaTable_AlterAlias_Call { + return &IMetaTable_AlterAlias_Call{Call: _e.mock.On("AlterAlias", ctx, result)} } -func (_c *IMetaTable_AlterAlias_Call) Run(run func(ctx context.Context, dbName string, alias string, collectionName string, ts uint64)) *IMetaTable_AlterAlias_Call { +func (_c *IMetaTable_AlterAlias_Call) Run(run func(ctx context.Context, result message.BroadcastResult[*messagespb.AlterAliasMessageHeader, *messagespb.AlterAliasMessageBody])) *IMetaTable_AlterAlias_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string), args[4].(uint64)) + run(args[0].(context.Context), args[1].(message.BroadcastResult[*messagespb.AlterAliasMessageHeader, *messagespb.AlterAliasMessageBody])) }) return _c } @@ -173,7 +170,7 @@ func (_c *IMetaTable_AlterAlias_Call) Return(_a0 error) *IMetaTable_AlterAlias_C return _c } -func (_c *IMetaTable_AlterAlias_Call) RunAndReturn(run func(context.Context, string, string, string, uint64) error) *IMetaTable_AlterAlias_Call { +func (_c *IMetaTable_AlterAlias_Call) RunAndReturn(run func(context.Context, message.BroadcastResult[*messagespb.AlterAliasMessageHeader, *messagespb.AlterAliasMessageBody]) error) *IMetaTable_AlterAlias_Call { _c.Call.Return(run) return _c } @@ -528,6 +525,152 @@ func (_c *IMetaTable_CheckIfAddCredential_Call) RunAndReturn(run func(context.Co return _c } +// CheckIfAliasAlterable provides a mock function with given fields: ctx, dbName, alias, collectionName +func (_m *IMetaTable) CheckIfAliasAlterable(ctx context.Context, dbName string, alias string, collectionName string) error { + ret := _m.Called(ctx, dbName, alias, collectionName) + + if len(ret) == 0 { + panic("no return value specified for CheckIfAliasAlterable") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string) error); ok { + r0 = rf(ctx, dbName, alias, collectionName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// IMetaTable_CheckIfAliasAlterable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfAliasAlterable' +type IMetaTable_CheckIfAliasAlterable_Call struct { + *mock.Call +} + +// CheckIfAliasAlterable is a helper method to define mock.On call +// - ctx context.Context +// - dbName string +// - alias string +// - collectionName string +func (_e *IMetaTable_Expecter) CheckIfAliasAlterable(ctx interface{}, dbName interface{}, alias interface{}, collectionName interface{}) *IMetaTable_CheckIfAliasAlterable_Call { + return &IMetaTable_CheckIfAliasAlterable_Call{Call: _e.mock.On("CheckIfAliasAlterable", ctx, dbName, alias, collectionName)} +} + +func (_c *IMetaTable_CheckIfAliasAlterable_Call) Run(run func(ctx context.Context, dbName string, alias string, collectionName string)) *IMetaTable_CheckIfAliasAlterable_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string)) + }) + return _c +} + +func (_c *IMetaTable_CheckIfAliasAlterable_Call) Return(_a0 error) *IMetaTable_CheckIfAliasAlterable_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *IMetaTable_CheckIfAliasAlterable_Call) RunAndReturn(run func(context.Context, string, string, string) error) *IMetaTable_CheckIfAliasAlterable_Call { + _c.Call.Return(run) + return _c +} + +// CheckIfAliasCreatable provides a mock function with given fields: ctx, dbName, alias, collectionName +func (_m *IMetaTable) CheckIfAliasCreatable(ctx context.Context, dbName string, alias string, collectionName string) error { + ret := _m.Called(ctx, dbName, alias, collectionName) + + if len(ret) == 0 { + panic("no return value specified for CheckIfAliasCreatable") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string) error); ok { + r0 = rf(ctx, dbName, alias, collectionName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// IMetaTable_CheckIfAliasCreatable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfAliasCreatable' +type IMetaTable_CheckIfAliasCreatable_Call struct { + *mock.Call +} + +// CheckIfAliasCreatable is a helper method to define mock.On call +// - ctx context.Context +// - dbName string +// - alias string +// - collectionName string +func (_e *IMetaTable_Expecter) CheckIfAliasCreatable(ctx interface{}, dbName interface{}, alias interface{}, collectionName interface{}) *IMetaTable_CheckIfAliasCreatable_Call { + return &IMetaTable_CheckIfAliasCreatable_Call{Call: _e.mock.On("CheckIfAliasCreatable", ctx, dbName, alias, collectionName)} +} + +func (_c *IMetaTable_CheckIfAliasCreatable_Call) Run(run func(ctx context.Context, dbName string, alias string, collectionName string)) *IMetaTable_CheckIfAliasCreatable_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string)) + }) + return _c +} + +func (_c *IMetaTable_CheckIfAliasCreatable_Call) Return(_a0 error) *IMetaTable_CheckIfAliasCreatable_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *IMetaTable_CheckIfAliasCreatable_Call) RunAndReturn(run func(context.Context, string, string, string) error) *IMetaTable_CheckIfAliasCreatable_Call { + _c.Call.Return(run) + return _c +} + +// CheckIfAliasDroppable provides a mock function with given fields: ctx, dbName, alias +func (_m *IMetaTable) CheckIfAliasDroppable(ctx context.Context, dbName string, alias string) error { + ret := _m.Called(ctx, dbName, alias) + + if len(ret) == 0 { + panic("no return value specified for CheckIfAliasDroppable") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, dbName, alias) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// IMetaTable_CheckIfAliasDroppable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfAliasDroppable' +type IMetaTable_CheckIfAliasDroppable_Call struct { + *mock.Call +} + +// CheckIfAliasDroppable is a helper method to define mock.On call +// - ctx context.Context +// - dbName string +// - alias string +func (_e *IMetaTable_Expecter) CheckIfAliasDroppable(ctx interface{}, dbName interface{}, alias interface{}) *IMetaTable_CheckIfAliasDroppable_Call { + return &IMetaTable_CheckIfAliasDroppable_Call{Call: _e.mock.On("CheckIfAliasDroppable", ctx, dbName, alias)} +} + +func (_c *IMetaTable_CheckIfAliasDroppable_Call) Run(run func(ctx context.Context, dbName string, alias string)) *IMetaTable_CheckIfAliasDroppable_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *IMetaTable_CheckIfAliasDroppable_Call) Return(_a0 error) *IMetaTable_CheckIfAliasDroppable_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *IMetaTable_CheckIfAliasDroppable_Call) RunAndReturn(run func(context.Context, string, string) error) *IMetaTable_CheckIfAliasDroppable_Call { + _c.Call.Return(run) + return _c +} + // CheckIfCreateRole provides a mock function with given fields: ctx, req func (_m *IMetaTable) CheckIfCreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) error { ret := _m.Called(ctx, req) @@ -1045,56 +1188,6 @@ func (_c *IMetaTable_CheckIfUpdateCredential_Call) RunAndReturn(run func(context return _c } -// CreateAlias provides a mock function with given fields: ctx, dbName, alias, collectionName, ts -func (_m *IMetaTable) CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts uint64) error { - ret := _m.Called(ctx, dbName, alias, collectionName, ts) - - if len(ret) == 0 { - panic("no return value specified for CreateAlias") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, string, uint64) error); ok { - r0 = rf(ctx, dbName, alias, collectionName, ts) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// IMetaTable_CreateAlias_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateAlias' -type IMetaTable_CreateAlias_Call struct { - *mock.Call -} - -// CreateAlias is a helper method to define mock.On call -// - ctx context.Context -// - dbName string -// - alias string -// - collectionName string -// - ts uint64 -func (_e *IMetaTable_Expecter) CreateAlias(ctx interface{}, dbName interface{}, alias interface{}, collectionName interface{}, ts interface{}) *IMetaTable_CreateAlias_Call { - return &IMetaTable_CreateAlias_Call{Call: _e.mock.On("CreateAlias", ctx, dbName, alias, collectionName, ts)} -} - -func (_c *IMetaTable_CreateAlias_Call) Run(run func(ctx context.Context, dbName string, alias string, collectionName string, ts uint64)) *IMetaTable_CreateAlias_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string), args[4].(uint64)) - }) - return _c -} - -func (_c *IMetaTable_CreateAlias_Call) Return(_a0 error) *IMetaTable_CreateAlias_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *IMetaTable_CreateAlias_Call) RunAndReturn(run func(context.Context, string, string, string, uint64) error) *IMetaTable_CreateAlias_Call { - _c.Call.Return(run) - return _c -} - // CreateDatabase provides a mock function with given fields: ctx, db, ts func (_m *IMetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts uint64) error { ret := _m.Called(ctx, db, ts) @@ -1344,17 +1437,17 @@ func (_c *IMetaTable_DescribeAlias_Call) RunAndReturn(run func(context.Context, return _c } -// DropAlias provides a mock function with given fields: ctx, dbName, alias, ts -func (_m *IMetaTable) DropAlias(ctx context.Context, dbName string, alias string, ts uint64) error { - ret := _m.Called(ctx, dbName, alias, ts) +// DropAlias provides a mock function with given fields: ctx, result +func (_m *IMetaTable) DropAlias(ctx context.Context, result message.BroadcastResult[*messagespb.DropAliasMessageHeader, *messagespb.DropAliasMessageBody]) error { + ret := _m.Called(ctx, result) if len(ret) == 0 { panic("no return value specified for DropAlias") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, uint64) error); ok { - r0 = rf(ctx, dbName, alias, ts) + if rf, ok := ret.Get(0).(func(context.Context, message.BroadcastResult[*messagespb.DropAliasMessageHeader, *messagespb.DropAliasMessageBody]) error); ok { + r0 = rf(ctx, result) } else { r0 = ret.Error(0) } @@ -1369,16 +1462,14 @@ type IMetaTable_DropAlias_Call struct { // DropAlias is a helper method to define mock.On call // - ctx context.Context -// - dbName string -// - alias string -// - ts uint64 -func (_e *IMetaTable_Expecter) DropAlias(ctx interface{}, dbName interface{}, alias interface{}, ts interface{}) *IMetaTable_DropAlias_Call { - return &IMetaTable_DropAlias_Call{Call: _e.mock.On("DropAlias", ctx, dbName, alias, ts)} +// - result message.BroadcastResult[*messagespb.DropAliasMessageHeader,*messagespb.DropAliasMessageBody] +func (_e *IMetaTable_Expecter) DropAlias(ctx interface{}, result interface{}) *IMetaTable_DropAlias_Call { + return &IMetaTable_DropAlias_Call{Call: _e.mock.On("DropAlias", ctx, result)} } -func (_c *IMetaTable_DropAlias_Call) Run(run func(ctx context.Context, dbName string, alias string, ts uint64)) *IMetaTable_DropAlias_Call { +func (_c *IMetaTable_DropAlias_Call) Run(run func(ctx context.Context, result message.BroadcastResult[*messagespb.DropAliasMessageHeader, *messagespb.DropAliasMessageBody])) *IMetaTable_DropAlias_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(uint64)) + run(args[0].(context.Context), args[1].(message.BroadcastResult[*messagespb.DropAliasMessageHeader, *messagespb.DropAliasMessageBody])) }) return _c } @@ -1388,7 +1479,7 @@ func (_c *IMetaTable_DropAlias_Call) Return(_a0 error) *IMetaTable_DropAlias_Cal return _c } -func (_c *IMetaTable_DropAlias_Call) RunAndReturn(run func(context.Context, string, string, uint64) error) *IMetaTable_DropAlias_Call { +func (_c *IMetaTable_DropAlias_Call) RunAndReturn(run func(context.Context, message.BroadcastResult[*messagespb.DropAliasMessageHeader, *messagespb.DropAliasMessageBody]) error) *IMetaTable_DropAlias_Call { _c.Call.Return(run) return _c } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 5c385ccbbb..0cb1ed7fc1 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1865,49 +1865,27 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("CreateAlias") - - log.Ctx(ctx).Info("received request to create alias", + logger := log.Ctx(ctx).With( zap.String("role", typeutil.RootCoordRole), + zap.String("dbName", in.GetDbName()), zap.String("alias", in.GetAlias()), - zap.String("collection", in.GetCollectionName())) - - t := &createAliasTask{ - baseTask: newBaseTask(ctx, c), - Req: in, - } - - if err := c.scheduler.AddTask(t); err != nil { - log.Ctx(ctx).Info("failed to enqueue request to create alias", - zap.String("role", typeutil.RootCoordRole), - zap.Error(err), - zap.String("alias", in.GetAlias()), - zap.String("collection", in.GetCollectionName())) - - metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc() - return merr.Status(err), nil - } - - if err := t.WaitToFinish(); err != nil { - log.Ctx(ctx).Info("failed to create alias", - zap.String("role", typeutil.RootCoordRole), - zap.Error(err), - zap.String("alias", in.GetAlias()), - zap.String("collection", in.GetCollectionName()), - zap.Uint64("ts", t.GetTs())) + zap.String("collectionName", in.GetCollectionName())) + logger.Info("received request to create alias") + if err := c.broadcastCreateAlias(ctx, in); err != nil { + if errors.Is(err, errIgnoredAlterAlias) { + logger.Info("create alias already set on this collection, ignore it") + metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc() + return merr.Success(), nil + } + logger.Info("failed to create alias", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreateAlias").Observe(float64(t.queueDur.Milliseconds())) - - log.Ctx(ctx).Info("done to create alias", - zap.String("role", typeutil.RootCoordRole), - zap.String("alias", in.GetAlias()), - zap.String("collection", in.GetCollectionName()), - zap.Uint64("ts", t.GetTs())) + logger.Info("done to create alias") return merr.Success(), nil } @@ -1919,45 +1897,25 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("DropAlias") - - log.Ctx(ctx).Info("received request to drop alias", - zap.String("role", typeutil.RootCoordRole), + logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), + zap.String("dbName", in.GetDbName()), zap.String("alias", in.GetAlias())) + logger.Info("received request to drop alias") - t := &dropAliasTask{ - baseTask: newBaseTask(ctx, c), - Req: in, - } - - if err := c.scheduler.AddTask(t); err != nil { - log.Ctx(ctx).Info("failed to enqueue request to drop alias", - zap.String("role", typeutil.RootCoordRole), - zap.Error(err), - zap.String("alias", in.GetAlias())) - - metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc() - return merr.Status(err), nil - } - - if err := t.WaitToFinish(); err != nil { - log.Ctx(ctx).Info("failed to drop alias", - zap.String("role", typeutil.RootCoordRole), - zap.Error(err), - zap.String("alias", in.GetAlias()), - zap.Uint64("ts", t.GetTs())) - + if err := c.broadcastDropAlias(ctx, in); err != nil { + if errors.Is(err, merr.ErrAliasNotFound) { + logger.Info("drop alias not found, ignore it") + metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc() + return merr.Success(), nil + } + logger.Info("failed to drop alias", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropAlias").Observe(float64(t.queueDur.Milliseconds())) - - log.Ctx(ctx).Info("done to drop alias", - zap.String("role", typeutil.RootCoordRole), - zap.String("alias", in.GetAlias()), - zap.Uint64("ts", t.GetTs())) + logger.Info("done to drop alias") return merr.Success(), nil } @@ -1969,49 +1927,26 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) ( metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("AlterAlias") - - log.Ctx(ctx).Info("received request to alter alias", - zap.String("role", typeutil.RootCoordRole), + logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole), + zap.String("dbName", in.GetDbName()), zap.String("alias", in.GetAlias()), - zap.String("collection", in.GetCollectionName())) - - t := &alterAliasTask{ - baseTask: newBaseTask(ctx, c), - Req: in, - } - - if err := c.scheduler.AddTask(t); err != nil { - log.Ctx(ctx).Info("failed to enqueue request to alter alias", - zap.String("role", typeutil.RootCoordRole), - zap.Error(err), - zap.String("alias", in.GetAlias()), - zap.String("collection", in.GetCollectionName())) - - metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc() - return merr.Status(err), nil - } - - if err := t.WaitToFinish(); err != nil { - log.Ctx(ctx).Info("failed to alter alias", - zap.String("role", typeutil.RootCoordRole), - zap.Error(err), - zap.String("alias", in.GetAlias()), - zap.String("collection", in.GetCollectionName()), - zap.Uint64("ts", t.GetTs())) + zap.String("collectionName", in.GetCollectionName())) + logger.Info("received request to alter alias") + if err := c.broadcastAlterAlias(ctx, in); err != nil { + if errors.Is(err, errIgnoredAlterAlias) { + logger.Info("alter alias already set on this collection, ignore it") + metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc() + return merr.Success(), nil + } + logger.Info("failed to alter alias", zap.Error(err)) metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc() return merr.Status(err), nil } metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterAlias").Observe(float64(t.queueDur.Milliseconds())) - - log.Info("done to alter alias", - zap.String("role", typeutil.RootCoordRole), - zap.String("alias", in.GetAlias()), - zap.String("collection", in.GetCollectionName()), - zap.Uint64("ts", t.GetTs())) + logger.Info("done to alter alias") return merr.Success(), nil } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index c5a080ffd8..0c7d255652 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -84,7 +84,9 @@ func initStreamingSystem() { bapi.EXPECT().Close().Return() mb := mock_broadcaster.NewMockBroadcaster(t) - mb.EXPECT().WithResourceKeys(mock.Anything, mock.Anything).Return(bapi, nil) + mb.EXPECT().WithResourceKeys(mock.Anything, mock.Anything).Return(bapi, nil).Maybe() + mb.EXPECT().WithResourceKeys(mock.Anything, mock.Anything, mock.Anything).Return(bapi, nil).Maybe() + mb.EXPECT().WithResourceKeys(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(bapi, nil).Maybe() mb.EXPECT().Close().Return() broadcast.Release() broadcast.ResetBroadcaster() @@ -328,34 +330,6 @@ func TestRootCoord_CreateAlias(t *testing.T) { assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) }) - - t.Run("failed to add task", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withInvalidScheduler()) - - ctx := context.Background() - resp, err := c.CreateAlias(ctx, &milvuspb.CreateAliasRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("failed to execute", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withTaskFailScheduler()) - ctx := context.Background() - resp, err := c.CreateAlias(ctx, &milvuspb.CreateAliasRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("normal case, everything is ok", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withValidScheduler()) - ctx := context.Background() - resp, err := c.CreateAlias(ctx, &milvuspb.CreateAliasRequest{}) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) } func TestRootCoord_DropAlias(t *testing.T) { @@ -366,34 +340,6 @@ func TestRootCoord_DropAlias(t *testing.T) { assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) }) - - t.Run("failed to add task", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withInvalidScheduler()) - - ctx := context.Background() - resp, err := c.DropAlias(ctx, &milvuspb.DropAliasRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("failed to execute", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withTaskFailScheduler()) - ctx := context.Background() - resp, err := c.DropAlias(ctx, &milvuspb.DropAliasRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("normal case, everything is ok", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withValidScheduler()) - ctx := context.Background() - resp, err := c.DropAlias(ctx, &milvuspb.DropAliasRequest{}) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) } func TestRootCoord_AlterAlias(t *testing.T) { @@ -404,34 +350,6 @@ func TestRootCoord_AlterAlias(t *testing.T) { assert.NoError(t, err) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) }) - - t.Run("failed to add task", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withInvalidScheduler()) - - ctx := context.Background() - resp, err := c.AlterAlias(ctx, &milvuspb.AlterAliasRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("failed to execute", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withTaskFailScheduler()) - ctx := context.Background() - resp, err := c.AlterAlias(ctx, &milvuspb.AlterAliasRequest{}) - assert.NoError(t, err) - assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) - - t.Run("normal case, everything is ok", func(t *testing.T) { - c := newTestCore(withHealthyCode(), - withValidScheduler()) - ctx := context.Background() - resp, err := c.AlterAlias(ctx, &milvuspb.AlterAliasRequest{}) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - }) } func TestRootCoord_DescribeAlias(t *testing.T) { diff --git a/internal/rootcoord/task_test.go b/internal/rootcoord/task_test.go index aa82dac2c6..fc57d4ad4f 100644 --- a/internal/rootcoord/task_test.go +++ b/internal/rootcoord/task_test.go @@ -72,16 +72,6 @@ func TestLockerKey(t *testing.T) { } func TestGetLockerKey(t *testing.T) { - t.Run("alter alias task locker key", func(t *testing.T) { - tt := &alterAliasTask{ - Req: &milvuspb.AlterAliasRequest{ - DbName: "foo", - CollectionName: "bar", - }, - } - key := tt.GetLockerKey() - assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true") - }) t.Run("alter collection task locker key", func(t *testing.T) { metaMock := mockrootcoord.NewIMetaTable(t) metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything). @@ -124,23 +114,6 @@ func TestGetLockerKey(t *testing.T) { key := tt.GetLockerKey() assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|111-2-true") }) - t.Run("create alias task locker key", func(t *testing.T) { - metaMock := mockrootcoord.NewIMetaTable(t) - c := &Core{ - meta: metaMock, - } - tt := &createAliasTask{ - baseTask: baseTask{ - core: c, - }, - Req: &milvuspb.CreateAliasRequest{ - DbName: "foo", - CollectionName: "bar", - }, - } - key := tt.GetLockerKey() - assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true") - }) t.Run("create collection task locker key", func(t *testing.T) { tt := &createCollectionTask{ Req: &milvuspb.CreateCollectionRequest{ @@ -219,28 +192,6 @@ func TestGetLockerKey(t *testing.T) { key := tt.GetLockerKey() assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false") }) - t.Run("drop alias task locker key", func(t *testing.T) { - metaMock := mockrootcoord.NewIMetaTable(t) - metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything). - RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (*model.Collection, error) { - return &model.Collection{ - Name: "real" + s2, - CollectionID: 111, - }, nil - }) - c := &Core{ - meta: metaMock, - } - tt := &dropAliasTask{ - baseTask: baseTask{core: c}, - Req: &milvuspb.DropAliasRequest{ - DbName: "foo", - Alias: "bar", - }, - } - key := tt.GetLockerKey() - assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|111-2-true") - }) t.Run("drop collection task locker key", func(t *testing.T) { metaMock := mockrootcoord.NewIMetaTable(t) metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything). diff --git a/internal/streamingcoord/server/broadcaster/resource_key_locker.go b/internal/streamingcoord/server/broadcaster/resource_key_locker.go index e3fc164210..5b40450621 100644 --- a/internal/streamingcoord/server/broadcaster/resource_key_locker.go +++ b/internal/streamingcoord/server/broadcaster/resource_key_locker.go @@ -8,6 +8,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/messagespb" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/util/lock" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // errFastLockFailed is the error for fast lock failed. @@ -83,7 +84,7 @@ func (l *lockGuard) Unlock() { // FastLock locks the resource keys without waiting. // return error if the resource key is already locked. func (r *resourceKeyLocker) FastLock(keys ...message.ResourceKey) (*lockGuards, error) { - sortResourceKeys(keys) + keys = uniqueSortResourceKeys(keys) g := &lockGuards{} for _, key := range keys { @@ -106,7 +107,8 @@ func (r *resourceKeyLocker) FastLock(keys ...message.ResourceKey) (*lockGuards, // Lock locks the resource keys. func (r *resourceKeyLocker) Lock(keys ...message.ResourceKey) *lockGuards { // lock the keys in order to avoid deadlock. - sortResourceKeys(keys) + keys = uniqueSortResourceKeys(keys) + g := &lockGuards{} for _, key := range keys { if key.Shared { @@ -128,12 +130,14 @@ func (r *resourceKeyLocker) unlockWithKey(key message.ResourceKey) { r.inner.Unlock(newResourceLockKey(key)) } -// sortResourceKeys sorts the resource keys. -func sortResourceKeys(keys []message.ResourceKey) { +// uniqueSortResourceKeys sorts the resource keys. +func uniqueSortResourceKeys(keys []message.ResourceKey) []message.ResourceKey { + keys = typeutil.NewSet(keys...).Collect() sort.Slice(keys, func(i, j int) bool { if keys[i].Domain != keys[j].Domain { return keys[i].Domain < keys[j].Domain } return keys[i].Key < keys[j].Key }) + return keys } diff --git a/internal/streamingcoord/server/broadcaster/resource_key_locker_test.go b/internal/streamingcoord/server/broadcaster/resource_key_locker_test.go index b80d9a0373..a1f2f8188d 100644 --- a/internal/streamingcoord/server/broadcaster/resource_key_locker_test.go +++ b/internal/streamingcoord/server/broadcaster/resource_key_locker_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" ) @@ -110,7 +112,7 @@ func TestResourceKeyLocker(t *testing.T) { key := message.NewCollectionNameResourceKey("test_collection") // First fast lock should succeed - guards1, err := locker.FastLock(key) + guards1, err := locker.FastLock(key, key) if err != nil { t.Fatalf("First FastLock failed: %v", err) } @@ -130,3 +132,35 @@ func TestResourceKeyLocker(t *testing.T) { guards2.Unlock() }) } + +func TestUniqueSortResourceKeys(t *testing.T) { + keys := []message.ResourceKey{ + message.NewSharedDBNameResourceKey("test_db_1"), + message.NewSharedDBNameResourceKey("test_db_1"), + message.NewSharedDBNameResourceKey("test_db_2"), + message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_11"), + message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_11"), + message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_12"), + message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_13"), + message.NewExclusiveCollectionNameResourceKey("test_db_2", "test_collection_21"), + message.NewExclusiveCollectionNameResourceKey("test_db_2", "test_collection_21"), + message.NewExclusiveCollectionNameResourceKey("test_db_2", "test_collection_22"), + message.NewSharedClusterResourceKey(), + } + for i := 0; i < 10; i++ { + rand.Shuffle(len(keys), func(i, j int) { + keys[i], keys[j] = keys[j], keys[i] + }) + keys2 := uniqueSortResourceKeys(keys) + assert.Equal(t, keys2, []message.ResourceKey{ + message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_11"), + message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_12"), + message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_13"), + message.NewExclusiveCollectionNameResourceKey("test_db_2", "test_collection_21"), + message.NewExclusiveCollectionNameResourceKey("test_db_2", "test_collection_22"), + message.NewSharedDBNameResourceKey("test_db_1"), + message.NewSharedDBNameResourceKey("test_db_2"), + message.NewSharedClusterResourceKey(), + }) + } +}