From f21c0ef2e98405577c7ea59d047cf5a89c03803b Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 26 Nov 2020 17:58:08 +0800 Subject: [PATCH] Reduce cpu usage Signed-off-by: bigsheeper --- internal/msgstream/msgstream.go | 68 ++++++++++++++++++---------- internal/querynode/search_service.go | 9 +++- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index 33dc230cd2..c36d488ab3 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -3,6 +3,7 @@ package msgstream import ( "context" "log" + "reflect" "sync" "github.com/apache/pulsar-client-go/pulsar" @@ -111,9 +112,6 @@ func (ms *PulsarMsgStream) Start() { func (ms *PulsarMsgStream) Close() { ms.streamCancel() - if ms.wait != nil { - ms.wait.Wait() - } for _, producer := range ms.producers { if producer != nil { @@ -227,37 +225,59 @@ func (ms *PulsarMsgStream) Consume() *MsgPack { func (ms *PulsarMsgStream) bufMsgPackToChannel() { defer ms.wait.Done() + + cases := make([]reflect.SelectCase, len(ms.consumers)) + for i := 0; i < len(ms.consumers); i++ { + ch := (*ms.consumers[i]).Chan() + cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} + } + for { select { case <-ms.ctx.Done(): return default: tsMsgList := make([]TsMsg, 0) - for i := 0; i < len(ms.consumers); i++ { - consumerChan := (*ms.consumers[i]).Chan() - chanLen := len(consumerChan) - for l := 0; l < chanLen; l++ { - pulsarMsg, ok := <-consumerChan - if !ok { - log.Printf("channel closed") - return - } - (*ms.consumers[i]).AckID(pulsarMsg.ID()) - headerMsg := internalPb.MsgHeader{} - err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) - if err != nil { - log.Printf("Failed to unmarshal message header, error = %v", err) - continue + for { + chosen, value, ok := reflect.Select(cases) + if !ok { + log.Printf("channel closed") + return + } + + pulsarMsg, ok := value.Interface().(pulsar.ConsumerMessage) + if !ok { + log.Printf("type assertion failed, not consumer message type") + continue + } + (*ms.consumers[chosen]).AckID(pulsarMsg.ID()) + + headerMsg := internalPb.MsgHeader{} + err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) + if err != nil { + log.Printf("Failed to unmarshal message header, error = %v", err) + continue + } + tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.MsgType) + if err != nil { + log.Printf("Failed to unmarshal tsMsg, error = %v", err) + continue + } + tsMsgList = append(tsMsgList, tsMsg) + + noMoreMessage := true + for i := 0; i < len(ms.consumers); i++ { + if len((*ms.consumers[i]).Chan()) > 0 { + noMoreMessage = false } - tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.MsgType) - if err != nil { - log.Printf("Failed to unmarshal tsMsg, error = %v", err) - continue - } - tsMsgList = append(tsMsgList, tsMsg) + } + + if noMoreMessage { + break } } + if len(tsMsgList) > 0 { msgPack := MsgPack{Msgs: tsMsgList} ms.receiveBuf <- &msgPack diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index 87f7c999c1..589e3567d6 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -172,15 +172,20 @@ func (ss *searchService) doUnsolvedMsgSearch() { ss.unsolvedMsg = append(ss.unsolvedMsg, msg) } - msgBufferLength := len(ss.msgBuffer) - for i := 0; i < msgBufferLength; i++ { + for { msg := <-ss.msgBuffer if msg.EndTs() <= serviceTime { searchMsg = append(searchMsg, msg) continue } ss.unsolvedMsg = append(ss.unsolvedMsg, msg) + + msgBufferLength := len(ss.msgBuffer) + if msgBufferLength <= 0 { + break + } } + if len(searchMsg) <= 0 { continue }