add retry on get recovery info (#23764)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2023-04-27 17:46:34 +08:00 committed by GitHub
parent 66a5efeb3d
commit f36cdc182a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 30 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/samber/lo"
"go.uber.org/zap"
@ -136,10 +137,6 @@ func (mgr *TargetManager) updateCollectionNextTarget(collectionID int64, partiti
}
func (mgr *TargetManager) PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (*CollectionTarget, error) {
log.Info("start to pull next targets for partition",
zap.Int64("collectionID", collectionID),
zap.Int64s("chosenPartitionIDs", chosenPartitionIDs))
channelInfos := make(map[string][]*datapb.VchannelInfo)
segments := make(map[int64]*datapb.SegmentInfo, 0)
dmChannels := make(map[string]*DmChannel)
@ -203,41 +200,45 @@ func (mgr *TargetManager) PullNextTarget(broker Broker, collectionID int64, chos
return NewCollectionTarget(segments, dmChannels), nil
}
tryPullNextTargetV1 := func() (*CollectionTarget, error) {
// for rolling upgrade, when call GetRecoveryInfoV2 failed, back to retry GetRecoveryInfo
target, err := mgr.PullNextTargetV1(broker, collectionID, chosenPartitionIDs...)
var target *CollectionTarget
getRecoveryInfo := func() error {
var err error
vChannelInfos, segmentInfos, err := broker.GetRecoveryInfoV2(context.TODO(), collectionID)
if err != nil {
return nil, err
// if meet rpc error, for compatibility with previous versions, try pull next target v1
if funcutil.IsGrpcErr(err) {
target, err = mgr.PullNextTargetV1(broker, collectionID, chosenPartitionIDs...)
return err
}
return err
}
return target, nil
for _, info := range vChannelInfos {
channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info)
}
partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...)
for _, segmentInfo := range segmentInfos {
if partitionSet.Contain(segmentInfo.GetPartitionID()) {
segments[segmentInfo.GetID()] = segmentInfo
}
}
for _, infos := range channelInfos {
merged := mgr.mergeDmChannelInfo(infos)
dmChannels[merged.GetChannelName()] = merged
}
target = NewCollectionTarget(segments, dmChannels)
return nil
}
// we should pull `channel targets` from all partitions because QueryNodes need to load
// the complete growing segments. And we should pull `segments targets` only from the chosen partitions.
vChannelInfos, segmentInfos, err := broker.GetRecoveryInfoV2(context.TODO(), collectionID)
err := retry.Do(context.TODO(), getRecoveryInfo, retry.Attempts(10))
if err != nil {
if funcutil.IsGrpcErr(err) {
return tryPullNextTargetV1()
}
return nil, err
}
for _, info := range vChannelInfos {
channelInfos[info.GetChannelName()] = append(channelInfos[info.GetChannelName()], info)
}
partitionSet := typeutil.NewUniqueSet(chosenPartitionIDs...)
for _, segmentInfo := range segmentInfos {
if partitionSet.Contain(segmentInfo.GetPartitionID()) {
segments[segmentInfo.GetID()] = segmentInfo
}
}
for _, infos := range channelInfos {
merged := mgr.mergeDmChannelInfo(infos)
dmChannels[merged.GetChannelName()] = merged
}
return NewCollectionTarget(segments, dmChannels), nil
return target, nil
}
func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel {

View File

@ -19,6 +19,7 @@ package meta
import (
"testing"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -216,12 +217,20 @@ func (suite *TargetManagerSuite) TestUpdateNextTarget() {
suite.assertChannels([]string{}, suite.mgr.GetDmChannelsByCollection(collectionID, CurrentTarget))
suite.broker.ExpectedCalls = nil
// test getRecoveryInfoV2 failed , then back to getRecoveryInfo succeed
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nil, nil, status.Errorf(codes.NotFound, "fake not found"))
suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{1}, nil)
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, collectionID, int64(1)).Return(nextTargetChannels, nextTargetBinlogs, nil)
err := suite.mgr.UpdateCollectionNextTargetWithPartitions(collectionID, int64(1))
suite.NoError(err)
suite.broker.ExpectedCalls = nil
// test getRecoveryInfoV2 failed , then retry getRecoveryInfoV2 succeed
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nil, nil, errors.New("fake error")).Times(1)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nextTargetChannels, nextTargetSegments, nil)
err = suite.mgr.UpdateCollectionNextTargetWithPartitions(collectionID, int64(1))
suite.NoError(err)
err = suite.mgr.UpdateCollectionNextTargetWithPartitions(collectionID)
suite.Error(err)