mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: reuse consumer for backlog clear and use shared consumer (#42822)
issue: #42820 - fix that ro pulsar cannot be closed when upgrading milvus. Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
74ea57bac1
commit
593662970b
@ -22,25 +22,27 @@ const (
|
||||
type backlogClearHelper struct {
|
||||
log.Binder
|
||||
|
||||
notifier *syncutil.AsyncTaskNotifier[struct{}]
|
||||
cond *syncutil.ContextCond
|
||||
written int64
|
||||
threshold int64
|
||||
channelName types.PChannelInfo
|
||||
c pulsar.Client
|
||||
notifier *syncutil.AsyncTaskNotifier[struct{}]
|
||||
cond *syncutil.ContextCond
|
||||
written int64
|
||||
threshold int64
|
||||
channelName types.PChannelInfo
|
||||
c pulsar.Client
|
||||
reusedConsumer pulsar.Consumer
|
||||
}
|
||||
|
||||
// newBacklogClearHelper creates a new backlog clear helper.
|
||||
func newBacklogClearHelper(c pulsar.Client, channelName types.PChannelInfo, threshold int64) *backlogClearHelper {
|
||||
h := &backlogClearHelper{
|
||||
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
|
||||
cond: syncutil.NewContextCond(&sync.Mutex{}),
|
||||
written: threshold, // trigger the backlog clear immediately.
|
||||
threshold: threshold,
|
||||
channelName: channelName,
|
||||
c: c,
|
||||
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
|
||||
cond: syncutil.NewContextCond(&sync.Mutex{}),
|
||||
written: threshold, // trigger the backlog clear immediately.
|
||||
threshold: threshold,
|
||||
channelName: channelName,
|
||||
c: c,
|
||||
reusedConsumer: nil,
|
||||
}
|
||||
h.SetLogger(log.With(zap.String("channel", channelName.String())))
|
||||
h.SetLogger(log.With(zap.String("channel", channelName.String()), log.FieldComponent("backlog-clear")))
|
||||
go h.background()
|
||||
return h
|
||||
}
|
||||
@ -90,26 +92,51 @@ func (h *backlogClearHelper) background() {
|
||||
|
||||
// performBacklogClear performs the backlog clear.
|
||||
func (h *backlogClearHelper) performBacklogClear() error {
|
||||
cursor, err := h.c.Subscribe(pulsar.ConsumerOptions{
|
||||
consumer, err := h.getConsumer()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "when create subscription")
|
||||
}
|
||||
|
||||
if err := consumer.SeekByTime(time.Now()); err != nil {
|
||||
// close the reused consumer if seek failed.
|
||||
h.closeConsumer()
|
||||
return errors.Wrap(err, "when seek to latest message")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getConsumer creates a new consumer.
|
||||
func (h *backlogClearHelper) getConsumer() (pulsar.Consumer, error) {
|
||||
if h.reusedConsumer != nil {
|
||||
return h.reusedConsumer, nil
|
||||
}
|
||||
consumer, err := h.c.Subscribe(pulsar.ConsumerOptions{
|
||||
Topic: h.channelName.Name,
|
||||
SubscriptionName: backlogClearHelperName,
|
||||
Type: pulsar.Exclusive,
|
||||
Type: pulsar.Shared, // use shared subscription to avoid the subscription is rejected because of consumer exists.
|
||||
MaxPendingChunkedMessage: 0,
|
||||
StartMessageIDInclusive: true,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "when create subscription")
|
||||
return nil, errors.Wrap(err, "when create subscription")
|
||||
}
|
||||
defer cursor.Close()
|
||||
h.reusedConsumer = consumer
|
||||
h.Logger().Info("created a new consumer")
|
||||
return h.reusedConsumer, nil
|
||||
}
|
||||
|
||||
if err := cursor.SeekByTime(time.Now()); err != nil {
|
||||
return errors.Wrap(err, "when seek to latest message")
|
||||
// closeConsumer closes the reused consumer.
|
||||
func (h *backlogClearHelper) closeConsumer() {
|
||||
if h.reusedConsumer != nil {
|
||||
h.reusedConsumer.Close()
|
||||
h.reusedConsumer = nil
|
||||
h.Logger().Info("closed the reused consumer")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the backlog clear helper.
|
||||
func (h *backlogClearHelper) Close() {
|
||||
h.notifier.Cancel()
|
||||
h.notifier.BlockUntilFinish()
|
||||
h.closeConsumer()
|
||||
}
|
||||
|
||||
@ -56,11 +56,9 @@ func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimp
|
||||
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
|
||||
backlogClearHelper: backlogClearHelper,
|
||||
}
|
||||
if opt.Channel.AccessMode == types.AccessModeRW {
|
||||
// because the producer of pulsar cannot be created if the topic is backlog exceeded,
|
||||
// so we need to set the producer at background with backoff retry.
|
||||
w.initProducerAtBackground()
|
||||
}
|
||||
// because the producer of pulsar cannot be created if the topic is backlog exceeded,
|
||||
// so we need to set the producer at background with backoff retry.
|
||||
w.initProducerAtBackground()
|
||||
return w, nil
|
||||
}
|
||||
|
||||
|
||||
@ -30,7 +30,8 @@ type walImpl struct {
|
||||
// initProducerAtBackground initializes the producer at background.
|
||||
func (w *walImpl) initProducerAtBackground() {
|
||||
if w.Channel().AccessMode != types.AccessModeRW {
|
||||
panic("producer should not be initialized on a wal that is not in read-write mode")
|
||||
w.notifier.Finish(struct{}{})
|
||||
return
|
||||
}
|
||||
|
||||
defer w.notifier.Finish(struct{}{})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user