Merge strategy (#2277)

* merge manager

Signed-off-by: yhmo <yihua.mo@zilliz.com>

* fix build error

Signed-off-by: groot <yihua.mo@zilliz.com>

* layered merge strategy

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix ut

Signed-off-by: groot <yihua.mo@zilliz.com>

* use simple

Signed-off-by: groot <yihua.mo@zilliz.com>

* fix parallel multi collections query crash issue

Signed-off-by: groot <yihua.mo@zilliz.com>
This commit is contained in:
groot 2020-05-10 20:38:50 -05:00 committed by GitHub
parent 369743c10d
commit f32cba6111
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 677 additions and 134 deletions

View File

@ -37,6 +37,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_main_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/engine db_engine_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/insert db_insert_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/merge db_merge_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/wal db_wal_files)
set(grpc_service_files
@ -143,6 +144,7 @@ set(engine_files
${db_engine_files}
${db_insert_files}
${db_meta_files}
${db_merge_files}
${db_wal_files}
${metrics_files}
${storage_files}

View File

@ -31,6 +31,7 @@
#include "cache/CpuCacheMgr.h"
#include "cache/GpuCacheMgr.h"
#include "db/IDGenerator.h"
#include "db/merge/MergeManagerFactory.h"
#include "engine/EngineFactory.h"
#include "index/thirdparty/faiss/utils/distances.h"
#include "insert/MemManagerFactory.h"
@ -78,6 +79,7 @@ DBImpl::DBImpl(const DBOptions& options)
: options_(options), initialized_(false), merge_thread_pool_(1, 1), index_thread_pool_(1, 1) {
meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
merge_mgr_ptr_ = MergeManagerFactory::Build(meta_ptr_, options_);
if (options_.wal_enable_) {
wal::MXLogConfiguration mxlog_config;
@ -1906,101 +1908,6 @@ DBImpl::StartMergeTask() {
// LOG_ENGINE_DEBUG_ << "End StartMergeTask";
}
Status
DBImpl::MergeFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
// const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
LOG_ENGINE_DEBUG_ << "Merge files for collection: " << collection_id;
// step 1: create collection file
meta::SegmentSchema collection_file;
collection_file.collection_id_ = collection_id;
collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
Status status = meta_ptr_->CreateCollectionFile(collection_file);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
return status;
}
// step 2: merge files
/*
ExecutionEnginePtr index =
EngineFactory::Build(collection_file.dimension_, collection_file.location_,
(EngineType)collection_file.engine_type_, (MetricType)collection_file.metric_type_, collection_file.nlist_);
*/
meta::SegmentsSchema updated;
std::string new_segment_dir;
utils::GetParentPath(collection_file.location_, new_segment_dir);
auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
// attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal
milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles();
for (auto& file : files) {
server::CollectMergeFilesMetrics metrics;
std::string segment_dir_to_merge;
utils::GetParentPath(file.location_, segment_dir_to_merge);
segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_);
files_holder.UnmarkFile(file);
auto file_schema = file;
file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
updated.push_back(file_schema);
auto size = segment_writer_ptr->Size();
if (size >= file_schema.index_file_size_) {
break;
}
}
// step 3: serialize to disk
try {
status = segment_writer_ptr->Serialize();
fiu_do_on("DBImpl.MergeFiles.Serialize_ThrowException", throw std::exception());
fiu_do_on("DBImpl.MergeFiles.Serialize_ErrorStatus", status = Status(DB_ERROR, ""));
} catch (std::exception& ex) {
std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
LOG_ENGINE_ERROR_ << msg;
status = Status(DB_ERROR, msg);
}
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();
// if failed to serialize merge file to disk
// typical error: out of disk space, out of memory or permission denied
collection_file.file_type_ = meta::SegmentSchema::TO_DELETE;
status = meta_ptr_->UpdateCollectionFile(collection_file);
LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << collection_file.file_id_
<< " to to_delete";
return status;
}
// step 4: update collection files state
// if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
// else set file type to RAW, no need to build index
if (!utils::IsRawIndexType(collection_file.engine_type_)) {
collection_file.file_type_ = (segment_writer_ptr->Size() >= collection_file.index_file_size_)
? meta::SegmentSchema::TO_INDEX
: meta::SegmentSchema::RAW;
} else {
collection_file.file_type_ = meta::SegmentSchema::RAW;
}
collection_file.file_size_ = segment_writer_ptr->Size();
collection_file.row_count_ = segment_writer_ptr->VectorCount();
updated.push_back(collection_file);
status = meta_ptr_->UpdateCollectionFiles(updated);
LOG_ENGINE_DEBUG_ << "New merged segment " << collection_file.segment_id_ << " of size "
<< segment_writer_ptr->Size() << " bytes";
if (options_.insert_cache_immediately_) {
segment_writer_ptr->Cache();
}
return status;
}
Status
DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& files_holder) {
// const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
@ -2096,44 +2003,22 @@ DBImpl::MergeHybridFiles(const std::string& collection_id, meta::FilesHolder& fi
return status;
}
Status
DBImpl::BackgroundMergeFiles(const std::string& collection_id) {
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
meta::FilesHolder files_holder;
auto status = meta_ptr_->FilesToMerge(collection_id, files_holder);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id;
return status;
}
if (files_holder.HoldFiles().size() < options_.merge_trigger_number_) {
LOG_ENGINE_TRACE_ << "Files number not greater equal than merge trigger number, skip merge action";
return Status::OK();
}
MergeFiles(collection_id, files_holder);
if (!initialized_.load(std::memory_order_acquire)) {
LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id;
}
return Status::OK();
}
void
DBImpl::BackgroundMerge(std::set<std::string> collection_ids) {
// LOG_ENGINE_TRACE_ << " Background merge thread start";
Status status;
for (auto& collection_id : collection_ids) {
status = BackgroundMergeFiles(collection_id);
const std::lock_guard<std::mutex> lock(flush_merge_compact_mutex_);
auto status = merge_mgr_ptr_->MergeFiles(collection_id);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Merge files for collection " << collection_id << " failed: " << status.ToString();
LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id
<< " reason:" << status.message();
}
if (!initialized_.load(std::memory_order_acquire)) {
LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action";
LOG_ENGINE_DEBUG_ << "Server will shutdown, skip merge action for collection: " << collection_id;
break;
}
}

