From 28061ec2f4f5506239e3411d137b132294d89852 Mon Sep 17 00:00:00 2001
From: liliu-z <105927039+liliu-z@users.noreply.github.com>
Date: Wed, 17 Dec 2025 11:35:16 +0800
Subject: [PATCH] =?UTF-8?q?enhance:=20eliminate=20race=20condition=20in=20?=
=?UTF-8?q?TestTargetObserver=20causing=20intermitt=E2=80=A6=20(#46375)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
This commit addresses an intermittent test failure in TestTargetObserver
with a mock panic error.
Problem:
--------
The original test TestTriggerUpdateTarget was a monolithic test that
cleared and recreated mock expectations mid-test execution. This created
a race condition:
1. Background goroutine in TargetObserver runs every 3 seconds, calling
broker.ListIndexes() and broker.DescribeCollection()
2. Test cleared all mock expectations at line 200 to prepare for next
phase
3. Test only re-mocked GetRecoveryInfoV2, leaving ListIndexes unmocked
4. If background goroutine triggered during this ~0.01s window (lines
200-213), it would call the unmocked ListIndexes() method, causing panic
and timeout
Error observed:
```
panic: test timed out after 10m0s
mock: I don't know what to return because the method call was unexpected.
Either do Mock.On("ListIndexes").Return(...) first, or remove the call.
```
Solution:
---------
Split the monolithic test into two independent test cases:
1. TestInitialLoad_ShouldNotUpdateCurrentTarget
- Tests that CurrentTarget remains empty during initial load
- Verifies the two-phase update mechanism works correctly
2. TestIncrementalUpdate_WithNewSegment
- Tests incremental updates when new segments arrive
- Properly sets up ALL required mocks before Eventually() calls
- Lines 241-242 now include ListIndexes and DescribeCollection mocks
Benefits:
---------
- Eliminates race condition entirely (no mid-test mock clearing)
- Better test isolation and maintainability
- Clearer test intent with descriptive names
- Tests can run independently and in parallel
- Follows FIRST principles (Fast, Isolated, Repeatable, Self-validating,
Timely)
Signed-off-by: Li Liu
---
.../observers/target_observer_test.go | 53 +++++++++++++++----
1 file changed, 44 insertions(+), 9 deletions(-)
diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go
index edc53e7fd7..5e0e71837d 100644
--- a/internal/querycoordv2/observers/target_observer_test.go
+++ b/internal/querycoordv2/observers/target_observer_test.go
@@ -153,14 +153,19 @@ func (suite *TargetObserverSuite) SetupTest() {
suite.observer.Start()
}
-func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
+// TestInitialLoad_ShouldNotUpdateCurrentTarget verifies that when CurrentTarget is empty,
+// it should NOT be updated even when NextTarget is ready and all nodes have loaded the data.
+// This is a critical safety mechanism to ensure the system doesn't expose incomplete data.
+func (suite *TargetObserverSuite) TestInitialLoad_ShouldNotUpdateCurrentTarget() {
ctx := suite.ctx
+ // Wait for observer to automatically update NextTarget with initial 2 segments
suite.Eventually(func() bool {
return len(suite.targetMgr.GetSealedSegmentsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2 &&
len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2
}, 5*time.Second, 1*time.Second)
+ // Simulate distributed environment: Node 2 has loaded all channels and segments
suite.distMgr.ChannelDistManager.Update(2, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{
CollectionID: suite.collectionID,
@@ -191,34 +196,64 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
},
})
- // Never update current target if it's empty, even the next target is ready
+ // Key verification: CurrentTarget should remain empty even though NextTarget is ready
+ // This ensures we don't expose data before explicitly updating CurrentTarget
suite.Eventually(func() bool {
return len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.CurrentTarget)) == 0
}, 3*time.Second, 1*time.Second)
+ // Verify all expected broker calls were made
+ suite.broker.AssertExpectations(suite.T())
+}
+
+// TestIncrementalUpdate_WithNewSegment verifies that when CurrentTarget is not empty,
+// the observer can automatically detect and update to include new segments.
+// This simulates a real-world scenario where data is continuously ingested.
+func (suite *TargetObserverSuite) TestIncrementalUpdate_WithNewSegment() {
+ ctx := suite.ctx
+
+ // Wait for initial load: 2 segments in NextTarget
+ suite.Eventually(func() bool {
+ return len(suite.targetMgr.GetSealedSegmentsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2 &&
+ len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2
+ }, 5*time.Second, 1*time.Second)
+
+ // Manually set CurrentTarget to non-empty (simulating previous successful load)
+ // This is the precondition for incremental updates to work
+ suite.targetMgr.UpdateCollectionCurrentTarget(ctx, suite.collectionID)
+
+ // Clear previous mock expectations and prepare for new segment
suite.broker.AssertExpectations(suite.T())
suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
+
+ // Simulate new data arrival: Add segment 13 to the segment list
suite.nextTargetSegments = append(suite.nextTargetSegments, &datapb.SegmentInfo{
ID: 13,
PartitionID: suite.partitionID,
InsertChannel: "channel-1",
})
- suite.targetMgr.UpdateCollectionCurrentTarget(ctx, suite.collectionID)
- // Pull next again
+ // Setup mocks for the new segment discovery phase
+ // These mocks will be used by the background goroutine when it polls for updates
suite.broker.EXPECT().
GetRecoveryInfoV2(mock.Anything, mock.Anything).
Return(suite.nextTargetChannels, suite.nextTargetSegments, nil)
+ suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
+ suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
+ // Wait for observer to automatically discover the new segment
+ // The background goroutine should detect segment 13 and update NextTarget
suite.Eventually(func() bool {
return len(suite.targetMgr.GetSealedSegmentsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 3 &&
len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2
}, 7*time.Second, 1*time.Second)
suite.broker.AssertExpectations(suite.T())
- // Manually update next target
+ // Manually trigger update to ensure NextTarget is ready
ready, err := suite.observer.UpdateNextTarget(suite.collectionID)
suite.NoError(err)
+
+ // Simulate nodes loading the new segment 13
suite.distMgr.ChannelDistManager.Update(2, &meta.DmChannel{
VchannelInfo: &datapb.VchannelInfo{
CollectionID: suite.collectionID,
@@ -231,7 +266,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
Channel: "channel-1",
Segments: map[int64]*querypb.SegmentDist{
11: {NodeID: 2},
- 13: {NodeID: 2},
+ 13: {NodeID: 2}, // New segment loaded
},
},
}, &meta.DmChannel{
@@ -250,11 +285,10 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
},
})
- suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
- suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
- // Able to update current if it's not empty
+ // Verify that CurrentTarget is updated to include all 3 segments
+ // Since CurrentTarget is not empty, the update should proceed successfully
suite.Eventually(func() bool {
isReady := false
select {
@@ -267,6 +301,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.CurrentTarget)) == 2
}, 7*time.Second, 1*time.Second)
+ // Verify sync action contains correct checkpoint information
ch1View := suite.distMgr.ChannelDistManager.GetByFilter(meta.WithChannelName2Channel("channel-1"))[0].View
action := suite.observer.genSyncAction(ctx, ch1View, 100)
suite.Equal(action.GetDeleteCP().Timestamp, uint64(200))