diff --git a/internal/util/rocksmq/server/rocksmq/global_rmq.go b/internal/util/rocksmq/server/rocksmq/global_rmq.go index d4cd47297a..15d473e7ca 100644 --- a/internal/util/rocksmq/server/rocksmq/global_rmq.go +++ b/internal/util/rocksmq/server/rocksmq/global_rmq.go @@ -14,6 +14,7 @@ package rocksmq import ( "os" "sync" + "sync/atomic" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/log" @@ -56,8 +57,8 @@ func InitRocksMQ() error { idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV) _ = idAllocator.Initialize() - RocksmqRetentionTimeInMinutes = params.ParseInt64("rocksmq.retentionTimeInMinutes") - RocksmqRetentionSizeInMB = params.ParseInt64("rocksmq.retentionSizeInMB") + atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, params.ParseInt64("rocksmq.retentionTimeInMinutes")) + atomic.StoreInt64(&RocksmqRetentionSizeInMB, params.ParseInt64("rocksmq.retentionSizeInMB")) log.Debug("Rocksmq retention: ", zap.Any("RocksmqRetentionTimeInMinutes", RocksmqRetentionTimeInMinutes), zap.Any("RocksmqRetentionSizeInMB", RocksmqRetentionSizeInMB)) Rmq, err = NewRocksMQ(rocksdbName, idAllocator) if err != nil { diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index b784e71e88..1a4c3359a4 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -29,11 +29,12 @@ import ( type UniqueID = typeutil.UniqueID +var RocksmqPageSize int64 = 2 << 30 + const ( DefaultMessageID = "-1" FixedChannelNameLen = 320 RocksDBLRUCacheCapacity = 3 << 30 - RocksmqPageSize = 2 << 30 kvSuffix = "_meta_kv" @@ -275,19 +276,6 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer) { key := groupName + "/" + topicName + "/current_id" - // keyExist := false - // if ll, ok := topicMu.Load(topicName); !ok { - // keyExist = rmq.checkKeyExist(key) - // } else { - // if lock, lok := ll.(*sync.Mutex); lok { - // lock.Lock() - // defer lock.Unlock() - // keyExist = rmq.checkKeyExist(key) - // } else { - // keyExist = rmq.checkKeyExist(key) - // } - // } - if rmq.checkKeyExist(key) { if vals, ok := rmq.consumers.Load(topicName); ok { for _, v := range vals.([]*Consumer) { @@ -376,22 +364,23 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error } if UniqueID(msgLen) != idEnd-idStart { - log.Debug("RocksMQ: Obtained id length is not equal that of message") return errors.New("Obtained id length is not equal that of message") } /* Step I: Insert data to store system */ batch := gorocksdb.NewWriteBatch() msgSizes := make(map[UniqueID]int64) + msgIDs := make([]UniqueID, msgLen) for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ { - key, err := combKey(topicName, idStart+UniqueID(i)) + msgID := idStart + UniqueID(i) + key, err := combKey(topicName, msgID) if err != nil { - log.Debug("RocksMQ: combKey(" + topicName + "," + strconv.FormatInt(idStart+UniqueID(i), 10) + ")") return err } batch.Put([]byte(key), messages[i].Payload) - msgSizes[idStart+UniqueID(i)] = int64(len(messages[i].Payload)) + msgIDs[i] = msgID + msgSizes[msgID] = int64(len(messages[i].Payload)) } err = rmq.store.Write(gorocksdb.NewDefaultWriteOptions(), batch) @@ -438,14 +427,14 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error // Update message page info // TODO(yukun): Should this be in a go routine - err = rmq.UpdatePageInfo(topicName, msgSizes) + err = rmq.UpdatePageInfo(topicName, msgIDs, msgSizes) if err != nil { return err } return nil } -func (rmq *rocksmq) UpdatePageInfo(topicName string, msgSizes map[UniqueID]int64) error { +func (rmq *rocksmq) UpdatePageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error { msgSizeKey := MessageSizeTitle + topicName msgSizeVal, err := rmq.kv.Load(msgSizeKey) if err != nil { @@ -459,11 +448,12 @@ func (rmq *rocksmq) UpdatePageInfo(topicName string, msgSizes map[UniqueID]int64 if err != nil { return err } - for k, v := range msgSizes { - if curMsgSize+v > RocksmqPageSize { + for _, id := range msgIDs { + msgSize := msgSizes[id] + if curMsgSize+msgSize > RocksmqPageSize { // Current page is full - newPageSize := curMsgSize + v - pageEndID := k + newPageSize := curMsgSize + msgSize + pageEndID := id // Update page message size for current page. key is page end ID pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10) err := rmq.kv.Save(pageMsgSizeKey, strconv.FormatInt(newPageSize, 10)) @@ -484,7 +474,7 @@ func (rmq *rocksmq) UpdatePageInfo(topicName string, msgSizes map[UniqueID]int64 } curMsgSize = 0 } else { - curMsgSize += v + curMsgSize += msgSize // Update message size to current message size err := rmq.kv.Save(msgSizeKey, strconv.FormatInt(curMsgSize, 10)) if err != nil { @@ -629,7 +619,7 @@ func (rmq *rocksmq) Notify(topicName, groupName string) { } func (rmq *rocksmq) UpdateAckedInfo(topicName, groupName string, newID UniqueID, msgSize int64) error { - ll, ok := rmq.ackedMu.Load(topicName) + ll, ok := topicMu.Load(topicName) if !ok { return fmt.Errorf("topic name = %s not exist", topicName) } @@ -685,6 +675,11 @@ func (rmq *rocksmq) UpdateAckedInfo(topicName, groupName string, newID UniqueID, if err != nil { return err } + if info, ok := rmq.retentionInfo.ackedInfo.Load(topicName); ok { + ackedInfo := info.(*topicAckedInfo) + ackedInfo.ackedTs[minBeginID] = ts + rmq.retentionInfo.ackedInfo.Store(topicName, ackedInfo) + } if minBeginID == newID { // Means the begin_id of topic update to newID, so needs to update acked size ackedSizeKey := AckedSizeTitle + topicName diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index d25a7de065..8280367049 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -21,11 +21,26 @@ import ( "time" "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/util/paramtable" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" "github.com/stretchr/testify/assert" ) +var Params paramtable.BaseTable +var rmqPath string = "/tmp/rocksmq" + +func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator { + rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath) + if err != nil { + panic(err) + } + idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV) + _ = idAllocator.Initialize() + return idAllocator +} + func TestFixChannelName(t *testing.T) { name := "abcd" fixName, err := fixChannelName(name) @@ -42,23 +57,113 @@ func etcdEndpoints() []string { return etcdEndpoints } -func TestRocksMQ(t *testing.T) { - ep := etcdEndpoints() - etcdKV, err := etcdkv.NewEtcdKV(ep, "/etcd/test/root") - assert.Nil(t, err) - defer etcdKV.Close() +func TestInitRmq(t *testing.T) { + name := "/tmp/rmq_init" + endpoints := os.Getenv("ETCD_ENDPOINTS") + if endpoints == "" { + endpoints = "localhost:2379" + } + etcdEndpoints := strings.Split(endpoints, ",") + etcdKV, err := etcdkv.NewEtcdKV(etcdEndpoints, "/etcd/test/root") + if err != nil { + log.Fatalf("New clientv3 error = %v", err) + } idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() - name := "/tmp/rocksmq" - _ = os.RemoveAll(name) - defer os.RemoveAll(name) - kvName := name + "_meta_kv" - _ = os.RemoveAll(kvName) - defer os.RemoveAll(kvName) - rmq, err := NewRocksMQ(name, idAllocator) + err = InitRmq(name, idAllocator) + defer Rmq.stopRetention() + assert.NoError(t, err) + defer CloseRocksMQ() +} + +func TestGlobalRmq(t *testing.T) { + // Params.Init() + rmqPath := "/tmp/milvus/rdb_data_global" + os.Setenv("ROCKSMQ_PATH", rmqPath) + defer os.RemoveAll(rmqPath) + err := InitRocksMQ() + defer Rmq.stopRetention() + assert.NoError(t, err) + defer CloseRocksMQ() +} + +func TestRegisterConsumer(t *testing.T) { + kvPath := rmqPath + "_kv_register" + defer os.RemoveAll(kvPath) + idAllocator := InitIDAllocator(kvPath) + + rocksdbPath := path + "_db_register" + defer os.RemoveAll(rocksdbPath) + metaPath := path + "_meta_kv_register" + defer os.RemoveAll(metaPath) + + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) + assert.NoError(t, err) + defer rmq.stopRetention() + + topicName := "topic_register" + groupName := "group_register" + _ = rmq.DestroyConsumerGroup(topicName, groupName) + err = rmq.CreateConsumerGroup(topicName, groupName) assert.Nil(t, err) + consumer := &Consumer{ + Topic: topicName, + GroupName: groupName, + MsgMutex: make(chan struct{}), + } + rmq.RegisterConsumer(consumer) + exist, _ := rmq.ExistConsumerGroup(topicName, groupName) + assert.Equal(t, exist, true) + dummyGrpName := "group_dummy" + exist, _ = rmq.ExistConsumerGroup(topicName, dummyGrpName) + assert.Equal(t, exist, false) + + msgA := "a_message" + pMsgs := make([]ProducerMessage, 1) + pMsgA := ProducerMessage{Payload: []byte(msgA)} + pMsgs[0] = pMsgA + + _ = idAllocator.UpdateID() + err = rmq.Produce(topicName, pMsgs) + assert.Error(t, err) + + rmq.Notify(topicName, groupName) + + consumer1 := &Consumer{ + Topic: topicName, + GroupName: groupName, + MsgMutex: make(chan struct{}), + } + rmq.RegisterConsumer(consumer1) + + groupName2 := "group_register2" + consumer2 := &Consumer{ + Topic: topicName, + GroupName: groupName2, + MsgMutex: make(chan struct{}), + } + rmq.RegisterConsumer(consumer2) + + err = rmq.DestroyConsumerGroup(topicName, groupName) + assert.NoError(t, err) +} + +func TestRocksMQ(t *testing.T) { + kvPath := rmqPath + "_kv_rmq" + defer os.RemoveAll(kvPath) + idAllocator := InitIDAllocator(kvPath) + + rocksdbPath := path + "_db_rmq" + defer os.RemoveAll(rocksdbPath) + metaPath := path + "_meta_kv_rmq" + defer os.RemoveAll(metaPath) + + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) + assert.Nil(t, err) + defer rmq.stopRetention() + channelName := "channel_a" err = rmq.CreateTopic(channelName) assert.Nil(t, err) @@ -96,7 +201,56 @@ func TestRocksMQ(t *testing.T) { assert.Equal(t, len(cMsgs), 2) assert.Equal(t, string(cMsgs[0].Payload), "b_message") assert.Equal(t, string(cMsgs[1].Payload), "c_message") +} + +func TestRocksMQDummy(t *testing.T) { + kvPath := rmqPath + "_kv_dummy" + defer os.RemoveAll(kvPath) + idAllocator := InitIDAllocator(kvPath) + + rocksdbPath := path + "_db_dummy" + defer os.RemoveAll(rocksdbPath) + metaPath := path + "_meta_kv_dummy" + defer os.RemoveAll(metaPath) + + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) + assert.Nil(t, err) + + channelName := "channel_a" + err = rmq.CreateTopic(channelName) + assert.Nil(t, err) + defer rmq.DestroyTopic(channelName) + err = rmq.CreateTopic(channelName) + assert.NoError(t, err) + + channelName1 := "channel_dummy" + err = rmq.DestroyTopic(channelName1) + assert.NoError(t, err) + + err = rmq.DestroyConsumerGroup(channelName, channelName1) + assert.NoError(t, err) + + err = rmq.Produce(channelName, nil) + assert.Error(t, err) + + err = rmq.Produce(channelName1, nil) + assert.Error(t, err) + + groupName1 := "group_dummy" + err = rmq.Seek(channelName1, groupName1, 0) + assert.Error(t, err) + rmq.stopRetention() + channelName2 := strings.Repeat(channelName1, 100) + err = rmq.CreateTopic(string(channelName2)) + assert.NoError(t, err) + err = rmq.Produce(string(channelName2), nil) + assert.Error(t, err) + + msgA := "a_message" + pMsgs := make([]ProducerMessage, 1) + pMsgA := ProducerMessage{Payload: []byte(msgA)} + pMsgs[0] = pMsgA } func TestRocksMQ_Loop(t *testing.T) { @@ -115,6 +269,7 @@ func TestRocksMQ_Loop(t *testing.T) { defer os.RemoveAll(kvName) rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) + defer rmq.stopRetention() loopNum := 100 channelName := "channel_test" @@ -164,7 +319,6 @@ func TestRocksMQ_Loop(t *testing.T) { cMsgs, err = rmq.Consume(channelName, groupName, 1) assert.Nil(t, err) assert.Equal(t, len(cMsgs), 0) - rmq.stopRetention() } func TestRocksMQ_Goroutines(t *testing.T) { @@ -182,6 +336,7 @@ func TestRocksMQ_Goroutines(t *testing.T) { defer os.RemoveAll(kvName) rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) + defer rmq.stopRetention() loopNum := 100 channelName := "channel_test" @@ -225,7 +380,6 @@ func TestRocksMQ_Goroutines(t *testing.T) { }(&wg, rmq) } wg.Wait() - rmq.stopRetention() } /** @@ -253,6 +407,7 @@ func TestRocksMQ_Throughout(t *testing.T) { defer os.RemoveAll(kvName) rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) + defer rmq.stopRetention() channelName := "channel_throughout_test" err = rmq.CreateTopic(channelName) @@ -289,7 +444,6 @@ func TestRocksMQ_Throughout(t *testing.T) { ct1 := time.Now().UnixNano() / int64(time.Millisecond) cDuration := ct1 - ct0 log.Printf("Total consume %d item, cost %v ms, throughout %v / s", entityNum, cDuration, int64(entityNum)*1000/cDuration) - rmq.stopRetention() } func TestRocksMQ_MultiChan(t *testing.T) { @@ -307,6 +461,7 @@ func TestRocksMQ_MultiChan(t *testing.T) { defer os.RemoveAll(kvName) rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) + defer rmq.stopRetention() channelName0 := "chan01" channelName1 := "chan11" @@ -338,5 +493,4 @@ func TestRocksMQ_MultiChan(t *testing.T) { assert.Nil(t, err) assert.Equal(t, len(cMsgs), 1) assert.Equal(t, string(cMsgs[0].Payload), "for_chann1_"+strconv.Itoa(0)) - rmq.stopRetention() } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index f065ef4b0b..5b5e9a5211 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -16,6 +16,7 @@ import ( "fmt" "strconv" "sync" + "sync/atomic" "time" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" @@ -27,7 +28,6 @@ import ( var RocksmqRetentionTimeInMinutes int64 var RocksmqRetentionSizeInMB int64 var TickerTimeInMinutes int64 = 1 -var CheckTimeInterval int64 = 6 const ( MB = 2 << 20 @@ -194,10 +194,6 @@ func (ri *retentionInfo) loadRetentionInfo(topic string, wg *sync.WaitGroup) { log.Debug("PrefixLoad failed", zap.Any("error", err)) return } - if len(keys) != len(vals) { - log.Debug("LoadWithPrefix return unequal value length of keys and values") - return - } for i, key := range keys { offset := FixedChannelNameLen + 1 @@ -260,13 +256,13 @@ func (ri *retentionInfo) retention() error { return nil case t := <-ticker.C: timeNow := t.Unix() - checkTime := RocksmqRetentionTimeInMinutes * MINUTE / 10 - log.Debug("A retention triggered by time ticker: ", zap.Any("ticker", timeNow)) + checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * 60 / 10 + log.Debug("In ticker: ", zap.Any("ticker", timeNow)) ri.lastRetentionTime.Range(func(k, v interface{}) bool { if v.(int64)+checkTime < timeNow { err := ri.expiredCleanUp(k.(string)) if err != nil { - panic(err) + log.Warn("Retention expired clean failed", zap.Any("error", err)) } } return true @@ -360,11 +356,11 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { break } } - if endID == 0 { - log.Debug("All messages are not expired", zap.Any("topic", topic)) - return nil - } log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) + // if endID == 0 { + // log.Debug("All messages are not expired") + // return nil + // } // Delete page message size in rocksdb_kv if pageInfo != nil { @@ -406,6 +402,10 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error { } ri.pageInfo.Store(topic, pageInfo) } + if endID == 0 { + log.Debug("All messages are not expired") + return nil + } log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize)) // Delete acked_ts in rocksdb_kv @@ -483,9 +483,9 @@ func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) err } func msgTimeExpiredCheck(ackedTs int64) bool { - return ackedTs+RocksmqRetentionTimeInMinutes*MINUTE < time.Now().Unix() + return ackedTs+atomic.LoadInt64(&RocksmqRetentionTimeInMinutes)*MINUTE < time.Now().Unix() } func msgSizeExpiredCheck(deletedAckedSize, ackedSize int64) bool { - return ackedSize-deletedAckedSize > RocksmqRetentionSizeInMB*MB + return ackedSize-deletedAckedSize > atomic.LoadInt64(&RocksmqRetentionSizeInMB)*MB } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go index 09f0df64dd..162843efbf 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go @@ -14,38 +14,34 @@ package rocksmq import ( "os" "strconv" + "strings" + "sync" + "sync/atomic" "testing" "time" - "github.com/milvus-io/milvus/internal/allocator" - rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" + "github.com/milvus-io/milvus/internal/log" "github.com/stretchr/testify/assert" + "go.uber.org/zap" ) -func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator { - rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath) - if err != nil { - panic(err) - } - idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV) - _ = idAllocator.Initialize() - return idAllocator -} +var path string = "/tmp/rmq_retention" func TestRmqRetention(t *testing.T) { - //RocksmqRetentionSizeInMB = 0 - //RocksmqRetentionTimeInMinutes = 0 - kvPath := "/tmp/rocksmq_idAllocator_kv" + atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) + atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0) + kvPath := path + "_kv" defer os.RemoveAll(kvPath) idAllocator := InitIDAllocator(kvPath) - rocksdbPath := "/tmp/rocksmq_test" + rocksdbPath := path + "_db" defer os.RemoveAll(rocksdbPath) - metaPath := rocksdbPath + "_meta_kv" + metaPath := path + "_meta_kv" defer os.RemoveAll(metaPath) rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err) + defer rmq.stopRetention() topicName := "topic_a" err = rmq.CreateTopic(topicName) @@ -81,7 +77,8 @@ func TestRmqRetention(t *testing.T) { } assert.Equal(t, len(cMsgs), msgNum) - time.Sleep(time.Duration(CheckTimeInterval+1) * time.Second) + checkTimeInterval := 6 + time.Sleep(time.Duration(checkTimeInterval+1) * time.Second) // Seek to a previous consumed message, the message should be clean up err = rmq.Seek(topicName, groupName, cMsgs[msgNum/2].MsgID) assert.Nil(t, err) @@ -89,3 +86,205 @@ func TestRmqRetention(t *testing.T) { assert.Nil(t, err) assert.Equal(t, len(newRes), 0) } + +func TestLoadRetentionInfo(t *testing.T) { + atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0) + atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) + atomic.StoreInt64(&RocksmqPageSize, 100) + kvPath := path + "_kv_load" + defer os.RemoveAll(kvPath) + idAllocator := InitIDAllocator(kvPath) + + rocksdbPath := path + "_db_load" + defer os.RemoveAll(rocksdbPath) + metaPath := path + "_meta_load" + + defer os.RemoveAll(metaPath) + + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) + assert.Nil(t, err) + defer rmq.stopRetention() + + topicName := "topic_a" + err = rmq.CreateTopic(topicName) + assert.Nil(t, err) + defer rmq.DestroyTopic(topicName) + + rmq.retentionInfo.ackedInfo.Delete(topicName) + + msgNum := 100 + pMsgs := make([]ProducerMessage, msgNum) + for i := 0; i < msgNum; i++ { + msg := "message_" + strconv.Itoa(i) + pMsg := ProducerMessage{Payload: []byte(msg)} + pMsgs[i] = pMsg + } + err = rmq.Produce(topicName, pMsgs) + assert.Nil(t, err) + + var wg sync.WaitGroup + wg.Add(1) + rmq.retentionInfo.loadRetentionInfo(topicName, &wg) + + groupName := "test_group" + _ = rmq.DestroyConsumerGroup(topicName, groupName) + err = rmq.CreateConsumerGroup(topicName, groupName) + + consumer := &Consumer{ + Topic: topicName, + GroupName: groupName, + } + rmq.RegisterConsumer(consumer) + + assert.Nil(t, err) + cMsgs := make([]ConsumerMessage, 0) + for i := 0; i < msgNum; i++ { + cMsg, err := rmq.Consume(topicName, groupName, 1) + assert.Nil(t, err) + cMsgs = append(cMsgs, cMsg[0]) + } + assert.Equal(t, len(cMsgs), msgNum) + + wg.Add(1) + + ll, ok := topicMu.Load(topicName) + assert.Equal(t, ok, true) + lock, _ := ll.(*sync.Mutex) + lock.Lock() + defer lock.Unlock() + rmq.retentionInfo.loadRetentionInfo(topicName, &wg) + + initRetentionInfo(rmq.retentionInfo.kv, rmq.store) + + dummyTopic := strings.Repeat(topicName, 100) + err = DeleteMessages(rmq.store, dummyTopic, 0, 0) + assert.Error(t, err) + + err = DeleteMessages(rmq.store, topicName, 0, 0) + assert.NoError(t, err) +} + +func TestComplexRmqRetention(t *testing.T) { + atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) + atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 1) + atomic.StoreInt64(&RocksmqPageSize, 10) + kvPath := path + "_kv_com" + defer os.RemoveAll(kvPath) + idAllocator := InitIDAllocator(kvPath) + + rocksdbPath := path + "_db_com" + defer os.RemoveAll(rocksdbPath) + metaPath := path + "_meta_kv_com" + defer os.RemoveAll(metaPath) + + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) + assert.Nil(t, err) + defer rmq.stopRetention() + + topicName := "topic_a" + err = rmq.CreateTopic(topicName) + assert.Nil(t, err) + defer rmq.DestroyTopic(topicName) + + msgNum := 100 + pMsgs := make([]ProducerMessage, msgNum) + for i := 0; i < msgNum; i++ { + msg := "message_" + strconv.Itoa(i) + pMsg := ProducerMessage{Payload: []byte(msg)} + pMsgs[i] = pMsg + } + err = rmq.Produce(topicName, pMsgs) + assert.Nil(t, err) + + groupName := "test_group" + _ = rmq.DestroyConsumerGroup(topicName, groupName) + err = rmq.CreateConsumerGroup(topicName, groupName) + + consumer := &Consumer{ + Topic: topicName, + GroupName: groupName, + } + rmq.RegisterConsumer(consumer) + + assert.Nil(t, err) + cMsgs := make([]ConsumerMessage, 0) + for i := 0; i < msgNum; i++ { + cMsg, err := rmq.Consume(topicName, groupName, 1) + assert.Nil(t, err) + cMsgs = append(cMsgs, cMsg[0]) + } + assert.Equal(t, len(cMsgs), msgNum) + + checkTimeInterval := atomic.LoadInt64(&RocksmqRetentionTimeInMinutes) * MINUTE / 10 + time.Sleep(time.Duration(checkTimeInterval*2) * time.Second) + // Seek to a previous consumed message, the message should be clean up + log.Debug("cMsg", zap.Any("id", cMsgs[10].MsgID)) + err = rmq.Seek(topicName, groupName, cMsgs[10].MsgID) + assert.Nil(t, err) + newRes, err := rmq.Consume(topicName, groupName, 1) + assert.Nil(t, err) + assert.NotEqual(t, newRes[0].MsgID, cMsgs[11].MsgID) +} + +func TestRmqRetentionPageTimeExpire(t *testing.T) { + atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) + atomic.StoreInt64(&RocksmqRetentionTimeInMinutes, 0) + atomic.StoreInt64(&RocksmqPageSize, 10) + kvPath := path + "_kv_com1" + defer os.RemoveAll(kvPath) + idAllocator := InitIDAllocator(kvPath) + + rocksdbPath := path + "_db_com1" + defer os.RemoveAll(rocksdbPath) + metaPath := path + "_meta_kv_com1" + defer os.RemoveAll(metaPath) + + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) + assert.Nil(t, err) + defer rmq.stopRetention() + + topicName := "topic_a" + err = rmq.CreateTopic(topicName) + assert.Nil(t, err) + defer rmq.DestroyTopic(topicName) + + msgNum := 100 + pMsgs := make([]ProducerMessage, msgNum) + for i := 0; i < msgNum; i++ { + msg := "message_" + strconv.Itoa(i) + pMsg := ProducerMessage{Payload: []byte(msg)} + pMsgs[i] = pMsg + } + err = rmq.Produce(topicName, pMsgs) + assert.Nil(t, err) + + groupName := "test_group" + _ = rmq.DestroyConsumerGroup(topicName, groupName) + err = rmq.CreateConsumerGroup(topicName, groupName) + + consumer := &Consumer{ + Topic: topicName, + GroupName: groupName, + } + rmq.RegisterConsumer(consumer) + + assert.Nil(t, err) + cMsgs := make([]ConsumerMessage, 0) + for i := 0; i < msgNum; i++ { + cMsg, err := rmq.Consume(topicName, groupName, 1) + assert.Nil(t, err) + cMsgs = append(cMsgs, cMsg[0]) + } + assert.Equal(t, len(cMsgs), msgNum) + + checkTimeInterval := 7 + time.Sleep(time.Duration(checkTimeInterval) * time.Second) + // Seek to a previous consumed message, the message should be clean up + log.Debug("cMsg", zap.Any("id", cMsgs[10].MsgID)) + err = rmq.Seek(topicName, groupName, cMsgs[len(cMsgs)/2].MsgID) + assert.Nil(t, err) + newRes, err := rmq.Consume(topicName, groupName, 1) + assert.Nil(t, err) + assert.Equal(t, len(newRes), 0) + // assert.NotEqual(t, newRes[0].MsgID, cMsgs[11].MsgID) +}