From 2ff657f2d95d266c845097f17c8a344ce9dffd21 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Fri, 28 Feb 2025 17:41:58 +0800 Subject: [PATCH] fix: wal may panics when context canceled (#40265) issue: #40264 - wal may panics when context canceled - scanner may data race when closing Signed-off-by: chyezh --- .../streamingnode/server/wal/adaptor/scanner_adaptor.go | 5 ++--- .../wal/interceptors/timetick/timetick_interceptor.go | 6 +++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go index 28377afc79..1d3e15d54d 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -98,7 +98,9 @@ func (s *scannerAdaptorImpl) execute() { s.logger.Info("scanner start background task") msgChan := make(chan message.ImmutableMessage) + ch := make(chan struct{}) + defer func() { <-ch }() // TODO: optimize the extra goroutine here after msgstream is removed. go func() { defer close(ch) @@ -116,9 +118,6 @@ func (s *scannerAdaptorImpl) execute() { return } s.logger.Warn("the consuming event loop of scanner is closed with unexpected error", zap.Error(err)) - - // waiting for the produce event loop to close. - <-ch } // produceEventLoop produces the message from the wal and write ahead buffer. diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go index f2c967beb4..911f35e8a0 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go @@ -41,10 +41,14 @@ func (impl *timeTickAppendInterceptor) Ready() <-chan struct{} { // Do implements AppendInterceptor. func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (msgID message.MessageID, err error) { + cm, err := impl.operator.MVCCManager(ctx) + if err != nil { + return nil, err + } + defer func() { if err == nil { // the cursor manager should beready since the timetick interceptor is ready. - cm, _ := impl.operator.MVCCManager(ctx) cm.UpdateMVCC(msg) } }()