mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Support rename db for collection (#25813)
Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
parent
b2a0aec9ac
commit
c15a165d76
4
go.mod
4
go.mod
@ -164,7 +164,7 @@ require (
|
||||
go.opentelemetry.io/proto/otlp v0.9.0 // indirect
|
||||
go.uber.org/multierr v1.6.0
|
||||
golang.org/x/mod v0.8.0 // indirect
|
||||
golang.org/x/net v0.10.0 // indirect
|
||||
golang.org/x/net v0.10.0
|
||||
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
|
||||
golang.org/x/sys v0.8.0 // indirect
|
||||
golang.org/x/term v0.8.0 // indirect
|
||||
@ -185,7 +185,7 @@ require github.com/cockroachdb/errors v1.9.1
|
||||
|
||||
require (
|
||||
github.com/aliyun/credentials-go v1.2.7
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.2.12-0.20230716112826-24039be1d5f9
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.2.12-0.20230720100152-c9e2bb810f38
|
||||
github.com/tidwall/gjson v1.14.4
|
||||
)
|
||||
|
||||
|
||||
6
go.sum
6
go.sum
@ -580,10 +580,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
|
||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.2.11 h1:ewkLCEqXIxXwQgpyZvOkW/mEOT/V8Vy3NsEQDnuZllA=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.2.11/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.2.12-0.20230716112826-24039be1d5f9 h1:2Q8jjBuvgJ6qlbAFuECHbeTDZPJqG0mGH+nY740k30U=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.2.12-0.20230716112826-24039be1d5f9/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.2.12-0.20230720100152-c9e2bb810f38 h1:GZBrqGskTHvJtzgQo8Qoe6Rx5GegOyI5/NF9Ye/8C90=
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.2.12-0.20230720100152-c9e2bb810f38/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
|
||||
github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100=
|
||||
github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
|
||||
|
||||
@ -33,7 +33,7 @@ macro(build_aws_sdk_s3)
|
||||
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=deprecated-declarations")
|
||||
if (APPLE)
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error -Wno-error=strict-prototypes")
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error -Wmissing-braces -Wmissing-field-initializers -Wno-error=strict-prototypes")
|
||||
if(CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
|
||||
set(CPU_ARCH "x86_64")
|
||||
elseif(CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64")
|
||||
|
||||
@ -10,7 +10,6 @@ import (
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
@ -454,12 +453,18 @@ func (kc *Catalog) alterModifyCollection(oldColl *model.Collection, newColl *mod
|
||||
oldCollClone.CreateTime = newColl.CreateTime
|
||||
oldCollClone.ConsistencyLevel = newColl.ConsistencyLevel
|
||||
oldCollClone.State = newColl.State
|
||||
key := BuildCollectionKey(newColl.DBID, oldColl.CollectionID)
|
||||
|
||||
oldKey := BuildCollectionKey(oldColl.DBID, oldColl.CollectionID)
|
||||
newKey := BuildCollectionKey(newColl.DBID, oldColl.CollectionID)
|
||||
value, err := proto.Marshal(model.MarshalCollectionModel(oldCollClone))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return kc.Snapshot.Save(key, string(value), ts)
|
||||
saves := map[string]string{newKey: string(value)}
|
||||
if oldKey == newKey {
|
||||
return kc.Snapshot.Save(newKey, string(value), ts)
|
||||
}
|
||||
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(saves, []string{oldKey}, ts)
|
||||
}
|
||||
|
||||
func (kc *Catalog) AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, alterType metastore.AlterType, ts typeutil.Timestamp) error {
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
@ -1009,6 +1010,24 @@ func TestCatalog_AlterCollection(t *testing.T) {
|
||||
err := kc.AlterCollection(ctx, oldC, newC, metastore.MODIFY, 0)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("modify db name", func(t *testing.T) {
|
||||
var collectionID int64 = 1
|
||||
snapshot := kv.NewMockSnapshotKV()
|
||||
snapshot.MultiSaveAndRemoveWithPrefixFunc = func(saves map[string]string, removals []string, ts typeutil.Timestamp) error {
|
||||
assert.ElementsMatch(t, []string{BuildCollectionKey(0, collectionID)}, removals)
|
||||
assert.Equal(t, len(saves), 1)
|
||||
assert.Contains(t, maps.Keys(saves), BuildCollectionKey(1, collectionID))
|
||||
return nil
|
||||
}
|
||||
|
||||
kc := &Catalog{Snapshot: snapshot}
|
||||
ctx := context.Background()
|
||||
oldC := &model.Collection{DBID: 0, CollectionID: collectionID, State: pb.CollectionState_CollectionCreated}
|
||||
newC := &model.Collection{DBID: 1, CollectionID: collectionID, State: pb.CollectionState_CollectionCreated}
|
||||
err := kc.AlterCollection(ctx, oldC, newC, metastore.MODIFY, 0)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCatalog_AlterPartition(t *testing.T) {
|
||||
|
||||
@ -26,6 +26,7 @@ import (
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metastore"
|
||||
@ -64,7 +65,7 @@ type IMetaTable interface {
|
||||
DropAlias(ctx context.Context, dbName string, alias string, ts Timestamp) error
|
||||
AlterAlias(ctx context.Context, dbName string, alias string, collectionName string, ts Timestamp) error
|
||||
AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, ts Timestamp) error
|
||||
RenameCollection(ctx context.Context, dbName string, oldName string, newName string, ts Timestamp) error
|
||||
RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error
|
||||
|
||||
// TODO: it'll be a big cost if we handle the time travel logic, since we should always list all aliases in catalog.
|
||||
IsAlias(db, name string) bool
|
||||
@ -696,7 +697,7 @@ func (mt *MetaTable) AlterCollection(ctx context.Context, oldColl *model.Collect
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newName string, ts Timestamp) error {
|
||||
func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error {
|
||||
mt.ddLock.Lock()
|
||||
defer mt.ddLock.Unlock()
|
||||
ctx = contextutil.WithTenantID(ctx, Params.CommonCfg.ClusterName)
|
||||
@ -705,22 +706,35 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldNam
|
||||
log := log.Ctx(ctx).With(
|
||||
zap.String("db", dbName),
|
||||
zap.String("oldName", oldName),
|
||||
zap.String("newDBName", newDBName),
|
||||
zap.String("newName", newName),
|
||||
)
|
||||
|
||||
if dbName == "" {
|
||||
log.Warn("db name is empty")
|
||||
dbName = util.DefaultDBName
|
||||
}
|
||||
|
||||
if newDBName == "" {
|
||||
log.Warn("target db name is empty")
|
||||
newDBName = dbName
|
||||
}
|
||||
|
||||
// check target db
|
||||
targetDB, ok := mt.dbName2Meta[newDBName]
|
||||
if !ok {
|
||||
return fmt.Errorf("target database:%s not found", newDBName)
|
||||
}
|
||||
|
||||
//old collection should not be an alias
|
||||
_, ok := mt.aliases.get(dbName, oldName)
|
||||
_, ok = mt.aliases.get(dbName, oldName)
|
||||
if ok {
|
||||
log.Warn("unsupported use a alias to rename collection")
|
||||
return fmt.Errorf("unsupported use an alias to rename collection, alias:%s", oldName)
|
||||
}
|
||||
|
||||
// check new collection already exists
|
||||
newColl, err := mt.getCollectionByNameInternal(ctx, dbName, newName, ts)
|
||||
newColl, err := mt.getCollectionByNameInternal(ctx, newDBName, newName, ts)
|
||||
if newColl != nil {
|
||||
return fmt.Errorf("duplicated new collection name :%s with other collection name or alias", newName)
|
||||
}
|
||||
@ -735,13 +749,20 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldNam
|
||||
return err
|
||||
}
|
||||
|
||||
// unsupported rename collection while the collection has aliases
|
||||
aliases := mt.listAliasesByID(oldColl.CollectionID)
|
||||
if len(aliases) > 0 && oldColl.DBID != targetDB.ID {
|
||||
return fmt.Errorf("fail to rename db name, must drop all aliases of this collection before rename")
|
||||
}
|
||||
|
||||
newColl = oldColl.Clone()
|
||||
newColl.Name = newName
|
||||
newColl.DBID = targetDB.ID
|
||||
if err := mt.catalog.AlterCollection(ctx, oldColl, newColl, metastore.MODIFY, ts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mt.names.insert(dbName, newName, oldColl.CollectionID)
|
||||
mt.names.insert(newDBName, newName, oldColl.CollectionID)
|
||||
mt.names.remove(dbName, oldName)
|
||||
|
||||
mt.collID2Meta[oldColl.CollectionID] = newColl
|
||||
|
||||
@ -1160,7 +1160,16 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
aliases: newNameDb(),
|
||||
}
|
||||
meta.names.insert("", "alias", 1)
|
||||
err := meta.RenameCollection(context.TODO(), "", "alias", "new", typeutil.MaxTimestamp)
|
||||
err := meta.RenameCollection(context.TODO(), "", "alias", "", "new", typeutil.MaxTimestamp)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("target db doesn't exist", func(t *testing.T) {
|
||||
meta := &MetaTable{
|
||||
names: newNameDb(),
|
||||
aliases: newNameDb(),
|
||||
}
|
||||
err := meta.RenameCollection(context.TODO(), "", "non-exists", "non-exists", "new", typeutil.MaxTimestamp)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1169,7 +1178,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
names: newNameDb(),
|
||||
aliases: newNameDb(),
|
||||
}
|
||||
err := meta.RenameCollection(context.TODO(), "", "non-exists", "new", typeutil.MaxTimestamp)
|
||||
err := meta.RenameCollection(context.TODO(), "", "non-exists", "", "new", typeutil.MaxTimestamp)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1179,7 +1188,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
aliases: newNameDb(),
|
||||
}
|
||||
meta.names.insert("", "old", 1)
|
||||
err := meta.RenameCollection(context.TODO(), "", "old", "new", typeutil.MaxTimestamp)
|
||||
err := meta.RenameCollection(context.TODO(), "", "old", "", "new", typeutil.MaxTimestamp)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1197,7 +1206,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
}
|
||||
meta.names.insert(util.DefaultDBName, "old", 1)
|
||||
meta.names.insert(util.DefaultDBName, "new", 2)
|
||||
err := meta.RenameCollection(context.TODO(), "", "old", "new", 1000)
|
||||
err := meta.RenameCollection(context.TODO(), "", "old", "", "new", 1000)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1219,7 +1228,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
}
|
||||
meta.names.insert(util.DefaultDBName, "old", 1)
|
||||
meta.names.insert(util.DefaultDBName, "new", 2)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "new", 1000)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "", "new", 1000)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1254,7 +1263,37 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
},
|
||||
}
|
||||
meta.names.insert(util.DefaultDBName, "old", 1)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "new", 1000)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "", "new", 1000)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("rename db name fails if aliases exists", func(t *testing.T) {
|
||||
catalog := mocks.NewRootCoordCatalog(t)
|
||||
catalog.On("GetCollectionByName",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return(nil, common.NewCollectionNotExistError("error"))
|
||||
meta := &MetaTable{
|
||||
dbName2Meta: map[string]*model.Database{
|
||||
util.DefaultDBName: model.NewDefaultDatabase(),
|
||||
"db1": model.NewDatabase(2, "db1", pb.DatabaseState_DatabaseCreated),
|
||||
},
|
||||
catalog: catalog,
|
||||
names: newNameDb(),
|
||||
aliases: newNameDb(),
|
||||
collID2Meta: map[typeutil.UniqueID]*model.Collection{
|
||||
1: {
|
||||
CollectionID: 1,
|
||||
Name: "old",
|
||||
},
|
||||
},
|
||||
}
|
||||
meta.names.insert(util.DefaultDBName, "old", 1)
|
||||
meta.aliases.insert(util.DefaultDBName, "alias", 1)
|
||||
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "db1", "new", 1000)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
@ -1288,7 +1327,7 @@ func TestMetaTable_RenameCollection(t *testing.T) {
|
||||
},
|
||||
}
|
||||
meta.names.insert(util.DefaultDBName, "old", 1)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "new", 1000)
|
||||
err := meta.RenameCollection(context.TODO(), util.DefaultDBName, "old", "", "new", 1000)
|
||||
assert.NoError(t, err)
|
||||
|
||||
id, ok := meta.names.get(util.DefaultDBName, "new")
|
||||
|
||||
@ -26,6 +26,7 @@ import (
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
@ -137,7 +138,7 @@ func (m mockMetaTable) AlterCollection(ctx context.Context, oldColl *model.Colle
|
||||
return m.AlterCollectionFunc(ctx, oldColl, newColl, ts)
|
||||
}
|
||||
|
||||
func (m *mockMetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newName string, ts Timestamp) error {
|
||||
func (m *mockMetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts Timestamp) error {
|
||||
return m.RenameCollectionFunc(ctx, oldName, newName, ts)
|
||||
}
|
||||
|
||||
|
||||
@ -650,13 +650,13 @@ func (_m *IMetaTable) RemovePartition(ctx context.Context, dbID int64, collectio
|
||||
return r0
|
||||
}
|
||||
|
||||
// RenameCollection provides a mock function with given fields: ctx, dbName, oldName, newName, ts
|
||||
func (_m *IMetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newName string, ts uint64) error {
|
||||
ret := _m.Called(ctx, dbName, oldName, newName, ts)
|
||||
// RenameCollection provides a mock function with given fields: ctx, dbName, oldName, newDBName, newName, ts
|
||||
func (_m *IMetaTable) RenameCollection(ctx context.Context, dbName string, oldName string, newDBName string, newName string, ts uint64) error {
|
||||
ret := _m.Called(ctx, dbName, oldName, newDBName, newName, ts)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, string, uint64) error); ok {
|
||||
r0 = rf(ctx, dbName, oldName, newName, ts)
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, uint64) error); ok {
|
||||
r0 = rf(ctx, dbName, oldName, newDBName, newName, ts)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
@ -39,5 +39,5 @@ func (t *renameCollectionTask) Execute(ctx context.Context) error {
|
||||
if err := t.core.ExpireMetaCache(ctx, t.Req.GetDbName(), []string{t.Req.GetOldName()}, InvalidCollectionID, t.GetTs()); err != nil {
|
||||
return err
|
||||
}
|
||||
return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewName(), t.GetTs())
|
||||
return t.core.meta.RenameCollection(ctx, t.Req.GetDbName(), t.Req.GetOldName(), t.Req.GetNewDBName(), t.Req.GetNewName(), t.GetTs())
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user