diff --git a/internal/mq/mqimpl/rocksmq/client/client.go b/internal/mq/mqimpl/rocksmq/client/client.go index 097e8d5bc5..cc25d8bd09 100644 --- a/internal/mq/mqimpl/rocksmq/client/client.go +++ b/internal/mq/mqimpl/rocksmq/client/client.go @@ -39,8 +39,6 @@ type Client interface { // Create a consumer instance and subscribe a topic Subscribe(options ConsumerOptions) (Consumer, error) - CreateReader(options ReaderOptions) (Reader, error) - // Close the client and free associated resources Close() } diff --git a/internal/mq/mqimpl/rocksmq/client/client_impl.go b/internal/mq/mqimpl/rocksmq/client/client_impl.go index 2b84313d76..18598cf31b 100644 --- a/internal/mq/mqimpl/rocksmq/client/client_impl.go +++ b/internal/mq/mqimpl/rocksmq/client/client_impl.go @@ -177,11 +177,6 @@ func (c *client) deliver(consumer *consumer, batchMax int) { } } -func (c *client) CreateReader(readerOptions ReaderOptions) (Reader, error) { - reader, err := newReader(c, &readerOptions) - return reader, err -} - // Close close the channel to notify rocksmq to stop operation and close rocksmq server func (c *client) Close() { c.closeOnce.Do(func() { diff --git a/internal/mq/mqimpl/rocksmq/client/consumer_impl.go b/internal/mq/mqimpl/rocksmq/client/consumer_impl.go index 9c014eecd9..dea745d8b9 100644 --- a/internal/mq/mqimpl/rocksmq/client/consumer_impl.go +++ b/internal/mq/mqimpl/rocksmq/client/consumer_impl.go @@ -128,9 +128,10 @@ func (c *consumer) Seek(id UniqueID) error { //nolint:govet // Close destroy current consumer in rocksmq func (c *consumer) Close() { + // TODO should panic? err := c.client.server.DestroyConsumerGroup(c.topic, c.consumerName) if err != nil { - log.Debug("Consumer close failed", zap.Any("topicName", c.topic), zap.Any("groupName", c.consumerName), zap.Any("error", err)) + log.Warn("Consumer close failed", zap.Any("topicName", c.topic), zap.Any("groupName", c.consumerName), zap.Any("error", err)) } } diff --git a/internal/mq/mqimpl/rocksmq/client/reader.go b/internal/mq/mqimpl/rocksmq/client/reader.go deleted file mode 100644 index 5ae6825f69..0000000000 --- a/internal/mq/mqimpl/rocksmq/client/reader.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package client - -import ( - "context" -) - -// ReaderMessage package Reader and Message as a struct to use -type ReaderMessage struct { - Reader - Message -} - -// ReaderOptions abstraction Reader options to use. -type ReaderOptions struct { - // Topic specify the topic this consumer will subscribe on. - // This argument is required when constructing the reader. - Topic string - - // Name set the reader name. - Name string - - // Attach a set of application defined properties to the reader - // This properties will be visible in the topic stats - Properties map[string]string - - // StartMessageID initial reader positioning is done by specifying a message id. The options are: - // * `MessageID` : Start reading from a particular message id, the reader will position itself on that - // specific position. The first message to be read will be the message next to the specified - // messageID - StartMessageID UniqueID - - // If true, the reader will start at the `StartMessageID`, included. - // Default is `false` and the reader will start from the "next" message - StartMessageIDInclusive bool - - // SubscriptionRolePrefix set the subscription role prefix. The default prefix is "reader". - SubscriptionRolePrefix string -} - -// Reader can be used to scan through all the messages currently available in a topic. -type Reader interface { - // Topic from which this reader is reading from - Topic() string - - // Next read the next message in the topic, blocking until a message is available - Next(context.Context) (Message, error) - - // HasNext check if there is any message available to read from the current position - HasNext() bool - - // Close the reader and stop the broker to push more messages - Close() - - // Reset the subscription associated with this reader to a specific message id. - Seek(UniqueID) error //nolint:govet - -} diff --git a/internal/mq/mqimpl/rocksmq/client/reader_impl.go b/internal/mq/mqimpl/rocksmq/client/reader_impl.go deleted file mode 100644 index 0e5e6c75f7..0000000000 --- a/internal/mq/mqimpl/rocksmq/client/reader_impl.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package client - -import ( - "context" -) - -// reader contains main options for rocksmq, and can only be set when newReader -type reader struct { - c *client - topic string - name string - startMessageID UniqueID - startMessageIDInclusive bool - subscriptionRolePrefix string -} - -// newReader create a rocksmq reader from reader options -func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) { - if c == nil { - return nil, newError(InvalidConfiguration, "client is nil") - } - if readerOptions == nil { - return nil, newError(InvalidConfiguration, "options is nil") - } - if readerOptions.Topic == "" { - return nil, newError(InvalidConfiguration, "topic is empty") - } - reader := &reader{ - c: c, - topic: readerOptions.Topic, - name: readerOptions.Name, - startMessageID: readerOptions.StartMessageID, - startMessageIDInclusive: readerOptions.StartMessageIDInclusive, - subscriptionRolePrefix: readerOptions.SubscriptionRolePrefix, - } - if c.server == nil { - return nil, newError(InvalidConfiguration, "rmq server in client is nil") - } - name, err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive, reader.subscriptionRolePrefix) - if err != nil { - return nil, err - } - reader.name = name - return reader, nil -} - -//Topic return the topic name of the reader -func (r *reader) Topic() string { - return r.topic -} - -// Next return the next message of reader, blocking until a message is available -func (r *reader) Next(ctx context.Context) (Message, error) { - cMsg, err := r.c.server.Next(ctx, r.topic, r.name) - if err != nil { - return Message{}, err - } - msg := Message{ - MsgID: cMsg.MsgID, - Payload: cMsg.Payload, - Topic: r.topic, - } - return msg, nil -} - -// HasNext check if there is a message available to read -func (r *reader) HasNext() bool { - return r.c.server.HasNext(r.topic, r.name) -} - -// Close close the reader and stop the blocking reader -func (r *reader) Close() { - r.c.server.CloseReader(r.topic, r.name) -} - -// Seek seek the reader to the position of message id -func (r *reader) Seek(msgID UniqueID) error { //nolint:govet - r.c.server.ReaderSeek(r.topic, r.name, msgID) - return nil -} diff --git a/internal/mq/mqimpl/rocksmq/client/reader_impl_test.go b/internal/mq/mqimpl/rocksmq/client/reader_impl_test.go deleted file mode 100644 index 9658e4a899..0000000000 --- a/internal/mq/mqimpl/rocksmq/client/reader_impl_test.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package client - -import ( - "context" - "os" - "strconv" - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_NewReader(t *testing.T) { - reader, err := newReader(nil, nil) - assert.Error(t, err) - assert.Nil(t, reader) - - reader, err = newReader(newMockClient(), nil) - assert.Error(t, err) - assert.Nil(t, reader) - - options := &ReaderOptions{} - reader, err = newReader(newMockClient(), options) - assert.Error(t, err) - assert.Nil(t, reader) - - options.Topic = newTopicName() - reader, err = newReader(newMockClient(), options) - assert.Error(t, err) - assert.Nil(t, reader) -} - -func TestReader_Next(t *testing.T) { - os.MkdirAll(rmqPath, os.ModePerm) - rmqPathTest := rmqPath + "/test_reader" - rmq := newRocksMQ(t, rmqPathTest) - defer removePath(rmqPath) - client, err := newClient(Options{ - Server: rmq, - }) - assert.NoError(t, err) - assert.NotNil(t, client) - defer client.Close() - - topicName := newTopicName() - - producer, err := client.CreateProducer(ProducerOptions{ - Topic: topicName, - }) - assert.NotNil(t, producer) - assert.NoError(t, err) - - msgNum := 10 - ids := make([]UniqueID, 0) - for i := 0; i < msgNum; i++ { - msg := &ProducerMessage{ - Payload: []byte("message_" + strconv.FormatInt(int64(i), 10)), - } - id, err := producer.Send(msg) - assert.NoError(t, err) - ids = append(ids, id) - } - - reader1, err := newReader(client, &ReaderOptions{ - Topic: topicName, - StartMessageIDInclusive: true, - SubscriptionRolePrefix: "reder1", - }) - assert.NoError(t, err) - assert.NotNil(t, reader1) - assert.Equal(t, reader1.Topic(), topicName) - defer reader1.Close() - - reader1.Seek(ids[1]) - ctx := context.Background() - for i := 1; i < msgNum; i++ { - assert.True(t, reader1.HasNext()) - rMsg, err := reader1.Next(ctx) - assert.NoError(t, err) - assert.Equal(t, rMsg.MsgID, ids[i]) - } - assert.False(t, reader1.HasNext()) - - reader2, err := newReader(client, &ReaderOptions{ - Topic: topicName, - StartMessageIDInclusive: false, - SubscriptionRolePrefix: "reader2", - }) - assert.NoError(t, err) - - reader2.Seek(ids[5]) - for i := 5; i < msgNum-1; i++ { - assert.True(t, reader2.HasNext()) - rMsg, err := reader2.Next(ctx) - assert.NoError(t, err) - assert.Equal(t, rMsg.MsgID, ids[i+1]) - } - assert.False(t, reader2.HasNext()) -} diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq.go b/internal/mq/mqimpl/rocksmq/server/rocksmq.go index 8623ed1169..aa8fbecc2c 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq.go @@ -11,8 +11,6 @@ package server -import "context" - // ProducerMessage that will be written to rocksdb type ProducerMessage struct { Payload []byte @@ -51,10 +49,4 @@ type RocksMQ interface { ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer, error) Notify(topicName, groupName string) - - CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error) - ReaderSeek(topicName string, readerName string, msgID UniqueID) error - Next(ctx context.Context, topicName string, readerName string) (*ConsumerMessage, error) - HasNext(topicName string, readerName string) bool - CloseReader(topicName string, readerName string) } diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index f3694e4643..f742e1bb8c 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -12,7 +12,6 @@ package server import ( - "context" "errors" "fmt" "math" @@ -289,7 +288,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { // Check if topicName contains "/" if strings.Contains(topicName, "/") { - log.Error("rocksmq failed to create topic for topic name contains \"/\"", zap.String("topic", topicName)) + log.Warn("rocksmq failed to create topic for topic name contains \"/\"", zap.String("topic", topicName)) return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName)) } @@ -300,7 +299,7 @@ func (rmq *rocksmq) CreateTopic(topicName string) error { return err } if val != "" { - log.Debug("rocksmq topic already exists ", zap.String("topic", topicName)) + log.Warn("rocksmq topic already exists ", zap.String("topic", topicName)) return nil } @@ -390,12 +389,6 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { topicMu.Delete(topicName) rmq.retentionInfo.topicRetetionTime.Delete(topicName) - // clean up reader - if val, ok := rmq.readers.LoadAndDelete(topicName); ok { - for _, reader := range val.([]*rocksmqReader) { - reader.Close() - } - } log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil } @@ -533,7 +526,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen)) if err != nil { - log.Error("RocksMQ: alloc id failed.", zap.Error(err)) return []UniqueID{}, err } allocTime := time.Since(start).Milliseconds() @@ -558,7 +550,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni defer opts.Destroy() err = rmq.store.Write(opts, batch) if err != nil { - log.Debug("RocksMQ: write batch failed") return []UniqueID{}, err } writeTime := time.Since(start).Milliseconds() @@ -573,16 +564,6 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni } } - // Notify reader - if val, ok := rmq.readers.Load(topicName); ok { - for _, reader := range val.([]*rocksmqReader) { - select { - case reader.readerMutex <- struct{}{}: - default: - } - } - } - // Update message page info err = rmq.updatePageInfo(topicName, msgIDs, msgSizes) if err != nil { @@ -688,7 +669,6 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum offset++ msgID, err := strconv.ParseInt(strKey[len(topicName)+1:], 10, 64) if err != nil { - log.Warn("RocksMQ: parse int " + strKey[len(topicName)+1:] + " failed") val.Free() return nil, err } @@ -751,7 +731,6 @@ func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) err key := constructCurrentID(topicName, groupName) _, ok := rmq.consumersID.Load(key) if !ok { - log.Warn("RocksMQ: channel " + key + " not exists") return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName) } @@ -760,7 +739,6 @@ func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) err defer opts.Destroy() val, err := rmq.store.Get(opts, []byte(storeKey)) if err != nil { - log.Warn("RocksMQ: get " + storeKey + " failed") return err } defer val.Free() @@ -800,7 +778,6 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err err := rmq.seek(topicName, groupName, msgID) if err != nil { - log.Debug("failed to seek", zap.String("topic", topicName), zap.String("group", groupName), zap.Uint64("msgId", uint64(msgID)), zap.Error(err)) return err } log.Debug("successfully seek", zap.String("topic", topicName), zap.String("group", groupName), zap.Uint64("msgId", uint64(msgID))) @@ -817,7 +794,6 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { key := constructCurrentID(topicName, groupName) _, ok := rmq.consumersID.Load(key) if !ok { - log.Warn("RocksMQ: channel " + key + " not exists") return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName) } @@ -968,127 +944,3 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID) } return nil } - -// CreateReader create a reader for topic and generate reader name -func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error) { - if rmq.isClosed() { - return "", errors.New(RmqNotServingErrMsg) - } - if _, ok := topicMu.Load(topicName); !ok { - return "", fmt.Errorf("topic=%s not exist", topicName) - } - readOpts := gorocksdb.NewDefaultReadOptions() - readOpts.SetPrefixSameAsStart(true) - iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.store, typeutil.AddOne(topicName+"/"), readOpts) - dataKey := path.Join(topicName, strconv.FormatInt(startMsgID, 10)) - iter.Seek([]byte(dataKey)) - // if iterate fail - if err := iter.Err(); err != nil { - return "", err - } - - nowTs, err := getNowTs(rmq.idAllocator) - if err != nil { - return "", errors.New("Can't get current ts from rocksmq idAllocator") - } - readerName := subscriptionRolePrefix + ReaderNamePrefix + strconv.FormatInt(nowTs, 10) - - reader := &rocksmqReader{ - store: rmq.store, - topic: topicName, - readerName: readerName, - readOpts: readOpts, - iter: iter, - currentID: startMsgID, - messageIDInclusive: messageIDInclusive, - readerMutex: make(chan struct{}, 1), - } - if vals, ok := rmq.readers.Load(topicName); ok { - readers := vals.([]*rocksmqReader) - readers = append(readers, reader) - rmq.readers.Store(topicName, readers) - } else { - readers := make([]*rocksmqReader, 1) - readers[0] = reader - rmq.readers.Store(topicName, readers) - } - return readerName, nil -} - -func (rmq *rocksmq) getReader(topicName, readerName string) *rocksmqReader { - if vals, ok := rmq.readers.Load(topicName); ok { - for _, v := range vals.([]*rocksmqReader) { - if v.readerName == readerName { - return v - } - } - } - return nil -} - -func (rmq *rocksmq) getAndDeleteReader(topicName, readerName string) *rocksmqReader { - if vals, ok := rmq.readers.Load(topicName); ok { - readers := vals.([]*rocksmqReader) - for i, v := range vals.([]*rocksmqReader) { - if v.readerName == readerName { - readers[i] = readers[len(readers)-1] - rmq.readers.Store(topicName, readers[:len(readers)-1]) - return v - } - } - } - return nil -} - -// ReaderSeek seek a reader to the pointed position -func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID UniqueID) error { - if rmq.isClosed() { - return errors.New(RmqNotServingErrMsg) - } - reader := rmq.getReader(topicName, readerName) - if reader == nil { - log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName)) - return fmt.Errorf("reader not exist, topic %s, reader %s", topicName, readerName) - } - reader.Seek(msgID) - return nil -} - -// Next get the next message of reader -func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName string) (*ConsumerMessage, error) { - if rmq.isClosed() { - return nil, errors.New(RmqNotServingErrMsg) - } - reader := rmq.getReader(topicName, readerName) - if reader == nil { - return nil, fmt.Errorf("reader of %s doesn't exist", topicName) - } - return reader.Next(ctx) -} - -// HasNext judge whether reader has next message -func (rmq *rocksmq) HasNext(topicName string, readerName string) bool { - if rmq.isClosed() { - return false - } - reader := rmq.getReader(topicName, readerName) - if reader == nil { - log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName)) - return false - } - return reader.HasNext() -} - -// CloseReader close a reader -func (rmq *rocksmq) CloseReader(topicName string, readerName string) { - if rmq.isClosed() { - return - } - reader := rmq.getAndDeleteReader(topicName, readerName) - if reader == nil { - log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName)) - return - } - reader.Close() - reader = nil -} diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index 4a345f57c2..d3698c429c 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -12,7 +12,6 @@ package server import ( - "context" "fmt" "log" "os" @@ -726,128 +725,6 @@ func TestRocksmq_SeekToLatest(t *testing.T) { } } -func TestRocksmq_Reader(t *testing.T) { - ep := etcdEndpoints() - etcdCli, err := etcd.GetRemoteEtcdClient(ep) - assert.Nil(t, err) - defer etcdCli.Close() - etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root") - assert.Nil(t, err) - defer etcdKV.Close() - idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) - _ = idAllocator.Initialize() - - name := "/tmp/rocksmq_reader" - defer os.RemoveAll(name) - kvName := name + "_meta_kv" - _ = os.RemoveAll(kvName) - defer os.RemoveAll(kvName) - rmq, err := NewRocksMQ(name, idAllocator) - assert.Nil(t, err) - defer rmq.Close() - - channelName := newChanName() - _, err = rmq.CreateReader(channelName, 0, true, "") - assert.Error(t, err) - err = rmq.CreateTopic(channelName) - assert.Nil(t, err) - defer rmq.DestroyTopic(channelName) - loopNum := 100 - - pMsgs := make([]ProducerMessage, loopNum) - for i := 0; i < loopNum; i++ { - msg := "message_" + strconv.Itoa(i+loopNum) - pMsg := ProducerMessage{Payload: []byte(msg)} - pMsgs[i] = pMsg - } - ids, err := rmq.Produce(channelName, pMsgs) - assert.Nil(t, err) - assert.Equal(t, len(ids), loopNum) - - readerName1, err := rmq.CreateReader(channelName, ids[0], true, "test-reader-true") - assert.NoError(t, err) - rmq.ReaderSeek(channelName, readerName1, ids[0]) - ctx := context.Background() - for i := 0; i < loopNum; i++ { - assert.Equal(t, true, rmq.HasNext(channelName, readerName1)) - msg, err := rmq.Next(ctx, channelName, readerName1) - assert.NoError(t, err) - assert.Equal(t, msg.MsgID, ids[i]) - } - assert.False(t, rmq.HasNext(channelName, readerName1)) - - readerName2, err := rmq.CreateReader(channelName, ids[0], false, "test-reader-false") - assert.NoError(t, err) - - rmq.ReaderSeek(channelName, readerName2, ids[0]) - for i := 0; i < loopNum-1; i++ { - assert.Equal(t, true, rmq.HasNext(channelName, readerName2)) - msg, err := rmq.Next(ctx, channelName, readerName2) - assert.NoError(t, err) - assert.Equal(t, msg.MsgID, ids[i+1]) - } - assert.False(t, rmq.HasNext(channelName, readerName2)) -} - -func TestReader_CornerCase(t *testing.T) { - ep := etcdEndpoints() - etcdCli, err := etcd.GetRemoteEtcdClient(ep) - assert.Nil(t, err) - defer etcdCli.Close() - etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root") - defer etcdKV.Close() - idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) - _ = idAllocator.Initialize() - - name := "/tmp/rocksmq_reader_cornercase" - defer os.RemoveAll(name) - kvName := name + "_meta_kv" - _ = os.RemoveAll(kvName) - defer os.RemoveAll(kvName) - rmq, err := NewRocksMQ(name, idAllocator) - assert.Nil(t, err) - defer rmq.Close() - - channelName := newChanName() - err = rmq.CreateTopic(channelName) - assert.Nil(t, err) - defer rmq.DestroyTopic(channelName) - loopNum := 10 - - pMsgs := make([]ProducerMessage, loopNum) - for i := 0; i < loopNum; i++ { - msg := "message_" + strconv.Itoa(i+loopNum) - pMsg := ProducerMessage{Payload: []byte(msg)} - pMsgs[i] = pMsg - } - ids, err := rmq.Produce(channelName, pMsgs) - assert.Nil(t, err) - assert.Equal(t, len(ids), loopNum) - - readerName, err := rmq.CreateReader(channelName, ids[loopNum-1], true, "cornercase") - assert.NoError(t, err) - - ctx := context.Background() - msg, err := rmq.Next(ctx, channelName, readerName) - assert.NoError(t, err) - assert.Equal(t, msg.MsgID, ids[loopNum-1]) - - var extraIds []UniqueID - go func() { - time.Sleep(1 * time.Second) - extraMsgs := make([]ProducerMessage, 1) - msg := "extra_message" - extraMsgs[0] = ProducerMessage{Payload: []byte(msg)} - extraIds, _ = rmq.Produce(channelName, extraMsgs) - // assert.NoError(t, er) - assert.Equal(t, 1, len(extraIds)) - }() - - msg, err = rmq.Next(ctx, channelName, readerName) - assert.NoError(t, err) - assert.Equal(t, string(msg.Payload), "extra_message") -} - func TestRocksmq_GetLatestMsg(t *testing.T) { ep := etcdEndpoints() etcdCli, err := etcd.GetRemoteEtcdClient(ep) @@ -953,13 +830,6 @@ func TestRocksmq_Close(t *testing.T) { assert.Error(t, rmq.seek("", "", 0)) assert.Error(t, rmq.SeekToLatest("", "")) - _, err = rmq.CreateReader("", 0, false, "") - assert.Error(t, err) - rmq.ReaderSeek("", "", 0) - _, err = rmq.Next(nil, "", "") - assert.Error(t, err) - rmq.HasNext("", "") - rmq.CloseReader("", "") } func TestRocksmq_SeekWithNoConsumerError(t *testing.T) { diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_reader.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_reader.go deleted file mode 100644 index ec213cb094..0000000000 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_reader.go +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package server - -import ( - "context" - "errors" - "fmt" - "path" - "strconv" - - rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/tecbot/gorocksdb" -) - -type rocksmqReader struct { - store *gorocksdb.DB - topic string - readerName string - - readOpts *gorocksdb.ReadOptions - iter *rocksdbkv.RocksIterator - - currentID UniqueID - messageIDInclusive bool - readerMutex chan struct{} -} - -//Seek seek the rocksmq reader to the pointed position -func (rr *rocksmqReader) Seek(msgID UniqueID) { //nolint:govet - rr.currentID = msgID - dataKey := path.Join(rr.topic, strconv.FormatInt(msgID, 10)) - rr.iter.Seek([]byte(dataKey)) - if !rr.messageIDInclusive { - rr.currentID++ - rr.iter.Next() - } -} - -func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) { - var err error - iter := rr.iter - - var msg *ConsumerMessage - getMsg := func() { - key := iter.Key() - val := iter.Value() - tmpKey := string(key.Data()) - if key != nil { - key.Free() - } - - var msgID UniqueID - msgID, err = strconv.ParseInt(tmpKey[len(rr.topic)+1:], 10, 64) - msg = &ConsumerMessage{ - MsgID: msgID, - } - origData := val.Data() - dataLen := len(origData) - if dataLen > 0 { - msg.Payload = make([]byte, dataLen) - copy(msg.Payload, origData) - } - if val != nil { - val.Free() - } - iter.Next() - rr.currentID = msgID - } - if iter.Valid() { - getMsg() - return msg, err - } - // TODO this is the same logic as pulsar reader, but do we really need to read till the end of the stream - select { - case <-ctx.Done(): - log.Debug("Stop get next reader message!") - return nil, ctx.Err() - case _, ok := <-rr.readerMutex: - if !ok { - log.Warn("reader Mutex closed") - return nil, fmt.Errorf("reader Mutex closed") - } - rr.iter.Close() - rr.iter = rocksdbkv.NewRocksIteratorWithUpperBound(rr.store, typeutil.AddOne(rr.topic+"/"), rr.readOpts) - dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10)) - iter = rr.iter - iter.Seek([]byte(dataKey)) - if !iter.Valid() { - return nil, errors.New("reader iterater is still invalid after receive mutex") - } - getMsg() - return msg, err - } -} - -func (rr *rocksmqReader) HasNext() bool { - if rr.iter.Valid() { - return true - } - - select { - case _, ok := <-rr.readerMutex: - if !ok { - return false - } - rr.iter.Close() - rr.iter = rocksdbkv.NewRocksIteratorWithUpperBound(rr.store, typeutil.AddOne(rr.topic+"/"), rr.readOpts) - dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10)) - rr.iter.Seek([]byte(dataKey)) - return rr.iter.Valid() - default: - return false - } -} - -func (rr *rocksmqReader) Close() { - close(rr.readerMutex) - rr.iter.Close() - rr.readOpts.Destroy() -}