From 930097128b8067e925d7cd84d1f48b16aafec869 Mon Sep 17 00:00:00 2001 From: groot Date: Fri, 29 Nov 2019 10:30:10 +0800 Subject: [PATCH] #596 Frequently insert operation cost too much disk space --- CHANGELOG.md | 1 + core/src/db/DBImpl.cpp | 126 ++++++--------------------- core/src/db/DBImpl.h | 30 ++----- core/src/db/IndexFailedChecker.cpp | 109 +++++++++++++++++++++++ core/src/db/IndexFailedChecker.h | 56 ++++++++++++ core/src/db/OngoingFileChecker.cpp | 98 +++++++++++++++++++++ core/src/db/OngoingFileChecker.h | 61 +++++++++++++ core/src/db/meta/Meta.h | 4 +- core/src/db/meta/MetaTypes.h | 4 + core/src/db/meta/MySQLMetaImpl.cpp | 73 ++++++++-------- core/src/db/meta/MySQLMetaImpl.h | 4 +- core/src/db/meta/SqliteMetaImpl.cpp | 29 +++++- core/src/db/meta/SqliteMetaImpl.h | 4 +- core/unittest/db/test_meta.cpp | 3 +- core/unittest/db/test_meta_mysql.cpp | 3 +- 15 files changed, 434 insertions(+), 171 deletions(-) create mode 100644 core/src/db/IndexFailedChecker.cpp create mode 100644 core/src/db/IndexFailedChecker.h create mode 100644 core/src/db/OngoingFileChecker.cpp create mode 100644 core/src/db/OngoingFileChecker.h diff --git a/CHANGELOG.md b/CHANGELOG.md index fab5bf3bd0..7c44e9cbe8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#545 - Avoid dead circle of build index thread when error occurs - \#552 - Server down during building index_type: IVF_PQ using GPU-edition - \#561 - Milvus server should report exception/error message or terminate on mysql metadata backend error +- \#596 - Frequently insert operation cost too much disk space - \#599 - Build index log is incorrect ## Feature diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 8e08a850f8..c9ebed6378 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -52,8 +52,6 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1; constexpr uint64_t COMPACT_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_ACTION_INTERVAL = 1; -constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1; - static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!"); void @@ -370,7 +368,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { WaitMergeFileFinish(); // step 4: wait and build index - status = CleanFailedIndexFileOfTable(table_id); + status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id); status = BuildTableIndexRecursively(table_id, index); return status; @@ -504,7 +502,9 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi TimeRecorder rc(""); - // step 1: get files to search + // step 1: construct search job + auto status = ongoing_files_checker_.MarkOngoingFiles(files); + ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size(); scheduler::SearchJobPtr job = std::make_shared(k, nq, nprobe, vectors); for (auto& file : files) { @@ -512,9 +512,11 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi job->AddIndexFile(file_ptr); } - // step 2: put search task to scheduler + // step 2: put search job to scheduler and wait result scheduler::JobMgrInst::GetInstance()->Put(job); job->WaitResult(); + + status = ongoing_files_checker_.UnmarkOngoingFiles(files); if (!job->GetStatus().ok()) { return job->GetStatus(); } @@ -751,13 +753,15 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) { } for (auto& kv : raw_files) { - auto files = kv.second; + meta::TableFilesSchema& files = kv.second; if (files.size() 
< options_.merge_trigger_number_) { ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action"; continue; } + status = ongoing_files_checker_.MarkOngoingFiles(files); MergeFiles(table_id, kv.first, kv.second); + status = ongoing_files_checker_.UnmarkOngoingFiles(files); if (shutting_down_.load(std::memory_order_acquire)) { ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id; @@ -787,17 +791,18 @@ DBImpl::BackgroundCompaction(std::set table_ids) { meta_ptr_->Archive(); + meta::Table2FileIDs ignore_files = ongoing_files_checker_.GetOngoingFiles(); { uint64_t ttl = 10 * meta::SECOND; // default: file data will be erase from cache after few seconds - meta_ptr_->CleanUpCacheWithTTL(ttl); + meta_ptr_->CleanUpCacheWithTTL(ttl, ignore_files); } { - uint64_t ttl = 5 * meta::M_SEC; // default: file will be deleted after few minutes + uint64_t ttl = 20 * meta::SECOND; // default: file will be deleted after few seconds if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { - ttl = meta::D_SEC; + ttl = meta::H_SEC; } - meta_ptr_->CleanUpFilesWithTTL(ttl); + meta_ptr_->CleanUpFilesWithTTL(ttl, ignore_files); } // ENGINE_LOG_TRACE << " Background compaction thread exit"; @@ -838,9 +843,11 @@ DBImpl::BackgroundBuildIndex() { std::unique_lock lock(build_index_mutex_); meta::TableFilesSchema to_index_files; meta_ptr_->FilesToIndex(to_index_files); - Status status = IgnoreFailedIndexFiles(to_index_files); + Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files); if (!to_index_files.empty()) { + status = ongoing_files_checker_.MarkOngoingFiles(to_index_files); + // step 2: put build index task to scheduler std::map job2file_map; for (auto& file : to_index_files) { @@ -859,13 +866,15 @@ DBImpl::BackgroundBuildIndex() { Status status = job->GetStatus(); ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString(); - MarkFailedIndexFile(file_schema); + index_failed_checker_.MarkFailedIndexFile(file_schema); } else { - MarkSucceedIndexFile(file_schema); + index_failed_checker_.MarkSucceedIndexFile(file_schema); ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed."; } } + status = ongoing_files_checker_.UnmarkOngoingFiles(to_index_files); + ENGINE_LOG_DEBUG << "Background build index thread finished"; } @@ -894,6 +903,8 @@ DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector Status DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector& file_ids, const meta::DatesT& dates, meta::TableFilesSchema& files) { + ENGINE_LOG_DEBUG << "Collect files from table: " << table_id; + meta::DatePartionedTableFilesSchema date_files; auto status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, date_files); if (!status.ok()) { @@ -934,7 +945,7 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da if (dates.empty()) { status = mem_mgr_->EraseMemVector(table_id); // not allow insert status = meta_ptr_->DropTable(table_id); // soft delete table - CleanFailedIndexFileOfTable(table_id); + index_failed_checker_.CleanFailedIndexFileOfTable(table_id); // scheduler will determine when to delete table files auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource(); @@ -1014,7 +1025,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex GetFilesToBuildIndex(table_id, file_types, table_files); times++; - IgnoreFailedIndexFiles(table_files); + 
index_failed_checker_.IgnoreFailedIndexFiles(table_files); } // build index for partition @@ -1029,7 +1040,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex // failed to build index for some files, return error std::vector failed_files; - GetFailedIndexFileOfTable(table_id, failed_files); + index_failed_checker_.GetFailedIndexFileOfTable(table_id, failed_files); if (!failed_files.empty()) { std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) + ((failed_files.size() == 1) ? " file" : " files"); @@ -1047,7 +1058,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex Status DBImpl::DropTableIndexRecursively(const std::string& table_id) { ENGINE_LOG_DEBUG << "Drop index for table: " << table_id; - CleanFailedIndexFileOfTable(table_id); + index_failed_checker_.CleanFailedIndexFileOfTable(table_id); auto status = meta_ptr_->DropTableIndex(table_id); if (!status.ok()) { return status; @@ -1090,86 +1101,5 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c return Status::OK(); } -Status -DBImpl::CleanFailedIndexFileOfTable(const std::string& table_id) { - std::lock_guard lck(index_failed_mutex_); - index_failed_files_.erase(table_id); // rebuild failed index files for this table - - return Status::OK(); -} - -Status -DBImpl::GetFailedIndexFileOfTable(const std::string& table_id, std::vector& failed_files) { - failed_files.clear(); - std::lock_guard lck(index_failed_mutex_); - auto iter = index_failed_files_.find(table_id); - if (iter != index_failed_files_.end()) { - FileID2FailedTimes& failed_map = iter->second; - for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { - failed_files.push_back(it_file->first); - } - } - - return Status::OK(); -} - -Status -DBImpl::MarkFailedIndexFile(const meta::TableFileSchema& file) { - std::lock_guard lck(index_failed_mutex_); - - auto iter = index_failed_files_.find(file.table_id_); - if (iter == index_failed_files_.end()) { - FileID2FailedTimes failed_files; - failed_files.insert(std::make_pair(file.file_id_, 1)); - index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); - } else { - auto it_failed_files = iter->second.find(file.file_id_); - if (it_failed_files != iter->second.end()) { - it_failed_files->second++; - } else { - iter->second.insert(std::make_pair(file.file_id_, 1)); - } - } - - return Status::OK(); -} - -Status -DBImpl::MarkSucceedIndexFile(const meta::TableFileSchema& file) { - std::lock_guard lck(index_failed_mutex_); - - auto iter = index_failed_files_.find(file.table_id_); - if (iter != index_failed_files_.end()) { - iter->second.erase(file.file_id_); - } - - return Status::OK(); -} - -Status -DBImpl::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) { - std::lock_guard lck(index_failed_mutex_); - - // there could be some failed files belong to different table. - // some files may has failed for several times, no need to build index for these files. 
- // thus we can avoid dead circle for build index operation - for (auto it_file = table_files.begin(); it_file != table_files.end();) { - auto it_failed_files = index_failed_files_.find((*it_file).table_id_); - if (it_failed_files != index_failed_files_.end()) { - auto it_failed_file = it_failed_files->second.find((*it_file).file_id_); - if (it_failed_file != it_failed_files->second.end()) { - if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) { - it_file = table_files.erase(it_file); - continue; - } - } - } - - ++it_file; - } - - return Status::OK(); -} - } // namespace engine } // namespace milvus diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 3baac92c0a..5091160d63 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -18,8 +18,10 @@ #pragma once #include "DB.h" -#include "Types.h" -#include "src/db/insert/MemManager.h" +#include "db/IndexFailedChecker.h" +#include "db/OngoingFileChecker.h" +#include "db/Types.h" +#include "db/insert/MemManager.h" #include "utils/ThreadPool.h" #include @@ -178,21 +180,6 @@ class DBImpl : public DB { Status GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count); - Status - CleanFailedIndexFileOfTable(const std::string& table_id); - - Status - GetFailedIndexFileOfTable(const std::string& table_id, std::vector& failed_files); - - Status - MarkFailedIndexFile(const meta::TableFileSchema& file); - - Status - MarkSucceedIndexFile(const meta::TableFileSchema& file); - - Status - IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files); - private: const DBOptions options_; @@ -214,11 +201,10 @@ class DBImpl : public DB { std::list> index_thread_results_; std::mutex build_index_mutex_; - std::mutex index_failed_mutex_; - using FileID2FailedTimes = std::map; - using Table2FailedFiles = std::map; - Table2FailedFiles index_failed_files_; // file id mapping to failed times -}; // DBImpl + + IndexFailedChecker index_failed_checker_; + OngoingFileChecker ongoing_files_checker_; +}; // DBImpl } // namespace engine } // namespace milvus diff --git a/core/src/db/IndexFailedChecker.cpp b/core/src/db/IndexFailedChecker.cpp new file mode 100644 index 0000000000..f3667996a5 --- /dev/null +++ b/core/src/db/IndexFailedChecker.cpp @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "db/IndexFailedChecker.h" + +#include + +namespace milvus { +namespace engine { + +constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1; + +Status +IndexFailedChecker::CleanFailedIndexFileOfTable(const std::string& table_id) { + std::lock_guard lck(mutex_); + index_failed_files_.erase(table_id); // rebuild failed index files for this table + + return Status::OK(); +} + +Status +IndexFailedChecker::GetFailedIndexFileOfTable(const std::string& table_id, std::vector& failed_files) { + failed_files.clear(); + std::lock_guard lck(mutex_); + auto iter = index_failed_files_.find(table_id); + if (iter != index_failed_files_.end()) { + FileID2FailedTimes& failed_map = iter->second; + for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { + failed_files.push_back(it_file->first); + } + } + + return Status::OK(); +} + +Status +IndexFailedChecker::MarkFailedIndexFile(const meta::TableFileSchema& file) { + std::lock_guard lck(mutex_); + + auto iter = index_failed_files_.find(file.table_id_); + if (iter == index_failed_files_.end()) { + FileID2FailedTimes failed_files; + failed_files.insert(std::make_pair(file.file_id_, 1)); + index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); + } else { + auto it_failed_files = iter->second.find(file.file_id_); + if (it_failed_files != iter->second.end()) { + it_failed_files->second++; + } else { + iter->second.insert(std::make_pair(file.file_id_, 1)); + } + } + + return Status::OK(); +} + +Status +IndexFailedChecker::MarkSucceedIndexFile(const meta::TableFileSchema& file) { + std::lock_guard lck(mutex_); + + auto iter = index_failed_files_.find(file.table_id_); + if (iter != index_failed_files_.end()) { + iter->second.erase(file.file_id_); + } + + return Status::OK(); +} + +Status +IndexFailedChecker::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) { + std::lock_guard lck(mutex_); + + // there could be some failed files belong to different table. + // some files may has failed for several times, no need to build index for these files. + // thus we can avoid dead circle for build index operation + for (auto it_file = table_files.begin(); it_file != table_files.end();) { + auto it_failed_files = index_failed_files_.find((*it_file).table_id_); + if (it_failed_files != index_failed_files_.end()) { + auto it_failed_file = it_failed_files->second.find((*it_file).file_id_); + if (it_failed_file != it_failed_files->second.end()) { + if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) { + it_file = table_files.erase(it_file); + continue; + } + } + } + + ++it_file; + } + + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/IndexFailedChecker.h b/core/src/db/IndexFailedChecker.h new file mode 100644 index 0000000000..cdcc34a76d --- /dev/null +++ b/core/src/db/IndexFailedChecker.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "meta/Meta.h" +#include "utils/Status.h" + +#include +#include +#include +#include + +namespace milvus { +namespace engine { + +class IndexFailedChecker { + public: + Status + CleanFailedIndexFileOfTable(const std::string& table_id); + + Status + GetFailedIndexFileOfTable(const std::string& table_id, std::vector& failed_files); + + Status + MarkFailedIndexFile(const meta::TableFileSchema& file); + + Status + MarkSucceedIndexFile(const meta::TableFileSchema& file); + + Status + IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files); + + private: + std::mutex mutex_; + using FileID2FailedTimes = std::map; + using Table2FailedFiles = std::map; + Table2FailedFiles index_failed_files_; // file id mapping to failed times +}; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/OngoingFileChecker.cpp b/core/src/db/OngoingFileChecker.cpp new file mode 100644 index 0000000000..6bf966ba73 --- /dev/null +++ b/core/src/db/OngoingFileChecker.cpp @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "db/OngoingFileChecker.h" + +#include + +namespace milvus { +namespace engine { + +Status +OngoingFileChecker::MarkOngoingFile(const meta::TableFileSchema& table_file) { + std::lock_guard lck(mutex_); + return MarkOngoingFileNoLock(table_file); +} + +Status +OngoingFileChecker::MarkOngoingFiles(const meta::TableFilesSchema& table_files) { + std::lock_guard lck(mutex_); + + for (auto& table_file : table_files) { + MarkOngoingFileNoLock(table_file); + } + + return Status::OK(); +} + +Status +OngoingFileChecker::UnmarkOngoingFile(const meta::TableFileSchema& table_file) { + std::lock_guard lck(mutex_); + return UnmarkOngoingFileNoLock(table_file); +} + +Status +OngoingFileChecker::UnmarkOngoingFiles(const meta::TableFilesSchema& table_files) { + std::lock_guard lck(mutex_); + + for (auto& table_file : table_files) { + UnmarkOngoingFileNoLock(table_file); + } + + return Status::OK(); +} + +meta::Table2FileIDs +OngoingFileChecker::GetOngoingFiles() { + // return copy + // don't return reference(avoid multi-threads conflict) + return ongoing_files_; +} + +Status +OngoingFileChecker::MarkOngoingFileNoLock(const meta::TableFileSchema& table_file) { + if (table_file.table_id_.empty() || table_file.file_id_.empty()) { + return Status(DB_ERROR, "Invalid table files"); + } + + auto iter = ongoing_files_.find(table_file.table_id_); + if (iter == ongoing_files_.end()) { + meta::FileIDArray file_ids = {table_file.file_id_}; + ongoing_files_.insert(std::make_pair(table_file.table_id_, file_ids)); + } else { + iter->second.insert(table_file.file_id_); + } + + return Status::OK(); +} + +Status +OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_file) { + if (table_file.table_id_.empty() || table_file.file_id_.empty()) { + return Status(DB_ERROR, "Invalid table files"); + } + + auto iter = ongoing_files_.find(table_file.table_id_); + if (iter != ongoing_files_.end()) { + iter->second.erase(table_file.file_id_); + } + + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/OngoingFileChecker.h b/core/src/db/OngoingFileChecker.h new file mode 100644 index 0000000000..8f21a45045 --- /dev/null +++ b/core/src/db/OngoingFileChecker.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "meta/Meta.h" +#include "utils/Status.h" + +#include +#include +#include +#include + +namespace milvus { +namespace engine { + +class OngoingFileChecker { + public: + Status + MarkOngoingFile(const meta::TableFileSchema& table_file); + + Status + MarkOngoingFiles(const meta::TableFilesSchema& table_files); + + Status + UnmarkOngoingFile(const meta::TableFileSchema& table_file); + + Status + UnmarkOngoingFiles(const meta::TableFilesSchema& table_files); + + meta::Table2FileIDs + GetOngoingFiles(); + + private: + Status + MarkOngoingFileNoLock(const meta::TableFileSchema& table_file); + + Status + UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_file); + + private: + std::mutex mutex_; + meta::Table2FileIDs ongoing_files_; +}; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index bf46f02fea..f620087ad5 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -121,10 +121,10 @@ class Meta { CleanUpShadowFiles() = 0; virtual Status - CleanUpCacheWithTTL(uint64_t seconds) = 0; + CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) = 0; virtual Status - CleanUpFilesWithTTL(uint64_t seconds) = 0; + CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) = 0; virtual Status DropAll() = 0; diff --git a/core/src/db/meta/MetaTypes.h b/core/src/db/meta/MetaTypes.h index d98b74be7d..3e28658983 100644 --- a/core/src/db/meta/MetaTypes.h +++ b/core/src/db/meta/MetaTypes.h @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -97,6 +98,9 @@ using TableFileSchemaPtr = std::shared_ptr; using TableFilesSchema = std::vector; using DatePartionedTableFilesSchema = std::map; +using FileIDArray = std::set; +using Table2FileIDs = std::map; + } // namespace meta } // namespace engine } // namespace milvus diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index 7b53e6361a..d9ca892341 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -1639,7 +1639,8 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector& case (int)TableFileSchema::BACKUP: msg = msg + " backup files:" + std::to_string(backup_count); break; - default:break; + default: + break; } } ENGINE_LOG_DEBUG << msg; @@ -1782,7 +1783,7 @@ MySQLMetaImpl::CleanUpShadowFiles() { } Status -MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { +MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) { auto now = utils::GetMicroSecTimeStamp(); // erase deleted/backup files from cache @@ -1795,14 +1796,13 @@ MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } - mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date" - << " FROM " << META_TABLEFILES << " WHERE file_type IN (" - << std::to_string(TableFileSchema::TO_DELETE) << "," - << std::to_string(TableFileSchema::BACKUP) << ")" - << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; + mysqlpp::Query query = connectionPtr->query(); + query << "SELECT id, table_id, file_id, date" + << " FROM " << META_TABLEFILES << " WHERE file_type IN (" << std::to_string(TableFileSchema::TO_DELETE) + << "," << std::to_string(TableFileSchema::BACKUP) << ")" + << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; - mysqlpp::StoreQueryResult res = 
cleanUpFilesWithTTLQuery.store(); + mysqlpp::StoreQueryResult res = query.store(); TableFileSchema table_file; std::vector idsToDelete; @@ -1824,7 +1824,7 @@ MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { } Status -MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { +MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) { auto now = utils::GetMicroSecTimeStamp(); std::set table_ids; @@ -1839,15 +1839,14 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } - mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date" - << " FROM " << META_TABLEFILES - << " WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) - << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; + mysqlpp::Query query = connectionPtr->query(); + query << "SELECT id, table_id, file_id, date" + << " FROM " << META_TABLEFILES << " WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) + << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; - ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); - mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + mysqlpp::StoreQueryResult res = query.store(); TableFileSchema table_file; std::vector idsToDelete; @@ -1874,13 +1873,12 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { std::string idsToDeleteStr = idsToDeleteSS.str(); idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR " - cleanUpFilesWithTTLQuery << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";"; + query << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";"; - ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); - if (!cleanUpFilesWithTTLQuery.exec()) { - return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", - cleanUpFilesWithTTLQuery.error()); + if (!query.exec()) { + return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", query.error()); } } @@ -1903,14 +1901,13 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } - mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT id, table_id" - << " FROM " << META_TABLES - << " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";"; + mysqlpp::Query query = connectionPtr->query(); + query << "SELECT id, table_id" + << " FROM " << META_TABLES << " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";"; - ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); - mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + mysqlpp::StoreQueryResult res = query.store(); int64_t remove_tables = 0; if (!res.empty()) { @@ -1926,13 +1923,12 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { } std::string idsToDeleteStr = idsToDeleteSS.str(); idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR " - cleanUpFilesWithTTLQuery << "DELETE FROM " << META_TABLES << " 
WHERE " << idsToDeleteStr << ";"; + query << "DELETE FROM " << META_TABLES << " WHERE " << idsToDeleteStr << ";"; - ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); - if (!cleanUpFilesWithTTLQuery.exec()) { - return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", - cleanUpFilesWithTTLQuery.error()); + if (!query.exec()) { + return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", query.error()); } } @@ -1957,14 +1953,13 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { } for (auto& table_id : table_ids) { - mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT file_id" - << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote - << table_id << ";"; + mysqlpp::Query query = connectionPtr->query(); + query << "SELECT file_id" + << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id << ";"; - ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); - mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + mysqlpp::StoreQueryResult res = query.store(); if (res.empty()) { utils::DeleteTablePath(options_, table_id); diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index e7697316af..d13ed7e043 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -120,10 +120,10 @@ class MySQLMetaImpl : public Meta { CleanUpShadowFiles() override; Status - CleanUpCacheWithTTL(uint64_t seconds) override; + CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override; Status - CleanUpFilesWithTTL(uint64_t seconds) override; + CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override; Status DropAll() override; diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 07f890d50a..e751b14555 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -1294,7 +1294,7 @@ SqliteMetaImpl::CleanUpShadowFiles() { } Status -SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { +SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) { auto now = utils::GetMicroSecTimeStamp(); // erase deleted/backup files from cache @@ -1309,6 +1309,7 @@ SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { (int)TableFileSchema::BACKUP, }; + // collect files to be erased auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_, @@ -1326,6 +1327,15 @@ SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { table_file.file_id_ = std::get<2>(file); table_file.date_ = std::get<3>(file); + // check if the file can be deleted + auto iter = ignore_files.find(table_file.table_id_); + if (iter != ignore_files.end()) { + if (iter->second.find(table_file.file_id_) != iter->second.end()) { + continue; // ignore this file, don't delete it + } + } + + // erase file data from cache utils::GetTableFilePath(options_, table_file); server::CommonUtil::EraseFromCache(table_file.location_); } @@ -1338,7 +1348,7 @@ SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { } Status -SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { +SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& 
ignore_files) { auto now = utils::GetMicroSecTimeStamp(); std::set table_ids; @@ -1349,6 +1359,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); + // collect files to be deleted auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_, @@ -1368,10 +1379,20 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { table_file.file_id_ = std::get<2>(file); table_file.date_ = std::get<3>(file); - utils::DeleteTableFilePath(options_, table_file); - ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_; + // check if the file can be deleted + auto iter = ignore_files.find(table_file.table_id_); + if (iter != ignore_files.end()) { + if (iter->second.find(table_file.file_id_) != iter->second.end()) { + continue; // ignore this file, don't delete it + } + } + + // delete file from meta ConnectorPtr->remove(table_file.id_); + // delete file from disk storage + utils::DeleteTableFilePath(options_, table_file); + ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_; table_ids.insert(table_file.table_id_); } return true; diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index 5581efe361..4ef4542022 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -120,10 +120,10 @@ class SqliteMetaImpl : public Meta { CleanUpShadowFiles() override; Status - CleanUpCacheWithTTL(uint64_t seconds) override; + CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override; Status - CleanUpFilesWithTTL(uint64_t seconds) override; + CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override; Status DropAll() override; diff --git a/core/unittest/db/test_meta.cpp b/core/unittest/db/test_meta.cpp index b89c73c296..c2a9f8e8bc 100644 --- a/core/unittest/db/test_meta.cpp +++ b/core/unittest/db/test_meta.cpp @@ -335,7 +335,8 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { status = impl_->DropTable(table_id); ASSERT_TRUE(status.ok()); - status = impl_->CleanUpFilesWithTTL(1UL); + milvus::engine::meta::Table2FileIDs ignore_files; + status = impl_->CleanUpFilesWithTTL(1UL, ignore_files); ASSERT_TRUE(status.ok()); } diff --git a/core/unittest/db/test_meta_mysql.cpp b/core/unittest/db/test_meta_mysql.cpp index 9a52a01b7b..cdd8aa32a8 100644 --- a/core/unittest/db/test_meta_mysql.cpp +++ b/core/unittest/db/test_meta_mysql.cpp @@ -349,7 +349,8 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) { status = impl_->DropTable(table_id); ASSERT_TRUE(status.ok()); - status = impl_->CleanUpFilesWithTTL(0UL); + milvus::engine::meta::Table2FileIDs ignore_files; + status = impl_->CleanUpFilesWithTTL(0UL, ignore_files); ASSERT_TRUE(status.ok()); }
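
Editor's note: the core of this patch is a guard pattern. Long-running jobs (search, merge, build-index) mark their files as "ongoing" via OngoingFileChecker, and the TTL cleanup paths (CleanUpCacheWithTTL / CleanUpFilesWithTTL) skip anything still marked, so soft-deleted files can now be reclaimed after ~20 seconds instead of minutes without pulling data out from under an active job. The snippet below is a minimal standalone illustration of that pattern only; the names (FileGuard, CleanUpWithTTL, Snapshot) are invented for the example and are not the Milvus API shown in the diff above.

// Minimal sketch of the "ongoing file" guard (hypothetical names; the real
// classes are OngoingFileChecker and meta::Table2FileIDs in the patch above).
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <vector>

using Table2FileIDs = std::map<std::string, std::set<std::string>>;

class FileGuard {
 public:
    // Mark files as in-use before a search/merge/index job touches them.
    void Mark(const std::string& table, const std::vector<std::string>& files) {
        std::lock_guard<std::mutex> lock(mutex_);
        for (const auto& f : files) {
            ongoing_[table].insert(f);
        }
    }

    // Unmark once the job finishes so cleanup may reclaim the files.
    void Unmark(const std::string& table, const std::vector<std::string>& files) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = ongoing_.find(table);
        if (it == ongoing_.end()) {
            return;
        }
        for (const auto& f : files) {
            it->second.erase(f);
        }
    }

    // Return a copy so the cleanup thread never races with Mark/Unmark.
    Table2FileIDs Snapshot() {
        std::lock_guard<std::mutex> lock(mutex_);
        return ongoing_;
    }

 private:
    std::mutex mutex_;
    Table2FileIDs ongoing_;
};

// TTL cleanup: reclaim every expired file except the ones still marked.
void CleanUpWithTTL(const std::map<std::string, std::vector<std::string>>& to_delete,
                    const Table2FileIDs& ignore_files) {
    for (const auto& [table, files] : to_delete) {
        auto it = ignore_files.find(table);
        for (const auto& file : files) {
            if (it != ignore_files.end() && it->second.count(file) > 0) {
                continue;  // file is in use by a running job, keep it for now
            }
            std::cout << "reclaiming " << table << "/" << file << "\n";
        }
    }
}

int main() {
    FileGuard guard;
    guard.Mark("tbl", {"file_1", "file_2"});  // a query is using these files
    CleanUpWithTTL({{"tbl", {"file_1", "file_2", "file_3"}}}, guard.Snapshot());
    // prints only: reclaiming tbl/file_3  (file_1 and file_2 are protected)
    guard.Unmark("tbl", {"file_1", "file_2"});  // query done, files are fair game
    CleanUpWithTTL({{"tbl", {"file_1", "file_2", "file_3"}}}, guard.Snapshot());
    return 0;
}

The retry cap extracted into IndexFailedChecker works along the same lines: a per-file failure counter is compared against INDEX_FAILED_RETRY_TIME and permanently failing files are dropped from the to-index list, so the background build loop cannot spin forever on the same broken file.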