From b5c67948b7b5199ecab57bfe79edc606c9e04438 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Wed, 13 Mar 2024 19:51:03 +0800 Subject: [PATCH] enhance: Enhance and modify the return content of ImportV2 (#31192) 1. The Import APIs now provide detailed progress information for each imported file, including details such as file name, file size, progress, and more. 2. The APIs now return the collection name and the completion time. 3. Other modifications include changing jobID to jobId and other similar adjustments. issue: https://github.com/milvus-io/milvus/issues/28521 --------- Signed-off-by: bigsheeper --- internal/datacoord/import_checker.go | 3 +- internal/datacoord/import_job.go | 8 ++++ internal/datacoord/import_scheduler.go | 3 +- internal/datacoord/import_task.go | 8 ++++ internal/datacoord/import_util.go | 27 +++++++++++++- internal/datacoord/import_util_test.go | 14 +++---- internal/datacoord/services.go | 30 +++++++++------ internal/datanode/importv2/executor.go | 10 ++++- internal/datanode/importv2/executor_test.go | 9 ++++- internal/datanode/importv2/task.go | 2 + internal/datanode/importv2/util.go | 19 ++++++++++ .../proxy/httpserver/handler_v2.go | 37 +++++++++++++++---- .../proxy/httpserver/handler_v2_test.go | 7 ++-- .../proxy/httpserver/request_v2.go | 15 +++++++- internal/proto/data_coord.proto | 21 ++++++----- internal/proto/internal.proto | 23 +++++++++--- internal/proxy/impl.go | 13 ++++--- 17 files changed, 192 insertions(+), 57 deletions(-) diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 775184c5da..54fafd7b9a 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -290,7 +290,8 @@ func (c *importChecker) checkImportingJob(job ImportJob) { } } - err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed)) + completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") + err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime)) if err != nil { log.Warn("failed to update job state to Completed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) } diff --git a/internal/datacoord/import_job.go b/internal/datacoord/import_job.go index 49790f4a89..2d82d27763 100644 --- a/internal/datacoord/import_job.go +++ b/internal/datacoord/import_job.go @@ -55,9 +55,16 @@ func UpdateJobReason(reason string) UpdateJobAction { } } +func UpdateJobCompleteTime(completeTime string) UpdateJobAction { + return func(job ImportJob) { + job.(*importJob).ImportJob.CompleteTime = completeTime + } +} + type ImportJob interface { GetJobID() int64 GetCollectionID() int64 + GetCollectionName() string GetPartitionIDs() []int64 GetVchannels() []string GetSchema() *schemapb.CollectionSchema @@ -65,6 +72,7 @@ type ImportJob interface { GetCleanupTs() uint64 GetState() internalpb.ImportJobState GetReason() string + GetCompleteTime() string GetFiles() []*internalpb.ImportFile GetOptions() []*commonpb.KeyValuePair Clone() ImportJob diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index 08f7d17b16..965c364c87 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -303,7 +303,8 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { return } } - err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) + completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") + err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime)) if err != nil { log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...) return diff --git a/internal/datacoord/import_task.go b/internal/datacoord/import_task.go index 57f2ffc77c..82d3c70b2f 100644 --- a/internal/datacoord/import_task.go +++ b/internal/datacoord/import_task.go @@ -87,6 +87,14 @@ func UpdateReason(reason string) UpdateAction { } } +func UpdateCompleteTime(completeTime string) UpdateAction { + return func(t ImportTask) { + if task, ok := t.(*importTask); ok { + task.ImportTaskV2.CompleteTime = completeTime + } + } +} + func UpdateNodeID(nodeID int64) UpdateAction { return func(t ImportTask) { switch t.GetType() { diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 92d8b98c5b..e6c7199282 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -321,7 +321,7 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 { return importingProgress*0.8 + completedProgress*0.2 } -func GetImportProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, string) { +func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, string) { job := imeta.GetJob(jobID) switch job.GetState() { case internalpb.ImportJobState_Pending: @@ -345,6 +345,31 @@ func GetImportProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, intern return 0, internalpb.ImportJobState_None, "unknown import job state" } +func GetTaskProgresses(jobID int64, imeta ImportMeta, meta *meta) []*internalpb.ImportTaskProgress { + progresses := make([]*internalpb.ImportTaskProgress, 0) + tasks := imeta.GetTaskBy(WithJob(jobID), WithType(ImportTaskType)) + for _, task := range tasks { + totalRows := lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 { + return file.GetTotalRows() + }) + importedRows := meta.GetSegmentsTotalCurrentRows(task.(*importTask).GetSegmentIDs()) + progress := int64(100) + if totalRows != 0 { + progress = int64(float32(importedRows) / float32(totalRows) * 100) + } + for _, fileStat := range task.GetFileStats() { + progresses = append(progresses, &internalpb.ImportTaskProgress{ + FileName: fileStat.GetImportFile().String(), + FileSize: fileStat.GetFileSize(), + Reason: task.GetReason(), + Progress: progress, + CompleteTime: task.(*importTask).GetCompleteTime(), + }) + } + } + return progresses +} + func DropImportTask(task ImportTask, cluster Cluster, tm ImportMeta) error { if task.GetNodeID() == NullNodeID { return nil diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 60e56168c5..7a6c53dcb0 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -448,7 +448,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // failed state err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(mockErr)) assert.NoError(t, err) - progress, state, reason := GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason := GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(0), progress) assert.Equal(t, internalpb.ImportJobState_Failed, state) assert.Equal(t, mockErr, reason) @@ -456,7 +456,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // pending state err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(10), progress) assert.Equal(t, internalpb.ImportJobState_Pending, state) assert.Equal(t, "", reason) @@ -464,7 +464,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // preImporting state err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(10+40), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -472,7 +472,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // importing state, segmentImportedRows/totalRows = 0.5 err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(10+40+40*0.5), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -494,7 +494,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { assert.NoError(t, err) err = meta.UpdateSegmentsInfo(UpdateImportedRows(22, 100)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(float32(10+40+40+10*2/6)), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -508,7 +508,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { assert.NoError(t, err) err = meta.UpdateSegmentsInfo(UpdateIsImporting(22, false)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(10+40+40+10), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -516,7 +516,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // completed state err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(100), progress) assert.Equal(t, internalpb.ImportJobState_Completed, state) assert.Equal(t, "", reason) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index f0aa5f5670..91928d883c 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1819,16 +1819,17 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter job := &importJob{ ImportJob: &datapb.ImportJob{ - JobID: idStart, - CollectionID: in.GetCollectionID(), - PartitionIDs: in.GetPartitionIDs(), - Vchannels: in.GetChannelNames(), - Schema: in.GetSchema(), - TimeoutTs: timeoutTs, - CleanupTs: math.MaxUint64, - State: internalpb.ImportJobState_Pending, - Files: files, - Options: in.GetOptions(), + JobID: idStart, + CollectionID: in.GetCollectionID(), + CollectionName: in.GetCollectionName(), + PartitionIDs: in.GetPartitionIDs(), + Vchannels: in.GetChannelNames(), + Schema: in.GetSchema(), + TimeoutTs: timeoutTs, + CleanupTs: math.MaxUint64, + State: internalpb.ImportJobState_Pending, + Files: files, + Options: in.GetOptions(), }, } err = s.importMeta.AddJob(job) @@ -1858,10 +1859,14 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("parse job id failed, err=%w", err))) return resp, nil } - progress, state, reason := GetImportProgress(jobID, s.importMeta, s.meta) + job := s.importMeta.GetJob(jobID) + progress, state, reason := GetJobProgress(jobID, s.importMeta, s.meta) resp.State = state resp.Reason = reason resp.Progress = progress + resp.CollectionName = job.GetCollectionName() + resp.CompleteTime = job.GetCompleteTime() + resp.TaskProgresses = GetTaskProgresses(jobID, s.importMeta, s.meta) log.Info("GetImportProgress done", zap.Any("resp", resp)) return resp, nil } @@ -1889,11 +1894,12 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq } for _, job := range jobs { - progress, state, reason := GetImportProgress(job.GetJobID(), s.importMeta, s.meta) + progress, state, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta) resp.JobIDs = append(resp.JobIDs, fmt.Sprintf("%d", job.GetJobID())) resp.States = append(resp.States, state) resp.Reasons = append(resp.Reasons, reason) resp.Progresses = append(resp.Progresses, progress) + resp.CollectionNames = append(resp.CollectionNames, job.GetCollectionName()) } return resp, nil } diff --git a/internal/datanode/importv2/executor.go b/internal/datanode/importv2/executor.go index 46c7421dec..15501a611c 100644 --- a/internal/datanode/importv2/executor.go +++ b/internal/datanode/importv2/executor.go @@ -150,7 +150,7 @@ func (e *executor) PreImport(task Task) { } defer reader.Close() start := time.Now() - err = e.readFileStat(reader, task, i) + err = e.readFileStat(reader, task, i, file) if err != nil { e.handleErr(task, err, "preimport failed") return err @@ -180,7 +180,12 @@ func (e *executor) PreImport(task Task) { WrapLogFields(task, zap.Any("fileStats", task.(*PreImportTask).GetFileStats()))...) } -func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error { +func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int, file *internalpb.ImportFile) error { + fileSize, err := GetFileSize(file, e.cm) + if err != nil { + return err + } + totalRows := 0 totalSize := 0 hashedStats := make(map[string]*datapb.PartitionImportStats) @@ -209,6 +214,7 @@ func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx i } stat := &datapb.ImportFileStats{ + FileSize: fileSize, TotalRows: int64(totalRows), TotalMemorySize: int64(totalSize), HashedStats: hashedStats, diff --git a/internal/datanode/importv2/executor_test.go b/internal/datanode/importv2/executor_test.go index f8f315d039..c342706cae 100644 --- a/internal/datanode/importv2/executor_test.go +++ b/internal/datanode/importv2/executor_test.go @@ -260,6 +260,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport() { cm := mocks.NewChunkManager(s.T()) ioReader := strings.NewReader(string(bytes)) + cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) s.executor.cm = cm @@ -313,6 +314,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() { io.Seeker } ioReader := strings.NewReader(string(bytes)) + cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) s.executor.cm = cm @@ -461,6 +463,11 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() { importFile := &internalpb.ImportFile{ Paths: []string{"dummy.json"}, } + + cm := mocks.NewChunkManager(s.T()) + cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) + s.executor.cm = cm + var once sync.Once data := createInsertData(s.T(), s.schema, s.numRows) s.reader = importutilv2.NewMockReader(s.T()) @@ -485,7 +492,7 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() { } preimportTask := NewPreImportTask(preimportReq) s.manager.Add(preimportTask) - err := s.executor.readFileStat(s.reader, preimportTask, 0) + err := s.executor.readFileStat(s.reader, preimportTask, 0, importFile) s.NoError(err) } diff --git a/internal/datanode/importv2/task.go b/internal/datanode/importv2/task.go index d623c2f90d..f0b01ac15a 100644 --- a/internal/datanode/importv2/task.go +++ b/internal/datanode/importv2/task.go @@ -92,7 +92,9 @@ func UpdateReason(reason string) UpdateAction { func UpdateFileStat(idx int, fileStat *datapb.ImportFileStats) UpdateAction { return func(task Task) { if it, ok := task.(*PreImportTask); ok { + it.PreImportTask.FileStats[idx].FileSize = fileStat.GetFileSize() it.PreImportTask.FileStats[idx].TotalRows = fileStat.GetTotalRows() + it.PreImportTask.FileStats[idx].TotalMemorySize = fileStat.GetTotalMemorySize() it.PreImportTask.FileStats[idx].HashedStats = fileStat.GetHashedStats() } } diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index b9b99b7777..1005e3197e 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strconv" + "time" "github.com/samber/lo" "go.uber.org/zap" @@ -29,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" @@ -201,6 +203,23 @@ func GetInsertDataRowCount(data *storage.InsertData, schema *schemapb.Collection return 0 } +func GetFileSize(file *internalpb.ImportFile, cm storage.ChunkManager) (int64, error) { + fn := func(path string) (int64, error) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + return cm.Size(ctx, path) + } + var totalSize int64 = 0 + for _, path := range file.GetPaths() { + size, err := fn(path) + if err != nil { + return 0, err + } + totalSize += size + } + return totalSize, nil +} + func LogStats(manager TaskManager) { logFunc := func(tasks []Task, taskType TaskType) { byState := lo.GroupBy(tasks, func(t Task) datapb.ImportTaskStateV2 { diff --git a/internal/distributed/proxy/httpserver/handler_v2.go b/internal/distributed/proxy/httpserver/handler_v2.go index 3af1b57865..4e2c486d10 100644 --- a/internal/distributed/proxy/httpserver/handler_v2.go +++ b/internal/distributed/proxy/httpserver/handler_v2.go @@ -130,7 +130,7 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) { router.POST(AliasCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &AliasReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.dropAlias))))) router.POST(AliasCategory+AlterAction, timeoutMiddleware(wrapperPost(func() any { return &AliasCollectionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.alterAlias))))) - router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob))))) + router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &OptionalCollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob))))) router.POST(ImportJobCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &ImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createImportJob))))) router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &JobIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess))))) } @@ -1642,28 +1642,34 @@ func (h *HandlersV2) alterAlias(ctx context.Context, c *gin.Context, anyReq any, } func (h *HandlersV2) listImportJob(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { - collectionGetter := anyReq.(requestutil.CollectionNameGetter) + var collectionName string + if collectionGetter, ok := anyReq.(requestutil.CollectionNameGetter); ok { + collectionName = collectionGetter.GetCollectionName() + } req := &internalpb.ListImportsRequest{ DbName: dbName, - CollectionName: collectionGetter.GetCollectionName(), + CollectionName: collectionName, } resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { return h.proxy.ListImports(reqCtx, req.(*internalpb.ListImportsRequest)) }) if err == nil { - returnData := make([]map[string]interface{}, 0) + returnData := make(map[string]interface{}) + records := make([]map[string]interface{}, 0) response := resp.(*internalpb.ListImportsResponse) for i, jobID := range response.GetJobIDs() { jobDetail := make(map[string]interface{}) - jobDetail["jobID"] = jobID + jobDetail["jobId"] = jobID + jobDetail["collectionName"] = response.GetCollectionNames()[i] jobDetail["state"] = response.GetStates()[i].String() jobDetail["progress"] = response.GetProgresses()[i] reason := response.GetReasons()[i] if reason != "" { jobDetail["reason"] = reason } - returnData = append(returnData, jobDetail) + records = append(records, jobDetail) } + returnData["records"] = records c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData}) } return resp, err @@ -1690,7 +1696,7 @@ func (h *HandlersV2) createImportJob(ctx context.Context, c *gin.Context, anyReq }) if err == nil { returnData := make(map[string]interface{}) - returnData["jobID"] = resp.(*internalpb.ImportResponse).GetJobID() + returnData["jobId"] = resp.(*internalpb.ImportResponse).GetJobID() c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData}) } return resp, err @@ -1708,13 +1714,28 @@ func (h *HandlersV2) getImportJobProcess(ctx context.Context, c *gin.Context, an if err == nil { response := resp.(*internalpb.GetImportProgressResponse) returnData := make(map[string]interface{}) - returnData["jobID"] = jobIDGetter.GetJobID() + returnData["jobId"] = jobIDGetter.GetJobID() + returnData["collectionName"] = response.GetCollectionName() returnData["state"] = response.GetState().String() returnData["progress"] = response.GetProgress() reason := response.GetReason() if reason != "" { returnData["reason"] = reason } + details := make([]map[string]interface{}, 0) + for _, taskProgress := range response.GetTaskProgresses() { + detail := make(map[string]interface{}) + detail["fileName"] = taskProgress.GetFileName() + detail["fileSize"] = taskProgress.GetFileSize() + detail["progress"] = taskProgress.GetProgress() + detail["completeTime"] = taskProgress.GetCompleteTime() + reason = taskProgress.GetReason() + if reason != "" { + detail["reason"] = reason + } + details = append(details, detail) + } + returnData["details"] = details c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData}) } return resp, err diff --git a/internal/distributed/proxy/httpserver/handler_v2_test.go b/internal/distributed/proxy/httpserver/handler_v2_test.go index a7422caec9..ad0a88241a 100644 --- a/internal/distributed/proxy/httpserver/handler_v2_test.go +++ b/internal/distributed/proxy/httpserver/handler_v2_test.go @@ -898,8 +898,9 @@ func TestMethodPost(t *testing.T) { internalpb.ImportJobState_Failed, internalpb.ImportJobState_Completed, }, - Reasons: []string{"", "", "mock reason", ""}, - Progresses: []int64{0, 30, 0, 100}, + Reasons: []string{"", "", "mock reason", ""}, + Progresses: []int64{0, 30, 0, 100}, + CollectionNames: []string{"AAA", "BBB", "CCC", "DDD"}, }, nil).Once() mp.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(&internalpb.GetImportProgressResponse{ Status: &StatusSuccess, @@ -985,7 +986,7 @@ func TestMethodPost(t *testing.T) { `"userName": "` + util.UserRoot + `", "password": "Milvus", "newPassword": "milvus", "roleName": "` + util.RoleAdmin + `",` + `"roleName": "` + util.RoleAdmin + `", "objectType": "Global", "objectName": "*", "privilege": "*",` + `"aliasName": "` + DefaultAliasName + `",` + - `"jobID": "1234567890",` + + `"jobId": "1234567890",` + `"files": [["book.json"]]` + `}`)) req := httptest.NewRequest(http.MethodPost, testcase.path, bodyReader) diff --git a/internal/distributed/proxy/httpserver/request_v2.go b/internal/distributed/proxy/httpserver/request_v2.go index a4404a1b1f..0e2c130904 100644 --- a/internal/distributed/proxy/httpserver/request_v2.go +++ b/internal/distributed/proxy/httpserver/request_v2.go @@ -33,6 +33,19 @@ func (req *CollectionNameReq) GetPartitionNames() []string { return req.PartitionNames } +type OptionalCollectionNameReq struct { + DbName string `json:"dbName"` + CollectionName string `json:"collectionName"` +} + +func (req *OptionalCollectionNameReq) GetDbName() string { + return req.DbName +} + +func (req *OptionalCollectionNameReq) GetCollectionName() string { + return req.CollectionName +} + type RenameCollectionReq struct { DbName string `json:"dbName"` CollectionName string `json:"collectionName" binding:"required"` @@ -82,7 +95,7 @@ func (req *ImportReq) GetOptions() map[string]string { } type JobIDReq struct { - JobID string `json:"jobID" binding:"required"` + JobID string `json:"jobId" binding:"required"` } func (req *JobIDReq) GetJobID() string { return req.JobID } diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index ab50d983c3..ca4299d586 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -859,15 +859,17 @@ message ImportJob { int64 jobID = 1; int64 dbID = 2; int64 collectionID = 3; - repeated int64 partitionIDs = 4; - repeated string vchannels = 5; - schema.CollectionSchema schema = 6; - uint64 timeout_ts = 7; - uint64 cleanup_ts = 8; - internal.ImportJobState state = 9; - string reason = 10; - repeated internal.ImportFile files = 11; - repeated common.KeyValuePair options = 12; + string collection_name = 4; + repeated int64 partitionIDs = 5; + repeated string vchannels = 6; + schema.CollectionSchema schema = 7; + uint64 timeout_ts = 8; + uint64 cleanup_ts = 9; + internal.ImportJobState state = 10; + string reason = 11; + string complete_time = 12; + repeated internal.ImportFile files = 13; + repeated common.KeyValuePair options = 14; } enum ImportTaskStateV2 { @@ -896,6 +898,7 @@ message ImportTaskV2 { int64 nodeID = 5; ImportTaskStateV2 state = 6; string reason = 7; + string complete_time = 8; repeated ImportFileStats file_stats = 9; } diff --git a/internal/proto/internal.proto b/internal/proto/internal.proto index 3608cc3f73..ef42e11b24 100644 --- a/internal/proto/internal.proto +++ b/internal/proto/internal.proto @@ -280,11 +280,12 @@ message ImportFile { message ImportRequestInternal { int64 dbID = 1; int64 collectionID = 2; - repeated int64 partitionIDs = 3; - repeated string channel_names = 4; - schema.CollectionSchema schema = 5; - repeated ImportFile files = 6; - repeated common.KeyValuePair options = 7; + string collection_name = 3; + repeated int64 partitionIDs = 4; + repeated string channel_names = 5; + schema.CollectionSchema schema = 6; + repeated ImportFile files = 7; + repeated common.KeyValuePair options = 8; } message ImportRequest { @@ -305,11 +306,22 @@ message GetImportProgressRequest { string jobID = 2; } +message ImportTaskProgress { + string file_name = 1; + int64 file_size = 2; + string reason = 3; + int64 progress = 4; + string complete_time = 5; +} + message GetImportProgressResponse { common.Status status = 1; ImportJobState state = 2; string reason = 3; int64 progress = 4; + string collection_name = 5; + string complete_time = 6; + repeated ImportTaskProgress task_progresses = 7; } message ListImportsRequestInternal { @@ -328,4 +340,5 @@ message ListImportsResponse { repeated ImportJobState states = 3; repeated string reasons = 4; repeated int64 progresses = 5; + repeated string collection_names = 6; } diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index e0e0dd5502..81ba356fa4 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -5643,12 +5643,13 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) } } importRequest := &internalpb.ImportRequestInternal{ - CollectionID: collectionID, - PartitionIDs: partitionIDs, - ChannelNames: channels, - Schema: schema.CollectionSchema, - Files: req.GetFiles(), - Options: req.GetOptions(), + CollectionID: collectionID, + CollectionName: req.GetCollectionName(), + PartitionIDs: partitionIDs, + ChannelNames: channels, + Schema: schema.CollectionSchema, + Files: req.GetFiles(), + Options: req.GetOptions(), } resp, err = node.dataCoord.ImportV2(ctx, importRequest) if err != nil {