Fix failure to load collections that share the same name during the recover stage (#24941)

Signed-off-by: jaime <yun.zhang@zilliz.com>
This commit is contained in:
jaime 2023-06-21 16:13:54 +08:00 committed by GitHub
parent b1b52970a0
commit 277cbbd7c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 107 additions and 69 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@ -15,12 +16,12 @@ import (
type RootCoordCatalog interface {
CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error
DropDatabase(ctx context.Context, dbID int64, ts typeutil.Timestamp) error
ListDatabases(ctx context.Context, ts typeutil.Timestamp) (map[string]*model.Database, error)
ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error)
CreateCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error
GetCollectionByID(ctx context.Context, dbID int64, ts typeutil.Timestamp, collectionID typeutil.UniqueID) (*model.Collection, error)
GetCollectionByName(ctx context.Context, dbID int64, collectionName string, ts typeutil.Timestamp) (*model.Collection, error)
ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) (map[string]*model.Collection, error)
ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) ([]*model.Collection, error)
CollectionExists(ctx context.Context, dbID int64, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool
DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error
AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, alterType AlterType, ts typeutil.Timestamp) error

View File

@ -16,13 +16,14 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore/db/dbmodel"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/contextutil"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
type Catalog struct {
@ -48,9 +49,9 @@ func (tc *Catalog) DropDatabase(ctx context.Context, dbID int64, ts typeutil.Tim
}
func (tc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) (map[string]*model.Database, error) {
func (tc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) {
//TODO
return make(map[string]*model.Database), nil
return make([]*model.Database, 0), nil
}
func (tc *Catalog) CreateCollection(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error {
@ -253,7 +254,7 @@ func (tc *Catalog) GetCollectionByName(ctx context.Context, dbID int64, collecti
// [collection3, t3, is_deleted=false]
// t1, t2, t3 are, for each collection, the largest timestamp that is less than or equal to @param ts
// the final result will only return collection2 and collection3 since collection1 is deleted
func (tc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) (map[string]*model.Collection, error) {
func (tc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) ([]*model.Collection, error) {
tenantID := contextutil.TenantID(ctx)
// 1. find each collection_id with latest ts <= @param ts
@ -262,7 +263,7 @@ func (tc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.
return nil, err
}
if len(cidTsPairs) == 0 {
return map[string]*model.Collection{}, nil
return make([]*model.Collection, 0), nil
}
// 2. populate each collection
@ -287,13 +288,7 @@ func (tc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.
log.Error("list collections by collection_id & ts pair failed", zap.Uint64("ts", ts), zap.Error(err))
return nil, err
}
r := map[string]*model.Collection{}
for _, c := range collections {
r[c.Name] = c
}
return r, nil
return collections, nil
}
func (tc *Catalog) CollectionExists(ctx context.Context, dbID int64, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool {

View File

@ -14,6 +14,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/db/dbmodel"
@ -538,17 +539,17 @@ func TestTableCatalog_ListCollections(t *testing.T) {
// collection basic info
require.Equal(t, nil, gotErr)
require.Equal(t, 1, len(res))
require.Equal(t, coll.TenantID, res[coll.CollectionName].TenantID)
require.Equal(t, coll.CollectionID, res[coll.CollectionName].CollectionID)
require.Equal(t, coll.CollectionName, res[coll.CollectionName].Name)
require.Equal(t, coll.AutoID, res[coll.CollectionName].AutoID)
require.Equal(t, coll.Ts, res[coll.CollectionName].CreateTime)
require.Empty(t, res[coll.CollectionName].StartPositions)
require.Equal(t, coll.TenantID, res[0].TenantID)
require.Equal(t, coll.CollectionID, res[0].CollectionID)
require.Equal(t, coll.CollectionName, res[0].Name)
require.Equal(t, coll.AutoID, res[0].AutoID)
require.Equal(t, coll.Ts, res[0].CreateTime)
require.Empty(t, res[0].StartPositions)
// partitions/fields/channels
require.NotEmpty(t, res[coll.CollectionName].Partitions)
require.NotEmpty(t, res[coll.CollectionName].Fields)
require.NotEmpty(t, res[coll.CollectionName].VirtualChannelNames)
require.NotEmpty(t, res[coll.CollectionName].PhysicalChannelNames)
require.NotEmpty(t, res[0].Partitions)
require.NotEmpty(t, res[0].Fields)
require.NotEmpty(t, res[0].VirtualChannelNames)
require.NotEmpty(t, res[0].PhysicalChannelNames)
}
func TestTableCatalog_CollectionExists(t *testing.T) {

View File

@ -10,6 +10,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
@ -113,20 +114,20 @@ func (kc *Catalog) DropDatabase(ctx context.Context, dbID int64, ts typeutil.Tim
return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{key}, ts)
}
func (kc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) (map[string]*model.Database, error) {
func (kc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) {
_, vals, err := kc.Snapshot.LoadWithPrefix(DBInfoMetaPrefix, ts)
if err != nil {
return nil, err
}
dbs := make(map[string]*model.Database)
dbs := make([]*model.Database, 0, len(vals))
for _, val := range vals {
dbMeta := &pb.DatabaseInfo{}
err := proto.Unmarshal([]byte(val), dbMeta)
if err != nil {
return nil, err
}
dbs[dbMeta.Name] = model.UnmarshalDatabaseModel(dbMeta)
dbs = append(dbs, model.UnmarshalDatabaseModel(dbMeta))
}
return dbs, nil
}
@ -575,7 +576,7 @@ func (kc *Catalog) GetCollectionByName(ctx context.Context, dbID int64, collecti
return nil, common.NewCollectionNotExistError(fmt.Sprintf("can't find collection: %s, at timestamp = %d", collectionName, ts))
}
func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) (map[string]*model.Collection, error) {
func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) ([]*model.Collection, error) {
prefix := getDatabasePrefix(dbID)
_, vals, err := kc.Snapshot.LoadWithPrefix(prefix, ts)
if err != nil {
@ -586,7 +587,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.
return nil, err
}
colls := make(map[string]*model.Collection)
colls := make([]*model.Collection, 0, len(vals))
for _, val := range vals {
collMeta := pb.CollectionInfo{}
err := proto.Unmarshal([]byte(val), &collMeta)
@ -598,7 +599,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.
if err != nil {
return nil, err
}
colls[collMeta.Schema.Name] = collection
colls = append(colls, collection)
}
return colls, nil

View File

@ -18,6 +18,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/kv/mocks"
@ -75,6 +76,18 @@ func TestCatalog_ListCollections(t *testing.T) {
{},
},
},
State: pb.CollectionState_CollectionCreated,
}
coll3 := &pb.CollectionInfo{
ID: 3,
Schema: &schemapb.CollectionSchema{
Name: "c1",
Fields: []*schemapb.FieldSchema{
{},
},
},
State: pb.CollectionState_CollectionDropping,
}
targetErr := errors.New("fail")
@ -155,8 +168,7 @@ func TestCatalog_ListCollections(t *testing.T) {
ret, err := kc.ListCollections(ctx, util.NonDBID, ts)
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
modCol := ret["c1"]
assert.Equal(t, coll1.ID, modCol.CollectionID)
assert.Equal(t, coll1.ID, ret[0].CollectionID)
})
t.Run("list collection with db", func(t *testing.T) {
@ -201,8 +213,12 @@ func TestCatalog_ListCollections(t *testing.T) {
bColl, err := proto.Marshal(coll2)
assert.NoError(t, err)
aColl, err := proto.Marshal(coll3)
assert.NoError(t, err)
kv.On("LoadWithPrefix", CollectionMetaPrefix, ts).
Return([]string{"key"}, []string{string(bColl)}, nil)
Return([]string{"key", "key2"}, []string{string(bColl), string(aColl)}, nil)
partitionMeta := &pb.PartitionInfo{}
pm, err := proto.Marshal(partitionMeta)
@ -228,7 +244,9 @@ func TestCatalog_ListCollections(t *testing.T) {
ret, err := kc.ListCollections(ctx, util.NonDBID, ts)
assert.NoError(t, err)
assert.NotNil(t, ret)
assert.Equal(t, 1, len(ret))
assert.Equal(t, 2, len(ret))
assert.Equal(t, int64(2), ret[0].CollectionID)
assert.Equal(t, int64(3), ret[1].CollectionID)
})
}

View File

@ -396,15 +396,15 @@ func (_m *RootCoordCatalog) ListAliases(ctx context.Context, dbID int64, ts uint
}
// ListCollections provides a mock function with given fields: ctx, dbID, ts
func (_m *RootCoordCatalog) ListCollections(ctx context.Context, dbID int64, ts uint64) (map[string]*model.Collection, error) {
func (_m *RootCoordCatalog) ListCollections(ctx context.Context, dbID int64, ts uint64) ([]*model.Collection, error) {
ret := _m.Called(ctx, dbID, ts)
var r0 map[string]*model.Collection
if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) map[string]*model.Collection); ok {
var r0 []*model.Collection
if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) []*model.Collection); ok {
r0 = rf(ctx, dbID, ts)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]*model.Collection)
r0 = ret.Get(0).([]*model.Collection)
}
}
@ -442,15 +442,15 @@ func (_m *RootCoordCatalog) ListCredentials(ctx context.Context) ([]string, erro
}
// ListDatabases provides a mock function with given fields: ctx, ts
func (_m *RootCoordCatalog) ListDatabases(ctx context.Context, ts uint64) (map[string]*model.Database, error) {
func (_m *RootCoordCatalog) ListDatabases(ctx context.Context, ts uint64) ([]*model.Database, error) {
ret := _m.Called(ctx, ts)
var r0 map[string]*model.Database
if rf, ok := ret.Get(0).(func(context.Context, uint64) map[string]*model.Database); ok {
var r0 []*model.Database
if rf, ok := ret.Get(0).(func(context.Context, uint64) []*model.Database); ok {
r0 = rf(ctx, ts)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]*model.Database)
r0 = ret.Get(0).([]*model.Database)
}
}

View File

@ -142,7 +142,9 @@ func (mt *MetaTable) reload() error {
}
log.Info("recover databases", zap.Int("num of dbs", len(dbs)))
maps.Copy(mt.dbName2Meta, dbs)
for _, db := range dbs {
mt.dbName2Meta[db.Name] = db
}
dbNames := maps.Keys(mt.dbName2Meta)
// create default database.
if !funcutil.SliceContain(dbNames, util.DefaultDBName) {
@ -167,10 +169,10 @@ func (mt *MetaTable) reload() error {
if err != nil {
return err
}
for collName, collection := range collections {
for _, collection := range collections {
mt.collID2Meta[collection.CollectionID] = collection
mt.names.insert(dbName, collName, collection.CollectionID)
if collection.Available() {
mt.names.insert(dbName, collection.Name, collection.CollectionID)
collectionNum++
partitionNum += int64(collection.GetPartitionNum(true))
}
@ -209,8 +211,8 @@ func (mt *MetaTable) reloadWithNonDatabase() error {
for _, collection := range oldCollections {
mt.collID2Meta[collection.CollectionID] = collection
mt.names.insert(util.DefaultDBName, collection.Name, collection.CollectionID)
if collection.Available() {
mt.names.insert(util.DefaultDBName, collection.Name, collection.CollectionID)
collectionNum++
partitionNum += int64(collection.GetPartitionNum(true))
}
@ -642,22 +644,21 @@ func (mt *MetaTable) listCollectionFromCache(dbName string, onlyAvail bool) ([]*
dbName = util.DefaultDBName
}
collectionIDs, err := mt.names.listCollectionID(dbName)
if err != nil {
return nil, err
db, ok := mt.dbName2Meta[dbName]
if !ok {
return nil, fmt.Errorf("database:%s not found", dbName)
}
collectionFromCache := make([]*model.Collection, 0, len(collectionIDs))
for _, colID := range collectionIDs {
meta, ok := mt.collID2Meta[colID]
if !ok {
return nil, fmt.Errorf("collectionID:%d not existwithin db:%s", colID, dbName)
}
collectionFromCache := make([]*model.Collection, 0, len(mt.collID2Meta))
for _, collMeta := range mt.collID2Meta {
if (collMeta.DBID != util.NonDBID && db.ID == collMeta.DBID) ||
(collMeta.DBID == util.NonDBID && dbName == util.DefaultDBName) {
if onlyAvail && !collMeta.Available() {
continue
}
if onlyAvail && !meta.Available() {
continue
collectionFromCache = append(collectionFromCache, collMeta)
}
collectionFromCache = append(collectionFromCache, meta)
}
return collectionFromCache, nil
}

View File

@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/common"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
@ -803,7 +804,7 @@ func TestMetaTable_reload(t *testing.T) {
catalog.On("ListDatabases",
mock.Anything,
mock.Anything,
).Return(make(map[string]*model.Database), nil)
).Return(make([]*model.Database, 0), nil)
catalog.On("CreateDatabase",
mock.Anything,
mock.Anything,
@ -864,7 +865,7 @@ func TestMetaTable_reload(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(
map[string]*model.Collection{"test": {CollectionID: 100, Name: "test"}},
[]*model.Collection{{CollectionID: 100, Name: "test"}},
nil)
},
func(catalog *mocks.RootCoordCatalog) {
@ -887,7 +888,7 @@ func TestMetaTable_reload(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything,
).Return(map[string]*model.Collection{}, nil)
).Return(make([]*model.Collection, 0), nil)
},
func(catalog *mocks.RootCoordCatalog) {
catalog.On("ListAliases",
@ -915,15 +916,13 @@ func TestMetaTable_reload(t *testing.T) {
catalog.On("ListDatabases",
mock.Anything,
mock.Anything,
).Return(map[string]*model.Database{
util.DefaultDBName: model.NewDefaultDatabase(),
}, nil)
).Return([]*model.Database{model.NewDefaultDatabase()}, nil)
catalog.On("ListCollections",
mock.Anything,
mock.Anything,
mock.Anything,
).Return(
map[string]*model.Collection{"test": {CollectionID: 100, Name: "test", State: pb.CollectionState_CollectionCreated}},
[]*model.Collection{{CollectionID: 100, Name: "test", State: pb.CollectionState_CollectionCreated}},
nil)
catalog.On("ListAliases",
mock.Anything,
@ -953,7 +952,7 @@ func TestMetaTable_reload(t *testing.T) {
mock.Anything,
mock.Anything,
).Return(
map[string]*model.Collection{"test": {CollectionID: 100, Name: "test", State: pb.CollectionState_CollectionCreated}},
[]*model.Collection{{CollectionID: 100, Name: "test", State: pb.CollectionState_CollectionCreated}},
nil)
},
func(catalog *mocks.RootCoordCatalog) {
@ -1453,19 +1452,41 @@ func TestMetaTable_EmtpyDatabaseName(t *testing.T) {
t.Run("listCollectionFromCache with empty db", func(t *testing.T) {
mt := &MetaTable{
names: newNameDb(),
dbName2Meta: map[string]*model.Database{
util.DefaultDBName: model.NewDefaultDatabase(),
"db2": model.NewDatabase(2, "db2", pb.DatabaseState_DatabaseCreated),
},
collID2Meta: map[typeutil.UniqueID]*model.Collection{
1: {
CollectionID: 1,
State: pb.CollectionState_CollectionCreated,
},
2: {
CollectionID: 2,
State: pb.CollectionState_CollectionDropping,
DBID: util.DefaultDBID,
},
3: {
CollectionID: 3,
State: pb.CollectionState_CollectionCreated,
DBID: 2,
},
},
}
mt.names.insert(util.DefaultDBName, "name", 1)
ret, err := mt.listCollectionFromCache("", false)
ret, err := mt.listCollectionFromCache("none", false)
assert.Error(t, err)
assert.Nil(t, ret)
ret, err = mt.listCollectionFromCache("", false)
assert.NoError(t, err)
assert.Equal(t, 2, len(ret))
assert.Equal(t, []int64{ret[0].CollectionID, ret[1].CollectionID}, []int64{1, 2})
ret, err = mt.listCollectionFromCache("db2", false)
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
assert.Equal(t, int64(1), ret[0].CollectionID)
assert.Equal(t, int64(3), ret[0].CollectionID)
})
t.Run("CreateAlias with empty db", func(t *testing.T) {