From 264f08953dabfc51f789a236f2ae8bcede08ea05 Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Mon, 26 Dec 2022 19:11:30 +0800 Subject: [PATCH] Refactor Rocksmq's config (#21304) Signed-off-by: Enwei Jiao --- cmd/tools/migration/configs/config.go | 11 ++- internal/config/config.go | 2 - internal/config/env_source.go | 4 + internal/config/file_source.go | 2 +- internal/config/manager.go | 2 + internal/config/manager_test.go | 4 + internal/config/source.go | 1 + .../distributed/connection_manager_test.go | 2 +- .../mqimpl/rocksmq/client/client_impl_test.go | 7 ++ .../mq/mqimpl/rocksmq/client/test_helper.go | 5 +- .../mq/mqimpl/rocksmq/server/global_rmq.go | 23 +---- .../mq/mqimpl/rocksmq/server/rocksmq_impl.go | 14 +-- .../rocksmq/server/rocksmq_impl_test.go | 76 +++++++-------- .../rocksmq/server/rocksmq_retention.go | 41 +++----- .../rocksmq/server/rocksmq_retention_test.go | 94 ++++++++++--------- internal/mq/msgstream/mq_factory_test.go | 2 + internal/mq/msgstream/mq_msgstream_test.go | 4 +- .../mqwrapper/pulsar/pulsar_client_test.go | 6 +- .../mqwrapper/pulsar/pulsar_consumer_test.go | 2 +- .../mqwrapper/rmq/rmq_client_test.go | 3 +- internal/querynode/mock_test.go | 4 +- internal/storage/minio_chunk_manager_test.go | 4 +- internal/util/flowgraph/flow_graph_test.go | 8 ++ internal/util/paramtable/base_table.go | 79 +++------------- internal/util/paramtable/base_table_test.go | 70 -------------- internal/util/paramtable/service_param.go | 59 +++++++++++- .../util/sessionutil/session_util_test.go | 10 +- 27 files changed, 224 insertions(+), 315 deletions(-) diff --git a/cmd/tools/migration/configs/config.go b/cmd/tools/migration/configs/config.go index c92776c710..a902792024 100644 --- a/cmd/tools/migration/configs/config.go +++ b/cmd/tools/migration/configs/config.go @@ -2,6 +2,7 @@ package configs import ( "fmt" + "strconv" "github.com/milvus-io/milvus/cmd/tools/migration/console" "github.com/milvus-io/milvus/internal/util" @@ -55,11 +56,11 @@ func (c *RunConfig) show() { func (c *RunConfig) init(base *paramtable.BaseTable) { c.base = base - c.Cmd = c.base.LoadWithDefault("cmd.type", "") - c.RunWithBackup = c.base.ParseBool("cmd.runWithBackup", false) - c.SourceVersion = c.base.LoadWithDefault("config.sourceVersion", "") - c.TargetVersion = c.base.LoadWithDefault("config.targetVersion", "") - c.BackupFilePath = c.base.LoadWithDefault("config.backupFilePath", "") + c.Cmd = c.base.GetWithDefault("cmd.type", "") + c.RunWithBackup, _ = strconv.ParseBool(c.base.GetWithDefault("cmd.runWithBackup", "false")) + c.SourceVersion = c.base.GetWithDefault("config.sourceVersion", "") + c.TargetVersion = c.base.GetWithDefault("config.targetVersion", "") + c.BackupFilePath = c.base.GetWithDefault("config.backupFilePath", "") } type MilvusConfig struct { diff --git a/internal/config/config.go b/internal/config/config.go index 3dad026af6..b1940173fa 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -36,7 +36,6 @@ func Init(opts ...Option) (*Manager, error) { if o.File != nil { s := NewFileSource(o.File) sourceManager.AddSource(s) - s.SetEventHandler(sourceManager) } if o.EnvKeyFormatter != nil { @@ -48,7 +47,6 @@ func Init(opts ...Option) (*Manager, error) { return nil, err } sourceManager.AddSource(s) - s.SetEventHandler(sourceManager) } return sourceManager, nil diff --git a/internal/config/env_source.go b/internal/config/env_source.go index ea7eb24d6a..783eb08b04 100644 --- a/internal/config/env_source.go +++ b/internal/config/env_source.go @@ -76,6 +76,10 @@ func (es EnvSource) GetSourceName() string { return "EnvironmentSource" } +func (es EnvSource) SetEventHandler(eh EventHandler) { + +} + func (es EnvSource) Close() { } diff --git a/internal/config/file_source.go b/internal/config/file_source.go index e01e5779ce..ed7935e5be 100644 --- a/internal/config/file_source.go +++ b/internal/config/file_source.go @@ -111,7 +111,7 @@ func (fs *FileSource) loadFromFile() error { str, err := cast.ToStringE(val) if err != nil { switch val := val.(type) { - case []interface{}: + case []any: str = str[:0] for _, v := range val { ss, err := cast.ToStringE(v) diff --git a/internal/config/manager.go b/internal/config/manager.go index d1c1cf5d79..faac3b212d 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -173,6 +173,8 @@ func (m *Manager) AddSource(source Source) error { return err } + source.SetEventHandler(m) + return nil } diff --git a/internal/config/manager_test.go b/internal/config/manager_test.go index fbd0f29977..1e10d49cd2 100644 --- a/internal/config/manager_test.go +++ b/internal/config/manager_test.go @@ -71,3 +71,7 @@ func (ErrSource) GetPriority() int { func (ErrSource) GetSourceName() string { return "ErrSource" } + +func (e ErrSource) SetEventHandler(eh EventHandler) { + +} diff --git a/internal/config/source.go b/internal/config/source.go index b92f8e863f..ead2319b06 100644 --- a/internal/config/source.go +++ b/internal/config/source.go @@ -28,6 +28,7 @@ type Source interface { GetConfigurationByKey(string) (string, error) GetPriority() int GetSourceName() string + SetEventHandler(eh EventHandler) Close() } diff --git a/internal/distributed/connection_manager_test.go b/internal/distributed/connection_manager_test.go index 06fd4813bc..e48262138e 100644 --- a/internal/distributed/connection_manager_test.go +++ b/internal/distributed/connection_manager_test.go @@ -279,7 +279,7 @@ func initSession(ctx context.Context) *sessionutil.Session { } metaRootPath := rootPath + "/" + subPath - endpoints := baseTable.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := baseTable.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) etcdEndpoints := strings.Split(endpoints, ",") log.Debug("metaRootPath", zap.Any("metaRootPath", metaRootPath)) diff --git a/internal/mq/mqimpl/rocksmq/client/client_impl_test.go b/internal/mq/mqimpl/rocksmq/client/client_impl_test.go index dc29f00ff5..7ec4411b43 100644 --- a/internal/mq/mqimpl/rocksmq/client/client_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/client/client_impl_test.go @@ -17,11 +17,18 @@ import ( "time" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" ) var rmqPath = "/tmp/rocksmq_client" +func TestMain(m *testing.M) { + paramtable.Init() + code := m.Run() + os.Exit(code) +} + func TestClient(t *testing.T) { client, err := NewClient(Options{}) assert.NotNil(t, client) diff --git a/internal/mq/mqimpl/rocksmq/client/test_helper.go b/internal/mq/mqimpl/rocksmq/client/test_helper.go index c67de70bfa..87a2d0ff90 100644 --- a/internal/mq/mqimpl/rocksmq/client/test_helper.go +++ b/internal/mq/mqimpl/rocksmq/client/test_helper.go @@ -19,7 +19,6 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" - "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" "go.uber.org/zap" @@ -47,9 +46,7 @@ func newMockClient() *client { func newRocksMQ(t *testing.T, rmqPath string) server.RocksMQ { rocksdbPath := rmqPath - var params paramtable.BaseTable - params.Init(0) - rmq, err := server.NewRocksMQ(params, rocksdbPath, nil) + rmq, err := server.NewRocksMQ(rocksdbPath, nil) assert.NoError(t, err) return rmq } diff --git a/internal/mq/mqimpl/rocksmq/server/global_rmq.go b/internal/mq/mqimpl/rocksmq/server/global_rmq.go index 61e8a4d179..78012401ab 100644 --- a/internal/mq/mqimpl/rocksmq/server/global_rmq.go +++ b/internal/mq/mqimpl/rocksmq/server/global_rmq.go @@ -19,13 +19,10 @@ package server import ( "errors" "os" - "strconv" "sync" - "sync/atomic" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/paramtable" "go.uber.org/zap" ) @@ -36,14 +33,10 @@ var Rmq *rocksmq // once is used to init global rocksmq var once sync.Once -// Params provide params that rocksmq needs -var params paramtable.BaseTable - // InitRmq is deprecate implementation of global rocksmq. will be removed later func InitRmq(rocksdbName string, idAllocator allocator.Interface) error { var err error - params.Init(0) - Rmq, err = NewRocksMQ(params, rocksdbName, idAllocator) + Rmq, err = NewRocksMQ(rocksdbName, idAllocator) return err } @@ -51,7 +44,6 @@ func InitRmq(rocksdbName string, idAllocator allocator.Interface) error { func InitRocksMQ(path string) error { var finalErr error once.Do(func() { - params.Init(0) log.Debug("initializing global rmq", zap.String("path", path)) var fi os.FileInfo fi, finalErr = os.Stat(path) @@ -67,18 +59,7 @@ func InitRocksMQ(path string) error { return } } - - rawRmqPageSize, err := params.Load("rocksmq.rocksmqPageSize") - if err == nil && rawRmqPageSize != "" { - rmqPageSize, err := strconv.ParseInt(rawRmqPageSize, 10, 64) - if err == nil { - atomic.StoreInt64(&RocksmqPageSize, rmqPageSize) - } else { - log.Warn("rocksmq.rocksmqPageSize is invalid, using default value 2G") - } - } - - Rmq, finalErr = NewRocksMQ(params, path, nil) + Rmq, finalErr = NewRocksMQ(path, nil) }) return finalErr } diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go index 87bc1db357..3e77ef3e3b 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl.go @@ -45,7 +45,6 @@ type UniqueID = typeutil.UniqueID type RmqState = int64 // RocksmqPageSize is the size of a message page, default 256MB -var RocksmqPageSize int64 = 256 << 20 // RocksDB cache size limitation(TODO config it) var RocksDBLRUCacheMinCapacity = uint64(1 << 29) @@ -108,7 +107,8 @@ func parsePageID(key string) (int64, error) { } func checkRetention() bool { - return RocksmqRetentionTimeInSecs != -1 || RocksmqRetentionSizeInMB != -1 + params := paramtable.Get() + return params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() != -1 || params.RocksmqCfg.RetentionTimeInMinutes.GetAsInt64() != -1 } var topicMu = sync.Map{} @@ -130,7 +130,7 @@ type rocksmq struct { // 1. New rocksmq instance based on rocksdb with name and rocksdbkv with kvname // 2. Init retention info, load retention info to memory // 3. Start retention goroutine -func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator.Interface) (*rocksmq, error) { +func NewRocksMQ(name string, idAllocator allocator.Interface) (*rocksmq, error) { // TODO we should use same rocksdb instance with different cfs maxProcs := runtime.GOMAXPROCS(0) parallelism := 1 @@ -143,7 +143,8 @@ func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator. // default rocks db cache is set with memory rocksDBLRUCacheCapacity := RocksDBLRUCacheMinCapacity if memoryCount > 0 { - ratio := params.ParseFloatWithDefault("rocksmq.lrucacheratio", 0.06) + params := paramtable.Get() + ratio := params.RocksmqCfg.LRUCacheRatio.GetAsFloat() calculatedCapacity := uint64(float64(memoryCount) * ratio) if calculatedCapacity < RocksDBLRUCacheMinCapacity { rocksDBLRUCacheCapacity = RocksDBLRUCacheMinCapacity @@ -211,7 +212,7 @@ func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator. readers: sync.Map{}, } - ri, err := initRetentionInfo(params, kv, db) + ri, err := initRetentionInfo(kv, db) if err != nil { return nil, err } @@ -649,6 +650,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni } func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error { + params := paramtable.Get() msgSizeKey := MessageSizeTitle + topicName msgSizeVal, err := rmq.kv.Load(msgSizeKey) if err != nil { @@ -664,7 +666,7 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes mutateBuffer := make(map[string]string) for _, id := range msgIDs { msgSize := msgSizes[id] - if curMsgSize+msgSize > RocksmqPageSize { + if curMsgSize+msgSize > params.RocksmqCfg.PageSize.GetAsInt64() { // Current page is full newPageSize := curMsgSize + msgSize pageEndID := id diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go index 9e167d05a9..8e6da04b13 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_impl_test.go @@ -36,11 +36,16 @@ import ( "github.com/stretchr/testify/assert" ) -var Params paramtable.BaseTable var rmqPath = "/tmp/rocksmq" var kvPathSuffix = "_kv" var metaPathSuffix = "_meta" +func TestMain(m *testing.M) { + paramtable.Init() + code := m.Run() + os.Exit(code) +} + type producerMessageBefore struct { Payload []byte } @@ -165,7 +170,7 @@ func TestRocksmq_RegisterConsumer(t *testing.T) { var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator) + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.NoError(t, err) defer rmq.Close() @@ -230,7 +235,7 @@ func TestRocksmq_Basic(t *testing.T) { defer os.RemoveAll(rocksdbPath) var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator) + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -312,11 +317,10 @@ func TestRocksmq_MultiConsumer(t *testing.T) { rocksdbPath := rmqPath + suffix defer os.RemoveAll(rocksdbPath + kvSuffix) defer os.RemoveAll(rocksdbPath) - var params paramtable.BaseTable - params.Init(0) - atomic.StoreInt64(&RocksmqPageSize, 10) - rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator) + params := paramtable.Get() + params.Save(params.RocksmqCfg.PageSize.Key, "10") + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -368,11 +372,11 @@ func TestRocksmq_Dummy(t *testing.T) { defer os.RemoveAll(rocksdbPath) var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator) + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err) defer rmq.Close() - _, err = NewRocksMQ(params, "", idAllocator) + _, err = NewRocksMQ("", idAllocator) assert.Error(t, err) channelName := "channel_a" @@ -439,11 +443,11 @@ func TestRocksmq_Seek(t *testing.T) { var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator) + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err) defer rmq.Close() - _, err = NewRocksMQ(params, "", idAllocator) + _, err = NewRocksMQ("", idAllocator) assert.Error(t, err) defer os.RemoveAll("_meta_kv") @@ -507,7 +511,7 @@ func TestRocksmq_Loop(t *testing.T) { defer os.RemoveAll(kvName) var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -579,7 +583,7 @@ func TestRocksmq_Goroutines(t *testing.T) { defer os.RemoveAll(kvName) var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -658,7 +662,7 @@ func TestRocksmq_Throughout(t *testing.T) { defer os.RemoveAll(kvName) var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -723,7 +727,7 @@ func TestRocksmq_MultiChan(t *testing.T) { defer os.RemoveAll(kvName) var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -777,7 +781,7 @@ func TestRocksmq_CopyData(t *testing.T) { defer os.RemoveAll(kvName) var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -845,7 +849,7 @@ func TestRocksmq_SeekToLatest(t *testing.T) { defer os.RemoveAll(kvName) var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -936,9 +940,7 @@ func TestRocksmq_GetLatestMsg(t *testing.T) { kvName := name + "_meta_kv" _ = os.RemoveAll(kvName) defer os.RemoveAll(kvName) - var params paramtable.BaseTable - params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) channelName := newChanName() @@ -1012,9 +1014,7 @@ func TestRocksmq_Close(t *testing.T) { kvName := name + "_meta_kv" _ = os.RemoveAll(kvName) defer os.RemoveAll(kvName) - var params paramtable.BaseTable - params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -1046,9 +1046,7 @@ func TestRocksmq_SeekWithNoConsumerError(t *testing.T) { kvName := name + "_meta_kv" _ = os.RemoveAll(kvName) defer os.RemoveAll(kvName) - var params paramtable.BaseTable - params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -1073,9 +1071,7 @@ func TestRocksmq_SeekTopicNotExistError(t *testing.T) { kvName := name + "_meta_kv" _ = os.RemoveAll(kvName) defer os.RemoveAll(kvName) - var params paramtable.BaseTable - params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -1097,9 +1093,7 @@ func TestRocksmq_SeekTopicMutexError(t *testing.T) { kvName := name + "_meta_kv" _ = os.RemoveAll(kvName) defer os.RemoveAll(kvName) - var params paramtable.BaseTable - params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -1122,9 +1116,7 @@ func TestRocksmq_moveConsumePosError(t *testing.T) { kvName := name + "_meta_kv" _ = os.RemoveAll(kvName) defer os.RemoveAll(kvName) - var params paramtable.BaseTable - params.Init(0) - rmq, err := NewRocksMQ(params, name, idAllocator) + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -1146,10 +1138,9 @@ func TestRocksmq_updateAckedInfoErr(t *testing.T) { kvName := name + "_meta_kv" _ = os.RemoveAll(kvName) defer os.RemoveAll(kvName) - var params paramtable.BaseTable - params.Init(0) - atomic.StoreInt64(&RocksmqPageSize, 10) - rmq, err := NewRocksMQ(params, name, idAllocator) + params := paramtable.Get() + params.Save(params.RocksmqCfg.PageSize.Key, "10") + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() @@ -1199,10 +1190,9 @@ func TestRocksmq_Info(t *testing.T) { kvName := name + "_meta_kv" _ = os.RemoveAll(kvName) defer os.RemoveAll(kvName) - var params paramtable.BaseTable - params.Init(0) - atomic.StoreInt64(&RocksmqPageSize, 10) - rmq, err := NewRocksMQ(params, name, idAllocator) + params := paramtable.Get() + params.Save(params.RocksmqCfg.PageSize.Key, "10") + rmq, err := NewRocksMQ(name, idAllocator) assert.Nil(t, err) defer rmq.Close() diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go index 75ce1a4765..d915d12962 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention.go @@ -16,7 +16,6 @@ import ( "path" "strconv" "sync" - "sync/atomic" "time" rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb" @@ -27,26 +26,11 @@ import ( "go.uber.org/zap" ) -// RocksmqRetentionTimeInMinutes is the time of retention -var RocksmqRetentionTimeInSecs int64 -var DefaultRocksmqRetentionTimeInMins int64 = 7200 - -// RocksmqRetentionSizeInMB is the size of retention -var RocksmqRetentionSizeInMB int64 -var DefaultRocksmqRetentionSizeInMB int64 = 8192 - -// RocksmqRetentionCompactionInterval is the Interval we trigger compaction, -var RocksmqRetentionCompactionInterval int64 -var DefaultRocksmqRetentionCompactionInterval int64 = 86400 - // Const value that used to convert unit const ( MB = 1024 * 1024 ) -// TickerTimeInSeconds is the time of expired check, default 10 minutes -var TickerTimeInSeconds int64 = 600 - type retentionInfo struct { // key is topic name, value is last retention time topicRetetionTime sync.Map @@ -60,11 +44,7 @@ type retentionInfo struct { closeOnce sync.Once } -func initRetentionInfo(params paramtable.BaseTable, kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) { - rawRmqRetentionTimeInMinutes := params.ParseInt64WithDefault("rocksmq.retentionTimeInMinutes", DefaultRocksmqRetentionTimeInMins) - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, rawRmqRetentionTimeInMinutes*60) - atomic.StoreInt64(&RocksmqRetentionSizeInMB, params.ParseInt64WithDefault("rocksmq.retentionSizeInMB", DefaultRocksmqRetentionSizeInMB)) - atomic.StoreInt64(&RocksmqRetentionCompactionInterval, params.ParseInt64WithDefault("rocksmq.compactionInterval", DefaultRocksmqRetentionCompactionInterval)) +func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) { ri := &retentionInfo{ topicRetetionTime: sync.Map{}, mutex: sync.RWMutex{}, @@ -97,10 +77,11 @@ func (ri *retentionInfo) startRetentionInfo() { // retention do time ticker and trigger retention check and operation for each topic func (ri *retentionInfo) retention() error { log.Debug("Rocksmq retention goroutine start!") + params := paramtable.Get() // Do retention check every 10 mins - ticker := time.NewTicker(time.Duration(atomic.LoadInt64(&TickerTimeInSeconds) * int64(time.Second))) + ticker := time.NewTicker(params.RocksmqCfg.TickerTimeInSeconds.GetAsDuration(time.Second)) defer ticker.Stop() - compactionTicker := time.NewTicker(time.Duration(atomic.LoadInt64(&RocksmqRetentionCompactionInterval) * int64(time.Second))) + compactionTicker := time.NewTicker(params.RocksmqCfg.CompactionInterval.GetAsDuration(time.Second)) defer compactionTicker.Stop() defer ri.closeWg.Done() @@ -115,7 +96,7 @@ func (ri *retentionInfo) retention() error { go ri.kv.DB.CompactRange(gorocksdb.Range{Start: nil, Limit: nil}) case t := <-ticker.C: timeNow := t.Unix() - checkTime := atomic.LoadInt64(&RocksmqRetentionTimeInSecs) / 10 + checkTime := int64(params.RocksmqCfg.RetentionTimeInMinutes.GetAsFloat() * 60 / 10) ri.mutex.RLock() ri.topicRetetionTime.Range(func(k, v interface{}) bool { topic, _ := k.(string) @@ -377,15 +358,19 @@ func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) err } func msgTimeExpiredCheck(ackedTs int64) bool { - if RocksmqRetentionTimeInSecs < 0 { + params := paramtable.Get() + retentionSeconds := int64(params.RocksmqCfg.RetentionTimeInMinutes.GetAsFloat() * 60) + if retentionSeconds < 0 { return false } - return ackedTs+atomic.LoadInt64(&RocksmqRetentionTimeInSecs) < time.Now().Unix() + return ackedTs+retentionSeconds < time.Now().Unix() } func msgSizeExpiredCheck(deletedAckedSize, ackedSize int64) bool { - if RocksmqRetentionSizeInMB < 0 { + params := paramtable.Get() + size := params.RocksmqCfg.RetentionSizeInMB.GetAsInt64() + if size < 0 { return false } - return ackedSize-deletedAckedSize > atomic.LoadInt64(&RocksmqRetentionSizeInMB)*MB + return ackedSize-deletedAckedSize > size*MB } diff --git a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go index cf6e1b9445..049a16d22c 100644 --- a/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go +++ b/internal/mq/mqimpl/rocksmq/server/rocksmq_retention_test.go @@ -14,7 +14,6 @@ package server import ( "os" "strconv" - "sync/atomic" "testing" "time" @@ -26,11 +25,6 @@ import ( var retentionPath = "/tmp/rmq_retention/" -func TestMain(m *testing.M) { - code := m.Run() - os.Exit(code) -} - // Test write data and wait for retention func TestRmqRetention_Basic(t *testing.T) { err := os.MkdirAll(retentionPath, os.ModePerm) @@ -44,17 +38,16 @@ func TestRmqRetention_Basic(t *testing.T) { metaPath := retentionPath + metaPathSuffix defer os.RemoveAll(metaPath) - var params paramtable.BaseTable - params.Init(0) + params := paramtable.Get() + params.Init() - checkTimeInterval := 2 - atomic.StoreInt64(&RocksmqPageSize, 10) - atomic.StoreInt64(&TickerTimeInSeconds, int64(checkTimeInterval)) - rmq, err := NewRocksMQ(params, rocksdbPath, nil) + params.Save(params.RocksmqCfg.PageSize.Key, "10") + params.Save(params.RocksmqCfg.TickerTimeInSeconds.Key, "2") + rmq, err := NewRocksMQ(rocksdbPath, nil) assert.Nil(t, err) defer rmq.Close() - atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0) + params.Save(params.RocksmqCfg.RetentionSizeInMB.Key, "0") + params.Save(params.RocksmqCfg.RetentionTimeInMinutes.Key, "0") topicName := "topic_a" err = rmq.CreateTopic(topicName) @@ -92,7 +85,7 @@ func TestRmqRetention_Basic(t *testing.T) { assert.Equal(t, len(cMsgs), msgNum) rmq.Info() - time.Sleep(time.Duration(checkTimeInterval+1) * time.Second) + time.Sleep(time.Duration(3) * time.Second) // Seek to a previous consumed message, the message should be clean up err = rmq.ForceSeek(topicName, groupName, cMsgs[msgNum/2].MsgID) @@ -140,16 +133,17 @@ func TestRmqRetention_NotConsumed(t *testing.T) { metaPath := retentionPath + metaPathSuffix defer os.RemoveAll(metaPath) - var params paramtable.BaseTable - params.Init(0) - atomic.StoreInt64(&RocksmqPageSize, 10) - atomic.StoreInt64(&TickerTimeInSeconds, 2) - rmq, err := NewRocksMQ(params, rocksdbPath, nil) + params := paramtable.Get() + params.Init() + + params.Save(params.RocksmqCfg.PageSize.Key, "10") + params.Save(params.RocksmqCfg.TickerTimeInSeconds.Key, "2") + rmq, err := NewRocksMQ(rocksdbPath, nil) assert.Nil(t, err) defer rmq.Close() - atomic.StoreInt64(&RocksmqRetentionSizeInMB, 0) - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 0) + params.Save(params.RocksmqCfg.RetentionSizeInMB.Key, "0") + params.Save(params.RocksmqCfg.RetentionTimeInMinutes.Key, "0") topicName := "topic_a" err = rmq.CreateTopic(topicName) @@ -248,18 +242,21 @@ func TestRmqRetention_MultipleTopic(t *testing.T) { os.RemoveAll(rocksdbPath) metaPath := retentionPath + "meta_multi_topic" os.RemoveAll(metaPath) - var params paramtable.BaseTable - atomic.StoreInt64(&RocksmqPageSize, 10) - atomic.StoreInt64(&TickerTimeInSeconds, 1) - params.Init(0) - rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator) + + params := paramtable.Get() + params.Init() + + params.Save(params.RocksmqCfg.PageSize.Key, "10") + params.Save(params.RocksmqCfg.TickerTimeInSeconds.Key, "1") + + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err) defer rmq.Close() // no retention by size - atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1) + params.Save(params.RocksmqCfg.RetentionSizeInMB.Key, "-1") // retention by secs - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 1) + params.Save(params.RocksmqCfg.RetentionTimeInMinutes.Key, "0.017") topicName := "topic_a" err = rmq.CreateTopic(topicName) @@ -416,7 +413,7 @@ func TestRetentionInfo_InitRetentionInfo(t *testing.T) { var params paramtable.BaseTable params.Init(0) - rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator) + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err) assert.NotNil(t, rmq) @@ -425,7 +422,7 @@ func TestRetentionInfo_InitRetentionInfo(t *testing.T) { assert.Nil(t, err) rmq.Close() - rmq, err = NewRocksMQ(params, rocksdbPath, idAllocator) + rmq, err = NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err) assert.NotNil(t, rmq) @@ -469,18 +466,20 @@ func TestRmqRetention_PageTimeExpire(t *testing.T) { metaPath := retentionPath + "meta_kv_com1" os.RemoveAll(metaPath) - var params paramtable.BaseTable - params.Init(0) - atomic.StoreInt64(&RocksmqPageSize, 10) - atomic.StoreInt64(&TickerTimeInSeconds, 1) - rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator) + params := paramtable.Get() + params.Init() + + params.Save(params.RocksmqCfg.PageSize.Key, "10") + params.Save(params.RocksmqCfg.TickerTimeInSeconds.Key, "1") + + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err) defer rmq.Close() // no retention by size - atomic.StoreInt64(&RocksmqRetentionSizeInMB, -1) + params.Save(params.RocksmqCfg.RetentionSizeInMB.Key, "-1") // retention by secs - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, 5) + params.Save(params.RocksmqCfg.RetentionTimeInMinutes.Key, "0.084") topicName := "topic_a" err = rmq.CreateTopic(topicName) @@ -593,17 +592,20 @@ func TestRmqRetention_PageSizeExpire(t *testing.T) { metaPath := retentionPath + "meta_kv_com2" os.RemoveAll(metaPath) - var params paramtable.BaseTable - params.Init(0) - atomic.StoreInt64(&RocksmqPageSize, 10) - atomic.StoreInt64(&TickerTimeInSeconds, 1) - rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator) + params := paramtable.Get() + params.Init() + + params.Save(params.RocksmqCfg.PageSize.Key, "10") + params.Save(params.RocksmqCfg.TickerTimeInSeconds.Key, "1") + + rmq, err := NewRocksMQ(rocksdbPath, idAllocator) assert.Nil(t, err) defer rmq.Close() - // update some configrocksmq_retentions to make cleanup trigger faster - atomic.StoreInt64(&RocksmqRetentionSizeInMB, 1) - atomic.StoreInt64(&RocksmqRetentionTimeInSecs, -1) + // no retention by size + params.Save(params.RocksmqCfg.RetentionSizeInMB.Key, "1") + // retention by secs + params.Save(params.RocksmqCfg.RetentionTimeInMinutes.Key, "-1") topicName := "topic_a" err = rmq.CreateTopic(topicName) diff --git a/internal/mq/msgstream/mq_factory_test.go b/internal/mq/msgstream/mq_factory_test.go index 3840dc84cd..b5cb748491 100644 --- a/internal/mq/msgstream/mq_factory_test.go +++ b/internal/mq/msgstream/mq_factory_test.go @@ -21,6 +21,7 @@ import ( "os" "testing" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" ) @@ -78,6 +79,7 @@ func TestPmsFactoryWithAuth(t *testing.T) { func TestRmsFactory(t *testing.T) { defer os.Unsetenv("ROCKSMQ_PATH") + paramtable.Init() dir := t.TempDir() diff --git a/internal/mq/msgstream/mq_msgstream_test.go b/internal/mq/msgstream/mq_msgstream_test.go index f8436165ce..2fc9451b04 100644 --- a/internal/mq/msgstream/mq_msgstream_test.go +++ b/internal/mq/msgstream/mq_msgstream_test.go @@ -74,8 +74,8 @@ func TestMain(m *testing.M) { } func getPulsarAddress() string { - pulsarHost := Params.LoadWithDefault("pulsar.address", "") - port := Params.LoadWithDefault("pulsar.port", "") + pulsarHost := Params.GetWithDefault("pulsar.address", "") + port := Params.GetWithDefault("pulsar.port", "") if len(pulsarHost) != 0 && len(port) != 0 { return "pulsar://" + pulsarHost + ":" + port } diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go index 9a4d0bb409..2171191aae 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go @@ -53,8 +53,8 @@ func TestMain(m *testing.M) { } func getPulsarAddress() string { - pulsarHost := Params.LoadWithDefault("pulsar.address", "") - port := Params.LoadWithDefault("pulsar.port", "") + pulsarHost := Params.GetWithDefault("pulsar.address", "") + port := Params.GetWithDefault("pulsar.port", "") log.Info("pulsar address", zap.String("host", pulsarHost), zap.String("port", port)) if len(pulsarHost) != 0 && len(port) != 0 { return "pulsar://" + pulsarHost + ":" + port @@ -739,7 +739,7 @@ func TestPulsarCtl(t *testing.T) { if err != nil { panic(err) } - webport := Params.LoadWithDefault("pulsar.webport", "80") + webport := Params.GetWithDefault("pulsar.webport", "80") webServiceURL := "http://" + pulsarURL.Hostname() + ":" + webport admin, err := NewAdminClient(webServiceURL, "", "") assert.NoError(t, err) diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go index f185b99a6c..72ca3fa577 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_consumer_test.go @@ -176,7 +176,7 @@ func TestPulsarClientCloseUnsubscribeError(t *testing.T) { if err != nil { panic(err) } - webport := Params.LoadWithDefault("pulsar.webport", "80") + webport := Params.GetWithDefault("pulsar.webport", "80") webServiceURL := "http://" + pulsarURL.Hostname() + ":" + webport admin, err := NewAdminClient(webServiceURL, "", "") assert.NoError(t, err) diff --git a/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go b/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go index 8176d26b00..0270c8cdb2 100644 --- a/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go +++ b/internal/mq/msgstream/mqwrapper/rmq/rmq_client_test.go @@ -34,11 +34,10 @@ import ( "github.com/stretchr/testify/assert" ) -var Params paramtable.BaseTable - func TestMain(m *testing.M) { path := "/tmp/milvus/rdb_data" defer os.RemoveAll(path) + paramtable.Init() _ = rocksmqimplserver.InitRocksMQ(path) exitCode := m.Run() defer rocksmqimplserver.CloseRocksMQ() diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 2d4e7a3b4d..ddf9a594fd 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -559,7 +559,7 @@ func genQueryMsgStream(ctx context.Context) (msgstream.MsgStream, error) { } func genLocalChunkManager() (storage.ChunkManager, error) { - p := Params.LoadWithDefault("storage.path", "/tmp/milvus_test/data") + p := Params.GetWithDefault("storage.path", "/tmp/milvus_test/data") lcm := storage.NewLocalChunkManager(storage.RootPath(p)) return lcm, nil } @@ -576,7 +576,7 @@ func genRemoteChunkManager(ctx context.Context) (storage.ChunkManager, error) { } func genVectorChunkManager(ctx context.Context, col *Collection) (*storage.VectorChunkManager, error) { - p := Params.LoadWithDefault("storage.path", "/tmp/milvus_test/data") + p := Params.GetWithDefault("storage.path", "/tmp/milvus_test/data") lcm := storage.NewLocalChunkManager(storage.RootPath(p)) rcm, err := storage.NewMinioChunkManager( diff --git a/internal/storage/minio_chunk_manager_test.go b/internal/storage/minio_chunk_manager_test.go index 8d3d0f6522..c7b6d769d4 100644 --- a/internal/storage/minio_chunk_manager_test.go +++ b/internal/storage/minio_chunk_manager_test.go @@ -55,11 +55,11 @@ func newMinIOChunkManager(ctx context.Context, bucketName string, rootPath strin } func getMinioAddress() string { - minioHost := Params.LoadWithDefault("minio.address", paramtable.DefaultMinioHost) + minioHost := Params.GetWithDefault("minio.address", paramtable.DefaultMinioHost) if strings.Contains(minioHost, ":") { return minioHost } - port := Params.LoadWithDefault("minio.port", paramtable.DefaultMinioPort) + port := Params.GetWithDefault("minio.port", paramtable.DefaultMinioPort) return minioHost + ":" + port } diff --git a/internal/util/flowgraph/flow_graph_test.go b/internal/util/flowgraph/flow_graph_test.go index bb6d28eeed..387e20c8d2 100644 --- a/internal/util/flowgraph/flow_graph_test.go +++ b/internal/util/flowgraph/flow_graph_test.go @@ -20,9 +20,11 @@ import ( "context" "math" "math/rand" + "os" "testing" "time" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" ) @@ -155,6 +157,12 @@ func createExampleFlowGraph() (*TimeTickedFlowGraph, chan float64, chan float64, return fg, inputChan, outputChan, cancel, nil } +func TestMain(m *testing.M) { + paramtable.Init() + code := m.Run() + os.Exit(code) +} + func TestTimeTickedFlowGraph_AddNode(t *testing.T) { const MaxQueueLength = 1024 inputChan := make(chan float64, MaxQueueLength) diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index 3237846cf2..81c4b413b0 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -13,7 +13,6 @@ package paramtable import ( "context" - "fmt" "os" "path" "runtime" @@ -44,15 +43,10 @@ const ( DefaultMinioCloudProvider = "aws" DefaultMinioIAMEndpoint = "" DefaultEtcdEndpoints = "localhost:2379" - DefaultInsertBufferSize = "16777216" - DefaultEnvPrefix = "milvus" DefaultLogFormat = "text" DefaultLogLevelForBase = "debug" DefaultRootPath = "" - DefaultMaxSize = 300 - DefaultMaxAge = 10 - DefaultMaxBackups = 20 ) //Const of Global Config List @@ -183,8 +177,12 @@ func (gp *BaseTable) Load(key string) (string, error) { return gp.mgr.GetConfig(key) } -// LoadWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned. -func (gp *BaseTable) LoadWithDefault(key, defaultValue string) string { +func (gp *BaseTable) Get(key string) string { + return gp.GetWithDefault(key, "") +} + +// GetWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned. +func (gp *BaseTable) GetWithDefault(key, defaultValue string) string { str, err := gp.mgr.GetConfig(key) if err != nil { return defaultValue @@ -192,14 +190,6 @@ func (gp *BaseTable) LoadWithDefault(key, defaultValue string) string { return str } -func (gp *BaseTable) Get(key string) string { - value, err := gp.mgr.GetConfig(key) - if err != nil { - return "" - } - return value -} - func (gp *BaseTable) GetConfigSubSet(pattern string) map[string]string { return gp.mgr.GetBy(config.WithPrefix(pattern), config.RemovePrefix(pattern)) } @@ -231,62 +221,17 @@ func (gp *BaseTable) Reset(key string) error { return nil } -func (gp *BaseTable) ParseBool(key string, defaultValue bool) bool { - valueStr := gp.LoadWithDefault(key, strconv.FormatBool(defaultValue)) - value, err := strconv.ParseBool(valueStr) - if err != nil { - panic(err) - } - return value -} - -func (gp *BaseTable) ParseFloatWithDefault(key string, defaultValue float64) float64 { - valueStr := gp.LoadWithDefault(key, fmt.Sprintf("%f", defaultValue)) - value, err := strconv.ParseFloat(valueStr, 64) - if err != nil { - panic(err) - } - return value -} - -func (gp *BaseTable) ParseInt64WithDefault(key string, defaultValue int64) int64 { - valueStr := gp.LoadWithDefault(key, strconv.FormatInt(defaultValue, 10)) - value, err := strconv.ParseInt(valueStr, 10, 64) - if err != nil { - panic(err) - } - return value -} - -func (gp *BaseTable) ParseInt32WithDefault(key string, defaultValue int32) int32 { - valueStr := gp.LoadWithDefault(key, strconv.FormatInt(int64(defaultValue), 10)) - value, err := strconv.ParseInt(valueStr, 10, 32) - if err != nil { - panic(err) - } - return int32(value) -} - -func (gp *BaseTable) ParseIntWithDefault(key string, defaultValue int) int { - valueStr := gp.LoadWithDefault(key, strconv.FormatInt(int64(defaultValue), 10)) - value, err := strconv.Atoi(valueStr) - if err != nil { - panic(err) - } - return value -} - // InitLogCfg init log of the base table func (gp *BaseTable) InitLogCfg() { gp.Log = log.Config{} - format := gp.LoadWithDefault("log.format", DefaultLogFormat) + format := gp.GetWithDefault("log.format", DefaultLogFormat) gp.Log.Format = format - level := gp.LoadWithDefault("log.level", DefaultLogLevelForBase) + level := gp.GetWithDefault("log.level", DefaultLogLevelForBase) gp.Log.Level = level - gp.Log.File.MaxSize = gp.ParseIntWithDefault("log.file.maxSize", DefaultMaxSize) - gp.Log.File.MaxBackups = gp.ParseIntWithDefault("log.file.maxBackups", DefaultMaxBackups) - gp.Log.File.MaxDays = gp.ParseIntWithDefault("log.file.maxAge", DefaultMaxAge) - gp.Log.File.RootPath = gp.LoadWithDefault("log.file.rootPath", DefaultRootPath) + gp.Log.File.MaxSize, _ = strconv.Atoi(gp.GetWithDefault("log.file.maxSize", "300")) + gp.Log.File.MaxBackups, _ = strconv.Atoi(gp.GetWithDefault("log.file.maxBackups", "10")) + gp.Log.File.MaxDays, _ = strconv.Atoi(gp.GetWithDefault("log.file.maxAge", "20")) + gp.Log.File.RootPath = gp.GetWithDefault("log.file.rootPath", DefaultRootPath) grpclog, err := gp.Load("grpc.log.level") if err != nil { diff --git a/internal/util/paramtable/base_table_test.go b/internal/util/paramtable/base_table_test.go index bc78e393ab..1c370fde12 100644 --- a/internal/util/paramtable/base_table_test.go +++ b/internal/util/paramtable/base_table_test.go @@ -118,35 +118,6 @@ func TestBaseTable_Pulsar(t *testing.T) { assert.NotEqual(t, "", port) } -// func TestBaseTable_ConfDir(t *testing.T) { -// rightConfig := baseParams.configDir -// // fake dir -// baseParams.configDir = "./" - -// assert.Panics(t, func() { baseParams.loadFromYaml(defaultYaml) }) - -// baseParams.configDir = rightConfig -// baseParams.loadFromYaml(defaultYaml) -// baseParams.GlobalInitWithYaml(defaultYaml) -// } - -// func TestBateTable_ConfPath(t *testing.T) { -// os.Setenv("MILVUSCONF", "test") -// config := baseParams.initConfPath() -// assert.Equal(t, config, "test") - -// os.Unsetenv("MILVUSCONF") -// dir, _ := os.Getwd() -// config = baseParams.initConfPath() -// assert.Equal(t, filepath.Clean(config), filepath.Clean(dir+"/../../../configs/")) - -// // test use get dir -// os.Chdir(dir + "/../../../") -// defer os.Chdir(dir) -// config = baseParams.initConfPath() -// assert.Equal(t, filepath.Clean(config), filepath.Clean(dir+"/../../../configs/")) -// } - func TestBaseTable_Env(t *testing.T) { t.Setenv("milvus.test", "test") t.Setenv("milvus.test.test2", "test2") @@ -165,47 +136,6 @@ func TestBaseTable_Env(t *testing.T) { assert.Equal(t, result, "xxx=test") } -func TestBaseTable_Parse(t *testing.T) { - t.Run("ParseBool", func(t *testing.T) { - assert.Nil(t, baseParams.Save("key", "true")) - assert.True(t, baseParams.ParseBool("key", false)) - assert.False(t, baseParams.ParseBool("not_exist_key", false)) - - assert.Nil(t, baseParams.Save("key", "rand")) - assert.Panics(t, func() { baseParams.ParseBool("key", false) }) - }) - - t.Run("ParseFloatWithDefault", func(t *testing.T) { - baseParams.Remove("key") - assert.Equal(t, float64(0.0), baseParams.ParseFloatWithDefault("key", 0.0)) - assert.Equal(t, float64(3.14), baseParams.ParseFloatWithDefault("key", 3.14)) - - assert.Nil(t, baseParams.Save("key", "2")) - assert.Equal(t, float64(2.0), baseParams.ParseFloatWithDefault("key", 3.14)) - }) - - t.Run("ParseInt32WithDefault", func(t *testing.T) { - baseParams.Remove("key") - assert.Equal(t, int32(1), baseParams.ParseInt32WithDefault("key", 1)) - assert.Nil(t, baseParams.Save("key", "2")) - assert.Equal(t, int32(2), baseParams.ParseInt32WithDefault("key", 1)) - }) - - t.Run("ParseInt64WithDefault", func(t *testing.T) { - baseParams.Remove("key") - assert.Equal(t, int64(1), baseParams.ParseInt64WithDefault("key", 1)) - assert.Nil(t, baseParams.Save("key", "2")) - assert.Equal(t, int64(2), baseParams.ParseInt64WithDefault("key", 1)) - }) - - t.Run("ParseIntWithDefault", func(t *testing.T) { - baseParams.Remove("key") - assert.Equal(t, int(1), baseParams.ParseIntWithDefault("key", 1)) - assert.Nil(t, baseParams.Save("key", "2")) - assert.Equal(t, int(2), baseParams.ParseIntWithDefault("key", 1)) - }) -} - func TestNewBaseTableFromYamlOnly(t *testing.T) { var yaml string var gp *BaseTable diff --git a/internal/util/paramtable/service_param.go b/internal/util/paramtable/service_param.go index b5ce10e66d..491d2d8d7b 100644 --- a/internal/util/paramtable/service_param.go +++ b/internal/util/paramtable/service_param.go @@ -489,16 +489,67 @@ func (k *KafkaConfig) Init(base *BaseTable) { // ///////////////////////////////////////////////////////////////////////////// // --- rocksmq --- type RocksmqConfig struct { - Path ParamItem `refreshable:"false"` + Path ParamItem `refreshable:"false"` + LRUCacheRatio ParamItem `refreshable:"false"` + PageSize ParamItem `refreshable:"false"` + // RetentionTimeInMinutes is the time of retention + RetentionTimeInMinutes ParamItem `refreshable:"false"` + // RetentionSizeInMB is the size of retention + RetentionSizeInMB ParamItem `refreshable:"false"` + // CompactionInterval is the Interval we trigger compaction, + CompactionInterval ParamItem `refreshable:"false"` + // TickerTimeInSeconds is the time of expired check, default 10 minutes + TickerTimeInSeconds ParamItem `refreshable:"false"` } func (r *RocksmqConfig) Init(base *BaseTable) { r.Path = ParamItem{ - Key: "rocksmq.path", - DefaultValue: "", - Version: "2.0.0", + Key: "rocksmq.path", + Version: "2.0.0", } r.Path.Init(base.mgr) + + r.LRUCacheRatio = ParamItem{ + Key: "rocksmq.lrucacheratio", + DefaultValue: "0.0.6", + Version: "2.0.0", + } + r.LRUCacheRatio.Init(base.mgr) + + r.PageSize = ParamItem{ + Key: "rocksmq.rocksmqPageSize", + DefaultValue: strconv.FormatInt(256<<20, 10), + Version: "2.0.0", + } + r.PageSize.Init(base.mgr) + + r.RetentionTimeInMinutes = ParamItem{ + Key: "rocksmq.retentionTimeInMinutes", + DefaultValue: "7200", + Version: "2.0.0", + } + r.RetentionTimeInMinutes.Init(base.mgr) + + r.RetentionSizeInMB = ParamItem{ + Key: "rocksmq.retentionSizeInMB", + DefaultValue: "7200", + Version: "2.0.0", + } + r.RetentionSizeInMB.Init(base.mgr) + + r.CompactionInterval = ParamItem{ + Key: "rocksmq.compactionInterval", + DefaultValue: "86400", + Version: "2.0.0", + } + r.CompactionInterval.Init(base.mgr) + + r.TickerTimeInSeconds = ParamItem{ + Key: "rocksmq.timtickerInterval", + DefaultValue: "600", + Version: "2.2.2", + } + r.TickerTimeInSeconds.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 4ee357cf56..1ade1f4067 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -38,7 +38,7 @@ func TestGetServerIDConcurrently(t *testing.T) { paramtable.Init() params := paramtable.Get() - endpoints := params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdEndpoints := strings.Split(endpoints, ",") @@ -81,7 +81,7 @@ func TestInit(t *testing.T) { paramtable.Init() params := paramtable.Get() - endpoints := params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdEndpoints := strings.Split(endpoints, ",") @@ -109,7 +109,7 @@ func TestUpdateSessions(t *testing.T) { paramtable.Init() params := paramtable.Get() - endpoints := params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) etcdEndpoints := strings.Split(endpoints, ",") metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) @@ -224,7 +224,7 @@ func TestWatcherHandleWatchResp(t *testing.T) { paramtable.Init() params := paramtable.Get() - endpoints := params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) etcdEndpoints := strings.Split(endpoints, ",") metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) @@ -375,7 +375,7 @@ func TestSessionRevoke(t *testing.T) { paramtable.Init() params := paramtable.Get() - endpoints := params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) + endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints) metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) etcdEndpoints := strings.Split(endpoints, ",")