From cd29b863d04f3fba720f0b927e27b7d0164860ce Mon Sep 17 00:00:00 2001 From: yah01 Date: Thu, 6 Jul 2023 14:52:25 +0800 Subject: [PATCH] Fix data race in session (#25354) Signed-off-by: yah01 --- internal/util/sessionutil/session_util.go | 12 ++++++------ internal/util/sessionutil/session_util_test.go | 5 +---- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 8b1796d2db..ee6b537fa9 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -23,7 +23,6 @@ import ( "path" "strconv" "sync" - "sync/atomic" "time" "github.com/blang/semver/v4" @@ -31,6 +30,7 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/common" @@ -108,7 +108,7 @@ type Session struct { registered atomic.Value disconnected atomic.Value retryKeepAlive atomic.Value - enableRetryKeepAlive bool + enableRetryKeepAlive *atomic.Bool isStandby atomic.Value enableActiveStandBy bool @@ -209,7 +209,7 @@ func NewSession(ctx context.Context, metaRoot string, client *clientv3.Client, o sessionTTL: paramtable.Get().CommonCfg.SessionTTL.GetAsInt64(), sessionRetryTimes: paramtable.Get().CommonCfg.SessionRetryTimes.GetAsInt64(), reuseNodeID: true, - enableRetryKeepAlive: true, + enableRetryKeepAlive: atomic.NewBool(true), } // integration test create cluster with different nodeId in one process @@ -464,7 +464,7 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes case resp, ok := <-ch: if !ok { log.Warn("session keepalive channel closed") - if !s.enableRetryKeepAlive { + if !s.enableRetryKeepAlive.Load() { s.safeCloseLiveCh() return } @@ -784,7 +784,7 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) { return case <-ctx.Done(): log.Debug("liveness exits due to context done") - s.enableRetryKeepAlive = false + s.enableRetryKeepAlive.Store(false) // cancel the etcd keepAlive context if s.keepAliveCancel != nil { s.keepAliveCancel() @@ -900,7 +900,7 @@ func (s *Session) updateStandby(b bool) { } func (s *Session) SetEnableRetryKeepAlive(enable bool) { - s.enableRetryKeepAlive = enable + s.enableRetryKeepAlive.Store(enable) } func (s *Session) isRetryingKeepAlive() bool { diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index b121b83e55..31934925eb 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -193,10 +193,7 @@ func TestSessionLivenessCheck(t *testing.T) { etcdEndpoints := strings.Split(endpoints, ",") etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) require.NoError(t, err) - s := &Session{ - etcdCli: etcdCli, - metaRoot: metaRoot, - } + s := NewSession(context.Background(), metaRoot, etcdCli) ctx := context.Background() ch := make(chan bool) s.liveCh = ch