diff --git a/internal/querynode/search.go b/internal/querynode/search.go index 1d16bd2b01..e885e8dd8f 100644 --- a/internal/querynode/search.go +++ b/internal/querynode/search.go @@ -30,9 +30,9 @@ import ( // searchOnSegments performs search on listed segments // all segment ids are validated before calling this function -func searchOnSegments(ctx context.Context, replica ReplicaInterface, segType segmentType, searchReq *searchRequest, segIDs []UniqueID) ([]*SearchResult, error) { +func searchSegments(ctx context.Context, replica ReplicaInterface, segType segmentType, searchReq *searchRequest, segIDs []UniqueID) ([]*SearchResult, error) { // results variables - searchResults := make([]*SearchResult, len(segIDs)) + resultCh := make(chan *SearchResult, len(segIDs)) errs := make([]error, len(segIDs)) searchLabel := metrics.SealedSegmentLabel if segType == commonpb.SegmentState_Growing { @@ -57,19 +57,27 @@ func searchOnSegments(ctx context.Context, replica ReplicaInterface, segType seg tr := timerecord.NewTimeRecorder("searchOnSegments") searchResult, err := seg.search(searchReq) errs[i] = err - searchResults[i] = searchResult + resultCh <- searchResult // update metrics metrics.QueryNodeSQSegmentLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, searchLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) }(segID, i) } wg.Wait() + close(resultCh) + + searchResults := make([]*SearchResult, 0, len(segIDs)) + for result := range resultCh { + searchResults = append(searchResults, result) + } + for _, err := range errs { if err != nil { deleteSearchResults(searchResults) return nil, err } } + return searchResults, nil } @@ -86,7 +94,7 @@ func searchHistorical(ctx context.Context, replica ReplicaInterface, searchReq * if err != nil { return searchResults, searchSegmentIDs, searchPartIDs, err } - searchResults, err = searchOnSegments(ctx, replica, segmentTypeSealed, searchReq, searchSegmentIDs) + searchResults, err = searchSegments(ctx, replica, segmentTypeSealed, searchReq, searchSegmentIDs) return searchResults, searchPartIDs, searchSegmentIDs, err } @@ -102,6 +110,6 @@ func searchStreaming(ctx context.Context, replica ReplicaInterface, searchReq *s if err != nil { return searchResults, searchSegmentIDs, searchPartIDs, err } - searchResults, err = searchOnSegments(ctx, replica, segmentTypeGrowing, searchReq, searchSegmentIDs) + searchResults, err = searchSegments(ctx, replica, segmentTypeGrowing, searchReq, searchSegmentIDs) return searchResults, searchPartIDs, searchSegmentIDs, err }