From 28681276e2cc1d6fb04b48d569476ad2f2f6e1e0 Mon Sep 17 00:00:00 2001 From: SimFG Date: Wed, 6 Sep 2023 17:43:14 +0800 Subject: [PATCH] Improve the retry of the rpc client (#26795) Signed-off-by: SimFG --- configs/milvus.yaml | 8 +- .../querycoordv2/meta/collection_manager.go | 19 +- .../querycoordv2/meta/coordinator_broker.go | 2 +- internal/querycoordv2/mocks/querynode.go | 4 +- internal/querycoordv2/server.go | 6 +- internal/querycoordv2/session/cluster_test.go | 3 + internal/querynodev2/handlers.go | 2 +- internal/querynodev2/segments/result.go | 3 +- internal/rootcoord/root_coord.go | 2 +- internal/util/grpcclient/client.go | 228 +++++++++--------- internal/util/grpcclient/client_test.go | 93 ++++--- pkg/go.sum | 1 + pkg/util/merr/errors.go | 4 +- pkg/util/merr/utils.go | 10 +- pkg/util/paramtable/grpc_param.go | 50 +--- pkg/util/paramtable/grpc_param_test.go | 13 +- pkg/util/retry/retry.go | 2 +- .../cross_cluster_routing_test.go | 4 + 18 files changed, 212 insertions(+), 242 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 424721fd84..baf439624a 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -406,10 +406,10 @@ grpc: dialTimeout: 200 keepAliveTime: 10000 keepAliveTimeout: 20000 - maxMaxAttempts: 5 - initialBackoff: 1 - maxBackoff: 10 - backoffMultiplier: 2 + maxMaxAttempts: 10 + initialBackOff: 0.2 # seconds + maxBackoff: 10 # seconds + backoffMultiplier: 2.0 # deprecated clientMaxSendSize: 268435456 clientMaxRecvSize: 268435456 diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index c994d0df54..be3845a363 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -19,6 +19,7 @@ package meta import ( "context" "fmt" + "strconv" "sync" "time" @@ -123,20 +124,25 @@ func (m *CollectionManager) Recover(broker Broker) error { return err } + ctx := log.WithTraceID(context.Background(), strconv.FormatInt(time.Now().UnixNano(), 10)) + ctxLog := log.Ctx(ctx) + ctxLog.Info("recover collections and partitions from kv store") + for _, collection := range collections { // Dropped collection should be deprecated - _, err = broker.GetCollectionSchema(context.Background(), collection.GetCollectionID()) + _, err = broker.GetCollectionSchema(ctx, collection.GetCollectionID()) if errors.Is(err, merr.ErrCollectionNotFound) { - log.Info("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID())) + ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID())) m.catalog.ReleaseCollection(collection.GetCollectionID()) continue } if err != nil { + ctxLog.Warn("failed to get collection schema", zap.Error(err)) return err } // Collections not loaded done should be deprecated if collection.GetStatus() != querypb.LoadStatus_Loaded || collection.GetReplicaNumber() <= 0 { - log.Info("skip recovery and release collection", + ctxLog.Info("skip recovery and release collection", zap.Int64("collectionID", collection.GetCollectionID()), zap.String("status", collection.GetStatus().String()), zap.Int32("replicaNumber", collection.GetReplicaNumber()), @@ -150,13 +156,14 @@ func (m *CollectionManager) Recover(broker Broker) error { } for collection, partitions := range partitions { - existPartitions, err := broker.GetPartitions(context.Background(), collection) + existPartitions, err := broker.GetPartitions(ctx, collection) if errors.Is(err, merr.ErrCollectionNotFound) { - 
log.Info("skip dropped collection during recovery", zap.Int64("collection", collection)) + ctxLog.Info("skip dropped collection during recovery", zap.Int64("collection", collection)) m.catalog.ReleaseCollection(collection) continue } if err != nil { + ctxLog.Warn("failed to get partitions", zap.Error(err)) return err } omitPartitions := make([]int64, 0) @@ -168,7 +175,7 @@ func (m *CollectionManager) Recover(broker Broker) error { return true }) if len(omitPartitions) > 0 { - log.Info("skip dropped partitions during recovery", + ctxLog.Info("skip dropped partitions during recovery", zap.Int64("collection", collection), zap.Int64s("partitions", omitPartitions)) m.catalog.ReleasePartition(collection, omitPartitions...) } diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index ea2324eb11..4dfc8d43f5 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -80,7 +80,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec err = merr.Error(resp.GetStatus()) if err != nil { - log.Warn("failed to get collection schema", zap.Error(err)) + log.Ctx(ctx).Warn("failed to get collection schema", zap.Error(err)) return nil, err } return resp.GetSchema(), nil diff --git a/internal/querycoordv2/mocks/querynode.go b/internal/querycoordv2/mocks/querynode.go index 013b267db1..db81ef8761 100644 --- a/internal/querycoordv2/mocks/querynode.go +++ b/internal/querycoordv2/mocks/querynode.go @@ -117,7 +117,9 @@ func (node *MockQueryNode) Start() error { case <-node.ctx.Done(): return nil default: - return &milvuspb.ComponentStates{} + return &milvuspb.ComponentStates{ + Status: successStatus, + } } }, func(context.Context, *milvuspb.GetComponentStatesRequest) error { select { diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index d8609c18eb..624b7737b4 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -310,7 +310,7 @@ func (s *Server) initMeta() error { log.Info("recover meta...") err := s.meta.CollectionManager.Recover(s.broker) if err != nil { - log.Warn("failed to recover collections") + log.Warn("failed to recover collections", zap.Error(err)) return err } collections := s.meta.GetAll() @@ -323,13 +323,13 @@ func (s *Server) initMeta() error { err = s.meta.ReplicaManager.Recover(collections) if err != nil { - log.Warn("failed to recover replicas") + log.Warn("failed to recover replicas", zap.Error(err)) return err } err = s.meta.ResourceManager.Recover() if err != nil { - log.Warn("failed to recover resource groups") + log.Warn("failed to recover resource groups", zap.Error(err)) return err } diff --git a/internal/querycoordv2/session/cluster_test.go b/internal/querycoordv2/session/cluster_test.go index d888387e45..8e7b5e618a 100644 --- a/internal/querycoordv2/session/cluster_test.go +++ b/internal/querycoordv2/session/cluster_test.go @@ -19,6 +19,7 @@ package session import ( "context" "net" + "strconv" "testing" "time" @@ -45,10 +46,12 @@ type ClusterTestSuite struct { func (suite *ClusterTestSuite) SetupSuite() { paramtable.Init() + paramtable.Get().Save("grpc.client.maxMaxAttempts", "1") suite.setupServers() } func (suite *ClusterTestSuite) TearDownSuite() { + paramtable.Get().Save("grpc.client.maxMaxAttempts", strconv.FormatInt(paramtable.DefaultMaxAttempts, 10)) for _, svr := range suite.svrs { svr.GracefulStop() } diff --git a/internal/querynodev2/handlers.go 
b/internal/querynodev2/handlers.go index 0dab6bbcfa..1962560ef1 100644 --- a/internal/querynodev2/handlers.go +++ b/internal/querynodev2/handlers.go @@ -219,7 +219,7 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque )) // - failRet.Status.ErrorCode = commonpb.ErrorCode_Success + ret.Status = merr.Status(nil) latency := tr.ElapseSpan() metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.Leader).Observe(float64(latency.Milliseconds())) metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel, metrics.Leader).Inc() diff --git a/internal/querynodev2/segments/result.go b/internal/querynodev2/segments/result.go index 9703ab0e03..c717180a6d 100644 --- a/internal/querynodev2/segments/result.go +++ b/internal/querynodev2/segments/result.go @@ -245,7 +245,8 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna ) var ( ret = &internalpb.RetrieveResults{ - Ids: &schemapb.IDs{}, + Status: merr.Status(nil), + Ids: &schemapb.IDs{}, } skipDupCnt int64 loopEnd int diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index afc7e4af7d..1238d46c34 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -491,7 +491,7 @@ func (c *Core) Init() error { log.Error("RootCoord start failed", zap.Error(err)) } }) - log.Info("RootCoord startup success") + log.Info("RootCoord startup success", zap.String("address", c.session.Address)) return err } c.UpdateStateCode(commonpb.StateCode_StandBy) diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index 7dc8a12685..3aaf37981a 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -19,22 +19,13 @@ package grpcclient import ( "context" "crypto/tls" - "fmt" "strings" "sync" "time" + "github.com/cockroachdb/errors" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - "go.uber.org/atomic" - "go.uber.org/zap" - "golang.org/x/sync/singleflight" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" - + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" @@ -46,6 +37,15 @@ import ( "github.com/milvus-io/milvus/pkg/util/interceptor" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/retry" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.uber.org/atomic" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" ) // GrpcClient abstracts client of grpc @@ -88,14 +88,11 @@ type ClientBase[T interface { KeepAliveTime time.Duration KeepAliveTimeout time.Duration - MaxAttempts int - InitialBackoff float32 - MaxBackoff float32 - BackoffMultiplier float32 - NodeID atomic.Int64 - sess *sessionutil.Session - - sf singleflight.Group + MaxAttempts int + InitialBackoff float64 + MaxBackoff float64 + NodeID atomic.Int64 + sess *sessionutil.Session } func 
NewClientBase[T interface { @@ -109,9 +106,8 @@ func NewClientBase[T interface { KeepAliveTimeout: config.KeepAliveTimeout.GetAsDuration(time.Millisecond), RetryServiceNameConfig: serviceName, MaxAttempts: config.MaxAttempts.GetAsInt(), - InitialBackoff: float32(config.InitialBackoff.GetAsFloat()), - MaxBackoff: float32(config.MaxBackoff.GetAsFloat()), - BackoffMultiplier: float32(config.BackoffMultiplier.GetAsFloat()), + InitialBackoff: config.InitialBackoff.GetAsFloat(), + MaxBackoff: config.MaxBackoff.GetAsFloat(), CompressionEnabled: config.CompressionEnabled.GetAsBool(), } } @@ -196,18 +192,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { opts := tracer.GetInterceptorOpts() dialContext, cancel := context.WithTimeout(ctx, c.DialTimeout) - // refer to https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto - retryPolicy := fmt.Sprintf(`{ - "methodConfig": [{ - "name": [{"service": "%s"}], - "retryPolicy": { - "MaxAttempts": %d, - "InitialBackoff": "%fs", - "MaxBackoff": "%fs", - "BackoffMultiplier": %f, - "RetryableStatusCodes": [ "UNAVAILABLE" ] - } - }]}`, c.RetryServiceNameConfig, c.MaxAttempts, c.InitialBackoff, c.MaxBackoff, c.BackoffMultiplier) var conn *grpc.ClientConn compress := None @@ -236,7 +220,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { interceptor.ClusterInjectionStreamClientInterceptor(), interceptor.ServerIDInjectionStreamClientInterceptor(c.GetNodeID()), )), - grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: c.KeepAliveTime, Timeout: c.KeepAliveTimeout, @@ -254,6 +237,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}), grpc.FailOnNonTempDialError(true), grpc.WithReturnConnectionError(), + grpc.WithDisableRetry(), ) } else { conn, err = grpc.DialContext( @@ -276,7 +260,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { interceptor.ClusterInjectionStreamClientInterceptor(), interceptor.ServerIDInjectionStreamClientInterceptor(c.GetNodeID()), )), - grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: c.KeepAliveTime, Timeout: c.KeepAliveTimeout, @@ -294,6 +277,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}), grpc.FailOnNonTempDialError(true), grpc.WithReturnConnectionError(), + grpc.WithDisableRetry(), ) } @@ -311,55 +295,101 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { return nil } -func (c *ClientBase[T]) callOnce(ctx context.Context, caller func(client T) (any, error)) (any, error) { - log := log.Ctx(ctx).With(zap.String("role", c.GetRole())) - client, err := c.GetGrpcClient(ctx) - if err != nil { - return generic.Zero[T](), err +func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, error)) (any, error) { + log := log.Ctx(ctx).With(zap.String("client_role", c.GetRole())) + var ( + ret any + clientErr error + callErr error + client T + ) + + client, clientErr = c.GetGrpcClient(ctx) + if clientErr != nil { + log.Warn("fail to get grpc client", zap.Error(clientErr)) } - ret, err := caller(client) - if err == nil { - return ret, nil + resetClientFunc := func() { + c.resetConnection(client) + client, clientErr = c.GetGrpcClient(ctx) + if clientErr != nil { + log.Warn("fail to get grpc client in the retry state", zap.Error(clientErr)) + } } - if 
IsCrossClusterRoutingErr(err) { - log.Warn("CrossClusterRoutingErr, start to reset connection", zap.Error(err)) - c.resetConnection(client) - return ret, merr.ErrServiceUnavailable // For concealing ErrCrossClusterRouting from the client - } - if IsServerIDMismatchErr(err) { - log.Warn("Server ID mismatch, start to reset connection", zap.Error(err)) - c.resetConnection(client) - return ret, err - } - if !funcutil.CheckCtxValid(ctx) { - // check if server ID matches coord session, if not, reset connection - if c.sess != nil { - sessions, _, getSessionErr := c.sess.GetSessions(c.GetRole()) - if getSessionErr != nil { - // Only log but not handle this error as it is an auxiliary logic - log.Warn("Fail to GetSessions", zap.Error(getSessionErr)) + innerCtx, cancel := context.WithCancel(ctx) + defer cancel() + _ = retry.Do(innerCtx, func() error { + if generic.IsZero(client) { + callErr = errors.Wrap(clientErr, "empty grpc client") + log.Warn("grpc client is nil, maybe fail to get client in the retry state") + resetClientFunc() + return callErr + } + ret, callErr = caller(client) + if callErr != nil { + if funcutil.IsGrpcErr(callErr) || + IsCrossClusterRoutingErr(callErr) || IsServerIDMismatchErr(callErr) { + log.Warn("start to reset connection because of specific reasons", zap.Error(callErr)) + resetClientFunc() + return callErr } - if coordSess, exist := sessions[c.GetRole()]; exist { - if c.GetNodeID() != coordSess.ServerID { - log.Warn("Server ID mismatch, may connected to a old server, start to reset connection", zap.Error(err)) - c.resetConnection(client) - return ret, err + if !funcutil.CheckCtxValid(ctx) { + if c.sess != nil { + sessions, _, getSessionErr := c.sess.GetSessions(c.GetRole()) + if getSessionErr != nil { + // Only log but not handle this error as it is an auxiliary logic + log.Warn("fail to get session", zap.Error(getSessionErr)) + } + if coordSess, exist := sessions[c.GetRole()]; exist { + if c.GetNodeID() != coordSess.ServerID { + log.Warn("server id mismatch, may connected to a old server, start to reset connection", + zap.Int64("client_node", c.GetNodeID()), zap.Int64("current_node", coordSess.ServerID)) + resetClientFunc() + return callErr + } + } } } + log.Warn("fail to grpc call because of unknown error", zap.Error(callErr)) + // not rpc error, it will stop to retry + return retry.Unrecoverable(callErr) } - // start bg check in case of https://github.com/milvus-io/milvus/issues/22435 - go c.bgHealthCheck(client) - return generic.Zero[T](), err + + var status *commonpb.Status + switch res := ret.(type) { + case *commonpb.Status: + status = res + case interface{ GetStatus() *commonpb.Status }: + status = res.GetStatus() + default: + // it will directly return the result + log.Warn("unknown return type", zap.Any("return", ret)) + return nil + } + + if merr.Ok(status) || !merr.IsRetryableCode(status.GetCode()) { + return nil + } + + return errors.Newf("error code: %d, reason: %s", status.GetCode(), status.GetReason()) + }, retry.Attempts(uint(c.MaxAttempts)), + // Because the previous InitialBackoff and MaxBackoff were float, and the unit was s. + // For compatibility, this is multiplied by 1000. 
+ retry.Sleep(time.Duration(c.InitialBackoff*1000)*time.Millisecond), + retry.MaxSleepTime(time.Duration(c.MaxBackoff*1000)*time.Millisecond)) + // default value list: MaxAttempts 10, InitialBackoff 0.2s, MaxBackoff 10s + // and consume 52.8s if all retry failed + + if callErr != nil { + // make the error more friendly to user + if IsCrossClusterRoutingErr(callErr) { + callErr = merr.ErrServiceUnavailable + } + + return generic.Zero[T](), callErr } - if !funcutil.IsGrpcErr(err) { - log.Warn("ClientBase:isNotGrpcErr", zap.Error(err)) - return generic.Zero[T](), err - } - log.Info("ClientBase grpc error, start to reset connection", zap.Error(err)) - c.resetConnection(client) - return ret, err + return ret, nil } // Call does a grpc call @@ -368,10 +398,10 @@ func (c *ClientBase[T]) Call(ctx context.Context, caller func(client T) (any, er return generic.Zero[T](), ctx.Err() } - ret, err := c.callOnce(ctx, caller) + ret, err := c.call(ctx, caller) if err != nil { - traceErr := fmt.Errorf("err: %w\n, %s", err, tracer.StackTrace()) - log.Ctx(ctx).Warn("ClientBase Call grpc first call get error", + traceErr := errors.Wrapf(err, "stack trace: %s", tracer.StackTrace()) + log.Ctx(ctx).Warn("ClientBase Call grpc call get error", zap.String("role", c.GetRole()), zap.String("address", c.GetAddr()), zap.Error(traceErr), @@ -383,44 +413,8 @@ func (c *ClientBase[T]) Call(ctx context.Context, caller func(client T) (any, er // ReCall does the grpc call twice func (c *ClientBase[T]) ReCall(ctx context.Context, caller func(client T) (any, error)) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return generic.Zero[T](), ctx.Err() - } - - ret, err := c.callOnce(ctx, caller) - if err == nil { - return ret, nil - } - - log := log.Ctx(ctx).With(zap.String("role", c.GetRole()), zap.String("address", c.GetAddr())) - traceErr := fmt.Errorf("err: %w\n, %s", err, tracer.StackTrace()) - log.Warn("ClientBase ReCall grpc first call get error ", zap.Error(traceErr)) - - if !funcutil.CheckCtxValid(ctx) { - return generic.Zero[T](), ctx.Err() - } - - ret, err = c.callOnce(ctx, caller) - if err != nil { - traceErr = fmt.Errorf("err: %w\n, %s", err, tracer.StackTrace()) - log.Warn("ClientBase ReCall grpc second call get error", zap.Error(traceErr)) - return generic.Zero[T](), traceErr - } - return ret, err -} - -func (c *ClientBase[T]) bgHealthCheck(client T) { - c.sf.Do("healthcheck", func() (any, error) { - ctx, cancel := context.WithTimeout(context.Background(), paramtable.Get().CommonCfg.SessionTTL.GetAsDuration(time.Second)) - defer cancel() - - _, err := client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) - if err != nil { - c.resetConnection(client) - } - - return struct{}{}, nil - }) + // All retry operations are done in `call` function. 
+ return c.Call(ctx, caller) } // Close close the client connection diff --git a/internal/util/grpcclient/client_test.go b/internal/util/grpcclient/client_test.go index 17272e70bc..63a72c214a 100644 --- a/internal/util/grpcclient/client_test.go +++ b/internal/util/grpcclient/client_test.go @@ -23,11 +23,11 @@ import ( "net" "os" "strings" - "sync" "testing" "time" "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus/pkg/util/merr" "google.golang.org/grpc/examples/helloworld/helloworld" "google.golang.org/grpc/keepalive" @@ -106,11 +106,18 @@ func testCall(t *testing.T, compressed bool) { // mock client with nothing base := ClientBase[*mockClient]{} base.CompressionEnabled = compressed - base.grpcClientMtx.Lock() - base.grpcClient = &mockClient{} - base.grpcClientMtx.Unlock() + initClient := func() { + base.grpcClientMtx.Lock() + base.grpcClient = &mockClient{} + base.grpcClientMtx.Unlock() + } + base.MaxAttempts = 1 + base.SetGetAddrFunc(func() (string, error) { + return "", errors.New("mocked address error") + }) t.Run("Call normal return", func(t *testing.T) { + initClient() _, err := base.Call(context.Background(), func(client *mockClient) (any, error) { return struct{}{}, nil }) @@ -118,6 +125,7 @@ func testCall(t *testing.T, compressed bool) { }) t.Run("Call with canceled context", func(t *testing.T) { + initClient() ctx, cancel := context.WithCancel(context.Background()) cancel() _, err := base.Call(ctx, func(client *mockClient) (any, error) { @@ -128,22 +136,7 @@ func testCall(t *testing.T, compressed bool) { }) t.Run("Call canceled in caller func", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - errMock := errors.New("mocked") - _, err := base.Call(ctx, func(client *mockClient) (any, error) { - cancel() - return nil, errMock - }) - - assert.Error(t, err) - assert.True(t, errors.Is(err, errMock)) - base.grpcClientMtx.RLock() - // client shall not be reset - assert.NotNil(t, base.grpcClient) - base.grpcClientMtx.RUnlock() - }) - - t.Run("Call canceled in caller func", func(t *testing.T) { + initClient() ctx, cancel := context.WithCancel(context.Background()) errMock := errors.New("mocked") _, err := base.Call(ctx, func(client *mockClient) (any, error) { @@ -160,6 +153,7 @@ func testCall(t *testing.T, compressed bool) { }) t.Run("Call returns non-grpc error", func(t *testing.T) { + initClient() ctx, cancel := context.WithCancel(context.Background()) defer cancel() errMock := errors.New("mocked") @@ -176,6 +170,7 @@ func testCall(t *testing.T, compressed bool) { }) t.Run("Call returns grpc error", func(t *testing.T) { + initClient() ctx, cancel := context.WithCancel(context.Background()) defer cancel() errGrpc := status.Error(codes.Unknown, "mocked") @@ -211,11 +206,18 @@ func testCall(t *testing.T, compressed bool) { func TestClientBase_Recall(t *testing.T) { // mock client with nothing base := ClientBase[*mockClient]{} - base.grpcClientMtx.Lock() - base.grpcClient = &mockClient{} - base.grpcClientMtx.Unlock() + initClient := func() { + base.grpcClientMtx.Lock() + base.grpcClient = &mockClient{} + base.grpcClientMtx.Unlock() + } + base.MaxAttempts = 1 + base.SetGetAddrFunc(func() (string, error) { + return "", errors.New("mocked address error") + }) t.Run("Recall normal return", func(t *testing.T) { + initClient() _, err := base.ReCall(context.Background(), func(client *mockClient) (any, error) { return struct{}{}, nil }) @@ -223,6 +225,7 @@ func TestClientBase_Recall(t *testing.T) { }) t.Run("ReCall with canceled context", func(t *testing.T) 
{ + initClient() ctx, cancel := context.WithCancel(context.Background()) cancel() _, err := base.ReCall(ctx, func(client *mockClient) (any, error) { @@ -232,24 +235,8 @@ func TestClientBase_Recall(t *testing.T) { assert.True(t, errors.Is(err, context.Canceled)) }) - t.Run("ReCall fails first and success second", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - flag := false - var mut sync.Mutex - _, err := base.ReCall(ctx, func(client *mockClient) (any, error) { - mut.Lock() - defer mut.Unlock() - if flag { - return struct{}{}, nil - } - flag = true - return nil, errors.New("mock first") - }) - assert.NoError(t, err) - }) - t.Run("ReCall canceled in caller func", func(t *testing.T) { + initClient() ctx, cancel := context.WithCancel(context.Background()) errMock := errors.New("mocked") _, err := base.ReCall(ctx, func(client *mockClient) (any, error) { @@ -258,7 +245,7 @@ func TestClientBase_Recall(t *testing.T) { }) assert.Error(t, err) - assert.True(t, errors.Is(err, context.Canceled)) + assert.True(t, errors.Is(err, errMock)) base.grpcClientMtx.RLock() // client shall not be reset assert.NotNil(t, base.grpcClient) @@ -314,7 +301,7 @@ func TestClientBase_RetryPolicy(t *testing.T) { Timeout: 60 * time.Second, } - maxAttempts := 5 + maxAttempts := 1 s := grpc.NewServer( grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), @@ -338,7 +325,6 @@ func TestClientBase_RetryPolicy(t *testing.T) { MaxAttempts: maxAttempts, InitialBackoff: 10.0, MaxBackoff: 60.0, - BackoffMultiplier: 2.0, } clientBase.SetRole(typeutil.DataCoordRole) clientBase.SetGetAddrFunc(func() (string, error) { @@ -352,9 +338,12 @@ func TestClientBase_RetryPolicy(t *testing.T) { ctx := context.Background() randID := rand.Int63() res, err := clientBase.Call(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { - return &milvuspb.ComponentStates{State: &milvuspb.ComponentInfo{ - NodeID: randID, - }}, nil + return &milvuspb.ComponentStates{ + State: &milvuspb.ComponentInfo{ + NodeID: randID, + }, + Status: merr.Status(nil), + }, nil }) assert.NoError(t, err) assert.Equal(t, res.(*milvuspb.ComponentStates).GetState().GetNodeID(), randID) @@ -375,7 +364,7 @@ func TestClientBase_Compression(t *testing.T) { Timeout: 60 * time.Second, } - maxAttempts := 5 + maxAttempts := 1 s := grpc.NewServer( grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), @@ -399,7 +388,6 @@ func TestClientBase_Compression(t *testing.T) { MaxAttempts: maxAttempts, InitialBackoff: 10.0, MaxBackoff: 60.0, - BackoffMultiplier: 2.0, CompressionEnabled: true, } clientBase.SetRole(typeutil.DataCoordRole) @@ -414,9 +402,12 @@ func TestClientBase_Compression(t *testing.T) { ctx := context.Background() randID := rand.Int63() res, err := clientBase.Call(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { - return &milvuspb.ComponentStates{State: &milvuspb.ComponentInfo{ - NodeID: randID, - }}, nil + return &milvuspb.ComponentStates{ + State: &milvuspb.ComponentInfo{ + NodeID: randID, + }, + Status: merr.Status(nil), + }, nil }) assert.NoError(t, err) assert.Equal(t, res.(*milvuspb.ComponentStates).GetState().GetNodeID(), randID) diff --git a/pkg/go.sum b/pkg/go.sum index 3ed3a90b8a..7e9d5fc7df 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -211,6 +211,7 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= github.com/go-ole/go-ole v1.2.6 
h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 92e19ddc3a..84c92b2e94 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -22,7 +22,7 @@ import ( ) const ( - retriableFlag = 1 << 16 + retryableFlag = 1 << 16 CanceledCode int32 = 10000 TimeoutCode int32 = 10001 ) @@ -124,7 +124,7 @@ type milvusError struct { func newMilvusError(msg string, code int32, retriable bool) milvusError { if retriable { - code |= retriableFlag + code |= retryableFlag } return milvusError{ msg: msg, diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 3995299c49..75d4b762f5 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -56,8 +56,12 @@ func Code(err error) int32 { } } -func IsRetriable(err error) bool { - return Code(err)&retriableFlag != 0 +func IsRetryableErr(err error) bool { + return IsRetryableCode(Code(err)) +} + +func IsRetryableCode(code int32) bool { + return code&retryableFlag != 0 } func IsCanceledOrTimeout(err error) bool { @@ -130,7 +134,7 @@ func Error(status *commonpb.Status) error { return newMilvusError(fmt.Sprintf("legacy error code:%d, reason: %s", status.GetErrorCode(), status.GetReason()), errUnexpected.errCode, false) } - return newMilvusError(status.GetReason(), code, code&retriableFlag != 0) + return newMilvusError(status.GetReason(), code, code&retryableFlag != 0) } // CheckHealthy checks whether the state is healthy, diff --git a/pkg/util/paramtable/grpc_param.go b/pkg/util/paramtable/grpc_param.go index 9760b20ef4..2371d57feb 100644 --- a/pkg/util/paramtable/grpc_param.go +++ b/pkg/util/paramtable/grpc_param.go @@ -43,12 +43,10 @@ const ( DefaultKeepAliveTimeout = 20000 // Grpc retry policy - DefaultMaxAttempts = 5 - DefaultInitialBackoff float64 = 1.0 - DefaultMaxBackoff float64 = 10.0 - DefaultBackoffMultiplier float64 = 2.0 - - DefaultCompressionEnabled bool = false + DefaultMaxAttempts = 10 + DefaultInitialBackoff float64 = 0.2 + DefaultMaxBackoff float64 = 10 + DefaultCompressionEnabled bool = false ProxyInternalPort = 19529 ProxyExternalPort = 19530 @@ -194,10 +192,9 @@ type GrpcClientConfig struct { KeepAliveTime ParamItem `refreshable:"false"` KeepAliveTimeout ParamItem `refreshable:"false"` - MaxAttempts ParamItem `refreshable:"false"` - InitialBackoff ParamItem `refreshable:"false"` - MaxBackoff ParamItem `refreshable:"false"` - BackoffMultiplier ParamItem `refreshable:"false"` + MaxAttempts ParamItem `refreshable:"false"` + InitialBackoff ParamItem `refreshable:"false"` + MaxBackoff ParamItem `refreshable:"false"` } func (p *GrpcClientConfig) Init(domain string, base *BaseTable) { @@ -318,19 +315,13 @@ func (p *GrpcClientConfig) Init(domain string, base *BaseTable) { if v == "" { return maxAttempts } - iv, err := strconv.Atoi(v) + _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to convert int when parsing grpc.client.maxMaxAttempts, set to default", zap.String("role", p.Domain), zap.String("grpc.client.maxMaxAttempts", v)) return maxAttempts } - if iv 
< 2 || iv > 5 { - log.Warn("The value of %s should be greater than 1 and less than 6, set to default", - zap.String("role", p.Domain), - zap.String("grpc.client.maxMaxAttempts", v)) - return maxAttempts - } return v }, Export: true, @@ -345,7 +336,7 @@ func (p *GrpcClientConfig) Init(domain string, base *BaseTable) { if v == "" { return initialBackoff } - _, err := strconv.Atoi(v) + _, err := strconv.ParseFloat(v, 64) if err != nil { log.Warn("Failed to convert int when parsing grpc.client.initialBackoff, set to default", zap.String("role", p.Domain), @@ -379,27 +370,6 @@ func (p *GrpcClientConfig) Init(domain string, base *BaseTable) { } p.MaxBackoff.Init(base.mgr) - backoffMultiplier := fmt.Sprintf("%f", DefaultBackoffMultiplier) - p.BackoffMultiplier = ParamItem{ - Key: "grpc.client.backoffMultiplier", - Version: "2.0.0", - Formatter: func(v string) string { - if v == "" { - return backoffMultiplier - } - _, err := strconv.ParseFloat(v, 64) - if err != nil { - log.Warn("Failed to convert int when parsing grpc.client.backoffMultiplier, set to default", - zap.String("role", p.Domain), - zap.String("grpc.client.backoffMultiplier", v)) - return backoffMultiplier - } - return v - }, - Export: true, - } - p.BackoffMultiplier.Init(base.mgr) - compressionEnabled := fmt.Sprintf("%t", DefaultCompressionEnabled) p.CompressionEnabled = ParamItem{ Key: "grpc.client.compressionEnabled", @@ -413,7 +383,7 @@ func (p *GrpcClientConfig) Init(domain string, base *BaseTable) { log.Warn("Failed to convert int when parsing grpc.client.compressionEnabled, set to default", zap.String("role", p.Domain), zap.String("grpc.client.compressionEnabled", v)) - return backoffMultiplier + return compressionEnabled } return v }, diff --git a/pkg/util/paramtable/grpc_param_test.go b/pkg/util/paramtable/grpc_param_test.go index 235f066896..9baeef30bf 100644 --- a/pkg/util/paramtable/grpc_param_test.go +++ b/pkg/util/paramtable/grpc_param_test.go @@ -122,15 +122,14 @@ func TestGrpcClientParams(t *testing.T) { assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), DefaultMaxAttempts) base.Save("grpc.client.maxMaxAttempts", "a") assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), DefaultMaxAttempts) - base.Save("grpc.client.maxMaxAttempts", "1") - assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), DefaultMaxAttempts) - base.Save("grpc.client.maxMaxAttempts", "10") - assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), DefaultMaxAttempts) base.Save("grpc.client.maxMaxAttempts", "4") assert.Equal(t, clientConfig.MaxAttempts.GetAsInt(), 4) + assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), DefaultInitialBackoff) base.Save("grpc.client.initialBackOff", "a") + assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), DefaultInitialBackoff) base.Save("grpc.client.initialBackOff", "2.0") + assert.Equal(t, clientConfig.InitialBackoff.GetAsFloat(), 2.0) assert.Equal(t, clientConfig.MaxBackoff.GetAsFloat(), DefaultMaxBackoff) base.Save("grpc.client.maxBackOff", "a") @@ -138,12 +137,6 @@ func TestGrpcClientParams(t *testing.T) { base.Save("grpc.client.maxBackOff", "50.0") assert.Equal(t, clientConfig.MaxBackoff.GetAsFloat(), 50.0) - assert.Equal(t, clientConfig.BackoffMultiplier.GetAsFloat(), DefaultBackoffMultiplier) - base.Save("grpc.client.backoffMultiplier", "a") - assert.Equal(t, clientConfig.BackoffMultiplier.GetAsFloat(), DefaultBackoffMultiplier) - base.Save("grpc.client.backoffMultiplier", "3.0") - assert.Equal(t, clientConfig.BackoffMultiplier.GetAsFloat(), 3.0) - assert.Equal(t, 
clientConfig.CompressionEnabled.GetAsBool(), DefaultCompressionEnabled) base.Save("grpc.client.CompressionEnabled", "a") assert.Equal(t, clientConfig.CompressionEnabled.GetAsBool(), DefaultCompressionEnabled) diff --git a/pkg/util/retry/retry.go b/pkg/util/retry/retry.go index eb56745365..5425e5dd11 100644 --- a/pkg/util/retry/retry.go +++ b/pkg/util/retry/retry.go @@ -38,7 +38,7 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error { for i := uint(0); i < c.attempts; i++ { if err := fn(); err != nil { - if i%10 == 0 { + if i%4 == 0 { log.Error("retry func failed", zap.Uint("retry time", i), zap.Error(err)) } diff --git a/tests/integration/crossclusterrouting/cross_cluster_routing_test.go b/tests/integration/crossclusterrouting/cross_cluster_routing_test.go index 681b22dd3b..e2a22d7a06 100644 --- a/tests/integration/crossclusterrouting/cross_cluster_routing_test.go +++ b/tests/integration/crossclusterrouting/cross_cluster_routing_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "math/rand" + "strconv" "strings" "testing" "time" @@ -88,10 +89,13 @@ func (s *CrossClusterRoutingSuite) SetupSuite() { rand.Seed(time.Now().UnixNano()) paramtable.Init() + + paramtable.Get().Save("grpc.client.maxMaxAttempts", "1") s.factory = dependency.NewDefaultFactory(true) } func (s *CrossClusterRoutingSuite) TearDownSuite() { + paramtable.Get().Save("grpc.client.maxMaxAttempts", strconv.FormatInt(paramtable.DefaultMaxAttempts, 10)) } func (s *CrossClusterRoutingSuite) SetupTest() {
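
Note on the retry behaviour introduced by this patch: client-side retries no longer rely on gRPC's service-config retry policy (the generated retryPolicy JSON is removed and the dial options now pass grpc.WithDisableRetry()); instead, every Call/ReCall is wrapped in an application-level retry.Do loop using the new defaults maxMaxAttempts=10, initialBackOff=0.2 s and maxBackoff=10 s. The Go sketch below is a minimal, self-contained illustration of that call-level retry pattern only; doWithRetry and errUnrecoverable are hypothetical names invented for this example and are not part of the Milvus pkg/util/retry package.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errUnrecoverable is a hypothetical sentinel: wrapping or returning it stops the
// loop immediately, similar in spirit to retry.Unrecoverable used in the patch.
var errUnrecoverable = errors.New("unrecoverable")

// doWithRetry is a hypothetical helper sketching call-level retry with exponential
// backoff: up to `attempts` tries, sleeping `initial` after the first failure,
// doubling the sleep each time, and capping it at `max`.
func doWithRetry(ctx context.Context, attempts int, initial, max time.Duration, fn func() error) error {
	sleep := initial
	var lastErr error
	for i := 0; i < attempts; i++ {
		lastErr = fn()
		if lastErr == nil {
			return nil
		}
		if errors.Is(lastErr, errUnrecoverable) {
			return lastErr // e.g. a non-RPC error that retrying cannot fix
		}
		if i == attempts-1 {
			break // no sleep after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(sleep):
		}
		sleep *= 2
		if sleep > max {
			sleep = max
		}
	}
	return lastErr
}

func main() {
	// Defaults taken from the patch: 10 attempts, 0.2 s initial backoff, 10 s cap.
	calls := 0
	err := doWithRetry(context.Background(), 10, 200*time.Millisecond, 10*time.Second, func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("transient failure %d", calls)
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}

With a doubling schedule the sleeps between attempts grow 0.2 s, 0.4 s, 0.8 s, ... until they hit the 10 s cap, so a call that fails on every attempt can block for the better part of a minute, as the in-code comment above notes; that is also why this patch pins grpc.client.maxMaxAttempts to 1 in the session cluster test and the cross-cluster routing integration test.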