mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add unit test for pulsarClient Seek (#11107)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
68d6839ab0
commit
dddf84b194
@ -376,6 +376,58 @@ func TestPulsarClient_Consume2(t *testing.T) {
|
||||
log.Info("main done")
|
||||
}
|
||||
|
||||
func TestPulsarClient_Seek(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
pc, err := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
defer pc.Close()
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, pc)
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
ctx := context.Background()
|
||||
topic := fmt.Sprintf("test-topic-%d", rand.Int())
|
||||
subName := fmt.Sprintf("test-subname-%d", rand.Int())
|
||||
|
||||
producer, err := pc.CreateProducer(ProducerOptions{Topic: topic})
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, producer)
|
||||
|
||||
log.Info("Produce start")
|
||||
var id MessageID
|
||||
arr := []int{1, 2, 3}
|
||||
for _, v := range arr {
|
||||
msg := &ProducerMessage{
|
||||
Payload: IntToBytes(v),
|
||||
Properties: map[string]string{},
|
||||
}
|
||||
id, err = producer.Send(ctx, msg)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
log.Info("Produced")
|
||||
|
||||
consumer, err := pc.client.Subscribe(pulsar.ConsumerOptions{
|
||||
Topic: topic,
|
||||
SubscriptionName: subName,
|
||||
Type: pulsar.KeyShared,
|
||||
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, consumer)
|
||||
defer consumer.Close()
|
||||
seekID := id.(*pulsarID).messageID
|
||||
consumer.Seek(seekID)
|
||||
|
||||
msgChan := consumer.Chan()
|
||||
|
||||
select {
|
||||
case msg := <-msgChan:
|
||||
assert.Equal(t, 3, BytesToInt(msg.Payload()))
|
||||
case <-time.After(2 * time.Second):
|
||||
log.Info("after 2 seconds")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPulsarClient_EarliestMessageID(t *testing.T) {
|
||||
pulsarAddress, _ := Params.Load("_PulsarAddress")
|
||||
client, _ := GetPulsarClientInstance(pulsar.ClientOptions{URL: pulsarAddress})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user