diff --git a/internal/coordinator/mix_coord.go b/internal/coordinator/mix_coord.go index 8aaad1195d..c364142b57 100644 --- a/internal/coordinator/mix_coord.go +++ b/internal/coordinator/mix_coord.go @@ -121,6 +121,10 @@ func (s *mixCoordImpl) Register() error { if s.enableActiveStandBy { go func() { if err := s.session.ProcessActiveStandBy(s.activateFunc); err != nil { + if s.ctx.Err() == context.Canceled { + log.Info("standby process canceled due to server shutdown") + return + } log.Error("failed to activate standby server", zap.Error(err)) panic(err) } diff --git a/internal/util/sessionutil/mock_session.go b/internal/util/sessionutil/mock_session.go index d6bedd3402..d060f766ba 100644 --- a/internal/util/sessionutil/mock_session.go +++ b/internal/util/sessionutil/mock_session.go @@ -69,52 +69,6 @@ func (_c *MockSession_Disconnected_Call) RunAndReturn(run func() bool) *MockSess return _c } -// ForceActiveStandby provides a mock function with given fields: activateFunc -func (_m *MockSession) ForceActiveStandby(activateFunc func() error) error { - ret := _m.Called(activateFunc) - - if len(ret) == 0 { - panic("no return value specified for ForceActiveStandby") - } - - var r0 error - if rf, ok := ret.Get(0).(func(func() error) error); ok { - r0 = rf(activateFunc) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockSession_ForceActiveStandby_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceActiveStandby' -type MockSession_ForceActiveStandby_Call struct { - *mock.Call -} - -// ForceActiveStandby is a helper method to define mock.On call -// - activateFunc func() error -func (_e *MockSession_Expecter) ForceActiveStandby(activateFunc interface{}) *MockSession_ForceActiveStandby_Call { - return &MockSession_ForceActiveStandby_Call{Call: _e.mock.On("ForceActiveStandby", activateFunc)} -} - -func (_c *MockSession_ForceActiveStandby_Call) Run(run func(activateFunc func() error)) *MockSession_ForceActiveStandby_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(func() error)) - }) - return _c -} - -func (_c *MockSession_ForceActiveStandby_Call) Return(_a0 error) *MockSession_ForceActiveStandby_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockSession_ForceActiveStandby_Call) RunAndReturn(run func(func() error) error) *MockSession_ForceActiveStandby_Call { - _c.Call.Return(run) - return _c -} - // GetAddress provides a mock function with no fields func (_m *MockSession) GetAddress() string { ret := _m.Called() diff --git a/internal/util/sessionutil/session.go b/internal/util/sessionutil/session.go index 83908a2cd2..12c23aab38 100644 --- a/internal/util/sessionutil/session.go +++ b/internal/util/sessionutil/session.go @@ -45,7 +45,6 @@ type SessionInterface interface { Disconnected() bool SetEnableActiveStandBy(enable bool) ProcessActiveStandBy(activateFunc func() error) error - ForceActiveStandby(activateFunc func() error) error GetAddress() string GetServerID() int64 diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index a18ed529f2..21f2c5554a 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -1230,73 +1230,6 @@ func (s *Session) ProcessActiveStandBy(activateFunc func() error) error { return nil } -func (s *Session) ForceActiveStandby(activateFunc func() error) error { - s.activeKey = path.Join(s.metaRoot, DefaultServiceRoot, s.ServerName) - - // force register to the active_key. - forceRegisterActiveFn := func() error { - log.Info(fmt.Sprintf("try to register as ACTIVE %v service...", s.ServerName)) - sessionJSON, err := json.Marshal(s) - if err != nil { - log.Error("json marshal error", zap.Error(err)) - return err - } - - // try to release old session first - sessions, _, err := s.GetSessions(s.ServerName) - if err != nil { - return err - } - - if len(sessions) != 0 { - activeSess := sessions[s.ServerName] - if activeSess == nil || activeSess.LeaseID == nil { - // force delete all old sessions - s.etcdCli.Delete(s.ctx, s.activeKey) - for _, sess := range sessions { - if sess.ServerID != s.ServerID { - sess.getCompleteKey() - key := path.Join(s.metaRoot, DefaultServiceRoot, fmt.Sprintf("%s-%d", sess.ServerName, sess.ServerID)) - s.etcdCli.Delete(s.ctx, key) - } - } - } else { - // force release old active session - _, _ = s.etcdCli.Revoke(s.ctx, *activeSess.LeaseID) - } - } - - // then try to register as active - resp, err := s.etcdCli.Txn(s.ctx).If( - clientv3.Compare( - clientv3.Version(s.activeKey), - "=", - 0)). - Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.LeaseID))).Commit() - - if err != nil || !resp.Succeeded { - msg := fmt.Sprintf("failed to force register ACTIVE %s", s.ServerName) - log.Error(msg, zap.Error(err), zap.Any("resp", resp)) - return errors.New(msg) - } - - log.Info(fmt.Sprintf("force register ACTIVE %s", s.ServerName)) - return nil - } - - err := retry.Do(s.ctx, forceRegisterActiveFn, retry.Attempts(uint(s.sessionRetryTimes))) - if err != nil { - log.Warn(fmt.Sprintf("failed to force register ACTIVE %s", s.ServerName)) - return err - } - s.updateStandby(false) - log.Info(fmt.Sprintf("serverName: %v quit STANDBY mode, this node will become ACTIVE, ID: %d", s.ServerName, s.ServerID)) - if activateFunc != nil { - return activateFunc() - } - return nil -} - func filterEmptyStrings(s []string) []string { var filtered []string for _, str := range s { diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 29d7c30ddd..8e95431820 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -962,60 +962,6 @@ func (s *SessionSuite) TestRevoke() { } } -func (s *SessionSuite) TestForceActiveWithLeaseID() { - ctx := context.Background() - role := "test" - sess1 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) - sess1.Init(role, "normal1", false, false) - sess1.Register() - sess1.ProcessActiveStandBy(nil) - - sess2 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) - sess2.Init(role, "normal2", false, false) - sess2.Register() - sess2.ForceActiveStandby(nil) - - defer func() { - sess1.Stop() - sess2.Stop() - }() - sessions, _, err := sess2.GetSessions(role) - s.NoError(err) - s.Len(sessions, 2) - sess := sessions[role] - s.NotNil(sess) - s.Equal(sess.Address, "normal2") - s.Equal(sess.ServerID, sess2.ServerID) -} - -func (s *SessionSuite) TestForceActiveWithDelete() { - ctx := context.Background() - role := "test" - sess1 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) - sess1.Init(role, "normal1", false, false) - sessionJSON, err := json.Marshal(sess1) - s.NoError(err) - s.client.Put(ctx, path.Join(s.metaRoot, DefaultServiceRoot, fmt.Sprintf("%s-%d", role, 1)), string(sessionJSON)) - s.client.Put(ctx, path.Join(s.metaRoot, DefaultServiceRoot, role), string(sessionJSON)) - - sess2 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) - sess2.Init(role, "normal2", false, false) - sess2.Register() - sess2.ForceActiveStandby(nil) - - defer func() { - sess1.Stop() - sess2.Stop() - }() - sessions, _, err := sess2.GetSessions(role) - s.NoError(err) - s.Len(sessions, 2) - sess := sessions[role] - s.NotNil(sess) - s.Equal(sess.Address, "normal2") - s.Equal(sess.ServerID, sess2.ServerID) -} - func (s *SessionSuite) TestKeepAliveRetryActiveCancel() { ctx := context.Background() session := NewSessionWithEtcd(ctx, s.metaRoot, s.client)