diff --git a/internal/streamingnode/server/wal/vchantempstore/vchannel_temp_storage.go b/internal/streamingnode/server/wal/vchantempstore/vchannel_temp_storage.go index fa616152c4..baaa12bee7 100644 --- a/internal/streamingnode/server/wal/vchantempstore/vchannel_temp_storage.go +++ b/internal/streamingnode/server/wal/vchantempstore/vchannel_temp_storage.go @@ -40,6 +40,12 @@ type VChannelTempStorage struct { func (ts *VChannelTempStorage) GetVChannelByPChannelOfCollection(ctx context.Context, collectionID int64, pchannel string) (string, error) { if err := ts.updateVChannelByPChannelOfCollectionIfNotExist(ctx, collectionID); err != nil { + if ctx.Err() != nil { + // Because underlying mixcoord client may report grpc rpc ctx error, + // and the retry.Retry doesn't return the context.Error, + // so we check the ctx error here and return it directly to the caller. + return "", ctx.Err() + } return "", err } @@ -91,5 +97,5 @@ func (ts *VChannelTempStorage) updateVChannelByPChannelOfCollectionIfNotExist(ct ts.mu.Unlock() } return err - }) + }, retry.AttemptAlways()) } diff --git a/pkg/streaming/walimpls/impls/pulsar/scanner.go b/pkg/streaming/walimpls/impls/pulsar/scanner.go index d28a2d9dd5..f99ac6e1fd 100644 --- a/pkg/streaming/walimpls/impls/pulsar/scanner.go +++ b/pkg/streaming/walimpls/impls/pulsar/scanner.go @@ -50,10 +50,15 @@ func (s *scannerImpl) executeConsume() { for { msg, err := s.reader.Next(s.Context()) if err != nil { - if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) { + // underlying mq may report ctx error, so we need to check the ctx error here to avoid return nil Error() without close. + if s.Context().Err() != nil { s.Finish(nil) return } + if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) { + s.Finish(errors.Wrap(err, "pulsar readNext timeout")) + return + } s.Finish(err) return } diff --git a/pkg/streaming/walimpls/impls/wp/scanner.go b/pkg/streaming/walimpls/impls/wp/scanner.go index 9f1c583ed9..d118fa99a5 100644 --- a/pkg/streaming/walimpls/impls/wp/scanner.go +++ b/pkg/streaming/walimpls/impls/wp/scanner.go @@ -54,10 +54,15 @@ func (s *scannerImpl) executeConsumer() { for { msg, err := s.reader.ReadNext(s.Context()) if err != nil { - if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) { + // underlying mq may report ctx error, so we need to check the ctx error here to avoid return nil Error() without close. + if s.Context().Err() != nil { s.Finish(nil) return } + if errors.IsAny(err, context.Canceled, context.DeadlineExceeded) { + s.Finish(errors.Wrap(err, "wp readNext Timeout")) + return + } log.Ctx(s.Context()).Error("wp readNext msg exception", zap.Error(err)) s.Finish(err) return