From d6354958856fc3e3d0bbe80f543d9d7bd4ef482b Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 11 Apr 2024 17:33:21 +0800 Subject: [PATCH] fix: [2.3] Make coordinator `Register` not blocked on ProcessActiveStandby(#32069) (#32133) Cherry-pick from master pr: #32069 See also #32066 This PR makes coordinator `Register` return successfully and lets `ProcessActiveStandBy` run asynchronously. As a result, roles may receive the stop signal and notify the servers. --------- Signed-off-by: Congqi Xia --- internal/datacoord/server.go | 41 ++++++++++++--------- internal/datacoord/server_test.go | 43 +++++++++++++++++++++-- internal/querycoordv2/server.go | 27 ++++++++------ internal/querycoordv2/server_test.go | 12 +++---- internal/rootcoord/root_coord.go | 28 +++++++++------ internal/rootcoord/root_coord_test.go | 13 +++++-- internal/util/sessionutil/session_util.go | 2 +- 7 files changed, 116 insertions(+), 50 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 3b0511d768..aa0867d5d2 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -252,25 +252,34 @@ func (s *Server) Register() error { // first register indexCoord s.icSession.Register() s.session.Register() - if s.enableActiveStandBy { - err := s.session.ProcessActiveStandBy(s.activateFunc) - if err != nil { - return err - } + afterRegister := func() { + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Inc() + log.Info("DataCoord Register Finished") - err = s.icSession.ForceActiveStandby(nil) - if err != nil { - return nil - } + s.session.LivenessCheck(s.ctx, func() { + logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.GetServerID())) + os.Exit(1) + }) + } + if s.enableActiveStandBy { + go func() { + err := s.session.ProcessActiveStandBy(s.activateFunc) + if err != nil { + log.Error("failed to activate standby datacoord server", zap.Error(err)) + return + } + + err = 
s.icSession.ForceActiveStandby(nil) + if err != nil { + log.Error("failed to force activate standby indexcoord server", zap.Error(err)) + return + } + afterRegister() + }() + } else { + afterRegister() } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Inc() - log.Info("DataCoord Register Finished") - - s.session.LivenessCheck(s.ctx, func() { - logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.GetServerID())) - os.Exit(1) - }) return nil } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index aca196ad2a..727ac1c5f8 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -4296,13 +4296,28 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server { err = svr.Init() assert.NoError(t, err) + + signal := make(chan struct{}) if Params.DataCoordCfg.EnableActiveStandby.GetAsBool() { assert.Equal(t, commonpb.StateCode_StandBy, svr.stateCode.Load().(commonpb.StateCode)) + activateFunc := svr.activateFunc + svr.activateFunc = func() error { + defer func() { + close(signal) + }() + var err error + if activateFunc != nil { + err = activateFunc() + } + return err + } } else { assert.Equal(t, commonpb.StateCode_Initializing, svr.stateCode.Load().(commonpb.StateCode)) + close(signal) } err = svr.Register() assert.NoError(t, err) + <-signal err = svr.Start() assert.NoError(t, err) assert.Equal(t, commonpb.StateCode_Healthy, svr.stateCode.Load().(commonpb.StateCode)) @@ -4600,14 +4615,35 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server { err = svr.Init() assert.NoError(t, err) - err = svr.Start() - assert.NoError(t, err) + + signal := make(chan struct{}) + if Params.DataCoordCfg.EnableActiveStandby.GetAsBool() { + assert.Equal(t, commonpb.StateCode_StandBy, svr.stateCode.Load().(commonpb.StateCode)) + activateFunc := svr.activateFunc + svr.activateFunc = func() error { + defer func() { + 
close(signal) + }() + var err error + if activateFunc != nil { + err = activateFunc() + } + return err + } + } else { + assert.Equal(t, commonpb.StateCode_Initializing, svr.stateCode.Load().(commonpb.StateCode)) + close(signal) + } err = svr.Register() assert.NoError(t, err) + <-signal + + err = svr.Start() + assert.NoError(t, err) resp, err := svr.GetComponentStates(context.Background(), nil) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + assert.True(t, merr.Ok(resp.GetStatus())) assert.Equal(t, commonpb.StateCode_Healthy, resp.GetState().GetStateCode()) // stop channal watch state watcher in tests @@ -4620,6 +4656,7 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server { func TestDataCoord_DisableActiveStandby(t *testing.T) { paramtable.Get().Save(Params.DataCoordCfg.EnableActiveStandby.Key, "false") + defer paramtable.Get().Reset(Params.DataCoordCfg.EnableActiveStandby.Key) svr := testDataCoordBase(t) defer closeTestServer(t, svr) } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 4ce64d26bd..c466b5d26a 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -142,17 +142,24 @@ func NewQueryCoord(ctx context.Context) (*Server, error) { func (s *Server) Register() error { s.session.Register() - if s.enableActiveStandBy { - if err := s.session.ProcessActiveStandBy(s.activateFunc); err != nil { - log.Error("failed to activate standby server", zap.Error(err)) - return err - } + afterRegister := func() { + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Inc() + s.session.LivenessCheck(s.ctx, func() { + log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.GetServerID())) + os.Exit(1) + }) + } + if s.enableActiveStandBy { + go func() { + if err := s.session.ProcessActiveStandBy(s.activateFunc); err != nil { + log.Error("failed to activate standby 
server", zap.Error(err)) + return + } + afterRegister() + }() + } else { + afterRegister() } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Inc() - s.session.LivenessCheck(s.ctx, func() { - log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.GetServerID())) - os.Exit(1) - }) return nil } diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 0ea351843c..7218d3661e 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -306,6 +306,7 @@ func (suite *ServerSuite) TestDisableActiveStandby() { func (suite *ServerSuite) TestEnableActiveStandby() { paramtable.Get().Save(Params.QueryCoordCfg.EnableActiveStandby.Key, "true") + defer paramtable.Get().Reset(Params.QueryCoordCfg.EnableActiveStandby.Key) err := suite.server.Stop() suite.NoError(err) @@ -342,14 +343,11 @@ func (suite *ServerSuite) TestEnableActiveStandby() { suite.Equal(commonpb.StateCode_StandBy, states1.GetState().GetStateCode()) err = suite.server.Register() suite.NoError(err) - err = suite.server.Start() - suite.NoError(err) - states2, err := suite.server.GetComponentStates(context.Background(), nil) - suite.NoError(err) - suite.Equal(commonpb.StateCode_Healthy, states2.GetState().GetStateCode()) - - paramtable.Get().Save(Params.QueryCoordCfg.EnableActiveStandby.Key, "false") + suite.Eventually(func() bool { + state, err := suite.server.GetComponentStates(context.Background(), nil) + return err == nil && state.GetState().GetStateCode() == commonpb.StateCode_Healthy + }, time.Second*5, time.Millisecond*200) } func (suite *ServerSuite) TestStop() { diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 714dbbbaf8..8ea1a92e1f 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -270,17 +270,25 @@ func (c *Core) SetQueryCoordClient(s types.QueryCoordClient) error { // 
Register register rootcoord at etcd func (c *Core) Register() error { c.session.Register() - if c.enableActiveStandBy { - if err := c.session.ProcessActiveStandBy(c.activateFunc); err != nil { - return err - } + afterRegister := func() { + metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Inc() + log.Info("RootCoord Register Finished") + c.session.LivenessCheck(c.ctx, func() { + log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) + os.Exit(1) + }) + } + if c.enableActiveStandBy { + go func() { + if err := c.session.ProcessActiveStandBy(c.activateFunc); err != nil { + log.Warn("failed to activate standby rootcoord server", zap.Error(err)) + return + } + afterRegister() + }() + } else { + afterRegister() } - metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Inc() - log.Info("RootCoord Register Finished") - c.session.LivenessCheck(c.ctx, func() { - log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID)) - os.Exit(1) - }) return nil } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 836f057797..27192f4320 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1796,9 +1796,13 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) { // Need to reset global etcd to follow new path kvfactory.CloseEtcdClient() paramtable.Get().Save(Params.RootCoordCfg.EnableActiveStandby.Key, "true") + defer paramtable.Get().Reset(Params.RootCoordCfg.EnableActiveStandby.Key) paramtable.Get().Save(Params.CommonCfg.RootCoordTimeTick.Key, fmt.Sprintf("rootcoord-time-tick-%d", randVal)) + defer paramtable.Get().Reset(Params.CommonCfg.RootCoordTimeTick.Key) paramtable.Get().Save(Params.CommonCfg.RootCoordStatistics.Key, fmt.Sprintf("rootcoord-statistics-%d", randVal)) + defer 
paramtable.Get().Reset(Params.CommonCfg.RootCoordStatistics.Key) paramtable.Get().Save(Params.CommonCfg.RootCoordDml.Key, fmt.Sprintf("rootcoord-dml-test-%d", randVal)) + defer paramtable.Get().Reset(Params.CommonCfg.RootCoordDml.Key) ctx := context.Background() coreFactory := dependency.NewDefaultFactory(true) @@ -1820,12 +1824,15 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) { err = core.Init() assert.NoError(t, err) assert.Equal(t, commonpb.StateCode_StandBy, core.GetStateCode()) - err = core.Start() - assert.NoError(t, err) core.session.TriggerKill = false err = core.Register() assert.NoError(t, err) - assert.Equal(t, commonpb.StateCode_Healthy, core.GetStateCode()) + err = core.Start() + assert.NoError(t, err) + + assert.Eventually(t, func() bool { + return core.GetStateCode() == commonpb.StateCode_Healthy + }, time.Second*5, time.Millisecond*200) resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_DescribeCollection, diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index f55342ec35..0aba680ac2 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -1149,7 +1149,7 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error { 0)). Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.LeaseID))).Commit() - if !resp.Succeeded { + if err != nil || !resp.Succeeded { msg := fmt.Sprintf("failed to force register ACTIVE %s", s.ServerName) log.Error(msg, zap.Error(err), zap.Any("resp", resp)) return errors.New(msg)