From ed3c29eae7b7327ef773bdd9de5c63dd27f0adc0 Mon Sep 17 00:00:00 2001 From: jaime Date: Tue, 12 Apr 2022 19:47:33 +0800 Subject: [PATCH] Support Kafka (#16149) Signed-off-by: yun.zhang --- cmd/roles/roles.go | 28 +- configs/milvus.yaml | 13 +- deployments/docker/dev/docker-compose.yml | 25 + go.mod | 1 + go.sum | 2 + internal/datacoord/server.go | 2 +- internal/kv/mem/mem_kv.go | 11 + internal/mq/msgstream/mq_factory.go | 32 +- .../mq/msgstream/mq_kafka_msgstream_test.go | 489 ++++++++++++++++++ internal/mq/msgstream/mq_msgstream.go | 37 +- internal/mq/msgstream/mq_msgstream_test.go | 3 +- internal/mq/msgstream/mqwrapper/consumer.go | 3 +- .../msgstream/mqwrapper/kafka/kafka_client.go | 108 ++++ .../mqwrapper/kafka/kafka_client_test.go | 418 +++++++++++++++ .../mqwrapper/kafka/kafka_consumer.go | 162 ++++++ .../mqwrapper/kafka/kafka_consumer_test.go | 102 ++++ .../mq/msgstream/mqwrapper/kafka/kafka_id.go | 34 ++ .../mqwrapper/kafka/kafka_id_test.go | 56 ++ .../mqwrapper/kafka/kafka_message.go | 27 + .../mqwrapper/kafka/kafka_message_test.go | 18 + .../mqwrapper/kafka/kafka_producer.go | 59 +++ .../mqwrapper/kafka/kafka_producer_test.go | 69 +++ internal/querycoord/query_coord.go | 3 +- internal/rootcoord/root_coord.go | 3 - internal/rootcoord/root_coord_test.go | 3 +- internal/util/dependency/factory.go | 57 +- internal/util/paramtable/base_table.go | 24 +- internal/util/paramtable/base_table_test.go | 11 + internal/util/paramtable/component_param.go | 10 + internal/util/paramtable/service_param.go | 21 + scripts/run_go_codecov.sh | 10 +- scripts/run_go_unittest.sh | 2 +- 32 files changed, 1795 insertions(+), 48 deletions(-) create mode 100644 internal/mq/msgstream/mq_kafka_msgstream_test.go create mode 100644 internal/mq/msgstream/mqwrapper/kafka/kafka_client.go create mode 100644 internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go create mode 100644 internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go create mode 100644 internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go create mode 100644 internal/mq/msgstream/mqwrapper/kafka/kafka_id.go create mode 100644 internal/mq/msgstream/mqwrapper/kafka/kafka_id_test.go create mode 100644 internal/mq/msgstream/mqwrapper/kafka/kafka_message.go create mode 100644 internal/mq/msgstream/mqwrapper/kafka/kafka_message_test.go create mode 100644 internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go create mode 100644 internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 7e0f0dc100..296ba386bf 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -101,7 +101,7 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone } else { rootcoord.Params.SetLogConfig(typeutil.RootCoordRole) } - factory := dependency.NewDefaultFactory(localMsg) + factory := dependency.NewFactory(localMsg) var err error rc, err = components.NewRootCoord(ctx, factory) if err != nil { @@ -133,7 +133,7 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string proxy.Params.SetLogConfig(typeutil.ProxyRole) } - factory := dependency.NewDefaultFactory(localMsg) + factory := dependency.NewFactory(localMsg) var err error pn, err = components.NewProxy(ctx, factory) if err != nil { @@ -164,7 +164,7 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon querycoord.Params.SetLogConfig(typeutil.QueryCoordRole) } - factory := dependency.NewDefaultFactory(localMsg) + factory := dependency.NewFactory(localMsg) var err error qs, 
err = components.NewQueryCoord(ctx, factory) if err != nil { @@ -196,7 +196,7 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st querynode.Params.SetLogConfig(typeutil.QueryNodeRole) } - factory := dependency.NewDefaultFactory(localMsg) + factory := dependency.NewFactory(localMsg) var err error qn, err = components.NewQueryNode(ctx, factory) if err != nil { @@ -227,7 +227,7 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone datacoord.Params.SetLogConfig(typeutil.DataCoordRole) } - factory := dependency.NewDefaultFactory(localMsg) + factory := dependency.NewFactory(localMsg) dctx := logutil.WithModule(ctx, "DataCoord") var err error @@ -261,7 +261,7 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str datanode.Params.SetLogConfig(typeutil.DataNodeRole) } - factory := dependency.NewDefaultFactory(localMsg) + factory := dependency.NewFactory(localMsg) var err error dn, err = components.NewDataNode(ctx, factory) if err != nil { @@ -292,7 +292,7 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compone indexcoord.Params.SetLogConfig(typeutil.IndexCoordRole) } - factory := dependency.NewDefaultFactory(localMsg) + factory := dependency.NewFactory(localMsg) var err error is, err = components.NewIndexCoord(ctx, factory) @@ -325,7 +325,7 @@ func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias st indexnode.Params.SetLogConfig(typeutil.IndexNodeRole) } - factory := dependency.NewDefaultFactory(localMsg) + factory := dependency.NewFactory(localMsg) var err error in, err = components.NewIndexNode(ctx, factory) @@ -355,12 +355,14 @@ func (mr *MilvusRoles) Run(local bool, alias string) { } Params.Init() - path, _ := Params.Load("_RocksmqPath") - err := rocksmqimpl.InitRocksMQ(path) - if err != nil { - panic(err) + if Params.RocksmqEnable() { + path, _ := Params.Load("_RocksmqPath") + err := rocksmqimpl.InitRocksMQ(path) + if err != nil { + panic(err) + } + defer stopRocksmq() } - defer stopRocksmq() if Params.EtcdCfg.UseEmbedEtcd { // Start etcd server. diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 5d5e33e06f..77ec6452b0 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -45,12 +45,21 @@ minio: bucketName: "a-bucket" # Bucket name in MinIO/S3 rootPath: files # The root path where the message is stored in MinIO/S3 +# Milvus supports three MQs: rocksmq (based on RocksDB), Pulsar and Kafka. Keep the one you use configured in this file. +# If multiple MQs are configured in this file, they are enabled with the following priority: +# 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka +# 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported) + # Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services. pulsar: address: localhost # Address of pulsar port: 6650 # Port of pulsar maxMessageSize: 5242880 # 5 * 1024 * 1024 Bytes, Maximum size of each message in pulsar.
+# If you want to enable kafka, needs to comment the pulsar configs +#kafka: +# brokerList: localhost1:9092,localhost2:9092,localhost3:9092 + rocksmq: path: /var/lib/milvus/rdb_data # The path where the message is stored in rocksmq rocksmqPageSize: 2147483648 # 2 GB, 2 * 1024 * 1024 * 1024 bytes, The size of each page of messages in rocksmq @@ -276,5 +285,5 @@ common: simdType: auto indexSliceSize: 4 # MB - storage: - vector: minio + storageType: minio + diff --git a/deployments/docker/dev/docker-compose.yml b/deployments/docker/dev/docker-compose.yml index cff4e0b9e4..cbf7e4d3b1 100644 --- a/deployments/docker/dev/docker-compose.yml +++ b/deployments/docker/dev/docker-compose.yml @@ -54,6 +54,31 @@ services: - "6831:6831/udp" - "16686:16686" + zookeeper: + image: wurstmeister/zookeeper:latest + ports: + - "2181:2181" + + kafka: + image: 'bitnami/kafka:3.1.0' + ports: + - '9092:9092' + environment: + - KAFKA_BROKER_ID=0 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + # set kafka server config + - KAFKA_CFG_MAX_PARTITION_FETCH_BYTES=5242880 + - KAFKA_CFG_MAX_REQUEST_SIZE=5242880 + - KAFKA_CFG_MESSAGE_MAX_BYTES=5242880 + - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=5242880 + - KAFKA_CFG_FETCH_MESSAGE_MAX_BYTES=5242880 + - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true + depends_on: + - zookeeper + networks: default: name: milvus_dev diff --git a/go.mod b/go.mod index 46f03f9b22..38ae793c2a 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7 github.com/apache/thrift v0.15.0 github.com/bits-and-blooms/bloom/v3 v3.0.1 + github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect github.com/containerd/cgroups v1.0.2 github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect diff --git a/go.sum b/go.sum index f1a1fbc805..ea9ce85fb1 100644 --- a/go.sum +++ b/go.sum @@ -121,6 +121,8 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E= +github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 5489c94dd8..627858f579 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -249,6 +249,7 @@ func (s *Server) initSession() error { // Init change server state to Initializing func (s *Server) Init() error { atomic.StoreInt64(&s.isServing, ServerStateInitializing) + s.factory.Init(&Params) return s.initSession() } @@ -261,7 +262,6 @@ func (s *Server) Init() error { // 4. 
set server state to Healthy func (s *Server) Start() error { var err error - s.factory.Init(&Params) if err = s.initRootCoordClient(); err != nil { return err } diff --git a/internal/kv/mem/mem_kv.go b/internal/kv/mem/mem_kv.go index 050f458549..5cb137253f 100644 --- a/internal/kv/mem/mem_kv.go +++ b/internal/kv/mem/mem_kv.go @@ -97,6 +97,17 @@ func (kv *MemoryKV) LoadBytes(key string) ([]byte, error) { return item.(memoryKVItem).value.ByteSlice(), nil } +// Get return value if key exists, or return empty string +func (kv *MemoryKV) Get(key string) string { + kv.RLock() + defer kv.RUnlock() + item := kv.tree.Get(memoryKVItem{key: key}) + if item == nil { + return "" + } + return item.(memoryKVItem).value.String() +} + // LoadWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned. func (kv *MemoryKV) LoadWithDefault(key, defaultValue string) string { kv.RLock() diff --git a/internal/mq/msgstream/mq_factory.go b/internal/mq/msgstream/mq_factory.go index 2e580ea8b5..cda2101360 100644 --- a/internal/mq/msgstream/mq_factory.go +++ b/internal/mq/msgstream/mq_factory.go @@ -27,7 +27,7 @@ import ( rmqimplserver "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" "github.com/apache/pulsar-client-go/pulsar" - + kafkawrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka" puslarmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/pulsar" rmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/rmq" ) @@ -114,9 +114,39 @@ func NewRmsFactory(path string) *RmsFactory { ReceiveBufSize: 1024, RmqBufSize: 1024, } + err := rmqimplserver.InitRocksMQ(path) if err != nil { log.Error("init rmq error", zap.Error(err)) } return f } + +type KmsFactory struct { + dispatcherFactory ProtoUDFactory + KafkaAddress string + ReceiveBufSize int64 +} + +func (f *KmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) { + kafkaClient := kafkawrapper.NewKafkaClientInstance(f.KafkaAddress) + return NewMqMsgStream(ctx, f.ReceiveBufSize, -1, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher()) +} + +func (f *KmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) { + kafkaClient := kafkawrapper.NewKafkaClientInstance(f.KafkaAddress) + return NewMqTtMsgStream(ctx, f.ReceiveBufSize, -1, kafkaClient, f.dispatcherFactory.NewUnmarshalDispatcher()) +} + +func (f *KmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) { + return f.NewMsgStream(ctx) +} + +func NewKmsFactory(config *paramtable.KafkaConfig) Factory { + f := &KmsFactory{ + dispatcherFactory: ProtoUDFactory{}, + ReceiveBufSize: 1024, + KafkaAddress: config.Address, + } + return f +} diff --git a/internal/mq/msgstream/mq_kafka_msgstream_test.go b/internal/mq/msgstream/mq_kafka_msgstream_test.go new file mode 100644 index 0000000000..7a1132a803 --- /dev/null +++ b/internal/mq/msgstream/mq_kafka_msgstream_test.go @@ -0,0 +1,489 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package msgstream + +import ( + "context" + "fmt" + "log" + "sync" + "testing" + + "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" + + kafkawrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/kafka" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/stretchr/testify/assert" +) + +// Note: kafka does not support get all data when consuming from the earliest position again. +//func TestStream_KafkaTtMsgStream_NoSeek(t *testing.T) { +// kafkaAddress, _ := Params.Load("_KafkaBrokerList") +// c1 := funcutil.RandomString(8) +// producerChannels := []string{c1} +// consumerChannels := []string{c1} +// consumerSubName := funcutil.RandomString(8) +// +// msgPack0 := MsgPack{} +// msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) +// +// msgPack1 := MsgPack{} +// msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) +// msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19)) +// +// msgPack2 := MsgPack{} +// msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) +// +// msgPack3 := MsgPack{} +// msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14)) +// msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9)) +// +// msgPack4 := MsgPack{} +// msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11)) +// +// msgPack5 := MsgPack{} +// msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15)) +// +// ctx := context.Background() +// inputStream := getKafkaInputStream(ctx, kafkaAddress, producerChannels) +// outputStream := getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) +// +// err := inputStream.Broadcast(&msgPack0) +// assert.Nil(t, err) +// err = inputStream.Produce(&msgPack1) +// assert.Nil(t, err) +// err = inputStream.Broadcast(&msgPack2) +// assert.Nil(t, err) +// err = inputStream.Produce(&msgPack3) +// assert.Nil(t, err) +// err = inputStream.Broadcast(&msgPack4) +// assert.Nil(t, err) +// err = inputStream.Broadcast(&msgPack5) +// assert.Nil(t, err) +// +// o1 := consumer(ctx, outputStream) +// o2 := consumer(ctx, outputStream) +// o3 := consumer(ctx, outputStream) +// +// t.Log(o1.BeginTs) +// t.Log(o2.BeginTs) +// t.Log(o3.BeginTs) +// outputStream.Close() +// +// outputStream2 := getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) +// p1 := consumer(ctx, outputStream2) +// p2 := consumer(ctx, outputStream2) +// p3 := consumer(ctx, outputStream2) +// t.Log(p1.BeginTs) +// t.Log(p2.BeginTs) +// t.Log(p3.BeginTs) +// outputStream2.Close() +// +// assert.Equal(t, o1.BeginTs, p1.BeginTs) +// assert.Equal(t, o2.BeginTs, p2.BeginTs) +// assert.Equal(t, o3.BeginTs, p3.BeginTs) +//} + +func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) { + t.Skip("skip kafka test") + + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + c := funcutil.RandomString(8) + producerChannels := []string{c} + consumerChannels := []string{c} + consumerSubName := funcutil.RandomString(8) + + 
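// build 10 insert messages and produce them to the test topic +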
msgPack := &MsgPack{} + ctx := context.Background() + inputStream := getKafkaInputStream(ctx, kafkaAddress, producerChannels) + defer inputStream.Close() + + for i := 0; i < 10; i++ { + insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) + msgPack.Msgs = append(msgPack.Msgs, insertMsg) + } + + // produce test data + err := inputStream.Produce(msgPack) + assert.Nil(t, err) + + // pick a seekPosition + var seekPosition *internalpb.MsgPosition + outputStream := getKafkaOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) + for i := 0; i < 10; i++ { + result := consumer(ctx, outputStream) + assert.Equal(t, result.Msgs[0].ID(), int64(i)) + if i == 5 { + seekPosition = result.EndPositions[0] + break + } + } + outputStream.Close() + + // create a consumer can consume data from seek position to last msg + outputStream2 := getKafkaOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) + lastMsgID, err := outputStream2.GetLatestMsgID(c) + defer outputStream2.Close() + assert.Nil(t, err) + + err = outputStream2.Seek([]*internalpb.MsgPosition{seekPosition}) + assert.Nil(t, err) + outputStream2.Start() + + cnt := 0 + var value int64 = 6 + hasMore := true + for hasMore { + select { + case <-ctx.Done(): + hasMore = false + case msgPack, ok := <-outputStream2.Chan(): + if !ok { + assert.Fail(t, "Should not reach here") + } + + assert.Equal(t, 1, len(msgPack.Msgs)) + for _, tsMsg := range msgPack.Msgs { + assert.Equal(t, value, tsMsg.ID()) + value++ + cnt++ + + ret, err := lastMsgID.LessOrEqualThan(tsMsg.Position().MsgID) + assert.Nil(t, err) + if ret { + hasMore = false + break + } + } + } + } + + assert.Equal(t, 4, cnt) +} + +func TestStream_KafkaTtMsgStream_Seek(t *testing.T) { + t.Skip("skip kafka test") + + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + c1 := funcutil.RandomString(8) + producerChannels := []string{c1} + consumerChannels := []string{c1} + consumerSubName := funcutil.RandomString(8) + + msgPack0 := MsgPack{} + msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) + + msgPack1 := MsgPack{} + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19)) + + msgPack2 := MsgPack{} + msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) + + msgPack3 := MsgPack{} + msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14)) + msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9)) + + msgPack4 := MsgPack{} + msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11)) + + msgPack5 := MsgPack{} + msgPack5.Msgs = append(msgPack5.Msgs, getTsMsg(commonpb.MsgType_Insert, 12)) + msgPack5.Msgs = append(msgPack5.Msgs, getTsMsg(commonpb.MsgType_Insert, 13)) + + msgPack6 := MsgPack{} + msgPack6.Msgs = append(msgPack6.Msgs, getTimeTickMsg(15)) + + msgPack7 := MsgPack{} + msgPack7.Msgs = append(msgPack7.Msgs, getTimeTickMsg(20)) + + ctx := context.Background() + inputStream := getKafkaInputStream(ctx, kafkaAddress, producerChannels) + outputStream := getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) + + err := inputStream.Broadcast(&msgPack0) + assert.Nil(t, err) + err = inputStream.Produce(&msgPack1) + assert.Nil(t, err) + err = inputStream.Broadcast(&msgPack2) + assert.Nil(t, err) + err = inputStream.Produce(&msgPack3) + assert.Nil(t, err) + err = inputStream.Broadcast(&msgPack4) + assert.Nil(t, err) + err = 
inputStream.Produce(&msgPack5) + assert.Nil(t, err) + err = inputStream.Broadcast(&msgPack6) + assert.Nil(t, err) + err = inputStream.Broadcast(&msgPack7) + assert.Nil(t, err) + + receivedMsg := consumer(ctx, outputStream) + assert.Equal(t, len(receivedMsg.Msgs), 2) + assert.Equal(t, receivedMsg.BeginTs, uint64(0)) + assert.Equal(t, receivedMsg.EndTs, uint64(5)) + + assert.Equal(t, receivedMsg.StartPositions[0].Timestamp, uint64(0)) + assert.Equal(t, receivedMsg.EndPositions[0].Timestamp, uint64(5)) + + receivedMsg2 := consumer(ctx, outputStream) + assert.Equal(t, len(receivedMsg2.Msgs), 1) + assert.Equal(t, receivedMsg2.BeginTs, uint64(5)) + assert.Equal(t, receivedMsg2.EndTs, uint64(11)) + assert.Equal(t, receivedMsg2.StartPositions[0].Timestamp, uint64(5)) + assert.Equal(t, receivedMsg2.EndPositions[0].Timestamp, uint64(11)) + + receivedMsg3 := consumer(ctx, outputStream) + assert.Equal(t, len(receivedMsg3.Msgs), 3) + assert.Equal(t, receivedMsg3.BeginTs, uint64(11)) + assert.Equal(t, receivedMsg3.EndTs, uint64(15)) + assert.Equal(t, receivedMsg3.StartPositions[0].Timestamp, uint64(11)) + assert.Equal(t, receivedMsg3.EndPositions[0].Timestamp, uint64(15)) + + receivedMsg4 := consumer(ctx, outputStream) + assert.Equal(t, len(receivedMsg4.Msgs), 1) + assert.Equal(t, receivedMsg4.BeginTs, uint64(15)) + assert.Equal(t, receivedMsg4.EndTs, uint64(20)) + assert.Equal(t, receivedMsg4.StartPositions[0].Timestamp, uint64(15)) + assert.Equal(t, receivedMsg4.EndPositions[0].Timestamp, uint64(20)) + + outputStream.Close() + + outputStream = getKafkaTtOutputStreamAndSeek(ctx, kafkaAddress, receivedMsg3.StartPositions) + seekMsg := consumer(ctx, outputStream) + assert.Equal(t, len(seekMsg.Msgs), 3) + result := []uint64{14, 12, 13} + for i, msg := range seekMsg.Msgs { + assert.Equal(t, msg.BeginTs(), result[i]) + } + + seekMsg2 := consumer(ctx, outputStream) + assert.Equal(t, len(seekMsg2.Msgs), 1) + for _, msg := range seekMsg2.Msgs { + assert.Equal(t, msg.BeginTs(), uint64(19)) + } + + outputStream2 := getKafkaTtOutputStreamAndSeek(ctx, kafkaAddress, receivedMsg3.EndPositions) + seekMsg = consumer(ctx, outputStream2) + assert.Equal(t, len(seekMsg.Msgs), 1) + for _, msg := range seekMsg.Msgs { + assert.Equal(t, msg.BeginTs(), uint64(19)) + } + + inputStream.Close() + outputStream2.Close() +} + +func TestStream_KafkaTtMsgStream_1(t *testing.T) { + t.Skip("skip kafka test") + + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + c1 := funcutil.RandomString(8) + c2 := funcutil.RandomString(8) + p1Channels := []string{c1} + p2Channels := []string{c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) + + ctx := context.Background() + inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels) + msgPacks1 := createRandMsgPacks(3, 10, 10) + assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) + + inputStream2 := getKafkaInputStream(ctx, kafkaAddress, p2Channels) + msgPacks2 := createRandMsgPacks(5, 10, 10) + assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2)) + + // consume msg + outputStream := getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) + checkNMsgPack := func(t *testing.T, outputStream MsgStream, num int) int { + rcvMsg := 0 + for i := 0; i < num; i++ { + msgPack := consumer(ctx, outputStream) + rcvMsg += len(msgPack.Msgs) + if len(msgPack.Msgs) > 0 { + for _, msg := range msgPack.Msgs { + log.Println("msg type: ", msg.Type(), ", msg value: ", msg) + assert.Greater(t, msg.BeginTs(), msgPack.BeginTs) + assert.LessOrEqual(t, 
msg.BeginTs(), msgPack.EndTs) + } + } + } + return rcvMsg + } + msgCount := checkNMsgPack(t, outputStream, len(msgPacks1)/2) + cnt1 := (len(msgPacks1)/2 - 1) * len(msgPacks1[0].Msgs) + cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs) + assert.Equal(t, (cnt1 + cnt2), msgCount) + + inputStream1.Close() + inputStream2.Close() + outputStream.Close() +} + +func TestStream_KafkaTtMsgStream_2(t *testing.T) { + t.Skip("skip kafka test") + + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + c1 := funcutil.RandomString(8) + c2 := funcutil.RandomString(8) + p1Channels := []string{c1} + p2Channels := []string{c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) + + ctx := context.Background() + inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels) + msgPacks1 := createRandMsgPacks(3, 10, 10) + assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) + + inputStream2 := getKafkaInputStream(ctx, kafkaAddress, p2Channels) + msgPacks2 := createRandMsgPacks(5, 10, 10) + assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2)) + + // consume msg + log.Println("=============receive msg===================") + rcvMsgPacks := make([]*MsgPack, 0) + + resumeMsgPack := func(t *testing.T) int { + var outputStream MsgStream + msgCount := len(rcvMsgPacks) + if msgCount == 0 { + outputStream = getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) + } else { + outputStream = getKafkaTtOutputStreamAndSeek(ctx, kafkaAddress, rcvMsgPacks[msgCount-1].EndPositions) + } + msgPack := consumer(ctx, outputStream) + rcvMsgPacks = append(rcvMsgPacks, msgPack) + if len(msgPack.Msgs) > 0 { + for _, msg := range msgPack.Msgs { + log.Println("msg type: ", msg.Type(), ", msg value: ", msg) + assert.Greater(t, msg.BeginTs(), msgPack.BeginTs) + assert.LessOrEqual(t, msg.BeginTs(), msgPack.EndTs) + } + log.Println("================") + } + outputStream.Close() + return len(rcvMsgPacks[msgCount].Msgs) + } + + msgCount := 0 + for i := 0; i < len(msgPacks1)/2; i++ { + msgCount += resumeMsgPack(t) + } + cnt1 := (len(msgPacks1)/2 - 1) * len(msgPacks1[0].Msgs) + cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs) + assert.Equal(t, (cnt1 + cnt2), msgCount) + + inputStream1.Close() + inputStream2.Close() +} + +func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) { + t.Skip("skip kafka test") + + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + c1 := funcutil.RandomString(8) + p1Channels := []string{c1} + consumerChannels := []string{c1} + consumerSubName := funcutil.RandomString(8) + + ctx := context.Background() + + factory := ProtoUDFactory{} + kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) + outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) + outputStream.AsConsumerWithPosition(consumerChannels, consumerSubName, mqwrapper.SubscriptionPositionLatest) + outputStream.Start() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + for { + select { + case <-ctx.Done(): + wg.Done() + return + case msgPack, ok := <-outputStream.Chan(): + assert.True(t, ok) + assert.NotNil(t, msgPack) + + if len(msgPack.Msgs) > 0 { + fmt.Println("msg===:", msgPack.Msgs[0]) + wg.Done() + return + } + } + } + }() + + inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels) + msgPacks1 := createRandMsgPacks(2, 1, 1) + assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) + wg.Wait() + + defer outputStream.Close() + defer inputStream1.Close() +} + +func getKafkaInputStream(ctx context.Context, 
kafkaAddress string, producerChannels []string, opts ...RepackFunc) MsgStream { + factory := ProtoUDFactory{} + kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) + inputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) + inputStream.AsProducer(producerChannels) + for _, opt := range opts { + inputStream.SetRepackFunc(opt) + } + inputStream.Start() + return inputStream +} + +func getKafkaOutputStream(ctx context.Context, kafkaAddress string, consumerChannels []string, consumerSubName string) MsgStream { + factory := ProtoUDFactory{} + kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) + outputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) + outputStream.AsConsumer(consumerChannels, consumerSubName) + outputStream.Start() + return outputStream +} + +func getKafkaTtOutputStream(ctx context.Context, kafkaAddress string, consumerChannels []string, consumerSubName string) MsgStream { + factory := ProtoUDFactory{} + kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) + outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) + outputStream.AsConsumer(consumerChannels, consumerSubName) + outputStream.Start() + return outputStream +} + +func getKafkaTtOutputStreamAndSeek(ctx context.Context, kafkaAddress string, positions []*MsgPosition) MsgStream { + factory := ProtoUDFactory{} + kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) + outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) + consumerName := []string{} + for _, c := range positions { + consumerName = append(consumerName, c.ChannelName) + } + outputStream.AsConsumer(consumerName, funcutil.RandomString(8)) + outputStream.Seek(positions) + outputStream.Start() + return outputStream +} diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go index 0959d82514..72a545a2bc 100644 --- a/internal/mq/msgstream/mq_msgstream.go +++ b/internal/mq/msgstream/mq_msgstream.go @@ -57,6 +57,7 @@ type mqMsgStream struct { consumerLock *sync.Mutex readerLock *sync.Mutex closed int32 + onceChan sync.Once } // NewMqMsgStream is used to generate a new mqMsgStream object @@ -179,15 +180,14 @@ func (ms *mqMsgStream) SetRepackFunc(repackFunc RepackFunc) { } func (ms *mqMsgStream) Start() { - for _, c := range ms.consumers { - ms.wait.Add(1) - go ms.receiveMsg(c) - } } func (ms *mqMsgStream) Close() { + ms.streamCancel() + ms.readerLock.Lock() ms.wait.Wait() + ms.readerLock.Unlock() for _, producer := range ms.producers { if producer != nil { @@ -521,6 +521,15 @@ func (ms *mqMsgStream) receiveMsg(consumer mqwrapper.Consumer) { } func (ms *mqMsgStream) Chan() <-chan *MsgPack { + ms.onceChan.Do(func() { + for _, c := range ms.consumers { + ms.readerLock.Lock() + ms.wait.Add(1) + ms.readerLock.Unlock() + go ms.receiveMsg(c) + } + }) + return ms.receiveBuf } @@ -648,12 +657,7 @@ func (ms *MqTtMsgStream) AsConsumerWithPosition(channels []string, subName strin } // Start will start a goroutine which keep carrying msg from pulsar/rocksmq to golang chan -func (ms *MqTtMsgStream) Start() { - if ms.consumers != nil { - ms.wait.Add(1) - go ms.bufMsgPackToChannel() - } -} +func (ms *MqTtMsgStream) Start() {} // Close will stop goroutine and free internal producers and consumers func (ms *MqTtMsgStream) Close() { @@ -911,3 +915,16 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error { } return nil 
} + +func (ms *MqTtMsgStream) Chan() <-chan *MsgPack { + ms.onceChan.Do(func() { + if ms.consumers != nil { + ms.readerLock.Lock() + ms.wait.Add(1) + ms.readerLock.Unlock() + go ms.bufMsgPackToChannel() + } + }) + + return ms.receiveBuf +} diff --git a/internal/mq/msgstream/mq_msgstream_test.go b/internal/mq/msgstream/mq_msgstream_test.go index 043b87b81e..5a9607a250 100644 --- a/internal/mq/msgstream/mq_msgstream_test.go +++ b/internal/mq/msgstream/mq_msgstream_test.go @@ -92,7 +92,8 @@ func (f *fixture) setup() []parameters { rmqClient, _ := rmq.NewClientWithDefaultOptions() parameters := []parameters{ - {pulsarClient}, {rmqClient}, + {pulsarClient}, + {rmqClient}, } return parameters } diff --git a/internal/mq/msgstream/mqwrapper/consumer.go b/internal/mq/msgstream/mqwrapper/consumer.go index b8ea37d352..9132db786d 100644 --- a/internal/mq/msgstream/mqwrapper/consumer.go +++ b/internal/mq/msgstream/mqwrapper/consumer.go @@ -59,11 +59,12 @@ type Consumer interface { // Seek to the uniqueID position Seek(MessageID, bool) error //nolint:govet - // Make sure that msg is received. Only used in pulsar + // Ack makes sure that msg is received Ack(Message) // Close consumer Close() + // GetLatestMsgID returns the latest message ID GetLatestMsgID() (MessageID, error) } diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go new file mode 100644 index 0000000000..d468781210 --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -0,0 +1,108 @@ +package kafka + +import ( + "strconv" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" + "go.uber.org/zap" ) + +type kafkaClient struct { + // for more config options, see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + basicConfig kafka.ConfigMap +} + +func NewKafkaClientInstance(address string) *kafkaClient { + config := kafka.ConfigMap{ + "bootstrap.servers": address, + "socket.timeout.ms": 300000, + "socket.max.fails": 3, + //"receive.message.max.bytes": 10485760, + "api.version.request": true, + } + + return &kafkaClient{basicConfig: config} +} + +func cloneKafkaConfig(config kafka.ConfigMap) *kafka.ConfigMap { + newConfig := make(kafka.ConfigMap) + for k, v := range config { + newConfig[k] = v + } + return &newConfig +} + +func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap { + newConf := cloneKafkaConfig(kc.basicConfig) + // allow messages up to 10485760 bytes (10 MB) + newConf.SetKey("message.max.bytes", 10485760) + newConf.SetKey("compression.codec", "zstd") + newConf.SetKey("go.events.channel.size", 0) + newConf.SetKey("go.produce.channel.size", 0) + return newConf +} + +func (kc *kafkaClient) newConsumerConfig(group string, offset mqwrapper.SubscriptionInitialPosition) *kafka.ConfigMap { + newConf := cloneKafkaConfig(kc.basicConfig) + + if offset == mqwrapper.SubscriptionPositionEarliest { + newConf.SetKey("auto.offset.reset", "earliest") + } else { + newConf.SetKey("auto.offset.reset", "latest") + } + + newConf.SetKey("session.timeout.ms", 180000) + newConf.SetKey("group.id", group) + newConf.SetKey("enable.auto.commit", false) + + //By default Kafka does not create a topic when a consumer subscribes to a topic that does not exist. + //To stay compatible with the other MQ implementations we enable auto topic creation below, + //since some callers consume topics that may not exist yet, such as dataCoordTimeTick.
+ newConf.SetKey("allow.auto.create.topics", true) + + //newConf.SetKey("enable.partition.eof", true) + newConf.SetKey("go.events.channel.enable", true) + return newConf +} + +func (kc *kafkaClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwrapper.Producer, error) { + config := kc.newProducerConfig() + pp, err := kafka.NewProducer(config) + if err != nil { + log.Error("kafka create sync producer , error", zap.Error(err)) + return nil, err + } + + deliveryChan := make(chan kafka.Event, 128) + producer := &kafkaProducer{p: pp, deliveryChan: deliveryChan, topic: options.Topic} + return producer, nil +} + +func (kc *kafkaClient) Subscribe(options mqwrapper.ConsumerOptions) (mqwrapper.Consumer, error) { + config := kc.newConsumerConfig(options.SubscriptionName, options.SubscriptionInitialPosition) + consumer := newKafkaConsumer(config, options.Topic, options.SubscriptionName) + return consumer, nil +} + +func (kc *kafkaClient) EarliestMessageID() mqwrapper.MessageID { + return &kafkaID{messageID: int64(kafka.OffsetBeginning)} +} + +func (kc *kafkaClient) StringToMsgID(id string) (mqwrapper.MessageID, error) { + offset, err := strconv.ParseInt(id, 10, 64) + if err != nil { + return nil, err + } + + return &kafkaID{messageID: offset}, nil +} + +func (kc *kafkaClient) BytesToMsgID(id []byte) (mqwrapper.MessageID, error) { + offset := DeserializeKafkaID(id) + return &kafkaID{messageID: offset}, nil +} + +func (kc *kafkaClient) Close() { +} diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go new file mode 100644 index 0000000000..2f75c19632 --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -0,0 +1,418 @@ +package kafka + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "math/rand" + "os" + "testing" + "time" + + "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +var Params paramtable.BaseTable + +func TestMain(m *testing.M) { + Params.Init() + exitCode := m.Run() + os.Exit(exitCode) +} +func IntToBytes(n int) []byte { + tmp := int32(n) + bytesBuffer := bytes.NewBuffer([]byte{}) + binary.Write(bytesBuffer, common.Endian, tmp) + return bytesBuffer.Bytes() +} +func BytesToInt(b []byte) int { + bytesBuffer := bytes.NewBuffer(b) + var tmp int32 + binary.Read(bytesBuffer, common.Endian, &tmp) + return int(tmp) +} + +// Consume1 will consume random messages and record the last MessageID it received +func Consume1(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, c chan mqwrapper.MessageID, total *int) { + consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + BufSize: 1024, + SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + // get random number between 1 ~ 5 + rand.Seed(time.Now().UnixNano()) + cnt := 1 + rand.Int()%5 + + log.Info("Consume1 start") + var msg mqwrapper.Message + for i := 0; i < cnt; i++ { + select { + case <-ctx.Done(): + log.Info("Consume1 channel closed") + return + case msg = <-consumer.Chan(): + if msg == nil { + return + } + + log.Info("Consume1 RECV", zap.Any("v", BytesToInt(msg.Payload()))) + consumer.Ack(msg) + (*total)++ + } + } + + c <- 
msg.ID() + log.Info("Consume1 randomly RECV", zap.Any("number", cnt)) + log.Info("Consume1 done") +} + +// Consume2 will consume messages from specified MessageID +func Consume2(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, msgID mqwrapper.MessageID, total *int) { + consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + BufSize: 1024, + SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + err = consumer.Seek(msgID, true) + assert.Nil(t, err) + + mm := <-consumer.Chan() + consumer.Ack(mm) + log.Info("skip the last received message", zap.Any("skip msg", mm.ID())) + + log.Info("Consume2 start") + for { + select { + case <-ctx.Done(): + log.Info("Consume2 channel closed") + return + case msg, ok := <-consumer.Chan(): + if msg == nil || !ok { + return + } + + log.Info("Consume2 RECV", zap.Any("v", BytesToInt(msg.Payload()))) + consumer.Ack(msg) + (*total)++ + } + } +} + +func Consume3(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, total *int) { + consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + BufSize: 1024, + SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + log.Info("Consume3 start") + for { + select { + case <-ctx.Done(): + log.Info("Consume3 channel closed") + return + case msg, ok := <-consumer.Chan(): + if msg == nil || !ok { + return + } + + consumer.Ack(msg) + (*total)++ + log.Info("Consume3 RECV", zap.Any("v", BytesToInt(msg.Payload()))) + } + } +} + +func TestKafkaClient_ConsumeWithAck(t *testing.T) { + kc := createKafkaClient(t) + defer kc.Close() + assert.NotNil(t, kc) + + rand.Seed(time.Now().UnixNano()) + topic := fmt.Sprintf("test-topic-%d", rand.Int()) + subName := fmt.Sprintf("test-subname-%d", rand.Int()) + arr := []int{111, 222, 333, 444, 555, 666, 777} + c := make(chan mqwrapper.MessageID, 1) + + ctx, cancel := context.WithCancel(context.Background()) + + var total1 int + var total2 int + var total3 int + + producer := createProducer(t, kc, topic) + defer producer.Close() + produceData(ctx, t, producer, arr) + time.Sleep(100 * time.Millisecond) + + ctx1, cancel1 := context.WithTimeout(ctx, 5*time.Second) + defer cancel1() + Consume1(ctx1, t, kc, topic, subName, c, &total1) + + lastMsgID := <-c + log.Info("lastMsgID", zap.Any("lastMsgID", lastMsgID.(*kafkaID).messageID)) + + ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Second) + Consume2(ctx2, t, kc, topic, subName, lastMsgID, &total2) + cancel2() + + time.Sleep(5 * time.Second) + ctx3, cancel3 := context.WithTimeout(ctx, 3*time.Second) + Consume3(ctx3, t, kc, topic, subName, &total3) + cancel3() + + cancel() + assert.Equal(t, len(arr), total1+total2) + + assert.Equal(t, len(arr), total3) +} + +func ConsumeFromEarliestToRandomPosition(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, c chan mqwrapper.MessageID, total *int) { + consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + BufSize: 1024, + SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + // get random number between 1 ~ 5 + rand.Seed(time.Now().UnixNano()) + cnt := 1 + rand.Int()%5 + + log.Info("Consume1 
channel start") + var msg mqwrapper.Message + for i := 0; i < cnt; i++ { + select { + case <-ctx.Done(): + log.Info("Consume1 channel closed") + return + case msg = <-consumer.Chan(): + if msg == nil { + continue + } + + v := BytesToInt(msg.Payload()) + log.Info("Consume1 RECV", zap.Any("v", v)) + (*total)++ + } + } + + c <- &kafkaID{messageID: msg.ID().(*kafkaID).messageID} + + log.Info("Consume1 randomly RECV", zap.Any("number", cnt)) + log.Info("Consume1 done") +} + +// Consume2 will consume messages from specified MessageID +func consumeFromSpecifiedPositionToEnd(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, msgID mqwrapper.MessageID, total *int) { + consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + BufSize: 1024, + SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + err = consumer.Seek(msgID, false) + assert.Nil(t, err) + + log.Info("Consume2 start") + for { + select { + case <-ctx.Done(): + log.Info("Consume2 channel closed") + return + case msg, ok := <-consumer.Chan(): + if msg == nil || !ok { + return + } + + v := BytesToInt(msg.Payload()) + log.Info("Consume2 RECV", zap.Any("v", v)) + (*total)++ + } + } + +} + +func ConsumeFromEarliestToEndPosition(ctx context.Context, t *testing.T, kc *kafkaClient, topic string, subName string, total *int) { + consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + BufSize: 1024, + SubscriptionInitialPosition: mqwrapper.SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + log.Info("Consume3 start") + for { + select { + case <-ctx.Done(): + log.Info("Consume3 channel closed") + return + case msg, ok := <-consumer.Chan(): + if msg == nil || !ok { + return + } + v := BytesToInt(msg.Payload()) + log.Info("Consume3 RECV", zap.Any("v", v)) + (*total)++ + } + } +} + +func TestKafkaClient_ConsumeNoAck(t *testing.T) { + kc := createKafkaClient(t) + defer kc.Close() + assert.NotNil(t, kc) + + rand.Seed(time.Now().UnixNano()) + topic := fmt.Sprintf("test-topic-%d", rand.Int()) + subName := fmt.Sprintf("test-subname-%d", rand.Int()) + + var total1 int + var total2 int + var total3 int + + arr := []int{111, 222, 333, 444, 555, 666, 777} + ctx, cancel := context.WithCancel(context.Background()) + + producer := createProducer(t, kc, topic) + defer producer.Close() + produceData(ctx, t, producer, arr) + time.Sleep(100 * time.Millisecond) + + ctx1, cancel1 := context.WithTimeout(ctx, 5*time.Second) + defer cancel1() + + c := make(chan mqwrapper.MessageID, 1) + ConsumeFromEarliestToRandomPosition(ctx1, t, kc, topic, subName, c, &total1) + + // record the last received message id + lastMsgID := <-c + log.Info("msg", zap.Any("lastMsgID", lastMsgID)) + + ctx2, cancel2 := context.WithTimeout(ctx, 5*time.Second) + defer cancel2() + consumeFromSpecifiedPositionToEnd(ctx2, t, kc, topic, subName, lastMsgID, &total2) + + ctx3, cancel3 := context.WithTimeout(ctx, 5*time.Second) + defer cancel3() + ConsumeFromEarliestToEndPosition(ctx3, t, kc, topic, subName, &total3) + + cancel() + + //TODO enable, it seems that ack is unavailable + //assert.Equal(t, len(arr)*2, total1+total2) + + assert.Equal(t, len(arr), total3) +} + +func TestKafkaClient_SeekPosition(t *testing.T) { + kc := createKafkaClient(t) + defer kc.Close() + + rand.Seed(time.Now().UnixNano()) + ctx := 
context.Background() + topic := fmt.Sprintf("test-topic-%d", rand.Int()) + subName := fmt.Sprintf("test-subname-%d", rand.Int()) + + producer := createProducer(t, kc, topic) + defer producer.Close() + + data := []int{1, 2, 3} + ids := produceData(ctx, t, producer, data) + + consumer := createConsumer(t, kc, topic, subName, mqwrapper.SubscriptionPositionLatest) + defer consumer.Close() + + err := consumer.Seek(ids[2], true) + assert.Nil(t, err) + + select { + case msg := <-consumer.Chan(): + consumer.Ack(msg) + assert.Equal(t, 3, BytesToInt(msg.Payload())) + case <-time.After(10 * time.Second): + assert.FailNow(t, "should not wait") + } +} + +func TestKafkaClient_EarliestMessageID(t *testing.T) { + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kc := NewKafkaClientInstance(kafkaAddress) + defer kc.Close() + + mid := kc.EarliestMessageID() + assert.NotNil(t, mid) +} + +func createKafkaClient(t *testing.T) *kafkaClient { + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kc := NewKafkaClientInstance(kafkaAddress) + assert.NotNil(t, kc) + return kc +} + +func createConsumer(t *testing.T, + kc *kafkaClient, + topic string, + groupID string, + initPosition mqwrapper.SubscriptionInitialPosition) mqwrapper.Consumer { + consumer, err := kc.Subscribe(mqwrapper.ConsumerOptions{ + Topic: topic, + SubscriptionName: groupID, + BufSize: 1024, + SubscriptionInitialPosition: initPosition, + }) + assert.Nil(t, err) + return consumer +} + +func createProducer(t *testing.T, kc *kafkaClient, topic string) mqwrapper.Producer { + producer, err := kc.CreateProducer(mqwrapper.ProducerOptions{Topic: topic}) + assert.Nil(t, err) + assert.NotNil(t, producer) + return producer +} + +func produceData(ctx context.Context, t *testing.T, producer mqwrapper.Producer, arr []int) []mqwrapper.MessageID { + var msgIDs []mqwrapper.MessageID + for _, v := range arr { + msg := &mqwrapper.ProducerMessage{ + Payload: IntToBytes(v), + Properties: map[string]string{}, + } + msgID, err := producer.Send(ctx, msg) + msgIDs = append(msgIDs, msgID) + assert.Nil(t, err) + } + return msgIDs +} diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go new file mode 100644 index 0000000000..ef7a60d75e --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer.go @@ -0,0 +1,162 @@ +package kafka + +import ( + "sync" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" + "go.uber.org/zap" +) + +type Consumer struct { + c *kafka.Consumer + config *kafka.ConfigMap + msgChannel chan mqwrapper.Message + hasSeek bool + isStarted bool + skipMsg bool + topic string + groupID string + closeCh chan struct{} + chanOnce sync.Once + closeOnce sync.Once +} + +func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string) *Consumer { + closeCh := make(chan struct{}) + msgChannel := make(chan mqwrapper.Message, 256) + + kafkaConsumer := &Consumer{ + config: config, + msgChannel: msgChannel, + topic: topic, + groupID: groupID, + closeCh: closeCh, + } + + kafkaConsumer.createKafkaConsumer() + return kafkaConsumer +} + +func (kc *Consumer) createKafkaConsumer() error { + var err error + kc.c, err = kafka.NewConsumer(kc.config) + if err != nil { + log.Fatal("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err)) + return err + } + return nil +} + +func (kc *Consumer) startReceiveMsgTask() { + if kc.isStarted { + 
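// the events-reading goroutine was already started by an earlier Chan() or Seek() call +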
return + } + + if !kc.hasSeek { + tps := []kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx}} + if err := kc.c.Assign(tps); err != nil { + log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Error(err)) + panic(err) + } + } + + go func() { + for ev := range kc.c.Events() { + switch e := ev.(type) { + case *kafka.Message: + if kc.skipMsg { + kc.skipMsg = false + continue + } + + kc.msgChannel <- &kafkaMessage{msg: e} + case kafka.Error: + log.Error("read msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(e)) + } + } + + if kc.msgChannel != nil { + close(kc.msgChannel) + } + }() + + kc.isStarted = true +} + +func (kc *Consumer) Subscription() string { + return kc.groupID +} + +// Chan provides a channel to read consumed messages. +// confluent-kafka-go recommends the function-based consumer; +// the channel-based consumer API is already deprecated, see +// https://github.com/confluentinc/confluent-kafka-go for details. +func (kc *Consumer) Chan() <-chan mqwrapper.Message { + kc.chanOnce.Do(func() { + kc.startReceiveMsgTask() + }) + return kc.msgChannel +} + +func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error { + offset := kafka.Offset(id.(*kafkaID).messageID) + log.Debug("kafka consumer seek ", zap.String("topic name", kc.topic), + zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive)) + + err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}}) + if err != nil { + log.Error("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Error(err)) + return err + } + + // If the seek timeout is not 0, calling Seek twice returns the error RD_KAFKA_RESP_ERR__STATE. + // If the timeout is 0, the call only initiates the seek and returns immediately without any error reporting. + kc.skipMsg = !inclusive + if err := kc.c.Seek(kafka.TopicPartition{ + Topic: &kc.topic, + Partition: mqwrapper.DefaultPartitionIdx, + Offset: offset}, 1000); err != nil { + return err + } + + kc.hasSeek = true + kc.startReceiveMsgTask() + return nil +} + +func (kc *Consumer) Ack(message mqwrapper.Message) { + kc.c.Commit() +} + +func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) { + low, high, err := kc.c.QueryWatermarkOffsets(kc.topic, mqwrapper.DefaultPartitionIdx, -1) + if err != nil { + return nil, err + } + + // The high watermark is the offset of the next message rather than of the latest existing one, + // so to return the ID of the latest existing message the high value has to step back by one.
+ if high > 0 { + high = high - 1 + } + + log.Debug("get latest msg ID ", zap.Any("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high)) + return &kafkaID{messageID: high}, nil +} + +func (kc *Consumer) Close() { + kc.closeOnce.Do(func() { + start := time.Now() + + // FIXME we should not use goroutine to close consumer, it will be fix after pr https://github.com/confluentinc/confluent-kafka-go/pull/757 + go kc.c.Close() + + cost := time.Since(start).Milliseconds() + if cost > 500 { + log.Debug("kafka consumer is closed ", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Int64("time cost(ms)", cost)) + } + }) +} diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go new file mode 100644 index 0000000000..6f5682ea29 --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go @@ -0,0 +1,102 @@ +package kafka + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/stretchr/testify/assert" +) + +func TestKafkaConsumer_Subscription(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + groupID := fmt.Sprintf("test-groupid-%d", rand.Int()) + topic := fmt.Sprintf("test-topicName-%d", rand.Int()) + + config := createConfig(groupID) + kc := newKafkaConsumer(config, topic, groupID) + defer kc.Close() + assert.Equal(t, kc.Subscription(), groupID) +} + +func TestKafkaConsumer_Chan(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + groupID := fmt.Sprintf("test-groupid-%d", rand.Int()) + topic := fmt.Sprintf("test-topicName-%d", rand.Int()) + + config := createConfig(groupID) + consumer := newKafkaConsumer(config, topic, groupID) + defer consumer.Close() + + data := []int{111, 222, 333} + testKafkaConsumerProduceData(t, topic, data) + + msgID := &kafkaID{messageID: 1} + err := consumer.Seek(msgID, false) + assert.Nil(t, err) + + msg := <-consumer.Chan() + assert.Equal(t, 333, BytesToInt(msg.Payload())) + assert.Equal(t, int64(2), msg.ID().(*kafkaID).messageID) + assert.Equal(t, topic, msg.Topic()) + assert.True(t, len(msg.Properties()) == 0) +} + +func TestKafkaConsumer_GetSeek(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + groupID := fmt.Sprintf("test-groupid-%d", rand.Int()) + topic := fmt.Sprintf("test-topicName-%d", rand.Int()) + + config := createConfig(groupID) + consumer := newKafkaConsumer(config, topic, groupID) + defer consumer.Close() + + msgID := &kafkaID{messageID: 0} + err := consumer.Seek(msgID, false) + assert.Nil(t, err) +} + +func TestKafkaConsumer_GetLatestMsgID(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + groupID := fmt.Sprintf("test-groupid-%d", rand.Int()) + topic := fmt.Sprintf("test-topicName-%d", rand.Int()) + + config := createConfig(groupID) + consumer := newKafkaConsumer(config, topic, groupID) + defer consumer.Close() + + latestMsgID, err := consumer.GetLatestMsgID() + assert.Nil(t, latestMsgID) + assert.NotNil(t, err) + + data := []int{111, 222, 333} + testKafkaConsumerProduceData(t, topic, data) + + latestMsgID, err = consumer.GetLatestMsgID() + assert.Equal(t, int64(2), latestMsgID.(*kafkaID).messageID) + assert.Nil(t, err) +} + +func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) { + ctx := context.Background() + kc := createKafkaClient(t) + defer kc.Close() + producer := createProducer(t, kc, topic) + defer producer.Close() + + produceData(ctx, t, producer, data) +} + +func createConfig(groupID string) 
*kafka.ConfigMap { + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + return &kafka.ConfigMap{ + "bootstrap.servers": kafkaAddress, + "group.id": groupID, + "auto.offset.reset": "earliest", + "api.version.request": "true", + "go.events.channel.enable": true, + } +} diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_id.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_id.go new file mode 100644 index 0000000000..5a5f7b4474 --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_id.go @@ -0,0 +1,34 @@ +package kafka + +import ( + "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" +) + +type kafkaID struct { + messageID int64 +} + +var _ mqwrapper.MessageID = &kafkaID{} + +func (kid *kafkaID) Serialize() []byte { + return SerializeKafkaID(kid.messageID) +} + +func (kid *kafkaID) AtEarliestPosition() bool { + return kid.messageID <= 0 +} + +func (kid *kafkaID) LessOrEqualThan(msgID []byte) (bool, error) { + return kid.messageID <= DeserializeKafkaID(msgID), nil +} + +func SerializeKafkaID(messageID int64) []byte { + b := make([]byte, 8) + common.Endian.PutUint64(b, uint64(messageID)) + return b +} + +func DeserializeKafkaID(messageID []byte) int64 { + return int64(common.Endian.Uint64(messageID)) +} diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_id_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_id_test.go new file mode 100644 index 0000000000..ea6fde3d2f --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_id_test.go @@ -0,0 +1,56 @@ +package kafka + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestKafkaID_Serialize(t *testing.T) { + rid := &kafkaID{messageID: 8} + bin := rid.Serialize() + assert.NotNil(t, bin) + assert.NotZero(t, len(bin)) +} + +func TestKafkaID_AtEarliestPosition(t *testing.T) { + rid := &kafkaID{messageID: 8} + assert.False(t, rid.AtEarliestPosition()) + + rid = &kafkaID{messageID: 0} + assert.True(t, rid.AtEarliestPosition()) +} + +func TestKafkaID_LessOrEqualThan(t *testing.T) { + { + rid1 := &kafkaID{messageID: 8} + rid2 := &kafkaID{messageID: 0} + ret, err := rid1.LessOrEqualThan(rid2.Serialize()) + assert.Nil(t, err) + assert.False(t, ret) + + ret, err = rid2.LessOrEqualThan(rid1.Serialize()) + assert.Nil(t, err) + assert.True(t, ret) + } + + { + rid1 := &kafkaID{messageID: 0} + rid2 := &kafkaID{messageID: 0} + ret, err := rid1.LessOrEqualThan(rid2.Serialize()) + assert.Nil(t, err) + assert.True(t, ret) + } +} + +func Test_SerializeKafkaID(t *testing.T) { + bin := SerializeKafkaID(10) + assert.NotNil(t, bin) + assert.NotZero(t, len(bin)) +} + +func Test_DeserializeKafkaID(t *testing.T) { + bin := SerializeKafkaID(5) + id := DeserializeKafkaID(bin) + assert.Equal(t, id, int64(5)) +} diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go new file mode 100644 index 0000000000..2d144739e9 --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go @@ -0,0 +1,27 @@ +package kafka + +import ( + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" +) + +type kafkaMessage struct { + msg *kafka.Message +} + +func (km *kafkaMessage) Topic() string { + return *km.msg.TopicPartition.Topic +} + +func (km *kafkaMessage) Properties() map[string]string { + return nil +} + +func (km *kafkaMessage) Payload() []byte { + return km.msg.Value +} + +func (km *kafkaMessage) ID() mqwrapper.MessageID { + kid := 
&kafkaID{messageID: int64(km.msg.TopicPartition.Offset)} + return kid +} diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_message_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_message_test.go new file mode 100644 index 0000000000..e38523a880 --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_message_test.go @@ -0,0 +1,18 @@ +package kafka + +import ( + "testing" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/stretchr/testify/assert" +) + +func TestKafkaMessage_All(t *testing.T) { + topic := "t" + msg := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: 0}, Value: nil} + km := &kafkaMessage{msg: msg} + assert.Equal(t, topic, km.Topic()) + assert.Equal(t, int64(0), km.ID().(*kafkaID).messageID) + assert.Nil(t, km.Payload()) + assert.Nil(t, km.Properties()) +} diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go new file mode 100644 index 0000000000..bad354a6c7 --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go @@ -0,0 +1,59 @@ +package kafka + +import ( + "context" + "sync" + "time" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" +) + +type kafkaProducer struct { + p *kafka.Producer + topic string + deliveryChan chan kafka.Event + closeOnce sync.Once +} + +func (kp *kafkaProducer) Topic() string { + return kp.topic +} + +func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) { + err := kp.p.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: mqwrapper.DefaultPartitionIdx}, + Value: message.Payload, + }, kp.deliveryChan) + + if err != nil { + return nil, err + } + + e := <-kp.deliveryChan + m := e.(*kafka.Message) + if m.TopicPartition.Error != nil { + return nil, m.TopicPartition.Error + } + + return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil +} + +func (kp *kafkaProducer) Close() { + kp.closeOnce.Do(func() { + start := time.Now() + //flush in-flight msg within queue. 
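// Flush blocks until all queued delivery reports have been handled or the given
// timeout (in milliseconds) expires, so Close waits at most 10 seconds for
// in-flight messages before tearing down the producer and its delivery channel.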
+ kp.p.Flush(10000) + + kp.p.Close() + close(kp.deliveryChan) + + cost := time.Since(start).Milliseconds() + if cost > 500 { + log.Debug("kafka producer is closed", zap.Any("topic", kp.topic), zap.Int64("time cost(ms)", cost)) + } + }) +} diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go new file mode 100644 index 0000000000..df48600266 --- /dev/null +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go @@ -0,0 +1,69 @@ +package kafka + +import ( + "context" + "errors" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" + + "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" + "github.com/stretchr/testify/assert" +) + +func TestKafkaProducer_SendSuccess(t *testing.T) { + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + kc := NewKafkaClientInstance(kafkaAddress) + defer kc.Close() + assert.NotNil(t, kc) + + rand.Seed(time.Now().UnixNano()) + topic := fmt.Sprintf("test-topic-%d", rand.Int()) + + producer, err := kc.CreateProducer(mqwrapper.ProducerOptions{Topic: topic}) + assert.Nil(t, err) + assert.NotNil(t, producer) + + kafkaProd := producer.(*kafkaProducer) + assert.Equal(t, kafkaProd.Topic(), topic) + + msg2 := &mqwrapper.ProducerMessage{ + Payload: []byte{}, + Properties: map[string]string{}, + } + msgID, err := producer.Send(context.TODO(), msg2) + assert.Nil(t, err) + assert.NotNil(t, msgID) + + producer.Close() +} + +func TestKafkaProducer_SendFail(t *testing.T) { + kafkaAddress, _ := Params.Load("_KafkaBrokerList") + { + + deliveryChan := make(chan kafka.Event, 1) + rand.Seed(time.Now().UnixNano()) + topic := fmt.Sprintf("test-topic-%d", rand.Int()) + + pp, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaAddress}) + assert.Nil(t, err) + producer := &kafkaProducer{p: pp, deliveryChan: deliveryChan, topic: topic} + + msg := &mqwrapper.ProducerMessage{ + Payload: []byte{1}, + Properties: map[string]string{}, + } + var resultMsg kafka.Event = &kafka.Message{TopicPartition: kafka.TopicPartition{Error: errors.New("error")}} + deliveryChan <- resultMsg + + ret, err := producer.Send(context.TODO(), msg) + assert.Nil(t, ret) + assert.NotNil(t, err) + + producer.Close() + } +} diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 1a35b5cd3d..8967a69abc 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -160,6 +160,8 @@ func (qc *QueryCoord) Init() error { return idAllocator.AllocOne() } + qc.factory.Init(&Params) + // init meta qc.meta, initError = newMeta(qc.loopCtx, qc.kvClient, qc.factory, qc.idAllocator) if initError != nil { @@ -217,7 +219,6 @@ func (qc *QueryCoord) Init() error { // Start function starts the goroutines to watch the meta and node updates func (qc *QueryCoord) Start() error { - qc.factory.Init(&Params) qc.scheduler.Start() log.Debug("start scheduler ...") diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 3d95b21ad9..5fb1311e86 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -467,9 +467,6 @@ func (c *Core) getSegments(ctx context.Context, collID typeutil.UniqueID) (map[t } func (c *Core) setMsgStreams() error { - if Params.PulsarCfg.Address == "" { - return fmt.Errorf("pulsar address is empty") - } if Params.CommonCfg.RootCoordSubName == "" { return fmt.Errorf("RootCoordSubName is empty") } diff --git a/internal/rootcoord/root_coord_test.go 
b/internal/rootcoord/root_coord_test.go index 10195effd0..e7c3744cbf 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -709,9 +709,10 @@ func TestRootCoord_Base(t *testing.T) { timeTickStream, _ := tmpFactory.NewMsgStream(ctx) timeTickStream.AsConsumer([]string{Params.CommonCfg.RootCoordTimeTick}, Params.CommonCfg.RootCoordSubName) timeTickStream.Start() + defer timeTickStream.Close() dmlStream, _ := tmpFactory.NewMsgStream(ctx) - clearMsgChan(1500*time.Millisecond, dmlStream.Chan()) + defer dmlStream.Close() core.SetEtcdClient(etcdCli) diff --git a/internal/util/dependency/factory.go b/internal/util/dependency/factory.go index 3040871508..02296e3823 100644 --- a/internal/util/dependency/factory.go +++ b/internal/util/dependency/factory.go @@ -23,14 +23,25 @@ func NewDefaultFactory(standAlone bool) *DefaultFactory { } } +func NewFactory(standAlone bool) *DefaultFactory { + return &DefaultFactory{standAlone: standAlone} +} + +// Init creates a msg factory (TODO: only one mq is supported at a time). +// To keep the config file backward compatible, we still accept multiple mq configs. +// 1. Rocksmq only runs in local (standalone) mode, and it has the highest priority. +// 2. Among the remote mq services, Pulsar has higher priority than Kafka. func (f *DefaultFactory) Init(params *paramtable.ComponentParam) { - if f.standAlone { - path, _ := params.Load("_RocksmqPath") - f.msgStreamFactory = msgstream.NewRmsFactory(path) + // skip if using default factory + if f.msgStreamFactory != nil { + return + } + + // init storage + if params.CommonCfg.StorageType == "local" { f.chunkManagerFactory = storage.NewChunkManagerFactory("local", "local", storage.RootPath(params.LocalStorageCfg.Path)) } else { - f.msgStreamFactory = msgstream.NewPmsFactory(&params.PulsarCfg) f.chunkManagerFactory = storage.NewChunkManagerFactory("local", "minio", storage.RootPath(params.LocalStorageCfg.Path), storage.Address(params.MinioCfg.Address), @@ -40,6 +51,44 @@ func (f *DefaultFactory) Init(params *paramtable.ComponentParam) { storage.BucketName(params.MinioCfg.BucketName), storage.CreateBucket(true)) } + + // init mq service + if f.standAlone { + f.msgStreamFactory = f.initMQLocalService(params) + if f.msgStreamFactory == nil { + f.msgStreamFactory = f.initMQRemoteService(params) + if f.msgStreamFactory == nil { + panic("no available mq configuration, must configure at least one of rocksmq, Pulsar or Kafka!") + } + } + return + } + + f.msgStreamFactory = f.initMQRemoteService(params) + if f.msgStreamFactory == nil { + panic("no available remote mq configuration, must configure at least one of Pulsar or Kafka!") + } +} + +func (f *DefaultFactory) initMQLocalService(params *paramtable.ComponentParam) msgstream.Factory { + if params.RocksmqEnable() { + path, _ := params.Load("_RocksmqPath") + return msgstream.NewRmsFactory(path) + } + return nil +} + +// initMQRemoteService creates a remote msgstream factory; Pulsar has higher priority than Kafka.
+func (f *DefaultFactory) initMQRemoteService(params *paramtable.ComponentParam) msgstream.Factory { + if params.PulsarEnable() { + return msgstream.NewPmsFactory(&params.PulsarCfg) + } + + if params.KafkaEnable() { + return msgstream.NewKmsFactory(&params.KafkaCfg) + } + + return nil } func (f *DefaultFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) { diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index b54b18d2b2..761ab66399 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -225,6 +225,10 @@ func (gp *BaseTable) LoadYaml(fileName string) error { return nil } +func (gp *BaseTable) Get(key string) string { + return gp.params.Get(strings.ToLower(key)) +} + func (gp *BaseTable) Remove(key string) error { return gp.params.Remove(strings.ToLower(key)) } @@ -413,12 +417,23 @@ func (gp *BaseTable) SetLogger(id UniqueID) { } } +func (gp *BaseTable) loadKafkaConfig() { + brokerList := os.Getenv("KAFKA_BROKER_LIST") + if brokerList == "" { + brokerList = gp.Get("kafka.brokerList") + } + gp.Save("_KafkaBrokerList", brokerList) +} + func (gp *BaseTable) loadPulsarConfig() { pulsarAddress := os.Getenv("PULSAR_ADDRESS") if pulsarAddress == "" { - pulsarHost := gp.LoadWithDefault("pulsar.address", DefaultPulsarHost) - port := gp.LoadWithDefault("pulsar.port", DefaultPulsarPort) - pulsarAddress = "pulsar://" + pulsarHost + ":" + port + pulsarHost := gp.Get("pulsar.address") + port := gp.Get("pulsar.port") + + if len(pulsarHost) != 0 && len(port) != 0 { + pulsarAddress = "pulsar://" + pulsarHost + ":" + port + } } gp.Save("_PulsarAddress", pulsarAddress) @@ -427,13 +442,14 @@ func (gp *BaseTable) loadRocksMQConfig() { rocksmqPath := os.Getenv("ROCKSMQ_PATH") if rocksmqPath == "" { - rocksmqPath = gp.LoadWithDefault("rocksmq.path", DefaultRocksmqPath) + rocksmqPath = gp.Get("rocksmq.path") } gp.Save("_RocksmqPath", rocksmqPath) } func (gp *BaseTable) loadMQConfig() { gp.loadPulsarConfig() + gp.loadKafkaConfig() gp.loadRocksMQConfig() } diff --git a/internal/util/paramtable/base_table_test.go b/internal/util/paramtable/base_table_test.go index 0ceb35d5cf..b7b7241502 100644 --- a/internal/util/paramtable/base_table_test.go +++ b/internal/util/paramtable/base_table_test.go @@ -136,6 +136,17 @@ func TestBaseTable_Remove(t *testing.T) { assert.Nil(t, err6) } +func TestBaseTable_Get(t *testing.T) { + err := baseParams.Save("key", "10") + assert.Nil(t, err) + + v := baseParams.Get("key") + assert.Equal(t, "10", v) + + v2 := baseParams.Get("none") + assert.Equal(t, "", v2) +} + func TestBaseTable_LoadYaml(t *testing.T) { err := baseParams.LoadYaml("milvus.yaml") assert.Nil(t, err) diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index c1b2367606..f923b16c24 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -87,6 +87,10 @@ func (p *ComponentParam) PulsarEnable() bool { return p.PulsarCfg.Address != "" } +func (p *ComponentParam) KafkaEnable() bool { + return p.KafkaCfg.Address != "" +} + /////////////////////////////////////////////////////////////////////////////// // --- common --- type commonConfig struct { @@ -120,6 +124,7 @@ type commonConfig struct { SimdType string IndexSliceSize int64 + StorageType string } func (p *commonConfig) init(base *BaseTable) { @@ -154,6 +159,7 @@ func (p *commonConfig) init(base *BaseTable) { p.initSimdType()
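// initStorageType (added below) reads the new "storageType" key and defaults to "minio";
// DefaultFactory.Init only switches to the purely local chunk manager when it is set to "local".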
p.initIndexSliceSize() + p.initStorageType() } func (p *commonConfig) initClusterPrefix() { @@ -334,6 +340,10 @@ func (p *commonConfig) initIndexSliceSize() { p.IndexSliceSize = p.Base.ParseInt64WithDefault("common.indexSliceSize", DefaultIndexSliceSize) } +func (p *commonConfig) initStorageType() { + p.StorageType = p.Base.LoadWithDefault("storageType", "minio") +} + /////////////////////////////////////////////////////////////////////////////// // --- rootcoord --- type rootCoordConfig struct { diff --git a/internal/util/paramtable/service_param.go b/internal/util/paramtable/service_param.go index 41199933b6..de63b06b00 100644 --- a/internal/util/paramtable/service_param.go +++ b/internal/util/paramtable/service_param.go @@ -39,6 +39,7 @@ type ServiceParam struct { LocalStorageCfg LocalStorageConfig EtcdCfg EtcdConfig PulsarCfg PulsarConfig + KafkaCfg KafkaConfig RocksmqCfg RocksmqConfig MinioCfg MinioConfig } @@ -49,6 +50,7 @@ func (p *ServiceParam) Init() { p.LocalStorageCfg.init(&p.BaseTable) p.EtcdCfg.init(&p.BaseTable) p.PulsarCfg.init(&p.BaseTable) + p.KafkaCfg.init(&p.BaseTable) p.RocksmqCfg.init(&p.BaseTable) p.MinioCfg.init(&p.BaseTable) } @@ -200,6 +202,25 @@ func (p *PulsarConfig) initMaxMessageSize() { } } +// --- kafka --- +type KafkaConfig struct { + Base *BaseTable + Address string +} + +func (k *KafkaConfig) init(base *BaseTable) { + k.Base = base + k.initAddress() +} + +func (k *KafkaConfig) initAddress() { + addr, err := k.Base.Load("_KafkaBrokerList") + if err != nil { + panic(err) + } + k.Address = addr +} + /////////////////////////////////////////////////////////////////////////////// // --- rocksmq --- type RocksmqConfig struct { diff --git a/scripts/run_go_codecov.sh b/scripts/run_go_codecov.sh index d0c99ba61d..870896eed7 100755 --- a/scripts/run_go_codecov.sh +++ b/scripts/run_go_codecov.sh @@ -19,7 +19,7 @@ FILE_COVERAGE_INFO="go_coverage.txt" FILE_COVERAGE_HTML="go_coverage.html" -set -e +set -ex echo "mode: atomic" > ${FILE_COVERAGE_INFO} # run unittest @@ -27,18 +27,18 @@ echo "mode: atomic" > ${FILE_COVERAGE_INFO} echo "Running unittest under ./internal" if [[ "$(uname -s)" == "Darwin" ]]; then export MallocNanoZone=0 - for d in $(go list ./internal/... | grep -v -e vendor -e internal/querycoord -e /metricsinfo -e internal/proxy -e internal/querynode); do + for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e internal/querycoord -e /metricsinfo -e internal/proxy -e internal/querynode); do go test -race -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then - sed '1d' profile.out >> ${FILE_COVERAGE_INFO} + grep -v kafka profile.out | sed '1d' >> ${FILE_COVERAGE_INFO} rm profile.out fi done else - for d in $(go list ./internal/... | grep -v vendor); do + for d in $(go list ./internal/... | grep -v -e vendor -e kafka); do go test -race -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then - sed '1d' profile.out >> ${FILE_COVERAGE_INFO} + grep -v kafka profile.out | sed '1d' >> ${FILE_COVERAGE_INFO} rm profile.out fi done diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 7bf411daae..93fee6659d 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -38,7 +38,7 @@ echo "Running go unittest under $MILVUS_DIR" go test -race -cover "${MILVUS_DIR}/allocator/..." -failfast go test -race -cover "${MILVUS_DIR}/kv/..." -failfast -go test -race -cover "${MILVUS_DIR}/mq/..." 
-failfast +go test -race -cover $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast go test -race -cover "${MILVUS_DIR}/storage" -failfast go test -race -cover "${MILVUS_DIR}/tso/..." -failfast go test -race -cover "${MILVUS_DIR}/util/funcutil/..." -failfast
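For reference, the sketch below restates the message-queue selection order that DefaultFactory.Init implements in this patch: standalone mode prefers rocksmq and falls back to the remote services, while cluster mode only considers Pulsar and then Kafka. The RocksmqEnable/PulsarEnable/KafkaEnable checks and the Rms/Pms/Kms factory constructors are the ones introduced above; the selectMQ helper itself and its error return are illustrative only.

package dependency

import (
	"errors"

	"github.com/milvus-io/milvus/internal/mq/msgstream"
	"github.com/milvus-io/milvus/internal/util/paramtable"
)

// selectMQ mirrors the priority order used by DefaultFactory.Init (illustrative helper).
func selectMQ(standAlone bool, params *paramtable.ComponentParam) (msgstream.Factory, error) {
	// 1. standalone only: rocksmq has the highest priority
	if standAlone && params.RocksmqEnable() {
		path, _ := params.Load("_RocksmqPath")
		return msgstream.NewRmsFactory(path), nil
	}
	// 2. remote: Pulsar before Kafka
	if params.PulsarEnable() {
		return msgstream.NewPmsFactory(&params.PulsarCfg), nil
	}
	// 3. Kafka as the last resort
	if params.KafkaEnable() {
		return msgstream.NewKmsFactory(&params.KafkaCfg), nil
	}
	return nil, errors.New("no available mq configuration")
}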