From 7fe8e5068910f83d2275d9f8fb14e10010c11476 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Thu, 20 Oct 2022 16:05:27 +0800 Subject: [PATCH] print some group info in rocksmq monitor (#19863) Signed-off-by: aoiasd Signed-off-by: aoiasd --- .../mq/mqimpl/rocksmq/server/rocksmq_impl.go | 29 ++++++++++++++++++- .../rocksmq/server/rocksmq_retention_test.go | 1 + 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index de09425088..eb54227b20 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -222,7 +222,8 @@ func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator. // TODO add this to monitor metrics go func() { for { - time.Sleep(5 * time.Minute) + time.Sleep(10 * time.Minute) + log.Info("Rocksmq stats", zap.String("cache", kv.DB.GetProperty("rocksdb.block-cache-usage")), zap.String("rockskv memtable ", kv.DB.GetProperty("rocksdb.size-all-mem-tables")), @@ -237,6 +238,7 @@ func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator. zap.String("store l3 file num", db.GetProperty("rocksdb.num-files-at-level3")), zap.String("store l4 file num", db.GetProperty("rocksdb.num-files-at-level4")), ) + rmq.Info() } }() @@ -272,6 +274,30 @@ func (rmq *rocksmq) Close() { log.Info("Successfully close rocksmq") } +//print rmq consumer Info +func (rmq *rocksmq) Info() { + rmq.consumers.Range(func(key, vals interface{}) bool { + topic, _ := key.(string) + consumers, _ := vals.([]*Consumer) + + consumersPosition := make([]UniqueID, len(consumers)) + consumersName := make([]string, len(consumers)) + for id, consumer := range consumers { + groupKey := constructCurrentID(consumer.Topic, consumer.GroupName) + groupPosition, ok := rmq.consumersID.Load(groupKey) + if !ok { + log.Error("some group not regist", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName)) + continue + } + consumersPosition[id] = groupPosition.(UniqueID) + consumersName[id] = consumer.GroupName + } + + log.Info("Rocksmq Info", zap.String("topic", topic), zap.Int("consumer num", len(consumers)), zap.Any("group names", consumersName), zap.Any("group positions", consumersPosition)) + return true + }) +} + func (rmq *rocksmq) stopRetention() { if rmq.retentionInfo != nil { rmq.retentionInfo.Stop() @@ -603,6 +629,7 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes // Current page is full newPageSize := curMsgSize + msgSize pageEndID := id + log.Info("new page", zap.String("topic", topicName), zap.Int64("pageId", pageEndID)) // Update page message size for current page. key is page end ID pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10) mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10) diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go index 3b54dd71fc..94c9ff188b 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go @@ -91,6 +91,7 @@ func TestRmqRetention_Basic(t *testing.T) { } assert.Equal(t, len(cMsgs), msgNum) + rmq.Info() time.Sleep(time.Duration(checkTimeInterval+1) * time.Second) // Seek to a previous consumed message, the message should be clean up