mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
Refactor rocksmq comments and logs for readability (#15801)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
parent
529098fd87
commit
e394ba8c88
@ -289,17 +289,18 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
|
|||||||
|
|
||||||
// Check if topicName contains "/"
|
// Check if topicName contains "/"
|
||||||
if strings.Contains(topicName, "/") {
|
if strings.Contains(topicName, "/") {
|
||||||
log.Error("RocksMQ: create topic failed because topic name contains \"/\"", zap.String("topic", topicName))
|
log.Error("rocksmq failed to create topic for topic name contains \"/\"", zap.String("topic", topicName))
|
||||||
return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
|
return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TopicBeginIDTitle is the only identifier of a topic exist or not
|
// topicIDKey is the only identifier of a topic
|
||||||
topicIDKey := TopicIDTitle + topicName
|
topicIDKey := TopicIDTitle + topicName
|
||||||
val, err := rmq.kv.Load(topicIDKey)
|
val, err := rmq.kv.Load(topicIDKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if val != "" {
|
if val != "" {
|
||||||
|
log.Debug("rocksmq topic already exists ", zap.String("topic", topicName))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -307,19 +308,17 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
|
|||||||
topicMu.Store(topicName, new(sync.Mutex))
|
topicMu.Store(topicName, new(sync.Mutex))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// msgSizeKey -> msgSize
|
||||||
|
// topicIDKey -> topic creating time
|
||||||
kvs := make(map[string]string)
|
kvs := make(map[string]string)
|
||||||
|
|
||||||
// Initialize topic message size to 0
|
// Initialize topic message size to 0
|
||||||
msgSizeKey := MessageSizeTitle + topicName
|
msgSizeKey := MessageSizeTitle + topicName
|
||||||
kvs[msgSizeKey] = "0"
|
kvs[msgSizeKey] = "0"
|
||||||
|
|
||||||
// Initialize topic id to its create Tme, we don't really use it for now
|
// Initialize topic id to its creating time, we don't really use it for now
|
||||||
nowTs := strconv.FormatInt(time.Now().Unix(), 10)
|
nowTs := strconv.FormatInt(time.Now().Unix(), 10)
|
||||||
kvs[topicIDKey] = nowTs
|
kvs[topicIDKey] = nowTs
|
||||||
err = rmq.kv.Save(topicIDKey, nowTs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
rmq.kv.MultiSave(kvs)
|
rmq.kv.MultiSave(kvs)
|
||||||
|
|
||||||
rmq.retentionInfo.mutex.Lock()
|
rmq.retentionInfo.mutex.Lock()
|
||||||
@ -329,7 +328,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DestroyTopic removes messages for topic in rocksdb
|
// DestroyTopic removes messages for topic in rocksmq
|
||||||
func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
ll, ok := topicMu.Load(topicName)
|
ll, ok := topicMu.Load(topicName)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user