diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index f1eb96e5fe..1d55a6a664 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -530,12 +530,6 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes return err } - // if pageInfo, ok := rmq.retentionInfo.pageInfo.Load(topicName); ok { - // pageInfo.(*topicPageInfo).pageEndID = append(pageInfo.(*topicPageInfo).pageEndID, pageEndID) - // pageInfo.(*topicPageInfo).pageMsgSize[pageEndID] = newPageSize - // rmq.retentionInfo.pageInfo.Store(topicName, pageInfo) - // } - // Update message size to 0 err = rmq.kv.Save(msgSizeKey, strconv.FormatInt(0, 10)) if err != nil { @@ -638,10 +632,8 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum } consumedIDs := make([]UniqueID, 0, len(consumerMessage)) - msgSize := make([]int64, 0, len(consumerMessage)) for _, msg := range consumerMessage { consumedIDs = append(consumedIDs, msg.MsgID) - msgSize = append(msgSize, int64(len(msg.Payload))) } newID := consumedIDs[len(consumedIDs)-1] err = rmq.seek(topicName, groupName, newID) @@ -650,7 +642,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum return nil, err } - go rmq.updateAckedInfo(topicName, groupName, consumedIDs, msgSize) + go rmq.updateAckedInfo(topicName, groupName, consumedIDs) return consumerMessage, nil } @@ -771,7 +763,7 @@ func (rmq *rocksmq) Notify(topicName, groupName string) { } // updateAckedInfo update acked informations for retention after consume -func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID, msgSize []int64) error { +func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID) error { if len(ids) == 0 { return nil } @@ -786,24 +778,67 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID, lock.Lock() defer lock.Unlock() + firstID := ids[0] lastID := ids[len(ids)-1] fixedBeginIDKey, err := constructKey(BeginIDTitle, topicName) if err != nil { return err } - // Update begin_id for the consumer_group + + // 1. Update begin_id for the consumer_group beginIDKey := fixedBeginIDKey + "/" + groupName err = rmq.kv.Save(beginIDKey, strconv.FormatInt(lastID, 10)) if err != nil { return err } - // Update begin_id for topic + // 2. Try to get the page id between first ID and last ID of ids + pageMsgPrefix, err := constructKey(PageMsgSizeTitle, topicName) + if err != nil { + return err + } + readOpts := gorocksdb.NewDefaultReadOptions() + defer readOpts.Destroy() + readOpts.SetPrefixSameAsStart(true) + iter := rmq.kv.(*rocksdbkv.RocksdbKV).DB.NewIterator(readOpts) + defer iter.Close() + + pageIDs := make([]UniqueID, 0) + pageMsgKey := pageMsgPrefix + "/" + strconv.FormatInt(firstID, 10) + for iter.Seek([]byte(pageMsgKey)); iter.Valid(); iter.Next() { + key := iter.Key() + pageID, err := strconv.ParseInt(string(key.Data())[FixedChannelNameLen+1:], 10, 64) + if key != nil { + key.Free() + } + if err != nil { + return err + } + if pageID <= lastID { + pageIDs = append(pageIDs, pageID) + } else { + break + } + } + if len(pageIDs) == 0 { + return nil + } + + fixedAckedTsKey, err := constructKey(AckedTsTitle, topicName) + if err != nil { + return err + } + + // 3. Update acked ts and acked size for pageIDs if vals, ok := rmq.consumers.Load(topicName); ok { var minBeginID int64 = math.MaxInt64 - for _, v := range vals.([]*Consumer) { - curBeginIDKey := fixedBeginIDKey + "/" + v.GroupName + consumers, ok := vals.([]*Consumer) + if !ok || len(consumers) == 0 { + return nil + } + for _, v := range consumers { + curBeginIDKey := path.Join(fixedBeginIDKey, v.GroupName) curBeginIDVal, err := rmq.kv.Load(curBeginIDKey) if err != nil { return err @@ -816,52 +851,53 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID, minBeginID = curBeginID } } - topicBeginIDKey := TopicBeginIDTitle + topicName - err = rmq.kv.Save(topicBeginIDKey, strconv.FormatInt(minBeginID, 10)) - if err != nil { - return err - } - // Update acked info for msg of begin id - fixedAckedTsKey, err := constructKey(AckedTsTitle, topicName) - if err != nil { - return err - } - - ts := strconv.FormatInt(time.Now().Unix(), 10) - // current behavior is to ack all safe msgID(before minBeginID) - // TODO @silverxia @yukun ack only page separator msg id - ackMsgKvs := make(map[string]string) + nowTs := strconv.FormatInt(time.Now().Unix(), 10) + ackedTsKvs := make(map[string]string) totalAckMsgSize := int64(0) - for i, id := range ids { - // depends on the ids are monotonically increasing - if id <= minBeginID { - totalAckMsgSize += msgSize[i] - key := path.Join(fixedAckedTsKey, strconv.FormatInt(id, 10)) - ackMsgKvs[key] = ts - } - } - err = rmq.kv.MultiSave(ackMsgKvs) + + fixedPageSizeKey, err := constructKey(PageMsgSizeTitle, topicName) if err != nil { return err } - if minBeginID == lastID { - // Means the begin_id of topic update to newID, so needs to update acked size - ackedSizeKey := AckedSizeTitle + topicName - ackedSizeVal, err := rmq.kv.Load(ackedSizeKey) - if err != nil { - return err - } - ackedSize, err := strconv.ParseInt(ackedSizeVal, 10, 64) - if err != nil { - return err - } - ackedSize += totalAckMsgSize - err = rmq.kv.Save(ackedSizeKey, strconv.FormatInt(ackedSize, 10)) - if err != nil { - return err + for _, pID := range pageIDs { + if pID <= minBeginID { + // Update acked info for message pID + pageAckedTsKey := path.Join(fixedAckedTsKey, strconv.FormatInt(pID, 10)) + ackedTsKvs[pageAckedTsKey] = nowTs + + // get current page message size + pageMsgSizeKey := path.Join(fixedPageSizeKey, strconv.FormatInt(pID, 10)) + pageMsgSizeVal, err := rmq.kv.Load(pageMsgSizeKey) + if err != nil { + return err + } + pageMsgSize, err := strconv.ParseInt(pageMsgSizeVal, 10, 64) + if err != nil { + return err + } + totalAckMsgSize += pageMsgSize } } + err = rmq.kv.MultiSave(ackedTsKvs) + if err != nil { + return err + } + + ackedSizeKey := AckedSizeTitle + topicName + ackedSizeVal, err := rmq.kv.Load(ackedSizeKey) + if err != nil { + return err + } + ackedSize, err := strconv.ParseInt(ackedSizeVal, 10, 64) + if err != nil { + return err + } + ackedSize += totalAckMsgSize + err = rmq.kv.Save(ackedSizeKey, strconv.FormatInt(ackedSize, 10)) + if err != nil { + return err + } } return nil } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index d2ef07c005..70fc95e43c 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -12,7 +12,6 @@ package rocksmq import ( - "errors" "fmt" "strconv" "sync" @@ -54,32 +53,6 @@ type retentionInfo struct { closeOnce sync.Once } -// Interface LoadWithPrefix() in rocksdbkv needs to close db instance first and then reopen, -// which will cause crash when other goroutines operate the db instance. So here implement a -// prefixLoad without reopen db instance. -func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) { - if db == nil { - return nil, nil, errors.New("Rocksdb instance is nil when do prefixLoad") - } - readOpts := gorocksdb.NewDefaultReadOptions() - defer readOpts.Destroy() - readOpts.SetPrefixSameAsStart(true) - iter := db.NewIterator(readOpts) - defer iter.Close() - keys := make([]string, 0) - values := make([]string, 0) - iter.Seek([]byte(prefix)) - for ; iter.Valid(); iter.Next() { - key := iter.Key() - value := iter.Value() - keys = append(keys, string(key.Data())) - key.Free() - values = append(values, string(value.Data())) - value.Free() - } - return keys, values, nil -} - func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) { ri := &retentionInfo{ topics: sync.Map{}, @@ -163,9 +136,9 @@ func (ri *retentionInfo) Stop() { func (ri *retentionInfo) expiredCleanUp(topic string) error { log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic) var deletedAckedSize int64 = 0 - var startID UniqueID - var endID UniqueID + var startID UniqueID = 0 var pageStartID UniqueID = 0 + var pageEndID UniqueID = 0 var err error fixedAckedTsKey, _ := constructKey(AckedTsTitle, topic) @@ -203,7 +176,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { return err } if msgTimeExpiredCheck(ackedTs) { - endID = pageID + pageEndID = pageID pValue := pageIter.Value() size, err := strconv.ParseInt(string(pValue.Data()), 10, 64) if pValue != nil { @@ -219,63 +192,62 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { } } - pageEndID := endID + // TODO(yukun): Remove ackedTs expiredCheck one by one + // ackedReadOpts := gorocksdb.NewDefaultReadOptions() + // defer ackedReadOpts.Destroy() + // ackedReadOpts.SetPrefixSameAsStart(true) + // ackedIter := ri.kv.DB.NewIterator(ackedReadOpts) + // defer ackedIter.Close() + // if err != nil { + // return err + // } + // ackedIter.Seek([]byte(fixedAckedTsKey)) + // if !ackedIter.Valid() { + // return nil + // } - ackedReadOpts := gorocksdb.NewDefaultReadOptions() - defer ackedReadOpts.Destroy() - ackedReadOpts.SetPrefixSameAsStart(true) - ackedIter := ri.kv.DB.NewIterator(ackedReadOpts) - defer ackedIter.Close() - if err != nil { - return err - } - ackedIter.Seek([]byte(fixedAckedTsKey)) - if !ackedIter.Valid() { - return nil - } + // startID, err = strconv.ParseInt(string(ackedIter.Key().Data())[FixedChannelNameLen+1:], 10, 64) + // if err != nil { + // return err + // } + // if endID > startID { + // newPos := fixedAckedTsKey + "/" + strconv.FormatInt(endID, 10) + // ackedIter.Seek([]byte(newPos)) + // } - startID, err = strconv.ParseInt(string(ackedIter.Key().Data())[FixedChannelNameLen+1:], 10, 64) - if err != nil { - return err - } - if endID > startID { - newPos := fixedAckedTsKey + "/" + strconv.FormatInt(endID, 10) - ackedIter.Seek([]byte(newPos)) - } + // for ; ackedIter.Valid(); ackedIter.Next() { + // aKey := ackedIter.Key() + // aValue := ackedIter.Value() + // ackedTs, err := strconv.ParseInt(string(aValue.Data()), 10, 64) + // if aValue != nil { + // aValue.Free() + // } + // if err != nil { + // if aKey != nil { + // aKey.Free() + // } + // return err + // } + // if msgTimeExpiredCheck(ackedTs) { + // endID, err = strconv.ParseInt(string(aKey.Data())[FixedChannelNameLen+1:], 10, 64) + // if aKey != nil { + // aKey.Free() + // } + // if err != nil { + // return err + // } + // } else { + // if aKey != nil { + // aKey.Free() + // } + // break + // } + // } - for ; ackedIter.Valid(); ackedIter.Next() { - aKey := ackedIter.Key() - aValue := ackedIter.Value() - ackedTs, err := strconv.ParseInt(string(aValue.Data()), 10, 64) - if aValue != nil { - aValue.Free() - } - if err != nil { - if aKey != nil { - aKey.Free() - } - return err - } - if msgTimeExpiredCheck(ackedTs) { - endID, err = strconv.ParseInt(string(aKey.Data())[FixedChannelNameLen+1:], 10, 64) - if aKey != nil { - aKey.Free() - } - if err != nil { - return err - } - } else { - if aKey != nil { - aKey.Free() - } - break - } - } - - if endID == 0 { + if pageEndID == 0 { log.Debug("All messages are not time expired") } - log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) + log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize)) ackedSizeKey := AckedSizeTitle + topic totalAckedSizeVal, err := ri.kv.Load(ackedSizeKey) @@ -303,21 +275,20 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { } curDeleteSize := deletedAckedSize + size if msgSizeExpiredCheck(curDeleteSize, totalAckedSize) { - endID, err = strconv.ParseInt(pKeyStr[FixedChannelNameLen+1:], 10, 64) + pageEndID, err = strconv.ParseInt(pKeyStr[FixedChannelNameLen+1:], 10, 64) if err != nil { return err } - pageEndID = endID deletedAckedSize += size } else { break } } - if endID == 0 { + if pageEndID == 0 { log.Debug("All messages are not expired") return nil } - log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) + log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize)) writeBatch := gorocksdb.NewWriteBatch() defer writeBatch.Destroy() @@ -333,14 +304,11 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { } ackedStartIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(startID)) - ackedEndIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(endID+1)) - if startID > endID { + ackedEndIDKey := fixedAckedTsKey + "/" + strconv.Itoa(int(pageEndID+1)) + if startID > pageEndID { return nil - } else if startID == endID { - writeBatch.Delete([]byte(ackedStartIDKey)) - } else { - writeBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey)) } + writeBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey)) ll, ok := topicMu.Load(topic) if !ok { @@ -363,7 +331,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { newAckedSize := currentAckedSize - deletedAckedSize writeBatch.Put([]byte(ackedSizeKey), []byte(strconv.FormatInt(newAckedSize, 10))) - err = DeleteMessages(ri.db, topic, startID, endID) + err = DeleteMessages(ri.db, topic, startID, pageEndID) if err != nil { return err } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go index 57496e7839..d54688c786 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go @@ -52,6 +52,7 @@ func genRandonName() string { func TestRmqRetention(t *testing.T) { atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0) + atomic.StoreInt64(&RocksmqPageSize, 10) atomic.StoreInt64(&TickerTimeInSeconds, 2) defer atomic.StoreInt64(&TickerTimeInSeconds, 6) kvPath := retentionPath + kvPathSuffix