diff --git a/internal/util/mqclient/rmq_reader.go b/internal/util/mqclient/rmq_reader.go new file mode 100644 index 0000000000..acd1ccff7b --- /dev/null +++ b/internal/util/mqclient/rmq_reader.go @@ -0,0 +1,39 @@ +package mqclient + +import ( + "context" + + "github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq" +) + +var _ Reader = (*rmqReader)(nil) + +type rmqReader struct { + r rocksmq.Reader +} + +func (rr *rmqReader) Topic() string { + return rr.r.Topic() +} + +func (rr *rmqReader) Next(ctx context.Context) (Message, error) { + rMsg, err := rr.r.Next(ctx) + if err != nil { + return nil, err + } + msg := &rmqMessage{msg: rMsg} + return msg, nil +} + +func (rr *rmqReader) HasNext() bool { + return rr.r.HasNext() +} + +func (rr *rmqReader) Seek(id MessageID) error { + msgID := id.(*rmqID).messageID + return rr.r.Seek(msgID) +} + +func (rr *rmqReader) Close() { + rr.r.Close() +} diff --git a/internal/util/rocksmq/client/rocksmq/client.go b/internal/util/rocksmq/client/rocksmq/client.go index 715dd77b2d..7046acca3b 100644 --- a/internal/util/rocksmq/client/rocksmq/client.go +++ b/internal/util/rocksmq/client/rocksmq/client.go @@ -39,6 +39,8 @@ type Client interface { // Create a consumer instance and subscribe a topic Subscribe(options ConsumerOptions) (Consumer, error) + CreateReader(options ReaderOptions) (Reader, error) + // Close the client and free associated resources Close() } diff --git a/internal/util/rocksmq/client/rocksmq/client_impl.go b/internal/util/rocksmq/client/rocksmq/client_impl.go index cce60c3618..a40d6af3bb 100644 --- a/internal/util/rocksmq/client/rocksmq/client_impl.go +++ b/internal/util/rocksmq/client/rocksmq/client_impl.go @@ -158,6 +158,11 @@ func (c *client) consume(consumer *consumer) { } } +func (c *client) CreateReader(readerOptions ReaderOptions) (Reader, error) { + reader, err := newReader(c, &readerOptions) + return reader, err +} + // Close close the channel to notify rocksmq to stop operation and close rocksmq server func (c *client) Close() { // TODO(yukun): Should call server.close() here? diff --git a/internal/util/rocksmq/client/rocksmq/reader_impl.go b/internal/util/rocksmq/client/rocksmq/reader_impl.go new file mode 100644 index 0000000000..e40c54d1aa --- /dev/null +++ b/internal/util/rocksmq/client/rocksmq/reader_impl.go @@ -0,0 +1,76 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package rocksmq + +import "context" + +type reader struct { + c *client + topic string + name string + startMessageID UniqueID + startMessageIDInclusive bool +} + +func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) { + if c == nil { + return nil, newError(InvalidConfiguration, "client is nil") + } + if readerOptions == nil { + return nil, newError(InvalidConfiguration, "options is nil") + } + if readerOptions.Topic == "" { + return nil, newError(InvalidConfiguration, "topic is empty") + } + reader := &reader{ + c: c, + topic: readerOptions.Topic, + name: readerOptions.Name, + startMessageID: readerOptions.StartMessageID, + startMessageIDInclusive: readerOptions.StartMessageIDInclusive, + } + if c.server == nil { + return nil, newError(InvalidConfiguration, "rmq server in client is nil") + } + err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive) + return reader, err +} + +func (r *reader) Topic() string { + return r.topic +} + +func (r *reader) Next(ctx context.Context) (Message, error) { + cMsg, err := r.c.server.Next(ctx, r.topic, r.startMessageIDInclusive) + if err != nil { + return Message{}, err + } + msg := Message{ + MsgID: cMsg.MsgID, + Payload: cMsg.Payload, + Topic: r.topic, + } + return msg, nil +} + +func (r *reader) HasNext() bool { + return r.c.server.HasNext(r.topic, r.startMessageIDInclusive) +} + +func (r *reader) Close() { + r.c.server.CloseReader(r.topic) +} + +func (r *reader) Seek(msgID UniqueID) error { //nolint:govet + r.c.server.ReaderSeek(r.topic, msgID) + return nil +} diff --git a/internal/util/rocksmq/client/rocksmq/reader_impl_test.go b/internal/util/rocksmq/client/rocksmq/reader_impl_test.go new file mode 100644 index 0000000000..e925b8f402 --- /dev/null +++ b/internal/util/rocksmq/client/rocksmq/reader_impl_test.go @@ -0,0 +1,99 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package rocksmq + +import ( + "context" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_NewReader(t *testing.T) { + reader, err := newReader(nil, nil) + assert.Error(t, err) + assert.Nil(t, reader) + + reader, err = newReader(newMockClient(), nil) + assert.Error(t, err) + assert.Nil(t, reader) + + options := &ReaderOptions{} + reader, err = newReader(newMockClient(), options) + assert.Error(t, err) + assert.Nil(t, reader) + + options.Topic = newTopicName() + reader, err = newReader(newMockClient(), options) + assert.Error(t, err) + assert.Nil(t, reader) +} + +func TestReader_Next(t *testing.T) { + rmqPath := "/tmp/milvus/test_reader" + rmq := newRocksMQ(rmqPath) + defer removePath(rmqPath) + client, err := newClient(ClientOptions{ + Server: rmq, + }) + assert.NoError(t, err) + assert.NotNil(t, client) + defer client.Close() + + topicName := newTopicName() + reader, err := newReader(client, &ReaderOptions{ + Topic: topicName, + StartMessageIDInclusive: true, + }) + assert.NoError(t, err) + assert.NotNil(t, reader) + assert.Equal(t, reader.Topic(), topicName) + defer reader.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + }) + assert.NotNil(t, producer) + assert.NoError(t, err) + + msgNum := 10 + ids := make([]UniqueID, 0) + for i := 0; i < msgNum; i++ { + msg := &ProducerMessage{ + Payload: []byte("message_" + strconv.FormatInt(int64(i), 10)), + } + id, err := producer.Send(msg) + assert.NoError(t, err) + ids = append(ids, id) + } + + reader.Seek(ids[1]) + ctx := context.Background() + for i := 1; i < msgNum; i++ { + assert.True(t, reader.HasNext()) + rMsg, err := reader.Next(ctx) + assert.NoError(t, err) + assert.Equal(t, rMsg.MsgID, ids[i]) + } + assert.False(t, reader.HasNext()) + + reader.startMessageIDInclusive = false + reader.Seek(ids[5]) + for i := 5; i < msgNum-1; i++ { + assert.True(t, reader.HasNext()) + rMsg, err := reader.Next(ctx) + assert.NoError(t, err) + assert.Equal(t, rMsg.MsgID, ids[i+1]) + } + assert.False(t, reader.HasNext()) +} diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq.go b/internal/util/rocksmq/server/rocksmq/rocksmq.go index 824f8ac94f..a9aa3e2665 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq.go @@ -11,6 +11,8 @@ package rocksmq +import "context" + // ProducerMessage that will be write to rocksdb type ProducerMessage struct { Payload []byte @@ -47,4 +49,10 @@ type RocksMQ interface { ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer) Notify(topicName, groupName string) + + CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) error + ReaderSeek(topicName string, msgID UniqueID) + Next(ctx context.Context, topicName string, messageIDInclusive bool) (ConsumerMessage, error) + HasNext(topicName string, messageIDInclusive bool) bool + CloseReader(topicName string) } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 0e9ef0327f..ed69a05e76 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -12,6 +12,7 @@ package rocksmq import ( + "context" "errors" "fmt" "math" @@ -125,6 +126,7 @@ type rocksmq struct { ackedMu sync.Map retentionInfo *retentionInfo + readers sync.Map } // NewRocksMQ step: @@ -159,6 +161,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro storeMu: &sync.Mutex{}, consumers: sync.Map{}, ackedMu: sync.Map{}, + readers: sync.Map{}, } ri, err := initRetentionInfo(kv, db) @@ -306,6 +309,13 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { // clean up retention info topicMu.Delete(topicName) rmq.retentionInfo.topics.Delete(topicName) + + // clean up reader + if val, ok := rmq.readers.LoadAndDelete(topicName); ok { + if reader, rOk := val.(*rocksmqReader); rOk { + reader.Close() + } + } log.Debug("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds())) return nil } @@ -490,6 +500,16 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni } } + // Notify reader + if val, ok := rmq.readers.Load(topicName); ok { + if reader, rOk := val.(*rocksmqReader); rOk { + select { + case reader.readerMutex <- struct{}{}: + default: + } + } + } + // Update message page info // TODO(yukun): Should this be in a go routine err = rmq.updatePageInfo(topicName, msgIDs, msgSizes) @@ -893,3 +913,66 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID) } return nil } + +func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) error { + readOpts := gorocksdb.NewDefaultReadOptions() + readOpts.SetPrefixSameAsStart(true) + iter := rmq.store.NewIterator(readOpts) + fixChanName, err := fixChannelName(topicName) + if err != nil { + log.Debug("RocksMQ: fixChannelName " + topicName + " failed") + return err + } + dataKey := path.Join(fixChanName, strconv.FormatInt(startMsgID, 10)) + iter.Seek([]byte(dataKey)) + if !iter.Valid() { + log.Warn("iterator of startMsgID is invalid") + } + + reader := &rocksmqReader{ + store: rmq.store, + topic: topicName, + readOpts: readOpts, + iter: iter, + currentID: startMsgID, + messageIDInclusive: messageIDInclusive, + readerMutex: make(chan struct{}, 1), + } + rmq.readers.Store(topicName, reader) + + return nil +} + +func (rmq *rocksmq) ReaderSeek(topicName string, msgID UniqueID) { + if val, ok := rmq.readers.Load(topicName); ok { + if reader, rOk := val.(*rocksmqReader); rOk { + reader.Seek(msgID) + } + } +} + +func (rmq *rocksmq) Next(ctx context.Context, topicName string, messageIDInclusive bool) (ConsumerMessage, error) { + if val, ok := rmq.readers.Load(topicName); ok { + if reader, rOk := val.(*rocksmqReader); rOk { + return reader.Next(ctx, messageIDInclusive) + } + } + return ConsumerMessage{}, fmt.Errorf("reader of %s doesn't exist", topicName) +} + +func (rmq *rocksmq) HasNext(topicName string, messageIDInclusive bool) bool { + if val, ok := rmq.readers.Load(topicName); ok { + if reader, rOk := val.(*rocksmqReader); rOk { + return reader.HasNext(messageIDInclusive) + } + } + return false +} + +func (rmq *rocksmq) CloseReader(topicName string) { + if val, ok := rmq.readers.Load(topicName); ok { + if reader, rOk := val.(*rocksmqReader); rOk { + reader.Close() + } + } +} diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index e7588a27c5..463cf99a5c 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -12,6 +12,8 @@ package rocksmq import ( + "context" + "fmt" "log" "os" "strconv" @@ -44,6 +46,14 @@ func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator { return idAllocator } +func newChanName() string { + return fmt.Sprintf("my-chan-%v", time.Now().Nanosecond()) +} + +func newGroupName() string { + return fmt.Sprintf("my-group-%v", time.Now().Nanosecond()) +} + func Test_FixChannelName(t *testing.T) { name := "abcd" fixName, err := fixChannelName(name) @@ -605,3 +615,59 @@ func TestRocksmq_SeekToLatest(t *testing.T) { assert.Nil(t, err) assert.Equal(t, len(cMsgs), 0) } + +func TestRocksmq_Reader(t *testing.T) { + ep := etcdEndpoints() + etcdKV, err := etcdkv.NewEtcdKV(ep, "/etcd/test/root") + assert.Nil(t, err) + defer etcdKV.Close() + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) + _ = idAllocator.Initialize() + + name := "/tmp/rocksmq_reader" + defer os.RemoveAll(name) + kvName := name + "_meta_kv" + _ = os.RemoveAll(kvName) + defer os.RemoveAll(kvName) + rmq, err := NewRocksMQ(name, idAllocator) + assert.Nil(t, err) + defer rmq.Close() + + channelName := newChanName() + err = rmq.CreateTopic(channelName) + assert.Nil(t, err) + defer rmq.DestroyTopic(channelName) + loopNum := 100 + + err = rmq.CreateReader(channelName, 0, true) + assert.NoError(t, err) + + pMsgs := make([]ProducerMessage, loopNum) + for i := 0; i < loopNum; i++ { + msg := "message_" + strconv.Itoa(i+loopNum) + pMsg := ProducerMessage{Payload: []byte(msg)} + pMsgs[i] = pMsg + } + ids, err := rmq.Produce(channelName, pMsgs) + assert.Nil(t, err) + assert.Equal(t, len(ids), loopNum) + + rmq.ReaderSeek(channelName, ids[0]) + ctx := context.Background() + for i := 0; i < loopNum; i++ { + assert.Equal(t, true, rmq.HasNext(channelName, true)) + msg, err := rmq.Next(ctx, channelName, true) + assert.NoError(t, err) + assert.Equal(t, msg.MsgID, ids[i]) + } + assert.False(t, rmq.HasNext(channelName, true)) + + rmq.ReaderSeek(channelName, ids[0]) + for i := 0; i < loopNum-1; i++ { + assert.Equal(t, true, rmq.HasNext(channelName, false)) + msg, err := rmq.Next(ctx, channelName, false) + assert.NoError(t, err) + assert.Equal(t, msg.MsgID, ids[i+1]) + } + assert.False(t, rmq.HasNext(channelName, false)) +} diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go b/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go new file mode 100644 index 0000000000..caaacbaed0 --- /dev/null +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go @@ -0,0 +1,187 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package rocksmq + +import ( + "context" + "fmt" + "path" + "strconv" + "sync" + + "github.com/milvus-io/milvus/internal/log" + "github.com/tecbot/gorocksdb" +) + +type rocksmqReader struct { + store *gorocksdb.DB + topic string + + readOpts *gorocksdb.ReadOptions + iter *gorocksdb.Iterator + + currentID UniqueID + messageIDInclusive bool + readerMutex chan struct{} +} + +func (rr *rocksmqReader) Seek(msgID UniqueID) { //nolint:govet + rr.currentID = msgID + select { + case rr.readerMutex <- struct{}{}: + default: + } +} + +func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (ConsumerMessage, error) { + ll, ok := topicMu.Load(rr.topic) + if !ok { + return ConsumerMessage{}, fmt.Errorf("topic name = %s not exist", rr.topic) + } + lock, ok := ll.(*sync.Mutex) + if !ok { + return ConsumerMessage{}, fmt.Errorf("get mutex failed, topic name = %s", rr.topic) + } + lock.Lock() + defer lock.Unlock() + fixChanName, err := fixChannelName(rr.topic) + if err != nil { + log.Debug("RocksMQ: fixChannelName " + rr.topic + " failed") + return ConsumerMessage{}, err + } + readOpts := gorocksdb.NewDefaultReadOptions() + defer readOpts.Destroy() + + var msg ConsumerMessage + readOpts.SetPrefixSameAsStart(true) + iter := rr.store.NewIterator(readOpts) + defer iter.Close() + + for { + select { + case <-ctx.Done(): + log.Debug("Stop get next reader message!") + return ConsumerMessage{}, nil + case <-rr.readerMutex: + dataKey := path.Join(fixChanName, strconv.FormatInt(rr.currentID, 10)) + if iter.Seek([]byte(dataKey)); !iter.Valid() { + continue + } + if messageIDInclusive { + val, err := rr.store.Get(readOpts, []byte(dataKey)) + if err != nil { + return ConsumerMessage{}, err + } + if !val.Exists() { + continue + } + msg = ConsumerMessage{ + MsgID: rr.currentID, + } + origData := val.Data() + dataLen := len(origData) + if dataLen == 0 { + msg.Payload = nil + } else { + msg.Payload = make([]byte, dataLen) + copy(msg.Payload, origData) + } + val.Free() + + // Update nextID in readerOffset + var nextID UniqueID + iter.Next() + if iter.Valid() { + key := iter.Key() + nextID, err = strconv.ParseInt(string(key.Data())[FixedChannelNameLen+1:], 10, 64) + if key.Exists() { + key.Free() + } + if err != nil { + return ConsumerMessage{}, err + } + rr.readerMutex <- struct{}{} + } else { + nextID = rr.currentID + 1 + } + rr.currentID = nextID + } else { + iter.Next() + if iter.Valid() { + key := iter.Key() + tmpKey := string(key.Data()) + key.Free() + id, err := strconv.ParseInt(tmpKey[FixedChannelNameLen+1:], 10, 64) + if err != nil { + return ConsumerMessage{}, err + } + val := iter.Value() + msg = ConsumerMessage{ + MsgID: id, + } + origData := val.Data() + dataLen := len(origData) + if dataLen == 0 { + msg.Payload = nil + } else { + msg.Payload = make([]byte, dataLen) + copy(msg.Payload, origData) + } + val.Free() + rr.currentID = id + rr.readerMutex <- struct{}{} + } + } + return msg, nil + } + } +} + +func (rr *rocksmqReader) HasNext(messageIDInclusive bool) bool { + ll, ok := topicMu.Load(rr.topic) + if !ok { + return false + } + lock, ok := ll.(*sync.Mutex) + if !ok { + return false + } + lock.Lock() + defer lock.Unlock() + fixChanName, err := fixChannelName(rr.topic) + if err != nil { + log.Debug("RocksMQ: fixChannelName " + rr.topic + " failed") + return false + } + readOpts := gorocksdb.NewDefaultReadOptions() + defer readOpts.Destroy() + readOpts.SetPrefixSameAsStart(true) + iter := rr.store.NewIterator(readOpts) + defer iter.Close() + + dataKey := path.Join(fixChanName, strconv.FormatInt(rr.currentID, 10)) + iter.Seek([]byte(dataKey)) + if !iter.Valid() { + return false + } + if messageIDInclusive { + return true + } + iter.Next() + return iter.Valid() +} + +func (rr *rocksmqReader) Close() { + close(rr.readerMutex) + rr.iter.Close() + rr.readOpts.Destroy() +}