mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 22:45:26 +08:00
Fix SeekToLatest memory leakage and remove redundant logic (#11057)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
93339c7f49
commit
7d671b14ff
@ -12,7 +12,6 @@
|
||||
package rocksmq
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
@ -723,30 +722,28 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
|
||||
|
||||
fixChanName, _ := fixChannelName(topicName)
|
||||
iter.Seek([]byte(fixChanName + "/"))
|
||||
var last []byte
|
||||
iKey := iter.Key()
|
||||
// iter.SeekToLast bypass prefix limitation
|
||||
// use for range until find next prefix for now
|
||||
// use for range until iterator invalid for now
|
||||
if iter.Valid() {
|
||||
last = iter.Key().Data()
|
||||
current := last
|
||||
for bytes.HasPrefix(current, []byte(topicName)) {
|
||||
iter.Next()
|
||||
for iter.Valid() {
|
||||
iKey.Free()
|
||||
iKey = iter.Key()
|
||||
iter.Next()
|
||||
if iter.Valid() {
|
||||
current = last
|
||||
last = iter.Key().Data()
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// In this case there are no messages, so shouldn't return error
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(last) <= FixedChannelNameLen {
|
||||
if iKey == nil {
|
||||
return nil
|
||||
}
|
||||
msgID, err := strconv.ParseInt(string(last)[FixedChannelNameLen+1:], 10, 64)
|
||||
|
||||
seekMsgID := string(iKey.Data()) // bytes to string, copy
|
||||
iKey.Free()
|
||||
|
||||
msgID, err := strconv.ParseInt(seekMsgID[FixedChannelNameLen+1:], 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user