From 3630eec92c18457f57700560633acfc7ec226309 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Thu, 27 May 2021 10:38:37 +0800 Subject: [PATCH] Fix receving wrong msg after seek (#5441) Signed-off-by: xige-16 --- internal/msgstream/mq_msgstream.go | 21 +- internal/msgstream/mq_msgstream_test.go | 413 ++++++++++++++++++------ 2 files changed, 324 insertions(+), 110 deletions(-) diff --git a/internal/msgstream/mq_msgstream.go b/internal/msgstream/mq_msgstream.go index 133fda142a..9921149e67 100644 --- a/internal/msgstream/mq_msgstream.go +++ b/internal/msgstream/mq_msgstream.go @@ -102,8 +102,7 @@ func (ms *mqMsgStream) AsProducer(channels []string) { } } -func (ms *mqMsgStream) AsConsumer(channels []string, - subName string) { +func (ms *mqMsgStream) AsConsumer(channels []string, subName string) { for _, channel := range channels { if _, ok := ms.consumers[channel]; ok { continue @@ -190,14 +189,14 @@ func (ms *mqMsgStream) GetProduceChannels() []string { } func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { - tsMsgs := msgPack.Msgs - if len(tsMsgs) <= 0 { + if msgPack == nil || len(msgPack.Msgs) <= 0 { log.Debug("Warning: Receive empty msgPack") return nil } if len(ms.producers) <= 0 { return errors.New("nil producer in msg stream") } + tsMsgs := msgPack.Msgs reBucketValues := ms.ComputeProduceChannelIndexes(msgPack.Msgs) var result map[int32]*MsgPack var err error @@ -251,6 +250,10 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error { } func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error { + if msgPack == nil || len(msgPack.Msgs) <= 0 { + log.Debug("Warning: Receive empty msgPack") + return nil + } for _, v := range msgPack.Msgs { sp, spanCtx := MsgSpanFromCtx(v.TraceCtx(), v) @@ -479,7 +482,6 @@ func (ms *MqTtMsgStream) Close() { func (ms *MqTtMsgStream) bufMsgPackToChannel() { defer ms.wait.Done() - ms.unsolvedBuf = make(map[mqclient.Consumer][]TsMsg) isChannelReady := make(map[mqclient.Consumer]bool) eofMsgTimeStamp := make(map[mqclient.Consumer]Timestamp) @@ -503,11 +505,11 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() { wg.Add(1) go ms.findTimeTick(consumer, eofMsgTimeStamp, &wg, &findMapMutex) } - ms.consumerLock.Unlock() wg.Wait() timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp, isChannelReady, &findMapMutex) if !ok || timeStamp <= ms.lastTimeStamp { //log.Printf("All timeTick's timestamps are inconsistent") + ms.consumerLock.Unlock() continue } timeTickBuf := make([]TsMsg, 0) @@ -553,6 +555,7 @@ func (ms *MqTtMsgStream) bufMsgPackToChannel() { ms.msgPositions[consumer] = newPos } ms.unsolvedMutex.Unlock() + ms.consumerLock.Unlock() msgPack := MsgPack{ BeginTs: ms.lastTimeStamp, @@ -712,9 +715,9 @@ func (ms *MqTtMsgStream) Seek(mp *internalpb.MsgPosition) error { ms.addConsumer(consumer, seekChannel) //TODO: May cause problem - if len(consumer.Chan()) == 0 { - return nil - } + //if len(consumer.Chan()) == 0 { + // return nil + //} for { select { diff --git a/internal/msgstream/mq_msgstream_test.go b/internal/msgstream/mq_msgstream_test.go index 7bcc4c1b5d..adcfbc152a 100644 --- a/internal/msgstream/mq_msgstream_test.go +++ b/internal/msgstream/mq_msgstream_test.go @@ -13,10 +13,11 @@ package msgstream import ( "context" - "fmt" "log" + "math/rand" "os" "testing" + "time" "github.com/apache/pulsar-client-go/pulsar" "github.com/stretchr/testify/assert" @@ -57,7 +58,9 @@ func repackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) { return result, nil } -func getTsMsg(msgType MsgType, reqID UniqueID, hashValue uint32) TsMsg { +func getTsMsg(msgType MsgType, reqID UniqueID) TsMsg { + hashValue := uint32(reqID) + time := uint64(reqID) baseMsg := BaseMsg{ BeginTimestamp: 0, EndTimestamp: 0, @@ -69,14 +72,14 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue uint32) TsMsg { Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_Insert, MsgID: reqID, - Timestamp: 11, + Timestamp: time, SourceID: reqID, }, CollectionName: "Collection", PartitionName: "Partition", SegmentID: 1, ChannelID: "0", - Timestamps: []Timestamp{uint64(reqID)}, + Timestamps: []Timestamp{time}, RowIDs: []int64{1}, RowData: []*commonpb.Blob{{}}, } @@ -164,7 +167,9 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue uint32) TsMsg { return nil } -func getTimeTickMsg(reqID UniqueID, hashValue uint32, time uint64) TsMsg { +func getTimeTickMsg(reqID UniqueID) TsMsg { + hashValue := uint32(reqID) + time := uint64(reqID) baseMsg := BaseMsg{ BeginTimestamp: 0, EndTimestamp: 0, @@ -185,41 +190,30 @@ func getTimeTickMsg(reqID UniqueID, hashValue uint32, time uint64) TsMsg { return timeTickMsg } -func initPulsarStream(pulsarAddress string, - producerChannels []string, - consumerChannels []string, - consumerSubName string, - opts ...RepackFunc) (MsgStream, MsgStream) { - factory := ProtoUDFactory{} - - // set input stream - pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) - inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) - inputStream.AsProducer(producerChannels) - for _, opt := range opts { - inputStream.SetRepackFunc(opt) +// Generate MsgPack contains 'num' msgs, with timestamp in (start, end) +func getInsertMsgPack(num int, start int, end int) *MsgPack { + Rand := rand.New(rand.NewSource(time.Now().UnixNano())) + set := make(map[int]bool) + msgPack := MsgPack{} + for len(set) < num { + reqID := Rand.Int()%(end-start-1) + start + 1 + _, ok := set[reqID] + if !ok { + set[reqID] = true + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, int64(reqID))) + } } - inputStream.Start() - var input MsgStream = inputStream - - // set output stream - pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) - outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) - outputStream.AsConsumer(consumerChannels, consumerSubName) - outputStream.Start() - var output MsgStream = outputStream - - return input, output + return &msgPack } -func initPulsarTtStream(pulsarAddress string, - producerChannels []string, - consumerChannels []string, - consumerSubName string, - opts ...RepackFunc) (MsgStream, MsgStream) { - factory := ProtoUDFactory{} +func getTimeTickMsgPack(reqID UniqueID) *MsgPack { + msgPack := MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, getTimeTickMsg(reqID)) + return &msgPack +} - // set input stream +func getPulsarInputStream(pulsarAddress string, producerChannels []string, opts ...RepackFunc) MsgStream { + factory := ProtoUDFactory{} pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) inputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) inputStream.AsProducer(producerChannels) @@ -227,16 +221,39 @@ func initPulsarTtStream(pulsarAddress string, inputStream.SetRepackFunc(opt) } inputStream.Start() - var input MsgStream = inputStream + return inputStream +} - // set output stream - pulsarClient2, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) - outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient2, factory.NewUnmarshalDispatcher()) +func getPulsarOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream { + factory := ProtoUDFactory{} + pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + outputStream, _ := NewMqMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.Start() - var output MsgStream = outputStream + return outputStream +} - return input, output +func getPulsarTtOutputStream(pulsarAddress string, consumerChannels []string, consumerSubName string) MsgStream { + factory := ProtoUDFactory{} + pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) + outputStream.AsConsumer(consumerChannels, consumerSubName) + outputStream.Start() + return outputStream +} + +func getPulsarTtOutputStreamAndSeek(pulsarAddress string, positions []*MsgPosition) MsgStream { + factory := ProtoUDFactory{} + pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) + outputStream, _ := NewMqTtMsgStream(context.Background(), 100, 100, pulsarClient, factory.NewUnmarshalDispatcher()) + //outputStream.AsConsumer(consumerChannels, consumerSubName) + outputStream.Start() + for _, pos := range positions { + pos.MsgGroup = funcutil.RandomString(4) + outputStream.Seek(pos) + } + //outputStream.Start() + return outputStream } func receiveMsg(outputStream MsgStream, msgCount int) { @@ -247,8 +264,9 @@ func receiveMsg(outputStream MsgStream, msgCount int) { msgs := result.Msgs for _, v := range msgs { receiveCount++ - fmt.Println("msg type: ", v.Type(), ", msg value: ", v) + log.Println("msg type: ", v.Type(), ", msg value: ", v) } + log.Println("================") } if receiveCount >= msgCount { break @@ -256,6 +274,17 @@ func receiveMsg(outputStream MsgStream, msgCount int) { } } +func printMsgPack(msgPack *MsgPack) { + if msgPack == nil { + log.Println("msg nil") + } else { + for _, v := range msgPack.Msgs { + log.Println("msg type: ", v.Type(), ", msg value: ", v) + } + } + log.Println("================") +} + func TestStream_PulsarMsgStream_Insert(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) @@ -264,10 +293,12 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) { consumerSubName := funcutil.RandomString(8) msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName) - inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) err := inputStream.Produce(&msgPack) if err != nil { log.Fatalf("produce error = %v", err) @@ -276,7 +307,6 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) { receiveMsg(outputStream, len(msgPack.Msgs)) inputStream.Close() outputStream.Close() - } func TestStream_PulsarMsgStream_Delete(t *testing.T) { @@ -286,10 +316,12 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) { consumerChannels := []string{c} consumerSubName := funcutil.RandomString(8) msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Delete, 1, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Delete, 1)) //msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Delete, 3, 3)) - inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName) + err := inputStream.Produce(&msgPack) if err != nil { log.Fatalf("produce error = %v", err) @@ -307,10 +339,12 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) { consumerSubName := funcutil.RandomString(8) msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 3)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName) - inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) err := inputStream.Produce(&msgPack) if err != nil { log.Fatalf("produce error = %v", err) @@ -327,10 +361,12 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) { consumerChannels := []string{c} consumerSubName := funcutil.RandomString(8) msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 3)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName) - inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) err := inputStream.Produce(&msgPack) if err != nil { log.Fatalf("produce error = %v", err) @@ -347,10 +383,12 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { consumerChannels := []string{c} consumerSubName := funcutil.RandomString(8) msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 3)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName) - inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) err := inputStream.Produce(&msgPack) if err != nil { log.Fatalf("produce error = %v", err) @@ -368,10 +406,12 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { consumerSubName := funcutil.RandomString(8) msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 3)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName) - inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) err := inputStream.Broadcast(&msgPack) if err != nil { log.Fatalf("produce error = %v", err) @@ -389,10 +429,11 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) { consumerSubName := funcutil.RandomString(8) msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) - inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, repackFunc) + inputStream := getPulsarInputStream(pulsarAddress, producerChannels, repackFunc) + outputStream := getPulsarOutputStream(pulsarAddress, consumerChannels, consumerSubName) err := inputStream.Produce(&msgPack) if err != nil { log.Fatalf("produce error = %v", err) @@ -521,10 +562,10 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { consumerSubName := funcutil.RandomString(8) msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 2, 2)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 3, 3)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_QueryNodeStats, 4, 4)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_TimeTick, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Search, 2)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_SearchResult, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_QueryNodeStats, 4)) factory := ProtoUDFactory{} pulsarClient, _ := mqclient.NewPulsarClient(pulsar.ClientOptions{URL: pulsarAddress}) @@ -554,16 +595,18 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { consumerChannels := []string{c1, c2} consumerSubName := funcutil.RandomString(8) msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0)) + msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5)) + msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName) - inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) err := inputStream.Broadcast(&msgPack0) if err != nil { log.Fatalf("broadcast error = %v", err) @@ -589,26 +632,28 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { consumerSubName := funcutil.RandomString(8) msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0)) + msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19, 19)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 19)) msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5)) + msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) msgPack3 := MsgPack{} - msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14, 14)) - msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9, 9)) + msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 14)) + msgPack3.Msgs = append(msgPack3.Msgs, getTsMsg(commonpb.MsgType_Insert, 9)) msgPack4 := MsgPack{} - msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11, 11, 11)) + msgPack4.Msgs = append(msgPack4.Msgs, getTimeTickMsg(11)) msgPack5 := MsgPack{} - msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15, 15, 15)) + msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName) - inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) err := inputStream.Broadcast(&msgPack0) assert.Nil(t, err) err = inputStream.Produce(&msgPack1) @@ -622,15 +667,15 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { outputStream.Consume() receivedMsg := outputStream.Consume() - for _, position := range receivedMsg.StartPositions { - outputStream.Seek(position) - } + outputStream.Close() + outputStream = getPulsarTtOutputStreamAndSeek(pulsarAddress, receivedMsg.EndPositions) + err = inputStream.Broadcast(&msgPack5) assert.Nil(t, err) - //seekMsg, _ := outputStream.Consume() - //for _, msg := range seekMsg.Msgs { - // assert.Equal(t, msg.BeginTs(), uint64(14)) - //} + seekMsg := outputStream.Consume() + for _, msg := range seekMsg.Msgs { + assert.Equal(t, msg.BeginTs(), uint64(14)) + } inputStream.Close() outputStream.Close() } @@ -643,16 +688,18 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) { consumerSubName := funcutil.RandomString(8) msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0)) + msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5)) + msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName) - inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName) err := inputStream.Broadcast(&msgPack0) if err != nil { log.Fatalf("broadcast error = %v", err) @@ -670,6 +717,170 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) { outputStream.Close() } +// +// This testcase will generate MsgPacks as following: +// +// Insert Insert Insert Insert Insert Insert +// |----------|----------|----------|----------|----------|----------| +// ^ ^ ^ ^ ^ ^ +// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) +// +// Then check: +// 1. For each msg in MsgPack received by ttMsgStream consumer, there should be +// msgPack.BeginTs < msg.BeginTs() <= msgPack.EndTs +// 2. The count of consumed msg should be equal to the count of produced msg +// +func TestStream_PulsarTtMsgStream_1(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) + + const msgsInPack = 5 + const numOfMsgPack = 10 + msgPacks := make([]*MsgPack, numOfMsgPack) + + // generate MsgPack + for i := 0; i < numOfMsgPack; i++ { + if i%2 == 0 { + msgPacks[i] = getInsertMsgPack(msgsInPack, i/2*10, i/2*10+22) + } else { + msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * 10)) + } + } + msgPacks = append(msgPacks, nil) + msgPacks = append(msgPacks, getTimeTickMsgPack(100)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + outputStream := getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName) + + // produce msg + log.Println("==============produce msg==================") + for i := 0; i < len(msgPacks); i++ { + printMsgPack(msgPacks[i]) + if i%2 == 0 { + // insert msg use Produce + err := inputStream.Produce(msgPacks[i]) + assert.Nil(t, err) + } else { + // tt msg use Broadcast + err := inputStream.Broadcast(msgPacks[i]) + assert.Nil(t, err) + } + } + + // consume msg + log.Println("===============receive msg=================") + checkNMsgPack := func(t *testing.T, outputStream MsgStream, num int) int { + rcvMsg := 0 + for i := 0; i < num; i++ { + msgPack := outputStream.Consume() + rcvMsg += len(msgPack.Msgs) + if len(msgPack.Msgs) > 0 { + for _, msg := range msgPack.Msgs { + log.Println("msg type: ", msg.Type(), ", msg value: ", msg) + assert.Greater(t, msg.BeginTs(), msgPack.BeginTs) + assert.LessOrEqual(t, msg.BeginTs(), msgPack.EndTs) + } + log.Println("================") + } + } + return rcvMsg + } + msgCount := checkNMsgPack(t, outputStream, len(msgPacks)/2) + assert.Equal(t, (len(msgPacks)/2-1)*msgsInPack, msgCount) + + inputStream.Close() + outputStream.Close() +} + +// +// This testcase will generate MsgPacks as following: +// +// Insert Insert Insert Insert Insert Insert +// |----------|----------|----------|----------|----------|----------| +// ^ ^ ^ ^ ^ ^ +// TT(10) TT(20) TT(30) TT(40) TT(50) TT(100) +// +// Then check: +// 1. ttMsgStream consumer can seek to the right position and resume +// 2. The count of consumed msg should be equal to the count of produced msg +// +func TestStream_PulsarTtMsgStream_2(t *testing.T) { + pulsarAddress, _ := Params.Load("_PulsarAddress") + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) + + const msgsInPack = 5 + const numOfMsgPack = 10 + msgPacks := make([]*MsgPack, numOfMsgPack) + + // generate MsgPack + for i := 0; i < numOfMsgPack; i++ { + if i%2 == 0 { + msgPacks[i] = getInsertMsgPack(msgsInPack, i/2*10, i/2*10+22) + } else { + msgPacks[i] = getTimeTickMsgPack(int64((i + 1) / 2 * 10)) + } + } + msgPacks = append(msgPacks, nil) + msgPacks = append(msgPacks, getTimeTickMsgPack(100)) + + inputStream := getPulsarInputStream(pulsarAddress, producerChannels) + + // produce msg + log.Println("===============produce msg=================") + for i := 0; i < len(msgPacks); i++ { + printMsgPack(msgPacks[i]) + if i%2 == 0 { + // insert msg use Produce + err := inputStream.Produce(msgPacks[i]) + assert.Nil(t, err) + } else { + // tt msg use Broadcast + err := inputStream.Broadcast(msgPacks[i]) + assert.Nil(t, err) + } + } + + // consume msg + log.Println("=============receive msg===================") + rcvMsgPacks := make([]*MsgPack, 0) + + resumeMsgPack := func(t *testing.T) int { + var outputStream MsgStream + msgCount := len(rcvMsgPacks) + if msgCount == 0 { + outputStream = getPulsarTtOutputStream(pulsarAddress, consumerChannels, consumerSubName) + } else { + outputStream = getPulsarTtOutputStreamAndSeek(pulsarAddress, rcvMsgPacks[msgCount-1].EndPositions) + } + msgPack := outputStream.Consume() + rcvMsgPacks = append(rcvMsgPacks, msgPack) + if len(msgPack.Msgs) > 0 { + for _, msg := range msgPack.Msgs { + log.Println("msg type: ", msg.Type(), ", msg value: ", msg) + assert.Greater(t, msg.BeginTs(), msgPack.BeginTs) + assert.LessOrEqual(t, msg.BeginTs(), msgPack.EndTs) + } + log.Println("================") + } + outputStream.Close() + return len(rcvMsgPacks[msgCount].Msgs) + } + + msgCount := 0 + for i := 0; i < len(msgPacks)/2; i++ { + msgCount += resumeMsgPack(t) + } + assert.Equal(t, (len(msgPacks)/2-1)*msgsInPack, msgCount) + + inputStream.Close() +} + /****************************************Rmq test******************************************/ func initRmq(name string) *etcdkv.EtcdKV { @@ -698,7 +909,7 @@ func Close(rocksdbName string, intputStream, outputStream MsgStream, etcdKV *etc outputStream.Close() etcdKV.Close() err := os.RemoveAll(rocksdbName) - fmt.Println(err) + log.Println(err) } func initRmqStream(producerChannels []string, @@ -755,8 +966,8 @@ func TestStream_RmqMsgStream_Insert(t *testing.T) { consumerGroupName := "InsertGroup" msgPack := MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1)) - msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) + msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) rocksdbName := "/tmp/rocksmq_insert" etcdKV := initRmq(rocksdbName) @@ -776,14 +987,14 @@ func TestStream_RmqTtMsgStream_Insert(t *testing.T) { consumerSubName := "subInsert" msgPack0 := MsgPack{} - msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0)) + msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0)) msgPack1 := MsgPack{} - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1, 1)) - msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3, 3)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 1)) + msgPack1.Msgs = append(msgPack1.Msgs, getTsMsg(commonpb.MsgType_Insert, 3)) msgPack2 := MsgPack{} - msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5)) + msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5)) rocksdbName := "/tmp/rocksmq_insert_tt" etcdKV := initRmq(rocksdbName)