diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index d125eabcf9..0e9ef0327f 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -401,6 +401,7 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error { // Produce produces messages for topic and updates page infos for retention func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error) { + start := time.Now() ll, ok := topicMu.Load(topicName) if !ok { return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName) @@ -412,6 +413,11 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni lock.Lock() defer lock.Unlock() + getLockTime := time.Since(start).Milliseconds() + if getLockTime > 200 { + log.Warn("rocksmq produce get lock slow", zap.Int64("elapse", getLockTime)) + } + msgLen := len(messages) idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen)) @@ -490,6 +496,8 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni if err != nil { return []UniqueID{}, err } + log.Debug("Rocksmq produce successfully ", zap.String("topic", topicName), + zap.Int64("elapsed", time.Since(start).Milliseconds())) return msgIDs, nil } @@ -534,6 +542,7 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes // 2. Update current_id to the last consumed message // 3. Update ack informations in rocksdb func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) { + start := time.Now() ll, ok := topicMu.Load(topicName) if !ok { return nil, fmt.Errorf("topic name = %s not exist", topicName) @@ -624,7 +633,9 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum } go rmq.updateAckedInfo(topicName, groupName, consumedIDs) - + log.Debug("Rocksmq produce successfully ", zap.String("topic", topicName), + zap.String("groupName", groupName), + zap.Int64("elapsed", time.Since(start).Milliseconds())) return consumerMessage, nil }