mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 01:58:34 +08:00
Initilize listener in service instead of ParamTable (#15220)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent
7a9f5f00f5
commit
f14df1d2db
@ -40,6 +40,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/util/etcd"
|
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/retry"
|
||||||
"github.com/milvus-io/milvus/internal/util/trace"
|
"github.com/milvus-io/milvus/internal/util/trace"
|
||||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
@ -93,14 +94,14 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
|
|||||||
|
|
||||||
func (s *Server) startGrpc() error {
|
func (s *Server) startGrpc() error {
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go s.startGrpcLoop(Params.Listener)
|
go s.startGrpcLoop(Params.Port)
|
||||||
// wait for grpc server loop start
|
// wait for grpc server loop start
|
||||||
err := <-s.grpcErrChan
|
err := <-s.grpcErrChan
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// startGrpcLoop starts the grep loop of datanode component.
|
// startGrpcLoop starts the grep loop of datanode component.
|
||||||
func (s *Server) startGrpcLoop(listener net.Listener) {
|
func (s *Server) startGrpcLoop(grpcPort int) {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
var kaep = keepalive.EnforcementPolicy{
|
var kaep = keepalive.EnforcementPolicy{
|
||||||
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
|
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
|
||||||
@ -111,6 +112,20 @@ func (s *Server) startGrpcLoop(listener net.Listener) {
|
|||||||
Time: 60 * time.Second, // Ping the client if it is idle for 60 seconds to ensure the connection is still active
|
Time: 60 * time.Second, // Ping the client if it is idle for 60 seconds to ensure the connection is still active
|
||||||
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
|
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
|
||||||
}
|
}
|
||||||
|
var lis net.Listener
|
||||||
|
|
||||||
|
err := retry.Do(s.ctx, func() error {
|
||||||
|
addr := ":" + strconv.Itoa(grpcPort)
|
||||||
|
var err error
|
||||||
|
lis, err = net.Listen("tcp", addr)
|
||||||
|
return err
|
||||||
|
}, retry.Attempts(10))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Error("DataNode GrpcServer:failed to listen", zap.Error(err))
|
||||||
|
s.grpcErrChan <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
opts := trace.GetInterceptorOpts()
|
opts := trace.GetInterceptorOpts()
|
||||||
s.grpcServer = grpc.NewServer(
|
s.grpcServer = grpc.NewServer(
|
||||||
@ -126,7 +141,7 @@ func (s *Server) startGrpcLoop(listener net.Listener) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
|
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
|
||||||
if err := s.grpcServer.Serve(listener); err != nil {
|
if err := s.grpcServer.Serve(lis); err != nil {
|
||||||
log.Warn("DataNode Start Grpc Failed!")
|
log.Warn("DataNode Start Grpc Failed!")
|
||||||
s.grpcErrChan <- err
|
s.grpcErrChan <- err
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1445,11 +1445,10 @@ func (p *indexNodeConfig) initIndexStorageRootPath() {
|
|||||||
type grpcConfig struct {
|
type grpcConfig struct {
|
||||||
BaseParamTable
|
BaseParamTable
|
||||||
|
|
||||||
once sync.Once
|
once sync.Once
|
||||||
Domain string
|
Domain string
|
||||||
IP string
|
IP string
|
||||||
Port int
|
Port int
|
||||||
Listener net.Listener
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *grpcConfig) init(domain string) {
|
func (p *grpcConfig) init(domain string) {
|
||||||
@ -1459,7 +1458,6 @@ func (p *grpcConfig) init(domain string) {
|
|||||||
p.LoadFromEnv()
|
p.LoadFromEnv()
|
||||||
p.LoadFromArgs()
|
p.LoadFromArgs()
|
||||||
p.initPort()
|
p.initPort()
|
||||||
p.initListener()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadFromEnv is used to initialize configuration items from env.
|
// LoadFromEnv is used to initialize configuration items from env.
|
||||||
@ -1474,7 +1472,6 @@ func (p *grpcConfig) LoadFromArgs() {
|
|||||||
|
|
||||||
func (p *grpcConfig) initPort() {
|
func (p *grpcConfig) initPort() {
|
||||||
p.Port = p.ParseInt(p.Domain + ".port")
|
p.Port = p.ParseInt(p.Domain + ".port")
|
||||||
|
|
||||||
if p.Domain == typeutil.ProxyRole || p.Domain == typeutil.DataNodeRole || p.Domain == typeutil.IndexNodeRole || p.Domain == typeutil.QueryNodeRole {
|
if p.Domain == typeutil.ProxyRole || p.Domain == typeutil.DataNodeRole || p.Domain == typeutil.IndexNodeRole || p.Domain == typeutil.QueryNodeRole {
|
||||||
if !CheckPortAvailable(p.Port) {
|
if !CheckPortAvailable(p.Port) {
|
||||||
p.Port = GetAvailablePort()
|
p.Port = GetAvailablePort()
|
||||||
@ -1488,16 +1485,6 @@ func (p *grpcConfig) GetAddress() string {
|
|||||||
return p.IP + ":" + strconv.Itoa(p.Port)
|
return p.IP + ":" + strconv.Itoa(p.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *grpcConfig) initListener() {
|
|
||||||
if p.Domain == typeutil.DataNodeRole {
|
|
||||||
listener, err := net.Listen("tcp", p.GetAddress())
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
p.Listener = listener
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// GrpcServerConfig is configuration for grpc server.
|
// GrpcServerConfig is configuration for grpc server.
|
||||||
type GrpcServerConfig struct {
|
type GrpcServerConfig struct {
|
||||||
grpcConfig
|
grpcConfig
|
||||||
|
|||||||
@ -357,9 +357,6 @@ func TestGrpcServerParams(t *testing.T) {
|
|||||||
|
|
||||||
t.Logf("Address = %s", Params.GetAddress())
|
t.Logf("Address = %s", Params.GetAddress())
|
||||||
|
|
||||||
assert.NotNil(t, Params.Listener)
|
|
||||||
t.Logf("Listener = %d", Params.Listener)
|
|
||||||
|
|
||||||
assert.NotZero(t, Params.ServerMaxRecvSize)
|
assert.NotZero(t, Params.ServerMaxRecvSize)
|
||||||
t.Logf("ServerMaxRecvSize = %d", Params.ServerMaxRecvSize)
|
t.Logf("ServerMaxRecvSize = %d", Params.ServerMaxRecvSize)
|
||||||
|
|
||||||
@ -391,9 +388,6 @@ func TestGrpcClientParams(t *testing.T) {
|
|||||||
|
|
||||||
t.Logf("Address = %s", Params.GetAddress())
|
t.Logf("Address = %s", Params.GetAddress())
|
||||||
|
|
||||||
assert.NotNil(t, Params.Listener)
|
|
||||||
t.Logf("Listener = %d", Params.Listener)
|
|
||||||
|
|
||||||
assert.NotZero(t, Params.ClientMaxRecvSize)
|
assert.NotZero(t, Params.ClientMaxRecvSize)
|
||||||
t.Logf("ClientMaxRecvSize = %d", Params.ClientMaxRecvSize)
|
t.Logf("ClientMaxRecvSize = %d", Params.ClientMaxRecvSize)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user