diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 7d00cc8061..d74d699b86 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -108,6 +108,10 @@ func constructKey(metaName, topic string) (string, error) { return metaName + topic + string(nameBytes), nil } +// checkRetention reports whether neither retention setting (time, size) is -1. +func checkRetention() bool { + return !(RocksmqRetentionTimeInMinutes == -1 || RocksmqRetentionSizeInMB == -1) +} var topicMu sync.Map = sync.Map{} type rocksmq struct { @@ -161,8 +165,10 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro } rmq.retentionInfo = ri - rmq.retentionInfo.startRetentionInfo() - log.Debug("Rocksmq start successfully ", zap.String("name", name)) + if checkRetention() { + rmq.retentionInfo.startRetentionInfo() + } + return rmq, nil } @@ -261,15 +267,17 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { if err != nil { return nil } + rmq.retentionInfo.mutex.Lock() + defer rmq.retentionInfo.mutex.Unlock() rmq.retentionInfo.topics = append(rmq.retentionInfo.topics, topicName) - rmq.retentionInfo.pageInfo.Store(topicName, &topicPageInfo{ - pageEndID: make([]UniqueID, 0), - pageMsgSize: map[UniqueID]int64{}, - }) - rmq.retentionInfo.lastRetentionTime.Store(topicName, timeNow) - rmq.retentionInfo.ackedInfo.Store(topicName, &topicAckedInfo{ - ackedTs: map[UniqueID]int64{}, - }) + // rmq.retentionInfo.pageInfo.Store(topicName, &topicPageInfo{ + // pageEndID: make([]UniqueID, 0), + // pageMsgSize: map[UniqueID]int64{}, + // }) + // rmq.retentionInfo.lastRetentionTime.Store(topicName, timeNow) + // rmq.retentionInfo.ackedInfo.Store(topicName, &topicAckedInfo{ + // ackedTs: map[UniqueID]int64{}, + // }) log.Debug("Rocksmq create topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil } @@ -315,9 +323,15 @@ func (rmq *rocksmq) 
DestroyTopic(topicName string) error { } topicMu.Delete(topicName) - rmq.retentionInfo.ackedInfo.Delete(topicName) - rmq.retentionInfo.lastRetentionTime.Delete(topicName) - rmq.retentionInfo.pageInfo.Delete(topicName) + for i, name := range rmq.retentionInfo.topics { + if topicName == name { + rmq.retentionInfo.topics = append(rmq.retentionInfo.topics[:i], rmq.retentionInfo.topics[i+1:]...) + break + } + } + // rmq.retentionInfo.ackedInfo.Delete(topicName) + // rmq.retentionInfo.lastRetentionTime.Delete(topicName) + // rmq.retentionInfo.pageInfo.Delete(topicName) log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil } @@ -527,11 +541,11 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes return err } - if pageInfo, ok := rmq.retentionInfo.pageInfo.Load(topicName); ok { - pageInfo.(*topicPageInfo).pageEndID = append(pageInfo.(*topicPageInfo).pageEndID, pageEndID) - pageInfo.(*topicPageInfo).pageMsgSize[pageEndID] = newPageSize - rmq.retentionInfo.pageInfo.Store(topicName, pageInfo) - } + // if pageInfo, ok := rmq.retentionInfo.pageInfo.Load(topicName); ok { + // pageInfo.(*topicPageInfo).pageEndID = append(pageInfo.(*topicPageInfo).pageEndID, pageEndID) + // pageInfo.(*topicPageInfo).pageMsgSize[pageEndID] = newPageSize + // rmq.retentionInfo.pageInfo.Store(topicName, pageInfo) + // } // Update message size to 0 err = rmq.kv.Save(msgSizeKey, strconv.FormatInt(0, 10)) @@ -779,11 +793,11 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, if err != nil { return err } - if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok { - ackedInfo := info.(*topicAckedInfo) - ackedInfo.ackedTs[minBeginID] = ts - rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo) - } + // if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok { + // ackedInfo := info.(*topicAckedInfo) + // ackedInfo.ackedTs[minBeginID] = 
ts + // rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo) + // } if minBeginID == newID { // Means the begin_id of topic update to newID, so needs to update acked size ackedSizeKey := AckedSizeTitle + topicName @@ -800,11 +814,11 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, newID UniqueID, if err != nil { return err } - if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok { - ackedInfo := info.(*topicAckedInfo) - ackedInfo.ackedSize = ackedSize - rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo) - } + // if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok { + // ackedInfo := info.(*topicAckedInfo) + // ackedInfo.ackedSize = ackedSize + // rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo) + // } } } return nil diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index 559a3aaf3b..60f1f07078 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -33,8 +33,8 @@ var RocksmqRetentionTimeInMinutes int64 // RocksmqRetentionSizeInMB is the size of retention var RocksmqRetentionSizeInMB int64 -// TickerTimeInMinutes is the time of expired check -var TickerTimeInMinutes int64 = 1 +// TickerTimeInSeconds is the time of expired check +var TickerTimeInSeconds int64 = 6 // Const value that used to convert unit const ( @@ -66,6 +66,8 @@ type retentionInfo struct { // lastRetentionTime map[string]int64 lastRetentionTime sync.Map + mutex sync.RWMutex + kv *rocksdbkv.RocksdbKV db *gorocksdb.DB } @@ -105,6 +107,7 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf pageInfo: sync.Map{}, ackedInfo: sync.Map{}, lastRetentionTime: sync.Map{}, + mutex: sync.RWMutex{}, kv: kv, db: db, } @@ -124,19 +127,16 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf // Before do retention, load retention info from 
rocksdb to retention info structure in goroutines. // Because loadRetentionInfo may need some time, so do this asynchronously. Finally start retention goroutine. func (ri *retentionInfo) startRetentionInfo() { - var wg sync.WaitGroup - err := ri.kv.ResetPrefixLength(FixedChannelNameLen) - if err != nil { - log.Warn("Start load retention info", zap.Error(err)) - } - for _, topic := range ri.topics { - log.Debug("Start load retention info", zap.Any("topic", topic)) - // Load all page infos - wg.Add(1) - go ri.loadRetentionInfo(topic, &wg) - } - wg.Wait() - log.Debug("Finish load retention info, start retention") + // The per-topic preload (loadRetentionInfo) is no longer run here; the + // retention goroutine loads what it needs from rocksdb on every tick. + err := ri.kv.ResetPrefixLength(FixedChannelNameLen) + if err != nil { + // Best effort: log the failure and still start the retention + // goroutine, instead of silently dropping the error. + log.Warn("Start load retention info", zap.Error(err)) + } + // The retention loop runs in its own goroutine; startRetentionInfo + // returns immediately without waiting for it. go ri.retention() } @@ -277,7 +277,7 @@ func (ri *retentionInfo) loadRetentionInfo(topic string, wg *sync.WaitGroup) { func (ri *retentionInfo) retention() error { log.Debug("Rocksmq retention goroutine start!") // Do retention check every 6s - ticker := time.NewTicker(time.Duration(TickerTimeInMinutes * int64(time.Minute) / 10)) + ticker := time.NewTicker(time.Duration(TickerTimeInSeconds * int64(time.Second))) for { select { @@ -286,21 +286,267 @@ func (ri *retentionInfo) retention() error { return nil case t := <-ticker.C: timeNow := t.Unix() - checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * 60 / 10 + checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * MINUTE / 10 log.Debug("In ticker: ", zap.Any("ticker", timeNow)) - ri.lastRetentionTime.Range(func(k, v interface{}) bool { - if v.(int64)+checkTime < timeNow { - err := ri.expiredCleanUp(k.(string)) + ri.mutex.RLock() + for _, topic := range ri.topics { + lastRetentionTsKey := LastRetTsTitle + topic + lastRetentionTsVal, err := 
ri.kv.Load(lastRetentionTsKey) + if err != nil || lastRetentionTsVal == "" { + log.Warn("Can't get lastRetentionTs", zap.Any("lastRetentionTsKey", lastRetentionTsKey)) + continue + } + lastRetentionTs, err := strconv.ParseInt(lastRetentionTsVal, 10, 64) + if err != nil { + log.Warn("Can't parse lastRetentionTsVal to int", zap.Any("lastRetentionTsKey", lastRetentionTsKey)) + continue + } + if lastRetentionTs+checkTime < timeNow { + err := ri.newExpiredCleanUp(topic) if err != nil { log.Warn("Retention expired clean failed", zap.Any("error", err)) } } - return true - }) + } + ri.mutex.RUnlock() + // ri.lastRetentionTime.Range(func(k, v interface{}) bool { + // if v.(int64)+checkTime < timeNow { + // err := ri.newExpiredCleanUp(k.(string)) + // if err != nil { + // log.Warn("Retention expired clean failed", zap.Any("error", err)) + // } + // } + // return true + // }) } } } +// newExpiredCleanUp deletes the expired data of topic from rocksdb. It holds +// the topic's mutex from topicMu for the whole call and works in three steps: +// pages whose acked timestamp passes msgTimeExpiredCheck, then acked timestamp +// entries from the last such page on that also pass it, then further pages +// accepted by msgSizeExpiredCheck. The page size entries, the acked timestamp +// entries and the acked size are updated in one write batch before the +// messages in [startID, endID) are removed with DeleteMessages. +// It returns an error when the topic has no usable mutex, a key cannot be +// built, or a stored value cannot be loaded, parsed or written. +func (ri *retentionInfo) newExpiredCleanUp(topic string) error { + log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic) + // Take the per-topic mutex stored in topicMu for the rest of the call. + ll, ok := topicMu.Load(topic) + if !ok { + return fmt.Errorf("topic name = %s not exist", topic) + } + lock, ok := ll.(*sync.Mutex) + if !ok { + return fmt.Errorf("get mutex failed, topic name = %s", topic) + } + lock.Lock() + defer lock.Unlock() + + var deletedAckedSize int64 = 0 + var startID UniqueID + var endID UniqueID + var pageStartID UniqueID = 0 + + fixedAckedTsKey, err := constructKey(AckedTsTitle, topic) + if err != nil { + return err + } + + pageReadOpts := gorocksdb.NewDefaultReadOptions() + defer pageReadOpts.Destroy() + pageReadOpts.SetPrefixSameAsStart(true) + pageIter := ri.kv.DB.NewIterator(pageReadOpts) + defer pageIter.Close() + pageMsgPrefix, err := constructKey(PageMsgSizeTitle, topic) + if err != nil { + return err + } + pageIter.Seek([]byte(pageMsgPrefix)) + // Step 1: collect the leading pages whose acked timestamp is time-expired. + if pageIter.Valid() { + pageStartID, err = strconv.ParseInt(string(pageIter.Key().Data())[FixedChannelNameLen+1:], 10, 64) + if err != nil { + return err + } + + for ; pageIter.Valid(); pageIter.Next() { + pKey := pageIter.Key() + 
pageID, err := strconv.ParseInt(string(pKey.Data())[FixedChannelNameLen+1:], 10, 64) + if pKey != nil { + pKey.Free() + } + if err != nil { + return err + } + + ackedTsKey := fixedAckedTsKey + "/" + strconv.FormatInt(pageID, 10) + ackedTsVal, err := ri.kv.Load(ackedTsKey) + if err != nil { + return err + } + // An empty value (presumably: no acked timestamp stored yet; confirm + // against kv.Load) ends the page scan instead of failing on ParseInt. + if ackedTsVal == "" { + break + } + ackedTs, err := strconv.ParseInt(ackedTsVal, 10, 64) + if err != nil { + return err + } + if msgTimeExpiredCheck(ackedTs) { + endID = pageID + pValue := pageIter.Value() + size, err := strconv.ParseInt(string(pValue.Data()), 10, 64) + if pValue != nil { + pValue.Free() + } + if err != nil { + return err + } + deletedAckedSize += size + } else { + break + } + } + } + + pageEndID := endID + + // Step 2: scan the acked timestamp entries (from endID on, if step 1 moved it) + // and advance endID while entries are time-expired. + ackedReadOpts := gorocksdb.NewDefaultReadOptions() + defer ackedReadOpts.Destroy() + ackedReadOpts.SetPrefixSameAsStart(true) + ackedIter := ri.kv.DB.NewIterator(ackedReadOpts) + defer ackedIter.Close() + ackedIter.Seek([]byte(fixedAckedTsKey)) + if !ackedIter.Valid() { + return nil + } + + startID, err = strconv.ParseInt(string(ackedIter.Key().Data())[FixedChannelNameLen+1:], 10, 64) + if err != nil { + return err + } + if endID > startID { + newPos := fixedAckedTsKey + "/" + strconv.FormatInt(endID, 10) + ackedIter.Seek([]byte(newPos)) + } + + for ; ackedIter.Valid(); ackedIter.Next() { + aKey := ackedIter.Key() + aValue := ackedIter.Value() + ackedTs, err := strconv.ParseInt(string(aValue.Data()), 10, 64) + if aValue != nil { + aValue.Free() + } + if err != nil { + if aKey != nil { + aKey.Free() + } + return err + } + if msgTimeExpiredCheck(ackedTs) { + endID, err = strconv.ParseInt(string(aKey.Data())[FixedChannelNameLen+1:], 10, 64) + if aKey != nil { + aKey.Free() + } + if err != nil { + return err + } + } else { + if aKey != nil { + aKey.Free() + } + break + } + } + + if endID == 0 { + log.Debug("All messaged are not time expired") + } + log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("startID", 
startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) + + ackedSizeKey := AckedSizeTitle + topic + totalAckedSizeVal, err := ri.kv.Load(ackedSizeKey) + if err != nil { + return err + } + totalAckedSize, err := strconv.ParseInt(totalAckedSizeVal, 10, 64) + if err != nil { + return err + } + + // Step 3: keep consuming pages while msgSizeExpiredCheck accepts the running total. + for ; pageIter.Valid(); pageIter.Next() { + pValue := pageIter.Value() + size, err := strconv.ParseInt(string(pValue.Data()), 10, 64) + if pValue != nil { + pValue.Free() + } + if err != nil { + return err + } + curDeleteSize := deletedAckedSize + size + if msgSizeExpiredCheck(curDeleteSize, totalAckedSize) { + pKey := pageIter.Key() + endID, err = strconv.ParseInt(string(pKey.Data())[FixedChannelNameLen+1:], 10, 64) + if pKey != nil { + pKey.Free() + } + if err != nil { + return err + } + pageEndID = endID + deletedAckedSize += size + } else { + break + } + } + if endID == 0 { + log.Debug("All messages are not expired") + return nil + } + log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) + + writeBatch := gorocksdb.NewWriteBatch() + defer writeBatch.Destroy() + + pageStartIDKey := pageMsgPrefix + "/" + strconv.FormatInt(pageStartID, 10) + pageEndIDKey := pageMsgPrefix + "/" + strconv.FormatInt(pageEndID, 10) + if pageStartID == pageEndID { + writeBatch.Delete([]byte(pageStartIDKey)) + } else if pageStartID < pageEndID { + writeBatch.DeleteRange([]byte(pageStartIDKey), []byte(pageEndIDKey)) + } + + ackedStartIDKey := fixedAckedTsKey + "/" + strconv.FormatInt(startID, 10) + ackedEndIDKey := fixedAckedTsKey + "/" + strconv.FormatInt(endID, 10) + if startID > endID { + return nil + } else if startID == endID { + writeBatch.Delete([]byte(ackedStartIDKey)) + } else { + writeBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey)) + } + + newAckedSize := totalAckedSize - deletedAckedSize + writeBatch.Put([]byte(ackedSizeKey), 
[]byte(strconv.FormatInt(newAckedSize, 10))) + writeOpts := gorocksdb.NewDefaultWriteOptions() + defer writeOpts.Destroy() + // Commit the metadata changes first; skip the message deletion if that fails. + err = ri.kv.DB.Write(writeOpts, writeBatch) + if err != nil { + return err + } + + return DeleteMessages(ri.db, topic, startID, endID) +} + +/* // 1. Obtain pageAckedInfo and do time expired check, get the expired page scope; // 2. Do iteration in the page after the last page in step 1 and get the last time expired message id; // 3. Do size expired check in next page, and get the last size expired message id; @@ -494,6 +740,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { return DeleteMessages(ri.db, topic, startID, endID) } +*/ // DeleteMessages in rocksdb by range of [startID, endID) func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error { diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go index b1cb5384c2..343db3be3d 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go @@ -52,6 +52,8 @@ func genRandonName() string { func TestRmqRetention(t *testing.T) { atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0) + atomic.StoreInt64(&TickerTimeInSeconds, 2) + defer atomic.StoreInt64(&TickerTimeInSeconds, 6) kvPath := retentionPath + kvPathSuffix defer os.RemoveAll(kvPath) idAllocator := InitIDAllocator(kvPath) @@ -100,7 +102,7 @@ func TestRmqRetention(t *testing.T) { } assert.Equal(t, len(cMsgs), msgNum) - checkTimeInterval := 6 + checkTimeInterval := 2 time.Sleep(time.Duration(checkTimeInterval+1) * time.Second) // Seek to a previous consumed message, the message should be clean up err = rmq.Seek(topicName, groupName, cMsgs[msgNum/2].MsgID) @@ -108,6 +110,15 @@ func TestRmqRetention(t *testing.T) { newRes, err := rmq.Consume(topicName, groupName, 1) assert.Nil(t, err) assert.Equal(t, len(newRes), 0) + + 
////////////////////////////////////////////////// + lastRetTsKey := LastRetTsTitle + topicName + rmq.kv.Save(lastRetTsKey, "") + time.Sleep(time.Duration(checkTimeInterval+1) * time.Second) + + ////////////////////////////////////////////////// + rmq.kv.Save(lastRetTsKey, "dummy") + time.Sleep(time.Duration(checkTimeInterval+1) * time.Second) } func TestRetentionInfo_InitRetentionInfo(t *testing.T) { @@ -293,35 +304,40 @@ func TestRetentionInfo_LoadRetentionInfo(t *testing.T) { ////////////////////////////////////////////////// fixedPageSizeKey1, _ := constructKey(PageMsgSizeTitle, topicName) pageMsgSizeKey1 := fixedPageSizeKey1 + "/" + "dummy" - err = rmq.retentionInfo.kv.Save(pageMsgSizeKey1, "dummy") - assert.Nil(t, err) + rmq.retentionInfo.kv.Save(pageMsgSizeKey1, "dummy") wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) - err = rmq.retentionInfo.kv.Remove(pageMsgSizeKey1) - assert.Nil(t, err) + rmq.retentionInfo.kv.Remove(pageMsgSizeKey1) ////////////////////////////////////////////////// - topicMu.Delete(topicName) - topicMu.Store(topicName, &sync.Mutex{}) - err = rmq.retentionInfo.expiredCleanUp(topicName) - assert.Nil(t, err) - - ////////////////////////////////////////////////// - topicMu.Delete(topicName) - err = rmq.retentionInfo.expiredCleanUp(topicName) - // TopicName has been deleted in line 310 - assert.NotNil(t, err) - - ////////////////////////////////////////////////// - rmq.retentionInfo.ackedInfo.Delete(topicName) - err = rmq.retentionInfo.expiredCleanUp(topicName) - assert.Nil(t, err) + pageMsgPrefix, _ := constructKey(PageMsgSizeTitle, topicName) + pageMsgKey := pageMsgPrefix + "/dummy" + rmq.kv.Save(pageMsgKey, "0") + rmq.retentionInfo.newExpiredCleanUp(topicName) ////////////////////////////////////////////////// rmq.retentionInfo.kv.DB = nil wg.Add(1) rmq.retentionInfo.loadRetentionInfo(topicName, &wg) + rmq.retentionInfo.kv.Remove(pageMsgSizeKey1) + ////////////////////////////////////////////////// + longTopic := 
strings.Repeat("dummy", 100) + wg.Add(1) + rmq.retentionInfo.loadRetentionInfo(longTopic, &wg) + + ////////////////////////////////////////////////// + topicMu.Delete(topicName) + topicMu.Store(topicName, topicName) + rmq.retentionInfo.newExpiredCleanUp(topicName) + + ////////////////////////////////////////////////// + topicMu.Delete(topicName) + rmq.retentionInfo.newExpiredCleanUp(topicName) + + ////////////////////////////////////////////////// + rmq.retentionInfo.ackedInfo.Delete(topicName) + rmq.retentionInfo.newExpiredCleanUp(topicName) } func TestRmqRetention_Complex(t *testing.T) { @@ -394,13 +410,13 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) { atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0) atomic.StoreInt64(&RocksmqPageSize, 10) kvPath := retentionPath + "kv_com1" - defer os.RemoveAll(kvPath) + os.RemoveAll(kvPath) idAllocator := InitIDAllocator(kvPath) rocksdbPath := retentionPath + "db_com1" - defer os.RemoveAll(rocksdbPath) + os.RemoveAll(rocksdbPath) metaPath := retentionPath + "meta_kv_com1" - defer os.RemoveAll(metaPath) + os.RemoveAll(metaPath) rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err)