mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Change Log Name with NodeId (#8721)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
436b5d4d41
commit
2c2b800ff4
@ -88,11 +88,8 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone
|
||||
go func() {
|
||||
rootcoord.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
logutil.SetupLogger(&rootcoord.Params.Log)
|
||||
defer log.Sync()
|
||||
}
|
||||
|
||||
f := setLoggerFunc(localMsg)
|
||||
rootcoord.Params.SetLogConfig(f)
|
||||
factory := newMsgFactory(localMsg)
|
||||
var err error
|
||||
rc, err = components.NewRootCoord(ctx, factory)
|
||||
@ -120,11 +117,8 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string
|
||||
proxy.Params.InitAlias(alias)
|
||||
proxy.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
logutil.SetupLogger(&proxy.Params.Log)
|
||||
defer log.Sync()
|
||||
}
|
||||
|
||||
f := setLoggerFunc(localMsg)
|
||||
proxy.Params.SetLogConfig(f)
|
||||
factory := newMsgFactory(localMsg)
|
||||
var err error
|
||||
pn, err = components.NewProxy(ctx, factory)
|
||||
@ -151,11 +145,8 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon
|
||||
go func() {
|
||||
querycoord.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
logutil.SetupLogger(&querycoord.Params.Log)
|
||||
defer log.Sync()
|
||||
}
|
||||
|
||||
f := setLoggerFunc(localMsg)
|
||||
querycoord.Params.SetLogConfig(f)
|
||||
factory := newMsgFactory(localMsg)
|
||||
var err error
|
||||
qs, err = components.NewQueryCoord(ctx, factory)
|
||||
@ -183,11 +174,8 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st
|
||||
querynode.Params.InitAlias(alias)
|
||||
querynode.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
logutil.SetupLogger(&querynode.Params.Log)
|
||||
defer log.Sync()
|
||||
}
|
||||
|
||||
f := setLoggerFunc(localMsg)
|
||||
querynode.Params.SetLogConfig(f)
|
||||
factory := newMsgFactory(localMsg)
|
||||
var err error
|
||||
qn, err = components.NewQueryNode(ctx, factory)
|
||||
@ -214,11 +202,8 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone
|
||||
go func() {
|
||||
datacoord.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
logutil.SetupLogger(&datacoord.Params.Log)
|
||||
defer log.Sync()
|
||||
}
|
||||
|
||||
f := setLoggerFunc(localMsg)
|
||||
datacoord.Params.SetLogConfig(f)
|
||||
factory := newMsgFactory(localMsg)
|
||||
var err error
|
||||
ds, err = components.NewDataCoord(ctx, factory)
|
||||
@ -245,12 +230,8 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str
|
||||
go func() {
|
||||
datanode.Params.InitAlias(alias)
|
||||
datanode.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
logutil.SetupLogger(&datanode.Params.Log)
|
||||
defer log.Sync()
|
||||
}
|
||||
|
||||
f := setLoggerFunc(localMsg)
|
||||
datanode.Params.SetLogConfig(f)
|
||||
factory := newMsgFactory(localMsg)
|
||||
var err error
|
||||
dn, err = components.NewDataNode(ctx, factory)
|
||||
@ -277,11 +258,8 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compon
|
||||
go func() {
|
||||
indexcoord.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
logutil.SetupLogger(&indexcoord.Params.Log)
|
||||
defer log.Sync()
|
||||
}
|
||||
|
||||
f := setLoggerFunc(localMsg)
|
||||
indexcoord.Params.SetLogConfig(f)
|
||||
var err error
|
||||
is, err = components.NewIndexCoord(ctx)
|
||||
if err != nil {
|
||||
@ -308,11 +286,8 @@ func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias st
|
||||
indexnode.Params.InitAlias(alias)
|
||||
indexnode.Params.Init()
|
||||
|
||||
if !localMsg {
|
||||
logutil.SetupLogger(&indexnode.Params.Log)
|
||||
defer log.Sync()
|
||||
}
|
||||
|
||||
f := setLoggerFunc(localMsg)
|
||||
indexnode.Params.SetLogConfig(f)
|
||||
var err error
|
||||
in, err = components.NewIndexNode(ctx)
|
||||
if err != nil {
|
||||
@ -492,3 +467,15 @@ func (mr *MilvusRoles) Run(localMsg bool, alias string) {
|
||||
// some deferred Stop has race with context cancel
|
||||
cancel()
|
||||
}
|
||||
|
||||
func setLoggerFunc(localMsg bool) func(cfg log.Config) {
|
||||
if !localMsg {
|
||||
return func(cfg log.Config) {
|
||||
log.Info("Set log file to ", zap.String("path", cfg.File.Filename))
|
||||
logutil.SetupLogger(&cfg)
|
||||
defer log.Sync()
|
||||
}
|
||||
}
|
||||
// no need to setup logger for standalone
|
||||
return func(cfg log.Config) {}
|
||||
}
|
||||
|
||||
@ -93,7 +93,7 @@ func (p *ParamTable) Init() {
|
||||
p.initTimeTickChannelName()
|
||||
p.initSegmentInfoChannelName()
|
||||
p.initDataCoordSubscriptionName()
|
||||
p.initLogCfg()
|
||||
p.initRoleName()
|
||||
|
||||
p.initFlushStreamPosSubPath()
|
||||
p.initStatsStreamPosSubPath()
|
||||
@ -235,8 +235,8 @@ func (p *ParamTable) initDataCoordSubscriptionName() {
|
||||
p.DataCoordSubscriptionName = strings.Join(s, "-")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initLogCfg() {
|
||||
p.InitLogCfg("datacoord", 0)
|
||||
func (p *ParamTable) initRoleName() {
|
||||
p.RoleName = "datacoord"
|
||||
}
|
||||
|
||||
func (p *ParamTable) initFlushStreamPosSubPath() {
|
||||
|
||||
@ -185,6 +185,7 @@ func (s *Server) Register() error {
|
||||
}
|
||||
s.liveCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
|
||||
Params.NodeID = s.session.ServerID
|
||||
Params.SetLogger(typeutil.UniqueID(-1))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -162,6 +162,7 @@ func (node *DataNode) Register() error {
|
||||
node.liveCh = node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
|
||||
Params.NodeID = node.session.ServerID
|
||||
node.NodeID = node.session.ServerID
|
||||
Params.SetLogger(Params.NodeID)
|
||||
// Start node watch node
|
||||
go node.StartWatchChannels(node.ctx)
|
||||
|
||||
|
||||
@ -127,7 +127,7 @@ func (p *ParamTable) Init() {
|
||||
p.initMinioUseSSL()
|
||||
p.initMinioBucketName()
|
||||
|
||||
p.initLogCfg()
|
||||
p.initRoleName()
|
||||
}
|
||||
|
||||
// ==== DataNode internal components configs ====
|
||||
@ -277,6 +277,6 @@ func (p *ParamTable) initMinioBucketName() {
|
||||
p.MinioBucketName = bucketName
|
||||
}
|
||||
|
||||
func (p *ParamTable) initLogCfg() {
|
||||
p.InitLogCfg("datanode", p.NodeID)
|
||||
func (p *ParamTable) initRoleName() {
|
||||
p.RoleName = "datanode"
|
||||
}
|
||||
|
||||
@ -113,6 +113,7 @@ func (i *IndexCoord) Register() error {
|
||||
return errors.New("failed to initialize session")
|
||||
}
|
||||
i.liveCh = i.session.Init(typeutil.IndexCoordRole, Params.Address, true)
|
||||
Params.SetLogger(typeutil.UniqueID(-1))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -56,7 +56,6 @@ func (pt *ParamTable) Init() {
|
||||
panic(err)
|
||||
}*/
|
||||
|
||||
pt.initLogCfg()
|
||||
pt.initEtcdEndpoints()
|
||||
pt.initMetaRootPath()
|
||||
pt.initKvRootPath()
|
||||
@ -66,6 +65,7 @@ func (pt *ParamTable) Init() {
|
||||
pt.initMinIOUseSSL()
|
||||
pt.initMinioBucketName()
|
||||
pt.initIndexRootPath()
|
||||
pt.initRoleName()
|
||||
}
|
||||
|
||||
// InitOnce is used to initialize configuration items, and it will only be called once.
|
||||
@ -158,6 +158,6 @@ func (pt *ParamTable) initIndexRootPath() {
|
||||
pt.IndexRootPath = path.Join(rootPath, "index_files")
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initLogCfg() {
|
||||
pt.InitLogCfg("indexcoord", 0)
|
||||
func (pt *ParamTable) initRoleName() {
|
||||
pt.RoleName = "indexcoord"
|
||||
}
|
||||
|
||||
@ -106,8 +106,7 @@ func (i *IndexNode) Register() error {
|
||||
}
|
||||
i.liveCh = i.session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
|
||||
Params.NodeID = i.session.ServerID
|
||||
//TODO reset logger
|
||||
//Params.initLogCfg()
|
||||
Params.SetLogger(Params.NodeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -92,7 +92,7 @@ func (pt *ParamTable) initParams() {
|
||||
pt.initEtcdEndpoints()
|
||||
pt.initMetaRootPath()
|
||||
pt.initIndexRootPath()
|
||||
pt.initLogCfg()
|
||||
pt.initRoleName()
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initMinIOAddress() {
|
||||
@ -166,8 +166,8 @@ func (pt *ParamTable) initMinioBucketName() {
|
||||
pt.MinioBucketName = bucketName
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initLogCfg() {
|
||||
pt.InitLogCfg("indexnode", pt.NodeID)
|
||||
func (pt *ParamTable) initRoleName() {
|
||||
pt.RoleName = "indexnode"
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initKnowhereSimdType() {
|
||||
|
||||
@ -12,7 +12,6 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -66,7 +65,6 @@ type ParamTable struct {
|
||||
MaxTaskNum int64
|
||||
|
||||
PulsarMaxMessageSize int
|
||||
RoleName string
|
||||
}
|
||||
|
||||
var Params ParamTable
|
||||
@ -107,7 +105,7 @@ func (pt *ParamTable) Init() {
|
||||
|
||||
pt.initMaxTaskNum()
|
||||
|
||||
Params.initLogCfg()
|
||||
pt.initRoleName()
|
||||
}
|
||||
|
||||
func (pt *ParamTable) InitAlias(alias string) {
|
||||
@ -273,12 +271,8 @@ func (pt *ParamTable) initPulsarMaxMessageSize() {
|
||||
}
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initLogCfg() {
|
||||
pt.InitLogCfg("proxy", pt.ProxyID)
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initRoleName() {
|
||||
pt.RoleName = fmt.Sprintf("%s-%s", "Proxy", pt.Alias)
|
||||
pt.RoleName = "proxy"
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initEtcdEndpoints() {
|
||||
|
||||
@ -99,6 +99,7 @@ func (node *Proxy) Register() error {
|
||||
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
|
||||
node.session.Init(typeutil.ProxyRole, Params.NetworkAddress, false)
|
||||
Params.ProxyID = node.session.ServerID
|
||||
Params.SetLogger(Params.ProxyID)
|
||||
Params.initProxySubName()
|
||||
// TODO Reset the logger
|
||||
//Params.initLogCfg()
|
||||
|
||||
@ -12,7 +12,6 @@
|
||||
package querycoord
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -40,8 +39,6 @@ type ParamTable struct {
|
||||
// timetick
|
||||
TimeTickChannelName string
|
||||
|
||||
RoleName string
|
||||
|
||||
// channels
|
||||
ClusterChannelPrefix string
|
||||
SearchChannelPrefix string
|
||||
@ -80,8 +77,6 @@ func (p *ParamTable) Init() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
p.initLogCfg()
|
||||
|
||||
p.initQueryCoordAddress()
|
||||
p.initRoleName()
|
||||
|
||||
@ -113,10 +108,6 @@ func (p *ParamTable) initQueryCoordAddress() {
|
||||
p.Address = url
|
||||
}
|
||||
|
||||
func (p *ParamTable) initRoleName() {
|
||||
p.RoleName = fmt.Sprintf("%s-%d", "QueryCoord", p.NodeID)
|
||||
}
|
||||
|
||||
func (p *ParamTable) initClusterMsgChannelPrefix() {
|
||||
config, err := p.Load("msgChannel.chanNamePrefix.cluster")
|
||||
if err != nil {
|
||||
@ -238,6 +229,6 @@ func (p *ParamTable) initMinioBucketName() {
|
||||
p.MinioBucketName = bucketName
|
||||
}
|
||||
|
||||
func (p *ParamTable) initLogCfg() {
|
||||
p.InitLogCfg("querycoord", 0)
|
||||
func (p *ParamTable) initRoleName() {
|
||||
p.RoleName = "querycoord"
|
||||
}
|
||||
|
||||
@ -79,6 +79,7 @@ func (qc *QueryCoord) Register() error {
|
||||
qc.session = sessionutil.NewSession(qc.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
|
||||
qc.liveCh = qc.session.Init(typeutil.QueryCoordRole, Params.Address, true)
|
||||
Params.NodeID = uint64(qc.session.ServerID)
|
||||
Params.SetLogger(typeutil.UniqueID(-1))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -133,7 +133,7 @@ func (p *ParamTable) Init() {
|
||||
p.initSegcoreChunkRows()
|
||||
p.initKnowhereSimdType()
|
||||
|
||||
p.initLogCfg()
|
||||
p.initRoleName()
|
||||
}
|
||||
|
||||
func (p *ParamTable) initCacheSize() {
|
||||
@ -320,6 +320,6 @@ func (p *ParamTable) initKnowhereSimdType() {
|
||||
p.SimdType = simdType
|
||||
}
|
||||
|
||||
func (p *ParamTable) initLogCfg() {
|
||||
p.InitLogCfg("querynode", p.QueryNodeID)
|
||||
func (p *ParamTable) initRoleName() {
|
||||
p.RoleName = "querynode"
|
||||
}
|
||||
|
||||
@ -113,6 +113,7 @@ func (node *QueryNode) Register() error {
|
||||
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
|
||||
node.liveCh = node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false)
|
||||
Params.QueryNodeID = node.session.ServerID
|
||||
Params.SetLogger(Params.QueryNodeID)
|
||||
log.Debug("query nodeID", zap.Int64("nodeID", Params.QueryNodeID))
|
||||
log.Debug("query node address", zap.String("address", node.session.Address))
|
||||
|
||||
|
||||
@ -52,8 +52,6 @@ type ParamTable struct {
|
||||
|
||||
CreatedTime time.Time
|
||||
UpdatedTime time.Time
|
||||
|
||||
RoleName string
|
||||
}
|
||||
|
||||
// InitOnce initialize once
|
||||
@ -93,7 +91,6 @@ func (p *ParamTable) Init() {
|
||||
p.initTimeout()
|
||||
p.initTimeTickInterval()
|
||||
|
||||
p.initLogCfg()
|
||||
p.initRoleName()
|
||||
}
|
||||
|
||||
@ -217,10 +214,6 @@ func (p *ParamTable) initTimeTickInterval() {
|
||||
p.TimeTickInterval = p.ParseInt("rootcoord.timeTickInterval")
|
||||
}
|
||||
|
||||
func (p *ParamTable) initLogCfg() {
|
||||
p.InitLogCfg("rootcoord", 0)
|
||||
}
|
||||
|
||||
func (p *ParamTable) initRoleName() {
|
||||
p.RoleName = "RootCoord"
|
||||
p.RoleName = "rootcoord"
|
||||
}
|
||||
|
||||
@ -864,6 +864,7 @@ func (c *Core) Register() error {
|
||||
return fmt.Errorf("session is nil, maybe the etcd client connection fails")
|
||||
}
|
||||
c.sessCloseCh = c.session.Init(typeutil.RootCoordRole, Params.Address, true)
|
||||
Params.SetLogger(typeutil.UniqueID(-1))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -44,7 +44,9 @@ type BaseTable struct {
|
||||
params *memkv.MemoryKV
|
||||
configDir string
|
||||
|
||||
Log log.Config
|
||||
RoleName string
|
||||
Log log.Config
|
||||
LogConfigFunction func(log.Config)
|
||||
}
|
||||
|
||||
func (gp *BaseTable) Init() {
|
||||
@ -59,6 +61,8 @@ func (gp *BaseTable) Init() {
|
||||
gp.loadFromCommonYaml()
|
||||
|
||||
gp.tryloadFromEnv()
|
||||
|
||||
gp.InitLogCfg()
|
||||
}
|
||||
|
||||
func (gp *BaseTable) GetConfigDir() string {
|
||||
@ -419,7 +423,7 @@ func ConvertRangeToIntSlice(rangeStr, sep string) []int {
|
||||
return ret
|
||||
}
|
||||
|
||||
func (gp *BaseTable) InitLogCfg(role string, id UniqueID) {
|
||||
func (gp *BaseTable) InitLogCfg() {
|
||||
gp.Log = log.Config{}
|
||||
format, err := gp.Load("log.format")
|
||||
if err != nil {
|
||||
@ -434,13 +438,29 @@ func (gp *BaseTable) InitLogCfg(role string, id UniqueID) {
|
||||
gp.Log.File.MaxSize = gp.ParseInt("log.file.maxSize")
|
||||
gp.Log.File.MaxBackups = gp.ParseInt("log.file.maxBackups")
|
||||
gp.Log.File.MaxDays = gp.ParseInt("log.file.maxAge")
|
||||
}
|
||||
|
||||
func (gp *BaseTable) SetLogConfig(f func(log.Config)) {
|
||||
gp.LogConfigFunction = f
|
||||
}
|
||||
|
||||
func (gp *BaseTable) SetLogger(id UniqueID) {
|
||||
rootPath, err := gp.Load("log.file.rootPath")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if len(rootPath) != 0 {
|
||||
gp.Log.File.Filename = path.Join(rootPath, role+"-"+strconv.FormatInt(id, 10)+".log")
|
||||
log.Debug("Set logger ", zap.Int64("id", id), zap.String("role", gp.RoleName))
|
||||
if id < 0 {
|
||||
gp.Log.File.Filename = path.Join(rootPath, gp.RoleName+".log")
|
||||
} else {
|
||||
gp.Log.File.Filename = path.Join(rootPath, gp.RoleName+"-"+strconv.FormatInt(id, 10)+".log")
|
||||
}
|
||||
} else {
|
||||
gp.Log.File.Filename = ""
|
||||
}
|
||||
|
||||
if gp.LogConfigFunction != nil {
|
||||
gp.LogConfigFunction(gp.Log)
|
||||
}
|
||||
}
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
package paramtable
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
@ -224,16 +225,20 @@ func Test_ConvertRangeToIntSlice(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func Test_InitLogCfg(t *testing.T) {
|
||||
t.Run("TestInitLogCfg", func(t *testing.T) {
|
||||
baseParams.InitLogCfg("rootcoord", 0)
|
||||
assert.Equal(t, baseParams.Log.File.Filename, "")
|
||||
func Test_SetLogger(t *testing.T) {
|
||||
t.Run("TestSetLooger", func(t *testing.T) {
|
||||
baseParams.RoleName = "rootcoord"
|
||||
baseParams.Save("log.file.rootPath", ".")
|
||||
baseParams.SetLogger(UniqueID(-1))
|
||||
fmt.Println(baseParams.Log.File.Filename)
|
||||
assert.Equal(t, "rootcoord.log", baseParams.Log.File.Filename)
|
||||
|
||||
baseParams.Save("log.file.rootPath", "/")
|
||||
baseParams.InitLogCfg("rootcoord", 0)
|
||||
assert.Equal(t, baseParams.Log.File.Filename, "/rootcoord-0.log")
|
||||
baseParams.RoleName = "datanode"
|
||||
baseParams.SetLogger(UniqueID(1))
|
||||
assert.Equal(t, "datanode-1.log", baseParams.Log.File.Filename)
|
||||
|
||||
baseParams.InitLogCfg("datanode", 8)
|
||||
assert.Equal(t, baseParams.Log.File.Filename, "/datanode-8.log")
|
||||
baseParams.RoleName = "datanode"
|
||||
baseParams.SetLogger(UniqueID(0))
|
||||
assert.Equal(t, "datanode-0.log", baseParams.Log.File.Filename)
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user