diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index c8e9d23cdf..c7cd7cfb34 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -640,6 +640,7 @@ dataNode:
     maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
     maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
     readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
+    maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
   compaction:
     levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
     levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go
index 9504b50ec9..8fec368cd7 100644
--- a/internal/datacoord/import_checker.go
+++ b/internal/datacoord/import_checker.go
@@ -196,10 +196,15 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
 		}
 		log.Info("add new preimport task", WrapTaskLog(t)...)
 	}
+
 	err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
 	if err != nil {
 		log.Warn("failed to update job state to PreImporting", zap.Error(err))
+		return
 	}
+	pendingDuration := job.GetTR().RecordSpan()
+	metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
+	log.Info("import job start to execute", zap.Duration("jobTimeCost/pending", pendingDuration))
 }
 
 func (c *importChecker) checkPreImportingJob(job ImportJob) {
@@ -234,10 +239,15 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
 		}
 		log.Info("add new import task", WrapTaskLog(t)...)
 	}
+
 	err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize))
 	if err != nil {
 		log.Warn("failed to update job state to Importing", zap.Error(err))
+		return
 	}
+	preImportDuration := job.GetTR().RecordSpan()
+	metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preImportDuration.Milliseconds()))
+	log.Info("import job preimport done", zap.Duration("jobTimeCost/preimport", preImportDuration))
 }
 
 func (c *importChecker) checkImportingJob(job ImportJob) {
@@ -253,7 +263,9 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
 		log.Warn("failed to update job state to IndexBuilding", zap.Error(err))
 		return
 	}
-	log.Info("update import job state to IndexBuilding")
+	importDuration := job.GetTR().RecordSpan()
+	metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds()))
+	log.Info("import job import done", zap.Duration("jobTimeCost/import", importDuration))
 }
 
 func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
@@ -269,6 +281,10 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
 		return
 	}
 
+	buildIndexDuration := job.GetTR().RecordSpan()
+	metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageBuildIndex).Observe(float64(buildIndexDuration.Milliseconds()))
+	log.Info("import job build index done", zap.Duration("jobTimeCost/buildIndex", buildIndexDuration))
+
 	// Here, all segment indexes have been successfully built, try unset isImporting flag for all segments.
 	isImportingSegments := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool {
 		segment := c.meta.GetSegment(segmentID)
@@ -306,7 +322,9 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
 		log.Warn("failed to update job state to Completed", zap.Error(err))
 		return
 	}
-	log.Info("import job completed")
+	totalDuration := job.GetTR().ElapseSpan()
+	metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds()))
+	log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration))
 }
 
 func (c *importChecker) tryFailingTasks(job ImportJob) {
diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go
index e04da6ee2e..8197dd23ff 100644
--- a/internal/datacoord/import_checker_test.go
+++ b/internal/datacoord/import_checker_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/milvus-io/milvus/internal/metastore/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 )
@@ -95,6 +96,7 @@ func (s *ImportCheckerSuite) SetupTest() {
 				},
 			},
 		},
+		tr: timerecord.NewTimeRecorder("import job"),
 	}
 
 	catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil)
@@ -114,6 +116,7 @@ func (s *ImportCheckerSuite) TestLogStats() {
 			TaskID: 1,
 			State:  datapb.ImportTaskStateV2_Failed,
 		},
+		tr: timerecord.NewTimeRecorder("preimport task"),
 	}
 	err := s.imeta.AddTask(pit1)
 	s.NoError(err)
@@ -125,6 +128,7 @@ func (s *ImportCheckerSuite) TestLogStats() {
 			SegmentIDs: []int64{10, 11, 12},
 			State:      datapb.ImportTaskStateV2_Pending,
 		},
+		tr: timerecord.NewTimeRecorder("import task"),
 	}
 	err = s.imeta.AddTask(it1)
 	s.NoError(err)
@@ -286,6 +290,7 @@ func (s *ImportCheckerSuite) TestCheckTimeout() {
 			TaskID: 1,
 			State:  datapb.ImportTaskStateV2_InProgress,
 		},
+		tr: timerecord.NewTimeRecorder("preimport task"),
 	}
 	err := s.imeta.AddTask(task)
 	s.NoError(err)
@@ -307,6 +312,7 @@ func (s *ImportCheckerSuite) TestCheckFailure() {
 			State:      datapb.ImportTaskStateV2_Pending,
 			SegmentIDs: []int64{2},
 		},
+		tr: timerecord.NewTimeRecorder("import task"),
 	}
 	err := s.imeta.AddTask(it)
 	s.NoError(err)
@@ -336,6 +342,7 @@ func (s *ImportCheckerSuite) TestCheckGC() {
 			State:      datapb.ImportTaskStateV2_Failed,
 			SegmentIDs: []int64{2},
 		},
+		tr: timerecord.NewTimeRecorder("import task"),
 	}
 	err := s.imeta.AddTask(task)
 	s.NoError(err)
@@ -405,6 +412,7 @@ func (s *ImportCheckerSuite) TestCheckCollection() {
 			TaskID: 1,
 			State:  datapb.ImportTaskStateV2_Pending,
 		},
+		tr: timerecord.NewTimeRecorder("preimport task"),
 	}
 	err := s.imeta.AddTask(task)
 	s.NoError(err)
diff --git a/internal/datacoord/import_job.go b/internal/datacoord/import_job.go
index 4404810573..39645ec515 100644
--- a/internal/datacoord/import_job.go
+++ b/internal/datacoord/import_job.go
@@ -27,6 +27,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 )
@@ -91,15 +92,23 @@ type ImportJob interface {
 	GetCompleteTime() string
 	GetFiles() []*internalpb.ImportFile
 	GetOptions() []*commonpb.KeyValuePair
+	GetTR() *timerecord.TimeRecorder
 	Clone() ImportJob
 }
 
 type importJob struct {
 	*datapb.ImportJob
+
+	tr *timerecord.TimeRecorder
+}
+
+func (j *importJob) GetTR() *timerecord.TimeRecorder {
+	return j.tr
 }
 
 func (j *importJob) Clone() ImportJob {
 	return &importJob{
 		ImportJob: proto.Clone(j.ImportJob).(*datapb.ImportJob),
+		tr:        j.tr,
 	}
 }
diff --git a/internal/datacoord/import_meta.go b/internal/datacoord/import_meta.go
index debf5509e8..0a9b272dc9 100644
--- a/internal/datacoord/import_meta.go
+++ b/internal/datacoord/import_meta.go
@@ -19,6 +19,7 @@ package datacoord
 import (
 	"github.com/milvus-io/milvus/internal/metastore"
 	"github.com/milvus-io/milvus/pkg/util/lock"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 )
 
 type ImportMeta interface {
@@ -61,11 +62,13 @@ func NewImportMeta(catalog metastore.DataCoordCatalog) (ImportMeta, error) {
 	for _, task := range restoredPreImportTasks {
 		tasks[task.GetTaskID()] = &preImportTask{
 			PreImportTask: task,
+			tr:            timerecord.NewTimeRecorder("preimport task"),
 		}
 	}
 	for _, task := range restoredImportTasks {
 		tasks[task.GetTaskID()] = &importTask{
 			ImportTaskV2: task,
+			tr:           timerecord.NewTimeRecorder("import task"),
 		}
 	}
 
@@ -73,6 +76,7 @@ func NewImportMeta(catalog metastore.DataCoordCatalog) (ImportMeta, error) {
 	for _, job := range restoredJobs {
 		jobs[job.GetJobID()] = &importJob{
 			ImportJob: job,
+			tr:        timerecord.NewTimeRecorder("import job"),
 		}
 	}
diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go
index 453c4bd761..c5ab7ba215 100644
--- a/internal/datacoord/import_scheduler.go
+++ b/internal/datacoord/import_scheduler.go
@@ -93,23 +93,6 @@ func (s *importScheduler) Close() {
 }
 
 func (s *importScheduler) process() {
-	getNodeID := func(nodeSlots map[int64]int64) int64 {
-		var (
-			nodeID   int64 = NullNodeID
-			maxSlots int64 = -1
-		)
-		for id, slots := range nodeSlots {
-			if slots > 0 && slots > maxSlots {
-				nodeID = id
-				maxSlots = slots
-			}
-		}
-		if nodeID != NullNodeID {
-			nodeSlots[nodeID]--
-		}
-		return nodeID
-	}
-
 	jobs := s.imeta.GetJobBy()
 	sort.Slice(jobs, func(i, j int) bool {
 		return jobs[i].GetJobID() < jobs[j].GetJobID()
@@ -120,7 +103,7 @@ func (s *importScheduler) process() {
 		for _, task := range tasks {
 			switch task.GetState() {
 			case datapb.ImportTaskStateV2_Pending:
-				nodeID := getNodeID(nodeSlots)
+				nodeID := s.getNodeID(task, nodeSlots)
 				switch task.GetType() {
 				case PreImportTaskType:
 					s.processPendingPreImport(task, nodeID)
@@ -169,6 +152,25 @@ func (s *importScheduler) peekSlots() map[int64]int64 {
 	return nodeSlots
 }
 
+func (s *importScheduler) getNodeID(task ImportTask, nodeSlots map[int64]int64) int64 {
+	var (
+		nodeID   int64 = NullNodeID
+		maxSlots int64 = -1
+	)
+	require := task.GetSlots()
+	for id, slots := range nodeSlots {
+		// find the most idle datanode
+		if slots > 0 && slots >= require && slots > maxSlots {
+			nodeID = id
+			maxSlots = slots
+		}
+	}
+	if nodeID != NullNodeID {
+		nodeSlots[nodeID] -= require
+	}
+	return nodeID
+}
+
 func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64) {
 	if nodeID == NullNodeID {
 		return
@@ -188,7 +190,9 @@ func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64)
 		log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
 		return
 	}
-	log.Info("process pending preimport task done", WrapTaskLog(task)...)
+	pendingDuration := task.GetTR().RecordSpan()
+	metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
+	log.Info("preimport task start to execute", WrapTaskLog(task, zap.Int64("scheduledNodeID", nodeID), zap.Duration("taskTimeCost/pending", pendingDuration))...)
 }
 
 func (s *importScheduler) processPendingImport(task ImportTask, nodeID int64) {
@@ -214,7 +218,9 @@ func (s *importScheduler) processPendingImport(task ImportTask, nodeID int64) {
 		log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
 		return
 	}
-	log.Info("processing pending import task done", WrapTaskLog(task)...)
+	pendingDuration := task.GetTR().RecordSpan()
+	metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
+	log.Info("import task start to execute", WrapTaskLog(task, zap.Int64("scheduledNodeID", nodeID), zap.Duration("taskTimeCost/pending", pendingDuration))...)
 }
 
 func (s *importScheduler) processInProgressPreImport(task ImportTask) {
@@ -251,6 +257,11 @@ func (s *importScheduler) processInProgressPreImport(task ImportTask) {
 	}
 	log.Info("query preimport", WrapTaskLog(task, zap.String("state", resp.GetState().String()),
 		zap.Any("fileStats", resp.GetFileStats()))...)
+	if resp.GetState() == datapb.ImportTaskStateV2_Completed {
+		preimportDuration := task.GetTR().RecordSpan()
+		metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preimportDuration.Milliseconds()))
+		log.Info("preimport done", WrapTaskLog(task, zap.Duration("taskTimeCost/preimport", preimportDuration))...)
+	}
 }
 
 func (s *importScheduler) processInProgressImport(task ImportTask) {
@@ -328,6 +339,9 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
 			log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
 			return
 		}
+		importDuration := task.GetTR().RecordSpan()
+		metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds()))
+		log.Info("import done", WrapTaskLog(task, zap.Duration("taskTimeCost/import", importDuration))...)
 	}
 	log.Info("query import", WrapTaskLog(task, zap.String("state", resp.GetState().String()), zap.String("reason", resp.GetReason()))...)
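The hunks above hoist the old inline getNodeID closure into a capacity-aware method: instead of handing every pending task to the node with the most free slots and decrementing by one, it skips nodes whose free slots cannot fit the task's requirement and reserves GetSlots() slots on the chosen node. A minimal, self-contained sketch of the same policy; pickNode and nullNodeID are illustrative names, not part of the Milvus codebase:

package main

import "fmt"

const nullNodeID int64 = -1

// pickNode mirrors importScheduler.getNodeID: choose the most idle node
// that still has at least `require` free slots, then reserve them.
func pickNode(nodeSlots map[int64]int64, require int64) int64 {
	nodeID, maxSlots := nullNodeID, int64(-1)
	for id, slots := range nodeSlots {
		if slots > 0 && slots >= require && slots > maxSlots {
			nodeID, maxSlots = id, slots
		}
	}
	if nodeID != nullNodeID {
		nodeSlots[nodeID] -= require
	}
	return nodeID
}

func main() {
	slots := map[int64]int64{1: 4, 2: 10}
	fmt.Println(pickNode(slots, 8)) // 2: only node 2 has >= 8 free slots
	fmt.Println(pickNode(slots, 8)) // -1: node 1 has 4 left, node 2 has 2
}

A consequence of the `slots >= require` check is that a large task may now wait even while some capacity is free, rather than oversubscribing a node.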
diff --git a/internal/datacoord/import_scheduler_test.go b/internal/datacoord/import_scheduler_test.go
index a8f51d28ae..79eeb2c107 100644
--- a/internal/datacoord/import_scheduler_test.go
+++ b/internal/datacoord/import_scheduler_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/metastore/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 )
 
 type ImportSchedulerSuite struct {
@@ -85,6 +86,7 @@ func (s *ImportSchedulerSuite) TestProcessPreImport() {
 			CollectionID: s.collectionID,
 			State:        datapb.ImportTaskStateV2_Pending,
 		},
+		tr: timerecord.NewTimeRecorder("preimport task"),
 	}
 	err := s.imeta.AddTask(task)
 	s.NoError(err)
@@ -95,6 +97,7 @@ func (s *ImportSchedulerSuite) TestProcessPreImport() {
 			TimeoutTs: math.MaxUint64,
 			Schema:    &schemapb.CollectionSchema{},
 		},
+		tr: timerecord.NewTimeRecorder("import job"),
 	}
 	err = s.imeta.AddJob(job)
 	s.NoError(err)
@@ -156,6 +159,7 @@ func (s *ImportSchedulerSuite) TestProcessImport() {
 				},
 			},
 		},
+		tr: timerecord.NewTimeRecorder("import task"),
 	}
 	err := s.imeta.AddTask(task)
 	s.NoError(err)
@@ -168,6 +172,7 @@ func (s *ImportSchedulerSuite) TestProcessImport() {
 			Schema:    &schemapb.CollectionSchema{},
 			TimeoutTs: math.MaxUint64,
 		},
+		tr: timerecord.NewTimeRecorder("import job"),
 	}
 	err = s.imeta.AddJob(job)
 	s.NoError(err)
@@ -222,6 +227,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
 			SegmentIDs: []int64{2, 3},
 			State:      datapb.ImportTaskStateV2_Failed,
 		},
+		tr: timerecord.NewTimeRecorder("import task"),
 	}
 	err := s.imeta.AddTask(task)
 	s.NoError(err)
@@ -234,6 +240,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
 			Schema:    &schemapb.CollectionSchema{},
 			TimeoutTs: math.MaxUint64,
 		},
+		tr: timerecord.NewTimeRecorder("import job"),
 	}
 	err = s.imeta.AddJob(job)
 	s.NoError(err)
diff --git a/internal/datacoord/import_task.go b/internal/datacoord/import_task.go
index 1261cd5854..b790c11a5a 100644
--- a/internal/datacoord/import_task.go
+++ b/internal/datacoord/import_task.go
@@ -17,9 +17,12 @@ package datacoord
 
 import (
+	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 )
 
 type TaskType int
@@ -131,33 +134,62 @@ type ImportTask interface {
 	GetState() datapb.ImportTaskStateV2
 	GetReason() string
 	GetFileStats() []*datapb.ImportFileStats
+	GetTR() *timerecord.TimeRecorder
+	GetSlots() int64
 	Clone() ImportTask
 }
 
 type preImportTask struct {
 	*datapb.PreImportTask
+	tr *timerecord.TimeRecorder
 }
 
 func (p *preImportTask) GetType() TaskType {
 	return PreImportTaskType
 }
 
+func (p *preImportTask) GetTR() *timerecord.TimeRecorder {
+	return p.tr
+}
+
+func (p *preImportTask) GetSlots() int64 {
+	return int64(funcutil.Min(len(p.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+}
+
 func (p *preImportTask) Clone() ImportTask {
 	return &preImportTask{
 		PreImportTask: proto.Clone(p.PreImportTask).(*datapb.PreImportTask),
+		tr:            p.tr,
 	}
 }
 
 type importTask struct {
 	*datapb.ImportTaskV2
+	tr *timerecord.TimeRecorder
 }
 
 func (t *importTask) GetType() TaskType {
 	return ImportTaskType
}
 
+func (t *importTask) GetTR() *timerecord.TimeRecorder {
+	return t.tr
+}
+
+func (t *importTask) GetSlots() int64 {
+	// Consider the following two scenarios:
+	// 1. Importing a large number of small files results in
+	//    a small total data size, making file count unsuitable as a slot number.
+	// 2. Importing a file with a large shard number results in many segments despite
+	//    a small total data size, making segment count unsuitable as a slot number.
+	// Taking these factors into account, we've decided to use the minimum
+	// value among file count, segment count, and maxTaskSlotNum as the slot number.
+	return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+}
+
 func (t *importTask) Clone() ImportTask {
 	return &importTask{
 		ImportTaskV2: proto.Clone(t.ImportTaskV2).(*datapb.ImportTaskV2),
+		tr:           t.tr,
 	}
 }
diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go
index 62e76c8ee9..06be742048 100644
--- a/internal/datacoord/import_util.go
+++ b/internal/datacoord/import_util.go
@@ -37,6 +37,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 )
 
 func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field {
@@ -45,6 +46,7 @@ func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field {
 		zap.Int64("jobID", task.GetJobID()),
 		zap.Int64("collectionID", task.GetCollectionID()),
 		zap.String("type", task.GetType().String()),
+		zap.Int64("nodeID", task.GetNodeID()),
 	}
 	res = append(res, fields...)
 	return res
@@ -73,6 +75,7 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile,
 			State:        datapb.ImportTaskStateV2_Pending,
 			FileStats:    fileStats,
 		},
+		tr: timerecord.NewTimeRecorder("preimport task"),
 	}
 	tasks = append(tasks, task)
 }
@@ -97,6 +100,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
 			State:        datapb.ImportTaskStateV2_Pending,
 			FileStats:    group,
 		},
+		tr: timerecord.NewTimeRecorder("import task"),
 	}
 	segments, err := AssignSegments(job, task, alloc, meta)
 	if err != nil {
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index b38d81d2de..a4b23d7e35 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -46,6 +46,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/retry"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -1680,6 +1681,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
 		return importFile
 	})
 
+	startTime := time.Now()
 	job := &importJob{
 		ImportJob: &datapb.ImportJob{
 			JobID:        idStart,
@@ -1693,8 +1695,9 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
 			State:        internalpb.ImportJobState_Pending,
 			Files:        files,
 			Options:      in.GetOptions(),
-			StartTime:    time.Now().Format("2006-01-02T15:04:05Z07:00"),
+			StartTime:    startTime.Format("2006-01-02T15:04:05Z07:00"),
 		},
+		tr: timerecord.NewTimeRecorder("import job"),
 	}
 	err = s.importMeta.AddJob(job)
 	if err != nil {
diff --git a/internal/datanode/importv2/scheduler.go b/internal/datanode/importv2/scheduler.go
index d1d58e8df0..e94b2671c4 100644
--- a/internal/datanode/importv2/scheduler.go
+++ b/internal/datanode/importv2/scheduler.go
@@ -17,6 +17,7 @@
 package importv2
 
 import (
+	"sort"
 	"sync"
 	"time"
 
@@ -64,6 +65,9 @@ func (s *scheduler) Start() {
 				return
 			case <-exeTicker.C:
 				tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
+				sort.Slice(tasks, func(i, j int) bool {
+					return tasks[i].GetTaskID() < tasks[j].GetTaskID()
+				})
 				futures := make(map[int64][]*conc.Future[any])
 				for _, task := range tasks {
 					fs := task.Execute()
@@ -86,7 +90,15 @@ func (s *scheduler) Start() {
 
 func (s *scheduler) Slots() int64 {
 	tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress))
-	return paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64() - int64(len(tasks))
+	used := lo.SumBy(tasks, func(t Task) int64 {
+		return t.GetSlots()
+	})
+	total := paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64()
+	free := total - used
+	if free >= 0 {
+		return free
+	}
+	return 0
 }
 
 func (s *scheduler) Close() {
diff --git a/internal/datanode/importv2/task.go b/internal/datanode/importv2/task.go
index 023c23c007..0d7c46e6cc 100644
--- a/internal/datanode/importv2/task.go
+++ b/internal/datanode/importv2/task.go
@@ -161,6 +161,7 @@ type Task interface {
 	GetState() datapb.ImportTaskStateV2
 	GetReason() string
 	GetSchema() *schemapb.CollectionSchema
+	GetSlots() int64
 	Cancel()
 	Clone() Task
 }
diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go
index 92f9ec39cb..fc57f4a2ce 100644
--- a/internal/datanode/importv2/task_import.go
+++ b/internal/datanode/importv2/task_import.go
@@ -34,6 +34,7 @@ import (
 	"github.com/milvus-io/milvus/internal/util/importutilv2"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/conc"
+	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -97,6 +98,17 @@ func (t *ImportTask) GetSchema() *schemapb.CollectionSchema {
 	return t.req.GetSchema()
 }
 
+func (t *ImportTask) GetSlots() int64 {
+	// Consider the following two scenarios:
+	// 1. Importing a large number of small files results in
+	//    a small total data size, making file count unsuitable as a slot number.
+	// 2. Importing a file with a large shard number results in many segments despite
+	//    a small total data size, making segment count unsuitable as a slot number.
+	// Taking these factors into account, we've decided to use the minimum
+	// value among file count, segment count, and maxTaskSlotNum as the slot number.
+	return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+}
+
 func (t *ImportTask) Cancel() {
 	t.cancel()
 }
diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go
index 65c0c8effc..7c718d7c61 100644
--- a/internal/datanode/importv2/task_l0_import.go
+++ b/internal/datanode/importv2/task_l0_import.go
@@ -93,6 +93,10 @@ func (t *L0ImportTask) GetSchema() *schemapb.CollectionSchema {
 	return t.req.GetSchema()
 }
 
+func (t *L0ImportTask) GetSlots() int64 {
+	return 1
+}
+
 func (t *L0ImportTask) Cancel() {
 	t.cancel()
 }
diff --git a/internal/datanode/importv2/task_l0_preimport.go b/internal/datanode/importv2/task_l0_preimport.go
index 65851da007..93777c8682 100644
--- a/internal/datanode/importv2/task_l0_preimport.go
+++ b/internal/datanode/importv2/task_l0_preimport.go
@@ -94,6 +94,10 @@ func (t *L0PreImportTask) GetSchema() *schemapb.CollectionSchema {
 	return t.schema
 }
 
+func (t *L0PreImportTask) GetSlots() int64 {
+	return 1
+}
+
 func (t *L0PreImportTask) Cancel() {
 	t.cancel()
 }
diff --git a/internal/datanode/importv2/task_preimport.go b/internal/datanode/importv2/task_preimport.go
index be5b03afb6..c507f69ff0 100644
--- a/internal/datanode/importv2/task_preimport.go
+++ b/internal/datanode/importv2/task_preimport.go
@@ -34,6 +34,7 @@ import (
 	"github.com/milvus-io/milvus/internal/util/importutilv2"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/conc"
+	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -101,6 +102,10 @@ func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema {
 	return t.schema
 }
 
+func (t *PreImportTask) GetSlots() int64 {
+	return int64(funcutil.Min(len(t.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+}
+
 func (t *PreImportTask) Cancel() {
 	t.cancel()
 }
diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go
index 7976cea697..30b78dcad3 100644
--- a/pkg/metrics/datacoord_metrics.go
+++ b/pkg/metrics/datacoord_metrics.go
@@ -212,6 +212,28 @@
 			stageLabelName,
 		})
 
+	ImportJobLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.DataCoordRole,
+			Name:      "import_job_latency",
+			Help:      "latency of import job",
+			Buckets:   longTaskBuckets,
+		}, []string{
+			importStageLabelName,
+		})
+
+	ImportTaskLatency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.DataCoordRole,
+			Name:      "import_task_latency",
+			Help:      "latency of import task",
+			Buckets:   longTaskBuckets,
+		}, []string{
+			importStageLabelName,
+		})
+
 	FlushedSegmentFileNum = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: milvusNamespace,
@@ -342,6 +364,8 @@ func RegisterDataCoord(registry *prometheus.Registry) {
 	registry.MustRegister(DataCoordCompactedSegmentSize)
 	registry.MustRegister(DataCoordCompactionTaskNum)
 	registry.MustRegister(DataCoordCompactionLatency)
+	registry.MustRegister(ImportJobLatency)
+	registry.MustRegister(ImportTaskLatency)
 	registry.MustRegister(DataCoordSizeStoredL0Segment)
 	registry.MustRegister(DataCoordL0DeleteEntriesNum)
 	registry.MustRegister(FlushedSegmentFileNum)
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index a5046a79d3..3e85cbdc2c 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -76,6 +76,11 @@ const (
 	Executing = "executing"
 	Done      = "done"
 
+	ImportStagePending    = "pending"
+	ImportStagePreImport  = "preimport"
+	ImportStageImport     = "import"
+	ImportStageBuildIndex = "build_index"
+
 	compactionTypeLabelName = "compaction_type"
 	isVectorFieldLabelName  = "is_vector_field"
 	segmentPruneLabelName   = "segment_prune_label"
@@ -103,6 +108,7 @@
 	cacheStateLabelName     = "cache_state"
 	indexCountLabelName     = "indexed_field_count"
 	dataSourceLabelName     = "data_source"
+	importStageLabelName    = "import_stage"
 	requestScope            = "scope"
 	fullMethodLabelName     = "full_method"
 	reduceLevelName         = "reduce_level"
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 8cbd35ddcd..c34a5b167a 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -4085,6 +4085,7 @@ type dataNodeConfig struct {
 	MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
 	MaxImportFileSizeInGB      ParamItem `refreshable:"true"`
 	ReadBufferSizeInMB         ParamItem `refreshable:"true"`
+	MaxTaskSlotNum             ParamItem `refreshable:"true"`
 
 	// Compaction
 	L0BatchMemoryRatio ParamItem `refreshable:"true"`
@@ -4389,6 +4390,16 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.ReadBufferSizeInMB.Init(base.mgr)
 
+	p.MaxTaskSlotNum = ParamItem{
+		Key:          "dataNode.import.maxTaskSlotNum",
+		Version:      "2.4.13",
+		Doc:          "The maximum number of slots occupied by each import/pre-import task.",
+		DefaultValue: "16",
+		PanicIfEmpty: false,
+		Export:       true,
+	}
+	p.MaxTaskSlotNum.Init(base.mgr)
+
 	p.L0BatchMemoryRatio = ParamItem{
 		Key:     "dataNode.compaction.levelZeroBatchMemoryRatio",
 		Version: "2.4.0",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 9573ac5365..41604b3358 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -554,6 +554,7 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, 16, maxConcurrentImportTaskNum)
 		assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
 		assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt())
+		assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt())
 		params.Save("datanode.gracefulStopTimeout", "100")
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
 		assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt())