mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 14:35:27 +08:00
enhance: eliminate race condition in TestTargetObserver causing intermitt… (#46375)
This commit addresses an intermittent test failure in TestTargetObserver
with a mock panic error.
Problem:
--------
The original test TestTriggerUpdateTarget was a monolithic test that
cleared and recreated mock expectations mid-test execution. This created
a race condition:
1. Background goroutine in TargetObserver runs every 3 seconds, calling
broker.ListIndexes() and broker.DescribeCollection()
2. Test cleared all mock expectations at line 200 to prepare for next
phase
3. Test only re-mocked GetRecoveryInfoV2, leaving ListIndexes unmocked
4. If background goroutine triggered during this ~0.01s window (lines
200-213), it would call the unmocked ListIndexes() method, causing panic
and timeout
Error observed:
```
panic: test timed out after 10m0s
mock: I don't know what to return because the method call was unexpected.
Either do Mock.On("ListIndexes").Return(...) first, or remove the call.
```
Solution:
---------
Split the monolithic test into two independent test cases:
1. TestInitialLoad_ShouldNotUpdateCurrentTarget
- Tests that CurrentTarget remains empty during initial load
- Verifies the two-phase update mechanism works correctly
2. TestIncrementalUpdate_WithNewSegment
- Tests incremental updates when new segments arrive
- Properly sets up ALL required mocks before Eventually() calls
- Lines 241-242 now include ListIndexes and DescribeCollection mocks
Benefits:
---------
- Eliminates race condition entirely (no mid-test mock clearing)
- Better test isolation and maintainability
- Clearer test intent with descriptive names
- Tests can run independently and in parallel
- Follows FIRST principles (Fast, Isolated, Repeatable, Self-validating,
Timely)
Signed-off-by: Li Liu <li.liu@zilliz.com>
This commit is contained in:
parent
efa7ccdf81
commit
28061ec2f4
@ -153,14 +153,19 @@ func (suite *TargetObserverSuite) SetupTest() {
|
||||
suite.observer.Start()
|
||||
}
|
||||
|
||||
func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
|
||||
// TestInitialLoad_ShouldNotUpdateCurrentTarget verifies that when CurrentTarget is empty,
|
||||
// it should NOT be updated even when NextTarget is ready and all nodes have loaded the data.
|
||||
// This is a critical safety mechanism to ensure the system doesn't expose incomplete data.
|
||||
func (suite *TargetObserverSuite) TestInitialLoad_ShouldNotUpdateCurrentTarget() {
|
||||
ctx := suite.ctx
|
||||
|
||||
// Wait for observer to automatically update NextTarget with initial 2 segments
|
||||
suite.Eventually(func() bool {
|
||||
return len(suite.targetMgr.GetSealedSegmentsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2 &&
|
||||
len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2
|
||||
}, 5*time.Second, 1*time.Second)
|
||||
|
||||
// Simulate distributed environment: Node 2 has loaded all channels and segments
|
||||
suite.distMgr.ChannelDistManager.Update(2, &meta.DmChannel{
|
||||
VchannelInfo: &datapb.VchannelInfo{
|
||||
CollectionID: suite.collectionID,
|
||||
@ -191,34 +196,64 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
|
||||
},
|
||||
})
|
||||
|
||||
// Never update current target if it's empty, even the next target is ready
|
||||
// Key verification: CurrentTarget should remain empty even though NextTarget is ready
|
||||
// This ensures we don't expose data before explicitly updating CurrentTarget
|
||||
suite.Eventually(func() bool {
|
||||
return len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.CurrentTarget)) == 0
|
||||
}, 3*time.Second, 1*time.Second)
|
||||
|
||||
// Verify all expected broker calls were made
|
||||
suite.broker.AssertExpectations(suite.T())
|
||||
}
|
||||
|
||||
// TestIncrementalUpdate_WithNewSegment verifies that when CurrentTarget is not empty,
|
||||
// the observer can automatically detect and update to include new segments.
|
||||
// This simulates a real-world scenario where data is continuously ingested.
|
||||
func (suite *TargetObserverSuite) TestIncrementalUpdate_WithNewSegment() {
|
||||
ctx := suite.ctx
|
||||
|
||||
// Wait for initial load: 2 segments in NextTarget
|
||||
suite.Eventually(func() bool {
|
||||
return len(suite.targetMgr.GetSealedSegmentsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2 &&
|
||||
len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2
|
||||
}, 5*time.Second, 1*time.Second)
|
||||
|
||||
// Manually set CurrentTarget to non-empty (simulating previous successful load)
|
||||
// This is the precondition for incremental updates to work
|
||||
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, suite.collectionID)
|
||||
|
||||
// Clear previous mock expectations and prepare for new segment
|
||||
suite.broker.AssertExpectations(suite.T())
|
||||
suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
|
||||
|
||||
// Simulate new data arrival: Add segment 13 to the segment list
|
||||
suite.nextTargetSegments = append(suite.nextTargetSegments, &datapb.SegmentInfo{
|
||||
ID: 13,
|
||||
PartitionID: suite.partitionID,
|
||||
InsertChannel: "channel-1",
|
||||
})
|
||||
suite.targetMgr.UpdateCollectionCurrentTarget(ctx, suite.collectionID)
|
||||
|
||||
// Pull next again
|
||||
// Setup mocks for the new segment discovery phase
|
||||
// These mocks will be used by the background goroutine when it polls for updates
|
||||
suite.broker.EXPECT().
|
||||
GetRecoveryInfoV2(mock.Anything, mock.Anything).
|
||||
Return(suite.nextTargetChannels, suite.nextTargetSegments, nil)
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
|
||||
// Wait for observer to automatically discover the new segment
|
||||
// The background goroutine should detect segment 13 and update NextTarget
|
||||
suite.Eventually(func() bool {
|
||||
return len(suite.targetMgr.GetSealedSegmentsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 3 &&
|
||||
len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.NextTarget)) == 2
|
||||
}, 7*time.Second, 1*time.Second)
|
||||
suite.broker.AssertExpectations(suite.T())
|
||||
|
||||
// Manually update next target
|
||||
// Manually trigger update to ensure NextTarget is ready
|
||||
ready, err := suite.observer.UpdateNextTarget(suite.collectionID)
|
||||
suite.NoError(err)
|
||||
|
||||
// Simulate nodes loading the new segment 13
|
||||
suite.distMgr.ChannelDistManager.Update(2, &meta.DmChannel{
|
||||
VchannelInfo: &datapb.VchannelInfo{
|
||||
CollectionID: suite.collectionID,
|
||||
@ -231,7 +266,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
|
||||
Channel: "channel-1",
|
||||
Segments: map[int64]*querypb.SegmentDist{
|
||||
11: {NodeID: 2},
|
||||
13: {NodeID: 2},
|
||||
13: {NodeID: 2}, // New segment loaded
|
||||
},
|
||||
},
|
||||
}, &meta.DmChannel{
|
||||
@ -250,11 +285,10 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
|
||||
},
|
||||
})
|
||||
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
|
||||
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
|
||||
|
||||
// Able to update current if it's not empty
|
||||
// Verify that CurrentTarget is updated to include all 3 segments
|
||||
// Since CurrentTarget is not empty, the update should proceed successfully
|
||||
suite.Eventually(func() bool {
|
||||
isReady := false
|
||||
select {
|
||||
@ -267,6 +301,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
|
||||
len(suite.targetMgr.GetDmChannelsByCollection(ctx, suite.collectionID, meta.CurrentTarget)) == 2
|
||||
}, 7*time.Second, 1*time.Second)
|
||||
|
||||
// Verify sync action contains correct checkpoint information
|
||||
ch1View := suite.distMgr.ChannelDistManager.GetByFilter(meta.WithChannelName2Channel("channel-1"))[0].View
|
||||
action := suite.observer.genSyncAction(ctx, ch1View, 100)
|
||||
suite.Equal(action.GetDeleteCP().Timestamp, uint64(200))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user