From 0b2add0d7b42ac6e4f2233a05900de21cac81cad Mon Sep 17 00:00:00 2001 From: dragondriver Date: Thu, 19 Nov 2020 18:45:24 +0800 Subject: [PATCH] Fix query result loop logic in Proxy Signed-off-by: dragondriver --- internal/proxy/proxy.go | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 15e43ee293..f0134835cd 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -186,26 +186,27 @@ func (p *Proxy) queryResultLoop() { if msgPack == nil { continue } - tsMsg := msgPack.Msgs[0] - searchResultMsg, _ := tsMsg.(*msgstream.SearchResultMsg) - reqID := searchResultMsg.GetReqID() - _, ok = queryResultBuf[reqID] - if !ok { - queryResultBuf[reqID] = make([]*internalpb.SearchResult, 0) - } - queryResultBuf[reqID] = append(queryResultBuf[reqID], &searchResultMsg.SearchResult) - if len(queryResultBuf[reqID]) == 4 { - // TODO: use the number of query node instead - t := p.taskSch.getTaskByReqID(reqID) - if t != nil { - qt, ok := t.(*QueryTask) - if ok { - log.Printf("address of query task: %p", qt) - qt.resultBuf <- queryResultBuf[reqID] - delete(queryResultBuf, reqID) + for _, tsMsg := range msgPack.Msgs { + searchResultMsg, _ := tsMsg.(*msgstream.SearchResultMsg) + reqID := searchResultMsg.GetReqID() + _, ok = queryResultBuf[reqID] + if !ok { + queryResultBuf[reqID] = make([]*internalpb.SearchResult, 0) + } + queryResultBuf[reqID] = append(queryResultBuf[reqID], &searchResultMsg.SearchResult) + if len(queryResultBuf[reqID]) == 4 { + // TODO: use the number of query node instead + t := p.taskSch.getTaskByReqID(reqID) + if t != nil { + qt, ok := t.(*QueryTask) + if ok { + log.Printf("address of query task: %p", qt) + qt.resultBuf <- queryResultBuf[reqID] + delete(queryResultBuf, reqID) + } + } else { + log.Printf("task with reqID %v is nil", reqID) } - } else { - log.Printf("task with reqID %v is nil", reqID) } } case <-p.proxyLoopCtx.Done():