diff --git a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go index ca48ec39c0..35ad46edb2 100644 --- a/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go +++ b/pkg/mq/msgstream/mqwrapper/pulsar/pulsar_client.go @@ -119,6 +119,7 @@ func (pc *pulsarClient) Subscribe(ctx context.Context, options mqwrapper.Consume Type: pulsar.Exclusive, SubscriptionInitialPosition: pulsar.SubscriptionInitialPosition(options.SubscriptionInitialPosition), MessageChannel: receiveChannel, + StartMessageIDInclusive: true, }) if err != nil { metrics.MsgStreamOpCounter.WithLabelValues(metrics.CreateConsumerLabel, metrics.FailLabel).Inc()