mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
Add session revoke (#11908)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
7908045a28
commit
5edbb82610
@ -711,6 +711,7 @@ func (s *Server) Stop() error {
|
|||||||
s.cluster.Close()
|
s.cluster.Close()
|
||||||
s.garbageCollector.close()
|
s.garbageCollector.close()
|
||||||
s.stopServerLoop()
|
s.stopServerLoop()
|
||||||
|
s.session.Revoke(time.Second)
|
||||||
|
|
||||||
if Params.EnableCompaction {
|
if Params.EnableCompaction {
|
||||||
s.stopCompactionTrigger()
|
s.stopCompactionTrigger()
|
||||||
|
|||||||
@ -639,6 +639,8 @@ func (node *DataNode) Stop() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
node.session.Revoke(time.Second)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -285,6 +285,7 @@ func (i *IndexCoord) Stop() error {
|
|||||||
for _, cb := range i.closeCallbacks {
|
for _, cb := range i.closeCallbacks {
|
||||||
cb()
|
cb()
|
||||||
}
|
}
|
||||||
|
i.session.Revoke(time.Second)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -217,6 +217,7 @@ func (i *IndexNode) Stop() error {
|
|||||||
for _, cb := range i.closeCallbacks {
|
for _, cb := range i.closeCallbacks {
|
||||||
cb()
|
cb()
|
||||||
}
|
}
|
||||||
|
i.session.Revoke(time.Second)
|
||||||
log.Debug("Index node stopped.")
|
log.Debug("Index node stopped.")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -367,6 +367,8 @@ func (node *Proxy) Stop() error {
|
|||||||
cb()
|
cb()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
node.session.Revoke(time.Second)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -208,6 +208,7 @@ func (qc *QueryCoord) Stop() error {
|
|||||||
qc.UpdateStateCode(internalpb.StateCode_Abnormal)
|
qc.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||||
|
|
||||||
qc.loopWg.Wait()
|
qc.loopWg.Wait()
|
||||||
|
qc.session.Revoke(time.Second)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -298,6 +298,7 @@ func (node *QueryNode) Stop() error {
|
|||||||
if node.statsService != nil {
|
if node.statsService != nil {
|
||||||
node.statsService.close()
|
node.statsService.close()
|
||||||
}
|
}
|
||||||
|
node.session.Revoke(time.Second)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1170,6 +1170,8 @@ func (c *Core) Stop() error {
|
|||||||
c.cancel()
|
c.cancel()
|
||||||
c.wg.Wait()
|
c.wg.Wait()
|
||||||
c.stateCode.Store(internalpb.StateCode_Abnormal)
|
c.stateCode.Store(internalpb.StateCode_Abnormal)
|
||||||
|
// wait at most one second to revoke
|
||||||
|
c.session.Revoke(time.Second)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -55,7 +55,7 @@ type Session struct {
|
|||||||
|
|
||||||
liveCh <-chan bool
|
liveCh <-chan bool
|
||||||
etcdCli *clientv3.Client
|
etcdCli *clientv3.Client
|
||||||
leaseID clientv3.LeaseID
|
leaseID *clientv3.LeaseID
|
||||||
|
|
||||||
metaRoot string
|
metaRoot string
|
||||||
}
|
}
|
||||||
@ -189,7 +189,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
|
|||||||
log.Error("register service", zap.Error(err))
|
log.Error("register service", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.leaseID = resp.ID
|
s.leaseID = &resp.ID
|
||||||
|
|
||||||
sessionJSON, err := json.Marshal(s)
|
sessionJSON, err := json.Marshal(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -388,3 +388,18 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Revoke revokes the internal leaseID for the session key
|
||||||
|
func (s *Session) Revoke(timeout time.Duration) {
|
||||||
|
if s == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s.etcdCli == nil || s.leaseID == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// can NOT use s.ctx, it may be Done here
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
|
defer cancel()
|
||||||
|
// ignores resp & error, just do best effort to revoke
|
||||||
|
_, _ = s.etcdCli.Revoke(ctx, *s.leaseID)
|
||||||
|
}
|
||||||
|
|||||||
@ -202,3 +202,39 @@ func TestSessionLivenessCheck(t *testing.T) {
|
|||||||
|
|
||||||
assert.False(t, flag)
|
assert.False(t, flag)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSessionRevoke(t *testing.T) {
|
||||||
|
s := &Session{}
|
||||||
|
assert.NotPanics(t, func() {
|
||||||
|
s.Revoke(time.Second)
|
||||||
|
})
|
||||||
|
|
||||||
|
s = (*Session)(nil)
|
||||||
|
assert.NotPanics(t, func() {
|
||||||
|
s.Revoke(time.Second)
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
Params.Init()
|
||||||
|
|
||||||
|
endpoints, err := Params.Load("_EtcdEndpoints")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
|
||||||
|
|
||||||
|
etcdEndpoints := strings.Split(endpoints, ",")
|
||||||
|
etcdKV, err := etcdkv.NewEtcdKV(etcdEndpoints, metaRoot)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = etcdKV.RemoveWithPrefix("")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
defer etcdKV.Close()
|
||||||
|
defer etcdKV.RemoveWithPrefix("")
|
||||||
|
|
||||||
|
s = NewSession(ctx, metaRoot, etcdEndpoints)
|
||||||
|
s.Init("revoketest", "testAddr", false)
|
||||||
|
assert.NotPanics(t, func() {
|
||||||
|
s.Revoke(time.Second)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user