From e9a8d1c404e07a191d3fdec47a77b50a67f5f7fa Mon Sep 17 00:00:00 2001 From: yukun Date: Wed, 16 Jun 2021 20:15:59 +0800 Subject: [PATCH] Add vChannels in proxy for query results (#5802) * Fix proxynode for new retrieve logic Signed-off-by: fishpenguin * Remove querynodenum from proxynode and querynode Signed-off-by: fishpenguin * Remove QueryNodeIDList from proxy Signed-off-by: fishpenguin Co-authored-by: zhenshan.cao --- configs/milvus.yaml | 3 - docs/developer_guides/chap05_proxy.md | 2 - internal/proxynode/impl.go | 1 + internal/proxynode/paramtable.go | 28 --- internal/proxynode/task.go | 27 +++ internal/proxynode/task_scheduler.go | 245 ++++++++++++++++++------- internal/querynode/param_table.go | 54 +----- internal/querynode/param_table_test.go | 7 +- internal/util/paramtable/paramtable.go | 17 -- 9 files changed, 206 insertions(+), 178 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 6668f76bad..2e5becdf0d 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -10,9 +10,6 @@ # or implied. See the License for the specific language governing permissions and limitations under the License. -nodeID: # will be deprecated later - queryNodeIDList: [1] - etcd: endpoints: - localhost:2379 diff --git a/docs/developer_guides/chap05_proxy.md b/docs/developer_guides/chap05_proxy.md index 929ea45303..d718b5aee7 100644 --- a/docs/developer_guides/chap05_proxy.md +++ b/docs/developer_guides/chap05_proxy.md @@ -383,8 +383,6 @@ type GlobalParamsTable struct { MasterAddress string PulsarAddress string - QueryNodeNum int - QueryNodeIDList []UniqueID ProxyID UniqueID TimeTickInterval time.Duration InsertChannelNames []string diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index a9febf8dc9..9a9929933d 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -1355,6 +1355,7 @@ func (node *ProxyNode) Query(ctx context.Context, request *milvuspb.QueryRequest queryMsgStream: node.queryMsgStream, resultBuf: make(chan []*internalpb.RetrieveResults), retrieve: retrieveRequest, + chMgr: node.chMgr, } err := node.sched.DqQueue.Enqueue(rt) diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index 4421a8f29c..b506adde4a 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -19,8 +19,6 @@ import ( "sync" "time" - "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -43,8 +41,6 @@ type ParamTable struct { MasterAddress string PulsarAddress string - QueryNodeNum int - QueryNodeIDList []UniqueID ProxyID UniqueID TimeTickInterval time.Duration K2SChannelNames []string @@ -85,8 +81,6 @@ func (pt *ParamTable) initParams() { pt.initEtcdEndpoints() pt.initMetaRootPath() pt.initPulsarAddress() - pt.initQueryNodeIDList() - pt.initQueryNodeNum() pt.initTimeTickInterval() pt.initK2SChannelNames() pt.initProxySubName() @@ -110,28 +104,6 @@ func (pt *ParamTable) initPulsarAddress() { pt.PulsarAddress = ret } -func (pt *ParamTable) initQueryNodeNum() { - pt.QueryNodeNum = len(pt.QueryNodeIDList) -} - -func (pt *ParamTable) initQueryNodeIDList() []UniqueID { - queryNodeIDStr, err := pt.Load("nodeID.queryNodeIDList") - if err != nil { - panic(err) - } - var ret []UniqueID - queryNodeIDs := strings.Split(queryNodeIDStr, ",") - for _, i := range queryNodeIDs { - v, err := strconv.Atoi(i) - if err != nil { - log.Error("ProxyNode ParamsTable", zap.String("load QueryNodeID list error", err.Error())) - } - ret = append(ret, UniqueID(v)) - } - pt.QueryNodeIDList = ret - return ret -} - func (pt *ParamTable) initTimeTickInterval() { intervalStr, err := pt.Load("proxyNode.timeTickInterval") if err != nil { diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index c0ede67bb3..b498d66bb5 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -1466,6 +1466,7 @@ type RetrieveTask struct { resultBuf chan []*internalpb.RetrieveResults result *milvuspb.RetrieveResults retrieve *milvuspb.RetrieveRequest + chMgr channelsMgr } func (rt *RetrieveTask) TraceCtx() context.Context { @@ -1505,6 +1506,32 @@ func (rt *RetrieveTask) OnEnqueue() error { return nil } +func (rt *RetrieveTask) getChannels() ([]pChan, error) { + collID, err := globalMetaCache.GetCollectionID(rt.ctx, rt.retrieve.CollectionName) + if err != nil { + return nil, err + } + + return rt.chMgr.getChannels(collID) +} + +func (rt *RetrieveTask) getVChannels() ([]vChan, error) { + collID, err := globalMetaCache.GetCollectionID(rt.ctx, rt.retrieve.CollectionName) + if err != nil { + return nil, err + } + + _, err = rt.chMgr.getChannels(collID) + if err != nil { + err := rt.chMgr.createDMLMsgStream(collID) + if err != nil { + return nil, err + } + } + + return rt.chMgr.getVChannels(collID) +} + func (rt *RetrieveTask) PreExecute(ctx context.Context) error { rt.Base.MsgType = commonpb.MsgType_Retrieve rt.Base.SourceID = Params.ProxyID diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go index 462bd78723..34cd2dc4ff 100644 --- a/internal/proxynode/task_scheduler.go +++ b/internal/proxynode/task_scheduler.go @@ -522,25 +522,50 @@ func (sched *TaskScheduler) queryLoop() { } } -type searchResultBuf struct { +type resultBufHeader struct { usedVChans map[interface{}]struct{} // set of vChan usedChans map[interface{}]struct{} // set of Chan todo receivedVChansSet map[interface{}]struct{} // set of vChan receivedSealedSegmentIDsSet map[interface{}]struct{} // set of UniqueID receivedGlobalSegmentIDsSet map[interface{}]struct{} // set of UniqueID - resultBuf []*internalpb.SearchResults haveError bool } +type searchResultBuf struct { + resultBufHeader + resultBuf []*internalpb.SearchResults +} + +type queryResultBuf struct { + resultBufHeader + resultBuf []*internalpb.RetrieveResults +} + func newSearchResultBuf() *searchResultBuf { return &searchResultBuf{ - usedVChans: make(map[interface{}]struct{}), - usedChans: make(map[interface{}]struct{}), - receivedVChansSet: make(map[interface{}]struct{}), - receivedSealedSegmentIDsSet: make(map[interface{}]struct{}), - receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}), - resultBuf: make([]*internalpb.SearchResults, 0), - haveError: false, + resultBufHeader: resultBufHeader{ + usedVChans: make(map[interface{}]struct{}), + usedChans: make(map[interface{}]struct{}), + receivedVChansSet: make(map[interface{}]struct{}), + receivedSealedSegmentIDsSet: make(map[interface{}]struct{}), + receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}), + haveError: false, + }, + resultBuf: make([]*internalpb.SearchResults, 0), + } +} + +func newQueryResultBuf() *queryResultBuf { + return &queryResultBuf{ + resultBufHeader: resultBufHeader{ + usedVChans: make(map[interface{}]struct{}), + usedChans: make(map[interface{}]struct{}), + receivedVChansSet: make(map[interface{}]struct{}), + receivedSealedSegmentIDsSet: make(map[interface{}]struct{}), + receivedGlobalSegmentIDsSet: make(map[interface{}]struct{}), + haveError: false, + }, + resultBuf: make([]*internalpb.RetrieveResults, 0), } } @@ -563,7 +588,7 @@ func setContain(m1, m2 map[interface{}]struct{}) bool { return true } -func (sr *searchResultBuf) readyToReduce() bool { +func (sr *resultBufHeader) readyToReduce() bool { if sr.haveError { log.Debug("ProxyNode searchResultBuf readyToReduce", zap.Any("haveError", true)) return true @@ -613,27 +638,42 @@ func (sr *searchResultBuf) readyToReduce() bool { return ret } +func (sr *resultBufHeader) addPartialResult(vchans []vChan, searchSegIDs, globalSegIDs []UniqueID) { + + for _, vchan := range vchans { + sr.receivedVChansSet[vchan] = struct{}{} + } + + for _, sealedSegment := range searchSegIDs { + sr.receivedSealedSegmentIDsSet[sealedSegment] = struct{}{} + } + + for _, globalSegment := range globalSegIDs { + sr.receivedGlobalSegmentIDsSet[globalSegment] = struct{}{} + } +} + func (sr *searchResultBuf) addPartialResult(result *internalpb.SearchResults) { sr.resultBuf = append(sr.resultBuf, result) if result.Status.ErrorCode != commonpb.ErrorCode_Success { sr.haveError = true return } - - for _, vchan := range result.ChannelIDsSearched { - sr.receivedVChansSet[vchan] = struct{}{} - } - - for _, sealedSegment := range result.SealedSegmentIDsSearched { - sr.receivedSealedSegmentIDsSet[sealedSegment] = struct{}{} - } - - for _, globalSegment := range result.GlobalSealedSegmentIDs { - sr.receivedGlobalSegmentIDsSet[globalSegment] = struct{}{} - } + sr.resultBufHeader.addPartialResult(result.ChannelIDsSearched, result.SealedSegmentIDsSearched, + result.GlobalSealedSegmentIDs) } -func (sched *TaskScheduler) queryResultLoop() { +func (qr *queryResultBuf) addPartialResult(result *internalpb.RetrieveResults) { + qr.resultBuf = append(qr.resultBuf, result) + if result.Status.ErrorCode != commonpb.ErrorCode_Success { + qr.haveError = true + return + } + qr.resultBufHeader.addPartialResult(result.ChannelIDsRetrieved, result.SealedSegmentIDsRetrieved, + result.GlobalSealedSegmentIDs) +} + +func (sched *TaskScheduler) collectResultLoop() { defer sched.wg.Done() queryResultMsgStream, _ := sched.msFactory.NewQueryMsgStream(sched.ctx) @@ -641,20 +681,19 @@ func (sched *TaskScheduler) queryResultLoop() { log.Debug("ProxyNode", zap.Strings("SearchResultChannelNames", Params.SearchResultChannelNames), zap.Any("ProxySubName", Params.ProxySubName)) - queryNodeNum := Params.QueryNodeNum - queryResultMsgStream.Start() defer queryResultMsgStream.Close() - queryResultBuf := make(map[UniqueID]*searchResultBuf) - queryResultBufFlag := make(map[UniqueID]bool) // if value is true, we can ignore queryResult - retrieveResultBuf := make(map[UniqueID][]*internalpb.RetrieveResults) + searchResultBufs := make(map[UniqueID]*searchResultBuf) + searchResultBufFlags := make(map[UniqueID]bool) // if value is true, we can ignore queryResult + queryResultBufs := make(map[UniqueID]*queryResultBuf) + queryResultBufFlags := make(map[UniqueID]bool) // if value is true, we can ignore queryResult for { select { case msgPack, ok := <-queryResultMsgStream.Chan(): if !ok { - log.Debug("ProxyNode queryResultLoop exit Chan closed") + log.Debug("ProxyNode collectResultLoop exit Chan closed") return } if msgPack == nil { @@ -666,110 +705,178 @@ func (sched *TaskScheduler) queryResultLoop() { if searchResultMsg, srOk := tsMsg.(*msgstream.SearchResultMsg); srOk { reqID := searchResultMsg.Base.MsgID reqIDStr := strconv.FormatInt(reqID, 10) - ignoreThisResult, ok := queryResultBufFlag[reqID] + ignoreThisResult, ok := searchResultBufFlags[reqID] if !ok { - queryResultBufFlag[reqID] = false + searchResultBufFlags[reqID] = false ignoreThisResult = false } if ignoreThisResult { - log.Debug("ProxyNode queryResultLoop Got a SearchResultMsg, but we should ignore", zap.Any("ReqID", reqID)) + log.Debug("ProxyNode collectResultLoop Got a SearchResultMsg, but we should ignore", zap.Any("ReqID", reqID)) continue } t := sched.getTaskByReqID(reqID) - log.Debug("ProxyNode queryResultLoop Got a SearchResultMsg", zap.Any("ReqID", reqID), zap.Any("t", t)) + log.Debug("ProxyNode collectResultLoop Got a SearchResultMsg", zap.Any("ReqID", reqID), zap.Any("t", t)) if t == nil { - log.Debug("ProxyNode queryResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr)) - delete(queryResultBuf, reqID) - queryResultBufFlag[reqID] = true + log.Debug("ProxyNode collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr)) + delete(searchResultBufs, reqID) + searchResultBufFlags[reqID] = true continue } st, ok := t.(*SearchTask) if !ok { - log.Debug("ProxyNode queryResultLoop type assert t as SearchTask failed", zap.Any("t", t)) - delete(queryResultBuf, reqID) - queryResultBufFlag[reqID] = true + log.Debug("ProxyNode collectResultLoop type assert t as SearchTask failed", zap.Any("t", t)) + delete(searchResultBufs, reqID) + searchResultBufFlags[reqID] = true continue } - resultBuf, ok := queryResultBuf[reqID] + resultBuf, ok := searchResultBufs[reqID] if !ok { resultBuf = newSearchResultBuf() vchans, err := st.getVChannels() - log.Debug("ProxyNode queryResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans), + log.Debug("ProxyNode collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans), zap.Error(err)) if err != nil { - delete(queryResultBuf, reqID) + delete(searchResultBufs, reqID) continue } for _, vchan := range vchans { resultBuf.usedVChans[vchan] = struct{}{} } pchans, err := st.getChannels() - log.Debug("ProxyNode queryResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans), + log.Debug("ProxyNode collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans), zap.Error(err)) if err != nil { - delete(queryResultBuf, reqID) + delete(searchResultBufs, reqID) continue } for _, pchan := range pchans { resultBuf.usedChans[pchan] = struct{}{} } - queryResultBuf[reqID] = resultBuf + searchResultBufs[reqID] = resultBuf } resultBuf.addPartialResult(&searchResultMsg.SearchResults) //t := sched.getTaskByReqID(reqID) { colName := t.(*SearchTask).query.CollectionName - log.Debug("ProxyNode queryResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBuf[reqID].resultBuf))) + log.Debug("ProxyNode collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(searchResultBufs[reqID].resultBuf))) } if resultBuf.readyToReduce() { - log.Debug("ProxyNode queryResultLoop readyToReduce and assign to reduce") - queryResultBufFlag[reqID] = true + log.Debug("ProxyNode collectResultLoop readyToReduce and assign to reduce") + searchResultBufFlags[reqID] = true st.resultBuf <- resultBuf.resultBuf - delete(queryResultBuf, reqID) + delete(searchResultBufs, reqID) } sp.Finish() } - if retrieveResultMsg, rtOk := tsMsg.(*msgstream.RetrieveResultMsg); rtOk { - reqID := retrieveResultMsg.Base.MsgID + if queryResultMsg, rtOk := tsMsg.(*msgstream.RetrieveResultMsg); rtOk { + //reqID := retrieveResultMsg.Base.MsgID + //reqIDStr := strconv.FormatInt(reqID, 10) + //t := sched.getTaskByReqID(reqID) + //if t == nil { + // log.Debug("proxynode", zap.String("RetrieveResult GetTaskByReqID failed, reqID = ", reqIDStr)) + // delete(queryResultBufs, reqID) + // continue + //} + // + //_, ok = queryResultBufs[reqID] + //if !ok { + // queryResultBufs[reqID] = make([]*internalpb.RetrieveResults, 0) + //} + //queryResultBufs[reqID] = append(queryResultBufs[reqID], &retrieveResultMsg.RetrieveResults) + // + //{ + // colName := t.(*RetrieveTask).retrieve.CollectionName + // log.Debug("Getcollection", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID]))) + //} + //if len(queryResultBufs[reqID]) == queryNodeNum { + // t := sched.getTaskByReqID(reqID) + // if t != nil { + // rt, ok := t.(*RetrieveTask) + // if ok { + // rt.resultBuf <- queryResultBufs[reqID] + // delete(queryResultBufs, reqID) + // } + // } else { + // } + //} + + reqID := queryResultMsg.Base.MsgID reqIDStr := strconv.FormatInt(reqID, 10) + ignoreThisResult, ok := queryResultBufFlags[reqID] + if !ok { + queryResultBufFlags[reqID] = false + ignoreThisResult = false + } + if ignoreThisResult { + log.Debug("ProxyNode collectResultLoop Got a queryResultMsg, but we should ignore", zap.Any("ReqID", reqID)) + continue + } t := sched.getTaskByReqID(reqID) + log.Debug("ProxyNode collectResultLoop Got a queryResultMsg", zap.Any("ReqID", reqID), zap.Any("t", t)) if t == nil { - log.Debug("proxynode", zap.String("RetrieveResult GetTaskByReqID failed, reqID = ", reqIDStr)) - delete(retrieveResultBuf, reqID) + log.Debug("ProxyNode collectResultLoop GetTaskByReqID failed", zap.String("reqID", reqIDStr)) + delete(queryResultBufs, reqID) + queryResultBufFlags[reqID] = true continue } - _, ok = retrieveResultBuf[reqID] + st, ok := t.(*RetrieveTask) if !ok { - retrieveResultBuf[reqID] = make([]*internalpb.RetrieveResults, 0) + log.Debug("ProxyNode collectResultLoop type assert t as RetrieveTask failed", zap.Any("t", t)) + delete(queryResultBufs, reqID) + queryResultBufFlags[reqID] = true + continue } - retrieveResultBuf[reqID] = append(retrieveResultBuf[reqID], &retrieveResultMsg.RetrieveResults) + resultBuf, ok := queryResultBufs[reqID] + if !ok { + resultBuf = newQueryResultBuf() + vchans, err := st.getVChannels() + log.Debug("ProxyNode collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("vchans", vchans), + zap.Error(err)) + if err != nil { + delete(queryResultBufs, reqID) + continue + } + for _, vchan := range vchans { + resultBuf.usedVChans[vchan] = struct{}{} + } + pchans, err := st.getChannels() + log.Debug("ProxyNode collectResultLoop, first receive", zap.Any("reqID", reqID), zap.Any("pchans", pchans), + zap.Error(err)) + if err != nil { + delete(queryResultBufs, reqID) + continue + } + for _, pchan := range pchans { + resultBuf.usedChans[pchan] = struct{}{} + } + queryResultBufs[reqID] = resultBuf + } + resultBuf.addPartialResult(&queryResultMsg.RetrieveResults) + + //t := sched.getTaskByReqID(reqID) { colName := t.(*RetrieveTask).retrieve.CollectionName - log.Debug("Getcollection", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(retrieveResultBuf[reqID]))) + log.Debug("ProxyNode collectResultLoop", zap.String("collection name", colName), zap.String("reqID", reqIDStr), zap.Int("answer cnt", len(queryResultBufs[reqID].resultBuf))) } - if len(retrieveResultBuf[reqID]) == queryNodeNum { - t := sched.getTaskByReqID(reqID) - if t != nil { - rt, ok := t.(*RetrieveTask) - if ok { - rt.resultBuf <- retrieveResultBuf[reqID] - delete(retrieveResultBuf, reqID) - } - } else { - } + + if resultBuf.readyToReduce() { + log.Debug("ProxyNode collectResultLoop readyToReduce and assign to reduce") + queryResultBufFlags[reqID] = true + st.resultBuf <- resultBuf.resultBuf + delete(queryResultBufs, reqID) } sp.Finish() } } case <-sched.ctx.Done(): - log.Debug("proxynode server is closed ...") + log.Debug("ProxyNode collectResultLoop is closed ...") return } } @@ -786,7 +893,7 @@ func (sched *TaskScheduler) Start() error { go sched.queryLoop() sched.wg.Add(1) - go sched.queryResultLoop() + go sched.collectResultLoop() return nil } diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 8710458559..50eb0d3aee 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -13,7 +13,6 @@ package querynode import ( "fmt" - "os" "path" "strconv" "strings" @@ -33,7 +32,6 @@ type ParamTable struct { QueryNodeIP string QueryNodePort int64 QueryNodeID UniqueID - QueryNodeNum int QueryTimeTickChannelName string FlowGraphMaxQueueLength int32 @@ -82,23 +80,6 @@ func (p *ParamTable) Init() { panic(err) } - queryNodeIDStr := os.Getenv("QUERY_NODE_ID") - if queryNodeIDStr == "" { - queryNodeIDList := p.QueryNodeIDList() - if len(queryNodeIDList) <= 0 { - queryNodeIDStr = "0" - } else { - queryNodeIDStr = strconv.Itoa(int(queryNodeIDList[0])) - } - } - - err = p.Save("_queryNodeID", queryNodeIDStr) - if err != nil { - panic(err) - } - - p.initQueryNodeID() - p.initQueryNodeNum() //p.initQueryTimeTickChannelName() p.initMinioEndPoint() @@ -113,7 +94,6 @@ func (p *ParamTable) Init() { p.initGracefulTime() p.initMsgChannelSubName() - p.initSliceIndex() p.initFlowGraphMaxQueueLength() p.initFlowGraphMaxParallelism() @@ -130,22 +110,6 @@ func (p *ParamTable) Init() { } // ---------------------------------------------------------- query node -func (p *ParamTable) initQueryNodeID() { - queryNodeID, err := p.Load("_queryNodeID") - if err != nil { - panic(err) - } - id, err := strconv.Atoi(queryNodeID) - if err != nil { - panic(err) - } - p.QueryNodeID = UniqueID(id) -} - -func (p *ParamTable) initQueryNodeNum() { - p.QueryNodeNum = len(p.QueryNodeIDList()) -} - func (p *ParamTable) initQueryTimeTickChannelName() { ch, err := p.Load("msgChannel.chanNamePrefix.queryTimeTick") if err != nil { @@ -265,11 +229,7 @@ func (p *ParamTable) initMsgChannelSubName() { if err != nil { log.Error(err.Error()) } - queryNodeIDStr, err := p.Load("_QueryNodeID") - if err != nil { - panic(err) - } - p.MsgChannelSubName = name + "-" + queryNodeIDStr + p.MsgChannelSubName = name } func (p *ParamTable) initStatsChannelName() { @@ -280,18 +240,6 @@ func (p *ParamTable) initStatsChannelName() { p.StatsChannelName = channels } -func (p *ParamTable) initSliceIndex() { - queryNodeID := p.QueryNodeID - queryNodeIDList := p.QueryNodeIDList() - for i := 0; i < len(queryNodeIDList); i++ { - if queryNodeID == queryNodeIDList[i] { - p.SliceIndex = i - return - } - } - p.SliceIndex = -1 -} - func (p *ParamTable) initLogCfg() { p.Log = log.Config{} format, err := p.Load("log.format") diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index 90bb325fbc..338c63c664 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -27,11 +27,6 @@ func TestParamTable_PulsarAddress(t *testing.T) { } func TestParamTable_QueryNode(t *testing.T) { - t.Run("Test id", func(t *testing.T) { - id := Params.QueryNodeID - assert.Contains(t, Params.QueryNodeIDList(), id) - }) - t.Run("Test time tick channel", func(t *testing.T) { ch := Params.QueryTimeTickChannelName assert.Equal(t, ch, "queryTimeTick") @@ -93,7 +88,7 @@ func TestParamTable_flowGraphMaxParallelism(t *testing.T) { func TestParamTable_msgChannelSubName(t *testing.T) { name := Params.MsgChannelSubName - expectName := fmt.Sprintf("queryNode-%d", Params.QueryNodeID) + expectName := "queryNode" assert.Equal(t, expectName, name) } diff --git a/internal/util/paramtable/paramtable.go b/internal/util/paramtable/paramtable.go index a4beecce17..6d5b3909d1 100644 --- a/internal/util/paramtable/paramtable.go +++ b/internal/util/paramtable/paramtable.go @@ -303,23 +303,6 @@ func (gp *BaseTable) ParseInt(key string) int { return value } -func (gp *BaseTable) QueryNodeIDList() []UniqueID { - queryNodeIDStr, err := gp.Load("nodeID.queryNodeIDList") - if err != nil { - panic(err) - } - var ret []UniqueID - queryNodeIDs := strings.Split(queryNodeIDStr, ",") - for _, i := range queryNodeIDs { - v, err := strconv.Atoi(i) - if err != nil { - log.Panicf("load proxy id list error, %s", err.Error()) - } - ret = append(ret, UniqueID(v)) - } - return ret -} - // package methods func ConvertRangeToIntRange(rangeStr, sep string) []int {