enhance: support alias with WAL-based DDL framework (#44865)

issue: #43897

- Alias related DDL is implemented by WAL-based DDL framework now.
- Support following message type in wal AlterAlias, DropAlias.
- Alias DDL can be synced by new CDC now.
- Refactor some UT for Alias DDL.

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-10-18 15:12:01 +08:00 committed by GitHub
parent eae6aff644
commit 496331ffa8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 723 additions and 902 deletions

View File

@ -1,54 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
)
type alterAliasTask struct {
baseTask
Req *milvuspb.AlterAliasRequest
}
func (t *alterAliasTask) Prepare(ctx context.Context) error {
if err := CheckMsgType(t.Req.GetBase().GetMsgType(), commonpb.MsgType_AlterAlias); err != nil {
return err
}
return nil
}
func (t *alterAliasTask) Execute(ctx context.Context) error {
collID := t.core.meta.GetCollectionID(ctx, t.Req.GetDbName(), t.Req.GetCollectionName())
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, collID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_AlterAlias)); err != nil {
return err
}
// alter alias is atomic enough.
return t.core.meta.AlterAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.Req.GetCollectionName(), t.GetTs())
}
func (t *alterAliasTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
NewCollectionLockerKey(t.Req.GetCollectionName(), true),
)
}

View File

@ -1,78 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
)
func Test_alterAliasTask_Prepare(t *testing.T) {
t.Run("invalid msg type", func(t *testing.T) {
task := &alterAliasTask{Req: &milvuspb.AlterAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}}}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
task := &alterAliasTask{Req: &milvuspb.AlterAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterAlias}}}
err := task.Prepare(context.Background())
assert.NoError(t, err)
})
}
func Test_alterAliasTask_Execute(t *testing.T) {
t.Run("failed to expire cache", func(t *testing.T) {
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
core := newTestCore(withInvalidProxyManager(), withMeta(mockMeta))
task := &alterAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterAliasRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterAlias},
Alias: "test",
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("failed to alter alias", func(t *testing.T) {
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
mockMeta.EXPECT().AlterAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("failed to alter alias"))
core := newTestCore(withValidProxyManager(), withMeta(mockMeta))
task := &alterAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterAliasRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterAlias},
Alias: "test",
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
}

View File

@ -1,98 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"fmt"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/v2/log"
)
type createAliasTask struct {
baseTask
Req *milvuspb.CreateAliasRequest
}
func (t *createAliasTask) Prepare(ctx context.Context) error {
if err := CheckMsgType(t.Req.GetBase().GetMsgType(), commonpb.MsgType_CreateAlias); err != nil {
return err
}
return nil
}
func (t *createAliasTask) Execute(ctx context.Context) error {
oldColl, err := t.core.meta.GetCollectionByName(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), t.ts)
if err != nil {
log.Ctx(ctx).Warn("get collection failed during create alias",
zap.String("collectionName", t.Req.GetCollectionName()), zap.Uint64("ts", t.ts))
return err
}
return executeCreateAliasTaskSteps(ctx, t.core, t.Req, oldColl, t.GetTs())
}
func (t *createAliasTask) GetLockerKey() LockerKey {
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
NewCollectionLockerKey(t.Req.GetCollectionName(), true),
)
}
func executeCreateAliasTaskSteps(ctx context.Context, core *Core, req *milvuspb.CreateAliasRequest, oldColl *model.Collection, ts Timestamp) error {
redoTask := newBaseRedoTask(core.stepExecutor)
// properties needs to be refreshed in the cache
aliases := core.meta.ListAliasesByID(ctx, oldColl.CollectionID)
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: core},
dbName: req.GetDbName(),
collectionNames: append(aliases, req.GetCollectionName()),
collectionID: oldColl.CollectionID,
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_CreateAlias)},
})
redoTask.AddSyncStep(&CreateAliasStep{
baseStep: baseStep{core: core},
Req: req,
ts: ts,
})
return redoTask.Execute(ctx)
}
type CreateAliasStep struct {
baseStep
Req *milvuspb.CreateAliasRequest
ts Timestamp
}
func (c *CreateAliasStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := c.core.meta.CreateAlias(ctx, c.Req.GetDbName(), c.Req.GetAlias(), c.Req.GetCollectionName(), c.ts)
return nil, err
}
func (c *CreateAliasStep) Desc() string {
return fmt.Sprintf("create alias %s, for collection %s", c.Req.GetAlias(), c.Req.GetCollectionName())
}

View File

@ -1,99 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
)
func Test_createAliasTask_Prepare(t *testing.T) {
t.Run("invalid msg type", func(t *testing.T) {
task := &createAliasTask{Req: &milvuspb.CreateAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}}}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
task := &createAliasTask{Req: &milvuspb.CreateAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias}}}
err := task.Prepare(context.Background())
assert.NoError(t, err)
})
}
func Test_createAliasTask_Execute(t *testing.T) {
t.Run("failed_to_describe_collection", func(t *testing.T) {
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("mocked"))
core := newTestCore(withMeta(mockMeta))
task := &createAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.CreateAliasRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias},
Alias: "test",
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("failed_to_invalidate_cache", func(t *testing.T) {
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.Collection{CollectionID: 111}, nil)
mockMeta.EXPECT().ListAliasesByID(mock.Anything, mock.Anything).Return([]string{})
core := newTestCore(withMeta(mockMeta), withInvalidProxyManager())
task := &createAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.CreateAliasRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias},
CollectionName: "coll_test",
Alias: "test",
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("failed_to_create_alias", func(t *testing.T) {
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&model.Collection{CollectionID: 111}, nil)
mockMeta.EXPECT().ListAliasesByID(mock.Anything, mock.Anything).Return([]string{})
mockMeta.EXPECT().CreateAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mocked"))
core := newTestCore(withMeta(mockMeta), withValidProxyManager())
task := &createAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.CreateAliasRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateAlias},
CollectionName: "coll_test",
Alias: "test",
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
}

View File

@ -38,6 +38,7 @@ func RegisterDDLCallbacks(core *Core) {
} }
ddlCallback.registerRBACCallbacks() ddlCallback.registerRBACCallbacks()
ddlCallback.registerDatabaseCallbacks() ddlCallback.registerDatabaseCallbacks()
ddlCallback.registerAliasCallbacks()
} }
// registerRBACCallbacks registers the rbac callbacks. // registerRBACCallbacks registers the rbac callbacks.
@ -62,6 +63,12 @@ func (c *DDLCallback) registerDatabaseCallbacks() {
registry.RegisterDropDatabaseV2AckCallback(c.dropDatabaseV1AckCallback) registry.RegisterDropDatabaseV2AckCallback(c.dropDatabaseV1AckCallback)
} }
// registerAliasCallbacks registers the alias callbacks.
func (c *DDLCallback) registerAliasCallbacks() {
registry.RegisterAlterAliasV2AckCallback(c.alterAliasV2AckCallback)
registry.RegisterDropAliasV2AckCallback(c.dropAliasV2AckCallback)
}
// DDLCallback is the callback of ddl. // DDLCallback is the callback of ddl.
type DDLCallback struct { type DDLCallback struct {
*Core *Core
@ -118,3 +125,16 @@ func startBroadcastWithDatabaseLock(ctx context.Context, dbName string) (broadca
} }
return broadcaster, nil return broadcaster, nil
} }
// startBroadcastWithAlterAliasLock starts a broadcast with alter alias lock.
func startBroadcastWithAlterAliasLock(ctx context.Context, dbName string, collectionName string, alias string) (broadcaster.BroadcastAPI, error) {
broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx,
message.NewSharedDBNameResourceKey(dbName),
message.NewExclusiveCollectionNameResourceKey(dbName, collectionName),
message.NewExclusiveCollectionNameResourceKey(dbName, alias),
)
if err != nil {
return nil, errors.Wrap(err, "failed to start broadcast with alter alias lock")
}
return broadcaster, nil
}

View File

