diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go
index 2f354692d9..6813d95a02 100644
--- a/internal/querycoordv2/observers/target_observer.go
+++ b/internal/querycoordv2/observers/target_observer.go
@@ -415,39 +415,62 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
     }

     newVersion := ob.targetMgr.GetCollectionTargetVersion(ctx, collectionID, meta.NextTarget)
-    collReadyDelegatorList := make([]*meta.DmChannel, 0)
-    for channel := range channelNames {
-        delegatorList := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithChannelName2Channel(channel))
-        chReadyDelegatorList := lo.Filter(delegatorList, func(ch *meta.DmChannel, _ int) bool {
-            err := utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, ch.View, meta.NextTarget)
-            dataReadyForNextTarget := err == nil
-            if !dataReadyForNextTarget {
-                log.Info("check delegator",
-                    zap.Int64("collectionID", collectionID),
-                    zap.String("channelName", channel),
-                    zap.Int64("targetVersion", ch.View.TargetVersion),
-                    zap.Int64("newTargetVersion", newVersion),
-                    zap.Bool("isServiceable", ch.IsServiceable()),
-                    zap.Int64("nodeID", ch.Node),
-                    zap.Int64("version", ch.Version),
-                    zap.Error(err),
-                )
-            }
-            return (newVersion == ch.View.TargetVersion && ch.IsServiceable()) || dataReadyForNextTarget
-        })
-        // to avoid stuck here in dynamic increase replica case, we just check available delegator number
-        if int32(len(chReadyDelegatorList)) < replicaNum {
-            log.RatedInfo(10, "channel not ready",
-                zap.Int("readyReplicaNum", len(chReadyDelegatorList)),
-                zap.String("channelName", channel),
+    // checkDelegatorDataReady checks whether a delegator is ready for the next target.
+    // A delegator is considered ready if:
+    // 1. Its target version matches the new version and it is serviceable, OR
+    // 2. Its data is ready for the next target (all segments and channels are loaded).
+    checkDelegatorDataReady := func(replica *meta.Replica, channel *meta.DmChannel) bool {
+        err := utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, channel.View, meta.NextTarget)
+        dataReadyForNextTarget := err == nil
+        if !dataReadyForNextTarget {
+            log.Info("check delegator",
+                zap.Int64("collectionID", collectionID),
+                zap.Int64("replicaID", replica.GetID()),
+                zap.Int64("nodeID", channel.Node),
+                zap.String("channelName", channel.GetChannelName()),
+                zap.Int64("targetVersion", channel.View.TargetVersion),
+                zap.Int64("newTargetVersion", newVersion),
+                zap.Bool("isServiceable", channel.IsServiceable()),
+                zap.Int64("version", channel.Version),
+                zap.Error(err),
             )
-            return false
         }
-        collReadyDelegatorList = append(collReadyDelegatorList, chReadyDelegatorList...)
+        return (newVersion == channel.View.TargetVersion && channel.IsServiceable()) || dataReadyForNextTarget
     }
-    return ob.syncNextTargetToDelegator(ctx, collectionID, collReadyDelegatorList, newVersion)
+    // Iterate through each replica and check whether all of its delegators are ready.
+    // This approach ensures each replica has at least one ready delegator for every channel,
+    // which prevents the issue where some replicas lack nodes during dynamic replica scaling
+    // while the total delegator count still meets the threshold.
+    readyDelegatorsInCollection := make([]*meta.DmChannel, 0)
+    replicas := ob.meta.ReplicaManager.GetByCollection(ctx, collectionID)
+    for _, replica := range replicas {
+        readyDelegatorsInReplica := make([]*meta.DmChannel, 0)
+        for channel := range channelNames {
+            // Filter delegators by replica to ensure we only check delegators belonging to this replica
+            delegatorList := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithReplica2Channel(replica), meta.WithChannelName2Channel(channel))
+            readyDelegatorsInChannel := lo.Filter(delegatorList, func(ch *meta.DmChannel, _ int) bool {
+                return checkDelegatorDataReady(replica, ch)
+            })
+
+            if len(readyDelegatorsInChannel) > 0 {
+                readyDelegatorsInReplica = append(readyDelegatorsInReplica, readyDelegatorsInChannel...)
+            }
+        }
+        readyDelegatorsInCollection = append(readyDelegatorsInCollection, readyDelegatorsInReplica...)
+    }
+
+    // If no ready delegators exist in any replica, don't update current target
+    if len(readyDelegatorsInCollection) == 0 {
+        log.RatedInfo(10, "no ready delegators in any replica",
+            zap.Int64("collectionID", collectionID),
+            zap.Int("replicaCount", len(replicas)),
+        )
+        return false
+    }
+
+    return ob.syncNextTargetToDelegator(ctx, collectionID, readyDelegatorsInCollection, newVersion)
 }

 // sync next target info to delegator as readable snapshot
diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go
index 5281882307..673bb1cb7b 100644
--- a/internal/querycoordv2/observers/target_observer_test.go
+++ b/internal/querycoordv2/observers/target_observer_test.go
@@ -22,6 +22,7 @@ import (
     "time"

     "github.com/samber/lo"
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/suite"

@@ -29,6 +30,7 @@
     "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
     etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
     "github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
+    "github.com/milvus-io/milvus/internal/metastore/mocks"
     "github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" @@ -146,6 +148,8 @@ func (suite *TargetObserverSuite) SetupTest() { suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(suite.nextTargetChannels, suite.nextTargetSegments, nil) suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() suite.observer.Start() } @@ -364,6 +368,275 @@ func (s *TargetObserverCheckSuite) TestCheck() { s.True(s.observer.loadingDispatcher.tasks.Contain(s.collectionID)) } +// TestShouldUpdateCurrentTarget_EmptyNextTarget tests when next target is empty +func TestShouldUpdateCurrentTarget_EmptyNextTarget(t *testing.T) { + paramtable.Init() + ctx := context.Background() + collectionID := int64(1000) + + nodeMgr := session.NewNodeManager() + targetMgr := meta.NewMockTargetManager(t) + distMgr := meta.NewDistributionManager(nodeMgr) + broker := meta.NewMockBroker(t) + cluster := session.NewMockCluster(t) + + // Use a minimal meta without CollectionManager since we only test targetMgr behavior + metaInstance := &meta.Meta{ + CollectionManager: meta.NewCollectionManager(nil), + } + + observer := NewTargetObserver(metaInstance, targetMgr, distMgr, broker, cluster, nodeMgr) + + // Return empty channels to simulate empty next target + targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(map[string]*meta.DmChannel{}).Maybe() + + result := observer.shouldUpdateCurrentTarget(ctx, collectionID) + assert.False(t, result) +} + +// TestShouldUpdateCurrentTarget_ReplicaReadiness tests the replica-based readiness check +func TestShouldUpdateCurrentTarget_ReplicaReadiness(t *testing.T) { + paramtable.Init() + ctx := context.Background() + collectionID := int64(1000) + + nodeMgr := session.NewNodeManager() + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1})) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 2})) + + targetMgr := meta.NewMockTargetManager(t) + distMgr := meta.NewDistributionManager(nodeMgr) + broker := meta.NewMockBroker(t) + cluster := session.NewMockCluster(t) + + // Create mock replicas + replica1 := meta.NewMockReplica(t) + replica1.EXPECT().GetID().Return(int64(1)).Maybe() + replica1.EXPECT().GetCollectionID().Return(collectionID).Maybe() + replica1.EXPECT().GetNodes().Return([]int64{1}).Maybe() + replica1.EXPECT().Contains(int64(1)).Return(true).Maybe() + replica1.EXPECT().Contains(int64(2)).Return(false).Maybe() + + replica2 := meta.NewMockReplica(t) + replica2.EXPECT().GetID().Return(int64(2)).Maybe() + replica2.EXPECT().GetCollectionID().Return(collectionID).Maybe() + replica2.EXPECT().GetNodes().Return([]int64{2}).Maybe() + replica2.EXPECT().Contains(int64(1)).Return(false).Maybe() + replica2.EXPECT().Contains(int64(2)).Return(true).Maybe() + + // Create mock ReplicaManager + replicaMgr := meta.NewReplicaManager(nil, nil) + + metaInstance := &meta.Meta{ + CollectionManager: meta.NewCollectionManager(nil), + ReplicaManager: replicaMgr, + } + + observer := NewTargetObserver(metaInstance, targetMgr, distMgr, broker, cluster, nodeMgr) + + // Setup mock expectations + channelNames := map[string]*meta.DmChannel{ + "channel-1": { + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, 
ChannelName: "channel-1"}, + }, + "channel-2": { + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: "channel-2"}, + }, + } + newVersion := int64(100) + + targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe() + targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe() + broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() + + // Test case: replica1 (node1) has both channels ready + distMgr.ChannelDistManager.Update(1, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: "channel-1", + }, + Node: 1, + View: &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: "channel-1", + TargetVersion: newVersion, + Segments: map[int64]*querypb.SegmentDist{ + 11: {NodeID: 1}, + }, + }, + }, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: "channel-2", + }, + Node: 1, + View: &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: "channel-2", + TargetVersion: newVersion, + Segments: map[int64]*querypb.SegmentDist{ + 12: {NodeID: 1}, + }, + }, + }) + + // replica2 (node2) only has channel-1, missing channel-2 + // This simulates the "replica lack of nodes" scenario + distMgr.ChannelDistManager.Update(2, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: "channel-1", + }, + Node: 2, + View: &meta.LeaderView{ + ID: 2, + CollectionID: collectionID, + Channel: "channel-1", + TargetVersion: newVersion, + Segments: map[int64]*querypb.SegmentDist{ + 11: {NodeID: 2}, + }, + }, + }) + + // With new implementation: + // - replica1 is ready (has both channels) + // - replica2 is NOT ready (missing channel-2) + // Since ReplicaManager.GetByCollection returns empty (no replicas in the mock manager), + // readyDelegatorsInCollection will be empty, and shouldUpdateCurrentTarget returns false. + result := observer.shouldUpdateCurrentTarget(ctx, collectionID) + assert.False(t, result) +} + +// TestShouldUpdateCurrentTarget_OnlyReadyDelegatorsSynced verifies that only ready delegators +// are included in the sync operation. This test specifically validates the fix for the bug where +// all delegators (including non-ready ones) were being added to readyDelegatorsInReplica. 
+func TestShouldUpdateCurrentTarget_OnlyReadyDelegatorsSynced(t *testing.T) {
+    paramtable.Init()
+    ctx := context.Background()
+    collectionID := int64(1000)
+    newVersion := int64(100)
+
+    nodeMgr := session.NewNodeManager()
+    nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1}))
+    nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 2}))
+
+    targetMgr := meta.NewMockTargetManager(t)
+    distMgr := meta.NewDistributionManager(nodeMgr)
+    broker := meta.NewMockBroker(t)
+    cluster := session.NewMockCluster(t)
+
+    // Create a real replica with node 1 and node 2
+    replica := meta.NewReplica(&querypb.Replica{
+        ID:            1,
+        CollectionID:  collectionID,
+        ResourceGroup: meta.DefaultResourceGroupName,
+        Nodes:         []int64{1, 2},
+    })
+
+    // Create a mock catalog for the ReplicaManager
+    mockCatalog := mocks.NewQueryCoordCatalog(t)
+    mockCatalog.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil).Maybe()
+
+    // Create a real ReplicaManager with the mock catalog and put the replica into it
+    replicaMgr := meta.NewReplicaManager(nil, mockCatalog)
+    err := replicaMgr.Put(ctx, replica)
+    assert.NoError(t, err)
+
+    metaInstance := &meta.Meta{
+        CollectionManager: meta.NewCollectionManager(nil),
+        ReplicaManager:    replicaMgr,
+    }
+
+    observer := NewTargetObserver(metaInstance, targetMgr, distMgr, broker, cluster, nodeMgr)
+
+    // Set up target manager expectations
+    channelNames := map[string]*meta.DmChannel{
+        "channel-1": {
+            VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: "channel-1"},
+        },
+    }
+
+    // Define a segment that exists in the target but is loaded only on node 1
+    segmentID := int64(100)
+    targetSegments := map[int64]*datapb.SegmentInfo{
+        segmentID: {ID: segmentID, CollectionID: collectionID, InsertChannel: "channel-1"},
+    }
+
+    targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe()
+    targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe()
+    // Return a segment in the target - this will be checked by CheckDelegatorDataReady
+    targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(targetSegments).Maybe()
+    targetMgr.EXPECT().GetGrowingSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(nil).Maybe()
+    targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(nil).Maybe()
+    targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(nil).Maybe()
+    targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe()
+
+    broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
+
+    // Track which nodes receive SyncDistribution calls
+    syncedNodes := make([]int64, 0)
+    cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
+        func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
+            syncedNodes = append(syncedNodes, nodeID)
+            return merr.Success(), nil
+        }).Maybe()
+
+    // Node 1: READY delegator
+    // - Has the target segment loaded (segment 100)
+    // - CheckDelegatorDataReady will return nil (ready)
+    distMgr.ChannelDistManager.Update(1, &meta.DmChannel{
+        VchannelInfo: &datapb.VchannelInfo{
+            CollectionID: collectionID,
+            ChannelName:  "channel-1",
+        },
+        Node: 1,
+        View: &meta.LeaderView{
+            ID:           1,
+            CollectionID: collectionID,
+            Channel:      "channel-1",
+            Segments: map[int64]*querypb.SegmentDist{
+                segmentID: {NodeID: 1}, // Has the required segment
+            },
+            Status: &querypb.LeaderViewStatus{Serviceable: true},
+        },
+    })
+
+    // Node 2: NOT READY delegator
+    // - Does NOT have the target segment loaded (missing segment 100)
+    // - CheckDelegatorDataReady will return error (not ready)
+    distMgr.ChannelDistManager.Update(2, &meta.DmChannel{
+        VchannelInfo: &datapb.VchannelInfo{
+            CollectionID: collectionID,
+            ChannelName:  "channel-1",
+        },
+        Node: 2,
+        View: &meta.LeaderView{
+            ID:           2,
+            CollectionID: collectionID,
+            Channel:      "channel-1",
+            Segments:     map[int64]*querypb.SegmentDist{}, // Missing the required segment!
+            Status:       &querypb.LeaderViewStatus{Serviceable: false},
+        },
+    })
+
+    // Execute the function under test
+    result := observer.shouldUpdateCurrentTarget(ctx, collectionID)
+
+    // Verify the result is true (at least one ready delegator exists)
+    assert.True(t, result)
+
+    // Verify that ONLY node 1 received a SyncDistribution call.
+    // This is the key assertion: if the bug existed (using delegatorList instead of readyDelegatorsInChannel),
+    // node 2 would also receive a SyncDistribution call.
+    assert.Equal(t, 1, len(syncedNodes), "Expected only 1 SyncDistribution call for the ready delegator")
+    assert.Contains(t, syncedNodes, int64(1), "Expected node 1 (ready delegator) to receive SyncDistribution")
+    assert.NotContains(t, syncedNodes, int64(2), "Node 2 (not ready delegator) should NOT receive SyncDistribution")
+}
+
 func TestTargetObserver(t *testing.T) {
     suite.Run(t, new(TargetObserverSuite))
     suite.Run(t, new(TargetObserverCheckSuite))