From 6c826c1308cfa0982652ccb687c87e6dd8c1b727 Mon Sep 17 00:00:00 2001 From: groot Date: Sun, 15 Mar 2020 20:45:11 +0800 Subject: [PATCH] #1648 The cache cannot be used all when the type is binary (#1648) * #1648 The cache cannot be used all when the type is binary Signed-off-by: groot * #1646 The cache cannot be used all when the type is binary Signed-off-by: groot * #1646 The cache cannot be used all when the type is binary Signed-off-by: groot --- CHANGELOG.md | 1 + core/src/db/DBImpl.cpp | 29 ++-- core/src/db/engine/ExecutionEngine.h | 5 +- core/src/db/engine/ExecutionEngineImpl.cpp | 82 +++------- core/src/db/engine/ExecutionEngineImpl.h | 5 +- core/src/scheduler/task/BuildIndexTask.cpp | 7 +- core/src/scheduler/task/SearchTask.cpp | 3 +- core/src/wrapper/VecIndex.cpp | 5 +- core/unittest/server/test_cache.cpp | 2 + sdk/examples/binary_vector/src/ClientTest.cpp | 150 +++++++++++------- 10 files changed, 147 insertions(+), 142 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4ebd329aa..6d534d6701 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Please mark all change in change log and use the issue from GitHub - \#1301 Data in WAL may be accidentally inserted into a new table with the same name. - \#1634 Fix search demo bug in HTTP doc - \#1635 Vectors can be returned by searching after vectors deleted if `cache_insert_data` set true +- \#1648 The cache cannot be used all when the type is binary ## Feature - \#1603 BinaryFlat add 2 Metric: Substructure and Superstructure diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 5bc7ae50ba..3a165ff360 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -381,22 +381,23 @@ DBImpl::PreloadTable(const std::string& table_id) { return Status(DB_ERROR, "Invalid engine type"); } - size += engine->PhysicalSize(); fiu_do_on("DBImpl.PreloadTable.exceed_cache", size = available_size + 1); - if (size > available_size) { - ENGINE_LOG_DEBUG << "Pre-load cancelled since cache is almost full"; - return Status(SERVER_CACHE_FULL, "Cache is full"); - } else { - try { - fiu_do_on("DBImpl.PreloadTable.engine_throw_exception", throw std::exception()); - std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_); - TimeRecorderAuto rc_1(msg); - engine->Load(true); - } catch (std::exception& ex) { - std::string msg = "Pre-load table encounter exception: " + std::string(ex.what()); - ENGINE_LOG_ERROR << msg; - return Status(DB_ERROR, msg); + + try { + fiu_do_on("DBImpl.PreloadTable.engine_throw_exception", throw std::exception()); + std::string msg = "Pre-loaded file: " + file.file_id_ + " size: " + std::to_string(file.file_size_); + TimeRecorderAuto rc_1(msg); + engine->Load(true); + + size += engine->Size(); + if (size > available_size) { + ENGINE_LOG_DEBUG << "Pre-load cancelled since cache is almost full"; + return Status(SERVER_CACHE_FULL, "Cache is full"); } + } catch (std::exception& ex) { + std::string msg = "Pre-load table encounter exception: " + std::string(ex.what()); + ENGINE_LOG_ERROR << msg; + return Status(DB_ERROR, msg); } } diff --git a/core/src/db/engine/ExecutionEngine.h b/core/src/db/engine/ExecutionEngine.h index 3fe6be9576..e7739d4d53 100644 --- a/core/src/db/engine/ExecutionEngine.h +++ b/core/src/db/engine/ExecutionEngine.h @@ -60,14 +60,11 @@ class ExecutionEngine { virtual size_t Count() const = 0; - virtual size_t - Size() const = 0; - virtual size_t Dimension() const = 0; virtual size_t - PhysicalSize() const = 0; + Size() const = 0; virtual Status Serialize() = 0; diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index 1687626b24..fc0cdda33f 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -330,15 +330,6 @@ ExecutionEngineImpl::Count() const { return index_->Count(); } -size_t -ExecutionEngineImpl::Size() const { - if (IsBinaryIndexType(index_->GetType())) { - return (size_t)(Count() * Dimension() / 8); - } else { - return (size_t)(Count() * Dimension()) * sizeof(float); - } -} - size_t ExecutionEngineImpl::Dimension() const { if (index_ == nullptr) { @@ -349,8 +340,12 @@ ExecutionEngineImpl::Dimension() const { } size_t -ExecutionEngineImpl::PhysicalSize() const { - return server::CommonUtil::GetFileSize(location_); +ExecutionEngineImpl::Size() const { + if (index_ == nullptr) { + ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return size 0"; + return 0; + } + return index_->Size(); } Status @@ -359,7 +354,7 @@ ExecutionEngineImpl::Serialize() { // here we reset index size by file size, // since some index type(such as SQ8) data size become smaller after serialized - index_->set_size(PhysicalSize()); + index_->set_size(server::CommonUtil::GetFileSize(location_)); ENGINE_LOG_DEBUG << "Finish serialize index file: " << location_ << " size: " << index_->Size(); if (index_->Size() == 0) { @@ -370,36 +365,6 @@ ExecutionEngineImpl::Serialize() { return status; } -/* -Status -ExecutionEngineImpl::Load(bool to_cache) { - index_ = std::static_pointer_cast(cache::CpuCacheMgr::GetInstance()->GetIndex(location_)); - bool already_in_cache = (index_ != nullptr); - if (!already_in_cache) { - try { - double physical_size = PhysicalSize(); - server::CollectExecutionEngineMetrics metrics(physical_size); - index_ = read_index(location_); - if (index_ == nullptr) { - std::string msg = "Failed to load index from " + location_; - ENGINE_LOG_ERROR << msg; - return Status(DB_ERROR, msg); - } else { - ENGINE_LOG_DEBUG << "Disk io from: " << location_; - } - } catch (std::exception& e) { - ENGINE_LOG_ERROR << e.what(); - return Status(DB_ERROR, e.what()); - } - } - - if (!already_in_cache && to_cache) { - Cache(); - } - return Status::OK(); -} -*/ - Status ExecutionEngineImpl::Load(bool to_cache) { // TODO(zhiru): refactor @@ -460,10 +425,6 @@ ExecutionEngineImpl::Load(bool to_cache) { status = std::static_pointer_cast(index_)->AddWithoutIds(vectors->GetCount(), float_vectors.data(), Config()); status = std::static_pointer_cast(index_)->SetBlacklist(concurrent_bitset_ptr); - - int64_t index_size = vectors->GetCount() * dim_ * sizeof(float); - int64_t bitset_size = vectors->GetCount() / 8; - index_->set_size(index_size + bitset_size); } else if (index_type_ == EngineType::FAISS_BIN_IDMAP) { ec = std::static_pointer_cast(index_)->Build(conf); if (ec != KNOWHERE_SUCCESS) { @@ -472,11 +433,12 @@ ExecutionEngineImpl::Load(bool to_cache) { status = std::static_pointer_cast(index_)->AddWithoutIds(vectors->GetCount(), vectors_data.data(), Config()); status = std::static_pointer_cast(index_)->SetBlacklist(concurrent_bitset_ptr); - - int64_t index_size = vectors->GetCount() * dim_ * sizeof(uint8_t); - int64_t bitset_size = vectors->GetCount() / 8; - index_->set_size(index_size + bitset_size); } + + int64_t index_size = vectors->Size(); // vector data size + vector ids size + int64_t bitset_size = vectors->GetCount(); // delete list size + index_->set_size(index_size + bitset_size); + if (!status.ok()) { return status; } @@ -485,8 +447,8 @@ ExecutionEngineImpl::Load(bool to_cache) { } else { try { - double physical_size = PhysicalSize(); - server::CollectExecutionEngineMetrics metrics(physical_size); + // size_t physical_size = PhysicalSize(); + // server::CollectExecutionEngineMetrics metrics((double)physical_size); index_ = read_index(location_); if (index_ == nullptr) { @@ -518,6 +480,10 @@ ExecutionEngineImpl::Load(bool to_cache) { index_->SetUids(uids); ENGINE_LOG_DEBUG << "set uids " << index_->GetUids().size() << " for index " << location_; + int64_t index_size = index_->Size(); // vector data size + vector ids size + int64_t bitset_size = index_->Count(); // delete list size + index_->set_size(index_size + bitset_size); + ENGINE_LOG_DEBUG << "Finished loading index file from segment " << segment_dir; } } catch (std::exception& e) { @@ -619,10 +585,12 @@ Status ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) { #ifdef MILVUS_GPU_VERSION // the ToIndexData is only a placeholder, cpu-copy-to-gpu action is performed in - gpu_num_ = device_id; - auto to_index_data = std::make_shared(PhysicalSize()); - cache::DataObjPtr obj = std::static_pointer_cast(to_index_data); - milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_ + "_placeholder", obj); + if (index_) { + gpu_num_ = device_id; + auto to_index_data = std::make_shared(index_->Size()); + cache::DataObjPtr obj = std::static_pointer_cast(to_index_data); + milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_ + "_placeholder", obj); + } #endif return Status::OK(); } @@ -765,7 +733,7 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t throw Exception(DB_ERROR, status.message()); } - ENGINE_LOG_DEBUG << "Finish build index file: " << location << " size: " << to_index->Size(); + ENGINE_LOG_DEBUG << "Finish build index: " << location; return std::make_shared(to_index, location, engine_type, metric_type_, index_params_); } diff --git a/core/src/db/engine/ExecutionEngineImpl.h b/core/src/db/engine/ExecutionEngineImpl.h index d0d55b2f10..2ab781f4bc 100644 --- a/core/src/db/engine/ExecutionEngineImpl.h +++ b/core/src/db/engine/ExecutionEngineImpl.h @@ -41,14 +41,11 @@ class ExecutionEngineImpl : public ExecutionEngine { size_t Count() const override; - size_t - Size() const override; - size_t Dimension() const override; size_t - PhysicalSize() const override; + Size() const override; Status Serialize() override; diff --git a/core/src/scheduler/task/BuildIndexTask.cpp b/core/src/scheduler/task/BuildIndexTask.cpp index feec750e5a..2fa02567fe 100644 --- a/core/src/scheduler/task/BuildIndexTask.cpp +++ b/core/src/scheduler/task/BuildIndexTask.cpp @@ -22,6 +22,7 @@ #include "db/engine/EngineFactory.h" #include "metrics/Metrics.h" #include "scheduler/job/BuildIndexJob.h" +#include "utils/CommonUtil.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" @@ -95,7 +96,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { return; } - size_t file_size = to_index_engine_->PhysicalSize(); + size_t file_size = to_index_engine_->Size(); std::string info = "Build index task load file id:" + std::to_string(file_->id_) + " " + type_str + " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + @@ -207,7 +208,7 @@ XBuildIndexTask::Execute() { // step 6: update meta table_file.file_type_ = engine::meta::TableFileSchema::INDEX; - table_file.file_size_ = index->PhysicalSize(); + table_file.file_size_ = server::CommonUtil::GetFileSize(table_file.location_); table_file.row_count_ = file_->row_count_; // index->Count(); auto origin_file = *file_; @@ -221,7 +222,7 @@ XBuildIndexTask::Execute() { fiu_do_on("XBuildIndexTask.Execute.update_table_file_fail", status = Status(SERVER_UNEXPECTED_ERROR, "")); if (status.ok()) { - ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize() + ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << table_file.file_size_ << " bytes" << " from file " << origin_file.file_id_; if (build_index_job->options().insert_cache_immediately_) { diff --git a/core/src/scheduler/task/SearchTask.cpp b/core/src/scheduler/task/SearchTask.cpp index 857965dd2d..12b79818bd 100644 --- a/core/src/scheduler/task/SearchTask.cpp +++ b/core/src/scheduler/task/SearchTask.cpp @@ -25,6 +25,7 @@ #include "scheduler/SchedInst.h" #include "scheduler/job/SearchJob.h" #include "segment/SegmentReader.h" +#include "utils/CommonUtil.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" #include "utils/ValidationUtil.h" @@ -181,7 +182,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { return; } - size_t file_size = index_engine_->PhysicalSize(); + size_t file_size = index_engine_->Size(); std::string info = "Search task load file id:" + std::to_string(file_->id_) + " " + type_str + " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + diff --git a/core/src/wrapper/VecIndex.cpp b/core/src/wrapper/VecIndex.cpp index a81dfe52e8..868567ca36 100644 --- a/core/src/wrapper/VecIndex.cpp +++ b/core/src/wrapper/VecIndex.cpp @@ -50,10 +50,7 @@ namespace engine { int64_t VecIndex::Size() { - if (size_ != 0) { - return size_; - } - return Count() * Dimension() * sizeof(float); + return size_; } void diff --git a/core/unittest/server/test_cache.cpp b/core/unittest/server/test_cache.cpp index 489498dbca..f4e85a64a0 100644 --- a/core/unittest/server/test_cache.cpp +++ b/core/unittest/server/test_cache.cpp @@ -36,6 +36,8 @@ class LessItemCacheMgr : public milvus::cache::CacheMgr& entity_array, @@ -59,7 +39,7 @@ BuildBinaryVectors(int64_t from, int64_t to, std::vector& entity entity_array.clear(); entity_ids.clear(); - int64_t dim_byte = dimension/8; + int64_t dim_byte = dimension / 8; for (int64_t k = from; k < to; k++) { milvus::Entity entity; entity.binary_data.resize(dim_byte); @@ -72,29 +52,16 @@ BuildBinaryVectors(int64_t from, int64_t to, std::vector& entity } } -} // namespace - void -ClientTest::Test(const std::string& address, const std::string& port) { - std::shared_ptr conn = milvus::Connection::Create(); - +TestProcess(std::shared_ptr connection, + const milvus::CollectionParam& collection_param, + const milvus::IndexParam& index_param) { milvus::Status stat; - { // connect server - milvus::ConnectParam param = {address, port}; - stat = conn->Connect(param); - std::cout << "Connect function call status: " << stat.message() << std::endl; - } { // create collection - milvus::CollectionParam collection_param = BuildCollectionParam(); - stat = conn->CreateCollection(collection_param); + stat = connection->CreateCollection(collection_param); std::cout << "CreateCollection function call status: " << stat.message() << std::endl; milvus_sdk::Utils::PrintCollectionParam(collection_param); - - bool has_collection = conn->HasCollection(collection_param.collection_name); - if (has_collection) { - std::cout << "Collection is created" << std::endl; - } } std::vector> search_entity_array; @@ -109,7 +76,7 @@ ClientTest::Test(const std::string& address, const std::string& port) { begin_index + BATCH_ENTITY_COUNT, entity_array, entity_ids, - COLLECTION_DIMENSION); + collection_param.dimension); } if (search_entity_array.size() < NQ) { @@ -118,49 +85,122 @@ ClientTest::Test(const std::string& address, const std::string& port) { std::string title = "Insert " + std::to_string(entity_array.size()) + " entities No." + std::to_string(i); milvus_sdk::TimeRecorder rc(title); - stat = conn->Insert(COLLECTION_NAME, "", entity_array, entity_ids); + stat = connection->Insert(collection_param.collection_name, "", entity_array, entity_ids); std::cout << "Insert function call status: " << stat.message() << std::endl; std::cout << "Returned id array count: " << entity_ids.size() << std::endl; } } { // flush buffer - stat = conn->FlushCollection(COLLECTION_NAME); + stat = connection->FlushCollection(collection_param.collection_name); std::cout << "FlushCollection function call status: " << stat.message() << std::endl; } { // search vectors std::vector partition_tags; milvus::TopKQueryResult topk_query_result; - milvus_sdk::Utils::DoSearch(conn, COLLECTION_NAME, partition_tags, TOP_K, NPROBE, search_entity_array, + milvus_sdk::Utils::DoSearch(connection, + collection_param.collection_name, + partition_tags, + TOP_K, + NPROBE, + search_entity_array, topk_query_result); } { // wait unit build index finish milvus_sdk::TimeRecorder rc("Create index"); std::cout << "Wait until create all index done" << std::endl; - milvus::IndexParam index1 = BuildIndexParam(); - milvus_sdk::Utils::PrintIndexParam(index1); - stat = conn->CreateIndex(index1); + milvus_sdk::Utils::PrintIndexParam(index_param); + stat = connection->CreateIndex(index_param); std::cout << "CreateIndex function call status: " << stat.message() << std::endl; - - milvus::IndexParam index2; - stat = conn->DescribeIndex(COLLECTION_NAME, index2); - std::cout << "DescribeIndex function call status: " << stat.message() << std::endl; - milvus_sdk::Utils::PrintIndexParam(index2); } { // search vectors std::vector partition_tags; milvus::TopKQueryResult topk_query_result; - milvus_sdk::Utils::DoSearch(conn, COLLECTION_NAME, partition_tags, TOP_K, NPROBE, search_entity_array, + milvus_sdk::Utils::DoSearch(connection, + collection_param.collection_name, + partition_tags, + TOP_K, + NPROBE, + search_entity_array, topk_query_result); } { // drop collection - stat = conn->DropCollection(COLLECTION_NAME); + stat = connection->DropCollection(collection_param.collection_name); std::cout << "DropCollection function call status: " << stat.message() << std::endl; } - - milvus::Connection::Destroy(conn); +} + +} // namespace + +void +ClientTest::Test(const std::string& address, const std::string& port) { + std::shared_ptr connection = milvus::Connection::Create(); + { // connect server + milvus::ConnectParam param = {address, port}; + auto stat = connection->Connect(param); + std::cout << "Connect function call status: " << stat.message() << std::endl; + if (!stat.ok()) { + return; + } + } + + { + milvus::CollectionParam collection_param = { + "collection_1", + 512, + 256, + milvus::MetricType::TANIMOTO + }; + + JSON json_params = {{"nlist", 1024}}; + milvus::IndexParam index_param = { + collection_param.collection_name, + milvus::IndexType::IVFFLAT, + json_params.dump() + }; + + TestProcess(connection, collection_param, index_param); + } + + { + milvus::CollectionParam collection_param = { + "collection_2", + 256, + 512, + milvus::MetricType::SUBSTRUCTURE + }; + + JSON json_params = {{"nlist", 2048}}; + milvus::IndexParam index_param = { + collection_param.collection_name, + milvus::IndexType::FLAT, + json_params.dump() + }; + + TestProcess(connection, collection_param, index_param); + } + + { + milvus::CollectionParam collection_param = { + "collection_3", + 128, + 1024, + milvus::MetricType::SUPERSTRUCTURE + }; + + JSON json_params = {{"nlist", 4092}}; + milvus::IndexParam index_param = { + collection_param.collection_name, + milvus::IndexType::FLAT, + json_params.dump() + }; + + TestProcess(connection, collection_param, index_param); + } + + milvus::Connection::Destroy(connection); }