From 177ddda47fcf946b7cbddef398f4e3043160d489 Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 9 Apr 2024 15:33:25 +0800 Subject: [PATCH] fix: Check stale should check leader task's leader id (#31962) issue: #30816 check stale rules for leader task: 1. for reduce leader task, it should keep executing until leader's node become offline. 2. for grow leader task,it should keep executing until leader's node become stopping. This PR check leader node's stopping state for grow leader task Signed-off-by: Wei Liu --- internal/querycoordv2/checkers/leader_checker.go | 8 +------- internal/querycoordv2/task/action.go | 4 ++++ internal/querycoordv2/task/scheduler.go | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 99e7937e3e..5950748c16 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -20,7 +20,6 @@ import ( "context" "time" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -122,12 +121,7 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met ) ret := make([]task.Task, 0) - // skip set segment on stopping node to leader view - aliveNodeDist := lo.Filter(dist, func(s *meta.Segment, _ int) bool { - nodeInfo := c.nodeMgr.Get(s.Node) - return nodeInfo != nil && nodeInfo.GetState() != session.NodeStateStopping - }) - latestNodeDist := utils.FindMaxVersionSegments(aliveNodeDist) + latestNodeDist := utils.FindMaxVersionSegments(dist) for _, s := range latestNodeDist { segment := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst) existInTarget := segment != nil diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 1412c9ccec..8dd291badd 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -191,6 +191,10 @@ func (action *LeaderAction) Version() typeutil.UniqueID { return action.version } +func (action *LeaderAction) GetLeaderID() typeutil.UniqueID { + return action.leaderID +} + func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool { views := distMgr.LeaderViewManager.GetLeaderView(action.leaderID) view := views[action.Shard()] diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 641b0d6e70..170c456fd5 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -966,7 +966,7 @@ func (scheduler *taskScheduler) checkLeaderTaskStale(task *LeaderTask) error { for _, action := range task.Actions() { switch action.Type() { case ActionTypeGrow: - if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok { + if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.(*LeaderAction).GetLeaderID()); ok { log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID)) return merr.WrapErrNodeOffline(action.Node()) }