#596 Frequently insert operation cost too much disk space

This commit is contained in:
groot 2019-11-29 10:30:10 +08:00
parent 98270022a2
commit 930097128b
15 changed files with 434 additions and 171 deletions

View File

@ -35,6 +35,7 @@ Please mark all change in change log and use the ticket from JIRA.
- \#545 - Avoid dead circle of build index thread when error occurs
- \#552 - Server down during building index_type: IVF_PQ using GPU-edition
- \#561 - Milvus server should report exception/error message or terminate on mysql metadata backend error
- \#596 - Frequently insert operation cost too much disk space
- \#599 - Build index log is incorrect
## Feature

View File

@ -52,8 +52,6 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1;
constexpr uint64_t COMPACT_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1;
static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!");
void
@ -370,7 +368,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
WaitMergeFileFinish();
// step 4: wait and build index
status = CleanFailedIndexFileOfTable(table_id);
status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
status = BuildTableIndexRecursively(table_id, index);
return status;
@ -504,7 +502,9 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
TimeRecorder rc("");
// step 1: get files to search
// step 1: construct search job
auto status = ongoing_files_checker_.MarkOngoingFiles(files);
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size();
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(k, nq, nprobe, vectors);
for (auto& file : files) {
@ -512,9 +512,11 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi
job->AddIndexFile(file_ptr);
}
// step 2: put search task to scheduler
// step 2: put search job to scheduler and wait result
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitResult();
status = ongoing_files_checker_.UnmarkOngoingFiles(files);
if (!job->GetStatus().ok()) {
return job->GetStatus();
}
@ -751,13 +753,15 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
for (auto& kv : raw_files) {
auto files = kv.second;
meta::TableFilesSchema& files = kv.second;
if (files.size() < options_.merge_trigger_number_) {
ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action";
continue;
}
status = ongoing_files_checker_.MarkOngoingFiles(files);
MergeFiles(table_id, kv.first, kv.second);
status = ongoing_files_checker_.UnmarkOngoingFiles(files);
if (shutting_down_.load(std::memory_order_acquire)) {
ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
@ -787,17 +791,18 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
meta_ptr_->Archive();
meta::Table2FileIDs ignore_files = ongoing_files_checker_.GetOngoingFiles();
{
uint64_t ttl = 10 * meta::SECOND; // default: file data will be erase from cache after few seconds
meta_ptr_->CleanUpCacheWithTTL(ttl);
meta_ptr_->CleanUpCacheWithTTL(ttl, ignore_files);
}
{
uint64_t ttl = 5 * meta::M_SEC; // default: file will be deleted after few minutes
uint64_t ttl = 20 * meta::SECOND; // default: file will be deleted after few seconds
if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
ttl = meta::D_SEC;
ttl = meta::H_SEC;
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
meta_ptr_->CleanUpFilesWithTTL(ttl, ignore_files);
}
// ENGINE_LOG_TRACE << " Background compaction thread exit";
@ -838,9 +843,11 @@ DBImpl::BackgroundBuildIndex() {
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
Status status = IgnoreFailedIndexFiles(to_index_files);
Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
if (!to_index_files.empty()) {
status = ongoing_files_checker_.MarkOngoingFiles(to_index_files);
// step 2: put build index task to scheduler
std::map<scheduler::BuildIndexJobPtr, scheduler::TableFileSchemaPtr> job2file_map;
for (auto& file : to_index_files) {
@ -859,13 +866,15 @@ DBImpl::BackgroundBuildIndex() {
Status status = job->GetStatus();
ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString();
MarkFailedIndexFile(file_schema);
index_failed_checker_.MarkFailedIndexFile(file_schema);
} else {
MarkSucceedIndexFile(file_schema);
index_failed_checker_.MarkSucceedIndexFile(file_schema);
ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed.";
}
}
status = ongoing_files_checker_.UnmarkOngoingFiles(to_index_files);
ENGINE_LOG_DEBUG << "Background build index thread finished";
}
@ -894,6 +903,8 @@ DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector<int>
Status
DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector<size_t>& file_ids, const meta::DatesT& dates,
meta::TableFilesSchema& files) {
ENGINE_LOG_DEBUG << "Collect files from table: " << table_id;
meta::DatePartionedTableFilesSchema date_files;
auto status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, date_files);
if (!status.ok()) {
@ -934,7 +945,7 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da
if (dates.empty()) {
status = mem_mgr_->EraseMemVector(table_id); // not allow insert
status = meta_ptr_->DropTable(table_id); // soft delete table
CleanFailedIndexFileOfTable(table_id);
index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
// scheduler will determine when to delete table files
auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource();
@ -1014,7 +1025,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
GetFilesToBuildIndex(table_id, file_types, table_files);
times++;
IgnoreFailedIndexFiles(table_files);
index_failed_checker_.IgnoreFailedIndexFiles(table_files);
}
// build index for partition
@ -1029,7 +1040,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
// failed to build index for some files, return error
std::vector<std::string> failed_files;
GetFailedIndexFileOfTable(table_id, failed_files);
index_failed_checker_.GetFailedIndexFileOfTable(table_id, failed_files);
if (!failed_files.empty()) {
std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) +
((failed_files.size() == 1) ? " file" : " files");
@ -1047,7 +1058,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex
Status
DBImpl::DropTableIndexRecursively(const std::string& table_id) {
ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
CleanFailedIndexFileOfTable(table_id);
index_failed_checker_.CleanFailedIndexFileOfTable(table_id);
auto status = meta_ptr_->DropTableIndex(table_id);
if (!status.ok()) {
return status;
@ -1090,86 +1101,5 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c
return Status::OK();
}
Status
DBImpl::CleanFailedIndexFileOfTable(const std::string& table_id) {
std::lock_guard<std::mutex> lck(index_failed_mutex_);
index_failed_files_.erase(table_id); // rebuild failed index files for this table
return Status::OK();
}
Status
DBImpl::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) {
failed_files.clear();
std::lock_guard<std::mutex> lck(index_failed_mutex_);
auto iter = index_failed_files_.find(table_id);
if (iter != index_failed_files_.end()) {
FileID2FailedTimes& failed_map = iter->second;
for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) {
failed_files.push_back(it_file->first);
}
}
return Status::OK();
}
Status
DBImpl::MarkFailedIndexFile(const meta::TableFileSchema& file) {
std::lock_guard<std::mutex> lck(index_failed_mutex_);
auto iter = index_failed_files_.find(file.table_id_);
if (iter == index_failed_files_.end()) {
FileID2FailedTimes failed_files;
failed_files.insert(std::make_pair(file.file_id_, 1));
index_failed_files_.insert(std::make_pair(file.table_id_, failed_files));
} else {
auto it_failed_files = iter->second.find(file.file_id_);
if (it_failed_files != iter->second.end()) {
it_failed_files->second++;
} else {
iter->second.insert(std::make_pair(file.file_id_, 1));
}
}
return Status::OK();
}
Status
DBImpl::MarkSucceedIndexFile(const meta::TableFileSchema& file) {
std::lock_guard<std::mutex> lck(index_failed_mutex_);
auto iter = index_failed_files_.find(file.table_id_);
if (iter != index_failed_files_.end()) {
iter->second.erase(file.file_id_);
}
return Status::OK();
}
Status
DBImpl::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) {
std::lock_guard<std::mutex> lck(index_failed_mutex_);
// there could be some failed files belong to different table.
// some files may has failed for several times, no need to build index for these files.
// thus we can avoid dead circle for build index operation
for (auto it_file = table_files.begin(); it_file != table_files.end();) {
auto it_failed_files = index_failed_files_.find((*it_file).table_id_);
if (it_failed_files != index_failed_files_.end()) {
auto it_failed_file = it_failed_files->second.find((*it_file).file_id_);
if (it_failed_file != it_failed_files->second.end()) {
if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) {
it_file = table_files.erase(it_file);
continue;
}
}
}
++it_file;
}
return Status::OK();
}
} // namespace engine
} // namespace milvus

