Fix failed to open file (#2138)

* file reference

Signed-off-by: groot <yihua.mo@zilliz.com>

* print info

Signed-off-by: groot <yihua.mo@zilliz.com>

* avoid metric crash

Signed-off-by: yhmo <yihua.mo@zilliz.com>

* refine code

Signed-off-by: yhmo <yihua.mo@zilliz.com>

* apply delete bug

Signed-off-by: groot <yihua.mo@zilliz.com>

* has partition check

Signed-off-by: groot <yihua.mo@zilliz.com>

* duplicate id search

Signed-off-by: groot <yihua.mo@zilliz.com>

* changelog

Signed-off-by: groot <yihua.mo@zilliz.com>
groot 2020-04-28 08:10:01 -05:00 committed by GitHub
parent 3be8aad1af
commit ebb01aa9b2
25 changed files with 1089 additions and 783 deletions
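
The thread running through all of these diffs: the global OngoingFileChecker, which every caller had to bracket with MarkOngoingFiles/UnmarkOngoingFiles by hand, is replaced by a scoped meta::FilesHolder that ref-counts files as the meta layer hands them out and unmarks them on ReleaseFiles() or destruction. A minimal before/after sketch, with calls abbreviated and names taken from DBImpl.cpp:

// Before: manual mark/unmark pairs; any early return leaked a mark.
meta::SegmentsSchema files;
meta_ptr_->FilesToSearch(collection_id, files);
OngoingFileChecker::GetInstance().MarkOngoingFiles(files);
// ... use files ...
OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files);

// After: the holder pins files as the meta layer collects them and
// releases them in ReleaseFiles() or in its destructor.
meta::FilesHolder files_holder;
meta_ptr_->FilesToSearch(collection_id, files_holder);
// ... use files_holder.HoldFiles() ...
files_holder.ReleaseFiles();  // optional: the destructor does this too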

CHANGELOG.md

@ -6,11 +6,13 @@ Please mark all change in change log and use the issue from GitHub
## Bug
- \#1705 Limit the insert data batch size
- \#1796 Too much low performance of building index on ubuntu-mysql-version
- \#1929 Skip MySQL meta schema field width check
- \#1997 Index file missed after compact
- \#2073 Fix CheckDBConfigBackendUrl error message
- \#2076 CheckMetricConfigAddress error message
- \#1796 Too much low performance of building index on ubuntu-mysql-version
- \#2128 Check has_partition params
- \#2131 Distance/ID returned is not correct if searching with duplicate ids
- \#2141 Fix server start failed if wal directory exist
## Feature

core/src/db/DBImpl.cpp

@ -339,8 +339,8 @@ DBImpl::GetCollectionInfo(const std::string& collection_id, std::string& collect
size_t total_row_count = 0;
auto get_info = [&](const std::string& col_id, const std::string& tag) {
meta::SegmentsSchema collection_files;
status = meta_ptr_->FilesByType(col_id, file_types, collection_files);
meta::FilesHolder files_holder;
status = meta_ptr_->FilesByType(col_id, file_types, files_holder);
if (!status.ok()) {
std::string err_msg = "Failed to get collection info: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
@ -352,6 +352,7 @@ DBImpl::GetCollectionInfo(const std::string& collection_id, std::string& collect
milvus::json json_segments;
size_t row_count = 0;
milvus::engine::meta::SegmentsSchema& collection_files = files_holder.HoldFiles();
for (auto& file : collection_files) {
milvus::json json_segment;
json_segment[JSON_SEGMENT_NAME] = file.segment_id_;
@ -401,8 +402,8 @@ DBImpl::PreloadCollection(const std::string& collection_id) {
}
// step 1: get all collection files from parent collection
meta::SegmentsSchema files_array;
auto status = GetFilesToSearch(collection_id, files_array);
meta::FilesHolder files_holder;
auto status = meta_ptr_->FilesToSearch(collection_id, files_holder);
if (!status.ok()) {
return status;
}
@ -411,7 +412,7 @@ DBImpl::PreloadCollection(const std::string& collection_id) {
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
for (auto& schema : partition_array) {
status = GetFilesToSearch(schema.collection_id_, files_array);
status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder);
}
int64_t size = 0;
@ -420,6 +421,7 @@ DBImpl::PreloadCollection(const std::string& collection_id) {
int64_t available_size = cache_total - cache_usage;
// step 3: load file one by one
milvus::engine::meta::SegmentsSchema& files_array = files_holder.HoldFiles();
LOG_ENGINE_DEBUG_ << "Begin pre-load collection:" + collection_id + ", totally " << files_array.size()
<< " files need to be pre-loaded";
TimeRecorderAuto rc("Pre-load collection:" + collection_id);
@ -881,19 +883,19 @@ DBImpl::Compact(const std::string& collection_id) {
// Get files to compact from meta.
std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
meta::SegmentSchema::FILE_TYPE::BACKUP};
meta::SegmentsSchema files_to_compact;
status = meta_ptr_->FilesByType(collection_id, file_types, files_to_compact);
meta::FilesHolder files_holder;
status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
if (!status.ok()) {
std::string err_msg = "Failed to get files to compact: " + status.message();
LOG_ENGINE_ERROR_ << err_msg;
return Status(DB_ERROR, err_msg);
}
LOG_ENGINE_DEBUG_ << "Found " << files_to_compact.size() << " segments to compact";
OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_compact);
LOG_ENGINE_DEBUG_ << "Found " << files_holder.HoldFiles().size() << " segments to compact";
Status compact_status;
// attention: take a copy, not a reference, since files_holder.UnmarkFile() modifies the underlying array
milvus::engine::meta::SegmentsSchema files_to_compact = files_holder.HoldFiles();
for (auto iter = files_to_compact.begin(); iter != files_to_compact.end();) {
meta::SegmentSchema file = *iter;
iter = files_to_compact.erase(iter);
@ -906,7 +908,7 @@ DBImpl::Compact(const std::string& collection_id) {
size_t deleted_docs_size;
status = segment_reader.ReadDeletedDocsSize(deleted_docs_size);
if (!status.ok()) {
OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
files_holder.UnmarkFile(file);
continue; // skip this file and try compact next one
}
@ -917,26 +919,24 @@ DBImpl::Compact(const std::string& collection_id) {
if (!compact_status.ok()) {
LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": "
<< compact_status.message();
OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
files_holder.UnmarkFile(file);
continue; // skip this file and try compact next one
}
} else {
OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
files_holder.UnmarkFile(file);
LOG_ENGINE_DEBUG_ << "Segment " << file.segment_id_ << " has no deleted data. No need to compact";
continue; // skip this file and try compact next one
}
LOG_ENGINE_DEBUG_ << "Updating meta after compaction...";
status = meta_ptr_->UpdateCollectionFiles(files_to_update);
OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
files_holder.UnmarkFile(file);
if (!status.ok()) {
compact_status = status;
break; // meta error, could not go on
}
}
OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_compact);
if (compact_status.ok()) {
LOG_ENGINE_DEBUG_ << "Finished compacting collection: " << collection_id;
}
@ -1006,15 +1006,18 @@ DBImpl::CompactFile(const std::string& collection_id, const meta::SegmentSchema&
// Set all files in segment to TO_DELETE
auto& segment_id = file.segment_id_;
meta::SegmentsSchema segment_files;
status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, segment_files);
meta::FilesHolder files_holder;
status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, files_holder);
if (!status.ok()) {
return status;
}
milvus::engine::meta::SegmentsSchema& segment_files = files_holder.HoldFiles();
for (auto& f : segment_files) {
f.file_type_ = meta::SegmentSchema::FILE_TYPE::TO_DELETE;
files_to_update.emplace_back(f);
}
files_holder.ReleaseFiles();
LOG_ENGINE_DEBUG_ << "Compacted segment " << compacted_file.segment_id_ << " from "
<< std::to_string(file.file_size_) << " bytes to " << std::to_string(compacted_file.file_size_)
@ -1044,47 +1047,35 @@ DBImpl::GetVectorsByID(const std::string& collection_id, const IDNumbers& id_arr
return status;
}
meta::SegmentsSchema files_to_query;
meta::FilesHolder files_holder;
std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
meta::SegmentSchema::FILE_TYPE::BACKUP};
status = meta_ptr_->FilesByType(collection_id, file_types, files_to_query);
status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
if (!status.ok()) {
std::string err_msg = "Failed to get files for GetVectorsByID: " + status.message();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_query);
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
for (auto& schema : partition_array) {
meta::SegmentsSchema files;
status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files);
status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files_holder);
if (!status.ok()) {
OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query);
std::string err_msg = "Failed to get files for GetVectorByID: " + status.message();
LOG_ENGINE_ERROR_ << err_msg;
return status;
}
OngoingFileChecker::GetInstance().MarkOngoingFiles(files);
files_to_query.insert(files_to_query.end(), std::make_move_iterator(files.begin()),
std::make_move_iterator(files.end()));
}
if (files_to_query.empty()) {
OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query);
if (files_holder.HoldFiles().empty()) {
LOG_ENGINE_DEBUG_ << "No files to get vector by id from";
return Status(DB_NOT_FOUND, "Collection is empty");
}
cache::CpuCacheMgr::GetInstance()->PrintInfo();
status = GetVectorsByIdHelper(collection_id, id_array, vectors, files_to_query);
OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_query);
status = GetVectorsByIdHelper(collection_id, id_array, vectors, files_holder);
cache::CpuCacheMgr::GetInstance()->PrintInfo();
return status;
@ -1108,12 +1099,13 @@ DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segmen
}
// step 2: find segment
meta::SegmentsSchema collection_files;
status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, collection_files);
meta::FilesHolder files_holder;
status = meta_ptr_->GetCollectionFilesBySegmentId(segment_id, files_holder);
if (!status.ok()) {
return status;
}
milvus::engine::meta::SegmentsSchema& collection_files = files_holder.HoldFiles();
if (collection_files.empty()) {
return Status(DB_NOT_FOUND, "Segment does not exist");
}
@ -1163,7 +1155,9 @@ DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segmen
Status
DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors, const meta::SegmentsSchema& files) {
std::vector<engine::VectorsData>& vectors, meta::FilesHolder& files_holder) {
// attention: this is a copy, not a reference, since files_holder.UnmarkFile() modifies the underlying array
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size();
// sometimes not all of id_array can be found, we need to return empty vector for id not found
@ -1243,6 +1237,9 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers&
it++;
}
// unmark file, allow the file to be deleted
files_holder.UnmarkFile(file);
}
for (auto id : id_array) {
@ -1251,8 +1248,8 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers&
VectorsData data;
data.vector_count_ = vector_ref.vector_count_;
if (data.vector_count_ > 0) {
data.float_data_.swap(vector_ref.float_data_);
data.binary_data_.swap(vector_ref.binary_data_);
data.float_data_ = vector_ref.float_data_;    // copy the data, id_array may contain duplicate ids
data.binary_data_ = vector_ref.binary_data_;  // copy the data, id_array may contain duplicate ids
}
vectors.emplace_back(data);
}
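
This swap-to-copy change is the fix for #2131: when id_array contains the same id more than once, every occurrence resolves to the same cached VectorsData entry, and swap() drains that entry the first time it is used, so later duplicates returned empty vectors and therefore wrong distances/ids. A self-contained illustration of the difference:

#include <iostream>
#include <vector>

int main() {
    // Duplicate ids: two results are built from the same cached entry.
    std::vector<float> cached{1.0f, 2.0f, 3.0f};

    std::vector<float> first, second;
    first.swap(cached);   // old code: drains the shared source
    second.swap(cached);  // the duplicate occurrence gets nothing
    std::cout << first.size() << " " << second.size() << "\n";  // prints: 3 0

    std::vector<float> cached2{1.0f, 2.0f, 3.0f};
    std::vector<float> a = cached2;  // new code: copy leaves the source intact
    std::vector<float> b = cached2;  // the duplicate id still gets full data
    std::cout << a.size() << " " << b.size() << "\n";  // prints: 3 3
    return 0;
}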
@ -1468,12 +1465,11 @@ DBImpl::HybridQuery(const std::shared_ptr<server::Context>& context, const std::
}
Status status;
meta::SegmentsSchema files_array;
meta::FilesHolder files_holder;
if (partition_tags.empty()) {
// no partition tag specified, means search in whole table
// get all table files from parent table
status = GetFilesToSearch(collection_id, files_array);
status = meta_ptr_->FilesToSearch(collection_id, files_holder);
if (!status.ok()) {
return status;
}
@ -1484,14 +1480,14 @@ DBImpl::HybridQuery(const std::shared_ptr<server::Context>& context, const std::
return status;
}
for (auto& schema : partition_array) {
status = GetFilesToSearch(schema.collection_id_, files_array);
status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder);
if (!status.ok()) {
return Status(DB_ERROR, "GetFilesToSearch failed in HybridQuery");
return Status(DB_ERROR, "get files to search failed in HybridQuery");
}
}
if (files_array.empty()) {
return Status::OK();
if (files_holder.HoldFiles().empty()) {
return Status::OK(); // no files to search
}
} else {
// get files from specified partitions
@ -1499,19 +1495,19 @@ DBImpl::HybridQuery(const std::shared_ptr<server::Context>& context, const std::
GetPartitionsByTags(collection_id, partition_tags, partition_name_array);
for (auto& partition_name : partition_name_array) {
status = GetFilesToSearch(partition_name, files_array);
status = meta_ptr_->FilesToSearch(partition_name, files_holder);
if (!status.ok()) {
return Status(DB_ERROR, "GetFilesToSearch failed in HybridQuery");
return Status(DB_ERROR, "get files to search failed in HybridQuery");
}
}
if (files_array.empty()) {
if (files_holder.HoldFiles().empty()) {
return Status::OK();
}
}
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
status = HybridQueryAsync(query_ctx, collection_id, files_array, hybrid_search_context, general_query, attr_type,
status = HybridQueryAsync(query_ctx, collection_id, files_holder, hybrid_search_context, general_query, attr_type,
nq, result_ids, result_distances);
if (!status.ok()) {
return status;
@ -1534,12 +1530,11 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string
}
Status status;
meta::SegmentsSchema files_array;
meta::FilesHolder files_holder;
if (partition_tags.empty()) {
// no partition tag specified, means search in whole collection
// get all collection files from parent collection
status = GetFilesToSearch(collection_id, files_array);
status = meta_ptr_->FilesToSearch(collection_id, files_holder);
if (!status.ok()) {
return status;
}
@ -1547,11 +1542,11 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
for (auto& schema : partition_array) {
status = GetFilesToSearch(schema.collection_id_, files_array);
status = meta_ptr_->FilesToSearch(schema.collection_id_, files_holder);
}
if (files_array.empty()) {
return Status::OK();
if (files_holder.HoldFiles().empty()) {
return Status::OK(); // no files to search
}
} else {
// get files from specified partitions
@ -1562,16 +1557,16 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string
}
for (auto& partition_name : partition_name_array) {
status = GetFilesToSearch(partition_name, files_array);
status = meta_ptr_->FilesToSearch(partition_name, files_holder);
}
if (files_array.empty()) {
return Status::OK();
if (files_holder.HoldFiles().empty()) {
return Status::OK(); // no files to search
}
}
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
status = QueryAsync(tracer.Context(), files_array, k, extra_params, vectors, result_ids, result_distances);
status = QueryAsync(tracer.Context(), files_holder, k, extra_params, vectors, result_ids, result_distances);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
return status;
@ -1594,19 +1589,19 @@ DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std
ids.push_back(std::stoul(id, &sz));
}
meta::SegmentsSchema search_files;
auto status = meta_ptr_->FilesByID(ids, search_files);
meta::FilesHolder files_holder;
auto status = meta_ptr_->FilesByID(ids, files_holder);
if (!status.ok()) {
return status;
}
fiu_do_on("DBImpl.QueryByFileID.empty_files_array", search_files.clear());
milvus::engine::meta::SegmentsSchema& search_files = files_holder.HoldFiles();
if (search_files.empty()) {
return Status(DB_ERROR, "Invalid file id");
}
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info before query
status = QueryAsync(tracer.Context(), search_files, k, extra_params, vectors, result_ids, result_distances);
status = QueryAsync(tracer.Context(), files_holder, k, extra_params, vectors, result_ids, result_distances);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); // print cache info after query
return status;
@ -1625,12 +1620,13 @@ DBImpl::Size(uint64_t& result) {
// internal methods
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Status
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::SegmentsSchema& files, uint64_t k,
DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, meta::FilesHolder& files_holder, uint64_t k,
const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
ResultDistances& result_distances) {
milvus::server::ContextChild tracer(context, "Query Async");
server::CollectQueryMetrics metrics(vectors.vector_count_);
milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles();
if (files.size() > milvus::scheduler::TASK_TABLE_MAX_COUNT) {
std::string msg =
"Search files count exceed scheduler limit: " + std::to_string(milvus::scheduler::TASK_TABLE_MAX_COUNT);
@ -1641,8 +1637,6 @@ DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::
TimeRecorder rc("");
// step 1: construct search job
auto status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files);
LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files.size());
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(tracer.Context(), k, extra_params, vectors);
for (auto& file : files) {
@ -1654,7 +1648,7 @@ DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitResult();
status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files);
files_holder.ReleaseFiles();
if (!job->GetStatus().ok()) {
return job->GetStatus();
}
@ -1669,7 +1663,7 @@ DBImpl::QueryAsync(const std::shared_ptr<server::Context>& context, const meta::
Status
DBImpl::HybridQueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const meta::SegmentsSchema& files, context::HybridSearchContextPtr hybrid_search_context,
meta::FilesHolder& files_holder, context::HybridSearchContextPtr hybrid_search_context,
query::GeneralQueryPtr general_query,
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type, uint64_t& nq,
ResultIds& result_ids, ResultDistances& result_distances) {
@ -1698,11 +1692,9 @@ DBImpl::HybridQueryAsync(const std::shared_ptr<server::Context>& context, const
TimeRecorder rc("");
// step 1: construct search job
auto status = OngoingFileChecker::GetInstance().MarkOngoingFiles(files);
VectorsData vectors;
LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files.size());
milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles();
LOG_ENGINE_DEBUG_ << LogOut("Engine query begin, index file count: %ld", files_holder.HoldFiles().size());
scheduler::SearchJobPtr job =
std::make_shared<scheduler::SearchJob>(query_async_ctx, general_query, attr_type, vectors);
for (auto& file : files) {
@ -1714,7 +1706,7 @@ DBImpl::HybridQueryAsync(const std::shared_ptr<server::Context>& context, const
scheduler::JobMgrInst::GetInstance()->Put(job);
job->WaitResult();
status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files);
files_holder.ReleaseFiles();
if (!job->GetStatus().ok()) {
return job->GetStatus();
}
@ -1840,7 +1832,7 @@ DBImpl::StartMergeTask() {
}
Status
DBImpl::MergeFiles(const std::string& collection_id, const meta::SegmentsSchema& files) {
DBImpl::MergeFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
// const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
@ -1868,11 +1860,16 @@ DBImpl::MergeFiles(const std::string& collection_id, const meta::SegmentsSchema&
utils::GetParentPath(collection_file.location_, new_segment_dir);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
// attention: take a copy, not a reference, since files_holder.UnmarkFile() modifies the underlying array
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
for (auto& file : files) {
server::CollectMergeFilesMetrics metrics;
std::string segment_dir_to_merge;
utils::GetParentPath(file.location_, segment_dir_to_merge);
segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_);
files_holder.UnmarkFile(file);
auto file_schema = file;
file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
updated.push_back(file_schema);
@ -1931,7 +1928,7 @@ DBImpl::MergeFiles(const std::string& collection_id, const meta::SegmentsSchema&
}
Status
DBImpl::MergeHybridFiles(const std::string& collection_id, const milvus::engine::meta::SegmentsSchema& files) {
DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
// const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
@ -1959,11 +1956,16 @@ DBImpl::MergeHybridFiles(const std::string& collection_id, const milvus::engine:
utils::GetParentPath(table_file.location_, new_segment_dir);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
// attention: take a copy, not a reference, since files_holder.UnmarkFile() modifies the underlying array
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
for (auto& file : files) {
server::CollectMergeFilesMetrics metrics;
std::string segment_dir_to_merge;
utils::GetParentPath(file.location_, segment_dir_to_merge);
segment_writer_ptr->Merge(segment_dir_to_merge, table_file.file_id_);
files_holder.UnmarkFile(file);
auto file_schema = file;
file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
updated.push_back(file_schema);
@ -2024,21 +2026,19 @@ Status
DBImpl::BackgroundMergeFiles(const std::string& collection_id) {
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
meta::SegmentsSchema raw_files;
auto status = meta_ptr_->FilesToMerge(collection_id, raw_files);
meta::FilesHolder files_holder;
auto status = meta_ptr_->FilesToMerge(collection_id, files_holder);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id;
return status;
}
if (raw_files.size() < options_.merge_trigger_number_) {
if (files_holder.HoldFiles().size() < options_.merge_trigger_number_) {
LOG_ENGINE_TRACE_ << "File count is less than merge trigger number, skip merge action";
return Status::OK();
}
status = OngoingFileChecker::GetInstance().MarkOngoingFiles(raw_files);
MergeFiles(collection_id, raw_files);
status = OngoingFileChecker::GetInstance().UnmarkOngoingFiles(raw_files);
MergeFiles(collection_id, files_holder);
if (!initialized_.load(std::memory_order_acquire)) {
LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id;
@ -2103,13 +2103,14 @@ DBImpl::StartBuildIndexTask() {
void
DBImpl::BackgroundBuildIndex() {
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::SegmentsSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
meta::FilesHolder files_holder;
meta_ptr_->FilesToIndex(files_holder);
milvus::engine::meta::SegmentsSchema& to_index_files = files_holder.HoldFiles();
Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files);
if (!to_index_files.empty()) {
LOG_ENGINE_DEBUG_ << "Background build index thread begin";
status = OngoingFileChecker::GetInstance().MarkOngoingFiles(to_index_files);
LOG_ENGINE_DEBUG_ << "Background build index thread begin, " << to_index_files.size() << " files to index";
// step 2: put build index task to scheduler
std::vector<std::pair<scheduler::BuildIndexJobPtr, scheduler::SegmentSchemaPtr>> job2file_map;
@ -2136,7 +2137,8 @@ DBImpl::BackgroundBuildIndex() {
index_failed_checker_.MarkSucceedIndexFile(file_schema);
}
status = OngoingFileChecker::GetInstance().UnmarkOngoingFile(file_schema);
status = files_holder.UnmarkFile(file_schema);
LOG_ENGINE_DEBUG_ << "Finish build index file " << file_schema.file_id_;
}
LOG_ENGINE_DEBUG_ << "Background build index thread finished";
@ -2146,39 +2148,26 @@ DBImpl::BackgroundBuildIndex() {
Status
DBImpl::GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,
meta::SegmentsSchema& files) {
files.clear();
auto status = meta_ptr_->FilesByType(collection_id, file_types, files);
meta::FilesHolder& files_holder) {
files_holder.ReleaseFiles();
auto status = meta_ptr_->FilesByType(collection_id, file_types, files_holder);
// only build index for files whose row count exceeds a certain threshold
for (auto it = files.begin(); it != files.end();) {
if ((*it).file_type_ == static_cast<int>(meta::SegmentSchema::RAW) &&
(*it).row_count_ < meta::BUILD_INDEX_THRESHOLD) {
it = files.erase(it);
} else {
++it;
// attention: take a copy, not a reference, since files_holder.UnmarkFile() modifies the underlying array
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
for (const milvus::engine::meta::SegmentSchema& file : files) {
if (file.file_type_ == static_cast<int>(meta::SegmentSchema::RAW) &&
file.row_count_ < meta::BUILD_INDEX_THRESHOLD) {
// skip files whose row count is below the threshold
files_holder.UnmarkFile(file);
} else if (index_failed_checker_.IsFailedIndexFile(file)) {
// skip build index for files that failed before
files_holder.UnmarkFile(file);
}
}
return Status::OK();
}
Status
DBImpl::GetFilesToSearch(const std::string& collection_id, meta::SegmentsSchema& files) {
LOG_ENGINE_DEBUG_ << "Collect files from collection: " << collection_id;
meta::SegmentsSchema search_files;
auto status = meta_ptr_->FilesToSearch(collection_id, search_files);
if (!status.ok()) {
return status;
}
for (auto& file : search_files) {
files.push_back(file);
}
return Status::OK();
}
Status
DBImpl::GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag,
std::string& partition_name) {
@ -2317,26 +2306,27 @@ DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const C
}
// get files to build index
meta::SegmentsSchema collection_files;
auto status = GetFilesToBuildIndex(collection_id, file_types, collection_files);
int times = 1;
{
meta::FilesHolder files_holder;
auto status = GetFilesToBuildIndex(collection_id, file_types, files_holder);
int times = 1;
while (!collection_files.empty()) {
LOG_ENGINE_DEBUG_ << "Non index files detected! Will build index " << times;
if (!utils::IsRawIndexType(index.engine_type_)) {
status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
while (!files_holder.HoldFiles().empty()) {
LOG_ENGINE_DEBUG_ << files_holder.HoldFiles().size() << " non-index files detected! Will build index "
<< times;
if (!utils::IsRawIndexType(index.engine_type_)) {
status = meta_ptr_->UpdateCollectionFilesToIndex(collection_id);
}
index_req_swn_.Wait_For(std::chrono::seconds(WAIT_BUILD_INDEX_INTERVAL));
GetFilesToBuildIndex(collection_id, file_types, files_holder);
++times;
}
index_req_swn_.Wait_For(std::chrono::seconds(WAIT_BUILD_INDEX_INTERVAL));
GetFilesToBuildIndex(collection_id, file_types, collection_files);
++times;
index_failed_checker_.IgnoreFailedIndexFiles(collection_files);
}
// build index for partition
std::vector<meta::CollectionSchema> partition_array;
status = meta_ptr_->ShowPartitions(collection_id, partition_array);
auto status = meta_ptr_->ShowPartitions(collection_id, partition_array);
for (auto& schema : partition_array) {
status = WaitCollectionIndexRecursively(schema.collection_id_, index);
fiu_do_on("DBImpl.WaitCollectionIndexRecursively.fail_build_collection_Index_for_partition",
@ -2354,6 +2344,8 @@ DBImpl::WaitCollectionIndexRecursively(const std::string& collection_id, const C
return Status(DB_ERROR, err_msg);
}
LOG_ENGINE_DEBUG_ << "WaitCollectionIndexRecursively finished";
return Status::OK();
}
@ -2673,6 +2665,7 @@ DBImpl::BackgroundMetricThread() {
swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL));
StartMetricTask();
meta::FilesHolder::PrintInfo();
}
}
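
Several loops above take a copy of HoldFiles() before iterating, because UnmarkFile() erases the entry from the holder's internal vector and would invalidate iterators into it. A sketch of the idiom, assuming the FilesHolder API introduced in this commit:

meta::FilesHolder files_holder;
meta_ptr_->FilesToMerge(collection_id, files_holder);

// Copy, not a reference: UnmarkFile() shrinks the holder's internal vector.
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
for (auto& file : files) {
    // ... merge/compact/read the file ...
    files_holder.UnmarkFile(file);  // drop the pin as soon as we are done
}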

core/src/db/DBImpl.h

@ -27,9 +27,9 @@
#include "config/handler/EngineConfigHandler.h"
#include "db/DB.h"
#include "db/IndexFailedChecker.h"
#include "db/OngoingFileChecker.h"
#include "db/Types.h"
#include "db/insert/MemManager.h"
#include "db/meta/FilesHolder.h"
#include "utils/ThreadPool.h"
#include "wal/WalManager.h"
@ -183,20 +183,20 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
private:
Status
QueryAsync(const std::shared_ptr<server::Context>& context, const meta::SegmentsSchema& files, uint64_t k,
QueryAsync(const std::shared_ptr<server::Context>& context, meta::FilesHolder& files_holder, uint64_t k,
const milvus::json& extra_params, const VectorsData& vectors, ResultIds& result_ids,
ResultDistances& result_distances);
Status
HybridQueryAsync(const std::shared_ptr<server::Context>& context, const std::string& table_id,
const meta::SegmentsSchema& files, context::HybridSearchContextPtr hybrid_search_context,
meta::FilesHolder& files_holder, context::HybridSearchContextPtr hybrid_search_context,
query::GeneralQueryPtr general_query,
std::unordered_map<std::string, engine::meta::hybrid::DataType>& attr_type, uint64_t& nq,
ResultIds& result_ids, ResultDistances& result_distances);
Status
GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array,
std::vector<engine::VectorsData>& vectors, const meta::SegmentsSchema& files);
std::vector<engine::VectorsData>& vectors, meta::FilesHolder& files_holder);
void
InternalFlush(const std::string& collection_id = "");
@ -226,7 +226,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
StartMergeTask();
Status
MergeFiles(const std::string& collection_id, const meta::SegmentsSchema& files);
MergeFiles(const std::string& collection_id, meta::FilesHolder& files_holder);
Status
BackgroundMergeFiles(const std::string& collection_id);
@ -235,7 +235,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
BackgroundMerge(std::set<std::string> collection_ids);
Status
MergeHybridFiles(const std::string& table_id, const meta::SegmentsSchema& files);
MergeHybridFiles(const std::string& table_id, meta::FilesHolder& files_holder);
void
StartBuildIndexTask();
@ -254,10 +254,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
Status
GetFilesToBuildIndex(const std::string& collection_id, const std::vector<int>& file_types,
meta::SegmentsSchema& files);
Status
GetFilesToSearch(const std::string& collection_id, meta::SegmentsSchema& files);
meta::FilesHolder& files_holder);
Status
GetPartitionByTag(const std::string& collection_id, const std::string& partition_tag, std::string& partition_name);

core/src/db/IndexFailedChecker.cpp

@ -74,6 +74,23 @@ IndexFailedChecker::MarkSucceedIndexFile(const meta::SegmentSchema& file) {
return Status::OK();
}
bool
IndexFailedChecker::IsFailedIndexFile(const meta::SegmentSchema& file) {
std::lock_guard<std::mutex> lck(mutex_);
auto it_failed_files = index_failed_files_.find(file.collection_id_);
if (it_failed_files != index_failed_files_.end()) {
auto it_failed_file = it_failed_files->second.find(file.file_id_);
if (it_failed_file != it_failed_files->second.end()) {
if (it_failed_file->second.size() >= INDEX_FAILED_RETRY_TIME) {
return true;
}
}
}
return false;
}
Status
IndexFailedChecker::IgnoreFailedIndexFiles(meta::SegmentsSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
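
The new IsFailedIndexFile() consults index_failed_files_, a nested map from collection id to file id to the list of recorded failure messages (the File2ErrArray/Table2FileErr typedefs kept in db/Types.h), so a file's failure count is the size of that list. A standalone model of the threshold check; the actual value of INDEX_FAILED_RETRY_TIME is assumed here:

#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Standalone model: collection id -> file id -> recorded failure messages.
using File2ErrArray = std::map<std::string, std::vector<std::string>>;
using Table2FileErr = std::map<std::string, File2ErrArray>;

constexpr std::size_t INDEX_FAILED_RETRY_TIME = 3;  // assumed threshold

bool IsFailedIndexFile(const Table2FileErr& failed, const std::string& collection_id,
                       const std::string& file_id) {
    auto it = failed.find(collection_id);
    if (it == failed.end()) return false;
    auto it_file = it->second.find(file_id);
    return it_file != it->second.end() && it_file->second.size() >= INDEX_FAILED_RETRY_TIME;
}

int main() {
    Table2FileErr failed;
    failed["collection_1"]["file_1"] = {"err", "err", "err"};  // failed three times
    std::cout << std::boolalpha
              << IsFailedIndexFile(failed, "collection_1", "file_1") << "\n"   // true
              << IsFailedIndexFile(failed, "collection_1", "file_2") << "\n";  // false
    return 0;
}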

core/src/db/IndexFailedChecker.h

@ -36,6 +36,9 @@ class IndexFailedChecker {
Status
MarkSucceedIndexFile(const meta::SegmentSchema& file);
bool
IsFailedIndexFile(const meta::SegmentSchema& file);
Status
IgnoreFailedIndexFiles(meta::SegmentsSchema& table_files);

core/src/db/OngoingFileChecker.cpp (deleted)

@ -1,130 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/OngoingFileChecker.h"
#include "utils/Log.h"
#include <utility>
namespace milvus {
namespace engine {
OngoingFileChecker&
OngoingFileChecker::GetInstance() {
static OngoingFileChecker instance;
return instance;
}
Status
OngoingFileChecker::MarkOngoingFile(const meta::SegmentSchema& table_file) {
std::lock_guard<std::mutex> lck(mutex_);
return MarkOngoingFileNoLock(table_file);
}
Status
OngoingFileChecker::MarkOngoingFiles(const meta::SegmentsSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& table_file : table_files) {
MarkOngoingFileNoLock(table_file);
}
return Status::OK();
}
Status
OngoingFileChecker::UnmarkOngoingFile(const meta::SegmentSchema& table_file) {
std::lock_guard<std::mutex> lck(mutex_);
return UnmarkOngoingFileNoLock(table_file);
}
Status
OngoingFileChecker::UnmarkOngoingFiles(const meta::SegmentsSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& table_file : table_files) {
UnmarkOngoingFileNoLock(table_file);
}
return Status::OK();
}
bool
OngoingFileChecker::IsIgnored(const meta::SegmentSchema& schema) {
std::lock_guard<std::mutex> lck(mutex_);
auto iter = ongoing_files_.find(schema.collection_id_);
if (iter == ongoing_files_.end()) {
return false;
} else {
auto it_file = iter->second.find(schema.file_id_);
if (it_file == iter->second.end()) {
return false;
} else {
return (it_file->second > 0);
}
}
}
Status
OngoingFileChecker::MarkOngoingFileNoLock(const meta::SegmentSchema& table_file) {
if (table_file.collection_id_.empty() || table_file.file_id_.empty()) {
return Status(DB_ERROR, "Invalid collection files");
}
auto iter = ongoing_files_.find(table_file.collection_id_);
if (iter == ongoing_files_.end()) {
File2RefCount files_refcount;
files_refcount.insert(std::make_pair(table_file.file_id_, 1));
ongoing_files_.insert(std::make_pair(table_file.collection_id_, files_refcount));
} else {
auto it_file = iter->second.find(table_file.file_id_);
if (it_file == iter->second.end()) {
iter->second[table_file.file_id_] = 1;
} else {
it_file->second++;
}
}
LOG_ENGINE_DEBUG_ << "Mark ongoing file:" << table_file.file_id_
<< " refcount:" << ongoing_files_[table_file.collection_id_][table_file.file_id_];
return Status::OK();
}
Status
OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::SegmentSchema& table_file) {
if (table_file.collection_id_.empty() || table_file.file_id_.empty()) {
return Status(DB_ERROR, "Invalid collection files");
}
auto iter = ongoing_files_.find(table_file.collection_id_);
if (iter != ongoing_files_.end()) {
auto it_file = iter->second.find(table_file.file_id_);
if (it_file != iter->second.end()) {
it_file->second--;
LOG_ENGINE_DEBUG_ << "Unmark ongoing file:" << table_file.file_id_ << " refcount:" << it_file->second;
if (it_file->second <= 0) {
iter->second.erase(table_file.file_id_);
if (iter->second.empty()) {
ongoing_files_.erase(table_file.collection_id_);
}
}
}
}
return Status::OK();
}
} // namespace engine
} // namespace milvus

core/src/db/OngoingFileChecker.h (deleted)

@ -1,59 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/Types.h"
#include "meta/Meta.h"
#include "utils/Status.h"
#include <map>
#include <mutex>
#include <set>
#include <string>
namespace milvus {
namespace engine {
class OngoingFileChecker {
public:
static OngoingFileChecker&
GetInstance();
Status
MarkOngoingFile(const meta::SegmentSchema& table_file);
Status
MarkOngoingFiles(const meta::SegmentsSchema& table_files);
Status
UnmarkOngoingFile(const meta::SegmentSchema& table_file);
Status
UnmarkOngoingFiles(const meta::SegmentsSchema& table_files);
bool
IsIgnored(const meta::SegmentSchema& schema);
private:
Status
MarkOngoingFileNoLock(const meta::SegmentSchema& table_file);
Status
UnmarkOngoingFileNoLock(const meta::SegmentSchema& table_file);
private:
std::mutex mutex_;
Table2FileRef ongoing_files_; // collection id mapping to (file id mapping to ongoing ref-count)
};
} // namespace engine
} // namespace milvus

core/src/db/Types.h

@ -58,8 +58,6 @@ struct Entity {
using File2ErrArray = std::map<std::string, std::vector<std::string>>;
using Table2FileErr = std::map<std::string, File2ErrArray>;
using File2RefCount = std::map<std::string, int64_t>;
using Table2FileRef = std::map<std::string, File2RefCount>;
static const char* DEFAULT_PARTITON_TAG = "_default";

core/src/db/insert/MemTable.cpp

@ -16,9 +16,9 @@
#include <unordered_map>
#include "cache/CpuCacheMgr.h"
#include "db/OngoingFileChecker.h"
#include "db/Utils.h"
#include "db/insert/MemTable.h"
#include "db/meta/FilesHolder.h"
#include "knowhere/index/vector_index/VecIndex.h"
#include "segment/SegmentReader.h"
#include "utils/Log.h"
@ -206,22 +206,22 @@ MemTable::ApplyDeletes() {
std::vector<int> file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX,
meta::SegmentSchema::FILE_TYPE::BACKUP};
meta::SegmentsSchema table_files;
auto status = meta_->FilesByType(collection_id_, file_types, table_files);
meta::FilesHolder files_holder;
auto status = meta_->FilesByType(collection_id_, file_types, files_holder);
if (!status.ok()) {
std::string err_msg = "Failed to apply deletes: " + status.ToString();
LOG_ENGINE_ERROR_ << err_msg;
return Status(DB_ERROR, err_msg);
}
OngoingFileChecker::GetInstance().MarkOngoingFiles(table_files);
// attention: take a copy, not a reference, since files_holder.UnmarkFile() modifies the underlying array
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
std::unordered_map<size_t, std::vector<segment::doc_id_t>> ids_to_check_map;
for (size_t i = 0; i < table_files.size(); ++i) {
auto& table_file = table_files[i];
// which files the deletes need to be applied to
std::unordered_map<size_t, std::vector<segment::doc_id_t>> ids_to_check_map; // file id mapping to delete ids
for (auto& file : files) {
std::string segment_dir;
utils::GetParentPath(table_file.location_, segment_dir);
utils::GetParentPath(file.location_, segment_dir);
segment::SegmentReader segment_reader(segment_dir);
segment::IdBloomFilterPtr id_bloom_filter_ptr;
@ -229,37 +229,35 @@ MemTable::ApplyDeletes() {
for (auto& id : doc_ids_to_delete_) {
if (id_bloom_filter_ptr->Check(id)) {
ids_to_check_map[i].emplace_back(id);
ids_to_check_map[file.id_].emplace_back(id);
}
}
}
meta::SegmentsSchema files_to_check;
for (auto& kv : ids_to_check_map) {
files_to_check.emplace_back(table_files[kv.first]);
// release unused files
for (auto& file : files) {
if (ids_to_check_map.find(file.id_) == ids_to_check_map.end()) {
files_holder.UnmarkFile(file);
}
}
OngoingFileChecker::GetInstance().UnmarkOngoingFiles(table_files);
// attention: take a copy, not a reference, since files_holder.UnmarkFile() modifies the underlying array
milvus::engine::meta::SegmentsSchema hold_files = files_holder.HoldFiles();
recorder.RecordSection("Found " + std::to_string(hold_files.size()) + " segments to apply deletes");
OngoingFileChecker::GetInstance().MarkOngoingFiles(files_to_check);
meta::SegmentsSchema files_to_update;
for (auto& file : hold_files) {
LOG_ENGINE_DEBUG_ << "Applying deletes in segment: " << file.segment_id_;
recorder.RecordSection("Found " + std::to_string(ids_to_check_map.size()) + " segment to apply deletes");
meta::SegmentsSchema table_files_to_update;
for (auto& kv : ids_to_check_map) {
auto& table_file = table_files[kv.first];
LOG_ENGINE_DEBUG_ << "Applying deletes in segment: " << table_file.segment_id_;
TimeRecorder rec("handle segment " + table_file.segment_id_);
TimeRecorder rec("handle segment " + file.segment_id_);
std::string segment_dir;
utils::GetParentPath(table_file.location_, segment_dir);
utils::GetParentPath(file.location_, segment_dir);
segment::SegmentReader segment_reader(segment_dir);
auto& segment_id = table_file.segment_id_;
meta::SegmentsSchema segment_files;
status = meta_->GetCollectionFilesBySegmentId(segment_id, segment_files);
auto& segment_id = file.segment_id_;
meta::FilesHolder segment_holder;
status = meta_->GetCollectionFilesBySegmentId(segment_id, segment_holder);
if (!status.ok()) {
break;
}
@ -267,8 +265,9 @@ MemTable::ApplyDeletes() {
// Get all index that contains blacklist in cache
std::vector<knowhere::VecIndexPtr> indexes;
std::vector<faiss::ConcurrentBitsetPtr> blacklists;
for (auto& file : segment_files) {
auto data_obj_ptr = cache::CpuCacheMgr::GetInstance()->GetIndex(file.location_);
milvus::engine::meta::SegmentsSchema& segment_files = segment_holder.HoldFiles();
for (auto& segment_file : segment_files) {
auto data_obj_ptr = cache::CpuCacheMgr::GetInstance()->GetIndex(segment_file.location_);
auto index = std::static_pointer_cast<knowhere::VecIndex>(data_obj_ptr);
if (index != nullptr) {
faiss::ConcurrentBitsetPtr blacklist = index->GetBlacklist();
@ -290,7 +289,7 @@ MemTable::ApplyDeletes() {
break;
}
auto& ids_to_check = kv.second;
auto& ids_to_check = ids_to_check_map[file.id_];
segment::DeletedDocsPtr deleted_docs = std::make_shared<segment::DeletedDocs>();
@ -361,11 +360,13 @@ MemTable::ApplyDeletes() {
rec.RecordSection("Updated bloom filter");
// Update collection file row count
for (auto& file : segment_files) {
if (file.file_type_ == meta::SegmentSchema::RAW || file.file_type_ == meta::SegmentSchema::TO_INDEX ||
file.file_type_ == meta::SegmentSchema::INDEX || file.file_type_ == meta::SegmentSchema::BACKUP) {
file.row_count_ -= delete_count;
table_files_to_update.emplace_back(file);
for (auto& segment_file : segment_files) {
if (segment_file.file_type_ == meta::SegmentSchema::RAW ||
segment_file.file_type_ == meta::SegmentSchema::TO_INDEX ||
segment_file.file_type_ == meta::SegmentSchema::INDEX ||
segment_file.file_type_ == meta::SegmentSchema::BACKUP) {
segment_file.row_count_ -= delete_count;
files_to_update.emplace_back(segment_file);
}
}
rec.RecordSection("Update collection file row count in vector");
@ -373,7 +374,7 @@ MemTable::ApplyDeletes() {
recorder.RecordSection("Finished " + std::to_string(ids_to_check_map.size()) + " segments to apply deletes");
status = meta_->UpdateCollectionFilesRowCount(table_files_to_update);
status = meta_->UpdateCollectionFilesRowCount(files_to_update);
if (!status.ok()) {
std::string err_msg = "Failed to apply deletes: " + status.ToString();
@ -386,8 +387,6 @@ MemTable::ApplyDeletes() {
recorder.RecordSection("Update deletes to meta");
recorder.ElapseFromBegin("Finished deletes");
OngoingFileChecker::GetInstance().UnmarkOngoingFiles(files_to_check);
return Status::OK();
}
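
The reworked ApplyDeletes() keys ids_to_check_map by the file's numeric id_ instead of its position in the array, and releases any file whose bloom filter matches none of the pending delete ids before doing the expensive per-segment work. A condensed sketch of that winnowing step, using the names from the diff above:

// files is a copy of files_holder.HoldFiles(); doc_ids_to_delete_ holds the
// pending deletes for this collection.
for (auto& file : files) {
    // ... open a SegmentReader for file, load id_bloom_filter_ptr ...
    for (auto& id : doc_ids_to_delete_) {
        if (id_bloom_filter_ptr->Check(id)) {           // bloom filter: "maybe present"
            ids_to_check_map[file.id_].emplace_back(id);
        }
    }
}
// A file with no bloom-filter hits cannot contain any of the ids:
// unmark it now so cleanup is not blocked while the rest are updated.
for (auto& file : files) {
    if (ids_to_check_map.find(file.id_) == ids_to_check_map.end()) {
        files_holder.UnmarkFile(file);
    }
}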

core/src/db/meta/FilesHolder.cpp (new file)

@ -0,0 +1,237 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/meta/FilesHolder.h"
#include "utils/Log.h"
#include <utility>
namespace milvus {
namespace engine {
namespace meta {
//////////////////////////////////////////////////////////////////////////////////////////////////////////
FilesHolder::OngoingFileChecker&
FilesHolder::OngoingFileChecker::GetInstance() {
static OngoingFileChecker instance;
return instance;
}
Status
FilesHolder::OngoingFileChecker::MarkOngoingFile(const meta::SegmentSchema& table_file) {
std::lock_guard<std::mutex> lck(mutex_);
return MarkOngoingFileNoLock(table_file);
}
Status
FilesHolder::OngoingFileChecker::MarkOngoingFiles(const meta::SegmentsSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& table_file : table_files) {
MarkOngoingFileNoLock(table_file);
}
return Status::OK();
}
Status
FilesHolder::OngoingFileChecker::UnmarkOngoingFile(const meta::SegmentSchema& table_file) {
std::lock_guard<std::mutex> lck(mutex_);
return UnmarkOngoingFileNoLock(table_file);
}
Status
FilesHolder::OngoingFileChecker::UnmarkOngoingFiles(const meta::SegmentsSchema& table_files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& table_file : table_files) {
UnmarkOngoingFileNoLock(table_file);
}
return Status::OK();
}
bool
FilesHolder::OngoingFileChecker::CanBeDeleted(const meta::SegmentSchema& schema) {
std::lock_guard<std::mutex> lck(mutex_);
auto iter = ongoing_files_.find(schema.collection_id_);
if (iter == ongoing_files_.end()) {
return true;
} else {
auto it_file = iter->second.find(schema.id_);
if (it_file == iter->second.end()) {
return true;
} else {
return (it_file->second > 0) ? false : true;
}
}
}
void
FilesHolder::OngoingFileChecker::PrintInfo() {
std::lock_guard<std::mutex> lck(mutex_);
if (!ongoing_files_.empty()) {
LOG_ENGINE_DEBUG_ << "File reference information:";
for (meta::Table2FileRef::iterator iter = ongoing_files_.begin(); iter != ongoing_files_.end(); ++iter) {
LOG_ENGINE_DEBUG_ << "\t" << iter->first << ": " << iter->second.size() << " files in use";
}
}
}
Status
FilesHolder::OngoingFileChecker::MarkOngoingFileNoLock(const meta::SegmentSchema& table_file) {
if (table_file.collection_id_.empty() || table_file.file_id_.empty()) {
return Status(DB_ERROR, "Invalid collection files");
}
auto iter = ongoing_files_.find(table_file.collection_id_);
if (iter == ongoing_files_.end()) {
File2RefCount files_refcount;
files_refcount.insert(std::make_pair(table_file.id_, 1));
ongoing_files_.insert(std::make_pair(table_file.collection_id_, files_refcount));
} else {
auto it_file = iter->second.find(table_file.id_);
if (it_file == iter->second.end()) {
iter->second[table_file.id_] = 1;
} else {
it_file->second++;
}
}
LOG_ENGINE_DEBUG_ << "Mark ongoing file:" << table_file.file_id_
<< " refcount:" << ongoing_files_[table_file.collection_id_][table_file.id_];
return Status::OK();
}
Status
FilesHolder::OngoingFileChecker::UnmarkOngoingFileNoLock(const meta::SegmentSchema& table_file) {
if (table_file.collection_id_.empty() || table_file.file_id_.empty()) {
return Status(DB_ERROR, "Invalid collection files");
}
auto iter = ongoing_files_.find(table_file.collection_id_);
if (iter != ongoing_files_.end()) {
auto it_file = iter->second.find(table_file.id_);
if (it_file != iter->second.end()) {
it_file->second--;
LOG_ENGINE_DEBUG_ << "Unmark ongoing file:" << table_file.file_id_ << " refcount:" << it_file->second;
if (it_file->second <= 0) {
iter->second.erase(table_file.id_);
if (iter->second.empty()) {
ongoing_files_.erase(table_file.collection_id_);
}
}
}
}
return Status::OK();
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
FilesHolder::FilesHolder() {
}
FilesHolder::~FilesHolder() {
ReleaseFiles();
}
Status
FilesHolder::MarkFile(const meta::SegmentSchema& file) {
std::lock_guard<std::mutex> lck(mutex_);
return MarkFileInternal(file);
}
Status
FilesHolder::MarkFiles(const meta::SegmentsSchema& files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& file : files) {
MarkFileInternal(file);
}
return Status::OK();
}
Status
FilesHolder::UnmarkFile(const meta::SegmentSchema& file) {
std::lock_guard<std::mutex> lck(mutex_);
return UnmarkFileInternal(file);
}
Status
FilesHolder::UnmarkFiles(const meta::SegmentsSchema& files) {
std::lock_guard<std::mutex> lck(mutex_);
for (auto& file : files) {
UnmarkFileInternal(file);
}
return Status::OK();
}
void
FilesHolder::ReleaseFiles() {
std::lock_guard<std::mutex> lck(mutex_);
OngoingFileChecker::GetInstance().UnmarkOngoingFiles(hold_files_);
hold_files_.clear();
unique_ids_.clear();
}
bool
FilesHolder::CanBeDeleted(const meta::SegmentSchema& file) {
return OngoingFileChecker::GetInstance().CanBeDeleted(file);
}
void
FilesHolder::PrintInfo() {
return OngoingFileChecker::GetInstance().PrintInfo();
}
Status
FilesHolder::MarkFileInternal(const meta::SegmentSchema& file) {
if (unique_ids_.find(file.id_) != unique_ids_.end()) {
return Status::OK(); // already marked
}
auto status = OngoingFileChecker::GetInstance().MarkOngoingFile(file);
if (status.ok()) {
unique_ids_.insert(file.id_);
hold_files_.push_back(file);
}
return status;
}
Status
FilesHolder::UnmarkFileInternal(const meta::SegmentSchema& file) {
if (unique_ids_.find(file.id_) == unique_ids_.end()) {
return Status::OK(); // no such file
}
auto status = OngoingFileChecker::GetInstance().UnmarkOngoingFile(file);
if (status.ok()) {
for (auto iter = hold_files_.begin(); iter != hold_files_.end(); ++iter) {
if (file.id_ == (*iter).id_) {
hold_files_.erase(iter);
break;
}
}
unique_ids_.erase(file.id_);
}
return status;
}
} // namespace meta
} // namespace engine
} // namespace milvus
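
FilesHolder instances share one global ref-count table (the nested OngoingFileChecker singleton), so a file stays pinned until every holder that marked it has released it, and ~FilesHolder() guarantees release even on early-return paths. A usage sketch:

meta::SegmentSchema file;  // assume a valid record (collection_id_, file_id_, id_ set)
{
    meta::FilesHolder holder_a;
    holder_a.MarkFile(file);                        // global refcount: 1
    {
        meta::FilesHolder holder_b;
        holder_b.MarkFile(file);                    // global refcount: 2
        // meta::FilesHolder::CanBeDeleted(file) == false
    }                                               // ~holder_b -> refcount: 1
    // still pinned: holder_a has not released yet
}                                                   // ~holder_a -> refcount: 0
// meta::FilesHolder::CanBeDeleted(file) == true; cleanup may delete it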

core/src/db/meta/FilesHolder.h (new file)

@ -0,0 +1,113 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/meta/Meta.h"
#include "utils/Status.h"
#include <map>
#include <mutex>
#include <set>
#include <string>
namespace milvus {
namespace engine {
namespace meta {
class FilesHolder {
public:
FilesHolder();
virtual ~FilesHolder();
Status
MarkFile(const meta::SegmentSchema& file);
Status
MarkFiles(const meta::SegmentsSchema& files);
Status
UnmarkFile(const meta::SegmentSchema& file);
Status
UnmarkFiles(const meta::SegmentsSchema& files);
const milvus::engine::meta::SegmentsSchema&
HoldFiles() const {
return hold_files_;
}
milvus::engine::meta::SegmentsSchema&
HoldFiles() {
return hold_files_;
}
void
ReleaseFiles();
static bool
CanBeDeleted(const meta::SegmentSchema& file);
static void
PrintInfo();
private:
class OngoingFileChecker {
public:
static OngoingFileChecker&
GetInstance();
Status
MarkOngoingFile(const meta::SegmentSchema& file);
Status
MarkOngoingFiles(const meta::SegmentsSchema& files);
Status
UnmarkOngoingFile(const meta::SegmentSchema& file);
Status
UnmarkOngoingFiles(const meta::SegmentsSchema& files);
bool
CanBeDeleted(const meta::SegmentSchema& file);
void
PrintInfo();
private:
Status
MarkOngoingFileNoLock(const meta::SegmentSchema& file);
Status
UnmarkOngoingFileNoLock(const meta::SegmentSchema& file);
private:
std::mutex mutex_;
meta::Table2FileRef ongoing_files_; // collection id mapping to (file id mapping to ongoing ref-count)
};
private:
Status
MarkFileInternal(const meta::SegmentSchema& file);
Status
UnmarkFileInternal(const meta::SegmentSchema& file);
private:
std::mutex mutex_;
milvus::engine::meta::SegmentsSchema hold_files_;
std::set<uint64_t> unique_ids_;
};
} // namespace meta
} // namespace engine
} // namespace milvus
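
One detail worth noting in this header: unique_ids_ makes marking idempotent within a single holder, so collecting the same segment file from two meta queries (say, the parent collection and then one of its partitions) bumps the global ref-count only once. A sketch:

meta::FilesHolder holder;
holder.MarkFile(file);    // inserted into unique_ids_, global refcount +1
holder.MarkFile(file);    // duplicate id: early Status::OK(), no double count
// holder.HoldFiles().size() == 1
holder.UnmarkFile(file);  // refcount goes back down by exactly one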

core/src/db/meta/Meta.h

@ -19,6 +19,7 @@
#include "MetaTypes.h"
#include "db/Options.h"
#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "utils/Status.h"
namespace milvus {
@ -32,6 +33,8 @@ static const char* META_COLLECTIONS = "Collections";
static const char* META_FIELDS = "Fields";
static const char* META_COLLECTIONFILES = "CollectionFiles";
class FilesHolder;
class Meta {
/*
public:
@ -76,11 +79,10 @@ class Meta {
CreateCollectionFile(SegmentSchema& file_schema) = 0;
virtual Status
GetCollectionFiles(const std::string& collection_id, const std::vector<size_t>& ids,
SegmentsSchema& table_files) = 0;
GetCollectionFiles(const std::string& collection_id, const std::vector<size_t>& ids, FilesHolder& files_holder) = 0;
virtual Status
GetCollectionFilesBySegmentId(const std::string& segment_id, SegmentsSchema& table_files) = 0;
GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) = 0;
virtual Status
UpdateCollectionFile(SegmentSchema& file_schema) = 0;
@ -117,19 +119,19 @@ class Meta {
GetPartitionName(const std::string& collection_name, const std::string& tag, std::string& partition_name) = 0;
virtual Status
FilesToSearch(const std::string& collection_id, SegmentsSchema& files) = 0;
FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) = 0;
virtual Status
FilesToMerge(const std::string& collection_id, SegmentsSchema& files) = 0;
FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) = 0;
virtual Status
FilesToIndex(SegmentsSchema&) = 0;
FilesToIndex(FilesHolder& files_holder) = 0;
virtual Status
FilesByType(const std::string& collection_id, const std::vector<int>& file_types, SegmentsSchema& files) = 0;
FilesByType(const std::string& collection_id, const std::vector<int>& file_types, FilesHolder& files_holder) = 0;
virtual Status
FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files) = 0;
FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) = 0;
virtual Status
Size(uint64_t& result) = 0;
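
With the interface changes above, each Meta implementation hands results to the holder as it materializes every row, so a file is pinned in the same step that returns it and there is no window between query and mark. A sketch of the per-row pattern, condensed from MySQLMetaImpl.cpp below:

// Inside a result loop (condensed from MySQLMetaImpl::FilesToSearch):
for (auto& resRow : res) {
    SegmentSchema collection_file;
    // ... populate collection_file fields from resRow ...
    files_holder.MarkFile(collection_file);  // pin + append to HoldFiles()
}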

core/src/db/meta/MetaTypes.h

@ -97,6 +97,9 @@ struct SegmentSchema {
using SegmentSchemaPtr = std::shared_ptr<meta::SegmentSchema>;
using SegmentsSchema = std::vector<SegmentSchema>;
using File2RefCount = std::map<uint64_t, int64_t>;
using Table2FileRef = std::map<std::string, File2RefCount>;
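// note: these typedefs move here from db/Types.h (deleted above), and the
// inner key changes from the string file_id_ to the numeric primary key id_
// (uint64_t), matching the std::set<uint64_t> unique_ids_ in FilesHolder.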
namespace hybrid {
enum class DataType {

core/src/db/meta/MySQLMetaImpl.cpp

@ -31,7 +31,6 @@
#include "MetaConsts.h"
#include "db/IDGenerator.h"
#include "db/OngoingFileChecker.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
@ -768,7 +767,7 @@ MySQLMetaImpl::CreateCollectionFile(SegmentSchema& file_schema) {
Status
MySQLMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::vector<size_t>& ids,
SegmentsSchema& collection_files) {
FilesHolder& files_holder) {
if (ids.empty()) {
return Status::OK();
}
@ -830,10 +829,10 @@ MySQLMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::v
file_schema.dimension_ = collection_schema.dimension_;
utils::GetCollectionFilePath(options_, file_schema);
collection_files.emplace_back(file_schema);
files_holder.MarkFile(file_schema);
}
LOG_ENGINE_DEBUG_ << "Get collection files by id";
LOG_ENGINE_DEBUG_ << "Get " << res.size() << " files by id from collection " << collection_id;
return ret;
} catch (std::exception& e) {
return HandleException("Failed to get collection files", e.what());
@ -841,8 +840,7 @@ MySQLMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::v
}
Status
MySQLMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
milvus::engine::meta::SegmentsSchema& collection_files) {
MySQLMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) {
try {
mysqlpp::StoreQueryResult res;
{
@ -892,11 +890,11 @@ MySQLMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
file_schema.dimension_ = collection_schema.dimension_;
utils::GetCollectionFilePath(options_, file_schema);
collection_files.emplace_back(file_schema);
files_holder.MarkFile(file_schema);
}
}
LOG_ENGINE_DEBUG_ << "Get collection files by segment id";
LOG_ENGINE_DEBUG_ << "Get " << res.size() << " files by segment id " << segment_id;
return Status::OK();
} catch (std::exception& e) {
return HandleException("Failed to get collection files by segment id", e.what());
@ -1547,9 +1545,7 @@ MySQLMetaImpl::GetPartitionName(const std::string& collection_id, const std::str
}
Status
MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& files) {
files.clear();
MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) {
try {
server::MetricCollector metric;
mysqlpp::StoreQueryResult res;
@ -1589,6 +1585,7 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& f
}
Status ret;
int64_t files_count = 0;
for (auto& resRow : res) {
SegmentSchema collection_file;
collection_file.id_ = resRow["id"]; // implicit conversion
@ -1608,13 +1605,17 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& f
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {
ret = status;
continue;
}
files.emplace_back(collection_file);
files_holder.MarkFile(collection_file);
files_count++;
}
if (res.size() > 0) {
LOG_ENGINE_DEBUG_ << "Collect " << res.size() << " to-search files";
if (files_count == 0) {
LOG_ENGINE_DEBUG_ << "No file to search for collection: " << collection_id;
} else {
LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-search files in collection " << collection_id;
}
return ret;
} catch (std::exception& e) {
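
Seen from the caller's side, the new FilesToSearch signature turns "query then iterate" into "pin, iterate, auto-release". A small runnable demo built on the FilesHolderSketch stand-in defined earlier (illustration only, not DBImpl code):

#include <cstdio>

void
FilesToSearchDemo(FilesHolderSketch& holder) {
    // a meta implementation would run a SQL query here; we fake two rows
    for (uint64_t id = 1; id <= 2; ++id) {
        SegmentSchemaStub file;
        file.id_ = id;
        file.collection_id_ = "demo_collection";
        holder.MarkFile(file);  // pin instead of emplace_back into a vector
    }
}

int
main() {
    FilesHolderSketch holder;
    FilesToSearchDemo(holder);
    std::printf("held %zu files\n", holder.HoldFiles().size());
    return 0;
}   // holder's destructor releases every pinned file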
@ -1623,9 +1624,7 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& f
}
Status
MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& files) {
files.clear();
MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) {
try {
server::MetricCollector metric;
@ -1663,7 +1662,7 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& fi
} // Scoped Connection
Status ret;
int64_t to_merge_files = 0;
int64_t files_count = 0;
for (auto& resRow : res) {
SegmentSchema collection_file;
collection_file.file_size_ = resRow["file_size"];
@ -1688,14 +1687,15 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& fi
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {
ret = status;
continue;
}
files.emplace_back(collection_file);
++to_merge_files;
files_holder.MarkFile(collection_file);
files_count++;
}
if (to_merge_files > 0) {
LOG_ENGINE_TRACE_ << "Collect " << to_merge_files << " to-merge files";
if (files_count > 0) {
LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-merge files in collection " << collection_id;
}
return ret;
} catch (std::exception& e) {
@ -1704,9 +1704,7 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& fi
}
Status
MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) {
files.clear();
MySQLMetaImpl::FilesToIndex(FilesHolder& files_holder) {
try {
server::MetricCollector metric;
mysqlpp::StoreQueryResult res;
@ -1735,9 +1733,10 @@ MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) {
} // Scoped Connection
Status ret;
int64_t files_count = 0;
std::map<std::string, CollectionSchema> groups;
SegmentSchema collection_file;
for (auto& resRow : res) {
SegmentSchema collection_file;
collection_file.id_ = resRow["id"]; // implicit conversion
resRow["table_id"].to_string(collection_file.collection_id_);
resRow["segment_id"].to_string(collection_file.segment_id_);
@ -1769,11 +1768,12 @@ MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) {
ret = status;
}
files.push_back(collection_file);
files_holder.MarkFile(collection_file);
files_count++;
}
if (res.size() > 0) {
LOG_ENGINE_DEBUG_ << "Collect " << res.size() << " to-index files";
if (files_count > 0) {
LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-index files";
}
return ret;
} catch (std::exception& e) {
@ -1783,16 +1783,13 @@ MySQLMetaImpl::FilesToIndex(SegmentsSchema& files) {
Status
MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<int>& file_types,
SegmentsSchema& files) {
FilesHolder& files_holder) {
if (file_types.empty()) {
return Status(DB_ERROR, "file types array is empty");
}
Status ret = Status::OK();
try {
files.clear();
mysqlpp::StoreQueryResult res;
{
mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
@ -1860,7 +1857,7 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<i
ret = status;
}
files.emplace_back(file_schema);
files_holder.MarkFile(file_schema);
int32_t file_type = resRow["file_type"];
switch (file_type) {
@ -1928,9 +1925,7 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector<i
}
Status
MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files) {
files.clear();
MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) {
if (ids.empty()) {
return Status::OK();
}
@ -1977,6 +1972,7 @@ MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
std::map<std::string, meta::CollectionSchema> collections;
Status ret;
int64_t files_count = 0;
for (auto& resRow : res) {
SegmentSchema collection_file;
collection_file.id_ = resRow["id"]; // implicit conversion
@ -2002,11 +1998,14 @@ MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {
ret = status;
continue;
}
files.emplace_back(collection_file);
files_holder.MarkFile(collection_file);
files_count++;
}
milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles();
for (auto& collection_file : files) {
CollectionSchema& collection_schema = collections[collection_file.collection_id_];
collection_file.dimension_ = collection_schema.dimension_;
@ -2015,10 +2014,10 @@ MySQLMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
collection_file.metric_type_ = collection_schema.metric_type_;
}
if (files.empty()) {
if (files_count == 0) {
LOG_ENGINE_ERROR_ << "No file to search in file id list";
} else {
LOG_ENGINE_DEBUG_ << "Collect " << files.size() << " files by id";
LOG_ENGINE_DEBUG_ << "Collect " << files_count << " files by id";
}
return ret;
@ -2221,7 +2220,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/)
collection_file.file_type_ = resRow["file_type"];
// check if the file can be deleted
if (OngoingFileChecker::GetInstance().IsIgnored(collection_file)) {
if (!FilesHolder::CanBeDeleted(collection_file)) {
LOG_ENGINE_DEBUG_ << "File:" << collection_file.file_id_
<< " currently is in use, not able to delete now";
continue; // ignore this file, don't delete it

View File

@ -54,10 +54,10 @@ class MySQLMetaImpl : public Meta {
Status
GetCollectionFiles(const std::string& collection_id, const std::vector<size_t>& ids,
SegmentsSchema& collection_files) override;
FilesHolder& files_holder) override;
Status
GetCollectionFilesBySegmentId(const std::string& segment_id, SegmentsSchema& collection_files) override;
GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) override;
Status
UpdateCollectionIndex(const std::string& collection_id, const CollectionIndex& index) override;
@ -104,19 +104,20 @@ class MySQLMetaImpl : public Meta {
GetPartitionName(const std::string& collection_id, const std::string& tag, std::string& partition_name) override;
Status
FilesToSearch(const std::string& collection_id, SegmentsSchema& files) override;
FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override;
Status
FilesToMerge(const std::string& collection_id, SegmentsSchema& files) override;
FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override;
Status
FilesToIndex(SegmentsSchema&) override;
FilesToIndex(FilesHolder& files_holder) override;
Status
FilesByType(const std::string& collection_id, const std::vector<int>& file_types, SegmentsSchema& files) override;
FilesByType(const std::string& collection_id, const std::vector<int>& file_types,
FilesHolder& files_holder) override;
Status
FilesByID(const std::vector<size_t>& ids, SegmentsSchema& collection_files) override;
FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) override;
Status
Archive() override;

View File

@ -26,7 +26,6 @@
#include "MetaConsts.h"
#include "db/IDGenerator.h"
#include "db/OngoingFileChecker.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "utils/CommonUtil.h"
@ -65,9 +64,11 @@ StoragePrototype(const std::string& path) {
return make_storage(
path,
make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))),
make_table(META_TABLES, make_column("id", &CollectionSchema::id_, primary_key()),
make_table(META_TABLES,
make_column("id", &CollectionSchema::id_, primary_key()),
make_column("table_id", &CollectionSchema::collection_id_, unique()),
make_column("state", &CollectionSchema::state_), make_column("dimension", &CollectionSchema::dimension_),
make_column("state", &CollectionSchema::state_),
make_column("dimension", &CollectionSchema::dimension_),
make_column("created_on", &CollectionSchema::created_on_),
make_column("flag", &CollectionSchema::flag_, default_value(0)),
make_column("index_file_size", &CollectionSchema::index_file_size_),
@ -257,10 +258,12 @@ SqliteMetaImpl::CreateCollection(CollectionSchema& collection_schema) {
} else {
fiu_do_on("SqliteMetaImpl.CreateCollection.throw_exception", throw std::exception());
auto collection = ConnectorPtr->select(columns(&CollectionSchema::state_),
where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_));
where(c(&CollectionSchema::collection_id_)
== collection_schema.collection_id_));
if (collection.size() == 1) {
if (CollectionSchema::TO_DELETE == std::get<0>(collection[0])) {
return Status(DB_ERROR, "Collection already exists and it is in delete state, please wait a second");
return Status(DB_ERROR,
"Collection already exists and it is in delete state, please wait a second");
} else {
// Change from no error to already exist.
return Status(DB_ALREADY_EXIST, "Collection already exists");
@ -296,10 +299,19 @@ SqliteMetaImpl::DescribeCollection(CollectionSchema& collection_schema) {
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
fiu_do_on("SqliteMetaImpl.DescribeCollection.throw_exception", throw std::exception());
auto groups = ConnectorPtr->select(
columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, &CollectionSchema::created_on_,
&CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_,
&CollectionSchema::index_params_, &CollectionSchema::metric_type_, &CollectionSchema::owner_collection_,
&CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::flush_lsn_),
columns(&CollectionSchema::id_,
&CollectionSchema::state_,
&CollectionSchema::dimension_,
&CollectionSchema::created_on_,
&CollectionSchema::flag_,
&CollectionSchema::index_file_size_,
&CollectionSchema::engine_type_,
&CollectionSchema::index_params_,
&CollectionSchema::metric_type_,
&CollectionSchema::owner_collection_,
&CollectionSchema::partition_tag_,
&CollectionSchema::version_,
&CollectionSchema::flush_lsn_),
where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_ and
c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
@ -339,7 +351,8 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto collections = ConnectorPtr->select(
columns(&CollectionSchema::id_),
where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
if (collections.size() == 1) {
has_or_not = true;
} else {
@ -361,11 +374,21 @@ SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto selected = ConnectorPtr->select(
columns(&CollectionSchema::id_, &CollectionSchema::collection_id_, &CollectionSchema::dimension_, &CollectionSchema::created_on_,
&CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_,
&CollectionSchema::index_params_, &CollectionSchema::metric_type_, &CollectionSchema::owner_collection_,
&CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::flush_lsn_),
where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE and c(&CollectionSchema::owner_collection_) == ""));
columns(&CollectionSchema::id_,
&CollectionSchema::collection_id_,
&CollectionSchema::dimension_,
&CollectionSchema::created_on_,
&CollectionSchema::flag_,
&CollectionSchema::index_file_size_,
&CollectionSchema::engine_type_,
&CollectionSchema::index_params_,
&CollectionSchema::metric_type_,
&CollectionSchema::owner_collection_,
&CollectionSchema::partition_tag_,
&CollectionSchema::version_,
&CollectionSchema::flush_lsn_),
where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE
and c(&CollectionSchema::owner_collection_) == ""));
for (auto& collection : selected) {
CollectionSchema schema;
schema.id_ = std::get<0>(collection);
@ -404,7 +427,8 @@ SqliteMetaImpl::DropCollection(const std::string& collection_id) {
// soft delete collection
ConnectorPtr->update_all(
set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE),
where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
LOG_ENGINE_DEBUG_ << "Successfully delete collection, collection id = " << collection_id;
} catch (std::exception& e) {
@ -486,12 +510,10 @@ SqliteMetaImpl::CreateCollectionFile(SegmentSchema& file_schema) {
Status
SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::vector<size_t>& ids,
SegmentsSchema& collection_files) {
FilesHolder& files_holder) {
try {
fiu_do_on("SqliteMetaImpl.GetCollectionFiles.throw_exception", throw std::exception());
collection_files.clear();
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
auto status = DescribeCollection(collection_schema);
@ -499,16 +521,23 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
return status;
}
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
&SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
&SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_);
auto select_columns = columns(&SegmentSchema::id_,
&SegmentSchema::segment_id_,
&SegmentSchema::file_id_,
&SegmentSchema::file_type_,
&SegmentSchema::file_size_,
&SegmentSchema::row_count_,
&SegmentSchema::date_,
&SegmentSchema::engine_type_,
&SegmentSchema::created_on_);
decltype(ConnectorPtr->select(select_columns)) selected;
{
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
where(c(&SegmentSchema::collection_id_) == collection_id
and in(&SegmentSchema::id_, ids) and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
}
Status result;
@ -531,10 +560,10 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
utils::GetCollectionFilePath(options_, file_schema);
collection_files.emplace_back(file_schema);
files_holder.MarkFile(file_schema);
}
LOG_ENGINE_DEBUG_ << "Get collection files by id";
LOG_ENGINE_DEBUG_ << "Get " << selected.size() << " files by id from collection " << collection_id;
return result;
} catch (std::exception& e) {
return HandleException("Encounter exception when lookup collection files", e.what());
@ -543,10 +572,8 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
Status
SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
milvus::engine::meta::SegmentsSchema& collection_files) {
FilesHolder& files_holder) {
try {
collection_files.clear();
auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
&SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
@ -556,8 +583,8 @@ SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::segment_id_) == segment_id and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
where(c(&SegmentSchema::segment_id_) == segment_id and
c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
}
if (!selected.empty()) {
@ -586,11 +613,11 @@ SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id,
file_schema.metric_type_ = collection_schema.metric_type_;
utils::GetCollectionFilePath(options_, file_schema);
collection_files.emplace_back(file_schema);
files_holder.MarkFile(file_schema);
}
}
LOG_ENGINE_DEBUG_ << "Get collection files by segment id";
LOG_ENGINE_DEBUG_ << "Get " << selected.size() << " files by segment id" << segment_id;
return Status::OK();
} catch (std::exception& e) {
return HandleException("Encounter exception when lookup collection files by segment id", e.what());
@ -607,7 +634,8 @@ SqliteMetaImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t f
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
// set all backup file to raw
ConnectorPtr->update_all(set(c(&CollectionSchema::flag_) = flag), where(c(&CollectionSchema::collection_id_) == collection_id));
ConnectorPtr->update_all(set(c(&CollectionSchema::flag_) = flag),
where(c(&CollectionSchema::collection_id_) == collection_id));
LOG_ENGINE_DEBUG_ << "Successfully update collection flag, collection id = " << collection_id;
} catch (std::exception& e) {
std::string msg = "Encounter exception when update collection flag: collection_id = " + collection_id;
@ -627,7 +655,8 @@ SqliteMetaImpl::UpdateCollectionFlushLSN(const std::string& collection_id, uint6
ConnectorPtr->update_all(set(c(&CollectionSchema::flush_lsn_) = flush_lsn),
where(c(&CollectionSchema::collection_id_) == collection_id));
LOG_ENGINE_DEBUG_ << "Successfully update collection flush_lsn, collection id = " << collection_id << " flush_lsn = " << flush_lsn;;
LOG_ENGINE_DEBUG_ << "Successfully update collection flush_lsn, collection id = " << collection_id
<< " flush_lsn = " << flush_lsn;;
} catch (std::exception& e) {
std::string msg = "Encounter exception when update collection lsn: collection_id = " + collection_id;
return HandleException(msg, e.what());
@ -645,7 +674,8 @@ SqliteMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto selected =
ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_), where(c(&CollectionSchema::collection_id_) == collection_id));
ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_),
where(c(&CollectionSchema::collection_id_) == collection_id));
if (selected.size() > 0) {
flush_lsn = std::get<0>(selected[0]);
@ -671,7 +701,8 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto collections = ConnectorPtr->select(columns(&CollectionSchema::state_),
where(c(&CollectionSchema::collection_id_) == file_schema.collection_id_));
where(c(&CollectionSchema::collection_id_)
== file_schema.collection_id_));
// if the collection has been deleted, just mark the collection file as TO_DELETE
// clean thread will delete the file later
@ -684,7 +715,8 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
LOG_ENGINE_DEBUG_ << "Update single collection file, file id = " << file_schema.file_id_;
} catch (std::exception& e) {
std::string msg =
"Exception update collection file: collection_id = " + file_schema.collection_id_ + " file_id = " + file_schema.file_id_;
"Exception update collection file: collection_id = " + file_schema.collection_id_ + " file_id = "
+ file_schema.file_id_;
return HandleException(msg, e.what());
}
return Status::OK();
@ -705,8 +737,10 @@ SqliteMetaImpl::UpdateCollectionFiles(SegmentsSchema& files) {
continue;
}
auto collections = ConnectorPtr->select(columns(&CollectionSchema::id_),
where(c(&CollectionSchema::collection_id_) == file.collection_id_ and
c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
where(
c(&CollectionSchema::collection_id_) == file.collection_id_ and
c(&CollectionSchema::state_)
!= (int)CollectionSchema::TO_DELETE));
if (collections.size() >= 1) {
has_collections[file.collection_id_] = true;
} else {
@ -769,10 +803,18 @@ SqliteMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Co
auto collections = ConnectorPtr->select(
columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, &CollectionSchema::created_on_,
&CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::owner_collection_,
&CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::flush_lsn_),
where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
columns(&CollectionSchema::id_,
&CollectionSchema::state_,
&CollectionSchema::dimension_,
&CollectionSchema::created_on_,
&CollectionSchema::flag_,
&CollectionSchema::index_file_size_,
&CollectionSchema::owner_collection_,
&CollectionSchema::partition_tag_,
&CollectionSchema::version_,
&CollectionSchema::flush_lsn_),
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
if (collections.size() > 0) {
meta::CollectionSchema collection_schema;
@ -841,7 +883,8 @@ SqliteMetaImpl::DescribeCollectionIndex(const std::string& collection_id, Collec
auto groups = ConnectorPtr->select(
columns(&CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_),
where(c(&CollectionSchema::collection_id_) == collection_id and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
where(c(&CollectionSchema::collection_id_) == collection_id
and c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
if (groups.size() == 1) {
index.engine_type_ = std::get<0>(groups[0]);
@ -902,7 +945,9 @@ SqliteMetaImpl::DropCollectionIndex(const std::string& collection_id) {
}
Status
SqliteMetaImpl::CreatePartition(const std::string& collection_id, const std::string& partition_name, const std::string& tag,
SqliteMetaImpl::CreatePartition(const std::string& collection_id,
const std::string& partition_name,
const std::string& tag,
uint64_t lsn) {
USING_SQLITE_WARNING
server::MetricCollector metric;
@ -959,16 +1004,25 @@ SqliteMetaImpl::DropPartition(const std::string& partition_name) {
}
Status
SqliteMetaImpl::ShowPartitions(const std::string& collection_id, std::vector<meta::CollectionSchema>& partition_schema_array) {
SqliteMetaImpl::ShowPartitions(const std::string& collection_id,
std::vector<meta::CollectionSchema>& partition_schema_array) {
try {
server::MetricCollector metric;
fiu_do_on("SqliteMetaImpl.ShowPartitions.throw_exception", throw std::exception());
auto partitions = ConnectorPtr->select(
columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, &CollectionSchema::created_on_,
&CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_,
&CollectionSchema::index_params_, &CollectionSchema::metric_type_, &CollectionSchema::partition_tag_,
&CollectionSchema::version_, &CollectionSchema::collection_id_),
columns(&CollectionSchema::id_,
&CollectionSchema::state_,
&CollectionSchema::dimension_,
&CollectionSchema::created_on_,
&CollectionSchema::flag_,
&CollectionSchema::index_file_size_,
&CollectionSchema::engine_type_,
&CollectionSchema::index_params_,
&CollectionSchema::metric_type_,
&CollectionSchema::partition_tag_,
&CollectionSchema::version_,
&CollectionSchema::collection_id_),
where(c(&CollectionSchema::owner_collection_) == collection_id and
c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
@ -997,7 +1051,9 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id, std::vector<met
}
Status
SqliteMetaImpl::GetPartitionName(const std::string& collection_id, const std::string& tag, std::string& partition_name) {
SqliteMetaImpl::GetPartitionName(const std::string& collection_id,
const std::string& tag,
std::string& partition_name) {
try {
server::MetricCollector metric;
fiu_do_on("SqliteMetaImpl.GetPartitionName.throw_exception", throw std::exception());
@ -1009,7 +1065,8 @@ SqliteMetaImpl::GetPartitionName(const std::string& collection_id, const std::st
auto name = ConnectorPtr->select(
columns(&CollectionSchema::collection_id_),
where(c(&CollectionSchema::owner_collection_) == collection_id and c(&CollectionSchema::partition_tag_) == valid_tag and
where(c(&CollectionSchema::owner_collection_) == collection_id
and c(&CollectionSchema::partition_tag_) == valid_tag and
c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
if (name.size() > 0) {
partition_name = std::get<0>(name[0]);
@ -1024,9 +1081,7 @@ SqliteMetaImpl::GetPartitionName(const std::string& collection_id, const std::st
}
Status
SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema& files) {
files.clear();
SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) {
try {
server::MetricCollector metric;
fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception());
@ -1058,6 +1113,7 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema&
}
Status ret;
int64_t files_count = 0;
for (auto& file : selected) {
SegmentSchema collection_file;
collection_file.id_ = std::get<0>(file);
@ -1077,16 +1133,16 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema&
auto status = utils::GetCollectionFilePath(options_, collection_file);
if (!status.ok()) {
ret = status;
continue;
}
files.emplace_back(collection_file);
files_holder.MarkFile(collection_file);
files_count++;
}
if (files.empty()) {
LOG_ENGINE_ERROR_ << "No file to search for collection: " << collection_id;
}
if (selected.size() > 0) {
LOG_ENGINE_DEBUG_ << "Collect " << selected.size() << " to-search files";
if (files_count == 0) {
LOG_ENGINE_DEBUG_ << "No file to search for collection: " << collection_id;
} else {
LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-search files in collection " << collection_id;
}
return ret;
} catch (std::exception& e) {
@ -1095,9 +1151,7 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, SegmentsSchema&
}
Status
SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& files) {
files.clear();
SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) {
try {
fiu_do_on("SqliteMetaImpl.FilesToMerge.throw_exception", throw std::exception());
@ -1120,13 +1174,13 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& f
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and
c(&SegmentSchema::collection_id_) == collection_id),
order_by(&SegmentSchema::file_size_).desc());
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and
c(&SegmentSchema::collection_id_) == collection_id),
order_by(&SegmentSchema::file_size_).desc());
}
Status result;
int64_t to_merge_files = 0;
int64_t files_count = 0;
for (auto& file : selected) {
SegmentSchema collection_file;
collection_file.file_size_ = std::get<5>(file);
@ -1152,12 +1206,12 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& f
result = status;
}
files.emplace_back(collection_file);
++to_merge_files;
files_holder.MarkFile(collection_file);
files_count++;
}
if (to_merge_files > 0) {
LOG_ENGINE_TRACE_ << "Collect " << to_merge_files << " to-merge files";
if (files_count > 0) {
LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-merge files in collection " << collection_id;
}
return result;
} catch (std::exception& e) {
@ -1166,9 +1220,7 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, SegmentsSchema& f
}
Status
SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) {
files.clear();
SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) {
try {
fiu_do_on("SqliteMetaImpl.FilesToIndex.throw_exception", throw std::exception());
@ -1183,14 +1235,14 @@ SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) {
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX));
where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX));
}
std::map<std::string, CollectionSchema> groups;
SegmentSchema collection_file;
Status ret;
int64_t files_count = 0;
std::map<std::string, CollectionSchema> groups;
for (auto& file : selected) {
SegmentSchema collection_file;
collection_file.id_ = std::get<0>(file);
collection_file.collection_id_ = std::get<1>(file);
collection_file.segment_id_ = std::get<2>(file);
@ -1222,11 +1274,13 @@ SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) {
collection_file.index_file_size_ = groups[collection_file.collection_id_].index_file_size_;
collection_file.index_params_ = groups[collection_file.collection_id_].index_params_;
collection_file.metric_type_ = groups[collection_file.collection_id_].metric_type_;
files.push_back(collection_file);
files_holder.MarkFile(collection_file);
files_count++;
}
if (selected.size() > 0) {
LOG_ENGINE_DEBUG_ << "Collect " << selected.size() << " to-index files";
if (files_count > 0) {
LOG_ENGINE_DEBUG_ << "Collect " << files_count << " to-index files";
}
return ret;
} catch (std::exception& e) {
@ -1235,7 +1289,9 @@ SqliteMetaImpl::FilesToIndex(SegmentsSchema& files) {
}
Status
SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector<int>& file_types, SegmentsSchema& files) {
SqliteMetaImpl::FilesByType(const std::string& collection_id,
const std::vector<int>& file_types,
FilesHolder& files_holder) {
if (file_types.empty()) {
return Status(DB_ERROR, "file types array is empty");
}
@ -1244,7 +1300,6 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector<
try {
fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception());
files.clear();
CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_id;
@ -1267,7 +1322,8 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector<
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id));
where(in(&SegmentSchema::file_type_, file_types)
and c(&SegmentSchema::collection_id_) == collection_id));
}
if (selected.size() >= 1) {
@ -1314,7 +1370,7 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector<
ret = status;
}
files.emplace_back(file_schema);
files_holder.MarkFile(file_schema);
}
std::string msg = "Get collection files by type.";
@ -1351,9 +1407,7 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector<
}
Status
SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files) {
files.clear();
SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) {
if (ids.empty()) {
return Status::OK();
}
@ -1383,6 +1437,7 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
std::map<std::string, meta::CollectionSchema> collections;
Status ret;
int64_t files_count = 0;
for (auto& file : selected) {
SegmentSchema collection_file;
collection_file.id_ = std::get<0>(file);
@ -1410,9 +1465,11 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
ret = status;
}
files.emplace_back(collection_file);
files_holder.MarkFile(collection_file);
files_count++;
}
milvus::engine::meta::SegmentsSchema& files = files_holder.HoldFiles();
for (auto& collection_file : files) {
CollectionSchema& collection_schema = collections[collection_file.collection_id_];
collection_file.dimension_ = collection_schema.dimension_;
@ -1421,10 +1478,10 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files)
collection_file.metric_type_ = collection_schema.metric_type_;
}
if (files.empty()) {
if (files_count == 0) {
LOG_ENGINE_ERROR_ << "No file to search in file id list";
} else {
LOG_ENGINE_DEBUG_ << "Collect " << selected.size() << " files by id";
LOG_ENGINE_DEBUG_ << "Collect " << files_count << " files by id";
}
return ret;
@ -1576,9 +1633,9 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/
collection_file.date_ = std::get<6>(file);
// check if the file can be deleted
if (OngoingFileChecker::GetInstance().IsIgnored(collection_file)) {
if (!FilesHolder::CanBeDeleted(collection_file)) {
LOG_ENGINE_DEBUG_ << "File:" << collection_file.file_id_
<< " currently is in use, not able to delete now";
<< " currently is in use, not able to delete now";
continue; // ignore this file, don't delete it
}
@ -1596,7 +1653,7 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/
utils::DeleteCollectionFilePath(options_, collection_file);
LOG_ENGINE_DEBUG_ << "Remove file id:" << collection_file.file_id_ << " location:"
<< collection_file.location_;
<< collection_file.location_;
collection_ids.insert(collection_file.collection_id_);
segment_ids.insert(std::make_pair(collection_file.segment_id_, collection_file));
@ -1627,7 +1684,8 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
auto collections = ConnectorPtr->select(columns(&CollectionSchema::id_, &CollectionSchema::collection_id_),
where(c(&CollectionSchema::state_) == (int)CollectionSchema::TO_DELETE));
where(
c(&CollectionSchema::state_) == (int)CollectionSchema::TO_DELETE));
auto commited = ConnectorPtr->transaction([&]() mutable {
for (auto& collection : collections) {
@ -1717,7 +1775,8 @@ SqliteMetaImpl::Count(const std::string& collection_id, uint64_t& result) {
// multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
selected = ConnectorPtr->select(select_columns,
where(in(&SegmentSchema::file_type_, file_types) and c(&SegmentSchema::collection_id_) == collection_id));
where(in(&SegmentSchema::file_type_, file_types)
and c(&SegmentSchema::collection_id_) == collection_id));
}
CollectionSchema collection_schema;
@ -1784,7 +1843,7 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
collection_file.file_size_ = std::get<1>(file);
ids.push_back(collection_file.id_);
LOG_ENGINE_DEBUG_ << "Discard file id=" << collection_file.file_id_
<< " file size=" << collection_file.file_size_;
<< " file size=" << collection_file.file_size_;
to_discard_size -= collection_file.file_size_;
}
@ -1874,7 +1933,8 @@ SqliteMetaImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema
== collection_schema.collection_id_));
if (collection.size() == 1) {
if (CollectionSchema::TO_DELETE == std::get<0>(collection[0])) {
return Status(DB_ERROR, "Collection already exists and it is in delete state, please wait a second");
return Status(DB_ERROR,
"Collection already exists and it is in delete state, please wait a second");
} else {
// Change from no error to already exist.
return Status(DB_ALREADY_EXIST, "Collection already exists");
@ -1923,10 +1983,19 @@ SqliteMetaImpl::DescribeHybridCollection(milvus::engine::meta::CollectionSchema&
server::MetricCollector metric;
fiu_do_on("SqliteMetaImpl.DescriCollection.throw_exception", throw std::exception());
auto groups = ConnectorPtr->select(
columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, &CollectionSchema::created_on_,
&CollectionSchema::flag_, &CollectionSchema::index_file_size_, &CollectionSchema::engine_type_,
&CollectionSchema::index_params_, &CollectionSchema::metric_type_, &CollectionSchema::owner_collection_,
&CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::flush_lsn_),
columns(&CollectionSchema::id_,
&CollectionSchema::state_,
&CollectionSchema::dimension_,
&CollectionSchema::created_on_,
&CollectionSchema::flag_,
&CollectionSchema::index_file_size_,
&CollectionSchema::engine_type_,
&CollectionSchema::index_params_,
&CollectionSchema::metric_type_,
&CollectionSchema::owner_collection_,
&CollectionSchema::partition_tag_,
&CollectionSchema::version_,
&CollectionSchema::flush_lsn_),
where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_ and
c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));

View File

@ -56,10 +56,10 @@ class SqliteMetaImpl : public Meta {
Status
GetCollectionFiles(const std::string& collection_id, const std::vector<size_t>& ids,
SegmentsSchema& collection_files) override;
FilesHolder& files_holder) override;
Status
GetCollectionFilesBySegmentId(const std::string& segment_id, SegmentsSchema& collection_files) override;
GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) override;
Status
UpdateCollectionIndex(const std::string& collection_id, const CollectionIndex& index) override;
@ -106,19 +106,20 @@ class SqliteMetaImpl : public Meta {
GetPartitionName(const std::string& collection_id, const std::string& tag, std::string& partition_name) override;
Status
FilesToSearch(const std::string& collection_id, SegmentsSchema& files) override;
FilesToSearch(const std::string& collection_id, FilesHolder& files_holder) override;
Status
FilesToMerge(const std::string& collection_id, SegmentsSchema& files) override;
FilesToMerge(const std::string& collection_id, FilesHolder& files_holder) override;
Status
FilesToIndex(SegmentsSchema&) override;
FilesToIndex(FilesHolder& files_holder) override;
Status
FilesByType(const std::string& collection_id, const std::vector<int>& file_types, SegmentsSchema& files) override;
FilesByType(const std::string& collection_id, const std::vector<int>& file_types,
FilesHolder& files_holder) override;
Status
FilesByID(const std::vector<size_t>& ids, SegmentsSchema& files) override;
FilesByID(const std::vector<size_t>& ids, FilesHolder& files_holder) override;
Status
Size(uint64_t& result) override;

View File

@ -11,6 +11,7 @@
#include "metrics/SystemInfo.h"
#include "thirdparty/nlohmann/json.hpp"
#include "utils/Exception.h"
#include "utils/Log.h"
#include <dirent.h>
@ -38,24 +39,32 @@ SystemInfo::Init() {
initialized_ = true;
// initialize CPU information
FILE* file;
struct tms time_sample;
char line[128];
last_cpu_ = times(&time_sample);
last_sys_cpu_ = time_sample.tms_stime;
last_user_cpu_ = time_sample.tms_utime;
file = fopen("/proc/cpuinfo", "r");
num_processors_ = 0;
while (fgets(line, 128, file) != nullptr) {
if (strncmp(line, "processor", 9) == 0) {
num_processors_++;
}
if (strncmp(line, "physical", 8) == 0) {
num_physical_processors_ = ParseLine(line);
try {
struct tms time_sample;
char line[128];
last_cpu_ = times(&time_sample);
last_sys_cpu_ = time_sample.tms_stime;
last_user_cpu_ = time_sample.tms_utime;
num_processors_ = 0;
FILE* file = fopen("/proc/cpuinfo", "r");
if (file) {
while (fgets(line, 128, file) != nullptr) {
if (strncmp(line, "processor", 9) == 0) {
num_processors_++;
}
if (strncmp(line, "physical", 8) == 0) {
num_physical_processors_ = ParseLine(line);
}
}
fclose(file);
} else {
LOG_SERVER_ERROR_ << "Failed to read /proc/cpuinfo";
}
total_ram_ = GetPhysicalMemory();
} catch (std::exception& ex) {
std::string msg = "Failed to read /proc/cpuinfo, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
}
total_ram_ = GetPhysicalMemory();
fclose(file);
#ifdef MILVUS_GPU_VERSION
// initialize GPU information
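
The hardened Init() above boils down to one defensive pattern for every /proc read: check the fopen() result before use and always close the handle. A self-contained reduction of the processor-counting part:

#include <cstdio>
#include <cstring>

// Returns the number of "processor" entries in /proc/cpuinfo, or -1 on failure.
static int
CountProcessors() {
    FILE* file = std::fopen("/proc/cpuinfo", "r");
    if (file == nullptr) {
        return -1;  // caller logs the failure instead of crashing on fgets(NULL)
    }
    int count = 0;
    char line[128];
    while (std::fgets(line, sizeof(line), file) != nullptr) {
        if (std::strncmp(line, "processor", 9) == 0) {
            ++count;
        }
    }
    std::fclose(file);
    return count;
}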
@ -75,10 +84,15 @@ SystemInfo::Init() {
#endif
// initialize network traffic information
std::pair<uint64_t, uint64_t> in_and_out_octets = Octets();
in_octets_ = in_and_out_octets.first;
out_octets_ = in_and_out_octets.second;
net_time_ = std::chrono::system_clock::now();
try {
std::pair<uint64_t, uint64_t> in_and_out_octets = Octets();
in_octets_ = in_and_out_octets.first;
out_octets_ = in_and_out_octets.second;
net_time_ = std::chrono::system_clock::now();
} catch (std::exception& ex) {
std::string msg = "Failed to initialize network traffic information, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
}
}
uint64_t
@ -101,27 +115,39 @@ SystemInfo::GetPhysicalMemory() {
uint64_t totalPhysMem = memInfo.totalram;
// Multiply in next statement to avoid int overflow on right hand side...
totalPhysMem *= memInfo.mem_unit;
return totalPhysMem;
}
uint64_t
SystemInfo::GetProcessUsedMemory() {
// Note: this value is in KB!
FILE* file = fopen("/proc/self/status", "r");
constexpr uint64_t line_length = 128;
uint64_t result = -1;
constexpr uint64_t KB_SIZE = 1024;
char line[line_length];
try {
// Note: this value is in KB!
FILE* file = fopen("/proc/self/status", "r");
uint64_t result = 0;
constexpr uint64_t KB_SIZE = 1024;
if (file) {
constexpr uint64_t line_length = 128;
char line[line_length];
while (fgets(line, line_length, file) != nullptr) {
if (strncmp(line, "VmRSS:", 6) == 0) {
result = ParseLine(line);
break;
while (fgets(line, line_length, file) != nullptr) {
if (strncmp(line, "VmRSS:", 6) == 0) {
result = ParseLine(line);
break;
}
}
fclose(file);
} else {
LOG_SERVER_ERROR_ << "Failed to read /proc/self/status";
}
// return value in Byte
return (result * KB_SIZE);
} catch (std::exception& ex) {
std::string msg = "Failed to read /proc/self/status, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
return 0;
}
fclose(file);
// return value in Byte
return (result * KB_SIZE);
}
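
GetProcessUsedMemory() follows the same pattern; its essence, reduced to a self-contained helper (the digit-scanning stand-in below approximates what the private ParseLine() presumably does):

#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>

static uint64_t
ParseTrailingKb(const char* line) {
    while (*line != '\0' && (*line < '0' || *line > '9')) {
        ++line;  // skip "VmRSS:" and whitespace up to the first digit
    }
    return std::strtoull(line, nullptr, 10);
}

static uint64_t
ProcessResidentBytes() {
    FILE* file = std::fopen("/proc/self/status", "r");
    if (file == nullptr) {
        return 0;  // fail soft, as the patched code does
    }
    uint64_t kb = 0;
    char line[128];
    while (std::fgets(line, sizeof(line), file) != nullptr) {
        if (std::strncmp(line, "VmRSS:", 6) == 0) {
            kb = ParseTrailingKb(line);
            break;
        }
    }
    std::fclose(file);
    return kb * 1024;  // /proc reports the value in KB
}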
double
@ -155,34 +181,40 @@ SystemInfo::CPUCorePercent() {
std::vector<uint64_t>
SystemInfo::getTotalCpuTime(std::vector<uint64_t>& work_time_array) {
std::vector<uint64_t> total_time_array;
FILE* file = fopen("/proc/stat", "r");
fiu_do_on("SystemInfo.getTotalCpuTime.open_proc", file = NULL);
if (file == NULL) {
LOG_SERVER_ERROR_ << "Could not open stat file";
return total_time_array;
}
uint64_t user = 0, nice = 0, system = 0, idle = 0;
uint64_t iowait = 0, irq = 0, softirq = 0, steal = 0, guest = 0, guestnice = 0;
for (int i = 0; i < num_processors_; i++) {
char buffer[1024];
char* ret = fgets(buffer, sizeof(buffer) - 1, file);
fiu_do_on("SystemInfo.getTotalCpuTime.read_proc", ret = NULL);
if (ret == NULL) {
LOG_SERVER_ERROR_ << "Could not read stat file";
fclose(file);
try {
FILE* file = fopen("/proc/stat", "r");
fiu_do_on("SystemInfo.getTotalCpuTime.open_proc", file = NULL);
if (file == NULL) {
LOG_SERVER_ERROR_ << "Failed to read /proc/stat";
return total_time_array;
}
sscanf(buffer, "cpu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu", &user, &nice, &system, &idle,
&iowait, &irq, &softirq, &steal, &guest, &guestnice);
uint64_t user = 0, nice = 0, system = 0, idle = 0;
uint64_t iowait = 0, irq = 0, softirq = 0, steal = 0, guest = 0, guestnice = 0;
work_time_array.push_back(user + nice + system);
total_time_array.push_back(user + nice + system + idle + iowait + irq + softirq + steal);
for (int i = 0; i < num_processors_; i++) {
char buffer[1024];
char* ret = fgets(buffer, sizeof(buffer) - 1, file);
fiu_do_on("SystemInfo.getTotalCpuTime.read_proc", ret = NULL);
if (ret == NULL) {
LOG_SERVER_ERROR_ << "Could not read stat file";
fclose(file);
return total_time_array;
}
sscanf(buffer, "cpu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu %16lu", &user, &nice, &system,
&idle, &iowait, &irq, &softirq, &steal, &guest, &guestnice);
work_time_array.push_back(user + nice + system);
total_time_array.push_back(user + nice + system + idle + iowait + irq + softirq + steal);
}
fclose(file);
} catch (std::exception& ex) {
std::string msg = "Failed to read /proc/stat, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
}
fclose(file);
return total_time_array;
}
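
getTotalCpuTime() returns cumulative tick counters, so a utilization percentage has to come from the delta between two samples. The assumed arithmetic, stated as a standalone function:

#include <cstdint>

// Percent utilization between two /proc/stat samples of one core.
static double
CpuCorePercent(uint64_t work_prev, uint64_t total_prev, uint64_t work_now, uint64_t total_now) {
    const uint64_t total_delta = total_now - total_prev;
    if (total_delta == 0) {
        return 0.0;  // identical samples: avoid division by zero
    }
    return 100.0 * static_cast<double>(work_now - work_prev) / static_cast<double>(total_delta);
}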
@ -260,39 +292,43 @@ std::vector<float>
SystemInfo::CPUTemperature() {
std::vector<float> result;
std::string path = "/sys/class/hwmon/";
try {
DIR* dir = opendir(path.c_str());
fiu_do_on("SystemInfo.CPUTemperature.opendir", dir = NULL);
if (!dir) {
LOG_SERVER_ERROR_ << "Could not open hwmon directory";
return result;
}
DIR* dir = NULL;
dir = opendir(path.c_str());
fiu_do_on("SystemInfo.CPUTemperature.opendir", dir = NULL);
if (!dir) {
LOG_SERVER_ERROR_ << "Could not open hwmon directory";
return result;
}
struct dirent* ptr = NULL;
while ((ptr = readdir(dir)) != NULL) {
std::string filename(path);
filename.append(ptr->d_name);
struct dirent* ptr = NULL;
while ((ptr = readdir(dir)) != NULL) {
std::string filename(path);
filename.append(ptr->d_name);
char buf[100];
if (readlink(filename.c_str(), buf, 100) != -1) {
std::string m(buf);
if (m.find("coretemp") != std::string::npos) {
std::string object = filename;
object += "/temp1_input";
FILE* file = fopen(object.c_str(), "r");
fiu_do_on("SystemInfo.CPUTemperature.openfile", file = NULL);
if (file == nullptr) {
LOG_SERVER_ERROR_ << "Could not open temperature file";
return result;
char buf[100];
if (readlink(filename.c_str(), buf, 100) != -1) {
std::string m(buf);
if (m.find("coretemp") != std::string::npos) {
std::string object = filename;
object += "/temp1_input";
FILE* file = fopen(object.c_str(), "r");
fiu_do_on("SystemInfo.CPUTemperature.openfile", file = NULL);
if (file == nullptr) {
LOG_SERVER_ERROR_ << "Could not open temperature file";
return result;
}
float temp = 0.0f;
if (fscanf(file, "%f", &temp) == 1) {
result.push_back(temp / 1000);
}
fclose(file);
}
float temp;
fscanf(file, "%f", &temp);
result.push_back(temp / 1000);
}
}
closedir(dir);
} catch (std::exception& ex) {
std::string msg = "Failed to get cpu temperature, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
}
closedir(dir);
return result;
}
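
For reference, a compact, self-contained variant of the hwmon walk above; it assumes the same coretemp layout and closes each temperature file after reading it:

#include <dirent.h>
#include <unistd.h>
#include <cstdio>
#include <string>
#include <vector>

static std::vector<float>
ReadCoreTemps() {
    std::vector<float> temps;
    DIR* dir = opendir("/sys/class/hwmon/");
    if (dir == nullptr) {
        return temps;
    }
    struct dirent* entry = nullptr;
    while ((entry = readdir(dir)) != nullptr) {
        std::string link = std::string("/sys/class/hwmon/") + entry->d_name;
        char target[128];
        ssize_t len = readlink(link.c_str(), target, sizeof(target) - 1);
        if (len <= 0) {
            continue;  // not a symlink we can resolve
        }
        target[len] = '\0';
        if (std::string(target).find("coretemp") == std::string::npos) {
            continue;  // not a CPU temperature sensor
        }
        FILE* file = std::fopen((link + "/temp1_input").c_str(), "r");
        if (file == nullptr) {
            continue;
        }
        float milli_deg = 0.0F;
        if (std::fscanf(file, "%f", &milli_deg) == 1) {
            temps.push_back(milli_deg / 1000);  // hwmon reports millidegrees
        }
        std::fclose(file);  // close each sensor file before the next entry
    }
    closedir(dir);
    return temps;
}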

View File

@ -158,26 +158,32 @@ PrometheusMetrics::OctetsSet() {
return;
}
// get old stats and reset them
uint64_t old_inoctets = SystemInfo::GetInstance().get_inoctets();
uint64_t old_outoctets = SystemInfo::GetInstance().get_octets();
auto old_time = SystemInfo::GetInstance().get_nettime();
std::pair<uint64_t, uint64_t> in_and_out_octets = SystemInfo::GetInstance().Octets();
SystemInfo::GetInstance().set_inoctets(in_and_out_octets.first);
SystemInfo::GetInstance().set_outoctets(in_and_out_octets.second);
SystemInfo::GetInstance().set_nettime();
try {
// get old stats and reset them
uint64_t old_inoctets = SystemInfo::GetInstance().get_inoctets();
uint64_t old_outoctets = SystemInfo::GetInstance().get_octets();
auto old_time = SystemInfo::GetInstance().get_nettime();
//
constexpr double micro_to_second = 1e-6;
auto now_time = std::chrono::system_clock::now();
auto total_microsecond = METRICS_MICROSECONDS(old_time, now_time);
auto total_second = total_microsecond * micro_to_second;
if (total_second == 0) {
return;
std::pair<uint64_t, uint64_t> in_and_out_octets = SystemInfo::GetInstance().Octets();
SystemInfo::GetInstance().set_inoctets(in_and_out_octets.first);
SystemInfo::GetInstance().set_outoctets(in_and_out_octets.second);
SystemInfo::GetInstance().set_nettime();
//
constexpr double micro_to_second = 1e-6;
auto now_time = std::chrono::system_clock::now();
auto total_microsecond = METRICS_MICROSECONDS(old_time, now_time);
auto total_second = total_microsecond * micro_to_second;
if (total_second == 0) {
return;
}
inoctets_gauge_.Set((in_and_out_octets.first - old_inoctets) / total_second);
outoctets_gauge_.Set((in_and_out_octets.second - old_outoctets) / total_second);
} catch (std::exception& ex) {
std::string msg = "Failed to set in/out octets, reason: " + std::string(ex.what());
LOG_SERVER_ERROR_ << msg;
}
inoctets_gauge_.Set((in_and_out_octets.first - old_inoctets) / total_second);
outoctets_gauge_.Set((in_and_out_octets.second - old_outoctets) / total_second);
}
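
The math wrapped into the try block above is a plain rate computation; isolated, with the same zero-interval guard:

#include <chrono>
#include <cstdint>

// Bytes per second between two octet samples; mirrors the total_second == 0 guard.
static double
BytesPerSecond(uint64_t octets_prev, uint64_t octets_now,
               std::chrono::system_clock::time_point t_prev,
               std::chrono::system_clock::time_point t_now) {
    const double seconds = std::chrono::duration<double>(t_now - t_prev).count();
    if (seconds <= 0.0) {
        return 0.0;
    }
    return static_cast<double>(octets_now - octets_prev) / seconds;
}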
void

View File

@ -51,6 +51,22 @@ HasPartitionRequest::OnExecute() {
return status;
}
// only process root collection, ignore partition collection
engine::meta::CollectionSchema collection_schema;
collection_schema.collection_id_ = collection_name_;
status = DBWrapper::DB()->DescribeCollection(collection_schema);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_COLLECTION_NOT_EXIST, CollectionNotExistMsg(collection_name_));
} else {
return status;
}
} else {
if (!collection_schema.owner_collection_.empty()) {
return Status(SERVER_INVALID_COLLECTION_NAME, CollectionNotExistMsg(collection_name_));
}
}
std::vector<engine::meta::CollectionSchema> schema_array;
status = DBWrapper::DB()->ShowPartitions(collection_name_, schema_array);
if (!status.ok()) {
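
The check added here relies on how partitions are modeled: a partition is itself a collection whose owner_collection_ field names its parent, so a root collection is exactly one with an empty owner. A minimal sketch of that predicate (stub type for illustration):

#include <string>

struct CollectionSchemaStub {
    std::string collection_id_;
    std::string owner_collection_;  // empty => root collection, else parent id
};

static bool
IsRootCollection(const CollectionSchemaStub& schema) {
    return schema.owner_collection_.empty();
}

With this guard, has_partition rejects a partition name outright instead of answering for a nested collection.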

View File

@ -377,12 +377,6 @@ TEST_F(DBTest, SEARCH_TEST) {
result_ids,
result_distances);
ASSERT_TRUE(stat.ok());
// FIU_ENABLE_FIU("DBImpl.QueryByFileID.empty_files_array");
// stat =
// db_->QueryByFileID(dummy_context_, file_ids, k, json_params, xq, result_ids, result_distances);
// ASSERT_FALSE(stat.ok());
// fiu_disable("DBImpl.QueryByFileID.empty_files_array");
}
// TODO(zhiru): PQ build takes forever

View File

@ -22,7 +22,6 @@
#include <fiu-local.h>
#include <fiu-control.h>
#include <boost/filesystem/operations.hpp>
#include "src/db/OngoingFileChecker.h"
TEST_F(MetaTest, COLLECTION_TEST) {
auto collection_id = "meta_test_table";
@ -149,13 +148,13 @@ TEST_F(MetaTest, FALID_TEST) {
fiu_disable("SqliteMetaImpl.DeleteCollectionFiles.throw_exception");
}
{
milvus::engine::meta::SegmentsSchema schemas;
milvus::engine::meta::FilesHolder files_holder;
std::vector<size_t> ids;
status = impl_->GetCollectionFiles("notexist", ids, schemas);
status = impl_->GetCollectionFiles("notexist", ids, files_holder);
ASSERT_FALSE(status.ok());
FIU_ENABLE_FIU("SqliteMetaImpl.GetCollectionFiles.throw_exception");
status = impl_->GetCollectionFiles(collection_id, ids, schemas);
status = impl_->GetCollectionFiles(collection_id, ids, files_holder);
ASSERT_FALSE(status.ok());
ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
fiu_disable("SqliteMetaImpl.GetCollectionFiles.throw_exception");
@ -260,12 +259,12 @@ TEST_F(MetaTest, FALID_TEST) {
fiu_disable("SqliteMetaImpl.GetPartitionName.throw_exception");
}
{
milvus::engine::meta::SegmentsSchema table_files;
status = impl_->FilesToSearch("notexist", table_files);
milvus::engine::meta::FilesHolder files_holder;
status = impl_->FilesToSearch("notexist", files_holder);
ASSERT_EQ(status.code(), milvus::DB_NOT_FOUND);
FIU_ENABLE_FIU("SqliteMetaImpl.FilesToSearch.throw_exception");
status = impl_->FilesToSearch(collection_id, table_files);
status = impl_->FilesToSearch(collection_id, files_holder);
ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
fiu_disable("SqliteMetaImpl.FilesToSearch.throw_exception");
}
@ -277,23 +276,23 @@ TEST_F(MetaTest, FALID_TEST) {
file.file_type_ = milvus::engine::meta::SegmentSchema::TO_INDEX;
impl_->UpdateCollectionFile(file);
milvus::engine::meta::SegmentsSchema files;
milvus::engine::meta::FilesHolder files_holder;
FIU_ENABLE_FIU("SqliteMetaImpl_FilesToIndex_CollectionNotFound");
status = impl_->FilesToIndex(files);
status = impl_->FilesToIndex(files_holder);
ASSERT_EQ(status.code(), milvus::DB_NOT_FOUND);
fiu_disable("SqliteMetaImpl_FilesToIndex_CollectionNotFound");
FIU_ENABLE_FIU("SqliteMetaImpl.FilesToIndex.throw_exception");
status = impl_->FilesToIndex(files);
status = impl_->FilesToIndex(files_holder);
ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
fiu_disable("SqliteMetaImpl.FilesToIndex.throw_exception");
}
{
milvus::engine::meta::SegmentsSchema files;
milvus::engine::meta::FilesHolder files_holder;
std::vector<int> file_types;
file_types.push_back(milvus::engine::meta::SegmentSchema::INDEX);
FIU_ENABLE_FIU("SqliteMetaImpl.FilesByType.throw_exception");
status = impl_->FilesByType(collection_id, file_types, files);
status = impl_->FilesByType(collection_id, file_types, files_holder);
ASSERT_EQ(status.code(), milvus::DB_META_TRANSACTION_FAILED);
fiu_disable("SqliteMetaImpl.FilesByType.throw_exception");
}
@ -410,8 +409,10 @@ TEST_F(MetaTest, COLLECTION_FILE_ROW_COUNT_TEST) {
ASSERT_EQ(table_file.row_count_, cnt);
std::vector<size_t> ids = {table_file.id_};
milvus::engine::meta::SegmentsSchema schemas;
status = impl_->GetCollectionFiles(collection_id, ids, schemas);
milvus::engine::meta::FilesHolder files_holder;
status = impl_->GetCollectionFiles(collection_id, ids, files_holder);
milvus::engine::meta::SegmentsSchema& schemas = files_holder.HoldFiles();
ASSERT_EQ(schemas.size(), 1UL);
ASSERT_EQ(table_file.row_count_, schemas[0].row_count_);
ASSERT_EQ(table_file.file_id_, schemas[0].file_id_);
@ -470,10 +471,11 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
impl.Archive();
int i = 0;
milvus::engine::meta::SegmentsSchema files_get;
status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_get);
milvus::engine::meta::FilesHolder files_holder;
status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_holder);
ASSERT_TRUE(status.ok());
milvus::engine::meta::SegmentsSchema& files_get = files_holder.HoldFiles();
for (auto& file : files_get) {
if (days[i] < days_num) {
ASSERT_EQ(file.file_type_, milvus::engine::meta::SegmentSchema::NEW);
@ -526,10 +528,11 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
impl.Archive();
int i = 0;
milvus::engine::meta::SegmentsSchema files_get;
status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_get);
milvus::engine::meta::FilesHolder files_holder;
status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_holder);
ASSERT_TRUE(status.ok());
milvus::engine::meta::SegmentsSchema& files_get = files_holder.HoldFiles();
for (auto& file : files_get) {
if (i >= 5) {
ASSERT_EQ(file.file_type_, milvus::engine::meta::SegmentSchema::NEW);
@ -609,39 +612,40 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
ASSERT_TRUE(status.ok());
ASSERT_EQ(total_row_count, raw_files_cnt + to_index_files_cnt + index_files_cnt);
milvus::engine::meta::SegmentsSchema files;
status = impl_->FilesToIndex(files);
ASSERT_EQ(files.size(), to_index_files_cnt);
milvus::engine::meta::FilesHolder files_holder;
status = impl_->FilesToIndex(files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt);
milvus::engine::meta::SegmentsSchema table_files;
status = impl_->FilesToMerge(collection.collection_id_, table_files);
ASSERT_EQ(table_files.size(), raw_files_cnt);
files_holder.ReleaseFiles();
status = impl_->FilesToMerge(collection.collection_id_, files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), raw_files_cnt);
status = impl_->FilesToIndex(files);
ASSERT_EQ(files.size(), to_index_files_cnt);
files_holder.ReleaseFiles();
status = impl_->FilesToIndex(files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt);
table_files.clear();
status = impl_->FilesToSearch(collection_id, table_files);
ASSERT_EQ(table_files.size(), to_index_files_cnt + raw_files_cnt + index_files_cnt);
files_holder.ReleaseFiles();
status = impl_->FilesToSearch(collection_id, files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt + raw_files_cnt + index_files_cnt);
std::vector<size_t> ids;
for (auto& file : table_files) {
for (auto& file : files_holder.HoldFiles()) {
ids.push_back(file.id_);
}
size_t cnt = table_files.size();
table_files.clear();
status = impl_->FilesByID(ids, table_files);
ASSERT_EQ(table_files.size(), cnt);
size_t cnt = files_holder.HoldFiles().size();
files_holder.ReleaseFiles();
status = impl_->FilesByID(ids, files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), cnt);
table_files.clear();
files_holder.ReleaseFiles();
ids = {9999999999UL};
status = impl_->FilesByID(ids, table_files);
ASSERT_EQ(table_files.size(), 0);
status = impl_->FilesByID(ids, files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), 0);
table_files.clear();
files_holder.ReleaseFiles();
std::vector<int> file_types;
status = impl_->FilesByType(collection.collection_id_, file_types, table_files);
ASSERT_TRUE(table_files.empty());
status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
ASSERT_TRUE(files_holder.HoldFiles().empty());
ASSERT_FALSE(status.ok());
file_types = {
@ -650,11 +654,11 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
milvus::engine::meta::SegmentSchema::INDEX, milvus::engine::meta::SegmentSchema::RAW,
milvus::engine::meta::SegmentSchema::BACKUP,
};
status = impl_->FilesByType(collection.collection_id_, file_types, table_files);
status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
ASSERT_TRUE(status.ok());
uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt + raw_files_cnt +
to_index_files_cnt + index_files_cnt;
ASSERT_EQ(table_files.size(), total_cnt);
ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);
status = impl_->DeleteCollectionFiles(collection_id);
ASSERT_TRUE(status.ok());
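The counts asserted above only hold because the holder is drained between queries; reused without `ReleaseFiles()`, results from successive calls would pile up. The pattern, distilled from the hunk:

milvus::engine::meta::FilesHolder files_holder;
status = impl_->FilesToIndex(files_holder);                  // holder now holds the TO_INDEX files
ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt);
files_holder.ReleaseFiles();                                 // drain before reuse
status = impl_->FilesToMerge(collection.collection_id_, files_holder);  // fresh result set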
@ -670,15 +674,14 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
status = impl_->CreateCollectionFile(table_file);
std::vector<int> files_to_delete;
milvus::engine::meta::SegmentsSchema files_schema;
files_holder.ReleaseFiles();
files_to_delete.push_back(milvus::engine::meta::SegmentSchema::TO_DELETE);
status = impl_->FilesByType(collection_id, files_to_delete, files_schema);
status = impl_->FilesByType(collection_id, files_to_delete, files_holder);
ASSERT_TRUE(status.ok());
table_file.collection_id_ = collection_id;
table_file.file_type_ = milvus::engine::meta::SegmentSchema::TO_DELETE;
table_file.file_id_ = files_schema.front().file_id_;
milvus::engine::OngoingFileChecker::GetInstance().MarkOngoingFile(table_file);
table_file.file_id_ = files_holder.HoldFiles().front().file_id_;
status = impl_->CleanUpFilesWithTTL(1UL);
ASSERT_TRUE(status.ok());
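Note the dropped `MarkOngoingFile` call before `CleanUpFilesWithTTL`: with `OngoingFileChecker` gone, the test no longer pins the TO_DELETE file, so the one-second TTL sweep can reclaim it. A rough sketch of the sweep being exercised; every name below is hypothetical, reconstructed for illustration rather than taken from the meta implementation:

// Hypothetical shape of CleanUpFilesWithTTL's core loop:
void CleanUpSketch(std::vector<SegmentSchema>& to_delete_files, uint64_t now_seconds, uint64_t ttl_seconds) {
    for (auto& file : to_delete_files) {
        if (now_seconds - file.updated_time_ < ttl_seconds) {
            continue;  // marked TO_DELETE too recently; keep for now
        }
        if (IsHeldByAnyFilesHolder(file)) {
            continue;  // still pinned by an in-flight query (assumption)
        }
        RemoveSegmentFile(file);  // delete the file and its meta row
    }
}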

View File

@ -23,7 +23,6 @@
#include <thread>
#include <fiu-local.h>
#include <fiu-control.h>
#include <src/db/OngoingFileChecker.h>
const char* FAILED_CONNECT_SQL_SERVER = "Failed to connect to meta server(mysql)";
const char* COLLECTION_ALREADY_EXISTS = "Collection already exists and it is in delete state, please wait a second";
@ -231,44 +230,43 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
ASSERT_TRUE(status.ok());
std::vector<size_t> ids = {table_file.id_};
milvus::engine::meta::SegmentsSchema files;
status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files);
ASSERT_EQ(files.size(), 0UL);
milvus::engine::meta::FilesHolder files_holder;
status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), 0UL);
FIU_ENABLE_FIU("MySQLMetaImpl.GetCollectionFiles.null_connection");
status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files);
status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files_holder);
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.GetCollectionFiles.null_connection");
FIU_ENABLE_FIU("MySQLMetaImpl.GetCollectionFiles.throw_exception");
status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files);
status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files_holder);
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.GetCollectionFiles.throw_exception");
ids.clear();
status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files);
status = impl_->GetCollectionFiles(table_file.collection_id_, ids, files_holder);
ASSERT_TRUE(status.ok());
table_file.collection_id_ = collection.collection_id_;
table_file.file_type_ = milvus::engine::meta::SegmentSchema::RAW;
status = impl_->CreateCollectionFile(table_file);
ids = {table_file.id_};
status = impl_->FilesByID(ids, files);
ASSERT_EQ(files.size(), 1UL);
status = impl_->FilesByID(ids, files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), 1UL);
table_file.collection_id_ = collection.collection_id_;
table_file.file_type_ = milvus::engine::meta::SegmentSchema::TO_DELETE;
status = impl_->CreateCollectionFile(table_file);
std::vector<int> files_to_delete;
files_to_delete.push_back(milvus::engine::meta::SegmentSchema::TO_DELETE);
status = impl_->FilesByType(collection_id, files_to_delete, files_schema);
files_holder.ReleaseFiles();
std::vector<int> files_to_delete = {milvus::engine::meta::SegmentSchema::TO_DELETE};
status = impl_->FilesByType(collection_id, files_to_delete, files_holder);
ASSERT_TRUE(status.ok());
table_file.collection_id_ = collection_id;
table_file.file_type_ = milvus::engine::meta::SegmentSchema::TO_DELETE;
table_file.file_id_ = files_schema.front().file_id_;
milvus::engine::OngoingFileChecker::GetInstance().MarkOngoingFile(table_file);
table_file.file_id_ = files_holder.HoldFiles().front().file_id_;
status = impl_->CleanUpFilesWithTTL(1UL);
ASSERT_TRUE(status.ok());
@ -306,8 +304,10 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_ROW_COUNT_TEST) {
ASSERT_EQ(table_file.row_count_, cnt);
std::vector<size_t> ids = {table_file.id_};
milvus::engine::meta::SegmentsSchema schemas;
status = impl_->GetCollectionFiles(collection_id, ids, schemas);
milvus::engine::meta::FilesHolder files_holder;
status = impl_->GetCollectionFiles(collection_id, ids, files_holder);
milvus::engine::meta::SegmentsSchema& schemas = files_holder.HoldFiles();
ASSERT_EQ(schemas.size(), 1UL);
ASSERT_EQ(table_file.row_count_, schemas[0].row_count_);
ASSERT_EQ(table_file.file_id_, schemas[0].file_id_);
@ -371,11 +371,11 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) {
impl.Archive();
int i = 0;
milvus::engine::meta::SegmentsSchema files_get;
status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_get);
milvus::engine::meta::FilesHolder files_holder;
status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_holder);
ASSERT_TRUE(status.ok());
for (auto& file : files_get) {
for (auto& file : files_holder.HoldFiles()) {
if (days[i] < days_num) {
ASSERT_EQ(file.file_type_, milvus::engine::meta::SegmentSchema::NEW);
}
@ -385,21 +385,22 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) {
std::vector<int> file_types = {
(int)milvus::engine::meta::SegmentSchema::NEW,
};
milvus::engine::meta::SegmentsSchema table_files;
status = impl.FilesByType(collection_id, file_types, table_files);
ASSERT_FALSE(table_files.empty());
files_holder.ReleaseFiles();
status = impl.FilesByType(collection_id, file_types, files_holder);
ASSERT_FALSE(files_holder.HoldFiles().empty());
FIU_ENABLE_FIU("MySQLMetaImpl.FilesByType.null_connection");
table_files.clear();
status = impl.FilesByType(collection_id, file_types, table_files);
files_holder.ReleaseFiles();
status = impl.FilesByType(collection_id, file_types, files_holder);
ASSERT_FALSE(status.ok());
ASSERT_TRUE(table_files.empty());
ASSERT_TRUE(files_holder.HoldFiles().empty());
fiu_disable("MySQLMetaImpl.FilesByType.null_connection");
FIU_ENABLE_FIU("MySQLMetaImpl.FilesByType.throw_exception");
status = impl.FilesByType(collection_id, file_types, table_files);
status = impl.FilesByType(collection_id, file_types, files_holder);
ASSERT_FALSE(status.ok());
ASSERT_TRUE(table_files.empty());
ASSERT_TRUE(files_holder.HoldFiles().empty());
fiu_disable("MySQLMetaImpl.FilesByType.throw_exception");
status = impl.UpdateCollectionFilesToIndex(collection_id);
@ -463,11 +464,11 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DISK) {
impl.Archive();
int i = 0;
milvus::engine::meta::SegmentsSchema files_get;
status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_get);
milvus::engine::meta::FilesHolder files_holder;
status = impl.GetCollectionFiles(table_file.collection_id_, ids, files_holder);
ASSERT_TRUE(status.ok());
for (auto& file : files_get) {
for (auto& file : files_holder.HoldFiles()) {
if (i >= 5) {
ASSERT_EQ(file.file_type_, milvus::engine::meta::SegmentSchema::NEW);
}
@ -596,83 +597,80 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
ASSERT_TRUE(status.ok());
ASSERT_EQ(total_row_count, raw_files_cnt + to_index_files_cnt + index_files_cnt);
milvus::engine::meta::SegmentsSchema files;
status = impl_->FilesToIndex(files);
ASSERT_EQ(files.size(), to_index_files_cnt);
milvus::engine::meta::FilesHolder files_holder;
status = impl_->FilesToIndex(files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt);
milvus::engine::meta::SegmentsSchema table_files;
status = impl_->FilesToMerge(collection.collection_id_, table_files);
ASSERT_EQ(table_files.size(), raw_files_cnt);
files_holder.ReleaseFiles();
status = impl_->FilesToMerge(collection.collection_id_, files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), raw_files_cnt);
files_holder.ReleaseFiles();
FIU_ENABLE_FIU("MySQLMetaImpl.FilesToMerge.null_connection");
status = impl_->FilesToMerge(collection.collection_id_, table_files);
status = impl_->FilesToMerge(collection.collection_id_, files_holder);
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.FilesToMerge.null_connection");
files_holder.ReleaseFiles();
FIU_ENABLE_FIU("MySQLMetaImpl.FilesToMerge.throw_exception");
status = impl_->FilesToMerge(collection.collection_id_, table_files);
status = impl_->FilesToMerge(collection.collection_id_, files_holder);
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.FilesToMerge.throw_exception");
status = impl_->FilesToMerge("notexist", table_files);
files_holder.ReleaseFiles();
status = impl_->FilesToMerge("notexist", files_holder);
ASSERT_EQ(status.code(), milvus::DB_NOT_FOUND);
table_file.file_type_ = milvus::engine::meta::SegmentSchema::RAW;
table_file.file_size_ = milvus::engine::GB + 1;
status = impl_->UpdateCollectionFile(table_file);
ASSERT_TRUE(status.ok());
#if 0
{
// skip large files
milvus::engine::meta::SegmentsSchema table_files;
status = impl_->FilesToMerge(collection.collection_id_, table_files);
ASSERT_EQ(table_files.size(), raw_files_cnt);
}
#endif
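The disabled block guards FilesToMerge's size filter: the test has just bumped table_file.file_size_ past one GB, and such files should be excluded from merge candidates. A sketch of the filter under test; the threshold name is assumed:

// Hypothetical merge-candidate filter inside FilesToMerge:
for (auto& file : raw_files) {
    if (file.file_size_ >= index_file_size_threshold) {
        continue;  // already at target size; merging would overshoot it
    }
    merge_candidates.push_back(file);
}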
status = impl_->FilesToIndex(files);
ASSERT_EQ(files.size(), to_index_files_cnt);
files_holder.ReleaseFiles();
status = impl_->FilesToIndex(files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt);
FIU_ENABLE_FIU("MySQLMetaImpl.DescribeCollection.throw_exception");
status = impl_->FilesToIndex(files);
status = impl_->FilesToIndex(files_holder);
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.DescribeCollection.throw_exception");
FIU_ENABLE_FIU("MySQLMetaImpl.FilesToIndex.null_connection");
status = impl_->FilesToIndex(files);
status = impl_->FilesToIndex(files_holder);
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.FilesToIndex.null_connection");
FIU_ENABLE_FIU("MySQLMetaImpl.FilesToIndex.throw_exception");
status = impl_->FilesToIndex(files);
status = impl_->FilesToIndex(files_holder);
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.FilesToIndex.throw_exception");
table_files.clear();
status = impl_->FilesToSearch(collection_id, table_files);
ASSERT_EQ(table_files.size(), to_index_files_cnt + raw_files_cnt + index_files_cnt);
files_holder.ReleaseFiles();
status = impl_->FilesToSearch(collection_id, files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), to_index_files_cnt + raw_files_cnt + index_files_cnt);
table_files.clear();
files_holder.ReleaseFiles();
std::vector<size_t> ids = {9999999999UL};
status = impl_->FilesByID(ids, table_files);
ASSERT_EQ(table_files.size(), 0);
status = impl_->FilesByID(ids, files_holder);
ASSERT_EQ(files_holder.HoldFiles().size(), 0);
FIU_ENABLE_FIU("MySQLMetaImpl.FilesToSearch.null_connection");
status = impl_->FilesToSearch(collection_id, table_files);
status = impl_->FilesToSearch(collection_id, files_holder);
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.FilesToSearch.null_connection");
FIU_ENABLE_FIU("MySQLMetaImpl.FilesToSearch.throw_exception");
status = impl_->FilesToSearch(collection_id, table_files);
status = impl_->FilesToSearch(collection_id, files_holder);
ASSERT_FALSE(status.ok());
fiu_disable("MySQLMetaImpl.FilesToSearch.throw_exception");
status = impl_->FilesToSearch("notexist", table_files);
status = impl_->FilesToSearch("notexist", files_holder);
ASSERT_EQ(status.code(), milvus::DB_NOT_FOUND);
table_files.clear();
files_holder.ReleaseFiles();
std::vector<int> file_types;
status = impl_->FilesByType(collection.collection_id_, file_types, table_files);
ASSERT_TRUE(table_files.empty());
status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
ASSERT_TRUE(files_holder.HoldFiles().empty());
ASSERT_FALSE(status.ok());
file_types = {
@ -681,11 +679,11 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
milvus::engine::meta::SegmentSchema::INDEX, milvus::engine::meta::SegmentSchema::RAW,
milvus::engine::meta::SegmentSchema::BACKUP,
};
status = impl_->FilesByType(collection.collection_id_, file_types, table_files);
status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
ASSERT_TRUE(status.ok());
uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt + backup_files_cnt + new_files_cnt + raw_files_cnt +
to_index_files_cnt + index_files_cnt;
ASSERT_EQ(table_files.size(), total_cnt);
ASSERT_EQ(files_holder.HoldFiles().size(), total_cnt);
FIU_ENABLE_FIU("MySQLMetaImpl.DeleteCollectionFiles.null_connection");
status = impl_->DeleteCollectionFiles(collection_id);
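The MySQL meta tests above pair every FIU_ENABLE_FIU with a matching fiu_disable. The same discipline sketched as an RAII guard; the guard is a convenience this test suite does not actually define, while fiu_enable/fiu_disable are the real libfiu calls:

#include <fiu-control.h>

// Hypothetical RAII wrapper: enables a failpoint on entry, disables it on
// exit, so an early ASSERT cannot leak an active failpoint into later tests.
struct FiuGuard {
    explicit FiuGuard(const char* point) : point_(point) {
        fiu_enable(point_, 1, nullptr, 0);  // fail every time the point is hit
    }
    ~FiuGuard() { fiu_disable(point_); }
    const char* point_;
};

// Usage: { FiuGuard g("MySQLMetaImpl.FilesToMerge.null_connection"); ... }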

View File

@ -17,7 +17,6 @@
#include "db/IDGenerator.h"
#include "db/IndexFailedChecker.h"
#include "db/OngoingFileChecker.h"
#include "db/Options.h"
#include "db/Utils.h"
#include "db/engine/EngineFactory.h"
@ -209,31 +208,6 @@ TEST(DBMiscTest, CHECKER_TEST) {
checker.GetErrMsgForCollection("bbb", err_msg);
ASSERT_EQ(err_msg, "5001 fail");
}
{
milvus::engine::OngoingFileChecker& checker = milvus::engine::OngoingFileChecker::GetInstance();
milvus::engine::meta::SegmentSchema schema;
schema.collection_id_ = "aaa";
schema.file_id_ = "5000";
checker.MarkOngoingFile(schema);
ASSERT_TRUE(checker.IsIgnored(schema));
schema.collection_id_ = "bbb";
schema.file_id_ = "5001";
milvus::engine::meta::SegmentsSchema table_files = {schema};
checker.MarkOngoingFiles(table_files);
ASSERT_TRUE(checker.IsIgnored(schema));
checker.UnmarkOngoingFile(schema);
ASSERT_FALSE(checker.IsIgnored(schema));
schema.collection_id_ = "aaa";
schema.file_id_ = "5000";
checker.UnmarkOngoingFile(schema);
ASSERT_FALSE(checker.IsIgnored(schema));
}
}
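The deleted block above was the only coverage for the standalone OngoingFileChecker, which this change removes; its pin-and-skip bookkeeping is presumably folded into FilesHolder. The old contract, reconstructed from the removed assertions and kept here for reference:

// Old OngoingFileChecker contract (reconstructed, not current source):
//   MarkOngoingFile(s)   -> IsIgnored(s) == true   // cleanup must skip s
//   MarkOngoingFiles(v)  -> every schema in v is ignored
//   UnmarkOngoingFile(s) -> IsIgnored(s) == false  // s is reclaimable again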
TEST(DBMiscTest, IDGENERATOR_TEST) {

View File

@ -108,7 +108,6 @@ TEST_F(SearchByIdTest, BASIC_TEST) {
ids_to_search.emplace_back(index);
}
// std::this_thread::sleep_for(std::chrono::seconds(3)); // ensure raw data write to disk
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
@ -156,6 +155,24 @@ TEST_F(SearchByIdTest, BASIC_TEST) {
ASSERT_LT(result_distances[topk * i], 1e-3);
}
}
// duplicate id search
ids_to_search.clear();
ids_to_search.push_back(1);
ids_to_search.push_back(1);
stat = db_->QueryByIDs(dummy_context_,
collection_info.collection_id_,
tags,
topk,
json_params,
ids_to_search,
result_ids,
result_distances);
ASSERT_EQ(result_ids.size(), ids_to_search.size() * topk);
ASSERT_EQ(result_distances.size(), ids_to_search.size() * topk);
CheckQueryResult(ids_to_search, topk, result_ids, result_distances);
}
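The new block regression-tests #2131: with duplicate ids in the query, each occurrence must get its own full topk result block rather than sharing one. What CheckQueryResult presumably verifies for this layout, restated here rather than copied from the helper's source:

// For n query ids and topk results each, results are laid out contiguously:
// block i (result_ids[i*topk .. i*topk+topk-1]) belongs to ids_to_search[i].
for (size_t i = 0; i < ids_to_search.size(); ++i) {
    // the best hit for a query id should be the id itself, at ~zero distance
    ASSERT_EQ(result_ids[i * topk], ids_to_search[i]);
    ASSERT_LT(result_distances[i * topk], 1e-3);
}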
TEST_F(SearchByIdTest, WITH_INDEX_TEST) {
@ -508,7 +525,6 @@ TEST_F(SearchByIdTest, BINARY_TEST) {
ids_to_search.emplace_back(index);
}
// std::this_thread::sleep_for(std::chrono::seconds(3)); // ensure raw data write to disk
stat = db_->Flush();
ASSERT_TRUE(stat.ok());
@ -577,4 +593,22 @@ TEST_F(SearchByIdTest, BINARY_TEST) {
ASSERT_LT(result_distances[topk * i], 1e-3);
}
}
// duplicate id search
ids_to_search.clear();
ids_to_search.push_back(1);
ids_to_search.push_back(1);
stat = db_->QueryByIDs(dummy_context_,
collection_info.collection_id_,
tags,
topk,
json_params,
ids_to_search,
result_ids,
result_distances);
ASSERT_EQ(result_ids.size(), ids_to_search.size() * topk);
ASSERT_EQ(result_distances.size(), ids_to_search.size() * topk);
CheckQueryResult(ids_to_search, topk, result_ids, result_distances);
}