diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index 67a4ec0b62..ebe80f9683 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -1127,21 +1127,35 @@ func (scheduler *taskScheduler) checkStale(task Task) error {
 	}
 
 	for _, action := range task.Actions() {
-		nodeInfo := scheduler.nodeMgr.Get(action.Node())
-		if nodeInfo == nil {
-			log.Warn("task stale due to node not found", zap.Int64("nodeID", action.Node()))
-			return merr.WrapErrNodeNotFound(action.Node())
+		// Determine the target node for the stale check.
+		// For LeaderAction, we need to check the leader node (delegator) instead of the worker node:
+		// LeaderAction.Node() returns the worker node where the segment resides,
+		// but the task is executed on the leader node. If the worker node is an RO node while
+		// the leader node is still RW, the task should NOT be marked as stale.
+		// See issue #46737: using action.Node() for LeaderAction incorrectly marks tasks as stale
+		// when syncing segments from RO nodes to the delegator, blocking balance-channel operations.
+		var targetNode int64
+		switch a := action.(type) {
+		case *LeaderAction:
+			targetNode = a.GetLeaderID()
+		default:
+			targetNode = a.Node()
 		}
-		switch action.Type() {
-		case ActionTypeGrow:
+
+		nodeInfo := scheduler.nodeMgr.Get(targetNode)
+		if nodeInfo == nil {
+			log.Warn("task stale due to node not found", zap.Int64("nodeID", targetNode))
+			return merr.WrapErrNodeNotFound(targetNode)
+		}
+		if action.Type() == ActionTypeGrow {
 			if nodeInfo.IsStoppingState() {
-				log.Warn("task stale due to node offline", zap.Int64("nodeID", action.Node()))
-				return merr.WrapErrNodeOffline(action.Node())
+				log.Warn("task stale due to node offline", zap.Int64("nodeID", targetNode))
+				return merr.WrapErrNodeOffline(targetNode)
 			}
-			if replica != nil && (replica.ContainRONode(action.Node()) || replica.ContainROSQNode(action.Node())) {
-				log.Warn("task stale due to node becomes ro node", zap.Int64("nodeID", action.Node()))
-				return merr.WrapErrNodeStateUnexpected(action.Node(), "node becomes ro node")
+			if replica != nil && (replica.ContainRONode(targetNode) || replica.ContainROSQNode(targetNode)) {
+				log.Warn("task stale due to node becomes ro node", zap.Int64("nodeID", targetNode))
+				return merr.WrapErrNodeStateUnexpected(targetNode, "node becomes ro node")
 			}
 		}
 	}
 
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index d2cba0b31d..4a93bd876e 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -187,7 +187,8 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
 		"TestLeaderTaskSet",
 		"TestLeaderTaskRemove",
 		"TestNoExecutor",
-		"TestTaskStaleByRONode":
+		"TestTaskStaleByRONode",
+		"TestLeaderTaskStaleByRONode":
 		suite.meta.PutCollection(suite.ctx, &meta.Collection{
 			CollectionLoadInfo: &querypb.CollectionLoadInfo{
 				CollectionID: suite.collection,
@@ -2060,6 +2061,156 @@ func (suite *TaskSuite) TestExecutor_MoveSegmentTask() {
 	suite.ErrorContains(moveTask.Err(), "shard leader changed")
 }
 
+func (suite *TaskSuite) TestLeaderTaskStaleByRONode() {
+	ctx := context.Background()
+	leaderNode := int64(1)
+	workerNode := int64(3)
+	channel := &datapb.VchannelInfo{
+		CollectionID: suite.collection,
+		ChannelName:  Params.CommonCfg.RootCoordDml.GetValue() + "-test",
+	}
+
+	// Test case 1: LeaderAction should NOT be stale when the worker node is RO but the leader is RW.
+	// This is the fix for issue #46737: checkStale should use the leader ID instead of Node() for LeaderAction.
+	suite.Run("WorkerRONodeLeaderRW", func() {
+		// Create a replica with the worker node (node 3) as RO and the leader node (node 1) as RW
+		replicaWithRONode := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{1, 2}, // RW nodes (leader is RW)
+			RoNodes:       []int64{3},    // RO nodes (worker is RO)
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(1, 2))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithRONode)
+
+		// Set up the channel distribution
+		suite.dist.ChannelDistManager.Update(leaderNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    leaderNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: leaderNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create a LeaderAction that syncs a segment from the worker (RO node) to the leader (RW node):
+		// leaderID=1 (RW), workerID=3 (RO).
+		// Before the fix: checkStale used action.Node(), which returns the worker node (3, RO), causing a false stale.
+		// After the fix: checkStale uses the leader ID (1, RW), so the task should NOT be stale.
+		task := NewLeaderSegmentTask(
+			ctx,
+			WrapIDSource(0),
+			suite.collection,
+			replicaWithRONode,
+			leaderNode,
+			NewLeaderAction(leaderNode, workerNode, ActionTypeGrow, channel.GetChannelName(), suite.loadSegments[0], 0),
+		)
+
+		// Adding the task should succeed because the leader node is RW
+		err := suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// The task should NOT be stale because we check the leader node's status, not the worker node's
+		suite.Equal(TaskStatusStarted, task.Status())
+		suite.NoError(task.Err())
+
+		// Clean up
+		suite.scheduler.remove(task)
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+
+	// Test case 2: LeaderAction should be stale when the leader node becomes RO
+	suite.Run("LeaderRONode", func() {
+		// Set up the channel distribution with the leader on node 1
+		suite.dist.ChannelDistManager.Update(leaderNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    leaderNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: leaderNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create a task with the original replica (all RW nodes)
+		task := NewLeaderSegmentTask(
+			ctx,
+			WrapIDSource(0),
+			suite.collection,
+			suite.replica,
+			leaderNode,
+			NewLeaderAction(leaderNode, workerNode, ActionTypeGrow, channel.GetChannelName(), suite.loadSegments[0], 0),
+		)
+
+		// Adding the task should succeed
+		err := suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Now change the replica to make the leader node (node 1) an RO node
+		replicaWithLeaderRO := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{2, 3}, // RW nodes
+			RoNodes:       []int64{1},    // Leader becomes RO
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(2, 3))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithLeaderRO)
+
+		// Dispatch triggers promote, which calls checkStale
+		suite.dispatchAndWait(leaderNode)
+
+		// The task should be canceled because the leader node is RO
+		suite.Equal(TaskStatusCanceled, task.Status())
+		suite.ErrorContains(task.Err(), "node becomes ro node")
+
+		// Restore the original replica
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+
+	// Test case 3: a LeaderAction of Reduce type should NOT be affected by the RO-node check
+	suite.Run("LeaderReduceActionNotAffected", func() {
+		// Create a replica with the leader node as RO
+		replicaWithLeaderRO := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{2, 3}, // RW nodes
+			RoNodes:       []int64{1},    // Leader is RO
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(2, 3))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithLeaderRO)
+
+		// Set up the channel distribution
+		suite.dist.ChannelDistManager.Update(leaderNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    leaderNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: leaderNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create a LeaderAction with Reduce type
+		task := NewLeaderSegmentTask(
+			ctx,
+			WrapIDSource(0),
+			suite.collection,
+			replicaWithLeaderRO,
+			leaderNode,
+			NewLeaderAction(leaderNode, workerNode, ActionTypeReduce, channel.GetChannelName(), suite.releaseSegments[0], 0),
+		)
+
+		// Adding the task should succeed because Reduce actions are not affected by the RO-node check
+		err := suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Clean up
+		suite.scheduler.remove(task)
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+}
+
 func (suite *TaskSuite) TestTaskStaleByRONode() {
 	ctx := context.Background()
 	timeout := 10 * time.Second