mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
196 lines
5.8 KiB
Go
196 lines
5.8 KiB
Go
package querynode
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
|
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
|
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
|
|
)
|
|
|
|
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) {
|
|
select {
|
|
case <-ctx.Done():
|
|
errMsg := "context exceeded"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
default:
|
|
if node.searchService == nil || node.searchService.searchMsgStream == nil {
|
|
errMsg := "null search service or null search message stream"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
}
|
|
|
|
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
|
|
if !ok {
|
|
errMsg := "type assertion failed for search message stream"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
}
|
|
|
|
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
|
|
if !ok {
|
|
errMsg := "type assertion failed for search result message stream"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
}
|
|
|
|
// add request channel
|
|
pulsarBufSize := Params.SearchPulsarBufSize
|
|
consumeChannels := []string{in.RequestChannelID}
|
|
consumeSubName := Params.MsgChannelSubName
|
|
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
|
|
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
|
|
|
|
// add result channel
|
|
producerChannels := []string{in.ResultChannelID}
|
|
resultStream.CreatePulsarProducers(producerChannels)
|
|
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
}
|
|
return status, nil
|
|
}
|
|
}
|
|
|
|
func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
|
|
select {
|
|
case <-ctx.Done():
|
|
errMsg := "context exceeded"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
default:
|
|
if node.searchService == nil || node.searchService.searchMsgStream == nil {
|
|
errMsg := "null search service or null search result message stream"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
}
|
|
|
|
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
|
|
if !ok {
|
|
errMsg := "type assertion failed for search message stream"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
}
|
|
|
|
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
|
|
if !ok {
|
|
errMsg := "type assertion failed for search result message stream"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
}
|
|
|
|
// remove request channel
|
|
pulsarBufSize := Params.SearchPulsarBufSize
|
|
consumeChannels := []string{in.RequestChannelID}
|
|
consumeSubName := Params.MsgChannelSubName
|
|
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
|
|
// TODO: searchStream.RemovePulsarConsumers(producerChannels)
|
|
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
|
|
|
|
// remove result channel
|
|
producerChannels := []string{in.ResultChannelID}
|
|
// TODO: resultStream.RemovePulsarProducer(producerChannels)
|
|
resultStream.CreatePulsarProducers(producerChannels)
|
|
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
}
|
|
return status, nil
|
|
}
|
|
}
|
|
|
|
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
|
|
select {
|
|
case <-ctx.Done():
|
|
errMsg := "context exceeded"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
default:
|
|
if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
|
|
errMsg := "null data sync service or null data manipulation stream"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
}
|
|
|
|
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*msgstream.PulsarMsgStream)
|
|
if !ok {
|
|
errMsg := "type assertion failed for dm message stream"
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
|
|
Reason: errMsg,
|
|
}
|
|
|
|
return status, errors.New(errMsg)
|
|
}
|
|
|
|
// add request channel
|
|
pulsarBufSize := Params.SearchPulsarBufSize
|
|
consumeChannels := in.ChannelIDs
|
|
consumeSubName := Params.MsgChannelSubName
|
|
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
|
|
fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
|
|
|
|
status := &commonpb.Status{
|
|
ErrorCode: commonpb.ErrorCode_SUCCESS,
|
|
}
|
|
return status, nil
|
|
}
|
|
}
|
|
|
|
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
|
|
// TODO: implement
|
|
return nil, nil
|
|
}
|
|
|
|
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
|
|
// TODO: implement
|
|
return nil, nil
|
|
}
|
|
|
|
func (node *QueryNode) GetPartitionState(ctx context.Context, in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) {
|
|
// TODO: implement
|
|
return nil, nil
|
|
}
|