@ -0,0 +1,186 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
pb "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func TestDDLCallbacksAliasDDL(t *testing.T) {
initStreamingSystem()
kv, _ := kvfactory.GetEtcdAndPath()
path := funcutil.RandomString(10)
catalogKV := etcdkv.NewEtcdKV(kv, path)
ss, err := rootcoord.NewSuffixSnapshot(catalogKV, rootcoord.SnapshotsSep, path, rootcoord.SnapshotPrefix)
require.NoError(t, err)
testDB := newNameDb()
collID2Meta := make(map[typeutil.UniqueID]*model.Collection)
core := newTestCore(withHealthyCode(),
withMeta(&MetaTable{
catalog: rootcoord.NewCatalog(catalogKV, ss),
names: testDB,
aliases: newNameDb(),
dbName2Meta: make(map[string]*model.Database),
collID2Meta: collID2Meta,
}),
withValidProxyManager(),
withValidIDAllocator(),
)
registry.ResetRegistration()
RegisterDDLCallbacks(core)
status, err := core.CreateDatabase(context.Background(), &milvuspb.CreateDatabaseRequest{
DbName: "test",
})
require.NoError(t, merr.CheckRPCCall(status, err))
// TODO: after refactor create collection, we can use CreateCollection to create a collection directly.
testDB.insert("test", "test_collection", 1)
testDB.insert("test", "test_collection2", 2)
collID2Meta[1] = &model.Collection{
CollectionID: 1,
Name: "test_collection",
State: pb.CollectionState_CollectionCreated,
}
collID2Meta[2] = &model.Collection{
CollectionID: 2,
Name: "test_collection2",
State: pb.CollectionState_CollectionCreated,
}
// create an alias with a not-exist database.
status, err = core.CreateAlias(context.Background(), &milvuspb.CreateAliasRequest{
DbName: "test2",
CollectionName: "test_collection",
Alias: "test_alias",
})
require.Error(t, merr.CheckRPCCall(status, err))
// create an alias with a not-exist collection.
status, err = core.CreateAlias(context.Background(), &milvuspb.CreateAliasRequest{
DbName: "test",
CollectionName: "test_collection3",
Alias: "test_alias",
})
require.Error(t, merr.CheckRPCCall(status, err))
// create an alias
status, err = core.CreateAlias(context.Background(), &milvuspb.CreateAliasRequest{
DbName: "test",
CollectionName: "test_collection",
Alias: "test_alias",
})
require.NoError(t, merr.CheckRPCCall(status, err))
coll, err := core.meta.GetCollectionByName(context.Background(), "test", "test_alias", typeutil.MaxTimestamp)
require.NoError(t, err)
require.Equal(t, int64(1), coll.CollectionID)
require.Equal(t, "test_collection", coll.Name)
// create an alias already created on current collection should be ok.
status, err = core.CreateAlias(context.Background(), &milvuspb.CreateAliasRequest{
DbName: "test",
CollectionName: "test_collection",
Alias: "test_alias",
})
require.NoError(t, merr.CheckRPCCall(status, err))
// create an alias already created on another collection should be error.
status, err = core.CreateAlias(context.Background(), &milvuspb.CreateAliasRequest{
DbName: "test",
CollectionName: "test_collection2",
Alias: "test_alias",
})
require.Error(t, merr.CheckRPCCall(status, err))
// test alter alias already created on current collection should be ok.
status, err = core.AlterAlias(context.Background(), &milvuspb.AlterAliasRequest{
DbName: "test",
CollectionName: "test_collection",
Alias: "test_alias",
})
require.NoError(t, merr.CheckRPCCall(status, err))
// test alter alias to another collection should be ok.
status, err = core.AlterAlias(context.Background(), &milvuspb.AlterAliasRequest{
DbName: "test",
CollectionName: "test_collection2",
Alias: "test_alias",
})
require.NoError(t, merr.CheckRPCCall(status, err))
// alter alias to a not-exist database should be error.
status, err = core.AlterAlias(context.Background(), &milvuspb.AlterAliasRequest{
DbName: "test2",
CollectionName: "test_collection2",
Alias: "test_alias",
})
require.Error(t, merr.CheckRPCCall(status, err))
// alter alias to a not-exist collection should be error.
status, err = core.AlterAlias(context.Background(), &milvuspb.AlterAliasRequest{
DbName: "test",
CollectionName: "test_collection3",
Alias: "test_alias",
})
require.Error(t, merr.CheckRPCCall(status, err))
// alter alias to a not exist alias should be error.
status, err = core.AlterAlias(context.Background(), &milvuspb.AlterAliasRequest{
DbName: "test",
CollectionName: "test_collection2",
Alias: "test_alias2",
})
require.Error(t, merr.CheckRPCCall(status, err))
// drop a not exist alias should be ok.
status, err = core.DropAlias(context.Background(), &milvuspb.DropAliasRequest{
DbName: "test",
Alias: "test_alias2",
})
require.NoError(t, merr.CheckRPCCall(status, err))
// drop a alias exist should be ok.
status, err = core.DropAlias(context.Background(), &milvuspb.DropAliasRequest{
DbName: "test",
Alias: "test_alias",
})
require.NoError(t, merr.CheckRPCCall(status, err))
// drop a alias already dropped should be ok.
status, err = core.DropAlias(context.Background(), &milvuspb.DropAliasRequest{
DbName: "test",
Alias: "test_alias",
})
require.NoError(t, merr.CheckRPCCall(status, err))
}

View File

