milvus/internal/util/sessionutil/session_util_test.go
Zhen Ye 9942454811
enhance: remove watch at session liveness check (#45974)
issue: #45724
pr: #45968

---------

Signed-off-by: chyezh <chyezh@outlook.com>
2025-12-01 19:29:10 +08:00

846 lines
21 KiB
Go

package sessionutil
import (
"context"
"fmt"
"math/rand"
"os"
"os/exec"
"path"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/blang/semver/v4"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/json"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func TestGetServerIDConcurrently(t *testing.T) {
ctx := context.Background()
paramtable.Init()
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
etcdCli, _ := kvfactory.GetEtcdAndPath()
etcdKV := etcdkv.NewEtcdKV(etcdCli, metaRoot)
err := etcdKV.RemoveWithPrefix(ctx, "")
assert.NoError(t, err)
defer etcdKV.RemoveWithPrefix(ctx, "")
var wg sync.WaitGroup
muList := sync.Mutex{}
s := NewSessionWithEtcd(ctx, metaRoot, etcdCli)
res := make([]int64, 0)
getIDFunc := func() {
s.checkIDExist()
id, err := s.getServerID()
assert.NoError(t, err)
muList.Lock()
res = append(res, id)
muList.Unlock()
wg.Done()
}
for i := 0; i < 10; i++ {
wg.Add(1)
go getIDFunc()
}
wg.Wait()
assert.ElementsMatch(t, []int64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, res)
}
func TestInit(t *testing.T) {
ctx := context.Background()
paramtable.Init()
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
etcdCli, _ := kvfactory.GetEtcdAndPath()
etcdKV := etcdkv.NewEtcdKV(etcdCli, metaRoot)
err := etcdKV.RemoveWithPrefix(ctx, "")
assert.NoError(t, err)
defer etcdKV.RemoveWithPrefix(ctx, "")
s := NewSessionWithEtcd(ctx, metaRoot, etcdCli)
s.Init("inittest", "testAddr", false, false)
assert.NotEqual(t, int64(0), s.LeaseID)
assert.NotEqual(t, int64(0), s.ServerID)
s.Register()
sessions, _, err := s.GetSessions("inittest")
assert.NoError(t, err)
assert.Contains(t, sessions, "inittest-"+strconv.FormatInt(s.ServerID, 10))
}
func TestInitNoArgs(t *testing.T) {
ctx := context.Background()
paramtable.Init()
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
etcdCli, _ := kvfactory.GetEtcdAndPath()
etcdKV := etcdkv.NewEtcdKV(etcdCli, metaRoot)
err := etcdKV.RemoveWithPrefix(ctx, "")
assert.NoError(t, err)
defer etcdKV.RemoveWithPrefix(ctx, "")
s := NewSession(ctx)
s.Init("inittest", "testAddr", false, false)
assert.NotEqual(t, int64(0), s.LeaseID)
assert.NotEqual(t, int64(0), s.ServerID)
s.Register()
sessions, _, err := s.GetSessions("inittest")
assert.NoError(t, err)
assert.Contains(t, sessions, "inittest-"+strconv.FormatInt(s.ServerID, 10))
}
func TestUpdateSessions(t *testing.T) {
ctx := context.Background()
paramtable.Init()
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
etcdCli, _ := kvfactory.GetEtcdAndPath()
etcdKV := etcdkv.NewEtcdKV(etcdCli, "")
defer etcdKV.RemoveWithPrefix(ctx, "")
var wg sync.WaitGroup
muList := sync.Mutex{}
s := NewSessionWithEtcd(ctx, metaRoot, etcdCli, WithResueNodeID(false))
sessions, rev, err := s.GetSessions("test")
assert.NoError(t, err)
assert.Equal(t, len(sessions), 0)
watcher := s.WatchServices("test", rev, nil)
sList := []*Session{}
getIDFunc := func() {
singleS := NewSessionWithEtcd(ctx, metaRoot, etcdCli, WithResueNodeID(false))
singleS.Init("test", "testAddr", false, false)
singleS.Register()
muList.Lock()
sList = append(sList, singleS)
muList.Unlock()
wg.Done()
}
for i := 0; i < 10; i++ {
wg.Add(1)
go getIDFunc()
}
wg.Wait()
assert.Eventually(t, func() bool {
sessions, _, _ := s.GetSessions("test")
return len(sessions) == 10
}, 10*time.Second, 100*time.Millisecond)
notExistSessions, _, _ := s.GetSessions("testt")
assert.Equal(t, len(notExistSessions), 0)
etcdKV.RemoveWithPrefix(ctx, metaRoot)
assert.Eventually(t, func() bool {
sessions, _, _ := s.GetSessions("test")
return len(sessions) == 0
}, 10*time.Second, 100*time.Millisecond)
sessionEvents := []*SessionEvent{}
addEventLen := 0
delEventLen := 0
ch := time.After(time.Second * 5)
LOOP:
for {
select {
case <-ch:
t.FailNow()
case sessionEvent := <-watcher.EventChannel():
if sessionEvent.EventType == SessionAddEvent {
addEventLen++
}
if sessionEvent.EventType == SessionDelEvent {
delEventLen++
}
sessionEvents = append(sessionEvents, sessionEvent)
if len(sessionEvents) == 20 {
break LOOP
}
}
}
assert.Equal(t, addEventLen, 10)
assert.Equal(t, delEventLen, 10)
}
func TestWatcherHandleWatchResp(t *testing.T) {
ctx := context.Background()
paramtable.Init()
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
etcdCli, _ := kvfactory.GetEtcdAndPath()
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/by-dev/session-ut")
defer etcdKV.RemoveWithPrefix(ctx, "/by-dev/session-ut")
s := NewSessionWithEtcd(ctx, metaRoot, etcdCli)
defer s.Stop()
getWatcher := func(s *Session, rewatch Rewatch) *sessionWatcher {
return &sessionWatcher{
s: s,
prefix: "test",
rewatch: rewatch,
eventCh: make(chan *SessionEvent, 10),
validate: func(*Session) bool { return true },
}
}
t.Run("handle normal events", func(t *testing.T) {
w := getWatcher(s, nil)
wresp := clientv3.WatchResponse{
Events: []*clientv3.Event{
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Value: []byte(`{"ServerID": 1, "ServerName": "test1"}`),
},
},
{
Type: mvccpb.DELETE,
PrevKv: &mvccpb.KeyValue{
Value: []byte(`{"ServerID": 2, "ServerName": "test2"}`),
},
},
},
}
assert.NotPanics(t, func() {
w.handleWatchResponse(wresp)
})
assert.Equal(t, 2, len(w.eventCh))
})
t.Run("handle abnormal events", func(t *testing.T) {
w := getWatcher(s, nil)
wresp := clientv3.WatchResponse{
Events: []*clientv3.Event{
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Value: []byte(``),
},
},
{
Type: mvccpb.DELETE,
PrevKv: &mvccpb.KeyValue{
Value: []byte(``),
},
},
},
}
assert.NotPanics(t, func() {
w.handleWatchResponse(wresp)
})
assert.Equal(t, 0, len(w.eventCh))
})
t.Run("err compacted resp, nil Rewatch", func(t *testing.T) {
w := getWatcher(s, nil)
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
assert.NotPanics(t, func() {
w.handleWatchResponse(wresp)
})
})
t.Run("err compacted resp, valid Rewatch", func(t *testing.T) {
w := getWatcher(s, func(sessions map[string]*Session) error {
return nil
})
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
assert.NotPanics(t, func() {
w.handleWatchResponse(wresp)
})
})
t.Run("err canceled", func(t *testing.T) {
w := getWatcher(s, nil)
wresp := clientv3.WatchResponse{
Canceled: true,
}
assert.Panics(t, func() {
w.handleWatchResponse(wresp)
})
})
t.Run("err handled but rewatch failed", func(t *testing.T) {
w := getWatcher(s, func(sessions map[string]*Session) error {
return errors.New("mocked")
})
wresp := clientv3.WatchResponse{
CompactRevision: 1,
}
assert.Panics(t, func() {
w.handleWatchResponse(wresp)
})
})
}
func TestSession_Registered(t *testing.T) {
session := &Session{}
session.UpdateRegistered(false)
assert.False(t, session.Registered())
session.UpdateRegistered(true)
assert.True(t, session.Registered())
}
func TestSession_String(t *testing.T) {
s := &Session{}
log.Debug("log session", zap.Any("session", s))
}
func TestSesssionMarshal(t *testing.T) {
s := &Session{
SessionRaw: SessionRaw{
ServerID: 1,
ServerName: "test",
Address: "localhost",
},
Version: common.Version,
}
bs, err := json.Marshal(s)
require.NoError(t, err)
s2 := &Session{}
err = json.Unmarshal(bs, s2)
assert.NoError(t, err)
assert.Equal(t, s.ServerID, s2.ServerID)
assert.Equal(t, s.ServerName, s2.ServerName)
assert.Equal(t, s.Address, s2.Address)
assert.Equal(t, s.Version.String(), s2.Version.String())
}
func TestSessionUnmarshal(t *testing.T) {
t.Run("json failure", func(t *testing.T) {
s := &Session{}
err := json.Unmarshal([]byte("garbage"), s)
assert.Error(t, err)
})
t.Run("version error", func(t *testing.T) {
s := &Session{}
err := json.Unmarshal([]byte(`{"Version": "a.b.c"}`), s)
assert.Error(t, err)
})
}
type SessionWithVersionSuite struct {
suite.Suite
tmpDir string
metaRoot string
serverName string
sessions []*Session
client *clientv3.Client
}
func (suite *SessionWithVersionSuite) SetupSuite() {
client, _ := kvfactory.GetEtcdAndPath()
suite.client = client
}
func (suite *SessionWithVersionSuite) SetupTest() {
ctx := context.Background()
suite.metaRoot = "sessionWithVersion"
suite.serverName = "sessionComp"
s1 := NewSessionWithEtcd(ctx, suite.metaRoot, suite.client, WithResueNodeID(false))
s1.Version.Major, s1.Version.Minor, s1.Version.Patch = 0, 0, 0
s1.Init(suite.serverName, "s1", false, false)
s1.Register()
suite.sessions = append(suite.sessions, s1)
s2 := NewSessionWithEtcd(ctx, suite.metaRoot, suite.client, WithResueNodeID(false))
s2.Version.Major, s2.Version.Minor, s2.Version.Patch = 2, 1, 0
s2.Init(suite.serverName, "s2", false, false)
s2.Register()
suite.sessions = append(suite.sessions, s2)
s3 := NewSessionWithEtcd(ctx, suite.metaRoot, suite.client, WithResueNodeID(false))
s3.Version.Major, s3.Version.Minor, s3.Version.Patch = 2, 2, 0
s3.Version.Build = []string{"dev"}
s3.Init(suite.serverName, "s3", false, false)
s3.Register()
suite.sessions = append(suite.sessions, s3)
}
func (suite *SessionWithVersionSuite) TearDownTest() {
for _, s := range suite.sessions {
s.Stop()
}
suite.sessions = nil
client, _ := kvfactory.GetEtcdAndPath()
_, err := client.Delete(context.Background(), suite.metaRoot, clientv3.WithPrefix())
suite.Require().NoError(err)
}
func (suite *SessionWithVersionSuite) TestGetSessionsWithRangeVersion() {
s := NewSessionWithEtcd(context.Background(), suite.metaRoot, suite.client, WithResueNodeID(false))
suite.Run(">1.0.0", func() {
r, err := semver.ParseRange(">1.0.0")
suite.Require().NoError(err)
result, _, err := s.GetSessionsWithVersionRange(suite.serverName, r)
suite.Require().NoError(err)
suite.Equal(2, len(result))
})
suite.Run(">2.1.0", func() {
r, err := semver.ParseRange(">2.1.0")
suite.Require().NoError(err)
result, _, err := s.GetSessionsWithVersionRange(suite.serverName, r)
suite.Require().NoError(err)
suite.Equal(1, len(result))
})
suite.Run(">2.2.0", func() {
r, err := semver.ParseRange(">2.2.0")
suite.Require().NoError(err)
result, _, err := s.GetSessionsWithVersionRange(suite.serverName, r)
suite.Require().NoError(err)
suite.Equal(0, len(result))
suite.T().Log(result)
})
suite.Run(">=0.0.0 with garbage", func() {
ctx := context.Background()
r, err := semver.ParseRange(">=0.0.0")
suite.Require().NoError(err)
suite.client.Put(ctx, path.Join(suite.metaRoot, DefaultServiceRoot, suite.serverName, "garbage"), "garbage")
suite.client.Put(ctx, path.Join(suite.metaRoot, DefaultServiceRoot, suite.serverName, "garbage_1"), `{"Version": "a.b.c"}`)
_, _, err = s.GetSessionsWithVersionRange(suite.serverName, r)
suite.Error(err)
})
}
func (suite *SessionWithVersionSuite) TestWatchServicesWithVersionRange() {
s := NewSessionWithEtcd(context.Background(), suite.metaRoot, suite.client, WithResueNodeID(false))
suite.Run(">1.0.0 <=2.1.0", func() {
r, err := semver.ParseRange(">1.0.0 <=2.1.0")
suite.Require().NoError(err)
_, rev, err := s.GetSessionsWithVersionRange(suite.serverName, r)
suite.Require().NoError(err)
watcher := s.WatchServicesWithVersionRange(suite.serverName, r, rev, nil)
// remove all sessions
go func() {
for _, s := range suite.sessions {
s.Stop()
}
}()
select {
case evt := <-watcher.EventChannel():
suite.Equal(suite.sessions[1].ServerID, evt.Session.ServerID)
case <-time.After(time.Second):
suite.Fail("no event received, failing")
}
})
}
func TestSessionWithVersionRange(t *testing.T) {
suite.Run(t, new(SessionWithVersionSuite))
}
func TestSessionProcessActiveStandBy(t *testing.T) {
ctx := context.TODO()
// initial etcd
paramtable.Init()
metaRoot := fmt.Sprintf("%d/%s1", rand.Int(), DefaultServiceRoot)
etcdCli, _ := kvfactory.GetEtcdAndPath()
etcdKV := etcdkv.NewEtcdKV(etcdCli, metaRoot)
err := etcdKV.RemoveWithPrefix(ctx, "")
assert.NoError(t, err)
defer etcdKV.RemoveWithPrefix(ctx, "")
var wg sync.WaitGroup
flag := false
// register session 1, will be active
ctx1 := context.Background()
s1 := NewSessionWithEtcd(ctx1, metaRoot, etcdCli, WithResueNodeID(false))
s1.Init("inittest", "testAddr", true, true)
s1.SetEnableActiveStandBy(true)
s1.Register()
wg.Add(1)
s1.ProcessActiveStandBy(func() error {
log.Debug("Session 1 become active")
wg.Done()
return nil
})
wg.Wait()
assert.False(t, s1.isStandby.Load().(bool))
// register session 2, will be standby
ctx2 := context.Background()
s2 := NewSessionWithEtcd(ctx2, metaRoot, etcdCli, WithResueNodeID(false))
s2.Init("inittest", "testAddr", true, true)
s2.SetEnableActiveStandBy(true)
s2.Register()
wg.Add(1)
go s2.ProcessActiveStandBy(func() error {
log.Debug("Session 2 become active")
wg.Done()
return nil
})
assert.True(t, s2.isStandby.Load().(bool))
// assert.True(t, s2.watchingPrimaryKeyLock)
// stop session 1, session 2 will take over primary service
log.Debug("Stop session 1, session 2 will take over primary service")
assert.False(t, flag)
s1.Stop()
wg.Wait()
log.Debug("session s2 wait done")
assert.False(t, s2.isStandby.Load().(bool))
s2.Stop()
}
func TestSessionEventType_String(t *testing.T) {
tests := []struct {
name string
t SessionEventType
want string
}{
{t: SessionNoneEvent, want: ""},
{t: SessionAddEvent, want: "SessionAddEvent"},
{t: SessionDelEvent, want: "SessionDelEvent"},
{t: SessionUpdateEvent, want: "SessionUpdateEvent"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, tt.t.String(), "String()")
})
}
}
func TestServerInfoOp(t *testing.T) {
t.Run("test with specified pid", func(t *testing.T) {
pid := 9999999
serverID := int64(999)
filePath := GetServerInfoFilePath(pid)
defer os.RemoveAll(filePath)
saveServerInfoInternal(typeutil.QueryCoordRole, serverID, pid)
saveServerInfoInternal(typeutil.DataCoordRole, serverID, pid)
saveServerInfoInternal(typeutil.ProxyRole, serverID, pid)
sessions := GetSessions(pid)
assert.Equal(t, 3, len(sessions))
assert.ElementsMatch(t, sessions, []string{
"querycoord-999",
"datacoord-999",
"proxy-999",
})
RemoveServerInfoFile(pid)
sessions = GetSessions(pid)
assert.Equal(t, 0, len(sessions))
})
t.Run("test with os pid", func(t *testing.T) {
serverID := int64(9999)
filePath := GetServerInfoFilePath(os.Getpid())
defer os.RemoveAll(filePath)
SaveServerInfo(typeutil.QueryCoordRole, serverID)
sessions := GetSessions(os.Getpid())
assert.Equal(t, 1, len(sessions))
})
}
func TestSession_apply(t *testing.T) {
session := &Session{}
opts := []SessionOption{WithTTL(100), WithRetryTimes(200)}
session.apply(opts...)
assert.Equal(t, int64(100), session.sessionTTL)
assert.Equal(t, int64(200), session.sessionRetryTimes)
}
func TestIntegrationMode(t *testing.T) {
ctx := context.Background()
paramtable.Init()
params := paramtable.Get()
params.Save(params.IntegrationTestCfg.IntegrationMode.Key, "true")
endpoints := params.EtcdCfg.Endpoints.GetValue()
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
etcdEndpoints := strings.Split(endpoints, ",")
etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
require.NoError(t, err)
etcdKV := etcdkv.NewEtcdKV(etcdCli, metaRoot)
err = etcdKV.RemoveWithPrefix(ctx, "")
assert.NoError(t, err)
s1 := NewSessionWithEtcd(ctx, metaRoot, etcdCli)
assert.Equal(t, false, s1.reuseNodeID)
s2 := NewSessionWithEtcd(ctx, metaRoot, etcdCli)
assert.Equal(t, false, s2.reuseNodeID)
s1.Init("inittest1", "testAddr1", false, false)
s1.Init("inittest2", "testAddr2", false, false)
assert.NotEqual(t, s1.ServerID, s2.ServerID)
}
type SessionSuite struct {
suite.Suite
tmpDir string
metaRoot string
serverName string
client *clientv3.Client
}
func (s *SessionSuite) SetupSuite() {
paramtable.Init()
}
func (s *SessionSuite) TearDownSuite() {
}
func (s *SessionSuite) SetupTest() {
s.client, _ = kvfactory.GetEtcdAndPath()
s.metaRoot = fmt.Sprintf("milvus-ut/session-%s/", funcutil.GenRandomStr())
}
func (s *SessionSuite) TearDownTest() {
_, err := s.client.Delete(context.Background(), s.metaRoot, clientv3.WithPrefix())
s.Require().NoError(err)
}
func (s *SessionSuite) TestDisconnected() {
st := &Session{}
st.SetDisconnected(true)
sf := &Session{}
sf.SetDisconnected(false)
cases := []struct {
tag string
input *Session
expect bool
}{
{"not_set", &Session{}, false},
{"set_true", st, true},
{"set_false", sf, false},
}
for _, c := range cases {
s.Run(c.tag, func() {
s.Equal(c.expect, c.input.Disconnected())
})
}
}
func (s *SessionSuite) TestGoingStop() {
ctx := context.Background()
sdisconnect := NewSessionWithEtcd(ctx, s.metaRoot, s.client)
sdisconnect.SetDisconnected(true)
sess := NewSessionWithEtcd(ctx, s.metaRoot, s.client)
sess.Init("test", "normal", false, false)
sess.Register()
cases := []struct {
tag string
input *Session
expectError bool
}{
{"nil", nil, true},
{"not_inited", &Session{}, true},
{"disconnected", sdisconnect, true},
{"normal", sess, false},
}
for _, c := range cases {
s.Run(c.tag, func() {
err := c.input.GoingStop()
if c.expectError {
s.Error(err)
} else {
s.NoError(err)
}
})
}
}
func (s *SessionSuite) TestKeepAliveRetryActiveCancel() {
ctx := context.Background()
session := NewSessionWithEtcd(ctx, s.metaRoot, s.client)
session.Init("test", "normal", false, false)
// Register
err := session.registerService()
s.Require().NoError(err)
session.startKeepAliveLoop()
session.Stop()
// wait workers exit
session.wg.Wait()
}
func (s *SessionSuite) TestKeepAliveRetryChannelClose() {
ctx := context.Background()
session := NewSessionWithEtcd(ctx, s.metaRoot, s.client)
session.Init("test", "normal", false, false)
// Register
err := session.registerService()
if err != nil {
panic(err)
}
closeChan := make(chan *clientv3.LeaseKeepAliveResponse)
session.startKeepAliveLoop()
// close channel, should retry connect
close(closeChan)
// sleep a while wait goroutine process
time.Sleep(time.Millisecond * 100)
// expected Disconnected = false, means session is not closed
assert.Equal(s.T(), false, session.Disconnected())
time.Sleep(time.Second * 1)
// expected Disconnected = false, means session is not closed, keepalive keeps working
assert.Equal(s.T(), false, session.Disconnected())
}
func (s *SessionSuite) TestGetSessions() {
os.Setenv("MILVUS_SERVER_LABEL_key1", "value1")
os.Setenv("MILVUS_SERVER_LABEL_key2", "value2")
os.Setenv("key3", "value3")
defer os.Unsetenv("MILVUS_SERVER_LABEL_key1")
defer os.Unsetenv("MILVUS_SERVER_LABEL_key2")
defer os.Unsetenv("key3")
ret := GetServerLabelsFromEnv("querynode")
assert.Equal(s.T(), 2, len(ret))
assert.Equal(s.T(), "value1", ret["key1"])
assert.Equal(s.T(), "value2", ret["key2"])
}
func (s *SessionSuite) TestSessionLifetime() {
ctx := context.Background()
session := NewSessionWithEtcd(ctx, s.metaRoot, s.client)
session.Init("test", "normal", false, false)
session.Register()
resp, err := s.client.Get(ctx, session.getCompleteKey())
s.Require().NoError(err)
s.Equal(1, len(resp.Kvs))
str, err := json.Marshal(session.SessionRaw)
s.Require().NoError(err)
s.Equal(string(resp.Kvs[0].Value), string(str))
ttlResp, err := s.client.Lease.TimeToLive(ctx, *session.LeaseID)
s.Require().NoError(err)
s.Greater(ttlResp.TTL, int64(0))
session.GoingStop()
resp, err = s.client.Get(ctx, session.getCompleteKey())
s.Require().True(session.SessionRaw.Stopping)
s.Require().NoError(err)
s.Equal(1, len(resp.Kvs))
str, err = json.Marshal(session.SessionRaw)
s.Require().NoError(err)
s.Equal(string(resp.Kvs[0].Value), string(str))
session.Stop()
session.wg.Wait()
resp, err = s.client.Get(ctx, session.getCompleteKey())
s.Require().NoError(err)
s.Equal(0, len(resp.Kvs))
ttlResp, err = s.client.Lease.TimeToLive(ctx, *session.LeaseID)
s.Require().NoError(err)
s.Equal(int64(-1), ttlResp.TTL)
}
func TestSessionSuite(t *testing.T) {
suite.Run(t, new(SessionSuite))
}
func TestForceKill(t *testing.T) {
if os.Getenv("TEST_EXIT") == "1" {
testForceKill("testForceKill")
return
}
cmd := exec.Command(os.Args[0], "-test.run=TestForceKill") /* #nosec G204 */
cmd.Env = append(os.Environ(), "TEST_EXIT=1")
err := cmd.Run()
// 子进程退出码
if e, ok := err.(*exec.ExitError); ok {
if e.ExitCode() != 1 {
t.Fatalf("expected exit 1, got %d", e.ExitCode())
}
} else {
t.Fatalf("unexpected error: %#v", err)
}
}
func testForceKill(serverName string) {
etcdCli, _ := kvfactory.GetEtcdAndPath()
session := NewSessionWithEtcd(context.Background(), "test", etcdCli)
session.Init(serverName, "normal", false, false)
session.Register()
// trigger a force kill
etcdCli.Revoke(context.Background(), *session.LeaseID)
}