From 5658f779fe7205efe05b4b0e956d456709771ec5 Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 4 Feb 2021 18:57:21 +0800 Subject: [PATCH] Refine singlenode Signed-off-by: groot --- cmd/distributed/main.go | 286 ++-------------------------- cmd/distributed/main_test.go | 28 --- cmd/distributed/roles/roles.go | 261 +++++++++++++++++++++++++ cmd/distributed/roles/roles_test.go | 89 +++++++++ cmd/singlenode/main.go | 161 ++-------------- 5 files changed, 384 insertions(+), 441 deletions(-) delete mode 100644 cmd/distributed/main_test.go create mode 100644 cmd/distributed/roles/roles.go create mode 100644 cmd/distributed/roles/roles_test.go diff --git a/cmd/distributed/main.go b/cmd/distributed/main.go index 591940ab9f..2c00e113e8 100644 --- a/cmd/distributed/main.go +++ b/cmd/distributed/main.go @@ -1,50 +1,14 @@ package main import ( - "context" "flag" - "log" "os" - "os/signal" "strings" - "syscall" - "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/cmd/distributed/roles" ) -type MilvusRoles struct { - EnableMaster bool `env:"ENABLE_MASTER"` - EnableProxyService bool `env:"ENABLE_PROXY_SERVICE"` - EnableProxyNode bool `env:"ENABLE_PROXY_NODE"` - EnableQueryService bool `env:"ENABLE_QUERY_SERVICE"` - EnableQueryNode bool `env:"ENABLE_QUERY_NODE"` - EnableDataService bool `env:"ENABLE_DATA_SERVICE"` - EnableDataNode bool `env:"ENABLE_DATA_NODE"` - EnableIndexService bool `env:"ENABLE_INDEX_SERVICE"` - EnableIndexNode bool `env:"ENABLE_INDEX_NODE"` - EnableMsgStreamService bool `env:"ENABLE_MSGSTREAM_SERVICE"` -} - -func (mr *MilvusRoles) hasAnyRole() bool { - return mr.EnableMaster || mr.EnableMsgStreamService || - mr.EnableProxyService || mr.EnableProxyNode || - mr.EnableQueryService || mr.EnableQueryNode || - mr.EnableDataService || mr.EnableDataNode || - mr.EnableIndexService || mr.EnableIndexNode -} - -func (mr *MilvusRoles) envValue(env string) bool { - env = strings.ToLower(env) - env = strings.Trim(env, " ") - if env == "1" || env == "true" { - return true - } - return false -} - -func main() { - var roles MilvusRoles - +func initRoles(roles *roles.MilvusRoles) { flag.BoolVar(&roles.EnableMaster, "master-service", false, "start as master service") flag.BoolVar(&roles.EnableProxyService, "proxy-service", false, "start as proxy service") flag.BoolVar(&roles.EnableProxyNode, "proxy-node", false, "start as proxy node") @@ -57,251 +21,39 @@ func main() { flag.BoolVar(&roles.EnableMsgStreamService, "msg-stream", false, "start as msg stream service") flag.Parse() - if !roles.hasAnyRole() { + if !roles.HasAnyRole() { for _, e := range os.Environ() { pairs := strings.SplitN(e, "=", 2) if len(pairs) == 2 { switch pairs[0] { case "ENABLE_MASTER": - roles.EnableMaster = roles.envValue(pairs[1]) + roles.EnableMaster = roles.EnvValue(pairs[1]) case "ENABLE_PROXY_SERVICE": - roles.EnableProxyService = roles.envValue(pairs[1]) + roles.EnableProxyService = roles.EnvValue(pairs[1]) case "ENABLE_PROXY_NODE": - roles.EnableProxyNode = roles.envValue(pairs[1]) + roles.EnableProxyNode = roles.EnvValue(pairs[1]) case "ENABLE_QUERY_SERVICE": - roles.EnableQueryService = roles.envValue(pairs[1]) + roles.EnableQueryService = roles.EnvValue(pairs[1]) case "ENABLE_QUERY_NODE": - roles.EnableQueryNode = roles.envValue(pairs[1]) + roles.EnableQueryNode = roles.EnvValue(pairs[1]) case "ENABLE_DATA_SERVICE": - roles.EnableDataService = roles.envValue(pairs[1]) + roles.EnableDataService = roles.EnvValue(pairs[1]) case "ENABLE_DATA_NODE": - roles.EnableDataNode = roles.envValue(pairs[1]) + roles.EnableDataNode = roles.EnvValue(pairs[1]) case "ENABLE_INDEX_SERVICE": - roles.EnableIndexService = roles.envValue(pairs[1]) + roles.EnableIndexService = roles.EnvValue(pairs[1]) case "ENABLE_INDEX_NODE": - roles.EnableIndexNode = roles.envValue(pairs[1]) + roles.EnableIndexNode = roles.EnvValue(pairs[1]) case "ENABLE_MSGSTREAM_SERVICE": - roles.EnableMsgStreamService = roles.envValue(pairs[1]) + roles.EnableMsgStreamService = roles.EnvValue(pairs[1]) } } } } - - if !roles.hasAnyRole() { - log.Printf("set the roles please ...") - return - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var masterService *components.MasterService - if roles.EnableMaster { - log.Print("start as master service") - go func() { - var err error - masterService, err = components.NewMasterService(ctx) - if err != nil { - panic(err) - } - _ = masterService.Run() - }() - } - - var proxyService *components.ProxyService - if roles.EnableProxyService { - log.Print("start as proxy service") - go func() { - var err error - proxyService, err = components.NewProxyService(ctx) - if err != nil { - panic(err) - } - _ = proxyService.Run() - }() - } - - var proxyNode *components.ProxyNode - if roles.EnableProxyNode { - log.Print("start as proxy node") - go func() { - var err error - proxyNode, err = components.NewProxyNode(ctx) - if err != nil { - panic(err) - } - _ = proxyNode.Run() - }() - } - - var queryService *components.QueryService - if roles.EnableQueryService { - log.Print("start as query service") - go func() { - var err error - queryService, err = components.NewQueryService(ctx) - if err != nil { - panic(err) - } - _ = queryService.Run() - }() - } - - var queryNode *components.QueryNode - if roles.EnableQueryNode { - log.Print("start as query node") - go func() { - var err error - queryNode, err = components.NewQueryNode(ctx) - if err != nil { - panic(err) - } - _ = queryNode.Run() - }() - } - - var dataService *components.DataService - if roles.EnableDataService { - log.Print("start as data service") - go func() { - var err error - dataService, err = components.NewDataService(ctx) - if err != nil { - panic(err) - } - _ = dataService.Run() - }() - } - - var dataNode *components.DataNode - if roles.EnableDataNode { - log.Print("start as data node") - go func() { - var err error - dataNode, err = components.NewDataNode(ctx) - if err != nil { - panic(err) - } - _ = dataNode.Run() - }() - } - - var indexService *components.IndexService - if roles.EnableIndexService { - log.Print("start as index service") - go func() { - var err error - indexService, err = components.NewIndexService(ctx) - if err != nil { - panic(err) - } - _ = indexService.Run() - }() - } - - var indexNode *components.IndexNode - if roles.EnableIndexNode { - log.Print("start as index node") - go func() { - var err error - indexNode, err = components.NewIndexNode(ctx) - if err != nil { - panic(err) - } - _ = indexNode.Run() - }() - } - - var msgStream *components.MsgStream - if roles.EnableMsgStreamService { - log.Print("start as msg stream service") - go func() { - var err error - msgStream, err = components.NewMsgStreamService(ctx) - if err != nil { - panic(err) - } - _ = msgStream.Run() - }() - } - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - sig := <-sc - log.Printf("Get %s signal to exit", sig.String()) - - if roles.EnableMaster { - if masterService != nil { - _ = masterService.Stop() - } - log.Printf("exit master service") - } - - if roles.EnableProxyService { - if proxyService != nil { - _ = proxyService.Stop() - } - log.Printf("exit proxy service") - } - - if roles.EnableProxyNode { - if proxyNode != nil { - _ = proxyNode.Stop() - } - log.Printf("exit proxy node") - } - - if roles.EnableQueryService { - if queryService != nil { - _ = queryService.Stop() - } - log.Printf("exit query service") - } - - if roles.EnableQueryNode { - if queryNode != nil { - _ = queryNode.Stop() - } - log.Printf("exit query node") - } - - if roles.EnableDataService { - if dataService != nil { - _ = dataService.Stop() - } - log.Printf("exit data service") - } - - if roles.EnableDataNode { - if dataNode != nil { - _ = dataNode.Stop() - } - log.Printf("exit data node") - } - - if roles.EnableIndexService { - if indexService != nil { - _ = indexService.Stop() - } - log.Printf("exit index service") - } - - if roles.EnableIndexNode { - if indexNode != nil { - _ = indexNode.Stop() - } - log.Printf("exit index node") - } - - if roles.EnableMsgStreamService { - if msgStream != nil { - _ = msgStream.Stop() - } - log.Printf("exit msg stream service") - } - +} + +func main() { + var roles roles.MilvusRoles + initRoles(&roles) + roles.Run() } diff --git a/cmd/distributed/main_test.go b/cmd/distributed/main_test.go deleted file mode 100644 index 928e7d1188..0000000000 --- a/cmd/distributed/main_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "strings" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRoles(t *testing.T) { - r := MilvusRoles{} - - assert.True(t, r.envValue("1")) - assert.True(t, r.envValue(" 1 ")) - assert.True(t, r.envValue("True")) - assert.True(t, r.envValue(" True ")) - assert.True(t, r.envValue(" TRue ")) - assert.False(t, r.envValue("0")) - assert.False(t, r.envValue(" 0 ")) - assert.False(t, r.envValue(" false ")) - assert.False(t, r.envValue(" False ")) - assert.False(t, r.envValue(" abc ")) - - ss := strings.SplitN("abcdef", "=", 2) - assert.Equal(t, len(ss), 1) - ss = strings.SplitN("adb=def", "=", 2) - assert.Equal(t, len(ss), 2) -} diff --git a/cmd/distributed/roles/roles.go b/cmd/distributed/roles/roles.go new file mode 100644 index 0000000000..52838f02a1 --- /dev/null +++ b/cmd/distributed/roles/roles.go @@ -0,0 +1,261 @@ +package roles + +import ( + "context" + "log" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/zilliztech/milvus-distributed/cmd/distributed/components" +) + +type MilvusRoles struct { + EnableMaster bool `env:"ENABLE_MASTER"` + EnableProxyService bool `env:"ENABLE_PROXY_SERVICE"` + EnableProxyNode bool `env:"ENABLE_PROXY_NODE"` + EnableQueryService bool `env:"ENABLE_QUERY_SERVICE"` + EnableQueryNode bool `env:"ENABLE_QUERY_NODE"` + EnableDataService bool `env:"ENABLE_DATA_SERVICE"` + EnableDataNode bool `env:"ENABLE_DATA_NODE"` + EnableIndexService bool `env:"ENABLE_INDEX_SERVICE"` + EnableIndexNode bool `env:"ENABLE_INDEX_NODE"` + EnableMsgStreamService bool `env:"ENABLE_MSGSTREAM_SERVICE"` +} + +func (mr *MilvusRoles) HasAnyRole() bool { + return mr.EnableMaster || mr.EnableMsgStreamService || + mr.EnableProxyService || mr.EnableProxyNode || + mr.EnableQueryService || mr.EnableQueryNode || + mr.EnableDataService || mr.EnableDataNode || + mr.EnableIndexService || mr.EnableIndexNode +} + +func (mr *MilvusRoles) EnvValue(env string) bool { + env = strings.ToLower(env) + env = strings.Trim(env, " ") + if env == "1" || env == "true" { + return true + } + return false +} + +func (mr *MilvusRoles) Run() { + if !mr.HasAnyRole() { + log.Printf("set the roles please ...") + return + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var masterService *components.MasterService + if mr.EnableMaster { + log.Print("start as master service") + go func() { + var err error + masterService, err = components.NewMasterService(ctx) + if err != nil { + panic(err) + } + _ = masterService.Run() + }() + } + + var proxyService *components.ProxyService + if mr.EnableProxyService { + log.Print("start as proxy service") + go func() { + var err error + proxyService, err = components.NewProxyService(ctx) + if err != nil { + panic(err) + } + _ = proxyService.Run() + }() + } + + var proxyNode *components.ProxyNode + if mr.EnableProxyNode { + log.Print("start as proxy node") + go func() { + var err error + proxyNode, err = components.NewProxyNode(ctx) + if err != nil { + panic(err) + } + _ = proxyNode.Run() + }() + } + + var queryService *components.QueryService + if mr.EnableQueryService { + log.Print("start as query service") + go func() { + var err error + queryService, err = components.NewQueryService(ctx) + if err != nil { + panic(err) + } + _ = queryService.Run() + }() + } + + var queryNode *components.QueryNode + if mr.EnableQueryNode { + log.Print("start as query node") + go func() { + var err error + queryNode, err = components.NewQueryNode(ctx) + if err != nil { + panic(err) + } + _ = queryNode.Run() + }() + } + + var dataService *components.DataService + if mr.EnableDataService { + log.Print("start as data service") + go func() { + var err error + dataService, err = components.NewDataService(ctx) + if err != nil { + panic(err) + } + _ = dataService.Run() + }() + } + + var dataNode *components.DataNode + if mr.EnableDataNode { + log.Print("start as data node") + go func() { + var err error + dataNode, err = components.NewDataNode(ctx) + if err != nil { + panic(err) + } + _ = dataNode.Run() + }() + } + + var indexService *components.IndexService + if mr.EnableIndexService { + log.Print("start as index service") + go func() { + var err error + indexService, err = components.NewIndexService(ctx) + if err != nil { + panic(err) + } + _ = indexService.Run() + }() + } + + var indexNode *components.IndexNode + if mr.EnableIndexNode { + log.Print("start as index node") + go func() { + var err error + indexNode, err = components.NewIndexNode(ctx) + if err != nil { + panic(err) + } + _ = indexNode.Run() + }() + } + + var msgStream *components.MsgStream + if mr.EnableMsgStreamService { + log.Print("start as msg stream service") + go func() { + var err error + msgStream, err = components.NewMsgStreamService(ctx) + if err != nil { + panic(err) + } + _ = msgStream.Run() + }() + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + sig := <-sc + log.Printf("Get %s signal to exit", sig.String()) + + if mr.EnableMaster { + if masterService != nil { + _ = masterService.Stop() + } + log.Printf("exit master service") + } + + if mr.EnableProxyService { + if proxyService != nil { + _ = proxyService.Stop() + } + log.Printf("exit proxy service") + } + + if mr.EnableProxyNode { + if proxyNode != nil { + _ = proxyNode.Stop() + } + log.Printf("exit proxy node") + } + + if mr.EnableQueryService { + if queryService != nil { + _ = queryService.Stop() + } + log.Printf("exit query service") + } + + if mr.EnableQueryNode { + if queryNode != nil { + _ = queryNode.Stop() + } + log.Printf("exit query node") + } + + if mr.EnableDataService { + if dataService != nil { + _ = dataService.Stop() + } + log.Printf("exit data service") + } + + if mr.EnableDataNode { + if dataNode != nil { + _ = dataNode.Stop() + } + log.Printf("exit data node") + } + + if mr.EnableIndexService { + if indexService != nil { + _ = indexService.Stop() + } + log.Printf("exit index service") + } + + if mr.EnableIndexNode { + if indexNode != nil { + _ = indexNode.Stop() + } + log.Printf("exit index node") + } + + if mr.EnableMsgStreamService { + if msgStream != nil { + _ = msgStream.Stop() + } + log.Printf("exit msg stream service") + } +} diff --git a/cmd/distributed/roles/roles_test.go b/cmd/distributed/roles/roles_test.go new file mode 100644 index 0000000000..d6c90db296 --- /dev/null +++ b/cmd/distributed/roles/roles_test.go @@ -0,0 +1,89 @@ +package roles + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRoles(t *testing.T) { + r := MilvusRoles{} + assert.False(t, r.HasAnyRole()) + + assert.True(t, r.EnvValue("1")) + assert.True(t, r.EnvValue(" 1 ")) + assert.True(t, r.EnvValue("True")) + assert.True(t, r.EnvValue(" True ")) + assert.True(t, r.EnvValue(" TRue ")) + assert.False(t, r.EnvValue("0")) + assert.False(t, r.EnvValue(" 0 ")) + assert.False(t, r.EnvValue(" false ")) + assert.False(t, r.EnvValue(" False ")) + assert.False(t, r.EnvValue(" abc ")) + + ss := strings.SplitN("abcdef", "=", 2) + assert.Equal(t, len(ss), 1) + ss = strings.SplitN("adb=def", "=", 2) + assert.Equal(t, len(ss), 2) + + { + var roles MilvusRoles + roles.EnableMaster = true + assert.True(t, roles.HasAnyRole()) + } + + { + var roles MilvusRoles + roles.EnableProxyService = true + assert.True(t, roles.HasAnyRole()) + } + + { + var roles MilvusRoles + roles.EnableProxyNode = true + assert.True(t, roles.HasAnyRole()) + } + + { + var roles MilvusRoles + roles.EnableQueryService = true + assert.True(t, roles.HasAnyRole()) + } + + { + var roles MilvusRoles + roles.EnableQueryNode = true + assert.True(t, roles.HasAnyRole()) + } + + { + var roles MilvusRoles + roles.EnableDataService = true + assert.True(t, roles.HasAnyRole()) + } + + { + var roles MilvusRoles + roles.EnableDataNode = true + assert.True(t, roles.HasAnyRole()) + } + + { + var roles MilvusRoles + roles.EnableIndexService = true + assert.True(t, roles.HasAnyRole()) + } + + { + var roles MilvusRoles + roles.EnableIndexNode = true + assert.True(t, roles.HasAnyRole()) + } + + { + var roles MilvusRoles + roles.EnableMsgStreamService = true + assert.True(t, roles.HasAnyRole()) + } +} diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go index 24c9137a1b..40265fd141 100644 --- a/cmd/singlenode/main.go +++ b/cmd/singlenode/main.go @@ -1,155 +1,24 @@ package main import ( - "context" - "flag" - "fmt" - "log" - "os" - "os/signal" - "sync" - "syscall" - "time" - - "go.uber.org/zap" - - "github.com/zilliztech/milvus-distributed/internal/indexnode" - "github.com/zilliztech/milvus-distributed/internal/proxynode" - "github.com/zilliztech/milvus-distributed/internal/querynode" + "github.com/zilliztech/milvus-distributed/cmd/distributed/roles" ) -func InitProxy(wg *sync.WaitGroup) { - defer wg.Done() - //proxynode.Init() - //fmt.Println("ProxyID is", proxynode.Params.ProxyID()) - ctx, cancel := context.WithCancel(context.Background()) - svr, err := proxynode.NewProxyNodeImpl(ctx) - if err != nil { - log.Print("create server failed", zap.Error(err)) - } - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - - var sig os.Signal - go func() { - sig = <-sc - cancel() - }() - - if err := svr.Init(); err != nil { - log.Fatal("init server failed", zap.Error(err)) - } - - if err := svr.Start(); err != nil { - log.Fatal("run server failed", zap.Error(err)) - } - - <-ctx.Done() - log.Print("Got signal to exit", zap.String("signal", sig.String())) - - svr.Stop() - switch sig { - case syscall.SIGTERM: - exit(0) - default: - exit(1) - } -} - -func InitQueryNode(wg *sync.WaitGroup) { - defer wg.Done() - querynode.Init() - fmt.Println("QueryNodeID is", querynode.Params.QueryNodeID) - // Creates server. - ctx, cancel := context.WithCancel(context.Background()) - svr := querynode.NewQueryNode(ctx, 0) - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - - var sig os.Signal - go func() { - sig = <-sc - cancel() - }() - - svr.Start() - - <-ctx.Done() - log.Print("Got signal to exit", zap.String("signal", sig.String())) - - svr.Stop() - switch sig { - case syscall.SIGTERM: - exit(0) - default: - exit(1) - } -} - -func InitIndexBuilder(wg *sync.WaitGroup) { - defer wg.Done() - ctx, cancel := context.WithCancel(context.Background()) - svr, err := indexnode.NewNodeImpl(ctx) - if err != nil { - log.Print("create server failed", zap.Error(err)) - } - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - - var sig os.Signal - go func() { - sig = <-sc - cancel() - }() - - if err := svr.Init(); err != nil { - log.Fatal("init builder server failed", zap.Error(err)) - } - - if err := svr.Start(); err != nil { - log.Fatal("run builder server failed", zap.Error(err)) - } - - <-ctx.Done() - log.Print("Got signal to exit", zap.String("signal", sig.String())) - - svr.Stop() - switch sig { - case syscall.SIGTERM: - exit(0) - default: - exit(1) - } +func initRoles(roles *roles.MilvusRoles) { + roles.EnableMaster = true + roles.EnableProxyService = true + roles.EnableProxyNode = true + roles.EnableQueryService = true + roles.EnableQueryNode = true + roles.EnableDataService = true + roles.EnableDataNode = true + roles.EnableIndexService = true + roles.EnableIndexNode = true + roles.EnableMsgStreamService = true } func main() { - var wg sync.WaitGroup - flag.Parse() - time.Sleep(time.Second * 1) - wg.Add(1) - go InitProxy(&wg) - wg.Add(1) - go InitQueryNode(&wg) - wg.Add(1) - go InitIndexBuilder(&wg) - wg.Wait() -} - -func exit(code int) { - os.Exit(code) + var roles roles.MilvusRoles + initRoles(&roles) + roles.Run() }