From fd07a44dba9ca783fe3a11c60375859eeec3692e Mon Sep 17 00:00:00 2001 From: godchen Date: Wed, 23 Jun 2021 11:48:06 +0800 Subject: [PATCH] Fix error in retry refactor (#6000) * Fix error in retry refactor Signed-off-by: godchen * fix error Signed-off-by: godchen --- internal/datacoord/server.go | 2 +- internal/distributed/datanode/client/client.go | 2 +- internal/distributed/querynode/client/client.go | 14 +++++++------- internal/querycoord/cluster.go | 6 +++--- internal/querycoord/impl.go | 2 +- internal/querycoord/query_coord.go | 4 ++-- internal/querycoord/querynode.go | 5 +++-- internal/querynode/query_node.go | 2 +- 8 files changed, 19 insertions(+), 18 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 1a2c5e4301..07863468b8 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -92,7 +92,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro } func defaultDataNodeCreatorFunc(ctx context.Context, addr string, retryOptions ...retry.Option) (types.DataNode, error) { - return datanodeclient.NewClient(ctx, addr, retryOptions) + return datanodeclient.NewClient(ctx, addr, retryOptions...) } func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string, retryOptions ...retry.Option) (types.RootCoord, error) { diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 263d1bc733..0ff504c626 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -44,7 +44,7 @@ type Client struct { retryOptions []retry.Option } -func NewClient(ctx context.Context, addr string, retryOptions []retry.Option) (*Client, error) { +func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) (*Client, error) { if addr == "" { return nil, fmt.Errorf("address is empty") } diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 14a67113f3..f9a19714db 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -40,20 +40,20 @@ type Client struct { addr string - retryTimes uint + retryOptions []retry.Option } -func NewClient(ctx context.Context, addr string, reTryTimes uint) (*Client, error) { +func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) (*Client, error) { if addr == "" { return nil, fmt.Errorf("addr is empty") } ctx, cancel := context.WithCancel(ctx) return &Client{ - ctx: ctx, - cancel: cancel, - addr: addr, - retryTimes: reTryTimes, + ctx: ctx, + cancel: cancel, + addr: addr, + retryOptions: retryOptions, }, nil } @@ -85,7 +85,7 @@ func (c *Client) connect() error { return nil } - err := retry.Do(c.ctx, connectGrpcFunc, retry.Attempts(c.retryTimes)) + err := retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...) if err != nil { log.Debug("QueryNodeClient try connect failed", zap.Error(err)) return err diff --git a/internal/querycoord/cluster.go b/internal/querycoord/cluster.go index b8196d6fd3..0e1ba86511 100644 --- a/internal/querycoord/cluster.go +++ b/internal/querycoord/cluster.go @@ -76,7 +76,7 @@ func (c *queryNodeCluster) reloadFromKV() error { if err != nil { return err } - err = c.RegisterNode(session, nodeID) + err = c.RegisterNode(context.Background(), session, nodeID) if err != nil { return err } @@ -378,7 +378,7 @@ func (c *queryNodeCluster) getNumSegments(nodeID int64) (int, error) { return numSegment, nil } -func (c *queryNodeCluster) RegisterNode(session *sessionutil.Session, id UniqueID) error { +func (c *queryNodeCluster) RegisterNode(ctx context.Context, session *sessionutil.Session, id UniqueID) error { c.Lock() defer c.Unlock() @@ -391,7 +391,7 @@ func (c *queryNodeCluster) RegisterNode(session *sessionutil.Session, id UniqueI if err != nil { return err } - node, err := newQueryNode(session.Address, id, c.client) + node, err := newQueryNode(ctx, session.Address, id, c.client) if err != nil { return err } diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index 84422951a7..c14993dee1 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -90,7 +90,7 @@ func (qc *QueryCoord) RegisterNode(ctx context.Context, req *querypb.RegisterNod ServerID: nodeID, Address: fmt.Sprintf("%s:%d", req.Address.Ip, req.Address.Port), } - err := qc.cluster.RegisterNode(session, req.Base.SourceID) + err := qc.cluster.RegisterNode(ctx, session, req.Base.SourceID) if err != nil { log.Debug("register query node new NodeClient failed", zap.Any("QueryNodeID", nodeID), zap.String("address", req.Address.String())) return &querypb.RegisterNodeResponse{ diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 49cccf29d1..cc08f8b94e 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -184,7 +184,7 @@ func (qc *QueryCoord) watchNodeLoop() { for nodeID, session := range sessionMap { if _, ok := qc.cluster.nodes[nodeID]; !ok { serverID := session.ServerID - err := qc.cluster.RegisterNode(session, serverID) + err := qc.cluster.RegisterNode(ctx, session, serverID) if err != nil { log.Error("register queryNode error", zap.Any("error", err.Error())) } @@ -228,7 +228,7 @@ func (qc *QueryCoord) watchNodeLoop() { switch event.EventType { case sessionutil.SessionAddEvent: serverID := event.Session.ServerID - err := qc.cluster.RegisterNode(event.Session, serverID) + err := qc.cluster.RegisterNode(ctx, event.Session, serverID) if err != nil { log.Error(err.Error()) } diff --git a/internal/querycoord/querynode.go b/internal/querycoord/querynode.go index dbd974f8e9..498d5533b1 100644 --- a/internal/querycoord/querynode.go +++ b/internal/querycoord/querynode.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/retry" ) type queryNode struct { @@ -40,8 +41,8 @@ type queryNode struct { onService bool } -func newQueryNode(address string, id UniqueID, kv *etcdkv.EtcdKV) (*queryNode, error) { - client, err := nodeclient.NewClient(context.TODO(), address, 300) +func newQueryNode(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (*queryNode, error) { + client, err := nodeclient.NewClient(ctx, address, retry.Attempts(300)) if err != nil { return nil, err } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index a3bc9b203f..7bea0f73fc 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -134,7 +134,7 @@ func (node *QueryNode) Init() error { return err } log.Debug("queryNode try to connect etcd") - err := retry.Do(context.TODO(), connectEtcdFn, retry.Attempts(300)) + err := retry.Do(node.queryNodeLoopCtx, connectEtcdFn, retry.Attempts(300)) if err != nil { log.Debug("queryNode try to connect etcd failed", zap.Error(err)) return err