From d34bb2641454752bcd9a09eba76b3cbd6836ab8a Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Tue, 28 Dec 2021 23:12:46 +0800 Subject: [PATCH] Set rolname when init GlobalParamTable (#14422) Signed-off-by: yudong.cai --- cmd/roles/roles.go | 71 ++++++++++++--------- internal/util/paramtable/base_param.go | 3 - internal/util/paramtable/base_param_test.go | 2 +- internal/util/paramtable/base_table.go | 9 ++- internal/util/paramtable/global_param.go | 50 ++------------- 5 files changed, 55 insertions(+), 80 deletions(-) diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index a68146c562..db9632afa4 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -45,9 +45,9 @@ import ( "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/util/healthz" "github.com/milvus-io/milvus/internal/util/metricsinfo" - "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq" "github.com/milvus-io/milvus/internal/util/trace" + "github.com/milvus-io/milvus/internal/util/typeutil" ) func newMsgFactory(localMsg bool) msgstream.Factory { @@ -92,9 +92,12 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone wg.Add(1) go func() { rootcoord.Params.InitOnce() + if localMsg { + rootcoord.Params.SetLogConfig(typeutil.StandaloneRole) + } else { + rootcoord.Params.SetLogConfig(typeutil.RootCoordRole) + } - f := setLoggerFunc() - rootcoord.Params.BaseParams.SetLogConfig(f) factory := newMsgFactory(localMsg) var err error rc, err = components.NewRootCoord(ctx, factory) @@ -121,9 +124,12 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string go func() { proxy.Params.ProxyCfg.InitAlias(alias) proxy.Params.InitOnce() + if localMsg { + proxy.Params.SetLogConfig(typeutil.StandaloneRole) + } else { + proxy.Params.SetLogConfig(typeutil.ProxyRole) + } - f := setLoggerFunc() - proxy.Params.BaseParams.SetLogConfig(f) factory := newMsgFactory(localMsg) var err error pn, err = components.NewProxy(ctx, factory) @@ -149,9 +155,12 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon wg.Add(1) go func() { querycoord.Params.InitOnce() + if localMsg { + querycoord.Params.SetLogConfig(typeutil.StandaloneRole) + } else { + querycoord.Params.SetLogConfig(typeutil.QueryCoordRole) + } - f := setLoggerFunc() - querycoord.Params.BaseParams.SetLogConfig(f) factory := newMsgFactory(localMsg) var err error qs, err = components.NewQueryCoord(ctx, factory) @@ -178,9 +187,12 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st go func() { querynode.Params.QueryNodeCfg.InitAlias(alias) querynode.Params.InitOnce() + if localMsg { + querynode.Params.SetLogConfig(typeutil.StandaloneRole) + } else { + querynode.Params.SetLogConfig(typeutil.QueryNodeRole) + } - f := setLoggerFunc() - querynode.Params.BaseParams.SetLogConfig(f) factory := newMsgFactory(localMsg) var err error qn, err = components.NewQueryNode(ctx, factory) @@ -206,9 +218,12 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone wg.Add(1) go func() { datacoord.Params.InitOnce() + if localMsg { + datacoord.Params.SetLogConfig(typeutil.StandaloneRole) + } else { + datacoord.Params.SetLogConfig(typeutil.DataCoordRole) + } - f := setLoggerFunc() - datacoord.Params.BaseParams.SetLogConfig(f) factory := newMsgFactory(localMsg) dctx := logutil.WithModule(ctx, "DataCoord") @@ -237,8 +252,12 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str go func() { datanode.Params.DataNodeCfg.InitAlias(alias) datanode.Params.InitOnce() - f := setLoggerFunc() - datanode.Params.BaseParams.SetLogConfig(f) + if localMsg { + datanode.Params.SetLogConfig(typeutil.StandaloneRole) + } else { + datanode.Params.SetLogConfig(typeutil.DataNodeRole) + } + factory := newMsgFactory(localMsg) var err error dn, err = components.NewDataNode(ctx, factory) @@ -264,9 +283,12 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compon wg.Add(1) go func() { indexcoord.Params.InitOnce() + if localMsg { + indexcoord.Params.SetLogConfig(typeutil.StandaloneRole) + } else { + indexcoord.Params.SetLogConfig(typeutil.IndexCoordRole) + } - f := setLoggerFunc() - indexcoord.Params.BaseParams.SetLogConfig(f) var err error is, err = components.NewIndexCoord(ctx) if err != nil { @@ -292,9 +314,12 @@ func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias st go func() { indexnode.Params.IndexNodeCfg.InitAlias(alias) indexnode.Params.InitOnce() + if localMsg { + indexnode.Params.SetLogConfig(typeutil.StandaloneRole) + } else { + indexnode.Params.SetLogConfig(typeutil.IndexNodeRole) + } - f := setLoggerFunc() - indexnode.Params.BaseParams.SetLogConfig(f) var err error in, err = components.NewIndexNode(ctx) if err != nil { @@ -329,10 +354,6 @@ func (mr *MilvusRoles) Run(localMsg bool, alias string) { log.Error("Failed to set deploy mode: ", zap.Error(err)) } - paramtable.Params.Init() - f := setLoggerFunc() - paramtable.Params.SetLogConfig(f) - if err := initRocksmq(); err != nil { panic(err) } @@ -466,11 +487,3 @@ func (mr *MilvusRoles) Run(localMsg bool, alias string) { // some deferred Stop has race with context cancel cancel() } - -func setLoggerFunc() func(cfg log.Config) { - return func(cfg log.Config) { - log.Info("Set log file to ", zap.String("path", cfg.File.Filename)) - logutil.SetupLogger(&cfg) - defer log.Sync() - } -} diff --git a/internal/util/paramtable/base_param.go b/internal/util/paramtable/base_param.go index ecc9c5b197..e2ab1a53c2 100644 --- a/internal/util/paramtable/base_param.go +++ b/internal/util/paramtable/base_param.go @@ -20,9 +20,6 @@ import ( "github.com/milvus-io/milvus/internal/util/metricsinfo" ) -// Params is a package scoped variable of type BaseParamTable. -var Params BaseParamTable - // BaseParamTable is a derived struct of BaseTable. It achieves Composition by // embedding BaseTable. It is used to quickly and easily access the system configuration. type BaseParamTable struct { diff --git a/internal/util/paramtable/base_param_test.go b/internal/util/paramtable/base_param_test.go index 2af5b7a3c8..090bdab88a 100644 --- a/internal/util/paramtable/base_param_test.go +++ b/internal/util/paramtable/base_param_test.go @@ -16,11 +16,11 @@ import ( "testing" "github.com/milvus-io/milvus/internal/util/metricsinfo" - "github.com/stretchr/testify/assert" ) func TestBaseParamTable(t *testing.T) { + var Params BaseParamTable Params.Init() assert.NotZero(t, len(Params.EtcdEndpoints)) diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index 0ddd8a012a..115073cf83 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -24,6 +24,7 @@ import ( memkv "github.com/milvus-io/milvus/internal/kv/mem" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/logutil" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/spf13/cast" @@ -451,8 +452,12 @@ func (gp *BaseTable) InitLogCfg() { gp.Log.File.MaxDays = gp.ParseInt("log.file.maxAge") } -func (gp *BaseTable) SetLogConfig(f func(log.Config)) { - gp.LogCfgFunc = f +func (gp *BaseTable) SetLogConfig() { + gp.LogCfgFunc = func(cfg log.Config) { + log.Info("Set log file to ", zap.String("path", cfg.File.Filename)) + logutil.SetupLogger(&cfg) + defer log.Sync() + } } func (gp *BaseTable) SetLogger(id UniqueID) { diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go index c75b3bdd44..65d825fb21 100644 --- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/global_param.go @@ -88,6 +88,11 @@ func (p *GlobalParamTable) Init() { p.IndexNodeCfg.init(&p.BaseParams) } +func (p *GlobalParamTable) SetLogConfig(role string) { + p.BaseParams.RoleName = role + p.BaseParams.SetLogConfig() +} + /////////////////////////////////////////////////////////////////////////////// // --- common --- //type commonConfig struct { @@ -330,8 +335,6 @@ func (p *rootCoordConfig) init(bp *BaseParamTable) { p.initDefaultIndexName() p.initTimeout() - - //p.initRoleName() } func (p *rootCoordConfig) initPulsarAddress() { @@ -453,10 +456,6 @@ func (p *rootCoordConfig) initTimeout() { p.Timeout = p.BaseParams.ParseIntWithDefault("rootCoord.timeout", 3600) } -//func (p *rootCoordConfig) initRoleName() { -// p.RoleName = "rootcoord" -//} - /////////////////////////////////////////////////////////////////////////////// // --- proxy --- type proxyConfig struct { @@ -531,8 +530,6 @@ func (p *proxyConfig) init(bp *BaseParamTable) { p.initMaxTaskNum() p.initBufFlagExpireTime() p.initBufFlagCleanupInterval() - - //p.initRoleName() } // Refresh is called after session init @@ -657,10 +654,6 @@ func (p *proxyConfig) initPulsarMaxMessageSize() { } } -//func (p *proxyConfig) initRoleName() { -// p.RoleName = "proxy" -//} - func (p *proxyConfig) initEtcdEndpoints() { endpoints, err := p.BaseParams.Load("_EtcdEndpoints") if err != nil { @@ -751,8 +744,6 @@ type queryCoordConfig struct { func (p *queryCoordConfig) init(bp *BaseParamTable) { p.BaseParams = bp - //p.initRoleName() - // --- Channels --- p.initClusterMsgChannelPrefix() p.initSearchChannelPrefix() @@ -909,10 +900,6 @@ func (p *queryCoordConfig) initMinioBucketName() { p.MinioBucketName = bucketName } -//func (p *queryCoordConfig) initRoleName() { -// p.RoleName = "querycoord" -//} - func (p *queryCoordConfig) initPulsarAddress() { addr, err := p.BaseParams.Load("_PulsarAddress") if err != nil { @@ -1089,8 +1076,6 @@ func (p *queryNodeConfig) init(bp *BaseParamTable) { p.initSegcoreChunkRows() p.initKnowhereSimdType() - //p.initRoleName() - p.initSkipQueryChannelRecovery() p.initOverloadedMemoryThresholdPercentage() } @@ -1288,10 +1273,6 @@ func (p *queryNodeConfig) initKnowhereSimdType() { log.Debug("initialize the knowhere simd type", zap.String("simd_type", p.SimdType)) } -//func (p *queryNodeConfig) initRoleName() { -// p.RoleName = "querynode" -//} - func (p *queryNodeConfig) initSkipQueryChannelRecovery() { p.SkipQueryChannelRecovery = p.BaseParams.ParseBool("msgChannel.skipQueryChannelRecovery", false) } @@ -1391,7 +1372,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) { p.initTimeTickChannelName() p.initSegmentInfoChannelName() p.initDataCoordSubscriptionName() - //p.initRoleName() p.initFlushStreamPosSubPath() p.initStatsStreamPosSubPath() @@ -1534,10 +1514,6 @@ func (p *dataCoordConfig) initDataCoordSubscriptionName() { p.DataCoordSubscriptionName = strings.Join(s, "-") } -//func (p *dataCoordConfig) initRoleName() { -// p.RoleName = "datacoord" -//} - func (p *dataCoordConfig) initFlushStreamPosSubPath() { subPath, err := p.BaseParams.Load("etcd.flushStreamPosSubPath") if err != nil { @@ -1725,8 +1701,6 @@ func (p *dataNodeConfig) init(bp *BaseParamTable) { p.initDmlChannelName() p.initDeltaChannelName() - - //p.initRoleName() } // Refresh is called after session init @@ -1883,10 +1857,6 @@ func (p *dataNodeConfig) initMinioBucketName() { p.MinioBucketName = bucketName } -//func (p *dataNodeConfig) initRoleName() { -// p.RoleName = "datanode" -//} - func (p *dataNodeConfig) initDmlChannelName() { config, err := p.BaseParams.Load("msgChannel.chanNamePrefix.rootCoordDml") if err != nil { @@ -1940,7 +1910,6 @@ func (p *indexCoordConfig) init(bp *BaseParamTable) { p.initMinIOUseSSL() p.initMinioBucketName() p.initIndexStorageRootPath() - //p.initRoleName() } func (p *indexCoordConfig) initEtcdEndpoints() { @@ -2032,10 +2001,6 @@ func (p *indexCoordConfig) initIndexStorageRootPath() { p.IndexStorageRootPath = path.Join(rootPath, "index_files") } -//func (p *indexCoordConfig) initRoleName() { -// p.RoleName = "indexcoord" -//} - /////////////////////////////////////////////////////////////////////////////// // --- indexnode --- type indexNodeConfig struct { @@ -2075,7 +2040,6 @@ func (p *indexNodeConfig) init(bp *BaseParamTable) { p.initEtcdEndpoints() p.initMetaRootPath() p.initIndexStorageRootPath() - //p.initRoleName() p.initKnowhereSimdType() } @@ -2155,10 +2119,6 @@ func (p *indexNodeConfig) initMinioBucketName() { p.MinioBucketName = bucketName } -//func (p *indexNodeConfig) initRoleName() { -// p.RoleName = "indexnode" -//} - func (p *indexNodeConfig) initKnowhereSimdType() { simdType := p.BaseParams.LoadWithDefault("knowhere.simdType", "auto") p.SimdType = simdType