From c0f054870286cc7912f353bfe42a4befa5124d22 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 16 Jan 2024 17:34:54 +0800 Subject: [PATCH] fix: use SafeChan preventing close channel multiple times (#30022) See also #29935 Signed-off-by: Congqi Xia --- internal/util/proxyutil/proxy_watcher.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/internal/util/proxyutil/proxy_watcher.go b/internal/util/proxyutil/proxy_watcher.go index 0cd81bada6..25a916b396 100644 --- a/internal/util/proxyutil/proxy_watcher.go +++ b/internal/util/proxyutil/proxy_watcher.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -53,7 +54,7 @@ type ProxyWatcher struct { addSessionsFunc []func(*sessionutil.Session) delSessionsFunc []func(*sessionutil.Session) - closeCh chan struct{} + closeCh lifetime.SafeChan } // NewProxyWatcher helper function to create a proxyWatcher @@ -62,7 +63,7 @@ func NewProxyWatcher(client *clientv3.Client, fns ...func([]*sessionutil.Session p := &ProxyWatcher{ lock: sync.Mutex{}, etcdCli: client, - closeCh: make(chan struct{}), + closeCh: lifetime.NewSafeChan(), } p.initSessionsFunc = append(p.initSessionsFunc, fns...) return p @@ -121,7 +122,7 @@ func (p *ProxyWatcher) startWatchEtcd(ctx context.Context, eventCh clientv3.Watc log.Warn("stop watching etcd loop") return - case <-p.closeCh: + case <-p.closeCh.CloseCh(): log.Warn("stop watching etcd loop") return @@ -218,6 +219,6 @@ func (p *ProxyWatcher) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Se // Stop stops the ProxyManager func (p *ProxyWatcher) Stop() { - close(p.closeCh) + p.closeCh.Close() p.wg.Wait() }