Set rolname when init GlobalParamTable (#14422)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
Cai Yudong 2021-12-28 23:12:46 +08:00 committed by GitHub
parent ffe44ae1ea
commit d34bb26414
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 55 additions and 80 deletions

View File

@ -45,9 +45,9 @@ import (
"github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/healthz" "github.com/milvus-io/milvus/internal/util/healthz"
"github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
"github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
) )
func newMsgFactory(localMsg bool) msgstream.Factory { func newMsgFactory(localMsg bool) msgstream.Factory {
@ -92,9 +92,12 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone
wg.Add(1) wg.Add(1)
go func() { go func() {
rootcoord.Params.InitOnce() rootcoord.Params.InitOnce()
if localMsg {
rootcoord.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
rootcoord.Params.SetLogConfig(typeutil.RootCoordRole)
}
f := setLoggerFunc()
rootcoord.Params.BaseParams.SetLogConfig(f)
factory := newMsgFactory(localMsg) factory := newMsgFactory(localMsg)
var err error var err error
rc, err = components.NewRootCoord(ctx, factory) rc, err = components.NewRootCoord(ctx, factory)
@ -121,9 +124,12 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string
go func() { go func() {
proxy.Params.ProxyCfg.InitAlias(alias) proxy.Params.ProxyCfg.InitAlias(alias)
proxy.Params.InitOnce() proxy.Params.InitOnce()
if localMsg {
proxy.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
proxy.Params.SetLogConfig(typeutil.ProxyRole)
}
f := setLoggerFunc()
proxy.Params.BaseParams.SetLogConfig(f)
factory := newMsgFactory(localMsg) factory := newMsgFactory(localMsg)
var err error var err error
pn, err = components.NewProxy(ctx, factory) pn, err = components.NewProxy(ctx, factory)
@ -149,9 +155,12 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon
wg.Add(1) wg.Add(1)
go func() { go func() {
querycoord.Params.InitOnce() querycoord.Params.InitOnce()
if localMsg {
querycoord.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
querycoord.Params.SetLogConfig(typeutil.QueryCoordRole)
}
f := setLoggerFunc()
querycoord.Params.BaseParams.SetLogConfig(f)
factory := newMsgFactory(localMsg) factory := newMsgFactory(localMsg)
var err error var err error
qs, err = components.NewQueryCoord(ctx, factory) qs, err = components.NewQueryCoord(ctx, factory)
@ -178,9 +187,12 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st
go func() { go func() {
querynode.Params.QueryNodeCfg.InitAlias(alias) querynode.Params.QueryNodeCfg.InitAlias(alias)
querynode.Params.InitOnce() querynode.Params.InitOnce()
if localMsg {
querynode.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
querynode.Params.SetLogConfig(typeutil.QueryNodeRole)
}
f := setLoggerFunc()
querynode.Params.BaseParams.SetLogConfig(f)
factory := newMsgFactory(localMsg) factory := newMsgFactory(localMsg)
var err error var err error
qn, err = components.NewQueryNode(ctx, factory) qn, err = components.NewQueryNode(ctx, factory)
@ -206,9 +218,12 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone
wg.Add(1) wg.Add(1)
go func() { go func() {
datacoord.Params.InitOnce() datacoord.Params.InitOnce()
if localMsg {
datacoord.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
datacoord.Params.SetLogConfig(typeutil.DataCoordRole)
}
f := setLoggerFunc()
datacoord.Params.BaseParams.SetLogConfig(f)
factory := newMsgFactory(localMsg) factory := newMsgFactory(localMsg)
dctx := logutil.WithModule(ctx, "DataCoord") dctx := logutil.WithModule(ctx, "DataCoord")
@ -237,8 +252,12 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str
go func() { go func() {
datanode.Params.DataNodeCfg.InitAlias(alias) datanode.Params.DataNodeCfg.InitAlias(alias)
datanode.Params.InitOnce() datanode.Params.InitOnce()
f := setLoggerFunc() if localMsg {
datanode.Params.BaseParams.SetLogConfig(f) datanode.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
datanode.Params.SetLogConfig(typeutil.DataNodeRole)
}
factory := newMsgFactory(localMsg) factory := newMsgFactory(localMsg)
var err error var err error
dn, err = components.NewDataNode(ctx, factory) dn, err = components.NewDataNode(ctx, factory)
@ -264,9 +283,12 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compon
wg.Add(1) wg.Add(1)
go func() { go func() {
indexcoord.Params.InitOnce() indexcoord.Params.InitOnce()
if localMsg {
indexcoord.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
indexcoord.Params.SetLogConfig(typeutil.IndexCoordRole)
}
f := setLoggerFunc()
indexcoord.Params.BaseParams.SetLogConfig(f)
var err error var err error
is, err = components.NewIndexCoord(ctx) is, err = components.NewIndexCoord(ctx)
if err != nil { if err != nil {
@ -292,9 +314,12 @@ func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias st
go func() { go func() {
indexnode.Params.IndexNodeCfg.InitAlias(alias) indexnode.Params.IndexNodeCfg.InitAlias(alias)
indexnode.Params.InitOnce() indexnode.Params.InitOnce()
if localMsg {
indexnode.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
indexnode.Params.SetLogConfig(typeutil.IndexNodeRole)
}
f := setLoggerFunc()
indexnode.Params.BaseParams.SetLogConfig(f)
var err error var err error
in, err = components.NewIndexNode(ctx) in, err = components.NewIndexNode(ctx)
if err != nil { if err != nil {
@ -329,10 +354,6 @@ func (mr *MilvusRoles) Run(localMsg bool, alias string) {
log.Error("Failed to set deploy mode: ", zap.Error(err)) log.Error("Failed to set deploy mode: ", zap.Error(err))
} }
paramtable.Params.Init()
f := setLoggerFunc()
paramtable.Params.SetLogConfig(f)
if err := initRocksmq(); err != nil { if err := initRocksmq(); err != nil {
panic(err) panic(err)
} }
@ -466,11 +487,3 @@ func (mr *MilvusRoles) Run(localMsg bool, alias string) {
// some deferred Stop has race with context cancel // some deferred Stop has race with context cancel
cancel() cancel()
} }
func setLoggerFunc() func(cfg log.Config) {
return func(cfg log.Config) {
log.Info("Set log file to ", zap.String("path", cfg.File.Filename))
logutil.SetupLogger(&cfg)
defer log.Sync()
}
}

View File

@ -20,9 +20,6 @@ import (
"github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/metricsinfo"
) )
// Params is a package scoped variable of type BaseParamTable.
var Params BaseParamTable
// BaseParamTable is a derived struct of BaseTable. It achieves Composition by // BaseParamTable is a derived struct of BaseTable. It achieves Composition by
// embedding BaseTable. It is used to quickly and easily access the system configuration. // embedding BaseTable. It is used to quickly and easily access the system configuration.
type BaseParamTable struct { type BaseParamTable struct {

View File

@ -16,11 +16,11 @@ import (
"testing" "testing"
"github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestBaseParamTable(t *testing.T) { func TestBaseParamTable(t *testing.T) {
var Params BaseParamTable
Params.Init() Params.Init()
assert.NotZero(t, len(Params.EtcdEndpoints)) assert.NotZero(t, len(Params.EtcdEndpoints))

View File

@ -24,6 +24,7 @@ import (
memkv "github.com/milvus-io/milvus/internal/kv/mem" memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/logutil"
"github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/spf13/cast" "github.com/spf13/cast"
@ -451,8 +452,12 @@ func (gp *BaseTable) InitLogCfg() {
gp.Log.File.MaxDays = gp.ParseInt("log.file.maxAge") gp.Log.File.MaxDays = gp.ParseInt("log.file.maxAge")
} }
func (gp *BaseTable) SetLogConfig(f func(log.Config)) { func (gp *BaseTable) SetLogConfig() {
gp.LogCfgFunc = f gp.LogCfgFunc = func(cfg log.Config) {
log.Info("Set log file to ", zap.String("path", cfg.File.Filename))
logutil.SetupLogger(&cfg)
defer log.Sync()
}
} }
func (gp *BaseTable) SetLogger(id UniqueID) { func (gp *BaseTable) SetLogger(id UniqueID) {

View File

@ -88,6 +88,11 @@ func (p *GlobalParamTable) Init() {
p.IndexNodeCfg.init(&p.BaseParams) p.IndexNodeCfg.init(&p.BaseParams)
} }
func (p *GlobalParamTable) SetLogConfig(role string) {
p.BaseParams.RoleName = role
p.BaseParams.SetLogConfig()
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// --- common --- // --- common ---
//type commonConfig struct { //type commonConfig struct {
@ -330,8 +335,6 @@ func (p *rootCoordConfig) init(bp *BaseParamTable) {
p.initDefaultIndexName() p.initDefaultIndexName()
p.initTimeout() p.initTimeout()
//p.initRoleName()
} }
func (p *rootCoordConfig) initPulsarAddress() { func (p *rootCoordConfig) initPulsarAddress() {
@ -453,10 +456,6 @@ func (p *rootCoordConfig) initTimeout() {
p.Timeout = p.BaseParams.ParseIntWithDefault("rootCoord.timeout", 3600) p.Timeout = p.BaseParams.ParseIntWithDefault("rootCoord.timeout", 3600)
} }
//func (p *rootCoordConfig) initRoleName() {
// p.RoleName = "rootcoord"
//}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// --- proxy --- // --- proxy ---
type proxyConfig struct { type proxyConfig struct {
@ -531,8 +530,6 @@ func (p *proxyConfig) init(bp *BaseParamTable) {
p.initMaxTaskNum() p.initMaxTaskNum()
p.initBufFlagExpireTime() p.initBufFlagExpireTime()
p.initBufFlagCleanupInterval() p.initBufFlagCleanupInterval()
//p.initRoleName()
} }
// Refresh is called after session init // Refresh is called after session init
@ -657,10 +654,6 @@ func (p *proxyConfig) initPulsarMaxMessageSize() {
} }
} }
//func (p *proxyConfig) initRoleName() {
// p.RoleName = "proxy"
//}
func (p *proxyConfig) initEtcdEndpoints() { func (p *proxyConfig) initEtcdEndpoints() {
endpoints, err := p.BaseParams.Load("_EtcdEndpoints") endpoints, err := p.BaseParams.Load("_EtcdEndpoints")
if err != nil { if err != nil {
@ -751,8 +744,6 @@ type queryCoordConfig struct {
func (p *queryCoordConfig) init(bp *BaseParamTable) { func (p *queryCoordConfig) init(bp *BaseParamTable) {
p.BaseParams = bp p.BaseParams = bp
//p.initRoleName()
// --- Channels --- // --- Channels ---
p.initClusterMsgChannelPrefix() p.initClusterMsgChannelPrefix()
p.initSearchChannelPrefix() p.initSearchChannelPrefix()
@ -909,10 +900,6 @@ func (p *queryCoordConfig) initMinioBucketName() {
p.MinioBucketName = bucketName p.MinioBucketName = bucketName
} }
//func (p *queryCoordConfig) initRoleName() {
// p.RoleName = "querycoord"
//}
func (p *queryCoordConfig) initPulsarAddress() { func (p *queryCoordConfig) initPulsarAddress() {
addr, err := p.BaseParams.Load("_PulsarAddress") addr, err := p.BaseParams.Load("_PulsarAddress")
if err != nil { if err != nil {
@ -1089,8 +1076,6 @@ func (p *queryNodeConfig) init(bp *BaseParamTable) {
p.initSegcoreChunkRows() p.initSegcoreChunkRows()
p.initKnowhereSimdType() p.initKnowhereSimdType()
//p.initRoleName()
p.initSkipQueryChannelRecovery() p.initSkipQueryChannelRecovery()
p.initOverloadedMemoryThresholdPercentage() p.initOverloadedMemoryThresholdPercentage()
} }
@ -1288,10 +1273,6 @@ func (p *queryNodeConfig) initKnowhereSimdType() {
log.Debug("initialize the knowhere simd type", zap.String("simd_type", p.SimdType)) log.Debug("initialize the knowhere simd type", zap.String("simd_type", p.SimdType))
} }
//func (p *queryNodeConfig) initRoleName() {
// p.RoleName = "querynode"
//}
func (p *queryNodeConfig) initSkipQueryChannelRecovery() { func (p *queryNodeConfig) initSkipQueryChannelRecovery() {
p.SkipQueryChannelRecovery = p.BaseParams.ParseBool("msgChannel.skipQueryChannelRecovery", false) p.SkipQueryChannelRecovery = p.BaseParams.ParseBool("msgChannel.skipQueryChannelRecovery", false)
} }
@ -1391,7 +1372,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) {
p.initTimeTickChannelName() p.initTimeTickChannelName()
p.initSegmentInfoChannelName() p.initSegmentInfoChannelName()
p.initDataCoordSubscriptionName() p.initDataCoordSubscriptionName()
//p.initRoleName()
p.initFlushStreamPosSubPath() p.initFlushStreamPosSubPath()
p.initStatsStreamPosSubPath() p.initStatsStreamPosSubPath()
@ -1534,10 +1514,6 @@ func (p *dataCoordConfig) initDataCoordSubscriptionName() {
p.DataCoordSubscriptionName = strings.Join(s, "-") p.DataCoordSubscriptionName = strings.Join(s, "-")
} }
//func (p *dataCoordConfig) initRoleName() {
// p.RoleName = "datacoord"
//}
func (p *dataCoordConfig) initFlushStreamPosSubPath() { func (p *dataCoordConfig) initFlushStreamPosSubPath() {
subPath, err := p.BaseParams.Load("etcd.flushStreamPosSubPath") subPath, err := p.BaseParams.Load("etcd.flushStreamPosSubPath")
if err != nil { if err != nil {
@ -1725,8 +1701,6 @@ func (p *dataNodeConfig) init(bp *BaseParamTable) {
p.initDmlChannelName() p.initDmlChannelName()
p.initDeltaChannelName() p.initDeltaChannelName()
//p.initRoleName()
} }
// Refresh is called after session init // Refresh is called after session init
@ -1883,10 +1857,6 @@ func (p *dataNodeConfig) initMinioBucketName() {
p.MinioBucketName = bucketName p.MinioBucketName = bucketName
} }
//func (p *dataNodeConfig) initRoleName() {
// p.RoleName = "datanode"
//}
func (p *dataNodeConfig) initDmlChannelName() { func (p *dataNodeConfig) initDmlChannelName() {
config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordDml") config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordDml")
if err != nil { if err != nil {
@ -1940,7 +1910,6 @@ func (p *indexCoordConfig) init(bp *BaseParamTable) {
p.initMinIOUseSSL() p.initMinIOUseSSL()
p.initMinioBucketName() p.initMinioBucketName()
p.initIndexStorageRootPath() p.initIndexStorageRootPath()
//p.initRoleName()
} }
func (p *indexCoordConfig) initEtcdEndpoints() { func (p *indexCoordConfig) initEtcdEndpoints() {
@ -2032,10 +2001,6 @@ func (p *indexCoordConfig) initIndexStorageRootPath() {
p.IndexStorageRootPath = path.Join(rootPath, "index_files") p.IndexStorageRootPath = path.Join(rootPath, "index_files")
} }
//func (p *indexCoordConfig) initRoleName() {
// p.RoleName = "indexcoord"
//}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// --- indexnode --- // --- indexnode ---
type indexNodeConfig struct { type indexNodeConfig struct {
@ -2075,7 +2040,6 @@ func (p *indexNodeConfig) init(bp *BaseParamTable) {
p.initEtcdEndpoints() p.initEtcdEndpoints()
p.initMetaRootPath() p.initMetaRootPath()
p.initIndexStorageRootPath() p.initIndexStorageRootPath()
//p.initRoleName()
p.initKnowhereSimdType() p.initKnowhereSimdType()
} }
@ -2155,10 +2119,6 @@ func (p *indexNodeConfig) initMinioBucketName() {
p.MinioBucketName = bucketName p.MinioBucketName = bucketName
} }
//func (p *indexNodeConfig) initRoleName() {
// p.RoleName = "indexnode"
//}
func (p *indexNodeConfig) initKnowhereSimdType() { func (p *indexNodeConfig) initKnowhereSimdType() {
simdType := p.BaseParams.LoadWithDefault("knowhere.simdType", "auto") simdType := p.BaseParams.LoadWithDefault("knowhere.simdType", "auto")
p.SimdType = simdType p.SimdType = simdType