fix hang issue

Former-commit-id: 8e31caee7dc66dfca929f78a55786b5d0e86837d
This commit is contained in:
groot 2019-06-13 15:32:02 +08:00
parent ff00692666
commit ebfa05d435
7 changed files with 36 additions and 55 deletions

View File

@ -5,6 +5,7 @@ Please mark all change in change log and use the ticket from JIRA.
# MegaSearch 0.3.0 (TBD)
## Bug
- MS-80 - Fix server hang issue
## Improvement

View File

@ -13,12 +13,6 @@ namespace zilliz {
namespace vecwise {
namespace engine {
IndexLoaderQueue&
IndexLoaderQueue::GetInstance() {
static IndexLoaderQueue instance;
return instance;
}
void
IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
std::unique_lock <std::mutex> lock(mtx);
@ -26,6 +20,7 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
if(search_context == nullptr) {
queue_.push_back(nullptr);
empty_.notify_all();
return;
}

View File

@ -26,18 +26,15 @@ public:
using IndexLoaderContextPtr = std::shared_ptr<IndexLoaderContext>;
class IndexLoaderQueue {
private:
public:
IndexLoaderQueue() : mtx(), full_(), empty_() {}
IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete;
IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete;
public:
using LoaderQueue = std::list<IndexLoaderContextPtr>;
static IndexLoaderQueue& GetInstance();
void Put(const SearchContextPtr &search_context);
IndexLoaderContextPtr Take();

View File

@ -55,8 +55,7 @@ void CollectDurationMetrics(int index_type, double total_time) {
}
SearchScheduler::SearchScheduler()
: thread_pool_(2),
stopped_(true) {
: stopped_(true) {
Start();
}
@ -75,8 +74,13 @@ SearchScheduler::Start() {
return true;
}
thread_pool_.enqueue(&SearchScheduler::IndexLoadWorker, this);
thread_pool_.enqueue(&SearchScheduler::SearchWorker, this);
stopped_ = false;
search_queue_.SetCapacity(2);
index_load_thread_ = std::make_shared<std::thread>(&SearchScheduler::IndexLoadWorker, this);
search_thread_ = std::make_shared<std::thread>(&SearchScheduler::SearchWorker, this);
return true;
}
@ -86,29 +90,34 @@ SearchScheduler::Stop() {
return true;
}
IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance();
index_queue.Put(nullptr);
if(index_load_thread_) {
index_load_queue_.Put(nullptr);
index_load_thread_->join();
index_load_thread_ = nullptr;
}
SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
search_queue.Put(nullptr);
if(search_thread_) {
search_queue_.Put(nullptr);
search_thread_->join();
search_thread_ = nullptr;
}
stopped_ = true;
return true;
}
bool
SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) {
IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance();
index_queue.Put(search_context);
index_load_queue_.Put(search_context);
return true;
}
bool
SearchScheduler::IndexLoadWorker() {
IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance();
SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
while(true) {
IndexLoaderContextPtr context = index_queue.Take();
IndexLoaderContextPtr context = index_load_queue_.Take();
if(context == nullptr) {
SERVER_LOG_INFO << "Stop thread for index loading";
break;//exit
@ -137,7 +146,7 @@ SearchScheduler::IndexLoadWorker() {
task_ptr->index_type_ = context->file_->file_type_;
task_ptr->index_engine_ = index_ptr;
task_ptr->search_contexts_.swap(context->search_contexts_);
search_queue.Put(task_ptr);
search_queue_.Put(task_ptr);
}
return true;
@ -145,9 +154,8 @@ SearchScheduler::IndexLoadWorker() {
bool
SearchScheduler::SearchWorker() {
SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
while(true) {
SearchTaskPtr task_ptr = search_queue.Take();
SearchTaskPtr task_ptr = search_queue_.Take();
if(task_ptr == nullptr) {
SERVER_LOG_INFO << "Stop thread for searching";
break;//exit

View File

@ -6,7 +6,8 @@
#pragma once
#include "SearchContext.h"
#include "utils/ThreadPool.h"
#include "IndexLoaderQueue.h"
#include "SearchTaskQueue.h"
namespace zilliz {
namespace vecwise {
@ -30,7 +31,12 @@ private:
bool SearchWorker();
private:
server::ThreadPool thread_pool_;
std::shared_ptr<std::thread> index_load_thread_;
std::shared_ptr<std::thread> search_thread_;
IndexLoaderQueue index_load_queue_;
SearchTaskQueue search_queue_;
bool stopped_ = true;
};

View File

@ -94,18 +94,6 @@ void CalcScore(uint64_t vector_count,
}
SearchTaskQueue::SearchTaskQueue() {
SetCapacity(4);
}
SearchTaskQueue&
SearchTaskQueue::GetInstance() {
static SearchTaskQueue instance;
return instance;
}
bool SearchTask::DoSearch() {
if(index_engine_ == nullptr) {
return false;

View File

@ -27,21 +27,7 @@ public:
};
using SearchTaskPtr = std::shared_ptr<SearchTask>;
class SearchTaskQueue : public server::BlockingQueue<SearchTaskPtr> {
private:
SearchTaskQueue();
SearchTaskQueue(const SearchTaskQueue &rhs) = delete;
SearchTaskQueue &operator=(const SearchTaskQueue &rhs) = delete;
public:
static SearchTaskQueue& GetInstance();
private:
};
using SearchTaskQueue = server::BlockingQueue<SearchTaskPtr>;
}