// Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. package paramtable import ( "math" "os" "runtime" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/shirou/gopsutil/v3/disk" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" ) const ( // DefaultRetentionDuration defines the default duration for retention which is 1 days in seconds. DefaultRetentionDuration = 3600 * 24 // DefaultIndexSliceSize defines the default slice size of index file when serializing. DefaultIndexSliceSize = 16 DefaultGracefulTime = 5000 //ms DefaultThreadCoreCoefficient = 10 DefaultSessionTTL = 60 //s DefaultSessionRetryTimes = 30 DefaultMaxDegree = 56 DefaultSearchListSize = 100 DefaultPQCodeBudgetGBRatio = 0.125 DefaultBuildNumThreadsRatio = 1.0 DefaultSearchCacheBudgetGBRatio = 0.10 DefaultLoadNumThreadRatio = 8.0 DefaultBeamWidthRatio = 4.0 ) // ComponentParam is used to quickly and easily access all components' configurations. type ComponentParam struct { ServiceParam once sync.Once CommonCfg commonConfig QuotaConfig quotaConfig AutoIndexConfig autoIndexConfig RootCoordCfg rootCoordConfig ProxyCfg proxyConfig QueryCoordCfg queryCoordConfig QueryNodeCfg queryNodeConfig DataCoordCfg dataCoordConfig DataNodeCfg dataNodeConfig IndexCoordCfg indexCoordConfig IndexNodeCfg indexNodeConfig HookCfg HookConfig } // InitOnce initialize once func (p *ComponentParam) InitOnce() { p.once.Do(func() { p.Init() }) } // Init initialize the global param table func (p *ComponentParam) Init() { p.ServiceParam.Init() p.CommonCfg.init(&p.BaseTable) p.QuotaConfig.init(&p.BaseTable) p.AutoIndexConfig.init(&p.BaseTable) p.RootCoordCfg.init(&p.BaseTable) p.ProxyCfg.init(&p.BaseTable) p.QueryCoordCfg.init(&p.BaseTable) p.QueryNodeCfg.init(&p.BaseTable) p.DataCoordCfg.init(&p.BaseTable) p.DataNodeCfg.init(&p.BaseTable) p.IndexCoordCfg.init(&p.BaseTable) p.IndexNodeCfg.init(&p.BaseTable) p.HookCfg.init() } func (p *ComponentParam) RocksmqEnable() bool { return p.RocksmqCfg.Path.GetValue() != "" } func (p *ComponentParam) PulsarEnable() bool { return p.PulsarCfg.Address.GetValue() != "" } func (p *ComponentParam) KafkaEnable() bool { return p.KafkaCfg.Address.GetValue() != "" } // ///////////////////////////////////////////////////////////////////////////// // --- common --- type commonConfig struct { Base *BaseTable ClusterPrefix string ProxySubName string RootCoordTimeTick string RootCoordStatistics string RootCoordDml string RootCoordDelta string RootCoordSubName string QueryCoordSearch string QueryCoordSearchResult string QueryCoordTimeTick string QueryNodeSubName string DataCoordStatistic string DataCoordTimeTick string DataCoordSegmentInfo string DataCoordSubName string DataNodeSubName string DefaultPartitionName string DefaultIndexName string RetentionDuration int64 EntityExpirationTTL time.Duration IndexSliceSize int64 ThreadCoreCoefficient int64 MaxDegree int64 SearchListSize int64 PQCodeBudgetGBRatio float64 BuildNumThreadsRatio float64 SearchCacheBudgetGBRatio float64 LoadNumThreadRatio float64 BeamWidthRatio float64 GracefulTime int64 StorageType string SimdType string AuthorizationEnabled bool ClusterName string SessionTTL int64 SessionRetryTimes int64 } func (p *commonConfig) init(base *BaseTable) { p.Base = base // must init cluster prefix first p.initClusterPrefix() p.initProxySubName() p.initRootCoordTimeTick() p.initRootCoordStatistics() p.initRootCoordDml() p.initRootCoordDelta() p.initRootCoordSubName() p.initQueryCoordSearch() p.initQueryCoordSearchResult() p.initQueryCoordTimeTick() p.initQueryNodeSubName() p.initDataCoordStatistic() p.initDataCoordTimeTick() p.initDataCoordSegmentInfo() p.initDataCoordSubName() p.initDataNodeSubName() p.initDefaultPartitionName() p.initDefaultIndexName() p.initRetentionDuration() p.initEntityExpiration() p.initSimdType() p.initIndexSliceSize() p.initMaxDegree() p.initSearchListSize() p.initPQCodeBudgetGBRatio() p.initBuildNumThreadsRatio() p.initSearchCacheBudgetGBRatio() p.initLoadNumThreadRatio() p.initBeamWidthRatio() p.initGracefulTime() p.initStorageType() p.initThreadCoreCoefficient() p.initEnableAuthorization() p.initClusterName() p.initSessionTTL() p.initSessionRetryTimes() } func (p *commonConfig) initClusterPrefix() { keys := []string{ "msgChannel.chanNamePrefix.cluster", "common.chanNamePrefix.cluster", } str, err := p.Base.LoadWithPriority(keys) if err != nil { panic(err) } p.ClusterPrefix = str } func (p *commonConfig) initChanNamePrefix(keys []string) string { value, err := p.Base.LoadWithPriority(keys) if err != nil { panic(err) } s := []string{p.ClusterPrefix, value} return strings.Join(s, "-") } // --- proxy --- func (p *commonConfig) initProxySubName() { keys := []string{ "msgChannel.subNamePrefix.proxySubNamePrefix", "common.subNamePrefix.proxySubNamePrefix", } p.ProxySubName = p.initChanNamePrefix(keys) } // --- rootcoord --- // Deprecate func (p *commonConfig) initRootCoordTimeTick() { keys := []string{ "msgChannel.chanNamePrefix.rootCoordTimeTick", "common.chanNamePrefix.rootCoordTimeTick", } p.RootCoordTimeTick = p.initChanNamePrefix(keys) } func (p *commonConfig) initRootCoordStatistics() { keys := []string{ "msgChannel.chanNamePrefix.rootCoordStatistics", "common.chanNamePrefix.rootCoordStatistics", } p.RootCoordStatistics = p.initChanNamePrefix(keys) } func (p *commonConfig) initRootCoordDml() { keys := []string{ "msgChannel.chanNamePrefix.rootCoordDml", "common.chanNamePrefix.rootCoordDml", } p.RootCoordDml = p.initChanNamePrefix(keys) } func (p *commonConfig) initRootCoordDelta() { keys := []string{ "msgChannel.chanNamePrefix.rootCoordDelta", "common.chanNamePrefix.rootCoordDelta", } p.RootCoordDelta = p.initChanNamePrefix(keys) } func (p *commonConfig) initRootCoordSubName() { keys := []string{ "msgChannel.subNamePrefix.rootCoordSubNamePrefix", "common.subNamePrefix.rootCoordSubNamePrefix", } p.RootCoordSubName = p.initChanNamePrefix(keys) } // --- querycoord --- func (p *commonConfig) initQueryCoordSearch() { keys := []string{ "msgChannel.chanNamePrefix.search", "common.chanNamePrefix.search", } p.QueryCoordSearch = p.initChanNamePrefix(keys) } // Deprecated, search result use grpc instead of a result channel func (p *commonConfig) initQueryCoordSearchResult() { keys := []string{ "msgChannel.chanNamePrefix.searchResult", "common.chanNamePrefix.searchResult", } p.QueryCoordSearchResult = p.initChanNamePrefix(keys) } // Deprecate func (p *commonConfig) initQueryCoordTimeTick() { keys := []string{ "msgChannel.chanNamePrefix.queryTimeTick", "common.chanNamePrefix.queryTimeTick", } p.QueryCoordTimeTick = p.initChanNamePrefix(keys) } // --- querynode --- func (p *commonConfig) initQueryNodeSubName() { keys := []string{ "msgChannel.subNamePrefix.queryNodeSubNamePrefix", "common.subNamePrefix.queryNodeSubNamePrefix", } p.QueryNodeSubName = p.initChanNamePrefix(keys) } // --- datacoord --- func (p *commonConfig) initDataCoordStatistic() { keys := []string{ "msgChannel.chanNamePrefix.dataCoordStatistic", "common.chanNamePrefix.dataCoordStatistic", } p.DataCoordStatistic = p.initChanNamePrefix(keys) } // Deprecate func (p *commonConfig) initDataCoordTimeTick() { keys := []string{ "msgChannel.chanNamePrefix.dataCoordTimeTick", "common.chanNamePrefix.dataCoordTimeTick", } p.DataCoordTimeTick = p.initChanNamePrefix(keys) } func (p *commonConfig) initDataCoordSegmentInfo() { keys := []string{ "msgChannel.chanNamePrefix.dataCoordSegmentInfo", "common.chanNamePrefix.dataCoordSegmentInfo", } p.DataCoordSegmentInfo = p.initChanNamePrefix(keys) } func (p *commonConfig) initDataCoordSubName() { keys := []string{ "msgChannel.subNamePrefix.dataCoordSubNamePrefix", "common.subNamePrefix.dataCoordSubNamePrefix", } p.DataCoordSubName = p.initChanNamePrefix(keys) } func (p *commonConfig) initDataNodeSubName() { keys := []string{ "msgChannel.subNamePrefix.dataNodeSubNamePrefix", "common.subNamePrefix.dataNodeSubNamePrefix", } p.DataNodeSubName = p.initChanNamePrefix(keys) } func (p *commonConfig) initDefaultPartitionName() { p.DefaultPartitionName = p.Base.LoadWithDefault("common.defaultPartitionName", "_default") } func (p *commonConfig) initDefaultIndexName() { p.DefaultIndexName = p.Base.LoadWithDefault("common.defaultIndexName", "_default_idx") } func (p *commonConfig) initRetentionDuration() { p.RetentionDuration = p.Base.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration) } func (p *commonConfig) initEntityExpiration() { ttl := p.Base.ParseInt64WithDefault("common.entityExpiration", -1) if ttl < 0 { p.EntityExpirationTTL = -1 return } // make sure ttl is larger than retention duration to ensure time travel works if ttl > p.RetentionDuration { p.EntityExpirationTTL = time.Duration(ttl) * time.Second } else { p.EntityExpirationTTL = time.Duration(p.RetentionDuration) * time.Second } } func (p *commonConfig) initSimdType() { keys := []string{ "common.simdType", "knowhere.simdType", } p.SimdType = p.Base.LoadWithDefault2(keys, "auto") } func (p *commonConfig) initIndexSliceSize() { p.IndexSliceSize = p.Base.ParseInt64WithDefault("common.indexSliceSize", DefaultIndexSliceSize) } func (p *commonConfig) initThreadCoreCoefficient() { p.ThreadCoreCoefficient = p.Base.ParseInt64WithDefault("common.threadCoreCoefficient", DefaultThreadCoreCoefficient) } func (p *commonConfig) initPQCodeBudgetGBRatio() { p.PQCodeBudgetGBRatio = p.Base.ParseFloatWithDefault("common.DiskIndex.PQCodeBudgetGBRatio", DefaultPQCodeBudgetGBRatio) } func (p *commonConfig) initBuildNumThreadsRatio() { p.BuildNumThreadsRatio = p.Base.ParseFloatWithDefault("common.DiskIndex.BuildNumThreadsRatio", DefaultBuildNumThreadsRatio) } func (p *commonConfig) initSearchCacheBudgetGBRatio() { p.SearchCacheBudgetGBRatio = p.Base.ParseFloatWithDefault("common.DiskIndex.SearchCacheBudgetGBRatio", DefaultSearchCacheBudgetGBRatio) } func (p *commonConfig) initLoadNumThreadRatio() { p.LoadNumThreadRatio = p.Base.ParseFloatWithDefault("common.DiskIndex.LoadNumThreadRatio", DefaultLoadNumThreadRatio) } func (p *commonConfig) initBeamWidthRatio() { p.BeamWidthRatio = p.Base.ParseFloatWithDefault("common.DiskIndex.BeamWidthRatio", DefaultBeamWidthRatio) } func (p *commonConfig) initMaxDegree() { p.MaxDegree = p.Base.ParseInt64WithDefault("common.DiskIndex.MaxDegree", DefaultMaxDegree) } func (p *commonConfig) initSearchListSize() { p.SearchListSize = p.Base.ParseInt64WithDefault("common.DiskIndex.SearchListSize", DefaultSearchListSize) } func (p *commonConfig) initGracefulTime() { p.GracefulTime = p.Base.ParseInt64WithDefault("common.gracefulTime", DefaultGracefulTime) } func (p *commonConfig) initStorageType() { p.StorageType = p.Base.LoadWithDefault("common.storageType", "minio") } func (p *commonConfig) initEnableAuthorization() { p.AuthorizationEnabled = p.Base.ParseBool("common.security.authorizationEnabled", false) } func (p *commonConfig) initClusterName() { p.ClusterName = p.Base.LoadWithDefault("common.cluster.name", "") } func (p *commonConfig) initSessionTTL() { p.SessionTTL = p.Base.ParseInt64WithDefault("common.session.ttl", 60) } func (p *commonConfig) initSessionRetryTimes() { p.SessionRetryTimes = p.Base.ParseInt64WithDefault("common.session.retryTimes", 30) } // ///////////////////////////////////////////////////////////////////////////// // --- rootcoord --- type rootCoordConfig struct { Base *BaseTable DmlChannelNum int64 MaxPartitionNum int64 MinSegmentSizeToEnableIndex int64 ImportTaskExpiration float64 ImportTaskRetention float64 // --- ETCD Path --- ImportTaskSubPath string CreatedTime time.Time UpdatedTime time.Time EnableActiveStandby bool } func (p *rootCoordConfig) init(base *BaseTable) { p.Base = base p.DmlChannelNum = p.Base.ParseInt64WithDefault("rootCoord.dmlChannelNum", 256) p.MaxPartitionNum = p.Base.ParseInt64WithDefault("rootCoord.maxPartitionNum", 4096) p.MinSegmentSizeToEnableIndex = p.Base.ParseInt64WithDefault("rootCoord.minSegmentSizeToEnableIndex", 1024) p.ImportTaskExpiration = p.Base.ParseFloatWithDefault("rootCoord.importTaskExpiration", 15*60) p.ImportTaskRetention = p.Base.ParseFloatWithDefault("rootCoord.importTaskRetention", 24*60*60) p.ImportTaskSubPath = "importtask" p.EnableActiveStandby = p.Base.ParseBool("rootCoord.enableActiveStandby", false) } // ///////////////////////////////////////////////////////////////////////////// // --- proxy --- type AccessLogConfig struct { // if use access log Enable bool // if upload sealed access log file to minio MinioEnable bool // Log path LocalPath string // Log filename, leave empty to disable file log. Filename string // Max size for a single file, in MB. MaxSize int // Max time for single access log file in seconds RotatedTime int64 // Maximum number of old log files to retain. MaxBackups int //File path in minIO RemotePath string } type proxyConfig struct { Base *BaseTable Alias string SoPath string TimeTickInterval time.Duration MsgStreamTimeTickBufSize int64 MaxNameLength int64 MaxUsernameLength int64 MinPasswordLength int64 MaxPasswordLength int64 MaxFieldNum int64 MaxShardNum int32 MaxDimension int64 GinLogging bool MaxUserNum int MaxRoleNum int AccessLog AccessLogConfig // required from QueryCoord SearchResultChannelNames []string RetrieveResultChannelNames []string MaxTaskNum int64 CreatedTime time.Time UpdatedTime time.Time } func (p *proxyConfig) init(base *BaseTable) { p.Base = base p.initTimeTickInterval() p.initMsgStreamTimeTickBufSize() p.initMaxNameLength() p.initMinPasswordLength() p.initMaxUsernameLength() p.initMaxPasswordLength() p.initMaxFieldNum() p.initMaxShardNum() p.initMaxDimension() p.initMaxTaskNum() p.initGinLogging() p.initMaxUserNum() p.initMaxRoleNum() p.initSoPath() p.initAccessLogConfig() } // InitAlias initialize Alias member. func (p *proxyConfig) InitAlias(alias string) { p.Alias = alias } func (p *proxyConfig) initSoPath() { p.SoPath = p.Base.LoadWithDefault("proxy.soPath", "") } func (p *proxyConfig) initTimeTickInterval() { interval := p.Base.ParseIntWithDefault("proxy.timeTickInterval", 200) p.TimeTickInterval = time.Duration(interval) * time.Millisecond } func (p *proxyConfig) initMsgStreamTimeTickBufSize() { p.MsgStreamTimeTickBufSize = p.Base.ParseInt64WithDefault("proxy.msgStream.timeTick.bufSize", 512) } func (p *proxyConfig) initMaxNameLength() { str := p.Base.LoadWithDefault("proxy.maxNameLength", "255") maxNameLength, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) } p.MaxNameLength = maxNameLength } func (p *proxyConfig) initMaxUsernameLength() { str := p.Base.LoadWithDefault("proxy.maxUsernameLength", "32") maxUsernameLength, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) } p.MaxUsernameLength = maxUsernameLength } func (p *proxyConfig) initMinPasswordLength() { str := p.Base.LoadWithDefault("proxy.minPasswordLength", "6") minPasswordLength, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) } p.MinPasswordLength = minPasswordLength } func (p *proxyConfig) initMaxPasswordLength() { str := p.Base.LoadWithDefault("proxy.maxPasswordLength", "256") maxPasswordLength, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) } p.MaxPasswordLength = maxPasswordLength } func (p *proxyConfig) initMaxShardNum() { str := p.Base.LoadWithDefault("proxy.maxShardNum", "256") maxShardNum, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) } p.MaxShardNum = int32(maxShardNum) } func (p *proxyConfig) initMaxFieldNum() { str := p.Base.LoadWithDefault("proxy.maxFieldNum", "64") maxFieldNum, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) } p.MaxFieldNum = maxFieldNum } func (p *proxyConfig) initMaxDimension() { str := p.Base.LoadWithDefault("proxy.maxDimension", "32768") maxDimension, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) } p.MaxDimension = maxDimension } func (p *proxyConfig) initMaxTaskNum() { p.MaxTaskNum = p.Base.ParseInt64WithDefault("proxy.maxTaskNum", 1024) } func (p *proxyConfig) initGinLogging() { // Gin logging is on by default. p.GinLogging = p.Base.ParseBool("proxy.ginLogging", true) } func (p *proxyConfig) initMaxUserNum() { str := p.Base.LoadWithDefault("proxy.maxUserNum", "100") maxUserNum, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) } p.MaxUserNum = int(maxUserNum) } func (p *proxyConfig) initMaxRoleNum() { str := p.Base.LoadWithDefault("proxy.maxRoleNum", "10") maxRoleNum, err := strconv.ParseInt(str, 10, 64) if err != nil { panic(err) } p.MaxRoleNum = int(maxRoleNum) } func (p *proxyConfig) initAccessLogConfig() { enable := p.Base.ParseBool("proxy.accessLog.enable", true) minioEnable := p.Base.ParseBool("proxy.accessLog.minioEnable", false) p.AccessLog = AccessLogConfig{ Enable: enable, MinioEnable: minioEnable, } if enable { p.initAccessLogFileConfig() } if minioEnable { p.initAccessLogMinioConfig() } } func (p *proxyConfig) initAccessLogFileConfig() { //use os.TempDir() if localPath was empty p.AccessLog.LocalPath = p.Base.LoadWithDefault("proxy.accessLog.localPath", "") p.AccessLog.Filename = p.Base.LoadWithDefault("proxy.accessLog.filename", "milvus_access_log.log") p.AccessLog.MaxSize = p.Base.ParseIntWithDefault("proxy.accessLog.maxSize", 64) p.AccessLog.MaxBackups = p.Base.ParseIntWithDefault("proxy.accessLog.maxBackups", 8) p.AccessLog.RotatedTime = p.Base.ParseInt64WithDefault("proxy.accessLog.rotatedTime", 3600) } func (p *proxyConfig) initAccessLogMinioConfig() { p.AccessLog.RemotePath = p.Base.LoadWithDefault("proxy.accessLog.remotePath", "access_log/") } // ///////////////////////////////////////////////////////////////////////////// // --- querycoord --- type queryCoordConfig struct { Base *BaseTable CreatedTime time.Time UpdatedTime time.Time //---- Task --- RetryNum int32 RetryInterval int64 TaskMergeCap int32 TaskExecutionCap int32 //---- Handoff --- AutoHandoff bool //---- Balance --- AutoBalance bool OverloadedMemoryThresholdPercentage float64 BalanceIntervalSeconds int64 MemoryUsageMaxDifferencePercentage float64 CheckInterval time.Duration ChannelTaskTimeout time.Duration SegmentTaskTimeout time.Duration DistPullInterval time.Duration HeartbeatAvailableInterval time.Duration LoadTimeoutSeconds time.Duration CheckHandoffInterval time.Duration EnableActiveStandby bool NextTargetSurviveTime time.Duration UpdateNextTargetInterval time.Duration } func (p *queryCoordConfig) init(base *BaseTable) { p.Base = base //---- Task --- p.initTaskRetryNum() p.initTaskRetryInterval() p.initTaskMergeCap() p.initTaskExecutionCap() //---- Handoff --- p.initAutoHandoff() //---- Balance --- p.initAutoBalance() p.initOverloadedMemoryThresholdPercentage() p.initBalanceIntervalSeconds() p.initMemoryUsageMaxDifferencePercentage() p.initCheckInterval() p.initChannelTaskTimeout() p.initSegmentTaskTimeout() p.initDistPullInterval() p.initHeartbeatAvailableInterval() p.initLoadTimeoutSeconds() p.initEnableActiveStandby() p.initNextTargetSurviveTime() p.initUpdateNextTargetInterval() } func (p *queryCoordConfig) initTaskRetryNum() { p.RetryNum = p.Base.ParseInt32WithDefault("queryCoord.task.retrynum", 5) } func (p *queryCoordConfig) initTaskRetryInterval() { p.RetryInterval = p.Base.ParseInt64WithDefault("queryCoord.task.retryinterval", int64(10*time.Second)) } func (p *queryCoordConfig) initTaskMergeCap() { p.TaskMergeCap = p.Base.ParseInt32WithDefault("queryCoord.taskMergeCap", 16) } func (p *queryCoordConfig) initTaskExecutionCap() { p.TaskExecutionCap = p.Base.ParseInt32WithDefault("queryCoord.taskExecutionCap", 256) } func (p *queryCoordConfig) initAutoHandoff() { handoff, err := p.Base.Load("queryCoord.autoHandoff") if err != nil { panic(err) } p.AutoHandoff, err = strconv.ParseBool(handoff) if err != nil { panic(err) } } func (p *queryCoordConfig) initAutoBalance() { balanceStr := p.Base.LoadWithDefault("queryCoord.autoBalance", "false") autoBalance, err := strconv.ParseBool(balanceStr) if err != nil { panic(err) } p.AutoBalance = autoBalance } func (p *queryCoordConfig) initOverloadedMemoryThresholdPercentage() { overloadedMemoryThresholdPercentage := p.Base.LoadWithDefault("queryCoord.overloadedMemoryThresholdPercentage", "90") thresholdPercentage, err := strconv.ParseInt(overloadedMemoryThresholdPercentage, 10, 64) if err != nil { panic(err) } p.OverloadedMemoryThresholdPercentage = float64(thresholdPercentage) / 100 } func (p *queryCoordConfig) initBalanceIntervalSeconds() { balanceInterval := p.Base.LoadWithDefault("queryCoord.balanceIntervalSeconds", "60") interval, err := strconv.ParseInt(balanceInterval, 10, 64) if err != nil { panic(err) } p.BalanceIntervalSeconds = interval } func (p *queryCoordConfig) initMemoryUsageMaxDifferencePercentage() { maxDiff := p.Base.LoadWithDefault("queryCoord.memoryUsageMaxDifferencePercentage", "30") diffPercentage, err := strconv.ParseInt(maxDiff, 10, 64) if err != nil { panic(err) } p.MemoryUsageMaxDifferencePercentage = float64(diffPercentage) / 100 } func (p *queryCoordConfig) initEnableActiveStandby() { p.EnableActiveStandby = p.Base.ParseBool("queryCoord.enableActiveStandby", false) } func (p *queryCoordConfig) initCheckInterval() { interval := p.Base.LoadWithDefault("queryCoord.checkInterval", "1000") checkInterval, err := strconv.ParseInt(interval, 10, 64) if err != nil { panic(err) } p.CheckInterval = time.Duration(checkInterval) * time.Millisecond } func (p *queryCoordConfig) initChannelTaskTimeout() { timeout := p.Base.LoadWithDefault("queryCoord.channelTaskTimeout", "60000") taskTimeout, err := strconv.ParseInt(timeout, 10, 64) if err != nil { panic(err) } p.ChannelTaskTimeout = time.Duration(taskTimeout) * time.Millisecond } func (p *queryCoordConfig) initSegmentTaskTimeout() { timeout := p.Base.LoadWithDefault("queryCoord.segmentTaskTimeout", "120000") taskTimeout, err := strconv.ParseInt(timeout, 10, 64) if err != nil { panic(err) } p.SegmentTaskTimeout = time.Duration(taskTimeout) * time.Millisecond } func (p *queryCoordConfig) initDistPullInterval() { interval := p.Base.LoadWithDefault("queryCoord.distPullInterval", "500") pullInterval, err := strconv.ParseInt(interval, 10, 64) if err != nil { panic(err) } p.DistPullInterval = time.Duration(pullInterval) * time.Millisecond } func (p *queryCoordConfig) initHeartbeatAvailableInterval() { interval := p.Base.ParseInt32WithDefault("queryCoord.heartbeatAvailableInterval", 2500) p.HeartbeatAvailableInterval = time.Duration(interval) * time.Millisecond } func (p *queryCoordConfig) initLoadTimeoutSeconds() { timeout := p.Base.LoadWithDefault("queryCoord.loadTimeoutSeconds", "600") loadTimeout, err := strconv.ParseInt(timeout, 10, 64) if err != nil { panic(err) } p.LoadTimeoutSeconds = time.Duration(loadTimeout) * time.Second } func (p *queryCoordConfig) initNextTargetSurviveTime() { interval := p.Base.LoadWithDefault("queryCoord.NextTargetSurviveTime", "300") nextTargetSurviveTime, err := strconv.ParseInt(interval, 10, 64) if err != nil { panic(err) } p.NextTargetSurviveTime = time.Duration(nextTargetSurviveTime) * time.Second } func (p *queryCoordConfig) initUpdateNextTargetInterval() { interval := p.Base.LoadWithDefault("queryCoord.UpdateNextTargetInterval", "10") updateNextTargetInterval, err := strconv.ParseInt(interval, 10, 64) if err != nil { panic(err) } p.UpdateNextTargetInterval = time.Duration(updateNextTargetInterval) * time.Second } // ///////////////////////////////////////////////////////////////////////////// // --- querynode --- type queryNodeConfig struct { Base *BaseTable Alias string FlowGraphMaxQueueLength int32 FlowGraphMaxParallelism int32 // stats StatsPublishInterval int SliceIndex int // segcore ChunkRows int64 SmallIndexNlist int64 SmallIndexNProbe int64 CreatedTime time.Time UpdatedTime time.Time // memory limit LoadMemoryUsageFactor float64 OverloadedMemoryThresholdPercentage float64 // enable disk EnableDisk bool DiskCapacityLimit int64 MaxDiskUsagePercentage float64 // cache limit CacheEnabled bool CacheMemoryLimit int64 GroupEnabled bool MaxReceiveChanSize int32 MaxUnsolvedQueueSize int32 MaxReadConcurrency int32 MaxGroupNQ int64 TopKMergeRatio float64 CPURatio float64 GCHelperEnabled bool MinimumGOGCConfig int MaximumGOGCConfig int } func (p *queryNodeConfig) init(base *BaseTable) { p.Base = base p.initFlowGraphMaxQueueLength() p.initFlowGraphMaxParallelism() p.initStatsPublishInterval() p.initSmallIndexParams() p.initLoadMemoryUsageFactor() p.initOverloadedMemoryThresholdPercentage() p.initCacheMemoryLimit() p.initCacheEnabled() p.initGroupEnabled() p.initMaxReceiveChanSize() p.initMaxReadConcurrency() p.initMaxUnsolvedQueueSize() p.initMaxGroupNQ() p.initTopKMergeRatio() p.initCPURatio() p.initEnableDisk() p.initDiskCapacity() p.initMaxDiskUsagePercentage() p.initGCTunerEnbaled() p.initMaximumGOGC() p.initMinimumGOGC() } // InitAlias initializes an alias for the QueryNode role. func (p *queryNodeConfig) InitAlias(alias string) { p.Alias = alias } // advanced params // stats func (p *queryNodeConfig) initStatsPublishInterval() { p.StatsPublishInterval = p.Base.ParseIntWithDefault("queryNode.stats.publishInterval", 1000) } // dataSync: func (p *queryNodeConfig) initFlowGraphMaxQueueLength() { p.FlowGraphMaxQueueLength = p.Base.ParseInt32WithDefault("queryNode.dataSync.flowGraph.maxQueueLength", 1024) } func (p *queryNodeConfig) initFlowGraphMaxParallelism() { p.FlowGraphMaxParallelism = p.Base.ParseInt32WithDefault("queryNode.dataSync.flowGraph.maxParallelism", 1024) } func (p *queryNodeConfig) initSmallIndexParams() { p.ChunkRows = p.Base.ParseInt64WithDefault("queryNode.segcore.chunkRows", 1024) if p.ChunkRows < 1024 { log.Warn("chunk rows can not be less than 1024, force set to 1024", zap.Any("current", p.ChunkRows)) p.ChunkRows = 1024 } // default NList is the first nlist var defaultNList int64 for i := int64(0); i < p.ChunkRows; i++ { if math.Pow(2.0, float64(i)) > math.Sqrt(float64(p.ChunkRows)) { defaultNList = int64(math.Pow(2, float64(i))) break } } p.SmallIndexNlist = p.Base.ParseInt64WithDefault("queryNode.segcore.smallIndex.nlist", defaultNList) if p.SmallIndexNlist > p.ChunkRows/8 { log.Warn("small index nlist must smaller than chunkRows/8, force set to", zap.Any("nliit", p.ChunkRows/8)) p.SmallIndexNlist = p.ChunkRows / 8 } defaultNprobe := p.SmallIndexNlist / 16 p.SmallIndexNProbe = p.Base.ParseInt64WithDefault("queryNode.segcore.smallIndex.nprobe", defaultNprobe) if p.SmallIndexNProbe > p.SmallIndexNlist { log.Warn("small index nprobe must smaller than nlist, force set to", zap.Any("nprobe", p.SmallIndexNlist)) p.SmallIndexNProbe = p.SmallIndexNlist } } func (p *queryNodeConfig) initLoadMemoryUsageFactor() { loadMemoryUsageFactor := p.Base.LoadWithDefault("queryNode.loadMemoryUsageFactor", "3") factor, err := strconv.ParseFloat(loadMemoryUsageFactor, 64) if err != nil { panic(err) } p.LoadMemoryUsageFactor = factor } func (p *queryNodeConfig) initOverloadedMemoryThresholdPercentage() { overloadedMemoryThresholdPercentage := p.Base.LoadWithDefault("queryCoord.overloadedMemoryThresholdPercentage", "90") thresholdPercentage, err := strconv.ParseInt(overloadedMemoryThresholdPercentage, 10, 64) if err != nil { panic(err) } p.OverloadedMemoryThresholdPercentage = float64(thresholdPercentage) / 100 } func (p *queryNodeConfig) initCacheMemoryLimit() { overloadedMemoryThresholdPercentage := p.Base.LoadWithDefault("queryNode.cache.memoryLimit", "2147483648") cacheMemoryLimit, err := strconv.ParseInt(overloadedMemoryThresholdPercentage, 10, 64) if err != nil { panic(err) } p.CacheMemoryLimit = cacheMemoryLimit } func (p *queryNodeConfig) initCacheEnabled() { var err error cacheEnabled := p.Base.LoadWithDefault("queryNode.cache.enabled", "true") p.CacheEnabled, err = strconv.ParseBool(cacheEnabled) if err != nil { panic(err) } } func (p *queryNodeConfig) initGroupEnabled() { p.GroupEnabled = p.Base.ParseBool("queryNode.grouping.enabled", true) } func (p *queryNodeConfig) initMaxReceiveChanSize() { p.MaxReceiveChanSize = p.Base.ParseInt32WithDefault("queryNode.scheduler.receiveChanSize", 10240) } func (p *queryNodeConfig) initMaxUnsolvedQueueSize() { p.MaxUnsolvedQueueSize = p.Base.ParseInt32WithDefault("queryNode.scheduler.unsolvedQueueSize", 10240) } func (p *queryNodeConfig) initCPURatio() { p.CPURatio = p.Base.ParseFloatWithDefault("queryNode.scheduler.cpuRatio", 10.0) } func (p *queryNodeConfig) initMaxReadConcurrency() { readConcurrencyRatio := p.Base.ParseFloatWithDefault("queryNode.scheduler.maxReadConcurrentRatio", 2.0) cpuNum := int32(runtime.GOMAXPROCS(0)) p.MaxReadConcurrency = int32(float64(cpuNum) * readConcurrencyRatio) if p.MaxReadConcurrency < 1 { p.MaxReadConcurrency = 1 // MaxReadConcurrency must >= 1 } else if p.MaxReadConcurrency > cpuNum*100 { p.MaxReadConcurrency = cpuNum * 100 // MaxReadConcurrency must <= 100*cpuNum } } func (p *queryNodeConfig) initMaxGroupNQ() { p.MaxGroupNQ = p.Base.ParseInt64WithDefault("queryNode.grouping.maxNQ", 1000) } func (p *queryNodeConfig) initTopKMergeRatio() { p.TopKMergeRatio = p.Base.ParseFloatWithDefault("queryNode.grouping.topKMergeRatio", 10.0) } func (p *queryNodeConfig) initEnableDisk() { var err error enableDisk := p.Base.LoadWithDefault("queryNode.enableDisk", "false") p.EnableDisk, err = strconv.ParseBool(enableDisk) if err != nil { panic(err) } } func (p *queryNodeConfig) initMaxDiskUsagePercentage() { maxDiskUsagePercentageStr := p.Base.LoadWithDefault("queryNode.maxDiskUsagePercentage", "95") maxDiskUsagePercentage, err := strconv.ParseInt(maxDiskUsagePercentageStr, 10, 64) if err != nil { panic(err) } p.MaxDiskUsagePercentage = float64(maxDiskUsagePercentage) / 100 } func (p *queryNodeConfig) initDiskCapacity() { diskSizeStr := os.Getenv("LOCAL_STORAGE_SIZE") if len(diskSizeStr) == 0 { diskUsage, err := disk.Usage("/") if err != nil { panic(err) } p.DiskCapacityLimit = int64(diskUsage.Total) return } diskSize, err := strconv.ParseInt(diskSizeStr, 10, 64) if err != nil { panic(err) } p.DiskCapacityLimit = diskSize * 1024 * 1024 * 1024 } func (p *queryNodeConfig) initGCTunerEnbaled() { p.GCHelperEnabled = p.Base.ParseBool("queryNode.gchelper.enabled", true) } func (p *queryNodeConfig) initMinimumGOGC() { p.MinimumGOGCConfig = p.Base.ParseIntWithDefault("queryNode.gchelper.minimumGoGC", 30) } func (p *queryNodeConfig) initMaximumGOGC() { p.MaximumGOGCConfig = p.Base.ParseIntWithDefault("queryNode.gchelper.maximumGoGC", 200) } // ///////////////////////////////////////////////////////////////////////////// // --- datacoord --- type dataCoordConfig struct { Base *BaseTable // --- ETCD --- ChannelWatchSubPath string // --- CHANNEL --- MaxWatchDuration time.Duration // --- SEGMENTS --- SegmentMaxSize float64 DiskSegmentMaxSize float64 SegmentSealProportion float64 SegAssignmentExpiration int64 SegmentMaxLifetime time.Duration SegmentMaxIdleTime time.Duration SegmentMinSizeFromIdleToSealed float64 CreatedTime time.Time UpdatedTime time.Time // compaction EnableCompaction bool EnableAutoCompaction atomic.Value MinSegmentToMerge int MaxSegmentToMerge int SegmentSmallProportion float64 SegmentCompactableProportion float64 CompactionTimeoutInSeconds int32 CompactionCheckIntervalInSeconds int64 SingleCompactionRatioThreshold float32 SingleCompactionDeltaLogMaxSize int64 SingleCompactionExpiredLogMaxSize int64 SingleCompactionBinlogMaxNum int64 GlobalCompactionInterval time.Duration // Garbage Collection EnableGarbageCollection bool GCInterval time.Duration GCMissingTolerance time.Duration GCDropTolerance time.Duration EnableActiveStandby bool } func (p *dataCoordConfig) init(base *BaseTable) { p.Base = base p.initChannelWatchPrefix() p.initMaxWatchDuration() p.initSegmentMaxSize() p.initDiskSegmentMaxSize() p.initSegmentSealProportion() p.initSegAssignmentExpiration() p.initSegmentMaxLifetime() p.initSegmentMaxIdleTime() p.initSegmentMinSizeFromIdleToSealed() p.initEnableCompaction() p.initEnableAutoCompaction() p.initCompactionMinSegment() p.initCompactionMaxSegment() p.initSegmentSmallProportion() p.initSegmentCompactableProportion() p.initCompactionTimeoutInSeconds() p.initCompactionCheckIntervalInSeconds() p.initSingleCompactionRatioThreshold() p.initSingleCompactionDeltaLogMaxSize() p.initSingleCompactionExpiredLogMaxSize() p.initSingleCompactionBinlogMaxNum() p.initGlobalCompactionInterval() p.initEnableGarbageCollection() p.initGCInterval() p.initGCMissingTolerance() p.initGCDropTolerance() p.initEnableActiveStandby() } func (p *dataCoordConfig) initMaxWatchDuration() { p.MaxWatchDuration = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.channel.maxWatchDuration", 60)) * time.Second } func (p *dataCoordConfig) initSegmentMaxSize() { p.SegmentMaxSize = p.Base.ParseFloatWithDefault("dataCoord.segment.maxSize", 512.0) } func (p *dataCoordConfig) initDiskSegmentMaxSize() { p.DiskSegmentMaxSize = p.Base.ParseFloatWithDefault("dataCoord.segment.diskSegmentMaxSize", 512.0*4) } func (p *dataCoordConfig) initSegmentSealProportion() { p.SegmentSealProportion = p.Base.ParseFloatWithDefault("dataCoord.segment.sealProportion", 0.25) } func (p *dataCoordConfig) initSegAssignmentExpiration() { p.SegAssignmentExpiration = p.Base.ParseInt64WithDefault("dataCoord.segment.assignmentExpiration", 2000) } func (p *dataCoordConfig) initSegmentMaxLifetime() { p.SegmentMaxLifetime = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.segment.maxLife", 24*60*60)) * time.Second } func (p *dataCoordConfig) initSegmentMaxIdleTime() { p.SegmentMaxIdleTime = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.segment.maxIdleTime", 60*60)) * time.Second log.Info("init segment max idle time", zap.String("value", p.SegmentMaxIdleTime.String())) } func (p *dataCoordConfig) initSegmentMinSizeFromIdleToSealed() { p.SegmentMinSizeFromIdleToSealed = p.Base.ParseFloatWithDefault("dataCoord.segment.minSizeFromIdleToSealed", 16.0) log.Info("init segment min size from idle to sealed", zap.Float64("value", p.SegmentMinSizeFromIdleToSealed)) } func (p *dataCoordConfig) initChannelWatchPrefix() { // WARN: this value should not be put to milvus.yaml. It's a default value for channel watch path. // This will be removed after we reconstruct our config module. p.ChannelWatchSubPath = "channelwatch" } func (p *dataCoordConfig) initEnableCompaction() { p.EnableCompaction = p.Base.ParseBool("dataCoord.enableCompaction", false) } func (p *dataCoordConfig) initEnableAutoCompaction() { p.EnableAutoCompaction.Store(p.Base.ParseBool("dataCoord.compaction.enableAutoCompaction", false)) } func (p *dataCoordConfig) initCompactionMinSegment() { p.MinSegmentToMerge = p.Base.ParseIntWithDefault("dataCoord.compaction.min.segment", 4) } func (p *dataCoordConfig) initCompactionMaxSegment() { p.MaxSegmentToMerge = p.Base.ParseIntWithDefault("dataCoord.compaction.max.segment", 30) } func (p *dataCoordConfig) initSegmentSmallProportion() { p.SegmentSmallProportion = p.Base.ParseFloatWithDefault("dataCoord.segment.smallProportion", 0.5) } func (p *dataCoordConfig) initSegmentCompactableProportion() { p.SegmentCompactableProportion = p.Base.ParseFloatWithDefault("dataCoord.segment.compactableProportion", 0.5) } // compaction execution timeout func (p *dataCoordConfig) initCompactionTimeoutInSeconds() { p.CompactionTimeoutInSeconds = p.Base.ParseInt32WithDefault("dataCoord.compaction.timeout", 60*3) } func (p *dataCoordConfig) initCompactionCheckIntervalInSeconds() { p.CompactionCheckIntervalInSeconds = p.Base.ParseInt64WithDefault("dataCoord.compaction.check.interval", 10) } // if total delete entities is large than a ratio of total entities, trigger single compaction. func (p *dataCoordConfig) initSingleCompactionRatioThreshold() { p.SingleCompactionRatioThreshold = float32(p.Base.ParseFloatWithDefault("dataCoord.compaction.single.ratio.threshold", 0.2)) } // if total delta file size > SingleCompactionDeltaLogMaxSize, trigger single compaction func (p *dataCoordConfig) initSingleCompactionDeltaLogMaxSize() { p.SingleCompactionDeltaLogMaxSize = p.Base.ParseInt64WithDefault("dataCoord.compaction.single.deltalog.maxsize", 2*1024*1024) } // if total expired file size > SingleCompactionExpiredLogMaxSize, trigger single compaction func (p *dataCoordConfig) initSingleCompactionExpiredLogMaxSize() { p.SingleCompactionExpiredLogMaxSize = p.Base.ParseInt64WithDefault("dataCoord.compaction.single.expiredlog.maxsize", 10*1024*1024) } // if total binlog number > SingleCompactionBinlogMaxNum, trigger single compaction to ensure binlog number per segment is limited func (p *dataCoordConfig) initSingleCompactionBinlogMaxNum() { p.SingleCompactionBinlogMaxNum = p.Base.ParseInt64WithDefault("dataCoord.compaction.single.binlog.maxnum", 1000) } // interval we check and trigger global compaction func (p *dataCoordConfig) initGlobalCompactionInterval() { p.GlobalCompactionInterval = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.compaction.global.interval", int64(60*time.Second))) } // -- GC -- func (p *dataCoordConfig) initEnableGarbageCollection() { p.EnableGarbageCollection = p.Base.ParseBool("dataCoord.enableGarbageCollection", true) } func (p *dataCoordConfig) initGCInterval() { p.GCInterval = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.gc.interval", 60*60)) * time.Second } func (p *dataCoordConfig) initGCMissingTolerance() { p.GCMissingTolerance = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.gc.missingTolerance", 24*60*60)) * time.Second } func (p *dataCoordConfig) initGCDropTolerance() { p.GCDropTolerance = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.gc.dropTolerance", 24*60*60)) * time.Second } func (p *dataCoordConfig) SetEnableAutoCompaction(enable bool) { p.EnableAutoCompaction.Store(enable) } func (p *dataCoordConfig) GetEnableAutoCompaction() bool { enable := p.EnableAutoCompaction.Load() if enable != nil { return enable.(bool) } return false } func (p *dataCoordConfig) initEnableActiveStandby() { p.EnableActiveStandby = p.Base.ParseBool("dataCoord.enableActiveStandby", false) } // ///////////////////////////////////////////////////////////////////////////// // --- datanode --- type dataNodeConfig struct { Base *BaseTable FlowGraphMaxQueueLength int32 FlowGraphMaxParallelism int32 // segment FlushInsertBufferSize int64 FlushDeleteBufferBytes int64 SyncPeriod time.Duration Alias string // Different datanode in one machine // etcd ChannelWatchSubPath string // io concurrency to fetch stats logs IOConcurrency int CreatedTime time.Time UpdatedTime time.Time } func (p *dataNodeConfig) init(base *BaseTable) { p.Base = base p.initFlowGraphMaxQueueLength() p.initFlowGraphMaxParallelism() p.initFlushInsertBufferSize() p.initFlushDeleteBufferSize() p.initSyncPeriod() p.initIOConcurrency() p.initChannelWatchPath() } // InitAlias init this DataNode alias func (p *dataNodeConfig) InitAlias(alias string) { p.Alias = alias } func (p *dataNodeConfig) initFlowGraphMaxQueueLength() { p.FlowGraphMaxQueueLength = p.Base.ParseInt32WithDefault("dataNode.dataSync.flowGraph.maxQueueLength", 1024) } func (p *dataNodeConfig) initFlowGraphMaxParallelism() { p.FlowGraphMaxParallelism = p.Base.ParseInt32WithDefault("dataNode.dataSync.flowGraph.maxParallelism", 1024) } func (p *dataNodeConfig) initFlushInsertBufferSize() { bufferSize := p.Base.LoadWithDefault2([]string{"DATA_NODE_IBUFSIZE", "datanode.segment.insertBufSize"}, "16777216") bs, err := strconv.ParseInt(bufferSize, 10, 64) if err != nil { panic(err) } p.FlushInsertBufferSize = bs } func (p *dataNodeConfig) initFlushDeleteBufferSize() { deleteBufBytes := p.Base.ParseInt64WithDefault("datanode.segment.deleteBufBytes", 64*1024*1024) p.FlushDeleteBufferBytes = deleteBufBytes } func (p *dataNodeConfig) initSyncPeriod() { syncPeriodInSeconds := p.Base.ParseInt64WithDefault("datanode.segment.syncPeriod", 600) p.SyncPeriod = time.Duration(syncPeriodInSeconds) * time.Second } func (p *dataNodeConfig) initChannelWatchPath() { p.ChannelWatchSubPath = "channelwatch" } func (p *dataNodeConfig) initIOConcurrency() { p.IOConcurrency = p.Base.ParseIntWithDefault("dataNode.dataSync.ioConcurrency", 10) } // ///////////////////////////////////////////////////////////////////////////// // --- indexcoord --- type indexCoordConfig struct { Base *BaseTable BindIndexNodeMode bool IndexNodeAddress string WithCredential bool IndexNodeID int64 MinSegmentNumRowsToEnableIndex int64 GCInterval time.Duration CreatedTime time.Time UpdatedTime time.Time EnableActiveStandby bool } func (p *indexCoordConfig) init(base *BaseTable) { p.Base = base p.initGCInterval() p.initMinSegmentNumRowsToEnableIndex() p.initBindIndexNodeMode() p.initIndexNodeAddress() p.initWithCredential() p.initIndexNodeID() p.initEnableActiveStandby() } func (p *indexCoordConfig) initMinSegmentNumRowsToEnableIndex() { p.MinSegmentNumRowsToEnableIndex = p.Base.ParseInt64WithDefault("indexCoord.minSegmentNumRowsToEnableIndex", 1024) } func (p *indexCoordConfig) initGCInterval() { p.GCInterval = time.Duration(p.Base.ParseInt64WithDefault("indexCoord.gc.interval", 60*10)) * time.Second } func (p *indexCoordConfig) initBindIndexNodeMode() { p.BindIndexNodeMode = p.Base.ParseBool("indexCoord.bindIndexNodeMode.enable", false) } func (p *indexCoordConfig) initIndexNodeAddress() { p.IndexNodeAddress = p.Base.LoadWithDefault("indexCoord.bindIndexNodeMode.address", "localhost:22930") } func (p *indexCoordConfig) initWithCredential() { p.WithCredential = p.Base.ParseBool("indexCoord.bindIndexNodeMode.withCred", false) } func (p *indexCoordConfig) initIndexNodeID() { p.IndexNodeID = p.Base.ParseInt64WithDefault("indexCoord.bindIndexNodeMode.nodeID", 0) } func (p *indexCoordConfig) initEnableActiveStandby() { p.EnableActiveStandby = p.Base.ParseBool("indexCoord.enableActiveStandby", false) } // ///////////////////////////////////////////////////////////////////////////// // --- indexnode --- type indexNodeConfig struct { Base *BaseTable Alias string BuildParallel int CreatedTime time.Time UpdatedTime time.Time // enable disk EnableDisk bool DiskCapacityLimit int64 MaxDiskUsagePercentage float64 } func (p *indexNodeConfig) init(base *BaseTable) { p.Base = base p.initBuildParallel() p.initEnableDisk() p.initDiskCapacity() p.initMaxDiskUsagePercentage() } // InitAlias initializes an alias for the IndexNode role. func (p *indexNodeConfig) InitAlias(alias string) { p.Alias = alias } func (p *indexNodeConfig) initBuildParallel() { p.BuildParallel = p.Base.ParseIntWithDefault("indexNode.scheduler.buildParallel", 1) } func (p *indexNodeConfig) initEnableDisk() { var err error enableDisk := p.Base.LoadWithDefault("indexNode.enableDisk", "false") p.EnableDisk, err = strconv.ParseBool(enableDisk) if err != nil { panic(err) } } func (p *indexNodeConfig) initDiskCapacity() { diskSizeStr := os.Getenv("LOCAL_STORAGE_SIZE") if len(diskSizeStr) == 0 { diskUsage, err := disk.Usage("/") if err != nil { panic(err) } p.DiskCapacityLimit = int64(diskUsage.Total) return } diskSize, err := strconv.ParseInt(diskSizeStr, 10, 64) if err != nil { panic(err) } p.DiskCapacityLimit = diskSize * 1024 * 1024 * 1024 } func (p *indexNodeConfig) initMaxDiskUsagePercentage() { maxDiskUsagePercentageStr := p.Base.LoadWithDefault("indexNode.maxDiskUsagePercentage", "95") maxDiskUsagePercentage, err := strconv.ParseInt(maxDiskUsagePercentageStr, 10, 64) if err != nil { panic(err) } p.MaxDiskUsagePercentage = float64(maxDiskUsagePercentage) / 100 }