diff --git a/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go b/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go index da2782cfad..d4d0933e8b 100644 --- a/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go +++ b/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go @@ -22,25 +22,27 @@ const ( type backlogClearHelper struct { log.Binder - notifier *syncutil.AsyncTaskNotifier[struct{}] - cond *syncutil.ContextCond - written int64 - threshold int64 - channelName types.PChannelInfo - c pulsar.Client + notifier *syncutil.AsyncTaskNotifier[struct{}] + cond *syncutil.ContextCond + written int64 + threshold int64 + channelName types.PChannelInfo + c pulsar.Client + reusedConsumer pulsar.Consumer } // newBacklogClearHelper creates a new backlog clear helper. func newBacklogClearHelper(c pulsar.Client, channelName types.PChannelInfo, threshold int64) *backlogClearHelper { h := &backlogClearHelper{ - notifier: syncutil.NewAsyncTaskNotifier[struct{}](), - cond: syncutil.NewContextCond(&sync.Mutex{}), - written: threshold, // trigger the backlog clear immediately. - threshold: threshold, - channelName: channelName, - c: c, + notifier: syncutil.NewAsyncTaskNotifier[struct{}](), + cond: syncutil.NewContextCond(&sync.Mutex{}), + written: threshold, // trigger the backlog clear immediately. + threshold: threshold, + channelName: channelName, + c: c, + reusedConsumer: nil, } - h.SetLogger(log.With(zap.String("channel", channelName.String()))) + h.SetLogger(log.With(zap.String("channel", channelName.String()), log.FieldComponent("backlog-clear"))) go h.background() return h } @@ -90,26 +92,51 @@ func (h *backlogClearHelper) background() { // performBacklogClear performs the backlog clear. func (h *backlogClearHelper) performBacklogClear() error { - cursor, err := h.c.Subscribe(pulsar.ConsumerOptions{ + consumer, err := h.getConsumer() + if err != nil { + return errors.Wrap(err, "when create subscription") + } + + if err := consumer.SeekByTime(time.Now()); err != nil { + // close the reused consumer if seek failed. + h.closeConsumer() + return errors.Wrap(err, "when seek to latest message") + } + return nil +} + +// getConsumer creates a new consumer. +func (h *backlogClearHelper) getConsumer() (pulsar.Consumer, error) { + if h.reusedConsumer != nil { + return h.reusedConsumer, nil + } + consumer, err := h.c.Subscribe(pulsar.ConsumerOptions{ Topic: h.channelName.Name, SubscriptionName: backlogClearHelperName, - Type: pulsar.Exclusive, + Type: pulsar.Shared, // use shared subscription to avoid the subscription is rejected because of consumer exists. MaxPendingChunkedMessage: 0, StartMessageIDInclusive: true, }) if err != nil { - return errors.Wrap(err, "when create subscription") + return nil, errors.Wrap(err, "when create subscription") } - defer cursor.Close() + h.reusedConsumer = consumer + h.Logger().Info("created a new consumer") + return h.reusedConsumer, nil +} - if err := cursor.SeekByTime(time.Now()); err != nil { - return errors.Wrap(err, "when seek to latest message") +// closeConsumer closes the reused consumer. +func (h *backlogClearHelper) closeConsumer() { + if h.reusedConsumer != nil { + h.reusedConsumer.Close() + h.reusedConsumer = nil + h.Logger().Info("closed the reused consumer") } - return nil } // Close closes the backlog clear helper. func (h *backlogClearHelper) Close() { h.notifier.Cancel() h.notifier.BlockUntilFinish() + h.closeConsumer() } diff --git a/pkg/streaming/walimpls/impls/pulsar/opener.go b/pkg/streaming/walimpls/impls/pulsar/opener.go index 8771a6f946..d066577c36 100644 --- a/pkg/streaming/walimpls/impls/pulsar/opener.go +++ b/pkg/streaming/walimpls/impls/pulsar/opener.go @@ -56,11 +56,9 @@ func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimp notifier: syncutil.NewAsyncTaskNotifier[struct{}](), backlogClearHelper: backlogClearHelper, } - if opt.Channel.AccessMode == types.AccessModeRW { - // because the producer of pulsar cannot be created if the topic is backlog exceeded, - // so we need to set the producer at background with backoff retry. - w.initProducerAtBackground() - } + // because the producer of pulsar cannot be created if the topic is backlog exceeded, + // so we need to set the producer at background with backoff retry. + w.initProducerAtBackground() return w, nil } diff --git a/pkg/streaming/walimpls/impls/pulsar/wal.go b/pkg/streaming/walimpls/impls/pulsar/wal.go index f36655311d..c49563ffe0 100644 --- a/pkg/streaming/walimpls/impls/pulsar/wal.go +++ b/pkg/streaming/walimpls/impls/pulsar/wal.go @@ -30,7 +30,8 @@ type walImpl struct { // initProducerAtBackground initializes the producer at background. func (w *walImpl) initProducerAtBackground() { if w.Channel().AccessMode != types.AccessModeRW { - panic("producer should not be initialized on a wal that is not in read-write mode") + w.notifier.Finish(struct{}{}) + return } defer w.notifier.Finish(struct{}{})