Refactor kafka msgstream implementation (#17069)

Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
jaime 2022-05-23 12:39:58 +08:00 committed by GitHub
parent 33ddd8bb58
commit a20e0dfc74
6 changed files with 218 additions and 236 deletions

View File

@@ -22,6 +22,7 @@ import (
"log"
"sync"
"testing"
"time"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
@@ -101,8 +102,12 @@ import (
// assert.Equal(t, o3.BeginTs, p3.BeginTs)
//}
func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) {
func skipTest(t *testing.T) {
t.Skip("skip kafka test")
}
func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) {
skipTest(t)
kafkaAddress, _ := Params.Load("_KafkaBrokerList")
c := funcutil.RandomString(8)
@@ -179,7 +184,7 @@ func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) {
}
func TestStream_KafkaTtMsgStream_Seek(t *testing.T) {
t.Skip("skip kafka test")
skipTest(t)
kafkaAddress, _ := Params.Load("_KafkaBrokerList")
c1 := funcutil.RandomString(8)
@@ -293,7 +298,7 @@ func TestStream_KafkaTtMsgStream_Seek(t *testing.T) {
}
func TestStream_KafkaTtMsgStream_1(t *testing.T) {
t.Skip("skip kafka test")
skipTest(t)
kafkaAddress, _ := Params.Load("_KafkaBrokerList")
c1 := funcutil.RandomString(8)
@@ -340,7 +345,7 @@ func TestStream_KafkaTtMsgStream_1(t *testing.T) {
}
func TestStream_KafkaTtMsgStream_2(t *testing.T) {
t.Skip("skip kafka test")
skipTest(t)
kafkaAddress, _ := Params.Load("_KafkaBrokerList")
c1 := funcutil.RandomString(8)
@@ -398,7 +403,7 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) {
}
func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) {
t.Skip("skip kafka test")
skipTest(t)
kafkaAddress, _ := Params.Load("_KafkaBrokerList")
c1 := funcutil.RandomString(8)
@@ -435,6 +440,9 @@ func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) {
}
}()
// make the producer start producing messages only after Chan has been invoked
time.Sleep(5 * time.Second)
inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels)
msgPacks1 := createRandMsgPacks(2, 1, 1)
assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1))
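
As an aside, a common variant of the new skipTest helper (an assumption, not what this commit does) gates the skip on an environment variable, so environments with a live broker can still run the suite:

// Hypothetical env-gated variant; the commit itself skips unconditionally.
// The package name is assumed for illustration.
package msgstream

import (
	"os"
	"testing"
)

// skipTest skips Kafka integration tests unless KAFKA_BROKER_LIST is set.
func skipTest(t *testing.T) {
	if os.Getenv("KAFKA_BROKER_LIST") == "" {
		t.Skip("skip kafka test: KAFKA_BROKER_LIST not set")
	}
}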

View File

@@ -43,6 +43,24 @@ func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
once.Do(func() {
config := kc.newProducerConfig()
Producer, err = kafka.NewProducer(config)
go func() {
for e := range Producer.Events() {
switch ev := e.(type) {
case kafka.Error:
// Generic client instance-level errors, such as broker connection failures,
// authentication issues, etc.
// After a fatal error has been raised, any subsequent Produce*() calls will fail with
// the original error code.
log.Error("kafka error", zap.Any("error msg", ev.Error()))
if ev.IsFatal() {
panic(ev)
}
default:
log.Debug("kafka producer event", zap.Any("event", ev))
}
}
}()
})
if err != nil {
@@ -98,8 +116,8 @@ func (kc *kafkaClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrap
func (kc *kafkaClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) {
config := kc.newConsumerConfig(options.SubscriptionName, options.SubscriptionInitialPosition)
consumer := newKafkaConsumer(config, options.Topic, options.SubscriptionName)
return consumer, nil
consumer, err := newKafkaConsumer(config, options.Topic, options.SubscriptionName)
return consumer, err
}
func (kc *kafkaClient) EarliestMessageID() mqwrapper.MessageID {
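
For reference, a self-contained sketch of the event-monitoring pattern added in getKafkaProducer, using the public confluent-kafka-go v1 API (the broker address localhost:9092 is an assumption):

package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case kafka.Error:
				// Client instance-level errors: broker connection failures,
				// authentication problems, and similar.
				log.Printf("kafka error: %v", ev)
				if ev.IsFatal() {
					// After a fatal error, every later Produce call fails
					// with the original error code, so fail loudly.
					log.Panicf("fatal kafka error: %v", ev)
				}
			default:
				log.Printf("producer event: %v", ev)
			}
		}
	}()
	// ... produce messages and wait for delivery reports here ...
}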

View File

@@ -187,154 +187,6 @@ func TestKafkaClient_ConsumeWithAck(t *testing.T) {
assert.Equal(t, len(arr), total3)
}
func ConsumeFromEarliestToRandomPosition(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, c chan mqwrapper.MessageID, total *int) {
consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
BufSize: 1024,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.Nil(t, err)
assert.NotNil(t, consumer)
defer consumer.Close()
// pick a random number between 1 and 5
rand.Seed(time.Now().UnixNano())
cnt := 1 + rand.Int()%5
log.Info("Consume1 channel start")
var msg mqwrapper.Message
for i := 0; i < cnt; i++ {
select {
case <-ctx.Done():
log.Info("Consume1 channel closed")
return
case msg = <-consumer.Chan():
if msg == nil {
continue
}
v := BytesToInt(msg.Payload())
log.Info("Consume1 RECV", zap.Any("v", v))
(*total)++
}
}
c <- &kafkaID{messageID: msg.ID().(*kafkaID).messageID}
log.Info("Consume1 randomly RECV", zap.Any("number", cnt))
log.Info("Consume1 done")
}
// Consume2 will consume messages from the specified MessageID
func consumeFromSpecifiedPositionToEnd(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, msgID mqwrapper.MessageID, total *int) {
consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
BufSize: 1024,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.Nil(t, err)
assert.NotNil(t, consumer)
defer consumer.Close()
err = consumer.Seek(msgID, false)
assert.Nil(t, err)
log.Info("Consume2 start")
for {
select {
case <-ctx.Done():
log.Info("Consume2 channel closed")
return
case msg, ok := <-consumer.Chan():
if msg == nil || !ok {
return
}
v := BytesToInt(msg.Payload())
log.Info("Consume2 RECV", zap.Any("v", v))
(*total)++
}
}
}
func ConsumeFromEarliestToEndPosition(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, total *int) {
consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{
Topic: topic,
SubscriptionName: subName,
BufSize: 1024,
SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest,
})
assert.Nil(t, err)
assert.NotNil(t, consumer)
defer consumer.Close()
log.Info("Consume3 start")
for {
select {
case <-ctx.Done():
log.Info("Consume3 channel closed")
return
case msg, ok := <-consumer.Chan():
if msg == nil || !ok {
return
}
v := BytesToInt(msg.Payload())
log.Info("Consume3 RECV", zap.Any("v", v))
(*total)++
}
}
}
func TestKafkaClient_ConsumeNoAck(t *testing.T) {
kc := createKafkaClient(t)
defer kc.Close()
assert.NotNil(t, kc)
rand.Seed(time.Now().UnixNano())
topic := fmt.Sprintf("test-topic-%d", rand.Int())
subName := fmt.Sprintf("test-subname-%d", rand.Int())
var total1 int
var total2 int
var total3 int
arr := []int{111, 222, 333, 444, 555, 666, 777}
ctx, cancel := context.WithCancel(context.Background())
producer := createProducer(t, kc, topic)
defer producer.Close()
produceData(ctx, t, producer, arr)
time.Sleep(100 * time.Millisecond)
ctx1, cancel1 := context.WithTimeout(ctx, 5*time.Second)
defer cancel1()
c := make(chan mqwrapper.MessageID, 1)
ConsumeFromEarliestToRandomPosition(ctx1, t, kc, topic, subName, c, &total1)
// record the last received message id
lastMsgID := <-c
log.Info("msg", zap.Any("lastMsgID", lastMsgID))
ctx2, cancel2 := context.WithTimeout(ctx, 5*time.Second)
defer cancel2()
consumeFromSpecifiedPositionToEnd(ctx2, t, kc, topic, subName, lastMsgID, &total2)
ctx3, cancel3 := context.WithTimeout(ctx, 5*time.Second)
defer cancel3()
ConsumeFromEarliestToEndPosition(ctx3, t, kc, topic, subName, &total3)
cancel()
//TODO enable, it seems that ack is unavailable
//assert.Equal(t, len(arr)*2, total1+total2)
assert.Equal(t, len(arr), total3)
}
func TestKafkaClient_SeekPosition(t *testing.T) {
kc := createKafkaClient(t)
defer kc.Close()
@@ -365,6 +217,39 @@ func TestKafkaClient_SeekPosition(t *testing.T) {
}
}
func TestKafkaClient_ConsumeFromLatest(t *testing.T) {
kc := createKafkaClient(t)
defer kc.Close()
rand.Seed(time.Now().UnixNano())
ctx := context.Background()
topic := fmt.Sprintf("test-topic-%d", rand.Int())
subName := fmt.Sprintf("test-subname-%d", rand.Int())
producer := createProducer(t, kc, topic)
defer producer.Close()
data := []int{1, 2}
produceData(ctx, t, producer, data)
consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionLatest)
defer consumer.Close()
go func() {
time.Sleep(time.Second * 2)
data := []int{3}
produceData(ctx, t, producer, data)
}()
select {
case msg := <-consumer.Chan():
consumer.Ack(msg)
assert.Equal(t, 3, BytesToInt(msg.Payload()))
case <-time.After(5 * time.Second):
assert.FailNow(t, "should not wait")
}
}
func TestKafkaClient_EarliestMessageID(t *testing.T) {
kafkaAddress, _ := Params.Load("_KafkaBrokerList")
kc := NewKafkaClientInstance(kafkaAddress)
@@ -374,6 +259,25 @@ func TestKafkaClient_EarliestMessageID(t *testing.T) {
assert.NotNil(t, mid)
}
func TestKafkaClient_MsgSerializAndDeserialize(t *testing.T) {
kafkaAddress, _ := Params.Load("_KafkaBrokerList")
kc := NewKafkaClientInstance(kafkaAddress)
defer kc.Close()
mid := kc.EarliestMessageID()
msgID, err := kc.BytesToMsgID(mid.Serialize())
assert.NoError(t, err)
assert.True(t, msgID.AtEarliestPosition())
msgID, err = kc.StringToMsgID("1")
assert.NoError(t, err)
assert.NotNil(t, msgID)
msgID, err = kc.StringToMsgID("1.0")
assert.Error(t, err)
assert.Nil(t, msgID)
}
func createKafkaClient(t *testing.T) *kafkaClient {
kafkaAddress, _ := Params.Load("_KafkaBrokerList")
kc := NewKafkaClientInstance(kafkaAddress)
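
The new serialization test round-trips a message ID through bytes and strings. A self-contained sketch of that kind of round-trip for an int64 Kafka offset (the encoding below is an illustration, not necessarily kafkaID's actual wire format):

package main

import (
	"encoding/binary"
	"fmt"
	"strconv"
)

// serializeOffset packs an int64 offset into 8 little-endian bytes.
func serializeOffset(offset int64) []byte {
	b := make([]byte, 8)
	binary.LittleEndian.PutUint64(b, uint64(offset))
	return b
}

// deserializeOffset is the inverse, rejecting malformed input.
func deserializeOffset(b []byte) (int64, error) {
	if len(b) != 8 {
		return 0, fmt.Errorf("invalid message ID length %d", len(b))
	}
	return int64(binary.LittleEndian.Uint64(b)), nil
}

func main() {
	off, err := deserializeOffset(serializeOffset(42))
	fmt.Println(off, err) // 42 <nil>

	// StringToMsgID-style parsing: "1" is a valid offset, "1.0" is not.
	if _, err := strconv.ParseInt("1.0", 10, 64); err != nil {
		fmt.Println("rejected:", err)
	}
}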

View File

@@ -1,6 +1,7 @@
package kafka
import (
"errors"
"sync"
"time"
@@ -15,7 +16,7 @@ type Consumer struct {
config *kafka.ConfigMap
msgChannel chan mqwrapper.Message
hasSeek bool
isStarted bool
hasConsume bool
skipMsg bool
topic string
groupID string
@@ -24,7 +25,7 @@ type Consumer struct {
closeOnce sync.Once
}
func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string) *Consumer {
func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string) (*Consumer, error) {
closeCh := make(chan struct{})
msgChannel := make(chan mqwrapper.Message, 256)
@@ -36,56 +37,20 @@ func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string) *Co
closeCh: closeCh,
}
kafkaConsumer.createKafkaConsumer()
return kafkaConsumer
err := kafkaConsumer.createKafkaConsumer()
return kafkaConsumer, err
}
func (kc *Consumer) createKafkaConsumer() error {
var err error
kc.c, err = kafka.NewConsumer(kc.config)
if err != nil {
log.Fatal("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err))
log.Error("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err))
return err
}
return nil
}
func (kc *Consumer) startReceiveMsgTask() {
if kc.isStarted {
return
}
if !kc.hasSeek {
tps := []kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx}}
if err := kc.c.Assign(tps); err != nil {
log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Error(err))
panic(err)
}
}
go func() {
for ev := range kc.c.Events() {
switch e := ev.(type) {
case *kafka.Message:
if kc.skipMsg {
kc.skipMsg = false
continue
}
kc.msgChannel <- &kafkaMessage{msg: e}
case kafka.Error:
log.Error("read msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(e))
}
}
if kc.msgChannel != nil {
close(kc.msgChannel)
}
}()
kc.isStarted = true
}
func (kc *Consumer) Subscription() string {
return kc.groupID
}
@@ -96,14 +61,73 @@ func (kc *Consumer) Subscription() string {
// https://github.com/confluentinc/confluent-kafka-go.
func (kc *Consumer) Chan() <-chan mqwrapper.Message {
kc.chanOnce.Do(func() {
kc.startReceiveMsgTask()
if !kc.hasSeek {
offsetStr, err := kc.config.Get("auto.offset.reset", "earliest")
if err != nil {
log.Error("get auto.offset.reset config fail in kafka consumer", zap.String("topic name", kc.topic), zap.Error(err))
panic(err)
}
offset, err := kafka.NewOffset(offsetStr)
if err != nil {
log.Error("Invalid kafka offset", zap.String("topic name", kc.topic), zap.Error(err))
panic(err)
}
// We assume Chan may be invoked before any message is produced, with topic auto-creation enabled;
// in that case calling Subscribe would fail with the error 'Subscribed topic not available',
// so we use Assign instead of Subscribe.
tps := []kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}}
if err := kc.c.Assign(tps); err != nil {
log.Error("kafka consumer subscribe failed ", zap.String("topic name", kc.topic), zap.Error(err))
panic(err)
}
log.Debug("starting kafka consume", zap.String("topic name", kc.topic), zap.Any("offset", offset))
}
go func() {
// the loop ends when the consumer is closed
for ev := range kc.c.Events() {
switch e := ev.(type) {
case *kafka.Message:
if kc.skipMsg {
kc.skipMsg = false
continue
}
kc.msgChannel <- &kafkaMessage{msg: e}
case kafka.Error:
log.Error("consume msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(e))
if e.IsFatal() {
panic(e)
}
}
}
if kc.msgChannel != nil {
close(kc.msgChannel)
}
}()
kc.hasConsume = true
})
return kc.msgChannel
}
func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
if kc.hasSeek {
return errors.New("unsupported multiple seek with the same kafka consumer")
}
if kc.hasConsume {
return errors.New("unsupported seek after consume message with the same kafka consumer")
}
start := time.Now()
offset := kafka.Offset(id.(*kafkaID).messageID)
log.Debug("kafka consumer seek ", zap.String("topic name", kc.topic),
log.Debug("kafka consumer seek start", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))
err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
@@ -112,6 +136,12 @@ func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
return err
}
cost := time.Since(start).Milliseconds()
if cost > 100 {
log.Debug("kafka consumer assign take too long!", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
}
// If the seek timeout is not 0, calling seek twice returns the error RD_KAFKA_RESP_ERR__STATE;
// if the timeout is 0, the seek is initiated but the call returns immediately without any error reporting.
kc.skipMsg = !inclusive
@@ -123,7 +153,11 @@ func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
}
kc.hasSeek = true
kc.startReceiveMsgTask()
cost = time.Since(start).Milliseconds()
log.Debug("kafka consumer seek finished", zap.String("topic name", kc.topic),
zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
return nil
}
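
A self-contained sketch of the Assign-based start-up that Chan now performs, using the public confluent-kafka-go v1 API (broker, group, and topic names are assumptions; go.events.channel.enable must be true for c.Events() to carry messages):

package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092",
		"group.id":                 "example-group",
		"go.events.channel.enable": true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Assign pins one partition at an explicit offset. Unlike Subscribe,
	// it does not fail with "Subscribed topic not available" when the
	// topic is about to be auto-created by the first producer.
	topic := "example-topic"
	offset, err := kafka.NewOffset("earliest")
	if err != nil {
		log.Fatal(err)
	}
	tps := []kafka.TopicPartition{{Topic: &topic, Partition: 0, Offset: offset}}
	if err := c.Assign(tps); err != nil {
		log.Fatal(err)
	}

	for e := range c.Events() {
		switch ev := e.(type) {
		case *kafka.Message:
			log.Printf("message at offset %v: %s", ev.TopicPartition.Offset, ev.Value)
			return
		case kafka.Error:
			log.Printf("consume error: %v", ev)
			if ev.IsFatal() {
				return
			}
		}
	}
}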

View File

@@ -17,7 +17,8 @@ func TestKafkaConsumer_Subscription(t *testing.T) {
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
kc := newKafkaConsumer(config, topic, groupID)
kc, err := newKafkaConsumer(config, topic, groupID)
assert.NoError(t, err)
defer kc.Close()
assert.Equal(t, kc.Subscription(), groupID)
}
@@ -28,14 +29,15 @@ func TestKafkaConsumer_Chan(t *testing.T) {
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
consumer := newKafkaConsumer(config, topic, groupID)
consumer, err := newKafkaConsumer(config, topic, groupID)
assert.NoError(t, err)
defer consumer.Close()
data := []int{111, 222, 333}
testKafkaConsumerProduceData(t, topic, data)
msgID := &kafkaID{messageID: 1}
err := consumer.Seek(msgID, false)
err = consumer.Seek(msgID, false)
assert.Nil(t, err)
msg := <-consumer.Chan()
@@ -51,12 +53,37 @@ func TestKafkaConsumer_GetSeek(t *testing.T) {
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
consumer := newKafkaConsumer(config, topic, groupID)
consumer, err := newKafkaConsumer(config, topic, groupID)
assert.NoError(t, err)
defer consumer.Close()
msgID := &kafkaID{messageID: 0}
err := consumer.Seek(msgID, false)
err = consumer.Seek(msgID, false)
assert.Nil(t, err)
assert.Panics(t, func() {
consumer.Seek(msgID, false)
})
}
func TestKafkaConsumer_SeekAfterChan(t *testing.T) {
rand.Seed(time.Now().UnixNano())
groupID := fmt.Sprintf("test-groupid-%d", rand.Int())
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
consumer, err := newKafkaConsumer(config, topic, groupID)
assert.NoError(t, err)
defer consumer.Close()
data := []int{111}
testKafkaConsumerProduceData(t, topic, data)
msg := <-consumer.Chan()
assert.Equal(t, 111, BytesToInt(msg.Payload()))
assert.Panics(t, func() {
consumer.Seek(nil, false)
})
}
func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
@@ -65,7 +92,8 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
topic := fmt.Sprintf("test-topicName-%d", rand.Int())
config := createConfig(groupID)
consumer := newKafkaConsumer(config, topic, groupID)
consumer, err := newKafkaConsumer(config, topic, groupID)
assert.NoError(t, err)
defer consumer.Close()
latestMsgID, err := consumer.GetLatestMsgID()
@@ -88,6 +116,8 @@ func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) {
defer producer.Close()
produceData(ctx, t, producer, data)
time.Sleep(5 * time.Second)
}
func createConfig(groupID string) *kafka.ConfigMap {
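
The new tests pin down the ordering contract: Seek may run at most once, and only before the first message is consumed. A minimal sketch of just that guard logic, mirroring the hasSeek/hasConsume flags (illustration only, not the real Consumer):

package main

import (
	"errors"
	"fmt"
)

type seekGuard struct {
	hasSeek    bool
	hasConsume bool
}

// Seek enforces the ordering rules the refactored consumer uses.
func (g *seekGuard) Seek() error {
	if g.hasSeek {
		return errors.New("unsupported multiple seek with the same kafka consumer")
	}
	if g.hasConsume {
		return errors.New("unsupported seek after consume message with the same kafka consumer")
	}
	g.hasSeek = true
	return nil
}

// Consume marks that Chan has delivered at least one message.
func (g *seekGuard) Consume() { g.hasConsume = true }

func main() {
	g := &seekGuard{}
	fmt.Println(g.Seek()) // <nil>: first seek before consuming is fine
	g.Consume()
	fmt.Println(g.Seek()) // rejected: a second seek is never allowed
}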

View File

@@ -2,11 +2,10 @@ package kafka
import (
"context"
"errors"
"sync"
"time"
"github.com/pkg/errors"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
@@ -26,38 +25,27 @@ func (kp *kafkaProducer) Topic() string {
}
func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) {
var err error
maxAttempt := 3
err := kp.p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
Value: message.Payload,
}, kp.deliveryChan)
// To avoid https://github.com/confluentinc/confluent-kafka-go/issues/769,
// retry the produce call when the delivery chan returns a nil value.
for i := 0; i < maxAttempt; i++ {
err = kp.p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx},
Value: message.Payload,
}, kp.deliveryChan)
if err != nil {
break
}
e := <-kp.deliveryChan
if e == nil {
errMsg := "produce message failed: delivery chan returned a nil value"
err = errors.New(errMsg)
log.Warn(errMsg, zap.String("topic", kp.topic), zap.ByteString("msg", message.Payload), zap.Int("retries", i))
continue
}
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
return nil, m.TopicPartition.Error
}
return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
if err != nil {
return nil, err
}
return nil, err
e, ok := <-kp.deliveryChan
if !ok {
log.Error("kafka produce message fail because of delivery chan is closed", zap.String("topic", kp.topic))
return nil, errors.New("delivery chan of kafka producer is closed")
}
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
return nil, m.TopicPartition.Error
}
return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
}
func (kp *kafkaProducer) Close() {
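
A self-contained sketch of the simplified synchronous send path above: one Produce call, then block on the delivery channel and surface per-message errors (broker and topic names are assumptions):

package main

import (
	"errors"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// send produces one message and waits for its delivery report.
func send(p *kafka.Producer, topic string, payload []byte, deliveryChan chan kafka.Event) (int64, error) {
	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
		Value:          payload,
	}, deliveryChan)
	if err != nil {
		return 0, err
	}
	e, ok := <-deliveryChan
	if !ok {
		return 0, errors.New("delivery chan of kafka producer is closed")
	}
	m := e.(*kafka.Message)
	if m.TopicPartition.Error != nil {
		return 0, m.TopicPartition.Error
	}
	return int64(m.TopicPartition.Offset), nil
}

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	deliveryChan := make(chan kafka.Event, 1)
	offset, err := send(p, "example-topic", []byte("hello"), deliveryChan)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("delivered at offset %d", offset)
}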