diff --git a/internal/querynode/historical.go b/internal/querynode/historical.go index 846705158f..d0e513ecae 100644 --- a/internal/querynode/historical.go +++ b/internal/querynode/historical.go @@ -276,20 +276,39 @@ func (h *historical) search(searchReqs []*searchRequest, collID UniqueID, partID if err != nil { return searchResults, searchSegmentIDs, err } + + var err2 error + var segmentLock sync.RWMutex + var wg sync.WaitGroup for _, segID := range segIDs { - seg, err := h.replica.getSegmentByID(segID) - if err != nil { - return searchResults, searchSegmentIDs, err - } - if !seg.getOnService() { - continue - } - searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs}) - if err != nil { - return searchResults, searchSegmentIDs, err - } - searchResults = append(searchResults, searchResult) - searchSegmentIDs = append(searchSegmentIDs, seg.segmentID) + segID2 := segID + wg.Add(1) + go func() { + defer wg.Done() + seg, err := h.replica.getSegmentByID(segID2) + if err != nil { + err2 = err + return + } + if !seg.getOnService() { + return + } + searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs}) + if err != nil { + err2 = err + return + } + + segmentLock.Lock() + searchResults = append(searchResults, searchResult) + searchSegmentIDs = append(searchSegmentIDs, seg.segmentID) + segmentLock.Unlock() + }() + + } + wg.Wait() + if err2 != nil { + return searchResults, searchSegmentIDs, err2 } } diff --git a/internal/querynode/streaming.go b/internal/querynode/streaming.go index fe979da5f3..ef4dfea3e5 100644 --- a/internal/querynode/streaming.go +++ b/internal/querynode/streaming.go @@ -15,6 +15,7 @@ import ( "context" "errors" "fmt" + "sync" "go.uber.org/zap" @@ -174,38 +175,53 @@ func (s *streaming) search(searchReqs []*searchRequest, collID UniqueID, partIDs log.Warn(err.Error()) return searchResults, err } + + var err2 error + var wg sync.WaitGroup for _, segID := range segIDs { - seg, err := s.replica.getSegmentByID(segID) - if err != nil { - log.Warn(err.Error()) - return searchResults, err - } + segID2 := segID + wg.Add(1) + go func() { + defer wg.Done() + seg, err := s.replica.getSegmentByID(segID2) + if err != nil { + log.Warn(err.Error()) + err2 = err + return + } - // TSafe less than searchTs means this vChannel is not available - //ts := s.tSafeReplica.getTSafe(seg.vChannelID) - //gracefulTimeInMilliSecond := Params.GracefulTime - //if gracefulTimeInMilliSecond > 0 { - // gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0) - // ts += gracefulTime - //} - //tsp, _ := tsoutil.ParseTS(ts) - //stp, _ := tsoutil.ParseTS(searchTs) - //log.Debug("timestamp check in streaming search", - // zap.Any("collectionID", collID), - // zap.Any("serviceTime_l", ts), - // zap.Any("searchTime_l", searchTs), - // zap.Any("serviceTime_p", tsp), - // zap.Any("searchTime_p", stp), - //) - //if ts < searchTs { - // continue - //} + // TSafe less than searchTs means this vChannel is not available + //ts := s.tSafeReplica.getTSafe(seg.vChannelID) + //gracefulTimeInMilliSecond := Params.GracefulTime + //if gracefulTimeInMilliSecond > 0 { + // gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0) + // ts += gracefulTime + //} + //tsp, _ := tsoutil.ParseTS(ts) + //stp, _ := tsoutil.ParseTS(searchTs) + //log.Debug("timestamp check in streaming search", + // zap.Any("collectionID", collID), + // zap.Any("serviceTime_l", ts), + // zap.Any("searchTime_l", searchTs), + // zap.Any("serviceTime_p", tsp), + // zap.Any("searchTime_p", stp), + //) + //if ts < searchTs { + // continue + //} - searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs}) - if err != nil { - return searchResults, err - } - searchResults = append(searchResults, searchResult) + searchResult, err := seg.search(plan, searchReqs, []Timestamp{searchTs}) + if err != nil { + err2 = err + return + } + searchResults = append(searchResults, searchResult) + }() + + } + wg.Wait() + if err2 != nil { + return searchResults, err2 } }