mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance:mmapchunkmanager allocates MmapChunkDescriptor itself (#42150)
issue: https://github.com/milvus-io/milvus/issues/42157 Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
This commit is contained in:
parent
5a355d1e57
commit
727f4ec24b
@ -930,10 +930,8 @@ ChunkedSegmentSealedImpl::ChunkedSegmentSealedImpl(
|
||||
return this->search_pk(pk, timestamp);
|
||||
},
|
||||
segment_id) {
|
||||
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptor>(
|
||||
new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed}));
|
||||
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
|
||||
mcm->Register(mmap_descriptor_);
|
||||
mmap_descriptor_ = mcm->Register();
|
||||
}
|
||||
|
||||
ChunkedSegmentSealedImpl::~ChunkedSegmentSealedImpl() {
|
||||
@ -1478,7 +1476,8 @@ ChunkedSegmentSealedImpl::HasRawData(int64_t field_id) const {
|
||||
SemiInlineGet(vector_indexings_.get_field_indexing(fieldID)
|
||||
->indexing_->PinCells({0}));
|
||||
auto vec_index = accessor->get_cell_of(0);
|
||||
return vec_index->HasRawData() || get_bit(field_data_ready_bitset_, fieldID);
|
||||
return vec_index->HasRawData() ||
|
||||
get_bit(field_data_ready_bitset_, fieldID);
|
||||
}
|
||||
} else if (IsJsonDataType(field_meta.get_data_type())) {
|
||||
return get_bit(field_data_ready_bitset_, fieldID);
|
||||
|
||||
@ -266,9 +266,9 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
IndexMetaPtr indexMeta,
|
||||
const SegcoreConfig& segcore_config,
|
||||
int64_t segment_id)
|
||||
: mmap_descriptor_(
|
||||
storage::MmapChunkDescriptorPtr(new storage::MmapChunkDescriptor(
|
||||
{segment_id, SegmentType::Growing}))),
|
||||
: mmap_descriptor_(storage::MmapManager::GetInstance()
|
||||
.GetMmapChunkManager()
|
||||
->Register()),
|
||||
segcore_config_(segcore_config),
|
||||
schema_(std::move(schema)),
|
||||
index_meta_(indexMeta),
|
||||
@ -283,9 +283,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
return this->search_pk(pk, timestamp);
|
||||
},
|
||||
segment_id) {
|
||||
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
|
||||
mcm->Register(mmap_descriptor_);
|
||||
|
||||
this->CreateTextIndexes();
|
||||
this->CreateJSONIndexes();
|
||||
}
|
||||
|
||||
@ -236,38 +236,44 @@ MmapChunkManager::~MmapChunkManager() {
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
MmapChunkManager::Register(const MmapChunkDescriptorPtr descriptor) {
|
||||
if (HasRegister(descriptor)) {
|
||||
LOG_WARN("descriptor has exist in MmapChunkManager");
|
||||
return;
|
||||
}
|
||||
AssertInfo(
|
||||
descriptor->segment_type == SegmentType::Growing ||
|
||||
descriptor->segment_type == SegmentType::Sealed,
|
||||
"only register for growing or sealed segment in MmapChunkManager");
|
||||
MmapChunkDescriptorPtr
|
||||
MmapChunkManager::Register() {
|
||||
std::unique_lock<std::shared_mutex> lck(mtx_);
|
||||
blocks_table_.emplace(*descriptor.get(), std::vector<MmapBlockPtr>());
|
||||
return;
|
||||
auto new_descriptor = std::shared_ptr<MmapChunkDescriptor>(
|
||||
new MmapChunkDescriptor(descriptor_counter_.load()),
|
||||
[this](MmapChunkDescriptor* ptr) {
|
||||
UnRegister(ptr->GetId());
|
||||
delete ptr;
|
||||
});
|
||||
descriptor_counter_.fetch_add(1);
|
||||
blocks_table_.emplace(new_descriptor->GetId(), std::vector<MmapBlockPtr>());
|
||||
return new_descriptor;
|
||||
}
|
||||
|
||||
void
|
||||
MmapChunkManager::UnRegister(const MmapChunkDescriptorPtr descriptor) {
|
||||
AssertInfo(descriptor != nullptr, "fail to unregister a nullptr.");
|
||||
MmapChunkDescriptor::ID blocks_table_key = descriptor->GetId();
|
||||
UnRegister(blocks_table_key);
|
||||
}
|
||||
|
||||
void
|
||||
MmapChunkManager::UnRegister(
|
||||
const MmapChunkDescriptor::ID descriptor_inner_id) {
|
||||
std::unique_lock<std::shared_mutex> lck(mtx_);
|
||||
MmapChunkDescriptor blocks_table_key = *descriptor.get();
|
||||
if (blocks_table_.find(blocks_table_key) != blocks_table_.end()) {
|
||||
auto& blocks = blocks_table_[blocks_table_key];
|
||||
if (blocks_table_.find(descriptor_inner_id) != blocks_table_.end()) {
|
||||
auto& blocks = blocks_table_[descriptor_inner_id];
|
||||
for (auto i = 0; i < blocks.size(); i++) {
|
||||
blocks_handler_->Deallocate(std::move(blocks[i]));
|
||||
}
|
||||
blocks_table_.erase(blocks_table_key);
|
||||
blocks_table_.erase(descriptor_inner_id);
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
MmapChunkManager::HasRegister(const MmapChunkDescriptorPtr descriptor) {
|
||||
std::shared_lock<std::shared_mutex> lck(mtx_);
|
||||
return (blocks_table_.find(*descriptor.get()) != blocks_table_.end());
|
||||
return (blocks_table_.find(descriptor->GetId()) != blocks_table_.end());
|
||||
}
|
||||
|
||||
void*
|
||||
@ -275,9 +281,9 @@ MmapChunkManager::Allocate(const MmapChunkDescriptorPtr descriptor,
|
||||
const uint64_t size) {
|
||||
AssertInfo(HasRegister(descriptor),
|
||||
"descriptor {} has not been register.",
|
||||
descriptor->segment_id);
|
||||
descriptor->GetId());
|
||||
std::unique_lock<std::shared_mutex> lck(mtx_);
|
||||
auto blocks_table_key = *descriptor.get();
|
||||
auto blocks_table_key = descriptor->GetId();
|
||||
if (size < blocks_handler_->GetFixFileSize()) {
|
||||
// find a place to fit in
|
||||
for (auto block_id = 0;
|
||||
@ -319,6 +325,7 @@ MmapChunkManager::MmapChunkManager(std::string root_path,
|
||||
cm->RemoveDir(root_path);
|
||||
}
|
||||
cm->CreateDir(root_path);
|
||||
this->descriptor_counter_.store(0);
|
||||
LOG_INFO(
|
||||
"Init MappChunkManager with: Path {}, MaxDiskSize {} MB, "
|
||||
"FixedFileSize {} MB.",
|
||||
|
||||
@ -29,22 +29,26 @@
|
||||
#include "common/type_c.h"
|
||||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
namespace milvus::storage {
|
||||
// use segment id and segment type to descripe a segment in mmap chunk manager, segment only in two type (growing or sealed) in mmap chunk manager
|
||||
struct MmapChunkDescriptor {
|
||||
struct DescriptorHash {
|
||||
size_t
|
||||
operator()(const MmapChunkDescriptor& x) const {
|
||||
//SegmentType::Growing = 0x01,SegmentType::Sealed = 0x10
|
||||
size_t sign = ((size_t)x.segment_type) << (sizeof(size_t) * 8 - 1);
|
||||
return ((size_t)x.segment_id) | sign;
|
||||
public:
|
||||
using ID = uint64_t;
|
||||
MmapChunkDescriptor(const MmapChunkDescriptor&) = delete;
|
||||
MmapChunkDescriptor&
|
||||
operator=(const MmapChunkDescriptor&) = delete;
|
||||
|
||||
protected:
|
||||
friend class
|
||||
MmapChunkManager; // only MmapChunkManager can create MmapChunkDescriptor
|
||||
const size_t key_id_;
|
||||
|
||||
protected:
|
||||
ID
|
||||
GetId() const {
|
||||
return key_id_;
|
||||
}
|
||||
};
|
||||
bool
|
||||
operator==(const MmapChunkDescriptor& x) const {
|
||||
return segment_id == x.segment_id && segment_type == x.segment_type;
|
||||
explicit MmapChunkDescriptor(size_t id) : key_id_(id) {
|
||||
}
|
||||
int64_t segment_id;
|
||||
SegmentType segment_type;
|
||||
~MmapChunkDescriptor() = default;
|
||||
};
|
||||
using MmapChunkDescriptorPtr = std::shared_ptr<MmapChunkDescriptor>;
|
||||
|
||||
@ -180,8 +184,8 @@ class MmapChunkManager {
|
||||
const uint64_t disk_limit,
|
||||
const uint64_t file_size);
|
||||
~MmapChunkManager();
|
||||
void
|
||||
Register(const MmapChunkDescriptorPtr descriptor);
|
||||
MmapChunkDescriptorPtr
|
||||
Register();
|
||||
void
|
||||
UnRegister(const MmapChunkDescriptorPtr descriptor);
|
||||
bool
|
||||
@ -207,14 +211,17 @@ class MmapChunkManager {
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
void
|
||||
UnRegister(const MmapChunkDescriptor::ID descriptor_inner_id);
|
||||
|
||||
private:
|
||||
mutable std::shared_mutex mtx_;
|
||||
std::unordered_map<MmapChunkDescriptor,
|
||||
std::vector<MmapBlockPtr>,
|
||||
MmapChunkDescriptor::DescriptorHash>
|
||||
std::unordered_map<MmapChunkDescriptor::ID, std::vector<MmapBlockPtr>>
|
||||
blocks_table_;
|
||||
std::unique_ptr<MmapBlocksHandler> blocks_handler_ = nullptr;
|
||||
std::string mmap_file_prefix_;
|
||||
std::atomic<uint64_t> descriptor_counter_;
|
||||
};
|
||||
using MmapChunkManagerPtr = std::shared_ptr<MmapChunkManager>;
|
||||
} // namespace milvus::storage
|
||||
@ -17,29 +17,7 @@ checking register function of mmap chunk manager
|
||||
TEST(MmapChunkManager, Register) {
|
||||
auto mcm =
|
||||
milvus::storage::MmapManager::GetInstance().GetMmapChunkManager();
|
||||
auto get_descriptor =
|
||||
[](int64_t seg_id,
|
||||
SegmentType seg_type) -> milvus::storage::MmapChunkDescriptorPtr {
|
||||
return std::shared_ptr<milvus::storage::MmapChunkDescriptor>(
|
||||
new milvus::storage::MmapChunkDescriptor({seg_id, seg_type}));
|
||||
};
|
||||
int64_t segment_id = 0x0000456789ABCDEF;
|
||||
int64_t flow_segment_id = 0x8000456789ABCDEF;
|
||||
mcm->Register(get_descriptor(segment_id, SegmentType::Growing));
|
||||
ASSERT_TRUE(
|
||||
mcm->HasRegister(get_descriptor(segment_id, SegmentType::Growing)));
|
||||
ASSERT_FALSE(
|
||||
mcm->HasRegister(get_descriptor(segment_id, SegmentType::Sealed)));
|
||||
mcm->Register(get_descriptor(segment_id, SegmentType::Sealed));
|
||||
ASSERT_FALSE(mcm->HasRegister(
|
||||
get_descriptor(flow_segment_id, SegmentType::Growing)));
|
||||
ASSERT_FALSE(
|
||||
mcm->HasRegister(get_descriptor(flow_segment_id, SegmentType::Sealed)));
|
||||
|
||||
mcm->UnRegister(get_descriptor(segment_id, SegmentType::Sealed));
|
||||
ASSERT_TRUE(
|
||||
mcm->HasRegister(get_descriptor(segment_id, SegmentType::Growing)));
|
||||
ASSERT_FALSE(
|
||||
mcm->HasRegister(get_descriptor(segment_id, SegmentType::Sealed)));
|
||||
mcm->UnRegister(get_descriptor(segment_id, SegmentType::Growing));
|
||||
auto segment_descriptor = mcm->Register();
|
||||
ASSERT_TRUE(mcm->HasRegister(segment_descriptor));
|
||||
ASSERT_NO_THROW(mcm->UnRegister(segment_descriptor));
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user