diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 3dab234cf8..974c4f9c49 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -289,17 +289,18 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { // Check if topicName contains "/" if strings.Contains(topicName, "/") { - log.Error("RocksMQ: create topic failed because topic name contains \"/\"", zap.String("topic", topicName)) + log.Error("rocksmq failed to create topic for topic name contains \"/\"", zap.String("topic", topicName)) return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName)) } - // TopicBeginIDTitle is the only identifier of a topic exist or not + // topicIDKey is the only identifier of a topic topicIDKey := TopicIDTitle + topicName val, err := rmq.kv.Load(topicIDKey) if err != nil { return err } if val != "" { + log.Debug("rocksmq topic already exists ", zap.String("topic", topicName)) return nil } @@ -307,19 +308,17 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { topicMu.Store(topicName, new(sync.Mutex)) } + // msgSizeKey -> msgSize + // topicIDKey -> topic creating time kvs := make(map[string]string) + // Initialize topic message size to 0 msgSizeKey := MessageSizeTitle + topicName kvs[msgSizeKey] = "0" - // Initialize topic id to its create Tme, we don't really use it for now + // Initialize topic id to its creating time, we don't really use it for now nowTs := strconv.FormatInt(time.Now().Unix(), 10) kvs[topicIDKey] = nowTs - err = rmq.kv.Save(topicIDKey, nowTs) - if err != nil { - return err - } - rmq.kv.MultiSave(kvs) rmq.retentionInfo.mutex.Lock() @@ -329,7 +328,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { return nil } -// DestroyTopic removes messages for topic in rocksdb +// DestroyTopic removes messages for topic in rocksmq func (rmq *rocksmq) DestroyTopic(topicName string) error { start := time.Now() ll, ok := topicMu.Load(topicName)