From f6fb4bc442da786f8a203816b6a1fcfa2730936a Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Thu, 13 Mar 2025 16:24:06 +0800 Subject: [PATCH] fix: backoff will retry infinitely after reaching max elapse (#40589) issue: #40588 Signed-off-by: chyezh --- .../internal/consumer/consumer_impl.go | 1 + .../streaming/internal/producer/producer.go | 3 ++- internal/rootcoord/dml_channels.go | 27 +++++++++++-------- .../client/handler/handler_client_impl.go | 3 +++ .../flusherimpl/pchannel_checkpoint.go | 1 + .../streamingutil/service/lazygrpc/conn.go | 8 +++++- pkg/util/typeutil/backoff_timer.go | 16 ++--------- pkg/util/typeutil/backoff_timer_test.go | 16 +++-------- 8 files changed, 35 insertions(+), 40 deletions(-) diff --git a/internal/distributed/streaming/internal/consumer/consumer_impl.go b/internal/distributed/streaming/internal/consumer/consumer_impl.go index 1f3a62a5a9..aa7fbf2b1c 100644 --- a/internal/distributed/streaming/internal/consumer/consumer_impl.go +++ b/internal/distributed/streaming/internal/consumer/consumer_impl.go @@ -130,6 +130,7 @@ func (rc *resumableConsumerImpl) createNewConsumer(opts *handler.ConsumerOptions backoff := backoff.NewExponentialBackOff() backoff.InitialInterval = 100 * time.Millisecond backoff.MaxInterval = 10 * time.Second + backoff.MaxElapsedTime = 0 for { // Create a new consumer. // a underlying stream consumer life time should be equal to the resumable producer. diff --git a/internal/distributed/streaming/internal/producer/producer.go b/internal/distributed/streaming/internal/producer/producer.go index c4fe1744ca..90fb95315b 100644 --- a/internal/distributed/streaming/internal/producer/producer.go +++ b/internal/distributed/streaming/internal/producer/producer.go @@ -152,7 +152,8 @@ func (p *ResumableProducer) waitUntilUnavailable(producer handler.Producer) erro func (p *ResumableProducer) createNewProducer() (producer.Producer, error) { backoff := backoff.NewExponentialBackOff() backoff.InitialInterval = 100 * time.Millisecond - backoff.MaxInterval = 2 * time.Second + backoff.MaxInterval = 10 * time.Second + backoff.MaxElapsedTime = 0 for { // Create a new producer. // a underlying stream producer life time should be equal to the resumable producer. diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index cbf430f4e6..9e10fc1233 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/errors" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/mq/common" @@ -179,19 +180,23 @@ func newDmlChannels(initCtx context.Context, factory msgstream.Factory, chanName } for i, name := range names { - ms, err := factory.NewMsgStream(initCtx) - if err != nil { - log.Ctx(initCtx).Error("Failed to add msgstream", - zap.String("name", name), - zap.Error(err)) - panic("Failed to add msgstream") - } + var ms msgstream.MsgStream + if !streamingutil.IsStreamingServiceEnabled() { + var err error + ms, err = factory.NewMsgStream(initCtx) + if err != nil { + log.Ctx(initCtx).Error("Failed to add msgstream", + zap.String("name", name), + zap.Error(err)) + panic("Failed to add msgstream") + } - if params.PreCreatedTopicEnabled.GetAsBool() { - d.checkPreCreatedTopic(initCtx, factory, name) - } + if params.PreCreatedTopicEnabled.GetAsBool() { + d.checkPreCreatedTopic(initCtx, factory, name) + } - ms.AsProducer(initCtx, []string{name}) + ms.AsProducer(initCtx, []string{name}) + } dms := &dmlMsgStream{ ms: ms, refcnt: 0, diff --git a/internal/streamingnode/client/handler/handler_client_impl.go b/internal/streamingnode/client/handler/handler_client_impl.go index 60f1caabce..e97c7f85c3 100644 --- a/internal/streamingnode/client/handler/handler_client_impl.go +++ b/internal/streamingnode/client/handler/handler_client_impl.go @@ -155,6 +155,9 @@ type handlerCreateFunc func(ctx context.Context, assign *types.PChannelInfoAssig func (hc *handlerClientImpl) createHandlerAfterStreamingNodeReady(ctx context.Context, logger *log.MLogger, pchannel string, create handlerCreateFunc) (any, error) { // TODO: backoff should be configurable. backoff := backoff.NewExponentialBackOff() + backoff.InitialInterval = 100 * time.Millisecond + backoff.MaxInterval = 10 * time.Second + backoff.MaxElapsedTime = 0 for { assign := hc.watcher.Get(ctx, pchannel) if assign != nil { diff --git a/internal/streamingnode/server/flusher/flusherimpl/pchannel_checkpoint.go b/internal/streamingnode/server/flusher/flusherimpl/pchannel_checkpoint.go index 65c8056a10..240eb852fb 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/pchannel_checkpoint.go +++ b/internal/streamingnode/server/flusher/flusherimpl/pchannel_checkpoint.go @@ -117,6 +117,7 @@ func (m *pchannelCheckpointManager) background(previous message.MessageID) { backoff := backoff.NewExponentialBackOff() backoff.InitialInterval = 100 * time.Millisecond backoff.MaxInterval = 10 * time.Second + backoff.MaxElapsedTime = 0 for { current, err := m.blockUntilCheckpointUpdate(previous) if err != nil { diff --git a/internal/util/streamingutil/service/lazygrpc/conn.go b/internal/util/streamingutil/service/lazygrpc/conn.go index e375979695..bcd75944a9 100644 --- a/internal/util/streamingutil/service/lazygrpc/conn.go +++ b/internal/util/streamingutil/service/lazygrpc/conn.go @@ -2,6 +2,7 @@ package lazygrpc import ( "context" + "time" "github.com/cenkalti/backoff/v4" "github.com/cockroachdb/errors" @@ -50,6 +51,11 @@ type connImpl struct { func (c *connImpl) initialize() { defer c.initializationNotifier.Finish(struct{}{}) + newBackOff := backoff.NewExponentialBackOff() + newBackOff.InitialInterval = 100 * time.Millisecond + newBackOff.MaxInterval = 10 * time.Second + newBackOff.MaxElapsedTime = 0 + backoff.Retry(func() error { conn, err := c.dialer(c.initializationNotifier.Context()) if err != nil { @@ -62,7 +68,7 @@ func (c *connImpl) initialize() { } c.conn.Set(conn) return nil - }, backoff.NewExponentialBackOff()) + }, newBackOff) } func (c *connImpl) GetConn(ctx context.Context) (*grpc.ClientConn, error) { diff --git a/pkg/util/typeutil/backoff_timer.go b/pkg/util/typeutil/backoff_timer.go index 997ccb2839..f62848c193 100644 --- a/pkg/util/typeutil/backoff_timer.go +++ b/pkg/util/typeutil/backoff_timer.go @@ -55,13 +55,11 @@ type BackoffTimer struct { func (t *BackoffTimer) EnableBackoff() { if t.backoff == nil { cfg := t.configFetcher.BackoffConfig() - defaultInterval := t.configFetcher.DefaultInterval() backoff := backoff.NewExponentialBackOff() backoff.InitialInterval = cfg.InitialInterval backoff.Multiplier = cfg.Multiplier backoff.MaxInterval = cfg.MaxInterval - backoff.MaxElapsedTime = defaultInterval - backoff.Stop = defaultInterval + backoff.MaxElapsedTime = 0 backoff.Reset() t.backoff = backoff } @@ -72,14 +70,6 @@ func (t *BackoffTimer) DisableBackoff() { t.backoff = nil } -// IsBackoffStopped returns the elapsed time of backoff -func (t *BackoffTimer) IsBackoffStopped() bool { - if t.backoff != nil { - return t.backoff.GetElapsedTime() > t.backoff.MaxElapsedTime - } - return true -} - // NextTimer returns the next timer and the duration of the timer func (t *BackoffTimer) NextTimer() (<-chan time.Time, time.Duration) { nextBackoff := t.NextInterval() @@ -98,13 +88,11 @@ func (t *BackoffTimer) NextInterval() time.Duration { // NewBackoffWithInstant creates a new backoff with instant func NewBackoffWithInstant(fetcher BackoffTimerConfigFetcher) *BackoffWithInstant { cfg := fetcher.BackoffConfig() - defaultInterval := fetcher.DefaultInterval() backoff := backoff.NewExponentialBackOff() backoff.InitialInterval = cfg.InitialInterval backoff.Multiplier = cfg.Multiplier backoff.MaxInterval = cfg.MaxInterval - backoff.MaxElapsedTime = defaultInterval - backoff.Stop = defaultInterval + backoff.MaxElapsedTime = 0 backoff.Reset() return &BackoffWithInstant{ backoff: backoff, diff --git a/pkg/util/typeutil/backoff_timer_test.go b/pkg/util/typeutil/backoff_timer_test.go index ddc11c933e..23ed132ec4 100644 --- a/pkg/util/typeutil/backoff_timer_test.go +++ b/pkg/util/typeutil/backoff_timer_test.go @@ -21,24 +21,14 @@ func TestBackoffTimer(t *testing.T) { assert.Equal(t, time.Second, b.NextInterval()) assert.Equal(t, time.Second, b.NextInterval()) assert.Equal(t, time.Second, b.NextInterval()) - assert.True(t, b.IsBackoffStopped()) b.EnableBackoff() - assert.False(t, b.IsBackoffStopped()) timer, backoff := b.NextTimer() assert.Less(t, backoff, 200*time.Millisecond) - for { - <-timer - if b.IsBackoffStopped() { - break - } - timer, _ = b.NextTimer() - } - assert.True(t, b.IsBackoffStopped()) - - assert.Equal(t, time.Second, b.NextInterval()) + <-timer + _, backoff = b.NextTimer() + assert.NotZero(t, backoff) b.DisableBackoff() assert.Equal(t, time.Second, b.NextInterval()) - assert.True(t, b.IsBackoffStopped()) } }