diff --git a/cmd/tools/migration/migration/runner.go b/cmd/tools/migration/migration/runner.go index 8d87078f81..cf22fea63c 100644 --- a/cmd/tools/migration/migration/runner.go +++ b/cmd/tools/migration/migration/runner.go @@ -161,7 +161,7 @@ func (r *Runner) CheckSessions() error { func (r *Runner) RegisterSession() error { r.session.Register() - go r.session.LivenessCheck(r.ctx, func() {}) + r.session.LivenessCheck(r.ctx, func() {}) return nil } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 193975e538..1a957e12e2 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -228,7 +228,7 @@ func (s *Server) Register() error { } metrics.NumNodes.WithLabelValues(strconv.FormatInt(s.session.ServerID, 10), typeutil.DataCoordRole).Inc() log.Info("DataCoord Register Finished") - go s.session.LivenessCheck(s.serverLoopCtx, func() { + s.session.LivenessCheck(s.serverLoopCtx, func() { logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID)) if err := s.Stop(); err != nil { logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err)) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 70e23c89b3..8c93adb5a2 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3642,11 +3642,12 @@ func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts .. assert.Nil(t, err) svr.meta = meta - err = svr.Start() - assert.Nil(t, err) err = svr.Register() assert.Nil(t, err) + err = svr.Start() + assert.Nil(t, err) + // Stop channal watch state watcher in tests if svr.channelManager != nil && svr.channelManager.stopChecker != nil { svr.channelManager.stopChecker() @@ -3692,11 +3693,13 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server { err = svr.Init() assert.Nil(t, err) - err = svr.Start() - assert.Nil(t, err) + err = svr.Register() assert.Nil(t, err) + err = svr.Start() + assert.Nil(t, err) + // Stop channal watch state watcher in tests if svr.channelManager != nil && svr.channelManager.stopChecker != nil { svr.channelManager.stopChecker() @@ -3883,11 +3886,13 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server { err = svr.Init() assert.Nil(t, err) - err = svr.Start() - assert.Nil(t, err) + err = svr.Register() assert.Nil(t, err) + err = svr.Start() + assert.Nil(t, err) + resp, err := svr.GetComponentStates(context.Background()) assert.Nil(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 9f4be46870..6e86981e87 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -185,7 +185,7 @@ func (node *DataNode) Register() error { metrics.NumNodes.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), typeutil.DataNodeRole).Inc() log.Info("DataNode Register Finished") // Start liveness check - go node.session.LivenessCheck(node.ctx, func() { + node.session.LivenessCheck(node.ctx, func() { log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 1af830dd78..9ce5c859ce 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -853,8 +853,6 @@ func TestWatchChannel(t *testing.T) { err = node.Start() assert.Nil(t, err) defer node.Stop() - err = node.Register() - assert.Nil(t, err) defer cancel() diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 0d9148bce4..feda1f6485 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -141,7 +141,7 @@ func (i *IndexCoord) Register() error { } metrics.NumNodes.WithLabelValues(strconv.FormatInt(i.session.ServerID, 10), typeutil.IndexCoordRole).Inc() log.Info("IndexCoord Register Finished") - go i.session.LivenessCheck(i.loopCtx, func() { + i.session.LivenessCheck(i.loopCtx, func() { log.Error("Index Coord disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) if err := i.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 2a9d27e4ba..f96823b20f 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -131,7 +131,7 @@ func (i *IndexNode) Register() error { log.Info("IndexNode Register Finished") //start liveness check - go i.session.LivenessCheck(i.loopCtx, func() { + i.session.LivenessCheck(i.loopCtx, func() { log.Error("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID)) if err := i.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index ba07a51cb7..f82f551719 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -132,7 +132,7 @@ func (node *Proxy) Register() error { node.session.Register() metrics.NumNodes.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), typeutil.ProxyRole).Inc() log.Info("Proxy Register Finished") - go node.session.LivenessCheck(node.ctx, func() { + node.session.LivenessCheck(node.ctx, func() { log.Error("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 1ca431d099..a926a1635b 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -135,7 +135,7 @@ func (s *Server) Register() error { } metrics.NumNodes.WithLabelValues(strconv.FormatInt(s.session.ServerID, 10), typeutil.QueryCoordRole).Inc() log.Info("QueryCoord Register Finished") - go s.session.LivenessCheck(s.ctx, func() { + s.session.LivenessCheck(s.ctx, func() { log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.ServerID)) if err := s.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index bb7d286ef8..8112374917 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -212,10 +212,11 @@ func (suite *ServerSuite) TestDisableActiveStandby() { suite.NoError(err) suite.Equal(commonpb.StateCode_Initializing, suite.server.status.Load().(commonpb.StateCode)) suite.hackServer() - err = suite.server.Start() - suite.NoError(err) err = suite.server.Register() suite.NoError(err) + err = suite.server.Start() + suite.NoError(err) + suite.Equal(commonpb.StateCode_Healthy, suite.server.status.Load().(commonpb.StateCode)) states, err := suite.server.GetComponentStates(context.Background()) diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index f75bbb664a..2d3389489e 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -170,7 +170,7 @@ func (node *QueryNode) Register() error { metrics.NumNodes.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), typeutil.QueryNodeRole).Inc() log.Info("QueryNode Register Finished") // start liveness check - go node.session.LivenessCheck(node.queryNodeLoopCtx, func() { + node.session.LivenessCheck(node.queryNodeLoopCtx, func() { log.Error("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID)) if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 7e89b61061..38f3421c20 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -304,7 +304,7 @@ func (c *Core) Register() error { } metrics.NumNodes.WithLabelValues(strconv.FormatInt(c.session.ServerID, 10), typeutil.RootCoordRole).Inc() log.Info("RootCoord Register Finished") - go c.session.LivenessCheck(c.ctx, func() { + c.session.LivenessCheck(c.ctx, func() { log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) if err := c.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 64963e148c..e3cbeb6ffb 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1365,11 +1365,12 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) { err = core.Init() assert.Equal(t, commonpb.StateCode_StandBy, core.stateCode.Load().(commonpb.StateCode)) assert.NoError(t, err) - err = core.Start() - assert.NoError(t, err) core.session.TriggerKill = false err = core.Register() assert.NoError(t, err) + err = core.Start() + assert.NoError(t, err) + assert.Equal(t, commonpb.StateCode_Healthy, core.stateCode.Load().(commonpb.StateCode)) resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ @@ -1416,11 +1417,11 @@ func TestRootcoord_DisableActiveStandby(t *testing.T) { err = core.Init() assert.Equal(t, commonpb.StateCode_Initializing, core.stateCode.Load().(commonpb.StateCode)) assert.NoError(t, err) - err = core.Start() - assert.NoError(t, err) core.session.TriggerKill = false err = core.Register() assert.NoError(t, err) + err = core.Start() + assert.NoError(t, err) assert.Equal(t, commonpb.StateCode_Healthy, core.stateCode.Load().(commonpb.StateCode)) resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 3667cfe07a..1dc400d48e 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -669,68 +669,71 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) { panic(err) } s.watchSessionKeyCh = s.etcdCli.Watch(context.Background(), s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision)) - for { - select { - case _, ok := <-s.liveCh: - // ok, still alive - if ok { - continue - } - // not ok, connection lost - log.Warn("connection lost detected, shuting down") - if callback != nil { - go callback() - } - return - case <-ctx.Done(): - log.Info("liveness exits due to context done") - // cancel the etcd keepAlive context - if s.keepAliveCancel != nil { - s.keepAliveCancel() - } - return - case resp, ok := <-s.watchSessionKeyCh: - if !ok { - log.Warn("watch session key channel closed") + go func() { + for { + select { + case _, ok := <-s.liveCh: + // ok, still alive + if ok { + continue + } + // not ok, connection lost + log.Warn("connection lost detected, shuting down") + if callback != nil { + go callback() + } + return + case <-ctx.Done(): + log.Info("liveness exits due to context done") + // cancel the etcd keepAlive context if s.keepAliveCancel != nil { s.keepAliveCancel() } return - } - if resp.Err() != nil { - // if not ErrCompacted, just close the channel - if resp.Err() != v3rpc.ErrCompacted { - //close event channel - log.Warn("Watch service found error", zap.Error(resp.Err())) + case resp, ok := <-s.watchSessionKeyCh: + if !ok { + log.Warn("watch session key channel closed") if s.keepAliveCancel != nil { s.keepAliveCancel() } return } - log.Warn("Watch service found compacted error", zap.Error(resp.Err())) - getResp, err := s.etcdCli.Get(s.ctx, s.getSessionKey()) - if err != nil || len(getResp.Kvs) == 0 { - if s.keepAliveCancel != nil { - s.keepAliveCancel() + if resp.Err() != nil { + // if not ErrCompacted, just close the channel + if resp.Err() != v3rpc.ErrCompacted { + //close event channel + log.Warn("Watch service found error", zap.Error(resp.Err())) + if s.keepAliveCancel != nil { + s.keepAliveCancel() + } + return } - return + log.Warn("Watch service found compacted error", zap.Error(resp.Err())) + getResp, err := s.etcdCli.Get(s.ctx, s.getSessionKey()) + if err != nil || len(getResp.Kvs) == 0 { + if s.keepAliveCancel != nil { + s.keepAliveCancel() + } + return + } + s.watchSessionKeyCh = s.etcdCli.Watch(s.ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision)) + continue } - s.watchSessionKeyCh = s.etcdCli.Watch(s.ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision)) - continue - } - for _, event := range resp.Events { - switch event.Type { - case mvccpb.PUT: - log.Info("register session success", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key))) - case mvccpb.DELETE: - log.Info("session key is deleted, exit...", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key))) - if s.keepAliveCancel != nil { - s.keepAliveCancel() + for _, event := range resp.Events { + switch event.Type { + case mvccpb.PUT: + log.Info("register session success", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key))) + case mvccpb.DELETE: + log.Info("session key is deleted, exit...", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key))) + if s.keepAliveCancel != nil { + s.keepAliveCancel() + } } } } } - } + }() + } // Revoke revokes the internal leaseID for the session key diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 5e0b189d03..c53145ed87 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -202,7 +202,7 @@ func TestSessionLivenessCheck(t *testing.T) { flag := false - go s.LivenessCheck(ctx, func() { + s.LivenessCheck(ctx, func() { flag = true signal <- struct{}{} }) @@ -222,7 +222,7 @@ func TestSessionLivenessCheck(t *testing.T) { s.liveCh = ch flag = false - go s.LivenessCheck(ctx, func() { + s.LivenessCheck(ctx, func() { flag = true signal <- struct{}{} }) @@ -658,7 +658,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) { wg.Done() return nil }) - go s1.LivenessCheck(ctx1, func() { + s1.LivenessCheck(ctx1, func() { flag = true signal <- struct{}{} s1.keepAliveCancel()