diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 979cf7a896..89aa301161 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -124,6 +124,10 @@ rocksmq: retentionTimeInMinutes: 4320 # 3 days, 3 * 24 * 60 minutes, The retention time of the message in rocksmq. retentionSizeInMB: 8192 # 8 GB, 8 * 1024 MB, The retention size of the message in rocksmq. compactionInterval: 86400 # 1 day, trigger rocksdb compaction every day to remove deleted data + # compaction compression type, only support use 0,7. + # 0 means not compress, 7 will use zstd + # len of types means num of rocksdb level. + compressionTypes: [0, 0, 7, 7, 7] # natsmq configuration. natsmq: diff --git a/internal/mq/mqimpl/rocksmq/client/client_impl.go b/internal/mq/mqimpl/rocksmq/client/client_impl.go index c4efd8a918..b32fc79435 100644 --- a/internal/mq/mqimpl/rocksmq/client/client_impl.go +++ b/internal/mq/mqimpl/rocksmq/client/client_impl.go @@ -136,27 +136,25 @@ func (c *client) consume(consumer *consumer) { if !ok { return } - c.deliver(consumer, 100) + c.deliver(consumer) case _, ok := <-consumer.MsgMutex(): if !ok { // consumer MsgMutex closed, goroutine exit log.Debug("Consumer MsgMutex closed") return } - c.deliver(consumer, 100) + c.deliver(consumer) } } } -func (c *client) deliver(consumer *consumer, batchMax int) { +func (c *client) deliver(consumer *consumer) { for { n := cap(consumer.messageCh) - len(consumer.messageCh) if n == 0 { return } - if n > batchMax { // batch min size - n = batchMax - } + msgs, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, n) if err != nil { log.Warn("Consumer's goroutine cannot consume from (" + consumer.topic + "," + consumer.consumerName + "): " + err.Error()) diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index 4a6ebfb6dc..183d538a0a 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/tecbot/gorocksdb" "go.uber.org/zap" @@ -120,6 +121,7 @@ type rocksmq struct { kv kv.BaseKV idAllocator allocator.Interface storeMu *sync.Mutex + topicLastID sync.Map consumers sync.Map consumersID sync.Map @@ -128,11 +130,31 @@ type rocksmq struct { state RmqState } +func parseCompressionType(params *paramtable.ComponentParam) ([]gorocksdb.CompressionType, error) { + var tError error + validType := []int{0, 7} + + return lo.Map(params.RocksmqCfg.CompressionTypes.GetAsStrings(), func(sType string, _ int) gorocksdb.CompressionType { + iType, err := strconv.Atoi(sType) + if err != nil { + tError = fmt.Errorf("invalid rocksmq compression type: %s", err.Error()) + return 0 + } + + if !lo.Contains(validType, iType) { + tError = fmt.Errorf("invalid rocksmq compression type, should in %v", validType) + return 0 + } + return gorocksdb.CompressionType(iType) + }), tError +} + // NewRocksMQ step: // 1. New rocksmq instance based on rocksdb with name and rocksdbkv with kvname // 2. Init retention info, load retention info to memory // 3. Start retention goroutine func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) { + params := paramtable.Get() // TODO we should use same rocksdb instance with different cfs maxProcs := runtime.GOMAXPROCS(0) parallelism := 1 @@ -145,7 +167,6 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) // default rocks db cache is set with memory rocksDBLRUCacheCapacity := RocksDBLRUCacheMinCapacity if memoryCount > 0 { - params := paramtable.Get() ratio := params.RocksmqCfg.LRUCacheRatio.GetAsFloat() calculatedCapacity := uint64(float64(memoryCount) * ratio) if calculatedCapacity < RocksDBLRUCacheMinCapacity { @@ -162,11 +183,16 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) bbto.SetBlockSize(64 << 10) bbto.SetBlockCache(gorocksdb.NewLRUCache(rocksDBLRUCacheCapacity)) + compressionTypes, err := parseCompressionType(params) + if err != nil { + return nil, err + } + optsKV := gorocksdb.NewDefaultOptions() // L0:No Compression // L1,L2: ZSTD - optsKV.SetNumLevels(3) - optsKV.SetCompressionPerLevel([]gorocksdb.CompressionType{0, 7, 7}) + optsKV.SetNumLevels(len(compressionTypes)) + optsKV.SetCompressionPerLevel(compressionTypes) optsKV.SetBlockBasedTableFactory(bbto) optsKV.SetTargetFileSizeMultiplier(2) optsKV.SetCreateIfMissing(true) @@ -186,8 +212,8 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) // finish rocks mq store initialization, rocks mq store has to set the prefix extractor optsStore := gorocksdb.NewDefaultOptions() // share block cache with kv - optsKV.SetNumLevels(3) - optsStore.SetCompressionPerLevel([]gorocksdb.CompressionType{0, 7, 7}) + optsStore.SetNumLevels(len(compressionTypes)) + optsStore.SetCompressionPerLevel(compressionTypes) optsStore.SetBlockBasedTableFactory(bbto) optsStore.SetTargetFileSizeMultiplier(2) optsStore.SetCreateIfMissing(true) @@ -222,6 +248,7 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) storeMu: &sync.Mutex{}, consumers: sync.Map{}, readers: sync.Map{}, + topicLastID: sync.Map{}, } ri, err := initRetentionInfo(kv, db) @@ -299,14 +326,13 @@ func (rmq *rocksmq) Info() bool { minConsumerPosition := UniqueID(-1) minConsumerGroupName := "" for _, consumer := range consumerList { - consumerKey := constructCurrentID(consumer.Topic, consumer.GroupName) - consumerPosition, ok := rmq.consumersID.Load(consumerKey) + consumerPosition, ok := rmq.getCurrentID(consumer.Topic, consumer.GroupName) if !ok { log.Error("some group not regist", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName)) continue } - if minConsumerPosition == UniqueID(-1) || consumerPosition.(UniqueID) < minConsumerPosition { - minConsumerPosition = consumerPosition.(UniqueID) + if minConsumerPosition == UniqueID(-1) || consumerPosition < minConsumerPosition { + minConsumerPosition = consumerPosition minConsumerGroupName = consumer.GroupName } } @@ -658,6 +684,8 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni zap.Int64("produce total elapse", getProduceTime), ) } + + rmq.topicLastID.Store(topicName, msgIDs[len(msgIDs)-1]) return msgIDs, nil } @@ -697,6 +725,22 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes return err } +func (rmq *rocksmq) getCurrentID(topicName, groupName string) (int64, bool) { + currentID, ok := rmq.consumersID.Load(constructCurrentID(topicName, groupName)) + if !ok { + return 0, false + } + return currentID.(int64), true +} + +func (rmq *rocksmq) getLastID(topicName string) (int64, bool) { + currentID, ok := rmq.consumersID.Load(topicName) + if !ok { + return 0, false + } + return currentID.(int64), true +} + // Consume steps: // 1. Consume n messages from rocksdb // 2. Update current_id to the last consumed message @@ -710,20 +754,26 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum if !ok { return nil, fmt.Errorf("topic name = %s not exist", topicName) } + lock, ok := ll.(*sync.Mutex) if !ok { return nil, fmt.Errorf("get mutex failed, topic name = %s", topicName) } lock.Lock() defer lock.Unlock() - getLockTime := time.Since(start).Milliseconds() - metaKey := constructCurrentID(topicName, groupName) - currentID, ok := rmq.consumersID.Load(metaKey) + currentID, ok := rmq.getCurrentID(topicName, groupName) if !ok { return nil, fmt.Errorf("currentID of topicName=%s, groupName=%s not exist", topicName, groupName) } + // return if don't have new message + lastID, ok := rmq.getLastID(topicName) + if ok && currentID > lastID { + return []ConsumerMessage{}, nil + } + + getLockTime := time.Since(start).Milliseconds() readOpts := gorocksdb.NewDefaultReadOptions() defer readOpts.Destroy() prefix := topicName + "/" @@ -734,7 +784,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum if currentID == DefaultMessageID { dataKey = prefix } else { - dataKey = path.Join(topicName, strconv.FormatInt(currentID.(int64), 10)) + dataKey = path.Join(topicName, strconv.FormatInt(currentID, 10)) } iter.Seek([]byte(dataKey)) consumerMessage := make([]ConsumerMessage, 0, n) @@ -844,27 +894,26 @@ func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) err } func (rmq *rocksmq) moveConsumePos(topicName string, groupName string, msgID UniqueID) error { - key := constructCurrentID(topicName, groupName) - oldPos, ok := rmq.consumersID.Load(key) + oldPos, ok := rmq.getCurrentID(topicName, groupName) if !ok { return errors.New("move unknown consumer") } - if msgID < oldPos.(UniqueID) { + if msgID < oldPos { log.Warn("RocksMQ: trying to move Consume position backward", - zap.String("key", key), zap.Int64("oldPos", oldPos.(UniqueID)), zap.Int64("newPos", msgID)) + zap.String("topic", topicName), zap.String("group", groupName), zap.Int64("oldPos", oldPos), zap.Int64("newPos", msgID)) panic("move consume position backward") } //update ack if position move forward - err := rmq.updateAckedInfo(topicName, groupName, oldPos.(UniqueID), msgID-1) + err := rmq.updateAckedInfo(topicName, groupName, oldPos, msgID-1) if err != nil { log.Warn("failed to update acked info ", zap.String("topic", topicName), zap.String("groupName", groupName), zap.Error(err)) return err } - rmq.consumersID.Store(key, msgID) + rmq.consumersID.Store(constructCurrentID(topicName, groupName), msgID) return nil } @@ -876,7 +925,7 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err /* Step I: Check if key exists */ ll, ok := topicMu.Load(topicName) if !ok { - return fmt.Errorf("Topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist) + return fmt.Errorf("topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist) } lock, ok := ll.(*sync.Mutex) if !ok { @@ -902,7 +951,7 @@ func (rmq *rocksmq) ForceSeek(topicName string, groupName string, msgID UniqueID /* Step I: Check if key exists */ ll, ok := topicMu.Load(topicName) if !ok { - return fmt.Errorf("Topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist) + return fmt.Errorf("topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist) } lock, ok := ll.(*sync.Mutex) if !ok { @@ -1058,13 +1107,12 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueI var minBeginID UniqueID = lastID for _, consumer := range consumers { if consumer.GroupName != groupName { - key := constructCurrentID(consumer.Topic, consumer.GroupName) - beginID, ok := rmq.consumersID.Load(key) + beginID, ok := rmq.getCurrentID(consumer.Topic, consumer.GroupName) if !ok { return fmt.Errorf("currentID of topicName=%s, groupName=%s not exist", consumer.Topic, consumer.GroupName) } - if beginID.(UniqueID) < minBeginID { - minBeginID = beginID.(UniqueID) + if beginID < minBeginID { + minBeginID = beginID } } } diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index 6af5057c26..39ee366e69 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -261,16 +261,6 @@ func TestRocksmq_Basic(t *testing.T) { _, err = rmq.Produce(channelName, pMsgs) assert.NoError(t, err) - // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq - // it aims to test if produce before 2.2.0, but consume after 2.2.0 - msgD := "d_message" - tMsgs := make([]producerMessageBefore, 1) - tMsgD := producerMessageBefore{Payload: []byte(msgD)} - tMsgs[0] = tMsgD - - _, err = rmq.produceBefore(channelName, tMsgs) - assert.NoError(t, err) - groupName := "test_group" _ = rmq.DestroyConsumerGroup(channelName, groupName) err = rmq.CreateConsumerGroup(channelName, groupName) @@ -297,12 +287,47 @@ func TestRocksmq_Basic(t *testing.T) { _, ok = cMsgs[1].Properties[common.TraceIDKey] assert.True(t, ok) assert.Equal(t, cMsgs[1].Properties[common.TraceIDKey], "c") +} - cMsgs, err = rmq.Consume(channelName, groupName, 1) +func TestRocksmq_Compatibility(t *testing.T) { + suffix := "rmq_compatibility" + + kvPath := rmqPath + kvPathSuffix + suffix + defer os.RemoveAll(kvPath) + idAllocator := InitIDAllocator(kvPath) + + rocksdbPath := rmqPath + suffix + defer os.RemoveAll(rocksdbPath + kvSuffix) + defer os.RemoveAll(rocksdbPath) + paramtable.Init() + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) + assert.NoError(t, err) + defer rmq.Close() + + channelName := "channel_rocks" + err = rmq.CreateTopic(channelName) + assert.NoError(t, err) + defer rmq.DestroyTopic(channelName) + + // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq + // it aims to test if produce before 2.2.0, but consume after 2.2.0 + msgD := "d_message" + tMsgs := make([]producerMessageBefore, 1) + tMsgD := producerMessageBefore{Payload: []byte(msgD)} + tMsgs[0] = tMsgD + _, err = rmq.produceBefore(channelName, tMsgs) + assert.NoError(t, err) + + groupName := "test_group" + _ = rmq.DestroyConsumerGroup(channelName, groupName) + err = rmq.CreateConsumerGroup(channelName, groupName) + assert.NoError(t, err) + + cMsgs, err := rmq.Consume(channelName, groupName, 1) assert.NoError(t, err) assert.Equal(t, len(cMsgs), 1) assert.Equal(t, string(cMsgs[0].Payload), "d_message") - _, ok = cMsgs[0].Properties[common.TraceIDKey] + _, ok := cMsgs[0].Properties[common.TraceIDKey] assert.False(t, ok) // it will be set empty map if produce message has no properties field expect := make(map[string]string) @@ -576,7 +601,7 @@ func TestRocksmq_Goroutines(t *testing.T) { idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV) _ = idAllocator.Initialize() - name := "/tmp/rocksmq_2" + name := "/tmp/rocksmq_goroutines" defer os.RemoveAll(name) kvName := name + "_meta_kv" _ = os.RemoveAll(kvName) @@ -1275,3 +1300,16 @@ func TestRocksmq_Info(t *testing.T) { rmq.kv = &rocksdbkv.RocksdbKV{} assert.False(t, rmq.Info()) } + +func TestRocksmq_ParseCompressionTypeError(t *testing.T) { + params := paramtable.Get() + params.Init() + params.Save(params.RocksmqCfg.CompressionTypes.Key, "invalid,1") + _, err := parseCompressionType(params) + assert.Error(t, err) + + params.Save(params.RocksmqCfg.CompressionTypes.Key, "-1,-1") + defer params.Save(params.RocksmqCfg.CompressionTypes.Key, "0,0,7") + _, err = parseCompressionType(params) + assert.Error(t, err) +} diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index e61b579bd3..91eeae80c6 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -588,6 +588,11 @@ type RocksmqConfig struct { CompactionInterval ParamItem `refreshable:"false"` // TickerTimeInSeconds is the time of expired check, default 10 minutes TickerTimeInSeconds ParamItem `refreshable:"false"` + // CompressionTypes is compression type of each level + // len of CompressionTypes means num of rocksdb level. + // only support {0,7}, 0 means no compress, 7 means zstd + // default [0,7]. + CompressionTypes ParamItem `refreshable:"false"` } func (r *RocksmqConfig) Init(base *BaseTable) { @@ -651,6 +656,13 @@ please adjust in embedded Milvus: /tmp/milvus/rdb_data`, Version: "2.2.2", } r.TickerTimeInSeconds.Init(base.mgr) + + r.CompressionTypes = ParamItem{ + Key: "rocksmq.compressionTypes", + DefaultValue: "0,0,7,7,7", + Version: "2.2.12", + } + r.CompressionTypes.Init(base.mgr) } // NatsmqConfig describes the configuration options for the Nats message queue