diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 52fcf001b7..f142ced68f 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -46,8 +46,8 @@ type ParamTable struct { MinioBucketName string // search - SearchChannelName string - SearchResultChannelName string + SearchChannelNames []string + SearchResultChannelNames []string SearchReceiveBufSize int64 SearchPulsarBufSize int64 SearchResultReceiveBufSize int64 diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index f2a36b9231..5005c667dc 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -141,9 +141,9 @@ func (node *QueryNode) Init() error { case "TimeTickChannelName": Params.QueryTimeTickChannelName = kv.Value case "SearchChannelName": - Params.SearchChannelName = kv.Value + Params.SearchChannelNames = append(Params.SearchChannelNames, kv.Value) case "SearchResultChannelName": - Params.SearchResultChannelName = kv.Value + Params.SearchResultChannelNames = append(Params.SearchResultChannelNames, kv.Value) default: return fmt.Errorf("Invalid key: %v", kv.Key) } diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index 3341500e53..8359ae8e15 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -40,13 +40,13 @@ func newSearchService(ctx context.Context, replica ReplicaInterface, factory msg searchStream, _ := factory.NewQueryMsgStream(ctx) searchResultStream, _ := factory.NewQueryMsgStream(ctx) - if Params.SearchChannelName != "" && Params.SearchResultChannelName != "" { + if len(Params.SearchChannelNames) > 0 && len(Params.SearchResultChannelNames) > 0 { // query node need to consumer search channels and produce search result channels when init. - consumeChannels := []string{Params.SearchChannelName} + consumeChannels := Params.SearchChannelNames consumeSubName := Params.MsgChannelSubName searchStream.AsConsumer(consumeChannels, consumeSubName) log.Debug("query node AsConsumer", zap.Any("searchChannels", consumeChannels), zap.Any("consumeSubName", consumeSubName)) - producerChannels := []string{Params.SearchResultChannelName} + producerChannels := Params.SearchResultChannelNames searchResultStream.AsProducer(producerChannels) log.Debug("query node AsProducer", zap.Any("searchResultChannels", producerChannels)) } diff --git a/internal/queryservice/impl.go b/internal/queryservice/impl.go index 72ccfe2547..4ab6d5c904 100644 --- a/internal/queryservice/impl.go +++ b/internal/queryservice/impl.go @@ -402,8 +402,8 @@ func (qs *QueryService) ReleasePartitions(ctx context.Context, req *querypb.Rele func (qs *QueryService) CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error) { channelID := len(qs.queryChannels) - searchPrefix := Params.SearchChannelName - searchResultPrefix := Params.SearchResultChannelName + searchPrefix := Params.SearchChannelPrefix + searchResultPrefix := Params.SearchResultChannelPrefix allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10) allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10) diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go index b3a634d146..7e8147b79d 100644 --- a/internal/queryservice/param_table.go +++ b/internal/queryservice/param_table.go @@ -42,8 +42,8 @@ type ParamTable struct { RoleName string // search - SearchChannelName string - SearchResultChannelName string + SearchChannelPrefix string + SearchResultChannelPrefix string } var Params ParamTable @@ -74,8 +74,8 @@ func (p *ParamTable) Init() { p.initTimeTickChannelName() p.initQueryServiceAddress() p.initRoleName() - p.initSearchChannelName() - p.initSearchResultChannelName() + p.initSearchChannelPrefix() + p.initSearchResultChannelPrefix() }) } @@ -147,20 +147,20 @@ func (p *ParamTable) initRoleName() { p.RoleName = fmt.Sprintf("%s-%d", "QueryService", p.NodeID) } -func (p *ParamTable) initSearchChannelName() { +func (p *ParamTable) initSearchChannelPrefix() { channelName, err := p.Load("msgChannel.chanNamePrefix.search") if err != nil { log.Error(err.Error()) } - p.SearchChannelName = channelName + p.SearchChannelPrefix = channelName } -func (p *ParamTable) initSearchResultChannelName() { +func (p *ParamTable) initSearchResultChannelPrefix() { channelName, err := p.Load("msgChannel.chanNamePrefix.searchResult") if err != nil { log.Error(err.Error()) } - p.SearchResultChannelName = channelName + p.SearchResultChannelPrefix = channelName }