From b907f9e7a80fb2d72026c139124c2304ccc4bee0 Mon Sep 17 00:00:00 2001
From: wei liu
Date: Wed, 24 Dec 2025 10:31:19 +0800
Subject: [PATCH] fix: unify ro node handling to avoid balance channel task
 stuck (#46440)

issue: #46393

An RO (read-only) node can be created from two sources: a stopping
QueryNode, or a replica node transfer (e.g., suspending a node).

Before this fix, two defects and one constraint combined to cause a
deadlock.

Defects:
1. LeaderChecker does not sync segment distribution to RO nodes
2. Scheduler only cancels tasks on stopping nodes, not on RO nodes

Constraint:
- A balance channel task blocks waiting for the new delegator to become
  serviceable (via segment sync) before executing its release action

Deadlock scenario: when the target node becomes an RO node (but is not
stopping) during balance channel execution, the task gets stuck because:
- segments cannot be synced to the RO node (defect 1) -> the task blocks
- the task is not cancelled, since the node is not stopping (defect 2)

PR #45949 attempted to fix defect 1 but was not successful.

This PR unifies RO node handling:
- LeaderChecker: only sync segment distribution to RW nodes
- Scheduler: cancel the task when the target node becomes an RO node
- Simplify the checkStale logic with a unified node state check

A minimal sketch of the unified rule follows the diff.

Signed-off-by: Wei Liu
---
 .../querycoordv2/checkers/leader_checker.go   |   8 +-
 .../checkers/leader_checker_test.go           |   2 +-
 internal/querycoordv2/task/scheduler.go       | 140 ++------
 internal/querycoordv2/task/task_test.go       | 324 +++++++++---------
 4 files changed, 179 insertions(+), 295 deletions(-)

diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go
index 68afabab2b..57ed269415 100644
--- a/internal/querycoordv2/checkers/leader_checker.go
+++ b/internal/querycoordv2/checkers/leader_checker.go
@@ -92,13 +92,9 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task {
 		replicas := c.meta.ReplicaManager.GetByCollection(ctx, collectionID)
 
 		for _, replica := range replicas {
-			// note: should enable sync segment distribution to ro node, to avoid balance channel from ro node stucks
-			nodes := replica.GetNodes()
+			nodes := replica.GetRWNodes()
 			if streamingutil.IsStreamingServiceEnabled() {
-				sqNodes := make([]int64, 0, len(replica.GetROSQNodes())+len(replica.GetRWSQNodes()))
-				sqNodes = append(sqNodes, replica.GetROSQNodes()...)
-				sqNodes = append(sqNodes, replica.GetRWSQNodes()...)
-				nodes = sqNodes
+				nodes = replica.GetRWSQNodes()
 			}
 			for _, node := range nodes {
 				delegatorList := c.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node))

diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go
index 32fe8453d2..38a0dbfe38 100644
--- a/internal/querycoordv2/checkers/leader_checker_test.go
+++ b/internal/querycoordv2/checkers/leader_checker_test.go
@@ -291,7 +291,7 @@ func (suite *LeaderCheckerTestSuite) TestStoppingNode() {
 	observer.meta.ReplicaManager.Put(ctx, mutableReplica.IntoReplica())
 
 	tasks := suite.checker.Check(context.TODO())
-	suite.Len(tasks, 1)
+	suite.Len(tasks, 0)
 }
 
 func (suite *LeaderCheckerTestSuite) TestIgnoreSyncLoadedSegments() {

diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index fcff28fef2..67a4ec0b62 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -1111,140 +1111,40 @@ func WrapTaskLog(task Task, fields ...zap.Field) []zap.Field {
 }
 
 func (scheduler *taskScheduler) checkStale(task Task) error {
-	switch task := task.(type) {
-	case *SegmentTask:
-		if err := scheduler.checkSegmentTaskStale(task); err != nil {
-			return err
-		}
+	log := log.Ctx(task.Context()).With(
+		zap.String("task", task.String()),
+	)
 
-	case *ChannelTask:
-		if err := scheduler.checkChannelTaskStale(task); err != nil {
-			return err
+	// Get replica, but only fail if we need it for RO node check
+	// NilReplica (ID=-1) is used for reduce-only tasks like unsubscribe channel
+	var replica *meta.Replica
+	if task.ReplicaID() != -1 {
+		replica = scheduler.meta.ReplicaManager.Get(scheduler.ctx, task.ReplicaID())
+		if replica == nil {
+			log.Warn("task stale due to replica not found")
+			return merr.WrapErrReplicaNotFound(task.ReplicaID())
 		}
-
-	case *LeaderTask:
-		if err := scheduler.checkLeaderTaskStale(task); err != nil {
-			return err
-		}
-
-	case *DropIndexTask:
-		if err := scheduler.checkDropIndexTaskStale(task); err != nil {
-			return err
-		}
-	default:
-		panic(fmt.Sprintf("checkStale: forget to check task type: %+v", task))
 	}
 
-	for step, action := range task.Actions() {
-		log := log.With(
-			zap.Int64("nodeID", action.Node()),
-			zap.Int("step", step))
-
-		if scheduler.nodeMgr.Get(action.Node()) == nil {
-			log.Warn("the task is stale, the target node is offline", WrapTaskLog(task,
-				zap.Int64("nodeID", action.Node()),
-				zap.Int("step", step))...)
+	for _, action := range task.Actions() {
+		nodeInfo := scheduler.nodeMgr.Get(action.Node())
+		if nodeInfo == nil {
+			log.Warn("task stale due to node not found", zap.Int64("nodeID", action.Node()))
 			return merr.WrapErrNodeNotFound(action.Node())
 		}
-	}
-
-	return nil
-}
-
-func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
-	for _, action := range task.Actions() {
+
 		switch action.Type() {
 		case ActionTypeGrow:
-			if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
-				log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.Int64("segment", task.segmentID))...)
-				return merr.WrapErrNodeOffline(action.Node())
-			}
-			taskType := GetTaskType(task)
-			segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst)
-			if segment == nil {
-				log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets",
-					WrapTaskLog(task, zap.Int64("segment", task.segmentID),
-						zap.String("taskType", taskType.String()))...)
-				return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment")
-			}
-
-			leader := scheduler.getReplicaShardLeader(task.Shard(), task.ReplicaID())
-			if leader == nil {
-				log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task)...)
-				return merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "failed to get shard delegator")
-			}
-
-		case ActionTypeReduce:
-			// do nothing here
-		}
-	}
-	return nil
-}
-
-func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error {
-	for _, action := range task.Actions() {
-		switch action.Type() {
-		case ActionTypeGrow:
-			if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
-				log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.String("channel", task.Channel()))...)
-				return merr.WrapErrNodeOffline(action.Node())
-			}
-			if scheduler.targetMgr.GetDmChannel(task.ctx, task.collectionID, task.Channel(), meta.NextTargetFirst) == nil {
-				log.Ctx(task.Context()).Warn("the task is stale, the channel to subscribe not exists in targets",
-					WrapTaskLog(task, zap.String("channel", task.Channel()))...)
-				return merr.WrapErrChannelReduplicate(task.Channel(), "target doesn't contain this channel")
-			}
-
-		case ActionTypeReduce:
-			// do nothing here
-		}
-	}
-	return nil
-}
-
-func (scheduler *taskScheduler) checkLeaderTaskStale(task *LeaderTask) error {
-	for _, action := range task.Actions() {
-		switch action.Type() {
-		case ActionTypeGrow:
-			if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.(*LeaderAction).GetLeaderID()); ok {
-				log.Ctx(task.Context()).Warn("task stale due to node offline",
-					WrapTaskLog(task, zap.Int64("leaderID", task.leaderID), zap.Int64("segment", task.segmentID))...)
+			if nodeInfo.IsStoppingState() {
+				log.Warn("task stale due to node offline", zap.Int64("nodeID", action.Node()))
 				return merr.WrapErrNodeOffline(action.Node())
 			}
-			taskType := GetTaskType(task)
-			segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst)
-			if segment == nil {
-				log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets",
-					WrapTaskLog(task, zap.Int64("leaderID", task.leaderID),
-						zap.Int64("segment", task.segmentID),
-						zap.String("taskType", taskType.String()))...)
-				return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment")
-			}
-
-			leader := scheduler.getReplicaShardLeader(task.Shard(), task.ReplicaID())
-			if leader == nil {
-				log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
-				return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator")
-			}
-
-		case ActionTypeReduce:
-			leader := scheduler.getReplicaShardLeader(task.Shard(), task.ReplicaID())
-			if leader == nil {
-				log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
-				return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator")
+			if replica != nil && (replica.ContainRONode(action.Node()) || replica.ContainROSQNode(action.Node())) {
+				log.Warn("task stale due to node becomes ro node", zap.Int64("nodeID", action.Node()))
+				return merr.WrapErrNodeStateUnexpected(action.Node(), "node becomes ro node")
 			}
 		}
 	}
-	return nil
-}
-
-func (scheduler *taskScheduler) checkDropIndexTaskStale(task *DropIndexTask) error {
-	for _, action := range task.Actions() {
-		if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
-			log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.String("channel", task.Shard()))...)
-			return merr.WrapErrNodeOffline(action.Node())
-		}
-	}
 	return nil
 }
 
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index b7b1b7abd5..dd3ec16893 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -190,15 +190,14 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
 		"TestLoadSegmentTask",
 		"TestLoadSegmentTaskNotIndex",
 		"TestLoadSegmentTaskFailed",
-		"TestSegmentTaskStale",
 		"TestTaskCanceled",
 		"TestMoveSegmentTask",
-		"TestMoveSegmentTaskStale",
 		"TestSubmitDuplicateLoadSegmentTask",
 		"TestSubmitDuplicateSubscribeChannelTask",
 		"TestLeaderTaskSet",
 		"TestLeaderTaskRemove",
-		"TestNoExecutor":
+		"TestNoExecutor",
+		"TestTaskStaleByRONode":
 		suite.meta.PutCollection(suite.ctx, &meta.Collection{
 			CollectionLoadInfo: &querypb.CollectionLoadInfo{
 				CollectionID: suite.collection,
@@ -975,73 +974,6 @@ func (suite *TaskSuite) TestMoveSegmentTask() {
 	}
 }
 
-func (suite *TaskSuite) TestMoveSegmentTaskStale() {
-	ctx := context.Background()
-	timeout := 10 * time.Second
-	leader := int64(1)
-	sourceNode := int64(2)
-	targetNode := int64(3)
-	channel := &datapb.VchannelInfo{
-		CollectionID: suite.collection,
-		ChannelName:  Params.CommonCfg.RootCoordDml.GetValue() + "-test",
-	}
-
-	vchannel := &datapb.VchannelInfo{
-		CollectionID: suite.collection,
-		ChannelName:  channel.ChannelName,
-	}
-	suite.dist.ChannelDistManager.Update(leader, &meta.DmChannel{
-		VchannelInfo: vchannel,
-		Node:         leader,
-		Version:      1,
-		View: &meta.LeaderView{
-			ID:           leader,
-			CollectionID: suite.collection,
-			Channel:      channel.ChannelName,
-			Status:       &querypb.LeaderViewStatus{Serviceable: true},
-		},
-	})
-	view := &meta.LeaderView{
-		ID:           leader,
-		CollectionID: suite.collection,
-		Channel:      channel.ChannelName,
-		Segments:     make(map[int64]*querypb.SegmentDist),
-	}
-	tasks := []Task{}
-	segmentInfos := make([]*datapb.SegmentInfo, 0)
-	for _, segment := range suite.moveSegments {
-		segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
-			ID:            segment,
-			PartitionID:   1,
-			InsertChannel: channel.ChannelName,
-		})
-		view.Segments[segment] = &querypb.SegmentDist{NodeID: sourceNode, Version: 0}
-
-		task, err := NewSegmentTask(
-			ctx,
-			timeout,
-			WrapIDSource(0),
-			suite.collection,
-			suite.replica,
-			commonpb.LoadPriority_LOW,
-			NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
-			NewSegmentAction(sourceNode, ActionTypeReduce, channel.GetChannelName(), segment),
-		)
-		suite.NoError(err)
-		tasks = append(tasks, task)
-	}
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{vchannel}, segmentInfos, nil)
-	suite.target.UpdateCollectionNextTarget(ctx, suite.collection)
-	suite.target.UpdateCollectionCurrentTarget(ctx, suite.collection)
-	for _, task := range tasks {
-		err := suite.scheduler.Add(task)
-		suite.Error(err)
-		suite.Equal(TaskStatusCanceled, task.Status())
-		suite.Error(task.Err())
-	}
-	suite.AssertTaskNum(0, 0, 0, 0)
-}
-
 func (suite *TaskSuite) TestTaskCanceled() {
 	ctx := context.Background()
 	timeout := 10 * time.Second
@@ -1138,104 +1070,6 @@ func (suite *TaskSuite) TestTaskCanceled() {
 	}
 }
 
-func (suite *TaskSuite) TestSegmentTaskStale() {
-	ctx := context.Background()
-	timeout := 10 * time.Second
-	targetNode := int64(3)
-	partition := int64(100)
-	channel := &datapb.VchannelInfo{
-		CollectionID: suite.collection,
-		ChannelName:  Params.CommonCfg.RootCoordDml.GetValue() + "-test",
-	}
-
-	// Expect
-	suite.broker.EXPECT().DescribeCollection(mock.Anything, suite.collection).RunAndReturn(func(ctx context.Context, i int64) (*milvuspb.DescribeCollectionResponse, error) {
-		return &milvuspb.DescribeCollectionResponse{
-			Schema: &schemapb.CollectionSchema{
-				Name: "TestSegmentTaskStale",
-				Fields: []*schemapb.FieldSchema{
-					{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
-				},
-			},
-		}, nil
-	})
-	suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
-		{
-			CollectionID: suite.collection,
-		},
-	}, nil)
-	for _, segment := range suite.loadSegments[1:] {
-		suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{
-			{
-				ID:            segment,
-				CollectionID:  suite.collection,
-				PartitionID:   partition,
-				InsertChannel: channel.ChannelName,
-			},
-		}, nil)
-		suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil)
-	}
-	suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Success(), nil)
-
-	// Test load segment task
-	suite.dist.ChannelDistManager.Update(targetNode, &meta.DmChannel{
-		VchannelInfo: &datapb.VchannelInfo{
-			CollectionID: suite.collection,
-			ChannelName:  channel.ChannelName,
-		},
-		Node:    targetNode,
-		Version: 1,
-		View:    &meta.LeaderView{ID: targetNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
-	})
-	tasks := []Task{}
-	for _, segment := range suite.loadSegments {
-		task, err := NewSegmentTask(
-			ctx,
-			timeout,
-			WrapIDSource(0),
-			suite.collection,
-			suite.replica,
-			commonpb.LoadPriority_LOW,
-			NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
-		)
-		suite.NoError(err)
-		tasks = append(tasks, task)
-		err = suite.scheduler.Add(task)
-		suite.NoError(err)
-	}
-
-	segments := make([]*datapb.SegmentInfo, 0)
-	for _, segment := range suite.loadSegments[1:] {
-		segments = append(segments, &datapb.SegmentInfo{
-			ID:            segment,
-			PartitionID:   2,
-			InsertChannel: channel.GetChannelName(),
-		})
-	}
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
-
-	suite.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(suite.collection, 2))
-	suite.target.UpdateCollectionNextTarget(ctx, suite.collection)
-
-	// process done
-	suite.dispatchAndWait(targetNode)
-	suite.AssertTaskNum(1, 0, 0, 1)
-
-	// task removed
-	suite.dispatchAndWait(targetNode)
-	suite.AssertTaskNum(0, 0, 0, 0)
-
-	for i, task := range tasks {
-		if i == 0 {
-			suite.Equal(TaskStatusCanceled, task.Status())
-			suite.Error(task.Err())
-		} else {
-			suite.Equal(TaskStatusSucceeded, task.Status())
-			suite.NoError(task.Err())
-		}
-	}
-}
-
 func (suite *TaskSuite) TestChannelTaskReplace() {
 	ctx := context.Background()
 	timeout := 10 * time.Second
@@ -2235,3 +2069,157 @@ func (suite *TaskSuite) TestExecutor_MoveSegmentTask() {
 	suite.True(moveTask.actions[1].IsFinished(suite.dist))
 	suite.ErrorContains(moveTask.Err(), "shard leader changed")
 }
+
+func (suite *TaskSuite) TestTaskStaleByRONode() {
+	ctx := context.Background()
+	timeout := 10 * time.Second
+	targetNode := int64(3)
+	channel := &datapb.VchannelInfo{
+		CollectionID: suite.collection,
+		ChannelName:  Params.CommonCfg.RootCoordDml.GetValue() + "-test",
+	}
+
+	// Test case 1: Task stale due to target node becomes RO node
+	suite.Run("RONode", func() {
+		// Set up channel distribution first with RW replica
+		suite.dist.ChannelDistManager.Update(targetNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    targetNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: targetNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create task with original replica (all RW nodes)
+		task, err := NewSegmentTask(
+			ctx,
+			timeout,
+			WrapIDSource(0),
+			suite.collection,
+			suite.replica,
+			commonpb.LoadPriority_LOW,
+			NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), suite.loadSegments[0]),
+		)
+		suite.NoError(err)
+
+		// Add task should succeed
+		err = suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Now change replica to make node 3 as RO node (simulating node state change)
+		replicaWithRONode := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{1, 2}, // RW nodes
+			RoNodes:       []int64{3},    // RO node
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(1, 2))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithRONode)
+
+		// Dispatch will trigger promote which calls checkStale
+		suite.dispatchAndWait(targetNode)
+
+		// Task should be canceled due to RO node
+		suite.Equal(TaskStatusCanceled, task.Status())
+		suite.ErrorContains(task.Err(), "node becomes ro node")
+
+		// Restore original replica for other tests
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+
+	// Test case 2: Task stale due to target node becomes RO SQ node
+	suite.Run("ROSQNode", func() {
+		// Set up channel distribution
+		suite.dist.ChannelDistManager.Update(targetNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    targetNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: targetNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create task with original replica (all RW nodes)
+		task, err := NewSegmentTask(
+			ctx,
+			timeout,
+			WrapIDSource(0),
+			suite.collection,
+			suite.replica,
+			commonpb.LoadPriority_LOW,
+			NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), suite.loadSegments[1]),
+		)
+		suite.NoError(err)
+
+		// Add task should succeed
+		err = suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Now change replica to make node 3 as RO SQ node
+		replicaWithROSQNode := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{1, 2}, // RW nodes
+			RoSqNodes:     []int64{3},    // RO SQ node
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(1, 2))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithROSQNode)
+
+		// Dispatch will trigger promote which calls checkStale
+		suite.dispatchAndWait(targetNode)
+
+		// Task should be canceled due to RO SQ node
+		suite.Equal(TaskStatusCanceled, task.Status())
+		suite.ErrorContains(task.Err(), "node becomes ro node")
+
+		// Restore original replica
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+
+	// Test case 3: ActionTypeReduce should not be affected by RO node
+	suite.Run("ReduceActionNotAffected", func() {
+		// Create replica with node 3 as RO node
+		replicaWithRONode := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{1, 2}, // RW nodes
+			RoNodes:       []int64{3},    // RO node
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(1, 2))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithRONode)
+
+		// Set up channel distribution
+		suite.dist.ChannelDistManager.Update(targetNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    targetNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: targetNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create a segment task with Reduce action targeting the RO node
+		task, err := NewSegmentTask(
+			ctx,
+			timeout,
+			WrapIDSource(0),
+			suite.collection,
+			replicaWithRONode,
+			commonpb.LoadPriority_LOW,
+			NewSegmentAction(targetNode, ActionTypeReduce, channel.GetChannelName(), suite.releaseSegments[0]),
+		)
+		suite.NoError(err)
+
+		// Add task should succeed because Reduce action is not affected by RO node check
+		err = suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Clean up
+		suite.scheduler.remove(task)
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+}
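--
Note (not part of the applied patch): the sketch below is a minimal,
self-contained Go illustration of the unified staleness rule described in
the commit message. The names here (Replica, NodeManager, Action,
checkStale) are simplified stand-ins, not the actual Milvus types; the
real check also consults ContainROSQNode for streaming SQ nodes and the
scheduler's node manager.

package main

import (
	"errors"
	"fmt"
)

type ActionType int

const (
	ActionTypeGrow ActionType = iota
	ActionTypeReduce
)

type Action struct {
	Node int64
	Type ActionType
}

// Replica mirrors the RW/RO membership split kept by the real ReplicaManager.
type Replica struct {
	roNodes map[int64]bool
}

func (r *Replica) ContainRONode(node int64) bool { return r.roNodes[node] }

// NodeManager knows which nodes exist and which are shutting down.
type NodeManager struct {
	alive    map[int64]bool
	stopping map[int64]bool
}

var (
	errNodeNotFound = errors.New("node not found")
	errNodeOffline  = errors.New("node offline")
	errNodeRO       = errors.New("node becomes ro node")
)

// checkStale applies one rule to every action, regardless of task type:
// a Grow action is stale when its target node is missing, stopping, or RO
// in the task's replica. Reduce actions are never blocked, so releases can
// still drain an RO node.
func checkStale(nm *NodeManager, replica *Replica, actions []Action) error {
	for _, a := range actions {
		if !nm.alive[a.Node] {
			return fmt.Errorf("%w: %d", errNodeNotFound, a.Node)
		}
		if a.Type != ActionTypeGrow {
			continue // releases are exempt from the RO/stopping checks
		}
		if nm.stopping[a.Node] {
			return fmt.Errorf("%w: %d", errNodeOffline, a.Node)
		}
		if replica != nil && replica.ContainRONode(a.Node) {
			return fmt.Errorf("%w: %d", errNodeRO, a.Node)
		}
	}
	return nil
}

func main() {
	nm := &NodeManager{
		alive:    map[int64]bool{1: true, 2: true, 3: true},
		stopping: map[int64]bool{},
	}
	// Node 3 was moved to the RO set, e.g. by a suspend-node request.
	replica := &Replica{roNodes: map[int64]bool{3: true}}

	// A grow action on the RO node is cancelled instead of blocking forever.
	fmt.Println(checkStale(nm, replica, []Action{{Node: 3, Type: ActionTypeGrow}}))
	// A reduce action on the same RO node still proceeds, so it can be drained.
	fmt.Println(checkStale(nm, replica, []Action{{Node: 3, Type: ActionTypeReduce}}))
}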