diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index bc0a912b79..0794caec63 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -317,9 +317,7 @@ func (s *Server) startServerLoop() { go s.startWatchService(s.serverLoopCtx) go s.startFlushLoop(s.serverLoopCtx) go s.session.LivenessCheck(s.serverLoopCtx, func() { - if err := s.Stop(); err != nil { - log.Error("failed to stop server", zap.Error(err)) - } + log.Fatal("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID)) }) } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index b6cf92f8b4..9c4f7d7d80 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -173,10 +173,7 @@ func (node *DataNode) Register() error { go node.StartWatchChannels(node.ctx) // Start liveness check go node.session.LivenessCheck(node.ctx, func() { - err := node.Stop() - if err != nil { - log.Warn("node stop failed", zap.Error(err)) - } + log.Fatal("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) }) Params.initMsgChannelSubName() diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index d869b07b3b..422eb0b826 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -253,7 +253,7 @@ func (i *IndexCoord) Start() error { go i.watchMetaLoop() go i.session.LivenessCheck(i.loopCtx, func() { - i.Stop() + log.Fatal("Index Coord disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) }) startErr = i.sched.Start() diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 13d2989e8f..c9bda41b8d 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -185,7 +185,7 @@ func (i *IndexNode) Start() error { //start liveness check go i.session.LivenessCheck(i.loopCtx, func() { - i.Stop() + log.Fatal("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) }) i.UpdateStateCode(internalpb.StateCode_Healthy) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 1a0567115d..a2c6d9c566 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -105,6 +105,9 @@ func (node *Proxy) Register() error { Params.ProxyID = node.session.ServerID Params.SetLogger(Params.ProxyID) Params.initProxySubName() + go node.session.LivenessCheck(node.ctx, func() { + log.Fatal("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) + }) // TODO Reset the logger //Params.initLogCfg() return nil diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index c0fb08719a..10ab21d23b 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -183,7 +183,7 @@ func (qc *QueryCoord) Start() error { go qc.watchHandoffSegmentLoop() go qc.session.LivenessCheck(qc.loopCtx, func() { - qc.Stop() + log.Fatal("Query Coord disconnected from etcd, process will exit", zap.Int64("Server Id", qc.session.ServerID)) }) return nil diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 828cd91b65..c4beac1042 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -118,7 +118,7 @@ func (node *QueryNode) Register() error { node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false) // start liveness check go node.session.LivenessCheck(node.queryNodeLoopCtx, func() { - node.Stop() + log.Fatal("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) }) Params.QueryNodeID = node.session.ServerID diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 489f662d2a..012d26b338 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -16,7 +16,6 @@ import ( "encoding/json" "fmt" "math/rand" - "os" "strconv" "sync" "sync/atomic" @@ -1103,11 +1102,7 @@ func (c *Core) Start() error { go c.checkFlushedSegmentsLoop() go c.session.LivenessCheck(c.ctx, func() { - log.Error("rootcoord disconnected from etcd, process will exit in 1 second") - go func() { - time.Sleep(time.Second) - os.Exit(-1) - }() + log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) }) Params.CreatedTime = time.Now() Params.UpdatedTime = time.Now() diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 33cf3d4e9d..339d20ed7b 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -241,12 +241,12 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes return case resp, ok := <-ch: if !ok { - log.Debug("session keepalive channel closed") + log.Warn("session keepalive channel closed") close(failCh) return } if resp == nil { - log.Debug("session keepalive response failed") + log.Warn("session keepalive response failed") close(failCh) return }