diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index 9d660ed58b..18de3ea86d 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" "google.golang.org/grpc" @@ -31,9 +30,13 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/internal/util/typeutil" ) +var Params paramtable.GrpcClientConfig + // Client is the datacoord grpc client type Client struct { grpcClient grpcclient.GrpcClient @@ -48,7 +51,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C log.Debug("DataCoordClient NewClient failed", zap.Error(err)) return nil, err } - Params.Init() + Params.InitOnce(typeutil.DataCoordRole) client := &Client{ grpcClient: &grpcclient.ClientBase{ ClientMaxRecvSize: Params.ClientMaxRecvSize, diff --git a/internal/distributed/datacoord/client/param_table.go b/internal/distributed/datacoord/client/param_table.go deleted file mode 100644 index c5b7d24e36..0000000000 --- a/internal/distributed/datacoord/client/param_table.go +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcdatacoordclient - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - ClientMaxSendSize int - ClientMaxRecvSize int -} - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// Init is an override method of BaseTable's Init. It mainly calls the -// Init of BaseTable and do some other initialization. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - - pt.initClientMaxSendSize() - pt.initClientMaxRecvSize() - }) -} - -func (pt *ParamTable) initClientMaxSendSize() { - var err error - - valueStr, err := pt.Load("dataCoord.grpc.clientMaxSendSize") - if err != nil { // not set - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse dataCoord.grpc.clientMaxSendSize, set to default", - zap.String("dataCoord.grpc.clientMaxSendSize", valueStr), - zap.Error(err)) - - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } else { - pt.ClientMaxSendSize = value - } - - log.Debug("initClientMaxSendSize", - zap.Int("dataCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) -} - -func (pt *ParamTable) initClientMaxRecvSize() { - var err error - - valueStr, err := pt.Load("dataCoord.grpc.clientMaxRecvSize") - if err != nil { // not set - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse dataCoord.grpc.clientMaxRecvSize, set to default", - zap.String("dataCoord.grpc.clientMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } else { - pt.ClientMaxRecvSize = value - } - - log.Debug("initClientMaxRecvSize", - zap.Int("dataCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) -} diff --git a/internal/distributed/datacoord/client/param_table_test.go b/internal/distributed/datacoord/client/param_table_test.go deleted file mode 100644 index ab7307ce41..0000000000 --- a/internal/distributed/datacoord/client/param_table_test.go +++ /dev/null @@ -1,43 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcdatacoordclient - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) - log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) - - err := Params.Remove("dataCoord.grpc.clientMaxSendSize") - assert.Nil(t, err) - Params.initClientMaxSendSize() - assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize) - - err = Params.Remove("dataCoord.grpc.clientMaxRecvSize") - assert.Nil(t, err) - Params.initClientMaxRecvSize() - assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize) -} diff --git a/internal/distributed/datacoord/param_table.go b/internal/distributed/datacoord/param_table.go deleted file mode 100644 index d00ef1516e..0000000000 --- a/internal/distributed/datacoord/param_table.go +++ /dev/null @@ -1,123 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcdatacoord - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/paramtable" - "go.uber.org/zap" -) - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - IP string - Port int - Address string - - ServerMaxSendSize int - ServerMaxRecvSize int -} - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// Init is an override method of BaseTable's Init. It mainly calls the -// Init of BaseTable and do some other initialization. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - pt.initParams() - pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) - }) -} - -func (pt *ParamTable) initParams() { - pt.loadFromEnv() - pt.loadFromArgs() - pt.initPort() - - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() -} - -func (pt *ParamTable) loadFromEnv() { - Params.IP = funcutil.GetLocalIP() -} - -// LoadFromArgs is used to initialize configuration items from args. -func (pt *ParamTable) loadFromArgs() { - -} - -func (pt *ParamTable) initPort() { - pt.Port = pt.ParseInt("dataCoord.port") -} - -func (pt *ParamTable) initServerMaxSendSize() { - var err error - - valueStr, err := pt.Load("dataCoord.grpc.serverMaxSendSize") - if err != nil { // not set - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse dataCoord.grpc.serverMaxSendSize, set to default", - zap.String("dataCoord.grpc.serverMaxSendSize", valueStr), - zap.Error(err)) - - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } else { - pt.ServerMaxSendSize = value - } - - log.Debug("initServerMaxSendSize", - zap.Int("dataCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) -} - -func (pt *ParamTable) initServerMaxRecvSize() { - var err error - - valueStr, err := pt.Load("dataCoord.grpc.serverMaxRecvSize") - if err != nil { // not set - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse dataCoord.grpc.serverMaxRecvSize, set to default", - zap.String("dataCoord.grpc.serverMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } else { - pt.ServerMaxRecvSize = value - } - - log.Debug("initServerMaxRecvSize", - zap.Int("dataCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) -} diff --git a/internal/distributed/datacoord/param_table_test.go b/internal/distributed/datacoord/param_table_test.go deleted file mode 100644 index d869f513f0..0000000000 --- a/internal/distributed/datacoord/param_table_test.go +++ /dev/null @@ -1,47 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcdatacoord - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/stretchr/testify/assert" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - assert.NotEqual(t, Params.Port, 0) - t.Logf("DataCoord Port:%d", Params.Port) - - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) - log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) - - err := Params.Remove("dataCoord.grpc.ServerMaxSendSize") - assert.Nil(t, err) - Params.initServerMaxSendSize() - assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize) - - err = Params.Remove("dataCoord.grpc.ServerMaxRecvSize") - assert.Nil(t, err) - Params.initServerMaxRecvSize() - assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize) -} diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index d45b602322..f8aa8eefbd 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -25,28 +25,29 @@ import ( "sync" "time" - "github.com/milvus-io/milvus/internal/logutil" - "github.com/milvus-io/milvus/internal/types" - "go.uber.org/zap" - "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/milvus-io/milvus/internal/datacoord" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/logutil" "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/trace" - "google.golang.org/grpc/keepalive" - "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/trace" + "github.com/milvus-io/milvus/internal/util/typeutil" ) +var Params paramtable.GrpcServerConfig + // Server is the grpc server of datacoord type Server struct { ctx context.Context @@ -78,7 +79,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory, opts ...datacoord } func (s *Server) init() error { - Params.Init() + Params.InitOnce(typeutil.DataCoordRole) closer := trace.InitTracing("datacoord") s.closer = closer @@ -86,7 +87,7 @@ func (s *Server) init() error { datacoord.Params.InitOnce() datacoord.Params.IP = Params.IP datacoord.Params.Port = Params.Port - datacoord.Params.Address = Params.Address + datacoord.Params.Address = Params.GetAddress() err := s.startGrpc() if err != nil { @@ -170,7 +171,7 @@ func (s *Server) start() error { // Stop stops the DataCoord server gracefully. // Need to call the GracefulStop interface of grpc server and call the stop method of the inner DataCoord object. func (s *Server) Stop() error { - log.Debug("Datacoord stop", zap.String("Address", Params.Address)) + log.Debug("Datacoord stop", zap.String("Address", Params.GetAddress())) var err error if s.closer != nil { if err = s.closer.Close(); err != nil { diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 28dffedbe5..84221eaf25 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - "github.com/milvus-io/milvus/internal/util/typeutil" "google.golang.org/grpc" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -29,8 +28,12 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/typeutil" ) +var Params paramtable.GrpcClientConfig + // Client is the grpc client for DataNode type Client struct { grpcClient grpcclient.GrpcClient @@ -42,7 +45,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { if addr == "" { return nil, fmt.Errorf("address is empty") } - Params.Init() + Params.InitOnce(typeutil.DataNodeRole) client := &Client{ addr: addr, grpcClient: &grpcclient.ClientBase{ diff --git a/internal/distributed/datanode/client/param_table.go b/internal/distributed/datanode/client/param_table.go deleted file mode 100644 index 9443e2ba15..0000000000 --- a/internal/distributed/datanode/client/param_table.go +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcdatanodeclient - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - ClientMaxSendSize int - ClientMaxRecvSize int -} - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// Init is an override method of BaseTable's Init. It mainly calls the -// Init of BaseTable and do some other initialization. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - - pt.initClientMaxSendSize() - pt.initClientMaxRecvSize() - }) -} - -func (pt *ParamTable) initClientMaxSendSize() { - var err error - - valueStr, err := pt.Load("dataNode.grpc.clientMaxSendSize") - if err != nil { // not set - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse dataNode.grpc.clientMaxSendSize, set to default", - zap.String("dataNode.grpc.clientMaxSendSize", valueStr), - zap.Error(err)) - - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } else { - pt.ClientMaxSendSize = value - } - - log.Debug("initClientMaxSendSize", - zap.Int("dataNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) -} - -func (pt *ParamTable) initClientMaxRecvSize() { - var err error - - valueStr, err := pt.Load("dataNode.grpc.clientMaxRecvSize") - if err != nil { // not set - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse dataNode.grpc.clientMaxRecvSize, set to default", - zap.String("dataNode.grpc.clientMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } else { - pt.ClientMaxRecvSize = value - } - - log.Debug("initClientMaxRecvSize", - zap.Int("dataNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) -} diff --git a/internal/distributed/datanode/client/param_table_test.go b/internal/distributed/datanode/client/param_table_test.go deleted file mode 100644 index 8c9aea1d9e..0000000000 --- a/internal/distributed/datanode/client/param_table_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcdatanodeclient - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) - log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) - - Params.Remove("dataNode.grpc.clientMaxSendSize") - Params.initClientMaxSendSize() - assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize) - - Params.Remove("dataNode.grpc.clientMaxRecvSize") - Params.initClientMaxRecvSize() - assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize) -} diff --git a/internal/distributed/datanode/param_table.go b/internal/distributed/datanode/param_table.go deleted file mode 100644 index 7feddecd0c..0000000000 --- a/internal/distributed/datanode/param_table.go +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcdatanode - -import ( - "net" - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/paramtable" - "go.uber.org/zap" -) - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - IP string - Port int - Address string - listener net.Listener - - ServerMaxSendSize int - ServerMaxRecvSize int -} - -// Init is an override method of BaseTable's Init. It mainly calls the -// Init of BaseTable and do some other initialization. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - pt.initParams() - pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) - - listener, err := net.Listen("tcp", pt.Address) - if err != nil { - panic(err) - } - pt.listener = listener - }) -} - -// initParams initializes params of the configuration items. -func (pt *ParamTable) initParams() { - pt.loadFromEnv() - pt.loadFromArgs() - - pt.initPort() - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() -} - -func (pt *ParamTable) loadFromArgs() { - -} - -func (pt *ParamTable) loadFromEnv() { - Params.IP = funcutil.GetLocalIP() -} - -func (pt *ParamTable) initPort() { - port := pt.ParseInt("dataNode.port") - pt.Port = port - if !funcutil.CheckPortAvailable(pt.Port) { - pt.Port = funcutil.GetAvailablePort() - log.Warn("DataNode init", zap.Any("Port", pt.Port)) - } -} - -func (pt *ParamTable) initServerMaxSendSize() { - var err error - - valueStr, err := pt.Load("dataNode.grpc.serverMaxSendSize") - if err != nil { // not set - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse dataNode.grpc.serverMaxSendSize, set to default", - zap.String("dataNode.grpc.serverMaxSendSize", valueStr), - zap.Error(err)) - - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } else { - pt.ServerMaxSendSize = value - } - - log.Debug("initServerMaxSendSize", - zap.Int("dataNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) -} - -func (pt *ParamTable) initServerMaxRecvSize() { - var err error - - valueStr, err := pt.Load("dataNode.grpc.serverMaxRecvSize") - if err != nil { // not set - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // invalid format - log.Warn("Failed to parse dataNode.grpc.serverMaxRecvSize, set to default", - zap.String("dataNode.grpc.serverMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } else { - pt.ServerMaxRecvSize = value - } - - log.Debug("initServerMaxRecvSize", - zap.Int("dataNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) -} diff --git a/internal/distributed/datanode/param_table_test.go b/internal/distributed/datanode/param_table_test.go deleted file mode 100644 index 02c7e1d497..0000000000 --- a/internal/distributed/datanode/param_table_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcdatanode - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/stretchr/testify/assert" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - Params.loadFromEnv() - assert.NotEqual(t, Params.IP, "") - t.Logf("DataNode IP:%s", Params.IP) - - assert.NotEqual(t, Params.Port, 0) - t.Logf("DataNode Port:%d", Params.Port) - - assert.NotNil(t, Params.listener) - t.Logf("DataNode listener:%d", Params.listener) - - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) - log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) - - Params.Remove("dataNode.grpc.serverMaxSendSize") - Params.initServerMaxSendSize() - assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize) - - Params.Remove("dataNode.grpc.serverMaxRecvSize") - Params.initServerMaxRecvSize() - assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize) -} diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 6ecc70efae..5eae9d523f 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" dn "github.com/milvus-io/milvus/internal/datanode" dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client" @@ -42,10 +43,13 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/trace" - "google.golang.org/grpc/keepalive" + "github.com/milvus-io/milvus/internal/util/typeutil" ) +var Params paramtable.GrpcServerConfig + type Server struct { datanode types.DataNodeComponent wg sync.WaitGroup @@ -88,7 +92,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) func (s *Server) startGrpc() error { s.wg.Add(1) - go s.startGrpcLoop(Params.listener) + go s.startGrpcLoop(Params.Listener) // wait for grpc server loop start err := <-s.grpcErrChan return err @@ -154,7 +158,7 @@ func (s *Server) Run() error { // Stop stops Datanode's grpc service. func (s *Server) Stop() error { - log.Debug("Datanode stop", zap.String("Address", Params.Address)) + log.Debug("Datanode stop", zap.String("Address", Params.GetAddress())) if s.closer != nil { if err := s.closer.Close(); err != nil { return err @@ -191,7 +195,7 @@ func (s *Server) Stop() error { // init initializes Datanode's grpc service. func (s *Server) init() error { ctx := context.Background() - Params.Init() + Params.InitOnce(typeutil.DataNodeRole) dn.Params.InitOnce() dn.Params.Port = Params.Port diff --git a/internal/distributed/grpcconfigs/configs.go b/internal/distributed/grpcconfigs/configs.go deleted file mode 100644 index 80a335c47d..0000000000 --- a/internal/distributed/grpcconfigs/configs.go +++ /dev/null @@ -1,33 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcconfigs - -import "math" - -const ( - // DefaultServerMaxSendSize defines the maximum size of data per grpc request can send by server side. - DefaultServerMaxSendSize = math.MaxInt32 - - // DefaultServerMaxRecvSize defines the maximum size of data per grpc request can receive by server side. - DefaultServerMaxRecvSize = math.MaxInt32 - - // DefaultClientMaxSendSize defines the maximum size of data per grpc request can send by client side. - DefaultClientMaxSendSize = 100 * 1024 * 1024 - - // DefaultClientMaxRecvSize defines the maximum size of data per grpc request can receive by client side. - DefaultClientMaxRecvSize = 100 * 1024 * 1024 -) diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index 83ef6037e6..b2e29abbe4 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -30,10 +30,13 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) +var Params paramtable.GrpcClientConfig + // Client is the grpc client of IndexCoord. type Client struct { grpcClient grpcclient.GrpcClient @@ -48,7 +51,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C log.Debug("IndexCoordClient NewClient failed", zap.Error(err)) return nil, err } - Params.Init() + Params.InitOnce(typeutil.IndexCoordRole) client := &Client{ grpcClient: &grpcclient.ClientBase{ ClientMaxRecvSize: Params.ClientMaxRecvSize, diff --git a/internal/distributed/indexcoord/client/client_test.go b/internal/distributed/indexcoord/client/client_test.go index a091a5cb2b..90b6ef10e3 100644 --- a/internal/distributed/indexcoord/client/client_test.go +++ b/internal/distributed/indexcoord/client/client_test.go @@ -26,11 +26,12 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/assert" ) func TestIndexCoordClient(t *testing.T) { - Params.Init() + Params.InitOnce(typeutil.IndexCoordRole) ctx := context.Background() server, err := grpcindexcoord.NewServer(ctx) assert.Nil(t, err) diff --git a/internal/distributed/indexcoord/client/param_table.go b/internal/distributed/indexcoord/client/param_table.go deleted file mode 100644 index 3be577cf53..0000000000 --- a/internal/distributed/indexcoord/client/param_table.go +++ /dev/null @@ -1,96 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// - -package grpcindexcoordclient - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// ParamTable is used to record configuration items. -type ParamTable struct { - paramtable.BaseTable - - ClientMaxSendSize int - ClientMaxRecvSize int -} - -// Params is an alias for ParamTable. -var Params ParamTable -var once sync.Once - -// Init is used to initialize configuration items. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - - pt.initClientMaxSendSize() - pt.initClientMaxRecvSize() - }) -} - -func (pt *ParamTable) initClientMaxSendSize() { - var err error - - valueStr, err := pt.Load("indexCoord.grpc.clientMaxSendSize") - if err != nil { // not set - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse indexCoord.grpc.clientMaxSendSize, set to default", - zap.String("indexCoord.grpc.clientMaxSendSize", valueStr), - zap.Error(err)) - - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } else { - pt.ClientMaxSendSize = value - } - - log.Debug("initClientMaxSendSize", - zap.Int("indexCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) -} - -func (pt *ParamTable) initClientMaxRecvSize() { - var err error - - valueStr, err := pt.Load("indexCoord.grpc.clientMaxRecvSize") - if err != nil { // not set - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse indexCoord.grpc.clientMaxRecvSize, set to default", - zap.String("indexCoord.grpc.clientMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } else { - pt.ClientMaxRecvSize = value - } - - log.Debug("initClientMaxRecvSize", - zap.Int("indexCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) -} diff --git a/internal/distributed/indexcoord/client/param_table_test.go b/internal/distributed/indexcoord/client/param_table_test.go deleted file mode 100644 index 819f7029f9..0000000000 --- a/internal/distributed/indexcoord/client/param_table_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcindexcoordclient - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) - log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) - - Params.Remove("indexCoord.grpc.clientMaxSendSize") - Params.initClientMaxSendSize() - assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize) - - Params.Remove("indexCoord.grpc.clientMaxRecvSize") - Params.initClientMaxRecvSize() - assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize) -} diff --git a/internal/distributed/indexcoord/param_table.go b/internal/distributed/indexcoord/param_table.go deleted file mode 100644 index 4dc62dc8bf..0000000000 --- a/internal/distributed/indexcoord/param_table.go +++ /dev/null @@ -1,131 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcindexcoord - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/paramtable" - "go.uber.org/zap" -) - -// ParamTable is used to record configuration items. -type ParamTable struct { - paramtable.BaseTable - - IP string - Port int - Address string - - ServerMaxSendSize int - ServerMaxRecvSize int -} - -// Params is an alias for ParamTable. -var Params ParamTable -var once sync.Once - -// Init is used to initialize configuration items. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - pt.initParams() - pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) - }) -} - -// initParams initializes params of the configuration items. -func (pt *ParamTable) initParams() { - pt.LoadFromEnv() - pt.LoadFromArgs() - - pt.initPort() - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() -} - -// initPort initializes the port of IndexCoord service. -func (pt *ParamTable) initPort() { - pt.Port = pt.ParseInt("indexCoord.port") -} - -// LoadFromEnv gets the configuration from environment variables. -func (pt *ParamTable) LoadFromEnv() { - Params.IP = funcutil.GetLocalIP() -} - -// LoadFromArgs is used to initialize configuration items from args. -func (pt *ParamTable) loadFromArgs() { - -} - -// LoadFromArgs is used to initialize configuration items from args. -func (pt *ParamTable) LoadFromArgs() { - -} - -// initServerMaxSendSize initializes the max send size of IndexCoord service. -func (pt *ParamTable) initServerMaxSendSize() { - var err error - - valueStr, err := pt.Load("indexCoord.grpc.serverMaxSendSize") - if err != nil { // not set - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse indexCoord.grpc.serverMaxSendSize, set to default", - zap.String("indexCoord.grpc.serverMaxSendSize", valueStr), - zap.Error(err)) - - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } else { - pt.ServerMaxSendSize = value - } - - log.Debug("initServerMaxSendSize", - zap.Int("indexCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) -} - -// initServerMaxSendSize initializes the max receive size of IndexCoord service. -func (pt *ParamTable) initServerMaxRecvSize() { - var err error - - valueStr, err := pt.Load("indexCoord.grpc.serverMaxRecvSize") - if err != nil { // not set - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse indexCoord.grpc.serverMaxRecvSize, set to default", - zap.String("indexCoord.grpc.serverMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } else { - pt.ServerMaxRecvSize = value - } - - log.Debug("initServerMaxRecvSize", - zap.Int("indexCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) -} diff --git a/internal/distributed/indexcoord/param_table_test.go b/internal/distributed/indexcoord/param_table_test.go deleted file mode 100644 index c1e0b9479c..0000000000 --- a/internal/distributed/indexcoord/param_table_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcindexcoord - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) - log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) - - Params.Remove("indexCoord.grpc.ServerMaxSendSize") - Params.initServerMaxSendSize() - assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize) - - Params.Remove("indexCoord.grpc.ServerMaxRecvSize") - Params.initServerMaxRecvSize() - assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize) -} diff --git a/internal/distributed/indexcoord/service.go b/internal/distributed/indexcoord/service.go index ac8d0fa26d..9b9028f3a6 100644 --- a/internal/distributed/indexcoord/service.go +++ b/internal/distributed/indexcoord/service.go @@ -24,8 +24,6 @@ import ( "sync" "time" - "github.com/milvus-io/milvus/internal/types" - "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -37,11 +35,15 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" ) +var Params paramtable.GrpcServerConfig + // UniqueID is an alias of int64, is used as a unique identifier for the request. type UniqueID = typeutil.UniqueID @@ -75,10 +77,10 @@ func (s *Server) Run() error { // init initializes IndexCoord's grpc service. func (s *Server) init() error { - Params.Init() + Params.InitOnce(typeutil.IndexCoordRole) indexcoord.Params.InitOnce() - indexcoord.Params.Address = Params.Address + indexcoord.Params.Address = Params.GetAddress() indexcoord.Params.Port = Params.Port closer := trace.InitTracing("IndexCoord") @@ -115,7 +117,7 @@ func (s *Server) start() error { // Stop stops IndexCoord's grpc service. func (s *Server) Stop() error { - log.Debug("Indexcoord stop", zap.String("Address", Params.Address)) + log.Debug("Indexcoord stop", zap.String("Address", Params.GetAddress())) if s.closer != nil { if err := s.closer.Close(); err != nil { return err diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 4fb3b1426f..77f06cc32c 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -26,10 +26,13 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/typeutil" "google.golang.org/grpc" ) +var Params paramtable.GrpcClientConfig + // Client is the grpc client of IndexNode. type Client struct { grpcClient grpcclient.GrpcClient @@ -41,7 +44,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { if addr == "" { return nil, fmt.Errorf("address is empty") } - Params.Init() + Params.InitOnce(typeutil.IndexNodeRole) client := &Client{ addr: addr, grpcClient: &grpcclient.ClientBase{ diff --git a/internal/distributed/indexnode/client/param_table.go b/internal/distributed/indexnode/client/param_table.go deleted file mode 100644 index f10e642fe0..0000000000 --- a/internal/distributed/indexnode/client/param_table.go +++ /dev/null @@ -1,96 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcindexnodeclient - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// ParamTable is used to record configuration items. -type ParamTable struct { - paramtable.BaseTable - - ClientMaxSendSize int - ClientMaxRecvSize int -} - -// Params is an instance of ParamTable. -var Params ParamTable -var once sync.Once - -// Init is used to initialize configuration items. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - - pt.initClientMaxSendSize() - pt.initClientMaxRecvSize() - }) -} - -func (pt *ParamTable) initClientMaxSendSize() { - var err error - - valueStr, err := pt.Load("indexNode.grpc.clientMaxSendSize") - if err != nil { // not set - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse indexNode.grpc.clientMaxSendSize, set to default", - zap.String("indexNode.grpc.clientMaxSendSize", valueStr), - zap.Error(err)) - - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } else { - pt.ClientMaxSendSize = value - } - - log.Debug("initClientMaxSendSize", - zap.Int("indexNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) -} - -func (pt *ParamTable) initClientMaxRecvSize() { - var err error - - valueStr, err := pt.Load("indexNode.grpc.clientMaxRecvSize") - if err != nil { // not set - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse indexNode.grpc.clientMaxRecvSize, set to default", - zap.String("indexNode.grpc.clientMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } else { - pt.ClientMaxRecvSize = value - } - - log.Debug("initClientMaxRecvSize", - zap.Int("indexNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) -} diff --git a/internal/distributed/indexnode/client/param_table_test.go b/internal/distributed/indexnode/client/param_table_test.go deleted file mode 100644 index c9c74d99a4..0000000000 --- a/internal/distributed/indexnode/client/param_table_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcindexnodeclient - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) - log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) - - Params.Remove("indexNode.grpc.clientMaxSendSize") - Params.initClientMaxSendSize() - assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize) - - Params.Remove("indexNode.grpc.clientMaxRecvSize") - Params.initClientMaxRecvSize() - assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize) -} diff --git a/internal/distributed/indexnode/param_table.go b/internal/distributed/indexnode/param_table.go deleted file mode 100644 index d0cf46f9e8..0000000000 --- a/internal/distributed/indexnode/param_table.go +++ /dev/null @@ -1,128 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcindexnode - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// ParamTable is used to record configuration items. -type ParamTable struct { - paramtable.BaseTable - - IP string - Port int - Address string - - ServerMaxSendSize int - ServerMaxRecvSize int -} - -// Params is an alias for ParamTable. -var Params ParamTable -var once sync.Once - -// Init is used to initialize configuration items. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - pt.initParams() - }) -} - -// LoadFromArgs is used to initialize configuration items from args. -func (pt *ParamTable) LoadFromArgs() { - -} - -// LoadFromEnv is used to initialize configuration items from env. -func (pt *ParamTable) LoadFromEnv() { - Params.IP = funcutil.GetLocalIP() -} - -// initParams initializes params of the configuration items. -func (pt *ParamTable) initParams() { - pt.LoadFromEnv() - pt.LoadFromArgs() - pt.initPort() - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() -} - -// initPort initializes the port of IndexNode service. -func (pt *ParamTable) initPort() { - port := pt.ParseInt("indexNode.port") - pt.Port = port - if !funcutil.CheckPortAvailable(pt.Port) { - pt.Port = funcutil.GetAvailablePort() - log.Warn("IndexNode init", zap.Any("Port", pt.Port)) - } -} - -func (pt *ParamTable) initServerMaxSendSize() { - var err error - - valueStr, err := pt.Load("indexNode.grpc.serverMaxSendSize") - if err != nil { // not set - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse indexNode.grpc.serverMaxSendSize, set to default", - zap.String("indexNode.grpc.serverMaxSendSize", valueStr), - zap.Error(err)) - - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } else { - pt.ServerMaxSendSize = value - } - - log.Debug("initServerMaxSendSize", - zap.Int("indexNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) -} - -func (pt *ParamTable) initServerMaxRecvSize() { - var err error - - valueStr, err := pt.Load("indexNode.grpc.serverMaxRecvSize") - if err != nil { // not set - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse indexNode.grpc.serverMaxRecvSize, set to default", - zap.String("indexNode.grpc.serverMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } else { - pt.ServerMaxRecvSize = value - } - - log.Debug("initServerMaxRecvSize", - zap.Int("indexNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) -} diff --git a/internal/distributed/indexnode/param_table_test.go b/internal/distributed/indexnode/param_table_test.go deleted file mode 100644 index 2200eac81e..0000000000 --- a/internal/distributed/indexnode/param_table_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcindexnode - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) - log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) - - Params.Remove("indexNode.grpc.ServerMaxSendSize") - Params.initServerMaxSendSize() - assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize) - - Params.Remove("indexNode.grpc.ServerMaxRecvSize") - Params.initServerMaxRecvSize() - assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize) -} diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 6206179626..8f74963cb9 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -25,9 +25,8 @@ import ( "sync" "time" - "github.com/milvus-io/milvus/internal/types" - "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/keepalive" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" @@ -37,11 +36,15 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/trace" - "google.golang.org/grpc" + "github.com/milvus-io/milvus/internal/util/typeutil" ) +var Params paramtable.GrpcServerConfig + // Server is the grpc wrapper of IndexNode. type Server struct { indexnode types.IndexNode @@ -73,7 +76,7 @@ func (s *Server) Run() error { func (s *Server) startGrpcLoop(grpcPort int) { defer s.loopWg.Done() - log.Debug("IndexNode", zap.String("network address", Params.Address), zap.Int("network port: ", grpcPort)) + log.Debug("IndexNode", zap.String("network address", Params.GetAddress()), zap.Int("network port: ", grpcPort)) lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) if err != nil { log.Warn("IndexNode", zap.String("GrpcServer:failed to listen", err.Error())) @@ -112,18 +115,16 @@ func (s *Server) startGrpcLoop(grpcPort int) { // init initializes IndexNode's grpc service. func (s *Server) init() error { var err error - Params.Init() + Params.InitOnce(typeutil.IndexNodeRole) indexnode.Params.InitOnce() indexnode.Params.Port = Params.Port indexnode.Params.IP = Params.IP - indexnode.Params.Address = Params.Address + indexnode.Params.Address = Params.GetAddress() closer := trace.InitTracing(fmt.Sprintf("IndexNode-%d", indexnode.Params.NodeID)) s.closer = closer - Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10) - defer func() { if err != nil { err = s.Stop() @@ -168,7 +169,7 @@ func (s *Server) start() error { // Stop stops IndexNode's grpc service. func (s *Server) Stop() error { - log.Debug("IndexNode stop", zap.String("Address", Params.Address)) + log.Debug("IndexNode stop", zap.String("Address", Params.GetAddress())) if s.closer != nil { if err := s.closer.Close(); err != nil { return err diff --git a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go index f09e0f0aa1..224712e64e 100644 --- a/internal/distributed/proxy/client/client.go +++ b/internal/distributed/proxy/client/client.go @@ -26,10 +26,13 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/typeutil" "google.golang.org/grpc" ) +var Params paramtable.GrpcClientConfig + // Client is the grpc client for Proxy type Client struct { grpcClient grpcclient.GrpcClient @@ -41,7 +44,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { if addr == "" { return nil, fmt.Errorf("address is empty") } - Params.Init() + Params.InitOnce(typeutil.ProxyRole) client := &Client{ addr: addr, grpcClient: &grpcclient.ClientBase{ diff --git a/internal/distributed/proxy/client/param_table.go b/internal/distributed/proxy/client/param_table.go deleted file mode 100644 index 1aa3dba850..0000000000 --- a/internal/distributed/proxy/client/param_table.go +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcproxyclient - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - ClientMaxSendSize int - ClientMaxRecvSize int -} - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// Init is an override method of BaseTable's Init. It mainly calls the -// Init of BaseTable and do some other initialization. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - - pt.initClientMaxSendSize() - pt.initClientMaxRecvSize() - }) -} - -func (pt *ParamTable) initClientMaxSendSize() { - var err error - - valueStr, err := pt.Load("proxy.grpc.clientMaxSendSize") - if err != nil { // not set - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse proxy.grpc.clientMaxSendSize, set to default", - zap.String("proxy.grpc.clientMaxSendSize", valueStr), - zap.Error(err)) - - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } else { - pt.ClientMaxSendSize = value - } - - log.Debug("initClientMaxSendSize", - zap.Int("proxy.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) -} - -func (pt *ParamTable) initClientMaxRecvSize() { - var err error - - valueStr, err := pt.Load("proxy.grpc.clientMaxRecvSize") - if err != nil { // not set - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse proxy.grpc.clientMaxRecvSize, set to default", - zap.String("proxy.grpc.clientMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } else { - pt.ClientMaxRecvSize = value - } - - log.Debug("initClientMaxRecvSize", - zap.Int("proxy.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) -} diff --git a/internal/distributed/proxy/client/param_table_test.go b/internal/distributed/proxy/client/param_table_test.go deleted file mode 100644 index 20b3108b36..0000000000 --- a/internal/distributed/proxy/client/param_table_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcproxyclient - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) - log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) - - Params.Remove("proxy.grpc.clientMaxSendSize") - Params.initClientMaxSendSize() - assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize) - - Params.Remove("proxy.grpc.clientMaxRecvSize") - Params.initClientMaxRecvSize() - assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize) -} diff --git a/internal/distributed/proxy/param_table.go b/internal/distributed/proxy/param_table.go deleted file mode 100644 index 76982107c3..0000000000 --- a/internal/distributed/proxy/param_table.go +++ /dev/null @@ -1,123 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcproxy - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - IP string - Port int - Address string - - ServerMaxSendSize int - ServerMaxRecvSize int -} - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// Init is an override method of BaseTable's Init. It mainly calls the -// Init of BaseTable and do some other initialization. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - pt.initParams() - pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) - }) -} - -func (pt *ParamTable) loadFromArgs() { - -} - -func (pt *ParamTable) loadFromEnv() { - pt.IP = funcutil.GetLocalIP() -} - -func (pt *ParamTable) initParams() { - pt.loadFromEnv() - pt.loadFromArgs() - pt.initPort() - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() -} - -func (pt *ParamTable) initPort() { - port := pt.ParseInt("proxy.port") - pt.Port = port -} - -func (pt *ParamTable) initServerMaxSendSize() { - var err error - - valueStr, err := pt.Load("proxy.grpc.serverMaxSendSize") - if err != nil { // not set - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse proxy.grpc.serverMaxSendSize, set to default", - zap.String("proxy.grpc.serverMaxSendSize", valueStr), - zap.Error(err)) - - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } else { - pt.ServerMaxSendSize = value - } - - log.Debug("initServerMaxSendSize", - zap.Int("proxy.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) -} - -func (pt *ParamTable) initServerMaxRecvSize() { - var err error - - valueStr, err := pt.Load("proxy.grpc.serverMaxRecvSize") - if err != nil { // not set - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse proxy.grpc.serverMaxRecvSize, set to default", - zap.String("proxy.grpc.serverMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } else { - pt.ServerMaxRecvSize = value - } - - log.Debug("initServerMaxRecvSize", - zap.Int("proxy.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) -} diff --git a/internal/distributed/proxy/param_table_test.go b/internal/distributed/proxy/param_table_test.go deleted file mode 100644 index ef193ce628..0000000000 --- a/internal/distributed/proxy/param_table_test.go +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcproxy - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) - log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) - - Params.Remove("proxy.grpc.serverMaxSendSize") - Params.initServerMaxSendSize() - assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize) - - Params.Remove("proxy.grpc.serverMaxRecvSize") - Params.initServerMaxRecvSize() - assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize) - - Params.loadFromEnv() - assert.Equal(t, Params.IP, funcutil.GetLocalIP()) -} diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 8b8bf7348a..2d84582840 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -43,7 +43,9 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proxy" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/trace" + "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/opentracing/opentracing-go" "google.golang.org/grpc/keepalive" ) @@ -53,6 +55,8 @@ const ( GRPCMaxMagSize = 2 << 30 ) +var Params paramtable.GrpcServerConfig + // Server is the Proxy Server type Server struct { ctx context.Context @@ -150,11 +154,7 @@ func (s *Server) Run() error { func (s *Server) init() error { var err error - Params.Init() - if !funcutil.CheckPortAvailable(Params.Port) { - Params.Port = funcutil.GetAvailablePort() - log.Warn("Proxy init", zap.Any("Port", Params.Port)) - } + Params.InitOnce(typeutil.ProxyRole) proxy.Params.InitOnce() log.Debug("init params done ...") @@ -163,14 +163,14 @@ func (s *Server) init() error { proxy.Params.NetworkPort = Params.Port proxy.Params.IP = Params.IP - proxy.Params.NetworkAddress = Params.Address + proxy.Params.NetworkAddress = Params.GetAddress() closer := trace.InitTracing(fmt.Sprintf("proxy ip: %s, port: %d", Params.IP, Params.Port)) s.closer = closer log.Debug("proxy", zap.String("proxy host", Params.IP)) log.Debug("proxy", zap.Int("proxy port", Params.Port)) - log.Debug("proxy", zap.String("proxy address", Params.Address)) + log.Debug("proxy", zap.String("proxy address", Params.GetAddress())) s.wg.Add(1) go s.startGrpcLoop(Params.Port) @@ -272,7 +272,7 @@ func (s *Server) start() error { // Stop stop the Proxy Server func (s *Server) Stop() error { - log.Debug("Proxy stop", zap.String("Address", Params.Address)) + log.Debug("Proxy stop", zap.String("Address", Params.GetAddress())) var err error if s.closer != nil { if err = s.closer.Close(); err != nil { diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index 2df37177d7..a90d2af59b 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -27,12 +27,15 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" "google.golang.org/grpc" ) +var Params paramtable.GrpcClientConfig + // Client is the grpc client of QueryCoord. type Client struct { grpcClient grpcclient.GrpcClient @@ -47,7 +50,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C log.Debug("QueryCoordClient NewClient failed", zap.Error(err)) return nil, err } - Params.Init() + Params.InitOnce(typeutil.QueryCoordRole) client := &Client{ grpcClient: &grpcclient.ClientBase{ ClientMaxRecvSize: Params.ClientMaxRecvSize, diff --git a/internal/distributed/querycoord/client/param_table.go b/internal/distributed/querycoord/client/param_table.go deleted file mode 100644 index 18bab784cb..0000000000 --- a/internal/distributed/querycoord/client/param_table.go +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcquerycoordclient - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - ClientMaxSendSize int - ClientMaxRecvSize int -} - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// Init is an override method of BaseTable's Init. It mainly calls the -// Init of BaseTable and do some other initialization. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - - pt.initClientMaxSendSize() - pt.initClientMaxRecvSize() - }) -} - -func (pt *ParamTable) initClientMaxSendSize() { - var err error - - valueStr, err := pt.Load("queryCoord.grpc.clientMaxSendSize") - if err != nil { // not set - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse queryCoord.grpc.clientMaxSendSize, set to default", - zap.String("queryCoord.grpc.clientMaxSendSize", valueStr), - zap.Error(err)) - - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } else { - pt.ClientMaxSendSize = value - } - - log.Debug("initClientMaxSendSize", - zap.Int("queryCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) -} - -func (pt *ParamTable) initClientMaxRecvSize() { - var err error - - valueStr, err := pt.Load("queryCoord.grpc.clientMaxRecvSize") - if err != nil { // not set - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse queryCoord.grpc.clientMaxRecvSize, set to default", - zap.String("queryCoord.grpc.clientMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } else { - pt.ClientMaxRecvSize = value - } - - log.Debug("initClientMaxRecvSize", - zap.Int("queryCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) -} diff --git a/internal/distributed/querycoord/client/param_table_test.go b/internal/distributed/querycoord/client/param_table_test.go deleted file mode 100644 index 87db926e81..0000000000 --- a/internal/distributed/querycoord/client/param_table_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcquerycoordclient - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) - log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) - - Params.Remove("queryCoord.grpc.clientMaxSendSize") - Params.initClientMaxSendSize() - assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize) - - Params.Remove("queryCoord.grpc.clientMaxRecvSize") - Params.initClientMaxRecvSize() - assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize) -} diff --git a/internal/distributed/querycoord/param_table.go b/internal/distributed/querycoord/param_table.go deleted file mode 100644 index f0e1b14b83..0000000000 --- a/internal/distributed/querycoord/param_table.go +++ /dev/null @@ -1,122 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcquerycoord - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/paramtable" - "go.uber.org/zap" -) - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - IP string - Port int - Address string - ServerMaxSendSize int - ServerMaxRecvSize int -} - -// Init is an override method of BaseTable's Init. It mainly calls the -// Init of BaseTable and do some other initialization. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - pt.initParams() - pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) - }) -} - -// initParams initializes params of the configuration items. -func (pt *ParamTable) initParams() { - pt.LoadFromEnv() - pt.LoadFromArgs() - pt.initPort() - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() -} - -func (pt *ParamTable) initPort() { - pt.Port = pt.ParseInt("queryCoord.port") -} - -func (pt *ParamTable) LoadFromEnv() { - pt.IP = funcutil.GetLocalIP() -} - -// LoadFromArgs is used to initialize configuration items from args. -func (pt *ParamTable) LoadFromArgs() { - -} - -func (pt *ParamTable) initServerMaxSendSize() { - var err error - - valueStr, err := pt.Load("queryCoord.grpc.serverMaxSendSize") - if err != nil { // not set - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse queryCoord.grpc.serverMaxSendSize, set to default", - zap.String("queryCoord.grpc.serverMaxSendSize", valueStr), - zap.Error(err)) - - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } else { - pt.ServerMaxSendSize = value - } - - log.Debug("initServerMaxSendSize", - zap.Int("queryCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) -} - -func (pt *ParamTable) initServerMaxRecvSize() { - var err error - - valueStr, err := pt.Load("queryCoord.grpc.serverMaxRecvSize") - if err != nil { // not set - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse queryCoord.grpc.serverMaxRecvSize, set to default", - zap.String("queryCoord.grpc.serverMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } else { - pt.ServerMaxRecvSize = value - } - - log.Debug("initServerMaxRecvSize", - zap.Int("queryCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) -} diff --git a/internal/distributed/querycoord/param_table_test.go b/internal/distributed/querycoord/param_table_test.go deleted file mode 100644 index 7437a7c817..0000000000 --- a/internal/distributed/querycoord/param_table_test.go +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcquerycoord - -import ( - "testing" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) - log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) - - Params.Remove("queryCoord.grpc.ServerMaxSendSize") - Params.initServerMaxSendSize() - assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize) - - Params.Remove("queryCoord.grpc.ServerMaxRecvSize") - Params.initServerMaxRecvSize() - assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize) -} diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 7c1089246a..666d3ea453 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -40,10 +40,14 @@ import ( qc "github.com/milvus-io/milvus/internal/querycoord" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/trace" + "github.com/milvus-io/milvus/internal/util/typeutil" "google.golang.org/grpc/keepalive" ) +var Params paramtable.GrpcServerConfig + // Server is the grpc server of QueryCoord. type Server struct { wg sync.WaitGroup @@ -99,10 +103,10 @@ func (s *Server) Run() error { // init initializes QueryCoord's grpc service. func (s *Server) init() error { - Params.Init() + Params.InitOnce(typeutil.QueryCoordRole) qc.Params.InitOnce() - qc.Params.Address = Params.Address + qc.Params.Address = Params.GetAddress() qc.Params.Port = Params.Port closer := trace.InitTracing("querycoord") @@ -266,7 +270,7 @@ func (s *Server) start() error { // Stop stops QueryCoord's grpc service. func (s *Server) Stop() error { - log.Debug("QueryCoord stop", zap.String("Address", Params.Address)) + log.Debug("QueryCoord stop", zap.String("Address", Params.GetAddress())) if s.closer != nil { if err := s.closer.Close(); err != nil { return err diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 0a2b3f54ef..3939bdc63a 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -26,10 +26,13 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/typeutil" "google.golang.org/grpc" ) +var Params paramtable.GrpcClientConfig + // Client is the grpc client of QueryNode. type Client struct { grpcClient grpcclient.GrpcClient @@ -41,7 +44,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { if addr == "" { return nil, fmt.Errorf("addr is empty") } - Params.Init() + Params.InitOnce(typeutil.QueryNodeRole) client := &Client{ addr: addr, grpcClient: &grpcclient.ClientBase{ diff --git a/internal/distributed/querynode/client/param_table.go b/internal/distributed/querynode/client/param_table.go deleted file mode 100644 index cd4319164a..0000000000 --- a/internal/distributed/querynode/client/param_table.go +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcquerynodeclient - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - ClientMaxSendSize int - ClientMaxRecvSize int -} - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// Init is an override method of BaseTable's Init. It mainly calls the -// Init of BaseTable and do some other initialization. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - - pt.initClientMaxSendSize() - pt.initClientMaxRecvSize() - }) -} - -func (pt *ParamTable) initClientMaxSendSize() { - var err error - - valueStr, err := pt.Load("queryNode.grpc.clientMaxSendSize") - if err != nil { // not set - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse queryNode.grpc.clientMaxSendSize, set to default", - zap.String("queryNode.grpc.clientMaxSendSize", valueStr), - zap.Error(err)) - - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } else { - pt.ClientMaxSendSize = value - } - - log.Debug("initClientMaxSendSize", - zap.Int("queryNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) -} - -func (pt *ParamTable) initClientMaxRecvSize() { - var err error - - valueStr, err := pt.Load("queryNode.grpc.clientMaxRecvSize") - if err != nil { // not set - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse queryNode.grpc.clientMaxRecvSize, set to default", - zap.String("queryNode.grpc.clientMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } else { - pt.ClientMaxRecvSize = value - } - - log.Debug("initClientMaxRecvSize", - zap.Int("queryNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) -} diff --git a/internal/distributed/querynode/client/param_table_test.go b/internal/distributed/querynode/client/param_table_test.go deleted file mode 100644 index f70979bcf0..0000000000 --- a/internal/distributed/querynode/client/param_table_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcquerynodeclient - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) - log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) - - Params.Remove("queryNode.grpc.clientMaxSendSize") - Params.initClientMaxSendSize() - assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize) - - Params.Remove("queryNode.grpc.clientMaxRecvSize") - Params.initClientMaxRecvSize() - assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize) -} diff --git a/internal/distributed/querynode/param_table.go b/internal/distributed/querynode/param_table.go deleted file mode 100644 index ab0955ffcd..0000000000 --- a/internal/distributed/querynode/param_table.go +++ /dev/null @@ -1,130 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcquerynode - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - IP string - Port int - Address string - QueryNodeID UniqueID - - ServerMaxSendSize int - ServerMaxRecvSize int -} - -// Init is used to initialize configuration items. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - pt.initParams() - pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) - }) -} - -// initParams initializes params of the configuration items. -func (pt *ParamTable) initParams() { - pt.LoadFromEnv() - pt.LoadFromArgs() - pt.initPort() - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() -} - -// LoadFromArgs is used to initialize configuration items from args. -func (pt *ParamTable) LoadFromArgs() { - -} - -// LoadFromEnv is used to initialize configuration items from env. -func (pt *ParamTable) LoadFromEnv() { - pt.IP = funcutil.GetLocalIP() -} - -func (pt *ParamTable) initPort() { - port := pt.ParseInt("queryNode.port") - pt.Port = port - if !funcutil.CheckPortAvailable(pt.Port) { - pt.Port = funcutil.GetAvailablePort() - log.Warn("QueryNode init", zap.Any("Port", pt.Port)) - } -} - -func (pt *ParamTable) initServerMaxSendSize() { - var err error - - valueStr, err := pt.Load("queryNode.grpc.serverMaxSendSize") - if err != nil { // not set - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse queryNode.grpc.serverMaxSendSize, set to default", - zap.String("queryNode.grpc.serverMaxSendSize", valueStr), - zap.Error(err)) - - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } else { - pt.ServerMaxSendSize = value - } - - log.Debug("initServerMaxSendSize", - zap.Int("queryNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) -} - -func (pt *ParamTable) initServerMaxRecvSize() { - var err error - - valueStr, err := pt.Load("queryNode.grpc.serverMaxRecvSize") - if err != nil { // not set - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse queryNode.grpc.serverMaxRecvSize, set to default", - zap.String("queryNode.grpc.serverMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } else { - pt.ServerMaxRecvSize = value - } - - log.Debug("initServerMaxRecvSize", - zap.Int("queryNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) -} diff --git a/internal/distributed/querynode/param_table_test.go b/internal/distributed/querynode/param_table_test.go deleted file mode 100644 index 84d558e5e1..0000000000 --- a/internal/distributed/querynode/param_table_test.go +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcquerynode - -import ( - "testing" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/stretchr/testify/assert" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) - log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) - - Params.Remove("queryNode.grpc.ServerMaxSendSize") - Params.initServerMaxSendSize() - assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize) - - Params.Remove("queryNode.grpc.ServerMaxRecvSize") - Params.initServerMaxRecvSize() - assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize) - - Params.LoadFromEnv() - assert.Equal(t, Params.IP, funcutil.GetLocalIP()) -} diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 5bf5e9e57b..76a4809fa6 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -25,10 +25,6 @@ import ( "sync" "time" - "github.com/milvus-io/milvus/internal/util/retry" - - "github.com/milvus-io/milvus/internal/types" - "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -43,11 +39,16 @@ import ( "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/querypb" qn "github.com/milvus-io/milvus/internal/querynode" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" ) +var Params paramtable.GrpcServerConfig + // UniqueID is an alias for type typeutil.UniqueID, used as a unique identifier for the request. type UniqueID = typeutil.UniqueID @@ -82,12 +83,12 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) // init initializes QueryNode's grpc service. func (s *Server) init() error { - Params.Init() + Params.InitOnce(typeutil.QueryNodeRole) qn.Params.InitOnce() qn.Params.QueryNodeIP = Params.IP qn.Params.QueryNodePort = int64(Params.Port) - qn.Params.QueryNodeID = Params.QueryNodeID + //qn.Params.QueryNodeID = Params.QueryNodeID closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.IP, Params.Port)) s.closer = closer @@ -256,7 +257,7 @@ func (s *Server) Run() error { // Stop stops QueryNode's grpc service. func (s *Server) Stop() error { - log.Debug("QueryNode stop", zap.String("Address", Params.Address)) + log.Debug("QueryNode stop", zap.String("Address", Params.GetAddress())) if s.closer != nil { if err := s.closer.Close(); err != nil { return err diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index c873159e6a..6ceffd45fd 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -29,12 +29,15 @@ import ( "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/grpcclient" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" "google.golang.org/grpc" ) +var Params paramtable.GrpcClientConfig + // Client grpc client type Client struct { grpcClient grpcclient.GrpcClient @@ -53,7 +56,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C log.Debug("QueryCoordClient NewClient failed", zap.Error(err)) return nil, err } - Params.Init() + Params.InitOnce(typeutil.RootCoordRole) client := &Client{ grpcClient: &grpcclient.ClientBase{ ClientMaxRecvSize: Params.ClientMaxRecvSize, diff --git a/internal/distributed/rootcoord/client/param_table.go b/internal/distributed/rootcoord/client/param_table.go deleted file mode 100644 index 287a3c3964..0000000000 --- a/internal/distributed/rootcoord/client/param_table.go +++ /dev/null @@ -1,96 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcrootcoordclient - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/util/paramtable" -) - -// ParamTable structure stored all parameters -type ParamTable struct { - paramtable.BaseTable - - ClientMaxSendSize int - ClientMaxRecvSize int -} - -// Params rootcoord parameter table -var Params ParamTable -var once sync.Once - -// Init initialize param table -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - - pt.initClientMaxSendSize() - pt.initClientMaxRecvSize() - }) -} - -func (pt *ParamTable) initClientMaxSendSize() { - var err error - - valueStr, err := pt.Load("rootCoord.grpc.clientMaxSendSize") - if err != nil { // not set - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse rootCoord.grpc.clientMaxSendSize, set to default", - zap.String("rootCoord.grpc.clientMaxSendSize", valueStr), - zap.Error(err)) - - pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize - } else { - pt.ClientMaxSendSize = value - } - - log.Debug("initClientMaxSendSize", - zap.Int("rootCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) -} - -func (pt *ParamTable) initClientMaxRecvSize() { - var err error - - valueStr, err := pt.Load("rootCoord.grpc.clientMaxRecvSize") - if err != nil { // not set - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse rootCoord.grpc.clientMaxRecvSize, set to default", - zap.String("rootCoord.grpc.clientMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize - } else { - pt.ClientMaxRecvSize = value - } - - log.Debug("initClientMaxRecvSize", - zap.Int("rootCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) -} diff --git a/internal/distributed/rootcoord/client/param_table_test.go b/internal/distributed/rootcoord/client/param_table_test.go deleted file mode 100644 index 058d5bf5ea..0000000000 --- a/internal/distributed/rootcoord/client/param_table_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcrootcoordclient - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) - log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) - - Params.Remove("rootCoord.grpc.clientMaxSendSize") - Params.initClientMaxSendSize() - assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize) - - Params.Remove("rootCoord.grpc.clientMaxRecvSize") - Params.initClientMaxRecvSize() - assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize) -} diff --git a/internal/distributed/rootcoord/param_table.go b/internal/distributed/rootcoord/param_table.go deleted file mode 100644 index 4594b94de1..0000000000 --- a/internal/distributed/rootcoord/param_table.go +++ /dev/null @@ -1,124 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcrootcoord - -import ( - "strconv" - "sync" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/funcutil" - "github.com/milvus-io/milvus/internal/util/paramtable" - "go.uber.org/zap" -) - -// Params is a package scoped variable of type ParamTable. -var Params ParamTable -var once sync.Once - -// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by -// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration. -type ParamTable struct { - paramtable.BaseTable - - IP string - Port int - Address string - - ServerMaxSendSize int - ServerMaxRecvSize int -} - -// Init is an override method of BaseTable's Init. It mainly calls the -// Init of BaseTable and do some other initialization. -func (pt *ParamTable) Init() { - once.Do(func() { - pt.BaseTable.Init() - pt.initParams() - pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10) - }) -} - -// initParams initializes params of the configuration items. -func (pt *ParamTable) initParams() { - pt.LoadFromEnv() - pt.LoadFromArgs() - pt.initPort() - pt.initServerMaxSendSize() - pt.initServerMaxRecvSize() -} - -// LoadFromEnv is used to initialize configuration items from env. -func (pt *ParamTable) LoadFromEnv() { - pt.IP = funcutil.GetLocalIP() -} - -// LoadFromArgs is used to initialize configuration items from args. -func (pt *ParamTable) LoadFromArgs() { - -} - -func (pt *ParamTable) initPort() { - pt.Port = pt.ParseInt("rootCoord.port") -} - -func (pt *ParamTable) initServerMaxSendSize() { - var err error - - valueStr, err := pt.Load("rootCoord.grpc.serverMaxSendSize") - if err != nil { // not set - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse rootCoord.grpc.serverMaxSendSize, set to default", - zap.String("rootCoord.grpc.serverMaxSendSize", valueStr), - zap.Error(err)) - - pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize - } else { - pt.ServerMaxSendSize = value - } - - log.Debug("initServerMaxSendSize", - zap.Int("rootCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) -} - -func (pt *ParamTable) initServerMaxRecvSize() { - var err error - - valueStr, err := pt.Load("rootCoord.grpc.serverMaxRecvSize") - if err != nil { // not set - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } - - value, err := strconv.Atoi(valueStr) - if err != nil { // not in valid format - log.Warn("Failed to parse rootCoord.grpc.serverMaxRecvSize, set to default", - zap.String("rootCoord.grpc.serverMaxRecvSize", valueStr), - zap.Error(err)) - - pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize - } else { - pt.ServerMaxRecvSize = value - } - - log.Debug("initServerMaxRecvSize", - zap.Int("rootCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) -} diff --git a/internal/distributed/rootcoord/param_table_test.go b/internal/distributed/rootcoord/param_table_test.go deleted file mode 100644 index 765f1742bc..0000000000 --- a/internal/distributed/rootcoord/param_table_test.go +++ /dev/null @@ -1,48 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpcrootcoord - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" - - "github.com/stretchr/testify/assert" -) - -func TestParamTable(t *testing.T) { - Params.Init() - - assert.NotEqual(t, Params.Address, "") - t.Logf("master address = %s", Params.Address) - - assert.NotEqual(t, Params.Port, 0) - t.Logf("master port = %d", Params.Port) - - log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) - log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) - - Params.Remove("rootCoord.grpc.ServerMaxSendSize") - Params.initServerMaxSendSize() - assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize) - - Params.Remove("rootCoord.grpc.ServerMaxRecvSize") - Params.initServerMaxRecvSize() - assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize) -} diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index a223ca0671..430d2a6eff 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client" @@ -43,11 +44,14 @@ import ( "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/trace" - "google.golang.org/grpc/keepalive" + "github.com/milvus-io/milvus/internal/util/typeutil" ) +var Params paramtable.GrpcServerConfig + // Server grpc wrapper type Server struct { rootCoord types.RootCoordComponent @@ -141,10 +145,10 @@ func (s *Server) Run() error { } func (s *Server) init() error { - Params.Init() + Params.InitOnce(typeutil.RootCoordRole) rootcoord.Params.InitOnce() - rootcoord.Params.Address = Params.Address + rootcoord.Params.Address = Params.GetAddress() rootcoord.Params.Port = Params.Port log.Debug("grpc init done ...") @@ -264,7 +268,7 @@ func (s *Server) start() error { } func (s *Server) Stop() error { - log.Debug("Rootcoord stop", zap.String("Address", Params.Address)) + log.Debug("Rootcoord stop", zap.String("Address", Params.GetAddress())) if s.closer != nil { if err := s.closer.Close(); err != nil { log.Error("Failed to close opentracing", zap.Error(err)) diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index 571f82fc8d..775053d181 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -22,8 +22,6 @@ import ( "fmt" "math/rand" "path" - "strconv" - "strings" "sync" "testing" "time" @@ -71,13 +69,9 @@ func TestGrpcService(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - Params.Init() + Params.InitOnce(typeutil.RootCoordRole) Params.Port = (randVal % 100) + 10000 - parts := strings.Split(Params.Address, ":") - if len(parts) == 2 { - Params.Address = parts[0] + ":" + strconv.Itoa(Params.Port) - t.Log("newParams.Address:", Params.Address) - } + t.Log("newParams.Address:", Params.GetAddress()) ctx := context.Background() msFactory := msgstream.NewPmsFactory() @@ -95,7 +89,7 @@ func TestGrpcService(t *testing.T) { rootcoord.Params.DefaultPartitionName = "_default" rootcoord.Params.DefaultIndexName = "_default" - t.Logf("master service port = %d", Params.Port) + t.Logf("service port = %d", Params.Port) core, ok := (svr.rootCoord).(*rootcoord.Core) assert.True(t, ok) @@ -119,7 +113,7 @@ func TestGrpcService(t *testing.T) { _, err = etcdCli.Put(ctx, path.Join(sessKey, typeutil.ProxyRole+"-100"), string(pnb)) assert.Nil(t, err) - rootcoord.Params.Address = Params.Address + rootcoord.Params.Address = Params.GetAddress() err = core.Init() assert.Nil(t, err) @@ -895,7 +889,7 @@ func TestRun(t *testing.T) { cancel: cancel, grpcErrChan: make(chan error), } - Params.Init() + Params.InitOnce(typeutil.RootCoordRole) Params.Port = 1000000 err := svr.Run() assert.NotNil(t, err) diff --git a/internal/util/funcutil/func.go b/internal/util/funcutil/func.go index ff719b9fb2..f45b5472f1 100644 --- a/internal/util/funcutil/func.go +++ b/internal/util/funcutil/func.go @@ -17,9 +17,7 @@ import ( "errors" "fmt" "io/ioutil" - "net" "net/http" - "strconv" "time" "github.com/go-basic/ipv4" @@ -41,27 +39,6 @@ func CheckGrpcReady(ctx context.Context, targetCh chan error) { } } -// CheckPortAvailable check if a port is available to be listened on -func CheckPortAvailable(port int) bool { - addr := ":" + strconv.Itoa(port) - listener, err := net.Listen("tcp", addr) - if listener != nil { - listener.Close() - } - return err == nil -} - -// GetAvailablePort return an available port that can be listened on -func GetAvailablePort() int { - listener, err := net.Listen("tcp", ":0") - if err != nil { - panic(err) - } - defer listener.Close() - - return listener.Addr().(*net.TCPAddr).Port -} - // GetLocalIP return the local ip address func GetLocalIP() string { return ipv4.LocalIP() diff --git a/internal/util/funcutil/func_test.go b/internal/util/funcutil/func_test.go index 7b032a18f8..266e419e57 100644 --- a/internal/util/funcutil/func_test.go +++ b/internal/util/funcutil/func_test.go @@ -99,15 +99,6 @@ func Test_CheckGrpcReady(t *testing.T) { cancel() } -func Test_CheckPortAvailable(t *testing.T) { - num := 10 - - for i := 0; i < num; i++ { - port := GetAvailablePort() - assert.Equal(t, CheckPortAvailable(port), true) - } -} - func Test_GetLocalIP(t *testing.T) { ip := GetLocalIP() assert.NotNil(t, ip) diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go new file mode 100644 index 0000000000..b1707f11c3 --- /dev/null +++ b/internal/util/paramtable/global_param.go @@ -0,0 +1,252 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package paramtable + +import ( + "math" + "net" + "strconv" + "sync" + + "go.uber.org/zap" + + "github.com/go-basic/ipv4" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +const ( + // DefaultServerMaxSendSize defines the maximum size of data per grpc request can send by server side. + DefaultServerMaxSendSize = math.MaxInt32 + + // DefaultServerMaxRecvSize defines the maximum size of data per grpc request can receive by server side. + DefaultServerMaxRecvSize = math.MaxInt32 + + // DefaultClientMaxSendSize defines the maximum size of data per grpc request can send by client side. + DefaultClientMaxSendSize = 100 * 1024 * 1024 + + // DefaultClientMaxRecvSize defines the maximum size of data per grpc request can receive by client side. + DefaultClientMaxRecvSize = 100 * 1024 * 1024 +) + +/////////////////////////////////////////////////////////////////////////////// +// -- grpc --- +type grpcConfig struct { + BaseParamTable + + once sync.Once + Domain string + IP string + Port int + Listener net.Listener +} + +func (p *grpcConfig) init(domain string) { + p.BaseParamTable.Init() + p.Domain = domain + + p.LoadFromEnv() + p.LoadFromArgs() + p.initPort() + p.initListener() +} + +// LoadFromEnv is used to initialize configuration items from env. +func (p *grpcConfig) LoadFromEnv() { + p.IP = ipv4.LocalIP() +} + +// LoadFromArgs is used to initialize configuration items from args. +func (p *grpcConfig) LoadFromArgs() { + +} + +func (p *grpcConfig) initPort() { + p.Port = p.ParseInt(p.Domain + ".port") + + if p.Domain == typeutil.ProxyRole || p.Domain == typeutil.DataNodeRole || p.Domain == typeutil.IndexNodeRole || p.Domain == typeutil.QueryNodeRole { + if !CheckPortAvailable(p.Port) { + p.Port = GetAvailablePort() + log.Warn("get available port when init", zap.String("Domain", p.Domain), zap.Int("Port", p.Port)) + } + } +} + +// GetAddress return grpc address +func (p *grpcConfig) GetAddress() string { + return p.IP + ":" + strconv.Itoa(p.Port) +} + +func (p *grpcConfig) initListener() { + if p.Domain == typeutil.DataNodeRole { + listener, err := net.Listen("tcp", p.GetAddress()) + if err != nil { + panic(err) + } + p.Listener = listener + } +} + +type GrpcServerConfig struct { + grpcConfig + + ServerMaxSendSize int + ServerMaxRecvSize int +} + +// InitOnce initialize grpc server config once +func (p *GrpcServerConfig) InitOnce(domain string) { + p.once.Do(func() { + p.init(domain) + }) +} + +func (p *GrpcServerConfig) init(domain string) { + p.grpcConfig.init(domain) + + p.initServerMaxSendSize() + p.initServerMaxRecvSize() +} + +func (p *GrpcServerConfig) initServerMaxSendSize() { + var err error + + valueStr, err := p.Load(p.Domain + ".grpc.serverMaxSendSize") + if err != nil { + p.ServerMaxSendSize = DefaultServerMaxSendSize + } + + value, err := strconv.Atoi(valueStr) + if err != nil { + log.Warn("Failed to parse grpc.serverMaxSendSize, set to default", + zap.String("rol", p.Domain), zap.String("grpc.serverMaxSendSize", valueStr), + zap.Error(err)) + + p.ServerMaxSendSize = DefaultServerMaxSendSize + } else { + p.ServerMaxSendSize = value + } + + log.Debug("initServerMaxSendSize", + zap.String("role", p.Domain), zap.Int("grpc.serverMaxSendSize", p.ServerMaxSendSize)) +} + +func (p *GrpcServerConfig) initServerMaxRecvSize() { + var err error + + valueStr, err := p.Load(p.Domain + ".grpc.serverMaxRecvSize") + if err != nil { + p.ServerMaxRecvSize = DefaultServerMaxRecvSize + } + + value, err := strconv.Atoi(valueStr) + if err != nil { + log.Warn("Failed to parse grpc.serverMaxRecvSize, set to default", + zap.String("role", p.Domain), zap.String("grpc.serverMaxRecvSize", valueStr), + zap.Error(err)) + + p.ServerMaxRecvSize = DefaultServerMaxRecvSize + } else { + p.ServerMaxRecvSize = value + } + + log.Debug("initServerMaxRecvSize", + zap.String("role", p.Domain), zap.Int("grpc.serverMaxRecvSize", p.ServerMaxRecvSize)) +} + +type GrpcClientConfig struct { + grpcConfig + + ClientMaxSendSize int + ClientMaxRecvSize int +} + +// InitOnce initialize grpc client config once +func (p *GrpcClientConfig) InitOnce(domain string) { + p.once.Do(func() { + p.init(domain) + }) +} + +func (p *GrpcClientConfig) init(domain string) { + p.grpcConfig.init(domain) + + p.initClientMaxSendSize() + p.initClientMaxRecvSize() +} + +func (p *GrpcClientConfig) initClientMaxSendSize() { + var err error + + valueStr, err := p.Load(p.Domain + ".grpc.clientMaxSendSize") + if err != nil { + p.ClientMaxSendSize = DefaultClientMaxSendSize + } + + value, err := strconv.Atoi(valueStr) + if err != nil { + log.Warn("Failed to parse grpc.clientMaxSendSize, set to default", + zap.String("role", p.Domain), zap.String("grpc.clientMaxSendSize", valueStr), + zap.Error(err)) + + p.ClientMaxSendSize = DefaultClientMaxSendSize + } else { + p.ClientMaxSendSize = value + } + + log.Debug("initClientMaxSendSize", + zap.String("role", p.Domain), zap.Int("grpc.clientMaxSendSize", p.ClientMaxSendSize)) +} + +func (p *GrpcClientConfig) initClientMaxRecvSize() { + var err error + + valueStr, err := p.Load(p.Domain + ".grpc.clientMaxRecvSize") + if err != nil { + p.ClientMaxRecvSize = DefaultClientMaxRecvSize + } + + value, err := strconv.Atoi(valueStr) + if err != nil { + log.Warn("Failed to parse grpc.clientMaxRecvSize, set to default", + zap.String("role", p.Domain), zap.String("grpc.clientMaxRecvSize", valueStr), + zap.Error(err)) + + p.ClientMaxRecvSize = DefaultClientMaxRecvSize + } else { + p.ClientMaxRecvSize = value + } + + log.Debug("initClientMaxRecvSize", + zap.String("role", p.Domain), zap.Int("grpc.clientMaxRecvSize", p.ClientMaxRecvSize)) +} + +// CheckPortAvailable check if a port is available to be listened on +func CheckPortAvailable(port int) bool { + addr := ":" + strconv.Itoa(port) + listener, err := net.Listen("tcp", addr) + if listener != nil { + listener.Close() + } + return err == nil +} + +// GetAvailablePort return an available port that can be listened on +func GetAvailablePort() int { + listener, err := net.Listen("tcp", ":0") + if err != nil { + panic(err) + } + defer listener.Close() + + return listener.Addr().(*net.TCPAddr).Port +} diff --git a/internal/util/paramtable/global_param_test.go b/internal/util/paramtable/global_param_test.go new file mode 100644 index 0000000000..1469c05f52 --- /dev/null +++ b/internal/util/paramtable/global_param_test.go @@ -0,0 +1,96 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package paramtable + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +func TestGrpcServerParams(t *testing.T) { + role := typeutil.DataNodeRole + var Params GrpcServerConfig + Params.InitOnce(role) + + assert.Equal(t, Params.Domain, role) + t.Logf("Domain = %s", Params.Domain) + + assert.NotEqual(t, Params.IP, "") + t.Logf("IP = %s", Params.IP) + + assert.NotZero(t, Params.Port) + t.Logf("Port = %d", Params.Port) + + t.Logf("Address = %s", Params.GetAddress()) + + assert.NotNil(t, Params.Listener) + t.Logf("Listener = %d", Params.Listener) + + assert.NotZero(t, Params.ServerMaxRecvSize) + t.Logf("ServerMaxRecvSize = %d", Params.ServerMaxRecvSize) + + Params.Remove(role + ".grpc.serverMaxRecvSize") + Params.initServerMaxRecvSize() + assert.Equal(t, Params.ServerMaxRecvSize, DefaultServerMaxRecvSize) + + assert.NotZero(t, Params.ServerMaxSendSize) + t.Logf("ServerMaxSendSize = %d", Params.ServerMaxSendSize) + + Params.Remove(role + ".grpc.serverMaxSendSize") + Params.initServerMaxSendSize() + assert.Equal(t, Params.ServerMaxSendSize, DefaultServerMaxSendSize) +} + +func TestGrpcClientParams(t *testing.T) { + role := typeutil.DataNodeRole + var Params GrpcClientConfig + Params.InitOnce(role) + + assert.Equal(t, Params.Domain, role) + t.Logf("Domain = %s", Params.Domain) + + assert.NotEqual(t, Params.IP, "") + t.Logf("IP = %s", Params.IP) + + assert.NotZero(t, Params.Port) + t.Logf("Port = %d", Params.Port) + + t.Logf("Address = %s", Params.GetAddress()) + + assert.NotNil(t, Params.Listener) + t.Logf("Listener = %d", Params.Listener) + + assert.NotZero(t, Params.ClientMaxRecvSize) + t.Logf("ClientMaxRecvSize = %d", Params.ClientMaxRecvSize) + + Params.Remove(role + ".grpc.clientMaxRecvSize") + Params.initClientMaxRecvSize() + assert.Equal(t, Params.ClientMaxRecvSize, DefaultClientMaxRecvSize) + + assert.NotZero(t, Params.ClientMaxSendSize) + t.Logf("ClientMaxSendSize = %d", Params.ClientMaxSendSize) + + Params.Remove(role + ".grpc.clientMaxSendSize") + Params.initClientMaxSendSize() + assert.Equal(t, Params.ClientMaxSendSize, DefaultClientMaxSendSize) +} + +func TestCheckPortAvailable(t *testing.T) { + num := 10 + for i := 0; i < num; i++ { + port := GetAvailablePort() + assert.Equal(t, CheckPortAvailable(port), true) + } +}