From b2ef07659759a418995f53bd9d11d13b4fcee7bd Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 28 Nov 2025 19:33:09 +0800 Subject: [PATCH] fix: [2.6] prevent panic in standby mixcoord during shutdown #45859 (#45898) issue: #45728 pr: #45730 When mixcoord is in standby mode and shutdown is triggered, the ProcessActiveStandBy goroutine may panic if context cancellation occurs. This happens because the error handling didn't check for context.Canceled errors before panicking. Changes: - Add context cancellation check in mix_coord Register() before panic - Check s.ctx.Err() == context.Canceled and gracefully exit - Remove unused ForceActiveStandby() function from session_util This ensures standby mixcoord can shutdown gracefully without panic when context is cancelled during the standby process. Signed-off-by: Wei Liu --- internal/coordinator/mix_coord.go | 4 ++ internal/util/sessionutil/mock_session.go | 46 ------------- internal/util/sessionutil/session.go | 1 - internal/util/sessionutil/session_util.go | 67 ------------------- .../util/sessionutil/session_util_test.go | 54 --------------- 5 files changed, 4 insertions(+), 168 deletions(-) diff --git a/internal/coordinator/mix_coord.go b/internal/coordinator/mix_coord.go index 8aaad1195d..c364142b57 100644 --- a/internal/coordinator/mix_coord.go +++ b/internal/coordinator/mix_coord.go @@ -121,6 +121,10 @@ func (s *mixCoordImpl) Register() error { if s.enableActiveStandBy { go func() { if err := s.session.ProcessActiveStandBy(s.activateFunc); err != nil { + if s.ctx.Err() == context.Canceled { + log.Info("standby process canceled due to server shutdown") + return + } log.Error("failed to activate standby server", zap.Error(err)) panic(err) } diff --git a/internal/util/sessionutil/mock_session.go b/internal/util/sessionutil/mock_session.go index d6bedd3402..d060f766ba 100644 --- a/internal/util/sessionutil/mock_session.go +++ b/internal/util/sessionutil/mock_session.go @@ -69,52 +69,6 @@ func (_c *MockSession_Disconnected_Call) RunAndReturn(run func() bool) *MockSess return _c } -// ForceActiveStandby provides a mock function with given fields: activateFunc -func (_m *MockSession) ForceActiveStandby(activateFunc func() error) error { - ret := _m.Called(activateFunc) - - if len(ret) == 0 { - panic("no return value specified for ForceActiveStandby") - } - - var r0 error - if rf, ok := ret.Get(0).(func(func() error) error); ok { - r0 = rf(activateFunc) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockSession_ForceActiveStandby_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceActiveStandby' -type MockSession_ForceActiveStandby_Call struct { - *mock.Call -} - -// ForceActiveStandby is a helper method to define mock.On call -// - activateFunc func() error -func (_e *MockSession_Expecter) ForceActiveStandby(activateFunc interface{}) *MockSession_ForceActiveStandby_Call { - return &MockSession_ForceActiveStandby_Call{Call: _e.mock.On("ForceActiveStandby", activateFunc)} -} - -func (_c *MockSession_ForceActiveStandby_Call) Run(run func(activateFunc func() error)) *MockSession_ForceActiveStandby_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(func() error)) - }) - return _c -} - -func (_c *MockSession_ForceActiveStandby_Call) Return(_a0 error) *MockSession_ForceActiveStandby_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockSession_ForceActiveStandby_Call) RunAndReturn(run func(func() error) error) *MockSession_ForceActiveStandby_Call { - _c.Call.Return(run) - return _c -} - // GetAddress provides a mock function with no fields func (_m *MockSession) GetAddress() string { ret := _m.Called() diff --git a/internal/util/sessionutil/session.go b/internal/util/sessionutil/session.go index 83908a2cd2..12c23aab38 100644 --- a/internal/util/sessionutil/session.go +++ b/internal/util/sessionutil/session.go @@ -45,7 +45,6 @@ type SessionInterface interface { Disconnected() bool SetEnableActiveStandBy(enable bool) ProcessActiveStandBy(activateFunc func() error) error - ForceActiveStandby(activateFunc func() error) error GetAddress() string GetServerID() int64 diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index a18ed529f2..21f2c5554a 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -1230,73 +1230,6 @@ func (s *Session) ProcessActiveStandBy(activateFunc func() error) error { return nil } -func (s *Session) ForceActiveStandby(activateFunc func() error) error { - s.activeKey = path.Join(s.metaRoot, DefaultServiceRoot, s.ServerName) - - // force register to the active_key. - forceRegisterActiveFn := func() error { - log.Info(fmt.Sprintf("try to register as ACTIVE %v service...", s.ServerName)) - sessionJSON, err := json.Marshal(s) - if err != nil { - log.Error("json marshal error", zap.Error(err)) - return err - } - - // try to release old session first - sessions, _, err := s.GetSessions(s.ServerName) - if err != nil { - return err - } - - if len(sessions) != 0 { - activeSess := sessions[s.ServerName] - if activeSess == nil || activeSess.LeaseID == nil { - // force delete all old sessions - s.etcdCli.Delete(s.ctx, s.activeKey) - for _, sess := range sessions { - if sess.ServerID != s.ServerID { - sess.getCompleteKey() - key := path.Join(s.metaRoot, DefaultServiceRoot, fmt.Sprintf("%s-%d", sess.ServerName, sess.ServerID)) - s.etcdCli.Delete(s.ctx, key) - } - } - } else { - // force release old active session - _, _ = s.etcdCli.Revoke(s.ctx, *activeSess.LeaseID) - } - } - - // then try to register as active - resp, err := s.etcdCli.Txn(s.ctx).If( - clientv3.Compare( - clientv3.Version(s.activeKey), - "=", - 0)). - Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.LeaseID))).Commit() - - if err != nil || !resp.Succeeded { - msg := fmt.Sprintf("failed to force register ACTIVE %s", s.ServerName) - log.Error(msg, zap.Error(err), zap.Any("resp", resp)) - return errors.New(msg) - } - - log.Info(fmt.Sprintf("force register ACTIVE %s", s.ServerName)) - return nil - } - - err := retry.Do(s.ctx, forceRegisterActiveFn, retry.Attempts(uint(s.sessionRetryTimes))) - if err != nil { - log.Warn(fmt.Sprintf("failed to force register ACTIVE %s", s.ServerName)) - return err - } - s.updateStandby(false) - log.Info(fmt.Sprintf("serverName: %v quit STANDBY mode, this node will become ACTIVE, ID: %d", s.ServerName, s.ServerID)) - if activateFunc != nil { - return activateFunc() - } - return nil -} - func filterEmptyStrings(s []string) []string { var filtered []string for _, str := range s { diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 29d7c30ddd..8e95431820 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -962,60 +962,6 @@ func (s *SessionSuite) TestRevoke() { } } -func (s *SessionSuite) TestForceActiveWithLeaseID() { - ctx := context.Background() - role := "test" - sess1 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) - sess1.Init(role, "normal1", false, false) - sess1.Register() - sess1.ProcessActiveStandBy(nil) - - sess2 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) - sess2.Init(role, "normal2", false, false) - sess2.Register() - sess2.ForceActiveStandby(nil) - - defer func() { - sess1.Stop() - sess2.Stop() - }() - sessions, _, err := sess2.GetSessions(role) - s.NoError(err) - s.Len(sessions, 2) - sess := sessions[role] - s.NotNil(sess) - s.Equal(sess.Address, "normal2") - s.Equal(sess.ServerID, sess2.ServerID) -} - -func (s *SessionSuite) TestForceActiveWithDelete() { - ctx := context.Background() - role := "test" - sess1 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) - sess1.Init(role, "normal1", false, false) - sessionJSON, err := json.Marshal(sess1) - s.NoError(err) - s.client.Put(ctx, path.Join(s.metaRoot, DefaultServiceRoot, fmt.Sprintf("%s-%d", role, 1)), string(sessionJSON)) - s.client.Put(ctx, path.Join(s.metaRoot, DefaultServiceRoot, role), string(sessionJSON)) - - sess2 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) - sess2.Init(role, "normal2", false, false) - sess2.Register() - sess2.ForceActiveStandby(nil) - - defer func() { - sess1.Stop() - sess2.Stop() - }() - sessions, _, err := sess2.GetSessions(role) - s.NoError(err) - s.Len(sessions, 2) - sess := sessions[role] - s.NotNil(sess) - s.Equal(sess.Address, "normal2") - s.Equal(sess.ServerID, sess2.ServerID) -} - func (s *SessionSuite) TestKeepAliveRetryActiveCancel() { ctx := context.Background() session := NewSessionWithEtcd(ctx, s.metaRoot, s.client)