@ -0,0 +1,117 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"strings"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func (c *Core) broadcastCreateAlias(ctx context.Context, req *milvuspb.CreateAliasRequest) error {
req.DbName = strings.TrimSpace(req.DbName)
req.Alias = strings.TrimSpace(req.Alias)
req.CollectionName = strings.TrimSpace(req.CollectionName)
broadcaster, err := startBroadcastWithAlterAliasLock(ctx, req.GetDbName(), req.GetCollectionName(), req.GetAlias())
if err != nil {
return err
}
defer broadcaster.Close()
if err := c.meta.CheckIfAliasCreatable(ctx, req.GetDbName(), req.GetAlias(), req.GetCollectionName()); err != nil {
return err
}
db, err := c.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp)
if err != nil {
return err
}
collection, err := c.meta.GetCollectionByName(ctx, req.GetDbName(), req.GetCollectionName(), typeutil.MaxTimestamp)
if err != nil {
return err
}
msg := message.NewAlterAliasMessageBuilderV2().
WithHeader(&message.AlterAliasMessageHeader{
DbId: db.ID,
DbName: req.GetDbName(),
CollectionId: collection.CollectionID,
Alias: req.GetAlias(),
CollectionName: req.GetCollectionName(),
}).
WithBody(&message.AlterAliasMessageBody{}).
WithBroadcast([]string{streaming.WAL().ControlChannel()}).
MustBuildBroadcast()
_, err = broadcaster.Broadcast(ctx, msg)
return err
}
func (c *Core) broadcastAlterAlias(ctx context.Context, req *milvuspb.AlterAliasRequest) error {
req.DbName = strings.TrimSpace(req.DbName)
req.Alias = strings.TrimSpace(req.Alias)
req.CollectionName = strings.TrimSpace(req.CollectionName)
broadcaster, err := startBroadcastWithAlterAliasLock(ctx, req.GetDbName(), req.GetCollectionName(), req.GetAlias())
if err != nil {
return err
}
defer broadcaster.Close()
if err := c.meta.CheckIfAliasAlterable(ctx, req.GetDbName(), req.GetAlias(), req.GetCollectionName()); err != nil {
return err
}
db, err := c.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp)
if err != nil {
return err
}
collection, err := c.meta.GetCollectionByName(ctx, req.GetDbName(), req.GetCollectionName(), typeutil.MaxTimestamp)
if err != nil {
return err
}
msg := message.NewAlterAliasMessageBuilderV2().
WithHeader(&message.AlterAliasMessageHeader{
DbId: db.ID,
DbName: req.GetDbName(),
CollectionId: collection.CollectionID,
Alias: req.GetAlias(),
CollectionName: req.GetCollectionName(),
}).
WithBody(&message.AlterAliasMessageBody{}).
WithBroadcast([]string{streaming.WAL().ControlChannel()}).
MustBuildBroadcast()
_, err = broadcaster.Broadcast(ctx, msg)
return err
}
func (c *DDLCallback) alterAliasV2AckCallback(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error {
if err := c.meta.AlterAlias(ctx, result); err != nil {
return err
}
return c.ExpireCaches(ctx,
ce.NewBuilder().WithLegacyProxyCollectionMetaCache(
ce.OptLPCMDBName(result.Message.Header().DbName),
ce.OptLPCMCollectionName(result.Message.Header().Alias),
ce.OptLPCMMsgType(commonpb.MsgType_AlterAlias)),
result.GetControlChannelResult().TimeTick)
}

View File

@ -0,0 +1,77 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"strings"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/streaming"
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/broadcast"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message/ce"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func (c *Core) broadcastDropAlias(ctx context.Context, req *milvuspb.DropAliasRequest) error {
req.DbName = strings.TrimSpace(req.DbName)
req.Alias = strings.TrimSpace(req.Alias)
broadcaster, err := broadcast.StartBroadcastWithResourceKeys(ctx,
message.NewSharedDBNameResourceKey(req.GetDbName()),
message.NewExclusiveCollectionNameResourceKey(req.GetDbName(), req.GetAlias()))
if err != nil {
return err
}
defer broadcaster.Close()
db, err := c.meta.GetDatabaseByName(ctx, req.GetDbName(), typeutil.MaxTimestamp)
if err != nil {
return err
}
if err := c.meta.CheckIfAliasDroppable(ctx, req.GetDbName(), req.GetAlias()); err != nil {
return err
}
msg := message.NewDropAliasMessageBuilderV2().
WithHeader(&message.DropAliasMessageHeader{
DbId: db.ID,
DbName: req.GetDbName(),
Alias: req.GetAlias(),
}).
WithBody(&message.DropAliasMessageBody{}).
WithBroadcast([]string{streaming.WAL().ControlChannel()}).
MustBuildBroadcast()
_, err = broadcaster.Broadcast(ctx, msg)
return err
}
func (c *DDLCallback) dropAliasV2AckCallback(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error {
if err := c.meta.DropAlias(ctx, result); err != nil {
return errors.Wrap(err, "failed to drop alias")
}
return c.ExpireCaches(ctx,
ce.NewBuilder().WithLegacyProxyCollectionMetaCache(
ce.OptLPCMDBName(result.Message.Header().DbName),
ce.OptLPCMCollectionName(result.Message.Header().Alias),
ce.OptLPCMMsgType(commonpb.MsgType_DropAlias),
),
result.GetControlChannelResult().TimeTick,
)
}

View File

@ -1,55 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
)
type dropAliasTask struct {
baseTask
Req *milvuspb.DropAliasRequest
}
func (t *dropAliasTask) Prepare(ctx context.Context) error {
if err := CheckMsgType(t.Req.GetBase().GetMsgType(), commonpb.MsgType_DropAlias); err != nil {
return err
}
return nil
}
func (t *dropAliasTask) Execute(ctx context.Context) error {
collID := t.core.meta.GetCollectionID(ctx, t.Req.GetDbName(), t.Req.GetAlias())
// drop alias is atomic enough.
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetAlias()}, collID, "", t.GetTs(), proxyutil.SetMsgType(commonpb.MsgType_DropAlias)); err != nil {
return err
}
return t.core.meta.DropAlias(ctx, t.Req.GetDbName(), t.Req.GetAlias(), t.GetTs())
}
func (t *dropAliasTask) GetLockerKey() LockerKey {
collection := t.core.getCollectionIDStr(t.ctx, t.Req.GetDbName(), t.Req.GetAlias(), 0)
return NewLockerKeyChain(
NewClusterLockerKey(false),
NewDatabaseLockerKey(t.Req.GetDbName(), false),
NewCollectionLockerKey(collection, true),
)
}

View File

@ -1,104 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rootcoord
import (
"context"
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
)
func Test_dropAliasTask_Prepare(t *testing.T) {
t.Run("invalid msg type", func(t *testing.T) {
task := &dropAliasTask{
Req: &milvuspb.DropAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection}},
}
err := task.Prepare(context.Background())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
task := &dropAliasTask{
Req: &milvuspb.DropAliasRequest{Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias}},
}
err := task.Prepare(context.Background())
assert.NoError(t, err)
})
}
func Test_dropAliasTask_Execute(t *testing.T) {
t.Run("failed to expire cache", func(t *testing.T) {
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
alias := funcutil.GenRandomStr()
core := newTestCore(withInvalidProxyManager(), withMeta(mockMeta))
task := &dropAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.DropAliasRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias},
Alias: alias,
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("failed to drop alias", func(t *testing.T) {
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
mockMeta.EXPECT().DropAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("failed to alter alias"))
core := newTestCore(withValidProxyManager(), withMeta(mockMeta))
alias := funcutil.GenRandomStr()
task := &dropAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.DropAliasRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias},
Alias: alias,
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("normal case", func(t *testing.T) {
mockMeta := mockrootcoord.NewIMetaTable(t)
mockMeta.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(111)
mockMeta.EXPECT().DropAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
core := newTestCore(withValidProxyManager(), withMeta(mockMeta))
alias := funcutil.GenRandomStr()
task := &dropAliasTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.DropAliasRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropAlias},
Alias: alias,
},
}
err := task.Execute(context.Background())
assert.NoError(t, err)
})
}

View File

