From ebfa05d435603ec562691fd8a75a7935d85cd327 Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 13 Jun 2019 15:32:02 +0800 Subject: [PATCH] fix hang issue Former-commit-id: 8e31caee7dc66dfca929f78a55786b5d0e86837d --- cpp/CHANGELOG.md | 1 + cpp/src/db/scheduler/IndexLoaderQueue.cpp | 7 +--- cpp/src/db/scheduler/IndexLoaderQueue.h | 5 +-- cpp/src/db/scheduler/SearchScheduler.cpp | 40 ++++++++++++++--------- cpp/src/db/scheduler/SearchScheduler.h | 10 ++++-- cpp/src/db/scheduler/SearchTaskQueue.cpp | 12 ------- cpp/src/db/scheduler/SearchTaskQueue.h | 16 +-------- 7 files changed, 36 insertions(+), 55 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 4822bcb16b..baf1d92444 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -5,6 +5,7 @@ Please mark all change in change log and use the ticket from JIRA. # MegaSearch 0.3.0 (TBD) ## Bug +- MS-80 - Fix server hang issue ## Improvement diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.cpp b/cpp/src/db/scheduler/IndexLoaderQueue.cpp index 2840e6564e..52d27831cb 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.cpp +++ b/cpp/src/db/scheduler/IndexLoaderQueue.cpp @@ -13,12 +13,6 @@ namespace zilliz { namespace vecwise { namespace engine { -IndexLoaderQueue& -IndexLoaderQueue::GetInstance() { - static IndexLoaderQueue instance; - return instance; -} - void IndexLoaderQueue::Put(const SearchContextPtr &search_context) { std::unique_lock lock(mtx); @@ -26,6 +20,7 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) { if(search_context == nullptr) { queue_.push_back(nullptr); + empty_.notify_all(); return; } diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.h b/cpp/src/db/scheduler/IndexLoaderQueue.h index f0d71dcbd7..3850a8de8b 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.h +++ b/cpp/src/db/scheduler/IndexLoaderQueue.h @@ -26,18 +26,15 @@ public: using IndexLoaderContextPtr = std::shared_ptr; class IndexLoaderQueue { -private: +public: IndexLoaderQueue() : mtx(), full_(), empty_() {} IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete; IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete; -public: using LoaderQueue = std::list; - static IndexLoaderQueue& GetInstance(); - void Put(const SearchContextPtr &search_context); IndexLoaderContextPtr Take(); diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp index c18f95d04d..137cac30c7 100644 --- a/cpp/src/db/scheduler/SearchScheduler.cpp +++ b/cpp/src/db/scheduler/SearchScheduler.cpp @@ -55,8 +55,7 @@ void CollectDurationMetrics(int index_type, double total_time) { } SearchScheduler::SearchScheduler() - : thread_pool_(2), - stopped_(true) { + : stopped_(true) { Start(); } @@ -75,8 +74,13 @@ SearchScheduler::Start() { return true; } - thread_pool_.enqueue(&SearchScheduler::IndexLoadWorker, this); - thread_pool_.enqueue(&SearchScheduler::SearchWorker, this); + stopped_ = false; + + search_queue_.SetCapacity(2); + + index_load_thread_ = std::make_shared(&SearchScheduler::IndexLoadWorker, this); + search_thread_ = std::make_shared(&SearchScheduler::SearchWorker, this); + return true; } @@ -86,29 +90,34 @@ SearchScheduler::Stop() { return true; } - IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); - index_queue.Put(nullptr); + if(index_load_thread_) { + index_load_queue_.Put(nullptr); + index_load_thread_->join(); + index_load_thread_ = nullptr; + } - SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); - search_queue.Put(nullptr); + if(search_thread_) { + search_queue_.Put(nullptr); + search_thread_->join(); + search_thread_ = nullptr; + } + + stopped_ = true; return true; } bool SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) { - IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); - index_queue.Put(search_context); + index_load_queue_.Put(search_context); return true; } bool SearchScheduler::IndexLoadWorker() { - IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); - SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); while(true) { - IndexLoaderContextPtr context = index_queue.Take(); + IndexLoaderContextPtr context = index_load_queue_.Take(); if(context == nullptr) { SERVER_LOG_INFO << "Stop thread for index loading"; break;//exit @@ -137,7 +146,7 @@ SearchScheduler::IndexLoadWorker() { task_ptr->index_type_ = context->file_->file_type_; task_ptr->index_engine_ = index_ptr; task_ptr->search_contexts_.swap(context->search_contexts_); - search_queue.Put(task_ptr); + search_queue_.Put(task_ptr); } return true; @@ -145,9 +154,8 @@ SearchScheduler::IndexLoadWorker() { bool SearchScheduler::SearchWorker() { - SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); while(true) { - SearchTaskPtr task_ptr = search_queue.Take(); + SearchTaskPtr task_ptr = search_queue_.Take(); if(task_ptr == nullptr) { SERVER_LOG_INFO << "Stop thread for searching"; break;//exit diff --git a/cpp/src/db/scheduler/SearchScheduler.h b/cpp/src/db/scheduler/SearchScheduler.h index 24c85395fc..6a5b7b344f 100644 --- a/cpp/src/db/scheduler/SearchScheduler.h +++ b/cpp/src/db/scheduler/SearchScheduler.h @@ -6,7 +6,8 @@ #pragma once #include "SearchContext.h" -#include "utils/ThreadPool.h" +#include "IndexLoaderQueue.h" +#include "SearchTaskQueue.h" namespace zilliz { namespace vecwise { @@ -30,7 +31,12 @@ private: bool SearchWorker(); private: - server::ThreadPool thread_pool_; + std::shared_ptr index_load_thread_; + std::shared_ptr search_thread_; + + IndexLoaderQueue index_load_queue_; + SearchTaskQueue search_queue_; + bool stopped_ = true; }; diff --git a/cpp/src/db/scheduler/SearchTaskQueue.cpp b/cpp/src/db/scheduler/SearchTaskQueue.cpp index 101dc818c2..7b18f8cb69 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.cpp +++ b/cpp/src/db/scheduler/SearchTaskQueue.cpp @@ -94,18 +94,6 @@ void CalcScore(uint64_t vector_count, } - -SearchTaskQueue::SearchTaskQueue() { - SetCapacity(4); -} - - -SearchTaskQueue& -SearchTaskQueue::GetInstance() { - static SearchTaskQueue instance; - return instance; -} - bool SearchTask::DoSearch() { if(index_engine_ == nullptr) { return false; diff --git a/cpp/src/db/scheduler/SearchTaskQueue.h b/cpp/src/db/scheduler/SearchTaskQueue.h index bd8e9d7f24..ef0f77ef9d 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.h +++ b/cpp/src/db/scheduler/SearchTaskQueue.h @@ -27,21 +27,7 @@ public: }; using SearchTaskPtr = std::shared_ptr; - -class SearchTaskQueue : public server::BlockingQueue { -private: - SearchTaskQueue(); - - SearchTaskQueue(const SearchTaskQueue &rhs) = delete; - - SearchTaskQueue &operator=(const SearchTaskQueue &rhs) = delete; - -public: - static SearchTaskQueue& GetInstance(); - -private: - -}; +using SearchTaskQueue = server::BlockingQueue; }