From f74ea3beeebe868f7e5c02d5bcd035b201febaeb Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Fri, 11 Jun 2021 13:24:11 +0800 Subject: [PATCH] [skip ci] Remove ProxyService (#5738) * [skip ci] Remove ProxyService Signed-off-by: zhenshan.cao * Change helm branch Signed-off-by: zhenshan.cao * Fix bug: paramstable crashed Signed-off-by: zhenshan.cao --- Makefile | 36 -- build/ci/jenkins/Jenkinsfile | 1 + build/ci/jenkins/NightlyCI.groovy | 1 + cmd/distributed/components/proxy_service.go | 48 --- cmd/distributed/main.go | 7 +- cmd/distributed/roles/roles.go | 26 -- cmd/standalone/main.go | 1 - .../docker/distributed/docker-compose.yml | 16 - .../masterservice/masterservice_test.go | 3 - .../distributed/masterservice/param_table.go | 10 - .../masterservice/param_table_test.go | 3 - internal/distributed/proxynode/paramtable.go | 17 - internal/distributed/proxynode/service.go | 11 - .../distributed/proxyservice/client/client.go | 100 ----- .../distributed/proxyservice/paramtable.go | 52 --- internal/distributed/proxyservice/service.go | 186 --------- internal/masterservice/timestamp_test.go | 18 - internal/metrics/metrics.go | 5 - internal/metrics/metrics_test.go | 1 - internal/proto/proxy_service.proto | 19 - internal/proto/proxypb/proxy_service.pb.go | 371 ++---------------- internal/proxynode/paramtable.go | 78 +--- internal/proxynode/proxy_node.go | 38 +- internal/proxyservice/OWNERS | 15 - internal/proxyservice/impl.go | 304 -------------- internal/proxyservice/node_info.go | 140 ------- internal/proxyservice/node_info_test.go | 86 ---- internal/proxyservice/nodeid_allocator.go | 47 --- .../proxyservice/nodeid_allocator_test.go | 29 -- internal/proxyservice/paramtable.go | 144 ------- internal/proxyservice/paramtable_test.go | 31 -- internal/proxyservice/proxyservice.go | 60 --- internal/proxyservice/task.go | 222 ----------- internal/proxyservice/task_mock.go | 101 ----- internal/proxyservice/task_queue.go | 103 ----- internal/proxyservice/task_queue_test.go | 160 -------- internal/proxyservice/task_scheduler.go | 143 ------- internal/proxyservice/task_scheduler_test.go | 111 ------ internal/proxyservice/timetick.go | 101 ----- internal/proxyservice/timetick_test.go | 119 ------ internal/types/types.go | 8 - internal/util/typeutil/type.go | 1 - scripts/run_docker.sh | 3 - scripts/start.sh | 3 - 44 files changed, 38 insertions(+), 2941 deletions(-) delete mode 100644 cmd/distributed/components/proxy_service.go delete mode 100644 internal/distributed/proxyservice/client/client.go delete mode 100644 internal/distributed/proxyservice/paramtable.go delete mode 100644 internal/distributed/proxyservice/service.go delete mode 100644 internal/proxyservice/OWNERS delete mode 100644 internal/proxyservice/impl.go delete mode 100644 internal/proxyservice/node_info.go delete mode 100644 internal/proxyservice/node_info_test.go delete mode 100644 internal/proxyservice/nodeid_allocator.go delete mode 100644 internal/proxyservice/nodeid_allocator_test.go delete mode 100644 internal/proxyservice/paramtable.go delete mode 100644 internal/proxyservice/paramtable_test.go delete mode 100644 internal/proxyservice/proxyservice.go delete mode 100644 internal/proxyservice/task.go delete mode 100644 internal/proxyservice/task_mock.go delete mode 100644 internal/proxyservice/task_queue.go delete mode 100644 internal/proxyservice/task_queue_test.go delete mode 100644 internal/proxyservice/task_scheduler.go delete mode 100644 internal/proxyservice/task_scheduler_test.go delete mode 100644 internal/proxyservice/timetick.go delete mode 100644 internal/proxyservice/timetick_test.go diff --git a/Makefile b/Makefile index fb0eac1b7b..1acd9dda5d 100644 --- a/Makefile +++ b/Makefile @@ -83,42 +83,6 @@ binlog: @echo "Building binlog ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null -master: - @echo "Building master ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/masterservice $(PWD)/cmd/masterservice/main.go 1>/dev/null - -proxyservice: - @echo "Building proxyservice ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxyservice $(PWD)/cmd/proxy/service/proxy_service.go 1>/dev/null - -proxynode: - @echo "Building proxynode ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxynode $(PWD)/cmd/proxy/node/proxy_node.go 1>/dev/null - -queryservice: - @echo "Building queryservice ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/queryservice $(PWD)/cmd/queryservice/queryservice.go 1>/dev/null - -querynode: - @echo "Building querynode ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/querynode.go 1>/dev/null - -dataservice: - @echo "Building dataservice ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/dataservice $(PWD)/cmd/dataservice/main.go 1>/dev/null - -datanode: - @echo "Building datanode ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/datanode $(PWD)/cmd/datanode/main.go 1>/dev/null - -indexservice: build-cpp - @echo "Building indexservice ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/indexservice $(PWD)/cmd/indexservice/main.go 1>/dev/null - -indexnode: build-cpp - @echo "Building indexnode ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/indexnode $(PWD)/cmd/indexnode/main.go 1>/dev/null - standalone: build-cpp @echo "Building Milvus standalone ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/standalone $(PWD)/cmd/standalone/main.go 1>/dev/null diff --git a/build/ci/jenkins/Jenkinsfile b/build/ci/jenkins/Jenkinsfile index 0d83574690..a1b87ec451 100644 --- a/build/ci/jenkins/Jenkinsfile +++ b/build/ci/jenkins/Jenkinsfile @@ -34,6 +34,7 @@ pipeline { IMAGE_REPO = "dockerhub-mirror-sh.zilliz.cc/milvusdb" DOCKER_BUILDKIT = 1 ARTIFACTS = "${env.WORKSPACE}/artifacts" + MILVUS_HELM_BRANCH = "recovery" } stages { stage('Test') { diff --git a/build/ci/jenkins/NightlyCI.groovy b/build/ci/jenkins/NightlyCI.groovy index e700cb8422..7ee469497b 100644 --- a/build/ci/jenkins/NightlyCI.groovy +++ b/build/ci/jenkins/NightlyCI.groovy @@ -44,6 +44,7 @@ pipeline { DOCKER_CREDENTIALS_ID = "ba070c98-c8cc-4f7c-b657-897715f359fc" DOKCER_REGISTRY_URL = "registry.zilliz.com" TARGET_REPO = "${DOKCER_REGISTRY_URL}/milvus" + MILVUS_HELM_BRANCH = "recovery" } stages { stage('Test') { diff --git a/cmd/distributed/components/proxy_service.go b/cmd/distributed/components/proxy_service.go deleted file mode 100644 index b0513c4576..0000000000 --- a/cmd/distributed/components/proxy_service.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package components - -import ( - "context" - - grpcproxyservice "github.com/milvus-io/milvus/internal/distributed/proxyservice" - "github.com/milvus-io/milvus/internal/msgstream" -) - -type ProxyService struct { - svr *grpcproxyservice.Server -} - -func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyService, error) { - var err error - service := &ProxyService{} - svr, err := grpcproxyservice.NewServer(ctx, factory) - if err != nil { - return nil, err - } - service.svr = svr - return service, nil -} - -func (s *ProxyService) Run() error { - if err := s.svr.Run(); err != nil { - return err - } - return nil -} - -func (s *ProxyService) Stop() error { - if err := s.svr.Stop(); err != nil { - return err - } - return nil -} diff --git a/cmd/distributed/main.go b/cmd/distributed/main.go index 7be1d850cd..ef174cd529 100644 --- a/cmd/distributed/main.go +++ b/cmd/distributed/main.go @@ -24,7 +24,6 @@ import ( const ( roleMaster = "master" - roleProxyService = "proxyservice" roleQueryService = "queryservice" roleIndexService = "indexservice" roleDataService = "dataservice" @@ -136,9 +135,8 @@ func main() { var svrAlias string flags.StringVar(&svrAlias, "alias", "", "set alias") - var enableMaster, enableProxyService, enableQueryService, enableIndexService, enableDataService bool + var enableMaster, enableQueryService, enableIndexService, enableDataService bool flags.BoolVar(&enableMaster, roleMaster, false, "enable master") - flags.BoolVar(&enableProxyService, roleProxyService, false, "enable proxy service") flags.BoolVar(&enableQueryService, roleQueryService, false, "enable query service") flags.BoolVar(&enableIndexService, roleIndexService, false, "enable index service") flags.BoolVar(&enableDataService, roleDataService, false, "enable data service") @@ -151,8 +149,6 @@ func main() { switch serverType { case roleMaster: role.EnableMaster = true - case roleProxyService: - role.EnableProxyService = true case roleProxyNode: role.EnableProxyNode = true case roleQueryService: @@ -169,7 +165,6 @@ func main() { role.EnableIndexNode = true case roleMixture: role.EnableMaster = enableMaster - role.EnableProxyService = enableProxyService role.EnableQueryService = enableQueryService role.EnableDataService = enableDataService role.EnableIndexService = enableIndexService diff --git a/cmd/distributed/roles/roles.go b/cmd/distributed/roles/roles.go index ee7199518e..8b443f1000 100644 --- a/cmd/distributed/roles/roles.go +++ b/cmd/distributed/roles/roles.go @@ -27,7 +27,6 @@ import ( "github.com/milvus-io/milvus/internal/masterservice" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proxynode" - "github.com/milvus-io/milvus/internal/proxyservice" "github.com/milvus-io/milvus/internal/querynode" "github.com/milvus-io/milvus/internal/queryservice" @@ -46,7 +45,6 @@ func newMsgFactory(localMsg bool) msgstream.Factory { type MilvusRoles struct { EnableMaster bool `env:"ENABLE_MASTER"` - EnableProxyService bool `env:"ENABLE_PROXY_SERVICE"` EnableProxyNode bool `env:"ENABLE_PROXY_NODE"` EnableQueryService bool `env:"ENABLE_QUERY_SERVICE"` EnableQueryNode bool `env:"ENABLE_QUERY_NODE"` @@ -98,30 +96,6 @@ func (mr *MilvusRoles) Run(localMsg bool) { metrics.RegisterMaster() } - if mr.EnableProxyService { - var ps *components.ProxyService - - go func() { - proxyservice.Params.Init() - logutil.SetupLogger(&proxyservice.Params.Log) - defer log.Sync() - - factory := newMsgFactory(localMsg) - var err error - ps, err = components.NewProxyService(ctx, factory) - if err != nil { - panic(err) - } - _ = ps.Run() - }() - - if ps != nil { - defer ps.Stop() - } - - metrics.RegisterProxyService() - } - if mr.EnableProxyNode { var pn *components.ProxyNode diff --git a/cmd/standalone/main.go b/cmd/standalone/main.go index 9592c0281e..22322b0781 100644 --- a/cmd/standalone/main.go +++ b/cmd/standalone/main.go @@ -21,7 +21,6 @@ import ( func initRoles(roles *roles.MilvusRoles) { roles.EnableMaster = true - roles.EnableProxyService = true roles.EnableProxyNode = true roles.EnableQueryService = true roles.EnableQueryNode = true diff --git a/deployments/docker/distributed/docker-compose.yml b/deployments/docker/distributed/docker-compose.yml index d99c7e9124..d835276387 100644 --- a/deployments/docker/distributed/docker-compose.yml +++ b/deployments/docker/distributed/docker-compose.yml @@ -36,19 +36,6 @@ services: DATA_SERVICE_ADDRESS: dataservice:13333 INDEX_SERVICE_ADDRESS: indexservice:31000 QUERY_SERVICE_ADDRESS: queryservice:19531 - PROXY_SERVICE_ADDRESS: proxyservice:21122 - depends_on: - - "etcd" - - "pulsar" - - "minio" - networks: - - milvus - - proxyservice: - image: registry.zilliz.com/milvus/milvus:master-release - command: ["/milvus/bin/milvus", "run", "proxyservice"] - environment: - PULSAR_ADDRESS: pulsar://pulsar:6650 depends_on: - "etcd" - "pulsar" @@ -66,11 +53,8 @@ services: DATA_SERVICE_ADDRESS: dataservice:13333 INDEX_SERVICE_ADDRESS: indexservice:31000 QUERY_SERVICE_ADDRESS: queryservice:19531 - PROXY_SERVICE_ADDRESS: proxyservice:21122 ports: - "19530:19530" - depends_on: - - "proxyservice" networks: - milvus diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 3c75929ae7..0a5125b429 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -820,9 +820,6 @@ type mockCore struct { func (m *mockCore) UpdateStateCode(internalpb.StateCode) { } -func (m *mockCore) SetProxyService(context.Context, types.ProxyService) error { - return nil -} func (m *mockCore) SetDataService(context.Context, types.DataService) error { return nil } diff --git a/internal/distributed/masterservice/param_table.go b/internal/distributed/masterservice/param_table.go index 3c1bc01845..c206d375f9 100644 --- a/internal/distributed/masterservice/param_table.go +++ b/internal/distributed/masterservice/param_table.go @@ -26,7 +26,6 @@ type ParamTable struct { Address string // ip:port Port int - ProxyServiceAddress string IndexServiceAddress string QueryServiceAddress string DataServiceAddress string @@ -41,7 +40,6 @@ func (p *ParamTable) Init() { } p.initAddress() p.initPort() - p.initProxyServiceAddress() p.initIndexServiceAddress() p.initQueryServiceAddress() p.initDataServiceAddress() @@ -61,14 +59,6 @@ func (p *ParamTable) initPort() { p.Port = p.ParseInt("master.port") } -func (p *ParamTable) initProxyServiceAddress() { - ret, err := p.Load("_PROXY_SERVICE_ADDRESS") - if err != nil { - panic(err) - } - p.ProxyServiceAddress = ret -} - func (p *ParamTable) initIndexServiceAddress() { ret, err := p.Load("IndexServiceAddress") if err != nil { diff --git a/internal/distributed/masterservice/param_table_test.go b/internal/distributed/masterservice/param_table_test.go index e71045a6a4..691d23a4c7 100644 --- a/internal/distributed/masterservice/param_table_test.go +++ b/internal/distributed/masterservice/param_table_test.go @@ -34,7 +34,4 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.QueryServiceAddress, "") t.Logf("QueryServiceAddress:%s", Params.QueryServiceAddress) - - assert.NotEqual(t, Params.ProxyServiceAddress, "") - t.Logf("ProxyServiceAddress:%s", Params.ProxyServiceAddress) } diff --git a/internal/distributed/proxynode/paramtable.go b/internal/distributed/proxynode/paramtable.go index 589b1bbd39..f1d1d05ee2 100644 --- a/internal/distributed/proxynode/paramtable.go +++ b/internal/distributed/proxynode/paramtable.go @@ -21,9 +21,6 @@ import ( type ParamTable struct { paramtable.BaseTable - ProxyServiceAddress string - ProxyServicePort int - IndexServerAddress string MasterAddress string @@ -54,27 +51,13 @@ func (pt *ParamTable) LoadFromEnv() { } func (pt *ParamTable) initParams() { - pt.initPoxyServicePort() pt.initPort() - pt.initProxyServiceAddress() pt.initMasterAddress() pt.initIndexServerAddress() pt.initDataServiceAddress() pt.initQueryServiceAddress() } -func (pt *ParamTable) initPoxyServicePort() { - pt.ProxyServicePort = pt.ParseInt("proxyService.port") -} - -func (pt *ParamTable) initProxyServiceAddress() { - ret, err := pt.Load("_PROXY_SERVICE_ADDRESS") - if err != nil { - panic(err) - } - pt.ProxyServiceAddress = ret -} - // todo remove and use load from env func (pt *ParamTable) initIndexServerAddress() { ret, err := pt.Load("IndexServiceAddress") diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 5d3f292dd5..1966b1cd6d 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -27,7 +27,6 @@ import ( grpcdataserviceclient "github.com/milvus-io/milvus/internal/distributed/dataservice/client" grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client" grpcmasterserviceclient "github.com/milvus-io/milvus/internal/distributed/masterservice/client" - grpcproxyserviceclient "github.com/milvus-io/milvus/internal/distributed/proxyservice/client" grpcqueryserviceclient "github.com/milvus-io/milvus/internal/distributed/queryservice/client" otgrpc "github.com/opentracing-contrib/go-grpc" @@ -55,7 +54,6 @@ type Server struct { grpcErrChan chan error - proxyServiceClient *grpcproxyserviceclient.Client masterServiceClient *grpcmasterserviceclient.GrpcClient dataServiceClient *grpcdataserviceclient.Client queryServiceClient *grpcqueryserviceclient.Client @@ -170,15 +168,6 @@ func (s *Server) init() error { return err } - s.proxyServiceClient = grpcproxyserviceclient.NewClient(Params.ProxyServiceAddress) - err = s.proxyServiceClient.Init() - if err != nil { - log.Debug("ProxyNode proxyServiceClient init failed ", zap.Error(err)) - return err - } - s.proxynode.SetProxyServiceClient(s.proxyServiceClient) - log.Debug("set proxy service client ...") - masterServiceAddr := Params.MasterAddress log.Debug("ProxyNode", zap.String("master address", masterServiceAddr)) timeout := 3 * time.Second diff --git a/internal/distributed/proxyservice/client/client.go b/internal/distributed/proxyservice/client/client.go deleted file mode 100644 index 37ca8636ed..0000000000 --- a/internal/distributed/proxyservice/client/client.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package grpcproxyserviceclient - -import ( - "context" - "time" - - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/milvuspb" - "github.com/milvus-io/milvus/internal/proto/proxypb" - "github.com/milvus-io/milvus/internal/util/retry" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" - "go.uber.org/zap" - "google.golang.org/grpc" -) - -type Client struct { - proxyServiceClient proxypb.ProxyServiceClient - address string - ctx context.Context -} - -func NewClient(address string) *Client { - return &Client{ - address: address, - ctx: context.Background(), - } -} - -func (c *Client) Init() error { - tracer := opentracing.GlobalTracer() - log.Debug("ProxyServiceClient try connect ", zap.String("address", c.address)) - connectGrpcFunc := func() error { - ctx, cancelFunc := context.WithTimeout(c.ctx, time.Second*3) - defer cancelFunc() - conn, err := grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(), - grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), - grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) - if err != nil { - return err - } - c.proxyServiceClient = proxypb.NewProxyServiceClient(conn) - return nil - } - err := retry.Retry(100000, time.Millisecond*200, connectGrpcFunc) - if err != nil { - log.Debug("ProxyServiceClient try connect failed", zap.Error(err)) - return err - } - log.Debug("ProxyServiceClient try connect success") - return nil -} - -func (c *Client) Start() error { - return nil -} - -func (c *Client) Stop() error { - return nil -} - -// Register dummy -func (c *Client) Register() error { - return nil -} - -func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { - return c.proxyServiceClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) -} - -func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - return c.proxyServiceClient.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{}) -} - -func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - return c.proxyServiceClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{}) -} - -func (c *Client) RegisterNode(ctx context.Context, req *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) { - return c.proxyServiceClient.RegisterNode(ctx, req) -} - -func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { - return c.proxyServiceClient.InvalidateCollectionMetaCache(ctx, req) -} diff --git a/internal/distributed/proxyservice/paramtable.go b/internal/distributed/proxyservice/paramtable.go deleted file mode 100644 index 002a48efcd..0000000000 --- a/internal/distributed/proxyservice/paramtable.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package grpcproxyservice - -import ( - "sync" - - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -type ParamTable struct { - paramtable.BaseTable - - ServiceAddress string - ServicePort int -} - -var Params ParamTable -var once sync.Once - -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - pt.initParams() - }) -} - -func (pt *ParamTable) initParams() { - pt.initServicePort() - pt.initServiceAddress() -} - -func (pt *ParamTable) initServicePort() { - pt.ServicePort = pt.ParseInt("proxyService.port") -} - -func (pt *ParamTable) initServiceAddress() { - ret, err := pt.Load("_PROXY_SERVICE_ADDRESS") - if err != nil { - panic(err) - } - pt.ServiceAddress = ret -} diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go deleted file mode 100644 index 6ba42af8fb..0000000000 --- a/internal/distributed/proxyservice/service.go +++ /dev/null @@ -1,186 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package grpcproxyservice - -import ( - "context" - "io" - "math" - "net" - "strconv" - "sync" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/milvuspb" - "github.com/milvus-io/milvus/internal/proto/proxypb" - "github.com/milvus-io/milvus/internal/proxyservice" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/trace" - otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" - "google.golang.org/grpc" -) - -type Server struct { - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - - grpcServer *grpc.Server - grpcErrChan chan error - - proxyservice *proxyservice.ProxyService - - tracer opentracing.Tracer - closer io.Closer -} - -func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error) { - ctx, cancel := context.WithCancel(ctx1) - var err error - - server := &Server{ - ctx: ctx, - cancel: cancel, - grpcErrChan: make(chan error), - } - - server.proxyservice, err = proxyservice.NewProxyService(server.ctx, factory) - if err != nil { - return nil, err - } - return server, nil -} - -func (s *Server) Run() error { - - if err := s.init(); err != nil { - return err - } - log.Debug("proxy service init done ...") - - if err := s.start(); err != nil { - return err - } - return nil -} - -func (s *Server) init() error { - Params.Init() - proxyservice.Params.Init() - log.Debug("init params done") - - closer := trace.InitTracing("proxy_service") - s.closer = closer - - s.wg.Add(1) - go s.startGrpcLoop(Params.ServicePort) - // wait for grpc server loop start - if err := <-s.grpcErrChan; err != nil { - return err - } - s.proxyservice.UpdateStateCode(internalpb.StateCode_Initializing) - log.Debug("grpc init done ...") - - if err := s.proxyservice.Init(); err != nil { - return err - } - return nil -} - -func (s *Server) startGrpcLoop(grpcPort int) { - - defer s.wg.Done() - - log.Debug("ProxyService", zap.Int("network port", grpcPort)) - lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) - if err != nil { - log.Warn("ProxyService", zap.String("GrpcServer:failed to listen", err.Error())) - s.grpcErrChan <- err - return - } - - ctx, cancel := context.WithCancel(s.ctx) - defer cancel() - - tracer := opentracing.GlobalTracer() - s.grpcServer = grpc.NewServer( - grpc.MaxRecvMsgSize(math.MaxInt32), - grpc.MaxSendMsgSize(math.MaxInt32), - grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), - grpc.StreamInterceptor( - otgrpc.OpenTracingStreamServerInterceptor(tracer))) - proxypb.RegisterProxyServiceServer(s.grpcServer, s) - milvuspb.RegisterProxyServiceServer(s.grpcServer, s) - - go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) - if err := s.grpcServer.Serve(lis); err != nil { - s.grpcErrChan <- err - } - -} - -func (s *Server) start() error { - log.Debug("ProxyService start ...") - if err := s.proxyservice.Start(); err != nil { - return err - } - return nil -} - -func (s *Server) Stop() error { - if s.closer != nil { - if err := s.closer.Close(); err != nil { - return err - } - } - s.cancel() - err := s.proxyservice.Stop() - if err != nil { - return err - } - if s.grpcServer != nil { - s.grpcServer.GracefulStop() - } - s.wg.Wait() - return nil -} - -func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { - return s.proxyservice.GetComponentStates(ctx) -} - -func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) { - return s.proxyservice.GetTimeTickChannel(ctx) -} - -func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) { - return s.proxyservice.GetStatisticsChannel(ctx) -} - -func (s *Server) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) { - return s.proxyservice.RegisterLink(ctx) -} - -func (s *Server) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) { - return s.proxyservice.RegisterNode(ctx, request) -} - -func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { - return s.proxyservice.InvalidateCollectionMetaCache(ctx, request) -} diff --git a/internal/masterservice/timestamp_test.go b/internal/masterservice/timestamp_test.go index 379310f0ef..520d486580 100644 --- a/internal/masterservice/timestamp_test.go +++ b/internal/masterservice/timestamp_test.go @@ -24,30 +24,12 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/masterpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" - "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/stretchr/testify/assert" ) -type tbp struct { - types.ProxyService -} - -func (*tbp) GetTimeTickChannel(context.Context) (*milvuspb.StringResponse, error) { - return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - Value: fmt.Sprintf("tbp-%d", rand.Int()), - }, nil -} - -func (*tbp) InvalidateCollectionMetaCache(context.Context, *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { - return nil, nil -} - type tbd struct { types.DataService } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index a479d34c2c..8385f4727a 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -214,11 +214,6 @@ func RegisterMaster() { //prometheus.MustRegister(PanicCounter) } -//RegisterProxyService register ProxyService metrics -func RegisterProxyService() { - -} - //RegisterProxyNode register ProxyNode metrics func RegisterProxyNode() { diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index a629854725..48601350b0 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -12,7 +12,6 @@ func TestRegisterMetrics(t *testing.T) { RegisterIndexNode() RegisterIndexService() RegisterProxyNode() - RegisterProxyService() RegisterQueryNode() RegisterQueryService() RegisterMsgStreamService() diff --git a/internal/proto/proxy_service.proto b/internal/proto/proxy_service.proto index 3b8353dc55..17b2722a52 100644 --- a/internal/proto/proxy_service.proto +++ b/internal/proto/proxy_service.proto @@ -7,15 +7,6 @@ import "common.proto"; import "internal.proto"; import "milvus.proto"; -service ProxyService { - rpc GetComponentStates(internal.GetComponentStatesRequest) returns (internal.ComponentStates) {} - rpc GetTimeTickChannel(internal.GetTimeTickChannelRequest) returns(milvus.StringResponse) {} - rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns(milvus.StringResponse){} - - rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {} - rpc InvalidateCollectionMetaCache(InvalidateCollMetaCacheRequest) returns (common.Status) {} -} - service ProxyNodeService { rpc GetComponentStates(internal.GetComponentStatesRequest) returns (internal.ComponentStates) {} rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns(milvus.StringResponse){} @@ -24,16 +15,6 @@ service ProxyNodeService { rpc GetDdChannel(internal.GetDdChannelRequest) returns (milvus.StringResponse) {} } -message RegisterNodeRequest { - common.MsgBase base = 1; - common.Address address = 2; -} - -message RegisterNodeResponse { - internal.InitParams init_params = 1; - common.Status status = 2; -} - message InvalidateCollMetaCacheRequest { common.MsgBase base = 1; string db_name = 2; diff --git a/internal/proto/proxypb/proxy_service.pb.go b/internal/proto/proxypb/proxy_service.pb.go index 561cf472cc..72ecfbcbe1 100644 --- a/internal/proto/proxypb/proxy_service.pb.go +++ b/internal/proto/proxypb/proxy_service.pb.go @@ -27,100 +27,6 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package -type RegisterNodeRequest struct { - Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` - Address *commonpb.Address `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *RegisterNodeRequest) Reset() { *m = RegisterNodeRequest{} } -func (m *RegisterNodeRequest) String() string { return proto.CompactTextString(m) } -func (*RegisterNodeRequest) ProtoMessage() {} -func (*RegisterNodeRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_34ca2fbc94d169de, []int{0} -} - -func (m *RegisterNodeRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_RegisterNodeRequest.Unmarshal(m, b) -} -func (m *RegisterNodeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_RegisterNodeRequest.Marshal(b, m, deterministic) -} -func (m *RegisterNodeRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_RegisterNodeRequest.Merge(m, src) -} -func (m *RegisterNodeRequest) XXX_Size() int { - return xxx_messageInfo_RegisterNodeRequest.Size(m) -} -func (m *RegisterNodeRequest) XXX_DiscardUnknown() { - xxx_messageInfo_RegisterNodeRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_RegisterNodeRequest proto.InternalMessageInfo - -func (m *RegisterNodeRequest) GetBase() *commonpb.MsgBase { - if m != nil { - return m.Base - } - return nil -} - -func (m *RegisterNodeRequest) GetAddress() *commonpb.Address { - if m != nil { - return m.Address - } - return nil -} - -type RegisterNodeResponse struct { - InitParams *internalpb.InitParams `protobuf:"bytes,1,opt,name=init_params,json=initParams,proto3" json:"init_params,omitempty"` - Status *commonpb.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *RegisterNodeResponse) Reset() { *m = RegisterNodeResponse{} } -func (m *RegisterNodeResponse) String() string { return proto.CompactTextString(m) } -func (*RegisterNodeResponse) ProtoMessage() {} -func (*RegisterNodeResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_34ca2fbc94d169de, []int{1} -} - -func (m *RegisterNodeResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_RegisterNodeResponse.Unmarshal(m, b) -} -func (m *RegisterNodeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_RegisterNodeResponse.Marshal(b, m, deterministic) -} -func (m *RegisterNodeResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_RegisterNodeResponse.Merge(m, src) -} -func (m *RegisterNodeResponse) XXX_Size() int { - return xxx_messageInfo_RegisterNodeResponse.Size(m) -} -func (m *RegisterNodeResponse) XXX_DiscardUnknown() { - xxx_messageInfo_RegisterNodeResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_RegisterNodeResponse proto.InternalMessageInfo - -func (m *RegisterNodeResponse) GetInitParams() *internalpb.InitParams { - if m != nil { - return m.InitParams - } - return nil -} - -func (m *RegisterNodeResponse) GetStatus() *commonpb.Status { - if m != nil { - return m.Status - } - return nil -} - type InvalidateCollMetaCacheRequest struct { Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` DbName string `protobuf:"bytes,2,opt,name=db_name,json=dbName,proto3" json:"db_name,omitempty"` @@ -134,7 +40,7 @@ func (m *InvalidateCollMetaCacheRequest) Reset() { *m = InvalidateCollMe func (m *InvalidateCollMetaCacheRequest) String() string { return proto.CompactTextString(m) } func (*InvalidateCollMetaCacheRequest) ProtoMessage() {} func (*InvalidateCollMetaCacheRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_34ca2fbc94d169de, []int{2} + return fileDescriptor_34ca2fbc94d169de, []int{0} } func (m *InvalidateCollMetaCacheRequest) XXX_Unmarshal(b []byte) error { @@ -177,47 +83,36 @@ func (m *InvalidateCollMetaCacheRequest) GetCollectionName() string { } func init() { - proto.RegisterType((*RegisterNodeRequest)(nil), "milvus.proto.proxy.RegisterNodeRequest") - proto.RegisterType((*RegisterNodeResponse)(nil), "milvus.proto.proxy.RegisterNodeResponse") proto.RegisterType((*InvalidateCollMetaCacheRequest)(nil), "milvus.proto.proxy.InvalidateCollMetaCacheRequest") } func init() { proto.RegisterFile("proxy_service.proto", fileDescriptor_34ca2fbc94d169de) } var fileDescriptor_34ca2fbc94d169de = []byte{ - // 501 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x93, 0xcf, 0x6e, 0xd3, 0x40, - 0x10, 0xc6, 0x13, 0x5a, 0xa5, 0x62, 0x6a, 0x15, 0xb4, 0xad, 0x44, 0x65, 0xfe, 0x08, 0x8c, 0x44, - 0x2b, 0x24, 0x9c, 0xca, 0x45, 0xdc, 0x49, 0x90, 0xa2, 0x1e, 0x5a, 0x55, 0x4e, 0x4f, 0x5c, 0xa2, - 0xb5, 0x3d, 0x4a, 0x56, 0x78, 0x77, 0xdd, 0xdd, 0x4d, 0x04, 0x27, 0x1e, 0x81, 0x0b, 0x6f, 0xc3, - 0x3b, 0xf0, 0x4c, 0xc8, 0xeb, 0x3f, 0xd4, 0x49, 0x63, 0x14, 0x71, 0xc8, 0xcd, 0x6b, 0xfd, 0x66, - 0xbe, 0xf9, 0x76, 0xbe, 0x85, 0xc3, 0x4c, 0xc9, 0xaf, 0xdf, 0x26, 0x1a, 0xd5, 0x82, 0xc5, 0xe8, - 0x67, 0x4a, 0x1a, 0x49, 0x08, 0x67, 0xe9, 0x62, 0xae, 0x8b, 0x93, 0x6f, 0x09, 0xd7, 0x89, 0x25, - 0xe7, 0x52, 0x14, 0xff, 0xdc, 0x03, 0x26, 0x0c, 0x2a, 0x41, 0xd3, 0xf2, 0xec, 0xdc, 0xad, 0xf0, - 0xbe, 0xc3, 0x61, 0x88, 0x53, 0xa6, 0x0d, 0xaa, 0x2b, 0x99, 0x60, 0x88, 0xb7, 0x73, 0xd4, 0x86, - 0x9c, 0xc1, 0x6e, 0x44, 0x35, 0x1e, 0x77, 0x5f, 0x76, 0x4f, 0xf7, 0x83, 0x67, 0x7e, 0x43, 0xa5, - 0x6c, 0x7f, 0xa9, 0xa7, 0x03, 0xaa, 0x31, 0xb4, 0x24, 0xf9, 0x00, 0x7b, 0x34, 0x49, 0x14, 0x6a, - 0x7d, 0xfc, 0xa0, 0xa5, 0xe8, 0x63, 0xc1, 0x84, 0x15, 0xec, 0xfd, 0xe8, 0xc2, 0x51, 0x73, 0x02, - 0x9d, 0x49, 0xa1, 0x91, 0x0c, 0x60, 0x9f, 0x09, 0x66, 0x26, 0x19, 0x55, 0x94, 0xeb, 0x72, 0x92, - 0x57, 0xcd, 0xa6, 0xb5, 0xb5, 0x0b, 0xc1, 0xcc, 0xb5, 0x05, 0x43, 0x60, 0xf5, 0x37, 0x39, 0x87, - 0x9e, 0x36, 0xd4, 0xcc, 0xab, 0x99, 0x9e, 0xde, 0x3b, 0xd3, 0xd8, 0x22, 0x61, 0x89, 0x7a, 0x3f, - 0xbb, 0xf0, 0xe2, 0x42, 0x2c, 0x68, 0xca, 0x12, 0x6a, 0x70, 0x28, 0xd3, 0xf4, 0x12, 0x0d, 0x1d, - 0xd2, 0x78, 0xf6, 0x1f, 0xd7, 0xf3, 0x04, 0xf6, 0x92, 0x68, 0x22, 0x28, 0x47, 0x3b, 0xca, 0xc3, - 0xb0, 0x97, 0x44, 0x57, 0x94, 0x23, 0x39, 0x81, 0x47, 0xb1, 0x4c, 0x53, 0x8c, 0x0d, 0x93, 0xa2, - 0x00, 0x76, 0x2c, 0x70, 0xf0, 0xf7, 0x77, 0x0e, 0x06, 0xbf, 0x76, 0xc1, 0xb9, 0xce, 0xf7, 0x3b, - 0x2e, 0x02, 0x40, 0x32, 0x20, 0x23, 0x34, 0x43, 0xc9, 0x33, 0x29, 0x50, 0x98, 0xdc, 0x05, 0x6a, - 0x72, 0xb6, 0xe6, 0x86, 0x56, 0xd1, 0xd2, 0x8c, 0xfb, 0x66, 0x4d, 0xc5, 0x12, 0xee, 0x75, 0x08, - 0xb7, 0x8a, 0x37, 0x8c, 0xe3, 0x0d, 0x8b, 0xbf, 0x0c, 0x67, 0x54, 0x08, 0x4c, 0xdb, 0x14, 0x97, - 0xd0, 0x4a, 0xf1, 0x75, 0xb3, 0xa2, 0x3c, 0x8c, 0x8d, 0x62, 0x62, 0x5a, 0xed, 0xdf, 0xeb, 0x90, - 0x5b, 0x38, 0x1a, 0xa1, 0x55, 0x67, 0xda, 0xb0, 0x58, 0x57, 0x82, 0xc1, 0x7a, 0xc1, 0x15, 0x78, - 0x43, 0xc9, 0x18, 0x9c, 0xbb, 0x61, 0x24, 0x27, 0xfe, 0xea, 0xfb, 0xf2, 0xef, 0x79, 0x30, 0xee, - 0xe9, 0xbf, 0xc1, 0x5a, 0x44, 0xc1, 0xf3, 0x66, 0xbe, 0x8a, 0x2d, 0xd7, 0x29, 0x5b, 0x36, 0x58, - 0x34, 0x6b, 0x8f, 0xa4, 0xdb, 0x16, 0x6d, 0xaf, 0x13, 0xfc, 0xde, 0x81, 0xc7, 0x36, 0x3d, 0xf9, - 0x2c, 0xdb, 0x4b, 0xd0, 0x16, 0x56, 0xba, 0x85, 0xdb, 0x26, 0x14, 0x9c, 0x11, 0x9a, 0x4f, 0x49, - 0x65, 0xef, 0xed, 0x7a, 0x7b, 0x35, 0xb4, 0x99, 0xad, 0xc1, 0xfb, 0xcf, 0xc1, 0x94, 0x99, 0xd9, - 0x3c, 0xca, 0xc5, 0xfb, 0x05, 0xf5, 0x8e, 0xc9, 0xf2, 0xab, 0x5f, 0x49, 0xf4, 0x6d, 0x97, 0xbe, - 0x35, 0x95, 0x45, 0x51, 0xcf, 0x1e, 0xcf, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x1f, 0x4d, 0x0f, - 0xc9, 0x4c, 0x06, 0x00, 0x00, + // 367 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x91, 0xdf, 0x6a, 0xe2, 0x40, + 0x14, 0xc6, 0xcd, 0xba, 0xb8, 0xec, 0xac, 0xb8, 0xcb, 0xec, 0xc2, 0x4a, 0xfa, 0x07, 0xb1, 0xd0, + 0x4a, 0xa1, 0x89, 0xa4, 0x7d, 0x02, 0x53, 0x90, 0x5e, 0x28, 0x25, 0xde, 0xf5, 0x46, 0x26, 0xc9, + 0x41, 0x07, 0x26, 0x33, 0x31, 0x73, 0x22, 0xed, 0x7b, 0xf4, 0xb9, 0xfa, 0x4c, 0x25, 0x93, 0x68, + 0x1b, 0xad, 0x85, 0xde, 0xe5, 0x0c, 0xbf, 0x93, 0x6f, 0x7e, 0xf3, 0x91, 0xbf, 0x69, 0xa6, 0x1e, + 0x9f, 0xe6, 0x1a, 0xb2, 0x35, 0x8f, 0xc0, 0x49, 0x33, 0x85, 0x8a, 0xd2, 0x84, 0x8b, 0x75, 0xae, + 0xcb, 0xc9, 0x31, 0x84, 0xdd, 0x8e, 0x54, 0x92, 0x28, 0x59, 0x9e, 0xd9, 0x1d, 0x2e, 0x11, 0x32, + 0xc9, 0x44, 0x35, 0xb7, 0xdf, 0x6f, 0xf4, 0x9f, 0x2d, 0x72, 0x7a, 0x27, 0xd7, 0x4c, 0xf0, 0x98, + 0x21, 0xf8, 0x4a, 0x88, 0x09, 0x20, 0xf3, 0x59, 0xb4, 0x84, 0x00, 0x56, 0x39, 0x68, 0xa4, 0x43, + 0xf2, 0x3d, 0x64, 0x1a, 0xba, 0x56, 0xcf, 0x1a, 0xfc, 0xf2, 0x8e, 0x9d, 0x5a, 0x62, 0x15, 0x35, + 0xd1, 0x8b, 0x11, 0xd3, 0x10, 0x18, 0x92, 0xfe, 0x27, 0x3f, 0xe2, 0x70, 0x2e, 0x59, 0x02, 0xdd, + 0x6f, 0x3d, 0x6b, 0xf0, 0x33, 0x68, 0xc5, 0xe1, 0x94, 0x25, 0x40, 0x2f, 0xc8, 0xef, 0x48, 0x09, + 0x01, 0x11, 0x72, 0x25, 0x4b, 0xa0, 0x69, 0x80, 0xce, 0xdb, 0x71, 0x01, 0x7a, 0x2f, 0x4d, 0xf2, + 0xe7, 0xbe, 0x90, 0x99, 0xaa, 0x18, 0x66, 0xa5, 0x31, 0x4d, 0x09, 0x1d, 0x03, 0xfa, 0x2a, 0x49, + 0x95, 0x04, 0x89, 0x33, 0x64, 0x08, 0x9a, 0x0e, 0xeb, 0x17, 0xda, 0xda, 0xee, 0xa3, 0x95, 0x90, + 0x7d, 0x7e, 0x60, 0x63, 0x07, 0xef, 0x37, 0xe8, 0x8a, 0xfc, 0x1b, 0x83, 0x19, 0xb9, 0x46, 0x1e, + 0x69, 0x7f, 0xc9, 0xa4, 0x04, 0x41, 0xbd, 0xc3, 0x99, 0x7b, 0xf0, 0x26, 0xf5, 0xac, 0xbe, 0x53, + 0x0d, 0x33, 0xcc, 0xb8, 0x5c, 0x04, 0xa0, 0x53, 0x25, 0x35, 0xf4, 0x1b, 0x34, 0x23, 0x27, 0xf5, + 0x3e, 0xca, 0x57, 0xd9, 0xb6, 0xb2, 0x9b, 0x6d, 0x2a, 0x77, 0x3e, 0xaf, 0xd0, 0x3e, 0xfa, 0xb0, + 0xb4, 0xe2, 0xaa, 0x79, 0xa1, 0xc9, 0x48, 0x7b, 0x0c, 0x78, 0x1b, 0x6f, 0xf4, 0x2e, 0x0f, 0xeb, + 0x6d, 0xa1, 0xaf, 0x69, 0x8d, 0x6e, 0x1e, 0xbc, 0x05, 0xc7, 0x65, 0x1e, 0x16, 0xe1, 0x6e, 0x49, + 0x5d, 0x71, 0x55, 0x7d, 0xb9, 0x9b, 0x08, 0xd7, 0xfc, 0xc5, 0x35, 0x52, 0x69, 0x18, 0xb6, 0xcc, + 0x78, 0xfd, 0x1a, 0x00, 0x00, 0xff, 0xff, 0x67, 0x1c, 0x59, 0x0b, 0xfb, 0x02, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -228,222 +123,6 @@ var _ grpc.ClientConn // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion4 -// ProxyServiceClient is the client API for ProxyService service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type ProxyServiceClient interface { - GetComponentStates(ctx context.Context, in *internalpb.GetComponentStatesRequest, opts ...grpc.CallOption) (*internalpb.ComponentStates, error) - GetTimeTickChannel(ctx context.Context, in *internalpb.GetTimeTickChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) - GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) - RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error) - InvalidateCollectionMetaCache(ctx context.Context, in *InvalidateCollMetaCacheRequest, opts ...grpc.CallOption) (*commonpb.Status, error) -} - -type proxyServiceClient struct { - cc *grpc.ClientConn -} - -func NewProxyServiceClient(cc *grpc.ClientConn) ProxyServiceClient { - return &proxyServiceClient{cc} -} - -func (c *proxyServiceClient) GetComponentStates(ctx context.Context, in *internalpb.GetComponentStatesRequest, opts ...grpc.CallOption) (*internalpb.ComponentStates, error) { - out := new(internalpb.ComponentStates) - err := c.cc.Invoke(ctx, "/milvus.proto.proxy.ProxyService/GetComponentStates", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *proxyServiceClient) GetTimeTickChannel(ctx context.Context, in *internalpb.GetTimeTickChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) { - out := new(milvuspb.StringResponse) - err := c.cc.Invoke(ctx, "/milvus.proto.proxy.ProxyService/GetTimeTickChannel", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *proxyServiceClient) GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) { - out := new(milvuspb.StringResponse) - err := c.cc.Invoke(ctx, "/milvus.proto.proxy.ProxyService/GetStatisticsChannel", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *proxyServiceClient) RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error) { - out := new(RegisterNodeResponse) - err := c.cc.Invoke(ctx, "/milvus.proto.proxy.ProxyService/RegisterNode", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *proxyServiceClient) InvalidateCollectionMetaCache(ctx context.Context, in *InvalidateCollMetaCacheRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { - out := new(commonpb.Status) - err := c.cc.Invoke(ctx, "/milvus.proto.proxy.ProxyService/InvalidateCollectionMetaCache", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// ProxyServiceServer is the server API for ProxyService service. -type ProxyServiceServer interface { - GetComponentStates(context.Context, *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) - GetTimeTickChannel(context.Context, *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) - GetStatisticsChannel(context.Context, *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) - RegisterNode(context.Context, *RegisterNodeRequest) (*RegisterNodeResponse, error) - InvalidateCollectionMetaCache(context.Context, *InvalidateCollMetaCacheRequest) (*commonpb.Status, error) -} - -// UnimplementedProxyServiceServer can be embedded to have forward compatible implementations. -type UnimplementedProxyServiceServer struct { -} - -func (*UnimplementedProxyServiceServer) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetComponentStates not implemented") -} -func (*UnimplementedProxyServiceServer) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetTimeTickChannel not implemented") -} -func (*UnimplementedProxyServiceServer) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetStatisticsChannel not implemented") -} -func (*UnimplementedProxyServiceServer) RegisterNode(ctx context.Context, req *RegisterNodeRequest) (*RegisterNodeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method RegisterNode not implemented") -} -func (*UnimplementedProxyServiceServer) InvalidateCollectionMetaCache(ctx context.Context, req *InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { - return nil, status.Errorf(codes.Unimplemented, "method InvalidateCollectionMetaCache not implemented") -} - -func RegisterProxyServiceServer(s *grpc.Server, srv ProxyServiceServer) { - s.RegisterService(&_ProxyService_serviceDesc, srv) -} - -func _ProxyService_GetComponentStates_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(internalpb.GetComponentStatesRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ProxyServiceServer).GetComponentStates(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/milvus.proto.proxy.ProxyService/GetComponentStates", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ProxyServiceServer).GetComponentStates(ctx, req.(*internalpb.GetComponentStatesRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _ProxyService_GetTimeTickChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(internalpb.GetTimeTickChannelRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ProxyServiceServer).GetTimeTickChannel(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/milvus.proto.proxy.ProxyService/GetTimeTickChannel", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ProxyServiceServer).GetTimeTickChannel(ctx, req.(*internalpb.GetTimeTickChannelRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _ProxyService_GetStatisticsChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(internalpb.GetStatisticsChannelRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ProxyServiceServer).GetStatisticsChannel(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/milvus.proto.proxy.ProxyService/GetStatisticsChannel", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ProxyServiceServer).GetStatisticsChannel(ctx, req.(*internalpb.GetStatisticsChannelRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _ProxyService_RegisterNode_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(RegisterNodeRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ProxyServiceServer).RegisterNode(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/milvus.proto.proxy.ProxyService/RegisterNode", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ProxyServiceServer).RegisterNode(ctx, req.(*RegisterNodeRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _ProxyService_InvalidateCollectionMetaCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(InvalidateCollMetaCacheRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ProxyServiceServer).InvalidateCollectionMetaCache(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/milvus.proto.proxy.ProxyService/InvalidateCollectionMetaCache", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ProxyServiceServer).InvalidateCollectionMetaCache(ctx, req.(*InvalidateCollMetaCacheRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _ProxyService_serviceDesc = grpc.ServiceDesc{ - ServiceName: "milvus.proto.proxy.ProxyService", - HandlerType: (*ProxyServiceServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "GetComponentStates", - Handler: _ProxyService_GetComponentStates_Handler, - }, - { - MethodName: "GetTimeTickChannel", - Handler: _ProxyService_GetTimeTickChannel_Handler, - }, - { - MethodName: "GetStatisticsChannel", - Handler: _ProxyService_GetStatisticsChannel_Handler, - }, - { - MethodName: "RegisterNode", - Handler: _ProxyService_RegisterNode_Handler, - }, - { - MethodName: "InvalidateCollectionMetaCache", - Handler: _ProxyService_InvalidateCollectionMetaCache_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "proxy_service.proto", -} - // ProxyNodeServiceClient is the client API for ProxyNodeService service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index 877c14ebf4..3c72ca27fb 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -12,7 +12,6 @@ package proxynode import ( - "bytes" "fmt" "path" "strconv" @@ -20,12 +19,9 @@ import ( "sync" "time" - "github.com/spf13/cast" - "github.com/spf13/viper" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -73,77 +69,21 @@ type ParamTable struct { var Params ParamTable var once sync.Once -func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb.InitParams) error { - pt.ProxyID = initParams.NodeID - - config := viper.New() - config.SetConfigType("yaml") - save := func() error { - for _, key := range config.AllKeys() { - val := config.Get(key) - str, err := cast.ToStringE(val) - if err != nil { - switch val := val.(type) { - case []interface{}: - str = str[:0] - for _, v := range val { - ss, err := cast.ToStringE(v) - if err != nil { - log.Warn("proxynode", zap.String("error", err.Error())) - } - if len(str) == 0 { - str = ss - } else { - str = str + "," + ss - } - } - - default: - log.Debug("proxynode", zap.String("error", "Undefined config type, key="+key)) - } - } - err = pt.Save(key, str) - if err != nil { - panic(err) - } - } - return nil - } - - for _, pair := range initParams.StartParams { - if strings.HasPrefix(pair.Key, StartParamsKey) { - err := config.ReadConfig(bytes.NewBuffer([]byte(pair.Value))) - if err != nil { - return err - } - err = save() - if err != nil { - return err - } - } - } - - pt.initParams() - - return nil -} - func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() - pt.initLogCfg() - - pt.initEtcdAddress() - pt.initMetaRootPath() - // err := pt.LoadYaml("advanced/proxy_node.yaml") - // if err != nil { - // panic(err) - // } - // pt.initParams() + err := pt.LoadYaml("advanced/proxy_node.yaml") + if err != nil { + panic(err) + } + pt.initParams() }) } func (pt *ParamTable) initParams() { + pt.initLogCfg() + pt.initEtcdAddress() + pt.initMetaRootPath() pt.initPulsarAddress() pt.initQueryNodeIDList() pt.initQueryNodeNum() @@ -184,7 +124,7 @@ func (pt *ParamTable) initQueryNodeIDList() []UniqueID { for _, i := range queryNodeIDs { v, err := strconv.Atoi(i) if err != nil { - log.Error("proxynode", zap.String("load proxynode id list error", err.Error())) + log.Error("ProxyNode ParamsTable", zap.String("load QueryNodeID list error", err.Error())) } ret = append(ret, UniqueID(v)) } diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 6a9a6e8ae3..7dd2ac852f 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -28,7 +28,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" - "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" @@ -53,7 +52,6 @@ type ProxyNode struct { masterService types.MasterService indexService types.IndexService dataService types.DataService - proxyService types.ProxyService queryService types.QueryService chMgr channelsMgr @@ -100,38 +98,8 @@ func (node *ProxyNode) Register() error { } func (node *ProxyNode) Init() error { - // todo wait for proxyservice state changed to Healthy ctx := context.Background() - err := funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 1000000, time.Millisecond*200) - if err != nil { - return err - } - log.Debug("ProxyService is ready ...") - - request := &proxypb.RegisterNodeRequest{ - Address: &commonpb.Address{ - Ip: Params.IP, - Port: int64(Params.NetworkPort), - }, - } - - response, err := node.proxyService.RegisterNode(ctx, request) - if err != nil { - log.Debug("ProxyNode RegisterNode failed", zap.Error(err)) - return err - } - if response.Status.ErrorCode != commonpb.ErrorCode_Success { - log.Debug("ProxyNode RegisterNode failed", zap.String("Reason", response.Status.Reason)) - return errors.New(response.Status.Reason) - } - - err = Params.LoadConfigFromInitParams(response.InitParams) - if err != nil { - log.Debug("ProxyNode LoadConfigFromInitParams failed", zap.Error(err)) - return err - } - // wait for dataservice state changed to Healthy if node.dataService != nil { log.Debug("ProxyNode wait for dataService ready") @@ -197,7 +165,7 @@ func (node *ProxyNode) Init() error { m := map[string]interface{}{ "PulsarAddress": Params.PulsarAddress, "PulsarBufSize": 1024} - err = node.msFactory.SetParams(m) + err := node.msFactory.SetParams(m) if err != nil { return err } @@ -445,10 +413,6 @@ func (node *ProxyNode) SetDataServiceClient(cli types.DataService) { node.dataService = cli } -func (node *ProxyNode) SetProxyServiceClient(cli types.ProxyService) { - node.proxyService = cli -} - func (node *ProxyNode) SetQueryServiceClient(cli types.QueryService) { node.queryService = cli } diff --git a/internal/proxyservice/OWNERS b/internal/proxyservice/OWNERS deleted file mode 100644 index b5d5d4db38..0000000000 --- a/internal/proxyservice/OWNERS +++ /dev/null @@ -1,15 +0,0 @@ -# order by contributions -reviewers: - - DragonDriver - - sunby - - xiaocai2333 - - godchen0212 - -approvers: - - czs007 - - neza2017 - - scsven - -labels: -- component/proxyservice - diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go deleted file mode 100644 index 5081fe3fb8..0000000000 --- a/internal/proxyservice/impl.go +++ /dev/null @@ -1,304 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "context" - "io/ioutil" - "os" - "path" - "runtime" - "strconv" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/milvuspb" - "github.com/milvus-io/milvus/internal/proto/proxypb" - "github.com/milvus-io/milvus/internal/timesync" -) - -const ( - StartParamsKey = "START_PARAMS" - ChannelYamlContent = "advanced/channel.yaml" - CommonYamlContent = "advanced/common.yaml" - DataNodeYamlContent = "advanced/data_node.yaml" - MasterYamlContent = "advanced/master.yaml" - ProxyNodeYamlContent = "advanced/proxy_node.yaml" - QueryNodeYamlContent = "advanced/query_node.yaml" - MilvusYamlContent = "milvus.yaml" -) - -func (s *ProxyService) fillNodeInitParams() error { - s.nodeStartParams = make([]*commonpb.KeyValuePair, 0) - - getConfigContentByName := func(fileName string) []byte { - _, fpath, _, _ := runtime.Caller(0) - configFile := path.Dir(fpath) + "/../../configs/" + fileName - _, err := os.Stat(configFile) - log.Debug("proxyservice", zap.String("configFile = ", configFile)) - if os.IsNotExist(err) { - runPath, err := os.Getwd() - if err != nil { - panic(err) - } - configFile = runPath + "/configs/" + fileName - } - data, err := ioutil.ReadFile(configFile) - if err != nil { - panic(err) - } - return append(data, []byte("\n")...) - } - - channelYamlContent := getConfigContentByName(ChannelYamlContent) - commonYamlContent := getConfigContentByName(CommonYamlContent) - dataNodeYamlContent := getConfigContentByName(DataNodeYamlContent) - masterYamlContent := getConfigContentByName(MasterYamlContent) - proxyNodeYamlContent := getConfigContentByName(ProxyNodeYamlContent) - queryNodeYamlContent := getConfigContentByName(QueryNodeYamlContent) - milvusYamlContent := getConfigContentByName(MilvusYamlContent) - - appendContent := func(key string, content []byte) { - s.nodeStartParams = append(s.nodeStartParams, &commonpb.KeyValuePair{ - Key: StartParamsKey + "_" + key, - Value: string(content), - }) - } - appendContent(ChannelYamlContent, channelYamlContent) - appendContent(CommonYamlContent, commonYamlContent) - appendContent(DataNodeYamlContent, dataNodeYamlContent) - appendContent(MasterYamlContent, masterYamlContent) - appendContent(ProxyNodeYamlContent, proxyNodeYamlContent) - appendContent(QueryNodeYamlContent, queryNodeYamlContent) - appendContent(MilvusYamlContent, milvusYamlContent) - return nil -} - -func (s *ProxyService) Init() error { - err := s.fillNodeInitParams() - if err != nil { - log.Debug("ProxyService fillNodeInitParams failed", zap.Error(err)) - return err - } - log.Debug("ProxyService fillNodeInitParams success ...") - - m := map[string]interface{}{ - "PulsarAddress": Params.PulsarAddress, - "ReceiveBufSize": 1024, - "PulsarBufSize": 1024} - err = s.msFactory.SetParams(m) - if err != nil { - return err - } - - serviceTimeTickMsgStream, _ := s.msFactory.NewTtMsgStream(s.ctx) - serviceTimeTickMsgStream.AsProducer([]string{Params.ServiceTimeTickChannel}) - log.Debug("ProxyService AsProducer", zap.Strings("channels", []string{Params.ServiceTimeTickChannel})) - - channels := make([]string, Params.InsertChannelNum) - var i int64 = 0 - for ; i < Params.InsertChannelNum; i++ { - channels[i] = Params.InsertChannelPrefixName + strconv.FormatInt(i, 10) - } - insertTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx) - insertTickMsgStream.AsProducer(channels) - log.Debug("ProxyService", zap.Strings("create insert time tick producer channels", channels)) - - nodeTimeTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx) - nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel, "proxyservicesub") // TODO: add config - log.Debug("ProxyService", zap.Strings("NodeTimeTickChannel", Params.NodeTimeTickChannel)) - - ttBarrier := timesync.NewSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{1}, 10) - log.Debug("ProxyService create soft time tick barrier ...") - s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream) - log.Debug("ProxyService create time tick ...") - - return nil -} - -func (s *ProxyService) Start() error { - s.sched.Start() - log.Debug("ProxyService start scheduler ...") - defer func() { - s.UpdateStateCode(internalpb.StateCode_Healthy) - log.Debug("ProxyService", zap.Any("State", s.stateCode.Load())) - }() - return s.tick.Start() -} - -func (s *ProxyService) Stop() error { - s.sched.Close() - log.Debug("close scheduler ...") - s.tick.Close() - log.Debug("close time tick") - - err := s.nodeInfos.ReleaseAllClients() - if err != nil { - panic(err) - } - log.Debug("stop all node ProxyNodes ...") - - s.cancel() - - return nil -} - -func (s *ProxyService) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { - stateInfo := &internalpb.ComponentInfo{ - NodeID: UniqueID(0), - Role: "ProxyService", - StateCode: s.stateCode.Load().(internalpb.StateCode), - } - - ret := &internalpb.ComponentStates{ - State: stateInfo, - SubcomponentStates: nil, // todo add subcomponents states - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - } - return ret, nil -} - -func (s *ProxyService) UpdateStateCode(code internalpb.StateCode) { - s.stateCode.Store(code) -} - -func (s *ProxyService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - Value: Params.ServiceTimeTickChannel, - }, nil -} - -func (s *ProxyService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - panic("implement me") -} - -func (s *ProxyService) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error) { - log.Debug("ProxyService RegisterLink") - - t := ®isterLinkTask{ - ctx: ctx, - Condition: newTaskCondition(ctx), - nodeInfos: s.nodeInfos, - } - - var err error - - err = s.sched.RegisterLinkTaskQueue.Enqueue(t) - if err != nil { - log.Debug("ProxyService RegisterLink Enqueue failed", zap.Error(err)) - return &milvuspb.RegisterLinkResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, - Address: nil, - }, nil - } - - err = t.WaitToFinish() - if err != nil { - log.Debug("ProxyService RegisterLink WaitToFinish failed", zap.Error(err)) - return &milvuspb.RegisterLinkResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, - Address: nil, - }, nil - } - log.Debug("ProxyService rRegisterLink WaitToFinish failed", zap.Error(err)) - return t.response, nil -} - -func (s *ProxyService) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) { - log.Debug("ProxyService receive RegisterNode request", - zap.String("ip", request.Address.Ip), - zap.Int64("port", request.Address.Port)) - - t := ®isterNodeTask{ - ctx: ctx, - request: request, - startParams: s.nodeStartParams, - Condition: newTaskCondition(ctx), - allocator: s.allocator, - nodeInfos: s.nodeInfos, - } - - var err error - - err = s.sched.RegisterNodeTaskQueue.Enqueue(t) - if err != nil { - log.Debug("ProxyService RegisterNode Enqueue failed", zap.Error(err)) - return &proxypb.RegisterNodeResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, - InitParams: nil, - }, nil - } - - err = t.WaitToFinish() - if err != nil { - log.Debug("ProxyService RegisterNode WaitToFinish failed", zap.Error(err)) - return &proxypb.RegisterNodeResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, - InitParams: nil, - }, nil - } - - return t.response, nil -} - -func (s *ProxyService) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { - log.Debug("ProxyService receive InvalidateCollectionMetaCache request", - zap.String("db", request.DbName), - zap.String("collection", request.CollectionName)) - - t := &invalidateCollectionMetaCacheTask{ - ctx: ctx, - request: request, - Condition: newTaskCondition(ctx), - nodeInfos: s.nodeInfos, - } - - var err error - - err = s.sched.InvalidateCollectionMetaCacheTaskQueue.Enqueue(t) - if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, nil - } - - err = t.WaitToFinish() - if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, nil - } - - return t.response, nil -} diff --git a/internal/proxyservice/node_info.go b/internal/proxyservice/node_info.go deleted file mode 100644 index 0a3c319f47..0000000000 --- a/internal/proxyservice/node_info.go +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "errors" - "math/rand" - "strconv" - "sync" - "time" - - "github.com/milvus-io/milvus/internal/util/funcutil" - - grpcproxynodeclient "github.com/milvus-io/milvus/internal/distributed/proxynode/client" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/types" -) - -type nodeInfo struct { - ip string - port int64 -} - -type globalNodeInfoTable struct { - mu sync.RWMutex - infos map[UniqueID]*nodeInfo - nodeIDs []UniqueID - // lazy creating, so len(clients) <= len(infos) - ProxyNodes map[UniqueID]types.ProxyNode -} - -func (table *globalNodeInfoTable) randomPick() UniqueID { - l := len(table.nodeIDs) - choice := rand.Intn(l) - return table.nodeIDs[choice] -} - -func (table *globalNodeInfoTable) Pick() (*nodeInfo, error) { - table.mu.RLock() - defer table.mu.RUnlock() - - if len(table.nodeIDs) <= 0 || len(table.infos) <= 0 { - return nil, errors.New("no available server node") - } - - id := table.randomPick() - info, ok := table.infos[id] - if !ok { - // though impossible - return nil, errors.New("fix me, something wrong in pick algorithm") - } - - return info, nil -} - -func (table *globalNodeInfoTable) Register(id UniqueID, info *nodeInfo) error { - table.mu.Lock() - defer table.mu.Unlock() - - _, ok := table.infos[id] - if !ok { - table.infos[id] = info - } - - if !funcutil.SliceContain(table.nodeIDs, id) { - table.nodeIDs = append(table.nodeIDs, id) - } - - return nil -} - -func (table *globalNodeInfoTable) createClients() error { - if len(table.ProxyNodes) == len(table.infos) { - return nil - } - - for nodeID, info := range table.infos { - _, ok := table.ProxyNodes[nodeID] - if !ok { - table.ProxyNodes[nodeID] = grpcproxynodeclient.NewClient(info.ip+":"+strconv.Itoa(int(info.port)), 3*time.Second) - var err error - err = table.ProxyNodes[nodeID].Init() - if err != nil { - panic(err) - } - err = table.ProxyNodes[nodeID].Start() - if err != nil { - panic(err) - } - } - } - - return nil -} - -func (table *globalNodeInfoTable) ReleaseAllClients() error { - table.mu.Lock() - log.Debug("get write lock") - defer func() { - table.mu.Unlock() - log.Debug("release write lock") - }() - - var err error - for id, client := range table.ProxyNodes { - err = client.Stop() - if err != nil { - panic(err) - } - delete(table.ProxyNodes, id) - } - - return nil -} - -func (table *globalNodeInfoTable) ObtainAllClients() (map[UniqueID]types.ProxyNode, error) { - table.mu.RLock() - defer table.mu.RUnlock() - - err := table.createClients() - - return table.ProxyNodes, err -} - -func newGlobalNodeInfoTable() *globalNodeInfoTable { - return &globalNodeInfoTable{ - nodeIDs: make([]UniqueID, 0), - infos: make(map[UniqueID]*nodeInfo), - ProxyNodes: make(map[UniqueID]types.ProxyNode), - } -} diff --git a/internal/proxyservice/node_info_test.go b/internal/proxyservice/node_info_test.go deleted file mode 100644 index 5eb54f22b7..0000000000 --- a/internal/proxyservice/node_info_test.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestGlobalNodeInfoTable_Register(t *testing.T) { - table := newGlobalNodeInfoTable() - - idInfoMaps := map[UniqueID]*nodeInfo{ - 0: {"localhost", 1080}, - 1: {"localhost", 1081}, - } - - var err error - - err = table.Register(0, idInfoMaps[0]) - assert.Equal(t, nil, err) - - err = table.Register(1, idInfoMaps[1]) - assert.Equal(t, nil, err) - - /************** duplicated register ***************/ - - err = table.Register(0, idInfoMaps[0]) - assert.Equal(t, nil, err) - - err = table.Register(1, idInfoMaps[1]) - assert.Equal(t, nil, err) -} - -func TestGlobalNodeInfoTable_Pick(t *testing.T) { - table := newGlobalNodeInfoTable() - - var err error - - _, err = table.Pick() - assert.NotEqual(t, nil, err) - - idInfoMaps := map[UniqueID]*nodeInfo{ - 0: {"localhost", 1080}, - 1: {"localhost", 1081}, - } - - err = table.Register(0, idInfoMaps[0]) - assert.Equal(t, nil, err) - - err = table.Register(1, idInfoMaps[1]) - assert.Equal(t, nil, err) - - num := 10 - for i := 0; i < num; i++ { - _, err = table.Pick() - assert.Equal(t, nil, err) - } -} - -func TestGlobalNodeInfoTable_ObtainAllClients(t *testing.T) { - table := newGlobalNodeInfoTable() - - var err error - - clients, err := table.ObtainAllClients() - assert.Equal(t, nil, err) - assert.Equal(t, 0, len(clients)) -} - -func TestGlobalNodeInfoTable_ReleaseAllClients(t *testing.T) { - table := newGlobalNodeInfoTable() - - err := table.ReleaseAllClients() - assert.Equal(t, nil, err) -} diff --git a/internal/proxyservice/nodeid_allocator.go b/internal/proxyservice/nodeid_allocator.go deleted file mode 100644 index 49d7596a9b..0000000000 --- a/internal/proxyservice/nodeid_allocator.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "sync" - - "github.com/milvus-io/milvus/internal/allocator" - "github.com/milvus-io/milvus/internal/util/typeutil" -) - -type UniqueID = typeutil.UniqueID -type Timestamp = typeutil.Timestamp - -type nodeIDAllocator interface { - AllocOne() UniqueID -} - -type naiveNodeIDAllocator struct { - allocator *allocator.IDAllocator - now UniqueID - mtx sync.Mutex -} - -func (allocator *naiveNodeIDAllocator) AllocOne() UniqueID { - allocator.mtx.Lock() - defer func() { - // allocator.now++ - allocator.mtx.Unlock() - }() - return allocator.now -} - -func newNodeIDAllocator() *naiveNodeIDAllocator { - return &naiveNodeIDAllocator{ - now: 1, - } -} diff --git a/internal/proxyservice/nodeid_allocator_test.go b/internal/proxyservice/nodeid_allocator_test.go deleted file mode 100644 index 3994062d63..0000000000 --- a/internal/proxyservice/nodeid_allocator_test.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" -) - -func TestNaiveNodeIDAllocator_AllocOne(t *testing.T) { - allocator := newNodeIDAllocator() - - num := 10 - for i := 0; i < num; i++ { - nodeID := allocator.AllocOne() - log.Debug("TestNaiveNodeIDAllocator_AllocOne", zap.Any("node id", nodeID)) - } -} diff --git a/internal/proxyservice/paramtable.go b/internal/proxyservice/paramtable.go deleted file mode 100644 index a9d771a555..0000000000 --- a/internal/proxyservice/paramtable.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "path" - "strconv" - "sync" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -type ParamTable struct { - paramtable.BaseTable - - PulsarAddress string - MasterAddress string - NodeTimeTickChannel []string - ServiceTimeTickChannel string - DataServiceAddress string - InsertChannelPrefixName string - InsertChannelNum int64 - - Log log.Config -} - -var Params ParamTable -var once sync.Once - -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - - if err := pt.LoadYaml("advanced/data_service.yaml"); err != nil { - panic(err) - } - - pt.initPulsarAddress() - pt.initMasterAddress() - pt.initNodeTimeTickChannel() - pt.initServiceTimeTickChannel() - pt.initDataServiceAddress() - pt.initInsertChannelPrefixName() - pt.initInsertChannelNum() - pt.initLogCfg() - }) -} - -func (pt *ParamTable) initPulsarAddress() { - ret, err := pt.Load("_PulsarAddress") - if err != nil { - panic(err) - } - pt.PulsarAddress = ret -} - -func (pt *ParamTable) initMasterAddress() { - ret, err := pt.Load("_MasterAddress") - if err != nil { - panic(err) - } - pt.MasterAddress = ret -} - -func (pt *ParamTable) initNodeTimeTickChannel() { - prefix, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick") - if err != nil { - log.Error("proxyservice", zap.Error(err)) - } - prefix += "-0" - pt.NodeTimeTickChannel = []string{prefix} -} - -func (pt *ParamTable) initServiceTimeTickChannel() { - ch, err := pt.Load("msgChannel.chanNamePrefix.proxyServiceTimeTick") - if err != nil { - log.Error("proxyservice", zap.Error(err)) - } - pt.ServiceTimeTickChannel = ch -} - -func (pt *ParamTable) initDataServiceAddress() { - // NOT USED NOW - pt.DataServiceAddress = "TODO: read from config" -} - -func (pt *ParamTable) initInsertChannelNum() { - pt.InsertChannelNum = pt.ParseInt64("dataservice.insertChannelNum") -} - -func (pt *ParamTable) initInsertChannelPrefixName() { - var err error - pt.InsertChannelPrefixName, err = pt.Load("msgChannel.chanNamePrefix.dataServiceInsertChannel") - if err != nil { - panic(err) - } -} - -func (pt *ParamTable) initLogCfg() { - pt.Log = log.Config{} - format, err := pt.Load("log.format") - if err != nil { - panic(err) - } - pt.Log.Format = format - level, err := pt.Load("log.level") - if err != nil { - panic(err) - } - pt.Log.Level = level - devStr, err := pt.Load("log.dev") - if err != nil { - panic(err) - } - dev, err := strconv.ParseBool(devStr) - if err != nil { - panic(err) - } - pt.Log.Development = dev - pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize") - pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups") - pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge") - rootPath, err := pt.Load("log.file.rootPath") - if err != nil { - panic(err) - } - if len(rootPath) != 0 { - pt.Log.File.Filename = path.Join(rootPath, "proxyservice.log") - } else { - pt.Log.File.Filename = "" - } -} diff --git a/internal/proxyservice/paramtable_test.go b/internal/proxyservice/paramtable_test.go deleted file mode 100644 index 84bc40bfeb..0000000000 --- a/internal/proxyservice/paramtable_test.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" -) - -func TestParamTable_Init(t *testing.T) { - Params.Init() - - log.Debug("TestParamTable_Init", zap.Any("PulsarAddress", Params.PulsarAddress)) - log.Debug("TestParamTable_Init", zap.Any("MasterAddress", Params.MasterAddress)) - log.Debug("TestParamTable_Init", zap.Any("NodeTimeTickChannel", Params.NodeTimeTickChannel)) - log.Debug("TestParamTable_Init", zap.Any("ServiceTimeTickChannel", Params.ServiceTimeTickChannel)) - log.Debug("TestParamTable_Init", zap.Any("DataServiceAddress", Params.DataServiceAddress)) - log.Debug("TestParamTable_Init", zap.Any("InsertChannelPrefixName", Params.InsertChannelPrefixName)) - log.Debug("TestParamTable_Init", zap.Any("InsertChannelNum", Params.InsertChannelNum)) -} diff --git a/internal/proxyservice/proxyservice.go b/internal/proxyservice/proxyservice.go deleted file mode 100644 index c5db96787d..0000000000 --- a/internal/proxyservice/proxyservice.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "context" - "math/rand" - "sync/atomic" - "time" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/log" - - "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" -) - -type ProxyService struct { - allocator nodeIDAllocator - sched *taskScheduler - tick *TimeTick - nodeInfos *globalNodeInfoTable - stateCode atomic.Value - - nodeStartParams []*commonpb.KeyValuePair - - ctx context.Context - cancel context.CancelFunc - - msFactory msgstream.Factory -} - -func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyService, error) { - rand.Seed(time.Now().UnixNano()) - ctx1, cancel := context.WithCancel(ctx) - s := &ProxyService{ - ctx: ctx1, - cancel: cancel, - msFactory: factory, - } - - s.allocator = newNodeIDAllocator() - s.sched = newTaskScheduler(ctx1) - s.nodeInfos = newGlobalNodeInfoTable() - s.UpdateStateCode(internalpb.StateCode_Abnormal) - log.Debug("ProxyService", zap.Any("State", s.stateCode.Load())) - - return s, nil -} diff --git a/internal/proxyservice/task.go b/internal/proxyservice/task.go deleted file mode 100644 index 3ff0a511df..0000000000 --- a/internal/proxyservice/task.go +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "context" - "errors" - - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/milvuspb" - "github.com/milvus-io/milvus/internal/proto/proxypb" -) - -type TaskEnum = int - -const ( - FromSDK TaskEnum = 0 - FromMaster TaskEnum = 1 - FromNode TaskEnum = 2 -) - -const ( - RegisterLinkTaskName = "RegisLinkTask" - RegisterNodeTaskName = "RegisNodeTask" - InvalidateCollectionMetaCacheTaskName = "InvalidateCollectionMetaCacheTask" -) - -type task interface { - Ctx() context.Context - ID() UniqueID // return ReqID - Name() string - PreExecute(ctx context.Context) error - Execute(ctx context.Context) error - PostExecute(ctx context.Context) error - WaitToFinish() error - Notify(err error) -} - -type Condition interface { - WaitToFinish() error - Notify(err error) -} - -type taskCondition struct { - done chan error - ctx context.Context -} - -func (c *taskCondition) WaitToFinish() error { - select { - case <-c.ctx.Done(): - return errors.New("timeout") - case err := <-c.done: - return err - } -} - -func (c *taskCondition) Notify(err error) { - c.done <- err -} - -func newTaskCondition(ctx context.Context) Condition { - return &taskCondition{ - done: make(chan error), - ctx: ctx, - } -} - -type registerLinkTask struct { - Condition - ctx context.Context - response *milvuspb.RegisterLinkResponse - nodeInfos *globalNodeInfoTable -} - -func (t *registerLinkTask) Ctx() context.Context { - return t.ctx -} - -func (t *registerLinkTask) ID() UniqueID { - return 0 -} - -func (t *registerLinkTask) Name() string { - return RegisterLinkTaskName -} - -func (t *registerLinkTask) PreExecute(ctx context.Context) error { - return nil -} - -func (t *registerLinkTask) Execute(ctx context.Context) error { - info, err := t.nodeInfos.Pick() - if err != nil { - return err - } - t.response = &milvuspb.RegisterLinkResponse{ - Address: &commonpb.Address{ - Ip: info.ip, - Port: info.port, - }, - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - } - return nil -} - -func (t *registerLinkTask) PostExecute(ctx context.Context) error { - return nil -} - -type registerNodeTask struct { - Condition - ctx context.Context - request *proxypb.RegisterNodeRequest - response *proxypb.RegisterNodeResponse - startParams []*commonpb.KeyValuePair - allocator nodeIDAllocator - nodeInfos *globalNodeInfoTable -} - -func (t *registerNodeTask) Ctx() context.Context { - return t.ctx -} - -func (t *registerNodeTask) ID() UniqueID { - return t.request.Base.MsgID -} - -func (t *registerNodeTask) Name() string { - return RegisterNodeTaskName -} - -func (t *registerNodeTask) PreExecute(ctx context.Context) error { - return nil -} - -func (t *registerNodeTask) Execute(ctx context.Context) error { - nodeID := t.allocator.AllocOne() - info := nodeInfo{ - ip: t.request.Address.Ip, - port: t.request.Address.Port, - } - err := t.nodeInfos.Register(nodeID, &info) - // TODO: fill init params - t.response = &proxypb.RegisterNodeResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - InitParams: &internalpb.InitParams{ - NodeID: nodeID, - StartParams: t.startParams, - }, - } - return err -} - -func (t *registerNodeTask) PostExecute(ctx context.Context) error { - return nil -} - -type invalidateCollectionMetaCacheTask struct { - Condition - ctx context.Context - request *proxypb.InvalidateCollMetaCacheRequest - response *commonpb.Status - nodeInfos *globalNodeInfoTable -} - -func (t *invalidateCollectionMetaCacheTask) Ctx() context.Context { - return t.ctx -} - -func (t *invalidateCollectionMetaCacheTask) ID() UniqueID { - return t.request.Base.MsgID -} - -func (t *invalidateCollectionMetaCacheTask) Name() string { - return InvalidateCollectionMetaCacheTaskName -} - -func (t *invalidateCollectionMetaCacheTask) PreExecute(ctx context.Context) error { - return nil -} - -func (t *invalidateCollectionMetaCacheTask) Execute(ctx context.Context) error { - var err error - clients, err := t.nodeInfos.ObtainAllClients() - if err != nil { - return err - } - for _, c := range clients { - status, _ := c.InvalidateCollectionMetaCache(ctx, t.request) - if status == nil { - return errors.New("invalidate collection meta cache error") - } - if status.ErrorCode != commonpb.ErrorCode_Success { - return errors.New(status.Reason) - } - } - t.response = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - } - return nil -} - -func (t *invalidateCollectionMetaCacheTask) PostExecute(ctx context.Context) error { - return nil -} diff --git a/internal/proxyservice/task_mock.go b/internal/proxyservice/task_mock.go deleted file mode 100644 index 31f31d496d..0000000000 --- a/internal/proxyservice/task_mock.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import "context" - -type mockTask struct { - ctx context.Context - id UniqueID - name string -} - -func (t *mockTask) Ctx() context.Context { - return t.ctx -} - -func (t *mockTask) ID() UniqueID { - return t.id -} - -func (t *mockTask) Name() string { - return t.name -} - -func (t *mockTask) PreExecute(ctx context.Context) error { - return nil -} - -func (t *mockTask) Execute(ctx context.Context) error { - return nil -} - -func (t *mockTask) PostExecute(ctx context.Context) error { - return nil -} - -func (t *mockTask) WaitToFinish() error { - return nil -} - -func (t *mockTask) Notify(err error) { -} - -func newMockTask(ctx context.Context) *mockTask { - return &mockTask{ - ctx: ctx, - id: 0, - name: "mockTask", - } -} - -type mockRegisterLinkTask struct { - mockTask -} - -type mockRegisterNodeTask struct { - mockTask -} - -type mockInvalidateCollectionMetaCacheTask struct { - mockTask -} - -func newMockRegisterLinkTask(ctx context.Context) *mockRegisterLinkTask { - return &mockRegisterLinkTask{ - mockTask: mockTask{ - ctx: ctx, - id: 0, - name: "mockRegisterLinkTask", - }, - } -} - -func newMockRegisterNodeTask(ctx context.Context) *mockRegisterNodeTask { - return &mockRegisterNodeTask{ - mockTask: mockTask{ - ctx: ctx, - id: 0, - name: "mockRegisterNodeTask", - }, - } -} - -func newMockInvalidateCollectionMetaCacheTask(ctx context.Context) *mockInvalidateCollectionMetaCacheTask { - return &mockInvalidateCollectionMetaCacheTask{ - mockTask: mockTask{ - ctx: ctx, - id: 0, - name: "mockInvalidateCollectionMetaCacheTask", - }, - } -} diff --git a/internal/proxyservice/task_queue.go b/internal/proxyservice/task_queue.go deleted file mode 100644 index 7dfba4a123..0000000000 --- a/internal/proxyservice/task_queue.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "container/list" - "errors" - "sync" - - "github.com/milvus-io/milvus/internal/log" -) - -type taskQueue interface { - Chan() <-chan int - Empty() bool - Full() bool - addTask(t task) error - FrontTask() task - PopTask() task - Enqueue(t task) error -} - -type baseTaskQueue struct { - tasks *list.List - mtx sync.Mutex - - // maxTaskNum should keep still - maxTaskNum int64 - - bufChan chan int // to block scheduler -} - -func (queue *baseTaskQueue) Chan() <-chan int { - return queue.bufChan -} - -func (queue *baseTaskQueue) Empty() bool { - return queue.tasks.Len() <= 0 -} - -func (queue *baseTaskQueue) Full() bool { - return int64(queue.tasks.Len()) >= queue.maxTaskNum -} - -func (queue *baseTaskQueue) addTask(t task) error { - queue.mtx.Lock() - defer queue.mtx.Unlock() - - if queue.Full() { - return errors.New("task queue is full") - } - queue.tasks.PushBack(t) - queue.bufChan <- 1 - return nil -} - -func (queue *baseTaskQueue) FrontTask() task { - queue.mtx.Lock() - defer queue.mtx.Unlock() - - if queue.tasks.Len() <= 0 { - log.Warn("sorry, but the task list is empty!") - return nil - } - - return queue.tasks.Front().Value.(task) -} - -func (queue *baseTaskQueue) PopTask() task { - queue.mtx.Lock() - defer queue.mtx.Unlock() - - if queue.tasks.Len() <= 0 { - log.Warn("sorry, but the task list is empty!") - return nil - } - - ft := queue.tasks.Front() - queue.tasks.Remove(ft) - - return ft.Value.(task) -} - -func (queue *baseTaskQueue) Enqueue(t task) error { - return queue.addTask(t) -} - -func newBaseTaskQueue() *baseTaskQueue { - return &baseTaskQueue{ - tasks: list.New(), - maxTaskNum: 1024, - bufChan: make(chan int, 1024), - } -} diff --git a/internal/proxyservice/task_queue_test.go b/internal/proxyservice/task_queue_test.go deleted file mode 100644 index aa917d680a..0000000000 --- a/internal/proxyservice/task_queue_test.go +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "context" - "sync" - "testing" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" -) - -func TestBaseTaskQueue_Enqueue(t *testing.T) { - queue := newBaseTaskQueue() - - num := 10 - var wg sync.WaitGroup - - for i := 0; i < num; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - tsk := newMockTask(context.Background()) - err := queue.Enqueue(tsk) - assert.Equal(t, nil, err) - }() - } - - wg.Wait() -} - -func TestBaseTaskQueue_FrontTask(t *testing.T) { - queue := newBaseTaskQueue() - - tsk := queue.FrontTask() - assert.Equal(t, nil, tsk) - - frontTask := newMockTask(context.Background()) - err := queue.Enqueue(frontTask) - assert.Equal(t, nil, err) - tsk = queue.FrontTask() - assert.NotEqual(t, nil, tsk) - assert.Equal(t, frontTask.ID(), tsk.ID()) - assert.Equal(t, frontTask.Name(), tsk.Name()) - - num := 10 - - for i := 0; i < num; i++ { - tsk := newMockTask(context.Background()) - err := queue.Enqueue(tsk) - assert.Equal(t, nil, err) - - tskF := queue.FrontTask() - assert.NotEqual(t, nil, tskF) - assert.Equal(t, frontTask.ID(), tskF.ID()) - assert.Equal(t, frontTask.Name(), tskF.Name()) - } -} - -func TestBaseTaskQueue_PopTask(t *testing.T) { - queue := newBaseTaskQueue() - - tsk := queue.PopTask() - assert.Equal(t, nil, tsk) - - num := 10 - - for i := 0; i < num; i++ { - tsk := newMockTask(context.Background()) - err := queue.Enqueue(tsk) - assert.Equal(t, nil, err) - - tskP := queue.PopTask() - assert.NotEqual(t, nil, tskP) - } - - tsk = queue.PopTask() - assert.Equal(t, nil, tsk) -} - -func TestBaseTaskQueue_Chan(t *testing.T) { - queue := newBaseTaskQueue() - - ctx, cancel := context.WithCancel(context.Background()) - go func() { - for { - select { - case <-ctx.Done(): - log.Debug("TestBaseTaskQueue_Chan exit") - return - case i := <-queue.Chan(): - log.Debug("TestBaseTaskQueue_Chan", zap.Any("receive", i)) - } - } - }() - - num := 10 - var wg sync.WaitGroup - for i := 0; i < num; i++ { - wg.Add(1) - go func() { - defer wg.Done() - tsk := newMockTask(context.Background()) - err := queue.Enqueue(tsk) - assert.Equal(t, nil, err) - }() - } - - wg.Wait() - - cancel() -} - -func TestBaseTaskQueue_Empty(t *testing.T) { - queue := newBaseTaskQueue() - assert.Equal(t, true, queue.Empty()) - - num := 10 - for i := 0; i < num; i++ { - tsk := newMockTask(context.Background()) - err := queue.Enqueue(tsk) - assert.Equal(t, nil, err) - - assert.Equal(t, false, queue.Empty()) - } - - for !queue.Empty() { - assert.Equal(t, false, queue.Empty()) - queue.PopTask() - } - - assert.Equal(t, true, queue.Empty()) -} - -func TestBaseTaskQueue_Full(t *testing.T) { - queue := newBaseTaskQueue() - - for !queue.Full() { - assert.Equal(t, false, queue.Full()) - - tsk := newMockTask(context.Background()) - err := queue.Enqueue(tsk) - assert.Equal(t, nil, err) - } - - assert.Equal(t, true, queue.Full()) -} diff --git a/internal/proxyservice/task_scheduler.go b/internal/proxyservice/task_scheduler.go deleted file mode 100644 index 5a844a803e..0000000000 --- a/internal/proxyservice/task_scheduler.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "context" - "sync" - - "github.com/milvus-io/milvus/internal/util/trace" - "github.com/opentracing/opentracing-go" - oplog "github.com/opentracing/opentracing-go/log" -) - -type taskScheduler struct { - RegisterLinkTaskQueue taskQueue - RegisterNodeTaskQueue taskQueue - InvalidateCollectionMetaCacheTaskQueue taskQueue - - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc -} - -func newTaskScheduler(ctx context.Context) *taskScheduler { - ctx1, cancel := context.WithCancel(ctx) - - return &taskScheduler{ - RegisterLinkTaskQueue: newBaseTaskQueue(), - RegisterNodeTaskQueue: newBaseTaskQueue(), - InvalidateCollectionMetaCacheTaskQueue: newBaseTaskQueue(), - ctx: ctx1, - cancel: cancel, - } -} - -func (sched *taskScheduler) scheduleRegisterLinkTask() task { - return sched.RegisterLinkTaskQueue.PopTask() -} - -func (sched *taskScheduler) scheduleRegisterNodeTask() task { - return sched.RegisterNodeTaskQueue.PopTask() -} - -func (sched *taskScheduler) scheduleInvalidateCollectionMetaCacheTask() task { - return sched.InvalidateCollectionMetaCacheTaskQueue.PopTask() -} - -func (sched *taskScheduler) processTask(t task, q taskQueue) { - span, ctx := trace.StartSpanFromContext(t.Ctx(), - opentracing.Tags{ - "Type": t.Name(), - }) - defer span.Finish() - span.LogFields(oplog.String("scheduler process PreExecute", t.Name())) - err := t.PreExecute(ctx) - - defer func() { - trace.LogError(span, err) - t.Notify(err) - }() - if err != nil { - return - } - - span.LogFields(oplog.String("scheduler process Execute", t.Name())) - err = t.Execute(ctx) - if err != nil { - trace.LogError(span, err) - return - } - span.LogFields(oplog.String("scheduler process PostExecute", t.Name())) - err = t.PostExecute(ctx) -} - -func (sched *taskScheduler) registerLinkLoop() { - defer sched.wg.Done() - for { - select { - case <-sched.ctx.Done(): - return - case <-sched.RegisterLinkTaskQueue.Chan(): - if !sched.RegisterLinkTaskQueue.Empty() { - t := sched.scheduleRegisterLinkTask() - go sched.processTask(t, sched.RegisterLinkTaskQueue) - } - } - } -} - -func (sched *taskScheduler) registerNodeLoop() { - defer sched.wg.Done() - for { - select { - case <-sched.ctx.Done(): - return - case <-sched.RegisterNodeTaskQueue.Chan(): - if !sched.RegisterNodeTaskQueue.Empty() { - t := sched.scheduleRegisterNodeTask() - go sched.processTask(t, sched.RegisterNodeTaskQueue) - } - } - } -} - -func (sched *taskScheduler) invalidateCollectionMetaCacheLoop() { - defer sched.wg.Done() - for { - select { - case <-sched.ctx.Done(): - return - case <-sched.InvalidateCollectionMetaCacheTaskQueue.Chan(): - if !sched.InvalidateCollectionMetaCacheTaskQueue.Empty() { - t := sched.scheduleInvalidateCollectionMetaCacheTask() - go sched.processTask(t, sched.InvalidateCollectionMetaCacheTaskQueue) - } - } - } -} - -func (sched *taskScheduler) Start() { - sched.wg.Add(1) - go sched.registerLinkLoop() - - sched.wg.Add(1) - go sched.registerNodeLoop() - - sched.wg.Add(1) - go sched.invalidateCollectionMetaCacheLoop() -} - -func (sched *taskScheduler) Close() { - sched.cancel() - sched.wg.Wait() -} diff --git a/internal/proxyservice/task_scheduler_test.go b/internal/proxyservice/task_scheduler_test.go deleted file mode 100644 index 97bcd2b142..0000000000 --- a/internal/proxyservice/task_scheduler_test.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "context" - "math/rand" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestTaskScheduler_Start(t *testing.T) { - sched := newTaskScheduler(context.Background()) - sched.Start() - defer sched.Close() - - num := 64 - var wg sync.WaitGroup - for i := 0; i < num; i++ { - wg.Add(1) - switch rand.Int() % 3 { - case 0: - go func() { - defer wg.Done() - tsk := newMockRegisterLinkTask(context.Background()) - err := sched.RegisterLinkTaskQueue.Enqueue(tsk) - assert.Equal(t, nil, err) - }() - case 1: - go func() { - defer wg.Done() - tsk := newMockRegisterNodeTask(context.Background()) - err := sched.RegisterNodeTaskQueue.Enqueue(tsk) - assert.Equal(t, nil, err) - }() - case 2: - go func() { - defer wg.Done() - tsk := newMockInvalidateCollectionMetaCacheTask(context.Background()) - err := sched.InvalidateCollectionMetaCacheTaskQueue.Enqueue(tsk) - assert.Equal(t, nil, err) - }() - default: - go func() { - defer wg.Done() - tsk := newMockRegisterLinkTask(context.Background()) - err := sched.RegisterLinkTaskQueue.Enqueue(tsk) - assert.Equal(t, nil, err) - }() - } - } - wg.Wait() - - time.Sleep(3 * time.Second) -} - -func TestTaskScheduler_Close(t *testing.T) { - sched := newTaskScheduler(context.Background()) - sched.Start() - defer sched.Close() - - num := 64 - var wg sync.WaitGroup - for i := 0; i < num; i++ { - wg.Add(1) - switch rand.Int() % 3 { - case 0: - go func() { - defer wg.Done() - tsk := newMockRegisterLinkTask(context.Background()) - err := sched.RegisterLinkTaskQueue.Enqueue(tsk) - assert.Equal(t, nil, err) - }() - case 1: - go func() { - defer wg.Done() - tsk := newMockRegisterNodeTask(context.Background()) - err := sched.RegisterNodeTaskQueue.Enqueue(tsk) - assert.Equal(t, nil, err) - }() - case 2: - go func() { - defer wg.Done() - tsk := newMockInvalidateCollectionMetaCacheTask(context.Background()) - err := sched.InvalidateCollectionMetaCacheTaskQueue.Enqueue(tsk) - assert.Equal(t, nil, err) - }() - default: - go func() { - defer wg.Done() - tsk := newMockRegisterLinkTask(context.Background()) - err := sched.RegisterLinkTaskQueue.Enqueue(tsk) - assert.Equal(t, nil, err) - }() - } - } - - wg.Wait() -} diff --git a/internal/proxyservice/timetick.go b/internal/proxyservice/timetick.go deleted file mode 100644 index 002ff66bfc..0000000000 --- a/internal/proxyservice/timetick.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "context" - "sync" - - "github.com/milvus-io/milvus/internal/timesync" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" -) - -type TimeTick struct { - ttBarrier timesync.TimeTickBarrier - channels []msgstream.MsgStream - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc -} - -func (tt *TimeTick) Start() error { - log.Debug("start time tick ...") - tt.wg.Add(1) - go func() { - defer tt.wg.Done() - for { - select { - case <-tt.ctx.Done(): - log.Debug("time tick loop was canceled by context!") - return - default: - current, err := tt.ttBarrier.GetTimeTick() - if err != nil { - log.Error("GetTimeTick error", zap.Error(err)) - break - } - msgPack := msgstream.MsgPack{} - timeTickMsg := &msgstream.TimeTickMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{0}, - }, - TimeTickMsg: internalpb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_TimeTick, - MsgID: 0, - Timestamp: current, - SourceID: 0, - }, - }, - } - msgPack.Msgs = append(msgPack.Msgs, timeTickMsg) - //for _, msg := range msgPack.Msgs { - // log.Debug("proxyservice", zap.Stringer("msg type", msg.Type())) - //} - for _, channel := range tt.channels { - err = channel.Broadcast(&msgPack) - if err != nil { - log.Error("proxyservice", zap.String("send time tick error", err.Error())) - } - } - } - } - }() - - for _, channel := range tt.channels { - channel.Start() - } - - tt.ttBarrier.Start() - - return nil -} - -func (tt *TimeTick) Close() { - for _, channel := range tt.channels { - channel.Close() - } - tt.ttBarrier.Close() - tt.cancel() - tt.wg.Wait() -} - -func newTimeTick(ctx context.Context, ttBarrier timesync.TimeTickBarrier, channels ...msgstream.MsgStream) *TimeTick { - ctx1, cancel := context.WithCancel(ctx) - return &TimeTick{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channels: channels} -} diff --git a/internal/proxyservice/timetick_test.go b/internal/proxyservice/timetick_test.go deleted file mode 100644 index 33e9ad83d4..0000000000 --- a/internal/proxyservice/timetick_test.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package proxyservice - -import ( - "context" - "math" - "testing" - "time" - - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/commonpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/timesync" - - "github.com/milvus-io/milvus/internal/msgstream" - "github.com/stretchr/testify/assert" -) - -func ttStreamProduceLoop(ctx context.Context, ttStream msgstream.MsgStream, durationInterval time.Duration, sourceID int64) { - log.Debug("ttStreamProduceLoop", zap.Any("durationInterval", durationInterval)) - timer := time.NewTicker(durationInterval) - - go func() { - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - ttMsgs := &msgstream.MsgPack{ - BeginTs: 0, - EndTs: 0, - Msgs: nil, - StartPositions: nil, - EndPositions: nil, - } - - currentT := uint64(time.Now().Nanosecond()) - msg := &msgstream.TimeTickMsg{ - BaseMsg: msgstream.BaseMsg{ - Ctx: ctx, - BeginTimestamp: 0, - EndTimestamp: 0, - HashValues: nil, - MsgPosition: nil, - }, - TimeTickMsg: internalpb.TimeTickMsg{ - Base: &commonpb.MsgBase{ - MsgType: 0, - MsgID: 0, - Timestamp: currentT, - SourceID: sourceID, - }, - }, - } - - ttMsgs.Msgs = append(ttMsgs.Msgs, msg) - - _ = ttStream.Produce(ttMsgs) - //log.Debug("ttStreamProduceLoop", zap.Any("Send", currentT)) - } - } - }() -} - -func TestTimeTick_Start(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ttStream := msgstream.NewSimpleMsgStream() - sourceID := 1 - peerIds := []UniqueID{UniqueID(sourceID)} - interval := 100 - minTtInterval := Timestamp(interval) - - durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 - - ttBarrier := timesync.NewSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) - channels := msgstream.NewSimpleMsgStream() - - tick := newTimeTick(ctx, ttBarrier, channels) - err := tick.Start() - ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) - assert.Equal(t, nil, err) - defer tick.Close() -} - -func TestTimeTick_Close(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ttStream := msgstream.NewSimpleMsgStream() - sourceID := 1 - peerIds := []UniqueID{UniqueID(sourceID)} - interval := 100 - minTtInterval := Timestamp(interval) - - durationInterval := time.Duration(interval*int(math.Pow10(6))) >> 18 - - ttBarrier := timesync.NewSoftTimeTickBarrier(ctx, ttStream, peerIds, minTtInterval) - channels := msgstream.NewSimpleMsgStream() - - tick := newTimeTick(ctx, ttBarrier, channels) - err := tick.Start() - ttStreamProduceLoop(ctx, ttStream, durationInterval, int64(sourceID)) - assert.Equal(t, nil, err) - defer tick.Close() -} diff --git a/internal/types/types.go b/internal/types/types.go index 5bf9e6643a..87d9875ba3 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -164,14 +164,6 @@ type ProxyNode interface { */ } -type ProxyService interface { - Component - TimeTickProvider - - RegisterNode(ctx context.Context, req *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) - InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) -} - type QueryNode interface { Component TimeTickProvider diff --git a/internal/util/typeutil/type.go b/internal/util/typeutil/type.go index dee1dc584c..0407059487 100644 --- a/internal/util/typeutil/type.go +++ b/internal/util/typeutil/type.go @@ -17,7 +17,6 @@ type UniqueID = int64 const ( MasterServiceRole = "MasterService" - ProxyServiceRole = "ProxyService" ProxyNodeRole = "ProxyNode" QueryServiceRole = "QueryService" QueryNodeRole = "QueryNode" diff --git a/scripts/run_docker.sh b/scripts/run_docker.sh index 271fe847d8..552514536d 100755 --- a/scripts/run_docker.sh +++ b/scripts/run_docker.sh @@ -3,9 +3,6 @@ cd ../build/docker/deploy/ echo "starting master docker" nohup docker-compose -p milvus up master > ~/master_docker.log 2>&1 & -echo "starting proxyservice docker" -nohup docker-compose -p milvus up proxyservice > ~/proxyservice_docker.log 2>&1 & - echo "starting proxynode docker" nohup docker-compose -p milvus up proxynode > ~/proxynode_docker.log 2>&1 & diff --git a/scripts/start.sh b/scripts/start.sh index 1a6dc6cb68..2ffd4fda21 100755 --- a/scripts/start.sh +++ b/scripts/start.sh @@ -9,9 +9,6 @@ nohup ./bin/milvus run dataservice > ~/dataservice.out 2>&1 & echo "starting datanode" nohup ./bin/milvus run datanode > ~/datanode.out 2>&1 & -echo "starting proxyservice" -nohup ./bin/milvus run proxyservice > ~/proxyservice.out 2>&1 & - echo "starting proxynode" nohup ./bin/milvus run proxynode > ~/proxynode.out 2>&1 &