View File

@ -18,8 +18,10 @@
#pragma once
#include "DB.h"
#include "Types.h"
#include "src/db/insert/MemManager.h"
#include "db/IndexFailedChecker.h"
#include "db/OngoingFileChecker.h"
#include "db/Types.h"
#include "db/insert/MemManager.h"
#include "utils/ThreadPool.h"
#include <atomic>
@ -178,21 +180,6 @@ class DBImpl : public DB {
Status
GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count);
Status
CleanFailedIndexFileOfTable(const std::string& table_id);
Status
GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files);
Status
MarkFailedIndexFile(const meta::TableFileSchema& file);
Status
MarkSucceedIndexFile(const meta::TableFileSchema& file);
Status
IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files);
private:
const DBOptions options_;
@ -214,11 +201,10 @@ class DBImpl : public DB {
std::list<std::future<void>> index_thread_results_;
std::mutex build_index_mutex_;
std::mutex index_failed_mutex_;
using FileID2FailedTimes = std::map<std::string, uint64_t>;
using Table2FailedFiles = std::map<std::string, FileID2FailedTimes>;
Table2FailedFiles index_failed_files_; // file id mapping to failed times
}; // DBImpl
IndexFailedChecker index_failed_checker_;
OngoingFileChecker ongoing_files_checker_;
}; // DBImpl
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "db/IndexFailedChecker.h"
#include <utility>
namespace milvus {
namespace engine {
constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1;
Status
IndexFailedChecker::CleanFailedIndexFileOfTable(const std::string& table_id) {
std::lock_guard<std::mutex> lck(mutex_);
index_failed_files_.erase(table_id); // rebuild failed index files for this table
return Status::OK();
}
Status
IndexFailedChecker::GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files) {
failed_files.clear();
std::lock_guard<std::mutex> lck(mutex_);
auto iter = index_failed_files_.find(table_id);
if (iter != index_failed_files_.end()) {
FileID2FailedTimes& failed_map = iter->second;
for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) {
failed_files.push_back(it_file->first);
}
}
return Status::OK();
}
Status
IndexFailedChecker::MarkFailedIndexFile(const meta::TableFileSchema& file) {
std::lock_guard<std::mutex> lck(mutex_);
auto iter = index_failed_files_.find(file.table_id_);
if (iter == index_failed_files_.end()) {
FileID2FailedTimes failed_files;
failed_files.insert(std::make_pair(file.file_id_, 1));
index_failed_files_.insert(std::make_pair(file.table_id_, failed_files));
} else {
auto it_failed_files = iter->second.find(file.file_id_);
if (it_failed_files != iter->second.end()) {
it_failed_files->second++;
} else {
iter->second.insert(std::make_pair(file.file_id_, 1));
}
}
return Status::OK();
}
Status
IndexFailedChecker::MarkSucceedIndexFile(const meta::TableFileSchema& file) {
std::lock_guard<std::mutex> lck(mutex_);
auto iter = index_failed_files_.find(file.table_id_);
if (iter != index_failed_files_.end()) {
iter->second.erase(file.file_id_);
}
return Status::OK();
}
Status
IndexFailedChecker::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
// there could be some failed files belong to different table.
// some files may has failed for several times, no need to build index for these files.
// thus we can avoid dead circle for build index operation
for (auto it_file = table_files.begin(); it_file != table_files.end();) {
auto it_failed_files = index_failed_files_.find((*it_file).table_id_);
if (it_failed_files != index_failed_files_.end()) {
auto it_failed_file = it_failed_files->second.find((*it_file).file_id_);
if (it_failed_file != it_failed_files->second.end()) {
if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) {
it_file = table_files.erase(it_file);
continue;
}
}
}
++it_file;
}
return Status::OK();
}
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "meta/Meta.h"
#include "utils/Status.h"
#include <map>
#include <mutex>
#include <string>
#include <vector>
namespace milvus {
namespace engine {
class IndexFailedChecker {
public:
Status
CleanFailedIndexFileOfTable(const std::string& table_id);
Status
GetFailedIndexFileOfTable(const std::string& table_id, std::vector<std::string>& failed_files);
Status
MarkFailedIndexFile(const meta::TableFileSchema& file);
Status
MarkSucceedIndexFile(const meta::TableFileSchema& file);
Status
IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files);
private:
std::mutex mutex_;
using FileID2FailedTimes = std::map<std::string, uint64_t>;
using Table2FailedFiles = std::map<std::string, FileID2FailedTimes>;
Table2FailedFiles index_failed_files_; // file id mapping to failed times
};
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "db/OngoingFileChecker.h"
#include <utility>
namespace milvus {
namespace engine {
Status
OngoingFileChecker::MarkOngoingFile(const meta::TableFileSchema& table_file) {
std::lock_guard<std::mutex> lck(mutex_);
return MarkOngoingFileNoLock(table_file);
}
Status
OngoingFileChecker::MarkOngoingFiles(const meta::TableFilesSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& table_file : table_files) {
MarkOngoingFileNoLock(table_file);
}
return Status::OK();
}
Status
OngoingFileChecker::UnmarkOngoingFile(const meta::TableFileSchema& table_file) {
std::lock_guard<std::mutex> lck(mutex_);
return UnmarkOngoingFileNoLock(table_file);
}
Status
OngoingFileChecker::UnmarkOngoingFiles(const meta::TableFilesSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& table_file : table_files) {
UnmarkOngoingFileNoLock(table_file);
}
return Status::OK();
}
meta::Table2FileIDs
OngoingFileChecker::GetOngoingFiles() {
// return copy
// don't return reference(avoid multi-threads conflict)
return ongoing_files_;
}
Status
OngoingFileChecker::MarkOngoingFileNoLock(const meta::TableFileSchema& table_file) {
if (table_file.table_id_.empty() || table_file.file_id_.empty()) {
return Status(DB_ERROR, "Invalid table files");
}
auto iter = ongoing_files_.find(table_file.table_id_);
if (iter == ongoing_files_.end()) {
meta::FileIDArray file_ids = {table_file.file_id_};
ongoing_files_.insert(std::make_pair(table_file.table_id_, file_ids));
} else {
iter->second.insert(table_file.file_id_);
}
return Status::OK();
}
Status
OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_file) {
if (table_file.table_id_.empty() || table_file.file_id_.empty()) {
return Status(DB_ERROR, "Invalid table files");
}
auto iter = ongoing_files_.find(table_file.table_id_);
if (iter != ongoing_files_.end()) {
iter->second.erase(table_file.file_id_);
}
return Status::OK();
}
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "meta/Meta.h"
#include "utils/Status.h"
#include <map>
#include <mutex>
#include <set>
#include <string>
namespace milvus {
namespace engine {
class OngoingFileChecker {
public:
Status
MarkOngoingFile(const meta::TableFileSchema& table_file);
Status
MarkOngoingFiles(const meta::TableFilesSchema& table_files);
Status
UnmarkOngoingFile(const meta::TableFileSchema& table_file);
Status
UnmarkOngoingFiles(const meta::TableFilesSchema& table_files);
meta::Table2FileIDs
GetOngoingFiles();
private:
Status
MarkOngoingFileNoLock(const meta::TableFileSchema& table_file);
Status
UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_file);
private:
std::mutex mutex_;
meta::Table2FileIDs ongoing_files_;
};
} // namespace engine
} // namespace milvus

