fix: add the request ctx for stream pipeline interface (#37835)

- issue: #37834

Signed-off-by: SimFG <bang.fu@zilliz.com>
This commit is contained in:
SimFG 2024-11-22 03:10:34 +08:00 committed by GitHub
parent 19572f5b06
commit 54aaeda63f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 9 additions and 8 deletions

View File

@ -101,7 +101,7 @@ func (suite *PipelineManagerTestSuite) TestBasic() {
suite.NotNil(pipeline) suite.NotNil(pipeline)
// Init Consumer // Init Consumer
err = pipeline.ConsumeMsgStream(&msgpb.MsgPosition{}) err = pipeline.ConsumeMsgStream(context.Background(), &msgpb.MsgPosition{})
suite.NoError(err) suite.NoError(err)
// Start pipeline // Start pipeline

View File

@ -147,7 +147,7 @@ func (suite *PipelineTestSuite) TestBasic() {
suite.NoError(err) suite.NoError(err)
// Init Consumer // Init Consumer
err = pipeline.ConsumeMsgStream(&msgpb.MsgPosition{}) err = pipeline.ConsumeMsgStream(context.Background(), &msgpb.MsgPosition{})
suite.NoError(err) suite.NoError(err)
err = pipeline.Start() err = pipeline.Start()

View File

@ -325,7 +325,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
MsgID: channel.SeekPosition.MsgID, MsgID: channel.SeekPosition.MsgID,
Timestamp: channel.SeekPosition.Timestamp, Timestamp: channel.SeekPosition.Timestamp,
} }
err = pipeline.ConsumeMsgStream(position) err = pipeline.ConsumeMsgStream(ctx, position)
if err != nil { if err != nil {
err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed") err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed")
log.Warn(err.Error(), log.Warn(err.Error(),

View File

@ -40,7 +40,7 @@ import (
type StreamPipeline interface { type StreamPipeline interface {
Pipeline Pipeline
ConsumeMsgStream(position *msgpb.MsgPosition) error ConsumeMsgStream(ctx context.Context, position *msgpb.MsgPosition) error
Status() string Status() string
} }
@ -85,7 +85,7 @@ func (p *streamPipeline) Status() string {
return "Healthy" return "Healthy"
} }
func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error { func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.MsgPosition) error {
var err error var err error
if position == nil { if position == nil {
log.Error("seek stream to nil position") log.Error("seek stream to nil position")
@ -101,7 +101,7 @@ func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
zap.Uint64("timestamp", position.GetTimestamp()), zap.Uint64("timestamp", position.GetTimestamp()),
) )
handler := adaptor.NewMsgPackAdaptorHandler() handler := adaptor.NewMsgPackAdaptorHandler()
p.scanner = streaming.WAL().Read(context.Background(), streaming.ReadOption{ p.scanner = streaming.WAL().Read(ctx, streaming.ReadOption{
VChannel: position.GetChannelName(), VChannel: position.GetChannelName(),
DeliverPolicy: options.DeliverPolicyStartFrom(startFrom), DeliverPolicy: options.DeliverPolicyStartFrom(startFrom),
DeliverFilters: []options.DeliverFilter{ DeliverFilters: []options.DeliverFilter{
@ -117,7 +117,7 @@ func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
} }
start := time.Now() start := time.Now()
p.input, err = p.dispatcher.Register(context.TODO(), p.vChannel, position, common.SubscriptionPositionUnknown) p.input, err = p.dispatcher.Register(ctx, p.vChannel, position, common.SubscriptionPositionUnknown)
if err != nil { if err != nil {
log.Error("dispatcher register failed", zap.String("channel", position.ChannelName)) log.Error("dispatcher register failed", zap.String("channel", position.ChannelName))
return WrapErrRegDispather(err) return WrapErrRegDispather(err)

View File

@ -17,6 +17,7 @@
package pipeline package pipeline
import ( import (
context2 "context"
"fmt" "fmt"
"testing" "testing"
@ -63,7 +64,7 @@ func (suite *StreamPipelineSuite) TestBasic() {
}) })
} }
err := suite.pipeline.ConsumeMsgStream(&msgpb.MsgPosition{}) err := suite.pipeline.ConsumeMsgStream(context2.Background(), &msgpb.MsgPosition{})
suite.NoError(err) suite.NoError(err)
suite.pipeline.Start() suite.pipeline.Start()