MS-525 Disable parallel reduce in SearchTask

Former-commit-id: d5a39b47988f8d98cd31f3b5d5801d19dfc3f249
This commit is contained in:
wxyu 2019-09-09 14:59:42 +08:00
parent f00a5a11fa
commit 6375003116
2 changed files with 50 additions and 49 deletions

View File

@ -104,6 +104,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-519 - Add event_test in scheduler
- MS-520 - Update resource_test in scheduler
- MS-524 - Add some unittest in event_test and resource_test
- MS-525 - Disable parallel reduce in SearchTask
## New Feature
- MS-343 - Implement ResourceMgr

View File

@ -20,47 +20,47 @@ namespace engine {
static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000;
static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;
bool
NeedParallelReduce(uint64_t nq, uint64_t topk) {
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB);
bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
if (!need_parallel) {
return false;
}
return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
}
void
ParallelReduce(std::function<void(size_t, size_t)> &reduce_function, size_t max_index) {
size_t reduce_batch = PARALLEL_REDUCE_BATCH;
auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
if (thread_count > 0) {
reduce_batch = max_index / thread_count + 1;
}
ENGINE_LOG_DEBUG << "use " << thread_count <<
" thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
std::vector<std::shared_ptr<std::thread> > thread_array;
size_t from_index = 0;
while (from_index < max_index) {
size_t to_index = from_index + reduce_batch;
if (to_index > max_index) {
to_index = max_index;
}
auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
thread_array.push_back(reduce_thread);
from_index = to_index;
}
for (auto &thread_ptr : thread_array) {
thread_ptr->join();
}
}
//bool
//NeedParallelReduce(uint64_t nq, uint64_t topk) {
// server::ServerConfig &config = server::ServerConfig::GetInstance();
// server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB);
// bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
// if (!need_parallel) {
// return false;
// }
//
// return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
//}
//
//void
//ParallelReduce(std::function<void(size_t, size_t)> &reduce_function, size_t max_index) {
// size_t reduce_batch = PARALLEL_REDUCE_BATCH;
//
// auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
// if (thread_count > 0) {
// reduce_batch = max_index / thread_count + 1;
// }
// ENGINE_LOG_DEBUG << "use " << thread_count <<
// " thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
//
// std::vector<std::shared_ptr<std::thread> > thread_array;
// size_t from_index = 0;
// while (from_index < max_index) {
// size_t to_index = from_index + reduce_batch;
// if (to_index > max_index) {
// to_index = max_index;
// }
//
// auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
// thread_array.push_back(reduce_thread);
//
// from_index = to_index;
// }
//
// for (auto &thread_ptr : thread_array) {
// thread_ptr->join();
// }
//}
void
CollectFileMetrics(int file_type, size_t file_size) {
@ -238,11 +238,11 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
}
};
if (NeedParallelReduce(nq, topk)) {
ParallelReduce(reduce_worker, nq);
} else {
// if (NeedParallelReduce(nq, topk)) {
// ParallelReduce(reduce_worker, nq);
// } else {
reduce_worker(0, nq);
}
// }
return Status::OK();
}
@ -343,11 +343,11 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
}
};
if (NeedParallelReduce(result_src.size(), topk)) {
ParallelReduce(ReduceWorker, result_src.size());
} else {
// if (NeedParallelReduce(result_src.size(), topk)) {
// ParallelReduce(ReduceWorker, result_src.size());
// } else {
ReduceWorker(0, result_src.size());
}
// }
return Status::OK();
}