mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: remove some unnecessary unrecoverable errors (#31327)
Use retry.Handle when a request cannot be serviced, instead of throwing unrecoverable errors. Fixes #31323. Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
66d679ecbb
commit
a63b4cedcf
@ -778,9 +778,6 @@ func saveSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest, res *rootcoo
|
||||
})
|
||||
// Only retrying when DataCoord is unhealthy or err != nil, otherwise return immediately.
|
||||
if err != nil {
|
||||
if errors.Is(err, merr.ErrServiceNotReady) {
|
||||
return retry.Unrecoverable(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
@ -272,9 +273,9 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
|
||||
delRecord, ok := delRecords[segmentEntry.SegmentID]
|
||||
if ok {
|
||||
log.Debug("delegator plan to applyDelete via worker")
|
||||
err := retry.Do(ctx, func() error {
|
||||
err := retry.Handle(ctx, func() (bool, error) {
|
||||
if sd.Stopped() {
|
||||
return retry.Unrecoverable(merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing"))
|
||||
return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
|
||||
}
|
||||
|
||||
err := worker.Delete(ctx, &querypb.DeleteRequest{
|
||||
@ -289,17 +290,15 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
|
||||
})
|
||||
if errors.Is(err, merr.ErrNodeNotFound) {
|
||||
log.Warn("try to delete data on non-exist node")
|
||||
return retry.Unrecoverable(err)
|
||||
return false, err
|
||||
} else if errors.IsAny(err, merr.ErrSegmentNotFound, merr.ErrSegmentNotLoaded) {
|
||||
log.Warn("try to delete data of released segment")
|
||||
return nil
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
log.Warn("worker failed to delete on segment",
|
||||
zap.Error(err),
|
||||
)
|
||||
return err
|
||||
log.Warn("worker failed to delete on segment", zap.Error(err))
|
||||
return true, err
|
||||
}
|
||||
return nil
|
||||
return false, nil
|
||||
}, retry.Attempts(10))
|
||||
if err != nil {
|
||||
log.Warn("apply delete for segment failed, marking it offline")
|
||||
@ -712,6 +711,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ts = time.Now()
|
||||
err = stream.Seek(context.TODO(), []*msgpb.MsgPosition{position})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -761,7 +761,7 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("successfully read delete from stream ", zap.Duration("time spent", time.Since(ts)))
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
||||
@ -361,6 +361,73 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
|
||||
}, 10)
|
||||
|
||||
s.False(s.delegator.distribution.Serviceable())
|
||||
|
||||
worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
||||
Return(nil)
|
||||
// reload, refresh the state
|
||||
s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
||||
Base: commonpbutil.NewMsgBase(),
|
||||
DstNodeID: 1,
|
||||
CollectionID: s.collectionID,
|
||||
Infos: []*querypb.SegmentLoadInfo{
|
||||
{
|
||||
SegmentID: 1000,
|
||||
CollectionID: s.collectionID,
|
||||
PartitionID: 500,
|
||||
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
||||
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
||||
},
|
||||
},
|
||||
Version: 1,
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
s.True(s.delegator.distribution.Serviceable())
|
||||
// Test normal errors with retry and fail
|
||||
worker1.ExpectedCalls = nil
|
||||
worker1.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.ErrSegcore)
|
||||
s.delegator.ProcessDelete([]*DeleteData{
|
||||
{
|
||||
PartitionID: 500,
|
||||
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
|
||||
Timestamps: []uint64{10},
|
||||
RowCount: 1,
|
||||
},
|
||||
}, 10)
|
||||
s.False(s.delegator.distribution.Serviceable(), "should retry and failed")
|
||||
|
||||
// refresh
|
||||
worker1.EXPECT().LoadSegments(mock.Anything, mock.AnythingOfType("*querypb.LoadSegmentsRequest")).
|
||||
Return(nil)
|
||||
// reload, refresh the state
|
||||
s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
|
||||
Base: commonpbutil.NewMsgBase(),
|
||||
DstNodeID: 1,
|
||||
CollectionID: s.collectionID,
|
||||
Infos: []*querypb.SegmentLoadInfo{
|
||||
{
|
||||
SegmentID: 1000,
|
||||
CollectionID: s.collectionID,
|
||||
PartitionID: 500,
|
||||
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
||||
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
|
||||
},
|
||||
},
|
||||
Version: 2,
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
s.True(s.delegator.distribution.Serviceable())
|
||||
|
||||
s.delegator.Close()
|
||||
s.delegator.ProcessDelete([]*DeleteData{
|
||||
{
|
||||
PartitionID: 500,
|
||||
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
|
||||
Timestamps: []uint64{10},
|
||||
RowCount: 1,
|
||||
},
|
||||
}, 10)
|
||||
s.Require().NoError(err)
|
||||
s.False(s.delegator.distribution.Serviceable())
|
||||
}
|
||||
|
||||
func (s *DelegatorDataSuite) TestLoadSegments() {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user