From 98e4ff33a87520aa774bf92ab075f697b0fbd94a Mon Sep 17 00:00:00 2001
From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>
Date: Thu, 30 Dec 2021 20:08:15 +0800
Subject: [PATCH] Fix RocksDB Slow (#14614)

---
 internal/kv/rocksdb/RocksIterator.go          | 108 ++++++++++++++++++
 internal/kv/rocksdb/rocksdb_kv.go             |   6 +-
 internal/proxy/task.go                        |   1 -
 .../rocksmq/client/rocksmq/client_impl.go     |   9 +-
 .../util/rocksmq/server/rocksmq/global_rmq.go |   2 +-
 .../rocksmq/server/rocksmq/rocksmq_impl.go    |  43 ++++---
 .../rocksmq/server/rocksmq/rocksmq_reader.go  |  17 +--
 .../server/rocksmq/rocksmq_retention.go       |  18 +--
 8 files changed, 162 insertions(+), 42 deletions(-)
 create mode 100644 internal/kv/rocksdb/RocksIterator.go

diff --git a/internal/kv/rocksdb/RocksIterator.go b/internal/kv/rocksdb/RocksIterator.go
new file mode 100644
index 0000000000..fdb89410f2
--- /dev/null
+++ b/internal/kv/rocksdb/RocksIterator.go
@@ -0,0 +1,108 @@
+package rocksdbkv
+
+import (
+	"runtime"
+
+	"github.com/milvus-io/milvus/internal/log"
+	"github.com/tecbot/gorocksdb"
+)
+
+/**
+ * A wrapper around the gorocksdb iterator.
+ * It helps to 1) keep a reference to the upperBound slice so it is not garbage
+ * collected while the iterator is live, and 2) detect leaked iterators via a finalizer.
+ */
+type RocksIterator struct {
+	it         *gorocksdb.Iterator
+	upperBound []byte
+	close      bool
+}
+
+func NewRocksIterator(db *gorocksdb.DB, opts *gorocksdb.ReadOptions) *RocksIterator {
+	iter := db.NewIterator(opts)
+	it := &RocksIterator{iter, nil, false}
+	runtime.SetFinalizer(it, func(rocksit *RocksIterator) {
+		if !rocksit.close {
+			log.Error("iterator is leaking, please check")
+		}
+	})
+	return it
+}
+
+func NewRocksIteratorWithUpperBound(db *gorocksdb.DB, upperBoundString string, opts *gorocksdb.ReadOptions) *RocksIterator {
+	upperBound := []byte(upperBoundString)
+	opts.SetIterateUpperBound(upperBound)
+	iter := db.NewIterator(opts)
+	it := &RocksIterator{iter, upperBound, false}
+	runtime.SetFinalizer(it, func(rocksit *RocksIterator) {
+		if !rocksit.close {
+			log.Error("iterator is leaking, please check")
+		}
+	})
+	return it
+}
+
+// Valid returns false only when an Iterator has iterated past either the
+// first or the last key in the database.
+func (iter *RocksIterator) Valid() bool {
+	return iter.it.Valid()
+}
+
+// ValidForPrefix returns false only when an Iterator has iterated past the
+// first or the last key in the database or the specified prefix.
+func (iter *RocksIterator) ValidForPrefix(prefix []byte) bool {
+	return iter.it.ValidForPrefix(prefix)
+}
+
+// Key returns the key the iterator currently holds.
+func (iter *RocksIterator) Key() *gorocksdb.Slice {
+	return iter.it.Key()
+}
+
+// Value returns the value in the database the iterator currently holds.
+func (iter *RocksIterator) Value() *gorocksdb.Slice {
+	return iter.it.Value()
+}
+
+// Next moves the iterator to the next sequential key in the database.
+func (iter *RocksIterator) Next() {
+	iter.it.Next()
+}
+
+// Prev moves the iterator to the previous sequential key in the database.
+func (iter *RocksIterator) Prev() {
+	iter.it.Prev()
+}
+
+// SeekToFirst moves the iterator to the first key in the database.
+func (iter *RocksIterator) SeekToFirst() {
+	iter.it.SeekToFirst()
+}
+
+// SeekToLast moves the iterator to the last key in the database.
+func (iter *RocksIterator) SeekToLast() {
+	iter.it.SeekToLast()
+}
+
+// Seek moves the iterator to the position greater than or equal to the key.
+func (iter *RocksIterator) Seek(key []byte) {
+	iter.it.Seek(key)
+}
+
+// SeekForPrev moves the iterator to the last key that is less than or equal
+// to the target key, in contrast with Seek.
+func (iter *RocksIterator) SeekForPrev(key []byte) {
+	iter.it.SeekForPrev(key)
+}
+
+// Err returns nil if no errors happened during iteration, or the actual
+// error otherwise.
+func (iter *RocksIterator) Err() error {
+	return iter.it.Err()
+}
+
+// Close closes the iterator.
+func (iter *RocksIterator) Close() {
+	iter.close = true
+	iter.it.Close()
+}
diff --git a/internal/kv/rocksdb/rocksdb_kv.go b/internal/kv/rocksdb/rocksdb_kv.go
index 2866cffc18..0565a3b937 100644
--- a/internal/kv/rocksdb/rocksdb_kv.go
+++ b/internal/kv/rocksdb/rocksdb_kv.go
@@ -116,12 +116,12 @@ func (kv *RocksdbKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
 	}
 	option := gorocksdb.NewDefaultReadOptions()
 	defer option.Destroy()
-	iter := kv.DB.NewIterator(option)
+	iter := NewRocksIteratorWithUpperBound(kv.DB, typeutil.AddOne(prefix), option)
 	defer iter.Close()

 	var keys, values []string
 	iter.Seek([]byte(prefix))
-	for ; iter.ValidForPrefix([]byte(prefix)); iter.Next() {
+	for ; iter.Valid(); iter.Next() {
 		key := iter.Key()
 		value := iter.Value()
 		keys = append(keys, string(key.Data()))
@@ -193,7 +193,7 @@ func (kv *RocksdbKV) RemoveWithPrefix(prefix string) error {
 	// better to use drop column family, but as we use default column family, we just delete ["",lastKey+1)
 	readOpts := gorocksdb.NewDefaultReadOptions()
 	defer readOpts.Destroy()
-	iter := kv.DB.NewIterator(readOpts)
+	iter := NewRocksIterator(kv.DB, readOpts)
 	defer iter.Close()
 	// seek to the last key
 	iter.SeekToLast()
diff --git a/internal/proxy/task.go b/internal/proxy/task.go
index 698cf8bc66..d92f6fe081 100644
--- a/internal/proxy/task.go
+++ b/internal/proxy/task.go
@@ -891,7 +891,6 @@ func (it *insertTask) _assignSegmentID(stream msgstream.MsgStream, pack *msgstre
 	}
 	threshold := Params.ProxyCfg.PulsarMaxMessageSize
-	log.Debug("Proxy", zap.Int("threshold of message size: ", threshold))

 	// not accurate
 	/* #nosec G103 */
 	getFixedSizeOfInsertMsg := func(msg *msgstream.InsertMsg) int {
diff --git a/internal/util/rocksmq/client/rocksmq/client_impl.go b/internal/util/rocksmq/client/rocksmq/client_impl.go
index e5b5ed1901..c458a9fceb 100644
--- a/internal/util/rocksmq/client/rocksmq/client_impl.go
+++ b/internal/util/rocksmq/client/rocksmq/client_impl.go
@@ -143,11 +143,14 @@ func (c *client) consume(consumer *consumer) {
 		}
 	}
 }
-func (c *client) deliver(consumer *consumer, batchMin int) {
+func (c *client) deliver(consumer *consumer, batchMax int) {
 	for {
 		n := cap(consumer.messageCh) - len(consumer.messageCh)
-		if n < batchMin { // batch min size
-			n = batchMin
+		if n == 0 {
+			return
+		}
+		if n > batchMax { // cap at the batch max size
+			n = batchMax
 		}
 		msgs, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, n)
 		if err != nil {
diff --git a/internal/util/rocksmq/server/rocksmq/global_rmq.go b/internal/util/rocksmq/server/rocksmq/global_rmq.go
index 0019417f56..fba0ff38e7 100644
--- a/internal/util/rocksmq/server/rocksmq/global_rmq.go
+++ b/internal/util/rocksmq/server/rocksmq/global_rmq.go
@@ -83,7 +83,7 @@ func InitRocksMQ() error {
 		if err == nil {
 			atomic.StoreInt64(&RocksmqRetentionTimeInSecs, rawRmqRetentionTimeInMinutes*60)
 		} else {
-			log.Warn("rocksmq.retentionTimeInMinutes is invalid, using default value 3 days")
+			log.Warn("rocksmq.retentionTimeInMinutes is invalid, using default value")
 		}
 	}
 	rawRmqRetentionSizeInMB, err := params.Load("rocksmq.retentionSizeInMB")
diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go
index c2b670a532..b2258241d8 100644
--- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go
+++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go
@@ -147,16 +147,22 @@ type rocksmq struct {
 // 3. Start retention goroutine
 func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, error) {
 	// TODO we should use same rocksdb instance with different cfs
+	maxProcs := runtime.GOMAXPROCS(0)
+	parallelism := 1
+	if maxProcs > 32 {
+		parallelism = 4
+	} else if maxProcs > 8 {
+		parallelism = 2
+	}
+	log.Debug("Start rocksmq", zap.Int("max proc", maxProcs), zap.Int("parallelism", parallelism))
 	bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
-	bbto.SetCacheIndexAndFilterBlocks(true)
-	bbto.SetPinL0FilterAndIndexBlocksInCache(true)
 	bbto.SetBlockCache(gorocksdb.NewLRUCache(RocksDBLRUCacheCapacity))

 	optsKV := gorocksdb.NewDefaultOptions()
 	optsKV.SetBlockBasedTableFactory(bbto)
 	optsKV.SetCreateIfMissing(true)
 	// by default there are only 1 thread for flush compaction, which may block each other.
 	// increase to a reasonable thread numbers
-	optsKV.IncreaseParallelism(runtime.NumCPU())
+	optsKV.IncreaseParallelism(parallelism)
 	// enable back ground flush
 	optsKV.SetMaxBackgroundFlushes(1)
@@ -174,7 +180,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
 	optsStore.SetCreateIfMissing(true)
 	// by default there are only 1 thread for flush compaction, which may block each other.
 	// increase to a reasonable thread numbers
-	optsStore.IncreaseParallelism(runtime.NumCPU())
+	optsStore.IncreaseParallelism(parallelism)
 	// enable back ground flush
 	optsStore.SetMaxBackgroundFlushes(1)

@@ -219,16 +225,19 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
 	go func() {
 		for {
 			time.Sleep(5 * time.Minute)
-			log.Info("Rocksmq memory usage",
-				zap.String("rockskv kv cache", kv.DB.GetProperty("rocksdb.block-cache-usage")),
-				zap.String("rockskv memtable ", kv.DB.GetProperty("rocksdb.cur-size-all-mem-tables")),
+			log.Info("Rocksmq stats",
+				zap.String("rockskv cache", kv.DB.GetProperty("rocksdb.block-cache-usage")),
+				zap.String("rockskv memtable", kv.DB.GetProperty("rocksdb.size-all-mem-tables")),
 				zap.String("rockskv table readers", kv.DB.GetProperty("rocksdb.estimate-table-readers-mem")),
 				zap.String("rockskv pinned", kv.DB.GetProperty("rocksdb.block-cache-pinned-usage")),
-
-				zap.String("store kv cache", db.GetProperty("rocksdb.block-cache-usage")),
-				zap.String("store memtable ", db.GetProperty("rocksdb.cur-size-all-mem-tables")),
+				zap.String("store memtable", db.GetProperty("rocksdb.size-all-mem-tables")),
 				zap.String("store table readers", db.GetProperty("rocksdb.estimate-table-readers-mem")),
 				zap.String("store pinned", db.GetProperty("rocksdb.block-cache-pinned-usage")),
+				zap.String("store l0 file num", db.GetProperty("rocksdb.num-files-at-level0")),
+				zap.String("store l1 file num", db.GetProperty("rocksdb.num-files-at-level1")),
+				zap.String("store l2 file num", db.GetProperty("rocksdb.num-files-at-level2")),
+				zap.String("store l3 file num", db.GetProperty("rocksdb.num-files-at-level3")),
+				zap.String("store l4 file num", db.GetProperty("rocksdb.num-files-at-level4")),
 			)
 		}
 	}()
@@ -644,7 +653,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
 	readOpts := gorocksdb.NewDefaultReadOptions()
 	defer readOpts.Destroy()
 	prefix := topicName + "/"
"/" - iter := rmq.store.NewIterator(readOpts) + iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.store, typeutil.AddOne(prefix), readOpts) defer iter.Close() var dataKey string @@ -654,10 +663,9 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum dataKey = path.Join(topicName, strconv.FormatInt(currentID.(int64), 10)) } iter.Seek([]byte(dataKey)) - consumerMessage := make([]ConsumerMessage, 0, n) offset := 0 - for ; iter.ValidForPrefix([]byte(prefix)) && offset < n; iter.Next() { + for ; iter.Valid() && offset < n; iter.Next() { key := iter.Key() val := iter.Value() strKey := string(key.Data()) @@ -795,7 +803,7 @@ func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error { readOpts := gorocksdb.NewDefaultReadOptions() defer readOpts.Destroy() - iter := rmq.store.NewIterator(readOpts) + iter := rocksdbkv.NewRocksIterator(rmq.store, readOpts) defer iter.Close() prefix := topicName + "/" @@ -863,11 +871,11 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID) defer readOpts.Destroy() pageMsgFirstKey := pageMsgPrefix + strconv.FormatInt(firstID, 10) - iter := rmq.kv.(*rocksdbkv.RocksdbKV).DB.NewIterator(readOpts) + iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.kv.(*rocksdbkv.RocksdbKV).DB, typeutil.AddOne(pageMsgPrefix), readOpts) defer iter.Close() var pageIDs []UniqueID - for iter.Seek([]byte(pageMsgFirstKey)); iter.ValidForPrefix([]byte(pageMsgPrefix)); iter.Next() { + for iter.Seek([]byte(pageMsgFirstKey)); iter.Valid(); iter.Next() { key := iter.Key() pageID, err := parsePageID(string(key.Data())) if key != nil { @@ -940,7 +948,7 @@ func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageI } readOpts := gorocksdb.NewDefaultReadOptions() readOpts.SetPrefixSameAsStart(true) - iter := rmq.store.NewIterator(readOpts) + iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.store, typeutil.AddOne(topicName+"/"), readOpts) dataKey := path.Join(topicName, strconv.FormatInt(startMsgID, 10)) iter.Seek([]byte(dataKey)) // if iterate fail @@ -957,7 +965,6 @@ func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageI reader := &rocksmqReader{ store: rmq.store, topic: topicName, - prefix: []byte(topicName + "/"), readerName: readerName, readOpts: readOpts, iter: iter, diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go b/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go index 4570c6a61e..04cd0cf48b 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_reader.go @@ -18,18 +18,19 @@ import ( "path" "strconv" + rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/tecbot/gorocksdb" ) type rocksmqReader struct { store *gorocksdb.DB topic string - prefix []byte readerName string readOpts *gorocksdb.ReadOptions - iter *gorocksdb.Iterator + iter *rocksdbkv.RocksIterator currentID UniqueID messageIDInclusive bool @@ -77,7 +78,7 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) { iter.Next() rr.currentID = msgID } - if iter.ValidForPrefix(rr.prefix) { + if iter.Valid() { getMsg() return msg, err } @@ -92,11 +93,11 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) { return nil, fmt.Errorf("reader Mutex closed") } rr.iter.Close() - rr.iter = rr.store.NewIterator(rr.readOpts) + rr.iter = 
 		dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
 		iter = rr.iter
 		iter.Seek([]byte(dataKey))
-		if !iter.ValidForPrefix(rr.prefix) {
+		if !iter.Valid() {
 			return nil, errors.New("reader iterater is still invalid after receive mutex")
 		}
 		getMsg()
@@ -105,7 +106,7 @@ func (rr *rocksmqReader) Next(ctx context.Context) (*ConsumerMessage, error) {
 }

 func (rr *rocksmqReader) HasNext() bool {
-	if rr.iter.ValidForPrefix(rr.prefix) {
+	if rr.iter.Valid() {
 		return true
 	}

@@ -115,10 +116,10 @@ func (rr *rocksmqReader) HasNext() bool {
 		return false
 	}
 	rr.iter.Close()
-	rr.iter = rr.store.NewIterator(rr.readOpts)
+	rr.iter = rocksdbkv.NewRocksIteratorWithUpperBound(rr.store, typeutil.AddOne(rr.topic+"/"), rr.readOpts)
 	dataKey := path.Join(rr.topic, strconv.FormatInt(rr.currentID+1, 10))
 	rr.iter.Seek([]byte(dataKey))
-	return rr.iter.ValidForPrefix(rr.prefix)
+	return rr.iter.Valid()
 	default:
 		return false
 	}
diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go
index 67ed4a0450..b4098e17f3 100644
--- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go
+++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go
@@ -21,6 +21,7 @@ import (

 	rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/util/typeutil"
 	"github.com/tecbot/gorocksdb"
 	"go.uber.org/zap"
 )
@@ -133,7 +134,6 @@ func (ri *retentionInfo) Stop() {
 // 3. delete acked info by range of page id;
 // 4. delete message by range of page id;
 func (ri *retentionInfo) expiredCleanUp(topic string) error {
-	log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
 	start := time.Now()
 	var deletedAckedSize int64
 	var pageCleaned UniqueID
@@ -148,17 +148,18 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
 	}
 	// Quick Path, No page to check
 	if totalAckedSize == 0 {
-		log.Debug("All messages are not expired, skip retention because no ack", zap.Any("topic", topic))
+		log.Debug("All messages are not expired, skip retention because no ack", zap.Any("topic", topic),
+			zap.Any("time taken", time.Since(start).Milliseconds()))
 		return nil
 	}

 	pageReadOpts := gorocksdb.NewDefaultReadOptions()
 	defer pageReadOpts.Destroy()
 	pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/"
-	pageIter := ri.kv.DB.NewIterator(pageReadOpts)
+	pageIter := rocksdbkv.NewRocksIteratorWithUpperBound(ri.kv.DB, typeutil.AddOne(pageMsgPrefix), pageReadOpts)
 	defer pageIter.Close()
 	pageIter.Seek([]byte(pageMsgPrefix))
-	for ; pageIter.ValidForPrefix([]byte(pageMsgPrefix)); pageIter.Next() {
+	for ; pageIter.Valid(); pageIter.Next() {
 		pKey := pageIter.Key()
 		pageID, err := parsePageID(string(pKey.Data()))
 		if pKey != nil {
@@ -201,7 +202,8 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
 	}

 	log.Debug("Expired check by retention time", zap.Any("topic", topic),
-		zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize), zap.Any("pageCleaned", pageCleaned))
+		zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize),
+		zap.Any("pageCleaned", pageCleaned), zap.Any("time taken", time.Since(start).Milliseconds()))

 	for ; pageIter.Valid(); pageIter.Next() {
 		pValue := pageIter.Value()
@@ -234,7 +236,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
 	}

 	if pageEndID == 0 {
-		log.Debug("All messages are not expired, skip retention", zap.Any("topic", topic))
retention", zap.Any("topic", topic)) + log.Debug("All messages are not expired, skip retention", zap.Any("topic", topic), zap.Any("time taken", time.Since(start).Milliseconds())) return nil } expireTime := time.Since(start).Milliseconds() @@ -251,11 +253,11 @@ func (ri *retentionInfo) calculateTopicAckedSize(topic string) (int64, error) { defer pageReadOpts.Destroy() pageMsgPrefix := constructKey(PageMsgSizeTitle, topic) + "/" // ensure the iterator won't iterate to other topics - pageIter := ri.kv.DB.NewIterator(pageReadOpts) + pageIter := rocksdbkv.NewRocksIteratorWithUpperBound(ri.kv.DB, typeutil.AddOne(pageMsgPrefix), pageReadOpts) defer pageIter.Close() pageIter.Seek([]byte(pageMsgPrefix)) var ackedSize int64 - for ; pageIter.ValidForPrefix([]byte(pageMsgPrefix)); pageIter.Next() { + for ; pageIter.Valid(); pageIter.Next() { key := pageIter.Key() pageID, err := parsePageID(string(key.Data())) if key != nil {