diff --git a/core/src/db/Constants.h b/core/src/db/Constants.h index 30d72f359a..bdabe1bce8 100644 --- a/core/src/db/Constants.h +++ b/core/src/db/Constants.h @@ -28,6 +28,7 @@ constexpr int64_t MAX_DIMENSION = 32768; constexpr int32_t MAX_SEGMENT_ROW_COUNT = 4 * 1024 * 1024; constexpr int64_t DEFAULT_SEGMENT_ROW_COUNT = 100000; // default row count per segment when creating collection constexpr int64_t MAX_INSERT_DATA_SIZE = 256 * MB; +constexpr int64_t MAX_WAL_FILE_SIZE = 256 * MB; } // namespace engine } // namespace milvus diff --git a/core/src/db/DB.h b/core/src/db/DB.h index 397c6ce22a..6a25e8e832 100644 --- a/core/src/db/DB.h +++ b/core/src/db/DB.h @@ -94,7 +94,7 @@ class DB { // op_id is for wal machinery, this id will be used in MemManager virtual Status Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, - id_t op_id = 0) = 0; + idx_t op_id = 0) = 0; virtual Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, @@ -103,7 +103,7 @@ class DB { // op_id is for wal machinery, this id will be used in MemManager virtual Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id = 0) = 0; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id = 0) = 0; virtual Status ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) = 0; diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index e95412664c..0ce4c11297 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -413,7 +413,7 @@ DBImpl::DescribeIndex(const std::string& collection_name, const std::string& fie Status DBImpl::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, - id_t op_id) { + idx_t op_id) { CHECK_INITIALIZED; if (data_chunk == nullptr) { @@ -510,7 +510,7 @@ DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_ar } Status -DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) { +DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) { CHECK_INITIALIZED; snapshot::ScopedSnapshotT ss; diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 228dd5eb52..0c33a171d7 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -87,7 +87,7 @@ class DBImpl : public DB, public ConfigObserver { Status Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, - id_t op_id) override; + idx_t op_id) override; Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, @@ -95,7 +95,7 @@ class DBImpl : public DB, public ConfigObserver { DataChunkPtr& data_chunk) override; Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) override; Status Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) override; diff --git a/core/src/db/DBProxy.cpp b/core/src/db/DBProxy.cpp index 179a8fef58..9d76cfb763 100644 --- a/core/src/db/DBProxy.cpp +++ b/core/src/db/DBProxy.cpp @@ -122,7 +122,7 @@ DBProxy::DescribeIndex(const std::string& collection_name, const std::string& fi Status DBProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, - id_t op_id) { + idx_t op_id) { DB_CHECK return db_->Insert(collection_name, partition_name, data_chunk, op_id); } @@ -136,7 +136,7 @@ DBProxy::GetEntityByID(const std::string& collection_name, const IDNumbers& id_a } Status -DBProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) { +DBProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) { DB_CHECK return db_->DeleteEntityByID(collection_name, entity_ids, op_id); } diff --git a/core/src/db/DBProxy.h b/core/src/db/DBProxy.h index ae9d0deede..ca03a4ee08 100644 --- a/core/src/db/DBProxy.h +++ b/core/src/db/DBProxy.h @@ -76,7 +76,7 @@ class DBProxy : public DB { Status Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, - id_t op_id) override; + idx_t op_id) override; Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, @@ -84,7 +84,7 @@ class DBProxy : public DB { DataChunkPtr& data_chunk) override; Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) override; Status ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) override; diff --git a/core/src/db/IDGenerator.cpp b/core/src/db/IDGenerator.cpp index 8849f67afc..4bdb62cfcf 100644 --- a/core/src/db/IDGenerator.cpp +++ b/core/src/db/IDGenerator.cpp @@ -25,7 +25,7 @@ IDGenerator::~IDGenerator() = default; constexpr size_t SimpleIDGenerator::MAX_IDS_PER_MICRO; -id_t +idx_t SimpleIDGenerator::GetNextIDNumber() { auto now = std::chrono::system_clock::now(); auto micros = std::chrono::duration_cast(now.time_since_epoch()).count(); @@ -61,7 +61,7 @@ SimpleIDGenerator::GetNextIDNumbers(size_t n, IDNumbers& ids) { return Status::OK(); } -id_t +idx_t SafeIDGenerator::GetNextIDNumber() { auto now = std::chrono::system_clock::now(); auto micros = std::chrono::duration_cast(now.time_since_epoch()).count(); diff --git a/core/src/db/IDGenerator.h b/core/src/db/IDGenerator.h index c44f27872a..546231adeb 100644 --- a/core/src/db/IDGenerator.h +++ b/core/src/db/IDGenerator.h @@ -23,7 +23,7 @@ namespace engine { class IDGenerator { public: - virtual id_t + virtual idx_t GetNextIDNumber() = 0; virtual Status @@ -36,7 +36,7 @@ class SimpleIDGenerator : public IDGenerator { public: ~SimpleIDGenerator() override = default; - id_t + idx_t GetNextIDNumber() override; Status @@ -60,7 +60,7 @@ class SafeIDGenerator : public IDGenerator { SafeIDGenerator() = default; ~SafeIDGenerator() override = default; - id_t + idx_t GetNextIDNumber() override; Status diff --git a/core/src/db/Types.h b/core/src/db/Types.h index b42366fb09..aac937f3f2 100644 --- a/core/src/db/Types.h +++ b/core/src/db/Types.h @@ -48,11 +48,11 @@ extern const char* DEFAULT_STRUCTURED_INDEX; extern const char* DEFAULT_PARTITON_TAG; /////////////////////////////////////////////////////////////////////////////////////////////////// -using id_t = int64_t; +using idx_t = int64_t; using offset_t = int32_t; using date_t = int32_t; -using IDNumbers = std::vector; +using IDNumbers = std::vector; using VectorDistance = faiss::Index::distance_t; using VectorDistances = std::vector; diff --git a/core/src/db/Utils.cpp b/core/src/db/Utils.cpp index 5459f092fd..ef6ff53595 100644 --- a/core/src/db/Utils.cpp +++ b/core/src/db/Utils.cpp @@ -141,7 +141,7 @@ GetIDFromChunk(const engine::DataChunkPtr& chunk, engine::IDNumbers& ids) { } if (!pair->second->data_.empty()) { - ids.resize(pair->second->data_.size() / sizeof(engine::id_t)); + ids.resize(pair->second->data_.size() / sizeof(engine::idx_t)); memcpy((void*)(ids.data()), pair->second->data_.data(), pair->second->data_.size()); } } diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index 2e8c22a09b..dbdbba2608 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -226,7 +226,7 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { } void -MapAndCopyResult(const knowhere::DatasetPtr& dataset, const std::vector& uids, int64_t nq, int64_t k, +MapAndCopyResult(const knowhere::DatasetPtr& dataset, const std::vector& uids, int64_t nq, int64_t k, float* distances, int64_t* labels) { int64_t* res_ids = dataset->Get(knowhere::meta::IDS); float* res_dist = dataset->Get(knowhere::meta::DISTANCE); @@ -791,7 +791,7 @@ ExecutionEngineImpl::BuildKnowhereIndex(const std::string& field_name, const Col } LOG_ENGINE_DEBUG_ << "Index config: " << conf.dump(); - std::vector uids; + std::vector uids; faiss::ConcurrentBitsetPtr blacklist; if (from_index) { auto dataset = diff --git a/core/src/db/insert/MemCollection.cpp b/core/src/db/insert/MemCollection.cpp index 9231e5e101..94b49f59c7 100644 --- a/core/src/db/insert/MemCollection.cpp +++ b/core/src/db/insert/MemCollection.cpp @@ -75,7 +75,7 @@ MemCollection::Add(int64_t partition_id, const milvus::engine::VectorSourcePtr& } Status -MemCollection::Delete(const std::vector& ids) { +MemCollection::Delete(const std::vector& ids) { // Locate which collection file the doc id lands in { std::lock_guard lock(mutex_); @@ -182,7 +182,7 @@ MemCollection::ApplyDeletes() { std::make_shared(options_.meta_.path_, seg_visitor); // Step 1: Check delete_id in mem - std::vector delete_ids; + std::vector delete_ids; { segment::IdBloomFilterPtr pre_bloom_filter; STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter)); @@ -197,11 +197,11 @@ MemCollection::ApplyDeletes() { } } - std::vector uids; + std::vector uids; STATUS_CHECK(segment_reader->LoadUids(uids)); std::sort(delete_ids.begin(), delete_ids.end()); - std::set ids_to_check(delete_ids.begin(), delete_ids.end()); + std::set ids_to_check(delete_ids.begin(), delete_ids.end()); // Step 2: Mark previous deleted docs file and bloom filter file stale auto& field_visitors_map = seg_visitor->GetFieldVisitors(); diff --git a/core/src/db/insert/MemCollection.h b/core/src/db/insert/MemCollection.h index d253f1ac83..98680ab06a 100644 --- a/core/src/db/insert/MemCollection.h +++ b/core/src/db/insert/MemCollection.h @@ -40,7 +40,7 @@ class MemCollection { Add(int64_t partition_id, const VectorSourcePtr& source); Status - Delete(const std::vector& ids); + Delete(const std::vector& ids); Status EraseMem(int64_t partition_id); @@ -73,7 +73,7 @@ class MemCollection { std::mutex mutex_; - std::set doc_ids_to_delete_; + std::set doc_ids_to_delete_; std::atomic lsn_; }; // SSMemCollection diff --git a/core/src/db/insert/MemManager.h b/core/src/db/insert/MemManager.h index c0a6c3c08f..606d5afdb2 100644 --- a/core/src/db/insert/MemManager.h +++ b/core/src/db/insert/MemManager.h @@ -30,7 +30,7 @@ class MemManager { InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, uint64_t lsn) = 0; virtual Status - DeleteEntities(int64_t collection_id, const std::vector& entity_ids, uint64_t lsn) = 0; + DeleteEntities(int64_t collection_id, const std::vector& entity_ids, uint64_t lsn) = 0; virtual Status Flush(int64_t collection_id) = 0; diff --git a/core/src/db/insert/MemManagerImpl.cpp b/core/src/db/insert/MemManagerImpl.cpp index 3689b4e577..fe0cf083a2 100644 --- a/core/src/db/insert/MemManagerImpl.cpp +++ b/core/src/db/insert/MemManagerImpl.cpp @@ -150,7 +150,7 @@ MemManagerImpl::InsertEntitiesNoLock(int64_t collection_id, int64_t partition_id } Status -MemManagerImpl::DeleteEntities(int64_t collection_id, const std::vector& entity_ids, uint64_t lsn) { +MemManagerImpl::DeleteEntities(int64_t collection_id, const std::vector& entity_ids, uint64_t lsn) { std::unique_lock lock(mutex_); MemCollectionPtr mem = GetMemByCollection(collection_id); diff --git a/core/src/db/insert/MemManagerImpl.h b/core/src/db/insert/MemManagerImpl.h index 905d4273c4..0106e70c92 100644 --- a/core/src/db/insert/MemManagerImpl.h +++ b/core/src/db/insert/MemManagerImpl.h @@ -42,7 +42,7 @@ class MemManagerImpl : public MemManager { InsertEntities(int64_t collection_id, int64_t partition_id, const DataChunkPtr& chunk, uint64_t lsn) override; Status - DeleteEntities(int64_t collection_id, const std::vector& entity_ids, uint64_t lsn) override; + DeleteEntities(int64_t collection_id, const std::vector& entity_ids, uint64_t lsn) override; Status Flush(int64_t collection_id) override; diff --git a/core/src/db/insert/MemSegment.cpp b/core/src/db/insert/MemSegment.cpp index 2160195dbc..7879342986 100644 --- a/core/src/db/insert/MemSegment.cpp +++ b/core/src/db/insert/MemSegment.cpp @@ -197,12 +197,12 @@ MemSegment::Add(const VectorSourcePtr& source) { } Status -MemSegment::Delete(const std::vector& ids) { +MemSegment::Delete(const std::vector& ids) { engine::SegmentPtr segment_ptr; segment_writer_ptr_->GetSegment(segment_ptr); // Check wither the doc_id is present, if yes, delete it's corresponding buffer - std::vector uids; + std::vector uids; segment_writer_ptr_->LoadUids(uids); std::vector offsets; diff --git a/core/src/db/insert/MemSegment.h b/core/src/db/insert/MemSegment.h index ea5c074d56..1229a0d4e5 100644 --- a/core/src/db/insert/MemSegment.h +++ b/core/src/db/insert/MemSegment.h @@ -39,7 +39,7 @@ class MemSegment { Add(const VectorSourcePtr& source); Status - Delete(const std::vector& ids); + Delete(const std::vector& ids); int64_t GetCurrentMem(); diff --git a/core/src/db/transcript/TranscriptProxy.cpp b/core/src/db/transcript/TranscriptProxy.cpp index 96869496aa..9c0e2d6efd 100644 --- a/core/src/db/transcript/TranscriptProxy.cpp +++ b/core/src/db/transcript/TranscriptProxy.cpp @@ -137,7 +137,7 @@ TranscriptProxy::DescribeIndex(const std::string& collection_name, const std::st Status TranscriptProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, - id_t op_id) { + idx_t op_id) { return db_->Insert(collection_name, partition_name, data_chunk); } @@ -149,7 +149,8 @@ TranscriptProxy::GetEntityByID(const std::string& collection_name, const IDNumbe } Status -TranscriptProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) { +TranscriptProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, + idx_t op_id) { return db_->DeleteEntityByID(collection_name, entity_ids); } diff --git a/core/src/db/transcript/TranscriptProxy.h b/core/src/db/transcript/TranscriptProxy.h index 19e6743abe..1611f9ead9 100644 --- a/core/src/db/transcript/TranscriptProxy.h +++ b/core/src/db/transcript/TranscriptProxy.h @@ -76,7 +76,7 @@ class TranscriptProxy : public DBProxy { Status Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, - id_t op_id) override; + idx_t op_id) override; Status GetEntityByID(const std::string& collection_name, const IDNumbers& id_array, @@ -84,7 +84,7 @@ class TranscriptProxy : public DBProxy { DataChunkPtr& data_chunk) override; Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, idx_t op_id) override; Status ListIDInSegment(const std::string& collection_name, int64_t segment_id, IDNumbers& entity_ids) override; diff --git a/core/src/db/wal/WalFile.cpp b/core/src/db/wal/WalFile.cpp new file mode 100644 index 0000000000..08e0e58b71 --- /dev/null +++ b/core/src/db/wal/WalFile.cpp @@ -0,0 +1,96 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "db/wal/WalFile.h" +#include "db/Constants.h" +#include "db/Types.h" + +#include + +namespace milvus { +namespace engine { + +WalFile::WalFile() { +} + +WalFile::~WalFile() { + CloseFile(); +} + +Status +WalFile::OpenFile(const std::string& path, OpenMode mode) { + CloseFile(); + + try { + std::string str_mode = (mode == OpenMode::READ) ? "rb" : "awb"; + file_ = fopen(path.c_str(), str_mode.c_str()); + if (file_ == nullptr) { + std::string msg = "Failed to create wal file: " + path; + return Status(DB_ERROR, msg); + } + file_path_ = path; + mode_ = mode; + } catch (std::exception& ex) { + std::string msg = "Failed to create wal file, reason: " + std::string(ex.what()); + return Status(DB_ERROR, msg); + } + + return Status::OK(); +} + +Status +WalFile::CloseFile() { + if (file_ != nullptr) { + fclose(file_); + file_ = nullptr; + file_size_ = 0; + file_path_ = ""; + } + + return Status::OK(); +} + +bool +WalFile::ExceedMaxSize(int64_t append_size) { + return (file_size_ + append_size) > MAX_WAL_FILE_SIZE; +} + +Status +WalFile::ReadLastOpId(idx_t& op_id) { + op_id = std::numeric_limits::max(); + if (file_ == nullptr || mode_ != OpenMode::READ) { + return Status(DB_ERROR, "File not opened or not read mode"); + } + + // current position + auto cur_poz = ftell(file_); + + // get total lenth + fseek(file_, 0, SEEK_END); + auto end_poz = ftell(file_); + + // read last id + idx_t last_id = 0; + int64_t offset = end_poz - sizeof(last_id); + fseek(file_, offset, SEEK_SET); + + int64_t bytes = fread(&last_id, 1, sizeof(last_id), file_); + if (bytes == sizeof(op_id)) { + op_id = last_id; + } + + // back to current postiion + fseek(file_, cur_poz, SEEK_SET); + return Status::OK(); +} + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/wal/WalFile.h b/core/src/db/wal/WalFile.h new file mode 100644 index 0000000000..0de4e07790 --- /dev/null +++ b/core/src/db/wal/WalFile.h @@ -0,0 +1,144 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#pragma once + +#include "db/Types.h" +#include "utils/Status.h" + +#include +#include +#include +#include + +namespace milvus { +namespace engine { + +class WalFile { + public: + WalFile(); + ~WalFile(); + + bool + IsOpened() const { + return file_ != nullptr; + } + + enum OpenMode { + NA = 0, + READ = 1, + OVER_WRITE = 2, + APPEND_WRITE = 3, + }; + Status + OpenFile(const std::string& path, OpenMode mode); + + Status + CloseFile(); + + bool + ExceedMaxSize(int64_t append_size); + + template + inline int64_t + Write(T* value) { + if (file_ == nullptr) { + return 0; + } + + int64_t bytes = fwrite(value, 1, sizeof(T), file_); + file_size_ += bytes; + return bytes; + } + + inline int64_t + Write(void* data, int64_t length) { + if (file_ == nullptr) { + return 0; + } + + int64_t bytes = fwrite(data, 1, length, file_); + file_size_ += bytes; + return bytes; + } + + template + inline int64_t + Read(T* value) { + if (file_ == nullptr) { + return 0; + } + + int64_t bytes = fread(value, 1, sizeof(T), file_); + return bytes; + } + + inline int64_t + Read(void* data, int64_t length) { + if (file_ == nullptr) { + return 0; + } + + int64_t bytes = fread(data, 1, length, file_); + return bytes; + } + + inline int64_t + ReadStr(std::string& str, int64_t length) { + if (file_ == nullptr || length <= 0) { + return 0; + } + + char* buf = new char[length + 1]; + int64_t bytes = fread(buf, 1, length, file_); + buf[length] = '\0'; + str = buf; + return bytes; + } + + inline void + Flush() { + if (file_ && mode_ != OpenMode::READ) { + fflush(file_); + } + } + + int64_t + Size() const { + return file_size_; + } + std::string + Path() const { + return file_path_; + } + + Status + ReadLastOpId(idx_t& op_id); + + void inline SeekForward(int64_t offset) { + if (file_ == nullptr || mode_ != OpenMode::READ) { + return; + } + + fseek(file_, offset, SEEK_CUR); + } + + private: + FILE* file_ = nullptr; + OpenMode mode_ = OpenMode::NA; + int64_t file_size_ = 0; + std::string file_path_; +}; + +using WalFilePtr = std::shared_ptr; + +} // namespace engine +} // namespace milvus diff --git a/core/src/db/wal/WalManager.cpp b/core/src/db/wal/WalManager.cpp index c3c91264e0..86003c6076 100644 --- a/core/src/db/wal/WalManager.cpp +++ b/core/src/db/wal/WalManager.cpp @@ -10,16 +10,23 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/wal/WalManager.h" -#include "config/ServerConfig.h" #include "db/Utils.h" +#include "db/wal/WalOperationCodec.h" +#include "utils/CommonUtil.h" + +#include +#include +#include +#include + +#include namespace milvus { namespace engine { -WalManager::WalManager() { - wal_path_ = config.wal.path(); - wal_buffer_size_ = config.wal.buffer_size(); - insert_buffer_size_ = config.cache.insert_buffer_size(); +const char* MAX_OP_ID_FILE_NAME = "max_op"; + +WalManager::WalManager() : cleanup_thread_pool_(1, 1) { } WalManager& @@ -28,6 +35,32 @@ WalManager::GetInstance() { return s_mgr; } +Status +WalManager::Start(const DBOptions& options) { + enable_ = options.wal_enable_; + wal_path_ = options.meta_.path_; + insert_buffer_size_ = options.insert_buffer_size_; + + CommonUtil::CreateDirectory(wal_path_); + + auto status = ReadMaxOpId(); + if (!status.ok()) { + return status; + } + + return Status::OK(); +} + +Status +WalManager::Stop() { + std::lock_guard lck(cleanup_thread_mutex_); + for (auto& iter : cleanup_thread_results_) { + iter.wait(); + } + + return Status::OK(); +} + Status WalManager::RecordOperation(const WalOperationPtr& operation, const DBPtr& db) { if (operation == nullptr) { @@ -50,6 +83,128 @@ WalManager::RecordOperation(const WalOperationPtr& operation, const DBPtr& db) { break; } + if (!status.ok()) { + LOG_ENGINE_DEBUG_ << "Failed to record wal opertiaon: " << status.message(); + } + + return status; +} + +Status +WalManager::OperationDone(const std::string& collection_name, idx_t op_id) { + if (!enable_) { + return Status::OK(); + } + + bool start_clecnup = false; + { + // record max operation id for each collection + std::lock_guard lock(max_op_mutex_); + idx_t last_id = max_op_id_map_[collection_name]; + if (op_id > last_id) { + max_op_id_map_[collection_name] = op_id; + start_clecnup = true; + + // write max op id to disk + std::string path = ConstructFilePath(collection_name, MAX_OP_ID_FILE_NAME); + WalFile file; + file.OpenFile(path, WalFile::OVER_WRITE); + file.Write(&op_id); + } + } + + if (start_clecnup) { + StartCleanupThread(collection_name); + } + + return Status::OK(); +} + +Status +WalManager::Recovery(const DBPtr& db) { + using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; + DirectoryIterator iter_outer(wal_path_); + DirectoryIterator end_outer; + for (; iter_outer != end_outer; ++iter_outer) { + auto path_outer = (*iter_outer).path(); + if (!std::experimental::filesystem::is_directory(path_outer)) { + continue; + } + + std::string collection_name = path_outer.filename().c_str(); + + // iterate files + std::map id_files; + DirectoryIterator iter_inner(path_outer); + DirectoryIterator end_inner; + for (; iter_inner != end_inner; ++iter_inner) { + auto path_inner = (*iter_inner).path(); + std::string file_name = path_inner.filename().c_str(); + if (file_name == MAX_OP_ID_FILE_NAME) { + continue; + } + idx_t op_id = std::stol(file_name); + id_files.insert(std::make_pair(op_id, path_inner)); + } + + // the max operation id + idx_t max_op_id = std::numeric_limits::max(); + { + std::lock_guard lock(max_op_mutex_); + if (max_op_id_map_.find(collection_name) != max_op_id_map_.end()) { + max_op_id = max_op_id_map_[collection_name]; + } + } + + // id_files arrange id in assendent, we know which file should be read + for (auto& pair : id_files) { + WalFilePtr file = std::make_shared(); + file->OpenFile(pair.second.c_str(), WalFile::READ); + idx_t last_id = 0; + file->ReadLastOpId(last_id); + if (last_id <= max_op_id) { + file->CloseFile(); + std::experimental::filesystem::remove(pair.second); + continue; // skip and delete this file since all its operations already done + } + + Status status = Status::OK(); + while (status.ok()) { + WalOperationPtr operation; + operation->collection_name_ = collection_name; + status = WalOperationCodec::IterateOperation(file, operation, max_op_id); + PerformOperation(operation, db); + } + } + } + + return Status::OK(); +} + +Status +WalManager::ReadMaxOpId() { + using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; + DirectoryIterator iter(wal_path_); + DirectoryIterator end; + for (; iter != end; ++iter) { + auto path = (*iter).path(); + if (std::experimental::filesystem::is_directory(path)) { + std::string collection_name = path.filename().c_str(); + path.append(MAX_OP_ID_FILE_NAME); + if (!std::experimental::filesystem::is_regular_file(path)) { + continue; // ignore? + } + + WalFile file; + file.OpenFile(path.c_str(), WalFile::READ); + idx_t max_op = 0; + file.Read(&max_op); + + std::lock_guard lock(max_op_mutex_); + max_op_id_map_.insert(std::make_pair(collection_name, max_op)); + } + } + return Status::OK(); } @@ -58,28 +213,209 @@ WalManager::RecordInsertOperation(const InsertEntityOperationPtr& operation, con std::vector chunks; SplitChunk(operation->data_chunk_, chunks); - return Status::OK(); -} + IDNumbers op_ids; + auto status = id_gen_.GetNextIDNumbers(chunks.size(), op_ids); + if (!status.ok()) { + return status; + } -Status -WalManager::SplitChunk(const DataChunkPtr& chunk, std::vector& chunks) { - int64_t chunk_size = utils::GetSizeOfChunk(chunk); - if (chunk_size > insert_buffer_size_) { - } else { - chunks.push_back(chunk); + for (size_t i = 0; i < chunks.size(); ++i) { + idx_t op_id = op_ids[i]; + DataChunkPtr& chunk = chunks[i]; + int64_t chunk_size = utils::GetSizeOfChunk(chunk); + + { + // open wal file + std::string path = ConstructFilePath(operation->collection_name_, std::to_string(op_id)); + std::lock_guard lock(file_map_mutex_); + WalFilePtr file = file_map_[operation->collection_name_]; + if (file == nullptr) { + file = std::make_shared(); + file_map_[operation->collection_name_] = file; + file->OpenFile(path, WalFile::APPEND_WRITE); + } else if (!file->IsOpened() || file->ExceedMaxSize(chunk_size)) { + file->OpenFile(path, WalFile::APPEND_WRITE); + } + + // write to wal file + status = WalOperationCodec::WriteInsertOperation(file, operation->partition_name, chunk, op_id); + if (!status.ok()) { + return status; + } + } + + // insert action to db + status = db->Insert(operation->collection_name_, operation->partition_name, operation->data_chunk_, op_id); + if (!status.ok()) { + return status; + } } return Status::OK(); } Status -WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, const DBPtr& db) { +WalManager::SplitChunk(const DataChunkPtr& chunk, std::vector& chunks) { + // int64_t chunk_size = utils::GetSizeOfChunk(chunk); + // if (chunk_size > insert_buffer_size_) { + // int64_t batch = chunk_size / insert_buffer_size_; + // int64_t batch_count = chunk->count_ / batch; + // for (int64_t i = 0; i <= batch; ++i) { + // } + // } else { + // chunks.push_back(chunk); + // } + chunks.push_back(chunk); + return Status::OK(); } Status -WalManager::OperationDone(id_t op_id) { - return Status::OK(); +WalManager::RecordDeleteOperation(const DeleteEntityOperationPtr& operation, const DBPtr& db) { + idx_t op_id = id_gen_.GetNextIDNumber(); + int64_t append_size = operation->entity_ids_.size() * sizeof(idx_t); + + { + // open wal file + std::string path = ConstructFilePath(operation->collection_name_, std::to_string(op_id)); + std::lock_guard lock(file_map_mutex_); + WalFilePtr file = file_map_[operation->collection_name_]; + if (file == nullptr) { + file = std::make_shared(); + file_map_[operation->collection_name_] = file; + file->OpenFile(path, WalFile::APPEND_WRITE); + } else if (!file->IsOpened() || file->ExceedMaxSize(append_size)) { + file->OpenFile(path, WalFile::APPEND_WRITE); + } + + // write to wal file + auto status = WalOperationCodec::WriteDeleteOperation(file, operation->entity_ids_, op_id); + if (!status.ok()) { + return status; + } + } + + // delete action to db + return db->DeleteEntityByID(operation->collection_name_, operation->entity_ids_, op_id); +} + +std::string +WalManager::ConstructFilePath(const std::string& collection_name, const std::string& file_name) { + std::experimental::filesystem::path full_path(wal_path_); + std::experimental::filesystem::create_directory(full_path); + full_path.append(collection_name); + std::experimental::filesystem::create_directory(full_path); + full_path.append(file_name); + + std::string path(full_path.c_str()); + return path; +} + +void +WalManager::StartCleanupThread(const std::string& collection_name) { + // the previous thread finished? + std::lock_guard lck(cleanup_thread_mutex_); + if (cleanup_thread_results_.empty()) { + // start a new cleanup thread + cleanup_thread_results_.push_back( + cleanup_thread_pool_.enqueue(&WalManager::CleanupThread, this, collection_name)); + } else { + std::chrono::milliseconds span(1); + if (cleanup_thread_results_.back().wait_for(span) == std::future_status::ready) { + cleanup_thread_results_.pop_back(); + + // start a new cleanup thread + cleanup_thread_results_.push_back( + cleanup_thread_pool_.enqueue(&WalManager::CleanupThread, this, collection_name)); + } + } +} + +void +WalManager::CleanupThread(std::string collection_name) { + SetThreadName("wal_clean"); + + using DirectoryIterator = std::experimental::filesystem::recursive_directory_iterator; + DirectoryIterator iter_outer(wal_path_); + DirectoryIterator end_outer; + for (; iter_outer != end_outer; ++iter_outer) { + auto path_outer = (*iter_outer).path(); + if (!std::experimental::filesystem::is_directory(path_outer)) { + continue; + } + + // get max operation id + std::string file_name = path_outer.filename().c_str(); + if (file_name != collection_name) { + continue; + } + + idx_t max_op = std::numeric_limits::max(); + { + std::lock_guard lock(max_op_mutex_); + if (max_op_id_map_.find(collection_name) != max_op_id_map_.end()) { + max_op = max_op_id_map_[collection_name]; + } + } + + // iterate files + std::map id_files; + DirectoryIterator iter_inner(path_outer); + DirectoryIterator end_inner; + for (; iter_inner != end_inner; ++iter_inner) { + auto path_inner = (*iter_inner).path(); + std::string file_name = path_inner.filename().c_str(); + if (file_name == MAX_OP_ID_FILE_NAME) { + continue; + } + idx_t op_id = std::stol(file_name); + id_files.insert(std::make_pair(op_id, path_inner)); + } + + if (id_files.empty()) { + continue; + } + + // remove unused files + // the std::map arrange id in assendent, direct delete files except the last one + idx_t max_id = id_files.rbegin()->first; + std::experimental::filesystem::path max_file = id_files.rbegin()->second; + id_files.erase(max_id); + for (auto& pair : id_files) { + std::experimental::filesystem::remove(pair.second); + } + + // the last wal file need to be deleted? + WalFile file; + file.OpenFile(max_file.c_str(), WalFile::READ); + idx_t last_id = 0; + file.ReadLastOpId(last_id); + if (last_id <= max_op) { + file.CloseFile(); + std::experimental::filesystem::remove(max_file); + } + } +} + +Status +WalManager::PerformOperation(const WalOperationPtr& operation, const DBPtr& db) { + Status status; + switch (operation->Type()) { + case WalOperationType::INSERT_ENTITY: { + InsertEntityOperationPtr op = std::static_pointer_cast(operation); + status = db->Insert(op->collection_name_, op->partition_name, op->data_chunk_, op->ID()); + break; + } + case WalOperationType::DELETE_ENTITY: { + DeleteEntityOperationPtr op = std::static_pointer_cast(operation); + status = db->DeleteEntityByID(op->collection_name_, op->entity_ids_, op->ID()); + break; + } + default: + return Status(DB_ERROR, "Unsupportted wal operation"); + } + + return status; } } // namespace engine diff --git a/core/src/db/wal/WalManager.h b/core/src/db/wal/WalManager.h index a1dfe91d15..4b08652e8f 100644 --- a/core/src/db/wal/WalManager.h +++ b/core/src/db/wal/WalManager.h @@ -14,10 +14,15 @@ #include "db/DB.h" #include "db/IDGenerator.h" #include "db/Types.h" +#include "db/wal/WalFile.h" #include "db/wal/WalOperation.h" #include "utils/Status.h" +#include "utils/ThreadPool.h" +#include +#include #include +#include #include namespace milvus { @@ -30,18 +35,25 @@ class WalManager { static WalManager& GetInstance(); - void - SetWalPath(const std::string& path) { - wal_path_ = path; - } + Status + Start(const DBOptions& options); + + Status + Stop(); Status RecordOperation(const WalOperationPtr& operation, const DBPtr& db); Status - OperationDone(id_t op_id); + OperationDone(const std::string& collection_name, idx_t op_id); + + Status + Recovery(const DBPtr& db); private: + Status + ReadMaxOpId(); + Status RecordInsertOperation(const InsertEntityOperationPtr& operation, const DBPtr& db); @@ -51,12 +63,36 @@ class WalManager { Status SplitChunk(const DataChunkPtr& chunk, std::vector& chunks); + std::string + ConstructFilePath(const std::string& collection_name, const std::string& file_name); + + void + StartCleanupThread(const std::string& collection_name); + + void + CleanupThread(std::string collection_name); + + Status + PerformOperation(const WalOperationPtr& operation, const DBPtr& db); + private: SafeIDGenerator id_gen_; + bool enable_ = false; std::string wal_path_; - int64_t wal_buffer_size_ = 0; int64_t insert_buffer_size_ = 0; + + using WalFileMap = std::unordered_map; + WalFileMap file_map_; // mapping collection name to file + std::mutex file_map_mutex_; + + using MaxOpIdMap = std::unordered_map; + MaxOpIdMap max_op_id_map_; // mapping collection name to max operation id + std::mutex max_op_mutex_; + + ThreadPool cleanup_thread_pool_; + std::mutex cleanup_thread_mutex_; + std::list> cleanup_thread_results_; }; } // namespace engine diff --git a/core/src/db/wal/WalOperation.h b/core/src/db/wal/WalOperation.h index f34e5af846..b4fc10733d 100644 --- a/core/src/db/wal/WalOperation.h +++ b/core/src/db/wal/WalOperation.h @@ -32,10 +32,10 @@ class WalOperation { explicit WalOperation(WalOperationType type); void - SetID(id_t id) { + SetID(idx_t id) { id_ = id; } - id_t + idx_t ID() const { return id_; } @@ -46,8 +46,11 @@ class WalOperation { } protected: - id_t id_ = 0; + idx_t id_ = 0; WalOperationType type_ = WalOperationType::INVALID; + + public: + std::string collection_name_; }; using WalOperationPtr = std::shared_ptr; @@ -58,7 +61,6 @@ class InsertEntityOperation : public WalOperation { InsertEntityOperation(); public: - std::string collection_name_; std::string partition_name; DataChunkPtr data_chunk_; }; @@ -71,8 +73,7 @@ class DeleteEntityOperation : public WalOperation { DeleteEntityOperation(); public: - std::string collection_name_; - engine::IDNumbers entity_ids_; + IDNumbers entity_ids_; }; using DeleteEntityOperationPtr = std::shared_ptr; diff --git a/core/src/db/wal/WalOperationCodec.cpp b/core/src/db/wal/WalOperationCodec.cpp index cd37b47c8e..f3e530a422 100644 --- a/core/src/db/wal/WalOperationCodec.cpp +++ b/core/src/db/wal/WalOperationCodec.cpp @@ -10,17 +10,269 @@ // or implied. See the License for the specific language governing permissions and limitations under the License. #include "db/wal/WalOperationCodec.h" +#include "utils/Log.h" + +#include +#include namespace milvus { namespace engine { Status -WalOperationCodec::SerializeOperation(const std::string& path, const InsertEntityOperationPtr& operation) { +WalOperationCodec::WriteInsertOperation(const WalFilePtr& file, const std::string& partition_name, + const DataChunkPtr& chunk, idx_t op_id) { + if (file == nullptr || !file->IsOpened() || chunk == nullptr) { + return Status(DB_ERROR, "Invalid input for write insert operation"); + } + + try { + // calculate total bytes, it must equal to total_bytes + int64_t calculate_total_bytes = 0; + calculate_total_bytes += sizeof(int32_t); // operation type + calculate_total_bytes += sizeof(idx_t); // operation id + calculate_total_bytes += sizeof(int64_t); // calculated total bytes + calculate_total_bytes += sizeof(int32_t); // partition name length + calculate_total_bytes += partition_name.size(); // partition name + calculate_total_bytes += sizeof(int32_t); // fixed field count + for (auto& pair : chunk->fixed_fields_) { + calculate_total_bytes += sizeof(int32_t); // field name length + calculate_total_bytes += pair.first.size(); // field name + + calculate_total_bytes += sizeof(int64_t); // data size + calculate_total_bytes += pair.second->data_.size(); // data + } + calculate_total_bytes += sizeof(idx_t); // operation id again + + int64_t total_bytes = 0; + // write operation type + int32_t type = WalOperationType::INSERT_ENTITY; + total_bytes += file->Write(&type); + + // write operation id + total_bytes += file->Write(&op_id); + + // write calculated total bytes + total_bytes += file->Write(&calculate_total_bytes); + + // write partition name + int32_t part_name_length = partition_name.size(); + total_bytes += file->Write(&part_name_length); + if (part_name_length > 0) { + total_bytes += file->Write((void*)partition_name.data(), part_name_length); + } + + // write fixed data + int32_t field_count = chunk->fixed_fields_.size(); + total_bytes += file->Write(&field_count); + for (auto& pair : chunk->fixed_fields_) { + if (pair.second == nullptr) { + continue; + } + + int32_t field_name_length = pair.first.size(); + total_bytes += file->Write(&field_name_length); + total_bytes += file->Write((void*)pair.first.data(), field_name_length); + + int64_t data_size = pair.second->data_.size(); + total_bytes += file->Write(&data_size); + total_bytes += file->Write((void*)pair.second->data_.data(), data_size); + } + + // TODO: write variable data + + // write operation id again + // Note: makesure operation id is written at end, so that wal cleanup thread know which file can be deleted + total_bytes += file->Write(&op_id); + + // flush to system buffer + file->Flush(); + + if (total_bytes != calculate_total_bytes) { + LOG_ENGINE_ERROR_ << "wal serialize(insert) bytes " << total_bytes << " not equal " + << calculate_total_bytes; + } else { + LOG_ENGINE_DEBUG_ << "Wal serialize(insert) " << total_bytes << " bytes"; + } + } catch (std::exception& ex) { + std::string msg = "Failed to write insert operation, reason: " + std::string(ex.what()); + return Status(DB_ERROR, msg); + } + return Status::OK(); } Status -WalOperationCodec::SerializeOperation(const std::string& path, const DeleteEntityOperationPtr& operation) { +WalOperationCodec::WriteDeleteOperation(const WalFilePtr& file, const IDNumbers& entity_ids, idx_t op_id) { + if (file == nullptr || !file->IsOpened() || entity_ids.empty()) { + return Status(DB_ERROR, "Invalid input for write delete operation"); + } + + try { + // calculate total bytes, it must equal to total_bytes + int64_t calculate_total_bytes = 0; + calculate_total_bytes += sizeof(int32_t); // operation type + calculate_total_bytes += sizeof(idx_t); // operation id + calculate_total_bytes += sizeof(int64_t); // calculated total bytes + calculate_total_bytes += sizeof(int64_t); // id count + calculate_total_bytes += entity_ids.size() * sizeof(idx_t); // ids + calculate_total_bytes += sizeof(idx_t); // operation id again + + int64_t total_bytes = 0; + // write operation type + int32_t type = WalOperationType::DELETE_ENTITY; + total_bytes += file->Write(&type); + + // write operation id + total_bytes += file->Write(&op_id); + + // write calculated total bytes + total_bytes += file->Write(&calculate_total_bytes); + + // write entity ids + int64_t id_count = entity_ids.size(); + total_bytes += file->Write(&id_count); + + total_bytes += file->Write((void*)entity_ids.data(), id_count * sizeof(idx_t)); + + // write operation id again + // Note: makesure operation id is written at end, so that wal cleanup thread know which file can be deleted + total_bytes += file->Write(&op_id); + + // flush to system buffer + file->Flush(); + + if (total_bytes != calculate_total_bytes) { + LOG_ENGINE_ERROR_ << "wal serialize(delete) bytes " << total_bytes << " not equal " + << calculate_total_bytes; + } else { + LOG_ENGINE_DEBUG_ << "Wal serialize(delete) " << total_bytes << " bytes"; + } + } catch (std::exception& ex) { + std::string msg = "Failed to write insert operation, reason: " + std::string(ex.what()); + return Status(DB_ERROR, msg); + } + + return Status::OK(); +} + +Status +WalOperationCodec::IterateOperation(const WalFilePtr& file, WalOperationPtr& operation, idx_t from_op_id) { + if (file == nullptr || !file->IsOpened()) { + return Status(DB_ERROR, "Invalid input iterate wal operation"); + } + + // read operation type + int32_t type = WalOperationType::INVALID; + int64_t read_bytes = file->Read(&type); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + // read operation id + idx_t op_id = 0; + read_bytes = file->Read(&op_id); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + // read total bytes + int64_t total_bytes = 0; + read_bytes = file->Read(&total_bytes); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + // if the operation id is less/equal than from_op_id, skip this operation + if (op_id <= from_op_id) { + int64_t offset = total_bytes - sizeof(int32_t) - sizeof(idx_t) - sizeof(int64_t); + file->SeekForward(offset); + return Status::OK(); + } + + if (type == WalOperationType::INSERT_ENTITY) { + // read partition name + int32_t part_name_length = 0; + read_bytes = file->Read(&part_name_length); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + std::string partition_name; + read_bytes = file->ReadStr(partition_name, part_name_length); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + // read fixed data + int32_t field_count = 0; + read_bytes = file->Read(&field_count); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + DataChunkPtr chunk = std::make_shared(); + for (int32_t i = 0; i < field_count; i++) { + int32_t field_name_length = 0; + read_bytes = file->Read(&field_name_length); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + // field name + std::string field_name; + read_bytes = file->ReadStr(field_name, field_name_length); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + // binary data + int64_t data_size = 0; + read_bytes = file->Read(&data_size); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + BinaryDataPtr data = std::make_shared(); + data->data_.resize(data_size); + read_bytes = file->Read(data->data_.data(), data_size); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + chunk->fixed_fields_.insert(std::make_pair(field_name, data)); + } + + InsertEntityOperationPtr insert_op = std::make_shared(); + insert_op->partition_name = partition_name; + insert_op->data_chunk_ = chunk; + operation = insert_op; + } else if (type == WalOperationType::DELETE_ENTITY) { + // read entity ids + int64_t id_count = 0; + read_bytes = file->Read(&id_count); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + IDNumbers ids; + ids.resize(id_count); + read_bytes = file->Read(ids.data(), id_count * sizeof(idx_t)); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + DeleteEntityOperationPtr delete_op = std::make_shared(); + delete_op->entity_ids_.swap(ids); + operation = delete_op; + } + + read_bytes = file->Read(&op_id); + if (read_bytes <= 0) { + return Status(DB_ERROR, "End of file"); + } + + operation->SetID(op_id); + return Status::OK(); } diff --git a/core/src/db/wal/WalOperationCodec.h b/core/src/db/wal/WalOperationCodec.h index a9d6bd5a1c..89b01a8570 100644 --- a/core/src/db/wal/WalOperationCodec.h +++ b/core/src/db/wal/WalOperationCodec.h @@ -13,6 +13,7 @@ #include +#include "db/wal/WalFile.h" #include "db/wal/WalOperation.h" #include "utils/Status.h" @@ -22,10 +23,14 @@ namespace engine { class WalOperationCodec { public: static Status - SerializeOperation(const std::string& path, const InsertEntityOperationPtr& operation); + WriteInsertOperation(const WalFilePtr& file, const std::string& partition_name, const DataChunkPtr& chunk, + idx_t op_id); static Status - SerializeOperation(const std::string& path, const DeleteEntityOperationPtr& operation); + WriteDeleteOperation(const WalFilePtr& file, const IDNumbers& entity_ids, idx_t op_id); + + static Status + IterateOperation(const WalFilePtr& file, WalOperationPtr& operation, idx_t from_op_id); }; } // namespace engine diff --git a/core/src/db/wal/WalProxy.cpp b/core/src/db/wal/WalProxy.cpp index 7dda24e846..9171944eb9 100644 --- a/core/src/db/wal/WalProxy.cpp +++ b/core/src/db/wal/WalProxy.cpp @@ -25,9 +25,36 @@ WalProxy::WalProxy(const DBPtr& db, const DBOptions& options) : DBProxy(db, opti } } +Status +WalProxy::Start() { + // let service start + auto status = db_->Start(); + if (!status.ok()) { + return status; + } + + if (options_.wal_enable_) { + WalManager::GetInstance().Start(options_); + WalManager::GetInstance().Recovery(db_); + } + + return status; +} + +Status +WalProxy::Stop() { + auto status = db_->Stop(); + + if (options_.wal_enable_) { + WalManager::GetInstance().Stop(); + } + + return status; +} + Status WalProxy::Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, - id_t op_id) { + idx_t op_id) { // write operation into disk InsertEntityOperationPtr op = std::make_shared(); op->collection_name_ = collection_name; @@ -38,7 +65,7 @@ WalProxy::Insert(const std::string& collection_name, const std::string& partitio } Status -WalProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) { +WalProxy::DeleteEntityByID(const std::string& collection_name, const IDNumbers& entity_ids, idx_t op_id) { // write operation into disk DeleteEntityOperationPtr op = std::make_shared(); op->collection_name_ = collection_name; @@ -47,17 +74,5 @@ WalProxy::DeleteEntityByID(const std::string& collection_name, const engine::IDN return WalManager::GetInstance().RecordOperation(op, db_); } -Status -WalProxy::Flush(const std::string& collection_name) { - auto status = db_->Flush(collection_name); - return status; -} - -Status -WalProxy::Flush() { - auto status = db_->Flush(); - return status; -} - } // namespace engine } // namespace milvus diff --git a/core/src/db/wal/WalProxy.h b/core/src/db/wal/WalProxy.h index 6fe9c31a4d..d0d576f009 100644 --- a/core/src/db/wal/WalProxy.h +++ b/core/src/db/wal/WalProxy.h @@ -24,18 +24,18 @@ class WalProxy : public DBProxy { public: WalProxy(const DBPtr& db, const DBOptions& options); + Status + Start() override; + + Status + Stop() override; + Status Insert(const std::string& collection_name, const std::string& partition_name, DataChunkPtr& data_chunk, - id_t op_id) override; + idx_t op_id) override; Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids, id_t op_id) override; - - Status - Flush(const std::string& collection_name) override; - - Status - Flush() override; + DeleteEntityByID(const std::string& collection_name, const IDNumbers& entity_ids, idx_t op_id) override; private: }; diff --git a/core/src/segment/IdBloomFilter.cpp b/core/src/segment/IdBloomFilter.cpp index 408d2ab37c..d65adabf6c 100644 --- a/core/src/segment/IdBloomFilter.cpp +++ b/core/src/segment/IdBloomFilter.cpp @@ -41,14 +41,14 @@ IdBloomFilter::GetBloomFilter() { } bool -IdBloomFilter::Check(id_t uid) { +IdBloomFilter::Check(engine::idx_t uid) { std::string s = std::to_string(uid); const std::lock_guard lock(mutex_); return scaling_bloom_check(bloom_filter_, s.c_str(), s.size()); } Status -IdBloomFilter::Add(id_t uid) { +IdBloomFilter::Add(engine::idx_t uid) { std::string s = std::to_string(uid); const std::lock_guard lock(mutex_); if (scaling_bloom_add(bloom_filter_, s.c_str(), s.size(), uid) == -1) { @@ -60,7 +60,7 @@ IdBloomFilter::Add(id_t uid) { } Status -IdBloomFilter::Remove(id_t uid) { +IdBloomFilter::Remove(engine::idx_t uid) { std::string s = std::to_string(uid); const std::lock_guard lock(mutex_); if (scaling_bloom_remove(bloom_filter_, s.c_str(), s.size(), uid) == -1) { diff --git a/core/src/segment/IdBloomFilter.h b/core/src/segment/IdBloomFilter.h index 5966288a29..6cbb63da62 100644 --- a/core/src/segment/IdBloomFilter.h +++ b/core/src/segment/IdBloomFilter.h @@ -22,6 +22,7 @@ #include "cache/DataObj.h" #include "dablooms/dablooms.h" +#include "db/Types.h" #include "utils/Status.h" namespace milvus { @@ -37,13 +38,13 @@ class IdBloomFilter : public cache::DataObj { GetBloomFilter(); bool - Check(id_t uid); + Check(engine::idx_t uid); Status - Add(id_t uid); + Add(engine::idx_t uid); Status - Remove(id_t uid); + Remove(engine::idx_t uid); int64_t Size() override; diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index 7fa7f39adf..59644e5486 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -218,7 +218,7 @@ SegmentReader::LoadFieldsEntities(const std::vector& fields_name, c } Status -SegmentReader::LoadUids(std::vector& uids) { +SegmentReader::LoadUids(std::vector& uids) { engine::BinaryDataPtr raw; auto status = LoadField(engine::FIELD_UID, raw); if (!status.ok()) { @@ -230,14 +230,14 @@ SegmentReader::LoadUids(std::vector& uids) { return Status(DB_ERROR, "Failed to load id field"); } - if (raw->data_.size() % sizeof(engine::id_t) != 0) { + if (raw->data_.size() % sizeof(engine::idx_t) != 0) { std::string err_msg = "Failed to load uids: illegal file size"; LOG_ENGINE_ERROR_ << err_msg; return Status(DB_ERROR, err_msg); } uids.clear(); - uids.resize(raw->data_.size() / sizeof(engine::id_t)); + uids.resize(raw->data_.size() / sizeof(engine::idx_t)); memcpy(uids.data(), raw->data_.data(), raw->data_.size()); return Status::OK(); diff --git a/core/src/segment/SegmentReader.h b/core/src/segment/SegmentReader.h index 9496d6eaed..83a84dc1cc 100644 --- a/core/src/segment/SegmentReader.h +++ b/core/src/segment/SegmentReader.h @@ -50,7 +50,7 @@ class SegmentReader { engine::DataChunkPtr& data_chunk); Status - LoadUids(std::vector& uids); + LoadUids(std::vector& uids); Status LoadVectorIndex(const std::string& field_name, knowhere::VecIndexPtr& index_ptr, bool flat = false); diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index 89cb17aabf..e68b8f73bb 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -359,7 +359,7 @@ SegmentWriter::RowCount() { } Status -SegmentWriter::LoadUids(std::vector& uids) { +SegmentWriter::LoadUids(std::vector& uids) { engine::BinaryDataPtr raw; auto status = segment_ptr_->GetFixedFieldData(engine::FIELD_UID, raw); if (!status.ok()) { @@ -371,14 +371,14 @@ SegmentWriter::LoadUids(std::vector& uids) { return Status(DB_ERROR, "Invalid id field"); } - if (raw->data_.size() % sizeof(engine::id_t) != 0) { + if (raw->data_.size() % sizeof(engine::idx_t) != 0) { std::string err_msg = "Failed to load uids: illegal file size"; LOG_ENGINE_ERROR_ << err_msg; return Status(DB_ERROR, err_msg); } uids.clear(); - uids.resize(raw->data_.size() / sizeof(engine::id_t)); + uids.resize(raw->data_.size() / sizeof(engine::idx_t)); memcpy(uids.data(), raw->data_.data(), raw->data_.size()); return Status::OK(); diff --git a/core/src/segment/SegmentWriter.h b/core/src/segment/SegmentWriter.h index 1d7bd571f3..e55c850c29 100644 --- a/core/src/segment/SegmentWriter.h +++ b/core/src/segment/SegmentWriter.h @@ -61,7 +61,7 @@ class SegmentWriter { RowCount(); Status - LoadUids(std::vector& uids); + LoadUids(std::vector& uids); Status SetVectorIndex(const std::string& field_name, const knowhere::VecIndexPtr& index); diff --git a/core/unittest/db/CMakeLists.txt b/core/unittest/db/CMakeLists.txt index 00366cc039..bf9ffbe82e 100644 --- a/core/unittest/db/CMakeLists.txt +++ b/core/unittest/db/CMakeLists.txt @@ -17,6 +17,7 @@ set( TEST_FILES ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test_db.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test_meta.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test_ss_event.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/test_wal.cpp ) add_executable( test_db diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index e58c08706d..1af3f98efc 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -776,7 +776,7 @@ TEST_F(DBTest, CompactTest) { if (delete_count < 0) { return; } - std::vector delete_ids; + std::vector delete_ids; for (auto i = from; i < to; ++i) { delete_ids.push_back(batch_entity_ids[i]); } diff --git a/core/unittest/db/test_segment.cpp b/core/unittest/db/test_segment.cpp index 3ed2ac79fc..92a4a95c49 100644 --- a/core/unittest/db/test_segment.cpp +++ b/core/unittest/db/test_segment.cpp @@ -94,7 +94,7 @@ TEST_F(SegmentTest, SegmentTest) { break; } - std::vector raw_uids = {123}; + std::vector raw_uids = {123}; std::vector raw_vectors = {1, 2, 3, 4}; { diff --git a/core/unittest/db/test_wal.cpp b/core/unittest/db/test_wal.cpp new file mode 100644 index 0000000000..8131498967 --- /dev/null +++ b/core/unittest/db/test_wal.cpp @@ -0,0 +1,283 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include +#include +#include + +#include +#include +#include +#include + +#include "db/DBProxy.h" +#include "db/utils.h" +#include "db/wal/WalManager.h" +#include "db/wal/WalFile.h" +#include "db/wal/WalOperationCodec.h" +#include "db/wal/WalProxy.h" + +namespace { + +using DBProxy = milvus::engine::DBProxy; +using WalFile = milvus::engine::WalFile; +using WalManager = milvus::engine::WalManager; +using WalOperation = milvus::engine::WalOperation; +using WalOperationPtr = milvus::engine::WalOperationPtr; +using WalOperationType = milvus::engine::WalOperationType; +using WalOperationCodec = milvus::engine::WalOperationCodec; +using WalProxy = milvus::engine::WalProxy; + +void CreateChunk(DataChunkPtr& chunk, int64_t row_count, int64_t& chunk_size) { + chunk = std::make_shared(); + chunk->count_ = row_count; + chunk_size = 0; + { + // int32 type field + std::string field_name = "f1"; + auto bin = std::make_shared(); + bin->data_.resize(chunk->count_ * sizeof(int32_t)); + int32_t* p = (int32_t*)(bin->data_.data()); + for (int64_t i = 0; i < chunk->count_; ++i) { + p[i] = i; + } + chunk->fixed_fields_.insert(std::make_pair(field_name, bin)); + chunk_size += chunk->count_ * sizeof(int32_t); + } + { + // vector type field + int64_t dimension = 128; + std::string field_name = "f2"; + auto bin = std::make_shared(); + bin->data_.resize(chunk->count_ * sizeof(float) * dimension); + float* p = (float*)(bin->data_.data()); + for (int64_t i = 0; i < chunk->count_; ++i) { + for (int64_t j = 0; j < dimension; ++j) { + p[i * dimension + j] = i * j / 100.0; + } + } + chunk->fixed_fields_.insert(std::make_pair(field_name, bin)); + chunk_size += chunk->count_ * sizeof(float) * dimension; + } +} + +class DummyDB : public DBProxy { + public: + Status + Insert(const std::string& collection_name, + const std::string& partition_name, + DataChunkPtr& data_chunk, + idx_t op_id) override { + WalManager::GetInstance().OperationDone(collection_name, op_id); + return Status::OK(); + } + + Status + DeleteEntityByID(const std::string& collection_name, + const IDNumbers& entity_ids, + idx_t op_id) override { + WalManager::GetInstance().OperationDone(collection_name, op_id); + return Status::OK(); + } + +}; + +} // namespace + +TEST_F(WalTest, WalFileTest) { + std::string path = "/tmp/milvus_wal/test_file"; + idx_t last_id = 12345; + + { + WalFile file; + ASSERT_FALSE(file.IsOpened()); + ASSERT_EQ(file.Size(), 0); + + int64_t k = 0; + int64_t bytes = file.Write(&k); + ASSERT_EQ(bytes, 0); + + bytes = file.Read(&k); + ASSERT_EQ(bytes, 0); + + auto status = file.CloseFile(); + ASSERT_TRUE(status.ok()); + } + + { + WalFile file; + auto status = file.OpenFile(path, WalFile::APPEND_WRITE); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(file.IsOpened()); + + int64_t max_size = milvus::engine::MAX_WAL_FILE_SIZE; + ASSERT_FALSE(file.ExceedMaxSize(max_size)); + + int64_t total_bytes = 0; + int8_t len = path.size(); + int64_t bytes = file.Write(&len); + ASSERT_EQ(bytes, sizeof(int8_t)); + total_bytes += bytes; + + ASSERT_TRUE(file.ExceedMaxSize(max_size)); + + bytes = file.Write(path.data(), len); + ASSERT_EQ(bytes, len); + total_bytes += bytes; + + bytes = file.Write(&last_id); + ASSERT_EQ(bytes, sizeof(last_id)); + total_bytes += bytes; + + int64_t file_size = file.Size(); + ASSERT_EQ(total_bytes, file_size); + + std::string file_path = file.Path(); + ASSERT_EQ(file_path, path); + + file.Flush(); + file.CloseFile(); + ASSERT_FALSE(file.IsOpened()); + } + + { + WalFile file; + auto status = file.OpenFile(path, WalFile::READ); + ASSERT_TRUE(status.ok()); + + int8_t len = 0; + int64_t bytes = file.Read(&len); + ASSERT_EQ(bytes, sizeof(int8_t)); + + std::string str; + bytes = file.ReadStr(str, len); + ASSERT_EQ(bytes, len); + ASSERT_EQ(str, path); + + idx_t id_read = 0; + bytes = file.Read(&id_read); + ASSERT_EQ(bytes, sizeof(id_read)); + ASSERT_EQ(id_read, last_id); + + idx_t op_id = 0; + status = file.ReadLastOpId(op_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(op_id, last_id); + } +} + +TEST_F(WalTest, WalFileCodecTest) { + std::string path = "/tmp/milvus_wal/test_file"; + auto file = std::make_shared(); + + IDNumbers op_ids; + std::vector op_types; + // insert operation + { + auto status = file->OpenFile(path, WalFile::APPEND_WRITE); + ASSERT_TRUE(status.ok()); + + DataChunkPtr chunk; + int64_t chunk_size = 0; + CreateChunk(chunk, 1000, chunk_size); + + std::string partition_name = "p1"; + idx_t op_id = 100; + op_ids.push_back(op_id); + op_types.push_back(WalOperationType::INSERT_ENTITY); + WalOperationCodec::WriteInsertOperation(file, partition_name, chunk, op_id); + + ASSERT_GE(file->Size(), chunk_size); + + file->CloseFile(); + + WalFile file_read; + file_read.OpenFile(path, WalFile::READ); + idx_t last_id = 0; + file_read.ReadLastOpId(last_id); + ASSERT_EQ(last_id, op_id); + } + + // delete operation + { + auto status = file->OpenFile(path, WalFile::APPEND_WRITE); + ASSERT_TRUE(status.ok()); + + auto pre_size = file->Size(); + + IDNumbers ids = {1, 2, 3}; + idx_t op_id = 200; + op_ids.push_back(op_id); + op_types.push_back(WalOperationType::DELETE_ENTITY); + WalOperationCodec::WriteDeleteOperation(file, ids, op_id); + + auto post_size = file->Size(); + ASSERT_GE(post_size - pre_size, ids.size() * sizeof(idx_t)); + + file->CloseFile(); + + WalFile file_read; + file_read.OpenFile(path, WalFile::READ); + idx_t last_id = 0; + file_read.ReadLastOpId(last_id); + ASSERT_EQ(last_id, op_id); + } + + // iterate operations + { + auto status = file->OpenFile(path, WalFile::READ); + ASSERT_TRUE(status.ok()); + + Status iter_status; + int32_t op_index = 0; + while(iter_status.ok()) { + WalOperationPtr operation; + iter_status = WalOperationCodec::IterateOperation(file, operation, 0); + if (operation == nullptr) { + continue; + } + + ASSERT_EQ(operation->ID(), op_ids[op_index]); + ASSERT_EQ(operation->Type(), op_types[op_index]); + ++op_index; + } + ASSERT_EQ(op_index, op_ids.size()); + } +} + +TEST_F(WalTest, WalProxyTest) { + std::string collection_name = "col_1"; + std::string partition_name = "part_1"; + + // write over more than 400MB data + for (int64_t i = 1; i <= 1000; i++) { + idx_t op_id = i; + if (i % 10 == 0) { + IDNumbers ids = {1, 2, 3}; + auto status = db_->DeleteEntityByID(collection_name, ids, op_id); + ASSERT_TRUE(status.ok()); + } else { + DataChunkPtr chunk; + int64_t chunk_size = 0; + CreateChunk(chunk, 1000, chunk_size); + + auto status = db_->Insert(collection_name, partition_name, chunk, op_id); + ASSERT_TRUE(status.ok()); + } + } +} + +TEST(WalManagerTest, WalManagerTest) { + std::string path = "/tmp/milvus_wal/test_file"; + +// WalManager::GetInstance().Start(options_); +// WalManager::GetInstance().Recovery(db_); +} \ No newline at end of file diff --git a/core/unittest/db/utils.cpp b/core/unittest/db/utils.cpp index 03fce02c51..37f5daa364 100644 --- a/core/unittest/db/utils.cpp +++ b/core/unittest/db/utils.cpp @@ -38,6 +38,7 @@ #include "db/meta/backend/MockEngine.h" #include "db/meta/backend/MySqlEngine.h" #include "db/meta/backend/SqliteEngine.h" +#include "db/wal/WalProxy.h" #include "scheduler/ResourceFactory.h" #include "scheduler/SchedInst.h" #include "utils/CommonUtil.h" @@ -289,6 +290,7 @@ SchedulerTest::TearDown() { BaseTest::TearDown(); } +///////////////////////////////////////////////////////////////////////////////////////////////////////////////// void EventTest::SetUp() { auto uri = "mock://:@:/"; @@ -300,6 +302,30 @@ void EventTest::TearDown() { } +///////////////////////////////////////////////////////////////////////////////////////////////////////////////// +milvus::engine::DBOptions +WalTest::GetOptions() { + milvus::engine::DBOptions options; + options.meta_.path_ = "/tmp/milvus_wal"; + options.meta_.backend_uri_ = "mock://:@:/"; + options.wal_enable_ = true; + return options; +} + +void +WalTest::SetUp() { + milvus::engine::DBPtr db = std::make_shared(nullptr, GetOptions()); + db_ = std::make_shared(db, GetOptions()); + db_->Start(); +} + +void +WalTest::TearDown() { + db_->Stop(); + db_ = nullptr; + std::experimental::filesystem::remove_all(GetOptions().meta_.path_); +} + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// int main(int argc, char **argv) { diff --git a/core/unittest/db/utils.h b/core/unittest/db/utils.h index f16c46bd7c..665e15550c 100644 --- a/core/unittest/db/utils.h +++ b/core/unittest/db/utils.h @@ -20,6 +20,7 @@ #include #include "db/DB.h" +#include "db/DBFactory.h" #include "db/meta/MetaAdapter.h" #include "db/snapshot/CompoundOperations.h" #include "db/snapshot/Context.h" @@ -82,13 +83,18 @@ using IterateSegmentFileHandler = milvus::engine::snapshot::IterateHandler db_; + + milvus::engine::DBOptions + GetOptions(); + + void + SetUp() override; + void + TearDown() override; +};