Batch consume msg in rockmq internal loop (#11373)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2021-11-08 20:56:07 +08:00 committed by GitHub
parent 3623c64ec2
commit d50543a030
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 22 deletions

View File

@ -134,24 +134,27 @@ func (c *client) consume(consumer *consumer) {
}
for {
msg, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, 1)
n := cap(consumer.messageCh) - len(consumer.messageCh)
if n < 100 { // batch min size
n = 100
}
msgs, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, n)
if err != nil {
log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic +
"," + consumer.consumerName + "): " + err.Error())
break
}
if len(msg) != 1 {
//log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic +
// "," + consumer.consumerName + "): message len(" + strconv.Itoa(len(msg)) +
// ") is not 1")
// no more msgs
if len(msgs) == 0 {
break
}
consumer.messageCh <- ConsumerMessage{
MsgID: msg[0].MsgID,
Payload: msg[0].Payload,
Topic: consumer.Topic(),
for _, msg := range msgs {
consumer.messageCh <- ConsumerMessage{
MsgID: msg.MsgID,
Payload: msg.Payload,
Topic: consumer.Topic(),
}
}
}
}

View File

@ -14,6 +14,8 @@ package rocksmq
import (
"errors"
"fmt"
"math"
"path"
"strconv"
"sync"
"time"
@ -635,15 +637,20 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
return consumerMessage, nil
}
newID := consumerMessage[len(consumerMessage)-1].MsgID
consumedIDs := make([]UniqueID, 0, len(consumerMessage))
msgSize := make([]int64, 0, len(consumerMessage))
for _, msg := range consumerMessage {
consumedIDs = append(consumedIDs, msg.MsgID)
msgSize = append(msgSize, int64(len(msg.Payload)))
}
newID := consumedIDs[len(consumedIDs)-1]
err = rmq.seek(topicName, groupName, newID)
if err != nil {
log.Debug("RocksMQ: Seek(" + groupName + "," + topicName + "," + strconv.FormatInt(newID, 10) + ") failed")
return nil, err
}
msgSize := len(consumerMessage[len(consumerMessage)-1].Payload)
go rmq.updateAckedInfo(topicName, groupName, newID, int64(msgSize))
go rmq.updateAckedInfo(topicName, groupName, consumedIDs, msgSize)
return consumerMessage, nil
}
@ -764,7 +771,10 @@ func (rmq *rocksmq) Notify(topicName, groupName string) {
}
// updateAckedInfo update acked informations for retention after consume
func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, msgSize int64) error {
func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID, msgSize []int64) error {
if len(ids) == 0 {
return nil
}
ll, ok := topicMu.Load(topicName)
if !ok {
return fmt.Errorf("topic name = %s not exist", topicName)
@ -776,20 +786,22 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID,
lock.Lock()
defer lock.Unlock()
lastID := ids[len(ids)-1]
fixedBeginIDKey, err := constructKey(BeginIDTitle, topicName)
if err != nil {
return err
}
// Update begin_id for the consumer_group
beginIDKey := fixedBeginIDKey + "/" + groupName
err = rmq.kv.Save(beginIDKey, strconv.FormatInt(newID, 10))
err = rmq.kv.Save(beginIDKey, strconv.FormatInt(lastID, 10))
if err != nil {
return err
}
// Update begin_id for topic
if vals, ok := rmq.consumers.Load(topicName); ok {
var minBeginID int64 = -1
var minBeginID int64 = math.MaxInt64
for _, v := range vals.([]*Consumer) {
curBeginIDKey := fixedBeginIDKey + "/" + v.GroupName
curBeginIDVal, err := rmq.kv.Load(curBeginIDKey)
@ -800,7 +812,7 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID,
if err != nil {
return err
}
if curBeginID > minBeginID {
if curBeginID < minBeginID {
minBeginID = curBeginID
}
}
@ -815,13 +827,25 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID,
if err != nil {
return err
}
ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(minBeginID, 10)
ts := time.Now().Unix()
err = rmq.kv.Save(ackedTsKey, strconv.FormatInt(ts, 10))
ts := strconv.FormatInt(time.Now().Unix(), 10)
// current behavior is to ack all safe msgID(before minBeginID)
// TODO @silverxia @yukun ack only page separator msg id
ackMsgKvs := make(map[string]string)
totalAckMsgSize := int64(0)
for i, id := range ids {
// depends on the ids are monotonically increasing
if id <= minBeginID {
totalAckMsgSize += msgSize[i]
key := path.Join(fixedAckedTsKey, strconv.FormatInt(id, 10))
ackMsgKvs[key] = ts
}
}
err = rmq.kv.MultiSave(ackMsgKvs)
if err != nil {
return err
}
if minBeginID == newID {
if minBeginID == lastID {
// Means the begin_id of topic update to newID, so needs to update acked size
ackedSizeKey := AckedSizeTitle + topicName
ackedSizeVal, err := rmq.kv.Load(ackedSizeKey)
@ -832,7 +856,7 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID,
if err != nil {
return err
}
ackedSize += msgSize
ackedSize += totalAckMsgSize
err = rmq.kv.Save(ackedSizeKey, strconv.FormatInt(ackedSize, 10))
if err != nil {
return err