enhance: [2.6] mmap once for each group chunk (#45893)

issue: #45486
pr: #45487

This commit refactors the chunk writing system by introducing a two-phase
approach: size calculation followed by writing to a target. This enables
efficient group chunk creation where multiple fields share a single mmap
region, significantly reducing the number of mmap system calls and VMAs.
A sketch of how the two phases compose for a group chunk follows the
summary list below.

- Optimize `mmap` usage: a single `mmap` per group chunk instead of one per field
- Split ChunkWriter into two phases:
  - `calculate_size()`: Pre-compute required memory without allocation
  - `write_to_target()`: Write data to a provided ChunkTarget
- Implement `ChunkMmapGuard` for unified mmap region lifecycle management
  - Handles `munmap` and file cleanup via RAII
  - Shared via `std::shared_ptr` across multiple chunks in a group
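
A minimal sketch of how the two phases compose for a group chunk, given `field_metas`, `array_vecs`, and a target `file_path`. This is illustrative only: the real logic lives in the `ChunkWriter.cpp` diff (suppressed below), and `make_writer_for()` is a hypothetical stand-in for the per-type writer selection; per-field alignment/padding is omitted.

```cpp
// Phase 1: ask every field's writer how many bytes it needs, without allocating.
size_t total_size = 0;
std::vector<std::unique_ptr<ChunkWriterBase>> writers;
for (size_t i = 0; i < field_metas.size(); ++i) {
    writers.push_back(make_writer_for(field_metas[i]));               // hypothetical helper
    total_size += writers[i]->calculate_size(array_vecs[i]).first;    // {size, row_nums}
}

// Phase 2: one mmap-backed target sized for the whole group, written sequentially.
auto target = std::make_shared<MmapChunkTarget>(file_path, total_size);
for (size_t i = 0; i < writers.size(); ++i) {
    writers[i]->write_to_target(array_vecs[i], target);
}

// Single mmap of the whole region; every chunk in the group shares one guard,
// so munmap/unlink happen exactly once, when the last chunk is destroyed.
char* base = target->release();
auto guard = std::make_shared<ChunkMmapGuard>(base, total_size, file_path);
// Each field's Chunk is then constructed at its own offset into `base`,
// receiving the same `guard`.
```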



---------

Signed-off-by: Shawn Wang <shawn.wang@zilliz.com>
Author: sparknack, 2025-11-28 23:55:09 +08:00 (committed by GitHub)
Commit: 7afa1e7ce4 (parent: 5d3e4dd038)
18 changed files with 899 additions and 577 deletions

View File

@ -38,6 +38,39 @@ namespace milvus {
constexpr uint64_t MMAP_STRING_PADDING = 1;
constexpr uint64_t MMAP_GEOMETRY_PADDING = 1;
constexpr uint64_t MMAP_ARRAY_PADDING = 1;
// Shared mmap region manager for group chunks
class ChunkMmapGuard {
public:
ChunkMmapGuard(char* mmap_ptr, size_t mmap_size, std::string file_path)
: mmap_ptr_(mmap_ptr), mmap_size_(mmap_size), file_path_(file_path) {
}
~ChunkMmapGuard() {
if (mmap_ptr_ != nullptr) {
munmap(mmap_ptr_, mmap_size_);
}
if (!file_path_.empty()) {
unlink(file_path_.c_str());
}
}
char*
get_ptr() const {
return mmap_ptr_;
}
bool
is_file_backed() const {
return !file_path_.empty();
}
private:
char* mmap_ptr_;
size_t mmap_size_;
const std::string file_path_;
};
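// A self-contained sketch of the sharing semantics this guard enables (not part
// of this header; the anonymous mapping, sizes, and row counts are made up for
// illustration, and error handling is omitted):
//
//   size_t region_size = 2 * 4096;
//   char* region = static_cast<char*>(mmap(nullptr, region_size,
//                                           PROT_READ | PROT_WRITE,
//                                           MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
//   // Empty file path: the guard will munmap on destruction but has nothing to unlink.
//   auto guard = std::make_shared<ChunkMmapGuard>(region, region_size, "");
//   // Two fixed-width chunks carved out of the same region, both holding the guard.
//   auto chunk_a = std::make_shared<FixedWidthChunk>(1024, 1, region,        4096, 4, false, guard);
//   auto chunk_b = std::make_shared<FixedWidthChunk>(1024, 1, region + 4096, 4096, 4, false, guard);
//   chunk_a.reset();  // region stays mapped: chunk_b still references the guard
//   chunk_b.reset();  // last owner released: ~ChunkMmapGuard munmaps the region once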
class Chunk {
public:
Chunk() = default;
@ -45,12 +78,12 @@ class Chunk {
char* data,
uint64_t size,
bool nullable,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr)
std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: data_(data),
row_nums_(row_nums),
size_(size),
nullable_(nullable),
mmap_file_raii_(std::move(mmap_file_raii)) {
chunk_mmap_guard_(chunk_mmap_guard) {
if (nullable) {
valid_.reserve(row_nums);
for (int i = 0; i < row_nums; i++) {
@ -59,7 +92,7 @@ class Chunk {
}
}
virtual ~Chunk() {
munmap(data_, size_);
// The ChunkMmapGuard will handle the unmapping and unlinking of the file if it is file backed
}
uint64_t
@ -69,7 +102,7 @@ class Chunk {
cachinglayer::ResourceUsage
CellByteSize() const {
if (mmap_file_raii_) {
if (chunk_mmap_guard_ && chunk_mmap_guard_->is_file_backed()) {
return cachinglayer::ResourceUsage(0, static_cast<int64_t>(size_));
}
return cachinglayer::ResourceUsage(static_cast<int64_t>(size_), 0);
@ -109,7 +142,7 @@ class Chunk {
FixedVector<bool>
valid_; // parse null bitmap to valid_ to be compatible with SpanBase
std::unique_ptr<MmapFileRAII> mmap_file_raii_;
std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard_{nullptr};
};
// for fixed size data, includes fixed size array
@ -121,8 +154,8 @@ class FixedWidthChunk : public Chunk {
uint64_t size,
uint64_t element_size,
bool nullable,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr)
: Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)),
std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: Chunk(row_nums, data, size, nullable, chunk_mmap_guard),
dim_(dim),
element_size_(element_size) {
auto null_bitmap_bytes_num = nullable_ ? (row_nums_ + 7) / 8 : 0;
@ -181,8 +214,8 @@ class StringChunk : public Chunk {
char* data,
uint64_t size,
bool nullable,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr)
: Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)) {
std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: Chunk(row_nums, data, size, nullable, chunk_mmap_guard) {
auto null_bitmap_bytes_num = nullable_ ? (row_nums_ + 7) / 8 : 0;
offsets_ = reinterpret_cast<uint32_t*>(data + null_bitmap_bytes_num);
}
@ -314,8 +347,8 @@ class ArrayChunk : public Chunk {
uint64_t size,
milvus::DataType element_type,
bool nullable,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr)
: Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)),
std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: Chunk(row_nums, data, size, nullable, chunk_mmap_guard),
element_type_(element_type) {
auto null_bitmap_bytes_num = 0;
if (nullable) {
@ -431,8 +464,8 @@ class VectorArrayChunk : public Chunk {
char* data,
uint64_t size,
milvus::DataType element_type,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr)
: Chunk(row_nums, data, size, false, std::move(mmap_file_raii)),
std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: Chunk(row_nums, data, size, false, chunk_mmap_guard),
dim_(dim),
element_type_(element_type) {
offsets_lens_ = reinterpret_cast<uint32_t*>(data);
@ -520,15 +553,14 @@ class VectorArrayChunk : public Chunk {
class SparseFloatVectorChunk : public Chunk {
public:
SparseFloatVectorChunk(
int32_t row_nums,
char* data,
uint64_t size,
bool nullable,
std::unique_ptr<MmapFileRAII> mmap_file_raii = nullptr)
: Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)) {
SparseFloatVectorChunk(int32_t row_nums,
char* data,
uint64_t size,
bool nullable,
std::shared_ptr<ChunkMmapGuard> chunk_mmap_guard)
: Chunk(row_nums, data, size, nullable, chunk_mmap_guard) {
vec_.resize(row_nums);
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
auto null_bitmap_bytes_num = nullable ? (row_nums + 7) / 8 : 0;
auto offsets_ptr =
reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
for (int i = 0; i < row_nums; i++) {

View File

@ -27,9 +27,9 @@ MemChunkTarget::write(const void* data, size_t size) {
size_ += size;
}
std::pair<char*, size_t>
MemChunkTarget::get() {
return {data_, cap_};
char*
MemChunkTarget::release() {
return data_;
}
size_t
@ -39,6 +39,11 @@ MemChunkTarget::tell() {
void
MmapChunkTarget::flush() {
if (cap_ > size_) {
std::string padding(cap_ - size_, 0);
file_writer_->Write(padding.data(), cap_ - size_);
size_ = cap_;
}
file_writer_->Finish();
}
@ -48,17 +53,17 @@ MmapChunkTarget::write(const void* data, size_t size) {
size_ += size;
}
std::pair<char*, size_t>
MmapChunkTarget::get() {
char*
MmapChunkTarget::release() {
flush();
auto file = File::Open(file_path_, O_RDWR);
auto m = mmap(nullptr, size_, PROT_READ, MAP_SHARED, file.Descriptor(), 0);
auto m = mmap(nullptr, cap_, PROT_READ, MAP_SHARED, file.Descriptor(), 0);
AssertInfo(m != MAP_FAILED,
"failed to map: {}, map_size={}",
strerror(errno),
size_);
return {static_cast<char*>(m), size_};
cap_);
return static_cast<char*>(m);
}
size_t

View File

@ -23,46 +23,63 @@
namespace milvus {
class ChunkTarget {
public:
virtual void
write(const void* data, size_t size) = 0;
virtual std::pair<char*, size_t>
get() = 0;
static constexpr size_t ALIGNED_SIZE = 4096; // 4KB
virtual ~ChunkTarget() = default;
/**
* @brief write data to the target at the current position
* @param data the data to write
* @param size the size of the data to write
*/
virtual void
write(const void* data, size_t size) = 0;
/**
* @brief release the data pointer to the caller
* @note no write() should be called after release()
* @return the data pointer
*/
virtual char*
release() = 0;
/**
* @brief get the current position of the target
* @return the current position
*/
virtual size_t
tell() = 0;
};
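A short usage sketch of the contract documented above (illustrative only; the capacity and payload values are arbitrary):

```cpp
auto target = std::make_shared<MemChunkTarget>(/*cap=*/4096);  // anonymous mmap of 4 KiB
int32_t values[4] = {1, 2, 3, 4};
target->write(values, sizeof(values));   // appends at the current position
size_t written = target->tell();         // 16 bytes written so far
char* data = target->release();          // pointer handed to the caller;
                                         // no write() may be called afterwards
// In the chunk-writer flow the released pointer is handed to a ChunkMmapGuard,
// which later takes care of munmap (and unlink for file-backed targets).
```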
class MmapChunkTarget : public ChunkTarget {
public:
explicit MmapChunkTarget(std::string file_path)
: file_path_(std::move(file_path)) {
explicit MmapChunkTarget(std::string file_path, size_t cap)
: file_path_(std::move(file_path)), cap_(cap) {
file_writer_ = std::make_unique<storage::FileWriter>(file_path_);
}
void
flush();
void
write(const void* data, size_t size) override;
std::pair<char*, size_t>
get() override;
char*
release() override;
size_t
tell() override;
private:
void
flush();
std::unique_ptr<storage::FileWriter> file_writer_{nullptr};
std::string file_path_{};
size_t size_ = 0;
size_t cap_{0};
size_t size_{0};
};
class MemChunkTarget : public ChunkTarget {
public:
MemChunkTarget(size_t cap) : cap_(cap) {
explicit MemChunkTarget(size_t cap) : cap_(cap) {
auto m = mmap(nullptr,
cap,
PROT_READ | PROT_WRITE,
@ -79,8 +96,8 @@ class MemChunkTarget : public ChunkTarget {
void
write(const void* data, size_t size) override;
std::pair<char*, size_t>
get() override;
char*
release() override;
size_t
tell() override;

File diff suppressed because it is too large.

View File

@ -13,7 +13,6 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <numeric>
#include <utility>
#include <vector>
#include "arrow/array/array_primitive.h"
@ -21,37 +20,26 @@
#include "common/ChunkTarget.h"
#include "arrow/record_batch.h"
#include "common/Chunk.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "storage/FileWriter.h"
#include "common/Geometry.h"
namespace milvus {
class ChunkWriterBase {
public:
explicit ChunkWriterBase(bool nullable) : nullable_(nullable) {
}
ChunkWriterBase(std::string file_path, bool nullable)
: file_path_(std::move(file_path)), nullable_(nullable) {
}
virtual std::pair<size_t, size_t>
calculate_size(const arrow::ArrayVector& data) = 0;
virtual void
write(const arrow::ArrayVector& data) = 0;
virtual std::unique_ptr<Chunk>
finish() = 0;
std::pair<char*, size_t>
get_data() {
return target_->get();
}
write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) = 0;
protected:
void
write_null_bit_maps(
const std::vector<std::tuple<const uint8_t*, int64_t, int64_t>>&
null_bitmaps) {
null_bitmaps,
const std::shared_ptr<ChunkTarget>& target) {
if (nullable_) {
// merge all null bitmaps in case of multiple chunk null bitmap dislocation
// say [0xFF, 0x00] with size [7, 8] cannot be treated as [0xFF, 0x00] after merged but
@ -81,15 +69,13 @@ class ChunkWriterBase {
}
size_total_bit += size_bits;
}
target_->write(merged_null_bitmap.data(), (size_total_bit + 7) / 8);
target->write(merged_null_bitmap.data(), (size_total_bit + 7) / 8);
}
}
protected:
int row_nums_ = 0;
std::string file_path_{""};
size_t row_nums_ = 0;
bool nullable_ = false;
std::shared_ptr<ChunkTarget> target_;
};
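A worked example of the "dislocation" case that the merge in `write_null_bit_maps()` above guards against, with a tiny bit-granular append mirroring the idea (Arrow validity bitmaps are LSB-first, 1 = valid; batch offsets and the absent-bitmap case are simplified, so this is not the production loop):

```cpp
#include <cstdint>
#include <vector>

// Append src_bits validity bits from src onto dst; dst_bits tracks the bit length.
void append_bits(std::vector<uint8_t>& dst, size_t& dst_bits,
                 const uint8_t* src, size_t src_bits) {
    for (size_t i = 0; i < src_bits; ++i, ++dst_bits) {
        if (dst_bits % 8 == 0) dst.push_back(0);
        if ((src[i / 8] >> (i % 8)) & 1) dst.back() |= (1u << (dst_bits % 8));
    }
}

void demo_merge() {
    // Batch A: 7 rows, all valid (0xFF); batch B: 8 rows, all null (0x00).
    // Naive byte concatenation [0xFF, 0x00] would mark row 7 as valid, but row 7
    // is the first row of batch B and is null.
    std::vector<uint8_t> merged;
    size_t bits = 0;
    const uint8_t a[] = {0xFF}, b[] = {0x00};
    append_bits(merged, bits, a, 7);
    append_bits(merged, bits, b, 8);
    // merged == {0x7F, 0x00}, bits == 15 — the bit-granular result is correct.
}
```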
template <typename ArrowType, typename T>
@ -98,30 +84,25 @@ class ChunkWriter final : public ChunkWriterBase {
ChunkWriter(int dim, bool nullable) : ChunkWriterBase(nullable), dim_(dim) {
}
ChunkWriter(int dim, std::string file_path, bool nullable)
: ChunkWriterBase(std::move(file_path), nullable), dim_(dim){};
void
write(const arrow::ArrayVector& array_vec) override {
auto size = 0;
auto row_nums = 0;
std::pair<size_t, size_t>
calculate_size(const arrow::ArrayVector& array_vec) override {
size_t size = 0;
size_t row_nums = 0;
for (const auto& data : array_vec) {
row_nums += data->length();
auto array = std::static_pointer_cast<ArrowType>(data);
if (nullable_) {
auto null_bitmap_n = (data->length() + 7) / 8;
size += null_bitmap_n;
}
size += array->length() * dim_ * sizeof(T);
}
row_nums_ = row_nums;
if (!file_path_.empty()) {
target_ = std::make_shared<MmapChunkTarget>(file_path_);
} else {
target_ = std::make_shared<MemChunkTarget>(size);
if (nullable_) {
size += (row_nums + 7) / 8;
}
row_nums_ = row_nums;
return {size, row_nums};
}
void
write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override {
// Chunk layout:
// 1. Null bitmap (if nullable_=true): Indicates which values are null
// 2. Data values: Contiguous storage of data elements in the order:
@ -134,55 +115,25 @@ class ChunkWriter final : public ChunkWriterBase {
null_bitmaps.emplace_back(
data->null_bitmap_data(), data->length(), data->offset());
}
write_null_bit_maps(null_bitmaps);
write_null_bit_maps(null_bitmaps, target);
}
for (const auto& data : array_vec) {
auto array = std::static_pointer_cast<ArrowType>(data);
auto data_ptr = array->raw_values();
target_->write(data_ptr, array->length() * dim_ * sizeof(T));
target->write(data_ptr, array->length() * dim_ * sizeof(T));
}
}
std::unique_ptr<Chunk>
finish() override {
auto [data, size] = target_->get();
auto mmap_file_raii = file_path_.empty()
? nullptr
: std::make_unique<MmapFileRAII>(file_path_);
return std::make_unique<FixedWidthChunk>(row_nums_,
dim_,
data,
size,
sizeof(T),
nullable_,
std::move(mmap_file_raii));
}
private:
int dim_;
const int64_t dim_;
};
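For concreteness, a worked example of what `calculate_size()` returns for this fixed-width writer (the parameter values below are assumed for illustration):

```cpp
// T = float, dim_ = 4, nullable_ = true, two Arrow batches of 100 and 50 rows:
//   data bytes  = (100 + 50) * 4 * sizeof(float) = 2400
//   null bitmap = (150 + 7) / 8                   = 19
//   result      = {2419, 150}   // {byte size, row count}
// The caller (e.g. create_group_chunk) sums these sizes across all fields,
// creates one ChunkTarget with that capacity, and then invokes
// write_to_target() on each writer against the shared target.
```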
template <>
inline void
ChunkWriter<arrow::BooleanArray, bool>::write(
const arrow::ArrayVector& array_vec) {
auto size = 0;
auto row_nums = 0;
for (const auto& data : array_vec) {
row_nums += data->length();
auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(data);
size += array->length() * dim_;
size += (data->length() + 7) / 8;
}
row_nums_ = row_nums;
if (!file_path_.empty()) {
target_ = std::make_shared<MmapChunkTarget>(file_path_);
} else {
target_ = std::make_shared<MemChunkTarget>(size);
}
ChunkWriter<arrow::BooleanArray, bool>::write_to_target(
const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) {
if (nullable_) {
// tuple <data, size, offset>
std::vector<std::tuple<const uint8_t*, int64_t, int64_t>> null_bitmaps;
@ -190,14 +141,14 @@ ChunkWriter<arrow::BooleanArray, bool>::write(
null_bitmaps.emplace_back(
data->null_bitmap_data(), data->length(), data->offset());
}
write_null_bit_maps(null_bitmaps);
write_null_bit_maps(null_bitmaps, target);
}
for (const auto& data : array_vec) {
auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(data);
for (int i = 0; i < array->length(); i++) {
auto value = array->Value(i);
target_->write(&value, sizeof(bool));
target->write(&value, sizeof(bool));
}
}
}
@ -206,32 +157,39 @@ class StringChunkWriter : public ChunkWriterBase {
public:
using ChunkWriterBase::ChunkWriterBase;
void
write(const arrow::ArrayVector& array_vec) override;
std::pair<size_t, size_t>
calculate_size(const arrow::ArrayVector& array_vec) override;
std::unique_ptr<Chunk>
finish() override;
void
write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
private:
std::vector<std::string_view> strs_;
};
class JSONChunkWriter : public ChunkWriterBase {
public:
using ChunkWriterBase::ChunkWriterBase;
void
write(const arrow::ArrayVector& array_vec) override;
std::pair<size_t, size_t>
calculate_size(const arrow::ArrayVector& array_vec) override;
std::unique_ptr<Chunk>
finish() override;
void
write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
};
class GeometryChunkWriter : public ChunkWriterBase {
public:
using ChunkWriterBase::ChunkWriterBase;
void
write(const arrow::ArrayVector& array_vec) override;
std::unique_ptr<Chunk>
finish() override;
std::pair<size_t, size_t>
calculate_size(const arrow::ArrayVector& array_vec) override;
void
write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
};
class ArrayChunkWriter : public ChunkWriterBase {
@ -239,18 +197,13 @@ class ArrayChunkWriter : public ChunkWriterBase {
ArrayChunkWriter(const milvus::DataType element_type, bool nullable)
: ChunkWriterBase(nullable), element_type_(element_type) {
}
ArrayChunkWriter(const milvus::DataType element_type,
std::string file_path,
bool nullable)
: ChunkWriterBase(std::move(file_path), nullable),
element_type_(element_type) {
}
std::pair<size_t, size_t>
calculate_size(const arrow::ArrayVector& array_vec) override;
void
write(const arrow::ArrayVector& array_vec) override;
std::unique_ptr<Chunk>
finish() override;
write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
private:
const milvus::DataType element_type_;
@ -258,46 +211,44 @@ class ArrayChunkWriter : public ChunkWriterBase {
class VectorArrayChunkWriter : public ChunkWriterBase {
public:
VectorArrayChunkWriter(int64_t dim,
const milvus::DataType element_type,
std::string file_path = "")
: ChunkWriterBase(std::move(file_path), false),
element_type_(element_type),
dim_(dim) {
VectorArrayChunkWriter(int64_t dim, const milvus::DataType element_type)
: ChunkWriterBase(false), element_type_(element_type), dim_(dim) {
}
void
write(const arrow::ArrayVector& array_vec) override;
std::pair<size_t, size_t>
calculate_size(const arrow::ArrayVector& array_vec) override;
std::unique_ptr<Chunk>
finish() override;
void
write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
private:
size_t
calculateTotalSize(const arrow::ArrayVector& array_vec);
const milvus::DataType element_type_;
int64_t dim_;
const int64_t dim_;
};
class SparseFloatVectorChunkWriter : public ChunkWriterBase {
public:
using ChunkWriterBase::ChunkWriterBase;
std::pair<size_t, size_t>
calculate_size(const arrow::ArrayVector& array_vec) override;
void
write(const arrow::ArrayVector& array_vec) override;
std::unique_ptr<Chunk>
finish() override;
write_to_target(const arrow::ArrayVector& array_vec,
const std::shared_ptr<ChunkTarget>& target) override;
};
std::unique_ptr<Chunk>
create_chunk(const FieldMeta& field_meta, const arrow::ArrayVector& array_vec);
std::unique_ptr<Chunk>
create_chunk(const FieldMeta& field_meta,
const arrow::ArrayVector& array_vec,
const std::string& file_path);
const std::string& file_path = "");
std::unordered_map<FieldId, std::shared_ptr<Chunk>>
create_group_chunk(const std::vector<FieldId>& field_ids,
const std::vector<FieldMeta>& field_metas,
const std::vector<arrow::ArrayVector>& array_vec,
const std::string& file_path = "");
arrow::ArrayVector
read_single_column_batches(std::shared_ptr<arrow::RecordBatchReader> reader);

View File

@ -98,4 +98,9 @@ class GroupChunk {
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks_;
};
enum class GroupChunkType : uint8_t {
DEFAULT = 0,
JSON_KEY_STATS = 1,
};
} // namespace milvus

View File

@ -21,6 +21,7 @@
#include "common/Chunk.h"
#include "common/VectorArray.h"
#include "common/Types.h"
#include "common/FieldMeta.h"
#include "pb/schema.pb.h"
#include "test_utils/DataGen.h"
@ -243,11 +244,14 @@ TEST_F(VectorArrayChunkTest, TestWriteMultipleBatches) {
createFloatVectorListArray(batch_data, batch_offsets, dim));
}
// Write using VectorArrayChunkWriter
VectorArrayChunkWriter writer(dim, DataType::VECTOR_FLOAT);
writer.write(array_vec);
auto chunk = writer.finish();
// Write using create_chunk with FieldMeta
FieldMeta field_meta(FieldName("va"),
FieldId(1),
DataType::VECTOR_ARRAY,
DataType::VECTOR_FLOAT,
dim,
std::nullopt);
auto chunk = create_chunk(field_meta, array_vec);
auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get());
// Verify total rows
@ -297,11 +301,14 @@ TEST_F(VectorArrayChunkTest, TestWriteWithMmap) {
auto list_array = createFloatVectorListArray(all_data, offsets, dim);
arrow::ArrayVector array_vec = {list_array};
// Write with mmap
VectorArrayChunkWriter writer(dim, DataType::VECTOR_FLOAT, temp_file);
writer.write(array_vec);
auto chunk = writer.finish();
// Write with mmap using create_chunk
FieldMeta field_meta(FieldName("va"),
FieldId(2),
DataType::VECTOR_ARRAY,
DataType::VECTOR_FLOAT,
dim,
std::nullopt);
auto chunk = create_chunk(field_meta, array_vec, temp_file);
auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get());
// Verify mmap write
@ -330,10 +337,13 @@ TEST_F(VectorArrayChunkTest, TestEmptyVectorArray) {
arrow::ArrayVector array_vec;
VectorArrayChunkWriter writer(dim, DataType::VECTOR_FLOAT);
writer.write(array_vec);
auto chunk = writer.finish();
FieldMeta field_meta(FieldName("va"),
FieldId(3),
DataType::VECTOR_ARRAY,
DataType::VECTOR_FLOAT,
dim,
std::nullopt);
auto chunk = create_chunk(field_meta, array_vec);
auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get());
EXPECT_EQ(vector_array_chunk->RowNums(), 0);
@ -469,11 +479,14 @@ TEST_P(VectorArrayChunkParameterizedTest, TestWriteVectorArray) {
createVectorListArray(all_data, offsets, param.dim, param.data_type);
arrow::ArrayVector array_vec = {list_array};
// Test VectorArrayChunkWriter
VectorArrayChunkWriter writer(param.dim, param.data_type);
writer.write(array_vec);
auto chunk = writer.finish();
// Test create_chunk with FieldMeta for VECTOR_ARRAY
FieldMeta field_meta(FieldName("va_param"),
FieldId(100),
DataType::VECTOR_ARRAY,
param.data_type,
param.dim,
std::nullopt);
auto chunk = create_chunk(field_meta, array_vec);
auto vector_array_chunk = static_cast<VectorArrayChunk*>(chunk.get());
// Verify results

View File

@ -898,7 +898,7 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
auto enable_mmap = !mmap_filepath_.empty();
auto column_group_info =
FieldDataInfo(column_group_id, num_rows, mmap_filepath_);
FieldDataInfo(column_group_id, field_id_, num_rows, mmap_filepath_);
LOG_INFO(
"loads column group {} with num_rows {} for segment "
"{}",
@ -923,6 +923,7 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id,
auto translator = std::make_unique<
milvus::segcore::storagev2translator::GroupChunkTranslator>(
segment_id_,
GroupChunkType::JSON_KEY_STATS,
field_meta_map,
column_group_info,
files,
@ -968,9 +969,11 @@ JsonKeyStats::LoadShreddingData(const std::vector<std::string>& index_files) {
void
JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) {
if (config.contains(MMAP_FILE_PATH)) {
mmap_filepath_ = GetValueFromConfig<std::string>(config, MMAP_FILE_PATH)
.value_or("");
if (config.contains(ENABLE_MMAP)) {
mmap_filepath_ =
milvus::storage::LocalChunkManagerSingleton::GetInstance()
.GetChunkManager()
->GetRootPath();
LOG_INFO("load json stats for segment {} with mmap local file path: {}",
segment_id_,
mmap_filepath_);

View File

@ -105,7 +105,9 @@ class DefaultColumnGroupingStrategy : public ColumnGroupingStrategy {
CreateGroups(const TableStatsInfo& table_info) const override {
// put all columns into one group
std::vector<std::vector<int>> column_groups;
column_groups.reserve(1);
std::vector<int> group;
group.reserve(table_info.schema->num_fields());
for (size_t i = 0; i < table_info.schema->num_fields(); ++i) {
group.push_back(i);
}

View File

@ -23,8 +23,10 @@ TEST(test_chunked_column, test_get_chunkid) {
std::vector<std::unique_ptr<Chunk>> chunks;
for (auto i = 0; i < num_chunks; ++i) {
auto row_num = num_rows_per_chunk[i];
auto chunk =
std::make_unique<FixedWidthChunk>(row_num, 1, nullptr, 0, 4, false);
auto chunk_mmap_guard =
std::make_shared<ChunkMmapGuard>(nullptr, 0, "");
auto chunk = std::make_unique<FixedWidthChunk>(
row_num, 1, nullptr, 0, 4, false, chunk_mmap_guard);
chunks.push_back(std::move(chunk));
}
auto translator = std::make_unique<TestChunkTranslator>(

View File

@ -40,10 +40,24 @@ struct FieldDataInfo {
arrow_reader_channel = std::make_shared<ArrowReaderChannel>();
}
FieldDataInfo(int64_t field_id,
int64_t main_field_id,
size_t row_count,
std::string mmap_dir_path = "",
bool in_load_list = false)
: field_id(field_id),
row_count(row_count),
main_field_id(main_field_id),
mmap_dir_path(std::move(mmap_dir_path)),
in_load_list(in_load_list) {
arrow_reader_channel = std::make_shared<ArrowReaderChannel>();
}
int64_t field_id;
int64_t main_field_id{INVALID_FIELD_ID}; // used for json stats only
size_t row_count;
std::string mmap_dir_path;
std::string mmap_dir_path{};
std::shared_ptr<ArrowReaderChannel> arrow_reader_channel;
bool in_load_list = false;
bool in_load_list{false};
};
} // namespace milvus

View File

@ -250,8 +250,16 @@ class CachedSearchIteratorTest
memcpy(chunk_data.data(),
base_dataset_.cbegin() + offset * dim_,
rows * dim_ * sizeof(float));
chunks.emplace_back(std::make_unique<FixedWidthChunk>(
rows, dim_, chunk_data.data(), buf_size, sizeof(float), false));
auto chunk_mmap_guard =
std::make_shared<ChunkMmapGuard>(nullptr, 0, "");
chunks.emplace_back(
std::make_unique<FixedWidthChunk>(rows,
dim_,
chunk_data.data(),
buf_size,
sizeof(float),
false,
chunk_mmap_guard));
offset += rows;
}
auto translator = std::make_unique<TestChunkTranslator>(

View File

@ -317,6 +317,7 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
auto translator =
std::make_unique<storagev2translator::GroupChunkTranslator>(
get_segment_id(),
GroupChunkType::DEFAULT,
field_metas,
column_group_info,
insert_files,

View File

@ -95,8 +95,10 @@ TEST(test_chunk_segment, TestSearchOnSealed) {
defer.AddDefer([buf]() { delete[] buf; });
memcpy(buf, data.data(), 4 * data.size());
auto chunk_mmap_guard =
std::make_shared<ChunkMmapGuard>(nullptr, 0, "");
chunks.emplace_back(std::make_unique<FixedWidthChunk>(
chunk_size, dim, buf, buf_size, 4, false));
chunk_size, dim, buf, buf_size, 4, false, chunk_mmap_guard));
}
auto translator = std::make_unique<TestChunkTranslator>(

View File

@ -172,14 +172,6 @@ ChunkTranslator::get_cells(
auto data_type = field_meta_.get_data_type();
std::filesystem::path folder;
if (use_mmap_) {
folder = std::filesystem::path(mmap_dir_path_) /
std::to_string(segment_id_) / std::to_string(field_id_);
std::filesystem::create_directories(folder);
}
for (auto cid : cids) {
std::unique_ptr<milvus::Chunk> chunk = nullptr;
if (!use_mmap_) {
@ -192,7 +184,11 @@ ChunkTranslator::get_cells(
chunk = create_chunk(field_meta_, array_vec);
} else {
// we don't know the resulting file size beforehand, thus using a separate file for each chunk.
auto filepath = folder / std::to_string(cid);
auto filepath =
std::filesystem::path(mmap_dir_path_) /
fmt::format("seg_{}_fid_{}_{}", segment_id_, field_id_, cid);
std::filesystem::create_directories(
std::filesystem::path(mmap_dir_path_));
LOG_INFO("segment {} mmaping field {} chunk {} to path {}",
segment_id_,

View File

@ -45,6 +45,7 @@ namespace milvus::segcore::storagev2translator {
GroupChunkTranslator::GroupChunkTranslator(
int64_t segment_id,
GroupChunkType group_chunk_type,
const std::unordered_map<FieldId, FieldMeta>& field_metas,
FieldDataInfo column_group_info,
std::vector<std::string> insert_files,
@ -52,7 +53,23 @@ GroupChunkTranslator::GroupChunkTranslator(
int64_t num_fields,
milvus::proto::common::LoadPriority load_priority)
: segment_id_(segment_id),
key_(fmt::format("seg_{}_cg_{}", segment_id, column_group_info.field_id)),
group_chunk_type_(group_chunk_type),
key_([&]() {
switch (group_chunk_type) {
case GroupChunkType::DEFAULT:
return fmt::format(
"seg_{}_cg_{}", segment_id, column_group_info.field_id);
case GroupChunkType::JSON_KEY_STATS:
AssertInfo(
column_group_info.main_field_id != INVALID_FIELD_ID,
"main field id is not set for json key stats group "
"chunk");
return fmt::format("seg_{}_jks_{}_cg_{}",
segment_id,
column_group_info.main_field_id,
column_group_info.field_id);
}
}()),
field_metas_(field_metas),
column_group_info_(column_group_info),
insert_files_(insert_files),
@ -298,6 +315,12 @@ GroupChunkTranslator::load_group_chunk(
// Create chunks for each field in this batch
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
// Iterate through field_id_list to get field_id and create chunk
std::vector<FieldId> field_ids;
std::vector<FieldMeta> field_metas;
std::vector<arrow::ArrayVector> array_vecs;
field_metas.reserve(table->schema()->num_fields());
array_vecs.reserve(table->schema()->num_fields());
for (int i = 0; i < table->schema()->num_fields(); ++i) {
AssertInfo(table->schema()->field(i)->metadata()->Contains(
milvus_storage::ARROW_FIELD_ID_KEY),
@ -324,41 +347,41 @@ GroupChunkTranslator::load_group_chunk(
fid.get());
const auto& field_meta = it->second;
const arrow::ArrayVector& array_vec = table->column(i)->chunks();
std::unique_ptr<Chunk> chunk;
if (!use_mmap_) {
// Memory mode
chunk = create_chunk(field_meta, array_vec);
} else {
// Mmap mode
std::filesystem::path filepath;
if (field_meta.get_main_field_id() != INVALID_FIELD_ID) {
// json shredding mode
field_ids.push_back(fid);
field_metas.push_back(field_meta);
array_vecs.push_back(array_vec);
}
if (!use_mmap_) {
chunks = create_group_chunk(field_ids, field_metas, array_vecs);
} else {
std::filesystem::path filepath;
switch (group_chunk_type_) {
case GroupChunkType::DEFAULT:
filepath =
std::filesystem::path(column_group_info_.mmap_dir_path) /
std::to_string(segment_id_) /
std::to_string(field_meta.get_main_field_id()) /
std::to_string(field_id) / std::to_string(cid);
} else {
fmt::format("seg_{}_cg_{}_{}",
segment_id_,
column_group_info_.field_id,
cid);
break;
case GroupChunkType::JSON_KEY_STATS:
filepath =
std::filesystem::path(column_group_info_.mmap_dir_path) /
std::to_string(segment_id_) / std::to_string(field_id) /
std::to_string(cid);
}
LOG_INFO(
"[StorageV2] translator {} mmaping field {} chunk {} to path "
"{}",
key_,
field_id,
cid,
filepath.string());
std::filesystem::create_directories(filepath.parent_path());
chunk = create_chunk(field_meta, array_vec, filepath.string());
fmt::format("seg_{}_jks_{}_cg_{}_{}",
segment_id_,
column_group_info_.main_field_id,
column_group_info_.field_id,
cid);
break;
default:
ThrowInfo(ErrorCode::UnexpectedError,
"unknown group chunk type: {}",
static_cast<uint8_t>(group_chunk_type_));
}
chunks[fid] = std::move(chunk);
std::filesystem::create_directories(filepath.parent_path());
chunks = create_group_chunk(
field_ids, field_metas, array_vecs, filepath.string());
}
return std::make_unique<milvus::GroupChunk>(chunks);
}
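For illustration, the mmap file names produced by the two branches above (segment id 7, column group id 100, main field id 102, and cid 0 are assumed values):

```cpp
// GroupChunkType::DEFAULT        -> <mmap_dir>/seg_7_cg_100_0
// GroupChunkType::JSON_KEY_STATS -> <mmap_dir>/seg_7_jks_102_cg_100_0
// A single file (and therefore a single mmap region) now backs every field of
// the group chunk for that cid, instead of one file per field as before.
```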

View File

@ -37,6 +37,7 @@ class GroupChunkTranslator
public:
GroupChunkTranslator(
int64_t segment_id,
GroupChunkType group_chunk_type,
const std::unordered_map<FieldId, FieldMeta>& field_metas,
FieldDataInfo column_group_info,
std::vector<std::string> insert_files,
@ -97,6 +98,7 @@ class GroupChunkTranslator
const milvus::cachinglayer::cid_t cid);
int64_t segment_id_;
GroupChunkType group_chunk_type_{GroupChunkType::DEFAULT};
std::string key_;
std::unordered_map<FieldId, FieldMeta> field_metas_;
FieldDataInfo column_group_info_;

View File

@ -97,12 +97,17 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam<bool> {
};
TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
auto temp_dir =
std::filesystem::temp_directory_path() / "gctt_test_with_mmap";
std::filesystem::create_directory(temp_dir);
auto use_mmap = GetParam();
std::unordered_map<FieldId, FieldMeta> field_metas = schema_->get_fields();
auto column_group_info = FieldDataInfo(0, 3000, "");
auto column_group_info = FieldDataInfo(0, 3000, temp_dir.string());
auto translator = std::make_unique<GroupChunkTranslator>(
segment_id_,
GroupChunkType::DEFAULT,
field_metas,
column_group_info,
paths_,
@ -164,17 +169,23 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
EXPECT_EQ(meta->chunk_memory_size_.size(), num_cells);
EXPECT_EQ(expected_total_size, chunked_column_group->memory_size());
// Verify the mmap files for cell 0 and 1 are created
std::vector<std::string> mmap_paths = {
(temp_dir / "seg_0_cg_0_0").string(),
(temp_dir / "seg_0_cg_0_1").string()};
// Verify mmap directory and files if in mmap mode
if (use_mmap) {
std::string mmap_dir = std::to_string(segment_id_);
EXPECT_TRUE(std::filesystem::exists(mmap_dir));
for (const auto& mmap_path : mmap_paths) {
EXPECT_TRUE(std::filesystem::exists(mmap_path));
}
}
// DO NOT Verify each field has a corresponding file: files are unlinked immediately after being mmaped.
// for (size_t i = 0; i < field_id_list.size(); ++i) {
// auto field_id = field_id_list.Get(i);
// std::string field_file = mmap_dir + "/" + std::to_string(field_id);
// EXPECT_TRUE(std::filesystem::exists(field_file));
// }
// Clean up mmap files
if (use_mmap) {
for (const auto& mmap_path : mmap_paths) {
std::filesystem::remove(mmap_path);
}
std::filesystem::remove(temp_dir);
}
}
@ -228,10 +239,14 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
AssertInfo(status.ok(), "failed to close file reader");
}
auto column_group_info = FieldDataInfo(0, total_rows, "");
auto temp_dir =
std::filesystem::temp_directory_path() / "gctt_test_multiple_files";
std::filesystem::create_directory(temp_dir);
auto column_group_info = FieldDataInfo(0, total_rows, temp_dir.string());
auto translator = std::make_unique<GroupChunkTranslator>(
segment_id_,
GroupChunkType::DEFAULT,
field_metas,
column_group_info,
multi_file_paths,
@ -310,6 +325,10 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) {
std::filesystem::remove(file_path);
}
}
// Clean up cached column group files
if (use_mmap && std::filesystem::exists(temp_dir)) {
std::filesystem::remove_all(temp_dir);
}
}
INSTANTIATE_TEST_SUITE_P(GroupChunkTranslatorTest,