mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
fix: [2.5] Fix collections with duplicate names can be created (#40147)
This PR introduces two restrictions: 1. Before dropping a collection, all aliases associated with that collection must be dropped. 2. When creating a collection, if the collection name duplicates any alias, the collection creation will fail. issue: https://github.com/milvus-io/milvus/issues/40142 pr: https://github.com/milvus-io/milvus/pull/40143 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
d60511abc3
commit
eee98fd044
@ -598,6 +598,14 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
|
|||||||
UpdateTimestamp: ts,
|
UpdateTimestamp: ts,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if the collection name duplicates an alias.
|
||||||
|
_, err = t.core.meta.DescribeAlias(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), typeutil.MaxTimestamp)
|
||||||
|
if err == nil {
|
||||||
|
err2 := fmt.Errorf("collection name [%s] conflicts with an existing alias, please choose a unique name", t.Req.GetCollectionName())
|
||||||
|
log.Ctx(ctx).Warn("create collection failed", zap.String("database", t.Req.GetDbName()), zap.Error(err2))
|
||||||
|
return err2
|
||||||
|
}
|
||||||
|
|
||||||
// We cannot check the idempotency inside meta table when adding collection, since we'll execute duplicate steps
|
// We cannot check the idempotency inside meta table when adding collection, since we'll execute duplicate steps
|
||||||
// if add collection successfully due to idempotency check. Some steps may be risky to be duplicate executed if they
|
// if add collection successfully due to idempotency check. Some steps may be risky to be duplicate executed if they
|
||||||
// are not promised idempotent.
|
// are not promised idempotent.
|
||||||
@ -613,6 +621,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
|
|||||||
log.Ctx(ctx).Warn("add duplicate collection", zap.String("collection", t.Req.GetCollectionName()), zap.Uint64("ts", ts))
|
log.Ctx(ctx).Warn("add duplicate collection", zap.String("collection", t.Req.GetCollectionName()), zap.Uint64("ts", ts))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
log.Ctx(ctx).Info("check collection existence", zap.String("collection", t.Req.GetCollectionName()), zap.Error(err))
|
||||||
|
|
||||||
// TODO: The create collection is not idempotent for other component, such as wal.
|
// TODO: The create collection is not idempotent for other component, such as wal.
|
||||||
// we need to make the create collection operation must success after some persistent operation, refactor it in future.
|
// we need to make the create collection operation must success after some persistent operation, refactor it in future.
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"math"
|
"math"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -903,6 +904,8 @@ func Test_createCollectionTask_Execute(t *testing.T) {
|
|||||||
mock.Anything,
|
mock.Anything,
|
||||||
mock.Anything,
|
mock.Anything,
|
||||||
).Return(coll, nil)
|
).Return(coll, nil)
|
||||||
|
meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Return("", merr.WrapErrAliasNotFound("", ""))
|
||||||
|
|
||||||
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker))
|
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker))
|
||||||
|
|
||||||
@ -950,6 +953,8 @@ func Test_createCollectionTask_Execute(t *testing.T) {
|
|||||||
mock.Anything,
|
mock.Anything,
|
||||||
mock.Anything,
|
mock.Anything,
|
||||||
).Return(coll, nil)
|
).Return(coll, nil)
|
||||||
|
meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Return("", merr.WrapErrAliasNotFound("", ""))
|
||||||
|
|
||||||
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker))
|
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker))
|
||||||
|
|
||||||
@ -972,10 +977,11 @@ func Test_createCollectionTask_Execute(t *testing.T) {
|
|||||||
ticker := newTickerWithMockFailStream()
|
ticker := newTickerWithMockFailStream()
|
||||||
shardNum := 2
|
shardNum := 2
|
||||||
pchans := ticker.getDmlChannelNames(shardNum)
|
pchans := ticker.getDmlChannelNames(shardNum)
|
||||||
meta := newMockMetaTable()
|
meta := mockrootcoord.NewIMetaTable(t)
|
||||||
meta.GetCollectionByNameFunc = func(ctx context.Context, collectionName string, ts Timestamp) (*model.Collection, error) {
|
meta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
return nil, errors.New("error mock GetCollectionByName")
|
Return(nil, errors.New("error mock GetCollectionByName"))
|
||||||
}
|
meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Return("", merr.WrapErrAliasNotFound("", ""))
|
||||||
core := newTestCore(withTtSynchronizer(ticker), withMeta(meta))
|
core := newTestCore(withTtSynchronizer(ticker), withMeta(meta))
|
||||||
schema := &schemapb.CollectionSchema{Name: "", Fields: []*schemapb.FieldSchema{{}}}
|
schema := &schemapb.CollectionSchema{Name: "", Fields: []*schemapb.FieldSchema{{}}}
|
||||||
task := &createCollectionTask{
|
task := &createCollectionTask{
|
||||||
@ -996,6 +1002,40 @@ func Test_createCollectionTask_Execute(t *testing.T) {
|
|||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("collection name duplicates an alias", func(t *testing.T) {
|
||||||
|
defer cleanTestEnv()
|
||||||
|
|
||||||
|
collectionName := funcutil.GenRandomStr()
|
||||||
|
ticker := newRocksMqTtSynchronizer()
|
||||||
|
field1 := funcutil.GenRandomStr()
|
||||||
|
schema := &schemapb.CollectionSchema{Name: collectionName, Fields: []*schemapb.FieldSchema{{Name: field1}}}
|
||||||
|
|
||||||
|
meta := mockrootcoord.NewIMetaTable(t)
|
||||||
|
// mock collection name duplicates an alias
|
||||||
|
meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Return(collectionName, nil)
|
||||||
|
|
||||||
|
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker))
|
||||||
|
task := &createCollectionTask{
|
||||||
|
baseTask: newBaseTask(context.Background(), core),
|
||||||
|
Req: &milvuspb.CreateCollectionRequest{
|
||||||
|
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
|
||||||
|
DbName: "mock-db",
|
||||||
|
CollectionName: collectionName,
|
||||||
|
Properties: []*commonpb.KeyValuePair{
|
||||||
|
{
|
||||||
|
Key: common.ConsistencyLevel,
|
||||||
|
Value: "1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
schema: schema,
|
||||||
|
}
|
||||||
|
err := task.Execute(context.Background())
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.True(t, strings.Contains(err.Error(), "conflicts with an existing alias"))
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("normal case", func(t *testing.T) {
|
t.Run("normal case", func(t *testing.T) {
|
||||||
defer cleanTestEnv()
|
defer cleanTestEnv()
|
||||||
|
|
||||||
@ -1023,6 +1063,8 @@ func Test_createCollectionTask_Execute(t *testing.T) {
|
|||||||
mock.Anything,
|
mock.Anything,
|
||||||
mock.Anything,
|
mock.Anything,
|
||||||
).Return(nil)
|
).Return(nil)
|
||||||
|
meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Return("", merr.WrapErrAliasNotFound("", ""))
|
||||||
|
|
||||||
dc := newMockDataCoord()
|
dc := newMockDataCoord()
|
||||||
dc.GetComponentStatesFunc = func(ctx context.Context) (*milvuspb.ComponentStates, error) {
|
dc.GetComponentStatesFunc = func(ctx context.Context) (*milvuspb.ComponentStates, error) {
|
||||||
@ -1107,6 +1149,8 @@ func Test_createCollectionTask_Execute(t *testing.T) {
|
|||||||
mock.Anything,
|
mock.Anything,
|
||||||
mock.Anything,
|
mock.Anything,
|
||||||
).Return(errors.New("error mock ChangeCollectionState"))
|
).Return(errors.New("error mock ChangeCollectionState"))
|
||||||
|
meta.EXPECT().DescribeAlias(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Return("", merr.WrapErrAliasNotFound("", ""))
|
||||||
|
|
||||||
removeCollectionCalled := false
|
removeCollectionCalled := false
|
||||||
removeCollectionChan := make(chan struct{}, 1)
|
removeCollectionChan := make(chan struct{}, 1)
|
||||||
|
|||||||
@ -63,7 +63,6 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
|
|||||||
log.Ctx(ctx).Warn("drop non-existent collection", zap.String("collection", t.Req.GetCollectionName()), zap.String("database", t.Req.GetDbName()))
|
log.Ctx(ctx).Warn("drop non-existent collection", zap.String("collection", t.Req.GetCollectionName()), zap.String("database", t.Req.GetDbName()))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -71,6 +70,13 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
|
|||||||
// meta cache of all aliases should also be cleaned.
|
// meta cache of all aliases should also be cleaned.
|
||||||
aliases := t.core.meta.ListAliasesByID(ctx, collMeta.CollectionID)
|
aliases := t.core.meta.ListAliasesByID(ctx, collMeta.CollectionID)
|
||||||
|
|
||||||
|
// Check if all aliases have been dropped.
|
||||||
|
if len(aliases) > 0 {
|
||||||
|
err = fmt.Errorf("unable to drop the collection [%s] because it has associated aliases %v, please remove all aliases before dropping the collection", t.Req.GetCollectionName(), aliases)
|
||||||
|
log.Ctx(ctx).Warn("drop collection failed", zap.String("database", t.Req.GetDbName()), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
ts := t.GetTs()
|
ts := t.GetTs()
|
||||||
return executeDropCollectionTaskSteps(ctx,
|
return executeDropCollectionTaskSteps(ctx,
|
||||||
t.core, collMeta, t.Req.GetDbName(), aliases,
|
t.core, collMeta, t.Req.GetDbName(), aliases,
|
||||||
|
|||||||
@ -18,6 +18,7 @@ package rootcoord
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -181,6 +182,40 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
|
|||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("aliases have not been dropped", func(t *testing.T) {
|
||||||
|
defer cleanTestEnv()
|
||||||
|
|
||||||
|
collectionName := funcutil.GenRandomStr()
|
||||||
|
shardNum := 2
|
||||||
|
|
||||||
|
ticker := newRocksMqTtSynchronizer()
|
||||||
|
pchans := ticker.getDmlChannelNames(shardNum)
|
||||||
|
ticker.addDmlChannels(pchans...)
|
||||||
|
|
||||||
|
coll := &model.Collection{Name: collectionName, ShardsNum: int32(shardNum), PhysicalChannelNames: pchans}
|
||||||
|
|
||||||
|
meta := mockrootcoord.NewIMetaTable(t)
|
||||||
|
meta.EXPECT().GetCollectionByName(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
|
||||||
|
Return(coll.Clone(), nil)
|
||||||
|
meta.EXPECT().ListAliasesByID(mock.Anything, mock.Anything).
|
||||||
|
Return([]string{"mock-alias-0", "mock-alias-1"})
|
||||||
|
|
||||||
|
core := newTestCore(
|
||||||
|
withMeta(meta),
|
||||||
|
withTtSynchronizer(ticker))
|
||||||
|
|
||||||
|
task := &dropCollectionTask{
|
||||||
|
baseTask: newBaseTask(context.Background(), core),
|
||||||
|
Req: &milvuspb.DropCollectionRequest{
|
||||||
|
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_DropCollection},
|
||||||
|
CollectionName: collectionName,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := task.Execute(context.Background())
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.True(t, strings.Contains(err.Error(), "please remove all aliases"))
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("normal case, redo", func(t *testing.T) {
|
t.Run("normal case, redo", func(t *testing.T) {
|
||||||
defer cleanTestEnv()
|
defer cleanTestEnv()
|
||||||
|
|
||||||
|
|||||||
@ -179,6 +179,7 @@ class TestMilvusClientAliasInvalid(TestMilvusClientV2Base):
|
|||||||
f"[alias={alias}]"}
|
f"[alias={alias}]"}
|
||||||
self.create_alias(client, collection_name_1, alias,
|
self.create_alias(client, collection_name_1, alias,
|
||||||
check_task=CheckTasks.err_res, check_items=error)
|
check_task=CheckTasks.err_res, check_items=error)
|
||||||
|
self.drop_alias(client, alias)
|
||||||
self.drop_collection(client, collection_name)
|
self.drop_collection(client, collection_name)
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L1)
|
@pytest.mark.tags(CaseLabel.L1)
|
||||||
@ -349,6 +350,7 @@ class TestMilvusClientAliasInvalid(TestMilvusClientV2Base):
|
|||||||
error = {ct.err_code: 1600, ct.err_msg: f"alias not found[database=default][alias={another_alias}]"}
|
error = {ct.err_code: 1600, ct.err_msg: f"alias not found[database=default][alias={another_alias}]"}
|
||||||
self.alter_alias(client, collection_name, another_alias,
|
self.alter_alias(client, collection_name, another_alias,
|
||||||
check_task=CheckTasks.err_res, check_items=error)
|
check_task=CheckTasks.err_res, check_items=error)
|
||||||
|
self.drop_alias(client, alias)
|
||||||
self.drop_collection(client, collection_name)
|
self.drop_collection(client, collection_name)
|
||||||
|
|
||||||
|
|
||||||
@ -477,4 +479,6 @@ class TestMilvusClientAliasValid(TestMilvusClientV2Base):
|
|||||||
# 4. assert collection is equal to alias according to partitions
|
# 4. assert collection is equal to alias according to partitions
|
||||||
partition_name_list_alias = self.list_partitions(client, another_alias)[0]
|
partition_name_list_alias = self.list_partitions(client, another_alias)[0]
|
||||||
assert partition_name_list == partition_name_list_alias
|
assert partition_name_list == partition_name_list_alias
|
||||||
|
self.drop_alias(client, alias)
|
||||||
|
self.drop_alias(client, another_alias)
|
||||||
self.drop_collection(client, collection_name)
|
self.drop_collection(client, collection_name)
|
||||||
|
|||||||
@ -122,6 +122,7 @@ class TestMilvusClientSearchIteratorInValid(TestMilvusClientV2Base):
|
|||||||
self.release_collection(client, collection_name)
|
self.release_collection(client, collection_name)
|
||||||
self.drop_collection(client, collection_name)
|
self.drop_collection(client, collection_name)
|
||||||
self.release_collection(client, collection_name_new)
|
self.release_collection(client, collection_name_new)
|
||||||
|
self.drop_alias(client, alias)
|
||||||
self.drop_collection(client, collection_name_new)
|
self.drop_collection(client, collection_name_new)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -222,6 +222,7 @@ class TestAliasOperation(TestcaseBase):
|
|||||||
self.utility_wrap.drop_collection(alias_name,
|
self.utility_wrap.drop_collection(alias_name,
|
||||||
check_task=CheckTasks.err_res,
|
check_task=CheckTasks.err_res,
|
||||||
check_items=error)
|
check_items=error)
|
||||||
|
self.utility_wrap.drop_alias(alias_name)
|
||||||
self.utility_wrap.drop_collection(c_name)
|
self.utility_wrap.drop_collection(c_name)
|
||||||
assert not self.utility_wrap.has_collection(c_name)[0]
|
assert not self.utility_wrap.has_collection(c_name)[0]
|
||||||
|
|
||||||
@ -447,6 +448,7 @@ class TestAliasOperationInvalid(TestcaseBase):
|
|||||||
assert len(res) == 1
|
assert len(res) == 1
|
||||||
|
|
||||||
# dropping collection that has an alias shall drop the alias as well
|
# dropping collection that has an alias shall drop the alias as well
|
||||||
|
self.utility_wrap.drop_alias(alias_name)
|
||||||
collection_w.drop()
|
collection_w.drop()
|
||||||
collection_w = self.init_collection_wrap(name=c_name, schema=default_schema,
|
collection_w = self.init_collection_wrap(name=c_name, schema=default_schema,
|
||||||
check_task=CheckTasks.check_collection_property,
|
check_task=CheckTasks.check_collection_property,
|
||||||
@ -454,13 +456,9 @@ class TestAliasOperationInvalid(TestcaseBase):
|
|||||||
res2 = self.utility_wrap.list_aliases(c_name)[0]
|
res2 = self.utility_wrap.list_aliases(c_name)[0]
|
||||||
assert len(res2) == 0
|
assert len(res2) == 0
|
||||||
# the same alias name can be reused for another collection
|
# the same alias name can be reused for another collection
|
||||||
error = {ct.err_code: 999,
|
self.utility_wrap.create_alias(c_name, alias_name)
|
||||||
ct.err_msg: f"{alias_name} is alias to another collection: {collection_w.name}: alias already exist"}
|
res2 = self.utility_wrap.list_aliases(c_name)[0]
|
||||||
self.utility_wrap.create_alias(c_name, alias_name,
|
assert len(res2) == 1
|
||||||
check_task=CheckTasks.err_res,
|
|
||||||
check_items=error)
|
|
||||||
# res2 = self.utility_wrap.list_aliases(c_name)[0]
|
|
||||||
# assert len(res2) == 1
|
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L0)
|
@pytest.mark.tags(CaseLabel.L0)
|
||||||
def test_alias_rename_collection_to_alias_name(self):
|
def test_alias_rename_collection_to_alias_name(self):
|
||||||
@ -469,7 +467,7 @@ class TestAliasOperationInvalid(TestcaseBase):
|
|||||||
method:
|
method:
|
||||||
1.create a collection
|
1.create a collection
|
||||||
2.create an alias for the collection
|
2.create an alias for the collection
|
||||||
3.rename the collection to the alias name no matter the collection was dropped or not
|
3.rename the collection to the alias name
|
||||||
expected: in step 3, rename collection to alias name failed
|
expected: in step 3, rename collection to alias name failed
|
||||||
"""
|
"""
|
||||||
self._connect()
|
self._connect()
|
||||||
@ -483,10 +481,3 @@ class TestAliasOperationInvalid(TestcaseBase):
|
|||||||
ct.err_msg: f"cannot rename collection to an existing alias: {alias_name}"}
|
ct.err_msg: f"cannot rename collection to an existing alias: {alias_name}"}
|
||||||
self.utility_wrap.rename_collection(collection_w.name, alias_name,
|
self.utility_wrap.rename_collection(collection_w.name, alias_name,
|
||||||
check_task=CheckTasks.err_res, check_items=error)
|
check_task=CheckTasks.err_res, check_items=error)
|
||||||
|
|
||||||
collection_w.drop()
|
|
||||||
collection_w = self.init_collection_wrap(name=c_name, schema=default_schema,
|
|
||||||
check_task=CheckTasks.check_collection_property,
|
|
||||||
check_items={exp_name: c_name, exp_schema: default_schema})
|
|
||||||
self.utility_wrap.rename_collection(collection_w.name, alias_name,
|
|
||||||
check_task=CheckTasks.err_res, check_items=error)
|
|
||||||
|
|||||||
@ -3737,6 +3737,7 @@ class TestQueryCount(TestcaseBase):
|
|||||||
collection_w_alias.drop(check_task=CheckTasks.err_res,
|
collection_w_alias.drop(check_task=CheckTasks.err_res,
|
||||||
check_items={ct.err_code: 1,
|
check_items={ct.err_code: 1,
|
||||||
ct.err_msg: "cannot drop the collection via alias"})
|
ct.err_msg: "cannot drop the collection via alias"})
|
||||||
|
self.utility_wrap.drop_alias(alias)
|
||||||
collection_w.drop()
|
collection_w.drop()
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L2)
|
@pytest.mark.tags(CaseLabel.L2)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user