diff --git a/internal/util/proxyutil/proxy_watcher.go b/internal/util/proxyutil/proxy_watcher.go index 0cd81bada6..25a916b396 100644 --- a/internal/util/proxyutil/proxy_watcher.go +++ b/internal/util/proxyutil/proxy_watcher.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -53,7 +54,7 @@ type ProxyWatcher struct { addSessionsFunc []func(*sessionutil.Session) delSessionsFunc []func(*sessionutil.Session) - closeCh chan struct{} + closeCh lifetime.SafeChan } // NewProxyWatcher helper function to create a proxyWatcher @@ -62,7 +63,7 @@ func NewProxyWatcher(client *clientv3.Client, fns ...func([]*sessionutil.Session p := &ProxyWatcher{ lock: sync.Mutex{}, etcdCli: client, - closeCh: make(chan struct{}), + closeCh: lifetime.NewSafeChan(), } p.initSessionsFunc = append(p.initSessionsFunc, fns...) return p @@ -121,7 +122,7 @@ func (p *ProxyWatcher) startWatchEtcd(ctx context.Context, eventCh clientv3.Watc log.Warn("stop watching etcd loop") return - case <-p.closeCh: + case <-p.closeCh.CloseCh(): log.Warn("stop watching etcd loop") return @@ -218,6 +219,6 @@ func (p *ProxyWatcher) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Se // Stop stops the ProxyManager func (p *ProxyWatcher) Stop() { - close(p.closeCh) + p.closeCh.Close() p.wg.Wait() }