diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 1a4c3359a4..a1a7260ee3 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -541,8 +541,15 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum return nil, err } msg := ConsumerMessage{ - MsgID: msgID, - Payload: val.Data(), + MsgID: msgID, + } + origData := val.Data() + dataLen := len(origData) + if dataLen == 0 { + msg.Payload = nil + } else { + msg.Payload = make([]byte, dataLen) + copy(msg.Payload, origData) } consumerMessage = append(consumerMessage, msg) key.Free() diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index 8280367049..3097ac92c6 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -494,3 +494,67 @@ func TestRocksMQ_MultiChan(t *testing.T) { assert.Equal(t, len(cMsgs), 1) assert.Equal(t, string(cMsgs[0].Payload), "for_chann1_"+strconv.Itoa(0)) } + +func TestRocksMQ_CopyData(t *testing.T) { + ep := etcdEndpoints() + etcdKV, err := etcdkv.NewEtcdKV(ep, "/etcd/test/root") + assert.Nil(t, err) + defer etcdKV.Close() + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) + _ = idAllocator.Initialize() + + name := "/tmp/rocksmq_copydata" + defer os.RemoveAll(name) + kvName := name + "_meta_kv" + _ = os.RemoveAll(kvName) + defer os.RemoveAll(kvName) + rmq, err := NewRocksMQ(name, idAllocator) + assert.Nil(t, err) + defer rmq.stopRetention() + + channelName0 := "test_chan01" + channelName1 := "test_chan11" + err = rmq.CreateTopic(channelName0) + assert.Nil(t, err) + defer rmq.DestroyTopic(channelName0) + err = rmq.CreateTopic(channelName1) + assert.Nil(t, err) + defer rmq.DestroyTopic(channelName1) + assert.Nil(t, err) + + msg0 := "abcde" + pMsg0 := ProducerMessage{Payload: []byte(msg0)} + err = rmq.Produce(channelName0, []ProducerMessage{pMsg0}) + assert.Nil(t, err) + + pMsg1 := ProducerMessage{Payload: nil} + err = rmq.Produce(channelName1, []ProducerMessage{pMsg1}) + assert.Nil(t, err) + + pMsg2 := ProducerMessage{Payload: []byte{}} + err = rmq.Produce(channelName1, []ProducerMessage{pMsg2}) + assert.Nil(t, err) + + var emptyTargetData []byte + pMsg3 := ProducerMessage{Payload: emptyTargetData} + err = rmq.Produce(channelName1, []ProducerMessage{pMsg3}) + assert.Nil(t, err) + + groupName := "test_group" + _ = rmq.DestroyConsumerGroup(channelName0, groupName) + err = rmq.CreateConsumerGroup(channelName0, groupName) + assert.Nil(t, err) + cMsgs0, err := rmq.Consume(channelName0, groupName, 1) + assert.Nil(t, err) + assert.Equal(t, len(cMsgs0), 1) + assert.Equal(t, string(cMsgs0[0].Payload), msg0) + + _ = rmq.DestroyConsumerGroup(channelName1, groupName) + err = rmq.CreateConsumerGroup(channelName1, groupName) + assert.Nil(t, err) + cMsgs1, err := rmq.Consume(channelName1, groupName, 3) + assert.Nil(t, err) + assert.Equal(t, 3, len(cMsgs1)) + assert.Equal(t, emptyTargetData, cMsgs1[0].Payload) + +}