diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index edc53e7fd7..5e0e71837d 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -153,14 +153,19 @@ func (suite *TargetObserverSuite) SetupTest() { suite.observer.Start() } -func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { +// TestInitialLoad_ShouldNotUpdateCurrentTarget verifies that when CurrentTarget is empty, +// it should NOT be updated even when NextTarget is ready and all nodes have loaded the data. +// This is a critical safety mechanism to ensure the system doesn't expose incomplete data. +func (suite *TargetObserverSuite) TestInitialLoad_ShouldNotUpdateCurrentTarget() { ctx := suite.ctx + // Wait for observer to automatically update NextTarget with initial 2 segments suite.Eventually(func() bool { return len(suite.targetMgr.GetSealedSegmentsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2 && len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2 }, 5*time.Second, 1*time.Second) + // Simulate distributed environment: Node 2 has loaded all channels and segments suite.distMgr.ChannelDistManager.Update(2, &meta.DmChannel{ VchannelInfo: &datapb.VchannelInfo{ CollectionID: suite.collectionID, @@ -191,34 +196,64 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { }, }) - // Never update current target if it's empty, even the next target is ready + // Key verification: CurrentTarget should remain empty even though NextTarget is ready + // This ensures we don't expose data before explicitly updating CurrentTarget suite.Eventually(func() bool { return len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.CurrentTarget)) == 0 }, 3*time.Second, 1*time.Second) + // Verify all expected broker calls were made + suite.broker.AssertExpectations(suite.T()) +} + +// TestIncrementalUpdate_WithNewSegment verifies that when CurrentTarget is not empty, +// the observer can automatically detect and update to include new segments. +// This simulates a real-world scenario where data is continuously ingested. +func (suite *TargetObserverSuite) TestIncrementalUpdate_WithNewSegment() { + ctx := suite.ctx + + // Wait for initial load: 2 segments in NextTarget + suite.Eventually(func() bool { + return len(suite.targetMgr.GetSealedSegmentsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2 && + len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2 + }, 5*time.Second, 1*time.Second) + + // Manually set CurrentTarget to non-empty (simulating previous successful load) + // This is the precondition for incremental updates to work + suite.targetMgr.UpdateCollectionCurrentTarget(ctx, suite.collectionID) + + // Clear previous mock expectations and prepare for new segment suite.broker.AssertExpectations(suite.T()) suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0] + + // Simulate new data arrival: Add segment 13 to the segment list suite.nextTargetSegments = append(suite.nextTargetSegments, &datapb.SegmentInfo{ ID: 13, PartitionID: suite.partitionID, InsertChannel: "channel-1", }) - suite.targetMgr.UpdateCollectionCurrentTarget(ctx, suite.collectionID) - // Pull next again + // Setup mocks for the new segment discovery phase + // These mocks will be used by the background goroutine when it polls for updates suite.broker.EXPECT(). GetRecoveryInfoV2(mock.Anything, mock.Anything). Return(suite.nextTargetChannels, suite.nextTargetSegments, nil) + suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() + // Wait for observer to automatically discover the new segment + // The background goroutine should detect segment 13 and update NextTarget suite.Eventually(func() bool { return len(suite.targetMgr.GetSealedSegmentsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 3 && len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2 }, 7*time.Second, 1*time.Second) suite.broker.AssertExpectations(suite.T()) - // Manually update next target + // Manually trigger update to ensure NextTarget is ready ready, err := suite.observer.UpdateNextTarget(suite.collectionID) suite.NoError(err) + + // Simulate nodes loading the new segment 13 suite.distMgr.ChannelDistManager.Update(2, &meta.DmChannel{ VchannelInfo: &datapb.VchannelInfo{ CollectionID: suite.collectionID, @@ -231,7 +266,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { Channel: "channel-1", Segments: map[int64]*querypb.SegmentDist{ 11: {NodeID: 2}, - 13: {NodeID: 2}, + 13: {NodeID: 2}, // New segment loaded }, }, }, &meta.DmChannel{ @@ -250,11 +285,10 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { }, }) - suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe() - suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe() suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() - // Able to update current if it's not empty + // Verify that CurrentTarget is updated to include all 3 segments + // Since CurrentTarget is not empty, the update should proceed successfully suite.Eventually(func() bool { isReady := false select { @@ -267,6 +301,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.CurrentTarget)) == 2 }, 7*time.Second, 1*time.Second) + // Verify sync action contains correct checkpoint information ch1View := suite.distMgr.ChannelDistManager.GetByFilter(meta.WithChannelName2Channel("channel-1"))[0].View action := suite.observer.genSyncAction(ctx, ch1View, 100) suite.Equal(action.GetDeleteCP().Timestamp, uint64(200))