diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index 865e791736..8890c7fb98 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -6,8 +6,6 @@ import ( "sync" "time" - grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpcopentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/funcutil" @@ -15,7 +13,6 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" - "google.golang.org/grpc/codes" "google.golang.org/grpc/keepalive" ) @@ -121,6 +118,20 @@ func (c *ClientBase) connect(ctx context.Context) error { opts := trace.GetInterceptorOpts() dialContext, cancel := context.WithTimeout(ctx, dialTimeout) + + retryPolicy := `{ + "methodConfig": [{ + "name": [{}], + "waitForReady": true, + "retryPolicy": { + "MaxAttempts": 4, + "InitialBackoff": ".1s", + "MaxBackoff": ".4s", + "BackoffMultiplier": 1.6, + "RetryableStatusCodes": [ "UNAVAILABLE" ] + } + }]}` + conn, err := grpc.DialContext( dialContext, addr, @@ -128,22 +139,11 @@ func (c *ClientBase) connect(ctx context.Context) error { grpc.WithBlock(), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(c.ClientMaxRecvSize), - grpc.MaxCallSendMsgSize(c.ClientMaxSendSize)), - grpc.WithUnaryInterceptor( - grpcmiddleware.ChainUnaryClient( - grpcretry.UnaryClientInterceptor( - grpcretry.WithMax(3), - grpcretry.WithCodes(codes.Aborted, codes.Unavailable), - ), - grpcopentracing.UnaryClientInterceptor(opts...), - )), - grpc.WithStreamInterceptor( - grpcmiddleware.ChainStreamClient( - grpcretry.StreamClientInterceptor(grpcretry.WithMax(3), - grpcretry.WithCodes(codes.Aborted, codes.Unavailable), - ), - grpcopentracing.StreamClientInterceptor(opts...), - )), + grpc.MaxCallSendMsgSize(c.ClientMaxSendSize), + ), + grpc.WithUnaryInterceptor(grpcopentracing.UnaryClientInterceptor(opts...)), + grpc.WithStreamInterceptor(grpcopentracing.StreamClientInterceptor(opts...)), + grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: keepAliveTime, // send pings every 60 seconds if there is no activity Timeout: keepAliveTimeout, // wait 6 second for ping ack before considering the connection dead