Remove collect query result by mq (#15988)

Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
This commit is contained in:
jaime 2022-03-16 10:19:21 +08:00 committed by GitHub
parent f3eeecf146
commit cf4c014b2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -32,7 +31,6 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/trace"
@ -812,212 +810,6 @@ func (sched *taskScheduler) collectionResultLoopV2() {
}
}
func (sched *taskScheduler) collectResultLoop() {
defer sched.wg.Done()
queryResultMsgStream, _ := sched.msFactory.NewQueryMsgStream(sched.ctx)
// proxy didn't need to walk through all the search results in channel, because it no longer has client connections.
consumeSubName := fmt.Sprintf("%s-%d", Params.CommonCfg.ProxySubName, Params.ProxyCfg.ProxyID)
queryResultMsgStream.AsConsumerWithPosition(Params.ProxyCfg.SearchResultChannelNames, consumeSubName, mqwrapper.SubscriptionPositionLatest)
log.Debug("Proxy", zap.Strings("SearchResultChannelNames", Params.ProxyCfg.SearchResultChannelNames),
zap.Any("consumeSubName", consumeSubName))
queryResultMsgStream.Start()
defer queryResultMsgStream.Close()
searchResultBufs := make(map[UniqueID]*searchResultBuf)
searchResultBufFlags := newIDCache(Params.ProxyCfg.BufFlagExpireTime, Params.ProxyCfg.BufFlagCleanupInterval) // if value is true, we can ignore searchResult
queryResultBufs := make(map[UniqueID]*queryResultBuf)
queryResultBufFlags := newIDCache(Params.ProxyCfg.BufFlagExpireTime, Params.ProxyCfg.BufFlagCleanupInterval) // if value is true, we can ignore queryResult
for {
select {
case msgPack, ok := <-queryResultMsgStream.Chan():
if !ok {
log.Debug("Proxy collectResultLoop exit Chan closed")
return
}
if msgPack == nil {
continue
}
for _, tsMsg := range msgPack.Msgs {
sp, ctx := trace.StartSpanFromContext(tsMsg.TraceCtx())
tsMsg.SetTraceCtx(ctx)
if searchResultMsg, srOk := tsMsg.(*msgstream.SearchResultMsg); srOk {
reqID := searchResultMsg.Base.MsgID
reqIDStr := strconv.FormatInt(reqID, 10)
ignoreThisResult, ok := searchResultBufFlags.Get(reqID)
if !ok {
searchResultBufFlags.Set(reqID, false)
ignoreThisResult = false
}
if ignoreThisResult {
log.Debug("Proxy collectResultLoop Got a SearchResultMsg, but we should ignore", zap.Any("ReqID", reqID))
continue
}
t := sched.getTaskByReqID(reqID)
log.Debug("Proxy collectResultLoop Got a SearchResultMsg", zap.Any("ReqID", reqID))
if t == nil {
log.Debug("Proxy collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
delete(searchResultBufs, reqID)
searchResultBufFlags.Set(reqID, true)
continue
}
st, ok := t.(*searchTask)
if !ok {
log.Debug("Proxy collectResultLoop type assert t as searchTask failed", zap.Any("ReqID", reqID))
delete(searchResultBufs, reqID)
searchResultBufFlags.Set(reqID, true)
continue
}
resultBuf, ok := searchResultBufs[reqID]
if !ok {
resultBuf = newSearchResultBuf(reqID)
vchans, err := st.getVChannels()
log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans),
zap.Error(err))
if err != nil {
delete(searchResultBufs, reqID)
continue
}
for _, vchan := range vchans {
resultBuf.usedVChans[vchan] = struct{}{}
}
pchans, err := st.getChannels()
log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans),
zap.Error(err))
if err != nil {
delete(searchResultBufs, reqID)
continue
}
searchResultBufs[reqID] = resultBuf
}
resultBuf.addPartialResult(&searchResultMsg.SearchResults)
//t := sched.getTaskByReqID(reqID)
{
colName := t.(*searchTask).query.CollectionName
log.Debug("Proxy collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(searchResultBufs[reqID].resultBuf)))
}
if resultBuf.readyToReduce() {
log.Debug("Proxy collectResultLoop readyToReduce and assign to reduce")
searchResultBufFlags.Set(reqID, true)
st.resultBuf <- resultBuf.resultBuf
delete(searchResultBufs, reqID)
}
sp.Finish()
}
if queryResultMsg, rtOk := tsMsg.(*msgstream.RetrieveResultMsg); rtOk {
//reqID := retrieveResultMsg.Base.MsgID
//reqIDStr := strconv.FormatInt(reqID, 10)
//t := sched.getTaskByReqID(reqID)
//if t == nil {
// log.Debug("proxy", zap.String("RetrieveResult GetTaskByReqID failed, reqID = ", reqIDStr))
// delete(queryResultBufs, reqID)
// continue
//}
//
//_, ok = queryResultBufs[reqID]
//if !ok {
// queryResultBufs[reqID] = make([]*internalpb.RetrieveResults, 0)
//}
//queryResultBufs[reqID] = append(queryResultBufs[reqID], &retrieveResultMsg.RetrieveResults)
//
//{
// colName := t.(*RetrieveTask).retrieve.CollectionName
// log.Debug("Getcollection", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID])))
//}
//if len(queryResultBufs[reqID]) == queryNodeNum {
// t := sched.getTaskByReqID(reqID)
// if t != nil {
// rt, ok := t.(*RetrieveTask)
// if ok {
// rt.resultBuf <- queryResultBufs[reqID]
// delete(queryResultBufs, reqID)
// }
// } else {
// }
//}
reqID := queryResultMsg.Base.MsgID
reqIDStr := strconv.FormatInt(reqID, 10)
ignoreThisResult, ok := queryResultBufFlags.Get(reqID)
if !ok {
queryResultBufFlags.Set(reqID, false)
ignoreThisResult = false
}
if ignoreThisResult {
log.Debug("Proxy collectResultLoop Got a queryResultMsg, but we should ignore", zap.Any("ReqID", reqID))
continue
}
t := sched.getTaskByReqID(reqID)
log.Debug("Proxy collectResultLoop Got a queryResultMsg", zap.Any("ReqID", reqID))
if t == nil {
log.Debug("Proxy collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr))
delete(queryResultBufs, reqID)
queryResultBufFlags.Set(reqID, true)
continue
}
st, ok := t.(*queryTask)
if !ok {
log.Debug("Proxy collectResultLoop type assert t as queryTask failed")
delete(queryResultBufs, reqID)
queryResultBufFlags.Set(reqID, true)
continue
}
resultBuf, ok := queryResultBufs[reqID]
if !ok {
resultBuf = newQueryResultBuf(reqID)
vchans, err := st.getVChannels()
log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans),
zap.Error(err))
if err != nil {
delete(queryResultBufs, reqID)
continue
}
for _, vchan := range vchans {
resultBuf.usedVChans[vchan] = struct{}{}
}
pchans, err := st.getChannels()
log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans),
zap.Error(err))
if err != nil {
delete(queryResultBufs, reqID)
continue
}
queryResultBufs[reqID] = resultBuf
}
resultBuf.addPartialResult(&queryResultMsg.RetrieveResults)
//t := sched.getTaskByReqID(reqID)
{
colName := t.(*queryTask).query.CollectionName
log.Debug("Proxy collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID].resultBuf)))
}
if resultBuf.readyToReduce() {
log.Debug("Proxy collectResultLoop readyToReduce and assign to reduce")
queryResultBufFlags.Set(reqID, true)
st.resultBuf <- resultBuf.resultBuf
delete(queryResultBufs, reqID)
}
sp.Finish()
}
}
case <-sched.ctx.Done():
log.Debug("Proxy collectResultLoop is closed ...")
return
}
}
}
func (sched *taskScheduler) Start() error {
sched.wg.Add(1)
go sched.definitionLoop()