diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index 54fafd7b9a..6b1fa54ffa 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -268,12 +268,6 @@ func (c *importChecker) checkImportingJob(job ImportJob) { return } for _, segmentID := range unfinished { - err = AddImportSegment(c.cluster, c.meta, segmentID) - if err != nil { - log.Warn("add import segment failed", zap.Int64("jobID", job.GetJobID()), - zap.Int64("collectionID", job.GetCollectionID()), zap.Error(err)) - return - } c.buildIndexCh <- segmentID // accelerate index building channelCP := c.meta.GetChannelCheckpoint(channels[segmentID]) if channelCP == nil { diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index 01a447c396..0451cfe68a 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -180,8 +180,6 @@ func (s *ImportCheckerSuite) TestCheckJob() { } sm := s.checker.sm.(*MockManager) sm.EXPECT().FlushImportSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil) - cluster := s.checker.cluster.(*MockCluster) - cluster.EXPECT().AddImportSegment(mock.Anything, mock.Anything).Return(nil, nil) catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil) diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index e6c7199282..f697fe7ee1 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -30,7 +30,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -247,25 +246,6 @@ func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats) [][]*dat return fileGroups } -func AddImportSegment(cluster Cluster, meta *meta, segmentID int64) error { - segment := meta.GetSegment(segmentID) - req := &datapb.AddImportSegmentRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - SegmentId: segment.GetID(), - ChannelName: segment.GetInsertChannel(), - CollectionId: segment.GetCollectionID(), - PartitionId: segment.GetPartitionID(), - RowNum: segment.GetNumOfRows(), - StatsLog: segment.GetStatslogs(), - } - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - _, err := cluster.AddImportSegment(ctx, req) - return err -} - func getPendingProgress(jobID int64, imeta ImportMeta) float32 { tasks := imeta.GetTaskBy(WithJob(jobID), WithType(PreImportTaskType)) preImportingFiles := lo.SumBy(tasks, func(task ImportTask) int { diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 7a6c53dcb0..537fedfa9d 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -215,29 +215,6 @@ func TestImportUtil_RegroupImportFiles(t *testing.T) { assert.Equal(t, fileNum, total) } -func TestImportUtil_AddImportSegment(t *testing.T) { - cluster := NewMockCluster(t) - cluster.EXPECT().AddImportSegment(mock.Anything, mock.Anything).Return(nil, nil) - - catalog := mocks.NewDataCoordCatalog(t) - catalog.EXPECT().ListSegments(mock.Anything).Return(nil, nil) - catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, nil) - catalog.EXPECT().ListIndexes(mock.Anything).Return(nil, nil) - catalog.EXPECT().ListSegmentIndexes(mock.Anything).Return(nil, nil) - catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil) - - meta, err := newMeta(context.TODO(), catalog, nil) - assert.NoError(t, err) - segment := &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ID: 1, IsImporting: true}, - } - err = meta.AddSegment(context.Background(), segment) - assert.NoError(t, err) - - err = AddImportSegment(cluster, meta, segment.GetID()) - assert.NoError(t, err) -} - func TestImportUtil_DropImportTask(t *testing.T) { cluster := NewMockCluster(t) cluster.EXPECT().DropImport(mock.Anything, mock.Anything).Return(nil)