From 4a181c1635090d1f58d9b0a3d95fdca8b3cc1e6d Mon Sep 17 00:00:00 2001 From: yah01 Date: Wed, 23 Aug 2023 19:08:23 +0800 Subject: [PATCH] Fix panic while stopping the cluster (#26580) Signed-off-by: yah01 --- internal/rootcoord/proxy_manager.go | 15 +++++++++++---- internal/rootcoord/root_coord.go | 3 +++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/internal/rootcoord/proxy_manager.go b/internal/rootcoord/proxy_manager.go index 9521930397..8ded5da923 100644 --- a/internal/rootcoord/proxy_manager.go +++ b/internal/rootcoord/proxy_manager.go @@ -27,6 +27,7 @@ import ( v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" + "golang.org/x/sync/errgroup" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -38,6 +39,7 @@ import ( type proxyManager struct { ctx context.Context cancel context.CancelFunc + wg errgroup.Group lock sync.Mutex etcdCli *clientv3.Client initSessionsFunc []func([]*sessionutil.Session) @@ -49,10 +51,10 @@ type proxyManager struct { // etcdEndpoints is the address list of etcd // fns are the custom getSessions function list func newProxyManager(ctx context.Context, client *clientv3.Client, fns ...func([]*sessionutil.Session)) *proxyManager { - ctx2, cancel2 := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) p := &proxyManager{ - ctx: ctx2, - cancel: cancel2, + ctx: ctx, + cancel: cancel, lock: sync.Mutex{}, etcdCli: client, } @@ -97,7 +99,11 @@ func (p *proxyManager) WatchProxy() error { clientv3.WithPrevKV(), clientv3.WithRev(rev+1), ) - go p.startWatchEtcd(p.ctx, eventCh) + + p.wg.Go(func() error { + p.startWatchEtcd(p.ctx, eventCh) + return nil + }) return nil } @@ -203,4 +209,5 @@ func (p *proxyManager) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Se // Stop stops the proxyManager func (p *proxyManager) Stop() { p.cancel() + p.wg.Wait() } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 911251968c..9e1f740ba1 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -714,6 +714,9 @@ func (c *Core) Stop() error { c.UpdateStateCode(commonpb.StateCode_Abnormal) c.stopExecutor() c.stopScheduler() + if c.proxyManager != nil { + c.proxyManager.Stop() + } c.cancelIfNotNil() if c.quotaCenter != nil { c.quotaCenter.stop()