From a63b4cedcf79834f9eccb996abcc5edfadceee97 Mon Sep 17 00:00:00 2001
From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>
Date: Tue, 19 Mar 2024 20:35:07 -0700
Subject: [PATCH] fix: remove some unnecessary unrecoverable errors (#31327)

Use retry.Handle when the request cannot be serviced, but don't throw
unrecoverable errors.

fix #31323

Signed-off-by: xiaofanluan
---
 internal/datanode/services.go                 |  3 -
 .../querynodev2/delegator/delegator_data.go   | 20 +++---
 .../delegator/delegator_data_test.go          | 67 +++++++++++++++++++
 3 files changed, 77 insertions(+), 13 deletions(-)

diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index 6f45f2ed58..e9b37eb709 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -778,9 +778,6 @@ func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoo
 	})
 	// Only retrying when DataCoord is unhealthy or err != nil, otherwise return immediately.
 	if err != nil {
-		if errors.Is(err, merr.ErrServiceNotReady) {
-			return retry.Unrecoverable(err)
-		}
 		return err
 	}
 	return nil
diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index 9a4871cb2d..f983533a26 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"math/rand"
 	"sort"
+	"time"
 
 	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
@@ -272,9 +273,9 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
 		delRecord, ok := delRecords[segmentEntry.SegmentID]
 		if ok {
 			log.Debug("delegator plan to applyDelete via worker")
-			err := retry.Do(ctx, func() error {
+			err := retry.Handle(ctx, func() (bool, error) {
 				if sd.Stopped() {
-					return retry.Unrecoverable(merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing"))
+					return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
 				}
 
 				err := worker.Delete(ctx, &querypb.DeleteRequest{
@@ -289,17 +290,15 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
 				})
 				if errors.Is(err, merr.ErrNodeNotFound) {
 					log.Warn("try to delete data on non-exist node")
-					return retry.Unrecoverable(err)
+					return false, err
 				} else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) {
 					log.Warn("try to delete data of released segment")
-					return nil
+					return false, nil
 				} else if err != nil {
-					log.Warn("worker failed to delete on segment",
-						zap.Error(err),
-					)
-					return err
+					log.Warn("worker failed to delete on segment", zap.Error(err))
+					return true, err
 				}
-				return nil
+				return false, nil
 			}, retry.Attempts(10))
 			if err != nil {
 				log.Warn("apply delete for segment failed, marking it offline")
@@ -712,6 +711,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
 		return nil, err
 	}
+	ts := time.Now()
 	err = stream.Seek(context.TODO(), []*msgpb.MsgPosition{position})
 	if err != nil {
 		return nil, err
 	}
@@ -761,7 +761,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
 			}
 		}
 	}
-
+	log.Info("successfully read delete from stream ", zap.Duration("time spent", time.Since(ts)))
 	return result, nil
 }
 
diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go
index cfb64dc9e1..897a4e4de2 100644
--- a/internal/querynodev2/delegator/delegator_data_test.go
+++ b/internal/querynodev2/delegator/delegator_data_test.go
@@ -361,6 +361,73 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
 	}, 10)
 	s.False(s.delegator.distribution.Serviceable())
+
+	worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
+		Return(nil)
+	// reload, refresh the state
+	s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
+		Base:         commonpbutil.NewMsgBase(),
+		DstNodeID:    1,
+		CollectionID: s.collectionID,
+		Infos: []*querypb.SegmentLoadInfo{
+			{
+				SegmentID:     1000,
+				CollectionID:  s.collectionID,
+				PartitionID:   500,
+				StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
+				DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
+			},
+		},
+		Version: 1,
+	})
+	s.Require().NoError(err)
+	s.True(s.delegator.distribution.Serviceable())
+	// Test normal errors with retry and fail
+	worker1.ExpectedCalls = nil
+	worker1.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.ErrSegcore)
+	s.delegator.ProcessDelete([]*DeleteData{
+		{
+			PartitionID: 500,
+			PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
+			Timestamps:  []uint64{10},
+			RowCount:    1,
+		},
+	}, 10)
+	s.False(s.delegator.distribution.Serviceable(), "should retry and fail")
+
+	// refresh
+	worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
+		Return(nil)
+	// reload, refresh the state
+	s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
+		Base:         commonpbutil.NewMsgBase(),
+		DstNodeID:    1,
+		CollectionID: s.collectionID,
+		Infos: []*querypb.SegmentLoadInfo{
+			{
+				SegmentID:     1000,
+				CollectionID:  s.collectionID,
+				PartitionID:   500,
+				StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
+				DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
+			},
+		},
+		Version: 2,
+	})
+	s.Require().NoError(err)
+	s.True(s.delegator.distribution.Serviceable())
+
+	s.delegator.Close()
+	s.delegator.ProcessDelete([]*DeleteData{
+		{
+			PartitionID: 500,
+			PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
+			Timestamps:  []uint64{10},
+			RowCount:    1,
+		},
+	}, 10)
+	s.Require().NoError(err)
+	s.False(s.delegator.distribution.Serviceable())
 }
 
 func (s *DelegatorDataSuite) TestLoadSegments() {
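
Note on the pattern (editorial addition, not part of the patch): the core change in applyDelete is replacing retry.Do plus retry.Unrecoverable(err) with retry.Handle, whose callback returns a (shouldRetry, err) pair so retryability is stated explicitly rather than by wrapping errors. The sketch below is a minimal, self-contained approximation of that contract; retryHandle, errNodeNotFound, and errTransient are hypothetical names chosen for illustration, not Milvus APIs, and Milvus' real retry helper may differ in its options and backoff behavior.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryHandle is a hypothetical, simplified stand-in for the retry.Handle
// helper used in the patch: the callback reports (shouldRetry, err), and the
// loop stops on success, on a non-retryable error, when attempts run out, or
// when ctx is cancelled.
func retryHandle(ctx context.Context, attempts int, interval time.Duration, fn func() (bool, error)) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		shouldRetry, err := fn()
		if err == nil {
			return nil
		}
		lastErr = err
		if !shouldRetry {
			// Terminal for this call, but the error is returned as-is
			// instead of being wrapped as "unrecoverable".
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
	return lastErr
}

// Hypothetical sentinel errors standing in for merr.ErrNodeNotFound and a
// transient worker failure; they are not Milvus APIs.
var (
	errNodeNotFound = errors.New("node not found")
	errTransient    = errors.New("transient delete failure")
)

func main() {
	calls := 0
	err := retryHandle(context.Background(), 10, 10*time.Millisecond, func() (bool, error) {
		calls++
		if calls < 3 {
			return true, errTransient // retryable: ask the helper to try again
		}
		return false, errNodeNotFound // not retryable: stop immediately
	})
	fmt.Printf("calls=%d err=%v\n", calls, err)
}

Returning the retry decision explicitly keeps transient worker failures (such as a failed Delete call) retryable, while genuinely terminal states (node gone, delegator stopped) fail fast without being escalated to unrecoverable errors.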