From eae02de8bd3952d5fd78345bbd36ebe38f11fc63 Mon Sep 17 00:00:00 2001 From: smellthemoon <64083300+smellthemoon@users.noreply.github.com> Date: Mon, 21 Aug 2023 12:58:20 +0800 Subject: [PATCH] Change the key to make read faster in rocksmq (#26404) Signed-off-by: lixinguo Co-authored-by: lixinguo --- internal/kv/rocksdb/rocks_iterator.go | 24 +++ .../mq/mqimpl/rocksmq/server/rocksmq_impl.go | 68 +++++--- .../rocksmq/server/rocksmq_impl_test.go | 161 +++++++++++++++++- 3 files changed, 222 insertions(+), 31 deletions(-) diff --git a/internal/kv/rocksdb/rocks_iterator.go b/internal/kv/rocksdb/rocks_iterator.go index e95b04ce72..aea3cc74cc 100644 --- a/internal/kv/rocksdb/rocks_iterator.go +++ b/internal/kv/rocksdb/rocks_iterator.go @@ -30,6 +30,17 @@ func NewRocksIterator(db *gorocksdb.DB, opts *gorocksdb.ReadOptions) *RocksItera return it } +func NewRocksIteratorCF(db *gorocksdb.DB, cf *gorocksdb.ColumnFamilyHandle, opts *gorocksdb.ReadOptions) *RocksIterator { + iter := db.NewIteratorCF(opts, cf) + it := &RocksIterator{iter, nil, false} + runtime.SetFinalizer(it, func(rocksit *RocksIterator) { + if !rocksit.close { + log.Error("iterator is leaking.. please check") + } + }) + return it +} + func NewRocksIteratorWithUpperBound(db *gorocksdb.DB, upperBoundString string, opts *gorocksdb.ReadOptions) *RocksIterator { upperBound := []byte(upperBoundString) opts.SetIterateUpperBound(upperBound) @@ -43,6 +54,19 @@ func NewRocksIteratorWithUpperBound(db *gorocksdb.DB, upperBoundString string, o return it } +func NewRocksIteratorCFWithUpperBound(db *gorocksdb.DB, cf *gorocksdb.ColumnFamilyHandle, upperBoundString string, opts *gorocksdb.ReadOptions) *RocksIterator { + upperBound := []byte(upperBoundString) + opts.SetIterateUpperBound(upperBound) + iter := db.NewIteratorCF(opts, cf) + it := &RocksIterator{iter, upperBound, false} + runtime.SetFinalizer(it, func(rocksit *RocksIterator) { + if !rocksit.close { + log.Error("iteratorCF is leaking.. please check") + } + }) + return it +} + // Valid returns false only when an Iterator has iterated past either the // first or the last key in the database. func (iter *RocksIterator) Valid() bool { diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index cecce0eaae..312aa534b2 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -30,7 +30,6 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/kv" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" - "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/util/hardware" @@ -118,6 +117,7 @@ var topicMu = sync.Map{} type rocksmq struct { store *gorocksdb.DB + cfh []*gorocksdb.ColumnFamilyHandle kv kv.BaseKV idAllocator allocator.Interface storeMu *sync.Mutex @@ -222,8 +222,13 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) optsStore.IncreaseParallelism(parallelism) // enable back ground flush optsStore.SetMaxBackgroundFlushes(1) + // use properties as the column families to store trace id + optsStore.SetCreateIfMissingColumnFamilies(true) - db, err := gorocksdb.OpenDb(optsStore, name) + // db, err := gorocksdb.OpenDb(opts, name) + // use properties as the column families to store trace id + giveColumnFamilies := []string{"default", "properties"} + db, cfHandles, err := gorocksdb.OpenDbColumnFamilies(optsStore, name, giveColumnFamilies, []*gorocksdb.Options{optsStore, optsStore}) if err != nil { return nil, err } @@ -243,6 +248,7 @@ func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) rmq := &rocksmq{ store: db, + cfh: cfHandles, kv: kv, idAllocator: mqIDAllocator, storeMu: &sync.Mutex{}, @@ -634,17 +640,17 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ { msgID := idStart + UniqueID(i) key := path.Join(topicName, strconv.FormatInt(msgID, 10)) - batch.Put([]byte(key), messages[i].Payload) - properties, err := json.Marshal(messages[i].Properties) - if err != nil { - log.Warn("properties marshal failed", - zap.Int64("msgID", msgID), - zap.String("topicName", topicName), - zap.Error(err)) - return nil, err + batch.PutCF(rmq.cfh[0], []byte(key), messages[i].Payload) + // batch.Put([]byte(key), messages[i].Payload) + if messages[i].Properties != nil { + properties, err := json.Marshal(messages[i].Properties) + if err != nil { + log.Warn("properties marshal failed", zap.Int64("msgID", msgID), zap.String("topicName", topicName), + zap.Error(err)) + return nil, err + } + batch.PutCF(rmq.cfh[1], []byte(key), properties) } - pKey := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10)) - batch.Put([]byte(pKey), properties) msgIDs[i] = msgID msgSizes[msgID] = int64(len(messages[i].Payload)) } @@ -777,8 +783,10 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum readOpts := gorocksdb.NewDefaultReadOptions() defer readOpts.Destroy() prefix := topicName + "/" - iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.store, typeutil.AddOne(prefix), readOpts) + iter := rocksdbkv.NewRocksIteratorCFWithUpperBound(rmq.store, rmq.cfh[0], typeutil.AddOne(prefix), readOpts) + iterProperty := rocksdbkv.NewRocksIteratorCFWithUpperBound(rmq.store, rmq.cfh[1], typeutil.AddOne(prefix), readOpts) defer iter.Close() + defer iterProperty.Close() var dataKey string if currentID == DefaultMessageID { @@ -787,30 +795,39 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum dataKey = path.Join(topicName, strconv.FormatInt(currentID, 10)) } iter.Seek([]byte(dataKey)) + iterProperty.Seek([]byte(dataKey)) + consumerMessage := make([]ConsumerMessage, 0, n) offset := 0 + for ; iter.Valid() && offset < n; iter.Next() { key := iter.Key() val := iter.Value() strKey := string(key.Data()) key.Free() - offset++ + properties := make(map[string]string) + var propertiesValue []byte + msgID, err := strconv.ParseInt(strKey[len(topicName)+1:], 10, 64) if err != nil { val.Free() return nil, err } - askedProperties := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10)) - opts := gorocksdb.NewDefaultReadOptions() - defer opts.Destroy() - propertiesValue, err := rmq.store.GetBytes(opts, []byte(askedProperties)) - if err != nil { - return nil, err + offset++ + + if iterProperty.Valid() && string(iterProperty.Key().Data()) == string(iter.Key().Data()) { + // the key of properties is the same with the key of payload + // to prevent mix message with or without property column family + propertiesValue = iterProperty.Value().Data() + iterProperty.Next() } - properties := make(map[string]string) + + // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload + // will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3 + + // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq + // when produce before 2.2.0, but consume after 2.2.0, propertiesValue will be [] if len(propertiesValue) != 0 { - // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq - // when produce before 2.2.0, but consume in 2.2.0, propertiesValue will be [] if err = json.Unmarshal(propertiesValue, &properties); err != nil { return nil, err } @@ -1008,11 +1025,11 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) { readOpts := gorocksdb.NewDefaultReadOptions() defer readOpts.Destroy() - iter := rocksdbkv.NewRocksIterator(rmq.store, readOpts) + iter := rocksdbkv.NewRocksIteratorCF(rmq.store, rmq.cfh[0], readOpts) defer iter.Close() prefix := topicName + "/" - // seek to the last message of thie topic + // seek to the last message of the topic iter.SeekForPrev([]byte(typeutil.AddOne(prefix))) // if iterate fail @@ -1037,6 +1054,7 @@ func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) { } msgID, err := strconv.ParseInt(seekMsgID[len(topicName)+1:], 10, 64) + if err != nil { return DefaultMessageID, err } diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index 39ee366e69..623713fec3 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -12,6 +12,7 @@ package server import ( + "encoding/json" "fmt" "os" "path" @@ -49,7 +50,7 @@ func TestMain(m *testing.M) { os.Exit(code) } -type producerMessageBefore struct { +type producerMessageBefore2 struct { Payload []byte } @@ -81,7 +82,7 @@ func etcdEndpoints() []string { } // to test compatibility concern -func (rmq *rocksmq) produceBefore(topicName string, messages []producerMessageBefore) ([]UniqueID, error) { +func (rmq *rocksmq) produceBefore2(topicName string, messages []producerMessageBefore2) ([]UniqueID, error) { if rmq.isClosed() { return nil, errors.New(RmqNotServingErrMsg) } @@ -161,6 +162,99 @@ func (rmq *rocksmq) produceBefore(topicName string, messages []producerMessageBe return msgIDs, nil } +// to test compatibility concern +func (rmq *rocksmq) produceIn2(topicName string, messages []ProducerMessage) ([]UniqueID, error) { + if rmq.isClosed() { + return nil, errors.New(RmqNotServingErrMsg) + } + start := time.Now() + ll, ok := topicMu.Load(topicName) + if !ok { + return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName) + } + lock, ok := ll.(*sync.Mutex) + if !ok { + return []UniqueID{}, fmt.Errorf("get mutex failed, topic name = %s", topicName) + } + lock.Lock() + defer lock.Unlock() + + getLockTime := time.Since(start).Milliseconds() + + msgLen := len(messages) + idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen)) + + if err != nil { + return []UniqueID{}, err + } + allocTime := time.Since(start).Milliseconds() + if UniqueID(msgLen) != idEnd-idStart { + return []UniqueID{}, errors.New("Obtained id length is not equal that of message") + } + + // Insert data to store system + batch := gorocksdb.NewWriteBatch() + defer batch.Destroy() + msgSizes := make(map[UniqueID]int64) + msgIDs := make([]UniqueID, msgLen) + for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ { + msgID := idStart + UniqueID(i) + key := path.Join(topicName, strconv.FormatInt(msgID, 10)) + batch.Put([]byte(key), messages[i].Payload) + properties, err := json.Marshal(messages[i].Properties) + if err != nil { + log.Warn("properties marshal failed", + zap.Int64("msgID", msgID), + zap.String("topicName", topicName), + zap.Error(err)) + return nil, err + } + pKey := path.Join(common.PropertiesKey, topicName, strconv.FormatInt(msgID, 10)) + batch.Put([]byte(pKey), properties) + msgIDs[i] = msgID + msgSizes[msgID] = int64(len(messages[i].Payload)) + } + + opts := gorocksdb.NewDefaultWriteOptions() + defer opts.Destroy() + err = rmq.store.Write(opts, batch) + if err != nil { + return []UniqueID{}, err + } + writeTime := time.Since(start).Milliseconds() + if vals, ok := rmq.consumers.Load(topicName); ok { + for _, v := range vals.([]*Consumer) { + select { + case v.MsgMutex <- struct{}{}: + continue + default: + continue + } + } + } + + // Update message page info + err = rmq.updatePageInfo(topicName, msgIDs, msgSizes) + if err != nil { + return []UniqueID{}, err + } + + // TODO add this to monitor metrics + getProduceTime := time.Since(start).Milliseconds() + if getProduceTime > 200 { + log.Warn("rocksmq produce too slowly", zap.String("topic", topicName), + zap.Int64("get lock elapse", getLockTime), + zap.Int64("alloc elapse", allocTime-getLockTime), + zap.Int64("write elapse", writeTime-allocTime), + zap.Int64("updatePage elapse", getProduceTime-writeTime), + zap.Int64("produce total elapse", getProduceTime), + ) + } + + rmq.topicLastID.Store(topicName, msgIDs[len(msgIDs)-1]) + return msgIDs, nil +} + func TestRocksmq_RegisterConsumer(t *testing.T) { suffix := "_register" kvPath := rmqPath + kvPathSuffix + suffix @@ -310,12 +404,12 @@ func TestRocksmq_Compatibility(t *testing.T) { defer rmq.DestroyTopic(channelName) // before 2.2.0, there have no properties in ProducerMessage and ConsumerMessage in rocksmq - // it aims to test if produce before 2.2.0, but consume after 2.2.0 + // it aims to test if produce before 2.2.0, will consume after 2.2.0 successfully msgD := "d_message" - tMsgs := make([]producerMessageBefore, 1) - tMsgD := producerMessageBefore{Payload: []byte(msgD)} + tMsgs := make([]producerMessageBefore2, 1) + tMsgD := producerMessageBefore2{Payload: []byte(msgD)} tMsgs[0] = tMsgD - _, err = rmq.produceBefore(channelName, tMsgs) + _, err = rmq.produceBefore2(channelName, tMsgs) assert.NoError(t, err) groupName := "test_group" @@ -324,6 +418,9 @@ func TestRocksmq_Compatibility(t *testing.T) { assert.NoError(t, err) cMsgs, err := rmq.Consume(channelName, groupName, 1) + if err != nil { + log.Info("test", zap.Any("err", err)) + } assert.NoError(t, err) assert.Equal(t, len(cMsgs), 1) assert.Equal(t, string(cMsgs[0].Payload), "d_message") @@ -332,6 +429,58 @@ func TestRocksmq_Compatibility(t *testing.T) { // it will be set empty map if produce message has no properties field expect := make(map[string]string) assert.Equal(t, cMsgs[0].Properties, expect) + + // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload + // will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3 + // after 2.3, the properties will be stored in column families + // it aims to test if produce in 2.2.0, but consume in 2.3.0, will get properties successfully + msg1 := "1_message" + tMsgs1 := make([]ProducerMessage, 1) + properties := make(map[string]string) + properties[common.TraceIDKey] = "1" + tMsg1 := ProducerMessage{Payload: []byte(msg1), Properties: properties} + tMsgs1[0] = tMsg1 + _, err = rmq.produceIn2(channelName, tMsgs1) + assert.NoError(t, err) + + msg2, err := rmq.Consume(channelName, groupName, 1) + assert.NoError(t, err) + assert.Equal(t, len(msg2), 1) + assert.Equal(t, string(msg2[0].Payload), "1_message") + _, ok = msg2[0].Properties[common.TraceIDKey] + assert.False(t, ok) + // will ingnore the property before 2.3.0, just make sure property empty is ok for 2.3 + expect = make(map[string]string) + assert.Equal(t, cMsgs[0].Properties, expect) + + // between 2.2.0 and 2.3.0, the key of Payload is topic/properties/msgid/Payload + // after 2.3, the properties will be stored in column families + // it aims to test the mixed message before 2.3.0 and after 2.3.0, will get properties successfully + msg3 := "3_message" + tMsgs3 := make([]ProducerMessage, 2) + properties3 := make(map[string]string) + properties3[common.TraceIDKey] = "3" + tMsg3 := ProducerMessage{Payload: []byte(msg3), Properties: properties3} + tMsgs3[0] = tMsg3 + msg4 := "4_message" + tMsg4 := ProducerMessage{Payload: []byte(msg4)} + tMsgs3[1] = tMsg4 + _, err = rmq.Produce(channelName, tMsgs3) + assert.NoError(t, err) + + msg5, err := rmq.Consume(channelName, groupName, 2) + assert.NoError(t, err) + assert.Equal(t, len(msg5), 2) + assert.Equal(t, string(msg5[0].Payload), "3_message") + _, ok = msg5[0].Properties[common.TraceIDKey] + assert.True(t, ok) + assert.Equal(t, msg5[0].Properties, properties3) + assert.Equal(t, string(msg5[1].Payload), "4_message") + _, ok = msg5[1].Properties[common.TraceIDKey] + assert.False(t, ok) + // it will be set empty map if produce message has no properties field + expect = make(map[string]string) + assert.Equal(t, msg5[1].Properties, expect) } func TestRocksmq_MultiConsumer(t *testing.T) {