fix search timeout error (#6027)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-06-23 18:04:22 +08:00 committed by GitHub
parent 0f89f40488
commit dca24f313a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -469,39 +469,49 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
zap.Any("collectionID", r.req.CollectionID), zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs)) zap.Any("partitionIDs", r.req.PartitionIDs))
hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID) const gracefulReleaseTime = 1
if err != nil { func() { // release synchronously
log.Error(err.Error()) errMsg := "release partitions failed, collectionID = " + strconv.FormatInt(r.req.CollectionID, 10) + ", err = "
return err time.Sleep(gracefulReleaseTime * time.Second)
}
sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID) hCol, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(errMsg + err.Error())
return err return
}
for _, id := range r.req.PartitionIDs {
r.node.streaming.dataSyncService.removePartitionFlowGraph(id)
hasPartitionInHistorical := r.node.historical.replica.hasPartition(id)
if hasPartitionInHistorical {
err := r.node.historical.replica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Error(err.Error())
}
} }
hCol.addReleasedPartition(id)
hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id) sCol, err := r.node.streaming.replica.getCollectionByID(r.req.CollectionID)
if hasPartitionInStreaming { if err != nil {
err := r.node.streaming.replica.removePartition(id) log.Error(errMsg + err.Error())
if err != nil { return
log.Error(err.Error())
}
} }
sCol.addReleasedPartition(id)
} for _, id := range r.req.PartitionIDs {
r.node.streaming.dataSyncService.removePartitionFlowGraph(id)
hasPartitionInHistorical := r.node.historical.replica.hasPartition(id)
if hasPartitionInHistorical {
err = r.node.historical.replica.removePartition(id)
if err != nil {
// not return, try to release all partitions
log.Error(errMsg + err.Error())
}
}
hCol.addReleasedPartition(id)
hasPartitionInStreaming := r.node.streaming.replica.hasPartition(id)
if hasPartitionInStreaming {
err = r.node.streaming.replica.removePartition(id)
if err != nil {
log.Error(errMsg + err.Error())
}
}
sCol.addReleasedPartition(id)
}
log.Debug("release partition task done",
zap.Any("collectionID", r.req.CollectionID),
zap.Any("partitionIDs", r.req.PartitionIDs))
}()
log.Debug("release partition task done", log.Debug("release partition task done",
zap.Any("collectionID", r.req.CollectionID), zap.Any("collectionID", r.req.CollectionID),