From 046693eaf7baf08e0fe463be8f57e1f7525cbfeb Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 9 Dec 2025 17:57:14 +0800 Subject: [PATCH] test: [skip e2e] fix race condition in TestQueryNodePipeline/TestBasic (#46218) issue: #46217 The test was failing intermittently because it didn't wait for the pipeline to finish processing messages before exiting. The test sent a message to the pipeline and immediately returned, causing the deferred Close() to execute before ProcessInsert, ProcessDelete, and UpdateTSafe could be called. Fix by: - Moving message construction before mock expectations setup - Adding a done channel to synchronize on UpdateTSafe completion - Waiting for the signal before test exits Signed-off-by: Wei Liu --- internal/querynodev2/pipeline/pipeline_test.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go index 1da1bf813a..605995bc77 100644 --- a/internal/querynodev2/pipeline/pipeline_test.go +++ b/internal/querynodev2/pipeline/pipeline_test.go @@ -139,6 +139,15 @@ func (suite *PipelineTestSuite) TestBasic() { } }) + // build input msg ahead of time to set up expectations + in := suite.buildMsgPack(schema) + + // use a channel to signal when UpdateTSafe is called (last operation in pipeline) + done := make(chan struct{}) + suite.delegator.EXPECT().UpdateTSafe(in.EndTs).Run(func(ts uint64) { + close(done) + }).Return() + // build pipleine manager := &segments.Manager{ Collection: suite.collectionManager, @@ -155,10 +164,11 @@ func (suite *PipelineTestSuite) TestBasic() { suite.NoError(err) defer pipelineObj.Close() - // build input msg - in := suite.buildMsgPack(schema) - suite.delegator.EXPECT().UpdateTSafe(in.EndTs).Return() + // send message to pipeline suite.msgChan <- in + + // wait for pipeline to process the message + <-done } func TestQueryNodePipeline(t *testing.T) {