diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index d407d428bc..cc33d5b562 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -377,7 +377,11 @@ func (c *ClientBase[T]) checkErr(ctx context.Context, err error) (needRetry, nee } return true, true, err case IsServerIDMismatchErr(err): - fallthrough + if ok, err := c.checkNodeSessionExist(ctx); !ok { + // if session doesn't exist, no need to retry for datanode/indexnode/querynode + return false, false, err + } + return true, true, err case IsCrossClusterRoutingErr(err): return true, true, err default: @@ -387,6 +391,19 @@ func (c *ClientBase[T]) checkErr(ctx context.Context, err error) (needRetry, nee } } +func (c *ClientBase[T]) checkNodeSessionExist(ctx context.Context) (bool, error) { + switch c.GetRole() { + case typeutil.DataNodeRole, typeutil.IndexNodeRole, typeutil.QueryNodeRole: + err := c.verifySession(ctx) + if err != nil && errors.Is(err, merr.ErrNodeNotFound) { + log.Warn("failed to verify node session", zap.Error(err)) + // stop retry + return false, err + } + } + return true, nil +} + func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, error)) (any, error) { log := log.Ctx(ctx).With(zap.String("client_role", c.GetRole())) var ( @@ -412,15 +429,9 @@ func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, er defer cancel() err := retry.Do(ctx, func() error { if generic.IsZero(client) { - switch c.GetRole() { - case typeutil.DataNodeRole, typeutil.IndexNodeRole, typeutil.QueryNodeRole: + if ok, err := c.checkNodeSessionExist(ctx); !ok { // if session doesn't exist, no need to reset connection for datanode/indexnode/querynode - err := c.verifySession(ctx) - if err != nil && errors.Is(err, merr.ErrNodeNotFound) { - log.Warn("failed to verify node session", zap.Error(err)) - // stop retry - return retry.Unrecoverable(err) - } + return retry.Unrecoverable(err) } err := errors.Wrap(clientErr, "empty grpc client") diff --git a/internal/util/grpcclient/client_test.go b/internal/util/grpcclient/client_test.go index e567f71b26..32cd123c53 100644 --- a/internal/util/grpcclient/client_test.go +++ b/internal/util/grpcclient/client_test.go @@ -115,6 +115,15 @@ func TestClientBase_NodeSessionNotExist(t *testing.T) { return struct{}{}, nil }) assert.True(t, errors.Is(err, merr.ErrNodeNotFound)) + + // test node already down, but new node start up with same ip and port + base.grpcClientMtx.Lock() + base.grpcClient = &mockClient{} + base.grpcClientMtx.Unlock() + _, err = base.Call(ctx, func(client *mockClient) (any, error) { + return struct{}{}, merr.ErrNodeNotMatch + }) + assert.True(t, errors.Is(err, merr.ErrNodeNotFound)) } func TestClientBase_Call(t *testing.T) {