diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index b2258241d8..4b8e7732a6 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -581,9 +581,11 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni if getProduceTime > 200 { log.Warn("rocksmq produce too slowly", zap.String("topic", topicName), zap.Int64("get lock elapse", getLockTime), - zap.Int64("alloc elapse", allocTime), - zap.Int64("write elapse", writeTime), - zap.Int64("updatePage elapse", getProduceTime)) + zap.Int64("alloc elapse", allocTime-getLockTime), + zap.Int64("write elapse", writeTime-allocTime), + zap.Int64("updatePage elapse", getProduceTime-writeTime), + zap.Int64("produce total elapse", getProduceTime), + ) } return msgIDs, nil } @@ -695,6 +697,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum if err := iter.Err(); err != nil { return nil, err } + iterTime := time.Since(start).Milliseconds() // When already consume to last mes, an empty slice will be returned if len(consumerMessage) == 0 { @@ -713,13 +716,17 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum zap.String("groupName", groupName), zap.Error(err)) return nil, err } + updateAckedTime := time.Since(start).Milliseconds() rmq.moveConsumePos(topicName, groupName, newID+1) // TODO add this to monitor metrics getConsumeTime := time.Since(start).Milliseconds() if getConsumeTime > 200 { log.Warn("rocksmq consume too slowly", zap.String("topic", topicName), - zap.Int64("get lock elapse", getLockTime), zap.Int64("consume elapse", getConsumeTime)) + zap.Int64("get lock elapse", getLockTime), + zap.Int64("iterator elapse", iterTime-getLockTime), + zap.Int64("updateAckedInfo elapse", updateAckedTime-iterTime), + zap.Int64("total consume elapse", getConsumeTime)) } return consumerMessage, nil }