diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index f5eb13378d..c66b1df7fe 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -46,6 +46,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-405 - Add delete task support - MS-407 - Reconstruct MetricsCollector - MS-408 - Add device_id in resource construct function +- MS-409 - Using new scheduler ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index 055a2053f6..2792fbf5da 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -37,5 +37,40 @@ cache_config: engine_config: use_blas_threshold: 20 - metric_type: L2 # compare vectors by euclidean distance(L2) or inner product(IP), optional: L2 or IP - omp_thread_num: 0 # how many cpu cores be used by engine, 0 means use all cpu cores used + metric_type: L2 # compare vectors by euclidean distance(L2) or inner product(IP), optional: L2 or IP + omp_thread_num: 0 # how many compute threads be used by engine, 0 means use all cpu core to compute + +resource_config: + resources: + ssda: + type: DISK + memory: 2048 + device_id: 0 + enable_loader: true + enable_executor: false + + cpu: + type: CPU + memory: 64 + device_id: 0 + enable_loader: true + enable_executor: false + + gtx1060: + type: GPU + memory: 6 + device_id: 0 + enable_loader: true + enable_executor: true + + gtx1660: + type: GPU + memory: 6 + device_id: 1 + enable_loader: true + enable_executor: true + + connections: + - ssda===cpu + - cpu===gtx1060 + - cpu===gtx1660 diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 363935fd9c..9b7e28af5f 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -17,6 +17,19 @@ aux_source_directory(db/meta db_meta_files) aux_source_directory(metrics metrics_files) aux_source_directory(wrapper/knowhere knowhere_files) +aux_source_directory(scheduler/action scheduler_action_files) +aux_source_directory(scheduler/event scheduler_event_files) +aux_source_directory(scheduler/resource scheduler_resource_files) +aux_source_directory(scheduler/task scheduler_task_files) +aux_source_directory(scheduler scheduler_root_files) +set(scheduler_srcs + ${scheduler_action_files} + ${scheduler_event_files} + ${scheduler_resource_files} + ${scheduler_task_files} + ${scheduler_root_files} + ) + aux_source_directory(db/scheduler scheduler_files) aux_source_directory(db/scheduler/context scheduler_context_files) aux_source_directory(db/scheduler/task scheduler_task_files) @@ -210,6 +223,7 @@ if (MILVUS_WITH_THRIFT STREQUAL "ON") ${utils_files} ${thrift_service_files} ${metrics_files} + ${scheduler_srcs} ) else() add_executable(milvus_server @@ -219,6 +233,7 @@ else() ${utils_files} ${grpc_service_files} ${metrics_files} + ${scheduler_srcs} ) endif() diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 0910d04d7a..dcf2cd6091 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -23,6 +23,7 @@ #include #include #include +#include "scheduler/SchedInst.h" #include namespace zilliz { @@ -67,13 +68,15 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date //scheduler will determine when to delete table files TaskScheduler& scheduler = TaskScheduler::GetInstance(); - DeleteContextPtr context = std::make_shared(table_id, meta_ptr_); + DeleteContextPtr context = std::make_shared(table_id, + meta_ptr_, + ResMgrInst::GetInstance()->GetNumOfComputeResource()); scheduler.Schedule(context); + context->WaitAndDelete(); } else { meta_ptr_->DropPartitionsByDates(table_id, dates); } - return Status::OK(); } diff --git a/cpp/src/db/engine/ExecutionEngine.h b/cpp/src/db/engine/ExecutionEngine.h index 8f61a7c833..f266c1c3e1 100644 --- a/cpp/src/db/engine/ExecutionEngine.h +++ b/cpp/src/db/engine/ExecutionEngine.h @@ -51,6 +51,8 @@ public: virtual Status CopyToCpu() = 0; + virtual std::shared_ptr Clone() = 0; + virtual Status Merge(const std::string& location) = 0; virtual Status Search(long n, diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index 35cb88c453..5ef2fc0c72 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -181,6 +181,13 @@ Status ExecutionEngineImpl::CopyToCpu() { return Status::OK(); } +ExecutionEnginePtr ExecutionEngineImpl::Clone() { + auto ret = std::make_shared(dim_, location_, index_type_, metric_type_, nlist_); + ret->Init(); + ret->index_ = index_->Clone(); + return ret; +} + Status ExecutionEngineImpl::Merge(const std::string &location) { if (location == location_) { return Status::Error("Cannot Merge Self"); diff --git a/cpp/src/db/engine/ExecutionEngineImpl.h b/cpp/src/db/engine/ExecutionEngineImpl.h index 74d49d1926..4ccf97177a 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.h +++ b/cpp/src/db/engine/ExecutionEngineImpl.h @@ -50,6 +50,8 @@ public: Status CopyToCpu() override; + ExecutionEnginePtr Clone() override; + Status Merge(const std::string &location) override; Status Search(long n, diff --git a/cpp/src/db/scheduler/TaskScheduler.cpp b/cpp/src/db/scheduler/TaskScheduler.cpp index ef5942311e..8adf690be1 100644 --- a/cpp/src/db/scheduler/TaskScheduler.cpp +++ b/cpp/src/db/scheduler/TaskScheduler.cpp @@ -4,12 +4,15 @@ * Proprietary and confidential. ******************************************************************************/ +#include "server/ServerConfig.h" #include "TaskScheduler.h" #include "TaskDispatchQueue.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" #include "db/engine/EngineFactory.h" #include "scheduler/task/TaskConvert.h" +#include "scheduler/SchedInst.h" +#include "scheduler/ResourceFactory.h" namespace zilliz { namespace milvus { @@ -86,14 +89,22 @@ TaskScheduler::TaskDispatchWorker() { return true; } +#if 1 // TODO: Put task into Disk-TaskTable -// auto task = TaskConvert(task_ptr); -// DiskResourcePtr->task_table().Put(task) + auto task = TaskConvert(task_ptr); + auto disk_list = ResMgrInst::GetInstance()->GetDiskResources(); + if (!disk_list.empty()) { + if (auto disk = disk_list[0].lock()) { + disk->task_table().Put(task); + } + } +#else //execute task ScheduleTaskPtr next_task = task_ptr->Execute(); if(next_task != nullptr) { task_queue_.Put(next_task); } +#endif } return true; diff --git a/cpp/src/db/scheduler/context/DeleteContext.cpp b/cpp/src/db/scheduler/context/DeleteContext.cpp index bffeb9a134..c1987dc739 100644 --- a/cpp/src/db/scheduler/context/DeleteContext.cpp +++ b/cpp/src/db/scheduler/context/DeleteContext.cpp @@ -6,17 +6,33 @@ #include "DeleteContext.h" + namespace zilliz { namespace milvus { namespace engine { -DeleteContext::DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr) +DeleteContext::DeleteContext(const std::string &table_id, meta::Meta::Ptr &meta_ptr, uint64_t num_resource) : IScheduleContext(ScheduleContextType::kDelete), table_id_(table_id), - meta_ptr_(meta_ptr) { + meta_ptr_(meta_ptr), + num_resource_(num_resource) { } +void DeleteContext::WaitAndDelete() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { return done_resource == num_resource_; }); + meta_ptr_->DeleteTableFiles(table_id_); +} + +void DeleteContext::ResourceDone() { + { + std::lock_guard lock(mutex_); + ++done_resource; + } + cv_.notify_one(); +} + } } } \ No newline at end of file diff --git a/cpp/src/db/scheduler/context/DeleteContext.h b/cpp/src/db/scheduler/context/DeleteContext.h index efcc3bb6a7..e9ff2f1be2 100644 --- a/cpp/src/db/scheduler/context/DeleteContext.h +++ b/cpp/src/db/scheduler/context/DeleteContext.h @@ -7,6 +7,8 @@ #include "IScheduleContext.h" #include "db/meta/Meta.h" +#include +#include namespace zilliz { namespace milvus { @@ -14,14 +16,21 @@ namespace engine { class DeleteContext : public IScheduleContext { public: - DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr); + DeleteContext(const std::string& table_id, meta::Meta::Ptr& meta_ptr, uint64_t num_resource); std::string table_id() const { return table_id_; } meta::Meta::Ptr meta() const { return meta_ptr_; } + void WaitAndDelete(); + void ResourceDone(); private: std::string table_id_; meta::Meta::Ptr meta_ptr_; + + uint64_t num_resource_; + uint64_t done_resource = 0; + std::mutex mutex_; + std::condition_variable cv_; }; using DeleteContextPtr = std::shared_ptr; diff --git a/cpp/src/db/scheduler/task/DeleteTask.h b/cpp/src/db/scheduler/task/DeleteTask.h index 4617a943bb..866f0a7789 100644 --- a/cpp/src/db/scheduler/task/DeleteTask.h +++ b/cpp/src/db/scheduler/task/DeleteTask.h @@ -18,7 +18,7 @@ public: virtual std::shared_ptr Execute() override; -private: +public: DeleteContextPtr context_; }; diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 0ef8d7b01f..96e2589382 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -95,6 +95,13 @@ ResourceMgr::Stop() { } } +void +ResourceMgr::Clear() { + std::lock_guard lck(resources_mutex_); + disk_resources_.clear(); + resources_.clear(); +} + void ResourceMgr::PostEvent(const EventPtr &event) { std::lock_guard lock(event_mutex_); diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index c695e70ee4..40d1ab807b 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -68,6 +68,9 @@ public: void Stop(); + void + Clear(); + void PostEvent(const EventPtr &event); diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index 7d975c1643..972c2e2606 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -5,7 +5,8 @@ ******************************************************************************/ #include "SchedInst.h" - +#include "server/ServerConfig.h" +#include "ResourceFactory.h" namespace zilliz { namespace milvus { @@ -17,6 +18,40 @@ std::mutex ResMgrInst::mutex_; SchedulerPtr SchedInst::instance = nullptr; std::mutex SchedInst::mutex_; +void +SchedServInit() { + server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE); + auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren(); + for (auto &resource : resources) { + auto &resname = resource.first; + auto &resconf = resource.second; + auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE); +// auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY); + auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID); + auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER); + auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR); + + ResMgrInst::GetInstance()->Add(ResourceFactory::Create(resname, + type, + device_id, + enable_loader, + enable_executor)); + } + + auto default_connection = Connection("default_connection", 500.0); + auto connections = config.GetSequence(server::CONFIG_RESOURCE_CONNECTIONS); + for (auto &conn : connections) { + std::string delimiter = "==="; + std::string left = conn.substr(0, conn.find(delimiter)); + std::string right = conn.substr(conn.find(delimiter) + 3, conn.length()); + + ResMgrInst::GetInstance()->Connect(left, right, default_connection); + } + + ResMgrInst::GetInstance()->Start(); + SchedInst::GetInstance()->Start(); +} + } } } diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h index c05f525e3a..3ae36827a8 100644 --- a/cpp/src/scheduler/SchedInst.h +++ b/cpp/src/scheduler/SchedInst.h @@ -52,6 +52,9 @@ private: static std::mutex mutex_; }; +void +SchedServInit(); + } } } diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index b387e2de3b..7d342a3592 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -84,7 +84,9 @@ CollectFileMetrics(int file_type, size_t file_size) { XSearchTask::XSearchTask(TableFileSchemaPtr file) : file_(file) { index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, - (EngineType) file_->engine_type_); + (EngineType) file_->engine_type_, + (MetricType)file_->metric_type_, + file_->nlist_); } void diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp index b73a5aaf73..f1b40de43e 100644 --- a/cpp/src/server/Server.cpp +++ b/cpp/src/server/Server.cpp @@ -24,6 +24,7 @@ //#include #include #include +#include #include "metrics/Metrics.h" @@ -163,6 +164,7 @@ Server::Start() { signal(SIGTERM, SignalUtil::HandleSignal); server::Metrics::GetInstance().Init(); server::SystemInfo::GetInstance().Init(); + engine::SchedServInit(); std::cout << "Milvus server start successfully." << std::endl; StartService(); diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index 2979e35c15..f8f6deea98 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -49,6 +49,16 @@ static const std::string CONFIG_ENGINE = "engine_config"; static const std::string CONFIG_DCBT = "use_blas_threshold"; static const std::string CONFIG_OMP_THREAD_NUM = "omp_thread_num"; +static const char* CONFIG_RESOURCE = "resource_config"; +static const char* CONFIG_RESOURCES = "resources"; +static const char* CONFIG_RESOURCE_TYPE = "type"; +static const char* CONFIG_RESOURCE_MEMORY = "memory"; +static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id"; +static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader"; +static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor"; +static const char* CONFIG_RESOURCE_CONNECTIONS = "connections"; + + class ServerConfig { public: static ServerConfig &GetInstance(); diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index 6c451bf20f..564d03a828 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -10,6 +10,12 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files) aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files) aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs) + aux_source_directory(./ test_srcs) set(util_files @@ -40,6 +46,11 @@ set(db_test_src ${db_meta_files} ${db_scheduler_srcs} ${wrapper_src} + ${scheduler_action_srcs} + ${scheduler_event_srcs} + ${scheduler_resource_srcs} + ${scheduler_task_srcs} + ${scheduler_srcs} ${knowhere_src} ${util_files} ${require_files} diff --git a/cpp/unittest/db/scheduler_test.cpp b/cpp/unittest/db/scheduler_test.cpp index 4826fa4b16..dfb90fc3df 100644 --- a/cpp/unittest/db/scheduler_test.cpp +++ b/cpp/unittest/db/scheduler_test.cpp @@ -100,7 +100,7 @@ TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) { } engine::meta::Meta::Ptr meta_ptr; - engine::DeleteContextPtr context_ptr = std::make_shared(table_id, meta_ptr); + engine::DeleteContextPtr context_ptr = std::make_shared(table_id, meta_ptr, 0); ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); ASSERT_TRUE(ret); ASSERT_EQ(task_list.size(), 21); @@ -115,7 +115,7 @@ TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) { } } - context_ptr = std::make_shared("no_task_table", meta_ptr); + context_ptr = std::make_shared("no_task_table", meta_ptr, 0); ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list); ASSERT_TRUE(ret); ASSERT_EQ(task_list.size(), 22); diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index cfac3ea6e7..b624d4202a 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -59,12 +59,27 @@ engine::Options DBTest::GetOptions() { void DBTest::SetUp() { InitLog(); + + auto res_mgr = engine::ResMgrInst::GetInstance(); + res_mgr->Clear(); + res_mgr->Add(engine::ResourceFactory::Create("disk", "DISK", 0, true, false)); + res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, true)); + + auto default_conn = engine::Connection("IO", 500.0); + res_mgr->Connect("disk", "cpu", default_conn); + res_mgr->Start(); + engine::SchedInst::GetInstance()->Start(); + auto options = GetOptions(); db_ = engine::DBFactory::Build(options); } void DBTest::TearDown() { delete db_; + + engine::ResMgrInst::GetInstance()->Stop(); + engine::SchedInst::GetInstance()->Stop(); + boost::filesystem::remove_all("/tmp/milvus_test"); } @@ -117,6 +132,21 @@ void NewMemManagerTest::InitLog() { void NewMemManagerTest::SetUp() { InitLog(); + + auto res_mgr = engine::ResMgrInst::GetInstance(); + res_mgr->Clear(); + res_mgr->Add(engine::ResourceFactory::Create("disk", "DISK", 0, true, false)); + res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, true)); + + auto default_conn = engine::Connection("IO", 500.0); + res_mgr->Connect("disk", "cpu", default_conn); + res_mgr->Start(); + engine::SchedInst::GetInstance()->Start(); +} + +void NewMemManagerTest::TearDown() { + engine::ResMgrInst::GetInstance()->Stop(); + engine::SchedInst::GetInstance()->Stop(); } int main(int argc, char **argv) { diff --git a/cpp/unittest/db/utils.h b/cpp/unittest/db/utils.h index 4d3e318faa..83f7abef5a 100644 --- a/cpp/unittest/db/utils.h +++ b/cpp/unittest/db/utils.h @@ -13,6 +13,8 @@ #include "db/DB.h" #include "db/meta/SqliteMetaImpl.h" #include "db/meta/MySQLMetaImpl.h" +#include "scheduler/SchedInst.h" +#include "scheduler/ResourceFactory.h" #define TIMING @@ -91,4 +93,5 @@ class DISABLED_MySQLDBTest : public ::testing::Test { class NewMemManagerTest : public ::testing::Test { void InitLog(); void SetUp() override; + void TearDown() override; };