From 2d9b358e163e8291ff1e1dc995d4f0c51efbbf7d Mon Sep 17 00:00:00 2001 From: groot Date: Mon, 25 May 2020 01:19:25 -0500 Subject: [PATCH] optimize merge process (#2419) * optimize merge process Signed-off-by: groot * typo Signed-off-by: groot * refine code Signed-off-by: yhmo * drop collecion issue Signed-off-by: yhmo --- core/src/db/merge/MergeAdaptiveStrategy.cpp | 13 +++----- core/src/db/merge/MergeLayeredStrategy.cpp | 33 +++++++++++++++++-- .../request/DropCollectionRequest.cpp | 3 ++ 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/core/src/db/merge/MergeAdaptiveStrategy.cpp b/core/src/db/merge/MergeAdaptiveStrategy.cpp index 4ebcaeaae9..6b948a8737 100644 --- a/core/src/db/merge/MergeAdaptiveStrategy.cpp +++ b/core/src/db/merge/MergeAdaptiveStrategy.cpp @@ -17,14 +17,6 @@ namespace milvus { namespace engine { -namespace { -struct { - bool - operator()(meta::SegmentSchema& left, meta::SegmentSchema& right) const { - return left.file_size_ > right.file_size_; - } -} CompareSegment; -} // namespace Status MergeAdaptiveStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) { @@ -54,7 +46,10 @@ MergeAdaptiveStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesG } // arrange files by file size in descending order - std::sort(sort_files.begin(), sort_files.end(), CompareSegment); + std::sort(sort_files.begin(), sort_files.end(), + [](const meta::SegmentSchema& left, const meta::SegmentSchema& right) { + return left.file_size_ > right.file_size_; + }); // pick files to merge int64_t index_file_size = sort_files[0].index_file_size_; diff --git a/core/src/db/merge/MergeLayeredStrategy.cpp b/core/src/db/merge/MergeLayeredStrategy.cpp index 9568fe46c1..298c9c3491 100644 --- a/core/src/db/merge/MergeLayeredStrategy.cpp +++ b/core/src/db/merge/MergeLayeredStrategy.cpp @@ -14,6 +14,7 @@ #include "db/meta/MetaConsts.h" #include "utils/Log.h" +#include #include #include @@ -34,13 +35,39 @@ MergeLayeredStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGr {1UL << 30, meta::SegmentsSchema()}, // 1GB }; - meta::SegmentsSchema& files = files_holder.HoldFiles(); + meta::SegmentsSchema sort_files = files_holder.HoldFiles(); + // no need to merge single file + if (sort_files.size() < 2) { + return Status::OK(); + } + + // arrange files by file size in descending order + std::sort(sort_files.begin(), sort_files.end(), + [](const meta::SegmentSchema& left, const meta::SegmentSchema& right) { + return left.file_size_ > right.file_size_; + }); + + // priority pick files that merge size greater than index_file_size + // to avoid big files such as index_file_size = 1024, merged file size = 1280 + int64_t index_file_size = sort_files[0].index_file_size_; + size_t biggest_size = sort_files[0].file_size_; + for (auto iter = sort_files.end() - 1; iter != sort_files.begin() + 1; --iter) { + if ((*iter).file_size_ + biggest_size > index_file_size) { + meta::SegmentsSchema temp_group = {*sort_files.begin(), *iter}; + files_groups.emplace_back(temp_group); + sort_files.erase(iter); + sort_files.erase(sort_files.begin()); + break; + } + } + meta::SegmentsSchema huge_files; - // iterater from end, because typically the files_holder get files in order from largest to smallest - for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) { + // put files to layers + for (meta::SegmentsSchema::reverse_iterator iter = sort_files.rbegin(); iter != sort_files.rend(); ++iter) { meta::SegmentSchema& file = *iter; if (file.index_file_size_ > 0 && file.file_size_ > (size_t)(file.index_file_size_)) { // file that no need to merge + files_holder.UnmarkFile(file); continue; } diff --git a/core/src/server/delivery/request/DropCollectionRequest.cpp b/core/src/server/delivery/request/DropCollectionRequest.cpp index ca32a36085..52de5596aa 100644 --- a/core/src/server/delivery/request/DropCollectionRequest.cpp +++ b/core/src/server/delivery/request/DropCollectionRequest.cpp @@ -76,6 +76,9 @@ DropCollectionRequest::OnExecute() { return status; } + // step 4: flush to trigger CleanUpFilesWithTTL + status = DBWrapper::DB()->Flush(); + rc.ElapseFromBegin("total cost"); } catch (std::exception& ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what());