@ -50,11 +50,17 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/typeutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
var errIgnoredAlterAlias = errors.New("ignored alter alias") // alias already created on current collection, so it can be ignored.
type MetaTableChecker interface { type MetaTableChecker interface {
RBACChecker RBACChecker
CheckIfDatabaseCreatable(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error CheckIfDatabaseCreatable(ctx context.Context, req *milvuspb.CreateDatabaseRequest) error
CheckIfDatabaseDroppable(ctx context.Context, req *milvuspb.DropDatabaseRequest) error CheckIfDatabaseDroppable(ctx context.Context, req *milvuspb.DropDatabaseRequest) error
CheckIfAliasCreatable(ctx context.Context, dbName string, alias string, collectionName string) error
CheckIfAliasAlterable(ctx context.Context, dbName string, alias string, collectionName string) error
CheckIfAliasDroppable(ctx context.Context, dbName string, alias string) error
} }
//go:generate mockery --name=IMetaTable --structname=MockIMetaTable --output=./ --filename=mock_meta_table.go --with-expecter --inpackage //go:generate mockery --name=IMetaTable --structname=MockIMetaTable --output=./ --filename=mock_meta_table.go --with-expecter --inpackage
@ -89,11 +95,13 @@ type IMetaTable interface {
AddPartition(ctx context.Context, partition *model.Partition) error AddPartition(ctx context.Context, partition *model.Partition) error
ChangePartitionState(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error ChangePartitionState(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error
RemovePartition(ctx context.Context, dbID int64, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error RemovePartition(ctx context.Context, dbID int64, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error
CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error
DropAlias(ctx context.Context, dbName string, alias string, ts Timestamp) error // Alias
AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error AlterAlias(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error
DropAlias(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error
DescribeAlias(ctx context.Context, dbName string, alias string, ts Timestamp) (string, error) DescribeAlias(ctx context.Context, dbName string, alias string, ts Timestamp) (string, error)
ListAliases(ctx context.Context, dbName string, collectionName string, ts Timestamp) ([]string, error) ListAliases(ctx context.Context, dbName string, collectionName string, ts Timestamp) ([]string, error)
AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, ts Timestamp, fieldModify bool) error AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, ts Timestamp, fieldModify bool) error
RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error
GetGeneralCount(ctx context.Context) int GetGeneralCount(ctx context.Context) int
@ -1113,9 +1121,9 @@ func (mt *MetaTable) RemovePartition(ctx context.Context, dbID int64, collection
return nil return nil
} }
func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error { func (mt *MetaTable) CheckIfAliasCreatable(ctx context.Context, dbName string, alias string, collectionName string) error {
mt.ddLock.Lock() mt.ddLock.RLock()
defer mt.ddLock.Unlock() defer mt.ddLock.RUnlock()
// backward compatibility for rolling upgrade // backward compatibility for rolling upgrade
if dbName == "" { if dbName == "" {
log.Ctx(ctx).Warn("db name is empty", zap.String("alias", alias), zap.String("collection", collectionName)) log.Ctx(ctx).Warn("db name is empty", zap.String("alias", alias), zap.String("collection", collectionName))
@ -1149,8 +1157,8 @@ func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias strin
// check if alias exists. // check if alias exists.
aliasedCollectionID, ok := mt.aliases.get(dbName, alias) aliasedCollectionID, ok := mt.aliases.get(dbName, alias)
if ok && aliasedCollectionID == collectionID { if ok && aliasedCollectionID == collectionID {
log.Ctx(ctx).Warn("add duplicate alias", zap.String("alias", alias), zap.String("collection", collectionName), zap.Uint64("ts", ts)) log.Ctx(ctx).Warn("add duplicate alias", zap.String("alias", alias), zap.String("collection", collectionName))
return nil return errIgnoredAlterAlias
} else if ok { } else if ok {
// TODO: better to check if aliasedCollectionID exist or is available, though not very possible. // TODO: better to check if aliasedCollectionID exist or is available, though not very possible.
aliasedColl := mt.collID2Meta[aliasedCollectionID] aliasedColl := mt.collID2Meta[aliasedCollectionID]
@ -1164,63 +1172,69 @@ func (mt *MetaTable) CreateAlias(ctx context.Context, dbName string, alias strin
// you cannot alias to a non-existent collection. // you cannot alias to a non-existent collection.
return merr.WrapErrCollectionNotFoundWithDB(dbName, collectionName) return merr.WrapErrCollectionNotFoundWithDB(dbName, collectionName)
} }
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
if err := mt.catalog.CreateAlias(ctx1, &model.Alias{
Name: alias,
CollectionID: collectionID,
CreatedTime: ts,
State: pb.AliasState_AliasCreated,
DbID: coll.DBID,
}, ts); err != nil {
return err
}
mt.aliases.insert(dbName, alias, collectionID)
log.Ctx(ctx).Info("create alias",
zap.String("db", dbName),
zap.String("alias", alias),
zap.String("collection", collectionName),
zap.Int64("id", coll.CollectionID),
zap.Uint64("ts", ts),
)
return nil return nil
} }
func (mt *MetaTable) DropAlias(ctx context.Context, dbName string, alias string, ts Timestamp) error { func (mt *MetaTable) CheckIfAliasDroppable(ctx context.Context, dbName string, alias string) error {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
if _, ok := mt.aliases.get(dbName, alias); !ok {
return merr.WrapErrAliasNotFound(dbName, alias)
}
return nil
}
func (mt *MetaTable) DropAlias(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error {
mt.ddLock.Lock() mt.ddLock.Lock()
defer mt.ddLock.Unlock() defer mt.ddLock.Unlock()
// backward compatibility for rolling upgrade
if dbName == "" { header := result.Message.Header()
log.Ctx(ctx).Warn("db name is empty", zap.String("alias", alias), zap.Uint64("ts", ts))
dbName = util.DefaultDBName
}
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue()) ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue())
db, err := mt.getDatabaseByNameInternal(ctx, dbName, typeutil.MaxTimestamp) if err := mt.catalog.DropAlias(ctx1, header.DbId, header.Alias, result.GetControlChannelResult().TimeTick); err != nil {
if err != nil {
return err return err
} }
if err := mt.catalog.DropAlias(ctx1, db.ID, alias, ts); err != nil { mt.aliases.remove(header.DbName, header.Alias)
return err
}
mt.aliases.remove(dbName, alias)
log.Ctx(ctx).Info("drop alias", log.Ctx(ctx).Info("drop alias",
zap.String("db", dbName), zap.String("db", header.DbName),
zap.String("alias", alias), zap.String("alias", header.Alias),
zap.Uint64("ts", ts), zap.Uint64("ts", result.GetControlChannelResult().TimeTick),
) )
return nil return nil
} }
func (mt *MetaTable) AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error { func (mt *MetaTable) AlterAlias(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error {
mt.ddLock.Lock() mt.ddLock.Lock()
defer mt.ddLock.Unlock() defer mt.ddLock.Unlock()
header := result.Message.Header()
if err := mt.catalog.AlterAlias(ctx, &model.Alias{
Name: header.Alias,
CollectionID: header.CollectionId,
CreatedTime: result.GetControlChannelResult().TimeTick,
State: pb.AliasState_AliasCreated,
DbID: header.DbId,
}, result.GetControlChannelResult().TimeTick); err != nil {
return err
}
// alias switch to another collection anyway.
mt.aliases.insert(header.DbName, header.Alias, header.CollectionId)
log.Ctx(ctx).Info("alter alias",
zap.String("db", header.DbName),
zap.String("alias", header.Alias),
zap.String("collection", header.CollectionName),
zap.Uint64("ts", result.GetControlChannelResult().TimeTick),
)
return nil
}
func (mt *MetaTable) CheckIfAliasAlterable(ctx context.Context, dbName string, alias string, collectionName string) error {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
// backward compatibility for rolling upgrade // backward compatibility for rolling upgrade
if dbName == "" { if dbName == "" {
log.Ctx(ctx).Warn("db name is empty", zap.String("alias", alias), zap.String("collection", collectionName)) log.Ctx(ctx).Warn("db name is empty", zap.String("alias", alias), zap.String("collection", collectionName))
@ -1255,33 +1269,13 @@ func (mt *MetaTable) AlterAlias(ctx context.Context, dbName string, alias string
} }
// check if alias exists. // check if alias exists.
_, ok = mt.aliases.get(dbName, alias) existAliasCollectionID, ok := mt.aliases.get(dbName, alias)
if !ok { if !ok {
//
return merr.WrapErrAliasNotFound(dbName, alias) return merr.WrapErrAliasNotFound(dbName, alias)
} }
if existAliasCollectionID == collectionID {
ctx1 := contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName.GetValue()) return errIgnoredAlterAlias
if err := mt.catalog.AlterAlias(ctx1, &model.Alias{
Name: alias,
CollectionID: collectionID,
CreatedTime: ts,
State: pb.AliasState_AliasCreated,
DbID: coll.DBID,
}, ts); err != nil {
return err
} }
// alias switch to another collection anyway.
mt.aliases.insert(dbName, alias, collectionID)
log.Ctx(ctx).Info("alter alias",
zap.String("db", dbName),
zap.String("alias", alias),
zap.String("collection", collectionName),
zap.Uint64("ts", ts),
)
return nil return nil
} }

View File

@ -2136,17 +2136,7 @@ func TestMetaTable_EmtpyDatabaseName(t *testing.T) {
} }
mt.names.insert(util.DefaultDBName, "name", 1) mt.names.insert(util.DefaultDBName, "name", 1)
err := mt.CreateAlias(context.TODO(), "", "name", "name", typeutil.MaxTimestamp) err := mt.CheckIfAliasCreatable(context.TODO(), "", "name", "name")
assert.Error(t, err)
})
t.Run("DropAlias with empty db", func(t *testing.T) {
mt := &MetaTable{
names: newNameDb(),
}
mt.names.insert(util.DefaultDBName, "name", 1)
err := mt.DropAlias(context.TODO(), "", "name", typeutil.MaxTimestamp)
assert.Error(t, err) assert.Error(t, err)
}) })
} }

View File

@ -72,9 +72,8 @@ type mockMetaTable struct {
AddPartitionFunc func(ctx context.Context, partition *model.Partition) error AddPartitionFunc func(ctx context.Context, partition *model.Partition) error
ChangePartitionStateFunc func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error ChangePartitionStateFunc func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error
RemovePartitionFunc func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error RemovePartitionFunc func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, ts Timestamp) error
CreateAliasFunc func(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error AlterAliasFunc func(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error
AlterAliasFunc func(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error DropAliasFunc func(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error
DropAliasFunc func(ctx context.Context, dbName string, alias string, ts Timestamp) error
IsAliasFunc func(ctx context.Context, dbName, name string) bool IsAliasFunc func(ctx context.Context, dbName, name string) bool
DescribeAliasFunc func(ctx context.Context, dbName, alias string, ts Timestamp) (string, error) DescribeAliasFunc func(ctx context.Context, dbName, alias string, ts Timestamp) (string, error)
ListAliasesFunc func(ctx context.Context, dbName, collectionName string, ts Timestamp) ([]string, error) ListAliasesFunc func(ctx context.Context, dbName, collectionName string, ts Timestamp) ([]string, error)
@ -152,16 +151,12 @@ func (m mockMetaTable) RemovePartition(ctx context.Context, dbID int64, collecti
return m.RemovePartitionFunc(ctx, collectionID, partitionID, ts) return m.RemovePartitionFunc(ctx, collectionID, partitionID, ts)
} }
func (m mockMetaTable) CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error { func (m mockMetaTable) AlterAlias(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error {
return m.CreateAliasFunc(ctx, dbName, alias, collectionName, ts) return m.AlterAliasFunc(ctx, result)
} }
func (m mockMetaTable) AlterAlias(ctx context.Context, dbName, alias string, collectionName string, ts Timestamp) error { func (m mockMetaTable) DropAlias(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error {
return m.AlterAliasFunc(ctx, dbName, alias, collectionName, ts) return m.DropAliasFunc(ctx, result)
}
func (m mockMetaTable) DropAlias(ctx context.Context, dbName, alias string, ts Timestamp) error {
return m.DropAliasFunc(ctx, dbName, alias, ts)
} }
func (m mockMetaTable) IsAlias(ctx context.Context, dbName, name string) bool { func (m mockMetaTable) IsAlias(ctx context.Context, dbName, name string) bool {
@ -511,13 +506,10 @@ func withInvalidMeta() Opt {
meta.ChangePartitionStateFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error { meta.ChangePartitionStateFunc = func(ctx context.Context, collectionID UniqueID, partitionID UniqueID, state pb.PartitionState, ts Timestamp) error {
return errors.New("error mock ChangePartitionState") return errors.New("error mock ChangePartitionState")
} }
meta.CreateAliasFunc = func(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error { meta.AlterAliasFunc = func(ctx context.Context, result message.BroadcastResultAlterAliasMessageV2) error {
return errors.New("error mock CreateAlias")
}
meta.AlterAliasFunc = func(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error {
return errors.New("error mock AlterAlias") return errors.New("error mock AlterAlias")
} }
meta.DropAliasFunc = func(ctx context.Context, dbName string, alias string, ts Timestamp) error { meta.DropAliasFunc = func(ctx context.Context, result message.BroadcastResultDropAliasMessageV2) error {
return errors.New("error mock DropAlias") return errors.New("error mock DropAlias")
} }
meta.AddCredentialFunc = func(ctx context.Context, credInfo *internalpb.CredentialInfo) error { meta.AddCredentialFunc = func(ctx context.Context, credInfo *internalpb.CredentialInfo) error {

View File

@ -128,17 +128,17 @@ func (_c *IMetaTable_AddPartition_Call) RunAndReturn(run func(context.Context, *
return _c return _c
} }
// AlterAlias provides a mock function with given fields: ctx, dbName, alias, collectionName, ts // AlterAlias provides a mock function with given fields: ctx, result
func (_m *IMetaTable) AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts uint64) error { func (_m *IMetaTable) AlterAlias(ctx context.Context, result message.BroadcastResult[*messagespb.AlterAliasMessageHeader, *messagespb.AlterAliasMessageBody]) error {
ret := _m.Called(ctx, dbName, alias, collectionName, ts) ret := _m.Called(ctx, result)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for AlterAlias") panic("no return value specified for AlterAlias")
} }
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, string, uint64) error); ok { if rf, ok := ret.Get(0).(func(context.Context, message.BroadcastResult[*messagespb.AlterAliasMessageHeader, *messagespb.AlterAliasMessageBody]) error); ok {
r0 = rf(ctx, dbName, alias, collectionName, ts) r0 = rf(ctx, result)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
} }
@ -153,17 +153,14 @@ type IMetaTable_AlterAlias_Call struct {
// AlterAlias is a helper method to define mock.On call // AlterAlias is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - dbName string // - result message.BroadcastResult[*messagespb.AlterAliasMessageHeader,*messagespb.AlterAliasMessageBody]
// - alias string func (_e *IMetaTable_Expecter) AlterAlias(ctx interface{}, result interface{}) *IMetaTable_AlterAlias_Call {
// - collectionName string return &IMetaTable_AlterAlias_Call{Call: _e.mock.On("AlterAlias", ctx, result)}
// - ts uint64
func (_e *IMetaTable_Expecter) AlterAlias(ctx interface{}, dbName interface{}, alias interface{}, collectionName interface{}, ts interface{}) *IMetaTable_AlterAlias_Call {
return &IMetaTable_AlterAlias_Call{Call: _e.mock.On("AlterAlias", ctx, dbName, alias, collectionName, ts)}
} }
func (_c *IMetaTable_AlterAlias_Call) Run(run func(ctx context.Context, dbName string, alias string, collectionName string, ts uint64)) *IMetaTable_AlterAlias_Call { func (_c *IMetaTable_AlterAlias_Call) Run(run func(ctx context.Context, result message.BroadcastResult[*messagespb.AlterAliasMessageHeader, *messagespb.AlterAliasMessageBody])) *IMetaTable_AlterAlias_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string), args[4].(uint64)) run(args[0].(context.Context), args[1].(message.BroadcastResult[*messagespb.AlterAliasMessageHeader, *messagespb.AlterAliasMessageBody]))
}) })
return _c return _c
} }
@ -173,7 +170,7 @@ func (_c *IMetaTable_AlterAlias_Call) Return(_a0 error) *IMetaTable_AlterAlias_C
return _c return _c
} }
func (_c *IMetaTable_AlterAlias_Call) RunAndReturn(run func(context.Context, string, string, string, uint64) error) *IMetaTable_AlterAlias_Call { func (_c *IMetaTable_AlterAlias_Call) RunAndReturn(run func(context.Context, message.BroadcastResult[*messagespb.AlterAliasMessageHeader, *messagespb.AlterAliasMessageBody]) error) *IMetaTable_AlterAlias_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }
@ -528,6 +525,152 @@ func (_c *IMetaTable_CheckIfAddCredential_Call) RunAndReturn(run func(context.Co
return _c return _c
} }
// CheckIfAliasAlterable provides a mock function with given fields: ctx, dbName, alias, collectionName
func (_m *IMetaTable) CheckIfAliasAlterable(ctx context.Context, dbName string, alias string, collectionName string) error {
ret := _m.Called(ctx, dbName, alias, collectionName)
if len(ret) == 0 {
panic("no return value specified for CheckIfAliasAlterable")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, string) error); ok {
r0 = rf(ctx, dbName, alias, collectionName)
} else {
r0 = ret.Error(0)
}
return r0
}
// IMetaTable_CheckIfAliasAlterable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfAliasAlterable'
type IMetaTable_CheckIfAliasAlterable_Call struct {
*mock.Call
}
// CheckIfAliasAlterable is a helper method to define mock.On call
// - ctx context.Context
// - dbName string
// - alias string
// - collectionName string
func (_e *IMetaTable_Expecter) CheckIfAliasAlterable(ctx interface{}, dbName interface{}, alias interface{}, collectionName interface{}) *IMetaTable_CheckIfAliasAlterable_Call {
return &IMetaTable_CheckIfAliasAlterable_Call{Call: _e.mock.On("CheckIfAliasAlterable", ctx, dbName, alias, collectionName)}
}
func (_c *IMetaTable_CheckIfAliasAlterable_Call) Run(run func(ctx context.Context, dbName string, alias string, collectionName string)) *IMetaTable_CheckIfAliasAlterable_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string))
})
return _c
}
func (_c *IMetaTable_CheckIfAliasAlterable_Call) Return(_a0 error) *IMetaTable_CheckIfAliasAlterable_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *IMetaTable_CheckIfAliasAlterable_Call) RunAndReturn(run func(context.Context, string, string, string) error) *IMetaTable_CheckIfAliasAlterable_Call {
_c.Call.Return(run)
return _c
}
// CheckIfAliasCreatable provides a mock function with given fields: ctx, dbName, alias, collectionName
func (_m *IMetaTable) CheckIfAliasCreatable(ctx context.Context, dbName string, alias string, collectionName string) error {
ret := _m.Called(ctx, dbName, alias, collectionName)
if len(ret) == 0 {
panic("no return value specified for CheckIfAliasCreatable")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, string) error); ok {
r0 = rf(ctx, dbName, alias, collectionName)
} else {
r0 = ret.Error(0)
}
return r0
}
// IMetaTable_CheckIfAliasCreatable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfAliasCreatable'
type IMetaTable_CheckIfAliasCreatable_Call struct {
*mock.Call
}
// CheckIfAliasCreatable is a helper method to define mock.On call
// - ctx context.Context
// - dbName string
// - alias string
// - collectionName string
func (_e *IMetaTable_Expecter) CheckIfAliasCreatable(ctx interface{}, dbName interface{}, alias interface{}, collectionName interface{}) *IMetaTable_CheckIfAliasCreatable_Call {
return &IMetaTable_CheckIfAliasCreatable_Call{Call: _e.mock.On("CheckIfAliasCreatable", ctx, dbName, alias, collectionName)}
}
func (_c *IMetaTable_CheckIfAliasCreatable_Call) Run(run func(ctx context.Context, dbName string, alias string, collectionName string)) *IMetaTable_CheckIfAliasCreatable_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string))
})
return _c
}
func (_c *IMetaTable_CheckIfAliasCreatable_Call) Return(_a0 error) *IMetaTable_CheckIfAliasCreatable_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *IMetaTable_CheckIfAliasCreatable_Call) RunAndReturn(run func(context.Context, string, string, string) error) *IMetaTable_CheckIfAliasCreatable_Call {
_c.Call.Return(run)
return _c
}
// CheckIfAliasDroppable provides a mock function with given fields: ctx, dbName, alias
func (_m *IMetaTable) CheckIfAliasDroppable(ctx context.Context, dbName string, alias string) error {
ret := _m.Called(ctx, dbName, alias)
if len(ret) == 0 {
panic("no return value specified for CheckIfAliasDroppable")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok {
r0 = rf(ctx, dbName, alias)
} else {
r0 = ret.Error(0)
}
return r0
}
// IMetaTable_CheckIfAliasDroppable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckIfAliasDroppable'
type IMetaTable_CheckIfAliasDroppable_Call struct {
*mock.Call
}
// CheckIfAliasDroppable is a helper method to define mock.On call
// - ctx context.Context
// - dbName string
// - alias string
func (_e *IMetaTable_Expecter) CheckIfAliasDroppable(ctx interface{}, dbName interface{}, alias interface{}) *IMetaTable_CheckIfAliasDroppable_Call {
return &IMetaTable_CheckIfAliasDroppable_Call{Call: _e.mock.On("CheckIfAliasDroppable", ctx, dbName, alias)}
}
func (_c *IMetaTable_CheckIfAliasDroppable_Call) Run(run func(ctx context.Context, dbName string, alias string)) *IMetaTable_CheckIfAliasDroppable_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string))
})
return _c
}
func (_c *IMetaTable_CheckIfAliasDroppable_Call) Return(_a0 error) *IMetaTable_CheckIfAliasDroppable_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *IMetaTable_CheckIfAliasDroppable_Call) RunAndReturn(run func(context.Context, string, string) error) *IMetaTable_CheckIfAliasDroppable_Call {
_c.Call.Return(run)
return _c
}
// CheckIfCreateRole provides a mock function with given fields: ctx, req // CheckIfCreateRole provides a mock function with given fields: ctx, req
func (_m *IMetaTable) CheckIfCreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) error { func (_m *IMetaTable) CheckIfCreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) error {
ret := _m.Called(ctx, req) ret := _m.Called(ctx, req)
@ -1045,56 +1188,6 @@ func (_c *IMetaTable_CheckIfUpdateCredential_Call) RunAndReturn(run func(context
return _c return _c
} }
// CreateAlias provides a mock function with given fields: ctx, dbName, alias, collectionName, ts
func (_m *IMetaTable) CreateAlias(ctx context.Context, dbName string, alias string, collectionName string, ts uint64) error {
ret := _m.Called(ctx, dbName, alias, collectionName, ts)
if len(ret) == 0 {
panic("no return value specified for CreateAlias")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, string, uint64) error); ok {
r0 = rf(ctx, dbName, alias, collectionName, ts)
} else {
r0 = ret.Error(0)
}
return r0
}
// IMetaTable_CreateAlias_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateAlias'
type IMetaTable_CreateAlias_Call struct {
*mock.Call
}
// CreateAlias is a helper method to define mock.On call
// - ctx context.Context
// - dbName string
// - alias string
// - collectionName string
// - ts uint64
func (_e *IMetaTable_Expecter) CreateAlias(ctx interface{}, dbName interface{}, alias interface{}, collectionName interface{}, ts interface{}) *IMetaTable_CreateAlias_Call {
return &IMetaTable_CreateAlias_Call{Call: _e.mock.On("CreateAlias", ctx, dbName, alias, collectionName, ts)}
}
func (_c *IMetaTable_CreateAlias_Call) Run(run func(ctx context.Context, dbName string, alias string, collectionName string, ts uint64)) *IMetaTable_CreateAlias_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string), args[4].(uint64))
})
return _c
}
func (_c *IMetaTable_CreateAlias_Call) Return(_a0 error) *IMetaTable_CreateAlias_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *IMetaTable_CreateAlias_Call) RunAndReturn(run func(context.Context, string, string, string, uint64) error) *IMetaTable_CreateAlias_Call {
_c.Call.Return(run)
return _c
}
// CreateDatabase provides a mock function with given fields: ctx, db, ts // CreateDatabase provides a mock function with given fields: ctx, db, ts
func (_m *IMetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts uint64) error { func (_m *IMetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts uint64) error {
ret := _m.Called(ctx, db, ts) ret := _m.Called(ctx, db, ts)
@ -1344,17 +1437,17 @@ func (_c *IMetaTable_DescribeAlias_Call) RunAndReturn(run func(context.Context,
return _c return _c
} }
// DropAlias provides a mock function with given fields: ctx, dbName, alias, ts // DropAlias provides a mock function with given fields: ctx, result
func (_m *IMetaTable) DropAlias(ctx context.Context, dbName string, alias string, ts uint64) error { func (_m *IMetaTable) DropAlias(ctx context.Context, result message.BroadcastResult[*messagespb.DropAliasMessageHeader, *messagespb.DropAliasMessageBody]) error {
ret := _m.Called(ctx, dbName, alias, ts) ret := _m.Called(ctx, result)
if len(ret) == 0 { if len(ret) == 0 {
panic("no return value specified for DropAlias") panic("no return value specified for DropAlias")
} }
var r0 error var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, uint64) error); ok { if rf, ok := ret.Get(0).(func(context.Context, message.BroadcastResult[*messagespb.DropAliasMessageHeader, *messagespb.DropAliasMessageBody]) error); ok {
r0 = rf(ctx, dbName, alias, ts) r0 = rf(ctx, result)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
} }
@ -1369,16 +1462,14 @@ type IMetaTable_DropAlias_Call struct {
// DropAlias is a helper method to define mock.On call // DropAlias is a helper method to define mock.On call
// - ctx context.Context // - ctx context.Context
// - dbName string // - result message.BroadcastResult[*messagespb.DropAliasMessageHeader,*messagespb.DropAliasMessageBody]
// - alias string func (_e *IMetaTable_Expecter) DropAlias(ctx interface{}, result interface{}) *IMetaTable_DropAlias_Call {
// - ts uint64 return &IMetaTable_DropAlias_Call{Call: _e.mock.On("DropAlias", ctx, result)}
func (_e *IMetaTable_Expecter) DropAlias(ctx interface{}, dbName interface{}, alias interface{}, ts interface{}) *IMetaTable_DropAlias_Call {
return &IMetaTable_DropAlias_Call{Call: _e.mock.On("DropAlias", ctx, dbName, alias, ts)}
} }
func (_c *IMetaTable_DropAlias_Call) Run(run func(ctx context.Context, dbName string, alias string, ts uint64)) *IMetaTable_DropAlias_Call { func (_c *IMetaTable_DropAlias_Call) Run(run func(ctx context.Context, result message.BroadcastResult[*messagespb.DropAliasMessageHeader, *messagespb.DropAliasMessageBody])) *IMetaTable_DropAlias_Call {
_c.Call.Run(func(args mock.Arguments) { _c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(uint64)) run(args[0].(context.Context), args[1].(message.BroadcastResult[*messagespb.DropAliasMessageHeader, *messagespb.DropAliasMessageBody]))
}) })
return _c return _c
} }
@ -1388,7 +1479,7 @@ func (_c *IMetaTable_DropAlias_Call) Return(_a0 error) *IMetaTable_DropAlias_Cal
return _c return _c
} }
func (_c *IMetaTable_DropAlias_Call) RunAndReturn(run func(context.Context, string, string, uint64) error) *IMetaTable_DropAlias_Call { func (_c *IMetaTable_DropAlias_Call) RunAndReturn(run func(context.Context, message.BroadcastResult[*messagespb.DropAliasMessageHeader, *messagespb.DropAliasMessageBody]) error) *IMetaTable_DropAlias_Call {
_c.Call.Return(run) _c.Call.Return(run)
return _c return _c
} }

View File

@ -1865,49 +1865,27 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest)
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.TotalLabel).Inc() metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("CreateAlias") tr := timerecord.NewTimeRecorder("CreateAlias")
logger := log.Ctx(ctx).With(
log.Ctx(ctx).Info("received request to create alias",
zap.String("role", typeutil.RootCoordRole), zap.String("role", typeutil.RootCoordRole),
zap.String("dbName", in.GetDbName()),
zap.String("alias", in.GetAlias()), zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName())) zap.String("collectionName", in.GetCollectionName()))
logger.Info("received request to create alias")
t := &createAliasTask{ if err := c.broadcastCreateAlias(ctx, in); err != nil {
baseTask: newBaseTask(ctx, c), if errors.Is(err, errIgnoredAlterAlias) {
Req: in, logger.Info("create alias already set on this collection, ignore it")
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc()
return merr.Success(), nil
} }
logger.Info("failed to create alias", zap.Error(err))
if err := c.scheduler.AddTask(t); err != nil {
log.Ctx(ctx).Info("failed to enqueue request to create alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if err := t.WaitToFinish(); err != nil {
log.Ctx(ctx).Info("failed to create alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()),
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc() metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
return merr.Status(err), nil return merr.Status(err), nil
} }
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("CreateAlias").Observe(float64(t.queueDur.Milliseconds())) logger.Info("done to create alias")
log.Ctx(ctx).Info("done to create alias",
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()),
zap.Uint64("ts", t.GetTs()))
return merr.Success(), nil return merr.Success(), nil
} }
@ -1919,45 +1897,25 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc() metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("DropAlias") tr := timerecord.NewTimeRecorder("DropAlias")
logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole),
log.Ctx(ctx).Info("received request to drop alias", zap.String("dbName", in.GetDbName()),
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias())) zap.String("alias", in.GetAlias()))
logger.Info("received request to drop alias")
t := &dropAliasTask{ if err := c.broadcastDropAlias(ctx, in); err != nil {
baseTask: newBaseTask(ctx, c), if errors.Is(err, merr.ErrAliasNotFound) {
Req: in, logger.Info("drop alias not found, ignore it")
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc()
return merr.Success(), nil
} }
logger.Info("failed to drop alias", zap.Error(err))
if err := c.scheduler.AddTask(t); err != nil {
log.Ctx(ctx).Info("failed to enqueue request to drop alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if err := t.WaitToFinish(); err != nil {
log.Ctx(ctx).Info("failed to drop alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()),
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc() metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
return merr.Status(err), nil return merr.Status(err), nil
} }
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("DropAlias").Observe(float64(t.queueDur.Milliseconds())) logger.Info("done to drop alias")
log.Ctx(ctx).Info("done to drop alias",
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()),
zap.Uint64("ts", t.GetTs()))
return merr.Success(), nil return merr.Success(), nil
} }
@ -1969,49 +1927,26 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc() metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
tr := timerecord.NewTimeRecorder("AlterAlias") tr := timerecord.NewTimeRecorder("AlterAlias")
logger := log.Ctx(ctx).With(zap.String("role", typeutil.RootCoordRole),
log.Ctx(ctx).Info("received request to alter alias", zap.String("dbName", in.GetDbName()),
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()), zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName())) zap.String("collectionName", in.GetCollectionName()))
logger.Info("received request to alter alias")
t := &alterAliasTask{ if err := c.broadcastAlterAlias(ctx, in); err != nil {
baseTask: newBaseTask(ctx, c), if errors.Is(err, errIgnoredAlterAlias) {
Req: in, logger.Info("alter alias already set on this collection, ignore it")
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc()
return merr.Success(), nil
} }
logger.Info("failed to alter alias", zap.Error(err))
if err := c.scheduler.AddTask(t); err != nil {
log.Ctx(ctx).Info("failed to enqueue request to alter alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()))
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if err := t.WaitToFinish(); err != nil {
log.Ctx(ctx).Info("failed to alter alias",
zap.String("role", typeutil.RootCoordRole),
zap.Error(err),
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()),
zap.Uint64("ts", t.GetTs()))
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc() metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
return merr.Status(err), nil return merr.Status(err), nil
} }
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.RootCoordDDLReqLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordDDLReqLatencyInQueue.WithLabelValues("AlterAlias").Observe(float64(t.queueDur.Milliseconds())) logger.Info("done to alter alias")
log.Info("done to alter alias",
zap.String("role", typeutil.RootCoordRole),
zap.String("alias", in.GetAlias()),
zap.String("collection", in.GetCollectionName()),
zap.Uint64("ts", t.GetTs()))
return merr.Success(), nil return merr.Success(), nil
} }

