mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Fix ut about disconnect etcd (#22238)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
144d8512f4
commit
dd63c71bd0
@ -161,7 +161,7 @@ func (r *Runner) CheckSessions() error {
|
||||
|
||||
func (r *Runner) RegisterSession() error {
|
||||
r.session.Register()
|
||||
go r.session.LivenessCheck(r.ctx, func() {})
|
||||
r.session.LivenessCheck(r.ctx, func() {})
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -228,7 +228,7 @@ func (s *Server) Register() error {
|
||||
}
|
||||
metrics.NumNodes.WithLabelValues(strconv.FormatInt(s.session.ServerID, 10), typeutil.DataCoordRole).Inc()
|
||||
log.Info("DataCoord Register Finished")
|
||||
go s.session.LivenessCheck(s.serverLoopCtx, func() {
|
||||
s.session.LivenessCheck(s.serverLoopCtx, func() {
|
||||
logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID))
|
||||
if err := s.Stop(); err != nil {
|
||||
logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
|
||||
|
||||
@ -3642,11 +3642,12 @@ func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts ..
|
||||
assert.Nil(t, err)
|
||||
svr.meta = meta
|
||||
|
||||
err = svr.Start()
|
||||
assert.Nil(t, err)
|
||||
err = svr.Register()
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = svr.Start()
|
||||
assert.Nil(t, err)
|
||||
|
||||
// Stop channal watch state watcher in tests
|
||||
if svr.channelManager != nil && svr.channelManager.stopChecker != nil {
|
||||
svr.channelManager.stopChecker()
|
||||
@ -3692,11 +3693,13 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
|
||||
|
||||
err = svr.Init()
|
||||
assert.Nil(t, err)
|
||||
err = svr.Start()
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = svr.Register()
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = svr.Start()
|
||||
assert.Nil(t, err)
|
||||
|
||||
// Stop channal watch state watcher in tests
|
||||
if svr.channelManager != nil && svr.channelManager.stopChecker != nil {
|
||||
svr.channelManager.stopChecker()
|
||||
@ -3883,11 +3886,13 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server {
|
||||
|
||||
err = svr.Init()
|
||||
assert.Nil(t, err)
|
||||
err = svr.Start()
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = svr.Register()
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = svr.Start()
|
||||
assert.Nil(t, err)
|
||||
|
||||
resp, err := svr.GetComponentStates(context.Background())
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
|
||||
|
||||
@ -185,7 +185,7 @@ func (node *DataNode) Register() error {
|
||||
metrics.NumNodes.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), typeutil.DataNodeRole).Inc()
|
||||
log.Info("DataNode Register Finished")
|
||||
// Start liveness check
|
||||
go node.session.LivenessCheck(node.ctx, func() {
|
||||
node.session.LivenessCheck(node.ctx, func() {
|
||||
log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID))
|
||||
if err := node.Stop(); err != nil {
|
||||
log.Fatal("failed to stop server", zap.Error(err))
|
||||
|
||||
@ -853,8 +853,6 @@ func TestWatchChannel(t *testing.T) {
|
||||
err = node.Start()
|
||||
assert.Nil(t, err)
|
||||
defer node.Stop()
|
||||
err = node.Register()
|
||||
assert.Nil(t, err)
|
||||
|
||||
defer cancel()
|
||||
|
||||
|
||||
@ -141,7 +141,7 @@ func (i *IndexCoord) Register() error {
|
||||
}
|
||||
metrics.NumNodes.WithLabelValues(strconv.FormatInt(i.session.ServerID, 10), typeutil.IndexCoordRole).Inc()
|
||||
log.Info("IndexCoord Register Finished")
|
||||
go i.session.LivenessCheck(i.loopCtx, func() {
|
||||
i.session.LivenessCheck(i.loopCtx, func() {
|
||||
log.Error("Index Coord disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
|
||||
if err := i.Stop(); err != nil {
|
||||
log.Fatal("failed to stop server", zap.Error(err))
|
||||
|
||||
@ -131,7 +131,7 @@ func (i *IndexNode) Register() error {
|
||||
log.Info("IndexNode Register Finished")
|
||||
|
||||
//start liveness check
|
||||
go i.session.LivenessCheck(i.loopCtx, func() {
|
||||
i.session.LivenessCheck(i.loopCtx, func() {
|
||||
log.Error("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
|
||||
if err := i.Stop(); err != nil {
|
||||
log.Fatal("failed to stop server", zap.Error(err))
|
||||
|
||||
@ -132,7 +132,7 @@ func (node *Proxy) Register() error {
|
||||
node.session.Register()
|
||||
metrics.NumNodes.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), typeutil.ProxyRole).Inc()
|
||||
log.Info("Proxy Register Finished")
|
||||
go node.session.LivenessCheck(node.ctx, func() {
|
||||
node.session.LivenessCheck(node.ctx, func() {
|
||||
log.Error("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID))
|
||||
if err := node.Stop(); err != nil {
|
||||
log.Fatal("failed to stop server", zap.Error(err))
|
||||
|
||||
@ -135,7 +135,7 @@ func (s *Server) Register() error {
|
||||
}
|
||||
metrics.NumNodes.WithLabelValues(strconv.FormatInt(s.session.ServerID, 10), typeutil.QueryCoordRole).Inc()
|
||||
log.Info("QueryCoord Register Finished")
|
||||
go s.session.LivenessCheck(s.ctx, func() {
|
||||
s.session.LivenessCheck(s.ctx, func() {
|
||||
log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.ServerID))
|
||||
if err := s.Stop(); err != nil {
|
||||
log.Fatal("failed to stop server", zap.Error(err))
|
||||
|
||||
@ -212,10 +212,11 @@ func (suite *ServerSuite) TestDisableActiveStandby() {
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.StateCode_Initializing, suite.server.status.Load().(commonpb.StateCode))
|
||||
suite.hackServer()
|
||||
err = suite.server.Start()
|
||||
suite.NoError(err)
|
||||
err = suite.server.Register()
|
||||
suite.NoError(err)
|
||||
err = suite.server.Start()
|
||||
suite.NoError(err)
|
||||
|
||||
suite.Equal(commonpb.StateCode_Healthy, suite.server.status.Load().(commonpb.StateCode))
|
||||
|
||||
states, err := suite.server.GetComponentStates(context.Background())
|
||||
|
||||
@ -170,7 +170,7 @@ func (node *QueryNode) Register() error {
|
||||
metrics.NumNodes.WithLabelValues(strconv.FormatInt(node.session.ServerID, 10), typeutil.QueryNodeRole).Inc()
|
||||
log.Info("QueryNode Register Finished")
|
||||
// start liveness check
|
||||
go node.session.LivenessCheck(node.queryNodeLoopCtx, func() {
|
||||
node.session.LivenessCheck(node.queryNodeLoopCtx, func() {
|
||||
log.Error("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID))
|
||||
if err := node.Stop(); err != nil {
|
||||
log.Fatal("failed to stop server", zap.Error(err))
|
||||
|
||||
@ -304,7 +304,7 @@ func (c *Core) Register() error {
|
||||
}
|
||||
metrics.NumNodes.WithLabelValues(strconv.FormatInt(c.session.ServerID, 10), typeutil.RootCoordRole).Inc()
|
||||
log.Info("RootCoord Register Finished")
|
||||
go c.session.LivenessCheck(c.ctx, func() {
|
||||
c.session.LivenessCheck(c.ctx, func() {
|
||||
log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
|
||||
if err := c.Stop(); err != nil {
|
||||
log.Fatal("failed to stop server", zap.Error(err))
|
||||
|
||||
@ -1365,11 +1365,12 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) {
|
||||
err = core.Init()
|
||||
assert.Equal(t, commonpb.StateCode_StandBy, core.stateCode.Load().(commonpb.StateCode))
|
||||
assert.NoError(t, err)
|
||||
err = core.Start()
|
||||
assert.NoError(t, err)
|
||||
core.session.TriggerKill = false
|
||||
err = core.Register()
|
||||
assert.NoError(t, err)
|
||||
err = core.Start()
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, commonpb.StateCode_Healthy, core.stateCode.Load().(commonpb.StateCode))
|
||||
resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
@ -1416,11 +1417,11 @@ func TestRootcoord_DisableActiveStandby(t *testing.T) {
|
||||
err = core.Init()
|
||||
assert.Equal(t, commonpb.StateCode_Initializing, core.stateCode.Load().(commonpb.StateCode))
|
||||
assert.NoError(t, err)
|
||||
err = core.Start()
|
||||
assert.NoError(t, err)
|
||||
core.session.TriggerKill = false
|
||||
err = core.Register()
|
||||
assert.NoError(t, err)
|
||||
err = core.Start()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.StateCode_Healthy, core.stateCode.Load().(commonpb.StateCode))
|
||||
resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
|
||||
@ -669,68 +669,71 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
|
||||
panic(err)
|
||||
}
|
||||
s.watchSessionKeyCh = s.etcdCli.Watch(context.Background(), s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision))
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-s.liveCh:
|
||||
// ok, still alive
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
// not ok, connection lost
|
||||
log.Warn("connection lost detected, shuting down")
|
||||
if callback != nil {
|
||||
go callback()
|
||||
}
|
||||
return
|
||||
case <-ctx.Done():
|
||||
log.Info("liveness exits due to context done")
|
||||
// cancel the etcd keepAlive context
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
return
|
||||
case resp, ok := <-s.watchSessionKeyCh:
|
||||
if !ok {
|
||||
log.Warn("watch session key channel closed")
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-s.liveCh:
|
||||
// ok, still alive
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
// not ok, connection lost
|
||||
log.Warn("connection lost detected, shuting down")
|
||||
if callback != nil {
|
||||
go callback()
|
||||
}
|
||||
return
|
||||
case <-ctx.Done():
|
||||
log.Info("liveness exits due to context done")
|
||||
// cancel the etcd keepAlive context
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
if resp.Err() != nil {
|
||||
// if not ErrCompacted, just close the channel
|
||||
if resp.Err() != v3rpc.ErrCompacted {
|
||||
//close event channel
|
||||
log.Warn("Watch service found error", zap.Error(resp.Err()))
|
||||
case resp, ok := <-s.watchSessionKeyCh:
|
||||
if !ok {
|
||||
log.Warn("watch session key channel closed")
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Warn("Watch service found compacted error", zap.Error(resp.Err()))
|
||||
getResp, err := s.etcdCli.Get(s.ctx, s.getSessionKey())
|
||||
if err != nil || len(getResp.Kvs) == 0 {
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
if resp.Err() != nil {
|
||||
// if not ErrCompacted, just close the channel
|
||||
if resp.Err() != v3rpc.ErrCompacted {
|
||||
//close event channel
|
||||
log.Warn("Watch service found error", zap.Error(resp.Err()))
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
return
|
||||
log.Warn("Watch service found compacted error", zap.Error(resp.Err()))
|
||||
getResp, err := s.etcdCli.Get(s.ctx, s.getSessionKey())
|
||||
if err != nil || len(getResp.Kvs) == 0 {
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
s.watchSessionKeyCh = s.etcdCli.Watch(s.ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision))
|
||||
continue
|
||||
}
|
||||
s.watchSessionKeyCh = s.etcdCli.Watch(s.ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision))
|
||||
continue
|
||||
}
|
||||
for _, event := range resp.Events {
|
||||
switch event.Type {
|
||||
case mvccpb.PUT:
|
||||
log.Info("register session success", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
|
||||
case mvccpb.DELETE:
|
||||
log.Info("session key is deleted, exit...", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
for _, event := range resp.Events {
|
||||
switch event.Type {
|
||||
case mvccpb.PUT:
|
||||
log.Info("register session success", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
|
||||
case mvccpb.DELETE:
|
||||
log.Info("session key is deleted, exit...", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
// Revoke revokes the internal leaseID for the session key
|
||||
|
||||
@ -202,7 +202,7 @@ func TestSessionLivenessCheck(t *testing.T) {
|
||||
|
||||
flag := false
|
||||
|
||||
go s.LivenessCheck(ctx, func() {
|
||||
s.LivenessCheck(ctx, func() {
|
||||
flag = true
|
||||
signal <- struct{}{}
|
||||
})
|
||||
@ -222,7 +222,7 @@ func TestSessionLivenessCheck(t *testing.T) {
|
||||
s.liveCh = ch
|
||||
flag = false
|
||||
|
||||
go s.LivenessCheck(ctx, func() {
|
||||
s.LivenessCheck(ctx, func() {
|
||||
flag = true
|
||||
signal <- struct{}{}
|
||||
})
|
||||
@ -658,7 +658,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
|
||||
wg.Done()
|
||||
return nil
|
||||
})
|
||||
go s1.LivenessCheck(ctx1, func() {
|
||||
s1.LivenessCheck(ctx1, func() {
|
||||
flag = true
|
||||
signal <- struct{}{}
|
||||
s1.keepAliveCancel()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user