diff --git a/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go b/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go index d4d0933e8b..4024d19482 100644 --- a/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go +++ b/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go @@ -114,7 +114,8 @@ func (h *backlogClearHelper) getConsumer() (pulsar.Consumer, error) { Topic: h.channelName.Name, SubscriptionName: backlogClearHelperName, Type: pulsar.Shared, // use shared subscription to avoid the subscription is rejected because of consumer exists. - MaxPendingChunkedMessage: 0, + MaxPendingChunkedMessage: 1, // We cannot set it to 0, because the 0 means 100. + ReceiverQueueSize: 1, // We cannot set it to 0, because the 0 means 1000. StartMessageIDInclusive: true, }) if err != nil { diff --git a/pkg/streaming/walimpls/impls/pulsar/wal.go b/pkg/streaming/walimpls/impls/pulsar/wal.go index c49563ffe0..f7cc404c3a 100644 --- a/pkg/streaming/walimpls/impls/pulsar/wal.go +++ b/pkg/streaming/walimpls/impls/pulsar/wal.go @@ -146,7 +146,8 @@ func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error { Topic: w.Channel().Name, SubscriptionName: truncateCursorSubscriptionName, Type: pulsar.Exclusive, - MaxPendingChunkedMessage: 0, + MaxPendingChunkedMessage: 1, // We cannot set it to 0, because the 0 means 100. + ReceiverQueueSize: 1, // We cannot set it to 0, because the 0 means 1000. StartMessageIDInclusive: true, }) if err != nil {