diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index fbd487f14b..2b7fac7572 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -178,9 +178,11 @@ func (job *LoadCollectionJob) Execute() error { partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition { return &meta.Partition{ PartitionLoadInfo: &querypb.PartitionLoadInfo{ - CollectionID: req.GetCollectionID(), - PartitionID: partID, - Status: querypb.LoadStatus_Loading, + CollectionID: req.GetCollectionID(), + PartitionID: partID, + ReplicaNumber: req.GetReplicaNumber(), + Status: querypb.LoadStatus_Loading, + FieldIndexID: req.GetFieldIndexID(), }, CreatedAt: time.Now(), } @@ -352,9 +354,11 @@ func (job *LoadPartitionJob) Execute() error { partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition { return &meta.Partition{ PartitionLoadInfo: &querypb.PartitionLoadInfo{ - CollectionID: req.GetCollectionID(), - PartitionID: partID, - Status: querypb.LoadStatus_Loading, + CollectionID: req.GetCollectionID(), + PartitionID: partID, + ReplicaNumber: req.GetReplicaNumber(), + Status: querypb.LoadStatus_Loading, + FieldIndexID: req.GetFieldIndexID(), }, CreatedAt: time.Now(), } diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index cdca7a6964..f6194dd952 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -127,7 +127,12 @@ func (m *CollectionManager) Recover(broker Broker) error { for _, collection := range collections { // Collections not loaded done should be deprecated - if collection.GetStatus() != querypb.LoadStatus_Loaded { + if collection.GetStatus() != querypb.LoadStatus_Loaded || collection.GetReplicaNumber() <= 0 { + log.Info("skip recovery and release collection", + zap.Int64("collectionID", collection.GetCollectionID()), + zap.String("status", collection.GetStatus().String()), + zap.Int32("replicaNumber", collection.GetReplicaNumber()), + ) m.store.ReleaseCollection(collection.GetCollectionID()) continue } @@ -137,16 +142,29 @@ func (m *CollectionManager) Recover(broker Broker) error { } for collection, partitions := range partitions { + sawLoaded := false for _, partition := range partitions { // Partitions not loaded done should be deprecated - if partition.GetStatus() != querypb.LoadStatus_Loaded { + if partition.GetStatus() != querypb.LoadStatus_Loaded || partition.GetReplicaNumber() <= 0 { + log.Info("skip recovery and release partition", + zap.Int64("collectionID", collection), + zap.Int64("partitionID", partition.GetPartitionID()), + zap.String("status", partition.GetStatus().String()), + zap.Int32("replicaNumber", partition.GetReplicaNumber()), + ) m.store.ReleasePartition(collection, partition.GetPartitionID()) continue } + + sawLoaded = true m.partitions[partition.PartitionID] = &Partition{ PartitionLoadInfo: partition, } } + + if !sawLoaded { + m.store.ReleaseCollection(collection) + } } err = m.upgradeRecover(broker) @@ -170,9 +188,11 @@ func (m *CollectionManager) upgradeRecover(broker Broker) error { partitions := lo.Map(partitionIDs, func(partitionID int64, _ int) *Partition { return &Partition{ PartitionLoadInfo: &querypb.PartitionLoadInfo{ - CollectionID: collection.GetCollectionID(), - PartitionID: partitionID, - Status: querypb.LoadStatus_Loaded, + CollectionID: collection.GetCollectionID(), + PartitionID: partitionID, + ReplicaNumber: collection.GetReplicaNumber(), + Status: querypb.LoadStatus_Loaded, + FieldIndexID: collection.GetFieldIndexID(), }, LoadPercentage: 100, }