From e27907cd29527e192cd27f6a4358741996d449b8 Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 1 Mar 2023 11:31:48 +0800 Subject: [PATCH] [Cherry-Pick] Add background health check if ctx err returned (#22470) Signed-off-by: Congqi Xia --- internal/util/grpcclient/client.go | 76 ++++++++++++++++------- internal/util/grpcclient/client_test.go | 81 ++++++++++++------------- 2 files changed, 92 insertions(+), 65 deletions(-) diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index 06f3228c03..8d460956be 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -24,6 +24,14 @@ import ( "time" grpcopentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" + "go.uber.org/zap" + "golang.org/x/sync/singleflight" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" + + "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util/crypto" @@ -31,15 +39,12 @@ import ( "github.com/milvus-io/milvus/internal/util/generic" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/trace" - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" ) // GrpcClient abstracts client of grpc -type GrpcClient[T any] interface { +type GrpcClient[T interface { + GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) +}] interface { SetRole(string) GetRole() string SetGetAddrFunc(func() (string, error)) @@ -54,7 +59,9 @@ type GrpcClient[T any] interface { } // ClientBase is a base of grpc client -type ClientBase[T any] struct { +type ClientBase[T interface { + GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) +}] struct { getAddrFunc func() (string, error) newGrpcClient func(cc *grpc.ClientConn) T @@ -76,6 +83,8 @@ type ClientBase[T any] struct { MaxBackoff float32 BackoffMultiplier float32 NodeID int64 + + sf singleflight.Group } // SetRole sets role of client @@ -249,29 +258,31 @@ func (c *ClientBase[T]) callOnce(ctx context.Context, caller func(client T) (any return generic.Zero[T](), err } - var ( - ret any - err2 error - ) + var ret any _, _ = retry.DoGrpc(ctx, uint(c.MaxAttempts*2), func() (any, error) { - ret, err2 = caller(client) - return ret, err2 + ret, err = caller(client) + return ret, err }) - if err2 == nil { + if err == nil { return ret, nil } if !funcutil.CheckCtxValid(ctx) { - return generic.Zero[T](), err2 + // start bg check in case of https://github.com/milvus-io/milvus/issues/22435 + go c.bgHealthCheck(client) + return generic.Zero[T](), err } - if !funcutil.IsGrpcErr(err2) { - log.Debug("ClientBase:isNotGrpcErr", zap.Error(err2)) - return generic.Zero[T](), err2 + if !funcutil.IsGrpcErr(err) { + log.Warn("ClientBase:isNotGrpcErr", zap.Error(err)) + return generic.Zero[T](), err } - log.Debug(c.GetRole()+" ClientBase grpc error, start to reset connection", zap.Error(err2)) + log.Info("ClientBase grpc error, start to reset connection", + zap.String("role", c.GetRole()), + zap.Error(err), + ) c.resetConnection(client) - return ret, err2 + return ret, err } // Call does a grpc call @@ -283,7 +294,10 @@ func (c *ClientBase[T]) Call(ctx context.Context, caller func(client T) (any, er ret, err := c.callOnce(ctx, caller) if err != nil { traceErr := fmt.Errorf("err: %w\n, %s", err, trace.StackTrace()) - log.Error("ClientBase Call grpc first call get error", zap.String("role", c.GetRole()), zap.Error(traceErr)) + log.Warn("ClientBase Call grpc first call get error", + zap.String("role", c.GetRole()), + zap.Error(traceErr), + ) return generic.Zero[T](), traceErr } return ret, err @@ -301,7 +315,10 @@ func (c *ClientBase[T]) ReCall(ctx context.Context, caller func(client T) (any, } traceErr := fmt.Errorf("err: %w\n, %s", err, trace.StackTrace()) - log.Warn(c.GetRole()+" ClientBase ReCall grpc first call get error ", zap.Error(traceErr)) + log.Warn("ClientBase ReCall grpc first call get error", + zap.String("role", c.GetRole()), + zap.Error(traceErr), + ) if !funcutil.CheckCtxValid(ctx) { return generic.Zero[T](), ctx.Err() @@ -316,6 +333,21 @@ func (c *ClientBase[T]) ReCall(ctx context.Context, caller func(client T) (any, return ret, err } +func (c *ClientBase[T]) bgHealthCheck(client T) { + c.sf.Do("healthcheck", func() (any, error) { + // v2.2.0 does not has paramtable, use magic nubmer here + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + _, err := client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + c.resetConnection(client) + } + + return struct{}{}, nil + }) +} + // Close close the client connection func (c *ClientBase[T]) Close() error { c.grpcClientMtx.Lock() diff --git a/internal/util/grpcclient/client_test.go b/internal/util/grpcclient/client_test.go index 8e45a96eec..57f62d5985 100644 --- a/internal/util/grpcclient/client_test.go +++ b/internal/util/grpcclient/client_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "log" + "math/rand" "net" "strings" "sync" @@ -30,6 +31,8 @@ import ( "google.golang.org/grpc/examples/helloworld/helloworld" "google.golang.org/grpc/keepalive" + "github.com/milvus-io/milvus-proto/go-api/milvuspb" + "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/util/typeutil" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -39,21 +42,27 @@ import ( "google.golang.org/grpc/status" ) +type mockClient struct{} + +func (c *mockClient) GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) { + return &milvuspb.ComponentStates{}, nil +} + func TestClientBase_SetRole(t *testing.T) { - base := ClientBase[any]{} + base := ClientBase[*mockClient]{} expect := "abc" base.SetRole("abc") assert.Equal(t, expect, base.GetRole()) } func TestClientBase_GetRole(t *testing.T) { - base := ClientBase[any]{} + base := ClientBase[*mockClient]{} assert.Equal(t, "", base.GetRole()) } func TestClientBase_connect(t *testing.T) { t.Run("failed to connect", func(t *testing.T) { - base := ClientBase[any]{ + base := ClientBase[*mockClient]{ getAddrFunc: func() (string, error) { return "", nil }, @@ -66,7 +75,7 @@ func TestClientBase_connect(t *testing.T) { t.Run("failed to get addr", func(t *testing.T) { errMock := errors.New("mocked") - base := ClientBase[any]{ + base := ClientBase[*mockClient]{ getAddrFunc: func() (string, error) { return "", errMock }, @@ -80,13 +89,13 @@ func TestClientBase_connect(t *testing.T) { func TestClientBase_Call(t *testing.T) { // mock client with nothing - base := ClientBase[any]{} + base := ClientBase[*mockClient]{} base.grpcClientMtx.Lock() - base.grpcClient = struct{}{} + base.grpcClient = &mockClient{} base.grpcClientMtx.Unlock() t.Run("Call normal return", func(t *testing.T) { - _, err := base.Call(context.Background(), func(client any) (any, error) { + _, err := base.Call(context.Background(), func(client *mockClient) (any, error) { return struct{}{}, nil }) assert.NoError(t, err) @@ -95,7 +104,7 @@ func TestClientBase_Call(t *testing.T) { t.Run("Call with canceled context", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - _, err := base.Call(ctx, func(client any) (any, error) { + _, err := base.Call(ctx, func(client *mockClient) (any, error) { return struct{}{}, nil }) assert.Error(t, err) @@ -105,23 +114,7 @@ func TestClientBase_Call(t *testing.T) { t.Run("Call canceled in caller func", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) errMock := errors.New("mocked") - _, err := base.Call(ctx, func(client any) (any, error) { - cancel() - return nil, errMock - }) - - assert.Error(t, err) - assert.True(t, errors.Is(err, errMock)) - base.grpcClientMtx.RLock() - // client shall not be reset - assert.NotNil(t, base.grpcClient) - base.grpcClientMtx.RUnlock() - }) - - t.Run("Call canceled in caller func", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - errMock := errors.New("mocked") - _, err := base.Call(ctx, func(client any) (any, error) { + _, err := base.Call(ctx, func(client *mockClient) (any, error) { cancel() return nil, errMock }) @@ -138,7 +131,7 @@ func TestClientBase_Call(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() errMock := errors.New("mocked") - _, err := base.Call(ctx, func(client any) (any, error) { + _, err := base.Call(ctx, func(client *mockClient) (any, error) { return nil, errMock }) @@ -154,7 +147,7 @@ func TestClientBase_Call(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() errGrpc := status.Error(codes.Unknown, "mocked") - _, err := base.Call(ctx, func(client any) (any, error) { + _, err := base.Call(ctx, func(client *mockClient) (any, error) { return nil, errGrpc }) @@ -175,7 +168,7 @@ func TestClientBase_Call(t *testing.T) { t.Run("Call with connect failure", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, err := base.Call(ctx, func(client any) (any, error) { + _, err := base.Call(ctx, func(client *mockClient) (any, error) { return struct{}{}, nil }) assert.Error(t, err) @@ -185,13 +178,13 @@ func TestClientBase_Call(t *testing.T) { func TestClientBase_Recall(t *testing.T) { // mock client with nothing - base := ClientBase[any]{} + base := ClientBase[*mockClient]{} base.grpcClientMtx.Lock() - base.grpcClient = struct{}{} + base.grpcClient = &mockClient{} base.grpcClientMtx.Unlock() t.Run("Recall normal return", func(t *testing.T) { - _, err := base.ReCall(context.Background(), func(client any) (any, error) { + _, err := base.ReCall(context.Background(), func(client *mockClient) (any, error) { return struct{}{}, nil }) assert.NoError(t, err) @@ -200,7 +193,7 @@ func TestClientBase_Recall(t *testing.T) { t.Run("ReCall with canceled context", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - _, err := base.ReCall(ctx, func(client any) (any, error) { + _, err := base.ReCall(ctx, func(client *mockClient) (any, error) { return struct{}{}, nil }) assert.Error(t, err) @@ -212,7 +205,7 @@ func TestClientBase_Recall(t *testing.T) { defer cancel() flag := false var mut sync.Mutex - _, err := base.ReCall(ctx, func(client any) (any, error) { + _, err := base.ReCall(ctx, func(client *mockClient) (any, error) { mut.Lock() defer mut.Unlock() if flag { @@ -227,7 +220,7 @@ func TestClientBase_Recall(t *testing.T) { t.Run("ReCall canceled in caller func", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) errMock := errors.New("mocked") - _, err := base.ReCall(ctx, func(client any) (any, error) { + _, err := base.ReCall(ctx, func(client *mockClient) (any, error) { cancel() return nil, errMock }) @@ -248,7 +241,7 @@ func TestClientBase_Recall(t *testing.T) { t.Run("ReCall with connect failure", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, err := base.ReCall(ctx, func(client any) (any, error) { + _, err := base.ReCall(ctx, func(client *mockClient) (any, error) { return struct{}{}, nil }) assert.Error(t, err) @@ -304,13 +297,13 @@ func TestClientBase_RetryPolicy(t *testing.T) { }() defer s.Stop() - clientBase := ClientBase[helloworld.GreeterClient]{ + clientBase := ClientBase[rootcoordpb.RootCoordClient]{ ClientMaxRecvSize: 1 * 1024 * 1024, ClientMaxSendSize: 1 * 1024 * 1024, DialTimeout: 60 * time.Second, KeepAliveTime: 60 * time.Second, KeepAliveTimeout: 60 * time.Second, - RetryServiceNameConfig: "helloworld.Greeter", + RetryServiceNameConfig: "rootcoordpb.GetComponentStates", MaxAttempts: maxAttempts, InitialBackoff: 10.0, MaxBackoff: 60.0, @@ -320,17 +313,19 @@ func TestClientBase_RetryPolicy(t *testing.T) { clientBase.SetGetAddrFunc(func() (string, error) { return address, nil }) - clientBase.SetNewGrpcClientFunc(func(cc *grpc.ClientConn) helloworld.GreeterClient { - return helloworld.NewGreeterClient(cc) + clientBase.SetNewGrpcClientFunc(func(cc *grpc.ClientConn) rootcoordpb.RootCoordClient { + return rootcoordpb.NewRootCoordClient(cc) }) defer clientBase.Close() ctx := context.Background() - name := fmt.Sprintf("hello world %d", time.Now().Second()) - res, err := clientBase.Call(ctx, func(client helloworld.GreeterClient) (any, error) { + randID := rand.Int63() + res, err := clientBase.Call(ctx, func(client rootcoordpb.RootCoordClient) (any, error) { fmt.Println("client base...") - return client.SayHello(ctx, &helloworld.HelloRequest{Name: name}) + return &milvuspb.ComponentStates{State: &milvuspb.ComponentInfo{ + NodeID: randID, + }}, nil }) assert.Nil(t, err) - assert.Equal(t, res.(*helloworld.HelloReply).Message, strings.ToUpper(name)) + assert.Equal(t, res.(*milvuspb.ComponentStates).GetState().GetNodeID(), randID) }