mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
Related to #39095 https://go.dev/doc/modules/version-numbers Update pkg version according to golang dep version convention --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
89 lines
2.1 KiB
Go
89 lines
2.1 KiB
Go
package kafka
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/helper"
|
|
)
|
|
|
|
var _ walimpls.ScannerImpls = (*scannerImpl)(nil)
|
|
|
|
// newScanner creates a new scanner.
|
|
func newScanner(scannerName string, exclude *kafkaID, consumer *kafka.Consumer) *scannerImpl {
|
|
s := &scannerImpl{
|
|
ScannerHelper: helper.NewScannerHelper(scannerName),
|
|
consumer: consumer,
|
|
msgChannel: make(chan message.ImmutableMessage, 1),
|
|
exclude: exclude,
|
|
}
|
|
go s.executeConsume()
|
|
return s
|
|
}
|
|
|
|
// scannerImpl is the implementation of ScannerImpls for kafka.
|
|
type scannerImpl struct {
|
|
*helper.ScannerHelper
|
|
consumer *kafka.Consumer
|
|
msgChannel chan message.ImmutableMessage
|
|
exclude *kafkaID
|
|
}
|
|
|
|
// Chan returns the channel of message.
|
|
func (s *scannerImpl) Chan() <-chan message.ImmutableMessage {
|
|
return s.msgChannel
|
|
}
|
|
|
|
// Close the scanner, release the underlying resources.
|
|
// Return the error same with `Error`
|
|
func (s *scannerImpl) Close() error {
|
|
s.consumer.Unassign()
|
|
err := s.ScannerHelper.Close()
|
|
s.consumer.Close()
|
|
return err
|
|
}
|
|
|
|
func (s *scannerImpl) executeConsume() {
|
|
defer close(s.msgChannel)
|
|
for {
|
|
msg, err := s.consumer.ReadMessage(200 * time.Millisecond)
|
|
if err != nil {
|
|
if s.Context().Err() != nil {
|
|
// context canceled, means the the scanner is closed.
|
|
s.Finish(nil)
|
|
return
|
|
}
|
|
if c, ok := err.(kafka.Error); ok && c.Code() == kafka.ErrTimedOut {
|
|
continue
|
|
}
|
|
s.Finish(err)
|
|
return
|
|
}
|
|
messageID := kafkaID(msg.TopicPartition.Offset)
|
|
if s.exclude != nil && messageID.EQ(*s.exclude) {
|
|
// Skip the message that is exclude for StartAfter semantics.
|
|
continue
|
|
}
|
|
|
|
properties := make(map[string]string, len(msg.Headers))
|
|
for _, header := range msg.Headers {
|
|
properties[header.Key] = string(header.Value)
|
|
}
|
|
|
|
newImmutableMessage := message.NewImmutableMesasge(
|
|
messageID,
|
|
msg.Value,
|
|
properties,
|
|
)
|
|
select {
|
|
case <-s.Context().Done():
|
|
s.Finish(nil)
|
|
return
|
|
case s.msgChannel <- newImmutableMessage:
|
|
}
|
|
}
|
|
}
|