From 064f8f76cd3ffa359c8c4ece245f116695051452 Mon Sep 17 00:00:00 2001 From: godchen Date: Thu, 24 Jun 2021 15:20:08 +0800 Subject: [PATCH] change_coord_connect_retry (#6064) Signed-off-by: godchen --- internal/distributed/datacoord/client/client.go | 15 ++++----------- internal/distributed/indexcoord/client/client.go | 16 ++++------------ internal/distributed/querycoord/client/client.go | 13 +++---------- internal/distributed/rootcoord/client/client.go | 13 +++---------- 4 files changed, 14 insertions(+), 43 deletions(-) diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index 2b491e0f4f..307bc9f5c8 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -83,23 +83,16 @@ func (c *Client) Init() error { func (c *Client) connect() error { var err error - getDataCoordAddressFn := func() error { + connectDataCoordFn := func() error { c.addr, err = getDataCoordAddress(c.sess) if err != nil { + log.Debug("DataCoordClient getDataCoordAddr failed", zap.Error(err)) return err } - return nil - } - err = retry.Do(c.ctx, getDataCoordAddressFn, c.retryOptions...) - if err != nil { - log.Debug("DataCoordClient try reconnect getDataCoordAddressFn failed", zap.Error(err)) - return err - } - connectGrpcFunc := func() error { opts := trace.GetInterceptorOpts() log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, - grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second), + grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(3*time.Second), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpc_retry.UnaryClientInterceptor(), @@ -118,7 +111,7 @@ func (c *Client) connect() error { return nil } - err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...) + err = retry.Do(c.ctx, connectDataCoordFn, c.retryOptions...) if err != nil { log.Debug("DataCoord try reconnect failed", zap.Error(err)) return err diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index f09ac0c8a1..3631d07a52 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -85,24 +85,16 @@ func (c *Client) Init() error { func (c *Client) connect() error { var err error - getIndexCoordaddrFn := func() error { + connectIndexCoordaddrFn := func() error { c.addr, err = getIndexCoordAddr(c.sess) if err != nil { + log.Debug("IndexCoordClient getIndexCoordAddress failed") return err } - return nil - } - err = retry.Do(c.ctx, getIndexCoordaddrFn, c.retryOptions...) - if err != nil { - log.Debug("IndexCoordClient getIndexCoordAddress failed", zap.Error(err)) - return err - } - log.Debug("IndexCoordClient getIndexCoordAddress success") - connectGrpcFunc := func() error { opts := trace.GetInterceptorOpts() log.Debug("IndexCoordClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, - grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second), + grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(3*time.Second), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpc_retry.UnaryClientInterceptor(), @@ -121,7 +113,7 @@ func (c *Client) connect() error { return nil } - err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...) + err = retry.Do(c.ctx, connectIndexCoordaddrFn, c.retryOptions...) if err != nil { log.Debug("IndexCoordClient try connect failed", zap.Error(err)) return err diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index a2a5e57943..f89bf37bc7 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -84,19 +84,12 @@ func (c *Client) Init() error { func (c *Client) connect() error { var err error - getQueryCoordAddressFn := func() error { + connectQueryCoordAddressFn := func() error { c.addr, err = getQueryCoordAddress(c.sess) if err != nil { + log.Debug("QueryCoordClient getQueryCoordAddress failed", zap.Error(err)) return err } - return nil - } - err = retry.Do(c.ctx, getQueryCoordAddressFn, c.retryOptions...) - if err != nil { - log.Debug("QueryCoordClient getQueryCoordAddress failed", zap.Error(err)) - return err - } - connectGrpcFunc := func() error { opts := trace.GetInterceptorOpts() log.Debug("QueryCoordClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, @@ -119,7 +112,7 @@ func (c *Client) connect() error { return nil } - err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...) + err = retry.Do(c.ctx, connectQueryCoordAddressFn, c.retryOptions...) if err != nil { log.Debug("QueryCoordClient try reconnect failed", zap.Error(err)) return err diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 1dda66d76b..ed0d1ee6f4 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -91,19 +91,12 @@ func (c *GrpcClient) Init() error { func (c *GrpcClient) connect() error { var err error - getRootCoordAddrFn := func() error { + connectRootCoordAddrFn := func() error { c.addr, err = getRootCoordAddr(c.sess) if err != nil { + log.Debug("RootCoordClient getRootCoordAddr failed", zap.Error(err)) return err } - return nil - } - err = retry.Do(c.ctx, getRootCoordAddrFn, c.retryOptions...) - if err != nil { - log.Debug("RootCoordClient getRootCoordAddr failed", zap.Error(err)) - return err - } - connectGrpcFunc := func() error { opts := trace.GetInterceptorOpts() log.Debug("RootCoordClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, @@ -126,7 +119,7 @@ func (c *GrpcClient) connect() error { return nil } - err = retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...) + err = retry.Do(c.ctx, connectRootCoordAddrFn, c.retryOptions...) if err != nil { log.Debug("RootCoordClient try reconnect failed", zap.Error(err)) return err