mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-05 02:12:48 +08:00
parent
28a0f1de01
commit
817ea8b9e1
@ -20,7 +20,7 @@
|
||||
#include <memory>
|
||||
|
||||
#include "segment/DeletedDocs.h"
|
||||
#include "src/storage/disk/DiskOperation.h"
|
||||
#include "storage/FSHandler.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace codec {
|
||||
@ -28,10 +28,10 @@ namespace codec {
|
||||
class DeletedDocsFormat {
|
||||
public:
|
||||
virtual void
|
||||
read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) = 0;
|
||||
read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) = 0;
|
||||
|
||||
virtual void
|
||||
write(const storage::OperationPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0;
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) = 0;
|
||||
};
|
||||
|
||||
using DeletedDocsFormatPtr = std::shared_ptr<DeletedDocsFormat>;
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
#include <memory>
|
||||
|
||||
#include "segment/IdBloomFilter.h"
|
||||
#include "src/storage/disk/DiskOperation.h"
|
||||
#include "storage/FSHandler.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace codec {
|
||||
@ -28,13 +28,13 @@ namespace codec {
|
||||
class IdBloomFilterFormat {
|
||||
public:
|
||||
virtual void
|
||||
read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
|
||||
read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
|
||||
|
||||
virtual void
|
||||
write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
|
||||
|
||||
virtual void
|
||||
create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
|
||||
create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) = 0;
|
||||
};
|
||||
|
||||
using IdBloomFilterFormatPtr = std::shared_ptr<IdBloomFilterFormat>;
|
||||
|
||||
@ -21,7 +21,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "segment/Vectors.h"
|
||||
#include "src/storage/disk/DiskOperation.h"
|
||||
#include "storage/FSHandler.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace codec {
|
||||
@ -29,16 +29,16 @@ namespace codec {
|
||||
class VectorsFormat {
|
||||
public:
|
||||
virtual void
|
||||
read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) = 0;
|
||||
read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) = 0;
|
||||
|
||||
virtual void
|
||||
write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) = 0;
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) = 0;
|
||||
|
||||
virtual void
|
||||
read_uids(const storage::OperationPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) = 0;
|
||||
read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector<segment::doc_id_t>& uids) = 0;
|
||||
|
||||
virtual void
|
||||
read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes,
|
||||
read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes,
|
||||
std::vector<uint8_t>& raw_vectors) = 0;
|
||||
};
|
||||
|
||||
|
||||
@ -35,10 +35,10 @@ namespace milvus {
|
||||
namespace codec {
|
||||
|
||||
void
|
||||
DefaultDeletedDocsFormat::read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) {
|
||||
DefaultDeletedDocsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = directory_ptr->GetDirectory();
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
|
||||
|
||||
int del_fd = open(del_file_path.c_str(), O_RDONLY, 00664);
|
||||
@ -75,11 +75,10 @@ DefaultDeletedDocsFormat::read(const storage::OperationPtr& directory_ptr, segme
|
||||
}
|
||||
|
||||
void
|
||||
DefaultDeletedDocsFormat::write(const storage::OperationPtr& directory_ptr,
|
||||
const segment::DeletedDocsPtr& deleted_docs) {
|
||||
DefaultDeletedDocsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = directory_ptr->GetDirectory();
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
const std::string del_file_path = dir_path + "/" + deleted_docs_filename_;
|
||||
|
||||
// Create a temporary file from the existing file
|
||||
|
||||
@ -30,10 +30,10 @@ class DefaultDeletedDocsFormat : public DeletedDocsFormat {
|
||||
DefaultDeletedDocsFormat() = default;
|
||||
|
||||
void
|
||||
read(const storage::OperationPtr& directory_ptr, segment::DeletedDocsPtr& deleted_docs) override;
|
||||
read(const storage::FSHandlerPtr& fs_ptr, segment::DeletedDocsPtr& deleted_docs) override;
|
||||
|
||||
void
|
||||
write(const storage::OperationPtr& directory_ptr, const segment::DeletedDocsPtr& deleted_docs) override;
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const segment::DeletedDocsPtr& deleted_docs) override;
|
||||
|
||||
// No copy and move
|
||||
DefaultDeletedDocsFormat(const DefaultDeletedDocsFormat&) = delete;
|
||||
|
||||
@ -30,11 +30,10 @@ constexpr unsigned int bloom_filter_capacity = 500000;
|
||||
constexpr double bloom_filter_error_rate = 0.01;
|
||||
|
||||
void
|
||||
DefaultIdBloomFilterFormat::read(const storage::OperationPtr& directory_ptr,
|
||||
segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
|
||||
DefaultIdBloomFilterFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = directory_ptr->GetDirectory();
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
|
||||
scaling_bloom_t* bloom_filter =
|
||||
new_scaling_bloom_from_file(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
|
||||
@ -48,11 +47,11 @@ DefaultIdBloomFilterFormat::read(const storage::OperationPtr& directory_ptr,
|
||||
}
|
||||
|
||||
void
|
||||
DefaultIdBloomFilterFormat::write(const storage::OperationPtr& directory_ptr,
|
||||
DefaultIdBloomFilterFormat::write(const storage::FSHandlerPtr& fs_ptr,
|
||||
const segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = directory_ptr->GetDirectory();
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
|
||||
if (scaling_bloom_flush(id_bloom_filter_ptr->GetBloomFilter()) == -1) {
|
||||
std::string err_msg =
|
||||
@ -63,9 +62,9 @@ DefaultIdBloomFilterFormat::write(const storage::OperationPtr& directory_ptr,
|
||||
}
|
||||
|
||||
void
|
||||
DefaultIdBloomFilterFormat::create(const storage::OperationPtr& directory_ptr,
|
||||
DefaultIdBloomFilterFormat::create(const storage::FSHandlerPtr& fs_ptr,
|
||||
segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
|
||||
std::string dir_path = directory_ptr->GetDirectory();
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
const std::string bloom_filter_file_path = dir_path + "/" + bloom_filter_filename_;
|
||||
scaling_bloom_t* bloom_filter =
|
||||
new_scaling_bloom(bloom_filter_capacity, bloom_filter_error_rate, bloom_filter_file_path.c_str());
|
||||
|
||||
@ -32,13 +32,13 @@ class DefaultIdBloomFilterFormat : public IdBloomFilterFormat {
|
||||
DefaultIdBloomFilterFormat() = default;
|
||||
|
||||
void
|
||||
read(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
|
||||
read(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
|
||||
|
||||
void
|
||||
write(const storage::OperationPtr& directory_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
|
||||
|
||||
void
|
||||
create(const storage::OperationPtr& directory_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
|
||||
create(const storage::FSHandlerPtr& fs_ptr, segment::IdBloomFilterPtr& id_bloom_filter_ptr) override;
|
||||
|
||||
// No copy and move
|
||||
DefaultIdBloomFilterFormat(const DefaultIdBloomFilterFormat&) = delete;
|
||||
|
||||
@ -102,10 +102,10 @@ DefaultVectorsFormat::read_uids_internal(const std::string& file_path, std::vect
|
||||
}
|
||||
|
||||
void
|
||||
DefaultVectorsFormat::read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) {
|
||||
DefaultVectorsFormat::read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = directory_ptr->GetDirectory();
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
if (!boost::filesystem::is_directory(dir_path)) {
|
||||
std::string err_msg = "Directory: " + dir_path + "does not exist";
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
@ -134,10 +134,10 @@ DefaultVectorsFormat::read(const storage::OperationPtr& directory_ptr, segment::
|
||||
}
|
||||
|
||||
void
|
||||
DefaultVectorsFormat::write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) {
|
||||
DefaultVectorsFormat::write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = directory_ptr->GetDirectory();
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
|
||||
const std::string rv_file_path = dir_path + "/" + vectors->GetName() + raw_vector_extension_;
|
||||
const std::string uid_file_path = dir_path + "/" + vectors->GetName() + user_id_extension_;
|
||||
@ -197,10 +197,10 @@ DefaultVectorsFormat::write(const storage::OperationPtr& directory_ptr, const se
|
||||
}
|
||||
|
||||
void
|
||||
DefaultVectorsFormat::read_uids(const storage::OperationPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) {
|
||||
DefaultVectorsFormat::read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector<segment::doc_id_t>& uids) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = directory_ptr->GetDirectory();
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
if (!boost::filesystem::is_directory(dir_path)) {
|
||||
std::string err_msg = "Directory: " + dir_path + "does not exist";
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
@ -221,11 +221,11 @@ DefaultVectorsFormat::read_uids(const storage::OperationPtr& directory_ptr, std:
|
||||
}
|
||||
|
||||
void
|
||||
DefaultVectorsFormat::read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes,
|
||||
DefaultVectorsFormat::read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes,
|
||||
std::vector<uint8_t>& raw_vectors) {
|
||||
const std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
std::string dir_path = directory_ptr->GetDirectory();
|
||||
std::string dir_path = fs_ptr->operation_ptr_->GetDirectory();
|
||||
if (!boost::filesystem::is_directory(dir_path)) {
|
||||
std::string err_msg = "Directory: " + dir_path + "does not exist";
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
|
||||
@ -32,16 +32,16 @@ class DefaultVectorsFormat : public VectorsFormat {
|
||||
DefaultVectorsFormat() = default;
|
||||
|
||||
void
|
||||
read(const storage::OperationPtr& directory_ptr, segment::VectorsPtr& vectors_read) override;
|
||||
read(const storage::FSHandlerPtr& fs_ptr, segment::VectorsPtr& vectors_read) override;
|
||||
|
||||
void
|
||||
write(const storage::OperationPtr& directory_ptr, const segment::VectorsPtr& vectors) override;
|
||||
write(const storage::FSHandlerPtr& fs_ptr, const segment::VectorsPtr& vectors) override;
|
||||
|
||||
void
|
||||
read_uids(const storage::OperationPtr& directory_ptr, std::vector<segment::doc_id_t>& uids) override;
|
||||
read_uids(const storage::FSHandlerPtr& fs_ptr, std::vector<segment::doc_id_t>& uids) override;
|
||||
|
||||
void
|
||||
read_vectors(const storage::OperationPtr& directory_ptr, off_t offset, size_t num_bytes,
|
||||
read_vectors(const storage::FSHandlerPtr& fs_ptr, off_t offset, size_t num_bytes,
|
||||
std::vector<uint8_t>& raw_vectors) override;
|
||||
|
||||
// No copy and move
|
||||
|
||||
@ -21,14 +21,19 @@
|
||||
|
||||
#include "Vectors.h"
|
||||
#include "codecs/default/DefaultCodec.h"
|
||||
#include "src/storage/disk/DiskOperation.h"
|
||||
#include "storage/disk/DiskIOReader.h"
|
||||
#include "storage/disk/DiskIOWriter.h"
|
||||
#include "storage/disk/DiskOperation.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace segment {
|
||||
|
||||
SegmentReader::SegmentReader(const std::string& directory) {
|
||||
directory_ptr_ = std::make_shared<storage::DiskOperation>(directory);
|
||||
storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
|
||||
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
|
||||
storage::OperationPtr operation_ptr = std::make_shared<storage::DiskOperation>(directory);
|
||||
fs_ptr_ = std::make_shared<storage::FSHandler>(reader_ptr, writer_ptr, operation_ptr);
|
||||
segment_ptr_ = std::make_shared<Segment>();
|
||||
}
|
||||
|
||||
@ -43,9 +48,9 @@ SegmentReader::Load() {
|
||||
// TODO(zhiru)
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
directory_ptr_->CreateDirectory();
|
||||
default_codec.GetVectorsFormat()->read(directory_ptr_, segment_ptr_->vectors_ptr_);
|
||||
default_codec.GetDeletedDocsFormat()->read(directory_ptr_, segment_ptr_->deleted_docs_ptr_);
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
default_codec.GetVectorsFormat()->read(fs_ptr_, segment_ptr_->vectors_ptr_);
|
||||
default_codec.GetDeletedDocsFormat()->read(fs_ptr_, segment_ptr_->deleted_docs_ptr_);
|
||||
} catch (std::exception& e) {
|
||||
return Status(DB_ERROR, e.what());
|
||||
}
|
||||
@ -56,8 +61,8 @@ Status
|
||||
SegmentReader::LoadVectors(off_t offset, size_t num_bytes, std::vector<uint8_t>& raw_vectors) {
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
directory_ptr_->CreateDirectory();
|
||||
default_codec.GetVectorsFormat()->read_vectors(directory_ptr_, offset, num_bytes, raw_vectors);
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
default_codec.GetVectorsFormat()->read_vectors(fs_ptr_, offset, num_bytes, raw_vectors);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to load raw vectors: " + std::string(e.what());
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
@ -70,8 +75,8 @@ Status
|
||||
SegmentReader::LoadUids(std::vector<doc_id_t>& uids) {
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
directory_ptr_->CreateDirectory();
|
||||
default_codec.GetVectorsFormat()->read_uids(directory_ptr_, uids);
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
default_codec.GetVectorsFormat()->read_uids(fs_ptr_, uids);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to load uids: " + std::string(e.what());
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
@ -90,8 +95,8 @@ Status
|
||||
SegmentReader::LoadBloomFilter(segment::IdBloomFilterPtr& id_bloom_filter_ptr) {
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
directory_ptr_->CreateDirectory();
|
||||
default_codec.GetIdBloomFilterFormat()->read(directory_ptr_, id_bloom_filter_ptr);
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
default_codec.GetIdBloomFilterFormat()->read(fs_ptr_, id_bloom_filter_ptr);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to load bloom filter: " + std::string(e.what());
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
@ -104,8 +109,8 @@ Status
|
||||
SegmentReader::LoadDeletedDocs(segment::DeletedDocsPtr& deleted_docs_ptr) {
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
directory_ptr_->CreateDirectory();
|
||||
default_codec.GetDeletedDocsFormat()->read(directory_ptr_, deleted_docs_ptr);
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
default_codec.GetDeletedDocsFormat()->read(fs_ptr_, deleted_docs_ptr);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to load deleted docs: " + std::string(e.what());
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
|
||||
@ -22,7 +22,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "segment/Types.h"
|
||||
#include "src/storage/disk/DiskOperation.h"
|
||||
#include "storage/FSHandler.h"
|
||||
#include "utils/Status.h"
|
||||
|
||||
namespace milvus {
|
||||
@ -55,7 +55,7 @@ class SegmentReader {
|
||||
GetSegment(SegmentPtr& segment_ptr);
|
||||
|
||||
private:
|
||||
storage::OperationPtr directory_ptr_;
|
||||
storage::FSHandlerPtr fs_ptr_;
|
||||
SegmentPtr segment_ptr_;
|
||||
};
|
||||
|
||||
|
||||
@ -23,14 +23,19 @@
|
||||
#include "SegmentReader.h"
|
||||
#include "Vectors.h"
|
||||
#include "codecs/default/DefaultCodec.h"
|
||||
#include "src/storage/disk/DiskOperation.h"
|
||||
#include "storage/disk/DiskIOReader.h"
|
||||
#include "storage/disk/DiskIOWriter.h"
|
||||
#include "storage/disk/DiskOperation.h"
|
||||
#include "utils/Log.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace segment {
|
||||
|
||||
SegmentWriter::SegmentWriter(const std::string& directory) {
|
||||
directory_ptr_ = std::make_shared<storage::DiskOperation>(directory);
|
||||
storage::IOReaderPtr reader_ptr = std::make_shared<storage::DiskIOReader>();
|
||||
storage::IOWriterPtr writer_ptr = std::make_shared<storage::DiskIOWriter>();
|
||||
storage::OperationPtr operation_ptr = std::make_shared<storage::DiskOperation>(directory);
|
||||
fs_ptr_ = std::make_shared<storage::FSHandler>(reader_ptr, writer_ptr, operation_ptr);
|
||||
segment_ptr_ = std::make_shared<Segment>();
|
||||
}
|
||||
|
||||
@ -84,8 +89,8 @@ Status
|
||||
SegmentWriter::WriteVectors() {
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
directory_ptr_->CreateDirectory();
|
||||
default_codec.GetVectorsFormat()->write(directory_ptr_, segment_ptr_->vectors_ptr_);
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
default_codec.GetVectorsFormat()->write(fs_ptr_, segment_ptr_->vectors_ptr_);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write vectors: " + std::string(e.what());
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
@ -98,11 +103,11 @@ Status
|
||||
SegmentWriter::WriteBloomFilter() {
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
directory_ptr_->CreateDirectory();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
|
||||
default_codec.GetIdBloomFilterFormat()->create(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_);
|
||||
default_codec.GetIdBloomFilterFormat()->create(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_);
|
||||
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
std::chrono::duration<double> diff = end - start;
|
||||
@ -121,7 +126,7 @@ SegmentWriter::WriteBloomFilter() {
|
||||
|
||||
start = std::chrono::high_resolution_clock::now();
|
||||
|
||||
default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, segment_ptr_->id_bloom_filter_ptr_);
|
||||
default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, segment_ptr_->id_bloom_filter_ptr_);
|
||||
|
||||
end = std::chrono::high_resolution_clock::now();
|
||||
diff = end - start;
|
||||
@ -138,9 +143,9 @@ Status
|
||||
SegmentWriter::WriteDeletedDocs() {
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
directory_ptr_->CreateDirectory();
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
DeletedDocsPtr deleted_docs_ptr = std::make_shared<DeletedDocs>();
|
||||
default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs_ptr);
|
||||
default_codec.GetDeletedDocsFormat()->write(fs_ptr_, deleted_docs_ptr);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
@ -153,8 +158,8 @@ Status
|
||||
SegmentWriter::WriteDeletedDocs(const DeletedDocsPtr& deleted_docs) {
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
directory_ptr_->CreateDirectory();
|
||||
default_codec.GetDeletedDocsFormat()->write(directory_ptr_, deleted_docs);
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
default_codec.GetDeletedDocsFormat()->write(fs_ptr_, deleted_docs);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write deleted docs: " + std::string(e.what());
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
@ -167,8 +172,8 @@ Status
|
||||
SegmentWriter::WriteBloomFilter(const IdBloomFilterPtr& id_bloom_filter_ptr) {
|
||||
codec::DefaultCodec default_codec;
|
||||
try {
|
||||
directory_ptr_->CreateDirectory();
|
||||
default_codec.GetIdBloomFilterFormat()->write(directory_ptr_, id_bloom_filter_ptr);
|
||||
fs_ptr_->operation_ptr_->CreateDirectory();
|
||||
default_codec.GetIdBloomFilterFormat()->write(fs_ptr_, id_bloom_filter_ptr);
|
||||
} catch (std::exception& e) {
|
||||
std::string err_msg = "Failed to write bloom filter: " + std::string(e.what());
|
||||
ENGINE_LOG_ERROR << err_msg;
|
||||
@ -191,11 +196,11 @@ SegmentWriter::GetSegment(SegmentPtr& segment_ptr) {
|
||||
|
||||
Status
|
||||
SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) {
|
||||
if (dir_to_merge == directory_ptr_->GetDirectory()) {
|
||||
if (dir_to_merge == fs_ptr_->operation_ptr_->GetDirectory()) {
|
||||
return Status(DB_ERROR, "Cannot Merge Self");
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << directory_ptr_->GetDirectory();
|
||||
ENGINE_LOG_DEBUG << "Merging from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory();
|
||||
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
|
||||
@ -234,7 +239,7 @@ SegmentWriter::Merge(const std::string& dir_to_merge, const std::string& name) {
|
||||
ENGINE_LOG_DEBUG << "Adding " << segment_to_merge->vectors_ptr_->GetCount() << " vectors and uids took "
|
||||
<< diff.count() << " s";
|
||||
|
||||
ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << directory_ptr_->GetDirectory();
|
||||
ENGINE_LOG_DEBUG << "Merging completed from " << dir_to_merge << " to " << fs_ptr_->operation_ptr_->GetDirectory();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -22,7 +22,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "segment/Types.h"
|
||||
#include "src/storage/disk/DiskOperation.h"
|
||||
#include "storage/FSHandler.h"
|
||||
#include "utils/Status.h"
|
||||
|
||||
namespace milvus {
|
||||
@ -70,7 +70,7 @@ class SegmentWriter {
|
||||
WriteDeletedDocs();
|
||||
|
||||
private:
|
||||
storage::OperationPtr directory_ptr_;
|
||||
storage::FSHandlerPtr fs_ptr_;
|
||||
SegmentPtr segment_ptr_;
|
||||
};
|
||||
|
||||
|
||||
42
core/src/storage/FSHandler.h
Normal file
42
core/src/storage/FSHandler.h
Normal file
@ -0,0 +1,42 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "storage/IOReader.h"
|
||||
#include "storage/IOWriter.h"
|
||||
#include "storage/Operation.h"
|
||||
|
||||
namespace milvus {
|
||||
namespace storage {
|
||||
|
||||
struct FSHandler {
|
||||
IOReaderPtr reader_ptr_ = nullptr;
|
||||
IOWriterPtr writer_ptr_ = nullptr;
|
||||
OperationPtr operation_ptr_ = nullptr;
|
||||
|
||||
FSHandler(IOReaderPtr& reader_ptr, IOWriterPtr& writer_ptr, OperationPtr& operation_ptr)
|
||||
: reader_ptr_(reader_ptr), writer_ptr_(writer_ptr), operation_ptr_(operation_ptr) {
|
||||
}
|
||||
};
|
||||
|
||||
using FSHandlerPtr = std::shared_ptr<FSHandler>;
|
||||
|
||||
} // namespace storage
|
||||
} // namespace milvus
|
||||
@ -11,6 +11,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
namespace milvus {
|
||||
@ -34,5 +35,7 @@ class IOReader {
|
||||
close() = 0;
|
||||
};
|
||||
|
||||
using IOReaderPtr = std::shared_ptr<IOReader>;
|
||||
|
||||
} // namespace storage
|
||||
} // namespace milvus
|
||||
|
||||
@ -11,6 +11,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
namespace milvus {
|
||||
@ -31,5 +32,7 @@ class IOWriter {
|
||||
close() = 0;
|
||||
};
|
||||
|
||||
using IOWriterPtr = std::shared_ptr<IOWriter>;
|
||||
|
||||
} // namespace storage
|
||||
} // namespace milvus
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user