optimize WatchProxy (#6153)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
Cai Yudong 2021-06-28 11:04:14 +08:00 committed by GitHub
parent effcbc7cb3
commit a9a806ace0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -77,36 +77,36 @@ func (p *proxyManager) WatchProxy() error {
if err != nil {
return fmt.Errorf("proxyManager, watch proxy failed, error = %w", err)
}
sessions := []*sessionutil.Session{}
for _, v := range resp.Kvs {
sess := new(sessionutil.Session)
err := json.Unmarshal(v.Value, sess)
if err != nil {
log.Debug("unmarshal SvrSession failed", zap.Error(err))
continue
}
sessions = append(sessions, sess)
}
for _, f := range p.getSessions {
f(sessions)
}
for _, s := range sessions {
metrics.RootCoordProxyLister.WithLabelValues(metricProxy(s.ServerID)).Set(1)
}
for _, s := range sessions {
log.Debug("Get proxy", zap.Int64("id", s.ServerID), zap.String("addr", s.Address), zap.String("name", s.ServerName))
}
rch := p.etcdCli.Watch(
p.ctx,
path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole),
clientv3.WithPrefix(),
clientv3.WithCreatedNotify(),
clientv3.WithPrevKV(),
clientv3.WithRev(resp.Header.Revision+1),
)
go func() {
sessions := []*sessionutil.Session{}
for _, v := range resp.Kvs {
sess := new(sessionutil.Session)
err := json.Unmarshal(v.Value, sess)
if err != nil {
log.Debug("unmarshal SvrSession failed", zap.Error(err))
continue
}
sessions = append(sessions, sess)
}
for _, f := range p.getSessions {
f(sessions)
}
for _, s := range sessions {
metrics.RootCoordProxyLister.WithLabelValues(metricProxy(s.ServerID)).Set(1)
}
for _, s := range sessions {
log.Debug("Get proxy", zap.Int64("id", s.ServerID), zap.String("addr", s.Address), zap.String("name", s.ServerName))
}
rch := p.etcdCli.Watch(
p.ctx,
path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole),
clientv3.WithPrefix(),
clientv3.WithCreatedNotify(),
clientv3.WithPrevKV(),
clientv3.WithRev(resp.Header.Revision+1),
)
for {
select {
case <-p.ctx.Done():