milvus/internal/util/paramtable/service_param.go
jaime d84cda22ad
Fix start fail with kafka model (#20726)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com>

Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
2022-11-21 17:49:12 +08:00

617 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package paramtable
import (
"encoding/json"
"net/url"
"os"
"path"
"strconv"
"strings"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"go.uber.org/zap"
)
const (
// SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message.
SuggestPulsarMaxMessageSize = 5 * 1024 * 1024
defaultEtcdLogLevel = "info"
defaultEtcdLogPath = "stdout"
KafkaProducerConfigPrefix = "kafka.producer."
KafkaConsumerConfigPrefix = "kafka.consumer."
)
// ServiceParam is used to quickly and easily access all basic service configurations.
type ServiceParam struct {
BaseTable
LocalStorageCfg LocalStorageConfig
MetaStoreCfg MetaStoreConfig
EtcdCfg EtcdConfig
DBCfg MetaDBConfig
PulsarCfg PulsarConfig
KafkaCfg KafkaConfig
RocksmqCfg RocksmqConfig
MinioCfg MinioConfig
}
func (p *ServiceParam) Init() {
p.BaseTable.Init()
p.LocalStorageCfg.Init(&p.BaseTable)
p.MetaStoreCfg.Init(&p.BaseTable)
p.EtcdCfg.Init(&p.BaseTable)
if p.MetaStoreCfg.MetaStoreType == util.MetaStoreTypeMysql {
log.Debug("Mysql protocol is used as meta store")
p.DBCfg.Init(&p.BaseTable)
}
p.PulsarCfg.Init(&p.BaseTable)
p.KafkaCfg.Init(&p.BaseTable)
p.RocksmqCfg.Init(&p.BaseTable)
p.MinioCfg.Init(&p.BaseTable)
}
// /////////////////////////////////////////////////////////////////////////////
// --- etcd ---
type EtcdConfig struct {
// --- ETCD ---
Endpoints ParamItem
RootPath ParamItem
MetaSubPath ParamItem
KvSubPath ParamItem
MetaRootPath CompositeParamItem
KvRootPath CompositeParamItem
EtcdLogLevel ParamItem
EtcdLogPath ParamItem
EtcdUseSSL ParamItem
EtcdTLSCert ParamItem
EtcdTLSKey ParamItem
EtcdTLSCACert ParamItem
EtcdTLSMinVersion ParamItem
// --- Embed ETCD ---
UseEmbedEtcd ParamItem
ConfigPath ParamItem
DataDir ParamItem
}
func (p *EtcdConfig) Init(base *BaseTable) {
p.Endpoints = ParamItem{
Key: "etcd.endpoints",
Version: "2.0.0",
PanicIfEmpty: true,
}
p.Endpoints.Init(base.mgr)
p.UseEmbedEtcd = ParamItem{
Key: "etcd.use.embed",
DefaultValue: "false",
Version: "2.1.0",
}
p.UseEmbedEtcd.Init(base.mgr)
if p.UseEmbedEtcd.GetAsBool() && (os.Getenv(metricsinfo.DeployModeEnvKey) != metricsinfo.StandaloneDeployMode) {
panic("embedded etcd can not be used under distributed mode")
}
if p.UseEmbedEtcd.GetAsBool() {
p.ConfigPath = ParamItem{
Key: "etcd.config.path",
DefaultValue: "",
Version: "2.1.0",
}
p.ConfigPath.Init(base.mgr)
p.DataDir = ParamItem{
Key: "etcd.data.dir",
DefaultValue: "default.etcd",
Version: "2.1.0",
}
p.DataDir.Init(base.mgr)
} else {
p.Endpoints = ParamItem{
Key: "etcd.endpoints",
Version: "2.0.0",
PanicIfEmpty: true,
}
p.Endpoints.Init(base.mgr)
}
p.RootPath = ParamItem{
Key: "etcd.rootPath",
Version: "2.0.0",
PanicIfEmpty: true,
}
p.RootPath.Init(base.mgr)
p.MetaSubPath = ParamItem{
Key: "etcd.metaSubPath",
Version: "2.0.0",
PanicIfEmpty: true,
}
p.MetaSubPath.Init(base.mgr)
p.MetaRootPath = CompositeParamItem{
Items: []*ParamItem{&p.RootPath, &p.MetaSubPath},
Format: func(kvs map[string]string) string {
return path.Join(kvs["etcd.rootPath"], kvs["etcd.metaSubPath"])
},
}
p.KvSubPath = ParamItem{
Key: "etcd.kvSubPath",
Version: "2.0.0",
PanicIfEmpty: true,
}
p.KvSubPath.Init(base.mgr)
p.KvRootPath = CompositeParamItem{
Items: []*ParamItem{&p.RootPath, &p.KvSubPath},
Format: func(kvs map[string]string) string {
return path.Join(kvs["etcd.rootPath"], kvs["etcd.kvSubPath"])
},
}
p.EtcdLogLevel = ParamItem{
Key: "etcd.log.level",
DefaultValue: defaultEtcdLogLevel,
Version: "2.0.0",
}
p.EtcdLogLevel.Init(base.mgr)
p.EtcdLogPath = ParamItem{
Key: "etcd.log.path",
DefaultValue: defaultEtcdLogPath,
Version: "2.0.0",
}
p.EtcdLogPath.Init(base.mgr)
p.EtcdUseSSL = ParamItem{
Key: "etcd.ssl.enabled",
DefaultValue: "false",
Version: "2.0.0",
}
p.EtcdUseSSL.Init(base.mgr)
p.EtcdTLSCert = ParamItem{
Key: "etcd.ssl.tlsCert",
Version: "2.0.0",
}
p.EtcdTLSCert.Init(base.mgr)
p.EtcdTLSKey = ParamItem{
Key: "etcd.ssl.tlsKey",
Version: "2.0.0",
}
p.EtcdTLSKey.Init(base.mgr)
p.EtcdTLSCACert = ParamItem{
Key: "etcd.ssl.tlsCACert",
Version: "2.0.0",
}
p.EtcdTLSCACert.Init(base.mgr)
p.EtcdTLSMinVersion = ParamItem{
Key: "etcd.ssl.tlsMinVersion",
DefaultValue: "1.3",
Version: "2.0.0",
}
p.EtcdTLSMinVersion.Init(base.mgr)
}
type LocalStorageConfig struct {
Path ParamItem
}
func (p *LocalStorageConfig) Init(base *BaseTable) {
p.Path = ParamItem{
Key: "localStorage.path",
Version: "2.0.0",
DefaultValue: "/var/lib/milvus/data",
}
p.Path.Init(base.mgr)
}
type MetaStoreConfig struct {
Base *BaseTable
MetaStoreType string
}
func (p *MetaStoreConfig) Init(base *BaseTable) {
p.Base = base
p.LoadCfgToMemory()
}
func (p *MetaStoreConfig) LoadCfgToMemory() {
p.initMetaStoreType()
}
func (p *MetaStoreConfig) initMetaStoreType() {
p.MetaStoreType = p.Base.LoadWithDefault("metastore.type", util.MetaStoreTypeEtcd)
}
// /////////////////////////////////////////////////////////////////////////////
// --- meta db ---
type MetaDBConfig struct {
Base *BaseTable
Username string
Password string
Address string
Port int
DBName string
MaxOpenConns int
MaxIdleConns int
}
func (p *MetaDBConfig) Init(base *BaseTable) {
p.Base = base
p.LoadCfgToMemory()
}
func (p *MetaDBConfig) LoadCfgToMemory() {
p.initUsername()
p.initPassword()
p.initAddress()
p.initPort()
p.initDbName()
p.initMaxOpenConns()
p.initMaxIdleConns()
}
func (p *MetaDBConfig) initUsername() {
username, err := p.Base.Load("mysql.username")
if err != nil {
panic(err)
}
p.Username = username
}
func (p *MetaDBConfig) initPassword() {
password, err := p.Base.Load("mysql.password")
if err != nil {
panic(err)
}
p.Password = password
}
func (p *MetaDBConfig) initAddress() {
address, err := p.Base.Load("mysql.address")
if err != nil {
panic(err)
}
p.Address = address
}
func (p *MetaDBConfig) initPort() {
port := p.Base.ParseIntWithDefault("mysql.port", 3306)
p.Port = port
}
func (p *MetaDBConfig) initDbName() {
dbName, err := p.Base.Load("mysql.dbName")
if err != nil {
panic(err)
}
p.DBName = dbName
}
func (p *MetaDBConfig) initMaxOpenConns() {
maxOpenConns := p.Base.ParseIntWithDefault("mysql.maxOpenConns", 20)
p.MaxOpenConns = maxOpenConns
}
func (p *MetaDBConfig) initMaxIdleConns() {
maxIdleConns := p.Base.ParseIntWithDefault("mysql.maxIdleConns", 5)
p.MaxIdleConns = maxIdleConns
}
// /////////////////////////////////////////////////////////////////////////////
// --- pulsar ---
type PulsarConfig struct {
Address ParamItem
Port ParamItem
WebAddress ParamItem
WebPort ParamItem
MaxMessageSize ParamItem
// support auth
AuthPlugin ParamItem
AuthParams ParamItem
// support tenant
Tenant ParamItem
Namespace ParamItem
}
func (p *PulsarConfig) Init(base *BaseTable) {
p.Port = ParamItem{
Key: "pulsar.port",
Version: "2.0.0",
DefaultValue: "6650",
}
p.Port.Init(base.mgr)
// due to implicit rule of MQ prioritythe default address should be empty
p.Address = ParamItem{
Key: "pulsar.address",
Version: "2.0.0",
DefaultValue: "",
Formatter: func(addr string) string {
if addr == "" {
return ""
}
if strings.Contains(addr, ":") {
return addr
}
port, _ := p.Port.get()
return "pulsar://" + addr + ":" + port
},
}
p.Address.Init(base.mgr)
p.WebPort = ParamItem{
Key: "pulsar.webport",
Version: "2.0.0",
DefaultValue: "80",
}
p.WebPort.Init(base.mgr)
p.WebAddress = ParamItem{
Key: "pulsar.webaddress",
Version: "2.0.0",
DefaultValue: "",
Formatter: func(add string) string {
pulsarURL, err := url.ParseRequestURI(p.Address.GetValue())
if err != nil {
log.Info("failed to parse pulsar config, assume pulsar not used", zap.Error(err))
return ""
}
return "http://" + pulsarURL.Hostname() + ":" + p.WebPort.GetValue()
},
}
p.WebAddress.Init(base.mgr)
p.MaxMessageSize = ParamItem{
Key: "pulsar.maxMessageSize",
Version: "2.0.0",
DefaultValue: strconv.Itoa(SuggestPulsarMaxMessageSize),
}
p.MaxMessageSize.Init(base.mgr)
p.Tenant = ParamItem{
Key: "pulsar.tenant",
Version: "2.2.0",
DefaultValue: "public",
}
p.Tenant.Init(base.mgr)
p.Namespace = ParamItem{
Key: "pulsar.namespace",
Version: "2.2.0",
DefaultValue: "default",
}
p.Namespace.Init(base.mgr)
p.AuthPlugin = ParamItem{
Key: "pulsar.authPlugin",
Version: "2.2.0",
}
p.AuthPlugin.Init(base.mgr)
p.AuthParams = ParamItem{
Key: "pulsar.authParams",
Version: "2.2.0",
Formatter: func(authParams string) string {
jsonMap := make(map[string]string)
params := strings.Split(authParams, ",")
for _, param := range params {
kv := strings.Split(param, ":")
if len(kv) == 2 {
jsonMap[kv[0]] = kv[1]
}
}
if len(jsonMap) == 0 {
return ""
}
jsonData, _ := json.Marshal(&jsonMap)
return string(jsonData)
},
}
p.AuthParams.Init(base.mgr)
}
// --- kafka ---
type KafkaConfig struct {
Address ParamItem
SaslUsername ParamItem
SaslPassword ParamItem
SaslMechanisms ParamItem
SecurityProtocol ParamItem
ConsumerExtraConfig ParamGroup
ProducerExtraConfig ParamGroup
}
func (k *KafkaConfig) Init(base *BaseTable) {
// due to implicit rule of MQ prioritythe default address should be empty
k.Address = ParamItem{
Key: "kafka.brokerList",
DefaultValue: "",
Version: "2.1.0",
}
k.Address.Init(base.mgr)
k.SaslUsername = ParamItem{
Key: "kafka.saslUsername",
DefaultValue: "",
Version: "2.1.0",
}
k.SaslUsername.Init(base.mgr)
k.SaslPassword = ParamItem{
Key: "kafka.saslPassword",
DefaultValue: "",
Version: "2.1.0",
}
k.SaslPassword.Init(base.mgr)
k.SaslMechanisms = ParamItem{
Key: "kafka.saslMechanisms",
DefaultValue: "PLAIN",
Version: "2.1.0",
}
k.SaslMechanisms.Init(base.mgr)
k.SecurityProtocol = ParamItem{
Key: "kafka.securityProtocol",
DefaultValue: "SASL_SSL",
Version: "2.1.0",
}
k.SecurityProtocol.Init(base.mgr)
k.ConsumerExtraConfig = ParamGroup{
KeyPrefix: "kafka.consumer.",
Version: "2.2.0",
}
k.ConsumerExtraConfig.Init(base.mgr)
k.ProducerExtraConfig = ParamGroup{
KeyPrefix: "kafka.producer.",
Version: "2.2.0",
}
k.ProducerExtraConfig.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
// --- rocksmq ---
type RocksmqConfig struct {
Path ParamItem
}
func (r *RocksmqConfig) Init(base *BaseTable) {
r.Path = ParamItem{
Key: "rocksmq.path",
DefaultValue: "",
Version: "2.0.0",
}
r.Path.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
// --- minio ---
type MinioConfig struct {
Address ParamItem
Port ParamItem
AccessKeyID ParamItem
SecretAccessKey ParamItem
UseSSL ParamItem
BucketName ParamItem
RootPath ParamItem
UseIAM ParamItem
CloudProvider ParamItem
IAMEndpoint ParamItem
}
func (p *MinioConfig) Init(base *BaseTable) {
p.Port = ParamItem{
Key: "minio.port",
DefaultValue: "9000",
Version: "2.0.0",
}
p.Port.Init(base.mgr)
p.Address = ParamItem{
Key: "minio.address",
DefaultValue: "",
Version: "2.0.0",
Formatter: func(addr string) string {
if addr == "" {
return ""
}
if strings.Contains(addr, ":") {
return addr
}
port, _ := p.Port.get()
return addr + ":" + port
},
}
p.Address.Init(base.mgr)
p.AccessKeyID = ParamItem{
Key: "minio.accessKeyID",
Version: "2.0.0",
PanicIfEmpty: true,
}
p.AccessKeyID.Init(base.mgr)
p.SecretAccessKey = ParamItem{
Key: "minio.secretAccessKey",
Version: "2.0.0",
PanicIfEmpty: true,
}
p.SecretAccessKey.Init(base.mgr)
p.UseSSL = ParamItem{
Key: "minio.useSSL",
Version: "2.0.0",
PanicIfEmpty: true,
}
p.UseSSL.Init(base.mgr)
p.BucketName = ParamItem{
Key: "minio.bucketName",
Version: "2.0.0",
PanicIfEmpty: true,
}
p.BucketName.Init(base.mgr)
p.RootPath = ParamItem{
Key: "minio.rootPath",
Version: "2.0.0",
PanicIfEmpty: true,
}
p.RootPath.Init(base.mgr)
p.UseIAM = ParamItem{
Key: "minio.useIAM",
DefaultValue: DefaultMinioUseIAM,
Version: "2.0.0",
}
p.UseIAM.Init(base.mgr)
p.CloudProvider = ParamItem{
Key: "minio.cloudProvider",
DefaultValue: DefaultMinioCloudProvider,
Version: "2.2.0",
}
p.CloudProvider.Init(base.mgr)
p.IAMEndpoint = ParamItem{
Key: "minio.iamEndpoint",
DefaultValue: DefaultMinioIAMEndpoint,
Version: "2.0.0",
}
p.IAMEndpoint.Init(base.mgr)
}