From dadb02db75a320fa9b7ca68d9411aa2f782ff6c8 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 23 Apr 2021 10:07:45 +0800 Subject: [PATCH] Remove unused params in query node (#4987) Signed-off-by: bigsheeper --- configs/advanced/channel.yaml | 2 - configs/advanced/query_node.yaml | 22 ----- internal/proxynode/paramtable.go | 38 -------- internal/querynode/param_table.go | 112 +--------------------- internal/querynode/param_table_test.go | 42 -------- internal/querynode/query_node.go | 4 - internal/querynode/query_node_test.go | 5 - internal/querynode/search_service.go | 11 +-- internal/querynode/search_service_test.go | 2 +- internal/queryservice/impl.go | 6 +- internal/queryservice/param_table.go | 24 +++++ 11 files changed, 35 insertions(+), 233 deletions(-) diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index b99fb4eecf..3e5d137b7d 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -40,5 +40,3 @@ msgChannel: # default channel range [0, 1) channelRange: k2s: [0, 1] - search: [0, 1] - searchResult: [0, 1] diff --git a/configs/advanced/query_node.yaml b/configs/advanced/query_node.yaml index 33320c851c..8aed4fcd07 100644 --- a/configs/advanced/query_node.yaml +++ b/configs/advanced/query_node.yaml @@ -19,31 +19,9 @@ queryNode: maxParallelism: 1024 msgStream: - insert: - recvBufSize: 1024 # msgPack chan buffer size - pulsarBufSize: 1024 # pulsar chan buffer size - - dataDefinition: - recvBufSize: 64 # msgPack chan buffer size - pulsarBufSize: 64 # pulsar chan buffer size - - delete: - recvBufSize: 1024 # msgPack chan buffer size - pulsarBufSize: 1024 # pulsar chan buffer size - search: recvBufSize: 512 pulsarBufSize: 512 searchResult: recvBufSize: 64 - - stats: - recvBufSize: 64 - - loadIndex: - recvBufSize: 512 - pulsarBufSize: 512 - - timeTick: - recvBufSize: 64 diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index f2bae81b2c..329c754130 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -146,8 +146,6 @@ func (pt *ParamTable) initParams() { pt.initQueryNodeNum() pt.initTimeTickInterval() pt.initK2SChannelNames() - pt.initSearchChannelNames() - pt.initSearchResultChannelNames() pt.initProxySubName() pt.initProxyTimeTickChannelNames() pt.initMsgStreamInsertBufSize() @@ -225,42 +223,6 @@ func (pt *ParamTable) initK2SChannelNames() { pt.K2SChannelNames = ret } -func (pt *ParamTable) initSearchChannelNames() { - prefix, err := pt.Load("msgChannel.chanNamePrefix.search") - if err != nil { - panic(err) - } - prefix += "-" - sRangeStr, err := pt.Load("msgChannel.channelRange.search") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(sRangeStr, ",") - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - pt.SearchChannelNames = ret -} - -func (pt *ParamTable) initSearchResultChannelNames() { - prefix, err := pt.Load("msgChannel.chanNamePrefix.searchResult") - if err != nil { - panic(err) - } - prefix += "-" - sRangeStr, err := pt.Load("msgChannel.channelRange.searchResult") - if err != nil { - panic(err) - } - channelIDs := paramtable.ConvertRangeToIntSlice(sRangeStr, ",") - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - pt.SearchResultChannelNames = ret -} - func (pt *ParamTable) initProxySubName() { prefix, err := pt.Load("msgChannel.subNamePrefix.proxySubNamePrefix") if err != nil { diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index dcf1c8675f..b01f4a67b0 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -29,12 +29,11 @@ type ParamTable struct { ETCDAddress string MetaRootPath string - QueryNodeIP string - QueryNodePort int64 - QueryNodeID UniqueID - QueryNodeNum int - QueryTimeTickChannelName string - QueryTimeTickReceiveBufSize int64 + QueryNodeIP string + QueryNodePort int64 + QueryNodeID UniqueID + QueryNodeNum int + QueryTimeTickChannelName string FlowGraphMaxQueueLength int32 FlowGraphMaxParallelism int32 @@ -46,17 +45,7 @@ type ParamTable struct { MinioUseSSLStr bool MinioBucketName string - // dm - InsertReceiveBufSize int64 - InsertPulsarBufSize int64 - - // dd - DDReceiveBufSize int64 - DDPulsarBufSize int64 - // search - SearchChannelNames []string - SearchResultChannelNames []string SearchReceiveBufSize int64 SearchPulsarBufSize int64 SearchResultReceiveBufSize int64 @@ -64,7 +53,6 @@ type ParamTable struct { // stats StatsPublishInterval int StatsChannelName string - StatsReceiveBufSize int64 GracefulTime int64 MsgChannelSubName string @@ -102,7 +90,6 @@ func (p *ParamTable) Init() { p.initQueryNodeID() p.initQueryNodeNum() //p.initQueryTimeTickChannelName() - p.initQueryTimeTickReceiveBufSize() p.initMinioEndPoint() p.initMinioAccessKeyID() @@ -121,21 +108,12 @@ func (p *ParamTable) Init() { p.initFlowGraphMaxQueueLength() p.initFlowGraphMaxParallelism() - p.initInsertReceiveBufSize() - p.initInsertPulsarBufSize() - - p.initDDReceiveBufSize() - p.initDDPulsarBufSize() - - //p.initSearchChannelNames() - //p.initSearchResultChannelNames() p.initSearchReceiveBufSize() p.initSearchPulsarBufSize() p.initSearchResultReceiveBufSize() p.initStatsPublishInterval() //p.initStatsChannelName() - p.initStatsReceiveBufSize() p.initLogCfg() }) @@ -166,10 +144,6 @@ func (p *ParamTable) initQueryTimeTickChannelName() { p.QueryTimeTickChannelName = ch } -func (p *ParamTable) initQueryTimeTickReceiveBufSize() { - p.QueryTimeTickReceiveBufSize = p.ParseInt64("queryNode.msgStream.timeTick.recvBufSize") -} - // ---------------------------------------------------------- minio func (p *ParamTable) initMinioEndPoint() { url, err := p.Load("_MinioAddress") @@ -239,38 +213,6 @@ func (p *ParamTable) initFlowGraphMaxParallelism() { } // msgStream -func (p *ParamTable) initInsertReceiveBufSize() { - p.InsertReceiveBufSize = p.ParseInt64("queryNode.msgStream.insert.recvBufSize") -} - -func (p *ParamTable) initInsertPulsarBufSize() { - p.InsertPulsarBufSize = p.ParseInt64("queryNode.msgStream.insert.pulsarBufSize") -} - -func (p *ParamTable) initDDReceiveBufSize() { - revBufSize, err := p.Load("queryNode.msgStream.dataDefinition.recvBufSize") - if err != nil { - panic(err) - } - bufSize, err := strconv.Atoi(revBufSize) - if err != nil { - panic(err) - } - p.DDReceiveBufSize = int64(bufSize) -} - -func (p *ParamTable) initDDPulsarBufSize() { - pulsarBufSize, err := p.Load("queryNode.msgStream.dataDefinition.pulsarBufSize") - if err != nil { - panic(err) - } - bufSize, err := strconv.Atoi(pulsarBufSize) - if err != nil { - panic(err) - } - p.DDPulsarBufSize = int64(bufSize) -} - func (p *ParamTable) initSearchReceiveBufSize() { p.SearchReceiveBufSize = p.ParseInt64("queryNode.msgStream.search.recvBufSize") } @@ -283,10 +225,6 @@ func (p *ParamTable) initSearchResultReceiveBufSize() { p.SearchResultReceiveBufSize = p.ParseInt64("queryNode.msgStream.searchResult.recvBufSize") } -func (p *ParamTable) initStatsReceiveBufSize() { - p.StatsReceiveBufSize = p.ParseInt64("queryNode.msgStream.stats.recvBufSize") -} - func (p *ParamTable) initETCDAddress() { ETCDAddress, err := p.Load("_EtcdAddress") if err != nil { @@ -311,46 +249,6 @@ func (p *ParamTable) initGracefulTime() { p.GracefulTime = p.ParseInt64("queryNode.gracefulTime") } -func (p *ParamTable) initSearchChannelNames() { - prefix, err := p.Load("msgChannel.chanNamePrefix.search") - if err != nil { - log.Error(err.Error()) - } - prefix += "-" - channelRange, err := p.Load("msgChannel.channelRange.search") - if err != nil { - panic(err) - } - - channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",") - - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - p.SearchChannelNames = ret -} - -func (p *ParamTable) initSearchResultChannelNames() { - prefix, err := p.Load("msgChannel.chanNamePrefix.searchResult") - if err != nil { - log.Error(err.Error()) - } - prefix += "-" - channelRange, err := p.Load("msgChannel.channelRange.searchResult") - if err != nil { - panic(err) - } - - channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",") - - var ret []string - for _, ID := range channelIDs { - ret = append(ret, prefix+strconv.Itoa(ID)) - } - p.SearchResultChannelNames = ret -} - func (p *ParamTable) initMsgChannelSubName() { // TODO: subName = namePrefix + "-" + queryNodeID, queryNodeID is assigned by master name, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index 1509c54d23..90bb325fbc 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -36,11 +36,6 @@ func TestParamTable_QueryNode(t *testing.T) { ch := Params.QueryTimeTickChannelName assert.Equal(t, ch, "queryTimeTick") }) - - t.Run("Test time tick ReceiveBufSize", func(t *testing.T) { - size := Params.QueryTimeTickReceiveBufSize - assert.Equal(t, size, int64(64)) - }) } func TestParamTable_minio(t *testing.T) { @@ -71,21 +66,6 @@ func TestParamTable_statsServiceTimeInterval(t *testing.T) { assert.Equal(t, 1000, interval) } -func TestParamTable_statsMsgStreamReceiveBufSize(t *testing.T) { - bufSize := Params.StatsReceiveBufSize - assert.Equal(t, int64(64), bufSize) -} - -func TestParamTable_insertMsgStreamReceiveBufSize(t *testing.T) { - bufSize := Params.InsertReceiveBufSize - assert.Equal(t, int64(1024), bufSize) -} - -func TestParamTable_ddMsgStreamReceiveBufSize(t *testing.T) { - bufSize := Params.DDReceiveBufSize - assert.Equal(t, bufSize, int64(64)) -} - func TestParamTable_searchMsgStreamReceiveBufSize(t *testing.T) { bufSize := Params.SearchReceiveBufSize assert.Equal(t, int64(512), bufSize) @@ -101,16 +81,6 @@ func TestParamTable_searchPulsarBufSize(t *testing.T) { assert.Equal(t, int64(512), bufSize) } -func TestParamTable_insertPulsarBufSize(t *testing.T) { - bufSize := Params.InsertPulsarBufSize - assert.Equal(t, int64(1024), bufSize) -} - -func TestParamTable_ddPulsarBufSize(t *testing.T) { - bufSize := Params.DDPulsarBufSize - assert.Equal(t, bufSize, int64(64)) -} - func TestParamTable_flowGraphMaxQueueLength(t *testing.T) { length := Params.FlowGraphMaxQueueLength assert.Equal(t, int32(1024), length) @@ -121,18 +91,6 @@ func TestParamTable_flowGraphMaxParallelism(t *testing.T) { assert.Equal(t, int32(1024), maxParallelism) } -func TestParamTable_searchChannelNames(t *testing.T) { - names := Params.SearchChannelNames - assert.Equal(t, len(names), 1) - contains := strings.Contains(names[0], "search-0") - assert.Equal(t, contains, true) -} - -func TestParamTable_searchResultChannelNames(t *testing.T) { - names := Params.SearchResultChannelNames - assert.NotNil(t, names) -} - func TestParamTable_msgChannelSubName(t *testing.T) { name := Params.MsgChannelSubName expectName := fmt.Sprintf("queryNode-%d", Params.QueryNodeID) diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 727e6bf200..d09ad2c342 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -140,10 +140,6 @@ func (node *QueryNode) Init() error { Params.StatsChannelName = kv.Value case "TimeTickChannelName": Params.QueryTimeTickChannelName = kv.Value - case "QueryChannelName": - Params.SearchChannelNames = append(Params.SearchChannelNames, kv.Value) - case "QueryResultChannelName": - Params.SearchResultChannelNames = append(Params.SearchResultChannelNames, kv.Value) default: return fmt.Errorf("Invalid key: %v", kv.Key) } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index fbca9f5044..1fab8b028d 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -44,11 +44,8 @@ func setup() { Params.Init() //Params.QueryNodeID = 1 Params.initQueryTimeTickChannelName() - Params.initSearchResultChannelNames() Params.initStatsChannelName() - Params.initSearchChannelNames() Params.MetaRootPath = "/etcd/test/root/querynode" - } func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionInfo { @@ -196,8 +193,6 @@ func makeNewChannelNames(names []string, suffix string) []string { func refreshChannelNames() { suffix := "-test-query-node" + strconv.FormatInt(rand.Int63n(1000000), 10) - Params.SearchChannelNames = makeNewChannelNames(Params.SearchChannelNames, suffix) - Params.SearchResultChannelNames = makeNewChannelNames(Params.SearchResultChannelNames, suffix) Params.StatsChannelName = Params.StatsChannelName + suffix } diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index 112227e4e1..14f3c98147 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -20,7 +20,6 @@ import ( "github.com/milvus-io/milvus/internal/util/trace" "go.uber.org/zap" "strconv" - "strings" ) type searchService struct { @@ -38,18 +37,10 @@ type searchService struct { } func newSearchService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *searchService { + // query node doesn't need to consumer any search or search result channel actively. searchStream, _ := factory.NewQueryMsgStream(ctx) searchResultStream, _ := factory.NewQueryMsgStream(ctx) - // query node doesn't need to consumer any search or search result channel actively. - consumeChannels := Params.SearchChannelNames - consumeSubName := Params.MsgChannelSubName - searchStream.AsConsumer(consumeChannels, consumeSubName) - log.Debug("query node AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName) - producerChannels := Params.SearchResultChannelNames - searchResultStream.AsProducer(producerChannels) - log.Debug("query node AsProducer: " + strings.Join(producerChannels, ", ")) - searchServiceCtx, searchServiceCancel := context.WithCancel(ctx) return &searchService{ ctx: searchServiceCtx, diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go index 56d664554d..a711cc5c9f 100644 --- a/internal/querynode/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -62,7 +62,7 @@ func sendSearchRequest(ctx context.Context, DIM int) error { if err != nil { return err } - searchProducerChannels := Params.SearchChannelNames + searchProducerChannels := []string{"test-query"} searchStream, _ := msFactory.NewMsgStream(ctx) searchStream.AsProducer(searchProducerChannels) diff --git a/internal/queryservice/impl.go b/internal/queryservice/impl.go index ceb4cce279..13905bef8c 100644 --- a/internal/queryservice/impl.go +++ b/internal/queryservice/impl.go @@ -402,8 +402,10 @@ func (qs *QueryService) ReleasePartitions(ctx context.Context, req *querypb.Rele func (qs *QueryService) CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error) { channelID := len(qs.queryChannels) - allocatedQueryChannel := "query-" + strconv.FormatInt(int64(channelID), 10) - allocatedQueryResultChannel := "queryResult-" + strconv.FormatInt(int64(channelID), 10) + searchPrefix := Params.SearchChannelName + searchResultPrefix := Params.SearchResultChannelName + allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10) + allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10) qs.qcMutex.Lock() qs.queryChannels = append(qs.queryChannels, &queryChannelInfo{ diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go index 580b941453..b3a634d146 100644 --- a/internal/queryservice/param_table.go +++ b/internal/queryservice/param_table.go @@ -40,6 +40,10 @@ type ParamTable struct { Log log.Config RoleName string + + // search + SearchChannelName string + SearchResultChannelName string } var Params ParamTable @@ -70,6 +74,8 @@ func (p *ParamTable) Init() { p.initTimeTickChannelName() p.initQueryServiceAddress() p.initRoleName() + p.initSearchChannelName() + p.initSearchResultChannelName() }) } @@ -140,3 +146,21 @@ func (p *ParamTable) initQueryServiceAddress() { func (p *ParamTable) initRoleName() { p.RoleName = fmt.Sprintf("%s-%d", "QueryService", p.NodeID) } + +func (p *ParamTable) initSearchChannelName() { + channelName, err := p.Load("msgChannel.chanNamePrefix.search") + if err != nil { + log.Error(err.Error()) + } + + p.SearchChannelName = channelName +} + +func (p *ParamTable) initSearchResultChannelName() { + channelName, err := p.Load("msgChannel.chanNamePrefix.searchResult") + if err != nil { + log.Error(err.Error()) + } + + p.SearchResultChannelName = channelName +}