From 70a7ae2e398be7885e869cb2c5248ec04b225652 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 20 Oct 2025 19:32:03 +0800 Subject: [PATCH] fix: [skip e2e]use eventual assertion for pending message check in replicate stream test (#44972) Related to #44620 The test was flaky because pendingMessages.Len() assertion happened before async message processing completed. Changed to assert.Eventually to wait up to 1 second for the pending queue to drain, fixing the race condition where actual was 1 but expected was 0. Fixes TestReplicateStreamClient_Reconnect flakiness. Signed-off-by: Congqi Xia --- .../replicatestream/replicate_stream_client_impl_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/cdc/replication/replicatestream/replicate_stream_client_impl_test.go b/internal/cdc/replication/replicatestream/replicate_stream_client_impl_test.go index d6b104a86d..917be58e91 100644 --- a/internal/cdc/replication/replicatestream/replicate_stream_client_impl_test.go +++ b/internal/cdc/replication/replicatestream/replicate_stream_client_impl_test.go @@ -101,7 +101,9 @@ func TestReplicateStreamClient_Replicate(t *testing.T) { mockStreamClient.ExpectRecv() } assert.Equal(t, msgCount, mockStreamClient.GetRecvCount()) - assert.Equal(t, 0, replicateClient.(*replicateStreamClient).pendingMessages.Len()) + assert.Eventually(t, func() bool { + return replicateClient.(*replicateStreamClient).pendingMessages.Len() == 0 + }, time.Second, 100*time.Millisecond) replicateClient.Close() } @@ -218,7 +220,9 @@ func TestReplicateStreamClient_Reconnect(t *testing.T) { mockStreamClient.ExpectRecv() } assert.Equal(t, msgCount, mockStreamClient.GetRecvCount()) - assert.Equal(t, 0, replicateClient.(*replicateStreamClient).pendingMessages.Len()) + assert.Eventually(t, func() bool { + return replicateClient.(*replicateStreamClient).pendingMessages.Len() == 0 + }, time.Second, 100*time.Millisecond) replicateClient.Close() }