diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 3c81868e32..83f1aa6998 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -104,6 +104,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-519 - Add event_test in scheduler - MS-520 - Update resource_test in scheduler - MS-524 - Add some unittest in event_test and resource_test +- MS-525 - Disable parallel reduce in SearchTask ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index aa526517fa..47d5532921 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -20,47 +20,47 @@ namespace engine { static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000; static constexpr size_t PARALLEL_REDUCE_BATCH = 1000; -bool -NeedParallelReduce(uint64_t nq, uint64_t topk) { - server::ServerConfig &config = server::ServerConfig::GetInstance(); - server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB); - bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false); - if (!need_parallel) { - return false; - } - - return nq * topk >= PARALLEL_REDUCE_THRESHOLD; -} - -void -ParallelReduce(std::function &reduce_function, size_t max_index) { - size_t reduce_batch = PARALLEL_REDUCE_BATCH; - - auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work - if (thread_count > 0) { - reduce_batch = max_index / thread_count + 1; - } - ENGINE_LOG_DEBUG << "use " << thread_count << - " thread parallelly do reduce, each thread process " << reduce_batch << " vectors"; - - std::vector > thread_array; - size_t from_index = 0; - while (from_index < max_index) { - size_t to_index = from_index + reduce_batch; - if (to_index > max_index) { - to_index = max_index; - } - - auto reduce_thread = std::make_shared(reduce_function, from_index, to_index); - thread_array.push_back(reduce_thread); - - from_index = to_index; - } - - for (auto &thread_ptr : thread_array) { - thread_ptr->join(); - } -} +//bool +//NeedParallelReduce(uint64_t nq, uint64_t topk) { +// server::ServerConfig &config = server::ServerConfig::GetInstance(); +// server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB); +// bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false); +// if (!need_parallel) { +// return false; +// } +// +// return nq * topk >= PARALLEL_REDUCE_THRESHOLD; +//} +// +//void +//ParallelReduce(std::function &reduce_function, size_t max_index) { +// size_t reduce_batch = PARALLEL_REDUCE_BATCH; +// +// auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work +// if (thread_count > 0) { +// reduce_batch = max_index / thread_count + 1; +// } +// ENGINE_LOG_DEBUG << "use " << thread_count << +// " thread parallelly do reduce, each thread process " << reduce_batch << " vectors"; +// +// std::vector > thread_array; +// size_t from_index = 0; +// while (from_index < max_index) { +// size_t to_index = from_index + reduce_batch; +// if (to_index > max_index) { +// to_index = max_index; +// } +// +// auto reduce_thread = std::make_shared(reduce_function, from_index, to_index); +// thread_array.push_back(reduce_thread); +// +// from_index = to_index; +// } +// +// for (auto &thread_ptr : thread_array) { +// thread_ptr->join(); +// } +//} void CollectFileMetrics(int file_type, size_t file_size) { @@ -238,11 +238,11 @@ Status XSearchTask::ClusterResult(const std::vector &output_ids, } }; - if (NeedParallelReduce(nq, topk)) { - ParallelReduce(reduce_worker, nq); - } else { +// if (NeedParallelReduce(nq, topk)) { +// ParallelReduce(reduce_worker, nq); +// } else { reduce_worker(0, nq); - } +// } return Status::OK(); } @@ -343,11 +343,11 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src, } }; - if (NeedParallelReduce(result_src.size(), topk)) { - ParallelReduce(ReduceWorker, result_src.size()); - } else { +// if (NeedParallelReduce(result_src.size(), topk)) { +// ParallelReduce(ReduceWorker, result_src.size()); +// } else { ReduceWorker(0, result_src.size()); - } +// } return Status::OK(); }