diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 775184c5da..54fafd7b9a 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -290,7 +290,8 @@ func (c *importChecker) checkImportingJob(job ImportJob) { } } - err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed)) + completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") + err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime)) if err != nil { log.Warn("failed to update job state to Completed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) } diff --git a/internal/datacoord/import_job.go b/internal/datacoord/import_job.go index 49790f4a89..2d82d27763 100644 --- a/internal/datacoord/import_job.go +++ b/internal/datacoord/import_job.go @@ -55,9 +55,16 @@ func UpdateJobReason(reason string) UpdateJobAction { } } +func UpdateJobCompleteTime(completeTime string) UpdateJobAction { + return func(job ImportJob) { + job.(*importJob).ImportJob.CompleteTime = completeTime + } +} + type ImportJob interface { GetJobID() int64 GetCollectionID() int64 + GetCollectionName() string GetPartitionIDs() []int64 GetVchannels() []string GetSchema() *schemapb.CollectionSchema @@ -65,6 +72,7 @@ type ImportJob interface { GetCleanupTs() uint64 GetState() internalpb.ImportJobState GetReason() string + GetCompleteTime() string GetFiles() []*internalpb.ImportFile GetOptions() []*commonpb.KeyValuePair Clone() ImportJob diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index 08f7d17b16..965c364c87 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -303,7 +303,8 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { return } } - err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) + completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") + err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime)) if err != nil { log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...) return diff --git a/internal/datacoord/import_task.go b/internal/datacoord/import_task.go index 57f2ffc77c..82d3c70b2f 100644 --- a/internal/datacoord/import_task.go +++ b/internal/datacoord/import_task.go @@ -87,6 +87,14 @@ func UpdateReason(reason string) UpdateAction { } } +func UpdateCompleteTime(completeTime string) UpdateAction { + return func(t ImportTask) { + if task, ok := t.(*importTask); ok { + task.ImportTaskV2.CompleteTime = completeTime + } + } +} + func UpdateNodeID(nodeID int64) UpdateAction { return func(t ImportTask) { switch t.GetType() { diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 92d8b98c5b..e6c7199282 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -321,7 +321,7 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 { return importingProgress*0.8 + completedProgress*0.2 } -func GetImportProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, string) { +func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, string) { job := imeta.GetJob(jobID) switch job.GetState() { case internalpb.ImportJobState_Pending: @@ -345,6 +345,31 @@ func GetImportProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, intern return 0, internalpb.ImportJobState_None, "unknown import job state" } +func GetTaskProgresses(jobID int64, imeta ImportMeta, meta *meta) []*internalpb.ImportTaskProgress { + progresses := make([]*internalpb.ImportTaskProgress, 0) + tasks := imeta.GetTaskBy(WithJob(jobID), WithType(ImportTaskType)) + for _, task := range tasks { + totalRows := lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 { + return file.GetTotalRows() + }) + importedRows := meta.GetSegmentsTotalCurrentRows(task.(*importTask).GetSegmentIDs()) + progress := int64(100) + if totalRows != 0 { + progress = int64(float32(importedRows) / float32(totalRows) * 100) + } + for _, fileStat := range task.GetFileStats() { + progresses = append(progresses, &internalpb.ImportTaskProgress{ + FileName: fileStat.GetImportFile().String(), + FileSize: fileStat.GetFileSize(), + Reason: task.GetReason(), + Progress: progress, + CompleteTime: task.(*importTask).GetCompleteTime(), + }) + } + } + return progresses +} + func DropImportTask(task ImportTask, cluster Cluster, tm ImportMeta) error { if task.GetNodeID() == NullNodeID { return nil diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 60e56168c5..7a6c53dcb0 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -448,7 +448,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // failed state err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(mockErr)) assert.NoError(t, err) - progress, state, reason := GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason := GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(0), progress) assert.Equal(t, internalpb.ImportJobState_Failed, state) assert.Equal(t, mockErr, reason) @@ -456,7 +456,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // pending state err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(10), progress) assert.Equal(t, internalpb.ImportJobState_Pending, state) assert.Equal(t, "", reason) @@ -464,7 +464,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // preImporting state err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(10+40), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -472,7 +472,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // importing state, segmentImportedRows/totalRows = 0.5 err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(10+40+40*0.5), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -494,7 +494,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { assert.NoError(t, err) err = meta.UpdateSegmentsInfo(UpdateImportedRows(22, 100)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(float32(10+40+40+10*2/6)), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -508,7 +508,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { assert.NoError(t, err) err = meta.UpdateSegmentsInfo(UpdateIsImporting(22, false)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(10+40+40+10), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -516,7 +516,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { // completed state err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed)) assert.NoError(t, err) - progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta) + progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta) assert.Equal(t, int64(100), progress) assert.Equal(t, internalpb.ImportJobState_Completed, state) assert.Equal(t, "", reason) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index f0aa5f5670..91928d883c 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1819,16 +1819,17 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter job := &importJob{ ImportJob: &datapb.ImportJob{ - JobID: idStart, - CollectionID: in.GetCollectionID(), - PartitionIDs: in.GetPartitionIDs(), - Vchannels: in.GetChannelNames(), - Schema: in.GetSchema(), - TimeoutTs: timeoutTs, - CleanupTs: math.MaxUint64, - State: internalpb.ImportJobState_Pending, - Files: files, - Options: in.GetOptions(), + JobID: idStart, + CollectionID: in.GetCollectionID(), + CollectionName: in.GetCollectionName(), + PartitionIDs: in.GetPartitionIDs(), + Vchannels: in.GetChannelNames(), + Schema: in.GetSchema(), + TimeoutTs: timeoutTs, + CleanupTs: math.MaxUint64, + State: internalpb.ImportJobState_Pending, + Files: files, + Options: in.GetOptions(), }, } err = s.importMeta.AddJob(job) @@ -1858,10 +1859,14 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("parse job id failed, err=%w", err))) return resp, nil } - progress, state, reason := GetImportProgress(jobID, s.importMeta, s.meta) + job := s.importMeta.GetJob(jobID) + progress, state, reason := GetJobProgress(jobID, s.importMeta, s.meta) resp.State = state resp.Reason = reason resp.Progress = progress + resp.CollectionName = job.GetCollectionName() + resp.CompleteTime = job.GetCompleteTime() + resp.TaskProgresses = GetTaskProgresses(jobID, s.importMeta, s.meta) log.Info("GetImportProgress done", zap.Any("resp", resp)) return resp, nil } @@ -1889,11 +1894,12 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq } for _, job := range jobs { - progress, state, reason := GetImportProgress(job.GetJobID(), s.importMeta, s.meta) + progress, state, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta) resp.JobIDs = append(resp.JobIDs, fmt.Sprintf("%d", job.GetJobID())) resp.States = append(resp.States, state) resp.Reasons = append(resp.Reasons, reason) resp.Progresses = append(resp.Progresses, progress) + resp.CollectionNames = append(resp.CollectionNames, job.GetCollectionName()) } return resp, nil } diff --git a/internal/datanode/importv2/executor.go b/internal/datanode/importv2/executor.go index 46c7421dec..15501a611c 100644 --- a/internal/datanode/importv2/executor.go +++ b/internal/datanode/importv2/executor.go @@ -150,7 +150,7 @@ func (e *executor) PreImport(task Task) { } defer reader.Close() start := time.Now() - err = e.readFileStat(reader, task, i) + err = e.readFileStat(reader, task, i, file) if err != nil { e.handleErr(task, err, "preimport failed") return err @@ -180,7 +180,12 @@ func (e *executor) PreImport(task Task) { WrapLogFields(task, zap.Any("fileStats", task.(*PreImportTask).GetFileStats()))...) } -func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error { +func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int, file *internalpb.ImportFile) error { + fileSize, err := GetFileSize(file, e.cm) + if err != nil { + return err + } + totalRows := 0 totalSize := 0 hashedStats := make(map[string]*datapb.PartitionImportStats) @@ -209,6 +214,7 @@ func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx i } stat := &datapb.ImportFileStats{ + FileSize: fileSize, TotalRows: int64(totalRows), TotalMemorySize: int64(totalSize), HashedStats: hashedStats, diff --git a/internal/datanode/importv2/executor_test.go b/internal/datanode/importv2/executor_test.go index f8f315d039..c342706cae 100644 --- a/internal/datanode/importv2/executor_test.go +++ b/internal/datanode/importv2/executor_test.go @@ -260,6 +260,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport() { cm := mocks.NewChunkManager(s.T()) ioReader := strings.NewReader(string(bytes)) + cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) s.executor.cm = cm @@ -313,6 +314,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() { io.Seeker } ioReader := strings.NewReader(string(bytes)) + cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) s.executor.cm = cm @@ -461,6 +463,11 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() { importFile := &internalpb.ImportFile{ Paths: []string{"dummy.json"}, } + + cm := mocks.NewChunkManager(s.T()) + cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil) + s.executor.cm = cm + var once sync.Once data := createInsertData(s.T(), s.schema, s.numRows) s.reader = importutilv2.NewMockReader(s.T()) @@ -485,7 +492,7 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() { } preimportTask := NewPreImportTask(preimportReq) s.manager.Add(preimportTask) - err := s.executor.readFileStat(s.reader, preimportTask, 0) + err := s.executor.readFileStat(s.reader, preimportTask, 0, importFile) s.NoError(err) } diff --git a/internal/datanode/importv2/task.go b/internal/datanode/importv2/task.go index d623c2f90d..f0b01ac15a 100644 --- a/internal/datanode/importv2/task.go +++ b/internal/datanode/importv2/task.go @@ -92,7 +92,9 @@ func UpdateReason(reason string) UpdateAction { func UpdateFileStat(idx int, fileStat *datapb.ImportFileStats) UpdateAction { return func(task Task) { if it, ok := task.(*PreImportTask); ok { + it.PreImportTask.FileStats[idx].FileSize = fileStat.GetFileSize() it.PreImportTask.FileStats[idx].TotalRows = fileStat.GetTotalRows() + it.PreImportTask.FileStats[idx].TotalMemorySize = fileStat.GetTotalMemorySize() it.PreImportTask.FileStats[idx].HashedStats = fileStat.GetHashedStats() } } diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index b9b99b7777..1005e3197e 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strconv" + "time" "github.com/samber/lo" "go.uber.org/zap" @@ -29,6 +30,7 @@ import ( "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" @@ -201,6 +203,23 @@ func GetInsertDataRowCount(data *storage.InsertData, schema *schemapb.Collection return 0 } +func GetFileSize(file *internalpb.ImportFile, cm storage.ChunkManager) (int64, error) { + fn := func(path string) (int64, error) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + return cm.Size(ctx, path) + } + var totalSize int64 = 0 + for _, path := range file.GetPaths() { + size, err := fn(path) + if err != nil { + return 0, err + } + totalSize += size + } + return totalSize, nil +} + func LogStats(manager TaskManager) { logFunc := func(tasks []Task, taskType TaskType) { byState := lo.GroupBy(tasks, func(t Task) datapb.ImportTaskStateV2 { diff --git a/internal/distributed/proxy/httpserver/handler_v2.go b/internal/distributed/proxy/httpserver/handler_v2.go index 3af1b57865..4e2c486d10 100644 --- a/internal/distributed/proxy/httpserver/handler_v2.go +++ b/internal/distributed/proxy/httpserver/handler_v2.go @@ -130,7 +130,7 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) { router.POST(AliasCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &AliasReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.dropAlias))))) router.POST(AliasCategory+AlterAction, timeoutMiddleware(wrapperPost(func() any { return &AliasCollectionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.alterAlias))))) - router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob))))) + router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &OptionalCollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob))))) router.POST(ImportJobCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &ImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createImportJob))))) router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &JobIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess))))) } @@ -1642,28 +1642,34 @@ func (h *HandlersV2) alterAlias(ctx context.Context, c *gin.Context, anyReq any, } func (h *HandlersV2) listImportJob(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) { - collectionGetter := anyReq.(requestutil.CollectionNameGetter) + var collectionName string + if collectionGetter, ok := anyReq.(requestutil.CollectionNameGetter); ok { + collectionName = collectionGetter.GetCollectionName() + } req := &internalpb.ListImportsRequest{ DbName: dbName, - CollectionName: collectionGetter.GetCollectionName(), + CollectionName: collectionName, } resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) { return h.proxy.ListImports(reqCtx, req.(*internalpb.ListImportsRequest)) }) if err == nil { - returnData := make([]map[string]interface{}, 0) + returnData := make(map[string]interface{}) + records := make([]map[string]interface{}, 0) response := resp.(*internalpb.ListImportsResponse) for i, jobID := range response.GetJobIDs() { jobDetail := make(map[string]interface{}) - jobDetail["jobID"] = jobID + jobDetail["jobId"] = jobID + jobDetail["collectionName"] = response.GetCollectionNames()[i] jobDetail["state"] = response.GetStates()[i].String() jobDetail["progress"] = response.GetProgresses()[i] reason := response.GetReasons()[i] if reason != "" { jobDetail["reason"] = reason } - returnData = append(returnData, jobDetail) + records = append(records, jobDetail) } + returnData["records"] = records c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData}) } return resp, err @@ -1690,7 +1696,7 @@ func (h *HandlersV2) createImportJob(ctx context.Context, c *gin.Context, anyReq }) if err == nil { returnData := make(map[string]interface{}) - returnData["jobID"] = resp.(*internalpb.ImportResponse).GetJobID() + returnData["jobId"] = resp.(*internalpb.ImportResponse).GetJobID() c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData}) } return resp, err @@ -1708,13 +1714,28 @@ func (h *HandlersV2) getImportJobProcess(ctx context.Context, c *gin.Context, an if err == nil { response := resp.(*internalpb.GetImportProgressResponse) returnData := make(map[string]interface{}) - returnData["jobID"] = jobIDGetter.GetJobID() + returnData["jobId"] = jobIDGetter.GetJobID() + returnData["collectionName"] = response.GetCollectionName() returnData["state"] = response.GetState().String() returnData["progress"] = response.GetProgress() reason := response.GetReason() if reason != "" { returnData["reason"] = reason } + details := make([]map[string]interface{}, 0) + for _, taskProgress := range response.GetTaskProgresses() { + detail := make(map[string]interface{}) + detail["fileName"] = taskProgress.GetFileName() + detail["fileSize"] = taskProgress.GetFileSize() + detail["progress"] = taskProgress.GetProgress() + detail["completeTime"] = taskProgress.GetCompleteTime() + reason = taskProgress.GetReason() + if reason != "" { + detail["reason"] = reason + } + details = append(details, detail) + } + returnData["details"] = details c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData}) } return resp, err diff --git a/internal/distributed/proxy/httpserver/handler_v2_test.go b/internal/distributed/proxy/httpserver/handler_v2_test.go index a7422caec9..ad0a88241a 100644 --- a/internal/distributed/proxy/httpserver/handler_v2_test.go +++ b/internal/distributed/proxy/httpserver/handler_v2_test.go @@ -898,8 +898,9 @@ func TestMethodPost(t *testing.T) { internalpb.ImportJobState_Failed, internalpb.ImportJobState_Completed, }, - Reasons: []string{"", "", "mock reason", ""}, - Progresses: []int64{0, 30, 0, 100}, + Reasons: []string{"", "", "mock reason", ""}, + Progresses: []int64{0, 30, 0, 100}, + CollectionNames: []string{"AAA", "BBB", "CCC", "DDD"}, }, nil).Once() mp.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(&internalpb.GetImportProgressResponse{ Status: &StatusSuccess, @@ -985,7 +986,7 @@ func TestMethodPost(t *testing.T) { `"userName": "` + util.UserRoot + `", "password": "Milvus", "newPassword": "milvus", "roleName": "` + util.RoleAdmin + `",` + `"roleName": "` + util.RoleAdmin + `", "objectType": "Global", "objectName": "*", "privilege": "*",` + `"aliasName": "` + DefaultAliasName + `",` + - `"jobID": "1234567890",` + + `"jobId": "1234567890",` + `"files": [["book.json"]]` + `}`)) req := httptest.NewRequest(http.MethodPost, testcase.path, bodyReader) diff --git a/internal/distributed/proxy/httpserver/request_v2.go b/internal/distributed/proxy/httpserver/request_v2.go index a4404a1b1f..0e2c130904 100644 --- a/internal/distributed/proxy/httpserver/request_v2.go +++ b/internal/distributed/proxy/httpserver/request_v2.go @@ -33,6 +33,19 @@ func (req *CollectionNameReq) GetPartitionNames() []string { return req.PartitionNames } +type OptionalCollectionNameReq struct { + DbName string `json:"dbName"` + CollectionName string `json:"collectionName"` +} + +func (req *OptionalCollectionNameReq) GetDbName() string { + return req.DbName +} + +func (req *OptionalCollectionNameReq) GetCollectionName() string { + return req.CollectionName +} + type RenameCollectionReq struct { DbName string `json:"dbName"` CollectionName string `json:"collectionName" binding:"required"` @@ -82,7 +95,7 @@ func (req *ImportReq) GetOptions() map[string]string { } type JobIDReq struct { - JobID string `json:"jobID" binding:"required"` + JobID string `json:"jobId" binding:"required"` } func (req *JobIDReq) GetJobID() string { return req.JobID } diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index ab50d983c3..ca4299d586 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -859,15 +859,17 @@ message ImportJob { int64 jobID = 1; int64 dbID = 2; int64 collectionID = 3; - repeated int64 partitionIDs = 4; - repeated string vchannels = 5; - schema.CollectionSchema schema = 6; - uint64 timeout_ts = 7; - uint64 cleanup_ts = 8; - internal.ImportJobState state = 9; - string reason = 10; - repeated internal.ImportFile files = 11; - repeated common.KeyValuePair options = 12; + string collection_name = 4; + repeated int64 partitionIDs = 5; + repeated string vchannels = 6; + schema.CollectionSchema schema = 7; + uint64 timeout_ts = 8; + uint64 cleanup_ts = 9; + internal.ImportJobState state = 10; + string reason = 11; + string complete_time = 12; + repeated internal.ImportFile files = 13; + repeated common.KeyValuePair options = 14; } enum ImportTaskStateV2 { @@ -896,6 +898,7 @@ message ImportTaskV2 { int64 nodeID = 5; ImportTaskStateV2 state = 6; string reason = 7; + string complete_time = 8; repeated ImportFileStats file_stats = 9; } diff --git a/internal/proto/internal.proto b/internal/proto/internal.proto index 3608cc3f73..ef42e11b24 100644 --- a/internal/proto/internal.proto +++ b/internal/proto/internal.proto @@ -280,11 +280,12 @@ message ImportFile { message ImportRequestInternal { int64 dbID = 1; int64 collectionID = 2; - repeated int64 partitionIDs = 3; - repeated string channel_names = 4; - schema.CollectionSchema schema = 5; - repeated ImportFile files = 6; - repeated common.KeyValuePair options = 7; + string collection_name = 3; + repeated int64 partitionIDs = 4; + repeated string channel_names = 5; + schema.CollectionSchema schema = 6; + repeated ImportFile files = 7; + repeated common.KeyValuePair options = 8; } message ImportRequest { @@ -305,11 +306,22 @@ message GetImportProgressRequest { string jobID = 2; } +message ImportTaskProgress { + string file_name = 1; + int64 file_size = 2; + string reason = 3; + int64 progress = 4; + string complete_time = 5; +} + message GetImportProgressResponse { common.Status status = 1; ImportJobState state = 2; string reason = 3; int64 progress = 4; + string collection_name = 5; + string complete_time = 6; + repeated ImportTaskProgress task_progresses = 7; } message ListImportsRequestInternal { @@ -328,4 +340,5 @@ message ListImportsResponse { repeated ImportJobState states = 3; repeated string reasons = 4; repeated int64 progresses = 5; + repeated string collection_names = 6; } diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index e0e0dd5502..81ba356fa4 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -5643,12 +5643,13 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) } } importRequest := &internalpb.ImportRequestInternal{ - CollectionID: collectionID, - PartitionIDs: partitionIDs, - ChannelNames: channels, - Schema: schema.CollectionSchema, - Files: req.GetFiles(), - Options: req.GetOptions(), + CollectionID: collectionID, + CollectionName: req.GetCollectionName(), + PartitionIDs: partitionIDs, + ChannelNames: channels, + Schema: schema.CollectionSchema, + Files: req.GetFiles(), + Options: req.GetOptions(), } resp, err = node.dataCoord.ImportV2(ctx, importRequest) if err != nil {