From d50543a0307949d0c06dc328122d5061e0665d73 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 8 Nov 2021 20:56:07 +0800 Subject: [PATCH] Batch consume msg in rockmq internal loop (#11373) Signed-off-by: Congqi Xia --- .../rocksmq/client/rocksmq/client_impl.go | 23 +++++---- .../rocksmq/server/rocksmq/rocksmq_impl.go | 48 ++++++++++++++----- 2 files changed, 49 insertions(+), 22 deletions(-) diff --git a/internal/util/rocksmq/client/rocksmq/client_impl.go b/internal/util/rocksmq/client/rocksmq/client_impl.go index 38bf53b860..7097b353fc 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl.go @@ -134,24 +134,27 @@ func (c *client) consume(consumer *consumer) { } for { - msg, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, 1) + n := cap(consumer.messageCh) - len(consumer.messageCh) + if n < 100 { // batch min size + n = 100 + } + msgs, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, n) if err != nil { log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic + "," + consumer.consumerName + "): " + err.Error()) break } - if len(msg) != 1 { - //log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic + - // "," + consumer.consumerName + "): message len(" + strconv.Itoa(len(msg)) + - // ") is not 1") + // no more msgs + if len(msgs) == 0 { break } - - consumer.messageCh <- ConsumerMessage{ - MsgID: msg[0].MsgID, - Payload: msg[0].Payload, - Topic: consumer.Topic(), + for _, msg := range msgs { + consumer.messageCh <- ConsumerMessage{ + MsgID: msg.MsgID, + Payload: msg.Payload, + Topic: consumer.Topic(), + } } } } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index deb15aa33d..ff88b61829 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -14,6 +14,8 @@ package rocksmq import ( "errors" "fmt" + "math" + "path" "strconv" "sync" "time" @@ -635,15 +637,20 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum return consumerMessage, nil } - newID := consumerMessage[len(consumerMessage)-1].MsgID + consumedIDs := make([]UniqueID, 0, len(consumerMessage)) + msgSize := make([]int64, 0, len(consumerMessage)) + for _, msg := range consumerMessage { + consumedIDs = append(consumedIDs, msg.MsgID) + msgSize = append(msgSize, int64(len(msg.Payload))) + } + newID := consumedIDs[len(consumedIDs)-1] err = rmq.seek(topicName, groupName, newID) if err != nil { log.Debug("RocksMQ: Seek(" + groupName + "," + topicName + "," + strconv.FormatInt(newID, 10) + ") failed") return nil, err } - msgSize := len(consumerMessage[len(consumerMessage)-1].Payload) - go rmq.updateAckedInfo(topicName, groupName, newID, int64(msgSize)) + go rmq.updateAckedInfo(topicName, groupName, consumedIDs, msgSize) return consumerMessage, nil } @@ -764,7 +771,10 @@ func (rmq *rocksmq) Notify(topicName, groupName string) { } // updateAckedInfo update acked informations for retention after consume -func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, msgSize int64) error { +func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID, msgSize []int64) error { + if len(ids) == 0 { + return nil + } ll, ok := topicMu.Load(topicName) if !ok { return fmt.Errorf("topic name = %s not exist", topicName) @@ -776,20 +786,22 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, lock.Lock() defer lock.Unlock() + lastID := ids[len(ids)-1] + fixedBeginIDKey, err := constructKey(BeginIDTitle, topicName) if err != nil { return err } // Update begin_id for the consumer_group beginIDKey := fixedBeginIDKey + "/" + groupName - err = rmq.kv.Save(beginIDKey, strconv.FormatInt(newID, 10)) + err = rmq.kv.Save(beginIDKey, strconv.FormatInt(lastID, 10)) if err != nil { return err } // Update begin_id for topic if vals, ok := rmq.consumers.Load(topicName); ok { - var minBeginID int64 = -1 + var minBeginID int64 = math.MaxInt64 for _, v := range vals.([]*Consumer) { curBeginIDKey := fixedBeginIDKey + "/" + v.GroupName curBeginIDVal, err := rmq.kv.Load(curBeginIDKey) @@ -800,7 +812,7 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, if err != nil { return err } - if curBeginID > minBeginID { + if curBeginID < minBeginID { minBeginID = curBeginID } } @@ -815,13 +827,25 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, if err != nil { return err } - ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(minBeginID, 10) - ts := time.Now().Unix() - err = rmq.kv.Save(ackedTsKey, strconv.FormatInt(ts, 10)) + + ts := strconv.FormatInt(time.Now().Unix(), 10) + // current behavior is to ack all safe msgID(before minBeginID) + // TODO @silverxia @yukun ack only page separator msg id + ackMsgKvs := make(map[string]string) + totalAckMsgSize := int64(0) + for i, id := range ids { + // depends on the ids are monotonically increasing + if id <= minBeginID { + totalAckMsgSize += msgSize[i] + key := path.Join(fixedAckedTsKey, strconv.FormatInt(id, 10)) + ackMsgKvs[key] = ts + } + } + err = rmq.kv.MultiSave(ackMsgKvs) if err != nil { return err } - if minBeginID == newID { + if minBeginID == lastID { // Means the begin_id of topic update to newID, so needs to update acked size ackedSizeKey := AckedSizeTitle + topicName ackedSizeVal, err := rmq.kv.Load(ackedSizeKey) @@ -832,7 +856,7 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, if err != nil { return err } - ackedSize += msgSize + ackedSize += totalAckMsgSize err = rmq.kv.Save(ackedSizeKey, strconv.FormatInt(ackedSize, 10)) if err != nil { return err