diff --git a/core/src/codecs/DeletedDocsFormat.h b/core/src/codecs/DeletedDocsFormat.h index 1369664629..431a9a437a 100644 --- a/core/src/codecs/DeletedDocsFormat.h +++ b/core/src/codecs/DeletedDocsFormat.h @@ -20,7 +20,7 @@ #include #include "segment/DeletedDocs.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/FSHandler.h" namespace milvus { namespace codec { @@ -28,10 +28,10 @@ namespace codec { class DeletedDocsFormat { public: virtual void - read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) = 0; + read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) = 0; virtual void - write(const storage::OperationPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0; + write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0; }; using DeletedDocsFormatPtr = std::shared_ptr; diff --git a/core/src/codecs/IdBloomFilterFormat.h b/core/src/codecs/IdBloomFilterFormat.h index 17736bfaa4..a374dd70cb 100644 --- a/core/src/codecs/IdBloomFilterFormat.h +++ b/core/src/codecs/IdBloomFilterFormat.h @@ -20,7 +20,7 @@ #include #include "segment/IdBloomFilter.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/FSHandler.h" namespace milvus { namespace codec { @@ -28,13 +28,13 @@ namespace codec { class IdBloomFilterFormat { public: virtual void - read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; virtual void - write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; virtual void - create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; }; using IdBloomFilterFormatPtr = std::shared_ptr; diff --git a/core/src/codecs/VectorsFormat.h b/core/src/codecs/VectorsFormat.h index 2fa77d628d..5227f9a6a0 100644 --- a/core/src/codecs/VectorsFormat.h +++ b/core/src/codecs/VectorsFormat.h @@ -21,7 +21,7 @@ #include #include "segment/Vectors.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/FSHandler.h" namespace milvus { namespace codec { @@ -29,16 +29,16 @@ namespace codec { class VectorsFormat { public: virtual void - read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) = 0; + read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) = 0; virtual void - write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) = 0; + write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) = 0; virtual void - read_uids(const storage::OperationPtr& directory_ptr, std::vector& uids) = 0; + read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector& uids) = 0; virtual void - read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes, + read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes, std::vector& raw_vectors) = 0; }; diff --git a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp index 1027e477a1..128b682543 100644 --- a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp +++ b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp @@ -35,10 +35,10 @@ namespace milvus { namespace codec { void -DefaultDeletedDocsFormat::read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) { +DefaultDeletedDocsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664); @@ -75,11 +75,10 @@ DefaultDeletedDocsFormat::read(const storage::OperationPtr& directory_ptr, segme } void -DefaultDeletedDocsFormat::write(const storage::OperationPtr& directory_ptr, - const segment::DeletedDocsPtr& deleted_docs) { +DefaultDeletedDocsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; // Create a temporary file from the existing file diff --git a/core/src/codecs/default/DefaultDeletedDocsFormat.h b/core/src/codecs/default/DefaultDeletedDocsFormat.h index f1e13ed574..d755245a84 100644 --- a/core/src/codecs/default/DefaultDeletedDocsFormat.h +++ b/core/src/codecs/default/DefaultDeletedDocsFormat.h @@ -30,10 +30,10 @@ class DefaultDeletedDocsFormat : public DeletedDocsFormat { DefaultDeletedDocsFormat() = default; void - read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) override; + read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) override; void - write(const storage::OperationPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) override; + write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) override; // No copy and move DefaultDeletedDocsFormat(const DefaultDeletedDocsFormat&) = delete; diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp index 5eb759fd50..ed7c40908b 100644 --- a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp +++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp @@ -30,11 +30,10 @@ constexpr unsigned int bloom_filter_capacity = 500000; constexpr double bloom_filter_error_rate = 0.01; void -DefaultIdBloomFilterFormat::read(const storage::OperationPtr& directory_ptr, - segment::IdBloomFilterPtr& id_bloom_filter_ptr) { +DefaultIdBloomFilterFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; scaling_bloom_t* bloom_filter = new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str()); @@ -48,11 +47,11 @@ DefaultIdBloomFilterFormat::read(const storage::OperationPtr& directory_ptr, } void -DefaultIdBloomFilterFormat::write(const storage::OperationPtr& directory_ptr, +DefaultIdBloomFilterFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) { std::string err_msg = @@ -63,9 +62,9 @@ DefaultIdBloomFilterFormat::write(const storage::OperationPtr& directory_ptr, } void -DefaultIdBloomFilterFormat::create(const storage::OperationPtr& directory_ptr, +DefaultIdBloomFilterFormat::create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) { - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; scaling_bloom_t* bloom_filter = new_scaling_bloom(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str()); diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.h b/core/src/codecs/default/DefaultIdBloomFilterFormat.h index ceb669ffff..b2c89ebfa2 100644 --- a/core/src/codecs/default/DefaultIdBloomFilterFormat.h +++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.h @@ -32,13 +32,13 @@ class DefaultIdBloomFilterFormat : public IdBloomFilterFormat { DefaultIdBloomFilterFormat() = default; void - read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; void - write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; void - create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; // No copy and move DefaultIdBloomFilterFormat(const DefaultIdBloomFilterFormat&) = delete; diff --git a/core/src/codecs/default/DefaultVectorsFormat.cpp b/core/src/codecs/default/DefaultVectorsFormat.cpp index 36936a07c0..412049fc9a 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.cpp +++ b/core/src/codecs/default/DefaultVectorsFormat.cpp @@ -102,10 +102,10 @@ DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vect } void -DefaultVectorsFormat::read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) { +DefaultVectorsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; @@ -134,10 +134,10 @@ DefaultVectorsFormat::read(const storage::OperationPtr& directory_ptr, segment:: } void -DefaultVectorsFormat::write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) { +DefaultVectorsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string rv_file_path = dir_path + "/" + vectors->GetName() + raw_vector_extension_; const std::string uid_file_path = dir_path + "/" + vectors->GetName() + user_id_extension_; @@ -197,10 +197,10 @@ DefaultVectorsFormat::write(const storage::OperationPtr& directory_ptr, const se } void -DefaultVectorsFormat::read_uids(const storage::OperationPtr& directory_ptr, std::vector& uids) { +DefaultVectorsFormat::read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector& uids) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; @@ -221,11 +221,11 @@ DefaultVectorsFormat::read_uids(const storage::OperationPtr& directory_ptr, std: } void -DefaultVectorsFormat::read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes, +DefaultVectorsFormat::read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes, std::vector& raw_vectors) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; diff --git a/core/src/codecs/default/DefaultVectorsFormat.h b/core/src/codecs/default/DefaultVectorsFormat.h index 1fd2081fb9..bfb20f221b 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.h +++ b/core/src/codecs/default/DefaultVectorsFormat.h @@ -32,16 +32,16 @@ class DefaultVectorsFormat : public VectorsFormat { DefaultVectorsFormat() = default; void - read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) override; + read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) override; void - write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) override; + write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) override; void - read_uids(const storage::OperationPtr& directory_ptr, std::vector& uids) override; + read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector& uids) override; void - read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes, + read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes, std::vector& raw_vectors) override; // No copy and move diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index 6bbf497ba1..757ed21c56 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -21,14 +21,19 @@ #include "Vectors.h" #include "codecs/default/DefaultCodec.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/disk/DiskIOReader.h" +#include "storage/disk/DiskIOWriter.h" +#include "storage/disk/DiskOperation.h" #include "utils/Log.h" namespace milvus { namespace segment { SegmentReader::SegmentReader(const std::string& directory) { - directory_ptr_ = std::make_shared(directory); + storage::IOReaderPtr reader_ptr = std::make_shared(); + storage::IOWriterPtr writer_ptr = std::make_shared(); + storage::OperationPtr operation_ptr = std::make_shared(directory); + fs_ptr_ = std::make_shared(reader_ptr, writer_ptr, operation_ptr); segment_ptr_ = std::make_shared(); } @@ -43,9 +48,9 @@ SegmentReader::Load() { // TODO(zhiru) codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_); - default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->read(fs_ptr_, segment_ptr_->vectors_ptr_); + default_codec.GetDeletedDocsFormat()->read(fs_ptr_, segment_ptr_->deleted_docs_ptr_); } catch (std::exception& e) { return Status(DB_ERROR, e.what()); } @@ -56,8 +61,8 @@ Status SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector& raw_vectors) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->read_vectors(fs_ptr_, offset, num_bytes, raw_vectors); } catch (std::exception& e) { std::string err_msg = "Failed to load raw vectors: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -70,8 +75,8 @@ Status SegmentReader::LoadUids(std::vector& uids) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->read_uids(fs_ptr_, uids); } catch (std::exception& e) { std::string err_msg = "Failed to load uids: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -90,8 +95,8 @@ Status SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetIdBloomFilterFormat()->read(fs_ptr_, id_bloom_filter_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to load bloom filter: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -104,8 +109,8 @@ Status SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetDeletedDocsFormat()->read(fs_ptr_, deleted_docs_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to load deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; diff --git a/core/src/segment/SegmentReader.h b/core/src/segment/SegmentReader.h index 7a41a7939a..48247e5deb 100644 --- a/core/src/segment/SegmentReader.h +++ b/core/src/segment/SegmentReader.h @@ -22,7 +22,7 @@ #include #include "segment/Types.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/FSHandler.h" #include "utils/Status.h" namespace milvus { @@ -55,7 +55,7 @@ class SegmentReader { GetSegment(SegmentPtr& segment_ptr); private: - storage::OperationPtr directory_ptr_; + storage::FSHandlerPtr fs_ptr_; SegmentPtr segment_ptr_; }; diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index fa210ae36e..2136e229c0 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -23,14 +23,19 @@ #include "SegmentReader.h" #include "Vectors.h" #include "codecs/default/DefaultCodec.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/disk/DiskIOReader.h" +#include "storage/disk/DiskIOWriter.h" +#include "storage/disk/DiskOperation.h" #include "utils/Log.h" namespace milvus { namespace segment { SegmentWriter::SegmentWriter(const std::string& directory) { - directory_ptr_ = std::make_shared(directory); + storage::IOReaderPtr reader_ptr = std::make_shared(); + storage::IOWriterPtr writer_ptr = std::make_shared(); + storage::OperationPtr operation_ptr = std::make_shared(directory); + fs_ptr_ = std::make_shared(reader_ptr, writer_ptr, operation_ptr); segment_ptr_ = std::make_shared(); } @@ -84,8 +89,8 @@ Status SegmentWriter::WriteVectors() { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetVectorsFormat()->write(directory_ptr_, segment_ptr_->vectors_ptr_); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->write(fs_ptr_, segment_ptr_->vectors_ptr_); } catch (std::exception& e) { std::string err_msg = "Failed to write vectors: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -98,11 +103,11 @@ Status SegmentWriter::WriteBloomFilter() { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); + fs_ptr_->operation_ptr_->CreateDirectory(); auto start = std::chrono::high_resolution_clock::now(); - default_codec.GetIdBloomFilterFormat()->create(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_); + default_codec.GetIdBloomFilterFormat()->create(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_); auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration diff = end - start; @@ -121,7 +126,7 @@ SegmentWriter::WriteBloomFilter() { start = std::chrono::high_resolution_clock::now(); - default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_); + default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_); end = std::chrono::high_resolution_clock::now(); diff = end - start; @@ -138,9 +143,9 @@ Status SegmentWriter::WriteDeletedDocs() { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); + fs_ptr_->operation_ptr_->CreateDirectory(); DeletedDocsPtr deleted_docs_ptr = std::make_shared(); - default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs_ptr); + default_codec.GetDeletedDocsFormat()->write(fs_ptr_, deleted_docs_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -153,8 +158,8 @@ Status SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetDeletedDocsFormat()->write(fs_ptr_, deleted_docs); } catch (std::exception& e) { std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -167,8 +172,8 @@ Status SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, id_bloom_filter_ptr); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, id_bloom_filter_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to write bloom filter: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -191,11 +196,11 @@ SegmentWriter::GetSegment(SegmentPtr& segment_ptr) { Status SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) { - if (dir_to_merge == directory_ptr_->GetDirectory()) { + if (dir_to_merge == fs_ptr_->operation_ptr_->GetDirectory()) { return Status(DB_ERROR, "Cannot Merge Self"); } - ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << directory_ptr_->GetDirectory(); + ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory(); auto start = std::chrono::high_resolution_clock::now(); @@ -234,7 +239,7 @@ SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) { ENGINE_LOG_DEBUG << "Adding " << segment_to_merge->vectors_ptr_->GetCount() << " vectors and uids took " << diff.count() << " s"; - ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << directory_ptr_->GetDirectory(); + ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory(); return Status::OK(); } diff --git a/core/src/segment/SegmentWriter.h b/core/src/segment/SegmentWriter.h index a430cda9b8..b5d42761a1 100644 --- a/core/src/segment/SegmentWriter.h +++ b/core/src/segment/SegmentWriter.h @@ -22,7 +22,7 @@ #include #include "segment/Types.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/FSHandler.h" #include "utils/Status.h" namespace milvus { @@ -70,7 +70,7 @@ class SegmentWriter { WriteDeletedDocs(); private: - storage::OperationPtr directory_ptr_; + storage::FSHandlerPtr fs_ptr_; SegmentPtr segment_ptr_; }; diff --git a/core/src/storage/FSHandler.h b/core/src/storage/FSHandler.h new file mode 100644 index 0000000000..8b0f175bf9 --- /dev/null +++ b/core/src/storage/FSHandler.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "storage/IOReader.h" +#include "storage/IOWriter.h" +#include "storage/Operation.h" + +namespace milvus { +namespace storage { + +struct FSHandler { + IOReaderPtr reader_ptr_ = nullptr; + IOWriterPtr writer_ptr_ = nullptr; + OperationPtr operation_ptr_ = nullptr; + + FSHandler(IOReaderPtr& reader_ptr, IOWriterPtr& writer_ptr, OperationPtr& operation_ptr) + : reader_ptr_(reader_ptr), writer_ptr_(writer_ptr), operation_ptr_(operation_ptr) { + } +}; + +using FSHandlerPtr = std::shared_ptr; + +} // namespace storage +} // namespace milvus diff --git a/core/src/storage/IOReader.h b/core/src/storage/IOReader.h index adfeee95db..eeddb7438b 100644 --- a/core/src/storage/IOReader.h +++ b/core/src/storage/IOReader.h @@ -11,6 +11,7 @@ #pragma once +#include #include namespace milvus { @@ -34,5 +35,7 @@ class IOReader { close() = 0; }; +using IOReaderPtr = std::shared_ptr; + } // namespace storage } // namespace milvus diff --git a/core/src/storage/IOWriter.h b/core/src/storage/IOWriter.h index 0662d8eb9d..38b6887057 100644 --- a/core/src/storage/IOWriter.h +++ b/core/src/storage/IOWriter.h @@ -11,6 +11,7 @@ #pragma once +#include #include namespace milvus { @@ -31,5 +32,7 @@ class IOWriter { close() = 0; }; +using IOWriterPtr = std::shared_ptr; + } // namespace storage } // namespace milvus