mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 14:35:27 +08:00
fix: Remove stale proxy clients on rewatch etcd (#46398)
AddProxyClients now removes clients not in the new snapshot before adding new ones. This ensures proper cleanup when ProxyWatcher re-watche etcd. issue: https://github.com/milvus-io/milvus/issues/46397 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
d03b9cc052
commit
32809c1053
@ -321,7 +321,7 @@ func (s *Server) initQueryCoord() error {
|
||||
s.proxyClientManager = proxyutil.NewProxyClientManager(proxyutil.DefaultProxyCreator)
|
||||
s.proxyWatcher = proxyutil.NewProxyWatcher(
|
||||
s.etcdCli,
|
||||
s.proxyClientManager.AddProxyClients,
|
||||
s.proxyClientManager.SetProxyClients,
|
||||
)
|
||||
s.proxyWatcher.AddSessionFunc(s.proxyClientManager.AddProxyClient)
|
||||
s.proxyWatcher.DelSessionFunc(s.proxyClientManager.DelProxyClient)
|
||||
|
||||
@ -445,7 +445,7 @@ func (c *Core) initInternal() error {
|
||||
c.proxyWatcher = proxyutil.NewProxyWatcher(
|
||||
c.etcdCli,
|
||||
c.chanTimeTick.initSessions,
|
||||
c.proxyClientManager.AddProxyClients,
|
||||
c.proxyClientManager.SetProxyClients,
|
||||
)
|
||||
c.proxyWatcher.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient)
|
||||
c.proxyWatcher.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient)
|
||||
|
||||
@ -63,39 +63,6 @@ func (_c *MockProxyClientManager_AddProxyClient_Call) RunAndReturn(run func(*ses
|
||||
return _c
|
||||
}
|
||||
|
||||
// AddProxyClients provides a mock function with given fields: session
|
||||
func (_m *MockProxyClientManager) AddProxyClients(session []*sessionutil.Session) {
|
||||
_m.Called(session)
|
||||
}
|
||||
|
||||
// MockProxyClientManager_AddProxyClients_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddProxyClients'
|
||||
type MockProxyClientManager_AddProxyClients_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// AddProxyClients is a helper method to define mock.On call
|
||||
// - session []*sessionutil.Session
|
||||
func (_e *MockProxyClientManager_Expecter) AddProxyClients(session interface{}) *MockProxyClientManager_AddProxyClients_Call {
|
||||
return &MockProxyClientManager_AddProxyClients_Call{Call: _e.mock.On("AddProxyClients", session)}
|
||||
}
|
||||
|
||||
func (_c *MockProxyClientManager_AddProxyClients_Call) Run(run func(session []*sessionutil.Session)) *MockProxyClientManager_AddProxyClients_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].([]*sessionutil.Session))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProxyClientManager_AddProxyClients_Call) Return() *MockProxyClientManager_AddProxyClients_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProxyClientManager_AddProxyClients_Call) RunAndReturn(run func([]*sessionutil.Session)) *MockProxyClientManager_AddProxyClients_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// DelProxyClient provides a mock function with given fields: s
|
||||
func (_m *MockProxyClientManager) DelProxyClient(s *sessionutil.Session) {
|
||||
_m.Called(s)
|
||||
@ -540,6 +507,39 @@ func (_c *MockProxyClientManager_RefreshPolicyInfoCache_Call) RunAndReturn(run f
|
||||
return _c
|
||||
}
|
||||
|
||||
// SetProxyClients provides a mock function with given fields: session
|
||||
func (_m *MockProxyClientManager) SetProxyClients(session []*sessionutil.Session) {
|
||||
_m.Called(session)
|
||||
}
|
||||
|
||||
// MockProxyClientManager_SetProxyClients_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetProxyClients'
|
||||
type MockProxyClientManager_SetProxyClients_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// SetProxyClients is a helper method to define mock.On call
|
||||
// - session []*sessionutil.Session
|
||||
func (_e *MockProxyClientManager_Expecter) SetProxyClients(session interface{}) *MockProxyClientManager_SetProxyClients_Call {
|
||||
return &MockProxyClientManager_SetProxyClients_Call{Call: _e.mock.On("SetProxyClients", session)}
|
||||
}
|
||||
|
||||
func (_c *MockProxyClientManager_SetProxyClients_Call) Run(run func(session []*sessionutil.Session)) *MockProxyClientManager_SetProxyClients_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].([]*sessionutil.Session))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProxyClientManager_SetProxyClients_Call) Return() *MockProxyClientManager_SetProxyClients_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockProxyClientManager_SetProxyClients_Call) RunAndReturn(run func([]*sessionutil.Session)) *MockProxyClientManager_SetProxyClients_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// SetRates provides a mock function with given fields: ctx, request
|
||||
func (_m *MockProxyClientManager) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) error {
|
||||
ret := _m.Called(ctx, request)
|
||||
|
||||
@ -22,6 +22,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
@ -82,7 +83,7 @@ var defaultClientManagerHelper = ProxyClientManagerHelper{
|
||||
|
||||
type ProxyClientManagerInterface interface {
|
||||
AddProxyClient(session *sessionutil.Session)
|
||||
AddProxyClients(session []*sessionutil.Session)
|
||||
SetProxyClients(session []*sessionutil.Session)
|
||||
GetProxyClients() *typeutil.ConcurrentMap[int64, types.ProxyClient]
|
||||
DelProxyClient(s *sessionutil.Session)
|
||||
GetProxyCount() int
|
||||
@ -111,7 +112,26 @@ func NewProxyClientManager(creator ProxyCreator) *ProxyClientManager {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ProxyClientManager) AddProxyClients(sessions []*sessionutil.Session) {
|
||||
// SetProxyClients sets proxy clients from a full snapshot of sessions.
|
||||
// It removes stale clients not in the new snapshot and adds new ones.
|
||||
// This is called during initial setup or when re-watching after etcd error.
|
||||
func (p *ProxyClientManager) SetProxyClients(sessions []*sessionutil.Session) {
|
||||
aliveSessions := lo.KeyBy(sessions, func(session *sessionutil.Session) int64 {
|
||||
return session.ServerID
|
||||
})
|
||||
|
||||
// Remove stale clients not in the alive sessions
|
||||
p.proxyClient.Range(func(key int64, value types.ProxyClient) bool {
|
||||
if _, ok := aliveSessions[key]; !ok {
|
||||
if cli, loaded := p.proxyClient.GetAndRemove(key); loaded {
|
||||
cli.Close()
|
||||
log.Info("remove stale proxy client", zap.Int64("serverID", key))
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
// Add new clients
|
||||
for _, session := range sessions {
|
||||
p.AddProxyClient(session)
|
||||
}
|
||||
|
||||
@ -103,22 +103,38 @@ func (p *proxyMock) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Ref
|
||||
return merr.Success(), nil
|
||||
}
|
||||
|
||||
func TestProxyClientManager_AddProxyClients(t *testing.T) {
|
||||
proxyCreator := func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error) {
|
||||
return nil, errors.New("failed")
|
||||
}
|
||||
func TestProxyClientManager_SetProxyClients(t *testing.T) {
|
||||
p1 := mocks.NewMockProxyClient(t)
|
||||
p1.EXPECT().Close().Return(nil).Once()
|
||||
p2 := mocks.NewMockProxyClient(t)
|
||||
p3 := mocks.NewMockProxyClient(t)
|
||||
|
||||
proxyCreator := func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error) {
|
||||
return p3, nil
|
||||
}
|
||||
pcm := NewProxyClientManager(proxyCreator)
|
||||
|
||||
session := &sessionutil.Session{
|
||||
SessionRaw: sessionutil.SessionRaw{
|
||||
ServerID: 100,
|
||||
Address: "localhost",
|
||||
},
|
||||
}
|
||||
// Initial state: proxy 1, 2
|
||||
pcm.proxyClient.Insert(1, p1)
|
||||
pcm.proxyClient.Insert(2, p2)
|
||||
assert.Equal(t, 2, pcm.GetProxyCount())
|
||||
|
||||
sessions := []*sessionutil.Session{session}
|
||||
pcm.AddProxyClients(sessions)
|
||||
// New snapshot: proxy 2, 3
|
||||
sessions := []*sessionutil.Session{
|
||||
{SessionRaw: sessionutil.SessionRaw{ServerID: 2, Address: "addr2"}},
|
||||
{SessionRaw: sessionutil.SessionRaw{ServerID: 4, Address: "addr4"}},
|
||||
}
|
||||
pcm.SetProxyClients(sessions)
|
||||
|
||||
// Verify: proxy 1 removed, proxy 2 kept, proxy 3 added
|
||||
_, ok := pcm.proxyClient.Get(1)
|
||||
assert.False(t, ok, "stale proxy 1 should be removed")
|
||||
_, ok = pcm.proxyClient.Get(2)
|
||||
assert.True(t, ok, "proxy 2 should still exist")
|
||||
_, ok = pcm.proxyClient.Get(4)
|
||||
assert.True(t, ok, "proxy 4 should be added")
|
||||
|
||||
assert.Equal(t, 2, pcm.GetProxyCount())
|
||||
}
|
||||
|
||||
func TestProxyClientManager_AddProxyClient(t *testing.T) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user