diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index a1c86cc560..fe75da1639 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -341,6 +341,9 @@ func (c *importChecker) checkCollection(collectionID int64, jobs []ImportJob) { return } if !has { + jobs = lo.Filter(jobs, func(job ImportJob, _ int) bool { + return job.GetState() != internalpb.ImportJobState_Failed + }) for _, job := range jobs { err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(fmt.Sprintf("collection %d dropped", collectionID))) @@ -388,6 +391,8 @@ func (c *importChecker) checkGC(job ImportJob) { err := c.imeta.RemoveJob(job.GetJobID()) if err != nil { log.Warn("remove import job failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + return } + log.Info("import job removed", zap.Int64("jobID", job.GetJobID())) } } diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index f1cf30003c..5f042de5db 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -351,9 +351,11 @@ func (s *importScheduler) processFailed(task ImportTask) { return } } - err := s.imeta.UpdateTask(task.GetTaskID(), UpdateSegmentIDs(nil)) - if err != nil { - log.Warn("update import task segments failed", WrapTaskLog(task, zap.Error(err))...) + if len(segments) > 0 { + err := s.imeta.UpdateTask(task.GetTaskID(), UpdateSegmentIDs(nil)) + if err != nil { + log.Warn("update import task segments failed", WrapTaskLog(task, zap.Error(err))...) + } } } err := DropImportTask(task, s.cluster, s.imeta)