mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: use segment id and type to register in MmapChunkManager and opt malloc in variableChunk (#33993)
issue: https://github.com/milvus-io/milvus/issues/32984 Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
This commit is contained in:
parent
5be9929d64
commit
dc4437ff82
@ -22,6 +22,7 @@
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
// WARNING: do not change the enum value of Growing and Sealed
|
||||
enum SegmentType {
|
||||
Invalid = 0,
|
||||
Growing = 1,
|
||||
|
||||
@ -25,7 +25,7 @@ struct FixedLengthChunk {
|
||||
public:
|
||||
FixedLengthChunk() = delete;
|
||||
explicit FixedLengthChunk(const uint64_t size,
|
||||
storage::MmapChunkDescriptor descriptor)
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: mmap_descriptor_(descriptor), size_(size) {
|
||||
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
|
||||
data_ = (Type*)(mcm->Allocate(mmap_descriptor_, sizeof(Type) * size));
|
||||
@ -52,7 +52,7 @@ struct FixedLengthChunk {
|
||||
private:
|
||||
int64_t size_ = 0;
|
||||
Type* data_ = nullptr;
|
||||
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
|
||||
};
|
||||
/**
|
||||
* @brief VariableLengthChunk
|
||||
@ -64,7 +64,7 @@ struct VariableLengthChunk {
|
||||
public:
|
||||
VariableLengthChunk() = delete;
|
||||
explicit VariableLengthChunk(const uint64_t size,
|
||||
storage::MmapChunkDescriptor descriptor)
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: mmap_descriptor_(descriptor), size_(size) {
|
||||
data_ = FixedVector<ChunkViewType<Type>>(size);
|
||||
};
|
||||
@ -98,7 +98,7 @@ struct VariableLengthChunk {
|
||||
private:
|
||||
int64_t size_ = 0;
|
||||
FixedVector<ChunkViewType<Type>> data_;
|
||||
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
|
||||
};
|
||||
template <>
|
||||
inline void
|
||||
@ -113,13 +113,19 @@ VariableLengthChunk<std::string>::set(const std::string* src,
|
||||
length,
|
||||
begin,
|
||||
size_);
|
||||
size_t total_size = 0;
|
||||
size_t padding_size = 1;
|
||||
for (auto i = 0; i < length; i++) {
|
||||
auto buf_size = src[i].size() + 1;
|
||||
auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size);
|
||||
AssertInfo(buf != nullptr,
|
||||
"failed to allocate memory from mmap_manager, error_code");
|
||||
std::strcpy(buf, src[i].c_str());
|
||||
data_[i + begin] = std::string_view(buf, src[i].size());
|
||||
total_size += src[i].size() + padding_size;
|
||||
}
|
||||
auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size);
|
||||
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
|
||||
for (auto i = 0, offset = 0; i < length; i++) {
|
||||
auto data_size = src[i].size() + padding_size;
|
||||
char* data_ptr = buf + offset;
|
||||
std::strcpy(data_ptr, src[i].c_str());
|
||||
data_[i + begin] = std::string_view(data_ptr, src[i].size());
|
||||
offset += data_size;
|
||||
}
|
||||
}
|
||||
template <>
|
||||
@ -141,14 +147,19 @@ VariableLengthChunk<Json>::set(const Json* src,
|
||||
length,
|
||||
begin,
|
||||
size_);
|
||||
size_t total_size = 0;
|
||||
size_t padding_size = simdjson::SIMDJSON_PADDING + 1;
|
||||
for (auto i = 0; i < length; i++) {
|
||||
auto buf_size = src[i].size() + simdjson::SIMDJSON_PADDING + 1;
|
||||
auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size);
|
||||
AssertInfo(
|
||||
buf != nullptr,
|
||||
"failed to allocate memory from mmap_manager, error_code:{}");
|
||||
std::strcpy(buf, src[i].c_str());
|
||||
data_[i + begin] = Json(buf, src[i].size());
|
||||
total_size += src[i].size() + padding_size;
|
||||
}
|
||||
auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size);
|
||||
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
|
||||
for (auto i = 0, offset = 0; i < length; i++) {
|
||||
auto data_size = src[i].size() + padding_size;
|
||||
char* data_ptr = buf + offset;
|
||||
std::strcpy(data_ptr, src[i].c_str());
|
||||
data_[i + begin] = Json(data_ptr, src[i].size());
|
||||
offset += data_size;
|
||||
}
|
||||
}
|
||||
template <>
|
||||
@ -169,17 +180,22 @@ VariableLengthChunk<Array>::set(const Array* src,
|
||||
length,
|
||||
begin,
|
||||
size_);
|
||||
size_t total_size = 0;
|
||||
size_t padding_size = 0;
|
||||
for (auto i = 0; i < length; i++) {
|
||||
auto array_data =
|
||||
(char*)mcm->Allocate(mmap_descriptor_, src[i].byte_size());
|
||||
AssertInfo(array_data != nullptr,
|
||||
"failed to allocate memory from mmap_manager, error_code");
|
||||
std::copy(
|
||||
src[i].data(), src[i].data() + src[i].byte_size(), array_data);
|
||||
data_[i + begin] = ArrayView(array_data,
|
||||
src[i].byte_size(),
|
||||
total_size += src[i].byte_size() + padding_size;
|
||||
}
|
||||
auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size);
|
||||
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
|
||||
for (auto i = 0, offset = 0; i < length; i++) {
|
||||
auto data_size = src[i].byte_size() + padding_size;
|
||||
char* data_ptr = buf + offset;
|
||||
std::copy(src[i].data(), src[i].data() + src[i].byte_size(), data_ptr);
|
||||
data_[i + begin] = ArrayView(data_ptr,
|
||||
data_size,
|
||||
src[i].get_element_type(),
|
||||
src[i].get_offsets_in_copy());
|
||||
offset += data_size;
|
||||
}
|
||||
}
|
||||
template <>
|
||||
|
||||
@ -56,7 +56,8 @@ template <typename Type,
|
||||
bool IsMmap = false>
|
||||
class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
|
||||
public:
|
||||
ThreadSafeChunkVector(storage::MmapChunkDescriptor descriptor = nullptr) {
|
||||
ThreadSafeChunkVector(
|
||||
storage::MmapChunkDescriptorPtr descriptor = nullptr) {
|
||||
mmap_descriptor_ = descriptor;
|
||||
}
|
||||
|
||||
@ -181,13 +182,13 @@ class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
|
||||
|
||||
private:
|
||||
mutable std::shared_mutex mutex_;
|
||||
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
|
||||
std::deque<ChunkImpl> vec_;
|
||||
};
|
||||
|
||||
template <typename Type>
|
||||
ChunkVectorPtr<Type>
|
||||
SelectChunkVectorPtr(storage::MmapChunkDescriptor& mmap_descriptor) {
|
||||
SelectChunkVectorPtr(storage::MmapChunkDescriptorPtr& mmap_descriptor) {
|
||||
if constexpr (!IsVariableType<Type>) {
|
||||
if (mmap_descriptor != nullptr) {
|
||||
return std::make_unique<
|
||||
|
||||
@ -94,7 +94,7 @@ class ColumnBase {
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptor descriptor)
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: mcm_(mcm),
|
||||
mmap_descriptor_(descriptor),
|
||||
type_size_(GetDataTypeSize(data_type, dim)),
|
||||
@ -340,7 +340,7 @@ class ColumnBase {
|
||||
|
||||
// length in bytes
|
||||
size_t size_{0};
|
||||
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
|
||||
|
||||
private:
|
||||
void
|
||||
@ -401,7 +401,7 @@ class Column : public ColumnBase {
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptor descriptor)
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
|
||||
}
|
||||
|
||||
@ -440,7 +440,7 @@ class SparseFloatColumn : public ColumnBase {
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptor descriptor)
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
|
||||
}
|
||||
|
||||
@ -543,7 +543,7 @@ class VariableColumn : public ColumnBase {
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptor descriptor)
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
|
||||
}
|
||||
|
||||
@ -658,7 +658,7 @@ class ArrayColumn : public ColumnBase {
|
||||
int dim,
|
||||
const DataType& data_type,
|
||||
storage::MmapChunkManagerPtr mcm,
|
||||
storage::MmapChunkDescriptor descriptor)
|
||||
storage::MmapChunkDescriptorPtr descriptor)
|
||||
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
|
||||
}
|
||||
|
||||
|
||||
@ -169,7 +169,7 @@ class ConcurrentVectorImpl : public VectorBase {
|
||||
explicit ConcurrentVectorImpl(
|
||||
ssize_t elements_per_row,
|
||||
int64_t size_per_chunk,
|
||||
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: VectorBase(size_per_chunk),
|
||||
elements_per_row_(is_type_entire_row ? 1 : elements_per_row) {
|
||||
chunks_ptr_ = SelectChunkVectorPtr<Type>(mmap_descriptor);
|
||||
@ -359,7 +359,7 @@ class ConcurrentVector : public ConcurrentVectorImpl<Type, true> {
|
||||
static_assert(IsScalar<Type> || std::is_same_v<Type, PkType>);
|
||||
explicit ConcurrentVector(
|
||||
int64_t size_per_chunk,
|
||||
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: ConcurrentVectorImpl<Type, true>::ConcurrentVectorImpl(
|
||||
1, size_per_chunk, mmap_descriptor) {
|
||||
}
|
||||
@ -371,7 +371,7 @@ class ConcurrentVector<std::string>
|
||||
public:
|
||||
explicit ConcurrentVector(
|
||||
int64_t size_per_chunk,
|
||||
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: ConcurrentVectorImpl<std::string, true>::ConcurrentVectorImpl(
|
||||
1, size_per_chunk, mmap_descriptor) {
|
||||
}
|
||||
@ -389,7 +389,7 @@ class ConcurrentVector<Json> : public ConcurrentVectorImpl<Json, true> {
|
||||
public:
|
||||
explicit ConcurrentVector(
|
||||
int64_t size_per_chunk,
|
||||
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: ConcurrentVectorImpl<Json, true>::ConcurrentVectorImpl(
|
||||
1, size_per_chunk, mmap_descriptor) {
|
||||
}
|
||||
@ -408,7 +408,7 @@ class ConcurrentVector<Array> : public ConcurrentVectorImpl<Array, true> {
|
||||
public:
|
||||
explicit ConcurrentVector(
|
||||
int64_t size_per_chunk,
|
||||
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: ConcurrentVectorImpl<Array, true>::ConcurrentVectorImpl(
|
||||
1, size_per_chunk, mmap_descriptor) {
|
||||
}
|
||||
@ -427,7 +427,7 @@ class ConcurrentVector<SparseFloatVector>
|
||||
public:
|
||||
explicit ConcurrentVector(
|
||||
int64_t size_per_chunk,
|
||||
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: ConcurrentVectorImpl<knowhere::sparse::SparseRow<float>,
|
||||
true>::ConcurrentVectorImpl(1,
|
||||
size_per_chunk,
|
||||
@ -465,7 +465,7 @@ class ConcurrentVector<FloatVector>
|
||||
public:
|
||||
ConcurrentVector(int64_t dim,
|
||||
int64_t size_per_chunk,
|
||||
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: ConcurrentVectorImpl<float, false>::ConcurrentVectorImpl(
|
||||
dim, size_per_chunk, mmap_descriptor) {
|
||||
}
|
||||
@ -478,7 +478,7 @@ class ConcurrentVector<BinaryVector>
|
||||
explicit ConcurrentVector(
|
||||
int64_t dim,
|
||||
int64_t size_per_chunk,
|
||||
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: ConcurrentVectorImpl(dim / 8, size_per_chunk, mmap_descriptor) {
|
||||
AssertInfo(dim % 8 == 0,
|
||||
fmt::format("dim is not a multiple of 8, dim={}", dim));
|
||||
@ -491,7 +491,7 @@ class ConcurrentVector<Float16Vector>
|
||||
public:
|
||||
ConcurrentVector(int64_t dim,
|
||||
int64_t size_per_chunk,
|
||||
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: ConcurrentVectorImpl<float16, false>::ConcurrentVectorImpl(
|
||||
dim, size_per_chunk, mmap_descriptor) {
|
||||
}
|
||||
@ -503,7 +503,7 @@ class ConcurrentVector<BFloat16Vector>
|
||||
public:
|
||||
ConcurrentVector(int64_t dim,
|
||||
int64_t size_per_chunk,
|
||||
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: ConcurrentVectorImpl<bfloat16, false>::ConcurrentVectorImpl(
|
||||
dim, size_per_chunk, mmap_descriptor) {
|
||||
}
|
||||
|
||||
@ -296,9 +296,10 @@ class OffsetOrderedArray : public OffsetMap {
|
||||
|
||||
template <bool is_sealed = false>
|
||||
struct InsertRecord {
|
||||
InsertRecord(const Schema& schema,
|
||||
const int64_t size_per_chunk,
|
||||
const storage::MmapChunkDescriptor mmap_descriptor = nullptr)
|
||||
InsertRecord(
|
||||
const Schema& schema,
|
||||
const int64_t size_per_chunk,
|
||||
const storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
|
||||
: timestamps_(size_per_chunk), mmap_descriptor_(mmap_descriptor) {
|
||||
std::optional<FieldId> pk_field_id = schema.get_primary_field_id();
|
||||
|
||||
@ -630,7 +631,7 @@ struct InsertRecord {
|
||||
private:
|
||||
std::unordered_map<FieldId, std::unique_ptr<VectorBase>> fields_data_{};
|
||||
mutable std::shared_mutex shared_mutex_{};
|
||||
storage::MmapChunkDescriptor mmap_descriptor_;
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor_;
|
||||
};
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
||||
@ -212,8 +212,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
: mmap_descriptor_(storage::MmapManager::GetInstance()
|
||||
.GetMmapConfig()
|
||||
.GetEnableGrowingMmap()
|
||||
? storage::MmapChunkDescriptor(
|
||||
new storage::MmapChunkDescriptorValue(
|
||||
? storage::MmapChunkDescriptorPtr(
|
||||
new storage::MmapChunkDescriptor(
|
||||
{segment_id, SegmentType::Growing}))
|
||||
: nullptr),
|
||||
segcore_config_(segcore_config),
|
||||
@ -317,7 +317,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
}
|
||||
|
||||
private:
|
||||
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
|
||||
SegcoreConfig segcore_config_;
|
||||
SchemaPtr schema_;
|
||||
IndexMetaPtr index_meta_;
|
||||
|
||||
@ -821,7 +821,7 @@ SegmentSealedImpl::GetFieldDataPath(FieldId field_id, int64_t offset) const {
|
||||
std::tuple<std::string, std::shared_ptr<ColumnBase>> static ReadFromChunkCache(
|
||||
const storage::ChunkCachePtr& cc,
|
||||
const std::string& data_path,
|
||||
const storage::MmapChunkDescriptor& descriptor) {
|
||||
const storage::MmapChunkDescriptorPtr& descriptor) {
|
||||
auto column = cc->Read(data_path, descriptor);
|
||||
cc->Prefetch(data_path);
|
||||
return {data_path, column};
|
||||
@ -1032,9 +1032,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
|
||||
id_(segment_id),
|
||||
col_index_meta_(index_meta),
|
||||
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) {
|
||||
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptorValue>(
|
||||
new storage::MmapChunkDescriptorValue(
|
||||
{segment_id, SegmentType::Sealed}));
|
||||
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptor>(
|
||||
new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed}));
|
||||
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
|
||||
mcm->Register(mmap_descriptor_);
|
||||
}
|
||||
|
||||
@ -280,7 +280,7 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||
|
||||
private:
|
||||
// mmap descriptor, used in chunk cache
|
||||
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
|
||||
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
|
||||
// segment loading state
|
||||
BitsetType field_data_ready_bitset_;
|
||||
BitsetType index_ready_bitset_;
|
||||
|
||||
@ -21,7 +21,7 @@
|
||||
namespace milvus::storage {
|
||||
std::shared_ptr<ColumnBase>
|
||||
ChunkCache::Read(const std::string& filepath,
|
||||
const MmapChunkDescriptor& descriptor) {
|
||||
const MmapChunkDescriptorPtr& descriptor) {
|
||||
{
|
||||
std::shared_lock lck(mutex_);
|
||||
auto it = columns_.find(filepath);
|
||||
@ -75,7 +75,7 @@ ChunkCache::Prefetch(const std::string& filepath) {
|
||||
|
||||
std::shared_ptr<ColumnBase>
|
||||
ChunkCache::Mmap(const FieldDataPtr& field_data,
|
||||
const MmapChunkDescriptor& descriptor) {
|
||||
const MmapChunkDescriptorPtr& descriptor) {
|
||||
auto dim = field_data->get_dim();
|
||||
auto data_type = field_data->get_data_type();
|
||||
|
||||
|
||||
@ -43,7 +43,7 @@ class ChunkCache {
|
||||
|
||||
public:
|
||||
std::shared_ptr<ColumnBase>
|
||||
Read(const std::string& filepath, const MmapChunkDescriptor& descriptor);
|
||||
Read(const std::string& filepath, const MmapChunkDescriptorPtr& descriptor);
|
||||
|
||||
void
|
||||
Remove(const std::string& filepath);
|
||||
@ -53,7 +53,8 @@ class ChunkCache {
|
||||
|
||||
private:
|
||||
std::shared_ptr<ColumnBase>
|
||||
Mmap(const FieldDataPtr& field_data, const MmapChunkDescriptor& descriptor);
|
||||
Mmap(const FieldDataPtr& field_data,
|
||||
const MmapChunkDescriptorPtr& descriptor);
|
||||
|
||||
std::string
|
||||
CachePath(const std::string& filepath);
|
||||
|
||||
@ -217,43 +217,53 @@ MmapChunkManager::~MmapChunkManager() {
|
||||
}
|
||||
|
||||
void
|
||||
MmapChunkManager::Register(const MmapChunkDescriptor key) {
|
||||
if (HasKey(key)) {
|
||||
LOG_WARN("key has exist in growing mmap manager");
|
||||
MmapChunkManager::Register(const MmapChunkDescriptorPtr descriptor) {
|
||||
if (HasRegister(descriptor)) {
|
||||
LOG_WARN("descriptor has exist in MmapChunkManager");
|
||||
return;
|
||||
}
|
||||
AssertInfo(
|
||||
descriptor->segment_type == SegmentType::Growing ||
|
||||
descriptor->segment_type == SegmentType::Sealed,
|
||||
"only register for growing or sealed segment in MmapChunkManager");
|
||||
std::unique_lock<std::shared_mutex> lck(mtx_);
|
||||
blocks_table_.emplace(key, std::vector<MmapBlockPtr>());
|
||||
blocks_table_.emplace(*descriptor.get(), std::vector<MmapBlockPtr>());
|
||||
return;
|
||||
}
|
||||
|
||||
void
|
||||
MmapChunkManager::UnRegister(const MmapChunkDescriptor key) {
|
||||
MmapChunkManager::UnRegister(const MmapChunkDescriptorPtr descriptor) {
|
||||
std::unique_lock<std::shared_mutex> lck(mtx_);
|
||||
if (blocks_table_.find(key) != blocks_table_.end()) {
|
||||
auto& blocks = blocks_table_[key];
|
||||
MmapChunkDescriptor blocks_table_key = *descriptor.get();
|
||||
if (blocks_table_.find(blocks_table_key) != blocks_table_.end()) {
|
||||
auto& blocks = blocks_table_[blocks_table_key];
|
||||
for (auto i = 0; i < blocks.size(); i++) {
|
||||
blocks_handler_->Deallocate(std::move(blocks[i]));
|
||||
}
|
||||
blocks_table_.erase(key);
|
||||
blocks_table_.erase(blocks_table_key);
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
MmapChunkManager::HasKey(const MmapChunkDescriptor key) {
|
||||
MmapChunkManager::HasRegister(const MmapChunkDescriptorPtr descriptor) {
|
||||
std::shared_lock<std::shared_mutex> lck(mtx_);
|
||||
return (blocks_table_.find(key) != blocks_table_.end());
|
||||
return (blocks_table_.find(*descriptor.get()) != blocks_table_.end());
|
||||
}
|
||||
|
||||
void*
|
||||
MmapChunkManager::Allocate(const MmapChunkDescriptor key, const uint64_t size) {
|
||||
AssertInfo(HasKey(key), "key {} has not been register.", key->segment_id);
|
||||
MmapChunkManager::Allocate(const MmapChunkDescriptorPtr descriptor,
|
||||
const uint64_t size) {
|
||||
AssertInfo(HasRegister(descriptor),
|
||||
"descriptor {} has not been register.",
|
||||
descriptor->segment_id);
|
||||
std::unique_lock<std::shared_mutex> lck(mtx_);
|
||||
auto blocks_table_key = *descriptor.get();
|
||||
if (size < blocks_handler_->GetFixFileSize()) {
|
||||
// find a place to fit in
|
||||
for (auto block_id = 0; block_id < blocks_table_[key].size();
|
||||
for (auto block_id = 0;
|
||||
block_id < blocks_table_[blocks_table_key].size();
|
||||
block_id++) {
|
||||
auto addr = blocks_table_[key][block_id]->Get(size);
|
||||
auto addr = blocks_table_[blocks_table_key][block_id]->Get(size);
|
||||
if (addr != nullptr) {
|
||||
return addr;
|
||||
}
|
||||
@ -263,14 +273,14 @@ MmapChunkManager::Allocate(const MmapChunkDescriptor key, const uint64_t size) {
|
||||
AssertInfo(new_block != nullptr, "new mmap_block can't be nullptr");
|
||||
auto addr = new_block->Get(size);
|
||||
AssertInfo(addr != nullptr, "fail to allocate from mmap block.");
|
||||
blocks_table_[key].emplace_back(std::move(new_block));
|
||||
blocks_table_[blocks_table_key].emplace_back(std::move(new_block));
|
||||
return addr;
|
||||
} else {
|
||||
auto new_block = blocks_handler_->AllocateLargeBlock(size);
|
||||
AssertInfo(new_block != nullptr, "new mmap_block can't be nullptr");
|
||||
auto addr = new_block->Get(size);
|
||||
AssertInfo(addr != nullptr, "fail to allocate from mmap block.");
|
||||
blocks_table_[key].emplace_back(std::move(new_block));
|
||||
blocks_table_[blocks_table_key].emplace_back(std::move(new_block));
|
||||
return addr;
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,18 +30,24 @@
|
||||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
namespace milvus::storage {
|
||||
// use segment id and segment type to descripe a segment in mmap chunk manager, segment only in two type (growing or sealed) in mmap chunk manager
|
||||
struct MmapChunkDescriptorValue {
|
||||
struct MmapChunkDescriptor {
|
||||
struct DescriptorHash {
|
||||
size_t
|
||||
operator()(const MmapChunkDescriptor& x) const {
|
||||
//SegmentType::Growing = 0x01,SegmentType::Sealed = 0x10
|
||||
size_t sign = ((size_t)x.segment_type) << (sizeof(size_t) * 8 - 1);
|
||||
return ((size_t)x.segment_id) | sign;
|
||||
}
|
||||
};
|
||||
bool
|
||||
operator==(const MmapChunkDescriptor& x) const {
|
||||
return segment_id == x.segment_id && segment_type == x.segment_type;
|
||||
}
|
||||
int64_t segment_id;
|
||||
SegmentType segment_type;
|
||||
};
|
||||
using MmapChunkDescriptor = std::shared_ptr<MmapChunkDescriptorValue>;
|
||||
using MmapChunkDescriptorPtr = std::shared_ptr<MmapChunkDescriptor>;
|
||||
|
||||
struct DescriptorHash {
|
||||
size_t
|
||||
operator()(const MmapChunkDescriptor& x) const {
|
||||
return x->segment_id * 10 + (size_t)x->segment_type;
|
||||
}
|
||||
};
|
||||
/**
|
||||
* @brief MmapBlock is a basic unit of MmapChunkManager. It handle all memory mmaping in one tmp file.
|
||||
* static function(TotalBlocksSize) is used to get total files size of chunk mmap.
|
||||
@ -175,13 +181,13 @@ class MmapChunkManager {
|
||||
const uint64_t file_size);
|
||||
~MmapChunkManager();
|
||||
void
|
||||
Register(const MmapChunkDescriptor key);
|
||||
Register(const MmapChunkDescriptorPtr descriptor);
|
||||
void
|
||||
UnRegister(const MmapChunkDescriptor key);
|
||||
UnRegister(const MmapChunkDescriptorPtr descriptor);
|
||||
bool
|
||||
HasKey(const MmapChunkDescriptor key);
|
||||
HasRegister(const MmapChunkDescriptorPtr descriptor);
|
||||
void*
|
||||
Allocate(const MmapChunkDescriptor key, const uint64_t size);
|
||||
Allocate(const MmapChunkDescriptorPtr descriptor, const uint64_t size);
|
||||
uint64_t
|
||||
GetDiskAllocSize() {
|
||||
std::shared_lock<std::shared_mutex> lck(mtx_);
|
||||
@ -205,7 +211,7 @@ class MmapChunkManager {
|
||||
mutable std::shared_mutex mtx_;
|
||||
std::unordered_map<MmapChunkDescriptor,
|
||||
std::vector<MmapBlockPtr>,
|
||||
DescriptorHash>
|
||||
MmapChunkDescriptor::DescriptorHash>
|
||||
blocks_table_;
|
||||
std::unique_ptr<MmapBlocksHandler> blocks_handler_ = nullptr;
|
||||
std::string mmap_file_prefix_;
|
||||
|
||||
@ -70,6 +70,7 @@ set(MILVUS_TEST_FILES
|
||||
test_futures.cpp
|
||||
test_array_inverted_index.cpp
|
||||
test_chunk_vector.cpp
|
||||
test_mmap_chunk_manager.cpp
|
||||
)
|
||||
|
||||
if ( INDEX_ENGINE STREQUAL "cardinal" )
|
||||
|
||||
@ -38,14 +38,13 @@ class ChunkCacheTest : public testing::Test {
|
||||
TearDown() override {
|
||||
mcm->UnRegister(descriptor);
|
||||
}
|
||||
const char* local_storage_path = "/tmp/test_chunk_cache/local";
|
||||
const char* file_name = "chunk_cache_test/insert_log/2/101/1000000";
|
||||
milvus::storage::MmapChunkManagerPtr mcm;
|
||||
milvus::segcore::SegcoreConfig config;
|
||||
milvus::storage::MmapChunkDescriptor descriptor =
|
||||
std::shared_ptr<milvus::storage::MmapChunkDescriptorValue>(
|
||||
new milvus::storage::MmapChunkDescriptorValue(
|
||||
{111, SegmentType::Sealed}));
|
||||
milvus::storage::MmapChunkDescriptorPtr descriptor =
|
||||
std::shared_ptr<milvus::storage::MmapChunkDescriptor>(
|
||||
new milvus::storage::MmapChunkDescriptor(
|
||||
{101, SegmentType::Sealed}));
|
||||
};
|
||||
|
||||
TEST_F(ChunkCacheTest, Read) {
|
||||
@ -53,9 +52,6 @@ TEST_F(ChunkCacheTest, Read) {
|
||||
auto dim = 128;
|
||||
auto metric_type = knowhere::metric::L2;
|
||||
|
||||
milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(
|
||||
local_storage_path);
|
||||
|
||||
auto schema = std::make_shared<milvus::Schema>();
|
||||
auto fake_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type);
|
||||
@ -87,8 +83,6 @@ TEST_F(ChunkCacheTest, Read) {
|
||||
|
||||
auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache();
|
||||
const auto& column = cc->Read(file_name, descriptor);
|
||||
std::cout << "column->ByteSize() :" << column->ByteSize() << " "
|
||||
<< dim * N * 4 << std::endl;
|
||||
Assert(column->ByteSize() == dim * N * 4);
|
||||
|
||||
auto actual = (float*)column->Data();
|
||||
@ -106,9 +100,6 @@ TEST_F(ChunkCacheTest, TestMultithreads) {
|
||||
auto dim = 128;
|
||||
auto metric_type = knowhere::metric::L2;
|
||||
|
||||
milvus::storage::LocalChunkManagerSingleton::GetInstance().Init(
|
||||
local_storage_path);
|
||||
|
||||
auto schema = std::make_shared<milvus::Schema>();
|
||||
auto fake_id = schema->AddDebugField(
|
||||
"fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type);
|
||||
@ -143,7 +134,6 @@ TEST_F(ChunkCacheTest, TestMultithreads) {
|
||||
constexpr int threads = 16;
|
||||
std::vector<int64_t> total_counts(threads);
|
||||
auto executor = [&](int thread_id) {
|
||||
std::cout << "thread id" << thread_id << " read data" << std::endl;
|
||||
const auto& column = cc->Read(file_name, descriptor);
|
||||
Assert(column->ByteSize() == dim * N * 4);
|
||||
|
||||
|
||||
45
internal/core/unittest/test_mmap_chunk_manager.cpp
Normal file
45
internal/core/unittest/test_mmap_chunk_manager.cpp
Normal file
@ -0,0 +1,45 @@
|
||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include "storage/MmapManager.h"
|
||||
/*
|
||||
checking register function of mmap chunk manager
|
||||
*/
|
||||
TEST(MmapChunkManager, Register) {
|
||||
auto mcm =
|
||||
milvus::storage::MmapManager::GetInstance().GetMmapChunkManager();
|
||||
auto get_descriptor =
|
||||
[](int64_t seg_id,
|
||||
SegmentType seg_type) -> milvus::storage::MmapChunkDescriptorPtr {
|
||||
return std::shared_ptr<milvus::storage::MmapChunkDescriptor>(
|
||||
new milvus::storage::MmapChunkDescriptor({seg_id, seg_type}));
|
||||
};
|
||||
int64_t segment_id = 0x0000456789ABCDEF;
|
||||
int64_t flow_segment_id = 0x8000456789ABCDEF;
|
||||
mcm->Register(get_descriptor(segment_id, SegmentType::Growing));
|
||||
ASSERT_TRUE(
|
||||
mcm->HasRegister(get_descriptor(segment_id, SegmentType::Growing)));
|
||||
ASSERT_FALSE(
|
||||
mcm->HasRegister(get_descriptor(segment_id, SegmentType::Sealed)));
|
||||
mcm->Register(get_descriptor(segment_id, SegmentType::Sealed));
|
||||
ASSERT_FALSE(mcm->HasRegister(
|
||||
get_descriptor(flow_segment_id, SegmentType::Growing)));
|
||||
ASSERT_FALSE(
|
||||
mcm->HasRegister(get_descriptor(flow_segment_id, SegmentType::Sealed)));
|
||||
|
||||
mcm->UnRegister(get_descriptor(segment_id, SegmentType::Sealed));
|
||||
ASSERT_TRUE(
|
||||
mcm->HasRegister(get_descriptor(segment_id, SegmentType::Growing)));
|
||||
ASSERT_FALSE(
|
||||
mcm->HasRegister(get_descriptor(segment_id, SegmentType::Sealed)));
|
||||
mcm->UnRegister(get_descriptor(segment_id, SegmentType::Growing));
|
||||
}
|
||||
@ -15,7 +15,6 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/datanode/allocator"
|
||||
"github.com/milvus-io/milvus/internal/datanode/metacache"
|
||||
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
|
||||
"github.com/milvus-io/milvus/internal/util/initcore"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/hardware"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
@ -54,7 +53,6 @@ func (s *ManagerSuite) SetupSuite() {
|
||||
},
|
||||
},
|
||||
}
|
||||
initcore.InitMmapManager(paramtable.Get())
|
||||
|
||||
s.channelName = "by-dev-rootcoord-dml_0_100_v0"
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user