From 9aa1fd25faee3eb9177e8c8eba3ca950e47fb807 Mon Sep 17 00:00:00 2001
From: godchen
Date: Thu, 27 May 2021 10:30:11 +0800
Subject: [PATCH] Add indexnode grpc retry (#5418)

Add indexnode grpc retry.

Signed-off-by: godchen
---
Review notes (not part of the commit message):
- reconnect now closes the connection it replaces, so repeated reconnects do
  not leak grpc.ClientConn instances.
- NewClient(nodeAddress, 10) passes 10ns as the time.Duration timeout; the
  timeout field is stored but not read by any code in this patch.
- The index lines still carry the blob ids of the originally submitted change.

 .../distributed/indexnode/client/client.go    | 100 +++++++++++++++++--
 internal/indexservice/node_mgr.go             |   2 +-
 2 files changed, 91 insertions(+), 11 deletions(-)

diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go
index a7a55bb6ec..dee0e36d34 100644
--- a/internal/distributed/indexnode/client/client.go
+++ b/internal/distributed/indexnode/client/client.go
@@ -13,6 +13,7 @@ package grpcindexnodeclient
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/milvus-io/milvus/internal/log"
@@ -30,14 +31,26 @@ import (
 
 type Client struct {
 	grpcClient indexpb.IndexNodeClient
-	address    string
+	conn       *grpc.ClientConn
 	ctx        context.Context
+
+	address string
+
+	timeout   time.Duration
+	reconnTry int
+	recallTry int
 }
 
-func NewClient(nodeAddress string) (*Client, error) {
+func NewClient(address string, timeout time.Duration) (*Client, error) {
+	if address == "" {
+		return nil, fmt.Errorf("address is empty")
+	}
 	return &Client{
-		address: nodeAddress,
-		ctx:     context.Background(),
+		address:   address,
+		ctx:       context.Background(),
+		timeout:   timeout,
+		recallTry: 3,
+		reconnTry: 10,
 	}, nil
 }
 
@@ -53,16 +66,68 @@ func (c *Client) Init() error {
 		if err != nil {
 			return err
 		}
-		c.grpcClient = indexpb.NewIndexNodeClient(conn)
+		c.conn = conn
 		return nil
 	}
 	err := retry.Retry(100000, time.Millisecond*200, connectGrpcFunc)
 	if err != nil {
 		return err
 	}
+	c.grpcClient = indexpb.NewIndexNodeClient(c.conn)
 	return nil
 }
 
+// reconnect dials c.address again, retrying up to c.reconnTry times, and then
+// rebuilds the gRPC client on the new connection. Any previously held
+// connection is closed once the new one is established.
+func (c *Client) reconnect() error {
+	tracer := opentracing.GlobalTracer()
+	connectGrpcFunc := func() error {
+		log.Debug("indexnode connect ", zap.String("address", c.address))
+		conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
+			grpc.WithUnaryInterceptor(
+				otgrpc.OpenTracingClientInterceptor(tracer)),
+			grpc.WithStreamInterceptor(
+				otgrpc.OpenTracingStreamClientInterceptor(tracer)))
+		if err != nil {
+			return err
+		}
+		if c.conn != nil {
+			// Close the connection being replaced so it is not leaked.
+			_ = c.conn.Close()
+		}
+		c.conn = conn
+		return nil
+	}
+
+	if err := retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc); err != nil {
+		return err
+	}
+	c.grpcClient = indexpb.NewIndexNodeClient(c.conn)
+	return nil
+}
+
+// recall invokes caller and returns its result on success. On failure it
+// reconnects and invokes caller again, up to c.recallTry times. If every
+// attempt fails it returns the last result together with the last error,
+// which may come from either reconnect or caller.
+func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
+	ret, err := caller()
+	if err == nil {
+		return ret, nil
+	}
+	for i := 0; i < c.recallTry; i++ {
+		err = c.reconnect()
+		if err == nil {
+			ret, err = caller()
+			if err == nil {
+				return ret, nil
+			}
+		}
+	}
+	return ret, err
+}
+
 func (c *Client) Start() error {
 	return nil
 }
@@ -77,21 +142,36 @@ func (c *Client) Register() error {
 }
 
 func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
-	return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
+	ret, err := c.recall(func() (interface{}, error) {
+		return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
+	})
+	return ret.(*internalpb.ComponentStates), err
 }
 
 func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
-	return c.grpcClient.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
+	ret, err := c.recall(func() (interface{}, error) {
+		return c.grpcClient.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
+	})
+	return ret.(*milvuspb.StringResponse), err
 }
 
 func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
-	return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
+	ret, err := c.recall(func() (interface{}, error) {
+		return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
+	})
+	return ret.(*milvuspb.StringResponse), err
 }
 
 func (c *Client) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*commonpb.Status, error) {
-	return c.grpcClient.BuildIndex(ctx, req)
+	ret, err := c.recall(func() (interface{}, error) {
+		return c.grpcClient.BuildIndex(ctx, req)
+	})
+	return ret.(*commonpb.Status), err
 }
 
 func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
-	return c.grpcClient.DropIndex(ctx, req)
+	ret, err := c.recall(func() (interface{}, error) {
+		return c.grpcClient.DropIndex(ctx, req)
+	})
+	return ret.(*commonpb.Status), err
 }
diff --git a/internal/indexservice/node_mgr.go b/internal/indexservice/node_mgr.go
index 2f505f8f88..750c5be4b0 100644
--- a/internal/indexservice/node_mgr.go
+++ b/internal/indexservice/node_mgr.go
@@ -41,7 +41,7 @@ func (i *IndexService) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest
 	}
 
 	nodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
-	nodeClient, err := grpcindexnodeclient.NewClient(nodeAddress)
+	nodeClient, err := grpcindexnodeclient.NewClient(nodeAddress, 10)
 	if err != nil {
 		return err
 	}