From 1f66b650e97a4a1586c357afdbfd51bc05f8b08d Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Fri, 13 Jun 2025 14:28:37 +0800 Subject: [PATCH] fix: pulsar cannot work properly if backlog exceed (#42653) issue: #42649 - the sync operation of different pchannel is concurrent now. - add a option to notify the backlog clear automatically. - make pulsar walimpls can be recovered from backlog exceed. Signed-off-by: chyezh --- configs/milvus.yaml | 24 +++- internal/storage/compress/zstd.go | 5 +- .../interceptors/timetick/inspector/impls.go | 31 ++++- .../server/wal/recovery/config_test.go | 4 +- .../impls/pulsar/backlog_clear_helper.go | 115 ++++++++++++++++++ pkg/streaming/walimpls/impls/pulsar/opener.go | 60 +++++---- pkg/streaming/walimpls/impls/pulsar/wal.go | 77 +++++++++++- pkg/util/paramtable/component_param.go | 19 +-- pkg/util/paramtable/component_param_test.go | 4 +- pkg/util/paramtable/service_param.go | 19 +++ pkg/util/paramtable/service_param_test.go | 5 + 11 files changed, 308 insertions(+), 55 deletions(-) create mode 100644 pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 3107249261..46bde5d841 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -227,6 +227,16 @@ pulsar: namespace: default # A Pulsar namespace is the administrative unit nomenclature within a tenant. requestTimeout: 60 # pulsar client global request timeout in seconds enableClientMetrics: false # Whether to register pulsar client metrics into milvus metrics path. + # Perform a backlog cleanup every time the data of given bytes is written. + # Because milvus use puslar reader to read the message, so if there's no pulsar subscriber when milvus running. + # If the pulsar cluster open the backlog protection (backlogQuotaDefaultLimitBytes), the backlog exceed will reported to fail the write operation + # set this option to non-zero will create a subscription seek to latest position to clear the pulsar backlog. + # If these options is non-zero, the wal data in pulsar is fully protected by retention policy, + # so admin of pulsar should give enough retention time to avoid the wal message lost. + # If these options is zero, no subscription will be created, so pulsar cluster must close the backlog protection, otherwise the milvus can not recovered if backlog exceed. + # Moreover, if these option is zero, Milvus use a truncation subscriber to protect the wal data in pulsar if user disable the subscriptionExpirationTimeMinutes. + # The retention policy of pulsar can set shorter to save the storage space in this case. + backlogAutoClearBytes: 100m # If you want to enable kafka, needs to comment the pulsar configs # kafka: @@ -1269,14 +1279,18 @@ streaming: # If that persist operation exceeds this timeout, the wal recovery module will close right now. gracefulCloseTimeout: 3s walTruncate: - # The interval of sampling wal checkpoint when truncate, 1m by default. + # The interval of sampling wal checkpoint when truncate, 30m by default. # Every time the checkpoint is persisted, the checkpoint will be sampled and used to be a candidate of truncate checkpoint. # More samples, more frequent truncate, more memory usage. - sampleInterval: 1m - # The retention interval of wal truncate, 5m by default. + sampleInterval: 30m + # The retention interval of wal truncate, 26h by default. # If the sampled checkpoint is older than this interval, it will be used to truncate wal checkpoint. - # Greater the interval, more wal storage usage, more redundant data in wal - retentionInterval: 5m + # Greater the interval, more wal storage usage, more redundant data in wal. + # Because current query path doesn't promise the read operation not happen before the truncate point, + # retention interval should be greater than the dataCoord.segment.maxLife to avoid the message lost at query path. + # If the wal is pulsar, the pulsar should close the subscription expiration to avoid the message lost. + # because the wal truncate operation is implemented by pulsar consumer. + retentionInterval: 26h # Any configuration related to the knowhere vector search engine knowhere: diff --git a/internal/storage/compress/zstd.go b/internal/storage/compress/zstd.go index 70a8ad14fe..0b19acda81 100644 --- a/internal/storage/compress/zstd.go +++ b/internal/storage/compress/zstd.go @@ -26,6 +26,7 @@ import ( "github.com/apache/arrow/go/v17/parquet/compress" "github.com/klauspost/compress/zstd" + "github.com/milvus-io/milvus/pkg/v2/util/hardware" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) @@ -119,8 +120,8 @@ func (zstdCodec) getConcurrency() int { // But most of the time, we only serialize 16MB data for one binlog generation. // So 1 is enough for most cases to avoid to use too much memory. concurrent := paramtable.Get().CommonCfg.StorageZstdConcurrency.GetAsInt() - if concurrent < 0 { - return 0 + if concurrent <= 0 { + return hardware.GetCPUNum() } return concurrent } diff --git a/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go b/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go index b03f2ce46c..7d655b1463 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/inspector/impls.go @@ -1,6 +1,7 @@ package inspector import ( + "sync" "time" "go.uber.org/zap" @@ -27,6 +28,8 @@ type timeTickSyncInspectorImpl struct { taskNotifier *syncutil.AsyncTaskNotifier[struct{}] syncNotifier *syncNotifier operators *typeutil.ConcurrentMap[string, TimeTickSyncOperator] + wg sync.WaitGroup + working typeutil.ConcurrentSet[string] } func (s *timeTickSyncInspectorImpl) TriggerSync(pChannelInfo types.PChannelInfo, persisted bool) { @@ -71,22 +74,40 @@ func (s *timeTickSyncInspectorImpl) background() { case <-s.taskNotifier.Context().Done(): return case <-ticker.C: - s.operators.Range(func(_ string, operator TimeTickSyncOperator) bool { - operator.Sync(s.taskNotifier.Context(), false) + s.operators.Range(func(name string, _ TimeTickSyncOperator) bool { + s.asyncSync(name, false) return true }) case <-s.syncNotifier.WaitChan(): signals := s.syncNotifier.Get() for pchannel, persisted := range signals { - if operator, ok := s.operators.Get(pchannel.Name); ok { - operator.Sync(s.taskNotifier.Context(), persisted) - } + s.asyncSync(pchannel.Name, persisted) } } } } +// asyncSync syncs the pchannel in a goroutine. +func (s *timeTickSyncInspectorImpl) asyncSync(pchannelName string, persisted bool) { + if !s.working.Insert(pchannelName) { + // Check if the sync operation of pchannel is working, if so, skip it. + return + } + + s.wg.Add(1) + go func() { + defer func() { + s.wg.Done() + s.working.Remove(pchannelName) + }() + if operator, ok := s.operators.Get(pchannelName); ok { + operator.Sync(s.taskNotifier.Context(), persisted) + } + }() +} + func (s *timeTickSyncInspectorImpl) Close() { s.taskNotifier.Cancel() s.taskNotifier.BlockUntilFinish() + s.wg.Wait() } diff --git a/internal/streamingnode/server/wal/recovery/config_test.go b/internal/streamingnode/server/wal/recovery/config_test.go index 71d53ac67b..238c79e4b9 100644 --- a/internal/streamingnode/server/wal/recovery/config_test.go +++ b/internal/streamingnode/server/wal/recovery/config_test.go @@ -55,6 +55,6 @@ func TestTruncatorConfig(t *testing.T) { paramtable.Init() cfg := newTruncatorConfig() - assert.Equal(t, 1*time.Minute, cfg.sampleInterval) - assert.Equal(t, 5*time.Minute, cfg.retentionInterval) + assert.Equal(t, 30*time.Minute, cfg.sampleInterval) + assert.Equal(t, 26*time.Hour, cfg.retentionInterval) } diff --git a/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go b/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go new file mode 100644 index 0000000000..da2782cfad --- /dev/null +++ b/pkg/streaming/walimpls/impls/pulsar/backlog_clear_helper.go @@ -0,0 +1,115 @@ +package pulsar + +import ( + "sync" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/cockroachdb/errors" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" + "github.com/milvus-io/milvus/pkg/v2/util/retry" + "github.com/milvus-io/milvus/pkg/v2/util/syncutil" +) + +const ( + backlogClearHelperName = "backlog-clear" +) + +// backlogClearHelper is a helper to clear the backlog of pulsar. +type backlogClearHelper struct { + log.Binder + + notifier *syncutil.AsyncTaskNotifier[struct{}] + cond *syncutil.ContextCond + written int64 + threshold int64 + channelName types.PChannelInfo + c pulsar.Client +} + +// newBacklogClearHelper creates a new backlog clear helper. +func newBacklogClearHelper(c pulsar.Client, channelName types.PChannelInfo, threshold int64) *backlogClearHelper { + h := &backlogClearHelper{ + notifier: syncutil.NewAsyncTaskNotifier[struct{}](), + cond: syncutil.NewContextCond(&sync.Mutex{}), + written: threshold, // trigger the backlog clear immediately. + threshold: threshold, + channelName: channelName, + c: c, + } + h.SetLogger(log.With(zap.String("channel", channelName.String()))) + go h.background() + return h +} + +// ObserveAppend observes the append traffic. +func (h *backlogClearHelper) ObserveAppend(size int) { + h.cond.L.Lock() + h.written += int64(size) + if h.written >= h.threshold { + h.cond.UnsafeBroadcast() + } + h.cond.L.Unlock() +} + +// background is the background goroutine to clear the backlog. +func (h *backlogClearHelper) background() { + defer func() { + h.notifier.Finish(struct{}{}) + h.Logger().Info("backlog clear helper exit") + }() + + for { + h.cond.L.Lock() + for h.written < h.threshold { + if err := h.cond.Wait(h.notifier.Context()); err != nil { + return + } + } + h.written = 0 + h.cond.L.Unlock() + + if err := retry.Do(h.notifier.Context(), func() error { + if h.notifier.Context().Err() != nil { + return h.notifier.Context().Err() + } + if err := h.performBacklogClear(); err != nil { + h.Logger().Warn("failed to perform backlog clear", zap.Error(err)) + return err + } + h.Logger().Debug("perform backlog clear done") + return nil + }, retry.AttemptAlways()); err != nil { + return + } + } +} + +// performBacklogClear performs the backlog clear. +func (h *backlogClearHelper) performBacklogClear() error { + cursor, err := h.c.Subscribe(pulsar.ConsumerOptions{ + Topic: h.channelName.Name, + SubscriptionName: backlogClearHelperName, + Type: pulsar.Exclusive, + MaxPendingChunkedMessage: 0, + StartMessageIDInclusive: true, + }) + if err != nil { + return errors.Wrap(err, "when create subscription") + } + defer cursor.Close() + + if err := cursor.SeekByTime(time.Now()); err != nil { + return errors.Wrap(err, "when seek to latest message") + } + return nil +} + +// Close closes the backlog clear helper. +func (h *backlogClearHelper) Close() { + h.notifier.Cancel() + h.notifier.BlockUntilFinish() +} diff --git a/pkg/streaming/walimpls/impls/pulsar/opener.go b/pkg/streaming/walimpls/impls/pulsar/opener.go index d565cd4e60..8771a6f946 100644 --- a/pkg/streaming/walimpls/impls/pulsar/opener.go +++ b/pkg/streaming/walimpls/impls/pulsar/opener.go @@ -8,6 +8,8 @@ import ( "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/helper" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" + "github.com/milvus-io/milvus/pkg/v2/util/syncutil" ) const ( @@ -26,36 +28,40 @@ func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimp if err := opt.Validate(); err != nil { return nil, err } - var p pulsar.Producer - if opt.Channel.AccessMode == types.AccessModeRW { - var err error - p, err = o.c.CreateProducer(pulsar.ProducerOptions{ - Topic: opt.Channel.Name, - // TODO: current go pulsar client does not support fencing, we should enable it after go pulsar client supports it. - // ProducerAccessMode: pulsar.ProducerAccessModeExclusiveWithFencing, - }) - if err != nil { - return nil, err - } - // Initialize a persistent cursor to protect the topic from being retention. - cursor, err := o.c.Subscribe(pulsar.ConsumerOptions{ - Topic: opt.Channel.Name, - SubscriptionName: truncateCursorSubscriptionName, - Type: pulsar.Exclusive, - MaxPendingChunkedMessage: 1, - SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, - }) - if err != nil { - return nil, err + var backlogClearHelper *backlogClearHelper + if opt.Channel.AccessMode == types.AccessModeRW { + backlogAutoClearBytes := paramtable.Get().PulsarCfg.BacklogAutoClearBytes.GetAsSize() + if backlogAutoClearBytes > 0 { + backlogClearHelper = newBacklogClearHelper(o.c, opt.Channel, backlogAutoClearBytes) + } else { + // Initialize a persistent cursor to protect the topic from being retention. + cursor, err := o.c.Subscribe(pulsar.ConsumerOptions{ + Topic: opt.Channel.Name, + SubscriptionName: truncateCursorSubscriptionName, + Type: pulsar.Exclusive, + MaxPendingChunkedMessage: 0, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + }) + if err != nil { + return nil, err + } + cursor.Close() } - cursor.Close() } - return &walImpl{ - WALHelper: helper.NewWALHelper(opt), - p: p, - c: o.c, - }, nil + w := &walImpl{ + WALHelper: helper.NewWALHelper(opt), + c: o.c, + p: syncutil.NewFuture[pulsar.Producer](), + notifier: syncutil.NewAsyncTaskNotifier[struct{}](), + backlogClearHelper: backlogClearHelper, + } + if opt.Channel.AccessMode == types.AccessModeRW { + // because the producer of pulsar cannot be created if the topic is backlog exceeded, + // so we need to set the producer at background with backoff retry. + w.initProducerAtBackground() + } + return w, nil } // Close closes the opener resources. diff --git a/pkg/streaming/walimpls/impls/pulsar/wal.go b/pkg/streaming/walimpls/impls/pulsar/wal.go index 6ba724f0aa..f36655311d 100644 --- a/pkg/streaming/walimpls/impls/pulsar/wal.go +++ b/pkg/streaming/walimpls/impls/pulsar/wal.go @@ -2,8 +2,11 @@ package pulsar import ( "context" + "time" "github.com/apache/pulsar-client-go/pulsar" + "github.com/cenkalti/backoff/v4" + "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/v2/proto/streamingpb" @@ -11,14 +14,59 @@ import ( "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/helper" + "github.com/milvus-io/milvus/pkg/v2/util/syncutil" ) var _ walimpls.WALImpls = (*walImpl)(nil) type walImpl struct { *helper.WALHelper - c pulsar.Client - p pulsar.Producer + c pulsar.Client + p *syncutil.Future[pulsar.Producer] + notifier *syncutil.AsyncTaskNotifier[struct{}] + backlogClearHelper *backlogClearHelper +} + +// initProducerAtBackground initializes the producer at background. +func (w *walImpl) initProducerAtBackground() { + if w.Channel().AccessMode != types.AccessModeRW { + panic("producer should not be initialized on a wal that is not in read-write mode") + } + + defer w.notifier.Finish(struct{}{}) + backoff := backoff.NewExponentialBackOff() + backoff.InitialInterval = 10 * time.Millisecond + backoff.MaxInterval = 10 * time.Second + backoff.MaxElapsedTime = 0 + backoff.Reset() + + for { + if err := w.initProducer(); err == nil { + return + } + select { + case <-time.After(backoff.NextBackOff()): + continue + case <-w.notifier.Context().Done(): + return + } + } +} + +// initProducer initializes the producer. +func (w *walImpl) initProducer() error { + p, err := w.c.CreateProducer(pulsar.ProducerOptions{ + Topic: w.Channel().Name, + // TODO: current go pulsar client does not support fencing, we should enable it after go pulsar client supports it. + // ProducerAccessMode: pulsar.ProducerAccessModeExclusiveWithFencing, + }) + if err != nil { + w.Log().Warn("create producer failed", zap.Error(err)) + return err + } + w.Log().Info("pulsar create producer done") + w.p.Set(p) + return nil } func (w *walImpl) WALName() string { @@ -29,10 +77,19 @@ func (w *walImpl) Append(ctx context.Context, msg message.MutableMessage) (messa if w.Channel().AccessMode != types.AccessModeRW { panic("write on a wal that is not in read-write mode") } - id, err := w.p.Send(ctx, &pulsar.ProducerMessage{ + p, err := w.p.GetWithContext(ctx) + if err != nil { + return nil, errors.Wrap(err, "get producer from future") + } + id, err := p.Send(ctx, &pulsar.ProducerMessage{ Payload: msg.Payload(), Properties: msg.Properties().ToRawMap(), }) + if w.backlogClearHelper != nil { + // Observe the append traffic even if the message is not sent successfully. + // Because if the write is failed, the message may be already written to the pulsar topic. + w.backlogClearHelper.ObserveAppend(len(msg.Payload())) + } if err != nil { w.Log().RatedWarn(1, "send message to pulsar failed", zap.Error(err)) return nil, err @@ -80,6 +137,10 @@ func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error { if w.Channel().AccessMode != types.AccessModeRW { panic("truncate on a wal that is not in read-write mode") } + if w.backlogClearHelper != nil { + // if the backlog clear helper is enabled, the truncate make no sense, skip it. + return nil + } cursor, err := w.c.Subscribe(pulsar.ConsumerOptions{ Topic: w.Channel().Name, SubscriptionName: truncateCursorSubscriptionName, @@ -95,7 +156,13 @@ func (w *walImpl) Truncate(ctx context.Context, id message.MessageID) error { } func (w *walImpl) Close() { - if w.p != nil { - w.p.Close() // close producer + w.notifier.Cancel() + w.notifier.BlockUntilFinish() + // close producer if it is initialized + if w.p.Ready() { + w.p.Get().Close() + } + if w.backlogClearHelper != nil { + w.backlogClearHelper.Close() } } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 8bba74a051..c815646abd 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -5659,8 +5659,9 @@ type streamingConfig struct { WALRecoveryPersistInterval ParamItem `refreshable:"true"` WALRecoveryMaxDirtyMessage ParamItem `refreshable:"true"` WALRecoveryGracefulCloseTimeout ParamItem `refreshable:"true"` - WALTruncateSampleInterval ParamItem `refreshable:"true"` - WALTruncateRetentionInterval ParamItem `refreshable:"true"` + + WALTruncateSampleInterval ParamItem `refreshable:"true"` + WALTruncateRetentionInterval ParamItem `refreshable:"true"` } func (p *streamingConfig) init(base *BaseTable) { @@ -5888,10 +5889,10 @@ If that persist operation exceeds this timeout, the wal recovery module will clo p.WALTruncateSampleInterval = ParamItem{ Key: "streaming.walTruncate.sampleInterval", Version: "2.6.0", - Doc: `The interval of sampling wal checkpoint when truncate, 1m by default. + Doc: `The interval of sampling wal checkpoint when truncate, 30m by default. Every time the checkpoint is persisted, the checkpoint will be sampled and used to be a candidate of truncate checkpoint. More samples, more frequent truncate, more memory usage.`, - DefaultValue: "1m", + DefaultValue: "30m", Export: true, } p.WALTruncateSampleInterval.Init(base.mgr) @@ -5899,10 +5900,14 @@ More samples, more frequent truncate, more memory usage.`, p.WALTruncateRetentionInterval = ParamItem{ Key: "streaming.walTruncate.retentionInterval", Version: "2.6.0", - Doc: `The retention interval of wal truncate, 5m by default. + Doc: `The retention interval of wal truncate, 26h by default. If the sampled checkpoint is older than this interval, it will be used to truncate wal checkpoint. -Greater the interval, more wal storage usage, more redundant data in wal`, - DefaultValue: "5m", +Greater the interval, more wal storage usage, more redundant data in wal. +Because current query path doesn't promise the read operation not happen before the truncate point, +retention interval should be greater than the dataCoord.segment.maxLife to avoid the message lost at query path. +If the wal is pulsar, the pulsar should close the subscription expiration to avoid the message lost. +because the wal truncate operation is implemented by pulsar consumer.`, + DefaultValue: "26h", Export: true, } p.WALTruncateRetentionInterval.Init(base.mgr) diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 3fb549a6be..6cb2e85740 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -650,8 +650,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, float64(0.6), params.StreamingCfg.FlushMemoryThreshold.GetAsFloat()) assert.Equal(t, float64(0.2), params.StreamingCfg.FlushGrowingSegmentBytesHwmThreshold.GetAsFloat()) assert.Equal(t, float64(0.1), params.StreamingCfg.FlushGrowingSegmentBytesLwmThreshold.GetAsFloat()) - assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse()) - assert.Equal(t, 5*time.Minute, params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse()) + assert.Equal(t, 30*time.Minute, params.StreamingCfg.WALTruncateSampleInterval.GetAsDurationByParse()) + assert.Equal(t, 26*time.Hour, params.StreamingCfg.WALTruncateRetentionInterval.GetAsDurationByParse()) params.Save(params.StreamingCfg.WALBalancerTriggerInterval.Key, "50s") params.Save(params.StreamingCfg.WALBalancerBackoffInitialInterval.Key, "50s") diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index e44f374fbd..76e7b90edb 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -875,6 +875,8 @@ type PulsarConfig struct { // Enable Client side metrics EnableClientMetrics ParamItem `refreshable:"false"` + + BacklogAutoClearBytes ParamItem `refreshable:"false"` } func (p *PulsarConfig) Init(base *BaseTable) { @@ -1007,6 +1009,23 @@ To share a Pulsar instance among multiple Milvus instances, you can change this Export: true, } p.EnableClientMetrics.Init(base.mgr) + + p.BacklogAutoClearBytes = ParamItem{ + Key: "pulsar.backlogAutoClearBytes", + Version: "2.6.0", + DefaultValue: "100m", + Doc: `Perform a backlog cleanup every time the data of given bytes is written. +Because milvus use puslar reader to read the message, so if there's no pulsar subscriber when milvus running. +If the pulsar cluster open the backlog protection (backlogQuotaDefaultLimitBytes), the backlog exceed will reported to fail the write operation +set this option to non-zero will create a subscription seek to latest position to clear the pulsar backlog. +If these options is non-zero, the wal data in pulsar is fully protected by retention policy, +so admin of pulsar should give enough retention time to avoid the wal message lost. +If these options is zero, no subscription will be created, so pulsar cluster must close the backlog protection, otherwise the milvus can not recovered if backlog exceed. +Moreover, if these option is zero, Milvus use a truncation subscriber to protect the wal data in pulsar if user disable the subscriptionExpirationTimeMinutes. +The retention policy of pulsar can set shorter to save the storage space in this case.`, + Export: true, + } + p.BacklogAutoClearBytes.Init(base.mgr) } // --- kafka --- diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index be2ea653a4..ed6c441ae4 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -193,6 +193,11 @@ func TestServiceParam(t *testing.T) { assert.Equal(t, "60", Params.RequestTimeout.GetValue()) }) + t.Run("pulsar_backlog_auto_clear_bytes", func(t *testing.T) { + Params := &SParams.PulsarCfg + assert.Equal(t, int64(100*1024*1024), Params.BacklogAutoClearBytes.GetAsSize()) + }) + t.Run("test rocksmqConfig", func(t *testing.T) { Params := &SParams.RocksmqCfg