From f36cdc182ac40407d84cae98534fe3f85ecc573a Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 27 Apr 2023 17:46:34 +0800 Subject: [PATCH] add retry on get recovery info (#23764) Signed-off-by: Wei Liu --- internal/querycoordv2/meta/target_manager.go | 61 ++++++++++--------- .../querycoordv2/meta/target_manager_test.go | 9 +++ 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index f997e5a6ee..4b2a12af24 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/samber/lo" "go.uber.org/zap" @@ -136,10 +137,6 @@ func (mgr *TargetManager) updateCollectionNextTarget(collectionID int64, partiti } func (mgr *TargetManager) PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (*CollectionTarget, error) { - log.Info("start to pull next targets for partition", - zap.Int64("collectionID", collectionID), - zap.Int64s("chosenPartitionIDs", chosenPartitionIDs)) - channelInfos := make(map[string][]*datapb.VchannelInfo) segments := make(map[int64]*datapb.SegmentInfo, 0) dmChannels := make(map[string]*DmChannel) @@ -203,41 +200,45 @@ func (mgr *TargetManager) PullNextTarget(broker Broker, collectionID int64, chos return NewCollectionTarget(segments, dmChannels), nil } - tryPullNextTargetV1 := func() (*CollectionTarget, error) { - // for rolling upgrade, when call GetRecoveryInfoV2 failed, back to retry GetRecoveryInfo - target, err := mgr.PullNextTargetV1(broker, collectionID, chosenPartitionIDs...) + var target *CollectionTarget + getRecoveryInfo := func() error { + var err error + + vChannelInfos, segmentInfos, err := broker.GetRecoveryInfoV2(context.TODO(), collectionID) if err != nil { - return nil, err + // if meet rpc error, for compatibility with previous versions, try pull next target v1 + if funcutil.IsGrpcErr(err) { + target, err = mgr.PullNextTargetV1(broker, collectionID, chosenPartitionIDs...) + return err + } + return err } - return target, nil + for _, info := range vChannelInfos { + channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info) + } + + partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...) + for _, segmentInfo := range segmentInfos { + if partitionSet.Contain(segmentInfo.GetPartitionID()) { + segments[segmentInfo.GetID()] = segmentInfo + } + } + + for _, infos := range channelInfos { + merged := mgr.mergeDmChannelInfo(infos) + dmChannels[merged.GetChannelName()] = merged + } + target = NewCollectionTarget(segments, dmChannels) + return nil } - // we should pull `channel targets` from all partitions because QueryNodes need to load - // the complete growing segments. And we should pull `segments targets` only from the chosen partitions. - vChannelInfos, segmentInfos, err := broker.GetRecoveryInfoV2(context.TODO(), collectionID) + err := retry.Do(context.TODO(), getRecoveryInfo, retry.Attempts(10)) if err != nil { - if funcutil.IsGrpcErr(err) { - return tryPullNextTargetV1() - } return nil, err } - for _, info := range vChannelInfos { - channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info) - } - partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...) - for _, segmentInfo := range segmentInfos { - if partitionSet.Contain(segmentInfo.GetPartitionID()) { - segments[segmentInfo.GetID()] = segmentInfo - } - } - - for _, infos := range channelInfos { - merged := mgr.mergeDmChannelInfo(infos) - dmChannels[merged.GetChannelName()] = merged - } - return NewCollectionTarget(segments, dmChannels), nil + return target, nil } func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel { diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index f8e45352b4..6d3c8cfab7 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -19,6 +19,7 @@ package meta import ( "testing" + "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -216,12 +217,20 @@ func (suite *TargetManagerSuite) TestUpdateNextTarget() { suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(collectionID, CurrentTarget)) suite.broker.ExpectedCalls = nil + // test getRecoveryInfoV2 failed , then back to getRecoveryInfo succeed suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nil, nil, status.Errorf(codes.NotFound, "fake not found")) suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{1}, nil) suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, collectionID, int64(1)).Return(nextTargetChannels, nextTargetBinlogs, nil) err := suite.mgr.UpdateCollectionNextTargetWithPartitions(collectionID, int64(1)) suite.NoError(err) + suite.broker.ExpectedCalls = nil + // test getRecoveryInfoV2 failed , then retry getRecoveryInfoV2 succeed + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nil, nil, errors.New("fake error")).Times(1) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nextTargetChannels, nextTargetSegments, nil) + err = suite.mgr.UpdateCollectionNextTargetWithPartitions(collectionID, int64(1)) + suite.NoError(err) + err = suite.mgr.UpdateCollectionNextTargetWithPartitions(collectionID) suite.Error(err)