diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index 965c364c87..3a13216a20 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" @@ -29,7 +28,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/merr" ) const ( @@ -130,24 +128,6 @@ func (s *importScheduler) process() { } } -func (s *importScheduler) checkErr(task ImportTask, err error) { - if merr.IsRetryableErr(err) || merr.IsCanceledOrTimeout(err) || errors.Is(err, merr.ErrNodeNotFound) { - err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending)) - if err != nil { - log.Warn("failed to update import task state to pending", WrapTaskLog(task, zap.Error(err))...) - return - } - log.Info("reset task state to pending due to error occurs", WrapTaskLog(task, zap.Error(err))...) - } else { - err = s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) - if err != nil { - log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(err)) - return - } - log.Info("import task failed", WrapTaskLog(task, zap.Error(err))...) - } -} - func (s *importScheduler) peekSlots() map[int64]int64 { nodeIDs := lo.Map(s.cluster.GetSessions(), func(s *Session, _ int) int64 { return s.info.NodeID @@ -229,8 +209,11 @@ func (s *importScheduler) processInProgressPreImport(task ImportTask) { } resp, err := s.cluster.QueryPreImport(task.GetNodeID(), req) if err != nil { - log.Warn("query preimport failed", WrapTaskLog(task, zap.Error(err))...) - s.checkErr(task, err) + updateErr := s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending)) + if updateErr != nil { + log.Warn("failed to update preimport task state to pending", WrapTaskLog(task, zap.Error(updateErr))...) + } + log.Info("reset preimport task state to pending due to error occurs", WrapTaskLog(task, zap.Error(err))...) return } if resp.GetState() == datapb.ImportTaskStateV2_Failed { @@ -262,8 +245,11 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { } resp, err := s.cluster.QueryImport(task.GetNodeID(), req) if err != nil { - log.Warn("query import failed", WrapTaskLog(task, zap.Error(err))...) - s.checkErr(task, err) + updateErr := s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending)) + if updateErr != nil { + log.Warn("failed to update import task state to pending", WrapTaskLog(task, zap.Error(updateErr))...) + } + log.Info("reset import task state to pending due to error occurs", WrapTaskLog(task, zap.Error(err))...) return } if resp.GetState() == datapb.ImportTaskStateV2_Failed { diff --git a/internal/datacoord/import_scheduler_test.go b/internal/datacoord/import_scheduler_test.go index 5e521ccb3f..20356d5a37 100644 --- a/internal/datacoord/import_scheduler_test.go +++ b/internal/datacoord/import_scheduler_test.go @@ -21,7 +21,6 @@ import ( "math" "testing" - "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -29,8 +28,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/pkg/util/merr" ) type ImportSchedulerSuite struct { @@ -73,57 +70,6 @@ func (s *ImportSchedulerSuite) SetupTest() { s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta).(*importScheduler) } -func (s *ImportSchedulerSuite) TestCheckErr() { - s.catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil) - s.catalog.EXPECT().SavePreImportTask(mock.Anything).Return(nil) - - var job ImportJob = &importJob{ - ImportJob: &datapb.ImportJob{ - JobID: 0, - CollectionID: s.collectionID, - TimeoutTs: math.MaxUint64, - Schema: &schemapb.CollectionSchema{}, - State: internalpb.ImportJobState_Pending, - }, - } - err := s.imeta.AddJob(job) - s.NoError(err) - var task ImportTask = &preImportTask{ - PreImportTask: &datapb.PreImportTask{ - JobID: 0, - TaskID: 1, - CollectionID: s.collectionID, - State: datapb.ImportTaskStateV2_InProgress, - }, - } - err = s.imeta.AddTask(task) - s.NoError(err) - - // checkErr and update state - s.scheduler.checkErr(task, merr.ErrNodeNotFound) - task = s.imeta.GetTask(task.GetTaskID()) - s.Equal(datapb.ImportTaskStateV2_Pending, task.GetState()) - - s.scheduler.checkErr(task, errors.New("mock err")) - job = s.imeta.GetJob(job.GetJobID()) - s.Equal(internalpb.ImportJobState_Failed, job.GetState()) - - // update state failed - err = s.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending)) - s.NoError(err) - err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_None)) - s.NoError(err) - s.catalog.ExpectedCalls = nil - s.catalog.EXPECT().SaveImportJob(mock.Anything).Return(errors.New("mock err")) - s.catalog.EXPECT().SavePreImportTask(mock.Anything).Return(errors.New("mock err")) - - s.scheduler.checkErr(task, merr.ErrNodeNotFound) - s.Equal(datapb.ImportTaskStateV2_None, s.imeta.GetTask(task.GetTaskID()).GetState()) - - s.scheduler.checkErr(task, errors.New("mock err")) - s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(job.GetJobID()).GetState()) -} - func (s *ImportSchedulerSuite) TestProcessPreImport() { s.catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil) s.catalog.EXPECT().SavePreImportTask(mock.Anything).Return(nil)