diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 30282409fa..35a90b3e46 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -88,11 +88,8 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone go func() { rootcoord.Params.Init() - if !localMsg { - logutil.SetupLogger(&rootcoord.Params.Log) - defer log.Sync() - } - + f := setLoggerFunc(localMsg) + rootcoord.Params.SetLogConfig(f) factory := newMsgFactory(localMsg) var err error rc, err = components.NewRootCoord(ctx, factory) @@ -120,11 +117,8 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string proxy.Params.InitAlias(alias) proxy.Params.Init() - if !localMsg { - logutil.SetupLogger(&proxy.Params.Log) - defer log.Sync() - } - + f := setLoggerFunc(localMsg) + proxy.Params.SetLogConfig(f) factory := newMsgFactory(localMsg) var err error pn, err = components.NewProxy(ctx, factory) @@ -151,11 +145,8 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon go func() { querycoord.Params.Init() - if !localMsg { - logutil.SetupLogger(&querycoord.Params.Log) - defer log.Sync() - } - + f := setLoggerFunc(localMsg) + querycoord.Params.SetLogConfig(f) factory := newMsgFactory(localMsg) var err error qs, err = components.NewQueryCoord(ctx, factory) @@ -183,11 +174,8 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st querynode.Params.InitAlias(alias) querynode.Params.Init() - if !localMsg { - logutil.SetupLogger(&querynode.Params.Log) - defer log.Sync() - } - + f := setLoggerFunc(localMsg) + querynode.Params.SetLogConfig(f) factory := newMsgFactory(localMsg) var err error qn, err = components.NewQueryNode(ctx, factory) @@ -214,11 +202,8 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone go func() { datacoord.Params.Init() - if !localMsg { - logutil.SetupLogger(&datacoord.Params.Log) - defer log.Sync() - } - + f := setLoggerFunc(localMsg) + datacoord.Params.SetLogConfig(f) factory := newMsgFactory(localMsg) var err error ds, err = components.NewDataCoord(ctx, factory) @@ -245,12 +230,8 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str go func() { datanode.Params.InitAlias(alias) datanode.Params.Init() - - if !localMsg { - logutil.SetupLogger(&datanode.Params.Log) - defer log.Sync() - } - + f := setLoggerFunc(localMsg) + datanode.Params.SetLogConfig(f) factory := newMsgFactory(localMsg) var err error dn, err = components.NewDataNode(ctx, factory) @@ -277,11 +258,8 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compon go func() { indexcoord.Params.Init() - if !localMsg { - logutil.SetupLogger(&indexcoord.Params.Log) - defer log.Sync() - } - + f := setLoggerFunc(localMsg) + indexcoord.Params.SetLogConfig(f) var err error is, err = components.NewIndexCoord(ctx) if err != nil { @@ -308,11 +286,8 @@ func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias st indexnode.Params.InitAlias(alias) indexnode.Params.Init() - if !localMsg { - logutil.SetupLogger(&indexnode.Params.Log) - defer log.Sync() - } - + f := setLoggerFunc(localMsg) + indexnode.Params.SetLogConfig(f) var err error in, err = components.NewIndexNode(ctx) if err != nil { @@ -492,3 +467,15 @@ func (mr *MilvusRoles) Run(localMsg bool, alias string) { // some deferred Stop has race with context cancel cancel() } + +func setLoggerFunc(localMsg bool) func(cfg log.Config) { + if !localMsg { + return func(cfg log.Config) { + log.Info("Set log file to ", zap.String("path", cfg.File.Filename)) + logutil.SetupLogger(&cfg) + defer log.Sync() + } + } + // no need to setup logger for standalone + return func(cfg log.Config) {} +} diff --git a/internal/datacoord/param_table.go b/internal/datacoord/param_table.go index 59fddb7578..f8346122b1 100644 --- a/internal/datacoord/param_table.go +++ b/internal/datacoord/param_table.go @@ -93,7 +93,7 @@ func (p *ParamTable) Init() { p.initTimeTickChannelName() p.initSegmentInfoChannelName() p.initDataCoordSubscriptionName() - p.initLogCfg() + p.initRoleName() p.initFlushStreamPosSubPath() p.initStatsStreamPosSubPath() @@ -235,8 +235,8 @@ func (p *ParamTable) initDataCoordSubscriptionName() { p.DataCoordSubscriptionName = strings.Join(s, "-") } -func (p *ParamTable) initLogCfg() { - p.InitLogCfg("datacoord", 0) +func (p *ParamTable) initRoleName() { + p.RoleName = "datacoord" } func (p *ParamTable) initFlushStreamPosSubPath() { diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 180b47e154..2892b0aef3 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -185,6 +185,7 @@ func (s *Server) Register() error { } s.liveCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true) Params.NodeID = s.session.ServerID + Params.SetLogger(typeutil.UniqueID(-1)) return nil } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index cceefe9b2f..07be2d03b8 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -162,6 +162,7 @@ func (node *DataNode) Register() error { node.liveCh = node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false) Params.NodeID = node.session.ServerID node.NodeID = node.session.ServerID + Params.SetLogger(Params.NodeID) // Start node watch node go node.StartWatchChannels(node.ctx) diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index 6111efcbc6..1ead872b7b 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -127,7 +127,7 @@ func (p *ParamTable) Init() { p.initMinioUseSSL() p.initMinioBucketName() - p.initLogCfg() + p.initRoleName() } // ==== DataNode internal components configs ==== @@ -277,6 +277,6 @@ func (p *ParamTable) initMinioBucketName() { p.MinioBucketName = bucketName } -func (p *ParamTable) initLogCfg() { - p.InitLogCfg("datanode", p.NodeID) +func (p *ParamTable) initRoleName() { + p.RoleName = "datanode" } diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 1315c3e317..4854cc9c61 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -113,6 +113,7 @@ func (i *IndexCoord) Register() error { return errors.New("failed to initialize session") } i.liveCh = i.session.Init(typeutil.IndexCoordRole, Params.Address, true) + Params.SetLogger(typeutil.UniqueID(-1)) return nil } diff --git a/internal/indexcoord/param_table.go b/internal/indexcoord/param_table.go index ef924cdba2..e507934c6e 100644 --- a/internal/indexcoord/param_table.go +++ b/internal/indexcoord/param_table.go @@ -56,7 +56,6 @@ func (pt *ParamTable) Init() { panic(err) }*/ - pt.initLogCfg() pt.initEtcdEndpoints() pt.initMetaRootPath() pt.initKvRootPath() @@ -66,6 +65,7 @@ func (pt *ParamTable) Init() { pt.initMinIOUseSSL() pt.initMinioBucketName() pt.initIndexRootPath() + pt.initRoleName() } // InitOnce is used to initialize configuration items, and it will only be called once. @@ -158,6 +158,6 @@ func (pt *ParamTable) initIndexRootPath() { pt.IndexRootPath = path.Join(rootPath, "index_files") } -func (pt *ParamTable) initLogCfg() { - pt.InitLogCfg("indexcoord", 0) +func (pt *ParamTable) initRoleName() { + pt.RoleName = "indexcoord" } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 3d7ed404bb..e6440754f3 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -106,8 +106,7 @@ func (i *IndexNode) Register() error { } i.liveCh = i.session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false) Params.NodeID = i.session.ServerID - //TODO reset logger - //Params.initLogCfg() + Params.SetLogger(Params.NodeID) return nil } diff --git a/internal/indexnode/param_table.go b/internal/indexnode/param_table.go index bd77e44cab..1b435cb587 100644 --- a/internal/indexnode/param_table.go +++ b/internal/indexnode/param_table.go @@ -92,7 +92,7 @@ func (pt *ParamTable) initParams() { pt.initEtcdEndpoints() pt.initMetaRootPath() pt.initIndexRootPath() - pt.initLogCfg() + pt.initRoleName() } func (pt *ParamTable) initMinIOAddress() { @@ -166,8 +166,8 @@ func (pt *ParamTable) initMinioBucketName() { pt.MinioBucketName = bucketName } -func (pt *ParamTable) initLogCfg() { - pt.InitLogCfg("indexnode", pt.NodeID) +func (pt *ParamTable) initRoleName() { + pt.RoleName = "indexnode" } func (pt *ParamTable) initKnowhereSimdType() { diff --git a/internal/proxy/param_table.go b/internal/proxy/param_table.go index 64b260592c..c5d0159ab6 100644 --- a/internal/proxy/param_table.go +++ b/internal/proxy/param_table.go @@ -12,7 +12,6 @@ package proxy import ( - "fmt" "path" "strconv" "strings" @@ -66,7 +65,6 @@ type ParamTable struct { MaxTaskNum int64 PulsarMaxMessageSize int - RoleName string } var Params ParamTable @@ -107,7 +105,7 @@ func (pt *ParamTable) Init() { pt.initMaxTaskNum() - Params.initLogCfg() + pt.initRoleName() } func (pt *ParamTable) InitAlias(alias string) { @@ -273,12 +271,8 @@ func (pt *ParamTable) initPulsarMaxMessageSize() { } } -func (pt *ParamTable) initLogCfg() { - pt.InitLogCfg("proxy", pt.ProxyID) -} - func (pt *ParamTable) initRoleName() { - pt.RoleName = fmt.Sprintf("%s-%s", "Proxy", pt.Alias) + pt.RoleName = "proxy" } func (pt *ParamTable) initEtcdEndpoints() { diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 09d89f3262..62e144115c 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -99,6 +99,7 @@ func (node *Proxy) Register() error { node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints) node.session.Init(typeutil.ProxyRole, Params.NetworkAddress, false) Params.ProxyID = node.session.ServerID + Params.SetLogger(Params.ProxyID) Params.initProxySubName() // TODO Reset the logger //Params.initLogCfg() diff --git a/internal/querycoord/param_table.go b/internal/querycoord/param_table.go index fda68e432a..6a2fcb1ced 100644 --- a/internal/querycoord/param_table.go +++ b/internal/querycoord/param_table.go @@ -12,7 +12,6 @@ package querycoord import ( - "fmt" "path" "strconv" "strings" @@ -40,8 +39,6 @@ type ParamTable struct { // timetick TimeTickChannelName string - RoleName string - // channels ClusterChannelPrefix string SearchChannelPrefix string @@ -80,8 +77,6 @@ func (p *ParamTable) Init() { panic(err) } - p.initLogCfg() - p.initQueryCoordAddress() p.initRoleName() @@ -113,10 +108,6 @@ func (p *ParamTable) initQueryCoordAddress() { p.Address = url } -func (p *ParamTable) initRoleName() { - p.RoleName = fmt.Sprintf("%s-%d", "QueryCoord", p.NodeID) -} - func (p *ParamTable) initClusterMsgChannelPrefix() { config, err := p.Load("msgChannel.chanNamePrefix.cluster") if err != nil { @@ -238,6 +229,6 @@ func (p *ParamTable) initMinioBucketName() { p.MinioBucketName = bucketName } -func (p *ParamTable) initLogCfg() { - p.InitLogCfg("querycoord", 0) +func (p *ParamTable) initRoleName() { + p.RoleName = "querycoord" } diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 1b432b29fe..1b1c3d8bcb 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -79,6 +79,7 @@ func (qc *QueryCoord) Register() error { qc.session = sessionutil.NewSession(qc.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints) qc.liveCh = qc.session.Init(typeutil.QueryCoordRole, Params.Address, true) Params.NodeID = uint64(qc.session.ServerID) + Params.SetLogger(typeutil.UniqueID(-1)) return nil } diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 732c624ec3..5cf29c64a8 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -133,7 +133,7 @@ func (p *ParamTable) Init() { p.initSegcoreChunkRows() p.initKnowhereSimdType() - p.initLogCfg() + p.initRoleName() } func (p *ParamTable) initCacheSize() { @@ -320,6 +320,6 @@ func (p *ParamTable) initKnowhereSimdType() { p.SimdType = simdType } -func (p *ParamTable) initLogCfg() { - p.InitLogCfg("querynode", p.QueryNodeID) +func (p *ParamTable) initRoleName() { + p.RoleName = "querynode" } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 7f030ae9cf..fbf6d11cff 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -113,6 +113,7 @@ func (node *QueryNode) Register() error { node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, Params.EtcdEndpoints) node.liveCh = node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false) Params.QueryNodeID = node.session.ServerID + Params.SetLogger(Params.QueryNodeID) log.Debug("query nodeID", zap.Int64("nodeID", Params.QueryNodeID)) log.Debug("query node address", zap.String("address", node.session.Address)) diff --git a/internal/rootcoord/param_table.go b/internal/rootcoord/param_table.go index 3b1023e5b9..4c6faa8f27 100644 --- a/internal/rootcoord/param_table.go +++ b/internal/rootcoord/param_table.go @@ -52,8 +52,6 @@ type ParamTable struct { CreatedTime time.Time UpdatedTime time.Time - - RoleName string } // InitOnce initialize once @@ -93,7 +91,6 @@ func (p *ParamTable) Init() { p.initTimeout() p.initTimeTickInterval() - p.initLogCfg() p.initRoleName() } @@ -217,10 +214,6 @@ func (p *ParamTable) initTimeTickInterval() { p.TimeTickInterval = p.ParseInt("rootcoord.timeTickInterval") } -func (p *ParamTable) initLogCfg() { - p.InitLogCfg("rootcoord", 0) -} - func (p *ParamTable) initRoleName() { - p.RoleName = "RootCoord" + p.RoleName = "rootcoord" } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 3700d1b06e..cffc4bb7ff 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -864,6 +864,7 @@ func (c *Core) Register() error { return fmt.Errorf("session is nil, maybe the etcd client connection fails") } c.sessCloseCh = c.session.Init(typeutil.RootCoordRole, Params.Address, true) + Params.SetLogger(typeutil.UniqueID(-1)) return nil } diff --git a/internal/util/paramtable/basetable.go b/internal/util/paramtable/basetable.go index 9a02bc4b52..714346b1e5 100644 --- a/internal/util/paramtable/basetable.go +++ b/internal/util/paramtable/basetable.go @@ -44,7 +44,9 @@ type BaseTable struct { params *memkv.MemoryKV configDir string - Log log.Config + RoleName string + Log log.Config + LogConfigFunction func(log.Config) } func (gp *BaseTable) Init() { @@ -59,6 +61,8 @@ func (gp *BaseTable) Init() { gp.loadFromCommonYaml() gp.tryloadFromEnv() + + gp.InitLogCfg() } func (gp *BaseTable) GetConfigDir() string { @@ -419,7 +423,7 @@ func ConvertRangeToIntSlice(rangeStr, sep string) []int { return ret } -func (gp *BaseTable) InitLogCfg(role string, id UniqueID) { +func (gp *BaseTable) InitLogCfg() { gp.Log = log.Config{} format, err := gp.Load("log.format") if err != nil { @@ -434,13 +438,29 @@ func (gp *BaseTable) InitLogCfg(role string, id UniqueID) { gp.Log.File.MaxSize = gp.ParseInt("log.file.maxSize") gp.Log.File.MaxBackups = gp.ParseInt("log.file.maxBackups") gp.Log.File.MaxDays = gp.ParseInt("log.file.maxAge") +} + +func (gp *BaseTable) SetLogConfig(f func(log.Config)) { + gp.LogConfigFunction = f +} + +func (gp *BaseTable) SetLogger(id UniqueID) { rootPath, err := gp.Load("log.file.rootPath") if err != nil { panic(err) } if len(rootPath) != 0 { - gp.Log.File.Filename = path.Join(rootPath, role+"-"+strconv.FormatInt(id, 10)+".log") + log.Debug("Set logger ", zap.Int64("id", id), zap.String("role", gp.RoleName)) + if id < 0 { + gp.Log.File.Filename = path.Join(rootPath, gp.RoleName+".log") + } else { + gp.Log.File.Filename = path.Join(rootPath, gp.RoleName+"-"+strconv.FormatInt(id, 10)+".log") + } } else { gp.Log.File.Filename = "" } + + if gp.LogConfigFunction != nil { + gp.LogConfigFunction(gp.Log) + } } diff --git a/internal/util/paramtable/basetable_test.go b/internal/util/paramtable/basetable_test.go index f1f5b88d79..8b4ac911f7 100644 --- a/internal/util/paramtable/basetable_test.go +++ b/internal/util/paramtable/basetable_test.go @@ -12,6 +12,7 @@ package paramtable import ( + "fmt" "os" "testing" @@ -224,16 +225,20 @@ func Test_ConvertRangeToIntSlice(t *testing.T) { }) } -func Test_InitLogCfg(t *testing.T) { - t.Run("TestInitLogCfg", func(t *testing.T) { - baseParams.InitLogCfg("rootcoord", 0) - assert.Equal(t, baseParams.Log.File.Filename, "") +func Test_SetLogger(t *testing.T) { + t.Run("TestSetLooger", func(t *testing.T) { + baseParams.RoleName = "rootcoord" + baseParams.Save("log.file.rootPath", ".") + baseParams.SetLogger(UniqueID(-1)) + fmt.Println(baseParams.Log.File.Filename) + assert.Equal(t, "rootcoord.log", baseParams.Log.File.Filename) - baseParams.Save("log.file.rootPath", "/") - baseParams.InitLogCfg("rootcoord", 0) - assert.Equal(t, baseParams.Log.File.Filename, "/rootcoord-0.log") + baseParams.RoleName = "datanode" + baseParams.SetLogger(UniqueID(1)) + assert.Equal(t, "datanode-1.log", baseParams.Log.File.Filename) - baseParams.InitLogCfg("datanode", 8) - assert.Equal(t, baseParams.Log.File.Filename, "/datanode-8.log") + baseParams.RoleName = "datanode" + baseParams.SetLogger(UniqueID(0)) + assert.Equal(t, "datanode-0.log", baseParams.Log.File.Filename) }) }