From dc18d2aa8a78eac714407e1d664cce81ac5fe0d7 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Tue, 6 Jan 2026 18:27:24 +0800
Subject: [PATCH] fix: Use leaderID for LeaderAction stale check in scheduler (#46785)

issue: #46737

PR #46440 refactored checkStale to use action.Node() for all action types,
which breaks LeaderAction stale checking. For LeaderAction, Node() returns
the worker node where the segment resides, but the task is executed on the
leader node (delegator).

When syncing segments from RO worker nodes to an RW delegator, using
action.Node() incorrectly marks the task as stale because the worker is RO,
even though the leader is RW and the task should proceed.

This fix:
- Uses leaderID instead of Node() for LeaderAction stale checking
- Adds detailed comments explaining the distinction
- Adds unit tests covering the RO-worker-with-RW-leader scenario

Signed-off-by: Wei Liu
---
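Reviewer note (not part of the commit, kept in the section git am ignores): a
standalone sketch of why the stale-check target matters. All names below
(Action, SegmentAction, LeaderAction, staleCheckTarget, roNodes) are simplified
stand-ins rather than the real internal/querycoordv2/task types; the sketch
only mirrors the node-selection idea from the hunk below. A leader action's
Node() reports the worker that holds the segment, while GetLeaderID() names the
delegator that actually executes the task, so the delegator's RO/RW state is
what should decide staleness.

package main

import "fmt"

// Action is a trimmed-down stand-in for the scheduler's action interface.
type Action interface {
	Node() int64
}

// SegmentAction models an ordinary action that runs on the node it names.
type SegmentAction struct{ nodeID int64 }

func (a *SegmentAction) Node() int64 { return a.nodeID }

// LeaderAction models a delegator-side sync: Node() is the worker holding the
// segment, GetLeaderID() is the delegator that actually executes the task.
type LeaderAction struct {
	leaderID int64
	workerID int64
}

func (a *LeaderAction) Node() int64        { return a.workerID }
func (a *LeaderAction) GetLeaderID() int64 { return a.leaderID }

// staleCheckTarget mirrors the selection logic added in this patch:
// leader actions are checked against the leader, everything else against Node().
func staleCheckTarget(action Action) int64 {
	switch a := action.(type) {
	case *LeaderAction:
		return a.GetLeaderID()
	default:
		return a.Node()
	}
}

func main() {
	roNodes := map[int64]bool{3: true} // worker 3 is RO, leader 1 stays RW

	leaderAct := &LeaderAction{leaderID: 1, workerID: 3}
	fmt.Println("old target:", leaderAct.Node(), "ro?", roNodes[leaderAct.Node()])                       // 3 true  -> falsely stale
	fmt.Println("new target:", staleCheckTarget(leaderAct), "ro?", roNodes[staleCheckTarget(leaderAct)]) // 1 false -> task proceeds

	segAct := &SegmentAction{nodeID: 2}
	fmt.Println("segment action target:", staleCheckTarget(segAct)) // non-leader actions still use Node()
}

With worker 3 marked RO and leader 1 RW, the old target is flagged RO while the
new target is not, which is exactly the scenario exercised by the
WorkerRONodeLeaderRW test case added below.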
 internal/querycoordv2/task/scheduler.go |  36 ++++--
 internal/querycoordv2/task/task_test.go | 153 +++++++++++++++++++++++-
 2 files changed, 177 insertions(+), 12 deletions(-)

diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index 67a4ec0b62..ebe80f9683 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -1127,21 +1127,35 @@ func (scheduler *taskScheduler) checkStale(task Task) error {
 	}
 
 	for _, action := range task.Actions() {
-		nodeInfo := scheduler.nodeMgr.Get(action.Node())
-		if nodeInfo == nil {
-			log.Warn("task stale due to node not found", zap.Int64("nodeID", action.Node()))
-			return merr.WrapErrNodeNotFound(action.Node())
+		// Determine the target node for stale checking.
+		// For LeaderAction, we need to check the leader node (delegator) instead of the worker node.
+		// This is because LeaderAction.Node() returns the worker node where the segment resides,
+		// but the task is executed on the leader node. If the worker node is an RO node while
+		// the leader node is still RW, the task should NOT be marked as stale.
+		// See issue #46737: Using action.Node() for LeaderAction incorrectly marks tasks as stale
+		// when syncing segments from RO nodes to the delegator, blocking balance channel operations.
+		var targetNode int64
+		switch a := action.(type) {
+		case *LeaderAction:
+			targetNode = a.GetLeaderID()
+		default:
+			targetNode = a.Node()
 		}
-		switch action.Type() {
-		case ActionTypeGrow:
+
+		nodeInfo := scheduler.nodeMgr.Get(targetNode)
+		if nodeInfo == nil {
+			log.Warn("task stale due to node not found", zap.Int64("nodeID", targetNode))
+			return merr.WrapErrNodeNotFound(targetNode)
+		}
+		if action.Type() == ActionTypeGrow {
 			if nodeInfo.IsStoppingState() {
-				log.Warn("task stale due to node offline", zap.Int64("nodeID", action.Node()))
-				return merr.WrapErrNodeOffline(action.Node())
+				log.Warn("task stale due to node offline", zap.Int64("nodeID", targetNode))
+				return merr.WrapErrNodeOffline(targetNode)
 			}
-			if replica != nil && (replica.ContainRONode(action.Node()) || replica.ContainROSQNode(action.Node())) {
-				log.Warn("task stale due to node becomes ro node", zap.Int64("nodeID", action.Node()))
-				return merr.WrapErrNodeStateUnexpected(action.Node(), "node becomes ro node")
+			if replica != nil && (replica.ContainRONode(targetNode) || replica.ContainROSQNode(targetNode)) {
+				log.Warn("task stale due to node becomes ro node", zap.Int64("nodeID", targetNode))
+				return merr.WrapErrNodeStateUnexpected(targetNode, "node becomes ro node")
 			}
 		}
 	}
 
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index d2cba0b31d..4a93bd876e 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -187,7 +187,8 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
 		"TestLeaderTaskSet",
 		"TestLeaderTaskRemove",
 		"TestNoExecutor",
-		"TestTaskStaleByRONode":
+		"TestTaskStaleByRONode",
+		"TestLeaderTaskStaleByRONode":
 		suite.meta.PutCollection(suite.ctx, &meta.Collection{
 			CollectionLoadInfo: &querypb.CollectionLoadInfo{
 				CollectionID: suite.collection,
@@ -2060,6 +2061,156 @@ func (suite *TaskSuite) TestExecutor_MoveSegmentTask() {
 	suite.ErrorContains(moveTask.Err(), "shard leader changed")
 }
 
+func (suite *TaskSuite) TestLeaderTaskStaleByRONode() {
+	ctx := context.Background()
+	leaderNode := int64(1)
+	workerNode := int64(3)
+	channel := &datapb.VchannelInfo{
+		CollectionID: suite.collection,
+		ChannelName:  Params.CommonCfg.RootCoordDml.GetValue() + "-test",
+	}
+
+	// Test case 1: LeaderAction should NOT be stale when worker node is RO but leader is RW
+	// This is the fix for issue #46737: the checkStale should use leaderID instead of Node() for LeaderAction
+	suite.Run("WorkerRONodeLeaderRW", func() {
+		// Create replica with worker node (node 3) as RO node, leader node (node 1) as RW
+		replicaWithRONode := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{1, 2}, // RW nodes (leader is RW)
+			RoNodes:       []int64{3},    // RO nodes (worker is RO)
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(1, 2))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithRONode)
+
+		// Set up channel distribution
+		suite.dist.ChannelDistManager.Update(leaderNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    leaderNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: leaderNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create a LeaderAction that syncs segment from worker (RO node) to leader (RW node)
+		// leaderID=1 (RW), workerID=3 (RO)
+		// Before fix: checkStale used action.Node() which returns workerNode (3, RO), causing false stale
+		// After fix: checkStale uses leaderID (1, RW), task should NOT be stale
+		task := NewLeaderSegmentTask(
+			ctx,
+			WrapIDSource(0),
+			suite.collection,
+			replicaWithRONode,
+			leaderNode,
+			NewLeaderAction(leaderNode, workerNode, ActionTypeGrow, channel.GetChannelName(), suite.loadSegments[0], 0),
+		)
+
+		// Add task should succeed because leader node is RW
+		err := suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Task should NOT be stale because we check leader node status, not worker node
+		suite.Equal(TaskStatusStarted, task.Status())
+		suite.NoError(task.Err())
+
+		// Clean up
+		suite.scheduler.remove(task)
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+
+	// Test case 2: LeaderAction should be stale when leader node becomes RO
+	suite.Run("LeaderRONode", func() {
+		// Set up channel distribution with leader on node 1
+		suite.dist.ChannelDistManager.Update(leaderNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    leaderNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: leaderNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create task with original replica (all RW nodes)
+		task := NewLeaderSegmentTask(
+			ctx,
+			WrapIDSource(0),
+			suite.collection,
+			suite.replica,
+			leaderNode,
+			NewLeaderAction(leaderNode, workerNode, ActionTypeGrow, channel.GetChannelName(), suite.loadSegments[0], 0),
+		)
+
+		// Add task should succeed
+		err := suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Now change replica to make leader node (node 1) as RO node
+		replicaWithLeaderRO := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{2, 3}, // RW nodes
+			RoNodes:       []int64{1},    // Leader becomes RO
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(2, 3))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithLeaderRO)
+
+		// Dispatch will trigger promote which calls checkStale
+		suite.dispatchAndWait(leaderNode)
+
+		// Task should be canceled because leader node is RO
+		suite.Equal(TaskStatusCanceled, task.Status())
+		suite.ErrorContains(task.Err(), "node becomes ro node")
+
+		// Restore original replica
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+
+	// Test case 3: LeaderAction with Reduce type should NOT be affected by RO node
+	suite.Run("LeaderReduceActionNotAffected", func() {
+		// Create replica with leader node as RO
+		replicaWithLeaderRO := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{2, 3}, // RW nodes
+			RoNodes:       []int64{1},    // Leader is RO
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(2, 3))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithLeaderRO)
+
+		// Set up channel distribution
+		suite.dist.ChannelDistManager.Update(leaderNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    leaderNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: leaderNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create LeaderAction with Reduce type
+		task := NewLeaderSegmentTask(
+			ctx,
+			WrapIDSource(0),
+			suite.collection,
+			replicaWithLeaderRO,
+			leaderNode,
+			NewLeaderAction(leaderNode, workerNode, ActionTypeReduce, channel.GetChannelName(), suite.releaseSegments[0], 0),
+		)
+
+		// Add task should succeed because Reduce action is not affected by RO node check
+		err := suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Clean up
+		suite.scheduler.remove(task)
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+}
+
 func (suite *TaskSuite) TestTaskStaleByRONode() {
 	ctx := context.Background()
 	timeout := 10 * time.Second