mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Exit component process when session key is deleted (#21658)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
af8f7acb7b
commit
c333ee8d17
@ -3561,7 +3561,6 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
|
||||
Params.Init()
|
||||
Params.CommonCfg.DataCoordTimeTick = Params.CommonCfg.DataCoordTimeTick + strconv.Itoa(rand.Int())
|
||||
factory := dependency.NewDefaultFactory(true)
|
||||
|
||||
etcdCli, err := etcd.GetEtcdClient(
|
||||
Params.EtcdCfg.UseEmbedEtcd,
|
||||
Params.EtcdCfg.EtcdUseSSL,
|
||||
@ -3682,14 +3681,6 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
|
||||
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
|
||||
icSession := sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath, etcdCli)
|
||||
icSession.Init(typeutil.IndexCoordRole, "localhost:31000", true, true)
|
||||
icSession.Register()
|
||||
|
||||
qcSession := sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath, etcdCli)
|
||||
qcSession.Init(typeutil.QueryCoordRole, "localhost:19532", true, true)
|
||||
qcSession.Register()
|
||||
|
||||
svr := CreateServer(context.TODO(), factory, opts...)
|
||||
svr.SetEtcdClient(etcdCli)
|
||||
svr.dataNodeCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
|
||||
@ -3703,10 +3694,6 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
|
||||
assert.Nil(t, err)
|
||||
err = svr.Start()
|
||||
assert.Nil(t, err)
|
||||
|
||||
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = svr.Register()
|
||||
assert.Nil(t, err)
|
||||
|
||||
@ -3821,6 +3808,7 @@ func Test_initServiceDiscovery(t *testing.T) {
|
||||
|
||||
func Test_newChunkManagerFactory(t *testing.T) {
|
||||
server := newTestServer2(t, nil)
|
||||
defer closeTestServer(t, server)
|
||||
Params.DataCoordCfg.EnableGarbageCollection = true
|
||||
|
||||
t.Run("err_minio_bad_address", func(t *testing.T) {
|
||||
@ -3847,6 +3835,7 @@ func Test_newChunkManagerFactory(t *testing.T) {
|
||||
|
||||
func Test_initGarbageCollection(t *testing.T) {
|
||||
server := newTestServer2(t, nil)
|
||||
defer closeTestServer(t, server)
|
||||
Params.DataCoordCfg.EnableGarbageCollection = true
|
||||
|
||||
t.Run("ok", func(t *testing.T) {
|
||||
|
||||
@ -79,9 +79,10 @@ type Session struct {
|
||||
TriggerKill bool
|
||||
Version semver.Version `json:"Version,omitempty"`
|
||||
|
||||
liveCh <-chan bool
|
||||
etcdCli *clientv3.Client
|
||||
leaseID *clientv3.LeaseID
|
||||
liveCh <-chan bool
|
||||
etcdCli *clientv3.Client
|
||||
leaseID *clientv3.LeaseID
|
||||
watchSessionKeyCh clientv3.WatchChan
|
||||
|
||||
metaRoot string
|
||||
|
||||
@ -301,6 +302,14 @@ func (s *Session) getCompleteKey() string {
|
||||
return path.Join(s.metaRoot, DefaultServiceRoot, key)
|
||||
}
|
||||
|
||||
func (s *Session) getSessionKey() string {
|
||||
key := s.ServerName
|
||||
if !s.Exclusive {
|
||||
key = fmt.Sprintf("%s-%d", key, s.ServerID)
|
||||
}
|
||||
return path.Join(s.metaRoot, DefaultServiceRoot, key)
|
||||
}
|
||||
|
||||
// registerService registers the service to etcd so that other services
|
||||
// can find that the service is online and issue subsequent operations
|
||||
// registerService saves a key-value pair in etcd
|
||||
@ -655,6 +664,11 @@ func (w *sessionWatcher) handleWatchErr(err error) error {
|
||||
// LivenessCheck watches s.liveCh, the liveness signal channel, which is closed only when the session is expired
|
||||
// callback is the function to call when s.liveCh is closed; note that callback will not be invoked when the loop exits due to context
|
||||
func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
|
||||
getResp, err := s.etcdCli.Get(context.Background(), s.getSessionKey())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
s.watchSessionKeyCh = s.etcdCli.Watch(context.Background(), s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision))
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-s.liveCh:
|
||||
@ -675,6 +689,46 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
return
|
||||
case resp, ok := <-s.watchSessionKeyCh:
|
||||
if !ok {
|
||||
log.Warn("watch session key channel closed")
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
if resp.Err() != nil {
|
||||
// if not ErrCompacted, just close the channel
|
||||
if resp.Err() != v3rpc.ErrCompacted {
|
||||
//close event channel
|
||||
log.Warn("Watch service found error", zap.Error(resp.Err()))
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Warn("Watch service found compacted error", zap.Error(resp.Err()))
|
||||
getResp, err := s.etcdCli.Get(s.ctx, s.getSessionKey())
|
||||
if err != nil || len(getResp.Kvs) == 0 {
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
s.watchSessionKeyCh = s.etcdCli.Watch(s.ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision))
|
||||
continue
|
||||
}
|
||||
for _, event := range resp.Events {
|
||||
switch event.Type {
|
||||
case mvccpb.PUT:
|
||||
log.Info("register session success", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
|
||||
case mvccpb.DELETE:
|
||||
log.Info("session key is deleted, exit...", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
|
||||
if s.keepAliveCancel != nil {
|
||||
s.keepAliveCancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -184,7 +184,17 @@ func TestUpdateSessions(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSessionLivenessCheck(t *testing.T) {
|
||||
s := &Session{}
|
||||
Params.Init()
|
||||
endpoints := Params.LoadWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints)
|
||||
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
|
||||
|
||||
etcdEndpoints := strings.Split(endpoints, ",")
|
||||
etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
|
||||
require.NoError(t, err)
|
||||
s := &Session{
|
||||
etcdCli: etcdCli,
|
||||
metaRoot: metaRoot,
|
||||
}
|
||||
ctx := context.Background()
|
||||
ch := make(chan bool)
|
||||
s.liveCh = ch
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user