// CHANGELOG.md -- entry added under "## Feature":
//   - \#115 - Using new structure for tasktable

// ---------------------------------------------------------------------------
// core/src/scheduler/BuildMgr.h
// ---------------------------------------------------------------------------

/**
 * Mutex-protected counter of free "build" slots.
 *
 * The counter starts at `concurrent_limit`. Take() consumes one slot if any is
 * left, Put() gives one back. Both operations hold the same mutex, so the
 * check-and-decrement in Take() is a single indivisible step.
 *
 * Put() applies no upper bound: calling it more often than Take() succeeded
 * raises the number of free slots above the initial limit.
 */
class BuildMgr {
 public:
    /**
     * @param concurrent_limit number of slots initially available; with a value
     *        below 1 every Take() fails until Put() has been called often enough.
     */
    explicit BuildMgr(int64_t concurrent_limit) : available_(concurrent_limit) {
    }

 public:
    /** Return one slot to the pool. */
    void
    Put() {
        // Template argument spelled out so the header does not depend on
        // C++17 class template argument deduction.
        std::lock_guard<std::mutex> lock(mutex_);
        ++available_;
    }

    /**
     * Try to reserve one slot.
     *
     * @return true when a slot was reserved (the counter was decremented),
     *         false when none was free (the counter is left untouched).
     */
    bool
    Take() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (available_ < 1) {
            return false;
        }
        --available_;
        return true;
    }

 private:
    std::int64_t available_;  // slots currently free; guarded by mutex_
    std::mutex mutex_;
};

using BuildMgrPtr = std::shared_ptr<BuildMgr>;

// ---------------------------------------------------------------------------
// core/src/scheduler/CircleQueue.h (new file)
// ---------------------------------------------------------------------------
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>
#include <cstdint>
#include <stdexcept>
#include <utility>
#include <vector>

namespace milvus {
namespace scheduler {

/**
 * Fixed-capacity ring buffer whose elements are addressed by logical indexes.
 *
 * Positions (all are slots in [0, capacity())):
 *   - front(): the slot most recently released through set_front(); the live
 *              window starts at front() + 1. It starts out at capacity() - 1.
 *   - rear():  the slot the next put() writes to. It starts out at 0.
 *   - empty  <=>  (front() + 1) % capacity() == rear()
 *   - full   <=>  rear() == front(), so at most capacity() - 1 elements are
 *                 live at the same time.
 *
 * operator[] reduces its argument modulo capacity(), which lets callers walk
 * front() + 1, front() + 2, ... without wrapping the index themselves.
 *
 * size() is not the number of live elements: it is incremented by put(),
 * saturates at capacity() and never decreases.
 *
 * Only front_ is atomic. rear_, size_ and the slots are plain members, so a
 * put() running concurrently with readers needs synchronization supplied by
 * the caller.
 *
 * @tparam T element type; it must be default-constructible, because unused
 *           slots hold a value-initialized T.
 */
template <typename T>
class CircleQueue {
    using value_type = T;
    using atomic_size_type = std::atomic_ullong;
    using size_type = uint64_t;
    using const_reference = const value_type&;

 public:
    /**
     * @param cap number of slots; must be greater than zero.
     * @throws std::invalid_argument if cap is zero (cap - 1 would wrap around
     *         and every index computation would divide by zero).
     */
    explicit CircleQueue(size_type cap) : data_(checked_capacity(cap)), capacity_(cap), front_(cap - 1) {
    }

    CircleQueue() = delete;
    CircleQueue(const CircleQueue& q) = delete;
    CircleQueue(CircleQueue&& q) = delete;

 public:
    /** Element stored in slot n % capacity(); unused slots hold a value-initialized T. */
    const_reference operator[](size_type n) const {
        return data_[n % capacity_];
    }

    /** Slot most recently released by set_front(). */
    size_type
    front() const {
        return front_.load(std::memory_order_seq_cst);
    }

    /** Slot the next put() will write to. */
    size_type
    rear() const {
        return rear_;
    }

