enhance: Allow import tasks to retry for more errors (#31268)

Allow import tasks to retry for a wider range of errors, including all
gRPC errors and unexpected status codes from Milvus.

issue: https://github.com/milvus-io/milvus/issues/31227,
https://github.com/milvus-io/milvus/issues/28521

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
yihao.dai 2024-03-15 11:05:04 +08:00 committed by GitHub
parent 811316d2ba
commit 2b035ba2d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 10 additions and 78 deletions

View File

@ -21,7 +21,6 @@ import (
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
@ -29,7 +28,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
)
const (
@ -130,24 +128,6 @@ func (s *importScheduler) process() {
}
}
// checkErr reacts to an error returned while processing an import task.
//
// If err is retryable, a cancellation/timeout, or merr.ErrNodeNotFound, the
// task is put back into the Pending state. Any other error marks the job that
// owns the task as Failed, using err's message as the failure reason.
//
// err must be non-nil: the failure branch calls err.Error().
func (s *importScheduler) checkErr(task ImportTask, err error) {
	if merr.IsRetryableErr(err) || merr.IsCanceledOrTimeout(err) || errors.Is(err, merr.ErrNodeNotFound) {
		// Keep the result of the metadata update in its own variable so the
		// original cause in err is still available for the log line below.
		updateErr := s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending))
		if updateErr != nil {
			log.Warn("failed to update import task state to pending", WrapTaskLog(task, zap.Error(updateErr))...)
			return
		}
		log.Info("reset task state to pending due to error occurs", WrapTaskLog(task, zap.Error(err))...)
		return
	}

	// Non-retryable error: fail the whole job and record why.
	updateErr := s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
	if updateErr != nil {
		log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(updateErr))
		return
	}
	log.Info("import task failed", WrapTaskLog(task, zap.Error(err))...)
}
func (s *importScheduler) peekSlots() map[int64]int64 {
nodeIDs := lo.Map(s.cluster.GetSessions(), func(s *Session, _ int) int64 {
return s.info.NodeID
@ -229,8 +209,11 @@ func (s *importScheduler) processInProgressPreImport(task ImportTask) {
}
resp, err := s.cluster.QueryPreImport(task.GetNodeID(), req)
if err != nil {
log.Warn("query preimport failed", WrapTaskLog(task, zap.Error(err))...)
s.checkErr(task, err)
updateErr := s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending))
if updateErr != nil {
log.Warn("failed to update preimport task state to pending", WrapTaskLog(task, zap.Error(updateErr))...)
}
log.Info("reset preimport task state to pending due to error occurs", WrapTaskLog(task, zap.Error(err))...)
return
}
if resp.GetState() == datapb.ImportTaskStateV2_Failed {
@ -262,8 +245,11 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
}
resp, err := s.cluster.QueryImport(task.GetNodeID(), req)
if err != nil {
log.Warn("query import failed", WrapTaskLog(task, zap.Error(err))...)
s.checkErr(task, err)
updateErr := s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending))
if updateErr != nil {
log.Warn("failed to update import task state to pending", WrapTaskLog(task, zap.Error(updateErr))...)
}
log.Info("reset import task state to pending due to error occurs", WrapTaskLog(task, zap.Error(err))...)
return
}
if resp.GetState() == datapb.ImportTaskStateV2_Failed {

View File

@ -21,7 +21,6 @@ import (
"math"
"testing"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@ -29,8 +28,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/util/merr"
)
type ImportSchedulerSuite struct {
@ -73,57 +70,6 @@ func (s *ImportSchedulerSuite) SetupTest() {
s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta).(*importScheduler)
}
// TestCheckErr exercises importScheduler.checkErr in two phases: first with a
// catalog that accepts every write, then with a catalog that rejects every
// write, checking the resulting task and job states after each call.
func (s *ImportSchedulerSuite) TestCheckErr() {
	// Phase 0: catalog writes succeed so the fixtures can be registered.
	s.catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil)
	s.catalog.EXPECT().SavePreImportTask(mock.Anything).Return(nil)

	var job ImportJob = &importJob{
		ImportJob: &datapb.ImportJob{
			JobID:        0,
			CollectionID: s.collectionID,
			TimeoutTs:    math.MaxUint64,
			Schema:       &schemapb.CollectionSchema{},
			State:        internalpb.ImportJobState_Pending,
		},
	}
	s.NoError(s.imeta.AddJob(job))

	var task ImportTask = &preImportTask{
		PreImportTask: &datapb.PreImportTask{
			JobID:        0,
			TaskID:       1,
			CollectionID: s.collectionID,
			State:        datapb.ImportTaskStateV2_InProgress,
		},
	}
	s.NoError(s.imeta.AddTask(task))

	// Phase 1: checkErr and update state.
	// A node-not-found error should send the task back to Pending.
	s.scheduler.checkErr(task, merr.ErrNodeNotFound)
	task = s.imeta.GetTask(task.GetTaskID())
	s.Equal(datapb.ImportTaskStateV2_Pending, task.GetState())

	// An arbitrary error should fail the job.
	s.scheduler.checkErr(task, errors.New("mock err"))
	job = s.imeta.GetJob(job.GetJobID())
	s.Equal(internalpb.ImportJobState_Failed, job.GetState())

	// Phase 2: update state failed.
	// Reset job and task to known states while the catalog still accepts writes.
	s.NoError(s.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending)))
	s.NoError(s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_None)))

	// Swap the expectations so every catalog write now fails.
	s.catalog.ExpectedCalls = nil
	s.catalog.EXPECT().SaveImportJob(mock.Anything).Return(errors.New("mock err"))
	s.catalog.EXPECT().SavePreImportTask(mock.Anything).Return(errors.New("mock err"))

	// With failing writes, the task state must stay untouched.
	s.scheduler.checkErr(task, merr.ErrNodeNotFound)
	taskState := s.imeta.GetTask(task.GetTaskID()).GetState()
	s.Equal(datapb.ImportTaskStateV2_None, taskState)

	// Likewise the job state must stay untouched.
	s.scheduler.checkErr(task, errors.New("mock err"))
	jobState := s.imeta.GetJob(job.GetJobID()).GetState()
	s.Equal(internalpb.ImportJobState_Pending, jobState)
}
func (s *ImportSchedulerSuite) TestProcessPreImport() {
s.catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil)
s.catalog.EXPECT().SavePreImportTask(mock.Anything).Return(nil)