Fix panic while stopping the cluster (#26580)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
This commit is contained in:
yah01 2023-08-23 19:08:23 +08:00 committed by GitHub
parent f625e3beb5
commit 4a181c1635
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 4 deletions

View File

@ -27,6 +27,7 @@ import (
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/sessionutil"
@ -38,6 +39,7 @@ import (
type proxyManager struct { type proxyManager struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
wg errgroup.Group
lock sync.Mutex lock sync.Mutex
etcdCli *clientv3.Client etcdCli *clientv3.Client
initSessionsFunc []func([]*sessionutil.Session) initSessionsFunc []func([]*sessionutil.Session)
@ -49,10 +51,10 @@ type proxyManager struct {
// etcdEndpoints is the address list of etcd // etcdEndpoints is the address list of etcd
// fns are the custom getSessions function list // fns are the custom getSessions function list
func newProxyManager(ctx context.Context, client *clientv3.Client, fns ...func([]*sessionutil.Session)) *proxyManager { func newProxyManager(ctx context.Context, client *clientv3.Client, fns ...func([]*sessionutil.Session)) *proxyManager {
ctx2, cancel2 := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
p := &proxyManager{ p := &proxyManager{
ctx: ctx2, ctx: ctx,
cancel: cancel2, cancel: cancel,
lock: sync.Mutex{}, lock: sync.Mutex{},
etcdCli: client, etcdCli: client,
} }
@ -97,7 +99,11 @@ func (p *proxyManager) WatchProxy() error {
clientv3.WithPrevKV(), clientv3.WithPrevKV(),
clientv3.WithRev(rev+1), clientv3.WithRev(rev+1),
) )
go p.startWatchEtcd(p.ctx, eventCh)
p.wg.Go(func() error {
p.startWatchEtcd(p.ctx, eventCh)
return nil
})
return nil return nil
} }
@ -203,4 +209,5 @@ func (p *proxyManager) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Se
// Stop stops the proxyManager // Stop stops the proxyManager
func (p *proxyManager) Stop() { func (p *proxyManager) Stop() {
p.cancel() p.cancel()
p.wg.Wait()
} }

View File

@ -714,6 +714,9 @@ func (c *Core) Stop() error {
c.UpdateStateCode(commonpb.StateCode_Abnormal) c.UpdateStateCode(commonpb.StateCode_Abnormal)
c.stopExecutor() c.stopExecutor()
c.stopScheduler() c.stopScheduler()
if c.proxyManager != nil {
c.proxyManager.Stop()
}
c.cancelIfNotNil() c.cancelIfNotNil()
if c.quotaCenter != nil { if c.quotaCenter != nil {
c.quotaCenter.stop() c.quotaCenter.stop()