jaime 7a3a721380
Reconstruct mqstream module (#15784)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
2022-03-03 21:57:56 +08:00

92 lines
2.8 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package client
import (
"context"
)
// reader contains main options for rocksmq, and can only be set when newReader
type reader struct {
c *client
topic string
name string
startMessageID UniqueID
startMessageIDInclusive bool
subscriptionRolePrefix string
}
// newReader create a rocksmq reader from reader options
func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) {
if c == nil {
return nil, newError(InvalidConfiguration, "client is nil")
}
if readerOptions == nil {
return nil, newError(InvalidConfiguration, "options is nil")
}
if readerOptions.Topic == "" {
return nil, newError(InvalidConfiguration, "topic is empty")
}
reader := &reader{
c: c,
topic: readerOptions.Topic,
name: readerOptions.Name,
startMessageID: readerOptions.StartMessageID,
startMessageIDInclusive: readerOptions.StartMessageIDInclusive,
subscriptionRolePrefix: readerOptions.SubscriptionRolePrefix,
}
if c.server == nil {
return nil, newError(InvalidConfiguration, "rmq server in client is nil")
}
name, err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive, reader.subscriptionRolePrefix)
if err != nil {
return nil, err
}
reader.name = name
return reader, nil
}
//Topic return the topic name of the reader
func (r *reader) Topic() string {
return r.topic
}
// Next return the next message of reader, blocking until a message is available
func (r *reader) Next(ctx context.Context) (Message, error) {
cMsg, err := r.c.server.Next(ctx, r.topic, r.name)
if err != nil {
return Message{}, err
}
msg := Message{
MsgID: cMsg.MsgID,
Payload: cMsg.Payload,
Topic: r.topic,
}
return msg, nil
}
// HasNext check if there is a message available to read
func (r *reader) HasNext() bool {
return r.c.server.HasNext(r.topic, r.name)
}
// Close close the reader and stop the blocking reader
func (r *reader) Close() {
r.c.server.CloseReader(r.topic, r.name)
}
// Seek seek the reader to the position of message id
func (r *reader) Seek(msgID UniqueID) error { //nolint:govet
r.c.server.ReaderSeek(r.topic, r.name, msgID)
return nil
}