Fix rocksmq incompatible (#20888)

Signed-off-by: lixinguo <xinguo.li@zilliz.com>

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
smellthemoon 2022-11-30 10:01:15 +08:00 committed by GitHub
parent c6fc1511aa
commit a46a5ac900
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 125 additions and 6 deletions

View File

@ -743,9 +743,13 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
if err != nil {
return nil, err
}
var properties map[string]string
if err = json.Unmarshal(propertiesValue, &properties); err != nil {
return nil, err
properties := make(map[string]string)
if len(propertiesValue) != 0 {
// before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq
// when produce before 2.2.0, but consume in 2.2.0, propertiesValue will be []
if err = json.Unmarshal(propertiesValue, &properties); err != nil {
return nil, err
}
}
msg := ConsumerMessage{
MsgID: msgID,

View File

@ -12,9 +12,10 @@
package server
import (
"errors"
"fmt"
"log"
"os"
"path"
"strconv"
"strings"
"sync"
@ -26,8 +27,11 @@ import (
"github.com/milvus-io/milvus/internal/common"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/tecbot/gorocksdb"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
)
@ -37,6 +41,10 @@ var rmqPath = "/tmp/rocksmq"
var kvPathSuffix = "_kv"
var metaPathSuffix = "_meta"
type producerMessageBefore struct {
Payload []byte
}
func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath)
if err != nil {
@ -64,6 +72,87 @@ func etcdEndpoints() []string {
return etcdEndpoints
}
// to test compatibility concern
func (rmq *rocksmq) produceBefore(topicName string, messages []producerMessageBefore) ([]UniqueID, error) {
if rmq.isClosed() {
return nil, errors.New(RmqNotServingErrMsg)
}
start := time.Now()
ll, ok := topicMu.Load(topicName)
if !ok {
return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName)
}
lock, ok := ll.(*sync.Mutex)
if !ok {
return []UniqueID{}, fmt.Errorf("get mutex failed, topic name = %s", topicName)
}
lock.Lock()
defer lock.Unlock()
getLockTime := time.Since(start).Milliseconds()
msgLen := len(messages)
idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen))
if err != nil {
return []UniqueID{}, err
}
allocTime := time.Since(start).Milliseconds()
if UniqueID(msgLen) != idEnd-idStart {
return []UniqueID{}, errors.New("Obtained id length is not equal that of message")
}
// Insert data to store system
batch := gorocksdb.NewWriteBatch()
defer batch.Destroy()
msgSizes := make(map[UniqueID]int64)
msgIDs := make([]UniqueID, msgLen)
for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ {
msgID := idStart + UniqueID(i)
key := path.Join(topicName, strconv.FormatInt(msgID, 10))
batch.Put([]byte(key), messages[i].Payload)
msgIDs[i] = msgID
msgSizes[msgID] = int64(len(messages[i].Payload))
}
opts := gorocksdb.NewDefaultWriteOptions()
defer opts.Destroy()
err = rmq.store.Write(opts, batch)
if err != nil {
return []UniqueID{}, err
}
writeTime := time.Since(start).Milliseconds()
if vals, ok := rmq.consumers.Load(topicName); ok {
for _, v := range vals.([]*Consumer) {
select {
case v.MsgMutex <- struct{}{}:
continue
default:
continue
}
}
}
// Update message page info
err = rmq.updatePageInfo(topicName, msgIDs, msgSizes)
if err != nil {
return []UniqueID{}, err
}
getProduceTime := time.Since(start).Milliseconds()
if getProduceTime > 200 {
log.Warn("rocksmq produce too slowly", zap.String("topic", topicName),
zap.Int64("get lock elapse", getLockTime),
zap.Int64("alloc elapse", allocTime-getLockTime),
zap.Int64("write elapse", writeTime-allocTime),
zap.Int64("updatePage elapse", getProduceTime-writeTime),
zap.Int64("produce total elapse", getProduceTime),
)
}
return msgIDs, nil
}
func TestRocksmq_RegisterConsumer(t *testing.T) {
suffix := "_register"
kvPath := rmqPath + kvPathSuffix + suffix
@ -166,6 +255,16 @@ func TestRocksmq_Basic(t *testing.T) {
_, err = rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
// before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq
// it aims to test if produce before 2.2.0, but consume after 2.2.0
msgD := "d_message"
tMsgs := make([]producerMessageBefore, 1)
tMsgD := producerMessageBefore{Payload: []byte(msgD)}
tMsgs[0] = tMsgD
_, err = rmq.produceBefore(channelName, tMsgs)
assert.Nil(t, err)
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(channelName, groupName)
err = rmq.CreateConsumerGroup(channelName, groupName)
@ -192,6 +291,16 @@ func TestRocksmq_Basic(t *testing.T) {
_, ok = cMsgs[1].Properties[common.TraceIDKey]
assert.True(t, ok)
assert.Equal(t, cMsgs[1].Properties[common.TraceIDKey], "c")
cMsgs, err = rmq.Consume(channelName, groupName, 1)
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 1)
assert.Equal(t, string(cMsgs[0].Payload), "d_message")
_, ok = cMsgs[0].Properties[common.TraceIDKey]
assert.False(t, ok)
// it will be set empty map if produce message has no properties field
expect := make(map[string]string)
assert.Equal(t, cMsgs[0].Properties, expect)
}
func TestRocksmq_MultiConsumer(t *testing.T) {
@ -570,7 +679,10 @@ func TestRocksmq_Throughout(t *testing.T) {
}
pt1 := time.Now().UnixNano() / int64(time.Millisecond)
pDuration := pt1 - pt0
log.Printf("Total produce %d item, cost %v ms, throughout %v / s", entityNum, pDuration, int64(entityNum)*1000/pDuration)
log.Info("Rocksmq_Throughout",
zap.Int("Total produce item number", entityNum),
zap.Int64("Total cost (ms)", pDuration),
zap.Int64("Total throughout (s)", int64(entityNum)*1000/pDuration))
groupName := "test_throughout_group"
_ = rmq.DestroyConsumerGroup(channelName, groupName)
@ -587,7 +699,10 @@ func TestRocksmq_Throughout(t *testing.T) {
}
ct1 := time.Now().UnixNano() / int64(time.Millisecond)
cDuration := ct1 - ct0
log.Printf("Total consume %d item, cost %v ms, throughout %v / s", entityNum, cDuration, int64(entityNum)*1000/cDuration)
log.Info("Rocksmq_Throughout",
zap.Int("Total produce item number", entityNum),
zap.Int64("Total cost (ms)", cDuration),
zap.Int64("Total throughout (s)", int64(entityNum)*1000/cDuration))
}
func TestRocksmq_MultiChan(t *testing.T) {