diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 4acfab4b8e..338b2d749f 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -704,7 +704,6 @@ dataNode:
     maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
     maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
     readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
-    maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
   compaction:
     levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
     levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go
index 871733a25c..b7f6dbf969 100644
--- a/internal/datacoord/import_scheduler.go
+++ b/internal/datacoord/import_scheduler.go
@@ -102,7 +102,7 @@ func (s *importScheduler) process() {
 	for _, task := range tasks {
 		switch task.GetState() {
 		case datapb.ImportTaskStateV2_Pending:
-			nodeID := s.getNodeID(task, nodeSlots)
+			nodeID := s.pickNode(task, nodeSlots)
 			switch task.GetType() {
 			case PreImportTaskType:
 				s.processPendingPreImport(task, nodeID)
@@ -151,23 +151,30 @@ func (s *importScheduler) peekSlots() map[int64]int64 {
 	return nodeSlots
 }
 
-func (s *importScheduler) getNodeID(task ImportTask, nodeSlots map[int64]int64) int64 {
+func (s *importScheduler) pickNode(task ImportTask, nodeSlots map[int64]int64) int64 {
 	var (
-		nodeID   int64 = NullNodeID
-		maxSlots int64 = -1
+		fallbackNodeID    int64 = NullNodeID
+		maxAvailableSlots int64 = -1
 	)
 	require := task.GetSlots()
-	for id, slots := range nodeSlots {
-		// find the most idle datanode
-		if slots > 0 && slots >= require && slots > maxSlots {
-			nodeID = id
-			maxSlots = slots
+	for id, availableSlots := range nodeSlots {
+		// if the node has enough slots, assign the task to the node
+		if availableSlots >= require {
+			nodeSlots[id] -= require
+			return id
+		}
+		// find the node with the most available slots
+		if availableSlots > 0 && availableSlots > maxAvailableSlots {
+			fallbackNodeID = id
+			maxAvailableSlots = availableSlots
 		}
 	}
-	if nodeID != NullNodeID {
-		nodeSlots[nodeID] -= require
+	if fallbackNodeID != NullNodeID {
+		// if no node has enough slots, assign the task to the node with the most available slots
+		nodeSlots[fallbackNodeID] = 0
+		return fallbackNodeID
 	}
-	return nodeID
+	return NullNodeID
 }
 
 func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64) {
diff --git a/internal/datacoord/import_scheduler_test.go b/internal/datacoord/import_scheduler_test.go
index ac739167b1..6fa564f021 100644
--- a/internal/datacoord/import_scheduler_test.go
+++ b/internal/datacoord/import_scheduler_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/milvus-io/milvus/internal/datacoord/session"
 	"github.com/milvus-io/milvus/internal/metastore/mocks"
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
+	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
 )
 
@@ -283,6 +284,64 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
 	s.Equal(int64(NullNodeID), task.GetNodeID())
 }
 
+func (s *ImportSchedulerSuite) TestPickNode() {
+	task := &importTask{
+		ImportTaskV2: &datapb.ImportTaskV2{
+			JobID:        0,
+			TaskID:       1,
+			CollectionID: s.collectionID,
+			State:        datapb.ImportTaskStateV2_Pending,
+			FileStats: []*datapb.ImportFileStats{
+				{ImportFile: &internalpb.ImportFile{Id: 1}},
+				{ImportFile: &internalpb.ImportFile{Id: 2}},
+				{ImportFile: &internalpb.ImportFile{Id: 3}},
+				{ImportFile: &internalpb.ImportFile{Id: 4}},
+			},
+			SegmentIDs: []int64{1, 2, 3, 4},
+		},
+		tr: timerecord.NewTimeRecorder("import task"),
+	}
+	s.Equal(int64(4), task.GetSlots())
+
+	// Test case 1: Node with sufficient slots
+	nodeSlots := map[int64]int64{
+		1: 5,
+		2: 3,
+		3: 10,
+	}
+	nodeID := s.scheduler.pickNode(task, nodeSlots)
+	s.True(nodeID == int64(1) || nodeID == int64(3)) // Should select node 1 or 3 as it has sufficient slots
+	if nodeID == int64(1) {
+		s.Equal(int64(1), nodeSlots[1])
+	} else {
+		s.Equal(int64(6), nodeSlots[3])
+	}
+
+	// Test case 2: No node has sufficient slots, select node with most slots
+	nodeSlots = map[int64]int64{
+		1: 2,
+		2: 3,
+		3: 1,
+	}
+	nodeID = s.scheduler.pickNode(task, nodeSlots)
+	s.Equal(int64(2), nodeID)       // Should select node 2 as it has the most slots
+	s.Equal(int64(0), nodeSlots[2]) // Node 2's slots should be exhausted
+
+	// Test case 3: All nodes have no slots
+	nodeSlots = map[int64]int64{
+		1: 0,
+		2: 0,
+		3: 0,
+	}
+	nodeID = s.scheduler.pickNode(task, nodeSlots)
+	s.Equal(int64(NullNodeID), nodeID) // Should return NullNodeID
+
+	// Test case 4: Empty node list
+	nodeSlots = map[int64]int64{}
+	nodeID = s.scheduler.pickNode(task, nodeSlots)
+	s.Equal(int64(NullNodeID), nodeID) // Should return NullNodeID
+}
+
 func TestImportScheduler(t *testing.T) {
 	suite.Run(t, new(ImportSchedulerSuite))
 }
diff --git a/internal/datacoord/import_task.go b/internal/datacoord/import_task.go
index 1dca8d4723..76d91879ff 100644
--- a/internal/datacoord/import_task.go
+++ b/internal/datacoord/import_task.go
@@ -23,7 +23,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
-	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
 )
 
@@ -166,7 +165,11 @@ func (p *preImportTask) GetTR() *timerecord.TimeRecorder {
 }
 
 func (p *preImportTask) GetSlots() int64 {
-	return int64(funcutil.Min(len(p.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+	slots := int64(len(p.GetFileStats()))
+	if slots < 1 {
+		slots = 1
+	}
+	return slots
 }
 
 func (p *preImportTask) Clone() ImportTask {
@@ -212,7 +215,11 @@ func (t *importTask) GetSlots() int64 {
 	// making segment count unsuitable as a slot number.
 	// Taking these factors into account, we've decided to use the
 	// minimum value between segment count and file count as the slot number.
-	return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+	slots := int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs())))
+	if slots < 1 {
+		slots = 1
+	}
+	return slots
 }
 
 func (t *importTask) Clone() ImportTask {
diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go
index 65861e0ff9..2564935ebc 100644
--- a/internal/datanode/importv2/task_import.go
+++ b/internal/datanode/importv2/task_import.go
@@ -112,7 +112,11 @@ func (t *ImportTask) GetSlots() int64 {
 	// making segment count unsuitable as a slot number.
 	// Taking these factors into account, we've decided to use the
 	// minimum value between segment count and file count as the slot number.
-	return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+	slots := int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs())))
+	if slots < 1 {
+		slots = 1
+	}
+	return slots
 }
 
 func (t *ImportTask) Cancel() {
diff --git a/internal/datanode/importv2/task_preimport.go b/internal/datanode/importv2/task_preimport.go
index db884a4179..055915ab89 100644
--- a/internal/datanode/importv2/task_preimport.go
+++ b/internal/datanode/importv2/task_preimport.go
@@ -34,7 +34,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/conc"
-	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
@@ -103,7 +102,11 @@ func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema {
 }
 
 func (t *PreImportTask) GetSlots() int64 {
-	return int64(funcutil.Min(len(t.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+	slots := int64(len(t.GetFileStats()))
+	if slots < 1 {
+		slots = 1
+	}
+	return slots
 }
 
 func (t *PreImportTask) Cancel() {
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 0dabe035b2..0b6f25944a 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -4707,7 +4707,6 @@ type dataNodeConfig struct {
 	MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
 	MaxImportFileSizeInGB      ParamItem `refreshable:"true"`
 	ReadBufferSizeInMB         ParamItem `refreshable:"true"`
-	MaxTaskSlotNum             ParamItem `refreshable:"true"`
 
 	// Compaction
 	L0BatchMemoryRatio ParamItem `refreshable:"true"`
@@ -5017,16 +5016,6 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.ReadBufferSizeInMB.Init(base.mgr)
 
-	p.MaxTaskSlotNum = ParamItem{
-		Key:          "dataNode.import.maxTaskSlotNum",
-		Version:      "2.4.13",
-		Doc:          "The maximum number of slots occupied by each import/pre-import task.",
-		DefaultValue: "16",
-		PanicIfEmpty: false,
-		Export:       true,
-	}
-	p.MaxTaskSlotNum.Init(base.mgr)
-
 	p.L0BatchMemoryRatio = ParamItem{
 		Key:     "dataNode.compaction.levelZeroBatchMemoryRatio",
 		Version: "2.4.0",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 49514521fa..21b353a75b 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -599,7 +599,6 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, 16, maxConcurrentImportTaskNum)
 		assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
 		assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt())
-		assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt())
 		params.Save("datanode.gracefulStopTimeout", "100")
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
 		assert.Equal(t, 16, Params.SlotCap.GetAsInt())
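Not part of the patch itself — a minimal, standalone sketch of the allocation policy that the new pickNode implements: a node with enough free slots takes the task and is debited; otherwise the non-empty node with the most free slots is overloaded and drained to zero; a null node ID is returned only when every node is idle-less. The simplified signature and the constant nullNodeID are illustrative stand-ins, not the actual Milvus identifiers.

package main

import "fmt"

const nullNodeID int64 = -1 // stand-in for datacoord.NullNodeID

// pickNode mirrors the loop added in import_scheduler.go, with the required
// slot count passed in directly instead of being read from an ImportTask.
func pickNode(require int64, nodeSlots map[int64]int64) int64 {
	fallback, maxAvail := nullNodeID, int64(-1)
	for id, avail := range nodeSlots {
		if avail >= require {
			nodeSlots[id] -= require // reserve the required slots on the chosen node
			return id
		}
		if avail > 0 && avail > maxAvail {
			fallback, maxAvail = id, avail
		}
	}
	if fallback != nullNodeID {
		nodeSlots[fallback] = 0 // no node has enough capacity: drain the best candidate
		return fallback
	}
	return nullNodeID
}

func main() {
	// Some node has >= 4 free slots: it is picked and debited by 4.
	a := map[int64]int64{1: 5, 2: 3, 3: 10}
	fmt.Println(pickNode(4, a), a)

	// No node has 6 free slots: node 2 (most free slots) takes the task and drops to 0.
	b := map[int64]int64{1: 2, 2: 3, 3: 1}
	fmt.Println(pickNode(6, b), b)

	// No free slots anywhere: nullNodeID is returned.
	fmt.Println(pickNode(6, map[int64]int64{1: 0, 2: 0}))
}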