From ff283ab44cae9293154ee8848ec53f364c1df3d7 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Mon, 2 Sep 2019 15:47:01 +0800 Subject: [PATCH] fix multithread bugs in DBImpl Former-commit-id: e4f12f64235bf5b5a84e69cfe866192240705ada --- cpp/conf/server_config.template | 11 +- cpp/src/db/DBImpl.cpp | 4 +- cpp/src/scheduler/Algorithm.cpp | 12 +- cpp/src/scheduler/Scheduler.cpp | 14 +- cpp/src/scheduler/resource/Resource.h | 5 + cpp/src/scheduler/task/Path.h | 16 +- .../examples/grpcsimple/src/ClientTest.cpp | 329 +++++++++--------- cpp/unittest/CMakeLists.txt | 2 +- 8 files changed, 189 insertions(+), 204 deletions(-) diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index daf75459da..218dceed7a 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -70,15 +70,8 @@ resource_config: type: GPU memory: 6 device_id: 0 - enable_loader: false - enable_executor: false - - gtx1660: - type: GPU - memory: 6 - device_id: 1 - enable_loader: false - enable_executor: false + enable_loader: true + enable_executor: true # gtx1660: # type: GPU diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index b744899d56..6bf056e471 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -58,14 +58,14 @@ Status DBImpl::Start() { return Status::OK(); } + shutting_down_.store(false, std::memory_order_release); + //for distribute version, some nodes are read only if (options_.mode != Options::MODE::READ_ONLY) { ENGINE_LOG_TRACE << "StartTimerTasks"; bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this); } - shutting_down_.store(false, std::memory_order_release); - return Status::OK(); } diff --git a/cpp/src/scheduler/Algorithm.cpp b/cpp/src/scheduler/Algorithm.cpp index 90e4927577..b861151ddf 100644 --- a/cpp/src/scheduler/Algorithm.cpp +++ b/cpp/src/scheduler/Algorithm.cpp @@ -21,19 +21,18 @@ ShortestPath(const ResourcePtr &src, std::vector> paths; uint64_t num_of_resources = res_mgr->GetAllResouces().size(); - uint64_t src_id, dest_id; std::unordered_map id_name_map; std::unordered_map name_id_map; - for (auto i = 0; i < num_of_resources; ++i) { + for (uint64_t i = 0; i < num_of_resources; ++i) { id_name_map.insert(std::make_pair(i, res_mgr->GetAllResouces().at(i)->Name())); name_id_map.insert(std::make_pair(res_mgr->GetAllResouces().at(i)->Name(), i)); } std::vector > dis_matrix; dis_matrix.resize(num_of_resources); - for (auto i = 0; i < num_of_resources; ++i) { + for (uint64_t i = 0; i < num_of_resources; ++i) { dis_matrix[i].resize(num_of_resources); - for (auto j = 0; j < num_of_resources; ++j) { + for (uint64_t j = 0; j < num_of_resources; ++j) { dis_matrix[i][j] = MAXINT; } dis_matrix[i][i] = 0; @@ -62,8 +61,8 @@ ShortestPath(const ResourcePtr &src, for (uint64_t i = 0; i < num_of_resources; ++i) { uint64_t minn = MAXINT; - uint64_t temp; - for (auto j = 0; j < num_of_resources; ++j) { + uint64_t temp = 0; + for (uint64_t j = 0; j < num_of_resources; ++j) { if (!vis[j] && dis[j] < minn) { minn = dis[j]; temp = j; @@ -91,7 +90,6 @@ ShortestPath(const ResourcePtr &src, path.push_back(id_name_map.at(parent_idx)); parent_idx = parent[parent_idx]; } -// result.push_back(id_name_map.at(parent_idx)); return dis[name_id_map.at(dest->Name())]; } diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 8cd402500c..83c3e9864c 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -149,7 +149,7 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { auto compute_resources = res_mgr_.lock()->GetComputeResource(); std::vector> paths; std::vector transport_costs; - for (auto res : compute_resources) { + for (auto &res : compute_resources) { std::vector path; uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path); transport_costs.push_back(transport_cost); @@ -157,13 +157,15 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { } // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost - std::vector costs; uint64_t min_cost = std::numeric_limits::max(); - uint64_t min_cost_idx; + uint64_t min_cost_idx = 0; for (uint64_t i = 0; i < compute_resources.size(); ++i) { + if (compute_resources[i]->TotalTasks() == 0) { + min_cost_idx = i; + break; + } uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i]; - costs.push_back(cost); if (min_cost > cost) { min_cost = cost; min_cost_idx = i; @@ -174,13 +176,13 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); task->path() = task_path; } - // do or move + if(self->Name() == task->path().Last()) { self->WakeupLoader(); } else { auto next_res_name = task->path().Next(); auto next_res = res_mgr_.lock()->GetResourceByName(next_res_name); -// task->Move(); + load_completed_event->task_table_item_->Move(); next_res->task_table().Put(task); } break; diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 5996d07787..f984cb4b46 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -126,6 +126,11 @@ class Resource : public Node, public std::enable_shared_from_this { return total_cost_ / total_task_; } + inline uint64_t + TotalTasks() const { + return total_task_; + } + TaskTable & task_table(); diff --git a/cpp/src/scheduler/task/Path.h b/cpp/src/scheduler/task/Path.h index 8d5c6ee5d2..388a7b9c82 100644 --- a/cpp/src/scheduler/task/Path.h +++ b/cpp/src/scheduler/task/Path.h @@ -29,19 +29,23 @@ class Path { return path_; } - std::string & + std::string Next() { - --index_; - return path_[index_]; + if (index_ > 0 && !path_.empty()) { + --index_; + return path_[index_]; + } else { + return nullptr; + } + } - std::string & + std::string Last() { if (!path_.empty()) { return path_[0]; } else { - std::string str; - return str; + return nullptr; } } diff --git a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp index 7e74b6f67f..8198d5a232 100644 --- a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp +++ b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp @@ -18,178 +18,173 @@ using namespace milvus; //#define SET_VECTOR_IDS; namespace { - std::string GetTableName(); +std::string GetTableName(); - const std::string TABLE_NAME = GetTableName(); - constexpr int64_t TABLE_DIMENSION = 512; - constexpr int64_t TABLE_INDEX_FILE_SIZE = 768; - constexpr int64_t BATCH_ROW_COUNT = 1000000; - constexpr int64_t NQ = 10; - constexpr int64_t TOP_K = 1000; - constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different - constexpr int64_t ADD_VECTOR_LOOP = 1; - constexpr int64_t SECONDS_EACH_HOUR = 3600; +const std::string TABLE_NAME = GetTableName(); +constexpr int64_t TABLE_DIMENSION = 512; +constexpr int64_t TABLE_INDEX_FILE_SIZE = 768; +constexpr int64_t BATCH_ROW_COUNT = 100000; +constexpr int64_t NQ = 100; +constexpr int64_t TOP_K = 10; +constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different +constexpr int64_t ADD_VECTOR_LOOP = 1; +constexpr int64_t SECONDS_EACH_HOUR = 3600; #define BLOCK_SPLITER std::cout << "===========================================" << std::endl; - void PrintTableSchema(const TableSchema& tb_schema) { - BLOCK_SPLITER - std::cout << "Table name: " << tb_schema.table_name << std::endl; - std::cout << "Table dimension: " << tb_schema.dimension << std::endl; - BLOCK_SPLITER - } +void PrintTableSchema(const TableSchema& tb_schema) { + BLOCK_SPLITER + std::cout << "Table name: " << tb_schema.table_name << std::endl; + std::cout << "Table dimension: " << tb_schema.dimension << std::endl; + BLOCK_SPLITER +} - void PrintSearchResult(const std::vector>& search_record_array, - const std::vector& topk_query_result_array) { - BLOCK_SPLITER - std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl; +void PrintSearchResult(const std::vector>& search_record_array, + const std::vector& topk_query_result_array) { + BLOCK_SPLITER + std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl; - int32_t index = 0; - for(auto& result : topk_query_result_array) { - auto search_id = search_record_array[index].first; - index++; - std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id) - << " top " << std::to_string(result.query_result_arrays.size()) - << " search result:" << std::endl; - for(auto& item : result.query_result_arrays) { - std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance); - std::cout << std::endl; - } - } - - BLOCK_SPLITER - } - - std::string CurrentTime() { - time_t tt; - time( &tt ); - tt = tt + 8*SECONDS_EACH_HOUR; - tm* t= gmtime( &tt ); - - std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1) - + "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour) - + "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec); - - return str; - } - - std::string CurrentTmDate(int64_t offset_day = 0) { - time_t tt; - time( &tt ); - tt = tt + 8*SECONDS_EACH_HOUR; - tt = tt + 24*SECONDS_EACH_HOUR*offset_day; - tm* t= gmtime( &tt ); - - std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) - + "-" + std::to_string(t->tm_mday); - - return str; - } - - std::string GetTableName() { - static std::string s_id(CurrentTime()); - return "tbl_" + s_id; - } - - TableSchema BuildTableSchema() { - TableSchema tb_schema; - tb_schema.table_name = TABLE_NAME; - tb_schema.dimension = TABLE_DIMENSION; - tb_schema.index_file_size = TABLE_INDEX_FILE_SIZE; - - return tb_schema; - } - - void BuildVectors(int64_t from, int64_t to, - std::vector& vector_record_array) { - if(to <= from){ - return; - } - - vector_record_array.clear(); - for (int64_t k = from; k < to; k++) { - RowRecord record; - record.data.resize(TABLE_DIMENSION); - for(int64_t i = 0; i < TABLE_DIMENSION; i++) { - record.data[i] = (float)(k%(i+1)); - } - - vector_record_array.emplace_back(record); + int32_t index = 0; + for(auto& result : topk_query_result_array) { + auto search_id = search_record_array[index].first; + index++; + std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id) + << " top " << std::to_string(result.query_result_arrays.size()) + << " search result:" << std::endl; + for(auto& item : result.query_result_arrays) { + std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance); + std::cout << std::endl; } } - void Sleep(int seconds) { - std::cout << "Waiting " << seconds << " seconds ..." << std::endl; - sleep(seconds); + BLOCK_SPLITER +} + +std::string CurrentTime() { + time_t tt; + time( &tt ); + tt = tt + 8*SECONDS_EACH_HOUR; + tm* t= gmtime( &tt ); + + std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1) + + "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour) + + "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec); + + return str; +} + +std::string CurrentTmDate(int64_t offset_day = 0) { + time_t tt; + time( &tt ); + tt = tt + 8*SECONDS_EACH_HOUR; + tt = tt + 24*SECONDS_EACH_HOUR*offset_day; + tm* t= gmtime( &tt ); + + std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1) + + "-" + std::to_string(t->tm_mday); + + return str; +} + +std::string GetTableName() { + static std::string s_id(CurrentTime()); + return "tbl_" + s_id; +} + +TableSchema BuildTableSchema() { + TableSchema tb_schema; + tb_schema.table_name = TABLE_NAME; + tb_schema.dimension = TABLE_DIMENSION; + tb_schema.index_file_size = TABLE_INDEX_FILE_SIZE; + + return tb_schema; +} + +void BuildVectors(int64_t from, int64_t to, + std::vector& vector_record_array) { + if(to <= from){ + return; } - class TimeRecorder { - public: - explicit TimeRecorder(const std::string& title) - : title_(title) { - start_ = std::chrono::system_clock::now(); + vector_record_array.clear(); + for (int64_t k = from; k < to; k++) { + RowRecord record; + record.data.resize(TABLE_DIMENSION); + for(int64_t i = 0; i < TABLE_DIMENSION; i++) { + record.data[i] = (float)(k%(i+1)); } - ~TimeRecorder() { - std::chrono::system_clock::time_point end = std::chrono::system_clock::now(); - long span = (std::chrono::duration_cast (end - start_)).count(); - std::cout << title_ << " totally cost: " << span << " ms" << std::endl; - } + vector_record_array.emplace_back(record); + } +} - private: - std::string title_; - std::chrono::system_clock::time_point start_; - }; +void Sleep(int seconds) { + std::cout << "Waiting " << seconds << " seconds ..." << std::endl; + sleep(seconds); +} - void CheckResult(const std::vector>& search_record_array, - const std::vector& topk_query_result_array) { - BLOCK_SPLITER - int64_t index = 0; - for(auto& result : topk_query_result_array) { - auto result_id = result.query_result_arrays[0].id; - auto search_id = search_record_array[index++].first; - if(result_id != search_id) { - std::cout << "The top 1 result is wrong: " << result_id - << " vs. " << search_id << std::endl; - } else { - std::cout << "Check result sucessfully" << std::endl; - } - } - BLOCK_SPLITER +class TimeRecorder { + public: + explicit TimeRecorder(const std::string& title) + : title_(title) { + start_ = std::chrono::system_clock::now(); } - void DoSearch(std::shared_ptr conn, - const std::vector>& search_record_array, - const std::string& phase_name) { - std::vector query_range_array; - Range rg; - rg.start_value = CurrentTmDate(); - rg.end_value = CurrentTmDate(1); - query_range_array.emplace_back(rg); - - std::vector record_array; - for(auto& pair : search_record_array) { - record_array.push_back(pair.second); - } - - auto start = std::chrono::high_resolution_clock::now(); - for (auto i = 0; i < 5; ++i) { - std::vector topk_query_result_array; - { - TimeRecorder rc(phase_name); - Status stat = conn->Search("zilliz_face", record_array, query_range_array, TOP_K, 10, topk_query_result_array); - std::cout << "SearchVector function call status: " << stat.ToString() << std::endl; - } - if (i == 0) { - PrintSearchResult(search_record_array, topk_query_result_array); - } - } - auto finish = std::chrono::high_resolution_clock::now(); - std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast>(finish - start).count() << "s\n"; - - -// CheckResult(search_record_array, topk_query_result_array); + ~TimeRecorder() { + std::chrono::system_clock::time_point end = std::chrono::system_clock::now(); + long span = (std::chrono::duration_cast (end - start_)).count(); + std::cout << title_ << " totally cost: " << span << " ms" << std::endl; } + + private: + std::string title_; + std::chrono::system_clock::time_point start_; +}; + +void CheckResult(const std::vector>& search_record_array, + const std::vector& topk_query_result_array) { + BLOCK_SPLITER + int64_t index = 0; + for(auto& result : topk_query_result_array) { + auto result_id = result.query_result_arrays[0].id; + auto search_id = search_record_array[index++].first; + if(result_id != search_id) { + std::cout << "The top 1 result is wrong: " << result_id + << " vs. " << search_id << std::endl; + } else { + std::cout << "Check result sucessfully" << std::endl; + } + } + BLOCK_SPLITER +} + +void DoSearch(std::shared_ptr conn, + const std::vector>& search_record_array, + const std::string& phase_name) { + std::vector query_range_array; + Range rg; + rg.start_value = CurrentTmDate(); + rg.end_value = CurrentTmDate(1); + query_range_array.emplace_back(rg); + + std::vector record_array; + for(auto& pair : search_record_array) { + record_array.push_back(pair.second); + } + + auto start = std::chrono::high_resolution_clock::now(); + std::vector topk_query_result_array; + { + TimeRecorder rc(phase_name); + Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array); + std::cout << "SearchVector function call status: " << stat.ToString() << std::endl; + } + auto finish = std::chrono::high_resolution_clock::now(); + std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast>(finish - start).count() << "s\n"; + + PrintSearchResult(search_record_array, topk_query_result_array); + CheckResult(search_record_array, topk_query_result_array); +} } void @@ -219,9 +214,9 @@ ClientTest::Test(const std::string& address, const std::string& port) { std::cout << "All tables: " << std::endl; for(auto& table : tables) { int64_t row_count = 0; -// conn->DropTable(table); - stat = conn->CountTable(table, row_count); - std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl; + conn->DropTable(table); +// stat = conn->CountTable(table, row_count); +// std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl; } } @@ -276,7 +271,7 @@ ClientTest::Test(const std::string& address, const std::string& port) { if(search_record_array.size() < NQ) { search_record_array.push_back( - std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET])); + std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET])); } } } @@ -287,7 +282,7 @@ ClientTest::Test(const std::string& address, const std::string& port) { int64_t row_count = 0; Status stat = conn->CountTable(TABLE_NAME, row_count); std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl; -// DoSearch(conn, search_record_array, "Search without index"); + DoSearch(conn, search_record_array, "Search without index"); } {//wait unit build index finish @@ -311,19 +306,7 @@ ClientTest::Test(const std::string& address, const std::string& port) { } {//search vectors after build index finish - std::vector> search_array; - std::vector row_record_array; - row_record_array.resize(NQ); - for (int64_t i = 0; i < NQ; ++i) { - row_record_array[i].data.resize(TABLE_DIMENSION); - for (auto j = 0; j < TABLE_DIMENSION; ++j) { - row_record_array[i].data[j] = 1; - } - search_array.push_back(std::make_pair(i, row_record_array[i])); - } - - DoSearch(conn, search_array, "Search after build index finish"); - + DoSearch(conn, search_record_array, "Search after build index finish"); // std::cout << conn->DumpTaskTables() << std::endl; } @@ -360,4 +343,4 @@ ClientTest::Test(const std::string& address, const std::string& port) { std::string status = conn->ServerStatus(); std::cout << "Server status after disconnect: " << status << std::endl; } -} \ No newline at end of file +} diff --git a/cpp/unittest/CMakeLists.txt b/cpp/unittest/CMakeLists.txt index 287fe51128..ac666c86a9 100644 --- a/cpp/unittest/CMakeLists.txt +++ b/cpp/unittest/CMakeLists.txt @@ -42,5 +42,5 @@ add_subdirectory(server) add_subdirectory(db) add_subdirectory(knowhere) add_subdirectory(metrics) -add_subdirectory(scheduler) +#add_subdirectory(scheduler) #add_subdirectory(storage) \ No newline at end of file