From 32809c1053b12cb112718ae36f73d83ce95acf43 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Sun, 21 Dec 2025 19:11:16 +0800 Subject: [PATCH] fix: Remove stale proxy clients on rewatch etcd (#46398) AddProxyClients is renamed to SetProxyClients and now removes clients not in the new snapshot before adding new ones. This ensures proper cleanup when ProxyWatcher re-watches etcd. issue: https://github.com/milvus-io/milvus/issues/46397 Signed-off-by: bigsheeper --- internal/querycoordv2/server.go | 2 +- internal/rootcoord/root_coord.go | 2 +- .../proxyutil/mock_proxy_client_manager.go | 66 +++++++++---------- .../util/proxyutil/proxy_client_manager.go | 24 ++++++- .../proxyutil/proxy_client_manager_test.go | 40 +++++++---- 5 files changed, 85 insertions(+), 49 deletions(-) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 37ba453cbd..702b6daa09 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -321,7 +321,7 @@ func (s *Server) initQueryCoord() error { s.proxyClientManager = proxyutil.NewProxyClientManager(proxyutil.DefaultProxyCreator) s.proxyWatcher = proxyutil.NewProxyWatcher( s.etcdCli, - s.proxyClientManager.AddProxyClients, + s.proxyClientManager.SetProxyClients, ) s.proxyWatcher.AddSessionFunc(s.proxyClientManager.AddProxyClient) s.proxyWatcher.DelSessionFunc(s.proxyClientManager.DelProxyClient) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 9543ff01ba..3424d1e60c 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -445,7 +445,7 @@ func (c *Core) initInternal() error { c.proxyWatcher = proxyutil.NewProxyWatcher( c.etcdCli, c.chanTimeTick.initSessions, - c.proxyClientManager.AddProxyClients, + c.proxyClientManager.SetProxyClients, ) c.proxyWatcher.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient) c.proxyWatcher.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient) diff --git
a/internal/util/proxyutil/mock_proxy_client_manager.go b/internal/util/proxyutil/mock_proxy_client_manager.go index 1e729da365..81f368d81c 100644 --- a/internal/util/proxyutil/mock_proxy_client_manager.go +++ b/internal/util/proxyutil/mock_proxy_client_manager.go @@ -63,39 +63,6 @@ func (_c *MockProxyClientManager_AddProxyClient_Call) RunAndReturn(run func(*ses return _c } -// AddProxyClients provides a mock function with given fields: session -func (_m *MockProxyClientManager) AddProxyClients(session []*sessionutil.Session) { - _m.Called(session) -} - -// MockProxyClientManager_AddProxyClients_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddProxyClients' -type MockProxyClientManager_AddProxyClients_Call struct { - *mock.Call -} - -// AddProxyClients is a helper method to define mock.On call -// - session []*sessionutil.Session -func (_e *MockProxyClientManager_Expecter) AddProxyClients(session interface{}) *MockProxyClientManager_AddProxyClients_Call { - return &MockProxyClientManager_AddProxyClients_Call{Call: _e.mock.On("AddProxyClients", session)} -} - -func (_c *MockProxyClientManager_AddProxyClients_Call) Run(run func(session []*sessionutil.Session)) *MockProxyClientManager_AddProxyClients_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].([]*sessionutil.Session)) - }) - return _c -} - -func (_c *MockProxyClientManager_AddProxyClients_Call) Return() *MockProxyClientManager_AddProxyClients_Call { - _c.Call.Return() - return _c -} - -func (_c *MockProxyClientManager_AddProxyClients_Call) RunAndReturn(run func([]*sessionutil.Session)) *MockProxyClientManager_AddProxyClients_Call { - _c.Run(run) - return _c -} - // DelProxyClient provides a mock function with given fields: s func (_m *MockProxyClientManager) DelProxyClient(s *sessionutil.Session) { _m.Called(s) @@ -540,6 +507,39 @@ func (_c *MockProxyClientManager_RefreshPolicyInfoCache_Call) RunAndReturn(run f return _c } +// SetProxyClients provides a 
mock function with given fields: session +func (_m *MockProxyClientManager) SetProxyClients(session []*sessionutil.Session) { + _m.Called(session) +} + +// MockProxyClientManager_SetProxyClients_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetProxyClients' +type MockProxyClientManager_SetProxyClients_Call struct { + *mock.Call +} + +// SetProxyClients is a helper method to define mock.On call +// - session []*sessionutil.Session +func (_e *MockProxyClientManager_Expecter) SetProxyClients(session interface{}) *MockProxyClientManager_SetProxyClients_Call { + return &MockProxyClientManager_SetProxyClients_Call{Call: _e.mock.On("SetProxyClients", session)} +} + +func (_c *MockProxyClientManager_SetProxyClients_Call) Run(run func(session []*sessionutil.Session)) *MockProxyClientManager_SetProxyClients_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]*sessionutil.Session)) + }) + return _c +} + +func (_c *MockProxyClientManager_SetProxyClients_Call) Return() *MockProxyClientManager_SetProxyClients_Call { + _c.Call.Return() + return _c +} + +func (_c *MockProxyClientManager_SetProxyClients_Call) RunAndReturn(run func([]*sessionutil.Session)) *MockProxyClientManager_SetProxyClients_Call { + _c.Run(run) + return _c +} + // SetRates provides a mock function with given fields: ctx, request func (_m *MockProxyClientManager) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) error { ret := _m.Called(ctx, request) diff --git a/internal/util/proxyutil/proxy_client_manager.go b/internal/util/proxyutil/proxy_client_manager.go index 7693683055..2f05dee2b2 100644 --- a/internal/util/proxyutil/proxy_client_manager.go +++ b/internal/util/proxyutil/proxy_client_manager.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/cockroachdb/errors" + "github.com/samber/lo" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -82,7 +83,7 @@ var defaultClientManagerHelper = ProxyClientManagerHelper{ type 
ProxyClientManagerInterface interface { AddProxyClient(session *sessionutil.Session) - AddProxyClients(session []*sessionutil.Session) + SetProxyClients(session []*sessionutil.Session) GetProxyClients() *typeutil.ConcurrentMap[int64, types.ProxyClient] DelProxyClient(s *sessionutil.Session) GetProxyCount() int @@ -111,7 +112,26 @@ func NewProxyClientManager(creator ProxyCreator) *ProxyClientManager { } } -func (p *ProxyClientManager) AddProxyClients(sessions []*sessionutil.Session) { +// SetProxyClients sets proxy clients from a full snapshot of sessions. +// It removes stale clients not in the new snapshot and adds new ones. +// This is called during initial setup or when re-watching after etcd error. +func (p *ProxyClientManager) SetProxyClients(sessions []*sessionutil.Session) { + aliveSessions := lo.KeyBy(sessions, func(session *sessionutil.Session) int64 { + return session.ServerID + }) + + // Remove stale clients not in the alive sessions + p.proxyClient.Range(func(key int64, value types.ProxyClient) bool { + if _, ok := aliveSessions[key]; !ok { + if cli, loaded := p.proxyClient.GetAndRemove(key); loaded { + cli.Close() + log.Info("remove stale proxy client", zap.Int64("serverID", key)) + } + } + return true + }) + + // Add new clients for _, session := range sessions { p.AddProxyClient(session) } diff --git a/internal/util/proxyutil/proxy_client_manager_test.go b/internal/util/proxyutil/proxy_client_manager_test.go index 60033dfaf5..382b6c5d91 100644 --- a/internal/util/proxyutil/proxy_client_manager_test.go +++ b/internal/util/proxyutil/proxy_client_manager_test.go @@ -103,22 +103,38 @@ func (p *proxyMock) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Ref return merr.Success(), nil } -func TestProxyClientManager_AddProxyClients(t *testing.T) { - proxyCreator := func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error) { - return nil, errors.New("failed") - } +func TestProxyClientManager_SetProxyClients(t *testing.T) { + p1 
:= mocks.NewMockProxyClient(t) + p1.EXPECT().Close().Return(nil).Once() + p2 := mocks.NewMockProxyClient(t) + p3 := mocks.NewMockProxyClient(t) + proxyCreator := func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error) { + return p3, nil + } pcm := NewProxyClientManager(proxyCreator) - session := &sessionutil.Session{ - SessionRaw: sessionutil.SessionRaw{ - ServerID: 100, - Address: "localhost", - }, - } + // Initial state: proxy 1, 2 + pcm.proxyClient.Insert(1, p1) + pcm.proxyClient.Insert(2, p2) + assert.Equal(t, 2, pcm.GetProxyCount()) - sessions := []*sessionutil.Session{session} - pcm.AddProxyClients(sessions) + // New snapshot: proxy 2, 4 + sessions := []*sessionutil.Session{ + {SessionRaw: sessionutil.SessionRaw{ServerID: 2, Address: "addr2"}}, + {SessionRaw: sessionutil.SessionRaw{ServerID: 4, Address: "addr4"}}, + } + pcm.SetProxyClients(sessions) + + // Verify: proxy 1 removed, proxy 2 kept, proxy 4 added + _, ok := pcm.proxyClient.Get(1) + assert.False(t, ok, "stale proxy 1 should be removed") + _, ok = pcm.proxyClient.Get(2) + assert.True(t, ok, "proxy 2 should still exist") + _, ok = pcm.proxyClient.Get(4) + assert.True(t, ok, "proxy 4 should be added") + + assert.Equal(t, 2, pcm.GetProxyCount()) } func TestProxyClientManager_AddProxyClient(t *testing.T) {