From bc74d16376b393c9876239ae2dbc3418f685906d Mon Sep 17 00:00:00 2001 From: "shengjun.li" <49774184+shengjun1985@users.noreply.github.com> Date: Tue, 12 May 2020 09:27:45 +0800 Subject: [PATCH] #2283 Suspend the building tasks when any query command arrives (#2285) * add builder suspend Signed-off-by: shengjun.li * IndexIVF builder check wait Signed-off-by: sahuang * Build suspend for all IVF Signed-off-by: sahuang * HNSW suspend Signed-off-by: sahuang * HNSW suspend Signed-off-by: sahuang * HNSW suspend Signed-off-by: sahuang * HNSW suspend Signed-off-by: sahuang * HNSW suspend Signed-off-by: sahuang * cpubuilder Signed-off-by: wxyu * add suspend check during annoy build index Signed-off-by: cmli * fix hnsw Signed-off-by: shengjun.li * fix changelog Signed-off-by: shengjun.li * fix clang format Signed-off-by: shengjun.li * suspend nsg Signed-off-by: shengjun.li Co-authored-by: sahuang Co-authored-by: wxyu Co-authored-by: cmli Co-authored-by: JinHai-CN --- CHANGELOG.md | 1 + core/src/db/DBImpl.cpp | 26 ++++++++ core/src/db/DBImpl.h | 9 +++ .../knowhere/index/vector_index/IndexHNSW.cpp | 2 + .../knowhere/index/vector_index/IndexIVF.cpp | 4 ++ .../vector_index/helpers/BuilderSuspend.h | 30 +++++++++ .../index/vector_index/impl/nsg/NSG.cpp | 4 ++ .../src/index/thirdparty/annoy/src/annoylib.h | 3 + .../index/thirdparty/faiss/BuilderSuspend.cpp | 35 ++++++++++ .../index/thirdparty/faiss/BuilderSuspend.h | 33 ++++++++++ .../thirdparty/faiss/utils/distances.cpp | 4 +- .../index/thirdparty/faiss/utils/distances.h | 1 + core/src/scheduler/CPUBuilder.cpp | 66 +++++++++++++++++++ core/src/scheduler/CPUBuilder.h | 55 ++++++++++++++++ core/src/scheduler/JobMgr.cpp | 7 +- core/src/scheduler/SchedInst.cpp | 5 ++ core/src/scheduler/SchedInst.h | 19 ++++++ 17 files changed, 302 insertions(+), 2 deletions(-) create mode 100644 core/src/index/knowhere/knowhere/index/vector_index/helpers/BuilderSuspend.h create mode 100644 core/src/index/thirdparty/faiss/BuilderSuspend.cpp create 
mode 100644 core/src/index/thirdparty/faiss/BuilderSuspend.h create mode 100644 core/src/scheduler/CPUBuilder.cpp create mode 100644 core/src/scheduler/CPUBuilder.h diff --git a/CHANGELOG.md b/CHANGELOG.md index c077578898..e45b6b8d17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ Please mark all change in change log and use the issue from GitHub - \#2206 Log file rotating - \#2240 Obtain running rpc requests information - \#2268 Intelligently detect openblas library in system to avoid installing from source code every time +- \#2283 Suspend the building tasks when any query command arrives. ## Improvement - \#221 Refactor LOG macro diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index b0abd4abec..e78f410573 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -33,6 +34,7 @@ #include "db/IDGenerator.h" #include "db/merge/MergeManagerFactory.h" #include "engine/EngineFactory.h" +#include "index/knowhere/knowhere/index/vector_index/helpers/BuilderSuspend.h" #include "index/thirdparty/faiss/utils/distances.h" #include "insert/MemManagerFactory.h" #include "meta/MetaConsts.h" @@ -1721,10 +1723,16 @@ DBImpl::QueryAsync(const std::shared_ptr& context, meta::FilesH job->AddIndexFile(file_ptr); } + // Suspend builder + SuspendIfFirst(); + // step 2: put search job to scheduler and wait result scheduler::JobMgrInst::GetInstance()->Put(job); job->WaitResult(); + // Resume builder + ResumeIfLast(); + files_holder.ReleaseFiles(); if (!job->GetStatus().ok()) { return job->GetStatus(); @@ -2649,5 +2657,23 @@ DBImpl::OnUseBlasThresholdChanged(int64_t threshold) { faiss::distance_compute_blas_threshold = threshold; } +void +DBImpl::SuspendIfFirst() { + std::lock_guard lock(suspend_build_mutex_); + if (++live_search_num_ == 1) { + LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_; + knowhere::BuilderSuspend(); + } +} + +void 
+DBImpl::ResumeIfLast() { + std::lock_guard lock(suspend_build_mutex_); + if (--live_search_num_ == 0) { + LOG_ENGINE_TRACE_ << "live_search_num_: " << live_search_num_; + knowhere::BuildResume(); + } +} + } // namespace engine } // namespace milvus diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 4843aaaa63..60a61fc691 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -278,6 +278,12 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi Status ExecWalRecord(const wal::MXLogRecord& record); + void + SuspendIfFirst(); + + void + ResumeIfLast(); + private: DBOptions options_; @@ -357,6 +363,9 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi IndexFailedChecker index_failed_checker_; std::mutex flush_merge_compact_mutex_; + + int64_t live_search_num_ = 0; + std::mutex suspend_build_mutex_; }; // DBImpl } // namespace engine diff --git a/core/src/index/knowhere/knowhere/index/vector_index/IndexHNSW.cpp b/core/src/index/knowhere/knowhere/index/vector_index/IndexHNSW.cpp index 858f60e121..045ea60375 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/IndexHNSW.cpp +++ b/core/src/index/knowhere/knowhere/index/vector_index/IndexHNSW.cpp @@ -17,6 +17,7 @@ #include #include +#include "faiss/BuilderSuspend.h" #include "hnswlib/hnswalg.h" #include "hnswlib/space_ip.h" #include "hnswlib/space_l2.h" @@ -124,6 +125,7 @@ IndexHNSW::Add(const DatasetPtr& dataset_ptr, const Config& config) { index_->addPoint(p_data, p_ids[0]); #pragma omp parallel for for (int i = 1; i < rows; ++i) { + faiss::BuilderSuspend::check_wait(); index_->addPoint(((float*)p_data + Dim() * i), p_ids[i]); } } diff --git a/core/src/index/knowhere/knowhere/index/vector_index/IndexIVF.cpp b/core/src/index/knowhere/knowhere/index/vector_index/IndexIVF.cpp index 2779f3ac93..0d208dc087 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/IndexIVF.cpp +++ 
b/core/src/index/knowhere/knowhere/index/vector_index/IndexIVF.cpp @@ -30,6 +30,7 @@ #include #include +#include "faiss/BuilderSuspend.h" #include "knowhere/common/Exception.h" #include "knowhere/common/Log.h" #include "knowhere/index/vector_index/IndexIVF.h" @@ -256,6 +257,9 @@ IVF::GenGraph(const float* data, const int64_t k, GraphType& graph, const Config graph.resize(ntotal); GraphType res_vec(total_search_count); for (int i = 0; i < total_search_count; ++i) { + // it is usually used in NSG::train, to check BuilderSuspend + faiss::BuilderSuspend::check_wait(); + auto b_size = (i == (total_search_count - 1)) && tail_batch_size != 0 ? tail_batch_size : batch_size; auto& res = res_vec[i]; diff --git a/core/src/index/knowhere/knowhere/index/vector_index/helpers/BuilderSuspend.h b/core/src/index/knowhere/knowhere/index/vector_index/helpers/BuilderSuspend.h new file mode 100644 index 0000000000..d77d9a9dea --- /dev/null +++ b/core/src/index/knowhere/knowhere/index/vector_index/helpers/BuilderSuspend.h @@ -0,0 +1,30 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include "faiss/BuilderSuspend.h" + +namespace milvus { +namespace knowhere { + +inline void +BuilderSuspend() { + faiss::BuilderSuspend::suspend(); +} + +inline void +BuildResume() { + faiss::BuilderSuspend::resume(); +} + +} // namespace knowhere +} // namespace milvus diff --git a/core/src/index/knowhere/knowhere/index/vector_index/impl/nsg/NSG.cpp b/core/src/index/knowhere/knowhere/index/vector_index/impl/nsg/NSG.cpp index 376182d15b..dbc4cd5bc5 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/impl/nsg/NSG.cpp +++ b/core/src/index/knowhere/knowhere/index/vector_index/impl/nsg/NSG.cpp @@ -19,6 +19,7 @@ #include #include +#include "faiss/BuilderSuspend.h" #include "knowhere/common/Exception.h" #include "knowhere/common/Log.h" #include "knowhere/common/Timer.h" @@ -432,6 +433,7 @@ NsgIndex::Link() { boost::dynamic_bitset<> flags{ntotal, 0}; #pragma omp for schedule(dynamic, 100) for (size_t n = 0; n < ntotal; ++n) { + faiss::BuilderSuspend::check_wait(); fullset.clear(); temp.clear(); flags.reset(); @@ -461,6 +463,7 @@ NsgIndex::Link() { std::vector mutex_vec(ntotal); #pragma omp for schedule(dynamic, 100) for (unsigned n = 0; n < ntotal; ++n) { + faiss::BuilderSuspend::check_wait(); InterInsert(n, mutex_vec, cut_graph_dist); } delete[] cut_graph_dist; @@ -611,6 +614,7 @@ NsgIndex::CheckConnectivity() { int64_t linked_count = 0; while (linked_count < static_cast(ntotal)) { + faiss::BuilderSuspend::check_wait(); DFS(root, has_linked, linked_count); if (linked_count >= static_cast(ntotal)) { break; diff --git a/core/src/index/thirdparty/annoy/src/annoylib.h b/core/src/index/thirdparty/annoy/src/annoylib.h index ed83629462..00058099c9 100644 --- a/core/src/index/thirdparty/annoy/src/annoylib.h +++ b/core/src/index/thirdparty/annoy/src/annoylib.h @@ -126,6 +126,7 @@ inline void set_error_from_string(char **error, const char* msg) { 
#endif #include +#include using std::vector; using std::pair; @@ -1280,6 +1281,7 @@ protected: vector children_indices[2]; Node* m = (Node*)alloca(_s); D::create_split(children, _f, _s, _random, m); + faiss::BuilderSuspend::check_wait(); for (size_t i = 0; i < indices.size(); i++) { S j = indices[i]; @@ -1319,6 +1321,7 @@ protected: m->n_descendants = is_root ? _n_items : (S)indices.size(); for (int side = 0; side < 2; side++) { // run _make_tree for the smallest child first (for cache locality) + faiss::BuilderSuspend::check_wait(); m->children[side^flip] = _make_tree(children_indices[side^flip], false); } diff --git a/core/src/index/thirdparty/faiss/BuilderSuspend.cpp b/core/src/index/thirdparty/faiss/BuilderSuspend.cpp new file mode 100644 index 0000000000..dd32b630f7 --- /dev/null +++ b/core/src/index/thirdparty/faiss/BuilderSuspend.cpp @@ -0,0 +1,35 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. 
+ +#include "BuilderSuspend.h" + +namespace faiss { + +std::atomic BuilderSuspend::suspend_flag_(false); +std::mutex BuilderSuspend::mutex_; +std::condition_variable BuilderSuspend::cv_; + +void BuilderSuspend::suspend() { + suspend_flag_ = true; +} + +void BuilderSuspend::resume() { + suspend_flag_ = false; +} + +void BuilderSuspend::check_wait() { + while (suspend_flag_) { + std::unique_lock lck(mutex_); + cv_.wait_for(lck, std::chrono::seconds(5)); + } +} + +} // namespace faiss diff --git a/core/src/index/thirdparty/faiss/BuilderSuspend.h b/core/src/index/thirdparty/faiss/BuilderSuspend.h new file mode 100644 index 0000000000..d5291a9628 --- /dev/null +++ b/core/src/index/thirdparty/faiss/BuilderSuspend.h @@ -0,0 +1,33 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. 
+ +#pragma once + +#include +#include +#include + +namespace faiss { + +class BuilderSuspend { +public: + static void suspend(); + static void resume(); + static void check_wait(); + +private: + static std::atomic suspend_flag_; + static std::mutex mutex_; + static std::condition_variable cv_; + +}; + +} // namespace faiss diff --git a/core/src/index/thirdparty/faiss/utils/distances.cpp b/core/src/index/thirdparty/faiss/utils/distances.cpp index a80c7c32d7..50f16b53b0 100644 --- a/core/src/index/thirdparty/faiss/utils/distances.cpp +++ b/core/src/index/thirdparty/faiss/utils/distances.cpp @@ -15,7 +15,7 @@ #include #include - +#include #include #include #include @@ -1015,6 +1015,8 @@ void elkan_L2_sse ( float *data = (float *) malloc((bs_y * (bs_y - 1) / 2) * sizeof (float)); for (size_t j0 = 0; j0 < ny; j0 += bs_y) { + BuilderSuspend::check_wait(); + size_t j1 = j0 + bs_y; if (j1 > ny) j1 = ny; diff --git a/core/src/index/thirdparty/faiss/utils/distances.h b/core/src/index/thirdparty/faiss/utils/distances.h index 9d0f5a066a..c54f691991 100644 --- a/core/src/index/thirdparty/faiss/utils/distances.h +++ b/core/src/index/thirdparty/faiss/utils/distances.h @@ -186,6 +186,7 @@ void knn_jaccard ( size_t d, size_t nx, size_t ny, float_maxheap_array_t * res, ConcurrentBitsetPtr bitset = nullptr); + /** same as knn_L2sqr, but base_shift[bno] is subtracted to all * computed distances. * diff --git a/core/src/scheduler/CPUBuilder.cpp b/core/src/scheduler/CPUBuilder.cpp new file mode 100644 index 0000000000..78794b4cc0 --- /dev/null +++ b/core/src/scheduler/CPUBuilder.cpp @@ -0,0 +1,66 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "scheduler/CPUBuilder.h" +#include "utils/Log.h" + +namespace milvus { +namespace scheduler { + +void +CPUBuilder::Start() { + std::lock_guard lock(mutex_); + if (not running_) { + running_ = true; + thread_ = std::thread(&CPUBuilder::worker_function, this); + } +} + +void +CPUBuilder::Stop() { + std::lock_guard lock(mutex_); + if (running_) { + this->Put(nullptr); + thread_.join(); + running_ = false; + } +} + +void +CPUBuilder::Put(const TaskPtr& task) { + { + std::lock_guard lock(queue_mutex_); + queue_.push(task); + } + queue_cv_.notify_one(); +} + +void +CPUBuilder::worker_function() { + SetThreadName("cpubuilder_thread"); + while (running_) { + std::unique_lock lock(queue_mutex_); + queue_cv_.wait(lock, [&] { return not queue_.empty(); }); + auto task = queue_.front(); + queue_.pop(); + lock.unlock(); + + if (task == nullptr) { + // thread exit + break; + } + task->Load(LoadType::DISK2CPU, 0); + task->Execute(); + } +} + +} // namespace scheduler +} // namespace milvus diff --git a/core/src/scheduler/CPUBuilder.h b/core/src/scheduler/CPUBuilder.h new file mode 100644 index 0000000000..1c4a4f12f2 --- /dev/null +++ b/core/src/scheduler/CPUBuilder.h @@ -0,0 +1,55 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "task/Task.h" + +namespace milvus { +namespace scheduler { + +class CPUBuilder { + public: + CPUBuilder() = default; + + void + Start(); + + void + Stop(); + + void + Put(const TaskPtr& task); + + private: + void + worker_function(); + + private: + bool running_ = false; + std::mutex mutex_; + std::thread thread_; + + std::queue queue_; + std::condition_variable queue_cv_; + std::mutex queue_mutex_; +}; + +using CPUBuilderPtr = std::shared_ptr; + +} // namespace scheduler +} // namespace milvus diff --git a/core/src/scheduler/JobMgr.cpp b/core/src/scheduler/JobMgr.cpp index 1a2a9359f7..9534f1a4e2 100644 --- a/core/src/scheduler/JobMgr.cpp +++ b/core/src/scheduler/JobMgr.cpp @@ -20,6 +20,7 @@ #include "SchedInst.h" #include "TaskCreator.h" #include "scheduler/Algorithm.h" +#include "scheduler/CPUBuilder.h" #include "scheduler/tasklabel/SpecResLabel.h" #include "selector/Optimizer.h" #include "task/Task.h" @@ -140,7 +141,11 @@ JobMgr::worker_function() { if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { // if (auto disk = res_mgr_->GetCpuResources()[0].lock()) { for (auto& task : tasks) { - disk->task_table().Put(task, nullptr); + if (task->Type() == TaskType::BuildIndexTask && task->path().Last() == "cpu") { + CPUBuilderInst::GetInstance()->Put(task); + } else { + disk->task_table().Put(task, nullptr); + } } } } diff --git a/core/src/scheduler/SchedInst.cpp b/core/src/scheduler/SchedInst.cpp index 392e1abd97..7ccff67696 100644 --- a/core/src/scheduler/SchedInst.cpp +++ 
b/core/src/scheduler/SchedInst.cpp @@ -38,6 +38,9 @@ std::mutex OptimizerInst::mutex_; BuildMgrPtr BuildMgrInst::instance = nullptr; std::mutex BuildMgrInst::mutex_; +CPUBuilderPtr CPUBuilderInst::instance = nullptr; +std::mutex CPUBuilderInst::mutex_; + void load_simple_config() { // create and connect @@ -94,10 +97,12 @@ StartSchedulerService() { ResMgrInst::GetInstance()->Start(); SchedInst::GetInstance()->Start(); JobMgrInst::GetInstance()->Start(); + CPUBuilderInst::GetInstance()->Start(); } void StopSchedulerService() { + CPUBuilderInst::GetInstance()->Stop(); JobMgrInst::GetInstance()->Stop(); SchedInst::GetInstance()->Stop(); ResMgrInst::GetInstance()->Stop(); diff --git a/core/src/scheduler/SchedInst.h b/core/src/scheduler/SchedInst.h index c78d271108..5628bb2712 100644 --- a/core/src/scheduler/SchedInst.h +++ b/core/src/scheduler/SchedInst.h @@ -12,6 +12,7 @@ #pragma once #include "BuildMgr.h" +#include "CPUBuilder.h" #include "JobMgr.h" #include "ResourceMgr.h" #include "Scheduler.h" @@ -157,6 +158,24 @@ class BuildMgrInst { static std::mutex mutex_; }; +class CPUBuilderInst { + public: + static CPUBuilderPtr + GetInstance() { + if (instance == nullptr) { + std::lock_guard lock(mutex_); + if (instance == nullptr) { + instance = std::make_shared(); + } + } + return instance; + } + + private: + static CPUBuilderPtr instance; + static std::mutex mutex_; +}; + void StartSchedulerService();