diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 6813d95a02..1aa026400b 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -461,16 +461,10 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect readyDelegatorsInCollection = append(readyDelegatorsInCollection, readyDelegatorsInReplica...) } - // If no ready delegators exist in any replica, don't update current target - if len(readyDelegatorsInCollection) == 0 { - log.RatedInfo(10, "no ready delegators in any replica", - zap.Int64("collectionID", collectionID), - zap.Int("replicaCount", len(replicas)), - ) - return false - } - - return ob.syncNextTargetToDelegator(ctx, collectionID, readyDelegatorsInCollection, newVersion) + syncSuccess := ob.syncNextTargetToDelegator(ctx, collectionID, readyDelegatorsInCollection, newVersion) + syncedChannelNames := lo.Uniq(lo.Map(readyDelegatorsInCollection, func(ch *meta.DmChannel, _ int) string { return ch.ChannelName })) + // only after all channel are synced, we can consider the current target is ready + return syncSuccess && lo.Every(syncedChannelNames, lo.Keys(channelNames)) } // sync next target info to delegator as readable snapshot diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 673bb1cb7b..edc53e7fd7 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -637,6 +637,323 @@ func TestShouldUpdateCurrentTarget_OnlyReadyDelegatorsSynced(t *testing.T) { assert.NotContains(t, syncedNodes, int64(2), "Node 2 (not ready delegator) should NOT receive SyncDistribution") } +// TestShouldUpdateCurrentTarget_AllChannelsSynced tests that shouldUpdateCurrentTarget returns true +// only when ALL channels are synced successfully. This validates the fix where we check: +// syncSuccess && lo.Every(syncedChannelNames, lo.Keys(channelNames)) +func TestShouldUpdateCurrentTarget_AllChannelsSynced(t *testing.T) { + paramtable.Init() + ctx := context.Background() + collectionID := int64(1000) + newVersion := int64(100) + + nodeMgr := session.NewNodeManager() + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1})) + + targetMgr := meta.NewMockTargetManager(t) + distMgr := meta.NewDistributionManager(nodeMgr) + broker := meta.NewMockBroker(t) + cluster := session.NewMockCluster(t) + + // Create a real replica with node 1 + replica := meta.NewReplica(&querypb.Replica{ + ID: 1, + CollectionID: collectionID, + ResourceGroup: meta.DefaultResourceGroupName, + Nodes: []int64{1}, + }) + + // Create mock catalog for ReplicaManager + mockCatalog := mocks.NewQueryCoordCatalog(t) + mockCatalog.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil).Maybe() + + // Create real ReplicaManager with mock catalog and put the replica into it + replicaMgr := meta.NewReplicaManager(nil, mockCatalog) + err := replicaMgr.Put(ctx, replica) + assert.NoError(t, err) + + metaInstance := &meta.Meta{ + CollectionManager: meta.NewCollectionManager(nil), + ReplicaManager: replicaMgr, + } + + observer := NewTargetObserver(metaInstance, targetMgr, distMgr, broker, cluster, nodeMgr) + + // Setup target manager expectations - TWO channels + channelNames := map[string]*meta.DmChannel{ + "channel-1": { + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: "channel-1"}, + }, + "channel-2": { + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: "channel-2"}, + }, + } + + // Define segments for both channels + segmentID1 := int64(100) + segmentID2 := int64(101) + targetSegments1 := map[int64]*datapb.SegmentInfo{ + segmentID1: {ID: segmentID1, CollectionID: collectionID, InsertChannel: "channel-1"}, + } + targetSegments2 := map[int64]*datapb.SegmentInfo{ + segmentID2: {ID: segmentID2, CollectionID: collectionID, InsertChannel: "channel-2"}, + } + + targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe() + targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe() + targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(targetSegments1).Maybe() + targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-2", mock.Anything).Return(targetSegments2).Maybe() + targetMgr.EXPECT().GetGrowingSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe() + targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe() + targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe() + targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe() + + broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() + + // Node 1 has BOTH channels ready + distMgr.ChannelDistManager.Update(1, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: "channel-1", + }, + Node: 1, + View: &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: "channel-1", + Segments: map[int64]*querypb.SegmentDist{ + segmentID1: {NodeID: 1}, + }, + Status: &querypb.LeaderViewStatus{Serviceable: true}, + }, + }, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: "channel-2", + }, + Node: 1, + View: &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: "channel-2", + Segments: map[int64]*querypb.SegmentDist{ + segmentID2: {NodeID: 1}, + }, + Status: &querypb.LeaderViewStatus{Serviceable: true}, + }, + }) + + // Execute the function under test + result := observer.shouldUpdateCurrentTarget(ctx, collectionID) + + // When all channels are synced, should return true + assert.True(t, result, "Expected true when ALL channels are synced successfully") +} + +// TestShouldUpdateCurrentTarget_PartialChannelsSynced tests that shouldUpdateCurrentTarget returns false +// when only some channels have ready delegators. This is the core behavior of the fix. +func TestShouldUpdateCurrentTarget_PartialChannelsSynced(t *testing.T) { + paramtable.Init() + ctx := context.Background() + collectionID := int64(1000) + newVersion := int64(100) + + nodeMgr := session.NewNodeManager() + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1})) + + targetMgr := meta.NewMockTargetManager(t) + distMgr := meta.NewDistributionManager(nodeMgr) + broker := meta.NewMockBroker(t) + cluster := session.NewMockCluster(t) + + // Create a real replica with node 1 + replica := meta.NewReplica(&querypb.Replica{ + ID: 1, + CollectionID: collectionID, + ResourceGroup: meta.DefaultResourceGroupName, + Nodes: []int64{1}, + }) + + // Create mock catalog for ReplicaManager + mockCatalog := mocks.NewQueryCoordCatalog(t) + mockCatalog.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil).Maybe() + + // Create real ReplicaManager with mock catalog and put the replica into it + replicaMgr := meta.NewReplicaManager(nil, mockCatalog) + err := replicaMgr.Put(ctx, replica) + assert.NoError(t, err) + + metaInstance := &meta.Meta{ + CollectionManager: meta.NewCollectionManager(nil), + ReplicaManager: replicaMgr, + } + + observer := NewTargetObserver(metaInstance, targetMgr, distMgr, broker, cluster, nodeMgr) + + // Setup target manager expectations - TWO channels in target + channelNames := map[string]*meta.DmChannel{ + "channel-1": { + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: "channel-1"}, + }, + "channel-2": { + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: "channel-2"}, + }, + } + + // Define segments + segmentID1 := int64(100) + segmentID2 := int64(101) + targetSegments1 := map[int64]*datapb.SegmentInfo{ + segmentID1: {ID: segmentID1, CollectionID: collectionID, InsertChannel: "channel-1"}, + } + targetSegments2 := map[int64]*datapb.SegmentInfo{ + segmentID2: {ID: segmentID2, CollectionID: collectionID, InsertChannel: "channel-2"}, + } + + targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe() + targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe() + targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(targetSegments1).Maybe() + targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-2", mock.Anything).Return(targetSegments2).Maybe() + targetMgr.EXPECT().GetGrowingSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe() + targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe() + targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe() + targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe() + + broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() + + // Node 1 has ONLY channel-1 ready, channel-2 is NOT ready (missing segment) + distMgr.ChannelDistManager.Update(1, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: "channel-1", + }, + Node: 1, + View: &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: "channel-1", + Segments: map[int64]*querypb.SegmentDist{ + segmentID1: {NodeID: 1}, // Has the required segment for channel-1 + }, + Status: &querypb.LeaderViewStatus{Serviceable: true}, + }, + }, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: "channel-2", + }, + Node: 1, + View: &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: "channel-2", + Segments: map[int64]*querypb.SegmentDist{}, // Missing the required segment for channel-2! + Status: &querypb.LeaderViewStatus{Serviceable: false}, + }, + }) + + // Execute the function under test + result := observer.shouldUpdateCurrentTarget(ctx, collectionID) + + // When only partial channels are synced, should return false + // This is the key behavior being tested - with the new fix: + // syncedChannelNames = ["channel-1"] (only channel-1 has ready delegator) + // channelNames keys = ["channel-1", "channel-2"] + // lo.Every(["channel-1"], ["channel-1", "channel-2"]) = false + assert.False(t, result, "Expected false when only PARTIAL channels are synced") +} + +// TestShouldUpdateCurrentTarget_NoReadyDelegators tests that shouldUpdateCurrentTarget returns false +// when there are no ready delegators at all. +func TestShouldUpdateCurrentTarget_NoReadyDelegators(t *testing.T) { + paramtable.Init() + ctx := context.Background() + collectionID := int64(1000) + newVersion := int64(100) + + nodeMgr := session.NewNodeManager() + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{NodeID: 1})) + + targetMgr := meta.NewMockTargetManager(t) + distMgr := meta.NewDistributionManager(nodeMgr) + broker := meta.NewMockBroker(t) + cluster := session.NewMockCluster(t) + + // Create a real replica with node 1 + replica := meta.NewReplica(&querypb.Replica{ + ID: 1, + CollectionID: collectionID, + ResourceGroup: meta.DefaultResourceGroupName, + Nodes: []int64{1}, + }) + + // Create mock catalog for ReplicaManager + mockCatalog := mocks.NewQueryCoordCatalog(t) + mockCatalog.EXPECT().SaveReplica(mock.Anything, mock.Anything).Return(nil).Maybe() + + // Create real ReplicaManager with mock catalog and put the replica into it + replicaMgr := meta.NewReplicaManager(nil, mockCatalog) + err := replicaMgr.Put(ctx, replica) + assert.NoError(t, err) + + metaInstance := &meta.Meta{ + CollectionManager: meta.NewCollectionManager(nil), + ReplicaManager: replicaMgr, + } + + observer := NewTargetObserver(metaInstance, targetMgr, distMgr, broker, cluster, nodeMgr) + + // Setup target manager expectations + channelNames := map[string]*meta.DmChannel{ + "channel-1": { + VchannelInfo: &datapb.VchannelInfo{CollectionID: collectionID, ChannelName: "channel-1"}, + }, + } + + // Define segment that no delegator has + segmentID := int64(100) + targetSegments := map[int64]*datapb.SegmentInfo{ + segmentID: {ID: segmentID, CollectionID: collectionID, InsertChannel: "channel-1"}, + } + + targetMgr.EXPECT().GetDmChannelsByCollection(mock.Anything, collectionID, meta.NextTarget).Return(channelNames).Maybe() + targetMgr.EXPECT().GetCollectionTargetVersion(mock.Anything, collectionID, meta.NextTarget).Return(newVersion).Maybe() + targetMgr.EXPECT().GetSealedSegmentsByChannel(mock.Anything, collectionID, "channel-1", mock.Anything).Return(targetSegments).Maybe() + targetMgr.EXPECT().GetGrowingSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe() + targetMgr.EXPECT().GetDroppedSegmentsByChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe() + targetMgr.EXPECT().GetDmChannel(mock.Anything, collectionID, mock.Anything, mock.Anything).Return(nil).Maybe() + targetMgr.EXPECT().GetPartitions(mock.Anything, collectionID, mock.Anything).Return([]int64{}, nil).Maybe() + + broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() + + // Node 1 has channel-1 but NOT ready (missing segment) + distMgr.ChannelDistManager.Update(1, &meta.DmChannel{ + VchannelInfo: &datapb.VchannelInfo{ + CollectionID: collectionID, + ChannelName: "channel-1", + }, + Node: 1, + View: &meta.LeaderView{ + ID: 1, + CollectionID: collectionID, + Channel: "channel-1", + Segments: map[int64]*querypb.SegmentDist{}, // Missing the required segment! + Status: &querypb.LeaderViewStatus{Serviceable: false}, + }, + }) + + // Execute the function under test + result := observer.shouldUpdateCurrentTarget(ctx, collectionID) + + // When no ready delegators exist, should return false + // syncedChannelNames = [] (no ready delegators) + // channelNames keys = ["channel-1"] + // lo.Every([], ["channel-1"]) = false (empty does not contain all) + assert.False(t, result, "Expected false when NO ready delegators exist") +} + func TestTargetObserver(t *testing.T) { suite.Run(t, new(TargetObserverSuite)) suite.Run(t, new(TargetObserverCheckSuite))