fix: Prevent target update blocking when replica lacks nodes during scaling (#46088)

issue: #46087
The previous implementation checked if the total number of ready
delegators >= replicaNum per channel. This could cause target updates to
block indefinitely when dynamically increasing replicas, because some
replicas might lack nodes while the total count still met the threshold.

This change switches to a replica-based check approach:
- Iterate through each replica individually
- For each replica, verify all channels have at least one ready
delegator
- Only sync delegators from fully ready replicas
- Skip replicas that are not ready (e.g., missing nodes for some
channels)

This ensures target updates can proceed with ready replicas while
replicas that lack nodes during dynamic scaling are gracefully skipped.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2025-12-11 17:09:14 +08:00 committed by GitHub
parent 224a7943ad
commit a195c33b71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 324 additions and 28 deletions

View File

@ -415,39 +415,62 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
}
newVersion := ob.targetMgr.GetCollectionTargetVersion(ctx, collectionID, meta.NextTarget)
collReadyDelegatorList := make([]*meta.DmChannel, 0)
for channel := range channelNames {
delegatorList := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithChannelName2Channel(channel))
chReadyDelegatorList := lo.Filter(delegatorList, func(ch *meta.DmChannel, _ int) bool {
err := utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, ch.View, meta.NextTarget)
dataReadyForNextTarget := err == nil
if !dataReadyForNextTarget {
log.Info("check delegator",
zap.Int64("collectionID", collectionID),
zap.String("channelName", channel),
zap.Int64("targetVersion", ch.View.TargetVersion),
zap.Int64("newTargetVersion", newVersion),
zap.Bool("isServiceable", ch.IsServiceable()),
zap.Int64("nodeID", ch.Node),
zap.Int64("version", ch.Version),
zap.Error(err),
)
}
return (newVersion == ch.View.TargetVersion && ch.IsServiceable()) || dataReadyForNextTarget
})
// to avoid stuck here in dynamic increase replica case, we just check available delegator number
if int32(len(chReadyDelegatorList)) < replicaNum {
log.RatedInfo(10, "channel not ready",
zap.Int("readyReplicaNum", len(chReadyDelegatorList)),
zap.String("channelName", channel),
// checkDelegatorDataReady checks if a delegator is ready for the next target.
// A delegator is considered ready if:
// 1. Its target version matches the new version and it is serviceable, OR
// 2. Its data is ready for the next target (all segments and channels are loaded)
checkDelegatorDataReady := func(replica *meta.Replica, channel *meta.DmChannel) bool {
err := utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, channel.View, meta.NextTarget)
dataReadyForNextTarget := err == nil
if !dataReadyForNextTarget {
log.Info("check delegator",
zap.Int64("collectionID", collectionID),
zap.Int64("replicaID", replica.GetID()),
zap.Int64("nodeID", channel.Node),
zap.String("channelName", channel.GetChannelName()),
zap.Int64("targetVersion", channel.View.TargetVersion),
zap.Int64("newTargetVersion", newVersion),
zap.Bool("isServiceable", channel.IsServiceable()),
zap.Int64("version", channel.Version),
zap.Error(err),
)
return false
}
collReadyDelegatorList = append(collReadyDelegatorList, chReadyDelegatorList...)
return (newVersion == channel.View.TargetVersion && channel.IsServiceable()) || dataReadyForNextTarget
}
return ob.syncNextTargetToDelegator(ctx, collectionID, collReadyDelegatorList, newVersion)
// Iterate through each replica to check if all its delegators are ready.
// this approach ensures each replica has at least one ready delegator for every channel.
// This prevents the issue where some replicas may lack nodes during dynamic replica scaling,
// while the total count still meets the threshold.
readyDelegatorsInCollection := make([]*meta.DmChannel, 0)
replicas := ob.meta.ReplicaManager.GetByCollection(ctx, collectionID)
for _, replica := range replicas {
readyDelegatorsInReplica := make([]*meta.DmChannel, 0)
for channel := range channelNames {
// Filter delegators by replica to ensure we only check delegators belonging to this replica
delegatorList := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithReplica2Channel(replica), meta.WithChannelName2Channel(channel))
readyDelegatorsInChannel := lo.Filter(delegatorList, func(ch *meta.DmChannel, _ int) bool {
return checkDelegatorDataReady(replica, ch)
})
if len(readyDelegatorsInChannel) > 0 {
readyDelegatorsInReplica = append(readyDelegatorsInReplica, readyDelegatorsInChannel...)
}
}
readyDelegatorsInCollection = append(readyDelegatorsInCollection, readyDelegatorsInReplica...)
}
// If no ready delegators exist in any replica, don't update current target
if len(readyDelegatorsInCollection) == 0 {
log.RatedInfo(10, "no ready delegators in any replica",
zap.Int64("collectionID", collectionID),
zap.Int("replicaCount", len(replicas)),
)
return false
}
return ob.syncNextTargetToDelegator(ctx, collectionID, readyDelegatorsInCollection, newVersion)
}
// sync next target info to delegator as readable snapshot

