mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 17:48:29 +08:00
enhance: make runtime config into a global environment table (#38671)
issue: #38399 --------- Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
69a9fd6ead
commit
118678bd91
@ -23,7 +23,6 @@ import (
|
|||||||
"os/signal"
|
"os/signal"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
@ -173,13 +172,6 @@ func NewMilvusRoles() *MilvusRoles {
|
|||||||
return mr
|
return mr
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnvValue not used now.
|
|
||||||
func (mr *MilvusRoles) EnvValue(env string) bool {
|
|
||||||
env = strings.ToLower(env)
|
|
||||||
env = strings.Trim(env, " ")
|
|
||||||
return env == "1" || env == "true"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mr *MilvusRoles) printLDPreLoad() {
|
func (mr *MilvusRoles) printLDPreLoad() {
|
||||||
const LDPreLoad = "LD_PRELOAD"
|
const LDPreLoad = "LD_PRELOAD"
|
||||||
val, ok := os.LookupEnv(LDPreLoad)
|
val, ok := os.LookupEnv(LDPreLoad)
|
||||||
@ -449,45 +441,55 @@ func (mr *MilvusRoles) Run() {
|
|||||||
if mr.EnableRootCoord {
|
if mr.EnableRootCoord {
|
||||||
rootCoord = mr.runRootCoord(ctx, local, &wg)
|
rootCoord = mr.runRootCoord(ctx, local, &wg)
|
||||||
componentMap[typeutil.RootCoordRole] = rootCoord
|
componentMap[typeutil.RootCoordRole] = rootCoord
|
||||||
|
paramtable.SetLocalComponentEnabled(typeutil.RootCoordRole)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mr.EnableDataCoord {
|
if mr.EnableDataCoord {
|
||||||
dataCoord = mr.runDataCoord(ctx, local, &wg)
|
dataCoord = mr.runDataCoord(ctx, local, &wg)
|
||||||
componentMap[typeutil.DataCoordRole] = dataCoord
|
componentMap[typeutil.DataCoordRole] = dataCoord
|
||||||
|
paramtable.SetLocalComponentEnabled(typeutil.DataCoordRole)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mr.EnableIndexCoord {
|
if mr.EnableIndexCoord {
|
||||||
indexCoord = mr.runIndexCoord(ctx, local, &wg)
|
indexCoord = mr.runIndexCoord(ctx, local, &wg)
|
||||||
componentMap[typeutil.IndexCoordRole] = indexCoord
|
componentMap[typeutil.IndexCoordRole] = indexCoord
|
||||||
|
paramtable.SetLocalComponentEnabled(typeutil.IndexCoordRole)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mr.EnableQueryCoord {
|
if mr.EnableQueryCoord {
|
||||||
queryCoord = mr.runQueryCoord(ctx, local, &wg)
|
queryCoord = mr.runQueryCoord(ctx, local, &wg)
|
||||||
componentMap[typeutil.QueryCoordRole] = queryCoord
|
componentMap[typeutil.QueryCoordRole] = queryCoord
|
||||||
|
paramtable.SetLocalComponentEnabled(typeutil.QueryCoordRole)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mr.EnableQueryNode {
|
if mr.EnableQueryNode {
|
||||||
queryNode = mr.runQueryNode(ctx, local, &wg)
|
queryNode = mr.runQueryNode(ctx, local, &wg)
|
||||||
componentMap[typeutil.QueryNodeRole] = queryNode
|
componentMap[typeutil.QueryNodeRole] = queryNode
|
||||||
|
paramtable.SetLocalComponentEnabled(typeutil.QueryNodeRole)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mr.EnableDataNode {
|
if mr.EnableDataNode {
|
||||||
dataNode = mr.runDataNode(ctx, local, &wg)
|
dataNode = mr.runDataNode(ctx, local, &wg)
|
||||||
componentMap[typeutil.DataNodeRole] = dataNode
|
componentMap[typeutil.DataNodeRole] = dataNode
|
||||||
|
paramtable.SetLocalComponentEnabled(typeutil.DataNodeRole)
|
||||||
}
|
}
|
||||||
if mr.EnableIndexNode {
|
if mr.EnableIndexNode {
|
||||||
indexNode = mr.runIndexNode(ctx, local, &wg)
|
indexNode = mr.runIndexNode(ctx, local, &wg)
|
||||||
componentMap[typeutil.IndexNodeRole] = indexNode
|
componentMap[typeutil.IndexNodeRole] = indexNode
|
||||||
|
paramtable.SetLocalComponentEnabled(typeutil.IndexNodeRole)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mr.EnableProxy {
|
if mr.EnableProxy {
|
||||||
proxy = mr.runProxy(ctx, local, &wg)
|
proxy = mr.runProxy(ctx, local, &wg)
|
||||||
componentMap[typeutil.ProxyRole] = proxy
|
componentMap[typeutil.ProxyRole] = proxy
|
||||||
|
paramtable.SetLocalComponentEnabled(typeutil.ProxyRole)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mr.EnableStreamingNode {
|
if mr.EnableStreamingNode {
|
||||||
|
// Before initializing the local streaming node, make sure the local registry is ready.
|
||||||
streamingNode = mr.runStreamingNode(ctx, local, &wg)
|
streamingNode = mr.runStreamingNode(ctx, local, &wg)
|
||||||
componentMap[typeutil.StreamingNodeRole] = streamingNode
|
componentMap[typeutil.StreamingNodeRole] = streamingNode
|
||||||
|
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|||||||
@ -28,19 +28,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestRoles(t *testing.T) {
|
func TestRoles(t *testing.T) {
|
||||||
r := MilvusRoles{}
|
|
||||||
|
|
||||||
assert.True(t, r.EnvValue("1"))
|
|
||||||
assert.True(t, r.EnvValue(" 1 "))
|
|
||||||
assert.True(t, r.EnvValue("True"))
|
|
||||||
assert.True(t, r.EnvValue(" True "))
|
|
||||||
assert.True(t, r.EnvValue(" TRue "))
|
|
||||||
assert.False(t, r.EnvValue("0"))
|
|
||||||
assert.False(t, r.EnvValue(" 0 "))
|
|
||||||
assert.False(t, r.EnvValue(" false "))
|
|
||||||
assert.False(t, r.EnvValue(" False "))
|
|
||||||
assert.False(t, r.EnvValue(" abc "))
|
|
||||||
|
|
||||||
ss := strings.SplitN("abcdef", "=", 2)
|
ss := strings.SplitN("abcdef", "=", 2)
|
||||||
assert.Equal(t, len(ss), 1)
|
assert.Equal(t, len(ss), 1)
|
||||||
ss = strings.SplitN("adb=def", "=", 2)
|
ss = strings.SplitN("adb=def", "=", 2)
|
||||||
|
|||||||
@ -26,6 +26,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/shirou/gopsutil/v3/disk"
|
"github.com/shirou/gopsutil/v3/disk"
|
||||||
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/pkg/config"
|
"github.com/milvus-io/milvus/pkg/config"
|
||||||
@ -104,8 +105,6 @@ type ComponentParam struct {
|
|||||||
StreamingCoordGrpcClientCfg GrpcClientConfig
|
StreamingCoordGrpcClientCfg GrpcClientConfig
|
||||||
StreamingNodeGrpcClientCfg GrpcClientConfig
|
StreamingNodeGrpcClientCfg GrpcClientConfig
|
||||||
IntegrationTestCfg integrationTestConfig
|
IntegrationTestCfg integrationTestConfig
|
||||||
|
|
||||||
RuntimeConfig runtimeConfig
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init initialize once
|
// Init initialize once
|
||||||
@ -4850,11 +4849,13 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura
|
|||||||
p.TxnDefaultKeepaliveTimeout.Init(base.mgr)
|
p.TxnDefaultKeepaliveTimeout.Init(base.mgr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// runtimeConfig is just a private environment value table.
|
||||||
type runtimeConfig struct {
|
type runtimeConfig struct {
|
||||||
CreateTime RuntimeParamItem
|
createTime time.Time
|
||||||
UpdateTime RuntimeParamItem
|
updateTime time.Time
|
||||||
Role RuntimeParamItem
|
role string
|
||||||
NodeID RuntimeParamItem
|
nodeID atomic.Int64
|
||||||
|
components map[string]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type integrationTestConfig struct {
|
type integrationTestConfig struct {
|
||||||
|
|||||||
@ -396,39 +396,3 @@ func getAndConvert[T any](v string, converter func(input string) (T, error), def
|
|||||||
}
|
}
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
type RuntimeParamItem struct {
|
|
||||||
value atomic.Value
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rpi *RuntimeParamItem) GetValue() any {
|
|
||||||
return rpi.value.Load()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rpi *RuntimeParamItem) GetAsString() string {
|
|
||||||
value, ok := rpi.value.Load().(string)
|
|
||||||
if !ok {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
return value
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rpi *RuntimeParamItem) GetAsTime() time.Time {
|
|
||||||
value, ok := rpi.value.Load().(time.Time)
|
|
||||||
if !ok {
|
|
||||||
return time.Time{}
|
|
||||||
}
|
|
||||||
return value
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rpi *RuntimeParamItem) GetAsInt64() int64 {
|
|
||||||
value, ok := rpi.value.Load().(int64)
|
|
||||||
if !ok {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return value
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rpi *RuntimeParamItem) SetValue(value any) {
|
|
||||||
rpi.value.Store(value)
|
|
||||||
}
|
|
||||||
|
|||||||
@ -25,6 +25,9 @@ import (
|
|||||||
var (
|
var (
|
||||||
once sync.Once
|
once sync.Once
|
||||||
params ComponentParam
|
params ComponentParam
|
||||||
|
runtimeParam = runtimeConfig{
|
||||||
|
components: make(map[string]struct{}, 0),
|
||||||
|
}
|
||||||
hookParams hookConfig
|
hookParams hookConfig
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -58,11 +61,11 @@ func GetHookParams() *hookConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func SetNodeID(newID UniqueID) {
|
func SetNodeID(newID UniqueID) {
|
||||||
params.RuntimeConfig.NodeID.SetValue(newID)
|
runtimeParam.nodeID.Store(newID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetNodeID() UniqueID {
|
func GetNodeID() UniqueID {
|
||||||
return params.RuntimeConfig.NodeID.GetAsInt64()
|
return runtimeParam.nodeID.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetStringNodeID() string {
|
func GetStringNodeID() string {
|
||||||
@ -70,25 +73,34 @@ func GetStringNodeID() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func SetRole(role string) {
|
func SetRole(role string) {
|
||||||
params.RuntimeConfig.Role.SetValue(role)
|
runtimeParam.role = role
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetRole() string {
|
func GetRole() string {
|
||||||
return params.RuntimeConfig.Role.GetAsString()
|
return runtimeParam.role
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetCreateTime(d time.Time) {
|
func SetCreateTime(d time.Time) {
|
||||||
params.RuntimeConfig.CreateTime.SetValue(d)
|
runtimeParam.createTime = d
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetCreateTime() time.Time {
|
func GetCreateTime() time.Time {
|
||||||
return params.RuntimeConfig.CreateTime.GetAsTime()
|
return runtimeParam.createTime
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetUpdateTime(d time.Time) {
|
func SetUpdateTime(d time.Time) {
|
||||||
params.RuntimeConfig.UpdateTime.SetValue(d)
|
runtimeParam.updateTime = d
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetUpdateTime() time.Time {
|
func GetUpdateTime() time.Time {
|
||||||
return params.RuntimeConfig.UpdateTime.GetAsTime()
|
return runtimeParam.updateTime
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetLocalComponentEnabled(component string) {
|
||||||
|
runtimeParam.components[component] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsLocalComponentEnabled(component string) bool {
|
||||||
|
_, ok := runtimeParam.components[component]
|
||||||
|
return ok
|
||||||
}
|
}
|
||||||
|
|||||||
@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/pkg/config"
|
"github.com/milvus-io/milvus/pkg/config"
|
||||||
"github.com/milvus-io/milvus/pkg/util"
|
"github.com/milvus-io/milvus/pkg/util"
|
||||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestServiceParam(t *testing.T) {
|
func TestServiceParam(t *testing.T) {
|
||||||
@ -221,3 +222,14 @@ func TestServiceParam(t *testing.T) {
|
|||||||
assert.Equal(t, 10000, Params.PaginationSize.GetAsInt())
|
assert.Equal(t, 10000, Params.PaginationSize.GetAsInt())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRuntimConfig(t *testing.T) {
|
||||||
|
SetRole(typeutil.StandaloneRole)
|
||||||
|
assert.Equal(t, GetRole(), typeutil.StandaloneRole)
|
||||||
|
|
||||||
|
SetLocalComponentEnabled(typeutil.QueryNodeRole)
|
||||||
|
assert.True(t, IsLocalComponentEnabled(typeutil.QueryNodeRole))
|
||||||
|
|
||||||
|
SetLocalComponentEnabled(typeutil.QueryCoordRole)
|
||||||
|
assert.True(t, IsLocalComponentEnabled(typeutil.QueryCoordRole))
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user