diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index 08dfc74d10..8a6912df56 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -218,13 +218,13 @@ func (c *ClientBase[T]) GetGrpcClient(ctx context.Context) (*clientConnWrapper[T return c.grpcClient, nil } -func (c *ClientBase[T]) resetConnection(wrapper *clientConnWrapper[T]) { - if time.Since(c.lastReset.Load()) < c.minResetInterval { +func (c *ClientBase[T]) resetConnection(wrapper *clientConnWrapper[T], forceReset bool) { + if !forceReset && time.Since(c.lastReset.Load()) < c.minResetInterval { return } c.grpcClientMtx.Lock() defer c.grpcClientMtx.Unlock() - if time.Since(c.lastReset.Load()) < c.minResetInterval { + if !forceReset && time.Since(c.lastReset.Load()) < c.minResetInterval { return } if generic.IsZero(c.grpcClient) { @@ -396,12 +396,12 @@ func (c *ClientBase[T]) needResetCancel() (needReset bool) { return false } -func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry, needReset bool, retErr error) { +func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry, needReset, forceReset bool, retErr error) { log := log.Ctx(ctx).With(zap.String("clientRole", c.GetRole())) // Unknown err if !funcutil.IsGrpcErr(err) { log.Warn("fail to grpc call because of unknown error", zap.Error(err)) - return false, false, err + return false, false, false, err } // grpc err @@ -409,22 +409,25 @@ func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry, switch { case funcutil.IsGrpcErr(err, codes.Canceled, codes.DeadlineExceeded): // canceled or deadline exceeded - return true, c.needResetCancel(), err + return true, c.needResetCancel(), false, err case funcutil.IsGrpcErr(err, codes.Unimplemented): // for unimplemented error, reset coord connection to avoid old coord's side effect. // old coord's side effect: when coord changed, the connection in coord's client won't reset automatically. // so if new interface appear in new coord, will got a unimplemented error - return false, true, merr.WrapErrServiceUnimplemented(err) + return false, true, true, merr.WrapErrServiceUnimplemented(err) case IsServerIDMismatchErr(err): if ok := c.checkNodeSessionExist(ctx); !ok { // if session doesn't exist, no need to retry for datanode/indexnode/querynode/proxy - return false, false, err + return false, false, false, err } - return true, true, err + return true, true, true, err case IsCrossClusterRoutingErr(err): - return true, true, err + return true, true, true, err + case funcutil.IsGrpcErr(err, codes.Unavailable): + // for unavailable error in coord, force to reset coord connection + return true, true, !c.isNode, err default: - return true, true, err + return true, true, false, err } } @@ -454,8 +457,8 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er log.Warn("fail to get grpc client", zap.Error(clientErr)) } - resetClientFunc := func() { - c.resetConnection(wrapper) + resetClientFunc := func(forceReset bool) { + c.resetConnection(wrapper, forceReset) wrapper, clientErr = c.GetGrpcClient(ctx) if clientErr != nil { log.Warn("fail to get grpc client in the retry state", zap.Error(clientErr)) @@ -473,7 +476,7 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er err := errors.Wrap(clientErr, "empty grpc client") log.Warn("grpc client is nil, maybe fail to get client in the retry state", zap.Error(err)) - resetClientFunc() + resetClientFunc(false) return true, err } @@ -483,17 +486,17 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er wrapper.Unpin() if err != nil { - var needRetry, needReset bool - needRetry, needReset, err = c.checkGrpcErr(ctx, err) + var needRetry, needReset, forceReset bool + needRetry, needReset, forceReset, err = c.checkGrpcErr(ctx, err) if needReset { log.Warn("start to reset connection because of specific reasons", zap.Error(err)) - resetClientFunc() + resetClientFunc(forceReset) } else { // err occurs but no need to reset connection, try to verify session err := c.verifySession(ctx) if err != nil { log.Warn("failed to verify session, reset connection", zap.Error(err)) - resetClientFunc() + resetClientFunc(forceReset) } } return needRetry, err diff --git a/internal/util/grpcclient/client_test.go b/internal/util/grpcclient/client_test.go index e5f9078562..37a0fd4318 100644 --- a/internal/util/grpcclient/client_test.go +++ b/internal/util/grpcclient/client_test.go @@ -370,28 +370,38 @@ func TestClientBase_CheckGrpcError(t *testing.T) { base.MaxAttempts = 1 ctx := context.Background() - retry, reset, _ := base.checkGrpcErr(ctx, status.Errorf(codes.Canceled, "fake context canceled")) + retry, reset, forceReset, _ := base.checkGrpcErr(ctx, status.Errorf(codes.Canceled, "fake context canceled")) assert.True(t, retry) assert.True(t, reset) + assert.False(t, forceReset) - retry, reset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unimplemented, "fake context canceled")) + retry, reset, forceReset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unimplemented, "fake context canceled")) assert.False(t, retry) assert.True(t, reset) + assert.True(t, forceReset) + + retry, reset, forceReset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unavailable, "fake context canceled")) + assert.True(t, retry) + assert.True(t, reset) + assert.True(t, forceReset) // test serverId mismatch - retry, reset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrNodeNotMatch.Error())) + retry, reset, forceReset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrNodeNotMatch.Error())) assert.True(t, retry) assert.True(t, reset) + assert.True(t, forceReset) // test cross cluster - retry, reset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrServiceCrossClusterRouting.Error())) + retry, reset, forceReset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrServiceCrossClusterRouting.Error())) assert.True(t, retry) assert.True(t, reset) + assert.True(t, forceReset) // test default - retry, reset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrNodeNotFound.Error())) + retry, reset, forceReset, _ = base.checkGrpcErr(ctx, status.Errorf(codes.Unknown, merr.ErrNodeNotFound.Error())) assert.True(t, retry) assert.True(t, reset) + assert.False(t, forceReset) } type server struct {