Move node.yaml to component.yaml (#11363)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
Xiaofan 2021-11-10 14:27:37 +08:00 committed by GitHub
parent 124bd6c20a
commit 5f08b67156
18 changed files with 154 additions and 327 deletions

View File

@@ -0,0 +1,74 @@
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dataCoord:
  segment:
    maxSize: 512 # Maximum size of a segment, in MB
    sealProportion: 0.75 # Minimum proportion of maxSize at which a segment can be sealed
    assignmentExpiration: 2000 # ms
  enableCompaction: false

dataNode:
  dataSync:
    flowGraph:
      maxQueueLength: 1024 # Maximum length of the task queue in the flowgraph
      maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
  flush:
    # Max buffer size to flush for a single segment.
    insertBufSize: 16777216 # Bytes, 16 MB

proxy:
  timeTickInterval: 200 # ms, the interval at which the proxy synchronizes the time tick
  msgStream:
    insert:
      bufSize: 1024 # msgPack channel buffer size
    search:
      bufSize: 512
    searchResult:
      recvBufSize: 1024 # msgPack channel buffer size
      pulsarBufSize: 1024 # pulsar channel buffer size
    timeTick:
      bufSize: 512
  maxNameLength: 255 # Maximum length of a collection or alias name
  maxFieldNum: 64 # Maximum number of fields in a collection
  maxDimension: 32768 # Maximum dimension of a vector
  maxShardNum: 256 # Maximum number of shards in a collection
  maxTaskNum: 1024 # Maximum number of tasks in the proxy task queue

queryNode:
  stats:
    publishInterval: 1000 # Interval (ms) at which the querynode reports node information
  dataSync:
    flowGraph:
      maxQueueLength: 1024 # Maximum length of the task queue in the flowgraph
      maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
  msgStream:
    search:
      recvBufSize: 512 # msgPack channel buffer size
      pulsarBufSize: 512 # pulsar channel buffer size
    searchResult:
      recvBufSize: 64 # msgPack channel buffer size
  # Segcore divides a segment into multiple chunks.
  segcore:
    chunkRows: 32768 # The number of vectors in a chunk

rootcoord:
  dmlChannelNum: 256 # Number of DML channels created at system startup
  maxPartitionNum: 4096 # Maximum number of partitions in a collection
  minSegmentSizeToEnableIndex: 1024 # Threshold: segments smaller than this value are not indexed
  timeout: 3600 # timeout, in seconds
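
Each nested key above reaches the Go code below as a dotted path such as `queryNode.segcore.chunkRows`. The following standalone sketch illustrates that flattening; it is illustrative only (the real loader goes through BaseTable's in-memory kv store, and `flatten` is a hypothetical helper):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// flatten turns nested YAML maps into the dotted keys used by the
// ParamTable lookups, e.g. "queryNode.segcore.chunkRows".
func flatten(prefix string, node interface{}, out map[string]string) {
	m, ok := node.(map[interface{}]interface{})
	if !ok {
		out[prefix] = fmt.Sprintf("%v", node) // leaf value
		return
	}
	for k, v := range m {
		key := fmt.Sprintf("%v", k)
		if prefix != "" {
			key = prefix + "." + key
		}
		flatten(key, v, out)
	}
}

func main() {
	doc := []byte("queryNode:\n  segcore:\n    chunkRows: 32768\n")
	var root map[interface{}]interface{}
	if err := yaml.Unmarshal(doc, &root); err != nil {
		panic(err)
	}
	kv := map[string]string{}
	flatten("", root, kv)
	fmt.Println(kv["queryNode.segcore.chunkRows"]) // 32768
}
```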

View File

@@ -1,23 +0,0 @@
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dataCoord:
  segment:
    maxSize: 512 # Maximum size of a segment, in MB
    sealProportion: 0.75 # Minimum proportion of maxSize at which a segment can be sealed
    assignmentExpiration: 2000 # ms
  enableCompaction: false

View File

@@ -1,26 +0,0 @@
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dataNode:
  dataSync:
    flowGraph:
      maxQueueLength: 1024 # Maximum length of the task queue in the flowgraph
      maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
  flush:
    # Max buffer size to flush for a single segment.
    insertBufSize: 16777216 # Bytes, 16 MB

View File

@@ -1,39 +0,0 @@
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
proxy:
  timeTickInterval: 200 # ms, the interval at which the proxy synchronizes the time tick
  msgStream:
    insert:
      bufSize: 1024 # msgPack channel buffer size
    search:
      bufSize: 512
    searchResult:
      recvBufSize: 1024 # msgPack channel buffer size
      pulsarBufSize: 1024 # pulsar channel buffer size
    timeTick:
      bufSize: 512
  maxNameLength: 255 # Maximum length of a collection or alias name
  maxFieldNum: 64 # Maximum number of fields in a collection
  maxDimension: 32768 # Maximum dimension of a vector
  maxShardNum: 256 # Maximum number of shards in a collection
  maxTaskNum: 1024 # Maximum number of tasks in the proxy task queue

View File

@@ -1,36 +0,0 @@
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
queryNode:
  stats:
    publishInterval: 1000 # Interval (ms) at which the querynode reports node information
  dataSync:
    flowGraph:
      maxQueueLength: 1024 # Maximum length of the task queue in the flowgraph
      maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
  msgStream:
    search:
      recvBufSize: 512 # msgPack channel buffer size
      pulsarBufSize: 512 # pulsar channel buffer size
    searchResult:
      recvBufSize: 64 # msgPack channel buffer size
  # Segcore divides a segment into multiple chunks.
  segcore:
    chunkRows: 32768 # The number of vectors in a chunk

View File

@@ -1,23 +0,0 @@
# Licensed to the LF AI & Data foundation under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
rootcoord:
  dmlChannelNum: 256 # Number of DML channels created at system startup
  maxPartitionNum: 4096 # Maximum number of partitions in a collection
  minSegmentSizeToEnableIndex: 1024 # Threshold: segments smaller than this value are not indexed
  timeout: 3600 # timeout, in seconds

View File

@@ -80,10 +80,6 @@ func (p *ParamTable) Init() {
 	// load yaml
 	p.BaseTable.Init()
-	if err := p.LoadYaml("advanced/data_coord.yaml"); err != nil {
-		panic(err)
-	}
 
 	// set members
 	p.initEtcdEndpoints()
 	p.initMetaRootPath()

View File

@@ -104,10 +104,6 @@ func (p *ParamTable) InitOnce() {
 // Init initializes DataNode configs
 func (p *ParamTable) Init() {
 	p.BaseTable.Init()
-	err := p.LoadYaml("advanced/data_node.yaml")
-	if err != nil {
-		panic(err)
-	}
 
 	p.initFlowGraphMaxQueueLength()
 	p.initFlowGraphMaxParallelism()
@@ -141,11 +137,11 @@ func (p *ParamTable) Init() {
 }
 
 func (p *ParamTable) initFlowGraphMaxQueueLength() {
-	p.FlowGraphMaxQueueLength = p.ParseInt32("dataNode.dataSync.flowGraph.maxQueueLength")
+	p.FlowGraphMaxQueueLength = p.ParseInt32WithDefault("dataNode.dataSync.flowGraph.maxQueueLength", 1024)
 }
 
 func (p *ParamTable) initFlowGraphMaxParallelism() {
-	p.FlowGraphMaxParallelism = p.ParseInt32("dataNode.dataSync.flowGraph.maxParallelism")
+	p.FlowGraphMaxParallelism = p.ParseInt32WithDefault("dataNode.dataSync.flowGraph.maxParallelism", 1024)
 }
 
 func (p *ParamTable) initFlushInsertBufferSize() {
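
`ParseInt32WithDefault` itself is not part of this diff. A plausible shape, consistent with the call sites above and with the updated tests further down (a missing key falls back to the default, while a present but unparsable value still panics), might look like the sketch below; the stub `BaseTable` is a stand-in for the real one in `internal/util/paramtable`:

```go
package paramtable

import (
	"errors"
	"strconv"
)

// Minimal stand-in for the real BaseTable; just enough state to show
// the fallback semantics.
type BaseTable struct{ params map[string]string }

func (gp *BaseTable) Load(key string) (string, error) {
	if v, ok := gp.params[key]; ok {
		return v, nil
	}
	return "", errors.New("key not found: " + key)
}

// LoadWithDefault returns the configured value, or defaultValue when the
// key is absent from every source (yaml files, env, saved overrides).
func (gp *BaseTable) LoadWithDefault(key, defaultValue string) string {
	value, err := gp.Load(key)
	if err != nil {
		return defaultValue
	}
	return value
}

// ParseInt32WithDefault builds on LoadWithDefault. A value that is present
// but unparsable (e.g. "" or "abc") still panics, which is what the updated
// ParamTable tests below continue to assert.
func (gp *BaseTable) ParseInt32WithDefault(key string, defaultValue int32) int32 {
	str := gp.LoadWithDefault(key, strconv.FormatInt(int64(defaultValue), 10))
	value, err := strconv.ParseInt(str, 10, 32)
	if err != nil {
		panic(err)
	}
	return int32(value)
}
```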

View File

@@ -52,10 +52,6 @@ type ParamTable struct {
 func (p *ParamTable) Init() {
 	once.Do(func() {
 		p.BaseTable.Init()
-		err := p.LoadYaml("advanced/root_coord.yaml")
-		if err != nil {
-			panic(err)
-		}
 		p.initAddress()
 		p.initPort()
 		p.initIndexCoordAddress()

View File

@@ -55,11 +55,6 @@ var once sync.Once
 // Init is used to initialize configuration items.
 func (pt *ParamTable) Init() {
 	pt.BaseTable.Init()
-	// TODO, load index_node.yaml
-	/*err := pt.LoadYaml("advanced/index_coord.yaml")
-	if err != nil {
-		panic(err)
-	}*/
 	pt.initEtcdEndpoints()
 	pt.initMetaRootPath()

View File

@@ -68,11 +68,6 @@ func (pt *ParamTable) InitAlias(alias string) {
 // Init is used to initialize configuration items.
 func (pt *ParamTable) Init() {
 	pt.BaseTable.Init()
-	// TODO, load index_node.yaml
-	/*err := pt.LoadYaml("advanced/index_node.yaml")
-	if err != nil {
-		panic(err)
-	}*/
 	pt.initParams()
 }

View File

@@ -88,11 +88,6 @@ func (pt *ParamTable) InitOnce() {
 // Init of BaseTable and do some other initialization.
 func (pt *ParamTable) Init() {
 	pt.BaseTable.Init()
-	err := pt.LoadYaml("advanced/proxy.yaml")
-	if err != nil {
-		panic(err)
-	}
 	pt.initEtcdEndpoints()
 	pt.initMetaRootPath()
 	pt.initPulsarAddress()
@@ -139,14 +134,7 @@ func (pt *ParamTable) initRocksmqPath() {
 }
 
 func (pt *ParamTable) initTimeTickInterval() {
-	intervalStr, err := pt.Load("proxy.timeTickInterval")
-	if err != nil {
-		panic(err)
-	}
-	interval, err := strconv.Atoi(intervalStr)
-	if err != nil {
-		panic(err)
-	}
+	interval := pt.ParseIntWithDefault("proxy.timeTickInterval", 200)
 	pt.TimeTickInterval = time.Duration(interval) * time.Millisecond
 }
 
@@ -178,14 +166,11 @@ func (pt *ParamTable) initProxyTimeTickChannelNames() {
 }
 
 func (pt *ParamTable) initMsgStreamTimeTickBufSize() {
-	pt.MsgStreamTimeTickBufSize = pt.ParseInt64("proxy.msgStream.timeTick.bufSize")
+	pt.MsgStreamTimeTickBufSize = pt.ParseInt64WithDefault("proxy.msgStream.timeTick.bufSize", 512)
 }
 
 func (pt *ParamTable) initMaxNameLength() {
-	str, err := pt.Load("proxy.maxNameLength")
-	if err != nil {
-		panic(err)
-	}
+	str := pt.LoadWithDefault("proxy.maxNameLength", "255")
 	maxNameLength, err := strconv.ParseInt(str, 10, 64)
 	if err != nil {
 		panic(err)
@@ -194,10 +179,7 @@ func (pt *ParamTable) initMaxNameLength() {
 }
 
 func (pt *ParamTable) initMaxShardNum() {
-	str, err := pt.Load("proxy.maxShardNum")
-	if err != nil {
-		panic(err)
-	}
+	str := pt.LoadWithDefault("proxy.maxShardNum", "256")
 	maxShardNum, err := strconv.ParseInt(str, 10, 64)
 	if err != nil {
 		panic(err)
@@ -206,10 +188,7 @@ func (pt *ParamTable) initMaxShardNum() {
 }
 
 func (pt *ParamTable) initMaxFieldNum() {
-	str, err := pt.Load("proxy.maxFieldNum")
-	if err != nil {
-		panic(err)
-	}
+	str := pt.LoadWithDefault("proxy.maxFieldNum", "64")
 	maxFieldNum, err := strconv.ParseInt(str, 10, 64)
 	if err != nil {
 		panic(err)
@@ -218,10 +197,7 @@ func (pt *ParamTable) initMaxFieldNum() {
 }
 
 func (pt *ParamTable) initMaxDimension() {
-	str, err := pt.Load("proxy.maxDimension")
-	if err != nil {
-		panic(err)
-	}
+	str := pt.LoadWithDefault("proxy.maxDimension", "32768")
 	maxDimension, err := strconv.ParseInt(str, 10, 64)
 	if err != nil {
 		panic(err)
@@ -306,13 +282,5 @@ func (pt *ParamTable) initMetaRootPath() {
 }
 
 func (pt *ParamTable) initMaxTaskNum() {
-	str, err := pt.Load("proxy.maxTaskNum")
-	if err != nil {
-		panic(err)
-	}
-	maxTaskNum, err := strconv.ParseInt(str, 10, 64)
-	if err != nil {
-		panic(err)
-	}
-	pt.MaxTaskNum = maxTaskNum
+	pt.MaxTaskNum = pt.ParseInt64WithDefault("proxy.maxTaskNum", 1024)
 }

View File

@@ -98,71 +98,36 @@ func shouldPanic(t *testing.T, name string, f func()) {
 }
 
 func TestParamTable_Panics(t *testing.T) {
-	shouldPanic(t, "proxy.timeTickInterval", func() {
-		Params.Remove("proxy.timeTickInterval")
-		Params.initTimeTickInterval()
-	})
-
 	shouldPanic(t, "proxy.timeTickInterval", func() {
 		Params.Save("proxy.timeTickInterval", "")
 		Params.initTimeTickInterval()
 	})
 
-	shouldPanic(t, "proxy.msgStream.timeTick.bufSize", func() {
-		Params.Remove("proxy.msgStream.timeTick.bufSize")
-		Params.initMsgStreamTimeTickBufSize()
-	})
-
 	shouldPanic(t, "proxy.msgStream.timeTick.bufSize", func() {
 		Params.Save("proxy.msgStream.timeTick.bufSize", "abc")
 		Params.initMsgStreamTimeTickBufSize()
 	})
 
-	shouldPanic(t, "proxy.maxNameLength", func() {
-		Params.Remove("proxy.maxNameLength")
-		Params.initMaxNameLength()
-	})
-
 	shouldPanic(t, "proxy.maxNameLength", func() {
 		Params.Save("proxy.maxNameLength", "abc")
 		Params.initMaxNameLength()
 	})
 
-	shouldPanic(t, "proxy.maxFieldNum", func() {
-		Params.Remove("proxy.maxFieldNum")
-		Params.initMaxFieldNum()
-	})
-
 	shouldPanic(t, "proxy.maxFieldNum", func() {
 		Params.Save("proxy.maxFieldNum", "abc")
 		Params.initMaxFieldNum()
 	})
 
-	shouldPanic(t, "proxy.maxShardNum", func() {
-		Params.Remove("proxy.maxShardNum")
-		Params.initMaxShardNum()
-	})
-
 	shouldPanic(t, "proxy.maxShardNum", func() {
 		Params.Save("proxy.maxShardNum", "abc")
 		Params.initMaxShardNum()
 	})
 
-	shouldPanic(t, "proxy.maxDimension", func() {
-		Params.Remove("proxy.maxDimension")
-		Params.initMaxDimension()
-	})
-
 	shouldPanic(t, "proxy.maxDimension", func() {
 		Params.Save("proxy.maxDimension", "-asdf")
 		Params.initMaxDimension()
 	})
 
-	shouldPanic(t, "proxy.maxTaskNum", func() {
-		Params.Remove("proxy.maxTaskNum")
-		Params.initMaxTaskNum()
-	})
-
 	shouldPanic(t, "proxy.maxTaskNum", func() {
 		Params.Save("proxy.maxTaskNum", "-asdf")
 		Params.initMaxTaskNum()
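
Every deleted case above asserted that removing a key panics; with compiled-in defaults that expectation inverts. A hypothetical companion test, not part of this commit, could assert the new behavior (written for the same file, reusing its imports):

```go
// Hypothetical: a missing key now falls back to its compiled-in default
// instead of panicking.
func TestParamTable_DefaultOnMissingKey(t *testing.T) {
	Params.Remove("proxy.maxTaskNum")
	assert.NotPanics(t, func() { Params.initMaxTaskNum() })
	assert.Equal(t, int64(1024), Params.MaxTaskNum)
}
```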

View File

@@ -86,15 +86,6 @@ func (p *ParamTable) InitOnce() {
 //Init is used to initialize params
 func (p *ParamTable) Init() {
 	p.BaseTable.Init()
-	err := p.LoadYaml("advanced/query_node.yaml")
-	if err != nil {
-		panic(err)
-	}
-
-	err = p.LoadYaml("milvus.yaml")
-	if err != nil {
-		panic(err)
-	}
 
 	p.initQueryCoordAddress()
 	p.initRoleName()

View File

@@ -108,9 +108,6 @@ func (p *ParamTable) InitOnce() {
 // Init is used to initialize configuration items.
 func (p *ParamTable) Init() {
 	p.BaseTable.Init()
-	if err := p.LoadYaml("advanced/query_node.yaml"); err != nil {
-		panic(err)
-	}
 
 	p.initCacheSize()
 	p.initInContainer()
@@ -245,29 +242,29 @@ func (p *ParamTable) initRocksmqPath() {
 // advanced params
 // stats
 func (p *ParamTable) initStatsPublishInterval() {
-	p.StatsPublishInterval = p.ParseInt("queryNode.stats.publishInterval")
+	p.StatsPublishInterval = p.ParseIntWithDefault("queryNode.stats.publishInterval", 1000)
 }
 
 // dataSync:
 func (p *ParamTable) initFlowGraphMaxQueueLength() {
-	p.FlowGraphMaxQueueLength = p.ParseInt32("queryNode.dataSync.flowGraph.maxQueueLength")
+	p.FlowGraphMaxQueueLength = p.ParseInt32WithDefault("queryNode.dataSync.flowGraph.maxQueueLength", 1024)
 }
 
 func (p *ParamTable) initFlowGraphMaxParallelism() {
-	p.FlowGraphMaxParallelism = p.ParseInt32("queryNode.dataSync.flowGraph.maxParallelism")
+	p.FlowGraphMaxParallelism = p.ParseInt32WithDefault("queryNode.dataSync.flowGraph.maxParallelism", 1024)
 }
 
 // msgStream
 func (p *ParamTable) initSearchReceiveBufSize() {
-	p.SearchReceiveBufSize = p.ParseInt64("queryNode.msgStream.search.recvBufSize")
+	p.SearchReceiveBufSize = p.ParseInt64WithDefault("queryNode.msgStream.search.recvBufSize", 512)
 }
 
 func (p *ParamTable) initSearchPulsarBufSize() {
-	p.SearchPulsarBufSize = p.ParseInt64("queryNode.msgStream.search.pulsarBufSize")
+	p.SearchPulsarBufSize = p.ParseInt64WithDefault("queryNode.msgStream.search.pulsarBufSize", 512)
 }
 
 func (p *ParamTable) initSearchResultReceiveBufSize() {
-	p.SearchResultReceiveBufSize = p.ParseInt64("queryNode.msgStream.searchResult.recvBufSize")
+	p.SearchResultReceiveBufSize = p.ParseInt64WithDefault("queryNode.msgStream.searchResult.recvBufSize", 64)
 }
 
 // ------------------------ channel names
@@ -333,7 +330,7 @@ func (p *ParamTable) initGracefulTime() {
 }
 
 func (p *ParamTable) initSegcoreChunkRows() {
-	p.ChunkRows = p.ParseInt64("queryNode.segcore.chunkRows")
+	p.ChunkRows = p.ParseInt64WithDefault("queryNode.segcore.chunkRows", 32768)
 }
 
 func (p *ParamTable) initKnowhereSimdType() {

View File

@@ -66,10 +66,6 @@ func (p *ParamTable) InitOnce() {
 func (p *ParamTable) Init() {
 	// load yaml
 	p.BaseTable.Init()
-	err := p.LoadYaml("advanced/root_coord.yaml")
-	if err != nil {
-		panic(err)
-	}
 	p.initPulsarAddress()
 	p.initEtcdEndpoints()

View File

@@ -33,6 +33,8 @@ import (
 // UniqueID is type alias of typeutil.UniqueID
 type UniqueID = typeutil.UniqueID
 
+const envPrefix string = "milvus"
+
 type Base interface {
 	Load(key string) (string, error)
 	LoadRange(key, endKey string, limit int) ([]string, []string, error)
@@ -61,6 +63,8 @@ func (gp *BaseTable) Init() {
 	gp.loadFromMilvusYaml()
 
+	gp.loadFromComponentYaml()
+
 	gp.tryloadFromEnv()
 
 	gp.InitLogCfg()
@@ -104,6 +108,18 @@ func (gp *BaseTable) loadFromMilvusYaml() {
 	}
 }
 
+func (gp *BaseTable) loadFromComponentYaml() bool {
+	configFile := gp.configDir + "advanced/component.yaml"
+	if _, err := os.Stat(configFile); err == nil {
+		if err := gp.LoadYaml("advanced/component.yaml"); err != nil {
+			panic(err)
+		}
+		return true
+	}
+	log.Debug("failed to find component.yaml in config, skip..")
+	return false
+}
+
 func (gp *BaseTable) loadFromCommonYaml() bool {
 	configFile := gp.configDir + "advanced/common.yaml"
 	if _, err := os.Stat(configFile); err == nil {
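
After this change `BaseTable.Init` pulls configuration from three sources in sequence, recapped below with comments. That later sources override earlier ones is an assumption about the kv store's `Save` semantics, not something this diff shows:

```go
func (gp *BaseTable) Init() {
	gp.loadFromMilvusYaml()    // 1. required base config: milvus.yaml
	gp.loadFromComponentYaml() // 2. optional tuning: advanced/component.yaml (debug log and skip if absent)
	gp.tryloadFromEnv()        // 3. environment variables, including "milvus."-prefixed keys
	gp.InitLogCfg()
}
```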
@@ -117,6 +133,7 @@ func (gp *BaseTable) loadFromCommonYaml() bool {
 }
 
 func (gp *BaseTable) tryloadFromEnv() {
+	var err error
 	minioAddress := os.Getenv("MINIO_ADDRESS")
 	if minioAddress == "" {
 		minioHost, err := gp.Load("minio.address")
@@ -129,10 +146,7 @@ func (gp *BaseTable) tryloadFromEnv() {
 		}
 		minioAddress = minioHost + ":" + port
 	}
-	err := gp.Save("_MinioAddress", minioAddress)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_MinioAddress", minioAddress)
 
 	etcdEndpoints := os.Getenv("ETCD_ENDPOINTS")
 	if etcdEndpoints == "" {
@@ -141,10 +155,7 @@ func (gp *BaseTable) tryloadFromEnv() {
 			panic(err)
 		}
 	}
-	err = gp.Save("_EtcdEndpoints", etcdEndpoints)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_EtcdEndpoints", etcdEndpoints)
 
 	pulsarAddress := os.Getenv("PULSAR_ADDRESS")
 	if pulsarAddress == "" {
@@ -158,10 +169,7 @@ func (gp *BaseTable) tryloadFromEnv() {
 		}
 		pulsarAddress = "pulsar://" + pulsarHost + ":" + port
 	}
-	err = gp.Save("_PulsarAddress", pulsarAddress)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_PulsarAddress", pulsarAddress)
 
 	rocksmqPath := os.Getenv("ROCKSMQ_PATH")
 	if rocksmqPath == "" {
@@ -171,10 +179,7 @@ func (gp *BaseTable) tryloadFromEnv() {
 		}
 		rocksmqPath = path
 	}
-	err = gp.Save("_RocksmqPath", rocksmqPath)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_RocksmqPath", rocksmqPath)
 
 	rootCoordAddress := os.Getenv("ROOT_COORD_ADDRESS")
 	if rootCoordAddress == "" {
@@ -188,10 +193,7 @@ func (gp *BaseTable) tryloadFromEnv() {
 		}
 		rootCoordAddress = rootCoordHost + ":" + port
 	}
-	err = gp.Save("_RootCoordAddress", rootCoordAddress)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_RootCoordAddress", rootCoordAddress)
 
 	indexCoordAddress := os.Getenv("INDEX_COORD_ADDRESS")
 	if indexCoordAddress == "" {
@@ -205,10 +207,7 @@ func (gp *BaseTable) tryloadFromEnv() {
 		}
 		indexCoordAddress = indexCoordHost + ":" + port
 	}
-	err = gp.Save("_IndexCoordAddress", indexCoordAddress)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_IndexCoordAddress", indexCoordAddress)
 
 	queryCoordAddress := os.Getenv("QUERY_COORD_ADDRESS")
 	if queryCoordAddress == "" {
@@ -222,10 +221,7 @@ func (gp *BaseTable) tryloadFromEnv() {
 		}
 		queryCoordAddress = serviceHost + ":" + port
 	}
-	err = gp.Save("_QueryCoordAddress", queryCoordAddress)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_QueryCoordAddress", queryCoordAddress)
 
 	dataCoordAddress := os.Getenv("DATA_COORD_ADDRESS")
 	if dataCoordAddress == "" {
@@ -239,26 +235,13 @@ func (gp *BaseTable) tryloadFromEnv() {
 		}
 		dataCoordAddress = serviceHost + ":" + port
 	}
-	err = gp.Save("_DataCoordAddress", dataCoordAddress)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_DataCoordAddress", dataCoordAddress)
 
 	insertBufferFlushSize := os.Getenv("DATA_NODE_IBUFSIZE")
 	if insertBufferFlushSize == "" {
-		//var err error
-		insertBufferFlushSize, err = gp.Load("datanode.flush.insertBufSize")
-		if err != nil {
-			panic(err)
-		}
-	}
-	if insertBufferFlushSize == "" {
-		insertBufferFlushSize = "16777216" //use default
-	}
-	err = gp.Save("_DATANODE_INSERTBUFSIZE", insertBufferFlushSize)
-	if err != nil {
-		panic(err)
+		insertBufferFlushSize = gp.LoadWithDefault("datanode.flush.insertBufSize", "16777216")
 	}
+	gp.Save("_DATANODE_INSERTBUFSIZE", insertBufferFlushSize)
 
 	minioAccessKey := os.Getenv("MINIO_ACCESS_KEY")
 	if minioAccessKey == "" {
@@ -267,10 +250,7 @@ func (gp *BaseTable) tryloadFromEnv() {
 			panic(err)
 		}
 	}
-	err = gp.Save("_MinioAccessKeyID", minioAccessKey)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_MinioAccessKeyID", minioAccessKey)
 
 	minioSecretKey := os.Getenv("MINIO_SECRET_KEY")
 	if minioSecretKey == "" {
@@ -279,10 +259,7 @@ func (gp *BaseTable) tryloadFromEnv() {
 			panic(err)
 		}
 	}
-	err = gp.Save("_MinioSecretAccessKey", minioSecretKey)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_MinioSecretAccessKey", minioSecretKey)
 
 	minioUseSSL := os.Getenv("MINIO_USE_SSL")
 	if minioUseSSL == "" {
@@ -291,10 +268,7 @@ func (gp *BaseTable) tryloadFromEnv() {
 			panic(err)
 		}
 	}
-	err = gp.Save("_MinioUseSSL", minioUseSSL)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_MinioUseSSL", minioUseSSL)
 
 	minioBucketName := os.Getenv("MINIO_BUCKET_NAME")
 	if minioBucketName == "" {
@@ -303,9 +277,18 @@ func (gp *BaseTable) tryloadFromEnv() {
 			panic(err)
 		}
 	}
-	err = gp.Save("_MinioBucketName", minioBucketName)
-	if err != nil {
-		panic(err)
-	}
+	gp.Save("_MinioBucketName", minioBucketName)
+
+	// try to load environment start with ENV_PREFIX
+	for _, e := range os.Environ() {
+		parts := strings.SplitN(e, "=", 2)
+		if strings.Contains(parts[0], envPrefix) {
+			parts := strings.SplitN(e, "=", 2)
+			// remove the ENV PREFIX and use the rest as key
+			keyParts := strings.SplitAfterN(parts[0], ".", 2)
+			// mem kv throw no errors
+			gp.Save(keyParts[1], parts[1])
+		}
+	}
 }
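
The new loop turns any environment variable whose name contains the `milvus` prefix into a config key by stripping everything up to and including the first dot. Here is a runnable sketch of that mapping; `milvus.proxy.maxTaskNum` is a hypothetical override, and note that the original matches with `strings.Contains` rather than `strings.HasPrefix`, so the prefix may appear anywhere in the name:

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

func main() {
	os.Setenv("milvus.proxy.maxTaskNum", "2048") // hypothetical override

	for _, e := range os.Environ() {
		parts := strings.SplitN(e, "=", 2)
		if strings.Contains(parts[0], "milvus") {
			// Drop everything up to and including the first "."
			keyParts := strings.SplitAfterN(parts[0], ".", 2)
			if len(keyParts) == 2 { // guard added here; the original indexes keyParts[1] directly
				fmt.Printf("%s -> %s\n", keyParts[1], parts[1]) // proxy.maxTaskNum -> 2048
			}
		}
	}
}
```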

View File

@@ -156,6 +156,28 @@ func TestBateTable_ConfPath(t *testing.T) {
 	assert.Equal(t, filepath.Clean(config), filepath.Clean(dir+"/../../../configs/"))
 }
 
+func TestBaseTable_Env(t *testing.T) {
+	os.Setenv("milvus.test", "test")
+	os.Setenv("milvus.test.test2", "test2")
+	baseParams.Init()
+
+	result, _ := baseParams.Load("test")
+	assert.Equal(t, result, "test")
+
+	result, _ = baseParams.Load("test.test2")
+	assert.Equal(t, result, "test2")
+
+	err := os.Setenv("milvus.invalid=xxx", "test")
+	assert.Error(t, err)
+
+	err = os.Setenv("milvus.invalid", "xxx=test")
+	assert.NoError(t, err)
+	baseParams.Init()
+	result, _ = baseParams.Load("invalid")
+	assert.Equal(t, result, "xxx=test")
+}
+
 func TestBaseTable_Parse(t *testing.T) {
 	t.Run("ParseBool", func(t *testing.T) {
 		assert.Nil(t, baseParams.Save("key", "true"))