milvus/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go
fix: pulsar use more memory for queue (#43565)
issue: #43564

package pulsar

import (
    "sync"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
    "github.com/cockroachdb/errors"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/pkg/v2/log"
    "github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
    "github.com/milvus-io/milvus/pkg/v2/util/retry"
    "github.com/milvus-io/milvus/pkg/v2/util/syncutil"
)

const (
    backlogClearHelperName = "backlog-clear"
)
// backlogClearHelper clears the pulsar backlog of a channel once enough data
// has been appended to it.
type backlogClearHelper struct {
    log.Binder

    notifier       *syncutil.AsyncTaskNotifier[struct{}]
    cond           *syncutil.ContextCond
    written        int64 // bytes appended since the last backlog clear.
    threshold      int64 // written bytes that trigger the next backlog clear.
    channelName    types.PChannelInfo
    c              pulsar.Client
    reusedConsumer pulsar.Consumer
}
// newBacklogClearHelper creates a new backlog clear helper.
func newBacklogClearHelper(c pulsar.Client, channelName types.PChannelInfo, threshold int64) *backlogClearHelper {
    h := &backlogClearHelper{
        notifier:       syncutil.NewAsyncTaskNotifier[struct{}](),
        cond:           syncutil.NewContextCond(&sync.Mutex{}),
        written:        threshold, // trigger the backlog clear immediately.
        threshold:      threshold,
        channelName:    channelName,
        c:              c,
        reusedConsumer: nil,
    }
    h.SetLogger(log.With(zap.String("channel", channelName.String()), log.FieldComponent("backlog-clear")))
    go h.background()
    return h
}
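
// A minimal construction sketch, assuming a standalone pulsar broker, a
// hypothetical channel name, and an arbitrary 64 MiB threshold; only
// newBacklogClearHelper and Close are taken from this file.
//
//    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
//    if err != nil {
//        panic(err)
//    }
//    channel := types.PChannelInfo{Name: "by-dev-rootcoord-dml_0"}
//    helper := newBacklogClearHelper(client, channel, 64<<20) // clear once roughly 64 MiB has been written.
//    defer helper.Close()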
// ObserveAppend observes the append traffic and wakes the background goroutine
// once the accumulated written bytes reach the threshold.
func (h *backlogClearHelper) ObserveAppend(size int) {
    h.cond.L.Lock()
    h.written += int64(size)
    if h.written >= h.threshold {
        h.cond.UnsafeBroadcast()
    }
    h.cond.L.Unlock()
}
// background is the background goroutine that clears the backlog whenever the
// written bytes reach the threshold.
func (h *backlogClearHelper) background() {
    defer func() {
        h.notifier.Finish(struct{}{})
        h.Logger().Info("backlog clear helper exit")
    }()
    for {
        h.cond.L.Lock()
        for h.written < h.threshold {
            if err := h.cond.Wait(h.notifier.Context()); err != nil {
                return
            }
        }
        h.written = 0
        h.cond.L.Unlock()

        if err := retry.Do(h.notifier.Context(), func() error {
            if h.notifier.Context().Err() != nil {
                return h.notifier.Context().Err()
            }
            if err := h.performBacklogClear(); err != nil {
                h.Logger().Warn("failed to perform backlog clear", zap.Error(err))
                return err
            }
            h.Logger().Debug("perform backlog clear done")
            return nil
        }, retry.AttemptAlways()); err != nil {
            // retry.Do only gives up when the helper's context is canceled, i.e. on Close.
            return
        }
    }
}
// performBacklogClear moves the backlog-clear subscription's cursor to the
// current time, letting pulsar drop the backlog held for it.
func (h *backlogClearHelper) performBacklogClear() error {
    consumer, err := h.getConsumer()
    if err != nil {
        return errors.Wrap(err, "when create subscription")
    }
    if err := consumer.SeekByTime(time.Now()); err != nil {
        // close the reused consumer if seek failed, so the next attempt starts from a fresh one.
        h.closeConsumer()
        return errors.Wrap(err, "when seek to latest message")
    }
    return nil
}
// getConsumer returns the reused consumer, creating it on first use.
func (h *backlogClearHelper) getConsumer() (pulsar.Consumer, error) {
    if h.reusedConsumer != nil {
        return h.reusedConsumer, nil
    }
    consumer, err := h.c.Subscribe(pulsar.ConsumerOptions{
        Topic:                    h.channelName.Name,
        SubscriptionName:         backlogClearHelperName,
        Type:                     pulsar.Shared, // use a shared subscription so it is not rejected when a consumer already exists.
        MaxPendingChunkedMessage: 1,             // cannot be set to 0, because 0 falls back to the default of 100.
        ReceiverQueueSize:        1,             // cannot be set to 0, because 0 falls back to the default of 1000.
        StartMessageIDInclusive:  true,
    })
    if err != nil {
        return nil, errors.Wrap(err, "when create subscription")
    }
    h.reusedConsumer = consumer
    h.Logger().Info("created a new consumer")
    return h.reusedConsumer, nil
}
// closeConsumer closes the reused consumer.
func (h *backlogClearHelper) closeConsumer() {
    if h.reusedConsumer != nil {
        h.reusedConsumer.Close()
        h.reusedConsumer = nil
        h.Logger().Info("closed the reused consumer")
    }
}
// Close stops the background goroutine and then releases the reused consumer.
func (h *backlogClearHelper) Close() {
    h.notifier.Cancel()
    h.notifier.BlockUntilFinish()
    h.closeConsumer()
}