mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Improve some DataNode service logs (#19648)
/kind improvement Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com> Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
This commit is contained in:
parent
49daee2e17
commit
b24e97e6a8
@ -161,7 +161,7 @@ func (node *DataNode) SetEtcdClient(etcdCli *clientv3.Client) {
|
||||
func (node *DataNode) SetRootCoord(rc types.RootCoord) error {
|
||||
switch {
|
||||
case rc == nil, node.rootCoord != nil:
|
||||
return errors.New("Nil parameter or repeatly set")
|
||||
return errors.New("nil parameter or repeatedly set")
|
||||
default:
|
||||
node.rootCoord = rc
|
||||
return nil
|
||||
@ -172,7 +172,7 @@ func (node *DataNode) SetRootCoord(rc types.RootCoord) error {
|
||||
func (node *DataNode) SetDataCoord(ds types.DataCoord) error {
|
||||
switch {
|
||||
case ds == nil, node.dataCoord != nil:
|
||||
return errors.New("Nil parameter or repeatly set")
|
||||
return errors.New("nil parameter or repeatedly set")
|
||||
default:
|
||||
node.dataCoord = ds
|
||||
return nil
|
||||
@ -225,32 +225,32 @@ func (node *DataNode) initRateCollector() error {
|
||||
|
||||
// Init function does nothing now.
|
||||
func (node *DataNode) Init() error {
|
||||
log.Info("DataNode Init",
|
||||
log.Info("DataNode server initializing",
|
||||
zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick),
|
||||
)
|
||||
if err := node.initSession(); err != nil {
|
||||
log.Error("DataNode init session failed", zap.Error(err))
|
||||
log.Error("DataNode server init session failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err := node.initRateCollector()
|
||||
if err != nil {
|
||||
log.Error("DataNode init rateCollector failed", zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.Error(err))
|
||||
log.Error("DataNode server init rateCollector failed", zap.Int64("node ID", Params.QueryNodeCfg.GetNodeID()), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Info("DataNode init rateCollector done", zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
|
||||
log.Info("DataNode server init rateCollector done", zap.Int64("node ID", Params.QueryNodeCfg.GetNodeID()))
|
||||
|
||||
idAllocator, err := allocator2.NewIDAllocator(node.ctx, node.rootCoord, Params.DataNodeCfg.GetNodeID())
|
||||
if err != nil {
|
||||
log.Error("failed to create id allocator",
|
||||
zap.Error(err),
|
||||
zap.String("role", typeutil.DataNodeRole), zap.Int64("DataNodeID", Params.DataNodeCfg.GetNodeID()))
|
||||
zap.String("role", typeutil.DataNodeRole), zap.Int64("DataNode ID", Params.DataNodeCfg.GetNodeID()))
|
||||
return err
|
||||
}
|
||||
node.rowIDAllocator = idAllocator
|
||||
|
||||
node.factory.Init(&Params)
|
||||
log.Info("DataNode Init successfully",
|
||||
log.Info("DataNode server init succeeded",
|
||||
zap.String("MsgChannelSubName", Params.CommonCfg.DataNodeSubName))
|
||||
|
||||
return nil
|
||||
|
||||
@ -149,7 +149,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
||||
|
||||
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
|
||||
if err := s.grpcServer.Serve(lis); err != nil {
|
||||
log.Warn("DataNode Start Grpc Failed!")
|
||||
log.Warn("DataNode failed to start gRPC")
|
||||
s.grpcErrChan <- err
|
||||
}
|
||||
|
||||
@ -170,14 +170,15 @@ func (s *Server) SetDataCoordInterface(ds types.DataCoord) error {
|
||||
// Run initializes and starts Datanode's grpc service.
|
||||
func (s *Server) Run() error {
|
||||
if err := s.init(); err != nil {
|
||||
// errors are propagated upstream as panic.
|
||||
return err
|
||||
}
|
||||
log.Debug("DataNode init done ...")
|
||||
|
||||
log.Info("DataNode gRPC services successfully initialized")
|
||||
if err := s.start(); err != nil {
|
||||
// errors are propagated upstream as panic.
|
||||
return err
|
||||
}
|
||||
log.Debug("DataNode start done ...")
|
||||
log.Info("DataNode gRPC services successfully started")
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -226,7 +227,7 @@ func (s *Server) init() error {
|
||||
Params.InitOnce(typeutil.DataNodeRole)
|
||||
if !funcutil.CheckPortAvailable(Params.Port) {
|
||||
Params.Port = funcutil.GetAvailablePort()
|
||||
log.Warn("DataNode get available port when init", zap.Int("Port", Params.Port))
|
||||
log.Warn("DataNode found available port during init", zap.Int("port", Params.Port))
|
||||
}
|
||||
dn.Params.InitOnce()
|
||||
dn.Params.DataNodeCfg.Port = Params.Port
|
||||
@ -234,15 +235,14 @@ func (s *Server) init() error {
|
||||
|
||||
etcdCli, err := etcd.GetEtcdClient(&dn.Params.EtcdCfg)
|
||||
if err != nil {
|
||||
log.Debug("DataNode connect to etcd failed", zap.Error(err))
|
||||
log.Error("failed to connect to etcd", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
s.etcdCli = etcdCli
|
||||
s.SetEtcdClient(s.etcdCli)
|
||||
closer := trace.InitTracing(fmt.Sprintf("data_node ip: %s, port: %d", Params.IP, Params.Port))
|
||||
closer := trace.InitTracing(fmt.Sprintf("DataNode IP: %s, port: %d", Params.IP, Params.Port))
|
||||
s.closer = closer
|
||||
addr := Params.IP + ":" + strconv.Itoa(Params.Port)
|
||||
log.Debug("DataNode address", zap.String("address", addr))
|
||||
log.Info("DataNode address", zap.String("address", Params.IP+":"+strconv.Itoa(Params.Port)))
|
||||
|
||||
err = s.startGrpc()
|
||||
if err != nil {
|
||||
@ -251,65 +251,62 @@ func (s *Server) init() error {
|
||||
|
||||
// --- RootCoord Client ---
|
||||
if s.newRootCoordClient != nil {
|
||||
log.Debug("Init root coord client ...")
|
||||
log.Info("initializing RootCoord client for DataNode")
|
||||
rootCoordClient, err := s.newRootCoordClient(dn.Params.EtcdCfg.MetaRootPath, s.etcdCli)
|
||||
if err != nil {
|
||||
log.Debug("DataNode newRootCoordClient failed", zap.Error(err))
|
||||
log.Error("failed to create new RootCoord client", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
if err = rootCoordClient.Init(); err != nil {
|
||||
log.Debug("DataNode rootCoordClient Init failed", zap.Error(err))
|
||||
log.Error("failed to init RootCoord client", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
if err = rootCoordClient.Start(); err != nil {
|
||||
log.Debug("DataNode rootCoordClient Start failed", zap.Error(err))
|
||||
log.Error("failed to start RootCoord client", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
err = funcutil.WaitForComponentHealthy(ctx, rootCoordClient, "RootCoord", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("DataNode wait rootCoord ready failed", zap.Error(err))
|
||||
if err = funcutil.WaitForComponentHealthy(ctx, rootCoordClient, "RootCoord", 1000000, time.Millisecond*200); err != nil {
|
||||
log.Error("failed to wait for RootCoord client to be ready", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
log.Debug("DataNode rootCoord is ready")
|
||||
log.Info("RootCoord client is ready for DataNode")
|
||||
if err = s.SetRootCoordInterface(rootCoordClient); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Data Server Client ---
|
||||
// --- DataCoord Client ---
|
||||
if s.newDataCoordClient != nil {
|
||||
log.Debug("DataNode Init data service client ...")
|
||||
log.Debug("starting DataCoord client for DataNode")
|
||||
dataCoordClient, err := s.newDataCoordClient(dn.Params.EtcdCfg.MetaRootPath, s.etcdCli)
|
||||
if err != nil {
|
||||
log.Debug("DataNode newDataCoordClient failed", zap.Error(err))
|
||||
log.Error("failed to create new DataCoord client", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
if err = dataCoordClient.Init(); err != nil {
|
||||
log.Debug("DataNode newDataCoord failed", zap.Error(err))
|
||||
log.Error("failed to init DataCoord client", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
if err = dataCoordClient.Start(); err != nil {
|
||||
log.Debug("DataNode dataCoordClient Start failed", zap.Error(err))
|
||||
log.Error("failed to start DataCoord client", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
err = funcutil.WaitForComponentInitOrHealthy(ctx, dataCoordClient, "DataCoord", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("DataNode wait dataCoordClient ready failed", zap.Error(err))
|
||||
if err = funcutil.WaitForComponentInitOrHealthy(ctx, dataCoordClient, "DataCoord", 1000000, time.Millisecond*200); err != nil {
|
||||
log.Error("failed to wait for DataCoord client to be ready", zap.Error(err))
|
||||
panic(err)
|
||||
}
|
||||
log.Debug("DataNode dataCoord is ready")
|
||||
log.Info("DataCoord client is ready for DataNode")
|
||||
if err = s.SetDataCoordInterface(dataCoordClient); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
s.datanode.UpdateStateCode(internalpb.StateCode_Initializing)
|
||||
|
||||
if err := s.datanode.Init(); err != nil {
|
||||
log.Warn("datanode init error: ", zap.Error(err))
|
||||
log.Error("failed to init DataNode server", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("DataNode", zap.Any("State", internalpb.StateCode_Initializing))
|
||||
log.Info("current DataNode state", zap.Any("state", s.datanode.GetStateCode()))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -320,7 +317,7 @@ func (s *Server) start() error {
|
||||
}
|
||||
err := s.datanode.Register()
|
||||
if err != nil {
|
||||
log.Debug("DataNode Register etcd failed", zap.Error(err))
|
||||
log.Debug("failed to register to Etcd", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user