From 5fd92ec80c668eca936cd0f9d64a0f4edf0a4e43 Mon Sep 17 00:00:00 2001 From: godchen Date: Wed, 26 May 2021 19:37:34 +0800 Subject: [PATCH] Add querynode grpc retry (#5428) Add querynode grpc retry. Signed-off-by: godchen --- .../distributed/querynode/client/client.go | 131 +++++++++++++++--- internal/queryservice/impl.go | 10 +- 2 files changed, 119 insertions(+), 22 deletions(-) diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 7746b232d5..04df5999a6 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -13,16 +13,20 @@ package grpcquerynodeclient import ( "context" + "fmt" "time" "google.golang.org/grpc" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/milvuspb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/retry" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" + "go.uber.org/zap" ) const ( @@ -35,28 +39,38 @@ type Client struct { grpcClient querypb.QueryNodeClient conn *grpc.ClientConn addr string + + timeout time.Duration + reconnTry int + recallTry int } -func NewClient(address string) *Client { - return &Client{ - addr: address, +func NewClient(address string) (*Client, error) { + if address == "" { + return nil, fmt.Errorf("address is empty") } + return &Client{ + ctx: context.Background(), + addr: address, + }, nil } func (c *Client) Init() error { tracer := opentracing.GlobalTracer() - ctx, cancel := context.WithTimeout(context.Background(), RPCConnectionTimeout) - defer cancel() - var err error - for i := 0; i < Retry*100; i++ { - if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), + connectGrpcFunc := func() error { + log.Debug("querynode connect", zap.String("address", c.addr)) + conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor( otgrpc.OpenTracingClientInterceptor(tracer)), grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))); err == nil { - break + otgrpc.OpenTracingStreamClientInterceptor(tracer))) + if err != nil { + return err } + c.conn = conn + return nil } + err := retry.Retry(c.reconnTry, time.Millisecond*200, connectGrpcFunc) if err != nil { return err } @@ -64,6 +78,48 @@ func (c *Client) Init() error { return nil } +func (c *Client) reconnect() error { + tracer := opentracing.GlobalTracer() + var err error + connectGrpcFunc := func() error { + log.Debug("querynode connect ", zap.String("address", c.addr)) + conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), + grpc.WithUnaryInterceptor( + otgrpc.OpenTracingClientInterceptor(tracer)), + grpc.WithStreamInterceptor( + otgrpc.OpenTracingStreamClientInterceptor(tracer))) + if err != nil { + return err + } + c.conn = conn + return nil + } + + err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) + if err != nil { + return err + } + c.grpcClient = querypb.NewQueryNodeClient(c.conn) + return nil +} + +func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) { + ret, err := caller() + if err == nil { + return ret, nil + } + for i := 0; i < c.recallTry; i++ { + err = c.reconnect() + if err == nil { + ret, err = caller() + if err == nil { + return ret, nil + } + } + } + return ret, err +} + func (c *Client) Start() error { return nil } @@ -78,45 +134,78 @@ func (c *Client) Register() error { } func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { - return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{}) + }) + return ret.(*internalpb.ComponentStates), err } func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - return c.grpcClient.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{}) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{}) + }) + return ret.(*milvuspb.StringResponse), err } func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{}) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{}) + }) + return ret.(*milvuspb.StringResponse), err } func (c *Client) AddQueryChannel(ctx context.Context, req *querypb.AddQueryChannelRequest) (*commonpb.Status, error) { - return c.grpcClient.AddQueryChannel(ctx, req) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.AddQueryChannel(ctx, req) + }) + return ret.(*commonpb.Status), err } func (c *Client) RemoveQueryChannel(ctx context.Context, req *querypb.RemoveQueryChannelRequest) (*commonpb.Status, error) { - return c.grpcClient.RemoveQueryChannel(ctx, req) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.RemoveQueryChannel(ctx, req) + }) + return ret.(*commonpb.Status), err } func (c *Client) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { - return c.grpcClient.WatchDmChannels(ctx, req) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.WatchDmChannels(ctx, req) + }) + return ret.(*commonpb.Status), err } func (c *Client) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) { - return c.grpcClient.LoadSegments(ctx, req) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.LoadSegments(ctx, req) + }) + return ret.(*commonpb.Status), err } func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { - return c.grpcClient.ReleaseCollection(ctx, req) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.ReleaseCollection(ctx, req) + }) + return ret.(*commonpb.Status), err } func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) { - return c.grpcClient.ReleasePartitions(ctx, req) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.ReleasePartitions(ctx, req) + }) + return ret.(*commonpb.Status), err } func (c *Client) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) { - return c.grpcClient.ReleaseSegments(ctx, req) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.ReleaseSegments(ctx, req) + }) + return ret.(*commonpb.Status), err } func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) { - return c.grpcClient.GetSegmentInfo(ctx, req) + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.GetSegmentInfo(ctx, req) + }) + return ret.(*querypb.GetSegmentInfoResponse), err } diff --git a/internal/queryservice/impl.go b/internal/queryservice/impl.go index 4ab6d5c904..cfd8887fee 100644 --- a/internal/queryservice/impl.go +++ b/internal/queryservice/impl.go @@ -90,7 +90,15 @@ func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterN } registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10) - client := nodeclient.NewClient(registerNodeAddress) + client, err := nodeclient.NewClient(registerNodeAddress) + if err != nil { + return &querypb.RegisterNodeResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + InitParams: new(internalpb.InitParams), + }, err + } if err := client.Init(); err != nil { return &querypb.RegisterNodeResponse{ Status: &commonpb.Status{