diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 3a00d218d0..7cfffdbf08 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -12,7 +12,6 @@ package rocksmq import ( - "bytes" "errors" "fmt" "strconv" @@ -723,30 +722,28 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { fixChanName, _ := fixChannelName(topicName) iter.Seek([]byte(fixChanName + "/")) - var last []byte + iKey := iter.Key() // iter.SeekToLast bypass prefix limitation - // use for range until find next prefix for now + // use for range until iterator invalid for now if iter.Valid() { - last = iter.Key().Data() - current := last - for bytes.HasPrefix(current, []byte(topicName)) { + iter.Next() + for iter.Valid() { + iKey.Free() + iKey = iter.Key() iter.Next() - if iter.Valid() { - current = last - last = iter.Key().Data() - } else { - break - } } } else { // In this case there are no messages, so shouldn't return error return nil } - - if len(last) <= FixedChannelNameLen { + if iKey == nil { return nil } - msgID, err := strconv.ParseInt(string(last)[FixedChannelNameLen+1:], 10, 64) + + seekMsgID := string(iKey.Data()) // bytes to string, copy + iKey.Free() + + msgID, err := strconv.ParseInt(seekMsgID[FixedChannelNameLen+1:], 10, 64) if err != nil { return err }