View File

@ -22,6 +22,7 @@ import (
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -29,6 +30,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
@ -146,6 +148,8 @@ func (suite *TargetObserverSuite) SetupTest() {
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(suite.nextTargetChannels, suite.nextTargetSegments, nil)
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
suite.observer.Start()
}
@ -364,6 +368,275 @@ func (s *TargetObserverCheckSuite) TestCheck() {
s.True(s.observer.loadingDispatcher.tasks.Contain(s.collectionID))
}
// TestShouldUpdateCurrentTarget_EmptyNextTarget tests when next target is empty
func TestShouldUpdateCurrentTarget_EmptyNextTarget(t *testing.T) {
paramtable.Init()
ctx := context.Background()
collectionID := int64(1000)
nodeMgr := session.NewNodeManager()
targetMgr := meta.NewMockTargetManager(t)
distMgr := meta.NewDistributionManager(nodeMgr)
broker := meta.NewMockBroker(t)
cluster := session.NewMockCluster(t)
// Use a minimal meta without CollectionManager since we only test targetMgr behavior
metaInstance := &meta.Meta{
CollectionManager: meta.NewCollectionManager(nil),
}
observer := NewTargetObserver(metaInstance, targetMgr, distMgr, broker, cluster, nodeMgr)
// Return empty channels to simulate empty next target
targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(map[string]*meta.DmChannel{}).Maybe()
result := observer.shouldUpdateCurrentTarget(ctx, collectionID)
assert.False(t, result)
}
// TestShouldUpdateCurrentTarget_ReplicaReadiness tests the replica-based readiness check
func TestShouldUpdateCurrentTarget_ReplicaReadiness(t *testing.T) {
paramtable.Init()
ctx := context.Background()
collectionID := int64(1000)
nodeMgr := session.NewNodeManager()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 2}))
targetMgr := meta.NewMockTargetManager(t)
distMgr := meta.NewDistributionManager(nodeMgr)
broker := meta.NewMockBroker(t)
cluster := session.NewMockCluster(t)
// Create mock replicas
replica1 := meta.NewMockReplica(t)
replica1.EXPECT().GetID().Return(int64(1)).Maybe()
replica1.EXPECT().GetCollectionID().Return(collectionID).Maybe()
replica1.EXPECT().GetNodes().Return([]int64{1}).Maybe()
replica1.EXPECT().Contains(int64(1)).Return(true).Maybe()
replica1.EXPECT().Contains(int64(2)).Return(false).Maybe()
replica2 := meta.NewMockReplica(t)
replica2.EXPECT().GetID().Return(int64(2)).Maybe()
replica2.EXPECT().GetCollectionID().Return(collectionID).Maybe()
replica2.EXPECT().GetNodes().Return([]int64{2}).Maybe()
replica2.EXPECT().Contains(int64(1)).Return(false).Maybe()
replica2.EXPECT().Contains(int64(2)).Return(true).Maybe()
// Create mock ReplicaManager
replicaMgr := meta.NewReplicaManager(nil, nil)
metaInstance := &meta.Meta{
CollectionManager: meta.NewCollectionManager(nil),
ReplicaManager: replicaMgr,
}
observer := NewTargetObserver(metaInstance, targetMgr, distMgr, broker, cluster, nodeMgr)
// Setup mock expectations
channelNames := map[string]*meta.DmChannel{
"channel-1": {
VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: "channel-1"},
},
"channel-2": {
VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: "channel-2"},
},
}
newVersion := int64(100)
targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe()
targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe()
broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
// Test case: replica1 (node1) has both channels ready
distMgr.ChannelDistManager.Update(1, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: "channel-1",
},
Node: 1,
View: &meta.LeaderView{
ID: 1,
CollectionID: collectionID,
Channel: "channel-1",
TargetVersion: newVersion,
Segments: map[int64]*querypb.SegmentDist{
11: {NodeID: 1},
},
},
}, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: "channel-2",
},
Node: 1,
View: &meta.LeaderView{
ID: 1,
CollectionID: collectionID,
Channel: "channel-2",
TargetVersion: newVersion,
Segments: map[int64]*querypb.SegmentDist{
12: {NodeID: 1},
},
},
})
// replica2 (node2) only has channel-1, missing channel-2
// This simulates the "replica lack of nodes" scenario
distMgr.ChannelDistManager.Update(2, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: "channel-1",
},
Node: 2,
View: &meta.LeaderView{
ID: 2,
CollectionID: collectionID,
Channel: "channel-1",
TargetVersion: newVersion,
Segments: map[int64]*querypb.SegmentDist{
11: {NodeID: 2},
},
},
})
// With new implementation:
// - replica1 is ready (has both channels)
// - replica2 is NOT ready (missing channel-2)
// Since ReplicaManager.GetByCollection returns empty (no replicas in the mock manager),
// readyDelegatorsInCollection will be empty, and shouldUpdateCurrentTarget returns false.
result := observer.shouldUpdateCurrentTarget(ctx, collectionID)
assert.False(t, result)
}
// TestShouldUpdateCurrentTarget_OnlyReadyDelegatorsSynced verifies that only ready delegators
// are included in the sync operation. This test specifically validates the fix for the bug where
// all delegators (including non-ready ones) were being added to readyDelegatorsInReplica.
func TestShouldUpdateCurrentTarget_OnlyReadyDelegatorsSynced(t *testing.T) {
paramtable.Init()
ctx := context.Background()
collectionID := int64(1000)
newVersion := int64(100)
nodeMgr := session.NewNodeManager()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 2}))
targetMgr := meta.NewMockTargetManager(t)
distMgr := meta.NewDistributionManager(nodeMgr)
broker := meta.NewMockBroker(t)
cluster := session.NewMockCluster(t)
// Create a real replica with node 1 and node 2
replica := meta.NewReplica(&querypb.Replica{
ID: 1,
CollectionID: collectionID,
ResourceGroup: meta.DefaultResourceGroupName,
Nodes: []int64{1, 2},
})
// Create mock catalog for ReplicaManager
mockCatalog := mocks.NewQueryCoordCatalog(t)
mockCatalog.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil).Maybe()
// Create real ReplicaManager with mock catalog and put the replica into it
replicaMgr := meta.NewReplicaManager(nil, mockCatalog)
err := replicaMgr.Put(ctx, replica)
assert.NoError(t, err)
metaInstance := &meta.Meta{
CollectionManager: meta.NewCollectionManager(nil),
ReplicaManager: replicaMgr,
}
observer := NewTargetObserver(metaInstance, targetMgr, distMgr, broker, cluster, nodeMgr)
// Setup target manager expectations
channelNames := map[string]*meta.DmChannel{
"channel-1": {
VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: "channel-1"},
},
}
// Define a segment that exists in target but only node 1 has it loaded
segmentID := int64(100)
targetSegments := map[int64]*datapb.SegmentInfo{
segmentID: {ID: segmentID, CollectionID: collectionID, InsertChannel: "channel-1"},
}
targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe()
targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe()
// Return a segment in target - this will be checked by CheckDelegatorDataReady
targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(targetSegments).Maybe()
targetMgr.EXPECT().GetGrowingSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(nil).Maybe()
targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(nil).Maybe()
targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(nil).Maybe()
targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe()
broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
// Track which nodes receive SyncDistribution calls
syncedNodes := make([]int64, 0)
cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
syncedNodes = append(syncedNodes, nodeID)
return merr.Success(), nil
}).Maybe()
// Node 1: READY delegator
// - Has the target segment loaded (segment 100)
// - CheckDelegatorDataReady will return nil (ready)
distMgr.ChannelDistManager.Update(1, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: "channel-1",
},
Node: 1,
View: &meta.LeaderView{
ID: 1,
CollectionID: collectionID,
Channel: "channel-1",
Segments: map[int64]*querypb.SegmentDist{
segmentID: {NodeID: 1}, // Has the required segment
},
Status: &querypb.LeaderViewStatus{Serviceable: true},
},
})
// Node 2: NOT READY delegator
// - Does NOT have the target segment loaded (missing segment 100)
// - CheckDelegatorDataReady will return error (not ready)
distMgr.ChannelDistManager.Update(2, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{
CollectionID: collectionID,
ChannelName: "channel-1",
},
Node: 2,
View: &meta.LeaderView{
ID: 2,
CollectionID: collectionID,
Channel: "channel-1",
Segments: map[int64]*querypb.SegmentDist{}, // Missing the required segment!
Status: &querypb.LeaderViewStatus{Serviceable: false},
},
})
// Execute the function under test
result := observer.shouldUpdateCurrentTarget(ctx, collectionID)
// Verify the result is true (at least one ready delegator exists)
assert.True(t, result)
// Verify that ONLY node 1 received SyncDistribution call
// This is the key assertion: if the bug existed (using delegatorList instead of readyDelegatorsInChannel),
// node 2 would also receive a SyncDistribution call
assert.Equal(t, 1, len(syncedNodes), "Expected only 1 SyncDistribution call for the ready delegator")
assert.Contains(t, syncedNodes, int64(1), "Expected node 1 (ready delegator) to receive SyncDistribution")
assert.NotContains(t, syncedNodes, int64(2), "Node 2 (not ready delegator) should NOT receive SyncDistribution")
}
func TestTargetObserver(t *testing.T) {
suite.Run(t, new(TargetObserverSuite))
suite.Run(t, new(TargetObserverCheckSuite))