View File

@ -121,10 +121,10 @@ class Meta {
CleanUpShadowFiles() = 0;
virtual Status
CleanUpCacheWithTTL(uint64_t seconds) = 0;
CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) = 0;
virtual Status
CleanUpFilesWithTTL(uint64_t seconds) = 0;
CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) = 0;
virtual Status
DropAll() = 0;

View File

@ -23,6 +23,7 @@
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>
@ -97,6 +98,9 @@ using TableFileSchemaPtr = std::shared_ptr<meta::TableFileSchema>;
using TableFilesSchema = std::vector<TableFileSchema>;
using DatePartionedTableFilesSchema = std::map<DateT, TableFilesSchema>;
using FileIDArray = std::set<std::string>;
using Table2FileIDs = std::map<std::string, FileIDArray>;
} // namespace meta
} // namespace engine
} // namespace milvus

View File

@ -1639,7 +1639,8 @@ MySQLMetaImpl::FilesByType(const std::string& table_id, const std::vector<int>&
case (int)TableFileSchema::BACKUP:
msg = msg + " backup files:" + std::to_string(backup_count);
break;
default:break;
default:
break;
}
}
ENGINE_LOG_DEBUG << msg;
@ -1782,7 +1783,7 @@ MySQLMetaImpl::CleanUpShadowFiles() {
}
Status
MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) {
auto now = utils::GetMicroSecTimeStamp();
// erase deleted/backup files from cache
@ -1795,14 +1796,13 @@ MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date"
<< " FROM " << META_TABLEFILES << " WHERE file_type IN ("
<< std::to_string(TableFileSchema::TO_DELETE) << ","
<< std::to_string(TableFileSchema::BACKUP) << ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
mysqlpp::Query query = connectionPtr->query();
query << "SELECT id, table_id, file_id, date"
<< " FROM " << META_TABLEFILES << " WHERE file_type IN (" << std::to_string(TableFileSchema::TO_DELETE)
<< "," << std::to_string(TableFileSchema::BACKUP) << ")"
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
mysqlpp::StoreQueryResult res = query.store();
TableFileSchema table_file;
std::vector<std::string> idsToDelete;
@ -1824,7 +1824,7 @@ MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
}
Status
MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) {
auto now = utils::GetMicroSecTimeStamp();
std::set<std::string> table_ids;
@ -1839,15 +1839,14 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date"
<< " FROM " << META_TABLEFILES
<< " WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE)
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
mysqlpp::Query query = connectionPtr->query();
query << "SELECT id, table_id, file_id, date"
<< " FROM " << META_TABLEFILES << " WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE)
<< " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
mysqlpp::StoreQueryResult res = query.store();
TableFileSchema table_file;
std::vector<std::string> idsToDelete;
@ -1874,13 +1873,12 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
std::string idsToDeleteStr = idsToDeleteSS.str();
idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR "
cleanUpFilesWithTTLQuery << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";";
query << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
if (!cleanUpFilesWithTTLQuery.exec()) {
return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL",
cleanUpFilesWithTTLQuery.error());
if (!query.exec()) {
return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", query.error());
}
}
@ -1903,14 +1901,13 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
}
mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT id, table_id"
<< " FROM " << META_TABLES
<< " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
mysqlpp::Query query = connectionPtr->query();
query << "SELECT id, table_id"
<< " FROM " << META_TABLES << " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
mysqlpp::StoreQueryResult res = query.store();
int64_t remove_tables = 0;
if (!res.empty()) {
@ -1926,13 +1923,12 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
}
std::string idsToDeleteStr = idsToDeleteSS.str();
idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR "
cleanUpFilesWithTTLQuery << "DELETE FROM " << META_TABLES << " WHERE " << idsToDeleteStr << ";";
query << "DELETE FROM " << META_TABLES << " WHERE " << idsToDeleteStr << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
if (!cleanUpFilesWithTTLQuery.exec()) {
return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL",
cleanUpFilesWithTTLQuery.error());
if (!query.exec()) {
return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", query.error());
}
}
@ -1957,14 +1953,13 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
}
for (auto& table_id : table_ids) {
mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query();
cleanUpFilesWithTTLQuery << "SELECT file_id"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote
<< table_id << ";";
mysqlpp::Query query = connectionPtr->query();
query << "SELECT file_id"
<< " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str();
mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store();
mysqlpp::StoreQueryResult res = query.store();
if (res.empty()) {
utils::DeleteTablePath(options_, table_id);

View File

@ -120,10 +120,10 @@ class MySQLMetaImpl : public Meta {
CleanUpShadowFiles() override;
Status
CleanUpCacheWithTTL(uint64_t seconds) override;
CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override;
Status
CleanUpFilesWithTTL(uint64_t seconds) override;
CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override;
Status
DropAll() override;

View File

@ -1294,7 +1294,7 @@ SqliteMetaImpl::CleanUpShadowFiles() {
}
Status
SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) {
auto now = utils::GetMicroSecTimeStamp();
// erase deleted/backup files from cache
@ -1309,6 +1309,7 @@ SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
(int)TableFileSchema::BACKUP,
};
// collect files to be erased
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
@ -1326,6 +1327,15 @@ SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
table_file.file_id_ = std::get<2>(file);
table_file.date_ = std::get<3>(file);
// check if the file can be deleted
auto iter = ignore_files.find(table_file.table_id_);
if (iter != ignore_files.end()) {
if (iter->second.find(table_file.file_id_) != iter->second.end()) {
continue; // ignore this file, don't delete it
}
}
// erase file data from cache
utils::GetTableFilePath(options_, table_file);
server::CommonUtil::EraseFromCache(table_file.location_);
}
@ -1338,7 +1348,7 @@ SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) {
}
Status
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) {
auto now = utils::GetMicroSecTimeStamp();
std::set<std::string> table_ids;
@ -1349,6 +1359,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// collect files to be deleted
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
&TableFileSchema::table_id_,
&TableFileSchema::file_id_,
@ -1368,10 +1379,20 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) {
table_file.file_id_ = std::get<2>(file);
table_file.date_ = std::get<3>(file);
utils::DeleteTableFilePath(options_, table_file);
ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
// check if the file can be deleted
auto iter = ignore_files.find(table_file.table_id_);
if (iter != ignore_files.end()) {
if (iter->second.find(table_file.file_id_) != iter->second.end()) {
continue; // ignore this file, don't delete it
}
}
// delete file from meta
ConnectorPtr->remove<TableFileSchema>(table_file.id_);
// delete file from disk storage
utils::DeleteTableFilePath(options_, table_file);
ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
table_ids.insert(table_file.table_id_);
}
return true;

View File

@ -120,10 +120,10 @@ class SqliteMetaImpl : public Meta {
CleanUpShadowFiles() override;
Status
CleanUpCacheWithTTL(uint64_t seconds) override;
CleanUpCacheWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override;
Status
CleanUpFilesWithTTL(uint64_t seconds) override;
CleanUpFilesWithTTL(uint64_t seconds, const Table2FileIDs& ignore_files) override;
Status
DropAll() override;

View File

@ -335,7 +335,8 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
status = impl_->DropTable(table_id);
ASSERT_TRUE(status.ok());
status = impl_->CleanUpFilesWithTTL(1UL);
milvus::engine::meta::Table2FileIDs ignore_files;
status = impl_->CleanUpFilesWithTTL(1UL, ignore_files);
ASSERT_TRUE(status.ok());
}

View File

@ -349,7 +349,8 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
status = impl_->DropTable(table_id);
ASSERT_TRUE(status.ok());
status = impl_->CleanUpFilesWithTTL(0UL);
milvus::engine::meta::Table2FileIDs ignore_files;
status = impl_->CleanUpFilesWithTTL(0UL, ignore_files);
ASSERT_TRUE(status.ok());
}