fix: unify ro node handling to avoid balance channel task stuck (#46440)

issue: #46393

An RO (read-only) node can be created from two sources: stopping a
QueryNode, or a replica node transfer (e.g., suspending a node). Before
this fix, two defects and one constraint combined into a deadlock:

Defects:
1. LeaderChecker does not sync segment distribution to RO nodes
2. Scheduler only cancels tasks on stopping nodes, not RO nodes

Constraint:
- The balance channel task blocks, waiting for the new delegator to
become serviceable (which requires a segment sync) before executing its
release action

Deadlock scenario:
When the target node becomes an RO node (but is not stopping) during
balance channel execution, the task gets stuck (see the toy model below):
- The segment distribution cannot be synced to the RO node (defect 1) -> the task blocks
- The task is not cancelled, since the node is not stopping (defect 2)
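
To make the failure mode concrete, here is a runnable toy Go model of the
three ingredients (purely illustrative; none of these identifiers exist in
Milvus):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stands in for "the new delegator became serviceable"; in Milvus this
	// flips only after segment distribution is synced to the delegator.
	serviceable := make(chan struct{})

	// Defect 1: the LeaderChecker skips RO nodes, so nothing ever closes
	// the channel. Defect 2: nothing cancels the waiting task, because the
	// node is RO rather than stopping.

	// Constraint: the balance channel task waits for serviceability before
	// executing its release action.
	select {
	case <-serviceable:
		fmt.Println("new delegator serviceable; release old delegator")
	case <-time.After(200 * time.Millisecond): // stand-in for "stuck forever"
		fmt.Println("task stuck: never serviceable, never cancelled")
	}
}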

PR #45949 attempted to fix defect 1 but was not successful.

This PR unifies RO node handling:
- LeaderChecker: sync segment distribution only to RW nodes
- Scheduler: cancel a task when its target node becomes an RO node
- checkStale: simplify the logic into a unified node-state check
(condensed in the sketch below)
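
Condensed to its core, the unified check for a grow action behaves as
sketched below. This is a self-contained toy, not the Milvus code: the
stand-in types only mirror the method names, and the real logic (including
NilReplica handling and the reduce-action exemption) is in the checkStale
diff further down.

package main

import "fmt"

// Stand-ins for the Milvus types; only the method names mirror the real code.
type nodeInfo struct{ stopping bool }

func (n *nodeInfo) IsStoppingState() bool { return n.stopping }

type replica struct{ roNodes, roSQNodes map[int64]bool }

func (r *replica) ContainRONode(id int64) bool   { return r.roNodes[id] }
func (r *replica) ContainROSQNode(id int64) bool { return r.roSQNodes[id] }

// growTargetStale condenses the rule the scheduler now applies to every grow
// action: the task is stale if its node is gone, stopping, or read-only.
func growTargetStale(info *nodeInfo, r *replica, nodeID int64) error {
	if info == nil {
		return fmt.Errorf("node %d not found", nodeID)
	}
	if info.IsStoppingState() {
		return fmt.Errorf("node %d offline", nodeID)
	}
	// New in this PR: a node demoted to RO (or RO SQ) cancels the task
	// instead of letting it block on a segment sync that will never happen.
	if r != nil && (r.ContainRONode(nodeID) || r.ContainROSQNode(nodeID)) {
		return fmt.Errorf("node %d becomes ro node", nodeID)
	}
	return nil
}

func main() {
	r := &replica{roNodes: map[int64]bool{3: true}}
	fmt.Println(growTargetStale(&nodeInfo{}, r, 3)) // node 3 becomes ro node
}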

Signed-off-by: Wei Liu <wei.liu@zilliz.com>

View File

@@ -92,13 +92,9 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task {
 		replicas := c.meta.ReplicaManager.GetByCollection(ctx, collectionID)
 		for _, replica := range replicas {
-			// note: should enable sync segment distribution to ro node, to avoid balance channel from ro node stucks
-			nodes := replica.GetNodes()
+			nodes := replica.GetRWNodes()
 			if streamingutil.IsStreamingServiceEnabled() {
-				sqNodes := make([]int64, 0, len(replica.GetROSQNodes())+len(replica.GetRWSQNodes()))
-				sqNodes = append(sqNodes, replica.GetROSQNodes()...)
-				sqNodes = append(sqNodes, replica.GetRWSQNodes()...)
-				nodes = sqNodes
+				nodes = replica.GetRWSQNodes()
 			}
 			for _, node := range nodes {
 				delegatorList := c.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node))

View File

@@ -291,7 +291,7 @@ func (suite *LeaderCheckerTestSuite) TestStoppingNode() {
 	observer.meta.ReplicaManager.Put(ctx, mutableReplica.IntoReplica())
 	tasks := suite.checker.Check(context.TODO())
-	suite.Len(tasks, 1)
+	suite.Len(tasks, 0)
 }
 
 func (suite *LeaderCheckerTestSuite) TestIgnoreSyncLoadedSegments() {

View File

@@ -1111,140 +1111,40 @@ func WrapTaskLog(task Task, fields ...zap.Field) []zap.Field {
 }
 
 func (scheduler *taskScheduler) checkStale(task Task) error {
-	switch task := task.(type) {
-	case *SegmentTask:
-		if err := scheduler.checkSegmentTaskStale(task); err != nil {
-			return err
-		}
-	case *ChannelTask:
-		if err := scheduler.checkChannelTaskStale(task); err != nil {
-			return err
-		}
-	case *LeaderTask:
-		if err := scheduler.checkLeaderTaskStale(task); err != nil {
-			return err
-		}
-	case *DropIndexTask:
-		if err := scheduler.checkDropIndexTaskStale(task); err != nil {
-			return err
-		}
-	default:
-		panic(fmt.Sprintf("checkStale: forget to check task type: %+v", task))
-	}
+	log := log.Ctx(task.Context()).With(
+		zap.String("task", task.String()),
+	)
 
-	for step, action := range task.Actions() {
-		log := log.With(
-			zap.Int64("nodeID", action.Node()),
-			zap.Int("step", step))
-		if scheduler.nodeMgr.Get(action.Node()) == nil {
-			log.Warn("the task is stale, the target node is offline", WrapTaskLog(task,
-				zap.Int64("nodeID", action.Node()),
-				zap.Int("step", step))...)
-			return merr.WrapErrNodeNotFound(action.Node())
-		}
-	}
+	// Get replica, but only fail if we need it for RO node check
+	// NilReplica (ID=-1) is used for reduce-only tasks like unsubscribe channel
+	var replica *meta.Replica
+	if task.ReplicaID() != -1 {
+		replica = scheduler.meta.ReplicaManager.Get(scheduler.ctx, task.ReplicaID())
+		if replica == nil {
+			log.Warn("task stale due to replica not found")
+			return merr.WrapErrReplicaNotFound(task.ReplicaID())
+		}
+	}
 
+	for _, action := range task.Actions() {
+		nodeInfo := scheduler.nodeMgr.Get(action.Node())
+		if nodeInfo == nil {
+			log.Warn("task stale due to node not found", zap.Int64("nodeID", action.Node()))
+			return merr.WrapErrNodeNotFound(action.Node())
+		}
+		switch action.Type() {
+		case ActionTypeGrow:
+			if nodeInfo.IsStoppingState() {
+				log.Warn("task stale due to node offline", zap.Int64("nodeID", action.Node()))
+				return merr.WrapErrNodeOffline(action.Node())
+			}
+			if replica != nil && (replica.ContainRONode(action.Node()) || replica.ContainROSQNode(action.Node())) {
+				log.Warn("task stale due to node becomes ro node", zap.Int64("nodeID", action.Node()))
+				return merr.WrapErrNodeStateUnexpected(action.Node(), "node becomes ro node")
+			}
+		case ActionTypeReduce:
+			// do nothing here
+		}
+	}
 	return nil
 }
-
-func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
-	for _, action := range task.Actions() {
-		switch action.Type() {
-		case ActionTypeGrow:
-			if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
-				log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.Int64("segment", task.segmentID))...)
-				return merr.WrapErrNodeOffline(action.Node())
-			}
-			taskType := GetTaskType(task)
-			segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst)
-			if segment == nil {
-				log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets",
-					WrapTaskLog(task, zap.Int64("segment", task.segmentID),
-						zap.String("taskType", taskType.String()))...)
-				return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment")
-			}
-			leader := scheduler.getReplicaShardLeader(task.Shard(), task.ReplicaID())
-			if leader == nil {
-				log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task)...)
-				return merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "failed to get shard delegator")
-			}
-		case ActionTypeReduce:
-			// do nothing here
-		}
-	}
-	return nil
-}
-
-func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error {
-	for _, action := range task.Actions() {
-		switch action.Type() {
-		case ActionTypeGrow:
-			if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
-				log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.String("channel", task.Channel()))...)
-				return merr.WrapErrNodeOffline(action.Node())
-			}
-			if scheduler.targetMgr.GetDmChannel(task.ctx, task.collectionID, task.Channel(), meta.NextTargetFirst) == nil {
-				log.Ctx(task.Context()).Warn("the task is stale, the channel to subscribe not exists in targets",
-					WrapTaskLog(task, zap.String("channel", task.Channel()))...)
-				return merr.WrapErrChannelReduplicate(task.Channel(), "target doesn't contain this channel")
-			}
-		case ActionTypeReduce:
-			// do nothing here
-		}
-	}
-	return nil
-}
-
-func (scheduler *taskScheduler) checkLeaderTaskStale(task *LeaderTask) error {
-	for _, action := range task.Actions() {
-		switch action.Type() {
-		case ActionTypeGrow:
-			if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.(*LeaderAction).GetLeaderID()); ok {
-				log.Ctx(task.Context()).Warn("task stale due to node offline",
-					WrapTaskLog(task, zap.Int64("leaderID", task.leaderID), zap.Int64("segment", task.segmentID))...)
-				return merr.WrapErrNodeOffline(action.Node())
-			}
-			taskType := GetTaskType(task)
-			segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst)
-			if segment == nil {
-				log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets",
-					WrapTaskLog(task, zap.Int64("leaderID", task.leaderID),
-						zap.Int64("segment", task.segmentID),
-						zap.String("taskType", taskType.String()))...)
-				return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment")
-			}
-			leader := scheduler.getReplicaShardLeader(task.Shard(), task.ReplicaID())
-			if leader == nil {
-				log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
-				return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator")
-			}
-		case ActionTypeReduce:
-			leader := scheduler.getReplicaShardLeader(task.Shard(), task.ReplicaID())
-			if leader == nil {
-				log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
-				return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator")
-			}
-		}
-	}
-	return nil
-}
-
-func (scheduler *taskScheduler) checkDropIndexTaskStale(task *DropIndexTask) error {
-	for _, action := range task.Actions() {
-		if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
-			log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.String("channel", task.Shard()))...)
-			return merr.WrapErrNodeOffline(action.Node())
-		}
-	}
-	return nil
-}

View File

@@ -190,15 +190,14 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
 		"TestLoadSegmentTask",
 		"TestLoadSegmentTaskNotIndex",
 		"TestLoadSegmentTaskFailed",
-		"TestSegmentTaskStale",
 		"TestTaskCanceled",
 		"TestMoveSegmentTask",
-		"TestMoveSegmentTaskStale",
 		"TestSubmitDuplicateLoadSegmentTask",
 		"TestSubmitDuplicateSubscribeChannelTask",
 		"TestLeaderTaskSet",
 		"TestLeaderTaskRemove",
-		"TestNoExecutor":
+		"TestNoExecutor",
+		"TestTaskStaleByRONode":
 		suite.meta.PutCollection(suite.ctx, &meta.Collection{
 			CollectionLoadInfo: &querypb.CollectionLoadInfo{
 				CollectionID: suite.collection,
@@ -975,73 +974,6 @@ func (suite *TaskSuite) TestMoveSegmentTask() {
 	}
 }
 
-func (suite *TaskSuite) TestMoveSegmentTaskStale() {
-	ctx := context.Background()
-	timeout := 10 * time.Second
-	leader := int64(1)
-	sourceNode := int64(2)
-	targetNode := int64(3)
-	channel := &datapb.VchannelInfo{
-		CollectionID: suite.collection,
-		ChannelName:  Params.CommonCfg.RootCoordDml.GetValue() + "-test",
-	}
-	vchannel := &datapb.VchannelInfo{
-		CollectionID: suite.collection,
-		ChannelName:  channel.ChannelName,
-	}
-	suite.dist.ChannelDistManager.Update(leader, &meta.DmChannel{
-		VchannelInfo: vchannel,
-		Node:         leader,
-		Version:      1,
-		View: &meta.LeaderView{
-			ID:           leader,
-			CollectionID: suite.collection,
-			Channel:      channel.ChannelName,
-			Status:       &querypb.LeaderViewStatus{Serviceable: true},
-		},
-	})
-	view := &meta.LeaderView{
-		ID:           leader,
-		CollectionID: suite.collection,
-		Channel:      channel.ChannelName,
-		Segments:     make(map[int64]*querypb.SegmentDist),
-	}
-	tasks := []Task{}
-	segmentInfos := make([]*datapb.SegmentInfo, 0)
-	for _, segment := range suite.moveSegments {
-		segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
-			ID:            segment,
-			PartitionID:   1,
-			InsertChannel: channel.ChannelName,
-		})
-		view.Segments[segment] = &querypb.SegmentDist{NodeID: sourceNode, Version: 0}
-		task, err := NewSegmentTask(
-			ctx,
-			timeout,
-			WrapIDSource(0),
-			suite.collection,
-			suite.replica,
-			commonpb.LoadPriority_LOW,
-			NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
-			NewSegmentAction(sourceNode, ActionTypeReduce, channel.GetChannelName(), segment),
-		)
-		suite.NoError(err)
-		tasks = append(tasks, task)
-	}
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{vchannel}, segmentInfos, nil)
-	suite.target.UpdateCollectionNextTarget(ctx, suite.collection)
-	suite.target.UpdateCollectionCurrentTarget(ctx, suite.collection)
-	for _, task := range tasks {
-		err := suite.scheduler.Add(task)
-		suite.Error(err)
-		suite.Equal(TaskStatusCanceled, task.Status())
-		suite.Error(task.Err())
-	}
-	suite.AssertTaskNum(0, 0, 0, 0)
-}
-
 func (suite *TaskSuite) TestTaskCanceled() {
 	ctx := context.Background()
 	timeout := 10 * time.Second
@@ -1138,104 +1070,6 @@ func (suite *TaskSuite) TestTaskCanceled() {
 	}
 }
 
-func (suite *TaskSuite) TestSegmentTaskStale() {
-	ctx := context.Background()
-	timeout := 10 * time.Second
-	targetNode := int64(3)
-	partition := int64(100)
-	channel := &datapb.VchannelInfo{
-		CollectionID: suite.collection,
-		ChannelName:  Params.CommonCfg.RootCoordDml.GetValue() + "-test",
-	}
-
-	// Expect
-	suite.broker.EXPECT().DescribeCollection(mock.Anything, suite.collection).RunAndReturn(func(ctx context.Context, i int64) (*milvuspb.DescribeCollectionResponse, error) {
-		return &milvuspb.DescribeCollectionResponse{
-			Schema: &schemapb.CollectionSchema{
-				Name: "TestSegmentTaskStale",
-				Fields: []*schemapb.FieldSchema{
-					{FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector},
-				},
-			},
-		}, nil
-	})
-	suite.broker.EXPECT().ListIndexes(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{
-		{
-			CollectionID: suite.collection,
-		},
-	}, nil)
-	for _, segment := range suite.loadSegments[1:] {
-		suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{
-			{
-				ID:            segment,
-				CollectionID:  suite.collection,
-				PartitionID:   partition,
-				InsertChannel: channel.ChannelName,
-			},
-		}, nil)
-		suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, nil)
-	}
-	suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Success(), nil)
-
-	// Test load segment task
-	suite.dist.ChannelDistManager.Update(targetNode, &meta.DmChannel{
-		VchannelInfo: &datapb.VchannelInfo{
-			CollectionID: suite.collection,
-			ChannelName:  channel.ChannelName,
-		},
-		Node:    targetNode,
-		Version: 1,
-		View:    &meta.LeaderView{ID: targetNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
-	})
-	tasks := []Task{}
-	for _, segment := range suite.loadSegments {
-		task, err := NewSegmentTask(
-			ctx,
-			timeout,
-			WrapIDSource(0),
-			suite.collection,
-			suite.replica,
-			commonpb.LoadPriority_LOW,
-			NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
-		)
-		suite.NoError(err)
-		tasks = append(tasks, task)
-		err = suite.scheduler.Add(task)
-		suite.NoError(err)
-	}
-	segments := make([]*datapb.SegmentInfo, 0)
-	for _, segment := range suite.loadSegments[1:] {
-		segments = append(segments, &datapb.SegmentInfo{
-			ID:            segment,
-			PartitionID:   2,
-			InsertChannel: channel.GetChannelName(),
-		})
-	}
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
-	suite.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(suite.collection, 2))
-	suite.target.UpdateCollectionNextTarget(ctx, suite.collection)
-
-	// process done
-	suite.dispatchAndWait(targetNode)
-	suite.AssertTaskNum(1, 0, 0, 1)
-
-	// task removed
-	suite.dispatchAndWait(targetNode)
-	suite.AssertTaskNum(0, 0, 0, 0)
-
-	for i, task := range tasks {
-		if i == 0 {
-			suite.Equal(TaskStatusCanceled, task.Status())
-			suite.Error(task.Err())
-		} else {
-			suite.Equal(TaskStatusSucceeded, task.Status())
-			suite.NoError(task.Err())
-		}
-	}
-}
-
 func (suite *TaskSuite) TestChannelTaskReplace() {
 	ctx := context.Background()
 	timeout := 10 * time.Second
@@ -2235,3 +2069,157 @@ func (suite *TaskSuite) TestExecutor_MoveSegmentTask() {
 	suite.True(moveTask.actions[1].IsFinished(suite.dist))
 	suite.ErrorContains(moveTask.Err(), "shard leader changed")
 }
+
+func (suite *TaskSuite) TestTaskStaleByRONode() {
+	ctx := context.Background()
+	timeout := 10 * time.Second
+	targetNode := int64(3)
+	channel := &datapb.VchannelInfo{
+		CollectionID: suite.collection,
+		ChannelName:  Params.CommonCfg.RootCoordDml.GetValue() + "-test",
+	}
+
+	// Test case 1: Task stale due to target node becomes RO node
+	suite.Run("RONode", func() {
+		// Set up channel distribution first with RW replica
+		suite.dist.ChannelDistManager.Update(targetNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    targetNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: targetNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create task with original replica (all RW nodes)
+		task, err := NewSegmentTask(
+			ctx,
+			timeout,
+			WrapIDSource(0),
+			suite.collection,
+			suite.replica,
+			commonpb.LoadPriority_LOW,
+			NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), suite.loadSegments[0]),
+		)
+		suite.NoError(err)
+
+		// Add task should succeed
+		err = suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Now change replica to make node 3 as RO node (simulating node state change)
+		replicaWithRONode := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{1, 2}, // RW nodes
+			RoNodes:       []int64{3},    // RO node
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(1, 2))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithRONode)
+
+		// Dispatch will trigger promote which calls checkStale
+		suite.dispatchAndWait(targetNode)
+
+		// Task should be canceled due to RO node
+		suite.Equal(TaskStatusCanceled, task.Status())
+		suite.ErrorContains(task.Err(), "node becomes ro node")
+
+		// Restore original replica for other tests
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+
+	// Test case 2: Task stale due to target node becomes RO SQ node
+	suite.Run("ROSQNode", func() {
+		// Set up channel distribution
+		suite.dist.ChannelDistManager.Update(targetNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    targetNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: targetNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create task with original replica (all RW nodes)
+		task, err := NewSegmentTask(
+			ctx,
+			timeout,
+			WrapIDSource(0),
+			suite.collection,
+			suite.replica,
+			commonpb.LoadPriority_LOW,
+			NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), suite.loadSegments[1]),
+		)
+		suite.NoError(err)
+
+		// Add task should succeed
+		err = suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Now change replica to make node 3 as RO SQ node
+		replicaWithROSQNode := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{1, 2}, // RW nodes
+			RoSqNodes:     []int64{3},    // RO SQ node
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(1, 2))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithROSQNode)
+
+		// Dispatch will trigger promote which calls checkStale
+		suite.dispatchAndWait(targetNode)
+
+		// Task should be canceled due to RO SQ node
+		suite.Equal(TaskStatusCanceled, task.Status())
+		suite.ErrorContains(task.Err(), "node becomes ro node")
+
+		// Restore original replica
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+
+	// Test case 3: ActionTypeReduce should not be affected by RO node
+	suite.Run("ReduceActionNotAffected", func() {
+		// Create replica with node 3 as RO node
+		replicaWithRONode := meta.NewReplica(&querypb.Replica{
+			ID:            suite.replica.GetID(),
+			CollectionID:  suite.collection,
+			Nodes:         []int64{1, 2}, // RW nodes
+			RoNodes:       []int64{3},    // RO node
+			ResourceGroup: meta.DefaultResourceGroupName,
+		}, typeutil.NewUniqueSet(1, 2))
+		suite.meta.ReplicaManager.Put(suite.ctx, replicaWithRONode)
+
+		// Set up channel distribution
+		suite.dist.ChannelDistManager.Update(targetNode, &meta.DmChannel{
+			VchannelInfo: &datapb.VchannelInfo{
+				CollectionID: suite.collection,
+				ChannelName:  channel.ChannelName,
+			},
+			Node:    targetNode,
+			Version: 1,
+			View:    &meta.LeaderView{ID: targetNode, CollectionID: suite.collection, Channel: channel.ChannelName, Status: &querypb.LeaderViewStatus{Serviceable: true}},
+		})
+
+		// Create a segment task with Reduce action targeting the RO node
+		task, err := NewSegmentTask(
+			ctx,
+			timeout,
+			WrapIDSource(0),
+			suite.collection,
+			replicaWithRONode,
+			commonpb.LoadPriority_LOW,
+			NewSegmentAction(targetNode, ActionTypeReduce, channel.GetChannelName(), suite.releaseSegments[0]),
+		)
+		suite.NoError(err)
+
+		// Add task should succeed because Reduce action is not affected by RO node check
+		err = suite.scheduler.Add(task)
+		suite.NoError(err)
+
+		// Clean up
+		suite.scheduler.remove(task)
+		suite.meta.ReplicaManager.Put(suite.ctx, utils.CreateTestReplica(suite.replica.GetID(), suite.collection, []int64{1, 2, 3}))
+	})
+}