Fix session stop/goingStop stuck after connection lost (#23771)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-05-16 09:59:27 +08:00 committed by GitHub
parent 08fe2786f6
commit bc86aa666f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 179 additions and 36 deletions

View File

@ -88,7 +88,8 @@ type Session struct {
metaRoot string
registered atomic.Value
registered atomic.Value
disconnected atomic.Value
isStandby atomic.Value
enableActiveStandBy bool
@ -491,6 +492,10 @@ func (s *Session) GoingStop() error {
return errors.New("the session hasn't been init")
}
if s.Disconnected() {
return errors.New("this session has disconnected")
}
completeKey := s.getCompleteKey()
resp, err := s.etcdCli.Get(s.ctx, completeKey, clientv3.WithCountOnly())
if err != nil {
@ -688,6 +693,7 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
}
// not ok, connection lost
log.Warn("connection lost detected, shuting down")
s.SetDisconnected(true)
if callback != nil {
go callback()
}
@ -760,6 +766,9 @@ func (s *Session) Revoke(timeout time.Duration) {
if s.etcdCli == nil || s.leaseID == nil {
return
}
if s.Disconnected() {
return
}
// can NOT use s.ctx, it may be Done here
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
@ -781,6 +790,18 @@ func (s *Session) Registered() bool {
return b
}
// SetDisconnected stores b in the session's disconnected flag.
// The flag is read back through Disconnected.
func (s *Session) SetDisconnected(b bool) {
s.disconnected.Store(b)
}
// Disconnected reports the value last stored via SetDisconnected.
// It returns false when nothing has been stored yet (or the stored value is
// not a bool).
func (s *Session) Disconnected() bool {
	// A failed comma-ok assertion yields the zero value of bool, i.e. false,
	// which is exactly the "not set" answer.
	flag, _ := s.disconnected.Load().(bool)
	return flag
}
// SetEnableActiveStandBy sets the session's enableActiveStandBy field to enable.
func (s *Session) SetEnableActiveStandBy(enable bool) {
s.enableActiveStandBy = enable
}

View File

@ -21,6 +21,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -370,41 +371,6 @@ func TestWatcherHandleWatchResp(t *testing.T) {
}
// TestSessionRevoke checks that Revoke does not panic on a zero-value
// session, on a nil session, and on a session that was created and
// initialised with a client obtained from etcd.GetRemoteEtcdClient.
func TestSessionRevoke(t *testing.T) {
	s := &Session{}
	assert.NotPanics(t, func() {
		s.Revoke(time.Second)
	})

	s = (*Session)(nil)
	assert.NotPanics(t, func() {
		s.Revoke(time.Second)
	})

	ctx := context.Background()
	Params.Init()

	endpoints := Params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints)
	metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)

	etcdEndpoints := strings.Split(endpoints, ",")
	etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
	// Check the error before scheduling Close: when client creation fails the
	// returned client is presumably nil, and a deferred Close on it would run
	// (and likely panic) while require's FailNow unwinds the test goroutine.
	require.NoError(t, err)
	defer etcdCli.Close()

	etcdKV := etcdkv.NewEtcdKV(etcdCli, metaRoot)
	err = etcdKV.RemoveWithPrefix("")
	assert.NoError(t, err)
	// Deferred calls run in reverse order: clean the prefix, close the KV
	// wrapper, then close the client.
	defer etcdKV.Close()
	defer etcdKV.RemoveWithPrefix("")

	s = NewSession(ctx, metaRoot, etcdCli)
	s.Init("revoketest", "testAddr", false, false)
	assert.NotPanics(t, func() {
		s.Revoke(time.Second)
	})
}
func TestSession_Registered(t *testing.T) {
session := &Session{}
session.UpdateRegistered(false)
@ -719,3 +685,159 @@ func TestSession_apply(t *testing.T) {
assert.Equal(t, int64(100), session.sessionTTL)
assert.Equal(t, int64(200), session.sessionRetryTimes)
}
// SessionSuite is a testify suite for Session that runs against an embedded
// etcd server.
type SessionSuite struct {
suite.Suite
// tmpDir is the temporary directory created in SetupSuite and removed in
// TearDownSuite.
tmpDir string
// etcdServer is the embedded etcd instance started in SetupSuite.
etcdServer *embed.Etcd
// metaRoot is the per-test key prefix generated in SetupTest and deleted
// in TearDownTest.
metaRoot string
// serverName is not referenced by the suite methods visible in this file
// section. NOTE(review): confirm whether it is still needed.
serverName string
// client is the per-test etcd client created in SetupTest and closed in
// TearDownTest.
client *clientv3.Client
}
// SetupSuite creates a dedicated temporary directory and starts an embedded
// etcd server that stores its data there. Both the directory and the server
// are kept on the suite so TearDownSuite can release them.
func (s *SessionSuite) SetupSuite() {
	dir, err := ioutil.TempDir(os.TempDir(), "milvus_ut")
	s.Require().NoError(err)
	s.tmpDir = dir
	s.T().Log("using tmp dir:", dir)

	config := embed.NewConfig()

	// Point etcd at the directory created above rather than the shared system
	// temp dir: TearDownSuite only removes s.tmpDir, so data written anywhere
	// else would be left behind and could collide with other runs.
	config.Dir = dir
	config.LogLevel = "warn"
	config.LogOutputs = []string{"default"}

	// Port 0 is used for both the client and the peer listener URLs.
	u, err := url.Parse("http://localhost:0")
	s.Require().NoError(err)
	config.LCUrls = []url.URL{*u}
	u, err = url.Parse("http://localhost:0")
	s.Require().NoError(err)
	config.LPUrls = []url.URL{*u}

	etcdServer, err := embed.StartEtcd(config)
	s.Require().NoError(err)
	s.etcdServer = etcdServer
}
// TearDownSuite closes the embedded etcd server, if one was started, and
// removes the suite's temporary directory, if one was created.
func (s *SessionSuite) TearDownSuite() {
	if server := s.etcdServer; server != nil {
		server.Close()
	}

	if dir := s.tmpDir; dir != "" {
		os.RemoveAll(dir)
	}
}
// SetupTest gives each test a fresh etcd client built from the embedded
// server and a randomly suffixed meta root.
func (s *SessionSuite) SetupTest() {
	s.client = v3client.New(s.etcdServer.Server)
	s.metaRoot = fmt.Sprintf("milvus-ut/session-%s/", funcutil.GenRandomStr())
}
// TearDownTest deletes every key under the test's meta root and then closes
// and clears the per-test etcd client. It is a no-op when no client is set.
func (s *SessionSuite) TearDownTest() {
	// Guard before the first use; the client is dereferenced by Delete below.
	if s.client == nil {
		return
	}
	// Close via defer so the client is released even when the Require
	// assertion below aborts the test.
	defer func() {
		s.client.Close()
		s.client = nil
	}()

	_, err := s.client.Delete(context.Background(), s.metaRoot, clientv3.WithPrefix())
	s.Require().NoError(err)
}
// TestDisconnected verifies that Disconnected reports false for a session
// whose flag was never set, and otherwise echoes the value passed to
// SetDisconnected.
func (s *SessionSuite) TestDisconnected() {
	type testCase struct {
		tag    string
		input  *Session
		expect bool
	}

	markedTrue := &Session{}
	markedTrue.SetDisconnected(true)

	markedFalse := &Session{}
	markedFalse.SetDisconnected(false)

	testCases := []testCase{
		{tag: "not_set", input: &Session{}, expect: false},
		{tag: "set_true", input: markedTrue, expect: true},
		{tag: "set_false", input: markedFalse, expect: false},
	}

	for i := range testCases {
		tc := testCases[i]
		s.Run(tc.tag, func() {
			s.Equal(tc.expect, tc.input.Disconnected())
		})
	}
}
// TestGoingStop verifies that GoingStop returns an error for a nil session,
// a zero-value session and a session flagged as disconnected, and succeeds
// for a session that has been initialised and registered.
func (s *SessionSuite) TestGoingStop() {
	type testCase struct {
		tag         string
		input       *Session
		expectError bool
	}

	ctx := context.Background()

	// Flagged as disconnected without ever being initialised.
	flagged := NewSession(ctx, s.metaRoot, s.client)
	flagged.SetDisconnected(true)

	// Fully initialised and registered session.
	registered := NewSession(ctx, s.metaRoot, s.client)
	registered.Init("test", "normal", false, false)
	registered.Register()

	testCases := []testCase{
		{tag: "nil", input: nil, expectError: true},
		{tag: "not_inited", input: &Session{}, expectError: true},
		{tag: "disconnected", input: flagged, expectError: true},
		{tag: "normal", input: registered, expectError: false},
	}

	for i := range testCases {
		tc := testCases[i]
		s.Run(tc.tag, func() {
			err := tc.input.GoingStop()
			if !tc.expectError {
				s.NoError(err)
				return
			}
			s.Error(err)
		})
	}
}
// TestRevoke calls Revoke on several sessions and then checks how many
// entries remain under each session's complete key: one when the key existed
// beforehand and the revoke is expected not to succeed (the disconnected
// case), zero otherwise.
func (s *SessionSuite) TestRevoke() {
	type testCase struct {
		tag      string
		input    *Session
		preExist bool
		success  bool
	}

	ctx := context.Background()

	// Registered first, then flagged as disconnected.
	lost := NewSession(ctx, s.metaRoot, s.client)
	lost.Init("test", "disconnected", false, false)
	lost.Register()
	lost.SetDisconnected(true)

	// Registered and still connected.
	healthy := NewSession(ctx, s.metaRoot, s.client)
	healthy.Init("test", "normal", false, false)
	healthy.Register()

	testCases := []testCase{
		{tag: "not_inited", input: &Session{}, preExist: false, success: true},
		{tag: "disconnected", input: lost, preExist: true, success: false},
		{tag: "normal", input: healthy, preExist: false, success: true},
	}

	for i := range testCases {
		tc := testCases[i]
		s.Run(tc.tag, func() {
			tc.input.Revoke(time.Second)

			resp, err := s.client.Get(ctx, tc.input.getCompleteKey())
			s.Require().NoError(err)

			// The key is expected to survive only when it was there before
			// and the revoke is expected not to succeed.
			if tc.preExist && !tc.success {
				s.Equal(1, len(resp.Kvs))
			} else {
				s.Equal(0, len(resp.Kvs))
			}
		})
	}
}
// TestSessionSuite is the go test entry point that runs SessionSuite.
func TestSessionSuite(t *testing.T) {
	suite.Run(t, &SessionSuite{})
}