From 5edbb82610b106f36cc62f7c388620db06ab70fb Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 16 Nov 2021 22:31:14 +0800 Subject: [PATCH] Add session revoke (#11908) Signed-off-by: Congqi Xia --- internal/datacoord/server.go | 1 + internal/datanode/data_node.go | 2 ++ internal/indexcoord/index_coord.go | 1 + internal/indexnode/indexnode.go | 1 + internal/proxy/proxy.go | 2 ++ internal/querycoord/query_coord.go | 1 + internal/querynode/query_node.go | 1 + internal/rootcoord/root_coord.go | 2 ++ internal/util/sessionutil/session_util.go | 19 ++++++++-- .../util/sessionutil/session_util_test.go | 36 +++++++++++++++++++ 10 files changed, 64 insertions(+), 2 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 6b38a89340..ae16f33c55 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -711,6 +711,7 @@ func (s *Server) Stop() error { s.cluster.Close() s.garbageCollector.close() s.stopServerLoop() + s.session.Revoke(time.Second) if Params.EnableCompaction { s.stopCompactionTrigger() diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 5169b7f6ab..496e80ecf7 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -639,6 +639,8 @@ func (node *DataNode) Stop() error { return err } } + + node.session.Revoke(time.Second) return nil } diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index c54bbe3a8e..68143db0c8 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -285,6 +285,7 @@ func (i *IndexCoord) Stop() error { for _, cb := range i.closeCallbacks { cb() } + i.session.Revoke(time.Second) return nil } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index cd1810ba97..924f1b5fc1 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -217,6 +217,7 @@ func (i *IndexNode) Stop() error { for _, cb := range i.closeCallbacks { cb() } + i.session.Revoke(time.Second) log.Debug("Index node stopped.") return nil } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 4cbf869fae..3bd6f1df43 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -367,6 +367,8 @@ func (node *Proxy) Stop() error { cb() } + node.session.Revoke(time.Second) + return nil } diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 5539a5c75c..260da72fa8 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -208,6 +208,7 @@ func (qc *QueryCoord) Stop() error { qc.UpdateStateCode(internalpb.StateCode_Abnormal) qc.loopWg.Wait() + qc.session.Revoke(time.Second) return nil } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 774d6d389f..a13675b2d2 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -298,6 +298,7 @@ func (node *QueryNode) Stop() error { if node.statsService != nil { node.statsService.close() } + node.session.Revoke(time.Second) return nil } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 53e6f74c4d..f480c63b45 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1170,6 +1170,8 @@ func (c *Core) Stop() error { c.cancel() c.wg.Wait() c.stateCode.Store(internalpb.StateCode_Abnormal) + // wait at most one second to revoke + c.session.Revoke(time.Second) return nil } diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index d1b70a42ee..616ecdece5 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -55,7 +55,7 @@ type Session struct { liveCh <-chan bool etcdCli *clientv3.Client - leaseID clientv3.LeaseID + leaseID *clientv3.LeaseID metaRoot string } @@ -189,7 +189,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er log.Error("register service", zap.Error(err)) return err } - s.leaseID = resp.ID + s.leaseID = &resp.ID sessionJSON, err := json.Marshal(s) if err != nil { @@ -388,3 +388,18 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) { } } } + +// Revoke revokes the internal leaseID for the session key +func (s *Session) Revoke(timeout time.Duration) { + if s == nil { + return + } + if s.etcdCli == nil || s.leaseID == nil { + return + } + // can NOT use s.ctx, it may be Done here + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + // ignores resp & error, just do best effort to revoke + _, _ = s.etcdCli.Revoke(ctx, *s.leaseID) +} diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 4d4c36d219..1595cb0055 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -202,3 +202,39 @@ func TestSessionLivenessCheck(t *testing.T) { assert.False(t, flag) } + +func TestSessionRevoke(t *testing.T) { + s := &Session{} + assert.NotPanics(t, func() { + s.Revoke(time.Second) + }) + + s = (*Session)(nil) + assert.NotPanics(t, func() { + s.Revoke(time.Second) + }) + + ctx := context.Background() + Params.Init() + + endpoints, err := Params.Load("_EtcdEndpoints") + if err != nil { + panic(err) + } + metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) + + etcdEndpoints := strings.Split(endpoints, ",") + etcdKV, err := etcdkv.NewEtcdKV(etcdEndpoints, metaRoot) + assert.NoError(t, err) + err = etcdKV.RemoveWithPrefix("") + assert.NoError(t, err) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + s = NewSession(ctx, metaRoot, etcdEndpoints) + s.Init("revoketest", "testAddr", false) + assert.NotPanics(t, func() { + s.Revoke(time.Second) + }) +}