From 1d18c0a938d277e379f28741d184f93f4f7ec146 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Mon, 9 Mar 2020 13:08:17 +0800 Subject: [PATCH 01/12] #1547 rename storage/file to storage/disk Signed-off-by: yudong.cai --- core/src/CMakeLists.txt | 4 +- core/src/storage/IStorage.h | 44 ------------------- .../DiskIOReader.cpp} | 12 ++--- .../FileIOReader.h => disk/DiskIOReader.h} | 6 +-- .../DiskIOWriter.cpp} | 10 ++--- .../FileIOWriter.h => disk/DiskIOWriter.h} | 6 +-- core/src/storage/s3/S3ClientWrapper.h | 23 +++++----- core/src/wrapper/VecIndex.cpp | 8 ++-- core/unittest/CMakeLists.txt | 4 +- core/unittest/storage/test_s3_client.cpp | 1 - core/unittest/wrapper/CMakeLists.txt | 4 +- 11 files changed, 39 insertions(+), 83 deletions(-) delete mode 100644 core/src/storage/IStorage.h rename core/src/storage/{file/FileIOReader.cpp => disk/DiskIOReader.cpp} (79%) rename core/src/storage/{file/FileIOReader.h => disk/DiskIOReader.h} (89%) rename core/src/storage/{file/FileIOWriter.cpp => disk/DiskIOWriter.cpp} (81%) rename core/src/storage/{file/FileIOWriter.h => disk/DiskIOWriter.h} (88%) diff --git a/core/src/CMakeLists.txt b/core/src/CMakeLists.txt index 3978a407fd..ef1af69eb9 100644 --- a/core/src/CMakeLists.txt +++ b/core/src/CMakeLists.txt @@ -106,11 +106,11 @@ set(web_server_files ) aux_source_directory(${MILVUS_ENGINE_SRC}/storage storage_main_files) -aux_source_directory(${MILVUS_ENGINE_SRC}/storage/file storage_file_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/storage/disk storage_disk_files) aux_source_directory(${MILVUS_ENGINE_SRC}/storage/s3 storage_s3_files) set(storage_files ${storage_main_files} - ${storage_file_files} + ${storage_disk_files} ${storage_s3_files} ) diff --git a/core/src/storage/IStorage.h b/core/src/storage/IStorage.h deleted file mode 100644 index 6839f2bfca..0000000000 --- a/core/src/storage/IStorage.h +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -#pragma once - -#include -#include -#include "utils/Status.h" - -namespace milvus { -namespace storage { - -class IStorage { - public: - virtual Status - CreateBucket() = 0; - virtual Status - DeleteBucket() = 0; - virtual Status - PutObjectFile(const std::string& object_name, const std::string& file_path) = 0; - virtual Status - PutObjectStr(const std::string& object_name, const std::string& content) = 0; - virtual Status - GetObjectFile(const std::string& object_name, const std::string& file_path) = 0; - virtual Status - GetObjectStr(const std::string& object_name, std::string& content) = 0; - virtual Status - ListObjects(std::vector& object_list, const std::string& marker = "") = 0; - virtual Status - DeleteObject(const std::string& object_name) = 0; - virtual Status - DeleteObjects(const std::string& marker) = 0; -}; - -} // namespace storage -} // namespace milvus diff --git a/core/src/storage/file/FileIOReader.cpp b/core/src/storage/disk/DiskIOReader.cpp similarity index 79% rename from core/src/storage/file/FileIOReader.cpp rename to core/src/storage/disk/DiskIOReader.cpp index 9aab29e344..2b3e649f47 100644 --- a/core/src/storage/file/FileIOReader.cpp +++ b/core/src/storage/disk/DiskIOReader.cpp @@ -9,31 +9,31 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. -#include "storage/file/FileIOReader.h" +#include "storage/disk/DiskIOReader.h" namespace milvus { namespace storage { -FileIOReader::FileIOReader(const std::string& name) : IOReader(name) { +DiskIOReader::DiskIOReader(const std::string& name) : IOReader(name) { fs_ = std::fstream(name_, std::ios::in | std::ios::binary); } -FileIOReader::~FileIOReader() { +DiskIOReader::~DiskIOReader() { fs_.close(); } void -FileIOReader::read(void* ptr, size_t size) { +DiskIOReader::read(void* ptr, size_t size) { fs_.read(reinterpret_cast(ptr), size); } void -FileIOReader::seekg(size_t pos) { +DiskIOReader::seekg(size_t pos) { fs_.seekg(pos); } size_t -FileIOReader::length() { +DiskIOReader::length() { fs_.seekg(0, fs_.end); return fs_.tellg(); } diff --git a/core/src/storage/file/FileIOReader.h b/core/src/storage/disk/DiskIOReader.h similarity index 89% rename from core/src/storage/file/FileIOReader.h rename to core/src/storage/disk/DiskIOReader.h index ed35c39d6c..08aa2fdd3f 100644 --- a/core/src/storage/file/FileIOReader.h +++ b/core/src/storage/disk/DiskIOReader.h @@ -18,10 +18,10 @@ namespace milvus { namespace storage { -class FileIOReader : public IOReader { +class DiskIOReader : public IOReader { public: - explicit FileIOReader(const std::string& name); - ~FileIOReader(); + explicit DiskIOReader(const std::string& name); + ~DiskIOReader(); void read(void* ptr, size_t size) override; diff --git a/core/src/storage/file/FileIOWriter.cpp b/core/src/storage/disk/DiskIOWriter.cpp similarity index 81% rename from core/src/storage/file/FileIOWriter.cpp rename to core/src/storage/disk/DiskIOWriter.cpp index 3992b58369..08e7704529 100644 --- a/core/src/storage/file/FileIOWriter.cpp +++ b/core/src/storage/disk/DiskIOWriter.cpp @@ -9,27 +9,27 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. -#include "storage/file/FileIOWriter.h" +#include "storage/disk/DiskIOWriter.h" namespace milvus { namespace storage { -FileIOWriter::FileIOWriter(const std::string& name) : IOWriter(name) { +DiskIOWriter::DiskIOWriter(const std::string& name) : IOWriter(name) { fs_ = std::fstream(name_, std::ios::out | std::ios::binary); } -FileIOWriter::~FileIOWriter() { +DiskIOWriter::~DiskIOWriter() { fs_.close(); } void -FileIOWriter::write(void* ptr, size_t size) { +DiskIOWriter::write(void* ptr, size_t size) { fs_.write(reinterpret_cast(ptr), size); len_ += size; } size_t -FileIOWriter::length() { +DiskIOWriter::length() { return len_; } diff --git a/core/src/storage/file/FileIOWriter.h b/core/src/storage/disk/DiskIOWriter.h similarity index 88% rename from core/src/storage/file/FileIOWriter.h rename to core/src/storage/disk/DiskIOWriter.h index b06eb3feca..4bcbdaf2d3 100644 --- a/core/src/storage/file/FileIOWriter.h +++ b/core/src/storage/disk/DiskIOWriter.h @@ -18,10 +18,10 @@ namespace milvus { namespace storage { -class FileIOWriter : public IOWriter { +class DiskIOWriter : public IOWriter { public: - explicit FileIOWriter(const std::string& name); - ~FileIOWriter(); + explicit DiskIOWriter(const std::string& name); + ~DiskIOWriter(); void write(void* ptr, size_t size) override; diff --git a/core/src/storage/s3/S3ClientWrapper.h b/core/src/storage/s3/S3ClientWrapper.h index 452c5cc2e7..f0e2d8cfe7 100644 --- a/core/src/storage/s3/S3ClientWrapper.h +++ b/core/src/storage/s3/S3ClientWrapper.h @@ -16,12 +16,13 @@ #include #include #include -#include "storage/IStorage.h" + +#include "utils/Status.h" namespace milvus { namespace storage { -class S3ClientWrapper : public IStorage { +class S3ClientWrapper { public: static S3ClientWrapper& GetInstance() { @@ -35,23 +36,23 @@ class S3ClientWrapper : public IStorage { StopService(); Status - CreateBucket() override; + CreateBucket(); Status - DeleteBucket() override; + DeleteBucket(); Status - PutObjectFile(const std::string& object_key, const std::string& file_path) override; + PutObjectFile(const std::string& object_key, const std::string& file_path); Status - PutObjectStr(const std::string& object_key, const std::string& content) override; + PutObjectStr(const std::string& object_key, const std::string& content); Status - GetObjectFile(const std::string& object_key, const std::string& file_path) override; + GetObjectFile(const std::string& object_key, const std::string& file_path); Status - GetObjectStr(const std::string& object_key, std::string& content) override; + GetObjectStr(const std::string& object_key, std::string& content); Status - ListObjects(std::vector& object_list, const std::string& marker = "") override; + ListObjects(std::vector& object_list, const std::string& marker = ""); Status - DeleteObject(const std::string& object_key) override; + DeleteObject(const std::string& object_key); Status - DeleteObjects(const std::string& marker) override; + DeleteObjects(const std::string& marker); private: std::shared_ptr client_ptr_; diff --git a/core/src/wrapper/VecIndex.cpp b/core/src/wrapper/VecIndex.cpp index 9f5552c7a3..bc8f0626a0 100644 --- a/core/src/wrapper/VecIndex.cpp +++ b/core/src/wrapper/VecIndex.cpp @@ -23,8 +23,8 @@ #include "knowhere/index/vector_index/IndexNSG.h" #include "knowhere/index/vector_index/IndexSPTAG.h" #include "server/Config.h" -#include "storage/file/FileIOReader.h" -#include "storage/file/FileIOWriter.h" +#include "storage/disk/DiskIOReader.h" +#include "storage/disk/DiskIOWriter.h" #include "storage/s3/S3IOReader.h" #include "storage/s3/S3IOWriter.h" #include "utils/Exception.h" @@ -181,7 +181,7 @@ read_index(const std::string& location) { if (s3_enable) { reader_ptr = std::make_shared(location); } else { - reader_ptr = std::make_shared(location); + reader_ptr = std::make_shared(location); } recorder.RecordSection("Start"); @@ -254,7 +254,7 @@ write_index(VecIndexPtr index, const std::string& location) { if (s3_enable) { writer_ptr = std::make_shared(location); } else { - writer_ptr = std::make_shared(location); + writer_ptr = std::make_shared(location); } recorder.RecordSection("Start"); diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index 95b63523d5..aedbada0b3 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -98,11 +98,11 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_files) aux_source_directory(${MILVUS_ENGINE_SRC}/tracing tracing_files) aux_source_directory(${MILVUS_ENGINE_SRC}/storage storage_main_files) -aux_source_directory(${MILVUS_ENGINE_SRC}/storage/file storage_file_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/storage/disk storage_disk_files) aux_source_directory(${MILVUS_ENGINE_SRC}/storage/s3 storage_s3_files) set(storage_files ${storage_main_files} - ${storage_file_files} + ${storage_disk_files} ${storage_s3_files} ) diff --git a/core/unittest/storage/test_s3_client.cpp b/core/unittest/storage/test_s3_client.cpp index fab6d5e94b..96098a7b37 100644 --- a/core/unittest/storage/test_s3_client.cpp +++ b/core/unittest/storage/test_s3_client.cpp @@ -21,7 +21,6 @@ #include "storage/s3/S3ClientWrapper.h" #include "storage/s3/S3IOReader.h" #include "storage/s3/S3IOWriter.h" -#include "storage/IStorage.h" #include "storage/utils.h" INITIALIZE_EASYLOGGINGPP diff --git a/core/unittest/wrapper/CMakeLists.txt b/core/unittest/wrapper/CMakeLists.txt index 7cf4846610..487477f3da 100644 --- a/core/unittest/wrapper/CMakeLists.txt +++ b/core/unittest/wrapper/CMakeLists.txt @@ -27,8 +27,8 @@ set(wrapper_files ) set(storage_files - ${MILVUS_ENGINE_SRC}/storage/file/FileIOReader.cpp - ${MILVUS_ENGINE_SRC}/storage/file/FileIOWriter.cpp + ${MILVUS_ENGINE_SRC}/storage/disk/DiskIOReader.cpp + ${MILVUS_ENGINE_SRC}/storage/disk/DiskIOWriter.cpp ) set(util_files From ea63f65ceb2bedad36d586827cd95e02a38c586c Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Mon, 9 Mar 2020 15:11:55 +0800 Subject: [PATCH 02/12] #1537 add interface open()/close() for IOReader/IOWriter Signed-off-by: yudong.cai --- core/src/storage/IOReader.h | 9 ++++----- core/src/storage/IOWriter.h | 10 ++++------ core/src/storage/disk/DiskIOReader.cpp | 18 ++++++++++++------ core/src/storage/disk/DiskIOReader.h | 20 ++++++++++++++++++-- core/src/storage/disk/DiskIOWriter.cpp | 14 +++++++++----- core/src/storage/disk/DiskIOWriter.h | 21 +++++++++++++++++++-- core/src/storage/s3/S3IOReader.cpp | 12 ++++++++---- core/src/storage/s3/S3IOReader.h | 20 ++++++++++++++++++-- core/src/storage/s3/S3IOWriter.cpp | 14 +++++++++----- core/src/storage/s3/S3IOWriter.h | 21 +++++++++++++++++++-- core/src/wrapper/VecIndex.cpp | 14 ++++++++++---- core/unittest/storage/test_s3_client.cpp | 8 ++++++-- 12 files changed, 136 insertions(+), 45 deletions(-) diff --git a/core/src/storage/IOReader.h b/core/src/storage/IOReader.h index 0116602a08..adfeee95db 100644 --- a/core/src/storage/IOReader.h +++ b/core/src/storage/IOReader.h @@ -18,9 +18,8 @@ namespace storage { class IOReader { public: - explicit IOReader(const std::string& name) : name_(name) { - } - ~IOReader() = default; + virtual void + open(const std::string& name) = 0; virtual void read(void* ptr, size_t size) = 0; @@ -31,8 +30,8 @@ class IOReader { virtual size_t length() = 0; - public: - std::string name_; + virtual void + close() = 0; }; } // namespace storage diff --git a/core/src/storage/IOWriter.h b/core/src/storage/IOWriter.h index a2281a02bd..0662d8eb9d 100644 --- a/core/src/storage/IOWriter.h +++ b/core/src/storage/IOWriter.h @@ -18,9 +18,8 @@ namespace storage { class IOWriter { public: - explicit IOWriter(const std::string& name) : name_(name), len_(0) { - } - ~IOWriter() = default; + virtual void + open(const std::string& name) = 0; virtual void write(void* ptr, size_t size) = 0; @@ -28,9 +27,8 @@ class IOWriter { virtual size_t length() = 0; - public: - std::string name_; - size_t len_; + virtual void + close() = 0; }; } // namespace storage diff --git a/core/src/storage/disk/DiskIOReader.cpp b/core/src/storage/disk/DiskIOReader.cpp index 2b3e649f47..f51ffef0b3 100644 --- a/core/src/storage/disk/DiskIOReader.cpp +++ b/core/src/storage/disk/DiskIOReader.cpp @@ -14,14 +14,12 @@ namespace milvus { namespace storage { -DiskIOReader::DiskIOReader(const std::string& name) : IOReader(name) { +void +DiskIOReader::open(const std::string& name) { + name_ = name; fs_ = std::fstream(name_, std::ios::in | std::ios::binary); } -DiskIOReader::~DiskIOReader() { - fs_.close(); -} - void DiskIOReader::read(void* ptr, size_t size) { fs_.read(reinterpret_cast(ptr), size); @@ -35,7 +33,15 @@ DiskIOReader::seekg(size_t pos) { size_t DiskIOReader::length() { fs_.seekg(0, fs_.end); - return fs_.tellg(); + size_t len = fs_.tellg(); + fs_.seekg(0, fs_.beg); + return len; } + +void +DiskIOReader::close() { + fs_.close(); +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/disk/DiskIOReader.h b/core/src/storage/disk/DiskIOReader.h index 08aa2fdd3f..2fcf52457d 100644 --- a/core/src/storage/disk/DiskIOReader.h +++ b/core/src/storage/disk/DiskIOReader.h @@ -20,8 +20,20 @@ namespace storage { class DiskIOReader : public IOReader { public: - explicit DiskIOReader(const std::string& name); - ~DiskIOReader(); + DiskIOReader() = default; + ~DiskIOReader() = default; + + // No copy and move + DiskIOReader(const DiskIOReader&) = delete; + DiskIOReader(DiskIOReader&&) = delete; + + DiskIOReader& + operator=(const DiskIOReader&) = delete; + DiskIOReader& + operator=(DiskIOReader&&) = delete; + + void + open(const std::string& name) override; void read(void* ptr, size_t size) override; @@ -32,7 +44,11 @@ class DiskIOReader : public IOReader { size_t length() override; + void + close() override; + public: + std::string name_; std::fstream fs_; }; diff --git a/core/src/storage/disk/DiskIOWriter.cpp b/core/src/storage/disk/DiskIOWriter.cpp index 08e7704529..63899463b1 100644 --- a/core/src/storage/disk/DiskIOWriter.cpp +++ b/core/src/storage/disk/DiskIOWriter.cpp @@ -14,14 +14,13 @@ namespace milvus { namespace storage { -DiskIOWriter::DiskIOWriter(const std::string& name) : IOWriter(name) { +void +DiskIOWriter::open(const std::string& name) { + name_ = name; + len_ = 0; fs_ = std::fstream(name_, std::ios::out | std::ios::binary); } -DiskIOWriter::~DiskIOWriter() { - fs_.close(); -} - void DiskIOWriter::write(void* ptr, size_t size) { fs_.write(reinterpret_cast(ptr), size); @@ -33,5 +32,10 @@ DiskIOWriter::length() { return len_; } +void +DiskIOWriter::close() { + fs_.close(); +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/disk/DiskIOWriter.h b/core/src/storage/disk/DiskIOWriter.h index 4bcbdaf2d3..39c9b5ca68 100644 --- a/core/src/storage/disk/DiskIOWriter.h +++ b/core/src/storage/disk/DiskIOWriter.h @@ -20,8 +20,20 @@ namespace storage { class DiskIOWriter : public IOWriter { public: - explicit DiskIOWriter(const std::string& name); - ~DiskIOWriter(); + DiskIOWriter() = default; + ~DiskIOWriter() = default; + + // No copy and move + DiskIOWriter(const DiskIOWriter&) = delete; + DiskIOWriter(DiskIOWriter&&) = delete; + + DiskIOWriter& + operator=(const DiskIOWriter&) = delete; + DiskIOWriter& + operator=(DiskIOWriter&&) = delete; + + void + open(const std::string& name) override; void write(void* ptr, size_t size) override; @@ -29,7 +41,12 @@ class DiskIOWriter : public IOWriter { size_t length() override; + void + close() override; + public: + std::string name_; + size_t len_; std::fstream fs_; }; diff --git a/core/src/storage/s3/S3IOReader.cpp b/core/src/storage/s3/S3IOReader.cpp index 77edcb3861..e8d073029a 100644 --- a/core/src/storage/s3/S3IOReader.cpp +++ b/core/src/storage/s3/S3IOReader.cpp @@ -15,13 +15,13 @@ namespace milvus { namespace storage { -S3IOReader::S3IOReader(const std::string& name) : IOReader(name), pos_(0) { +void +S3IOReader::open(const std::string& name) { + name_ = name; + pos_ = 0; S3ClientWrapper::GetInstance().GetObjectStr(name_, buffer_); } -S3IOReader::~S3IOReader() { -} - void S3IOReader::read(void* ptr, size_t size) { memcpy(ptr, buffer_.data() + pos_, size); @@ -37,5 +37,9 @@ S3IOReader::length() { return buffer_.length(); } +void +S3IOReader::close() { +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/s3/S3IOReader.h b/core/src/storage/s3/S3IOReader.h index 5c2529c8e9..e69a64f969 100644 --- a/core/src/storage/s3/S3IOReader.h +++ b/core/src/storage/s3/S3IOReader.h @@ -19,8 +19,20 @@ namespace storage { class S3IOReader : public IOReader { public: - explicit S3IOReader(const std::string& name); - ~S3IOReader(); + S3IOReader() = default; + ~S3IOReader() = default; + + // No copy and move + S3IOReader(const S3IOReader&) = delete; + S3IOReader(S3IOReader&&) = delete; + + S3IOReader& + operator=(const S3IOReader&) = delete; + S3IOReader& + operator=(S3IOReader&&) = delete; + + void + open(const std::string& name) override; void read(void* ptr, size_t size) override; @@ -31,7 +43,11 @@ class S3IOReader : public IOReader { size_t length() override; + void + close() override; + public: + std::string name_; std::string buffer_; size_t pos_; }; diff --git a/core/src/storage/s3/S3IOWriter.cpp b/core/src/storage/s3/S3IOWriter.cpp index 51fff830fe..9d00db3c83 100644 --- a/core/src/storage/s3/S3IOWriter.cpp +++ b/core/src/storage/s3/S3IOWriter.cpp @@ -15,14 +15,13 @@ namespace milvus { namespace storage { -S3IOWriter::S3IOWriter(const std::string& name) : IOWriter(name) { +void +S3IOWriter::open(const std::string& name) { + name_ = name; + len_ = 0; buffer_ = ""; } -S3IOWriter::~S3IOWriter() { - S3ClientWrapper::GetInstance().PutObjectStr(name_, buffer_); -} - void S3IOWriter::write(void* ptr, size_t size) { buffer_ += std::string(reinterpret_cast(ptr), size); @@ -34,5 +33,10 @@ S3IOWriter::length() { return len_; } +void +S3IOWriter::close() { + S3ClientWrapper::GetInstance().PutObjectStr(name_, buffer_); +} + } // namespace storage } // namespace milvus diff --git a/core/src/storage/s3/S3IOWriter.h b/core/src/storage/s3/S3IOWriter.h index 53ec345afe..0b5240d7b1 100644 --- a/core/src/storage/s3/S3IOWriter.h +++ b/core/src/storage/s3/S3IOWriter.h @@ -19,8 +19,20 @@ namespace storage { class S3IOWriter : public IOWriter { public: - explicit S3IOWriter(const std::string& name); - ~S3IOWriter(); + S3IOWriter() = default; + ~S3IOWriter() = default; + + // No copy and move + S3IOWriter(const S3IOWriter&) = delete; + S3IOWriter(S3IOWriter&&) = delete; + + S3IOWriter& + operator=(const S3IOWriter&) = delete; + S3IOWriter& + operator=(S3IOWriter&&) = delete; + + void + open(const std::string& name) override; void write(void* ptr, size_t size) override; @@ -28,7 +40,12 @@ class S3IOWriter : public IOWriter { size_t length() override; + void + close() override; + public: + std::string name_; + size_t len_; std::string buffer_; }; diff --git a/core/src/wrapper/VecIndex.cpp b/core/src/wrapper/VecIndex.cpp index bc8f0626a0..e0ae664583 100644 --- a/core/src/wrapper/VecIndex.cpp +++ b/core/src/wrapper/VecIndex.cpp @@ -179,12 +179,13 @@ read_index(const std::string& location) { std::shared_ptr reader_ptr; if (s3_enable) { - reader_ptr = std::make_shared(location); + reader_ptr = std::make_shared(); } else { - reader_ptr = std::make_shared(location); + reader_ptr = std::make_shared(); } recorder.RecordSection("Start"); + reader_ptr->open(location); size_t length = reader_ptr->length(); if (length <= 0) { @@ -226,6 +227,8 @@ read_index(const std::string& location) { delete[] meta; } + reader_ptr->close(); + double span = recorder.RecordSection("End"); double rate = length * 1000000.0 / span / 1024 / 1024; STORAGE_LOG_DEBUG << "read_index(" << location << ") rate " << rate << "MB/s"; @@ -252,12 +255,13 @@ write_index(VecIndexPtr index, const std::string& location) { std::shared_ptr writer_ptr; if (s3_enable) { - writer_ptr = std::make_shared(location); + writer_ptr = std::make_shared(); } else { - writer_ptr = std::make_shared(location); + writer_ptr = std::make_shared(); } recorder.RecordSection("Start"); + writer_ptr->open(location); writer_ptr->write(&index_type, sizeof(IndexType)); @@ -273,6 +277,8 @@ write_index(VecIndexPtr index, const std::string& location) { writer_ptr->write((void*)binary->data.get(), binary_length); } + writer_ptr->close(); + double span = recorder.RecordSection("End"); double rate = writer_ptr->length() * 1000000.0 / span / 1024 / 1024; STORAGE_LOG_DEBUG << "write_index(" << location << ") rate " << rate << "MB/s"; diff --git a/core/unittest/storage/test_s3_client.cpp b/core/unittest/storage/test_s3_client.cpp index 96098a7b37..75e4cb5c3e 100644 --- a/core/unittest/storage/test_s3_client.cpp +++ b/core/unittest/storage/test_s3_client.cpp @@ -90,15 +90,18 @@ TEST_F(StorageTest, S3_RW_TEST) { ASSERT_TRUE(storage_inst.StartService().ok()); { - milvus::storage::S3IOWriter writer(index_name); + milvus::storage::S3IOWriter writer; + writer.open(index_name); size_t len = content.length(); writer.write(&len, sizeof(len)); writer.write((void*)(content.data()), len); ASSERT_TRUE(len + sizeof(len) == writer.length()); + writer.close(); } { - milvus::storage::S3IOReader reader(index_name); + milvus::storage::S3IOReader reader; + reader.open(index_name); size_t length = reader.length(); size_t rp = 0; reader.seekg(rp); @@ -120,6 +123,7 @@ TEST_F(StorageTest, S3_RW_TEST) { } ASSERT_TRUE(content == content_out); + reader.close(); } storage_inst.StopService(); From 4707493026b8f3d9cbe9fb73cb172c95a016b7d8 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Wed, 11 Mar 2020 18:48:12 +0800 Subject: [PATCH 03/12] #1547 update changelog Signed-off-by: yudong.cai --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb778c2c8a..fcc9169488 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -111,6 +111,7 @@ Please mark all change in change log and use the issue from GitHub - \#1537 Optimize raw vector and uids read/write - \#1544 Update resources name in HTTP module - \#1546 Move Config.cpp to config directory +- \#1547 Rename storage/file to storage/disk and rename classes - \#1567 Update yaml config description ## Task From 16aa46f6af84d4eaaab2bede9f4e43b78f1339cd Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Wed, 11 Mar 2020 21:05:00 +0800 Subject: [PATCH 04/12] #1547 update changelog Signed-off-by: yudong.cai --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fcc9169488..a603465f77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ Please mark all change in change log and use the issue from GitHub -# Milvus 0.7.0 (TBD) +# Milvus 0.7.0 (2020-03-11) ## Bug - \#715 Milvus crash when searching and building index simultaneously using SQ8H From 41db1d7a5c10b0d37ae953b568c5d787a0a01f34 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Wed, 11 Mar 2020 23:56:05 +0800 Subject: [PATCH 05/12] update changelog Signed-off-by: yudong.cai --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a603465f77..0524cb309a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ Please mark all change in change log and use the issue from GitHub +# Milvus 0.8.0 (TBD) + + # Milvus 0.7.0 (2020-03-11) ## Bug From 68ba8baa4eb94c949dd6dde99acd89e0822601f9 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Thu, 12 Mar 2020 09:37:14 +0800 Subject: [PATCH 06/12] #1547 update changelog Signed-off-by: yudong.cai --- CHANGELOG.md | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0524cb309a..f5708d203c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,17 @@ Please mark all change in change log and use the issue from GitHub # Milvus 0.8.0 (TBD) +## Bug + +## Feature + +## Improvement +- \#1537 Optimize raw vector and uids read/write +- \#1546 Move Config.cpp to config directory +- \#1547 Rename storage/file to storage/disk and rename classes + +## Task + # Milvus 0.7.0 (2020-03-11) @@ -111,10 +122,7 @@ Please mark all change in change log and use the issue from GitHub - \#1448 General proto api for NNS libraries - \#1480 Add return code for AVX512 selection - \#1524 Update config "preload_table" description -- \#1537 Optimize raw vector and uids read/write - \#1544 Update resources name in HTTP module -- \#1546 Move Config.cpp to config directory -- \#1547 Rename storage/file to storage/disk and rename classes - \#1567 Update yaml config description ## Task From 28a0f1de01c2b0dd425fe39a9db53cf2b453c416 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Mon, 9 Mar 2020 16:19:51 +0800 Subject: [PATCH 07/12] #1548 move store/Directory.cpp to storage Signed-off-by: yudong.cai --- core/src/CMakeLists.txt | 3 -- core/src/codecs/DeletedDocsFormat.h | 6 +-- core/src/codecs/IdBloomFilterFormat.h | 8 +-- core/src/codecs/VectorsFormat.h | 10 ++-- .../default/DefaultDeletedDocsFormat.cpp | 9 ++-- .../codecs/default/DefaultDeletedDocsFormat.h | 4 +- .../default/DefaultIdBloomFilterFormat.cpp | 12 ++--- .../default/DefaultIdBloomFilterFormat.h | 8 +-- .../codecs/default/DefaultVectorsFormat.cpp | 16 +++--- .../src/codecs/default/DefaultVectorsFormat.h | 9 ++-- core/src/segment/SegmentReader.cpp | 14 ++--- core/src/segment/SegmentReader.h | 4 +- core/src/segment/SegmentWriter.cpp | 20 ++++---- core/src/segment/SegmentWriter.h | 4 +- core/src/storage/Operation.h | 51 +++++++++++++++++++ .../disk/DiskOperation.cpp} | 25 +++++---- .../disk/DiskOperation.h} | 22 ++++---- core/unittest/CMakeLists.txt | 3 -- 18 files changed, 138 insertions(+), 90 deletions(-) create mode 100644 core/src/storage/Operation.h rename core/src/{store/Directory.cpp => storage/disk/DiskOperation.cpp} (82%) rename core/src/{store/Directory.h => storage/disk/DiskOperation.h} (80%) diff --git a/core/src/CMakeLists.txt b/core/src/CMakeLists.txt index ef1af69eb9..173e1aa293 100644 --- a/core/src/CMakeLists.txt +++ b/core/src/CMakeLists.txt @@ -125,8 +125,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files) aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files) -aux_source_directory(${MILVUS_ENGINE_SRC}/store store_files) - set(engine_files ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp ${cache_files} @@ -143,7 +141,6 @@ set(engine_files ${codecs_files} ${codecs_default_files} ${segment_files} - ${store_files} ) if (MILVUS_WITH_PROMETHEUS) diff --git a/core/src/codecs/DeletedDocsFormat.h b/core/src/codecs/DeletedDocsFormat.h index d5f263a238..1369664629 100644 --- a/core/src/codecs/DeletedDocsFormat.h +++ b/core/src/codecs/DeletedDocsFormat.h @@ -20,7 +20,7 @@ #include #include "segment/DeletedDocs.h" -#include "store/Directory.h" +#include "src/storage/disk/DiskOperation.h" namespace milvus { namespace codec { @@ -28,10 +28,10 @@ namespace codec { class DeletedDocsFormat { public: virtual void - read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) = 0; + read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) = 0; virtual void - write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0; + write(const storage::OperationPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0; }; using DeletedDocsFormatPtr = std::shared_ptr; diff --git a/core/src/codecs/IdBloomFilterFormat.h b/core/src/codecs/IdBloomFilterFormat.h index 11cafac773..17736bfaa4 100644 --- a/core/src/codecs/IdBloomFilterFormat.h +++ b/core/src/codecs/IdBloomFilterFormat.h @@ -20,7 +20,7 @@ #include #include "segment/IdBloomFilter.h" -#include "store/Directory.h" +#include "src/storage/disk/DiskOperation.h" namespace milvus { namespace codec { @@ -28,13 +28,13 @@ namespace codec { class IdBloomFilterFormat { public: virtual void - read(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; virtual void - write(const store::DirectoryPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; virtual void - create(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; }; using IdBloomFilterFormatPtr = std::shared_ptr; diff --git a/core/src/codecs/VectorsFormat.h b/core/src/codecs/VectorsFormat.h index cfb7504fd6..2fa77d628d 100644 --- a/core/src/codecs/VectorsFormat.h +++ b/core/src/codecs/VectorsFormat.h @@ -21,7 +21,7 @@ #include #include "segment/Vectors.h" -#include "store/Directory.h" +#include "src/storage/disk/DiskOperation.h" namespace milvus { namespace codec { @@ -29,16 +29,16 @@ namespace codec { class VectorsFormat { public: virtual void - read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) = 0; + read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) = 0; virtual void - write(const store::DirectoryPtr& directory_ptr, const segment::VectorsPtr& vectors) = 0; + write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) = 0; virtual void - read_uids(const store::DirectoryPtr& directory_ptr, std::vector& uids) = 0; + read_uids(const storage::OperationPtr& directory_ptr, std::vector& uids) = 0; virtual void - read_vectors(const store::DirectoryPtr& directory_ptr, off_t offset, size_t num_bytes, + read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes, std::vector& raw_vectors) = 0; }; diff --git a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp index 0b0039cd4f..1027e477a1 100644 --- a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp +++ b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp @@ -35,10 +35,10 @@ namespace milvus { namespace codec { void -DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) { +DefaultDeletedDocsFormat::read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = directory_ptr->GetDirectory(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664); @@ -75,10 +75,11 @@ DefaultDeletedDocsFormat::read(const store::DirectoryPtr& directory_ptr, segment } void -DefaultDeletedDocsFormat::write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) { +DefaultDeletedDocsFormat::write(const storage::OperationPtr& directory_ptr, + const segment::DeletedDocsPtr& deleted_docs) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = directory_ptr->GetDirectory(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; // Create a temporary file from the existing file diff --git a/core/src/codecs/default/DefaultDeletedDocsFormat.h b/core/src/codecs/default/DefaultDeletedDocsFormat.h index dd58297f35..f1e13ed574 100644 --- a/core/src/codecs/default/DefaultDeletedDocsFormat.h +++ b/core/src/codecs/default/DefaultDeletedDocsFormat.h @@ -30,10 +30,10 @@ class DefaultDeletedDocsFormat : public DeletedDocsFormat { DefaultDeletedDocsFormat() = default; void - read(const store::DirectoryPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) override; + read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) override; void - write(const store::DirectoryPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) override; + write(const storage::OperationPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) override; // No copy and move DefaultDeletedDocsFormat(const DefaultDeletedDocsFormat&) = delete; diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp index 7cbabcfb74..5eb759fd50 100644 --- a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp +++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp @@ -30,11 +30,11 @@ constexpr unsigned int bloom_filter_capacity = 500000; constexpr double bloom_filter_error_rate = 0.01; void -DefaultIdBloomFilterFormat::read(const store::DirectoryPtr& directory_ptr, +DefaultIdBloomFilterFormat::read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = directory_ptr->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; scaling_bloom_t* bloom_filter = new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str()); @@ -48,11 +48,11 @@ DefaultIdBloomFilterFormat::read(const store::DirectoryPtr& directory_ptr, } void -DefaultIdBloomFilterFormat::write(const store::DirectoryPtr& directory_ptr, +DefaultIdBloomFilterFormat::write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = directory_ptr->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) { std::string err_msg = @@ -63,9 +63,9 @@ DefaultIdBloomFilterFormat::write(const store::DirectoryPtr& directory_ptr, } void -DefaultIdBloomFilterFormat::create(const store::DirectoryPtr& directory_ptr, +DefaultIdBloomFilterFormat::create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) { - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = directory_ptr->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; scaling_bloom_t* bloom_filter = new_scaling_bloom(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str()); diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.h b/core/src/codecs/default/DefaultIdBloomFilterFormat.h index 7a5c4d29fd..ceb669ffff 100644 --- a/core/src/codecs/default/DefaultIdBloomFilterFormat.h +++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.h @@ -22,7 +22,7 @@ #include "codecs/IdBloomFilterFormat.h" #include "segment/IdBloomFilter.h" -#include "store/Directory.h" +#include "src/storage/disk/DiskOperation.h" namespace milvus { namespace codec { @@ -32,13 +32,13 @@ class DefaultIdBloomFilterFormat : public IdBloomFilterFormat { DefaultIdBloomFilterFormat() = default; void - read(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; void - write(const store::DirectoryPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; void - create(const store::DirectoryPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; // No copy and move DefaultIdBloomFilterFormat(const DefaultIdBloomFilterFormat&) = delete; diff --git a/core/src/codecs/default/DefaultVectorsFormat.cpp b/core/src/codecs/default/DefaultVectorsFormat.cpp index f32f0ae5ba..36936a07c0 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.cpp +++ b/core/src/codecs/default/DefaultVectorsFormat.cpp @@ -102,10 +102,10 @@ DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vect } void -DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::VectorsPtr& vectors_read) { +DefaultVectorsFormat::read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = directory_ptr->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; @@ -134,10 +134,10 @@ DefaultVectorsFormat::read(const store::DirectoryPtr& directory_ptr, segment::Ve } void -DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segment::VectorsPtr& vectors) { +DefaultVectorsFormat::write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = directory_ptr->GetDirectory(); const std::string rv_file_path = dir_path + "/" + vectors->GetName() + raw_vector_extension_; const std::string uid_file_path = dir_path + "/" + vectors->GetName() + user_id_extension_; @@ -197,10 +197,10 @@ DefaultVectorsFormat::write(const store::DirectoryPtr& directory_ptr, const segm } void -DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::vector& uids) { +DefaultVectorsFormat::read_uids(const storage::OperationPtr& directory_ptr, std::vector& uids) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = directory_ptr->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; @@ -221,11 +221,11 @@ DefaultVectorsFormat::read_uids(const store::DirectoryPtr& directory_ptr, std::v } void -DefaultVectorsFormat::read_vectors(const store::DirectoryPtr& directory_ptr, off_t offset, size_t num_bytes, +DefaultVectorsFormat::read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes, std::vector& raw_vectors) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirPath(); + std::string dir_path = directory_ptr->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; diff --git a/core/src/codecs/default/DefaultVectorsFormat.h b/core/src/codecs/default/DefaultVectorsFormat.h index 54c9b5278d..1fd2081fb9 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.h +++ b/core/src/codecs/default/DefaultVectorsFormat.h @@ -32,16 +32,17 @@ class DefaultVectorsFormat : public VectorsFormat { DefaultVectorsFormat() = default; void - read(const store::DirectoryPtr&, segment::VectorsPtr&) override; + read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) override; void - write(const store::DirectoryPtr&, const segment::VectorsPtr&) override; + write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) override; void - read_vectors(const store::DirectoryPtr&, off_t, size_t, std::vector&) override; + read_uids(const storage::OperationPtr& directory_ptr, std::vector& uids) override; void - read_uids(const store::DirectoryPtr&, std::vector&) override; + read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes, + std::vector& raw_vectors) override; // No copy and move DefaultVectorsFormat(const DefaultVectorsFormat&) = delete; diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index c0983a89be..6bbf497ba1 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -21,14 +21,14 @@ #include "Vectors.h" #include "codecs/default/DefaultCodec.h" -#include "store/Directory.h" +#include "src/storage/disk/DiskOperation.h" #include "utils/Log.h" namespace milvus { namespace segment { SegmentReader::SegmentReader(const std::string& directory) { - directory_ptr_ = std::make_shared(directory); + directory_ptr_ = std::make_shared(directory); segment_ptr_ = std::make_shared(); } @@ -43,7 +43,7 @@ SegmentReader::Load() { // TODO(zhiru) codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + directory_ptr_->CreateDirectory(); default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_); default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_); } catch (std::exception& e) { @@ -56,7 +56,7 @@ Status SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector& raw_vectors) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + directory_ptr_->CreateDirectory(); default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors); } catch (std::exception& e) { std::string err_msg = "Failed to load raw vectors: " + std::string(e.what()); @@ -70,7 +70,7 @@ Status SegmentReader::LoadUids(std::vector& uids) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + directory_ptr_->CreateDirectory(); default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids); } catch (std::exception& e) { std::string err_msg = "Failed to load uids: " + std::string(e.what()); @@ -90,7 +90,7 @@ Status SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + directory_ptr_->CreateDirectory(); default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to load bloom filter: " + std::string(e.what()); @@ -104,7 +104,7 @@ Status SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + directory_ptr_->CreateDirectory(); default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to load deleted docs: " + std::string(e.what()); diff --git a/core/src/segment/SegmentReader.h b/core/src/segment/SegmentReader.h index 95531d5685..7a41a7939a 100644 --- a/core/src/segment/SegmentReader.h +++ b/core/src/segment/SegmentReader.h @@ -22,7 +22,7 @@ #include #include "segment/Types.h" -#include "store/Directory.h" +#include "src/storage/disk/DiskOperation.h" #include "utils/Status.h" namespace milvus { @@ -55,7 +55,7 @@ class SegmentReader { GetSegment(SegmentPtr& segment_ptr); private: - store::DirectoryPtr directory_ptr_; + storage::OperationPtr directory_ptr_; SegmentPtr segment_ptr_; }; diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index c80776e2f5..fa210ae36e 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -23,14 +23,14 @@ #include "SegmentReader.h" #include "Vectors.h" #include "codecs/default/DefaultCodec.h" -#include "store/Directory.h" +#include "src/storage/disk/DiskOperation.h" #include "utils/Log.h" namespace milvus { namespace segment { SegmentWriter::SegmentWriter(const std::string& directory) { - directory_ptr_ = std::make_shared(directory); + directory_ptr_ = std::make_shared(directory); segment_ptr_ = std::make_shared(); } @@ -84,7 +84,7 @@ Status SegmentWriter::WriteVectors() { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + directory_ptr_->CreateDirectory(); default_codec.GetVectorsFormat()->write(directory_ptr_, segment_ptr_->vectors_ptr_); } catch (std::exception& e) { std::string err_msg = "Failed to write vectors: " + std::string(e.what()); @@ -98,7 +98,7 @@ Status SegmentWriter::WriteBloomFilter() { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + directory_ptr_->CreateDirectory(); auto start = std::chrono::high_resolution_clock::now(); @@ -138,7 +138,7 @@ Status SegmentWriter::WriteDeletedDocs() { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + directory_ptr_->CreateDirectory(); DeletedDocsPtr deleted_docs_ptr = std::make_shared(); default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs_ptr); } catch (std::exception& e) { @@ -153,7 +153,7 @@ Status SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + directory_ptr_->CreateDirectory(); default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs); } catch (std::exception& e) { std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); @@ -167,7 +167,7 @@ Status SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->Create(); + directory_ptr_->CreateDirectory(); default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, id_bloom_filter_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to write bloom filter: " + std::string(e.what()); @@ -191,11 +191,11 @@ SegmentWriter::GetSegment(SegmentPtr& segment_ptr) { Status SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) { - if (dir_to_merge == directory_ptr_->GetDirPath()) { + if (dir_to_merge == directory_ptr_->GetDirectory()) { return Status(DB_ERROR, "Cannot Merge Self"); } - ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << directory_ptr_->GetDirPath(); + ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << directory_ptr_->GetDirectory(); auto start = std::chrono::high_resolution_clock::now(); @@ -234,7 +234,7 @@ SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) { ENGINE_LOG_DEBUG << "Adding " << segment_to_merge->vectors_ptr_->GetCount() << " vectors and uids took " << diff.count() << " s"; - ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << directory_ptr_->GetDirPath(); + ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << directory_ptr_->GetDirectory(); return Status::OK(); } diff --git a/core/src/segment/SegmentWriter.h b/core/src/segment/SegmentWriter.h index f7b6a7398c..a430cda9b8 100644 --- a/core/src/segment/SegmentWriter.h +++ b/core/src/segment/SegmentWriter.h @@ -22,7 +22,7 @@ #include #include "segment/Types.h" -#include "store/Directory.h" +#include "src/storage/disk/DiskOperation.h" #include "utils/Status.h" namespace milvus { @@ -70,7 +70,7 @@ class SegmentWriter { WriteDeletedDocs(); private: - store::DirectoryPtr directory_ptr_; + storage::OperationPtr directory_ptr_; SegmentPtr segment_ptr_; }; diff --git a/core/src/storage/Operation.h b/core/src/storage/Operation.h new file mode 100644 index 0000000000..a29069c7da --- /dev/null +++ b/core/src/storage/Operation.h @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +namespace milvus { +namespace storage { + +class Operation { + public: + virtual void + CreateDirectory() = 0; + + virtual const std::string& + GetDirectory() const = 0; + + virtual void + ListDirectory(std::vector& file_paths) = 0; + + virtual bool + DeleteFile(const std::string& file_path) = 0; + + // TODO(zhiru): + // open(), sync(), close() + // function that opens a stream for reading file + // function that creates a new, empty file and returns an stream for appending data to this file + // function that creates a new, empty, temporary file and returns an stream for appending data to this file +}; + +using OperationPtr = std::shared_ptr; + +} // namespace storage +} // namespace milvus diff --git a/core/src/store/Directory.cpp b/core/src/storage/disk/DiskOperation.cpp similarity index 82% rename from core/src/store/Directory.cpp rename to core/src/storage/disk/DiskOperation.cpp index 1d41da004f..90a36c2bb2 100644 --- a/core/src/store/Directory.cpp +++ b/core/src/storage/disk/DiskOperation.cpp @@ -15,21 +15,20 @@ // specific language governing permissions and limitations // under the License. -#include "store/Directory.h" - #include +#include "storage/disk/DiskOperation.h" #include "utils/Exception.h" #include "utils/Log.h" namespace milvus { -namespace store { +namespace storage { -Directory::Directory(const std::string& dir_path) : dir_path_(dir_path) { +DiskOperation::DiskOperation(const std::string& dir_path) : dir_path_(dir_path) { } void -Directory::Create() { +DiskOperation::CreateDirectory() { if (!boost::filesystem::is_directory(dir_path_)) { auto ret = boost::filesystem::create_directory(dir_path_); if (!ret) { @@ -40,8 +39,13 @@ Directory::Create() { } } +const std::string& +DiskOperation::GetDirectory() const { + return dir_path_; +} + void -Directory::ListAll(std::vector& file_paths) { +DiskOperation::ListDirectory(std::vector& file_paths) { boost::filesystem::path target_path(dir_path_); typedef boost::filesystem::directory_iterator d_it; d_it it_end; @@ -54,14 +58,9 @@ Directory::ListAll(std::vector& file_paths) { } bool -Directory::DeleteFile(const std::string& file_path) { +DiskOperation::DeleteFile(const std::string& file_path) { return boost::filesystem::remove(file_path); } -const std::string& -Directory::GetDirPath() const { - return dir_path_; -} - -} // namespace store +} // namespace storage } // namespace milvus diff --git a/core/src/store/Directory.h b/core/src/storage/disk/DiskOperation.h similarity index 80% rename from core/src/store/Directory.h rename to core/src/storage/disk/DiskOperation.h index de1dc276ac..50c1a4ba3d 100644 --- a/core/src/store/Directory.h +++ b/core/src/storage/disk/DiskOperation.h @@ -21,25 +21,27 @@ #include #include +#include "storage/Operation.h" + namespace milvus { -namespace store { +namespace storage { -class Directory { +class DiskOperation : public Operation { public: - explicit Directory(const std::string& dir_path); + explicit DiskOperation(const std::string& dir_path); void - Create(); + CreateDirectory(); + + const std::string& + GetDirectory() const; void - ListAll(std::vector& file_paths); + ListDirectory(std::vector& file_paths); bool DeleteFile(const std::string& file_path); - const std::string& - GetDirPath() const; - // TODO(zhiru): // open(), sync(), close() // function that opens a stream for reading file @@ -50,7 +52,7 @@ class Directory { const std::string dir_path_; }; -using DirectoryPtr = std::shared_ptr; +using DiskOperationPtr = std::shared_ptr; -} // namespace store +} // namespace storage } // namespace milvus diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index cdc2f033f9..6674fdf032 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -111,8 +111,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/codecs/default codecs_default_files) aux_source_directory(${MILVUS_ENGINE_SRC}/segment segment_files) -aux_source_directory(${MILVUS_ENGINE_SRC}/store store_files) - set(entry_file ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp) @@ -146,7 +144,6 @@ set(common_files ${codecs_files} ${codecs_default_files} ${segment_files} - ${store_files} ) set(unittest_libs From 817ea8b9e1f4b980344927e272ba6a05e7001370 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Mon, 9 Mar 2020 16:58:15 +0800 Subject: [PATCH 08/12] #1548 add FSHandler Signed-off-by: yudong.cai --- core/src/codecs/DeletedDocsFormat.h | 6 +-- core/src/codecs/IdBloomFilterFormat.h | 8 ++-- core/src/codecs/VectorsFormat.h | 10 ++--- .../default/DefaultDeletedDocsFormat.cpp | 9 ++-- .../codecs/default/DefaultDeletedDocsFormat.h | 4 +- .../default/DefaultIdBloomFilterFormat.cpp | 13 +++--- .../default/DefaultIdBloomFilterFormat.h | 6 +-- .../codecs/default/DefaultVectorsFormat.cpp | 16 +++---- .../src/codecs/default/DefaultVectorsFormat.h | 8 ++-- core/src/segment/SegmentReader.cpp | 31 ++++++++------ core/src/segment/SegmentReader.h | 4 +- core/src/segment/SegmentWriter.cpp | 37 +++++++++------- core/src/segment/SegmentWriter.h | 4 +- core/src/storage/FSHandler.h | 42 +++++++++++++++++++ core/src/storage/IOReader.h | 3 ++ core/src/storage/IOWriter.h | 3 ++ 16 files changed, 130 insertions(+), 74 deletions(-) create mode 100644 core/src/storage/FSHandler.h diff --git a/core/src/codecs/DeletedDocsFormat.h b/core/src/codecs/DeletedDocsFormat.h index 1369664629..431a9a437a 100644 --- a/core/src/codecs/DeletedDocsFormat.h +++ b/core/src/codecs/DeletedDocsFormat.h @@ -20,7 +20,7 @@ #include #include "segment/DeletedDocs.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/FSHandler.h" namespace milvus { namespace codec { @@ -28,10 +28,10 @@ namespace codec { class DeletedDocsFormat { public: virtual void - read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) = 0; + read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) = 0; virtual void - write(const storage::OperationPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0; + write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0; }; using DeletedDocsFormatPtr = std::shared_ptr; diff --git a/core/src/codecs/IdBloomFilterFormat.h b/core/src/codecs/IdBloomFilterFormat.h index 17736bfaa4..a374dd70cb 100644 --- a/core/src/codecs/IdBloomFilterFormat.h +++ b/core/src/codecs/IdBloomFilterFormat.h @@ -20,7 +20,7 @@ #include #include "segment/IdBloomFilter.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/FSHandler.h" namespace milvus { namespace codec { @@ -28,13 +28,13 @@ namespace codec { class IdBloomFilterFormat { public: virtual void - read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; virtual void - write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; virtual void - create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; + create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0; }; using IdBloomFilterFormatPtr = std::shared_ptr; diff --git a/core/src/codecs/VectorsFormat.h b/core/src/codecs/VectorsFormat.h index 2fa77d628d..5227f9a6a0 100644 --- a/core/src/codecs/VectorsFormat.h +++ b/core/src/codecs/VectorsFormat.h @@ -21,7 +21,7 @@ #include #include "segment/Vectors.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/FSHandler.h" namespace milvus { namespace codec { @@ -29,16 +29,16 @@ namespace codec { class VectorsFormat { public: virtual void - read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) = 0; + read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) = 0; virtual void - write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) = 0; + write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) = 0; virtual void - read_uids(const storage::OperationPtr& directory_ptr, std::vector& uids) = 0; + read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector& uids) = 0; virtual void - read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes, + read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes, std::vector& raw_vectors) = 0; }; diff --git a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp index 1027e477a1..128b682543 100644 --- a/core/src/codecs/default/DefaultDeletedDocsFormat.cpp +++ b/core/src/codecs/default/DefaultDeletedDocsFormat.cpp @@ -35,10 +35,10 @@ namespace milvus { namespace codec { void -DefaultDeletedDocsFormat::read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) { +DefaultDeletedDocsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664); @@ -75,11 +75,10 @@ DefaultDeletedDocsFormat::read(const storage::OperationPtr& directory_ptr, segme } void -DefaultDeletedDocsFormat::write(const storage::OperationPtr& directory_ptr, - const segment::DeletedDocsPtr& deleted_docs) { +DefaultDeletedDocsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string del_file_path = dir_path + "/" + deleted_docs_filename_; // Create a temporary file from the existing file diff --git a/core/src/codecs/default/DefaultDeletedDocsFormat.h b/core/src/codecs/default/DefaultDeletedDocsFormat.h index f1e13ed574..d755245a84 100644 --- a/core/src/codecs/default/DefaultDeletedDocsFormat.h +++ b/core/src/codecs/default/DefaultDeletedDocsFormat.h @@ -30,10 +30,10 @@ class DefaultDeletedDocsFormat : public DeletedDocsFormat { DefaultDeletedDocsFormat() = default; void - read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) override; + read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) override; void - write(const storage::OperationPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) override; + write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) override; // No copy and move DefaultDeletedDocsFormat(const DefaultDeletedDocsFormat&) = delete; diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp index 5eb759fd50..ed7c40908b 100644 --- a/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp +++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.cpp @@ -30,11 +30,10 @@ constexpr unsigned int bloom_filter_capacity = 500000; constexpr double bloom_filter_error_rate = 0.01; void -DefaultIdBloomFilterFormat::read(const storage::OperationPtr& directory_ptr, - segment::IdBloomFilterPtr& id_bloom_filter_ptr) { +DefaultIdBloomFilterFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; scaling_bloom_t* bloom_filter = new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str()); @@ -48,11 +47,11 @@ DefaultIdBloomFilterFormat::read(const storage::OperationPtr& directory_ptr, } void -DefaultIdBloomFilterFormat::write(const storage::OperationPtr& directory_ptr, +DefaultIdBloomFilterFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) { std::string err_msg = @@ -63,9 +62,9 @@ DefaultIdBloomFilterFormat::write(const storage::OperationPtr& directory_ptr, } void -DefaultIdBloomFilterFormat::create(const storage::OperationPtr& directory_ptr, +DefaultIdBloomFilterFormat::create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) { - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_; scaling_bloom_t* bloom_filter = new_scaling_bloom(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str()); diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.h b/core/src/codecs/default/DefaultIdBloomFilterFormat.h index ceb669ffff..b2c89ebfa2 100644 --- a/core/src/codecs/default/DefaultIdBloomFilterFormat.h +++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.h @@ -32,13 +32,13 @@ class DefaultIdBloomFilterFormat : public IdBloomFilterFormat { DefaultIdBloomFilterFormat() = default; void - read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; void - write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; void - create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; + create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override; // No copy and move DefaultIdBloomFilterFormat(const DefaultIdBloomFilterFormat&) = delete; diff --git a/core/src/codecs/default/DefaultVectorsFormat.cpp b/core/src/codecs/default/DefaultVectorsFormat.cpp index 36936a07c0..412049fc9a 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.cpp +++ b/core/src/codecs/default/DefaultVectorsFormat.cpp @@ -102,10 +102,10 @@ DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vect } void -DefaultVectorsFormat::read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) { +DefaultVectorsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; @@ -134,10 +134,10 @@ DefaultVectorsFormat::read(const storage::OperationPtr& directory_ptr, segment:: } void -DefaultVectorsFormat::write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) { +DefaultVectorsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); const std::string rv_file_path = dir_path + "/" + vectors->GetName() + raw_vector_extension_; const std::string uid_file_path = dir_path + "/" + vectors->GetName() + user_id_extension_; @@ -197,10 +197,10 @@ DefaultVectorsFormat::write(const storage::OperationPtr& directory_ptr, const se } void -DefaultVectorsFormat::read_uids(const storage::OperationPtr& directory_ptr, std::vector& uids) { +DefaultVectorsFormat::read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector& uids) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; @@ -221,11 +221,11 @@ DefaultVectorsFormat::read_uids(const storage::OperationPtr& directory_ptr, std: } void -DefaultVectorsFormat::read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes, +DefaultVectorsFormat::read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes, std::vector& raw_vectors) { const std::lock_guard lock(mutex_); - std::string dir_path = directory_ptr->GetDirectory(); + std::string dir_path = fs_ptr->operation_ptr_->GetDirectory(); if (!boost::filesystem::is_directory(dir_path)) { std::string err_msg = "Directory: " + dir_path + "does not exist"; ENGINE_LOG_ERROR << err_msg; diff --git a/core/src/codecs/default/DefaultVectorsFormat.h b/core/src/codecs/default/DefaultVectorsFormat.h index 1fd2081fb9..bfb20f221b 100644 --- a/core/src/codecs/default/DefaultVectorsFormat.h +++ b/core/src/codecs/default/DefaultVectorsFormat.h @@ -32,16 +32,16 @@ class DefaultVectorsFormat : public VectorsFormat { DefaultVectorsFormat() = default; void - read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) override; + read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) override; void - write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) override; + write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) override; void - read_uids(const storage::OperationPtr& directory_ptr, std::vector& uids) override; + read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector& uids) override; void - read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes, + read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes, std::vector& raw_vectors) override; // No copy and move diff --git a/core/src/segment/SegmentReader.cpp b/core/src/segment/SegmentReader.cpp index 6bbf497ba1..757ed21c56 100644 --- a/core/src/segment/SegmentReader.cpp +++ b/core/src/segment/SegmentReader.cpp @@ -21,14 +21,19 @@ #include "Vectors.h" #include "codecs/default/DefaultCodec.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/disk/DiskIOReader.h" +#include "storage/disk/DiskIOWriter.h" +#include "storage/disk/DiskOperation.h" #include "utils/Log.h" namespace milvus { namespace segment { SegmentReader::SegmentReader(const std::string& directory) { - directory_ptr_ = std::make_shared(directory); + storage::IOReaderPtr reader_ptr = std::make_shared(); + storage::IOWriterPtr writer_ptr = std::make_shared(); + storage::OperationPtr operation_ptr = std::make_shared(directory); + fs_ptr_ = std::make_shared(reader_ptr, writer_ptr, operation_ptr); segment_ptr_ = std::make_shared(); } @@ -43,9 +48,9 @@ SegmentReader::Load() { // TODO(zhiru) codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_); - default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->read(fs_ptr_, segment_ptr_->vectors_ptr_); + default_codec.GetDeletedDocsFormat()->read(fs_ptr_, segment_ptr_->deleted_docs_ptr_); } catch (std::exception& e) { return Status(DB_ERROR, e.what()); } @@ -56,8 +61,8 @@ Status SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector& raw_vectors) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->read_vectors(fs_ptr_, offset, num_bytes, raw_vectors); } catch (std::exception& e) { std::string err_msg = "Failed to load raw vectors: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -70,8 +75,8 @@ Status SegmentReader::LoadUids(std::vector& uids) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->read_uids(fs_ptr_, uids); } catch (std::exception& e) { std::string err_msg = "Failed to load uids: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -90,8 +95,8 @@ Status SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetIdBloomFilterFormat()->read(fs_ptr_, id_bloom_filter_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to load bloom filter: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -104,8 +109,8 @@ Status SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetDeletedDocsFormat()->read(fs_ptr_, deleted_docs_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to load deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; diff --git a/core/src/segment/SegmentReader.h b/core/src/segment/SegmentReader.h index 7a41a7939a..48247e5deb 100644 --- a/core/src/segment/SegmentReader.h +++ b/core/src/segment/SegmentReader.h @@ -22,7 +22,7 @@ #include #include "segment/Types.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/FSHandler.h" #include "utils/Status.h" namespace milvus { @@ -55,7 +55,7 @@ class SegmentReader { GetSegment(SegmentPtr& segment_ptr); private: - storage::OperationPtr directory_ptr_; + storage::FSHandlerPtr fs_ptr_; SegmentPtr segment_ptr_; }; diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index fa210ae36e..2136e229c0 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -23,14 +23,19 @@ #include "SegmentReader.h" #include "Vectors.h" #include "codecs/default/DefaultCodec.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/disk/DiskIOReader.h" +#include "storage/disk/DiskIOWriter.h" +#include "storage/disk/DiskOperation.h" #include "utils/Log.h" namespace milvus { namespace segment { SegmentWriter::SegmentWriter(const std::string& directory) { - directory_ptr_ = std::make_shared(directory); + storage::IOReaderPtr reader_ptr = std::make_shared(); + storage::IOWriterPtr writer_ptr = std::make_shared(); + storage::OperationPtr operation_ptr = std::make_shared(directory); + fs_ptr_ = std::make_shared(reader_ptr, writer_ptr, operation_ptr); segment_ptr_ = std::make_shared(); } @@ -84,8 +89,8 @@ Status SegmentWriter::WriteVectors() { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetVectorsFormat()->write(directory_ptr_, segment_ptr_->vectors_ptr_); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetVectorsFormat()->write(fs_ptr_, segment_ptr_->vectors_ptr_); } catch (std::exception& e) { std::string err_msg = "Failed to write vectors: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -98,11 +103,11 @@ Status SegmentWriter::WriteBloomFilter() { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); + fs_ptr_->operation_ptr_->CreateDirectory(); auto start = std::chrono::high_resolution_clock::now(); - default_codec.GetIdBloomFilterFormat()->create(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_); + default_codec.GetIdBloomFilterFormat()->create(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_); auto end = std::chrono::high_resolution_clock::now(); std::chrono::duration diff = end - start; @@ -121,7 +126,7 @@ SegmentWriter::WriteBloomFilter() { start = std::chrono::high_resolution_clock::now(); - default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_); + default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_); end = std::chrono::high_resolution_clock::now(); diff = end - start; @@ -138,9 +143,9 @@ Status SegmentWriter::WriteDeletedDocs() { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); + fs_ptr_->operation_ptr_->CreateDirectory(); DeletedDocsPtr deleted_docs_ptr = std::make_shared(); - default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs_ptr); + default_codec.GetDeletedDocsFormat()->write(fs_ptr_, deleted_docs_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -153,8 +158,8 @@ Status SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetDeletedDocsFormat()->write(fs_ptr_, deleted_docs); } catch (std::exception& e) { std::string err_msg = "Failed to write deleted docs: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -167,8 +172,8 @@ Status SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) { codec::DefaultCodec default_codec; try { - directory_ptr_->CreateDirectory(); - default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, id_bloom_filter_ptr); + fs_ptr_->operation_ptr_->CreateDirectory(); + default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, id_bloom_filter_ptr); } catch (std::exception& e) { std::string err_msg = "Failed to write bloom filter: " + std::string(e.what()); ENGINE_LOG_ERROR << err_msg; @@ -191,11 +196,11 @@ SegmentWriter::GetSegment(SegmentPtr& segment_ptr) { Status SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) { - if (dir_to_merge == directory_ptr_->GetDirectory()) { + if (dir_to_merge == fs_ptr_->operation_ptr_->GetDirectory()) { return Status(DB_ERROR, "Cannot Merge Self"); } - ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << directory_ptr_->GetDirectory(); + ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory(); auto start = std::chrono::high_resolution_clock::now(); @@ -234,7 +239,7 @@ SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) { ENGINE_LOG_DEBUG << "Adding " << segment_to_merge->vectors_ptr_->GetCount() << " vectors and uids took " << diff.count() << " s"; - ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << directory_ptr_->GetDirectory(); + ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory(); return Status::OK(); } diff --git a/core/src/segment/SegmentWriter.h b/core/src/segment/SegmentWriter.h index a430cda9b8..b5d42761a1 100644 --- a/core/src/segment/SegmentWriter.h +++ b/core/src/segment/SegmentWriter.h @@ -22,7 +22,7 @@ #include #include "segment/Types.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/FSHandler.h" #include "utils/Status.h" namespace milvus { @@ -70,7 +70,7 @@ class SegmentWriter { WriteDeletedDocs(); private: - storage::OperationPtr directory_ptr_; + storage::FSHandlerPtr fs_ptr_; SegmentPtr segment_ptr_; }; diff --git a/core/src/storage/FSHandler.h b/core/src/storage/FSHandler.h new file mode 100644 index 0000000000..8b0f175bf9 --- /dev/null +++ b/core/src/storage/FSHandler.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "storage/IOReader.h" +#include "storage/IOWriter.h" +#include "storage/Operation.h" + +namespace milvus { +namespace storage { + +struct FSHandler { + IOReaderPtr reader_ptr_ = nullptr; + IOWriterPtr writer_ptr_ = nullptr; + OperationPtr operation_ptr_ = nullptr; + + FSHandler(IOReaderPtr& reader_ptr, IOWriterPtr& writer_ptr, OperationPtr& operation_ptr) + : reader_ptr_(reader_ptr), writer_ptr_(writer_ptr), operation_ptr_(operation_ptr) { + } +}; + +using FSHandlerPtr = std::shared_ptr; + +} // namespace storage +} // namespace milvus diff --git a/core/src/storage/IOReader.h b/core/src/storage/IOReader.h index adfeee95db..eeddb7438b 100644 --- a/core/src/storage/IOReader.h +++ b/core/src/storage/IOReader.h @@ -11,6 +11,7 @@ #pragma once +#include #include namespace milvus { @@ -34,5 +35,7 @@ class IOReader { close() = 0; }; +using IOReaderPtr = std::shared_ptr; + } // namespace storage } // namespace milvus diff --git a/core/src/storage/IOWriter.h b/core/src/storage/IOWriter.h index 0662d8eb9d..38b6887057 100644 --- a/core/src/storage/IOWriter.h +++ b/core/src/storage/IOWriter.h @@ -11,6 +11,7 @@ #pragma once +#include #include namespace milvus { @@ -31,5 +32,7 @@ class IOWriter { close() = 0; }; +using IOWriterPtr = std::shared_ptr; + } // namespace storage } // namespace milvus From a279da832e49d52019098ce477d9c187e8e7c46a Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Thu, 12 Mar 2020 11:43:50 +0800 Subject: [PATCH 09/12] #1548 move store/Directory to storage and add FSHandler Signed-off-by: yudong.cai --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5708d203c..e25f3e1afa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Please mark all change in change log and use the issue from GitHub - \#1537 Optimize raw vector and uids read/write - \#1546 Move Config.cpp to config directory - \#1547 Rename storage/file to storage/disk and rename classes +- \#1548 Move store/Directory to storage/Operation and add FSHandler ## Task From acfd6415a857eb2c683af2a9f8a097264793055c Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Thu, 12 Mar 2020 14:36:12 +0800 Subject: [PATCH 10/12] retry CI Signed-off-by: yudong.cai --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e25f3e1afa..c2723cad88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Please mark all change in change log and use the issue from GitHub ## Task + # Milvus 0.7.0 (2020-03-11) ## Bug From 7fc237f4bdceed585175185a1f4d9e550b02c288 Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Thu, 12 Mar 2020 14:49:11 +0800 Subject: [PATCH 11/12] retry CI Signed-off-by: yudong.cai --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c2723cad88..e25f3e1afa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,6 @@ Please mark all change in change log and use the issue from GitHub ## Task - # Milvus 0.7.0 (2020-03-11) ## Bug From 058c993a99edf84a58187955d6c95e7e2fa2fb0c Mon Sep 17 00:00:00 2001 From: "yudong.cai" Date: Thu, 12 Mar 2020 16:26:56 +0800 Subject: [PATCH 12/12] update header file Signed-off-by: yudong.cai --- core/src/codecs/default/DefaultIdBloomFilterFormat.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/codecs/default/DefaultIdBloomFilterFormat.h b/core/src/codecs/default/DefaultIdBloomFilterFormat.h index b2c89ebfa2..e35daad9ef 100644 --- a/core/src/codecs/default/DefaultIdBloomFilterFormat.h +++ b/core/src/codecs/default/DefaultIdBloomFilterFormat.h @@ -22,7 +22,7 @@ #include "codecs/IdBloomFilterFormat.h" #include "segment/IdBloomFilter.h" -#include "src/storage/disk/DiskOperation.h" +#include "storage/disk/DiskOperation.h" namespace milvus { namespace codec {