From 2df2d488e18d05330d3013bbb1844f685e33c19e Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Sat, 30 Oct 2021 10:24:38 +0800 Subject: [PATCH] Fix Unit test fatal fail (#10933) Signed-off-by: xiaofan-luan --- internal/datacoord/server.go | 5 ++++- internal/datanode/data_node.go | 5 ++++- internal/indexcoord/index_coord.go | 5 ++++- internal/indexnode/indexnode.go | 5 ++++- internal/proxy/proxy.go | 5 ++++- internal/querycoord/query_coord.go | 5 ++++- internal/querynode/query_node.go | 5 ++++- internal/rootcoord/root_coord.go | 3 +++ 8 files changed, 31 insertions(+), 7 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 490b6ce08b..9d8c9b3379 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -317,7 +317,10 @@ func (s *Server) startServerLoop() { s.startWatchService(s.serverLoopCtx) s.startFlushLoop(s.serverLoopCtx) go s.session.LivenessCheck(s.serverLoopCtx, func() { - log.Fatal("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID)) + log.Error("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID)) + if err := s.Stop(); err != nil { + log.Fatal("failed to stop server", zap.Error(err)) + } }) } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 9c4f7d7d80..aac980308e 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -173,7 +173,10 @@ func (node *DataNode) Register() error { go node.StartWatchChannels(node.ctx) // Start liveness check go node.session.LivenessCheck(node.ctx, func() { - log.Fatal("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) + log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) + if err := node.Stop(); err != nil { + log.Fatal("failed to stop server", zap.Error(err)) + } }) Params.initMsgChannelSubName() diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 422eb0b826..6bf3626756 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -253,7 +253,10 @@ func (i *IndexCoord) Start() error { go i.watchMetaLoop() go i.session.LivenessCheck(i.loopCtx, func() { - log.Fatal("Index Coord disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) + log.Error("Index Coord disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) + if err := i.Stop(); err != nil { + log.Fatal("failed to stop server", zap.Error(err)) + } }) startErr = i.sched.Start() diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index c9bda41b8d..5d576bc004 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -185,7 +185,10 @@ func (i *IndexNode) Start() error { //start liveness check go i.session.LivenessCheck(i.loopCtx, func() { - log.Fatal("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) + log.Error("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) + if err := i.Stop(); err != nil { + log.Fatal("failed to stop server", zap.Error(err)) + } }) i.UpdateStateCode(internalpb.StateCode_Healthy) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 31615262b2..e76e1c85b9 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -109,7 +109,10 @@ func (node *Proxy) Register() error { Params.SetLogger(Params.ProxyID) Params.initProxySubName() go node.session.LivenessCheck(node.ctx, func() { - log.Fatal("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) + log.Error("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) + if err := node.Stop(); err != nil { + log.Fatal("failed to stop server", zap.Error(err)) + } }) // TODO Reset the logger //Params.initLogCfg() diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 10ab21d23b..1313738a5c 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -183,7 +183,10 @@ func (qc *QueryCoord) Start() error { go qc.watchHandoffSegmentLoop() go qc.session.LivenessCheck(qc.loopCtx, func() { - log.Fatal("Query Coord disconnected from etcd, process will exit", zap.Int64("Server Id", qc.session.ServerID)) + log.Error("Query Coord disconnected from etcd, process will exit", zap.Int64("Server Id", qc.session.ServerID)) + if err := qc.Stop(); err != nil { + log.Fatal("failed to stop server", zap.Error(err)) + } }) return nil diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 9c488af844..40770ad59a 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -118,7 +118,10 @@ func (node *QueryNode) Register() error { node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false) // start liveness check go node.session.LivenessCheck(node.queryNodeLoopCtx, func() { - log.Fatal("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) + log.Error("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) + if err := node.Stop(); err != nil { + log.Fatal("failed to stop server", zap.Error(err)) + } }) Params.QueryNodeID = node.session.ServerID diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 6105c81f01..6d5aefeeb1 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1103,6 +1103,9 @@ func (c *Core) Start() error { go c.checkFlushedSegmentsLoop() go c.session.LivenessCheck(c.ctx, func() { log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) + if err := c.Stop(); err != nil { + log.Fatal("failed to stop server", zap.Error(err)) + } }) Params.CreatedTime = time.Now() Params.UpdatedTime = time.Now()