From 9433a24f5d33033ad247d86dcd78f09a00828e47 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 22 Sep 2023 19:13:25 +0800 Subject: [PATCH] fix component not exit when liveness check failed (#27236) Signed-off-by: Wei Liu --- internal/util/sessionutil/session_util.go | 22 +++++--- .../util/sessionutil/session_util_test.go | 51 ++++++++++++------- 2 files changed, 50 insertions(+), 23 deletions(-) diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index c45fc40b51..b73fa3be72 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -372,10 +372,10 @@ func (s *Session) initWatchSessionCh(ctx context.Context) error { err = retry.Do(ctx, func() error { getResp, err = s.etcdCli.Get(ctx, s.getSessionKey()) - log.Warn("fail to get the session key from the etcd", zap.Error(err)) return err }, retry.Attempts(uint(s.sessionRetryTimes))) if err != nil { + log.Warn("fail to get the session key from the etcd", zap.Error(err)) return err } s.watchSessionKeyCh = s.etcdCli.Watch(ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision)) @@ -785,18 +785,31 @@ func (w *sessionWatcher) handleWatchErr(err error) error { // LivenessCheck performs liveness check with provided context and channel // ctx controls the liveness check loop // ch is the liveness signal channel, ch is closed only when the session is expired -// callback is the function to call when ch is closed, note that callback will not be invoked when loop exits due to context +// callback must be called before liveness check exit, to close the session's owner component func (s *Session) LivenessCheck(ctx context.Context, callback func()) { err := s.initWatchSessionCh(ctx) if err != nil { log.Error("failed to get session for liveness check", zap.Error(err)) s.cancelKeepAlive() + if callback != nil { + go callback() + } + return } + s.wg.Add(1) go func() { defer s.wg.Done() + if callback != nil { + // before exit liveness check, callback to exit the session owner + defer func() { + if ctx.Err() == nil { + go callback() + } + }() + } + defer s.SetDisconnected(true) for { - defer s.SetDisconnected(true) select { case _, ok := <-s.liveCh: // ok, still alive @@ -805,9 +818,6 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) { } // not ok, connection lost log.Warn("connection lost detected, shuting down") - if callback != nil { - go callback() - } return case <-ctx.Done(): log.Warn("liveness exits due to context done") diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 24a5713941..293adac943 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -24,6 +24,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" + "go.uber.org/atomic" "go.uber.org/zap" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -194,39 +195,55 @@ func TestSessionLivenessCheck(t *testing.T) { etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) require.NoError(t, err) s := NewSession(context.Background(), metaRoot, etcdCli) - ctx := context.Background() + s.Register() ch := make(chan struct{}) s.liveCh = ch signal := make(chan struct{}, 1) - flag := false - - s.LivenessCheck(ctx, func() { - flag = true + flag := atomic.NewBool(false) + s.LivenessCheck(context.Background(), func() { + flag.Store(true) signal <- struct{}{} }) + assert.False(t, flag.Load()) - assert.False(t, flag) + // test liveCh receive event, liveness won't exit, callback won't trigger ch <- struct{}{} + assert.False(t, flag.Load()) - assert.False(t, flag) + // test close liveCh, liveness exit, callback should trigger close(ch) - <-signal - assert.True(t, flag) + assert.True(t, flag.Load()) - ctx, cancel := context.WithCancel(ctx) - cancel() - ch = make(chan struct{}) - s.liveCh = ch - flag = false + // test context done, liveness exit, callback shouldn't trigger + metaRoot = fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) + s1 := NewSession(context.Background(), metaRoot, etcdCli) + s1.Register() + ctx, cancel := context.WithCancel(context.Background()) + flag.Store(false) - s.LivenessCheck(ctx, func() { - flag = true + s1.LivenessCheck(ctx, func() { + flag.Store(true) signal <- struct{}{} }) + cancel() + assert.False(t, flag.Load()) - assert.False(t, flag) + // test context done, liveness start failed, callback should trigger + metaRoot = fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) + s2 := NewSession(context.Background(), metaRoot, etcdCli) + s2.Register() + ctx, cancel = context.WithCancel(context.Background()) + signal = make(chan struct{}, 1) + flag.Store(false) + cancel() + s2.LivenessCheck(ctx, func() { + flag.Store(true) + signal <- struct{}{} + }) + <-signal + assert.True(t, flag.Load()) } func TestWatcherHandleWatchResp(t *testing.T) {