diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 02b4e490c2..47a5ebf6e4 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -10,6 +10,8 @@ Please mark all change in change log and use the ticket from JIRA. ## New Feature +- MS-57 - Implement index load/search pipeline + ## Task # MegaSearch 0.2.0 (2019-05-31) diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index e400ab538f..5399c9b113 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -10,6 +10,7 @@ aux_source_directory(config config_files) aux_source_directory(server server_files) aux_source_directory(utils utils_files) aux_source_directory(db db_files) +aux_source_directory(db/scheduler db_scheduler_files) aux_source_directory(wrapper wrapper_files) aux_source_directory(metrics metrics_files) @@ -39,6 +40,7 @@ set(vecwise_engine_files ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp ${cache_files} ${db_files} + ${db_scheduler_files} ${wrapper_files} # metrics/Metrics.cpp ${metrics_files} diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index 7384e53a34..b67aa8a0a9 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -53,6 +53,12 @@ public: virtual ~DBImpl(); private: + Status QuerySync(const std::string& table_id, size_t k, size_t nq, + const float* vectors, const meta::DatesT& dates, QueryResults& results); + + Status QueryAsync(const std::string& table_id, size_t k, size_t nq, + const float* vectors, const meta::DatesT& dates, QueryResults& results); + void BackgroundBuildIndex(); Status BuildIndex(const meta::TableFileSchema&); diff --git a/cpp/src/db/DBImpl.inl b/cpp/src/db/DBImpl.inl index ba0074ae2c..10f16ee9eb 100644 --- a/cpp/src/db/DBImpl.inl +++ b/cpp/src/db/DBImpl.inl @@ -8,6 +8,8 @@ #include "DBImpl.h" #include "DBMetaImpl.h" #include "Env.h" +#include "metrics/Metrics.h" +#include "scheduler/SearchScheduler.h" #include #include @@ -16,8 +18,6 @@ #include #include #include -#include "../utils/Log.h" -#include "metrics/Metrics.h" namespace zilliz { namespace 
vecwise {
@@ -98,7 +98,16 @@ Status DBImpl::Query(const std::string &table_id, size_t k, size_t nq,
 template
 Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq,
         const float* vectors, const meta::DatesT& dates, QueryResults& results) {
+    // Compile-time switch between the two query paths. With "#if 0" only the
+    // QueryAsync branch is compiled.
+#if 0
+    return QuerySync(table_id, k, nq, vectors, dates, results);
+#else
+    return QueryAsync(table_id, k, nq, vectors, dates, results);
+#endif
+}

+// Former body of Query(), now reachable only through the disabled branch above.
+// Only fragments of it are visible in this patch.
+template
+Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
+        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
     meta::DatePartionedTableFilesSchema files;
     auto status = pMeta_->FilesToSearch(table_id, dates, files);
     if (!status.ok()) { return status; }
@@ -150,6 +159,7 @@ Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq,
     auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
         for (auto &file : file_vec) {
+            EngineT index(file.dimension, file.location);
             index.Load();
             auto file_size = index.PhysicalSize()/(1024*1024);
@@ -248,6 +258,40 @@ Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq,
     return Status::OK();
 }

+// Runs a query through the SearchScheduler pipeline and blocks until it is done.
+//
+// table_id / dates : passed to pMeta_->FilesToSearch() to select the files
+// k, nq, vectors   : stored in the SearchContext as topk, query count and query data
+// results          : one QueryResult is appended per entry of the context's result set
+//
+// Returns the FilesToSearch() status when that call fails, otherwise Status::OK().
+// NOTE(review): the template argument lists in this patch appear to have been
+// lost in extraction (bare "template", "std::make_shared(...)").
+template
+Status DBImpl::QueryAsync(const std::string& table_id, size_t k, size_t nq,
+        const float* vectors, const meta::DatesT& dates, QueryResults& results) {
+    meta::DatePartionedTableFilesSchema files;
+    auto status = pMeta_->FilesToSearch(table_id, dates, files);
+    if (!status.ok()) { return status; }
+
+    LOG(DEBUG) << "Search DateT Size=" << files.size();
+
+    SearchContextPtr context = std::make_shared(k, nq, vectors);
+
+    // Register every file of every date partition with the context; the context
+    // counts as finished once all registered files were reported done.
+    for (auto &day_files : files) {
+        for (auto &file : day_files.second) {
+            TableFileSchemaPtr file_ptr = std::make_shared(file);
+            context->AddIndexFile(file_ptr);
+        }
+    }
+
+    SearchScheduler& scheduler = SearchScheduler::GetInstance();
+    scheduler.ScheduleSearchTask(context);
+
+    // Blocks until the context's set of pending files is empty.
+    context->WaitResult();
+    auto& context_result = context->GetResult();
+    // Keep only the mapped value (the id) of each score->id entry.
+    for(auto& topk_result : context_result) {
+        QueryResult ids;
+        for(auto& pair : topk_result) {
+            ids.push_back(pair.second);
+        }
+        results.emplace_back(ids);
+    }
+
+    return Status::OK();
+}
+
 template
 void DBImpl::StartTimerTasks(int interval) {
     bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval);
diff --git a/cpp/src/db/scheduler/IScheduleStrategy.h b/cpp/src/db/scheduler/IScheduleStrategy.h
new file mode 100644
index 0000000000..190125dfdd
--- /dev/null
+++ b/cpp/src/db/scheduler/IScheduleStrategy.h
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "IndexLoaderQueue.h"
+#include "SearchContext.h"
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+// Interface for policies that place the index files of a search context into
+// the loader queue.
+class IScheduleStrategy {
+public:
+    virtual ~IScheduleStrategy() {}
+
+    // search_context : the query whose index files are to be queued
+    // loader_list    : the loader queue, modified in place
+    // NOTE(review): the meaning of the bool result is not defined in this header.
+    virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0;
+};
+
+using ScheduleStrategyPtr = std::shared_ptr;
+
+}
+}
+}
\ No newline at end of file
diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.cpp b/cpp/src/db/scheduler/IndexLoaderQueue.cpp
new file mode 100644
index 0000000000..2840e6564e
--- /dev/null
+++ b/cpp/src/db/scheduler/IndexLoaderQueue.cpp
@@ -0,0 +1,110 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "IndexLoaderQueue.h"
+#include "ScheduleStrategy.h"
+#include "utils/Error.h"
+#include "utils/Log.h"
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+// Returns the single queue instance (function-local static).
+IndexLoaderQueue&
+IndexLoaderQueue::GetInstance() {
+    static IndexLoaderQueue instance;
+    return instance;
+}
+
+// Adds the index files of a search context to the queue.
+//
+// Blocks while the queue holds capacity_ or more entries. A null context is
+// appended as a null entry; SearchScheduler::Stop() uses that as the exit
+// marker for the loading worker. Any other context is merged into the queue by
+// the memory schedule strategy. Consumers are woken in both cases.
+void
+IndexLoaderQueue::Put(const SearchContextPtr &search_context) {
+    std::unique_lock lock(mtx);
+    full_.wait(lock, [this] { return (queue_.size() < capacity_); });
+
+    if(search_context == nullptr) {
+        queue_.push_back(nullptr);
+        //wake the consumer, otherwise a worker blocked in Take() never sees the marker
+        empty_.notify_all();
+        return;
+    }
+
+    //defensive only: the wait above already guarantees free capacity
+    //NOTE(review): the "full" case is reported with the *_QUEUE_EMPTY code
+    if (queue_.size() >= capacity_) {
+        std::string error_msg =
+                "blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " +
+                std::to_string(queue_.size());
+        SERVER_LOG_ERROR << error_msg;
+        throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
+    }
+
+    ScheduleStrategyPtr strategy = StrategyFactory::CreateMemStrategy();
+    strategy->Schedule(search_context, queue_);
+
+    empty_.notify_all();
+}
+
+// Removes and returns the first entry, blocking while the queue is empty.
+// The result may be null (exit marker, see Put).
+IndexLoaderContextPtr
+IndexLoaderQueue::Take() {
+    std::unique_lock lock(mtx);
+    empty_.wait(lock, [this] { return !queue_.empty(); });
+
+    //defensive only: the wait above already guarantees a non-empty queue
+    if (queue_.empty()) {
+        std::string error_msg = "blocking queue empty";
+        SERVER_LOG_ERROR << error_msg;
+        throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
+    }
+
+    IndexLoaderContextPtr front(queue_.front());
+    queue_.pop_front();
+    full_.notify_all();
+    return front;
+}
+
+// Current number of entries.
+size_t
+IndexLoaderQueue::Size() {
+    std::lock_guard lock(mtx);
+    return queue_.size();
+}
+
+// Returns the first entry without removing it, blocking while the queue is empty.
+IndexLoaderContextPtr
+IndexLoaderQueue::Front() {
+    std::unique_lock lock(mtx);
+    empty_.wait(lock, [this] { return !queue_.empty(); });
+    if (queue_.empty()) {
+        std::string error_msg = "blocking queue empty";
+        SERVER_LOG_ERROR << error_msg;
+        throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
+    }
+    IndexLoaderContextPtr front(queue_.front());
+    return front;
+}
+
+// Returns the last entry without removing it, blocking while the queue is empty.
+IndexLoaderContextPtr
+IndexLoaderQueue::Back() {
+    std::unique_lock lock(mtx);
+    empty_.wait(lock, [this] { return !queue_.empty(); });
+
+    if (queue_.empty()) {
+        std::string error_msg = "blocking queue empty";
+        SERVER_LOG_ERROR << error_msg;
+        throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
+    }
+
+    IndexLoaderContextPtr back(queue_.back());
+    return back;
+}
+
+// True when the queue holds no entries.
+bool
+IndexLoaderQueue::Empty() {
+    std::unique_lock lock(mtx);
+    return queue_.empty();
+}
+
+// Sets the maximum number of entries; a capacity of 0 is ignored.
+void
+IndexLoaderQueue::SetCapacity(const size_t capacity) {
+    {
+        //capacity_ is read under mtx by Put(), so write it under the same lock
+        std::lock_guard lock(mtx);
+        capacity_ = (capacity > 0 ? capacity : capacity_);
+    }
+    //producers blocked in Put() must re-check against the new capacity
+    full_.notify_all();
+}
+
+}
+}
+}
\ No newline at end of file
diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.h b/cpp/src/db/scheduler/IndexLoaderQueue.h
new file mode 100644
index 0000000000..f0d71dcbd7
--- /dev/null
+++ b/cpp/src/db/scheduler/IndexLoaderQueue.h
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "SearchContext.h"
+
+#include
+#include
+#include
+#include
+
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+
+// One queue entry: an index file plus the search contexts that want it searched.
+class IndexLoaderContext {
+public:
+    TableFileSchemaPtr file_;
+    std::vector search_contexts_;
+};
+using IndexLoaderContextPtr = std::shared_ptr;
+
+// Blocking, capacity-bounded queue of index files waiting to be loaded.
+// Accessed through GetInstance(); copying is disabled.
+class IndexLoaderQueue {
+private:
+    IndexLoaderQueue() : mtx(), full_(), empty_() {}
+
+    IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete;
+
+    IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete;
+
+public:
+    using LoaderQueue = std::list;
+
+    static IndexLoaderQueue& GetInstance();
+
+    // Queues the index files of search_context; a null context is stored as a
+    // null entry. Blocks while the queue is at capacity.
+    void Put(const SearchContextPtr &search_context);
+
+    // Removes and returns the first entry; blocks while the queue is empty.
+    IndexLoaderContextPtr Take();
+
+    // Return the first / last entry without removing it; block while empty.
+    IndexLoaderContextPtr Front();
+
+    IndexLoaderContextPtr Back();
+
+    size_t Size();
+
+    bool Empty();
+
+    // A capacity of 0 leaves the current value unchanged.
+    void SetCapacity(const size_t capacity);
+
+private:
+    mutable std::mutex mtx;
+    std::condition_variable full_;   // waited on by Put(), notified by Take()
+    std::condition_variable empty_;  // waited on by Take()/Front()/Back(), notified by Put()
+
+    LoaderQueue queue_;
+    size_t capacity_ = 1000000;
+};
+
+}
+}
+}
diff --git a/cpp/src/db/scheduler/ScheduleStrategy.cpp b/cpp/src/db/scheduler/ScheduleStrategy.cpp
new file mode 100644
index 0000000000..2b344b33ef
--- /dev/null
+++ b/cpp/src/db/scheduler/ScheduleStrategy.cpp
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+
+#include "ScheduleStrategy.h"
+#include "cache/CpuCacheMgr.h"
+#include "utils/Error.h"
+#include "utils/Log.h"
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+// Strategy that shares loaders between queries and prefers files already held
+// by the CPU cache.
+class MemScheduleStrategy : public IScheduleStrategy {
+public:
+    // Merges the index files of search_context into loader_list.
+    //
+    // A file that already has an entry in the list gets the context attached to
+    // that entry. Every remaining file gets a new entry: at the front when
+    // CpuCacheMgr already holds the file, at the back otherwise.
+    //
+    // Returns false for a null context, true otherwise.
+    bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override {
+        if(search_context == nullptr) {
+            return false;
+        }
+
+        //work on a copy so that matched files can be crossed off
+        SearchContext::Id2IndexMap index_files = search_context->GetIndexMap();
+        //some index loader already exists
+        for(auto& loader : loader_list) {
+            //the queue may hold a null exit marker (IndexLoaderQueue::Put(nullptr)); skip it
+            if(loader == nullptr) {
+                continue;
+            }
+
+            if(index_files.find(loader->file_->id) != index_files.end()){
+                SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
+                index_files.erase(loader->file_->id);
+                loader->search_contexts_.push_back(search_context);
+            }
+        }
+
+        //index_files still contains some index files, create new loader
+        for(auto& pair : index_files) {
+            SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location;
+            IndexLoaderContextPtr new_loader = std::make_shared();
+            new_loader->search_contexts_.push_back(search_context);
+            new_loader->file_ = pair.second;
+
+            auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location);
+            if(index != nullptr) {
+                //if the index file has been in memory, increase its priority
+                loader_list.push_front(new_loader);
+            } else {
+                //index file not in memory, put it to tail
+                loader_list.push_back(new_loader);
+            }
+        }
+
+        return true;
+    }
+};
+
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// Creates a new MemScheduleStrategy on every call.
+ScheduleStrategyPtr StrategyFactory::CreateMemStrategy() {
+    ScheduleStrategyPtr strategy(new MemScheduleStrategy());
+    return strategy;
+}
+
+}
+}
+}
\ No newline at end of file
diff --git a/cpp/src/db/scheduler/ScheduleStrategy.h b/cpp/src/db/scheduler/ScheduleStrategy.h
new file mode 100644
index 0000000000..da01d7e745
--- /dev/null
+++ b/cpp/src/db/scheduler/ScheduleStrategy.h
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "IScheduleStrategy.h"
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+// Static-only factory for schedule strategies; the private constructor
+// prevents instantiation.
+class StrategyFactory {
+private:
+    StrategyFactory() {}
+
+public:
+    // Returns a newly created memory schedule strategy.
+    static ScheduleStrategyPtr CreateMemStrategy();
+};
+
+}
+}
+}
diff --git a/cpp/src/db/scheduler/SearchContext.cpp b/cpp/src/db/scheduler/SearchContext.cpp
new file mode 100644
index 0000000000..7cde55f31a
--- /dev/null
+++ b/cpp/src/db/scheduler/SearchContext.cpp
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "SearchContext.h"
+#include "utils/Log.h"
+
+#include
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+// Stores the query parameters. The vectors pointer is kept as-is, not copied.
+SearchContext::SearchContext(uint64_t topk, uint64_t nq, const float* vectors)
+    : topk_(topk),
+      nq_(nq),
+      vectors_(vectors) {
+    //use current time to identify this context
+    std::chrono::system_clock::time_point tp = std::chrono::system_clock::now();
+    long id = tp.time_since_epoch().count();
+    identity_ = std::to_string(id);
+}
+
+// Registers an index file as pending for this query.
+// Returns false for a null pointer or an id that is already registered.
+bool
+SearchContext::AddIndexFile(TableFileSchemaPtr& index_file) {
+    std::unique_lock lock(mtx_);
+    if(index_file == nullptr || map_index_files_.find(index_file->id) != map_index_files_.end()) {
+        return false;
+    }
+
+    SERVER_LOG_INFO << "SearchContext " << identity_ << " add index file: " << index_file->id;
+
+    map_index_files_[index_file->id] = index_file;
+    return true;
+}
+
+// Removes index_id from the pending files and wakes every WaitResult() caller.
+void
+SearchContext::IndexSearchDone(size_t index_id) {
+    std::unique_lock lock(mtx_);
+    map_index_files_.erase(index_id);
+    done_cond_.notify_all();
+    SERVER_LOG_INFO << "SearchContext " << identity_ << " finish index file: " << index_id;
+}
+
+// Blocks until no pending index file is left.
+void
+SearchContext::WaitResult() {
+    std::unique_lock lock(mtx_);
+    done_cond_.wait(lock, [this] { return map_index_files_.empty(); });
+}
+
+}
+}
+}
\ No newline at end of file
diff --git a/cpp/src/db/scheduler/SearchContext.h b/cpp/src/db/scheduler/SearchContext.h
new file mode 100644
index 0000000000..ae7327fd68
--- /dev/null
+++ b/cpp/src/db/scheduler/SearchContext.h
@@ -0,0 +1,62 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "db/MetaTypes.h"
+
+#include
+#include
+#include
+#include
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+using TableFileSchemaPtr = std::shared_ptr;
+
+// State of one query: its parameters, the index files still to be searched and
+// the merged result.
+class SearchContext {
+public:
+    SearchContext(uint64_t topk, uint64_t nq, const float* vectors);
+
+    // Registers a pending index file; false if null or already registered.
+    bool AddIndexFile(TableFileSchemaPtr& index_file);
+
+    uint64_t topk() const { return topk_; }
+    uint64_t nq() const { return nq_; }
+    const float* vectors() const { return vectors_; }
+
+    // Pending index files, looked up by file id. Returned without taking mtx_.
+    using Id2IndexMap = std::unordered_map;
+    const Id2IndexMap& GetIndexMap() const { return map_index_files_; }
+
+    // NOTE(review): key/value types were lost in extraction; the names suggest
+    // one score->id map per query vector - confirm against the original header.
+    using Score2IdMap = std::map;
+    using ResultSet = std::vector;
+    const ResultSet& GetResult() const { return result_; }
+    ResultSet& GetResult() { return result_; }
+
+    // Marks one index file as finished and wakes WaitResult() callers.
+    void IndexSearchDone(size_t index_id);
+    // Blocks until no pending index file is left.
+    void WaitResult();
+
+private:
+    uint64_t topk_ = 0;
+    uint64_t nq_ = 0;
+    const float* vectors_ = nullptr;
+
+    Id2IndexMap map_index_files_;
+    ResultSet result_;
+
+    std::mutex mtx_;
+    std::condition_variable done_cond_;
+
+    std::string identity_; //for debug
+};
+
+using SearchContextPtr = std::shared_ptr;
+
+
+
+}
+}
+}
diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp
new file mode 100644
index 0000000000..4d3a12fdf8
--- /dev/null
+++ b/cpp/src/db/scheduler/SearchScheduler.cpp
@@ -0,0 +1,164 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+
+#include "SearchScheduler.h"
+#include "IndexLoaderQueue.h"
+#include "SearchTaskQueue.h"
+#include "utils/Log.h"
+#include "utils/TimeRecorder.h"
+#include "metrics/Metrics.h"
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+// Creates a two-thread pool and starts both workers right away.
+SearchScheduler::SearchScheduler()
+    : thread_pool_(2),
+      stopped_(true) {
+    Start();
+}
+
+SearchScheduler::~SearchScheduler() {
+    Stop();
+}
+
+// Returns the single scheduler instance (function-local static).
+SearchScheduler& SearchScheduler::GetInstance() {
+    static SearchScheduler s_instance;
+    return s_instance;
+}
+
+// Enqueues the loading worker and the searching worker on the thread pool.
+// Does nothing when the scheduler is already running. Always returns true.
+bool
+SearchScheduler::Start() {
+    if(!stopped_) {
+        return true;
+    }
+
+    thread_pool_.enqueue(&SearchScheduler::IndexLoadWorker, this);
+    thread_pool_.enqueue(&SearchScheduler::SearchWorker, this);
+
+    //record the running state, otherwise Stop() returns early and never sends the exit markers
+    stopped_ = false;
+    return true;
+}
+
+// Sends a null exit marker to each worker queue.
+// Does nothing when the scheduler is not running. Always returns true.
+bool
+SearchScheduler::Stop() {
+    if(stopped_) {
+        return true;
+    }
+
+    IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance();
+    index_queue.Put(nullptr);
+
+    SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
+    search_queue.Put(nullptr);
+
+    stopped_ = true;
+    return true;
+}
+
+// Hands a search context to the loader queue. Always returns true; blocking and
+// exceptions are those of IndexLoaderQueue::Put().
+bool
+SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) {
+    IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance();
+    index_queue.Put(search_context);
+
+    return true;
+}
+
+// Worker loop: takes index files from the loader queue, loads them, records
+// file-size metrics and forwards a search task to the search queue.
+// Ends when a null entry is taken.
+//NOTE(review): template argument lists in this patch appear to have been lost
+//in extraction (bare "std::make_shared(...)"); they are left as found.
+bool
+SearchScheduler::IndexLoadWorker() {
+    IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance();
+    SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
+    while(true) {
+        IndexLoaderContextPtr context = index_queue.Take();
+        if(context == nullptr) {
+            SERVER_LOG_INFO << "Stop thread for index loading";
+            break;//exit
+        }
+
+        SERVER_LOG_INFO << "Loading index(" << context->file_->id << ") from location: " << context->file_->location;
+
+        server::TimeRecorder rc("Load index");
+        //load index
+        IndexEnginePtr index_ptr;
+        try {
+            index_ptr = std::make_shared(context->file_->dimension, context->file_->location);
+            index_ptr->Load();
+        } catch (std::exception& ex) {
+            //a failed load must not end this loop, and the waiting queries must be
+            //released: mark the file as done for each of them, as DoSearch does on failure
+            SERVER_LOG_ERROR << "IndexLoadWorker encounter exception: " << ex.what();
+            for(auto& search_context : context->search_contexts_) {
+                search_context->IndexSearchDone(context->file_->id);
+            }
+            continue;
+        }
+
+        rc.Record("load index file to memory");
+
+        size_t file_size = index_ptr->PhysicalSize();
+        LOG(DEBUG) << "Index file type " << context->file_->file_type << " Of Size: "
+                   << file_size/(1024*1024) << " M";
+
+        //metric
+        switch(context->file_->file_type) {
+            case meta::TableFileSchema::RAW: {
+                server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
+                server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
+                server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
+                break;
+            }
+            case meta::TableFileSchema::TO_INDEX: {
+                server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
+                server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
+                server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
+                break;
+            }
+            default: {
+                server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
+                server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
+                server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
+                break;
+            }
+        }
+
+        //put search task to another queue
+        SearchTaskPtr task_ptr = std::make_shared();
+        task_ptr->index_id_ = context->file_->id;
+        task_ptr->index_type_ = context->file_->file_type;
+        task_ptr->index_engine_ = index_ptr;
+        task_ptr->search_contexts_.swap(context->search_contexts_);
+        search_queue.Put(task_ptr);
+    }
+
+    return true;
+}
+
+// Worker loop: takes search tasks, runs them and records the search duration.
+// Ends when a null task is taken.
+bool
+SearchScheduler::SearchWorker() {
+    SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance();
+    while(true) {
+        SearchTaskPtr task_ptr = search_queue.Take();
+        if(task_ptr == nullptr) {
+            SERVER_LOG_INFO << "Stop thread for searching";
+            break;//exit
+        }
+
+        SERVER_LOG_INFO << "Searching in index(" << task_ptr->index_id_<< ") with "
+                        << task_ptr->search_contexts_.size() << " tasks";
+
+        //do search
+        //NOTE(review): a microsecond value is passed to "...DurationSeconds..." metrics - confirm the unit
+        auto start_time = METRICS_NOW_TIME;
+        task_ptr->DoSearch();
+        auto end_time = METRICS_NOW_TIME;
+        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
+        switch(task_ptr->index_type_) {
+            case meta::TableFileSchema::RAW: {
+                server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
+                break;
+            }
+            case meta::TableFileSchema::TO_INDEX: {
+                server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
+                break;
+            }
+            default: {
+                server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
+                break;
+            }
+        }
+    }
+
+    return true;
+}
+
+}
+}
+}
\ No newline at end of file
diff --git a/cpp/src/db/scheduler/SearchScheduler.h b/cpp/src/db/scheduler/SearchScheduler.h
new file mode 100644
index 0000000000..24c85395fc
--- /dev/null
+++ b/cpp/src/db/scheduler/SearchScheduler.h
@@ -0,0 +1,40 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "SearchContext.h"
+#include "utils/ThreadPool.h"
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+// Singleton that owns the two pipeline workers (index loading, searching).
+class SearchScheduler {
+private:
+    SearchScheduler();
+    virtual ~SearchScheduler();
+
+public:
+    static SearchScheduler& GetInstance();
+
+    // Queues the index files of search_context for loading and searching.
+    bool ScheduleSearchTask(SearchContextPtr& search_context);
+
+private:
+    bool Start();
+    bool Stop();
+
+    bool IndexLoadWorker();
+    bool SearchWorker();
+
+private:
+    server::ThreadPool thread_pool_;
+    bool stopped_ = true;   // true while the workers are not running
+};
+
+
+}
+}
+}
diff --git a/cpp/src/db/scheduler/SearchTaskQueue.h b/cpp/src/db/scheduler/SearchTaskQueue.h
new file mode 100644
index 0000000000..3b58294811
--- /dev/null
+++ b/cpp/src/db/scheduler/SearchTaskQueue.h
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "SearchContext.h"
+#include "utils/BlockingQueue.h"
+#include "../FaissExecutionEngine.h"
+#include "../Traits.h"
+
+#include
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+// The index trait is selected at compile time by GPU_VERSION.
+#ifdef GPU_VERSION
+using IndexTraitClass = IVFIndexTrait;
+#else
+using IndexTraitClass = IDMapIndexTrait;
+#endif
+
+using IndexClass = FaissExecutionEngine;
+using IndexEnginePtr = std::shared_ptr;
+
+// One loaded index plus the search contexts that are to be run against it.
+template
+class SearchTask {
+public:
+    // Runs every attached context against index_engine_; defined in SearchTaskQueue.inl.
+    bool DoSearch();
+
+public:
+    size_t index_id_ = 0;
+    int index_type_ = 0; //for metrics
+    IndexEnginePtr index_engine_;
+    std::vector search_contexts_;
+};
+
+using SearchTaskClass = SearchTask;
+using SearchTaskPtr = std::shared_ptr;
+
+// Blocking queue of search tasks, accessed through GetInstance(); copying is disabled.
+class SearchTaskQueue : public server::BlockingQueue {
+private:
+    SearchTaskQueue();
+
+    SearchTaskQueue(const SearchTaskQueue &rhs) = delete;
+
+    SearchTaskQueue &operator=(const SearchTaskQueue &rhs) = delete;
+
+public:
+    static SearchTaskQueue& GetInstance();
+
+private:
+
+};
+
+
+}
+}
+}
+
+#include "SearchTaskQueue.inl"
\ No newline at end of file
diff --git a/cpp/src/db/scheduler/SearchTaskQueue.inl b/cpp/src/db/scheduler/SearchTaskQueue.inl
new file mode 100644
index 0000000000..7816ba8878
--- /dev/null
+++ b/cpp/src/db/scheduler/SearchTaskQueue.inl
@@ -0,0 +1,114 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include "SearchTaskQueue.h"
+#include "utils/Log.h"
+#include "utils/TimeRecorder.h"
+
+namespace zilliz {
+namespace vecwise {
+namespace engine {
+
+//NOTE(review): template argument lists in this patch appear to have been lost
+//in extraction (bare "std::vector", "template"); they are left as found.
+namespace {
+// Splits the flat search output into one score->id map per query.
+//
+// output_ids / output_distence : nq blocks of topk entries each, block i
+//                                belonging to query i
+// nq, topk                     : number of queries and entries per query
+// result_set                   : cleared, then filled with nq maps
+void ClusterResult(const std::vector &output_ids,
+                   const std::vector &output_distence,
+                   uint64_t nq,
+                   uint64_t topk,
+                   SearchContext::ResultSet &result_set) {
+    result_set.clear();
+    for (uint64_t i = 0; i < nq; i++) {
+        SearchContext::Score2IdMap score2id;
+        for (uint64_t k = 0; k < topk; k++) {
+            //the row stride is topk, not nq: each query owns topk consecutive entries
+            uint64_t index = i * topk + k;
+            score2id.insert(std::make_pair(output_distence[index], output_ids[index]));
+        }
+        result_set.emplace_back(score2id);
+    }
+}
+
+// Merges result_src into result_target and keeps at most topk entries per query.
+//
+// An empty target simply takes over the source. Otherwise both sets must have
+// the same number of queries; a mismatch is logged and nothing is merged.
+void TopkResult(SearchContext::ResultSet &result_src,
+                uint64_t topk,
+                SearchContext::ResultSet &result_target) {
+    if (result_target.empty()) {
+        result_target.swap(result_src);
+        return;
+    }
+
+    if (result_src.size() != result_target.size()) {
+        SERVER_LOG_ERROR << "Invalid result set";
+        return;
+    }
+
+    for (size_t i = 0; i < result_src.size(); i++) {
+        SearchContext::Score2IdMap &score2id_src = result_src[i];
+        SearchContext::Score2IdMap &score2id_target = result_target[i];
+        for (auto iter = score2id_src.begin(); iter != score2id_src.end(); ++iter) {
+            score2id_target.insert(std::make_pair(iter->first, iter->second));
+        }
+
+        //remove unused items
+        while (score2id_target.size() > topk) {
+            score2id_target.erase(score2id_target.rbegin()->first);
+        }
+    }
+}
+}
+
+
+//inline: this file is included from a header, so these definitions must not
+//be emitted once per translation unit
+inline SearchTaskQueue::SearchTaskQueue() {
+    SetCapacity(4);
+}
+
+
+inline SearchTaskQueue&
+SearchTaskQueue::GetInstance() {
+    static SearchTaskQueue instance;
+    return instance;
+}
+
+// Searches the loaded index once per attached context and merges the outcome
+// into that context's result.
+//
+// Every context is told that this index file is done, also when the search
+// threw, so that nobody waits forever. Returns false only when no index engine
+// is set.
+template
+bool SearchTask::DoSearch() {
+    if(index_engine_ == nullptr) {
+        return false;
+    }
+
+    server::TimeRecorder rc("DoSearch");
+
+    std::vector output_ids;
+    std::vector output_distence;
+    for(auto& context : search_contexts_) {
+        //an index with fewer vectors than topk can only return that many per query
+        auto inner_k = index_engine_->Count() < context->topk() ? index_engine_->Count() : context->topk();
+        output_ids.resize(inner_k*context->nq());
+        output_distence.resize(inner_k*context->nq());
+
+        try {
+            index_engine_->Search(context->nq(), context->vectors(), inner_k, output_distence.data(),
+                                  output_ids.data());
+        } catch (std::exception& ex) {
+            SERVER_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
+            context->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed
+            continue;
+        }
+
+        rc.Record("do search");
+
+        SearchContext::ResultSet result_set;
+        ClusterResult(output_ids, output_distence, context->nq(), inner_k, result_set);
+        rc.Record("cluster result");
+        //cap the merged result at the requested topk; capping at inner_k would let
+        //a small index cut down results gathered from larger ones
+        TopkResult(result_set, context->topk(), context->GetResult());
+        rc.Record("reduce topk");
+        context->IndexSearchDone(index_id_);
+    }
+
+    rc.Elapse("totally cost");
+
+    return true;
+}
+
+}
+}
+}
diff --git a/cpp/src/sdk/src/client/ClientProxy.cpp b/cpp/src/sdk/src/client/ClientProxy.cpp
index 1208494fb3..1e23484742 100644
--- a/cpp/src/sdk/src/client/ClientProxy.cpp
+++ b/cpp/src/sdk/src/client/ClientProxy.cpp
@@ -264,7 +264,7 @@ ClientProxy::SearchVector(const std::string &table_name,
         }

     } catch ( std::exception& ex) {
-        return Status(StatusCode::UnknownError, "failed to create table partition: " + std::string(ex.what()));
+        return Status(StatusCode::UnknownError, "failed to search vectors: " + std::string(ex.what()));
     }

     return Status::OK();
