mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com> Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
241 lines
6.8 KiB
Go
241 lines
6.8 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
func TestKafkaConsumer_Subscription(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
|
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
|
|
|
config := createConfig(groupID)
|
|
kc, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
|
|
assert.NoError(t, err)
|
|
defer kc.Close()
|
|
assert.Equal(t, kc.Subscription(), groupID)
|
|
}
|
|
|
|
func TestKafkaConsumer_SeekExclusive(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
|
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
|
|
|
config := createConfig(groupID)
|
|
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
|
|
assert.NoError(t, err)
|
|
defer consumer.Close()
|
|
|
|
data := []int{111, 222, 333}
|
|
testKafkaConsumerProduceData(t, topic, data)
|
|
|
|
msgID := &kafkaID{messageID: 1}
|
|
err = consumer.Seek(msgID, false)
|
|
assert.Nil(t, err)
|
|
|
|
msg := <-consumer.Chan()
|
|
assert.Equal(t, 333, BytesToInt(msg.Payload()))
|
|
assert.Equal(t, int64(2), msg.ID().(*kafkaID).messageID)
|
|
assert.Equal(t, topic, msg.Topic())
|
|
assert.True(t, len(msg.Properties()) == 0)
|
|
}
|
|
|
|
func TestKafkaConsumer_SeekInclusive(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
|
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
|
|
|
config := createConfig(groupID)
|
|
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
|
|
assert.NoError(t, err)
|
|
defer consumer.Close()
|
|
|
|
data := []int{111, 222, 333}
|
|
testKafkaConsumerProduceData(t, topic, data)
|
|
|
|
msgID := &kafkaID{messageID: 1}
|
|
err = consumer.Seek(msgID, true)
|
|
assert.Nil(t, err)
|
|
|
|
msg := <-consumer.Chan()
|
|
assert.Equal(t, 222, BytesToInt(msg.Payload()))
|
|
assert.Equal(t, int64(1), msg.ID().(*kafkaID).messageID)
|
|
assert.Equal(t, topic, msg.Topic())
|
|
assert.True(t, len(msg.Properties()) == 0)
|
|
}
|
|
|
|
func TestKafkaConsumer_GetSeek(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
|
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
|
|
|
config := createConfig(groupID)
|
|
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
|
|
assert.NoError(t, err)
|
|
defer consumer.Close()
|
|
|
|
msgID := &kafkaID{messageID: 0}
|
|
err = consumer.Seek(msgID, false)
|
|
assert.Nil(t, err)
|
|
|
|
assert.Error(t, consumer.Seek(msgID, false))
|
|
}
|
|
|
|
func TestKafkaConsumer_ChanWithNoAssign(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
|
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
|
|
|
config := createConfig(groupID)
|
|
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
|
|
assert.NoError(t, err)
|
|
defer consumer.Close()
|
|
|
|
data := []int{111}
|
|
testKafkaConsumerProduceData(t, topic, data)
|
|
assert.Panics(t, func() {
|
|
<-consumer.Chan()
|
|
})
|
|
}
|
|
|
|
type mockMsgID struct {
|
|
}
|
|
|
|
func (m2 mockMsgID) AtEarliestPosition() bool {
|
|
return false
|
|
}
|
|
|
|
func (m2 mockMsgID) LessOrEqualThan(msgID []byte) (bool, error) {
|
|
return false, nil
|
|
}
|
|
|
|
func (m2 mockMsgID) Equal(msgID []byte) (bool, error) {
|
|
return false, nil
|
|
}
|
|
|
|
func (m2 mockMsgID) Serialize() []byte {
|
|
return nil
|
|
}
|
|
|
|
func TestKafkaConsumer_SeekAfterChan(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
|
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
|
|
|
config := createConfig(groupID)
|
|
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest)
|
|
assert.NoError(t, err)
|
|
defer consumer.Close()
|
|
|
|
data := []int{111}
|
|
testKafkaConsumerProduceData(t, topic, data)
|
|
msg := <-consumer.Chan()
|
|
assert.Equal(t, 111, BytesToInt(msg.Payload()))
|
|
|
|
err = consumer.Seek(mockMsgID{}, false)
|
|
assert.Error(t, err)
|
|
}
|
|
|
|
func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
|
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
|
|
|
config := createConfig(groupID)
|
|
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionUnknown)
|
|
assert.NoError(t, err)
|
|
defer consumer.Close()
|
|
|
|
latestMsgID, err := consumer.GetLatestMsgID()
|
|
assert.Equal(t, int64(0), latestMsgID.(*kafkaID).messageID)
|
|
assert.Nil(t, err)
|
|
|
|
data := []int{111, 222, 333}
|
|
testKafkaConsumerProduceData(t, topic, data)
|
|
|
|
latestMsgID, err = consumer.GetLatestMsgID()
|
|
assert.Equal(t, int64(2), latestMsgID.(*kafkaID).messageID)
|
|
assert.Nil(t, err)
|
|
}
|
|
|
|
func TestKafkaConsumer_ConsumeFromLatest(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
|
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
|
|
|
data := []int{111, 222, 333}
|
|
testKafkaConsumerProduceData(t, topic, data)
|
|
|
|
config := createConfig(groupID)
|
|
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionLatest)
|
|
assert.NoError(t, err)
|
|
defer consumer.Close()
|
|
data = []int{444, 555}
|
|
testKafkaConsumerProduceData(t, topic, data)
|
|
|
|
msg := <-consumer.Chan()
|
|
assert.Equal(t, 444, BytesToInt(msg.Payload()))
|
|
msg = <-consumer.Chan()
|
|
assert.Equal(t, 555, BytesToInt(msg.Payload()))
|
|
}
|
|
|
|
func TestKafkaConsumer_ConsumeFromEarliest(t *testing.T) {
|
|
rand.Seed(time.Now().UnixNano())
|
|
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
|
|
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
|
|
|
|
data := []int{111, 222, 333}
|
|
testKafkaConsumerProduceData(t, topic, data)
|
|
|
|
config := createConfig(groupID)
|
|
consumer, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest)
|
|
assert.NoError(t, err)
|
|
msg := <-consumer.Chan()
|
|
assert.Equal(t, 111, BytesToInt(msg.Payload()))
|
|
consumer.Ack(msg)
|
|
defer consumer.Close()
|
|
|
|
config = createConfig(groupID)
|
|
consumer2, err := newKafkaConsumer(config, topic, groupID, mqwrapper.SubscriptionPositionEarliest)
|
|
assert.NoError(t, err)
|
|
msg = <-consumer2.Chan()
|
|
assert.Equal(t, 111, BytesToInt(msg.Payload()))
|
|
consumer2.Ack(msg)
|
|
defer consumer2.Close()
|
|
}
|
|
|
|
func TestKafkaConsumer_createKafkaConsumer(t *testing.T) {
|
|
consumer := &Consumer{config: &kafka.ConfigMap{}}
|
|
err := consumer.createKafkaConsumer()
|
|
assert.NotNil(t, err)
|
|
}
|
|
|
|
func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) {
|
|
ctx := context.Background()
|
|
kc := createKafkaClient(t)
|
|
defer kc.Close()
|
|
producer := createProducer(t, kc, topic)
|
|
defer producer.Close()
|
|
|
|
produceData(ctx, t, producer, data)
|
|
|
|
producer.(*kafkaProducer).p.Flush(500)
|
|
}
|
|
|
|
func createConfig(groupID string) *kafka.ConfigMap {
|
|
kafkaAddress := getKafkaBrokerList()
|
|
return &kafka.ConfigMap{
|
|
"bootstrap.servers": kafkaAddress,
|
|
"group.id": groupID,
|
|
"api.version.request": "true",
|
|
}
|
|
}
|