fix: [hotfix-2.5.12] Fix import slot assignment (#41982) (#42058)

Assign the import task to the worker with the most available slots, even
if availableSlots < requiredSlots. Without this fallback, a task that
requires more slots than any single worker currently has free could wait
forever; with it, tasks are never blocked indefinitely.

issue: https://github.com/milvus-io/milvus/issues/41981

pr: https://github.com/milvus-io/milvus/pull/41982

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Author: yihao.dai, 2025-05-24 18:34:26 +08:00 (committed by GitHub)
parent 043e333290
commit ba5ed97846
8 changed files with 98 additions and 31 deletions
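
For illustration, here is a minimal, self-contained Go sketch of the assignment policy described in the commit message above. It mirrors the new pickNode logic in the scheduler diff below, but pickNodeSketch, nullNodeID, and the main function exist only for this example and are not part of the Milvus code.

package main

import "fmt"

const nullNodeID = int64(-1)

// pickNodeSketch mirrors the new scheduler behavior: prefer any node with
// enough free slots; otherwise fall back to the node with the most free
// slots and mark it fully occupied for the rest of this scheduling round.
func pickNodeSketch(require int64, nodeSlots map[int64]int64) int64 {
	fallbackNodeID, maxAvailableSlots := nullNodeID, int64(-1)
	for id, availableSlots := range nodeSlots {
		if availableSlots >= require {
			nodeSlots[id] -= require // enough room: charge the full slot cost
			return id
		}
		if availableSlots > 0 && availableSlots > maxAvailableSlots {
			fallbackNodeID = id // remember the least-loaded node so far
			maxAvailableSlots = availableSlots
		}
	}
	if fallbackNodeID != nullNodeID {
		nodeSlots[fallbackNodeID] = 0 // oversubscribe rather than block the task
		return fallbackNodeID
	}
	return nullNodeID // every node is completely busy; try again next round
}

func main() {
	slots := map[int64]int64{1: 2, 2: 3, 3: 1}
	picked := pickNodeSketch(4, slots) // no node has 4 free slots
	fmt.Println(picked, slots)         // picks node 2; its remaining slots drop to 0
}

Note that the fallback path sets the chosen node's remaining slots to 0 rather than letting them go negative, so the same node is not picked again within the same scheduling round.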

@@ -704,7 +704,6 @@ dataNode:
     maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
     maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.
     readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
-    maxTaskSlotNum: 16 # The maximum number of slots occupied by each import/pre-import task.
   compaction:
     levelZeroBatchMemoryRatio: 0.5 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
     levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.

@@ -102,7 +102,7 @@ func (s *importScheduler) process() {
 		for _, task := range tasks {
 			switch task.GetState() {
 			case datapb.ImportTaskStateV2_Pending:
-				nodeID := s.getNodeID(task, nodeSlots)
+				nodeID := s.pickNode(task, nodeSlots)
 				switch task.GetType() {
 				case PreImportTaskType:
 					s.processPendingPreImport(task, nodeID)
@@ -151,23 +151,30 @@ func (s *importScheduler) peekSlots() map[int64]int64 {
 	return nodeSlots
 }
 
-func (s *importScheduler) getNodeID(task ImportTask, nodeSlots map[int64]int64) int64 {
+func (s *importScheduler) pickNode(task ImportTask, nodeSlots map[int64]int64) int64 {
 	var (
-		nodeID   int64 = NullNodeID
-		maxSlots int64 = -1
+		fallbackNodeID    int64 = NullNodeID
+		maxAvailableSlots int64 = -1
 	)
 	require := task.GetSlots()
-	for id, slots := range nodeSlots {
-		// find the most idle datanode
-		if slots > 0 && slots >= require && slots > maxSlots {
-			nodeID = id
-			maxSlots = slots
+	for id, availableSlots := range nodeSlots {
+		// if the node has enough slots, assign the task to the node
+		if availableSlots >= require {
+			nodeSlots[id] -= require
+			return id
 		}
+		// find the node with the most available slots
+		if availableSlots > 0 && availableSlots > maxAvailableSlots {
+			fallbackNodeID = id
+			maxAvailableSlots = availableSlots
+		}
 	}
-	if nodeID != NullNodeID {
-		nodeSlots[nodeID] -= require
+	if fallbackNodeID != NullNodeID {
+		// if no node has enough slots, assign the task to the node with the most available slots
+		nodeSlots[fallbackNodeID] = 0
+		return fallbackNodeID
 	}
-	return nodeID
+	return NullNodeID
 }
 
 func (s *importScheduler) processPendingPreImport(task ImportTask, nodeID int64) {

@@ -32,6 +32,7 @@ import (
 	"github.com/milvus-io/milvus/internal/datacoord/session"
 	"github.com/milvus-io/milvus/internal/metastore/mocks"
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
+	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
 )
@@ -283,6 +284,64 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
 	s.Equal(int64(NullNodeID), task.GetNodeID())
 }
 
+func (s *ImportSchedulerSuite) TestPickNode() {
+	task := &importTask{
+		ImportTaskV2: &datapb.ImportTaskV2{
+			JobID:        0,
+			TaskID:       1,
+			CollectionID: s.collectionID,
+			State:        datapb.ImportTaskStateV2_Pending,
+			FileStats: []*datapb.ImportFileStats{
+				{ImportFile: &internalpb.ImportFile{Id: 1}},
+				{ImportFile: &internalpb.ImportFile{Id: 2}},
+				{ImportFile: &internalpb.ImportFile{Id: 3}},
+				{ImportFile: &internalpb.ImportFile{Id: 4}},
+			},
+			SegmentIDs: []int64{1, 2, 3, 4},
+		},
+		tr: timerecord.NewTimeRecorder("import task"),
+	}
+	s.Equal(int64(4), task.GetSlots())
+
+	// Test case 1: Node with sufficient slots
+	nodeSlots := map[int64]int64{
+		1: 5,
+		2: 3,
+		3: 10,
+	}
+	nodeID := s.scheduler.pickNode(task, nodeSlots)
+	s.True(nodeID == int64(1) || nodeID == int64(3)) // Should select node 1 or 3 as it has sufficient slots
+	if nodeID == int64(1) {
+		s.Equal(int64(1), nodeSlots[1])
+	} else {
+		s.Equal(int64(6), nodeSlots[3])
+	}
+
+	// Test case 2: No node has sufficient slots, select node with most slots
+	nodeSlots = map[int64]int64{
+		1: 2,
+		2: 3,
+		3: 1,
+	}
+	nodeID = s.scheduler.pickNode(task, nodeSlots)
+	s.Equal(int64(2), nodeID)       // Should select node 2 as it has the most slots
+	s.Equal(int64(0), nodeSlots[2]) // Node 2's slots should be exhausted
+
+	// Test case 3: All nodes have no slots
+	nodeSlots = map[int64]int64{
+		1: 0,
+		2: 0,
+		3: 0,
+	}
+	nodeID = s.scheduler.pickNode(task, nodeSlots)
+	s.Equal(int64(NullNodeID), nodeID) // Should return NullNodeID
+
+	// Test case 4: Empty node list
+	nodeSlots = map[int64]int64{}
+	nodeID = s.scheduler.pickNode(task, nodeSlots)
+	s.Equal(int64(NullNodeID), nodeID) // Should return NullNodeID
+}
+
 func TestImportScheduler(t *testing.T) {
 	suite.Run(t, new(ImportSchedulerSuite))
 }

@@ -23,7 +23,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
-	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
 )
@@ -166,7 +165,11 @@ func (p *preImportTask) GetTR() *timerecord.TimeRecorder {
 }
 
 func (p *preImportTask) GetSlots() int64 {
-	return int64(funcutil.Min(len(p.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+	slots := int64(len(p.GetFileStats()))
+	if slots < 1 {
+		slots = 1
+	}
+	return slots
 }
 
 func (p *preImportTask) Clone() ImportTask {
@@ -212,7 +215,11 @@ func (t *importTask) GetSlots() int64 {
 	// making segment count unsuitable as a slot number.
 	// Taking these factors into account, we've decided to use the
 	// minimum value between segment count and file count as the slot number.
-	return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+	slots := int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs())))
+	if slots < 1 {
+		slots = 1
+	}
+	return slots
 }
 
 func (t *importTask) Clone() ImportTask {
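
Taken together, the GetSlots changes above and the matching ImportTask/PreImportTask changes below replace the dataNode.import.maxTaskSlotNum cap with a simpler rule: an import task costs min(file count, segment count) slots, a pre-import task costs one slot per file, and both are floored at 1. A small sketch of that rule (importTaskSlots is a hypothetical helper for illustration, not a function in the codebase):

package main

import "fmt"

// importTaskSlots reproduces the new slot-cost rule for import tasks:
// min(number of import files, number of target segments), floored at 1
// so even an empty task still occupies one slot.
func importTaskSlots(fileCount, segmentCount int) int64 {
	slots := int64(fileCount)
	if int64(segmentCount) < slots {
		slots = int64(segmentCount)
	}
	if slots < 1 {
		slots = 1
	}
	return slots
}

func main() {
	fmt.Println(importTaskSlots(4, 6)) // 4: bounded by the file count
	fmt.Println(importTaskSlots(8, 2)) // 2: bounded by the segment count
	fmt.Println(importTaskSlots(0, 0)) // 1: floor keeps the task from being free
}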

@@ -112,7 +112,11 @@ func (t *ImportTask) GetSlots() int64 {
 	// making segment count unsuitable as a slot number.
 	// Taking these factors into account, we've decided to use the
 	// minimum value between segment count and file count as the slot number.
-	return int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+	slots := int64(funcutil.Min(len(t.GetFileStats()), len(t.GetSegmentIDs())))
+	if slots < 1 {
+		slots = 1
+	}
+	return slots
 }
 
 func (t *ImportTask) Cancel() {

@@ -34,7 +34,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/conc"
-	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
@@ -103,7 +102,11 @@ func (t *PreImportTask) GetSchema() *schemapb.CollectionSchema {
 }
 
 func (t *PreImportTask) GetSlots() int64 {
-	return int64(funcutil.Min(len(t.GetFileStats()), paramtable.Get().DataNodeCfg.MaxTaskSlotNum.GetAsInt()))
+	slots := int64(len(t.GetFileStats()))
+	if slots < 1 {
+		slots = 1
+	}
+	return slots
 }
 
 func (t *PreImportTask) Cancel() {

@@ -4707,7 +4707,6 @@ type dataNodeConfig struct {
 	MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
 	MaxImportFileSizeInGB      ParamItem `refreshable:"true"`
 	ReadBufferSizeInMB         ParamItem `refreshable:"true"`
-	MaxTaskSlotNum             ParamItem `refreshable:"true"`
 
 	// Compaction
 	L0BatchMemoryRatio ParamItem `refreshable:"true"`
@@ -5017,16 +5016,6 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.ReadBufferSizeInMB.Init(base.mgr)
 
-	p.MaxTaskSlotNum = ParamItem{
-		Key:          "dataNode.import.maxTaskSlotNum",
-		Version:      "2.4.13",
-		Doc:          "The maximum number of slots occupied by each import/pre-import task.",
-		DefaultValue: "16",
-		PanicIfEmpty: false,
-		Export:       true,
-	}
-	p.MaxTaskSlotNum.Init(base.mgr)
-
 	p.L0BatchMemoryRatio = ParamItem{
 		Key:     "dataNode.compaction.levelZeroBatchMemoryRatio",
 		Version: "2.4.0",

@@ -599,7 +599,6 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, 16, maxConcurrentImportTaskNum)
 		assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
 		assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt())
-		assert.Equal(t, 16, Params.MaxTaskSlotNum.GetAsInt())
 		params.Save("datanode.gracefulStopTimeout", "100")
 		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
 		assert.Equal(t, 16, Params.SlotCap.GetAsInt())