diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index bea626eee5..cc664808d7 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -4,6 +4,7 @@ import ( "context" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -15,12 +16,12 @@ import ( type RootCoordCatalog interface { CreateDatabase(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error DropDatabase(ctx context.Context, dbID int64, ts typeutil.Timestamp) error - ListDatabases(ctx context.Context, ts typeutil.Timestamp) (map[string]*model.Database, error) + ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) CreateCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error GetCollectionByID(ctx context.Context, dbID int64, ts typeutil.Timestamp, collectionID typeutil.UniqueID) (*model.Collection, error) GetCollectionByName(ctx context.Context, dbID int64, collectionName string, ts typeutil.Timestamp) (*model.Collection, error) - ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) (map[string]*model.Collection, error) + ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) ([]*model.Collection, error) CollectionExists(ctx context.Context, dbID int64, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool DropCollection(ctx context.Context, collectionInfo *model.Collection, ts typeutil.Timestamp) error AlterCollection(ctx context.Context, oldColl *model.Collection, newColl *model.Collection, alterType AlterType, ts typeutil.Timestamp) error diff --git a/internal/metastore/db/rootcoord/table_catalog.go b/internal/metastore/db/rootcoord/table_catalog.go index ef7d00a91f..e31ab3d847 100644 --- a/internal/metastore/db/rootcoord/table_catalog.go +++ b/internal/metastore/db/rootcoord/table_catalog.go @@ -16,13 +16,14 @@ import ( "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metastore/db/dbmodel" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/util/contextutil" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/typeutil" - "go.uber.org/zap" ) type Catalog struct { @@ -48,9 +49,9 @@ func (tc *Catalog) DropDatabase(ctx context.Context, dbID int64, ts typeutil.Tim } -func (tc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) (map[string]*model.Database, error) { +func (tc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) { //TODO - return make(map[string]*model.Database), nil + return make([]*model.Database, 0), nil } func (tc *Catalog) CreateCollection(ctx context.Context, collection *model.Collection, ts typeutil.Timestamp) error { @@ -253,7 +254,7 @@ func (tc *Catalog) GetCollectionByName(ctx context.Context, dbID int64, collecti // [collection3, t3, is_deleted=false] // t1, t2, t3 are the largest timestamp that less than or equal to @param ts // the final result will only return collection2 and collection3 since collection1 is deleted -func (tc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) (map[string]*model.Collection, error) { +func (tc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) ([]*model.Collection, error) { tenantID := contextutil.TenantID(ctx) // 1. find each collection_id with latest ts <= @param ts @@ -262,7 +263,7 @@ func (tc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil. return nil, err } if len(cidTsPairs) == 0 { - return map[string]*model.Collection{}, nil + return make([]*model.Collection, 0), nil } // 2. populate each collection @@ -287,13 +288,7 @@ func (tc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil. log.Error("list collections by collection_id & ts pair failed", zap.Uint64("ts", ts), zap.Error(err)) return nil, err } - - r := map[string]*model.Collection{} - for _, c := range collections { - r[c.Name] = c - } - - return r, nil + return collections, nil } func (tc *Catalog) CollectionExists(ctx context.Context, dbID int64, collectionID typeutil.UniqueID, ts typeutil.Timestamp) bool { diff --git a/internal/metastore/db/rootcoord/table_catalog_test.go b/internal/metastore/db/rootcoord/table_catalog_test.go index b23bae5a4a..23d33621e8 100644 --- a/internal/metastore/db/rootcoord/table_catalog_test.go +++ b/internal/metastore/db/rootcoord/table_catalog_test.go @@ -14,6 +14,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/db/dbmodel" @@ -538,17 +539,17 @@ func TestTableCatalog_ListCollections(t *testing.T) { // collection basic info require.Equal(t, nil, gotErr) require.Equal(t, 1, len(res)) - require.Equal(t, coll.TenantID, res[coll.CollectionName].TenantID) - require.Equal(t, coll.CollectionID, res[coll.CollectionName].CollectionID) - require.Equal(t, coll.CollectionName, res[coll.CollectionName].Name) - require.Equal(t, coll.AutoID, res[coll.CollectionName].AutoID) - require.Equal(t, coll.Ts, res[coll.CollectionName].CreateTime) - require.Empty(t, res[coll.CollectionName].StartPositions) + require.Equal(t, coll.TenantID, res[0].TenantID) + require.Equal(t, coll.CollectionID, res[0].CollectionID) + require.Equal(t, coll.CollectionName, res[0].Name) + require.Equal(t, coll.AutoID, res[0].AutoID) + require.Equal(t, coll.Ts, res[0].CreateTime) + require.Empty(t, res[0].StartPositions) // partitions/fields/channels - require.NotEmpty(t, res[coll.CollectionName].Partitions) - require.NotEmpty(t, res[coll.CollectionName].Fields) - require.NotEmpty(t, res[coll.CollectionName].VirtualChannelNames) - require.NotEmpty(t, res[coll.CollectionName].PhysicalChannelNames) + require.NotEmpty(t, res[0].Partitions) + require.NotEmpty(t, res[0].Fields) + require.NotEmpty(t, res[0].VirtualChannelNames) + require.NotEmpty(t, res[0].PhysicalChannelNames) } func TestTableCatalog_CollectionExists(t *testing.T) { diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go index 96413cc7ec..5781efc1f6 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog.go +++ b/internal/metastore/kv/rootcoord/kv_catalog.go @@ -10,6 +10,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/log" @@ -113,20 +114,20 @@ func (kc *Catalog) DropDatabase(ctx context.Context, dbID int64, ts typeutil.Tim return kc.Snapshot.MultiSaveAndRemoveWithPrefix(nil, []string{key}, ts) } -func (kc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) (map[string]*model.Database, error) { +func (kc *Catalog) ListDatabases(ctx context.Context, ts typeutil.Timestamp) ([]*model.Database, error) { _, vals, err := kc.Snapshot.LoadWithPrefix(DBInfoMetaPrefix, ts) if err != nil { return nil, err } - dbs := make(map[string]*model.Database) + dbs := make([]*model.Database, 0, len(vals)) for _, val := range vals { dbMeta := &pb.DatabaseInfo{} err := proto.Unmarshal([]byte(val), dbMeta) if err != nil { return nil, err } - dbs[dbMeta.Name] = model.UnmarshalDatabaseModel(dbMeta) + dbs = append(dbs, model.UnmarshalDatabaseModel(dbMeta)) } return dbs, nil } @@ -575,7 +576,7 @@ func (kc *Catalog) GetCollectionByName(ctx context.Context, dbID int64, collecti return nil, common.NewCollectionNotExistError(fmt.Sprintf("can't find collection: %s, at timestamp = %d", collectionName, ts)) } -func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) (map[string]*model.Collection, error) { +func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) ([]*model.Collection, error) { prefix := getDatabasePrefix(dbID) _, vals, err := kc.Snapshot.LoadWithPrefix(prefix, ts) if err != nil { @@ -586,7 +587,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil. return nil, err } - colls := make(map[string]*model.Collection) + colls := make([]*model.Collection, 0, len(vals)) for _, val := range vals { collMeta := pb.CollectionInfo{} err := proto.Unmarshal([]byte(val), &collMeta) @@ -598,7 +599,7 @@ func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil. if err != nil { return nil, err } - colls[collMeta.Schema.Name] = collection + colls = append(colls, collection) } return colls, nil diff --git a/internal/metastore/kv/rootcoord/kv_catalog_test.go b/internal/metastore/kv/rootcoord/kv_catalog_test.go index 51d9d47418..58f6034b66 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog_test.go +++ b/internal/metastore/kv/rootcoord/kv_catalog_test.go @@ -18,6 +18,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/kv" "github.com/milvus-io/milvus/internal/kv/mocks" @@ -75,6 +76,18 @@ func TestCatalog_ListCollections(t *testing.T) { {}, }, }, + State: pb.CollectionState_CollectionCreated, + } + + coll3 := &pb.CollectionInfo{ + ID: 3, + Schema: &schemapb.CollectionSchema{ + Name: "c1", + Fields: []*schemapb.FieldSchema{ + {}, + }, + }, + State: pb.CollectionState_CollectionDropping, } targetErr := errors.New("fail") @@ -155,8 +168,7 @@ func TestCatalog_ListCollections(t *testing.T) { ret, err := kc.ListCollections(ctx, util.NonDBID, ts) assert.NoError(t, err) assert.Equal(t, 1, len(ret)) - modCol := ret["c1"] - assert.Equal(t, coll1.ID, modCol.CollectionID) + assert.Equal(t, coll1.ID, ret[0].CollectionID) }) t.Run("list collection with db", func(t *testing.T) { @@ -201,8 +213,12 @@ func TestCatalog_ListCollections(t *testing.T) { bColl, err := proto.Marshal(coll2) assert.NoError(t, err) + + aColl, err := proto.Marshal(coll3) + assert.NoError(t, err) + kv.On("LoadWithPrefix", CollectionMetaPrefix, ts). - Return([]string{"key"}, []string{string(bColl)}, nil) + Return([]string{"key", "key2"}, []string{string(bColl), string(aColl)}, nil) partitionMeta := &pb.PartitionInfo{} pm, err := proto.Marshal(partitionMeta) @@ -228,7 +244,9 @@ func TestCatalog_ListCollections(t *testing.T) { ret, err := kc.ListCollections(ctx, util.NonDBID, ts) assert.NoError(t, err) assert.NotNil(t, ret) - assert.Equal(t, 1, len(ret)) + assert.Equal(t, 2, len(ret)) + assert.Equal(t, int64(2), ret[0].CollectionID) + assert.Equal(t, int64(3), ret[1].CollectionID) }) } diff --git a/internal/metastore/mocks/RootCoordCatalog.go b/internal/metastore/mocks/RootCoordCatalog.go index 5a55800389..18099f87d4 100644 --- a/internal/metastore/mocks/RootCoordCatalog.go +++ b/internal/metastore/mocks/RootCoordCatalog.go @@ -396,15 +396,15 @@ func (_m *RootCoordCatalog) ListAliases(ctx context.Context, dbID int64, ts uint } // ListCollections provides a mock function with given fields: ctx, dbID, ts -func (_m *RootCoordCatalog) ListCollections(ctx context.Context, dbID int64, ts uint64) (map[string]*model.Collection, error) { +func (_m *RootCoordCatalog) ListCollections(ctx context.Context, dbID int64, ts uint64) ([]*model.Collection, error) { ret := _m.Called(ctx, dbID, ts) - var r0 map[string]*model.Collection - if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) map[string]*model.Collection); ok { + var r0 []*model.Collection + if rf, ok := ret.Get(0).(func(context.Context, int64, uint64) []*model.Collection); ok { r0 = rf(ctx, dbID, ts) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]*model.Collection) + r0 = ret.Get(0).([]*model.Collection) } } @@ -442,15 +442,15 @@ func (_m *RootCoordCatalog) ListCredentials(ctx context.Context) ([]string, erro } // ListDatabases provides a mock function with given fields: ctx, ts -func (_m *RootCoordCatalog) ListDatabases(ctx context.Context, ts uint64) (map[string]*model.Database, error) { +func (_m *RootCoordCatalog) ListDatabases(ctx context.Context, ts uint64) ([]*model.Database, error) { ret := _m.Called(ctx, ts) - var r0 map[string]*model.Database - if rf, ok := ret.Get(0).(func(context.Context, uint64) map[string]*model.Database); ok { + var r0 []*model.Database + if rf, ok := ret.Get(0).(func(context.Context, uint64) []*model.Database); ok { r0 = rf(ctx, ts) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]*model.Database) + r0 = ret.Get(0).([]*model.Database) } } diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index a1e786b59c..ac6abd57f9 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -142,7 +142,9 @@ func (mt *MetaTable) reload() error { } log.Info("recover databases", zap.Int("num of dbs", len(dbs))) - maps.Copy(mt.dbName2Meta, dbs) + for _, db := range dbs { + mt.dbName2Meta[db.Name] = db + } dbNames := maps.Keys(mt.dbName2Meta) // create default database. if !funcutil.SliceContain(dbNames, util.DefaultDBName) { @@ -167,10 +169,10 @@ func (mt *MetaTable) reload() error { if err != nil { return err } - for collName, collection := range collections { + for _, collection := range collections { mt.collID2Meta[collection.CollectionID] = collection - mt.names.insert(dbName, collName, collection.CollectionID) if collection.Available() { + mt.names.insert(dbName, collection.Name, collection.CollectionID) collectionNum++ partitionNum += int64(collection.GetPartitionNum(true)) } @@ -209,8 +211,8 @@ func (mt *MetaTable) reloadWithNonDatabase() error { for _, collection := range oldCollections { mt.collID2Meta[collection.CollectionID] = collection - mt.names.insert(util.DefaultDBName, collection.Name, collection.CollectionID) if collection.Available() { + mt.names.insert(util.DefaultDBName, collection.Name, collection.CollectionID) collectionNum++ partitionNum += int64(collection.GetPartitionNum(true)) } @@ -642,22 +644,21 @@ func (mt *MetaTable) listCollectionFromCache(dbName string, onlyAvail bool) ([]* dbName = util.DefaultDBName } - collectionIDs, err := mt.names.listCollectionID(dbName) - if err != nil { - return nil, err + db, ok := mt.dbName2Meta[dbName] + if !ok { + return nil, fmt.Errorf("database:%s not found", dbName) } - collectionFromCache := make([]*model.Collection, 0, len(collectionIDs)) - for _, colID := range collectionIDs { - meta, ok := mt.collID2Meta[colID] - if !ok { - return nil, fmt.Errorf("collectionID:%d not existwithin db:%s", colID, dbName) - } + collectionFromCache := make([]*model.Collection, 0, len(mt.collID2Meta)) + for _, collMeta := range mt.collID2Meta { + if (collMeta.DBID != util.NonDBID && db.ID == collMeta.DBID) || + (collMeta.DBID == util.NonDBID && dbName == util.DefaultDBName) { + if onlyAvail && !collMeta.Available() { + continue + } - if onlyAvail && !meta.Available() { - continue + collectionFromCache = append(collectionFromCache, collMeta) } - collectionFromCache = append(collectionFromCache, meta) } return collectionFromCache, nil } diff --git a/internal/rootcoord/meta_table_test.go b/internal/rootcoord/meta_table_test.go index d57d6eff77..1d8fde88b4 100644 --- a/internal/rootcoord/meta_table_test.go +++ b/internal/rootcoord/meta_table_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/common" memkv "github.com/milvus-io/milvus/internal/kv/mem" "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" @@ -803,7 +804,7 @@ func TestMetaTable_reload(t *testing.T) { catalog.On("ListDatabases", mock.Anything, mock.Anything, - ).Return(make(map[string]*model.Database), nil) + ).Return(make([]*model.Database, 0), nil) catalog.On("CreateDatabase", mock.Anything, mock.Anything, @@ -864,7 +865,7 @@ func TestMetaTable_reload(t *testing.T) { mock.Anything, mock.Anything, ).Return( - map[string]*model.Collection{"test": {CollectionID: 100, Name: "test"}}, + []*model.Collection{{CollectionID: 100, Name: "test"}}, nil) }, func(catalog *mocks.RootCoordCatalog) { @@ -887,7 +888,7 @@ func TestMetaTable_reload(t *testing.T) { mock.Anything, mock.Anything, mock.Anything, - ).Return(map[string]*model.Collection{}, nil) + ).Return(make([]*model.Collection, 0), nil) }, func(catalog *mocks.RootCoordCatalog) { catalog.On("ListAliases", @@ -915,15 +916,13 @@ func TestMetaTable_reload(t *testing.T) { catalog.On("ListDatabases", mock.Anything, mock.Anything, - ).Return(map[string]*model.Database{ - util.DefaultDBName: model.NewDefaultDatabase(), - }, nil) + ).Return([]*model.Database{model.NewDefaultDatabase()}, nil) catalog.On("ListCollections", mock.Anything, mock.Anything, mock.Anything, ).Return( - map[string]*model.Collection{"test": {CollectionID: 100, Name: "test", State: pb.CollectionState_CollectionCreated}}, + []*model.Collection{{CollectionID: 100, Name: "test", State: pb.CollectionState_CollectionCreated}}, nil) catalog.On("ListAliases", mock.Anything, @@ -953,7 +952,7 @@ func TestMetaTable_reload(t *testing.T) { mock.Anything, mock.Anything, ).Return( - map[string]*model.Collection{"test": {CollectionID: 100, Name: "test", State: pb.CollectionState_CollectionCreated}}, + []*model.Collection{{CollectionID: 100, Name: "test", State: pb.CollectionState_CollectionCreated}}, nil) }, func(catalog *mocks.RootCoordCatalog) { @@ -1453,19 +1452,41 @@ func TestMetaTable_EmtpyDatabaseName(t *testing.T) { t.Run("listCollectionFromCache with empty db", func(t *testing.T) { mt := &MetaTable{ names: newNameDb(), + dbName2Meta: map[string]*model.Database{ + util.DefaultDBName: model.NewDefaultDatabase(), + "db2": model.NewDatabase(2, "db2", pb.DatabaseState_DatabaseCreated), + }, collID2Meta: map[typeutil.UniqueID]*model.Collection{ 1: { CollectionID: 1, State: pb.CollectionState_CollectionCreated, }, + 2: { + CollectionID: 2, + State: pb.CollectionState_CollectionDropping, + DBID: util.DefaultDBID, + }, + 3: { + CollectionID: 3, + State: pb.CollectionState_CollectionCreated, + DBID: 2, + }, }, } - mt.names.insert(util.DefaultDBName, "name", 1) - ret, err := mt.listCollectionFromCache("", false) + ret, err := mt.listCollectionFromCache("none", false) + assert.Error(t, err) + assert.Nil(t, ret) + + ret, err = mt.listCollectionFromCache("", false) + assert.NoError(t, err) + assert.Equal(t, 2, len(ret)) + assert.Equal(t, []int64{ret[0].CollectionID, ret[1].CollectionID}, []int64{1, 2}) + + ret, err = mt.listCollectionFromCache("db2", false) assert.NoError(t, err) assert.Equal(t, 1, len(ret)) - assert.Equal(t, int64(1), ret[0].CollectionID) + assert.Equal(t, int64(3), ret[0].CollectionID) }) t.Run("CreateAlias with empty db", func(t *testing.T) {