diff --git a/internal/util/rocksmq/client/rocksmq/reader_impl.go b/internal/util/rocksmq/client/rocksmq/reader_impl.go index e40c54d1aa..3da1d58f1f 100644 --- a/internal/util/rocksmq/client/rocksmq/reader_impl.go +++ b/internal/util/rocksmq/client/rocksmq/reader_impl.go @@ -41,8 +41,12 @@ func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) { if c.server == nil { return nil, newError(InvalidConfiguration, "rmq server in client is nil") } - err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive) - return reader, err + name, err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive) + if err != nil { + return nil, err + } + reader.name = name + return reader, nil } func (r *reader) Topic() string { @@ -50,7 +54,7 @@ func (r *reader) Topic() string { } func (r *reader) Next(ctx context.Context) (Message, error) { - cMsg, err := r.c.server.Next(ctx, r.topic, r.startMessageIDInclusive) + cMsg, err := r.c.server.Next(ctx, r.topic, r.name, r.startMessageIDInclusive) if err != nil { return Message{}, err } @@ -63,14 +67,14 @@ func (r *reader) Next(ctx context.Context) (Message, error) { } func (r *reader) HasNext() bool { - return r.c.server.HasNext(r.topic, r.startMessageIDInclusive) + return r.c.server.HasNext(r.topic, r.name, r.startMessageIDInclusive) } func (r *reader) Close() { - r.c.server.CloseReader(r.topic) + r.c.server.CloseReader(r.topic, r.name) } func (r *reader) Seek(msgID UniqueID) error { //nolint:govet - r.c.server.ReaderSeek(r.topic, msgID) + r.c.server.ReaderSeek(r.topic, r.name, msgID) return nil } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq.go b/internal/util/rocksmq/server/rocksmq/rocksmq.go index a9aa3e2665..ae106e2135 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq.go @@ -50,9 +50,9 @@ type RocksMQ interface { Notify(topicName, groupName string) - CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) error - ReaderSeek(topicName string, msgID UniqueID) - Next(ctx context.Context, topicName string, messageIDInclusive bool) (ConsumerMessage, error) - HasNext(topicName string, messageIDInclusive bool) bool - CloseReader(topicName string) + CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) (string, error) + ReaderSeek(topicName string, readerName string, msgID UniqueID) + Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error) + HasNext(topicName string, readerName string, messageIDInclusive bool) bool + CloseReader(topicName string, readerName string) } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 8854f404e0..9edf15af70 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus/internal/kv" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/tecbot/gorocksdb" @@ -53,7 +54,8 @@ const ( AckedSizeTitle = "acked_size/" LastRetTsTitle = "last_retention_ts/" - CurrentIDSuffix = "current_id" + CurrentIDSuffix = "current_id" + ReaderNamePrefix = "reader-" ) /** @@ -115,6 +117,19 @@ func checkRetention() bool { return RocksmqRetentionTimeInMinutes != -1 && RocksmqRetentionSizeInMB != -1 } +func getNowTs(idAllocator allocator.GIDAllocator) (int64, error) { + err := idAllocator.UpdateID() + if err != nil { + return 0, err + } + newID, err := idAllocator.AllocOne() + if err != nil { + return 0, err + } + nowTs, _ := tsoutil.ParseTS(uint64(newID)) + return nowTs.Unix(), err +} + var topicMu sync.Map = sync.Map{} type rocksmq struct { @@ -312,7 +327,7 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error { // clean up reader if val, ok := rmq.readers.LoadAndDelete(topicName); ok { - if reader, rOk := val.(*rocksmqReader); rOk { + for _, reader := range val.([]*rocksmqReader) { reader.Close() } } @@ -499,7 +514,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni // Notify reader if val, ok := rmq.readers.Load(topicName); ok { - if reader, rOk := val.(*rocksmqReader); rOk { + for _, reader := range val.([]*rocksmqReader) { select { case reader.readerMutex <- struct{}{}: default: @@ -915,65 +930,90 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID) return nil } -func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) error { +func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) (string, error) { readOpts := gorocksdb.NewDefaultReadOptions() readOpts.SetPrefixSameAsStart(true) iter := rmq.store.NewIterator(readOpts) fixChanName, err := fixChannelName(topicName) if err != nil { log.Debug("RocksMQ: fixChannelName " + topicName + " failed") - return err + return "", err } dataKey := path.Join(fixChanName, strconv.FormatInt(startMsgID, 10)) iter.Seek([]byte(dataKey)) if !iter.Valid() { log.Warn("iterator of startMsgID is invalid") } + nowTs, err := getNowTs(rmq.idAllocator) + if err != nil { + return "", errors.New("Can't get current ts from rocksmq idAllocator") + } + readerName := ReaderNamePrefix + strconv.FormatInt(nowTs, 10) reader := &rocksmqReader{ store: rmq.store, topic: topicName, + readerName: readerName, readOpts: readOpts, iter: iter, currentID: startMsgID, messageIDInclusive: messageIDInclusive, readerMutex: make(chan struct{}, 1), } - rmq.readers.Store(topicName, reader) + if vals, ok := rmq.readers.Load(topicName); ok { + readers := vals.([]*rocksmqReader) + readers = append(readers, reader) + rmq.readers.Store(topicName, readers) + } else { + readers := make([]*rocksmqReader, 1) + readers[0] = reader + rmq.readers.Store(topicName, readers) + } + return readerName, nil +} +func (rmq *rocksmq) getReader(topicName, readerName string) *rocksmqReader { + if vals, ok := rmq.readers.Load(topicName); ok { + for _, v := range vals.([]*rocksmqReader) { + if v.readerName == readerName { + return v + } + } + } return nil } -func (rmq *rocksmq) ReaderSeek(topicName string, msgID UniqueID) { - if val, ok := rmq.readers.Load(topicName); ok { - if reader, rOk := val.(*rocksmqReader); rOk { - reader.Seek(msgID) - } +func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID UniqueID) { + reader := rmq.getReader(topicName, readerName) + if reader == nil { + log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName)) + return } + reader.Seek(msgID) } -func (rmq *rocksmq) Next(ctx context.Context, topicName string, messageIDInclusive bool) (ConsumerMessage, error) { - if val, ok := rmq.readers.Load(topicName); ok { - if reader, rOk := val.(*rocksmqReader); rOk { - return reader.Next(ctx, messageIDInclusive) - } +func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error) { + reader := rmq.getReader(topicName, readerName) + if reader == nil { + return ConsumerMessage{}, fmt.Errorf("reader of %s doesn't exist", topicName) } - return ConsumerMessage{}, fmt.Errorf("reader of %s doesn't exist", topicName) + return reader.Next(ctx, messageIDInclusive) } -func (rmq *rocksmq) HasNext(topicName string, messageIDInclusive bool) bool { - if val, ok := rmq.readers.Load(topicName); ok { - if reader, rOk := val.(*rocksmqReader); rOk { - return reader.HasNext(messageIDInclusive) - } +func (rmq *rocksmq) HasNext(topicName string, readerName string, messageIDInclusive bool) bool { + reader := rmq.getReader(topicName, readerName) + if reader == nil { + log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName)) + return false } - return false + return reader.HasNext(messageIDInclusive) } -func (rmq *rocksmq) CloseReader(topicName string) { - if val, ok := rmq.readers.Load(topicName); ok { - if reader, rOk := val.(*rocksmqReader); rOk { - reader.Close() - } +func (rmq *rocksmq) CloseReader(topicName string, readerName string) { + reader := rmq.getReader(topicName, readerName) + if reader == nil { + log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName)) + return } + reader.Close() } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index 463cf99a5c..5060eab8e3 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -639,7 +639,7 @@ func TestRocksmq_Reader(t *testing.T) { defer rmq.DestroyTopic(channelName) loopNum := 100 - err = rmq.CreateReader(channelName, 0, true) + readerName, err := rmq.CreateReader(channelName, 0, true) assert.NoError(t, err) pMsgs := make([]ProducerMessage, loopNum) @@ -652,22 +652,22 @@ func TestRocksmq_Reader(t *testing.T) { assert.Nil(t, err) assert.Equal(t, len(ids), loopNum) - rmq.ReaderSeek(channelName, ids[0]) + rmq.ReaderSeek(channelName, readerName, ids[0]) ctx := context.Background() for i := 0; i < loopNum; i++ { - assert.Equal(t, true, rmq.HasNext(channelName, true)) - msg, err := rmq.Next(ctx, channelName, true) + assert.Equal(t, true, rmq.HasNext(channelName, readerName, true)) + msg, err := rmq.Next(ctx, channelName, readerName, true) assert.NoError(t, err) assert.Equal(t, msg.MsgID, ids[i]) } - assert.False(t, rmq.HasNext(channelName, true)) + assert.False(t, rmq.HasNext(channelName, readerName, true)) - rmq.ReaderSeek(channelName, ids[0]) + rmq.ReaderSeek(channelName, readerName, ids[0]) for i := 0; i < loopNum-1; i++ { - assert.Equal(t, true, rmq.HasNext(channelName, false)) - msg, err := rmq.Next(ctx, channelName, false) + assert.Equal(t, true, rmq.HasNext(channelName, readerName, false)) + msg, err := rmq.Next(ctx, channelName, readerName, false) assert.NoError(t, err) assert.Equal(t, msg.MsgID, ids[i+1]) } - assert.False(t, rmq.HasNext(channelName, false)) + assert.False(t, rmq.HasNext(channelName, readerName, false)) } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go b/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go index caaacbaed0..db09eb6b47 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go @@ -23,8 +23,9 @@ import ( ) type rocksmqReader struct { - store *gorocksdb.DB - topic string + store *gorocksdb.DB + topic string + readerName string readOpts *gorocksdb.ReadOptions iter *gorocksdb.Iterator @@ -71,7 +72,11 @@ func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (Con case <-ctx.Done(): log.Debug("Stop get next reader message!") return ConsumerMessage{}, nil - case <-rr.readerMutex: + case _, ok := <-rr.readerMutex: + if !ok { + log.Warn("reader Mutex closed") + return ConsumerMessage{}, nil + } dataKey := path.Join(fixChanName, strconv.FormatInt(rr.currentID, 10)) if iter.Seek([]byte(dataKey)); !iter.Valid() { continue