mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 18:18:30 +08:00
fix: grpc client check session skipped due to role not match (#29356)
Related to #28815 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
f457b9f7c9
commit
f699be79f7
@ -106,8 +106,10 @@ type ClientBase[T interface {
|
|||||||
encryption bool
|
encryption bool
|
||||||
addr atomic.String
|
addr atomic.String
|
||||||
// conn *grpc.ClientConn
|
// conn *grpc.ClientConn
|
||||||
grpcClientMtx sync.RWMutex
|
grpcClientMtx sync.RWMutex
|
||||||
role string
|
role string
|
||||||
|
isNode bool // pre-calculated is node flag
|
||||||
|
|
||||||
ClientMaxSendSize int
|
ClientMaxSendSize int
|
||||||
ClientMaxRecvSize int
|
ClientMaxRecvSize int
|
||||||
CompressionEnabled bool
|
CompressionEnabled bool
|
||||||
@ -159,6 +161,12 @@ func NewClientBase[T interface {
|
|||||||
// SetRole sets role of client
|
// SetRole sets role of client
|
||||||
func (c *ClientBase[T]) SetRole(role string) {
|
func (c *ClientBase[T]) SetRole(role string) {
|
||||||
c.role = role
|
c.role = role
|
||||||
|
if strings.HasPrefix(role, typeutil.DataNodeRole) ||
|
||||||
|
strings.HasPrefix(role, typeutil.IndexNodeRole) ||
|
||||||
|
strings.HasPrefix(role, typeutil.QueryNodeRole) ||
|
||||||
|
strings.HasPrefix(role, typeutil.ProxyRole) {
|
||||||
|
c.isNode = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRole returns role of client
|
// GetRole returns role of client
|
||||||
@ -417,8 +425,7 @@ func (c *ClientBase[T]) checkGrpcErr(ctx context.Context, err error) (needRetry,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientBase[T]) checkNodeSessionExist(ctx context.Context) (bool, error) {
|
func (c *ClientBase[T]) checkNodeSessionExist(ctx context.Context) (bool, error) {
|
||||||
switch c.GetRole() {
|
if c.isNode {
|
||||||
case typeutil.DataNodeRole, typeutil.IndexNodeRole, typeutil.QueryNodeRole, typeutil.ProxyRole:
|
|
||||||
err := c.verifySession(ctx)
|
err := c.verifySession(ctx)
|
||||||
if errors.Is(err, merr.ErrNodeNotFound) {
|
if errors.Is(err, merr.ErrNodeNotFound) {
|
||||||
log.Warn("failed to verify node session", zap.Error(err))
|
log.Warn("failed to verify node session", zap.Error(err))
|
||||||
|
|||||||
@ -98,6 +98,7 @@ func TestClientBase_NodeSessionNotExist(t *testing.T) {
|
|||||||
base := ClientBase[*mockClient]{
|
base := ClientBase[*mockClient]{
|
||||||
maxCancelError: 10,
|
maxCancelError: 10,
|
||||||
MaxAttempts: 3,
|
MaxAttempts: 3,
|
||||||
|
isNode: true,
|
||||||
}
|
}
|
||||||
base.SetGetAddrFunc(func() (string, error) {
|
base.SetGetAddrFunc(func() (string, error) {
|
||||||
return "", errors.New("mocked address error")
|
return "", errors.New("mocked address error")
|
||||||
@ -148,6 +149,7 @@ func testCall(t *testing.T, compressed bool) {
|
|||||||
base := ClientBase[*mockClient]{
|
base := ClientBase[*mockClient]{
|
||||||
maxCancelError: 10,
|
maxCancelError: 10,
|
||||||
MaxAttempts: 3,
|
MaxAttempts: 3,
|
||||||
|
isNode: true,
|
||||||
}
|
}
|
||||||
base.CompressionEnabled = compressed
|
base.CompressionEnabled = compressed
|
||||||
initClient := func() {
|
initClient := func() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user