From ac9dde7352cca94d51685d78a8d7bad64d15d2bf Mon Sep 17 00:00:00 2001 From: neza2017 Date: Thu, 10 Jun 2021 20:36:21 +0800 Subject: [PATCH] fix seek on pulsar msgstream (#5726) * fix seek on pulsar msgstream Signed-off-by: yefu.chen * test seek on pulsar msgstream Signed-off-by: yefu.chen * msg-seek test test Signed-off-by: yefu.chen * add pure pulsar client seek test Signed-off-by: yefu.chen --- internal/msgstream/mq_msgstream_test.go | 66 ++++++ internal/util/mqclient/pulsar_client.go | 2 + internal/util/mqclient/pulsar_client_test.go | 228 +++++++++++++++++-- internal/util/mqclient/pulsar_consumer.go | 12 +- 4 files changed, 293 insertions(+), 15 deletions(-) diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index a625cfb7b0..d96b581d3d 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -632,6 +632,72 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { outputStream.Close() } +func TestStream_PulsarTtMsgStream_NoSeek(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + c1 := funcutil.RandomString(8) + producerChannels := []string{c1} + consumerChannels := []string{c1} + consumerSubName := funcutil.RandomString(8) + + msgPack0 := MsgPack{} + msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) + + msgPack1 := MsgPack{} + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19)) + + msgPack2 := MsgPack{} + msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) + + msgPack3 := MsgPack{} + msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14)) + msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9)) + + msgPack4 := MsgPack{} + msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11)) + + msgPack5 := MsgPack{} + msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName) + + err := inputStream.Broadcast(&msgPack0) + assert.Nil(t, err) + err = inputStream.Produce(&msgPack1) + assert.Nil(t, err) + err = inputStream.Broadcast(&msgPack2) + assert.Nil(t, err) + err = inputStream.Produce(&msgPack3) + assert.Nil(t, err) + err = inputStream.Broadcast(&msgPack4) + assert.Nil(t, err) + err = inputStream.Broadcast(&msgPack5) + assert.Nil(t, err) + + o1 := outputStream.Consume() + o2 := outputStream.Consume() + o3 := outputStream.Consume() + + t.Log(o1.BeginTs) + t.Log(o2.BeginTs) + t.Log(o3.BeginTs) + outputStream.Close() + + outputStream = getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName) + p1 := outputStream.Consume() + p2 := outputStream.Consume() + p3 := outputStream.Consume() + t.Log(p1.BeginTs) + t.Log(p2.BeginTs) + t.Log(p3.BeginTs) + outputStream.Close() + + assert.Equal(t, o1.BeginTs, p1.BeginTs) + assert.Equal(t, o2.BeginTs, p2.BeginTs) + assert.Equal(t, o3.BeginTs, p3.BeginTs) + +} func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) diff --git a/internal/util/mqclient/pulsar_client.go b/internal/util/mqclient/pulsar_client.go index 4ece2995b0..4a45bb7f20 100644 --- a/internal/util/mqclient/pulsar_client.go +++ b/internal/util/mqclient/pulsar_client.go @@ -58,6 +58,8 @@ func (pc *pulsarClient) Subscribe(options ConsumerOptions) (Consumer, error) { if err != nil { return nil, err } + //consumer.Seek(pulsar.EarliestMessageID()) + //consumer.SeekByTime(time.Unix(0, 0)) pConsumer := &pulsarConsumer{c: consumer} return pConsumer, nil diff --git a/internal/util/mqclient/pulsar_client_test.go b/internal/util/mqclient/pulsar_client_test.go index 4b395b32cc..e68a54466a 100644 --- a/internal/util/mqclient/pulsar_client_test.go +++ b/internal/util/mqclient/pulsar_client_test.go @@ -15,6 +15,7 @@ import ( "bytes" "context" "encoding/binary" + "fmt" "math/rand" "testing" "time" @@ -61,7 +62,7 @@ func Produce(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, } // Consume1 will consume random messages and record the last MessageID it received -func Consume1(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, c chan MessageID) { +func Consume1(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, c chan MessageID, total *int) { consumer, err := pc.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: subName, @@ -86,9 +87,11 @@ func Consume1(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, log.Info("Consume1 channel closed") return case msg = <-consumer.Chan(): - //consumer.Ack(msg) + consumer.Ack(msg) v := BytesToInt(msg.Payload()) log.Info("RECV", zap.Any("v", v)) + (*total)++ + //log.Debug("total", zap.Int("val", *total)) } } c <- msg.ID() @@ -98,7 +101,7 @@ func Consume1(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, } // Consume2 will consume messages from specified MessageID -func Consume2(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, msgID MessageID) { +func Consume2(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, msgID MessageID, total *int) { consumer, err := pc.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: subName, @@ -114,7 +117,8 @@ func Consume2(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, assert.Nil(t, err) // skip the last received message - <-consumer.Chan() + mm := <-consumer.Chan() + consumer.Ack(mm) log.Info("Consume2 start") @@ -124,9 +128,40 @@ func Consume2(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, log.Info("Consume2 channel closed") return case msg := <-consumer.Chan(): - //consumer.Ack(msg) + consumer.Ack(msg) v := BytesToInt(msg.Payload()) log.Info("RECV", zap.Any("v", v)) + (*total)++ + //log.Debug("total", zap.Int("val", *total)) + } + } +} + +func Consume3(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, total *int) { + consumer, err := pc.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + BufSize: 1024, + Type: KeyShared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + log.Info("Consume3 start") + + for { + select { + case <-ctx.Done(): + log.Info("Consume3 channel closed") + return + case msg := <-consumer.Chan(): + consumer.Ack(msg) + v := BytesToInt(msg.Payload()) + log.Info("RECV", zap.Any("v", v)) + (*total)++ + //log.Debug("total", zap.Int("val", *total)) } } } @@ -137,32 +172,197 @@ func TestPulsarClient(t *testing.T) { defer pc.Close() assert.NoError(t, err) assert.NotNil(t, pc) + rand.Seed(time.Now().UnixNano()) - topic := "test" - subName := "subName" + topic := fmt.Sprintf("test-topic-%d", rand.Int()) + subName := fmt.Sprintf("test-subname-%d", rand.Int()) arr := []int{111, 222, 333, 444, 555, 666, 777} - c := make(chan MessageID) + c := make(chan MessageID, 1) ctx, cancel := context.WithCancel(context.Background()) - // launch consume1 - go Consume1(ctx, t, pc, topic, subName, c) - time.Sleep(1 * time.Second) + var total1 int + var total2 int + var total3 int // launch produce - go Produce(ctx, t, pc, topic, arr) + Produce(ctx, t, pc, topic, arr) time.Sleep(1 * time.Second) + // launch consume1 + ctx1, cancel1 := context.WithTimeout(ctx, 2*time.Second) + defer cancel1() + Consume1(ctx1, t, pc, topic, subName, c, &total1) + // record the last received message id lastMsgID := <-c log.Info("msg", zap.Any("lastMsgID", lastMsgID)) // launch consume2 - go Consume2(ctx, t, pc, topic, subName, lastMsgID) - time.Sleep(1 * time.Second) + ctx2, cancel2 := context.WithTimeout(ctx, 2*time.Second) + defer cancel2() + Consume2(ctx2, t, pc, topic, subName, lastMsgID, &total2) + + // launch consume3 + ctx3, cancel3 := context.WithTimeout(ctx, 2*time.Second) + defer cancel3() + Consume3(ctx3, t, pc, topic, subName, &total3) // stop Consume2 cancel() + assert.Equal(t, len(arr), total1+total2) + assert.Equal(t, len(arr), total3) + + log.Info("main done") +} + +func Consume21(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, c chan MessageID, total *int) { + consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + Type: pulsar.KeyShared, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + log.Info("Consume1 start") + + // get random number between 1 ~ 5 + rand.Seed(time.Now().UnixNano()) + cnt := 1 + rand.Int()%5 + + var msg pulsar.ConsumerMessage + for i := 0; i < cnt; i++ { + select { + case <-ctx.Done(): + log.Info("Consume1 channel closed") + return + case msg = <-consumer.Chan(): + consumer.Ack(msg) + v := BytesToInt(msg.Payload()) + log.Info("RECV", zap.Any("v", v)) + (*total)++ + //log.Debug("total", zap.Int("val", *total)) + } + } + c <- msg.ID() + + log.Info("Consume1 randomly RECV", zap.Any("number", cnt)) + log.Info("Consume1 done") +} + +// Consume2 will consume messages from specified MessageID +func Consume22(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, msgID MessageID, total *int) { + consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + Type: pulsar.KeyShared, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + err = consumer.Seek(msgID) + assert.Nil(t, err) + + // skip the last received message + mm := <-consumer.Chan() + consumer.Ack(mm) + + log.Info("Consume2 start") + + for { + select { + case <-ctx.Done(): + log.Info("Consume2 channel closed") + return + case msg := <-consumer.Chan(): + consumer.Ack(msg) + v := BytesToInt(msg.Payload()) + log.Info("RECV", zap.Any("v", v)) + (*total)++ + //log.Debug("total", zap.Int("val", *total)) + } + } +} + +func Consume23(ctx context.Context, t *testing.T, pc *pulsarClient, topic string, subName string, total *int) { + consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + Type: pulsar.KeyShared, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + defer consumer.Close() + + log.Info("Consume3 start") + + for { + select { + case <-ctx.Done(): + log.Info("Consume3 channel closed") + return + case msg := <-consumer.Chan(): + consumer.Ack(msg) + v := BytesToInt(msg.Payload()) + log.Info("RECV", zap.Any("v", v)) + (*total)++ + //log.Debug("total", zap.Int("val", *total)) + } + } +} + +func TestPulsarClient2(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + pc, err := NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + defer pc.Close() + assert.NoError(t, err) + assert.NotNil(t, pc) + rand.Seed(time.Now().UnixNano()) + + topic := fmt.Sprintf("test-topic-%d", rand.Int()) + subName := fmt.Sprintf("test-subname-%d", rand.Int()) + arr := []int{111, 222, 333, 444, 555, 666, 777} + c := make(chan MessageID, 1) + + ctx, cancel := context.WithCancel(context.Background()) + + var total1 int + var total2 int + var total3 int + + // launch produce + Produce(ctx, t, pc, topic, arr) + time.Sleep(1 * time.Second) + + // launch consume1 + ctx1, cancel1 := context.WithTimeout(ctx, 2*time.Second) + defer cancel1() + Consume21(ctx1, t, pc, topic, subName, c, &total1) + + // record the last received message id + lastMsgID := <-c + log.Info("msg", zap.Any("lastMsgID", lastMsgID)) + + // launch consume2 + ctx2, cancel2 := context.WithTimeout(ctx, 2*time.Second) + defer cancel2() + Consume22(ctx2, t, pc, topic, subName, lastMsgID, &total2) + + // launch consume3 + ctx3, cancel3 := context.WithTimeout(ctx, 2*time.Second) + defer cancel3() + Consume23(ctx3, t, pc, topic, subName, &total3) + + // stop Consume2 + cancel() + assert.Equal(t, len(arr), total1+total2) + assert.Equal(t, 0, total3) log.Info("main done") } diff --git a/internal/util/mqclient/pulsar_consumer.go b/internal/util/mqclient/pulsar_consumer.go index 528fa3c1be..1287950c6a 100644 --- a/internal/util/mqclient/pulsar_consumer.go +++ b/internal/util/mqclient/pulsar_consumer.go @@ -12,6 +12,8 @@ package mqclient import ( + "time" + "github.com/apache/pulsar-client-go/pulsar" "github.com/milvus-io/milvus/internal/log" ) @@ -19,6 +21,7 @@ import ( type pulsarConsumer struct { c pulsar.Consumer msgChannel chan ConsumerMessage + hasSeek bool } func (pc *pulsarConsumer) Subscription() string { @@ -28,6 +31,9 @@ func (pc *pulsarConsumer) Subscription() string { func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage { if pc.msgChannel == nil { pc.msgChannel = make(chan ConsumerMessage) + if !pc.hasSeek { + pc.c.SeekByTime(time.Unix(0, 0)) + } go func() { for { //nolint:gosimple select { @@ -47,7 +53,11 @@ func (pc *pulsarConsumer) Chan() <-chan ConsumerMessage { func (pc *pulsarConsumer) Seek(id MessageID) error { messageID := id.(*pulsarID).messageID - return pc.c.Seek(messageID) + err := pc.c.Seek(messageID) + if err == nil { + pc.hasSeek = true + } + return err } func (pc *pulsarConsumer) Ack(message ConsumerMessage) {