diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index c9cf35a215..8f1b49858a 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -40,6 +40,7 @@ import ( "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" clientv3 "go.etcd.io/etcd/client/v3" @@ -93,14 +94,14 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) func (s *Server) startGrpc() error { s.wg.Add(1) - go s.startGrpcLoop(Params.Listener) + go s.startGrpcLoop(Params.Port) // wait for grpc server loop start err := <-s.grpcErrChan return err } // startGrpcLoop starts the grep loop of datanode component. -func (s *Server) startGrpcLoop(listener net.Listener) { +func (s *Server) startGrpcLoop(grpcPort int) { defer s.wg.Done() var kaep = keepalive.EnforcementPolicy{ MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection @@ -111,6 +112,20 @@ func (s *Server) startGrpcLoop(listener net.Listener) { Time: 60 * time.Second, // Ping the client if it is idle for 60 seconds to ensure the connection is still active Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead } + var lis net.Listener + + err := retry.Do(s.ctx, func() error { + addr := ":" + strconv.Itoa(grpcPort) + var err error + lis, err = net.Listen("tcp", addr) + return err + }, retry.Attempts(10)) + + if err != nil { + log.Error("DataNode GrpcServer:failed to listen", zap.Error(err)) + s.grpcErrChan <- err + return + } opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( @@ -126,7 +141,7 @@ func (s *Server) startGrpcLoop(listener net.Listener) { defer cancel() go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) - if err := s.grpcServer.Serve(listener); err != nil { + if err := s.grpcServer.Serve(lis); err != nil { log.Warn("DataNode Start Grpc Failed!") s.grpcErrChan <- err } diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go index 5704e96830..c52966abdf 100644 --- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/global_param.go @@ -1445,11 +1445,10 @@ func (p *indexNodeConfig) initIndexStorageRootPath() { type grpcConfig struct { BaseParamTable - once sync.Once - Domain string - IP string - Port int - Listener net.Listener + once sync.Once + Domain string + IP string + Port int } func (p *grpcConfig) init(domain string) { @@ -1459,7 +1458,6 @@ func (p *grpcConfig) init(domain string) { p.LoadFromEnv() p.LoadFromArgs() p.initPort() - p.initListener() } // LoadFromEnv is used to initialize configuration items from env. @@ -1474,7 +1472,6 @@ func (p *grpcConfig) LoadFromArgs() { func (p *grpcConfig) initPort() { p.Port = p.ParseInt(p.Domain + ".port") - if p.Domain == typeutil.ProxyRole || p.Domain == typeutil.DataNodeRole || p.Domain == typeutil.IndexNodeRole || p.Domain == typeutil.QueryNodeRole { if !CheckPortAvailable(p.Port) { p.Port = GetAvailablePort() @@ -1488,16 +1485,6 @@ func (p *grpcConfig) GetAddress() string { return p.IP + ":" + strconv.Itoa(p.Port) } -func (p *grpcConfig) initListener() { - if p.Domain == typeutil.DataNodeRole { - listener, err := net.Listen("tcp", p.GetAddress()) - if err != nil { - panic(err) - } - p.Listener = listener - } -} - // GrpcServerConfig is configuration for grpc server. type GrpcServerConfig struct { grpcConfig diff --git a/internal/util/paramtable/global_param_test.go b/internal/util/paramtable/global_param_test.go index 15c11617cf..649000859b 100644 --- a/internal/util/paramtable/global_param_test.go +++ b/internal/util/paramtable/global_param_test.go @@ -357,9 +357,6 @@ func TestGrpcServerParams(t *testing.T) { t.Logf("Address = %s", Params.GetAddress()) - assert.NotNil(t, Params.Listener) - t.Logf("Listener = %d", Params.Listener) - assert.NotZero(t, Params.ServerMaxRecvSize) t.Logf("ServerMaxRecvSize = %d", Params.ServerMaxRecvSize) @@ -391,9 +388,6 @@ func TestGrpcClientParams(t *testing.T) { t.Logf("Address = %s", Params.GetAddress()) - assert.NotNil(t, Params.Listener) - t.Logf("Listener = %d", Params.Listener) - assert.NotZero(t, Params.ClientMaxRecvSize) t.Logf("ClientMaxRecvSize = %d", Params.ClientMaxRecvSize)