From ca56290e85e7b7d901a438481f9bc9c17689ca70 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Tue, 2 Nov 2021 16:50:31 +0800 Subject: [PATCH] Fix rocksdb retention ts not set (#11081) Signed-off-by: xiaofan-luan --- .../rocksmq/server/rocksmq/rocksmq_impl.go | 40 +- .../server/rocksmq/rocksmq_retention.go | 402 +----------------- .../server/rocksmq/rocksmq_retention_test.go | 209 +-------- 3 files changed, 25 insertions(+), 626 deletions(-) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 7cfffdbf08..4e99be4140 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -261,24 +261,9 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { return err } - // Initialize last retention timestamp to time_now - lastRetentionTsKey := LastRetTsTitle + topicName - timeNow := time.Now().Unix() - err = rmq.kv.Save(lastRetentionTsKey, strconv.FormatInt(timeNow, 10)) - if err != nil { - return nil - } rmq.retentionInfo.mutex.Lock() defer rmq.retentionInfo.mutex.Unlock() - rmq.retentionInfo.topics = append(rmq.retentionInfo.topics, topicName) - // rmq.retentionInfo.pageInfo.Store(topicName, &topicPageInfo{ - // pageEndID: make([]UniqueID, 0), - // pageMsgSize: map[UniqueID]int64{}, - // }) - // rmq.retentionInfo.lastRetentionTime.Store(topicName, timeNow) - // rmq.retentionInfo.ackedInfo.Store(topicName, &topicAckedInfo{ - // ackedTs: map[UniqueID]int64{}, - // }) + rmq.retentionInfo.topics.Store(topicName, time.Now().Unix()) log.Debug("Rocksmq create topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil } @@ -313,6 +298,8 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { if err != nil { return err } + + // just for clean up old topics, for new topics this is not required lastRetTsKey := LastRetTsTitle + topicName err = rmq.kv.Remove(lastRetTsKey) if err != nil { @@ -325,15 +312,8 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { } topicMu.Delete(topicName) - for i, name := range rmq.retentionInfo.topics { - if topicName == name { - rmq.retentionInfo.topics = append(rmq.retentionInfo.topics[:i], rmq.retentionInfo.topics[i+1:]...) - break - } - } - // rmq.retentionInfo.ackedInfo.Delete(topicName) - // rmq.retentionInfo.lastRetentionTime.Delete(topicName) - // rmq.retentionInfo.pageInfo.Delete(topicName) + // clean up retention info + rmq.retentionInfo.topics.Delete(topicName) log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil } @@ -825,11 +805,6 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, if err != nil { return err } - // if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok { - // ackedInfo := info.(*topicAckedInfo) - // ackedInfo.ackedTs[minBeginID] = ts - // rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo) - // } if minBeginID == newID { // Means the begin_id of topic update to newID, so needs to update acked size ackedSizeKey := AckedSizeTitle + topicName @@ -846,11 +821,6 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, if err != nil { return err } - // if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok { - // ackedInfo := info.(*topicAckedInfo) - // ackedInfo.ackedSize = ackedSize - // rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo) - // } } } return nil diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index 628494609b..b161507846 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -14,7 +14,6 @@ package rocksmq import ( "errors" "fmt" - "math" "strconv" "sync" "sync/atomic" @@ -41,29 +40,10 @@ const ( // TickerTimeInSeconds is the time of expired check, default 10 minutes var TickerTimeInSeconds int64 = 10 * MINUTE -type topicPageInfo struct { - pageEndID []UniqueID - pageMsgSize map[UniqueID]int64 -} - -type topicAckedInfo struct { - topicBeginID UniqueID - // TODO(yukun): may need to delete ackedTs - ackedTs map[UniqueID]UniqueID - ackedSize int64 -} - type retentionInfo struct { - topics []string - // pageInfo map[string]*topicPageInfo - pageInfo sync.Map - // ackedInfo map[string]*topicAckedInfo - ackedInfo sync.Map - // Key is last_retention_time/${topic} - // lastRetentionTime map[string]int64 - lastRetentionTime sync.Map - - mutex sync.RWMutex + // key is topic name, value is last retention type + topics sync.Map + mutex sync.RWMutex kv *rocksdbkv.RocksdbKV db *gorocksdb.DB @@ -101,15 +81,12 @@ func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) { func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) { ri := &retentionInfo{ - topics: make([]string, 0), - pageInfo: sync.Map{}, - ackedInfo: sync.Map{}, - lastRetentionTime: sync.Map{}, - mutex: sync.RWMutex{}, - kv: kv, - db: db, - closeCh: make(chan struct{}), - closeWg: sync.WaitGroup{}, + topics: sync.Map{}, + mutex: sync.RWMutex{}, + kv: kv, + db: db, + closeCh: make(chan struct{}), + closeWg: sync.WaitGroup{}, } // Get topic from topic begin id beginIDKeys, _, err := ri.kv.LoadWithPrefix(TopicBeginIDTitle) @@ -118,7 +95,7 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf } for _, key := range beginIDKeys { topic := key[len(TopicBeginIDTitle):] - ri.topics = append(ri.topics, topic) + ri.topics.Store(topic, time.Now().Unix()) topicMu.Store(topic, new(sync.Mutex)) } return ri, nil @@ -129,152 +106,10 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf func (ri *retentionInfo) startRetentionInfo() { // var wg sync.WaitGroup ri.kv.ResetPrefixLength(FixedChannelNameLen) - // for _, topic := range ri.topics { - // log.Debug("Start load retention info", zap.Any("topic", topic)) - // Load all page infos - // wg.Add(1) - // go ri.loadRetentionInfo(topic, &wg) - // } - // wg.Wait() - // log.Debug("Finish load retention info, start retention") ri.closeWg.Add(1) go ri.retention() } -// Read retention infos from rocksdb so that retention check can be done based on memory data -func (ri *retentionInfo) loadRetentionInfo(topic string, wg *sync.WaitGroup) { - // TODO(yukun): If there needs to add lock - // ll, ok := topicMu.Load(topic) - // if !ok { - // return fmt.Errorf("topic name = %s not exist", topic) - // } - // lock, ok := ll.(*sync.Mutex) - // if !ok { - // return fmt.Errorf("get mutex failed, topic name = %s", topic) - // } - // lock.Lock() - // defer lock.Unlock() - defer wg.Done() - pageEndID := make([]UniqueID, 0) - pageMsgSize := make(map[int64]UniqueID) - - fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topic) - if err != nil { - log.Debug("ConstructKey failed", zap.Any("error", err)) - return - } - pageMsgSizePrefix := fixedPageSizeKey + "/" - pageMsgSizeKeys, pageMsgSizeVals, err := prefixLoad(ri.kv.DB, pageMsgSizePrefix) - if err != nil { - log.Debug("PrefixLoad failed", zap.Any("error", err)) - return - } - for i, key := range pageMsgSizeKeys { - endID, err := strconv.ParseInt(key[FixedChannelNameLen+1:], 10, 64) - if err != nil { - log.Debug("ParseInt failed", zap.Any("error", err)) - return - } - pageEndID = append(pageEndID, endID) - - msgSize, err := strconv.ParseInt(pageMsgSizeVals[i], 10, 64) - if err != nil { - log.Debug("ParseInt failed", zap.Any("error", err)) - return - } - pageMsgSize[endID] = msgSize - } - topicPageInfo := &topicPageInfo{ - pageEndID: pageEndID, - pageMsgSize: pageMsgSize, - } - - // Load all acked infos - ackedTs := make(map[UniqueID]UniqueID) - - topicBeginIDKey := TopicBeginIDTitle + topic - topicBeginIDVal, err := ri.kv.Load(topicBeginIDKey) - if err != nil { - return - } - topicBeginID, err := strconv.ParseInt(topicBeginIDVal, 10, 64) - if err != nil { - log.Debug("ParseInt failed", zap.Any("error", err)) - return - } - - ackedTsPrefix, err := constructKey(AckedTsTitle, topic) - if err != nil { - log.Debug("ConstructKey failed", zap.Any("error", err)) - return - } - keys, vals, err := prefixLoad(ri.kv.DB, ackedTsPrefix) - if err != nil { - log.Debug("PrefixLoad failed", zap.Any("error", err)) - return - } - - for i, key := range keys { - offset := FixedChannelNameLen + 1 - ackedID, err := strconv.ParseInt((key)[offset:], 10, 64) - if err != nil { - log.Debug("RocksMQ: parse int " + key[offset:] + " failed") - return - } - - ts, err := strconv.ParseInt(vals[i], 10, 64) - if err != nil { - return - } - ackedTs[ackedID] = ts - } - - ackedSizeKey := AckedSizeTitle + topic - ackedSizeVal, err := ri.kv.Load(ackedSizeKey) - if err != nil { - log.Debug("Load failed", zap.Any("error", err)) - return - } - var ackedSize int64 - if ackedSizeVal == "" { - ackedSize = 0 - } else { - ackedSize, err = strconv.ParseInt(ackedSizeVal, 10, 64) - if err != nil { - log.Debug("PrefixLoad failed", zap.Any("error", err)) - return - } - } - - ackedInfo := &topicAckedInfo{ - topicBeginID: topicBeginID, - ackedTs: ackedTs, - ackedSize: ackedSize, - } - - //Load last retention timestamp - lastRetentionTsKey := LastRetTsTitle + topic - lastRetentionTsVal, err := ri.kv.Load(lastRetentionTsKey) - if err != nil { - log.Debug("Load failed", zap.Any("error", err)) - return - } - var lastRetentionTs int64 - if lastRetentionTsVal == "" { - lastRetentionTs = math.MaxInt64 - } else { - lastRetentionTs, err = strconv.ParseInt(lastRetentionTsVal, 10, 64) - if err != nil { - log.Debug("ParseInt failed", zap.Any("error", err)) - return - } - } - - ri.ackedInfo.Store(topic, ackedInfo) - ri.pageInfo.Store(topic, topicPageInfo) - ri.lastRetentionTime.Store(topic, lastRetentionTs) -} - // retention do time ticker and trigger retention check and operation for each topic func (ri *retentionInfo) retention() error { log.Debug("Rocksmq retention goroutine start!") @@ -290,19 +125,13 @@ func (ri *retentionInfo) retention() error { case t := <-ticker.C: timeNow := t.Unix() checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * MINUTE / 10 - log.Debug("In ticker: ", zap.Any("ticker", timeNow)) ri.mutex.RLock() - for _, topic := range ri.topics { - lastRetentionTsKey := LastRetTsTitle + topic - lastRetentionTsVal, err := ri.kv.Load(lastRetentionTsKey) - if err != nil || lastRetentionTsVal == "" { - log.Warn("Can't get lastRetentionTs", zap.Any("lastRetentionTsKey", lastRetentionTsKey)) - continue - } - lastRetentionTs, err := strconv.ParseInt(lastRetentionTsVal, 10, 64) - if err != nil { - log.Warn("Can't parse lastRetentionTsVal to int", zap.Any("lastRetentionTsKey", lastRetentionTsKey)) - continue + ri.topics.Range(func(k, v interface{}) bool { + topic, _ := k.(string) + lastRetentionTs, ok := v.(int64) + if !ok { + log.Warn("Can't parse lastRetention to int64", zap.String("topic", topic), zap.Any("value", v)) + return true } if lastRetentionTs+checkTime < timeNow { err := ri.newExpiredCleanUp(topic) @@ -310,7 +139,8 @@ func (ri *retentionInfo) retention() error { log.Warn("Retention expired clean failed", zap.Any("error", err)) } } - } + return true + }) ri.mutex.RUnlock() } } @@ -531,202 +361,6 @@ func (ri *retentionInfo) newExpiredCleanUp(topic string) error { return nil } -/* -// 1. Obtain pageAckedInfo and do time expired check, get the expired page scope; -// 2. Do iteration in the page after the last page in step 1 and get the last time expired message id; -// 3. Do size expired check in next page, and get the last size expired message id; -// 4. Do delete by range of [start_msg_id, end_msg_id) in rocksdb -// 5. Delete corresponding data in retentionInfo -func (ri *retentionInfo) expiredCleanUp(topic string) error { - log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic) - var ackedInfo *topicAckedInfo - if info, ok := ri.ackedInfo.Load(topic); ok { - ackedInfo = info.(*topicAckedInfo) - } else { - log.Debug("Topic " + topic + " doesn't have acked infos") - return nil - } - - ll, ok := topicMu.Load(topic) - if !ok { - return fmt.Errorf("topic name = %s not exist", topic) - } - lock, ok := ll.(*sync.Mutex) - if !ok { - return fmt.Errorf("get mutex failed, topic name = %s", topic) - } - lock.Lock() - defer lock.Unlock() - - readOpts := gorocksdb.NewDefaultReadOptions() - defer readOpts.Destroy() - readOpts.SetPrefixSameAsStart(true) - iter := ri.kv.DB.NewIterator(readOpts) - defer iter.Close() - ackedTsPrefix, err := constructKey(AckedTsTitle, topic) - if err != nil { - return err - } - iter.Seek([]byte(ackedTsPrefix)) - if !iter.Valid() { - return nil - } - var startID UniqueID - var endID UniqueID - endID = 0 - startID, err = strconv.ParseInt(string(iter.Key().Data())[FixedChannelNameLen+1:], 10, 64) - if err != nil { - return err - } - - var deletedAckedSize int64 = 0 - pageRetentionOffset := 0 - var pageInfo *topicPageInfo - if info, ok := ri.pageInfo.Load(topic); ok { - pageInfo = info.(*topicPageInfo) - } - if pageInfo != nil { - for i, pageEndID := range pageInfo.pageEndID { - // Clean by RocksmqRetentionTimeInMinutes - if msgTimeExpiredCheck(ackedInfo.ackedTs[pageEndID]) { - // All of the page expired, set the pageEndID to current endID - endID = pageEndID - fixedAckedTsKey, err := constructKey(AckedTsTitle, topic) - if err != nil { - return err - } - newKey := fixedAckedTsKey + "/" + strconv.Itoa(int(pageEndID)) - iter.Seek([]byte(newKey)) - pageRetentionOffset = i + 1 - - deletedAckedSize += pageInfo.pageMsgSize[pageEndID] - delete(pageInfo.pageMsgSize, pageEndID) - } - } - } - log.Debug("Expired check by page info", zap.Any("topic", topic), zap.Any("pageEndID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) - - pageEndID := endID - // The end msg of the page is not expired, find the last expired msg in this page - for ; iter.Valid(); iter.Next() { - ackedTs, err := strconv.ParseInt(string(iter.Value().Data()), 10, 64) - if err != nil { - return err - } - if msgTimeExpiredCheck(ackedTs) { - endID, err = strconv.ParseInt(string(iter.Key().Data())[FixedChannelNameLen+1:], 10, 64) - if err != nil { - return err - } - } else { - break - } - } - log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) - // if endID == 0 { - // log.Debug("All messages are not expired") - // return nil - // } - - // Delete page message size in rocksdb_kv - if pageInfo != nil { - // Judge expire by ackedSize - if msgSizeExpiredCheck(deletedAckedSize, ackedInfo.ackedSize) { - for _, pEndID := range pageInfo.pageEndID[pageRetentionOffset:] { - curDeletedSize := deletedAckedSize + pageInfo.pageMsgSize[pEndID] - if msgSizeExpiredCheck(curDeletedSize, ackedInfo.ackedSize) { - endID = pEndID - pageEndID = pEndID - deletedAckedSize = curDeletedSize - delete(pageInfo.pageMsgSize, pEndID) - } else { - break - } - } - log.Debug("Expired check by retention size", zap.Any("topic", topic), zap.Any("new endID", endID), zap.Any("new deletedAckedSize", deletedAckedSize)) - } - - if pageEndID > 0 && len(pageInfo.pageEndID) > 0 { - pageStartID := pageInfo.pageEndID[0] - fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topic) - if err != nil { - return err - } - pageStartKey := fixedPageSizeKey + "/" + strconv.Itoa(int(pageStartID)) - pageEndKey := fixedPageSizeKey + "/" + strconv.Itoa(int(pageEndID)) - pageWriteBatch := gorocksdb.NewWriteBatch() - defer pageWriteBatch.Clear() - log.Debug("Delete page info", zap.Any("topic", topic), zap.Any("pageStartID", pageStartID), zap.Any("pageEndID", pageEndID)) - if pageStartID == pageEndID { - pageWriteBatch.Delete([]byte(pageStartKey)) - } else if pageStartID < pageEndID { - pageWriteBatch.DeleteRange([]byte(pageStartKey), []byte(pageEndKey)) - } - err = ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), pageWriteBatch) - if err != nil { - log.Error("rocksdb write error", zap.Error(err)) - return err - } - - pageInfo.pageEndID = pageInfo.pageEndID[pageRetentionOffset:] - } - ri.pageInfo.Store(topic, pageInfo) - } - if endID == 0 { - log.Debug("All messages are not expired") - return nil - } - log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) - - // Delete acked_ts in rocksdb_kv - fixedAckedTsTitle, err := constructKey(AckedTsTitle, topic) - if err != nil { - return err - } - ackedStartIDKey := fixedAckedTsTitle + "/" + strconv.Itoa(int(startID)) - ackedEndIDKey := fixedAckedTsTitle + "/" + strconv.Itoa(int(endID)) - ackedTsWriteBatch := gorocksdb.NewWriteBatch() - defer ackedTsWriteBatch.Clear() - if startID > endID { - return nil - } else if startID == endID { - ackedTsWriteBatch.Delete([]byte(ackedStartIDKey)) - } else { - ackedTsWriteBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey)) - } - err = ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), ackedTsWriteBatch) - if err != nil { - log.Error("rocksdb write error", zap.Error(err)) - return err - } - - // Update acked_size in rocksdb_kv - - // Update last retention ts - lastRetentionTsKey := LastRetTsTitle + topic - err = ri.kv.Save(lastRetentionTsKey, strconv.FormatInt(time.Now().Unix(), 10)) - if err != nil { - return err - } - - ackedInfo.ackedSize -= deletedAckedSize - ackedSizeKey := AckedSizeTitle + topic - err = ri.kv.Save(ackedSizeKey, strconv.FormatInt(ackedInfo.ackedSize, 10)) - if err != nil { - return err - } - - for k := range ackedInfo.ackedTs { - if k < endID { - delete(ackedInfo.ackedTs, k) - } - } - ri.ackedInfo.Store(topic, ackedInfo) - - return DeleteMessages(ri.db, topic, startID, endID) -} -*/ - // DeleteMessages in rocksdb by range of [startID, endID) func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error { // Delete msg by range of startID and endID diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go index d3cc9154cd..55347fad8a 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go @@ -15,8 +15,6 @@ import ( "math/rand" "os" "strconv" - "strings" - "sync" "sync/atomic" "testing" "time" @@ -111,14 +109,9 @@ func TestRmqRetention(t *testing.T) { newRes, err := rmq.Consume(topicName, groupName, 1) assert.Nil(t, err) assert.Equal(t, len(newRes), 0) - ////////////////////////////////////////////////// - lastRetTsKey := LastRetTsTitle + topicName - rmq.kv.Save(lastRetTsKey, "") - time.Sleep(time.Duration(checkTimeInterval+1) * time.Second) - - ////////////////////////////////////////////////// - rmq.kv.Save(lastRetTsKey, "dummy") + // test valid value case + rmq.retentionInfo.topics.Store(topicName, "dummy") time.Sleep(time.Duration(checkTimeInterval+1) * time.Second) } @@ -143,204 +136,6 @@ func TestRetentionInfo_InitRetentionInfo(t *testing.T) { assert.Error(t, err) } -func TestRetentionInfo_LoadRetentionInfo(t *testing.T) { - atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0) - atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) - atomic.StoreInt64(&RocksmqPageSize, 100) - kvPath := retentionPath + "kv_" + genRandonName() - defer os.RemoveAll(kvPath) - idAllocator := InitIDAllocator(kvPath) - - rocksdbPath := retentionPath + "db_" + genRandonName() - defer os.RemoveAll(rocksdbPath) - metaPath := retentionPath + "meta_" + genRandonName() - defer os.RemoveAll(metaPath) - - rmq, err := NewRocksMQ(rocksdbPath, idAllocator) - assert.Nil(t, err) - defer rmq.Close() - - topicName := "topic_a" - err = rmq.CreateTopic(topicName) - assert.Nil(t, err) - defer rmq.DestroyTopic(topicName) - - rmq.retentionInfo.startRetentionInfo() - - rmq.retentionInfo.ackedInfo.Delete(topicName) - - msgNum := 100 - pMsgs := make([]ProducerMessage, msgNum) - for i := 0; i < msgNum; i++ { - msg := "message_" + strconv.Itoa(i) - pMsg := ProducerMessage{Payload: []byte(msg)} - pMsgs[i] = pMsg - } - ids, err := rmq.Produce(topicName, pMsgs) - assert.Nil(t, err) - assert.Equal(t, len(pMsgs), len(ids)) - - var wg sync.WaitGroup - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - - groupName := "test_group" - _ = rmq.DestroyConsumerGroup(topicName, groupName) - err = rmq.CreateConsumerGroup(topicName, groupName) - - consumer := &Consumer{ - Topic: topicName, - GroupName: groupName, - } - rmq.RegisterConsumer(consumer) - - assert.Nil(t, err) - cMsgs := make([]ConsumerMessage, 0) - for i := 0; i < msgNum; i++ { - cMsg, err := rmq.Consume(topicName, groupName, 1) - assert.Nil(t, err) - cMsgs = append(cMsgs, cMsg[0]) - } - assert.Equal(t, len(cMsgs), msgNum) - - wg.Add(1) - - ll, ok := topicMu.Load(topicName) - assert.Equal(t, ok, true) - lock, _ := ll.(*sync.Mutex) - lock.Lock() - defer lock.Unlock() - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - - _, err = initRetentionInfo(rmq.retentionInfo.kv, rmq.store) - assert.Nil(t, err) - - dummyTopic := strings.Repeat(topicName, 100) - err = DeleteMessages(rmq.store, dummyTopic, 0, 0) - assert.Error(t, err) - - err = DeleteMessages(rmq.store, topicName, 0, 0) - assert.NoError(t, err) - - ////////////////////////////////////////////////// - ackedTsPrefix, _ := constructKey(AckedTsTitle, topicName) - ackedTsKey0 := ackedTsPrefix + "/1" - err = rmq.retentionInfo.kv.Save(ackedTsKey0, "dummy") - assert.Nil(t, err) - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - err = rmq.retentionInfo.kv.Remove(ackedTsKey0) - assert.Nil(t, err) - - ////////////////////////////////////////////////// - ackedTsKey1 := ackedTsPrefix + "/dummy" - err = rmq.retentionInfo.kv.Save(ackedTsKey1, "dummy") - assert.Nil(t, err) - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - err = rmq.retentionInfo.kv.Remove(ackedTsKey1) - assert.Nil(t, err) - - ////////////////////////////////////////////////// - lastRetentionTsKey := LastRetTsTitle + topicName - err = rmq.retentionInfo.kv.Save(lastRetentionTsKey, strconv.FormatInt(1, 10)) - assert.Nil(t, err) - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - err = rmq.retentionInfo.kv.Remove(lastRetentionTsKey) - assert.Nil(t, err) - - ////////////////////////////////////////////////// - err = rmq.retentionInfo.kv.Save(lastRetentionTsKey, "dummy") - assert.Nil(t, err) - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - err = rmq.retentionInfo.kv.Remove(lastRetentionTsKey) - assert.Nil(t, err) - - ////////////////////////////////////////////////// - ackedSizeKey := AckedSizeTitle + topicName - err = rmq.retentionInfo.kv.Save(ackedSizeKey, strconv.FormatInt(1, 10)) - assert.Nil(t, err) - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - err = rmq.retentionInfo.kv.Remove(ackedSizeKey) - assert.Nil(t, err) - - ////////////////////////////////////////////////// - err = rmq.retentionInfo.kv.Save(ackedSizeKey, "") - assert.Nil(t, err) - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - err = rmq.retentionInfo.kv.Remove(ackedSizeKey) - assert.Nil(t, err) - - ////////////////////////////////////////////////// - err = rmq.retentionInfo.kv.Save(ackedSizeKey, "dummy") - assert.Nil(t, err) - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - err = rmq.retentionInfo.kv.Remove(ackedSizeKey) - assert.Nil(t, err) - - ////////////////////////////////////////////////// - topicBeginIDKey := TopicBeginIDTitle + topicName - err = rmq.retentionInfo.kv.Save(topicBeginIDKey, "dummy") - assert.Nil(t, err) - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - err = rmq.retentionInfo.kv.Remove(topicBeginIDKey) - assert.Nil(t, err) - - //////////////////////////////////////////////////// - fixedPageSizeKey0, _ := constructKey(PageMsgSizeTitle, topicName) - pageMsgSizeKey0 := fixedPageSizeKey0 + "/" + "1" - err = rmq.retentionInfo.kv.Save(pageMsgSizeKey0, "dummy") - assert.Nil(t, err) - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - err = rmq.retentionInfo.kv.Remove(pageMsgSizeKey0) - assert.Nil(t, err) - - ////////////////////////////////////////////////// - fixedPageSizeKey1, _ := constructKey(PageMsgSizeTitle, topicName) - pageMsgSizeKey1 := fixedPageSizeKey1 + "/" + "dummy" - rmq.retentionInfo.kv.Save(pageMsgSizeKey1, "dummy") - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(pageMsgSizeKey1) - - ////////////////////////////////////////////////// - pageMsgPrefix, _ := constructKey(PageMsgSizeTitle, topicName) - pageMsgKey := pageMsgPrefix + "/dummy" - rmq.kv.Save(pageMsgKey, "0") - rmq.retentionInfo.newExpiredCleanUp(topicName) - - ////////////////////////////////////////////////// - rmq.retentionInfo.kv.DB = nil - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - rmq.retentionInfo.kv.Remove(pageMsgSizeKey1) - - ////////////////////////////////////////////////// - longTopic := strings.Repeat("dummy", 100) - wg.Add(1) - rmq.retentionInfo.loadRetentionInfo(longTopic, &wg) - - ////////////////////////////////////////////////// - topicMu.Delete(topicName) - topicMu.Store(topicName, topicName) - rmq.retentionInfo.newExpiredCleanUp(topicName) - - ////////////////////////////////////////////////// - topicMu.Delete(topicName) - rmq.retentionInfo.newExpiredCleanUp(topicName) - - ////////////////////////////////////////////////// - rmq.retentionInfo.ackedInfo.Delete(topicName) - rmq.retentionInfo.newExpiredCleanUp(topicName) -} - func TestRmqRetention_Complex(t *testing.T) { atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 1)