mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 09:08:43 +08:00
Check rmq retention only by page (#11508)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>
This commit is contained in:
parent
29e9adc7be
commit
897d6b5fc8
@ -530,12 +530,6 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes
|
||||
return err
|
||||
}
|
||||
|
||||
// if pageInfo, ok := rmq.retentionInfo.pageInfo.Load(topicName); ok {
|
||||
// pageInfo.(*topicPageInfo).pageEndID = append(pageInfo.(*topicPageInfo).pageEndID, pageEndID)
|
||||
// pageInfo.(*topicPageInfo).pageMsgSize[pageEndID] = newPageSize
|
||||
// rmq.retentionInfo.pageInfo.Store(topicName, pageInfo)
|
||||
// }
|
||||
|
||||
// Update message size to 0
|
||||
err = rmq.kv.Save(msgSizeKey, strconv.FormatInt(0, 10))
|
||||
if err != nil {
|
||||
@ -638,10 +632,8 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
||||
}
|
||||
|
||||
consumedIDs := make([]UniqueID, 0, len(consumerMessage))
|
||||
msgSize := make([]int64, 0, len(consumerMessage))
|
||||
for _, msg := range consumerMessage {
|
||||
consumedIDs = append(consumedIDs, msg.MsgID)
|
||||
msgSize = append(msgSize, int64(len(msg.Payload)))
|
||||
}
|
||||
newID := consumedIDs[len(consumedIDs)-1]
|
||||
err = rmq.seek(topicName, groupName, newID)
|
||||
@ -650,7 +642,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go rmq.updateAckedInfo(topicName, groupName, consumedIDs, msgSize)
|
||||
go rmq.updateAckedInfo(topicName, groupName, consumedIDs)
|
||||
|
||||
return consumerMessage, nil
|
||||
}
|
||||
@ -771,7 +763,7 @@ func (rmq *rocksmq) Notify(topicName, groupName string) {
|
||||
}
|
||||
|
||||
// updateAckedInfo update acked informations for retention after consume
|
||||
func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID, msgSize []int64) error {
|
||||
func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID) error {
|
||||
if len(ids) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -786,24 +778,67 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID,
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
firstID := ids[0]
|
||||
lastID := ids[len(ids)-1]
|
||||
|
||||
fixedBeginIDKey, err := constructKey(BeginIDTitle, topicName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Update begin_id for the consumer_group
|
||||
|
||||
// 1. Update begin_id for the consumer_group
|
||||
beginIDKey := fixedBeginIDKey + "/" + groupName
|
||||
err = rmq.kv.Save(beginIDKey, strconv.FormatInt(lastID, 10))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update begin_id for topic
|
||||
// 2. Try to get the page id between first ID and last ID of ids
|
||||
pageMsgPrefix, err := constructKey(PageMsgSizeTitle, topicName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readOpts := gorocksdb.NewDefaultReadOptions()
|
||||
defer readOpts.Destroy()
|
||||
readOpts.SetPrefixSameAsStart(true)
|
||||
iter := rmq.kv.(*rocksdbkv.RocksdbKV).DB.NewIterator(readOpts)
|
||||
defer iter.Close()
|
||||
|
||||
pageIDs := make([]UniqueID, 0)
|
||||
pageMsgKey := pageMsgPrefix + "/" + strconv.FormatInt(firstID, 10)
|
||||
for iter.Seek([]byte(pageMsgKey)); iter.Valid(); iter.Next() {
|
||||
key := iter.Key()
|
||||
pageID, err := strconv.ParseInt(string(key.Data())[FixedChannelNameLen+1:], 10, 64)
|
||||
if key != nil {
|
||||
key.Free()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if pageID <= lastID {
|
||||
pageIDs = append(pageIDs, pageID)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(pageIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
fixedAckedTsKey, err := constructKey(AckedTsTitle, topicName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 3. Update acked ts and acked size for pageIDs
|
||||
if vals, ok := rmq.consumers.Load(topicName); ok {
|
||||
var minBeginID int64 = math.MaxInt64
|
||||
for _, v := range vals.([]*Consumer) {
|
||||
curBeginIDKey := fixedBeginIDKey + "/" + v.GroupName
|
||||
consumers, ok := vals.([]*Consumer)
|
||||
if !ok || len(consumers) == 0 {
|
||||
return nil
|
||||
}
|
||||
for _, v := range consumers {
|
||||
curBeginIDKey := path.Join(fixedBeginIDKey, v.GroupName)
|
||||
curBeginIDVal, err := rmq.kv.Load(curBeginIDKey)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -816,52 +851,53 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID,
|
||||
minBeginID = curBeginID
|
||||
}
|
||||
}
|
||||
topicBeginIDKey := TopicBeginIDTitle + topicName
|
||||
err = rmq.kv.Save(topicBeginIDKey, strconv.FormatInt(minBeginID, 10))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update acked info for msg of begin id
|
||||
fixedAckedTsKey, err := constructKey(AckedTsTitle, topicName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ts := strconv.FormatInt(time.Now().Unix(), 10)
|
||||
// current behavior is to ack all safe msgID(before minBeginID)
|
||||
// TODO @silverxia @yukun ack only page separator msg id
|
||||
ackMsgKvs := make(map[string]string)
|
||||
nowTs := strconv.FormatInt(time.Now().Unix(), 10)
|
||||
ackedTsKvs := make(map[string]string)
|
||||
totalAckMsgSize := int64(0)
|
||||
for i, id := range ids {
|
||||
// depends on the ids are monotonically increasing
|
||||
if id <= minBeginID {
|
||||
totalAckMsgSize += msgSize[i]
|
||||
key := path.Join(fixedAckedTsKey, strconv.FormatInt(id, 10))
|
||||
ackMsgKvs[key] = ts
|
||||
}
|
||||
}
|
||||
err = rmq.kv.MultiSave(ackMsgKvs)
|
||||
|
||||
fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topicName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if minBeginID == lastID {
|
||||
// Means the begin_id of topic update to newID, so needs to update acked size
|
||||
ackedSizeKey := AckedSizeTitle + topicName
|
||||
ackedSizeVal, err := rmq.kv.Load(ackedSizeKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ackedSize, err := strconv.ParseInt(ackedSizeVal, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ackedSize += totalAckMsgSize
|
||||
err = rmq.kv.Save(ackedSizeKey, strconv.FormatInt(ackedSize, 10))
|
||||
if err != nil {
|
||||
return err
|
||||
for _, pID := range pageIDs {
|
||||
if pID <= minBeginID {
|
||||
// Update acked info for message pID
|
||||
pageAckedTsKey := path.Join(fixedAckedTsKey, strconv.FormatInt(pID, 10))
|
||||
ackedTsKvs[pageAckedTsKey] = nowTs
|
||||
|
||||
// get current page message size
|
||||
pageMsgSizeKey := path.Join(fixedPageSizeKey, strconv.FormatInt(pID, 10))
|
||||
pageMsgSizeVal, err := rmq.kv.Load(pageMsgSizeKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pageMsgSize, err := strconv.ParseInt(pageMsgSizeVal, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
totalAckMsgSize += pageMsgSize
|
||||
}
|
||||
}
|
||||
err = rmq.kv.MultiSave(ackedTsKvs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ackedSizeKey := AckedSizeTitle + topicName
|
||||
ackedSizeVal, err := rmq.kv.Load(ackedSizeKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ackedSize, err := strconv.ParseInt(ackedSizeVal, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ackedSize += totalAckMsgSize
|
||||
err = rmq.kv.Save(ackedSizeKey, strconv.FormatInt(ackedSize, 10))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -12,7 +12,6 @@
|
||||
package rocksmq
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
@ -54,32 +53,6 @@ type retentionInfo struct {
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
// Interface LoadWithPrefix() in rocksdbkv needs to close db instance first and then reopen,
|
||||
// which will cause crash when other goroutines operate the db instance. So here implement a
|
||||
// prefixLoad without reopen db instance.
|
||||
func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) {
|
||||
if db == nil {
|
||||
return nil, nil, errors.New("Rocksdb instance is nil when do prefixLoad")
|
||||
}
|
||||
readOpts := gorocksdb.NewDefaultReadOptions()
|
||||
defer readOpts.Destroy()
|
||||
readOpts.SetPrefixSameAsStart(true)
|
||||
iter := db.NewIterator(readOpts)
|
||||
defer iter.Close()
|
||||
keys := make([]string, 0)
|
||||
values := make([]string, 0)
|
||||
iter.Seek([]byte(prefix))
|
||||
for ; iter.Valid(); iter.Next() {
|
||||
key := iter.Key()
|
||||
value := iter.Value()
|
||||
keys = append(keys, string(key.Data()))
|
||||
key.Free()
|
||||
values = append(values, string(value.Data()))
|
||||
value.Free()
|
||||
}
|
||||
return keys, values, nil
|
||||
}
|
||||
|
||||
func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) {
|
||||
ri := &retentionInfo{
|
||||
topics: sync.Map{},
|
||||
@ -163,9 +136,9 @@ func (ri *retentionInfo) Stop() {
|
||||
func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
||||
log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
|
||||
var deletedAckedSize int64 = 0
|
||||
var startID UniqueID
|
||||
var endID UniqueID
|
||||
var startID UniqueID = 0
|
||||
var pageStartID UniqueID = 0
|
||||
var pageEndID UniqueID = 0
|
||||
var err error
|
||||
|
||||
fixedAckedTsKey, _ := constructKey(AckedTsTitle, topic)
|
||||
@ -203,7 +176,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
||||
return err
|
||||
}
|
||||
if msgTimeExpiredCheck(ackedTs) {
|
||||
endID = pageID
|
||||
pageEndID = pageID
|
||||
pValue := pageIter.Value()
|
||||
size, err := strconv.ParseInt(string(pValue.Data()), 10, 64)
|
||||
if pValue != nil {
|
||||
@ -219,63 +192,62 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
||||
}
|
||||
}
|
||||
|
||||
pageEndID := endID
|
||||
// TODO(yukun): Remove ackedTs expiredCheck one by one
|
||||
// ackedReadOpts := gorocksdb.NewDefaultReadOptions()
|
||||
// defer ackedReadOpts.Destroy()
|
||||
// ackedReadOpts.SetPrefixSameAsStart(true)
|
||||
// ackedIter := ri.kv.DB.NewIterator(ackedReadOpts)
|
||||
// defer ackedIter.Close()
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// ackedIter.Seek([]byte(fixedAckedTsKey))
|
||||
// if !ackedIter.Valid() {
|
||||
// return nil
|
||||
// }
|
||||
|
||||
ackedReadOpts := gorocksdb.NewDefaultReadOptions()
|
||||
defer ackedReadOpts.Destroy()
|
||||
ackedReadOpts.SetPrefixSameAsStart(true)
|
||||
ackedIter := ri.kv.DB.NewIterator(ackedReadOpts)
|
||||
defer ackedIter.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ackedIter.Seek([]byte(fixedAckedTsKey))
|
||||
if !ackedIter.Valid() {
|
||||
return nil
|
||||
}
|
||||
// startID, err = strconv.ParseInt(string(ackedIter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if endID > startID {
|
||||
// newPos := fixedAckedTsKey + "/" + strconv.FormatInt(endID, 10)
|
||||
// ackedIter.Seek([]byte(newPos))
|
||||
// }
|
||||
|
||||
startID, err = strconv.ParseInt(string(ackedIter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if endID > startID {
|
||||
newPos := fixedAckedTsKey + "/" + strconv.FormatInt(endID, 10)
|
||||
ackedIter.Seek([]byte(newPos))
|
||||
}
|
||||
// for ; ackedIter.Valid(); ackedIter.Next() {
|
||||
// aKey := ackedIter.Key()
|
||||
// aValue := ackedIter.Value()
|
||||
// ackedTs, err := strconv.ParseInt(string(aValue.Data()), 10, 64)
|
||||
// if aValue != nil {
|
||||
// aValue.Free()
|
||||
// }
|
||||
// if err != nil {
|
||||
// if aKey != nil {
|
||||
// aKey.Free()
|
||||
// }
|
||||
// return err
|
||||
// }
|
||||
// if msgTimeExpiredCheck(ackedTs) {
|
||||
// endID, err = strconv.ParseInt(string(aKey.Data())[FixedChannelNameLen+1:], 10, 64)
|
||||
// if aKey != nil {
|
||||
// aKey.Free()
|
||||
// }
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// } else {
|
||||
// if aKey != nil {
|
||||
// aKey.Free()
|
||||
// }
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
|
||||
for ; ackedIter.Valid(); ackedIter.Next() {
|
||||
aKey := ackedIter.Key()
|
||||
aValue := ackedIter.Value()
|
||||
ackedTs, err := strconv.ParseInt(string(aValue.Data()), 10, 64)
|
||||
if aValue != nil {
|
||||
aValue.Free()
|
||||
}
|
||||
if err != nil {
|
||||
if aKey != nil {
|
||||
aKey.Free()
|
||||
}
|
||||
return err
|
||||
}
|
||||
if msgTimeExpiredCheck(ackedTs) {
|
||||
endID, err = strconv.ParseInt(string(aKey.Data())[FixedChannelNameLen+1:], 10, 64)
|
||||
if aKey != nil {
|
||||
aKey.Free()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if aKey != nil {
|
||||
aKey.Free()
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if endID == 0 {
|
||||
if pageEndID == 0 {
|
||||
log.Debug("All messages are not time expired")
|
||||
}
|
||||
log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
|
||||
log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize))
|
||||
|
||||
ackedSizeKey := AckedSizeTitle + topic
|
||||
totalAckedSizeVal, err := ri.kv.Load(ackedSizeKey)
|
||||
@ -303,21 +275,20 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
||||
}
|
||||
curDeleteSize := deletedAckedSize + size
|
||||
if msgSizeExpiredCheck(curDeleteSize, totalAckedSize) {
|
||||
endID, err = strconv.ParseInt(pKeyStr[FixedChannelNameLen+1:], 10, 64)
|
||||
pageEndID, err = strconv.ParseInt(pKeyStr[FixedChannelNameLen+1:], 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pageEndID = endID
|
||||
deletedAckedSize += size
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if endID == 0 {
|
||||
if pageEndID == 0 {
|
||||
log.Debug("All messages are not expired")
|
||||
return nil
|
||||
}
|
||||
log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
|
||||
log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize))
|
||||
|
||||
writeBatch := gorocksdb.NewWriteBatch()
|
||||
defer writeBatch.Destroy()
|
||||
@ -333,14 +304,11 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
||||
}
|
||||
|
||||
ackedStartIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(startID))
|
||||
ackedEndIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(endID+1))
|
||||
if startID > endID {
|
||||
ackedEndIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(pageEndID+1))
|
||||
if startID > pageEndID {
|
||||
return nil
|
||||
} else if startID == endID {
|
||||
writeBatch.Delete([]byte(ackedStartIDKey))
|
||||
} else {
|
||||
writeBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
|
||||
}
|
||||
writeBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
|
||||
|
||||
ll, ok := topicMu.Load(topic)
|
||||
if !ok {
|
||||
@ -363,7 +331,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
||||
newAckedSize := currentAckedSize - deletedAckedSize
|
||||
writeBatch.Put([]byte(ackedSizeKey), []byte(strconv.FormatInt(newAckedSize, 10)))
|
||||
|
||||
err = DeleteMessages(ri.db, topic, startID, endID)
|
||||
err = DeleteMessages(ri.db, topic, startID, pageEndID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -52,6 +52,7 @@ func genRandonName() string {
|
||||
func TestRmqRetention(t *testing.T) {
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 10)
|
||||
atomic.StoreInt64(&TickerTimeInSeconds, 2)
|
||||
defer atomic.StoreInt64(&TickerTimeInSeconds, 6)
|
||||
kvPath := retentionPath + kvPathSuffix
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user