From 16dc26833b02f4fa1ea28439cef4330b78a34a0e Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 9 Nov 2023 10:10:19 +0800 Subject: [PATCH] Fix retry when proxy stopped (#28263) Signed-off-by: Wei Liu --- internal/rootcoord/proxy_client_manager.go | 5 +++++ internal/rootcoord/proxy_client_manager_test.go | 14 ++++++++++++++ internal/util/grpcclient/client.go | 4 ++-- internal/util/grpcclient/client_test.go | 11 ++++++++++- 4 files changed, 31 insertions(+), 3 deletions(-) diff --git a/internal/rootcoord/proxy_client_manager.go b/internal/rootcoord/proxy_client_manager.go index 3b5af86fc5..1f6f495ffa 100644 --- a/internal/rootcoord/proxy_client_manager.go +++ b/internal/rootcoord/proxy_client_manager.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" + "github.com/cockroachdb/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -155,6 +156,10 @@ func (p *proxyClientManager) InvalidateCollectionMetaCache(ctx context.Context, group.Go(func() error { sta, err := v.InvalidateCollectionMetaCache(ctx, request) if err != nil { + if errors.Is(err, merr.ErrNodeNotFound) { + log.Warn("InvalidateCollectionMetaCache failed due to proxy service not found", zap.Error(err)) + return nil + } return fmt.Errorf("InvalidateCollectionMetaCache failed, proxyID = %d, err = %s", k, err) } if sta.ErrorCode != commonpb.ErrorCode_Success { diff --git a/internal/rootcoord/proxy_client_manager_test.go b/internal/rootcoord/proxy_client_manager_test.go index 9892776c62..0e1f017b14 100644 --- a/internal/rootcoord/proxy_client_manager_test.go +++ b/internal/rootcoord/proxy_client_manager_test.go @@ -196,6 +196,20 @@ func TestProxyClientManager_InvalidateCollectionMetaCache(t *testing.T) { assert.Error(t, err) }) + t.Run("mock proxy service down", func(t *testing.T) { + ctx := context.Background() + p1 := newMockProxy() + p1.InvalidateCollectionMetaCacheFunc = func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { + return nil, merr.ErrNodeNotFound + } + pcm := &proxyClientManager{proxyClient: map[int64]types.ProxyClient{ + TestProxyID: p1, + }} + + err := pcm.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{}) + assert.NoError(t, err) + }) + t.Run("normal case", func(t *testing.T) { ctx := context.Background() p1 := newMockProxy() diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index d9b83a067b..ff53082e6b 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -392,9 +392,9 @@ func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry, func (c *ClientBase[T]) checkNodeSessionExist(ctx context.Context) (bool, error) { switch c.GetRole() { - case typeutil.DataNodeRole, typeutil.IndexNodeRole, typeutil.QueryNodeRole: + case typeutil.DataNodeRole, typeutil.IndexNodeRole, typeutil.QueryNodeRole, typeutil.ProxyRole: err := c.verifySession(ctx) - if err != nil && errors.Is(err, merr.ErrNodeNotFound) { + if errors.Is(err, merr.ErrNodeNotFound) { log.Warn("failed to verify node session", zap.Error(err)) // stop retry return false, err diff --git a/internal/util/grpcclient/client_test.go b/internal/util/grpcclient/client_test.go index cb27f3d639..421e4b0ef9 100644 --- a/internal/util/grpcclient/client_test.go +++ b/internal/util/grpcclient/client_test.go @@ -116,7 +116,7 @@ func TestClientBase_NodeSessionNotExist(t *testing.T) { }) assert.True(t, errors.Is(err, merr.ErrNodeNotFound)) - // test node already down, but new node start up with same ip and port + // test querynode/datanode/indexnode/proxy already down, but new node start up with same ip and port base.grpcClientMtx.Lock() base.grpcClient = &mockClient{} base.grpcClientMtx.Unlock() @@ -124,6 +124,15 @@ func TestClientBase_NodeSessionNotExist(t *testing.T) { return struct{}{}, status.Errorf(codes.Unknown, merr.ErrNodeNotMatch.Error()) }) assert.True(t, errors.Is(err, merr.ErrNodeNotFound)) + + // test querynode/datanode/indexnode/proxy down, return unavailable error + base.grpcClientMtx.Lock() + base.grpcClient = &mockClient{} + base.grpcClientMtx.Unlock() + _, err = base.Call(ctx, func(client *mockClient) (any, error) { + return struct{}{}, status.Errorf(codes.Unavailable, "fake error") + }) + assert.True(t, errors.Is(err, merr.ErrNodeNotFound)) } func TestClientBase_Call(t *testing.T) {