From 648994182fa0aa86891badee1ff9e9d1df21d52b Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Mon, 28 Jul 2025 14:00:56 +0800 Subject: [PATCH] fix: pulsar use more memory for queue (#43565) issue: #43564 Signed-off-by: chyezh --- pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go | 3 ++- pkg/streaming/walimpls/impls/pulsar/wal.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go b/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go index d4d0933e8b..4024d19482 100644 --- a/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go +++ b/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go @@ -114,7 +114,8 @@ func (h *backlogClearHelper) getConsumer() (pulsar.Consumer, error) { Topic: h.channelName.Name, SubscriptionName: backlogClearHelperName, Type: pulsar.Shared, // use shared subscription to avoid the subscription is rejected because of consumer exists. - MaxPendingChunkedMessage: 0, + MaxPendingChunkedMessage: 1, // We cannot set it to 0, because the 0 means 100. + ReceiverQueueSize: 1, // We cannot set it to 0, because the 0 means 1000. StartMessageIDInclusive: true, }) if err != nil { diff --git a/pkg/streaming/walimpls/impls/pulsar/wal.go b/pkg/streaming/walimpls/impls/pulsar/wal.go index c49563ffe0..f7cc404c3a 100644 --- a/pkg/streaming/walimpls/impls/pulsar/wal.go +++ b/pkg/streaming/walimpls/impls/pulsar/wal.go @@ -146,7 +146,8 @@ func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error { Topic: w.Channel().Name, SubscriptionName: truncateCursorSubscriptionName, Type: pulsar.Exclusive, - MaxPendingChunkedMessage: 0, + MaxPendingChunkedMessage: 1, // We cannot set it to 0, because the 0 means 100. + ReceiverQueueSize: 1, // We cannot set it to 0, because the 0 means 1000. StartMessageIDInclusive: true, }) if err != nil {