diff --git a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go index 1da1bf813a..605995bc77 100644 --- a/internal/querynodev2/pipeline/pipeline_test.go +++ b/internal/querynodev2/pipeline/pipeline_test.go @@ -139,6 +139,15 @@ func (suite *PipelineTestSuite) TestBasic() { } }) + // build input msg ahead of time to set up expectations + in := suite.buildMsgPack(schema) + + // use a channel to signal when UpdateTSafe is called (last operation in pipeline) + done := make(chan struct{}) + suite.delegator.EXPECT().UpdateTSafe(in.EndTs).Run(func(ts uint64) { + close(done) + }).Return() + // build pipleine manager := &segments.Manager{ Collection: suite.collectionManager, @@ -155,10 +164,11 @@ func (suite *PipelineTestSuite) TestBasic() { suite.NoError(err) defer pipelineObj.Close() - // build input msg - in := suite.buildMsgPack(schema) - suite.delegator.EXPECT().UpdateTSafe(in.EndTs).Return() + // send message to pipeline suite.msgChan <- in + + // wait for pipeline to process the message + <-done } func TestQueryNodePipeline(t *testing.T) {