diff --git a/internal/util/rocksmq/client/rocksmq/reader_impl.go b/internal/util/rocksmq/client/rocksmq/reader_impl.go index 2da3ea77f7..0f2d48c0e6 100644 --- a/internal/util/rocksmq/client/rocksmq/reader_impl.go +++ b/internal/util/rocksmq/client/rocksmq/reader_impl.go @@ -74,6 +74,7 @@ func (r *reader) HasNext() bool { return r.c.server.HasNext(r.topic, r.name, r.startMessageIDInclusive) } +// Close close the reader and stop the blocking reader func (r *reader) Close() { r.c.server.CloseReader(r.topic, r.name) }