mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Fix error in retry refactor (#6000)
* Fix error in retry refactor Signed-off-by: godchen <qingxiang.chen@zilliz.com> * fix error Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
parent
ba5aacd3f2
commit
fd07a44dba
@ -92,7 +92,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro
|
||||
}
|
||||
|
||||
func defaultDataNodeCreatorFunc(ctx context.Context, addr string, retryOptions ...retry.Option) (types.DataNode, error) {
|
||||
return datanodeclient.NewClient(ctx, addr, retryOptions)
|
||||
return datanodeclient.NewClient(ctx, addr, retryOptions...)
|
||||
}
|
||||
|
||||
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string, retryOptions ...retry.Option) (types.RootCoord, error) {
|
||||
|
||||
@ -44,7 +44,7 @@ type Client struct {
|
||||
retryOptions []retry.Option
|
||||
}
|
||||
|
||||
func NewClient(ctx context.Context, addr string, retryOptions []retry.Option) (*Client, error) {
|
||||
func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) (*Client, error) {
|
||||
if addr == "" {
|
||||
return nil, fmt.Errorf("address is empty")
|
||||
}
|
||||
|
||||
@ -40,20 +40,20 @@ type Client struct {
|
||||
|
||||
addr string
|
||||
|
||||
retryTimes uint
|
||||
retryOptions []retry.Option
|
||||
}
|
||||
|
||||
func NewClient(ctx context.Context, addr string, reTryTimes uint) (*Client, error) {
|
||||
func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) (*Client, error) {
|
||||
if addr == "" {
|
||||
return nil, fmt.Errorf("addr is empty")
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
return &Client{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
addr: addr,
|
||||
retryTimes: reTryTimes,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
addr: addr,
|
||||
retryOptions: retryOptions,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -85,7 +85,7 @@ func (c *Client) connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := retry.Do(c.ctx, connectGrpcFunc, retry.Attempts(c.retryTimes))
|
||||
err := retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
|
||||
if err != nil {
|
||||
log.Debug("QueryNodeClient try connect failed", zap.Error(err))
|
||||
return err
|
||||
|
||||
@ -76,7 +76,7 @@ func (c *queryNodeCluster) reloadFromKV() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = c.RegisterNode(session, nodeID)
|
||||
err = c.RegisterNode(context.Background(), session, nodeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -378,7 +378,7 @@ func (c *queryNodeCluster) getNumSegments(nodeID int64) (int, error) {
|
||||
return numSegment, nil
|
||||
}
|
||||
|
||||
func (c *queryNodeCluster) RegisterNode(session *sessionutil.Session, id UniqueID) error {
|
||||
func (c *queryNodeCluster) RegisterNode(ctx context.Context, session *sessionutil.Session, id UniqueID) error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
@ -391,7 +391,7 @@ func (c *queryNodeCluster) RegisterNode(session *sessionutil.Session, id UniqueI
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
node, err := newQueryNode(session.Address, id, c.client)
|
||||
node, err := newQueryNode(ctx, session.Address, id, c.client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -90,7 +90,7 @@ func (qc *QueryCoord) RegisterNode(ctx context.Context, req *querypb.RegisterNod
|
||||
ServerID: nodeID,
|
||||
Address: fmt.Sprintf("%s:%d", req.Address.Ip, req.Address.Port),
|
||||
}
|
||||
err := qc.cluster.RegisterNode(session, req.Base.SourceID)
|
||||
err := qc.cluster.RegisterNode(ctx, session, req.Base.SourceID)
|
||||
if err != nil {
|
||||
log.Debug("register query node new NodeClient failed", zap.Any("QueryNodeID", nodeID), zap.String("address", req.Address.String()))
|
||||
return &querypb.RegisterNodeResponse{
|
||||
|
||||
@ -184,7 +184,7 @@ func (qc *QueryCoord) watchNodeLoop() {
|
||||
for nodeID, session := range sessionMap {
|
||||
if _, ok := qc.cluster.nodes[nodeID]; !ok {
|
||||
serverID := session.ServerID
|
||||
err := qc.cluster.RegisterNode(session, serverID)
|
||||
err := qc.cluster.RegisterNode(ctx, session, serverID)
|
||||
if err != nil {
|
||||
log.Error("register queryNode error", zap.Any("error", err.Error()))
|
||||
}
|
||||
@ -228,7 +228,7 @@ func (qc *QueryCoord) watchNodeLoop() {
|
||||
switch event.EventType {
|
||||
case sessionutil.SessionAddEvent:
|
||||
serverID := event.Session.ServerID
|
||||
err := qc.cluster.RegisterNode(event.Session, serverID)
|
||||
err := qc.cluster.RegisterNode(ctx, event.Session, serverID)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
)
|
||||
|
||||
type queryNode struct {
|
||||
@ -40,8 +41,8 @@ type queryNode struct {
|
||||
onService bool
|
||||
}
|
||||
|
||||
func newQueryNode(address string, id UniqueID, kv *etcdkv.EtcdKV) (*queryNode, error) {
|
||||
client, err := nodeclient.NewClient(context.TODO(), address, 300)
|
||||
func newQueryNode(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (*queryNode, error) {
|
||||
client, err := nodeclient.NewClient(ctx, address, retry.Attempts(300))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -134,7 +134,7 @@ func (node *QueryNode) Init() error {
|
||||
return err
|
||||
}
|
||||
log.Debug("queryNode try to connect etcd")
|
||||
err := retry.Do(context.TODO(), connectEtcdFn, retry.Attempts(300))
|
||||
err := retry.Do(node.queryNodeLoopCtx, connectEtcdFn, retry.Attempts(300))
|
||||
if err != nil {
|
||||
log.Debug("queryNode try to connect etcd failed", zap.Error(err))
|
||||
return err
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user