mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add indexnode grpc retry (#5418)
Add indexnode grpc retry. Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
parent
74ea346e7c
commit
9aa1fd25fa
@ -13,6 +13,7 @@ package grpcindexnodeclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
@ -30,14 +31,26 @@ import (
|
||||
|
||||
type Client struct {
|
||||
grpcClient indexpb.IndexNodeClient
|
||||
address string
|
||||
conn *grpc.ClientConn
|
||||
ctx context.Context
|
||||
|
||||
address string
|
||||
|
||||
timeout time.Duration
|
||||
reconnTry int
|
||||
recallTry int
|
||||
}
|
||||
|
||||
func NewClient(nodeAddress string) (*Client, error) {
|
||||
func NewClient(address string, timeout time.Duration) (*Client, error) {
|
||||
if address == "" {
|
||||
return nil, fmt.Errorf("address is empty")
|
||||
}
|
||||
return &Client{
|
||||
address: nodeAddress,
|
||||
ctx: context.Background(),
|
||||
address: address,
|
||||
ctx: context.Background(),
|
||||
timeout: timeout,
|
||||
recallTry: 3,
|
||||
reconnTry: 10,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -53,16 +66,59 @@ func (c *Client) Init() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.grpcClient = indexpb.NewIndexNodeClient(conn)
|
||||
c.conn = conn
|
||||
return nil
|
||||
}
|
||||
err := retry.Retry(100000, time.Millisecond*200, connectGrpcFunc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.grpcClient = indexpb.NewIndexNodeClient(c.conn)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) reconnect() error {
|
||||
tracer := opentracing.GlobalTracer()
|
||||
var err error
|
||||
connectGrpcFunc := func() error {
|
||||
log.Debug("indexnode connect ", zap.String("address", c.address))
|
||||
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
|
||||
grpc.WithUnaryInterceptor(
|
||||
otgrpc.OpenTracingClientInterceptor(tracer)),
|
||||
grpc.WithStreamInterceptor(
|
||||
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.conn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.grpcClient = indexpb.NewIndexNodeClient(c.conn)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
|
||||
ret, err := caller()
|
||||
if err == nil {
|
||||
return ret, nil
|
||||
}
|
||||
for i := 0; i < c.recallTry; i++ {
|
||||
err = c.reconnect()
|
||||
if err == nil {
|
||||
ret, err = caller()
|
||||
if err == nil {
|
||||
return ret, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (c *Client) Start() error {
|
||||
return nil
|
||||
}
|
||||
@ -77,21 +133,36 @@ func (c *Client) Register() error {
|
||||
}
|
||||
|
||||
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
|
||||
ret, err := c.recall(func() (interface{}, error) {
|
||||
return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
|
||||
})
|
||||
return ret.(*internalpb.ComponentStates), err
|
||||
}
|
||||
|
||||
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
return c.grpcClient.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
|
||||
ret, err := c.recall(func() (interface{}, error) {
|
||||
return c.grpcClient.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
|
||||
})
|
||||
return ret.(*milvuspb.StringResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
|
||||
ret, err := c.recall(func() (interface{}, error) {
|
||||
return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
|
||||
})
|
||||
return ret.(*milvuspb.StringResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*commonpb.Status, error) {
|
||||
return c.grpcClient.BuildIndex(ctx, req)
|
||||
ret, err := c.recall(func() (interface{}, error) {
|
||||
return c.grpcClient.BuildIndex(ctx, req)
|
||||
})
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
|
||||
return c.grpcClient.DropIndex(ctx, req)
|
||||
ret, err := c.recall(func() (interface{}, error) {
|
||||
return c.grpcClient.DropIndex(ctx, req)
|
||||
})
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
@ -41,7 +41,7 @@ func (i *IndexService) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest
|
||||
}
|
||||
|
||||
nodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
|
||||
nodeClient, err := grpcindexnodeclient.NewClient(nodeAddress)
|
||||
nodeClient, err := grpcindexnodeclient.NewClient(nodeAddress, 10)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user