    /** Number of put() calls so far, saturated at capacity(). */
    size_type
    size() const {
        return size_;
    }

    /** Number of slots. */
    size_type
    capacity() const {
        return capacity_;
    }

    /**
     * Release every element up to and including logical index last_finish.
     *
     * @param last_finish logical index; it is wrapped modulo capacity().
     * @throws std::out_of_range if the wrapped index equals rear(), i.e. the
     *         caller tries to release the slot that has not been written yet.
     */
    void
    set_front(uint64_t last_finish) {
        // Compare in slot space: rear_ is always < capacity_, while callers
        // pass un-wrapped logical indexes.
        const size_type slot = last_finish % capacity_;
        if (slot == rear_) {
            throw std::out_of_range("CircleQueue::set_front: index refers to the rear slot");
        }
        front_.store(slot, std::memory_order_seq_cst);
    }

    /**
     * Append a copy of x at rear().
     * @throws std::overflow_error if the queue is full.
     */
    void
    put(const value_type& x) {
        store_at_rear(x);
    }

    /**
     * Append x at rear(), moving from it.
     * @throws std::overflow_error if the queue is full.
     */
    void
    put(value_type&& x) {
        store_at_rear(std::move(x));
    }

 private:
    /** Validate the constructor argument before any member is built from it. */
    static size_type
    checked_capacity(size_type cap) {
        if (cap == 0) {
            throw std::invalid_argument("CircleQueue: capacity must be greater than zero");
        }
        return cap;
    }

    /** Shared body of both put() overloads. */
    template <typename U>
    void
    store_at_rear(U&& x) {
        // A bare `throw;` here would call std::terminate because no exception
        // is being handled; throw a real exception the caller can catch.
        if (rear_ == front_.load(std::memory_order_seq_cst)) {
            throw std::overflow_error("CircleQueue::put: queue is full");
        }
        data_[rear_] = std::forward<U>(x);
        rear_ = (rear_ + 1) % capacity_;
        if (size_ < capacity_) {
            ++size_;
        }
    }

