diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index e493edcd41..6d38c58f32 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -98,9 +98,9 @@ type Server struct { flushCh chan UniqueID msFactory msgstream.Factory - session *sessionutil.Session - activeCh <-chan bool - eventCh <-chan *sessionutil.SessionEvent + session *sessionutil.Session + liveCh <-chan bool + eventCh <-chan *sessionutil.SessionEvent dataNodeCreator DataNodeCreatorFunc rootCoordClientCreator RootCoordCreatorFunc @@ -183,7 +183,7 @@ func (s *Server) Register() error { if s.session == nil { return errors.New("failed to initialize session") } - s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true) + s.liveCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true) Params.NodeID = s.session.ServerID return nil } @@ -304,12 +304,14 @@ func (s *Server) initMeta() error { func (s *Server) startServerLoop() { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - s.serverLoopWg.Add(5) + s.serverLoopWg.Add(4) go s.startStatsChannel(s.serverLoopCtx) go s.startDataNodeTtLoop(s.serverLoopCtx) go s.startWatchService(s.serverLoopCtx) - go s.startActiveCheck(s.serverLoopCtx) go s.startFlushLoop(s.serverLoopCtx) + go s.session.LivenessCheck(s.serverLoopCtx, s.liveCh, func() { + s.Stop() + }) } func (s *Server) startStatsChannel(ctx context.Context) { @@ -469,26 +471,6 @@ func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.Sess } } -func (s *Server) startActiveCheck(ctx context.Context) { - defer logutil.LogPanic() - defer s.serverLoopWg.Done() - - for { - select { - case _, ok := <-s.activeCh: - if ok { - continue - } - go func() { s.Stop() }() - log.Debug("disconnect with etcd and shutdown data coordinator") - return - case <-ctx.Done(): - log.Debug("connection check shutdown") - return - } - } -} - func (s *Server) startFlushLoop(ctx context.Context) { defer logutil.LogPanic() defer s.serverLoopWg.Done() diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 764b3001e6..8b02b2fc82 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -95,6 +95,7 @@ type DataNode struct { dataCoord types.DataCoord session *sessionutil.Session + liveCh <-chan bool kvClient *etcdkv.EtcdKV closer io.Closer @@ -149,8 +150,7 @@ func (node *DataNode) SetDataCoordInterface(ds types.DataCoord) error { // Register register datanode to etcd func (node *DataNode) Register() error { node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints) - activeCh := node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false) - go node.etcdAliveCheck(node.ctx, activeCh) + node.liveCh = node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false) Params.NodeID = node.session.ServerID node.NodeID = node.session.ServerID // Start node watch node @@ -202,26 +202,6 @@ func (node *DataNode) StartWatchChannels(ctx context.Context) { } } -// etcdAliveCheck performs alive check for etcd connection -// will close datanode if check fails -func (node *DataNode) etcdAliveCheck(ctx context.Context, ch <-chan bool) { - for { - select { - case _, ok := <-ch: - if ok { // ok means still alive do nothing - continue - } - // not ok, disconnect - go func() { node.Stop() }() - log.Warn("disconnected from etcd, shuting down datanode", zap.Int64("ServerID", node.NodeID)) - return - case <-ctx.Done(): - log.Warn("etcd alive check quit, due to ctx done") - return - } - } -} - // handleChannelEvt handles event from kv watch event func (node *DataNode) handleChannelEvt(evt *clientv3.Event) { switch evt.Type { @@ -368,6 +348,10 @@ func (node *DataNode) Start() error { go node.BackGroundGC(node.clearSignal) + go node.session.LivenessCheck(node.ctx, node.liveCh, func() { + node.Stop() + }) + Params.CreatedTime = time.Now() Params.UpdatedTime = time.Now() diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 349f63f576..db9f1efa5f 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -449,31 +449,6 @@ func TestDataNode(t *testing.T) { node.Stop() } -func TestDataNodeEtcdAlive(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - node := newIDLEDataNodeMock(ctx) - node.Init() - node.Start() - - mockCh := make(chan bool) - go node.etcdAliveCheck(ctx, mockCh) - - mockCh <- true - flag := false - select { - case <-node.ctx.Done(): - flag = true - default: - } - assert.False(t, flag) - - close(mockCh) - - _, ok := <-node.ctx.Done() - assert.False(t, ok) -} - func TestWatchChannel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) node := newIDLEDataNodeMock(ctx)