diff --git a/CHANGELOG.md b/CHANGELOG.md index 7956e513fc..f7d5bd9504 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#552 - Server down during building index_type: IVF_PQ using GPU-edition - \#561 - Milvus server should report exception/error message or terminate on mysql metadata backend error - \#579 - Build index hang in GPU version when gpu_resources disabled +- \#596 - Frequently insert operation cost too much disk space - \#599 - Build index log is incorrect - \#602 - Optimizer specify wrong gpu_id - \#606 - No log generated during building index with CPU @@ -59,6 +60,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#502 - C++ SDK support IVFPQ and SPTAG - \#560 - Add version in server config file - \#605 - Print more messages when server start +- \#644 - Add a new rpc command to get milvus build version whether cpu or gpu ## Improvement - \#255 - Add ivfsq8 test report detailed version diff --git a/core/src/cache/Cache.inl b/core/src/cache/Cache.inl index 9ebec7cfdd..5b68e54249 100644 --- a/core/src/cache/Cache.inl +++ b/core/src/cache/Cache.inl @@ -176,6 +176,11 @@ Cache::print() { { std::lock_guard lock(mutex_); cache_count = lru_.size(); +#if 0 + for (auto it = lru_.begin(); it != lru_.end(); ++it) { + SERVER_LOG_DEBUG << it->first; + } +#endif } SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count; diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index e2099739ed..8f8516770f 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -52,8 +52,6 @@ constexpr uint64_t METRIC_ACTION_INTERVAL = 1; constexpr uint64_t COMPACT_ACTION_INTERVAL = 1; constexpr uint64_t INDEX_ACTION_INTERVAL = 1; -constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1; - static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milsvus server is shutdown!"); void @@ -370,7 +368,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { WaitMergeFileFinish(); // step 4: wait and build index - status = CleanFailedIndexFileOfTable(table_id); + status = index_failed_checker_.CleanFailedIndexFileOfTable(table_id); status = BuildTableIndexRecursively(table_id, index); return status; @@ -504,7 +502,9 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi TimeRecorder rc(""); - // step 1: get files to search + // step 1: construct search job + auto status = ongoing_files_checker_.MarkOngoingFiles(files); + ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size(); scheduler::SearchJobPtr job = std::make_shared(k, nq, nprobe, vectors); for (auto& file : files) { @@ -512,9 +512,11 @@ DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& fi job->AddIndexFile(file_ptr); } - // step 2: put search task to scheduler + // step 2: put search job to scheduler and wait result scheduler::JobMgrInst::GetInstance()->Put(job); job->WaitResult(); + + status = ongoing_files_checker_.UnmarkOngoingFiles(files); if (!job->GetStatus().ok()) { return job->GetStatus(); } @@ -693,7 +695,6 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m auto file_schema = file; file_schema.file_type_ = meta::TableFileSchema::TO_DELETE; updated.push_back(file_schema); - ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_; index_size = index->Size(); if (index_size >= file_schema.index_file_size_) { @@ -703,20 +704,27 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, 
const m // step 3: serialize to disk try { - index->Serialize(); + status = index->Serialize(); + if (!status.ok()) { + ENGINE_LOG_ERROR << status.message(); + } } catch (std::exception& ex) { - // typical error: out of disk space or permition denied std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; + status = Status(DB_ERROR, msg); + } + if (!status.ok()) { + // if failed to serialize merge file to disk + // typical error: out of disk space, out of memory or permition denied table_file.file_type_ = meta::TableFileSchema::TO_DELETE; status = meta_ptr_->UpdateTableFile(table_file); ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; - std::cout << "ERROR: failed to persist merged index file: " << table_file.location_ - << ", possible out of disk space" << std::endl; + ENGINE_LOG_ERROR << "Failed to persist merged file: " << table_file.location_ + << ", possible out of disk space or memory"; - return Status(DB_ERROR, msg); + return status; } // step 4: update table files state @@ -751,13 +759,15 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) { } for (auto& kv : raw_files) { - auto files = kv.second; + meta::TableFilesSchema& files = kv.second; if (files.size() < options_.merge_trigger_number_) { ENGINE_LOG_TRACE << "Files number not greater equal than merge trigger number, skip merge action"; continue; } + status = ongoing_files_checker_.MarkOngoingFiles(files); MergeFiles(table_id, kv.first, kv.second); + status = ongoing_files_checker_.UnmarkOngoingFiles(files); if (shutting_down_.load(std::memory_order_acquire)) { ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id; @@ -788,16 +798,12 @@ DBImpl::BackgroundCompaction(std::set table_ids) { meta_ptr_->Archive(); { - uint64_t ttl = 10 * meta::SECOND; // default: file data will be erase from cache after few seconds - meta_ptr_->CleanUpCacheWithTTL(ttl); - } - - { - uint64_t ttl = 5 * meta::M_SEC; // default: file will be deleted after few minutes + uint64_t ttl = 10 * meta::SECOND; // default: file will be hard-deleted few seconds after soft-deleted if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { - ttl = meta::D_SEC; + ttl = meta::H_SEC; } - meta_ptr_->CleanUpFilesWithTTL(ttl); + + meta_ptr_->CleanUpFilesWithTTL(ttl, &ongoing_files_checker_); } // ENGINE_LOG_TRACE << " Background compaction thread exit"; @@ -833,14 +839,15 @@ DBImpl::StartBuildIndexTask(bool force) { void DBImpl::BackgroundBuildIndex() { - // ENGINE_LOG_TRACE << "Background build index thread start"; - std::unique_lock lock(build_index_mutex_); meta::TableFilesSchema to_index_files; meta_ptr_->FilesToIndex(to_index_files); - Status status = IgnoreFailedIndexFiles(to_index_files); + Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files); if (!to_index_files.empty()) { + ENGINE_LOG_DEBUG << "Background build index thread begin"; + status = ongoing_files_checker_.MarkOngoingFiles(to_index_files); + // step 2: put build index task to scheduler std::vector> job2file_map; for (auto& file : to_index_files) { @@ -851,6 +858,7 @@ DBImpl::BackgroundBuildIndex() { job2file_map.push_back(std::make_pair(job, file_ptr)); } + // step 3: wait build index finished and mark failed files for (auto iter = job2file_map.begin(); iter != job2file_map.end(); ++iter) { scheduler::BuildIndexJobPtr job = iter->first; meta::TableFileSchema& file_schema = *(iter->second.get()); @@ -859,17 +867,17 @@ 
DBImpl::BackgroundBuildIndex() { Status status = job->GetStatus(); ENGINE_LOG_ERROR << "Building index job " << job->id() << " failed: " << status.ToString(); - MarkFailedIndexFile(file_schema); + index_failed_checker_.MarkFailedIndexFile(file_schema); } else { - MarkSucceedIndexFile(file_schema); ENGINE_LOG_DEBUG << "Building index job " << job->id() << " succeed."; + + index_failed_checker_.MarkSucceedIndexFile(file_schema); } + status = ongoing_files_checker_.UnmarkOngoingFile(file_schema); } ENGINE_LOG_DEBUG << "Background build index thread finished"; } - - // ENGINE_LOG_TRACE << "Background build index thread exit"; } Status @@ -894,6 +902,8 @@ DBImpl::GetFilesToBuildIndex(const std::string& table_id, const std::vector Status DBImpl::GetFilesToSearch(const std::string& table_id, const std::vector& file_ids, const meta::DatesT& dates, meta::TableFilesSchema& files) { + ENGINE_LOG_DEBUG << "Collect files from table: " << table_id; + meta::DatePartionedTableFilesSchema date_files; auto status = meta_ptr_->FilesToSearch(table_id, file_ids, dates, date_files); if (!status.ok()) { @@ -934,7 +944,7 @@ DBImpl::DropTableRecursively(const std::string& table_id, const meta::DatesT& da if (dates.empty()) { status = mem_mgr_->EraseMemVector(table_id); // not allow insert status = meta_ptr_->DropTable(table_id); // soft delete table - CleanFailedIndexFileOfTable(table_id); + index_failed_checker_.CleanFailedIndexFileOfTable(table_id); // scheduler will determine when to delete table files auto nres = scheduler::ResMgrInst::GetInstance()->GetNumOfComputeResource(); @@ -1014,7 +1024,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex GetFilesToBuildIndex(table_id, file_types, table_files); times++; - IgnoreFailedIndexFiles(table_files); + index_failed_checker_.IgnoreFailedIndexFiles(table_files); } // build index for partition @@ -1029,7 +1039,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex // failed to build index for some files, return error std::vector failed_files; - GetFailedIndexFileOfTable(table_id, failed_files); + index_failed_checker_.GetFailedIndexFileOfTable(table_id, failed_files); if (!failed_files.empty()) { std::string msg = "Failed to build index for " + std::to_string(failed_files.size()) + ((failed_files.size() == 1) ? 
" file" : " files"); @@ -1043,7 +1053,7 @@ DBImpl::BuildTableIndexRecursively(const std::string& table_id, const TableIndex Status DBImpl::DropTableIndexRecursively(const std::string& table_id) { ENGINE_LOG_DEBUG << "Drop index for table: " << table_id; - CleanFailedIndexFileOfTable(table_id); + index_failed_checker_.CleanFailedIndexFileOfTable(table_id); auto status = meta_ptr_->DropTableIndex(table_id); if (!status.ok()) { return status; @@ -1086,86 +1096,5 @@ DBImpl::GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_c return Status::OK(); } -Status -DBImpl::CleanFailedIndexFileOfTable(const std::string& table_id) { - std::lock_guard lck(index_failed_mutex_); - index_failed_files_.erase(table_id); // rebuild failed index files for this table - - return Status::OK(); -} - -Status -DBImpl::GetFailedIndexFileOfTable(const std::string& table_id, std::vector& failed_files) { - failed_files.clear(); - std::lock_guard lck(index_failed_mutex_); - auto iter = index_failed_files_.find(table_id); - if (iter != index_failed_files_.end()) { - FileID2FailedTimes& failed_map = iter->second; - for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { - failed_files.push_back(it_file->first); - } - } - - return Status::OK(); -} - -Status -DBImpl::MarkFailedIndexFile(const meta::TableFileSchema& file) { - std::lock_guard lck(index_failed_mutex_); - - auto iter = index_failed_files_.find(file.table_id_); - if (iter == index_failed_files_.end()) { - FileID2FailedTimes failed_files; - failed_files.insert(std::make_pair(file.file_id_, 1)); - index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); - } else { - auto it_failed_files = iter->second.find(file.file_id_); - if (it_failed_files != iter->second.end()) { - it_failed_files->second++; - } else { - iter->second.insert(std::make_pair(file.file_id_, 1)); - } - } - - return Status::OK(); -} - -Status -DBImpl::MarkSucceedIndexFile(const meta::TableFileSchema& file) { - std::lock_guard lck(index_failed_mutex_); - - auto iter = index_failed_files_.find(file.table_id_); - if (iter != index_failed_files_.end()) { - iter->second.erase(file.file_id_); - } - - return Status::OK(); -} - -Status -DBImpl::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) { - std::lock_guard lck(index_failed_mutex_); - - // there could be some failed files belong to different table. - // some files may has failed for several times, no need to build index for these files. 
- // thus we can avoid dead circle for build index operation - for (auto it_file = table_files.begin(); it_file != table_files.end();) { - auto it_failed_files = index_failed_files_.find((*it_file).table_id_); - if (it_failed_files != index_failed_files_.end()) { - auto it_failed_file = it_failed_files->second.find((*it_file).file_id_); - if (it_failed_file != it_failed_files->second.end()) { - if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) { - it_file = table_files.erase(it_file); - continue; - } - } - } - - ++it_file; - } - - return Status::OK(); -} - } // namespace engine } // namespace milvus diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 3baac92c0a..5091160d63 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -18,8 +18,10 @@ #pragma once #include "DB.h" -#include "Types.h" -#include "src/db/insert/MemManager.h" +#include "db/IndexFailedChecker.h" +#include "db/OngoingFileChecker.h" +#include "db/Types.h" +#include "db/insert/MemManager.h" #include "utils/ThreadPool.h" #include @@ -178,21 +180,6 @@ class DBImpl : public DB { Status GetTableRowCountRecursively(const std::string& table_id, uint64_t& row_count); - Status - CleanFailedIndexFileOfTable(const std::string& table_id); - - Status - GetFailedIndexFileOfTable(const std::string& table_id, std::vector& failed_files); - - Status - MarkFailedIndexFile(const meta::TableFileSchema& file); - - Status - MarkSucceedIndexFile(const meta::TableFileSchema& file); - - Status - IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files); - private: const DBOptions options_; @@ -214,11 +201,10 @@ class DBImpl : public DB { std::list> index_thread_results_; std::mutex build_index_mutex_; - std::mutex index_failed_mutex_; - using FileID2FailedTimes = std::map; - using Table2FailedFiles = std::map; - Table2FailedFiles index_failed_files_; // file id mapping to failed times -}; // DBImpl + + IndexFailedChecker index_failed_checker_; + OngoingFileChecker ongoing_files_checker_; +}; // DBImpl } // namespace engine } // namespace milvus diff --git a/core/src/db/IndexFailedChecker.cpp b/core/src/db/IndexFailedChecker.cpp new file mode 100644 index 0000000000..2ed22d75c4 --- /dev/null +++ b/core/src/db/IndexFailedChecker.cpp @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "db/IndexFailedChecker.h" + +#include + +namespace milvus { +namespace engine { + +constexpr uint64_t INDEX_FAILED_RETRY_TIME = 1; + +Status +IndexFailedChecker::CleanFailedIndexFileOfTable(const std::string& table_id) { + std::lock_guard lck(mutex_); + index_failed_files_.erase(table_id); // rebuild failed index files for this table + + return Status::OK(); +} + +Status +IndexFailedChecker::GetFailedIndexFileOfTable(const std::string& table_id, std::vector& failed_files) { + failed_files.clear(); + std::lock_guard lck(mutex_); + auto iter = index_failed_files_.find(table_id); + if (iter != index_failed_files_.end()) { + File2RefCount& failed_map = iter->second; + for (auto it_file = failed_map.begin(); it_file != failed_map.end(); ++it_file) { + failed_files.push_back(it_file->first); + } + } + + return Status::OK(); +} + +Status +IndexFailedChecker::MarkFailedIndexFile(const meta::TableFileSchema& file) { + std::lock_guard lck(mutex_); + + auto iter = index_failed_files_.find(file.table_id_); + if (iter == index_failed_files_.end()) { + File2RefCount failed_files; + failed_files.insert(std::make_pair(file.file_id_, 1)); + index_failed_files_.insert(std::make_pair(file.table_id_, failed_files)); + } else { + auto it_failed_files = iter->second.find(file.file_id_); + if (it_failed_files != iter->second.end()) { + it_failed_files->second++; + } else { + iter->second.insert(std::make_pair(file.file_id_, 1)); + } + } + + return Status::OK(); +} + +Status +IndexFailedChecker::MarkSucceedIndexFile(const meta::TableFileSchema& file) { + std::lock_guard lck(mutex_); + + auto iter = index_failed_files_.find(file.table_id_); + if (iter != index_failed_files_.end()) { + iter->second.erase(file.file_id_); + if (iter->second.empty()) { + index_failed_files_.erase(file.table_id_); + } + } + + return Status::OK(); +} + +Status +IndexFailedChecker::IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files) { + std::lock_guard lck(mutex_); + + // there could be some failed files belong to different table. + // some files may has failed for several times, no need to build index for these files. + // thus we can avoid dead circle for build index operation + for (auto it_file = table_files.begin(); it_file != table_files.end();) { + auto it_failed_files = index_failed_files_.find((*it_file).table_id_); + if (it_failed_files != index_failed_files_.end()) { + auto it_failed_file = it_failed_files->second.find((*it_file).file_id_); + if (it_failed_file != it_failed_files->second.end()) { + if (it_failed_file->second >= INDEX_FAILED_RETRY_TIME) { + it_file = table_files.erase(it_file); + continue; + } + } + } + + ++it_file; + } + + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/IndexFailedChecker.h b/core/src/db/IndexFailedChecker.h new file mode 100644 index 0000000000..cf9ea990fe --- /dev/null +++ b/core/src/db/IndexFailedChecker.h @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "db/Types.h" +#include "meta/Meta.h" +#include "utils/Status.h" + +#include +#include +#include +#include + +namespace milvus { +namespace engine { + +class IndexFailedChecker { + public: + Status + CleanFailedIndexFileOfTable(const std::string& table_id); + + Status + GetFailedIndexFileOfTable(const std::string& table_id, std::vector& failed_files); + + Status + MarkFailedIndexFile(const meta::TableFileSchema& file); + + Status + MarkSucceedIndexFile(const meta::TableFileSchema& file); + + Status + IgnoreFailedIndexFiles(meta::TableFilesSchema& table_files); + + private: + std::mutex mutex_; + Table2Files index_failed_files_; // table id mapping to (file id mapping to failed times) +}; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/OngoingFileChecker.cpp b/core/src/db/OngoingFileChecker.cpp new file mode 100644 index 0000000000..3c3eb4011a --- /dev/null +++ b/core/src/db/OngoingFileChecker.cpp @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "db/OngoingFileChecker.h" +#include "utils/Log.h" + +#include + +namespace milvus { +namespace engine { + +Status +OngoingFileChecker::MarkOngoingFile(const meta::TableFileSchema& table_file) { + std::lock_guard lck(mutex_); + return MarkOngoingFileNoLock(table_file); +} + +Status +OngoingFileChecker::MarkOngoingFiles(const meta::TableFilesSchema& table_files) { + std::lock_guard lck(mutex_); + + for (auto& table_file : table_files) { + MarkOngoingFileNoLock(table_file); + } + + return Status::OK(); +} + +Status +OngoingFileChecker::UnmarkOngoingFile(const meta::TableFileSchema& table_file) { + std::lock_guard lck(mutex_); + return UnmarkOngoingFileNoLock(table_file); +} + +Status +OngoingFileChecker::UnmarkOngoingFiles(const meta::TableFilesSchema& table_files) { + std::lock_guard lck(mutex_); + + for (auto& table_file : table_files) { + UnmarkOngoingFileNoLock(table_file); + } + + return Status::OK(); +} + +bool +OngoingFileChecker::IsIgnored(const meta::TableFileSchema& schema) { + std::lock_guard lck(mutex_); + + auto iter = ongoing_files_.find(schema.table_id_); + if (iter == ongoing_files_.end()) { + return false; + } else { + auto it_file = iter->second.find(schema.file_id_); + if (it_file == iter->second.end()) { + return false; + } else { + return (it_file->second > 0); + } + } +} + +Status +OngoingFileChecker::MarkOngoingFileNoLock(const meta::TableFileSchema& table_file) { + if (table_file.table_id_.empty() || table_file.file_id_.empty()) { + return Status(DB_ERROR, "Invalid table files"); + } + + auto iter = ongoing_files_.find(table_file.table_id_); + if (iter == ongoing_files_.end()) { + File2RefCount files_refcount; + files_refcount.insert(std::make_pair(table_file.file_id_, 1)); + ongoing_files_.insert(std::make_pair(table_file.table_id_, files_refcount)); + } else { + auto it_file = iter->second.find(table_file.file_id_); + if (it_file == iter->second.end()) { + iter->second[table_file.file_id_] = 1; + } else { + it_file->second++; + } + } + + ENGINE_LOG_DEBUG << "Mark ongoing file:" << table_file.file_id_ + << " refcount:" << ongoing_files_[table_file.table_id_][table_file.file_id_]; + + return Status::OK(); +} + +Status +OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_file) { + if (table_file.table_id_.empty() || table_file.file_id_.empty()) { + return Status(DB_ERROR, "Invalid table files"); + } + + auto iter = ongoing_files_.find(table_file.table_id_); + if (iter != ongoing_files_.end()) { + auto it_file = iter->second.find(table_file.file_id_); + if (it_file != iter->second.end()) { + it_file->second--; + + ENGINE_LOG_DEBUG << "Unmark ongoing file:" << table_file.file_id_ << " refcount:" << it_file->second; + + if (it_file->second <= 0) { + iter->second.erase(table_file.file_id_); + if (iter->second.empty()) { + ongoing_files_.erase(table_file.table_id_); + } + } + } + } + + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/OngoingFileChecker.h b/core/src/db/OngoingFileChecker.h new file mode 100644 index 0000000000..2e52fdeea6 --- /dev/null +++ b/core/src/db/OngoingFileChecker.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "db/Types.h" +#include "meta/Meta.h" +#include "utils/Status.h" + +#include +#include +#include +#include + +namespace milvus { +namespace engine { + +class OngoingFileChecker : public meta::Meta::CleanUpFilter { + public: + Status + MarkOngoingFile(const meta::TableFileSchema& table_file); + + Status + MarkOngoingFiles(const meta::TableFilesSchema& table_files); + + Status + UnmarkOngoingFile(const meta::TableFileSchema& table_file); + + Status + UnmarkOngoingFiles(const meta::TableFilesSchema& table_files); + + bool + IsIgnored(const meta::TableFileSchema& schema) override; + + private: + Status + MarkOngoingFileNoLock(const meta::TableFileSchema& table_file); + + Status + UnmarkOngoingFileNoLock(const meta::TableFileSchema& table_file); + + private: + std::mutex mutex_; + Table2Files ongoing_files_; // table id mapping to (file id mapping to ongoing ref-count) +}; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/Types.h b/core/src/db/Types.h index 76c06126f8..ca6f97849a 100644 --- a/core/src/db/Types.h +++ b/core/src/db/Types.h @@ -21,6 +21,9 @@ #include #include +#include +#include +#include #include #include @@ -40,5 +43,8 @@ struct TableIndex { int32_t metric_type_ = (int)MetricType::L2; }; +using File2RefCount = std::map; +using Table2Files = std::map; + } // namespace engine } // namespace milvus diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index 70f2d6ef2d..f3fd4a67cf 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -271,6 +271,12 @@ ExecutionEngineImpl::Serialize() { // here we reset index size by file size, // since some index type(such as SQ8) data size become smaller after serialized index_->set_size(PhysicalSize()); + ENGINE_LOG_DEBUG << "Finish serialize index file: " << location_ << " size: " << index_->Size(); + + if (index_->Size() == 0) { + std::string msg = "Failed to serialize file: " + location_ + " reason: out of disk space or memory"; + status = Status(DB_ERROR, msg); + } return status; } @@ -465,7 +471,9 @@ ExecutionEngineImpl::Merge(const std::string& location) { if (auto file_index = std::dynamic_pointer_cast(to_merge)) { auto status = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds()); if (!status.ok()) { - ENGINE_LOG_ERROR << "Merge: Add Error"; + ENGINE_LOG_ERROR << "Failed to merge: " << location << " to: " << location_; + } else { + ENGINE_LOG_DEBUG << "Finish merge index file: " << location; } return status; } else { @@ -503,6 +511,7 @@ ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_t throw Exception(DB_ERROR, status.message()); } + ENGINE_LOG_DEBUG << "Finish build index file: " << location << " size: " << to_index->Size(); return std::make_shared(to_index, location, engine_type, metric_type_, nlist_); } diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index bf46f02fea..1929414fdc 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -35,6 +35,13 @@ static const char* META_TABLES 
= "Tables"; static const char* META_TABLEFILES = "TableFiles"; class Meta { + public: + class CleanUpFilter { + public: + virtual bool + IsIgnored(const TableFileSchema& schema) = 0; + }; + public: virtual ~Meta() = default; @@ -121,10 +128,7 @@ class Meta { CleanUpShadowFiles() = 0; virtual Status - CleanUpCacheWithTTL(uint64_t seconds) = 0; - - virtual Status - CleanUpFilesWithTTL(uint64_t seconds) = 0; + CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) = 0; virtual Status DropAll() = 0; diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index dcf3824fe1..da91585b92 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -1783,49 +1783,7 @@ MySQLMetaImpl::CleanUpShadowFiles() { } Status -MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { - auto now = utils::GetMicroSecTimeStamp(); - - // erase deleted/backup files from cache - try { - server::MetricCollector metric; - - mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); - - if (connectionPtr == nullptr) { - return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); - } - - mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date" - << " FROM " << META_TABLEFILES << " WHERE file_type IN (" - << std::to_string(TableFileSchema::TO_DELETE) << "," - << std::to_string(TableFileSchema::BACKUP) << ")" - << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; - - mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); - - TableFileSchema table_file; - std::vector idsToDelete; - - for (auto& resRow : res) { - table_file.id_ = resRow["id"]; // implicit conversion - resRow["table_id"].to_string(table_file.table_id_); - resRow["file_id"].to_string(table_file.file_id_); - table_file.date_ = resRow["date"]; - - utils::GetTableFilePath(options_, table_file); - server::CommonUtil::EraseFromCache(table_file.location_); - } - } catch (std::exception& e) { - return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what()); - } - - return Status::OK(); -} - -Status -MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { +MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { auto now = utils::GetMicroSecTimeStamp(); std::set table_ids; @@ -1840,33 +1798,52 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } - mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date" - << " FROM " << META_TABLEFILES - << " WHERE file_type = " << std::to_string(TableFileSchema::TO_DELETE) - << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; + mysqlpp::Query query = connectionPtr->query(); + query << "SELECT id, table_id, file_id, file_type, date" + << " FROM " << META_TABLEFILES << " WHERE file_type IN (" + << std::to_string(TableFileSchema::TO_DELETE) << "," << std::to_string(TableFileSchema::BACKUP) << ")" + << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; - ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); - mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + mysqlpp::StoreQueryResult res = query.store(); TableFileSchema table_file; std::vector idsToDelete; + int64_t 
clean_files = 0; for (auto& resRow : res) { table_file.id_ = resRow["id"]; // implicit conversion resRow["table_id"].to_string(table_file.table_id_); resRow["file_id"].to_string(table_file.file_id_); table_file.date_ = resRow["date"]; + table_file.file_type_ = resRow["file_type"]; - utils::DeleteTableFilePath(options_, table_file); + // check if the file can be deleted + if (filter && filter->IsIgnored(table_file)) { + ENGINE_LOG_DEBUG << "File:" << table_file.file_id_ + << " currently is in use, not able to delete now"; + continue; // ignore this file, don't delete it + } - ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_; + // erase file data from cache + // because GetTableFilePath won't able to generate file path after the file is deleted + utils::GetTableFilePath(options_, table_file); + server::CommonUtil::EraseFromCache(table_file.location_); - idsToDelete.emplace_back(std::to_string(table_file.id_)); - table_ids.insert(table_file.table_id_); + if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) { + // delete file from disk storage + utils::DeleteTableFilePath(options_, table_file); + ENGINE_LOG_DEBUG << "Remove file id:" << table_file.id_ << " location:" << table_file.location_; + + idsToDelete.emplace_back(std::to_string(table_file.id_)); + table_ids.insert(table_file.table_id_); + + clean_files++; + } } + // delete file from meta if (!idsToDelete.empty()) { std::stringstream idsToDeleteSS; for (auto& id : idsToDelete) { @@ -1875,18 +1852,17 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { std::string idsToDeleteStr = idsToDeleteSS.str(); idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR " - cleanUpFilesWithTTLQuery << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";"; + query << "DELETE FROM " << META_TABLEFILES << " WHERE " << idsToDeleteStr << ";"; - ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); - if (!cleanUpFilesWithTTLQuery.exec()) { - return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", - cleanUpFilesWithTTLQuery.error()); + if (!query.exec()) { + return HandleException("QUERY ERROR WHEN CLEANING UP FILES WITH TTL", query.error()); } } - if (res.size() > 0) { - ENGINE_LOG_DEBUG << "Clean " << res.size() << " files deleted in " << seconds << " seconds"; + if (clean_files > 0) { + ENGINE_LOG_DEBUG << "Clean " << clean_files << " files expired in " << seconds << " seconds"; } } // Scoped Connection } catch (std::exception& e) { @@ -1904,14 +1880,13 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); } - mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT id, table_id" - << " FROM " << META_TABLES - << " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";"; + mysqlpp::Query query = connectionPtr->query(); + query << "SELECT id, table_id" + << " FROM " << META_TABLES << " WHERE state = " << std::to_string(TableSchema::TO_DELETE) << ";"; - ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); - mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + mysqlpp::StoreQueryResult res = query.store(); int64_t remove_tables = 0; if (!res.empty()) { @@ 
-1927,13 +1902,12 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { } std::string idsToDeleteStr = idsToDeleteSS.str(); idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR " - cleanUpFilesWithTTLQuery << "DELETE FROM " << META_TABLES << " WHERE " << idsToDeleteStr << ";"; + query << "DELETE FROM " << META_TABLES << " WHERE " << idsToDeleteStr << ";"; - ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); - if (!cleanUpFilesWithTTLQuery.exec()) { - return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", - cleanUpFilesWithTTLQuery.error()); + if (!query.exec()) { + return HandleException("QUERY ERROR WHEN CLEANING UP TABLES WITH TTL", query.error()); } } @@ -1958,14 +1932,13 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { } for (auto& table_id : table_ids) { - mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query(); - cleanUpFilesWithTTLQuery << "SELECT file_id" - << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote - << table_id << ";"; + mysqlpp::Query query = connectionPtr->query(); + query << "SELECT file_id" + << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << table_id << ";"; - ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << cleanUpFilesWithTTLQuery.str(); + ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUpFilesWithTTL: " << query.str(); - mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + mysqlpp::StoreQueryResult res = query.store(); if (res.empty()) { utils::DeleteTablePath(options_, table_id); diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index e7697316af..6d2e7cf4c2 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -120,10 +120,7 @@ class MySQLMetaImpl : public Meta { CleanUpShadowFiles() override; Status - CleanUpCacheWithTTL(uint64_t seconds) override; - - Status - CleanUpFilesWithTTL(uint64_t seconds) override; + CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override; Status DropAll() override; diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 07f890d50a..9e323a4fac 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -1294,51 +1294,7 @@ SqliteMetaImpl::CleanUpShadowFiles() { } Status -SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { - auto now = utils::GetMicroSecTimeStamp(); - - // erase deleted/backup files from cache - try { - server::MetricCollector metric; - - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - - std::vector file_types = { - (int)TableFileSchema::TO_DELETE, - (int)TableFileSchema::BACKUP, - }; - - auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, - &TableFileSchema::table_id_, - &TableFileSchema::file_id_, - &TableFileSchema::date_), - where( - in(&TableFileSchema::file_type_, file_types) - and - c(&TableFileSchema::updated_time_) - < now - seconds * US_PS)); - - for (auto& file : files) { - TableFileSchema table_file; - table_file.id_ = std::get<0>(file); - table_file.table_id_ = std::get<1>(file); - table_file.file_id_ = std::get<2>(file); - table_file.date_ = std::get<3>(file); - - utils::GetTableFilePath(options_, table_file); - server::CommonUtil::EraseFromCache(table_file.location_); - } - - } 
catch (std::exception& e) { - return HandleException("Encounter exception when clean cache", e.what()); - } - - return Status::OK(); -} - -Status -SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { +SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter) { auto now = utils::GetMicroSecTimeStamp(); std::set table_ids; @@ -1346,33 +1302,60 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { try { server::MetricCollector metric; + std::vector file_types = { + (int)TableFileSchema::TO_DELETE, + (int)TableFileSchema::BACKUP, + }; + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here std::lock_guard meta_lock(meta_mutex_); + // collect files to be deleted auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, &TableFileSchema::table_id_, &TableFileSchema::file_id_, + &TableFileSchema::file_type_, &TableFileSchema::date_), where( - c(&TableFileSchema::file_type_) == - (int)TableFileSchema::TO_DELETE + in(&TableFileSchema::file_type_, file_types) and c(&TableFileSchema::updated_time_) < now - seconds * US_PS)); + int64_t clean_files = 0; auto commited = ConnectorPtr->transaction([&]() mutable { TableFileSchema table_file; for (auto& file : files) { table_file.id_ = std::get<0>(file); table_file.table_id_ = std::get<1>(file); table_file.file_id_ = std::get<2>(file); - table_file.date_ = std::get<3>(file); + table_file.file_type_ = std::get<3>(file); + table_file.date_ = std::get<4>(file); - utils::DeleteTableFilePath(options_, table_file); - ENGINE_LOG_DEBUG << "Removing file id:" << table_file.file_id_ << " location:" << table_file.location_; - ConnectorPtr->remove(table_file.id_); + // check if the file can be deleted + if (filter && filter->IsIgnored(table_file)) { + ENGINE_LOG_DEBUG << "File:" << table_file.file_id_ + << " currently is in use, not able to delete now"; + continue; // ignore this file, don't delete it + } - table_ids.insert(table_file.table_id_); + // erase from cache, must do this before file deleted, + // because GetTableFilePath won't able to generate file path after the file is deleted + utils::GetTableFilePath(options_, table_file); + server::CommonUtil::EraseFromCache(table_file.location_); + + if (table_file.file_type_ == (int)TableFileSchema::TO_DELETE) { + // delete file from meta + ConnectorPtr->remove(table_file.id_); + + // delete file from disk storage + utils::DeleteTableFilePath(options_, table_file); + + ENGINE_LOG_DEBUG << "Remove file id:" << table_file.file_id_ << " location:" << table_file.location_; + table_ids.insert(table_file.table_id_); + + clean_files++; + } } return true; }); @@ -1381,8 +1364,8 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed"); } - if (files.size() > 0) { - ENGINE_LOG_DEBUG << "Clean " << files.size() << " files deleted in " << seconds << " seconds"; + if (clean_files > 0) { + ENGINE_LOG_DEBUG << "Clean " << clean_files << " files expired in " << seconds << " seconds"; } } catch (std::exception& e) { return HandleException("Encounter exception when clean table files", e.what()); diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index 5581efe361..f50fa452f3 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -120,10 +120,7 @@ class SqliteMetaImpl : public Meta { CleanUpShadowFiles() override; Status - CleanUpCacheWithTTL(uint64_t seconds) override; - - Status - CleanUpFilesWithTTL(uint64_t 
seconds) override; + CleanUpFilesWithTTL(uint64_t seconds, CleanUpFilter* filter = nullptr) override; Status DropAll() override; diff --git a/core/src/scheduler/task/BuildIndexTask.cpp b/core/src/scheduler/task/BuildIndexTask.cpp index e952bd0938..571bb279a3 100644 --- a/core/src/scheduler/task/BuildIndexTask.cpp +++ b/core/src/scheduler/task/BuildIndexTask.cpp @@ -168,21 +168,28 @@ XBuildIndexTask::Execute() { // step 5: save index file try { - index->Serialize(); + status = index->Serialize(); + if (!status.ok()) { + ENGINE_LOG_ERROR << status.message(); + } } catch (std::exception& ex) { - // typical error: out of disk space or permition denied std::string msg = "Serialize index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; + status = Status(DB_ERROR, msg); + } + if (!status.ok()) { + // if failed to serialize index file to disk + // typical error: out of disk space, out of memory or permition denied table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; status = meta_ptr->UpdateTableFile(table_file); ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; ENGINE_LOG_ERROR << "Failed to persist index file: " << table_file.location_ - << ", possible out of disk space"; + << ", possible out of disk space or memory"; build_index_job->BuildIndexDone(to_index_id_); - build_index_job->GetStatus() = Status(DB_ERROR, msg); + build_index_job->GetStatus() = status; to_index_engine_ = nullptr; return; } @@ -196,7 +203,11 @@ XBuildIndexTask::Execute() { origin_file.file_type_ = engine::meta::TableFileSchema::BACKUP; engine::meta::TableFilesSchema update_files = {table_file, origin_file}; - status = meta_ptr->UpdateTableFiles(update_files); + + if (status.ok()) { // makesure index file is sucessfully serialized to disk + status = meta_ptr->UpdateTableFiles(update_files); + } + if (status.ok()) { ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize() << " bytes" diff --git a/core/src/server/grpc_impl/request/CmdRequest.cpp b/core/src/server/grpc_impl/request/CmdRequest.cpp index b215f94d31..4af9db03ef 100644 --- a/core/src/server/grpc_impl/request/CmdRequest.cpp +++ b/core/src/server/grpc_impl/request/CmdRequest.cpp @@ -17,6 +17,8 @@ #include "server/grpc_impl/request/CmdRequest.h" #include "scheduler/SchedInst.h" +#include "utils/Log.h" +#include "utils/TimeRecorder.h" #include @@ -35,10 +37,19 @@ CmdRequest::Create(const std::string& cmd, std::string& result) { Status CmdRequest::OnExecute() { + std::string hdr = "CmdRequest(cmd=" + cmd_ + ")"; + TimeRecorder rc(hdr); + if (cmd_ == "version") { result_ = MILVUS_VERSION; } else if (cmd_ == "tasktable") { result_ = scheduler::ResMgrInst::GetInstance()->DumpTaskTables(); + } else if (cmd_ == "mode") { +#ifdef MILVUS_GPU_VERSION + result_ = "GPU"; +#else + result_ = "CPU"; +#endif } else { result_ = "OK"; } diff --git a/core/src/server/grpc_impl/request/CountTableRequest.cpp b/core/src/server/grpc_impl/request/CountTableRequest.cpp index 8559890ad6..b90a33bf61 100644 --- a/core/src/server/grpc_impl/request/CountTableRequest.cpp +++ b/core/src/server/grpc_impl/request/CountTableRequest.cpp @@ -39,7 +39,8 @@ CountTableRequest::Create(const std::string& table_name, int64_t& row_count) { Status CountTableRequest::OnExecute() { try { - TimeRecorder rc("CountTableRequest"); + std::string hdr = "CountTableRequest(table=" + table_name_ + ")"; + TimeRecorder rc(hdr); // step 1: check arguments auto status = 
ValidationUtil::ValidateTableName(table_name_); diff --git a/core/src/server/grpc_impl/request/CreateIndexRequest.cpp b/core/src/server/grpc_impl/request/CreateIndexRequest.cpp index 72678aee87..c7628048ff 100644 --- a/core/src/server/grpc_impl/request/CreateIndexRequest.cpp +++ b/core/src/server/grpc_impl/request/CreateIndexRequest.cpp @@ -44,7 +44,8 @@ CreateIndexRequest::Create(const ::milvus::grpc::IndexParam* index_param) { Status CreateIndexRequest::OnExecute() { try { - TimeRecorder rc("CreateIndexRequest"); + std::string hdr = "CreateIndexRequest(table=" + index_param_->table_name() + ")"; + TimeRecorder rc(hdr); // step 1: check arguments std::string table_name_ = index_param_->table_name(); diff --git a/core/src/server/grpc_impl/request/CreatePartitionRequest.cpp b/core/src/server/grpc_impl/request/CreatePartitionRequest.cpp index 3bd4a86ef6..1a18cdfcad 100644 --- a/core/src/server/grpc_impl/request/CreatePartitionRequest.cpp +++ b/core/src/server/grpc_impl/request/CreatePartitionRequest.cpp @@ -22,6 +22,7 @@ #include "utils/ValidationUtil.h" #include +#include namespace milvus { namespace server { @@ -42,7 +43,10 @@ CreatePartitionRequest::Create(const ::milvus::grpc::PartitionParam* partition_p Status CreatePartitionRequest::OnExecute() { - TimeRecorder rc("CreatePartitionRequest"); + std::string hdr = "CreatePartitionRequest(table=" + partition_param_->table_name() + + ", partition_name=" + partition_param_->partition_name() + + ", partition_tag=" + partition_param_->tag() + ")"; + TimeRecorder rc(hdr); try { // step 1: check arguments diff --git a/core/src/server/grpc_impl/request/CreateTableRequest.cpp b/core/src/server/grpc_impl/request/CreateTableRequest.cpp index 67a3eaa877..55d953aa6f 100644 --- a/core/src/server/grpc_impl/request/CreateTableRequest.cpp +++ b/core/src/server/grpc_impl/request/CreateTableRequest.cpp @@ -22,6 +22,7 @@ #include "utils/ValidationUtil.h" #include +#include namespace milvus { namespace server { @@ -42,7 +43,9 @@ CreateTableRequest::Create(const ::milvus::grpc::TableSchema* schema) { Status CreateTableRequest::OnExecute() { - TimeRecorder rc("CreateTableRequest"); + std::string hdr = "CreateTableRequest(table=" + schema_->table_name() + + ", dimension=" + std::to_string(schema_->dimension()) + ")"; + TimeRecorder rc(hdr); try { // step 1: check arguments diff --git a/core/src/server/grpc_impl/request/DescribeIndexRequest.cpp b/core/src/server/grpc_impl/request/DescribeIndexRequest.cpp index b3a987c6b0..d57d47bf2c 100644 --- a/core/src/server/grpc_impl/request/DescribeIndexRequest.cpp +++ b/core/src/server/grpc_impl/request/DescribeIndexRequest.cpp @@ -39,7 +39,8 @@ DescribeIndexRequest::Create(const std::string& table_name, ::milvus::grpc::Inde Status DescribeIndexRequest::OnExecute() { try { - TimeRecorder rc("DescribeIndexRequest"); + std::string hdr = "DescribeIndexRequest(table=" + table_name_ + ")"; + TimeRecorder rc(hdr); // step 1: check arguments auto status = ValidationUtil::ValidateTableName(table_name_); diff --git a/core/src/server/grpc_impl/request/DescribeTableRequest.cpp b/core/src/server/grpc_impl/request/DescribeTableRequest.cpp index 28a5f327c5..3bd97ef7e8 100644 --- a/core/src/server/grpc_impl/request/DescribeTableRequest.cpp +++ b/core/src/server/grpc_impl/request/DescribeTableRequest.cpp @@ -38,7 +38,8 @@ DescribeTableRequest::Create(const std::string& table_name, ::milvus::grpc::Tabl Status DescribeTableRequest::OnExecute() { - TimeRecorder rc("DescribeTableRequest"); + std::string hdr = "DescribeTableRequest(table=" + 
table_name_ + ")"; + TimeRecorder rc(hdr); try { // step 1: check arguments diff --git a/core/src/server/grpc_impl/request/DropIndexRequest.cpp b/core/src/server/grpc_impl/request/DropIndexRequest.cpp index ab5c83b0e5..619ea753ba 100644 --- a/core/src/server/grpc_impl/request/DropIndexRequest.cpp +++ b/core/src/server/grpc_impl/request/DropIndexRequest.cpp @@ -39,7 +39,8 @@ DropIndexRequest::Create(const std::string& table_name) { Status DropIndexRequest::OnExecute() { try { - TimeRecorder rc("DropIndexRequest"); + std::string hdr = "DropIndexRequest(table=" + table_name_ + ")"; + TimeRecorder rc(hdr); // step 1: check arguments auto status = ValidationUtil::ValidateTableName(table_name_); diff --git a/core/src/server/grpc_impl/request/DropPartitionRequest.cpp b/core/src/server/grpc_impl/request/DropPartitionRequest.cpp index 0e29b6abe8..1ec189986a 100644 --- a/core/src/server/grpc_impl/request/DropPartitionRequest.cpp +++ b/core/src/server/grpc_impl/request/DropPartitionRequest.cpp @@ -39,6 +39,11 @@ DropPartitionRequest::Create(const ::milvus::grpc::PartitionParam* partition_par Status DropPartitionRequest::OnExecute() { + std::string hdr = "DropPartitionRequest(table=" + partition_param_->table_name() + + ", partition_name=" + partition_param_->partition_name() + + ", partition_tag=" + partition_param_->tag() + ")"; + TimeRecorder rc(hdr); + std::string table_name = partition_param_->table_name(); std::string partition_name = partition_param_->partition_name(); std::string partition_tag = partition_param_->tag(); diff --git a/core/src/server/grpc_impl/request/DropTableRequest.cpp b/core/src/server/grpc_impl/request/DropTableRequest.cpp index a678e5a6f6..6f81a5b2a0 100644 --- a/core/src/server/grpc_impl/request/DropTableRequest.cpp +++ b/core/src/server/grpc_impl/request/DropTableRequest.cpp @@ -40,7 +40,8 @@ DropTableRequest::Create(const std::string& table_name) { Status DropTableRequest::OnExecute() { try { - TimeRecorder rc("DropTableRequest"); + std::string hdr = "DropTableRequest(table=" + table_name_ + ")"; + TimeRecorder rc(hdr); // step 1: check arguments auto status = ValidationUtil::ValidateTableName(table_name_); diff --git a/core/src/server/grpc_impl/request/HasTableRequest.cpp b/core/src/server/grpc_impl/request/HasTableRequest.cpp index 2909c93056..434580efdf 100644 --- a/core/src/server/grpc_impl/request/HasTableRequest.cpp +++ b/core/src/server/grpc_impl/request/HasTableRequest.cpp @@ -39,7 +39,8 @@ HasTableRequest::Create(const std::string& table_name, bool& has_table) { Status HasTableRequest::OnExecute() { try { - TimeRecorder rc("HasTableRequest"); + std::string hdr = "HasTableRequest(table=" + table_name_ + ")"; + TimeRecorder rc(hdr); // step 1: check arguments auto status = ValidationUtil::ValidateTableName(table_name_); diff --git a/core/src/server/grpc_impl/request/InsertRequest.cpp b/core/src/server/grpc_impl/request/InsertRequest.cpp index f436db074e..df65c679ed 100644 --- a/core/src/server/grpc_impl/request/InsertRequest.cpp +++ b/core/src/server/grpc_impl/request/InsertRequest.cpp @@ -45,7 +45,10 @@ InsertRequest::Create(const ::milvus::grpc::InsertParam* insert_param, ::milvus: Status InsertRequest::OnExecute() { try { - TimeRecorder rc("InsertRequest"); + std::string hdr = "InsertRequest(table=" + insert_param_->table_name() + + ", n=" + std::to_string(insert_param_->row_record_array_size()) + + ", partition_tag=" + insert_param_->partition_tag() + ")"; + TimeRecorder rc(hdr); // step 1: check arguments auto status = 
ValidationUtil::ValidateTableName(insert_param_->table_name()); diff --git a/core/src/server/grpc_impl/request/PreloadTableRequest.cpp b/core/src/server/grpc_impl/request/PreloadTableRequest.cpp index e26aa8a877..3c46524afe 100644 --- a/core/src/server/grpc_impl/request/PreloadTableRequest.cpp +++ b/core/src/server/grpc_impl/request/PreloadTableRequest.cpp @@ -39,7 +39,8 @@ PreloadTableRequest::Create(const std::string& table_name) { Status PreloadTableRequest::OnExecute() { try { - TimeRecorder rc("PreloadTableRequest"); + std::string hdr = "PreloadTableRequest(table=" + table_name_ + ")"; + TimeRecorder rc(hdr); // step 1: check arguments auto status = ValidationUtil::ValidateTableName(table_name_); diff --git a/core/src/server/grpc_impl/request/SearchRequest.cpp b/core/src/server/grpc_impl/request/SearchRequest.cpp index 28f4ff723e..db0a55d48e 100644 --- a/core/src/server/grpc_impl/request/SearchRequest.cpp +++ b/core/src/server/grpc_impl/request/SearchRequest.cpp @@ -51,7 +51,9 @@ SearchRequest::OnExecute() { int64_t top_k = search_param_->topk(); int64_t nprobe = search_param_->nprobe(); - std::string hdr = "SearchRequest(k=" + std::to_string(top_k) + ", nprob=" + std::to_string(nprobe) + ")"; + std::string hdr = "SearchRequest(table=" + search_param_->table_name() + + ", nq=" + std::to_string(search_param_->query_record_array_size()) + + ", k=" + std::to_string(top_k) + ", nprob=" + std::to_string(nprobe) + ")"; TimeRecorder rc(hdr); // step 1: check table name diff --git a/core/src/server/grpc_impl/request/ShowPartitionsRequest.cpp b/core/src/server/grpc_impl/request/ShowPartitionsRequest.cpp index 32fa0672c5..0d0809b171 100644 --- a/core/src/server/grpc_impl/request/ShowPartitionsRequest.cpp +++ b/core/src/server/grpc_impl/request/ShowPartitionsRequest.cpp @@ -40,6 +40,9 @@ ShowPartitionsRequest::Create(const std::string& table_name, ::milvus::grpc::Par Status ShowPartitionsRequest::OnExecute() { + std::string hdr = "ShowPartitionsRequest(table=" + table_name_ + ")"; + TimeRecorder rc(hdr); + auto status = ValidationUtil::ValidateTableName(table_name_); if (!status.ok()) { return status; diff --git a/core/src/server/grpc_impl/request/ShowTablesRequest.cpp b/core/src/server/grpc_impl/request/ShowTablesRequest.cpp index 90dbad981f..404e08197e 100644 --- a/core/src/server/grpc_impl/request/ShowTablesRequest.cpp +++ b/core/src/server/grpc_impl/request/ShowTablesRequest.cpp @@ -38,6 +38,8 @@ ShowTablesRequest::Create(::milvus::grpc::TableNameList* table_name_list) { Status ShowTablesRequest::OnExecute() { + TimeRecorder rc("ShowTablesRequest"); + std::vector schema_array; auto statuts = DBWrapper::DB()->AllTables(schema_array); if (!statuts.ok()) { diff --git a/core/src/utils/CommonUtil.cpp b/core/src/utils/CommonUtil.cpp index cfadb2fcc4..cdfae8f1e5 100644 --- a/core/src/utils/CommonUtil.cpp +++ b/core/src/utils/CommonUtil.cpp @@ -229,7 +229,7 @@ CommonUtil::ConvertTime(tm time_struct, time_t& time_integer) { void CommonUtil::EraseFromCache(const std::string& item_key) { if (item_key.empty()) { - // SERVER_LOG_ERROR << "Empty key cannot be erased from cache"; + SERVER_LOG_ERROR << "Empty key cannot be erased from cache"; return; } diff --git a/core/unittest/db/test_misc.cpp b/core/unittest/db/test_misc.cpp index c044dde8da..326f705184 100644 --- a/core/unittest/db/test_misc.cpp +++ b/core/unittest/db/test_misc.cpp @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. 
+#include "db/IndexFailedChecker.h" +#include "db/OngoingFileChecker.h" #include "db/Options.h" #include "db/Utils.h" #include "db/engine/EngineFactory.h" @@ -119,3 +121,61 @@ TEST(DBMiscTest, UTILS_TEST) { status = milvus::engine::utils::DeleteTableFilePath(options, file); ASSERT_TRUE(status.ok()); } + +TEST(DBMiscTest, CHECKER_TEST) { + { + milvus::engine::IndexFailedChecker checker; + milvus::engine::meta::TableFileSchema schema; + schema.table_id_ = "aaa"; + schema.file_id_ = "5000"; + checker.MarkFailedIndexFile(schema); + schema.table_id_ = "bbb"; + schema.file_id_ = "5001"; + checker.MarkFailedIndexFile(schema); + + std::vector failed_files; + checker.GetFailedIndexFileOfTable("aaa", failed_files); + ASSERT_EQ(failed_files.size(), 1UL); + + schema.table_id_ = "bbb"; + schema.file_id_ = "5002"; + checker.MarkFailedIndexFile(schema); + checker.MarkFailedIndexFile(schema); + + milvus::engine::meta::TableFilesSchema table_files = {schema}; + checker.IgnoreFailedIndexFiles(table_files); + ASSERT_TRUE(table_files.empty()); + + checker.GetFailedIndexFileOfTable("bbb", failed_files); + ASSERT_EQ(failed_files.size(), 2UL); + + checker.MarkSucceedIndexFile(schema); + checker.GetFailedIndexFileOfTable("bbb", failed_files); + ASSERT_EQ(failed_files.size(), 1UL); + } + + { + milvus::engine::OngoingFileChecker checker; + milvus::engine::meta::TableFileSchema schema; + schema.table_id_ = "aaa"; + schema.file_id_ = "5000"; + checker.MarkOngoingFile(schema); + + ASSERT_TRUE(checker.IsIgnored(schema)); + + schema.table_id_ = "bbb"; + schema.file_id_ = "5001"; + milvus::engine::meta::TableFilesSchema table_files = {schema}; + checker.MarkOngoingFiles(table_files); + + ASSERT_TRUE(checker.IsIgnored(schema)); + + checker.UnmarkOngoingFile(schema); + ASSERT_FALSE(checker.IsIgnored(schema)); + + schema.table_id_ = "aaa"; + schema.file_id_ = "5000"; + checker.UnmarkOngoingFile(schema); + ASSERT_FALSE(checker.IsIgnored(schema)); + } +}