From b5e62023ea6d8181dc2a2dc4781344a4809f89f3 Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 16 Jun 2022 13:02:10 +0800 Subject: [PATCH] Return error when bulkload pending list hit limit (#17570) Signed-off-by: groot --- internal/rootcoord/import_manager.go | 10 ++++------ internal/rootcoord/import_manager_test.go | 7 +++++++ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index 996b8e10c8..2d70bccc92 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -282,13 +282,11 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque taskCount = len(req.Files) } - // task queue size has a limit, return error if import request contains too many data files + // task queue size has a limit, return error if import request contains too many data files, and skip entire job if capacity-length < taskCount { - resp.Status = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IllegalArgument, - Reason: "Import task queue max size is " + strconv.Itoa(capacity) + ", currently there are " + strconv.Itoa(length) + " tasks is pending. Not able to execute this request with " + strconv.Itoa(taskCount) + " tasks.", - } - return + err = fmt.Errorf("import task queue max size is %v, currently there are %v tasks is pending. Not able to execute this request with %v tasks", capacity, length, taskCount) + log.Error(err.Error()) + return err } bucket := "" diff --git a/internal/rootcoord/import_manager_test.go b/internal/rootcoord/import_manager_test.go index ac034036be..e5246d7ba4 100644 --- a/internal/rootcoord/import_manager_test.go +++ b/internal/rootcoord/import_manager_test.go @@ -19,6 +19,7 @@ package rootcoord import ( "context" "errors" + "strconv" "sync" "testing" "time" @@ -260,6 +261,12 @@ func TestImportManager_ImportJob(t *testing.T) { resp = mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, len(rowReq.Files)-2, len(mgr.pendingTasks)) assert.Equal(t, 2, len(mgr.workingTasks)) + + for i := 0; i <= 32; i++ { + rowReq.Files = append(rowReq.Files, strconv.Itoa(i)) + } + resp = mgr.importJob(context.TODO(), rowReq, colID, 0) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) } func TestImportManager_AllDataNodesBusy(t *testing.T) {