View File

@ -29,6 +29,7 @@
#include "db/IndexFailedChecker.h"
#include "db/Types.h"
#include "db/insert/MemManager.h"
#include "db/merge/MergeManager.h"
#include "db/meta/FilesHolder.h"
#include "utils/ThreadPool.h"
#include "wal/WalManager.h"
@ -226,12 +227,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
void
StartMergeTask();
Status
MergeFiles(const std::string& collection_id, meta::FilesHolder& files_holder);
Status
BackgroundMergeFiles(const std::string& collection_id);
void
BackgroundMerge(std::set<std::string> collection_ids);
@ -290,6 +285,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi
meta::MetaPtr meta_ptr_;
MemManagerPtr mem_mgr_;
MergeManagerPtr merge_mgr_ptr_;
std::shared_ptr<wal::WalManager> wal_mgr_;
std::thread bg_wal_thread_;

View File

@ -0,0 +1,138 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeLayeredStrategy.h"
#include "db/Utils.h"
#include "db/meta/MetaConsts.h"
#include "utils/Log.h"
#include <map>
#include <vector>
namespace milvus {
namespace engine {
// Regroup the merge-candidate files held by |files_holder| into merge groups,
// layered by file size (4MB/16MB/64MB/256MB/1GB), so files of similar size are
// merged together instead of repeatedly rewriting large segments.
// Files that need no merge are released via files_holder.UnmarkFile(); files that
// will be merged stay marked and are appended to |files_groups| as output.
Status
MergeLayeredStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) {
    // map: layer upper bound in bytes -> files assigned to that layer
    using LayerGroups = std::map<uint64_t, meta::SegmentsSchema>;
    // distribute files to groups according to file size(in byte)
    LayerGroups layers = {
        {1UL << 22, meta::SegmentsSchema()},  // 4MB
        {1UL << 24, meta::SegmentsSchema()},  // 16MB
        {1UL << 26, meta::SegmentsSchema()},  // 64MB
        {1UL << 28, meta::SegmentsSchema()},  // 256MB
        {1UL << 30, meta::SegmentsSchema()},  // 1GB
    };
    meta::SegmentsSchema& files = files_holder.HoldFiles();
    meta::SegmentsSchema huge_files;  // files larger than the top layer bound
    // iterate from end, because typically the files_holder get files in order from largest to smallest
    for (meta::SegmentsSchema::reverse_iterator iter = files.rbegin(); iter != files.rend(); ++iter) {
        meta::SegmentSchema& file = *iter;
        if (file.index_file_size_ > 0 && file.file_size_ > file.index_file_size_) {
            // file already exceeds its index-file-size threshold: release file that no need to merge
            files_holder.UnmarkFile(file);
            continue;
        }
        // put the file into the first (smallest) layer whose bound exceeds its size
        bool match = false;
        for (auto& pair : layers) {
            if ((*iter).file_size_ < pair.first) {
                pair.second.push_back(file);
                match = true;
                break;
            }
        }
        if (!match) {
            huge_files.push_back(file);
        }
    }
    const int64_t force_merge_threashold = 60;  // force merge files older than 1 minute
    auto now = utils::GetMicroSecTimeStamp();
    meta::SegmentsSchema force_merge_file;  // lone-but-old files bubbled up to the next busy layer
    for (auto& pair : layers) {
        // skip empty layer
        if (pair.second.empty()) {
            continue;
        }
        // layer has multiple files, merge along with the force_merge_file
        if (!force_merge_file.empty()) {
            for (auto& file : force_merge_file) {
                pair.second.push_back(file);
            }
            force_merge_file.clear();
        }
        // layer only has one file, if the file is too old, force merge it, else no need to merge it
        if (pair.second.size() == 1) {
            // NOTE(review): assumes created_on_ and GetMicroSecTimeStamp share the same
            // epoch/unit and meta::US_PS is microseconds-per-second — confirm in MetaConsts.h
            if (now - pair.second[0].created_on_ > force_merge_threashold * meta::US_PS) {
                force_merge_file.push_back(pair.second[0]);
                pair.second.clear();
            }
        }
    }
    // if force_merge_file is not allocated by any layer, combine it to huge_files
    if (!force_merge_file.empty() && !huge_files.empty()) {
        for (auto& file : force_merge_file) {
            huge_files.push_back(file);
        }
        force_merge_file.clear();
    }
    // return result: each layer with 2+ files becomes one merge group
    for (auto& pair : layers) {
        if (pair.second.size() == 1) {
            // release file that no need to merge
            files_holder.UnmarkFile(pair.second[0]);
        } else if (pair.second.size() > 1) {
            // create group
            meta::SegmentsSchema temp_files;
            temp_files.swap(pair.second);
            files_groups.emplace_back(temp_files);
        }
    }
    // huge files (plus any leftover force-merge files) form their own group when
    // at least two files would be merged; otherwise everything is released
    if (huge_files.size() >= 1) {
        meta::SegmentsSchema temp_files;
        temp_files.swap(huge_files);
        for (auto& file : force_merge_file) {
            temp_files.push_back(file);
        }
        if (temp_files.size() >= 2) {
            // create group
            files_groups.emplace_back(temp_files);
        } else {
            for (auto& file : huge_files) {
                // release file that no need to merge
                files_holder.UnmarkFile(file);
            }
            for (auto& file : force_merge_file) {
                // release file that no need to merge
                files_holder.UnmarkFile(file);
            }
        }
    } else {
        for (auto& file : force_merge_file) {
            // release file that no need to merge
            files_holder.UnmarkFile(file);
        }
    }
    return Status::OK();
}
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,29 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <vector>
#include "db/merge/MergeStrategy.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
// Layered merge strategy: groups merge-candidate files into size layers
// (4MB/16MB/64MB/256MB/1GB) so files of similar size are merged together,
// and force-merges a lone file when it is older than a threshold.
class MergeLayeredStrategy : public MergeStrategy {
 public:
    Status
    RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) override;
};  // MergeLayeredStrategy
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,43 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
// Available strategies for regrouping files before merging.
enum class MergeStrategyType {
    SIMPLE = 1,   // merge all candidate files of a collection into one group
    LAYERED = 2,  // group candidate files into layers by file size
};

