From 0d0c1901dd8b638ecfaeb434119b3ffab16c4fe4 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 25 Jun 2021 20:18:09 +0800 Subject: [PATCH] Fix service time check of search in streaming (#6068) Signed-off-by: bigsheeper --- internal/querynode/query_collection.go | 10 ++++++--- internal/querynode/streaming.go | 30 +++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index 53518a11fe..78bdde4121 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -384,9 +384,9 @@ func (q *queryCollection) receiveSearch(msg *msgstream.SearchMsg) { } serviceTime := q.getServiceableTime() + bt, _ := tsoutil.ParseTS(msg.BeginTs()) + st, _ := tsoutil.ParseTS(serviceTime) if msg.BeginTs() > serviceTime { - bt, _ := tsoutil.ParseTS(msg.BeginTs()) - st, _ := tsoutil.ParseTS(serviceTime) log.Debug("query node::receiveSearchMsg: add to unsolvedMsg", zap.Any("collectionID", q.collectionID), zap.Any("sm.BeginTs", bt), @@ -407,6 +407,10 @@ func (q *queryCollection) receiveSearch(msg *msgstream.SearchMsg) { log.Debug("doing search in receiveSearchMsg...", zap.Int64("collectionID", msg.CollectionID), zap.Int64("msgID", msg.ID()), + zap.Any("serviceTime_l", serviceTime), + zap.Any("searchTime_l", msg.BeginTs()), + zap.Any("serviceTime_p", st), + zap.Any("searchTime_p", bt), ) err = q.search(msg) if err != nil { @@ -730,7 +734,7 @@ func (q *queryCollection) search(searchMsg *msgstream.SearchMsg) error { sp, ctx := trace.StartSpanFromContext(searchMsg.TraceCtx()) defer sp.Finish() searchMsg.SetTraceCtx(ctx) - searchTimestamp := searchMsg.SearchRequest.TravelTimestamp + searchTimestamp := searchMsg.BeginTs() collectionID := searchMsg.CollectionID collection, err := q.streaming.replica.getCollectionByID(collectionID) diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go index f5e0869274..08599346f1 100644 --- a/internal/querynode/streaming.go +++ b/internal/querynode/streaming.go @@ -16,11 +16,12 @@ import ( "errors" "fmt" - etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "go.uber.org/zap" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/util/tsoutil" ) type streaming struct { @@ -128,19 +129,46 @@ func (s *streaming) search(searchReqs []*searchRequest, zap.Any("searchPartitionIDs", searchPartIDs), ) + log.Debug("print streaming replica when searching...", + zap.Any("collectionID", collID), + ) + s.replica.printReplica() + for _, partID := range searchPartIDs { segIDs, err := s.replica.getSegmentIDsByVChannel(partID, vChannel) + log.Debug("get segmentIDs by vChannel", + zap.Any("collectionID", collID), + zap.Any("vChannel", vChannel), + zap.Any("partitionID", partID), + zap.Any("segmentIDs", segIDs), + ) if err != nil { + log.Error(err.Error()) return searchResults, segmentResults, err } for _, segID := range segIDs { seg, err := s.replica.getSegmentByID(segID) if err != nil { + log.Error(err.Error()) return searchResults, segmentResults, err } // TSafe less than searchTs means this vChannel is not available ts := s.tSafeReplica.getTSafe(seg.vChannelID) + gracefulTimeInMilliSecond := Params.GracefulTime + if gracefulTimeInMilliSecond > 0 { + gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0) + ts += gracefulTime + } + tsp, _ := tsoutil.ParseTS(ts) + stp, _ := tsoutil.ParseTS(searchTs) + log.Debug("timestamp check in streaming search", + zap.Any("collectionID", collID), + zap.Any("serviceTime_l", ts), + zap.Any("searchTime_l", searchTs), + zap.Any("serviceTime_p", tsp), + zap.Any("searchTime_p", stp), + ) if ts < searchTs { continue }