 private:
    std::vector<T> data_;
    size_type capacity_;
    atomic_size_type front_;
    size_type rear_ = 0;
    size_type size_ = 0;
};

}  // namespace scheduler
}  // namespace milvus
data_; + size_type capacity_; + atomic_size_type front_; + size_type rear_ = 0; + size_type size_ = 0; +#undef MEMORY_ORDER +}; + +} // namespace scheduler +} // namespace milvus diff --git a/core/src/scheduler/TaskTable.cpp b/core/src/scheduler/TaskTable.cpp index d0e6c1c38b..e35c7cd255 100644 --- a/core/src/scheduler/TaskTable.cpp +++ b/core/src/scheduler/TaskTable.cpp @@ -20,6 +20,7 @@ #include "event/TaskTableUpdatedEvent.h" #include "scheduler/SchedInst.h" #include "utils/Log.h" +#include "utils/TimeRecorder.h" #include #include @@ -153,7 +154,42 @@ TaskTableItem::Dump() const { std::vector TaskTable::PickToLoad(uint64_t limit) { - std::lock_guard lock(mutex_); +#if 1 + TimeRecorder rc(""); + std::vector indexes; + bool cross = false; + + uint64_t available_begin = table_.front() + 1; + for (uint64_t i = 0, loaded_count = 0, pick_count = 0; i < table_.size() && pick_count < limit; ++i) { + auto index = available_begin + i; + if (not table_[index]) + break; + if (index % table_.capacity() == table_.rear()) + break; + if (not cross && table_[index]->IsFinish()) { + table_.set_front(index); + } else if (table_[index]->state == TaskTableItemState::LOADED) { + cross = true; + ++loaded_count; + if (loaded_count > 2) + return std::vector(); + } else if (table_[index]->state == TaskTableItemState::START) { + auto task = table_[index]->task; + + // if task is a build index task, limit it + if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") { + if (not BuildMgrInst::GetInstance()->Take()) { + continue; + } + } + cross = true; + indexes.push_back(index); + ++pick_count; + } + } + rc.ElapseFromBegin("PickToLoad "); + return indexes; +#else size_t count = 0; for (uint64_t j = last_finish_ + 1; j < table_.size(); ++j) { if (not table_[j]) { @@ -197,34 +233,44 @@ TaskTable::PickToLoad(uint64_t limit) { } } return indexes; +#endif } std::vector TaskTable::PickToExecute(uint64_t limit) { - std::lock_guard lock(mutex_); + TimeRecorder rc(""); 
std::vector indexes; bool cross = false; - for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) { - if (not cross && table_[i]->IsFinish()) { - last_finish_ = i; - } else if (table_[i]->state == TaskTableItemState::LOADED) { + uint64_t available_begin = table_.front() + 1; + for (uint64_t i = 0, pick_count = 0; i < table_.size() && pick_count < limit; ++i) { + uint64_t index = available_begin + i; + if (not table_[index]) { + break; + } + if (index % table_.capacity() == table_.rear()) { + break; + } + + if (not cross && table_[index]->IsFinish()) { + table_.set_front(index); + } else if (table_[index]->state == TaskTableItemState::LOADED) { cross = true; - indexes.push_back(i); - ++count; + indexes.push_back(index); + ++pick_count; } } + rc.ElapseFromBegin("PickToExecute "); return indexes; } void TaskTable::Put(TaskPtr task) { - std::lock_guard lock(mutex_); auto item = std::make_shared(); item->id = id_++; item->task = std::move(task); item->state = TaskTableItemState::START; item->timestamp.start = get_current_timestamp(); - table_.push_back(item); + table_.put(std::move(item)); if (subscriber_) { subscriber_(); } @@ -232,14 +278,13 @@ TaskTable::Put(TaskPtr task) { void TaskTable::Put(std::vector& tasks) { - std::lock_guard lock(mutex_); for (auto& task : tasks) { auto item = std::make_shared(); item->id = id_++; item->task = std::move(task); item->state = TaskTableItemState::START; item->timestamp.start = get_current_timestamp(); - table_.push_back(item); + table_.put(std::move(item)); } if (subscriber_) { subscriber_(); @@ -248,26 +293,25 @@ TaskTable::Put(std::vector& tasks) { TaskTableItemPtr TaskTable::Get(uint64_t index) { - std::lock_guard lock(mutex_); return table_[index]; } -// void -// TaskTable::Clear() { -//// find first task is NOT (done or moved), erase from begin to it; -//// auto iterator = table_.begin(); -//// while (iterator->state == TaskTableItemState::EXECUTED or -//// iterator->state == 
TaskTableItemState::MOVED) -//// iterator++; -//// table_.erase(table_.begin(), iterator); -//} +size_t +TaskTable::TaskToExecute() { + size_t count = 0; + auto begin = table_.front() + 1; + for (size_t i = 0; i < table_.size(); ++i) { + auto index = begin + i; + if (table_[index]->state == TaskTableItemState::LOADED) { + ++count; + } + } + return count; +} json TaskTable::Dump() const { - json ret; - for (auto& item : table_) { - ret.push_back(item->Dump()); - } + json ret{{"error.message", "not support yet."}}; return ret; } diff --git a/core/src/scheduler/TaskTable.h b/core/src/scheduler/TaskTable.h index a9d00043c2..052be66890 100644 --- a/core/src/scheduler/TaskTable.h +++ b/core/src/scheduler/TaskTable.h @@ -25,6 +25,7 @@ #include #include +#include "CircleQueue.h" #include "event/Event.h" #include "interface/interfaces.h" #include "task/SearchTask.h" @@ -99,7 +100,8 @@ using TaskTableItemPtr = std::shared_ptr; class TaskTable : public interface::dumpable { public: - TaskTable() = default; + TaskTable() : table_(1ULL << 16ULL) { + } TaskTable(const TaskTable&) = delete; TaskTable(TaskTable&&) = delete; @@ -128,20 +130,9 @@ class TaskTable : public interface::dumpable { TaskTableItemPtr Get(uint64_t index); - /* - * TODO(wxyu): BIG GC - * Remove sequence task which is DONE or MOVED from front; - * Called by ? 
- */ - // void - // Clear(); - - /* - * Return true if task table empty, otherwise false; - */ - inline bool - Empty() { - return table_.empty(); + inline size_t + Capacity() { + return table_.capacity(); } /* @@ -152,22 +143,14 @@ class TaskTable : public interface::dumpable { return table_.size(); } + size_t + TaskToExecute(); + public: - TaskTableItemPtr& operator[](uint64_t index) { - std::lock_guard lock(mutex_); + const TaskTableItemPtr& operator[](uint64_t index) { return table_[index]; } - std::deque::iterator - begin() { - return table_.begin(); - } - - std::deque::iterator - end() { - return table_.end(); - } - public: std::vector PickToLoad(uint64_t limit); @@ -249,8 +232,7 @@ class TaskTable : public interface::dumpable { private: std::uint64_t id_ = 0; - mutable std::mutex mutex_; - std::deque table_; + CircleQueue table_; std::function subscriber_ = nullptr; // cache last finish avoid Pick task from begin always diff --git a/core/src/scheduler/resource/Resource.cpp b/core/src/scheduler/resource/Resource.cpp index 1cd4cde609..2577617dab 100644 --- a/core/src/scheduler/resource/Resource.cpp +++ b/core/src/scheduler/resource/Resource.cpp @@ -123,12 +123,7 @@ Resource::Dump() const { uint64_t Resource::NumOfTaskToExec() { - uint64_t count = 0; - for (auto& task : task_table_) { - if (task->state == TaskTableItemState::LOADED) - ++count; - } - return count; + return task_table_.TaskToExecute(); } TaskTableItemPtr diff --git a/core/unittest/scheduler/test_tasktable.cpp b/core/unittest/scheduler/test_tasktable.cpp index e717e40285..97aa1dce66 100644 --- a/core/unittest/scheduler/test_tasktable.cpp +++ b/core/unittest/scheduler/test_tasktable.cpp @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. 
- +#include #include "scheduler/TaskTable.h" #include "scheduler/task/TestTask.h" #include "scheduler/tasklabel/DefaultLabel.h" -#include /************ TaskTableBaseTest ************/ @@ -28,15 +27,11 @@ class TaskTableItemTest : public ::testing::Test { void SetUp() override { std::vector states{ - milvus::scheduler::TaskTableItemState::INVALID, - milvus::scheduler::TaskTableItemState::START, - milvus::scheduler::TaskTableItemState::LOADING, - milvus::scheduler::TaskTableItemState::LOADED, - milvus::scheduler::TaskTableItemState::EXECUTING, - milvus::scheduler::TaskTableItemState::EXECUTED, - milvus::scheduler::TaskTableItemState::MOVING, - milvus::scheduler::TaskTableItemState::MOVED}; - for (auto &state : states) { + milvus::scheduler::TaskTableItemState::INVALID, milvus::scheduler::TaskTableItemState::START, + milvus::scheduler::TaskTableItemState::LOADING, milvus::scheduler::TaskTableItemState::LOADED, + milvus::scheduler::TaskTableItemState::EXECUTING, milvus::scheduler::TaskTableItemState::EXECUTED, + milvus::scheduler::TaskTableItemState::MOVING, milvus::scheduler::TaskTableItemState::MOVED}; + for (auto& state : states) { auto item = std::make_shared(); item->state = state; items_.emplace_back(item); @@ -59,9 +54,9 @@ TEST_F(TaskTableItemTest, DESTRUCT) { } TEST_F(TaskTableItemTest, IS_FINISH) { - for (auto &item : items_) { - if (item->state == milvus::scheduler::TaskTableItemState::EXECUTED - || item->state == milvus::scheduler::TaskTableItemState::MOVED) { + for (auto& item : items_) { + if (item->state == milvus::scheduler::TaskTableItemState::EXECUTED || + item->state == milvus::scheduler::TaskTableItemState::MOVED) { ASSERT_TRUE(item->IsFinish()); } else { ASSERT_FALSE(item->IsFinish()); @@ -70,13 +65,13 @@ TEST_F(TaskTableItemTest, IS_FINISH) { } TEST_F(TaskTableItemTest, DUMP) { - for (auto &item : items_) { + for (auto& item : items_) { ASSERT_FALSE(item->Dump().empty()); } } TEST_F(TaskTableItemTest, LOAD) { - for (auto &item : items_) { + for 
(auto& item : items_) { auto before_state = item->state; auto ret = item->Load(); if (before_state == milvus::scheduler::TaskTableItemState::START) { @@ -90,7 +85,7 @@ TEST_F(TaskTableItemTest, LOAD) { } TEST_F(TaskTableItemTest, LOADED) { - for (auto &item : items_) { + for (auto& item : items_) { auto before_state = item->state; auto ret = item->Loaded(); if (before_state == milvus::scheduler::TaskTableItemState::LOADING) { @@ -104,7 +99,7 @@ TEST_F(TaskTableItemTest, LOADED) { } TEST_F(TaskTableItemTest, EXECUTE) { - for (auto &item : items_) { + for (auto& item : items_) { auto before_state = item->state; auto ret = item->Execute(); if (before_state == milvus::scheduler::TaskTableItemState::LOADED) { @@ -118,7 +113,7 @@ TEST_F(TaskTableItemTest, EXECUTE) { } TEST_F(TaskTableItemTest, EXECUTED) { - for (auto &item : items_) { + for (auto& item : items_) { auto before_state = item->state; auto ret = item->Executed(); if (before_state == milvus::scheduler::TaskTableItemState::EXECUTING) { @@ -132,7 +127,7 @@ TEST_F(TaskTableItemTest, EXECUTED) { } TEST_F(TaskTableItemTest, MOVE) { - for (auto &item : items_) { + for (auto& item : items_) { auto before_state = item->state; auto ret = item->Move(); if (before_state == milvus::scheduler::TaskTableItemState::LOADED) { @@ -146,7 +141,7 @@ TEST_F(TaskTableItemTest, MOVE) { } TEST_F(TaskTableItemTest, MOVED) { - for (auto &item : items_) { + for (auto& item : items_) { auto before_state = item->state; auto ret = item->Moved(); if (before_state == milvus::scheduler::TaskTableItemState::MOVING) { @@ -180,9 +175,7 @@ class TaskTableBaseTest : public ::testing::Test { TEST_F(TaskTableBaseTest, SUBSCRIBER) { bool flag = false; - auto callback = [&]() { - flag = true; - }; + auto callback = [&]() { flag = true; }; empty_table_.RegisterSubscriber(callback); empty_table_.Put(task1_); ASSERT_TRUE(flag); @@ -210,12 +203,6 @@ TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) { empty_table_.Put(tasks); } -TEST_F(TaskTableBaseTest, EMPTY) { 
- ASSERT_TRUE(empty_table_.Empty()); - empty_table_.Put(task1_); - ASSERT_FALSE(empty_table_.Empty()); -} - TEST_F(TaskTableBaseTest, SIZE) { ASSERT_EQ(empty_table_.Size(), 0); empty_table_.Put(task1_); @@ -237,7 +224,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD) { auto indexes = empty_table_.PickToLoad(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0], 2); + ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); } TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) { @@ -250,9 +237,9 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_LIMIT) { auto indexes = empty_table_.PickToLoad(3); ASSERT_EQ(indexes.size(), 3); - ASSERT_EQ(indexes[0], 2); - ASSERT_EQ(indexes[1], 3); - ASSERT_EQ(indexes[2], 4); + ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); + ASSERT_EQ(indexes[1]% empty_table_.Capacity(), 3); + ASSERT_EQ(indexes[2]% empty_table_.Capacity(), 4); } TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) { @@ -266,14 +253,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_LOAD_CACHE) { // first pick, non-cache auto indexes = empty_table_.PickToLoad(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0], 2); + ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); // second pick, iterate from 2 // invalid state change empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START; indexes = empty_table_.PickToLoad(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0], 2); + ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); } TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) { @@ -287,7 +274,7 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE) { auto indexes = empty_table_.PickToExecute(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0], 2); + ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); } TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) { @@ -302,8 +289,8 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_LIMIT) { auto indexes = empty_table_.PickToExecute(3); ASSERT_EQ(indexes.size(), 2); - ASSERT_EQ(indexes[0], 2); - ASSERT_EQ(indexes[1], 3); + ASSERT_EQ(indexes[0] % 
empty_table_.Capacity(), 2); + ASSERT_EQ(indexes[1] % empty_table_.Capacity(), 3); } TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) { @@ -318,14 +305,14 @@ TEST_F(TaskTableBaseTest, PICK_TO_EXECUTE_CACHE) { // first pick, non-cache auto indexes = empty_table_.PickToExecute(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0], 2); + ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); // second pick, iterate from 2 // invalid state change empty_table_[1]->state = milvus::scheduler::TaskTableItemState::START; indexes = empty_table_.PickToExecute(1); ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0], 2); + ASSERT_EQ(indexes[0] % empty_table_.Capacity(), 2); } /************ TaskTableAdvanceTest ************/ @@ -356,8 +343,8 @@ class TaskTableAdvanceTest : public ::testing::Test { TEST_F(TaskTableAdvanceTest, LOAD) { std::vector before_state; - for (auto &task : table1_) { - before_state.push_back(task->state); + for (size_t i = 0; i < table1_.Size(); ++i) { + before_state.push_back(table1_[i]->state); } for (size_t i = 0; i < table1_.Size(); ++i) { @@ -375,8 +362,8 @@ TEST_F(TaskTableAdvanceTest, LOAD) { TEST_F(TaskTableAdvanceTest, LOADED) { std::vector before_state; - for (auto &task : table1_) { - before_state.push_back(task->state); + for (size_t i = 0; i < table1_.Size(); ++i) { + before_state.push_back(table1_[i]->state); } for (size_t i = 0; i < table1_.Size(); ++i) { @@ -394,8 +381,8 @@ TEST_F(TaskTableAdvanceTest, LOADED) { TEST_F(TaskTableAdvanceTest, EXECUTE) { std::vector before_state; - for (auto &task : table1_) { - before_state.push_back(task->state); + for (size_t i = 0; i < table1_.Size(); ++i) { + before_state.push_back(table1_[i]->state); } for (size_t i = 0; i < table1_.Size(); ++i) { @@ -413,8 +400,8 @@ TEST_F(TaskTableAdvanceTest, EXECUTE) { TEST_F(TaskTableAdvanceTest, EXECUTED) { std::vector before_state; - for (auto &task : table1_) { - before_state.push_back(task->state); + for (size_t i = 0; i < table1_.Size(); ++i) { + 
before_state.push_back(table1_[i]->state); } for (size_t i = 0; i < table1_.Size(); ++i) { @@ -432,8 +419,8 @@ TEST_F(TaskTableAdvanceTest, EXECUTED) { TEST_F(TaskTableAdvanceTest, MOVE) { std::vector before_state; - for (auto &task : table1_) { - before_state.push_back(task->state); + for (size_t i = 0; i < table1_.Size(); ++i) { + before_state.push_back(table1_[i]->state); } for (size_t i = 0; i < table1_.Size(); ++i) { @@ -451,8 +438,8 @@ TEST_F(TaskTableAdvanceTest, MOVE) { TEST_F(TaskTableAdvanceTest, MOVED) { std::vector before_state; - for (auto &task : table1_) { - before_state.push_back(task->state); + for (size_t i = 0; i < table1_.Size(); ++i) { + before_state.push_back(table1_[i]->state); } for (size_t i = 0; i < table1_.Size(); ++i) { @@ -467,4 +454,3 @@ TEST_F(TaskTableAdvanceTest, MOVED) { } } } -