From 71fa8bdd05dca378bfbb6ec042c5102436080d92 Mon Sep 17 00:00:00 2001 From: jinhai Date: Thu, 31 Oct 2019 11:03:22 +0000 Subject: [PATCH 1/2] Fix parallel merge issue Former-commit-id: 7eae24c65e50c6c0abc423b841c14bf16a8c0a1d --- core/src/scheduler/job/SearchJob.h | 4 ++++ core/src/scheduler/task/SearchTask.cpp | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/scheduler/job/SearchJob.h b/core/src/scheduler/job/SearchJob.h index 1e586090b9..40cf59e24e 100644 --- a/core/src/scheduler/job/SearchJob.h +++ b/core/src/scheduler/job/SearchJob.h @@ -90,6 +90,10 @@ class SearchJob : public Job { return index_files_; } + std::mutex& mutex() { + return mutex_; + } + private: uint64_t topk_ = 0; uint64_t nq_ = 0; diff --git a/core/src/scheduler/task/SearchTask.cpp b/core/src/scheduler/task/SearchTask.cpp index 1bf1caff76..7daa6268e8 100644 --- a/core/src/scheduler/task/SearchTask.cpp +++ b/core/src/scheduler/task/SearchTask.cpp @@ -219,8 +219,11 @@ XSearchTask::Execute() { // step 3: pick up topk result auto spec_k = index_engine_->Count() < topk ? index_engine_->Count() : topk; - XSearchTask::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk, metric_l2, + { + std::unique_lock lock(search_job->mutex()); + XSearchTask::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk, metric_l2, search_job->GetResult()); + } span = rc.RecordSection(hdr + ", reduce topk"); // search_job->AccumReduceCost(span); From d01c555604554db44dbdb4c7bcca962cce730fe3 Mon Sep 17 00:00:00 2001 From: jinhai Date: Thu, 31 Oct 2019 11:05:04 +0000 Subject: [PATCH 2/2] Fix lint Former-commit-id: 52ff843377f1463bb4cad7fcf67366b3facd8d10 --- core/src/scheduler/job/SearchJob.h | 3 ++- core/src/scheduler/task/SearchTask.cpp | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/scheduler/job/SearchJob.h b/core/src/scheduler/job/SearchJob.h index 40cf59e24e..90fcf36773 100644 --- a/core/src/scheduler/job/SearchJob.h +++ b/core/src/scheduler/job/SearchJob.h @@ -90,7 +90,8 @@ class SearchJob : public Job { return index_files_; } - std::mutex& mutex() { + std::mutex& + mutex() { return mutex_; } diff --git a/core/src/scheduler/task/SearchTask.cpp b/core/src/scheduler/task/SearchTask.cpp index 7daa6268e8..edeb41bdbe 100644 --- a/core/src/scheduler/task/SearchTask.cpp +++ b/core/src/scheduler/task/SearchTask.cpp @@ -222,7 +222,7 @@ XSearchTask::Execute() { { std::unique_lock lock(search_job->mutex()); XSearchTask::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk, metric_l2, - search_job->GetResult()); + search_job->GetResult()); } span = rc.RecordSection(hdr + ", reduce topk");