diff --git a/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go index d1a5e19431..229017f277 100644 --- a/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go @@ -2,6 +2,7 @@ package adaptor import ( "context" + "time" "go.uber.org/zap" @@ -12,7 +13,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/streaming/util/message" "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" - "github.com/milvus-io/milvus/pkg/v2/util/lifetime" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -21,7 +21,8 @@ var _ wal.WAL = (*roWALAdaptorImpl)(nil) type roWALAdaptorImpl struct { log.Binder lifetime *typeutil.Lifetime - available lifetime.SafeChan + availableCtx context.Context + availableCancel context.CancelFunc idAllocator *typeutil.IDAllocator roWALImpls walimpls.ROWALImpls scannerRegistry scannerRegistry @@ -89,12 +90,12 @@ func (w *roWALAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.S // IsAvailable returns whether the wal is available. func (w *roWALAdaptorImpl) IsAvailable() bool { - return !w.available.IsClosed() + return w.availableCtx.Err() == nil } // Available returns a channel that will be closed when the wal is shut down. func (w *roWALAdaptorImpl) Available() <-chan struct{} { - return w.available.CloseCh() + return w.availableCtx.Done() } // Close overrides Scanner Close function. @@ -102,7 +103,7 @@ func (w *roWALAdaptorImpl) Close() { // begin to close the wal. w.Logger().Info("wal begin to close...") w.lifetime.SetState(typeutil.LifetimeStateStopped) - w.available.Close() + w.forceCancelAfterGracefulTimeout() w.lifetime.Wait() w.Logger().Info("wal begin to close scanners...") @@ -124,3 +125,14 @@ func (w *roWALAdaptorImpl) Close() { // close all metrics. 
w.scanMetrics.Close() } + +// forceCancelAfterGracefulTimeout forcibly cancels the context after the graceful timeout. +func (w *roWALAdaptorImpl) forceCancelAfterGracefulTimeout() { + if w.availableCtx.Err() != nil { + return + } + time.AfterFunc(3*time.Second, func() { + // perform a forced cancel to avoid a resource leak. + w.availableCancel() + }) +} diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index 438fe09624..98286dca2c 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -22,7 +22,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/streaming/util/types" "github.com/milvus-io/milvus/pkg/v2/streaming/walimpls" "github.com/milvus-io/milvus/pkg/v2/util/conc" - "github.com/milvus-io/milvus/pkg/v2/util/lifetime" + "github.com/milvus-io/milvus/pkg/v2/util/contextutil" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -39,11 +39,13 @@ func adaptImplsToROWAL( log.FieldComponent("wal"), zap.String("channel", basicWAL.Channel().String()), ) + ctx, cancel := context.WithCancel(context.Background()) roWAL := &roWALAdaptorImpl{ - roWALImpls: basicWAL, - lifetime: typeutil.NewLifetime(), - available: lifetime.NewSafeChan(), - idAllocator: typeutil.NewIDAllocator(), + roWALImpls: basicWAL, + lifetime: typeutil.NewLifetime(), + availableCtx: ctx, + availableCancel: cancel, + idAllocator: typeutil.NewIDAllocator(), scannerRegistry: scannerRegistry{ channel: basicWAL.Channel(), idAllocator: typeutil.NewIDAllocator(), @@ -147,7 +149,7 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) select { case <-ctx.Done(): return nil, ctx.Err() - case <-w.available.CloseCh(): + case <-w.availableCtx.Done(): return nil, status.NewOnShutdownError("wal is on shutdown") case <-w.interceptorBuildResult.Interceptor.Ready(): } @@ -155,6 +157,12 @@ func (w *walAdaptorImpl) Append(ctx
context.Context, msg message.MutableMessage) // Setup the term of wal. msg = msg.WithWALTerm(w.Channel().Term) + // We must keep the state of the wal consistent with the in-memory state of the streamingnode. + // So the append operation cannot be canceled by the append caller, to avoid leaving a live wal in an inconsistent state; + // the wal append operation can only be canceled when the wal is shutting down. + ctx, cancel := contextutil.MergeContext(context.WithoutCancel(ctx), w.availableCtx) + defer cancel() + appendMetrics := w.writeMetrics.StartAppend(msg) ctx = utility.WithAppendMetricsContext(ctx, appendMetrics) @@ -181,7 +189,7 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) if errors.Is(err, walimpls.ErrFenced) { // if the append operation of wal is fenced, we should report the error to the client. if w.isFenced.CompareAndSwap(false, true) { - w.available.Close() + w.forceCancelAfterGracefulTimeout() w.Logger().Warn("wal is fenced, mark as unavailable, all append opertions will be rejected", zap.Error(err)) } return nil, status.NewChannelFenced(w.Channel().String()) @@ -231,8 +239,8 @@ func (w *walAdaptorImpl) retryAppendWhenRecoverableError(ctx context.Context, ms select { case <-ctx.Done(): - return nil, ctx.Err() - case <-w.available.CloseCh(): + return nil, context.Cause(ctx) + case <-w.availableCtx.Done(): return nil, status.NewOnShutdownError("wal is on shutdown") case <-time.After(nextInterval): } @@ -265,7 +273,7 @@ func (w *walAdaptorImpl) Close() { // begin to close the wal. w.lifetime.SetState(typeutil.LifetimeStateStopped) - w.available.Close() + w.forceCancelAfterGracefulTimeout() w.lifetime.Wait() // close the flusher.