// Interface of merge managers: selects a strategy and merges the small
// segment files of a collection into larger segments.
class MergeManager {
 public:
    // Select the strategy used to regroup files before merging.
    virtual Status
    UseStrategy(MergeStrategyType type) = 0;

    // Merge the small files of the given collection according to the current strategy.
    virtual Status
    MergeFiles(const std::string& collection_id) = 0;
};  // MergeManager

using MergeManagerPtr = std::shared_ptr<MergeManager>;
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,26 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeManagerFactory.h"
#include "db/merge/MergeManagerImpl.h"
#include "utils/Exception.h"
#include "utils/Log.h"
namespace milvus {
namespace engine {
// Create the merge manager used by DBImpl.
// Currently always returns a MergeManagerImpl configured with the SIMPLE
// strategy (all merge candidates of a collection form a single group).
MergeManagerPtr
MergeManagerFactory::Build(const meta::MetaPtr& meta_ptr, const DBOptions& options) {
    const auto default_strategy = MergeStrategyType::SIMPLE;
    return std::make_shared<MergeManagerImpl>(meta_ptr, options, default_strategy);
}
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,29 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "MergeManager.h"
#include "db/Options.h"
#include <memory>
namespace milvus {
namespace engine {
// Factory that creates the MergeManager used by DBImpl.
class MergeManagerFactory {
 public:
    // Build a merge manager bound to the given meta accessor and options.
    // Currently always returns a MergeManagerImpl using the SIMPLE strategy.
    static MergeManagerPtr
    Build(const meta::MetaPtr& meta_ptr, const DBOptions& options);
};
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,89 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeManagerImpl.h"
#include "db/merge/MergeLayeredStrategy.h"
#include "db/merge/MergeSimpleStrategy.h"
#include "db/merge/MergeStrategy.h"
#include "db/merge/MergeTask.h"
#include "utils/Exception.h"
#include "utils/Log.h"
namespace milvus {
namespace engine {
// Construct a merge manager bound to the given meta accessor and options.
// The initial strategy is selected immediately; note UseStrategy throws
// Exception for an unsupported type, so this constructor can throw.
MergeManagerImpl::MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type)
    : meta_ptr_(meta_ptr), options_(options) {
    UseStrategy(type);
}
// Select the strategy used to regroup files before merging.
// Instantiates the strategy object matching |type|; an unsupported type is
// logged and reported by throwing Exception(DB_ERROR).
Status
MergeManagerImpl::UseStrategy(MergeStrategyType type) {
    if (type == MergeStrategyType::SIMPLE) {
        strategy_ = std::make_shared<MergeSimpleStrategy>();
    } else if (type == MergeStrategyType::LAYERED) {
        strategy_ = std::make_shared<MergeLayeredStrategy>();
    } else {
        std::string msg = "Unsupported merge strategy type: " + std::to_string((int32_t)type);
        LOG_ENGINE_ERROR_ << msg;
        throw Exception(DB_ERROR, msg);
    }

    return Status::OK();
}
// Merge the small segment files of a collection into larger segments.
// Steps: fetch merge candidates from meta, let the active strategy regroup them
// into merge groups, then execute one MergeTask per group. If regrouping fails,
// fall back to merging all held candidate files as a single group.
// Returns an error status if candidates cannot be fetched; otherwise returns the
// status of the last executed merge task.
Status
MergeManagerImpl::MergeFiles(const std::string& collection_id) {
    if (strategy_ == nullptr) {
        std::string msg = "No merge strategy specified";
        LOG_ENGINE_ERROR_ << msg;
        return Status(DB_ERROR, msg);
    }

    meta::FilesHolder files_holder;
    auto status = meta_ptr_->FilesToMerge(collection_id, files_holder);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to get merge files for collection: " << collection_id;
        return status;
    }

