mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
Protect the QueryCoord meta from stale data migrated from old version (#23412)
Fix #23411 Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
ba84f52119
commit
bed8d6892e
@ -178,9 +178,11 @@ func (job *LoadCollectionJob) Execute() error {
|
|||||||
partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition {
|
partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition {
|
||||||
return &meta.Partition{
|
return &meta.Partition{
|
||||||
PartitionLoadInfo: &querypb.PartitionLoadInfo{
|
PartitionLoadInfo: &querypb.PartitionLoadInfo{
|
||||||
CollectionID: req.GetCollectionID(),
|
CollectionID: req.GetCollectionID(),
|
||||||
PartitionID: partID,
|
PartitionID: partID,
|
||||||
Status: querypb.LoadStatus_Loading,
|
ReplicaNumber: req.GetReplicaNumber(),
|
||||||
|
Status: querypb.LoadStatus_Loading,
|
||||||
|
FieldIndexID: req.GetFieldIndexID(),
|
||||||
},
|
},
|
||||||
CreatedAt: time.Now(),
|
CreatedAt: time.Now(),
|
||||||
}
|
}
|
||||||
@ -352,9 +354,11 @@ func (job *LoadPartitionJob) Execute() error {
|
|||||||
partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition {
|
partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition {
|
||||||
return &meta.Partition{
|
return &meta.Partition{
|
||||||
PartitionLoadInfo: &querypb.PartitionLoadInfo{
|
PartitionLoadInfo: &querypb.PartitionLoadInfo{
|
||||||
CollectionID: req.GetCollectionID(),
|
CollectionID: req.GetCollectionID(),
|
||||||
PartitionID: partID,
|
PartitionID: partID,
|
||||||
Status: querypb.LoadStatus_Loading,
|
ReplicaNumber: req.GetReplicaNumber(),
|
||||||
|
Status: querypb.LoadStatus_Loading,
|
||||||
|
FieldIndexID: req.GetFieldIndexID(),
|
||||||
},
|
},
|
||||||
CreatedAt: time.Now(),
|
CreatedAt: time.Now(),
|
||||||
}
|
}
|
||||||
|
|||||||
@ -127,7 +127,12 @@ func (m *CollectionManager) Recover(broker Broker) error {
|
|||||||
|
|
||||||
for _, collection := range collections {
|
for _, collection := range collections {
|
||||||
// Collections not loaded done should be deprecated
|
// Collections not loaded done should be deprecated
|
||||||
if collection.GetStatus() != querypb.LoadStatus_Loaded {
|
if collection.GetStatus() != querypb.LoadStatus_Loaded || collection.GetReplicaNumber() <= 0 {
|
||||||
|
log.Info("skip recovery and release collection",
|
||||||
|
zap.Int64("collectionID", collection.GetCollectionID()),
|
||||||
|
zap.String("status", collection.GetStatus().String()),
|
||||||
|
zap.Int32("replicaNumber", collection.GetReplicaNumber()),
|
||||||
|
)
|
||||||
m.store.ReleaseCollection(collection.GetCollectionID())
|
m.store.ReleaseCollection(collection.GetCollectionID())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -137,16 +142,29 @@ func (m *CollectionManager) Recover(broker Broker) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for collection, partitions := range partitions {
|
for collection, partitions := range partitions {
|
||||||
|
sawLoaded := false
|
||||||
for _, partition := range partitions {
|
for _, partition := range partitions {
|
||||||
// Partitions not loaded done should be deprecated
|
// Partitions not loaded done should be deprecated
|
||||||
if partition.GetStatus() != querypb.LoadStatus_Loaded {
|
if partition.GetStatus() != querypb.LoadStatus_Loaded || partition.GetReplicaNumber() <= 0 {
|
||||||
|
log.Info("skip recovery and release partition",
|
||||||
|
zap.Int64("collectionID", collection),
|
||||||
|
zap.Int64("partitionID", partition.GetPartitionID()),
|
||||||
|
zap.String("status", partition.GetStatus().String()),
|
||||||
|
zap.Int32("replicaNumber", partition.GetReplicaNumber()),
|
||||||
|
)
|
||||||
m.store.ReleasePartition(collection, partition.GetPartitionID())
|
m.store.ReleasePartition(collection, partition.GetPartitionID())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sawLoaded = true
|
||||||
m.partitions[partition.PartitionID] = &Partition{
|
m.partitions[partition.PartitionID] = &Partition{
|
||||||
PartitionLoadInfo: partition,
|
PartitionLoadInfo: partition,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !sawLoaded {
|
||||||
|
m.store.ReleaseCollection(collection)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = m.upgradeRecover(broker)
|
err = m.upgradeRecover(broker)
|
||||||
@ -170,9 +188,11 @@ func (m *CollectionManager) upgradeRecover(broker Broker) error {
|
|||||||
partitions := lo.Map(partitionIDs, func(partitionID int64, _ int) *Partition {
|
partitions := lo.Map(partitionIDs, func(partitionID int64, _ int) *Partition {
|
||||||
return &Partition{
|
return &Partition{
|
||||||
PartitionLoadInfo: &querypb.PartitionLoadInfo{
|
PartitionLoadInfo: &querypb.PartitionLoadInfo{
|
||||||
CollectionID: collection.GetCollectionID(),
|
CollectionID: collection.GetCollectionID(),
|
||||||
PartitionID: partitionID,
|
PartitionID: partitionID,
|
||||||
Status: querypb.LoadStatus_Loaded,
|
ReplicaNumber: collection.GetReplicaNumber(),
|
||||||
|
Status: querypb.LoadStatus_Loaded,
|
||||||
|
FieldIndexID: collection.GetFieldIndexID(),
|
||||||
},
|
},
|
||||||
LoadPercentage: 100,
|
LoadPercentage: 100,
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user