diff --git a/CHANGELOG.md b/CHANGELOG.md index 9938505788..104a5e2226 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ Please mark all change in change log and use the issue from GitHub ## Bug - \#1635 Vectors can be returned by searching after vectors deleted if `cache_insert_data` set true +## Feature + +## Improvement +- \#1537 Optimize raw vector and uids read/write +- \#1546 Move Config.cpp to config directory +- \#1547 Rename storage/file to storage/disk and rename classes +- \#1548 Move store/Directory to storage/Operation and add FSHandler + +## Task + + # Milvus 0.7.0 (2020-03-11) ## Bug @@ -113,9 +124,7 @@ Please mark all change in change log and use the issue from GitHub - \#1448 General proto api for NNS libraries - \#1480 Add return code for AVX512 selection - \#1524 Update config "preload_table" description -- \#1537 Optimize raw vector and uids read/write - \#1544 Update resources name in HTTP module -- \#1546 Move Config.cpp to config directory - \#1567 Update yaml config description ## Task diff --git a/core/src/CMakeLists.txt b/core/src/CMakeLists.txt index 3978a407fd..173e1aa293 100644 --- a/core/src/CMakeLists.txt +++ b/core/src/CMakeLists.txt @@ -106,11 +106,11 @@ set(web_server_files ) aux_source_directory(${MILVUS_ENGINE_SRC}/storage storage_main_files) -aux_source_directory(${MILVUS_ENGINE_SRC}/storage/file storage_file_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/storage/disk storage_disk_files) aux_source_directory(${MILVUS_ENGINE_SRC}/storage/s3 storage_s3_files) set(storage_files ${storage_main_files} - ${storage_file_files} + ${storage_disk_files} ${storage_s3_files} ) @@ -125,8 +125,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files) aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files) -aux_source_directory(${MILVUS_ENGINE_SRC}/store store_files) - set(engine_files ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp ${cache_files} @@ -143,7 +141,6 @@ set(engine_files ${codecs_files} ${codecs_default_files} ${segment_files} - ${store_files} ) if (MILVUS_WITH_PROMETHEUS) diff --git a/core/src/codecs/DeletedDocsFormat.h b/core/src/codecs/DeletedDocsFormat.h index d5f263a238..431a9a437a 100644 --- a/core/src/codecs/DeletedDocsFormat.h +++ b/core/src/codecs/DeletedDocsFormat.h @@ -20,7 +20,7 @@ #include #include "segment/DeletedDocs.h" -#include "store/Directory.h" +#include "storage/FSHandler.h" namespace milvus { namespace codec { @@ -28,10 +28,10 @@ namespace codec { class DeletedDocsFormat { public: virtual void - read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) = 0; + read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) = 0; virtual void - write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0; + write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0; }; using DeletedDocsFormatPtr = std::shared_ptr; diff --git a/core/src/codecs/IdBloomFilterFormat.h b/core/src/codecs/IdBloomFilterFormat.h index 11cafac773..a374dd70cb 100644 --- a/core/src/codecs/IdBloomFilterFormat.h +++ b/core/src/codecs/IdBloomFilterFormat.h @@ -20,7 +20,7 @@ #include #include "segment/IdBloomFilter.h" -#include "store/Directory.h" +#include "storage/FSHandler.h" namespace milvus { namespace codec { @@ -28,13 +28,13 @@ namespace codec { class IdBloomFilterFormat { public: virtual void - read(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; virtual void - write(const store::DirectoryPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; virtual void - create(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; }; using IdBloomFilterFormatPtr = std::shared_ptr; diff --git a/core/src/codecs/VectorsFormat.h b/core/src/codecs/VectorsFormat.h index cfb7504fd6..5227f9a6a0 100644 --- a/core/src/codecs/VectorsFormat.h +++ b/core/src/codecs/VectorsFormat.h @@ -21,7 +21,7 @@ #include #include "segment/Vectors.h" -#include "store/Directory.h" +#include "storage/FSHandler.h" namespace milvus { namespace codec { @@ -29,16 +29,16 @@ namespace codec { class VectorsFormat { public: virtual void - read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) = 0; + read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) = 0; virtual void - write(const store::DirectoryPtr& directory_ptr, const segment::VectorsPtr& vectors) = 0; + write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) = 0; virtual void - read_uids(const store::DirectoryPtr& directory_ptr, std::vector& uids) = 0; + read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector& uids) = 0; virtual void - read_vectors(const store::DirectoryPtr& directory_ptr, off_t offset, size_t num_bytes, + read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes, std::vector& raw_vectors) = 0; }; diff --git a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp index 0b0039cd4f..128b682543 100644 --- a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp +++ b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp @@ -35,10 +35,10 @@ namespace milvus { namespace codec { void -DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) { +DefaultDeletedDocsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664); @@ -75,10 +75,10 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment } void -DefaultDeletedDocsFormat::write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) { +DefaultDeletedDocsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; // Create a temporary file from the existing file diff --git a/core/src/codecs/default/DefaultDeletedDocsFormat.h b/core/src/codecs/default/DefaultDeletedDocsFormat.h index dd58297f35..d755245a84 100644 --- a/core/src/codecs/default/DefaultDeletedDocsFormat.h +++ b/core/src/codecs/default/DefaultDeletedDocsFormat.h @@ -30,10 +30,10 @@ class DefaultDeletedDocsFormat : public DeletedDocsFormat { DefaultDeletedDocsFormat() = default; void - read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) override; + read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) override; void - write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) override; + write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) override; // No copy and move DefaultDeletedDocsFormat(const DefaultDeletedDocsFormat&) = delete; diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp index 7cbabcfb74..ed7c40908b 100644 --- a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp +++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp @@ -30,11 +30,10 @@ constexpr unsigned int bloom_filter_capacity = 500000; constexpr double bloom_filter_error_rate = 0.01; void -DefaultIdBloomFilterFormat::read(const store::DirectoryPtr& directory_ptr, - segment::IdBloomFilterPtr& id_bloom_filter_ptr) { +DefaultIdBloomFilterFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; scaling_bloom_t* bloom_filter = new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str()); @@ -48,11 +47,11 @@ DefaultIdBloomFilterFormat::read(const store::DirectoryPtr& directory_ptr, } void -DefaultIdBloomFilterFormat::write(const store::DirectoryPtr& directory_ptr, +DefaultIdBloomFilterFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) { std::string err_msg = @@ -63,9 +62,9 @@ DefaultIdBloomFilterFormat::write(const store::DirectoryPtr& directory_ptr, } void -DefaultIdBloomFilterFormat::create(const store::DirectoryPtr& directory_ptr, +DefaultIdBloomFilterFormat::create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) { - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; scaling_bloom_t* bloom_filter = new_scaling_bloom(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str()); diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.h b/core/src/codecs/default/DefaultIdBloomFilterFormat.h index 7a5c4d29fd..e35daad9ef 100644 --- a/core/src/codecs/default/DefaultIdBloomFilterFormat.h +++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.h @@ -22,7 +22,7 @@ #include "codecs/IdBloomFilterFormat.h" #include "segment/IdBloomFilter.h" -#include "store/Directory.h" +#include "storage/disk/DiskOperation.h" namespace milvus { namespace codec { @@ -32,13 +32,13 @@ class DefaultIdBloomFilterFormat : public IdBloomFilterFormat { DefaultIdBloomFilterFormat() = default; void - read(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; void - write(const store::DirectoryPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; void - create(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; // No copy and move DefaultIdBloomFilterFormat(const DefaultIdBloomFilterFormat&) = delete; diff --git a/core/src/codecs/default/DefaultVectorsFormat.cpp b/core/src/codecs/default/DefaultVectorsFormat.cpp index f32f0ae5ba..412049fc9a 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.cpp +++ b/core/src/codecs/default/DefaultVectorsFormat.cpp @@ -102,10 +102,10 @@ DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vect } void -DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) { +DefaultVectorsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; @@ -134,10 +134,10 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve } void -DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segment::VectorsPtr& vectors) { +DefaultVectorsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string rv_file_path = dir_path + "/" + vectors->GetName() + raw_vector_extension_; const std::string uid_file_path = dir_path + "/" + vectors->GetName() + user_id_extension_; @@ -197,10 +197,10 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm } void -DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::vector& uids) { +DefaultVectorsFormat::read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector& uids) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; @@ -221,11 +221,11 @@ DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::v } void -DefaultVectorsFormat::read_vectors(const store::DirectoryPtr& directory_ptr, off_t offset, size_t num_bytes, +DefaultVectorsFormat::read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes, std::vector& raw_vectors) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; diff --git a/core/src/codecs/default/DefaultVectorsFormat.h b/core/src/codecs/default/DefaultVectorsFormat.h index 54c9b5278d..bfb20f221b 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.h +++ b/core/src/codecs/default/DefaultVectorsFormat.h @@ -32,16 +32,17 @@ class DefaultVectorsFormat : public VectorsFormat { DefaultVectorsFormat() = default; void - read(const store::DirectoryPtr&, segment::VectorsPtr&) override; + read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) override; void - write(const store::DirectoryPtr&, const segment::VectorsPtr&) override; + write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) override; void - read_vectors(const store::DirectoryPtr&, off_t, size_t, std::vector&) override; + read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector& uids) override; void - read_uids(const store::DirectoryPtr&, std::vector&) override; + read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes, + std::vector& raw_vectors) override; // No copy and move DefaultVectorsFormat(const DefaultVectorsFormat&) = delete; diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index c0983a89be..757ed21c56 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -21,14 +21,19 @@ #include "Vectors.h" #include "codecs/default/DefaultCodec.h" -#include "store/Directory.h" +#include "storage/disk/DiskIOReader.h" +#include "storage/disk/DiskIOWriter.h" +#include "storage/disk/DiskOperation.h" #include "utils/Log.h" namespace milvus { namespace segment { SegmentReader::SegmentReader(const std::string& directory) { - directory_ptr_ = std::make_shared(directory); + storage::IOReaderPtr reader_ptr = std::make_shared(); + storage::IOWriterPtr writer_ptr = std::make_shared(); + storage::OperationPtr operation_ptr = std::make_shared(directory); + fs_ptr_ = std::make_shared(reader_ptr, writer_ptr, operation_ptr); segment_ptr_ = std::make_shared(); } @@ -43,9 +48,9 @@ SegmentReader::Load() { // TODO(zhiru) codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); - default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_); - default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->read(fs_ptr_, segment_ptr_->vectors_ptr_); + default_codec.GetDeletedDocsFormat()->read(fs_ptr_, segment_ptr_->deleted_docs_ptr_); } catch (std::exception& e) { return Status(DB_ERROR, e.what()); } @@ -56,8 +61,8 @@ Status SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector& raw_vectors) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); - default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->read_vectors(fs_ptr_, offset, num_bytes, raw_vectors); } catch (std::exception& e) { std::string err_msg = "Failed to load raw vectors: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -70,8 +75,8 @@ Status SegmentReader::LoadUids(std::vector& uids) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); - default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->read_uids(fs_ptr_, uids); } catch (std::exception& e) { std::string err_msg = "Failed to load uids: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -90,8 +95,8 @@ Status SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); - default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetIdBloomFilterFormat()->read(fs_ptr_, id_bloom_filter_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to load bloom filter: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -104,8 +109,8 @@ Status SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); - default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetDeletedDocsFormat()->read(fs_ptr_, deleted_docs_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to load deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; diff --git a/core/src/segment/SegmentReader.h b/core/src/segment/SegmentReader.h index 95531d5685..48247e5deb 100644 --- a/core/src/segment/SegmentReader.h +++ b/core/src/segment/SegmentReader.h @@ -22,7 +22,7 @@ #include #include "segment/Types.h" -#include "store/Directory.h" +#include "storage/FSHandler.h" #include "utils/Status.h" namespace milvus { @@ -55,7 +55,7 @@ class SegmentReader { GetSegment(SegmentPtr& segment_ptr); private: - store::DirectoryPtr directory_ptr_; + storage::FSHandlerPtr fs_ptr_; SegmentPtr segment_ptr_; }; diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index c80776e2f5..2136e229c0 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -23,14 +23,19 @@ #include "SegmentReader.h" #include "Vectors.h" #include "codecs/default/DefaultCodec.h" -#include "store/Directory.h" +#include "storage/disk/DiskIOReader.h" +#include "storage/disk/DiskIOWriter.h" +#include "storage/disk/DiskOperation.h" #include "utils/Log.h" namespace milvus { namespace segment { SegmentWriter::SegmentWriter(const std::string& directory) { - directory_ptr_ = std::make_shared(directory); + storage::IOReaderPtr reader_ptr = std::make_shared(); + storage::IOWriterPtr writer_ptr = std::make_shared(); + storage::OperationPtr operation_ptr = std::make_shared(directory); + fs_ptr_ = std::make_shared(reader_ptr, writer_ptr, operation_ptr); segment_ptr_ = std::make_shared(); } @@ -84,8 +89,8 @@ Status SegmentWriter::WriteVectors() { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); - default_codec.GetVectorsFormat()->write(directory_ptr_, segment_ptr_->vectors_ptr_); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->write(fs_ptr_, segment_ptr_->vectors_ptr_); } catch (std::exception& e) { std::string err_msg = "Failed to write vectors: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -98,11 +103,11 @@ Status SegmentWriter::WriteBloomFilter() { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + fs_ptr_->operation_ptr_->CreateDirectory(); auto start = std::chrono::high_resolution_clock::now(); - default_codec.GetIdBloomFilterFormat()->create(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_); + default_codec.GetIdBloomFilterFormat()->create(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_); auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration diff = end - start; @@ -121,7 +126,7 @@ SegmentWriter::WriteBloomFilter() { start = std::chrono::high_resolution_clock::now(); - default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_); + default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_); end = std::chrono::high_resolution_clock::now(); diff = end - start; @@ -138,9 +143,9 @@ Status SegmentWriter::WriteDeletedDocs() { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + fs_ptr_->operation_ptr_->CreateDirectory(); DeletedDocsPtr deleted_docs_ptr = std::make_shared(); - default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs_ptr); + default_codec.GetDeletedDocsFormat()->write(fs_ptr_, deleted_docs_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -153,8 +158,8 @@ Status SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); - default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetDeletedDocsFormat()->write(fs_ptr_, deleted_docs); } catch (std::exception& e) { std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -167,8 +172,8 @@ Status SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); - default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, id_bloom_filter_ptr); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, id_bloom_filter_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to write bloom filter: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -191,11 +196,11 @@ SegmentWriter::GetSegment(SegmentPtr& segment_ptr) { Status SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) { - if (dir_to_merge == directory_ptr_->GetDirPath()) { + if (dir_to_merge == fs_ptr_->operation_ptr_->GetDirectory()) { return Status(DB_ERROR, "Cannot Merge Self"); } - ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << directory_ptr_->GetDirPath(); + ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory(); auto start = std::chrono::high_resolution_clock::now(); @@ -234,7 +239,7 @@ SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) { ENGINE_LOG_DEBUG << "Adding " << segment_to_merge->vectors_ptr_->GetCount() << " vectors and uids took " << diff.count() << " s"; - ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << directory_ptr_->GetDirPath(); + ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory(); return Status::OK(); } diff --git a/core/src/segment/SegmentWriter.h b/core/src/segment/SegmentWriter.h index f7b6a7398c..b5d42761a1 100644 --- a/core/src/segment/SegmentWriter.h +++ b/core/src/segment/SegmentWriter.h @@ -22,7 +22,7 @@ #include #include "segment/Types.h" -#include "store/Directory.h" +#include "storage/FSHandler.h" #include "utils/Status.h" namespace milvus { @@ -70,7 +70,7 @@ class SegmentWriter { WriteDeletedDocs(); private: - store::DirectoryPtr directory_ptr_; + storage::FSHandlerPtr fs_ptr_; SegmentPtr segment_ptr_; }; diff --git a/core/src/storage/FSHandler.h b/core/src/storage/FSHandler.h new file mode 100644 index 0000000000..8b0f175bf9 --- /dev/null +++ b/core/src/storage/FSHandler.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "storage/IOReader.h" +#include "storage/IOWriter.h" +#include "storage/Operation.h" + +namespace milvus { +namespace storage { + +struct FSHandler { + IOReaderPtr reader_ptr_ = nullptr; + IOWriterPtr writer_ptr_ = nullptr; + OperationPtr operation_ptr_ = nullptr; + + FSHandler(IOReaderPtr& reader_ptr, IOWriterPtr& writer_ptr, OperationPtr& operation_ptr) + : reader_ptr_(reader_ptr), writer_ptr_(writer_ptr), operation_ptr_(operation_ptr) { + } +}; + +using FSHandlerPtr = std::shared_ptr; + +} // namespace storage +} // namespace milvus diff --git a/core/src/storage/IOReader.h b/core/src/storage/IOReader.h index 0116602a08..eeddb7438b 100644 --- a/core/src/storage/IOReader.h +++ b/core/src/storage/IOReader.h @@ -11,6 +11,7 @@ #pragma once +#include #include namespace milvus { @@ -18,9 +19,8 @@ namespace storage { class IOReader { public: - explicit IOReader(const std::string& name) : name_(name) { - } - ~IOReader() = default; + virtual void + open(const std::string& name) = 0; virtual void read(void* ptr, size_t size) = 0; @@ -31,9 +31,11 @@ class IOReader { virtual size_t length() = 0; - public: - std::string name_; + virtual void + close() = 0; }; +using IOReaderPtr = std::shared_ptr; + } // namespace storage } // namespace milvus diff --git a/core/src/storage/IOWriter.h b/core/src/storage/IOWriter.h index a2281a02bd..38b6887057 100644 --- a/core/src/storage/IOWriter.h +++ b/core/src/storage/IOWriter.h @@ -11,6 +11,7 @@ #pragma once +#include #include namespace milvus { @@ -18,9 +19,8 @@ namespace storage { class IOWriter { public: - explicit IOWriter(const std::string& name) : name_(name), len_(0) { - } - ~IOWriter() = default; + virtual void + open(const std::string& name) = 0; virtual void write(void* ptr, size_t size) = 0; @@ -28,10 +28,11 @@ class IOWriter { virtual size_t length() = 0; - public: - std::string name_; - size_t len_; + virtual void + close() = 0; }; +using IOWriterPtr = std::shared_ptr; + } // namespace storage } // namespace milvus diff --git a/core/src/storage/IStorage.h b/core/src/storage/IStorage.h deleted file mode 100644 index 6839f2bfca..0000000000 --- a/core/src/storage/IStorage.h +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include -#include -#include "utils/Status.h" - -namespace milvus { -namespace storage { - -class IStorage { - public: - virtual Status - CreateBucket() = 0; - virtual Status - DeleteBucket() = 0; - virtual Status - PutObjectFile(const std::string& object_name, const std::string& file_path) = 0; - virtual Status - PutObjectStr(const std::string& object_name, const std::string& content) = 0; - virtual Status - GetObjectFile(const std::string& object_name, const std::string& file_path) = 0; - virtual Status - GetObjectStr(const std::string& object_name, std::string& content) = 0; - virtual Status - ListObjects(std::vector& object_list, const std::string& marker = "") = 0; - virtual Status - DeleteObject(const std::string& object_name) = 0; - virtual Status - DeleteObjects(const std::string& marker) = 0; -}; - -} // namespace storage -} // namespace milvus diff --git a/core/src/storage/Operation.h b/core/src/storage/Operation.h new file mode 100644 index 0000000000..a29069c7da --- /dev/null +++ b/core/src/storage/Operation.h @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +namespace milvus { +namespace storage { + +class Operation { + public: + virtual void + CreateDirectory() = 0; + + virtual const std::string& + GetDirectory() const = 0; + + virtual void + ListDirectory(std::vector& file_paths) = 0; + + virtual bool + DeleteFile(const std::string& file_path) = 0; + + // TODO(zhiru): + // open(), sync(), close() + // function that opens a stream for reading file + // function that creates a new, empty file and returns an stream for appending data to this file + // function that creates a new, empty, temporary file and returns an stream for appending data to this file +}; + +using OperationPtr = std::shared_ptr; + +} // namespace storage +} // namespace milvus diff --git a/core/src/storage/file/FileIOReader.cpp b/core/src/storage/disk/DiskIOReader.cpp similarity index 74% rename from core/src/storage/file/FileIOReader.cpp rename to core/src/storage/disk/DiskIOReader.cpp index 9aab29e344..f51ffef0b3 100644 --- a/core/src/storage/file/FileIOReader.cpp +++ b/core/src/storage/disk/DiskIOReader.cpp @@ -9,33 +9,39 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. -#include "storage/file/FileIOReader.h" +#include "storage/disk/DiskIOReader.h" namespace milvus { namespace storage { -FileIOReader::FileIOReader(const std::string& name) : IOReader(name) { +void +DiskIOReader::open(const std::string& name) { + name_ = name; fs_ = std::fstream(name_, std::ios::in | std::ios::binary); } -FileIOReader::~FileIOReader() { - fs_.close(); -} - void -FileIOReader::read(void* ptr, size_t size) { +DiskIOReader::read(void* ptr, size_t size) { fs_.read(reinterpret_cast(ptr), size); } void -FileIOReader::seekg(size_t pos) { +DiskIOReader::seekg(size_t pos) { fs_.seekg(pos); } size_t -FileIOReader::length() { +DiskIOReader::length() { fs_.seekg(0, fs_.end); - return fs_.tellg(); + size_t len = fs_.tellg(); + fs_.seekg(0, fs_.beg); + return len; } + +void +DiskIOReader::close() { + fs_.close(); +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/file/FileIOReader.h b/core/src/storage/disk/DiskIOReader.h similarity index 67% rename from core/src/storage/file/FileIOReader.h rename to core/src/storage/disk/DiskIOReader.h index ed35c39d6c..2fcf52457d 100644 --- a/core/src/storage/file/FileIOReader.h +++ b/core/src/storage/disk/DiskIOReader.h @@ -18,10 +18,22 @@ namespace milvus { namespace storage { -class FileIOReader : public IOReader { +class DiskIOReader : public IOReader { public: - explicit FileIOReader(const std::string& name); - ~FileIOReader(); + DiskIOReader() = default; + ~DiskIOReader() = default; + + // No copy and move + DiskIOReader(const DiskIOReader&) = delete; + DiskIOReader(DiskIOReader&&) = delete; + + DiskIOReader& + operator=(const DiskIOReader&) = delete; + DiskIOReader& + operator=(DiskIOReader&&) = delete; + + void + open(const std::string& name) override; void read(void* ptr, size_t size) override; @@ -32,7 +44,11 @@ class FileIOReader : public IOReader { size_t length() override; + void + close() override; + public: + std::string name_; std::fstream fs_; }; diff --git a/core/src/storage/file/FileIOWriter.cpp b/core/src/storage/disk/DiskIOWriter.cpp similarity index 79% rename from core/src/storage/file/FileIOWriter.cpp rename to core/src/storage/disk/DiskIOWriter.cpp index 3992b58369..63899463b1 100644 --- a/core/src/storage/file/FileIOWriter.cpp +++ b/core/src/storage/disk/DiskIOWriter.cpp @@ -9,29 +9,33 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. -#include "storage/file/FileIOWriter.h" +#include "storage/disk/DiskIOWriter.h" namespace milvus { namespace storage { -FileIOWriter::FileIOWriter(const std::string& name) : IOWriter(name) { +void +DiskIOWriter::open(const std::string& name) { + name_ = name; + len_ = 0; fs_ = std::fstream(name_, std::ios::out | std::ios::binary); } -FileIOWriter::~FileIOWriter() { - fs_.close(); -} - void -FileIOWriter::write(void* ptr, size_t size) { +DiskIOWriter::write(void* ptr, size_t size) { fs_.write(reinterpret_cast(ptr), size); len_ += size; } size_t -FileIOWriter::length() { +DiskIOWriter::length() { return len_; } +void +DiskIOWriter::close() { + fs_.close(); +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/file/FileIOWriter.h b/core/src/storage/disk/DiskIOWriter.h similarity index 66% rename from core/src/storage/file/FileIOWriter.h rename to core/src/storage/disk/DiskIOWriter.h index b06eb3feca..39c9b5ca68 100644 --- a/core/src/storage/file/FileIOWriter.h +++ b/core/src/storage/disk/DiskIOWriter.h @@ -18,10 +18,22 @@ namespace milvus { namespace storage { -class FileIOWriter : public IOWriter { +class DiskIOWriter : public IOWriter { public: - explicit FileIOWriter(const std::string& name); - ~FileIOWriter(); + DiskIOWriter() = default; + ~DiskIOWriter() = default; + + // No copy and move + DiskIOWriter(const DiskIOWriter&) = delete; + DiskIOWriter(DiskIOWriter&&) = delete; + + DiskIOWriter& + operator=(const DiskIOWriter&) = delete; + DiskIOWriter& + operator=(DiskIOWriter&&) = delete; + + void + open(const std::string& name) override; void write(void* ptr, size_t size) override; @@ -29,7 +41,12 @@ class FileIOWriter : public IOWriter { size_t length() override; + void + close() override; + public: + std::string name_; + size_t len_; std::fstream fs_; }; diff --git a/core/src/store/Directory.cpp b/core/src/storage/disk/DiskOperation.cpp similarity index 82% rename from core/src/store/Directory.cpp rename to core/src/storage/disk/DiskOperation.cpp index 1d41da004f..90a36c2bb2 100644 --- a/core/src/store/Directory.cpp +++ b/core/src/storage/disk/DiskOperation.cpp @@ -15,21 +15,20 @@ // specific language governing permissions and limitations // under the License. -#include "store/Directory.h" - #include +#include "storage/disk/DiskOperation.h" #include "utils/Exception.h" #include "utils/Log.h" namespace milvus { -namespace store { +namespace storage { -Directory::Directory(const std::string& dir_path) : dir_path_(dir_path) { +DiskOperation::DiskOperation(const std::string& dir_path) : dir_path_(dir_path) { } void -Directory::Create() { +DiskOperation::CreateDirectory() { if (!boost::filesystem::is_directory(dir_path_)) { auto ret = boost::filesystem::create_directory(dir_path_); if (!ret) { @@ -40,8 +39,13 @@ Directory::Create() { } } +const std::string& +DiskOperation::GetDirectory() const { + return dir_path_; +} + void -Directory::ListAll(std::vector& file_paths) { +DiskOperation::ListDirectory(std::vector& file_paths) { boost::filesystem::path target_path(dir_path_); typedef boost::filesystem::directory_iterator d_it; d_it it_end; @@ -54,14 +58,9 @@ Directory::ListAll(std::vector& file_paths) { } bool -Directory::DeleteFile(const std::string& file_path) { +DiskOperation::DeleteFile(const std::string& file_path) { return boost::filesystem::remove(file_path); } -const std::string& -Directory::GetDirPath() const { - return dir_path_; -} - -} // namespace store +} // namespace storage } // namespace milvus diff --git a/core/src/store/Directory.h b/core/src/storage/disk/DiskOperation.h similarity index 80% rename from core/src/store/Directory.h rename to core/src/storage/disk/DiskOperation.h index de1dc276ac..50c1a4ba3d 100644 --- a/core/src/store/Directory.h +++ b/core/src/storage/disk/DiskOperation.h @@ -21,25 +21,27 @@ #include #include +#include "storage/Operation.h" + namespace milvus { -namespace store { +namespace storage { -class Directory { +class DiskOperation : public Operation { public: - explicit Directory(const std::string& dir_path); + explicit DiskOperation(const std::string& dir_path); void - Create(); + CreateDirectory(); + + const std::string& + GetDirectory() const; void - ListAll(std::vector& file_paths); + ListDirectory(std::vector& file_paths); bool DeleteFile(const std::string& file_path); - const std::string& - GetDirPath() const; - // TODO(zhiru): // open(), sync(), close() // function that opens a stream for reading file @@ -50,7 +52,7 @@ class Directory { const std::string dir_path_; }; -using DirectoryPtr = std::shared_ptr; +using DiskOperationPtr = std::shared_ptr; -} // namespace store +} // namespace storage } // namespace milvus diff --git a/core/src/storage/s3/S3ClientWrapper.h b/core/src/storage/s3/S3ClientWrapper.h index 452c5cc2e7..f0e2d8cfe7 100644 --- a/core/src/storage/s3/S3ClientWrapper.h +++ b/core/src/storage/s3/S3ClientWrapper.h @@ -16,12 +16,13 @@ #include #include #include -#include "storage/IStorage.h" + +#include "utils/Status.h" namespace milvus { namespace storage { -class S3ClientWrapper : public IStorage { +class S3ClientWrapper { public: static S3ClientWrapper& GetInstance() { @@ -35,23 +36,23 @@ class S3ClientWrapper : public IStorage { StopService(); Status - CreateBucket() override; + CreateBucket(); Status - DeleteBucket() override; + DeleteBucket(); Status - PutObjectFile(const std::string& object_key, const std::string& file_path) override; + PutObjectFile(const std::string& object_key, const std::string& file_path); Status - PutObjectStr(const std::string& object_key, const std::string& content) override; + PutObjectStr(const std::string& object_key, const std::string& content); Status - GetObjectFile(const std::string& object_key, const std::string& file_path) override; + GetObjectFile(const std::string& object_key, const std::string& file_path); Status - GetObjectStr(const std::string& object_key, std::string& content) override; + GetObjectStr(const std::string& object_key, std::string& content); Status - ListObjects(std::vector& object_list, const std::string& marker = "") override; + ListObjects(std::vector& object_list, const std::string& marker = ""); Status - DeleteObject(const std::string& object_key) override; + DeleteObject(const std::string& object_key); Status - DeleteObjects(const std::string& marker) override; + DeleteObjects(const std::string& marker); private: std::shared_ptr client_ptr_; diff --git a/core/src/storage/s3/S3IOReader.cpp b/core/src/storage/s3/S3IOReader.cpp index 77edcb3861..e8d073029a 100644 --- a/core/src/storage/s3/S3IOReader.cpp +++ b/core/src/storage/s3/S3IOReader.cpp @@ -15,13 +15,13 @@ namespace milvus { namespace storage { -S3IOReader::S3IOReader(const std::string& name) : IOReader(name), pos_(0) { +void +S3IOReader::open(const std::string& name) { + name_ = name; + pos_ = 0; S3ClientWrapper::GetInstance().GetObjectStr(name_, buffer_); } -S3IOReader::~S3IOReader() { -} - void S3IOReader::read(void* ptr, size_t size) { memcpy(ptr, buffer_.data() + pos_, size); @@ -37,5 +37,9 @@ S3IOReader::length() { return buffer_.length(); } +void +S3IOReader::close() { +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/s3/S3IOReader.h b/core/src/storage/s3/S3IOReader.h index 5c2529c8e9..e69a64f969 100644 --- a/core/src/storage/s3/S3IOReader.h +++ b/core/src/storage/s3/S3IOReader.h @@ -19,8 +19,20 @@ namespace storage { class S3IOReader : public IOReader { public: - explicit S3IOReader(const std::string& name); - ~S3IOReader(); + S3IOReader() = default; + ~S3IOReader() = default; + + // No copy and move + S3IOReader(const S3IOReader&) = delete; + S3IOReader(S3IOReader&&) = delete; + + S3IOReader& + operator=(const S3IOReader&) = delete; + S3IOReader& + operator=(S3IOReader&&) = delete; + + void + open(const std::string& name) override; void read(void* ptr, size_t size) override; @@ -31,7 +43,11 @@ class S3IOReader : public IOReader { size_t length() override; + void + close() override; + public: + std::string name_; std::string buffer_; size_t pos_; }; diff --git a/core/src/storage/s3/S3IOWriter.cpp b/core/src/storage/s3/S3IOWriter.cpp index 51fff830fe..9d00db3c83 100644 --- a/core/src/storage/s3/S3IOWriter.cpp +++ b/core/src/storage/s3/S3IOWriter.cpp @@ -15,14 +15,13 @@ namespace milvus { namespace storage { -S3IOWriter::S3IOWriter(const std::string& name) : IOWriter(name) { +void +S3IOWriter::open(const std::string& name) { + name_ = name; + len_ = 0; buffer_ = ""; } -S3IOWriter::~S3IOWriter() { - S3ClientWrapper::GetInstance().PutObjectStr(name_, buffer_); -} - void S3IOWriter::write(void* ptr, size_t size) { buffer_ += std::string(reinterpret_cast(ptr), size); @@ -34,5 +33,10 @@ S3IOWriter::length() { return len_; } +void +S3IOWriter::close() { + S3ClientWrapper::GetInstance().PutObjectStr(name_, buffer_); +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/s3/S3IOWriter.h b/core/src/storage/s3/S3IOWriter.h index 53ec345afe..0b5240d7b1 100644 --- a/core/src/storage/s3/S3IOWriter.h +++ b/core/src/storage/s3/S3IOWriter.h @@ -19,8 +19,20 @@ namespace storage { class S3IOWriter : public IOWriter { public: - explicit S3IOWriter(const std::string& name); - ~S3IOWriter(); + S3IOWriter() = default; + ~S3IOWriter() = default; + + // No copy and move + S3IOWriter(const S3IOWriter&) = delete; + S3IOWriter(S3IOWriter&&) = delete; + + S3IOWriter& + operator=(const S3IOWriter&) = delete; + S3IOWriter& + operator=(S3IOWriter&&) = delete; + + void + open(const std::string& name) override; void write(void* ptr, size_t size) override; @@ -28,7 +40,12 @@ class S3IOWriter : public IOWriter { size_t length() override; + void + close() override; + public: + std::string name_; + size_t len_; std::string buffer_; }; diff --git a/core/src/wrapper/VecIndex.cpp b/core/src/wrapper/VecIndex.cpp index 142b1e1d49..a81dfe52e8 100644 --- a/core/src/wrapper/VecIndex.cpp +++ b/core/src/wrapper/VecIndex.cpp @@ -22,8 +22,8 @@ #include "knowhere/index/vector_index/IndexIVFSQ.h" #include "knowhere/index/vector_index/IndexNSG.h" #include "knowhere/index/vector_index/IndexSPTAG.h" -#include "storage/file/FileIOReader.h" -#include "storage/file/FileIOWriter.h" +#include "storage/disk/DiskIOReader.h" +#include "storage/disk/DiskIOWriter.h" #include "storage/s3/S3IOReader.h" #include "storage/s3/S3IOWriter.h" #include "utils/Exception.h" @@ -179,12 +179,13 @@ read_index(const std::string& location) { std::shared_ptr reader_ptr; if (s3_enable) { - reader_ptr = std::make_shared(location); + reader_ptr = std::make_shared(); } else { - reader_ptr = std::make_shared(location); + reader_ptr = std::make_shared(); } recorder.RecordSection("Start"); + reader_ptr->open(location); size_t length = reader_ptr->length(); if (length <= 0) { @@ -226,6 +227,8 @@ read_index(const std::string& location) { delete[] meta; } + reader_ptr->close(); + double span = recorder.RecordSection("End"); double rate = length * 1000000.0 / span / 1024 / 1024; STORAGE_LOG_DEBUG << "read_index(" << location << ") rate " << rate << "MB/s"; @@ -252,12 +255,13 @@ write_index(VecIndexPtr index, const std::string& location) { std::shared_ptr writer_ptr; if (s3_enable) { - writer_ptr = std::make_shared(location); + writer_ptr = std::make_shared(); } else { - writer_ptr = std::make_shared(location); + writer_ptr = std::make_shared(); } recorder.RecordSection("Start"); + writer_ptr->open(location); writer_ptr->write(&index_type, sizeof(IndexType)); @@ -273,6 +277,8 @@ write_index(VecIndexPtr index, const std::string& location) { writer_ptr->write((void*)binary->data.get(), binary_length); } + writer_ptr->close(); + double span = recorder.RecordSection("End"); double rate = writer_ptr->length() * 1000000.0 / span / 1024 / 1024; STORAGE_LOG_DEBUG << "write_index(" << location << ") rate " << rate << "MB/s"; diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index 8d41b731fe..6674fdf032 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -98,11 +98,11 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_files) aux_source_directory(${MILVUS_ENGINE_SRC}/tracing tracing_files) aux_source_directory(${MILVUS_ENGINE_SRC}/storage storage_main_files) -aux_source_directory(${MILVUS_ENGINE_SRC}/storage/file storage_file_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/storage/disk storage_disk_files) aux_source_directory(${MILVUS_ENGINE_SRC}/storage/s3 storage_s3_files) set(storage_files ${storage_main_files} - ${storage_file_files} + ${storage_disk_files} ${storage_s3_files} ) @@ -111,8 +111,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files) aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files) -aux_source_directory(${MILVUS_ENGINE_SRC}/store store_files) - set(entry_file ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp) @@ -146,7 +144,6 @@ set(common_files ${codecs_files} ${codecs_default_files} ${segment_files} - ${store_files} ) set(unittest_libs diff --git a/core/unittest/storage/test_s3_client.cpp b/core/unittest/storage/test_s3_client.cpp index 1b326d57a7..e868b9d0a5 100644 --- a/core/unittest/storage/test_s3_client.cpp +++ b/core/unittest/storage/test_s3_client.cpp @@ -21,7 +21,6 @@ #include "storage/s3/S3ClientWrapper.h" #include "storage/s3/S3IOReader.h" #include "storage/s3/S3IOWriter.h" -#include "storage/IStorage.h" #include "storage/utils.h" INITIALIZE_EASYLOGGINGPP @@ -91,15 +90,18 @@ TEST_F(StorageTest, S3_RW_TEST) { ASSERT_TRUE(storage_inst.StartService().ok()); { - milvus::storage::S3IOWriter writer(index_name); + milvus::storage::S3IOWriter writer; + writer.open(index_name); size_t len = content.length(); writer.write(&len, sizeof(len)); writer.write((void*)(content.data()), len); ASSERT_TRUE(len + sizeof(len) == writer.length()); + writer.close(); } { - milvus::storage::S3IOReader reader(index_name); + milvus::storage::S3IOReader reader; + reader.open(index_name); size_t length = reader.length(); size_t rp = 0; reader.seekg(rp); @@ -121,6 +123,7 @@ TEST_F(StorageTest, S3_RW_TEST) { } ASSERT_TRUE(content == content_out); + reader.close(); } storage_inst.StopService(); diff --git a/core/unittest/wrapper/CMakeLists.txt b/core/unittest/wrapper/CMakeLists.txt index 7cf4846610..487477f3da 100644 --- a/core/unittest/wrapper/CMakeLists.txt +++ b/core/unittest/wrapper/CMakeLists.txt @@ -27,8 +27,8 @@ set(wrapper_files ) set(storage_files - ${MILVUS_ENGINE_SRC}/storage/file/FileIOReader.cpp - ${MILVUS_ENGINE_SRC}/storage/file/FileIOWriter.cpp + ${MILVUS_ENGINE_SRC}/storage/disk/DiskIOReader.cpp + ${MILVUS_ENGINE_SRC}/storage/disk/DiskIOWriter.cpp ) set(util_files