From 1f337891e1fb6aedac343f8f711e78745faf91c6 Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Tue, 8 Feb 2022 20:57:47 +0800 Subject: [PATCH] Rename base_param to service_param, rename global_param to component_param (#15463) Signed-off-by: yudong.cai --- cmd/roles/roles.go | 2 +- internal/allocator/global_id_test.go | 2 +- internal/datacoord/server.go | 2 +- internal/datanode/data_node.go | 2 +- .../indexnode/client/client_test.go | 2 +- .../distributed/indexnode/service_test.go | 2 +- internal/indexcoord/index_coord.go | 2 +- internal/indexnode/indexnode.go | 2 +- internal/kv/etcd/embed_etcd_config_test.go | 2 +- internal/kv/etcd/embed_etcd_kv_test.go | 2 +- internal/kv/etcd/embed_etcd_restart_test.go | 2 +- internal/kv/etcd/etcd_kv_test.go | 2 +- internal/proxy/proxy.go | 2 +- internal/querycoord/query_coord.go | 2 +- internal/querynode/query_node.go | 2 +- internal/rootcoord/root_coord.go | 2 +- internal/util/etcd/etcd_util_test.go | 2 +- internal/util/paramtable/base_table.go | 26 -- .../{global_param.go => component_param.go} | 408 +++++------------- ..._param_test.go => component_param_test.go} | 107 +---- internal/util/paramtable/grpc_param.go | 211 +++++++++ internal/util/paramtable/grpc_param_test.go | 81 ++++ .../{base_param.go => service_param.go} | 9 +- ...se_param_test.go => service_param_test.go} | 14 +- 24 files changed, 453 insertions(+), 437 deletions(-) rename internal/util/paramtable/{global_param.go => component_param.go} (61%) rename internal/util/paramtable/{global_param_test.go => component_param_test.go} (73%) create mode 100644 internal/util/paramtable/grpc_param.go create mode 100644 internal/util/paramtable/grpc_param_test.go rename internal/util/paramtable/{base_param.go => service_param.go} (96%) rename internal/util/paramtable/{base_param_test.go => service_param_test.go} (91%) diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index e5856e1aa4..61b1dcb883 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -52,7 +52,7 @@ import ( "github.com/milvus-io/milvus/internal/util/typeutil" ) -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam func newMsgFactory(localMsg bool) msgstream.Factory { if localMsg { diff --git a/internal/allocator/global_id_test.go b/internal/allocator/global_id_test.go index a5afa01d7d..4ddeb80df8 100644 --- a/internal/allocator/global_id_test.go +++ b/internal/allocator/global_id_test.go @@ -27,7 +27,7 @@ import ( var gTestIDAllocator *GlobalIDAllocator -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam func TestGlobalTSOAllocator_All(t *testing.T) { Params.Init() diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index fdeb6783f2..c9f96d8c18 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -89,7 +89,7 @@ type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdCli // makes sure Server implements `DataCoord` var _ types.DataCoord = (*Server)(nil) -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam // Server implements `types.Datacoord` // handles Data Cooridinator related jobs diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index aee6aad567..3e44dd4290 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -78,7 +78,7 @@ const illegalRequestErrStr = "Illegal request" var _ types.DataNode = (*DataNode)(nil) // Params from config.yaml -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam // DataNode communicates with outside services and unioun all // services in datanode package. diff --git a/internal/distributed/indexnode/client/client_test.go b/internal/distributed/indexnode/client/client_test.go index 7204dc54fd..f478842c40 100644 --- a/internal/distributed/indexnode/client/client_test.go +++ b/internal/distributed/indexnode/client/client_test.go @@ -35,7 +35,7 @@ import ( "google.golang.org/grpc" ) -var ParamsGlobal paramtable.GlobalParamTable +var ParamsGlobal paramtable.ComponentParam func Test_NewClient(t *testing.T) { ClientParams.InitOnce(typeutil.IndexNodeRole) diff --git a/internal/distributed/indexnode/service_test.go b/internal/distributed/indexnode/service_test.go index 346776ca29..8752b831d1 100644 --- a/internal/distributed/indexnode/service_test.go +++ b/internal/distributed/indexnode/service_test.go @@ -30,7 +30,7 @@ import ( "github.com/stretchr/testify/assert" ) -var ParamsGlobal paramtable.GlobalParamTable +var ParamsGlobal paramtable.ComponentParam func TestIndexNodeServer(t *testing.T) { ctx := context.Background() diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 8825860772..1b64af275d 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -60,7 +60,7 @@ const ( indexSizeFactor = 6 ) -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam // IndexCoord is a component responsible for scheduling index construction tasks and maintaining index status. // IndexCoord accepts requests from rootcoord to build indexes, delete indexes, and query index information. diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 7986643c13..3db52ab06b 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -68,7 +68,7 @@ var _ types.IndexNode = (*IndexNode)(nil) var _ types.IndexNodeComponent = (*IndexNode)(nil) // Params is a GlobalParamTable singleton of indexnode -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam // IndexNode is a component that executes the task of building indexes. type IndexNode struct { diff --git a/internal/kv/etcd/embed_etcd_config_test.go b/internal/kv/etcd/embed_etcd_config_test.go index 9d4033314b..3bfd8797ed 100644 --- a/internal/kv/etcd/embed_etcd_config_test.go +++ b/internal/kv/etcd/embed_etcd_config_test.go @@ -30,7 +30,7 @@ import ( func TestEtcdConfigLoad(te *testing.T) { os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode) - param := new(paramtable.BaseParamTable) + param := new(paramtable.ServiceParam) param.Init() param.BaseTable.Save("etcd.use.embed", "true") // TODO, not sure if the relative path works for ci environment diff --git a/internal/kv/etcd/embed_etcd_kv_test.go b/internal/kv/etcd/embed_etcd_kv_test.go index e0ea7d12db..2aae7c399e 100644 --- a/internal/kv/etcd/embed_etcd_kv_test.go +++ b/internal/kv/etcd/embed_etcd_kv_test.go @@ -31,7 +31,7 @@ import ( func TestEmbedEtcd(te *testing.T) { os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode) - param := new(paramtable.BaseParamTable) + param := new(paramtable.ServiceParam) param.Init() param.BaseTable.Save("etcd.use.embed", "true") param.BaseTable.Save("etcd.config.path", "../../../configs/advanced/etcd.yaml") diff --git a/internal/kv/etcd/embed_etcd_restart_test.go b/internal/kv/etcd/embed_etcd_restart_test.go index 5be728dbc3..fcec498455 100644 --- a/internal/kv/etcd/embed_etcd_restart_test.go +++ b/internal/kv/etcd/embed_etcd_restart_test.go @@ -30,7 +30,7 @@ import ( func TestEtcdRestartLoad(te *testing.T) { os.Setenv(metricsinfo.DeployModeEnvKey, metricsinfo.StandaloneDeployMode) - param := new(paramtable.BaseParamTable) + param := new(paramtable.ServiceParam) param.Init() param.BaseTable.Save("etcd.use.embed", "true") // TODO, not sure if the relative path works for ci environment diff --git a/internal/kv/etcd/etcd_kv_test.go b/internal/kv/etcd/etcd_kv_test.go index 2b5937fe3d..1f55581eac 100644 --- a/internal/kv/etcd/etcd_kv_test.go +++ b/internal/kv/etcd/etcd_kv_test.go @@ -29,7 +29,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam func TestMain(m *testing.M) { Params.Init() diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 2d7b5d81d6..04a46e5a99 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -54,7 +54,7 @@ type Timestamp = typeutil.Timestamp // make sure Proxy implements types.Proxy var _ types.Proxy = (*Proxy)(nil) -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam // Proxy of milvus type Proxy struct { diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 0741733e4d..f5cce1eb2d 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -64,7 +64,7 @@ type queryChannelInfo struct { } // Params is param table of query coordinator -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam // QueryCoord is the coordinator of queryNodes type QueryCoord struct { diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index b4dd066f10..1e92175215 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -65,7 +65,7 @@ var _ types.QueryNode = (*QueryNode)(nil) // make sure QueryNode implements types.QueryNodeComponent var _ types.QueryNodeComponent = (*QueryNode)(nil) -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam // QueryNode communicates with outside services and union all // services in querynode package. diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index f62b6e2705..e873115b32 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -79,7 +79,7 @@ func metricProxy(v int64) string { return fmt.Sprintf("client_%d", v) } -var Params paramtable.GlobalParamTable +var Params paramtable.ComponentParam // Core root coordinator core type Core struct { diff --git a/internal/util/etcd/etcd_util_test.go b/internal/util/etcd/etcd_util_test.go index 4671431d52..9162cb5d38 100644 --- a/internal/util/etcd/etcd_util_test.go +++ b/internal/util/etcd/etcd_util_test.go @@ -26,7 +26,7 @@ import ( "github.com/stretchr/testify/assert" ) -var Params paramtable.BaseParamTable +var Params paramtable.ServiceParam func TestEtcd(t *testing.T) { Params.Init() diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index 9d40bb1a8e..5fb6adb4da 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -64,10 +64,6 @@ func (gp *BaseTable) Init() { gp.configDir = gp.initConfPath() log.Debug("config directory", zap.String("configDir", gp.configDir)) - gp.loadFromCommonYaml() - - gp.loadFromComponentYaml() - gp.loadFromMilvusYaml() gp.tryloadFromEnv() @@ -115,28 +111,6 @@ func (gp *BaseTable) loadFromMilvusYaml() { } } -func (gp *BaseTable) loadFromComponentYaml() bool { - configFile := gp.configDir + "advanced/component.yaml" - if _, err := os.Stat(configFile); err == nil { - if err := gp.LoadYaml("advanced/component.yaml"); err != nil { - panic(err) - } - return true - } - return false -} - -func (gp *BaseTable) loadFromCommonYaml() bool { - configFile := gp.configDir + "advanced/common.yaml" - if _, err := os.Stat(configFile); err == nil { - if err := gp.LoadYaml("advanced/common.yaml"); err != nil { - panic(err) - } - return true - } - return false -} - func (gp *BaseTable) tryloadFromEnv() { var err error minioAddress := os.Getenv("MINIO_ADDRESS") diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/component_param.go similarity index 61% rename from internal/util/paramtable/global_param.go rename to internal/util/paramtable/component_param.go index ebfa4848c2..74574af39f 100644 --- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/component_param.go @@ -20,35 +20,18 @@ import ( "sync" "time" - "github.com/go-basic/ipv4" "github.com/milvus-io/milvus/internal/log" "go.uber.org/zap" ) const ( - // DefaultServerMaxSendSize defines the maximum size of data per grpc request can send by server side. - DefaultServerMaxSendSize = math.MaxInt32 - - // DefaultServerMaxRecvSize defines the maximum size of data per grpc request can receive by server side. - DefaultServerMaxRecvSize = math.MaxInt32 - - // DefaultClientMaxSendSize defines the maximum size of data per grpc request can send by client side. - DefaultClientMaxSendSize = 100 * 1024 * 1024 - - // DefaultClientMaxRecvSize defines the maximum size of data per grpc request can receive by client side. - DefaultClientMaxRecvSize = 100 * 1024 * 1024 - - // SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message. - SuggestPulsarMaxMessageSize = 5 * 1024 * 1024 - // DefaultRetentionDuration defines the default duration for retention which is 5 days in seconds. DefaultRetentionDuration = 3600 * 24 * 5 ) -// GlobalParamTable is a derived struct of BaseParamTable. -// It is used to quickly and easily access global system configuration. -type GlobalParamTable struct { - BaseParamTable +// ComponentParam is used to quickly and easily access all system configuration. +type ComponentParam struct { + ServiceParam once sync.Once CommonCfg commonConfig @@ -66,32 +49,32 @@ type GlobalParamTable struct { } // InitOnce initialize once -func (p *GlobalParamTable) InitOnce() { +func (p *ComponentParam) InitOnce() { p.once.Do(func() { p.Init() }) } // Init initialize the global param table -func (p *GlobalParamTable) Init() { - p.BaseParamTable.Init() +func (p *ComponentParam) Init() { + p.ServiceParam.Init() - p.CommonCfg.init(&p.BaseParamTable) - p.KnowhereCfg.init(&p.BaseParamTable) - p.MsgChannelCfg.init(&p.BaseParamTable) + p.CommonCfg.init(&p.BaseTable) + p.KnowhereCfg.init(&p.BaseTable) + p.MsgChannelCfg.init(&p.BaseTable) - p.RootCoordCfg.init(&p.BaseParamTable) - p.ProxyCfg.init(&p.BaseParamTable) - p.QueryCoordCfg.init(&p.BaseParamTable) - p.QueryNodeCfg.init(&p.BaseParamTable) - p.DataCoordCfg.init(&p.BaseParamTable) - p.DataNodeCfg.init(&p.BaseParamTable) - p.IndexCoordCfg.init(&p.BaseParamTable) - p.IndexNodeCfg.init(&p.BaseParamTable) + p.RootCoordCfg.init(&p.BaseTable) + p.ProxyCfg.init(&p.BaseTable) + p.QueryCoordCfg.init(&p.BaseTable) + p.QueryNodeCfg.init(&p.BaseTable) + p.DataCoordCfg.init(&p.BaseTable) + p.DataNodeCfg.init(&p.BaseTable) + p.IndexCoordCfg.init(&p.BaseTable) + p.IndexNodeCfg.init(&p.BaseTable) } // SetLogConfig set log config with given role -func (p *GlobalParamTable) SetLogConfig(role string) { +func (p *ComponentParam) SetLogConfig(role string) { p.BaseTable.RoleName = role p.BaseTable.SetLogConfig() } @@ -99,15 +82,15 @@ func (p *GlobalParamTable) SetLogConfig(role string) { /////////////////////////////////////////////////////////////////////////////// // --- common --- type commonConfig struct { - BaseParams *BaseParamTable + Base *BaseTable DefaultPartitionName string DefaultIndexName string RetentionDuration int64 } -func (p *commonConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *commonConfig) init(base *BaseTable) { + p.Base = base p.initDefaultPartitionName() p.initDefaultIndexName() @@ -115,39 +98,39 @@ func (p *commonConfig) init(bp *BaseParamTable) { } func (p *commonConfig) initDefaultPartitionName() { - p.DefaultPartitionName = p.BaseParams.LoadWithDefault("common.defaultPartitionName", "_default") + p.DefaultPartitionName = p.Base.LoadWithDefault("common.defaultPartitionName", "_default") } func (p *commonConfig) initDefaultIndexName() { - p.DefaultIndexName = p.BaseParams.LoadWithDefault("common.defaultIndexName", "_default_idx") + p.DefaultIndexName = p.Base.LoadWithDefault("common.defaultIndexName", "_default_idx") } func (p *commonConfig) initRetentionDuration() { - p.RetentionDuration = p.BaseParams.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration) + p.RetentionDuration = p.Base.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration) } /////////////////////////////////////////////////////////////////////////////// // --- knowhere --- type knowhereConfig struct { - BaseParams *BaseParamTable + Base *BaseTable SimdType string } -func (p *knowhereConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *knowhereConfig) init(base *BaseTable) { + p.Base = base p.initSimdType() } func (p *knowhereConfig) initSimdType() { - p.SimdType = p.BaseParams.LoadWithDefault("knowhere.simdType", "auto") + p.SimdType = p.Base.LoadWithDefault("knowhere.simdType", "auto") } /////////////////////////////////////////////////////////////////////////////// // --- msgChannel --- type msgChannelConfig struct { - BaseParams *BaseParamTable + Base *BaseTable ClusterPrefix string @@ -168,8 +151,8 @@ type msgChannelConfig struct { DataCoordSubName string } -func (p *msgChannelConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *msgChannelConfig) init(base *BaseTable) { + p.Base = base // must init cluster prefix first p.initClusterPrefix() @@ -192,7 +175,7 @@ func (p *msgChannelConfig) init(bp *BaseParamTable) { } func (p *msgChannelConfig) initClusterPrefix() { - str, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") + str, err := p.Base.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } @@ -200,7 +183,7 @@ func (p *msgChannelConfig) initClusterPrefix() { } func (p *msgChannelConfig) initChanNamePrefix(cfg string) string { - value, err := p.BaseParams.Load(cfg) + value, err := p.Base.Load(cfg) if err != nil { panic(err) } @@ -267,7 +250,7 @@ func (p *msgChannelConfig) initDataCoordSubName() { /////////////////////////////////////////////////////////////////////////////// // --- rootcoord --- type rootCoordConfig struct { - BaseParams *BaseParamTable + Base *BaseTable Address string Port int @@ -280,8 +263,8 @@ type rootCoordConfig struct { UpdatedTime time.Time } -func (p *rootCoordConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *rootCoordConfig) init(base *BaseTable) { + p.Base = base p.initDmlChannelNum() p.initMaxPartitionNum() @@ -289,21 +272,21 @@ func (p *rootCoordConfig) init(bp *BaseParamTable) { } func (p *rootCoordConfig) initDmlChannelNum() { - p.DmlChannelNum = p.BaseParams.ParseInt64WithDefault("rootCoord.dmlChannelNum", 256) + p.DmlChannelNum = p.Base.ParseInt64WithDefault("rootCoord.dmlChannelNum", 256) } func (p *rootCoordConfig) initMaxPartitionNum() { - p.MaxPartitionNum = p.BaseParams.ParseInt64WithDefault("rootCoord.maxPartitionNum", 4096) + p.MaxPartitionNum = p.Base.ParseInt64WithDefault("rootCoord.maxPartitionNum", 4096) } func (p *rootCoordConfig) initMinSegmentSizeToEnableIndex() { - p.MinSegmentSizeToEnableIndex = p.BaseParams.ParseInt64WithDefault("rootCoord.minSegmentSizeToEnableIndex", 1024) + p.MinSegmentSizeToEnableIndex = p.Base.ParseInt64WithDefault("rootCoord.minSegmentSizeToEnableIndex", 1024) } /////////////////////////////////////////////////////////////////////////////// // --- proxy --- type proxyConfig struct { - BaseParams *BaseParamTable + Base *BaseTable // NetworkPort & IP are not used NetworkPort int @@ -335,8 +318,8 @@ type proxyConfig struct { UpdatedTime time.Time } -func (p *proxyConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *proxyConfig) init(base *BaseTable) { + p.Base = base p.initTimeTickInterval() @@ -364,16 +347,16 @@ func (p *proxyConfig) InitAlias(alias string) { } func (p *proxyConfig) initTimeTickInterval() { - interval := p.BaseParams.ParseIntWithDefault("proxy.timeTickInterval", 200) + interval := p.Base.ParseIntWithDefault("proxy.timeTickInterval", 200) p.TimeTickInterval = time.Duration(interval) * time.Millisecond } func (p *proxyConfig) initProxySubName() { - cluster, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") + cluster, err := p.Base.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } - subname, err := p.BaseParams.Load("msgChannel.subNamePrefix.proxySubNamePrefix") + subname, err := p.Base.Load("msgChannel.subNamePrefix.proxySubNamePrefix") if err != nil { panic(err) } @@ -382,11 +365,11 @@ func (p *proxyConfig) initProxySubName() { } func (p *proxyConfig) initMsgStreamTimeTickBufSize() { - p.MsgStreamTimeTickBufSize = p.BaseParams.ParseInt64WithDefault("proxy.msgStream.timeTick.bufSize", 512) + p.MsgStreamTimeTickBufSize = p.Base.ParseInt64WithDefault("proxy.msgStream.timeTick.bufSize", 512) } func (p *proxyConfig) initMaxNameLength() { - str := p.BaseParams.LoadWithDefault("proxy.maxNameLength", "255") + str := p.Base.LoadWithDefault("proxy.maxNameLength", "255") maxNameLength, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) @@ -395,7 +378,7 @@ func (p *proxyConfig) initMaxNameLength() { } func (p *proxyConfig) initMaxShardNum() { - str := p.BaseParams.LoadWithDefault("proxy.maxShardNum", "256") + str := p.Base.LoadWithDefault("proxy.maxShardNum", "256") maxShardNum, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) @@ -404,7 +387,7 @@ func (p *proxyConfig) initMaxShardNum() { } func (p *proxyConfig) initMaxFieldNum() { - str := p.BaseParams.LoadWithDefault("proxy.maxFieldNum", "64") + str := p.Base.LoadWithDefault("proxy.maxFieldNum", "64") maxFieldNum, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) @@ -413,7 +396,7 @@ func (p *proxyConfig) initMaxFieldNum() { } func (p *proxyConfig) initMaxDimension() { - str := p.BaseParams.LoadWithDefault("proxy.maxDimension", "32768") + str := p.Base.LoadWithDefault("proxy.maxDimension", "32768") maxDimension, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) @@ -422,23 +405,23 @@ func (p *proxyConfig) initMaxDimension() { } func (p *proxyConfig) initMaxTaskNum() { - p.MaxTaskNum = p.BaseParams.ParseInt64WithDefault("proxy.maxTaskNum", 1024) + p.MaxTaskNum = p.Base.ParseInt64WithDefault("proxy.maxTaskNum", 1024) } func (p *proxyConfig) initBufFlagExpireTime() { - expireTime := p.BaseParams.ParseInt64WithDefault("proxy.bufFlagExpireTime", 3600) + expireTime := p.Base.ParseInt64WithDefault("proxy.bufFlagExpireTime", 3600) p.BufFlagExpireTime = time.Duration(expireTime) * time.Second } func (p *proxyConfig) initBufFlagCleanupInterval() { - interval := p.BaseParams.ParseInt64WithDefault("proxy.bufFlagCleanupInterval", 600) + interval := p.Base.ParseInt64WithDefault("proxy.bufFlagCleanupInterval", 600) p.BufFlagCleanupInterval = time.Duration(interval) * time.Second } /////////////////////////////////////////////////////////////////////////////// // --- querycoord --- type queryCoordConfig struct { - BaseParams *BaseParamTable + Base *BaseTable NodeID uint64 @@ -459,8 +442,8 @@ type queryCoordConfig struct { MemoryUsageMaxDifferencePercentage float64 } -func (p *queryCoordConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *queryCoordConfig) init(base *BaseTable) { + p.Base = base //---- Handoff --- p.initAutoHandoff() @@ -473,7 +456,7 @@ func (p *queryCoordConfig) init(bp *BaseParamTable) { } func (p *queryCoordConfig) initAutoHandoff() { - handoff, err := p.BaseParams.Load("queryCoord.autoHandoff") + handoff, err := p.Base.Load("queryCoord.autoHandoff") if err != nil { panic(err) } @@ -484,7 +467,7 @@ func (p *queryCoordConfig) initAutoHandoff() { } func (p *queryCoordConfig) initAutoBalance() { - balanceStr := p.BaseParams.LoadWithDefault("queryCoord.autoBalance", "false") + balanceStr := p.Base.LoadWithDefault("queryCoord.autoBalance", "false") autoBalance, err := strconv.ParseBool(balanceStr) if err != nil { panic(err) @@ -493,7 +476,7 @@ func (p *queryCoordConfig) initAutoBalance() { } func (p *queryCoordConfig) initOverloadedMemoryThresholdPercentage() { - overloadedMemoryThresholdPercentage := p.BaseParams.LoadWithDefault("queryCoord.overloadedMemoryThresholdPercentage", "90") + overloadedMemoryThresholdPercentage := p.Base.LoadWithDefault("queryCoord.overloadedMemoryThresholdPercentage", "90") thresholdPercentage, err := strconv.ParseInt(overloadedMemoryThresholdPercentage, 10, 64) if err != nil { panic(err) @@ -502,7 +485,7 @@ func (p *queryCoordConfig) initOverloadedMemoryThresholdPercentage() { } func (p *queryCoordConfig) initBalanceIntervalSeconds() { - balanceInterval := p.BaseParams.LoadWithDefault("queryCoord.balanceIntervalSeconds", "60") + balanceInterval := p.Base.LoadWithDefault("queryCoord.balanceIntervalSeconds", "60") interval, err := strconv.ParseInt(balanceInterval, 10, 64) if err != nil { panic(err) @@ -511,7 +494,7 @@ func (p *queryCoordConfig) initBalanceIntervalSeconds() { } func (p *queryCoordConfig) initMemoryUsageMaxDifferencePercentage() { - maxDiff := p.BaseParams.LoadWithDefault("queryCoord.memoryUsageMaxDifferencePercentage", "30") + maxDiff := p.Base.LoadWithDefault("queryCoord.memoryUsageMaxDifferencePercentage", "30") diffPercentage, err := strconv.ParseInt(maxDiff, 10, 64) if err != nil { panic(err) @@ -522,7 +505,7 @@ func (p *queryCoordConfig) initMemoryUsageMaxDifferencePercentage() { /////////////////////////////////////////////////////////////////////////////// // --- querynode --- type queryNodeConfig struct { - BaseParams *BaseParamTable + Base *BaseTable Alias string QueryNodeIP string @@ -567,8 +550,8 @@ type queryNodeConfig struct { OverloadedMemoryThresholdPercentage float64 } -func (p *queryNodeConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *queryNodeConfig) init(base *BaseTable) { + p.Base = base p.initCacheSize() p.initGracefulTime() @@ -608,7 +591,7 @@ func (p *queryNodeConfig) initCacheSize() { var err error cacheSize := os.Getenv("CACHE_SIZE") if cacheSize == "" { - cacheSize, err = p.BaseParams.Load("queryNode.cacheSize") + cacheSize, err = p.Base.Load("queryNode.cacheSize") if err != nil { return } @@ -623,38 +606,38 @@ func (p *queryNodeConfig) initCacheSize() { // advanced params // stats func (p *queryNodeConfig) initStatsPublishInterval() { - p.StatsPublishInterval = p.BaseParams.ParseIntWithDefault("queryNode.stats.publishInterval", 1000) + p.StatsPublishInterval = p.Base.ParseIntWithDefault("queryNode.stats.publishInterval", 1000) } // dataSync: func (p *queryNodeConfig) initFlowGraphMaxQueueLength() { - p.FlowGraphMaxQueueLength = p.BaseParams.ParseInt32WithDefault("queryNode.dataSync.flowGraph.maxQueueLength", 1024) + p.FlowGraphMaxQueueLength = p.Base.ParseInt32WithDefault("queryNode.dataSync.flowGraph.maxQueueLength", 1024) } func (p *queryNodeConfig) initFlowGraphMaxParallelism() { - p.FlowGraphMaxParallelism = p.BaseParams.ParseInt32WithDefault("queryNode.dataSync.flowGraph.maxParallelism", 1024) + p.FlowGraphMaxParallelism = p.Base.ParseInt32WithDefault("queryNode.dataSync.flowGraph.maxParallelism", 1024) } // msgStream func (p *queryNodeConfig) initSearchReceiveBufSize() { - p.SearchReceiveBufSize = p.BaseParams.ParseInt64WithDefault("queryNode.msgStream.search.recvBufSize", 512) + p.SearchReceiveBufSize = p.Base.ParseInt64WithDefault("queryNode.msgStream.search.recvBufSize", 512) } func (p *queryNodeConfig) initSearchPulsarBufSize() { - p.SearchPulsarBufSize = p.BaseParams.ParseInt64WithDefault("queryNode.msgStream.search.pulsarBufSize", 512) + p.SearchPulsarBufSize = p.Base.ParseInt64WithDefault("queryNode.msgStream.search.pulsarBufSize", 512) } func (p *queryNodeConfig) initSearchResultReceiveBufSize() { - p.SearchResultReceiveBufSize = p.BaseParams.ParseInt64WithDefault("queryNode.msgStream.searchResult.recvBufSize", 64) + p.SearchResultReceiveBufSize = p.Base.ParseInt64WithDefault("queryNode.msgStream.searchResult.recvBufSize", 64) } // ------------------------ channel names func (p *queryNodeConfig) initQueryNodeSubName() { - cluster, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") + cluster, err := p.Base.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } - subname, err := p.BaseParams.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") + subname, err := p.Base.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix") if err != nil { log.Warn(err.Error()) } @@ -664,16 +647,16 @@ func (p *queryNodeConfig) initQueryNodeSubName() { } func (p *queryNodeConfig) initGracefulTime() { - p.GracefulTime = p.BaseParams.ParseInt64("queryNode.gracefulTime") + p.GracefulTime = p.Base.ParseInt64("queryNode.gracefulTime") log.Debug("query node init gracefulTime", zap.Any("gracefulTime", p.GracefulTime)) } func (p *queryNodeConfig) initSegcoreChunkRows() { - p.ChunkRows = p.BaseParams.ParseInt64WithDefault("queryNode.segcore.chunkRows", 32768) + p.ChunkRows = p.Base.ParseInt64WithDefault("queryNode.segcore.chunkRows", 32768) } func (p *queryNodeConfig) initOverloadedMemoryThresholdPercentage() { - overloadedMemoryThresholdPercentage := p.BaseParams.LoadWithDefault("queryCoord.overloadedMemoryThresholdPercentage", "90") + overloadedMemoryThresholdPercentage := p.Base.LoadWithDefault("queryCoord.overloadedMemoryThresholdPercentage", "90") thresholdPercentage, err := strconv.ParseInt(overloadedMemoryThresholdPercentage, 10, 64) if err != nil { panic(err) @@ -684,7 +667,7 @@ func (p *queryNodeConfig) initOverloadedMemoryThresholdPercentage() { /////////////////////////////////////////////////////////////////////////////// // --- datacoord --- type dataCoordConfig struct { - BaseParams *BaseParamTable + Base *BaseTable NodeID int64 @@ -716,8 +699,8 @@ type dataCoordConfig struct { GCDropTolerance time.Duration } -func (p *dataCoordConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *dataCoordConfig) init(base *BaseTable) { + p.Base = base p.initChannelWatchPrefix() @@ -736,15 +719,15 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) { } func (p *dataCoordConfig) initSegmentMaxSize() { - p.SegmentMaxSize = p.BaseParams.ParseFloatWithDefault("dataCoord.segment.maxSize", 512.0) + p.SegmentMaxSize = p.Base.ParseFloatWithDefault("dataCoord.segment.maxSize", 512.0) } func (p *dataCoordConfig) initSegmentSealProportion() { - p.SegmentSealProportion = p.BaseParams.ParseFloatWithDefault("dataCoord.segment.sealProportion", 0.75) + p.SegmentSealProportion = p.Base.ParseFloatWithDefault("dataCoord.segment.sealProportion", 0.75) } func (p *dataCoordConfig) initSegAssignmentExpiration() { - p.SegAssignmentExpiration = p.BaseParams.ParseInt64WithDefault("dataCoord.segment.assignmentExpiration", 2000) + p.SegAssignmentExpiration = p.Base.ParseInt64WithDefault("dataCoord.segment.assignmentExpiration", 2000) } func (p *dataCoordConfig) initChannelWatchPrefix() { @@ -754,32 +737,32 @@ func (p *dataCoordConfig) initChannelWatchPrefix() { } func (p *dataCoordConfig) initEnableCompaction() { - p.EnableCompaction = p.BaseParams.ParseBool("dataCoord.enableCompaction", false) + p.EnableCompaction = p.Base.ParseBool("dataCoord.enableCompaction", false) } // -- GC -- func (p *dataCoordConfig) initEnableGarbageCollection() { - p.EnableGarbageCollection = p.BaseParams.ParseBool("dataCoord.enableGarbageCollection", false) + p.EnableGarbageCollection = p.Base.ParseBool("dataCoord.enableGarbageCollection", false) } func (p *dataCoordConfig) initGCInterval() { - p.GCInterval = time.Duration(p.BaseParams.ParseInt64WithDefault("dataCoord.gc.interval", 60*60)) * time.Second + p.GCInterval = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.gc.interval", 60*60)) * time.Second } func (p *dataCoordConfig) initGCMissingTolerance() { - p.GCMissingTolerance = time.Duration(p.BaseParams.ParseInt64WithDefault("dataCoord.gc.missingTolerance", 24*60*60)) * time.Second + p.GCMissingTolerance = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.gc.missingTolerance", 24*60*60)) * time.Second } func (p *dataCoordConfig) initGCDropTolerance() { - p.GCDropTolerance = time.Duration(p.BaseParams.ParseInt64WithDefault("dataCoord.gc.dropTolerance", 24*60*60)) * time.Second + p.GCDropTolerance = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.gc.dropTolerance", 24*60*60)) * time.Second } func (p *dataCoordConfig) initEnableAutoCompaction() { - p.EnableAutoCompaction = p.BaseParams.ParseBool("dataCoord.compaction.enableAutoCompaction", false) + p.EnableAutoCompaction = p.Base.ParseBool("dataCoord.compaction.enableAutoCompaction", false) } func (p *dataCoordConfig) initCompactionEntityExpiration() { - p.CompactionEntityExpiration = p.BaseParams.ParseInt64WithDefault("dataCoord.compaction.entityExpiration", math.MaxInt64) + p.CompactionEntityExpiration = p.Base.ParseInt64WithDefault("dataCoord.compaction.entityExpiration", math.MaxInt64) p.CompactionEntityExpiration = func(x, y int64) int64 { if x > y { return x @@ -791,7 +774,7 @@ func (p *dataCoordConfig) initCompactionEntityExpiration() { /////////////////////////////////////////////////////////////////////////////// // --- datanode --- type dataNodeConfig struct { - BaseParams *BaseParamTable + Base *BaseTable // ID of the current DataNode NodeID UniqueID @@ -819,8 +802,8 @@ type dataNodeConfig struct { UpdatedTime time.Time } -func (p *dataNodeConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *dataNodeConfig) init(base *BaseTable) { + p.Base = base p.initFlowGraphMaxQueueLength() p.initFlowGraphMaxParallelism() @@ -844,20 +827,20 @@ func (p *dataNodeConfig) InitAlias(alias string) { } func (p *dataNodeConfig) initFlowGraphMaxQueueLength() { - p.FlowGraphMaxQueueLength = p.BaseParams.ParseInt32WithDefault("dataNode.dataSync.flowGraph.maxQueueLength", 1024) + p.FlowGraphMaxQueueLength = p.Base.ParseInt32WithDefault("dataNode.dataSync.flowGraph.maxQueueLength", 1024) } func (p *dataNodeConfig) initFlowGraphMaxParallelism() { - p.FlowGraphMaxParallelism = p.BaseParams.ParseInt32WithDefault("dataNode.dataSync.flowGraph.maxParallelism", 1024) + p.FlowGraphMaxParallelism = p.Base.ParseInt32WithDefault("dataNode.dataSync.flowGraph.maxParallelism", 1024) } func (p *dataNodeConfig) initFlushInsertBufferSize() { - p.FlushInsertBufferSize = p.BaseParams.ParseInt64("_DATANODE_INSERTBUFSIZE") + p.FlushInsertBufferSize = p.Base.ParseInt64("_DATANODE_INSERTBUFSIZE") } func (p *dataNodeConfig) initInsertBinlogRootPath() { // GOOSE TODO: rootPath change to TenentID - rootPath, err := p.BaseParams.Load("minio.rootPath") + rootPath, err := p.Base.Load("minio.rootPath") if err != nil { panic(err) } @@ -865,7 +848,7 @@ func (p *dataNodeConfig) initInsertBinlogRootPath() { } func (p *dataNodeConfig) initStatsBinlogRootPath() { - rootPath, err := p.BaseParams.Load("minio.rootPath") + rootPath, err := p.Base.Load("minio.rootPath") if err != nil { panic(err) } @@ -873,7 +856,7 @@ func (p *dataNodeConfig) initStatsBinlogRootPath() { } func (p *dataNodeConfig) initDeleteBinlogRootPath() { - rootPath, err := p.BaseParams.Load("minio.rootPath") + rootPath, err := p.Base.Load("minio.rootPath") if err != nil { panic(err) } @@ -881,11 +864,11 @@ func (p *dataNodeConfig) initDeleteBinlogRootPath() { } func (p *dataNodeConfig) initDataNodeSubName() { - cluster, err := p.BaseParams.Load("msgChannel.chanNamePrefix.cluster") + cluster, err := p.Base.Load("msgChannel.chanNamePrefix.cluster") if err != nil { panic(err) } - subname, err := p.BaseParams.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix") + subname, err := p.Base.Load("msgChannel.subNamePrefix.dataNodeSubNamePrefix") if err != nil { panic(err) } @@ -900,7 +883,7 @@ func (p *dataNodeConfig) initChannelWatchPath() { /////////////////////////////////////////////////////////////////////////////// // --- indexcoord --- type indexCoordConfig struct { - BaseParams *BaseParamTable + Base *BaseTable Address string Port int @@ -911,15 +894,15 @@ type indexCoordConfig struct { UpdatedTime time.Time } -func (p *indexCoordConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *indexCoordConfig) init(base *BaseTable) { + p.Base = base p.initIndexStorageRootPath() } // initIndexStorageRootPath initializes the root path of index files. func (p *indexCoordConfig) initIndexStorageRootPath() { - rootPath, err := p.BaseParams.Load("minio.rootPath") + rootPath, err := p.Base.Load("minio.rootPath") if err != nil { panic(err) } @@ -929,7 +912,7 @@ func (p *indexCoordConfig) initIndexStorageRootPath() { /////////////////////////////////////////////////////////////////////////////// // --- indexnode --- type indexNodeConfig struct { - BaseParams *BaseParamTable + Base *BaseTable IP string Address string @@ -944,8 +927,8 @@ type indexNodeConfig struct { UpdatedTime time.Time } -func (p *indexNodeConfig) init(bp *BaseParamTable) { - p.BaseParams = bp +func (p *indexNodeConfig) init(base *BaseTable) { + p.Base = base p.initIndexStorageRootPath() } @@ -956,184 +939,9 @@ func (p *indexNodeConfig) InitAlias(alias string) { } func (p *indexNodeConfig) initIndexStorageRootPath() { - rootPath, err := p.BaseParams.Load("minio.rootPath") + rootPath, err := p.Base.Load("minio.rootPath") if err != nil { panic(err) } p.IndexStorageRootPath = path.Join(rootPath, "index_files") } - -/////////////////////////////////////////////////////////////////////////////// -// --- grpc --- -type grpcConfig struct { - BaseParamTable - - once sync.Once - Domain string - IP string - Port int -} - -func (p *grpcConfig) init(domain string) { - p.BaseParamTable.Init() - p.Domain = domain - - p.LoadFromEnv() - p.LoadFromArgs() - p.initPort() -} - -// LoadFromEnv is used to initialize configuration items from env. -func (p *grpcConfig) LoadFromEnv() { - p.IP = ipv4.LocalIP() -} - -// LoadFromArgs is used to initialize configuration items from args. -func (p *grpcConfig) LoadFromArgs() { - -} - -func (p *grpcConfig) initPort() { - p.Port = p.ParseInt(p.Domain + ".port") -} - -// GetAddress return grpc address -func (p *grpcConfig) GetAddress() string { - return p.IP + ":" + strconv.Itoa(p.Port) -} - -// GrpcServerConfig is configuration for grpc server. -type GrpcServerConfig struct { - grpcConfig - - ServerMaxSendSize int - ServerMaxRecvSize int -} - -// InitOnce initialize grpc server config once -func (p *GrpcServerConfig) InitOnce(domain string) { - p.once.Do(func() { - p.init(domain) - }) -} - -func (p *GrpcServerConfig) init(domain string) { - p.grpcConfig.init(domain) - - p.initServerMaxSendSize() - p.initServerMaxRecvSize() -} - -func (p *GrpcServerConfig) initServerMaxSendSize() { - var err error - - valueStr, err := p.Load(p.Domain + ".grpc.serverMaxSendSize") - if err != nil { - p.ServerMaxSendSize = DefaultServerMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { - log.Warn("Failed to parse grpc.serverMaxSendSize, set to default", - zap.String("rol", p.Domain), zap.String("grpc.serverMaxSendSize", valueStr), - zap.Error(err)) - - p.ServerMaxSendSize = DefaultServerMaxSendSize - } else { - p.ServerMaxSendSize = value - } - - log.Debug("initServerMaxSendSize", - zap.String("role", p.Domain), zap.Int("grpc.serverMaxSendSize", p.ServerMaxSendSize)) -} - -func (p *GrpcServerConfig) initServerMaxRecvSize() { - var err error - - valueStr, err := p.Load(p.Domain + ".grpc.serverMaxRecvSize") - if err != nil { - p.ServerMaxRecvSize = DefaultServerMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { - log.Warn("Failed to parse grpc.serverMaxRecvSize, set to default", - zap.String("role", p.Domain), zap.String("grpc.serverMaxRecvSize", valueStr), - zap.Error(err)) - - p.ServerMaxRecvSize = DefaultServerMaxRecvSize - } else { - p.ServerMaxRecvSize = value - } - - log.Debug("initServerMaxRecvSize", - zap.String("role", p.Domain), zap.Int("grpc.serverMaxRecvSize", p.ServerMaxRecvSize)) -} - -// GrpcClientConfig is configuration for grpc client. -type GrpcClientConfig struct { - grpcConfig - - ClientMaxSendSize int - ClientMaxRecvSize int -} - -// InitOnce initialize grpc client config once -func (p *GrpcClientConfig) InitOnce(domain string) { - p.once.Do(func() { - p.init(domain) - }) -} - -func (p *GrpcClientConfig) init(domain string) { - p.grpcConfig.init(domain) - - p.initClientMaxSendSize() - p.initClientMaxRecvSize() -} - -func (p *GrpcClientConfig) initClientMaxSendSize() { - var err error - - valueStr, err := p.Load(p.Domain + ".grpc.clientMaxSendSize") - if err != nil { - p.ClientMaxSendSize = DefaultClientMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { - log.Warn("Failed to parse grpc.clientMaxSendSize, set to default", - zap.String("role", p.Domain), zap.String("grpc.clientMaxSendSize", valueStr), - zap.Error(err)) - - p.ClientMaxSendSize = DefaultClientMaxSendSize - } else { - p.ClientMaxSendSize = value - } - - log.Debug("initClientMaxSendSize", - zap.String("role", p.Domain), zap.Int("grpc.clientMaxSendSize", p.ClientMaxSendSize)) -} - -func (p *GrpcClientConfig) initClientMaxRecvSize() { - var err error - - valueStr, err := p.Load(p.Domain + ".grpc.clientMaxRecvSize") - if err != nil { - p.ClientMaxRecvSize = DefaultClientMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { - log.Warn("Failed to parse grpc.clientMaxRecvSize, set to default", - zap.String("role", p.Domain), zap.String("grpc.clientMaxRecvSize", valueStr), - zap.Error(err)) - - p.ClientMaxRecvSize = DefaultClientMaxRecvSize - } else { - p.ClientMaxRecvSize = value - } - - log.Debug("initClientMaxRecvSize", - zap.String("role", p.Domain), zap.Int("grpc.clientMaxRecvSize", p.ClientMaxRecvSize)) -} diff --git a/internal/util/paramtable/global_param_test.go b/internal/util/paramtable/component_param_test.go similarity index 73% rename from internal/util/paramtable/global_param_test.go rename to internal/util/paramtable/component_param_test.go index c34cae40e8..ec156e3975 100644 --- a/internal/util/paramtable/global_param_test.go +++ b/internal/util/paramtable/component_param_test.go @@ -18,7 +18,6 @@ import ( "testing" "time" - "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/assert" ) @@ -28,12 +27,12 @@ func shouldPanic(t *testing.T, name string, f func()) { t.Errorf("%s should have panicked", name) } -func TestGlobalParamTable(t *testing.T) { - var GlobalParams GlobalParamTable - GlobalParams.Init() +func TestComponentParam(t *testing.T) { + var CParams ComponentParam + CParams.Init() t.Run("test commonConfig", func(t *testing.T) { - Params := GlobalParams.CommonCfg + Params := CParams.CommonCfg assert.NotEqual(t, Params.DefaultPartitionName, "") t.Logf("default partition name = %s", Params.DefaultPartitionName) @@ -45,14 +44,14 @@ func TestGlobalParamTable(t *testing.T) { }) t.Run("test knowhereConfig", func(t *testing.T) { - Params := GlobalParams.KnowhereCfg + Params := CParams.KnowhereCfg assert.NotEqual(t, Params.SimdType, "") t.Logf("knowhere simd type = %s", Params.SimdType) }) t.Run("test knowhereConfig", func(t *testing.T) { - Params := GlobalParams.MsgChannelCfg + Params := CParams.MsgChannelCfg // -- rootcoord -- assert.Equal(t, Params.RootCoordTimeTick, "by-dev-rootcoord-timetick") @@ -96,7 +95,7 @@ func TestGlobalParamTable(t *testing.T) { }) t.Run("test rootCoordConfig", func(t *testing.T) { - Params := GlobalParams.RootCoordCfg + Params := CParams.RootCoordCfg assert.NotEqual(t, Params.MaxPartitionNum, 0) t.Logf("master MaxPartitionNum = %d", Params.MaxPartitionNum) @@ -111,7 +110,7 @@ func TestGlobalParamTable(t *testing.T) { }) t.Run("test proxyConfig", func(t *testing.T) { - Params := GlobalParams.ProxyCfg + Params := CParams.ProxyCfg t.Logf("TimeTickInterval: %v", Params.TimeTickInterval) @@ -132,50 +131,50 @@ func TestGlobalParamTable(t *testing.T) { }) t.Run("test proxyConfig panic", func(t *testing.T) { - Params := GlobalParams.ProxyCfg + Params := CParams.ProxyCfg shouldPanic(t, "proxy.timeTickInterval", func() { - Params.BaseParams.Save("proxy.timeTickInterval", "") + Params.Base.Save("proxy.timeTickInterval", "") Params.initTimeTickInterval() }) shouldPanic(t, "proxy.msgStream.timeTick.bufSize", func() { - Params.BaseParams.Save("proxy.msgStream.timeTick.bufSize", "abc") + Params.Base.Save("proxy.msgStream.timeTick.bufSize", "abc") Params.initMsgStreamTimeTickBufSize() }) shouldPanic(t, "proxy.maxNameLength", func() { - Params.BaseParams.Save("proxy.maxNameLength", "abc") + Params.Base.Save("proxy.maxNameLength", "abc") Params.initMaxNameLength() }) shouldPanic(t, "proxy.maxFieldNum", func() { - Params.BaseParams.Save("proxy.maxFieldNum", "abc") + Params.Base.Save("proxy.maxFieldNum", "abc") Params.initMaxFieldNum() }) shouldPanic(t, "proxy.maxShardNum", func() { - Params.BaseParams.Save("proxy.maxShardNum", "abc") + Params.Base.Save("proxy.maxShardNum", "abc") Params.initMaxShardNum() }) shouldPanic(t, "proxy.maxDimension", func() { - Params.BaseParams.Save("proxy.maxDimension", "-asdf") + Params.Base.Save("proxy.maxDimension", "-asdf") Params.initMaxDimension() }) shouldPanic(t, "proxy.maxTaskNum", func() { - Params.BaseParams.Save("proxy.maxTaskNum", "-asdf") + Params.Base.Save("proxy.maxTaskNum", "-asdf") Params.initMaxTaskNum() }) }) t.Run("test queryCoordConfig", func(t *testing.T) { - //Params := GlobalParams.QueryCoordCfg + //Params := CParams.QueryCoordCfg }) t.Run("test queryNodeConfig", func(t *testing.T) { - Params := GlobalParams.QueryNodeCfg + Params := CParams.QueryNodeCfg cacheSize := Params.CacheSize assert.Equal(t, int64(32), cacheSize) @@ -213,11 +212,11 @@ func TestGlobalParamTable(t *testing.T) { }) t.Run("test dataCoordConfig", func(t *testing.T) { - //Params := GlobalParams.DataCoordCfg + //Params := CParams.DataCoordCfg }) t.Run("test dataNodeConfig", func(t *testing.T) { - Params := GlobalParams.DataNodeCfg + Params := CParams.DataNodeCfg Params.NodeID = 2 Params.Refresh() @@ -256,7 +255,7 @@ func TestGlobalParamTable(t *testing.T) { }) t.Run("test indexCoordConfig", func(t *testing.T) { - Params := GlobalParams.IndexCoordCfg + Params := CParams.IndexCoordCfg t.Logf("Address: %v", Params.Address) @@ -272,7 +271,7 @@ func TestGlobalParamTable(t *testing.T) { }) t.Run("test indexNodeConfig", func(t *testing.T) { - Params := GlobalParams.IndexNodeCfg + Params := CParams.IndexNodeCfg t.Logf("IP: %v", Params.IP) @@ -293,65 +292,3 @@ func TestGlobalParamTable(t *testing.T) { t.Logf("IndexStorageRootPath: %v", Params.IndexStorageRootPath) }) } - -func TestGrpcServerParams(t *testing.T) { - role := typeutil.DataNodeRole - var Params GrpcServerConfig - Params.InitOnce(role) - - assert.Equal(t, Params.Domain, role) - t.Logf("Domain = %s", Params.Domain) - - assert.NotEqual(t, Params.IP, "") - t.Logf("IP = %s", Params.IP) - - assert.NotZero(t, Params.Port) - t.Logf("Port = %d", Params.Port) - - t.Logf("Address = %s", Params.GetAddress()) - - assert.NotZero(t, Params.ServerMaxRecvSize) - t.Logf("ServerMaxRecvSize = %d", Params.ServerMaxRecvSize) - - Params.Remove(role + ".grpc.serverMaxRecvSize") - Params.initServerMaxRecvSize() - assert.Equal(t, Params.ServerMaxRecvSize, DefaultServerMaxRecvSize) - - assert.NotZero(t, Params.ServerMaxSendSize) - t.Logf("ServerMaxSendSize = %d", Params.ServerMaxSendSize) - - Params.Remove(role + ".grpc.serverMaxSendSize") - Params.initServerMaxSendSize() - assert.Equal(t, Params.ServerMaxSendSize, DefaultServerMaxSendSize) -} - -func TestGrpcClientParams(t *testing.T) { - role := typeutil.DataNodeRole - var Params GrpcClientConfig - Params.InitOnce(role) - - assert.Equal(t, Params.Domain, role) - t.Logf("Domain = %s", Params.Domain) - - assert.NotEqual(t, Params.IP, "") - t.Logf("IP = %s", Params.IP) - - assert.NotZero(t, Params.Port) - t.Logf("Port = %d", Params.Port) - - t.Logf("Address = %s", Params.GetAddress()) - - assert.NotZero(t, Params.ClientMaxRecvSize) - t.Logf("ClientMaxRecvSize = %d", Params.ClientMaxRecvSize) - - Params.Remove(role + ".grpc.clientMaxRecvSize") - Params.initClientMaxRecvSize() - assert.Equal(t, Params.ClientMaxRecvSize, DefaultClientMaxRecvSize) - - assert.NotZero(t, Params.ClientMaxSendSize) - t.Logf("ClientMaxSendSize = %d", Params.ClientMaxSendSize) - - Params.Remove(role + ".grpc.clientMaxSendSize") - Params.initClientMaxSendSize() - assert.Equal(t, Params.ClientMaxSendSize, DefaultClientMaxSendSize) -} diff --git a/internal/util/paramtable/grpc_param.go b/internal/util/paramtable/grpc_param.go new file mode 100644 index 0000000000..69250de59d --- /dev/null +++ b/internal/util/paramtable/grpc_param.go @@ -0,0 +1,211 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package paramtable + +import ( + "math" + "strconv" + "sync" + + "github.com/go-basic/ipv4" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +const ( + // DefaultServerMaxSendSize defines the maximum size of data per grpc request can send by server side. + DefaultServerMaxSendSize = math.MaxInt32 + + // DefaultServerMaxRecvSize defines the maximum size of data per grpc request can receive by server side. + DefaultServerMaxRecvSize = math.MaxInt32 + + // DefaultClientMaxSendSize defines the maximum size of data per grpc request can send by client side. + DefaultClientMaxSendSize = 100 * 1024 * 1024 + + // DefaultClientMaxRecvSize defines the maximum size of data per grpc request can receive by client side. + DefaultClientMaxRecvSize = 100 * 1024 * 1024 +) + +/////////////////////////////////////////////////////////////////////////////// +// --- grpc --- +type grpcConfig struct { + ServiceParam + + once sync.Once + Domain string + IP string + Port int +} + +func (p *grpcConfig) init(domain string) { + p.ServiceParam.Init() + p.Domain = domain + + p.LoadFromEnv() + p.LoadFromArgs() + p.initPort() +} + +// LoadFromEnv is used to initialize configuration items from env. +func (p *grpcConfig) LoadFromEnv() { + p.IP = ipv4.LocalIP() +} + +// LoadFromArgs is used to initialize configuration items from args. +func (p *grpcConfig) LoadFromArgs() { + +} + +func (p *grpcConfig) initPort() { + p.Port = p.ParseInt(p.Domain + ".port") +} + +// GetAddress return grpc address +func (p *grpcConfig) GetAddress() string { + return p.IP + ":" + strconv.Itoa(p.Port) +} + +// GrpcServerConfig is configuration for grpc server. +type GrpcServerConfig struct { + grpcConfig + + ServerMaxSendSize int + ServerMaxRecvSize int +} + +// InitOnce initialize grpc server config once +func (p *GrpcServerConfig) InitOnce(domain string) { + p.once.Do(func() { + p.init(domain) + }) +} + +func (p *GrpcServerConfig) init(domain string) { + p.grpcConfig.init(domain) + + p.initServerMaxSendSize() + p.initServerMaxRecvSize() +} + +func (p *GrpcServerConfig) initServerMaxSendSize() { + var err error + + valueStr, err := p.Load(p.Domain + ".grpc.serverMaxSendSize") + if err != nil { + p.ServerMaxSendSize = DefaultServerMaxSendSize + } + + value, err := strconv.Atoi(valueStr) + if err != nil { + log.Warn("Failed to parse grpc.serverMaxSendSize, set to default", + zap.String("rol", p.Domain), zap.String("grpc.serverMaxSendSize", valueStr), + zap.Error(err)) + + p.ServerMaxSendSize = DefaultServerMaxSendSize + } else { + p.ServerMaxSendSize = value + } + + log.Debug("initServerMaxSendSize", + zap.String("role", p.Domain), zap.Int("grpc.serverMaxSendSize", p.ServerMaxSendSize)) +} + +func (p *GrpcServerConfig) initServerMaxRecvSize() { + var err error + + valueStr, err := p.Load(p.Domain + ".grpc.serverMaxRecvSize") + if err != nil { + p.ServerMaxRecvSize = DefaultServerMaxRecvSize + } + + value, err := strconv.Atoi(valueStr) + if err != nil { + log.Warn("Failed to parse grpc.serverMaxRecvSize, set to default", + zap.String("role", p.Domain), zap.String("grpc.serverMaxRecvSize", valueStr), + zap.Error(err)) + + p.ServerMaxRecvSize = DefaultServerMaxRecvSize + } else { + p.ServerMaxRecvSize = value + } + + log.Debug("initServerMaxRecvSize", + zap.String("role", p.Domain), zap.Int("grpc.serverMaxRecvSize", p.ServerMaxRecvSize)) +} + +// GrpcClientConfig is configuration for grpc client. +type GrpcClientConfig struct { + grpcConfig + + ClientMaxSendSize int + ClientMaxRecvSize int +} + +// InitOnce initialize grpc client config once +func (p *GrpcClientConfig) InitOnce(domain string) { + p.once.Do(func() { + p.init(domain) + }) +} + +func (p *GrpcClientConfig) init(domain string) { + p.grpcConfig.init(domain) + + p.initClientMaxSendSize() + p.initClientMaxRecvSize() +} + +func (p *GrpcClientConfig) initClientMaxSendSize() { + var err error + + valueStr, err := p.Load(p.Domain + ".grpc.clientMaxSendSize") + if err != nil { + p.ClientMaxSendSize = DefaultClientMaxSendSize + } + + value, err := strconv.Atoi(valueStr) + if err != nil { + log.Warn("Failed to parse grpc.clientMaxSendSize, set to default", + zap.String("role", p.Domain), zap.String("grpc.clientMaxSendSize", valueStr), + zap.Error(err)) + + p.ClientMaxSendSize = DefaultClientMaxSendSize + } else { + p.ClientMaxSendSize = value + } + + log.Debug("initClientMaxSendSize", + zap.String("role", p.Domain), zap.Int("grpc.clientMaxSendSize", p.ClientMaxSendSize)) +} + +func (p *GrpcClientConfig) initClientMaxRecvSize() { + var err error + + valueStr, err := p.Load(p.Domain + ".grpc.clientMaxRecvSize") + if err != nil { + p.ClientMaxRecvSize = DefaultClientMaxRecvSize + } + + value, err := strconv.Atoi(valueStr) + if err != nil { + log.Warn("Failed to parse grpc.clientMaxRecvSize, set to default", + zap.String("role", p.Domain), zap.String("grpc.clientMaxRecvSize", valueStr), + zap.Error(err)) + + p.ClientMaxRecvSize = DefaultClientMaxRecvSize + } else { + p.ClientMaxRecvSize = value + } + + log.Debug("initClientMaxRecvSize", + zap.String("role", p.Domain), zap.Int("grpc.clientMaxRecvSize", p.ClientMaxRecvSize)) +} diff --git a/internal/util/paramtable/grpc_param_test.go b/internal/util/paramtable/grpc_param_test.go new file mode 100644 index 0000000000..8c51ad5c31 --- /dev/null +++ b/internal/util/paramtable/grpc_param_test.go @@ -0,0 +1,81 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package paramtable + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/util/typeutil" + "github.com/stretchr/testify/assert" +) + +func TestGrpcServerParams(t *testing.T) { + role := typeutil.DataNodeRole + var Params GrpcServerConfig + Params.InitOnce(role) + + assert.Equal(t, Params.Domain, role) + t.Logf("Domain = %s", Params.Domain) + + assert.NotEqual(t, Params.IP, "") + t.Logf("IP = %s", Params.IP) + + assert.NotZero(t, Params.Port) + t.Logf("Port = %d", Params.Port) + + t.Logf("Address = %s", Params.GetAddress()) + + assert.NotZero(t, Params.ServerMaxRecvSize) + t.Logf("ServerMaxRecvSize = %d", Params.ServerMaxRecvSize) + + Params.Remove(role + ".grpc.serverMaxRecvSize") + Params.initServerMaxRecvSize() + assert.Equal(t, Params.ServerMaxRecvSize, DefaultServerMaxRecvSize) + + assert.NotZero(t, Params.ServerMaxSendSize) + t.Logf("ServerMaxSendSize = %d", Params.ServerMaxSendSize) + + Params.Remove(role + ".grpc.serverMaxSendSize") + Params.initServerMaxSendSize() + assert.Equal(t, Params.ServerMaxSendSize, DefaultServerMaxSendSize) +} + +func TestGrpcClientParams(t *testing.T) { + role := typeutil.DataNodeRole + var Params GrpcClientConfig + Params.InitOnce(role) + + assert.Equal(t, Params.Domain, role) + t.Logf("Domain = %s", Params.Domain) + + assert.NotEqual(t, Params.IP, "") + t.Logf("IP = %s", Params.IP) + + assert.NotZero(t, Params.Port) + t.Logf("Port = %d", Params.Port) + + t.Logf("Address = %s", Params.GetAddress()) + + assert.NotZero(t, Params.ClientMaxRecvSize) + t.Logf("ClientMaxRecvSize = %d", Params.ClientMaxRecvSize) + + Params.Remove(role + ".grpc.clientMaxRecvSize") + Params.initClientMaxRecvSize() + assert.Equal(t, Params.ClientMaxRecvSize, DefaultClientMaxRecvSize) + + assert.NotZero(t, Params.ClientMaxSendSize) + t.Logf("ClientMaxSendSize = %d", Params.ClientMaxSendSize) + + Params.Remove(role + ".grpc.clientMaxSendSize") + Params.initClientMaxSendSize() + assert.Equal(t, Params.ClientMaxSendSize, DefaultClientMaxSendSize) +} diff --git a/internal/util/paramtable/base_param.go b/internal/util/paramtable/service_param.go similarity index 96% rename from internal/util/paramtable/base_param.go rename to internal/util/paramtable/service_param.go index 70e7ac934d..88c5c90663 100644 --- a/internal/util/paramtable/base_param.go +++ b/internal/util/paramtable/service_param.go @@ -25,9 +25,14 @@ import ( "github.com/milvus-io/milvus/internal/util/metricsinfo" ) +const ( + // SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message. + SuggestPulsarMaxMessageSize = 5 * 1024 * 1024 +) + // BaseParamTable is a derived struct of BaseTable. It achieves Composition by // embedding BaseTable. It is used to quickly and easily access the system configuration. -type BaseParamTable struct { +type ServiceParam struct { BaseTable EtcdCfg EtcdConfig @@ -38,7 +43,7 @@ type BaseParamTable struct { // Init is an override method of BaseTable's Init. It mainly calls the // Init of BaseTable and do some other initialization. -func (p *BaseParamTable) Init() { +func (p *ServiceParam) Init() { p.BaseTable.Init() p.EtcdCfg.init(&p.BaseTable) diff --git a/internal/util/paramtable/base_param_test.go b/internal/util/paramtable/service_param_test.go similarity index 91% rename from internal/util/paramtable/base_param_test.go rename to internal/util/paramtable/service_param_test.go index 6bb72d8e49..a4162c4894 100644 --- a/internal/util/paramtable/base_param_test.go +++ b/internal/util/paramtable/service_param_test.go @@ -19,12 +19,12 @@ import ( "github.com/stretchr/testify/assert" ) -func TestBaseParamTable(t *testing.T) { - var BaseParams BaseParamTable - BaseParams.Init() +func TestServiceParam(t *testing.T) { + var SParams ServiceParam + SParams.Init() t.Run("test etcdConfig", func(t *testing.T) { - Params := BaseParams.EtcdCfg + Params := SParams.EtcdCfg assert.NotZero(t, len(Params.Endpoints)) t.Logf("etcd endpoints = %s", Params.Endpoints) @@ -45,7 +45,7 @@ func TestBaseParamTable(t *testing.T) { }) t.Run("test pulsarConfig", func(t *testing.T) { - Params := BaseParams.PulsarCfg + Params := SParams.PulsarCfg assert.NotEqual(t, Params.Address, "") t.Logf("pulsar address = %s", Params.Address) @@ -54,14 +54,14 @@ func TestBaseParamTable(t *testing.T) { }) t.Run("test rocksmqConfig", func(t *testing.T) { - Params := BaseParams.RocksmqCfg + Params := SParams.RocksmqCfg assert.NotEqual(t, Params.Path, "") t.Logf("rocksmq path = %s", Params.Path) }) t.Run("test minioConfig", func(t *testing.T) { - Params := BaseParams.MinioCfg + Params := SParams.MinioCfg addr := Params.Address equal := addr == "localhost:9000" || addr == "minio:9000"