mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 22:45:26 +08:00
Fix rocksdb retention ts not set (#11081)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
c63b804606
commit
ca56290e85
@ -261,24 +261,9 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize last retention timestamp to time_now
|
||||
lastRetentionTsKey := LastRetTsTitle + topicName
|
||||
timeNow := time.Now().Unix()
|
||||
err = rmq.kv.Save(lastRetentionTsKey, strconv.FormatInt(timeNow, 10))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
rmq.retentionInfo.mutex.Lock()
|
||||
defer rmq.retentionInfo.mutex.Unlock()
|
||||
rmq.retentionInfo.topics = append(rmq.retentionInfo.topics, topicName)
|
||||
// rmq.retentionInfo.pageInfo.Store(topicName, &topicPageInfo{
|
||||
// pageEndID: make([]UniqueID, 0),
|
||||
// pageMsgSize: map[UniqueID]int64{},
|
||||
// })
|
||||
// rmq.retentionInfo.lastRetentionTime.Store(topicName, timeNow)
|
||||
// rmq.retentionInfo.ackedInfo.Store(topicName, &topicAckedInfo{
|
||||
// ackedTs: map[UniqueID]int64{},
|
||||
// })
|
||||
rmq.retentionInfo.topics.Store(topicName, time.Now().Unix())
|
||||
log.Debug("Rocksmq create topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
@ -313,6 +298,8 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// just for clean up old topics, for new topics this is not required
|
||||
lastRetTsKey := LastRetTsTitle + topicName
|
||||
err = rmq.kv.Remove(lastRetTsKey)
|
||||
if err != nil {
|
||||
@ -325,15 +312,8 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
||||
}
|
||||
|
||||
topicMu.Delete(topicName)
|
||||
for i, name := range rmq.retentionInfo.topics {
|
||||
if topicName == name {
|
||||
rmq.retentionInfo.topics = append(rmq.retentionInfo.topics[:i], rmq.retentionInfo.topics[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
// rmq.retentionInfo.ackedInfo.Delete(topicName)
|
||||
// rmq.retentionInfo.lastRetentionTime.Delete(topicName)
|
||||
// rmq.retentionInfo.pageInfo.Delete(topicName)
|
||||
// clean up retention info
|
||||
rmq.retentionInfo.topics.Delete(topicName)
|
||||
log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
|
||||
return nil
|
||||
}
|
||||
@ -825,11 +805,6 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok {
|
||||
// ackedInfo := info.(*topicAckedInfo)
|
||||
// ackedInfo.ackedTs[minBeginID] = ts
|
||||
// rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo)
|
||||
// }
|
||||
if minBeginID == newID {
|
||||
// Means the begin_id of topic update to newID, so needs to update acked size
|
||||
ackedSizeKey := AckedSizeTitle + topicName
|
||||
@ -846,11 +821,6 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok {
|
||||
// ackedInfo := info.(*topicAckedInfo)
|
||||
// ackedInfo.ackedSize = ackedSize
|
||||
// rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo)
|
||||
// }
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@ -14,7 +14,6 @@ package rocksmq
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -41,29 +40,10 @@ const (
|
||||
// TickerTimeInSeconds is the time of expired check, default 10 minutes
|
||||
var TickerTimeInSeconds int64 = 10 * MINUTE
|
||||
|
||||
type topicPageInfo struct {
|
||||
pageEndID []UniqueID
|
||||
pageMsgSize map[UniqueID]int64
|
||||
}
|
||||
|
||||
type topicAckedInfo struct {
|
||||
topicBeginID UniqueID
|
||||
// TODO(yukun): may need to delete ackedTs
|
||||
ackedTs map[UniqueID]UniqueID
|
||||
ackedSize int64
|
||||
}
|
||||
|
||||
type retentionInfo struct {
|
||||
topics []string
|
||||
// pageInfo map[string]*topicPageInfo
|
||||
pageInfo sync.Map
|
||||
// ackedInfo map[string]*topicAckedInfo
|
||||
ackedInfo sync.Map
|
||||
// Key is last_retention_time/${topic}
|
||||
// lastRetentionTime map[string]int64
|
||||
lastRetentionTime sync.Map
|
||||
|
||||
mutex sync.RWMutex
|
||||
// key is topic name, value is last retention type
|
||||
topics sync.Map
|
||||
mutex sync.RWMutex
|
||||
|
||||
kv *rocksdbkv.RocksdbKV
|
||||
db *gorocksdb.DB
|
||||
@ -101,15 +81,12 @@ func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) {
|
||||
|
||||
func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) {
|
||||
ri := &retentionInfo{
|
||||
topics: make([]string, 0),
|
||||
pageInfo: sync.Map{},
|
||||
ackedInfo: sync.Map{},
|
||||
lastRetentionTime: sync.Map{},
|
||||
mutex: sync.RWMutex{},
|
||||
kv: kv,
|
||||
db: db,
|
||||
closeCh: make(chan struct{}),
|
||||
closeWg: sync.WaitGroup{},
|
||||
topics: sync.Map{},
|
||||
mutex: sync.RWMutex{},
|
||||
kv: kv,
|
||||
db: db,
|
||||
closeCh: make(chan struct{}),
|
||||
closeWg: sync.WaitGroup{},
|
||||
}
|
||||
// Get topic from topic begin id
|
||||
beginIDKeys, _, err := ri.kv.LoadWithPrefix(TopicBeginIDTitle)
|
||||
@ -118,7 +95,7 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
|
||||
}
|
||||
for _, key := range beginIDKeys {
|
||||
topic := key[len(TopicBeginIDTitle):]
|
||||
ri.topics = append(ri.topics, topic)
|
||||
ri.topics.Store(topic, time.Now().Unix())
|
||||
topicMu.Store(topic, new(sync.Mutex))
|
||||
}
|
||||
return ri, nil
|
||||
@ -129,152 +106,10 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
|
||||
func (ri *retentionInfo) startRetentionInfo() {
|
||||
// var wg sync.WaitGroup
|
||||
ri.kv.ResetPrefixLength(FixedChannelNameLen)
|
||||
// for _, topic := range ri.topics {
|
||||
// log.Debug("Start load retention info", zap.Any("topic", topic))
|
||||
// Load all page infos
|
||||
// wg.Add(1)
|
||||
// go ri.loadRetentionInfo(topic, &wg)
|
||||
// }
|
||||
// wg.Wait()
|
||||
// log.Debug("Finish load retention info, start retention")
|
||||
ri.closeWg.Add(1)
|
||||
go ri.retention()
|
||||
}
|
||||
|
||||
// Read retention infos from rocksdb so that retention check can be done based on memory data
|
||||
func (ri *retentionInfo) loadRetentionInfo(topic string, wg *sync.WaitGroup) {
|
||||
// TODO(yukun): If there needs to add lock
|
||||
// ll, ok := topicMu.Load(topic)
|
||||
// if !ok {
|
||||
// return fmt.Errorf("topic name = %s not exist", topic)
|
||||
// }
|
||||
// lock, ok := ll.(*sync.Mutex)
|
||||
// if !ok {
|
||||
// return fmt.Errorf("get mutex failed, topic name = %s", topic)
|
||||
// }
|
||||
// lock.Lock()
|
||||
// defer lock.Unlock()
|
||||
defer wg.Done()
|
||||
pageEndID := make([]UniqueID, 0)
|
||||
pageMsgSize := make(map[int64]UniqueID)
|
||||
|
||||
fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topic)
|
||||
if err != nil {
|
||||
log.Debug("ConstructKey failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
pageMsgSizePrefix := fixedPageSizeKey + "/"
|
||||
pageMsgSizeKeys, pageMsgSizeVals, err := prefixLoad(ri.kv.DB, pageMsgSizePrefix)
|
||||
if err != nil {
|
||||
log.Debug("PrefixLoad failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
for i, key := range pageMsgSizeKeys {
|
||||
endID, err := strconv.ParseInt(key[FixedChannelNameLen+1:], 10, 64)
|
||||
if err != nil {
|
||||
log.Debug("ParseInt failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
pageEndID = append(pageEndID, endID)
|
||||
|
||||
msgSize, err := strconv.ParseInt(pageMsgSizeVals[i], 10, 64)
|
||||
if err != nil {
|
||||
log.Debug("ParseInt failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
pageMsgSize[endID] = msgSize
|
||||
}
|
||||
topicPageInfo := &topicPageInfo{
|
||||
pageEndID: pageEndID,
|
||||
pageMsgSize: pageMsgSize,
|
||||
}
|
||||
|
||||
// Load all acked infos
|
||||
ackedTs := make(map[UniqueID]UniqueID)
|
||||
|
||||
topicBeginIDKey := TopicBeginIDTitle + topic
|
||||
topicBeginIDVal, err := ri.kv.Load(topicBeginIDKey)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
topicBeginID, err := strconv.ParseInt(topicBeginIDVal, 10, 64)
|
||||
if err != nil {
|
||||
log.Debug("ParseInt failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
|
||||
ackedTsPrefix, err := constructKey(AckedTsTitle, topic)
|
||||
if err != nil {
|
||||
log.Debug("ConstructKey failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
keys, vals, err := prefixLoad(ri.kv.DB, ackedTsPrefix)
|
||||
if err != nil {
|
||||
log.Debug("PrefixLoad failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
|
||||
for i, key := range keys {
|
||||
offset := FixedChannelNameLen + 1
|
||||
ackedID, err := strconv.ParseInt((key)[offset:], 10, 64)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: parse int " + key[offset:] + " failed")
|
||||
return
|
||||
}
|
||||
|
||||
ts, err := strconv.ParseInt(vals[i], 10, 64)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ackedTs[ackedID] = ts
|
||||
}
|
||||
|
||||
ackedSizeKey := AckedSizeTitle + topic
|
||||
ackedSizeVal, err := ri.kv.Load(ackedSizeKey)
|
||||
if err != nil {
|
||||
log.Debug("Load failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
var ackedSize int64
|
||||
if ackedSizeVal == "" {
|
||||
ackedSize = 0
|
||||
} else {
|
||||
ackedSize, err = strconv.ParseInt(ackedSizeVal, 10, 64)
|
||||
if err != nil {
|
||||
log.Debug("PrefixLoad failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
ackedInfo := &topicAckedInfo{
|
||||
topicBeginID: topicBeginID,
|
||||
ackedTs: ackedTs,
|
||||
ackedSize: ackedSize,
|
||||
}
|
||||
|
||||
//Load last retention timestamp
|
||||
lastRetentionTsKey := LastRetTsTitle + topic
|
||||
lastRetentionTsVal, err := ri.kv.Load(lastRetentionTsKey)
|
||||
if err != nil {
|
||||
log.Debug("Load failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
var lastRetentionTs int64
|
||||
if lastRetentionTsVal == "" {
|
||||
lastRetentionTs = math.MaxInt64
|
||||
} else {
|
||||
lastRetentionTs, err = strconv.ParseInt(lastRetentionTsVal, 10, 64)
|
||||
if err != nil {
|
||||
log.Debug("ParseInt failed", zap.Any("error", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
ri.ackedInfo.Store(topic, ackedInfo)
|
||||
ri.pageInfo.Store(topic, topicPageInfo)
|
||||
ri.lastRetentionTime.Store(topic, lastRetentionTs)
|
||||
}
|
||||
|
||||
// retention do time ticker and trigger retention check and operation for each topic
|
||||
func (ri *retentionInfo) retention() error {
|
||||
log.Debug("Rocksmq retention goroutine start!")
|
||||
@ -290,19 +125,13 @@ func (ri *retentionInfo) retention() error {
|
||||
case t := <-ticker.C:
|
||||
timeNow := t.Unix()
|
||||
checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * MINUTE / 10
|
||||
log.Debug("In ticker: ", zap.Any("ticker", timeNow))
|
||||
ri.mutex.RLock()
|
||||
for _, topic := range ri.topics {
|
||||
lastRetentionTsKey := LastRetTsTitle + topic
|
||||
lastRetentionTsVal, err := ri.kv.Load(lastRetentionTsKey)
|
||||
if err != nil || lastRetentionTsVal == "" {
|
||||
log.Warn("Can't get lastRetentionTs", zap.Any("lastRetentionTsKey", lastRetentionTsKey))
|
||||
continue
|
||||
}
|
||||
lastRetentionTs, err := strconv.ParseInt(lastRetentionTsVal, 10, 64)
|
||||
if err != nil {
|
||||
log.Warn("Can't parse lastRetentionTsVal to int", zap.Any("lastRetentionTsKey", lastRetentionTsKey))
|
||||
continue
|
||||
ri.topics.Range(func(k, v interface{}) bool {
|
||||
topic, _ := k.(string)
|
||||
lastRetentionTs, ok := v.(int64)
|
||||
if !ok {
|
||||
log.Warn("Can't parse lastRetention to int64", zap.String("topic", topic), zap.Any("value", v))
|
||||
return true
|
||||
}
|
||||
if lastRetentionTs+checkTime < timeNow {
|
||||
err := ri.newExpiredCleanUp(topic)
|
||||
@ -310,7 +139,8 @@ func (ri *retentionInfo) retention() error {
|
||||
log.Warn("Retention expired clean failed", zap.Any("error", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
ri.mutex.RUnlock()
|
||||
}
|
||||
}
|
||||
@ -531,202 +361,6 @@ func (ri *retentionInfo) newExpiredCleanUp(topic string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
// 1. Obtain pageAckedInfo and do time expired check, get the expired page scope;
|
||||
// 2. Do iteration in the page after the last page in step 1 and get the last time expired message id;
|
||||
// 3. Do size expired check in next page, and get the last size expired message id;
|
||||
// 4. Do delete by range of [start_msg_id, end_msg_id) in rocksdb
|
||||
// 5. Delete corresponding data in retentionInfo
|
||||
func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
||||
log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
|
||||
var ackedInfo *topicAckedInfo
|
||||
if info, ok := ri.ackedInfo.Load(topic); ok {
|
||||
ackedInfo = info.(*topicAckedInfo)
|
||||
} else {
|
||||
log.Debug("Topic " + topic + " doesn't have acked infos")
|
||||
return nil
|
||||
}
|
||||
|
||||
ll, ok := topicMu.Load(topic)
|
||||
if !ok {
|
||||
return fmt.Errorf("topic name = %s not exist", topic)
|
||||
}
|
||||
lock, ok := ll.(*sync.Mutex)
|
||||
if !ok {
|
||||
return fmt.Errorf("get mutex failed, topic name = %s", topic)
|
||||
}
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
readOpts := gorocksdb.NewDefaultReadOptions()
|
||||
defer readOpts.Destroy()
|
||||
readOpts.SetPrefixSameAsStart(true)
|
||||
iter := ri.kv.DB.NewIterator(readOpts)
|
||||
defer iter.Close()
|
||||
ackedTsPrefix, err := constructKey(AckedTsTitle, topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
iter.Seek([]byte(ackedTsPrefix))
|
||||
if !iter.Valid() {
|
||||
return nil
|
||||
}
|
||||
var startID UniqueID
|
||||
var endID UniqueID
|
||||
endID = 0
|
||||
startID, err = strconv.ParseInt(string(iter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var deletedAckedSize int64 = 0
|
||||
pageRetentionOffset := 0
|
||||
var pageInfo *topicPageInfo
|
||||
if info, ok := ri.pageInfo.Load(topic); ok {
|
||||
pageInfo = info.(*topicPageInfo)
|
||||
}
|
||||
if pageInfo != nil {
|
||||
for i, pageEndID := range pageInfo.pageEndID {
|
||||
// Clean by RocksmqRetentionTimeInMinutes
|
||||
if msgTimeExpiredCheck(ackedInfo.ackedTs[pageEndID]) {
|
||||
// All of the page expired, set the pageEndID to current endID
|
||||
endID = pageEndID
|
||||
fixedAckedTsKey, err := constructKey(AckedTsTitle, topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newKey := fixedAckedTsKey + "/" + strconv.Itoa(int(pageEndID))
|
||||
iter.Seek([]byte(newKey))
|
||||
pageRetentionOffset = i + 1
|
||||
|
||||
deletedAckedSize += pageInfo.pageMsgSize[pageEndID]
|
||||
delete(pageInfo.pageMsgSize, pageEndID)
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Debug("Expired check by page info", zap.Any("topic", topic), zap.Any("pageEndID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
|
||||
|
||||
pageEndID := endID
|
||||
// The end msg of the page is not expired, find the last expired msg in this page
|
||||
for ; iter.Valid(); iter.Next() {
|
||||
ackedTs, err := strconv.ParseInt(string(iter.Value().Data()), 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if msgTimeExpiredCheck(ackedTs) {
|
||||
endID, err = strconv.ParseInt(string(iter.Key().Data())[FixedChannelNameLen+1:], 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
|
||||
// if endID == 0 {
|
||||
// log.Debug("All messages are not expired")
|
||||
// return nil
|
||||
// }
|
||||
|
||||
// Delete page message size in rocksdb_kv
|
||||
if pageInfo != nil {
|
||||
// Judge expire by ackedSize
|
||||
if msgSizeExpiredCheck(deletedAckedSize, ackedInfo.ackedSize) {
|
||||
for _, pEndID := range pageInfo.pageEndID[pageRetentionOffset:] {
|
||||
curDeletedSize := deletedAckedSize + pageInfo.pageMsgSize[pEndID]
|
||||
if msgSizeExpiredCheck(curDeletedSize, ackedInfo.ackedSize) {
|
||||
endID = pEndID
|
||||
pageEndID = pEndID
|
||||
deletedAckedSize = curDeletedSize
|
||||
delete(pageInfo.pageMsgSize, pEndID)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
log.Debug("Expired check by retention size", zap.Any("topic", topic), zap.Any("new endID", endID), zap.Any("new deletedAckedSize", deletedAckedSize))
|
||||
}
|
||||
|
||||
if pageEndID > 0 && len(pageInfo.pageEndID) > 0 {
|
||||
pageStartID := pageInfo.pageEndID[0]
|
||||
fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pageStartKey := fixedPageSizeKey + "/" + strconv.Itoa(int(pageStartID))
|
||||
pageEndKey := fixedPageSizeKey + "/" + strconv.Itoa(int(pageEndID))
|
||||
pageWriteBatch := gorocksdb.NewWriteBatch()
|
||||
defer pageWriteBatch.Clear()
|
||||
log.Debug("Delete page info", zap.Any("topic", topic), zap.Any("pageStartID", pageStartID), zap.Any("pageEndID", pageEndID))
|
||||
if pageStartID == pageEndID {
|
||||
pageWriteBatch.Delete([]byte(pageStartKey))
|
||||
} else if pageStartID < pageEndID {
|
||||
pageWriteBatch.DeleteRange([]byte(pageStartKey), []byte(pageEndKey))
|
||||
}
|
||||
err = ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), pageWriteBatch)
|
||||
if err != nil {
|
||||
log.Error("rocksdb write error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
pageInfo.pageEndID = pageInfo.pageEndID[pageRetentionOffset:]
|
||||
}
|
||||
ri.pageInfo.Store(topic, pageInfo)
|
||||
}
|
||||
if endID == 0 {
|
||||
log.Debug("All messages are not expired")
|
||||
return nil
|
||||
}
|
||||
log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
|
||||
|
||||
// Delete acked_ts in rocksdb_kv
|
||||
fixedAckedTsTitle, err := constructKey(AckedTsTitle, topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ackedStartIDKey := fixedAckedTsTitle + "/" + strconv.Itoa(int(startID))
|
||||
ackedEndIDKey := fixedAckedTsTitle + "/" + strconv.Itoa(int(endID))
|
||||
ackedTsWriteBatch := gorocksdb.NewWriteBatch()
|
||||
defer ackedTsWriteBatch.Clear()
|
||||
if startID > endID {
|
||||
return nil
|
||||
} else if startID == endID {
|
||||
ackedTsWriteBatch.Delete([]byte(ackedStartIDKey))
|
||||
} else {
|
||||
ackedTsWriteBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
|
||||
}
|
||||
err = ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), ackedTsWriteBatch)
|
||||
if err != nil {
|
||||
log.Error("rocksdb write error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// Update acked_size in rocksdb_kv
|
||||
|
||||
// Update last retention ts
|
||||
lastRetentionTsKey := LastRetTsTitle + topic
|
||||
err = ri.kv.Save(lastRetentionTsKey, strconv.FormatInt(time.Now().Unix(), 10))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ackedInfo.ackedSize -= deletedAckedSize
|
||||
ackedSizeKey := AckedSizeTitle + topic
|
||||
err = ri.kv.Save(ackedSizeKey, strconv.FormatInt(ackedInfo.ackedSize, 10))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for k := range ackedInfo.ackedTs {
|
||||
if k < endID {
|
||||
delete(ackedInfo.ackedTs, k)
|
||||
}
|
||||
}
|
||||
ri.ackedInfo.Store(topic, ackedInfo)
|
||||
|
||||
return DeleteMessages(ri.db, topic, startID, endID)
|
||||
}
|
||||
*/
|
||||
|
||||
// DeleteMessages in rocksdb by range of [startID, endID)
|
||||
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
|
||||
// Delete msg by range of startID and endID
|
||||
|
||||
@ -15,8 +15,6 @@ import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
@ -111,14 +109,9 @@ func TestRmqRetention(t *testing.T) {
|
||||
newRes, err := rmq.Consume(topicName, groupName, 1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, len(newRes), 0)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
lastRetTsKey := LastRetTsTitle + topicName
|
||||
rmq.kv.Save(lastRetTsKey, "")
|
||||
time.Sleep(time.Duration(checkTimeInterval+1) * time.Second)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
rmq.kv.Save(lastRetTsKey, "dummy")
|
||||
// test valid value case
|
||||
rmq.retentionInfo.topics.Store(topicName, "dummy")
|
||||
time.Sleep(time.Duration(checkTimeInterval+1) * time.Second)
|
||||
}
|
||||
|
||||
@ -143,204 +136,6 @@ func TestRetentionInfo_InitRetentionInfo(t *testing.T) {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestRetentionInfo_LoadRetentionInfo(t *testing.T) {
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0)
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
|
||||
atomic.StoreInt64(&RocksmqPageSize, 100)
|
||||
kvPath := retentionPath + "kv_" + genRandonName()
|
||||
defer os.RemoveAll(kvPath)
|
||||
idAllocator := InitIDAllocator(kvPath)
|
||||
|
||||
rocksdbPath := retentionPath + "db_" + genRandonName()
|
||||
defer os.RemoveAll(rocksdbPath)
|
||||
metaPath := retentionPath + "meta_" + genRandonName()
|
||||
defer os.RemoveAll(metaPath)
|
||||
|
||||
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
|
||||
assert.Nil(t, err)
|
||||
defer rmq.Close()
|
||||
|
||||
topicName := "topic_a"
|
||||
err = rmq.CreateTopic(topicName)
|
||||
assert.Nil(t, err)
|
||||
defer rmq.DestroyTopic(topicName)
|
||||
|
||||
rmq.retentionInfo.startRetentionInfo()
|
||||
|
||||
rmq.retentionInfo.ackedInfo.Delete(topicName)
|
||||
|
||||
msgNum := 100
|
||||
pMsgs := make([]ProducerMessage, msgNum)
|
||||
for i := 0; i < msgNum; i++ {
|
||||
msg := "message_" + strconv.Itoa(i)
|
||||
pMsg := ProducerMessage{Payload: []byte(msg)}
|
||||
pMsgs[i] = pMsg
|
||||
}
|
||||
ids, err := rmq.Produce(topicName, pMsgs)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, len(pMsgs), len(ids))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
|
||||
groupName := "test_group"
|
||||
_ = rmq.DestroyConsumerGroup(topicName, groupName)
|
||||
err = rmq.CreateConsumerGroup(topicName, groupName)
|
||||
|
||||
consumer := &Consumer{
|
||||
Topic: topicName,
|
||||
GroupName: groupName,
|
||||
}
|
||||
rmq.RegisterConsumer(consumer)
|
||||
|
||||
assert.Nil(t, err)
|
||||
cMsgs := make([]ConsumerMessage, 0)
|
||||
for i := 0; i < msgNum; i++ {
|
||||
cMsg, err := rmq.Consume(topicName, groupName, 1)
|
||||
assert.Nil(t, err)
|
||||
cMsgs = append(cMsgs, cMsg[0])
|
||||
}
|
||||
assert.Equal(t, len(cMsgs), msgNum)
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
ll, ok := topicMu.Load(topicName)
|
||||
assert.Equal(t, ok, true)
|
||||
lock, _ := ll.(*sync.Mutex)
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
|
||||
_, err = initRetentionInfo(rmq.retentionInfo.kv, rmq.store)
|
||||
assert.Nil(t, err)
|
||||
|
||||
dummyTopic := strings.Repeat(topicName, 100)
|
||||
err = DeleteMessages(rmq.store, dummyTopic, 0, 0)
|
||||
assert.Error(t, err)
|
||||
|
||||
err = DeleteMessages(rmq.store, topicName, 0, 0)
|
||||
assert.NoError(t, err)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
ackedTsPrefix, _ := constructKey(AckedTsTitle, topicName)
|
||||
ackedTsKey0 := ackedTsPrefix + "/1"
|
||||
err = rmq.retentionInfo.kv.Save(ackedTsKey0, "dummy")
|
||||
assert.Nil(t, err)
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
err = rmq.retentionInfo.kv.Remove(ackedTsKey0)
|
||||
assert.Nil(t, err)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
ackedTsKey1 := ackedTsPrefix + "/dummy"
|
||||
err = rmq.retentionInfo.kv.Save(ackedTsKey1, "dummy")
|
||||
assert.Nil(t, err)
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
err = rmq.retentionInfo.kv.Remove(ackedTsKey1)
|
||||
assert.Nil(t, err)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
lastRetentionTsKey := LastRetTsTitle + topicName
|
||||
err = rmq.retentionInfo.kv.Save(lastRetentionTsKey, strconv.FormatInt(1, 10))
|
||||
assert.Nil(t, err)
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
err = rmq.retentionInfo.kv.Remove(lastRetentionTsKey)
|
||||
assert.Nil(t, err)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
err = rmq.retentionInfo.kv.Save(lastRetentionTsKey, "dummy")
|
||||
assert.Nil(t, err)
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
err = rmq.retentionInfo.kv.Remove(lastRetentionTsKey)
|
||||
assert.Nil(t, err)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
ackedSizeKey := AckedSizeTitle + topicName
|
||||
err = rmq.retentionInfo.kv.Save(ackedSizeKey, strconv.FormatInt(1, 10))
|
||||
assert.Nil(t, err)
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
err = rmq.retentionInfo.kv.Remove(ackedSizeKey)
|
||||
assert.Nil(t, err)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
err = rmq.retentionInfo.kv.Save(ackedSizeKey, "")
|
||||
assert.Nil(t, err)
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
err = rmq.retentionInfo.kv.Remove(ackedSizeKey)
|
||||
assert.Nil(t, err)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
err = rmq.retentionInfo.kv.Save(ackedSizeKey, "dummy")
|
||||
assert.Nil(t, err)
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
err = rmq.retentionInfo.kv.Remove(ackedSizeKey)
|
||||
assert.Nil(t, err)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
topicBeginIDKey := TopicBeginIDTitle + topicName
|
||||
err = rmq.retentionInfo.kv.Save(topicBeginIDKey, "dummy")
|
||||
assert.Nil(t, err)
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
err = rmq.retentionInfo.kv.Remove(topicBeginIDKey)
|
||||
assert.Nil(t, err)
|
||||
|
||||
////////////////////////////////////////////////////
|
||||
fixedPageSizeKey0, _ := constructKey(PageMsgSizeTitle, topicName)
|
||||
pageMsgSizeKey0 := fixedPageSizeKey0 + "/" + "1"
|
||||
err = rmq.retentionInfo.kv.Save(pageMsgSizeKey0, "dummy")
|
||||
assert.Nil(t, err)
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
err = rmq.retentionInfo.kv.Remove(pageMsgSizeKey0)
|
||||
assert.Nil(t, err)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
fixedPageSizeKey1, _ := constructKey(PageMsgSizeTitle, topicName)
|
||||
pageMsgSizeKey1 := fixedPageSizeKey1 + "/" + "dummy"
|
||||
rmq.retentionInfo.kv.Save(pageMsgSizeKey1, "dummy")
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
rmq.retentionInfo.kv.Remove(pageMsgSizeKey1)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
pageMsgPrefix, _ := constructKey(PageMsgSizeTitle, topicName)
|
||||
pageMsgKey := pageMsgPrefix + "/dummy"
|
||||
rmq.kv.Save(pageMsgKey, "0")
|
||||
rmq.retentionInfo.newExpiredCleanUp(topicName)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
rmq.retentionInfo.kv.DB = nil
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(topicName, &wg)
|
||||
rmq.retentionInfo.kv.Remove(pageMsgSizeKey1)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
longTopic := strings.Repeat("dummy", 100)
|
||||
wg.Add(1)
|
||||
rmq.retentionInfo.loadRetentionInfo(longTopic, &wg)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
topicMu.Delete(topicName)
|
||||
topicMu.Store(topicName, topicName)
|
||||
rmq.retentionInfo.newExpiredCleanUp(topicName)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
topicMu.Delete(topicName)
|
||||
rmq.retentionInfo.newExpiredCleanUp(topicName)
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
rmq.retentionInfo.ackedInfo.Delete(topicName)
|
||||
rmq.retentionInfo.newExpiredCleanUp(topicName)
|
||||
}
|
||||
|
||||
func TestRmqRetention_Complex(t *testing.T) {
|
||||
atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0)
|
||||
atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 1)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user