diff --git a/internal/proxy/look_aside_balancer.go b/internal/proxy/look_aside_balancer.go index ca254e1ef8..388e08f3a5 100644 --- a/internal/proxy/look_aside_balancer.go +++ b/internal/proxy/look_aside_balancer.go @@ -142,6 +142,9 @@ func (b *LookAsideBalancer) UpdateCostMetrics(node int64, cost *internalpb.CostA b.metricsMap.Insert(node, cost) } b.metricsUpdateTs.Insert(node, time.Now().UnixMilli()) + + // one query/search succeed, we regard heartbeat succeed, clear heartbeat failed counter + b.trySetQueryNodeReachable(node) } // calculateScore compute the query node's workload score @@ -214,28 +217,9 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) { ctx, cancel := context.WithTimeout(context.Background(), checkInterval) defer cancel() - setUnreachable := func(err error) bool { - failures, ok := b.failedHeartBeatCounter.Get(node) - if !ok { - failures = atomic.NewInt64(0) - } - failures.Inc() - b.failedHeartBeatCounter.Insert(node, failures) - - if failures.Load() < Params.ProxyCfg.RetryTimesOnHealthCheck.GetAsInt64() { - log.Warn("get component status failed", - zap.Int64("node", node), - zap.Int64("times", failures.Load()), - zap.Error(err)) - return false - } - - return b.unreachableQueryNodes.Insert(node) - } - qn, err := b.clientMgr.GetClient(ctx, node) if err != nil { - if setUnreachable(err) { + if b.trySetQueryNodeUnReachable(node, err) { log.Warn("get client failed, set node unreachable", zap.Int64("node", node), zap.Error(err)) } return struct{}{}, nil @@ -243,29 +227,22 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) { resp, err := qn.GetComponentStates(ctx) if err != nil { - if setUnreachable(err) { - log.Warn("get component status failed,set node unreachable", zap.Int64("node", node), zap.Error(err)) + if b.trySetQueryNodeUnReachable(node, err) { + log.Warn("get component status failed, set node unreachable", zap.Int64("node", node), zap.Error(err)) } return struct{}{}, nil } if resp.GetState().GetStateCode() != commonpb.StateCode_Healthy { - if setUnreachable(merr.ErrServiceUnavailable) { - log.Warn("component status unhealthy,set node unreachable", zap.Int64("node", node), zap.Error(err)) + if b.trySetQueryNodeUnReachable(node, merr.ErrServiceUnavailable) { + log.Warn("component status unhealthy, set node unreachable", zap.Int64("node", node), zap.Error(err)) } return struct{}{}, nil } - // check health successfully, update check health ts + // check health successfully, try set query node reachable b.metricsUpdateTs.Insert(node, time.Now().Local().UnixMilli()) - if b.unreachableQueryNodes.TryRemove(node) { - // once heartbeat succeed, clear filed counter - failures, ok := b.failedHeartBeatCounter.Get(node) - if ok { - failures.Store(0) - } - log.Info("component recuperated, set node reachable", zap.Int64("node", node), zap.Error(err)) - } + b.trySetQueryNodeReachable(node) return struct{}{}, nil })) @@ -277,3 +254,33 @@ func (b *LookAsideBalancer) checkQueryNodeHealthLoop(ctx context.Context) { } } } + +func (b *LookAsideBalancer) trySetQueryNodeUnReachable(node int64, err error) bool { + failures, ok := b.failedHeartBeatCounter.Get(node) + if !ok { + failures = atomic.NewInt64(0) + } + failures.Inc() + b.failedHeartBeatCounter.Insert(node, failures) + + if failures.Load() < Params.ProxyCfg.RetryTimesOnHealthCheck.GetAsInt64() { + log.Warn("get component status failed", + zap.Int64("node", node), + zap.Int64("times", failures.Load()), + zap.Error(err)) + return false + } + + return b.unreachableQueryNodes.Insert(node) +} + +func (b *LookAsideBalancer) trySetQueryNodeReachable(node int64) { + // once heartbeat succeed, clear failed counter + failures, ok := b.failedHeartBeatCounter.Get(node) + if ok { + failures.Store(0) + } + if b.unreachableQueryNodes.TryRemove(node) { + log.Info("component recuperated, set node reachable", zap.Int64("node", node)) + } +} diff --git a/internal/proxy/look_aside_balancer_test.go b/internal/proxy/look_aside_balancer_test.go index f08f64e749..0b19cb464f 100644 --- a/internal/proxy/look_aside_balancer_test.go +++ b/internal/proxy/look_aside_balancer_test.go @@ -323,6 +323,11 @@ func (suite *LookAsideBalancerSuite) TestCheckHealthLoop() { suite.ErrorIs(err, merr.ErrServiceUnavailable) suite.Equal(int64(-1), targetNode) + suite.balancer.UpdateCostMetrics(1, &internalpb.CostAggregation{}) + suite.Eventually(func() bool { + return !suite.balancer.unreachableQueryNodes.Contain(1) + }, 3*time.Second, 100*time.Millisecond) + suite.Eventually(func() bool { return !suite.balancer.unreachableQueryNodes.Contain(2) }, 5*time.Second, 100*time.Millisecond)