From 5a303e7672d5f8b239250368f5bb4b0443327abc Mon Sep 17 00:00:00 2001 From: yukun Date: Fri, 10 Sep 2021 10:22:01 +0800 Subject: [PATCH] Fix rocksmq load with prefix (#7678) Signed-off-by: fishpenguin --- internal/kv/rocksdb/rocksdb_kv.go | 8 ++++++++ internal/util/rocksmq/server/rocksmq/rocksmq_impl.go | 2 +- internal/util/rocksmq/server/rocksmq/rocksmq_retention.go | 5 +++-- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/kv/rocksdb/rocksdb_kv.go b/internal/kv/rocksdb/rocksdb_kv.go index 0bec4ba438..cf8bfef290 100644 --- a/internal/kv/rocksdb/rocksdb_kv.go +++ b/internal/kv/rocksdb/rocksdb_kv.go @@ -90,6 +90,14 @@ func (kv *RocksdbKV) LoadWithPrefix(key string) ([]string, []string, error) { return keys, values, nil } +func (kv *RocksdbKV) ResetPrefixLength(len int) error { + kv.DB.Close() + kv.Opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(len)) + var err error + kv.DB, err = gorocksdb.OpenDb(kv.Opts, kv.GetName()) + return err +} + func (kv *RocksdbKV) MultiLoad(keys []string) ([]string, error) { values := make([]string, 0, len(keys)) for _, key := range keys { diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 4536d8c534..68b5587574 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -113,7 +113,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro opts := gorocksdb.NewDefaultOptions() opts.SetBlockBasedTableFactory(bbto) opts.SetCreateIfMissing(true) - opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(FixedChannelNameLen + 1)) + opts.SetPrefixExtractor(gorocksdb.NewFixedPrefixTransform(FixedChannelNameLen)) db, err := gorocksdb.OpenDb(opts, name) if err != nil { diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index 6e95814b7b..5baa111797 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -76,10 +76,10 @@ func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) { for ; iter.Valid(); iter.Next() { key := iter.Key() value := iter.Value() - defer key.Free() - defer value.Free() keys = append(keys, string(key.Data())) + key.Free() values = append(values, string(value.Data())) + value.Free() } if err := iter.Err(); err != nil { return nil, nil, err @@ -112,6 +112,7 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf func (ri *retentionInfo) startRetentionInfo() error { var wg sync.WaitGroup + ri.kv.ResetPrefixLength(FixedChannelNameLen) for _, topic := range ri.topics { log.Debug("Start load retention info", zap.Any("topic", topic)) // Load all page infos