milvus/internal/util/mqclient/pulsar_reader.go
yukun 94745ba3c3
[skip e2e]Add comment for pulsar reader (#14119)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>
2021-12-24 11:01:19 +08:00

46 lines
910 B
Go

package mqclient
import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
)
// pulsarReader contains a pulsar reader
type pulsarReader struct {
r pulsar.Reader
}
// Topic returns the topic of pulsar reader
func (pr *pulsarReader) Topic() string {
return pr.r.Topic()
}
// Next read the next message in the topic, blocking until a message is available
func (pr *pulsarReader) Next(ctx context.Context) (Message, error) {
pm, err := pr.r.Next(ctx)
if err != nil {
return nil, err
}
return &pulsarMessage{msg: pm}, nil
}
// HasNext check if there is any message available to read from the current position
func (pr *pulsarReader) HasNext() bool {
return pr.r.HasNext()
}
func (pr *pulsarReader) Close() {
pr.r.Close()
}
func (pr *pulsarReader) Seek(id MessageID) error {
messageID := id.(*pulsarID).messageID
err := pr.r.Seek(messageID)
if err != nil {
return err
}
return nil
}