enhance: mmap once for each group chunk (#45487)

issue: #45486

This commit refactors the chunk writing system by introducing a
two-phase
approach: size calculation followed by writing to a target. This enables
efficient group chunk creation where multiple fields share a single mmap
region, significantly reducing the number of mmap system calls and VMAs.

- Optimize `mmap` usage: single `mmap` per group chunk instead of per
field
- Split ChunkWriter into two phases:
  - `calculate_size()`: Pre-compute required memory without allocation
  - `write_to_target()`: Write data to a provided ChunkTarget
- Implement `ChunkMmapGuard` for unified mmap region lifecycle
management
  - Handles `munmap` and file cleanup via RAII
  - Shared via `std::shared_ptr` across multiple chunks in a group

Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>

---------

Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
This commit is contained in:
sparknack 2025-11-26 10:37:08 +08:00 committed by GitHub
parent 0392db6976
commit 4b14ab14e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 899 additions and 577 deletions

View File

@ -38,6 +38,39 @@ namespace milvus {
constexpr uint64_t MMAP_STRING_PADDING = 1; constexpr uint64_t MMAP_STRING_PADDING = 1;
constexpr uint64_t MMAP_GEOMETRY_PADDING = 1; constexpr uint64_t MMAP_GEOMETRY_PADDING = 1;
constexpr uint64_t MMAP_ARRAY_PADDING = 1; constexpr uint64_t MMAP_ARRAY_PADDING = 1;
// Shared mmap region manager for group chunks
class ChunkMmapGuard {
public:
ChunkMmapGuard(char* mmap_ptr, size_t mmap_size, std::string file_path)
: mmap_ptr_(mmap_ptr), mmap_size_(mmap_size), file_path_(file_path) {
}
~ChunkMmapGuard() {
if (mmap_ptr_ != nullptr) {
munmap(mmap_ptr_, mmap_size_);
}
if (!file_path_.empty()) {
unlink(file_path_.c_str());
}
}
char*
get_ptr() const {
return mmap_ptr_;
}
bool
is_file_backed() const {
return !file_path_.empty();
}
private:
char* mmap_ptr_;
size_t mmap_size_;
const std::string file_path_;
};
class Chunk { class Chunk {
public: public:
Chunk() = default; Chunk() = default;
@ -45,12 +78,12 @@ class Chunk {
char* data, char* data,
uint64_t size, uint64_t size,
bool nullable, bool nullable,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr) std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: data_(data), : data_(data),
row_nums_(row_nums), row_nums_(row_nums),
size_(size), size_(size),
nullable_(nullable), nullable_(nullable),
mmap_file_raii_(std::move(mmap_file_raii)) { chunk_mmap_guard_(chunk_mmap_guard) {
if (nullable) { if (nullable) {
valid_.reserve(row_nums); valid_.reserve(row_nums);
for (int i = 0; i < row_nums; i++) { for (int i = 0; i < row_nums; i++) {
@ -59,7 +92,7 @@ class Chunk {
} }
} }
virtual ~Chunk() { virtual ~Chunk() {
munmap(data_, size_); // The ChunkMmapGuard will handle the unmapping and unlinking of the file if it is file backed
} }
uint64_t uint64_t
@ -69,7 +102,7 @@ class Chunk {
cachinglayer::ResourceUsage cachinglayer::ResourceUsage
CellByteSize() const { CellByteSize() const {
if (mmap_file_raii_) { if (chunk_mmap_guard_ && chunk_mmap_guard_->is_file_backed()) {
return cachinglayer::ResourceUsage(0, static_cast<int64_t>(size_)); return cachinglayer::ResourceUsage(0, static_cast<int64_t>(size_));
} }
return cachinglayer::ResourceUsage(static_cast<int64_t>(size_), 0); return cachinglayer::ResourceUsage(static_cast<int64_t>(size_), 0);
@ -109,7 +142,7 @@ class Chunk {
FixedVector<bool> FixedVector<bool>
valid_; // parse null bitmap to valid_ to be compatible with SpanBase valid_; // parse null bitmap to valid_ to be compatible with SpanBase
std::unique_ptr<MmapFileRAII> mmap_file_raii_; std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard_{nullptr};
}; };
// for fixed size data, includes fixed size array // for fixed size data, includes fixed size array
@ -121,8 +154,8 @@ class FixedWidthChunk : public Chunk {
uint64_t size, uint64_t size,
uint64_t element_size, uint64_t element_size,
bool nullable, bool nullable,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr) std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)), : Chunk(row_nums, data, size, nullable, chunk_mmap_guard),
dim_(dim), dim_(dim),
element_size_(element_size) { element_size_(element_size) {
auto null_bitmap_bytes_num = nullable_ ? (row_nums_ + 7) / 8 : 0; auto null_bitmap_bytes_num = nullable_ ? (row_nums_ + 7) / 8 : 0;
@ -181,8 +214,8 @@ class StringChunk : public Chunk {
char* data, char* data,
uint64_t size, uint64_t size,
bool nullable, bool nullable,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr) std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)) { : Chunk(row_nums, data, size, nullable, chunk_mmap_guard) {
auto null_bitmap_bytes_num = nullable_ ? (row_nums_ + 7) / 8 : 0; auto null_bitmap_bytes_num = nullable_ ? (row_nums_ + 7) / 8 : 0;
offsets_ = reinterpret_cast<uint32_t*>(data + null_bitmap_bytes_num); offsets_ = reinterpret_cast<uint32_t*>(data + null_bitmap_bytes_num);
} }
@ -314,8 +347,8 @@ class ArrayChunk : public Chunk {
uint64_t size, uint64_t size,
milvus::DataType element_type, milvus::DataType element_type,
bool nullable, bool nullable,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr) std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)), : Chunk(row_nums, data, size, nullable, chunk_mmap_guard),
element_type_(element_type) { element_type_(element_type) {
auto null_bitmap_bytes_num = 0; auto null_bitmap_bytes_num = 0;
if (nullable) { if (nullable) {
@ -431,8 +464,8 @@ class VectorArrayChunk : public Chunk {
char* data, char* data,
uint64_t size, uint64_t size,
milvus::DataType element_type, milvus::DataType element_type,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr) std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: Chunk(row_nums, data, size, false, std::move(mmap_file_raii)), : Chunk(row_nums, data, size, false, chunk_mmap_guard),
dim_(dim), dim_(dim),
element_type_(element_type) { element_type_(element_type) {
offsets_lens_ = reinterpret_cast<uint32_t*>(data); offsets_lens_ = reinterpret_cast<uint32_t*>(data);
@ -520,15 +553,14 @@ class VectorArrayChunk : public Chunk {
class SparseFloatVectorChunk : public Chunk { class SparseFloatVectorChunk : public Chunk {
public: public:
SparseFloatVectorChunk( SparseFloatVectorChunk(int32_t row_nums,
int32_t row_nums,
char* data, char* data,
uint64_t size, uint64_t size,
bool nullable, bool nullable,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr) std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)) { : Chunk(row_nums, data, size, nullable, chunk_mmap_guard) {
vec_.resize(row_nums); vec_.resize(row_nums);
auto null_bitmap_bytes_num = (row_nums + 7) / 8; auto null_bitmap_bytes_num = nullable ? (row_nums + 7) / 8 : 0;
auto offsets_ptr = auto offsets_ptr =
reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num); reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
for (int i = 0; i < row_nums; i++) { for (int i = 0; i < row_nums; i++) {

View File

@ -27,9 +27,9 @@ MemChunkTarget::write(const void* data, size_t size) {
size_ += size; size_ += size;
} }
std::pair<char*, size_t> char*
MemChunkTarget::get() { MemChunkTarget::release() {
return {data_, cap_}; return data_;
} }
size_t size_t
@ -39,6 +39,11 @@ MemChunkTarget::tell() {
void void
MmapChunkTarget::flush() { MmapChunkTarget::flush() {
if (cap_ > size_) {
std::string padding(cap_ - size_, 0);
file_writer_->Write(padding.data(), cap_ - size_);
size_ = cap_;
}
file_writer_->Finish(); file_writer_->Finish();
} }
@ -48,17 +53,17 @@ MmapChunkTarget::write(const void* data, size_t size) {
size_ += size; size_ += size;
} }
std::pair<char*, size_t> char*
MmapChunkTarget::get() { MmapChunkTarget::release() {
flush(); flush();
auto file = File::Open(file_path_, O_RDWR); auto file = File::Open(file_path_, O_RDWR);
auto m = mmap(nullptr, size_, PROT_READ, MAP_SHARED, file.Descriptor(), 0); auto m = mmap(nullptr, cap_, PROT_READ, MAP_SHARED, file.Descriptor(), 0);
AssertInfo(m != MAP_FAILED, AssertInfo(m != MAP_FAILED,
"failed to map: {}, map_size={}", "failed to map: {}, map_size={}",
strerror(errno), strerror(errno),
size_); cap_);
return {static_cast<char*>(m), size_}; return static_cast<char*>(m);
} }
size_t size_t

View File

@ -23,46 +23,63 @@
namespace milvus { namespace milvus {
class ChunkTarget { class ChunkTarget {
public: public:
virtual void static constexpr size_t ALIGNED_SIZE = 4096; // 4KB
write(const void* data, size_t size) = 0;
virtual std::pair<char*, size_t>
get() = 0;
virtual ~ChunkTarget() = default; virtual ~ChunkTarget() = default;
/**
* @brief write data to the target at the current position
* @param data the data to write
* @param size the size of the data to write
*/
virtual void
write(const void* data, size_t size) = 0;
/**
* @brief release the data pointer to the caller
* @note no write() should be called after release()
* @return the data pointer
*/
virtual char*
release() = 0;
/**
* @brief get the current position of the target
* @return the current position
*/
virtual size_t virtual size_t
tell() = 0; tell() = 0;
}; };
class MmapChunkTarget : public ChunkTarget { class MmapChunkTarget : public ChunkTarget {
public: public:
explicit MmapChunkTarget(std::string file_path) explicit MmapChunkTarget(std::string file_path, size_t cap)
: file_path_(std::move(file_path)) { : file_path_(std::move(file_path)), cap_(cap) {
file_writer_ = std::make_unique<storage::FileWriter>(file_path_); file_writer_ = std::make_unique<storage::FileWriter>(file_path_);
} }
void
flush();
void void
write(const void* data, size_t size) override; write(const void* data, size_t size) override;
std::pair<char*, size_t> char*
get() override; release() override;
size_t size_t
tell() override; tell() override;
private: private:
void
flush();
std::unique_ptr<storage::FileWriter> file_writer_{nullptr}; std::unique_ptr<storage::FileWriter> file_writer_{nullptr};
std::string file_path_{}; std::string file_path_{};
size_t size_ = 0; size_t cap_{0};
size_t size_{0};
}; };
class MemChunkTarget : public ChunkTarget { class MemChunkTarget : public ChunkTarget {
public: public:
MemChunkTarget(size_t cap) : cap_(cap) { explicit MemChunkTarget(size_t cap) : cap_(cap) {
auto m = mmap(nullptr, auto m = mmap(nullptr,
cap, cap,
PROT_READ | PROT_WRITE, PROT_READ | PROT_WRITE,
@ -79,8 +96,8 @@ class MemChunkTarget : public ChunkTarget {
void void
write(const void* data, size_t size) override; write(const void* data, size_t size) override;
std::pair<char*, size_t> char*
get() override; release() override;
size_t size_t
tell() override; tell() override;

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,6 @@
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include <numeric>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "arrow/array/array_primitive.h" #include "arrow/array/array_primitive.h"
@ -21,37 +20,26 @@
#include "common/ChunkTarget.h" #include "common/ChunkTarget.h"
#include "arrow/record_batch.h" #include "arrow/record_batch.h"
#include "common/Chunk.h" #include "common/Chunk.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "storage/FileWriter.h"
#include "common/Geometry.h"
namespace milvus { namespace milvus {
class ChunkWriterBase { class ChunkWriterBase {
public: public:
explicit ChunkWriterBase(bool nullable) : nullable_(nullable) { explicit ChunkWriterBase(bool nullable) : nullable_(nullable) {
} }
ChunkWriterBase(std::string file_path, bool nullable) virtual std::pair<size_t, size_t>
: file_path_(std::move(file_path)), nullable_(nullable) { calculate_size(const arrow::ArrayVector& data) = 0;
}
virtual void virtual void
write(const arrow::ArrayVector& data) = 0; write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) = 0;
virtual std::unique_ptr<Chunk>
finish() = 0;
std::pair<char*, size_t>
get_data() {
return target_->get();
}
protected:
void void
write_null_bit_maps( write_null_bit_maps(
const std::vector<std::tuple<const uint8_t*, int64_t, int64_t>>& const std::vector<std::tuple<const uint8_t*, int64_t, int64_t>>&
null_bitmaps) { null_bitmaps,
const std::shared_ptr<ChunkTarget>& target) {
if (nullable_) { if (nullable_) {
// merge all null bitmaps in case of multiple chunk null bitmap dislocation // merge all null bitmaps in case of multiple chunk null bitmap dislocation
// say [0xFF, 0x00] with size [7, 8] cannot be treated as [0xFF, 0x00] after merged but // say [0xFF, 0x00] with size [7, 8] cannot be treated as [0xFF, 0x00] after merged but
@ -81,15 +69,13 @@ class ChunkWriterBase {
} }
size_total_bit += size_bits; size_total_bit += size_bits;
} }
target_->write(merged_null_bitmap.data(), (size_total_bit + 7) / 8); target->write(merged_null_bitmap.data(), (size_total_bit + 7) / 8);
} }
} }
protected: protected:
int row_nums_ = 0; size_t row_nums_ = 0;
std::string file_path_{""};
bool nullable_ = false; bool nullable_ = false;
std::shared_ptr<ChunkTarget> target_;
}; };
template <typename ArrowType, typename T> template <typename ArrowType, typename T>
@ -98,30 +84,25 @@ class ChunkWriter final : public ChunkWriterBase {
ChunkWriter(int dim, bool nullable) : ChunkWriterBase(nullable), dim_(dim) { ChunkWriter(int dim, bool nullable) : ChunkWriterBase(nullable), dim_(dim) {
} }
ChunkWriter(int dim, std::string file_path, bool nullable) std::pair<size_t, size_t>
: ChunkWriterBase(std::move(file_path), nullable), dim_(dim){}; calculate_size(const arrow::ArrayVector& array_vec) override {
size_t size = 0;
void size_t row_nums = 0;
write(const arrow::ArrayVector& array_vec) override {
auto size = 0;
auto row_nums = 0;
for (const auto& data : array_vec) { for (const auto& data : array_vec) {
row_nums += data->length(); row_nums += data->length();
auto array = std::static_pointer_cast<ArrowType>(data); auto array = std::static_pointer_cast<ArrowType>(data);
if (nullable_) {
auto null_bitmap_n = (data->length() + 7) / 8;
size += null_bitmap_n;
}
size += array->length() * dim_ * sizeof(T); size += array->length() * dim_ * sizeof(T);
} }
if (nullable_) {
row_nums_ = row_nums; size += (row_nums + 7) / 8;
if (!file_path_.empty()) {
target_ = std::make_shared<MmapChunkTarget>(file_path_);
} else {
target_ = std::make_shared<MemChunkTarget>(size);
} }
row_nums_ = row_nums;
return {size, row_nums};
}
void
write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override {
// Chunk layout: // Chunk layout:
// 1. Null bitmap (if nullable_=true): Indicates which values are null // 1. Null bitmap (if nullable_=true): Indicates which values are null
// 2. Data values: Contiguous storage of data elements in the order: // 2. Data values: Contiguous storage of data elements in the order:
@ -134,55 +115,25 @@ class ChunkWriter final : public ChunkWriterBase {
null_bitmaps.emplace_back( null_bitmaps.emplace_back(
data->null_bitmap_data(), data->length(), data->offset()); data->null_bitmap_data(), data->length(), data->offset());
} }
write_null_bit_maps(null_bitmaps); write_null_bit_maps(null_bitmaps, target);
} }
for (const auto& data : array_vec) { for (const auto& data : array_vec) {
auto array = std::static_pointer_cast<ArrowType>(data); auto array = std::static_pointer_cast<ArrowType>(data);
auto data_ptr = array->raw_values(); auto data_ptr = array->raw_values();
target_->write(data_ptr, array->length() * dim_ * sizeof(T)); target->write(data_ptr, array->length() * dim_ * sizeof(T));
} }
} }
std::unique_ptr<Chunk>
finish() override {
auto [data, size] = target_->get();
auto mmap_file_raii = file_path_.empty()
? nullptr
: std::make_unique<MmapFileRAII>(file_path_);
return std::make_unique<FixedWidthChunk>(row_nums_,
dim_,
data,
size,
sizeof(T),
nullable_,
std::move(mmap_file_raii));
}
private: private:
int dim_; const int64_t dim_;
}; };
template <> template <>
inline void inline void
ChunkWriter<arrow::BooleanArray, bool>::write( ChunkWriter<arrow::BooleanArray, bool>::write_to_target(
const arrow::ArrayVector& array_vec) { const arrow::ArrayVector& array_vec,
auto size = 0; const std::shared_ptr<ChunkTarget>& target) {
auto row_nums = 0;
for (const auto& data : array_vec) {
row_nums += data->length();
auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(data);
size += array->length() * dim_;
size += (data->length() + 7) / 8;
}
row_nums_ = row_nums;
if (!file_path_.empty()) {
target_ = std::make_shared<MmapChunkTarget>(file_path_);
} else {
target_ = std::make_shared<MemChunkTarget>(size);
}
if (nullable_) { if (nullable_) {
// tuple <data, size, offset> // tuple <data, size, offset>
std::vector<std::tuple<const uint8_t*, int64_t, int64_t>> null_bitmaps; std::vector<std::tuple<const uint8_t*, int64_t, int64_t>> null_bitmaps;
@ -190,14 +141,14 @@ ChunkWriter<arrow::BooleanArray, bool>::write(
null_bitmaps.emplace_back( null_bitmaps.emplace_back(
data->null_bitmap_data(), data->length(), data->offset()); data->null_bitmap_data(), data->length(), data->offset());
} }
write_null_bit_maps(null_bitmaps); write_null_bit_maps(null_bitmaps, target);
} }
for (const auto& data : array_vec) { for (const auto& data : array_vec) {
auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(data); auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(data);
for (int i = 0; i < array->length(); i++) { for (int i = 0; i < array->length(); i++) {
auto value = array->Value(i); auto value = array->Value(i);
target_->write(&value, sizeof(bool)); target->write(&value, sizeof(bool));
} }
} }
} }
@ -206,32 +157,39 @@ class StringChunkWriter : public ChunkWriterBase {
public: public:
using ChunkWriterBase::ChunkWriterBase; using ChunkWriterBase::ChunkWriterBase;
void std::pair<size_t, size_t>
write(const arrow::ArrayVector& array_vec) override; calculate_size(const arrow::ArrayVector& array_vec) override;
std::unique_ptr<Chunk> void
finish() override; write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
private:
std::vector<std::string_view> strs_;
}; };
class JSONChunkWriter : public ChunkWriterBase { class JSONChunkWriter : public ChunkWriterBase {
public: public:
using ChunkWriterBase::ChunkWriterBase; using ChunkWriterBase::ChunkWriterBase;
void std::pair<size_t, size_t>
write(const arrow::ArrayVector& array_vec) override; calculate_size(const arrow::ArrayVector& array_vec) override;
std::unique_ptr<Chunk> void
finish() override; write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
}; };
class GeometryChunkWriter : public ChunkWriterBase { class GeometryChunkWriter : public ChunkWriterBase {
public: public:
using ChunkWriterBase::ChunkWriterBase; using ChunkWriterBase::ChunkWriterBase;
void
write(const arrow::ArrayVector& array_vec) override;
std::unique_ptr<Chunk> std::pair<size_t, size_t>
finish() override; calculate_size(const arrow::ArrayVector& array_vec) override;
void
write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
}; };
class ArrayChunkWriter : public ChunkWriterBase { class ArrayChunkWriter : public ChunkWriterBase {
@ -239,18 +197,13 @@ class ArrayChunkWriter : public ChunkWriterBase {
ArrayChunkWriter(const milvus::DataType element_type, bool nullable) ArrayChunkWriter(const milvus::DataType element_type, bool nullable)
: ChunkWriterBase(nullable), element_type_(element_type) { : ChunkWriterBase(nullable), element_type_(element_type) {
} }
ArrayChunkWriter(const milvus::DataType element_type,
std::string file_path, std::pair<size_t, size_t>
bool nullable) calculate_size(const arrow::ArrayVector& array_vec) override;
: ChunkWriterBase(std::move(file_path), nullable),
element_type_(element_type) {
}
void void
write(const arrow::ArrayVector& array_vec) override; write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
std::unique_ptr<Chunk>
finish() override;
private: private:
const milvus::DataType element_type_; const milvus::DataType element_type_;
@ -258,46 +211,44 @@ class ArrayChunkWriter : public ChunkWriterBase {
class VectorArrayChunkWriter : public ChunkWriterBase { class VectorArrayChunkWriter : public ChunkWriterBase {
public: public:
VectorArrayChunkWriter(int64_t dim, VectorArrayChunkWriter(int64_t dim, const milvus::DataType element_type)
const milvus::DataType element_type, : ChunkWriterBase(false), element_type_(element_type), dim_(dim) {
std::string file_path = "")
: ChunkWriterBase(std::move(file_path), false),
element_type_(element_type),
dim_(dim) {
} }
void std::pair<size_t, size_t>
write(const arrow::ArrayVector& array_vec) override; calculate_size(const arrow::ArrayVector& array_vec) override;
std::unique_ptr<Chunk> void
finish() override; write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
private: private:
size_t
calculateTotalSize(const arrow::ArrayVector& array_vec);
const milvus::DataType element_type_; const milvus::DataType element_type_;
int64_t dim_; const int64_t dim_;
}; };
class SparseFloatVectorChunkWriter : public ChunkWriterBase { class SparseFloatVectorChunkWriter : public ChunkWriterBase {
public: public:
using ChunkWriterBase::ChunkWriterBase; using ChunkWriterBase::ChunkWriterBase;
std::pair<size_t, size_t>
calculate_size(const arrow::ArrayVector& array_vec) override;
void void
write(const arrow::ArrayVector& array_vec) override; write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
std::unique_ptr<Chunk>
finish() override;
}; };
std::unique_ptr<Chunk>
create_chunk(const FieldMeta& field_meta, const arrow::ArrayVector& array_vec);
std::unique_ptr<Chunk> std::unique_ptr<Chunk>
create_chunk(const FieldMeta& field_meta, create_chunk(const FieldMeta& field_meta,
const arrow::ArrayVector& array_vec, const arrow::ArrayVector& array_vec,
const std::string& file_path); const std::string& file_path = "");
std::unordered_map<FieldId, std::shared_ptr<Chunk>>
create_group_chunk(const std::vector<FieldId>& field_ids,
const std::vector<FieldMeta>& field_metas,
const std::vector<arrow::ArrayVector>& array_vec,
const std::string& file_path = "");
arrow::ArrayVector arrow::ArrayVector
read_single_column_batches(std::shared_ptr<arrow::RecordBatchReader> reader); read_single_column_batches(std::shared_ptr<arrow::RecordBatchReader> reader);

View File

@ -98,4 +98,9 @@ class GroupChunk {
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks_; std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks_;
}; };
enum class GroupChunkType : uint8_t {
DEFAULT = 0,
JSON_KEY_STATS = 1,
};
} // namespace milvus } // namespace milvus

View File

@ -21,6 +21,7 @@
#include "common/Chunk.h" #include "common/Chunk.h"
#include "common/VectorArray.h" #include "common/VectorArray.h"
#include "common/Types.h" #include "common/Types.h"
#include "common/FieldMeta.h"
#include "pb/schema.pb.h" #include "pb/schema.pb.h"
#include "test_utils/DataGen.h" #include "test_utils/DataGen.h"
@ -243,11 +244,14 @@ TEST_F(VectorArrayChunkTest, TestWriteMultipleBatches) {
createFloatVectorListArray(batch_data, batch_offsets, dim)); createFloatVectorListArray(batch_data, batch_offsets, dim));
} }
// Write using VectorArrayChunkWriter // Write using create_chunk with FieldMeta
VectorArrayChunkWriter writer(dim, DataType::VECTOR_FLOAT); FieldMeta field_meta(FieldName("va"),
writer.write(array_vec); FieldId(1),
DataType::VECTOR_ARRAY,
auto chunk = writer.finish(); DataType::VECTOR_FLOAT,
dim,
std::nullopt);
auto chunk = create_chunk(field_meta, array_vec);
auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get()); auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get());
// Verify total rows // Verify total rows
@ -297,11 +301,14 @@ TEST_F(VectorArrayChunkTest, TestWriteWithMmap) {
auto list_array = createFloatVectorListArray(all_data, offsets, dim); auto list_array = createFloatVectorListArray(all_data, offsets, dim);
arrow::ArrayVector array_vec = {list_array}; arrow::ArrayVector array_vec = {list_array};
// Write with mmap // Write with mmap using create_chunk
VectorArrayChunkWriter writer(dim, DataType::VECTOR_FLOAT, temp_file); FieldMeta field_meta(FieldName("va"),
writer.write(array_vec); FieldId(2),
DataType::VECTOR_ARRAY,
auto chunk = writer.finish(); DataType::VECTOR_FLOAT,
dim,
std::nullopt);
auto chunk = create_chunk(field_meta, array_vec, temp_file);
auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get()); auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get());
// Verify mmap write // Verify mmap write
@ -330,10 +337,13 @@ TEST_F(VectorArrayChunkTest, TestEmptyVectorArray) {
arrow::ArrayVector array_vec; arrow::ArrayVector array_vec;
VectorArrayChunkWriter writer(dim, DataType::VECTOR_FLOAT); FieldMeta field_meta(FieldName("va"),
writer.write(array_vec); FieldId(3),
DataType::VECTOR_ARRAY,
auto chunk = writer.finish(); DataType::VECTOR_FLOAT,
dim,
std::nullopt);
auto chunk = create_chunk(field_meta, array_vec);
auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get()); auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get());
EXPECT_EQ(vector_array_chunk->RowNums(), 0); EXPECT_EQ(vector_array_chunk->RowNums(), 0);
@ -469,11 +479,14 @@ TEST_P(VectorArrayChunkParameterizedTest, TestWriteVectorArray) {
createVectorListArray(all_data, offsets, param.dim, param.data_type); createVectorListArray(all_data, offsets, param.dim, param.data_type);
arrow::ArrayVector array_vec = {list_array}; arrow::ArrayVector array_vec = {list_array};
// Test VectorArrayChunkWriter // Test create_chunk with FieldMeta for VECTOR_ARRAY
VectorArrayChunkWriter writer(param.dim, param.data_type); FieldMeta field_meta(FieldName("va_param"),
writer.write(array_vec); FieldId(100),
DataType::VECTOR_ARRAY,
auto chunk = writer.finish(); param.data_type,
param.dim,
std::nullopt);
auto chunk = create_chunk(field_meta, array_vec);
auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get()); auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get());
// Verify results // Verify results

View File

@ -898,7 +898,7 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
auto enable_mmap = !mmap_filepath_.empty(); auto enable_mmap = !mmap_filepath_.empty();
auto column_group_info = auto column_group_info =
FieldDataInfo(column_group_id, num_rows, mmap_filepath_); FieldDataInfo(column_group_id, field_id_, num_rows, mmap_filepath_);
LOG_INFO( LOG_INFO(
"loads column group {} with num_rows {} for segment " "loads column group {} with num_rows {} for segment "
"{}", "{}",
@ -923,6 +923,7 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
auto translator = std::make_unique< auto translator = std::make_unique<
milvus::segcore::storagev2translator::GroupChunkTranslator>( milvus::segcore::storagev2translator::GroupChunkTranslator>(
segment_id_, segment_id_,
GroupChunkType::JSON_KEY_STATS,
field_meta_map, field_meta_map,
column_group_info, column_group_info,
files, files,
@ -968,9 +969,11 @@ JsonKeyStats::LoadShreddingData(const std::vector<std::string>& index_files) {
void void
JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) { JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) {
if (config.contains(MMAP_FILE_PATH)) { if (config.contains(ENABLE_MMAP)) {
mmap_filepath_ = GetValueFromConfig<std::string>(config, MMAP_FILE_PATH) mmap_filepath_ =
.value_or(""); milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager()
->GetRootPath();
LOG_INFO("load json stats for segment {} with mmap local file path: {}", LOG_INFO("load json stats for segment {} with mmap local file path: {}",
segment_id_, segment_id_,
mmap_filepath_); mmap_filepath_);

View File

@ -105,7 +105,9 @@ class DefaultColumnGroupingStrategy : public ColumnGroupingStrategy {
CreateGroups(const TableStatsInfo& table_info) const override { CreateGroups(const TableStatsInfo& table_info) const override {
// put all columns into one group // put all columns into one group
std::vector<std::vector<int>> column_groups; std::vector<std::vector<int>> column_groups;
column_groups.reserve(1);
std::vector<int> group; std::vector<int> group;
group.reserve(table_info.schema->num_fields());
for (size_t i = 0; i < table_info.schema->num_fields(); ++i) { for (size_t i = 0; i < table_info.schema->num_fields(); ++i) {
group.push_back(i); group.push_back(i);
} }

View File

@ -23,8 +23,10 @@ TEST(test_chunked_column, test_get_chunkid) {
std::vector<std::unique_ptr<Chunk>> chunks; std::vector<std::unique_ptr<Chunk>> chunks;
for (auto i = 0; i < num_chunks; ++i) { for (auto i = 0; i < num_chunks; ++i) {
auto row_num = num_rows_per_chunk[i]; auto row_num = num_rows_per_chunk[i];
auto chunk = auto chunk_mmap_guard =
std::make_unique<FixedWidthChunk>(row_num, 1, nullptr, 0, 4, false); std::make_shared<ChunkMmapGuard>(nullptr, 0, "");
auto chunk = std::make_unique<FixedWidthChunk>(
row_num, 1, nullptr, 0, 4, false, chunk_mmap_guard);
chunks.push_back(std::move(chunk)); chunks.push_back(std::move(chunk));
} }
auto translator = std::make_unique<TestChunkTranslator>( auto translator = std::make_unique<TestChunkTranslator>(

View File

@ -40,10 +40,24 @@ struct FieldDataInfo {
arrow_reader_channel = std::make_shared<ArrowReaderChannel>(); arrow_reader_channel = std::make_shared<ArrowReaderChannel>();
} }
FieldDataInfo(int64_t field_id,
int64_t main_field_id,
size_t row_count,
std::string mmap_dir_path = "",
bool in_load_list = false)
: field_id(field_id),
row_count(row_count),
main_field_id(main_field_id),
mmap_dir_path(std::move(mmap_dir_path)),
in_load_list(in_load_list) {
arrow_reader_channel = std::make_shared<ArrowReaderChannel>();
}
int64_t field_id; int64_t field_id;
int64_t main_field_id{INVALID_FIELD_ID}; // used for json stats only
size_t row_count; size_t row_count;
std::string mmap_dir_path; std::string mmap_dir_path{};
std::shared_ptr<ArrowReaderChannel> arrow_reader_channel; std::shared_ptr<ArrowReaderChannel> arrow_reader_channel;
bool in_load_list = false; bool in_load_list{false};
}; };
} // namespace milvus } // namespace milvus

View File

@ -250,8 +250,16 @@ class CachedSearchIteratorTest
memcpy(chunk_data.data(), memcpy(chunk_data.data(),
base_dataset_.cbegin() + offset * dim_, base_dataset_.cbegin() + offset * dim_,
rows * dim_ * sizeof(float)); rows * dim_ * sizeof(float));
chunks.emplace_back(std::make_unique<FixedWidthChunk>( auto chunk_mmap_guard =
rows, dim_, chunk_data.data(), buf_size, sizeof(float), false)); std::make_shared<ChunkMmapGuard>(nullptr, 0, "");
chunks.emplace_back(
std::make_unique<FixedWidthChunk>(rows,
dim_,
chunk_data.data(),
buf_size,
sizeof(float),
false,
chunk_mmap_guard));
offset += rows; offset += rows;
} }
auto translator = std::make_unique<TestChunkTranslator>( auto translator = std::make_unique<TestChunkTranslator>(

View File

@ -562,6 +562,7 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
auto translator = auto translator =
std::make_unique<storagev2translator::GroupChunkTranslator>( std::make_unique<storagev2translator::GroupChunkTranslator>(
get_segment_id(), get_segment_id(),
GroupChunkType::DEFAULT,
field_metas, field_metas,
column_group_info, column_group_info,
insert_files, insert_files,

View File

@ -95,8 +95,10 @@ TEST(test_chunk_segment, TestSearchOnSealed) {
defer.AddDefer([buf]() { delete[] buf; }); defer.AddDefer([buf]() { delete[] buf; });
memcpy(buf, data.data(), 4 * data.size()); memcpy(buf, data.data(), 4 * data.size());
auto chunk_mmap_guard =
std::make_shared<ChunkMmapGuard>(nullptr, 0, "");
chunks.emplace_back(std::make_unique<FixedWidthChunk>( chunks.emplace_back(std::make_unique<FixedWidthChunk>(
chunk_size, dim, buf, buf_size, 4, false)); chunk_size, dim, buf, buf_size, 4, false, chunk_mmap_guard));
} }
auto translator = std::make_unique<TestChunkTranslator>( auto translator = std::make_unique<TestChunkTranslator>(

View File

@ -172,14 +172,6 @@ ChunkTranslator::get_cells(
auto data_type = field_meta_.get_data_type(); auto data_type = field_meta_.get_data_type();
std::filesystem::path folder;
if (use_mmap_) {
folder = std::filesystem::path(mmap_dir_path_) /
std::to_string(segment_id_) / std::to_string(field_id_);
std::filesystem::create_directories(folder);
}
for (auto cid : cids) { for (auto cid : cids) {
std::unique_ptr<milvus::Chunk> chunk = nullptr; std::unique_ptr<milvus::Chunk> chunk = nullptr;
if (!use_mmap_) { if (!use_mmap_) {
@ -192,7 +184,11 @@ ChunkTranslator::get_cells(
chunk = create_chunk(field_meta_, array_vec); chunk = create_chunk(field_meta_, array_vec);
} else { } else {
// we don't know the resulting file size beforehand, thus using a separate file for each chunk. // we don't know the resulting file size beforehand, thus using a separate file for each chunk.
auto filepath = folder / std::to_string(cid); auto filepath =
std::filesystem::path(mmap_dir_path_) /
fmt::format("seg_{}_fid_{}_{}", segment_id_, field_id_, cid);
std::filesystem::create_directories(
std::filesystem::path(mmap_dir_path_));
LOG_INFO("segment {} mmaping field {} chunk {} to path {}", LOG_INFO("segment {} mmaping field {} chunk {} to path {}",
segment_id_, segment_id_,

View File

@ -45,6 +45,7 @@ namespace milvus::segcore::storagev2translator {
GroupChunkTranslator::GroupChunkTranslator( GroupChunkTranslator::GroupChunkTranslator(
int64_t segment_id, int64_t segment_id,
GroupChunkType group_chunk_type,
const std::unordered_map<FieldId, FieldMeta>& field_metas, const std::unordered_map<FieldId, FieldMeta>& field_metas,
FieldDataInfo column_group_info, FieldDataInfo column_group_info,
std::vector<std::string> insert_files, std::vector<std::string> insert_files,
@ -52,7 +53,23 @@ GroupChunkTranslator::GroupChunkTranslator(
int64_t num_fields, int64_t num_fields,
milvus::proto::common::LoadPriority load_priority) milvus::proto::common::LoadPriority load_priority)
: segment_id_(segment_id), : segment_id_(segment_id),
key_(fmt::format("seg_{}_cg_{}", segment_id, column_group_info.field_id)), group_chunk_type_(group_chunk_type),
key_([&]() {
switch (group_chunk_type) {
case GroupChunkType::DEFAULT:
return fmt::format(
"seg_{}_cg_{}", segment_id, column_group_info.field_id);
case GroupChunkType::JSON_KEY_STATS:
AssertInfo(
column_group_info.main_field_id != INVALID_FIELD_ID,
"main field id is not set for json key stats group "
"chunk");
return fmt::format("seg_{}_jks_{}_cg_{}",
segment_id,
column_group_info.main_field_id,
column_group_info.field_id);
}
}()),
field_metas_(field_metas), field_metas_(field_metas),
column_group_info_(column_group_info), column_group_info_(column_group_info),
insert_files_(insert_files), insert_files_(insert_files),
@ -303,6 +320,12 @@ GroupChunkTranslator::load_group_chunk(
// Create chunks for each field in this batch // Create chunks for each field in this batch
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks; std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
// Iterate through field_id_list to get field_id and create chunk // Iterate through field_id_list to get field_id and create chunk
std::vector<FieldId> field_ids;
std::vector<FieldMeta> field_metas;
std::vector<arrow::ArrayVector> array_vecs;
field_metas.reserve(table->schema()->num_fields());
array_vecs.reserve(table->schema()->num_fields());
for (int i = 0; i < table->schema()->num_fields(); ++i) { for (int i = 0; i < table->schema()->num_fields(); ++i) {
AssertInfo(table->schema()->field(i)->metadata()->Contains( AssertInfo(table->schema()->field(i)->metadata()->Contains(
milvus_storage::ARROW_FIELD_ID_KEY), milvus_storage::ARROW_FIELD_ID_KEY),
@ -329,41 +352,41 @@ GroupChunkTranslator::load_group_chunk(
fid.get()); fid.get());
const auto& field_meta = it->second; const auto& field_meta = it->second;
const arrow::ArrayVector& array_vec = table->column(i)->chunks(); const arrow::ArrayVector& array_vec = table->column(i)->chunks();
std::unique_ptr<Chunk> chunk; field_ids.push_back(fid);
field_metas.push_back(field_meta);
array_vecs.push_back(array_vec);
}
if (!use_mmap_) { if (!use_mmap_) {
// Memory mode chunks = create_group_chunk(field_ids, field_metas, array_vecs);
chunk = create_chunk(field_meta, array_vec);
} else { } else {
// Mmap mode
std::filesystem::path filepath; std::filesystem::path filepath;
if (field_meta.get_main_field_id() != INVALID_FIELD_ID) { switch (group_chunk_type_) {
// json shredding mode case GroupChunkType::DEFAULT:
filepath = filepath =
std::filesystem::path(column_group_info_.mmap_dir_path) / std::filesystem::path(column_group_info_.mmap_dir_path) /
std::to_string(segment_id_) / fmt::format("seg_{}_cg_{}_{}",
std::to_string(field_meta.get_main_field_id()) / segment_id_,
std::to_string(field_id) / std::to_string(cid); column_group_info_.field_id,
} else { cid);
break;
case GroupChunkType::JSON_KEY_STATS:
filepath = filepath =
std::filesystem::path(column_group_info_.mmap_dir_path) / std::filesystem::path(column_group_info_.mmap_dir_path) /
std::to_string(segment_id_) / std::to_string(field_id) / fmt::format("seg_{}_jks_{}_cg_{}_{}",
std::to_string(cid); segment_id_,
column_group_info_.main_field_id,
column_group_info_.field_id,
cid);
break;
default:
ThrowInfo(ErrorCode::UnexpectedError,
"unknown group chunk type: {}",
static_cast<uint8_t>(group_chunk_type_));
} }
LOG_INFO(
"[StorageV2] translator {} mmaping field {} chunk {} to path "
"{}",
key_,
field_id,
cid,
filepath.string());
std::filesystem::create_directories(filepath.parent_path()); std::filesystem::create_directories(filepath.parent_path());
chunks = create_group_chunk(
chunk = create_chunk(field_meta, array_vec, filepath.string()); field_ids, field_metas, array_vecs, filepath.string());
}
chunks[fid] = std::move(chunk);
} }
return std::make_unique<milvus::GroupChunk>(chunks); return std::make_unique<milvus::GroupChunk>(chunks);
} }

View File

@ -38,6 +38,7 @@ class GroupChunkTranslator
public: public:
GroupChunkTranslator( GroupChunkTranslator(
int64_t segment_id, int64_t segment_id,
GroupChunkType group_chunk_type,
const std::unordered_map<FieldId, FieldMeta>& field_metas, const std::unordered_map<FieldId, FieldMeta>& field_metas,
FieldDataInfo column_group_info, FieldDataInfo column_group_info,
std::vector<std::string> insert_files, std::vector<std::string> insert_files,
@ -108,6 +109,7 @@ class GroupChunkTranslator
const milvus::cachinglayer::cid_t cid); const milvus::cachinglayer::cid_t cid);
int64_t segment_id_; int64_t segment_id_;
GroupChunkType group_chunk_type_{GroupChunkType::DEFAULT};
std::string key_; std::string key_;
std::unordered_map<FieldId, FieldMeta> field_metas_; std::unordered_map<FieldId, FieldMeta> field_metas_;
FieldDataInfo column_group_info_; FieldDataInfo column_group_info_;

View File

@ -97,12 +97,17 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
}; };
TEST_P(GroupChunkTranslatorTest, TestWithMmap) { TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
auto temp_dir =
std::filesystem::temp_directory_path() / "gctt_test_with_mmap";
std::filesystem::create_directory(temp_dir);
auto use_mmap = GetParam(); auto use_mmap = GetParam();
std::unordered_map<FieldId, FieldMeta> field_metas = schema_->get_fields(); std::unordered_map<FieldId, FieldMeta> field_metas = schema_->get_fields();
auto column_group_info = FieldDataInfo(0, 3000, ""); auto column_group_info = FieldDataInfo(0, 3000, temp_dir.string());
auto translator = std::make_unique<GroupChunkTranslator>( auto translator = std::make_unique<GroupChunkTranslator>(
segment_id_, segment_id_,
GroupChunkType::DEFAULT,
field_metas, field_metas,
column_group_info, column_group_info,
paths_, paths_,
@ -164,17 +169,23 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
EXPECT_EQ(meta->chunk_memory_size_.size(), num_cells); EXPECT_EQ(meta->chunk_memory_size_.size(), num_cells);
EXPECT_EQ(expected_total_size, chunked_column_group->memory_size()); EXPECT_EQ(expected_total_size, chunked_column_group->memory_size());
// Verify the mmap files for cell 0 and 1 are created
std::vector<std::string> mmap_paths = {
(temp_dir / "seg_0_cg_0_0").string(),
(temp_dir / "seg_0_cg_0_1").string()};
// Verify mmap directory and files if in mmap mode // Verify mmap directory and files if in mmap mode
if (use_mmap) { if (use_mmap) {
std::string mmap_dir = std::to_string(segment_id_); for (const auto& mmap_path : mmap_paths) {
EXPECT_TRUE(std::filesystem::exists(mmap_dir)); EXPECT_TRUE(std::filesystem::exists(mmap_path));
}
}
// DO NOT Verify each field has a corresponding file: files are unlinked immediately after being mmaped. // Clean up mmap files
// for (size_t i = 0; i < field_id_list.size(); ++i) { if (use_mmap) {
// auto field_id = field_id_list.Get(i); for (const auto& mmap_path : mmap_paths) {
// std::string field_file = mmap_dir + "/" + std::to_string(field_id); std::filesystem::remove(mmap_path);
// EXPECT_TRUE(std::filesystem::exists(field_file)); }
// } std::filesystem::remove(temp_dir);
} }
} }
@ -228,10 +239,14 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
AssertInfo(status.ok(), "failed to close file reader"); AssertInfo(status.ok(), "failed to close file reader");
} }
auto column_group_info = FieldDataInfo(0, total_rows, ""); auto temp_dir =
std::filesystem::temp_directory_path() / "gctt_test_multiple_files";
std::filesystem::create_directory(temp_dir);
auto column_group_info = FieldDataInfo(0, total_rows, temp_dir.string());
auto translator = std::make_unique<GroupChunkTranslator>( auto translator = std::make_unique<GroupChunkTranslator>(
segment_id_, segment_id_,
GroupChunkType::DEFAULT,
field_metas, field_metas,
column_group_info, column_group_info,
multi_file_paths, multi_file_paths,
@ -310,6 +325,10 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
std::filesystem::remove(file_path); std::filesystem::remove(file_path);
} }
} }
// Clean up cached column group files
if (use_mmap && std::filesystem::exists(temp_dir)) {
std::filesystem::remove_all(temp_dir);
}
} }
INSTANTIATE_TEST_SUITE_P(GroupChunkTranslatorTest, INSTANTIATE_TEST_SUITE_P(GroupChunkTranslatorTest,