From cf4c014b2efc420d9ebbf21005c713cd9d4fc60a Mon Sep 17 00:00:00 2001 From: jaime Date: Wed, 16 Mar 2022 10:19:21 +0800 Subject: [PATCH] Remove collect query result by mq (#15988) Signed-off-by: yun.zhang --- internal/proxy/task_scheduler.go | 208 ------------------------------- 1 file changed, 208 deletions(-) diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index b88c31e038..fa4e00f963 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "strconv" "sync" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -32,7 +31,6 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream" - "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/trace" @@ -812,212 +810,6 @@ func (sched *taskScheduler) collectionResultLoopV2() { } } -func (sched *taskScheduler) collectResultLoop() { - defer sched.wg.Done() - - queryResultMsgStream, _ := sched.msFactory.NewQueryMsgStream(sched.ctx) - // proxy didn't need to walk through all the search results in channel, because it no longer has client connections. - consumeSubName := fmt.Sprintf("%s-%d", Params.CommonCfg.ProxySubName, Params.ProxyCfg.ProxyID) - queryResultMsgStream.AsConsumerWithPosition(Params.ProxyCfg.SearchResultChannelNames, consumeSubName, mqwrapper.SubscriptionPositionLatest) - log.Debug("Proxy", zap.Strings("SearchResultChannelNames", Params.ProxyCfg.SearchResultChannelNames), - zap.Any("consumeSubName", consumeSubName)) - - queryResultMsgStream.Start() - defer queryResultMsgStream.Close() - - searchResultBufs := make(map[UniqueID]*searchResultBuf) - searchResultBufFlags := newIDCache(Params.ProxyCfg.BufFlagExpireTime, Params.ProxyCfg.BufFlagCleanupInterval) // if value is true, we can ignore searchResult - queryResultBufs := make(map[UniqueID]*queryResultBuf) - queryResultBufFlags := newIDCache(Params.ProxyCfg.BufFlagExpireTime, Params.ProxyCfg.BufFlagCleanupInterval) // if value is true, we can ignore queryResult - - for { - select { - case msgPack, ok := <-queryResultMsgStream.Chan(): - if !ok { - log.Debug("Proxy collectResultLoop exit Chan closed") - return - } - if msgPack == nil { - continue - } - - for _, tsMsg := range msgPack.Msgs { - sp, ctx := trace.StartSpanFromContext(tsMsg.TraceCtx()) - tsMsg.SetTraceCtx(ctx) - if searchResultMsg, srOk := tsMsg.(*msgstream.SearchResultMsg); srOk { - reqID := searchResultMsg.Base.MsgID - reqIDStr := strconv.FormatInt(reqID, 10) - ignoreThisResult, ok := searchResultBufFlags.Get(reqID) - if !ok { - searchResultBufFlags.Set(reqID, false) - ignoreThisResult = false - } - if ignoreThisResult { - log.Debug("Proxy collectResultLoop Got a SearchResultMsg, but we should ignore", zap.Any("ReqID", reqID)) - continue - } - t := sched.getTaskByReqID(reqID) - log.Debug("Proxy collectResultLoop Got a SearchResultMsg", zap.Any("ReqID", reqID)) - if t == nil { - log.Debug("Proxy collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr)) - delete(searchResultBufs, reqID) - searchResultBufFlags.Set(reqID, true) - continue - } - - st, ok := t.(*searchTask) - if !ok { - log.Debug("Proxy collectResultLoop type assert t as searchTask failed", zap.Any("ReqID", reqID)) - delete(searchResultBufs, reqID) - searchResultBufFlags.Set(reqID, true) - continue - } - - resultBuf, ok := searchResultBufs[reqID] - if !ok { - resultBuf = newSearchResultBuf(reqID) - vchans, err := st.getVChannels() - log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans), - zap.Error(err)) - if err != nil { - delete(searchResultBufs, reqID) - continue - } - for _, vchan := range vchans { - resultBuf.usedVChans[vchan] = struct{}{} - } - pchans, err := st.getChannels() - log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans), - zap.Error(err)) - if err != nil { - delete(searchResultBufs, reqID) - continue - } - searchResultBufs[reqID] = resultBuf - } - resultBuf.addPartialResult(&searchResultMsg.SearchResults) - - //t := sched.getTaskByReqID(reqID) - { - colName := t.(*searchTask).query.CollectionName - log.Debug("Proxy collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(searchResultBufs[reqID].resultBuf))) - } - - if resultBuf.readyToReduce() { - log.Debug("Proxy collectResultLoop readyToReduce and assign to reduce") - searchResultBufFlags.Set(reqID, true) - st.resultBuf <- resultBuf.resultBuf - delete(searchResultBufs, reqID) - } - - sp.Finish() - } - if queryResultMsg, rtOk := tsMsg.(*msgstream.RetrieveResultMsg); rtOk { - //reqID := retrieveResultMsg.Base.MsgID - //reqIDStr := strconv.FormatInt(reqID, 10) - //t := sched.getTaskByReqID(reqID) - //if t == nil { - // log.Debug("proxy", zap.String("RetrieveResult GetTaskByReqID failed, reqID = ", reqIDStr)) - // delete(queryResultBufs, reqID) - // continue - //} - // - //_, ok = queryResultBufs[reqID] - //if !ok { - // queryResultBufs[reqID] = make([]*internalpb.RetrieveResults, 0) - //} - //queryResultBufs[reqID] = append(queryResultBufs[reqID], &retrieveResultMsg.RetrieveResults) - // - //{ - // colName := t.(*RetrieveTask).retrieve.CollectionName - // log.Debug("Getcollection", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID]))) - //} - //if len(queryResultBufs[reqID]) == queryNodeNum { - // t := sched.getTaskByReqID(reqID) - // if t != nil { - // rt, ok := t.(*RetrieveTask) - // if ok { - // rt.resultBuf <- queryResultBufs[reqID] - // delete(queryResultBufs, reqID) - // } - // } else { - // } - //} - - reqID := queryResultMsg.Base.MsgID - reqIDStr := strconv.FormatInt(reqID, 10) - ignoreThisResult, ok := queryResultBufFlags.Get(reqID) - if !ok { - queryResultBufFlags.Set(reqID, false) - ignoreThisResult = false - } - if ignoreThisResult { - log.Debug("Proxy collectResultLoop Got a queryResultMsg, but we should ignore", zap.Any("ReqID", reqID)) - continue - } - t := sched.getTaskByReqID(reqID) - log.Debug("Proxy collectResultLoop Got a queryResultMsg", zap.Any("ReqID", reqID)) - if t == nil { - log.Debug("Proxy collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr)) - delete(queryResultBufs, reqID) - queryResultBufFlags.Set(reqID, true) - continue - } - - st, ok := t.(*queryTask) - if !ok { - log.Debug("Proxy collectResultLoop type assert t as queryTask failed") - delete(queryResultBufs, reqID) - queryResultBufFlags.Set(reqID, true) - continue - } - - resultBuf, ok := queryResultBufs[reqID] - if !ok { - resultBuf = newQueryResultBuf(reqID) - vchans, err := st.getVChannels() - log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans), - zap.Error(err)) - if err != nil { - delete(queryResultBufs, reqID) - continue - } - for _, vchan := range vchans { - resultBuf.usedVChans[vchan] = struct{}{} - } - pchans, err := st.getChannels() - log.Debug("Proxy collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans), - zap.Error(err)) - if err != nil { - delete(queryResultBufs, reqID) - continue - } - queryResultBufs[reqID] = resultBuf - } - resultBuf.addPartialResult(&queryResultMsg.RetrieveResults) - - //t := sched.getTaskByReqID(reqID) - { - colName := t.(*queryTask).query.CollectionName - log.Debug("Proxy collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID].resultBuf))) - } - - if resultBuf.readyToReduce() { - log.Debug("Proxy collectResultLoop readyToReduce and assign to reduce") - queryResultBufFlags.Set(reqID, true) - st.resultBuf <- resultBuf.resultBuf - delete(queryResultBufs, reqID) - } - sp.Finish() - } - } - case <-sched.ctx.Done(): - log.Debug("Proxy collectResultLoop is closed ...") - return - } - } -} - func (sched *taskScheduler) Start() error { sched.wg.Add(1) go sched.definitionLoop()