From 0d3da0b84124cd56aa1c6453a6e6756a8fa5d8a4 Mon Sep 17 00:00:00 2001 From: wxyu Date: Tue, 8 Oct 2019 15:03:16 +0800 Subject: [PATCH 1/3] MS-611 Add resources validity check in ResourceMgr Former-commit-id: 0957d8980a07a07434bc2bea3ecff686a56cb127 --- cpp/src/scheduler/ResourceMgr.cpp | 4 ++++ cpp/src/scheduler/task/SearchTask.cpp | 1 + 2 files changed, 5 insertions(+) diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 3ea8a56ef8..d9da1f16bf 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -104,6 +104,10 @@ ResourceMgr::Connect(const std::string& name1, const std::string& name2, Connect void ResourceMgr::Clear() { std::lock_guard lck(resources_mutex_); + if (running_) { + ENGINE_LOG_ERROR << "ResourceMgr is running, cannot clear."; + return; + } disk_resources_.clear(); cpu_resources_.clear(); gpu_resources_.clear(); diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index 2beff8f4c3..41e6582047 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -34,6 +34,7 @@ static constexpr size_t PARALLEL_REDUCE_BATCH = 1000; std::mutex XSearchTask::merge_mutex_; +// TODO(wxyu): remove unused code // bool // NeedParallelReduce(uint64_t nq, uint64_t topk) { // server::ServerConfig &config = server::ServerConfig::GetInstance(); From d0e7744abce4e26a002c119f8bfe17e17e18a9d3 Mon Sep 17 00:00:00 2001 From: wxyu Date: Tue, 8 Oct 2019 15:24:50 +0800 Subject: [PATCH 2/3] MS-611 Add resources validity check in ResourceMgr Former-commit-id: cc00283529c2e56ee9829c2f42d7c217d1ba3f42 --- cpp/src/scheduler/JobMgr.cpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/cpp/src/scheduler/JobMgr.cpp b/cpp/src/scheduler/JobMgr.cpp index 25a4932f03..bf22f7be2e 100644 --- a/cpp/src/scheduler/JobMgr.cpp +++ b/cpp/src/scheduler/JobMgr.cpp @@ -66,12 +66,10 @@ JobMgr::worker_function() { } auto tasks = build_task(job); - auto disk_list = res_mgr_->GetDiskResources(); - if (!disk_list.empty()) { - if (auto disk = disk_list[0].lock()) { - for (auto& task : tasks) { - disk->task_table().Put(task); - } + // disk resources NEVER be empty. + if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { + for (auto& task : tasks) { + disk->task_table().Put(task); } } } From 1040298ec061027dc8744009f4827b7d9ae0ad8f Mon Sep 17 00:00:00 2001 From: starlord Date: Tue, 8 Oct 2019 15:44:03 +0800 Subject: [PATCH 3/3] refine code Former-commit-id: 5303054b0769e7f03a16ede2ff42f7068663b889 --- cpp/src/db/meta/MetaConsts.h | 5 --- cpp/src/scheduler/ResourceMgr.cpp | 20 ++++++------ cpp/src/scheduler/task/SearchTask.cpp | 12 +++----- cpp/src/scheduler/task/SearchTask.h | 9 ++---- cpp/unittest/db/test_db.cpp | 5 +-- cpp/unittest/db/test_db_mysql.cpp | 5 +-- cpp/unittest/db/test_mem.cpp | 2 +- cpp/unittest/db/test_meta.cpp | 3 +- cpp/unittest/db/test_meta_mysql.cpp | 2 +- cpp/unittest/db/test_search.cpp | 44 +++++++++++++-------------- 10 files changed, 49 insertions(+), 58 deletions(-) diff --git a/cpp/src/db/meta/MetaConsts.h b/cpp/src/db/meta/MetaConsts.h index e963a63f43..4e40ff7731 100644 --- a/cpp/src/db/meta/MetaConsts.h +++ b/cpp/src/db/meta/MetaConsts.h @@ -21,11 +21,6 @@ namespace milvus { namespace engine { namespace meta { -const size_t K = 1024UL; -const size_t M = K * K; -const size_t G = K * M; -const size_t T = K * G; - const size_t S_PS = 1UL; const size_t MS_PS = 1000 * S_PS; const size_t US_PS = 1000 * MS_PS; diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 3ea8a56ef8..1807f6f83f 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -79,9 +79,7 @@ ResourceMgr::Add(ResourcePtr&& resource) { gpu_resources_.emplace_back(ResourceWPtr(resource)); break; } - default: { - break; - } + default: { break; } } resources_.emplace_back(resource); @@ -196,13 +194,19 @@ bool ResourceMgr::check_resource_valid() { { // TODO: check one disk-resource, one cpu-resource, zero or more gpu-resource; - if (GetDiskResources().size() != 1) return false; - if (GetCpuResources().size() != 1) return false; + if (GetDiskResources().size() != 1) { + return false; + } + if (GetCpuResources().size() != 1) { + return false; + } } { // TODO: one compute-resource at least; - if (GetNumOfComputeResource() < 1) return false; + if (GetNumOfComputeResource() < 1) { + return false; + } } { @@ -233,9 +237,7 @@ void ResourceMgr::event_process() { while (running_) { std::unique_lock lock(event_mutex_); - event_cv_.wait(lock, [this] { - return !queue_.empty(); - }); + event_cv_.wait(lock, [this] { return !queue_.empty(); }); auto event = queue_.front(); queue_.pop(); diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index 20962d8a10..36eee0e18f 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -22,6 +22,7 @@ #include "utils/Log.h" #include "utils/TimeRecorder.h" +#include #include #include #include @@ -229,13 +230,8 @@ XSearchTask::Execute() { } Status -XSearchTask::TopkResult(const std::vector &input_ids, - const std::vector &input_distance, - uint64_t input_k, - uint64_t nq, - uint64_t topk, - bool ascending, - scheduler::ResultSet &result) { +XSearchTask::TopkResult(const std::vector& input_ids, const std::vector& input_distance, + uint64_t input_k, uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result) { scheduler::ResultSet result_buf; if (result.empty()) { @@ -265,7 +261,7 @@ XSearchTask::TopkResult(const std::vector &input_ids, auto& result_buf_item = result_buf_i[buf_k]; auto& result_item = result_i[tar_k]; if ((ascending && input_distance[src_idx] < result_item.second) || - (!ascending && input_distance[src_idx] > result_item.second)) { + (!ascending && input_distance[src_idx] > result_item.second)) { result_buf_item.first = input_ids[src_idx]; result_buf_item.second = input_distance[src_idx]; src_k++; diff --git a/cpp/src/scheduler/task/SearchTask.h b/cpp/src/scheduler/task/SearchTask.h index 92d7235c6b..fd5c8a0d1d 100644 --- a/cpp/src/scheduler/task/SearchTask.h +++ b/cpp/src/scheduler/task/SearchTask.h @@ -39,13 +39,8 @@ class XSearchTask : public Task { public: static Status - TopkResult(const std::vector &input_ids, - const std::vector &input_distance, - uint64_t input_k, - uint64_t nq, - uint64_t topk, - bool ascending, - scheduler::ResultSet &result); + TopkResult(const std::vector& input_ids, const std::vector& input_distance, uint64_t input_k, + uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result); public: TableFileSchemaPtr file_; diff --git a/cpp/unittest/db/test_db.cpp b/cpp/unittest/db/test_db.cpp index 413e192335..eb1f947d61 100644 --- a/cpp/unittest/db/test_db.cpp +++ b/cpp/unittest/db/test_db.cpp @@ -18,6 +18,7 @@ #include "db/utils.h" #include "db/DB.h" #include "db/DBImpl.h" +#include "db/Constants.h" #include "db/meta/MetaConsts.h" #include "db/DBFactory.h" #include "cache/CpuCacheMgr.h" @@ -190,7 +191,7 @@ TEST_F(DBTest, DB_TEST) { START_TIMER; stat = db_->Query(TABLE_NAME, k, qb, 10, qxb.data(), results); - ss << "Search " << j << " With Size " << count / ms::engine::meta::M << " M"; + ss << "Search " << j << " With Size " << count / ms::engine::M << " M"; STOP_TIMER(ss.str()); ASSERT_TRUE(stat.ok()); @@ -439,7 +440,7 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) { db_->Size(size); LOG(DEBUG) << "size=" << size; - ASSERT_LE(size, 1 * ms::engine::meta::G); + ASSERT_LE(size, 1 * ms::engine::G); } TEST_F(DBTest2, DELETE_TEST) { diff --git a/cpp/unittest/db/test_db_mysql.cpp b/cpp/unittest/db/test_db_mysql.cpp index 89d92c1e06..3b73deb9cc 100644 --- a/cpp/unittest/db/test_db_mysql.cpp +++ b/cpp/unittest/db/test_db_mysql.cpp @@ -18,6 +18,7 @@ #include "db/utils.h" #include "db/DB.h" #include "db/DBImpl.h" +#include "db/Constants.h" #include "db/meta/MetaConsts.h" #include @@ -98,7 +99,7 @@ TEST_F(MySqlDBTest, DB_TEST) { START_TIMER; stat = db_->Query(TABLE_NAME, k, qb, 10, qxb.data(), results); - ss << "Search " << j << " With Size " << count / ms::engine::meta::M << " M"; + ss << "Search " << j << " With Size " << count / ms::engine::M << " M"; STOP_TIMER(ss.str()); ASSERT_TRUE(stat.ok()); @@ -236,7 +237,7 @@ TEST_F(MySqlDBTest, ARHIVE_DISK_CHECK) { db_->Size(size); LOG(DEBUG) << "size=" << size; - ASSERT_LE(size, 1 * ms::engine::meta::G); + ASSERT_LE(size, 1 * ms::engine::G); } TEST_F(MySqlDBTest, DELETE_TEST) { diff --git a/cpp/unittest/db/test_mem.cpp b/cpp/unittest/db/test_mem.cpp index a99631155e..1e465a69fb 100644 --- a/cpp/unittest/db/test_mem.cpp +++ b/cpp/unittest/db/test_mem.cpp @@ -329,7 +329,7 @@ TEST_F(MemManagerTest2, CONCURRENT_INSERT_SEARCH_TEST) { START_TIMER; stat = db_->Query(GetTableName(), k, qb, 10, qxb.data(), results); - ss << "Search " << j << " With Size " << count / ms::engine::meta::M << " M"; + ss << "Search " << j << " With Size " << count / ms::engine::M << " M"; STOP_TIMER(ss.str()); ASSERT_TRUE(stat.ok()); diff --git a/cpp/unittest/db/test_meta.cpp b/cpp/unittest/db/test_meta.cpp index b8eb690462..d88c087aa4 100644 --- a/cpp/unittest/db/test_meta.cpp +++ b/cpp/unittest/db/test_meta.cpp @@ -18,6 +18,7 @@ #include "db/utils.h" #include "db/meta/SqliteMetaImpl.h" #include "db/Utils.h" +#include "db/Constants.h" #include "db/meta/MetaConsts.h" #include @@ -192,7 +193,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) { for (auto i = 0; i < cnt; ++i) { status = impl.CreateTableFile(table_file); table_file.file_type_ = ms::engine::meta::TableFileSchema::NEW; - table_file.file_size_ = each_size * ms::engine::meta::G; + table_file.file_size_ = each_size * ms::engine::G; status = impl.UpdateTableFile(table_file); files.push_back(table_file); ids.push_back(table_file.id_); diff --git a/cpp/unittest/db/test_meta_mysql.cpp b/cpp/unittest/db/test_meta_mysql.cpp index c1d33a17a4..a825e8cbdd 100644 --- a/cpp/unittest/db/test_meta_mysql.cpp +++ b/cpp/unittest/db/test_meta_mysql.cpp @@ -213,7 +213,7 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DISK) { for (auto i = 0; i < cnt; ++i) { status = impl.CreateTableFile(table_file); table_file.file_type_ = ms::engine::meta::TableFileSchema::NEW; - table_file.file_size_ = each_size * ms::engine::meta::G; + table_file.file_size_ = each_size * ms::engine::G; status = impl.UpdateTableFile(table_file); files.push_back(table_file); ids.push_back(table_file.id_); diff --git a/cpp/unittest/db/test_search.cpp b/cpp/unittest/db/test_search.cpp index 0b13af0c51..348463357e 100644 --- a/cpp/unittest/db/test_search.cpp +++ b/cpp/unittest/db/test_search.cpp @@ -22,10 +22,10 @@ #include "scheduler/task/SearchTask.h" #include "utils/TimeRecorder.h" -using namespace milvus::scheduler; - namespace { +namespace ms = milvus::scheduler; + void BuildResult(uint64_t nq, uint64_t topk, @@ -45,14 +45,14 @@ BuildResult(uint64_t nq, } } -void CheckTopkResult(const std::vector &input_ids_1, +void CheckTopkResult(const std::vector &input_ids_1, const std::vector &input_distance_1, - const std::vector &input_ids_2, + const std::vector &input_ids_2, const std::vector &input_distance_2, uint64_t nq, uint64_t topk, bool ascending, - const ResultSet& result) { + const ms::ResultSet& result) { ASSERT_EQ(result.size(), nq); ASSERT_EQ(input_ids_1.size(), input_distance_1.size()); ASSERT_EQ(input_ids_2.size(), input_distance_2.size()); @@ -85,21 +85,21 @@ TEST(DBSearchTest, TOPK_TEST) { uint64_t NQ = 15; uint64_t TOP_K = 64; bool ascending; - std::vector ids1, ids2; + std::vector ids1, ids2; std::vector dist1, dist2; - ResultSet result; + ms::ResultSet result; milvus::Status status; /* test1, id1/dist1 valid, id2/dist2 empty */ ascending = true; BuildResult(NQ, TOP_K, ascending, ids1, dist1); - status = XSearchTask::TopkResult(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); /* test2, id1/dist1 valid, id2/dist2 valid */ BuildResult(NQ, TOP_K, ascending, ids2, dist2); - status = XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); @@ -108,9 +108,9 @@ TEST(DBSearchTest, TOPK_TEST) { dist1.clear(); result.clear(); BuildResult(NQ, TOP_K/2, ascending, ids1, dist1); - status = XSearchTask::TopkResult(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); - status = XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); @@ -119,9 +119,9 @@ TEST(DBSearchTest, TOPK_TEST) { dist2.clear(); result.clear(); BuildResult(NQ, TOP_K/3, ascending, ids2, dist2); - status = XSearchTask::TopkResult(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); - status = XSearchTask::TopkResult(ids2, dist2, TOP_K/3, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids2, dist2, TOP_K/3, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); @@ -135,13 +135,13 @@ TEST(DBSearchTest, TOPK_TEST) { /* test1, id1/dist1 valid, id2/dist2 empty */ BuildResult(NQ, TOP_K, ascending, ids1, dist1); - status = XSearchTask::TopkResult(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids1, dist1, TOP_K, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); /* test2, id1/dist1 valid, id2/dist2 valid */ BuildResult(NQ, TOP_K, ascending, ids2, dist2); - status = XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); @@ -150,9 +150,9 @@ TEST(DBSearchTest, TOPK_TEST) { dist1.clear(); result.clear(); BuildResult(NQ, TOP_K/2, ascending, ids1, dist1); - status = XSearchTask::TopkResult(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); - status = XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids2, dist2, TOP_K, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); @@ -161,9 +161,9 @@ TEST(DBSearchTest, TOPK_TEST) { dist2.clear(); result.clear(); BuildResult(NQ, TOP_K/3, ascending, ids2, dist2); - status = XSearchTask::TopkResult(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids1, dist1, TOP_K/2, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); - status = XSearchTask::TopkResult(ids2, dist2, TOP_K/3, NQ, TOP_K, ascending, result); + status = ms::XSearchTask::TopkResult(ids2, dist2, TOP_K/3, NQ, TOP_K, ascending, result); ASSERT_TRUE(status.ok()); CheckTopkResult(ids1, dist1, ids2, dist2, NQ, TOP_K, ascending, result); } @@ -173,9 +173,9 @@ TEST(DBSearchTest, REDUCE_PERF_TEST) { int32_t top_k = 1000; int32_t index_file_num = 478; /* sift1B dataset, index files num */ bool ascending = true; - std::vector input_ids; + std::vector input_ids; std::vector input_distance; - ResultSet final_result; + ms::ResultSet final_result; milvus::Status status; double span, reduce_cost = 0.0; @@ -187,7 +187,7 @@ TEST(DBSearchTest, REDUCE_PERF_TEST) { rc.RecordSection("do search for context: " + std::to_string(i)); // pick up topk result - status = XSearchTask::TopkResult(input_ids, input_distance, top_k, nq, top_k, ascending, final_result); + status = ms::XSearchTask::TopkResult(input_ids, input_distance, top_k, nq, top_k, ascending, final_result); ASSERT_TRUE(status.ok()); ASSERT_EQ(final_result.size(), nq);