From ebb01aa9b2ee7c54ae4cf128775e294b7837a8c5 Mon Sep 17 00:00:00 2001 From: groot Date: Tue, 28 Apr 2020 08:10:01 -0500 Subject: [PATCH] Fix failed to open file (#2138) * file reference Signed-off-by: groot * print info Signed-off-by: groot * avoid metric crash Signed-off-by: yhmo * refine code Signed-off-by: yhmo * apply delete bug Signed-off-by: groot * has partition check Signed-off-by: groot * duplicate id search Signed-off-by: groot * changelog Signed-off-by: groot --- CHANGELOG.md | 4 +- core/src/db/DBImpl.cpp | 249 ++++++++-------- core/src/db/DBImpl.h | 17 +- core/src/db/IndexFailedChecker.cpp | 17 ++ core/src/db/IndexFailedChecker.h | 3 + core/src/db/OngoingFileChecker.cpp | 130 -------- core/src/db/OngoingFileChecker.h | 59 ---- core/src/db/Types.h | 2 - core/src/db/insert/MemTable.cpp | 77 +++-- core/src/db/meta/FilesHolder.cpp | 237 +++++++++++++++ core/src/db/meta/FilesHolder.h | 113 +++++++ core/src/db/meta/Meta.h | 18 +- core/src/db/meta/MetaTypes.h | 3 + core/src/db/meta/MySQLMetaImpl.cpp | 81 +++-- core/src/db/meta/MySQLMetaImpl.h | 15 +- core/src/db/meta/SqliteMetaImpl.cpp | 279 +++++++++++------- core/src/db/meta/SqliteMetaImpl.h | 15 +- core/src/metrics/SystemInfo.cpp | 200 ++++++++----- .../metrics/prometheus/PrometheusMetrics.cpp | 42 +-- .../delivery/request/HasPartitionRequest.cpp | 16 + core/unittest/db/test_db.cpp | 6 - core/unittest/db/test_meta.cpp | 95 +++--- core/unittest/db/test_meta_mysql.cpp | 130 ++++---- core/unittest/db/test_misc.cpp | 26 -- core/unittest/db/test_search_by_id.cpp | 38 ++- 25 files changed, 1089 insertions(+), 783 deletions(-) delete mode 100644 core/src/db/OngoingFileChecker.cpp delete mode 100644 core/src/db/OngoingFileChecker.h create mode 100644 core/src/db/meta/FilesHolder.cpp create mode 100644 core/src/db/meta/FilesHolder.h diff --git a/CHANGELOG.md b/CHANGELOG.md index a3d0d20cc5..725989f9e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,11 +6,13 @@ Please mark all change in change log and use the issue from GitHub ## Bug - \#1705 Limit the insert data batch size +- \#1796 Too much low performance of building index on ubuntu-mysql-version - \#1929 Skip MySQL meta schema field width check - \#1997 Index file missed after compact - \#2073 Fix CheckDBConfigBackendUrl error message - \#2076 CheckMetricConfigAddress error message -- \#1796 Too much low performance of building index on ubuntu-mysql-version +- \#2128 Check has_partition params +- \#2131 Distance/ID returned is not correct if searching with duplicate ids - \#2141 Fix server start failed if wal directory exist ## Feature diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 50944055e8..bbe001354f 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -339,8 +339,8 @@ DBImpl::GetCollectionInfo(const std::string& collection_id, std::string& collect size_t total_row_count = 0; auto get_info = [&](const std::string& col_id, const std::string& tag) { - meta::SegmentsSchema collection_files; - status = meta_ptr_->FilesByType(col_id, file_types, collection_files); + meta::FilesHolder files_holder; + status = meta_ptr_->FilesByType(col_id, file_types, files_holder); if (!status.ok()) { std::string err_msg = "Failed to get collection info: " + status.ToString(); LOG_ENGINE_ERROR_ << err_msg; @@ -352,6 +352,7 @@ DBImpl::GetCollectionInfo(const std::string& collection_id, std::string& collect milvus::json json_segments; size_t row_count = 0; + milvus::engine::meta::SegmentsSchema& collection_files = files_holder.HoldFiles(); for (auto& file : 
collection_files) { milvus::json json_segment; json_segment[JSON_SEGMENT_NAME] = file.segment_id_; @@ -401,8 +402,8 @@ DBImpl::PreloadCollection(const std::string& collection_id) { } // step 1: get all collection files from parent collection - meta::SegmentsSchema files_array; - auto status = GetFilesToSearch(collection_id, files_array); + meta::FilesHolder files_holder; + auto status = meta_ptr_->FilesToSearch(collection_id, files_holder); if (!status.ok()) { return status; } @@ -411,7 +412,7 @@ DBImpl::PreloadCollection(const std::string& collection_id) { std::vector partition_array; status = meta_ptr_->ShowPartitions(collection_id, partition_array); for (auto& schema : partition_array) { - status = GetFilesToSearch(schema.collection_id_, files_array); + status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder); } int64_t size = 0; @@ -420,6 +421,7 @@ DBImpl::PreloadCollection(const std::string& collection_id) { int64_t available_size = cache_total - cache_usage; // step 3: load file one by one + milvus::engine::meta::SegmentsSchema& files_array = files_holder.HoldFiles(); LOG_ENGINE_DEBUG_ << "Begin pre-load collection:" + collection_id + ", totally " << files_array.size() << " files need to be pre-loaded"; TimeRecorderAuto rc("Pre-load collection:" + collection_id); @@ -881,19 +883,19 @@ DBImpl::Compact(const std::string& collection_id) { // Get files to compact from meta. std::vector file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX, meta::SegmentSchema::FILE_TYPE::BACKUP}; - meta::SegmentsSchema files_to_compact; - status = meta_ptr_->FilesByType(collection_id, file_types, files_to_compact); + meta::FilesHolder files_holder; + status = meta_ptr_->FilesByType(collection_id, file_types, files_holder); if (!status.ok()) { std::string err_msg = "Failed to get files to compact: " + status.message(); LOG_ENGINE_ERROR_ << err_msg; return Status(DB_ERROR, err_msg); } - LOG_ENGINE_DEBUG_ << "Found " << files_to_compact.size() << " segment to compact"; - - OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_compact); + LOG_ENGINE_DEBUG_ << "Found " << files_holder.HoldFiles().size() << " segment to compact"; Status compact_status; + // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal + milvus::engine::meta::SegmentsSchema files_to_compact = files_holder.HoldFiles(); for (auto iter = files_to_compact.begin(); iter != files_to_compact.end();) { meta::SegmentSchema file = *iter; iter = files_to_compact.erase(iter); @@ -906,7 +908,7 @@ DBImpl::Compact(const std::string& collection_id) { size_t deleted_docs_size; status = segment_reader.ReadDeletedDocsSize(deleted_docs_size); if (!status.ok()) { - OngoingFileChecker::GetInstance().UnmarkOngoingFile(file); + files_holder.UnmarkFile(file); continue; // skip this file and try compact next one } @@ -917,26 +919,24 @@ DBImpl::Compact(const std::string& collection_id) { if (!compact_status.ok()) { LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": " << compact_status.message(); - OngoingFileChecker::GetInstance().UnmarkOngoingFile(file); + files_holder.UnmarkFile(file); continue; // skip this file and try compact next one } } else { - OngoingFileChecker::GetInstance().UnmarkOngoingFile(file); + files_holder.UnmarkFile(file); LOG_ENGINE_DEBUG_ << "Segment " << file.segment_id_ << " has no deleted data. 
No need to compact"; continue; // skip this file and try compact next one } LOG_ENGINE_DEBUG_ << "Updating meta after compaction..."; status = meta_ptr_->UpdateCollectionFiles(files_to_update); - OngoingFileChecker::GetInstance().UnmarkOngoingFile(file); + files_holder.UnmarkFile(file); if (!status.ok()) { compact_status = status; break; // meta error, could not go on } } - OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_compact); - if (compact_status.ok()) { LOG_ENGINE_DEBUG_ << "Finished compacting collection: " << collection_id; } @@ -1006,15 +1006,18 @@ DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema& // Set all files in segment to TO_DELETE auto& segment_id = file.segment_id_; - meta::SegmentsSchema segment_files; - status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, segment_files); + meta::FilesHolder files_holder; + status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, files_holder); if (!status.ok()) { return status; } + + milvus::engine::meta::SegmentsSchema& segment_files = files_holder.HoldFiles(); for (auto& f : segment_files) { f.file_type_ = meta::SegmentSchema::FILE_TYPE::TO_DELETE; files_to_update.emplace_back(f); } + files_holder.ReleaseFiles(); LOG_ENGINE_DEBUG_ << "Compacted segment " << compacted_file.segment_id_ << " from " << std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_) @@ -1044,47 +1047,35 @@ DBImpl::GetVectorsByID(const std::string& collection_id, const IDNumbers& id_arr return status; } - meta::SegmentsSchema files_to_query; + meta::FilesHolder files_holder; std::vector file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX, meta::SegmentSchema::FILE_TYPE::BACKUP}; - status = meta_ptr_->FilesByType(collection_id, file_types, files_to_query); + status = meta_ptr_->FilesByType(collection_id, file_types, files_holder); if (!status.ok()) { std::string err_msg = "Failed to get files for GetVectorsByID: " + status.message(); LOG_ENGINE_ERROR_ << err_msg; return status; } - OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_query); - std::vector partition_array; status = meta_ptr_->ShowPartitions(collection_id, partition_array); for (auto& schema : partition_array) { - meta::SegmentsSchema files; - status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files); + status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files_holder); if (!status.ok()) { - OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query); std::string err_msg = "Failed to get files for GetVectorByID: " + status.message(); LOG_ENGINE_ERROR_ << err_msg; return status; } - - OngoingFileChecker::GetInstance().MarkOngoingFiles(files); - files_to_query.insert(files_to_query.end(), std::make_move_iterator(files.begin()), - std::make_move_iterator(files.end())); } - if (files_to_query.empty()) { - OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query); + if (files_holder.HoldFiles().empty()) { LOG_ENGINE_DEBUG_ << "No files to get vector by id from"; return Status(DB_NOT_FOUND, "Collection is empty"); } cache::CpuCacheMgr::GetInstance()->PrintInfo(); - - status = GetVectorsByIdHelper(collection_id, id_array, vectors, files_to_query); - - OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query); + status = GetVectorsByIdHelper(collection_id, id_array, vectors, files_holder); cache::CpuCacheMgr::GetInstance()->PrintInfo(); return status; @@ -1108,12 +1099,13 @@ DBImpl::GetVectorIDs(const 
std::string& collection_id, const std::string& segmen } // step 2: find segment - meta::SegmentsSchema collection_files; - status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, collection_files); + meta::FilesHolder files_holder; + status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, files_holder); if (!status.ok()) { return status; } + milvus::engine::meta::SegmentsSchema& collection_files = files_holder.HoldFiles(); if (collection_files.empty()) { return Status(DB_NOT_FOUND, "Segment does not exist"); } @@ -1163,7 +1155,9 @@ DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segmen Status DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array, - std::vector& vectors, const meta::SegmentsSchema& files) { + std::vector& vectors, meta::FilesHolder& files_holder) { + // attention: this is a copy, not a reference, since the files_holder.UnMarkFile will change the array internal + milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size(); // sometimes not all of id_array can be found, we need to return empty vector for id not found @@ -1243,6 +1237,9 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& it++; } + + // unmark file, allow the file to be deleted + files_holder.UnmarkFile(file); } for (auto id : id_array) { @@ -1251,8 +1248,8 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& VectorsData data; data.vector_count_ = vector_ref.vector_count_; if (data.vector_count_ > 0) { - data.float_data_.swap(vector_ref.float_data_); - data.binary_data_.swap(vector_ref.binary_data_); + data.float_data_ = vector_ref.float_data_; // copy data since there could be duplicated id + data.binary_data_ = vector_ref.binary_data_; // copy data since there could be duplicated id } vectors.emplace_back(data); } @@ -1468,12 +1465,11 @@ DBImpl::HybridQuery(const std::shared_ptr& context, const std:: } Status status; - meta::SegmentsSchema files_array; - + meta::FilesHolder files_holder; if (partition_tags.empty()) { // no partition tag specified, means search in whole table // get all table files from parent table - status = GetFilesToSearch(collection_id, files_array); + status = meta_ptr_->FilesToSearch(collection_id, files_holder); if (!status.ok()) { return status; } @@ -1484,14 +1480,14 @@ DBImpl::HybridQuery(const std::shared_ptr& context, const std:: return status; } for (auto& schema : partition_array) { - status = GetFilesToSearch(schema.collection_id_, files_array); + status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder); if (!status.ok()) { - return Status(DB_ERROR, "GetFilesToSearch failed in HybridQuery"); + return Status(DB_ERROR, "get files to search failed in HybridQuery"); } } - if (files_array.empty()) { - return Status::OK(); + if (files_holder.HoldFiles().empty()) { + return Status::OK(); // no files to search } } else { // get files from specified partitions @@ -1499,19 +1495,19 @@ DBImpl::HybridQuery(const std::shared_ptr& context, const std:: GetPartitionsByTags(collection_id, partition_tags, partition_name_array); for (auto& partition_name : partition_name_array) { - status = GetFilesToSearch(partition_name, files_array); + status = meta_ptr_->FilesToSearch(partition_name, files_holder); if (!status.ok()) { - return Status(DB_ERROR, "GetFilesToSearch failed in HybridQuery"); + return Status(DB_ERROR, "get files to 
search failed in HybridQuery"); } } - if (files_array.empty()) { + if (files_holder.HoldFiles().empty()) { return Status::OK(); } } cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query - status = HybridQueryAsync(query_ctx, collection_id, files_array, hybrid_search_context, general_query, attr_type, + status = HybridQueryAsync(query_ctx, collection_id, files_holder, hybrid_search_context, general_query, attr_type, nq, result_ids, result_distances); if (!status.ok()) { return status; @@ -1534,12 +1530,11 @@ DBImpl::Query(const std::shared_ptr& context, const std::string } Status status; - meta::SegmentsSchema files_array; - + meta::FilesHolder files_holder; if (partition_tags.empty()) { // no partition tag specified, means search in whole collection // get all collection files from parent collection - status = GetFilesToSearch(collection_id, files_array); + status = meta_ptr_->FilesToSearch(collection_id, files_holder); if (!status.ok()) { return status; } @@ -1547,11 +1542,11 @@ DBImpl::Query(const std::shared_ptr& context, const std::string std::vector partition_array; status = meta_ptr_->ShowPartitions(collection_id, partition_array); for (auto& schema : partition_array) { - status = GetFilesToSearch(schema.collection_id_, files_array); + status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder); } - if (files_array.empty()) { - return Status::OK(); + if (files_holder.HoldFiles().empty()) { + return Status::OK(); // no files to search } } else { // get files from specified partitions @@ -1562,16 +1557,16 @@ DBImpl::Query(const std::shared_ptr& context, const std::string } for (auto& partition_name : partition_name_array) { - status = GetFilesToSearch(partition_name, files_array); + status = meta_ptr_->FilesToSearch(partition_name, files_holder); } - if (files_array.empty()) { - return Status::OK(); + if (files_holder.HoldFiles().empty()) { + return Status::OK(); // no files to search } } cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query - status = QueryAsync(tracer.Context(), files_array, k, extra_params, vectors, result_ids, result_distances); + status = QueryAsync(tracer.Context(), files_holder, k, extra_params, vectors, result_ids, result_distances); cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query return status; @@ -1594,19 +1589,19 @@ DBImpl::QueryByFileID(const std::shared_ptr& context, const std ids.push_back(std::stoul(id, &sz)); } - meta::SegmentsSchema search_files; - auto status = meta_ptr_->FilesByID(ids, search_files); + meta::FilesHolder files_holder; + auto status = meta_ptr_->FilesByID(ids, files_holder); if (!status.ok()) { return status; } - fiu_do_on("DBImpl.QueryByFileID.empty_files_array", search_files.clear()); + milvus::engine::meta::SegmentsSchema& search_files = files_holder.HoldFiles(); if (search_files.empty()) { return Status(DB_ERROR, "Invalid file id"); } cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query - status = QueryAsync(tracer.Context(), search_files, k, extra_params, vectors, result_ids, result_distances); + status = QueryAsync(tracer.Context(), files_holder, k, extra_params, vectors, result_ids, result_distances); cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query return status; @@ -1625,12 +1620,13 @@ DBImpl::Size(uint64_t& result) { // internal methods /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// Status 
-DBImpl::QueryAsync(const std::shared_ptr& context, const meta::SegmentsSchema& files, uint64_t k, +DBImpl::QueryAsync(const std::shared_ptr& context, meta::FilesHolder& files_holder, uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances) { milvus::server::ContextChild tracer(context, "Query Async"); server::CollectQueryMetrics metrics(vectors.vector_count_); + milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles(); if (files.size() > milvus::scheduler::TASK_TABLE_MAX_COUNT) { std::string msg = "Search files count exceed scheduler limit: " + std::to_string(milvus::scheduler::TASK_TABLE_MAX_COUNT); @@ -1641,8 +1637,6 @@ DBImpl::QueryAsync(const std::shared_ptr& context, const meta:: TimeRecorder rc(""); // step 1: construct search job - auto status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files); - LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files.size()); scheduler::SearchJobPtr job = std::make_shared(tracer.Context(), k, extra_params, vectors); for (auto& file : files) { @@ -1654,7 +1648,7 @@ DBImpl::QueryAsync(const std::shared_ptr& context, const meta:: scheduler::JobMgrInst::GetInstance()->Put(job); job->WaitResult(); - status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files); + files_holder.ReleaseFiles(); if (!job->GetStatus().ok()) { return job->GetStatus(); } @@ -1669,7 +1663,7 @@ DBImpl::QueryAsync(const std::shared_ptr& context, const meta:: Status DBImpl::HybridQueryAsync(const std::shared_ptr& context, const std::string& table_id, - const meta::SegmentsSchema& files, context::HybridSearchContextPtr hybrid_search_context, + meta::FilesHolder& files_holder, context::HybridSearchContextPtr hybrid_search_context, query::GeneralQueryPtr general_query, std::unordered_map& attr_type, uint64_t& nq, ResultIds& result_ids, ResultDistances& result_distances) { @@ -1698,11 +1692,9 @@ DBImpl::HybridQueryAsync(const std::shared_ptr& context, const TimeRecorder rc(""); // step 1: construct search job - auto status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files); - VectorsData vectors; - - LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files.size()); + milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles(); + LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files_holder.HoldFiles().size()); scheduler::SearchJobPtr job = std::make_shared(query_async_ctx, general_query, attr_type, vectors); for (auto& file : files) { @@ -1714,7 +1706,7 @@ DBImpl::HybridQueryAsync(const std::shared_ptr& context, const scheduler::JobMgrInst::GetInstance()->Put(job); job->WaitResult(); - status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files); + files_holder.ReleaseFiles(); if (!job->GetStatus().ok()) { return job->GetStatus(); } @@ -1840,7 +1832,7 @@ DBImpl::StartMergeTask() { } Status -DBImpl::MergeFiles(const std::string& collection_id, const meta::SegmentsSchema& files) { +DBImpl::MergeFiles(const std::string& collection_id, meta::FilesHolder& files_holder) { // const std::lock_guard lock(flush_merge_compact_mutex_); LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id; @@ -1868,11 +1860,16 @@ DBImpl::MergeFiles(const std::string& collection_id, const meta::SegmentsSchema& utils::GetParentPath(collection_file.location_, new_segment_dir); auto segment_writer_ptr = std::make_shared(new_segment_dir); + // attention: here is a copy, not reference, since 
files_holder.UnmarkFile will change the array internal + milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); for (auto& file : files) { server::CollectMergeFilesMetrics metrics; std::string segment_dir_to_merge; utils::GetParentPath(file.location_, segment_dir_to_merge); segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_); + + files_holder.UnmarkFile(file); + auto file_schema = file; file_schema.file_type_ = meta::SegmentSchema::TO_DELETE; updated.push_back(file_schema); @@ -1931,7 +1928,7 @@ DBImpl::MergeFiles(const std::string& collection_id, const meta::SegmentsSchema& } Status -DBImpl::MergeHybridFiles(const std::string& collection_id, const milvus::engine::meta::SegmentsSchema& files) { +DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) { // const std::lock_guard lock(flush_merge_compact_mutex_); LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id; @@ -1959,11 +1956,16 @@ DBImpl::MergeHybridFiles(const std::string& collection_id, const milvus::engine: utils::GetParentPath(table_file.location_, new_segment_dir); auto segment_writer_ptr = std::make_shared(new_segment_dir); + // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal + milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); for (auto& file : files) { server::CollectMergeFilesMetrics metrics; std::string segment_dir_to_merge; utils::GetParentPath(file.location_, segment_dir_to_merge); segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_); + + files_holder.UnmarkFile(file); + auto file_schema = file; file_schema.file_type_ = meta::SegmentSchema::TO_DELETE; updated.push_back(file_schema); @@ -2024,21 +2026,19 @@ Status DBImpl::BackgroundMergeFiles(const std::string& collection_id) { const std::lock_guard lock(flush_merge_compact_mutex_); - meta::SegmentsSchema raw_files; - auto status = meta_ptr_->FilesToMerge(collection_id, raw_files); + meta::FilesHolder files_holder; + auto status = meta_ptr_->FilesToMerge(collection_id, files_holder); if (!status.ok()) { LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id; return status; } - if (raw_files.size() < options_.merge_trigger_number_) { + if (files_holder.HoldFiles().size() < options_.merge_trigger_number_) { LOG_ENGINE_TRACE_ << "Files number not greater equal than merge trigger number, skip merge action"; return Status::OK(); } - status = OngoingFileChecker::GetInstance().MarkOngoingFiles(raw_files); - MergeFiles(collection_id, raw_files); - status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(raw_files); + MergeFiles(collection_id, files_holder); if (!initialized_.load(std::memory_order_acquire)) { LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id; @@ -2103,13 +2103,14 @@ DBImpl::StartBuildIndexTask() { void DBImpl::BackgroundBuildIndex() { std::unique_lock lock(build_index_mutex_); - meta::SegmentsSchema to_index_files; - meta_ptr_->FilesToIndex(to_index_files); + meta::FilesHolder files_holder; + meta_ptr_->FilesToIndex(files_holder); + + milvus::engine::meta::SegmentsSchema& to_index_files = files_holder.HoldFiles(); Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files); if (!to_index_files.empty()) { - LOG_ENGINE_DEBUG_ << "Background build index thread begin"; - status = OngoingFileChecker::GetInstance().MarkOngoingFiles(to_index_files); + LOG_ENGINE_DEBUG_ << "Background build index 
thread begin " << to_index_files.size() << " files"; // step 2: put build index task to scheduler std::vector> job2file_map; @@ -2136,7 +2137,8 @@ DBImpl::BackgroundBuildIndex() { index_failed_checker_.MarkSucceedIndexFile(file_schema); } - status = OngoingFileChecker::GetInstance().UnmarkOngoingFile(file_schema); + status = files_holder.UnmarkFile(file_schema); + LOG_ENGINE_DEBUG_ << "Finish build index file " << file_schema.file_id_; } LOG_ENGINE_DEBUG_ << "Background build index thread finished"; @@ -2146,39 +2148,26 @@ DBImpl::BackgroundBuildIndex() { Status DBImpl::GetFilesToBuildIndex(const std::string& collection_id, const std::vector& file_types, - meta::SegmentsSchema& files) { - files.clear(); - auto status = meta_ptr_->FilesByType(collection_id, file_types, files); + meta::FilesHolder& files_holder) { + files_holder.ReleaseFiles(); + auto status = meta_ptr_->FilesByType(collection_id, file_types, files_holder); - // only build index for files that row count greater than certain threshold - for (auto it = files.begin(); it != files.end();) { - if ((*it).file_type_ == static_cast(meta::SegmentSchema::RAW) && - (*it).row_count_ < meta::BUILD_INDEX_THRESHOLD) { - it = files.erase(it); - } else { - ++it; + // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal + milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); + for (const milvus::engine::meta::SegmentSchema& file : files) { + if (file.file_type_ == static_cast(meta::SegmentSchema::RAW) && + file.row_count_ < meta::BUILD_INDEX_THRESHOLD) { + // skip build index for files that row count less than certain threshold + files_holder.UnmarkFile(file); + } else if (index_failed_checker_.IsFailedIndexFile(file)) { + // skip build index for files that failed before + files_holder.UnmarkFile(file); } } return Status::OK(); } -Status -DBImpl::GetFilesToSearch(const std::string& collection_id, meta::SegmentsSchema& files) { - LOG_ENGINE_DEBUG_ << "Collect files from collection: " << collection_id; - - meta::SegmentsSchema search_files; - auto status = meta_ptr_->FilesToSearch(collection_id, search_files); - if (!status.ok()) { - return status; - } - - for (auto& file : search_files) { - files.push_back(file); - } - return Status::OK(); -} - Status DBImpl::GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag, std::string& partition_name) { @@ -2317,26 +2306,27 @@ DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const C } // get files to build index - meta::SegmentsSchema collection_files; - auto status = GetFilesToBuildIndex(collection_id, file_types, collection_files); - int times = 1; + { + meta::FilesHolder files_holder; + auto status = GetFilesToBuildIndex(collection_id, file_types, files_holder); + int times = 1; - while (!collection_files.empty()) { - LOG_ENGINE_DEBUG_ << "Non index files detected! Will build index " << times; - if (!utils::IsRawIndexType(index.engine_type_)) { - status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id); + while (!files_holder.HoldFiles().empty()) { + LOG_ENGINE_DEBUG_ << files_holder.HoldFiles().size() << " non-index files detected! 
Will build index " + << times; + if (!utils::IsRawIndexType(index.engine_type_)) { + status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id); + } + + index_req_swn_.Wait_For(std::chrono::seconds(WAIT_BUILD_INDEX_INTERVAL)); + GetFilesToBuildIndex(collection_id, file_types, files_holder); + ++times; } - - index_req_swn_.Wait_For(std::chrono::seconds(WAIT_BUILD_INDEX_INTERVAL)); - GetFilesToBuildIndex(collection_id, file_types, collection_files); - ++times; - - index_failed_checker_.IgnoreFailedIndexFiles(collection_files); } // build index for partition std::vector partition_array; - status = meta_ptr_->ShowPartitions(collection_id, partition_array); + auto status = meta_ptr_->ShowPartitions(collection_id, partition_array); for (auto& schema : partition_array) { status = WaitCollectionIndexRecursively(schema.collection_id_, index); fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition", @@ -2354,6 +2344,8 @@ DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const C return Status(DB_ERROR, err_msg); } + LOG_ENGINE_DEBUG_ << "WaitCollectionIndexRecursively finished"; + return Status::OK(); } @@ -2673,6 +2665,7 @@ DBImpl::BackgroundMetricThread() { swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL)); StartMetricTask(); + meta::FilesHolder::PrintInfo(); } } diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index aa71e04df4..fddd7b72c5 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -27,9 +27,9 @@ #include "config/handler/EngineConfigHandler.h" #include "db/DB.h" #include "db/IndexFailedChecker.h" -#include "db/OngoingFileChecker.h" #include "db/Types.h" #include "db/insert/MemManager.h" +#include "db/meta/FilesHolder.h" #include "utils/ThreadPool.h" #include "wal/WalManager.h" @@ -183,20 +183,20 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi private: Status - QueryAsync(const std::shared_ptr& context, const meta::SegmentsSchema& files, uint64_t k, + QueryAsync(const std::shared_ptr& context, meta::FilesHolder& files_holder, uint64_t k, const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids, ResultDistances& result_distances); Status HybridQueryAsync(const std::shared_ptr& context, const std::string& table_id, - const meta::SegmentsSchema& files, context::HybridSearchContextPtr hybrid_search_context, + meta::FilesHolder& files_holder, context::HybridSearchContextPtr hybrid_search_context, query::GeneralQueryPtr general_query, std::unordered_map& attr_type, uint64_t& nq, ResultIds& result_ids, ResultDistances& result_distances); Status GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array, - std::vector& vectors, const meta::SegmentsSchema& files); + std::vector& vectors, meta::FilesHolder& files_holder); void InternalFlush(const std::string& collection_id = ""); @@ -226,7 +226,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi StartMergeTask(); Status - MergeFiles(const std::string& collection_id, const meta::SegmentsSchema& files); + MergeFiles(const std::string& collection_id, meta::FilesHolder& files_holder); Status BackgroundMergeFiles(const std::string& collection_id); @@ -235,7 +235,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi BackgroundMerge(std::set collection_ids); Status - MergeHybridFiles(const std::string& table_id, const meta::SegmentsSchema& files); + MergeHybridFiles(const std::string& table_id, 
meta::FilesHolder& files_holder); void StartBuildIndexTask(); @@ -254,10 +254,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi Status GetFilesToBuildIndex(const std::string& collection_id, const std::vector& file_types, - meta::SegmentsSchema& files); - - Status - GetFilesToSearch(const std::string& collection_id, meta::SegmentsSchema& files); + meta::FilesHolder& files_holder); Status GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag, std::string& partition_name); diff --git a/core/src/db/IndexFailedChecker.cpp b/core/src/db/IndexFailedChecker.cpp index b18aa7f050..016e6399c1 100644 --- a/core/src/db/IndexFailedChecker.cpp +++ b/core/src/db/IndexFailedChecker.cpp @@ -74,6 +74,23 @@ IndexFailedChecker::MarkSucceedIndexFile(const meta::SegmentSchema& file) { return Status::OK(); } +bool +IndexFailedChecker::IsFailedIndexFile(const meta::SegmentSchema& file) { + std::lock_guard lck(mutex_); + + auto it_failed_files = index_failed_files_.find(file.collection_id_); + if (it_failed_files != index_failed_files_.end()) { + auto it_failed_file = it_failed_files->second.find(file.file_id_); + if (it_failed_file != it_failed_files->second.end()) { + if (it_failed_file->second.size() >= INDEX_FAILED_RETRY_TIME) { + return true; + } + } + } + + return false; +} + Status IndexFailedChecker::IgnoreFailedIndexFiles(meta::SegmentsSchema& table_files) { std::lock_guard lck(mutex_); diff --git a/core/src/db/IndexFailedChecker.h b/core/src/db/IndexFailedChecker.h index 4d98516e77..87e8a12085 100644 --- a/core/src/db/IndexFailedChecker.h +++ b/core/src/db/IndexFailedChecker.h @@ -36,6 +36,9 @@ class IndexFailedChecker { Status MarkSucceedIndexFile(const meta::SegmentSchema& file); + bool + IsFailedIndexFile(const meta::SegmentSchema& file); + Status IgnoreFailedIndexFiles(meta::SegmentsSchema& table_files); diff --git a/core/src/db/OngoingFileChecker.cpp b/core/src/db/OngoingFileChecker.cpp deleted file mode 100644 index def2ad9d95..0000000000 --- a/core/src/db/OngoingFileChecker.cpp +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. 
- -#include "db/OngoingFileChecker.h" -#include "utils/Log.h" - -#include - -namespace milvus { -namespace engine { - -OngoingFileChecker& -OngoingFileChecker::GetInstance() { - static OngoingFileChecker instance; - return instance; -} - -Status -OngoingFileChecker::MarkOngoingFile(const meta::SegmentSchema& table_file) { - std::lock_guard lck(mutex_); - return MarkOngoingFileNoLock(table_file); -} - -Status -OngoingFileChecker::MarkOngoingFiles(const meta::SegmentsSchema& table_files) { - std::lock_guard lck(mutex_); - - for (auto& table_file : table_files) { - MarkOngoingFileNoLock(table_file); - } - - return Status::OK(); -} - -Status -OngoingFileChecker::UnmarkOngoingFile(const meta::SegmentSchema& table_file) { - std::lock_guard lck(mutex_); - return UnmarkOngoingFileNoLock(table_file); -} - -Status -OngoingFileChecker::UnmarkOngoingFiles(const meta::SegmentsSchema& table_files) { - std::lock_guard lck(mutex_); - - for (auto& table_file : table_files) { - UnmarkOngoingFileNoLock(table_file); - } - - return Status::OK(); -} - -bool -OngoingFileChecker::IsIgnored(const meta::SegmentSchema& schema) { - std::lock_guard lck(mutex_); - - auto iter = ongoing_files_.find(schema.collection_id_); - if (iter == ongoing_files_.end()) { - return false; - } else { - auto it_file = iter->second.find(schema.file_id_); - if (it_file == iter->second.end()) { - return false; - } else { - return (it_file->second > 0); - } - } -} - -Status -OngoingFileChecker::MarkOngoingFileNoLock(const meta::SegmentSchema& table_file) { - if (table_file.collection_id_.empty() || table_file.file_id_.empty()) { - return Status(DB_ERROR, "Invalid collection files"); - } - - auto iter = ongoing_files_.find(table_file.collection_id_); - if (iter == ongoing_files_.end()) { - File2RefCount files_refcount; - files_refcount.insert(std::make_pair(table_file.file_id_, 1)); - ongoing_files_.insert(std::make_pair(table_file.collection_id_, files_refcount)); - } else { - auto it_file = iter->second.find(table_file.file_id_); - if (it_file == iter->second.end()) { - iter->second[table_file.file_id_] = 1; - } else { - it_file->second++; - } - } - - LOG_ENGINE_DEBUG_ << "Mark ongoing file:" << table_file.file_id_ - << " refcount:" << ongoing_files_[table_file.collection_id_][table_file.file_id_]; - - return Status::OK(); -} - -Status -OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::SegmentSchema& table_file) { - if (table_file.collection_id_.empty() || table_file.file_id_.empty()) { - return Status(DB_ERROR, "Invalid collection files"); - } - - auto iter = ongoing_files_.find(table_file.collection_id_); - if (iter != ongoing_files_.end()) { - auto it_file = iter->second.find(table_file.file_id_); - if (it_file != iter->second.end()) { - it_file->second--; - - LOG_ENGINE_DEBUG_ << "Unmark ongoing file:" << table_file.file_id_ << " refcount:" << it_file->second; - - if (it_file->second <= 0) { - iter->second.erase(table_file.file_id_); - if (iter->second.empty()) { - ongoing_files_.erase(table_file.collection_id_); - } - } - } - } - - return Status::OK(); -} - -} // namespace engine -} // namespace milvus diff --git a/core/src/db/OngoingFileChecker.h b/core/src/db/OngoingFileChecker.h deleted file mode 100644 index 48832c17b8..0000000000 --- a/core/src/db/OngoingFileChecker.h +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include "db/Types.h" -#include "meta/Meta.h" -#include "utils/Status.h" - -#include -#include -#include -#include - -namespace milvus { -namespace engine { - -class OngoingFileChecker { - public: - static OngoingFileChecker& - GetInstance(); - - Status - MarkOngoingFile(const meta::SegmentSchema& table_file); - - Status - MarkOngoingFiles(const meta::SegmentsSchema& table_files); - - Status - UnmarkOngoingFile(const meta::SegmentSchema& table_file); - - Status - UnmarkOngoingFiles(const meta::SegmentsSchema& table_files); - - bool - IsIgnored(const meta::SegmentSchema& schema); - - private: - Status - MarkOngoingFileNoLock(const meta::SegmentSchema& table_file); - - Status - UnmarkOngoingFileNoLock(const meta::SegmentSchema& table_file); - - private: - std::mutex mutex_; - Table2FileRef ongoing_files_; // collection id mapping to (file id mapping to ongoing ref-count) -}; - -} // namespace engine -} // namespace milvus diff --git a/core/src/db/Types.h b/core/src/db/Types.h index 95340e164f..6e212f6bb5 100644 --- a/core/src/db/Types.h +++ b/core/src/db/Types.h @@ -58,8 +58,6 @@ struct Entity { using File2ErrArray = std::map<std::string, std::vector<std::string>>; using Table2FileErr = std::map<std::string, File2ErrArray>; -using File2RefCount = std::map<std::string, int64_t>; -using Table2FileRef = std::map<std::string, File2RefCount>; static const char* DEFAULT_PARTITON_TAG = "_default"; diff --git a/core/src/db/insert/MemTable.cpp b/core/src/db/insert/MemTable.cpp index d1d498e8d1..fea0b6fcda 100644 --- a/core/src/db/insert/MemTable.cpp +++ b/core/src/db/insert/MemTable.cpp @@ -16,9 +16,9 @@ #include #include "cache/CpuCacheMgr.h" -#include "db/OngoingFileChecker.h" #include "db/Utils.h" #include "db/insert/MemTable.h" +#include "db/meta/FilesHolder.h" #include "knowhere/index/vector_index/VecIndex.h" #include "segment/SegmentReader.h" #include "utils/Log.h" @@ -206,22 +206,22 @@ MemTable::ApplyDeletes() { std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX, meta::SegmentSchema::FILE_TYPE::BACKUP}; - meta::SegmentsSchema table_files; - auto status = meta_->FilesByType(collection_id_, file_types, table_files); + meta::FilesHolder files_holder; + auto status = meta_->FilesByType(collection_id_, file_types, files_holder); if (!status.ok()) { std::string err_msg = "Failed to apply deletes: " + status.ToString(); LOG_ENGINE_ERROR_ << err_msg; return Status(DB_ERROR, err_msg); } - OngoingFileChecker::GetInstance().MarkOngoingFiles(table_files); + // attention: this is a copy, not a reference, since files_holder.UnmarkFile modifies the underlying array + milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); - std::unordered_map<size_t, std::vector<segment::doc_id_t>> ids_to_check_map; - - for (size_t i = 0; i < table_files.size(); ++i) { - auto& table_file = table_files[i]; + // determine which files the deletes apply to + std::unordered_map<size_t, std::vector<segment::doc_id_t>> ids_to_check_map; // maps file id to delete ids + for (auto& file : files) { std::string segment_dir; - utils::GetParentPath(table_file.location_, segment_dir); + utils::GetParentPath(file.location_, segment_dir); segment::SegmentReader segment_reader(segment_dir); segment::IdBloomFilterPtr id_bloom_filter_ptr; @@ -229,37 
+229,35 @@ MemTable::ApplyDeletes() { for (auto& id : doc_ids_to_delete_) { if (id_bloom_filter_ptr->Check(id)) { - ids_to_check_map[i].emplace_back(id); + ids_to_check_map[file.id_].emplace_back(id); } } } - meta::SegmentsSchema files_to_check; - for (auto& kv : ids_to_check_map) { - files_to_check.emplace_back(table_files[kv.first]); + // release unused files + for (auto& file : files) { + if (ids_to_check_map.find(file.id_) == ids_to_check_map.end()) { + files_holder.UnmarkFile(file); + } } - OngoingFileChecker::GetInstance().UnmarkOngoingFiles(table_files); + // attention: this is a copy, not a reference, since files_holder.UnmarkFile modifies the underlying array + milvus::engine::meta::SegmentsSchema hold_files = files_holder.HoldFiles(); + recorder.RecordSection("Found " + std::to_string(hold_files.size()) + " segments to apply deletes"); - OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_check); + meta::SegmentsSchema files_to_update; + for (auto& file : hold_files) { + LOG_ENGINE_DEBUG_ << "Applying deletes in segment: " << file.segment_id_; - recorder.RecordSection("Found " + std::to_string(ids_to_check_map.size()) + " segment to apply deletes"); - - meta::SegmentsSchema table_files_to_update; - - for (auto& kv : ids_to_check_map) { - auto& table_file = table_files[kv.first]; - LOG_ENGINE_DEBUG_ << "Applying deletes in segment: " << table_file.segment_id_; - - TimeRecorder rec("handle segment " + table_file.segment_id_); + TimeRecorder rec("handle segment " + file.segment_id_); std::string segment_dir; - utils::GetParentPath(table_file.location_, segment_dir); + utils::GetParentPath(file.location_, segment_dir); segment::SegmentReader segment_reader(segment_dir); - auto& segment_id = table_file.segment_id_; - meta::SegmentsSchema segment_files; - status = meta_->GetCollectionFilesBySegmentId(segment_id, segment_files); + auto& segment_id = file.segment_id_; + meta::FilesHolder segment_holder; + status = meta_->GetCollectionFilesBySegmentId(segment_id, segment_holder); if (!status.ok()) { break; } @@ -267,8 +265,9 @@ MemTable::ApplyDeletes() { // Get all indexes in cache that contain a blacklist std::vector<knowhere::VecIndexPtr> indexes; std::vector<faiss::ConcurrentBitsetPtr> blacklists; - for (auto& file : segment_files) { - auto data_obj_ptr = cache::CpuCacheMgr::GetInstance()->GetIndex(file.location_); + milvus::engine::meta::SegmentsSchema& segment_files = segment_holder.HoldFiles(); + for (auto& segment_file : segment_files) { + auto data_obj_ptr = cache::CpuCacheMgr::GetInstance()->GetIndex(segment_file.location_); auto index = std::static_pointer_cast<knowhere::VecIndex>(data_obj_ptr); if (index != nullptr) { faiss::ConcurrentBitsetPtr blacklist = index->GetBlacklist(); @@ -290,7 +289,7 @@ MemTable::ApplyDeletes() { break; } - auto& ids_to_check = kv.second; + auto& ids_to_check = ids_to_check_map[file.id_]; segment::DeletedDocsPtr deleted_docs = std::make_shared<segment::DeletedDocs>(); @@ -361,11 +360,13 @@ MemTable::ApplyDeletes() { rec.RecordSection("Updated bloom filter"); // Update collection file row count - for (auto& file : segment_files) { - if (file.file_type_ == meta::SegmentSchema::RAW || file.file_type_ == meta::SegmentSchema::TO_INDEX || - file.file_type_ == meta::SegmentSchema::INDEX || file.file_type_ == meta::SegmentSchema::BACKUP) { - file.row_count_ -= delete_count; - table_files_to_update.emplace_back(file); + for (auto& segment_file : segment_files) { + if (segment_file.file_type_ == meta::SegmentSchema::RAW || + segment_file.file_type_ == meta::SegmentSchema::TO_INDEX || + segment_file.file_type_ == meta::SegmentSchema::INDEX ||
segment_file.file_type_ == meta::SegmentSchema::BACKUP) { + segment_file.row_count_ -= delete_count; + files_to_update.emplace_back(segment_file); } } rec.RecordSection("Update collection file row count in vector"); @@ -373,7 +374,7 @@ MemTable::ApplyDeletes() { recorder.RecordSection("Finished " + std::to_string(ids_to_check_map.size()) + " segment to apply deletes"); - status = meta_->UpdateCollectionFilesRowCount(table_files_to_update); + status = meta_->UpdateCollectionFilesRowCount(files_to_update); if (!status.ok()) { std::string err_msg = "Failed to apply deletes: " + status.ToString(); @@ -386,8 +387,6 @@ MemTable::ApplyDeletes() { recorder.RecordSection("Update deletes to meta"); recorder.ElapseFromBegin("Finished deletes"); - OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_check); - return Status::OK(); } diff --git a/core/src/db/meta/FilesHolder.cpp b/core/src/db/meta/FilesHolder.cpp new file mode 100644 index 0000000000..80b0f764bc --- /dev/null +++ b/core/src/db/meta/FilesHolder.cpp @@ -0,0 +1,237 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/meta/FilesHolder.h" +#include "utils/Log.h" + +#include + +namespace milvus { +namespace engine { +namespace meta { + +////////////////////////////////////////////////////////////////////////////////////////////////////////// +FilesHolder::OngoingFileChecker& +FilesHolder::OngoingFileChecker::GetInstance() { + static OngoingFileChecker instance; + return instance; +} + +Status +FilesHolder::OngoingFileChecker::MarkOngoingFile(const meta::SegmentSchema& table_file) { + std::lock_guard lck(mutex_); + return MarkOngoingFileNoLock(table_file); +} + +Status +FilesHolder::OngoingFileChecker::MarkOngoingFiles(const meta::SegmentsSchema& table_files) { + std::lock_guard lck(mutex_); + + for (auto& table_file : table_files) { + MarkOngoingFileNoLock(table_file); + } + + return Status::OK(); +} + +Status +FilesHolder::OngoingFileChecker::UnmarkOngoingFile(const meta::SegmentSchema& table_file) { + std::lock_guard lck(mutex_); + return UnmarkOngoingFileNoLock(table_file); +} + +Status +FilesHolder::OngoingFileChecker::UnmarkOngoingFiles(const meta::SegmentsSchema& table_files) { + std::lock_guard lck(mutex_); + + for (auto& table_file : table_files) { + UnmarkOngoingFileNoLock(table_file); + } + + return Status::OK(); +} + +bool +FilesHolder::OngoingFileChecker::CanBeDeleted(const meta::SegmentSchema& schema) { + std::lock_guard lck(mutex_); + + auto iter = ongoing_files_.find(schema.collection_id_); + if (iter == ongoing_files_.end()) { + return true; + } else { + auto it_file = iter->second.find(schema.id_); + if (it_file == iter->second.end()) { + return true; + } else { + return (it_file->second > 0) ? 
false : true; + } + } +} + +void +FilesHolder::OngoingFileChecker::PrintInfo() { + std::lock_guard lck(mutex_); + if (!ongoing_files_.empty()) { + LOG_ENGINE_DEBUG_ << "File reference information:"; + for (meta::Table2FileRef::iterator iter = ongoing_files_.begin(); iter != ongoing_files_.end(); ++iter) { + LOG_ENGINE_DEBUG_ << "\t" << iter->first << ": " << iter->second.size() << " files in use"; + } + } +} + +Status +FilesHolder::OngoingFileChecker::MarkOngoingFileNoLock(const meta::SegmentSchema& table_file) { + if (table_file.collection_id_.empty() || table_file.file_id_.empty()) { + return Status(DB_ERROR, "Invalid collection files"); + } + + auto iter = ongoing_files_.find(table_file.collection_id_); + if (iter == ongoing_files_.end()) { + File2RefCount files_refcount; + files_refcount.insert(std::make_pair(table_file.id_, 1)); + ongoing_files_.insert(std::make_pair(table_file.collection_id_, files_refcount)); + } else { + auto it_file = iter->second.find(table_file.id_); + if (it_file == iter->second.end()) { + iter->second[table_file.id_] = 1; + } else { + it_file->second++; + } + } + + LOG_ENGINE_DEBUG_ << "Mark ongoing file:" << table_file.file_id_ + << " refcount:" << ongoing_files_[table_file.collection_id_][table_file.id_]; + + return Status::OK(); +} + +Status +FilesHolder::OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::SegmentSchema& table_file) { + if (table_file.collection_id_.empty() || table_file.file_id_.empty()) { + return Status(DB_ERROR, "Invalid collection files"); + } + + auto iter = ongoing_files_.find(table_file.collection_id_); + if (iter != ongoing_files_.end()) { + auto it_file = iter->second.find(table_file.id_); + if (it_file != iter->second.end()) { + it_file->second--; + + LOG_ENGINE_DEBUG_ << "Unmark ongoing file:" << table_file.file_id_ << " refcount:" << it_file->second; + + if (it_file->second <= 0) { + iter->second.erase(table_file.id_); + if (iter->second.empty()) { + ongoing_files_.erase(table_file.collection_id_); + } + } + } + } + + return Status::OK(); +} + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +FilesHolder::FilesHolder() { +} + +FilesHolder::~FilesHolder() { + ReleaseFiles(); +} + +Status +FilesHolder::MarkFile(const meta::SegmentSchema& file) { + std::lock_guard lck(mutex_); + return MarkFileInternal(file); +} + +Status +FilesHolder::MarkFiles(const meta::SegmentsSchema& files) { + std::lock_guard lck(mutex_); + for (auto& file : files) { + MarkFileInternal(file); + } + + return Status::OK(); +} + +Status +FilesHolder::UnmarkFile(const meta::SegmentSchema& file) { + std::lock_guard lck(mutex_); + return UnmarkFileInternal(file); +} + +Status +FilesHolder::UnmarkFiles(const meta::SegmentsSchema& files) { + std::lock_guard lck(mutex_); + for (auto& file : files) { + UnmarkFileInternal(file); + } + + return Status::OK(); +} + +void +FilesHolder::ReleaseFiles() { + std::lock_guard lck(mutex_); + OngoingFileChecker::GetInstance().UnmarkOngoingFiles(hold_files_); + hold_files_.clear(); + unique_ids_.clear(); +} + +bool +FilesHolder::CanBeDeleted(const meta::SegmentSchema& file) { + return OngoingFileChecker::GetInstance().CanBeDeleted(file); +} + +void +FilesHolder::PrintInfo() { + return OngoingFileChecker::GetInstance().PrintInfo(); +} + +Status +FilesHolder::MarkFileInternal(const meta::SegmentSchema& file) { + if (unique_ids_.find(file.id_) != unique_ids_.end()) { + return Status::OK(); // already marked + } + + auto status = 
OngoingFileChecker::GetInstance().MarkOngoingFile(file); + if (status.ok()) { + unique_ids_.insert(file.id_); + hold_files_.push_back(file); + } + + return status; +} + +Status +FilesHolder::UnmarkFileInternal(const meta::SegmentSchema& file) { + if (unique_ids_.find(file.id_) == unique_ids_.end()) { + return Status::OK(); // no such file + } + + auto status = OngoingFileChecker::GetInstance().UnmarkOngoingFile(file); + if (status.ok()) { + for (auto iter = hold_files_.begin(); iter != hold_files_.end(); ++iter) { + if (file.id_ == (*iter).id_) { + hold_files_.erase(iter); + break; + } + } + + unique_ids_.erase(file.id_); + } + return status; +} + +} // namespace meta +} // namespace engine +} // namespace milvus diff --git a/core/src/db/meta/FilesHolder.h b/core/src/db/meta/FilesHolder.h new file mode 100644 index 0000000000..1c9afbbb16 --- /dev/null +++ b/core/src/db/meta/FilesHolder.h @@ -0,0 +1,113 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include "db/meta/Meta.h" +#include "utils/Status.h" + +#include +#include +#include +#include + +namespace milvus { +namespace engine { +namespace meta { + +class FilesHolder { + public: + FilesHolder(); + virtual ~FilesHolder(); + + Status + MarkFile(const meta::SegmentSchema& file); + + Status + MarkFiles(const meta::SegmentsSchema& files); + + Status + UnmarkFile(const meta::SegmentSchema& file); + + Status + UnmarkFiles(const meta::SegmentsSchema& files); + + const milvus::engine::meta::SegmentsSchema& + HoldFiles() const { + return hold_files_; + } + + milvus::engine::meta::SegmentsSchema& + HoldFiles() { + return hold_files_; + } + + void + ReleaseFiles(); + + static bool + CanBeDeleted(const meta::SegmentSchema& file); + + static void + PrintInfo(); + + private: + class OngoingFileChecker { + public: + static OngoingFileChecker& + GetInstance(); + + Status + MarkOngoingFile(const meta::SegmentSchema& file); + + Status + MarkOngoingFiles(const meta::SegmentsSchema& files); + + Status + UnmarkOngoingFile(const meta::SegmentSchema& file); + + Status + UnmarkOngoingFiles(const meta::SegmentsSchema& files); + + bool + CanBeDeleted(const meta::SegmentSchema& file); + + void + PrintInfo(); + + private: + Status + MarkOngoingFileNoLock(const meta::SegmentSchema& file); + + Status + UnmarkOngoingFileNoLock(const meta::SegmentSchema& file); + + private: + std::mutex mutex_; + meta::Table2FileRef ongoing_files_; // collection id mapping to (file id mapping to ongoing ref-count) + }; + + private: + Status + MarkFileInternal(const meta::SegmentSchema& file); + + Status + UnmarkFileInternal(const meta::SegmentSchema& file); + + private: + std::mutex mutex_; + milvus::engine::meta::SegmentsSchema hold_files_; + std::set unique_ids_; +}; + +} // namespace meta +} // namespace engine +} // namespace milvus diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index 9419502f3d..1bea3af863 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -19,6 +19,7 @@ 
#include "MetaTypes.h" #include "db/Options.h" #include "db/Types.h" +#include "db/meta/FilesHolder.h" #include "utils/Status.h" namespace milvus { @@ -32,6 +33,8 @@ static const char* META_COLLECTIONS = "Collections"; static const char* META_FIELDS = "Fields"; static const char* META_COLLECTIONFILES = "CollectionFiles"; +class FilesHolder; + class Meta { /* public: @@ -76,11 +79,10 @@ class Meta { CreateCollectionFile(SegmentSchema& file_schema) = 0; virtual Status - GetCollectionFiles(const std::string& collection_id, const std::vector& ids, - SegmentsSchema& table_files) = 0; + GetCollectionFiles(const std::string& collection_id, const std::vector& ids, FilesHolder& files_holder) = 0; virtual Status - GetCollectionFilesBySegmentId(const std::string& segment_id, SegmentsSchema& table_files) = 0; + GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) = 0; virtual Status UpdateCollectionFile(SegmentSchema& file_schema) = 0; @@ -117,19 +119,19 @@ class Meta { GetPartitionName(const std::string& collection_name, const std::string& tag, std::string& partition_name) = 0; virtual Status - FilesToSearch(const std::string& collection_id, SegmentsSchema& files) = 0; + FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) = 0; virtual Status - FilesToMerge(const std::string& collection_id, SegmentsSchema& files) = 0; + FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) = 0; virtual Status - FilesToIndex(SegmentsSchema&) = 0; + FilesToIndex(FilesHolder& files_holder) = 0; virtual Status - FilesByType(const std::string& collection_id, const std::vector& file_types, SegmentsSchema& files) = 0; + FilesByType(const std::string& collection_id, const std::vector& file_types, FilesHolder& files_holder) = 0; virtual Status - FilesByID(const std::vector& ids, SegmentsSchema& files) = 0; + FilesByID(const std::vector& ids, FilesHolder& files_holder) = 0; virtual Status Size(uint64_t& result) = 0; diff --git a/core/src/db/meta/MetaTypes.h b/core/src/db/meta/MetaTypes.h index d4a6a801e4..5d555ec9a3 100644 --- a/core/src/db/meta/MetaTypes.h +++ b/core/src/db/meta/MetaTypes.h @@ -97,6 +97,9 @@ struct SegmentSchema { using SegmentSchemaPtr = std::shared_ptr; using SegmentsSchema = std::vector; +using File2RefCount = std::map; +using Table2FileRef = std::map; + namespace hybrid { enum class DataType { diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index 8d00eb6e2f..ecb3d35de3 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -31,7 +31,6 @@ #include "MetaConsts.h" #include "db/IDGenerator.h" -#include "db/OngoingFileChecker.h" #include "db/Utils.h" #include "metrics/Metrics.h" #include "utils/CommonUtil.h" @@ -768,7 +767,7 @@ MySQLMetaImpl::CreateCollectionFile(SegmentSchema& file_schema) { Status MySQLMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::vector& ids, - SegmentsSchema& collection_files) { + FilesHolder& files_holder) { if (ids.empty()) { return Status::OK(); } @@ -830,10 +829,10 @@ MySQLMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::v file_schema.dimension_ = collection_schema.dimension_; utils::GetCollectionFilePath(options_, file_schema); - collection_files.emplace_back(file_schema); + files_holder.MarkFile(file_schema); } - LOG_ENGINE_DEBUG_ << "Get collection files by id"; + LOG_ENGINE_DEBUG_ << "Get " << res.size() << " files by id from collection " << collection_id; return ret; } catch 
(std::exception& e) { return HandleException("Failed to get collection files", e.what()); @@ -841,8 +840,7 @@ MySQLMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::v } Status -MySQLMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, - milvus::engine::meta::SegmentsSchema& collection_files) { +MySQLMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) { try { mysqlpp::StoreQueryResult res; { @@ -892,11 +890,11 @@ MySQLMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, file_schema.dimension_ = collection_schema.dimension_; utils::GetCollectionFilePath(options_, file_schema); - collection_files.emplace_back(file_schema); + files_holder.MarkFile(file_schema); } } - LOG_ENGINE_DEBUG_ << "Get collection files by segment id"; + LOG_ENGINE_DEBUG_ << "Get " << res.size() << " files by segment id " << segment_id; return Status::OK(); } catch (std::exception& e) { return HandleException("Failed to get collection files by segment id", e.what()); @@ -1547,9 +1545,7 @@ MySQLMetaImpl::GetPartitionName(const std::string& collection_id, const std::str } Status -MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& files) { - files.clear(); - +MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) { try { server::MetricCollector metric; mysqlpp::StoreQueryResult res; @@ -1589,6 +1585,7 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& f } Status ret; + int64_t files_count = 0; for (auto& resRow : res) { SegmentSchema collection_file; collection_file.id_ = resRow["id"]; // implicit conversion @@ -1608,13 +1605,17 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& f auto status = utils::GetCollectionFilePath(options_, collection_file); if (!status.ok()) { ret = status; + continue; } - files.emplace_back(collection_file); + files_holder.MarkFile(collection_file); + files_count++; } - if (res.size() > 0) { - LOG_ENGINE_DEBUG_ << "Collect " << res.size() << " to-search files"; + if (files_count == 0) { + LOG_ENGINE_DEBUG_ << "No file to search for collection: " << collection_id; + } else { + LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-search files in collection " << collection_id; } return ret; } catch (std::exception& e) { @@ -1623,9 +1624,7 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& f } Status -MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& files) { - files.clear(); - +MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) { try { server::MetricCollector metric; @@ -1663,7 +1662,7 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& fi } // Scoped Connection Status ret; - int64_t to_merge_files = 0; + int64_t files_count = 0; for (auto& resRow : res) { SegmentSchema collection_file; collection_file.file_size_ = resRow["file_size"]; @@ -1688,14 +1687,15 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& fi auto status = utils::GetCollectionFilePath(options_, collection_file); if (!status.ok()) { ret = status; + continue; } - files.emplace_back(collection_file); - ++to_merge_files; + files_holder.MarkFile(collection_file); + files_count++; } - if (to_merge_files > 0) { - LOG_ENGINE_TRACE_ << "Collect " << to_merge_files << " to-merge files"; + if (files_count > 0) { + LOG_ENGINE_DEBUG_ << "Collect " << files_count 
<< " to-merge files in collection " << collection_id; } return ret; } catch (std::exception& e) { @@ -1704,9 +1704,7 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& fi } Status -MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) { - files.clear(); - +MySQLMetaImpl::FilesToIndex(FilesHolder& files_holder) { try { server::MetricCollector metric; mysqlpp::StoreQueryResult res; @@ -1735,9 +1733,10 @@ MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) { } // Scoped Connection Status ret; + int64_t files_count = 0; std::map groups; - SegmentSchema collection_file; for (auto& resRow : res) { + SegmentSchema collection_file; collection_file.id_ = resRow["id"]; // implicit conversion resRow["table_id"].to_string(collection_file.collection_id_); resRow["segment_id"].to_string(collection_file.segment_id_); @@ -1769,11 +1768,12 @@ MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) { ret = status; } - files.push_back(collection_file); + files_holder.MarkFile(collection_file); + files_count++; } - if (res.size() > 0) { - LOG_ENGINE_DEBUG_ << "Collect " << res.size() << " to-index files"; + if (files_count > 0) { + LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-index files"; } return ret; } catch (std::exception& e) { @@ -1783,16 +1783,13 @@ MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) { Status MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector& file_types, - SegmentsSchema& files) { + FilesHolder& files_holder) { if (file_types.empty()) { return Status(DB_ERROR, "file types array is empty"); } Status ret = Status::OK(); - try { - files.clear(); - mysqlpp::StoreQueryResult res; { mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); @@ -1860,7 +1857,7 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector& ids, SegmentsSchema& files) { - files.clear(); - +MySQLMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_holder) { if (ids.empty()) { return Status::OK(); } @@ -1977,6 +1972,7 @@ MySQLMetaImpl::FilesByID(const std::vector& ids, SegmentsSchema& files) std::map collections; Status ret; + int64_t files_count = 0; for (auto& resRow : res) { SegmentSchema collection_file; collection_file.id_ = resRow["id"]; // implicit conversion @@ -2002,11 +1998,14 @@ MySQLMetaImpl::FilesByID(const std::vector& ids, SegmentsSchema& files) auto status = utils::GetCollectionFilePath(options_, collection_file); if (!status.ok()) { ret = status; + continue; } - files.emplace_back(collection_file); + files_holder.MarkFile(collection_file); + files_count++; } + milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles(); for (auto& collection_file : files) { CollectionSchema& collection_schema = collections[collection_file.collection_id_]; collection_file.dimension_ = collection_schema.dimension_; @@ -2015,10 +2014,10 @@ MySQLMetaImpl::FilesByID(const std::vector& ids, SegmentsSchema& files) collection_file.metric_type_ = collection_schema.metric_type_; } - if (files.empty()) { + if (files_count == 0) { LOG_ENGINE_ERROR_ << "No file to search in file id list"; } else { - LOG_ENGINE_DEBUG_ << "Collect " << files.size() << " files by id"; + LOG_ENGINE_DEBUG_ << "Collect " << files_count << " files by id"; } return ret; @@ -2221,7 +2220,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) collection_file.file_type_ = resRow["file_type"]; // check if the file can be deleted - if (OngoingFileChecker::GetInstance().IsIgnored(collection_file)) { + if 
(!FilesHolder::CanBeDeleted(collection_file)) { LOG_ENGINE_DEBUG_ << "File:" << collection_file.file_id_ << " currently is in use, not able to delete now"; continue; // ignore this file, don't delete it diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index 6f26425b66..a6412c97dd 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -54,10 +54,10 @@ class MySQLMetaImpl : public Meta { Status GetCollectionFiles(const std::string& collection_id, const std::vector& ids, - SegmentsSchema& collection_files) override; + FilesHolder& files_holder) override; Status - GetCollectionFilesBySegmentId(const std::string& segment_id, SegmentsSchema& collection_files) override; + GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) override; Status UpdateCollectionIndex(const std::string& collection_id, const CollectionIndex& index) override; @@ -104,19 +104,20 @@ class MySQLMetaImpl : public Meta { GetPartitionName(const std::string& collection_id, const std::string& tag, std::string& partition_name) override; Status - FilesToSearch(const std::string& collection_id, SegmentsSchema& files) override; + FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override; Status - FilesToMerge(const std::string& collection_id, SegmentsSchema& files) override; + FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override; Status - FilesToIndex(SegmentsSchema&) override; + FilesToIndex(FilesHolder& files_holder) override; Status - FilesByType(const std::string& collection_id, const std::vector& file_types, SegmentsSchema& files) override; + FilesByType(const std::string& collection_id, const std::vector& file_types, + FilesHolder& files_holder) override; Status - FilesByID(const std::vector& ids, SegmentsSchema& collection_files) override; + FilesByID(const std::vector& ids, FilesHolder& files_holder) override; Status Archive() override; diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 30569d0f6a..c6ab5d2e10 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -26,7 +26,6 @@ #include "MetaConsts.h" #include "db/IDGenerator.h" -#include "db/OngoingFileChecker.h" #include "db/Utils.h" #include "metrics/Metrics.h" #include "utils/CommonUtil.h" @@ -65,9 +64,11 @@ StoragePrototype(const std::string& path) { return make_storage( path, make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))), - make_table(META_TABLES, make_column("id", &CollectionSchema::id_, primary_key()), + make_table(META_TABLES, + make_column("id", &CollectionSchema::id_, primary_key()), make_column("table_id", &CollectionSchema::collection_id_, unique()), - make_column("state", &CollectionSchema::state_), make_column("dimension", &CollectionSchema::dimension_), + make_column("state", &CollectionSchema::state_), + make_column("dimension", &CollectionSchema::dimension_), make_column("created_on", &CollectionSchema::created_on_), make_column("flag", &CollectionSchema::flag_, default_value(0)), make_column("index_file_size", &CollectionSchema::index_file_size_), @@ -257,10 +258,12 @@ SqliteMetaImpl::CreateCollection(CollectionSchema& collection_schema) { } else { fiu_do_on("SqliteMetaImpl.CreateCollection.throw_exception", throw std::exception()); auto collection = ConnectorPtr->select(columns(&CollectionSchema::state_), - where(c(&CollectionSchema::collection_id_) == 
collection_schema.collection_id_)); + where(c(&CollectionSchema::collection_id_) + == collection_schema.collection_id_)); if (collection.size() == 1) { if (CollectionSchema::TO_DELETE == std::get<0>(collection[0])) { - return Status(DB_ERROR, "Collection already exists and it is in delete state, please wait a second"); + return Status(DB_ERROR, + "Collection already exists and it is in delete state, please wait a second"); } else { // Change from no error to already exist. return Status(DB_ALREADY_EXIST, "Collection already exists"); @@ -296,10 +299,19 @@ SqliteMetaImpl::DescribeCollection(CollectionSchema& collection_schema) { std::lock_guard meta_lock(meta_mutex_); fiu_do_on("SqliteMetaImpl.DescribeCollection.throw_exception", throw std::exception()); auto groups = ConnectorPtr->select( - columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, &CollectionSchema::created_on_, - &CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, &CollectionSchema::metric_type_, &CollectionSchema::owner_collection_, - &CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::flush_lsn_), + columns(&CollectionSchema::id_, + &CollectionSchema::state_, + &CollectionSchema::dimension_, + &CollectionSchema::created_on_, + &CollectionSchema::flag_, + &CollectionSchema::index_file_size_, + &CollectionSchema::engine_type_, + &CollectionSchema::index_params_, + &CollectionSchema::metric_type_, + &CollectionSchema::owner_collection_, + &CollectionSchema::partition_tag_, + &CollectionSchema::version_, + &CollectionSchema::flush_lsn_), where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_ and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); @@ -339,7 +351,8 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not std::lock_guard meta_lock(meta_mutex_); auto collections = ConnectorPtr->select( columns(&CollectionSchema::id_), - where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + where(c(&CollectionSchema::collection_id_) == collection_id + and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); if (collections.size() == 1) { has_or_not = true; } else { @@ -361,11 +374,21 @@ SqliteMetaImpl::AllCollections(std::vector& collection_schema_ // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); auto selected = ConnectorPtr->select( - columns(&CollectionSchema::id_, &CollectionSchema::collection_id_, &CollectionSchema::dimension_, &CollectionSchema::created_on_, - &CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, &CollectionSchema::metric_type_, &CollectionSchema::owner_collection_, - &CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::flush_lsn_), - where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE and c(&CollectionSchema::owner_collection_) == "")); + columns(&CollectionSchema::id_, + &CollectionSchema::collection_id_, + &CollectionSchema::dimension_, + &CollectionSchema::created_on_, + &CollectionSchema::flag_, + &CollectionSchema::index_file_size_, + &CollectionSchema::engine_type_, + &CollectionSchema::index_params_, + &CollectionSchema::metric_type_, + &CollectionSchema::owner_collection_, + 
&CollectionSchema::partition_tag_, + &CollectionSchema::version_, + &CollectionSchema::flush_lsn_), + where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE + and c(&CollectionSchema::owner_collection_) == "")); for (auto& collection : selected) { CollectionSchema schema; schema.id_ = std::get<0>(collection); @@ -404,7 +427,8 @@ SqliteMetaImpl::DropCollection(const std::string& collection_id) { // soft delete collection ConnectorPtr->update_all( set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE), - where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + where(c(&CollectionSchema::collection_id_) == collection_id + and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); LOG_ENGINE_DEBUG_ << "Successfully delete collection, collection id = " << collection_id; } catch (std::exception& e) { @@ -486,12 +510,10 @@ SqliteMetaImpl::CreateCollectionFile(SegmentSchema& file_schema) { Status SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::vector& ids, - SegmentsSchema& collection_files) { + FilesHolder& files_holder) { try { fiu_do_on("SqliteMetaImpl.GetCollectionFiles.throw_exception", throw std::exception()); - collection_files.clear(); - CollectionSchema collection_schema; collection_schema.collection_id_ = collection_id; auto status = DescribeCollection(collection_schema); @@ -499,16 +521,23 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std:: return status; } - auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, - &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_, - &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_); + auto select_columns = columns(&SegmentSchema::id_, + &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, + &SegmentSchema::file_type_, + &SegmentSchema::file_size_, + &SegmentSchema::row_count_, + &SegmentSchema::date_, + &SegmentSchema::engine_type_, + &SegmentSchema::created_on_); decltype(ConnectorPtr->select(select_columns)) selected; { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); selected = ConnectorPtr->select(select_columns, - where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and - c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE)); + where(c(&SegmentSchema::collection_id_) == collection_id + and in(&SegmentSchema::id_, ids) and + c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE)); } Status result; @@ -531,10 +560,10 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std:: utils::GetCollectionFilePath(options_, file_schema); - collection_files.emplace_back(file_schema); + files_holder.MarkFile(file_schema); } - LOG_ENGINE_DEBUG_ << "Get collection files by id"; + LOG_ENGINE_DEBUG_ << "Get " << selected.size() << " files by id from collection " << collection_id; return result; } catch (std::exception& e) { return HandleException("Encounter exception when lookup collection files", e.what()); @@ -543,10 +572,8 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std:: Status SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, - milvus::engine::meta::SegmentsSchema& collection_files) { + FilesHolder& files_holder) { try { - 
collection_files.clear();
-
         auto select_columns =
             columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
                     &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
                     &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
@@ -556,8 +583,8 @@ SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
             // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
             std::lock_guard<std::mutex> meta_lock(meta_mutex_);
             selected = ConnectorPtr->select(select_columns,
-                                            where(c(&SegmentSchema::segment_id_) == segment_id and
-                                                  c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
+                                            where(c(&SegmentSchema::segment_id_) == segment_id and
+                                                  c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
         }
 
         if (!selected.empty()) {
@@ -586,11 +613,11 @@ SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
                 file_schema.metric_type_ = collection_schema.metric_type_;
 
                 utils::GetCollectionFilePath(options_, file_schema);
-                collection_files.emplace_back(file_schema);
+                files_holder.MarkFile(file_schema);
             }
         }
 
-        LOG_ENGINE_DEBUG_ << "Get collection files by segment id";
+        LOG_ENGINE_DEBUG_ << "Get " << selected.size() << " files by segment id " << segment_id;
         return Status::OK();
     } catch (std::exception& e) {
         return HandleException("Encounter exception when lookup collection files by segment id", e.what());
     }
@@ -607,7 +634,8 @@ SqliteMetaImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t f
         std::lock_guard<std::mutex> meta_lock(meta_mutex_);
 
         // set all backup file to raw
-        ConnectorPtr->update_all(set(c(&CollectionSchema::flag_) = flag), where(c(&CollectionSchema::collection_id_) == collection_id));
+        ConnectorPtr->update_all(set(c(&CollectionSchema::flag_) = flag),
+                                 where(c(&CollectionSchema::collection_id_) == collection_id));
         LOG_ENGINE_DEBUG_ << "Successfully update collection flag, collection id = " << collection_id;
     } catch (std::exception& e) {
         std::string msg = "Encounter exception when update collection flag: collection_id = " + collection_id;
@@ -627,7 +655,8 @@ SqliteMetaImpl::UpdateCollectionFlushLSN(const std::string& collection_id, uint6
 
         ConnectorPtr->update_all(set(c(&CollectionSchema::flush_lsn_) = flush_lsn),
                                  where(c(&CollectionSchema::collection_id_) == collection_id));
-        LOG_ENGINE_DEBUG_ << "Successfully update collection flush_lsn, collection id = " << collection_id << " flush_lsn = " << flush_lsn;;
+        LOG_ENGINE_DEBUG_ << "Successfully update collection flush_lsn, collection id = " << collection_id
+                          << " flush_lsn = " << flush_lsn;
     } catch (std::exception& e) {
         std::string msg = "Encounter exception when update collection lsn: collection_id = " + collection_id;
         return HandleException(msg, e.what());
     }
@@ -645,7 +674,8 @@ SqliteMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t
         std::lock_guard<std::mutex> meta_lock(meta_mutex_);
 
         auto selected =
-            ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_), where(c(&CollectionSchema::collection_id_) == collection_id));
+            ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_),
+                                 where(c(&CollectionSchema::collection_id_) == collection_id));
 
         if (selected.size() > 0) {
             flush_lsn = std::get<0>(selected[0]);
@@ -671,7 +701,8 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
         std::lock_guard<std::mutex> meta_lock(meta_mutex_);
 
         auto collections = ConnectorPtr->select(columns(&CollectionSchema::state_),
-                                                where(c(&CollectionSchema::collection_id_) ==
file_schema.collection_id_)); + where(c(&CollectionSchema::collection_id_) + == file_schema.collection_id_)); // if the collection has been deleted, just mark the collection file as TO_DELETE // clean thread will delete the file later @@ -684,7 +715,8 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) { LOG_ENGINE_DEBUG_ << "Update single collection file, file id = " << file_schema.file_id_; } catch (std::exception& e) { std::string msg = - "Exception update collection file: collection_id = " + file_schema.collection_id_ + " file_id = " + file_schema.file_id_; + "Exception update collection file: collection_id = " + file_schema.collection_id_ + " file_id = " + + file_schema.file_id_; return HandleException(msg, e.what()); } return Status::OK(); @@ -705,8 +737,10 @@ SqliteMetaImpl::UpdateCollectionFiles(SegmentsSchema& files) { continue; } auto collections = ConnectorPtr->select(columns(&CollectionSchema::id_), - where(c(&CollectionSchema::collection_id_) == file.collection_id_ and - c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + where( + c(&CollectionSchema::collection_id_) == file.collection_id_ and + c(&CollectionSchema::state_) + != (int)CollectionSchema::TO_DELETE)); if (collections.size() >= 1) { has_collections[file.collection_id_] = true; } else { @@ -769,10 +803,18 @@ SqliteMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Co auto collections = ConnectorPtr->select( - columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, &CollectionSchema::created_on_, - &CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::owner_collection_, - &CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::flush_lsn_), - where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + columns(&CollectionSchema::id_, + &CollectionSchema::state_, + &CollectionSchema::dimension_, + &CollectionSchema::created_on_, + &CollectionSchema::flag_, + &CollectionSchema::index_file_size_, + &CollectionSchema::owner_collection_, + &CollectionSchema::partition_tag_, + &CollectionSchema::version_, + &CollectionSchema::flush_lsn_), + where(c(&CollectionSchema::collection_id_) == collection_id + and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); if (collections.size() > 0) { meta::CollectionSchema collection_schema; @@ -841,7 +883,8 @@ SqliteMetaImpl::DescribeCollectionIndex(const std::string& collection_id, Collec auto groups = ConnectorPtr->select( columns(&CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_), - where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + where(c(&CollectionSchema::collection_id_) == collection_id + and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); if (groups.size() == 1) { index.engine_type_ = std::get<0>(groups[0]); @@ -902,7 +945,9 @@ SqliteMetaImpl::DropCollectionIndex(const std::string& collection_id) { } Status -SqliteMetaImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name, const std::string& tag, +SqliteMetaImpl::CreatePartition(const std::string& collection_id, + const std::string& partition_name, + const std::string& tag, uint64_t lsn) { USING_SQLITE_WARNING server::MetricCollector metric; @@ -959,16 +1004,25 @@ SqliteMetaImpl::DropPartition(const std::string& 
partition_name) { } Status -SqliteMetaImpl::ShowPartitions(const std::string& collection_id, std::vector& partition_schema_array) { +SqliteMetaImpl::ShowPartitions(const std::string& collection_id, + std::vector& partition_schema_array) { try { server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.ShowPartitions.throw_exception", throw std::exception()); auto partitions = ConnectorPtr->select( - columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, &CollectionSchema::created_on_, - &CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, &CollectionSchema::metric_type_, &CollectionSchema::partition_tag_, - &CollectionSchema::version_, &CollectionSchema::collection_id_), + columns(&CollectionSchema::id_, + &CollectionSchema::state_, + &CollectionSchema::dimension_, + &CollectionSchema::created_on_, + &CollectionSchema::flag_, + &CollectionSchema::index_file_size_, + &CollectionSchema::engine_type_, + &CollectionSchema::index_params_, + &CollectionSchema::metric_type_, + &CollectionSchema::partition_tag_, + &CollectionSchema::version_, + &CollectionSchema::collection_id_), where(c(&CollectionSchema::owner_collection_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); @@ -997,7 +1051,9 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id, std::vectorselect( columns(&CollectionSchema::collection_id_), - where(c(&CollectionSchema::owner_collection_) == collection_id and c(&CollectionSchema::partition_tag_) == valid_tag and + where(c(&CollectionSchema::owner_collection_) == collection_id + and c(&CollectionSchema::partition_tag_) == valid_tag and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); if (name.size() > 0) { partition_name = std::get<0>(name[0]); @@ -1024,9 +1081,7 @@ SqliteMetaImpl::GetPartitionName(const std::string& collection_id, const std::st } Status -SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& files) { - files.clear(); - +SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) { try { server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception()); @@ -1058,6 +1113,7 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& } Status ret; + int64_t files_count = 0; for (auto& file : selected) { SegmentSchema collection_file; collection_file.id_ = std::get<0>(file); @@ -1077,16 +1133,16 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& auto status = utils::GetCollectionFilePath(options_, collection_file); if (!status.ok()) { ret = status; + continue; } - files.emplace_back(collection_file); + files_holder.MarkFile(collection_file); + files_count++; } - if (files.empty()) { - LOG_ENGINE_ERROR_ << "No file to search for collection: " << collection_id; - } - - if (selected.size() > 0) { - LOG_ENGINE_DEBUG_ << "Collect " << selected.size() << " to-search files"; + if (files_count == 0) { + LOG_ENGINE_DEBUG_ << "No file to search for collection: " << collection_id; + } else { + LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-search files in collection " << collection_id; } return ret; } catch (std::exception& e) { @@ -1095,9 +1151,7 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& } Status -SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& files) { - 
files.clear(); - +SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) { try { fiu_do_on("SqliteMetaImpl.FilesToMerge.throw_exception", throw std::exception()); @@ -1120,13 +1174,13 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& f // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); selected = ConnectorPtr->select(select_columns, - where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and - c(&SegmentSchema::collection_id_) == collection_id), - order_by(&SegmentSchema::file_size_).desc()); + where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and + c(&SegmentSchema::collection_id_) == collection_id), + order_by(&SegmentSchema::file_size_).desc()); } Status result; - int64_t to_merge_files = 0; + int64_t files_count = 0; for (auto& file : selected) { SegmentSchema collection_file; collection_file.file_size_ = std::get<5>(file); @@ -1152,12 +1206,12 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& f result = status; } - files.emplace_back(collection_file); - ++to_merge_files; + files_holder.MarkFile(collection_file); + files_count++; } - if (to_merge_files > 0) { - LOG_ENGINE_TRACE_ << "Collect " << to_merge_files << " to-merge files"; + if (files_count > 0) { + LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-merge files in collection " << collection_id; } return result; } catch (std::exception& e) { @@ -1166,9 +1220,7 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& f } Status -SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) { - files.clear(); - +SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) { try { fiu_do_on("SqliteMetaImpl.FilesToIndex.throw_exception", throw std::exception()); @@ -1183,14 +1235,14 @@ SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); selected = ConnectorPtr->select(select_columns, - where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX)); + where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX)); } - std::map groups; - SegmentSchema collection_file; - Status ret; + int64_t files_count = 0; + std::map groups; for (auto& file : selected) { + SegmentSchema collection_file; collection_file.id_ = std::get<0>(file); collection_file.collection_id_ = std::get<1>(file); collection_file.segment_id_ = std::get<2>(file); @@ -1222,11 +1274,13 @@ SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) { collection_file.index_file_size_ = groups[collection_file.collection_id_].index_file_size_; collection_file.index_params_ = groups[collection_file.collection_id_].index_params_; collection_file.metric_type_ = groups[collection_file.collection_id_].metric_type_; - files.push_back(collection_file); + files_holder.MarkFile(collection_file); + + files_count++; } - if (selected.size() > 0) { - LOG_ENGINE_DEBUG_ << "Collect " << selected.size() << " to-index files"; + if (files_count > 0) { + LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-index files"; } return ret; } catch (std::exception& e) { @@ -1235,7 +1289,9 @@ SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) { } Status -SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector& file_types, SegmentsSchema& files) { +SqliteMetaImpl::FilesByType(const std::string& collection_id, + const 
std::vector& file_types, + FilesHolder& files_holder) { if (file_types.empty()) { return Status(DB_ERROR, "file types array is empty"); } @@ -1244,7 +1300,6 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector< try { fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception()); - files.clear(); CollectionSchema collection_schema; collection_schema.collection_id_ = collection_id; @@ -1267,7 +1322,8 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector< // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); selected = ConnectorPtr->select(select_columns, - where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id)); + where(in(&SegmentSchema::file_type_, file_types) + and c(&SegmentSchema::collection_id_) == collection_id)); } if (selected.size() >= 1) { @@ -1314,7 +1370,7 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector< ret = status; } - files.emplace_back(file_schema); + files_holder.MarkFile(file_schema); } std::string msg = "Get collection files by type."; @@ -1351,9 +1407,7 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector< } Status -SqliteMetaImpl::FilesByID(const std::vector& ids, SegmentsSchema& files) { - files.clear(); - +SqliteMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_holder) { if (ids.empty()) { return Status::OK(); } @@ -1383,6 +1437,7 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, SegmentsSchema& files) std::map collections; Status ret; + int64_t files_count = 0; for (auto& file : selected) { SegmentSchema collection_file; collection_file.id_ = std::get<0>(file); @@ -1410,9 +1465,11 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, SegmentsSchema& files) ret = status; } - files.emplace_back(collection_file); + files_holder.MarkFile(collection_file); + files_count++; } + milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles(); for (auto& collection_file : files) { CollectionSchema& collection_schema = collections[collection_file.collection_id_]; collection_file.dimension_ = collection_schema.dimension_; @@ -1421,10 +1478,10 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, SegmentsSchema& files) collection_file.metric_type_ = collection_schema.metric_type_; } - if (files.empty()) { + if (files_count == 0) { LOG_ENGINE_ERROR_ << "No file to search in file id list"; } else { - LOG_ENGINE_DEBUG_ << "Collect " << selected.size() << " files by id"; + LOG_ENGINE_DEBUG_ << "Collect " << files_count << " files by id"; } return ret; @@ -1576,9 +1633,9 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/ collection_file.date_ = std::get<6>(file); // check if the file can be deleted - if (OngoingFileChecker::GetInstance().IsIgnored(collection_file)) { + if (!FilesHolder::CanBeDeleted(collection_file)) { LOG_ENGINE_DEBUG_ << "File:" << collection_file.file_id_ - << " currently is in use, not able to delete now"; + << " currently is in use, not able to delete now"; continue; // ignore this file, don't delete it } @@ -1596,7 +1653,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/ utils::DeleteCollectionFilePath(options_, collection_file); LOG_ENGINE_DEBUG_ << "Remove file id:" << collection_file.file_id_ << " location:" - << collection_file.location_; + << collection_file.location_; 
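// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the FilesHolder::CanBeDeleted()
// gate used in this cleanup loop conceptually reduces to a lookup in the
// two-level ref-count map declared in MetaTypes.h (collection id -> file id ->
// ongoing ref-count). A minimal sketch of that check, assuming the
// Table2FileRef layout; `CanBeDeletedSketch` is a hypothetical name:
//
//     bool
//     CanBeDeletedSketch(const milvus::engine::meta::Table2FileRef& ongoing,
//                        const milvus::engine::meta::SegmentSchema& file) {
//         auto col_it = ongoing.find(file.collection_id_);
//         if (col_it == ongoing.end()) {
//             return true;  // no holder references this collection
//         }
//         auto file_it = col_it->second.find(file.file_id_);
//         return file_it == col_it->second.end() || file_it->second <= 0;
//     }
// ---------------------------------------------------------------------------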
collection_ids.insert(collection_file.collection_id_); segment_ids.insert(std::make_pair(collection_file.segment_id_, collection_file)); @@ -1627,7 +1684,8 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/ std::lock_guard meta_lock(meta_mutex_); auto collections = ConnectorPtr->select(columns(&CollectionSchema::id_, &CollectionSchema::collection_id_), - where(c(&CollectionSchema::state_) == (int)CollectionSchema::TO_DELETE)); + where( + c(&CollectionSchema::state_) == (int)CollectionSchema::TO_DELETE)); auto commited = ConnectorPtr->transaction([&]() mutable { for (auto& collection : collections) { @@ -1717,7 +1775,8 @@ SqliteMetaImpl::Count(const std::string& collection_id, uint64_t& result) { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); selected = ConnectorPtr->select(select_columns, - where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id)); + where(in(&SegmentSchema::file_type_, file_types) + and c(&SegmentSchema::collection_id_) == collection_id)); } CollectionSchema collection_schema; @@ -1784,7 +1843,7 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) { collection_file.file_size_ = std::get<1>(file); ids.push_back(collection_file.id_); LOG_ENGINE_DEBUG_ << "Discard file id=" << collection_file.file_id_ - << " file size=" << collection_file.file_size_; + << " file size=" << collection_file.file_size_; to_discard_size -= collection_file.file_size_; } @@ -1874,7 +1933,8 @@ SqliteMetaImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema == collection_schema.collection_id_)); if (collection.size() == 1) { if (CollectionSchema::TO_DELETE == std::get<0>(collection[0])) { - return Status(DB_ERROR, "Collection already exists and it is in delete state, please wait a second"); + return Status(DB_ERROR, + "Collection already exists and it is in delete state, please wait a second"); } else { // Change from no error to already exist. 
return Status(DB_ALREADY_EXIST, "Collection already exists"); @@ -1923,10 +1983,19 @@ SqliteMetaImpl::DescribeHybridCollection(milvus::engine::meta::CollectionSchema& server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.DescriCollection.throw_exception", throw std::exception()); auto groups = ConnectorPtr->select( - columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, &CollectionSchema::created_on_, - &CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_, - &CollectionSchema::index_params_, &CollectionSchema::metric_type_, &CollectionSchema::owner_collection_, - &CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::flush_lsn_), + columns(&CollectionSchema::id_, + &CollectionSchema::state_, + &CollectionSchema::dimension_, + &CollectionSchema::created_on_, + &CollectionSchema::flag_, + &CollectionSchema::index_file_size_, + &CollectionSchema::engine_type_, + &CollectionSchema::index_params_, + &CollectionSchema::metric_type_, + &CollectionSchema::owner_collection_, + &CollectionSchema::partition_tag_, + &CollectionSchema::version_, + &CollectionSchema::flush_lsn_), where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_ and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index a71931e155..6ce4af8f92 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -56,10 +56,10 @@ class SqliteMetaImpl : public Meta { Status GetCollectionFiles(const std::string& collection_id, const std::vector& ids, - SegmentsSchema& collection_files) override; + FilesHolder& files_holder) override; Status - GetCollectionFilesBySegmentId(const std::string& segment_id, SegmentsSchema& collection_files) override; + GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) override; Status UpdateCollectionIndex(const std::string& collection_id, const CollectionIndex& index) override; @@ -106,19 +106,20 @@ class SqliteMetaImpl : public Meta { GetPartitionName(const std::string& collection_id, const std::string& tag, std::string& partition_name) override; Status - FilesToSearch(const std::string& collection_id, SegmentsSchema& files) override; + FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override; Status - FilesToMerge(const std::string& collection_id, SegmentsSchema& files) override; + FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override; Status - FilesToIndex(SegmentsSchema&) override; + FilesToIndex(FilesHolder& files_holder) override; Status - FilesByType(const std::string& collection_id, const std::vector& file_types, SegmentsSchema& files) override; + FilesByType(const std::string& collection_id, const std::vector& file_types, + FilesHolder& files_holder) override; Status - FilesByID(const std::vector& ids, SegmentsSchema& files) override; + FilesByID(const std::vector& ids, FilesHolder& files_holder) override; Status Size(uint64_t& result) override; diff --git a/core/src/metrics/SystemInfo.cpp b/core/src/metrics/SystemInfo.cpp index cab394ac9e..3ba70a245f 100644 --- a/core/src/metrics/SystemInfo.cpp +++ b/core/src/metrics/SystemInfo.cpp @@ -11,6 +11,7 @@ #include "metrics/SystemInfo.h" #include "thirdparty/nlohmann/json.hpp" +#include "utils/Exception.h" #include "utils/Log.h" #include @@ -38,24 +39,32 @@ SystemInfo::Init() { initialized_ = true; // 
initialize CPU information - FILE* file; - struct tms time_sample; - char line[128]; - last_cpu_ = times(&time_sample); - last_sys_cpu_ = time_sample.tms_stime; - last_user_cpu_ = time_sample.tms_utime; - file = fopen("/proc/cpuinfo", "r"); - num_processors_ = 0; - while (fgets(line, 128, file) != nullptr) { - if (strncmp(line, "processor", 9) == 0) { - num_processors_++; - } - if (strncmp(line, "physical", 8) == 0) { - num_physical_processors_ = ParseLine(line); + try { + struct tms time_sample; + char line[128]; + last_cpu_ = times(&time_sample); + last_sys_cpu_ = time_sample.tms_stime; + last_user_cpu_ = time_sample.tms_utime; + num_processors_ = 0; + FILE* file = fopen("/proc/cpuinfo", "r"); + if (file) { + while (fgets(line, 128, file) != nullptr) { + if (strncmp(line, "processor", 9) == 0) { + num_processors_++; + } + if (strncmp(line, "physical", 8) == 0) { + num_physical_processors_ = ParseLine(line); + } + } + fclose(file); + } else { + LOG_SERVER_ERROR_ << "Failed to read /proc/cpuinfo"; } + total_ram_ = GetPhysicalMemory(); + } catch (std::exception& ex) { + std::string msg = "Failed to read /proc/cpuinfo, reason: " + std::string(ex.what()); + LOG_SERVER_ERROR_ << msg; } - total_ram_ = GetPhysicalMemory(); - fclose(file); #ifdef MILVUS_GPU_VERSION // initialize GPU information @@ -75,10 +84,15 @@ SystemInfo::Init() { #endif // initialize network traffic information - std::pair in_and_out_octets = Octets(); - in_octets_ = in_and_out_octets.first; - out_octets_ = in_and_out_octets.second; - net_time_ = std::chrono::system_clock::now(); + try { + std::pair in_and_out_octets = Octets(); + in_octets_ = in_and_out_octets.first; + out_octets_ = in_and_out_octets.second; + net_time_ = std::chrono::system_clock::now(); + } catch (std::exception& ex) { + std::string msg = "Failed to initialize network traffic information, reason: " + std::string(ex.what()); + LOG_SERVER_ERROR_ << msg; + } } uint64_t @@ -101,27 +115,39 @@ SystemInfo::GetPhysicalMemory() { uint64_t totalPhysMem = memInfo.totalram; // Multiply in next statement to avoid int overflow on right hand side... totalPhysMem *= memInfo.mem_unit; + return totalPhysMem; } uint64_t SystemInfo::GetProcessUsedMemory() { - // Note: this value is in KB! - FILE* file = fopen("/proc/self/status", "r"); - constexpr uint64_t line_length = 128; - uint64_t result = -1; - constexpr uint64_t KB_SIZE = 1024; - char line[line_length]; + try { + // Note: this value is in KB! 
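// Illustrative note (not part of the patch): the VmRSS line matched below
// looks like "VmRSS:     123456 kB". ParseLine() is defined elsewhere in this
// file; assuming it extracts the integer from such a line, a rough
// self-contained equivalent would be:
//
//     unsigned long kb = 0;
//     std::sscanf(line, "%*s %lu", &kb);  // skip the label, read the count
//     return kb;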
+ FILE* file = fopen("/proc/self/status", "r"); + uint64_t result = 0; + constexpr uint64_t KB_SIZE = 1024; + if (file) { + constexpr uint64_t line_length = 128; + char line[line_length]; - while (fgets(line, line_length, file) != nullptr) { - if (strncmp(line, "VmRSS:", 6) == 0) { - result = ParseLine(line); - break; + while (fgets(line, line_length, file) != nullptr) { + if (strncmp(line, "VmRSS:", 6) == 0) { + result = ParseLine(line); + break; + } + } + fclose(file); + } else { + LOG_SERVER_ERROR_ << "Failed to read /proc/self/status"; } + + // return value in Byte + return (result * KB_SIZE); + } catch (std::exception& ex) { + std::string msg = "Failed to read /proc/self/status, reason: " + std::string(ex.what()); + LOG_SERVER_ERROR_ << msg; + return 0; } - fclose(file); - // return value in Byte - return (result * KB_SIZE); } double @@ -155,34 +181,40 @@ SystemInfo::CPUCorePercent() { std::vector SystemInfo::getTotalCpuTime(std::vector& work_time_array) { std::vector total_time_array; - FILE* file = fopen("/proc/stat", "r"); - fiu_do_on("SystemInfo.getTotalCpuTime.open_proc", file = NULL); - if (file == NULL) { - LOG_SERVER_ERROR_ << "Could not open stat file"; - return total_time_array; - } - - uint64_t user = 0, nice = 0, system = 0, idle = 0; - uint64_t iowait = 0, irq = 0, softirq = 0, steal = 0, guest = 0, guestnice = 0; - - for (int i = 0; i < num_processors_; i++) { - char buffer[1024]; - char* ret = fgets(buffer, sizeof(buffer) - 1, file); - fiu_do_on("SystemInfo.getTotalCpuTime.read_proc", ret = NULL); - if (ret == NULL) { - LOG_SERVER_ERROR_ << "Could not read stat file"; - fclose(file); + try { + FILE* file = fopen("/proc/stat", "r"); + fiu_do_on("SystemInfo.getTotalCpuTime.open_proc", file = NULL); + if (file == NULL) { + LOG_SERVER_ERROR_ << "Failed to read /proc/stat"; return total_time_array; } - sscanf(buffer, "cpu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu", &user, &nice, &system, &idle, - &iowait, &irq, &softirq, &steal, &guest, &guestnice); + uint64_t user = 0, nice = 0, system = 0, idle = 0; + uint64_t iowait = 0, irq = 0, softirq = 0, steal = 0, guest = 0, guestnice = 0; - work_time_array.push_back(user + nice + system); - total_time_array.push_back(user + nice + system + idle + iowait + irq + softirq + steal); + for (int i = 0; i < num_processors_; i++) { + char buffer[1024]; + char* ret = fgets(buffer, sizeof(buffer) - 1, file); + fiu_do_on("SystemInfo.getTotalCpuTime.read_proc", ret = NULL); + if (ret == NULL) { + LOG_SERVER_ERROR_ << "Could not read stat file"; + fclose(file); + return total_time_array; + } + + sscanf(buffer, "cpu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu", &user, &nice, &system, + &idle, &iowait, &irq, &softirq, &steal, &guest, &guestnice); + + work_time_array.push_back(user + nice + system); + total_time_array.push_back(user + nice + system + idle + iowait + irq + softirq + steal); + } + + fclose(file); + } catch (std::exception& ex) { + std::string msg = "Failed to read /proc/stat, reason: " + std::string(ex.what()); + LOG_SERVER_ERROR_ << msg; } - fclose(file); return total_time_array; } @@ -260,39 +292,43 @@ std::vector SystemInfo::CPUTemperature() { std::vector result; std::string path = "/sys/class/hwmon/"; + try { + DIR* dir = opendir(path.c_str()); + fiu_do_on("SystemInfo.CPUTemperature.opendir", dir = NULL); + if (!dir) { + LOG_SERVER_ERROR_ << "Could not open hwmon directory"; + return result; + } - DIR* dir = NULL; - dir = opendir(path.c_str()); - fiu_do_on("SystemInfo.CPUTemperature.opendir", 
dir = NULL); - if (!dir) { - LOG_SERVER_ERROR_ << "Could not open hwmon directory"; - return result; - } + struct dirent* ptr = NULL; + while ((ptr = readdir(dir)) != NULL) { + std::string filename(path); + filename.append(ptr->d_name); - struct dirent* ptr = NULL; - while ((ptr = readdir(dir)) != NULL) { - std::string filename(path); - filename.append(ptr->d_name); - - char buf[100]; - if (readlink(filename.c_str(), buf, 100) != -1) { - std::string m(buf); - if (m.find("coretemp") != std::string::npos) { - std::string object = filename; - object += "/temp1_input"; - FILE* file = fopen(object.c_str(), "r"); - fiu_do_on("SystemInfo.CPUTemperature.openfile", file = NULL); - if (file == nullptr) { - LOG_SERVER_ERROR_ << "Could not open temperature file"; - return result; + char buf[100]; + if (readlink(filename.c_str(), buf, 100) != -1) { + std::string m(buf); + if (m.find("coretemp") != std::string::npos) { + std::string object = filename; + object += "/temp1_input"; + FILE* file = fopen(object.c_str(), "r"); + fiu_do_on("SystemInfo.CPUTemperature.openfile", file = NULL); + if (file == nullptr) { + LOG_SERVER_ERROR_ << "Could not open temperature file"; + return result; + } + float temp; + fscanf(file, "%f", &temp); + result.push_back(temp / 1000); } - float temp; - fscanf(file, "%f", &temp); - result.push_back(temp / 1000); } } + closedir(dir); + } catch (std::exception& ex) { + std::string msg = "Failed to get cpu temperature, reason: " + std::string(ex.what()); + LOG_SERVER_ERROR_ << msg; } - closedir(dir); + return result; } diff --git a/core/src/metrics/prometheus/PrometheusMetrics.cpp b/core/src/metrics/prometheus/PrometheusMetrics.cpp index 3bdb2d1b9a..30db0913bb 100644 --- a/core/src/metrics/prometheus/PrometheusMetrics.cpp +++ b/core/src/metrics/prometheus/PrometheusMetrics.cpp @@ -158,26 +158,32 @@ PrometheusMetrics::OctetsSet() { return; } - // get old stats and reset them - uint64_t old_inoctets = SystemInfo::GetInstance().get_inoctets(); - uint64_t old_outoctets = SystemInfo::GetInstance().get_octets(); - auto old_time = SystemInfo::GetInstance().get_nettime(); - std::pair in_and_out_octets = SystemInfo::GetInstance().Octets(); - SystemInfo::GetInstance().set_inoctets(in_and_out_octets.first); - SystemInfo::GetInstance().set_outoctets(in_and_out_octets.second); - SystemInfo::GetInstance().set_nettime(); + try { + // get old stats and reset them + uint64_t old_inoctets = SystemInfo::GetInstance().get_inoctets(); + uint64_t old_outoctets = SystemInfo::GetInstance().get_octets(); + auto old_time = SystemInfo::GetInstance().get_nettime(); - // - constexpr double micro_to_second = 1e-6; - auto now_time = std::chrono::system_clock::now(); - auto total_microsecond = METRICS_MICROSECONDS(old_time, now_time); - auto total_second = total_microsecond * micro_to_second; - if (total_second == 0) { - return; + std::pair in_and_out_octets = SystemInfo::GetInstance().Octets(); + SystemInfo::GetInstance().set_inoctets(in_and_out_octets.first); + SystemInfo::GetInstance().set_outoctets(in_and_out_octets.second); + SystemInfo::GetInstance().set_nettime(); + + // + constexpr double micro_to_second = 1e-6; + auto now_time = std::chrono::system_clock::now(); + auto total_microsecond = METRICS_MICROSECONDS(old_time, now_time); + auto total_second = total_microsecond * micro_to_second; + if (total_second == 0) { + return; + } + + inoctets_gauge_.Set((in_and_out_octets.first - old_inoctets) / total_second); + outoctets_gauge_.Set((in_and_out_octets.second - old_outoctets) / total_second); + } catch 
(std::exception& ex) { + std::string msg = "Failed to set in/out octets, reason: " + std::string(ex.what()); + LOG_SERVER_ERROR_ << msg; } - - inoctets_gauge_.Set((in_and_out_octets.first - old_inoctets) / total_second); - outoctets_gauge_.Set((in_and_out_octets.second - old_outoctets) / total_second); } void diff --git a/core/src/server/delivery/request/HasPartitionRequest.cpp b/core/src/server/delivery/request/HasPartitionRequest.cpp index 7e823aa236..64a8a3aeaf 100644 --- a/core/src/server/delivery/request/HasPartitionRequest.cpp +++ b/core/src/server/delivery/request/HasPartitionRequest.cpp @@ -51,6 +51,22 @@ HasPartitionRequest::OnExecute() { return status; } + // only process root collection, ignore partition collection + engine::meta::CollectionSchema collection_schema; + collection_schema.collection_id_ = collection_name_; + status = DBWrapper::DB()->DescribeCollection(collection_schema); + if (!status.ok()) { + if (status.code() == DB_NOT_FOUND) { + return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_)); + } else { + return status; + } + } else { + if (!collection_schema.owner_collection_.empty()) { + return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_)); + } + } + std::vector schema_array; status = DBWrapper::DB()->ShowPartitions(collection_name_, schema_array); if (!status.ok()) { diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index 5c0601c055..9b8f7927a6 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -377,12 +377,6 @@ TEST_F(DBTest, SEARCH_TEST) { result_ids, result_distances); ASSERT_TRUE(stat.ok()); - -// FIU_ENABLE_FIU("DBImpl.QueryByFileID.empty_files_array"); -// stat = -// db_->QueryByFileID(dummy_context_, file_ids, k, json_params, xq, result_ids, result_distances); -// ASSERT_FALSE(stat.ok()); -// fiu_disable("DBImpl.QueryByFileID.empty_files_array"); } // TODO(zhiru): PQ build takes forever diff --git a/core/unittest/db/test_meta.cpp b/core/unittest/db/test_meta.cpp index ca787a30a5..1f2ab2ec22 100644 --- a/core/unittest/db/test_meta.cpp +++ b/core/unittest/db/test_meta.cpp @@ -22,7 +22,6 @@ #include #include #include -#include "src/db/OngoingFileChecker.h" TEST_F(MetaTest, COLLECTION_TEST) { auto collection_id = "meta_test_table"; @@ -149,13 +148,13 @@ TEST_F(MetaTest, FALID_TEST) { fiu_disable("SqliteMetaImpl.DeleteCollectionFiles.throw_exception"); } { - milvus::engine::meta::SegmentsSchema schemas; + milvus::engine::meta::FilesHolder files_holder; std::vector ids; - status = impl_->GetCollectionFiles("notexist", ids, schemas); + status = impl_->GetCollectionFiles("notexist", ids, files_holder); ASSERT_FALSE(status.ok()); FIU_ENABLE_FIU("SqliteMetaImpl.GetCollectionFiles.throw_exception"); - status = impl_->GetCollectionFiles(collection_id, ids, schemas); + status = impl_->GetCollectionFiles(collection_id, ids, files_holder); ASSERT_FALSE(status.ok()); ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED); fiu_disable("SqliteMetaImpl.GetCollectionFiles.throw_exception"); @@ -260,12 +259,12 @@ TEST_F(MetaTest, FALID_TEST) { fiu_disable("SqliteMetaImpl.GetPartitionName.throw_exception"); } { - milvus::engine::meta::SegmentsSchema table_files; - status = impl_->FilesToSearch("notexist", table_files); + milvus::engine::meta::FilesHolder files_holder; + status = impl_->FilesToSearch("notexist", files_holder); ASSERT_EQ(status.code(), milvus::DB_NOT_FOUND); FIU_ENABLE_FIU("SqliteMetaImpl.FilesToSearch.throw_exception"); - status = 
impl_->FilesToSearch(collection_id, table_files); + status = impl_->FilesToSearch(collection_id, files_holder); ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED); fiu_disable("SqliteMetaImpl.FilesToSearch.throw_exception"); } @@ -277,23 +276,23 @@ TEST_F(MetaTest, FALID_TEST) { file.file_type_ = milvus::engine::meta::SegmentSchema::TO_INDEX; impl_->UpdateCollectionFile(file); - milvus::engine::meta::SegmentsSchema files; + milvus::engine::meta::FilesHolder files_holder; FIU_ENABLE_FIU("SqliteMetaImpl_FilesToIndex_CollectionNotFound"); - status = impl_->FilesToIndex(files); + status = impl_->FilesToIndex(files_holder); ASSERT_EQ(status.code(), milvus::DB_NOT_FOUND); fiu_disable("SqliteMetaImpl_FilesToIndex_CollectionNotFound"); FIU_ENABLE_FIU("SqliteMetaImpl.FilesToIndex.throw_exception"); - status = impl_->FilesToIndex(files); + status = impl_->FilesToIndex(files_holder); ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED); fiu_disable("SqliteMetaImpl.FilesToIndex.throw_exception"); } { - milvus::engine::meta::SegmentsSchema files; + milvus::engine::meta::FilesHolder files_holder; std::vector file_types; file_types.push_back(milvus::engine::meta::SegmentSchema::INDEX); FIU_ENABLE_FIU("SqliteMetaImpl.FilesByType.throw_exception"); - status = impl_->FilesByType(collection_id, file_types, files); + status = impl_->FilesByType(collection_id, file_types, files_holder); ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED); fiu_disable("SqliteMetaImpl.FilesByType.throw_exception"); } @@ -410,8 +409,10 @@ TEST_F(MetaTest, COLLECTION_FILE_ROW_COUNT_TEST) { ASSERT_EQ(table_file.row_count_, cnt); std::vector ids = {table_file.id_}; - milvus::engine::meta::SegmentsSchema schemas; - status = impl_->GetCollectionFiles(collection_id, ids, schemas); + milvus::engine::meta::FilesHolder files_holder; + status = impl_->GetCollectionFiles(collection_id, ids, files_holder); + + milvus::engine::meta::SegmentsSchema& schemas = files_holder.HoldFiles(); ASSERT_EQ(schemas.size(), 1UL); ASSERT_EQ(table_file.row_count_, schemas[0].row_count_); ASSERT_EQ(table_file.file_id_, schemas[0].file_id_); @@ -470,10 +471,11 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) { impl.Archive(); int i = 0; - milvus::engine::meta::SegmentsSchema files_get; - status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_get); + milvus::engine::meta::FilesHolder files_holder; + status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_holder); ASSERT_TRUE(status.ok()); + milvus::engine::meta::SegmentsSchema& files_get = files_holder.HoldFiles(); for (auto& file : files_get) { if (days[i] < days_num) { ASSERT_EQ(file.file_type_, milvus::engine::meta::SegmentSchema::NEW); @@ -526,10 +528,11 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) { impl.Archive(); int i = 0; - milvus::engine::meta::SegmentsSchema files_get; - status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_get); + milvus::engine::meta::FilesHolder files_holder; + status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_holder); ASSERT_TRUE(status.ok()); + milvus::engine::meta::SegmentsSchema& files_get = files_holder.HoldFiles(); for (auto& file : files_get) { if (i >= 5) { ASSERT_EQ(file.file_type_, milvus::engine::meta::SegmentSchema::NEW); @@ -609,39 +612,40 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) { ASSERT_TRUE(status.ok()); ASSERT_EQ(total_row_count, raw_files_cnt + to_index_files_cnt + index_files_cnt); - milvus::engine::meta::SegmentsSchema files; - status = impl_->FilesToIndex(files); - 
@@ -609,39 +612,40 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
     ASSERT_TRUE(status.ok());
     ASSERT_EQ(total_row_count, raw_files_cnt + to_index_files_cnt + index_files_cnt);

-    milvus::engine::meta::SegmentsSchema files;
-    status = impl_->FilesToIndex(files);
-    ASSERT_EQ(files.size(), to_index_files_cnt);
+    milvus::engine::meta::FilesHolder files_holder;
+    status = impl_->FilesToIndex(files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt);

-    milvus::engine::meta::SegmentsSchema table_files;
-    status = impl_->FilesToMerge(collection.collection_id_, table_files);
-    ASSERT_EQ(table_files.size(), raw_files_cnt);
+    files_holder.ReleaseFiles();
+    status = impl_->FilesToMerge(collection.collection_id_, files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), raw_files_cnt);

-    status = impl_->FilesToIndex(files);
-    ASSERT_EQ(files.size(), to_index_files_cnt);
+    files_holder.ReleaseFiles();
+    status = impl_->FilesToIndex(files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt);

-    table_files.clear();
-    status = impl_->FilesToSearch(collection_id, table_files);
-    ASSERT_EQ(table_files.size(), to_index_files_cnt + raw_files_cnt + index_files_cnt);
+    files_holder.ReleaseFiles();
+    status = impl_->FilesToSearch(collection_id, files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt + raw_files_cnt + index_files_cnt);

     std::vector<size_t> ids;
-    for (auto& file : table_files) {
+    for (auto& file : files_holder.HoldFiles()) {
         ids.push_back(file.id_);
     }
-    size_t cnt = table_files.size();
-    table_files.clear();
-    status = impl_->FilesByID(ids, table_files);
-    ASSERT_EQ(table_files.size(), cnt);
+    size_t cnt = files_holder.HoldFiles().size();
+    files_holder.ReleaseFiles();
+    status = impl_->FilesByID(ids, files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), cnt);

-    table_files.clear();
+    files_holder.ReleaseFiles();
     ids = {9999999999UL};
-    status = impl_->FilesByID(ids, table_files);
-    ASSERT_EQ(table_files.size(), 0);
+    status = impl_->FilesByID(ids, files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), 0);

-    table_files.clear();
+    files_holder.ReleaseFiles();
     std::vector<int> file_types;
-    status = impl_->FilesByType(collection.collection_id_, file_types, table_files);
-    ASSERT_TRUE(table_files.empty());
+    status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
+    ASSERT_TRUE(files_holder.HoldFiles().empty());
     ASSERT_FALSE(status.ok());

     file_types = {
@@ -650,11 +654,11 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
         milvus::engine::meta::SegmentSchema::INDEX, milvus::engine::meta::SegmentSchema::RAW,
         milvus::engine::meta::SegmentSchema::BACKUP,
     };
-    status = impl_->FilesByType(collection.collection_id_, file_types, table_files);
+    status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
     ASSERT_TRUE(status.ok());

     uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt +
                          raw_files_cnt + to_index_files_cnt + index_files_cnt;
-    ASSERT_EQ(table_files.size(), total_cnt);
+    ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);

     status = impl_->DeleteCollectionFiles(collection_id);
     ASSERT_TRUE(status.ok());
@@ -670,15 +674,14 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
     status = impl_->CreateCollectionFile(table_file);

     std::vector<int> files_to_delete;
-    milvus::engine::meta::SegmentsSchema files_schema;
+    files_holder.ReleaseFiles();
     files_to_delete.push_back(milvus::engine::meta::SegmentSchema::TO_DELETE);
-    status = impl_->FilesByType(collection_id, files_to_delete, files_schema);
+    status = impl_->FilesByType(collection_id, files_to_delete, files_holder);
     ASSERT_TRUE(status.ok());

     table_file.collection_id_ = collection_id;
     table_file.file_type_ = milvus::engine::meta::SegmentSchema::TO_DELETE;
-    table_file.file_id_ = files_schema.front().file_id_;
-    milvus::engine::OngoingFileChecker::GetInstance().MarkOngoingFile(table_file);
+    table_file.file_id_ = files_holder.HoldFiles().front().file_id_;

     status = impl_->CleanUpFilesWithTTL(1UL);
     ASSERT_TRUE(status.ok());
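The CleanUpFilesWithTTL call above is the other half of the contract: a file flagged TO_DELETE may only be purged if nobody currently holds it. A toy sweep showing the skip-held-files idea (field names and the TO_DELETE value are assumptions for illustration; the real logic lives in SqliteMetaImpl/MySQLMetaImpl):

    #include <iterator>
    #include <string>
    #include <unordered_set>
    #include <vector>

    struct FileRecord {
        std::string file_id_;
        int file_type_ = 0;
    };
    constexpr int TO_DELETE = 4;  // assumed enum value, illustration only

    // Purge TO_DELETE files, but leave alone anything a reader still holds.
    void CleanUpWithTTL(std::vector<FileRecord>& files,
                        const std::unordered_set<std::string>& held_file_ids) {
        for (auto it = files.begin(); it != files.end();) {
            bool deletable =
                it->file_type_ == TO_DELETE && held_file_ids.count(it->file_id_) == 0;
            it = deletable ? files.erase(it) : std::next(it);
        }
    }

The removed MarkOngoingFile call is what FilesHolder now does implicitly: holding a file puts it in the held set, so the TTL sweep leaves it alone until UnmarkFile or ReleaseFiles.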
diff --git a/core/unittest/db/test_meta_mysql.cpp b/core/unittest/db/test_meta_mysql.cpp
index 3daec35056..8b0c4091b4 100644
--- a/core/unittest/db/test_meta_mysql.cpp
+++ b/core/unittest/db/test_meta_mysql.cpp
@@ -23,7 +23,6 @@
 #include
 #include
 #include
-#include

 const char* FAILED_CONNECT_SQL_SERVER = "Failed to connect to meta server(mysql)";
 const char* COLLECTION_ALREADY_EXISTS = "Collection already exists and it is in delete state, please wait a second";
@@ -231,44 +230,43 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
     ASSERT_TRUE(status.ok());

     std::vector<size_t> ids = {table_file.id_};
-    milvus::engine::meta::SegmentsSchema files;
-    status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files);
-    ASSERT_EQ(files.size(), 0UL);
+    milvus::engine::meta::FilesHolder files_holder;
+    status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), 0UL);

     FIU_ENABLE_FIU("MySQLMetaImpl.GetCollectionFiles.null_connection");
-    status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files);
+    status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files_holder);
     ASSERT_FALSE(status.ok());
     fiu_disable("MySQLMetaImpl.GetCollectionFiles.null_connection");

     FIU_ENABLE_FIU("MySQLMetaImpl.GetCollectionFiles.throw_exception");
-    status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files);
+    status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files_holder);
     ASSERT_FALSE(status.ok());
     fiu_disable("MySQLMetaImpl.GetCollectionFiles.throw_exception");

     ids.clear();
-    status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files);
+    status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files_holder);
     ASSERT_TRUE(status.ok());

     table_file.collection_id_ = collection.collection_id_;
     table_file.file_type_ = milvus::engine::meta::SegmentSchema::RAW;
     status = impl_->CreateCollectionFile(table_file);
     ids = {table_file.id_};
-    status = impl_->FilesByID(ids, files);
-    ASSERT_EQ(files.size(), 1UL);
+    status = impl_->FilesByID(ids, files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), 1UL);

     table_file.collection_id_ = collection.collection_id_;
     table_file.file_type_ = milvus::engine::meta::SegmentSchema::TO_DELETE;
     status = impl_->CreateCollectionFile(table_file);

-    std::vector<int> files_to_delete;
-    files_to_delete.push_back(milvus::engine::meta::SegmentSchema::TO_DELETE);
-    status = impl_->FilesByType(collection_id, files_to_delete, files_schema);
+    files_holder.ReleaseFiles();
+    std::vector<int> files_to_delete = {milvus::engine::meta::SegmentSchema::TO_DELETE};
+    status = impl_->FilesByType(collection_id, files_to_delete, files_holder);
     ASSERT_TRUE(status.ok());

     table_file.collection_id_ = collection_id;
     table_file.file_type_ = milvus::engine::meta::SegmentSchema::TO_DELETE;
-    table_file.file_id_ = files_schema.front().file_id_;
-    milvus::engine::OngoingFileChecker::GetInstance().MarkOngoingFile(table_file);
+    table_file.file_id_ = files_holder.HoldFiles().front().file_id_;

     status = impl_->CleanUpFilesWithTTL(1UL);
     ASSERT_TRUE(status.ok());
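The null_connection/throw_exception names exercised above are libfiu fail points. The pattern, reduced to its core (LoadMeta and the point name are invented for the example; FIU_ENABLE_FIU in these tests presumably wraps fiu_enable):

    #include <fiu-control.h>
    #include <fiu-local.h>

    #include <stdexcept>

    int LoadMeta() {
        // Planted in production code: a no-op unless the point is enabled.
        fiu_do_on("Example.LoadMeta.throw_exception", throw std::runtime_error("injected"));
        return 0;  // normal path
    }

    // In a test body:
    //     fiu_init(0);
    //     fiu_enable("Example.LoadMeta.throw_exception", 1, NULL, 0);
    //     // ... call LoadMeta() and assert the error path is taken ...
    //     fiu_disable("Example.LoadMeta.throw_exception");

The point stays enabled until fiu_disable, which is why every test here pairs each FIU_ENABLE_FIU with a matching fiu_disable before moving on.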
@@ -306,8 +304,10 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_ROW_COUNT_TEST) {
     ASSERT_EQ(table_file.row_count_, cnt);

     std::vector<size_t> ids = {table_file.id_};
-    milvus::engine::meta::SegmentsSchema schemas;
-    status = impl_->GetCollectionFiles(collection_id, ids, schemas);
+    milvus::engine::meta::FilesHolder files_holder;
+    status = impl_->GetCollectionFiles(collection_id, ids, files_holder);
+
+    milvus::engine::meta::SegmentsSchema& schemas = files_holder.HoldFiles();
     ASSERT_EQ(schemas.size(), 1UL);
     ASSERT_EQ(table_file.row_count_, schemas[0].row_count_);
     ASSERT_EQ(table_file.file_id_, schemas[0].file_id_);
@@ -371,11 +371,11 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) {
     impl.Archive();

     int i = 0;
-    milvus::engine::meta::SegmentsSchema files_get;
-    status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_get);
+    milvus::engine::meta::FilesHolder files_holder;
+    status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_holder);
     ASSERT_TRUE(status.ok());

-    for (auto& file : files_get) {
+    for (auto& file : files_holder.HoldFiles()) {
         if (days[i] < days_num) {
             ASSERT_EQ(file.file_type_, milvus::engine::meta::SegmentSchema::NEW);
         }
@@ -385,21 +385,22 @@
     std::vector<int> file_types = {
         (int)milvus::engine::meta::SegmentSchema::NEW,
     };
-    milvus::engine::meta::SegmentsSchema table_files;
-    status = impl.FilesByType(collection_id, file_types, table_files);
-    ASSERT_FALSE(table_files.empty());
+
+    files_holder.ReleaseFiles();
+    status = impl.FilesByType(collection_id, file_types, files_holder);
+    ASSERT_FALSE(files_holder.HoldFiles().empty());

     FIU_ENABLE_FIU("MySQLMetaImpl.FilesByType.null_connection");
-    table_files.clear();
-    status = impl.FilesByType(collection_id, file_types, table_files);
+    files_holder.ReleaseFiles();
+    status = impl.FilesByType(collection_id, file_types, files_holder);
     ASSERT_FALSE(status.ok());
-    ASSERT_TRUE(table_files.empty());
+    ASSERT_TRUE(files_holder.HoldFiles().empty());
     fiu_disable("MySQLMetaImpl.FilesByType.null_connection");

     FIU_ENABLE_FIU("MySQLMetaImpl.FilesByType.throw_exception");
-    status = impl.FilesByType(collection_id, file_types, table_files);
+    status = impl.FilesByType(collection_id, file_types, files_holder);
     ASSERT_FALSE(status.ok());
-    ASSERT_TRUE(table_files.empty());
+    ASSERT_TRUE(files_holder.HoldFiles().empty());
     fiu_disable("MySQLMetaImpl.FilesByType.throw_exception");

     status = impl.UpdateCollectionFilesToIndex(collection_id);
@@ -463,11 +464,11 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DISK) {
     impl.Archive();

     int i = 0;
-    milvus::engine::meta::SegmentsSchema files_get;
-    status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_get);
+    milvus::engine::meta::FilesHolder files_holder;
+    status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_holder);
     ASSERT_TRUE(status.ok());

-    for (auto& file : files_get) {
+    for (auto& file : files_holder.HoldFiles()) {
         if (i >= 5) {
             ASSERT_EQ(file.file_type_, milvus::engine::meta::SegmentSchema::NEW);
         }
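For the Archive cases above: ARCHIVE_TEST_DAYS asserts that files younger than the configured threshold keep their NEW type, i.e. only sufficiently old files get flagged. A toy version of that policy (type values and field names are assumptions for illustration; the real criteria are evaluated inside the meta implementations):

    #include <cstdint>
    #include <vector>

    struct FileMeta {
        int64_t age_days_ = 0;
        int file_type_ = 0;  // 0 stands in for NEW in this sketch
    };

    void ArchiveByDays(std::vector<FileMeta>& files, int64_t days_threshold) {
        const int kToDelete = 4;  // assumed TO_DELETE value
        for (auto& f : files) {
            if (f.age_days_ >= days_threshold) {
                f.file_type_ = kToDelete;  // old enough: flag for deletion
            }
            // newer files are left untouched, which is what the test asserts
        }
    }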
@@ -596,83 +597,80 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
     ASSERT_TRUE(status.ok());
     ASSERT_EQ(total_row_count, raw_files_cnt + to_index_files_cnt + index_files_cnt);

-    milvus::engine::meta::SegmentsSchema files;
-    status = impl_->FilesToIndex(files);
-    ASSERT_EQ(files.size(), to_index_files_cnt);
+    milvus::engine::meta::FilesHolder files_holder;
+    status = impl_->FilesToIndex(files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt);

-    milvus::engine::meta::SegmentsSchema table_files;
-    status = impl_->FilesToMerge(collection.collection_id_, table_files);
-    ASSERT_EQ(table_files.size(), raw_files_cnt);
+    files_holder.ReleaseFiles();
+    status = impl_->FilesToMerge(collection.collection_id_, files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), raw_files_cnt);

+    files_holder.ReleaseFiles();
     FIU_ENABLE_FIU("MySQLMetaImpl.FilesToMerge.null_connection");
-    status = impl_->FilesToMerge(collection.collection_id_, table_files);
+    status = impl_->FilesToMerge(collection.collection_id_, files_holder);
     ASSERT_FALSE(status.ok());
     fiu_disable("MySQLMetaImpl.FilesToMerge.null_connection");

+    files_holder.ReleaseFiles();
     FIU_ENABLE_FIU("MySQLMetaImpl.FilesToMerge.throw_exception");
-    status = impl_->FilesToMerge(collection.collection_id_, table_files);
+    status = impl_->FilesToMerge(collection.collection_id_, files_holder);
     ASSERT_FALSE(status.ok());
     fiu_disable("MySQLMetaImpl.FilesToMerge.throw_exception");

-    status = impl_->FilesToMerge("notexist", table_files);
+    files_holder.ReleaseFiles();
+    status = impl_->FilesToMerge("notexist", files_holder);
     ASSERT_EQ(status.code(), milvus::DB_NOT_FOUND);

     table_file.file_type_ = milvus::engine::meta::SegmentSchema::RAW;
     table_file.file_size_ = milvus::engine::GB + 1;
     status = impl_->UpdateCollectionFile(table_file);
     ASSERT_TRUE(status.ok());
-#if 0
-    {
-        //skip large files
-        milvus::engine::meta::SegmentsSchema table_files;
-        status = impl_->FilesToMerge(collection.collection_id_, table_files);
-        ASSERT_EQ(dated_files[table_file.date_].size(), raw_files_cnt);
-    }
-#endif
-    status = impl_->FilesToIndex(files);
-    ASSERT_EQ(files.size(), to_index_files_cnt);
+
+    files_holder.ReleaseFiles();
+    status = impl_->FilesToIndex(files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt);

     FIU_ENABLE_FIU("MySQLMetaImpl.DescribeCollection.throw_exception");
-    status = impl_->FilesToIndex(files);
+    status = impl_->FilesToIndex(files_holder);
     ASSERT_FALSE(status.ok());
     fiu_disable("MySQLMetaImpl.DescribeCollection.throw_exception");

     FIU_ENABLE_FIU("MySQLMetaImpl.FilesToIndex.null_connection");
-    status = impl_->FilesToIndex(files);
+    status = impl_->FilesToIndex(files_holder);
     ASSERT_FALSE(status.ok());
     fiu_disable("MySQLMetaImpl.FilesToIndex.null_connection");

     FIU_ENABLE_FIU("MySQLMetaImpl.FilesToIndex.throw_exception");
-    status = impl_->FilesToIndex(files);
+    status = impl_->FilesToIndex(files_holder);
     ASSERT_FALSE(status.ok());
     fiu_disable("MySQLMetaImpl.FilesToIndex.throw_exception");

-    table_files.clear();
-    status = impl_->FilesToSearch(collection_id, table_files);
-    ASSERT_EQ(table_files.size(), to_index_files_cnt + raw_files_cnt + index_files_cnt);
+    files_holder.ReleaseFiles();
+    status = impl_->FilesToSearch(collection_id, files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt + raw_files_cnt + index_files_cnt);

-    table_files.clear();
+    files_holder.ReleaseFiles();
     std::vector<size_t> ids = {9999999999UL};
-    status = impl_->FilesByID(ids, table_files);
-    ASSERT_EQ(table_files.size(), 0);
+    status = impl_->FilesByID(ids, files_holder);
+    ASSERT_EQ(files_holder.HoldFiles().size(), 0);

     FIU_ENABLE_FIU("MySQLMetaImpl.FilesToSearch.null_connection");
-    status = impl_->FilesToSearch(collection_id, table_files);
+    status = impl_->FilesToSearch(collection_id, files_holder);
     ASSERT_FALSE(status.ok());
     fiu_disable("MySQLMetaImpl.FilesToSearch.null_connection");

     FIU_ENABLE_FIU("MySQLMetaImpl.FilesToSearch.throw_exception");
-    status = impl_->FilesToSearch(collection_id, table_files);
+    status = impl_->FilesToSearch(collection_id, files_holder);
     ASSERT_FALSE(status.ok());
     fiu_disable("MySQLMetaImpl.FilesToSearch.throw_exception");

-    status = impl_->FilesToSearch("notexist", table_files);
+    status = impl_->FilesToSearch("notexist", files_holder);
     ASSERT_EQ(status.code(), milvus::DB_NOT_FOUND);

-    table_files.clear();
+    files_holder.ReleaseFiles();
     std::vector<int> file_types;
-    status = impl_->FilesByType(collection.collection_id_, file_types, table_files);
-    ASSERT_TRUE(table_files.empty());
+    status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
+    ASSERT_TRUE(files_holder.HoldFiles().empty());
     ASSERT_FALSE(status.ok());

     file_types = {
@@ -681,11 +679,11 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
         milvus::engine::meta::SegmentSchema::INDEX, milvus::engine::meta::SegmentSchema::RAW,
         milvus::engine::meta::SegmentSchema::BACKUP,
     };
-    status = impl_->FilesByType(collection.collection_id_, file_types, table_files);
+    status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
     ASSERT_TRUE(status.ok());

     uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt +
                          raw_files_cnt + to_index_files_cnt + index_files_cnt;
-    ASSERT_EQ(table_files.size(), total_cnt);
+    ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);

     FIU_ENABLE_FIU("MySQLMetaImpl.DeleteCollectionFiles.null_connection");
     status = impl_->DeleteCollectionFiles(collection_id);
diff --git a/core/unittest/db/test_misc.cpp b/core/unittest/db/test_misc.cpp
index 7d9450f6cf..6035bce740 100644
--- a/core/unittest/db/test_misc.cpp
+++ b/core/unittest/db/test_misc.cpp
@@ -17,7 +17,6 @@
 #include "db/IDGenerator.h"
 #include "db/IndexFailedChecker.h"
-#include "db/OngoingFileChecker.h"
 #include "db/Options.h"
 #include "db/Utils.h"
 #include "db/engine/EngineFactory.h"
@@ -209,31 +208,6 @@ TEST(DBMiscTest, CHECKER_TEST) {
         checker.GetErrMsgForCollection("bbb", err_msg);
         ASSERT_EQ(err_msg, "5001 fail");
     }
-
-    {
-        milvus::engine::OngoingFileChecker& checker = milvus::engine::OngoingFileChecker::GetInstance();
-        milvus::engine::meta::SegmentSchema schema;
-        schema.collection_id_ = "aaa";
-        schema.file_id_ = "5000";
-        checker.MarkOngoingFile(schema);
-
-        ASSERT_TRUE(checker.IsIgnored(schema));
-
-        schema.collection_id_ = "bbb";
-        schema.file_id_ = "5001";
-        milvus::engine::meta::SegmentsSchema table_files = {schema};
-        checker.MarkOngoingFiles(table_files);
-
-        ASSERT_TRUE(checker.IsIgnored(schema));
-
-        checker.UnmarkOngoingFile(schema);
-        ASSERT_FALSE(checker.IsIgnored(schema));
-
-        schema.collection_id_ = "aaa";
-        schema.file_id_ = "5000";
-        checker.UnmarkOngoingFile(schema);
-        ASSERT_FALSE(checker.IsIgnored(schema));
-    }
 }

 TEST(DBMiscTest, IDGENERATOR_TEST) {
diff --git a/core/unittest/db/test_search_by_id.cpp b/core/unittest/db/test_search_by_id.cpp
index d26802469c..c5ed9283a4 100644
--- a/core/unittest/db/test_search_by_id.cpp
+++ b/core/unittest/db/test_search_by_id.cpp
@@ -108,7 +108,6 @@ TEST_F(SearchByIdTest, BASIC_TEST) {
         ids_to_search.emplace_back(index);
     }

-    // std::this_thread::sleep_for(std::chrono::seconds(3));  // ensure raw data write to disk
     stat = db_->Flush();
     ASSERT_TRUE(stat.ok());

@@ -156,6 +155,24 @@ TEST_F(SearchByIdTest, BASIC_TEST) {
             ASSERT_LT(result_distances[topk * i], 1e-3);
         }
     }
+
+    // duplicate id search
+    ids_to_search.clear();
+    ids_to_search.push_back(1);
+    ids_to_search.push_back(1);
+
+    stat = db_->QueryByIDs(dummy_context_,
+                           collection_info.collection_id_,
+                           tags,
+                           topk,
+                           json_params,
+                           ids_to_search,
+                           result_ids,
+                           result_distances);
+    ASSERT_EQ(result_ids.size(), ids_to_search.size() * topk);
+    ASSERT_EQ(result_distances.size(), ids_to_search.size() * topk);
+
+    CheckQueryResult(ids_to_search, topk, result_ids, result_distances);
 }

 TEST_F(SearchByIdTest, WITH_INDEX_TEST) {
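The new assertions above pin down the duplicate-id contract of QueryByIDs that bug #2131 violated: one topk-sized result block per input id, in input order, so duplicated ids must produce identical blocks rather than being deduplicated. A stand-alone check in the spirit of CheckQueryResult (simplified types; this is not the helper from the test file):

    #include <cassert>
    #include <cstdint>
    #include <vector>

    void CheckDuplicateIdLayout(const std::vector<int64_t>& ids_to_search, int64_t topk,
                                const std::vector<int64_t>& result_ids) {
        // One block of topk hits per query id, duplicates included.
        assert(result_ids.size() == ids_to_search.size() * static_cast<size_t>(topk));
        for (size_t i = 0; i < ids_to_search.size(); ++i) {
            // The best match of each block is the query vector itself.
            assert(result_ids[i * static_cast<size_t>(topk)] == ids_to_search[i]);
        }
    }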
@@ -508,7 +525,6 @@ TEST_F(SearchByIdTest, BINARY_TEST) {
         ids_to_search.emplace_back(index);
     }

-    // std::this_thread::sleep_for(std::chrono::seconds(3));  // ensure raw data write to disk
     stat = db_->Flush();
     ASSERT_TRUE(stat.ok());

@@ -577,4 +593,22 @@ TEST_F(SearchByIdTest, BINARY_TEST) {
             ASSERT_LT(result_distances[topk * i], 1e-3);
         }
     }
+
+    // duplicate id search
+    ids_to_search.clear();
+    ids_to_search.push_back(1);
+    ids_to_search.push_back(1);
+
+    stat = db_->QueryByIDs(dummy_context_,
+                           collection_info.collection_id_,
+                           tags,
+                           topk,
+                           json_params,
+                           ids_to_search,
+                           result_ids,
+                           result_distances);
+    ASSERT_EQ(result_ids.size(), ids_to_search.size() * topk);
+    ASSERT_EQ(result_distances.size(), ids_to_search.size() * topk);
+
+    CheckQueryResult(ids_to_search, topk, result_ids, result_distances);
 }