mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
Record only failed task error (#26033)
Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
037a58a60d
commit
2180ef180c
@ -99,7 +99,6 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio
|
|||||||
msg := "show collection failed"
|
msg := "show collection failed"
|
||||||
log.Warn(msg, zap.Error(err))
|
log.Warn(msg, zap.Error(err))
|
||||||
status := merr.Status(errors.Wrap(err, msg))
|
status := merr.Status(errors.Wrap(err, msg))
|
||||||
status.ErrorCode = commonpb.ErrorCode_InsufficientMemoryToLoad
|
|
||||||
return &querypb.ShowCollectionsResponse{
|
return &querypb.ShowCollectionsResponse{
|
||||||
Status: status,
|
Status: status,
|
||||||
}, nil
|
}, nil
|
||||||
@ -156,7 +155,6 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
|
|||||||
err := meta.GlobalFailedLoadCache.Get(req.GetCollectionID())
|
err := meta.GlobalFailedLoadCache.Get(req.GetCollectionID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
status := merr.Status(err)
|
status := merr.Status(err)
|
||||||
status.ErrorCode = commonpb.ErrorCode_InsufficientMemoryToLoad
|
|
||||||
log.Warn("show partition failed", zap.Error(err))
|
log.Warn("show partition failed", zap.Error(err))
|
||||||
return &querypb.ShowPartitionsResponse{
|
return &querypb.ShowPartitionsResponse{
|
||||||
Status: status,
|
Status: status,
|
||||||
|
|||||||
@ -147,7 +147,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
|
|||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
for i := range mergeTask.tasks {
|
for i := range mergeTask.tasks {
|
||||||
mergeTask.tasks[i].Cancel(err)
|
mergeTask.tasks[i].Fail(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := range mergeTask.tasks {
|
for i := range mergeTask.tasks {
|
||||||
@ -235,7 +235,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
|
|||||||
var err error
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
task.Cancel(err)
|
task.Fail(err)
|
||||||
ex.removeTask(task, step)
|
ex.removeTask(task, step)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -396,7 +396,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
|
|||||||
var err error
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
task.Cancel(err)
|
task.Fail(err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -479,7 +479,7 @@ func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
|
|||||||
var err error
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
task.Cancel(err)
|
task.Fail(err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|||||||
@ -675,6 +675,14 @@ func (scheduler *taskScheduler) RemoveByNode(node int64) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) {
|
func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) {
|
||||||
|
log.Warn("task scheduler recordSegmentTaskError",
|
||||||
|
zap.Int64("taskID", task.ID()),
|
||||||
|
zap.Int64("collectionID", task.CollectionID()),
|
||||||
|
zap.Int64("replicaID", task.ReplicaID()),
|
||||||
|
zap.Int64("segmentID", task.SegmentID()),
|
||||||
|
zap.Int32("taskStatus", task.Status()),
|
||||||
|
zap.Error(task.err),
|
||||||
|
)
|
||||||
meta.GlobalFailedLoadCache.Put(task.collectionID, task.Err())
|
meta.GlobalFailedLoadCache.Put(task.collectionID, task.Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -695,8 +703,7 @@ func (scheduler *taskScheduler) remove(task Task) {
|
|||||||
index := NewReplicaSegmentIndex(task)
|
index := NewReplicaSegmentIndex(task)
|
||||||
delete(scheduler.segmentTasks, index)
|
delete(scheduler.segmentTasks, index)
|
||||||
log = log.With(zap.Int64("segmentID", task.SegmentID()))
|
log = log.With(zap.Int64("segmentID", task.SegmentID()))
|
||||||
if task.Err() != nil && !errors.Is(task.Err(), merr.ErrChannelNotFound) {
|
if task.Status() == TaskStatusFailed && task.Err() != nil && !errors.Is(task.Err(), merr.ErrChannelNotFound) {
|
||||||
log.Warn("task scheduler recordSegmentTaskError", zap.Error(task.err))
|
|
||||||
scheduler.recordSegmentTaskError(task)
|
scheduler.recordSegmentTaskError(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -37,6 +37,7 @@ const (
|
|||||||
TaskStatusStarted
|
TaskStatusStarted
|
||||||
TaskStatusSucceeded
|
TaskStatusSucceeded
|
||||||
TaskStatusCanceled
|
TaskStatusCanceled
|
||||||
|
TaskStatusFailed
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -74,7 +75,11 @@ type Task interface {
|
|||||||
SetPriority(priority Priority)
|
SetPriority(priority Priority)
|
||||||
Index() string // dedup indexing string
|
Index() string // dedup indexing string
|
||||||
|
|
||||||
|
// cancel the task as we don't need to continue it
|
||||||
Cancel(err error)
|
Cancel(err error)
|
||||||
|
// fail the task as we encounter some error so be unable to continue,
|
||||||
|
// this error will be recorded for response to user requests
|
||||||
|
Fail(err error)
|
||||||
Wait() error
|
Wait() error
|
||||||
Actions() []Action
|
Actions() []Action
|
||||||
Step() int
|
Step() int
|
||||||
@ -191,6 +196,17 @@ func (task *baseTask) Cancel(err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (task *baseTask) Fail(err error) {
|
||||||
|
if task.canceled.CompareAndSwap(false, true) {
|
||||||
|
task.cancel()
|
||||||
|
if task.Status() != TaskStatusSucceeded {
|
||||||
|
task.SetStatus(TaskStatusFailed)
|
||||||
|
}
|
||||||
|
task.err = err
|
||||||
|
close(task.doneCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (task *baseTask) Wait() error {
|
func (task *baseTask) Wait() error {
|
||||||
<-task.doneCh
|
<-task.doneCh
|
||||||
return task.err
|
return task.err
|
||||||
|
|||||||
@ -621,7 +621,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
|
|||||||
suite.AssertTaskNum(0, 0, 0, 0)
|
suite.AssertTaskNum(0, 0, 0, 0)
|
||||||
|
|
||||||
for _, task := range tasks {
|
for _, task := range tasks {
|
||||||
suite.Equal(TaskStatusCanceled, task.Status())
|
suite.Equal(TaskStatusFailed, task.Status())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -90,6 +90,8 @@ func oldCode(code int32) commonpb.ErrorCode {
|
|||||||
return commonpb.ErrorCode_MetaFailed
|
return commonpb.ErrorCode_MetaFailed
|
||||||
case ErrReplicaNotAvailable.code(), ErrChannelNotAvailable.code(), ErrNodeNotAvailable.code():
|
case ErrReplicaNotAvailable.code(), ErrChannelNotAvailable.code(), ErrNodeNotAvailable.code():
|
||||||
return commonpb.ErrorCode_NoReplicaAvailable
|
return commonpb.ErrorCode_NoReplicaAvailable
|
||||||
|
case ErrServiceMemoryLimitExceeded.code():
|
||||||
|
return commonpb.ErrorCode_InsufficientMemoryToLoad
|
||||||
default:
|
default:
|
||||||
return commonpb.ErrorCode_UnexpectedError
|
return commonpb.ErrorCode_UnexpectedError
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user