diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go index 28377afc79..1d3e15d54d 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -98,7 +98,9 @@ func (s *scannerAdaptorImpl) execute() { s.logger.Info("scanner start background task") msgChan := make(chan message.ImmutableMessage) + ch := make(chan struct{}) + defer func() { <-ch }() // TODO: optimize the extra goroutine here after msgstream is removed. go func() { defer close(ch) @@ -116,9 +118,6 @@ func (s *scannerAdaptorImpl) execute() { return } s.logger.Warn("the consuming event loop of scanner is closed with unexpected error", zap.Error(err)) - - // waiting for the produce event loop to close. - <-ch } // produceEventLoop produces the message from the wal and write ahead buffer. diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go index f2c967beb4..911f35e8a0 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go @@ -41,10 +41,14 @@ func (impl *timeTickAppendInterceptor) Ready() <-chan struct{} { // Do implements AppendInterceptor. func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message.MutableMessage, append interceptors.Append) (msgID message.MessageID, err error) { + cm, err := impl.operator.MVCCManager(ctx) + if err != nil { + return nil, err + } + defer func() { if err == nil { // the cursor manager should beready since the timetick interceptor is ready. - cm, _ := impl.operator.MVCCManager(ctx) cm.UpdateMVCC(msg) } }()