enhance: Enhance import context (#42021)

Rename `imeta` to `importMeta` to improve readability, and thread a stored
`context.Context` through the import components instead of creating
`context.TODO()` at each call site.
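
In practice the change has two parts: the `imeta` field/variable is spelled
out as `importMeta` everywhere, and components such as `importChecker` and
`importInspector` now take a `context.Context` in their constructors and
reuse it for meta access, RPCs, and logging. A minimal sketch of the
resulting shape (reduced, hypothetical types; the real ones live in
`internal/datacoord`):

```go
package importctx

import "context"

// ImportMeta is a stand-in for the real interface, reduced to one method.
type ImportMeta interface {
	GetJobBy(ctx context.Context) []any
}

// checker mirrors the post-change shape of importChecker: the context is
// injected once and stored, and the meta field is spelled out.
type checker struct {
	ctx        context.Context
	importMeta ImportMeta // was: imeta
}

func newChecker(ctx context.Context, importMeta ImportMeta) *checker {
	return &checker{ctx: ctx, importMeta: importMeta}
}

func (c *checker) inspect() {
	// Before: jobs := c.imeta.GetJobBy(context.TODO())
	// After: the stored context flows into every call, so cancellation and
	// tracing metadata propagate from whoever owns the checker.
	jobs := c.importMeta.GetJobBy(c.ctx)
	_ = jobs // filter and act on jobs here
}
```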

issue: https://github.com/milvus-io/milvus/issues/41123

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
yihao.dai 2025-05-23 12:58:27 +08:00 committed by GitHub
parent 83c9527e70
commit f71930e8db
15 changed files with 380 additions and 376 deletions


@@ -86,7 +86,7 @@ type CompactionTriggerManager struct {
allocator allocator.Allocator
meta *meta
imeta ImportMeta
importMeta ImportMeta
l0Policy *l0CompactionPolicy
clusteringPolicy *clusteringCompactionPolicy
singlePolicy *singleCompactionPolicy
@@ -103,13 +103,13 @@ type CompactionTriggerManager struct {
compactionChanLock sync.Mutex
}
func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, inspector CompactionInspector, meta *meta, imeta ImportMeta) *CompactionTriggerManager {
func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, inspector CompactionInspector, meta *meta, importMeta ImportMeta) *CompactionTriggerManager {
m := &CompactionTriggerManager{
allocator: alloc,
handler: handler,
inspector: inspector,
meta: meta,
imeta: imeta,
importMeta: importMeta,
pauseCompactionChanMap: make(map[int64]chan struct{}),
resumeCompactionChanMap: make(map[int64]chan struct{}),
}
@@ -386,7 +386,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context, collection *collectionInfo, view CompactionView) error {
// add l0 import task for the collection if the collection is importing
importJobs := m.imeta.GetJobBy(ctx,
importJobs := m.importMeta.GetJobBy(ctx,
WithCollectionID(collection.ID),
WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed),
WithoutL0Job(),
@@ -442,13 +442,13 @@ func (m *CompactionTriggerManager) addL0ImportTaskForImport(ctx context.Context,
},
},
},
}, job, m.allocator, m.meta, m.imeta)
}, job, m.allocator, m.meta, m.importMeta)
if err != nil {
log.Warn("new import tasks failed", zap.Error(err))
return err
}
for _, t := range newTasks {
err = m.imeta.AddTask(ctx, t)
err = m.importMeta.AddTask(ctx, t)
if err != nil {
log.Warn("add new l0 import task from l0 compaction failed", WrapTaskLog(t, zap.Error(err))...)
return err
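
A side note on the `GetJobBy` call above (and the many `GetTaskBy` calls
below): these helpers take variadic filters such as `WithCollectionID` and
`WithoutJobStates` in the functional-options style. A reduced sketch of how
such filters compose (hypothetical implementation, not the Milvus one):

```go
package importctx

// Job is a toy job record for illustration.
type Job struct {
	CollectionID int64
	State        string
}

// JobFilter is a predicate; a job is kept only if every filter accepts it.
type JobFilter func(Job) bool

func WithCollectionID(id int64) JobFilter {
	return func(j Job) bool { return j.CollectionID == id }
}

func WithoutJobStates(states ...string) JobFilter {
	return func(j Job) bool {
		for _, s := range states {
			if j.State == s {
				return false
			}
		}
		return true
	}
}

// GetJobBy returns the jobs that satisfy every filter.
func GetJobBy(jobs []Job, filters ...JobFilter) []Job {
	var out []Job
	for _, j := range jobs {
		ok := true
		for _, f := range filters {
			if !f(j) {
				ok = false
				break
			}
		}
		if ok {
			out = append(out, j)
		}
	}
	return out
}
```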


@@ -31,12 +31,12 @@ func TestCompactionTriggerManagerSuite(t *testing.T) {
type CompactionTriggerManagerSuite struct {
suite.Suite
mockAlloc *allocator.MockAllocator
handler Handler
inspector *MockCompactionInspector
testLabel *CompactionGroupLabel
meta *meta
imeta ImportMeta
mockAlloc *allocator.MockAllocator
handler Handler
inspector *MockCompactionInspector
testLabel *CompactionGroupLabel
meta *meta
importMeta ImportMeta
triggerManager *CompactionTriggerManager
}
@@ -62,8 +62,8 @@ func (s *CompactionTriggerManagerSuite) SetupTest() {
catalog.EXPECT().ListImportJobs(mock.Anything).Return([]*datapb.ImportJob{}, nil)
importMeta, err := NewImportMeta(context.TODO(), catalog, s.mockAlloc, s.meta)
s.Require().NoError(err)
s.imeta = importMeta
s.triggerManager = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.inspector, s.meta, s.imeta)
s.importMeta = importMeta
s.triggerManager = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.inspector, s.meta, s.importMeta)
}
func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() {
@@ -368,8 +368,7 @@ func TestCompactionAndImport(t *testing.T) {
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil)
importMeta, err := NewImportMeta(context.TODO(), catalog, mockAlloc, meta)
assert.NoError(t, err)
imeta := importMeta
triggerManager := NewCompactionTriggerManager(mockAlloc, handler, inspector, meta, imeta)
triggerManager := NewCompactionTriggerManager(mockAlloc, handler, inspector, meta, importMeta)
Params.Save(Params.DataCoordCfg.L0CompactionTriggerInterval.Key, "1")
defer Params.Reset(Params.DataCoordCfg.L0CompactionTriggerInterval.Key)


@@ -43,10 +43,11 @@ type ImportChecker interface {
}
type importChecker struct {
ctx context.Context
meta *meta
broker broker.Broker
alloc allocator.Allocator
imeta ImportMeta
importMeta ImportMeta
si StatsInspector
l0CompactionTrigger TriggerManager
@@ -54,18 +55,20 @@ type importChecker struct {
closeChan chan struct{}
}
func NewImportChecker(meta *meta,
func NewImportChecker(ctx context.Context,
meta *meta,
broker broker.Broker,
alloc allocator.Allocator,
imeta ImportMeta,
importMeta ImportMeta,
si StatsInspector,
l0CompactionTrigger TriggerManager,
) ImportChecker {
return &importChecker{
ctx: ctx,
meta: meta,
broker: broker,
alloc: alloc,
imeta: imeta,
importMeta: importMeta,
si: si,
l0CompactionTrigger: l0CompactionTrigger,
closeChan: make(chan struct{}),
@@ -86,7 +89,7 @@ func (c *importChecker) Start() {
log.Info("import checker exited")
return
case <-ticker1.C:
jobs := c.imeta.GetJobBy(context.TODO())
jobs := c.importMeta.GetJobBy(c.ctx)
for _, job := range jobs {
if !funcutil.SliceSetEqual[string](job.GetVchannels(), job.GetReadyVchannels()) {
// wait for all channels to send signals
@@ -112,7 +115,7 @@ func (c *importChecker) Start() {
}
}
case <-ticker2.C:
jobs := c.imeta.GetJobBy(context.TODO())
jobs := c.importMeta.GetJobBy(c.ctx)
for _, job := range jobs {
c.tryTimeoutJob(job)
c.checkGC(job)
@@ -168,9 +171,9 @@ func (c *importChecker) LogTaskStats() {
metrics.ImportTasks.WithLabelValues(taskType.String(), datapb.ImportTaskStateV2_Completed.String()).Set(float64(completed))
metrics.ImportTasks.WithLabelValues(taskType.String(), datapb.ImportTaskStateV2_Failed.String()).Set(float64(failed))
}
tasks := c.imeta.GetTaskBy(context.TODO(), WithType(PreImportTaskType))
tasks := c.importMeta.GetTaskBy(c.ctx, WithType(PreImportTaskType))
logFunc(tasks, PreImportTaskType)
tasks = c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType))
tasks = c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType))
logFunc(tasks, ImportTaskType)
}
@@ -178,7 +181,7 @@ func (c *importChecker) getLackFilesForPreImports(job ImportJob) []*internalpb.I
lacks := lo.KeyBy(job.GetFiles(), func(file *internalpb.ImportFile) int64 {
return file.GetId()
})
exists := c.imeta.GetTaskBy(context.TODO(), WithType(PreImportTaskType), WithJob(job.GetJobID()))
exists := c.importMeta.GetTaskBy(c.ctx, WithType(PreImportTaskType), WithJob(job.GetJobID()))
for _, task := range exists {
for _, file := range task.GetFileStats() {
delete(lacks, file.GetImportFile().GetId())
@@ -188,7 +191,7 @@ func (c *importChecker) getLackFilesForPreImports(job ImportJob) []*internalpb.I
}
func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFileStats {
preimports := c.imeta.GetTaskBy(context.TODO(), WithType(PreImportTaskType), WithJob(job.GetJobID()))
preimports := c.importMeta.GetTaskBy(c.ctx, WithType(PreImportTaskType), WithJob(job.GetJobID()))
lacks := make(map[int64]*datapb.ImportFileStats, 0)
for _, t := range preimports {
if t.GetState() != datapb.ImportTaskStateV2_Completed {
@@ -199,7 +202,7 @@ func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFi
lacks[stat.GetImportFile().GetId()] = stat
}
}
exists := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()))
exists := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID()))
for _, task := range exists {
for _, file := range task.GetFileStats() {
delete(lacks, file.GetImportFile().GetId())
@@ -216,13 +219,13 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
}
fileGroups := lo.Chunk(lacks, Params.DataCoordCfg.FilesPerPreImportTask.GetAsInt())
newTasks, err := NewPreImportTasks(fileGroups, job, c.alloc, c.imeta)
newTasks, err := NewPreImportTasks(fileGroups, job, c.alloc, c.importMeta)
if err != nil {
log.Warn("new preimport tasks failed", zap.Error(err))
return
}
for _, t := range newTasks {
err = c.imeta.AddTask(context.TODO(), t)
err = c.importMeta.AddTask(c.ctx, t)
if err != nil {
log.Warn("add preimport task failed", WrapTaskLog(t, zap.Error(err))...)
return
@@ -230,7 +233,7 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
log.Info("add new preimport task", WrapTaskLog(t)...)
}
err = c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
if err != nil {
log.Warn("failed to update job state to PreImporting", zap.Error(err))
return
@@ -247,10 +250,10 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
return
}
requestSize, err := CheckDiskQuota(job, c.meta, c.imeta)
requestSize, err := CheckDiskQuota(c.ctx, job, c.meta, c.importMeta)
if err != nil {
log.Warn("import failed, disk quota exceeded", zap.Error(err))
err = c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
if err != nil {
log.Warn("failed to update job state to Failed", zap.Error(err))
}
@@ -259,16 +262,16 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
allDiskIndex := c.meta.indexMeta.AreAllDiskIndex(job.GetCollectionID(), job.GetSchema())
groups := RegroupImportFiles(job, lacks, allDiskIndex)
newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta, c.imeta)
newTasks, err := NewImportTasks(groups, job, c.alloc, c.meta, c.importMeta)
if err != nil {
log.Warn("new import tasks failed", zap.Error(err))
return
}
for _, t := range newTasks {
err = c.imeta.AddTask(context.TODO(), t)
err = c.importMeta.AddTask(c.ctx, t)
if err != nil {
log.Warn("add new import task failed", WrapTaskLog(t, zap.Error(err))...)
updateErr := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
updateErr := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
if updateErr != nil {
log.Warn("failed to update job state to Failed", zap.Error(updateErr))
}
@@ -277,7 +280,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
log.Info("add new import task", WrapTaskLog(t)...)
}
err = c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize))
err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize))
if err != nil {
log.Warn("failed to update job state to Importing", zap.Error(err))
return
@@ -289,13 +292,13 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
func (c *importChecker) checkImportingJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()))
tasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()), WithRequestSource())
tasks := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID()), WithRequestSource())
for _, t := range tasks {
if t.GetState() != datapb.ImportTaskStateV2_Completed {
return
}
}
err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats))
err := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats))
if err != nil {
log.Warn("failed to update job state to Stats", zap.Error(err))
return
@@ -308,7 +311,7 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
func (c *importChecker) checkStatsJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()))
updateJobState := func(state internalpb.ImportJobState, reason string) {
err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(state), UpdateJobReason(reason))
err := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(state), UpdateJobReason(reason))
if err != nil {
log.Warn("failed to update job state", zap.Error(err))
return
@@ -329,7 +332,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) {
taskCnt = 0
doneCnt = 0
)
tasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()))
tasks := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID()))
for _, task := range tasks {
originSegmentIDs := task.(*importTask).GetSegmentIDs()
statsSegmentIDs := task.(*importTask).GetStatsSegmentIDs()
@@ -365,7 +368,7 @@ func (c *importChecker) checkStatsJob(job ImportJob) {
func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
log := log.With(zap.Int64("jobID", job.GetJobID()))
tasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()))
tasks := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID()))
originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
})
@@ -413,7 +416,7 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
}
// all finished, update import job state to `Completed`.
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
err := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
if err != nil {
log.Warn("failed to update job state to Completed", zap.Error(err))
return
@@ -426,7 +429,7 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
func (c *importChecker) waitL0ImortTaskDone(job ImportJob) bool {
// wait for all l0 import tasks to be completed
l0ImportTasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()), WithL0CompactionSource())
l0ImportTasks := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID()), WithL0CompactionSource())
for _, t := range l0ImportTasks {
if t.GetState() != datapb.ImportTaskStateV2_Completed {
log.Info("waiting for l0 import task...",
@@ -442,7 +445,7 @@ func (c *importChecker) waitL0ImortTaskDone(job ImportJob) bool {
func (c *importChecker) updateSegmentState(originSegmentIDs, statsSegmentIDs []int64) bool {
// Here, all segment indexes have been successfully built; try to unset the isImporting flag for all segments.
isImportingSegments := lo.Filter(append(originSegmentIDs, statsSegmentIDs...), func(segmentID int64, _ int) bool {
segment := c.meta.GetSegment(context.TODO(), segmentID)
segment := c.meta.GetSegment(c.ctx, segmentID)
if segment == nil {
log.Warn("cannot find segment", zap.Int64("segmentID", segmentID))
return false
@@ -463,7 +466,7 @@ func (c *importChecker) updateSegmentState(originSegmentIDs, statsSegmentIDs []i
op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}})
op2 := UpdateDmlPosition(segmentID, channelCP)
op3 := UpdateIsImporting(segmentID, false)
err = c.meta.UpdateSegmentsInfo(context.TODO(), op1, op2, op3)
err = c.meta.UpdateSegmentsInfo(c.ctx, op1, op2, op3)
if err != nil {
log.Warn("update import segment failed", zap.Error(err))
return true
@@ -473,7 +476,7 @@ func (c *importChecker) updateSegmentState(originSegmentIDs, statsSegmentIDs []i
}
func (c *importChecker) checkFailedJob(job ImportJob) {
tasks := c.imeta.GetTaskBy(context.TODO(), WithType(ImportTaskType), WithJob(job.GetJobID()))
tasks := c.importMeta.GetTaskBy(c.ctx, WithType(ImportTaskType), WithJob(job.GetJobID()))
originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
})
@@ -488,7 +491,7 @@ func (c *importChecker) checkFailedJob(job ImportJob) {
}
func (c *importChecker) tryFailingTasks(job ImportJob) {
tasks := c.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithStates(datapb.ImportTaskStateV2_Pending,
tasks := c.importMeta.GetTaskBy(c.ctx, WithJob(job.GetJobID()), WithStates(datapb.ImportTaskStateV2_Pending,
datapb.ImportTaskStateV2_InProgress, datapb.ImportTaskStateV2_Completed))
if len(tasks) == 0 {
return
@@ -496,7 +499,7 @@ func (c *importChecker) tryFailingTasks(job ImportJob) {
log.Warn("Import job has failed, all tasks with the same jobID will be marked as failed",
zap.Int64("jobID", job.GetJobID()), zap.String("reason", job.GetReason()))
for _, task := range tasks {
err := c.imeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed),
err := c.importMeta.UpdateTask(c.ctx, task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed),
UpdateReason(job.GetReason()))
if err != nil {
log.Warn("failed to update import task state to failed", WrapTaskLog(task, zap.Error(err))...)
@@ -510,7 +513,7 @@ func (c *importChecker) tryTimeoutJob(job ImportJob) {
if time.Now().After(timeoutTime) {
log.Warn("Import timeout, expired the specified time limit",
zap.Int64("jobID", job.GetJobID()), zap.Time("timeoutTime", timeoutTime))
err := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
err := c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
UpdateJobReason("import timeout"))
if err != nil {
log.Warn("failed to update job state to Failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
@@ -523,7 +526,7 @@ func (c *importChecker) checkCollection(collectionID int64, jobs []ImportJob) {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
defer cancel()
has, err := c.broker.HasCollection(ctx, collectionID)
if err != nil {
@@ -535,7 +538,7 @@ func (c *importChecker) checkCollection(collectionID int64, jobs []ImportJob) {
return job.GetState() != internalpb.ImportJobState_Failed
})
for _, job := range jobs {
err = c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
err = c.importMeta.UpdateJob(c.ctx, job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
UpdateJobReason(fmt.Sprintf("collection %d dropped", collectionID)))
if err != nil {
log.Warn("failed to update job state to Failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
@@ -555,7 +558,7 @@ func (c *importChecker) checkGC(job ImportJob) {
GCRetention := Params.DataCoordCfg.ImportTaskRetention.GetAsDuration(time.Second)
log.Info("job has reached the GC retention",
zap.Time("cleanupTime", cleanupTime), zap.Duration("GCRetention", GCRetention))
tasks := c.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()))
tasks := c.importMeta.GetTaskBy(c.ctx, WithJob(job.GetJobID()))
shouldRemoveJob := true
for _, task := range tasks {
if job.GetState() == internalpb.ImportJobState_Failed && task.GetType() == ImportTaskType {
@@ -568,7 +571,7 @@ func (c *importChecker) checkGC(job ImportJob) {
shouldRemoveJob = false
continue
}
err := c.imeta.RemoveTask(context.TODO(), task.GetTaskID())
err := c.importMeta.RemoveTask(c.ctx, task.GetTaskID())
if err != nil {
log.Warn("remove task failed during GC", WrapTaskLog(task, zap.Error(err))...)
shouldRemoveJob = false
@@ -579,7 +582,7 @@ func (c *importChecker) checkGC(job ImportJob) {
if !shouldRemoveJob {
return
}
err := c.imeta.RemoveJob(context.TODO(), job.GetJobID())
err := c.importMeta.RemoveJob(c.ctx, job.GetJobID())
if err != nil {
log.Warn("remove import job failed", zap.Error(err))
return
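
Beyond the renames, one hunk above changes behavior: `checkCollection` now
derives its 10-second RPC timeout from the stored `c.ctx` instead of
`context.Background()`, so stopping the checker also cancels an in-flight
`HasCollection` call rather than letting it run out the clock. A toy
demonstration of the difference (standard library only, not Milvus code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// parent plays the role of importChecker's stored ctx.
	parent, shutdown := context.WithCancel(context.Background())

	// Before: context.WithTimeout(context.Background(), 10*time.Second)
	// ignored the component's lifecycle. After: deriving from parent
	// links the RPC's deadline to it.
	ctx, cancel := context.WithTimeout(parent, 10*time.Second)
	defer cancel()

	shutdown() // simulate datacoord shutting down

	<-ctx.Done()
	fmt.Println(ctx.Err()) // context.Canceled, not DeadlineExceeded
}
```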


@@ -47,10 +47,10 @@ import (
type ImportCheckerSuite struct {
suite.Suite
jobID int64
imeta ImportMeta
checker *importChecker
alloc *allocator.MockAllocator
jobID int64
importMeta ImportMeta
checker *importChecker
alloc *allocator.MockAllocator
}
func (s *ImportCheckerSuite) SetupTest() {
@@ -74,9 +74,9 @@ func (s *ImportCheckerSuite) SetupTest() {
meta, err := newMeta(context.TODO(), catalog, nil, broker)
s.NoError(err)
imeta, err := NewImportMeta(context.TODO(), catalog, s.alloc, meta)
importMeta, err := NewImportMeta(context.TODO(), catalog, s.alloc, meta)
s.NoError(err)
s.imeta = imeta
s.importMeta = importMeta
sjm := NewMockStatsJobManager(s.T())
l0CompactionTrigger := NewMockTriggerManager(s.T())
@@ -85,7 +85,7 @@ func (s *ImportCheckerSuite) SetupTest() {
l0CompactionTrigger.EXPECT().GetPauseCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe()
l0CompactionTrigger.EXPECT().GetResumeCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe()
checker := NewImportChecker(meta, broker, s.alloc, imeta, sjm, l0CompactionTrigger).(*importChecker)
checker := NewImportChecker(context.TODO(), meta, broker, s.alloc, importMeta, sjm, l0CompactionTrigger).(*importChecker)
s.checker = checker
job := &importJob{
@@ -126,13 +126,13 @@ func (s *ImportCheckerSuite) SetupTest() {
}
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
err = s.imeta.AddJob(context.TODO(), job)
err = s.importMeta.AddJob(context.TODO(), job)
s.NoError(err)
s.jobID = job.GetJobID()
}
func (s *ImportCheckerSuite) TestLogStats() {
catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil)
@@ -145,7 +145,7 @@ func (s *ImportCheckerSuite) TestLogStats() {
tr: timerecord.NewTimeRecorder("preimport task"),
}
pit1.task.Store(preImportTaskProto)
err := s.imeta.AddTask(context.TODO(), pit1)
err := s.importMeta.AddTask(context.TODO(), pit1)
s.NoError(err)
importTaskProto := &datapb.ImportTaskV2{
@@ -158,14 +158,14 @@ func (s *ImportCheckerSuite) TestLogStats() {
tr: timerecord.NewTimeRecorder("import task"),
}
it1.task.Store(importTaskProto)
err = s.imeta.AddTask(context.TODO(), it1)
err = s.importMeta.AddTask(context.TODO(), it1)
s.NoError(err)
s.checker.LogTaskStats()
}
func (s *ImportCheckerSuite) TestCheckJob() {
job := s.imeta.GetJob(context.TODO(), s.jobID)
job := s.importMeta.GetJob(context.TODO(), s.jobID)
// test checkPendingJob
alloc := s.alloc
@@ -173,39 +173,39 @@ func (s *ImportCheckerSuite) TestCheckJob() {
id := rand.Int63()
return id, id + n, nil
})
catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil)
s.checker.checkPendingJob(job)
preimportTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
preimportTasks := s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
s.Equal(2, len(preimportTasks))
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_PreImporting, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.checker.checkPendingJob(job) // no lack
preimportTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
preimportTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
s.Equal(2, len(preimportTasks))
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_PreImporting, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
// test checkPreImportingJob
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil)
for _, t := range preimportTasks {
err := s.imeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
err := s.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
s.NoError(err)
}
s.checker.checkPreImportingJob(job)
importTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
importTasks := s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
s.Equal(1, len(importTasks))
s.Equal(internalpb.ImportJobState_Importing, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Importing, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.checker.checkPreImportingJob(job) // no lack
importTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
importTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
s.Equal(1, len(importTasks))
s.Equal(internalpb.ImportJobState_Importing, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Importing, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
// test checkImportingJob
s.checker.checkImportingJob(job)
s.Equal(internalpb.ImportJobState_Importing, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Importing, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
for _, t := range importTasks {
task := s.imeta.GetTask(context.TODO(), t.GetTaskID())
task := s.importMeta.GetTask(context.TODO(), t.GetTaskID())
for _, id := range task.(*importTask).GetSegmentIDs() {
segment := s.checker.meta.GetSegment(context.TODO(), id)
s.Equal(true, segment.GetIsImporting())
@@ -225,14 +225,14 @@ func (s *ImportCheckerSuite) TestCheckJob() {
}
err := s.checker.meta.AddSegment(context.Background(), segment)
s.NoError(err)
err = s.imeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed),
err = s.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed),
UpdateSegmentIDs([]int64{segment.GetID()}), UpdateStatsSegmentIDs([]int64{rand.Int63()}))
s.NoError(err)
err = s.checker.meta.UpdateChannelCheckpoint(context.TODO(), segment.GetInsertChannel(), &msgpb.MsgPosition{MsgID: []byte{0}})
s.NoError(err)
}
s.checker.checkImportingJob(job)
s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Stats, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
// test check stats job
alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil).Maybe()
@@ -242,55 +242,55 @@ func (s *ImportCheckerSuite) TestCheckJob() {
State: indexpb.JobState_JobStateNone,
})
s.checker.checkStatsJob(job)
s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Stats, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
sjm = NewMockStatsJobManager(s.T())
sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{
State: indexpb.JobState_JobStateInProgress,
})
s.checker.si = sjm
s.checker.checkStatsJob(job)
s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Stats, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
sjm = NewMockStatsJobManager(s.T())
sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{
State: indexpb.JobState_JobStateFinished,
})
s.checker.si = sjm
s.checker.checkStatsJob(job)
s.Equal(internalpb.ImportJobState_IndexBuilding, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_IndexBuilding, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
// test check IndexBuilding job
s.checker.checkIndexBuildingJob(job)
for _, t := range importTasks {
task := s.imeta.GetTask(context.TODO(), t.GetTaskID())
task := s.importMeta.GetTask(context.TODO(), t.GetTaskID())
for _, id := range task.(*importTask).GetSegmentIDs() {
segment := s.checker.meta.GetSegment(context.TODO(), id)
s.Equal(false, segment.GetIsImporting())
}
}
s.Equal(internalpb.ImportJobState_Completed, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Completed, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
}
func (s *ImportCheckerSuite) TestCheckJob_Failed() {
mockErr := errors.New("mock err")
job := s.imeta.GetJob(context.TODO(), s.jobID)
job := s.importMeta.GetJob(context.TODO(), s.jobID)
// test checkPendingJob
alloc := s.alloc
alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, nil)
catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(mockErr)
s.checker.checkPendingJob(job)
preimportTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
preimportTasks := s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
s.Equal(0, len(preimportTasks))
s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
alloc.ExpectedCalls = nil
alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, mockErr)
s.checker.checkPendingJob(job)
preimportTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
preimportTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
s.Equal(0, len(preimportTasks))
s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
alloc.ExpectedCalls = nil
alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, nil)
@@ -298,13 +298,13 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() {
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil)
s.checker.checkPendingJob(job)
preimportTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
preimportTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
s.Equal(2, len(preimportTasks))
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_PreImporting, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
// test checkPreImportingJob
for _, t := range preimportTasks {
err := s.imeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
err := s.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
s.NoError(err)
}
@@ -312,18 +312,18 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() {
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(mockErr)
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
s.checker.checkPreImportingJob(job)
importTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
importTasks := s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
s.Equal(0, len(importTasks))
s.Equal(internalpb.ImportJobState_Failed, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Failed, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
alloc.ExpectedCalls = nil
alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, mockErr)
err := s.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
err := s.importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
s.NoError(err)
s.checker.checkPreImportingJob(job)
importTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
importTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
s.Equal(0, len(importTasks))
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_PreImporting, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
catalog.ExpectedCalls = nil
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
@@ -331,13 +331,13 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() {
alloc.ExpectedCalls = nil
alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, nil)
s.checker.checkPreImportingJob(job)
importTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
importTasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
s.Equal(1, len(importTasks))
s.Equal(internalpb.ImportJobState_Importing, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Importing, s.importMeta.GetJob(context.TODO(), job.GetJobID()).GetState())
}
func (s *ImportCheckerSuite) TestCheckTimeout() {
catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil)
taskProto := &datapb.PreImportTask{
@@ -348,17 +348,17 @@ func (s *ImportCheckerSuite) TestCheckTimeout() {
tr: timerecord.NewTimeRecorder("preimport task"),
}
task.task.Store(taskProto)
err := s.imeta.AddTask(context.TODO(), task)
err := s.importMeta.AddTask(context.TODO(), task)
s.NoError(err)
s.checker.tryTimeoutJob(s.imeta.GetJob(context.TODO(), s.jobID))
s.checker.tryTimeoutJob(s.importMeta.GetJob(context.TODO(), s.jobID))
job := s.imeta.GetJob(context.TODO(), s.jobID)
job := s.importMeta.GetJob(context.TODO(), s.jobID)
s.Equal(internalpb.ImportJobState_Failed, job.GetState())
s.Equal("import timeout", job.GetReason())
}
func (s *ImportCheckerSuite) TestCheckFailure() {
catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil)
taskProto := &datapb.ImportTaskV2{
@@ -372,35 +372,35 @@ func (s *ImportCheckerSuite) TestCheckFailure() {
tr: timerecord.NewTimeRecorder("import task"),
}
it.task.Store(taskProto)
err := s.imeta.AddTask(context.TODO(), it)
err := s.importMeta.AddTask(context.TODO(), it)
s.NoError(err)
sjm := NewMockStatsJobManager(s.T())
sjm.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(errors.New("mock err"))
s.checker.si = sjm
s.checker.checkFailedJob(s.imeta.GetJob(context.TODO(), s.jobID))
tasks := s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed))
s.checker.checkFailedJob(s.importMeta.GetJob(context.TODO(), s.jobID))
tasks := s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed))
s.Equal(0, len(tasks))
sjm.ExpectedCalls = nil
sjm.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil)
catalog.ExpectedCalls = nil
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(errors.New("mock error"))
s.checker.checkFailedJob(s.imeta.GetJob(context.TODO(), s.jobID))
tasks = s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed))
s.checker.checkFailedJob(s.importMeta.GetJob(context.TODO(), s.jobID))
tasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed))
s.Equal(0, len(tasks))
catalog.ExpectedCalls = nil
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil)
s.checker.checkFailedJob(s.imeta.GetJob(context.TODO(), s.jobID))
tasks = s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed))
s.checker.checkFailedJob(s.importMeta.GetJob(context.TODO(), s.jobID))
tasks = s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed))
s.Equal(1, len(tasks))
}
func (s *ImportCheckerSuite) TestCheckGC() {
mockErr := errors.New("mock err")
catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil)
taskProto := &datapb.ImportTaskV2{
@@ -415,74 +415,74 @@ func (s *ImportCheckerSuite) TestCheckGC() {
tr: timerecord.NewTimeRecorder("import task"),
}
task.task.Store(taskProto)
err := s.imeta.AddTask(context.TODO(), task)
err := s.importMeta.AddTask(context.TODO(), task)
s.NoError(err)
// not failed or completed
s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.imeta.GetJobBy(context.TODO())))
s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.importMeta.GetJobBy(context.TODO())))
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
err = s.imeta.UpdateJob(context.TODO(), s.jobID, UpdateJobState(internalpb.ImportJobState_Failed))
err = s.importMeta.UpdateJob(context.TODO(), s.jobID, UpdateJobState(internalpb.ImportJobState_Failed))
s.NoError(err)
// not reach cleanup ts
s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.imeta.GetJobBy(context.TODO())))
s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.importMeta.GetJobBy(context.TODO())))
GCRetention := Params.DataCoordCfg.ImportTaskRetention.GetAsDuration(time.Second)
job := s.imeta.GetJob(context.TODO(), s.jobID)
job := s.importMeta.GetJob(context.TODO(), s.jobID)
job.(*importJob).CleanupTs = tsoutil.AddPhysicalDurationOnTs(job.GetCleanupTs(), GCRetention*-2)
err = s.imeta.AddJob(context.TODO(), job)
err = s.importMeta.AddJob(context.TODO(), job)
s.NoError(err)
// origin segment not dropped
s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.imeta.GetJobBy(context.TODO())))
err = s.imeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateSegmentIDs([]int64{}))
s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.importMeta.GetJobBy(context.TODO())))
err = s.importMeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateSegmentIDs([]int64{}))
s.NoError(err)
// stats segment not dropped
s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.imeta.GetJobBy(context.TODO())))
err = s.imeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateStatsSegmentIDs([]int64{}))
s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.importMeta.GetJobBy(context.TODO())))
err = s.importMeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateStatsSegmentIDs([]int64{}))
s.NoError(err)
// task is not dropped
s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.imeta.GetJobBy(context.TODO())))
err = s.imeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateNodeID(NullNodeID))
s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.importMeta.GetJobBy(context.TODO())))
err = s.importMeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateNodeID(NullNodeID))
s.NoError(err)
// remove task failed
catalog.EXPECT().DropImportTask(mock.Anything, mock.Anything).Return(mockErr)
s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.imeta.GetJobBy(context.TODO())))
s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID))
s.Equal(1, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.importMeta.GetJobBy(context.TODO())))
// remove job failed
catalog.ExpectedCalls = nil
catalog.EXPECT().DropImportTask(mock.Anything, mock.Anything).Return(nil)
catalog.EXPECT().DropImportJob(mock.Anything, mock.Anything).Return(mockErr)
s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID))
s.Equal(0, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.imeta.GetJobBy(context.TODO())))
s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID))
s.Equal(0, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(1, len(s.importMeta.GetJobBy(context.TODO())))
// normal case
catalog.ExpectedCalls = nil
catalog.EXPECT().DropImportJob(mock.Anything, mock.Anything).Return(nil)
s.checker.checkGC(s.imeta.GetJob(context.TODO(), s.jobID))
s.Equal(0, len(s.imeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(0, len(s.imeta.GetJobBy(context.TODO())))
s.checker.checkGC(s.importMeta.GetJob(context.TODO(), s.jobID))
s.Equal(0, len(s.importMeta.GetTaskBy(context.TODO(), WithJob(s.jobID))))
s.Equal(0, len(s.importMeta.GetJobBy(context.TODO())))
}
func (s *ImportCheckerSuite) TestCheckCollection() {
mockErr := errors.New("mock err")
catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog := s.importMeta.(*importMeta).catalog.(*mocks.DataCoordCatalog)
catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil)
taskProto := &datapb.PreImportTask{
@@ -494,25 +494,25 @@ func (s *ImportCheckerSuite) TestCheckCollection() {
tr: timerecord.NewTimeRecorder("preimport task"),
}
task.task.Store(taskProto)
err := s.imeta.AddTask(context.TODO(), task)
err := s.importMeta.AddTask(context.TODO(), task)
s.NoError(err)
// no jobs
s.checker.checkCollection(1, []ImportJob{})
s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), s.jobID).GetState())
s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), s.jobID).GetState())
// collection exist
broker := s.checker.broker.(*broker2.MockBroker)
broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(true, nil)
s.checker.checkCollection(1, []ImportJob{s.imeta.GetJob(context.TODO(), s.jobID)})
s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), s.jobID).GetState())
s.checker.checkCollection(1, []ImportJob{s.importMeta.GetJob(context.TODO(), s.jobID)})
s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), s.jobID).GetState())
// HasCollection failed
s.checker.broker = broker2.NewMockBroker(s.T())
broker = s.checker.broker.(*broker2.MockBroker)
broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(true, mockErr)
s.checker.checkCollection(1, []ImportJob{s.imeta.GetJob(context.TODO(), s.jobID)})
s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), s.jobID).GetState())
s.checker.checkCollection(1, []ImportJob{s.importMeta.GetJob(context.TODO(), s.jobID)})
s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), s.jobID).GetState())
// SaveImportJob failed
s.checker.broker = broker2.NewMockBroker(s.T())
@@ -520,8 +520,8 @@ func (s *ImportCheckerSuite) TestCheckCollection() {
broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(false, nil)
catalog.ExpectedCalls = nil
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(mockErr)
s.checker.checkCollection(1, []ImportJob{s.imeta.GetJob(context.TODO(), s.jobID)})
s.Equal(internalpb.ImportJobState_Pending, s.imeta.GetJob(context.TODO(), s.jobID).GetState())
s.checker.checkCollection(1, []ImportJob{s.importMeta.GetJob(context.TODO(), s.jobID)})
s.Equal(internalpb.ImportJobState_Pending, s.importMeta.GetJob(context.TODO(), s.jobID).GetState())
// collection dropped
s.checker.broker = broker2.NewMockBroker(s.T())
@@ -529,8 +529,8 @@ func (s *ImportCheckerSuite) TestCheckCollection() {
broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(false, nil)
catalog.ExpectedCalls = nil
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
s.checker.checkCollection(1, []ImportJob{s.imeta.GetJob(context.TODO(), s.jobID)})
s.Equal(internalpb.ImportJobState_Failed, s.imeta.GetJob(context.TODO(), s.jobID).GetState())
s.checker.checkCollection(1, []ImportJob{s.importMeta.GetJob(context.TODO(), s.jobID)})
s.Equal(internalpb.ImportJobState_Failed, s.importMeta.GetJob(context.TODO(), s.jobID).GetState())
}
func TestImportChecker(t *testing.T) {
@@ -565,7 +565,7 @@ func TestImportCheckerCompaction(t *testing.T) {
meta, err := newMeta(context.TODO(), catalog, nil, broker)
assert.NoError(t, err)
imeta, err := NewImportMeta(context.TODO(), catalog, alloc, meta)
importMeta, err := NewImportMeta(context.TODO(), catalog, alloc, meta)
assert.NoError(t, err)
sjm := NewMockStatsJobManager(t)
@@ -575,7 +575,7 @@ func TestImportCheckerCompaction(t *testing.T) {
l0CompactionTrigger.EXPECT().GetPauseCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe()
l0CompactionTrigger.EXPECT().GetResumeCompactionChan(mock.Anything, mock.Anything).Return(compactionChan).Maybe()
checker := NewImportChecker(meta, broker, alloc, imeta, sjm, l0CompactionTrigger).(*importChecker)
checker := NewImportChecker(context.TODO(), meta, broker, alloc, importMeta, sjm, l0CompactionTrigger).(*importChecker)
job := &importJob{
ImportJob: &datapb.ImportJob{
@@ -615,7 +615,7 @@ func TestImportCheckerCompaction(t *testing.T) {
tr: timerecord.NewTimeRecorder("import job"),
}
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once()
err = imeta.AddJob(context.TODO(), job)
err = importMeta.AddJob(context.TODO(), job)
assert.NoError(t, err)
jobID := job.GetJobID()
@@ -662,7 +662,7 @@ func TestImportCheckerCompaction(t *testing.T) {
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = imeta.AddJob(context.TODO(), job2)
err = importMeta.AddJob(context.TODO(), job2)
assert.NoError(t, err)
log.Info("job ready")
@@ -675,8 +675,8 @@ func TestImportCheckerCompaction(t *testing.T) {
catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil).Twice()
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once()
assert.Eventually(t, func() bool {
job := imeta.GetJob(context.TODO(), jobID)
preimportTasks := imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
job := importMeta.GetJob(context.TODO(), jobID)
preimportTasks := importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
taskLen := len(preimportTasks)
log.Info("job pre-importing", zap.Any("taskLen", taskLen), zap.Any("jobState", job.GetState()))
return taskLen == 2 && job.GetState() == internalpb.ImportJobState_PreImporting
@@ -687,14 +687,14 @@ func TestImportCheckerCompaction(t *testing.T) {
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil).Once()
catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(nil).Twice()
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once()
preimportTasks := imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
preimportTasks := importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
for _, pt := range preimportTasks {
err := imeta.UpdateTask(context.TODO(), pt.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
err := importMeta.UpdateTask(context.TODO(), pt.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
assert.NoError(t, err)
}
assert.Eventually(t, func() bool {
job := imeta.GetJob(context.TODO(), jobID)
importTasks := imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
job := importMeta.GetJob(context.TODO(), jobID)
importTasks := importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
return len(importTasks) == 1 && job.GetState() == internalpb.ImportJobState_Importing
}, 2*time.Second, 100*time.Millisecond)
log.Info("job importing")
@@ -705,7 +705,7 @@ func TestImportCheckerCompaction(t *testing.T) {
catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once()
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil).Once()
importTasks := imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
importTasks := importMeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
for _, it := range importTasks {
segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
@@ -717,14 +717,14 @@ func TestImportCheckerCompaction(t *testing.T) {
}
err := checker.meta.AddSegment(context.Background(), segment)
assert.NoError(t, err)
err = imeta.UpdateTask(context.TODO(), it.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed),
err = importMeta.UpdateTask(context.TODO(), it.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed),
UpdateSegmentIDs([]int64{segment.GetID()}), UpdateStatsSegmentIDs([]int64{rand.Int63()}))
assert.NoError(t, err)
err = checker.meta.UpdateChannelCheckpoint(context.TODO(), segment.GetInsertChannel(), &msgpb.MsgPosition{MsgID: []byte{0}})
assert.NoError(t, err)
}
assert.Eventually(t, func() bool {
job := imeta.GetJob(context.TODO(), jobID)
job := importMeta.GetJob(context.TODO(), jobID)
return job.GetState() == internalpb.ImportJobState_Stats
}, 2*time.Second, 100*time.Millisecond)
log.Info("job stats")
@@ -735,7 +735,7 @@ func TestImportCheckerCompaction(t *testing.T) {
State: indexpb.JobState_JobStateFinished,
}).Once()
assert.Eventually(t, func() bool {
job := imeta.GetJob(context.TODO(), jobID)
job := importMeta.GetJob(context.TODO(), jobID)
return job.GetState() == internalpb.ImportJobState_IndexBuilding
}, 2*time.Second, 100*time.Millisecond)
log.Info("job index building")
@@ -750,16 +750,16 @@ func TestImportCheckerCompaction(t *testing.T) {
}
task := &importTask{}
task.task.Store(taskProto)
imeta.AddTask(context.TODO(), task)
importMeta.AddTask(context.TODO(), task)
time.Sleep(1200 * time.Millisecond)
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil).Once()
imeta.UpdateTask(context.TODO(), 100000, UpdateState(datapb.ImportTaskStateV2_Completed))
importMeta.UpdateTask(context.TODO(), 100000, UpdateState(datapb.ImportTaskStateV2_Completed))
log.Info("job l0 compaction")
// check index building
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil).Once()
assert.Eventually(t, func() bool {
job := imeta.GetJob(context.TODO(), jobID)
job := importMeta.GetJob(context.TODO(), jobID)
return job.GetState() == internalpb.ImportJobState_Completed
}, 2*time.Second, 100*time.Millisecond)
log.Info("job completed")


@@ -41,32 +41,34 @@ type ImportInspector interface {
}
type importInspector struct {
meta *meta
alloc allocator.Allocator
imeta ImportMeta
scheduler task.GlobalScheduler
ctx context.Context
meta *meta
alloc allocator.Allocator
importMeta ImportMeta
scheduler task.GlobalScheduler
closeOnce sync.Once
closeChan chan struct{}
}
func NewImportInspector(meta *meta, imeta ImportMeta, scheduler task.GlobalScheduler) ImportInspector {
func NewImportInspector(ctx context.Context, meta *meta, importMeta ImportMeta, scheduler task.GlobalScheduler) ImportInspector {
return &importInspector{
meta: meta,
imeta: imeta,
scheduler: scheduler,
closeChan: make(chan struct{}),
ctx: ctx,
meta: meta,
importMeta: importMeta,
scheduler: scheduler,
closeChan: make(chan struct{}),
}
}
func (s *importInspector) Start() {
log.Ctx(context.TODO()).Info("start import inspector")
log.Ctx(s.ctx).Info("start import inspector")
ticker := time.NewTicker(Params.DataCoordCfg.ImportScheduleInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-s.closeChan:
log.Ctx(context.TODO()).Info("import inspector exited")
log.Ctx(s.ctx).Info("import inspector exited")
return
case <-ticker.C:
s.inspect()
@@ -81,12 +83,12 @@ func (s *importInspector) Close() {
}
func (s *importInspector) inspect() {
jobs := s.imeta.GetJobBy(context.TODO())
jobs := s.importMeta.GetJobBy(s.ctx)
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].GetJobID() < jobs[j].GetJobID()
})
for _, job := range jobs {
tasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()))
tasks := s.importMeta.GetTaskBy(s.ctx, WithJob(job.GetJobID()))
for _, task := range tasks {
switch task.GetState() {
case datapb.ImportTaskStateV2_Pending:
@@ -118,16 +120,16 @@ func (s *importInspector) processFailed(task ImportTask) {
segments := append(originSegmentIDs, statsSegmentIDs...)
for _, segment := range segments {
op := UpdateStatusOperator(segment, commonpb.SegmentState_Dropped)
err := s.meta.UpdateSegmentsInfo(context.TODO(), op)
err := s.meta.UpdateSegmentsInfo(s.ctx, op)
if err != nil {
log.Ctx(context.TODO()).Warn("drop import segment failed", WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...)
log.Ctx(s.ctx).Warn("drop import segment failed", WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...)
return
}
}
if len(segments) > 0 {
err := s.imeta.UpdateTask(context.TODO(), task.GetTaskID(), UpdateSegmentIDs(nil), UpdateStatsSegmentIDs(nil))
err := s.importMeta.UpdateTask(s.ctx, task.GetTaskID(), UpdateSegmentIDs(nil), UpdateStatsSegmentIDs(nil))
if err != nil {
log.Ctx(context.TODO()).Warn("update import task segments failed", WrapTaskLog(task, zap.Error(err))...)
log.Ctx(s.ctx).Warn("update import task segments failed", WrapTaskLog(task, zap.Error(err))...)
}
}
}
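
The inspector applies the same idea to logging: `log.Ctx(s.ctx)` picks up
whatever metadata (for example a trace ID) the owner attached to the
context, while `log.Ctx(context.TODO())` always fell back to a bare logger.
A rough stand-in for that mechanism using plain zap (Milvus's `pkg/log`
wrapper works along these lines, but the names below are illustrative):

```go
package main

import (
	"context"

	"go.uber.org/zap"
)

type loggerKey struct{}

// fromCtx stands in for log.Ctx: return the logger carried by the context,
// falling back to the global logger when none is attached.
func fromCtx(ctx context.Context) *zap.Logger {
	if l, ok := ctx.Value(loggerKey{}).(*zap.Logger); ok {
		return l
	}
	return zap.L()
}

func main() {
	base, _ := zap.NewDevelopment()
	// The component's root context carries a logger with a trace field.
	ctx := context.WithValue(context.Background(), loggerKey{},
		base.With(zap.String("traceID", "abc123")))

	// log.Ctx(context.TODO()) would drop the field; log.Ctx(s.ctx) keeps
	// "traceID" on every line the inspector emits.
	fromCtx(ctx).Info("start import inspector")
}
```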


@@ -40,12 +40,12 @@ type ImportInspectorSuite struct {
collectionID int64
catalog *mocks.DataCoordCatalog
alloc *allocator.MockAllocator
cluster *MockCluster
meta *meta
imeta ImportMeta
inspector *importInspector
catalog *mocks.DataCoordCatalog
alloc *allocator.MockAllocator
cluster *MockCluster
meta *meta
importMeta ImportMeta
inspector *importInspector
}
func (s *ImportInspectorSuite) SetupTest() {
@@ -75,10 +75,10 @@ func (s *ImportInspectorSuite) SetupTest() {
ID: s.collectionID,
Schema: newTestSchema(),
})
s.imeta, err = NewImportMeta(context.TODO(), s.catalog, s.alloc, s.meta)
s.importMeta, err = NewImportMeta(context.TODO(), s.catalog, s.alloc, s.meta)
s.NoError(err)
scheduler := task2.NewMockGlobalScheduler(s.T())
s.inspector = NewImportInspector(s.meta, s.imeta, scheduler).(*importInspector)
s.inspector = NewImportInspector(context.TODO(), s.meta, s.importMeta, scheduler).(*importInspector)
}
func (s *ImportInspectorSuite) TestProcessPreImport() {
@@ -93,11 +93,11 @@ func (s *ImportInspectorSuite) TestProcessPreImport() {
}
var task ImportTask = &preImportTask{
imeta: s.imeta,
tr: timerecord.NewTimeRecorder("preimport task"),
importMeta: s.importMeta,
tr: timerecord.NewTimeRecorder("preimport task"),
}
task.(*preImportTask).task.Store(taskProto)
err := s.imeta.AddTask(context.TODO(), task)
err := s.importMeta.AddTask(context.TODO(), task)
s.NoError(err)
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{
@@ -108,7 +108,7 @@ func (s *ImportInspectorSuite) TestProcessPreImport() {
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.imeta.AddJob(context.TODO(), job)
err = s.importMeta.AddJob(context.TODO(), job)
s.NoError(err)
// pending -> inProgress
@@ -118,7 +118,7 @@ func (s *ImportInspectorSuite) TestProcessPreImport() {
task.CreateTaskOnWorker(1, cluster)
})
s.inspector.inspect()
task = s.imeta.GetTask(context.TODO(), task.GetTaskID())
task = s.importMeta.GetTask(context.TODO(), task.GetTaskID())
s.Equal(datapb.ImportTaskStateV2_InProgress, task.GetState())
// inProgress -> completed
@@ -126,7 +126,7 @@ func (s *ImportInspectorSuite) TestProcessPreImport() {
State: datapb.ImportTaskStateV2_Completed,
}, nil)
task.QueryTaskOnWorker(cluster)
task = s.imeta.GetTask(context.TODO(), task.GetTaskID())
task = s.importMeta.GetTask(context.TODO(), task.GetTaskID())
s.Equal(datapb.ImportTaskStateV2_Completed, task.GetState())
}
@@ -156,13 +156,13 @@ func (s *ImportInspectorSuite) TestProcessImport() {
}
var task ImportTask = &importTask{
alloc: s.alloc,
meta: s.meta,
imeta: s.imeta,
tr: timerecord.NewTimeRecorder("import task"),
alloc: s.alloc,
meta: s.meta,
importMeta: s.importMeta,
tr: timerecord.NewTimeRecorder("import task"),
}
task.(*importTask).task.Store(taskProto)
err := s.imeta.AddTask(context.TODO(), task)
err := s.importMeta.AddTask(context.TODO(), task)
s.NoError(err)
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{
@ -175,7 +175,7 @@ func (s *ImportInspectorSuite) TestProcessImport() {
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.imeta.AddJob(context.TODO(), job)
err = s.importMeta.AddJob(context.TODO(), job)
s.NoError(err)
// pending -> inProgress
@ -188,7 +188,7 @@ func (s *ImportInspectorSuite) TestProcessImport() {
task.CreateTaskOnWorker(nodeID, cluster)
})
s.inspector.inspect()
task = s.imeta.GetTask(context.TODO(), task.GetTaskID())
task = s.importMeta.GetTask(context.TODO(), task.GetTaskID())
s.Equal(datapb.ImportTaskStateV2_InProgress, task.GetState())
// inProgress -> completed
@ -196,7 +196,7 @@ func (s *ImportInspectorSuite) TestProcessImport() {
State: datapb.ImportTaskStateV2_Completed,
}, nil)
task.QueryTaskOnWorker(cluster)
task = s.imeta.GetTask(context.TODO(), task.GetTaskID())
task = s.importMeta.GetTask(context.TODO(), task.GetTaskID())
s.Equal(datapb.ImportTaskStateV2_Completed, task.GetState())
}
@ -218,7 +218,7 @@ func (s *ImportInspectorSuite) TestProcessFailed() {
tr: timerecord.NewTimeRecorder("import task"),
}
task.(*importTask).task.Store(taskProto)
err := s.imeta.AddTask(context.TODO(), task)
err := s.importMeta.AddTask(context.TODO(), task)
s.NoError(err)
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{
@ -231,7 +231,7 @@ func (s *ImportInspectorSuite) TestProcessFailed() {
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.imeta.AddJob(context.TODO(), job)
err = s.importMeta.AddJob(context.TODO(), job)
s.NoError(err)
s.catalog.EXPECT().AddSegment(mock.Anything, mock.Anything).Return(nil)
@ -253,7 +253,7 @@ func (s *ImportInspectorSuite) TestProcessFailed() {
segment := s.meta.GetSegment(context.TODO(), id)
s.Equal(commonpb.SegmentState_Dropped, segment.GetState())
}
task = s.imeta.GetTask(context.TODO(), task.GetTaskID())
task = s.importMeta.GetTask(context.TODO(), task.GetTaskID())
s.Equal(datapb.ImportTaskStateV2_Failed, task.GetState())
s.Equal(0, len(task.(*importTask).GetSegmentIDs()))
}

View File

@ -111,24 +111,24 @@ func NewImportMeta(ctx context.Context, catalog metastore.DataCoordCatalog, allo
}
tasks := newImportTasks()
imeta := &importMeta{}
importMeta := &importMeta{}
for _, task := range restoredPreImportTasks {
t := &preImportTask{
imeta: imeta,
tr: timerecord.NewTimeRecorder("preimport task"),
times: taskcommon.NewTimes(),
importMeta: importMeta,
tr: timerecord.NewTimeRecorder("preimport task"),
times: taskcommon.NewTimes(),
}
t.task.Store(task)
tasks.add(t)
}
for _, task := range restoredImportTasks {
t := &importTask{
alloc: alloc,
meta: meta,
imeta: imeta,
tr: timerecord.NewTimeRecorder("import task"),
times: taskcommon.NewTimes(),
alloc: alloc,
meta: meta,
importMeta: importMeta,
tr: timerecord.NewTimeRecorder("import task"),
times: taskcommon.NewTimes(),
}
t.task.Store(task)
tasks.add(t)
@ -142,10 +142,10 @@ func NewImportMeta(ctx context.Context, catalog metastore.DataCoordCatalog, allo
}
}
imeta.jobs = jobs
imeta.tasks = tasks
imeta.catalog = catalog
return imeta, nil
importMeta.jobs = jobs
importMeta.tasks = tasks
importMeta.catalog = catalog
return importMeta, nil
}
func (m *importMeta) AddJob(ctx context.Context, job ImportJob) error {
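
For context on the Store calls above: each task wrapper keeps its protobuf state behind a sync/atomic Pointer, so readers load it lock-free while writers swap in a fresh value. Below is a minimal sketch of that pattern, with placeholder types standing in for the real Milvus ones (taskProto here is not datapb.ImportTaskV2):

package main

import (
	"fmt"
	"sync/atomic"
)

// taskProto stands in for datapb.ImportTaskV2; the real type is a protobuf message.
type taskProto struct {
	TaskID int64
	State  string
}

type task struct {
	// Readers Load() the pointer lock-free; writers Store() a fresh value,
	// mirroring how importTask and preImportTask hold their proto state.
	proto atomic.Pointer[taskProto]
}

// clone copies the current proto into a new wrapper, the same shape as the
// Clone() methods in the task files (which use typeutil.Clone for a deep copy).
func (t *task) clone() *task {
	cloned := &task{}
	p := *t.proto.Load() // shallow copy is enough for this plain struct
	cloned.proto.Store(&p)
	return cloned
}

func main() {
	t := &task{}
	t.proto.Store(&taskProto{TaskID: 1, State: "Pending"})
	c := t.clone()
	c.proto.Load().State = "InProgress"
	fmt.Println(t.proto.Load().State, c.proto.Load().State) // Pending InProgress
}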

View File

@ -47,7 +47,7 @@ type importTask struct {
alloc allocator.Allocator
meta *meta
imeta ImportMeta
importMeta ImportMeta
tr *timerecord.TimeRecorder
times *taskcommon.Times
retryTimes int64
@ -138,7 +138,7 @@ func (t *importTask) GetTaskSlot() int64 {
func (t *importTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {
log.Info("processing pending import task...", WrapTaskLog(t)...)
job := t.imeta.GetJob(context.TODO(), t.GetJobID())
job := t.importMeta.GetJob(context.TODO(), t.GetJobID())
req, err := AssembleImportRequest(t, job, t.meta, t.alloc)
if err != nil {
log.Warn("assemble import request failed", WrapTaskLog(t, zap.Error(err))...)
@ -150,7 +150,7 @@ func (t *importTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {
t.retryTimes++
return
}
err = t.imeta.UpdateTask(context.TODO(), t.GetTaskID(),
err = t.importMeta.UpdateTask(context.TODO(), t.GetTaskID(),
UpdateState(datapb.ImportTaskStateV2_InProgress),
UpdateNodeID(nodeID))
if err != nil {
@ -169,7 +169,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) {
}
resp, err := cluster.QueryImport(t.GetNodeID(), req)
if err != nil {
updateErr := t.imeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending))
updateErr := t.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending))
if updateErr != nil {
log.Warn("failed to update import task state to pending", WrapTaskLog(t, zap.Error(updateErr))...)
}
@ -177,7 +177,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) {
return
}
if resp.GetState() == datapb.ImportTaskStateV2_Failed {
err = t.imeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason()))
err = t.importMeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason()))
if err != nil {
log.Warn("failed to update job state to Failed", zap.Int64("jobID", t.GetJobID()), zap.Error(err))
}
@ -222,7 +222,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) {
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
err = t.meta.UpdateSegmentsInfo(context.TODO(), op1, op2)
if err != nil {
updateErr := t.imeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
updateErr := t.importMeta.UpdateJob(context.TODO(), t.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
if updateErr != nil {
log.Warn("failed to update job state to Failed", zap.Int64("jobID", t.GetJobID()), zap.Error(updateErr))
}
@ -231,7 +231,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) {
}
}
completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
err = t.imeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime))
err = t.importMeta.UpdateTask(context.TODO(), t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime))
if err != nil {
log.Warn("update import task failed", WrapTaskLog(t, zap.Error(err))...)
return
@ -245,7 +245,7 @@ func (t *importTask) QueryTaskOnWorker(cluster session.Cluster) {
}
func (t *importTask) DropTaskOnWorker(cluster session.Cluster) {
err := DropImportTask(t, cluster, t.imeta)
err := DropImportTask(t, cluster, t.importMeta)
if err != nil {
log.Warn("drop import failed", WrapTaskLog(t, zap.Error(err))...)
return
@ -263,11 +263,11 @@ func (t *importTask) GetTR() *timerecord.TimeRecorder {
func (t *importTask) Clone() ImportTask {
cloned := &importTask{
alloc: t.alloc,
meta: t.meta,
imeta: t.imeta,
tr: t.tr,
times: t.times,
alloc: t.alloc,
meta: t.meta,
importMeta: t.importMeta,
tr: t.tr,
times: t.times,
}
cloned.task.Store(typeutil.Clone(t.task.Load()))
return cloned
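
One small aside on the completion path in QueryTaskOnWorker: the layout literal "2006-01-02T15:04:05Z07:00" is Go's RFC 3339 reference layout, so CompleteTime is an RFC 3339 timestamp and could equivalently be formatted with the stdlib constant. A quick stand-alone check:

package main

import (
	"fmt"
	"time"
)

func main() {
	// time.RFC3339 is defined as "2006-01-02T15:04:05Z07:00", the exact
	// layout string used for CompleteTime in the task code above.
	fmt.Println(time.RFC3339 == "2006-01-02T15:04:05Z07:00") // true
	now := time.Now()
	fmt.Println(now.Format(time.RFC3339) == now.Format("2006-01-02T15:04:05Z07:00")) // true
}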

View File

@ -95,10 +95,10 @@ func TestImportTask_CreateTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Pending,
}
task := &importTask{
alloc: alloc,
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
imeta: im,
tr: timerecord.NewTimeRecorder(""),
alloc: alloc,
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -139,10 +139,10 @@ func TestImportTask_CreateTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Pending,
}
task := &importTask{
alloc: alloc,
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
imeta: im,
tr: timerecord.NewTimeRecorder(""),
alloc: alloc,
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -184,10 +184,10 @@ func TestImportTask_CreateTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Pending,
}
task := &importTask{
alloc: alloc,
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
imeta: im,
tr: timerecord.NewTimeRecorder(""),
alloc: alloc,
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -234,10 +234,10 @@ func TestImportTask_CreateTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Pending,
}
task := &importTask{
alloc: alloc,
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
imeta: im,
tr: timerecord.NewTimeRecorder(""),
alloc: alloc,
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -270,10 +270,10 @@ func TestImportTask_QueryTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_InProgress,
}
task := &importTask{
alloc: nil,
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
imeta: im,
tr: timerecord.NewTimeRecorder(""),
alloc: nil,
meta: &meta{collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo]()},
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -322,8 +322,8 @@ func TestImportTask_QueryTaskOnWorker(t *testing.T) {
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
segments: NewSegmentsInfo(),
},
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -363,8 +363,8 @@ func TestImportTask_QueryTaskOnWorker(t *testing.T) {
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
segments: NewSegmentsInfo(),
},
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -431,8 +431,8 @@ func TestImportTask_QueryTaskOnWorker(t *testing.T) {
collections: typeutil.NewConcurrentMap[UniqueID, *collectionInfo](),
segments: NewSegmentsInfo(),
},
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -496,9 +496,9 @@ func TestImportTask_DropTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Completed,
}
task := &importTask{
alloc: nil,
imeta: im,
tr: timerecord.NewTimeRecorder(""),
alloc: nil,
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -530,9 +530,9 @@ func TestImportTask_DropTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Completed,
}
task := &importTask{
alloc: nil,
imeta: im,
tr: timerecord.NewTimeRecorder(""),
alloc: nil,
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)

View File

@ -41,7 +41,7 @@ var _ ImportTask = (*preImportTask)(nil)
type preImportTask struct {
task atomic.Pointer[datapb.PreImportTask]
imeta ImportMeta
importMeta ImportMeta
tr *timerecord.TimeRecorder
times *taskcommon.Times
retryTimes int64
@ -109,7 +109,7 @@ func (p *preImportTask) GetTaskVersion() int64 {
func (p *preImportTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {
log.Info("processing pending preimport task...", WrapTaskLog(p)...)
job := p.imeta.GetJob(context.TODO(), p.GetJobID())
job := p.importMeta.GetJob(context.TODO(), p.GetJobID())
req := AssemblePreImportRequest(p, job)
err := cluster.CreatePreImport(nodeID, req, p.GetTaskSlot())
@ -118,7 +118,7 @@ func (p *preImportTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster
p.retryTimes++
return
}
err = p.imeta.UpdateTask(context.TODO(), p.GetTaskID(),
err = p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(),
UpdateState(datapb.ImportTaskStateV2_InProgress),
UpdateNodeID(nodeID))
if err != nil {
@ -137,7 +137,7 @@ func (p *preImportTask) QueryTaskOnWorker(cluster session.Cluster) {
}
resp, err := cluster.QueryPreImport(p.GetNodeID(), req)
if err != nil {
updateErr := p.imeta.UpdateTask(context.TODO(), p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending))
updateErr := p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending))
if updateErr != nil {
log.Warn("failed to update preimport task state to pending", WrapTaskLog(p, zap.Error(updateErr))...)
}
@ -145,7 +145,7 @@ func (p *preImportTask) QueryTaskOnWorker(cluster session.Cluster) {
return
}
if resp.GetState() == datapb.ImportTaskStateV2_Failed {
err = p.imeta.UpdateJob(context.TODO(), p.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
err = p.importMeta.UpdateJob(context.TODO(), p.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
UpdateJobReason(resp.GetReason()))
if err != nil {
log.Warn("failed to update job state to Failed", zap.Int64("jobID", p.GetJobID()), zap.Error(err))
@ -157,7 +157,7 @@ func (p *preImportTask) QueryTaskOnWorker(cluster session.Cluster) {
if resp.GetState() == datapb.ImportTaskStateV2_Completed {
actions = append(actions, UpdateState(datapb.ImportTaskStateV2_Completed))
}
err = p.imeta.UpdateTask(context.TODO(), p.GetTaskID(), actions...)
err = p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(), actions...)
if err != nil {
log.Warn("update preimport task failed", WrapTaskLog(p, zap.Error(err))...)
return
@ -172,7 +172,7 @@ func (p *preImportTask) QueryTaskOnWorker(cluster session.Cluster) {
}
func (p *preImportTask) DropTaskOnWorker(cluster session.Cluster) {
err := DropImportTask(p, cluster, p.imeta)
err := DropImportTask(p, cluster, p.importMeta)
if err != nil {
log.Warn("drop import failed", WrapTaskLog(p, zap.Error(err))...)
return
@ -190,9 +190,9 @@ func (p *preImportTask) GetTR() *timerecord.TimeRecorder {
func (p *preImportTask) Clone() ImportTask {
cloned := &preImportTask{
imeta: p.imeta,
tr: p.tr,
times: p.times,
importMeta: p.importMeta,
tr: p.tr,
times: p.times,
}
cloned.task.Store(typeutil.Clone(p.task.Load()))
return cloned
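
Worth noting in the preimport flow above: a failed QueryPreImport call does not fail the job; the task is reset to Pending so the next inspection cycle reschedules it, and only an explicit Failed state reported by the worker escalates to the job. A minimal sketch of that reset-to-retry control flow, with illustrative names and signatures rather than the real ones:

package main

import "log"

// queryOnce models the control flow of QueryTaskOnWorker above: a transient
// query error resets the task to Pending so a later inspection reschedules
// it, while an explicit Failed state from the worker is terminal and is
// escalated to the job. Names and signatures here are illustrative only.
func queryOnce(query func() (string, error), update func(string) error) {
	state, err := query()
	if err != nil {
		if uerr := update("Pending"); uerr != nil {
			log.Printf("reset to pending failed: %v", uerr)
		}
		return
	}
	if state == "Failed" {
		_ = update("Failed") // the real code fails the whole import job here
		return
	}
	if state == "Completed" {
		_ = update("Completed")
	}
}

func main() {
	queryOnce(
		func() (string, error) { return "Completed", nil },
		func(s string) error { log.Printf("state -> %s", s); return nil },
	)
}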

View File

@ -81,8 +81,8 @@ func TestPreImportTask_CreateTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Pending,
}
task := &preImportTask{
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -120,8 +120,8 @@ func TestPreImportTask_CreateTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Pending,
}
task := &preImportTask{
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -132,7 +132,7 @@ func TestPreImportTask_CreateTaskOnWorker(t *testing.T) {
catalog = mocks.NewDataCoordCatalog(t)
catalog.EXPECT().SavePreImportTask(mock.Anything, mock.Anything).Return(errors.New("mock err"))
task.imeta.(*importMeta).catalog = catalog
task.importMeta.(*importMeta).catalog = catalog
task.CreateTaskOnWorker(1, cluster)
assert.Equal(t, datapb.ImportTaskStateV2_Pending, task.GetState())
})
@ -163,8 +163,8 @@ func TestPreImportTask_CreateTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Pending,
}
task := &preImportTask{
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -196,8 +196,8 @@ func TestPreImportTask_QueryTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_InProgress,
}
task := &preImportTask{
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -236,8 +236,8 @@ func TestPreImportTask_QueryTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_InProgress,
}
task := &preImportTask{
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -271,8 +271,8 @@ func TestPreImportTask_QueryTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_InProgress,
}
task := &preImportTask{
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -306,8 +306,8 @@ func TestPreImportTask_DropTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Completed,
}
task := &preImportTask{
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)
@ -338,8 +338,8 @@ func TestPreImportTask_DropTaskOnWorker(t *testing.T) {
State: datapb.ImportTaskStateV2_Completed,
}
task := &preImportTask{
imeta: im,
tr: timerecord.NewTimeRecorder(""),
importMeta: im,
tr: timerecord.NewTimeRecorder(""),
}
task.task.Store(taskProto)
err = im.AddTask(context.TODO(), task)

View File

@ -59,7 +59,7 @@ func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field {
}
func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile,
job ImportJob, alloc allocator.Allocator, imeta ImportMeta,
job ImportJob, alloc allocator.Allocator, importMeta ImportMeta,
) ([]ImportTask, error) {
idStart, _, err := alloc.AllocN(int64(len(fileGroups)))
if err != nil {
@ -81,9 +81,9 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile,
CreatedTime: time.Now().Format("2006-01-02T15:04:05Z07:00"),
}
task := &preImportTask{
imeta: imeta,
tr: timerecord.NewTimeRecorder("preimport task"),
times: taskcommon.NewTimes(),
importMeta: importMeta,
tr: timerecord.NewTimeRecorder("preimport task"),
times: taskcommon.NewTimes(),
}
task.task.Store(taskProto)
tasks = append(tasks, task)
@ -92,7 +92,7 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile,
}
func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
job ImportJob, alloc allocator.Allocator, meta *meta, imeta ImportMeta,
job ImportJob, alloc allocator.Allocator, meta *meta, importMeta ImportMeta,
) ([]ImportTask, error) {
idBegin, _, err := alloc.AllocN(int64(len(fileGroups)))
if err != nil {
@ -110,11 +110,11 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
CreatedTime: time.Now().Format("2006-01-02T15:04:05Z07:00"),
}
task := &importTask{
alloc: alloc,
meta: meta,
imeta: imeta,
tr: timerecord.NewTimeRecorder("import task"),
times: taskcommon.NewTimes(),
alloc: alloc,
meta: meta,
importMeta: importMeta,
tr: timerecord.NewTimeRecorder("import task"),
times: taskcommon.NewTimes(),
}
task.task.Store(taskProto)
segments, err := AssignSegments(job, task, alloc, meta)
@ -389,7 +389,7 @@ func RegroupImportFiles(job ImportJob, files []*datapb.ImportFileStats, allDiskI
return fileGroups
}
func CheckDiskQuota(job ImportJob, meta *meta, imeta ImportMeta) (int64, error) {
func CheckDiskQuota(ctx context.Context, job ImportJob, meta *meta, importMeta ImportMeta) (int64, error) {
if !Params.QuotaConfig.DiskProtectionEnabled.GetAsBool() {
return 0, nil
}
@ -402,7 +402,7 @@ func CheckDiskQuota(job ImportJob, meta *meta, imeta ImportMeta) (int64, error)
requestedTotal int64
requestedCollections = make(map[int64]int64)
)
for _, j := range imeta.GetJobBy(context.TODO()) {
for _, j := range importMeta.GetJobBy(ctx) {
requested := j.GetRequestedDiskSize()
requestedTotal += requested
requestedCollections[j.GetCollectionID()] += requested
@ -412,7 +412,7 @@ func CheckDiskQuota(job ImportJob, meta *meta, imeta ImportMeta) (int64, error)
quotaInfo := meta.GetQuotaInfo()
totalUsage, collectionsUsage := quotaInfo.TotalBinlogSize, quotaInfo.CollectionBinlogSize
tasks := imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(PreImportTaskType))
tasks := importMeta.GetTaskBy(ctx, WithJob(job.GetJobID()), WithType(PreImportTaskType))
files := make([]*datapb.ImportFileStats, 0)
for _, task := range tasks {
files = append(files, task.GetFileStats()...)
@ -445,20 +445,20 @@ func CheckDiskQuota(job ImportJob, meta *meta, imeta ImportMeta) (int64, error)
return requestSize, nil
}
func getPendingProgress(jobID int64, imeta ImportMeta) float32 {
tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(PreImportTaskType))
func getPendingProgress(ctx context.Context, jobID int64, importMeta ImportMeta) float32 {
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(PreImportTaskType))
preImportingFiles := lo.SumBy(tasks, func(task ImportTask) int {
return len(task.GetFileStats())
})
totalFiles := len(imeta.GetJob(context.TODO(), jobID).GetFiles())
totalFiles := len(importMeta.GetJob(ctx, jobID).GetFiles())
if totalFiles == 0 {
return 1
}
return float32(preImportingFiles) / float32(totalFiles)
}
func getPreImportingProgress(jobID int64, imeta ImportMeta) float32 {
tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(PreImportTaskType))
func getPreImportingProgress(ctx context.Context, jobID int64, importMeta ImportMeta) float32 {
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(PreImportTaskType))
completedTasks := lo.Filter(tasks, func(task ImportTask, _ int) bool {
return task.GetState() == datapb.ImportTaskStateV2_Completed
})
@ -468,8 +468,8 @@ func getPreImportingProgress(jobID int64, imeta ImportMeta) float32 {
return float32(len(completedTasks)) / float32(len(tasks))
}
func getImportRowsInfo(jobID int64, imeta ImportMeta, meta *meta) (importedRows, totalRows int64) {
tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(ImportTaskType))
func getImportRowsInfo(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) (importedRows, totalRows int64) {
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType))
segmentIDs := make([]int64, 0)
for _, task := range tasks {
totalRows += lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 {
@ -481,19 +481,19 @@ func getImportRowsInfo(jobID int64, imeta ImportMeta, meta *meta) (importedRows,
return
}
func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, int64, int64) {
importedRows, totalRows := getImportRowsInfo(jobID, imeta, meta)
func getImportingProgress(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) (float32, int64, int64) {
importedRows, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta)
if totalRows == 0 {
return 1, importedRows, totalRows
}
return float32(importedRows) / float32(totalRows), importedRows, totalRows
}
func getStatsProgress(jobID int64, imeta ImportMeta, sjm StatsInspector) float32 {
func getStatsProgress(ctx context.Context, jobID int64, importMeta ImportMeta, sjm StatsInspector) float32 {
if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
return 1
}
tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(ImportTaskType))
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType))
originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
})
@ -510,12 +510,12 @@ func getStatsProgress(jobID int64, imeta ImportMeta, sjm StatsInspector) float32
return float32(doneCnt) / float32(len(originSegmentIDs))
}
func getIndexBuildingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 {
job := imeta.GetJob(context.TODO(), jobID)
func getIndexBuildingProgress(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) float32 {
job := importMeta.GetJob(ctx, jobID)
if !Params.DataCoordCfg.WaitForIndex.GetAsBool() {
return 1
}
tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(ImportTaskType))
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType))
originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
return t.(*importTask).GetSegmentIDs()
})
@ -542,36 +542,36 @@ func getIndexBuildingProgress(jobID int64, imeta ImportMeta, meta *meta) float32
// 10%: Completed
// TODO: Wrap a function to map status to user status.
// TODO: Save this progress to the job instead of recalculating.
func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta, sjm StatsInspector) (int64, internalpb.ImportJobState, int64, int64, string) {
job := imeta.GetJob(context.TODO(), jobID)
func GetJobProgress(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta, sjm StatsInspector) (int64, internalpb.ImportJobState, int64, int64, string) {
job := importMeta.GetJob(ctx, jobID)
if job == nil {
return 0, internalpb.ImportJobState_Failed, 0, 0, fmt.Sprintf("import job does not exist, jobID=%d", jobID)
}
switch job.GetState() {
case internalpb.ImportJobState_Pending:
progress := getPendingProgress(jobID, imeta)
progress := getPendingProgress(ctx, jobID, importMeta)
return int64(progress * 10), internalpb.ImportJobState_Pending, 0, 0, ""
case internalpb.ImportJobState_PreImporting:
progress := getPreImportingProgress(jobID, imeta)
progress := getPreImportingProgress(ctx, jobID, importMeta)
return 10 + int64(progress*30), internalpb.ImportJobState_Importing, 0, 0, ""
case internalpb.ImportJobState_Importing:
progress, importedRows, totalRows := getImportingProgress(jobID, imeta, meta)
progress, importedRows, totalRows := getImportingProgress(ctx, jobID, importMeta, meta)
return 10 + 30 + int64(progress*30), internalpb.ImportJobState_Importing, importedRows, totalRows, ""
case internalpb.ImportJobState_Stats:
progress := getStatsProgress(jobID, imeta, sjm)
_, totalRows := getImportRowsInfo(jobID, imeta, meta)
progress := getStatsProgress(ctx, jobID, importMeta, sjm)
_, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta)
return 10 + 30 + 30 + int64(progress*10), internalpb.ImportJobState_Importing, totalRows, totalRows, ""
case internalpb.ImportJobState_IndexBuilding:
progress := getIndexBuildingProgress(jobID, imeta, meta)
_, totalRows := getImportRowsInfo(jobID, imeta, meta)
progress := getIndexBuildingProgress(ctx, jobID, importMeta, meta)
_, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta)
return 10 + 30 + 30 + 10 + int64(progress*10), internalpb.ImportJobState_Importing, totalRows, totalRows, ""
case internalpb.ImportJobState_Completed:
_, totalRows := getImportRowsInfo(jobID, imeta, meta)
_, totalRows := getImportRowsInfo(ctx, jobID, importMeta, meta)
return 100, internalpb.ImportJobState_Completed, totalRows, totalRows, ""
case internalpb.ImportJobState_Failed:
@ -580,9 +580,9 @@ func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta, sjm StatsInspecto
return 0, internalpb.ImportJobState_None, 0, 0, "unknown import job state"
}
func GetTaskProgresses(jobID int64, imeta ImportMeta, meta *meta) []*internalpb.ImportTaskProgress {
func GetTaskProgresses(ctx context.Context, jobID int64, importMeta ImportMeta, meta *meta) []*internalpb.ImportTaskProgress {
progresses := make([]*internalpb.ImportTaskProgress, 0)
tasks := imeta.GetTaskBy(context.TODO(), WithJob(jobID), WithType(ImportTaskType))
tasks := importMeta.GetTaskBy(ctx, WithJob(jobID), WithType(ImportTaskType))
for _, task := range tasks {
totalRows := lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 {
return file.GetTotalRows()
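
To make the weighting in GetJobProgress concrete: the job states partition the 0-100 progress range as Pending 0-10, PreImporting 10-40, Importing 40-70, Stats 70-80, IndexBuilding 80-90, with Completed pinned at 100. A stand-alone sketch of that arithmetic, not the production code:

package main

import "fmt"

// progressOf mirrors the arithmetic in GetJobProgress: each state contributes
// the full weight of every earlier phase plus a fraction of its own weight.
func progressOf(state string, frac float32) int64 {
	switch state {
	case "Pending":
		return int64(frac * 10)
	case "PreImporting":
		return 10 + int64(frac*30)
	case "Importing":
		return 10 + 30 + int64(frac*30)
	case "Stats":
		return 10 + 30 + 30 + int64(frac*10)
	case "IndexBuilding":
		return 10 + 30 + 30 + 10 + int64(frac*10)
	case "Completed":
		return 100
	}
	return 0
}

func main() {
	// Halfway through Importing: 10 + 30 + 15 = 55, matching the
	// int64(10+30+30*0.5) assertion exercised in the tests that follow.
	fmt.Println(progressOf("Importing", 0.5)) // 55
}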

View File

@ -412,7 +412,7 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) {
catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil)
imeta, err := NewImportMeta(context.TODO(), catalog, nil, nil)
importMeta, err := NewImportMeta(context.TODO(), catalog, nil, nil)
assert.NoError(t, err)
broker := broker.NewMockBroker(t)
@ -426,7 +426,7 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) {
CollectionID: 100,
},
}
err = imeta.AddJob(context.TODO(), job)
err = importMeta.AddJob(context.TODO(), job)
assert.NoError(t, err)
preImportTaskProto := &datapb.PreImportTask{
@ -439,12 +439,12 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) {
}
pit := &preImportTask{}
pit.task.Store(preImportTaskProto)
err = imeta.AddTask(context.TODO(), pit)
err = importMeta.AddTask(context.TODO(), pit)
assert.NoError(t, err)
Params.Save(Params.QuotaConfig.DiskProtectionEnabled.Key, "false")
defer Params.Reset(Params.QuotaConfig.DiskProtectionEnabled.Key)
_, err = CheckDiskQuota(job, meta, imeta)
_, err = CheckDiskQuota(context.TODO(), job, meta, importMeta)
assert.NoError(t, err)
segment := &SegmentInfo{
@ -459,7 +459,7 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) {
{Key: importutilv2.BackupFlag, Value: "true"},
{Key: importutilv2.SkipDQC, Value: "true"},
}
_, err = CheckDiskQuota(job, meta, imeta)
_, err = CheckDiskQuota(context.TODO(), job, meta, importMeta)
assert.NoError(t, err)
job.Options = nil
@ -467,17 +467,17 @@ func TestImportUtil_CheckDiskQuota(t *testing.T) {
Params.Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, "10000")
defer Params.Reset(Params.QuotaConfig.DiskQuota.Key)
defer Params.Reset(Params.QuotaConfig.DiskQuotaPerCollection.Key)
requestSize, err := CheckDiskQuota(job, meta, imeta)
requestSize, err := CheckDiskQuota(context.TODO(), job, meta, importMeta)
assert.NoError(t, err)
assert.Equal(t, int64(3000*1024*1024), requestSize)
Params.Save(Params.QuotaConfig.DiskQuota.Key, "5000")
_, err = CheckDiskQuota(job, meta, imeta)
_, err = CheckDiskQuota(context.TODO(), job, meta, importMeta)
assert.True(t, errors.Is(err, merr.ErrServiceQuotaExceeded))
Params.Save(Params.QuotaConfig.DiskQuota.Key, "10000")
Params.Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, "5000")
_, err = CheckDiskQuota(job, meta, imeta)
_, err = CheckDiskQuota(context.TODO(), job, meta, importMeta)
assert.True(t, errors.Is(err, merr.ErrServiceQuotaExceeded))
}
@ -491,7 +491,7 @@ func TestImportUtil_DropImportTask(t *testing.T) {
catalog.EXPECT().ListImportTasks(mock.Anything).Return(nil, nil)
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(nil)
imeta, err := NewImportMeta(context.TODO(), catalog, nil, nil)
importMeta, err := NewImportMeta(context.TODO(), catalog, nil, nil)
assert.NoError(t, err)
taskProto := &datapb.ImportTaskV2{
@ -500,10 +500,10 @@ func TestImportUtil_DropImportTask(t *testing.T) {
}
task := &importTask{}
task.task.Store(taskProto)
err = imeta.AddTask(context.TODO(), task)
err = importMeta.AddTask(context.TODO(), task)
assert.NoError(t, err)
err = DropImportTask(task, cluster, imeta)
err = DropImportTask(task, cluster, importMeta)
assert.NoError(t, err)
}
@ -599,7 +599,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil)
catalog.EXPECT().ListStatsTasks(mock.Anything).Return(nil, nil)
imeta, err := NewImportMeta(context.TODO(), catalog, nil, nil)
importMeta, err := NewImportMeta(context.TODO(), catalog, nil, nil)
assert.NoError(t, err)
broker := broker.NewMockBroker(t)
@ -625,7 +625,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
Files: []*internalpb.ImportFile{file1, file2, file3},
},
}
err = imeta.AddJob(context.TODO(), job)
err = importMeta.AddJob(context.TODO(), job)
assert.NoError(t, err)
preImportTaskProto := &datapb.PreImportTask{
@ -645,7 +645,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
pit1 := &preImportTask{}
pit1.task.Store(preImportTaskProto)
err = imeta.AddTask(context.TODO(), pit1)
err = importMeta.AddTask(context.TODO(), pit1)
assert.NoError(t, err)
preImportTaskProto2 := &datapb.PreImportTask{
@ -660,7 +660,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
}
pit2 := &preImportTask{}
pit2.task.Store(preImportTaskProto2)
err = imeta.AddTask(context.TODO(), pit2)
err = importMeta.AddTask(context.TODO(), pit2)
assert.NoError(t, err)
taskProto1 := &datapb.ImportTaskV2{
@ -681,7 +681,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
}
it1 := &importTask{}
it1.task.Store(taskProto1)
err = imeta.AddTask(context.TODO(), it1)
err = importMeta.AddTask(context.TODO(), it1)
assert.NoError(t, err)
err = meta.AddSegment(ctx, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: 10, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50},
@ -710,7 +710,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
}
it2 := &importTask{}
it2.task.Store(taskProto2)
err = imeta.AddTask(context.TODO(), it2)
err = importMeta.AddTask(context.TODO(), it2)
assert.NoError(t, err)
err = meta.AddSegment(ctx, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: 20, IsImporting: true, State: commonpb.SegmentState_Flushed, NumOfRows: 50},
@ -726,40 +726,40 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
assert.NoError(t, err)
// failed state
err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(mockErr))
err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(mockErr))
assert.NoError(t, err)
progress, state, _, _, reason := GetJobProgress(job.GetJobID(), imeta, meta, nil)
progress, state, _, _, reason := GetJobProgress(ctx, job.GetJobID(), importMeta, meta, nil)
assert.Equal(t, int64(0), progress)
assert.Equal(t, internalpb.ImportJobState_Failed, state)
assert.Equal(t, mockErr, reason)
// job does not exist
progress, state, _, _, reason = GetJobProgress(-1, imeta, meta, nil)
progress, state, _, _, reason = GetJobProgress(ctx, -1, importMeta, meta, nil)
assert.Equal(t, int64(0), progress)
assert.Equal(t, internalpb.ImportJobState_Failed, state)
assert.NotEqual(t, "", reason)
// pending state
err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending))
err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending))
assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil)
progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, nil)
assert.Equal(t, int64(10), progress)
assert.Equal(t, internalpb.ImportJobState_Pending, state)
assert.Equal(t, "", reason)
// preImporting state
err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil)
progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, nil)
assert.Equal(t, int64(10+30), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)
// importing state, segmentImportedRows/totalRows = 0.5
err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing))
err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing))
assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil)
progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, nil)
assert.Equal(t, int64(10+30+30*0.5), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)
@ -777,13 +777,13 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
assert.NoError(t, err)
err = meta.UpdateSegmentsInfo(context.TODO(), UpdateImportedRows(22, 100))
assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil)
progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, nil)
assert.Equal(t, int64(float32(10+30+30)), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)
// stats state, len(statsSegmentIDs) / (len(originalSegmentIDs) = 0.5
err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats))
err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats))
assert.NoError(t, err)
sjm := NewMockStatsJobManager(t)
sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).RunAndReturn(func(segmentID int64, _ indexpb.StatsSubJob) *indexpb.StatsTask {
@ -796,7 +796,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
State: indexpb.JobState_JobStateInProgress,
}
})
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm)
progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, sjm)
assert.Equal(t, int64(10+30+30+10*0.5), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)
@ -806,15 +806,15 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
sjm.EXPECT().GetStatsTask(mock.Anything, mock.Anything).Return(&indexpb.StatsTask{
State: indexpb.JobState_JobStateFinished,
})
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm)
progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, sjm)
assert.Equal(t, int64(10+30+30+10), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)
// completed state
err = imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed))
err = importMeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed))
assert.NoError(t, err)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm)
progress, state, _, _, reason = GetJobProgress(ctx, job.GetJobID(), importMeta, meta, sjm)
assert.Equal(t, int64(100), progress)
assert.Equal(t, internalpb.ImportJobState_Completed, state)
assert.Equal(t, "", reason)

View File

@ -319,9 +319,9 @@ func (s *Server) initDataCoord() error {
s.initGarbageCollection(storageCli)
s.importInspector = NewImportInspector(s.meta, s.importMeta, s.globalScheduler)
s.importInspector = NewImportInspector(s.ctx, s.meta, s.importMeta, s.globalScheduler)
s.importChecker = NewImportChecker(s.meta, s.broker, s.allocator, s.importMeta, s.statsInspector, s.compactionTriggerManager)
s.importChecker = NewImportChecker(s.ctx, s.meta, s.broker, s.allocator, s.importMeta, s.statsInspector, s.compactionTriggerManager)
s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager)

View File

@ -1908,7 +1908,7 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d", jobID)))
return resp, nil
}
progress, state, importedRows, totalRows, reason := GetJobProgress(jobID, s.importMeta, s.meta, s.statsInspector)
progress, state, importedRows, totalRows, reason := GetJobProgress(ctx, jobID, s.importMeta, s.meta, s.statsInspector)
resp.State = state
resp.Reason = reason
resp.Progress = progress
@ -1917,7 +1917,7 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
resp.CompleteTime = job.GetCompleteTime()
resp.ImportedRows = importedRows
resp.TotalRows = totalRows
resp.TaskProgresses = GetTaskProgresses(jobID, s.importMeta, s.meta)
resp.TaskProgresses = GetTaskProgresses(ctx, jobID, s.importMeta, s.meta)
log.Info("GetImportProgress done", zap.String("jobState", job.GetState().String()), zap.Any("resp", resp))
return resp, nil
}
@ -1945,7 +1945,7 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq
}
for _, job := range jobs {
progress, state, _, _, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta, s.statsInspector)
progress, state, _, _, reason := GetJobProgress(ctx, job.GetJobID(), s.importMeta, s.meta, s.statsInspector)
resp.JobIDs = append(resp.JobIDs, fmt.Sprintf("%d", job.GetJobID()))
resp.States = append(resp.States, state)
resp.Reasons = append(resp.Reasons, reason)