bigsheeper 48821690b3 Rename Service interface to Component, and update segmentLoad strategy
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
2021-01-18 10:38:41 +08:00

185 lines
5.5 KiB
Go

package querynode
import (
"errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) {
if node.searchService == nil || node.searchService.searchMsgStream == nil {
errMsg := "null search service or null search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// add request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
// add result channel
producerChannels := []string{in.ResultChannelID}
resultStream.CreatePulsarProducers(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
if node.searchService == nil || node.searchService.searchMsgStream == nil {
errMsg := "null search service or null search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// remove request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
// TODO: searchStream.RemovePulsarConsumers(producerChannels)
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
// remove result channel
producerChannels := []string{in.ResultChannelID}
// TODO: resultStream.RemovePulsarProducer(producerChannels)
resultStream.CreatePulsarProducers(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
errMsg := "null data sync service or null data manipulation stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for dm message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
// add request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := in.ChannelIDs
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
// TODO: support db
fieldIDs := in.FieldIDs
for _, segmentID := range in.SegmentIDs {
indexID := UniqueID(0) // TODO: ???
err := node.segManager.loadSegment(segmentID, &fieldIDs)
if err != nil {
// TODO: return or continue?
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
return status, err
}
err = node.segManager.loadIndex(segmentID, indexID)
if err != nil {
// TODO: return or continue?
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
return status, err
}
}
return nil, nil
}
func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
// TODO: implement
return nil, nil
}
func (node *QueryNode) GetPartitionState(in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) {
// TODO: implement
return nil, nil
}