milvus/internal/util/paramtable/service_param.go
Enwei Jiao 264f08953d
Refactor Rocksmq's config (#21304)
Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
2022-12-26 19:11:30 +08:00

651 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package paramtable
import (
"encoding/json"
"net/url"
"os"
"path"
"strconv"
"strings"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"go.uber.org/zap"
)
// Defaults and key prefixes shared by the service configuration groups below.
const (
// SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message.
// It evaluates to 5 MiB and is used as the default of "pulsar.maxMessageSize".
SuggestPulsarMaxMessageSize = 5 * 1024 * 1024
// defaultEtcdLogLevel is the default value of "etcd.log.level".
defaultEtcdLogLevel = "info"
// defaultEtcdLogPath is the default value of "etcd.log.path".
defaultEtcdLogPath = "stdout"
// KafkaProducerConfigPrefix is the key prefix of the extra Kafka producer
// settings; it equals the prefix KafkaConfig.Init gives ProducerExtraConfig.
KafkaProducerConfigPrefix = "kafka.producer."
// KafkaConsumerConfigPrefix is the key prefix of the extra Kafka consumer
// settings; it equals the prefix KafkaConfig.Init gives ConsumerExtraConfig.
KafkaConsumerConfigPrefix = "kafka.consumer."
)
// ServiceParam is used to quickly and easily access all basic service configurations.
// It embeds BaseTable and carries one configuration group per external
// service; Init initializes the table and then every group against it.
type ServiceParam struct {
BaseTable
// LocalStorageCfg holds the "localStorage.*" settings.
LocalStorageCfg LocalStorageConfig
// MetaStoreCfg holds the "metastore.*" settings.
MetaStoreCfg MetaStoreConfig
// EtcdCfg holds the "etcd.*" settings.
EtcdCfg EtcdConfig
// DBCfg holds the "mysql.*" settings (plus "log.level").
DBCfg MetaDBConfig
// PulsarCfg holds the "pulsar.*" settings.
PulsarCfg PulsarConfig
// KafkaCfg holds the "kafka.*" settings.
KafkaCfg KafkaConfig
// RocksmqCfg holds the "rocksmq.*" settings.
RocksmqCfg RocksmqConfig
// MinioCfg holds the "minio.*" settings.
MinioCfg MinioConfig
}
// Init prepares the embedded BaseTable and then initializes every service
// configuration group against it, in declaration order.
func (p *ServiceParam) Init() {
	base := &p.BaseTable
	// NOTE(review): the meaning of the argument 10 is defined by
	// BaseTable.Init, which is not visible here.
	base.Init(10)

	// Every group exposes Init(*BaseTable); the slice keeps the original
	// initialization order.
	groups := []interface{ Init(*BaseTable) }{
		&p.LocalStorageCfg,
		&p.MetaStoreCfg,
		&p.EtcdCfg,
		&p.DBCfg,
		&p.PulsarCfg,
		&p.KafkaCfg,
		&p.RocksmqCfg,
		&p.MinioCfg,
	}
	for _, group := range groups {
		group.Init(base)
	}
}
// /////////////////////////////////////////////////////////////////////////////
// --- etcd ---
// EtcdConfig groups the "etcd.*" parameters. Every field is tagged
// refreshable:"false"; the keys and defaults listed below are the ones
// assigned in EtcdConfig.Init.
type EtcdConfig struct {
// --- ETCD ---
// Endpoints is bound to "etcd.endpoints" (PanicIfEmpty).
Endpoints ParamItem `refreshable:"false"`
// RootPath is bound to "etcd.rootPath" (PanicIfEmpty).
RootPath ParamItem `refreshable:"false"`
// MetaSubPath is bound to "etcd.metaSubPath" (PanicIfEmpty).
MetaSubPath ParamItem `refreshable:"false"`
// KvSubPath is bound to "etcd.kvSubPath" (PanicIfEmpty).
KvSubPath ParamItem `refreshable:"false"`
// MetaRootPath is formatted as path.Join(RootPath, MetaSubPath).
MetaRootPath CompositeParamItem `refreshable:"false"`
// KvRootPath is formatted as path.Join(RootPath, KvSubPath).
KvRootPath CompositeParamItem `refreshable:"false"`
// EtcdLogLevel is bound to "etcd.log.level", default defaultEtcdLogLevel.
EtcdLogLevel ParamItem `refreshable:"false"`
// EtcdLogPath is bound to "etcd.log.path", default defaultEtcdLogPath.
EtcdLogPath ParamItem `refreshable:"false"`
// EtcdUseSSL is bound to "etcd.ssl.enabled", default "false".
EtcdUseSSL ParamItem `refreshable:"false"`
// EtcdTLSCert is bound to "etcd.ssl.tlsCert".
EtcdTLSCert ParamItem `refreshable:"false"`
// EtcdTLSKey is bound to "etcd.ssl.tlsKey".
EtcdTLSKey ParamItem `refreshable:"false"`
// EtcdTLSCACert is bound to "etcd.ssl.tlsCACert".
EtcdTLSCACert ParamItem `refreshable:"false"`
// EtcdTLSMinVersion is bound to "etcd.ssl.tlsMinVersion", default "1.3".
EtcdTLSMinVersion ParamItem `refreshable:"false"`
// --- Embed ETCD ---
// UseEmbedEtcd is bound to "etcd.use.embed", default "false".
UseEmbedEtcd ParamItem `refreshable:"false"`
// ConfigPath is bound to "etcd.config.path".
ConfigPath ParamItem `refreshable:"false"`
// DataDir is bound to "etcd.data.dir", default "default.etcd".
DataDir ParamItem `refreshable:"false"`
}
// Init binds every etcd parameter to the config manager held by base.
//
// Each ParamItem receives its key, version and (where one exists) default
// value and is then initialized with base.mgr. MetaRootPath and KvRootPath are
// composite items whose value is path.Join of the root path and the
// corresponding sub path.
//
// Init panics when "etcd.use.embed" is true while the environment variable
// named by metricsinfo.DeployModeEnvKey is not
// metricsinfo.StandaloneDeployMode.
func (p *EtcdConfig) Init(base *BaseTable) {
	// Endpoints is set up exactly once; a second identical setup further
	// down was redundant and has been removed.
	p.Endpoints = ParamItem{
		Key:          "etcd.endpoints",
		Version:      "2.0.0",
		PanicIfEmpty: true,
	}
	p.Endpoints.Init(base.mgr)

	// --- embedded etcd ---
	p.UseEmbedEtcd = ParamItem{
		Key:          "etcd.use.embed",
		DefaultValue: "false",
		Version:      "2.1.0",
	}
	p.UseEmbedEtcd.Init(base.mgr)

	// Embedded etcd is only accepted when the deploy mode read from the
	// environment is standalone.
	if p.UseEmbedEtcd.GetAsBool() && (os.Getenv(metricsinfo.DeployModeEnvKey) != metricsinfo.StandaloneDeployMode) {
		panic("embedded etcd can not be used under distributed mode")
	}

	p.ConfigPath = ParamItem{
		Key:     "etcd.config.path",
		Version: "2.1.0",
	}
	p.ConfigPath.Init(base.mgr)

	p.DataDir = ParamItem{
		Key:          "etcd.data.dir",
		DefaultValue: "default.etcd",
		Version:      "2.1.0",
	}
	p.DataDir.Init(base.mgr)

	// --- paths ---
	p.RootPath = ParamItem{
		Key:          "etcd.rootPath",
		Version:      "2.0.0",
		PanicIfEmpty: true,
	}
	p.RootPath.Init(base.mgr)

	p.MetaSubPath = ParamItem{
		Key:          "etcd.metaSubPath",
		Version:      "2.0.0",
		PanicIfEmpty: true,
	}
	p.MetaSubPath.Init(base.mgr)

	// MetaRootPath = RootPath joined with MetaSubPath.
	p.MetaRootPath = CompositeParamItem{
		Items: []*ParamItem{&p.RootPath, &p.MetaSubPath},
		Format: func(kvs map[string]string) string {
			return path.Join(kvs[p.RootPath.Key], kvs[p.MetaSubPath.Key])
		},
	}

	p.KvSubPath = ParamItem{
		Key:          "etcd.kvSubPath",
		Version:      "2.0.0",
		PanicIfEmpty: true,
	}
	p.KvSubPath.Init(base.mgr)

	// KvRootPath = RootPath joined with KvSubPath.
	p.KvRootPath = CompositeParamItem{
		Items: []*ParamItem{&p.RootPath, &p.KvSubPath},
		Format: func(kvs map[string]string) string {
			return path.Join(kvs[p.RootPath.Key], kvs[p.KvSubPath.Key])
		},
	}

	// --- logging ---
	p.EtcdLogLevel = ParamItem{
		Key:          "etcd.log.level",
		DefaultValue: defaultEtcdLogLevel,
		Version:      "2.0.0",
	}
	p.EtcdLogLevel.Init(base.mgr)

	p.EtcdLogPath = ParamItem{
		Key:          "etcd.log.path",
		DefaultValue: defaultEtcdLogPath,
		Version:      "2.0.0",
	}
	p.EtcdLogPath.Init(base.mgr)

	// --- TLS ---
	p.EtcdUseSSL = ParamItem{
		Key:          "etcd.ssl.enabled",
		DefaultValue: "false",
		Version:      "2.0.0",
	}
	p.EtcdUseSSL.Init(base.mgr)

	p.EtcdTLSCert = ParamItem{
		Key:     "etcd.ssl.tlsCert",
		Version: "2.0.0",
	}
	p.EtcdTLSCert.Init(base.mgr)

	p.EtcdTLSKey = ParamItem{
		Key:     "etcd.ssl.tlsKey",
		Version: "2.0.0",
	}
	p.EtcdTLSKey.Init(base.mgr)

	p.EtcdTLSCACert = ParamItem{
		Key:     "etcd.ssl.tlsCACert",
		Version: "2.0.0",
	}
	p.EtcdTLSCACert.Init(base.mgr)

	p.EtcdTLSMinVersion = ParamItem{
		Key:          "etcd.ssl.tlsMinVersion",
		DefaultValue: "1.3",
		Version:      "2.0.0",
	}
	p.EtcdTLSMinVersion.Init(base.mgr)
}
// LocalStorageConfig groups the "localStorage.*" parameters.
type LocalStorageConfig struct {
// Path is bound to "localStorage.path", default "/var/lib/milvus/data".
Path ParamItem `refreshable:"false"`
}
// Init binds the local storage path parameter to the config manager of base.
func (p *LocalStorageConfig) Init(base *BaseTable) {
	mgr := base.mgr

	// Local data directory; falls back to the default when unset.
	p.Path = ParamItem{
		Key:          "localStorage.path",
		DefaultValue: "/var/lib/milvus/data",
		Version:      "2.0.0",
	}
	p.Path.Init(mgr)
}
// MetaStoreConfig groups the "metastore.*" parameters.
type MetaStoreConfig struct {
// MetaStoreType is bound to "metastore.type", default util.MetaStoreTypeEtcd.
MetaStoreType ParamItem `refreshable:"false"`
}
// Init binds the meta store type parameter to the config manager of base.
func (p *MetaStoreConfig) Init(base *BaseTable) {
	mgr := base.mgr

	// The meta store type defaults to util.MetaStoreTypeEtcd.
	p.MetaStoreType = ParamItem{
		Key:          "metastore.type",
		DefaultValue: util.MetaStoreTypeEtcd,
		Version:      "2.2.0",
	}
	p.MetaStoreType.Init(mgr)
}
// /////////////////////////////////////////////////////////////////////////////
// --- meta db ---
// MetaDBConfig groups the parameters of the meta database. The keys and
// defaults listed below are the ones assigned in MetaDBConfig.Init.
type MetaDBConfig struct {
// Username is bound to "mysql.username" (PanicIfEmpty).
Username ParamItem `refreshable:"false"`
// Password is bound to "mysql.password" (PanicIfEmpty).
Password ParamItem `refreshable:"false"`
// Address is bound to "mysql.address" (PanicIfEmpty).
Address ParamItem `refreshable:"false"`
// Port is bound to "mysql.port", default "3306".
Port ParamItem `refreshable:"false"`
// DBName is bound to "mysql.dbName" (PanicIfEmpty).
DBName ParamItem `refreshable:"false"`
// MaxOpenConns is bound to "mysql.maxOpenConns", default "20".
MaxOpenConns ParamItem `refreshable:"false"`
// MaxIdleConns is bound to "mysql.maxIdleConns", default "5".
MaxIdleConns ParamItem `refreshable:"false"`
// LogLevel is bound to "log.level" (not a "mysql.*" key), default "debug".
LogLevel ParamItem `refreshable:"false"`
}
// Init binds the meta database parameters to the config manager of base.
// Credentials, address and database name are flagged PanicIfEmpty; the
// remaining items carry defaults.
func (p *MetaDBConfig) Init(base *BaseTable) {
	mgr := base.mgr

	// --- credentials ---
	p.Username = ParamItem{
		Key:          "mysql.username",
		PanicIfEmpty: true,
		Version:      "2.2.0",
	}
	p.Username.Init(mgr)

	p.Password = ParamItem{
		Key:          "mysql.password",
		PanicIfEmpty: true,
		Version:      "2.2.0",
	}
	p.Password.Init(mgr)

	// --- endpoint ---
	p.Address = ParamItem{
		Key:          "mysql.address",
		PanicIfEmpty: true,
		Version:      "2.2.0",
	}
	p.Address.Init(mgr)

	p.Port = ParamItem{
		Key:          "mysql.port",
		DefaultValue: "3306",
		Version:      "2.2.0",
	}
	p.Port.Init(mgr)

	p.DBName = ParamItem{
		Key:          "mysql.dbName",
		PanicIfEmpty: true,
		Version:      "2.2.0",
	}
	p.DBName.Init(mgr)

	// --- connection pool ---
	p.MaxOpenConns = ParamItem{
		Key:          "mysql.maxOpenConns",
		DefaultValue: "20",
		Version:      "2.2.0",
	}
	p.MaxOpenConns.Init(mgr)

	p.MaxIdleConns = ParamItem{
		Key:          "mysql.maxIdleConns",
		DefaultValue: "5",
		Version:      "2.2.0",
	}
	p.MaxIdleConns.Init(mgr)

	// --- logging ---
	// This item reads the "log.level" key rather than a "mysql.*" key.
	p.LogLevel = ParamItem{
		Key:          "log.level",
		DefaultValue: "debug",
		Version:      "2.0.0",
	}
	p.LogLevel.Init(mgr)
}
// /////////////////////////////////////////////////////////////////////////////
// --- pulsar ---
// PulsarConfig groups the "pulsar.*" parameters. The keys and defaults listed
// below are the ones assigned in PulsarConfig.Init.
type PulsarConfig struct {
// Address is bound to "pulsar.address" (default ""); a non-empty value
// without ':' is formatted as "pulsar://<addr>:<port>".
Address ParamItem `refreshable:"false"`
// Port is bound to "pulsar.port", default "6650".
Port ParamItem `refreshable:"false"`
// WebAddress is bound to "pulsar.webaddress"; its formatter derives
// "http://<host of Address>:<WebPort>" from Address and WebPort.
WebAddress ParamItem `refreshable:"false"`
// WebPort is bound to "pulsar.webport", default "80".
WebPort ParamItem `refreshable:"false"`
// MaxMessageSize is bound to "pulsar.maxMessageSize", default
// SuggestPulsarMaxMessageSize. It is the only refreshable item here.
MaxMessageSize ParamItem `refreshable:"true"`
// support auth
// AuthPlugin is bound to "pulsar.authPlugin".
AuthPlugin ParamItem `refreshable:"false"`
// AuthParams is bound to "pulsar.authParams"; a "k:v,k:v" list is
// formatted into a JSON object.
AuthParams ParamItem `refreshable:"false"`
// support tenant
// Tenant is bound to "pulsar.tenant", default "public".
Tenant ParamItem `refreshable:"false"`
// Namespace is bound to "pulsar.namespace", default "default".
Namespace ParamItem `refreshable:"false"`
}
// Init binds the Pulsar parameters to the config manager of base.
//
// Three items carry formatters:
//   - Address: an empty value stays empty, a value containing ':' is returned
//     unchanged, anything else becomes "pulsar://<addr>:<port>".
//   - WebAddress: the configured value is not used; the result is
//     "http://<host of Address>:<WebPort>", or "" when Address cannot be
//     parsed as a request URI.
//   - AuthParams: a comma separated "key:value" list is turned into a JSON
//     object; "" is returned when no pair could be extracted.
func (p *PulsarConfig) Init(base *BaseTable) {
	p.Port = ParamItem{
		Key:          "pulsar.port",
		Version:      "2.0.0",
		DefaultValue: "6650",
	}
	p.Port.Init(base.mgr)

	// Due to the implicit rule of MQ priority, the default address should be
	// empty.
	p.Address = ParamItem{
		Key:          "pulsar.address",
		Version:      "2.0.0",
		DefaultValue: "",
		Formatter: func(addr string) string {
			if addr == "" {
				return ""
			}
			// A value that already contains ':' is returned untouched.
			if strings.Contains(addr, ":") {
				return addr
			}
			// The second return value of get is intentionally discarded.
			port, _ := p.Port.get()
			return "pulsar://" + addr + ":" + port
		},
	}
	p.Address.Init(base.mgr)

	p.WebPort = ParamItem{
		Key:          "pulsar.webport",
		Version:      "2.0.0",
		DefaultValue: "80",
	}
	p.WebPort.Init(base.mgr)

	p.WebAddress = ParamItem{
		Key:          "pulsar.webaddress",
		Version:      "2.0.0",
		DefaultValue: "",
		// The raw value (add) is ignored; the web address is always derived
		// from the formatted broker address and the web port.
		Formatter: func(add string) string {
			pulsarURL, err := url.ParseRequestURI(p.Address.GetValue())
			if err != nil {
				log.Info("failed to parse pulsar config, assume pulsar not used", zap.Error(err))
				return ""
			}
			return "http://" + pulsarURL.Hostname() + ":" + p.WebPort.GetValue()
		},
	}
	p.WebAddress.Init(base.mgr)

	p.MaxMessageSize = ParamItem{
		Key:          "pulsar.maxMessageSize",
		Version:      "2.0.0",
		DefaultValue: strconv.Itoa(SuggestPulsarMaxMessageSize),
	}
	p.MaxMessageSize.Init(base.mgr)

	p.Tenant = ParamItem{
		Key:          "pulsar.tenant",
		Version:      "2.2.0",
		DefaultValue: "public",
	}
	p.Tenant.Init(base.mgr)

	p.Namespace = ParamItem{
		Key:          "pulsar.namespace",
		Version:      "2.2.0",
		DefaultValue: "default",
	}
	p.Namespace.Init(base.mgr)

	p.AuthPlugin = ParamItem{
		Key:     "pulsar.authPlugin",
		Version: "2.2.0",
	}
	p.AuthPlugin.Init(base.mgr)

	p.AuthParams = ParamItem{
		Key:     "pulsar.authParams",
		Version: "2.2.0",
		Formatter: func(authParams string) string {
			jsonMap := make(map[string]string)
			for _, param := range strings.Split(authParams, ",") {
				// Split on the first ':' only, so a value that itself
				// contains ':' (for example a URL) is kept instead of
				// being silently dropped.
				kv := strings.SplitN(param, ":", 2)
				if len(kv) == 2 {
					jsonMap[kv[0]] = kv[1]
				}
			}

			if len(jsonMap) == 0 {
				return ""
			}
			// Marshalling a map[string]string cannot fail, so the error is
			// deliberately ignored.
			jsonData, _ := json.Marshal(&jsonMap)
			return string(jsonData)
		},
	}
	p.AuthParams.Init(base.mgr)
}
// --- kafka ---
// KafkaConfig groups the "kafka.*" parameters. The keys and defaults listed
// below are the ones assigned in KafkaConfig.Init.
type KafkaConfig struct {
// Address is bound to "kafka.brokerList", default "".
Address ParamItem `refreshable:"false"`
// SaslUsername is bound to "kafka.saslUsername", default "".
SaslUsername ParamItem `refreshable:"false"`
// SaslPassword is bound to "kafka.saslPassword", default "".
SaslPassword ParamItem `refreshable:"false"`
// SaslMechanisms is bound to "kafka.saslMechanisms", default "PLAIN".
SaslMechanisms ParamItem `refreshable:"false"`
// SecurityProtocol is bound to "kafka.securityProtocol", default "SASL_SSL".
SecurityProtocol ParamItem `refreshable:"false"`
// ConsumerExtraConfig collects the keys prefixed with "kafka.consumer.".
ConsumerExtraConfig ParamGroup `refreshable:"false"`
// ProducerExtraConfig collects the keys prefixed with "kafka.producer.".
ProducerExtraConfig ParamGroup `refreshable:"false"`
}
// Init binds the Kafka parameters and the two extra-config groups to the
// config manager of base.
func (k *KafkaConfig) Init(base *BaseTable) {
	mgr := base.mgr

	// --- broker ---
	// Due to the implicit rule of MQ priority, the default address should be
	// empty.
	k.Address = ParamItem{
		Key:          "kafka.brokerList",
		Version:      "2.1.0",
		DefaultValue: "",
	}
	k.Address.Init(mgr)

	// --- SASL ---
	k.SaslUsername = ParamItem{
		Key:          "kafka.saslUsername",
		Version:      "2.1.0",
		DefaultValue: "",
	}
	k.SaslUsername.Init(mgr)

	k.SaslPassword = ParamItem{
		Key:          "kafka.saslPassword",
		Version:      "2.1.0",
		DefaultValue: "",
	}
	k.SaslPassword.Init(mgr)

	k.SaslMechanisms = ParamItem{
		Key:          "kafka.saslMechanisms",
		Version:      "2.1.0",
		DefaultValue: "PLAIN",
	}
	k.SaslMechanisms.Init(mgr)

	k.SecurityProtocol = ParamItem{
		Key:          "kafka.securityProtocol",
		Version:      "2.1.0",
		DefaultValue: "SASL_SSL",
	}
	k.SecurityProtocol.Init(mgr)

	// --- pass-through client settings ---
	k.ConsumerExtraConfig = ParamGroup{
		Version:   "2.2.0",
		KeyPrefix: "kafka.consumer.",
	}
	k.ConsumerExtraConfig.Init(mgr)

	k.ProducerExtraConfig = ParamGroup{
		Version:   "2.2.0",
		KeyPrefix: "kafka.producer.",
	}
	k.ProducerExtraConfig.Init(mgr)
}
// /////////////////////////////////////////////////////////////////////////////
// --- rocksmq ---
// RocksmqConfig groups the "rocksmq.*" parameters. The keys listed below are
// the ones assigned in RocksmqConfig.Init.
type RocksmqConfig struct {
// Path is bound to "rocksmq.path".
Path ParamItem `refreshable:"false"`
// LRUCacheRatio is bound to "rocksmq.lrucacheratio".
LRUCacheRatio ParamItem `refreshable:"false"`
// PageSize is bound to "rocksmq.rocksmqPageSize"; its default is 256<<20.
PageSize ParamItem `refreshable:"false"`
// RetentionTimeInMinutes is the time of retention
// (key "rocksmq.retentionTimeInMinutes", default "7200").
RetentionTimeInMinutes ParamItem `refreshable:"false"`
// RetentionSizeInMB is the size of retention
// (key "rocksmq.retentionSizeInMB", default "7200").
RetentionSizeInMB ParamItem `refreshable:"false"`
// CompactionInterval is the Interval we trigger compaction,
// (key "rocksmq.compactionInterval", default "86400").
CompactionInterval ParamItem `refreshable:"false"`
// TickerTimeInSeconds is the time of expired check, default 10 minutes
// (key "rocksmq.timtickerInterval", default "600").
TickerTimeInSeconds ParamItem `refreshable:"false"`
}
// Init binds the rocksmq parameters to the config manager of base.
func (r *RocksmqConfig) Init(base *BaseTable) {
	r.Path = ParamItem{
		Key:     "rocksmq.path",
		Version: "2.0.0",
	}
	r.Path.Init(base.mgr)

	// The default must be a parseable float: "0.0.6" (two dots) is not a
	// valid number, so the default ratio is spelled "0.06".
	r.LRUCacheRatio = ParamItem{
		Key:          "rocksmq.lrucacheratio",
		DefaultValue: "0.06",
		Version:      "2.0.0",
	}
	r.LRUCacheRatio.Init(base.mgr)

	// Default page size is 256<<20 (268435456).
	r.PageSize = ParamItem{
		Key:          "rocksmq.rocksmqPageSize",
		DefaultValue: strconv.FormatInt(256<<20, 10),
		Version:      "2.0.0",
	}
	r.PageSize.Init(base.mgr)

	r.RetentionTimeInMinutes = ParamItem{
		Key:          "rocksmq.retentionTimeInMinutes",
		DefaultValue: "7200",
		Version:      "2.0.0",
	}
	r.RetentionTimeInMinutes.Init(base.mgr)

	r.RetentionSizeInMB = ParamItem{
		Key:          "rocksmq.retentionSizeInMB",
		DefaultValue: "7200",
		Version:      "2.0.0",
	}
	r.RetentionSizeInMB.Init(base.mgr)

	r.CompactionInterval = ParamItem{
		Key:          "rocksmq.compactionInterval",
		DefaultValue: "86400",
		Version:      "2.0.0",
	}
	r.CompactionInterval.Init(base.mgr)

	// The key spelling "timtickerInterval" is kept as-is: it is the name
	// existing configuration files use. The default of 600 seconds matches
	// the "10 minutes" stated on the struct field.
	r.TickerTimeInSeconds = ParamItem{
		Key:          "rocksmq.timtickerInterval",
		DefaultValue: "600",
		Version:      "2.2.2",
	}
	r.TickerTimeInSeconds.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
// MinioConfig groups the "minio.*" parameters. The keys and defaults listed
// below are the ones assigned in MinioConfig.Init.
type MinioConfig struct {
// Address is bound to "minio.address" (default ""); a non-empty value
// without ':' is formatted as "<addr>:<port>".
Address ParamItem `refreshable:"false"`
// Port is bound to "minio.port", default "9000".
Port ParamItem `refreshable:"false"`
// AccessKeyID is bound to "minio.accessKeyID" (PanicIfEmpty).
AccessKeyID ParamItem `refreshable:"false"`
// SecretAccessKey is bound to "minio.secretAccessKey" (PanicIfEmpty).
SecretAccessKey ParamItem `refreshable:"false"`
// UseSSL is bound to "minio.useSSL" (PanicIfEmpty).
UseSSL ParamItem `refreshable:"false"`
// BucketName is bound to "minio.bucketName" (PanicIfEmpty).
BucketName ParamItem `refreshable:"false"`
// RootPath is bound to "minio.rootPath" (PanicIfEmpty).
RootPath ParamItem `refreshable:"false"`
// UseIAM is bound to "minio.useIAM", default DefaultMinioUseIAM.
UseIAM ParamItem `refreshable:"false"`
// CloudProvider is bound to "minio.cloudProvider", default
// DefaultMinioCloudProvider.
CloudProvider ParamItem `refreshable:"false"`
// IAMEndpoint is bound to "minio.iamEndpoint", default
// DefaultMinioIAMEndpoint.
IAMEndpoint ParamItem `refreshable:"false"`
}
// Init binds the MinIO parameters to the config manager of base.
//
// Address carries a formatter: an empty value or one that already contains
// ':' is returned unchanged, anything else gets ":<port>" appended.
func (p *MinioConfig) Init(base *BaseTable) {
	mgr := base.mgr

	// --- endpoint ---
	p.Port = ParamItem{
		Key:          "minio.port",
		Version:      "2.0.0",
		DefaultValue: "9000",
	}
	p.Port.Init(mgr)

	p.Address = ParamItem{
		Key:          "minio.address",
		Version:      "2.0.0",
		DefaultValue: "",
		Formatter: func(addr string) string {
			// Nothing to append to: either no address at all, or one that
			// already contains ':'.
			if addr == "" || strings.Contains(addr, ":") {
				return addr
			}
			// The second return value of get is intentionally discarded.
			port, _ := p.Port.get()
			return addr + ":" + port
		},
	}
	p.Address.Init(mgr)

	// --- credentials ---
	p.AccessKeyID = ParamItem{
		Key:          "minio.accessKeyID",
		PanicIfEmpty: true,
		Version:      "2.0.0",
	}
	p.AccessKeyID.Init(mgr)

	p.SecretAccessKey = ParamItem{
		Key:          "minio.secretAccessKey",
		PanicIfEmpty: true,
		Version:      "2.0.0",
	}
	p.SecretAccessKey.Init(mgr)

	// --- transport and location ---
	p.UseSSL = ParamItem{
		Key:          "minio.useSSL",
		PanicIfEmpty: true,
		Version:      "2.0.0",
	}
	p.UseSSL.Init(mgr)

	p.BucketName = ParamItem{
		Key:          "minio.bucketName",
		PanicIfEmpty: true,
		Version:      "2.0.0",
	}
	p.BucketName.Init(mgr)

	p.RootPath = ParamItem{
		Key:          "minio.rootPath",
		PanicIfEmpty: true,
		Version:      "2.0.0",
	}
	p.RootPath.Init(mgr)

	// --- IAM ---
	p.UseIAM = ParamItem{
		Key:          "minio.useIAM",
		Version:      "2.0.0",
		DefaultValue: DefaultMinioUseIAM,
	}
	p.UseIAM.Init(mgr)

	p.CloudProvider = ParamItem{
		Key:          "minio.cloudProvider",
		Version:      "2.2.0",
		DefaultValue: DefaultMinioCloudProvider,
	}
	p.CloudProvider.Init(mgr)

	p.IAMEndpoint = ParamItem{
		Key:          "minio.iamEndpoint",
		Version:      "2.0.0",
		DefaultValue: DefaultMinioIAMEndpoint,
	}
	p.IAMEndpoint.Init(mgr)
}