diff --git a/internal/mq/msgstream/mq_kafka_msgstream_test.go b/internal/mq/msgstream/mq_kafka_msgstream_test.go index fbfa3a9768..c59112a04e 100644 --- a/internal/mq/msgstream/mq_kafka_msgstream_test.go +++ b/internal/mq/msgstream/mq_kafka_msgstream_test.go @@ -22,7 +22,8 @@ import ( "log" "sync" "testing" - "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" @@ -36,7 +37,7 @@ import ( // Note: kafka does not support get all data when consuming from the earliest position again. //func TestStream_KafkaTtMsgStream_NoSeek(t *testing.T) { -// kafkaAddress := Params.Get("kafka.brokerList") +// kafkaAddress := getKafkaBrokerList() // c1 := funcutil.RandomString(8) // producerChannels := []string{c1} // consumerChannels := []string{c1} @@ -102,14 +103,8 @@ import ( // assert.Equal(t, o3.BeginTs, p3.BeginTs) //} -func skipTest(t *testing.T) { - t.Skip("skip kafka test") -} - func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) { - skipTest(t) - - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() c := funcutil.RandomString(8) producerChannels := []string{c} consumerChannels := []string{c} @@ -184,9 +179,7 @@ func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) { } func TestStream_KafkaTtMsgStream_Seek(t *testing.T) { - skipTest(t) - - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() c1 := funcutil.RandomString(8) producerChannels := []string{c1} consumerChannels := []string{c1} @@ -298,9 +291,7 @@ func TestStream_KafkaTtMsgStream_Seek(t *testing.T) { } func TestStream_KafkaTtMsgStream_1(t *testing.T) { - skipTest(t) - - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() c1 := funcutil.RandomString(8) c2 := funcutil.RandomString(8) p1Channels := []string{c1} @@ -345,9 +336,7 @@ func TestStream_KafkaTtMsgStream_1(t *testing.T) { } func TestStream_KafkaTtMsgStream_2(t *testing.T) { - skipTest(t) - - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() c1 := funcutil.RandomString(8) c2 := funcutil.RandomString(8) p1Channels := []string{c1} @@ -403,9 +392,7 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) { } func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) { - skipTest(t) - - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() c1 := funcutil.RandomString(8) p1Channels := []string{c1} consumerChannels := []string{c1} @@ -440,11 +427,8 @@ func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) { } }() - // make producer start to produce messages after invoking Chan - time.Sleep(5 * time.Second) - inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels) - msgPacks1 := createRandMsgPacks(2, 1, 1) + msgPacks1 := createRandMsgPacks(2, 10, 1) assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) wg.Wait() @@ -454,7 +438,14 @@ func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) { func getKafkaInputStream(ctx context.Context, kafkaAddress string, producerChannels []string, opts ...RepackFunc) MsgStream { factory := ProtoUDFactory{} - kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) + config := kafka.ConfigMap{ + "bootstrap.servers": kafkaAddress, + "socket.timeout.ms": 500, + "socket.max.fails": 2, + "api.version.request": true, + "linger.ms": 10, + } + kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfigMap(config) inputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) for _, opt := range opts { diff --git a/internal/mq/msgstream/mq_msgstream_test.go b/internal/mq/msgstream/mq_msgstream_test.go index 33558a97a3..e0c758e555 100644 --- a/internal/mq/msgstream/mq_msgstream_test.go +++ b/internal/mq/msgstream/mq_msgstream_test.go @@ -29,6 +29,8 @@ import ( "time" "unsafe" + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" "go.uber.org/atomic" @@ -53,6 +55,15 @@ var Params paramtable.ComponentParam func TestMain(m *testing.M) { Params.Init() + mockKafkaCluster, err := kafka.NewMockCluster(1) + defer mockKafkaCluster.Close() + if err != nil { + fmt.Printf("Failed to create MockCluster: %s\n", err) + os.Exit(1) + } + broker := mockKafkaCluster.BootstrapServers() + Params.Save("kafka.brokerList", broker) + exitCode := m.Run() os.Exit(exitCode) } @@ -66,6 +77,12 @@ func getPulsarAddress() string { panic("invalid pulsar address") } +func getKafkaBrokerList() string { + brokerList := Params.Get("kafka.brokerList") + log.Printf("kafka broker list: %s", brokerList) + return brokerList +} + type fixture struct { t *testing.T etcdKV *etcdkv.EtcdKV diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go index 34fcbded64..b69ce344b3 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -34,6 +34,10 @@ func NewKafkaClientInstance(address string) *kafkaClient { return &kafkaClient{basicConfig: config} } +func NewKafkaClientInstanceWithConfigMap(config kafka.ConfigMap) *kafkaClient { + return &kafkaClient{basicConfig: config} +} + func NewKafkaClientInstanceWithConfig(config *paramtable.KafkaConfig) *kafkaClient { kafkaConfig := getBasicConfig(config.Address) diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index 51beb7d084..ba7bf236ff 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" @@ -22,9 +24,26 @@ var Params paramtable.BaseTable func TestMain(m *testing.M) { Params.Init() + mockCluster, err := kafka.NewMockCluster(1) + defer mockCluster.Close() + if err != nil { + fmt.Printf("Failed to create MockCluster: %s\n", err) + os.Exit(1) + } + + broker := mockCluster.BootstrapServers() + Params.Save("kafka.brokerList", broker) + exitCode := m.Run() os.Exit(exitCode) } + +func getKafkaBrokerList() string { + brokerList := Params.Get("kafka.brokerList") + log.Info("get kafka broker list.", zap.String("address", brokerList)) + return brokerList +} + func IntToBytes(n int) []byte { tmp := int32(n) bytesBuffer := bytes.NewBuffer([]byte{}) @@ -251,7 +270,7 @@ func TestKafkaClient_ConsumeFromLatest(t *testing.T) { } func TestKafkaClient_EarliestMessageID(t *testing.T) { - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() kc := NewKafkaClientInstance(kafkaAddress) defer kc.Close() @@ -260,7 +279,7 @@ func TestKafkaClient_EarliestMessageID(t *testing.T) { } func TestKafkaClient_MsgSerializAndDeserialize(t *testing.T) { - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() kc := NewKafkaClientInstance(kafkaAddress) defer kc.Close() @@ -292,7 +311,7 @@ func TestKafkaClient_NewKafkaClientInstanceWithConfig(t *testing.T) { } func createKafkaClient(t *testing.T) *kafkaClient { - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() kc := NewKafkaClientInstance(kafkaAddress) assert.NotNil(t, kc) return kc diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go index 5cedc35b7e..2a1b284efe 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go @@ -95,8 +95,8 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) { defer consumer.Close() latestMsgID, err := consumer.GetLatestMsgID() - assert.Nil(t, latestMsgID) - assert.NotNil(t, err) + assert.Equal(t, int64(0), latestMsgID.(*kafkaID).messageID) + assert.Nil(t, err) data := []int{111, 222, 333} testKafkaConsumerProduceData(t, topic, data) @@ -129,6 +129,12 @@ func TestKafkaConsumer_ConsumeFromLatest(t *testing.T) { assert.Equal(t, 555, BytesToInt(msg.Payload())) } +func TestKafkaConsumer_createKafkaConsumer(t *testing.T) { + consumer := &Consumer{config: &kafka.ConfigMap{}} + err := consumer.createKafkaConsumer() + assert.NotNil(t, err) +} + func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) { ctx := context.Background() kc := createKafkaClient(t) @@ -138,11 +144,11 @@ func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) { produceData(ctx, t, producer, data) - time.Sleep(5 * time.Second) + producer.(*kafkaProducer).p.Flush(500) } func createConfig(groupID string) *kafka.ConfigMap { - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() return &kafka.ConfigMap{ "bootstrap.servers": kafkaAddress, "group.id": groupID, diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go index 5731ef49d1..3c1df35cba 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer_test.go @@ -15,7 +15,7 @@ import ( ) func TestKafkaProducer_SendSuccess(t *testing.T) { - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() kc := NewKafkaClientInstance(kafkaAddress) defer kc.Close() assert.NotNil(t, kc) @@ -42,7 +42,7 @@ func TestKafkaProducer_SendSuccess(t *testing.T) { } func TestKafkaProducer_SendFail(t *testing.T) { - kafkaAddress := Params.Get("kafka.brokerList") + kafkaAddress := getKafkaBrokerList() { deliveryChan := make(chan kafka.Event, 1) diff --git a/scripts/run_go_codecov.sh b/scripts/run_go_codecov.sh index 07fe007a08..cc4cc8e503 100755 --- a/scripts/run_go_codecov.sh +++ b/scripts/run_go_codecov.sh @@ -30,7 +30,7 @@ echo "Running unittest under ./internal" if [[ $(uname -s) == "Darwin" && "$(uname -m)" == "arm64" ]]; then APPLE_SILICON_FLAG="-tags dynamic" fi -for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated); do +for d in $(go list ./internal/... | grep -v -e vendor -e planparserv2/generated); do go test -race ${APPLE_SILICON_FLAG} -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | sed '1d' >> ${FILE_COVERAGE_INFO} diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index bb026612ce..81faa408bb 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -36,7 +36,7 @@ echo "Running go unittest under $MILVUS_DIR" go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/allocator/..." -failfast go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/kv/..." -failfast -go test -race -cover ${APPLE_SILICON_FLAG} $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast +go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/mq/..." -failfast go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/storage" -failfast go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/tso/..." -failfast go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/config/..." -failfast