Add search channels to query node (#5012)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2021-04-23 18:11:26 +08:00 committed by GitHub
parent a2875f9d95
commit 00bb1014cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 19 additions and 3 deletions

View File

@ -46,6 +46,8 @@ type ParamTable struct {
MinioBucketName string
// search
SearchChannelName string
SearchResultChannelName string
SearchReceiveBufSize int64
SearchPulsarBufSize int64
SearchResultReceiveBufSize int64

View File

@ -140,6 +140,10 @@ func (node *QueryNode) Init() error {
Params.StatsChannelName = kv.Value
case "TimeTickChannelName":
Params.QueryTimeTickChannelName = kv.Value
case "SearchChannelName":
Params.SearchChannelName = kv.Value
case "SearchResultChannelName":
Params.SearchResultChannelName = kv.Value
default:
return fmt.Errorf("Invalid key: %v", kv.Key)
}

View File

@ -37,10 +37,20 @@ type searchService struct {
}
func newSearchService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *searchService {
// query node doesn't need to consumer any search or search result channel actively.
searchStream, _ := factory.NewQueryMsgStream(ctx)
searchResultStream, _ := factory.NewQueryMsgStream(ctx)
if Params.SearchChannelName != "" && Params.SearchResultChannelName != "" {
// query node need to consumer search channels and produce search result channels when init.
consumeChannels := []string{Params.SearchChannelName}
consumeSubName := Params.MsgChannelSubName
searchStream.AsConsumer(consumeChannels, consumeSubName)
log.Debug("query node AsConsumer", zap.Any("searchChannels", consumeChannels), zap.Any("consumeSubName", consumeSubName))
producerChannels := []string{Params.SearchResultChannelName}
searchResultStream.AsProducer(producerChannels)
log.Debug("query node AsProducer", zap.Any("searchResultChannels", producerChannels))
}
searchServiceCtx, searchServiceCancel := context.WithCancel(ctx)
return &searchService{
ctx: searchServiceCtx,

View File

@ -112,11 +112,11 @@ func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterN
qs.qcMutex.Lock()
for _, queryChannel := range qs.queryChannels {
startParams = append(startParams, &commonpb.KeyValuePair{
Key: "QueryChannelName",
Key: "SearchChannelName",
Value: queryChannel.requestChannel,
})
startParams = append(startParams, &commonpb.KeyValuePair{
Key: "QueryResultChannelName",
Key: "SearchResultChannelName",
Value: queryChannel.responseChannel,
})
}