diff --git a/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go index 071576bb94..e3ebe6685f 100644 --- a/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/ro_wal_adaptor.go @@ -95,8 +95,8 @@ func (w *roWALAdaptorImpl) Close() { // begin to close the wal. w.Logger().Info("wal begin to close...") w.lifetime.SetState(typeutil.LifetimeStateStopped) - w.lifetime.Wait() close(w.available) + w.lifetime.Wait() w.Logger().Info("wal begin to close scanners...") diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go index ccb35ec708..4d97feda4a 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor.go @@ -112,8 +112,11 @@ func (w *walAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) select { case <-ctx.Done(): return nil, ctx.Err() + case <-w.available: + return nil, status.NewOnShutdownError("wal is on shutdown") case <-w.interceptorBuildResult.Interceptor.Ready(): } + // Setup the term of wal. msg = msg.WithWALTerm(w.Channel().Term) @@ -184,13 +187,12 @@ func (w *walAdaptorImpl) Close() { w.Logger().Info("wal begin to close, start graceful close...") // graceful close the interceptors before wal closing. w.interceptorBuildResult.GracefulCloseFunc() - w.Logger().Info("wal graceful close done, wait for operation to be finished...") // begin to close the wal. w.lifetime.SetState(typeutil.LifetimeStateStopped) - w.lifetime.Wait() close(w.available) + w.lifetime.Wait() w.Logger().Info("wal begin to close scanners...")