const (
	// PropertiesKey is a string constant with the value "properties".
	// NOTE(review): presumably used as a key prefix for stored message
	// properties — confirm against the call sites.
	PropertiesKey string = "properties"
	// TraceIDKey is a string constant with the value "uber-trace-id".
	// NOTE(review): presumably the message-property key under which the
	// trace ID travels — confirm against the tracer configuration.
	TraceIDKey string = "uber-trace-id"
)
// ProducerMessage is the message of a producer
type ProducerMessage struct {
	// Payload is the raw message body.
	Payload []byte
	// Properties holds string key/value pairs that accompany the payload.
	Properties map[string]string
}
// ConsumerMessage that consumed from rocksdb
type ConsumerMessage struct {
	// MsgID is the ID of the consumed message.
	MsgID UniqueID
	// Payload is the raw message body.
	Payload []byte
	// Properties holds the string key/value pairs that accompany the payload.
	Properties map[string]string
}
zap.String("topicName", topicName), + zap.Error(err)) + return nil, err + } + pKey := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10)) + batch.Put([]byte(pKey), properties) msgIDs[i] = msgID msgSizes[msgID] = int64(len(messages[i].Payload)) } @@ -724,6 +736,17 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum val.Free() return nil, err } + askedProperties := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10)) + opts := gorocksdb.NewDefaultReadOptions() + defer opts.Destroy() + propertiesValue, err := rmq.store.GetBytes(opts, []byte(askedProperties)) + if err != nil { + return nil, err + } + var properties map[string]string + if err = json.Unmarshal(propertiesValue, &properties); err != nil { + return nil, err + } msg := ConsumerMessage{ MsgID: msgID, } @@ -731,8 +754,10 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum dataLen := len(origData) if dataLen == 0 { msg.Payload = nil + msg.Properties = nil } else { msg.Payload = make([]byte, dataLen) + msg.Properties = properties copy(msg.Payload, origData) } consumerMessage = append(consumerMessage, msg) @@ -850,7 +875,7 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err return nil } -//Only for test +// Only for test func (rmq *rocksmq) ForceSeek(topicName string, groupName string, msgID UniqueID) error { log.Warn("Use method ForceSeek that only for test") if rmq.isClosed() { diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index 52727cbc3c..a976d5bd6a 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/common" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" rocksdbkv 
"github.com/milvus-io/milvus/internal/kv/rocksdb" "github.com/milvus-io/milvus/internal/util/etcd" @@ -151,14 +152,14 @@ func TestRocksmq_Basic(t *testing.T) { msgA := "a_message" pMsgs := make([]ProducerMessage, 1) - pMsgA := ProducerMessage{Payload: []byte(msgA)} + pMsgA := ProducerMessage{Payload: []byte(msgA), Properties: map[string]string{common.TraceIDKey: "a"}} pMsgs[0] = pMsgA _, err = rmq.Produce(channelName, pMsgs) assert.Nil(t, err) - pMsgB := ProducerMessage{Payload: []byte("b_message")} - pMsgC := ProducerMessage{Payload: []byte("c_message")} + pMsgB := ProducerMessage{Payload: []byte("b_message"), Properties: map[string]string{common.TraceIDKey: "b"}} + pMsgC := ProducerMessage{Payload: []byte("c_message"), Properties: map[string]string{common.TraceIDKey: "c"}} pMsgs[0] = pMsgB pMsgs = append(pMsgs, pMsgC) @@ -176,12 +177,21 @@ func TestRocksmq_Basic(t *testing.T) { assert.Nil(t, err) assert.Equal(t, len(cMsgs), 1) assert.Equal(t, string(cMsgs[0].Payload), "a_message") + _, ok := cMsgs[0].Properties[common.TraceIDKey] + assert.True(t, ok) + assert.Equal(t, cMsgs[0].Properties[common.TraceIDKey], "a") cMsgs, err = rmq.Consume(channelName, groupName, 2) assert.Nil(t, err) assert.Equal(t, len(cMsgs), 2) assert.Equal(t, string(cMsgs[0].Payload), "b_message") + _, ok = cMsgs[0].Properties[common.TraceIDKey] + assert.True(t, ok) + assert.Equal(t, cMsgs[0].Properties[common.TraceIDKey], "b") assert.Equal(t, string(cMsgs[1].Payload), "c_message") + _, ok = cMsgs[1].Properties[common.TraceIDKey] + assert.True(t, ok) + assert.Equal(t, cMsgs[1].Properties[common.TraceIDKey], "c") } func TestRocksmq_MultiConsumer(t *testing.T) { @@ -509,15 +519,17 @@ func TestRocksmq_Goroutines(t *testing.T) { wg.Wait() } -/** - This test is aim to measure RocksMq throughout. 
- Hardware: - CPU Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz - Disk SSD +/* +* - Test with 1,000,000 message, result is as follow: - Produce: 190000 message / s - Consume: 90000 message / s + This test is aim to measure RocksMq throughout. + Hardware: + CPU Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz + Disk SSD + + Test with 1,000,000 message, result is as follow: + Produce: 190000 message / s + Consume: 90000 message / s */ func TestRocksmq_Throughout(t *testing.T) { ep := etcdEndpoints() diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index b7a2ddb06e..225404a6e8 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -275,7 +275,7 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}} - trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) + trace.InjectContextToMsgProperties(sp.Context(), msg.Properties) ms.producerLock.Lock() if _, err := ms.producers[channel].Send( @@ -341,7 +341,7 @@ func (ms *mqMsgStream) ProduceMark(msgPack *MsgPack) (map[string][]MessageID, er msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}} - trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) + trace.InjectContextToMsgProperties(sp.Context(), msg.Properties) ms.producerLock.Lock() id, err := ms.producers[channel].Send( @@ -384,7 +384,7 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error { msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}} - trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) + trace.InjectContextToMsgProperties(sp.Context(), msg.Properties) ms.producerLock.Lock() for _, producer := range ms.producers { @@ -426,7 +426,7 @@ func (ms *mqMsgStream) BroadcastMark(msgPack *MsgPack) (map[string][]MessageID, msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}} - 
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties) + trace.InjectContextToMsgProperties(sp.Context(), msg.Properties) ms.producerLock.Lock() for channel, producer := range ms.producers { @@ -504,7 +504,7 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) { Timestamp: tsMsg.BeginTs(), }) - sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties()) + sp, ok := ExtractFromMsgProperties(tsMsg, msg.Properties()) if ok { tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp)) } @@ -810,7 +810,7 @@ func (ms *MqTtMsgStream) consumeToTtMsg(consumer mqwrapper.Consumer) { continue } - sp, ok := ExtractFromPulsarMsgProperties(tsMsg, msg.Properties()) + sp, ok := ExtractFromMsgProperties(tsMsg, msg.Properties()) if ok { tsMsg.SetTraceCtx(opentracing.ContextWithSpan(context.Background(), sp)) } diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index 48ef5fdfbe..990a38a8e7 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -172,7 +172,9 @@ func TestKafkaClient_ConsumeWithAck(t *testing.T) { rand.Seed(time.Now().UnixNano()) topic := fmt.Sprintf("test-topic-%d", rand.Int()) subName := fmt.Sprintf("test-subname-%d", rand.Int()) - arr := []int{111, 222, 333, 444, 555, 666, 777} + arr1 := []int{111, 222, 333, 444, 555, 666, 777} + arr2 := []string{"111", "222", "333", "444", "555", "666", "777"} + c := make(chan mqwrapper.MessageID, 1) ctx, cancel := context.WithCancel(context.Background()) @@ -183,7 +185,7 @@ func TestKafkaClient_ConsumeWithAck(t *testing.T) { producer := createProducer(t, kc, topic) defer producer.Close() - produceData(ctx, t, producer, arr) + produceData(ctx, t, producer, arr1, arr2) time.Sleep(100 * time.Millisecond) ctx1, cancel1 := context.WithTimeout(ctx, 5*time.Second) @@ -203,9 +205,9 @@ func TestKafkaClient_ConsumeWithAck(t 
*testing.T) { cancel3() cancel() - assert.Equal(t, len(arr), total1+total2) + assert.Equal(t, len(arr1), total1+total2) - assert.Equal(t, len(arr), total3) + assert.Equal(t, len(arr1), total3) } func TestKafkaClient_SeekPosition(t *testing.T) { @@ -220,8 +222,9 @@ func TestKafkaClient_SeekPosition(t *testing.T) { producer := createProducer(t, kc, topic) defer producer.Close() - data := []int{1, 2, 3} - ids := produceData(ctx, t, producer, data) + data1 := []int{1, 2, 3} + data2 := []string{"1", "2", "3"} + ids := produceData(ctx, t, producer, data1, data2) consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionUnknown) defer consumer.Close() @@ -233,6 +236,7 @@ func TestKafkaClient_SeekPosition(t *testing.T) { case msg := <-consumer.Chan(): consumer.Ack(msg) assert.Equal(t, 3, BytesToInt(msg.Payload())) + assert.Equal(t, "3", msg.Properties()[common.TraceIDKey]) case <-time.After(10 * time.Second): assert.FailNow(t, "should not wait") } @@ -250,22 +254,25 @@ func TestKafkaClient_ConsumeFromLatest(t *testing.T) { producer := createProducer(t, kc, topic) defer producer.Close() - data := []int{1, 2} - produceData(ctx, t, producer, data) + data1 := []int{1, 2} + data2 := []string{"1", "2"} + produceData(ctx, t, producer, data1, data2) consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionLatest) defer consumer.Close() go func() { time.Sleep(time.Second * 2) - data := []int{3} - produceData(ctx, t, producer, data) + data1 := []int{3} + data2 := []string{"3"} + produceData(ctx, t, producer, data1, data2) }() select { case msg := <-consumer.Chan(): consumer.Ack(msg) assert.Equal(t, 3, BytesToInt(msg.Payload())) + assert.Equal(t, "3", msg.Properties()[common.TraceIDKey]) case <-time.After(5 * time.Second): assert.FailNow(t, "should not wait") } @@ -380,12 +387,14 @@ func createProducer(t *testing.T, kc *kafkaClient, topic string) mqwrapper.Produ return producer } -func produceData(ctx context.Context, t *testing.T, 
producer mqwrapper.Producer, arr []int) []mqwrapper.MessageID { +func produceData(ctx context.Context, t *testing.T, producer mqwrapper.Producer, arr []int, pArr []string) []mqwrapper.MessageID { var msgIDs []mqwrapper.MessageID - for _, v := range arr { + for k, v := range arr { msg := &mqwrapper.ProducerMessage{ - Payload: IntToBytes(v), - Properties: map[string]string{}, + Payload: IntToBytes(v), + Properties: map[string]string{ + common.TraceIDKey: pArr[k], + }, } msgID, err := producer.Send(ctx, msg) msgIDs = append(msgIDs, msgID) diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go index 043cc1731a..db85cb7c3e 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/stretchr/testify/assert" ) @@ -34,8 +35,9 @@ func TestKafkaConsumer_SeekExclusive(t *testing.T) { assert.NoError(t, err) defer consumer.Close() - data := []int{111, 222, 333} - testKafkaConsumerProduceData(t, topic, data) + data1 := []int{111, 222, 333} + data2 := []string{"111", "222", "333"} + testKafkaConsumerProduceData(t, topic, data1, data2) msgID := &kafkaID{messageID: 1} err = consumer.Seek(msgID, false) @@ -43,6 +45,7 @@ func TestKafkaConsumer_SeekExclusive(t *testing.T) { msg := <-consumer.Chan() assert.Equal(t, 333, BytesToInt(msg.Payload())) + assert.Equal(t, "333", msg.Properties()[common.TraceIDKey]) assert.Equal(t, int64(2), msg.ID().(*kafkaID).messageID) assert.Equal(t, topic, msg.Topic()) assert.True(t, len(msg.Properties()) == 0) @@ -58,8 +61,9 @@ func TestKafkaConsumer_SeekInclusive(t *testing.T) { assert.NoError(t, err) defer consumer.Close() - data := []int{111, 222, 333} - testKafkaConsumerProduceData(t, 
topic, data) + data1 := []int{111, 222, 333} + data2 := []string{"111", "222", "333"} + testKafkaConsumerProduceData(t, topic, data1, data2) msgID := &kafkaID{messageID: 1} err = consumer.Seek(msgID, true) @@ -67,6 +71,7 @@ func TestKafkaConsumer_SeekInclusive(t *testing.T) { msg := <-consumer.Chan() assert.Equal(t, 222, BytesToInt(msg.Payload())) + assert.Equal(t, "222", msg.Properties()[common.TraceIDKey]) assert.Equal(t, int64(1), msg.ID().(*kafkaID).messageID) assert.Equal(t, topic, msg.Topic()) assert.True(t, len(msg.Properties()) == 0) @@ -99,8 +104,9 @@ func TestKafkaConsumer_ChanWithNoAssign(t *testing.T) { assert.NoError(t, err) defer consumer.Close() - data := []int{111} - testKafkaConsumerProduceData(t, topic, data) + data1 := []int{111} + data2 := []string{"111"} + testKafkaConsumerProduceData(t, topic, data1, data2) assert.Panics(t, func() { <-consumer.Chan() }) @@ -135,10 +141,12 @@ func TestKafkaConsumer_SeekAfterChan(t *testing.T) { assert.NoError(t, err) defer consumer.Close() - data := []int{111} - testKafkaConsumerProduceData(t, topic, data) + data1 := []int{111} + data2 := []string{"111"} + testKafkaConsumerProduceData(t, topic, data1, data2) msg := <-consumer.Chan() assert.Equal(t, 111, BytesToInt(msg.Payload())) + assert.Equal(t, "111", msg.Properties()[common.TraceIDKey]) err = consumer.Seek(mockMsgID{}, false) assert.Error(t, err) @@ -158,8 +166,9 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) { assert.Equal(t, int64(0), latestMsgID.(*kafkaID).messageID) assert.Nil(t, err) - data := []int{111, 222, 333} - testKafkaConsumerProduceData(t, topic, data) + data1 := []int{111, 222, 333} + data2 := []string{"111", "222", "333"} + testKafkaConsumerProduceData(t, topic, data1, data2) latestMsgID, err = consumer.GetLatestMsgID() assert.Equal(t, int64(2), latestMsgID.(*kafkaID).messageID) @@ -171,20 +180,24 @@ func TestKafkaConsumer_ConsumeFromLatest(t *testing.T) { groupID := fmt.Sprintf("test-groupid-%d", rand.Int()) topic := 
fmt.Sprintf("test-topicName-%d", rand.Int()) - data := []int{111, 222, 333} - testKafkaConsumerProduceData(t, topic, data) + data1 := []int{111, 222, 333} + data2 := []string{"111", "222", "333"} + testKafkaConsumerProduceData(t, topic, data1, data2) config := createConfig(groupID) consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionLatest) assert.NoError(t, err) defer consumer.Close() - data = []int{444, 555} - testKafkaConsumerProduceData(t, topic, data) + data1 = []int{444, 555} + data2 = []string{"444", "555"} + testKafkaConsumerProduceData(t, topic, data1, data2) msg := <-consumer.Chan() assert.Equal(t, 444, BytesToInt(msg.Payload())) + assert.Equal(t, "444", msg.Properties()[common.TraceIDKey]) msg = <-consumer.Chan() assert.Equal(t, 555, BytesToInt(msg.Payload())) + assert.Equal(t, "555", msg.Properties()[common.TraceIDKey]) } func TestKafkaConsumer_ConsumeFromEarliest(t *testing.T) { @@ -192,14 +205,16 @@ func TestKafkaConsumer_ConsumeFromEarliest(t *testing.T) { groupID := fmt.Sprintf("test-groupid-%d", rand.Int()) topic := fmt.Sprintf("test-topicName-%d", rand.Int()) - data := []int{111, 222, 333} - testKafkaConsumerProduceData(t, topic, data) + data1 := []int{111, 222, 333} + data2 := []string{"111", "222", "333"} + testKafkaConsumerProduceData(t, topic, data1, data2) config := createConfig(groupID) consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest) assert.NoError(t, err) msg := <-consumer.Chan() assert.Equal(t, 111, BytesToInt(msg.Payload())) + assert.Equal(t, "111", msg.Properties()[common.TraceIDKey]) consumer.Ack(msg) defer consumer.Close() @@ -208,6 +223,7 @@ func TestKafkaConsumer_ConsumeFromEarliest(t *testing.T) { assert.NoError(t, err) msg = <-consumer2.Chan() assert.Equal(t, 111, BytesToInt(msg.Payload())) + assert.Equal(t, "111", msg.Properties()[common.TraceIDKey]) consumer2.Ack(msg) defer consumer2.Close() } @@ -218,14 +234,14 @@ func 
TestKafkaConsumer_createKafkaConsumer(t *testing.T) { assert.NotNil(t, err) } -func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) { +func testKafkaConsumerProduceData(t *testing.T, topic string, data []int, arr []string) { ctx := context.Background() kc := createKafkaClient(t) defer kc.Close() producer := createProducer(t, kc, topic) defer producer.Close() - produceData(ctx, t, producer, data) + produceData(ctx, t, producer, data, arr) producer.(*kafkaProducer).p.Flush(500) } diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go index 2d144739e9..8c0af9c06e 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go @@ -2,6 +2,7 @@ package kafka import ( "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" ) @@ -14,7 +15,17 @@ func (km *kafkaMessage) Topic() string { } func (km *kafkaMessage) Properties() map[string]string { - return nil + if len(km.msg.Headers) == 0 { + return nil + } + var properties map[string]string + for i := 0; i < len(km.msg.Headers); i++ { + if _, ok := properties[km.msg.Headers[i].Key]; ok { + log.Info("Repeated key in kafka message headers") + } + properties[km.msg.Headers[i].Key] = string(km.msg.Headers[i].Value) + } + return properties } func (km *kafkaMessage) Payload() []byte { diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_message_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_message_test.go index e38523a880..90f407a171 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_message_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_message_test.go @@ -9,7 +9,7 @@ import ( func TestKafkaMessage_All(t *testing.T) { topic := "t" - msg := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: 0}, Value: nil} + msg 
:= &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: 0}, Value: nil, Headers: nil} km := &kafkaMessage{msg: msg} assert.Equal(t, topic, km.Topic()) assert.Equal(t, int64(0), km.ID().(*kafkaID).messageID) diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go index 08eee98076..38d19d06d7 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go @@ -26,9 +26,15 @@ func (kp *kafkaProducer) Topic() string { } func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) { + var headers []kafka.Header + for key, value := range message.Properties { + header := kafka.Header{Key: key, Value: []byte(value)} + headers = append(headers, header) + } err := kp.p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx}, Value: message.Payload, + Headers: headers, }, kp.deliveryChan) if err != nil { diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go index dc458f03e0..f3d16219ec 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go @@ -425,11 +425,14 @@ func TestPulsarClient_SeekPosition(t *testing.T) { log.Info("Produce start") ids := []mqwrapper.MessageID{} - arr := []int{1, 2, 3} - for _, v := range arr { + arr1 := []int{1, 2, 3} + arr2 := []string{"1", "2", "3"} + for k, v := range arr1 { msg := &mqwrapper.ProducerMessage{ - Payload: IntToBytes(v), - Properties: map[string]string{}, + Payload: IntToBytes(v), + Properties: map[string]string{ + common.TraceIDKey: arr2[k], + }, } id, err := producer.Send(ctx, msg) ids = append(ids, id) @@ -459,6 +462,7 @@ func TestPulsarClient_SeekPosition(t *testing.T) { 
assert.Equal(t, seekID.EntryID(), msg.ID().EntryID()) assert.Equal(t, seekID.PartitionIdx(), msg.ID().PartitionIdx()) assert.Equal(t, 3, BytesToInt(msg.Payload())) + assert.Equal(t, "3", msg.Properties()[common.TraceIDKey]) case <-time.After(2 * time.Second): assert.FailNow(t, "should not wait") } @@ -475,6 +479,7 @@ func TestPulsarClient_SeekPosition(t *testing.T) { assert.Equal(t, seekID.EntryID(), msg.ID().EntryID()) assert.Equal(t, seekID.PartitionIdx(), msg.ID().PartitionIdx()) assert.Equal(t, 2, BytesToInt(msg.Payload())) + assert.Equal(t, "2", msg.Properties()[common.TraceIDKey]) case <-time.After(2 * time.Second): assert.FailNow(t, "should not wait") } diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go index 7880e82e8c..f185b99a6c 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go @@ -23,6 +23,7 @@ import ( "strings" "testing" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/streamnative/pulsarctl/pkg/pulsar/utils" @@ -86,6 +87,7 @@ func TestComsumeCompressedMessage(t *testing.T) { msg := []byte("test message") compressedMsg := []byte("test compressed message") + traceValue := "test compressed message id" _, err = producer.Send(context.Background(), &mqwrapper.ProducerMessage{ Payload: msg, Properties: map[string]string{}, @@ -97,14 +99,17 @@ func TestComsumeCompressedMessage(t *testing.T) { assert.Equal(t, msg, recvMsg.Payload()) _, err = compressProducer.Send(context.Background(), &mqwrapper.ProducerMessage{ - Payload: compressedMsg, - Properties: map[string]string{}, + Payload: compressedMsg, + Properties: map[string]string{ + common.TraceIDKey: traceValue, + }, }) assert.NoError(t, err) recvMsg, err = consumer.Receive(context.Background()) assert.NoError(t, err) consumer.Ack(recvMsg) assert.Equal(t, 
compressedMsg, recvMsg.Payload()) + assert.Equal(t, traceValue, recvMsg.Properties()[common.TraceIDKey]) assert.Nil(t, err) assert.NotNil(t, consumer) diff --git a/internal/mq/msgstream/mqwrapper/rmq/rmq_message.go b/internal/mq/msgstream/mqwrapper/rmq/rmq_message.go index 6748438906..4d2c5ba8cd 100644 --- a/internal/mq/msgstream/mqwrapper/rmq/rmq_message.go +++ b/internal/mq/msgstream/mqwrapper/rmq/rmq_message.go @@ -31,7 +31,7 @@ func (rm *rmqMessage) Topic() string { // Properties returns the properties of rocksmq message func (rm *rmqMessage) Properties() map[string]string { - return nil + return rm.msg.Properties } // Payload returns the payload of rocksmq message diff --git a/internal/mq/msgstream/mqwrapper/rmq/rmq_producer.go b/internal/mq/msgstream/mqwrapper/rmq/rmq_producer.go index ce85b37810..a49da2db57 100644 --- a/internal/mq/msgstream/mqwrapper/rmq/rmq_producer.go +++ b/internal/mq/msgstream/mqwrapper/rmq/rmq_producer.go @@ -33,7 +33,7 @@ func (rp *rmqProducer) Topic() string { // Send send the producer messages to rocksmq func (rp *rmqProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) { - pm := &client.ProducerMessage{Payload: message.Payload} + pm := &client.ProducerMessage{Payload: message.Payload, Properties: message.Properties} id, err := rp.p.Send(pm) return &rmqID{messageID: id}, err } diff --git a/internal/mq/msgstream/trace.go b/internal/mq/msgstream/trace.go index 346d04da8d..dce78f1e1c 100644 --- a/internal/mq/msgstream/trace.go +++ b/internal/mq/msgstream/trace.go @@ -29,9 +29,9 @@ import ( "github.com/milvus-io/milvus/internal/util/trace" ) -// ExtractFromPulsarMsgProperties extracts trace span from msg.properties. +// ExtractFromMsgProperties extracts trace span from msg.properties. // And it will attach some default tags to the span. 
-func ExtractFromPulsarMsgProperties(msg TsMsg, properties map[string]string) (opentracing.Span, bool) { +func ExtractFromMsgProperties(msg TsMsg, properties map[string]string) (opentracing.Span, bool) { if !allowTrace(msg) { return trace.NoopSpan(), false } diff --git a/internal/util/trace/util.go b/internal/util/trace/util.go index 0b67d9e0b7..88d1d4cd57 100644 --- a/internal/util/trace/util.go +++ b/internal/util/trace/util.go @@ -186,13 +186,13 @@ func InfoFromContext(ctx context.Context) (traceID string, sampled, found bool) return "", false, false } -// InjectContextToPulsarMsgProperties is a method inject span to pulsr message. -func InjectContextToPulsarMsgProperties(sc opentracing.SpanContext, properties map[string]string) { +// InjectContextToMsgProperties is a method inject span to pulsr message. +func InjectContextToMsgProperties(sc opentracing.SpanContext, properties map[string]string) { tracer := opentracing.GlobalTracer() tracer.Inject(sc, opentracing.TextMap, PropertiesReaderWriter{properties}) } -// PropertiesReaderWriter is for saving trce in pulsar msg properties. +// PropertiesReaderWriter is for saving trace in pulsar msg properties. // Implement Set and ForeachKey methods. type PropertiesReaderWriter struct { PpMap map[string]string diff --git a/internal/util/trace/util_test.go b/internal/util/trace/util_test.go index 7f4f84434c..a54411b4a3 100644 --- a/internal/util/trace/util_test.go +++ b/internal/util/trace/util_test.go @@ -100,7 +100,7 @@ func TestInject(t *testing.T) { id, sampled, found := InfoFromContext(ctx) fmt.Printf("traceID = %s, sampled = %t, found = %t", id, sampled, found) pp := PropertiesReaderWriter{PpMap: map[string]string{}} - InjectContextToPulsarMsgProperties(sp.Context(), pp.PpMap) + InjectContextToMsgProperties(sp.Context(), pp.PpMap) tracer := opentracing.GlobalTracer() sc, _ := tracer.Extract(opentracing.TextMap, pp) assert.NotNil(t, sc)