From fdbfa62b270eb498193323adfea3983b7c2eac6f Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Thu, 7 Oct 2021 22:18:56 +0800 Subject: [PATCH] Add log for rocksmq (#8889) Signed-off-by: xiaofan-luan --- .../rocksmq/server/rocksmq/rocksmq_impl.go | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 18e3d62e6a..5cf2211f8c 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -162,7 +162,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro rmq.retentionInfo = ri rmq.retentionInfo.startRetentionInfo() - + log.Debug("Rocksmq start successfully ", zap.String("name", name)) return rmq, nil } @@ -202,15 +202,16 @@ func (rmq *rocksmq) checkKeyExist(key string) bool { } func (rmq *rocksmq) CreateTopic(topicName string) error { + start := time.Now() beginKey := topicName + "/begin_id" endKey := topicName + "/end_id" // Check if topic exist if rmq.checkKeyExist(beginKey) || rmq.checkKeyExist(endKey) { - log.Debug("RocksMQ: " + beginKey + " or " + endKey + " existed.") + log.Warn("RocksMQ: " + beginKey + " or " + endKey + " existed.") return nil } - + // TODO change rmq kv save logic into a batch err := rmq.kv.Save(beginKey, "0") if err != nil { return err @@ -265,11 +266,12 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { rmq.retentionInfo.ackedInfo.Store(topicName, &topicAckedInfo{ ackedTs: map[UniqueID]int64{}, }) + log.Debug("Rocksmq create topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil } func (rmq *rocksmq) DestroyTopic(topicName string) error { - log.Debug("In DestroyTopic") + start := time.Now() beginKey := topicName + "/begin_id" endKey := topicName + "/end_id" @@ -312,7 +314,7 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { rmq.retentionInfo.ackedInfo.Delete(topicName) rmq.retentionInfo.lastRetentionTime.Delete(topicName) rmq.retentionInfo.pageInfo.Delete(topicName) - + log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil } @@ -331,6 +333,7 @@ func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Cons } func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error { + start := time.Now() key := constructCurrentID(topicName, groupName) if rmq.checkKeyExist(key) { log.Debug("RocksMQ: " + key + " existed.") @@ -340,11 +343,14 @@ func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error { if err != nil { return err } - + log.Debug("Rocksmq create consumer group successfully ", zap.String("topic", topicName), + zap.String("group", groupName), + zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil } func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) { + start := time.Now() if vals, ok := rmq.consumers.Load(consumer.Topic); ok { for _, v := range vals.([]*Consumer) { if v.GroupName == consumer.GroupName { @@ -359,9 +365,11 @@ func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) { consumers[0] = consumer rmq.consumers.Store(consumer.Topic, consumers) } + log.Debug("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.Int64("elapsed", time.Since(start).Milliseconds())) } func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error { + start := time.Now() ll, ok := topicMu.Load(topicName) if !ok { return fmt.Errorf("topic name = %s not exist", topicName) @@ -389,7 +397,9 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error { } } } - + log.Debug("Rocksmq destroy consumer group successfully ", zap.String("topic", topicName), + zap.String("group", groupName), + zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil } @@ -479,14 +489,14 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni // Update message page info // TODO(yukun): Should this be in a go routine - err = rmq.UpdatePageInfo(topicName, msgIDs, msgSizes) + err = rmq.updatePageInfo(topicName, msgIDs, msgSizes) if err != nil { return []UniqueID{}, err } return msgIDs, nil } -func (rmq *rocksmq) UpdatePageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error { +func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error { msgSizeKey := MessageSizeTitle + topicName msgSizeVal, err := rmq.kv.Load(msgSizeKey) if err != nil { @@ -622,7 +632,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum } msgSize := len(consumerMessage[len(consumerMessage)-1].Payload) - go rmq.UpdateAckedInfo(topicName, groupName, newID, int64(msgSize)) + go rmq.updateAckedInfo(topicName, groupName, newID, int64(msgSize)) return consumerMessage, nil } @@ -633,13 +643,13 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err defer rmq.storeMu.Unlock() key := constructCurrentID(topicName, groupName) if !rmq.checkKeyExist(key) { - log.Debug("RocksMQ: channel " + key + " not exists") + log.Warn("RocksMQ: channel " + key + " not exists") return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName) } storeKey, err := combKey(topicName, msgID) if err != nil { - log.Debug("RocksMQ: combKey(" + topicName + "," + strconv.FormatInt(msgID, 10) + ") failed") + log.Warn("RocksMQ: combKey(" + topicName + "," + strconv.FormatInt(msgID, 10) + ") failed") return err } @@ -648,14 +658,14 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err val, err := rmq.store.Get(opts, []byte(storeKey)) defer val.Free() if err != nil { - log.Debug("RocksMQ: get " + storeKey + " failed") + log.Warn("RocksMQ: get " + storeKey + " failed") return err } /* Step II: Save current_id in kv */ err = rmq.kv.Save(key, strconv.FormatInt(msgID, 10)) if err != nil { - log.Debug("RocksMQ: save " + key + " failed") + log.Warn("RocksMQ: save " + key + " failed") return err } @@ -708,7 +718,7 @@ func (rmq *rocksmq) Notify(topicName, groupName string) { } } -func (rmq *rocksmq) UpdateAckedInfo(topicName, groupName string, newID UniqueID, msgSize int64) error { +func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, msgSize int64) error { ll, ok := topicMu.Load(topicName) if !ok { return fmt.Errorf("topic name = %s not exist", topicName)