1. Optimize import scheduling strategy:
   a. Revise slot weights, calculating them based on the number of files and segments for both import and pre-import tasks.
   b. Ensure that the DN executes tasks in ascending order of task ID.
2. Add time cost metrics and logs.

issue: https://github.com/milvus-io/milvus/issues/36600, https://github.com/milvus-io/milvus/issues/36518
pr: https://github.com/milvus-io/milvus/pull/36601

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent b24788b2c7
commit a4ef93457d
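For orientation before the diff: the revised slot weight is simply a minimum over file count, segment count, and the new dataNode.import.maxTaskSlotNum cap (pre-import tasks only look at file count). A standalone Go sketch of that rule, with helper names of my own choosing rather than Milvus APIs:

package main

import "fmt"

// importTaskSlots mirrors the weight the patch assigns to import tasks:
// min(file count, segment count), capped by maxTaskSlotNum.
func importTaskSlots(fileCount, segmentCount, maxTaskSlotNum int) int64 {
	slots := fileCount
	if segmentCount < slots {
		slots = segmentCount
	}
	if maxTaskSlotNum < slots {
		slots = maxTaskSlotNum
	}
	return int64(slots)
}

// preImportTaskSlots mirrors the weight assigned to pre-import tasks:
// min(file count, maxTaskSlotNum).
func preImportTaskSlots(fileCount, maxTaskSlotNum int) int64 {
	if fileCount < maxTaskSlotNum {
		return int64(fileCount)
	}
	return int64(maxTaskSlotNum)
}

func main() {
	fmt.Println(importTaskSlots(100, 2, 16)) // 2: many small files but only 2 segments
	fmt.Println(importTaskSlots(1, 64, 16))  // 1: one file, many shards/segments
	fmt.Println(preImportTaskSlots(20, 16))  // 16: capped by maxTaskSlotNum
}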
@@ -640,6 +640,7 @@ dataNode:
maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
compaction:
levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
@@ -196,10 +196,15 @@ func (c *importChecker) checkPendingJob(job ImportJob) {
}
log.Info("add new preimport task", WrapTaskLog(t)...)
}

err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
if err != nil {
log.Warn("failed to update job state to PreImporting", zap.Error(err))
return
}
pendingDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
log.Info("import job start to execute", zap.Duration("jobTimeCost/pending", pendingDuration))
}

func (c *importChecker) checkPreImportingJob(job ImportJob) {
@@ -234,10 +239,15 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
}
log.Info("add new import task", WrapTaskLog(t)...)
}

err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize))
if err != nil {
log.Warn("failed to update job state to Importing", zap.Error(err))
return
}
preImportDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preImportDuration.Milliseconds()))
log.Info("import job preimport done", zap.Duration("jobTimeCost/preimport", preImportDuration))
}

func (c *importChecker) checkImportingJob(job ImportJob) {
@@ -253,7 +263,9 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
log.Warn("failed to update job state to IndexBuilding", zap.Error(err))
return
}
log.Info("update import job state to IndexBuilding")
importDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds()))
log.Info("import job import done", zap.Duration("jobTimeCost/import", importDuration))
}

func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
@@ -269,6 +281,10 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
return
}

buildIndexDuration := job.GetTR().RecordSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.ImportStageBuildIndex).Observe(float64(buildIndexDuration.Milliseconds()))
log.Info("import job build index done", zap.Duration("jobTimeCost/buildIndex", buildIndexDuration))

// Here, all segment indexes have been successfully built, try unset isImporting flag for all segments.
isImportingSegments := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool {
segment := c.meta.GetSegment(segmentID)
@@ -306,7 +322,9 @@ func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
log.Warn("failed to update job state to Completed", zap.Error(err))
return
}
log.Info("import job completed")
totalDuration := job.GetTR().ElapseSpan()
metrics.ImportJobLatency.WithLabelValues(metrics.TotalLabel).Observe(float64(totalDuration.Milliseconds()))
log.Info("import job all completed", zap.Duration("jobTimeCost/total", totalDuration))
}

func (c *importChecker) tryFailingTasks(job ImportJob) {
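All of the checker changes above follow one pattern: the job carries a timerecord.TimeRecorder, RecordSpan() yields the time since the previous stage transition, ElapseSpan() yields the total time since the job was created, and each span is observed on the per-stage ImportJobLatency histogram. A condensed sketch of that pattern (the RecordSpan/ElapseSpan semantics are inferred from their use above; the metrics call is shown only as a comment):

package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/timerecord"
)

func main() {
	tr := timerecord.NewTimeRecorder("import job")

	time.Sleep(50 * time.Millisecond) // job waits in Pending
	pending := tr.RecordSpan()        // time since the recorder was created

	time.Sleep(80 * time.Millisecond) // PreImporting runs
	preimport := tr.RecordSpan()      // time since the previous RecordSpan call

	total := tr.ElapseSpan() // total time since creation, not reset by RecordSpan

	fmt.Println(pending.Milliseconds(), preimport.Milliseconds(), total.Milliseconds())
	// In the patch each span is reported as, e.g.:
	// metrics.ImportJobLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pending.Milliseconds()))
}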
@@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

@@ -95,6 +96,7 @@ func (s *ImportCheckerSuite) SetupTest() {
},
},
},
tr: timerecord.NewTimeRecorder("import job"),
}

catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil)
@@ -114,6 +116,7 @@ func (s *ImportCheckerSuite) TestLogStats() {
TaskID: 1,
State: datapb.ImportTaskStateV2_Failed,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(pit1)
s.NoError(err)
@@ -125,6 +128,7 @@ func (s *ImportCheckerSuite) TestLogStats() {
SegmentIDs: []int64{10, 11, 12},
State: datapb.ImportTaskStateV2_Pending,
},
tr: timerecord.NewTimeRecorder("import task"),
}
err = s.imeta.AddTask(it1)
s.NoError(err)
@@ -286,6 +290,7 @@ func (s *ImportCheckerSuite) TestCheckTimeout() {
TaskID: 1,
State: datapb.ImportTaskStateV2_InProgress,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@@ -307,6 +312,7 @@ func (s *ImportCheckerSuite) TestCheckFailure() {
State: datapb.ImportTaskStateV2_Pending,
SegmentIDs: []int64{2},
},
tr: timerecord.NewTimeRecorder("import task"),
}
err := s.imeta.AddTask(it)
s.NoError(err)
@@ -336,6 +342,7 @@ func (s *ImportCheckerSuite) TestCheckGC() {
State: datapb.ImportTaskStateV2_Failed,
SegmentIDs: []int64{2},
},
tr: timerecord.NewTimeRecorder("import task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@@ -405,6 +412,7 @@ func (s *ImportCheckerSuite) TestCheckCollection() {
TaskID: 1,
State: datapb.ImportTaskStateV2_Pending,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

@@ -91,15 +92,23 @@ type ImportJob interface {
GetCompleteTime() string
GetFiles() []*internalpb.ImportFile
GetOptions() []*commonpb.KeyValuePair
GetTR() *timerecord.TimeRecorder
Clone() ImportJob
}

type importJob struct {
*datapb.ImportJob

tr *timerecord.TimeRecorder
}

func (j *importJob) GetTR() *timerecord.TimeRecorder {
return j.tr
}

func (j *importJob) Clone() ImportJob {
return &importJob{
ImportJob: proto.Clone(j.ImportJob).(*datapb.ImportJob),
tr: j.tr,
}
}
@@ -19,6 +19,7 @@ package datacoord
import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

type ImportMeta interface {
@@ -61,11 +62,13 @@ func NewImportMeta(catalog metastore.DataCoordCatalog) (ImportMeta, error) {
for _, task := range restoredPreImportTasks {
tasks[task.GetTaskID()] = &preImportTask{
PreImportTask: task,
tr: timerecord.NewTimeRecorder("preimport task"),
}
}
for _, task := range restoredImportTasks {
tasks[task.GetTaskID()] = &importTask{
ImportTaskV2: task,
tr: timerecord.NewTimeRecorder("import task"),
}
}

@@ -73,6 +76,7 @@ func NewImportMeta(catalog metastore.DataCoordCatalog) (ImportMeta, error) {
for _, job := range restoredJobs {
jobs[job.GetJobID()] = &importJob{
ImportJob: job,
tr: timerecord.NewTimeRecorder("import job"),
}
}
@@ -93,23 +93,6 @@ func (s *importScheduler) Close() {
}

func (s *importScheduler) process() {
getNodeID := func(nodeSlots map[int64]int64) int64 {
var (
nodeID int64 = NullNodeID
maxSlots int64 = -1
)
for id, slots := range nodeSlots {
if slots > 0 && slots > maxSlots {
nodeID = id
maxSlots = slots
}
}
if nodeID != NullNodeID {
nodeSlots[nodeID]--
}
return nodeID
}

jobs := s.imeta.GetJobBy()
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].GetJobID() < jobs[j].GetJobID()
@@ -120,7 +103,7 @@ func (s *importScheduler) process() {
for _, task := range tasks {
switch task.GetState() {
case datapb.ImportTaskStateV2_Pending:
nodeID := getNodeID(nodeSlots)
nodeID := s.getNodeID(task, nodeSlots)
switch task.GetType() {
case PreImportTaskType:
s.processPendingPreImport(task, nodeID)
@@ -169,6 +152,25 @@ func (s *importScheduler) peekSlots() map[int64]int64 {
return nodeSlots
}

func (s *importScheduler) getNodeID(task ImportTask, nodeSlots map[int64]int64) int64 {
var (
nodeID int64 = NullNodeID
maxSlots int64 = -1
)
require := task.GetSlots()
for id, slots := range nodeSlots {
// find the most idle datanode
if slots > 0 && slots >= require && slots > maxSlots {
nodeID = id
maxSlots = slots
}
}
if nodeID != NullNodeID {
nodeSlots[nodeID] -= require
}
return nodeID
}
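For intuition, a small self-contained sketch (not Milvus code) of the selection rule in getNodeID above: take the node with the most free slots that can still fit the task's requirement, then reserve that many slots.

package main

import "fmt"

// pickNode mirrors importScheduler.getNodeID: choose the most idle node whose
// free slots cover the task's requirement, then subtract the reservation.
func pickNode(nodeSlots map[int64]int64, require int64) int64 {
	var (
		nodeID   int64 = -1 // stands in for NullNodeID
		maxSlots int64 = -1
	)
	for id, slots := range nodeSlots {
		if slots > 0 && slots >= require && slots > maxSlots {
			nodeID = id
			maxSlots = slots
		}
	}
	if nodeID != -1 {
		nodeSlots[nodeID] -= require
	}
	return nodeID
}

func main() {
	nodeSlots := map[int64]int64{1: 6, 2: 16}
	fmt.Println(pickNode(nodeSlots, 4)) // 2: node 2 is the most idle, 12 slots left
	fmt.Println(pickNode(nodeSlots, 8)) // 2: still fits (12 >= 8), 4 slots left
	fmt.Println(pickNode(nodeSlots, 8)) // -1: no node has 8 free slots, task stays pending
}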
func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64) {
if nodeID == NullNodeID {
return
@@ -188,7 +190,9 @@ func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64)
log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
return
}
log.Info("process pending preimport task done", WrapTaskLog(task)...)
pendingDuration := task.GetTR().RecordSpan()
metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
log.Info("preimport task start to execute", WrapTaskLog(task, zap.Int64("scheduledNodeID", nodeID), zap.Duration("taskTimeCost/pending", pendingDuration))...)
}

func (s *importScheduler) processPendingImport(task ImportTask, nodeID int64) {
@@ -214,7 +218,9 @@ func (s *importScheduler) processPendingImport(task ImportTask, nodeID int64) {
log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
return
}
log.Info("processing pending import task done", WrapTaskLog(task)...)
pendingDuration := task.GetTR().RecordSpan()
metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
log.Info("import task start to execute", WrapTaskLog(task, zap.Int64("scheduledNodeID", nodeID), zap.Duration("taskTimeCost/pending", pendingDuration))...)
}

func (s *importScheduler) processInProgressPreImport(task ImportTask) {
@@ -251,6 +257,11 @@ func (s *importScheduler) processInProgressPreImport(task ImportTask) {
}
log.Info("query preimport", WrapTaskLog(task, zap.String("state", resp.GetState().String()),
zap.Any("fileStats", resp.GetFileStats()))...)
if resp.GetState() == datapb.ImportTaskStateV2_Completed {
preimportDuration := task.GetTR().RecordSpan()
metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preimportDuration.Milliseconds()))
log.Info("preimport done", WrapTaskLog(task, zap.Duration("taskTimeCost/preimport", preimportDuration))...)
}
}

func (s *importScheduler) processInProgressImport(task ImportTask) {
@@ -328,6 +339,9 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
return
}
importDuration := task.GetTR().RecordSpan()
metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStageImport).Observe(float64(importDuration.Milliseconds()))
log.Info("import done", WrapTaskLog(task, zap.Duration("taskTimeCost/import", importDuration))...)
}
log.Info("query import", WrapTaskLog(task, zap.String("state", resp.GetState().String()),
zap.String("reason", resp.GetReason()))...)
@@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

type ImportSchedulerSuite struct {
@@ -85,6 +86,7 @@ func (s *ImportSchedulerSuite) TestProcessPreImport() {
CollectionID: s.collectionID,
State: datapb.ImportTaskStateV2_Pending,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@@ -95,6 +97,7 @@ func (s *ImportSchedulerSuite) TestProcessPreImport() {
TimeoutTs: math.MaxUint64,
Schema: &schemapb.CollectionSchema{},
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.imeta.AddJob(job)
s.NoError(err)
@@ -156,6 +159,7 @@ func (s *ImportSchedulerSuite) TestProcessImport() {
},
},
},
tr: timerecord.NewTimeRecorder("import task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@@ -168,6 +172,7 @@ func (s *ImportSchedulerSuite) TestProcessImport() {
Schema: &schemapb.CollectionSchema{},
TimeoutTs: math.MaxUint64,
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.imeta.AddJob(job)
s.NoError(err)
@@ -222,6 +227,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
SegmentIDs: []int64{2, 3},
State: datapb.ImportTaskStateV2_Failed,
},
tr: timerecord.NewTimeRecorder("import task"),
}
err := s.imeta.AddTask(task)
s.NoError(err)
@@ -234,6 +240,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
Schema: &schemapb.CollectionSchema{},
TimeoutTs: math.MaxUint64,
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.imeta.AddJob(job)
s.NoError(err)
@@ -17,9 +17,12 @@
package datacoord

import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

type TaskType int
@@ -131,33 +134,62 @@ type ImportTask interface {
GetState() datapb.ImportTaskStateV2
GetReason() string
GetFileStats() []*datapb.ImportFileStats
GetTR() *timerecord.TimeRecorder
GetSlots() int64
Clone() ImportTask
}

type preImportTask struct {
*datapb.PreImportTask
tr *timerecord.TimeRecorder
}

func (p *preImportTask) GetType() TaskType {
return PreImportTaskType
}

func (p *preImportTask) GetTR() *timerecord.TimeRecorder {
return p.tr
}

func (p *preImportTask) GetSlots() int64 {
return int64(funcutil.Min(len(p.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
}

func (p *preImportTask) Clone() ImportTask {
return &preImportTask{
PreImportTask: proto.Clone(p.PreImportTask).(*datapb.PreImportTask),
tr: p.tr,
}
}

type importTask struct {
*datapb.ImportTaskV2
tr *timerecord.TimeRecorder
}

func (t *importTask) GetType() TaskType {
return ImportTaskType
}

func (t *importTask) GetTR() *timerecord.TimeRecorder {
return t.tr
}

func (t *importTask) GetSlots() int64 {
// Consider the following two scenarios:
// 1. Importing a large number of small files results in
// a small total data size, making file count unsuitable as a slot number.
// 2. Importing a file with many shards number results in many segments and a small total data size,
// making segment count unsuitable as a slot number.
// Taking these factors into account, we've decided to use the
// minimum value between segment count and file count as the slot number.
return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
}

func (t *importTask) Clone() ImportTask {
return &importTask{
ImportTaskV2: proto.Clone(t.ImportTaskV2).(*datapb.ImportTaskV2),
tr: t.tr,
}
}
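Taken together with the scheduler changes: getNodeID above reserves exactly GetSlots() slots on the chosen DataNode, and the DataNode-side scheduler.Slots() further down subtracts the same weights from the capacity it reports back, so a task covering many files or segments now consumes a proportionally larger share of a node's import budget instead of counting as a single unit.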
@@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field {
@@ -45,6 +46,7 @@ func WrapTaskLog(task ImportTask, fields ...zap.Field) []zap.Field {
zap.Int64("jobID", task.GetJobID()),
zap.Int64("collectionID", task.GetCollectionID()),
zap.String("type", task.GetType().String()),
zap.Int64("nodeID", task.GetNodeID()),
}
res = append(res, fields...)
return res
@@ -73,6 +75,7 @@ func NewPreImportTasks(fileGroups [][]*internalpb.ImportFile,
State: datapb.ImportTaskStateV2_Pending,
FileStats: fileStats,
},
tr: timerecord.NewTimeRecorder("preimport task"),
}
tasks = append(tasks, task)
}
@@ -97,6 +100,7 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats,
State: datapb.ImportTaskStateV2_Pending,
FileStats: group,
},
tr: timerecord.NewTimeRecorder("import task"),
}
segments, err := AssignSegments(job, task, alloc, meta)
if err != nil {
@@ -46,6 +46,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -1680,6 +1681,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
return importFile
})

startTime := time.Now()
job := &importJob{
ImportJob: &datapb.ImportJob{
JobID: idStart,
@@ -1693,8 +1695,9 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
State: internalpb.ImportJobState_Pending,
Files: files,
Options: in.GetOptions(),
StartTime: time.Now().Format("2006-01-02T15:04:05Z07:00"),
StartTime: startTime.Format("2006-01-02T15:04:05Z07:00"),
},
tr: timerecord.NewTimeRecorder("import job"),
}
err = s.importMeta.AddJob(job)
if err != nil {
@@ -17,6 +17,7 @@
package importv2

import (
"sort"
"sync"
"time"

@@ -64,6 +65,9 @@ func (s *scheduler) Start() {
return
case <-exeTicker.C:
tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].GetTaskID() < tasks[j].GetTaskID()
})
futures := make(map[int64][]*conc.Future[any])
for _, task := range tasks {
fs := task.Execute()
@@ -86,7 +90,15 @@ func (s *scheduler) Start() {

func (s *scheduler) Slots() int64 {
tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress))
return paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64() - int64(len(tasks))
used := lo.SumBy(tasks, func(t Task) int64 {
return t.GetSlots()
})
total := paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64()
free := total - used
if free >= 0 {
return free
}
return 0
}

func (s *scheduler) Close() {
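Two behavioral changes sit in this file: Start() now drains pending tasks in ascending task ID order (the sort above), and Slots() reports remaining capacity in slot units rather than as a task count. A standalone sketch of the new capacity math (not Milvus code):

package main

import "fmt"

// freeSlots mirrors the reworked scheduler.Slots(): capacity minus the slots
// held by pending and in-progress tasks, floored at zero.
func freeSlots(maxSlots int64, taskSlots []int64) int64 {
	var used int64
	for _, s := range taskSlots {
		used += s
	}
	if free := maxSlots - used; free > 0 {
		return free
	}
	return 0
}

func main() {
	fmt.Println(freeSlots(16, []int64{2, 4}))  // 10 slots still available
	fmt.Println(freeSlots(16, []int64{16, 8})) // 0: over-committed, clamped instead of going negative
}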
@@ -161,6 +161,7 @@ type Task interface {
GetState() datapb.ImportTaskStateV2
GetReason() string
GetSchema() *schemapb.CollectionSchema
GetSlots() int64
Cancel()
Clone() Task
}
@@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -97,6 +98,17 @@ func (t *ImportTask) GetSchema() *schemapb.CollectionSchema {
return t.req.GetSchema()
}

func (t *ImportTask) GetSlots() int64 {
// Consider the following two scenarios:
// 1. Importing a large number of small files results in
// a small total data size, making file count unsuitable as a slot number.
// 2. Importing a file with many shards number results in many segments and a small total data size,
// making segment count unsuitable as a slot number.
// Taking these factors into account, we've decided to use the
// minimum value between segment count and file count as the slot number.
return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
}

func (t *ImportTask) Cancel() {
t.cancel()
}
@@ -93,6 +93,10 @@ func (t *L0ImportTask) GetSchema() *schemapb.CollectionSchema {
return t.req.GetSchema()
}

func (t *L0ImportTask) GetSlots() int64 {
return 1
}

func (t *L0ImportTask) Cancel() {
t.cancel()
}
@@ -94,6 +94,10 @@ func (t *L0PreImportTask) GetSchema() *schemapb.CollectionSchema {
return t.schema
}

func (t *L0PreImportTask) GetSlots() int64 {
return 1
}

func (t *L0PreImportTask) Cancel() {
t.cancel()
}
@@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -101,6 +102,10 @@ func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema {
return t.schema
}

func (t *PreImportTask) GetSlots() int64 {
return int64(funcutil.Min(len(t.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
}

func (t *PreImportTask) Cancel() {
t.cancel()
}
@@ -212,6 +212,28 @@ var (
stageLabelName,
})

ImportJobLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "import_job_latency",
Help: "latency of import job",
Buckets: longTaskBuckets,
}, []string{
importStageLabelName,
})

ImportTaskLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "import_task_latency",
Help: "latency of import task",
Buckets: longTaskBuckets,
}, []string{
importStageLabelName,
})

FlushedSegmentFileNum = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
@@ -342,6 +364,8 @@ func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(DataCoordCompactedSegmentSize)
registry.MustRegister(DataCoordCompactionTaskNum)
registry.MustRegister(DataCoordCompactionLatency)
registry.MustRegister(ImportJobLatency)
registry.MustRegister(ImportTaskLatency)
registry.MustRegister(DataCoordSizeStoredL0Segment)
registry.MustRegister(DataCoordL0DeleteEntriesNum)
registry.MustRegister(FlushedSegmentFileNum)
@@ -76,6 +76,11 @@ const (
Executing = "executing"
Done = "done"

ImportStagePending = "pending"
ImportStagePreImport = "preimport"
ImportStageImport = "import"
ImportStageBuildIndex = "build_index"

compactionTypeLabelName = "compaction_type"
isVectorFieldLabelName = "is_vector_field"
segmentPruneLabelName = "segment_prune_label"
@@ -103,6 +108,7 @@ const (
cacheStateLabelName = "cache_state"
indexCountLabelName = "indexed_field_count"
dataSourceLabelName = "data_source"
importStageLabelName = "import_stage"
requestScope = "scope"
fullMethodLabelName = "full_method"
reduceLevelName = "reduce_level"
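Assuming milvusNamespace and typeutil.DataCoordRole keep their usual values ("milvus" and "datacoord"), the two new histograms are exposed as milvus_datacoord_import_job_latency and milvus_datacoord_import_task_latency, each carrying an import_stage label with the values defined above; the job histogram additionally records an end-to-end observation under metrics.TotalLabel.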
@@ -4085,6 +4085,7 @@ type dataNodeConfig struct {
MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
MaxImportFileSizeInGB ParamItem `refreshable:"true"`
ReadBufferSizeInMB ParamItem `refreshable:"true"`
MaxTaskSlotNum ParamItem `refreshable:"true"`

// Compaction
L0BatchMemoryRatio ParamItem `refreshable:"true"`
@@ -4389,6 +4390,16 @@ if this parameter <= 0, will set it as 10`,
}
p.ReadBufferSizeInMB.Init(base.mgr)

p.MaxTaskSlotNum = ParamItem{
Key: "dataNode.import.maxTaskSlotNum",
Version: "2.4.13",
Doc: "The maximum number of slots occupied by each import/pre-import task.",
DefaultValue: "16",
PanicIfEmpty: false,
Export: true,
}
p.MaxTaskSlotNum.Init(base.mgr)

p.L0BatchMemoryRatio = ParamItem{
Key: "dataNode.compaction.levelZeroBatchMemoryRatio",
Version: "2.4.0",
@@ -554,6 +554,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 16, maxConcurrentImportTaskNum)
assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt())
assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt())
params.Save("datanode.gracefulStopTimeout", "100")
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt())