diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 4fdfe76e45..7348d99461 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "net" "strconv" "sync" @@ -73,8 +74,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { log.Debug("DataNode address", zap.String("address", addr)) tracer := opentracing.GlobalTracer() - s.grpcServer = grpc.NewServer(grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + s.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(math.MaxInt32), + grpc.MaxSendMsgSize(math.MaxInt32), + grpc.UnaryInterceptor( + otgrpc.OpenTracingServerInterceptor(tracer)), grpc.StreamInterceptor( otgrpc.OpenTracingStreamServerInterceptor(tracer))) datapb.RegisterDataNodeServer(s.grpcServer, s) diff --git a/internal/distributed/dataservice/service.go b/internal/distributed/dataservice/service.go index a199800258..710810deee 100644 --- a/internal/distributed/dataservice/service.go +++ b/internal/distributed/dataservice/service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "net" "strconv" "sync" @@ -134,8 +135,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer cancel() tracer := opentracing.GlobalTracer() - s.grpcServer = grpc.NewServer(grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + s.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(math.MaxInt32), + grpc.MaxSendMsgSize(math.MaxInt32), + grpc.UnaryInterceptor( + otgrpc.OpenTracingServerInterceptor(tracer)), grpc.StreamInterceptor( otgrpc.OpenTracingStreamServerInterceptor(tracer))) datapb.RegisterDataServiceServer(s.grpcServer, s) diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 2c5fd96ff6..2c518c86a7 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -3,6 +3,7 @@ package grpcindexnode import ( "context" "log" + "math" "net" "strconv" "sync" @@ -61,8 +62,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer cancel() tracer := opentracing.GlobalTracer() - s.grpcServer = grpc.NewServer(grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + s.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(math.MaxInt32), + grpc.MaxSendMsgSize(math.MaxInt32), + grpc.UnaryInterceptor( + otgrpc.OpenTracingServerInterceptor(tracer)), grpc.StreamInterceptor( otgrpc.OpenTracingStreamServerInterceptor(tracer))) indexpb.RegisterIndexNodeServer(s.grpcServer, s) diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index 0088635dc6..b32fe6f40b 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log" + "math" "net" "strconv" "sync" @@ -133,8 +134,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer cancel() tracer := opentracing.GlobalTracer() - s.grpcServer = grpc.NewServer(grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + s.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(math.MaxInt32), + grpc.MaxSendMsgSize(math.MaxInt32), + grpc.UnaryInterceptor( + otgrpc.OpenTracingServerInterceptor(tracer)), grpc.StreamInterceptor( otgrpc.OpenTracingStreamServerInterceptor(tracer))) indexpb.RegisterIndexServiceServer(s.grpcServer, s) diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 6fc3af69ef..313eb2dec5 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "net" "strconv" "sync" @@ -209,8 +210,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer cancel() tracer := opentracing.GlobalTracer() - s.grpcServer = grpc.NewServer(grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + s.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(math.MaxInt32), + grpc.MaxSendMsgSize(math.MaxInt32), + grpc.UnaryInterceptor( + otgrpc.OpenTracingServerInterceptor(tracer)), grpc.StreamInterceptor( otgrpc.OpenTracingStreamServerInterceptor(tracer))) masterpb.RegisterMasterServiceServer(s.grpcServer, s) diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index aa5d5f91ce..dbd697f439 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log" + "math" "net" "strconv" "sync" @@ -83,8 +84,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer cancel() tracer := opentracing.GlobalTracer() - s.grpcServer = grpc.NewServer(grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + s.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(math.MaxInt32), + grpc.MaxSendMsgSize(math.MaxInt32), + grpc.UnaryInterceptor( + otgrpc.OpenTracingServerInterceptor(tracer)), grpc.StreamInterceptor( otgrpc.OpenTracingStreamServerInterceptor(tracer)), grpc.MaxRecvMsgSize(GRPCMaxMagSize)) diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index 201f40070a..c3fe8ad0b5 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log" + "math" "net" "strconv" "sync" @@ -116,8 +117,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer cancel() tracer := opentracing.GlobalTracer() - s.grpcServer = grpc.NewServer(grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + s.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(math.MaxInt32), + grpc.MaxSendMsgSize(math.MaxInt32), + grpc.UnaryInterceptor( + otgrpc.OpenTracingServerInterceptor(tracer)), grpc.StreamInterceptor( otgrpc.OpenTracingStreamServerInterceptor(tracer))) proxypb.RegisterProxyServiceServer(s.grpcServer, s) diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 771bb2d171..e6e050aff5 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "net" "strconv" "sync" @@ -221,8 +222,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { log.Debug("QueryNode", zap.String("address", addr)) tracer := opentracing.GlobalTracer() - s.grpcServer = grpc.NewServer(grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + s.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(math.MaxInt32), + grpc.MaxSendMsgSize(math.MaxInt32), + grpc.UnaryInterceptor( + otgrpc.OpenTracingServerInterceptor(tracer)), grpc.StreamInterceptor( otgrpc.OpenTracingStreamServerInterceptor(tracer))) querypb.RegisterQueryNodeServer(s.grpcServer, s) diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index a51b99a8af..1e1d021e8a 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -2,6 +2,7 @@ package grpcqueryservice import ( "context" + "math" "net" "strconv" "sync" @@ -152,8 +153,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer cancel() tracer := opentracing.GlobalTracer() - s.grpcServer = grpc.NewServer(grpc.UnaryInterceptor( - otgrpc.OpenTracingServerInterceptor(tracer)), + s.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(math.MaxInt32), + grpc.MaxSendMsgSize(math.MaxInt32), + grpc.UnaryInterceptor( + otgrpc.OpenTracingServerInterceptor(tracer)), grpc.StreamInterceptor( otgrpc.OpenTracingStreamServerInterceptor(tracer))) querypb.RegisterQueryServiceServer(s.grpcServer, s)