diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index e962182b04..87bc1db357 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -743,9 +743,13 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum if err != nil { return nil, err } - var properties map[string]string - if err = json.Unmarshal(propertiesValue, &properties); err != nil { - return nil, err + properties := make(map[string]string) + if len(propertiesValue) != 0 { + // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq + // when produce before 2.2.0, but consume in 2.2.0, propertiesValue will be [] + if err = json.Unmarshal(propertiesValue, &properties); err != nil { + return nil, err + } } msg := ConsumerMessage{ MsgID: msgID, diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index a976d5bd6a..80b3c8a54a 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -12,9 +12,10 @@ package server import ( + "errors" "fmt" - "log" "os" + "path" "strconv" "strings" "sync" @@ -26,8 +27,11 @@ import ( "github.com/milvus-io/milvus/internal/common" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/tecbot/gorocksdb" + "go.uber.org/zap" "github.com/stretchr/testify/assert" ) @@ -37,6 +41,10 @@ var rmqPath = "/tmp/rocksmq" var kvPathSuffix = "_kv" var metaPathSuffix = "_meta" +type producerMessageBefore struct { + Payload []byte +} + func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator { rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath) if err != nil { @@ -64,6 +72,87 @@ func etcdEndpoints() []string { return etcdEndpoints } +// to test compatibility concern +func (rmq *rocksmq) produceBefore(topicName string, messages []producerMessageBefore) ([]UniqueID, error) { + if rmq.isClosed() { + return nil, errors.New(RmqNotServingErrMsg) + } + start := time.Now() + ll, ok := topicMu.Load(topicName) + if !ok { + return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName) + } + lock, ok := ll.(*sync.Mutex) + if !ok { + return []UniqueID{}, fmt.Errorf("get mutex failed, topic name = %s", topicName) + } + lock.Lock() + defer lock.Unlock() + + getLockTime := time.Since(start).Milliseconds() + + msgLen := len(messages) + idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen)) + + if err != nil { + return []UniqueID{}, err + } + allocTime := time.Since(start).Milliseconds() + if UniqueID(msgLen) != idEnd-idStart { + return []UniqueID{}, errors.New("Obtained id length is not equal that of message") + } + + // Insert data to store system + batch := gorocksdb.NewWriteBatch() + defer batch.Destroy() + msgSizes := make(map[UniqueID]int64) + msgIDs := make([]UniqueID, msgLen) + for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ { + msgID := idStart + UniqueID(i) + key := path.Join(topicName, strconv.FormatInt(msgID, 10)) + batch.Put([]byte(key), messages[i].Payload) + msgIDs[i] = msgID + msgSizes[msgID] = int64(len(messages[i].Payload)) + } + + opts := gorocksdb.NewDefaultWriteOptions() + defer opts.Destroy() + err = rmq.store.Write(opts, batch) + if err != nil { + return []UniqueID{}, err + } + writeTime := time.Since(start).Milliseconds() + if vals, ok := rmq.consumers.Load(topicName); ok { + for _, v := range vals.([]*Consumer) { + select { + case v.MsgMutex <- struct{}{}: + continue + default: + continue + } + } + } + + // Update message page info + err = rmq.updatePageInfo(topicName, msgIDs, msgSizes) + if err != nil { + return []UniqueID{}, err + } + + getProduceTime := time.Since(start).Milliseconds() + if getProduceTime > 200 { + + log.Warn("rocksmq produce too slowly", zap.String("topic", topicName), + zap.Int64("get lock elapse", getLockTime), + zap.Int64("alloc elapse", allocTime-getLockTime), + zap.Int64("write elapse", writeTime-allocTime), + zap.Int64("updatePage elapse", getProduceTime-writeTime), + zap.Int64("produce total elapse", getProduceTime), + ) + } + return msgIDs, nil +} + func TestRocksmq_RegisterConsumer(t *testing.T) { suffix := "_register" kvPath := rmqPath + kvPathSuffix + suffix @@ -166,6 +255,16 @@ func TestRocksmq_Basic(t *testing.T) { _, err = rmq.Produce(channelName, pMsgs) assert.Nil(t, err) + // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq + // it aims to test if produce before 2.2.0, but consume after 2.2.0 + msgD := "d_message" + tMsgs := make([]producerMessageBefore, 1) + tMsgD := producerMessageBefore{Payload: []byte(msgD)} + tMsgs[0] = tMsgD + + _, err = rmq.produceBefore(channelName, tMsgs) + assert.Nil(t, err) + groupName := "test_group" _ = rmq.DestroyConsumerGroup(channelName, groupName) err = rmq.CreateConsumerGroup(channelName, groupName) @@ -192,6 +291,16 @@ func TestRocksmq_Basic(t *testing.T) { _, ok = cMsgs[1].Properties[common.TraceIDKey] assert.True(t, ok) assert.Equal(t, cMsgs[1].Properties[common.TraceIDKey], "c") + + cMsgs, err = rmq.Consume(channelName, groupName, 1) + assert.Nil(t, err) + assert.Equal(t, len(cMsgs), 1) + assert.Equal(t, string(cMsgs[0].Payload), "d_message") + _, ok = cMsgs[0].Properties[common.TraceIDKey] + assert.False(t, ok) + // it will be set empty map if produce message has no properties field + expect := make(map[string]string) + assert.Equal(t, cMsgs[0].Properties, expect) } func TestRocksmq_MultiConsumer(t *testing.T) { @@ -570,7 +679,10 @@ func TestRocksmq_Throughout(t *testing.T) { } pt1 := time.Now().UnixNano() / int64(time.Millisecond) pDuration := pt1 - pt0 - log.Printf("Total produce %d item, cost %v ms, throughout %v / s", entityNum, pDuration, int64(entityNum)*1000/pDuration) + log.Info("Rocksmq_Throughout", + zap.Int("Total produce item number", entityNum), + zap.Int64("Total cost (ms)", pDuration), + zap.Int64("Total throughout (s)", int64(entityNum)*1000/pDuration)) groupName := "test_throughout_group" _ = rmq.DestroyConsumerGroup(channelName, groupName) @@ -587,7 +699,10 @@ func TestRocksmq_Throughout(t *testing.T) { } ct1 := time.Now().UnixNano() / int64(time.Millisecond) cDuration := ct1 - ct0 - log.Printf("Total consume %d item, cost %v ms, throughout %v / s", entityNum, cDuration, int64(entityNum)*1000/cDuration) + log.Info("Rocksmq_Throughout", + zap.Int("Total produce item number", entityNum), + zap.Int64("Total cost (ms)", cDuration), + zap.Int64("Total throughout (s)", int64(entityNum)*1000/cDuration)) } func TestRocksmq_MultiChan(t *testing.T) {