diff --git a/Makefile b/Makefile index 61e4271260..36d94c1ef4 100644 --- a/Makefile +++ b/Makefile @@ -321,6 +321,10 @@ test-tso: @echo "Running go unittests..." @(env bash $(PWD)/scripts/run_go_unittest.sh -t tso) +test-pkg: + @echo "Running go unittests..." + @(env bash $(PWD)/scripts/run_go_unittest.sh -t pkg) + test-kv: @echo "Running go unittests..." @(env bash $(PWD)/scripts/run_go_unittest.sh -t kv) diff --git a/internal/datacoord/compaction_policy_forcemerge.go b/internal/datacoord/compaction_policy_forcemerge.go index 2d7563f423..0d11537be1 100644 --- a/internal/datacoord/compaction_policy_forcemerge.go +++ b/internal/datacoord/compaction_policy_forcemerge.go @@ -4,10 +4,10 @@ import ( "context" "fmt" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/internal/types" @@ -117,7 +117,6 @@ func (policy *forceMergeCompactionPolicy) triggerOneCollection( } views = append(views, view) } - return views, triggerID, nil log.Info("force merge triggered", zap.Int("viewCount", len(views))) return views, triggerID, nil diff --git a/internal/datacoord/compaction_view_forcemerge.go b/internal/datacoord/compaction_view_forcemerge.go index 01c76068b6..17ac708dcc 100644 --- a/internal/datacoord/compaction_view_forcemerge.go +++ b/internal/datacoord/compaction_view_forcemerge.go @@ -21,10 +21,11 @@ import ( "math" "time" - "github.com/milvus-io/milvus/pkg/v2/log" - "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/samber/lo" "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) const ( diff --git a/internal/datacoord/compaction_view_forcemerge_test.go b/internal/datacoord/compaction_view_forcemerge_test.go index be6523ae5b..c46de67c18 100644 --- a/internal/datacoord/compaction_view_forcemerge_test.go +++ b/internal/datacoord/compaction_view_forcemerge_test.go @@ -22,7 +22,6 @@ import ( "time" "github.com/samber/lo" - "github.com/stretchr/testify/assert" ) diff --git a/pkg/mq/msgstream/mq_kafka_msgstream_test.go b/pkg/mq/msgstream/mq_kafka_msgstream_test.go deleted file mode 100644 index 6fe4ddcdf3..0000000000 --- a/pkg/mq/msgstream/mq_kafka_msgstream_test.go +++ /dev/null @@ -1,498 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package msgstream - -import ( - "context" - "log" - "sync" - "testing" - - "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/pkg/v2/mq/common" - kafkawrapper "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/kafka" - "github.com/milvus-io/milvus/pkg/v2/util/funcutil" -) - -// Note: kafka does not support get all data when consuming from the earliest position again. -//func TestStream_KafkaTtMsgStream_NoSeek(t *testing.T) { -// kafkaAddress := getKafkaBrokerList() -// c1 := funcutil.RandomString(8) -// producerChannels := []string{c1} -// consumerChannels := []string{c1} -// consumerSubName := funcutil.RandomString(8) -// -// msgPack0 := MsgPack{} -// msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) -// -// msgPack1 := MsgPack{} -// msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) -// msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19)) -// -// msgPack2 := MsgPack{} -// msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) -// -// msgPack3 := MsgPack{} -// msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14)) -// msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9)) -// -// msgPack4 := MsgPack{} -// msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11)) -// -// msgPack5 := MsgPack{} -// msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15)) -// -// ctx := context.Background() -// inputStream := getKafkaInputStream(ctx, kafkaAddress, producerChannels) -// outputStream := getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) -// -// err := inputStream.Broadcast(&msgPack0) -// assert.NoError(t, err) -// err = inputStream.Produce(&msgPack1) -// assert.NoError(t, err) -// err = inputStream.Broadcast(&msgPack2) -// assert.NoError(t, err) -// err = inputStream.Produce(&msgPack3) -// assert.NoError(t, err) -// err = inputStream.Broadcast(&msgPack4) -// assert.NoError(t, err) -// err = inputStream.Broadcast(&msgPack5) -// assert.NoError(t, err) -// -// o1 := consumer(ctx, outputStream) -// o2 := consumer(ctx, outputStream) -// o3 := consumer(ctx, outputStream) -// -// t.Log(o1.BeginTs) -// t.Log(o2.BeginTs) -// t.Log(o3.BeginTs) -// outputStream.Close() -// -// outputStream2 := getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) -// p1 := consumer(ctx, outputStream2) -// p2 := consumer(ctx, outputStream2) -// p3 := consumer(ctx, outputStream2) -// t.Log(p1.BeginTs) -// t.Log(p2.BeginTs) -// t.Log(p3.BeginTs) -// outputStream2.Close() -// -// assert.Equal(t, o1.BeginTs, p1.BeginTs) -// assert.Equal(t, o2.BeginTs, p2.BeginTs) -// assert.Equal(t, o3.BeginTs, p3.BeginTs) -//} - -func skipTest(t *testing.T) { - t.Skip("skip kafka test") -} - -func TestStream_KafkaMsgStream_SeekToLast(t *testing.T) { - skipTest(t) - kafkaAddress := getKafkaBrokerList() - c := funcutil.RandomString(8) - producerChannels := []string{c} - consumerChannels := []string{c} - consumerSubName := funcutil.RandomString(8) - - msgPack := &MsgPack{} - ctx := context.Background() - inputStream := getKafkaInputStream(ctx, kafkaAddress, producerChannels) - defer inputStream.Close() - - for i := 0; i < 10; i++ { - insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - } - - // produce test data - err := inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - - // pick a seekPosition - var seekPosition *msgpb.MsgPosition - outputStream := getKafkaOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName, common.SubscriptionPositionEarliest) - for i := 0; i < 10; i++ { - result := consumer(ctx, outputStream) - assert.Equal(t, result.Msgs[0].GetID(), int64(i)) - if i == 5 { - seekPosition = result.EndPositions[0] - break - } - } - outputStream.Close() - - // create a consumer can consume data from seek position to last msg - outputStream2 := getKafkaOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName, common.SubscriptionPositionUnknown) - lastMsgID, err := outputStream2.GetLatestMsgID(c) - defer outputStream2.Close() - assert.NoError(t, err) - - err = outputStream2.Seek(ctx, []*msgpb.MsgPosition{seekPosition}, false) - assert.NoError(t, err) - - cnt := 0 - var value int64 = 6 - hasMore := true - for hasMore { - select { - case <-ctx.Done(): - hasMore = false - case msgPack, ok := <-outputStream2.Chan(): - if !ok { - assert.Fail(t, "Should not reach here") - } - - assert.Equal(t, 1, len(msgPack.Msgs)) - for _, tsMsg := range msgPack.Msgs { - assert.Equal(t, value, tsMsg.GetID()) - value++ - cnt++ - - ret, err := lastMsgID.LessOrEqualThan(tsMsg.GetPosition().MsgID) - assert.NoError(t, err) - if ret { - hasMore = false - break - } - } - } - } - - assert.Equal(t, 4, cnt) -} - -func TestStream_KafkaTtMsgStream_Seek(t *testing.T) { - skipTest(t) - kafkaAddress := getKafkaBrokerList() - c1 := funcutil.RandomString(8) - producerChannels := []string{c1} - consumerChannels := []string{c1} - consumerSubName := funcutil.RandomString(8) - - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19)) - - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) - - msgPack3 := MsgPack{} - msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14)) - msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9)) - - msgPack4 := MsgPack{} - msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11)) - - msgPack5 := MsgPack{} - msgPack5.Msgs = append(msgPack5.Msgs, getTsMsg(commonpb.MsgType_Insert, 12)) - msgPack5.Msgs = append(msgPack5.Msgs, getTsMsg(commonpb.MsgType_Insert, 13)) - - msgPack6 := MsgPack{} - msgPack6.Msgs = append(msgPack6.Msgs, getTimeTickMsg(15)) - - msgPack7 := MsgPack{} - msgPack7.Msgs = append(msgPack7.Msgs, getTimeTickMsg(20)) - - ctx := context.Background() - inputStream := getKafkaInputStream(ctx, kafkaAddress, producerChannels) - outputStream := getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) - - _, err := inputStream.Broadcast(ctx, &msgPack0) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack1) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack2) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack3) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack4) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack5) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack6) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack7) - assert.NoError(t, err) - - receivedMsg := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg.Msgs), 2) - assert.Equal(t, receivedMsg.BeginTs, uint64(0)) - assert.Equal(t, receivedMsg.EndTs, uint64(5)) - - assert.Equal(t, receivedMsg.StartPositions[0].Timestamp, uint64(0)) - assert.Equal(t, receivedMsg.EndPositions[0].Timestamp, uint64(5)) - - receivedMsg2 := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg2.Msgs), 1) - assert.Equal(t, receivedMsg2.BeginTs, uint64(5)) - assert.Equal(t, receivedMsg2.EndTs, uint64(11)) - assert.Equal(t, receivedMsg2.StartPositions[0].Timestamp, uint64(5)) - assert.Equal(t, receivedMsg2.EndPositions[0].Timestamp, uint64(11)) - - receivedMsg3 := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg3.Msgs), 3) - assert.Equal(t, receivedMsg3.BeginTs, uint64(11)) - assert.Equal(t, receivedMsg3.EndTs, uint64(15)) - assert.Equal(t, receivedMsg3.StartPositions[0].Timestamp, uint64(11)) - assert.Equal(t, receivedMsg3.EndPositions[0].Timestamp, uint64(15)) - - receivedMsg4 := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg4.Msgs), 1) - assert.Equal(t, receivedMsg4.BeginTs, uint64(15)) - assert.Equal(t, receivedMsg4.EndTs, uint64(20)) - assert.Equal(t, receivedMsg4.StartPositions[0].Timestamp, uint64(15)) - assert.Equal(t, receivedMsg4.EndPositions[0].Timestamp, uint64(20)) - - outputStream.Close() - - outputStream = getKafkaTtOutputStreamAndSeek(ctx, kafkaAddress, receivedMsg3.StartPositions) - seekMsg := consumer(ctx, outputStream) - assert.Equal(t, len(seekMsg.Msgs), 3) - result := []uint64{14, 12, 13} - for i, msg := range seekMsg.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - assert.Equal(t, tsMsg.BeginTs(), result[i]) - } - - seekMsg2 := consumer(ctx, outputStream) - assert.Equal(t, len(seekMsg2.Msgs), 1) - for _, msg := range seekMsg2.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - assert.Equal(t, tsMsg.BeginTs(), uint64(19)) - } - - outputStream2 := getKafkaTtOutputStreamAndSeek(ctx, kafkaAddress, receivedMsg3.EndPositions) - seekMsg = consumer(ctx, outputStream2) - assert.Equal(t, len(seekMsg.Msgs), 1) - for _, msg := range seekMsg.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - assert.Equal(t, tsMsg.BeginTs(), uint64(19)) - } - - inputStream.Close() - outputStream2.Close() -} - -func TestStream_KafkaTtMsgStream_1(t *testing.T) { - skipTest(t) - kafkaAddress := getKafkaBrokerList() - c1 := funcutil.RandomString(8) - c2 := funcutil.RandomString(8) - p1Channels := []string{c1} - p2Channels := []string{c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - ctx := context.Background() - inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels) - msgPacks1 := createRandMsgPacks(3, 10, 10) - assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) - - inputStream2 := getKafkaInputStream(ctx, kafkaAddress, p2Channels) - msgPacks2 := createRandMsgPacks(5, 10, 10) - assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2)) - - // consume msg - outputStream := getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) - checkNMsgPack := func(t *testing.T, outputStream MsgStream, num int) int { - rcvMsg := 0 - for i := 0; i < num; i++ { - msgPack := consumer(ctx, outputStream) - rcvMsg += len(msgPack.Msgs) - if len(msgPack.Msgs) > 0 { - for _, msg := range msgPack.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - log.Println("msg type: ", tsMsg.Type(), ", msg value: ", msg) - assert.Greater(t, tsMsg.BeginTs(), msgPack.BeginTs) - assert.LessOrEqual(t, tsMsg.BeginTs(), msgPack.EndTs) - } - } - } - return rcvMsg - } - msgCount := checkNMsgPack(t, outputStream, len(msgPacks1)/2) - cnt1 := (len(msgPacks1)/2 - 1) * len(msgPacks1[0].Msgs) - cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs) - assert.Equal(t, (cnt1 + cnt2), msgCount) - - inputStream1.Close() - inputStream2.Close() - outputStream.Close() -} - -func TestStream_KafkaTtMsgStream_2(t *testing.T) { - skipTest(t) - kafkaAddress := getKafkaBrokerList() - c1 := funcutil.RandomString(8) - c2 := funcutil.RandomString(8) - p1Channels := []string{c1} - p2Channels := []string{c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - ctx := context.Background() - inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels) - defer inputStream1.Close() - msgPacks1 := createRandMsgPacks(3, 10, 10) - assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) - - inputStream2 := getKafkaInputStream(ctx, kafkaAddress, p2Channels) - defer inputStream2.Close() - msgPacks2 := createRandMsgPacks(5, 10, 10) - assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2)) - - // consume msg - log.Println("=============receive msg===================") - rcvMsgPacks := make([]*ConsumeMsgPack, 0) - - resumeMsgPack := func(t *testing.T) int { - var outputStream MsgStream - msgCount := len(rcvMsgPacks) - if msgCount == 0 { - outputStream = getKafkaTtOutputStream(ctx, kafkaAddress, consumerChannels, consumerSubName) - } else { - outputStream = getKafkaTtOutputStreamAndSeek(ctx, kafkaAddress, rcvMsgPacks[msgCount-1].EndPositions) - } - defer outputStream.Close() - msgPack := consumer(ctx, outputStream) - rcvMsgPacks = append(rcvMsgPacks, msgPack) - if len(msgPack.Msgs) > 0 { - for _, msg := range msgPack.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - log.Println("TestStream_KafkaTtMsgStream_2 msg type: ", tsMsg.Type(), ", msg value: ", msg) - assert.Greater(t, tsMsg.BeginTs(), msgPack.BeginTs) - assert.LessOrEqual(t, tsMsg.BeginTs(), msgPack.EndTs) - } - log.Println("================") - } - return len(rcvMsgPacks[msgCount].Msgs) - } - - msgCount := 0 - for i := 0; i < len(msgPacks1)/2; i++ { - msgCount += resumeMsgPack(t) - } - cnt1 := (len(msgPacks1)/2 - 1) * len(msgPacks1[0].Msgs) - cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs) - assert.Equal(t, (cnt1 + cnt2), msgCount) -} - -func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) { - skipTest(t) - kafkaAddress := getKafkaBrokerList() - c1 := funcutil.RandomString(8) - p1Channels := []string{c1} - consumerChannels := []string{c1} - consumerSubName := funcutil.RandomString(8) - - ctx := context.Background() - - factory := ProtoUDFactory{} - kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) - outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) - outputStream.AsConsumer(context.Background(), consumerChannels, consumerSubName, common.SubscriptionPositionLatest) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - for { - select { - case <-ctx.Done(): - wg.Done() - return - case msgPack, ok := <-outputStream.Chan(): - assert.True(t, ok) - assert.NotNil(t, msgPack) - - if len(msgPack.Msgs) > 0 { - t.Log("msg===:", msgPack.Msgs[0]) - wg.Done() - return - } - } - } - }() - - inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels) - msgPacks1 := createRandMsgPacks(2, 10, 1) - assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) - wg.Wait() - - defer outputStream.Close() - defer inputStream1.Close() -} - -func getKafkaInputStream(ctx context.Context, kafkaAddress string, producerChannels []string, opts ...RepackFunc) MsgStream { - factory := ProtoUDFactory{} - config := kafka.ConfigMap{ - "bootstrap.servers": kafkaAddress, - "socket.timeout.ms": 500, - "socket.max.fails": 2, - "api.version.request": true, - "linger.ms": 10, - } - kafkaClient := kafkawrapper.NewKafkaClientInstanceWithConfigMap(config, nil, nil) - inputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) - inputStream.AsProducer(ctx, producerChannels) - for _, opt := range opts { - inputStream.SetRepackFunc(opt) - } - return inputStream -} - -func getKafkaOutputStream(ctx context.Context, kafkaAddress string, consumerChannels []string, consumerSubName string, position common.SubscriptionInitialPosition) MsgStream { - factory := ProtoUDFactory{} - kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) - outputStream, _ := NewMqMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) - outputStream.AsConsumer(context.Background(), consumerChannels, consumerSubName, position) - return outputStream -} - -func getKafkaTtOutputStream(ctx context.Context, kafkaAddress string, consumerChannels []string, consumerSubName string) MsgStream { - factory := ProtoUDFactory{} - kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) - outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) - outputStream.AsConsumer(context.Background(), consumerChannels, consumerSubName, common.SubscriptionPositionEarliest) - return outputStream -} - -func getKafkaTtOutputStreamAndSeek(ctx context.Context, kafkaAddress string, positions []*MsgPosition) MsgStream { - factory := ProtoUDFactory{} - kafkaClient := kafkawrapper.NewKafkaClientInstance(kafkaAddress) - outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, kafkaClient, factory.NewUnmarshalDispatcher()) - consumerName := []string{} - for _, c := range positions { - consumerName = append(consumerName, c.ChannelName) - } - outputStream.AsConsumer(context.Background(), consumerName, funcutil.RandomString(8), common.SubscriptionPositionUnknown) - outputStream.Seek(context.Background(), positions, false) - return outputStream -} diff --git a/pkg/mq/msgstream/mq_msgstream_test.go b/pkg/mq/msgstream/mq_msgstream_test.go index d3acf0d452..25c0aa44b1 100644 --- a/pkg/mq/msgstream/mq_msgstream_test.go +++ b/pkg/mq/msgstream/mq_msgstream_test.go @@ -29,20 +29,15 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/cockroachdb/errors" "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.uber.org/atomic" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/pkg/v2/config" "github.com/milvus-io/milvus/pkg/v2/mq/common" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper" - kafkawrapper "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/kafka" pulsarwrapper "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/pulsar" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" - "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" ) const ( @@ -99,700 +94,6 @@ func consumer(ctx context.Context, mq MsgStream) *ConsumeMsgPack { } } -func TestStream_ConfigEvent(t *testing.T) { - pulsarAddress := getPulsarAddress() - factory := ProtoUDFactory{} - pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - assert.NoError(t, err) - stream, err := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - stream.configEvent.OnEvent(&config.Event{Value: "false"}) - stream.configEvent.OnEvent(&config.Event{Value: "????"}) - assert.False(t, stream.isEnabledProduce()) - stream.configEvent.OnEvent(&config.Event{Value: "true"}) - assert.True(t, stream.isEnabledProduce()) -} - -func TestStream_PulsarMsgStream_Insert(t *testing.T) { - Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false") - defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key) - pulsarAddress := getPulsarAddress() - c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) - producerChannels := []string{c1, c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - ctx := context.Background() - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - - { - inputStream.ForceEnableProduce(false) - err := inputStream.Produce(ctx, &msgPack) - require.Error(t, err) - } - - inputStream.ForceEnableProduce(true) - err := inputStream.Produce(ctx, &msgPack) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - receiveMsg(ctx, outputStream, len(msgPack.Msgs)) - inputStream.Close() - outputStream.Close() -} - -func TestStream_PulsarMsgStream_Delete(t *testing.T) { - pulsarAddress := getPulsarAddress() - c := funcutil.RandomString(8) - producerChannels := []string{c} - consumerChannels := []string{c} - consumerSubName := funcutil.RandomString(8) - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Delete, 1)) - - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - - err := inputStream.Produce(ctx, &msgPack) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - receiveMsg(ctx, outputStream, len(msgPack.Msgs)) - inputStream.Close() - outputStream.Close() -} - -func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { - pulsarAddress := getPulsarAddress() - c := funcutil.RandomString(8) - producerChannels := []string{c} - consumerChannels := []string{c} - consumerSubName := funcutil.RandomString(8) - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 3)) - - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - - err := inputStream.Produce(ctx, &msgPack) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - receiveMsg(ctx, outputStream, len(msgPack.Msgs)) - inputStream.Close() - outputStream.Close() -} - -func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { - Params.Save(Params.CommonCfg.TTMsgEnabled.Key, "false") - defer Params.Remove(Params.CommonCfg.TTMsgEnabled.Key) - pulsarAddress := getPulsarAddress() - c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) - producerChannels := []string{c1, c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 3)) - - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - - { - inputStream.ForceEnableProduce(false) - _, err := inputStream.Broadcast(ctx, &msgPack) - require.Error(t, err) - } - - inputStream.ForceEnableProduce(true) - _, err := inputStream.Broadcast(ctx, &msgPack) - require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) - - receiveMsg(ctx, outputStream, len(consumerChannels)*len(msgPack.Msgs)) - inputStream.Close() - outputStream.Close() -} - -func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) { - pulsarAddress := getPulsarAddress() - c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) - producerChannels := []string{c1, c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels, repackFunc) - outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - err := inputStream.Produce(ctx, &msgPack) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - receiveMsg(ctx, outputStream, len(msgPack.Msgs)) - inputStream.Close() - outputStream.Close() -} - -func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { - pulsarAddress := getPulsarAddress() - c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) - producerChannels := []string{c1, c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - insertRequest := &msgpb.InsertRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Insert, - MsgID: 1, - Timestamp: 1, - SourceID: 1, - }, - CollectionName: "Collection", - PartitionName: "Partition", - SegmentID: 1, - ShardName: "1", - Timestamps: []Timestamp{1, 1}, - RowIDs: []int64{1, 3}, - RowData: []*commonpb.Blob{{}, {}}, - } - insertMsg := &InsertMsg{ - BaseMsg: BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []uint32{1, 3}, - }, - InsertRequest: insertRequest, - } - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - - factory := ProtoUDFactory{} - - ctx := context.Background() - pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - inputStream.AsProducer(ctx, producerChannels) - - pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) - outputStream.AsConsumer(ctx, consumerChannels, consumerSubName, common.SubscriptionPositionEarliest) - var output MsgStream = outputStream - - err := (*inputStream).Produce(ctx, &msgPack) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - receiveMsg(ctx, output, len(msgPack.Msgs)*2) - (*inputStream).Close() - (*outputStream).Close() -} - -func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { - pulsarAddress := getPulsarAddress() - c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) - producerChannels := []string{c1, c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - deleteRequest := &msgpb.DeleteRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Delete, - MsgID: 1, - Timestamp: 1, - SourceID: 1, - }, - CollectionName: "Collection", - ShardName: "chan-1", - Timestamps: []Timestamp{1}, - Int64PrimaryKeys: []int64{1}, - NumRows: 1, - } - deleteMsg := &DeleteMsg{ - BaseMsg: BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []uint32{1}, - }, - DeleteRequest: deleteRequest, - } - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, deleteMsg) - - factory := ProtoUDFactory{} - ctx := context.Background() - pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - inputStream.AsProducer(ctx, producerChannels) - - pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) - outputStream.AsConsumer(ctx, consumerChannels, consumerSubName, common.SubscriptionPositionEarliest) - var output MsgStream = outputStream - - err := (*inputStream).Produce(ctx, &msgPack) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - receiveMsg(ctx, output, len(msgPack.Msgs)*1) - (*inputStream).Close() - (*outputStream).Close() -} - -func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { - pulsarAddress := getPulsarAddress() - c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) - producerChannels := []string{c1, c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 2)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Delete, 3)) - - factory := ProtoUDFactory{} - - ctx := context.Background() - pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - inputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - inputStream.AsProducer(ctx, producerChannels) - - pulsarClient2, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - outputStream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) - outputStream.AsConsumer(ctx, consumerChannels, consumerSubName, common.SubscriptionPositionEarliest) - var output MsgStream = outputStream - - err := (*inputStream).Produce(ctx, &msgPack) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - receiveMsg(ctx, output, len(msgPack.Msgs)) - (*inputStream).Close() - (*outputStream).Close() -} - -func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { - pulsarAddress := getPulsarAddress() - c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) - producerChannels := []string{c1, c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) - - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - - _, err := inputStream.Broadcast(ctx, &msgPack0) - require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) - - err = inputStream.Produce(ctx, &msgPack1) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - _, err = inputStream.Broadcast(ctx, &msgPack2) - require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) - - receiveMsg(ctx, outputStream, len(msgPack1.Msgs)) - inputStream.Close() - outputStream.Close() -} - -func TestStream_PulsarTtMsgStream_NoSeek(t *testing.T) { - pulsarAddress := getPulsarAddress() - c1 := funcutil.RandomString(8) - producerChannels := []string{c1} - consumerChannels := []string{c1} - consumerSubName := funcutil.RandomString(8) - - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19)) - - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) - - msgPack3 := MsgPack{} - msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14)) - msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9)) - - msgPack4 := MsgPack{} - msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11)) - - msgPack5 := MsgPack{} - msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15)) - - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - - _, err := inputStream.Broadcast(ctx, &msgPack0) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack1) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack2) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack3) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack4) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack5) - assert.NoError(t, err) - - o1 := consumer(ctx, outputStream) - o2 := consumer(ctx, outputStream) - o3 := consumer(ctx, outputStream) - - t.Log(o1.BeginTs) - t.Log(o2.BeginTs) - t.Log(o3.BeginTs) - outputStream.Close() - - outputStream2 := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - p1 := consumer(ctx, outputStream2) - p2 := consumer(ctx, outputStream2) - p3 := consumer(ctx, outputStream2) - t.Log(p1.BeginTs) - t.Log(p2.BeginTs) - t.Log(p3.BeginTs) - outputStream2.Close() - - assert.Equal(t, o1.BeginTs, p1.BeginTs) - assert.Equal(t, o2.BeginTs, p2.BeginTs) - assert.Equal(t, o3.BeginTs, p3.BeginTs) -} - -func TestStream_PulsarMsgStream_SeekToLast(t *testing.T) { - pulsarAddress := getPulsarAddress() - c := funcutil.RandomString(8) - producerChannels := []string{c} - consumerChannels := []string{c} - consumerSubName := funcutil.RandomString(8) - - msgPack := &MsgPack{} - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - defer inputStream.Close() - - outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - for i := 0; i < 10; i++ { - insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - } - - // produce test data - err := inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - - // pick a seekPosition - var seekPosition *msgpb.MsgPosition - for i := 0; i < 10; i++ { - result := consumer(ctx, outputStream) - assert.Equal(t, result.Msgs[0].GetID(), int64(i)) - if i == 5 { - seekPosition = result.EndPositions[0] - } - } - outputStream.Close() - - // create a consumer can consume data from seek position to last msg - factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - outputStream2.AsConsumer(ctx, consumerChannels, consumerSubName, common.SubscriptionPositionEarliest) - lastMsgID, err := outputStream2.GetLatestMsgID(c) - defer outputStream2.Close() - assert.NoError(t, err) - - err = outputStream2.Seek(ctx, []*msgpb.MsgPosition{seekPosition}, false) - assert.NoError(t, err) - - cnt := 0 - var value int64 = 6 - hasMore := true - for hasMore { - select { - case <-ctx.Done(): - hasMore = false - case msgPack, ok := <-outputStream2.Chan(): - if !ok { - assert.Fail(t, "Should not reach here") - } - - assert.Equal(t, 1, len(msgPack.Msgs)) - for _, tsMsg := range msgPack.Msgs { - assert.Equal(t, value, tsMsg.GetID()) - value++ - cnt++ - - ret, err := lastMsgID.LessOrEqualThan(tsMsg.GetPosition().MsgID) - assert.NoError(t, err) - if ret { - hasMore = false - break - } - } - } - } - - assert.Equal(t, 4, cnt) -} - -func TestStream_MsgStream_AsConsumerCtxDone(t *testing.T) { - pulsarAddress := getPulsarAddress() - - t.Run("MsgStream AsConsumer with timeout context", func(t *testing.T) { - c1 := funcutil.RandomString(8) - consumerChannels := []string{c1} - consumerSubName := funcutil.RandomString(8) - - ctx := context.Background() - factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - - ctx, cancel := context.WithTimeout(ctx, time.Millisecond) - defer cancel() - <-time.After(2 * time.Millisecond) - err := outputStream.AsConsumer(ctx, consumerChannels, consumerSubName, common.SubscriptionPositionEarliest) - assert.Error(t, err) - - omsgstream, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - err = omsgstream.AsConsumer(ctx, consumerChannels, consumerSubName, common.SubscriptionPositionEarliest) - assert.Error(t, err) - }) -} - -func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { - pulsarAddress := getPulsarAddress() - - c1 := funcutil.RandomString(8) - producerChannels := []string{c1} - consumerChannels := []string{c1} - consumerSubName := funcutil.RandomString(8) - - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19)) - - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) - - msgPack3 := MsgPack{} - msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14)) - msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9)) - - msgPack4 := MsgPack{} - msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11)) - - msgPack5 := MsgPack{} - msgPack5.Msgs = append(msgPack5.Msgs, getTsMsg(commonpb.MsgType_Insert, 12)) - msgPack5.Msgs = append(msgPack5.Msgs, getTsMsg(commonpb.MsgType_Insert, 13)) - - msgPack6 := MsgPack{} - msgPack6.Msgs = append(msgPack6.Msgs, getTimeTickMsg(15)) - - msgPack7 := MsgPack{} - msgPack7.Msgs = append(msgPack7.Msgs, getTimeTickMsg(20)) - - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - - _, err := inputStream.Broadcast(ctx, &msgPack0) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack1) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack2) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack3) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack4) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack5) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack6) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack7) - assert.NoError(t, err) - - receivedMsg := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg.Msgs), 2) - assert.Equal(t, receivedMsg.BeginTs, uint64(0)) - assert.Equal(t, receivedMsg.EndTs, uint64(5)) - - assert.Equal(t, receivedMsg.StartPositions[0].Timestamp, uint64(0)) - assert.Equal(t, receivedMsg.EndPositions[0].Timestamp, uint64(5)) - - receivedMsg2 := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg2.Msgs), 1) - assert.Equal(t, receivedMsg2.BeginTs, uint64(5)) - assert.Equal(t, receivedMsg2.EndTs, uint64(11)) - assert.Equal(t, receivedMsg2.StartPositions[0].Timestamp, uint64(5)) - assert.Equal(t, receivedMsg2.EndPositions[0].Timestamp, uint64(11)) - - receivedMsg3 := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg3.Msgs), 3) - assert.Equal(t, receivedMsg3.BeginTs, uint64(11)) - assert.Equal(t, receivedMsg3.EndTs, uint64(15)) - assert.Equal(t, receivedMsg3.StartPositions[0].Timestamp, uint64(11)) - assert.Equal(t, receivedMsg3.EndPositions[0].Timestamp, uint64(15)) - - receivedMsg4 := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg4.Msgs), 1) - assert.Equal(t, receivedMsg4.BeginTs, uint64(15)) - assert.Equal(t, receivedMsg4.EndTs, uint64(20)) - assert.Equal(t, receivedMsg4.StartPositions[0].Timestamp, uint64(15)) - assert.Equal(t, receivedMsg4.EndPositions[0].Timestamp, uint64(20)) - - outputStream.Close() - - outputStream = getPulsarTtOutputStreamAndSeek(ctx, pulsarAddress, receivedMsg3.StartPositions) - seekMsg := consumer(ctx, outputStream) - assert.Equal(t, len(seekMsg.Msgs), 3) - result := []uint64{14, 12, 13} - for i, msg := range seekMsg.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - assert.Equal(t, tsMsg.BeginTs(), result[i]) - } - - seekMsg2 := consumer(ctx, outputStream) - assert.Equal(t, len(seekMsg2.Msgs), 1) - for _, msg := range seekMsg2.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - assert.Equal(t, tsMsg.BeginTs(), uint64(19)) - } - - outputStream2 := getPulsarTtOutputStreamAndSeek(ctx, pulsarAddress, receivedMsg3.EndPositions) - seekMsg = consumer(ctx, outputStream2) - assert.Equal(t, len(seekMsg.Msgs), 1) - for _, msg := range seekMsg.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - assert.Equal(t, tsMsg.BeginTs(), uint64(19)) - } - - inputStream.Close() - outputStream2.Close() -} - -func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) { - pulsarAddress := getPulsarAddress() - c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) - producerChannels := []string{c1, c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - - replicatePack := MsgPack{} - replicatePack.Msgs = append(replicatePack.Msgs, &ReplicateMsg{ - BaseMsg: BaseMsg{ - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: []uint32{100}, - }, - ReplicateMsg: &msgpb.ReplicateMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Replicate, - Timestamp: 100, - }, - }, - }) - - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) - - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - - _, err := inputStream.Broadcast(ctx, &msgPack0) - require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) - - err = inputStream.Produce(ctx, &msgPack1) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - err = inputStream.Produce(ctx, &replicatePack) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - _, err = inputStream.Broadcast(ctx, &msgPack2) - require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) - - receiveMsg(ctx, outputStream, len(msgPack1.Msgs)) - inputStream.Close() - outputStream.Close() -} - -func TestStream_PulsarTtMsgStream_DropCollection(t *testing.T) { - pulsarAddress := getPulsarAddress() - c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) - producerChannels := []string{c1, c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTsMsg(commonpb.MsgType_DropCollection, 3)) - - msgPack3 := MsgPack{} - msgPack3.Msgs = append(msgPack3.Msgs, getTimeTickMsg(5)) - - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - outputStream := getPulsarTtOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - - _, err := inputStream.Broadcast(ctx, &msgPack0) - require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) - - err = inputStream.Produce(ctx, &msgPack1) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - _, err = inputStream.Broadcast(ctx, &msgPack2) - require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) - - _, err = inputStream.Broadcast(ctx, &msgPack3) - require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) - - receiveMsg(ctx, outputStream, 2) - inputStream.Close() - outputStream.Close() -} - func createRandMsgPacks(msgsInPack int, numOfMsgPack int, deltaTs int) []*MsgPack { msgPacks := make([]*MsgPack, numOfMsgPack) @@ -844,420 +145,6 @@ func sendMsgPacks(ms MsgStream, msgPacks []*MsgPack) error { return nil } -// This testcase will generate MsgPacks as following: -// -// Insert Insert Insert Insert Insert Insert -// c1 |----------|----------|----------|----------|----------|----------| -// ^ ^ ^ ^ ^ ^ -// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) -// -// Insert Insert Insert Insert Insert Insert -// c2 |----------|----------|----------|----------|----------|----------| -// ^ ^ ^ ^ ^ ^ -// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) -// -// Then check: -// 1. For each msg in MsgPack received by ttMsgStream consumer, there should be -// msgPack.BeginTs < msg.BeginTs() <= msgPack.EndTs -// 2. The count of consumed msg should be equal to the count of produced msg -func TestStream_PulsarTtMsgStream_1(t *testing.T) { - pulsarAddr := getPulsarAddress() - c1 := funcutil.RandomString(8) - c2 := funcutil.RandomString(8) - p1Channels := []string{c1} - p2Channels := []string{c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - ctx := context.Background() - inputStream1 := getPulsarInputStream(ctx, pulsarAddr, p1Channels) - msgPacks1 := createRandMsgPacks(3, 10, 10) - assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) - - inputStream2 := getPulsarInputStream(ctx, pulsarAddr, p2Channels) - msgPacks2 := createRandMsgPacks(5, 10, 10) - assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2)) - - // consume msg - outputStream := getPulsarTtOutputStream(ctx, pulsarAddr, consumerChannels, consumerSubName) - log.Println("===============receive msg=================") - checkNMsgPack := func(t *testing.T, outputStream MsgStream, num int) int { - rcvMsg := 0 - for i := 0; i < num; i++ { - msgPack := consumer(ctx, outputStream) - rcvMsg += len(msgPack.Msgs) - if len(msgPack.Msgs) > 0 { - for _, msg := range msgPack.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - log.Println("msg type: ", tsMsg.Type(), ", msg value: ", msg) - assert.Greater(t, tsMsg.BeginTs(), msgPack.BeginTs) - assert.LessOrEqual(t, tsMsg.BeginTs(), msgPack.EndTs) - } - log.Println("================") - } - } - return rcvMsg - } - msgCount := checkNMsgPack(t, outputStream, len(msgPacks1)/2) - cnt1 := (len(msgPacks1)/2 - 1) * len(msgPacks1[0].Msgs) - cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs) - assert.Equal(t, (cnt1 + cnt2), msgCount) - - inputStream1.Close() - inputStream2.Close() - outputStream.Close() -} - -// This testcase will generate MsgPacks as following: -// -// Insert Insert Insert Insert Insert Insert -// -// c1 |----------|----------|----------|----------|----------|----------| -// -// ^ ^ ^ ^ ^ ^ -// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) -// -// Insert Insert Insert Insert Insert Insert -// -// c2 |----------|----------|----------|----------|----------|----------| -// -// ^ ^ ^ ^ ^ ^ -// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) -// -// Then check: -// 1. ttMsgStream consumer can seek to the right position and resume -// 2. The count of consumed msg should be equal to the count of produced msg -func TestStream_PulsarTtMsgStream_2(t *testing.T) { - pulsarAddr := getPulsarAddress() - c1 := funcutil.RandomString(8) - c2 := funcutil.RandomString(8) - p1Channels := []string{c1} - p2Channels := []string{c2} - consumerChannels := []string{c1, c2} - consumerSubName := funcutil.RandomString(8) - - ctx := context.Background() - inputStream1 := getPulsarInputStream(ctx, pulsarAddr, p1Channels) - msgPacks1 := createRandMsgPacks(3, 10, 10) - assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1)) - - inputStream2 := getPulsarInputStream(ctx, pulsarAddr, p2Channels) - msgPacks2 := createRandMsgPacks(5, 10, 10) - assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2)) - - // consume msg - log.Println("=============receive msg===================") - rcvMsgPacks := make([]*ConsumeMsgPack, 0) - - resumeMsgPack := func(t *testing.T) int { - var outputStream MsgStream - msgCount := len(rcvMsgPacks) - if msgCount == 0 { - outputStream = getPulsarTtOutputStream(ctx, pulsarAddr, consumerChannels, consumerSubName) - } else { - outputStream = getPulsarTtOutputStreamAndSeek(ctx, pulsarAddr, rcvMsgPacks[msgCount-1].EndPositions) - } - msgPack := consumer(ctx, outputStream) - rcvMsgPacks = append(rcvMsgPacks, msgPack) - if len(msgPack.Msgs) > 0 { - for _, msg := range msgPack.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - log.Println("msg type: ", tsMsg.Type(), ", msg value: ", msg) - assert.Greater(t, tsMsg.BeginTs(), msgPack.BeginTs) - assert.LessOrEqual(t, tsMsg.BeginTs(), msgPack.EndTs) - } - log.Println("================") - } - outputStream.Close() - return len(rcvMsgPacks[msgCount].Msgs) - } - - msgCount := 0 - for i := 0; i < len(msgPacks1)/2; i++ { - msgCount += resumeMsgPack(t) - } - cnt1 := (len(msgPacks1)/2 - 1) * len(msgPacks1[0].Msgs) - cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs) - assert.Equal(t, (cnt1 + cnt2), msgCount) - - inputStream1.Close() - inputStream2.Close() -} - -func TestStream_MqMsgStream_Seek(t *testing.T) { - pulsarAddress := getPulsarAddress() - c := funcutil.RandomString(8) - producerChannels := []string{c} - consumerChannels := []string{c} - consumerSubName := funcutil.RandomString(8) - - msgPack := &MsgPack{} - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, consumerSubName) - - for i := 0; i < 10; i++ { - insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - } - - err := inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - var seekPosition *msgpb.MsgPosition - for i := 0; i < 10; i++ { - result := consumer(ctx, outputStream) - assert.Equal(t, result.Msgs[0].GetID(), int64(i)) - if i == 5 { - seekPosition = result.EndPositions[0] - } - } - outputStream.Close() - - factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - outputStream2.AsConsumer(ctx, consumerChannels, consumerSubName, common.SubscriptionPositionEarliest) - outputStream2.Seek(ctx, []*msgpb.MsgPosition{seekPosition}, false) - - for i := 6; i < 10; i++ { - result := consumer(ctx, outputStream2) - assert.Equal(t, result.Msgs[0].GetID(), int64(i)) - } - outputStream2.Close() -} - -func TestStream_MqMsgStream_SeekInvalidMessage(t *testing.T) { - pulsarAddress := getPulsarAddress() - c := funcutil.RandomString(8) - producerChannels := []string{c} - consumerChannels := []string{c} - - msgPack := &MsgPack{} - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - defer inputStream.Close() - - outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, funcutil.RandomString(8)) - defer outputStream.Close() - - for i := 0; i < 10; i++ { - insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - } - - err := inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - var seekPosition *msgpb.MsgPosition - for i := 0; i < 10; i++ { - result := consumer(ctx, outputStream) - assert.Equal(t, result.Msgs[0].GetID(), int64(i)) - seekPosition = result.EndPositions[0] - } - - factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - outputStream2.AsConsumer(ctx, consumerChannels, funcutil.RandomString(8), common.SubscriptionPositionEarliest) - defer outputStream2.Close() - messageID, _ := pulsar.DeserializeMessageID(seekPosition.MsgID) - // try to seek to not written position - patchMessageID(&messageID, 13) - - p := []*msgpb.MsgPosition{ - { - ChannelName: seekPosition.ChannelName, - Timestamp: seekPosition.Timestamp, - MsgGroup: seekPosition.MsgGroup, - MsgID: messageID.Serialize(), - }, - } - - err = outputStream2.Seek(ctx, p, false) - assert.NoError(t, err) - - for i := 10; i < 20; i++ { - insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - } - err = inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - result := consumer(ctx, outputStream2) - assert.Equal(t, result.Msgs[0].GetID(), int64(1)) -} - -func TestSTream_MqMsgStream_SeekBadMessageID(t *testing.T) { - pulsarAddress := getPulsarAddress() - c := funcutil.RandomString(8) - producerChannels := []string{c} - consumerChannels := []string{c} - - msgPack := &MsgPack{} - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - defer inputStream.Close() - - outputStream := getPulsarOutputStream(ctx, pulsarAddress, consumerChannels, funcutil.RandomString(8)) - defer outputStream.Close() - - for i := 0; i < 10; i++ { - insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - } - - err := inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - var seekPosition *msgpb.MsgPosition - for i := 0; i < 10; i++ { - result := consumer(ctx, outputStream) - assert.Equal(t, result.Msgs[0].GetID(), int64(i)) - seekPosition = result.EndPositions[0] - } - - // produce timetick for mqtt msgstream seek - msgPack = &MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTimeTickMsg(1000)) - err = inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - - factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - outputStream2.AsConsumer(ctx, consumerChannels, funcutil.RandomString(8), common.SubscriptionPositionLatest) - defer outputStream2.Close() - - outputStream3, err := NewMqTtMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - outputStream3.AsConsumer(ctx, consumerChannels, funcutil.RandomString(8), common.SubscriptionPositionEarliest) - require.NoError(t, err) - - defer paramtable.Get().Reset(paramtable.Get().MQCfg.IgnoreBadPosition.Key) - - p := []*msgpb.MsgPosition{ - { - ChannelName: seekPosition.ChannelName, - Timestamp: seekPosition.Timestamp, - MsgGroup: seekPosition.MsgGroup, - MsgID: kafkawrapper.SerializeKafkaID(123), - }, - } - - paramtable.Get().Save(paramtable.Get().MQCfg.IgnoreBadPosition.Key, "false") - err = outputStream2.Seek(ctx, p, false) - assert.Error(t, err) - err = outputStream3.Seek(ctx, p, false) - assert.Error(t, err) - - paramtable.Get().Save(paramtable.Get().MQCfg.IgnoreBadPosition.Key, "true") - err = outputStream2.Seek(ctx, p, false) - assert.NoError(t, err) - err = outputStream3.Seek(ctx, p, false) - assert.NoError(t, err) -} - -func TestStream_MqMsgStream_SeekLatest(t *testing.T) { - pulsarAddress := getPulsarAddress() - c := funcutil.RandomString(8) - producerChannels := []string{c} - consumerChannels := []string{c} - consumerSubName := funcutil.RandomString(8) - - msgPack := &MsgPack{} - ctx := context.Background() - inputStream := getPulsarInputStream(ctx, pulsarAddress, producerChannels) - - for i := 0; i < 10; i++ { - insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - } - - err := inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - factory := ProtoUDFactory{} - pulsarClient, _ := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - outputStream2, _ := NewMqMsgStream(ctx, 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - outputStream2.AsConsumer(ctx, consumerChannels, consumerSubName, common.SubscriptionPositionLatest) - - msgPack.Msgs = nil - // produce another 10 tsMs - for i := 10; i < 20; i++ { - insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - } - err = inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - - for i := 10; i < 20; i++ { - result := consumer(ctx, outputStream2) - assert.Equal(t, result.Msgs[0].GetID(), int64(i)) - } - - inputStream.Close() - outputStream2.Close() -} - -func TestStream_BroadcastMark(t *testing.T) { - pulsarAddress := getPulsarAddress() - c1 := funcutil.RandomString(8) - c2 := funcutil.RandomString(8) - producerChannels := []string{c1, c2} - ctx := context.Background() - - factory := ProtoUDFactory{} - pulsarClient, err := pulsarwrapper.NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - assert.NoError(t, err) - outputStream, err := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - - // add producer channels - outputStream.AsProducer(ctx, producerChannels) - - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - ids, err := outputStream.Broadcast(ctx, &msgPack0) - assert.NoError(t, err) - assert.NotNil(t, ids) - assert.Equal(t, len(producerChannels), len(ids)) - for _, c := range producerChannels { - ids, ok := ids[c] - assert.True(t, ok) - assert.Equal(t, len(msgPack0.Msgs), len(ids)) - } - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - - ids, err = outputStream.Broadcast(ctx, &msgPack1) - assert.NoError(t, err) - assert.NotNil(t, ids) - assert.Equal(t, len(producerChannels), len(ids)) - for _, c := range producerChannels { - ids, ok := ids[c] - assert.True(t, ok) - assert.Equal(t, len(msgPack1.Msgs), len(ids)) - } - - // edge cases - _, err = outputStream.Broadcast(ctx, nil) - assert.Error(t, err) - - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, &MarshalFailTsMsg{}) - _, err = outputStream.Broadcast(ctx, &msgPack2) - assert.Error(t, err) - - // mock send fail - for k, p := range outputStream.producers { - outputStream.producers[k] = &mockSendFailProducer{Producer: p} - } - _, err = outputStream.Broadcast(ctx, &msgPack1) - assert.Error(t, err) - - outputStream.Close() -} - var _ TsMsg = (*MarshalFailTsMsg)(nil) type MarshalFailTsMsg struct { @@ -1622,110 +509,3 @@ type messageID struct { type iface struct { Type, Data unsafe.Pointer } - -func TestMqttStream_continueBuffering(t *testing.T) { - defer Params.Save(Params.ServiceParam.MQCfg.EnablePursuitMode.Key, "false") - - ms := &MqTtMsgStream{ - mqMsgStream: &mqMsgStream{ - ctx: context.Background(), - }, - } - - t.Run("disable_pursuit_mode", func(t *testing.T) { - Params.Save(Params.ServiceParam.MQCfg.EnablePursuitMode.Key, "false") - Params.Save(Params.ServiceParam.MQCfg.PursuitLag.Key, "10") - Params.Save(Params.ServiceParam.MQCfg.PursuitBufferSize.Key, "1024") - - type testCase struct { - tag string - endTs uint64 - size uint64 - expect bool - startTime time.Time - } - - currTs := tsoutil.ComposeTSByTime(time.Now(), 0) - cases := []testCase{ - { - tag: "first_run", - endTs: 0, - size: 0, - expect: true, - startTime: time.Now(), - }, - { - tag: "lag_large", - endTs: 1, - size: 10, - expect: false, - startTime: time.Now(), - }, - { - tag: "currTs", - endTs: currTs, - size: 10, - expect: false, - startTime: time.Now(), - }, - { - tag: "bufferTs", - endTs: 10, - size: 0, - expect: false, - startTime: time.Now().Add(-time.Hour), - }, - } - for _, tc := range cases { - t.Run(tc.tag, func(t *testing.T) { - assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size, tc.startTime)) - }) - } - }) - - t.Run("disable_pursuit_mode", func(t *testing.T) { - Params.Save(Params.ServiceParam.MQCfg.EnablePursuitMode.Key, "true") - Params.Save(Params.ServiceParam.MQCfg.PursuitLag.Key, "10") - Params.Save(Params.ServiceParam.MQCfg.PursuitBufferSize.Key, "1024") - - type testCase struct { - tag string - endTs uint64 - size uint64 - expect bool - } - - currTs := tsoutil.ComposeTSByTime(time.Now(), 0) - cases := []testCase{ - { - tag: "first_run", - endTs: 0, - size: 0, - expect: true, - }, - { - tag: "lag_large", - endTs: 1, - size: 10, - expect: true, - }, - { - tag: "currTs", - endTs: currTs, - size: 10, - expect: false, - }, - { - tag: "large_lag_buffer_full", - endTs: 1, - size: 2048, - expect: false, - }, - } - for _, tc := range cases { - t.Run(tc.tag, func(t *testing.T) { - assert.Equal(t, tc.expect, ms.continueBuffering(tc.endTs, tc.size, time.Now())) - }) - } - }) -} diff --git a/pkg/mq/msgstream/mq_rocksmq_msgstream_test.go b/pkg/mq/msgstream/mq_rocksmq_msgstream_test.go deleted file mode 100644 index 20538c026f..0000000000 --- a/pkg/mq/msgstream/mq_rocksmq_msgstream_test.go +++ /dev/null @@ -1,596 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package msgstream - -import ( - "context" - "fmt" - "sync" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus/pkg/v2/common" - mqcommon "github.com/milvus-io/milvus/pkg/v2/mq/common" - "github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/rmq" - "github.com/milvus-io/milvus/pkg/v2/util/funcutil" -) - -func Test_NewMqMsgStream(t *testing.T) { - client, _ := rmq.NewClientWithDefaultOptions(context.TODO()) - defer client.Close() - - factory := &ProtoUDFactory{} - _, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) -} - -// TODO(wxyu): add a mock implement of mqwrapper.Client, then inject errors to improve coverage -func TestMqMsgStream_AsProducer(t *testing.T) { - client, _ := rmq.NewClientWithDefaultOptions(context.TODO()) - defer client.Close() - - factory := &ProtoUDFactory{} - m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - - // empty channel name - m.AsProducer(context.TODO(), []string{""}) -} - -// TODO(wxyu): add a mock implement of mqwrapper.Client, then inject errors to improve coverage -func TestMqMsgStream_AsConsumer(t *testing.T) { - client, _ := rmq.NewClientWithDefaultOptions(context.TODO()) - defer client.Close() - - factory := &ProtoUDFactory{} - m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - - // repeat calling AsConsumer - m.AsConsumer(context.Background(), []string{"a"}, "b", mqcommon.SubscriptionPositionUnknown) - m.AsConsumer(context.Background(), []string{"a"}, "b", mqcommon.SubscriptionPositionUnknown) -} - -func TestMqMsgStream_ComputeProduceChannelIndexes(t *testing.T) { - client, _ := rmq.NewClientWithDefaultOptions(context.TODO()) - defer client.Close() - - factory := &ProtoUDFactory{} - m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - - // empty parameters - reBucketValues := m.ComputeProduceChannelIndexes([]TsMsg{}) - assert.Nil(t, reBucketValues) - - // not called AsProducer yet - insertMsg := &InsertMsg{ - BaseMsg: generateBaseMsg(), - InsertRequest: &msgpb.InsertRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Insert, - MsgID: 1, - Timestamp: 2, - SourceID: 3, - }, - - DbName: "test_db", - CollectionName: "test_collection", - PartitionName: "test_partition", - DbID: 4, - CollectionID: 5, - PartitionID: 6, - SegmentID: 7, - ShardName: "test-channel", - Timestamps: []uint64{2, 1, 3}, - RowData: []*commonpb.Blob{}, - }, - } - reBucketValues = m.ComputeProduceChannelIndexes([]TsMsg{insertMsg}) - assert.Nil(t, reBucketValues) -} - -func TestMqMsgStream_GetProduceChannels(t *testing.T) { - client, _ := rmq.NewClientWithDefaultOptions(context.TODO()) - defer client.Close() - - factory := &ProtoUDFactory{} - m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - - // empty if not called AsProducer yet - chs := m.GetProduceChannels() - assert.Equal(t, 0, len(chs)) - - // not empty after AsProducer - m.AsProducer(context.TODO(), []string{"a"}) - chs = m.GetProduceChannels() - assert.Equal(t, 1, len(chs)) -} - -func TestMqMsgStream_Produce(t *testing.T) { - client, _ := rmq.NewClientWithDefaultOptions(context.TODO()) - defer client.Close() - - factory := &ProtoUDFactory{} - m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - - // Produce before called AsProducer - insertMsg := &InsertMsg{ - BaseMsg: generateBaseMsg(), - InsertRequest: &msgpb.InsertRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Insert, - MsgID: 1, - Timestamp: 2, - SourceID: 3, - }, - - DbName: "test_db", - CollectionName: "test_collection", - PartitionName: "test_partition", - DbID: 4, - CollectionID: 5, - PartitionID: 6, - SegmentID: 7, - ShardName: "test-channel", - Timestamps: []uint64{2, 1, 3}, - RowData: []*commonpb.Blob{}, - }, - } - msgPack := &MsgPack{ - Msgs: []TsMsg{insertMsg}, - } - err = m.Produce(context.TODO(), msgPack) - assert.Error(t, err) -} - -func TestMqMsgStream_Broadcast(t *testing.T) { - client, _ := rmq.NewClientWithDefaultOptions(context.TODO()) - defer client.Close() - - factory := &ProtoUDFactory{} - m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - - // Broadcast nil pointer - _, err = m.Broadcast(context.TODO(), nil) - assert.Error(t, err) -} - -func TestMqMsgStream_Consume(t *testing.T) { - client, _ := rmq.NewClientWithDefaultOptions(context.TODO()) - defer client.Close() - - factory := &ProtoUDFactory{} - // Consume return nil when ctx canceled - var wg sync.WaitGroup - ctx, cancel := context.WithCancel(context.Background()) - m, err := NewMqMsgStream(ctx, 100, 100, client, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - - wg.Add(1) - go func() { - defer wg.Done() - msgPack := consumer(ctx, m) - assert.Nil(t, msgPack) - }() - - cancel() - wg.Wait() -} - -func TestMqMsgStream_Chan(t *testing.T) { - client, _ := rmq.NewClientWithDefaultOptions(context.TODO()) - defer client.Close() - - factory := &ProtoUDFactory{} - m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - - ch := m.Chan() - assert.NotNil(t, ch) -} - -func TestMqMsgStream_SeekNotSubscribed(t *testing.T) { - client, _ := rmq.NewClientWithDefaultOptions(context.TODO()) - defer client.Close() - - factory := &ProtoUDFactory{} - m, err := NewMqMsgStream(context.Background(), 100, 100, client, factory.NewUnmarshalDispatcher()) - assert.NoError(t, err) - - // seek in not subscribed channel - p := []*msgpb.MsgPosition{ - { - ChannelName: "b", - }, - } - err = m.Seek(context.Background(), p, false) - assert.Error(t, err) -} - -/****************************************Rmq test******************************************/ - -func initRmqStream(ctx context.Context, - producerChannels []string, - consumerChannels []string, - consumerGroupName string, - opts ...RepackFunc, -) (MsgStream, MsgStream) { - factory := ProtoUDFactory{} - - rmqClient, _ := rmq.NewClientWithDefaultOptions(ctx) - inputStream, _ := NewMqMsgStream(ctx, 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) - inputStream.AsProducer(ctx, producerChannels) - for _, opt := range opts { - inputStream.SetRepackFunc(opt) - } - var input MsgStream = inputStream - - rmqClient2, _ := rmq.NewClientWithDefaultOptions(ctx) - outputStream, _ := NewMqMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) - outputStream.AsConsumer(ctx, consumerChannels, consumerGroupName, mqcommon.SubscriptionPositionEarliest) - var output MsgStream = outputStream - - return input, output -} - -func initRmqTtStream(ctx context.Context, - producerChannels []string, - consumerChannels []string, - consumerGroupName string, - opts ...RepackFunc, -) (MsgStream, MsgStream) { - factory := ProtoUDFactory{} - - rmqClient, _ := rmq.NewClientWithDefaultOptions(ctx) - inputStream, _ := NewMqMsgStream(ctx, 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) - inputStream.AsProducer(ctx, producerChannels) - for _, opt := range opts { - inputStream.SetRepackFunc(opt) - } - var input MsgStream = inputStream - - rmqClient2, _ := rmq.NewClientWithDefaultOptions(ctx) - outputStream, _ := NewMqTtMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) - outputStream.AsConsumer(ctx, consumerChannels, consumerGroupName, mqcommon.SubscriptionPositionEarliest) - var output MsgStream = outputStream - - return input, output -} - -func TestStream_RmqMsgStream_Insert(t *testing.T) { - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerGroupName := "InsertGroup" - - msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - - ctx := context.Background() - inputStream, outputStream := initRmqStream(ctx, producerChannels, consumerChannels, consumerGroupName) - err := inputStream.Produce(ctx, &msgPack) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - receiveMsg(ctx, outputStream, len(msgPack.Msgs)) - inputStream.Close() - outputStream.Close() -} - -func TestStream_RmqTtMsgStream_Insert(t *testing.T) { - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerSubName := "subInsert" - - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) - - ctx := context.Background() - inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName) - - _, err := inputStream.Broadcast(ctx, &msgPack0) - require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) - - err = inputStream.Produce(ctx, &msgPack1) - require.NoErrorf(t, err, fmt.Sprintf("produce error = %v", err)) - - _, err = inputStream.Broadcast(ctx, &msgPack2) - require.NoErrorf(t, err, fmt.Sprintf("broadcast error = %v", err)) - - receiveMsg(ctx, outputStream, len(msgPack1.Msgs)) - inputStream.Close() - outputStream.Close() -} - -func TestStream_RmqTtMsgStream_DuplicatedIDs(t *testing.T) { - c1 := funcutil.RandomString(8) - producerChannels := []string{c1} - consumerChannels := []string{c1} - consumerSubName := funcutil.RandomString(8) - - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - - // would not dedup for non-dml messages - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTsMsg(commonpb.MsgType_CreateCollection, 2)) - msgPack2.Msgs = append(msgPack2.Msgs, getTsMsg(commonpb.MsgType_CreateCollection, 2)) - - msgPack3 := MsgPack{} - msgPack3.Msgs = append(msgPack3.Msgs, getTimeTickMsg(15)) - - ctx := context.Background() - inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName) - - _, err := inputStream.Broadcast(ctx, &msgPack0) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack1) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack2) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack3) - assert.NoError(t, err) - - receivedMsg := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg.Msgs), 3) - assert.Equal(t, receivedMsg.BeginTs, uint64(0)) - assert.Equal(t, receivedMsg.EndTs, uint64(15)) - - outputStream.Close() - - factory := ProtoUDFactory{} - - rmqClient, _ := rmq.NewClientWithDefaultOptions(ctx) - outputStream, _ = NewMqTtMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) - consumerSubName = funcutil.RandomString(8) - outputStream.AsConsumer(ctx, consumerChannels, consumerSubName, mqcommon.SubscriptionPositionUnknown) - outputStream.Seek(ctx, receivedMsg.StartPositions, false) - seekMsg := consumer(ctx, outputStream) - assert.Equal(t, len(seekMsg.Msgs), 1+2) - assert.EqualValues(t, seekMsg.Msgs[0].GetTimestamp(), 1) - assert.Equal(t, commonpb.MsgType_CreateCollection, seekMsg.Msgs[1].GetType()) - assert.Equal(t, commonpb.MsgType_CreateCollection, seekMsg.Msgs[2].GetType()) - - inputStream.Close() - outputStream.Close() -} - -func TestStream_RmqTtMsgStream_Seek(t *testing.T) { - c1 := funcutil.RandomString(8) - producerChannels := []string{c1} - consumerChannels := []string{c1} - consumerSubName := funcutil.RandomString(8) - - msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) - - msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19)) - - msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) - - msgPack3 := MsgPack{} - msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14)) - msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9)) - - msgPack4 := MsgPack{} - msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11)) - - msgPack5 := MsgPack{} - msgPack5.Msgs = append(msgPack5.Msgs, getTsMsg(commonpb.MsgType_Insert, 12)) - msgPack5.Msgs = append(msgPack5.Msgs, getTsMsg(commonpb.MsgType_Insert, 13)) - - msgPack6 := MsgPack{} - msgPack6.Msgs = append(msgPack6.Msgs, getTimeTickMsg(15)) - - msgPack7 := MsgPack{} - msgPack7.Msgs = append(msgPack7.Msgs, getTimeTickMsg(20)) - - ctx := context.Background() - inputStream, outputStream := initRmqTtStream(ctx, producerChannels, consumerChannels, consumerSubName) - - _, err := inputStream.Broadcast(ctx, &msgPack0) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack1) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack2) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack3) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack4) - assert.NoError(t, err) - err = inputStream.Produce(ctx, &msgPack5) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack6) - assert.NoError(t, err) - _, err = inputStream.Broadcast(ctx, &msgPack7) - assert.NoError(t, err) - - receivedMsg := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg.Msgs), 2) - assert.Equal(t, receivedMsg.BeginTs, uint64(0)) - assert.Equal(t, receivedMsg.EndTs, uint64(5)) - - assert.Equal(t, receivedMsg.StartPositions[0].Timestamp, uint64(0)) - assert.Equal(t, receivedMsg.EndPositions[0].Timestamp, uint64(5)) - - receivedMsg2 := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg2.Msgs), 1) - assert.Equal(t, receivedMsg2.BeginTs, uint64(5)) - assert.Equal(t, receivedMsg2.EndTs, uint64(11)) - assert.Equal(t, receivedMsg2.StartPositions[0].Timestamp, uint64(5)) - assert.Equal(t, receivedMsg2.EndPositions[0].Timestamp, uint64(11)) - - receivedMsg3 := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg3.Msgs), 3) - assert.Equal(t, receivedMsg3.BeginTs, uint64(11)) - assert.Equal(t, receivedMsg3.EndTs, uint64(15)) - assert.Equal(t, receivedMsg3.StartPositions[0].Timestamp, uint64(11)) - assert.Equal(t, receivedMsg3.EndPositions[0].Timestamp, uint64(15)) - - receivedMsg4 := consumer(ctx, outputStream) - assert.Equal(t, len(receivedMsg4.Msgs), 1) - assert.Equal(t, receivedMsg4.BeginTs, uint64(15)) - assert.Equal(t, receivedMsg4.EndTs, uint64(20)) - assert.Equal(t, receivedMsg4.StartPositions[0].Timestamp, uint64(15)) - assert.Equal(t, receivedMsg4.EndPositions[0].Timestamp, uint64(20)) - - outputStream.Close() - - factory := ProtoUDFactory{} - - rmqClient, _ := rmq.NewClientWithDefaultOptions(ctx) - outputStream, _ = NewMqTtMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) - consumerSubName = funcutil.RandomString(8) - outputStream.AsConsumer(ctx, consumerChannels, consumerSubName, mqcommon.SubscriptionPositionUnknown) - - outputStream.Seek(ctx, receivedMsg3.StartPositions, false) - seekMsg := consumer(ctx, outputStream) - assert.Equal(t, len(seekMsg.Msgs), 3) - result := []uint64{14, 12, 13} - for i, msg := range seekMsg.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - assert.Equal(t, tsMsg.BeginTs(), result[i]) - } - - seekMsg2 := consumer(ctx, outputStream) - assert.Equal(t, len(seekMsg2.Msgs), 1) - for _, msg := range seekMsg2.Msgs { - tsMsg, err := msg.Unmarshal(outputStream.GetUnmarshalDispatcher()) - require.NoError(t, err) - assert.Equal(t, tsMsg.BeginTs(), uint64(19)) - } - - inputStream.Close() - outputStream.Close() -} - -func TestStream_RMqMsgStream_SeekInvalidMessage(t *testing.T) { - c := funcutil.RandomString(8) - producerChannels := []string{c} - consumerChannels := []string{c} - consumerSubName := funcutil.RandomString(8) - ctx := context.Background() - inputStream, outputStream := initRmqStream(ctx, producerChannels, consumerChannels, consumerSubName) - - msgPack := &MsgPack{} - for i := 0; i < 10; i++ { - insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - } - - err := inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - var seekPosition *msgpb.MsgPosition - for i := 0; i < 10; i++ { - result := consumer(ctx, outputStream) - assert.Equal(t, result.Msgs[0].GetID(), int64(i)) - seekPosition = result.EndPositions[0] - } - outputStream.Close() - - factory := ProtoUDFactory{} - rmqClient2, _ := rmq.NewClientWithDefaultOptions(ctx) - outputStream2, _ := NewMqMsgStream(ctx, 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) - outputStream2.AsConsumer(ctx, consumerChannels, funcutil.RandomString(8), mqcommon.SubscriptionPositionUnknown) - - id := common.Endian.Uint64(seekPosition.MsgID) + 10 - bs := make([]byte, 8) - common.Endian.PutUint64(bs, id) - p := []*msgpb.MsgPosition{ - { - ChannelName: seekPosition.ChannelName, - Timestamp: seekPosition.Timestamp, - MsgGroup: seekPosition.MsgGroup, - MsgID: bs, - }, - } - - err = outputStream2.Seek(ctx, p, false) - assert.NoError(t, err) - - for i := 10; i < 20; i++ { - insertMsg := getTsMsg(commonpb.MsgType_Insert, int64(i)) - msgPack.Msgs = append(msgPack.Msgs, insertMsg) - } - err = inputStream.Produce(ctx, msgPack) - assert.NoError(t, err) - - result := consumer(ctx, outputStream2) - assert.Equal(t, result.Msgs[0].GetID(), int64(1)) - - inputStream.Close() - outputStream2.Close() -} - -func TestStream_RmqTtMsgStream_AsConsumerWithPosition(t *testing.T) { - producerChannels := []string{"insert1"} - consumerChannels := []string{"insert1"} - consumerSubName := "subInsert" - ctx := context.Background() - - factory := ProtoUDFactory{} - - rmqClient, _ := rmq.NewClientWithDefaultOptions(context.Background()) - - otherInputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) - otherInputStream.AsProducer(context.TODO(), []string{"root_timetick"}) - otherInputStream.Produce(ctx, getTimeTickMsgPack(999)) - - inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient, factory.NewUnmarshalDispatcher()) - inputStream.AsProducer(context.TODO(), producerChannels) - - for i := 0; i < 100; i++ { - inputStream.Produce(ctx, getTimeTickMsgPack(int64(i))) - } - - rmqClient2, _ := rmq.NewClientWithDefaultOptions(context.Background()) - outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, rmqClient2, factory.NewUnmarshalDispatcher()) - outputStream.AsConsumer(context.Background(), consumerChannels, consumerSubName, mqcommon.SubscriptionPositionLatest) - - inputStream.Produce(ctx, getTimeTickMsgPack(1000)) - pack := <-outputStream.Chan() - assert.NotNil(t, pack) - assert.Equal(t, 1, len(pack.Msgs)) - assert.EqualValues(t, 1000, pack.Msgs[0].GetTimestamp()) - - inputStream.Close() - outputStream.Close() -} diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index e204b786a3..2845ae3c8e 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -369,42 +369,6 @@ func createKafkaConfig(opts ...kafkaCfgOption) *paramtable.KafkaConfig { return cfg } -func TestKafkaClient_NewKafkaClientInstanceWithConfig(t *testing.T) { - config1 := createKafkaConfig(withAddr("addr"), withPasswd("password")) - - assert.Panics(t, func() { NewKafkaClientInstanceWithConfig(context.Background(), config1) }) - - config2 := createKafkaConfig(withAddr("addr"), withUsername("username")) - assert.Panics(t, func() { NewKafkaClientInstanceWithConfig(context.Background(), config2) }) - - producerConfig := make(map[string]string) - producerConfig["client.id"] = "dc1" - consumerConfig := make(map[string]string) - consumerConfig["client.id"] = "dc" - - config := createKafkaConfig(withKafkaUseSSL("false"), withAddr("addr"), withUsername("username"), - withPasswd("password"), withMechanism("sasl"), withProtocol("plain")) - config.ConsumerExtraConfig = paramtable.ParamGroup{GetFunc: func() map[string]string { return consumerConfig }} - config.ProducerExtraConfig = paramtable.ParamGroup{GetFunc: func() map[string]string { return producerConfig }} - - client, err := NewKafkaClientInstanceWithConfig(context.Background(), config) - assert.NoError(t, err) - assert.NotNil(t, client) - assert.NotNil(t, client.basicConfig) - - assert.Equal(t, "dc", client.consumerConfig["client.id"]) - newConsumerConfig := client.newConsumerConfig("test", 0) - clientID, err := newConsumerConfig.Get("client.id", "") - assert.NoError(t, err) - assert.Equal(t, "dc", clientID) - - assert.Equal(t, "dc1", client.producerConfig["client.id"]) - newProducerConfig := client.newProducerConfig() - pClientID, err := newProducerConfig.Get("client.id", "") - assert.NoError(t, err) - assert.Equal(t, pClientID, "dc1") -} - func createKafkaClient(t *testing.T) *kafkaClient { kafkaAddress := getKafkaBrokerList() kc := NewKafkaClientInstance(kafkaAddress) diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go index ca48ec39c0..35ad46edb2 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go @@ -119,6 +119,7 @@ func (pc *pulsarClient) Subscribe(ctx context.Context, options mqwrapper.Consume Type: pulsar.Exclusive, SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(options.SubscriptionInitialPosition), MessageChannel: receiveChannel, + StartMessageIDInclusive: true, }) if err != nil { metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc() diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go index c9797c65b8..dde30e880d 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go @@ -22,14 +22,12 @@ import ( "encoding/binary" "fmt" "math/rand" - "net/url" "os" "testing" "time" "unsafe" "github.com/apache/pulsar-client-go/pulsar" - "github.com/streamnative/pulsarctl/pkg/pulsar/utils" "github.com/stretchr/testify/assert" "go.uber.org/zap" @@ -207,56 +205,6 @@ func Consume3(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, } } -func TestPulsarClient_Consume1(t *testing.T) { - pulsarAddress := getPulsarAddress() - pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - defer pc.Close() - assert.NoError(t, err) - assert.NotNil(t, pc) - rand.Seed(time.Now().UnixNano()) - - topic := fmt.Sprintf("test-topic-%d", rand.Int()) - subName := fmt.Sprintf("test-subname-%d", rand.Int()) - arr := []int{111, 222, 333, 444, 555, 666, 777} - c := make(chan mqcommon.MessageID, 1) - - ctx, cancel := context.WithCancel(context.Background()) - - var total1 int - var total2 int - var total3 int - - // launch produce - Produce(ctx, t, pc, topic, arr) - time.Sleep(100 * time.Millisecond) - - // launch consume1 - ctx1, cancel1 := context.WithTimeout(ctx, 2*time.Second) - defer cancel1() - Consume1(ctx1, t, pc, topic, subName, c, &total1) - - // record the last received message id - lastMsgID := <-c - log.Info("msg", zap.Any("lastMsgID", lastMsgID)) - - // launch consume2 - ctx2, cancel2 := context.WithTimeout(ctx, 2*time.Second) - defer cancel2() - Consume2(ctx2, t, pc, topic, subName, lastMsgID, &total2) - - // launch consume3 - ctx3, cancel3 := context.WithTimeout(ctx, 2*time.Second) - defer cancel3() - Consume3(ctx3, t, pc, topic, subName, &total3) - - // stop Consume2 - cancel() - assert.Equal(t, len(arr), total1+total2) - assert.Equal(t, len(arr), total3) - - log.Info("main done") -} - func Consume21(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, c chan mqcommon.MessageID, total *int) { consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ Topic: topic, @@ -358,134 +306,6 @@ func Consume23(ctx context.Context, t *testing.T, pc *pulsarClient, topic string } } -func TestPulsarClient_Consume2(t *testing.T) { - pulsarAddress := getPulsarAddress() - pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - defer pc.Close() - assert.NoError(t, err) - assert.NotNil(t, pc) - rand.Seed(time.Now().UnixNano()) - - topic := fmt.Sprintf("test-topic-%d", rand.Int()) - subName := fmt.Sprintf("test-subname-%d", rand.Int()) - arr := []int{111, 222, 333, 444, 555, 666, 777} - c := make(chan mqcommon.MessageID, 1) - - ctx, cancel := context.WithCancel(context.Background()) - - var total1 int - var total2 int - var total3 int - - // launch produce - Produce(ctx, t, pc, topic, arr) - time.Sleep(100 * time.Millisecond) - - // launch consume1 - ctx1, cancel1 := context.WithTimeout(ctx, 2*time.Second) - defer cancel1() - Consume21(ctx1, t, pc, topic, subName, c, &total1) - - // record the last received message id - lastMsgID := <-c - log.Info("msg", zap.Any("lastMsgID", lastMsgID)) - - // launch consume2 - ctx2, cancel2 := context.WithTimeout(ctx, 2*time.Second) - defer cancel2() - Consume22(ctx2, t, pc, topic, subName, lastMsgID, &total2) - - // launch consume3 - ctx3, cancel3 := context.WithTimeout(ctx, 2*time.Second) - defer cancel3() - Consume23(ctx3, t, pc, topic, subName, &total3) - - // stop Consume2 - cancel() - assert.Equal(t, len(arr), total1+total2) - assert.Equal(t, 0, total3) - - log.Info("main done") -} - -func TestPulsarClient_SeekPosition(t *testing.T) { - pulsarAddress := getPulsarAddress() - pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - defer pc.Close() - assert.NoError(t, err) - assert.NotNil(t, pc) - rand.Seed(time.Now().UnixNano()) - - ctx := context.Background() - topic := fmt.Sprintf("test-topic-%d", rand.Int()) - subName := fmt.Sprintf("test-subname-%d", rand.Int()) - - producer, err := pc.CreateProducer(ctx, mqcommon.ProducerOptions{Topic: topic}) - assert.NoError(t, err) - assert.NotNil(t, producer) - - log.Info("Produce start") - ids := []mqcommon.MessageID{} - arr1 := []int{1, 2, 3} - arr2 := []string{"1", "2", "3"} - for k, v := range arr1 { - msg := &mqcommon.ProducerMessage{ - Payload: IntToBytes(v), - Properties: map[string]string{ - common.TraceIDKey: arr2[k], - }, - } - id, err := producer.Send(ctx, msg) - ids = append(ids, id) - assert.NoError(t, err) - } - - log.Info("Produced") - - consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - Type: pulsar.KeyShared, - SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, - }) - assert.NoError(t, err) - assert.NotNil(t, consumer) - defer consumer.Close() - seekID := ids[2].(*pulsarID).messageID - consumer.Seek(seekID) - - msgChan := consumer.Chan() - - select { - case msg := <-msgChan: - assert.Equal(t, seekID.BatchIdx(), msg.ID().BatchIdx()) - assert.Equal(t, seekID.LedgerID(), msg.ID().LedgerID()) - assert.Equal(t, seekID.EntryID(), msg.ID().EntryID()) - assert.Equal(t, seekID.PartitionIdx(), msg.ID().PartitionIdx()) - assert.Equal(t, 3, BytesToInt(msg.Payload())) - assert.Equal(t, "3", msg.Properties()[common.TraceIDKey]) - case <-time.After(2 * time.Second): - assert.FailNow(t, "should not wait") - } - - seekID = ids[1].(*pulsarID).messageID - consumer.Seek(seekID) - - msgChan = consumer.Chan() - - select { - case msg := <-msgChan: - assert.Equal(t, seekID.BatchIdx(), msg.ID().BatchIdx()) - assert.Equal(t, seekID.LedgerID(), msg.ID().LedgerID()) - assert.Equal(t, seekID.EntryID(), msg.ID().EntryID()) - assert.Equal(t, seekID.PartitionIdx(), msg.ID().PartitionIdx()) - assert.Equal(t, 2, BytesToInt(msg.Payload())) - assert.Equal(t, "2", msg.Properties()[common.TraceIDKey]) - case <-time.After(2 * time.Second): - assert.FailNow(t, "should not wait") - } -} - func TestPulsarClient_SeekLatest(t *testing.T) { pulsarAddress := getPulsarAddress() pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) @@ -706,73 +526,6 @@ func TestPulsarClient_WithTenantAndNamespace(t *testing.T) { assert.NotNil(t, consumer) } -func TestPulsarCtl(t *testing.T) { - topic := "test-pulsar-ctl" - subName := "hello" - - pulsarAddress := getPulsarAddress() - pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) - assert.NoError(t, err) - consumer, err := pc.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - BufSize: 1024, - SubscriptionInitialPosition: mqcommon.SubscriptionPositionEarliest, - }) - assert.NoError(t, err) - assert.NotNil(t, consumer) - defer consumer.Close() - - _, err = pc.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - BufSize: 1024, - SubscriptionInitialPosition: mqcommon.SubscriptionPositionEarliest, - }) - - assert.Error(t, err) - - _, err = pc.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - BufSize: 1024, - SubscriptionInitialPosition: mqcommon.SubscriptionPositionEarliest, - }) - assert.Error(t, err) - - fullTopicName, err := GetFullTopicName(DefaultPulsarTenant, DefaultPulsarNamespace, topic) - assert.NoError(t, err) - topicName, err := utils.GetTopicName(fullTopicName) - assert.NoError(t, err) - - pulsarURL, err := url.ParseRequestURI(pulsarAddress) - if err != nil { - panic(err) - } - webport := Params.PulsarCfg.WebPort.GetValue() - webServiceURL := "http://" + pulsarURL.Hostname() + ":" + webport - admin, err := NewAdminClient(webServiceURL, "", "") - assert.NoError(t, err) - err = admin.Subscriptions().Delete(*topicName, subName, true) - if err != nil { - webServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080" - admin, err := NewAdminClient(webServiceURL, "", "") - assert.NoError(t, err) - err = admin.Subscriptions().Delete(*topicName, subName, true) - assert.NoError(t, err) - } - - consumer2, err := pc.Subscribe(context.TODO(), mqwrapper.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - BufSize: 1024, - SubscriptionInitialPosition: mqcommon.SubscriptionPositionEarliest, - }) - assert.NoError(t, err) - assert.NotNil(t, consumer2) - defer consumer2.Close() -} - func NewPulsarAdminClient() { panic("unimplemented") } diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go index bc052eff26..fdc43cd38e 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go @@ -19,12 +19,9 @@ package pulsar import ( "context" "fmt" - "net/url" - "strings" "testing" "github.com/apache/pulsar-client-go/pulsar" - "github.com/streamnative/pulsarctl/pkg/pulsar/utils" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus/pkg/v2/common" @@ -140,85 +137,6 @@ func TestPulsarConsumer_Close(t *testing.T) { pulsarConsumer.Close() } -func TestPulsarClientCloseUnsubscribeError(t *testing.T) { - topic := "TestPulsarClientCloseUnsubscribeError" - subName := "test" - pulsarAddress := getPulsarAddress() - - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) - defer client.Close() - assert.NoError(t, err) - - consumer, err := client.Subscribe(pulsar.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - Type: pulsar.Exclusive, - SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, - }) - defer consumer.Close() - assert.NoError(t, err) - - // subscribe agiain - _, err = client.Subscribe(pulsar.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - Type: pulsar.Exclusive, - SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, - }) - defer consumer.Close() - assert.Error(t, err) - assert.True(t, strings.Contains(err.Error(), "ConsumerBusy")) - - topicName, err := utils.GetTopicName(topic) - assert.NoError(t, err) - - pulsarURL, err := url.ParseRequestURI(pulsarAddress) - if err != nil { - panic(err) - } - webport := Params.PulsarCfg.WebPort.GetValue() - webServiceURL := "http://" + pulsarURL.Hostname() + ":" + webport - admin, err := NewAdminClient(webServiceURL, "", "") - assert.NoError(t, err) - err = admin.Subscriptions().Delete(*topicName, subName, true) - if err != nil { - webServiceURL = "http://" + pulsarURL.Hostname() + ":" + "8080" - admin, err := NewAdminClient(webServiceURL, "", "") - assert.NoError(t, err) - err = admin.Subscriptions().Delete(*topicName, subName, true) - assert.NoError(t, err) - } - - err = consumer.Unsubscribe() - assert.True(t, strings.Contains(err.Error(), "Consumer not found")) - t.Log(err) -} - -func TestPulsarClientUnsubscribeTwice(t *testing.T) { - topic := "TestPulsarClientUnsubscribeTwice" - subName := "test" - pulsarAddress := getPulsarAddress() - - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddress}) - defer client.Close() - assert.NoError(t, err) - - consumer, err := client.Subscribe(pulsar.ConsumerOptions{ - Topic: topic, - SubscriptionName: subName, - Type: pulsar.Exclusive, - SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, - }) - defer consumer.Close() - assert.NoError(t, err) - - err = consumer.Unsubscribe() - assert.NoError(t, err) - err = consumer.Unsubscribe() - assert.True(t, strings.Contains(err.Error(), "Consumer not found")) - t.Log(err) -} - func TestCheckPreTopicValid(t *testing.T) { pulsarAddress := getPulsarAddress() pc, err := NewClient(DefaultPulsarTenant, DefaultPulsarNamespace, pulsar.ClientOptions{URL: pulsarAddress}) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index b052b24e22..830be3b259 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -48,7 +48,7 @@ const ( DefaultMiddlePriorityThreadCoreCoefficient = 5 DefaultLowPriorityThreadCoreCoefficient = 1 - DefaultSessionTTL = 10 // s + DefaultSessionTTL = 15 // s DefaultSessionRetryTimes = 30 DefaultMaxDegree = 56 @@ -853,7 +853,7 @@ Large numeric passwords require double quotes to avoid yaml parsing precision is p.SessionTTL = ParamItem{ Key: "common.session.ttl", Version: "2.0.0", - DefaultValue: "30", + DefaultValue: "15", Doc: "ttl value when session granting a lease to register service", Export: true, } diff --git a/pkg/util/timestamptz/timestamptz_test.go b/pkg/util/timestamptz/timestamptz_test.go index 1ff1c80e46..e713124575 100644 --- a/pkg/util/timestamptz/timestamptz_test.go +++ b/pkg/util/timestamptz/timestamptz_test.go @@ -57,12 +57,20 @@ func TestValidateAndNormalizeTimestampTz(t *testing.T) { name: "Case 6: Invalid Default Timezone", inputStr: "2024-10-23T15:30:00Z", defaultTZ: "Invalid/TZ", + expectedOutput: "2024-10-23T15:30:00Z", + expectError: false, + errorContainsMsg: "", + }, + { + name: "Case 7: Invalid Default Timezone", + inputStr: "2024-10-23T15:30:00", + defaultTZ: "Invalid/TZ", expectedOutput: "", expectError: true, errorContainsMsg: "invalid default timezone string", }, { - name: "Case 7: Offset Too High (+15:00)", + name: "Case 8: Offset Too High (+15:00)", inputStr: "2024-10-23T15:30:00+15:00", defaultTZ: "UTC", expectedOutput: "", @@ -70,7 +78,7 @@ func TestValidateAndNormalizeTimestampTz(t *testing.T) { errorContainsMsg: "UTC offset hour 15 is out of the valid range", }, { - name: "Case 8: Offset Too Low (-13:00)", + name: "Case 9: Offset Too Low (-13:00)", inputStr: "2024-10-23T15:30:00-13:00", defaultTZ: "UTC", expectedOutput: "", diff --git a/scripts/run_go_codecov.sh b/scripts/run_go_codecov.sh index 53f681e56f..bd94468459 100755 --- a/scripts/run_go_codecov.sh +++ b/scripts/run_go_codecov.sh @@ -36,22 +36,22 @@ fi # starting the timer beginTime=`date +%s` pushd cmd/tools -$TEST_CMD -gcflags="all=-N -l" -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic ./... +$TEST_CMD -gcflags="all=-N -l" -race -tags dynamic,test -v -buildvcs=false -coverpkg=./... -coverprofile=profile.out -covermode=atomic ./... if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} rm profile.out fi popd -for d in $(go list ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -gcflags="all=-N -l" -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" +for d in $(go list -buildvcs=false ./internal/... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do + $TEST_CMD -gcflags="all=-N -l" -race -tags dynamic,test -v -buildvcs=false -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO} rm profile.out fi done pushd pkg -for d in $(go list ./... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -gcflags="all=-N -l" -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" +for d in $(go list -buildvcs=false ./... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do + $TEST_CMD -gcflags="all=-N -l" -race -tags dynamic,test -v -buildvcs=false -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} rm profile.out @@ -60,8 +60,8 @@ done popd # milvusclient pushd client -for d in $(go list ./... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do - $TEST_CMD -gcflags="all=-N -l" -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" +for d in $(go list -buildvcs=false ./... | grep -v -e vendor -e kafka -e planparserv2/generated -e mocks); do + $TEST_CMD -gcflags="all=-N -l" -race -tags dynamic -v -buildvcs=false -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" if [ -f profile.out ]; then grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ../${FILE_COVERAGE_INFO} rm profile.out