mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 10:08:42 +08:00
Merge branch 'branch-0.4.0' into 'branch-0.4.0'
MS-364 Modify tasktableitem in tasktable See merge request megasearch/milvus!374 Former-commit-id: 045ac24b40f35cb5b36835efbb2b7489d8c3fae3
This commit is contained in:
commit
90c5e47bfc
@ -19,6 +19,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-357 - Add minimum schedule function
|
||||
- MS-359 - Add cost test in new scheduler
|
||||
- MS-361 - Add event in resource
|
||||
- MS-364 - Modify tasktableitem in tasktable
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
|
||||
@ -21,7 +21,7 @@ push_task(ResourcePtr &self, ResourcePtr &other) {
|
||||
auto indexes = PickToMove(self_task_table, cache, 1);
|
||||
for (auto index : indexes) {
|
||||
if (self_task_table.Move(index)) {
|
||||
auto task = self_task_table.Get(index).task;
|
||||
auto task = self_task_table.Get(index)->task;
|
||||
other_task_table.Put(task);
|
||||
// TODO: mark moved future
|
||||
other->WakeupLoader();
|
||||
|
||||
@ -27,7 +27,7 @@ TaskTable::Put(std::vector<TaskPtr> &tasks) {
|
||||
}
|
||||
|
||||
|
||||
TaskTableItem &
|
||||
TaskTableItemPtr
|
||||
TaskTable::Get(uint64_t index) {
|
||||
return table_[index];
|
||||
}
|
||||
@ -46,9 +46,9 @@ bool
|
||||
TaskTable::Move(uint64_t index) {
|
||||
auto &task = table_[index];
|
||||
|
||||
std::lock_guard<std::mutex> lock(task.mutex);
|
||||
if (task.state == TaskTableItemState::START) {
|
||||
task.state = TaskTableItemState::LOADING;
|
||||
std::lock_guard<std::mutex> lock(task->mutex);
|
||||
if (task->state == TaskTableItemState::START) {
|
||||
task->state = TaskTableItemState::LOADING;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
||||
@ -31,7 +31,7 @@ struct TaskTableItem {
|
||||
TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {}
|
||||
|
||||
TaskTableItem(const TaskTableItem &src)
|
||||
: id(src.id), state(src.state), mutex(), priority(src.priority) {}
|
||||
: id(src.id), state(src.state), mutex(), priority(src.priority) {}
|
||||
|
||||
uint64_t id; // auto increment from 0;
|
||||
// TODO: add tag into task
|
||||
@ -42,6 +42,8 @@ struct TaskTableItem {
|
||||
uint8_t priority; // just a number, meaningless;
|
||||
};
|
||||
|
||||
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
|
||||
|
||||
class TaskTable {
|
||||
public:
|
||||
TaskTable() = default;
|
||||
@ -65,7 +67,7 @@ public:
|
||||
/*
|
||||
* Return task table item reference;
|
||||
*/
|
||||
TaskTableItem &
|
||||
TaskTableItemPtr
|
||||
Get(uint64_t index);
|
||||
|
||||
/*
|
||||
@ -75,7 +77,7 @@ public:
|
||||
*/
|
||||
void
|
||||
Clear();
|
||||
|
||||
|
||||
/*
|
||||
* Return true if task table empty, otherwise false;
|
||||
*/
|
||||
@ -83,11 +85,11 @@ public:
|
||||
Empty() {
|
||||
return table_.empty();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Return size of task table;
|
||||
*/
|
||||
inline size_t
|
||||
inline size_t
|
||||
Size() {
|
||||
return table_.size();
|
||||
}
|
||||
@ -154,7 +156,7 @@ public:
|
||||
|
||||
private:
|
||||
// TODO: map better ?
|
||||
std::deque<TaskTableItem> table_;
|
||||
std::deque<TaskTableItemPtr> table_;
|
||||
};
|
||||
|
||||
|
||||
|
||||
@ -46,7 +46,7 @@ TaskPtr Resource::pick_task_load() {
|
||||
for (auto index : indexes) {
|
||||
// try to set one task loading, then return
|
||||
if (task_table_.Load(index))
|
||||
return task_table_.Get(index).task;
|
||||
return task_table_.Get(index)->task;
|
||||
// else try next
|
||||
}
|
||||
return nullptr;
|
||||
@ -57,7 +57,7 @@ TaskPtr Resource::pick_task_execute() {
|
||||
for (auto index : indexes) {
|
||||
// try to set one task executing, then return
|
||||
if (task_table_.Execute(index))
|
||||
return task_table_.Get(index).task;
|
||||
return task_table_.Get(index)->task;
|
||||
// else try next
|
||||
}
|
||||
return nullptr;
|
||||
|
||||
@ -13,14 +13,14 @@ protected:
|
||||
auto task = std::make_shared<XSearchTask>();
|
||||
table_.Put(task);
|
||||
}
|
||||
table_.Get(0).state = TaskTableItemState::INVALID;
|
||||
table_.Get(1).state = TaskTableItemState::START;
|
||||
table_.Get(2).state = TaskTableItemState::LOADING;
|
||||
table_.Get(3).state = TaskTableItemState::LOADED;
|
||||
table_.Get(4).state = TaskTableItemState::EXECUTING;
|
||||
table_.Get(5).state = TaskTableItemState::EXECUTED;
|
||||
table_.Get(6).state = TaskTableItemState::MOVING;
|
||||
table_.Get(7).state = TaskTableItemState::MOVED;
|
||||
table_.Get(0)->state = TaskTableItemState::INVALID;
|
||||
table_.Get(1)->state = TaskTableItemState::START;
|
||||
table_.Get(2)->state = TaskTableItemState::LOADING;
|
||||
table_.Get(3)->state = TaskTableItemState::LOADED;
|
||||
table_.Get(4)->state = TaskTableItemState::EXECUTING;
|
||||
table_.Get(5)->state = TaskTableItemState::EXECUTED;
|
||||
table_.Get(6)->state = TaskTableItemState::MOVING;
|
||||
table_.Get(7)->state = TaskTableItemState::MOVED;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -58,19 +58,19 @@ protected:
|
||||
|
||||
TEST_F(TaskTableBaseTest, put_task) {
|
||||
empty_table_.Put(task1_);
|
||||
ASSERT_EQ(empty_table_.Get(0).task, task1_);
|
||||
ASSERT_EQ(empty_table_.Get(0)->task, task1_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, put_invalid_test) {
|
||||
empty_table_.Put(invalid_task_);
|
||||
ASSERT_EQ(empty_table_.Get(0).task, invalid_task_);
|
||||
ASSERT_EQ(empty_table_.Get(0)->task, invalid_task_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, put_batch) {
|
||||
std::vector<TaskPtr> tasks{task1_, task2_};
|
||||
empty_table_.Put(tasks);
|
||||
ASSERT_EQ(empty_table_.Get(0).task, task1_);
|
||||
ASSERT_EQ(empty_table_.Get(1).task, task2_);
|
||||
ASSERT_EQ(empty_table_.Get(0)->task, task1_);
|
||||
ASSERT_EQ(empty_table_.Get(1)->task, task2_);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableBaseTest, put_empty_batch) {
|
||||
@ -89,14 +89,14 @@ protected:
|
||||
table1_.Put(task);
|
||||
}
|
||||
|
||||
table1_.Get(0).state = TaskTableItemState::INVALID;
|
||||
table1_.Get(1).state = TaskTableItemState::START;
|
||||
table1_.Get(2).state = TaskTableItemState::LOADING;
|
||||
table1_.Get(3).state = TaskTableItemState::LOADED;
|
||||
table1_.Get(4).state = TaskTableItemState::EXECUTING;
|
||||
table1_.Get(5).state = TaskTableItemState::EXECUTED;
|
||||
table1_.Get(6).state = TaskTableItemState::MOVING;
|
||||
table1_.Get(7).state = TaskTableItemState::MOVED;
|
||||
table1_.Get(0)->state = TaskTableItemState::INVALID;
|
||||
table1_.Get(1)->state = TaskTableItemState::START;
|
||||
table1_.Get(2)->state = TaskTableItemState::LOADING;
|
||||
table1_.Get(3)->state = TaskTableItemState::LOADED;
|
||||
table1_.Get(4)->state = TaskTableItemState::EXECUTING;
|
||||
table1_.Get(5)->state = TaskTableItemState::EXECUTED;
|
||||
table1_.Get(6)->state = TaskTableItemState::MOVING;
|
||||
table1_.Get(7)->state = TaskTableItemState::MOVED;
|
||||
}
|
||||
|
||||
TaskTable table1_;
|
||||
@ -106,22 +106,22 @@ TEST_F(TaskTableAdvanceTest, load) {
|
||||
table1_.Load(1);
|
||||
table1_.Loaded(2);
|
||||
|
||||
ASSERT_EQ(table1_.Get(1).state, TaskTableItemState::LOADING);
|
||||
ASSERT_EQ(table1_.Get(2).state, TaskTableItemState::LOADED);
|
||||
ASSERT_EQ(table1_.Get(1)->state, TaskTableItemState::LOADING);
|
||||
ASSERT_EQ(table1_.Get(2)->state, TaskTableItemState::LOADED);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, execute) {
|
||||
table1_.Execute(3);
|
||||
table1_.Executed(4);
|
||||
|
||||
ASSERT_EQ(table1_.Get(3).state, TaskTableItemState::EXECUTING);
|
||||
ASSERT_EQ(table1_.Get(4).state, TaskTableItemState::EXECUTED);
|
||||
ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::EXECUTING);
|
||||
ASSERT_EQ(table1_.Get(4)->state, TaskTableItemState::EXECUTED);
|
||||
}
|
||||
|
||||
TEST_F(TaskTableAdvanceTest, move) {
|
||||
table1_.Move(3);
|
||||
table1_.Moved(6);
|
||||
|
||||
ASSERT_EQ(table1_.Get(3).state, TaskTableItemState::MOVING);
|
||||
ASSERT_EQ(table1_.Get(6).state, TaskTableItemState::MOVED);
|
||||
ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::MOVING);
|
||||
ASSERT_EQ(table1_.Get(6)->state, TaskTableItemState::MOVED);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user