From a20e0dfc74ff552405512087fa6fa5b6bc56a259 Mon Sep 17 00:00:00 2001 From: jaime Date: Mon, 23 May 2022 12:39:58 +0800 Subject: [PATCH] Refactor kafka msgstream implementation (#17069) Signed-off-by: yun.zhang --- .../mq/msgstream/mq_kafka_msgstream_test.go | 18 +- .../msgstream/mqwrapper/kafka/kafka_client.go | 22 +- .../mqwrapper/kafka/kafka_client_test.go | 200 +++++------------- .../mqwrapper/kafka/kafka_consumer.go | 122 +++++++---- .../mqwrapper/kafka/kafka_consumer_test.go | 42 +++- .../mqwrapper/kafka/kafka_producer.go | 50 ++--- 6 files changed, 218 insertions(+), 236 deletions(-) diff --git a/internal/mq/msgstream/mq_kafka_msgstream_test.go b/internal/mq/msgstream/mq_kafka_msgstream_test.go index 7a1132a803..097295be2d 100644 --- a/internal/mq/msgstream/mq_kafka_msgstream_test.go +++ b/internal/mq/msgstream/mq_kafka_msgstream_test.go @@ -22,6 +22,7 @@ import ( "log" "sync" "testing" + "time" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" @@ -101,8 +102,12 @@ import ( // assert.Equal(t, o3.BeginTs, p3.BeginTs) //} -func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) { +func skipTest(t *testing.T) { t.Skip("skip kafka test") +} + +func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) { + skipTest(t) kafkaAddress, _ := Params.Load("_KafkaBrokerList") c := funcutil.RandomString(8) @@ -179,7 +184,7 @@ func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) { } func TestStream_KafkaTtMsgStream_Seek(t *testing.T) { - t.Skip("skip kafka test") + skipTest(t) kafkaAddress, _ := Params.Load("_KafkaBrokerList") c1 := funcutil.RandomString(8) @@ -293,7 +298,7 @@ func TestStream_KafkaTtMsgStream_Seek(t *testing.T) { } func TestStream_KafkaTtMsgStream_1(t *testing.T) { - t.Skip("skip kafka test") + skipTest(t) kafkaAddress, _ := Params.Load("_KafkaBrokerList") c1 := funcutil.RandomString(8) @@ -340,7 +345,7 @@ func TestStream_KafkaTtMsgStream_1(t *testing.T) { } func TestStream_KafkaTtMsgStream_2(t *testing.T) { - t.Skip("skip kafka test") + skipTest(t) kafkaAddress, _ := Params.Load("_KafkaBrokerList") c1 := funcutil.RandomString(8) @@ -398,7 +403,7 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) { } func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) { - t.Skip("skip kafka test") + skipTest(t) kafkaAddress, _ := Params.Load("_KafkaBrokerList") c1 := funcutil.RandomString(8) @@ -435,6 +440,9 @@ func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) { } }() + // make producer start to produce messages after invoking Chan + time.Sleep(5 * time.Second) + inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels) msgPacks1 := createRandMsgPacks(2, 1, 1) assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go index 347bf66883..caf0a92867 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -43,6 +43,24 @@ func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) { once.Do(func() { config := kc.newProducerConfig() Producer, err = kafka.NewProducer(config) + + go func() { + for e := range Producer.Events() { + switch ev := e.(type) { + case kafka.Error: + // Generic client instance-level errors, such as broker connection failures, + // authentication issues, etc. + // After a fatal error has been raised, any subsequent Produce*() calls will fail with + // the original error code. + log.Error("kafka error", zap.Any("error msg", ev.Error())) + if ev.IsFatal() { + panic(ev) + } + default: + log.Debug("kafka producer event", zap.Any("event", ev)) + } + } + }() }) if err != nil { @@ -98,8 +116,8 @@ func (kc *kafkaClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrap func (kc *kafkaClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) { config := kc.newConsumerConfig(options.SubscriptionName, options.SubscriptionInitialPosition) - consumer := newKafkaConsumer(config, options.Topic, options.SubscriptionName) - return consumer, nil + consumer, err := newKafkaConsumer(config, options.Topic, options.SubscriptionName) + return consumer, err } func (kc *kafkaClient) EarliestMessageID() mqwrapper.MessageID { diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index 2f75c19632..6a130c6b73 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -187,154 +187,6 @@ func TestKafkaClient_ConsumeWithAck(t *testing.T) { assert.Equal(t, len(arr), total3) } -func ConsumeFromEarliestToRandomPosition(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, c chan mqwrapper.MessageID, total *int) { - consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - BufSize: 1024, - SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, - }) - assert.Nil(t, err) - assert.NotNil(t, consumer) - defer consumer.Close() - - // get random number between 1 ~ 5 - rand.Seed(time.Now().UnixNano()) - cnt := 1 + rand.Int()%5 - - log.Info("Consume1 channel start") - var msg mqwrapper.Message - for i := 0; i < cnt; i++ { - select { - case <-ctx.Done(): - log.Info("Consume1 channel closed") - return - case msg = <-consumer.Chan(): - if msg == nil { - continue - } - - v := BytesToInt(msg.Payload()) - log.Info("Consume1 RECV", zap.Any("v", v)) - (*total)++ - } - } - - c <- &kafkaID{messageID: msg.ID().(*kafkaID).messageID} - - log.Info("Consume1 randomly RECV", zap.Any("number", cnt)) - log.Info("Consume1 done") -} - -// Consume2 will consume messages from specified MessageID -func consumeFromSpecifiedPositionToEnd(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, msgID mqwrapper.MessageID, total *int) { - consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - BufSize: 1024, - SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, - }) - assert.Nil(t, err) - assert.NotNil(t, consumer) - defer consumer.Close() - - err = consumer.Seek(msgID, false) - assert.Nil(t, err) - - log.Info("Consume2 start") - for { - select { - case <-ctx.Done(): - log.Info("Consume2 channel closed") - return - case msg, ok := <-consumer.Chan(): - if msg == nil || !ok { - return - } - - v := BytesToInt(msg.Payload()) - log.Info("Consume2 RECV", zap.Any("v", v)) - (*total)++ - } - } - -} - -func ConsumeFromEarliestToEndPosition(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, total *int) { - consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - BufSize: 1024, - SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, - }) - assert.Nil(t, err) - assert.NotNil(t, consumer) - defer consumer.Close() - - log.Info("Consume3 start") - for { - select { - case <-ctx.Done(): - log.Info("Consume3 channel closed") - return - case msg, ok := <-consumer.Chan(): - if msg == nil || !ok { - return - } - v := BytesToInt(msg.Payload()) - log.Info("Consume3 RECV", zap.Any("v", v)) - (*total)++ - } - } -} - -func TestKafkaClient_ConsumeNoAck(t *testing.T) { - kc := createKafkaClient(t) - defer kc.Close() - assert.NotNil(t, kc) - - rand.Seed(time.Now().UnixNano()) - topic := fmt.Sprintf("test-topic-%d", rand.Int()) - subName := fmt.Sprintf("test-subname-%d", rand.Int()) - - var total1 int - var total2 int - var total3 int - - arr := []int{111, 222, 333, 444, 555, 666, 777} - ctx, cancel := context.WithCancel(context.Background()) - - producer := createProducer(t, kc, topic) - defer producer.Close() - produceData(ctx, t, producer, arr) - time.Sleep(100 * time.Millisecond) - - ctx1, cancel1 := context.WithTimeout(ctx, 5*time.Second) - defer cancel1() - - c := make(chan mqwrapper.MessageID, 1) - ConsumeFromEarliestToRandomPosition(ctx1, t, kc, topic, subName, c, &total1) - - // record the last received message id - lastMsgID := <-c - log.Info("msg", zap.Any("lastMsgID", lastMsgID)) - - ctx2, cancel2 := context.WithTimeout(ctx, 5*time.Second) - defer cancel2() - consumeFromSpecifiedPositionToEnd(ctx2, t, kc, topic, subName, lastMsgID, &total2) - - ctx3, cancel3 := context.WithTimeout(ctx, 5*time.Second) - defer cancel3() - ConsumeFromEarliestToEndPosition(ctx3, t, kc, topic, subName, &total3) - - cancel() - - //TODO enable, it seems that ack is unavailable - //assert.Equal(t, len(arr)*2, total1+total2) - - assert.Equal(t, len(arr), total3) -} - func TestKafkaClient_SeekPosition(t *testing.T) { kc := createKafkaClient(t) defer kc.Close() @@ -365,6 +217,39 @@ func TestKafkaClient_SeekPosition(t *testing.T) { } } +func TestKafkaClient_ConsumeFromLatest(t *testing.T) { + kc := createKafkaClient(t) + defer kc.Close() + + rand.Seed(time.Now().UnixNano()) + ctx := context.Background() + topic := fmt.Sprintf("test-topic-%d", rand.Int()) + subName := fmt.Sprintf("test-subname-%d", rand.Int()) + + producer := createProducer(t, kc, topic) + defer producer.Close() + + data := []int{1, 2} + produceData(ctx, t, producer, data) + + consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionLatest) + defer consumer.Close() + + go func() { + time.Sleep(time.Second * 2) + data := []int{3} + produceData(ctx, t, producer, data) + }() + + select { + case msg := <-consumer.Chan(): + consumer.Ack(msg) + assert.Equal(t, 3, BytesToInt(msg.Payload())) + case <-time.After(5 * time.Second): + assert.FailNow(t, "should not wait") + } +} + func TestKafkaClient_EarliestMessageID(t *testing.T) { kafkaAddress, _ := Params.Load("_KafkaBrokerList") kc := NewKafkaClientInstance(kafkaAddress) @@ -374,6 +259,25 @@ func TestKafkaClient_EarliestMessageID(t *testing.T) { assert.NotNil(t, mid) } +func TestKafkaClient_MsgSerializAndDeserialize(t *testing.T) { + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kc := NewKafkaClientInstance(kafkaAddress) + defer kc.Close() + + mid := kc.EarliestMessageID() + msgID, err := kc.BytesToMsgID(mid.Serialize()) + assert.NoError(t, err) + assert.True(t, msgID.AtEarliestPosition()) + + msgID, err = kc.StringToMsgID("1") + assert.NoError(t, err) + assert.NotNil(t, msgID) + + msgID, err = kc.StringToMsgID("1.0") + assert.Error(t, err) + assert.Nil(t, msgID) +} + func createKafkaClient(t *testing.T) *kafkaClient { kafkaAddress, _ := Params.Load("_KafkaBrokerList") kc := NewKafkaClientInstance(kafkaAddress) diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go index ef7a60d75e..6254ad30b0 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go @@ -1,6 +1,7 @@ package kafka import ( + "errors" "sync" "time" @@ -15,7 +16,7 @@ type Consumer struct { config *kafka.ConfigMap msgChannel chan mqwrapper.Message hasSeek bool - isStarted bool + hasConsume bool skipMsg bool topic string groupID string @@ -24,7 +25,7 @@ type Consumer struct { closeOnce sync.Once } -func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string) *Consumer { +func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string) (*Consumer, error) { closeCh := make(chan struct{}) msgChannel := make(chan mqwrapper.Message, 256) @@ -36,56 +37,20 @@ func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string) *Co closeCh: closeCh, } - kafkaConsumer.createKafkaConsumer() - return kafkaConsumer + err := kafkaConsumer.createKafkaConsumer() + return kafkaConsumer, err } func (kc *Consumer) createKafkaConsumer() error { var err error kc.c, err = kafka.NewConsumer(kc.config) if err != nil { - log.Fatal("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err)) + log.Error("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err)) return err } return nil } -func (kc *Consumer) startReceiveMsgTask() { - if kc.isStarted { - return - } - - if !kc.hasSeek { - tps := []kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx}} - if err := kc.c.Assign(tps); err != nil { - log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Error(err)) - panic(err) - } - } - - go func() { - for ev := range kc.c.Events() { - switch e := ev.(type) { - case *kafka.Message: - if kc.skipMsg { - kc.skipMsg = false - continue - } - - kc.msgChannel <- &kafkaMessage{msg: e} - case kafka.Error: - log.Error("read msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(e)) - } - } - - if kc.msgChannel != nil { - close(kc.msgChannel) - } - }() - - kc.isStarted = true -} - func (kc *Consumer) Subscription() string { return kc.groupID } @@ -96,14 +61,73 @@ func (kc *Consumer) Subscription() string { // https://github.com/confluentinc/confluent-kafka-go. func (kc *Consumer) Chan() <-chan mqwrapper.Message { kc.chanOnce.Do(func() { - kc.startReceiveMsgTask() + if !kc.hasSeek { + offsetStr, err := kc.config.Get("auto.offset.reset", "earliest") + if err != nil { + log.Error("get auto.offset.reset config fail in kafka consumer", zap.String("topic name", kc.topic), zap.Error(err)) + panic(err) + } + + offset, err := kafka.NewOffset(offsetStr) + if err != nil { + log.Error("Invalid kafka offset", zap.String("topic name", kc.topic), zap.Error(err)) + panic(err) + } + + // we assume that case is Chan starting before producing message with auto create topic config, + // consuming messages will fail that error is 'Subscribed topic not available' + // if invoke Subscribe method of kafka, so we use Assign instead of Subscribe. + tps := []kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}} + if err := kc.c.Assign(tps); err != nil { + log.Error("kafka consumer subscribe failed ", zap.String("topic name", kc.topic), zap.Error(err)) + panic(err) + } + + log.Debug("starting kafka consume", zap.String("topic name", kc.topic), zap.Any("offset", offset)) + } + + go func() { + // loop end if consumer is closed + for ev := range kc.c.Events() { + switch e := ev.(type) { + case *kafka.Message: + if kc.skipMsg { + kc.skipMsg = false + continue + } + + kc.msgChannel <- &kafkaMessage{msg: e} + case kafka.Error: + log.Error("consume msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(e)) + if ev.(kafka.Error).IsFatal() { + panic(e) + } + } + } + + if kc.msgChannel != nil { + close(kc.msgChannel) + } + }() + + kc.hasConsume = true }) + return kc.msgChannel } func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error { + if kc.hasSeek { + return errors.New("unsupported multiple seek with the same kafka consumer") + } + + if kc.hasConsume { + return errors.New("unsupported seek after consume message with the same kafka consumer") + } + + start := time.Now() offset := kafka.Offset(id.(*kafkaID).messageID) - log.Debug("kafka consumer seek ", zap.String("topic name", kc.topic), + log.Debug("kafka consumer seek start", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive)) err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}}) @@ -112,6 +136,12 @@ func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error { return err } + cost := time.Since(start).Milliseconds() + if cost > 100 { + log.Debug("kafka consumer assign take too long!", zap.String("topic name", kc.topic), + zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost)) + } + // If seek timeout is not 0 the call twice will return error isStarted RD_KAFKA_RESP_ERR__STATE. // if the timeout is 0 it will initiate the seek but return immediately without any error reporting kc.skipMsg = !inclusive @@ -123,7 +153,11 @@ func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error { } kc.hasSeek = true - kc.startReceiveMsgTask() + + cost = time.Since(start).Milliseconds() + log.Debug("kafka consumer seek finished", zap.String("topic name", kc.topic), + zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost)) + return nil } diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go index 6f5682ea29..47213f45c9 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go @@ -17,7 +17,8 @@ func TestKafkaConsumer_Subscription(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - kc := newKafkaConsumer(config, topic, groupID) + kc, err := newKafkaConsumer(config, topic, groupID) + assert.NoError(t, err) defer kc.Close() assert.Equal(t, kc.Subscription(), groupID) } @@ -28,14 +29,15 @@ func TestKafkaConsumer_Chan(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - consumer := newKafkaConsumer(config, topic, groupID) + consumer, err := newKafkaConsumer(config, topic, groupID) + assert.NoError(t, err) defer consumer.Close() data := []int{111, 222, 333} testKafkaConsumerProduceData(t, topic, data) msgID := &kafkaID{messageID: 1} - err := consumer.Seek(msgID, false) + err = consumer.Seek(msgID, false) assert.Nil(t, err) msg := <-consumer.Chan() @@ -51,12 +53,37 @@ func TestKafkaConsumer_GetSeek(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - consumer := newKafkaConsumer(config, topic, groupID) + consumer, err := newKafkaConsumer(config, topic, groupID) + assert.NoError(t, err) defer consumer.Close() msgID := &kafkaID{messageID: 0} - err := consumer.Seek(msgID, false) + err = consumer.Seek(msgID, false) assert.Nil(t, err) + + assert.Panics(t, func() { + consumer.Seek(msgID, false) + }) +} + +func TestKafkaConsumer_SeekAfterChan(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + groupID := fmt.Sprintf("test-groupid-%d", rand.Int()) + topic := fmt.Sprintf("test-topicName-%d", rand.Int()) + + config := createConfig(groupID) + consumer, err := newKafkaConsumer(config, topic, groupID) + assert.NoError(t, err) + defer consumer.Close() + + data := []int{111} + testKafkaConsumerProduceData(t, topic, data) + msg := <-consumer.Chan() + assert.Equal(t, 111, BytesToInt(msg.Payload())) + + assert.Panics(t, func() { + consumer.Seek(nil, false) + }) } func TestKafkaConsumer_GetLatestMsgID(t *testing.T) { @@ -65,7 +92,8 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) { topic := fmt.Sprintf("test-topicName-%d", rand.Int()) config := createConfig(groupID) - consumer := newKafkaConsumer(config, topic, groupID) + consumer, err := newKafkaConsumer(config, topic, groupID) + assert.NoError(t, err) defer consumer.Close() latestMsgID, err := consumer.GetLatestMsgID() @@ -88,6 +116,8 @@ func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) { defer producer.Close() produceData(ctx, t, producer, data) + + time.Sleep(5 * time.Second) } func createConfig(groupID string) *kafka.ConfigMap { diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go index ca39855684..f40e13107f 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go @@ -2,11 +2,10 @@ package kafka import ( "context" + "errors" "sync" "time" - "github.com/pkg/errors" - "github.com/milvus-io/milvus/internal/log" "go.uber.org/zap" @@ -26,38 +25,27 @@ func (kp *kafkaProducer) Topic() string { } func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) { - var err error - maxAttempt := 3 + err := kp.p.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx}, + Value: message.Payload, + }, kp.deliveryChan) - // In order to avoid https://github.com/confluentinc/confluent-kafka-go/issues/769, - // just retry produce again when getting a nil from delivery chan. - for i := 0; i < maxAttempt; i++ { - err = kp.p.Produce(&kafka.Message{ - TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx}, - Value: message.Payload, - }, kp.deliveryChan) - - if err != nil { - break - } - - e := <-kp.deliveryChan - if e == nil { - errMsg := "produce message arise exception, delivery Chan return a nil value" - err = errors.New(errMsg) - log.Warn(errMsg, zap.String("topic", kp.topic), zap.ByteString("msg", message.Payload), zap.Int("retries", i)) - continue - } - - m := e.(*kafka.Message) - if m.TopicPartition.Error != nil { - return nil, m.TopicPartition.Error - } - - return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil + if err != nil { + return nil, err } - return nil, err + e, ok := <-kp.deliveryChan + if !ok { + log.Error("kafka produce message fail because of delivery chan is closed", zap.String("topic", kp.topic)) + return nil, errors.New("delivery chan of kafka producer is closed") + } + + m := e.(*kafka.Message) + if m.TopicPartition.Error != nil { + return nil, m.TopicPartition.Error + } + + return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil } func (kp *kafkaProducer) Close() {