View File

@ -84,7 +84,9 @@ func initStreamingSystem() {
bapi.EXPECT().Close().Return() bapi.EXPECT().Close().Return()
mb := mock_broadcaster.NewMockBroadcaster(t) mb := mock_broadcaster.NewMockBroadcaster(t)
mb.EXPECT().WithResourceKeys(mock.Anything, mock.Anything).Return(bapi, nil) mb.EXPECT().WithResourceKeys(mock.Anything, mock.Anything).Return(bapi, nil).Maybe()
mb.EXPECT().WithResourceKeys(mock.Anything, mock.Anything, mock.Anything).Return(bapi, nil).Maybe()
mb.EXPECT().WithResourceKeys(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(bapi, nil).Maybe()
mb.EXPECT().Close().Return() mb.EXPECT().Close().Return()
broadcast.Release() broadcast.Release()
broadcast.ResetBroadcaster() broadcast.ResetBroadcaster()
@ -328,34 +330,6 @@ func TestRootCoord_CreateAlias(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
}) })
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.CreateAlias(ctx, &milvuspb.CreateAliasRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.CreateAlias(ctx, &milvuspb.CreateAliasRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.CreateAlias(ctx, &milvuspb.CreateAliasRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
} }
func TestRootCoord_DropAlias(t *testing.T) { func TestRootCoord_DropAlias(t *testing.T) {
@ -366,34 +340,6 @@ func TestRootCoord_DropAlias(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
}) })
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.DropAlias(ctx, &milvuspb.DropAliasRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.DropAlias(ctx, &milvuspb.DropAliasRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.DropAlias(ctx, &milvuspb.DropAliasRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
} }
func TestRootCoord_AlterAlias(t *testing.T) { func TestRootCoord_AlterAlias(t *testing.T) {
@ -404,34 +350,6 @@ func TestRootCoord_AlterAlias(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
}) })
t.Run("failed to add task", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withInvalidScheduler())
ctx := context.Background()
resp, err := c.AlterAlias(ctx, &milvuspb.AlterAliasRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("failed to execute", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withTaskFailScheduler())
ctx := context.Background()
resp, err := c.AlterAlias(ctx, &milvuspb.AlterAliasRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("normal case, everything is ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(),
withValidScheduler())
ctx := context.Background()
resp, err := c.AlterAlias(ctx, &milvuspb.AlterAliasRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
} }
func TestRootCoord_DescribeAlias(t *testing.T) { func TestRootCoord_DescribeAlias(t *testing.T) {

View File

@ -72,16 +72,6 @@ func TestLockerKey(t *testing.T) {
} }
func TestGetLockerKey(t *testing.T) { func TestGetLockerKey(t *testing.T) {
t.Run("alter alias task locker key", func(t *testing.T) {
tt := &alterAliasTask{
Req: &milvuspb.AlterAliasRequest{
DbName: "foo",
CollectionName: "bar",
},
}
key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true")
})
t.Run("alter collection task locker key", func(t *testing.T) { t.Run("alter collection task locker key", func(t *testing.T) {
metaMock := mockrootcoord.NewIMetaTable(t) metaMock := mockrootcoord.NewIMetaTable(t)
metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything). metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
@ -124,23 +114,6 @@ func TestGetLockerKey(t *testing.T) {
key := tt.GetLockerKey() key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|111-2-true") assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|111-2-true")
}) })
t.Run("create alias task locker key", func(t *testing.T) {
metaMock := mockrootcoord.NewIMetaTable(t)
c := &Core{
meta: metaMock,
}
tt := &createAliasTask{
baseTask: baseTask{
core: c,
},
Req: &milvuspb.CreateAliasRequest{
DbName: "foo",
CollectionName: "bar",
},
}
key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|bar-2-true")
})
t.Run("create collection task locker key", func(t *testing.T) { t.Run("create collection task locker key", func(t *testing.T) {
tt := &createCollectionTask{ tt := &createCollectionTask{
Req: &milvuspb.CreateCollectionRequest{ Req: &milvuspb.CreateCollectionRequest{
@ -219,28 +192,6 @@ func TestGetLockerKey(t *testing.T) {
key := tt.GetLockerKey() key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false") assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false")
}) })
t.Run("drop alias task locker key", func(t *testing.T) {
metaMock := mockrootcoord.NewIMetaTable(t)
metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, s string, s2 string, u uint64) (*model.Collection, error) {
return &model.Collection{
Name: "real" + s2,
CollectionID: 111,
}, nil
})
c := &Core{
meta: metaMock,
}
tt := &dropAliasTask{
baseTask: baseTask{core: c},
Req: &milvuspb.DropAliasRequest{
DbName: "foo",
Alias: "bar",
},
}
key := tt.GetLockerKey()
assert.Equal(t, GetLockerKeyString(key), "$-0-false|foo-1-false|111-2-true")
})
t.Run("drop collection task locker key", func(t *testing.T) { t.Run("drop collection task locker key", func(t *testing.T) {
metaMock := mockrootcoord.NewIMetaTable(t) metaMock := mockrootcoord.NewIMetaTable(t)
metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything). metaMock.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).

View File

@ -8,6 +8,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/proto/messagespb" "github.com/milvus-io/milvus/pkg/v2/proto/messagespb"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
"github.com/milvus-io/milvus/pkg/v2/util/lock" "github.com/milvus-io/milvus/pkg/v2/util/lock"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
) )
// errFastLockFailed is the error for fast lock failed. // errFastLockFailed is the error for fast lock failed.
@ -83,7 +84,7 @@ func (l *lockGuard) Unlock() {
// FastLock locks the resource keys without waiting. // FastLock locks the resource keys without waiting.
// return error if the resource key is already locked. // return error if the resource key is already locked.
func (r *resourceKeyLocker) FastLock(keys ...message.ResourceKey) (*lockGuards, error) { func (r *resourceKeyLocker) FastLock(keys ...message.ResourceKey) (*lockGuards, error) {
sortResourceKeys(keys) keys = uniqueSortResourceKeys(keys)
g := &lockGuards{} g := &lockGuards{}
for _, key := range keys { for _, key := range keys {
@ -106,7 +107,8 @@ func (r *resourceKeyLocker) FastLock(keys ...message.ResourceKey) (*lockGuards,
// Lock locks the resource keys. // Lock locks the resource keys.
func (r *resourceKeyLocker) Lock(keys ...message.ResourceKey) *lockGuards { func (r *resourceKeyLocker) Lock(keys ...message.ResourceKey) *lockGuards {
// lock the keys in order to avoid deadlock. // lock the keys in order to avoid deadlock.
sortResourceKeys(keys) keys = uniqueSortResourceKeys(keys)
g := &lockGuards{} g := &lockGuards{}
for _, key := range keys { for _, key := range keys {
if key.Shared { if key.Shared {
@ -128,12 +130,14 @@ func (r *resourceKeyLocker) unlockWithKey(key message.ResourceKey) {
r.inner.Unlock(newResourceLockKey(key)) r.inner.Unlock(newResourceLockKey(key))
} }
// sortResourceKeys sorts the resource keys. // uniqueSortResourceKeys sorts the resource keys.
func sortResourceKeys(keys []message.ResourceKey) { func uniqueSortResourceKeys(keys []message.ResourceKey) []message.ResourceKey {
keys = typeutil.NewSet(keys...).Collect()
sort.Slice(keys, func(i, j int) bool { sort.Slice(keys, func(i, j int) bool {
if keys[i].Domain != keys[j].Domain { if keys[i].Domain != keys[j].Domain {
return keys[i].Domain < keys[j].Domain return keys[i].Domain < keys[j].Domain
} }
return keys[i].Key < keys[j].Key return keys[i].Key < keys[j].Key
}) })
return keys
} }

View File

@ -6,6 +6,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
) )
@ -110,7 +112,7 @@ func TestResourceKeyLocker(t *testing.T) {
key := message.NewCollectionNameResourceKey("test_collection") key := message.NewCollectionNameResourceKey("test_collection")
// First fast lock should succeed // First fast lock should succeed
guards1, err := locker.FastLock(key) guards1, err := locker.FastLock(key, key)
if err != nil { if err != nil {
t.Fatalf("First FastLock failed: %v", err) t.Fatalf("First FastLock failed: %v", err)
} }
@ -130,3 +132,35 @@ func TestResourceKeyLocker(t *testing.T) {
guards2.Unlock() guards2.Unlock()
}) })
} }
func TestUniqueSortResourceKeys(t *testing.T) {
keys := []message.ResourceKey{
message.NewSharedDBNameResourceKey("test_db_1"),
message.NewSharedDBNameResourceKey("test_db_1"),
message.NewSharedDBNameResourceKey("test_db_2"),
message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_11"),
message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_11"),
message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_12"),
message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_13"),
message.NewExclusiveCollectionNameResourceKey("test_db_2", "test_collection_21"),
message.NewExclusiveCollectionNameResourceKey("test_db_2", "test_collection_21"),
message.NewExclusiveCollectionNameResourceKey("test_db_2", "test_collection_22"),
message.NewSharedClusterResourceKey(),
}
for i := 0; i < 10; i++ {
rand.Shuffle(len(keys), func(i, j int) {
keys[i], keys[j] = keys[j], keys[i]
})
keys2 := uniqueSortResourceKeys(keys)
assert.Equal(t, keys2, []message.ResourceKey{
message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_11"),
message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_12"),
message.NewExclusiveCollectionNameResourceKey("test_db_1", "test_collection_13"),
message.NewExclusiveCollectionNameResourceKey("test_db_2", "test_collection_21"),
message.NewExclusiveCollectionNameResourceKey("test_db_2", "test_collection_22"),
message.NewSharedDBNameResourceKey("test_db_1"),
message.NewSharedDBNameResourceKey("test_db_2"),
message.NewSharedClusterResourceKey(),
})
}
}