From 080eaa9a5cc3cfe2c080267ef6f8ba3947c82d3a Mon Sep 17 00:00:00 2001 From: yukun Date: Fri, 26 Nov 2021 11:27:16 +0800 Subject: [PATCH] Add isClosed for rocksmq (#12267) Signed-off-by: fishpenguin --- .../rocksmq/server/rocksmq/rocksmq_impl.go | 61 ++++++++++++++++++- .../server/rocksmq/rocksmq_impl_test.go | 38 ++++++++++++ 2 files changed, 96 insertions(+), 3 deletions(-) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index e99d2cec02..574e7199f1 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/milvus-io/milvus/internal/allocator" @@ -36,6 +37,9 @@ import ( // UniqueID is the type of message ID type UniqueID = typeutil.UniqueID +// Rocksmq state +type RmqState = int64 + // RocksmqPageSize is the size of a message page, default 2GB var RocksmqPageSize int64 = 2 << 30 @@ -57,6 +61,15 @@ const ( CurrentIDSuffix = "current_id" ReaderNamePrefix = "reader-" + + RmqNotServingErrMsg = "Rocksmq is not serving" +) + +const ( + // RmqStateStopped state stands for just created or stopped `Rocksmq` instance + RmqStateStopped RmqState = 0 + // RmqStateHealthy state stands for healthy `Rocksmq` instance + RmqStateHealthy RmqState = 1 ) /** @@ -143,6 +156,7 @@ type rocksmq struct { retentionInfo *retentionInfo readers sync.Map + state RmqState } // NewRocksMQ step: @@ -189,18 +203,21 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro if checkRetention() { rmq.retentionInfo.startRetentionInfo() } - + atomic.StoreInt64(&rmq.state, RmqStateHealthy) return rmq, nil } +func (rmq *rocksmq) isClosed() bool { + return atomic.LoadInt64(&rmq.state) != RmqStateHealthy +} + // Close step: // 1. Stop retention // 2. Destroy all consumer groups and topics // 3. Close rocksdb instance func (rmq *rocksmq) Close() { + atomic.StoreInt64(&rmq.state, RmqStateStopped) rmq.stopRetention() - rmq.storeMu.Lock() - defer rmq.storeMu.Unlock() rmq.consumers.Range(func(k, v interface{}) bool { var topic string for _, consumer := range v.([]*Consumer) { @@ -218,6 +235,8 @@ func (rmq *rocksmq) Close() { } return true }) + rmq.storeMu.Lock() + defer rmq.storeMu.Unlock() rmq.store.Close() } @@ -234,6 +253,9 @@ func (rmq *rocksmq) checkKeyExist(key string) bool { // CreateTopic writes initialized messages for topic in rocksdb func (rmq *rocksmq) CreateTopic(topicName string) error { + if rmq.isClosed() { + return errors.New(RmqNotServingErrMsg) + } start := time.Now() beginKey := topicName + "/begin_id" endKey := topicName + "/end_id" @@ -353,6 +375,9 @@ func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Cons // CreateConsumerGroup creates an nonexistent consumer group for topic func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error { + if rmq.isClosed() { + return errors.New(RmqNotServingErrMsg) + } start := time.Now() key := constructCurrentID(topicName, groupName) if rmq.checkKeyExist(key) { @@ -371,6 +396,9 @@ func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error { // RegisterConsumer registers a consumer in rocksmq consumers func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) { + if rmq.isClosed() { + return + } start := time.Now() if vals, ok := rmq.consumers.Load(consumer.Topic); ok { for _, v := range vals.([]*Consumer) { @@ -427,6 +455,9 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error { // Produce produces messages for topic and updates page infos for retention func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error) { + if rmq.isClosed() { + return nil, errors.New(RmqNotServingErrMsg) + } start := time.Now() ll, ok := topicMu.Load(topicName) if !ok { @@ -578,6 +609,9 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes // 2. Update current_id to the last consumed message // 3. Update ack informations in rocksdb func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) { + if rmq.isClosed() { + return nil, errors.New(RmqNotServingErrMsg) + } start := time.Now() ll, ok := topicMu.Load(topicName) if !ok { @@ -719,6 +753,9 @@ func (rmq *rocksmq) moveConsumePos(topicName string, groupName string, msgID Uni // Seek updates the current id to the given msgID func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) error { + if rmq.isClosed() { + return errors.New(RmqNotServingErrMsg) + } /* Step I: Check if key exists */ ll, ok := topicMu.Load(topicName) if !ok { @@ -736,6 +773,9 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err // SeekToLatest updates current id to the msg id of latest message + 1 func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { + if rmq.isClosed() { + return errors.New(RmqNotServingErrMsg) + } rmq.storeMu.Lock() defer rmq.storeMu.Unlock() key := constructCurrentID(topicName, groupName) @@ -933,6 +973,9 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID) } func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool, subscriptionRolePrefix string) (string, error) { + if rmq.isClosed() { + return "", errors.New(RmqNotServingErrMsg) + } readOpts := gorocksdb.NewDefaultReadOptions() readOpts.SetPrefixSameAsStart(true) iter := rmq.store.NewIterator(readOpts) @@ -986,6 +1029,9 @@ func (rmq *rocksmq) getReader(topicName, readerName string) *rocksmqReader { } func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID UniqueID) { + if rmq.isClosed() { + return + } reader := rmq.getReader(topicName, readerName) if reader == nil { log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName)) @@ -995,6 +1041,9 @@ func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID Unique } func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error) { + if rmq.isClosed() { + return ConsumerMessage{}, errors.New(RmqNotServingErrMsg) + } reader := rmq.getReader(topicName, readerName) if reader == nil { return ConsumerMessage{}, fmt.Errorf("reader of %s doesn't exist", topicName) @@ -1003,6 +1052,9 @@ func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName strin } func (rmq *rocksmq) HasNext(topicName string, readerName string, messageIDInclusive bool) bool { + if rmq.isClosed() { + return false + } reader := rmq.getReader(topicName, readerName) if reader == nil { log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName)) @@ -1012,6 +1064,9 @@ func (rmq *rocksmq) HasNext(topicName string, readerName string, messageIDInclus } func (rmq *rocksmq) CloseReader(topicName string, readerName string) { + if rmq.isClosed() { + return + } reader := rmq.getReader(topicName, readerName) if reader == nil { log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName)) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go index c8497734d0..74c373f9c4 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go @@ -19,6 +19,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -766,3 +767,40 @@ func TestRocksmq_Reader(t *testing.T) { } assert.False(t, rmq.HasNext(channelName, readerName, false)) } + +func TestRocksmq_Close(t *testing.T) { + ep := etcdEndpoints() + etcdKV, err := etcdkv.NewEtcdKV(ep, "/etcd/test/root") + assert.Nil(t, err) + defer etcdKV.Close() + idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) + _ = idAllocator.Initialize() + + name := "/tmp/rocksmq_close" + defer os.RemoveAll(name) + kvName := name + "_meta_kv" + _ = os.RemoveAll(kvName) + defer os.RemoveAll(kvName) + rmq, err := NewRocksMQ(name, idAllocator) + assert.Nil(t, err) + defer rmq.Close() + + atomic.StoreInt64(&rmq.state, RmqStateStopped) + assert.Error(t, rmq.CreateTopic("")) + assert.Error(t, rmq.CreateConsumerGroup("", "")) + rmq.RegisterConsumer(&Consumer{}) + _, err = rmq.Produce("", nil) + assert.Error(t, err) + _, err = rmq.Consume("", "", 0) + assert.Error(t, err) + + assert.Error(t, rmq.seek("", "", 0)) + assert.Error(t, rmq.SeekToLatest("", "")) + _, err = rmq.CreateReader("", 0, false, "") + assert.Error(t, err) + rmq.ReaderSeek("", "", 0) + _, err = rmq.Next(nil, "", "", false) + assert.Error(t, err) + rmq.HasNext("", "", false) + rmq.CloseReader("", "") +}