From ea63f65ceb2bedad36d586827cd95e02a38c586c Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Mon, 9 Mar 2020 15:11:55 +0800 Subject: [PATCH] #1537 add interface open()/close() for IOReader/IOWriter Signed-off-by: yudong.cai --- core/src/storage/IOReader.h | 9 ++++----- core/src/storage/IOWriter.h | 10 ++++------ core/src/storage/disk/DiskIOReader.cpp | 18 ++++++++++++------ core/src/storage/disk/DiskIOReader.h | 20 ++++++++++++++++++-- core/src/storage/disk/DiskIOWriter.cpp | 14 +++++++++----- core/src/storage/disk/DiskIOWriter.h | 21 +++++++++++++++++++-- core/src/storage/s3/S3IOReader.cpp | 12 ++++++++---- core/src/storage/s3/S3IOReader.h | 20 ++++++++++++++++++-- core/src/storage/s3/S3IOWriter.cpp | 14 +++++++++----- core/src/storage/s3/S3IOWriter.h | 21 +++++++++++++++++++-- core/src/wrapper/VecIndex.cpp | 14 ++++++++++---- core/unittest/storage/test_s3_client.cpp | 8 ++++++-- 12 files changed, 136 insertions(+), 45 deletions(-) diff --git a/core/src/storage/IOReader.h b/core/src/storage/IOReader.h index 0116602a08..adfeee95db 100644 --- a/core/src/storage/IOReader.h +++ b/core/src/storage/IOReader.h @@ -18,9 +18,8 @@ namespace storage { class IOReader { public: - explicit IOReader(const std::string& name) : name_(name) { - } - ~IOReader() = default; + virtual void + open(const std::string& name) = 0; virtual void read(void* ptr, size_t size) = 0; @@ -31,8 +30,8 @@ class IOReader { virtual size_t length() = 0; - public: - std::string name_; + virtual void + close() = 0; }; } // namespace storage diff --git a/core/src/storage/IOWriter.h b/core/src/storage/IOWriter.h index a2281a02bd..0662d8eb9d 100644 --- a/core/src/storage/IOWriter.h +++ b/core/src/storage/IOWriter.h @@ -18,9 +18,8 @@ namespace storage { class IOWriter { public: - explicit IOWriter(const std::string& name) : name_(name), len_(0) { - } - ~IOWriter() = default; + virtual void + open(const std::string& name) = 0; virtual void write(void* ptr, size_t size) = 0; @@ -28,9 +27,8 @@ class IOWriter { virtual size_t length() = 0; - public: - std::string name_; - size_t len_; + virtual void + close() = 0; }; } // namespace storage diff --git a/core/src/storage/disk/DiskIOReader.cpp b/core/src/storage/disk/DiskIOReader.cpp index 2b3e649f47..f51ffef0b3 100644 --- a/core/src/storage/disk/DiskIOReader.cpp +++ b/core/src/storage/disk/DiskIOReader.cpp @@ -14,14 +14,12 @@ namespace milvus { namespace storage { -DiskIOReader::DiskIOReader(const std::string& name) : IOReader(name) { +void +DiskIOReader::open(const std::string& name) { + name_ = name; fs_ = std::fstream(name_, std::ios::in | std::ios::binary); } -DiskIOReader::~DiskIOReader() { - fs_.close(); -} - void DiskIOReader::read(void* ptr, size_t size) { fs_.read(reinterpret_cast(ptr), size); @@ -35,7 +33,15 @@ DiskIOReader::seekg(size_t pos) { size_t DiskIOReader::length() { fs_.seekg(0, fs_.end); - return fs_.tellg(); + size_t len = fs_.tellg(); + fs_.seekg(0, fs_.beg); + return len; } + +void +DiskIOReader::close() { + fs_.close(); +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/disk/DiskIOReader.h b/core/src/storage/disk/DiskIOReader.h index 08aa2fdd3f..2fcf52457d 100644 --- a/core/src/storage/disk/DiskIOReader.h +++ b/core/src/storage/disk/DiskIOReader.h @@ -20,8 +20,20 @@ namespace storage { class DiskIOReader : public IOReader { public: - explicit DiskIOReader(const std::string& name); - ~DiskIOReader(); + DiskIOReader() = default; + ~DiskIOReader() = default; + + // No copy and move + DiskIOReader(const DiskIOReader&) = delete; + DiskIOReader(DiskIOReader&&) = delete; + + DiskIOReader& + operator=(const DiskIOReader&) = delete; + DiskIOReader& + operator=(DiskIOReader&&) = delete; + + void + open(const std::string& name) override; void read(void* ptr, size_t size) override; @@ -32,7 +44,11 @@ class DiskIOReader : public IOReader { size_t length() override; + void + close() override; + public: + std::string name_; std::fstream fs_; }; diff --git a/core/src/storage/disk/DiskIOWriter.cpp b/core/src/storage/disk/DiskIOWriter.cpp index 08e7704529..63899463b1 100644 --- a/core/src/storage/disk/DiskIOWriter.cpp +++ b/core/src/storage/disk/DiskIOWriter.cpp @@ -14,14 +14,13 @@ namespace milvus { namespace storage { -DiskIOWriter::DiskIOWriter(const std::string& name) : IOWriter(name) { +void +DiskIOWriter::open(const std::string& name) { + name_ = name; + len_ = 0; fs_ = std::fstream(name_, std::ios::out | std::ios::binary); } -DiskIOWriter::~DiskIOWriter() { - fs_.close(); -} - void DiskIOWriter::write(void* ptr, size_t size) { fs_.write(reinterpret_cast(ptr), size); @@ -33,5 +32,10 @@ DiskIOWriter::length() { return len_; } +void +DiskIOWriter::close() { + fs_.close(); +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/disk/DiskIOWriter.h b/core/src/storage/disk/DiskIOWriter.h index 4bcbdaf2d3..39c9b5ca68 100644 --- a/core/src/storage/disk/DiskIOWriter.h +++ b/core/src/storage/disk/DiskIOWriter.h @@ -20,8 +20,20 @@ namespace storage { class DiskIOWriter : public IOWriter { public: - explicit DiskIOWriter(const std::string& name); - ~DiskIOWriter(); + DiskIOWriter() = default; + ~DiskIOWriter() = default; + + // No copy and move + DiskIOWriter(const DiskIOWriter&) = delete; + DiskIOWriter(DiskIOWriter&&) = delete; + + DiskIOWriter& + operator=(const DiskIOWriter&) = delete; + DiskIOWriter& + operator=(DiskIOWriter&&) = delete; + + void + open(const std::string& name) override; void write(void* ptr, size_t size) override; @@ -29,7 +41,12 @@ class DiskIOWriter : public IOWriter { size_t length() override; + void + close() override; + public: + std::string name_; + size_t len_; std::fstream fs_; }; diff --git a/core/src/storage/s3/S3IOReader.cpp b/core/src/storage/s3/S3IOReader.cpp index 77edcb3861..e8d073029a 100644 --- a/core/src/storage/s3/S3IOReader.cpp +++ b/core/src/storage/s3/S3IOReader.cpp @@ -15,13 +15,13 @@ namespace milvus { namespace storage { -S3IOReader::S3IOReader(const std::string& name) : IOReader(name), pos_(0) { +void +S3IOReader::open(const std::string& name) { + name_ = name; + pos_ = 0; S3ClientWrapper::GetInstance().GetObjectStr(name_, buffer_); } -S3IOReader::~S3IOReader() { -} - void S3IOReader::read(void* ptr, size_t size) { memcpy(ptr, buffer_.data() + pos_, size); @@ -37,5 +37,9 @@ S3IOReader::length() { return buffer_.length(); } +void +S3IOReader::close() { +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/s3/S3IOReader.h b/core/src/storage/s3/S3IOReader.h index 5c2529c8e9..e69a64f969 100644 --- a/core/src/storage/s3/S3IOReader.h +++ b/core/src/storage/s3/S3IOReader.h @@ -19,8 +19,20 @@ namespace storage { class S3IOReader : public IOReader { public: - explicit S3IOReader(const std::string& name); - ~S3IOReader(); + S3IOReader() = default; + ~S3IOReader() = default; + + // No copy and move + S3IOReader(const S3IOReader&) = delete; + S3IOReader(S3IOReader&&) = delete; + + S3IOReader& + operator=(const S3IOReader&) = delete; + S3IOReader& + operator=(S3IOReader&&) = delete; + + void + open(const std::string& name) override; void read(void* ptr, size_t size) override; @@ -31,7 +43,11 @@ class S3IOReader : public IOReader { size_t length() override; + void + close() override; + public: + std::string name_; std::string buffer_; size_t pos_; }; diff --git a/core/src/storage/s3/S3IOWriter.cpp b/core/src/storage/s3/S3IOWriter.cpp index 51fff830fe..9d00db3c83 100644 --- a/core/src/storage/s3/S3IOWriter.cpp +++ b/core/src/storage/s3/S3IOWriter.cpp @@ -15,14 +15,13 @@ namespace milvus { namespace storage { -S3IOWriter::S3IOWriter(const std::string& name) : IOWriter(name) { +void +S3IOWriter::open(const std::string& name) { + name_ = name; + len_ = 0; buffer_ = ""; } -S3IOWriter::~S3IOWriter() { - S3ClientWrapper::GetInstance().PutObjectStr(name_, buffer_); -} - void S3IOWriter::write(void* ptr, size_t size) { buffer_ += std::string(reinterpret_cast(ptr), size); @@ -34,5 +33,10 @@ S3IOWriter::length() { return len_; } +void +S3IOWriter::close() { + S3ClientWrapper::GetInstance().PutObjectStr(name_, buffer_); +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/s3/S3IOWriter.h b/core/src/storage/s3/S3IOWriter.h index 53ec345afe..0b5240d7b1 100644 --- a/core/src/storage/s3/S3IOWriter.h +++ b/core/src/storage/s3/S3IOWriter.h @@ -19,8 +19,20 @@ namespace storage { class S3IOWriter : public IOWriter { public: - explicit S3IOWriter(const std::string& name); - ~S3IOWriter(); + S3IOWriter() = default; + ~S3IOWriter() = default; + + // No copy and move + S3IOWriter(const S3IOWriter&) = delete; + S3IOWriter(S3IOWriter&&) = delete; + + S3IOWriter& + operator=(const S3IOWriter&) = delete; + S3IOWriter& + operator=(S3IOWriter&&) = delete; + + void + open(const std::string& name) override; void write(void* ptr, size_t size) override; @@ -28,7 +40,12 @@ class S3IOWriter : public IOWriter { size_t length() override; + void + close() override; + public: + std::string name_; + size_t len_; std::string buffer_; }; diff --git a/core/src/wrapper/VecIndex.cpp b/core/src/wrapper/VecIndex.cpp index bc8f0626a0..e0ae664583 100644 --- a/core/src/wrapper/VecIndex.cpp +++ b/core/src/wrapper/VecIndex.cpp @@ -179,12 +179,13 @@ read_index(const std::string& location) { std::shared_ptr reader_ptr; if (s3_enable) { - reader_ptr = std::make_shared(location); + reader_ptr = std::make_shared(); } else { - reader_ptr = std::make_shared(location); + reader_ptr = std::make_shared(); } recorder.RecordSection("Start"); + reader_ptr->open(location); size_t length = reader_ptr->length(); if (length <= 0) { @@ -226,6 +227,8 @@ read_index(const std::string& location) { delete[] meta; } + reader_ptr->close(); + double span = recorder.RecordSection("End"); double rate = length * 1000000.0 / span / 1024 / 1024; STORAGE_LOG_DEBUG << "read_index(" << location << ") rate " << rate << "MB/s"; @@ -252,12 +255,13 @@ write_index(VecIndexPtr index, const std::string& location) { std::shared_ptr writer_ptr; if (s3_enable) { - writer_ptr = std::make_shared(location); + writer_ptr = std::make_shared(); } else { - writer_ptr = std::make_shared(location); + writer_ptr = std::make_shared(); } recorder.RecordSection("Start"); + writer_ptr->open(location); writer_ptr->write(&index_type, sizeof(IndexType)); @@ -273,6 +277,8 @@ write_index(VecIndexPtr index, const std::string& location) { writer_ptr->write((void*)binary->data.get(), binary_length); } + writer_ptr->close(); + double span = recorder.RecordSection("End"); double rate = writer_ptr->length() * 1000000.0 / span / 1024 / 1024; STORAGE_LOG_DEBUG << "write_index(" << location << ") rate " << rate << "MB/s"; diff --git a/core/unittest/storage/test_s3_client.cpp b/core/unittest/storage/test_s3_client.cpp index 96098a7b37..75e4cb5c3e 100644 --- a/core/unittest/storage/test_s3_client.cpp +++ b/core/unittest/storage/test_s3_client.cpp @@ -90,15 +90,18 @@ TEST_F(StorageTest, S3_RW_TEST) { ASSERT_TRUE(storage_inst.StartService().ok()); { - milvus::storage::S3IOWriter writer(index_name); + milvus::storage::S3IOWriter writer; + writer.open(index_name); size_t len = content.length(); writer.write(&len, sizeof(len)); writer.write((void*)(content.data()), len); ASSERT_TRUE(len + sizeof(len) == writer.length()); + writer.close(); } { - milvus::storage::S3IOReader reader(index_name); + milvus::storage::S3IOReader reader; + reader.open(index_name); size_t length = reader.length(); size_t rp = 0; reader.seekg(rp); @@ -120,6 +123,7 @@ TEST_F(StorageTest, S3_RW_TEST) { } ASSERT_TRUE(content == content_out); + reader.close(); } storage_inst.StopService();