From ec1fe3549ef06829007aaeffc2df9eff7629ac1c Mon Sep 17 00:00:00 2001 From: jaime Date: Mon, 16 Oct 2023 10:24:10 +0800 Subject: [PATCH] Add a stop hook to clean session (#27564) Signed-off-by: jaime --- cmd/main.go | 48 ++++++- cmd/milvus/help.go | 5 + cmd/milvus/run.go | 109 ++------------- cmd/milvus/util.go | 129 ++++++++++++++++++ cmd/roles/roles.go | 37 ++++- internal/datacoord/server.go | 1 + internal/datanode/data_node.go | 1 + internal/indexnode/indexnode.go | 1 + internal/proxy/proxy.go | 1 + internal/querycoordv2/server.go | 1 + internal/querynodev2/server.go | 1 + internal/rootcoord/root_coord.go | 1 + internal/util/sessionutil/session_util.go | 76 +++++++++++ .../util/sessionutil/session_util_test.go | 38 ++++++ scripts/start_cluster.sh | 16 +-- scripts/start_standalone.sh | 2 +- scripts/stop.sh | 2 +- 17 files changed, 356 insertions(+), 113 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 875126af32..eff4fd1a95 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,11 +17,57 @@ package main import ( + "log" "os" + "os/exec" + "strings" + + "golang.org/x/exp/slices" "github.com/milvus-io/milvus/cmd/milvus" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func main() { - milvus.RunMilvus(os.Args) + idx := slices.Index(os.Args, "--run-with-subprocess") + + // execute command as a subprocess if the command contains "--run-with-subprocess" + if idx > 0 { + args := slices.Delete(os.Args, idx, idx+1) + log.Println("run subprocess with cmd:", args) + + /* #nosec G204 */ + cmd := exec.Command(args[0], args[1:]...) + + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // No need to extra wait for the process + err := cmd.Run() + + // clean session + paramtable.Init() + params := paramtable.Get() + if len(args) >= 3 { + metaPath := params.EtcdCfg.MetaRootPath.GetValue() + endpoints := params.EtcdCfg.Endpoints.GetValue() + etcdEndpoints := strings.Split(endpoints, ",") + + sessionSuffix := sessionutil.GetSessions(cmd.Process.Pid) + defer sessionutil.RemoveServerInfoFile(cmd.Process.Pid) + + if err := milvus.CleanSession(metaPath, etcdEndpoints, sessionSuffix); err != nil { + log.Println("clean session failed", err.Error()) + } + } + + if err != nil { + log.Println("subprocess exit, ", err.Error()) + } else { + log.Println("exit code:", cmd.ProcessState.ExitCode()) + } + } else { + milvus.RunMilvus(os.Args) + } } diff --git a/cmd/milvus/help.go b/cmd/milvus/help.go index c967b82288..3cb4d7c159 100644 --- a/cmd/milvus/help.go +++ b/cmd/milvus/help.go @@ -7,6 +7,11 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +const ( + RunCmd = "run" + RoleMixture = "mixture" +) + var ( usageLine = fmt.Sprintf("Usage:\n"+ "%s\n%s\n%s\n%s\n", runLine, stopLine, mckLine, serverTypeLine) diff --git a/cmd/milvus/run.go b/cmd/milvus/run.go index b898e2d1c1..bbb19eb88a 100644 --- a/cmd/milvus/run.go +++ b/cmd/milvus/run.go @@ -10,96 +10,29 @@ import ( "go.uber.org/zap" - "github.com/milvus-io/milvus/cmd/roles" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/metricsinfo" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) -const ( - RunCmd = "run" - roleMixture = "mixture" -) - -type run struct { - serverType string - // flags - svrAlias string - enableRootCoord bool - enableQueryCoord bool - enableDataCoord bool - enableIndexCoord bool - enableQueryNode bool - enableDataNode bool - enableIndexNode bool - enableProxy bool -} - -func (c *run) getHelp() string { - return runLine + "\n" + serverTypeLine -} +type run struct{} func (c *run) execute(args []string, flags *flag.FlagSet) { if len(args) < 3 { - fmt.Fprintln(os.Stderr, c.getHelp()) + fmt.Fprintln(os.Stderr, getHelp()) return } flags.Usage = func() { - fmt.Fprintln(os.Stderr, c.getHelp()) + fmt.Fprintln(os.Stderr, getHelp()) } - c.serverType = args[2] - c.formatFlags(args, flags) - - // make go ignore SIGPIPE when all cgo threads set mask of SIGPIPE + // make go ignore SIGPIPE when all cgo thread set mask SIGPIPE signal.Ignore(syscall.SIGPIPE) - role := roles.NewMilvusRoles() - role.Local = false - switch c.serverType { - case typeutil.RootCoordRole: - role.EnableRootCoord = true - case typeutil.ProxyRole: - role.EnableProxy = true - case typeutil.QueryCoordRole: - role.EnableQueryCoord = true - case typeutil.QueryNodeRole: - role.EnableQueryNode = true - case typeutil.DataCoordRole: - role.EnableDataCoord = true - case typeutil.DataNodeRole: - role.EnableDataNode = true - case typeutil.IndexCoordRole: - role.EnableIndexCoord = true - case typeutil.IndexNodeRole: - role.EnableIndexNode = true - case typeutil.StandaloneRole, typeutil.EmbeddedRole: - role.EnableRootCoord = true - role.EnableProxy = true - role.EnableQueryCoord = true - role.EnableQueryNode = true - role.EnableDataCoord = true - role.EnableDataNode = true - role.EnableIndexCoord = true - role.EnableIndexNode = true - role.Local = true - role.Embedded = c.serverType == typeutil.EmbeddedRole - case roleMixture: - role.EnableRootCoord = c.enableRootCoord - role.EnableQueryCoord = c.enableQueryCoord - role.EnableDataCoord = c.enableDataCoord - role.EnableIndexCoord = c.enableIndexCoord - role.EnableQueryNode = c.enableQueryNode - role.EnableDataNode = c.enableDataNode - role.EnableIndexNode = c.enableIndexNode - role.EnableProxy = c.enableProxy - default: - fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", c.serverType, c.getHelp()) - os.Exit(-1) - } + serverType := args[2] + roles := GetMilvusRoles(args, flags) + // setup config for embedded milvus - runtimeDir := createRuntimeDir(c.serverType) - filename := getPidFileName(c.serverType, c.svrAlias) + runtimeDir := createRuntimeDir(serverType) + filename := getPidFileName(serverType, roles.Alias) c.printBanner(flags.Output()) c.injectVariablesToEnv() @@ -108,29 +41,7 @@ func (c *run) execute(args []string, flags *flag.FlagSet) { panic(err) } defer removePidFile(lock) - role.Run(c.svrAlias) -} - -func (c *run) formatFlags(args []string, flags *flag.FlagSet) { - flags.StringVar(&c.svrAlias, "alias", "", "set alias") - - flags.BoolVar(&c.enableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator") - flags.BoolVar(&c.enableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator") - flags.BoolVar(&c.enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator") - flags.BoolVar(&c.enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator") - - flags.BoolVar(&c.enableQueryNode, typeutil.QueryNodeRole, false, "enable query node") - flags.BoolVar(&c.enableDataNode, typeutil.DataNodeRole, false, "enable data node") - flags.BoolVar(&c.enableIndexNode, typeutil.IndexNodeRole, false, "enable index node") - flags.BoolVar(&c.enableProxy, typeutil.ProxyRole, false, "enable proxy node") - - if c.serverType == typeutil.EmbeddedRole { - flags.SetOutput(io.Discard) - } - hardware.InitMaxprocs(c.serverType, flags) - if err := flags.Parse(args[3:]); err != nil { - os.Exit(-1) - } + roles.Run() } func (c *run) printBanner(w io.Writer) { diff --git a/cmd/milvus/util.go b/cmd/milvus/util.go index 1a012d5222..829745424d 100644 --- a/cmd/milvus/util.go +++ b/cmd/milvus/util.go @@ -1,15 +1,25 @@ package milvus import ( + "context" + "flag" "fmt" "io" "io/ioutil" "os" "path" "runtime" + "time" "github.com/gofrs/flock" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "github.com/milvus-io/milvus/cmd/roles" + "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -106,3 +116,122 @@ func removePidFile(lock *flock.Flock) { lock.Close() os.Remove(filename) } + +func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles { + alias, enableRootCoord, enableQueryCoord, enableIndexCoord, enableDataCoord, enableQueryNode, + enableDataNode, enableIndexNode, enableProxy := formatFlags(args, flags) + + serverType := args[2] + role := roles.NewMilvusRoles() + role.Alias = alias + + switch serverType { + case typeutil.RootCoordRole: + role.EnableRootCoord = true + case typeutil.ProxyRole: + role.EnableProxy = true + case typeutil.QueryCoordRole: + role.EnableQueryCoord = true + case typeutil.QueryNodeRole: + role.EnableQueryNode = true + case typeutil.DataCoordRole: + role.EnableDataCoord = true + case typeutil.DataNodeRole: + role.EnableDataNode = true + case typeutil.IndexCoordRole: + role.EnableIndexCoord = true + case typeutil.IndexNodeRole: + role.EnableIndexNode = true + case typeutil.StandaloneRole, typeutil.EmbeddedRole: + role.EnableRootCoord = true + role.EnableProxy = true + role.EnableQueryCoord = true + role.EnableQueryNode = true + role.EnableDataCoord = true + role.EnableDataNode = true + role.EnableIndexCoord = true + role.EnableIndexNode = true + role.Local = true + role.Embedded = serverType == typeutil.EmbeddedRole + case RoleMixture: + role.EnableRootCoord = enableRootCoord + role.EnableQueryCoord = enableQueryCoord + role.EnableDataCoord = enableDataCoord + role.EnableIndexCoord = enableIndexCoord + role.EnableQueryNode = enableQueryNode + role.EnableDataNode = enableDataNode + role.EnableIndexNode = enableIndexNode + role.EnableProxy = enableProxy + default: + fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp()) + os.Exit(-1) + } + + return role +} + +func formatFlags(args []string, flags *flag.FlagSet) (alias string, enableRootCoord, enableQueryCoord, + enableIndexCoord, enableDataCoord, enableQueryNode, enableDataNode, enableIndexNode, enableProxy bool, +) { + flags.StringVar(&alias, "alias", "", "set alias") + + flags.BoolVar(&enableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator") + flags.BoolVar(&enableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator") + flags.BoolVar(&enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator") + flags.BoolVar(&enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator") + + flags.BoolVar(&enableQueryNode, typeutil.QueryNodeRole, false, "enable query node") + flags.BoolVar(&enableDataNode, typeutil.DataNodeRole, false, "enable data node") + flags.BoolVar(&enableIndexNode, typeutil.IndexNodeRole, false, "enable index node") + flags.BoolVar(&enableProxy, typeutil.ProxyRole, false, "enable proxy node") + + serverType := args[2] + if serverType == typeutil.EmbeddedRole { + flags.SetOutput(io.Discard) + } + hardware.InitMaxprocs(serverType, flags) + if err := flags.Parse(args[3:]); err != nil { + os.Exit(-1) + } + return +} + +func getHelp() string { + return runLine + "\n" + serverTypeLine +} + +func CleanSession(metaPath string, etcdEndpoints []string, sessionSuffix []string) error { + if len(sessionSuffix) == 0 { + log.Warn("not found session info , skip to clean sessions") + return nil + } + + keys := getSessionPaths(metaPath, sessionSuffix) + if len(keys) == 0 { + return nil + } + + etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) + if err != nil { + return err + } + defer etcdCli.Close() + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + for _, key := range keys { + _, _ = etcdCli.Delete(ctx, key, clientv3.WithPrefix()) + } + log.Info("clean sessions from etcd", zap.Any("keys", keys)) + return nil +} + +func getSessionPaths(metaPath string, sessionSuffix []string) []string { + sessionKeys := make([]string, 0) + sessionPathPrefix := path.Join(metaPath, sessionutil.DefaultServiceRoot) + for _, suffix := range sessionSuffix { + key := path.Join(sessionPathPrefix, suffix) + sessionKeys = append(sessionKeys, key) + } + return sessionKeys +} diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 4b550a7169..789a59e281 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -138,9 +138,11 @@ type MilvusRoles struct { EnableIndexNode bool `env:"ENABLE_INDEX_NODE"` Local bool + Alias string Embedded bool - closed chan struct{} - once sync.Once + + closed chan struct{} + once sync.Once } // NewMilvusRoles creates a new MilvusRoles with private fields initialized. @@ -293,7 +295,7 @@ func (mr *MilvusRoles) handleSignals() func() { } // Run Milvus components. -func (mr *MilvusRoles) Run(alias string) { +func (mr *MilvusRoles) Run() { // start signal handler, defer close func closeFn := mr.handleSignals() defer closeFn() @@ -424,3 +426,32 @@ func (mr *MilvusRoles) Run(alias string) { log.Info("Milvus components graceful stop done") } + +func (mr *MilvusRoles) GetRoles() []string { + roles := make([]string, 0) + if mr.EnableRootCoord { + roles = append(roles, typeutil.RootCoordRole) + } + if mr.EnableProxy { + roles = append(roles, typeutil.ProxyRole) + } + if mr.EnableQueryCoord { + roles = append(roles, typeutil.QueryCoordRole) + } + if mr.EnableQueryNode { + roles = append(roles, typeutil.QueryNodeRole) + } + if mr.EnableDataCoord { + roles = append(roles, typeutil.DataCoordRole) + } + if mr.EnableDataNode { + roles = append(roles, typeutil.DataNodeRole) + } + if mr.EnableIndexCoord { + roles = append(roles, typeutil.IndexCoordRole) + } + if mr.EnableIndexNode { + roles = append(roles, typeutil.IndexNodeRole) + } + return roles +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 25e61dd12d..b199c8223b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -401,6 +401,7 @@ func (s *Server) startDataCoord() { log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes") s.reCollectSegmentStats(s.ctx) s.stateCode.Store(commonpb.StateCode_Healthy) + sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.ServerID) } func (s *Server) initCluster() error { diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index c7b6c8e0eb..91b0c609b4 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -206,6 +206,7 @@ func (node *DataNode) initSession() error { return errors.New("failed to initialize session") } node.session.Init(typeutil.DataNodeRole, node.address, false, true) + sessionutil.SaveServerInfo(typeutil.DataNodeRole, node.session.ServerID) return nil } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index d0d790c983..f271a23a06 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -193,6 +193,7 @@ func (i *IndexNode) initSession() error { return errors.New("failed to initialize session") } i.session.Init(typeutil.IndexNodeRole, i.address, false, true) + sessionutil.SaveServerInfo(typeutil.IndexNodeRole, i.session.ServerID) return nil } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 0ca41dfa84..760dc9d880 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -173,6 +173,7 @@ func (node *Proxy) initSession() error { return errors.New("new session failed, maybe etcd cannot be connected") } node.session.Init(typeutil.ProxyRole, node.address, false, true) + sessionutil.SaveServerInfo(typeutil.ProxyRole, node.session.ServerID) return nil } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 7a62cf00ed..1d48f39718 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -424,6 +424,7 @@ func (s *Server) startQueryCoord() error { s.startServerLoop() s.afterStart() s.UpdateStateCode(commonpb.StateCode_Healthy) + sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.ServerID) return nil } diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 9fc84f8558..2c5e4d8fb3 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -156,6 +156,7 @@ func (node *QueryNode) initSession() error { return fmt.Errorf("session is nil, the etcd client connection may have failed") } node.session.Init(typeutil.QueryNodeRole, node.address, false, true) + sessionutil.SaveServerInfo(typeutil.QueryNodeRole, node.session.ServerID) paramtable.SetNodeID(node.session.ServerID) log.Info("QueryNode init session", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("node address", node.session.Address)) return nil diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 8619faa68e..e7c49eb463 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -679,6 +679,7 @@ func (c *Core) startInternal() error { c.startServerLoop() c.UpdateStateCode(commonpb.StateCode_Healthy) + sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID) logutil.Logger(c.ctx).Info("rootcoord startup successfully") return nil diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 07ca9d9cb2..4e1e86f894 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -22,7 +22,9 @@ import ( "fmt" "os" "path" + "path/filepath" "strconv" + "strings" "sync" "time" @@ -38,6 +40,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( @@ -1104,3 +1107,76 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error { } return nil } + +func filterEmptyStrings(s []string) []string { + var filtered []string + for _, str := range s { + if str != "" { + filtered = append(filtered, str) + } + } + return filtered +} + +func GetSessions(pid int) []string { + fileFullName := GetServerInfoFilePath(pid) + if _, err := os.Stat(fileFullName); errors.Is(err, os.ErrNotExist) { + log.Warn("not found server info file path", zap.String("filePath", fileFullName), zap.Error(err)) + return []string{} + } + + v, err := os.ReadFile(fileFullName) + if err != nil { + log.Warn("read server info file path failed", zap.String("filePath", fileFullName), zap.Error(err)) + return []string{} + } + + return filterEmptyStrings(strings.Split(string(v), "\n")) +} + +func RemoveServerInfoFile(pid int) { + fullPath := GetServerInfoFilePath(pid) + _ = os.Remove(fullPath) +} + +// GetServerInfoFilePath get server info file path, eg: /tmp/milvus/server_id_123456789 +// Notes: this method will not support Windows OS +// return file path +func GetServerInfoFilePath(pid int) string { + tmpDir := "/tmp/milvus" + _ = os.Mkdir(tmpDir, os.ModePerm) + fileName := fmt.Sprintf("server_id_%d", pid) + filePath := filepath.Join(tmpDir, fileName) + return filePath +} + +func saveServerInfoInternal(role string, serverID int64, pid int) { + fileFullPath := GetServerInfoFilePath(pid) + fd, err := os.OpenFile(fileFullPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o664) + if err != nil { + log.Warn("open server info file fail", zap.String("filePath", fileFullPath), zap.Error(err)) + return + } + defer fd.Close() + + data := fmt.Sprintf("%s-%d", role, serverID) + // remove active session if role is a coordinator + if role == typeutil.RootCoordRole || role == typeutil.QueryCoordRole { + data = fmt.Sprintf("%s\n%s\n", data, role) + } else if role == typeutil.DataCoordRole { + // also remove a faked indexcoord seesion if role is a datacoord coord + data = fmt.Sprintf("%s\n%s\n%s\n", data, role, typeutil.IndexCoordRole) + } else { + data = fmt.Sprintf("%s\n", data) + } + _, err = fd.WriteString(data) + if err != nil { + log.Warn("write server info file fail", zap.String("filePath", fileFullPath), zap.Error(err)) + } + + log.Info("save into server info to file", zap.String("content", data), zap.String("filePath", fileFullPath)) +} + +func SaveServerInfo(role string, serverID int64) { + saveServerInfoInternal(role, serverID, os.Getpid()) +} diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 5f856c4230..e44bf4dc09 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestGetServerIDConcurrently(t *testing.T) { @@ -699,6 +700,43 @@ func TestSessionEventType_String(t *testing.T) { } } +func TestServerInfoOp(t *testing.T) { + t.Run("test with specified pid", func(t *testing.T) { + pid := 9999999 + serverID := int64(999) + + filePath := GetServerInfoFilePath(pid) + defer os.RemoveAll(filePath) + + saveServerInfoInternal(typeutil.QueryCoordRole, serverID, pid) + saveServerInfoInternal(typeutil.DataCoordRole, serverID, pid) + saveServerInfoInternal(typeutil.ProxyRole, serverID, pid) + + sessions := GetSessions(pid) + assert.Equal(t, 6, len(sessions)) + assert.ElementsMatch(t, sessions, []string{ + "querycoord", "querycoord-999", + "datacoord", "datacoord-999", + "indexcoord", + "proxy-999", + }) + + RemoveServerInfoFile(pid) + sessions = GetSessions(pid) + assert.Equal(t, 0, len(sessions)) + }) + + t.Run("test with os pid", func(t *testing.T) { + serverID := int64(9999) + filePath := GetServerInfoFilePath(os.Getpid()) + defer os.RemoveAll(filePath) + + SaveServerInfo(typeutil.QueryCoordRole, serverID) + sessions := GetSessions(os.Getpid()) + assert.Equal(t, 2, len(sessions)) + }) +} + func TestSession_apply(t *testing.T) { session := &Session{} opts := []SessionOption{WithTTL(100), WithRetryTimes(200)} diff --git a/scripts/start_cluster.sh b/scripts/start_cluster.sh index 042dc8f2e7..0a99dad73e 100755 --- a/scripts/start_cluster.sh +++ b/scripts/start_cluster.sh @@ -27,25 +27,25 @@ if [[ "$OSTYPE" == "linux-gnu"* ]]; then fi echo "Starting rootcoord..." -nohup ./bin/milvus run rootcoord > /tmp/rootcoord.log 2>&1 & +nohup ./bin/milvus run rootcoord --run-with-subprocess > /tmp/rootcoord.log 2>&1 & echo "Starting datacoord..." -nohup ./bin/milvus run datacoord > /tmp/datacoord.log 2>&1 & +nohup ./bin/milvus run datacoord --run-with-subprocess > /tmp/datacoord.log 2>&1 & echo "Starting datanode..." -nohup ./bin/milvus run datanode > /tmp/datanode.log 2>&1 & +nohup ./bin/milvus run datanode --run-with-subprocess > /tmp/datanode.log 2>&1 & echo "Starting proxy..." -nohup ./bin/milvus run proxy > /tmp/proxy.log 2>&1 & +nohup ./bin/milvus run proxy --run-with-subprocess > /tmp/proxy.log 2>&1 & echo "Starting querycoord..." -nohup ./bin/milvus run querycoord > /tmp/querycoord.log 2>&1 & +nohup ./bin/milvus run querycoord --run-with-subprocess > /tmp/querycoord.log 2>&1 & echo "Starting querynode..." -nohup ./bin/milvus run querynode > /tmp/querynode.log 2>&1 & +nohup ./bin/milvus run querynode --run-with-subprocess > /tmp/querynode.log 2>&1 & echo "Starting indexcoord..." -nohup ./bin/milvus run indexcoord > /tmp/indexcoord.log 2>&1 & +nohup ./bin/milvus run indexcoord --run-with-subprocess > /tmp/indexcoord.log 2>&1 & echo "Starting indexnode..." -nohup ./bin/milvus run indexnode > /tmp/indexnode.log 2>&1 & +nohup ./bin/milvus run indexnode --run-with-subprocess > /tmp/indexnode.log 2>&1 & diff --git a/scripts/start_standalone.sh b/scripts/start_standalone.sh index 31732ba0d3..6dce9e6d16 100755 --- a/scripts/start_standalone.sh +++ b/scripts/start_standalone.sh @@ -27,4 +27,4 @@ if [[ "$OSTYPE" == "linux-gnu"* ]]; then fi echo "Starting standalone..." -nohup ./bin/milvus run standalone > /tmp/standalone.log 2>&1 & +nohup ./bin/milvus run standalone --run-with-subprocess > /tmp/standalone.log 2>&1 & diff --git a/scripts/stop.sh b/scripts/stop.sh index 7b6248a6cd..cdda061bef 100755 --- a/scripts/stop.sh +++ b/scripts/stop.sh @@ -15,7 +15,7 @@ # limitations under the License. echo "Stopping milvus..." -PROCESS=$(ps -e | grep milvus | grep -v grep | awk '{print $1}') +PROCESS=$(ps -e | grep milvus | grep -v grep | grep -v run-with-subprocess | awk '{print $1}') if [ -z "$PROCESS" ]; then echo "No milvus process" exit 0