From a9629951e4f23c0690c7bc924c89031f09fd5710 Mon Sep 17 00:00:00 2001
From: yukun
Date: Thu, 11 Jun 2020 18:45:14 +0800
Subject: [PATCH] Improve ut coverage (#2516) (#2522)

* Improve ut coverage

Signed-off-by: fishpenguin

* Delete unused code

Signed-off-by: fishpenguin

* Add fiu in HybridSearchRequest

Signed-off-by: fishpenguin

* Update helm config

Signed-off-by: JinHai-CN

* Change BinaryQuery validation check

Signed-off-by: fishpenguin

* code format

Signed-off-by: fishpenguin

* code format

Signed-off-by: fishpenguin

* code format

Signed-off-by: fishpenguin

Co-authored-by: JinHai-CN
---
 .../src/codecs/default/DefaultAttrsFormat.cpp |  17 +-
 .../default/DefaultIdBloomFilterFormat.cpp    |   2 +
 core/src/db/DBImpl.cpp                        | 196 ++++----
 core/src/db/DBImpl.h                          |   4 +-
 core/src/db/insert/MemManagerImpl.cpp         |   3 +-
 core/src/db/meta/Meta.h                       |   3 -
 core/src/db/meta/MySQLMetaImpl.cpp            |   9 +-
 core/src/db/meta/MySQLMetaImpl.h              |   3 -
 core/src/db/meta/SqliteMetaImpl.cpp           | 422 +++++++-----
 core/src/db/meta/SqliteMetaImpl.h             |   3 -
 core/src/query/BinaryQuery.cpp                |   2 +-
 core/src/segment/Attr.cpp                     |  50 +--
 core/src/segment/Attr.h                       |  20 +-
 core/src/segment/SegmentWriter.cpp            |  23 +-
 .../CreateHybridCollectionRequest.cpp         |   4 +
 .../DescribeHybridCollectionRequest.cpp       |   2 +
 .../server/grpc_impl/GrpcRequestHandler.cpp   |  14 +
 core/src/utils/LogUtil.cpp                    |   2 +-
 core/unittest/db/test_db.cpp                  |  38 ++
 core/unittest/db/test_hybrid_db.cpp           | 245 ++++--
 core/unittest/db/test_meta.cpp                |  17 +
 core/unittest/db/test_meta_mysql.cpp          |  14 +
 core/unittest/server/test_rpc.cpp             | 158 ++++-
 23 files changed, 787 insertions(+), 464 deletions(-)

diff --git a/core/src/codecs/default/DefaultAttrsFormat.cpp b/core/src/codecs/default/DefaultAttrsFormat.cpp
index 27c6cbe2da..7fce91a0a5 100644
--- a/core/src/codecs/default/DefaultAttrsFormat.cpp
+++ b/core/src/codecs/default/DefaultAttrsFormat.cpp
@@ -18,6 +18,7 @@
 #include "codecs/default/DefaultAttrsFormat.h"
 
 #include
+#include
 #include
 #include
 #include
@@ -34,7 +35,9 @@ namespace codec {
 void
 DefaultAttrsFormat::read_attrs_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, off_t offset, size_t num, std::vector& raw_attrs, size_t& nbytes) {
-    if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
+    auto open_res = fs_ptr->reader_ptr_->open(file_path.c_str());
+    fiu_do_on("read_attrs_internal_open_file_fail", open_res = false);
+    if (!open_res) {
         std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
     }
@@ -56,7 +59,9 @@
 void
 DefaultAttrsFormat::read_uids_internal(const storage::FSHandlerPtr& fs_ptr, const std::string& file_path, std::vector& uids) {
-    if (!fs_ptr->reader_ptr_->open(file_path.c_str())) {
+    auto open_res = fs_ptr->reader_ptr_->open(file_path.c_str());
+    fiu_do_on("read_uids_internal_open_file_fail", open_res = false);
+    if (!open_res) {
         std::string err_msg = "Failed to open file: " + file_path + ", error: " + std::strerror(errno);
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_CANNOT_CREATE_FILE, err_msg);
     }
@@ -76,7 +81,9 @@ DefaultAttrsFormat::read(const milvus::storage::FSHandlerPtr& fs_ptr, milvus::se
     const std::lock_guard lock(mutex_);
     std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
-    if (!boost::filesystem::is_directory(dir_path)) {
+    auto is_directory = boost::filesystem::is_directory(dir_path);
+    fiu_do_on("read_id_directory_false", is_directory = false);
+    if (!is_directory) {
         std::string err_msg = "Directory: " + dir_path + "does not exist";
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
     }
@@ -190,7 +197,9 @@ DefaultAttrsFormat::read_uids(const milvus::storage::FSHandlerPtr& fs_ptr, std::
     const std::lock_guard lock(mutex_);
 
     std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
-    if (!boost::filesystem::is_directory(dir_path)) {
+    auto is_directory = boost::filesystem::is_directory(dir_path);
+    fiu_do_on("is_directory_false", is_directory = false);
+    if (!is_directory) {
         std::string err_msg = "Directory: " + dir_path + "does not exist";
         LOG_ENGINE_ERROR_ << err_msg;
         throw Exception(SERVER_INVALID_ARGUMENT, err_msg);
diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp
index 3222eac631..fb2eb9e535 100644
--- a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp
+++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp
@@ -17,6 +17,7 @@
 
 #include "codecs/default/DefaultIdBloomFilterFormat.h"
 
+#include
 #include
 #include
 
@@ -37,6 +38,7 @@ DefaultIdBloomFilterFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::I
     const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
     scaling_bloom_t* bloom_filter =
         new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
+    fiu_do_on("bloom_filter_nullptr", bloom_filter = nullptr);
     if (bloom_filter == nullptr) {
         std::string err_msg =
             "Failed to read bloom filter from file: " + bloom_filter_file_path + ". " + std::strerror(errno);
diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp
index 3a3e41dc61..e1d689caba 100644
--- a/core/src/db/DBImpl.cpp
+++ b/core/src/db/DBImpl.cpp
@@ -168,6 +168,7 @@ DBImpl::Start() {
     }
 
     // background metric thread
+    fiu_do_on("options_metric_enable", options_.metric_enable_ = true);
     if (options_.metric_enable_) {
         bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
     }
@@ -1042,6 +1043,7 @@ DBImpl::Flush() {
     LOG_ENGINE_DEBUG_ << "Begin flush all collections";
 
     Status status;
+    fiu_do_on("options_wal_enable_false", options_.wal_enable_ = false);
     if (options_.wal_enable_) {
         LOG_ENGINE_DEBUG_ << "WAL flush";
         auto lsn = wal_mgr_->Flush();
@@ -1401,7 +1403,10 @@ DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector& merge_collection_ids, bool f
     // LOG_ENGINE_DEBUG_ << "End StartMergeTask";
 }
 
-Status
-DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
-    // const std::lock_guard lock(flush_merge_compact_mutex_);
-
-    LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
-
-    // step 1: create table file
-    meta::SegmentSchema table_file;
-    table_file.collection_id_ = collection_id;
-    table_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
-    Status status = meta_ptr_->CreateHybridCollectionFile(table_file);
-
-    if (!status.ok()) {
-        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
-        return status;
-    }
-
-    // step 2: merge files
-    /*
-    ExecutionEnginePtr index =
-        EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
-                             (MetricType)table_file.metric_type_, table_file.nlist_);
-*/
-    meta::SegmentsSchema updated;
-
-    std::string new_segment_dir;
-    utils::GetParentPath(table_file.location_, new_segment_dir);
-    auto segment_writer_ptr = std::make_shared(new_segment_dir);
-
-    // attention: here is a copy, not
reference, since files_holder.UnmarkFile will change the array internal - milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); - for (auto& file : files) { - server::CollectMergeFilesMetrics metrics; - std::string segment_dir_to_merge; - utils::GetParentPath(file.location_, segment_dir_to_merge); - segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_); - - files_holder.UnmarkFile(file); - - auto file_schema = file; - file_schema.file_type_ = meta::SegmentSchema::TO_DELETE; - updated.push_back(file_schema); - int64_t size = segment_writer_ptr->Size(); - if (size >= file_schema.index_file_size_) { - break; - } - } - - // step 3: serialize to disk - try { - status = segment_writer_ptr->Serialize(); - fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception()); - fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, "")); - } catch (std::exception& ex) { - std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); - LOG_ENGINE_ERROR_ << msg; - status = Status(DB_ERROR, msg); - } - - if (!status.ok()) { - LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message(); - - // if failed to serialize merge file to disk - // typical error: out of disk space, out of memory or permission denied - table_file.file_type_ = meta::SegmentSchema::TO_DELETE; - status = meta_ptr_->UpdateCollectionFile(table_file); - LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; - - return status; - } - - // step 4: update table files state - // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size - // else set file type to RAW, no need to build index - if (!utils::IsRawIndexType(table_file.engine_type_)) { - table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_)) - ? 
meta::SegmentSchema::TO_INDEX - : meta::SegmentSchema::RAW; - } else { - table_file.file_type_ = meta::SegmentSchema::RAW; - } - table_file.file_size_ = segment_writer_ptr->Size(); - table_file.row_count_ = segment_writer_ptr->VectorCount(); - updated.push_back(table_file); - status = meta_ptr_->UpdateCollectionFiles(updated); - LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size() - << " bytes"; - - if (options_.insert_cache_immediately_) { - segment_writer_ptr->Cache(); - } - - return status; -} +// Status +// DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) { +// // const std::lock_guard lock(flush_merge_compact_mutex_); +// +// LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id; +// +// // step 1: create table file +// meta::SegmentSchema table_file; +// table_file.collection_id_ = collection_id; +// table_file.file_type_ = meta::SegmentSchema::NEW_MERGE; +// Status status = meta_ptr_->CreateHybridCollectionFile(table_file); +// +// if (!status.ok()) { +// LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString(); +// return status; +// } +// +// // step 2: merge files +// /* +// ExecutionEnginePtr index = +// EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_, +// (MetricType)table_file.metric_type_, table_file.nlist_); +//*/ +// meta::SegmentsSchema updated; +// +// std::string new_segment_dir; +// utils::GetParentPath(table_file.location_, new_segment_dir); +// auto segment_writer_ptr = std::make_shared(new_segment_dir); +// +// // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal +// milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); +// for (auto& file : files) { +// server::CollectMergeFilesMetrics metrics; +// std::string segment_dir_to_merge; +// utils::GetParentPath(file.location_, segment_dir_to_merge); +// segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_); +// +// files_holder.UnmarkFile(file); +// +// auto file_schema = file; +// file_schema.file_type_ = meta::SegmentSchema::TO_DELETE; +// updated.push_back(file_schema); +// int64_t size = segment_writer_ptr->Size(); +// if (size >= file_schema.index_file_size_) { +// break; +// } +// } +// +// // step 3: serialize to disk +// try { +// status = segment_writer_ptr->Serialize(); +// fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception()); +// fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, "")); +// } catch (std::exception& ex) { +// std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); +// LOG_ENGINE_ERROR_ << msg; +// status = Status(DB_ERROR, msg); +// } +// +// if (!status.ok()) { +// LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". 
Error: " << +// status.message(); +// +// // if failed to serialize merge file to disk +// // typical error: out of disk space, out of memory or permission denied +// table_file.file_type_ = meta::SegmentSchema::TO_DELETE; +// status = meta_ptr_->UpdateCollectionFile(table_file); +// LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; +// +// return status; +// } +// +// // step 4: update table files state +// // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size +// // else set file type to RAW, no need to build index +// if (!utils::IsRawIndexType(table_file.engine_type_)) { +// table_file.file_type_ = (segment_writer_ptr->Size() >= (size_t)(table_file.index_file_size_)) +// ? meta::SegmentSchema::TO_INDEX +// : meta::SegmentSchema::RAW; +// } else { +// table_file.file_type_ = meta::SegmentSchema::RAW; +// } +// table_file.file_size_ = segment_writer_ptr->Size(); +// table_file.row_count_ = segment_writer_ptr->VectorCount(); +// updated.push_back(table_file); +// status = meta_ptr_->UpdateCollectionFiles(updated); +// LOG_ENGINE_DEBUG_ << "New merged segment " << table_file.segment_id_ << " of size " << segment_writer_ptr->Size() +// << " bytes"; +// +// if (options_.insert_cache_immediately_) { +// segment_writer_ptr->Cache(); +// } +// +// return status; +//} void DBImpl::BackgroundMerge(std::set collection_ids, bool force_merge_all) { diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index ca268d847d..08faa26f7f 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -238,8 +238,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi void BackgroundMerge(std::set collection_ids, bool force_merge_all); - Status - MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder); + // Status + // MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder); void StartBuildIndexTask(); diff --git a/core/src/db/insert/MemManagerImpl.cpp b/core/src/db/insert/MemManagerImpl.cpp index 9de533b118..00bf712436 100644 --- a/core/src/db/insert/MemManagerImpl.cpp +++ b/core/src/db/insert/MemManagerImpl.cpp @@ -11,6 +11,7 @@ #include "db/insert/MemManagerImpl.h" +#include #include #include "VectorSource.h" @@ -36,9 +37,9 @@ MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, const float* vectors, uint64_t lsn, std::set& flushed_tables) { flushed_tables.clear(); if (GetCurrentMem() > options_.insert_buffer_size_) { - LOG_ENGINE_DEBUG_ << "Insert buffer size exceeds limit. 
Performing force flush"; // TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge auto status = Flush(flushed_tables, false); + fiu_do_on("MemManagerImpl::InsertVectors_flush_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { return status; } diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index a715f31487..7261341a6a 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -174,9 +174,6 @@ class Meta { virtual Status DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) = 0; - - virtual Status - CreateHybridCollectionFile(SegmentSchema& file_schema) = 0; }; // MetaData using MetaPtr = std::shared_ptr; diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index 3e1f3de7b5..a5b296dce1 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -2186,8 +2186,8 @@ MySQLMetaImpl::FilesByTypeEx(const std::vector& collecti mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); bool is_null_connection = (connectionPtr == nullptr); - fiu_do_on("MySQLMetaImpl.FilesByType.null_connection", is_null_connection = true); - fiu_do_on("MySQLMetaImpl.FilesByType.throw_exception", throw std::exception();); + fiu_do_on("MySQLMetaImpl.FilesByTypeEx.null_connection", is_null_connection = true); + fiu_do_on("MySQLMetaImpl.FilesByTypeEx.throw_exception", throw std::exception();); if (is_null_connection) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } @@ -3204,11 +3204,6 @@ MySQLMetaImpl::DescribeHybridCollection(CollectionSchema& collection_schema, hyb return Status::OK(); } -Status -MySQLMetaImpl::CreateHybridCollectionFile(milvus::engine::meta::SegmentSchema& file_schema) { - return Status::OK(); -} - } // namespace meta } // namespace engine } // namespace milvus diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index dce46e637c..44251de119 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -161,9 +161,6 @@ class MySQLMetaImpl : public Meta { Status DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override; - Status - CreateHybridCollectionFile(SegmentSchema& file_schema) override; - private: Status NextFileId(std::string& file_id); diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 87cf7a6556..d53162a901 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -49,7 +49,7 @@ namespace { constexpr uint64_t SQL_BATCH_SIZE = 50; -template +template void DistributeBatch(const T& id_array, std::vector>& id_groups) { std::vector temp_group; @@ -86,8 +86,7 @@ StoragePrototype(const std::string& path) { return make_storage( path, make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))), - make_table(META_TABLES, - make_column("id", &CollectionSchema::id_, primary_key()), + make_table(META_TABLES, make_column("id", &CollectionSchema::id_, primary_key()), make_column("table_id", &CollectionSchema::collection_id_, unique()), make_column("state", &CollectionSchema::state_), make_column("dimension", &CollectionSchema::dimension_), @@ -210,9 +209,9 @@ SqliteMetaImpl::CreateCollection(CollectionSchema& collection_schema) { NextCollectionId(collection_schema.collection_id_); } else { 
fiu_do_on("SqliteMetaImpl.CreateCollection.throw_exception", throw std::exception()); - auto collection = ConnectorPtr->select(columns(&CollectionSchema::state_), - where(c(&CollectionSchema::collection_id_) - == collection_schema.collection_id_)); + auto collection = + ConnectorPtr->select(columns(&CollectionSchema::state_), + where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_)); if (collection.size() == 1) { if (CollectionSchema::TO_DELETE == std::get<0>(collection[0])) { return Status(DB_ERROR, @@ -252,19 +251,11 @@ SqliteMetaImpl::DescribeCollection(CollectionSchema& collection_schema) { std::lock_guard meta_lock(meta_mutex_); fiu_do_on("SqliteMetaImpl.DescribeCollection.throw_exception", throw std::exception()); auto groups = ConnectorPtr->select( - columns(&CollectionSchema::id_, - &CollectionSchema::state_, - &CollectionSchema::dimension_, - &CollectionSchema::created_on_, - &CollectionSchema::flag_, - &CollectionSchema::index_file_size_, - &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, - &CollectionSchema::metric_type_, - &CollectionSchema::owner_collection_, - &CollectionSchema::partition_tag_, - &CollectionSchema::version_, - &CollectionSchema::flush_lsn_), + columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, + &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_, + &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_, + &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_, + &CollectionSchema::version_, &CollectionSchema::flush_lsn_), where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_ and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); @@ -307,14 +298,13 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not decltype(ConnectorPtr->select(select_columns)) selected; if (is_root) { selected = ConnectorPtr->select(select_columns, - where(c(&CollectionSchema::collection_id_) == collection_id - and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE - and c(&CollectionSchema::owner_collection_) == "")); + where(c(&CollectionSchema::collection_id_) == collection_id and + c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE and + c(&CollectionSchema::owner_collection_) == "")); } else { selected = ConnectorPtr->select(select_columns, - where(c(&CollectionSchema::collection_id_) == collection_id - and c(&CollectionSchema::state_) - != (int)CollectionSchema::TO_DELETE)); + where(c(&CollectionSchema::collection_id_) == collection_id and + c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); } if (selected.size() == 1) { @@ -337,25 +327,18 @@ SqliteMetaImpl::AllCollections(std::vector& collection_schema_ // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); - auto select_columns = columns(&CollectionSchema::id_, - &CollectionSchema::collection_id_, - &CollectionSchema::dimension_, - &CollectionSchema::created_on_, - &CollectionSchema::flag_, - &CollectionSchema::index_file_size_, - &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, - &CollectionSchema::metric_type_, - &CollectionSchema::owner_collection_, - &CollectionSchema::partition_tag_, - &CollectionSchema::version_, - &CollectionSchema::flush_lsn_); + auto select_columns = + columns(&CollectionSchema::id_, 
&CollectionSchema::collection_id_, &CollectionSchema::dimension_, + &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_, + &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_, + &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_, + &CollectionSchema::version_, &CollectionSchema::flush_lsn_); decltype(ConnectorPtr->select(select_columns)) selected; if (is_root) { selected = ConnectorPtr->select(select_columns, - where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE - and c(&CollectionSchema::owner_collection_) == "")); + where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE and + c(&CollectionSchema::owner_collection_) == "")); } else { selected = ConnectorPtr->select(select_columns, where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); @@ -403,10 +386,9 @@ SqliteMetaImpl::DropCollections(const std::vector& collection_id_ar for (auto group : id_groups) { // soft delete collection - ConnectorPtr->update_all( - set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE), - where(in(&CollectionSchema::collection_id_, group) - and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + ConnectorPtr->update_all(set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE), + where(in(&CollectionSchema::collection_id_, group) and + c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); } } @@ -509,23 +491,18 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std:: return status; } - auto select_columns = columns(&SegmentSchema::id_, - &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, - &SegmentSchema::file_type_, - &SegmentSchema::file_size_, - &SegmentSchema::row_count_, - &SegmentSchema::date_, - &SegmentSchema::engine_type_, - &SegmentSchema::created_on_); + auto select_columns = + columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, + &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_, + &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_); decltype(ConnectorPtr->select(select_columns)) selected; { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); - selected = ConnectorPtr->select(select_columns, - where(c(&SegmentSchema::collection_id_) == collection_id - and in(&SegmentSchema::id_, ids) and - c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE)); + selected = ConnectorPtr->select( + select_columns, + where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and + c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE)); } Status result; @@ -559,8 +536,7 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std:: } Status -SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, - FilesHolder& files_holder) { +SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) { try { auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, @@ -661,9 +637,8 @@ SqliteMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t // multi-threads call sqlite update may get exception('bad logic', etc), so we 
add a lock here std::lock_guard meta_lock(meta_mutex_); - auto selected = - ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_), - where(c(&CollectionSchema::collection_id_) == collection_id)); + auto selected = ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_), + where(c(&CollectionSchema::collection_id_) == collection_id)); if (selected.size() > 0) { flush_lsn = std::get<0>(selected[0]); @@ -688,9 +663,9 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); - auto collections = ConnectorPtr->select(columns(&CollectionSchema::state_), - where(c(&CollectionSchema::collection_id_) - == file_schema.collection_id_)); + auto collections = + ConnectorPtr->select(columns(&CollectionSchema::state_), + where(c(&CollectionSchema::collection_id_) == file_schema.collection_id_)); // if the collection has been deleted, just mark the collection file as TO_DELETE // clean thread will delete the file later @@ -702,9 +677,8 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) { LOG_ENGINE_DEBUG_ << "Update single collection file, file id = " << file_schema.file_id_; } catch (std::exception& e) { - std::string msg = - "Exception update collection file: collection_id = " + file_schema.collection_id_ + " file_id = " - + file_schema.file_id_; + std::string msg = "Exception update collection file: collection_id = " + file_schema.collection_id_ + + " file_id = " + file_schema.file_id_; return HandleException(msg, e.what()); } return Status::OK(); @@ -724,11 +698,10 @@ SqliteMetaImpl::UpdateCollectionFiles(SegmentsSchema& files) { if (has_collections.find(file.collection_id_) != has_collections.end()) { continue; } - auto collections = ConnectorPtr->select(columns(&CollectionSchema::id_), - where( - c(&CollectionSchema::collection_id_) == file.collection_id_ and - c(&CollectionSchema::state_) - != (int)CollectionSchema::TO_DELETE)); + auto collections = + ConnectorPtr->select(columns(&CollectionSchema::id_), + where(c(&CollectionSchema::collection_id_) == file.collection_id_ and + c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); if (collections.size() >= 1) { has_collections[file.collection_id_] = true; } else { @@ -791,18 +764,12 @@ SqliteMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Co auto collections = ConnectorPtr->select( - columns(&CollectionSchema::id_, - &CollectionSchema::state_, - &CollectionSchema::dimension_, - &CollectionSchema::created_on_, - &CollectionSchema::flag_, - &CollectionSchema::index_file_size_, - &CollectionSchema::owner_collection_, - &CollectionSchema::partition_tag_, - &CollectionSchema::version_, - &CollectionSchema::flush_lsn_), - where(c(&CollectionSchema::collection_id_) == collection_id - and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, + &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_, + &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_, + &CollectionSchema::version_, &CollectionSchema::flush_lsn_), + where(c(&CollectionSchema::collection_id_) == collection_id and + c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); if (collections.size() > 0) { meta::CollectionSchema collection_schema; @@ -871,8 +838,8 @@ SqliteMetaImpl::DescribeCollectionIndex(const 
std::string& collection_id, Collec auto groups = ConnectorPtr->select( columns(&CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_), - where(c(&CollectionSchema::collection_id_) == collection_id - and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + where(c(&CollectionSchema::collection_id_) == collection_id and + c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); if (groups.size() == 1) { index.engine_type_ = std::get<0>(groups[0]); @@ -933,10 +900,8 @@ SqliteMetaImpl::DropCollectionIndex(const std::string& collection_id) { } Status -SqliteMetaImpl::CreatePartition(const std::string& collection_id, - const std::string& partition_name, - const std::string& tag, - uint64_t lsn) { +SqliteMetaImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name, + const std::string& tag, uint64_t lsn) { USING_SQLITE_WARNING server::MetricCollector metric; @@ -996,11 +961,10 @@ SqliteMetaImpl::HasPartition(const std::string& collection_id, const std::string std::string valid_tag = tag; server::StringHelpFunctions::TrimStringBlank(valid_tag); - auto name = ConnectorPtr->select( - columns(&CollectionSchema::collection_id_), - where(c(&CollectionSchema::owner_collection_) == collection_id - and c(&CollectionSchema::partition_tag_) == valid_tag and - c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + auto name = ConnectorPtr->select(columns(&CollectionSchema::collection_id_), + where(c(&CollectionSchema::owner_collection_) == collection_id and + c(&CollectionSchema::partition_tag_) == valid_tag and + c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); if (name.size() > 0) { has_or_not = true; } else { @@ -1026,18 +990,10 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id, fiu_do_on("SqliteMetaImpl.ShowPartitions.throw_exception", throw std::exception()); auto partitions = ConnectorPtr->select( - columns(&CollectionSchema::id_, - &CollectionSchema::state_, - &CollectionSchema::dimension_, - &CollectionSchema::created_on_, - &CollectionSchema::flag_, - &CollectionSchema::index_file_size_, - &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, - &CollectionSchema::metric_type_, - &CollectionSchema::partition_tag_, - &CollectionSchema::version_, - &CollectionSchema::collection_id_, + columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, + &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_, + &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_, + &CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::collection_id_, &CollectionSchema::flush_lsn_), where(c(&CollectionSchema::owner_collection_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); @@ -1068,8 +1024,7 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id, } Status -SqliteMetaImpl::GetPartitionName(const std::string& collection_id, - const std::string& tag, +SqliteMetaImpl::GetPartitionName(const std::string& collection_id, const std::string& tag, std::string& partition_name) { try { server::MetricCollector metric; @@ -1080,11 +1035,10 @@ SqliteMetaImpl::GetPartitionName(const std::string& collection_id, std::string valid_tag = tag; server::StringHelpFunctions::TrimStringBlank(valid_tag); - auto name = ConnectorPtr->select( - columns(&CollectionSchema::collection_id_), - 
where(c(&CollectionSchema::owner_collection_) == collection_id - and c(&CollectionSchema::partition_tag_) == valid_tag and - c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + auto name = ConnectorPtr->select(columns(&CollectionSchema::collection_id_), + where(c(&CollectionSchema::owner_collection_) == collection_id and + c(&CollectionSchema::partition_tag_) == valid_tag and + c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); if (name.size() > 0) { partition_name = std::get<0>(name[0]); } else { @@ -1171,8 +1125,7 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil } Status -SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection, - const std::set& partition_id_array, +SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::set& partition_id_array, FilesHolder& files_holder) { try { server::MetricCollector metric; @@ -1280,11 +1233,10 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file } // get files to merge - auto select_columns = - columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, &SegmentSchema::updated_time_); + auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, &SegmentSchema::updated_time_); decltype(ConnectorPtr->select(select_columns)) selected; { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here @@ -1348,11 +1300,10 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) { server::MetricCollector metric; - auto select_columns = - columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, &SegmentSchema::updated_time_); + auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, &SegmentSchema::updated_time_); decltype(ConnectorPtr->select(select_columns)) selected; { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here @@ -1413,8 +1364,7 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) { } Status -SqliteMetaImpl::FilesByType(const std::string& collection_id, - const std::vector& file_types, +SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector& file_types, FilesHolder& files_holder) { if (file_types.empty()) { return Status(DB_ERROR, "file types array is empty"); @@ -1433,18 +1383,16 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, } // get files by type - auto select_columns = - columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, - &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_, - 
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_, - &SegmentSchema::updated_time_); + auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, + &SegmentSchema::file_type_, &SegmentSchema::file_size_, + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, &SegmentSchema::updated_time_); decltype(ConnectorPtr->select(select_columns)) selected; { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); - selected = ConnectorPtr->select(select_columns, - where(in(&SegmentSchema::file_type_, file_types) - and c(&SegmentSchema::collection_id_) == collection_id)); + selected = ConnectorPtr->select(select_columns, where(in(&SegmentSchema::file_type_, file_types) and + c(&SegmentSchema::collection_id_) == collection_id)); } if (selected.size() >= 1) { @@ -1470,21 +1418,29 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, file_schema.metric_type_ = collection_schema.metric_type_; switch (file_schema.file_type_) { - case (int)SegmentSchema::RAW:++raw_count; + case (int)SegmentSchema::RAW: + ++raw_count; break; - case (int)SegmentSchema::NEW:++new_count; + case (int)SegmentSchema::NEW: + ++new_count; break; - case (int)SegmentSchema::NEW_MERGE:++new_merge_count; + case (int)SegmentSchema::NEW_MERGE: + ++new_merge_count; break; - case (int)SegmentSchema::NEW_INDEX:++new_index_count; + case (int)SegmentSchema::NEW_INDEX: + ++new_index_count; break; - case (int)SegmentSchema::TO_INDEX:++to_index_count; + case (int)SegmentSchema::TO_INDEX: + ++to_index_count; break; - case (int)SegmentSchema::INDEX:++index_count; + case (int)SegmentSchema::INDEX: + ++index_count; break; - case (int)SegmentSchema::BACKUP:++backup_count; + case (int)SegmentSchema::BACKUP: + ++backup_count; + break; + default: break; - default:break; } auto status = utils::GetCollectionFilePath(options_, file_schema); @@ -1498,9 +1454,11 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, std::string msg = "Get collection files by type."; for (int file_type : file_types) { switch (file_type) { - case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count); + case (int)SegmentSchema::RAW: + msg = msg + " raw files:" + std::to_string(raw_count); break; - case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count); + case (int)SegmentSchema::NEW: + msg = msg + " new files:" + std::to_string(new_count); break; case (int)SegmentSchema::NEW_MERGE: msg = msg + " new_merge files:" + std::to_string(new_merge_count); @@ -1508,13 +1466,17 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, case (int)SegmentSchema::NEW_INDEX: msg = msg + " new_index files:" + std::to_string(new_index_count); break; - case (int)SegmentSchema::TO_INDEX:msg = msg + " to_index files:" + std::to_string(to_index_count); + case (int)SegmentSchema::TO_INDEX: + msg = msg + " to_index files:" + std::to_string(to_index_count); break; - case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count); + case (int)SegmentSchema::INDEX: + msg = msg + " index files:" + std::to_string(index_count); break; - case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count); + case (int)SegmentSchema::BACKUP: + msg = msg + " backup files:" + std::to_string(backup_count); + break; + default: break; - default:break; } } LOG_ENGINE_DEBUG_ << 
msg; @@ -1528,8 +1490,7 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, Status SqliteMetaImpl::FilesByTypeEx(const std::vector& collections, - const std::vector& file_types, - FilesHolder& files_holder) { + const std::vector& file_types, FilesHolder& files_holder) { if (file_types.empty()) { return Status(DB_ERROR, "file types array is empty"); } @@ -1537,7 +1498,7 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector& collect Status ret = Status::OK(); try { - fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception()); + fiu_do_on("SqliteMetaImpl.FilesByTypeEx.throw_exception", throw std::exception()); // distribute id array to batches const uint64_t batch_size = 50; @@ -1563,17 +1524,10 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector& collect int to_index_count = 0, index_count = 0, backup_count = 0; for (auto group : id_groups) { auto select_columns = - columns(&SegmentSchema::id_, - &SegmentSchema::collection_id_, - &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, - &SegmentSchema::file_type_, - &SegmentSchema::file_size_, - &SegmentSchema::row_count_, - &SegmentSchema::date_, - &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, - &SegmentSchema::updated_time_); + columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, &SegmentSchema::updated_time_); decltype(ConnectorPtr->select(select_columns)) selected; auto match_collectionid = in(&SegmentSchema::collection_id_, group); @@ -1609,21 +1563,29 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector& collect file_schema.metric_type_ = collection_schema.metric_type_; switch (file_schema.file_type_) { - case (int)SegmentSchema::RAW:++raw_count; + case (int)SegmentSchema::RAW: + ++raw_count; break; - case (int)SegmentSchema::NEW:++new_count; + case (int)SegmentSchema::NEW: + ++new_count; break; - case (int)SegmentSchema::NEW_MERGE:++new_merge_count; + case (int)SegmentSchema::NEW_MERGE: + ++new_merge_count; break; - case (int)SegmentSchema::NEW_INDEX:++new_index_count; + case (int)SegmentSchema::NEW_INDEX: + ++new_index_count; break; - case (int)SegmentSchema::TO_INDEX:++to_index_count; + case (int)SegmentSchema::TO_INDEX: + ++to_index_count; break; - case (int)SegmentSchema::INDEX:++index_count; + case (int)SegmentSchema::INDEX: + ++index_count; break; - case (int)SegmentSchema::BACKUP:++backup_count; + case (int)SegmentSchema::BACKUP: + ++backup_count; + break; + default: break; - default:break; } auto status = utils::GetCollectionFilePath(options_, file_schema); @@ -1638,9 +1600,11 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector& collect std::string msg = "Get collection files by type."; for (int file_type : file_types) { switch (file_type) { - case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count); + case (int)SegmentSchema::RAW: + msg = msg + " raw files:" + std::to_string(raw_count); break; - case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count); + case (int)SegmentSchema::NEW: + msg = msg + " new files:" + std::to_string(new_count); break; case (int)SegmentSchema::NEW_MERGE: msg = msg + " new_merge files:" + std::to_string(new_merge_count); @@ -1651,11 +1615,14 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector& collect case (int)SegmentSchema::TO_INDEX: msg = msg + " to_index files:" + 
std::to_string(to_index_count); break; - case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count); + case (int)SegmentSchema::INDEX: + msg = msg + " index files:" + std::to_string(index_count); break; - case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count); + case (int)SegmentSchema::BACKUP: + msg = msg + " backup files:" + std::to_string(backup_count); + break; + default: break; - default:break; } } LOG_ENGINE_DEBUG_ << msg; @@ -1676,17 +1643,10 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_hol server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.FilesByID.throw_exception", throw std::exception()); - auto select_columns = columns(&SegmentSchema::id_, - &SegmentSchema::collection_id_, - &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, - &SegmentSchema::file_type_, - &SegmentSchema::file_size_, - &SegmentSchema::row_count_, - &SegmentSchema::date_, - &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, - &SegmentSchema::updated_time_); + auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, &SegmentSchema::updated_time_); // perform query decltype(ConnectorPtr->select(select_columns)) selected; @@ -1917,8 +1877,8 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/ // delete file from disk storage utils::DeleteCollectionFilePath(options_, collection_file); - LOG_ENGINE_DEBUG_ << "Remove file id:" << collection_file.file_id_ << " location:" - << collection_file.location_; + LOG_ENGINE_DEBUG_ << "Remove file id:" << collection_file.file_id_ + << " location:" << collection_file.location_; collection_ids.insert(collection_file.collection_id_); segment_ids.insert(std::make_pair(collection_file.segment_id_, collection_file)); @@ -1948,9 +1908,9 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/ // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); - auto collections = ConnectorPtr->select(columns(&CollectionSchema::id_, &CollectionSchema::collection_id_), - where( - c(&CollectionSchema::state_) == (int)CollectionSchema::TO_DELETE)); + auto collections = + ConnectorPtr->select(columns(&CollectionSchema::id_, &CollectionSchema::collection_id_), + where(c(&CollectionSchema::state_) == (int)CollectionSchema::TO_DELETE)); auto commited = ConnectorPtr->transaction([&]() mutable { for (auto& collection : collections) { @@ -2039,9 +1999,8 @@ SqliteMetaImpl::Count(const std::string& collection_id, uint64_t& result) { { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); - selected = ConnectorPtr->select(select_columns, - where(in(&SegmentSchema::file_type_, file_types) - and c(&SegmentSchema::collection_id_) == collection_id)); + selected = ConnectorPtr->select(select_columns, where(in(&SegmentSchema::file_type_, file_types) and + c(&SegmentSchema::collection_id_) == collection_id)); } CollectionSchema collection_schema; @@ -2251,19 +2210,11 @@ SqliteMetaImpl::DescribeHybridCollection(milvus::engine::meta::CollectionSchema& server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.DescriCollection.throw_exception", 
throw std::exception()); auto groups = ConnectorPtr->select( - columns(&CollectionSchema::id_, - &CollectionSchema::state_, - &CollectionSchema::dimension_, - &CollectionSchema::created_on_, - &CollectionSchema::flag_, - &CollectionSchema::index_file_size_, - &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, - &CollectionSchema::metric_type_, - &CollectionSchema::owner_collection_, - &CollectionSchema::partition_tag_, - &CollectionSchema::version_, - &CollectionSchema::flush_lsn_), + columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, + &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_, + &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_, + &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_, + &CollectionSchema::version_, &CollectionSchema::flush_lsn_), where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_ and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); @@ -2309,57 +2260,6 @@ SqliteMetaImpl::DescribeHybridCollection(milvus::engine::meta::CollectionSchema& return Status::OK(); } -Status -SqliteMetaImpl::CreateHybridCollectionFile(SegmentSchema& file_schema) { - USING_SQLITE_WARNING - if (file_schema.date_ == EmptyDate) { - file_schema.date_ = utils::GetDate(); - } - CollectionSchema collection_schema; - hybrid::FieldsSchema fields_schema; - collection_schema.collection_id_ = file_schema.collection_id_; - auto status = DescribeHybridCollection(collection_schema, fields_schema); - if (!status.ok()) { - return status; - } - - try { - fiu_do_on("SqliteMetaImpl.CreateCollectionFile.throw_exception", throw std::exception()); - server::MetricCollector metric; - - NextFileId(file_schema.file_id_); - if (file_schema.segment_id_.empty()) { - file_schema.segment_id_ = file_schema.file_id_; - } - file_schema.dimension_ = collection_schema.dimension_; - file_schema.file_size_ = 0; - file_schema.row_count_ = 0; - file_schema.created_on_ = utils::GetMicroSecTimeStamp(); - file_schema.updated_time_ = file_schema.created_on_; - file_schema.index_file_size_ = collection_schema.index_file_size_; - file_schema.index_params_ = collection_schema.index_params_; - file_schema.engine_type_ = collection_schema.engine_type_; - file_schema.metric_type_ = collection_schema.metric_type_; - - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - - auto id = ConnectorPtr->insert(file_schema); - file_schema.id_ = id; - - for (auto field_schema : fields_schema.fields_schema_) { - ConnectorPtr->insert(field_schema); - } - - LOG_ENGINE_DEBUG_ << "Successfully create collection file, file id = " << file_schema.file_id_; - return utils::CreateCollectionFilePath(options_, file_schema); - } catch (std::exception& e) { - return HandleException("Encounter exception when create collection file", e.what()); - } - - return Status::OK(); -} - } // namespace meta } // namespace engine } // namespace milvus diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index 222d7298a3..5b3c04314a 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -163,9 +163,6 @@ class SqliteMetaImpl : public Meta { Status DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) override; - Status - CreateHybridCollectionFile(SegmentSchema& file_schema) 
override; - private: Status NextFileId(std::string& file_id); diff --git a/core/src/query/BinaryQuery.cpp b/core/src/query/BinaryQuery.cpp index d812cc6d42..4882e0140e 100644 --- a/core/src/query/BinaryQuery.cpp +++ b/core/src/query/BinaryQuery.cpp @@ -215,7 +215,7 @@ bool ValidateBinaryQuery(BinaryQueryPtr& binary_query) { // Only for one layer BooleanQuery uint64_t height = BinaryQueryHeight(binary_query); - return height > 1 && height < 4; + return height > 1; } } // namespace query diff --git a/core/src/segment/Attr.cpp b/core/src/segment/Attr.cpp index 3c55c0e165..30d4b91a04 100644 --- a/core/src/segment/Attr.cpp +++ b/core/src/segment/Attr.cpp @@ -34,23 +34,23 @@ Attr::Attr(const std::vector& data, size_t nbytes, const std::vector& data, size_t nbytes) { - data_.reserve(data_.size() + data.size()); - data_.insert(data_.end(), std::make_move_iterator(data.begin()), std::make_move_iterator(data.end())); - nbytes_ += nbytes; -} +// void +// Attr::AddAttr(const std::vector& data, size_t nbytes) { +// data_.reserve(data_.size() + data.size()); +// data_.insert(data_.end(), std::make_move_iterator(data.begin()), std::make_move_iterator(data.end())); +// nbytes_ += nbytes; +//} +// +// void +// Attr::AddUids(const std::vector& uids) { +// uids_.reserve(uids_.size() + uids.size()); +// uids_.insert(uids_.end(), std::make_move_iterator(uids.begin()), std::make_move_iterator(uids.end())); +//} -void -Attr::AddUids(const std::vector& uids) { - uids_.reserve(uids_.size() + uids.size()); - uids_.insert(uids_.end(), std::make_move_iterator(uids.begin()), std::make_move_iterator(uids.end())); -} - -void -Attr::SetName(const std::string& name) { - name_ = name; -} +// void +// Attr::SetName(const std::string& name) { +// name_ = name; +//} const std::vector& Attr::GetData() const { @@ -87,15 +87,15 @@ Attr::GetCodeLength() const { return uids_.size() == 0 ? 
0 : nbytes_ / uids_.size(); } -void -Attr::Erase(int32_t offset) { - auto code_length = GetCodeLength(); - if (code_length != 0) { - auto step = offset * code_length; - data_.erase(data_.begin() + step, data_.begin() + step + code_length); - uids_.erase(uids_.begin() + offset, uids_.begin() + offset + 1); - } -} +// void +// Attr::Erase(int32_t offset) { +// auto code_length = GetCodeLength(); +// if (code_length != 0) { +// auto step = offset * code_length; +// data_.erase(data_.begin() + step, data_.begin() + step + code_length); +// uids_.erase(uids_.begin() + offset, uids_.begin() + offset + 1); +// } +//} void Attr::Erase(std::vector& offsets) { diff --git a/core/src/segment/Attr.h b/core/src/segment/Attr.h index 506e50f33b..f88a5300ac 100644 --- a/core/src/segment/Attr.h +++ b/core/src/segment/Attr.h @@ -30,14 +30,14 @@ class Attr { Attr(); - void - AddAttr(const std::vector& data, size_t nbytes); - - void - AddUids(const std::vector& uids); - - void - SetName(const std::string& name); + // void + // AddAttr(const std::vector& data, size_t nbytes); + // + // void + // AddUids(const std::vector& uids); + // + // void + // SetName(const std::string& name); const std::vector& GetData() const; @@ -60,8 +60,8 @@ class Attr { size_t GetCodeLength() const; - void - Erase(int32_t offset); + // void + // Erase(int32_t offset); void Erase(std::vector& offsets); diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index f66a00e3d7..f6bc412eb3 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -69,15 +69,20 @@ SegmentWriter::AddAttrs(const std::string& name, const std::unordered_mapattrs_ptr_->attrs; for (; attr_data_it != attr_data.end(); ++attr_data_it) { - if (attrs.find(attr_data_it->first) != attrs.end()) { - segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first) - ->AddAttr(attr_data_it->second, attr_nbytes.at(attr_data_it->first)); - segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)->AddUids(uids); - } else { - AttrPtr attr = std::make_shared(attr_data_it->second, attr_nbytes.at(attr_data_it->first), uids, - attr_data_it->first); - segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr)); - } + AttrPtr attr = std::make_shared(attr_data_it->second, attr_nbytes.at(attr_data_it->first), uids, + attr_data_it->first); + segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr)); + + // if (attrs.find(attr_data_it->first) != attrs.end()) { + // segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first) + // ->AddAttr(attr_data_it->second, attr_nbytes.at(attr_data_it->first)); + // segment_ptr_->attrs_ptr_->attrs.at(attr_data_it->first)->AddUids(uids); + // } else { + // AttrPtr attr = std::make_shared(attr_data_it->second, attr_nbytes.at(attr_data_it->first), + // uids, + // attr_data_it->first); + // segment_ptr_->attrs_ptr_->attrs.insert(std::make_pair(attr_data_it->first, attr)); + // } } return Status::OK(); } diff --git a/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp b/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp index 82b186f2fd..f39e97e23b 100644 --- a/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp +++ b/core/src/server/delivery/hybrid_request/CreateHybridCollectionRequest.cpp @@ -57,6 +57,8 @@ CreateHybridCollectionRequest::OnExecute() { try { // step 1: check arguments auto status = ValidationUtil::ValidateCollectionName(collection_name_); + 
fiu_do_on("CreateHybridCollectionRequest.OnExecute.invalid_collection_name", + status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { return status; } @@ -98,6 +100,8 @@ CreateHybridCollectionRequest::OnExecute() { // step 3: create collection status = DBWrapper::DB()->CreateHybridCollection(collection_info, fields_schema); + fiu_do_on("CreateHybridCollectionRequest.OnExecute.invalid_db_execute", + status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { // collection could exist if (status.code() == DB_ALREADY_EXIST) { diff --git a/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp b/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp index fa3bf78486..2c50cc7573 100644 --- a/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp +++ b/core/src/server/delivery/hybrid_request/DescribeHybridCollectionRequest.cpp @@ -53,6 +53,8 @@ DescribeHybridCollectionRequest::OnExecute() { engine::meta::hybrid::FieldsSchema fields_schema; collection_schema.collection_id_ = collection_name_; auto status = DBWrapper::DB()->DescribeHybridCollection(collection_schema, fields_schema); + fiu_do_on("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute", + status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); if (!status.ok()) { return status; } diff --git a/core/src/server/grpc_impl/GrpcRequestHandler.cpp b/core/src/server/grpc_impl/GrpcRequestHandler.cpp index d8a019c0d5..2bd7eed694 100644 --- a/core/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/core/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -952,6 +952,20 @@ GrpcRequestHandler::DescribeHybridCollection(::grpc::ServerContext* context, const ::milvus::grpc::CollectionName* request, ::milvus::grpc::Mapping* response) { LOG_SERVER_INFO_ << LogOut("Request [%s] %s begin.", GetContext(context)->RequestID().c_str(), __func__); + std::unordered_map field_types; + Status status = + request_handler_.DescribeHybridCollection(GetContext(context), request->collection_name(), field_types); + + response->mutable_status()->set_error_code((milvus::grpc::ErrorCode)status.code()); + response->mutable_status()->set_reason(status.message()); + response->set_collection_name(request->collection_name()); + auto field_it = field_types.begin(); + for (; field_it != field_types.end(); field_it++) { + auto field = response->add_fields(); + field->set_name(field_it->first); + field->mutable_type()->set_data_type((milvus::grpc::DataType)field_it->second); + } + CHECK_NULLPTR_RETURN(request); LOG_SERVER_INFO_ << LogOut("Request [%s] %s end.", GetContext(context)->RequestID().c_str(), __func__); return ::grpc::Status::OK; diff --git a/core/src/utils/LogUtil.cpp b/core/src/utils/LogUtil.cpp index d6883bafb7..869170b065 100644 --- a/core/src/utils/LogUtil.cpp +++ b/core/src/utils/LogUtil.cpp @@ -11,11 +11,11 @@ #include "utils/LogUtil.h" +#include #include #include #include -#include #include #include diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index 1c5882d25d..2b9db83c4e 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -621,6 +621,11 @@ TEST_F(DBTest, BACK_TIMER_THREAD_1) { db_->Start(); db_->Stop(); fiu_disable("DBImpl.StartMetricTask.InvalidTotalCache"); + + FIU_ENABLE_FIU("options_metric_enable"); + db_->Start(); + db_->Stop(); + fiu_disable("options_metric_enable"); } TEST_F(DBTest, BACK_TIMER_THREAD_2) { @@ -1219,6 +1224,39 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) { } } +TEST_F(DBTest2, 
diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp
index 1c5882d25d..2b9db83c4e 100644
--- a/core/unittest/db/test_db.cpp
+++ b/core/unittest/db/test_db.cpp
@@ -621,6 +621,11 @@ TEST_F(DBTest, BACK_TIMER_THREAD_1) {
     db_->Start();
     db_->Stop();
     fiu_disable("DBImpl.StartMetricTask.InvalidTotalCache");
+
+    FIU_ENABLE_FIU("options_metric_enable");
+    db_->Start();
+    db_->Stop();
+    fiu_disable("options_metric_enable");
 }
 
 TEST_F(DBTest, BACK_TIMER_THREAD_2) {
@@ -1219,6 +1224,39 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {
     }
 }
 
+TEST_F(DBTest2, GET_VECTOR_BY_ID_INVALID_TEST) {
+    fiu_init(0);
+
+    milvus::engine::meta::CollectionSchema collection_info = BuildCollectionSchema();
+    auto stat = db_->CreateCollection(collection_info);
+    ASSERT_TRUE(stat.ok());
+
+    uint64_t qb = 1000;
+    milvus::engine::VectorsData qxb;
+    BuildVectors(qb, 0, qxb);
+
+    std::string partition_name = "part_name";
+    std::string partition_tag = "part_tag";
+    stat = db_->CreatePartition(collection_info.collection_id_, partition_name, partition_tag);
+    ASSERT_TRUE(stat.ok());
+
+    std::vector<milvus::engine::VectorsData> vectors;
+    std::vector<int64_t> empty_array;
+    stat = db_->GetVectorsByID(collection_info, empty_array, vectors);
+    ASSERT_FALSE(stat.ok());
+
+    stat = db_->InsertVectors(collection_info.collection_id_, partition_tag, qxb);
+    ASSERT_TRUE(stat.ok());
+
+    db_->Flush(collection_info.collection_id_);
+
+    fiu_enable("bloom_filter_nullptr", 1, NULL, 0);
+    stat = db_->GetVectorsByID(collection_info, qxb.id_array_, vectors);
+    ASSERT_FALSE(stat.ok());
+    fiu_disable("bloom_filter_nullptr");
+}
+
+
 TEST_F(DBTest2, GET_VECTOR_IDS_TEST) {
     milvus::engine::meta::CollectionSchema collection_schema = BuildCollectionSchema();
     auto stat = db_->CreateCollection(collection_schema);
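The new negative-path tests above all share the same lifecycle: enable a failpoint, assert that the call fails, then disable the point so later tests see normal behavior again. A self-contained sketch of that lifecycle, assuming gtest and libfiu (compiled with -DFIU_ENABLE, linked with -lfiu -lgtest_main); GetVectors() and the point name are placeholders, not Milvus APIs:

    #include <gtest/gtest.h>
    #include <fiu.h>
    #include <fiu-control.h>
    #include <fiu-local.h>

    // Stand-in for a production call that is instrumented with a failpoint.
    static bool
    GetVectors() {
        bool ok = true;
        fiu_do_on("example.bloom_filter_nullptr", ok = false);
        return ok;
    }

    TEST(FailpointDemo, NegativePath) {
        fiu_init(0);  // the Milvus tests likewise call this at the top of each test
        EXPECT_TRUE(GetVectors());

        fiu_enable("example.bloom_filter_nullptr", 1, NULL, 0);
        EXPECT_FALSE(GetVectors());  // the failure branch is exercised
        fiu_disable("example.bloom_filter_nullptr");

        EXPECT_TRUE(GetVectors());   // behavior is restored for later tests
    }

Disabling the point before the test returns matters: failpoints are process-global, so a leaked enable would bleed into every subsequent test in the binary.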
diff --git a/core/unittest/db/test_hybrid_db.cpp b/core/unittest/db/test_hybrid_db.cpp
index 6d09b8649f..54eb1292fd 100644
--- a/core/unittest/db/test_hybrid_db.cpp
+++ b/core/unittest/db/test_hybrid_db.cpp
@@ -37,27 +37,28 @@ static constexpr int64_t NQ = 10;
 static constexpr int64_t TOPK = 100;
 
 void
-BuildTableSchema(milvus::engine::meta::CollectionSchema& collection_schema,
-                 milvus::engine::meta::hybrid::FieldsSchema& fields_schema,
-                 std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType>& attr_type) {
+BuildTableSchema(const std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType>& attr_type,
+                 milvus::engine::meta::CollectionSchema& collection_schema,
+                 milvus::engine::meta::hybrid::FieldsSchema& fields_schema) {
     collection_schema.dimension_ = TABLE_DIM;
     collection_schema.collection_id_ = TABLE_NAME;
 
     std::vector<milvus::engine::meta::hybrid::FieldSchema> fields;
-    fields.resize(FIELD_NUM);
-    for (uint64_t i = 0; i < FIELD_NUM; ++i) {
-        fields[i].collection_id_ = TABLE_NAME;
-        fields[i].field_name_ = "field_" + std::to_string(i + 1);
+    auto attr_it = attr_type.begin();
+    for (; attr_it != attr_type.end(); attr_it++) {
+        milvus::engine::meta::hybrid::FieldSchema schema;
+        schema.field_name_ = attr_it->first;
+        schema.field_type_ = (int)attr_it->second;
+        schema.collection_id_ = TABLE_NAME;
+        fields.emplace_back(schema);
     }
 
-    fields[0].field_type_ = (int)milvus::engine::meta::hybrid::DataType::INT32;
-    fields[1].field_type_ = (int)milvus::engine::meta::hybrid::DataType::INT64;
-    fields[2].field_type_ = (int)milvus::engine::meta::hybrid::DataType::FLOAT;
-    fields[3].field_type_ = (int)milvus::engine::meta::hybrid::DataType::VECTOR;
-    fields_schema.fields_schema_ = fields;
+    milvus::engine::meta::hybrid::FieldSchema schema;
+    schema.field_name_ = "field_vector";
+    schema.collection_id_ = TABLE_NAME;
+    schema.field_type_ = (int)(milvus::engine::meta::hybrid::DataType::VECTOR);
+    fields.emplace_back(schema);
 
-    attr_type.insert(std::make_pair("field_0", milvus::engine::meta::hybrid::DataType::INT32));
-    attr_type.insert(std::make_pair("field_1", milvus::engine::meta::hybrid::DataType::INT64));
-    attr_type.insert(std::make_pair("field_2", milvus::engine::meta::hybrid::DataType::FLOAT));
+    fields_schema.fields_schema_ = fields;
 }
 
 void
@@ -87,7 +88,7 @@ BuildEntity(uint64_t n, uint64_t batch_index, milvus::engine::Entity& entity) {
         vectors.id_array_.push_back(n * batch_index + i);
     }
 
-    entity.vector_data_.insert(std::make_pair("field_3", vectors));
+    entity.vector_data_.insert(std::make_pair("field_vector", vectors));
     std::vector<int32_t> value_0;
    std::vector<int64_t> value_1;
     std::vector<float> value_2;
@@ -112,6 +113,56 @@ BuildEntity(uint64_t n, uint64_t batch_index, milvus::engine::Entity& entity) {
     entity.attr_value_ = attr_value;
 }
 
+void
+BuildComplexEntity(uint64_t n, uint64_t batch_index, milvus::engine::Entity& entity) {
+    milvus::engine::VectorsData vectors;
+    vectors.vector_count_ = n;
+    vectors.float_data_.clear();
+    vectors.float_data_.resize(n * TABLE_DIM);
+    float* data = vectors.float_data_.data();
+    for (uint64_t i = 0; i < n; i++) {
+        for (int64_t j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48();
+        data[TABLE_DIM * i] += i / 2000.;
+
+        vectors.id_array_.push_back(n * batch_index + i);
+    }
+    entity.vector_data_.insert(std::make_pair("field_vector", vectors));
+    std::vector<int64_t> value_0;
+    std::vector<int64_t> value_1;
+    std::vector<int64_t> value_2;
+    std::vector<int64_t> value_3;
+    std::vector<double> value_4;
+    value_0.resize(n);
+    value_1.resize(n);
+    value_2.resize(n);
+    value_3.resize(n);
+    value_4.resize(n);
+    for (uint64_t i = 0; i < n; ++i) {
+        value_0[i] = i % 7;
+        value_1[i] = i + n;
+        value_2[i] = i + n;
+        value_3[i] = i + n;
+        value_4[i] = (double)((i + 100) / (n + 1));
+    }
+    entity.entity_count_ = n;
+    size_t attr_size = n * sizeof(int64_t) * 6;
+    std::vector<uint8_t> attr_value(attr_size, 0);
+    size_t offset = 0;
+    memcpy(attr_value.data(), value_0.data(), n * sizeof(int64_t));
+    offset += n * sizeof(int64_t);
+    memcpy(attr_value.data() + offset, value_1.data(), n * sizeof(int64_t));
+    offset += n * sizeof(int64_t);
+    memcpy(attr_value.data() + offset, value_2.data(), n * sizeof(int64_t));
+    offset += n * sizeof(int64_t);
+    memcpy(attr_value.data() + offset, value_3.data(), n * sizeof(int64_t));
+    offset += n * sizeof(int64_t);
+    memcpy(attr_value.data() + offset, value_4.data(), n * sizeof(double));
+    offset += n * sizeof(int64_t);
+    memcpy(attr_value.data() + offset, value_4.data(), n * sizeof(double));
+
+    entity.attr_value_ = attr_value;
+}
+
 void
 ConstructGeneralQuery(milvus::query::GeneralQueryPtr& general_query) {
     general_query->bin->relation = milvus::query::QueryRelation::AND;
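BuildComplexEntity above packs six fixed-width columns into one contiguous attr_value_ byte buffer by advancing a running byte offset of n * sizeof(value) per column (value_4 is copied into both of the last two 8-byte slots). A small worked example of the same offset arithmetic; the two-column layout here is illustrative, the real test packs six 8-byte columns:

    #include <cstdint>
    #include <cstdio>
    #include <cstring>
    #include <vector>

    int
    main() {
        const size_t n = 4;
        std::vector<int64_t> ints = {7, 8, 9, 10};
        std::vector<double> doubles = {0.5, 1.5, 2.5, 3.5};

        // Two columns of 8-byte values -> n * 8 * 2 bytes total.
        std::vector<uint8_t> buf(n * sizeof(int64_t) * 2, 0);
        size_t offset = 0;
        memcpy(buf.data() + offset, ints.data(), n * sizeof(int64_t));
        offset += n * sizeof(int64_t);
        memcpy(buf.data() + offset, doubles.data(), n * sizeof(double));

        // Unpacking a single element reverses the same arithmetic:
        // column base offset + index * element size.
        int64_t i2 = 0;
        double d2 = 0.0;
        memcpy(&i2, buf.data() + 2 * sizeof(int64_t), sizeof(int64_t));
        memcpy(&d2, buf.data() + n * sizeof(int64_t) + 2 * sizeof(double), sizeof(double));
        std::printf("%lld %f\n", (long long)i2, d2);  // 9 2.500000
        return 0;
    }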
@@ -142,7 +193,7 @@ ConstructGeneralQuery(milvus::query::GeneralQueryPtr& general_query) {
     range_query->boost = 2;
 
     auto vector_query = std::make_shared<milvus::query::VectorQuery>();
-    vector_query->field_name = "field_3";
+    vector_query->field_name = "field_vector";
     vector_query->topk = 100;
     vector_query->boost = 3;
     milvus::json json_params = {{"nprobe", 10}};
@@ -166,13 +217,76 @@ ConstructGeneralQuery(milvus::query::GeneralQueryPtr& general_query) {
     right->leaf = std::make_shared<milvus::query::LeafQuery>();
     right->leaf->vector_query = vector_query;
 }
+
+
+// TODO(yukun): needs to be extended
+void
+ConstructComplexGeneralQuery(milvus::query::GeneralQueryPtr& general_query) {
+    general_query->bin->relation = milvus::query::QueryRelation::AND;
+    general_query->bin->left_query = std::make_shared<milvus::query::GeneralQuery>();
+    general_query->bin->right_query = std::make_shared<milvus::query::GeneralQuery>();
+    auto left = general_query->bin->left_query;
+    auto right = general_query->bin->right_query;
+    left->bin->relation = milvus::query::QueryRelation::AND;
+
+    auto term_query = std::make_shared<milvus::query::TermQuery>();
+    std::vector<int64_t> field_value = {10, 20, 30, 40, 50};
+    std::vector<uint8_t> term_value;
+    term_value.resize(5 * sizeof(int64_t));
+    memcpy(term_value.data(), field_value.data(), 5 * sizeof(int64_t));
+    term_query->field_name = "field_0";
+    term_query->field_value = term_value;
+    term_query->boost = 1;
+
+    auto range_query = std::make_shared<milvus::query::RangeQuery>();
+    range_query->field_name = "field_1";
+    std::vector<milvus::query::CompareExpr> compare_expr;
+    compare_expr.resize(2);
+    compare_expr[0].compare_operator = milvus::query::CompareOperator::GTE;
+    compare_expr[0].operand = "1000";
+    compare_expr[1].compare_operator = milvus::query::CompareOperator::LTE;
+    compare_expr[1].operand = "5000";
+    range_query->compare_expr = compare_expr;
+    range_query->boost = 2;
+
+    auto vector_query = std::make_shared<milvus::query::VectorQuery>();
+    vector_query->field_name = "field_vector";
+    vector_query->topk = 100;
+    vector_query->boost = 3;
+    milvus::json json_params = {{"nprobe", 10}};
+    vector_query->extra_params = json_params;
+    milvus::query::VectorRecord record;
+    record.float_data.resize(NQ * TABLE_DIM);
+    float* data = record.float_data.data();
+    for (uint64_t i = 0; i < NQ; i++) {
+        for (int64_t j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48();
+        data[TABLE_DIM * i] += i / 2000.;
+    }
+    vector_query->query_vector = record;
+
+    left->bin->left_query = std::make_shared<milvus::query::GeneralQuery>();
+    left->bin->right_query = std::make_shared<milvus::query::GeneralQuery>();
+    left->bin->left_query->leaf = std::make_shared<milvus::query::LeafQuery>();
+    left->bin->right_query->leaf = std::make_shared<milvus::query::LeafQuery>();
+    left->bin->left_query->leaf->term_query = term_query;
+    left->bin->right_query->leaf->range_query = range_query;
+
+    right->leaf = std::make_shared<milvus::query::LeafQuery>();
+    right->leaf->vector_query = vector_query;
+}
+
 }  // namespace
 
 TEST_F(DBTest, HYBRID_DB_TEST) {
     milvus::engine::meta::CollectionSchema collection_info;
     milvus::engine::meta::hybrid::FieldsSchema fields_info;
-    std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType> attr_type;
-    BuildTableSchema(collection_info, fields_info, attr_type);
+    std::vector<std::string> field_names = {"field_0", "field_1", "field_2"};
+    std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType> attr_type = {
+        {"field_0", milvus::engine::meta::hybrid::DataType::INT32},
+        {"field_1", milvus::engine::meta::hybrid::DataType::INT64},
+        {"field_2", milvus::engine::meta::hybrid::DataType::FLOAT}};
+
+    BuildTableSchema(attr_type, collection_info, fields_info);
 
     auto stat = db_->CreateHybridCollection(collection_info, fields_info);
     ASSERT_TRUE(stat.ok());
@@ -187,8 +301,6 @@ TEST_F(DBTest, HYBRID_DB_TEST) {
     milvus::engine::Entity entity;
     BuildEntity(qb, 0, entity);
 
-    std::vector<std::string> field_names = {"field_0", "field_1", "field_2"};
-
     stat = db_->InsertEntities(TABLE_NAME, "", field_names, entity, attr_type);
     ASSERT_TRUE(stat.ok());
@@ -207,8 +319,16 @@ TEST_F(DBTest, HYBRID_SEARCH_TEST) {
     //#ifndef MILVUS_GPU_VERSION
     milvus::engine::meta::CollectionSchema collection_info;
     milvus::engine::meta::hybrid::FieldsSchema fields_info;
-    std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType> attr_type;
-    BuildTableSchema(collection_info, fields_info, attr_type);
+    std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType> attr_type = {
+        {"field_0", milvus::engine::meta::hybrid::DataType::INT8},
+        {"field_1", milvus::engine::meta::hybrid::DataType::INT16},
+        {"field_2", milvus::engine::meta::hybrid::DataType::INT32},
+        {"field_3", milvus::engine::meta::hybrid::DataType::INT64},
+        {"field_4", milvus::engine::meta::hybrid::DataType::FLOAT},
+        {"field_5", milvus::engine::meta::hybrid::DataType::DOUBLE},
+    };
+
+    BuildTableSchema(attr_type, collection_info, fields_info);
 
     auto stat = db_->CreateHybridCollection(collection_info, fields_info);
     ASSERT_TRUE(stat.ok());
@@ -221,9 +341,9 @@ TEST_F(DBTest, HYBRID_SEARCH_TEST) {
 
     uint64_t qb = 1000;
     milvus::engine::Entity entity;
-    BuildEntity(qb, 0, entity);
+    BuildComplexEntity(qb, 0, entity);
 
-    std::vector<std::string> field_names = {"field_0", "field_1", "field_2"};
+    std::vector<std::string> field_names = {"field_0", "field_1", "field_2", "field_3", "field_4", "field_5"};
 
     stat = db_->InsertEntities(TABLE_NAME, "", field_names, entity, attr_type);
     ASSERT_TRUE(stat.ok());
@@ -233,7 +353,7 @@ TEST_F(DBTest, HYBRID_SEARCH_TEST) {
 
     // Construct general query
     milvus::query::GeneralQueryPtr general_query = std::make_shared<milvus::query::GeneralQuery>();
-    ConstructGeneralQuery(general_query);
+    ConstructComplexGeneralQuery(general_query);
 
     std::vector<std::string> tags;
     milvus::context::HybridSearchContextPtr hybrid_context = std::make_shared<milvus::context::HybridSearchContext>();
@@ -249,8 +369,12 @@ TEST_F(DBTest, COMPACT_TEST) {
     milvus::engine::meta::CollectionSchema collection_info;
     milvus::engine::meta::hybrid::FieldsSchema fields_info;
-    std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType> attr_type;
-    BuildTableSchema(collection_info, fields_info, attr_type);
+    std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType> attr_type = {
+        {"field_0", milvus::engine::meta::hybrid::DataType::INT32},
+        {"field_1", milvus::engine::meta::hybrid::DataType::INT64},
+        {"field_2", milvus::engine::meta::hybrid::DataType::FLOAT}};
+
+    BuildTableSchema(attr_type, collection_info, fields_info);
 
     auto stat = db_->CreateHybridCollection(collection_info, fields_info);
     ASSERT_TRUE(stat.ok());
@@ -292,15 +416,64 @@ TEST_F(DBTest, COMPACT_TEST) {
     milvus::engine::ResultIds result_ids;
     milvus::engine::ResultDistances result_distances;
 
-    stat = db_->QueryByIDs(dummy_context_,
-                           collection_info.collection_id_,
-                           tags,
-                           topk,
-                           json_params,
-                           ids_to_delete,
-                           result_ids,
-                           result_distances);
+    stat = db_->QueryByIDs(dummy_context_, collection_info.collection_id_, tags, topk, json_params, ids_to_delete,
+                           result_ids, result_distances);
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(result_ids[0], -1);
     ASSERT_EQ(result_distances[0], std::numeric_limits<float>::max());
 }
+
+TEST_F(DBTest, HYBRID_INVALID_TEST) {
+    fiu_init(0);
+
+    milvus::engine::meta::CollectionSchema collection_info;
+    milvus::engine::meta::hybrid::FieldsSchema fields_info;
+    std::unordered_map<std::string, milvus::engine::meta::hybrid::DataType> attr_type = {
+        {"field_0", milvus::engine::meta::hybrid::DataType::INT32},
+        {"field_1", milvus::engine::meta::hybrid::DataType::INT64},
+        {"field_2", milvus::engine::meta::hybrid::DataType::FLOAT}};
+
+    BuildTableSchema(attr_type, collection_info, fields_info);
+
+    auto stat = db_->CreateHybridCollection(collection_info, fields_info);
+    ASSERT_TRUE(stat.ok());
+    milvus::engine::meta::CollectionSchema collection_info_get;
+    milvus::engine::meta::hybrid::FieldsSchema fields_info_get;
+    collection_info_get.collection_id_ = TABLE_NAME;
+    stat = db_->DescribeHybridCollection(collection_info_get, fields_info_get);
+    ASSERT_TRUE(stat.ok());
+    ASSERT_EQ(collection_info_get.dimension_, TABLE_DIM);
+
+    uint64_t vector_count = 1000;
+    milvus::engine::Entity entity;
+    BuildEntity(vector_count, 0, entity);
+
+    std::vector<std::string> field_names = {"field_0", "field_1", "field_2"};
+    stat = db_->InsertEntities(TABLE_NAME, "", field_names, entity, attr_type);
+    ASSERT_TRUE(stat.ok());
+
+    stat = db_->Flush();
+    ASSERT_TRUE(stat.ok());
+
+    // Construct general query
+    milvus::query::GeneralQueryPtr general_query = std::make_shared<milvus::query::GeneralQuery>();
+    ConstructGeneralQuery(general_query);
+
+    std::vector<std::string> tags;
+    milvus::context::HybridSearchContextPtr hybrid_context = std::make_shared<milvus::context::HybridSearchContext>();
+    milvus::engine::ResultIds result_ids;
+    milvus::engine::ResultDistances result_distances;
+    uint64_t nq;
+
+    fiu_enable("read_id_directory_false", 1, NULL, 0);
+    stat = db_->HybridQuery(dummy_context_, TABLE_NAME, tags, hybrid_context, general_query, attr_type, nq, result_ids,
+                            result_distances);
+    ASSERT_FALSE(stat.ok());
+    fiu_disable("read_id_directory_false");
+
+    fiu_enable("read_attrs_internal_open_file_fail", 1, NULL, 0);
+    stat = db_->HybridQuery(dummy_context_, TABLE_NAME, tags, hybrid_context, general_query, attr_type, nq, result_ids,
+                            result_distances);
+    ASSERT_FALSE(stat.ok());
+    fiu_disable("read_attrs_internal_open_file_fail");
+}
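ConstructGeneralQuery and ConstructComplexGeneralQuery above both build a binary query tree whose leaves hold term, range, and vector predicates and whose inner nodes AND them together, giving the shape (term AND range) AND vector. A toy sketch of how such a nested bin/leaf tree evaluates, using stand-in types rather than the milvus::query API:

    #include <cstdint>
    #include <cstdio>
    #include <functional>
    #include <memory>

    struct Node {
        std::function<bool(int64_t)> leaf;  // set on leaf nodes
        std::shared_ptr<Node> left, right;  // set on AND nodes
        bool Eval(int64_t v) const {
            return leaf ? leaf(v) : (left->Eval(v) && right->Eval(v));
        }
    };

    int
    main() {
        auto term = std::make_shared<Node>();
        term->leaf = [](int64_t v) { return v % 2 == 0; };  // "value in term set"
        auto range = std::make_shared<Node>();
        range->leaf = [](int64_t v) { return v >= 1000 && v <= 5000; };  // GTE/LTE pair
        auto inner = std::make_shared<Node>();  // term AND range
        inner->left = term;
        inner->right = range;
        // In the real query the right child is the vector similarity search;
        // here it is a trivial placeholder predicate.
        auto vec = std::make_shared<Node>();
        vec->leaf = [](int64_t) { return true; };
        Node root{nullptr, inner, vec};
        std::printf("%d %d\n", root.Eval(2000), root.Eval(2001));  // 1 0
        return 0;
    }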
diff --git a/core/unittest/db/test_meta.cpp b/core/unittest/db/test_meta.cpp
index a331e65fb3..435aabdb5f 100644
--- a/core/unittest/db/test_meta.cpp
+++ b/core/unittest/db/test_meta.cpp
@@ -295,6 +295,20 @@ TEST_F(MetaTest, FAILED_TEST) {
         ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
         fiu_disable("SqliteMetaImpl.FilesByType.throw_exception");
     }
+    {
+        milvus::engine::meta::FilesHolder files_holder;
+        std::vector<milvus::engine::meta::CollectionSchema> collection_array;
+        milvus::engine::meta::CollectionSchema schema;
+        schema.collection_id_ = collection_id;
+        collection_array.emplace_back(schema);
+        std::vector<int> file_types;
+        file_types.push_back(milvus::engine::meta::SegmentSchema::INDEX);
+        FIU_ENABLE_FIU("SqliteMetaImpl.FilesByTypeEx.throw_exception");
+        status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
+        ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
+        fiu_disable("SqliteMetaImpl.FilesByTypeEx.throw_exception");
+        status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
+    }
     {
         uint64_t size = 0;
         FIU_ENABLE_FIU("SqliteMetaImpl.Size.throw_exception");
@@ -567,6 +581,9 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
         ++i;
     }
 
+    status = impl.GetCollectionFilesBySegmentId(table_file.segment_id_, files_holder);
+    ASSERT_TRUE(status.ok());
+
     impl.DropAll();
 }
 
diff --git a/core/unittest/db/test_meta_mysql.cpp b/core/unittest/db/test_meta_mysql.cpp
index 7c2c1df77e..97de12ad76 100644
--- a/core/unittest/db/test_meta_mysql.cpp
+++ b/core/unittest/db/test_meta_mysql.cpp
@@ -499,6 +499,9 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DISK) {
         ++i;
     }
 
+    status = impl.GetCollectionFilesBySegmentId(table_file.segment_id_, files_holder);
+    ASSERT_TRUE(status.ok());
+
     status = impl.DropAll();
     ASSERT_TRUE(status.ok());
 }
@@ -709,6 +712,18 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
                      to_index_files_cnt + index_files_cnt;
     ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);
 
+    std::vector<milvus::engine::meta::CollectionSchema> collection_array;
+    milvus::engine::meta::CollectionSchema schema;
+    schema.collection_id_ = collection_id;
+    collection_array.emplace_back(schema);
+    status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
+    ASSERT_TRUE(status.ok());
+
+    // FIU_ENABLE_FIU("MySQLMetaImpl.FilesByTypeEx.throw_exception");
+    // status = impl_->FilesByTypeEx(collection_array, file_types, files_holder);
+    // ASSERT_FALSE(status.ok());
+    // fiu_disable("MySQLMetaImpl.FilesByTypeEx.throw_exception");
+
     FIU_ENABLE_FIU("MySQLMetaImpl.DeleteCollectionFiles.null_connection");
     status = impl_->DeleteCollectionFiles({collection_id});
     ASSERT_FALSE(status.ok());
diff --git a/core/unittest/server/test_rpc.cpp b/core/unittest/server/test_rpc.cpp
index 49e625ceea..ffe100ac9e 100644
--- a/core/unittest/server/test_rpc.cpp
+++ b/core/unittest/server/test_rpc.cpp
@@ -1220,6 +1220,11 @@ TEST_F(RpcHandlerTest, HYBRID_TEST) {
     field_1->mutable_type()->mutable_vector_param()->set_dimension(128);
     field_1->set_name("field_1");
 
+    milvus::json json_param = {{"metric_type", 1}, {"engine_type", 1}};
+    auto extra_param = field_1->add_extra_params();
+    extra_param->set_key("params");
+    extra_param->set_value(json_param.dump());
+
     handler->CreateHybridCollection(&context, &mapping, &response);
 
     // Insert Entities
@@ -1275,6 +1280,15 @@ TEST_F(RpcHandlerTest, HYBRID_TEST) {
     term_query->set_value_num(nq);
     term_query->set_values(term_value.data(), nq * sizeof(int64_t));
 
+    auto range_query = boolean_query_2->add_general_query()->mutable_range_query();
+    range_query->set_field_name("field_0");
+    auto comp1 = range_query->add_operand();
+    comp1->set_operator_(::milvus::grpc::CompareOperator::GTE);
+    comp1->set_operand("0");
+    auto comp2 = range_query->add_operand();
+    comp2->set_operator_(::milvus::grpc::CompareOperator::LTE);
+    comp2->set_operand("100000");
+
     auto vector_query = boolean_query_2->add_general_query()->mutable_vector_query();
     vector_query->set_field_name("field_1");
     vector_query->set_topk(topk);
@@ -1291,10 +1305,10 @@ TEST_F(RpcHandlerTest, HYBRID_TEST) {
         auto row_record = vector_query->add_records();
         CopyRowRecord(row_record, record);
     }
-    auto extra_param = vector_query->add_extra_params();
-    extra_param->set_key("params");
+    auto extra_param_1 = vector_query->add_extra_params();
+    extra_param_1->set_key("params");
     milvus::json param = {{"nprobe", 16}};
-    extra_param->set_value(param.dump());
+    extra_param_1->set_value(param.dump());
 
     search_param.set_collection_name("test_hybrid");
     auto search_extra_param = search_param.add_extra_params();
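The range query added above encodes a closed interval as a list of (operator, stringified bound) operands: GTE "0" and LTE "100000". A hedged sketch of how such an operand list can be interpreted as a numeric predicate; the enum and Match() helper are illustrative stand-ins, not the gRPC-generated types:

    #include <cstdint>
    #include <cstdio>
    #include <string>
    #include <vector>

    enum class CompareOperator { GTE, LTE };

    struct Operand {
        CompareOperator op;
        std::string bound;  // the proto carries bounds as strings
    };

    // A value matches when it satisfies every operand in the list, so the
    // GTE/LTE pair together describes the closed interval [0, 100000].
    static bool
    Match(int64_t v, const std::vector<Operand>& ops) {
        for (const auto& o : ops) {
            const int64_t b = std::stoll(o.bound);
            if (o.op == CompareOperator::GTE && v < b) return false;
            if (o.op == CompareOperator::LTE && v > b) return false;
        }
        return true;
    }

    int
    main() {
        std::vector<Operand> ops = {{CompareOperator::GTE, "0"},
                                    {CompareOperator::LTE, "100000"}};
        std::printf("%d %d\n", Match(500, ops), Match(100001, ops));  // 1 0
        return 0;
    }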
@@ -1305,6 +1319,144 @@ TEST_F(RpcHandlerTest, HYBRID_TEST) {
     search_extra_param->set_key("params");
     search_extra_param->set_value("");
 
     milvus::grpc::TopKQueryResult topk_query_result;
     handler->HybridSearch(&context, &search_param, &topk_query_result);
 }
 
+TEST_F(RpcHandlerTest, HYBRID_INVALID_TEST) {
+    fiu_init(0);
+
+    ::grpc::ServerContext context;
+    milvus::grpc::Mapping mapping;
+    milvus::grpc::Status response;
+
+    uint64_t row_num = 1000;
+    uint64_t dimension = 128;
+
+    // Create Hybrid Collection
+    mapping.set_collection_name("test_hybrid");
+    auto field_0 = mapping.add_fields();
+    field_0->set_name("field_0");
+    field_0->mutable_type()->set_data_type(::milvus::grpc::DataType::INT64);
+
+    auto field_1 = mapping.add_fields();
+    field_1->mutable_type()->mutable_vector_param()->set_dimension(128);
+    field_1->set_name("field_1");
+
+    milvus::json json_param = {{"metric_type", 1}, {"engine_type", 1}};
+    auto extra_param = field_1->add_extra_params();
+    extra_param->set_key("params");
+    extra_param->set_value(json_param.dump());
+
+    fiu_enable("CreateHybridCollectionRequest.OnExecute.invalid_collection_name", 1, NULL, 0);
+    handler->CreateHybridCollection(&context, &mapping, &response);
+    fiu_disable("CreateHybridCollectionRequest.OnExecute.invalid_collection_name");
+
+    fiu_enable("CreateHybridCollectionRequest.OnExecute.invalid_db_execute", 1, NULL, 0);
+    handler->CreateHybridCollection(&context, &mapping, &response);
+    fiu_disable("CreateHybridCollectionRequest.OnExecute.invalid_db_execute");
+
+    handler->CreateHybridCollection(&context, &mapping, &response);
+    milvus::grpc::CollectionName grpc_collection_name;
+    grpc_collection_name.set_collection_name("test_hybrid");
+    fiu_enable("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute", 1, NULL, 0);
+    handler->DescribeHybridCollection(&context, &grpc_collection_name, &mapping);
+    fiu_disable("DescribeHybridCollectionRequest.OnExecute.invalid_db_execute");
+    handler->DescribeHybridCollection(&context, &grpc_collection_name, &mapping);
+
+    // Insert Entities
+    milvus::grpc::HInsertParam insert_param;
+    milvus::grpc::HEntityIDs entity_ids;
+    insert_param.set_collection_name("test_hybrid");
+
+    auto entity = insert_param.mutable_entities();
+    auto field_name_0 = entity->add_field_names();
+    *field_name_0 = "field_0";
+    auto field_name_1 = entity->add_field_names();
+    *field_name_1 = "field_1";
+
+    entity->set_row_num(row_num);
+    std::vector<int64_t> field_value(row_num, 0);
+    for (uint64_t i = 0; i < row_num; i++) {
+        field_value[i] = i;
+    }
+    entity->set_attr_records(field_value.data(), row_num * sizeof(int64_t));
+
+    std::vector<std::vector<float>> vector_field;
+    vector_field.resize(row_num);
+    for (uint64_t i = 0; i < row_num; ++i) {
+        vector_field[i].resize(dimension);
+        for (uint64_t j = 0; j < dimension; ++j) {
+            vector_field[i][j] = (float)((i + 10) / (j + 20));
+        }
+    }
+    auto vector_record = entity->add_result_values();
+    for (uint64_t i = 0; i < row_num; ++i) {
+        auto record = vector_record->mutable_vector_value()->add_value();
+        auto vector_data = record->mutable_float_data();
+        vector_data->Resize(static_cast<int>(vector_field[i].size()), 0.0);
+        memcpy(vector_data->mutable_data(), vector_field[i].data(), vector_field[i].size() * sizeof(float));
+    }
+
+    fiu_enable("InsertEntityRequest.OnExecute.throw_std_exception", 1, NULL, 0);
+    handler->InsertEntity(&context, &insert_param, &entity_ids);
+    fiu_disable("InsertEntityRequest.OnExecute.throw_std_exception");
+    handler->InsertEntity(&context, &insert_param, &entity_ids);
+
+    uint64_t nq = 10;
+    uint64_t topk = 10;
+    milvus::grpc::HSearchParam search_param;
+    auto general_query = search_param.mutable_general_query();
+    auto boolean_query_1 = general_query->mutable_boolean_query();
+    boolean_query_1->set_occur(milvus::grpc::Occur::MUST);
+    auto general_query_1 = boolean_query_1->add_general_query();
+    auto boolean_query_2 = general_query_1->mutable_boolean_query();
+    auto term_query = boolean_query_2->add_general_query()->mutable_term_query();
+    term_query->set_field_name("field_0");
+    std::vector<int64_t> term_value(nq, 0);
+    for (uint64_t i = 0; i < nq; ++i) {
+        term_value[i] = i + nq;
+    }
+    term_query->set_value_num(nq);
+    term_query->set_values(term_value.data(), nq * sizeof(int64_t));
+
+    auto range_query = boolean_query_2->add_general_query()->mutable_range_query();
+    range_query->set_field_name("field_0");
+    auto comp1 = range_query->add_operand();
+    comp1->set_operator_(::milvus::grpc::CompareOperator::GTE);
+    comp1->set_operand("0");
+    auto comp2 = range_query->add_operand();
+    comp2->set_operator_(::milvus::grpc::CompareOperator::LTE);
+    comp2->set_operand("100000");
+
+    auto vector_query = boolean_query_2->add_general_query()->mutable_vector_query();
+    vector_query->set_field_name("field_1");
+    vector_query->set_topk(topk);
+    vector_query->set_query_boost(2);
+    std::vector<std::vector<float>> query_vector;
+    query_vector.resize(nq);
+    for (uint64_t i = 0; i < nq; ++i) {
+        query_vector[i].resize(dimension);
+        for (uint64_t j = 0; j < dimension; ++j) {
+            query_vector[i][j] = (float)((j + 1) / (i + dimension));
+        }
+    }
+    for (auto record : query_vector) {
+        auto row_record = vector_query->add_records();
+        CopyRowRecord(row_record, record);
+    }
+    auto extra_param_1 = vector_query->add_extra_params();
+    extra_param_1->set_key("params");
+    milvus::json param = {{"nprobe", 16}};
+    extra_param_1->set_value(param.dump());
+
+    search_param.set_collection_name("test_hybrid");
+    auto search_extra_param = search_param.add_extra_params();
+    search_extra_param->set_key("params");
+    search_extra_param->set_value("");
+
+    milvus::grpc::TopKQueryResult topk_query_result;
+    fiu_enable("SearchRequest.OnExecute.throw_std_exception", 1, NULL, 0);
+    handler->HybridSearch(&context, &search_param, &topk_query_result);
+    fiu_disable("SearchRequest.OnExecute.throw_std_exception");
+}
+
 //////////////////////////////////////////////////////////////////////
 namespace {
 class DummyRequest : public milvus::server::BaseRequest {