From f9e2d00f913826db955d3648a3c021abfb6b3a79 Mon Sep 17 00:00:00 2001
From: SimFG
Date: Wed, 12 Jul 2023 17:26:30 +0800
Subject: [PATCH] Prevent `exclusive consumer` exception in pulsar (#25376)

Signed-off-by: SimFG
---
 internal/datanode/data_node.go                  |  8 +++---
 internal/datanode/event_manager.go              |  6 ++++-
 .../querynodev2/delegator/delegator_data.go     |  1 +
 .../delegator/delegator_data_test.go            | 26 +++++++++++++++++++
 internal/querynodev2/services_test.go           |  1 +
 pkg/mq/msgdispatcher/dispatcher.go              |  1 +
 6 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 0997272416..d6d8abc869 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -426,12 +426,12 @@ func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version
 	switch watchInfo.State {
 	case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
 		if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
+			log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err))
 			watchInfo.State = datapb.ChannelWatchState_WatchFailure
-			return fmt.Errorf("fail to add and start flowgraph for vChanName: %s, err: %v", vChanName, err)
+		} else {
+			log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
+			watchInfo.State = datapb.ChannelWatchState_WatchSuccess
 		}
-		log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
-		watchInfo.State = datapb.ChannelWatchState_WatchSuccess
-
 	case datapb.ChannelWatchState_ToRelease:
 		// there is no reason why we release fail
 		node.tryToReleaseFlowgraph(vChanName)
diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go
index 85f8ab359c..8efbd9dda4 100644
--- a/internal/datanode/event_manager.go
+++ b/internal/datanode/event_manager.go
@@ -73,7 +73,11 @@ func (e *channelEventManager) Run() {
 		case event := <-e.eventChan:
 			switch event.eventType {
 			case putEventType:
-				e.handlePutEvent(event.info, event.version)
+				err := e.handlePutEvent(event.info, event.version)
+				if err != nil {
+					// logging the error is convenient for follow-up investigation of problems
+					log.Warn("handle put event failed", zap.String("vChanName", event.vChanName), zap.Error(err))
+				}
 			case deleteEventType:
 				e.handleDeleteEvent(event.vChanName)
 			}
diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index bb3025122e..517c535c81 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -456,6 +456,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
 	if err != nil {
 		return nil, err
 	}
+	defer stream.Close()
 	vchannelName := position.ChannelName
 	pChannelName := funcutil.ToPhysicalChannel(vchannelName)
 	position.ChannelName = pChannelName
diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go
index 5a8cffada4..e91f27b578 100644
--- a/internal/querynodev2/delegator/delegator_data_test.go
+++ b/internal/querynodev2/delegator/delegator_data_test.go
@@ -300,6 +300,32 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
 			RowCount:    1,
 		},
 	}, 10)
+
+	// load sealed
+	s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
+		Base:         commonpbutil.NewMsgBase(),
+		DstNodeID:    1,
+		CollectionID: s.collectionID,
+		Infos: []*querypb.SegmentLoadInfo{
+			{
+				SegmentID:     1000,
+				CollectionID:  s.collectionID,
+				PartitionID:   500,
+				StartPosition: &msgpb.MsgPosition{Timestamp: 5000},
+				DeltaPosition: &msgpb.MsgPosition{Timestamp: 5000},
+			},
+		},
+	})
+	s.Require().NoError(err)
+
+	s.delegator.ProcessDelete([]*DeleteData{
+		{
+			PartitionID: 500,
+			PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
+			Timestamps:  []uint64{10},
+			RowCount:    1,
+		},
+	}, 10)
 }
 
 func (s *DelegatorDataSuite) TestLoadSegments() {
diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go
index c9d957eb32..21f3d6789d 100644
--- a/internal/querynodev2/services_test.go
+++ b/internal/querynodev2/services_test.go
@@ -373,6 +373,7 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
 	// init msgstream failed
 	suite.factory.EXPECT().NewTtMsgStream(mock.Anything).Return(suite.msgStream, nil)
 	suite.msgStream.EXPECT().AsConsumer([]string{suite.pchannel}, mock.Anything, mock.Anything).Return()
+	suite.msgStream.EXPECT().Close().Return()
 	suite.msgStream.EXPECT().Seek(mock.Anything).Return(errors.New("mock error"))
 
 	status, err := suite.node.WatchDmChannels(ctx, req)
diff --git a/pkg/mq/msgdispatcher/dispatcher.go b/pkg/mq/msgdispatcher/dispatcher.go
index 97166bcc45..62e48dc528 100644
--- a/pkg/mq/msgdispatcher/dispatcher.go
+++ b/pkg/mq/msgdispatcher/dispatcher.go
@@ -99,6 +99,7 @@ func NewDispatcher(factory msgstream.Factory,
 	stream.AsConsumer([]string{pchannel}, subName, mqwrapper.SubscriptionPositionUnknown)
 	err = stream.Seek([]*Pos{position})
 	if err != nil {
+		stream.Close()
 		log.Error("seek failed", zap.Error(err))
 		return nil, err
 	}
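
The hunks above all apply one cleanup pattern: a msgstream opened as a consumer is closed on every exit path, including failure paths. If the stream leaks, the broker-side subscription stays connected, and the next subscriber using the same subscription name fails with Pulsar's `exclusive consumer` exception. Below is a minimal sketch of the pattern, assuming a simplified Stream interface in place of the real msgstream.MsgStream API; Stream, fakeStream, and newDispatcher are illustrative names, not Milvus code.

// Sketch only -- not part of the patch. Stream, fakeStream, and newDispatcher
// are hypothetical stand-ins illustrating the close-on-failure pattern the
// hunks above apply.
package main

import (
	"errors"
	"fmt"
)

// Stream stands in for a consumer handle whose subscription is exclusive:
// while it stays open, no other consumer may use the same subscription name.
type Stream interface {
	Seek(pos int64) error
	Close()
}

type fakeStream struct{}

func (f *fakeStream) Seek(pos int64) error { return errors.New("mock seek error") }
func (f *fakeStream) Close()               { fmt.Println("subscription released") }

// newDispatcher mirrors the dispatcher.go hunk: when Seek fails after the
// consumer was created, close the stream before returning, so a retry does
// not find the broker still holding an orphaned exclusive consumer.
func newDispatcher(s Stream, pos int64) (Stream, error) {
	if err := s.Seek(pos); err != nil {
		s.Close() // the one-line fix this patch adds on each failure path
		return nil, err
	}
	return s, nil
}

func main() {
	if _, err := newDispatcher(&fakeStream{}, 42); err != nil {
		fmt.Println("dispatcher init failed:", err)
	}
}

The delegator_data.go hunk achieves the same effect with defer stream.Close() immediately after a successful open, which is the simpler choice when the stream's lifetime is bounded by the enclosing function.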