diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index e26775daa1..75cdd3bbe8 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -86,7 +86,7 @@ type CompactionTriggerManager struct { allocator allocator.Allocator meta *meta - imeta ImportMeta + importMeta ImportMeta l0Policy *l0CompactionPolicy clusteringPolicy *clusteringCompactionPolicy singlePolicy *singleCompactionPolicy @@ -103,13 +103,13 @@ type CompactionTriggerManager struct { compactionChanLock sync.Mutex } -func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, inspector CompactionInspector, meta *meta, imeta ImportMeta) *CompactionTriggerManager { +func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, inspector CompactionInspector, meta *meta, importMeta ImportMeta) *CompactionTriggerManager { m := &CompactionTriggerManager{ allocator: alloc, handler: handler, inspector: inspector, meta: meta, - imeta: imeta, + importMeta: importMeta, pauseCompactionChanMap: make(map[int64]chan struct{}), resumeCompactionChanMap: make(map[int64]chan struct{}), } @@ -386,7 +386,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context, collection *collectionInfo, view CompactionView) error { // add l0 import task for the collection if the collection is importing - importJobs := m.imeta.GetJobBy(ctx, + importJobs := m.importMeta.GetJobBy(ctx, WithCollectionID(collection.ID), WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed), WithoutL0Job(), @@ -442,13 +442,13 @@ func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context, }, }, }, - }, job, m.allocator, m.meta, m.imeta) + }, job, m.allocator, m.meta, m.importMeta) if err != nil { log.Warn("new import tasks failed", zap.Error(err)) return err } for _, t := range newTasks { - err = m.imeta.AddTask(ctx, t) + err = m.importMeta.AddTask(ctx, t) if err != nil { log.Warn("add new l0 import task from l0 compaction failed", WrapTaskLog(t, zap.Error(err))...) return err diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index a1ec2e37a1..98384f45f4 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -31,12 +31,12 @@ func TestCompactionTriggerManagerSuite(t *testing.T) { type CompactionTriggerManagerSuite struct { suite.Suite - mockAlloc *allocator.MockAllocator - handler Handler - inspector *MockCompactionInspector - testLabel *CompactionGroupLabel - meta *meta - imeta ImportMeta + mockAlloc *allocator.MockAllocator + handler Handler + inspector *MockCompactionInspector + testLabel *CompactionGroupLabel + meta *meta + importMeta ImportMeta triggerManager *CompactionTriggerManager } @@ -62,8 +62,8 @@ func (s *CompactionTriggerManagerSuite) SetupTest() { catalog.EXPECT().ListImportJobs(mock.Anything).Return([]*datapb.ImportJob{}, nil) importMeta, err := NewImportMeta(context.TODO(), catalog, s.mockAlloc, s.meta) s.Require().NoError(err) - s.imeta = importMeta - s.triggerManager = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.inspector, s.meta, s.imeta) + s.importMeta = importMeta + s.triggerManager = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.inspector, s.meta, s.importMeta) } func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { @@ -368,8 +368,7 @@ func TestCompactionAndImport(t *testing.T) { catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil) importMeta, err := NewImportMeta(context.TODO(), catalog, mockAlloc, meta) assert.NoError(t, err) - imeta := importMeta - triggerManager := NewCompactionTriggerManager(mockAlloc, handler, inspector, meta, imeta) + triggerManager := NewCompactionTriggerManager(mockAlloc, handler, inspector, meta, importMeta) Params.Save(Params.DataCoordCfg.L0CompactionTriggerInterval.Key, "1") defer Params.Reset(Params.DataCoordCfg.L0CompactionTriggerInterval.Key) diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index db36503728..c4fd30efa4 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -43,10 +43,11 @@ type ImportChecker interface { } type importChecker struct { + ctx context.Context meta *meta broker broker.Broker alloc allocator.Allocator - imeta ImportMeta + importMeta ImportMeta si StatsInspector l0CompactionTrigger TriggerManager @@ -54,18 +55,20 @@ type importChecker struct { closeChan chan struct{} } -func NewImportChecker(meta *meta, +func NewImportChecker(ctx context.Context, + meta *meta, broker broker.Broker, alloc allocator.Allocator, - imeta ImportMeta, + importMeta ImportMeta, si StatsInspector, l0CompactionTrigger TriggerManager, ) ImportChecker { return &importChecker{ + ctx: ctx, meta: meta, broker: broker, alloc: alloc, - imeta: imeta, + importMeta: importMeta, si: si, l0CompactionTrigger: l0CompactionTrigger, closeChan: make(chan struct{}), @@ -86,7 +89,7 @@ func (c *importChecker) Start() { log.Info("import checker exited") return case <-ticker1.C: - jobs := c.imeta.GetJobBy(context.TODO()) + jobs := c.importMeta.GetJobBy(c.ctx) for _, job := range jobs { if !funcutil.SliceSetEqual[string](job.GetVchannels(), job.GetReadyVchannels()) { // wait for all channels to send signals @@ -112,7 +115,7 @@ func (c *importChecker) Start() { } } case <-ticker2.C: - jobs := c.imeta.GetJobBy(context.TODO()) + jobs := c.importMeta.GetJobBy(c.ctx) for _, job := range jobs { c.tryTimeoutJob(job) c.checkGC(job) @@ -168,9 +171,9 @@ func (c *importChecker) LogTaskStats() { metrics.ImportTasks.WithLabelValues(taskType.String(), datapb.ImportTaskStateV2_Completed.String()).Set(float64(completed)) metrics.ImportTasks.WithLabelValues(taskType.String(), datapb.ImportTaskStateV2_Failed.String()).Set(float64(failed)) } - tasks := c.imeta.GetTaskBy(context.TODO(), WithType(PreImportTaskType)) + tasks := c.importMeta.GetTaskBy(c.ctx, WithType(PreImportTaskType)) logFunc(tasks, PreImportTaskType) - tasks = c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType)) + tasks = c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType)) logFunc(tasks, ImportTaskType) } @@ -178,7 +181,7 @@ func (c *importChecker) getLackFilesForPreImports(job ImportJob) []*internalpb.I lacks := lo.KeyBy(job.GetFiles(), func(file *internalpb.ImportFile) int64 { return file.GetId() }) - exists := c.imeta.GetTaskBy(context.TODO(), WithType(PreImportTaskType), WithJob(job.GetJobID())) + exists := c.importMeta.GetTaskBy(c.ctx, WithType(PreImportTaskType), WithJob(job.GetJobID())) for _, task := range exists { for _, file := range task.GetFileStats() { delete(lacks, file.GetImportFile().GetId()) @@ -188,7 +191,7 @@ func (c *importChecker) getLackFilesForPreImports(job ImportJob) []*internalpb.I } func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFileStats { - preimports := c.imeta.GetTaskBy(context.TODO(), WithType(PreImportTaskType), WithJob(job.GetJobID())) + preimports := c.importMeta.GetTaskBy(c.ctx, WithType(PreImportTaskType), WithJob(job.GetJobID())) lacks := make(map[int64]*datapb.ImportFileStats, 0) for _, t := range preimports { if t.GetState() != datapb.ImportTaskStateV2_Completed { @@ -199,7 +202,7 @@ func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFi lacks[stat.GetImportFile().GetId()] = stat } } - exists := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID())) + exists := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID())) for _, task := range exists { for _, file := range task.GetFileStats() { delete(lacks, file.GetImportFile().GetId()) @@ -216,13 +219,13 @@ func (c *importChecker) checkPendingJob(job ImportJob) { } fileGroups := lo.Chunk(lacks, Params.DataCoordCfg.FilesPerPreImportTask.GetAsInt()) - newTasks, err := NewPreImportTasks(fileGroups, job, c.alloc, c.imeta) + newTasks, err := NewPreImportTasks(fileGroups, job, c.alloc, c.importMeta) if err != nil { log.Warn("new preimport tasks failed", zap.Error(err)) return } for _, t := range newTasks { - err = c.imeta.AddTask(context.TODO(), t) + err = c.importMeta.AddTask(c.ctx, t) if err != nil { log.Warn("add preimport task failed", WrapTaskLog(t, zap.Error(err))...) return @@ -230,7 +233,7 @@ func (c *importChecker) checkPendingJob(job ImportJob) { log.Info("add new preimport task", WrapTaskLog(t)...) } - err = c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) + err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) if err != nil { log.Warn("failed to update job state to PreImporting", zap.Error(err)) return @@ -247,10 +250,10 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { return } - requestSize, err := CheckDiskQuota(job, c.meta, c.imeta) + requestSize, err := CheckDiskQuota(c.ctx, job, c.meta, c.importMeta) if err != nil { log.Warn("import failed, disk quota exceeded", zap.Error(err)) - err = c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) + err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) if err != nil { log.Warn("failed to update job state to Failed", zap.Error(err)) } @@ -259,16 +262,16 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { allDiskIndex := c.meta.indexMeta.AreAllDiskIndex(job.GetCollectionID(), job.GetSchema()) groups := RegroupImportFiles(job, lacks, allDiskIndex) - newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta, c.imeta) + newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta, c.importMeta) if err != nil { log.Warn("new import tasks failed", zap.Error(err)) return } for _, t := range newTasks { - err = c.imeta.AddTask(context.TODO(), t) + err = c.importMeta.AddTask(c.ctx, t) if err != nil { log.Warn("add new import task failed", WrapTaskLog(t, zap.Error(err))...) - updateErr := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) + updateErr := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) if updateErr != nil { log.Warn("failed to update job state to Failed", zap.Error(updateErr)) } @@ -277,7 +280,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { log.Info("add new import task", WrapTaskLog(t)...) } - err = c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize)) + err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize)) if err != nil { log.Warn("failed to update job state to Importing", zap.Error(err)) return @@ -289,13 +292,13 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { func (c *importChecker) checkImportingJob(job ImportJob) { log := log.With(zap.Int64("jobID", job.GetJobID())) - tasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()), WithRequestSource()) + tasks := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID()), WithRequestSource()) for _, t := range tasks { if t.GetState() != datapb.ImportTaskStateV2_Completed { return } } - err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats)) + err := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats)) if err != nil { log.Warn("failed to update job state to Stats", zap.Error(err)) return @@ -308,7 +311,7 @@ func (c *importChecker) checkImportingJob(job ImportJob) { func (c *importChecker) checkStatsJob(job ImportJob) { log := log.With(zap.Int64("jobID", job.GetJobID())) updateJobState := func(state internalpb.ImportJobState, reason string) { - err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(state), UpdateJobReason(reason)) + err := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(state), UpdateJobReason(reason)) if err != nil { log.Warn("failed to update job state", zap.Error(err)) return @@ -329,7 +332,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) { taskCnt = 0 doneCnt = 0 ) - tasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID())) + tasks := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID())) for _, task := range tasks { originSegmentIDs := task.(*importTask).GetSegmentIDs() statsSegmentIDs := task.(*importTask).GetStatsSegmentIDs() @@ -365,7 +368,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) { func (c *importChecker) checkIndexBuildingJob(job ImportJob) { log := log.With(zap.Int64("jobID", job.GetJobID())) - tasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID())) + tasks := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID())) originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 { return t.(*importTask).GetSegmentIDs() }) @@ -413,7 +416,7 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) { } // all finished, update import job state to `Completed`. completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") - err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime)) + err := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime)) if err != nil { log.Warn("failed to update job state to Completed", zap.Error(err)) return @@ -426,7 +429,7 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) { func (c *importChecker) waitL0ImortTaskDone(job ImportJob) bool { // wait all lo import tasks to be completed - l0ImportTasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()), WithL0CompactionSource()) + l0ImportTasks := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID()), WithL0CompactionSource()) for _, t := range l0ImportTasks { if t.GetState() != datapb.ImportTaskStateV2_Completed { log.Info("waiting for l0 import task...", @@ -442,7 +445,7 @@ func (c *importChecker) waitL0ImortTaskDone(job ImportJob) bool { func (c *importChecker) updateSegmentState(originSegmentIDs, statsSegmentIDs []int64) bool { // Here, all segment indexes have been successfully built, try unset isImporting flag for all segments. isImportingSegments := lo.Filter(append(originSegmentIDs, statsSegmentIDs...), func(segmentID int64, _ int) bool { - segment := c.meta.GetSegment(context.TODO(), segmentID) + segment := c.meta.GetSegment(c.ctx, segmentID) if segment == nil { log.Warn("cannot find segment", zap.Int64("segmentID", segmentID)) return false @@ -463,7 +466,7 @@ func (c *importChecker) updateSegmentState(originSegmentIDs, statsSegmentIDs []i op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}}) op2 := UpdateDmlPosition(segmentID, channelCP) op3 := UpdateIsImporting(segmentID, false) - err = c.meta.UpdateSegmentsInfo(context.TODO(), op1, op2, op3) + err = c.meta.UpdateSegmentsInfo(c.ctx, op1, op2, op3) if err != nil { log.Warn("update import segment failed", zap.Error(err)) return true @@ -473,7 +476,7 @@ func (c *importChecker) updateSegmentState(originSegmentIDs, statsSegmentIDs []i } func (c *importChecker) checkFailedJob(job ImportJob) { - tasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID())) + tasks := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID())) originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 { return t.(*importTask).GetSegmentIDs() }) @@ -488,7 +491,7 @@ func (c *importChecker) checkFailedJob(job ImportJob) { } func (c *importChecker) tryFailingTasks(job ImportJob) { - tasks := c.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithStates(datapb.ImportTaskStateV2_Pending, + tasks := c.importMeta.GetTaskBy(c.ctx, WithJob(job.GetJobID()), WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress, datapb.ImportTaskStateV2_Completed)) if len(tasks) == 0 { return @@ -496,7 +499,7 @@ func (c *importChecker) tryFailingTasks(job ImportJob) { log.Warn("Import job has failed, all tasks with the same jobID will be marked as failed", zap.Int64("jobID", job.GetJobID()), zap.String("reason", job.GetReason())) for _, task := range tasks { - err := c.imeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), + err := c.importMeta.UpdateTask(c.ctx, task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(job.GetReason())) if err != nil { log.Warn("failed to update import task state to failed", WrapTaskLog(task, zap.Error(err))...) @@ -510,7 +513,7 @@ func (c *importChecker) tryTimeoutJob(job ImportJob) { if time.Now().After(timeoutTime) { log.Warn("Import timeout, expired the specified time limit", zap.Int64("jobID", job.GetJobID()), zap.Time("timeoutTime", timeoutTime)) - err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), + err := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason("import timeout")) if err != nil { log.Warn("failed to update job state to Failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) @@ -523,7 +526,7 @@ func (c *importChecker) checkCollection(collectionID int64, jobs []ImportJob) { return } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second) defer cancel() has, err := c.broker.HasCollection(ctx, collectionID) if err != nil { @@ -535,7 +538,7 @@ func (c *importChecker) checkCollection(collectionID int64, jobs []ImportJob) { return job.GetState() != internalpb.ImportJobState_Failed }) for _, job := range jobs { - err = c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), + err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(fmt.Sprintf("collection %d dropped", collectionID))) if err != nil { log.Warn("failed to update job state to Failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) @@ -555,7 +558,7 @@ func (c *importChecker) checkGC(job ImportJob) { GCRetention := Params.DataCoordCfg.ImportTaskRetention.GetAsDuration(time.Second) log.Info("job has reached the GC retention", zap.Time("cleanupTime", cleanupTime), zap.Duration("GCRetention", GCRetention)) - tasks := c.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID())) + tasks := c.importMeta.GetTaskBy(c.ctx, WithJob(job.GetJobID())) shouldRemoveJob := true for _, task := range tasks { if job.GetState() == internalpb.ImportJobState_Failed && task.GetType() == ImportTaskType { @@ -568,7 +571,7 @@ func (c *importChecker) checkGC(job ImportJob) { shouldRemoveJob = false continue } - err := c.imeta.RemoveTask(context.TODO(), task.GetTaskID()) + err := c.importMeta.RemoveTask(c.ctx, task.GetTaskID()) if err != nil { log.Warn("remove task failed during GC", WrapTaskLog(task, zap.Error(err))...) shouldRemoveJob = false @@ -579,7 +582,7 @@ func (c *importChecker) checkGC(job ImportJob) { if !shouldRemoveJob { return } - err := c.imeta.RemoveJob(context.TODO(), job.GetJobID()) + err := c.importMeta.RemoveJob(c.ctx, job.GetJobID()) if err != nil { log.Warn("remove import job failed", zap.Error(err)) return diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index 907247f5ec..cb587caee3 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -47,10 +47,10 @@ import ( type ImportCheckerSuite struct { suite.Suite - jobID int64 - imeta ImportMeta - checker *importChecker - alloc *allocator.MockAllocator + jobID int64 + importMeta ImportMeta + checker *importChecker + alloc *allocator.MockAllocator } func (s *ImportCheckerSuite) SetupTest() { @@ -74,9 +74,9 @@ func (s *ImportCheckerSuite) SetupTest() { meta, err := newMeta(context.TODO(), catalog, nil, broker) s.NoError(err) - imeta, err := NewImportMeta(context.TODO(), catalog, s.alloc, meta) + importMeta, err := NewImportMeta(context.TODO(), catalog, s.alloc, meta) s.NoError(err) - s.imeta = imeta + s.importMeta = importMeta sjm := NewMockStatsJobManager(s.T()) l0CompactionTrigger := NewMockTriggerManager(s.T()) @@ -85,7 +85,7 @@ func (s *ImportCheckerSuite) SetupTest() { l0CompactionTrigger.EXPECT().GetPauseCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe() l0CompactionTrigger.EXPECT().GetResumeCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe() - checker := NewImportChecker(meta, broker, s.alloc, imeta, sjm, l0CompactionTrigger).(*importChecker) + checker := NewImportChecker(context.TODO(), meta, broker, s.alloc, importMeta, sjm, l0CompactionTrigger).(*importChecker) s.checker = checker job := &importJob{ @@ -126,13 +126,13 @@ func (s *ImportCheckerSuite) SetupTest() { } catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil) - err = s.imeta.AddJob(context.TODO(), job) + err = s.importMeta.AddJob(context.TODO(), job) s.NoError(err) s.jobID = job.GetJobID() } func (s *ImportCheckerSuite) TestLogStats() { - catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) + catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil) @@ -145,7 +145,7 @@ func (s *ImportCheckerSuite) TestLogStats() { tr: timerecord.NewTimeRecorder("preimport task"), } pit1.task.Store(preImportTaskProto) - err := s.imeta.AddTask(context.TODO(), pit1) + err := s.importMeta.AddTask(context.TODO(), pit1) s.NoError(err) importTaskProto := &datapb.ImportTaskV2{ @@ -158,14 +158,14 @@ func (s *ImportCheckerSuite) TestLogStats() { tr: timerecord.NewTimeRecorder("import task"), } it1.task.Store(importTaskProto) - err = s.imeta.AddTask(context.TODO(), it1) + err = s.importMeta.AddTask(context.TODO(), it1) s.NoError(err) s.checker.LogTaskStats() } func (s *ImportCheckerSuite) TestCheckJob() { - job := s.imeta.GetJob(context.TODO(), s.jobID) + job := s.importMeta.GetJob(context.TODO(), s.jobID) // test checkPendingJob alloc := s.alloc @@ -173,39 +173,39 @@ func (s *ImportCheckerSuite) TestCheckJob() { id := rand.Int63() return id, id + n, nil }) - catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) + catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil) s.checker.checkPendingJob(job) - preimportTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) + preimportTasks := s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) s.Equal(2, len(preimportTasks)) - s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_PreImporting, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) s.checker.checkPendingJob(job) // no lack - preimportTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) + preimportTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) s.Equal(2, len(preimportTasks)) - s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_PreImporting, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) // test checkPreImportingJob catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil) for _, t := range preimportTasks { - err := s.imeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) + err := s.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) s.NoError(err) } s.checker.checkPreImportingJob(job) - importTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) + importTasks := s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) s.Equal(1, len(importTasks)) - s.Equal(internalpb.ImportJobState_Importing, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Importing, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) s.checker.checkPreImportingJob(job) // no lack - importTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) + importTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) s.Equal(1, len(importTasks)) - s.Equal(internalpb.ImportJobState_Importing, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Importing, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) // test checkImportingJob s.checker.checkImportingJob(job) - s.Equal(internalpb.ImportJobState_Importing, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Importing, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) for _, t := range importTasks { - task := s.imeta.GetTask(context.TODO(), t.GetTaskID()) + task := s.importMeta.GetTask(context.TODO(), t.GetTaskID()) for _, id := range task.(*importTask).GetSegmentIDs() { segment := s.checker.meta.GetSegment(context.TODO(), id) s.Equal(true, segment.GetIsImporting()) @@ -225,14 +225,14 @@ func (s *ImportCheckerSuite) TestCheckJob() { } err := s.checker.meta.AddSegment(context.Background(), segment) s.NoError(err) - err = s.imeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), + err = s.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateSegmentIDs([]int64{segment.GetID()}), UpdateStatsSegmentIDs([]int64{rand.Int63()})) s.NoError(err) err = s.checker.meta.UpdateChannelCheckpoint(context.TODO(), segment.GetInsertChannel(), &msgpb.MsgPosition{MsgID: []byte{0}}) s.NoError(err) } s.checker.checkImportingJob(job) - s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Stats, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) // test check stats job alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil).Maybe() @@ -242,55 +242,55 @@ func (s *ImportCheckerSuite) TestCheckJob() { State: indexpb.JobState_JobStateNone, }) s.checker.checkStatsJob(job) - s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Stats, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) sjm = NewMockStatsJobManager(s.T()) sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{ State: indexpb.JobState_JobStateInProgress, }) s.checker.si = sjm s.checker.checkStatsJob(job) - s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Stats, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) sjm = NewMockStatsJobManager(s.T()) sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{ State: indexpb.JobState_JobStateFinished, }) s.checker.si = sjm s.checker.checkStatsJob(job) - s.Equal(internalpb.ImportJobState_IndexBuilding, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_IndexBuilding, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) // test check IndexBuilding job s.checker.checkIndexBuildingJob(job) for _, t := range importTasks { - task := s.imeta.GetTask(context.TODO(), t.GetTaskID()) + task := s.importMeta.GetTask(context.TODO(), t.GetTaskID()) for _, id := range task.(*importTask).GetSegmentIDs() { segment := s.checker.meta.GetSegment(context.TODO(), id) s.Equal(false, segment.GetIsImporting()) } } - s.Equal(internalpb.ImportJobState_Completed, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Completed, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) } func (s *ImportCheckerSuite) TestCheckJob_Failed() { mockErr := errors.New("mock err") - job := s.imeta.GetJob(context.TODO(), s.jobID) + job := s.importMeta.GetJob(context.TODO(), s.jobID) // test checkPendingJob alloc := s.alloc alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, nil) - catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) + catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(mockErr) s.checker.checkPendingJob(job) - preimportTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) + preimportTasks := s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) s.Equal(0, len(preimportTasks)) - s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) alloc.ExpectedCalls = nil alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, mockErr) s.checker.checkPendingJob(job) - preimportTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) + preimportTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) s.Equal(0, len(preimportTasks)) - s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) alloc.ExpectedCalls = nil alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, nil) @@ -298,13 +298,13 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() { catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil) s.checker.checkPendingJob(job) - preimportTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) + preimportTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) s.Equal(2, len(preimportTasks)) - s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_PreImporting, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) // test checkPreImportingJob for _, t := range preimportTasks { - err := s.imeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) + err := s.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) s.NoError(err) } @@ -312,18 +312,18 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() { catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(mockErr) catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil) s.checker.checkPreImportingJob(job) - importTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) + importTasks := s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) s.Equal(0, len(importTasks)) - s.Equal(internalpb.ImportJobState_Failed, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Failed, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) alloc.ExpectedCalls = nil alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, mockErr) - err := s.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) + err := s.importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) s.NoError(err) s.checker.checkPreImportingJob(job) - importTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) + importTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) s.Equal(0, len(importTasks)) - s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_PreImporting, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) catalog.ExpectedCalls = nil catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil) @@ -331,13 +331,13 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() { alloc.ExpectedCalls = nil alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, nil) s.checker.checkPreImportingJob(job) - importTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) + importTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) s.Equal(1, len(importTasks)) - s.Equal(internalpb.ImportJobState_Importing, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Importing, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState()) } func (s *ImportCheckerSuite) TestCheckTimeout() { - catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) + catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil) taskProto := &datapb.PreImportTask{ @@ -348,17 +348,17 @@ func (s *ImportCheckerSuite) TestCheckTimeout() { tr: timerecord.NewTimeRecorder("preimport task"), } task.task.Store(taskProto) - err := s.imeta.AddTask(context.TODO(), task) + err := s.importMeta.AddTask(context.TODO(), task) s.NoError(err) - s.checker.tryTimeoutJob(s.imeta.GetJob(context.TODO(), s.jobID)) + s.checker.tryTimeoutJob(s.importMeta.GetJob(context.TODO(), s.jobID)) - job := s.imeta.GetJob(context.TODO(), s.jobID) + job := s.importMeta.GetJob(context.TODO(), s.jobID) s.Equal(internalpb.ImportJobState_Failed, job.GetState()) s.Equal("import timeout", job.GetReason()) } func (s *ImportCheckerSuite) TestCheckFailure() { - catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) + catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil) taskProto := &datapb.ImportTaskV2{ @@ -372,35 +372,35 @@ func (s *ImportCheckerSuite) TestCheckFailure() { tr: timerecord.NewTimeRecorder("import task"), } it.task.Store(taskProto) - err := s.imeta.AddTask(context.TODO(), it) + err := s.importMeta.AddTask(context.TODO(), it) s.NoError(err) sjm := NewMockStatsJobManager(s.T()) sjm.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(errors.New("mock err")) s.checker.si = sjm - s.checker.checkFailedJob(s.imeta.GetJob(context.TODO(), s.jobID)) - tasks := s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed)) + s.checker.checkFailedJob(s.importMeta.GetJob(context.TODO(), s.jobID)) + tasks := s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed)) s.Equal(0, len(tasks)) sjm.ExpectedCalls = nil sjm.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil) catalog.ExpectedCalls = nil catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(errors.New("mock error")) - s.checker.checkFailedJob(s.imeta.GetJob(context.TODO(), s.jobID)) - tasks = s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed)) + s.checker.checkFailedJob(s.importMeta.GetJob(context.TODO(), s.jobID)) + tasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed)) s.Equal(0, len(tasks)) catalog.ExpectedCalls = nil catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil) - s.checker.checkFailedJob(s.imeta.GetJob(context.TODO(), s.jobID)) - tasks = s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed)) + s.checker.checkFailedJob(s.importMeta.GetJob(context.TODO(), s.jobID)) + tasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed)) s.Equal(1, len(tasks)) } func (s *ImportCheckerSuite) TestCheckGC() { mockErr := errors.New("mock err") - catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) + catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil) taskProto := &datapb.ImportTaskV2{ @@ -415,74 +415,74 @@ func (s *ImportCheckerSuite) TestCheckGC() { tr: timerecord.NewTimeRecorder("import task"), } task.task.Store(taskProto) - err := s.imeta.AddTask(context.TODO(), task) + err := s.importMeta.AddTask(context.TODO(), task) s.NoError(err) // not failed or completed - s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID)) - s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) - s.Equal(1, len(s.imeta.GetJobBy(context.TODO()))) + s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID)) + s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) + s.Equal(1, len(s.importMeta.GetJobBy(context.TODO()))) catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil) - err = s.imeta.UpdateJob(context.TODO(), s.jobID, UpdateJobState(internalpb.ImportJobState_Failed)) + err = s.importMeta.UpdateJob(context.TODO(), s.jobID, UpdateJobState(internalpb.ImportJobState_Failed)) s.NoError(err) // not reach cleanup ts - s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID)) - s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) - s.Equal(1, len(s.imeta.GetJobBy(context.TODO()))) + s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID)) + s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) + s.Equal(1, len(s.importMeta.GetJobBy(context.TODO()))) GCRetention := Params.DataCoordCfg.ImportTaskRetention.GetAsDuration(time.Second) - job := s.imeta.GetJob(context.TODO(), s.jobID) + job := s.importMeta.GetJob(context.TODO(), s.jobID) job.(*importJob).CleanupTs = tsoutil.AddPhysicalDurationOnTs(job.GetCleanupTs(), GCRetention*-2) - err = s.imeta.AddJob(context.TODO(), job) + err = s.importMeta.AddJob(context.TODO(), job) s.NoError(err) // origin segment not dropped - s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID)) - s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) - s.Equal(1, len(s.imeta.GetJobBy(context.TODO()))) - err = s.imeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateSegmentIDs([]int64{})) + s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID)) + s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) + s.Equal(1, len(s.importMeta.GetJobBy(context.TODO()))) + err = s.importMeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateSegmentIDs([]int64{})) s.NoError(err) // stats segment not dropped - s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID)) - s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) - s.Equal(1, len(s.imeta.GetJobBy(context.TODO()))) - err = s.imeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateStatsSegmentIDs([]int64{})) + s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID)) + s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) + s.Equal(1, len(s.importMeta.GetJobBy(context.TODO()))) + err = s.importMeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateStatsSegmentIDs([]int64{})) s.NoError(err) // task is not dropped - s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID)) - s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) - s.Equal(1, len(s.imeta.GetJobBy(context.TODO()))) - err = s.imeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateNodeID(NullNodeID)) + s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID)) + s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) + s.Equal(1, len(s.importMeta.GetJobBy(context.TODO()))) + err = s.importMeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateNodeID(NullNodeID)) s.NoError(err) // remove task failed catalog.EXPECT().DropImportTask(mock.Anything, mock.Anything).Return(mockErr) - s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID)) - s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) - s.Equal(1, len(s.imeta.GetJobBy(context.TODO()))) + s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID)) + s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) + s.Equal(1, len(s.importMeta.GetJobBy(context.TODO()))) // remove job failed catalog.ExpectedCalls = nil catalog.EXPECT().DropImportTask(mock.Anything, mock.Anything).Return(nil) catalog.EXPECT().DropImportJob(mock.Anything, mock.Anything).Return(mockErr) - s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID)) - s.Equal(0, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) - s.Equal(1, len(s.imeta.GetJobBy(context.TODO()))) + s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID)) + s.Equal(0, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) + s.Equal(1, len(s.importMeta.GetJobBy(context.TODO()))) // normal case catalog.ExpectedCalls = nil catalog.EXPECT().DropImportJob(mock.Anything, mock.Anything).Return(nil) - s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID)) - s.Equal(0, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) - s.Equal(0, len(s.imeta.GetJobBy(context.TODO()))) + s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID)) + s.Equal(0, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID)))) + s.Equal(0, len(s.importMeta.GetJobBy(context.TODO()))) } func (s *ImportCheckerSuite) TestCheckCollection() { mockErr := errors.New("mock err") - catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) + catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil) taskProto := &datapb.PreImportTask{ @@ -494,25 +494,25 @@ func (s *ImportCheckerSuite) TestCheckCollection() { tr: timerecord.NewTimeRecorder("preimport task"), } task.task.Store(taskProto) - err := s.imeta.AddTask(context.TODO(), task) + err := s.importMeta.AddTask(context.TODO(), task) s.NoError(err) // no jobs s.checker.checkCollection(1, []ImportJob{}) - s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), s.jobID).GetState()) + s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), s.jobID).GetState()) // collection exist broker := s.checker.broker.(*broker2.MockBroker) broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(true, nil) - s.checker.checkCollection(1, []ImportJob{s.imeta.GetJob(context.TODO(), s.jobID)}) - s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), s.jobID).GetState()) + s.checker.checkCollection(1, []ImportJob{s.importMeta.GetJob(context.TODO(), s.jobID)}) + s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), s.jobID).GetState()) // HasCollection failed s.checker.broker = broker2.NewMockBroker(s.T()) broker = s.checker.broker.(*broker2.MockBroker) broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(true, mockErr) - s.checker.checkCollection(1, []ImportJob{s.imeta.GetJob(context.TODO(), s.jobID)}) - s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), s.jobID).GetState()) + s.checker.checkCollection(1, []ImportJob{s.importMeta.GetJob(context.TODO(), s.jobID)}) + s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), s.jobID).GetState()) // SaveImportJob failed s.checker.broker = broker2.NewMockBroker(s.T()) @@ -520,8 +520,8 @@ func (s *ImportCheckerSuite) TestCheckCollection() { broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(false, nil) catalog.ExpectedCalls = nil catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(mockErr) - s.checker.checkCollection(1, []ImportJob{s.imeta.GetJob(context.TODO(), s.jobID)}) - s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), s.jobID).GetState()) + s.checker.checkCollection(1, []ImportJob{s.importMeta.GetJob(context.TODO(), s.jobID)}) + s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), s.jobID).GetState()) // collection dropped s.checker.broker = broker2.NewMockBroker(s.T()) @@ -529,8 +529,8 @@ func (s *ImportCheckerSuite) TestCheckCollection() { broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(false, nil) catalog.ExpectedCalls = nil catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil) - s.checker.checkCollection(1, []ImportJob{s.imeta.GetJob(context.TODO(), s.jobID)}) - s.Equal(internalpb.ImportJobState_Failed, s.imeta.GetJob(context.TODO(), s.jobID).GetState()) + s.checker.checkCollection(1, []ImportJob{s.importMeta.GetJob(context.TODO(), s.jobID)}) + s.Equal(internalpb.ImportJobState_Failed, s.importMeta.GetJob(context.TODO(), s.jobID).GetState()) } func TestImportChecker(t *testing.T) { @@ -565,7 +565,7 @@ func TestImportCheckerCompaction(t *testing.T) { meta, err := newMeta(context.TODO(), catalog, nil, broker) assert.NoError(t, err) - imeta, err := NewImportMeta(context.TODO(), catalog, alloc, meta) + importMeta, err := NewImportMeta(context.TODO(), catalog, alloc, meta) assert.NoError(t, err) sjm := NewMockStatsJobManager(t) @@ -575,7 +575,7 @@ func TestImportCheckerCompaction(t *testing.T) { l0CompactionTrigger.EXPECT().GetPauseCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe() l0CompactionTrigger.EXPECT().GetResumeCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe() - checker := NewImportChecker(meta, broker, alloc, imeta, sjm, l0CompactionTrigger).(*importChecker) + checker := NewImportChecker(context.TODO(), meta, broker, alloc, importMeta, sjm, l0CompactionTrigger).(*importChecker) job := &importJob{ ImportJob: &datapb.ImportJob{ @@ -615,7 +615,7 @@ func TestImportCheckerCompaction(t *testing.T) { tr: timerecord.NewTimeRecorder("import job"), } catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once() - err = imeta.AddJob(context.TODO(), job) + err = importMeta.AddJob(context.TODO(), job) assert.NoError(t, err) jobID := job.GetJobID() @@ -662,7 +662,7 @@ func TestImportCheckerCompaction(t *testing.T) { }, tr: timerecord.NewTimeRecorder("import job"), } - err = imeta.AddJob(context.TODO(), job2) + err = importMeta.AddJob(context.TODO(), job2) assert.NoError(t, err) log.Info("job ready") @@ -675,8 +675,8 @@ func TestImportCheckerCompaction(t *testing.T) { catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil).Twice() catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once() assert.Eventually(t, func() bool { - job := imeta.GetJob(context.TODO(), jobID) - preimportTasks := imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) + job := importMeta.GetJob(context.TODO(), jobID) + preimportTasks := importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) taskLen := len(preimportTasks) log.Info("job pre-importing", zap.Any("taskLen", taskLen), zap.Any("jobState", job.GetState())) return taskLen == 2 && job.GetState() == internalpb.ImportJobState_PreImporting @@ -687,14 +687,14 @@ func TestImportCheckerCompaction(t *testing.T) { catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil).Once() catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil).Twice() catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once() - preimportTasks := imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) + preimportTasks := importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) for _, pt := range preimportTasks { - err := imeta.UpdateTask(context.TODO(), pt.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) + err := importMeta.UpdateTask(context.TODO(), pt.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed)) assert.NoError(t, err) } assert.Eventually(t, func() bool { - job := imeta.GetJob(context.TODO(), jobID) - importTasks := imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) + job := importMeta.GetJob(context.TODO(), jobID) + importTasks := importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) return len(importTasks) == 1 && job.GetState() == internalpb.ImportJobState_Importing }, 2*time.Second, 100*time.Millisecond) log.Info("job importing") @@ -705,7 +705,7 @@ func TestImportCheckerCompaction(t *testing.T) { catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once() catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil).Once() - importTasks := imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) + importTasks := importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) for _, it := range importTasks { segment := &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ @@ -717,14 +717,14 @@ func TestImportCheckerCompaction(t *testing.T) { } err := checker.meta.AddSegment(context.Background(), segment) assert.NoError(t, err) - err = imeta.UpdateTask(context.TODO(), it.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), + err = importMeta.UpdateTask(context.TODO(), it.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateSegmentIDs([]int64{segment.GetID()}), UpdateStatsSegmentIDs([]int64{rand.Int63()})) assert.NoError(t, err) err = checker.meta.UpdateChannelCheckpoint(context.TODO(), segment.GetInsertChannel(), &msgpb.MsgPosition{MsgID: []byte{0}}) assert.NoError(t, err) } assert.Eventually(t, func() bool { - job := imeta.GetJob(context.TODO(), jobID) + job := importMeta.GetJob(context.TODO(), jobID) return job.GetState() == internalpb.ImportJobState_Stats }, 2*time.Second, 100*time.Millisecond) log.Info("job stats") @@ -735,7 +735,7 @@ func TestImportCheckerCompaction(t *testing.T) { State: indexpb.JobState_JobStateFinished, }).Once() assert.Eventually(t, func() bool { - job := imeta.GetJob(context.TODO(), jobID) + job := importMeta.GetJob(context.TODO(), jobID) return job.GetState() == internalpb.ImportJobState_IndexBuilding }, 2*time.Second, 100*time.Millisecond) log.Info("job index building") @@ -750,16 +750,16 @@ func TestImportCheckerCompaction(t *testing.T) { } task := &importTask{} task.task.Store(taskProto) - imeta.AddTask(context.TODO(), task) + importMeta.AddTask(context.TODO(), task) time.Sleep(1200 * time.Millisecond) catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil).Once() - imeta.UpdateTask(context.TODO(), 100000, UpdateState(datapb.ImportTaskStateV2_Completed)) + importMeta.UpdateTask(context.TODO(), 100000, UpdateState(datapb.ImportTaskStateV2_Completed)) log.Info("job l0 compaction") // check index building catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once() assert.Eventually(t, func() bool { - job := imeta.GetJob(context.TODO(), jobID) + job := importMeta.GetJob(context.TODO(), jobID) return job.GetState() == internalpb.ImportJobState_Completed }, 2*time.Second, 100*time.Millisecond) log.Info("job completed") diff --git a/internal/datacoord/import_inspector.go b/internal/datacoord/import_inspector.go index c61ab3428c..b7870596ef 100644 --- a/internal/datacoord/import_inspector.go +++ b/internal/datacoord/import_inspector.go @@ -41,32 +41,34 @@ type ImportInspector interface { } type importInspector struct { - meta *meta - alloc allocator.Allocator - imeta ImportMeta - scheduler task.GlobalScheduler + ctx context.Context + meta *meta + alloc allocator.Allocator + importMeta ImportMeta + scheduler task.GlobalScheduler closeOnce sync.Once closeChan chan struct{} } -func NewImportInspector(meta *meta, imeta ImportMeta, scheduler task.GlobalScheduler) ImportInspector { +func NewImportInspector(ctx context.Context, meta *meta, importMeta ImportMeta, scheduler task.GlobalScheduler) ImportInspector { return &importInspector{ - meta: meta, - imeta: imeta, - scheduler: scheduler, - closeChan: make(chan struct{}), + ctx: ctx, + meta: meta, + importMeta: importMeta, + scheduler: scheduler, + closeChan: make(chan struct{}), } } func (s *importInspector) Start() { - log.Ctx(context.TODO()).Info("start import inspector") + log.Ctx(s.ctx).Info("start import inspector") ticker := time.NewTicker(Params.DataCoordCfg.ImportScheduleInterval.GetAsDuration(time.Second)) defer ticker.Stop() for { select { case <-s.closeChan: - log.Ctx(context.TODO()).Info("import inspector exited") + log.Ctx(s.ctx).Info("import inspector exited") return case <-ticker.C: s.inspect() @@ -81,12 +83,12 @@ func (s *importInspector) Close() { } func (s *importInspector) inspect() { - jobs := s.imeta.GetJobBy(context.TODO()) + jobs := s.importMeta.GetJobBy(s.ctx) sort.Slice(jobs, func(i, j int) bool { return jobs[i].GetJobID() < jobs[j].GetJobID() }) for _, job := range jobs { - tasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID())) + tasks := s.importMeta.GetTaskBy(s.ctx, WithJob(job.GetJobID())) for _, task := range tasks { switch task.GetState() { case datapb.ImportTaskStateV2_Pending: @@ -118,16 +120,16 @@ func (s *importInspector) processFailed(task ImportTask) { segments := append(originSegmentIDs, statsSegmentIDs...) for _, segment := range segments { op := UpdateStatusOperator(segment, commonpb.SegmentState_Dropped) - err := s.meta.UpdateSegmentsInfo(context.TODO(), op) + err := s.meta.UpdateSegmentsInfo(s.ctx, op) if err != nil { - log.Ctx(context.TODO()).Warn("drop import segment failed", WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...) + log.Ctx(s.ctx).Warn("drop import segment failed", WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...) return } } if len(segments) > 0 { - err := s.imeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateSegmentIDs(nil), UpdateStatsSegmentIDs(nil)) + err := s.importMeta.UpdateTask(s.ctx, task.GetTaskID(), UpdateSegmentIDs(nil), UpdateStatsSegmentIDs(nil)) if err != nil { - log.Ctx(context.TODO()).Warn("update import task segments failed", WrapTaskLog(task, zap.Error(err))...) + log.Ctx(s.ctx).Warn("update import task segments failed", WrapTaskLog(task, zap.Error(err))...) } } } diff --git a/internal/datacoord/import_inspector_test.go b/internal/datacoord/import_inspector_test.go index b8dc6a757c..78417328cf 100644 --- a/internal/datacoord/import_inspector_test.go +++ b/internal/datacoord/import_inspector_test.go @@ -40,12 +40,12 @@ type ImportInspectorSuite struct { collectionID int64 - catalog *mocks.DataCoordCatalog - alloc *allocator.MockAllocator - cluster *MockCluster - meta *meta - imeta ImportMeta - inspector *importInspector + catalog *mocks.DataCoordCatalog + alloc *allocator.MockAllocator + cluster *MockCluster + meta *meta + importMeta ImportMeta + inspector *importInspector } func (s *ImportInspectorSuite) SetupTest() { @@ -75,10 +75,10 @@ func (s *ImportInspectorSuite) SetupTest() { ID: s.collectionID, Schema: newTestSchema(), }) - s.imeta, err = NewImportMeta(context.TODO(), s.catalog, s.alloc, s.meta) + s.importMeta, err = NewImportMeta(context.TODO(), s.catalog, s.alloc, s.meta) s.NoError(err) scheduler := task2.NewMockGlobalScheduler(s.T()) - s.inspector = NewImportInspector(s.meta, s.imeta, scheduler).(*importInspector) + s.inspector = NewImportInspector(context.TODO(), s.meta, s.importMeta, scheduler).(*importInspector) } func (s *ImportInspectorSuite) TestProcessPreImport() { @@ -93,11 +93,11 @@ func (s *ImportInspectorSuite) TestProcessPreImport() { } var task ImportTask = &preImportTask{ - imeta: s.imeta, - tr: timerecord.NewTimeRecorder("preimport task"), + importMeta: s.importMeta, + tr: timerecord.NewTimeRecorder("preimport task"), } task.(*preImportTask).task.Store(taskProto) - err := s.imeta.AddTask(context.TODO(), task) + err := s.importMeta.AddTask(context.TODO(), task) s.NoError(err) var job ImportJob = &importJob{ ImportJob: &datapb.ImportJob{ @@ -108,7 +108,7 @@ func (s *ImportInspectorSuite) TestProcessPreImport() { }, tr: timerecord.NewTimeRecorder("import job"), } - err = s.imeta.AddJob(context.TODO(), job) + err = s.importMeta.AddJob(context.TODO(), job) s.NoError(err) // pending -> inProgress @@ -118,7 +118,7 @@ func (s *ImportInspectorSuite) TestProcessPreImport() { task.CreateTaskOnWorker(1, cluster) }) s.inspector.inspect() - task = s.imeta.GetTask(context.TODO(), task.GetTaskID()) + task = s.importMeta.GetTask(context.TODO(), task.GetTaskID()) s.Equal(datapb.ImportTaskStateV2_InProgress, task.GetState()) // inProgress -> completed @@ -126,7 +126,7 @@ func (s *ImportInspectorSuite) TestProcessPreImport() { State: datapb.ImportTaskStateV2_Completed, }, nil) task.QueryTaskOnWorker(cluster) - task = s.imeta.GetTask(context.TODO(), task.GetTaskID()) + task = s.importMeta.GetTask(context.TODO(), task.GetTaskID()) s.Equal(datapb.ImportTaskStateV2_Completed, task.GetState()) } @@ -156,13 +156,13 @@ func (s *ImportInspectorSuite) TestProcessImport() { } var task ImportTask = &importTask{ - alloc: s.alloc, - meta: s.meta, - imeta: s.imeta, - tr: timerecord.NewTimeRecorder("import task"), + alloc: s.alloc, + meta: s.meta, + importMeta: s.importMeta, + tr: timerecord.NewTimeRecorder("import task"), } task.(*importTask).task.Store(taskProto) - err := s.imeta.AddTask(context.TODO(), task) + err := s.importMeta.AddTask(context.TODO(), task) s.NoError(err) var job ImportJob = &importJob{ ImportJob: &datapb.ImportJob{ @@ -175,7 +175,7 @@ func (s *ImportInspectorSuite) TestProcessImport() { }, tr: timerecord.NewTimeRecorder("import job"), } - err = s.imeta.AddJob(context.TODO(), job) + err = s.importMeta.AddJob(context.TODO(), job) s.NoError(err) // pending -> inProgress @@ -188,7 +188,7 @@ func (s *ImportInspectorSuite) TestProcessImport() { task.CreateTaskOnWorker(nodeID, cluster) }) s.inspector.inspect() - task = s.imeta.GetTask(context.TODO(), task.GetTaskID()) + task = s.importMeta.GetTask(context.TODO(), task.GetTaskID()) s.Equal(datapb.ImportTaskStateV2_InProgress, task.GetState()) // inProgress -> completed @@ -196,7 +196,7 @@ func (s *ImportInspectorSuite) TestProcessImport() { State: datapb.ImportTaskStateV2_Completed, }, nil) task.QueryTaskOnWorker(cluster) - task = s.imeta.GetTask(context.TODO(), task.GetTaskID()) + task = s.importMeta.GetTask(context.TODO(), task.GetTaskID()) s.Equal(datapb.ImportTaskStateV2_Completed, task.GetState()) } @@ -218,7 +218,7 @@ func (s *ImportInspectorSuite) TestProcessFailed() { tr: timerecord.NewTimeRecorder("import task"), } task.(*importTask).task.Store(taskProto) - err := s.imeta.AddTask(context.TODO(), task) + err := s.importMeta.AddTask(context.TODO(), task) s.NoError(err) var job ImportJob = &importJob{ ImportJob: &datapb.ImportJob{ @@ -231,7 +231,7 @@ func (s *ImportInspectorSuite) TestProcessFailed() { }, tr: timerecord.NewTimeRecorder("import job"), } - err = s.imeta.AddJob(context.TODO(), job) + err = s.importMeta.AddJob(context.TODO(), job) s.NoError(err) s.catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil) @@ -253,7 +253,7 @@ func (s *ImportInspectorSuite) TestProcessFailed() { segment := s.meta.GetSegment(context.TODO(), id) s.Equal(commonpb.SegmentState_Dropped, segment.GetState()) } - task = s.imeta.GetTask(context.TODO(), task.GetTaskID()) + task = s.importMeta.GetTask(context.TODO(), task.GetTaskID()) s.Equal(datapb.ImportTaskStateV2_Failed, task.GetState()) s.Equal(0, len(task.(*importTask).GetSegmentIDs())) } diff --git a/internal/datacoord/import_meta.go b/internal/datacoord/import_meta.go index 30421b8fc6..fce82c3d97 100644 --- a/internal/datacoord/import_meta.go +++ b/internal/datacoord/import_meta.go @@ -111,24 +111,24 @@ func NewImportMeta(ctx context.Context, catalog metastore.DataCoordCatalog, allo } tasks := newImportTasks() - imeta := &importMeta{} + importMeta := &importMeta{} for _, task := range restoredPreImportTasks { t := &preImportTask{ - imeta: imeta, - tr: timerecord.NewTimeRecorder("preimport task"), - times: taskcommon.NewTimes(), + importMeta: importMeta, + tr: timerecord.NewTimeRecorder("preimport task"), + times: taskcommon.NewTimes(), } t.task.Store(task) tasks.add(t) } for _, task := range restoredImportTasks { t := &importTask{ - alloc: alloc, - meta: meta, - imeta: imeta, - tr: timerecord.NewTimeRecorder("import task"), - times: taskcommon.NewTimes(), + alloc: alloc, + meta: meta, + importMeta: importMeta, + tr: timerecord.NewTimeRecorder("import task"), + times: taskcommon.NewTimes(), } t.task.Store(task) tasks.add(t) @@ -142,10 +142,10 @@ func NewImportMeta(ctx context.Context, catalog metastore.DataCoordCatalog, allo } } - imeta.jobs = jobs - imeta.tasks = tasks - imeta.catalog = catalog - return imeta, nil + importMeta.jobs = jobs + importMeta.tasks = tasks + importMeta.catalog = catalog + return importMeta, nil } func (m *importMeta) AddJob(ctx context.Context, job ImportJob) error { diff --git a/internal/datacoord/import_task_import.go b/internal/datacoord/import_task_import.go index 77228b0853..9466f76565 100644 --- a/internal/datacoord/import_task_import.go +++ b/internal/datacoord/import_task_import.go @@ -47,7 +47,7 @@ type importTask struct { alloc allocator.Allocator meta *meta - imeta ImportMeta + importMeta ImportMeta tr *timerecord.TimeRecorder times *taskcommon.Times retryTimes int64 @@ -138,7 +138,7 @@ func (t *importTask) GetTaskSlot() int64 { func (t *importTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) { log.Info("processing pending import task...", WrapTaskLog(t)...) - job := t.imeta.GetJob(context.TODO(), t.GetJobID()) + job := t.importMeta.GetJob(context.TODO(), t.GetJobID()) req, err := AssembleImportRequest(t, job, t.meta, t.alloc) if err != nil { log.Warn("assemble import request failed", WrapTaskLog(t, zap.Error(err))...) @@ -150,7 +150,7 @@ func (t *importTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) { t.retryTimes++ return } - err = t.imeta.UpdateTask(context.TODO(), t.GetTaskID(), + err = t.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress), UpdateNodeID(nodeID)) if err != nil { @@ -169,7 +169,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) { } resp, err := cluster.QueryImport(t.GetNodeID(), req) if err != nil { - updateErr := t.imeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending)) + updateErr := t.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending)) if updateErr != nil { log.Warn("failed to update import task state to pending", WrapTaskLog(t, zap.Error(updateErr))...) } @@ -177,7 +177,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) { return } if resp.GetState() == datapb.ImportTaskStateV2_Failed { - err = t.imeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason())) + err = t.importMeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason())) if err != nil { log.Warn("failed to update job state to Failed", zap.Int64("jobID", t.GetJobID()), zap.Error(err)) } @@ -222,7 +222,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) { op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed) err = t.meta.UpdateSegmentsInfo(context.TODO(), op1, op2) if err != nil { - updateErr := t.imeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) + updateErr := t.importMeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) if updateErr != nil { log.Warn("failed to update job state to Failed", zap.Int64("jobID", t.GetJobID()), zap.Error(updateErr)) } @@ -231,7 +231,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) { } } completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") - err = t.imeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime)) + err = t.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime)) if err != nil { log.Warn("update import task failed", WrapTaskLog(t, zap.Error(err))...) return @@ -245,7 +245,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) { } func (t *importTask) DropTaskOnWorker(cluster session.Cluster) { - err := DropImportTask(t, cluster, t.imeta) + err := DropImportTask(t, cluster, t.importMeta) if err != nil { log.Warn("drop import failed", WrapTaskLog(t, zap.Error(err))...) return @@ -263,11 +263,11 @@ func (t *importTask) GetTR() *timerecord.TimeRecorder { func (t *importTask) Clone() ImportTask { cloned := &importTask{ - alloc: t.alloc, - meta: t.meta, - imeta: t.imeta, - tr: t.tr, - times: t.times, + alloc: t.alloc, + meta: t.meta, + importMeta: t.importMeta, + tr: t.tr, + times: t.times, } cloned.task.Store(typeutil.Clone(t.task.Load())) return cloned diff --git a/internal/datacoord/import_task_import_test.go b/internal/datacoord/import_task_import_test.go index e14a4f7d14..fd26489084 100644 --- a/internal/datacoord/import_task_import_test.go +++ b/internal/datacoord/import_task_import_test.go @@ -95,10 +95,10 @@ func TestImportTask_CreateTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Pending, } task := &importTask{ - alloc: alloc, - meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, - imeta: im, - tr: timerecord.NewTimeRecorder(""), + alloc: alloc, + meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -139,10 +139,10 @@ func TestImportTask_CreateTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Pending, } task := &importTask{ - alloc: alloc, - meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, - imeta: im, - tr: timerecord.NewTimeRecorder(""), + alloc: alloc, + meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -184,10 +184,10 @@ func TestImportTask_CreateTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Pending, } task := &importTask{ - alloc: alloc, - meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, - imeta: im, - tr: timerecord.NewTimeRecorder(""), + alloc: alloc, + meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -234,10 +234,10 @@ func TestImportTask_CreateTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Pending, } task := &importTask{ - alloc: alloc, - meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, - imeta: im, - tr: timerecord.NewTimeRecorder(""), + alloc: alloc, + meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -270,10 +270,10 @@ func TestImportTask_QueryTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_InProgress, } task := &importTask{ - alloc: nil, - meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, - imeta: im, - tr: timerecord.NewTimeRecorder(""), + alloc: nil, + meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()}, + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -322,8 +322,8 @@ func TestImportTask_QueryTaskOnWorker(t *testing.T) { collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), segments: NewSegmentsInfo(), }, - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -363,8 +363,8 @@ func TestImportTask_QueryTaskOnWorker(t *testing.T) { collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), segments: NewSegmentsInfo(), }, - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -431,8 +431,8 @@ func TestImportTask_QueryTaskOnWorker(t *testing.T) { collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](), segments: NewSegmentsInfo(), }, - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -496,9 +496,9 @@ func TestImportTask_DropTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Completed, } task := &importTask{ - alloc: nil, - imeta: im, - tr: timerecord.NewTimeRecorder(""), + alloc: nil, + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -530,9 +530,9 @@ func TestImportTask_DropTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Completed, } task := &importTask{ - alloc: nil, - imeta: im, - tr: timerecord.NewTimeRecorder(""), + alloc: nil, + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) diff --git a/internal/datacoord/import_task_preimport.go b/internal/datacoord/import_task_preimport.go index f37c6867a8..f5e603f914 100644 --- a/internal/datacoord/import_task_preimport.go +++ b/internal/datacoord/import_task_preimport.go @@ -41,7 +41,7 @@ var _ ImportTask = (*preImportTask)(nil) type preImportTask struct { task atomic.Pointer[datapb.PreImportTask] - imeta ImportMeta + importMeta ImportMeta tr *timerecord.TimeRecorder times *taskcommon.Times retryTimes int64 @@ -109,7 +109,7 @@ func (p *preImportTask) GetTaskVersion() int64 { func (p *preImportTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) { log.Info("processing pending preimport task...", WrapTaskLog(p)...) - job := p.imeta.GetJob(context.TODO(), p.GetJobID()) + job := p.importMeta.GetJob(context.TODO(), p.GetJobID()) req := AssemblePreImportRequest(p, job) err := cluster.CreatePreImport(nodeID, req, p.GetTaskSlot()) @@ -118,7 +118,7 @@ func (p *preImportTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster p.retryTimes++ return } - err = p.imeta.UpdateTask(context.TODO(), p.GetTaskID(), + err = p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress), UpdateNodeID(nodeID)) if err != nil { @@ -137,7 +137,7 @@ func (p *preImportTask) QueryTaskOnWorker(cluster session.Cluster) { } resp, err := cluster.QueryPreImport(p.GetNodeID(), req) if err != nil { - updateErr := p.imeta.UpdateTask(context.TODO(), p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending)) + updateErr := p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending)) if updateErr != nil { log.Warn("failed to update preimport task state to pending", WrapTaskLog(p, zap.Error(updateErr))...) } @@ -145,7 +145,7 @@ func (p *preImportTask) QueryTaskOnWorker(cluster session.Cluster) { return } if resp.GetState() == datapb.ImportTaskStateV2_Failed { - err = p.imeta.UpdateJob(context.TODO(), p.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), + err = p.importMeta.UpdateJob(context.TODO(), p.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason())) if err != nil { log.Warn("failed to update job state to Failed", zap.Int64("jobID", p.GetJobID()), zap.Error(err)) @@ -157,7 +157,7 @@ func (p *preImportTask) QueryTaskOnWorker(cluster session.Cluster) { if resp.GetState() == datapb.ImportTaskStateV2_Completed { actions = append(actions, UpdateState(datapb.ImportTaskStateV2_Completed)) } - err = p.imeta.UpdateTask(context.TODO(), p.GetTaskID(), actions...) + err = p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(), actions...) if err != nil { log.Warn("update preimport task failed", WrapTaskLog(p, zap.Error(err))...) return @@ -172,7 +172,7 @@ func (p *preImportTask) QueryTaskOnWorker(cluster session.Cluster) { } func (p *preImportTask) DropTaskOnWorker(cluster session.Cluster) { - err := DropImportTask(p, cluster, p.imeta) + err := DropImportTask(p, cluster, p.importMeta) if err != nil { log.Warn("drop import failed", WrapTaskLog(p, zap.Error(err))...) return @@ -190,9 +190,9 @@ func (p *preImportTask) GetTR() *timerecord.TimeRecorder { func (p *preImportTask) Clone() ImportTask { cloned := &preImportTask{ - imeta: p.imeta, - tr: p.tr, - times: p.times, + importMeta: p.importMeta, + tr: p.tr, + times: p.times, } cloned.task.Store(typeutil.Clone(p.task.Load())) return cloned diff --git a/internal/datacoord/import_task_preimport_test.go b/internal/datacoord/import_task_preimport_test.go index fac45ded59..ee666ff86e 100644 --- a/internal/datacoord/import_task_preimport_test.go +++ b/internal/datacoord/import_task_preimport_test.go @@ -81,8 +81,8 @@ func TestPreImportTask_CreateTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Pending, } task := &preImportTask{ - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -120,8 +120,8 @@ func TestPreImportTask_CreateTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Pending, } task := &preImportTask{ - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -132,7 +132,7 @@ func TestPreImportTask_CreateTaskOnWorker(t *testing.T) { catalog = mocks.NewDataCoordCatalog(t) catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(errors.New("mock err")) - task.imeta.(*importMeta).catalog = catalog + task.importMeta.(*importMeta).catalog = catalog task.CreateTaskOnWorker(1, cluster) assert.Equal(t, datapb.ImportTaskStateV2_Pending, task.GetState()) }) @@ -163,8 +163,8 @@ func TestPreImportTask_CreateTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Pending, } task := &preImportTask{ - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -196,8 +196,8 @@ func TestPreImportTask_QueryTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_InProgress, } task := &preImportTask{ - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -236,8 +236,8 @@ func TestPreImportTask_QueryTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_InProgress, } task := &preImportTask{ - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -271,8 +271,8 @@ func TestPreImportTask_QueryTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_InProgress, } task := &preImportTask{ - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -306,8 +306,8 @@ func TestPreImportTask_DropTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Completed, } task := &preImportTask{ - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) @@ -338,8 +338,8 @@ func TestPreImportTask_DropTaskOnWorker(t *testing.T) { State: datapb.ImportTaskStateV2_Completed, } task := &preImportTask{ - imeta: im, - tr: timerecord.NewTimeRecorder(""), + importMeta: im, + tr: timerecord.NewTimeRecorder(""), } task.task.Store(taskProto) err = im.AddTask(context.TODO(), task) diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 5fe082a489..91b5283ba8 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -59,7 +59,7 @@ func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field { } func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile, - job ImportJob, alloc allocator.Allocator, imeta ImportMeta, + job ImportJob, alloc allocator.Allocator, importMeta ImportMeta, ) ([]ImportTask, error) { idStart, _, err := alloc.AllocN(int64(len(fileGroups))) if err != nil { @@ -81,9 +81,9 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile, CreatedTime: time.Now().Format("2006-01-02T15:04:05Z07:00"), } task := &preImportTask{ - imeta: imeta, - tr: timerecord.NewTimeRecorder("preimport task"), - times: taskcommon.NewTimes(), + importMeta: importMeta, + tr: timerecord.NewTimeRecorder("preimport task"), + times: taskcommon.NewTimes(), } task.task.Store(taskProto) tasks = append(tasks, task) @@ -92,7 +92,7 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile, } func NewImportTasks(fileGroups [][]*datapb.ImportFileStats, - job ImportJob, alloc allocator.Allocator, meta *meta, imeta ImportMeta, + job ImportJob, alloc allocator.Allocator, meta *meta, importMeta ImportMeta, ) ([]ImportTask, error) { idBegin, _, err := alloc.AllocN(int64(len(fileGroups))) if err != nil { @@ -110,11 +110,11 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats, CreatedTime: time.Now().Format("2006-01-02T15:04:05Z07:00"), } task := &importTask{ - alloc: alloc, - meta: meta, - imeta: imeta, - tr: timerecord.NewTimeRecorder("import task"), - times: taskcommon.NewTimes(), + alloc: alloc, + meta: meta, + importMeta: importMeta, + tr: timerecord.NewTimeRecorder("import task"), + times: taskcommon.NewTimes(), } task.task.Store(taskProto) segments, err := AssignSegments(job, task, alloc, meta) @@ -389,7 +389,7 @@ func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats, allDiskI return fileGroups } -func CheckDiskQuota(job ImportJob, meta *meta, imeta ImportMeta) (int64, error) { +func CheckDiskQuota(ctx context.Context, job ImportJob, meta *meta, importMeta ImportMeta) (int64, error) { if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() { return 0, nil } @@ -402,7 +402,7 @@ func CheckDiskQuota(job ImportJob, meta *meta, imeta ImportMeta) (int64, error) requestedTotal int64 requestedCollections = make(map[int64]int64) ) - for _, j := range imeta.GetJobBy(context.TODO()) { + for _, j := range importMeta.GetJobBy(ctx) { requested := j.GetRequestedDiskSize() requestedTotal += requested requestedCollections[j.GetCollectionID()] += requested @@ -412,7 +412,7 @@ func CheckDiskQuota(job ImportJob, meta *meta, imeta ImportMeta) (int64, error) quotaInfo := meta.GetQuotaInfo() totalUsage, collectionsUsage := quotaInfo.TotalBinlogSize, quotaInfo.CollectionBinlogSize - tasks := imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType)) + tasks := importMeta.GetTaskBy(ctx, WithJob(job.GetJobID()), WithType(PreImportTaskType)) files := make([]*datapb.ImportFileStats, 0) for _, task := range tasks { files = append(files, task.GetFileStats()...) @@ -445,20 +445,20 @@ func CheckDiskQuota(job ImportJob, meta *meta, imeta ImportMeta) (int64, error) return requestSize, nil } -func getPendingProgress(jobID int64, imeta ImportMeta) float32 { - tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(PreImportTaskType)) +func getPendingProgress(ctx context.Context, jobID int64, importMeta ImportMeta) float32 { + tasks := importMeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(PreImportTaskType)) preImportingFiles := lo.SumBy(tasks, func(task ImportTask) int { return len(task.GetFileStats()) }) - totalFiles := len(imeta.GetJob(context.TODO(), jobID).GetFiles()) + totalFiles := len(importMeta.GetJob(ctx, jobID).GetFiles()) if totalFiles == 0 { return 1 } return float32(preImportingFiles) / float32(totalFiles) } -func getPreImportingProgress(jobID int64, imeta ImportMeta) float32 { - tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(PreImportTaskType)) +func getPreImportingProgress(ctx context.Context, jobID int64, importMeta ImportMeta) float32 { + tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(PreImportTaskType)) completedTasks := lo.Filter(tasks, func(task ImportTask, _ int) bool { return task.GetState() == datapb.ImportTaskStateV2_Completed }) @@ -468,8 +468,8 @@ func getPreImportingProgress(jobID int64, imeta ImportMeta) float32 { return float32(len(completedTasks)) / float32(len(tasks)) } -func getImportRowsInfo(jobID int64, imeta ImportMeta, meta *meta) (importedRows, totalRows int64) { - tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(ImportTaskType)) +func getImportRowsInfo(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) (importedRows, totalRows int64) { + tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType)) segmentIDs := make([]int64, 0) for _, task := range tasks { totalRows += lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 { @@ -481,19 +481,19 @@ func getImportRowsInfo(jobID int64, imeta ImportMeta, meta *meta) (importedRows, return } -func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, int64, int64) { - importedRows, totalRows := getImportRowsInfo(jobID, imeta, meta) +func getImportingProgress(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) (float32, int64, int64) { + importedRows, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta) if totalRows == 0 { return 1, importedRows, totalRows } return float32(importedRows) / float32(totalRows), importedRows, totalRows } -func getStatsProgress(jobID int64, imeta ImportMeta, sjm StatsInspector) float32 { +func getStatsProgress(ctx context.Context, jobID int64, importMeta ImportMeta, sjm StatsInspector) float32 { if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() { return 1 } - tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(ImportTaskType)) + tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType)) originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 { return t.(*importTask).GetSegmentIDs() }) @@ -510,12 +510,12 @@ func getStatsProgress(jobID int64, imeta ImportMeta, sjm StatsInspector) float32 return float32(doneCnt) / float32(len(originSegmentIDs)) } -func getIndexBuildingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 { - job := imeta.GetJob(context.TODO(), jobID) +func getIndexBuildingProgress(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) float32 { + job := importMeta.GetJob(ctx, jobID) if !Params.DataCoordCfg.WaitForIndex.GetAsBool() { return 1 } - tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(ImportTaskType)) + tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType)) originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 { return t.(*importTask).GetSegmentIDs() }) @@ -542,36 +542,36 @@ func getIndexBuildingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 // 10%: Completed // TODO: Wrap a function to map status to user status. // TODO: Save these progress to job instead of recalculating. -func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta, sjm StatsInspector) (int64, internalpb.ImportJobState, int64, int64, string) { - job := imeta.GetJob(context.TODO(), jobID) +func GetJobProgress(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta, sjm StatsInspector) (int64, internalpb.ImportJobState, int64, int64, string) { + job := importMeta.GetJob(ctx, jobID) if job == nil { return 0, internalpb.ImportJobState_Failed, 0, 0, fmt.Sprintf("import job does not exist, jobID=%d", jobID) } switch job.GetState() { case internalpb.ImportJobState_Pending: - progress := getPendingProgress(jobID, imeta) + progress := getPendingProgress(ctx, jobID, importMeta) return int64(progress * 10), internalpb.ImportJobState_Pending, 0, 0, "" case internalpb.ImportJobState_PreImporting: - progress := getPreImportingProgress(jobID, imeta) + progress := getPreImportingProgress(ctx, jobID, importMeta) return 10 + int64(progress*30), internalpb.ImportJobState_Importing, 0, 0, "" case internalpb.ImportJobState_Importing: - progress, importedRows, totalRows := getImportingProgress(jobID, imeta, meta) + progress, importedRows, totalRows := getImportingProgress(ctx, jobID, importMeta, meta) return 10 + 30 + int64(progress*30), internalpb.ImportJobState_Importing, importedRows, totalRows, "" case internalpb.ImportJobState_Stats: - progress := getStatsProgress(jobID, imeta, sjm) - _, totalRows := getImportRowsInfo(jobID, imeta, meta) + progress := getStatsProgress(ctx, jobID, importMeta, sjm) + _, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta) return 10 + 30 + 30 + int64(progress*10), internalpb.ImportJobState_Importing, totalRows, totalRows, "" case internalpb.ImportJobState_IndexBuilding: - progress := getIndexBuildingProgress(jobID, imeta, meta) - _, totalRows := getImportRowsInfo(jobID, imeta, meta) + progress := getIndexBuildingProgress(ctx, jobID, importMeta, meta) + _, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta) return 10 + 30 + 30 + 10 + int64(progress*10), internalpb.ImportJobState_Importing, totalRows, totalRows, "" case internalpb.ImportJobState_Completed: - _, totalRows := getImportRowsInfo(jobID, imeta, meta) + _, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta) return 100, internalpb.ImportJobState_Completed, totalRows, totalRows, "" case internalpb.ImportJobState_Failed: @@ -580,9 +580,9 @@ func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta, sjm StatsInspecto return 0, internalpb.ImportJobState_None, 0, 0, "unknown import job state" } -func GetTaskProgresses(jobID int64, imeta ImportMeta, meta *meta) []*internalpb.ImportTaskProgress { +func GetTaskProgresses(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) []*internalpb.ImportTaskProgress { progresses := make([]*internalpb.ImportTaskProgress, 0) - tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(ImportTaskType)) + tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType)) for _, task := range tasks { totalRows := lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 { return file.GetTotalRows() diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go index 5950172218..ae94ab468b 100644 --- a/internal/datacoord/import_util_test.go +++ b/internal/datacoord/import_util_test.go @@ -412,7 +412,7 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) { catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil) - imeta, err := NewImportMeta(context.TODO(), catalog, nil, nil) + importMeta, err := NewImportMeta(context.TODO(), catalog, nil, nil) assert.NoError(t, err) broker := broker.NewMockBroker(t) @@ -426,7 +426,7 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) { CollectionID: 100, }, } - err = imeta.AddJob(context.TODO(), job) + err = importMeta.AddJob(context.TODO(), job) assert.NoError(t, err) preImportTaskProto := &datapb.PreImportTask{ @@ -439,12 +439,12 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) { } pit := &preImportTask{} pit.task.Store(preImportTaskProto) - err = imeta.AddTask(context.TODO(), pit) + err = importMeta.AddTask(context.TODO(), pit) assert.NoError(t, err) Params.Save(Params.QuotaConfig.DiskProtectionEnabled.Key, "false") defer Params.Reset(Params.QuotaConfig.DiskProtectionEnabled.Key) - _, err = CheckDiskQuota(job, meta, imeta) + _, err = CheckDiskQuota(context.TODO(), job, meta, importMeta) assert.NoError(t, err) segment := &SegmentInfo{ @@ -459,7 +459,7 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) { {Key: importutilv2.BackupFlag, Value: "true"}, {Key: importutilv2.SkipDQC, Value: "true"}, } - _, err = CheckDiskQuota(job, meta, imeta) + _, err = CheckDiskQuota(context.TODO(), job, meta, importMeta) assert.NoError(t, err) job.Options = nil @@ -467,17 +467,17 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) { Params.Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, "10000") defer Params.Reset(Params.QuotaConfig.DiskQuota.Key) defer Params.Reset(Params.QuotaConfig.DiskQuotaPerCollection.Key) - requestSize, err := CheckDiskQuota(job, meta, imeta) + requestSize, err := CheckDiskQuota(context.TODO(), job, meta, importMeta) assert.NoError(t, err) assert.Equal(t, int64(3000*1024*1024), requestSize) Params.Save(Params.QuotaConfig.DiskQuota.Key, "5000") - _, err = CheckDiskQuota(job, meta, imeta) + _, err = CheckDiskQuota(context.TODO(), job, meta, importMeta) assert.True(t, errors.Is(err, merr.ErrServiceQuotaExceeded)) Params.Save(Params.QuotaConfig.DiskQuota.Key, "10000") Params.Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, "5000") - _, err = CheckDiskQuota(job, meta, imeta) + _, err = CheckDiskQuota(context.TODO(), job, meta, importMeta) assert.True(t, errors.Is(err, merr.ErrServiceQuotaExceeded)) } @@ -491,7 +491,7 @@ func TestImportUtil_DropImportTask(t *testing.T) { catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil) catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil) - imeta, err := NewImportMeta(context.TODO(), catalog, nil, nil) + importMeta, err := NewImportMeta(context.TODO(), catalog, nil, nil) assert.NoError(t, err) taskProto := &datapb.ImportTaskV2{ @@ -500,10 +500,10 @@ func TestImportUtil_DropImportTask(t *testing.T) { } task := &importTask{} task.task.Store(taskProto) - err = imeta.AddTask(context.TODO(), task) + err = importMeta.AddTask(context.TODO(), task) assert.NoError(t, err) - err = DropImportTask(task, cluster, imeta) + err = DropImportTask(task, cluster, importMeta) assert.NoError(t, err) } @@ -599,7 +599,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil) catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil) - imeta, err := NewImportMeta(context.TODO(), catalog, nil, nil) + importMeta, err := NewImportMeta(context.TODO(), catalog, nil, nil) assert.NoError(t, err) broker := broker.NewMockBroker(t) @@ -625,7 +625,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { Files: []*internalpb.ImportFile{file1, file2, file3}, }, } - err = imeta.AddJob(context.TODO(), job) + err = importMeta.AddJob(context.TODO(), job) assert.NoError(t, err) preImportTaskProto := &datapb.PreImportTask{ @@ -645,7 +645,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { pit1 := &preImportTask{} pit1.task.Store(preImportTaskProto) - err = imeta.AddTask(context.TODO(), pit1) + err = importMeta.AddTask(context.TODO(), pit1) assert.NoError(t, err) preImportTaskProto2 := &datapb.PreImportTask{ @@ -660,7 +660,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { } pit2 := &preImportTask{} pit2.task.Store(preImportTaskProto2) - err = imeta.AddTask(context.TODO(), pit2) + err = importMeta.AddTask(context.TODO(), pit2) assert.NoError(t, err) taskProto1 := &datapb.ImportTaskV2{ @@ -681,7 +681,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { } it1 := &importTask{} it1.task.Store(taskProto1) - err = imeta.AddTask(context.TODO(), it1) + err = importMeta.AddTask(context.TODO(), it1) assert.NoError(t, err) err = meta.AddSegment(ctx, &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ID: 10, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50}, @@ -710,7 +710,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { } it2 := &importTask{} it2.task.Store(taskProto2) - err = imeta.AddTask(context.TODO(), it2) + err = importMeta.AddTask(context.TODO(), it2) assert.NoError(t, err) err = meta.AddSegment(ctx, &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ID: 20, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50}, @@ -726,40 +726,40 @@ func TestImportUtil_GetImportProgress(t *testing.T) { assert.NoError(t, err) // failed state - err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(mockErr)) + err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(mockErr)) assert.NoError(t, err) - progress, state, _, _, reason := GetJobProgress(job.GetJobID(), imeta, meta, nil) + progress, state, _, _, reason := GetJobProgress(ctx, job.GetJobID(), importMeta, meta, nil) assert.Equal(t, int64(0), progress) assert.Equal(t, internalpb.ImportJobState_Failed, state) assert.Equal(t, mockErr, reason) // job does not exist - progress, state, _, _, reason = GetJobProgress(-1, imeta, meta, nil) + progress, state, _, _, reason = GetJobProgress(ctx, -1, importMeta, meta, nil) assert.Equal(t, int64(0), progress) assert.Equal(t, internalpb.ImportJobState_Failed, state) assert.NotEqual(t, "", reason) // pending state - err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending)) + err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending)) assert.NoError(t, err) - progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil) + progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, nil) assert.Equal(t, int64(10), progress) assert.Equal(t, internalpb.ImportJobState_Pending, state) assert.Equal(t, "", reason) // preImporting state - err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) + err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) assert.NoError(t, err) - progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil) + progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, nil) assert.Equal(t, int64(10+30), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) // importing state, segmentImportedRows/totalRows = 0.5 - err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing)) + err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing)) assert.NoError(t, err) - progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil) + progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, nil) assert.Equal(t, int64(10+30+30*0.5), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -777,13 +777,13 @@ func TestImportUtil_GetImportProgress(t *testing.T) { assert.NoError(t, err) err = meta.UpdateSegmentsInfo(context.TODO(), UpdateImportedRows(22, 100)) assert.NoError(t, err) - progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil) + progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, nil) assert.Equal(t, int64(float32(10+30+30)), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) // stats state, len(statsSegmentIDs) / (len(originalSegmentIDs) = 0.5 - err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats)) + err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats)) assert.NoError(t, err) sjm := NewMockStatsJobManager(t) sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).RunAndReturn(func(segmentID int64, _ indexpb.StatsSubJob) *indexpb.StatsTask { @@ -796,7 +796,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) { State: indexpb.JobState_JobStateInProgress, } }) - progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm) + progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, sjm) assert.Equal(t, int64(10+30+30+10*0.5), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) @@ -806,15 +806,15 @@ func TestImportUtil_GetImportProgress(t *testing.T) { sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{ State: indexpb.JobState_JobStateFinished, }) - progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm) + progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, sjm) assert.Equal(t, int64(10+30+30+10), progress) assert.Equal(t, internalpb.ImportJobState_Importing, state) assert.Equal(t, "", reason) // completed state - err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed)) + err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed)) assert.NoError(t, err) - progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm) + progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, sjm) assert.Equal(t, int64(100), progress) assert.Equal(t, internalpb.ImportJobState_Completed, state) assert.Equal(t, "", reason) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index e9d667df1f..7892ca8018 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -319,9 +319,9 @@ func (s *Server) initDataCoord() error { s.initGarbageCollection(storageCli) - s.importInspector = NewImportInspector(s.meta, s.importMeta, s.globalScheduler) + s.importInspector = NewImportInspector(s.ctx, s.meta, s.importMeta, s.globalScheduler) - s.importChecker = NewImportChecker(s.meta, s.broker, s.allocator, s.importMeta, s.statsInspector, s.compactionTriggerManager) + s.importChecker = NewImportChecker(s.ctx, s.meta, s.broker, s.allocator, s.importMeta, s.statsInspector, s.compactionTriggerManager) s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index a25c6e450b..70849509a4 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1908,7 +1908,7 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d", jobID))) return resp, nil } - progress, state, importedRows, totalRows, reason := GetJobProgress(jobID, s.importMeta, s.meta, s.statsInspector) + progress, state, importedRows, totalRows, reason := GetJobProgress(ctx, jobID, s.importMeta, s.meta, s.statsInspector) resp.State = state resp.Reason = reason resp.Progress = progress @@ -1917,7 +1917,7 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport resp.CompleteTime = job.GetCompleteTime() resp.ImportedRows = importedRows resp.TotalRows = totalRows - resp.TaskProgresses = GetTaskProgresses(jobID, s.importMeta, s.meta) + resp.TaskProgresses = GetTaskProgresses(ctx, jobID, s.importMeta, s.meta) log.Info("GetImportProgress done", zap.String("jobState", job.GetState().String()), zap.Any("resp", resp)) return resp, nil } @@ -1945,7 +1945,7 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq } for _, job := range jobs { - progress, state, _, _, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta, s.statsInspector) + progress, state, _, _, reason := GetJobProgress(ctx, job.GetJobID(), s.importMeta, s.meta, s.statsInspector) resp.JobIDs = append(resp.JobIDs, fmt.Sprintf("%d", job.GetJobID())) resp.States = append(resp.States, state) resp.Reasons = append(resp.Reasons, reason)