From 54aaeda63fc035338038fd33cb9da98dfb4f020f Mon Sep 17 00:00:00 2001 From: SimFG Date: Fri, 22 Nov 2024 03:10:34 +0800 Subject: [PATCH] fix: add the request ctx for stream pipeline interface (#37835) - issue: #37834 Signed-off-by: SimFG --- internal/querynodev2/pipeline/manager_test.go | 2 +- internal/querynodev2/pipeline/pipeline_test.go | 2 +- internal/querynodev2/services.go | 2 +- internal/util/pipeline/stream_pipeline.go | 8 ++++---- internal/util/pipeline/stream_pipeline_test.go | 3 ++- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/internal/querynodev2/pipeline/manager_test.go b/internal/querynodev2/pipeline/manager_test.go index e1654cd462..d8479dcf3f 100644 --- a/internal/querynodev2/pipeline/manager_test.go +++ b/internal/querynodev2/pipeline/manager_test.go @@ -101,7 +101,7 @@ func (suite *PipelineManagerTestSuite) TestBasic() { suite.NotNil(pipeline) // Init Consumer - err = pipeline.ConsumeMsgStream(&msgpb.MsgPosition{}) + err = pipeline.ConsumeMsgStream(context.Background(), &msgpb.MsgPosition{}) suite.NoError(err) // Start pipeline diff --git a/internal/querynodev2/pipeline/pipeline_test.go b/internal/querynodev2/pipeline/pipeline_test.go index 3dca8e674c..8e2edde820 100644 --- a/internal/querynodev2/pipeline/pipeline_test.go +++ b/internal/querynodev2/pipeline/pipeline_test.go @@ -147,7 +147,7 @@ func (suite *PipelineTestSuite) TestBasic() { suite.NoError(err) // Init Consumer - err = pipeline.ConsumeMsgStream(&msgpb.MsgPosition{}) + err = pipeline.ConsumeMsgStream(context.Background(), &msgpb.MsgPosition{}) suite.NoError(err) err = pipeline.Start() diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index e8b4c8ce25..3f92eafacf 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -325,7 +325,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm MsgID: channel.SeekPosition.MsgID, Timestamp: channel.SeekPosition.Timestamp, } - err = pipeline.ConsumeMsgStream(position) + err = pipeline.ConsumeMsgStream(ctx, position) if err != nil { err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed") log.Warn(err.Error(), diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go index c18dd19651..d340fb3829 100644 --- a/internal/util/pipeline/stream_pipeline.go +++ b/internal/util/pipeline/stream_pipeline.go @@ -40,7 +40,7 @@ import ( type StreamPipeline interface { Pipeline - ConsumeMsgStream(position *msgpb.MsgPosition) error + ConsumeMsgStream(ctx context.Context, position *msgpb.MsgPosition) error Status() string } @@ -85,7 +85,7 @@ func (p *streamPipeline) Status() string { return "Healthy" } -func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error { +func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.MsgPosition) error { var err error if position == nil { log.Error("seek stream to nil position") @@ -101,7 +101,7 @@ func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error { zap.Uint64("timestamp", position.GetTimestamp()), ) handler := adaptor.NewMsgPackAdaptorHandler() - p.scanner = streaming.WAL().Read(context.Background(), streaming.ReadOption{ + p.scanner = streaming.WAL().Read(ctx, streaming.ReadOption{ VChannel: position.GetChannelName(), DeliverPolicy: options.DeliverPolicyStartFrom(startFrom), DeliverFilters: []options.DeliverFilter{ @@ -117,7 +117,7 @@ func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error { } start := time.Now() - p.input, err = p.dispatcher.Register(context.TODO(), p.vChannel, position, common.SubscriptionPositionUnknown) + p.input, err = p.dispatcher.Register(ctx, p.vChannel, position, common.SubscriptionPositionUnknown) if err != nil { log.Error("dispatcher register failed", zap.String("channel", position.ChannelName)) return WrapErrRegDispather(err) diff --git a/internal/util/pipeline/stream_pipeline_test.go b/internal/util/pipeline/stream_pipeline_test.go index 0f94bd18b5..8ceaf38e52 100644 --- a/internal/util/pipeline/stream_pipeline_test.go +++ b/internal/util/pipeline/stream_pipeline_test.go @@ -17,6 +17,7 @@ package pipeline import ( + context2 "context" "fmt" "testing" @@ -63,7 +64,7 @@ func (suite *StreamPipelineSuite) TestBasic() { }) } - err := suite.pipeline.ConsumeMsgStream(&msgpb.MsgPosition{}) + err := suite.pipeline.ConsumeMsgStream(context2.Background(), &msgpb.MsgPosition{}) suite.NoError(err) suite.pipeline.Start()