From 76b06abc29ffcf4b6e4726e05cf03cf69a58270e Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 2 Nov 2023 09:00:25 +0800 Subject: [PATCH] refine stop order (#28016) (#28089) Signed-off-by: Wei Liu --- cmd/roles/roles.go | 46 +++++++++-------------- internal/distributed/rootcoord/service.go | 13 ++++--- scripts/stop_graceful.sh | 2 +- 3 files changed, 25 insertions(+), 36 deletions(-) diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 7d3d62e49a..c5b0286468 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -350,8 +350,12 @@ func (mr *MilvusRoles) Run() { rootCoord = mr.runRootCoord(ctx, local, &wg) } - if mr.EnableProxy { - proxy = mr.runProxy(ctx, local, &wg) + if mr.EnableDataCoord { + dataCoord = mr.runDataCoord(ctx, local, &wg) + } + + if mr.EnableIndexCoord { + indexCoord = mr.runIndexCoord(ctx, local, &wg) } if mr.EnableQueryCoord { @@ -362,22 +366,17 @@ func (mr *MilvusRoles) Run() { queryNode = mr.runQueryNode(ctx, local, &wg) } - if mr.EnableDataCoord { - dataCoord = mr.runDataCoord(ctx, local, &wg) - } - if mr.EnableDataNode { dataNode = mr.runDataNode(ctx, local, &wg) } - - if mr.EnableIndexCoord { - indexCoord = mr.runIndexCoord(ctx, local, &wg) - } - if mr.EnableIndexNode { indexNode = mr.runIndexNode(ctx, local, &wg) } + if mr.EnableProxy { + proxy = mr.runProxy(ctx, local, &wg) + } + wg.Wait() mr.setupLogger() @@ -389,40 +388,29 @@ func (mr *MilvusRoles) Run() { <-mr.closed // stop coordinators first - // var component - coordinators := []component{rootCoord, queryCoord, dataCoord, indexCoord} + coordinators := []component{rootCoord, dataCoord, indexCoord, queryCoord} for idx, coord := range coordinators { log.Warn("stop processing") if coord != nil { - log.Warn("stop coord", zap.Int("idx", idx), zap.Any("coord", coord)) - wg.Add(1) - go func(coord component) { - defer wg.Done() - coord.Stop() - }(coord) + log.Info("stop coord", zap.Int("idx", idx), zap.Any("coord", coord)) + coord.Stop() } } - wg.Wait() log.Info("All coordinators have stopped") // stop nodes nodes := []component{queryNode, indexNode, dataNode} - for _, node := range nodes { + for idx, node := range nodes { if node != nil { - wg.Add(1) - go func(node component) { - defer wg.Done() - node.Stop() - }(node) + log.Info("stop node", zap.Int("idx", idx), zap.Any("node", node)) + node.Stop() } } - wg.Wait() log.Info("All nodes have stopped") - // stop proxy if proxy != nil { proxy.Stop() - log.Info("proxy stopped") + log.Info("proxy stopped!") } log.Info("Milvus components graceful stop done") diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index bb994ad4ca..7e9003c5f3 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -311,6 +311,13 @@ func (s *Server) Stop() error { if s.tikvCli != nil { defer s.tikvCli.Close() } + + s.cancel() + if s.grpcServer != nil { + utils.GracefulStopGRPCServer(s.grpcServer) + } + s.wg.Wait() + if s.dataCoord != nil { if err := s.dataCoord.Close(); err != nil { log.Error("Failed to close dataCoord client", zap.Error(err)) @@ -326,12 +333,6 @@ func (s *Server) Stop() error { log.Error("Failed to close close rootCoord", zap.Error(err)) } } - log.Debug("Rootcoord begin to stop grpc server") - s.cancel() - if s.grpcServer != nil { - utils.GracefulStopGRPCServer(s.grpcServer) - } - s.wg.Wait() return nil } diff --git a/scripts/stop_graceful.sh b/scripts/stop_graceful.sh index 14ad072c7d..4cd71eb2b7 100755 --- a/scripts/stop_graceful.sh +++ b/scripts/stop_graceful.sh @@ -15,7 +15,7 @@ # limitations under the License. function get_milvus_process() { - echo $(ps -e | grep milvus | grep -v grep | awk '{print $1}') + echo $(ps -e | grep milvus | grep -v grep | grep run-with-subprocess | awk '{print $1}') } echo "Stopping milvus..."