From 9d29e6ee641f0765e174f9dd9b02e092499cd3f5 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Mon, 27 Oct 2025 11:08:05 +0800 Subject: [PATCH] fix: append operation can be only canceled by the wal itself but not the rpc (#45078) issue: #45077 We need to guarantee that the state of the wal stays consistent with the memory state of the streamingnode. So we don't allow the append operation to be cancelled by the append caller, to avoid leaving an alive wal in an inconsistent state. The wal append operation can only be cancelled when the wal is shutting down. Signed-off-by: chyezh --- .../server/wal/adaptor/ro_wal_adaptor.go | 22 +++++++++++---- .../server/wal/adaptor/wal_adaptor.go | 28 ++++++++++++------- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go index d1a5e19431..229017f277 100644 --- a/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go @@ -2,6 +2,7 @@ package adaptor import ( "context" + "time" "go.uber.org/zap" @@ -12,7 +13,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" - "github.com/milvus-io/milvus/pkg/v2/util/lifetime" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -21,7 +21,8 @@ var _ wal.WAL = (*roWALAdaptorImpl)(nil) type roWALAdaptorImpl struct { log.Binder lifetime *typeutil.Lifetime - available lifetime.SafeChan + availableCtx context.Context + availableCancel context.CancelFunc idAllocator *typeutil.IDAllocator roWALImpls walimpls.ROWALImpls scannerRegistry scannerRegistry @@ -89,12 +90,12 @@ func (w *roWALAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.S // IsAvailable returns whether the wal is available. 
func (w *roWALAdaptorImpl) IsAvailable() bool { - return !w.available.IsClosed() + return w.availableCtx.Err() == nil } // Available returns a channel that will be closed when the wal is shut down. func (w *roWALAdaptorImpl) Available() <-chan struct{} { - return w.available.CloseCh() + return w.availableCtx.Done() } // Close overrides Scanner Close function. @@ -102,7 +103,7 @@ func (w *roWALAdaptorImpl) Close() { // begin to close the wal. w.Logger().Info("wal begin to close...") w.lifetime.SetState(typeutil.LifetimeStateStopped) - w.available.Close() + w.forceCancelAfterGracefulTimeout() w.lifetime.Wait() w.Logger().Info("wal begin to close scanners...") @@ -124,3 +125,14 @@ func (w *roWALAdaptorImpl) Close() { // close all metrics. w.scanMetrics.Close() } + +// forceCancelAfterGracefulTimeout forces to cancel the context after the graceful timeout. +func (w *roWALAdaptorImpl) forceCancelAfterGracefulTimeout() { + if w.availableCtx.Err() != nil { + return + } + time.AfterFunc(3*time.Second, func() { + // perform a force cancel to avoid resource leak. 
+ w.availableCancel() + }) +} diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index 438fe09624..98286dca2c 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -22,7 +22,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" "github.com/milvus-io/milvus/pkg/v2/util/conc" - "github.com/milvus-io/milvus/pkg/v2/util/lifetime" + "github.com/milvus-io/milvus/pkg/v2/util/contextutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -39,11 +39,13 @@ func adaptImplsToROWAL( log.FieldComponent("wal"), zap.String("channel", basicWAL.Channel().String()), ) + ctx, cancel := context.WithCancel(context.Background()) roWAL := &roWALAdaptorImpl{ - roWALImpls: basicWAL, - lifetime: typeutil.NewLifetime(), - available: lifetime.NewSafeChan(), - idAllocator: typeutil.NewIDAllocator(), + roWALImpls: basicWAL, + lifetime: typeutil.NewLifetime(), + availableCtx: ctx, + availableCancel: cancel, + idAllocator: typeutil.NewIDAllocator(), scannerRegistry: scannerRegistry{ channel: basicWAL.Channel(), idAllocator: typeutil.NewIDAllocator(), @@ -147,7 +149,7 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) select { case <-ctx.Done(): return nil, ctx.Err() - case <-w.available.CloseCh(): + case <-w.availableCtx.Done(): return nil, status.NewOnShutdownError("wal is on shutdown") case <-w.interceptorBuildResult.Interceptor.Ready(): } @@ -155,6 +157,12 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) // Setup the term of wal. msg = msg.WithWALTerm(w.Channel().Term) + // we need to guarantee that the state of the wal is kept consistent with the memory state of the streamingnode. 
+ // So we don't allow the append operation to be canceled by the append caller, to avoid leaving an alive wal in an inconsistent state; + // the wal append operation can only be canceled when the wal is shutting down. + ctx, cancel := contextutil.MergeContext(context.WithoutCancel(ctx), w.availableCtx) + defer cancel() + appendMetrics := w.writeMetrics.StartAppend(msg) ctx = utility.WithAppendMetricsContext(ctx, appendMetrics) @@ -181,7 +189,7 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) if errors.Is(err, walimpls.ErrFenced) { // if the append operation of wal is fenced, we should report the error to the client. if w.isFenced.CompareAndSwap(false, true) { - w.available.Close() + w.forceCancelAfterGracefulTimeout() w.Logger().Warn("wal is fenced, mark as unavailable, all append opertions will be rejected", zap.Error(err)) } return nil, status.NewChannelFenced(w.Channel().String()) @@ -231,8 +239,8 @@ func (w *walAdaptorImpl) retryAppendWhenRecoverableError(ctx context.Context, ms select { case <-ctx.Done(): - return nil, ctx.Err() - case <-w.available.CloseCh(): + return nil, context.Cause(ctx) + case <-w.availableCtx.Done(): return nil, status.NewOnShutdownError("wal is on shutdown") case <-time.After(nextInterval): } @@ -265,7 +273,7 @@ func (w *walAdaptorImpl) Close() { // begin to close the wal. w.lifetime.SetState(typeutil.LifetimeStateStopped) - w.available.Close() + w.forceCancelAfterGracefulTimeout() w.lifetime.Wait() // close the flusher.