diff --git a/cmd/tools/migration/backend/etcd210.go b/cmd/tools/migration/backend/etcd210.go index 54722f8c1a..137ad51d8d 100644 --- a/cmd/tools/migration/backend/etcd210.go +++ b/cmd/tools/migration/backend/etcd210.go @@ -21,7 +21,9 @@ import ( "github.com/milvus-io/milvus/cmd/tools/migration/utils" "github.com/milvus-io/milvus/cmd/tools/migration/versions" "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" + "github.com/milvus-io/milvus/internal/metastore/model" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -293,6 +295,35 @@ func (b etcd210) loadLastDDLRecords() (meta.LastDDLRecords, error) { return records, nil } +func (b etcd210) loadLoadInfos() (meta.CollectionLoadInfo210, error) { + loadInfo := make(meta.CollectionLoadInfo210) + _, collectionValues, err := b.txn.LoadWithPrefix(legacy.CollectionLoadMetaPrefixV1) + if err != nil { + return nil, err + } + for _, value := range collectionValues { + collectionInfo := querypb.CollectionInfo{} + err = proto.Unmarshal([]byte(value), &collectionInfo) + if err != nil { + return nil, err + } + if collectionInfo.InMemoryPercentage < 100 { + continue + } + loadInfo[collectionInfo.CollectionID] = &model.CollectionLoadInfo{ + CollectionID: collectionInfo.GetCollectionID(), + PartitionIDs: collectionInfo.GetPartitionIDs(), + ReleasedPartitionIDs: collectionInfo.GetReleasedPartitionIDs(), + LoadType: collectionInfo.GetLoadType(), + LoadPercentage: 100, + Status: querypb.LoadStatus_Loaded, + ReplicaNumber: collectionInfo.GetReplicaNumber(), + FieldIndexID: make(map[int64]int64), + } + } + return loadInfo, nil +} + func (b etcd210) Load() (*meta.Meta, error) { ttCollections, err := b.loadTtCollections() if err != nil { @@ -326,17 +357,22 @@ func (b etcd210) Load() (*meta.Meta, error) { if err != nil { return nil, err } + loadInfos, err := b.loadLoadInfos() + if err != nil { + return nil, err + } return &meta.Meta{ Version: versions.Version210, Meta210: &meta.All210{ - TtCollections: ttCollections, - Collections: collections, - TtAliases: ttAliases, - Aliases: aliases, - CollectionIndexes: collectionIndexes, - SegmentIndexes: segmentIndexes, - IndexBuildMeta: indexBuildMeta, - LastDDLRecords: lastDdlRecords, + TtCollections: ttCollections, + Collections: collections, + TtAliases: ttAliases, + Aliases: aliases, + CollectionIndexes: collectionIndexes, + SegmentIndexes: segmentIndexes, + IndexBuildMeta: indexBuildMeta, + LastDDLRecords: lastDdlRecords, + CollectionLoadInfos: loadInfos, }, }, nil } diff --git a/cmd/tools/migration/backend/etcd220.go b/cmd/tools/migration/backend/etcd220.go index af8ba7d434..8ad186e488 100644 --- a/cmd/tools/migration/backend/etcd220.go +++ b/cmd/tools/migration/backend/etcd220.go @@ -99,6 +99,24 @@ func (b etcd220) Save(metas *meta.Meta) error { return err } } + { + saves, err := metas.Meta220.CollectionLoadInfos.GenerateSaves() + if err != nil { + return err + } + if err := b.save(saves); err != nil { + return err + } + } + { + saves, err := metas.Meta220.PartitionLoadInfos.GenerateSaves() + if err != nil { + return err + } + if err := b.save(saves); err != nil { + return err + } + } return nil } diff --git a/cmd/tools/migration/legacy/constant.go b/cmd/tools/migration/legacy/constant.go index b84595eee7..38cbd6f739 100644 --- a/cmd/tools/migration/legacy/constant.go +++ b/cmd/tools/migration/legacy/constant.go @@ -8,4 +8,5 @@ const ( IndexMetaBefore220Prefix = rootcoord.ComponentPrefix + "/index" SegmentIndexPrefixBefore220 = rootcoord.ComponentPrefix + "/segment-index" IndexBuildPrefixBefore220 = "indexes" + CollectionLoadMetaPrefixV1 = "queryCoord-collectionMeta" ) diff --git a/cmd/tools/migration/meta/210_to_220.go b/cmd/tools/migration/meta/210_to_220.go index 528da0d191..f2724c5141 100644 --- a/cmd/tools/migration/meta/210_to_220.go +++ b/cmd/tools/migration/meta/210_to_220.go @@ -5,10 +5,13 @@ import ( "sort" "strings" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/typeutil" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/metastore/model" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/cmd/tools/migration/versions" ) @@ -96,6 +99,39 @@ func (meta *CollectionsMeta210) to220() (CollectionsMeta220, FieldIndexes210, er return collections, fieldIndexes, nil } +func (meta *CollectionLoadInfo210) to220() (CollectionLoadInfo220, PartitionLoadInfo220, error) { + collectionLoadInfos := make(CollectionLoadInfo220) + partitionLoadInfos := make(PartitionLoadInfo220) + for collectionID, loadInfo := range *meta { + if loadInfo.LoadPercentage < 100 { + continue + } + + switch loadInfo.LoadType { + case querypb.LoadType_LoadCollection: + collectionLoadInfos[collectionID] = loadInfo + case querypb.LoadType_LoadPartition: + partitions, ok := partitionLoadInfos[collectionID] + if !ok { + partitions = make(map[int64]*model.PartitionLoadInfo) + partitionLoadInfos[collectionID] = partitions + } + for _, partitionID := range loadInfo.PartitionIDs { + partitions[partitionID] = &model.PartitionLoadInfo{ + CollectionID: collectionID, + PartitionID: partitionID, + LoadType: querypb.LoadType_LoadPartition, + LoadPercentage: 100, + Status: querypb.LoadStatus_Loaded, + ReplicaNumber: loadInfo.ReplicaNumber, + } + } + } + } + + return collectionLoadInfos, partitionLoadInfos, nil +} + func combineToCollectionIndexesMeta220(fieldIndexes FieldIndexes210, collectionIndexes CollectionIndexesMeta210) (CollectionIndexesMeta220, error) { indexes := make(CollectionIndexesMeta220) for collectionID := range fieldIndexes { @@ -176,6 +212,33 @@ func combineToSegmentIndexesMeta220(segmentIndexes SegmentIndexesMeta210, indexB return segmentIndexModels, nil } +func combineToLoadInfo220(collectionLoadInfo CollectionLoadInfo220, partitionLoadInto PartitionLoadInfo220, fieldIndexes FieldIndexes210) { + for collectionID, loadInfo := range collectionLoadInfo { + indexes, ok := fieldIndexes[collectionID] + if !ok { + log.Warn("release the collection without index", zap.Int64("collectionID", collectionID)) + delete(collectionLoadInfo, collectionID) + } + + for _, index := range indexes.indexes { + loadInfo.FieldIndexID[index.GetFiledID()] = index.GetIndexID() + } + } + + for collectionID, partitions := range partitionLoadInto { + indexes, ok := fieldIndexes[collectionID] + if !ok { + log.Warn("release the collection without index", zap.Int64("collectionID", collectionID)) + delete(collectionLoadInfo, collectionID) + } + for _, loadInfo := range partitions { + for _, index := range indexes.indexes { + loadInfo.FieldIndexID[index.GetFiledID()] = index.GetIndexID() + } + } + } +} + func From210To220(metas *Meta) (*Meta, error) { if !metas.Version.EQ(versions.Version210) { return nil, fmt.Errorf("version mismatch: %s", metas.Version.String()) @@ -196,6 +259,11 @@ func From210To220(metas *Meta) (*Meta, error) { if err != nil { return nil, err } + collectionLoadInfos, partitionLoadInfos, err := metas.Meta210.CollectionLoadInfos.to220() + if err != nil { + return nil, err + } + fieldIndexes.Merge(fieldIndexes2) collectionIndexes, err := combineToCollectionIndexesMeta220(fieldIndexes, metas.Meta210.CollectionIndexes) if err != nil { @@ -205,20 +273,24 @@ func From210To220(metas *Meta) (*Meta, error) { if err != nil { return nil, err } + combineToLoadInfo220(collectionLoadInfos, partitionLoadInfos, fieldIndexes) + metas220 := &Meta{ SourceVersion: metas.Version, Version: versions.Version220, Meta220: &All220{ - TtCollections: ttCollections, - Collections: collections, - TtAliases: ttAliases, - Aliases: aliases, - TtPartitions: make(TtPartitionsMeta220), - Partitions: make(PartitionsMeta220), - TtFields: make(TtFieldsMeta220), - Fields: make(FieldsMeta220), - CollectionIndexes: collectionIndexes, - SegmentIndexes: segmentIndexes, + TtCollections: ttCollections, + Collections: collections, + TtAliases: ttAliases, + Aliases: aliases, + TtPartitions: make(TtPartitionsMeta220), + Partitions: make(PartitionsMeta220), + TtFields: make(TtFieldsMeta220), + Fields: make(FieldsMeta220), + CollectionIndexes: collectionIndexes, + SegmentIndexes: segmentIndexes, + CollectionLoadInfos: collectionLoadInfos, + PartitionLoadInfos: partitionLoadInfos, }, } return metas220, nil diff --git a/cmd/tools/migration/meta/meta210.go b/cmd/tools/migration/meta/meta210.go index e11706e379..a5c6d6209c 100644 --- a/cmd/tools/migration/meta/meta210.go +++ b/cmd/tools/migration/meta/meta210.go @@ -8,6 +8,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" + "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus-proto/go-api/schemapb" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" @@ -32,6 +33,8 @@ type IndexBuildMeta210 map[UniqueID]*legacypb.IndexMeta // index_build_id -> ind type LastDDLRecords map[string]string // We don't care this since it didn't work. +type CollectionLoadInfo210 map[UniqueID]*model.CollectionLoadInfo // collectionID -> CollectionLoadInfo + type All210 struct { TtAliases TtAliasesMeta210 Aliases AliasesMeta210 @@ -44,6 +47,8 @@ type All210 struct { IndexBuildMeta IndexBuildMeta210 LastDDLRecords LastDDLRecords + + CollectionLoadInfos CollectionLoadInfo210 } func (meta *All210) GenerateSaves() map[string]string { diff --git a/cmd/tools/migration/meta/meta220.go b/cmd/tools/migration/meta/meta220.go index f359b99518..7105645d62 100644 --- a/cmd/tools/migration/meta/meta220.go +++ b/cmd/tools/migration/meta/meta220.go @@ -5,8 +5,10 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/cmd/tools/migration/versions" "github.com/milvus-io/milvus/internal/metastore/kv/indexcoord" + "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" "github.com/milvus-io/milvus/internal/metastore/kv/rootcoord" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/proto/querypb" ) type TtCollectionsMeta220 map[UniqueID]map[Timestamp]*model.Collection // coll_id -> ts -> coll @@ -24,6 +26,9 @@ type FieldsMeta220 map[UniqueID][]*model.Field // coll_id -> ts type CollectionIndexesMeta220 map[UniqueID]map[UniqueID]*model.Index // coll_id -> index_id -> index type SegmentIndexesMeta220 map[UniqueID]map[UniqueID]*model.SegmentIndex // seg_id -> index_id -> segment index +type CollectionLoadInfo220 map[UniqueID]*model.CollectionLoadInfo // collectionID -> CollectionLoadInfo +type PartitionLoadInfo220 map[UniqueID]map[UniqueID]*model.PartitionLoadInfo // collectionID, partitionID -> PartitionLoadInfo + func (meta *TtCollectionsMeta220) GenerateSaves(sourceVersion semver.Version) (map[string]string, error) { saves := make(map[string]string) @@ -230,6 +235,46 @@ func (meta *SegmentIndexesMeta220) AddRecord(segID UniqueID, indexID UniqueID, r } } +func (meta *CollectionLoadInfo220) GenerateSaves() (map[string]string, error) { + saves := make(map[string]string) + for _, loadInfo := range *meta { + k := querycoord.EncodeCollectionLoadInfoKey(loadInfo.CollectionID) + v, err := proto.Marshal(&querypb.CollectionLoadInfo{ + CollectionID: loadInfo.CollectionID, + ReleasedPartitions: loadInfo.ReleasedPartitionIDs, + ReplicaNumber: loadInfo.ReplicaNumber, + Status: loadInfo.Status, + FieldIndexID: loadInfo.FieldIndexID, + }) + if err != nil { + return nil, err + } + saves[k] = string(v) + } + return saves, nil +} + +func (meta *PartitionLoadInfo220) GenerateSaves() (map[string]string, error) { + saves := make(map[string]string) + for _, partitions := range *meta { + for _, loadInfo := range partitions { + k := querycoord.EncodePartitionLoadInfoKey(loadInfo.CollectionID, loadInfo.PartitionID) + v, err := proto.Marshal(&querypb.PartitionLoadInfo{ + CollectionID: loadInfo.CollectionID, + PartitionID: loadInfo.PartitionID, + ReplicaNumber: loadInfo.ReplicaNumber, + Status: loadInfo.Status, + FieldIndexID: loadInfo.FieldIndexID, + }) + if err != nil { + return nil, err + } + saves[k] = string(v) + } + } + return saves, nil +} + type All220 struct { TtCollections TtCollectionsMeta220 Collections CollectionsMeta220 @@ -245,4 +290,8 @@ type All220 struct { CollectionIndexes CollectionIndexesMeta220 SegmentIndexes SegmentIndexesMeta220 + + // QueryCoord Meta + CollectionLoadInfos CollectionLoadInfo220 + PartitionLoadInfos PartitionLoadInfo220 } diff --git a/internal/metastore/kv/querycoord/kv_catalog.go b/internal/metastore/kv/querycoord/kv_catalog.go index 66ea36d858..14485123af 100644 --- a/internal/metastore/kv/querycoord/kv_catalog.go +++ b/internal/metastore/kv/querycoord/kv_catalog.go @@ -39,7 +39,7 @@ func NewCatalog(cli kv.MetaKv) Catalog { } func (s Catalog) SaveCollection(info *querypb.CollectionLoadInfo) error { - k := encodeCollectionLoadInfoKey(info.GetCollectionID()) + k := EncodeCollectionLoadInfoKey(info.GetCollectionID()) v, err := proto.Marshal(info) if err != nil { return err @@ -50,7 +50,7 @@ func (s Catalog) SaveCollection(info *querypb.CollectionLoadInfo) error { func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error { kvs := make(map[string]string) for _, partition := range info { - key := encodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID()) + key := EncodePartitionLoadInfoKey(partition.GetCollectionID(), partition.GetPartitionID()) value, err := proto.Marshal(partition) if err != nil { return err @@ -61,7 +61,7 @@ func (s Catalog) SavePartition(info ...*querypb.PartitionLoadInfo) error { } func (s Catalog) SaveReplica(replica *querypb.Replica) error { - key := encodeReplicaKey(replica.GetCollectionID(), replica.GetID()) + key := EncodeReplicaKey(replica.GetCollectionID(), replica.GetID()) value, err := proto.Marshal(replica) if err != nil { return err @@ -83,38 +83,6 @@ func (s Catalog) GetCollections() ([]*querypb.CollectionLoadInfo, error) { ret = append(ret, &info) } - collectionsV1, err := s.getCollectionsFromV1() - if err != nil { - return nil, err - } - ret = append(ret, collectionsV1...) - - return ret, nil -} - -// getCollectionsFromV1 recovers collections from 2.1 meta store -func (s Catalog) getCollectionsFromV1() ([]*querypb.CollectionLoadInfo, error) { - _, collectionValues, err := s.cli.LoadWithPrefix(CollectionMetaPrefixV1) - if err != nil { - return nil, err - } - ret := make([]*querypb.CollectionLoadInfo, 0, len(collectionValues)) - for _, value := range collectionValues { - collectionInfo := querypb.CollectionInfo{} - err = proto.Unmarshal([]byte(value), &collectionInfo) - if err != nil { - return nil, err - } - if collectionInfo.LoadType != querypb.LoadType_LoadCollection { - continue - } - ret = append(ret, &querypb.CollectionLoadInfo{ - CollectionID: collectionInfo.GetCollectionID(), - ReleasedPartitions: collectionInfo.GetReleasedPartitionIDs(), - ReplicaNumber: collectionInfo.GetReplicaNumber(), - Status: querypb.LoadStatus_Loaded, - }) - } return ret, nil } @@ -132,43 +100,6 @@ func (s Catalog) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, error) ret[info.GetCollectionID()] = append(ret[info.GetCollectionID()], &info) } - partitionsV1, err := s.getPartitionsFromV1() - if err != nil { - return nil, err - } - for _, partition := range partitionsV1 { - ret[partition.GetCollectionID()] = append(ret[partition.GetCollectionID()], partition) - } - - return ret, nil -} - -// getCollectionsFromV1 recovers collections from 2.1 meta store -func (s Catalog) getPartitionsFromV1() ([]*querypb.PartitionLoadInfo, error) { - _, collectionValues, err := s.cli.LoadWithPrefix(CollectionMetaPrefixV1) - if err != nil { - return nil, err - } - ret := make([]*querypb.PartitionLoadInfo, 0, len(collectionValues)) - for _, value := range collectionValues { - collectionInfo := querypb.CollectionInfo{} - err = proto.Unmarshal([]byte(value), &collectionInfo) - if err != nil { - return nil, err - } - if collectionInfo.LoadType != querypb.LoadType_LoadPartition { - continue - } - - for _, partition := range collectionInfo.GetPartitionIDs() { - ret = append(ret, &querypb.PartitionLoadInfo{ - CollectionID: collectionInfo.GetCollectionID(), - PartitionID: partition, - ReplicaNumber: collectionInfo.GetReplicaNumber(), - Status: querypb.LoadStatus_Loaded, - }) - } - } return ret, nil } @@ -219,48 +150,48 @@ func (s Catalog) getReplicasFromV1() ([]*querypb.Replica, error) { } func (s Catalog) ReleaseCollection(id int64) error { - k := encodeCollectionLoadInfoKey(id) + k := EncodeCollectionLoadInfoKey(id) return s.cli.Remove(k) } func (s Catalog) ReleasePartition(collection int64, partitions ...int64) error { keys := lo.Map(partitions, func(partition int64, _ int) string { - return encodePartitionLoadInfoKey(collection, partition) + return EncodePartitionLoadInfoKey(collection, partition) }) return s.cli.MultiRemove(keys) } func (s Catalog) ReleaseReplicas(collectionID int64) error { - key := encodeCollectionReplicaKey(collectionID) + key := EncodeCollectionReplicaKey(collectionID) return s.cli.RemoveWithPrefix(key) } func (s Catalog) ReleaseReplica(collection, replica int64) error { - key := encodeReplicaKey(collection, replica) + key := EncodeReplicaKey(collection, replica) return s.cli.Remove(key) } func (s Catalog) RemoveHandoffEvent(info *querypb.SegmentInfo) error { - key := encodeHandoffEventKey(info.CollectionID, info.PartitionID, info.SegmentID) + key := EncodeHandoffEventKey(info.CollectionID, info.PartitionID, info.SegmentID) return s.cli.Remove(key) } -func encodeCollectionLoadInfoKey(collection int64) string { +func EncodeCollectionLoadInfoKey(collection int64) string { return fmt.Sprintf("%s/%d", CollectionLoadInfoPrefix, collection) } -func encodePartitionLoadInfoKey(collection, partition int64) string { +func EncodePartitionLoadInfoKey(collection, partition int64) string { return fmt.Sprintf("%s/%d/%d", PartitionLoadInfoPrefix, collection, partition) } -func encodeReplicaKey(collection, replica int64) string { +func EncodeReplicaKey(collection, replica int64) string { return fmt.Sprintf("%s/%d/%d", ReplicaPrefix, collection, replica) } -func encodeCollectionReplicaKey(collection int64) string { +func EncodeCollectionReplicaKey(collection int64) string { return fmt.Sprintf("%s/%d", ReplicaPrefix, collection) } -func encodeHandoffEventKey(collection, partition, segment int64) string { +func EncodeHandoffEventKey(collection, partition, segment int64) string { return fmt.Sprintf("%s/%d/%d/%d", util.HandoffSegmentPrefix, collection, partition, segment) } diff --git a/internal/metastore/model/load_info.go b/internal/metastore/model/load_info.go new file mode 100644 index 0000000000..1a79a50164 --- /dev/null +++ b/internal/metastore/model/load_info.go @@ -0,0 +1,24 @@ +package model + +import "github.com/milvus-io/milvus/internal/proto/querypb" + +type CollectionLoadInfo struct { + CollectionID int64 + PartitionIDs []int64 + ReleasedPartitionIDs []int64 + LoadType querypb.LoadType + LoadPercentage int64 + Status querypb.LoadStatus + ReplicaNumber int32 + FieldIndexID map[int64]int64 +} + +type PartitionLoadInfo struct { + CollectionID int64 + PartitionID int64 + LoadType querypb.LoadType + LoadPercentage int64 + Status querypb.LoadStatus + ReplicaNumber int32 + FieldIndexID map[int64]int64 +} diff --git a/internal/querycoordv2/meta/collection_manager_test.go b/internal/querycoordv2/meta/collection_manager_test.go index ddbcd5fea4..ea767ce313 100644 --- a/internal/querycoordv2/meta/collection_manager_test.go +++ b/internal/querycoordv2/meta/collection_manager_test.go @@ -21,7 +21,6 @@ import ( "testing" "time" - "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/querypb" @@ -243,46 +242,6 @@ func (suite *CollectionManagerSuite) TestRecover() { exist := suite.loadPercentage[i] == 100 suite.Equal(exist, mgr.Exist(collection)) } - - // Test recover from 2.1 meta store - collectionInfo := querypb.CollectionInfo{ - CollectionID: 1000, - PartitionIDs: []int64{100, 101}, - LoadType: querypb.LoadType_LoadCollection, - ReplicaNumber: 3, - } - value, err := proto.Marshal(&collectionInfo) - suite.NoError(err) - err = suite.kv.Save(CollectionMetaPrefixV1+"/1000", string(value)) - suite.NoError(err) - - collectionInfo = querypb.CollectionInfo{ - CollectionID: 1001, - PartitionIDs: []int64{102, 103}, - LoadType: querypb.LoadType_LoadPartition, - ReplicaNumber: 1, - } - value, err = proto.Marshal(&collectionInfo) - suite.NoError(err) - err = suite.kv.Save(CollectionMetaPrefixV1+"/1001", string(value)) - suite.NoError(err) - - suite.clearMemory() - err = mgr.Recover() - suite.NoError(err) - - // Verify collection - suite.True(mgr.Exist(1000)) - suite.Equal(querypb.LoadType_LoadCollection, mgr.GetLoadType(1000)) - suite.EqualValues(3, mgr.GetReplicaNumber(1000)) - - // Verify partitions - suite.True(mgr.Exist(1001)) - suite.Equal(querypb.LoadType_LoadPartition, mgr.GetLoadType(1001)) - suite.EqualValues(1, mgr.GetReplicaNumber(1001)) - suite.NotNil(mgr.GetPartition(102)) - suite.NotNil(mgr.GetPartition(103)) - suite.Len(mgr.getPartitionsByCollection(1001), 2) } func (suite *CollectionManagerSuite) loadAll() { diff --git a/internal/querycoordv2/meta/store.go b/internal/querycoordv2/meta/store.go index 9231814e39..fe9fea3ee9 100644 --- a/internal/querycoordv2/meta/store.go +++ b/internal/querycoordv2/meta/store.go @@ -107,38 +107,6 @@ func (s metaStore) GetCollections() ([]*querypb.CollectionLoadInfo, error) { ret = append(ret, &info) } - collectionsV1, err := s.getCollectionsFromV1() - if err != nil { - return nil, err - } - ret = append(ret, collectionsV1...) - - return ret, nil -} - -// getCollectionsFromV1 recovers collections from 2.1 meta store -func (s metaStore) getCollectionsFromV1() ([]*querypb.CollectionLoadInfo, error) { - _, collectionValues, err := s.cli.LoadWithPrefix(CollectionMetaPrefixV1) - if err != nil { - return nil, err - } - ret := make([]*querypb.CollectionLoadInfo, 0, len(collectionValues)) - for _, value := range collectionValues { - collectionInfo := querypb.CollectionInfo{} - err = proto.Unmarshal([]byte(value), &collectionInfo) - if err != nil { - return nil, err - } - if collectionInfo.LoadType != querypb.LoadType_LoadCollection { - continue - } - ret = append(ret, &querypb.CollectionLoadInfo{ - CollectionID: collectionInfo.GetCollectionID(), - ReleasedPartitions: collectionInfo.GetReleasedPartitionIDs(), - ReplicaNumber: collectionInfo.GetReplicaNumber(), - Status: querypb.LoadStatus_Loaded, - }) - } return ret, nil } @@ -156,43 +124,6 @@ func (s metaStore) GetPartitions() (map[int64][]*querypb.PartitionLoadInfo, erro ret[info.GetCollectionID()] = append(ret[info.GetCollectionID()], &info) } - partitionsV1, err := s.getPartitionsFromV1() - if err != nil { - return nil, err - } - for _, partition := range partitionsV1 { - ret[partition.GetCollectionID()] = append(ret[partition.GetCollectionID()], partition) - } - - return ret, nil -} - -// getCollectionsFromV1 recovers collections from 2.1 meta store -func (s metaStore) getPartitionsFromV1() ([]*querypb.PartitionLoadInfo, error) { - _, collectionValues, err := s.cli.LoadWithPrefix(CollectionMetaPrefixV1) - if err != nil { - return nil, err - } - ret := make([]*querypb.PartitionLoadInfo, 0, len(collectionValues)) - for _, value := range collectionValues { - collectionInfo := querypb.CollectionInfo{} - err = proto.Unmarshal([]byte(value), &collectionInfo) - if err != nil { - return nil, err - } - if collectionInfo.LoadType != querypb.LoadType_LoadPartition { - continue - } - - for _, partition := range collectionInfo.GetPartitionIDs() { - ret = append(ret, &querypb.PartitionLoadInfo{ - CollectionID: collectionInfo.GetCollectionID(), - PartitionID: partition, - ReplicaNumber: collectionInfo.GetReplicaNumber(), - Status: querypb.LoadStatus_Loaded, - }) - } - } return ret, nil }