diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go
index f697fe7ee1..b3b06d3e4a 100644
--- a/internal/datacoord/import_util.go
+++ b/internal/datacoord/import_util.go
@@ -18,6 +18,7 @@ package datacoord

 import (
 	"context"
+	"fmt"
 	"path"
 	"sort"
 	"time"
@@ -252,6 +253,9 @@ func getPendingProgress(jobID int64, imeta ImportMeta) float32 {
 		return len(task.GetFileStats())
 	})
 	totalFiles := len(imeta.GetJob(jobID).GetFiles())
+	if totalFiles == 0 {
+		return 1
+	}
 	return float32(preImportingFiles) / float32(totalFiles)
 }

@@ -260,6 +264,9 @@ func getPreImportingProgress(jobID int64, imeta ImportMeta) float32 {
 	completedTasks := lo.Filter(tasks, func(task ImportTask, _ int) bool {
 		return task.GetState() == datapb.ImportTaskStateV2_Completed
 	})
+	if len(tasks) == 0 {
+		return 1
+	}
 	return float32(len(completedTasks)) / float32(len(tasks))
 }

@@ -277,7 +284,10 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 {
 		segmentIDs = append(segmentIDs, task.(*importTask).GetSegmentIDs()...)
 	}
 	importedRows = meta.GetSegmentsTotalCurrentRows(segmentIDs)
-	importingProgress := float32(importedRows) / float32(totalRows)
+	var importingProgress float32 = 1
+	if totalRows != 0 {
+		importingProgress = float32(importedRows) / float32(totalRows)
+	}

 	var (
 		unsetIsImportingSegment int64
@@ -297,7 +307,10 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 {
 			}
 		}
 	}
-	completedProgress := float32(unsetIsImportingSegment) / float32(totalSegment)
+	var completedProgress float32 = 1
+	if totalSegment != 0 {
+		completedProgress = float32(unsetIsImportingSegment) / float32(totalSegment)
+	}
 	return importingProgress*0.8 + completedProgress*0.2
 }

@@ -371,7 +384,15 @@ func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager,
 		return nil, merr.WrapErrImportFailed("no insert binlogs to import")
 	}

-	segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, importFile.GetPaths()[0], false)
+	insertPrefix := importFile.GetPaths()[0]
+	ok, err := cm.Exist(ctx, insertPrefix)
+	if err != nil {
+		return nil, err
+	}
+	if !ok {
+		return nil, fmt.Errorf("insert binlog prefix does not exist, path=%s", insertPrefix)
+	}
+	segmentInsertPaths, _, err := cm.ListWithPrefix(ctx, insertPrefix, false)
 	if err != nil {
 		return nil, err
 	}
@@ -382,7 +403,16 @@ func ListBinlogsAndGroupBySegment(ctx context.Context, cm storage.ChunkManager,
 	if len(importFile.GetPaths()) < 2 {
 		return segmentImportFiles, nil
 	}
-	segmentDeltaPaths, _, err := cm.ListWithPrefix(context.Background(), importFile.GetPaths()[1], false)
+	deltaPrefix := importFile.GetPaths()[1]
+	ok, err = cm.Exist(ctx, deltaPrefix)
+	if err != nil {
+		return nil, err
+	}
+	if !ok {
+		log.Warn("delta binlog prefix does not exist", zap.String("path", deltaPrefix))
+		return segmentImportFiles, nil
+	}
+	segmentDeltaPaths, _, err := cm.ListWithPrefix(context.Background(), deltaPrefix, false)
 	if err != nil {
 		return nil, err
 	}
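The import_util.go hunks above do two things: the progress helpers now guard every ratio against a zero denominator (an empty job reports as complete instead of producing NaN or Inf), and ListBinlogsAndGroupBySegment checks that the insert and delta binlog prefixes actually exist before listing them. Below is a minimal, self-contained sketch of the exist-before-list guard; the objectStore and memStore types and the listInsertBinlogs helper are illustrative stand-ins, not the storage.ChunkManager interface from the repo (which has more methods and a three-value ListWithPrefix).

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// objectStore is a toy stand-in for the two ChunkManager methods the patch
// relies on (Exist, ListWithPrefix); names and signatures here are assumptions
// made only for this sketch.
type objectStore interface {
	Exist(ctx context.Context, prefix string) (bool, error)
	ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, error)
}

// memStore is an in-memory store keyed by object path, used only to run the example.
type memStore struct{ keys []string }

func (m *memStore) Exist(_ context.Context, prefix string) (bool, error) {
	for _, k := range m.keys {
		if strings.HasPrefix(k, prefix) {
			return true, nil
		}
	}
	return false, nil
}

func (m *memStore) ListWithPrefix(_ context.Context, prefix string, _ bool) ([]string, error) {
	var out []string
	for _, k := range m.keys {
		if strings.HasPrefix(k, prefix) {
			out = append(out, k)
		}
	}
	return out, nil
}

// listInsertBinlogs mirrors the guard added above: verify the prefix exists
// before listing, so a missing or mistyped prefix fails fast with a clear
// error instead of silently producing an empty import.
func listInsertBinlogs(ctx context.Context, store objectStore, prefix string) ([]string, error) {
	ok, err := store.Exist(ctx, prefix)
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, fmt.Errorf("insert binlog prefix does not exist, path=%s", prefix)
	}
	return store.ListWithPrefix(ctx, prefix, false)
}

func main() {
	ctx := context.Background()
	store := &memStore{keys: []string{"insert_log/435978159261483008/0/1"}}

	paths, err := listInsertBinlogs(ctx, store, "insert_log/435978159261483008")
	fmt.Println(paths, err) // [insert_log/435978159261483008/0/1] <nil>

	_, err = listInsertBinlogs(ctx, store, "insert_log/missing")
	fmt.Println(err) // insert binlog prefix does not exist, path=insert_log/missing
}
```

The delta-prefix branch in the patch differs only in that a missing prefix is tolerated with a warning rather than treated as an error.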
diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go
index 537fedfa9d..ecf058ac98 100644
--- a/internal/datacoord/import_util_test.go
+++ b/internal/datacoord/import_util_test.go
@@ -23,6 +23,7 @@ import (
 	"path"
 	"testing"

+	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
@@ -261,6 +262,7 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) {

 	ctx := context.Background()
 	cm := mocks2.NewChunkManager(t)
+	cm.EXPECT().Exist(mock.Anything, mock.Anything).Return(true, nil)
 	cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
 	cm.EXPECT().ListWithPrefix(mock.Anything, deltaPrefix, mock.Anything).Return(segmentDeltaPaths, nil, nil)
@@ -279,6 +281,32 @@ func TestImportUtil_ListBinlogsAndGroupBySegment(t *testing.T) {
 			assert.True(t, segmentID == "435978159261483008" || segmentID == "435978159261483009")
 		}
 	}
+
+	// test failure
+	mockErr := errors.New("mock err")
+	cm = mocks2.NewChunkManager(t)
+	cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(true, mockErr)
+	_, err = ListBinlogsAndGroupBySegment(ctx, cm, file)
+	assert.Error(t, err)
+
+	cm = mocks2.NewChunkManager(t)
+	cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(false, nil)
+	_, err = ListBinlogsAndGroupBySegment(ctx, cm, file)
+	assert.Error(t, err)
+
+	cm = mocks2.NewChunkManager(t)
+	cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(true, nil)
+	cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
+	cm.EXPECT().Exist(mock.Anything, deltaPrefix).Return(true, mockErr)
+	_, err = ListBinlogsAndGroupBySegment(ctx, cm, file)
+	assert.Error(t, err)
+
+	cm = mocks2.NewChunkManager(t)
+	cm.EXPECT().Exist(mock.Anything, insertPrefix).Return(true, nil)
+	cm.EXPECT().ListWithPrefix(mock.Anything, insertPrefix, mock.Anything).Return(segmentInsertPaths, nil, nil)
+	cm.EXPECT().Exist(mock.Anything, deltaPrefix).Return(false, nil)
+	_, err = ListBinlogsAndGroupBySegment(ctx, cm, file)
+	assert.NoError(t, err)
 }

 func TestImportUtil_GetImportProgress(t *testing.T) {
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 91928d883c..89851cf159 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -1773,9 +1773,8 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
 	log := log.With(zap.Int64("collection", in.GetCollectionID()),
 		zap.Int64s("partitions", in.GetPartitionIDs()),
-		zap.Strings("channels", in.GetChannelNames()),
-		zap.Any("files", in.GetFiles()))
-	log.Info("receive import request")
+		zap.Strings("channels", in.GetChannelNames()))
+	log.Info("receive import request", zap.Any("files", in.GetFiles()))

 	var timeoutTs uint64 = math.MaxUint64
 	timeoutStr, err := funcutil.GetAttrByKeyFromRepeatedKV("timeout", in.GetOptions())
@@ -1800,11 +1799,18 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
 		for _, importFile := range in.GetFiles() {
 			segmentPrefixes, err := ListBinlogsAndGroupBySegment(ctx, s.meta.chunkManager, importFile)
 			if err != nil {
-				resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("list binlogs and group by segment failed, err=%w", err)))
+				resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("list binlogs failed, err=%s", err)))
 				return resp, nil
 			}
 			files = append(files, segmentPrefixes...)
 		}
+		files = lo.Filter(files, func(file *internalpb.ImportFile, _ int) bool {
+			return len(file.GetPaths()) > 0
+		})
+		if len(files) == 0 {
+			resp.Status = merr.Status(merr.WrapErrParameterInvalidMsg(fmt.Sprintf("no binlog to import, import_prefix=%s", in.GetFiles())))
+			return resp, nil
+		}
 	}

 	idStart, _, err := s.allocator.allocN(int64(len(files)) + 1)
@@ -1839,7 +1845,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
 	}

 	resp.JobID = fmt.Sprint(job.GetJobID())
-	log.Info("add import job done", zap.Int64("jobID", job.GetJobID()))
+	log.Info("add import job done", zap.Int64("jobID", job.GetJobID()), zap.Any("files", files))
 	return resp, nil
 }
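For backup-style imports, the ImportV2 hunks above now drop expanded files that resolved to no binlog paths and reject the request as parameter-invalid when nothing remains, instead of creating an empty job. A rough sketch of that filter step follows; it uses the same lo.Filter helper from github.com/samber/lo that the patch uses, but a made-up importFile struct in place of internalpb.ImportFile.

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

// importFile is a toy stand-in for internalpb.ImportFile, used only for illustration.
type importFile struct {
	Paths []string
}

func main() {
	files := []importFile{
		{Paths: []string{"insert_log/435978159261483008"}},
		{Paths: nil}, // a segment prefix that resolved to nothing
	}

	// Same shape as the patch: drop entries with no binlog paths, then treat
	// an empty result as an invalid request rather than creating an empty job.
	files = lo.Filter(files, func(f importFile, _ int) bool {
		return len(f.Paths) > 0
	})
	if len(files) == 0 {
		fmt.Println("no binlog to import: reject the request")
		return
	}
	fmt.Printf("importing %d file group(s)\n", len(files))
}
```

Returning a parameter-invalid status for the empty case is exactly what the new "list no binlog" case in services_test.go below asserts.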
diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go
index b9c8e646f7..4cf9e1a2eb 100644
--- a/internal/datacoord/services_test.go
+++ b/internal/datacoord/services_test.go
@@ -1369,6 +1369,7 @@ func TestImportV2(t *testing.T) {

 	// list binlog failed
 	cm := mocks2.NewChunkManager(t)
+	cm.EXPECT().Exist(mock.Anything, mock.Anything).Return(true, nil)
 	cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, mockErr)
 	s.meta = &meta{chunkManager: cm}
 	resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{
@@ -1388,6 +1389,28 @@ func TestImportV2(t *testing.T) {
 	assert.NoError(t, err)
 	assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))

+	// list no binlog
+	cm = mocks2.NewChunkManager(t)
+	cm.EXPECT().Exist(mock.Anything, mock.Anything).Return(true, nil)
+	cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil)
+	s.meta = &meta{chunkManager: cm}
+	resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{
+		Files: []*internalpb.ImportFile{
+			{
+				Id:    1,
+				Paths: []string{"mock_insert_prefix"},
+			},
+		},
+		Options: []*commonpb.KeyValuePair{
+			{
+				Key:   "backup",
+				Value: "true",
+			},
+		},
+	})
+	assert.NoError(t, err)
+	assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrParameterInvalid))
+
 	// alloc failed
 	alloc := NewNMockAllocator(t)
 	alloc.EXPECT().allocN(mock.Anything).Return(0, 0, mockErr)
diff --git a/internal/datanode/importv2/executor.go b/internal/datanode/importv2/executor.go
index 15501a611c..6804b17fbd 100644
--- a/internal/datanode/importv2/executor.go
+++ b/internal/datanode/importv2/executor.go
@@ -318,6 +318,9 @@ func (e *executor) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future
 	for channelIdx, datas := range hashedData {
 		channel := task.GetVchannels()[channelIdx]
 		for partitionIdx, data := range datas {
+			if data.GetRowNum() == 0 {
+				continue
+			}
 			partitionID := task.GetPartitionIDs()[partitionIdx]
 			size := data.GetMemorySize()
 			segmentID := PickSegment(task, segmentImportedSizes, channel, partitionID, size)
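Finally, the executor.go hunk above makes Sync skip hashed buckets with zero rows, so no segment is picked and nothing is flushed for empty channel/partition combinations. A toy sketch of the same skip is below; rowGroup stands in for the per-partition data and a simple counter replaces segment selection, and none of these names come from the repo.

```go
package main

import "fmt"

// rowGroup is an illustrative stand-in for the per-partition data buckets
// that the Sync loop iterates over; only the row count matters here.
type rowGroup struct {
	rows int
}

// syncGroups returns how many buckets would actually be synced, skipping
// empty ones the same way the patch does, so no segment is requested for
// zero rows.
func syncGroups(groups []rowGroup) int {
	synced := 0
	for _, g := range groups {
		if g.rows == 0 {
			continue
		}
		synced++
	}
	return synced
}

func main() {
	fmt.Println(syncGroups([]rowGroup{{rows: 100}, {rows: 0}, {rows: 42}})) // 2
}
```

Together with the zero-denominator guards in import_util.go, the patch treats "nothing to import" as a benign, already-complete state rather than an arithmetic or allocation hazard.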