Enable kafka ut with a mocked kafka server (#18366)

Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
jaime 2022-08-03 19:06:35 +08:00 committed by GitHub
parent 28aadc988d
commit 4a32c1bb8e
8 changed files with 74 additions and 37 deletions
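
The change swaps the external-broker dependency for confluent-kafka-go's in-process mock cluster. Below is a condensed sketch of the pattern the two TestMain hunks in this commit adopt; kafka.NewMockCluster, BootstrapServers, and Close are real confluent-kafka-go APIs, while publishing the address through an env var is illustrative only — the commit itself stores it in milvus's paramtable:

package kafkatest

import (
    "fmt"
    "os"
    "testing"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// TestMain starts a single in-process mock broker before any test runs,
// so the kafka unit tests no longer need a real cluster.
func TestMain(m *testing.M) {
    mockCluster, err := kafka.NewMockCluster(1)
    if err != nil {
        fmt.Printf("failed to create mock cluster: %s\n", err)
        os.Exit(1)
    }

    // Hand the bootstrap address to the tests (illustrative; the diff
    // saves it under the "kafka.brokerList" paramtable key instead).
    os.Setenv("KAFKA_BROKER_LIST", mockCluster.BootstrapServers())

    code := m.Run()
    mockCluster.Close() // close explicitly: a defer would not survive os.Exit
    os.Exit(code)
}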

View File

@@ -22,7 +22,8 @@ import (
    "log"
    "sync"
    "testing"
    "time"

+   "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
@@ -36,7 +37,7 @@ import (
// Note: kafka does not support get all data when consuming from the earliest position again.
//func TestStream_KafkaTtMsgStream_NoSeek(t *testing.T) {
-//	kafkaAddress := Params.Get("kafka.brokerList")
+//	kafkaAddress := getKafkaBrokerList()
//	c1 := funcutil.RandomString(8)
//	producerChannels := []string{c1}
//	consumerChannels := []string{c1}
@@ -102,14 +103,8 @@ import (
//	assert.Equal(t, o3.BeginTs, p3.BeginTs)
//}

-func skipTest(t *testing.T) {
-    t.Skip("skip kafka test")
-}
-
func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) {
-    skipTest(t)
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    c := funcutil.RandomString(8)
    producerChannels := []string{c}
    consumerChannels := []string{c}
@@ -184,9 +179,7 @@ func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) {
}

func TestStream_KafkaTtMsgStream_Seek(t *testing.T) {
-    skipTest(t)
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    c1 := funcutil.RandomString(8)
    producerChannels := []string{c1}
    consumerChannels := []string{c1}
@@ -298,9 +291,7 @@ func TestStream_KafkaTtMsgStream_Seek(t *testing.T) {
}

func TestStream_KafkaTtMsgStream_1(t *testing.T) {
-    skipTest(t)
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    c1 := funcutil.RandomString(8)
    c2 := funcutil.RandomString(8)
    p1Channels := []string{c1}
@@ -345,9 +336,7 @@ func TestStream_KafkaTtMsgStream_1(t *testing.T) {
}

func TestStream_KafkaTtMsgStream_2(t *testing.T) {
-    skipTest(t)
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    c1 := funcutil.RandomString(8)
    c2 := funcutil.RandomString(8)
    p1Channels := []string{c1}
@@ -403,9 +392,7 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) {
}

func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) {
-    skipTest(t)
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    c1 := funcutil.RandomString(8)
    p1Channels := []string{c1}
    consumerChannels := []string{c1}
@@ -440,11 +427,8 @@ func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) {
        }
    }()

-    // make producer start to produce messages after invoking Chan
-    time.Sleep(5 * time.Second)
-
    inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels)
-    msgPacks1 := createRandMsgPacks(2, 1, 1)
+    msgPacks1 := createRandMsgPacks(2, 10, 1)
    assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1))

    wg.Wait()
@@ -454,7 +438,14 @@ func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) {

func getKafkaInputStream(ctx context.Context, kafkaAddress string, producerChannels []string, opts ...RepackFunc) MsgStream {
    factory := ProtoUDFactory{}
-    kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress)
+    config := kafka.ConfigMap{
+        "bootstrap.servers":   kafkaAddress,
+        "socket.timeout.ms":   500,
+        "socket.max.fails":    2,
+        "api.version.request": true,
+        "linger.ms":           10,
+    }
+    kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfigMap(config)
    inputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher())
    inputStream.AsProducer(producerChannels)
    for _, opt := range opts {
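
The map above is plain librdkafka configuration, presumably tuned for a throwaway broker: a short socket.timeout.ms and low socket.max.fails make a vanished mock broker fail fast instead of hanging the test, and a small linger.ms keeps batching delay negligible. A standalone sketch feeding the same settings to a raw confluent-kafka-go producer (the helper name is illustrative):

package kafkatest

import "github.com/confluentinc/confluent-kafka-go/kafka"

// newFastFailProducer mirrors the test config above; broker would come
// from mockCluster.BootstrapServers().
func newFastFailProducer(broker string) (*kafka.Producer, error) {
    return kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers":   broker,
        "socket.timeout.ms":   500,
        "socket.max.fails":    2,
        "api.version.request": true,
        "linger.ms":           10,
    })
}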

View File

@@ -29,6 +29,8 @@ import (
    "time"
    "unsafe"

+   "github.com/confluentinc/confluent-kafka-go/kafka"
+
    "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
    "go.uber.org/atomic"
@@ -53,6 +55,15 @@ var Params paramtable.ComponentParam

func TestMain(m *testing.M) {
    Params.Init()
+
+    mockKafkaCluster, err := kafka.NewMockCluster(1)
+    if err != nil {
+        fmt.Printf("Failed to create MockCluster: %s\n", err)
+        os.Exit(1)
+    }
+    defer mockKafkaCluster.Close()
+    broker := mockKafkaCluster.BootstrapServers()
+    Params.Save("kafka.brokerList", broker)
    exitCode := m.Run()
    os.Exit(exitCode)
}
@@ -66,6 +77,12 @@ func getPulsarAddress() string {
    panic("invalid pulsar address")
}

+func getKafkaBrokerList() string {
+    brokerList := Params.Get("kafka.brokerList")
+    log.Printf("kafka broker list: %s", brokerList)
+    return brokerList
+}
+
type fixture struct {
    t      *testing.T
    etcdKV *etcdkv.EtcdKV
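
Together with the TestMain hunk above, this completes the handoff: TestMain saves the mock cluster's bootstrap address under the existing kafka.brokerList key, and getKafkaBrokerList reads it back, so tests that previously required an externally configured broker now reach the mock one unchanged.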

View File

@@ -34,6 +34,10 @@ func NewKafkaClientInstance(address string) *kafkaClient {
    return &kafkaClient{basicConfig: config}
}

+func NewKafkaClientInstanceWithConfigMap(config kafka.ConfigMap) *kafkaClient {
+    return &kafkaClient{basicConfig: config}
+}
+
func NewKafkaClientInstanceWithConfig(config *paramtable.KafkaConfig) *kafkaClient {
    kafkaConfig := getBasicConfig(config.Address)
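
The new constructor stores the caller-supplied map verbatim as the client's base config, which keeps test-only tuning out of paramtable. A minimal in-package usage sketch (the helper name is hypothetical; kafkaClient comes from the surrounding package):

// newMockBackedClient builds a client against the mock broker via the
// new constructor; hypothetical helper, not part of the commit.
func newMockBackedClient(broker string) *kafkaClient {
    return NewKafkaClientInstanceWithConfigMap(kafka.ConfigMap{
        "bootstrap.servers": broker,
    })
}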

View File

@@ -10,6 +10,8 @@ import (
    "testing"
    "time"

+   "github.com/confluentinc/confluent-kafka-go/kafka"
+
    "github.com/milvus-io/milvus/internal/common"
    "github.com/milvus-io/milvus/internal/log"
    "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
@@ -22,9 +24,26 @@ var Params paramtable.BaseTable

func TestMain(m *testing.M) {
    Params.Init()
+
+    mockCluster, err := kafka.NewMockCluster(1)
+    if err != nil {
+        fmt.Printf("Failed to create MockCluster: %s\n", err)
+        os.Exit(1)
+    }
+    defer mockCluster.Close()
+    broker := mockCluster.BootstrapServers()
+    Params.Save("kafka.brokerList", broker)
+
    exitCode := m.Run()
    os.Exit(exitCode)
}

+func getKafkaBrokerList() string {
+    brokerList := Params.Get("kafka.brokerList")
+    log.Info("get kafka broker list.", zap.String("address", brokerList))
+    return brokerList
+}
+
func IntToBytes(n int) []byte {
    tmp := int32(n)
    bytesBuffer := bytes.NewBuffer([]byte{})
@@ -251,7 +270,7 @@ func TestKafkaClient_ConsumeFromLatest(t *testing.T) {
}

func TestKafkaClient_EarliestMessageID(t *testing.T) {
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    kc := NewKafkaClientInstance(kafkaAddress)
    defer kc.Close()
@@ -260,7 +279,7 @@ func TestKafkaClient_EarliestMessageID(t *testing.T) {
}

func TestKafkaClient_MsgSerializAndDeserialize(t *testing.T) {
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    kc := NewKafkaClientInstance(kafkaAddress)
    defer kc.Close()
@@ -292,7 +311,7 @@ func TestKafkaClient_NewKafkaClientInstanceWithConfig(t *testing.T) {
}

func createKafkaClient(t *testing.T) *kafkaClient {
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    kc := NewKafkaClientInstance(kafkaAddress)
    assert.NotNil(t, kc)
    return kc

View File

@@ -95,8 +95,8 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
    defer consumer.Close()

    latestMsgID, err := consumer.GetLatestMsgID()
-    assert.Nil(t, latestMsgID)
-    assert.NotNil(t, err)
+    assert.Equal(t, int64(0), latestMsgID.(*kafkaID).messageID)
+    assert.Nil(t, err)

    data := []int{111, 222, 333}
    testKafkaConsumerProduceData(t, topic, data)
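
The flipped assertions track the mock broker's semantics: it appears to auto-create the topic, so asking for the latest message ID of an empty topic now succeeds with offset 0 where the old externally-run test expected an error.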
@@ -129,6 +129,12 @@ func TestKafkaConsumer_ConsumeFromLatest(t *testing.T) {
    assert.Equal(t, 555, BytesToInt(msg.Payload()))
}

+func TestKafkaConsumer_createKafkaConsumer(t *testing.T) {
+    consumer := &Consumer{config: &kafka.ConfigMap{}}
+    err := consumer.createKafkaConsumer()
+    assert.NotNil(t, err)
+}
+
func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) {
    ctx := context.Background()
    kc := createKafkaClient(t)
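
The new error-path test works without any broker because librdkafka validates required consumer properties up front; a standalone sketch of the failure it relies on:

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // A consumer config without group.id is rejected before any network
    // I/O, which is the error the wrapper's createKafkaConsumer surfaces.
    _, err := kafka.NewConsumer(&kafka.ConfigMap{})
    fmt.Println(err) // e.g. "Required property group.id not set"
}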
@@ -138,11 +144,11 @@ func testKafkaConsumerProduceData(t *testing.T, topic string, data []int) {
    produceData(ctx, t, producer, data)

-    time.Sleep(5 * time.Second)
+    producer.(*kafkaProducer).p.Flush(500)
}

func createConfig(groupID string) *kafka.ConfigMap {
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    return &kafka.ConfigMap{
        "bootstrap.servers": kafkaAddress,
        "group.id":          groupID,

View File

@@ -15,7 +15,7 @@ import (
)

func TestKafkaProducer_SendSuccess(t *testing.T) {
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    kc := NewKafkaClientInstance(kafkaAddress)
    defer kc.Close()
    assert.NotNil(t, kc)
@@ -42,7 +42,7 @@ func TestKafkaProducer_SendSuccess(t *testing.T) {
}

func TestKafkaProducer_SendFail(t *testing.T) {
-    kafkaAddress := Params.Get("kafka.brokerList")
+    kafkaAddress := getKafkaBrokerList()
    {
        deliveryChan := make(chan kafka.Event, 1)

View File

@@ -30,7 +30,7 @@ echo "Running unittest under ./internal"
if [[ $(uname -s) == "Darwin" && "$(uname -m)" == "arm64" ]]; then
    APPLE_SILICON_FLAG="-tags dynamic"
fi

-for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated); do
+for d in $(go list ./internal/... | grep -v -e vendor -e planparserv2/generated); do
    go test -race ${APPLE_SILICON_FLAG} -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
    if [ -f profile.out ]; then
        grep -v kafka profile.out | grep -v planparserv2/generated | sed '1d' >> ${FILE_COVERAGE_INFO}

View File

@@ -36,7 +36,7 @@ echo "Running go unittest under $MILVUS_DIR"
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/allocator/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/kv/..." -failfast
-go test -race -cover ${APPLE_SILICON_FLAG} $(go list "${MILVUS_DIR}/mq/..." | grep -v kafka) -failfast
+go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/mq/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/storage" -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/tso/..." -failfast
go test -race -cover ${APPLE_SILICON_FLAG} "${MILVUS_DIR}/config/..." -failfast