From db79be3ae009ecbc88470fdc051d7cfae678b681 Mon Sep 17 00:00:00 2001
From: jaime
Date: Fri, 15 Mar 2024 10:33:05 +0800
Subject: [PATCH] fix: ctx cancel should be the last step while stopping server
 (#31220)

issue: #31219

Signed-off-by: jaime
---
 internal/datacoord/server.go                  |  6 +++---
 internal/datanode/data_node.go                |  3 +--
 internal/distributed/datacoord/service.go     | 14 ++++++-------
 internal/distributed/datanode/service.go      | 16 ++++++++-------
 internal/distributed/datanode/service_test.go |  7 -------
 internal/distributed/indexnode/service.go     | 16 +++++++++------
 internal/distributed/proxy/service.go         |  2 ++
 internal/distributed/querycoord/service.go    | 17 +++++++++++-----
 internal/distributed/querynode/service.go     | 13 +++++++-----
 internal/distributed/rootcoord/service.go     | 14 +++++++------
 internal/indexnode/indexnode.go               |  2 +-
 internal/querycoordv2/server.go               |  6 +-----
 internal/querynodev2/server.go                |  6 ++++--
 internal/rootcoord/root_coord.go              |  7 ++++---
 internal/util/sessionutil/session_util.go     | 20 +++++++++++++++++--
 15 files changed, 88 insertions(+), 61 deletions(-)

diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 99b4c76ada..6a8503258e 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -1126,9 +1126,6 @@ func (s *Server) Stop() error {
     s.cluster.Close()
     logutil.Logger(s.ctx).Info("datacoord cluster stopped")
 
-    s.stopServerLoop()
-    logutil.Logger(s.ctx).Info("datacoord serverloop stopped")
-
     if s.session != nil {
         s.session.Stop()
     }
@@ -1136,6 +1133,9 @@ func (s *Server) Stop() error {
     if s.icSession != nil {
         s.icSession.Stop()
     }
+
+    s.stopServerLoop()
+    logutil.Logger(s.ctx).Info("datacoord serverloop stopped")
 
     logutil.Logger(s.ctx).Warn("datacoord stop successful")
     return nil
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 2654fdaeb9..5942af8078 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -426,8 +426,6 @@ func (node *DataNode) Stop() error {
     node.stopOnce.Do(func() {
         // https://github.com/milvus-io/milvus/issues/12282
         node.UpdateStateCode(commonpb.StateCode_Abnormal)
-        // Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the flow graph
-        node.cancel()
 
         node.eventManager.CloseAll()
@@ -460,6 +458,7 @@ func (node *DataNode) Stop() error {
             node.importManager.Close()
         }
 
+        node.cancel()
         node.stopWaiter.Wait()
     })
     return nil
diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go
index ca8582674a..71e6695e88 100644
--- a/internal/distributed/datacoord/service.go
+++ b/internal/distributed/datacoord/service.go
@@ -61,7 +61,7 @@ type Server struct {
 
     serverID atomic.Int64
 
-    wg        sync.WaitGroup
+    grpcWG    sync.WaitGroup
     dataCoord types.DataCoordComponent
 
     etcdCli *clientv3.Client
@@ -135,7 +135,7 @@ func (s *Server) init() error {
 
 func (s *Server) startGrpc() error {
     Params := &paramtable.Get().DataCoordGrpcServerCfg
-    s.wg.Add(1)
+    s.grpcWG.Add(1)
     go s.startGrpcLoop(Params.Port.GetAsInt())
     // wait for grpc server loop start
     err := <-s.grpcErrChan
@@ -144,7 +144,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
     defer logutil.LogPanic()
-    defer s.wg.Done()
+    defer s.grpcWG.Done()
     Params := &paramtable.Get().DataCoordGrpcServerCfg
     log.Debug("network port", zap.Int("port", grpcPort))
@@ -229,8 +229,6 @@ func (s *Server) Stop() (err error) {
         logger.Info("Datacoord stopped", zap.Error(err))
     }()
 
-    s.cancel()
-
     if s.etcdCli != nil {
         defer s.etcdCli.Close()
     }
@@ -240,14 +238,16 @@ func (s *Server) Stop() (err error) {
     if s.grpcServer != nil {
         utils.GracefulStopGRPCServer(s.grpcServer)
     }
+    s.grpcWG.Wait()
 
+    logger.Info("internal server[dataCoord] start to stop")
     err = s.dataCoord.Stop()
     if err != nil {
+        log.Error("failed to close dataCoord", zap.Error(err))
         return err
     }
-    s.wg.Wait()
-
+    s.cancel()
     return nil
 }
diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go
index c76b4c1ec8..299d7b6bdf 100644
--- a/internal/distributed/datanode/service.go
+++ b/internal/distributed/datanode/service.go
@@ -57,7 +57,7 @@ import (
 type Server struct {
     datanode types.DataNodeComponent
 
-    wg          sync.WaitGroup
+    grpcWG      sync.WaitGroup
     grpcErrChan chan error
     grpcServer  *grpc.Server
     ctx         context.Context
@@ -97,7 +97,7 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error)
 
 func (s *Server) startGrpc() error {
     Params := &paramtable.Get().DataNodeGrpcServerCfg
-    s.wg.Add(1)
+    s.grpcWG.Add(1)
     go s.startGrpcLoop(Params.Port.GetAsInt())
     // wait for grpc server loop start
     err := <-s.grpcErrChan
@@ -106,7 +106,7 @@ func (s *Server) startGrpc() error {
 
 // startGrpcLoop starts the grpc loop of datanode component.
 func (s *Server) startGrpcLoop(grpcPort int) {
-    defer s.wg.Done()
+    defer s.grpcWG.Done()
     Params := &paramtable.Get().DataNodeGrpcServerCfg
     kaep := keepalive.EnforcementPolicy{
         MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
@@ -202,24 +202,26 @@ func (s *Server) Run() error {
 
 func (s *Server) Stop() (err error) {
     Params := &paramtable.Get().DataNodeGrpcServerCfg
     logger := log.With(zap.String("address", Params.GetAddress()))
-    logger.Info("Datanode stopping")
+    logger.Info("datanode stopping")
     defer func() {
-        logger.Info("Datanode stopped", zap.Error(err))
+        logger.Info("datanode stopped", zap.Error(err))
     }()
 
-    s.cancel()
     if s.etcdCli != nil {
         defer s.etcdCli.Close()
     }
     if s.grpcServer != nil {
         utils.GracefulStopGRPCServer(s.grpcServer)
     }
+    s.grpcWG.Wait()
 
+    logger.Info("internal server[datanode] start to stop")
     err = s.datanode.Stop()
     if err != nil {
+        log.Error("failed to close datanode", zap.Error(err))
         return err
     }
-    s.wg.Wait()
+    s.cancel()
     return nil
 }
diff --git a/internal/distributed/datanode/service_test.go b/internal/distributed/datanode/service_test.go
index 124bde6b25..f173f096aa 100644
--- a/internal/distributed/datanode/service_test.go
+++ b/internal/distributed/datanode/service_test.go
@@ -405,11 +405,4 @@ func Test_Run(t *testing.T) {
 
     err = server.Run()
     assert.Error(t, err)
-
-    server.datanode = &MockDataNode{
-        stopErr: errors.New("error"),
-    }
-
-    err = server.Stop()
-    assert.Error(t, err)
 }
diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go
index 252e0d944d..c315d52cfc 100644
--- a/internal/distributed/indexnode/service.go
+++ b/internal/distributed/indexnode/service.go
@@ -61,7 +61,7 @@ type Server struct {
     loopCtx    context.Context
     loopCancel func()
-    loopWg     sync.WaitGroup
+    grpcWG     sync.WaitGroup
 
     etcdCli *clientv3.Client
 }
@@ -81,7 +81,7 @@ func (s *Server) Run() error {
 
 // startGrpcLoop starts the grpc loop of IndexNode component.
 func (s *Server) startGrpcLoop(grpcPort int) {
-    defer s.loopWg.Done()
+    defer s.grpcWG.Done()
     Params := &paramtable.Get().IndexNodeGrpcServerCfg
     log.Debug("IndexNode", zap.String("network address", Params.GetAddress()),
         zap.Int("network port: ", grpcPort))
@@ -159,7 +159,7 @@ func (s *Server) init() error {
         }
     }()
 
-    s.loopWg.Add(1)
+    s.grpcWG.Add(1)
     go s.startGrpcLoop(Params.Port.GetAsInt())
     // wait for grpc server loop start
     err = <-s.grpcErrChan
@@ -220,17 +220,21 @@ func (s *Server) Stop() (err error) {
     }()
 
     if s.indexnode != nil {
-        s.indexnode.Stop()
+        err := s.indexnode.Stop()
+        if err != nil {
+            log.Error("failed to close indexnode", zap.Error(err))
+            return err
+        }
     }
-    s.loopCancel()
     if s.etcdCli != nil {
         defer s.etcdCli.Close()
     }
     if s.grpcServer != nil {
         utils.GracefulStopGRPCServer(s.grpcServer)
     }
-    s.loopWg.Wait()
+    s.grpcWG.Wait()
+    s.loopCancel()
     return nil
 }
diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go
index feda94aaa0..efac546d39 100644
--- a/internal/distributed/proxy/service.go
+++ b/internal/distributed/proxy/service.go
@@ -745,8 +745,10 @@ func (s *Server) Stop() (err error) {
 
     s.wg.Wait()
 
+    logger.Info("internal server[proxy] start to stop")
     err = s.proxy.Stop()
     if err != nil {
+        log.Error("failed to close proxy", zap.Error(err))
         return err
     }
diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go
index 96eedce5ed..573438c55c 100644
--- a/internal/distributed/querycoord/service.go
+++ b/internal/distributed/querycoord/service.go
@@ -57,7 +57,7 @@ import (
 
 // Server is the grpc server of QueryCoord.
 type Server struct {
-    wg         sync.WaitGroup
+    grpcWG     sync.WaitGroup
     loopCtx    context.Context
     loopCancel context.CancelFunc
     grpcServer *grpc.Server
@@ -147,7 +147,7 @@ func (s *Server) init() error {
         log.Info("Connected to tikv. Using tikv as metadata storage.")
     }
 
-    s.wg.Add(1)
+    s.grpcWG.Add(1)
     go s.startGrpcLoop(rpcParams.Port.GetAsInt())
     // wait for grpc server loop start
     err = <-s.grpcErrChan
@@ -204,7 +204,7 @@ func (s *Server) init() error {
 }
 
 func (s *Server) startGrpcLoop(grpcPort int) {
-    defer s.wg.Done()
+    defer s.grpcWG.Done()
     Params := &paramtable.Get().QueryCoordGrpcServerCfg
     kaep := keepalive.EnforcementPolicy{
         MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
@@ -283,11 +283,18 @@ func (s *Server) Stop() (err error) {
     if s.etcdCli != nil {
         defer s.etcdCli.Close()
     }
-    s.loopCancel()
+
     if s.grpcServer != nil {
         utils.GracefulStopGRPCServer(s.grpcServer)
     }
-    return s.queryCoord.Stop()
+    s.grpcWG.Wait()
+
+    logger.Info("internal server[queryCoord] start to stop")
+    if err := s.queryCoord.Stop(); err != nil {
+        log.Error("failed to close queryCoord", zap.Error(err))
+    }
+    s.loopCancel()
+    return nil
 }
 
 // SetRootCoord sets root coordinator's client
diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go
index 40c8f32f3c..e53d07ad91 100644
--- a/internal/distributed/querynode/service.go
+++ b/internal/distributed/querynode/service.go
@@ -58,7 +58,7 @@ type UniqueID = typeutil.UniqueID
 
 // Server is the grpc server of QueryNode.
 type Server struct {
     querynode   types.QueryNodeComponent
-    wg          sync.WaitGroup
+    grpcWG      sync.WaitGroup
     ctx         context.Context
     cancel      context.CancelFunc
     grpcErrChan chan error
@@ -118,7 +118,7 @@ func (s *Server) init() error {
     s.SetEtcdClient(etcdCli)
     s.querynode.SetAddress(Params.GetAddress())
     log.Debug("QueryNode connect to etcd successfully")
-    s.wg.Add(1)
+    s.grpcWG.Add(1)
     go s.startGrpcLoop(Params.Port.GetAsInt())
     // wait for grpc server loop start
     err = <-s.grpcErrChan
@@ -152,7 +152,7 @@ func (s *Server) start() error {
 
 // startGrpcLoop starts the grpc loop of QueryNode component.
 func (s *Server) startGrpcLoop(grpcPort int) {
-    defer s.wg.Done()
+    defer s.grpcWG.Done()
     Params := &paramtable.Get().QueryNodeGrpcServerCfg
     kaep := keepalive.EnforcementPolicy{
         MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
@@ -245,19 +245,22 @@ func (s *Server) Stop() (err error) {
         logger.Info("QueryNode stopped", zap.Error(err))
     }()
 
+    logger.Info("internal server[querynode] start to stop")
     err = s.querynode.Stop()
     if err != nil {
+        log.Error("failed to close querynode", zap.Error(err))
         return err
     }
     if s.etcdCli != nil {
         defer s.etcdCli.Close()
     }
-    s.cancel()
     if s.grpcServer != nil {
         utils.GracefulStopGRPCServer(s.grpcServer)
     }
-    s.wg.Wait()
+    s.grpcWG.Wait()
+
+    s.cancel()
     return nil
 }
diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go
index b43c918325..36b7b2bfc3 100644
--- a/internal/distributed/rootcoord/service.go
+++ b/internal/distributed/rootcoord/service.go
@@ -61,7 +61,7 @@ type Server struct {
     grpcServer  *grpc.Server
     grpcErrChan chan error
 
-    wg     sync.WaitGroup
+    grpcWG sync.WaitGroup
 
     ctx    context.Context
     cancel context.CancelFunc
@@ -234,7 +234,7 @@ func (s *Server) init() error {
 }
 
 func (s *Server) startGrpc(port int) error {
-    s.wg.Add(1)
+    s.grpcWG.Add(1)
     go s.startGrpcLoop(port)
     // wait for grpc server loop start
     err := <-s.grpcErrChan
@@ -242,7 +242,7 @@ func (s *Server) startGrpcLoop(port int) {
-    defer s.wg.Done()
+    defer s.grpcWG.Done()
     Params := &paramtable.Get().RootCoordGrpcServerCfg
     kaep := keepalive.EnforcementPolicy{
         MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
@@ -330,11 +330,10 @@ func (s *Server) Stop() (err error) {
         defer s.tikvCli.Close()
     }
 
-    s.cancel()
     if s.grpcServer != nil {
         utils.GracefulStopGRPCServer(s.grpcServer)
     }
-    s.wg.Wait()
+    s.grpcWG.Wait()
 
     if s.dataCoord != nil {
         if err := s.dataCoord.Close(); err != nil {
@@ -347,10 +346,13 @@ func (s *Server) Stop() (err error) {
         }
     }
     if s.rootCoord != nil {
+        logger.Info("internal server[rootCoord] start to stop")
         if err := s.rootCoord.Stop(); err != nil {
-            log.Error("Failed to close close rootCoord", zap.Error(err))
+            log.Error("Failed to close rootCoord", zap.Error(err))
         }
     }
+
+    s.cancel()
     return nil
 }
diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go
index bc45edb46e..2dc7ab7580 100644
--- a/internal/indexnode/indexnode.go
+++ b/internal/indexnode/indexnode.go
@@ -257,7 +257,6 @@ func (i *IndexNode) Stop() error {
                 task.cancel()
             }
         }
-        i.loopCancel()
         if i.sched != nil {
             i.sched.Close()
         }
@@ -266,6 +265,7 @@ func (i *IndexNode) Stop() error {
         }
 
         i.CloseSegcore()
+        i.loopCancel()
         log.Info("Index node stopped.")
     })
     return nil
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index 8807702db6..f843fb294e 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -451,11 +451,6 @@ func (s *Server) startServerLoop() {
 }
 
 func (s *Server) Stop() error {
-    // stop the components from outside to inside,
-    // to make the dependencies stopped working properly,
-    // cancel the server context first to stop receiving requests
-    s.cancel()
-
     // FOLLOW the dependence graph:
     // job scheduler -> checker controller -> task scheduler -> dist controller -> cluster -> session
     // observers -> dist controller
@@ -503,6 +498,7 @@ func (s *Server) Stop() error {
         s.session.Stop()
     }
 
+    s.cancel()
     s.wg.Wait()
     log.Info("QueryCoord stop successfully")
     return nil
diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go
index 99fc5e6552..7caf5ef99a 100644
--- a/internal/querynodev2/server.go
+++ b/internal/querynodev2/server.go
@@ -476,8 +476,7 @@ func (node *QueryNode) Stop() error {
         if node.pipelineManager != nil {
             node.pipelineManager.Close()
         }
-        // Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the pipeline
-        node.cancel()
+
         if node.session != nil {
             node.session.Stop()
         }
@@ -489,6 +488,9 @@ func (node *QueryNode) Stop() error {
         }
 
         node.CloseSegcore()
+
+        // Delay the cancellation of ctx to ensure that the session is automatically recycled after the pipeline is closed
+        node.cancel()
     })
     return nil
 }
diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go
index 8b8627dfed..0284584045 100644
--- a/internal/rootcoord/root_coord.go
+++ b/internal/rootcoord/root_coord.go
@@ -772,7 +772,7 @@ func (c *Core) revokeSession() {
     if c.session != nil {
         // wait at most one second to revoke
         c.session.Stop()
-        log.Info("revoke rootcoord session")
+        log.Info("rootcoord session stop")
     }
 }
@@ -784,12 +784,13 @@ func (c *Core) Stop() error {
     if c.proxyWatcher != nil {
         c.proxyWatcher.Stop()
     }
-    c.cancelIfNotNil()
     if c.quotaCenter != nil {
         c.quotaCenter.stop()
     }
-    c.wg.Wait()
 
+    c.revokeSession()
+    c.cancelIfNotNil()
+    c.wg.Wait()
     return nil
 }
diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go
index 2e6fcad26a..0fa78c10ad 100644
--- a/internal/util/sessionutil/session_util.go
+++ b/internal/util/sessionutil/session_util.go
@@ -151,6 +151,8 @@ type Session struct {
     sessionTTL        int64
     sessionRetryTimes int64
     reuseNodeID       bool
+
+    isStopped atomic.Bool // set to true if stop method is invoked
 }
@@ -239,6 +241,7 @@ func NewSessionWithEtcd(ctx context.Context, metaRoot string, client *clientv3.C
         sessionTTL:        paramtable.Get().CommonCfg.SessionTTL.GetAsInt64(),
         sessionRetryTimes: paramtable.Get().CommonCfg.SessionRetryTimes.GetAsInt64(),
         reuseNodeID:       true,
+        isStopped:         *atomic.NewBool(false),
     }
 
     // integration test create cluster with different nodeId in one process
@@ -861,7 +864,8 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
     if callback != nil {
         // before exit liveness check, callback to exit the session owner
         defer func() {
-            if ctx.Err() == nil {
+            // the callback method will not be invoked if session is stopped.
+            if ctx.Err() == nil && !s.isStopped.Load() {
                 go callback()
             }
         }()
@@ -940,6 +944,7 @@ func (s *Session) deleteSession() bool {
 }
 
 func (s *Session) Stop() {
+    s.isStopped.Store(true)
     s.Revoke(time.Second)
     s.cancelKeepAlive()
     s.deleteSession()
@@ -951,17 +956,28 @@ func (s *Session) Revoke(timeout time.Duration) {
     if s == nil {
         return
     }
+    log.Info("start to revoke session", zap.String("sessionKey", s.activeKey))
     if s.etcdCli == nil || s.LeaseID == nil {
+        log.Warn("skip remove session",
+            zap.String("sessionKey", s.activeKey),
+            zap.Bool("etcdCliIsNil", s.etcdCli == nil),
+            zap.Bool("LeaseIDIsNil", s.LeaseID == nil),
+        )
         return
     }
     if s.Disconnected() {
+        log.Warn("skip remove session, connection is disconnected", zap.String("sessionKey", s.activeKey))
        return
     }
     // can NOT use s.ctx, it may be Done here
     ctx, cancel := context.WithTimeout(context.Background(), timeout)
     defer cancel()
     // ignores resp & error, just do best effort to revoke
-    _, _ = s.etcdCli.Revoke(ctx, *s.LeaseID)
+    _, err := s.etcdCli.Revoke(ctx, *s.LeaseID)
+    if err != nil {
+        log.Warn("failed to revoke session", zap.String("sessionKey", s.activeKey), zap.Error(err))
+    }
+    log.Info("revoke session successfully", zap.String("sessionKey", s.activeKey))
 }
 
 // UpdateRegistered update the state of registered.
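
The ordering applied across every Stop() above is the same: stop the gRPC frontend first, wait for its serve loop via the renamed grpcWG, stop the internal component (and its session), and cancel the server context only as the very last step, so teardown work that still needs the context is not cut short. Below is a minimal, self-contained Go sketch of that ordering; the server, component, and channel names are illustrative stand-ins, not Milvus APIs.

package main

import (
	"context"
	"errors"
	"log"
	"sync"
)

// server mirrors the shape shared by the services in this patch: a gRPC
// loop tracked by grpcWG, an internal component, and a root context whose
// cancellation must come last.
type server struct {
	ctx      context.Context
	cancel   context.CancelFunc
	grpcWG   sync.WaitGroup
	grpcDone chan struct{} // stand-in for the grpc.Server being stopped
}

func newServer() *server {
	ctx, cancel := context.WithCancel(context.Background())
	s := &server{ctx: ctx, cancel: cancel, grpcDone: make(chan struct{})}
	s.grpcWG.Add(1)
	go func() {
		defer s.grpcWG.Done()
		<-s.grpcDone // stand-in for grpcServer.Serve returning after a graceful stop
	}()
	return s
}

// stopComponent is the internal Stop; it may still need a live ctx,
// e.g. to revoke an etcd session, which is why cancel() must wait.
func (s *server) stopComponent() error {
	if s.ctx.Err() != nil {
		return errors.New("context cancelled before component teardown")
	}
	return nil
}

func (s *server) Stop() error {
	close(s.grpcDone) // 1. stop serving (GracefulStopGRPCServer equivalent)
	s.grpcWG.Wait()   // 2. wait for the serve loop to exit
	if err := s.stopComponent(); err != nil { // 3. stop internals while ctx is live
		return err
	}
	s.cancel() // 4. ctx cancel is the last step
	return nil
}

func main() {
	if err := newServer().Stop(); err != nil {
		log.Fatal(err)
	}
	log.Println("stopped cleanly")
}

Waiting on grpcWG before stopping internals also matters on its own: the serve loop may still dispatch requests into the component, so tearing the component down first would hand those requests a half-stopped server.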
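The session_util.go changes close a related gap: LivenessCheck's exit callback fires when the keepalive ends, which also happens when Stop() revokes the lease on purpose, so a deliberate shutdown could be mistaken for a lost session. The new isStopped flag suppresses the callback in that case. A simplified sketch of the guard follows, using the standard library's sync/atomic Bool (Go 1.19+) in place of go.uber.org/atomic, with a plain channel standing in for the etcd keepalive:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

type session struct {
	isStopped atomic.Bool
	keepAlive chan struct{} // closes when the lease is lost or revoked
	wg        sync.WaitGroup
}

// livenessCheck waits for the keepalive to end and invokes callback only
// when the loss was not a deliberate Stop and the context is still live.
func (s *session) livenessCheck(ctx context.Context, callback func()) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		<-s.keepAlive
		if ctx.Err() == nil && !s.isStopped.Load() {
			callback()
		}
	}()
}

// Stop marks the session stopped before revoking, mirroring how
// isStopped.Store(true) precedes Revoke in the patch above.
func (s *session) Stop() {
	s.isStopped.Store(true)
	close(s.keepAlive) // stand-in for revoking the etcd lease
	s.wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	s := &session{keepAlive: make(chan struct{})}
	s.livenessCheck(ctx, func() { fmt.Println("liveness lost, exiting owner") })
	s.Stop() // deliberate stop: prints nothing, the callback is suppressed
}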