diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 536d86deda..5a6392e007 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -171,8 +171,8 @@ proxy: grpc: serverMaxRecvSize: 67108864 # 64M serverMaxSendSize: 67108864 # 64M - clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024 - clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024 + clientMaxRecvSize: 268435456 # 256 MB + clientMaxSendSize: 268435456 # 256 MB # Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments. @@ -333,8 +333,8 @@ grpc: serverMaxRecvSize: 536870912 # 512MB serverMaxSendSize: 536870912 # 512MB - clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024 - clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024 + clientMaxRecvSize: 268435456 # 256 MB + clientMaxSendSize: 268435456 # 256 MB client: dialTimeout: 5000 @@ -344,6 +344,7 @@ grpc: initialBackOff: 1.0 maxBackoff: 60.0 backoffMultiplier: 2.0 + compressionEnabled: false # Configure the proxy tls enable. 
tls: diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index 658d1a0373..2b30c07cd5 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -73,6 +73,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) ( InitialBackoff: float32(clientParams.InitialBackoff.GetAsFloat()), MaxBackoff: float32(clientParams.MaxBackoff.GetAsFloat()), BackoffMultiplier: float32(clientParams.BackoffMultiplier.GetAsFloat()), + CompressionEnabled: clientParams.CompressionEnabled.GetAsBool(), }, sess: sess, } diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 72b8da308d..5b95a1bb23 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -60,6 +60,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { InitialBackoff: float32(clientParams.InitialBackoff.GetAsFloat()), MaxBackoff: float32(clientParams.MaxBackoff.GetAsFloat()), BackoffMultiplier: float32(clientParams.BackoffMultiplier.GetAsFloat()), + CompressionEnabled: clientParams.CompressionEnabled.GetAsBool(), }, } client.grpcClient.SetRole(typeutil.DataNodeRole) diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 7fc3d7b12a..b6391ca9ef 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -61,6 +61,7 @@ func NewClient(ctx context.Context, addr string, encryption bool) (*Client, erro InitialBackoff: float32(clientParams.InitialBackoff.GetAsFloat()), MaxBackoff: float32(clientParams.MaxBackoff.GetAsFloat()), BackoffMultiplier: float32(clientParams.BackoffMultiplier.GetAsFloat()), + CompressionEnabled: clientParams.CompressionEnabled.GetAsBool(), }, } client.grpcClient.SetRole(typeutil.IndexNodeRole) diff --git 
a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go index 23ddcc9c75..96a2955e35 100644 --- a/internal/distributed/proxy/client/client.go +++ b/internal/distributed/proxy/client/client.go @@ -60,6 +60,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { InitialBackoff: float32(clientParams.InitialBackoff.GetAsFloat()), MaxBackoff: float32(clientParams.MaxBackoff.GetAsFloat()), BackoffMultiplier: float32(clientParams.BackoffMultiplier.GetAsFloat()), + CompressionEnabled: clientParams.CompressionEnabled.GetAsBool(), }, } client.grpcClient.SetRole(typeutil.ProxyRole) diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index bd206a2f0f..97e2dddef2 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -66,6 +66,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) ( InitialBackoff: float32(clientParams.InitialBackoff.GetAsFloat()), MaxBackoff: float32(clientParams.MaxBackoff.GetAsFloat()), BackoffMultiplier: float32(clientParams.BackoffMultiplier.GetAsFloat()), + CompressionEnabled: clientParams.CompressionEnabled.GetAsBool(), }, sess: sess, } diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index e3350d21d7..2e9546bdd0 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -61,6 +61,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { InitialBackoff: float32(clientParams.InitialBackoff.GetAsFloat()), MaxBackoff: float32(clientParams.MaxBackoff.GetAsFloat()), BackoffMultiplier: float32(clientParams.BackoffMultiplier.GetAsFloat()), + CompressionEnabled: clientParams.CompressionEnabled.GetAsBool(), }, } client.grpcClient.SetRole(typeutil.QueryNodeRole) diff --git a/internal/distributed/rootcoord/client/client.go 
b/internal/distributed/rootcoord/client/client.go index 35e8383bfb..2781df6ec3 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -73,6 +73,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) ( InitialBackoff: float32(clientParams.InitialBackoff.GetAsFloat()), MaxBackoff: float32(clientParams.MaxBackoff.GetAsFloat()), BackoffMultiplier: float32(clientParams.BackoffMultiplier.GetAsFloat()), + CompressionEnabled: clientParams.CompressionEnabled.GetAsBool(), }, sess: sess, } diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index bb8b06d5bf..6da9238fd4 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -65,6 +65,7 @@ type ClientBase[T any] struct { role string ClientMaxSendSize int ClientMaxRecvSize int + CompressionEnabled bool RetryServiceNameConfig string DialTimeout time.Duration @@ -152,7 +153,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { opts := tracer.GetInterceptorOpts() dialContext, cancel := context.WithTimeout(ctx, c.DialTimeout) - // refer to https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto retryPolicy := fmt.Sprintf(`{ "methodConfig": [{ @@ -167,6 +167,10 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { }]}`, c.RetryServiceNameConfig, c.MaxAttempts, c.InitialBackoff, c.MaxBackoff, c.BackoffMultiplier) var conn *grpc.ClientConn + compress := None + if c.CompressionEnabled { + compress = Zstd + } if c.encryption { conn, err = grpc.DialContext( dialContext, @@ -178,6 +182,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(c.ClientMaxRecvSize), grpc.MaxCallSendMsgSize(c.ClientMaxSendSize), + grpc.UseCompressor(compress), ), grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(opts...)), 
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(opts...)), @@ -208,6 +213,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(c.ClientMaxRecvSize), grpc.MaxCallSendMsgSize(c.ClientMaxSendSize), + grpc.UseCompressor(compress), ), grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(opts...)), grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(opts...)), diff --git a/internal/util/grpcclient/client_test.go b/internal/util/grpcclient/client_test.go index 8e45a96eec..7255d7b412 100644 --- a/internal/util/grpcclient/client_test.go +++ b/internal/util/grpcclient/client_test.go @@ -79,8 +79,17 @@ func TestClientBase_connect(t *testing.T) { } func TestClientBase_Call(t *testing.T) { + testCall(t, false) +} + +func TestClientBase_CompressCall(t *testing.T) { + testCall(t, true) +} + +func testCall(t *testing.T, compressed bool) { // mock client with nothing base := ClientBase[any]{} + base.CompressionEnabled = compressed base.grpcClientMtx.Lock() base.grpcClient = struct{}{} base.grpcClientMtx.Unlock() @@ -275,9 +284,8 @@ func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*he func TestClientBase_RetryPolicy(t *testing.T) { // server - port := ":50051" - address := "localhost:50051" - lis, err := net.Listen("tcp", port) + lis, err := net.Listen("tcp", "localhost:") if err != nil { log.Fatalf("failed to listen: %v", err) } + address := lis.Addr() // Read the address only after the error check: lis is nil when Listen fails. @@ -318,7 +326,68 @@ func TestClientBase_RetryPolicy(t *testing.T) { } clientBase.SetRole(typeutil.DataCoordRole) clientBase.SetGetAddrFunc(func() (string, error) { - return address, nil + return address.String(), nil + }) + clientBase.SetNewGrpcClientFunc(func(cc *grpc.ClientConn) helloworld.GreeterClient { + return helloworld.NewGreeterClient(cc) + }) + defer clientBase.Close() + + ctx := context.Background() + name := fmt.Sprintf("hello world %d", time.Now().Second()) + res, err := clientBase.Call(ctx, func(client 
helloworld.GreeterClient) (any, error) { + fmt.Println("client base...") + return client.SayHello(ctx, &helloworld.HelloRequest{Name: name}) + }) + assert.Nil(t, err) + assert.Equal(t, res.(*helloworld.HelloReply).Message, strings.ToUpper(name)) +} + +func TestClientBase_Compression(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:") + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + address := lis.Addr() // Read the address only after the error check: lis is nil when Listen fails. + var kaep = keepalive.EnforcementPolicy{ + MinTime: 5 * time.Second, + PermitWithoutStream: true, + } + var kasp = keepalive.ServerParameters{ + Time: 60 * time.Second, + Timeout: 60 * time.Second, + } + + maxAttempts := 5 + s := grpc.NewServer( + grpc.KeepaliveEnforcementPolicy(kaep), + grpc.KeepaliveParams(kasp), + ) + helloworld.RegisterGreeterServer(s, &server{SuccessCount: uint(1)}) + reflection.Register(s) + go func() { + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } + }() + defer s.Stop() + + clientBase := ClientBase[helloworld.GreeterClient]{ + ClientMaxRecvSize: 1 * 1024 * 1024, + ClientMaxSendSize: 1 * 1024 * 1024, + DialTimeout: 60 * time.Second, + KeepAliveTime: 60 * time.Second, + KeepAliveTimeout: 60 * time.Second, + RetryServiceNameConfig: "helloworld.Greeter", + MaxAttempts: maxAttempts, + InitialBackoff: 10.0, + MaxBackoff: 60.0, + BackoffMultiplier: 2.0, + CompressionEnabled: true, + } + clientBase.SetRole(typeutil.DataCoordRole) + clientBase.SetGetAddrFunc(func() (string, error) { + return address.String(), nil }) clientBase.SetNewGrpcClientFunc(func(cc *grpc.ClientConn) helloworld.GreeterClient { return helloworld.NewGreeterClient(cc) diff --git a/internal/util/grpcclient/grpc_encoder.go b/internal/util/grpcclient/grpc_encoder.go new file mode 100644 index 0000000000..fbc7c1369d --- /dev/null +++ b/internal/util/grpcclient/grpc_encoder.go @@ -0,0 +1,84 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// grpc zstd implementation from https://github.com/cortexproject/cortex + +package grpcclient + +import ( + "bytes" + "io" + + "github.com/klauspost/compress/zstd" + "google.golang.org/grpc/encoding" +) + +const None = "" // None is the empty compressor name; connect passes it to grpc.UseCompressor when CompressionEnabled is false. +const Zstd = "zstd" // Zstd is the name returned by grpcCompressor.Name. + +type grpcCompressor struct { + encoder *zstd.Encoder + decoder *zstd.Decoder +} + +func init() { + enc, _ := zstd.NewWriter(nil) // NOTE(review): the constructor error is discarded; confirm it cannot fail for a nil writer with default options. + dec, _ := zstd.NewReader(nil) // NOTE(review): the constructor error is discarded here as well. + c := &grpcCompressor{ + encoder: enc, + decoder: dec, + } + encoding.RegisterCompressor(c) // Runs at package init, so the compressor is registered as soon as this package is loaded. +} + +func (c *grpcCompressor) Compress(w io.Writer) (io.WriteCloser, error) { + return &zstdWriteCloser{ + enc: c.encoder, + writer: w, + }, nil +} + +type zstdWriteCloser struct { + enc *zstd.Encoder + writer io.Writer // Compressed data will be written here. + buf bytes.Buffer // Buffer uncompressed data here, compress on Close. 
+} + +func (z *zstdWriteCloser) Write(p []byte) (int, error) { + return z.buf.Write(p) // Only buffers; nothing is written to z.writer until Close. +} + +func (z *zstdWriteCloser) Close() error { + compressed := z.enc.EncodeAll(z.buf.Bytes(), nil) // Compress everything buffered so far in a single call. + _, err := io.Copy(z.writer, bytes.NewReader(compressed)) + return err +} + +func (c *grpcCompressor) Decompress(r io.Reader) (io.Reader, error) { + compressed, err := io.ReadAll(r) // The entire compressed stream is read into memory before decoding. + if err != nil { + return nil, err + } + + uncompressed, err := c.decoder.DecodeAll(compressed, nil) + if err != nil { + return nil, err + } + return bytes.NewReader(uncompressed), nil +} + +func (c *grpcCompressor) Name() string { + return Zstd +} diff --git a/internal/util/grpcclient/grpc_encoder_test.go b/internal/util/grpcclient/grpc_encoder_test.go new file mode 100644 index 0000000000..43ced3e1a0 --- /dev/null +++ b/internal/util/grpcclient/grpc_encoder_test.go @@ -0,0 +1,44 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package grpcclient + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/encoding" +) + +func TestGrpcEncoder(t *testing.T) { + data := "hello zstd algorithm!" 
+ var buf bytes.Buffer + + compressor := encoding.GetCompressor(Zstd) // Looks up the compressor by the name Zstd. + writer, err := compressor.Compress(&buf) + assert.NoError(t, err) + written, err := writer.Write([]byte(data)) + assert.NoError(t, err) + assert.Equal(t, written, len(data)) + err = writer.Close() // Close is the step that compresses the buffered bytes into buf. + assert.NoError(t, err) + + reader, err := compressor.Decompress(bytes.NewReader(buf.Bytes())) + assert.NoError(t, err) + result := make([]byte, len(data)) + reader.Read(result) // NOTE(review): the byte count and error returned by Read are ignored. + assert.Equal(t, data, string(result)) +} diff --git a/internal/util/paramtable/grpc_param.go b/internal/util/paramtable/grpc_param.go index aa8cbd6a64..8607be17ce 100644 --- a/internal/util/paramtable/grpc_param.go +++ b/internal/util/paramtable/grpc_param.go @@ -28,10 +28,10 @@ const ( DefaultServerMaxRecvSize = 512 * 1024 * 1024 // DefaultClientMaxSendSize defines the maximum size of data per grpc request can send by client side. - DefaultClientMaxSendSize = 100 * 1024 * 1024 + DefaultClientMaxSendSize = 256 * 1024 * 1024 // DefaultClientMaxRecvSize defines the maximum size of data per grpc request can receive by client side. 
- DefaultClientMaxRecvSize = 100 * 1024 * 1024 + DefaultClientMaxRecvSize = 256 * 1024 * 1024 // DefaultLogLevel defines the log level of grpc DefaultLogLevel = "WARNING" @@ -47,6 +47,8 @@ const ( DefaultMaxBackoff float64 = 60.0 DefaultBackoffMultiplier float64 = 2.0 + DefaultCompressionEnabled bool = false + ProxyInternalPort = 19529 ProxyExternalPort = 19530 ) @@ -175,6 +177,8 @@ func (p *GrpcServerConfig) Init(domain string, base *BaseTable) { type GrpcClientConfig struct { grpcConfig + CompressionEnabled ParamItem `refreshable:"false"` + ClientMaxSendSize ParamItem `refreshable:"false"` ClientMaxRecvSize ParamItem `refreshable:"false"` @@ -378,4 +382,24 @@ func (p *GrpcClientConfig) Init(domain string, base *BaseTable) { }, } p.BackoffMultiplier.Init(base.mgr) + + compressionEnabled := fmt.Sprintf("%t", DefaultCompressionEnabled) + p.CompressionEnabled = ParamItem{ + Key: "grpc.client.compressionEnabled", + Version: "2.0.0", + Formatter: func(v string) string { + if v == "" { + return compressionEnabled + } + _, err := strconv.ParseBool(v) + if err != nil { + log.Warn("Failed to convert bool when parsing grpc.client.compressionEnabled, set to default", + zap.String("role", p.Domain), + zap.String("grpc.client.compressionEnabled", v)) + return compressionEnabled // Fall back to this item's own default, not the backoff multiplier's. + } + return v + }, + } + p.CompressionEnabled.Init(base.mgr) } diff --git a/internal/util/paramtable/grpc_param_test.go b/internal/util/paramtable/grpc_param_test.go index d1b5057441..77427a6f2a 100644 --- a/internal/util/paramtable/grpc_param_test.go +++ b/internal/util/paramtable/grpc_param_test.go @@ -142,6 +142,12 @@ func TestGrpcClientParams(t *testing.T) { base.Save("grpc.client.backoffMultiplier", "3.0") assert.Equal(t, clientConfig.BackoffMultiplier.GetAsFloat(), 3.0) + assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), DefaultCompressionEnabled) + base.Save("grpc.client.CompressionEnabled", "a") + assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), DefaultCompressionEnabled) + 
base.Save("grpc.client.CompressionEnabled", "true") + assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), true) + base.Save("common.security.tlsMode", "1") base.Save("tls.serverPemPath", "/pem") base.Save("tls.serverKeyPath", "/key")