From 02d12829496b2a7a724e929d63ebdf6eff47d4ac Mon Sep 17 00:00:00 2001
From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>
Date: Wed, 22 Dec 2021 11:39:03 +0800
Subject: [PATCH] Fix RocksMQ UT (#13819)

Signed-off-by: xiaofan-luan
---
 internal/kv/rocksdb/rocksdb_kv.go             |  18 +-
 .../rocksmq/server/rocksmq/rocksmq_impl.go    |  38 ++--
 .../rocksmq/server/rocksmq/rocksmq_reader.go  |  28 +--
 .../server/rocksmq/rocksmq_retention.go       |  18 +-
 .../server/rocksmq/rocksmq_retention_test.go  | 167 +++++++++++++++++-
 5 files changed, 216 insertions(+), 53 deletions(-)

diff --git a/internal/kv/rocksdb/rocksdb_kv.go b/internal/kv/rocksdb/rocksdb_kv.go
index 1c05c7f66a..5cbf7b94ea 100644
--- a/internal/kv/rocksdb/rocksdb_kv.go
+++ b/internal/kv/rocksdb/rocksdb_kv.go
@@ -98,7 +98,9 @@ func (kv *RocksdbKV) Load(key string) (string, error) {
     if key == "" {
         return "", errors.New("rocksdb kv does not support load empty key")
     }
-    value, err := kv.DB.Get(kv.ReadOptions, []byte(key))
+    option := gorocksdb.NewDefaultReadOptions()
+    defer option.Destroy()
+    value, err := kv.DB.Get(option, []byte(key))
     if err != nil {
         return "", err
     }
@@ -112,16 +114,14 @@ func (kv *RocksdbKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
     if kv.DB == nil {
         return nil, nil, fmt.Errorf("rocksdb instance is nil when load %s", prefix)
     }
-    kv.ReadOptions.SetPrefixSameAsStart(true)
-    if prefix != "" {
-        kv.ReadOptions.SetIterateUpperBound([]byte(typeutil.AddOne(prefix)))
-    }
-    iter := kv.DB.NewIterator(kv.ReadOptions)
+    option := gorocksdb.NewDefaultReadOptions()
+    defer option.Destroy()
+    iter := kv.DB.NewIterator(option)
     defer iter.Close()
     keys := make([]string, 0)
     values := make([]string, 0)
     iter.Seek([]byte(prefix))
-    for ; iter.Valid(); iter.Next() {
+    for ; iter.ValidForPrefix([]byte(prefix)); iter.Next() {
         key := iter.Key()
         value := iter.Value()
         keys = append(keys, string(key.Data()))
@@ -141,8 +141,10 @@ func (kv *RocksdbKV) MultiLoad(keys []string) ([]string, error) {
         return nil, errors.New("rocksdb instance is nil when do MultiLoad")
     }
     values := make([]string, 0, len(keys))
+    option := gorocksdb.NewDefaultReadOptions()
+    defer option.Destroy()
     for _, key := range keys {
-        value, err := kv.DB.Get(kv.ReadOptions, []byte(key))
+        value, err := kv.DB.Get(option, []byte(key))
         if err != nil {
             return []string{}, err
         }
diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go
index 0d8083844a..c6d6b8bc24 100644
--- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go
+++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go
@@ -513,7 +513,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
         log.Error("RocksMQ: alloc id failed.", zap.Error(err))
         return []UniqueID{}, err
     }
-
+    allocTime := time.Since(start).Milliseconds()
     if UniqueID(msgLen) != idEnd-idStart {
         return []UniqueID{}, errors.New("Obtained id length is not equal that of message")
     }
@@ -538,7 +538,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
         log.Debug("RocksMQ: write batch failed")
         return []UniqueID{}, err
     }
-
+    writeTime := time.Since(start).Milliseconds()
     if vals, ok := rmq.consumers.Load(topicName); ok {
         for _, v := range vals.([]*Consumer) {
             select {
@@ -565,12 +565,16 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
     if err != nil {
         return []UniqueID{}, err
     }
+    updatePageTime := time.Since(start).Milliseconds()

     // TODO add this to monitor metrics
     getProduceTime := time.Since(start).Milliseconds()
-    if getLockTime > 200 || getProduceTime > 200 {
+    if getProduceTime > 200 {
         log.Warn("rocksmq produce too slowly", zap.String("topic", topicName),
-            zap.Int64("get lock elapse", getLockTime), zap.Int64("produce elapse", getProduceTime))
+            zap.Int64("get lock elapse", getLockTime),
+            zap.Int64("alloc elapse", allocTime),
+            zap.Int64("write elapse", writeTime),
+            zap.Int64("updatePage elapse", updatePageTime))
     }
     return msgIDs, nil
 }
@@ -639,14 +643,13 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum

     readOpts := gorocksdb.NewDefaultReadOptions()
     defer readOpts.Destroy()
-    readOpts.SetPrefixSameAsStart(true)
-    readOpts.SetIterateUpperBound([]byte(typeutil.AddOne(topicName + "/")))
+    prefix := topicName + "/"
     iter := rmq.store.NewIterator(readOpts)
     defer iter.Close()

     var dataKey string
     if currentID == DefaultMessageID {
-        dataKey = topicName + "/"
+        dataKey = prefix
     } else {
         dataKey = path.Join(topicName, strconv.FormatInt(currentID.(int64), 10))
     }
@@ -654,7 +657,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum

     consumerMessage := make([]ConsumerMessage, 0, n)
     offset := 0
-    for ; iter.Valid() && offset < n; iter.Next() {
+    for ; iter.ValidForPrefix([]byte(prefix)) && offset < n; iter.Next() {
         key := iter.Key()
         val := iter.Value()
         strKey := string(key.Data())
@@ -697,7 +700,6 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
     }
     newID := consumedIDs[len(consumedIDs)-1]
     rmq.moveConsumePos(topicName, groupName, newID+1)
-    rmq.updateAckedInfo(topicName, groupName, consumedIDs)

     // TODO add this to monitor metrics
     getConsumeTime := time.Since(start).Milliseconds()
@@ -784,8 +786,8 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
     iter := rmq.store.NewIterator(readOpts)
     defer iter.Close()

-    // 0 is the ASC value of "/" + 1
-    iter.SeekForPrev([]byte(topicName + "0"))
+    // seek to the last message of this topic
+    iter.SeekForPrev([]byte(typeutil.AddOne(topicName + "/")))

     // if iterate fail
     if err := iter.Err(); err != nil {
@@ -801,7 +803,9 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
     iKey := iter.Key()
     seekMsgID := string(iKey.Data())
-    iKey.Free()
+    if iKey != nil {
+        iKey.Free()
+    }
     // if find message is not belong to current channel, start from 0
     if !strings.Contains(seekMsgID, fixTopicName) {
         return nil
     }
@@ -841,19 +845,16 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID)
     lastID := ids[len(ids)-1]

     // 1. Try to get the page id between first ID and last ID of ids
-    pageMsgPrefix := constructKey(PageMsgSizeTitle, topicName)
+    pageMsgPrefix := constructKey(PageMsgSizeTitle, topicName) + "/"
     readOpts := gorocksdb.NewDefaultReadOptions()
     defer readOpts.Destroy()
-    pageMsgFirstKey := pageMsgPrefix + "/" + strconv.FormatInt(firstID, 10)
-    // set last key by lastID
-    pageMsgLastKey := pageMsgPrefix + "/" + strconv.FormatInt(lastID+1, 10)
+    pageMsgFirstKey := pageMsgPrefix + strconv.FormatInt(firstID, 10)

-    readOpts.SetIterateUpperBound([]byte(pageMsgLastKey))
     iter := rmq.kv.(*rocksdbkv.RocksdbKV).DB.NewIterator(readOpts)
     defer iter.Close()
     var pageIDs []UniqueID

-    for iter.Seek([]byte(pageMsgFirstKey)); iter.Valid(); iter.Next() {
+    for iter.Seek([]byte(pageMsgFirstKey)); iter.ValidForPrefix([]byte(pageMsgPrefix)); iter.Next() {
         key := iter.Key()
         pageID, err := parsePageID(string(key.Data()))
         if key != nil {
@@ -943,6 +944,7 @@ func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageI
     reader := &rocksmqReader{
         store:      rmq.store,
         topic:      topicName,
+        prefix:     []byte(topicName + "/"),
         readerName: readerName,
         readOpts:   readOpts,
         iter:       iter,
diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go b/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go
index ccd9150293..4570c6a61e 100644
--- a/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go
+++ b/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go
@@ -25,6 +25,7 @@ import (
 type rocksmqReader struct {
     store      *gorocksdb.DB
     topic      string
+    prefix     []byte
     readerName string

     readOpts *gorocksdb.ReadOptions
@@ -38,8 +39,7 @@ type rocksmqReader struct {
 //Seek seek the rocksmq reader to the pointed position
 func (rr *rocksmqReader) Seek(msgID UniqueID) { //nolint:govet
     rr.currentID = msgID
-    fixTopicName := rr.topic + "/"
-    dataKey := path.Join(fixTopicName, strconv.FormatInt(msgID, 10))
+    dataKey := path.Join(rr.topic, strconv.FormatInt(msgID, 10))
     rr.iter.Seek([]byte(dataKey))
     if !rr.messageIDInclusive {
         rr.currentID++
@@ -56,6 +56,10 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
         key := iter.Key()
         val := iter.Value()
         tmpKey := string(key.Data())
+        if key != nil {
+            key.Free()
+        }
+
         var msgID UniqueID
         msgID, err = strconv.ParseInt(tmpKey[len(rr.topic)+1:], 10, 64)
         msg = &ConsumerMessage{
@@ -67,15 +71,17 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
             msg.Payload = make([]byte, dataLen)
             copy(msg.Payload, origData)
         }
-        val.Free()
+        if val != nil {
+            val.Free()
+        }
         iter.Next()
         rr.currentID = msgID
     }
-    if iter.Valid() {
+    if iter.ValidForPrefix(rr.prefix) {
         getMsg()
         return msg, err
     }
-
+    // TODO this is the same logic as pulsar reader, but do we really need to read till the end of the stream
     select {
     case <-ctx.Done():
         log.Debug("Stop get next reader message!")
@@ -87,11 +93,10 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
         }
         rr.iter.Close()
         rr.iter = rr.store.NewIterator(rr.readOpts)
-        fixTopicName := rr.topic + "/"
-        dataKey := path.Join(fixTopicName, strconv.FormatInt(rr.currentID+1, 10))
+        dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
         iter = rr.iter
         iter.Seek([]byte(dataKey))
-        if !iter.Valid() {
+        if !iter.ValidForPrefix(rr.prefix) {
             return nil, errors.New("reader iterater is still invalid after receive mutex")
         }
         getMsg()
@@ -100,7 +105,7 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
 }

 func (rr *rocksmqReader) HasNext() bool {
-    if rr.iter.Valid() {
+    if rr.iter.ValidForPrefix(rr.prefix) {
         return true
     }

@@ -111,10 +116,9 @@ func (rr *rocksmqReader) HasNext() bool {
         }
         rr.iter.Close()
         rr.iter = rr.store.NewIterator(rr.readOpts)
-        fixTopicName := rr.topic + "/"
-        dataKey := path.Join(fixTopicName, strconv.FormatInt(rr.currentID+1, 10))
+        dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
         rr.iter.Seek([]byte(dataKey))
-        return rr.iter.Valid()
+        return rr.iter.ValidForPrefix(rr.prefix)
     default:
         return false
     }
diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go
index 954f5c3db3..67ed4a0450 100644
--- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go
+++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go
@@ -21,8 +21,6 @@ import (

     rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
     "github.com/milvus-io/milvus/internal/log"
-    "github.com/milvus-io/milvus/internal/util/typeutil"
-
     "github.com/tecbot/gorocksdb"
     "go.uber.org/zap"
 )
@@ -136,6 +134,7 @@ func (ri *retentionInfo) Stop() {
 // 4. delete message by range of page id;
 func (ri *retentionInfo) expiredCleanUp(topic string) error {
     log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
+    start := time.Now()
     var deletedAckedSize int64
     var pageCleaned UniqueID
     var pageEndID UniqueID
@@ -154,14 +153,12 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
     }
     pageReadOpts := gorocksdb.NewDefaultReadOptions()
     defer pageReadOpts.Destroy()
-    pageMsgPrefix := constructKey(PageMsgSizeTitle, topic)
-    // ensure the iterator won't iterate to other topics
-    pageReadOpts.SetIterateUpperBound([]byte(typeutil.AddOne(pageMsgPrefix)))
+    pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
     pageIter := ri.kv.DB.NewIterator(pageReadOpts)
     defer pageIter.Close()
     pageIter.Seek([]byte(pageMsgPrefix))

-    for ; pageIter.Valid(); pageIter.Next() {
+    for ; pageIter.ValidForPrefix([]byte(pageMsgPrefix)); pageIter.Next() {
         pKey := pageIter.Key()
         pageID, err := parsePageID(string(pKey.Data()))
         if pKey != nil {
@@ -240,8 +237,10 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
         log.Debug("All messages are not expired, skip retention", zap.Any("topic", topic))
         return nil
     }
+    expireTime := time.Since(start).Milliseconds()
     log.Debug("Expired check by message size: ", zap.Any("topic", topic),
-        zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize), zap.Any("pageCleaned", pageCleaned))
+        zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize),
+        zap.Any("pageCleaned", pageCleaned), zap.Any("time taken", expireTime))
     return ri.cleanData(topic, pageEndID)
 }

@@ -250,14 +249,13 @@ func (ri *retentionInfo) calculateTopicAckedSize(topic string) (int64, error) {

     pageReadOpts := gorocksdb.NewDefaultReadOptions()
     defer pageReadOpts.Destroy()
-    pageMsgPrefix := constructKey(PageMsgSizeTitle, topic)
+    pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
     // ensure the iterator won't iterate to other topics
-    pageReadOpts.SetIterateUpperBound([]byte(typeutil.AddOne(pageMsgPrefix)))
     pageIter := ri.kv.DB.NewIterator(pageReadOpts)
     defer pageIter.Close()
     pageIter.Seek([]byte(pageMsgPrefix))
     var ackedSize int64
-    for ; pageIter.Valid(); pageIter.Next() {
+    for ; pageIter.ValidForPrefix([]byte(pageMsgPrefix)); pageIter.Next() {
         key := pageIter.Key()
         pageID, err := parsePageID(string(key.Data()))
         if key != nil {
diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go
index 63bfc264a1..5409871cd2 100644
--- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go
+++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go
@@ -228,6 +228,163 @@ func TestRmqRetention_NotConsumed(t *testing.T) {

 // Test multiple topic
 func TestRmqRetention_MultipleTopic(t *testing.T) {
+    err := os.MkdirAll(retentionPath, os.ModePerm)
+    if err != nil {
+        log.Error("MkdirAll error for path", zap.Any("path", retentionPath))
+        return
+    }
+    defer os.RemoveAll(retentionPath)
+    // no retention by size
+    atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1)
+    // retention by secs
+    atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 1)
+    atomic.StoreInt64(&RocksmqPageSize, 10)
+    atomic.StoreInt64(&TickerTimeInSeconds, 1)
+    kvPath := retentionPath + "kv_multi_topic"
+    os.RemoveAll(kvPath)
+    idAllocator := InitIDAllocator(kvPath)
+
+    rocksdbPath := retentionPath + "db_multi_topic"
+    os.RemoveAll(rocksdbPath)
+    metaPath := retentionPath + "meta_multi_topic"
+    os.RemoveAll(metaPath)
+
+    rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
+    assert.Nil(t, err)
+    defer rmq.Close()
+
+    topicName := "topic_a"
+    err = rmq.CreateTopic(topicName)
+    assert.Nil(t, err)
+    defer rmq.DestroyTopic(topicName)
+
+    msgNum := 100
+    pMsgs := make([]ProducerMessage, msgNum)
+    for i := 0; i < msgNum; i++ {
+        msg := "message_" + strconv.Itoa(i)
+        pMsg := ProducerMessage{Payload: []byte(msg)}
+        pMsgs[i] = pMsg
+    }
+    ids1, err := rmq.Produce(topicName, pMsgs)
+    assert.Nil(t, err)
+    assert.Equal(t, len(pMsgs), len(ids1))
+
+    topicName = "topic_b"
+    err = rmq.CreateTopic(topicName)
+    assert.Nil(t, err)
+    defer rmq.DestroyTopic(topicName)
+    pMsgs = make([]ProducerMessage, msgNum)
+    for i := 0; i < msgNum; i++ {
+        msg := "message_" + strconv.Itoa(i)
+        pMsg := ProducerMessage{Payload: []byte(msg)}
+        pMsgs[i] = pMsg
+    }
+    ids2, err := rmq.Produce(topicName, pMsgs)
+    assert.Nil(t, err)
+    assert.Equal(t, len(pMsgs), len(ids2))
+
+    topicName = "topic_a"
+    groupName := "test_group"
+    _ = rmq.DestroyConsumerGroup(topicName, groupName)
+    err = rmq.CreateConsumerGroup(topicName, groupName)
+    assert.NoError(t, err)
+
+    consumer := &Consumer{
+        Topic:     topicName,
+        GroupName: groupName,
+    }
+    rmq.RegisterConsumer(consumer)
+
+    cMsgs := make([]ConsumerMessage, 0)
+    for i := 0; i < msgNum; i++ {
+        cMsg, err := rmq.Consume(topicName, groupName, 1)
+        assert.Nil(t, err)
+        cMsgs = append(cMsgs, cMsg[0])
+    }
+    assert.Equal(t, len(cMsgs), msgNum)
+    assert.Equal(t, cMsgs[0].MsgID, ids1[0])
+
+    time.Sleep(time.Duration(3) * time.Second)
+
+    err = rmq.Seek(topicName, groupName, ids1[10])
+    assert.Nil(t, err)
+    newRes, err := rmq.Consume(topicName, groupName, 1)
+    assert.Nil(t, err)
+    assert.Equal(t, len(newRes), 0)
+
+    // test that acked size, acked ts and other meta are updated as expected
+    msgSizeKey := MessageSizeTitle + topicName
+    msgSizeVal, err := rmq.kv.Load(msgSizeKey)
+    assert.NoError(t, err)
+    assert.Equal(t, msgSizeVal, "0")
+
+    // each message fills one page; all 100 pages of topic_a should have been cleaned up
+    pageMsgSizeKey := constructKey(PageMsgSizeTitle, "topic_a")
+    keys, values, err := rmq.kv.LoadWithPrefix(pageMsgSizeKey)
+    assert.NoError(t, err)
+    assert.Equal(t, len(keys), 0)
+    assert.Equal(t, len(values), 0)
+
+    pageTsSizeKey := constructKey(PageTsTitle, "topic_a")
+    keys, values, err = rmq.kv.LoadWithPrefix(pageTsSizeKey)
+    assert.NoError(t, err)
+
+    assert.Equal(t, len(keys), 0)
+    assert.Equal(t, len(values), 0)
+
+    aclTsSizeKey := constructKey(AckedTsTitle, "topic_a")
+    keys, values, err = rmq.kv.LoadWithPrefix(aclTsSizeKey)
+    assert.NoError(t, err)
+    assert.Equal(t, len(keys), 0)
+    assert.Equal(t, len(values), 0)
+
+    // for topic B, nothing has been cleaned
+    pageMsgSizeKey = constructKey(PageMsgSizeTitle, "topic_b")
+    keys, values, err = rmq.kv.LoadWithPrefix(pageMsgSizeKey)
+    assert.NoError(t, err)
+    assert.Equal(t, len(keys), 50)
+    assert.Equal(t, len(values), 50)
+
+    pageTsSizeKey = constructKey(PageTsTitle, "topic_b")
+    keys, values, err = rmq.kv.LoadWithPrefix(pageTsSizeKey)
+    assert.NoError(t, err)
+
+    assert.Equal(t, len(keys), 50)
+    assert.Equal(t, len(values), 50)
+
+    aclTsSizeKey = constructKey(AckedTsTitle, "topic_b")
+    keys, values, err = rmq.kv.LoadWithPrefix(aclTsSizeKey)
+    assert.NoError(t, err)
+    assert.Equal(t, len(keys), 0)
+    assert.Equal(t, len(values), 0)
+
+    topicName = "topic_b"
+    _ = rmq.DestroyConsumerGroup(topicName, groupName)
+    err = rmq.CreateConsumerGroup(topicName, groupName)
+    assert.NoError(t, err)
+
+    consumer = &Consumer{
+        Topic:     topicName,
+        GroupName: groupName,
+    }
+    rmq.RegisterConsumer(consumer)
+
+    cMsgs = make([]ConsumerMessage, 0)
+    for i := 0; i < msgNum; i++ {
+        cMsg, err := rmq.Consume(topicName, groupName, 1)
+        assert.Nil(t, err)
+        cMsgs = append(cMsgs, cMsg[0])
+    }
+    assert.Equal(t, len(cMsgs), msgNum)
+    assert.Equal(t, cMsgs[0].MsgID, ids2[0])
+
+    time.Sleep(time.Duration(3) * time.Second)
+
+    err = rmq.Seek(topicName, groupName, ids2[10])
+    assert.Nil(t, err)
+    newRes, err = rmq.Consume(topicName, groupName, 1)
+    assert.Nil(t, err)
+    assert.Equal(t, len(newRes), 0)
 }

@@ -417,7 +574,7 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) {
     atomic.StoreInt64(&RocksmqRetentionSizeInMB, 1)
     atomic.StoreInt64(&RocksmqRetentionTimeInSecs, -1)
     atomic.StoreInt64(&RocksmqPageSize, 10)
-    atomic.StoreInt64(&TickerTimeInSeconds, 2)
+    atomic.StoreInt64(&TickerTimeInSeconds, 1)
     kvPath := retentionPath + "kv_com2"
     os.RemoveAll(kvPath)
     idAllocator := InitIDAllocator(kvPath)
@@ -466,14 +623,14 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) {
         cMsgs = append(cMsgs, cMsg[0])
     }
     assert.Equal(t, len(cMsgs), msgNum)
-
-    time.Sleep(time.Duration(3) * time.Second)
+    log.Debug("Already consumed, wait for messages to be cleaned by retention")
+    // wait for enough time for page expiration
+    time.Sleep(time.Duration(2) * time.Second)
     err = rmq.Seek(topicName, groupName, ids[0])
     assert.Nil(t, err)
     newRes, err := rmq.Consume(topicName, groupName, 1)
     assert.Nil(t, err)
     assert.Equal(t, len(newRes), 1)
     // make sure clean up happens
-    // TODO(yukun): Sometimes failed
-    // assert.True(t, newRes[0].MsgID > ids[0])
+    assert.True(t, newRes[0].MsgID > ids[0])
 }
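Illustration (not part of the patch): the recurring change above swaps a shared ReadOptions configured with SetPrefixSameAsStart/SetIterateUpperBound for a fresh gorocksdb.NewDefaultReadOptions() per call plus a ValidForPrefix guard on the iterator, so a scan stops at the end of its own topic or page prefix without mutating shared state. Below is a minimal, self-contained sketch of that pattern against the github.com/tecbot/gorocksdb API; the function name scanPrefix, the demo DB path and the main harness are illustrative only and do not appear in the patch.

package main

import (
	"fmt"

	"github.com/tecbot/gorocksdb"
)

// scanPrefix collects every key/value stored under prefix, mirroring the
// iteration pattern the patch adopts in LoadWithPrefix, Consume and the
// retention paths: per-call ReadOptions, Seek to the prefix, and
// ValidForPrefix as the loop guard instead of an iterate upper bound.
func scanPrefix(db *gorocksdb.DB, prefix string) (keys []string, values []string) {
	opts := gorocksdb.NewDefaultReadOptions()
	defer opts.Destroy()

	iter := db.NewIterator(opts)
	defer iter.Close()

	p := []byte(prefix)
	for iter.Seek(p); iter.ValidForPrefix(p); iter.Next() {
		k, v := iter.Key(), iter.Value()
		keys = append(keys, string(k.Data()))
		values = append(values, string(v.Data()))
		// Key/Value return slices backed by C memory; free them after copying,
		// as the patched code does.
		k.Free()
		v.Free()
	}
	return keys, values
}

func main() {
	opts := gorocksdb.NewDefaultOptions()
	opts.SetCreateIfMissing(true)
	db, err := gorocksdb.OpenDb(opts, "/tmp/rocksmq_prefix_demo")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	wo := gorocksdb.NewDefaultWriteOptions()
	defer wo.Destroy()
	_ = db.Put(wo, []byte("topic_a/1"), []byte("message_1"))
	_ = db.Put(wo, []byte("topic_b/1"), []byte("message_1"))

	keys, _ := scanPrefix(db, "topic_a/")
	fmt.Println(keys) // prints only the topic_a keys
}

The trade-off is a prefix comparison on every step instead of a RocksDB-enforced upper bound, but it avoids mutating a long-lived ReadOptions that is shared across calls, which is the pattern the patch removes from RocksdbKV.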