From f32cba6111b512a0c6a1f71146af7c3ec185304f Mon Sep 17 00:00:00 2001 From: groot Date: Sun, 10 May 2020 20:38:50 -0500 Subject: [PATCH] Merge strategy (#2277) * merge manager Signed-off-by: yhmo * fix build error Signed-off-by: groot * layered merge strategy Signed-off-by: groot * fix ut Signed-off-by: groot * use simple Signed-off-by: groot * fix parallel multi collections query crashissue Signed-off-by: groot --- core/src/CMakeLists.txt | 2 + core/src/db/DBImpl.cpp | 131 +---------------- core/src/db/DBImpl.h | 8 +- core/src/db/merge/MergeLayeredStrategy.cpp | 138 ++++++++++++++++++ core/src/db/merge/MergeLayeredStrategy.h | 29 ++++ core/src/db/merge/MergeManager.h | 43 ++++++ core/src/db/merge/MergeManagerFactory.cpp | 26 ++++ core/src/db/merge/MergeManagerFactory.h | 29 ++++ core/src/db/merge/MergeManagerImpl.cpp | 89 +++++++++++ core/src/db/merge/MergeManagerImpl.h | 48 ++++++ core/src/db/merge/MergeSimpleStrategy.cpp | 25 ++++ core/src/db/merge/MergeSimpleStrategy.h | 29 ++++ core/src/db/merge/MergeStrategy.h | 38 +++++ core/src/db/merge/MergeTask.cpp | 128 ++++++++++++++++ core/src/db/merge/MergeTask.h | 36 +++++ core/src/db/meta/MySQLMetaImpl.cpp | 3 +- core/src/db/meta/SqliteMetaImpl.cpp | 2 + .../delivery/strategy/SearchReqStrategy.cpp | 5 +- core/unittest/CMakeLists.txt | 2 + 19 files changed, 677 insertions(+), 134 deletions(-) create mode 100644 core/src/db/merge/MergeLayeredStrategy.cpp create mode 100644 core/src/db/merge/MergeLayeredStrategy.h create mode 100644 core/src/db/merge/MergeManager.h create mode 100644 core/src/db/merge/MergeManagerFactory.cpp create mode 100644 core/src/db/merge/MergeManagerFactory.h create mode 100644 core/src/db/merge/MergeManagerImpl.cpp create mode 100644 core/src/db/merge/MergeManagerImpl.h create mode 100644 core/src/db/merge/MergeSimpleStrategy.cpp create mode 100644 core/src/db/merge/MergeSimpleStrategy.h create mode 100644 core/src/db/merge/MergeStrategy.h create mode 100644 core/src/db/merge/MergeTask.cpp create mode 100644 core/src/db/merge/MergeTask.h diff --git a/core/src/CMakeLists.txt b/core/src/CMakeLists.txt index 159b03b078..270e83f5cf 100644 --- a/core/src/CMakeLists.txt +++ b/core/src/CMakeLists.txt @@ -37,6 +37,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_main_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/engine db_engine_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/insert db_insert_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/merge db_merge_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/wal db_wal_files) set(grpc_service_files @@ -143,6 +144,7 @@ set(engine_files ${db_engine_files} ${db_insert_files} ${db_meta_files} + ${db_merge_files} ${db_wal_files} ${metrics_files} ${storage_files} diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index b3fe65c516..b0abd4abec 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -31,6 +31,7 @@ #include "cache/CpuCacheMgr.h" #include "cache/GpuCacheMgr.h" #include "db/IDGenerator.h" +#include "db/merge/MergeManagerFactory.h" #include "engine/EngineFactory.h" #include "index/thirdparty/faiss/utils/distances.h" #include "insert/MemManagerFactory.h" @@ -78,6 +79,7 @@ DBImpl::DBImpl(const DBOptions& options) : options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) { meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_); mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_); + merge_mgr_ptr_ = MergeManagerFactory::Build(meta_ptr_, options_); if (options_.wal_enable_) { wal::MXLogConfiguration mxlog_config; @@ -1906,101 +1908,6 @@ DBImpl::StartMergeTask() { // LOG_ENGINE_DEBUG_ << "End StartMergeTask"; } -Status -DBImpl::MergeFiles(const std::string& collection_id, meta::FilesHolder& files_holder) { - // const std::lock_guard lock(flush_merge_compact_mutex_); - - LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id; - - // step 1: create collection file - meta::SegmentSchema collection_file; - collection_file.collection_id_ = collection_id; - collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE; - Status status = meta_ptr_->CreateCollectionFile(collection_file); - if (!status.ok()) { - LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString(); - return status; - } - - // step 2: merge files - /* - ExecutionEnginePtr index = - EngineFactory::Build(collection_file.dimension_, collection_file.location_, - (EngineType)collection_file.engine_type_, (MetricType)collection_file.metric_type_, collection_file.nlist_); -*/ - meta::SegmentsSchema updated; - - std::string new_segment_dir; - utils::GetParentPath(collection_file.location_, new_segment_dir); - auto segment_writer_ptr = std::make_shared(new_segment_dir); - - // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal - milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); - for (auto& file : files) { - server::CollectMergeFilesMetrics metrics; - std::string segment_dir_to_merge; - utils::GetParentPath(file.location_, segment_dir_to_merge); - segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_); - - files_holder.UnmarkFile(file); - - auto file_schema = file; - file_schema.file_type_ = meta::SegmentSchema::TO_DELETE; - updated.push_back(file_schema); - auto size = segment_writer_ptr->Size(); - if (size >= file_schema.index_file_size_) { - break; - } - } - - // step 3: serialize to disk - try { - status = segment_writer_ptr->Serialize(); - fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception()); - fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, "")); - } catch (std::exception& ex) { - std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); - LOG_ENGINE_ERROR_ << msg; - status = Status(DB_ERROR, msg); - } - - if (!status.ok()) { - LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message(); - - // if failed to serialize merge file to disk - // typical error: out of disk space, out of memory or permission denied - collection_file.file_type_ = meta::SegmentSchema::TO_DELETE; - status = meta_ptr_->UpdateCollectionFile(collection_file); - LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << collection_file.file_id_ - << " to to_delete"; - - return status; - } - - // step 4: update collection files state - // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size - // else set file type to RAW, no need to build index - if (!utils::IsRawIndexType(collection_file.engine_type_)) { - collection_file.file_type_ = (segment_writer_ptr->Size() >= collection_file.index_file_size_) - ? meta::SegmentSchema::TO_INDEX - : meta::SegmentSchema::RAW; - } else { - collection_file.file_type_ = meta::SegmentSchema::RAW; - } - collection_file.file_size_ = segment_writer_ptr->Size(); - collection_file.row_count_ = segment_writer_ptr->VectorCount(); - updated.push_back(collection_file); - status = meta_ptr_->UpdateCollectionFiles(updated); - LOG_ENGINE_DEBUG_ << "New merged segment " << collection_file.segment_id_ << " of size " - << segment_writer_ptr->Size() << " bytes"; - - if (options_.insert_cache_immediately_) { - segment_writer_ptr->Cache(); - } - - return status; -} - Status DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) { // const std::lock_guard lock(flush_merge_compact_mutex_); @@ -2096,44 +2003,22 @@ DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& fi return status; } -Status -DBImpl::BackgroundMergeFiles(const std::string& collection_id) { - const std::lock_guard lock(flush_merge_compact_mutex_); - - meta::FilesHolder files_holder; - auto status = meta_ptr_->FilesToMerge(collection_id, files_holder); - if (!status.ok()) { - LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id; - return status; - } - - if (files_holder.HoldFiles().size() < options_.merge_trigger_number_) { - LOG_ENGINE_TRACE_ << "Files number not greater equal than merge trigger number, skip merge action"; - return Status::OK(); - } - - MergeFiles(collection_id, files_holder); - - if (!initialized_.load(std::memory_order_acquire)) { - LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id; - } - - return Status::OK(); -} - void DBImpl::BackgroundMerge(std::set collection_ids) { // LOG_ENGINE_TRACE_ << " Background merge thread start"; Status status; for (auto& collection_id : collection_ids) { - status = BackgroundMergeFiles(collection_id); + const std::lock_guard lock(flush_merge_compact_mutex_); + + auto status = merge_mgr_ptr_->MergeFiles(collection_id); if (!status.ok()) { - LOG_ENGINE_ERROR_ << "Merge files for collection " << collection_id << " failed: " << status.ToString(); + LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id + << " reason:" << status.message(); } if (!initialized_.load(std::memory_order_acquire)) { - LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action"; + LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id; break; } } diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 4e3b86fc8c..4843aaaa63 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -29,6 +29,7 @@ #include "db/IndexFailedChecker.h" #include "db/Types.h" #include "db/insert/MemManager.h" +#include "db/merge/MergeManager.h" #include "db/meta/FilesHolder.h" #include "utils/ThreadPool.h" #include "wal/WalManager.h" @@ -226,12 +227,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi void StartMergeTask(); - Status - MergeFiles(const std::string& collection_id, meta::FilesHolder& files_holder); - - Status - BackgroundMergeFiles(const std::string& collection_id); - void BackgroundMerge(std::set collection_ids); @@ -290,6 +285,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi meta::MetaPtr meta_ptr_; MemManagerPtr mem_mgr_; + MergeManagerPtr merge_mgr_ptr_; std::shared_ptr wal_mgr_; std::thread bg_wal_thread_; diff --git a/core/src/db/merge/MergeLayeredStrategy.cpp b/core/src/db/merge/MergeLayeredStrategy.cpp new file mode 100644 index 0000000000..dddd114a89 --- /dev/null +++ b/core/src/db/merge/MergeLayeredStrategy.cpp @@ -0,0 +1,138 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/merge/MergeLayeredStrategy.h" +#include "db/Utils.h" +#include "db/meta/MetaConsts.h" +#include "utils/Log.h" + +#include +#include + +namespace milvus { +namespace engine { + +Status +MergeLayeredStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) { + using LayerGroups = std::map; + // distribute files to groups according to file size(in byte) + LayerGroups layers = { + {1UL << 22, meta::SegmentsSchema()}, // 4MB + {1UL << 24, meta::SegmentsSchema()}, // 16MB + {1UL << 26, meta::SegmentsSchema()}, // 64MB + {1UL << 28, meta::SegmentsSchema()}, // 256MB + {1UL << 30, meta::SegmentsSchema()}, // 1GB + }; + + meta::SegmentsSchema& files = files_holder.HoldFiles(); + meta::SegmentsSchema huge_files; + // iterater from end, because typically the files_holder get files in order from largest to smallest + for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) { + meta::SegmentSchema& file = *iter; + if (file.index_file_size_ > 0 && file.file_size_ > file.index_file_size_) { + // release file that no need to merge + files_holder.UnmarkFile(file); + continue; + } + + bool match = false; + for (auto& pair : layers) { + if ((*iter).file_size_ < pair.first) { + pair.second.push_back(file); + match = true; + break; + } + } + + if (!match) { + huge_files.push_back(file); + } + } + + const int64_t force_merge_threashold = 60; // force merge files older than 1 minute + auto now = utils::GetMicroSecTimeStamp(); + meta::SegmentsSchema force_merge_file; + for (auto& pair : layers) { + // skip empty layer + if (pair.second.empty()) { + continue; + } + + // layer has multiple files, merge along with the force_merge_file + if (!force_merge_file.empty()) { + for (auto& file : force_merge_file) { + pair.second.push_back(file); + } + force_merge_file.clear(); + } + + // layer only has one file, if the file is too old, force merge it, else no need to merge it + if (pair.second.size() == 1) { + if (now - pair.second[0].created_on_ > force_merge_threashold * meta::US_PS) { + force_merge_file.push_back(pair.second[0]); + pair.second.clear(); + } + } + } + + // if force_merge_file is not allocated by any layer, combine it to huge_files + if (!force_merge_file.empty() && !huge_files.empty()) { + for (auto& file : force_merge_file) { + huge_files.push_back(file); + } + force_merge_file.clear(); + } + + // return result + for (auto& pair : layers) { + if (pair.second.size() == 1) { + // release file that no need to merge + files_holder.UnmarkFile(pair.second[0]); + } else if (pair.second.size() > 1) { + // create group + meta::SegmentsSchema temp_files; + temp_files.swap(pair.second); + files_groups.emplace_back(temp_files); + } + } + + if (huge_files.size() >= 1) { + meta::SegmentsSchema temp_files; + temp_files.swap(huge_files); + for (auto& file : force_merge_file) { + temp_files.push_back(file); + } + + if (temp_files.size() >= 2) { + // create group + files_groups.emplace_back(temp_files); + } else { + for (auto& file : huge_files) { + // release file that no need to merge + files_holder.UnmarkFile(file); + } + for (auto& file : force_merge_file) { + // release file that no need to merge + files_holder.UnmarkFile(file); + } + } + } else { + for (auto& file : force_merge_file) { + // release file that no need to merge + files_holder.UnmarkFile(file); + } + } + + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeLayeredStrategy.h b/core/src/db/merge/MergeLayeredStrategy.h new file mode 100644 index 0000000000..4442d6a359 --- /dev/null +++ b/core/src/db/merge/MergeLayeredStrategy.h @@ -0,0 +1,29 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include + +#include "db/merge/MergeStrategy.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +class MergeLayeredStrategy : public MergeStrategy { + public: + Status + RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) override; +}; // MergeLayeredStrategy + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeManager.h b/core/src/db/merge/MergeManager.h new file mode 100644 index 0000000000..698e64e5be --- /dev/null +++ b/core/src/db/merge/MergeManager.h @@ -0,0 +1,43 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "db/Types.h" +#include "db/meta/FilesHolder.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +enum class MergeStrategyType { + SIMPLE = 1, + LAYERED = 2, +}; + +class MergeManager { + public: + virtual Status + UseStrategy(MergeStrategyType type) = 0; + virtual Status + MergeFiles(const std::string& collection_id) = 0; +}; // MergeManager + +using MergeManagerPtr = std::shared_ptr; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeManagerFactory.cpp b/core/src/db/merge/MergeManagerFactory.cpp new file mode 100644 index 0000000000..b4e2e430ce --- /dev/null +++ b/core/src/db/merge/MergeManagerFactory.cpp @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/merge/MergeManagerFactory.h" +#include "db/merge/MergeManagerImpl.h" +#include "utils/Exception.h" +#include "utils/Log.h" + +namespace milvus { +namespace engine { + +MergeManagerPtr +MergeManagerFactory::Build(const meta::MetaPtr& meta_ptr, const DBOptions& options) { + return std::make_shared(meta_ptr, options, MergeStrategyType::SIMPLE); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeManagerFactory.h b/core/src/db/merge/MergeManagerFactory.h new file mode 100644 index 0000000000..533a321161 --- /dev/null +++ b/core/src/db/merge/MergeManagerFactory.h @@ -0,0 +1,29 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include "MergeManager.h" +#include "db/Options.h" + +#include + +namespace milvus { +namespace engine { + +class MergeManagerFactory { + public: + static MergeManagerPtr + Build(const meta::MetaPtr& meta_ptr, const DBOptions& options); +}; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeManagerImpl.cpp b/core/src/db/merge/MergeManagerImpl.cpp new file mode 100644 index 0000000000..3ba9667ab4 --- /dev/null +++ b/core/src/db/merge/MergeManagerImpl.cpp @@ -0,0 +1,89 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/merge/MergeManagerImpl.h" +#include "db/merge/MergeLayeredStrategy.h" +#include "db/merge/MergeSimpleStrategy.h" +#include "db/merge/MergeStrategy.h" +#include "db/merge/MergeTask.h" +#include "utils/Exception.h" +#include "utils/Log.h" + +namespace milvus { +namespace engine { + +MergeManagerImpl::MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type) + : meta_ptr_(meta_ptr), options_(options) { + UseStrategy(type); +} + +Status +MergeManagerImpl::UseStrategy(MergeStrategyType type) { + switch (type) { + case MergeStrategyType::SIMPLE: { + strategy_ = std::make_shared(); + break; + } + case MergeStrategyType::LAYERED: { + strategy_ = std::make_shared(); + break; + } + default: { + std::string msg = "Unsupported merge strategy type: " + std::to_string((int32_t)type); + LOG_ENGINE_ERROR_ << msg; + throw Exception(DB_ERROR, msg); + } + } + + return Status::OK(); +} + +Status +MergeManagerImpl::MergeFiles(const std::string& collection_id) { + if (strategy_ == nullptr) { + std::string msg = "No merge strategy specified"; + LOG_ENGINE_ERROR_ << msg; + return Status(DB_ERROR, msg); + } + + meta::FilesHolder files_holder; + auto status = meta_ptr_->FilesToMerge(collection_id, files_holder); + if (!status.ok()) { + LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id; + return status; + } + + if (files_holder.HoldFiles().size() < 2) { + return Status::OK(); + } + + MergeFilesGroups files_groups; + status = strategy_->RegroupFiles(files_holder, files_groups); + if (!status.ok()) { + LOG_ENGINE_ERROR_ << "Failed to regroup files for: " << collection_id + << ", continue to merge all files into one"; + + MergeTask task(meta_ptr_, options_, files_holder.HoldFiles()); + return task.Execute(); + } + + for (auto& group : files_groups) { + MergeTask task(meta_ptr_, options_, files_holder.HoldFiles()); + status = task.Execute(); + + files_holder.UnmarkFiles(group); + } + + return status; +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeManagerImpl.h b/core/src/db/merge/MergeManagerImpl.h new file mode 100644 index 0000000000..257bf10014 --- /dev/null +++ b/core/src/db/merge/MergeManagerImpl.h @@ -0,0 +1,48 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "db/merge/MergeManager.h" +#include "db/merge/MergeStrategy.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +class MergeManagerImpl : public MergeManager { + public: + MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type); + + Status + UseStrategy(MergeStrategyType type) override; + + Status + MergeFiles(const std::string& collection_id) override; + + private: + meta::MetaPtr meta_ptr_; + DBOptions options_; + + MergeStrategyPtr strategy_; +}; // MergeManagerImpl + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeSimpleStrategy.cpp b/core/src/db/merge/MergeSimpleStrategy.cpp new file mode 100644 index 0000000000..825d0d4bbe --- /dev/null +++ b/core/src/db/merge/MergeSimpleStrategy.cpp @@ -0,0 +1,25 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/merge/MergeSimpleStrategy.h" +#include "utils/Log.h" + +namespace milvus { +namespace engine { + +Status +MergeSimpleStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) { + files_groups.push_back(files_holder.HoldFiles()); + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeSimpleStrategy.h b/core/src/db/merge/MergeSimpleStrategy.h new file mode 100644 index 0000000000..3d6406ca29 --- /dev/null +++ b/core/src/db/merge/MergeSimpleStrategy.h @@ -0,0 +1,29 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include + +#include "db/merge/MergeStrategy.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +class MergeSimpleStrategy : public MergeStrategy { + public: + Status + RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) override; +}; // MergeSimpleStrategy + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeStrategy.h b/core/src/db/merge/MergeStrategy.h new file mode 100644 index 0000000000..cb00babb25 --- /dev/null +++ b/core/src/db/merge/MergeStrategy.h @@ -0,0 +1,38 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "db/Types.h" +#include "db/meta/FilesHolder.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +using MergeFilesGroups = std::vector; + +class MergeStrategy { + public: + virtual Status + RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) = 0; +}; // MergeStrategy + +using MergeStrategyPtr = std::shared_ptr; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeTask.cpp b/core/src/db/merge/MergeTask.cpp new file mode 100644 index 0000000000..113159593a --- /dev/null +++ b/core/src/db/merge/MergeTask.cpp @@ -0,0 +1,128 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/merge/MergeTask.h" +#include "db/Utils.h" +#include "metrics/Metrics.h" +#include "segment/SegmentReader.h" +#include "segment/SegmentWriter.h" +#include "utils/Log.h" + +#include +#include + +namespace milvus { +namespace engine { + +MergeTask::MergeTask(const meta::MetaPtr& meta_ptr, const DBOptions& options, meta::SegmentsSchema& files) + : meta_ptr_(meta_ptr), options_(options), files_(files) { +} + +Status +MergeTask::Execute() { + if (files_.empty()) { + return Status::OK(); + } + + // check input + std::string collection_id = files_.front().collection_id_; + for (auto& file : files_) { + if (file.collection_id_ != collection_id) { + return Status(DB_ERROR, "Cannot merge files across collections"); + } + } + + // step 1: create collection file + meta::SegmentSchema collection_file; + collection_file.collection_id_ = collection_id; + collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE; + Status status = meta_ptr_->CreateCollectionFile(collection_file); + if (!status.ok()) { + LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString(); + return status; + } + + // step 2: merge files + meta::SegmentsSchema updated; + + std::string new_segment_dir; + utils::GetParentPath(collection_file.location_, new_segment_dir); + auto segment_writer_ptr = std::make_shared(new_segment_dir); + + // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal + std::string info = "Merge task files size info:"; + for (auto& file : files_) { + info += std::to_string(file.file_size_); + info += ", "; + + server::CollectMergeFilesMetrics metrics; + std::string segment_dir_to_merge; + utils::GetParentPath(file.location_, segment_dir_to_merge); + segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_); + + auto file_schema = file; + file_schema.file_type_ = meta::SegmentSchema::TO_DELETE; + updated.push_back(file_schema); + auto size = segment_writer_ptr->Size(); + if (size >= file_schema.index_file_size_) { + break; + } + } + LOG_ENGINE_DEBUG_ << info; + + // step 3: serialize to disk + try { + status = segment_writer_ptr->Serialize(); + } catch (std::exception& ex) { + std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); + LOG_ENGINE_ERROR_ << msg; + status = Status(DB_ERROR, msg); + } + + if (!status.ok()) { + LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message(); + + // if failed to serialize merge file to disk + // typical error: out of disk space, out of memory or permission denied + collection_file.file_type_ = meta::SegmentSchema::TO_DELETE; + status = meta_ptr_->UpdateCollectionFile(collection_file); + LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << collection_file.file_id_ + << " to to_delete"; + + return status; + } + + // step 4: update collection files state + // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size + // else set file type to RAW, no need to build index + if (!utils::IsRawIndexType(collection_file.engine_type_)) { + collection_file.file_type_ = (segment_writer_ptr->Size() >= collection_file.index_file_size_) + ? meta::SegmentSchema::TO_INDEX + : meta::SegmentSchema::RAW; + } else { + collection_file.file_type_ = meta::SegmentSchema::RAW; + } + collection_file.file_size_ = segment_writer_ptr->Size(); + collection_file.row_count_ = segment_writer_ptr->VectorCount(); + updated.push_back(collection_file); + status = meta_ptr_->UpdateCollectionFiles(updated); + LOG_ENGINE_DEBUG_ << "New merged segment " << collection_file.segment_id_ << " of size " + << segment_writer_ptr->Size() << " bytes"; + + if (options_.insert_cache_immediately_) { + segment_writer_ptr->Cache(); + } + + return status; +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/merge/MergeTask.h b/core/src/db/merge/MergeTask.h new file mode 100644 index 0000000000..af0933a665 --- /dev/null +++ b/core/src/db/merge/MergeTask.h @@ -0,0 +1,36 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include "db/merge/MergeManager.h" +#include "db/meta/MetaTypes.h" +#include "utils/Status.h" + +namespace milvus { +namespace engine { + +class MergeTask { + public: + MergeTask(const meta::MetaPtr& meta, const DBOptions& options, meta::SegmentsSchema& files); + + Status + Execute(); + + private: + meta::MetaPtr meta_ptr_; + DBOptions options_; + + meta::SegmentsSchema files_; +}; // MergeTask + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index 561f0127b8..7b35f92ded 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -2508,7 +2508,8 @@ MySQLMetaImpl::DropAll() { } mysqlpp::Query statement = connectionPtr->query(); - statement << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ";"; + statement << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ", " + << ENVIRONMENT_SCHEMA.name() << ", " << FIELDS_SCHEMA.name() << ";"; LOG_ENGINE_DEBUG_ << "DropAll: " << statement.str(); diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index c50f2054a6..f6b4eae1bd 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -1752,6 +1752,8 @@ SqliteMetaImpl::DropAll() { try { ConnectorPtr->drop_table(META_TABLES); ConnectorPtr->drop_table(META_TABLEFILES); + ConnectorPtr->drop_table(META_ENVIRONMENT); + ConnectorPtr->drop_table(META_FIELDS); } catch (std::exception& e) { return HandleException("Encounter exception when drop all meta", e.what()); } diff --git a/core/src/server/delivery/strategy/SearchReqStrategy.cpp b/core/src/server/delivery/strategy/SearchReqStrategy.cpp index 2ab3f6ff1f..3b49ed6964 100644 --- a/core/src/server/delivery/strategy/SearchReqStrategy.cpp +++ b/core/src/server/delivery/strategy/SearchReqStrategy.cpp @@ -41,14 +41,11 @@ SearchReqStrategy::ReScheduleQueue(const BaseRequestPtr& request, std::queueGetRequestType() == BaseRequest::kSearch) { SearchRequestPtr last_search_req = std::static_pointer_cast(last_req); if (SearchCombineRequest::CanCombine(last_search_req, new_search_req)) { - // pop last request - queue.pop(); - // combine request SearchCombineRequestPtr combine_request = std::make_shared(); combine_request->Combine(last_search_req); combine_request->Combine(new_search_req); - queue.push(combine_request); + queue.back() = combine_request; // replace the last request to combine request LOG_SERVER_DEBUG_ << "Combine 2 search request"; } else { // directly put to queue diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index ea655eeb01..a3d085a146 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -31,6 +31,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_main_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/engine db_engine_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/insert db_insert_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/db/merge db_merge_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/wal db_wal_files) aux_source_directory(${MILVUS_ENGINE_SRC}/search search_files) aux_source_directory(${MILVUS_ENGINE_SRC}/query query_files) @@ -143,6 +144,7 @@ set(common_files ${db_engine_files} ${db_insert_files} ${db_meta_files} + ${db_merge_files} ${db_wal_files} ${metrics_files} ${thirdparty_files}