mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix proxy to querynode heartbeat failed counter logic (#26563)
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
a398cf5d69
commit
2aab9b3234
@ -142,6 +142,9 @@ func (b *LookAsideBalancer) UpdateCostMetrics(node int64, cost *internalpb.CostA
|
||||
b.metricsMap.Insert(node, cost)
|
||||
}
|
||||
b.metricsUpdateTs.Insert(node, time.Now().UnixMilli())
|
||||
|
||||
// one query/search succeed, we regard heartbeat succeed, clear heartbeat failed counter
|
||||
b.trySetQueryNodeReachable(node)
|
||||
}
|
||||
|
||||
// calculateScore compute the query node's workload score
|
||||
@ -214,28 +217,9 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), checkInterval)
|
||||
defer cancel()
|
||||
|
||||
setUnreachable := func(err error) bool {
|
||||
failures, ok := b.failedHeartBeatCounter.Get(node)
|
||||
if !ok {
|
||||
failures = atomic.NewInt64(0)
|
||||
}
|
||||
failures.Inc()
|
||||
b.failedHeartBeatCounter.Insert(node, failures)
|
||||
|
||||
if failures.Load() < Params.ProxyCfg.RetryTimesOnHealthCheck.GetAsInt64() {
|
||||
log.Warn("get component status failed",
|
||||
zap.Int64("node", node),
|
||||
zap.Int64("times", failures.Load()),
|
||||
zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
return b.unreachableQueryNodes.Insert(node)
|
||||
}
|
||||
|
||||
qn, err := b.clientMgr.GetClient(ctx, node)
|
||||
if err != nil {
|
||||
if setUnreachable(err) {
|
||||
if b.trySetQueryNodeUnReachable(node, err) {
|
||||
log.Warn("get client failed, set node unreachable", zap.Int64("node", node), zap.Error(err))
|
||||
}
|
||||
return struct{}{}, nil
|
||||
@ -243,29 +227,22 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
|
||||
|
||||
resp, err := qn.GetComponentStates(ctx)
|
||||
if err != nil {
|
||||
if setUnreachable(err) {
|
||||
log.Warn("get component status failed,set node unreachable", zap.Int64("node", node), zap.Error(err))
|
||||
if b.trySetQueryNodeUnReachable(node, err) {
|
||||
log.Warn("get component status failed, set node unreachable", zap.Int64("node", node), zap.Error(err))
|
||||
}
|
||||
return struct{}{}, nil
|
||||
}
|
||||
|
||||
if resp.GetState().GetStateCode() != commonpb.StateCode_Healthy {
|
||||
if setUnreachable(merr.ErrServiceUnavailable) {
|
||||
log.Warn("component status unhealthy,set node unreachable", zap.Int64("node", node), zap.Error(err))
|
||||
if b.trySetQueryNodeUnReachable(node, merr.ErrServiceUnavailable) {
|
||||
log.Warn("component status unhealthy, set node unreachable", zap.Int64("node", node), zap.Error(err))
|
||||
}
|
||||
return struct{}{}, nil
|
||||
}
|
||||
|
||||
// check health successfully, update check health ts
|
||||
// check health successfully, try set query node reachable
|
||||
b.metricsUpdateTs.Insert(node, time.Now().Local().UnixMilli())
|
||||
if b.unreachableQueryNodes.TryRemove(node) {
|
||||
// once heartbeat succeed, clear filed counter
|
||||
failures, ok := b.failedHeartBeatCounter.Get(node)
|
||||
if ok {
|
||||
failures.Store(0)
|
||||
}
|
||||
log.Info("component recuperated, set node reachable", zap.Int64("node", node), zap.Error(err))
|
||||
}
|
||||
b.trySetQueryNodeReachable(node)
|
||||
|
||||
return struct{}{}, nil
|
||||
}))
|
||||
@ -277,3 +254,33 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LookAsideBalancer) trySetQueryNodeUnReachable(node int64, err error) bool {
|
||||
failures, ok := b.failedHeartBeatCounter.Get(node)
|
||||
if !ok {
|
||||
failures = atomic.NewInt64(0)
|
||||
}
|
||||
failures.Inc()
|
||||
b.failedHeartBeatCounter.Insert(node, failures)
|
||||
|
||||
if failures.Load() < Params.ProxyCfg.RetryTimesOnHealthCheck.GetAsInt64() {
|
||||
log.Warn("get component status failed",
|
||||
zap.Int64("node", node),
|
||||
zap.Int64("times", failures.Load()),
|
||||
zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
return b.unreachableQueryNodes.Insert(node)
|
||||
}
|
||||
|
||||
func (b *LookAsideBalancer) trySetQueryNodeReachable(node int64) {
|
||||
// once heartbeat succeed, clear failed counter
|
||||
failures, ok := b.failedHeartBeatCounter.Get(node)
|
||||
if ok {
|
||||
failures.Store(0)
|
||||
}
|
||||
if b.unreachableQueryNodes.TryRemove(node) {
|
||||
log.Info("component recuperated, set node reachable", zap.Int64("node", node))
|
||||
}
|
||||
}
|
||||
|
||||
@ -323,6 +323,11 @@ func (suite *LookAsideBalancerSuite) TestCheckHealthLoop() {
|
||||
suite.ErrorIs(err, merr.ErrServiceUnavailable)
|
||||
suite.Equal(int64(-1), targetNode)
|
||||
|
||||
suite.balancer.UpdateCostMetrics(1, &internalpb.CostAggregation{})
|
||||
suite.Eventually(func() bool {
|
||||
return !suite.balancer.unreachableQueryNodes.Contain(1)
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
|
||||
suite.Eventually(func() bool {
|
||||
return !suite.balancer.unreachableQueryNodes.Contain(2)
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user