From 22b5b6cbf193b9d0babc1ef37c5685687ab32eaa Mon Sep 17 00:00:00 2001 From: groot Date: Tue, 4 Jun 2019 19:32:43 +0800 Subject: [PATCH 01/11] pipeline for loading/searching Former-commit-id: 93cd16549fcd392de487237df21e2df14ef8321a --- cpp/src/CMakeLists.txt | 2 + cpp/src/db/DBImpl.h | 6 + cpp/src/db/DBImpl.inl | 49 ++++++- cpp/src/db/scheduler/IndexLoaderQueue.cpp | 110 ++++++++++++++++ cpp/src/db/scheduler/IndexLoaderQueue.h | 66 ++++++++++ cpp/src/db/scheduler/ScheduleStrategy.cpp | 65 ++++++++++ cpp/src/db/scheduler/ScheduleStrategy.h | 28 ++++ cpp/src/db/scheduler/SearchContext.cpp | 54 ++++++++ cpp/src/db/scheduler/SearchContext.h | 62 +++++++++ cpp/src/db/scheduler/SearchScheduler.cpp | 150 ++++++++++++++++++++++ cpp/src/db/scheduler/SearchScheduler.h | 40 ++++++ cpp/src/db/scheduler/SearchTaskQueue.h | 63 +++++++++ cpp/src/db/scheduler/SearchTaskQueue.inl | 106 +++++++++++++++ cpp/src/sdk/src/client/ClientProxy.cpp | 2 +- cpp/src/server/MegasearchTask.cpp | 2 +- cpp/unittest/db/CMakeLists.txt | 2 + cpp/unittest/metrics/CMakeLists.txt | 2 + 17 files changed, 805 insertions(+), 4 deletions(-) create mode 100644 cpp/src/db/scheduler/IndexLoaderQueue.cpp create mode 100644 cpp/src/db/scheduler/IndexLoaderQueue.h create mode 100644 cpp/src/db/scheduler/ScheduleStrategy.cpp create mode 100644 cpp/src/db/scheduler/ScheduleStrategy.h create mode 100644 cpp/src/db/scheduler/SearchContext.cpp create mode 100644 cpp/src/db/scheduler/SearchContext.h create mode 100644 cpp/src/db/scheduler/SearchScheduler.cpp create mode 100644 cpp/src/db/scheduler/SearchScheduler.h create mode 100644 cpp/src/db/scheduler/SearchTaskQueue.h create mode 100644 cpp/src/db/scheduler/SearchTaskQueue.inl diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 86afe997dc..0f3c7b14ef 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -10,6 +10,7 @@ aux_source_directory(config config_files) aux_source_directory(server server_files) aux_source_directory(utils utils_files) aux_source_directory(db db_files) +aux_source_directory(db/scheduler db_scheduler_files) aux_source_directory(wrapper wrapper_files) aux_source_directory(metrics metrics_files) @@ -39,6 +40,7 @@ set(vecwise_engine_files ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp ${cache_files} ${db_files} + ${db_scheduler_files} ${wrapper_files} # metrics/Metrics.cpp ${metrics_files} diff --git a/cpp/src/db/DBImpl.h b/cpp/src/db/DBImpl.h index 7384e53a34..b67aa8a0a9 100644 --- a/cpp/src/db/DBImpl.h +++ b/cpp/src/db/DBImpl.h @@ -53,6 +53,12 @@ public: virtual ~DBImpl(); private: + Status QuerySync(const std::string& table_id, size_t k, size_t nq, + const float* vectors, const meta::DatesT& dates, QueryResults& results); + + Status QueryAsync(const std::string& table_id, size_t k, size_t nq, + const float* vectors, const meta::DatesT& dates, QueryResults& results); + void BackgroundBuildIndex(); Status BuildIndex(const meta::TableFileSchema&); diff --git a/cpp/src/db/DBImpl.inl b/cpp/src/db/DBImpl.inl index ba0074ae2c..adb0e763e0 100644 --- a/cpp/src/db/DBImpl.inl +++ b/cpp/src/db/DBImpl.inl @@ -8,6 +8,9 @@ #include "DBImpl.h" #include "DBMetaImpl.h" #include "Env.h" +#include "utils/Log.h" +#include "metrics/Metrics.h" +#include "scheduler/SearchScheduler.h" #include #include @@ -16,8 +19,6 @@ #include #include #include -#include "../utils/Log.h" -#include "metrics/Metrics.h" namespace zilliz { namespace vecwise { @@ -98,7 +99,16 @@ Status DBImpl::Query(const std::string &table_id, size_t k, size_t nq, template Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq, const float* vectors, const meta::DatesT& dates, QueryResults& results) { +#if 0 + return QuerySync(table_id, k, nq, vectors, dates, results); +#else + return QueryAsync(table_id, k, nq, vectors, dates, results); +#endif +} +template +Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq, + const float* vectors, const meta::DatesT& dates, QueryResults& results) { meta::DatePartionedTableFilesSchema files; auto status = pMeta_->FilesToSearch(table_id, dates, files); if (!status.ok()) { return status; } @@ -150,6 +160,7 @@ Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq, auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void { for (auto &file : file_vec) { + EngineT index(file.dimension, file.location); index.Load(); auto file_size = index.PhysicalSize()/(1024*1024); @@ -248,6 +259,40 @@ Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq, return Status::OK(); } +template +Status DBImpl::QueryAsync(const std::string& table_id, size_t k, size_t nq, + const float* vectors, const meta::DatesT& dates, QueryResults& results) { + meta::DatePartionedTableFilesSchema files; + auto status = pMeta_->FilesToSearch(table_id, dates, files); + if (!status.ok()) { return status; } + + LOG(DEBUG) << "Search DateT Size=" << files.size(); + + SearchContextPtr context = std::make_shared(k, nq, vectors); + + for (auto &day_files : files) { + for (auto &file : day_files.second) { + TableFileSchemaPtr file_ptr = std::make_shared(file); + context->AddIndexFile(file_ptr); + } + } + + SearchScheduler& scheduler = SearchScheduler::GetInstance(); + scheduler.ScheduleSearchTask(context); + + context->WaitResult(); + auto& context_result = context->GetResult(); + for(auto& topk_result : context_result) { + QueryResult ids; + for(auto& pair : topk_result) { + ids.push_back(pair.second); + } + results.emplace_back(ids); + } + + return Status::OK(); +} + template void DBImpl::StartTimerTasks(int interval) { bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval); diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.cpp b/cpp/src/db/scheduler/IndexLoaderQueue.cpp new file mode 100644 index 0000000000..6d476a9df9 --- /dev/null +++ b/cpp/src/db/scheduler/IndexLoaderQueue.cpp @@ -0,0 +1,110 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "IndexLoaderQueue.h" +#include "ScheduleStrategy.h" +#include "utils/Error.h" +#include "utils/Log.h" + +namespace zilliz { +namespace vecwise { +namespace engine { + +IndexLoaderQueue& +IndexLoaderQueue::GetInstance() { + static IndexLoaderQueue s_instance; + return s_instance; +} + +void +IndexLoaderQueue::Put(const SearchContextPtr &search_context) { + std::unique_lock lock(mtx); + full_.wait(lock, [this] { return (queue_.size() < capacity_); }); + + if(search_context == nullptr) { + queue_.push_back(nullptr); + return; + } + + if (queue_.size() >= capacity_) { + std::string error_msg = + "blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " + + std::to_string(queue_.size()); + SERVER_LOG_ERROR << error_msg; + throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); + } + + ScheduleStrategyPtr strategy = CreateStrategy(); + strategy->Schedule(search_context, queue_); + + empty_.notify_all(); +} + +IndexLoaderContextPtr +IndexLoaderQueue::Take() { + std::unique_lock lock(mtx); + empty_.wait(lock, [this] { return !queue_.empty(); }); + + if (queue_.empty()) { + std::string error_msg = "blocking queue empty"; + SERVER_LOG_ERROR << error_msg; + throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); + } + + IndexLoaderContextPtr front(queue_.front()); + queue_.pop_front(); + full_.notify_all(); + return front; +} + +size_t +IndexLoaderQueue::Size() { + std::lock_guard lock(mtx); + return queue_.size(); +} + +IndexLoaderContextPtr +IndexLoaderQueue::Front() { + std::unique_lock lock(mtx); + empty_.wait(lock, [this] { return !queue_.empty(); }); + if (queue_.empty()) { + std::string error_msg = "blocking queue empty"; + SERVER_LOG_ERROR << error_msg; + throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); + } + IndexLoaderContextPtr front(queue_.front()); + return front; +} + +IndexLoaderContextPtr +IndexLoaderQueue::Back() { + std::unique_lock lock(mtx); + empty_.wait(lock, [this] { return !queue_.empty(); }); + + if (queue_.empty()) { + std::string error_msg = "blocking queue empty"; + SERVER_LOG_ERROR << error_msg; + throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); + } + + IndexLoaderContextPtr back(queue_.back()); + return back; +} + +bool +IndexLoaderQueue::Empty() { + std::unique_lock lock(mtx); + return queue_.empty(); +} + +void +IndexLoaderQueue::SetCapacity(const size_t capacity) { + capacity_ = (capacity > 0 ? capacity : capacity_); +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.h b/cpp/src/db/scheduler/IndexLoaderQueue.h new file mode 100644 index 0000000000..f0d71dcbd7 --- /dev/null +++ b/cpp/src/db/scheduler/IndexLoaderQueue.h @@ -0,0 +1,66 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "SearchContext.h" + +#include +#include +#include +#include + + +namespace zilliz { +namespace vecwise { +namespace engine { + + +class IndexLoaderContext { +public: + TableFileSchemaPtr file_; + std::vector search_contexts_; +}; +using IndexLoaderContextPtr = std::shared_ptr; + +class IndexLoaderQueue { +private: + IndexLoaderQueue() : mtx(), full_(), empty_() {} + + IndexLoaderQueue(const IndexLoaderQueue &rhs) = delete; + + IndexLoaderQueue &operator=(const IndexLoaderQueue &rhs) = delete; + +public: + using LoaderQueue = std::list; + + static IndexLoaderQueue& GetInstance(); + + void Put(const SearchContextPtr &search_context); + + IndexLoaderContextPtr Take(); + + IndexLoaderContextPtr Front(); + + IndexLoaderContextPtr Back(); + + size_t Size(); + + bool Empty(); + + void SetCapacity(const size_t capacity); + +private: + mutable std::mutex mtx; + std::condition_variable full_; + std::condition_variable empty_; + + LoaderQueue queue_; + size_t capacity_ = 1000000; +}; + +} +} +} diff --git a/cpp/src/db/scheduler/ScheduleStrategy.cpp b/cpp/src/db/scheduler/ScheduleStrategy.cpp new file mode 100644 index 0000000000..6a25d4c57c --- /dev/null +++ b/cpp/src/db/scheduler/ScheduleStrategy.cpp @@ -0,0 +1,65 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + + +#include "ScheduleStrategy.h" +#include "cache/CpuCacheMgr.h" +#include "utils/Error.h" +#include "utils/Log.h" + +namespace zilliz { +namespace vecwise { +namespace engine { + +class MemScheduleStrategy : public IScheduleStrategy { +public: + bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override; +}; + +ScheduleStrategyPtr CreateStrategy() { + ScheduleStrategyPtr strategy(new MemScheduleStrategy()); + return strategy; +} + +bool MemScheduleStrategy::Schedule(const SearchContextPtr &search_context, + IndexLoaderQueue::LoaderQueue &loader_list) { + if(search_context == nullptr) { + return false; + } + + SearchContext::Id2IndexMap index_files = search_context->GetIndexMap(); + //some index loader alread exists + for(auto iter = loader_list.begin(); iter != loader_list.end(); ++iter) { + if(index_files.find((*iter)->file_->id) != index_files.end()){ + SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext"; + index_files.erase((*iter)->file_->id); + (*iter)->search_contexts_.push_back(search_context); + } + } + + //index_files still contains some index files, create new loader + for(auto iter = index_files.begin(); iter != index_files.end(); ++iter) { + SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << iter->second->location; + IndexLoaderContextPtr new_loader = std::make_shared(); + new_loader->search_contexts_.push_back(search_context); + new_loader->file_ = iter->second; + + auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(iter->second->location); + if(index != nullptr) { + //if the index file has been in memory, increase its priority + loader_list.push_front(new_loader); + } else { + //index file not in memory, put it to tail + loader_list.push_back(new_loader); + } + } + + return true; +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/db/scheduler/ScheduleStrategy.h b/cpp/src/db/scheduler/ScheduleStrategy.h new file mode 100644 index 0000000000..6b9ac3bee4 --- /dev/null +++ b/cpp/src/db/scheduler/ScheduleStrategy.h @@ -0,0 +1,28 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "IndexLoaderQueue.h" +#include "SearchContext.h" + +namespace zilliz { +namespace vecwise { +namespace engine { + +class IScheduleStrategy { +public: + virtual ~IScheduleStrategy() {} + + virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0; +}; + +using ScheduleStrategyPtr = std::shared_ptr; + +ScheduleStrategyPtr CreateStrategy(); + +} +} +} diff --git a/cpp/src/db/scheduler/SearchContext.cpp b/cpp/src/db/scheduler/SearchContext.cpp new file mode 100644 index 0000000000..51f523bf70 --- /dev/null +++ b/cpp/src/db/scheduler/SearchContext.cpp @@ -0,0 +1,54 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "SearchContext.h" +#include "utils/Log.h" +#include + +namespace zilliz { +namespace vecwise { +namespace engine { + +SearchContext::SearchContext(uint64_t topk, uint64_t nq, const float* vectors) + : topk_(topk), + nq_(nq), + vectors_(vectors) { + //use current time to identify this context + time_t t; + time(&t); + identity_ = std::to_string(t); +} + +bool +SearchContext::AddIndexFile(TableFileSchemaPtr& index_file) { + std::unique_lock lock(mtx_); + if(index_file == nullptr || map_index_files_.find(index_file->id) != map_index_files_.end()) { + return false; + } + + SERVER_LOG_INFO << "SearchContext " << identity_ << " add index file: " << index_file->id; + + map_index_files_[index_file->id] = index_file; + return true; +} + +void +SearchContext::IndexSearchDone(size_t index_id) { + std::unique_lock lock(mtx_); + map_index_files_.erase(index_id); + done_cond_.notify_all(); + SERVER_LOG_INFO << "SearchContext " << identity_ << " finish index file: " << index_id; +} + +void +SearchContext::WaitResult() { + std::unique_lock lock(mtx_); + done_cond_.wait(lock, [this] { return map_index_files_.empty(); }); +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/db/scheduler/SearchContext.h b/cpp/src/db/scheduler/SearchContext.h new file mode 100644 index 0000000000..a7ffedb0de --- /dev/null +++ b/cpp/src/db/scheduler/SearchContext.h @@ -0,0 +1,62 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "../MetaTypes.h" + +#include +#include +#include +#include + +namespace zilliz { +namespace vecwise { +namespace engine { + +using TableFileSchemaPtr = std::shared_ptr; + +class SearchContext { +public: + SearchContext(uint64_t topk, uint64_t nq, const float* vectors); + + bool AddIndexFile(TableFileSchemaPtr& index_file); + + uint64_t Topk() const { return topk_; } + uint64_t Nq() const { return nq_; } + const float* Vectors() const { return vectors_; } + + using Id2IndexMap = std::unordered_map; + const Id2IndexMap& GetIndexMap() const { return map_index_files_; } + + using Score2IdMap = std::map; + using ResultSet = std::vector; + const ResultSet& GetResult() const { return result_; } + ResultSet& GetResult() { return result_; } + + void IndexSearchDone(size_t index_id); + void WaitResult(); + +private: + uint64_t topk_ = 0; + uint64_t nq_ = 0; + const float* vectors_ = nullptr; + + Id2IndexMap map_index_files_; + ResultSet result_; + + std::mutex mtx_; + std::condition_variable done_cond_; + + std::string identity_; //for debug +}; + +using SearchContextPtr = std::shared_ptr; + + + +} +} +} diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp new file mode 100644 index 0000000000..2b1114a14e --- /dev/null +++ b/cpp/src/db/scheduler/SearchScheduler.cpp @@ -0,0 +1,150 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "SearchScheduler.h" +#include "IndexLoaderQueue.h" +#include "SearchTaskQueue.h" +#include "utils/Log.h" +#include "utils/TimeRecorder.h" +#include "metrics/Metrics.h" + +namespace zilliz { +namespace vecwise { +namespace engine { + +SearchScheduler::SearchScheduler() + : thread_pool_(2), + stopped_(true) { + Start(); +} + +SearchScheduler::~SearchScheduler() { + Stop(); +} + +SearchScheduler& SearchScheduler::GetInstance() { + static SearchScheduler s_instance; + return s_instance; +} + +bool +SearchScheduler::Start() { + if(!stopped_) { + return true; + } + + thread_pool_.enqueue(&SearchScheduler::IndexLoadWorker, this); + thread_pool_.enqueue(&SearchScheduler::SearchWorker, this); + return true; +} + +bool +SearchScheduler::Stop() { + if(stopped_) { + return true; + } + + IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); + index_queue.Put(nullptr); + + SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); + search_queue.Put(nullptr); + + return true; +} + +bool +SearchScheduler::ScheduleSearchTask(SearchContextPtr& search_context) { + IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); + index_queue.Put(search_context); + + return true; +} + +bool +SearchScheduler::IndexLoadWorker() { + IndexLoaderQueue& index_queue = IndexLoaderQueue::GetInstance(); + SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); + while(true) { + IndexLoaderContextPtr context = index_queue.Take(); + if(context == nullptr) { + break;//exit + } + + SERVER_LOG_INFO << "Loading index(" << context->file_->id << ") from location: " << context->file_->location; + + server::TimeRecorder rc("Load index"); + //load index + IndexEnginePtr index_ptr = std::make_shared(context->file_->dimension, context->file_->location); + index_ptr->Load(); + + rc.Record("load index file to memory"); + + size_t file_size = index_ptr->PhysicalSize(); + LOG(DEBUG) << "Index file type " << context->file_->file_type << " Of Size: " + << file_size/(1024*1024) << " M"; + + //metric + if(context->file_->file_type == meta::TableFileSchema::RAW) { + server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); + server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); + server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); + + } else if(context->file_->file_type == meta::TableFileSchema::TO_INDEX) { + server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); + server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); + server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); + + } else { + server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size); + server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size); + server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size); + } + + //put search task to another queue + SearchTaskPtr task_ptr = std::make_shared(); + task_ptr->index_id_ = context->file_->id; + task_ptr->index_type_ = context->file_->file_type; + task_ptr->index_engine_ = index_ptr; + task_ptr->search_contexts_.swap(context->search_contexts_); + search_queue.Put(task_ptr); + } + + return true; +} + +bool +SearchScheduler::SearchWorker() { + SearchTaskQueue& search_queue = SearchTaskQueue::GetInstance(); + while(true) { + SearchTaskPtr task_ptr = search_queue.Take(); + if(task_ptr == nullptr) { + break;//exit + } + + SERVER_LOG_INFO << "Searching in index(" << task_ptr->index_id_<< ") with " + << task_ptr->search_contexts_.size() << " tasks"; + + //do search + auto start_time = METRICS_NOW_TIME; + task_ptr->DoSearch(); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + if(task_ptr->index_type_ == meta::TableFileSchema::RAW) { + server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); + } else if(task_ptr->index_type_ == meta::TableFileSchema::TO_INDEX) { + server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); + } else { + server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time); + } + } + + return true; +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/db/scheduler/SearchScheduler.h b/cpp/src/db/scheduler/SearchScheduler.h new file mode 100644 index 0000000000..24c85395fc --- /dev/null +++ b/cpp/src/db/scheduler/SearchScheduler.h @@ -0,0 +1,40 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "SearchContext.h" +#include "utils/ThreadPool.h" + +namespace zilliz { +namespace vecwise { +namespace engine { + +class SearchScheduler { +private: + SearchScheduler(); + virtual ~SearchScheduler(); + +public: + static SearchScheduler& GetInstance(); + + bool ScheduleSearchTask(SearchContextPtr& search_context); + +private: + bool Start(); + bool Stop(); + + bool IndexLoadWorker(); + bool SearchWorker(); + +private: + server::ThreadPool thread_pool_; + bool stopped_ = true; +}; + + +} +} +} diff --git a/cpp/src/db/scheduler/SearchTaskQueue.h b/cpp/src/db/scheduler/SearchTaskQueue.h new file mode 100644 index 0000000000..51090a2bad --- /dev/null +++ b/cpp/src/db/scheduler/SearchTaskQueue.h @@ -0,0 +1,63 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "SearchContext.h" +#include "utils/BlockingQueue.h" +#include "../FaissExecutionEngine.h" +#include "../Traits.h" + +#include + +namespace zilliz { +namespace vecwise { +namespace engine { + +#ifdef GPU_VERSION +using IndexTraitClass = IVFIndexTrait; +#else +using IndexTraitClass = IDMapIndexTrait; +#endif + +using IndexClass = FaissExecutionEngine; +using IndexEnginePtr = std::shared_ptr; + +template +class SearchTask { +public: + bool DoSearch(); + +public: + size_t index_id_ = 0; + int index_type_ = 0; //for metrics + IndexEnginePtr index_engine_; + std::vector search_contexts_; +}; + +using SearchTaskClass = SearchTask; +using SearchTaskPtr = std::shared_ptr; + +class SearchTaskQueue : public server::BlockingQueue { +private: + SearchTaskQueue() {} + + SearchTaskQueue(const SearchTaskQueue &rhs) = delete; + + SearchTaskQueue &operator=(const SearchTaskQueue &rhs) = delete; + +public: + static SearchTaskQueue& GetInstance(); + +private: + +}; + + +} +} +} + +#include "SearchTaskQueue.inl" \ No newline at end of file diff --git a/cpp/src/db/scheduler/SearchTaskQueue.inl b/cpp/src/db/scheduler/SearchTaskQueue.inl new file mode 100644 index 0000000000..94afd9b887 --- /dev/null +++ b/cpp/src/db/scheduler/SearchTaskQueue.inl @@ -0,0 +1,106 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "SearchTaskQueue.h" +#include "utils/Log.h" +#include "utils/TimeRecorder.h" + +namespace zilliz { +namespace vecwise { +namespace engine { + +namespace { +void ClusterResult(const std::vector &output_ids, + const std::vector &output_distence, + uint64_t nq, + uint64_t topk, + SearchContext::ResultSet &result_set) { + result_set.clear(); + for (auto i = 0; i < nq; i++) { + SearchContext::Score2IdMap score2id; + for (auto k = 0; k < topk; k++) { + uint64_t index = i * nq + k; + score2id.insert(std::make_pair(output_distence[index], output_ids[index])); + } + result_set.emplace_back(score2id); + } +} + +void TopkResult(SearchContext::ResultSet &result_src, + uint64_t topk, + SearchContext::ResultSet &result_target) { + if (result_target.empty()) { + result_target.swap(result_src); + return; + } + + if (result_src.size() != result_target.size()) { + SERVER_LOG_ERROR << "Invalid result set"; + return; + } + + for (size_t i = 0; i < result_src.size(); i++) { + SearchContext::Score2IdMap &score2id_src = result_src[i]; + SearchContext::Score2IdMap &score2id_target = result_target[i]; + for (auto iter = score2id_src.begin(); iter != score2id_src.end(); ++iter) { + score2id_target.insert(std::make_pair(iter->first, iter->second)); + } + + //remove unused items + while (score2id_target.size() > topk) { + score2id_target.erase(score2id_target.rbegin()->first); + } + } +} +} + +SearchTaskQueue& +SearchTaskQueue::GetInstance() { + static SearchTaskQueue s_instance; + return s_instance; +} + +template +bool SearchTask::DoSearch() { + if(index_engine_ == nullptr) { + return false; + } + + server::TimeRecorder rc("DoSearch"); + + std::vector output_ids; + std::vector output_distence; + for(auto& context : search_contexts_) { + auto inner_k = index_engine_->Count() < context->Topk() ? index_engine_->Count() : context->Topk(); + output_ids.resize(inner_k*context->Nq()); + output_distence.resize(inner_k*context->Nq()); + + try { + index_engine_->Search(context->Nq(), context->Vectors(), inner_k, output_distence.data(), + output_ids.data()); + } catch (std::exception& ex) { + SERVER_LOG_ERROR << "SearchTask encounter exception: " << ex.what(); + } + + rc.Record("do search"); + + SearchContext::ResultSet result_set; + ClusterResult(output_ids, output_distence, context->Nq(), inner_k, result_set); + rc.Record("cluster result"); + TopkResult(result_set, inner_k, context->GetResult()); + rc.Record("reduce topk"); + context->IndexSearchDone(index_id_); + } + + rc.Elapse("totally cost"); + + return true; +} + +} +} +} diff --git a/cpp/src/sdk/src/client/ClientProxy.cpp b/cpp/src/sdk/src/client/ClientProxy.cpp index 1208494fb3..1e23484742 100644 --- a/cpp/src/sdk/src/client/ClientProxy.cpp +++ b/cpp/src/sdk/src/client/ClientProxy.cpp @@ -264,7 +264,7 @@ ClientProxy::SearchVector(const std::string &table_name, } } catch ( std::exception& ex) { - return Status(StatusCode::UnknownError, "failed to create table partition: " + std::string(ex.what())); + return Status(StatusCode::UnknownError, "failed to search vectors: " + std::string(ex.what())); } return Status::OK(); diff --git a/cpp/src/server/MegasearchTask.cpp b/cpp/src/server/MegasearchTask.cpp index 6871893348..1ea2a235dc 100644 --- a/cpp/src/server/MegasearchTask.cpp +++ b/cpp/src/server/MegasearchTask.cpp @@ -455,7 +455,7 @@ ServerError SearchVectorTask::OnExecute() { result_array_.emplace_back(thrift_topk_result); } rc.Record("construct result"); - + rc.Elapse("totally cost"); } catch (std::exception& ex) { error_code_ = SERVER_UNEXPECTED_ERROR; error_msg_ = ex.what(); diff --git a/cpp/unittest/db/CMakeLists.txt b/cpp/unittest/db/CMakeLists.txt index 2b3d4609b0..ecfeb1e92c 100644 --- a/cpp/unittest/db/CMakeLists.txt +++ b/cpp/unittest/db/CMakeLists.txt @@ -6,6 +6,7 @@ include_directories(../../src) aux_source_directory(../../src/db db_srcs) +aux_source_directory(../../src/db/scheduler db_scheduler_srcs) aux_source_directory(../../src/config config_files) aux_source_directory(../../src/cache cache_srcs) aux_source_directory(../../src/wrapper wrapper_src) @@ -24,6 +25,7 @@ set(db_test_src ${config_files} ${cache_srcs} ${db_srcs} + ${db_scheduler_srcs} ${wrapper_src} ${require_files} utils.cpp diff --git a/cpp/unittest/metrics/CMakeLists.txt b/cpp/unittest/metrics/CMakeLists.txt index 2560467c5b..f24538fad4 100644 --- a/cpp/unittest/metrics/CMakeLists.txt +++ b/cpp/unittest/metrics/CMakeLists.txt @@ -13,6 +13,7 @@ include_directories(../../src) aux_source_directory(../../src/db db_srcs) +aux_source_directory(../../src/db/scheduler db_scheduler_srcs) aux_source_directory(../../src/config config_files) aux_source_directory(../../src/cache cache_srcs) aux_source_directory(../../src/wrapper wrapper_src) @@ -47,6 +48,7 @@ set(count_test_src ${config_files} ${cache_srcs} ${db_srcs} + ${db_scheduler_srcs} ${wrapper_src} ${require_files} metrics_test.cpp From 1f90641697bf266e86c8cedb6c25ae270da9241c Mon Sep 17 00:00:00 2001 From: groot Date: Tue, 4 Jun 2019 19:36:48 +0800 Subject: [PATCH 02/11] changelog Former-commit-id: 9dc092a8836e7da04966acecece0270467f48959 --- cpp/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 02b4e490c2..69a665deb4 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -37,6 +37,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-37 - Add query, cache usage, disk write speed and file data size metrics - MS-30 - Use faiss v1.5.2 - MS-54 - cmake: Change Thrift third party URL to github.com +- MS-57 - Implement index load/search pipeline ## Task From db1b37d45ca154ab1951b00eb1d6fcdcd4b62c1c Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 5 Jun 2019 12:14:53 +0800 Subject: [PATCH 03/11] fix problems Former-commit-id: c90a6834c4848fbaed1a1b0c753979f307db4d27 --- cpp/CHANGELOG.md | 3 +- cpp/src/db/DBImpl.inl | 1 - cpp/src/db/scheduler/IScheduleStrategy.h | 26 ++++++++ cpp/src/db/scheduler/IndexLoaderQueue.cpp | 2 +- cpp/src/db/scheduler/ScheduleStrategy.cpp | 75 +++++++++++------------ cpp/src/db/scheduler/ScheduleStrategy.h | 16 ++--- cpp/src/db/scheduler/SearchContext.h | 8 +-- cpp/src/db/scheduler/SearchScheduler.cpp | 35 ++++++----- cpp/src/db/scheduler/SearchTaskQueue.h | 2 +- cpp/src/db/scheduler/SearchTaskQueue.inl | 18 ++++-- 10 files changed, 111 insertions(+), 75 deletions(-) create mode 100644 cpp/src/db/scheduler/IScheduleStrategy.h diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 69a665deb4..47a5ebf6e4 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -10,6 +10,8 @@ Please mark all change in change log and use the ticket from JIRA. ## New Feature +- MS-57 - Implement index load/search pipeline + ## Task # MegaSearch 0.2.0 (2019-05-31) @@ -37,7 +39,6 @@ Please mark all change in change log and use the ticket from JIRA. - MS-37 - Add query, cache usage, disk write speed and file data size metrics - MS-30 - Use faiss v1.5.2 - MS-54 - cmake: Change Thrift third party URL to github.com -- MS-57 - Implement index load/search pipeline ## Task diff --git a/cpp/src/db/DBImpl.inl b/cpp/src/db/DBImpl.inl index adb0e763e0..10f16ee9eb 100644 --- a/cpp/src/db/DBImpl.inl +++ b/cpp/src/db/DBImpl.inl @@ -8,7 +8,6 @@ #include "DBImpl.h" #include "DBMetaImpl.h" #include "Env.h" -#include "utils/Log.h" #include "metrics/Metrics.h" #include "scheduler/SearchScheduler.h" diff --git a/cpp/src/db/scheduler/IScheduleStrategy.h b/cpp/src/db/scheduler/IScheduleStrategy.h new file mode 100644 index 0000000000..190125dfdd --- /dev/null +++ b/cpp/src/db/scheduler/IScheduleStrategy.h @@ -0,0 +1,26 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "IndexLoaderQueue.h" +#include "SearchContext.h" + +namespace zilliz { +namespace vecwise { +namespace engine { + +class IScheduleStrategy { +public: + virtual ~IScheduleStrategy() {} + + virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0; +}; + +using ScheduleStrategyPtr = std::shared_ptr; + +} +} +} \ No newline at end of file diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.cpp b/cpp/src/db/scheduler/IndexLoaderQueue.cpp index 6d476a9df9..7f748a65a9 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.cpp +++ b/cpp/src/db/scheduler/IndexLoaderQueue.cpp @@ -37,7 +37,7 @@ IndexLoaderQueue::Put(const SearchContextPtr &search_context) { throw server::ServerException(server::SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } - ScheduleStrategyPtr strategy = CreateStrategy(); + ScheduleStrategyPtr strategy = StrategyFactory::CreateMemStrategy(); strategy->Schedule(search_context, queue_); empty_.notify_all(); diff --git a/cpp/src/db/scheduler/ScheduleStrategy.cpp b/cpp/src/db/scheduler/ScheduleStrategy.cpp index 6a25d4c57c..2b344b33ef 100644 --- a/cpp/src/db/scheduler/ScheduleStrategy.cpp +++ b/cpp/src/db/scheduler/ScheduleStrategy.cpp @@ -16,50 +16,49 @@ namespace engine { class MemScheduleStrategy : public IScheduleStrategy { public: - bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override; + bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) override { + if(search_context == nullptr) { + return false; + } + + SearchContext::Id2IndexMap index_files = search_context->GetIndexMap(); + //some index loader alread exists + for(auto& loader : loader_list) { + if(index_files.find(loader->file_->id) != index_files.end()){ + SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext"; + index_files.erase(loader->file_->id); + loader->search_contexts_.push_back(search_context); + } + } + + //index_files still contains some index files, create new loader + for(auto& pair : index_files) { + SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location; + IndexLoaderContextPtr new_loader = std::make_shared(); + new_loader->search_contexts_.push_back(search_context); + new_loader->file_ = pair.second; + + auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location); + if(index != nullptr) { + //if the index file has been in memory, increase its priority + loader_list.push_front(new_loader); + } else { + //index file not in memory, put it to tail + loader_list.push_back(new_loader); + } + } + + return true; + } }; -ScheduleStrategyPtr CreateStrategy() { + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +ScheduleStrategyPtr StrategyFactory::CreateMemStrategy() { ScheduleStrategyPtr strategy(new MemScheduleStrategy()); return strategy; } -bool MemScheduleStrategy::Schedule(const SearchContextPtr &search_context, - IndexLoaderQueue::LoaderQueue &loader_list) { - if(search_context == nullptr) { - return false; - } - - SearchContext::Id2IndexMap index_files = search_context->GetIndexMap(); - //some index loader alread exists - for(auto iter = loader_list.begin(); iter != loader_list.end(); ++iter) { - if(index_files.find((*iter)->file_->id) != index_files.end()){ - SERVER_LOG_INFO << "Append SearchContext to exist IndexLoaderContext"; - index_files.erase((*iter)->file_->id); - (*iter)->search_contexts_.push_back(search_context); - } - } - - //index_files still contains some index files, create new loader - for(auto iter = index_files.begin(); iter != index_files.end(); ++iter) { - SERVER_LOG_INFO << "Create new IndexLoaderContext for: " << iter->second->location; - IndexLoaderContextPtr new_loader = std::make_shared(); - new_loader->search_contexts_.push_back(search_context); - new_loader->file_ = iter->second; - - auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(iter->second->location); - if(index != nullptr) { - //if the index file has been in memory, increase its priority - loader_list.push_front(new_loader); - } else { - //index file not in memory, put it to tail - loader_list.push_back(new_loader); - } - } - - return true; -} - } } } \ No newline at end of file diff --git a/cpp/src/db/scheduler/ScheduleStrategy.h b/cpp/src/db/scheduler/ScheduleStrategy.h index 6b9ac3bee4..da01d7e745 100644 --- a/cpp/src/db/scheduler/ScheduleStrategy.h +++ b/cpp/src/db/scheduler/ScheduleStrategy.h @@ -5,24 +5,20 @@ ******************************************************************************/ #pragma once -#include "IndexLoaderQueue.h" -#include "SearchContext.h" +#include "IScheduleStrategy.h" namespace zilliz { namespace vecwise { namespace engine { -class IScheduleStrategy { -public: - virtual ~IScheduleStrategy() {} +class StrategyFactory { +private: + StrategyFactory() {} - virtual bool Schedule(const SearchContextPtr &search_context, IndexLoaderQueue::LoaderQueue& loader_list) = 0; +public: + static ScheduleStrategyPtr CreateMemStrategy(); }; -using ScheduleStrategyPtr = std::shared_ptr; - -ScheduleStrategyPtr CreateStrategy(); - } } } diff --git a/cpp/src/db/scheduler/SearchContext.h b/cpp/src/db/scheduler/SearchContext.h index a7ffedb0de..ae7327fd68 100644 --- a/cpp/src/db/scheduler/SearchContext.h +++ b/cpp/src/db/scheduler/SearchContext.h @@ -5,7 +5,7 @@ ******************************************************************************/ #pragma once -#include "../MetaTypes.h" +#include "db/MetaTypes.h" #include #include @@ -24,9 +24,9 @@ public: bool AddIndexFile(TableFileSchemaPtr& index_file); - uint64_t Topk() const { return topk_; } - uint64_t Nq() const { return nq_; } - const float* Vectors() const { return vectors_; } + uint64_t topk() const { return topk_; } + uint64_t nq() const { return nq_; } + const float* vectors() const { return vectors_; } using Id2IndexMap = std::unordered_map; const Id2IndexMap& GetIndexMap() const { return map_index_files_; } diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp index 2b1114a14e..3ffb0dd94e 100644 --- a/cpp/src/db/scheduler/SearchScheduler.cpp +++ b/cpp/src/db/scheduler/SearchScheduler.cpp @@ -71,6 +71,7 @@ SearchScheduler::IndexLoadWorker() { while(true) { IndexLoaderContextPtr context = index_queue.Take(); if(context == nullptr) { + SERVER_LOG_INFO << "Stop thread for index loading"; break;//exit } @@ -88,20 +89,25 @@ SearchScheduler::IndexLoadWorker() { << file_size/(1024*1024) << " M"; //metric - if(context->file_->file_type == meta::TableFileSchema::RAW) { - server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); - server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); - server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); - - } else if(context->file_->file_type == meta::TableFileSchema::TO_INDEX) { - server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); - server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); - server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); - - } else { - server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size); - server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size); - server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size); + switch(context->file_->file_type) { + case meta::TableFileSchema::RAW: { + server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); + server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); + server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); + break; + } + case meta::TableFileSchema::TO_INDEX: { + server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size); + server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size); + server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size); + break; + } + default: { + server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size); + server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size); + server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size); + break; + } } //put search task to another queue @@ -122,6 +128,7 @@ SearchScheduler::SearchWorker() { while(true) { SearchTaskPtr task_ptr = search_queue.Take(); if(task_ptr == nullptr) { + SERVER_LOG_INFO << "Stop thread for searching"; break;//exit } diff --git a/cpp/src/db/scheduler/SearchTaskQueue.h b/cpp/src/db/scheduler/SearchTaskQueue.h index 51090a2bad..3b58294811 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.h +++ b/cpp/src/db/scheduler/SearchTaskQueue.h @@ -42,7 +42,7 @@ using SearchTaskPtr = std::shared_ptr; class SearchTaskQueue : public server::BlockingQueue { private: - SearchTaskQueue() {} + SearchTaskQueue(); SearchTaskQueue(const SearchTaskQueue &rhs) = delete; diff --git a/cpp/src/db/scheduler/SearchTaskQueue.inl b/cpp/src/db/scheduler/SearchTaskQueue.inl index 94afd9b887..c9cbaf8a1b 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.inl +++ b/cpp/src/db/scheduler/SearchTaskQueue.inl @@ -58,6 +58,12 @@ void TopkResult(SearchContext::ResultSet &result_src, } } + +SearchTaskQueue::SearchTaskQueue() { + SetCapacity(4); +} + + SearchTaskQueue& SearchTaskQueue::GetInstance() { static SearchTaskQueue s_instance; @@ -75,21 +81,23 @@ bool SearchTask::DoSearch() { std::vector output_ids; std::vector output_distence; for(auto& context : search_contexts_) { - auto inner_k = index_engine_->Count() < context->Topk() ? index_engine_->Count() : context->Topk(); - output_ids.resize(inner_k*context->Nq()); - output_distence.resize(inner_k*context->Nq()); + auto inner_k = index_engine_->Count() < context->topk() ? index_engine_->Count() : context->topk(); + output_ids.resize(inner_k*context->nq()); + output_distence.resize(inner_k*context->nq()); try { - index_engine_->Search(context->Nq(), context->Vectors(), inner_k, output_distence.data(), + index_engine_->Search(context->nq(), context->vectors(), inner_k, output_distence.data(), output_ids.data()); } catch (std::exception& ex) { SERVER_LOG_ERROR << "SearchTask encounter exception: " << ex.what(); + context->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed + continue; } rc.Record("do search"); SearchContext::ResultSet result_set; - ClusterResult(output_ids, output_distence, context->Nq(), inner_k, result_set); + ClusterResult(output_ids, output_distence, context->nq(), inner_k, result_set); rc.Record("cluster result"); TopkResult(result_set, inner_k, context->GetResult()); rc.Record("reduce topk"); From ab3c9649c6bab5a8839970a11d5158369e95a8e2 Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 5 Jun 2019 12:29:31 +0800 Subject: [PATCH 04/11] search context id Former-commit-id: 7e8bafac6b78d879edb8a09cd2ffb83c46d44401 --- cpp/src/db/scheduler/SearchContext.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cpp/src/db/scheduler/SearchContext.cpp b/cpp/src/db/scheduler/SearchContext.cpp index 51f523bf70..7cde55f31a 100644 --- a/cpp/src/db/scheduler/SearchContext.cpp +++ b/cpp/src/db/scheduler/SearchContext.cpp @@ -6,7 +6,8 @@ #include "SearchContext.h" #include "utils/Log.h" -#include + +#include namespace zilliz { namespace vecwise { @@ -17,9 +18,9 @@ SearchContext::SearchContext(uint64_t topk, uint64_t nq, const float* vectors) nq_(nq), vectors_(vectors) { //use current time to identify this context - time_t t; - time(&t); - identity_ = std::to_string(t); + std::chrono::system_clock::time_point tp = std::chrono::system_clock::now(); + long id = tp.time_since_epoch().count(); + identity_ = std::to_string(id); } bool From 326945efa715aa9b37d73eb020fee44120af9150 Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 5 Jun 2019 12:33:22 +0800 Subject: [PATCH 05/11] use switch case Former-commit-id: 39b674d81433171cc62f30b75064c1f4daa844f9 --- cpp/src/db/scheduler/SearchScheduler.cpp | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/cpp/src/db/scheduler/SearchScheduler.cpp b/cpp/src/db/scheduler/SearchScheduler.cpp index 3ffb0dd94e..4d3a12fdf8 100644 --- a/cpp/src/db/scheduler/SearchScheduler.cpp +++ b/cpp/src/db/scheduler/SearchScheduler.cpp @@ -140,12 +140,19 @@ SearchScheduler::SearchWorker() { task_ptr->DoSearch(); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time, end_time); - if(task_ptr->index_type_ == meta::TableFileSchema::RAW) { - server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); - } else if(task_ptr->index_type_ == meta::TableFileSchema::TO_INDEX) { - server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); - } else { - server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time); + switch(task_ptr->index_type_) { + case meta::TableFileSchema::RAW: { + server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); + break; + } + case meta::TableFileSchema::TO_INDEX: { + server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time); + break; + } + default: { + server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time); + break; + } } } From dfef871f97ba7e36014434df5a150f5ed44a39b2 Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 5 Jun 2019 15:41:35 +0800 Subject: [PATCH 06/11] rename Former-commit-id: 0db9006f7f17adcc933984f9b471a9173cd5b97e --- cpp/src/db/scheduler/IndexLoaderQueue.cpp | 4 ++-- cpp/src/db/scheduler/SearchTaskQueue.inl | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/db/scheduler/IndexLoaderQueue.cpp b/cpp/src/db/scheduler/IndexLoaderQueue.cpp index 7f748a65a9..2840e6564e 100644 --- a/cpp/src/db/scheduler/IndexLoaderQueue.cpp +++ b/cpp/src/db/scheduler/IndexLoaderQueue.cpp @@ -15,8 +15,8 @@ namespace engine { IndexLoaderQueue& IndexLoaderQueue::GetInstance() { - static IndexLoaderQueue s_instance; - return s_instance; + static IndexLoaderQueue instance; + return instance; } void diff --git a/cpp/src/db/scheduler/SearchTaskQueue.inl b/cpp/src/db/scheduler/SearchTaskQueue.inl index c9cbaf8a1b..7816ba8878 100644 --- a/cpp/src/db/scheduler/SearchTaskQueue.inl +++ b/cpp/src/db/scheduler/SearchTaskQueue.inl @@ -66,8 +66,8 @@ SearchTaskQueue::SearchTaskQueue() { SearchTaskQueue& SearchTaskQueue::GetInstance() { - static SearchTaskQueue s_instance; - return s_instance; + static SearchTaskQueue instance; + return instance; } template From 956f774c62a85fd0be2440cc17a29978bfc8b3e3 Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Wed, 5 Jun 2019 21:10:53 +0800 Subject: [PATCH 07/11] fix(db): temp fix for files to search Former-commit-id: f45a0e22708c9e9fc388f3fbe6a40c3e0094dd77 --- cpp/src/db/DBMetaImpl.cpp | 123 +++++++++++++++++++++++++------------- 1 file changed, 83 insertions(+), 40 deletions(-) diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 731830fb38..ea325c0c54 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -49,6 +49,7 @@ inline auto StoragePrototype(const std::string &path) { using ConnectorT = decltype(StoragePrototype("")); static std::unique_ptr ConnectorPtr; +using ConditionT = decltype(c(&TableFileSchema::id) == 1UL); std::string DBMetaImpl::GetTablePath(const std::string &table_id) { return options_.path + "/tables/" + table_id; @@ -334,51 +335,93 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) { files.clear(); - DatesT today = {Meta::GetDate()}; - const DatesT &dates = (partition.empty() == true) ? today : partition; try { server::Metrics::GetInstance().MetaAccessTotalIncrement(); auto start_time = METRICS_NOW_TIME; - auto selected = ConnectorPtr->select(columns(&TableFileSchema::id, - &TableFileSchema::table_id, - &TableFileSchema::file_id, - &TableFileSchema::file_type, - &TableFileSchema::size, - &TableFileSchema::date), - where(c(&TableFileSchema::table_id) == table_id and - in(&TableFileSchema::date, dates) and - (c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW or - c(&TableFileSchema::file_type) - == (int) TableFileSchema::TO_INDEX or - c(&TableFileSchema::file_type) - == (int) TableFileSchema::INDEX))); - auto end_time = METRICS_NOW_TIME; - auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); - TableSchema table_schema; - table_schema.table_id = table_id; - auto status = DescribeTable(table_schema); - if (!status.ok()) { - return status; - } - - TableFileSchema table_file; - - for (auto &file : selected) { - table_file.id = std::get<0>(file); - table_file.table_id = std::get<1>(file); - table_file.file_id = std::get<2>(file); - table_file.file_type = std::get<3>(file); - table_file.size = std::get<4>(file); - table_file.date = std::get<5>(file); - table_file.dimension = table_schema.dimension; - GetTableFilePath(table_file); - auto dateItr = files.find(table_file.date); - if (dateItr == files.end()) { - files[table_file.date] = TableFilesSchema(); + if (partition.empty()) { + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id, + &TableFileSchema::table_id, + &TableFileSchema::file_id, + &TableFileSchema::file_type, + &TableFileSchema::size, + &TableFileSchema::date), + where(c(&TableFileSchema::table_id) == table_id and + (c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW or + c(&TableFileSchema::file_type) + == (int) TableFileSchema::TO_INDEX or + c(&TableFileSchema::file_type) + == (int) TableFileSchema::INDEX))); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + TableSchema table_schema; + table_schema.table_id = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; } - files[table_file.date].push_back(table_file); + + TableFileSchema table_file; + + for (auto &file : selected) { + table_file.id = std::get<0>(file); + table_file.table_id = std::get<1>(file); + table_file.file_id = std::get<2>(file); + table_file.file_type = std::get<3>(file); + table_file.size = std::get<4>(file); + table_file.date = std::get<5>(file); + table_file.dimension = table_schema.dimension; + GetTableFilePath(table_file); + auto dateItr = files.find(table_file.date); + if (dateItr == files.end()) { + files[table_file.date] = TableFilesSchema(); + } + files[table_file.date].push_back(table_file); + } + } + else { + auto selected = ConnectorPtr->select(columns(&TableFileSchema::id, + &TableFileSchema::table_id, + &TableFileSchema::file_id, + &TableFileSchema::file_type, + &TableFileSchema::size, + &TableFileSchema::date), + where(c(&TableFileSchema::table_id) == table_id and + in(&TableFileSchema::date, partition) and + (c(&TableFileSchema::file_type) == (int) TableFileSchema::RAW or + c(&TableFileSchema::file_type) + == (int) TableFileSchema::TO_INDEX or + c(&TableFileSchema::file_type) + == (int) TableFileSchema::INDEX))); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().MetaAccessDurationSecondsHistogramObserve(total_time); + TableSchema table_schema; + table_schema.table_id = table_id; + auto status = DescribeTable(table_schema); + if (!status.ok()) { + return status; + } + + TableFileSchema table_file; + + for (auto &file : selected) { + table_file.id = std::get<0>(file); + table_file.table_id = std::get<1>(file); + table_file.file_id = std::get<2>(file); + table_file.file_type = std::get<3>(file); + table_file.size = std::get<4>(file); + table_file.date = std::get<5>(file); + table_file.dimension = table_schema.dimension; + GetTableFilePath(table_file); + auto dateItr = files.find(table_file.date); + if (dateItr == files.end()) { + files[table_file.date] = TableFilesSchema(); + } + files[table_file.date].push_back(table_file); + } + } } catch (std::exception &e) { LOG(DEBUG) << e.what(); From baf70e6156e14c31eedac5c195f2109f6a967b64 Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Wed, 5 Jun 2019 21:17:54 +0800 Subject: [PATCH 08/11] chore(all): update change log Former-commit-id: ccd39605c25d169f2bc7eb80e57cf20447b41715 --- cpp/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 47a5ebf6e4..699e65149b 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -44,4 +44,4 @@ Please mark all change in change log and use the ticket from JIRA. - MS-1 - Add CHANGELOG.md - MS-4 - Refactor the vecwise_engine code structure - +- MS-62 - Search range to all if no date specified From d019ccc338a9c6d477c3a335a1429db8cd4134c4 Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 6 Jun 2019 10:24:40 +0800 Subject: [PATCH 09/11] automaticlly generate build version infor Former-commit-id: 4c8fccc57722bd527e1947590014e07ca110a108 --- cpp/.gitignore | 1 + cpp/CMakeLists.txt | 27 ++++++++++++++++++++++++++- cpp/src/main.cpp | 5 +++-- cpp/version.h.cf | 5 +++++ 4 files changed, 35 insertions(+), 3 deletions(-) create mode 100644 cpp/version.h.cf diff --git a/cpp/.gitignore b/cpp/.gitignore index e99e0273f3..88c9c4c2f4 100644 --- a/cpp/.gitignore +++ b/cpp/.gitignore @@ -4,3 +4,4 @@ third_party/bzip2-1.0.6/ third_party/sqlite3/ megasearch/ conf/server_config.yaml +version.h diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 58ba3aea7d..d38fd73556 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -7,7 +7,32 @@ cmake_minimum_required(VERSION 3.14) message(STATUS "Building using CMake version: ${CMAKE_VERSION}") -set(MEGASEARCH_VERSION "0.1.0") +MACRO (GET_CURRENT_TIME CURRENT_TIME) + execute_process(COMMAND "date" +"%Y-%m-%d %H:%M.%S" OUTPUT_VARIABLE ${CURRENT_TIME}) +ENDMACRO (GET_CURRENT_TIME) + +GET_CURRENT_TIME(BUILD_TIME) +string(REGEX REPLACE "\n" "" BUILD_TIME ${BUILD_TIME}) +message(STATUS "Build time = ${BUILD_TIME}") + +MACRO (GET_GIT_BRANCH_NAME GIT_BRANCH_NAME) + execute_process(COMMAND "git" symbolic-ref --short HEAD OUTPUT_VARIABLE ${GIT_BRANCH_NAME}) +ENDMACRO (GET_GIT_BRANCH_NAME) + +GET_GIT_BRANCH_NAME(GIT_BRANCH_NAME) +string(REGEX REPLACE "\n" "" GIT_BRANCH_NAME ${GIT_BRANCH_NAME}) + +set(MEGASEARCH_VERSION "${GIT_BRANCH_NAME}") +string(REGEX REPLACE "branch-" "" MEGASEARCH_VERSION ${MEGASEARCH_VERSION}) + +if(CMAKE_BUILD_TYPE STREQUAL "Release") + set(BUILD_TYPE "release") +else() + set(BUILD_TYPE "debug") +endif() +message(STATUS "Build type = ${BUILD_TYPE}") + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/version.h.cf ${CMAKE_CURRENT_SOURCE_DIR}/version.h) string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" MEGASEARCH_BASE_VERSION "${MEGASEARCH_VERSION}") diff --git a/cpp/src/main.cpp b/cpp/src/main.cpp index 08ecb8c194..32f7c515e3 100644 --- a/cpp/src/main.cpp +++ b/cpp/src/main.cpp @@ -4,6 +4,7 @@ // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// #include "server/Server.h" +#include "version.h" #include #include @@ -25,8 +26,8 @@ using namespace zilliz::vecwise; int main(int argc, char *argv[]) { - printf("Vecwise engine server start...\n"); -// zilliz::lib::gpu::InitMemoryAllocator(); + printf("Megasearch %s version: v%s built at %s\n", BUILD_TYPE, MEGASEARCH_VERSION, BUILD_TIME); + printf("Megasearch server start...\n"); signal(SIGINT, server::SignalUtil::HandleSignal); signal(SIGSEGV, server::SignalUtil::HandleSignal); diff --git a/cpp/version.h.cf b/cpp/version.h.cf new file mode 100644 index 0000000000..a1d61cb271 --- /dev/null +++ b/cpp/version.h.cf @@ -0,0 +1,5 @@ +#pragma once + +#define MEGASEARCH_VERSION "@MEGASEARCH_VERSION@" +#define BUILD_TYPE "@BUILD_TYPE@" +#define BUILD_TIME @BUILD_TIME@ \ No newline at end of file From 741c5e90ebb5b53e8863412d4f9e6474d61fd382 Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 6 Jun 2019 10:34:10 +0800 Subject: [PATCH 10/11] automaticlly generate build version infor Former-commit-id: 7ed02faa146b614ee06327b95a8dba4154a904f9 --- cpp/CMakeLists.txt | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index d38fd73556..7100ae0179 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -23,7 +23,8 @@ GET_GIT_BRANCH_NAME(GIT_BRANCH_NAME) string(REGEX REPLACE "\n" "" GIT_BRANCH_NAME ${GIT_BRANCH_NAME}) set(MEGASEARCH_VERSION "${GIT_BRANCH_NAME}") -string(REGEX REPLACE "branch-" "" MEGASEARCH_VERSION ${MEGASEARCH_VERSION}) +string(REGEX MATCH "[0-9]+\\.[0-9]+\\.[0-9]" MEGASEARCH_VERSION "${MEGASEARCH_VERSION}") +message(STATUS "Build version = ${MEGASEARCH_VERSION}") if(CMAKE_BUILD_TYPE STREQUAL "Release") set(BUILD_TYPE "release") @@ -34,9 +35,7 @@ message(STATUS "Build type = ${BUILD_TYPE}") configure_file(${CMAKE_CURRENT_SOURCE_DIR}/version.h.cf ${CMAKE_CURRENT_SOURCE_DIR}/version.h) -string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" MEGASEARCH_BASE_VERSION "${MEGASEARCH_VERSION}") - -project(megasearch VERSION "${MEGASEARCH_BASE_VERSION}") +project(megasearch VERSION "${MEGASEARCH_VERSION}") project(vecwise_engine LANGUAGES CUDA CXX) set(MEGASEARCH_VERSION_MAJOR "${megasearch_VERSION_MAJOR}") From 0c8eb1a8e67fa565bcf8d6baad71d210b44fb701 Mon Sep 17 00:00:00 2001 From: groot Date: Thu, 6 Jun 2019 10:45:07 +0800 Subject: [PATCH 11/11] automaticlly generate build version infor Former-commit-id: 2998203da61d287b167e5170dbc4f8ec1f315b9a --- cpp/CHANGELOG.md | 1 + cpp/CMakeLists.txt | 2 +- cpp/{version.h.cf => version.h.macro} | 0 3 files changed, 2 insertions(+), 1 deletion(-) rename cpp/{version.h.cf => version.h.macro} (100%) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 47a5ebf6e4..8d3b2cb6f6 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -11,6 +11,7 @@ Please mark all change in change log and use the ticket from JIRA. ## New Feature - MS-57 - Implement index load/search pipeline +- MS-56 - Add version information when server is started ## Task diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7100ae0179..7362dc7597 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -33,7 +33,7 @@ else() endif() message(STATUS "Build type = ${BUILD_TYPE}") -configure_file(${CMAKE_CURRENT_SOURCE_DIR}/version.h.cf ${CMAKE_CURRENT_SOURCE_DIR}/version.h) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/version.h.macro ${CMAKE_CURRENT_SOURCE_DIR}/version.h) project(megasearch VERSION "${MEGASEARCH_VERSION}") project(vecwise_engine LANGUAGES CUDA CXX) diff --git a/cpp/version.h.cf b/cpp/version.h.macro similarity index 100% rename from cpp/version.h.cf rename to cpp/version.h.macro