diff --git a/internal/datacoord/analyze_inspector.go b/internal/datacoord/analyze_inspector.go
index 87bed497ff..505349d079 100644
--- a/internal/datacoord/analyze_inspector.go
+++ b/internal/datacoord/analyze_inspector.go
@@ -51,8 +51,9 @@ func (ai *analyzeInspector) Stop() {
 func (ai *analyzeInspector) reloadFromMeta() {
 	analyzeTasks := ai.mt.analyzeMeta.GetAllTasks()
 	for _, t := range analyzeTasks {
-		if t.GetState() == indexpb.JobState_JobStateFinished ||
-			t.GetState() == indexpb.JobState_JobStateFailed {
+		if t.GetState() != indexpb.JobState_JobStateInit &&
+			t.GetState() != indexpb.JobState_JobStateRetry &&
+			t.GetState() != indexpb.JobState_JobStateInProgress {
 			continue
 		}
 		ai.scheduler.Enqueue(newAnalyzeTask(
diff --git a/internal/datacoord/compaction_inspector.go b/internal/datacoord/compaction_inspector.go
index 6c939098b6..8c749a02ff 100644
--- a/internal/datacoord/compaction_inspector.go
+++ b/internal/datacoord/compaction_inspector.go
@@ -553,6 +553,7 @@ func (c *compactionInspector) submitTask(t CompactionTask) error {
 func (c *compactionInspector) restoreTask(t CompactionTask) {
 	c.executingGuard.Lock()
 	c.executingTasks[t.GetTaskProto().GetPlanID()] = t
+	c.scheduler.Enqueue(t)
 	c.executingGuard.Unlock()
 	metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
 }
diff --git a/internal/datacoord/compaction_inspector_test.go b/internal/datacoord/compaction_inspector_test.go
index 9e3c712203..120cab6c63 100644
--- a/internal/datacoord/compaction_inspector_test.go
+++ b/internal/datacoord/compaction_inspector_test.go
@@ -75,6 +75,7 @@ func (s *CompactionPlanHandlerSuite) TestScheduleEmpty() {
 }
 
 func (s *CompactionPlanHandlerSuite) generateInitTasksForSchedule() {
+	s.handler.scheduler.(*task.MockGlobalScheduler).EXPECT().Enqueue(mock.Anything).Return()
 	task1 := &mixCompactionTask{
 		meta: s.mockMeta,
 	}
@@ -407,6 +408,8 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
 	s.SetupTest()
 	ch := "ch1"
 
+	s.handler.scheduler.(*task.MockGlobalScheduler).EXPECT().Enqueue(mock.Anything).Return()
+
 	t1 := newMixCompactionTask(&datapb.CompactionTask{
 		PlanID: 19530,
 		Type: datapb.CompactionType_MixCompaction,
diff --git a/internal/datacoord/import_inspector.go b/internal/datacoord/import_inspector.go
index b7870596ef..1ec9f3fe85 100644
--- a/internal/datacoord/import_inspector.go
+++ b/internal/datacoord/import_inspector.go
@@ -62,6 +62,7 @@ func NewImportInspector(ctx context.Context, meta *meta, importMeta ImportMeta,
 }
 
 func (s *importInspector) Start() {
+	s.reloadFromMeta()
 	log.Ctx(s.ctx).Info("start import inspector")
 	ticker := time.NewTicker(Params.DataCoordCfg.ImportScheduleInterval.GetAsDuration(time.Second))
 	defer ticker.Stop()
@@ -82,6 +83,23 @@ func (s *importInspector) Close() {
 	})
 }
 
+// reloadFromMeta walks every import job in ascending job-ID order and hands
+// each task still in the InProgress state to the scheduler.
+func (s *importInspector) reloadFromMeta() {
+	jobs := s.importMeta.GetJobBy(s.ctx)
+	sort.Slice(jobs, func(i, j int) bool {
+		return jobs[i].GetJobID() < jobs[j].GetJobID()
+	})
+	for _, job := range jobs {
+		tasks := s.importMeta.GetTaskBy(s.ctx, WithJob(job.GetJobID()))
+		for _, task := range tasks {
+			if task.GetState() == datapb.ImportTaskStateV2_InProgress {
+				s.scheduler.Enqueue(task)
+			}
+		}
+	}
+}
+
 func (s *importInspector) inspect() {
 	jobs := s.importMeta.GetJobBy(s.ctx)
 	sort.Slice(jobs, func(i, j int) bool {
diff --git a/internal/datacoord/import_inspector_test.go b/internal/datacoord/import_inspector_test.go
index 78417328cf..d4e597c6ee 100644
--- a/internal/datacoord/import_inspector_test.go
+++ b/internal/datacoord/import_inspector_test.go
@@ -258,6 +258,78 @@ func (s *ImportInspectorSuite) TestProcessFailed() {
 	s.Equal(0, len(task.(*importTask).GetSegmentIDs()))
 }
 
+func (s *ImportInspectorSuite) TestReloadFromMeta() {
+	// Test case 1: No jobs and tasks
+	s.catalog.EXPECT().ListImportJobs(mock.Anything).Return(nil, nil)
+	s.catalog.EXPECT().ListPreImportTasks(mock.Anything).Return(nil, nil)
+	s.catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil)
+	s.inspector.reloadFromMeta()
+
+	// Test case 2: Jobs with in-progress tasks
+	jobProto := &datapb.ImportJob{
+		JobID:        1,
+		CollectionID: s.collectionID,
+		TimeoutTs:    math.MaxUint64,
+		Schema:       &schemapb.CollectionSchema{},
+	}
+	job := &importJob{
+		ImportJob: jobProto,
+		tr:        timerecord.NewTimeRecorder("import job"),
+	}
+	s.catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
+	err := s.importMeta.AddJob(context.TODO(), job)
+	s.NoError(err)
+
+	// Add an in-progress pre-import task
+	inprogressPreImportTask := &preImportTask{
+		importMeta: s.importMeta,
+		tr:         timerecord.NewTimeRecorder("preimport task"),
+	}
+	inprogressPreImportTask.task.Store(&datapb.PreImportTask{
+		JobID:        1,
+		TaskID:       1,
+		CollectionID: s.collectionID,
+		State:        datapb.ImportTaskStateV2_InProgress,
+	})
+	s.catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil)
+	err = s.importMeta.AddTask(context.TODO(), inprogressPreImportTask)
+	s.NoError(err)
+
+	// Add an in-progress import task
+	inprogressImportTask := &importTask{
+		importMeta: s.importMeta,
+		tr:         timerecord.NewTimeRecorder("import task"),
+	}
+	inprogressImportTask.task.Store(&datapb.ImportTaskV2{
+		JobID:        1,
+		TaskID:       2,
+		CollectionID: s.collectionID,
+		State:        datapb.ImportTaskStateV2_InProgress,
+	})
+	s.catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil)
+	err = s.importMeta.AddTask(context.TODO(), inprogressImportTask)
+	s.NoError(err)
+
+	// Add a pending import task
+	pendingImportTask := &importTask{
+		importMeta: s.importMeta,
+		tr:         timerecord.NewTimeRecorder("import task"),
+	}
+	pendingImportTask.task.Store(&datapb.ImportTaskV2{
+		JobID:        1,
+		TaskID:       3,
+		CollectionID: s.collectionID,
+		State:        datapb.ImportTaskStateV2_Pending,
+	})
+	s.catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil)
+	err = s.importMeta.AddTask(context.TODO(), pendingImportTask)
+	s.NoError(err)
+
+	// Only the two InProgress tasks should be enqueued; the Pending one is skipped.
+	s.inspector.scheduler.(*task2.MockGlobalScheduler).EXPECT().Enqueue(mock.Anything).Times(2)
+	s.inspector.reloadFromMeta()
+}
+
 func TestImportInspector(t *testing.T) {
 	suite.Run(t, new(ImportInspectorSuite))
 }
diff --git a/internal/datacoord/index_inspector.go b/internal/datacoord/index_inspector.go
index 5b3e764c60..245d3fb5bb 100644
--- a/internal/datacoord/index_inspector.go
+++ b/internal/datacoord/index_inspector.go
@@ -200,8 +200,9 @@ func (i *indexInspector) reloadFromMeta() {
 	segments := i.meta.GetAllSegmentsUnsafe()
 	for _, segment := range segments {
 		for _, segIndex := range i.meta.indexMeta.GetSegmentIndexes(segment.GetCollectionID(), segment.ID) {
-			if segIndex.IsDeleted || segIndex.IndexState == commonpb.IndexState_Finished ||
-				segIndex.IndexState == commonpb.IndexState_Failed {
+			if segIndex.IsDeleted || (segIndex.IndexState != commonpb.IndexState_Unissued &&
+				segIndex.IndexState != commonpb.IndexState_Retry &&
+				segIndex.IndexState != commonpb.IndexState_InProgress) {
 				continue
 			}
 
diff --git a/internal/datacoord/stats_inspector.go b/internal/datacoord/stats_inspector.go
index 05a52c2567..91c8309bdd 100644
--- a/internal/datacoord/stats_inspector.go
+++ b/internal/datacoord/stats_inspector.go
@@ -98,8 +98,9 @@ func (si *statsInspector) Stop() {
 func (si *statsInspector) reloadFromMeta() {
 	tasks := si.mt.statsTaskMeta.GetAllTasks()
 	for _, st := range tasks {
-		if st.GetState() == indexpb.JobState_JobStateFinished ||
-			st.GetState() == indexpb.JobState_JobStateFailed {
+		if st.GetState() != indexpb.JobState_JobStateInit &&
+			st.GetState() != indexpb.JobState_JobStateRetry &&
+			st.GetState() != indexpb.JobState_JobStateInProgress {
 			continue
 		}
 		segment := si.mt.GetHealthySegment(si.ctx, st.GetSegmentID())
diff --git a/internal/datacoord/task/global_scheduler.go b/internal/datacoord/task/global_scheduler.go
index 8f74c5de47..b16017dbb6 100644
--- a/internal/datacoord/task/global_scheduler.go
+++ b/internal/datacoord/task/global_scheduler.go
@@ -69,10 +69,11 @@ func (s *globalTaskScheduler) Enqueue(task Task) {
 	case taskcommon.Init:
 		task.SetTaskTime(taskcommon.TimeQueue, time.Now())
 		s.pendingTasks.Push(task)
-	case taskcommon.InProgress:
+	case taskcommon.InProgress, taskcommon.Retry:
 		task.SetTaskTime(taskcommon.TimeStart, time.Now())
 		s.runningTasks.Insert(task.GetTaskID(), task)
 	}
+	log.Ctx(s.ctx).Info("task enqueued", WrapTaskLog(task)...)
 }
 
 func (s *globalTaskScheduler) AbortAndRemoveTask(taskID int64) {
diff --git a/internal/datacoord/task/global_scheduler_test.go b/internal/datacoord/task/global_scheduler_test.go
index ffb7831c87..f1eabe63ec 100644
--- a/internal/datacoord/task/global_scheduler_test.go
+++ b/internal/datacoord/task/global_scheduler_test.go
@@ -41,6 +41,7 @@ func TestGlobalScheduler_Enqueue(t *testing.T) {
 	task := NewMockTask(t)
 	task.EXPECT().GetTaskID().Return(1)
 	task.EXPECT().GetTaskState().Return(taskcommon.Init)
+	task.EXPECT().GetTaskType().Return(taskcommon.Compaction)
 	task.EXPECT().SetTaskTime(mock.Anything, mock.Anything).Return()
 	scheduler.Enqueue(task)
 	assert.Equal(t, 1, len(scheduler.(*globalTaskScheduler).pendingTasks.TaskIDs()))
@@ -50,6 +51,7 @@ func TestGlobalScheduler_Enqueue(t *testing.T) {
 	task = NewMockTask(t)
 	task.EXPECT().GetTaskID().Return(2)
 	task.EXPECT().GetTaskState().Return(taskcommon.InProgress)
+	task.EXPECT().GetTaskType().Return(taskcommon.Compaction)
 	task.EXPECT().SetTaskTime(mock.Anything, mock.Anything).Return()
 	scheduler.Enqueue(task)
 	assert.Equal(t, 1, scheduler.(*globalTaskScheduler).runningTasks.Len())
@@ -64,6 +66,7 @@ func TestGlobalScheduler_AbortAndRemoveTask(t *testing.T) {
 	task := NewMockTask(t)
 	task.EXPECT().GetTaskID().Return(1)
 	task.EXPECT().GetTaskState().Return(taskcommon.Init)
+	task.EXPECT().GetTaskType().Return(taskcommon.Compaction)
 	task.EXPECT().SetTaskTime(mock.Anything, mock.Anything).Return()
 	task.EXPECT().DropTaskOnWorker(mock.Anything).Return()
 	scheduler.Enqueue(task)
@@ -74,6 +77,7 @@ func TestGlobalScheduler_AbortAndRemoveTask(t *testing.T) {
 	task = NewMockTask(t)
 	task.EXPECT().GetTaskID().Return(2)
 	task.EXPECT().GetTaskState().Return(taskcommon.InProgress)
+	task.EXPECT().GetTaskType().Return(taskcommon.Compaction)
 	task.EXPECT().SetTaskTime(mock.Anything, mock.Anything).Return()
 	task.EXPECT().DropTaskOnWorker(mock.Anything).Return()
 	scheduler.Enqueue(task)
@@ -140,6 +144,7 @@ func TestGlobalScheduler_TestSchedule(t *testing.T) {
 		task.EXPECT().GetTaskID().Return(1)
 		task.EXPECT().GetTaskType().Return(taskcommon.Compaction)
 		task.EXPECT().GetTaskState().Return(taskcommon.Init)
+		task.EXPECT().GetTaskType().Return(taskcommon.Compaction)
 		task.EXPECT().SetTaskTime(mock.Anything, mock.Anything).Return()
 		task.EXPECT().GetTaskSlot().Return(1)
 		return task