diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index d76ccb8d72..432ce5ab72 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -99,7 +99,6 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio msg := "show collection failed" log.Warn(msg, zap.Error(err)) status := merr.Status(errors.Wrap(err, msg)) - status.ErrorCode = commonpb.ErrorCode_InsufficientMemoryToLoad return &querypb.ShowCollectionsResponse{ Status: status, }, nil @@ -156,7 +155,6 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions err := meta.GlobalFailedLoadCache.Get(req.GetCollectionID()) if err != nil { status := merr.Status(err) - status.ErrorCode = commonpb.ErrorCode_InsufficientMemoryToLoad log.Warn("show partition failed", zap.Error(err)) return &querypb.ShowPartitionsResponse{ Status: status, diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 11b52a3ab3..c8e24fa15b 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -147,7 +147,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { defer func() { if err != nil { for i := range mergeTask.tasks { - mergeTask.tasks[i].Cancel(err) + mergeTask.tasks[i].Fail(err) } } for i := range mergeTask.tasks { @@ -235,7 +235,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { var err error defer func() { if err != nil { - task.Cancel(err) + task.Fail(err) ex.removeTask(task, step) } }() @@ -396,7 +396,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error { var err error defer func() { if err != nil { - task.Cancel(err) + task.Fail(err) } }() @@ -479,7 +479,7 @@ func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error { var err error defer func() { if err != nil { - task.Cancel(err) + task.Fail(err) } }() diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 44403b5dfe..7b891fc2d4 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -675,6 +675,14 @@ func (scheduler *taskScheduler) RemoveByNode(node int64) { } func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) { + log.Warn("task scheduler recordSegmentTaskError", + zap.Int64("taskID", task.ID()), + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), + zap.Int64("segmentID", task.SegmentID()), + zap.Int32("taskStatus", task.Status()), + zap.Error(task.err), + ) meta.GlobalFailedLoadCache.Put(task.collectionID, task.Err()) } @@ -695,8 +703,7 @@ func (scheduler *taskScheduler) remove(task Task) { index := NewReplicaSegmentIndex(task) delete(scheduler.segmentTasks, index) log = log.With(zap.Int64("segmentID", task.SegmentID())) - if task.Err() != nil && !errors.Is(task.Err(), merr.ErrChannelNotFound) { - log.Warn("task scheduler recordSegmentTaskError", zap.Error(task.err)) + if task.Status() == TaskStatusFailed && task.Err() != nil && !errors.Is(task.Err(), merr.ErrChannelNotFound) { scheduler.recordSegmentTaskError(task) } diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go index 72be3e6a25..128f549a99 100644 --- a/internal/querycoordv2/task/task.go +++ b/internal/querycoordv2/task/task.go @@ -37,6 +37,7 @@ const ( TaskStatusStarted TaskStatusSucceeded TaskStatusCanceled + TaskStatusFailed ) const ( @@ -74,7 +75,11 @@ type Task interface { SetPriority(priority Priority) Index() string // dedup indexing string + // cancel the task as we don't need to continue it Cancel(err error) + // fail the task as we encounter some error so be unable to continue, + // this error will be recorded for response to user requests + Fail(err error) Wait() error Actions() []Action Step() int @@ -191,6 +196,17 @@ func (task *baseTask) Cancel(err error) { } } +func (task *baseTask) Fail(err error) { + if task.canceled.CompareAndSwap(false, true) { + task.cancel() + if task.Status() != TaskStatusSucceeded { + task.SetStatus(TaskStatusFailed) + } + task.err = err + close(task.doneCh) + } +} + func (task *baseTask) Wait() error { <-task.doneCh return task.err diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 8ee9cf1413..c5fe17d5e9 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -621,7 +621,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() { suite.AssertTaskNum(0, 0, 0, 0) for _, task := range tasks { - suite.Equal(TaskStatusCanceled, task.Status()) + suite.Equal(TaskStatusFailed, task.Status()) } } diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 1f6f17f78e..093830dacc 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -90,6 +90,8 @@ func oldCode(code int32) commonpb.ErrorCode { return commonpb.ErrorCode_MetaFailed case ErrReplicaNotAvailable.code(), ErrChannelNotAvailable.code(), ErrNodeNotAvailable.code(): return commonpb.ErrorCode_NoReplicaAvailable + case ErrServiceMemoryLimitExceeded.code(): + return commonpb.ErrorCode_InsufficientMemoryToLoad default: return commonpb.ErrorCode_UnexpectedError }