From db1b37d45ca154ab1951b00eb1d6fcdcd4b62c1c Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 5 Jun 2019 12:14:53 +0800 Subject: [PATCH] fix problems Former-commit-id: c90a6834c4848fbaed1a1b0c753979f307db4d27 --- cpp/CHANGELOG.md | 3 +- cpp/src/db/DBImpl.inl | 1 - cpp/src/db/scheduler/IScheduleStrategy.h | 26 ++++++++ cpp/src/db/scheduler/IndexLoaderQueue.cpp | 2 +- cpp/src/db/scheduler/ScheduleStrategy.cpp | 75 +++++++++++------------ cpp/src/db/scheduler/ScheduleStrategy.h | 16 ++--- cpp/src/db/scheduler/SearchContext.h | 8 +-- cpp/src/db/scheduler/SearchScheduler.cpp | 35 ++++++----- cpp/src/db/scheduler/SearchTaskQueue.h | 2 +- cpp/src/db/scheduler/SearchTaskQueue.inl | 18 ++++-- 10 files changed, 111 insertions(+), 75 deletions(-) create mode 100644 cpp/src/db/scheduler/IScheduleStrategy.h diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 69a665deb4..47a5ebf6e4 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -10,6 +10,8 @@ Please mark all change in change log and use the ticket from JIRA. ## New Feature +- MS-57 - Implement index load/search pipeline + ## Task # MegaSearch 0.2.0 (2019-05-31) @@ -37,7 +39,6 @@ Please mark all change in change log and use the ticket from JIRA. - MS-37 - Add query, cache usage, disk write speed and file data size metrics - MS-30 - Use faiss v1.5.2 - MS-54 - cmake: Change Thrift third party URL to github.com -- MS-57 - Implement index load/search pipeline ## Task diff --git a/cpp/src/db/DBImpl.inl b/cpp/src/db/DBImpl.inl index adb0e763e0..10f16ee9eb 100644 --- a/cpp/src/db/DBImpl.inl +++ b/cpp/src/db/DBImpl.inl @@ -8,7 +8,6 @@ #include "DBImpl.h" #include "DBMetaImpl.h" #include "Env.h" -#include "utils/Log.h" #include "metrics/Metrics.h" #include "scheduler/SearchScheduler.h" diff --git a/cpp/src/db/scheduler/IScheduleStrategy.h b/cpp/src/db/scheduler/IScheduleStrategy.h new file mode 100644 index 0000000000..190125dfdd --- /dev/null +++ b/cpp/src/db/scheduler/IScheduleStrategy.h @@ -0,0 +1,26 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "IndexLoaderQueue.h" +#include "SearchContext.h" + +namespace zilliz { +namespace vecwise { +namespace engine { + +class IScheduleStrategy { +public: + virtual ~IScheduleStrategy() {} + + virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0; +}; + +using ScheduleStrategyPtr = std::shared_ptr; + +} +} +} \ No newline at end of file diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.cpp b/cpp/src/db/scheduler/IndexLoaderQueue.cpp index 6d476a9df9..7f748a65a9 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.cpp +++ b/cpp/src/db/scheduler/IndexLoaderQueue.cpp @@ -37,7 +37,7 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) { throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } - ScheduleStrategyPtr strategy = CreateStrategy(); + ScheduleStrategyPtr strategy = StrategyFactory::CreateMemStrategy(); strategy->Schedule(search_context, queue_); empty_.notify_all(); diff --git a/cpp/src/db/scheduler/ScheduleStrategy.cpp b/cpp/src/db/scheduler/ScheduleStrategy.cpp index 6a25d4c57c..2b344b33ef 100644 --- a/cpp/src/db/scheduler/ScheduleStrategy.cpp +++ b/cpp/src/db/scheduler/ScheduleStrategy.cpp @@ -16,50 +16,49 @@ namespace engine { class MemScheduleStrategy : public IScheduleStrategy { public: - bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override; + bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override { + if(search_context == nullptr) { + return false; + } + + SearchContext::Id2IndexMap index_files = search_context->GetIndexMap(); + //some index loader alread exists + for(auto& loader : loader_list) { + if(index_files.find(loader->file_->id) != index_files.end()){ + SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext"; + index_files.erase(loader->file_->id); + loader->search_contexts_.push_back(search_context); + } + } + + //index_files still contains some index files, create new loader + for(auto& pair : index_files) { + SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location; + IndexLoaderContextPtr new_loader = std::make_shared(); + new_loader->search_contexts_.push_back(search_context); + new_loader->file_ = pair.second; + + auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location); + if(index != nullptr) { + //if the index file has been in memory, increase its priority + loader_list.push_front(new_loader); + } else { + //index file not in memory, put it to tail + loader_list.push_back(new_loader); + } + } + + return true; + } }; -ScheduleStrategyPtr CreateStrategy() { + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +ScheduleStrategyPtr StrategyFactory::CreateMemStrategy() { ScheduleStrategyPtr strategy(new MemScheduleStrategy()); return strategy; } -bool MemScheduleStrategy::Schedule(const SearchContextPtr &search_context, - IndexLoaderQueue::LoaderQueue &loader_list) { - if(search_context == nullptr) { - return false; - } - - SearchContext::Id2IndexMap index_files = search_context->GetIndexMap(); - //some index loader alread exists - for(auto iter = loader_list.begin(); iter != loader_list.end(); ++iter) { - if(index_files.find((*iter)->file_->id) != index_files.end()){ - SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext"; - index_files.erase((*iter)->file_->id); - (*iter)->search_contexts_.push_back(search_context); - } - } - - //index_files still contains some index files, create new loader - for(auto iter = index_files.begin(); iter != index_files.end(); ++iter) { - SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << iter->second->location; - IndexLoaderContextPtr new_loader = std::make_shared(); - new_loader->search_contexts_.push_back(search_context); - new_loader->file_ = iter->second; - - auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(iter->second->location); - if(index != nullptr) { - //if the index file has been in memory, increase its priority - loader_list.push_front(new_loader); - } else { - //index file not in memory, put it to tail - loader_list.push_back(new_loader); - } - } - - return true; -} - } } } \ No newline at end of file diff --git a/cpp/src/db/scheduler/ScheduleStrategy.h b/cpp/src/db/scheduler/ScheduleStrategy.h index 6b9ac3bee4..da01d7e745 100644 --- a/cpp/src/db/scheduler/ScheduleStrategy.h +++ b/cpp/src/db/scheduler/ScheduleStrategy.h @@ -5,24 +5,20 @@ ******************************************************************************/ #pragma once -#include "IndexLoaderQueue.h" -#include "SearchContext.h" +#include "IScheduleStrategy.h" namespace zilliz { namespace vecwise { namespace engine { -class IScheduleStrategy { -public: - virtual ~IScheduleStrategy() {} +class StrategyFactory { +private: + StrategyFactory() {} - virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0; +public: + static ScheduleStrategyPtr CreateMemStrategy(); }; -using ScheduleStrategyPtr = std::shared_ptr; - -ScheduleStrategyPtr CreateStrategy(); - } } } diff --git a/cpp/src/db/scheduler/SearchContext.h b/cpp/src/db/scheduler/SearchContext.h index a7ffedb0de..ae7327fd68 100644 --- a/cpp/src/db/scheduler/SearchContext.h +++ b/cpp/src/db/scheduler/SearchContext.h @@ -5,7 +5,7 @@ ******************************************************************************/ #pragma once -#include "../MetaTypes.h" +#include "db/MetaTypes.h" #include #include @@ -24,9 +24,9 @@ public: bool AddIndexFile(TableFileSchemaPtr& index_file); - uint64_t Topk() const { return topk_; } - uint64_t Nq() const { return nq_; } - const float* Vectors() const { return vectors_; } + uint64_t topk() const { return topk_; } + uint64_t nq() const { return nq_; } + const float* vectors() const { return vectors_; } using Id2IndexMap = std::unordered_map; const Id2IndexMap& GetIndexMap() const { return map_index_files_; } diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp index 2b1114a14e..3ffb0dd94e 100644 --- a/cpp/src/db/scheduler/SearchScheduler.cpp +++ b/cpp/src/db/scheduler/SearchScheduler.cpp @@ -71,6 +71,7 @@ SearchScheduler::IndexLoadWorker() { while(true) { IndexLoaderContextPtr context = index_queue.Take(); if(context == nullptr) { + SERVER_LOG_INFO << "Stop thread for index loading"; break;//exit } @@ -88,20 +89,25 @@ SearchScheduler::IndexLoadWorker() { << file_size/(1024*1024) << " M"; //metric - if(context->file_->file_type == meta::TableFileSchema::RAW) { - server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); - server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); - server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); - - } else if(context->file_->file_type == meta::TableFileSchema::TO_INDEX) { - server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); - server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); - server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); - - } else { - server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size); - server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size); - server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size); + switch(context->file_->file_type) { + case meta::TableFileSchema::RAW: { + server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); + server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); + server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); + break; + } + case meta::TableFileSchema::TO_INDEX: { + server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); + server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); + server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); + break; + } + default: { + server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size); + server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size); + server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size); + break; + } } //put search task to another queue @@ -122,6 +128,7 @@ SearchScheduler::SearchWorker() { while(true) { SearchTaskPtr task_ptr = search_queue.Take(); if(task_ptr == nullptr) { + SERVER_LOG_INFO << "Stop thread for searching"; break;//exit } diff --git a/cpp/src/db/scheduler/SearchTaskQueue.h b/cpp/src/db/scheduler/SearchTaskQueue.h index 51090a2bad..3b58294811 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.h +++ b/cpp/src/db/scheduler/SearchTaskQueue.h @@ -42,7 +42,7 @@ using SearchTaskPtr = std::shared_ptr; class SearchTaskQueue : public server::BlockingQueue { private: - SearchTaskQueue() {} + SearchTaskQueue(); SearchTaskQueue(const SearchTaskQueue &rhs) = delete; diff --git a/cpp/src/db/scheduler/SearchTaskQueue.inl b/cpp/src/db/scheduler/SearchTaskQueue.inl index 94afd9b887..c9cbaf8a1b 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.inl +++ b/cpp/src/db/scheduler/SearchTaskQueue.inl @@ -58,6 +58,12 @@ void TopkResult(SearchContext::ResultSet &result_src, } } + +SearchTaskQueue::SearchTaskQueue() { + SetCapacity(4); +} + + SearchTaskQueue& SearchTaskQueue::GetInstance() { static SearchTaskQueue s_instance; @@ -75,21 +81,23 @@ bool SearchTask::DoSearch() { std::vector output_ids; std::vector output_distence; for(auto& context : search_contexts_) { - auto inner_k = index_engine_->Count() < context->Topk() ? index_engine_->Count() : context->Topk(); - output_ids.resize(inner_k*context->Nq()); - output_distence.resize(inner_k*context->Nq()); + auto inner_k = index_engine_->Count() < context->topk() ? index_engine_->Count() : context->topk(); + output_ids.resize(inner_k*context->nq()); + output_distence.resize(inner_k*context->nq()); try { - index_engine_->Search(context->Nq(), context->Vectors(), inner_k, output_distence.data(), + index_engine_->Search(context->nq(), context->vectors(), inner_k, output_distence.data(), output_ids.data()); } catch (std::exception& ex) { SERVER_LOG_ERROR << "SearchTask encounter exception: " << ex.what(); + context->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed + continue; } rc.Record("do search"); SearchContext::ResultSet result_set; - ClusterResult(output_ids, output_distence, context->Nq(), inner_k, result_set); + ClusterResult(output_ids, output_distence, context->nq(), inner_k, result_set); rc.Record("cluster result"); TopkResult(result_set, inner_k, context->GetResult()); rc.Record("reduce topk");