From e97936ba1c3b60dd375e2b28031336678d902e3e Mon Sep 17 00:00:00 2001 From: yukun Date: Thu, 10 Jun 2021 14:33:49 +0800 Subject: [PATCH] Fix query hang bug (#5709) Signed-off-by: fishpenguin --- internal/proxynode/impl.go | 4 ++-- internal/querynode/retrieve_collection.go | 9 +++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index 76ce2fabb0..f91d1ecbfa 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -1217,7 +1217,7 @@ func (node *ProxyNode) Retrieve(ctx context.Context, request *milvuspb.RetrieveR zap.String("db", request.DbName), zap.String("collection", request.CollectionName), zap.Any("partitions", request.PartitionNames), - zap.Any("len(Ids)", len(request.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data))) + zap.Any("len(Ids)", len(rt.result.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data))) }() err = rt.WaitToFinish() @@ -1368,7 +1368,7 @@ func (node *ProxyNode) Query(ctx context.Context, request *milvuspb.QueryRequest zap.String("db", retrieveRequest.DbName), zap.String("collection", retrieveRequest.CollectionName), zap.Any("partitions", retrieveRequest.PartitionNames), - zap.Any("len(Ids)", len(retrieveRequest.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data))) + zap.Any("len(Ids)", len(rt.result.Ids.IdField.(*schemapb.IDs_IntId).IntId.Data))) }() err = rt.WaitToFinish() diff --git a/internal/querynode/retrieve_collection.go b/internal/querynode/retrieve_collection.go index 295728339c..dc1d14e27a 100644 --- a/internal/querynode/retrieve_collection.go +++ b/internal/querynode/retrieve_collection.go @@ -94,6 +94,12 @@ func (rc *retrieveCollection) getServiceableTime() Timestamp { func (rc *retrieveCollection) setServiceableTime(t Timestamp) { rc.serviceableTimeMutex.Lock() + defer rc.serviceableTimeMutex.Unlock() + + if t < rc.serviceableTime { + return + } + gracefulTimeInMilliSecond := Params.GracefulTime if gracefulTimeInMilliSecond > 0 { gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0) @@ -101,7 +107,6 @@ func (rc *retrieveCollection) setServiceableTime(t Timestamp) { } else { rc.serviceableTime = t } - rc.serviceableTimeMutex.Unlock() } func (rc *retrieveCollection) waitNewTSafe() Timestamp { @@ -128,7 +133,7 @@ func (rc *retrieveCollection) start() { func (rc *retrieveCollection) register() { // register tSafe watcher and init watcher select case - collection, err := rc.historicalReplica.getCollectionByID(rc.collectionID) + collection, err := rc.streamingReplica.getCollectionByID(rc.collectionID) if err != nil { log.Error(err.Error()) return