From 7e182a230a2f34b46da843f1fd4565cd882ae0d9 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Sat, 7 Nov 2020 13:19:31 +0800 Subject: [PATCH] Fix bug :GetTimeTick not return error if TimeBarrier is closed Signed-off-by: neza2017 --- docs/developer_guides/developer_guides.md | 4 +- internal/msgstream/marshaler.go | 169 ------------ internal/msgstream/msgstream.go | 266 +++++++++++-------- internal/msgstream/msgstream_test.go | 179 +++++++++---- internal/msgstream/newstream.go | 71 ----- internal/msgstream/newstream_test.go | 252 ------------------ internal/msgstream/task.go | 113 ++++++++ internal/msgstream/unmarshal.go | 41 +++ internal/reader/manipulation_service.go | 11 +- internal/reader/manipulation_service_test.go | 1 + 10 files changed, 450 insertions(+), 657 deletions(-) delete mode 100644 internal/msgstream/marshaler.go delete mode 100644 internal/msgstream/newstream.go delete mode 100644 internal/msgstream/newstream_test.go create mode 100644 internal/msgstream/unmarshal.go diff --git a/docs/developer_guides/developer_guides.md b/docs/developer_guides/developer_guides.md index 4d373fc22c..f79aa11129 100644 --- a/docs/developer_guides/developer_guides.md +++ b/docs/developer_guides/developer_guides.md @@ -1173,7 +1173,7 @@ type softTimeTickBarrier struct { ctx context.Context } -func (ttBarrier *softTimeTickBarrier) GetTimeTick() Timestamp +func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp,error) func (ttBarrier *softTimeTickBarrier) Start() error func newSoftTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueId, minTtInterval Timestamp) *softTimeTickBarrier @@ -1193,7 +1193,7 @@ type hardTimeTickBarrier struct { ctx context.Context } -func (ttBarrier *hardTimeTickBarrier) GetTimeTick() Timestamp +func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp,error) func (ttBarrier *hardTimeTickBarrier) Start() error func newHardTimeTickBarrier(ctx context.Context, ttStream *MsgStream, peerIds []UniqueId) *softTimeTickBarrier diff --git a/internal/msgstream/marshaler.go b/internal/msgstream/marshaler.go deleted file mode 100644 index 2940f88725..0000000000 --- a/internal/msgstream/marshaler.go +++ /dev/null @@ -1,169 +0,0 @@ -package msgstream - -import ( - "github.com/golang/protobuf/proto" - commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" -) - -type TsMsgMarshaler interface { - Marshal(input *TsMsg) ([]byte, commonPb.Status) - Unmarshal(input []byte) (*TsMsg, commonPb.Status) -} - -func GetMarshalers(inputMsgType MsgType, outputMsgType MsgType) (*TsMsgMarshaler, *TsMsgMarshaler) { - return GetMarshaler(inputMsgType), GetMarshaler(outputMsgType) -} - -func GetMarshaler(MsgType MsgType) *TsMsgMarshaler { - switch MsgType { - case internalPb.MsgType_kInsert: - insertMarshaler := &InsertMarshaler{} - var tsMsgMarshaller TsMsgMarshaler = insertMarshaler - return &tsMsgMarshaller - case internalPb.MsgType_kDelete: - deleteMarshaler := &DeleteMarshaler{} - var tsMsgMarshaller TsMsgMarshaler = deleteMarshaler - return &tsMsgMarshaller - case internalPb.MsgType_kSearch: - searchMarshaler := &SearchMarshaler{} - var tsMsgMarshaller TsMsgMarshaler = searchMarshaler - return &tsMsgMarshaller - case internalPb.MsgType_kSearchResult: - searchResultMarshler := &SearchResultMarshaler{} - var tsMsgMarshaller TsMsgMarshaler = searchResultMarshler - return &tsMsgMarshaller - case internalPb.MsgType_kTimeTick: - timeTickMarshaler := &TimeTickMarshaler{} - var tsMsgMarshaller TsMsgMarshaler = timeTickMarshaler - return &tsMsgMarshaller - default: - return nil - } -} - -//////////////////////////////////////Insert/////////////////////////////////////////////// - -type InsertMarshaler struct{} - -func (im *InsertMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { - insertTask := (*input).(*InsertMsg) - insertRequest := &insertTask.InsertRequest - mb, err := proto.Marshal(insertRequest) - if err != nil { - return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} - } - return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} -} - -func (im *InsertMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { - insertRequest := internalPb.InsertRequest{} - err := proto.Unmarshal(input, &insertRequest) - insertMsg := &InsertMsg{InsertRequest: insertRequest} - - if err != nil { - return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} - } - var tsMsg TsMsg = insertMsg - return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} -} - -/////////////////////////////////////Delete////////////////////////////////////////////// - -type DeleteMarshaler struct{} - -func (dm *DeleteMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { - deleteMsg := (*input).(*DeleteMsg) - deleteRequest := &deleteMsg.DeleteRequest - mb, err := proto.Marshal(deleteRequest) - if err != nil { - return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} - } - return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} -} - -func (dm *DeleteMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { - deleteRequest := internalPb.DeleteRequest{} - err := proto.Unmarshal(input, &deleteRequest) - deleteMsg := &DeleteMsg{DeleteRequest: deleteRequest} - if err != nil { - return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} - } - var tsMsg TsMsg = deleteMsg - return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} -} - -/////////////////////////////////////Search/////////////////////////////////////////////// - -type SearchMarshaler struct{} - -func (sm *SearchMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { - searchMsg := (*input).(*SearchMsg) - searchRequest := &searchMsg.SearchRequest - mb, err := proto.Marshal(searchRequest) - if err != nil { - return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} - } - return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} -} - -func (sm *SearchMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { - searchRequest := internalPb.SearchRequest{} - err := proto.Unmarshal(input, &searchRequest) - searchMsg := &SearchMsg{SearchRequest: searchRequest} - if err != nil { - return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} - } - var tsMsg TsMsg = searchMsg - return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} -} - -/////////////////////////////////////SearchResult/////////////////////////////////////////////// - -type SearchResultMarshaler struct{} - -func (srm *SearchResultMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { - searchResultMsg := (*input).(*SearchResultMsg) - searchResult := &searchResultMsg.SearchResult - mb, err := proto.Marshal(searchResult) - if err != nil { - return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} - } - return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} -} - -func (srm *SearchResultMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { - searchResult := internalPb.SearchResult{} - err := proto.Unmarshal(input, &searchResult) - searchResultMsg := &SearchResultMsg{SearchResult: searchResult} - if err != nil { - return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} - } - var tsMsg TsMsg = searchResultMsg - return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} -} - -/////////////////////////////////////TimeTick/////////////////////////////////////////////// - -type TimeTickMarshaler struct{} - -func (tm *TimeTickMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { - timeTickMsg := (*input).(*TimeTickMsg) - timeTick := &timeTickMsg.TimeTickMsg - mb, err := proto.Marshal(timeTick) - if err != nil { - return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} - } - return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} -} - -func (tm *TimeTickMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { - timeTickMsg := internalPb.TimeTickMsg{} - err := proto.Unmarshal(input, &timeTickMsg) - timeTick := &TimeTickMsg{TimeTickMsg: timeTickMsg} - if err != nil { - return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} - } - var tsMsg TsMsg = timeTick - return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} -} diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index 4e7f2b3127..faff08d3cf 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -2,13 +2,13 @@ package msgstream import ( "context" + "github.com/gogo/protobuf/proto" "log" "sync" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/apache/pulsar-client-go/pulsar" - commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -28,45 +28,53 @@ type MsgStream interface { Start() Close() - SetRepackFunc(repackFunc RepackFunc) - SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) - Produce(*MsgPack) commonPb.Status - Consume() *MsgPack // message can be consumed exactly once + Produce(*MsgPack) error + Broadcast(*MsgPack) error + Consume() *MsgPack } type PulsarMsgStream struct { - client *pulsar.Client - producers []*pulsar.Producer - consumers []*pulsar.Consumer - repackFunc RepackFunc // return a map from produceChannel idx to *MsgPack + ctx context.Context + client *pulsar.Client + producers []*pulsar.Producer + consumers []*pulsar.Consumer + repackFunc RepackFunc + unmarshal *UnmarshalDispatcher + receiveBuf chan *MsgPack + receiveBufSize int64 + wait sync.WaitGroup +} - receiveBuf chan *MsgPack - - msgMarshaler *TsMsgMarshaler - msgUnmarshaler *TsMsgMarshaler - inputChannel chan *MsgPack - outputChannel chan *MsgPack +func NewPulsarMsgStream(ctx context.Context, receiveBufSize int64) *PulsarMsgStream{ + return &PulsarMsgStream{ + ctx: ctx, + receiveBufSize: receiveBufSize, + } } func (ms *PulsarMsgStream) SetPulsarCient(address string) { client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address}) if err != nil { - log.Printf("connect pulsar failed, %v", err) + log.Printf("Set pulsar client failed, error = %v", err) } ms.client = &client } -func (ms *PulsarMsgStream) SetProducers(channels []string) { +func (ms *PulsarMsgStream) CreatePulsarProducers(channels []string) { for i := 0; i < len(channels); i++ { pp, err := (*ms.client).CreateProducer(pulsar.ProducerOptions{Topic: channels[i]}) if err != nil { - log.Printf("failed to create reader producer %s, error = %v", channels[i], err) + log.Printf("Failed to create reader producer %s, error = %v", channels[i], err) } ms.producers = append(ms.producers, &pp) } } -func (ms *PulsarMsgStream) SetConsumers(channels []string, subName string, pulsarBufSize int64) { +func (ms *PulsarMsgStream) CreatePulsarConsumers(channels []string, + subName string, + unmarshal *UnmarshalDispatcher, + pulsarBufSize int64) { + ms.unmarshal = unmarshal for i := 0; i < len(channels); i++ { receiveChannel := make(chan pulsar.ConsumerMessage, pulsarBufSize) pc, err := (*ms.client).Subscribe(pulsar.ConsumerOptions{ @@ -77,22 +85,18 @@ func (ms *PulsarMsgStream) SetConsumers(channels []string, subName string, pulsa MessageChannel: receiveChannel, }) if err != nil { - log.Printf("failed to subscribe topic, error = %v", err) + log.Printf("Failed to subscribe topic, error = %v", err) } ms.consumers = append(ms.consumers, &pc) } } -func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) { - ms.msgMarshaler = marshal - ms.msgUnmarshaler = unmarshal -} - func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc) { ms.repackFunc = repackFunc } func (ms *PulsarMsgStream) Start() { + ms.wait.Add(1) go ms.bufMsgPackToChannel() } @@ -110,17 +114,14 @@ func (ms *PulsarMsgStream) Close() { if ms.client != nil { (*ms.client).Close() } + ms.wait.Wait() } -func (ms *PulsarMsgStream) InitMsgPackBuf(msgPackBufSize int64) { - ms.receiveBuf = make(chan *MsgPack, msgPackBufSize) -} - -func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) commonPb.Status { +func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error { tsMsgs := msgPack.Msgs if len(tsMsgs) <= 0 { - log.Println("receive empty msgPack") - return commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} + log.Printf("Warning: Receive empty msgPack") + return nil } reBucketValues := make([][]int32, len(tsMsgs)) for channelId, tsMsg := range tsMsgs { @@ -151,46 +152,41 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) commonPb.Status { } for k, v := range result { for i := 0; i < len(v.Msgs); i++ { - mb, status := (*ms.msgMarshaler).Marshal(v.Msgs[i]) - if status.ErrorCode != commonPb.ErrorCode_SUCCESS { - log.Printf("Marshal ManipulationReqMsg failed, error ") - continue + mb, err := (*v.Msgs[i]).Marshal(v.Msgs[i]) + if err != nil { + return err } if _, err := (*ms.producers[k]).Send( context.Background(), &pulsar.ProducerMessage{Payload: mb}, ); err != nil { - log.Printf("post into pulsar filed, error = %v", err) + return err } } } - - return commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} + return nil } -func (ms *PulsarMsgStream) BroadCast(msgPack *MsgPack) commonPb.Status { +func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error { producerLen := len(ms.producers) for _, v := range msgPack.Msgs { - mb, status := (*ms.msgMarshaler).Marshal(v) - if status.ErrorCode != commonPb.ErrorCode_SUCCESS { - log.Printf("Marshal ManipulationReqMsg failed, error ") - continue + mb, err := (*v).Marshal(v) + if err != nil { + return err } for i := 0; i < producerLen; i++ { if _, err := (*ms.producers[i]).Send( context.Background(), &pulsar.ProducerMessage{Payload: mb}, ); err != nil { - log.Printf("post into pulsar filed, error = %v", err) + return err } } } - - return commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} + return nil } func (ms *PulsarMsgStream) Consume() *MsgPack { - ctx := context.Background() for { select { case cm, ok := <-ms.receiveBuf: @@ -199,34 +195,56 @@ func (ms *PulsarMsgStream) Consume() *MsgPack { return nil } return cm - case <-ctx.Done(): + case <-ms.ctx.Done(): + log.Printf("context closed") return nil } } } func (ms *PulsarMsgStream) bufMsgPackToChannel() { + defer ms.wait.Done() + ms.receiveBuf = make(chan *MsgPack, ms.receiveBufSize) for { - tsMsgList := make([]*TsMsg, 0) - for i := 0; i < len(ms.consumers); i++ { - consumerChan := (*ms.consumers[i]).Chan() - chanLen := len(consumerChan) - for l := 0; l < chanLen; l++ { - pulsarMsg, ok := <-consumerChan - if ok == false { - log.Printf("channel closed") + select { + case <-ms.ctx.Done(): + return + default: + tsMsgList := make([]*TsMsg, 0) + for i := 0; i < len(ms.consumers); i++ { + consumerChan := (*ms.consumers[i]).Chan() + chanLen := len(consumerChan) + for l := 0; l < chanLen; l++ { + pulsarMsg, ok := <-consumerChan + if ok == false { + log.Printf("channel closed") + return + } + (*ms.consumers[i]).AckID(pulsarMsg.ID()) + + headerMsg := internalPb.MsgHeader{} + err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) + if err != nil { + log.Printf("Failed to unmarshal message header, error = %v", err) + continue + } + unMarshalFunc, ok:= (*ms.unmarshal).tempMap[headerMsg.MsgType] + if ok == false { + log.Printf("Not set unmarshalFunc for messageType %v", headerMsg.MsgType) + continue + } + tsMsg, err := unMarshalFunc(pulsarMsg.Payload()) + if err != nil { + log.Printf("Failed to unmarshal tsMsg, error = %v", err) + continue + } + tsMsgList = append(tsMsgList, tsMsg) } - (*ms.consumers[i]).AckID(pulsarMsg.ID()) - tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload()) - if status.ErrorCode != commonPb.ErrorCode_SUCCESS { - log.Printf("Marshal ManipulationReqMsg failed, error ") - } - tsMsgList = append(tsMsgList, tsMsg) } - } - if len(tsMsgList) > 0 { - msgPack := MsgPack{Msgs: tsMsgList} - ms.receiveBuf <- &msgPack + if len(tsMsgList) > 0 { + msgPack := MsgPack{Msgs: tsMsgList} + ms.receiveBuf <- &msgPack + } } } } @@ -235,73 +253,101 @@ type PulsarTtMsgStream struct { PulsarMsgStream inputBuf []*TsMsg unsolvedBuf []*TsMsg - msgPacks []*MsgPack lastTimeStamp Timestamp } +func NewPulsarTtMsgStream(ctx context.Context, receiveBufSize int64) *PulsarTtMsgStream { + pulsarMsgStream := PulsarMsgStream{ + ctx: ctx, + receiveBufSize: receiveBufSize, + } + return &PulsarTtMsgStream{ + PulsarMsgStream: pulsarMsgStream, + } +} + func (ms *PulsarTtMsgStream) Start() { + ms.wait.Add(1) go ms.bufMsgPackToChannel() } func (ms *PulsarTtMsgStream) bufMsgPackToChannel() { - wg := sync.WaitGroup{} - wg.Add(len(ms.consumers)) - eofMsgTimeStamp := make(map[int]Timestamp) - mu := sync.Mutex{} - for i := 0; i < len(ms.consumers); i++ { - go ms.findTimeTick(context.Background(), i, eofMsgTimeStamp, &wg, &mu) - } - wg.Wait() - timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp) - if ok == false { - log.Fatal("timeTick err") - } + defer ms.wait.Done() + ms.receiveBuf = make(chan *MsgPack, ms.receiveBufSize) + ms.unsolvedBuf = make([]*TsMsg, 0) + ms.inputBuf = make([]*TsMsg, 0) + for { + select { + case <-ms.ctx.Done(): + return + default: + wg := sync.WaitGroup{} + wg.Add(len(ms.consumers)) + eofMsgTimeStamp := make(map[int]Timestamp) + mu := sync.Mutex{} + for i := 0; i < len(ms.consumers); i++ { + go ms.findTimeTick(i, eofMsgTimeStamp, &wg, &mu) + } + wg.Wait() + timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp) + if ok == false { + log.Printf("timeTick err") + } - timeTickBuf := make([]*TsMsg, 0) - ms.inputBuf = append(ms.inputBuf, ms.unsolvedBuf...) - ms.unsolvedBuf = ms.unsolvedBuf[:0] - for _, v := range ms.inputBuf { - if (*v).EndTs() >= timeStamp { - timeTickBuf = append(timeTickBuf, v) - } else { - ms.unsolvedBuf = append(ms.unsolvedBuf, v) + timeTickBuf := make([]*TsMsg, 0) + ms.inputBuf = append(ms.inputBuf, ms.unsolvedBuf...) + ms.unsolvedBuf = ms.unsolvedBuf[:0] + for _, v := range ms.inputBuf { + if (*v).EndTs() <= timeStamp { + timeTickBuf = append(timeTickBuf, v) + } else { + ms.unsolvedBuf = append(ms.unsolvedBuf, v) + } + } + ms.inputBuf = ms.inputBuf[:0] + + msgPack := MsgPack{ + BeginTs: ms.lastTimeStamp, + EndTs: timeStamp, + Msgs: timeTickBuf, + } + + ms.receiveBuf <- &msgPack + ms.lastTimeStamp = timeStamp } } - ms.inputBuf = ms.inputBuf[:0] - msgPack := MsgPack{ - BeginTs: ms.lastTimeStamp, - EndTs: timeStamp, - Msgs: timeTickBuf, - } - - ms.receiveBuf <- &msgPack } -func (ms *PulsarTtMsgStream) findTimeTick(ctx context.Context, - channelIndex int, +func (ms *PulsarTtMsgStream) findTimeTick(channelIndex int, eofMsgMap map[int]Timestamp, wg *sync.WaitGroup, mu *sync.Mutex) { + defer wg.Done() for { select { - case <-ctx.Done(): + case <-ms.ctx.Done(): return case pulsarMsg, ok := <-(*ms.consumers[channelIndex]).Chan(): if ok == false { - log.Fatal("consumer closed!") - continue - } - (*ms.consumers[channelIndex]).Ack(pulsarMsg) - tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload()) - // TODO:: Find the EOF - if (*tsMsg).Type() == internalPb.MsgType_kTimeTick { - eofMsgMap[channelIndex] = (*tsMsg).EndTs() - wg.Done() + log.Printf("consumer closed!") return } - if status.ErrorCode != commonPb.ErrorCode_SUCCESS { - log.Printf("Marshal ManipulationReqMsg failed, error ") + (*ms.consumers[channelIndex]).Ack(pulsarMsg) + + headerMsg := internalPb.MsgHeader{} + err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) + if err != nil { + log.Printf("Failed to unmarshal, error = %v", err) + } + unMarshalFunc := (*ms.unmarshal).tempMap[headerMsg.MsgType] + tsMsg, err := unMarshalFunc(pulsarMsg.Payload()) + if err != nil { + log.Printf("Failed to unmarshal, error = %v", err) + } + if headerMsg.MsgType == internalPb.MsgType_kTimeTick { + eofMsgMap[channelIndex] = (*tsMsg).(*TimeTickMsg).Timestamp + return } mu.Lock() ms.inputBuf = append(ms.inputBuf, tsMsg) diff --git a/internal/msgstream/msgstream_test.go b/internal/msgstream/msgstream_test.go index 2fafdd21b7..71301936e5 100644 --- a/internal/msgstream/msgstream_test.go +++ b/internal/msgstream/msgstream_test.go @@ -1,6 +1,7 @@ package msgstream import ( + "context" "fmt" "testing" @@ -27,7 +28,9 @@ func repackFunc(msgs []*TsMsg, hashKeys [][]int32) map[int32]*MsgPack { func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg { var tsMsg TsMsg baseMsg := BaseMsg{ - HashValues: []int32{hashValue}, + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: []int32{hashValue}, } switch msgType { case internalPb.MsgType_kInsert: @@ -104,42 +107,82 @@ func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg { return &tsMsg } -func initStream(pulsarAddress string, +func getTimeTickMsg(msgType MsgType, reqId UniqueID, hashValue int32, time uint64) *TsMsg { + var tsMsg TsMsg + baseMsg := BaseMsg{ + BeginTimestamp: 0, + EndTimestamp: 0, + HashValues: []int32{hashValue}, + } + timeTickResult := internalPb.TimeTickMsg{ + MsgType: internalPb.MsgType_kTimeTick, + PeerId: reqId, + Timestamp: time, + } + timeTickMsg := &TimeTickMsg{ + BaseMsg: baseMsg, + TimeTickMsg: timeTickResult, + } + tsMsg = timeTickMsg + return &tsMsg +} + +func initPulsarStream(pulsarAddress string, producerChannels []string, consumerChannels []string, consumerSubName string, - msgPack *MsgPack, - inputMsgType MsgType, - outputMsgType MsgType, - broadCast bool) { + opts ...RepackFunc) (*MsgStream, *MsgStream) { // set input stream - inputStream := PulsarMsgStream{} + inputStream := NewPulsarMsgStream(context.Background(), 100) inputStream.SetPulsarCient(pulsarAddress) - inputStream.SetMsgMarshaler(GetMarshaler(inputMsgType), nil) - inputStream.SetProducers(producerChannels) - inputStream.SetRepackFunc(repackFunc) + inputStream.CreatePulsarProducers(producerChannels) + for _, opt := range opts { + inputStream.SetRepackFunc(opt) + } + var input MsgStream = inputStream // set output stream - outputStream := PulsarMsgStream{} + outputStream := NewPulsarMsgStream(context.Background(), 100) outputStream.SetPulsarCient(pulsarAddress) - outputStream.SetMsgMarshaler(nil, GetMarshaler(outputMsgType)) - outputStream.SetConsumers(consumerChannels, consumerSubName, 100) - outputStream.InitMsgPackBuf(100) + unmarshalDispatcher := NewUnmarshalDispatcher() + outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100) outputStream.Start() + var output MsgStream = outputStream - //send msgPack - if broadCast { - inputStream.BroadCast(msgPack) - } else { - inputStream.Produce(msgPack) - //outputStream.Start() + return &input, &output +} + +func initPulsarTtStream(pulsarAddress string, + producerChannels []string, + consumerChannels []string, + consumerSubName string, + opts ...RepackFunc) (*MsgStream, *MsgStream) { + + // set input stream + inputStream := NewPulsarMsgStream(context.Background(), 100) + inputStream.SetPulsarCient(pulsarAddress) + inputStream.CreatePulsarProducers(producerChannels) + for _, opt := range opts { + inputStream.SetRepackFunc(opt) } + var input MsgStream = inputStream - // receive msg + // set output stream + outputStream := NewPulsarTtMsgStream(context.Background(), 100) + outputStream.SetPulsarCient(pulsarAddress) + unmarshalDispatcher := NewUnmarshalDispatcher() + outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100) + outputStream.Start() + var output MsgStream = outputStream + + return &input, &output +} + +func receiveMsg(outputStream *MsgStream, msgCount int) { receiveCount := 0 for { - result := outputStream.Consume() + result := (*outputStream).Consume() if len(result.Msgs) > 0 { msgs := result.Msgs for _, v := range msgs { @@ -147,19 +190,13 @@ func initStream(pulsarAddress string, fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v) } } - if broadCast { - if receiveCount >= len(msgPack.Msgs)*len(producerChannels) { - break - } - } else { - if receiveCount >= len(msgPack.Msgs) { - break - } + if receiveCount >= msgCount { + break } } } -func TestStream_Insert(t *testing.T) { +func TestStream_PulsarMsgStream_Insert(t *testing.T) { pulsarAddress := "pulsar://localhost:6650" producerChannels := []string{"insert1", "insert2"} consumerChannels := []string{"insert1", "insert2"} @@ -169,11 +206,12 @@ func TestStream_Insert(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3)) - //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kInsert, internalPb.MsgType_kInsert, false) + inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) + (*inputStream).Produce(&msgPack) + receiveMsg(outputStream, len(msgPack.Msgs)) } -func TestStream_Delete(t *testing.T) { +func TestStream_PulsarMsgStream_Delete(t *testing.T) { pulsarAddress := "pulsar://localhost:6650" producerChannels := []string{"delete"} consumerChannels := []string{"delete"} @@ -183,11 +221,12 @@ func TestStream_Delete(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 1, 1)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 3, 3)) - //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kDelete, internalPb.MsgType_kDelete, false) + inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) + (*inputStream).Produce(&msgPack) + receiveMsg(outputStream, len(msgPack.Msgs)) } -func TestStream_Search(t *testing.T) { +func TestStream_PulsarMsgStream_Search(t *testing.T) { pulsarAddress := "pulsar://localhost:6650" producerChannels := []string{"search"} consumerChannels := []string{"search"} @@ -197,11 +236,12 @@ func TestStream_Search(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 1, 1)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 3, 3)) - //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kSearch, internalPb.MsgType_kSearch, false) + inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) + (*inputStream).Produce(&msgPack) + receiveMsg(outputStream, len(msgPack.Msgs)) } -func TestStream_SearchResult(t *testing.T) { +func TestStream_PulsarMsgStream_SearchResult(t *testing.T) { pulsarAddress := "pulsar://localhost:6650" producerChannels := []string{"search"} consumerChannels := []string{"search"} @@ -211,11 +251,12 @@ func TestStream_SearchResult(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 1, 1)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 3, 3)) - //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kSearchResult, internalPb.MsgType_kSearchResult, false) + inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) + (*inputStream).Produce(&msgPack) + receiveMsg(outputStream, len(msgPack.Msgs)) } -func TestStream_TimeTick(t *testing.T) { +func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { pulsarAddress := "pulsar://localhost:6650" producerChannels := []string{"search"} consumerChannels := []string{"search"} @@ -225,11 +266,12 @@ func TestStream_TimeTick(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 3, 3)) - //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kTimeTick, internalPb.MsgType_kTimeTick, false) + inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) + (*inputStream).Produce(&msgPack) + receiveMsg(outputStream, len(msgPack.Msgs)) } -func TestStream_BroadCast(t *testing.T) { +func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { pulsarAddress := "pulsar://localhost:6650" producerChannels := []string{"insert1", "insert2"} consumerChannels := []string{"insert1", "insert2"} @@ -239,6 +281,47 @@ func TestStream_BroadCast(t *testing.T) { msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 3, 3)) - //run stream - initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kTimeTick, internalPb.MsgType_kTimeTick, true) + inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) + (*inputStream).Broadcast(&msgPack) + receiveMsg(outputStream, len(consumerChannels)*len(msgPack.Msgs)) +} + +func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) { + pulsarAddress := "pulsar://localhost:6650" + producerChannels := []string{"insert1", "insert2"} + consumerChannels := []string{"insert1", "insert2"} + consumerSubName := "subInsert" + + msgPack := MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3)) + + inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, repackFunc) + (*inputStream).Produce(&msgPack) + receiveMsg(outputStream, len(msgPack.Msgs)) +} + +func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { + pulsarAddress := "pulsar://localhost:6650" + producerChannels := []string{"insert1", "insert2"} + consumerChannels := []string{"insert1", "insert2"} + consumerSubName := "subInsert" + + msgPack0 := MsgPack{} + msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(internalPb.MsgType_kTimeTick, 0, 0, 0)) + + msgPack1 := MsgPack{} + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3)) + + msgPack2 := MsgPack{} + msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(internalPb.MsgType_kTimeTick, 5, 5, 5)) + + inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) + (*inputStream).Broadcast(&msgPack0) + (*inputStream).Produce(&msgPack1) + (*inputStream).Broadcast(&msgPack2) + receiveMsg(outputStream, len(msgPack1.Msgs)) + outputTtStream := (*outputStream).(*PulsarTtMsgStream) + fmt.Printf("timestamp = %v", outputTtStream.lastTimeStamp) } diff --git a/internal/msgstream/newstream.go b/internal/msgstream/newstream.go deleted file mode 100644 index 5c35d96e65..0000000000 --- a/internal/msgstream/newstream.go +++ /dev/null @@ -1,71 +0,0 @@ -package msgstream - -func NewInputStream(pulsarAddress string, - producerChannels []string, - timeTick bool) *MsgStream { - var stream MsgStream - if timeTick { - pulsarTtStream := PulsarTtMsgStream{} - pulsarTtStream.SetPulsarCient(pulsarAddress) - pulsarTtStream.SetProducers(producerChannels) - stream = &pulsarTtStream - } else { - pulsarStream := PulsarMsgStream{} - pulsarStream.SetPulsarCient(pulsarAddress) - pulsarStream.SetProducers(producerChannels) - stream = &pulsarStream - } - - return &stream -} - -func NewOutputStream(pulsarAddress string, - pulsarBufSize int64, - consumerChannelSize int64, - consumerChannels []string, - consumerSubName string, - timeTick bool) *MsgStream { - var stream MsgStream - if timeTick { - pulsarTtStream := PulsarTtMsgStream{} - pulsarTtStream.SetPulsarCient(pulsarAddress) - pulsarTtStream.SetConsumers(consumerChannels, consumerSubName, pulsarBufSize) - pulsarTtStream.InitMsgPackBuf(consumerChannelSize) - stream = &pulsarTtStream - } else { - pulsarStream := PulsarMsgStream{} - pulsarStream.SetPulsarCient(pulsarAddress) - pulsarStream.SetConsumers(consumerChannels, consumerSubName, pulsarBufSize) - pulsarStream.InitMsgPackBuf(consumerChannelSize) - stream = &pulsarStream - } - - return &stream -} - -func NewPipeStream(pulsarAddress string, - pulsarBufSize int64, - consumerChannelSize int64, - producerChannels []string, - consumerChannels []string, - consumerSubName string, - timeTick bool) *MsgStream { - var stream MsgStream - if timeTick { - pulsarTtStream := PulsarTtMsgStream{} - pulsarTtStream.SetPulsarCient(pulsarAddress) - pulsarTtStream.SetProducers(producerChannels) - pulsarTtStream.SetConsumers(consumerChannels, consumerSubName, pulsarBufSize) - pulsarTtStream.InitMsgPackBuf(consumerChannelSize) - stream = &pulsarTtStream - } else { - pulsarStream := PulsarMsgStream{} - pulsarStream.SetPulsarCient(pulsarAddress) - pulsarStream.SetProducers(producerChannels) - pulsarStream.SetConsumers(consumerChannels, consumerSubName, pulsarBufSize) - pulsarStream.InitMsgPackBuf(consumerChannelSize) - stream = &pulsarStream - } - - return &stream -} diff --git a/internal/msgstream/newstream_test.go b/internal/msgstream/newstream_test.go deleted file mode 100644 index 5eecbdc0a4..0000000000 --- a/internal/msgstream/newstream_test.go +++ /dev/null @@ -1,252 +0,0 @@ -package msgstream - -import ( - "fmt" - "testing" - - internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" -) - -func TestNewStream_Insert(t *testing.T) { - pulsarAddress := "pulsar://localhost:6650" - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerSubName := "subInsert" - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3)) - inputStream := NewInputStream(pulsarAddress, producerChannels, false) - outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false) - - (*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kInsert), nil) - (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kInsert)) - (*outputStream).Start() - - //send msgPack - (*inputStream).Produce(&msgPack) - //(*outputStream).Start() - - // receive msg - receiveCount := 0 - for { - result := (*outputStream).Consume() - if len(result.Msgs) > 0 { - msgs := result.Msgs - for _, v := range msgs { - receiveCount++ - fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v) - } - } - if receiveCount >= len(msgPack.Msgs) { - break - } - } -} - -func TestNewStream_Delete(t *testing.T) { - pulsarAddress := "pulsar://localhost:6650" - producerChannels := []string{"delete1", "delete2"} - consumerChannels := []string{"delete1", "delete2"} - consumerSubName := "subDelete" - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 3, 3)) - inputStream := NewInputStream(pulsarAddress, producerChannels, false) - outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false) - - (*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kDelete), nil) - (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kDelete)) - (*outputStream).Start() - - //send msgPack - (*inputStream).Produce(&msgPack) - //(*outputStream).Start() - - // receive msg - receiveCount := 0 - for { - result := (*outputStream).Consume() - if len(result.Msgs) > 0 { - msgs := result.Msgs - for _, v := range msgs { - receiveCount++ - fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v) - } - } - if receiveCount >= len(msgPack.Msgs) { - break - } - } -} - -func TestNewStream_Search(t *testing.T) { - pulsarAddress := "pulsar://localhost:6650" - producerChannels := []string{"search1", "search2"} - consumerChannels := []string{"search1", "search2"} - consumerSubName := "subSearch" - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 3, 3)) - inputStream := NewInputStream(pulsarAddress, producerChannels, false) - outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false) - - (*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kSearch), nil) - (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kSearch)) - (*outputStream).Start() - - //send msgPack - (*inputStream).Produce(&msgPack) - //(*outputStream).Start() - - // receive msg - receiveCount := 0 - for { - result := (*outputStream).Consume() - if len(result.Msgs) > 0 { - msgs := result.Msgs - for _, v := range msgs { - receiveCount++ - fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v) - } - } - if receiveCount >= len(msgPack.Msgs) { - break - } - } -} - -func TestNewStream_SearchResult(t *testing.T) { - pulsarAddress := "pulsar://localhost:6650" - producerChannels := []string{"searchResult1", "searchResult2"} - consumerChannels := []string{"searchResult1", "searchResult2"} - consumerSubName := "subInsert" - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 3, 3)) - inputStream := NewInputStream(pulsarAddress, producerChannels, false) - outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false) - - (*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kSearchResult), nil) - (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kSearchResult)) - (*outputStream).Start() - - //send msgPack - (*inputStream).Produce(&msgPack) - //(*outputStream).Start() - - // receive msg - receiveCount := 0 - for { - result := (*outputStream).Consume() - if len(result.Msgs) > 0 { - msgs := result.Msgs - for _, v := range msgs { - receiveCount++ - fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v) - } - } - if receiveCount >= len(msgPack.Msgs) { - break - } - } -} - -func TestNewStream_TimeTick(t *testing.T) { - pulsarAddress := "pulsar://localhost:6650" - producerChannels := []string{"timeSync1", "timeSync2"} - consumerChannels := []string{"timeSync1", "timeSync2"} - consumerSubName := "subInsert" - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 3, 3)) - inputStream := NewInputStream(pulsarAddress, producerChannels, false) - outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false) - - (*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kTimeTick), nil) - (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kTimeTick)) - (*outputStream).Start() - - //send msgPack - (*inputStream).Produce(&msgPack) - - // receive msg - receiveCount := 0 - for { - result := (*outputStream).Consume() - if len(result.Msgs) > 0 { - msgs := result.Msgs - for _, v := range msgs { - receiveCount++ - fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v) - } - } - if receiveCount >= len(msgPack.Msgs) { - break - } - } -} - -func TestNewTtStream_Insert_TimeSync(t *testing.T) { - pulsarAddress := "pulsar://localhost:6650" - producerChannels := []string{"insert"} - consumerChannels := []string{"insert"} - consumerSubName := "subInsert" - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3)) - - insertRequest := internalPb.InsertRequest{ - MsgType: internalPb.MsgType_kTimeTick, - ReqId: 2, - CollectionName: "Collection", - PartitionTag: "Partition", - SegmentId: 1, - ChannelId: 1, - ProxyId: 1, - Timestamps: []Timestamp{1}, - } - insertMsg := &InsertMsg{ - BaseMsg: BaseMsg{HashValues: []int32{2}}, - InsertRequest: insertRequest, - } - var tsMsg TsMsg = insertMsg - msgPack.Msgs = append(msgPack.Msgs, &tsMsg) - - inputStream := NewInputStream(pulsarAddress, producerChannels, false) - outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, true) - - (*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kInsert), nil) - (*inputStream).SetRepackFunc(repackFunc) - (*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kInsert)) - (*outputStream).Start() - - //send msgPack - (*inputStream).Produce(&msgPack) - - // receive msg - receiveCount := 0 - for { - result := (*outputStream).Consume() - if len(result.Msgs) > 0 { - msgs := result.Msgs - for _, v := range msgs { - receiveCount++ - fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v) - } - } - if receiveCount+1 >= len(msgPack.Msgs) { - break - } - } -} diff --git a/internal/msgstream/task.go b/internal/msgstream/task.go index 70eb5fe63c..b3d6e4fd0b 100644 --- a/internal/msgstream/task.go +++ b/internal/msgstream/task.go @@ -1,6 +1,7 @@ package msgstream import ( + "github.com/gogo/protobuf/proto" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" ) @@ -11,6 +12,8 @@ type TsMsg interface { EndTs() Timestamp Type() MsgType HashKeys() []int32 + Marshal(*TsMsg) ([]byte, error) + Unmarshal([]byte) (*TsMsg, error) } type BaseMsg struct { @@ -41,6 +44,28 @@ func (it *InsertMsg) Type() MsgType { return it.MsgType } +func (it *InsertMsg) Marshal(input *TsMsg) ([]byte, error) { + insertMsg := (*input).(*InsertMsg) + insertRequest := &insertMsg.InsertRequest + mb, err := proto.Marshal(insertRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (it *InsertMsg) Unmarshal(input []byte) (*TsMsg, error) { + insertRequest := internalPb.InsertRequest{} + err := proto.Unmarshal(input, &insertRequest) + insertMsg := &InsertMsg{InsertRequest: insertRequest} + + if err != nil { + return nil, err + } + var tsMsg TsMsg = insertMsg + return &tsMsg, nil +} + /////////////////////////////////////////Delete////////////////////////////////////////// type DeleteMsg struct { BaseMsg @@ -51,6 +76,28 @@ func (dt *DeleteMsg) Type() MsgType { return dt.MsgType } +func (dt *DeleteMsg) Marshal(input *TsMsg) ([]byte, error) { + deleteTask := (*input).(*DeleteMsg) + deleteRequest := &deleteTask.DeleteRequest + mb, err := proto.Marshal(deleteRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (dt *DeleteMsg) Unmarshal(input []byte) (*TsMsg, error) { + deleteRequest := internalPb.DeleteRequest{} + err := proto.Unmarshal(input, &deleteRequest) + deleteMsg := &DeleteMsg{DeleteRequest: deleteRequest} + + if err != nil { + return nil, err + } + var tsMsg TsMsg = deleteMsg + return &tsMsg, nil +} + /////////////////////////////////////////Search////////////////////////////////////////// type SearchMsg struct { BaseMsg @@ -61,6 +108,28 @@ func (st *SearchMsg) Type() MsgType { return st.MsgType } +func (st *SearchMsg) Marshal(input *TsMsg) ([]byte, error) { + searchTask := (*input).(*SearchMsg) + searchRequest := &searchTask.SearchRequest + mb, err := proto.Marshal(searchRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (st *SearchMsg) Unmarshal(input []byte) (*TsMsg, error) { + searchRequest := internalPb.SearchRequest{} + err := proto.Unmarshal(input, &searchRequest) + searchMsg := &SearchMsg{SearchRequest: searchRequest} + + if err != nil { + return nil, err + } + var tsMsg TsMsg = searchMsg + return &tsMsg, nil +} + /////////////////////////////////////////SearchResult////////////////////////////////////////// type SearchResultMsg struct { BaseMsg @@ -71,6 +140,28 @@ func (srt *SearchResultMsg) Type() MsgType { return srt.MsgType } +func (srt *SearchResultMsg) Marshal(input *TsMsg) ([]byte, error) { + searchResultTask := (*input).(*SearchResultMsg) + searchResultRequest := &searchResultTask.SearchResult + mb, err := proto.Marshal(searchResultRequest) + if err != nil { + return nil, err + } + return mb, nil +} + +func (srt *SearchResultMsg) Unmarshal(input []byte) (*TsMsg, error) { + searchResultRequest := internalPb.SearchResult{} + err := proto.Unmarshal(input, &searchResultRequest) + searchResultMsg := &SearchResultMsg{SearchResult: searchResultRequest} + + if err != nil { + return nil, err + } + var tsMsg TsMsg = searchResultMsg + return &tsMsg, nil +} + /////////////////////////////////////////TimeTick////////////////////////////////////////// type TimeTickMsg struct { BaseMsg @@ -81,6 +172,28 @@ func (tst *TimeTickMsg) Type() MsgType { return tst.MsgType } +func (tst *TimeTickMsg) Marshal(input *TsMsg) ([]byte, error) { + timeTickTask := (*input).(*TimeTickMsg) + timeTick := &timeTickTask.TimeTickMsg + mb, err := proto.Marshal(timeTick) + if err != nil { + return nil, err + } + return mb, nil +} + +func (tst *TimeTickMsg) Unmarshal(input []byte) (*TsMsg, error) { + timeTickMsg := internalPb.TimeTickMsg{} + err := proto.Unmarshal(input, &timeTickMsg) + timeTick := &TimeTickMsg{TimeTickMsg: timeTickMsg} + + if err != nil { + return nil, err + } + var tsMsg TsMsg = timeTick + return &tsMsg, nil +} + ///////////////////////////////////////////Key2Seg////////////////////////////////////////// //type Key2SegMsg struct { // BaseMsg diff --git a/internal/msgstream/unmarshal.go b/internal/msgstream/unmarshal.go new file mode 100644 index 0000000000..afc8ff4aa1 --- /dev/null +++ b/internal/msgstream/unmarshal.go @@ -0,0 +1,41 @@ +package msgstream + +import ( + internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +type MarshalFunc func(*TsMsg) ([]byte, error) +type UnmarshalFunc func([]byte) (*TsMsg, error) + +type UnmarshalDispatcher struct { + tempMap map[internalPb.MsgType]UnmarshalFunc +} + +func (dispatcher *UnmarshalDispatcher) Unmarshal(input []byte, msgType internalPb.MsgType) (*TsMsg, error) { + unmarshalFunc := dispatcher.tempMap[msgType] + return unmarshalFunc(input) +} + +func (dispatcher *UnmarshalDispatcher) AddMsgTemplate(msgType internalPb.MsgType, unmarshal UnmarshalFunc) { + dispatcher.tempMap[msgType] = unmarshal +} + +func (dispatcher *UnmarshalDispatcher) addDefaultMsgTemplates() { + insertMsg := InsertMsg{} + deleteMsg := DeleteMsg{} + searchMsg := SearchMsg{} + searchResultMsg := SearchResultMsg{} + timeTickMsg := TimeTickMsg{} + dispatcher.tempMap = make(map[internalPb.MsgType]UnmarshalFunc) + dispatcher.tempMap[internalPb.MsgType_kInsert] = insertMsg.Unmarshal + dispatcher.tempMap[internalPb.MsgType_kDelete] = deleteMsg.Unmarshal + dispatcher.tempMap[internalPb.MsgType_kSearch] = searchMsg.Unmarshal + dispatcher.tempMap[internalPb.MsgType_kSearchResult] = searchResultMsg.Unmarshal + dispatcher.tempMap[internalPb.MsgType_kTimeTick] = timeTickMsg.Unmarshal +} + +func NewUnmarshalDispatcher() *UnmarshalDispatcher { + unmarshalDispatcher := UnmarshalDispatcher{} + unmarshalDispatcher.addDefaultMsgTemplates() + return &unmarshalDispatcher +} diff --git a/internal/reader/manipulation_service.go b/internal/reader/manipulation_service.go index d49f694dd3..d8023b62f8 100644 --- a/internal/reader/manipulation_service.go +++ b/internal/reader/manipulation_service.go @@ -2,7 +2,6 @@ package reader import ( "context" - internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "log" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -36,10 +35,12 @@ func (dmService *manipulationService) start() { consumerChannels := []string{"insert"} consumerSubName := "subInsert" - outputStream := msgstream.NewOutputStream(dmService.pulsarURL, pulsarBufSize, consumerChannelSize, consumerChannels, consumerSubName, true) - - (*outputStream).SetMsgMarshaler(nil, msgstream.GetMarshaler(internalPb.MsgType_kInsert)) - go (*outputStream).Start() + // TODO:: load receiveBufSize from config file + outputStream := msgstream.NewPulsarTtMsgStream(dmService.ctx, 100) + outputStream.SetPulsarCient(dmService.pulsarURL) + unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() + outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, pulsarBufSize) + (*outputStream).Start() dmService.initNodes() go dmService.fg.Start() diff --git a/internal/reader/manipulation_service_test.go b/internal/reader/manipulation_service_test.go index 6fa550577d..0c860b98e6 100644 --- a/internal/reader/manipulation_service_test.go +++ b/internal/reader/manipulation_service_test.go @@ -64,6 +64,7 @@ func TestManipulationService_Start(t *testing.T) { for i := 0; i < msgLength; i++ { var msg msgstream.TsMsg = &msgstream.InsertMsg{ InsertRequest: internalPb.InsertRequest{ + //MsgType: internalPb.MsgType_kInsert, ReqId: int64(0), CollectionName: "collection0", PartitionTag: "default",