diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index eb54227b20..a03d9bc86d 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -275,27 +275,54 @@ func (rmq *rocksmq) Close() { } //print rmq consumer Info -func (rmq *rocksmq) Info() { +func (rmq *rocksmq) Info() bool { + rtn := true rmq.consumers.Range(func(key, vals interface{}) bool { topic, _ := key.(string) - consumers, _ := vals.([]*Consumer) + consumerList, _ := vals.([]*Consumer) - consumersPosition := make([]UniqueID, len(consumers)) - consumersName := make([]string, len(consumers)) - for id, consumer := range consumers { - groupKey := constructCurrentID(consumer.Topic, consumer.GroupName) - groupPosition, ok := rmq.consumersID.Load(groupKey) + minConsumerPosition := UniqueID(-1) + minConsumerGroupName := "" + for _, consumer := range consumerList { + consumerKey := constructCurrentID(consumer.Topic, consumer.GroupName) + consumerPosition, ok := rmq.consumersID.Load(consumerKey) if !ok { log.Error("some group not regist", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName)) continue } - consumersPosition[id] = groupPosition.(UniqueID) - consumersName[id] = consumer.GroupName + if minConsumerPosition == UniqueID(-1) || consumerPosition.(UniqueID) < minConsumerPosition { + minConsumerPosition = consumerPosition.(UniqueID) + minConsumerGroupName = consumer.GroupName + } } - log.Info("Rocksmq Info", zap.String("topic", topic), zap.Int("consumer num", len(consumers)), zap.Any("group names", consumersName), zap.Any("group positions", consumersPosition)) + pageTsSizeKey := constructKey(PageTsTitle, topic) + pages, _, err := rmq.kv.LoadWithPrefix(pageTsSizeKey) + if err != nil { + log.Error("Rocksmq get page num failed", zap.String("topic", topic)) + rtn = false + return false + } + + msgSizeKey := MessageSizeTitle + topic + msgSizeVal, err := rmq.kv.Load(msgSizeKey) + if err != nil { + log.Error("Rocksmq get last page size failed", zap.String("topic", topic)) + rtn = false + return false + } + + log.Info("Rocksmq Info", + zap.String("topic", topic), + zap.Int("consumer num", len(consumerList)), + zap.String("min position group names", minConsumerGroupName), + zap.Int64("min positions", minConsumerPosition), + zap.Int("page sum", len(pages)), + zap.String("last page size", msgSizeVal), + ) return true }) + return rtn } func (rmq *rocksmq) stopRetention() { @@ -629,7 +656,6 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes // Current page is full newPageSize := curMsgSize + msgSize pageEndID := id - log.Info("new page", zap.String("topic", topicName), zap.Int64("pageId", pageEndID)) // Update page message size for current page. key is page end ID pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10) mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10) diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index e60973dd74..52727cbc3c 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -1057,3 +1057,48 @@ func TestRocksmq_updateAckedInfoErr(t *testing.T) { // update acked for all page in rmq but some consumer not in rmq.consumers assert.Error(t, rmq.updateAckedInfo(topicName, groupName, 0, ids[len(ids)-1])) } + +func TestRocksmq_Info(t *testing.T) { + ep := etcdEndpoints() + etcdCli, err := etcd.GetRemoteEtcdClient(ep) + assert.Nil(t, err) + etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root") + defer etcdKV.Close() + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) + _ = idAllocator.Initialize() + + name := "/tmp/rocksmq_testinfo" + defer os.RemoveAll(name) + kvName := name + "_meta_kv" + _ = os.RemoveAll(kvName) + defer os.RemoveAll(kvName) + var params paramtable.BaseTable + params.Init() + atomic.StoreInt64(&RocksmqPageSize, 10) + rmq, err := NewRocksMQ(params, name, idAllocator) + assert.Nil(t, err) + defer rmq.Close() + + topicName := "test_testinfo" + groupName := "test" + rmq.CreateTopic(topicName) + defer rmq.DestroyTopic(topicName) + + consumer := &Consumer{ + Topic: topicName, + GroupName: groupName, + } + + _ = rmq.DestroyConsumerGroup(topicName, groupName) + err = rmq.CreateConsumerGroup(topicName, groupName) + assert.Nil(t, err) + + err = rmq.RegisterConsumer(consumer) + assert.Nil(t, err) + + assert.True(t, rmq.Info()) + + //test error + rmq.kv = &rocksdbkv.RocksdbKV{} + assert.False(t, rmq.Info()) +}