diff --git a/cpp/src/server/MegasearchTask.cpp b/cpp/src/server/MegasearchTask.cpp
index 6871893348..1ea2a235dc 100644
--- a/cpp/src/server/MegasearchTask.cpp
+++ b/cpp/src/server/MegasearchTask.cpp
@@ -455,7 +455,7 @@ ServerError SearchVectorTask::OnExecute() {
             result_array_.emplace_back(thrift_topk_result);
         }
         rc.Record("construct result");
-
+        rc.Elapse("totally cost");
     } catch (std::exception& ex) {
         error_code_ = SERVER_UNEXPECTED_ERROR;
         error_msg_ = ex.what();
diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt
index 2b3d4609b0..ecfeb1e92c 100644
---
a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -6,6 +6,7 @@ include_directories(../../src) aux_source_directory(../../src/db db_srcs) +aux_source_directory(../../src/db/scheduler db_scheduler_srcs) aux_source_directory(../../src/config config_files) aux_source_directory(../../src/cache cache_srcs) aux_source_directory(../../src/wrapper wrapper_src) @@ -24,6 +25,7 @@ set(db_test_src ${config_files} ${cache_srcs} ${db_srcs} + ${db_scheduler_srcs} ${wrapper_src} ${require_files} utils.cpp diff --git a/cpp/unittest/metrics/CMakeLists.txt b/cpp/unittest/metrics/CMakeLists.txt index 2560467c5b..f24538fad4 100644 --- a/cpp/unittest/metrics/CMakeLists.txt +++ b/cpp/unittest/metrics/CMakeLists.txt @@ -13,6 +13,7 @@ include_directories(../../src) aux_source_directory(../../src/db db_srcs) +aux_source_directory(../../src/db/scheduler db_scheduler_srcs) aux_source_directory(../../src/config config_files) aux_source_directory(../../src/cache cache_srcs) aux_source_directory(../../src/wrapper wrapper_src) @@ -47,6 +48,7 @@ set(count_test_src ${config_files} ${cache_srcs} ${db_srcs} + ${db_scheduler_srcs} ${wrapper_src} ${require_files} metrics_test.cpp