    // nothing to merge with fewer than two candidates
    if (files_holder.HoldFiles().size() < 2) {
        return Status::OK();
    }

    MergeFilesGroups files_groups;
    status = strategy_->RegroupFiles(files_holder, files_groups);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to regroup files for: " << collection_id
                          << ", continue to merge all files into one";

        // fallback: merge every held file into a single segment
        MergeTask task(meta_ptr_, options_, files_holder.HoldFiles());
        return task.Execute();
    }

    for (auto& group : files_groups) {
        // merge only the files of this group — passing all held files here would
        // defeat the strategy's grouping and re-merge the same files repeatedly
        MergeTask task(meta_ptr_, options_, group);
        status = task.Execute();
        if (!status.ok()) {
            LOG_ENGINE_ERROR_ << "Failed to merge files for: " << collection_id
                              << " reason: " << status.message();
        }

        // release the group's files so other operations can use them
        files_holder.UnmarkFiles(group);
    }

    return status;
}
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,48 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <ctime>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/merge/MergeManager.h"
#include "db/merge/MergeStrategy.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
class MergeManagerImpl : public MergeManager {
public:
MergeManagerImpl(const meta::MetaPtr& meta_ptr, const DBOptions& options, MergeStrategyType type);
Status
UseStrategy(MergeStrategyType type) override;
Status
MergeFiles(const std::string& collection_id) override;
private:
meta::MetaPtr meta_ptr_;
DBOptions options_;
MergeStrategyPtr strategy_;
}; // MergeManagerImpl
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,25 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeSimpleStrategy.h"
#include "utils/Log.h"
namespace milvus {
namespace engine {
// Simple regrouping: every file held by |files_holder| goes into one single
// merge group, so the whole candidate set is merged into one segment.
Status
MergeSimpleStrategy::RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) {
    files_groups.emplace_back(files_holder.HoldFiles());
    return Status::OK();
}
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,29 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <vector>
#include "db/merge/MergeStrategy.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
// Simple merge strategy: puts all merge-candidate files of a collection
// into a single group, so they are merged into one segment.
class MergeSimpleStrategy : public MergeStrategy {
 public:
    Status
    RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) override;
};  // MergeSimpleStrategy
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,38 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/Types.h"
#include "db/meta/FilesHolder.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
// Each element is one group of segment files to be merged into a new segment.
using MergeFilesGroups = std::vector<meta::SegmentsSchema>;

// Interface of merge strategies: decides which of the held candidate files are
// merged together. Implementations append groups to |files_groups| and release
// files that need no merge via files_holder.UnmarkFile().
class MergeStrategy {
 public:
    virtual Status
    RegroupFiles(meta::FilesHolder& files_holder, MergeFilesGroups& files_groups) = 0;
};  // MergeStrategy

using MergeStrategyPtr = std::shared_ptr<MergeStrategy>;
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,128 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "db/merge/MergeTask.h"
#include "db/Utils.h"
#include "metrics/Metrics.h"
#include "segment/SegmentReader.h"
#include "segment/SegmentWriter.h"
#include "utils/Log.h"
#include <memory>
#include <string>
namespace milvus {
namespace engine {
// Bind the task to a meta accessor and options, and take a copy of the files
// to merge (files_ is a member copy, so later changes to the caller's array
// do not affect this task).
MergeTask::MergeTask(const meta::MetaPtr& meta_ptr, const DBOptions& options, meta::SegmentsSchema& files)
    : meta_ptr_(meta_ptr), options_(options), files_(files) {
}
// Merge the task's files into one brand-new segment.
// Steps: validate that all files belong to one collection -> create a NEW_MERGE
// file in meta -> merge each source segment into a SegmentWriter -> serialize to
// disk -> batch-update meta: sources become TO_DELETE, the new file becomes
// RAW or TO_INDEX. Returns an error status if any meta or disk step fails.
Status
MergeTask::Execute() {
    if (files_.empty()) {
        return Status::OK();
    }
    // check input: merging across collections is not allowed
    std::string collection_id = files_.front().collection_id_;
    for (auto& file : files_) {
        if (file.collection_id_ != collection_id) {
            return Status(DB_ERROR, "Cannot merge files across collections");
        }
    }
    // step 1: create collection file
    meta::SegmentSchema collection_file;
    collection_file.collection_id_ = collection_id;
    collection_file.file_type_ = meta::SegmentSchema::NEW_MERGE;
    Status status = meta_ptr_->CreateCollectionFile(collection_file);
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to create collection: " << status.ToString();
        return status;
    }
    // step 2: merge files
    meta::SegmentsSchema updated;  // meta records to write back in one batch at step 4
    std::string new_segment_dir;
    utils::GetParentPath(collection_file.location_, new_segment_dir);
    auto segment_writer_ptr = std::make_shared<segment::SegmentWriter>(new_segment_dir);
    // accumulate per-file sizes for one debug log line below
    std::string info = "Merge task files size info:";
    for (auto& file : files_) {
        info += std::to_string(file.file_size_);
        info += ", ";
        // NOTE(review): scoped metrics object — presumably records merge timing via
        // its constructor/destructor; confirm in metrics/Metrics.h
        server::CollectMergeFilesMetrics metrics;
        std::string segment_dir_to_merge;
        utils::GetParentPath(file.location_, segment_dir_to_merge);
        segment_writer_ptr->Merge(segment_dir_to_merge, collection_file.file_id_);
        // mark the merged source file for deletion
        auto file_schema = file;
        file_schema.file_type_ = meta::SegmentSchema::TO_DELETE;
        updated.push_back(file_schema);
        auto size = segment_writer_ptr->Size();
        // stop merging more files once the new segment reaches the index file size limit
        if (size >= file_schema.index_file_size_) {
            break;
        }
    }
    LOG_ENGINE_DEBUG_ << info;
    // step 3: serialize to disk
    try {
        status = segment_writer_ptr->Serialize();
    } catch (std::exception& ex) {
        std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
        LOG_ENGINE_ERROR_ << msg;
        status = Status(DB_ERROR, msg);
    }
    if (!status.ok()) {
        LOG_ENGINE_ERROR_ << "Failed to persist merged segment: " << new_segment_dir << ". Error: " << status.message();
        // if failed to serialize merge file to disk
        // typical error: out of disk space, out of memory or permission denied
        collection_file.file_type_ = meta::SegmentSchema::TO_DELETE;
        status = meta_ptr_->UpdateCollectionFile(collection_file);
        LOG_ENGINE_DEBUG_ << "Failed to update file to index, mark file: " << collection_file.file_id_
                          << " to to_delete";
        return status;
    }
    // step 4: update collection files state
    // if index type isn't IDMAP, set file type to TO_INDEX if file size exceed index_file_size
    // else set file type to RAW, no need to build index
    if (!utils::IsRawIndexType(collection_file.engine_type_)) {
        collection_file.file_type_ = (segment_writer_ptr->Size() >= collection_file.index_file_size_)
                                         ? meta::SegmentSchema::TO_INDEX
                                         : meta::SegmentSchema::RAW;
    } else {
        collection_file.file_type_ = meta::SegmentSchema::RAW;
    }
    collection_file.file_size_ = segment_writer_ptr->Size();
    collection_file.row_count_ = segment_writer_ptr->VectorCount();
    updated.push_back(collection_file);
    status = meta_ptr_->UpdateCollectionFiles(updated);
    LOG_ENGINE_DEBUG_ << "New merged segment " << collection_file.segment_id_ << " of size "
                      << segment_writer_ptr->Size() << " bytes";
    // optionally pre-load the new segment into cache
    if (options_.insert_cache_immediately_) {
        segment_writer_ptr->Cache();
    }
    return status;
}
} // namespace engine
} // namespace milvus

View File

@ -0,0 +1,36 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include "db/merge/MergeManager.h"
#include "db/meta/MetaTypes.h"
#include "utils/Status.h"
namespace milvus {
namespace engine {
// A single merge job: merges a set of segment files belonging to one
// collection into a brand-new segment and updates meta accordingly.
class MergeTask {
 public:
    // |files| are the segment files to merge; they are copied into the task
    // and must all belong to the same collection.
    MergeTask(const meta::MetaPtr& meta, const DBOptions& options, meta::SegmentsSchema& files);

