mirror of https://gitee.com/milvus-io/milvus.git
refine code
This commit is contained in:
parent 4a183c4116
commit bc2dba26a2

core/src/cache/Cache.inl (vendored, 2 changes)
@@ -179,7 +179,7 @@ Cache<ItemObj>::print() {
     }

     SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count;
-#if 0
+#if 1
     for (auto it = lru_.begin(); it != lru_.end(); ++it) {
         SERVER_LOG_DEBUG << it->first;
     }
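The change above is a compile-time switch: with #if 1, Cache::print() now logs every cached key in LRU order, not just the count. A runtime-gated variant (hypothetical, not part of this commit) would avoid a rebuild to toggle the dump:

    // Hypothetical runtime toggle; dump_items is an invented config flag,
    // lru_ and SERVER_LOG_DEBUG as in Cache.inl above.
    if (dump_items) {
        for (auto it = lru_.begin(); it != lru_.end(); ++it) {
            SERVER_LOG_DEBUG << it->first;  // one line per cached key
        }
    }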
@@ -705,20 +705,27 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m

     // step 3: serialize to disk
     try {
-        index->Serialize();
+        status = index->Serialize();
+        if (!status.ok()) {
+            ENGINE_LOG_ERROR << status.message();
+        }
     } catch (std::exception& ex) {
         // typical error: out of disk space or permission denied
         std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
         ENGINE_LOG_ERROR << msg;
         status = Status(DB_ERROR, msg);
     }

     if (!status.ok()) {
+        // if failed to serialize merge file to disk
+        // typical error: out of disk space, out of memory or permission denied
         table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
         status = meta_ptr_->UpdateTableFile(table_file);
         ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

-        std::cout << "ERROR: failed to persist merged index file: " << table_file.location_
-                  << ", possible out of disk space" << std::endl;
+        ENGINE_LOG_ERROR << "Failed to persist merged file: " << table_file.location_
+                         << ", possible out of disk space or memory";

-        return Status(DB_ERROR, msg);
+        return status;
     }

     // step 4: update table files state
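The reworked step 3 funnels both failure channels of Serialize(), a non-OK return Status and a thrown exception, into the single if (!status.ok()) block that marks the half-written file TO_DELETE. A minimal sketch of the pattern, with a stubbed Status standing in for the engine type:

    #include <exception>
    #include <iostream>
    #include <string>

    // Stub standing in for the engine's Status type.
    struct Status {
        bool ok_ = true;
        std::string msg_;
        bool ok() const { return ok_; }
        const std::string& message() const { return msg_; }
    };

    // Both failure channels (bad return and exception) converge on one Status,
    // so the caller needs only a single cleanup path.
    template <typename SerializeFn>
    Status SerializeGuarded(SerializeFn&& serialize) {
        Status status;
        try {
            status = serialize();       // may return a non-OK Status...
        } catch (std::exception& ex) {  // ...or throw, e.g. out of disk space
            status = Status{false, std::string("Serialize exception: ") + ex.what()};
        }
        if (!status.ok()) {
            std::cerr << status.message() << '\n';  // single place to log and clean up
        }
        return status;
    }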
@@ -792,12 +799,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
     meta_ptr_->Archive();

-    {
-        uint64_t ttl = 10 * meta::SECOND;  // default: file data will be erase from cache after few seconds
-        meta_ptr_->CleanUpCacheWithTTL(ttl, &ongoing_files_checker_);
-    }
-
     {
-        uint64_t ttl = 20 * meta::SECOND;  // default: file will be deleted after few seconds
+        uint64_t ttl = 1 * meta::SECOND;  // default: file will be deleted after few seconds
         if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) {
             ttl = meta::H_SEC;
         }
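With the separate cache pass removed, CleanUpFilesWithTTL is the only TTL sweep left; its default drops from 20 seconds to 1 second, or meta::H_SEC in cluster-writable mode. The cutoff both meta implementations compute is plain microsecond arithmetic. A worked sketch, assuming US_PS is microseconds-per-second to match utils::GetMicroSecTimeStamp():

    #include <cstdint>
    #include <iostream>

    int main() {
        const uint64_t US_PS = 1000000;        // microseconds per second (assumed)
        uint64_t now = 1700000000ULL * US_PS;  // stand-in for utils::GetMicroSecTimeStamp()
        uint64_t ttl_seconds = 1;              // new default for file deletion

        // Rows with updated_time < cutoff are old enough to delete.
        uint64_t cutoff = now - ttl_seconds * US_PS;
        std::cout << "delete files untouched since " << cutoff << " us\n";
    }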
@@ -265,6 +265,11 @@ ExecutionEngineImpl::Serialize() {
     index_->set_size(PhysicalSize());
     ENGINE_LOG_DEBUG << "Finish serialize index file: " << location_ << " size: " << index_->Size();

+    if (index_->Size() == 0) {
+        std::string msg = "Failed to serialize file: " + location_ + " reason: out of disk space or memory";
+        status = Status(DB_ERROR, msg);
+    }
+
     return status;
 }
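ExecutionEngineImpl::Serialize() now treats a zero index_->Size() after writing as a failed serialization, typically disk exhaustion, and converts it into a DB_ERROR status instead of leaving an empty file that looks valid. A disk-level variant of the same guard (hypothetical, not what the code above does) would check the written file directly:

    #include <filesystem>
    #include <string>
    #include <system_error>

    // Hypothetical alternative: a missing or empty file after a write is
    // treated as a failed serialization.
    bool LooksTruncated(const std::string& path) {
        std::error_code ec;
        auto size = std::filesystem::file_size(path, ec);
        return ec || size == 0;
    }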
@@ -127,9 +127,6 @@ class Meta {
     virtual Status
     CleanUpShadowFiles() = 0;

-    virtual Status
-    CleanUpCacheWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) = 0;
-
     virtual Status
     CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) = 0;
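CleanUpCacheWithTTL leaves the Meta interface, but CleanUpFilter stays: CleanUpFilesWithTTL still consults filter->IsIgnored(table_file) to skip files that a query or merge holds open, which is what &ongoing_files_checker_ supplies in DBImpl. A minimal sketch of an implementation; the hook's exact signature is inferred from those call sites, so treat these declarations as assumptions:

    #include <mutex>
    #include <set>
    #include <string>

    // Inferred shapes: the diff only shows IsIgnored(table_file) being called.
    struct TableFileSchema {
        std::string file_id_;
    };

    struct CleanUpFilter {
        virtual ~CleanUpFilter() = default;
        virtual bool IsIgnored(const TableFileSchema& file) = 0;
    };

    // Protects files still referenced by running queries or merges, playing
    // the role of DBImpl's ongoing_files_checker_.
    class InUseFileFilter : public CleanUpFilter {
     public:
        void MarkInUse(const std::string& file_id) {
            std::lock_guard<std::mutex> lock(mutex_);
            in_use_.insert(file_id);
        }
        bool IsIgnored(const TableFileSchema& file) override {
            std::lock_guard<std::mutex> lock(mutex_);
            return in_use_.count(file.file_id_) > 0;  // in use => skip cleanup
        }

     private:
        std::mutex mutex_;
        std::set<std::string> in_use_;
    };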
@@ -1782,55 +1782,6 @@ MySQLMetaImpl::CleanUpShadowFiles() {
     return Status::OK();
 }

-Status
-MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, CleanUpFilter* filter) {
-    auto now = utils::GetMicroSecTimeStamp();
-
-    // erase deleted/backup files from cache
-    try {
-        server::MetricCollector metric;
-
-        mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
-
-        if (connectionPtr == nullptr) {
-            return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
-        }
-
-        mysqlpp::Query query = connectionPtr->query();
-        query << "SELECT id, table_id, file_id, date"
-              << " FROM " << META_TABLEFILES << " WHERE file_type IN (" << std::to_string(TableFileSchema::TO_DELETE)
-              << "," << std::to_string(TableFileSchema::BACKUP) << ")"
-              << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";";
-
-        mysqlpp::StoreQueryResult res = query.store();
-
-        TableFileSchema table_file;
-        std::vector<std::string> idsToDelete;
-
-        for (auto& resRow : res) {
-            table_file.id_ = resRow["id"];  // implicit conversion
-            resRow["table_id"].to_string(table_file.table_id_);
-            resRow["file_id"].to_string(table_file.file_id_);
-            table_file.date_ = resRow["date"];
-
-            // check if the file can be erased
-            if (filter && filter->IsIgnored(table_file)) {
-                ENGINE_LOG_DEBUG << "File:" << table_file.file_id_
-                                 << " currently is in use, not able to erase from cache now";
-                continue;  // ignore this file, don't erase it
-            }
-
-            // erase file data from cache
-            utils::GetTableFilePath(options_, table_file);
-            server::CommonUtil::EraseFromCache(table_file.location_);
-        }
-    } catch (std::exception& e) {
-        return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
-    }
-
-    return Status::OK();
-}
-
 Status
 MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) {
     auto now = utils::GetMicroSecTimeStamp();
@@ -1876,6 +1827,9 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) {
         utils::DeleteTableFilePath(options_, table_file);
         ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_;

+        // erase file data from cache
+        server::CommonUtil::EraseFromCache(table_file.location_);
+
         idsToDelete.emplace_back(std::to_string(table_file.id_));
         table_ids.insert(table_file.table_id_);
     }
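Net effect on the MySQL path: cache eviction is no longer a separate scheduled pass but happens inline, per file, between the disk delete and the batched meta-row delete. Condensed from the hunk above (the container name is hypothetical):

    // Per-file teardown order after this change:
    //   1. remove the bytes from disk,
    //   2. drop any copy cached in memory,
    //   3. queue the meta row for one batched DELETE.
    for (auto& table_file : expired_files) {                       // hypothetical name
        utils::DeleteTableFilePath(options_, table_file);          // 1. disk
        server::CommonUtil::EraseFromCache(table_file.location_);  // 2. cache
        idsToDelete.emplace_back(std::to_string(table_file.id_));  // 3. meta (batched)
    }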
@@ -119,9 +119,6 @@ class MySQLMetaImpl : public Meta {
     Status
     CleanUpShadowFiles() override;

-    Status
-    CleanUpCacheWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override;
-
     Status
     CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override;
@@ -1293,59 +1293,6 @@ SqliteMetaImpl::CleanUpShadowFiles() {
     return Status::OK();
 }

-Status
-SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds, CleanUpFilter* filter) {
-    auto now = utils::GetMicroSecTimeStamp();
-
-    // erase deleted/backup files from cache
-    try {
-        server::MetricCollector metric;
-
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-
-        std::vector<int> file_types = {
-            (int)TableFileSchema::TO_DELETE,
-            (int)TableFileSchema::BACKUP,
-        };
-
-        // collect files to be erased
-        auto files = ConnectorPtr->select(columns(&TableFileSchema::id_,
-                                                  &TableFileSchema::table_id_,
-                                                  &TableFileSchema::file_id_,
-                                                  &TableFileSchema::date_),
-                                          where(in(&TableFileSchema::file_type_, file_types) and
-                                                c(&TableFileSchema::updated_time_) < now - seconds * US_PS));
-
-        for (auto& file : files) {
-            TableFileSchema table_file;
-            table_file.id_ = std::get<0>(file);
-            table_file.table_id_ = std::get<1>(file);
-            table_file.file_id_ = std::get<2>(file);
-            table_file.date_ = std::get<3>(file);
-
-            // check if the file can be erased
-            if (filter && filter->IsIgnored(table_file)) {
-                ENGINE_LOG_DEBUG << "File:" << table_file.file_id_
-                                 << " currently is in use, not able to erase from cache now";
-                continue;  // ignore this file, don't erase it
-            }
-
-            // erase file data from cache
-            utils::GetTableFilePath(options_, table_file);
-            server::CommonUtil::EraseFromCache(table_file.location_);
-        }
-
-    } catch (std::exception& e) {
-        return HandleException("Encounter exception when clean cache", e.what());
-    }
-
-    return Status::OK();
-}
-
 Status
 SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) {
     auto now = utils::GetMicroSecTimeStamp();
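The removed Sqlite version was written against sqlite_orm's expression DSL: c(&TableFileSchema::updated_time_) lifts the member pointer into a column expression, so < and `and` build a WHERE clause instead of evaluating in C++. For reference, the shape of such a query; the header name and schema here are assumptions, not taken from the project:

    #include <cstdint>
    #include <vector>
    #include "sqlite_orm.h"  // header name may differ in the vendored copy

    using namespace sqlite_orm;

    struct FileRow {
        int id = 0;
        int file_type = 0;
        uint64_t updated_time = 0;
    };

    // SELECT id FROM TableFiles WHERE file_type IN (2, 3) AND updated_time < cutoff
    std::vector<int> ExpiredIds(uint64_t cutoff) {
        auto storage = make_storage("meta.sqlite",
                                    make_table("TableFiles",
                                               make_column("id", &FileRow::id, primary_key()),
                                               make_column("file_type", &FileRow::file_type),
                                               make_column("updated_time", &FileRow::updated_time)));
        return storage.select(&FileRow::id,
                              where(in(&FileRow::file_type, std::vector<int>{2, 3}) and
                                    c(&FileRow::updated_time) < cutoff));
    }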
@@ -1390,6 +1337,10 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) {

         // delete file from disk storage
         utils::DeleteTableFilePath(options_, table_file);
+
+        // erase from cache
+        server::CommonUtil::EraseFromCache(table_file.location_);

         ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_;
         table_ids.insert(table_file.table_id_);
     }
@@ -119,9 +119,6 @@ class SqliteMetaImpl : public Meta {
     Status
     CleanUpShadowFiles() override;

-    Status
-    CleanUpCacheWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override;
-
     Status
     CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override;
@@ -170,22 +170,26 @@ XBuildIndexTask::Execute() {
     try {
         status = index->Serialize();
         if (!status.ok()) {
-            ENGINE_LOG_DEBUG << "Failed to serilize index file: " << status.message();
+            ENGINE_LOG_ERROR << status.message();
         }
     } catch (std::exception& ex) {
         // typical error: out of disk space or permission denied
         std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
         ENGINE_LOG_ERROR << msg;
         status = Status(DB_ERROR, msg);
     }

     if (!status.ok()) {
+        // if failed to serialize index file to disk
+        // typical error: out of disk space, out of memory or permission denied
         table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
         status = meta_ptr->UpdateTableFile(table_file);
         ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";

         ENGINE_LOG_ERROR << "Failed to persist index file: " << table_file.location_
-                         << ", possible out of disk space";
+                         << ", possible out of disk space or memory";

         build_index_job->BuildIndexDone(to_index_id_);
-        build_index_job->GetStatus() = Status(DB_ERROR, msg);
+        build_index_job->GetStatus() = status;
         to_index_engine_ = nullptr;
         return;
     }
@@ -200,10 +204,8 @@ XBuildIndexTask::Execute() {

     engine::meta::TableFilesSchema update_files = {table_file, origin_file};

-    if (table_file.file_size_ > 0) {  // makesure index file is sucessfully serialized to disk
+    if (status.ok()) {  // make sure index file is successfully serialized to disk
         status = meta_ptr->UpdateTableFiles(update_files);
-    } else {
-        status = Status(DB_ERROR, "Illegal index file: out of disk space or memory");
     }

     if (status.ok()) {
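Two things change in these last hunks. First, the job records the actual Status from the failed step instead of a reconstructed Status(DB_ERROR, msg), so whoever waits on the job sees the real failure reason, whether it came from Serialize() or from UpdateTableFile(). Second, success is now judged by that same authoritative Status rather than inferred from file_size_ > 0, which makes the explicit else branch redundant. A condensed sketch of the propagation, with the job plumbing assumed rather than taken from this diff:

    #include <cstddef>
    #include <string>

    // Minimal stand-ins for the scheduler job plumbing (assumed, not shown in the diff).
    struct Status {
        bool ok_ = true;
        std::string msg_;
        bool ok() const { return ok_; }
    };

    struct BuildIndexJob {
        Status status_;                             // shared result slot
        Status& GetStatus() { return status_; }     // read by the waiting caller
        void BuildIndexDone(std::size_t /*id*/) {}  // wakes waiting callers
    };

    // On failure, hand the waiting side the real Status rather than a rebuilt one.
    void ReportFailure(BuildIndexJob& job, std::size_t id, const Status& status) {
        job.BuildIndexDone(id);
        job.GetStatus() = status;
    }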