    // Perform the merge; returns the status of the meta/disk operations.
    Status
    Execute();

 private:
    meta::MetaPtr meta_ptr_;      // meta storage accessor
    DBOptions options_;           // db options (e.g. insert_cache_immediately_)
    meta::SegmentsSchema files_;  // copy of the files to merge
};  // MergeTask
} // namespace engine
} // namespace milvus

View File

@ -2508,7 +2508,8 @@ MySQLMetaImpl::DropAll() {
}
mysqlpp::Query statement = connectionPtr->query();
statement << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ";";
statement << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ", "
<< ENVIRONMENT_SCHEMA.name() << ", " << FIELDS_SCHEMA.name() << ";";
LOG_ENGINE_DEBUG_ << "DropAll: " << statement.str();

View File

@ -1752,6 +1752,8 @@ SqliteMetaImpl::DropAll() {
try {
ConnectorPtr->drop_table(META_TABLES);
ConnectorPtr->drop_table(META_TABLEFILES);
ConnectorPtr->drop_table(META_ENVIRONMENT);
ConnectorPtr->drop_table(META_FIELDS);
} catch (std::exception& e) {
return HandleException("Encounter exception when drop all meta", e.what());
}

View File

@ -41,14 +41,11 @@ SearchReqStrategy::ReScheduleQueue(const BaseRequestPtr& request, std::queue<Bas
if (last_req->GetRequestType() == BaseRequest::kSearch) {
SearchRequestPtr last_search_req = std::static_pointer_cast<SearchRequest>(last_req);
if (SearchCombineRequest::CanCombine(last_search_req, new_search_req)) {
// pop last request
queue.pop();
// combine request
SearchCombineRequestPtr combine_request = std::make_shared<SearchCombineRequest>();
combine_request->Combine(last_search_req);
combine_request->Combine(new_search_req);
queue.push(combine_request);
queue.back() = combine_request; // replace the last request to combine request
LOG_SERVER_DEBUG_ << "Combine 2 search request";
} else {
// directly put to queue

View File

@ -31,6 +31,7 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db db_main_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/engine db_engine_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/insert db_insert_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/merge db_merge_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/wal db_wal_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/search search_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/query query_files)
@ -143,6 +144,7 @@ set(common_files
${db_engine_files}
${db_insert_files}
${db_meta_files}
${db_merge_files}
${db_wal_files}
${metrics_files}
${thirdparty_files}