diff --git a/internal/core/src/common/Chunk.h b/internal/core/src/common/Chunk.h index 9d3512f24c..3fb44b1cce 100644 --- a/internal/core/src/common/Chunk.h +++ b/internal/core/src/common/Chunk.h @@ -38,6 +38,39 @@ namespace milvus { constexpr uint64_t MMAP_STRING_PADDING = 1; constexpr uint64_t MMAP_GEOMETRY_PADDING = 1; constexpr uint64_t MMAP_ARRAY_PADDING = 1; + +// Shared mmap region manager for group chunks +class ChunkMmapGuard { + public: + ChunkMmapGuard(char* mmap_ptr, size_t mmap_size, std::string file_path) + : mmap_ptr_(mmap_ptr), mmap_size_(mmap_size), file_path_(file_path) { + } + + ~ChunkMmapGuard() { + if (mmap_ptr_ != nullptr) { + munmap(mmap_ptr_, mmap_size_); + } + if (!file_path_.empty()) { + unlink(file_path_.c_str()); + } + } + + char* + get_ptr() const { + return mmap_ptr_; + } + + bool + is_file_backed() const { + return !file_path_.empty(); + } + + private: + char* mmap_ptr_; + size_t mmap_size_; + const std::string file_path_; +}; + class Chunk { public: Chunk() = default; @@ -45,12 +78,12 @@ class Chunk { char* data, uint64_t size, bool nullable, - std::unique_ptr mmap_file_raii = nullptr) + std::shared_ptr chunk_mmap_guard) : data_(data), row_nums_(row_nums), size_(size), nullable_(nullable), - mmap_file_raii_(std::move(mmap_file_raii)) { + chunk_mmap_guard_(chunk_mmap_guard) { if (nullable) { valid_.reserve(row_nums); for (int i = 0; i < row_nums; i++) { @@ -59,7 +92,7 @@ class Chunk { } } virtual ~Chunk() { - munmap(data_, size_); + // The ChunkMmapGuard will handle the unmapping and unlinking of the file if it is file backed } uint64_t @@ -69,7 +102,7 @@ class Chunk { cachinglayer::ResourceUsage CellByteSize() const { - if (mmap_file_raii_) { + if (chunk_mmap_guard_ && chunk_mmap_guard_->is_file_backed()) { return cachinglayer::ResourceUsage(0, static_cast(size_)); } return cachinglayer::ResourceUsage(static_cast(size_), 0); @@ -109,7 +142,7 @@ class Chunk { FixedVector valid_; // parse null bitmap to valid_ to be compatible with SpanBase - std::unique_ptr mmap_file_raii_; + std::shared_ptr chunk_mmap_guard_{nullptr}; }; // for fixed size data, includes fixed size array @@ -121,8 +154,8 @@ class FixedWidthChunk : public Chunk { uint64_t size, uint64_t element_size, bool nullable, - std::unique_ptr mmap_file_raii = nullptr) - : Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)), + std::shared_ptr chunk_mmap_guard) + : Chunk(row_nums, data, size, nullable, chunk_mmap_guard), dim_(dim), element_size_(element_size) { auto null_bitmap_bytes_num = nullable_ ? (row_nums_ + 7) / 8 : 0; @@ -181,8 +214,8 @@ class StringChunk : public Chunk { char* data, uint64_t size, bool nullable, - std::unique_ptr mmap_file_raii = nullptr) - : Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)) { + std::shared_ptr chunk_mmap_guard) + : Chunk(row_nums, data, size, nullable, chunk_mmap_guard) { auto null_bitmap_bytes_num = nullable_ ? (row_nums_ + 7) / 8 : 0; offsets_ = reinterpret_cast(data + null_bitmap_bytes_num); } @@ -314,8 +347,8 @@ class ArrayChunk : public Chunk { uint64_t size, milvus::DataType element_type, bool nullable, - std::unique_ptr mmap_file_raii = nullptr) - : Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)), + std::shared_ptr chunk_mmap_guard) + : Chunk(row_nums, data, size, nullable, chunk_mmap_guard), element_type_(element_type) { auto null_bitmap_bytes_num = 0; if (nullable) { @@ -431,8 +464,8 @@ class VectorArrayChunk : public Chunk { char* data, uint64_t size, milvus::DataType element_type, - std::unique_ptr mmap_file_raii = nullptr) - : Chunk(row_nums, data, size, false, std::move(mmap_file_raii)), + std::shared_ptr chunk_mmap_guard) + : Chunk(row_nums, data, size, false, chunk_mmap_guard), dim_(dim), element_type_(element_type) { offsets_lens_ = reinterpret_cast(data); @@ -520,15 +553,14 @@ class VectorArrayChunk : public Chunk { class SparseFloatVectorChunk : public Chunk { public: - SparseFloatVectorChunk( - int32_t row_nums, - char* data, - uint64_t size, - bool nullable, - std::unique_ptr mmap_file_raii = nullptr) - : Chunk(row_nums, data, size, nullable, std::move(mmap_file_raii)) { + SparseFloatVectorChunk(int32_t row_nums, + char* data, + uint64_t size, + bool nullable, + std::shared_ptr chunk_mmap_guard) + : Chunk(row_nums, data, size, nullable, chunk_mmap_guard) { vec_.resize(row_nums); - auto null_bitmap_bytes_num = (row_nums + 7) / 8; + auto null_bitmap_bytes_num = nullable ? (row_nums + 7) / 8 : 0; auto offsets_ptr = reinterpret_cast(data + null_bitmap_bytes_num); for (int i = 0; i < row_nums; i++) { diff --git a/internal/core/src/common/ChunkTarget.cpp b/internal/core/src/common/ChunkTarget.cpp index 81c9fb67ad..730fffa17a 100644 --- a/internal/core/src/common/ChunkTarget.cpp +++ b/internal/core/src/common/ChunkTarget.cpp @@ -27,9 +27,9 @@ MemChunkTarget::write(const void* data, size_t size) { size_ += size; } -std::pair -MemChunkTarget::get() { - return {data_, cap_}; +char* +MemChunkTarget::release() { + return data_; } size_t @@ -39,6 +39,11 @@ MemChunkTarget::tell() { void MmapChunkTarget::flush() { + if (cap_ > size_) { + std::string padding(cap_ - size_, 0); + file_writer_->Write(padding.data(), cap_ - size_); + size_ = cap_; + } file_writer_->Finish(); } @@ -48,17 +53,17 @@ MmapChunkTarget::write(const void* data, size_t size) { size_ += size; } -std::pair -MmapChunkTarget::get() { +char* +MmapChunkTarget::release() { flush(); auto file = File::Open(file_path_, O_RDWR); - auto m = mmap(nullptr, size_, PROT_READ, MAP_SHARED, file.Descriptor(), 0); + auto m = mmap(nullptr, cap_, PROT_READ, MAP_SHARED, file.Descriptor(), 0); AssertInfo(m != MAP_FAILED, "failed to map: {}, map_size={}", strerror(errno), - size_); - return {static_cast(m), size_}; + cap_); + return static_cast(m); } size_t diff --git a/internal/core/src/common/ChunkTarget.h b/internal/core/src/common/ChunkTarget.h index cf44b66d31..86c4b257d9 100644 --- a/internal/core/src/common/ChunkTarget.h +++ b/internal/core/src/common/ChunkTarget.h @@ -23,46 +23,63 @@ namespace milvus { class ChunkTarget { public: - virtual void - write(const void* data, size_t size) = 0; - - virtual std::pair - get() = 0; + static constexpr size_t ALIGNED_SIZE = 4096; // 4KB virtual ~ChunkTarget() = default; + /** + * @brief write data to the target at the current position + * @param data the data to write + * @param size the size of the data to write + */ + virtual void + write(const void* data, size_t size) = 0; + + /** + * @brief release the data pointer to the caller + * @note no write() should be called after release() + * @return the data pointer + */ + virtual char* + release() = 0; + + /** + * @brief get the current position of the target + * @return the current position + */ virtual size_t tell() = 0; }; class MmapChunkTarget : public ChunkTarget { public: - explicit MmapChunkTarget(std::string file_path) - : file_path_(std::move(file_path)) { + explicit MmapChunkTarget(std::string file_path, size_t cap) + : file_path_(std::move(file_path)), cap_(cap) { file_writer_ = std::make_unique(file_path_); } - void - flush(); - void write(const void* data, size_t size) override; - std::pair - get() override; + char* + release() override; size_t tell() override; private: + void + flush(); + std::unique_ptr file_writer_{nullptr}; std::string file_path_{}; - size_t size_ = 0; + size_t cap_{0}; + size_t size_{0}; }; class MemChunkTarget : public ChunkTarget { public: - MemChunkTarget(size_t cap) : cap_(cap) { + explicit MemChunkTarget(size_t cap) : cap_(cap) { auto m = mmap(nullptr, cap, PROT_READ | PROT_WRITE, @@ -79,8 +96,8 @@ class MemChunkTarget : public ChunkTarget { void write(const void* data, size_t size) override; - std::pair - get() override; + char* + release() override; size_t tell() override; diff --git a/internal/core/src/common/ChunkWriter.cpp b/internal/core/src/common/ChunkWriter.cpp index 0808279f42..a7e9e09c93 100644 --- a/internal/core/src/common/ChunkWriter.cpp +++ b/internal/core/src/common/ChunkWriter.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include "arrow/array/array_binary.h" @@ -22,19 +21,36 @@ #include "arrow/type_fwd.h" #include "common/Chunk.h" #include "common/EasyAssert.h" -#include "common/FieldDataInterface.h" -#include "common/Geometry.h" #include "common/Types.h" -#include "common/VectorTrait.h" -#include "simdjson/common_defs.h" #include "simdjson/padded_string.h" -#include "storage/FileWriter.h" namespace milvus { +std::pair +StringChunkWriter::calculate_size(const arrow::ArrayVector& array_vec) { + row_nums_ = 0; + size_t size = 0; + // tuple + std::vector> null_bitmaps; + for (const auto& data : array_vec) { + // for bson, we use binary array to store the string + auto array = std::dynamic_pointer_cast(data); + for (int i = 0; i < array->length(); i++) { + auto str = array->GetView(i); + size += str.size(); + } + row_nums_ += array->length(); + } + if (nullable_) { + size += (row_nums_ + 7) / 8; + } + size += sizeof(uint32_t) * (row_nums_ + 1) + MMAP_STRING_PADDING; + return {size, row_nums_}; +} + void -StringChunkWriter::write(const arrow::ArrayVector& array_vec) { - auto size = 0; +StringChunkWriter::write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) { std::vector strs; // tuple std::vector> null_bitmaps; @@ -44,32 +60,23 @@ StringChunkWriter::write(const arrow::ArrayVector& array_vec) { for (int i = 0; i < array->length(); i++) { auto str = array->GetView(i); strs.emplace_back(str); - size += str.size(); } if (nullable_) { - auto null_bitmap_n = (data->length() + 7) / 8; - // size, offset all in bits null_bitmaps.emplace_back( data->null_bitmap_data(), data->length(), data->offset()); - size += null_bitmap_n; } - row_nums_ += array->length(); - } - - size += sizeof(uint32_t) * (row_nums_ + 1) + MMAP_STRING_PADDING; - if (!file_path_.empty()) { - target_ = std::make_shared(file_path_); - } else { - target_ = std::make_shared(size); } // chunk layout: null bitmap, offset1, offset2, ..., offsetn, str1, str2, ..., strn, padding // write null bitmaps - write_null_bit_maps(null_bitmaps); + write_null_bit_maps(null_bitmaps, target); // write data - int offset_num = row_nums_ + 1; - uint32_t offset_start_pos = target_->tell() + sizeof(uint32_t) * offset_num; + const int offset_num = row_nums_ + 1; + const uint32_t null_bitmap_bytes = + nullable_ ? static_cast((row_nums_ + 7) / 8) : 0; + uint32_t offset_start_pos = + null_bitmap_bytes + sizeof(uint32_t) * offset_num; std::vector offsets; offsets.reserve(offset_num); for (const auto& str : strs) { @@ -78,29 +85,41 @@ StringChunkWriter::write(const arrow::ArrayVector& array_vec) { } offsets.push_back(offset_start_pos); - target_->write(offsets.data(), offsets.size() * sizeof(uint32_t)); + target->write(offsets.data(), offsets.size() * sizeof(uint32_t)); for (auto str : strs) { - target_->write(str.data(), str.size()); + target->write(str.data(), str.size()); } -} -std::unique_ptr -StringChunkWriter::finish() { // write padding, maybe not needed anymore // FIXME char padding[MMAP_STRING_PADDING]; - target_->write(padding, MMAP_STRING_PADDING); - auto [data, size] = target_->get(); - auto mmap_file_raii = file_path_.empty() - ? nullptr - : std::make_unique(file_path_); - return std::make_unique( - row_nums_, data, size, nullable_, std::move(mmap_file_raii)); + target->write(padding, MMAP_STRING_PADDING); +} + +std::pair +JSONChunkWriter::calculate_size(const arrow::ArrayVector& array_vec) { + row_nums_ = 0; + size_t size = 0; + for (const auto& data : array_vec) { + auto array = std::dynamic_pointer_cast(data); + for (int i = 0; i < array->length(); i++) { + auto str = array->GetView(i); + auto json = Json(simdjson::padded_string(str)); + size += json.data().size(); + } + row_nums_ += array->length(); + } + if (nullable_) { + size += (row_nums_ + 7) / 8; + } + size += sizeof(uint32_t) * (row_nums_ + 1) + simdjson::SIMDJSON_PADDING; + + return {size, row_nums_}; } void -JSONChunkWriter::write(const arrow::ArrayVector& array_vec) { - auto size = 0; +JSONChunkWriter::write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) { std::vector jsons; // tuple std::vector> null_bitmaps; @@ -109,31 +128,23 @@ JSONChunkWriter::write(const arrow::ArrayVector& array_vec) { for (int i = 0; i < array->length(); i++) { auto str = array->GetView(i); auto json = Json(simdjson::padded_string(str)); - size += json.data().size(); jsons.push_back(std::move(json)); } if (nullable_) { - auto null_bitmap_n = (data->length() + 7) / 8; - // size, offset all in bits null_bitmaps.emplace_back( data->null_bitmap_data(), data->length(), data->offset()); - size += null_bitmap_n; } - row_nums_ += array->length(); - } - size += sizeof(uint32_t) * (row_nums_ + 1) + simdjson::SIMDJSON_PADDING; - if (!file_path_.empty()) { - target_ = std::make_shared(file_path_); - } else { - target_ = std::make_shared(size); } // chunk layout: null bitmaps, offset1, offset2, ... ,json1, json2, ..., jsonn // write null bitmaps - write_null_bit_maps(null_bitmaps); + write_null_bit_maps(null_bitmaps, target); - int offset_num = row_nums_ + 1; - uint32_t offset_start_pos = target_->tell() + sizeof(uint32_t) * offset_num; + const int offset_num = row_nums_ + 1; + const uint32_t null_bitmap_bytes = + nullable_ ? static_cast((row_nums_ + 7) / 8) : 0; + uint32_t offset_start_pos = + null_bitmap_bytes + sizeof(uint32_t) * offset_num; std::vector offsets; offsets.reserve(offset_num); for (const auto& json : jsons) { @@ -142,262 +153,182 @@ JSONChunkWriter::write(const arrow::ArrayVector& array_vec) { } offsets.push_back(offset_start_pos); - target_->write(offsets.data(), offset_num * sizeof(uint32_t)); + target->write(offsets.data(), offset_num * sizeof(uint32_t)); // write data for (const auto& json : jsons) { - target_->write(json.data().data(), json.data().size()); + target->write(json.data().data(), json.data().size()); } -} -std::unique_ptr -JSONChunkWriter::finish() { char padding[simdjson::SIMDJSON_PADDING]; - target_->write(padding, simdjson::SIMDJSON_PADDING); - - auto [data, size] = target_->get(); - auto mmap_file_raii = file_path_.empty() - ? nullptr - : std::make_unique(file_path_); - return std::make_unique( - row_nums_, data, size, nullable_, std::move(mmap_file_raii)); + target->write(padding, simdjson::SIMDJSON_PADDING); } -void -GeometryChunkWriter::write(const arrow::ArrayVector& array_vec) { - auto size = 0; - std::vector wkb_strs; - // tuple - std::vector> null_bitmaps; +std::pair +GeometryChunkWriter::calculate_size(const arrow::ArrayVector& array_vec) { + row_nums_ = 0; + size_t size = 0; for (const auto& data : array_vec) { auto array = std::dynamic_pointer_cast(data); - for (int i = 0; i < array->length(); i++) { + for (int64_t i = 0; i < array->length(); ++i) { auto str = array->GetView(i); - wkb_strs.emplace_back(str); size += str.size(); } - if (nullable_) { - auto null_bitmap_n = (data->length() + 7) / 8; - null_bitmaps.emplace_back( - data->null_bitmap_data(), data->length(), data->offset()); - size += null_bitmap_n; - } row_nums_ += array->length(); } - // use 32-bit offsets to align with StringChunk layout + if (nullable_) { + size += (row_nums_ + 7) / 8; + } size += sizeof(uint32_t) * (row_nums_ + 1) + MMAP_GEOMETRY_PADDING; - if (!file_path_.empty()) { - target_ = std::make_shared(file_path_); - } else { - target_ = std::make_shared(size); + return {size, row_nums_}; +} + +void +GeometryChunkWriter::write_to_target( + const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) { + std::vector wkb_strs; + std::vector> null_bitmaps; + wkb_strs.reserve(row_nums_); + + for (const auto& data : array_vec) { + auto array = std::dynamic_pointer_cast(data); + for (int64_t i = 0; i < array->length(); ++i) { + auto str = array->GetView(i); + wkb_strs.emplace_back(str); + } + if (nullable_) { + null_bitmaps.emplace_back( + data->null_bitmap_data(), data->length(), data->offset()); + } } - // chunk layout: null bitmap, offset1, offset2, ..., offsetn, wkb1, wkb2, ..., wkbn, padding - // write null bitmaps - write_null_bit_maps(null_bitmaps); + // chunk layout: null bitmap, offsets, wkb strings, padding + write_null_bit_maps(null_bitmaps, target); - int offset_num = row_nums_ + 1; + const int offset_num = row_nums_ + 1; + const uint32_t null_bitmap_bytes = + nullable_ ? static_cast((row_nums_ + 7) / 8) : 0; uint32_t offset_start_pos = - static_cast(target_->tell() + sizeof(uint32_t) * offset_num); + null_bitmap_bytes + + static_cast(sizeof(uint32_t) * offset_num); std::vector offsets; offsets.reserve(offset_num); - - for (auto str : wkb_strs) { + for (const auto& str : wkb_strs) { offsets.push_back(offset_start_pos); offset_start_pos += str.size(); } offsets.push_back(offset_start_pos); - target_->write(offsets.data(), offsets.size() * sizeof(uint32_t)); + target->write(offsets.data(), offsets.size() * sizeof(uint32_t)); - for (auto str : wkb_strs) { - target_->write(str.data(), str.size()); + for (const auto& str : wkb_strs) { + target->write(str.data(), str.size()); } + + char padding[MMAP_GEOMETRY_PADDING]; + target->write(padding, MMAP_GEOMETRY_PADDING); } -std::unique_ptr -GeometryChunkWriter::finish() { - // write padding, maybe not needed anymore - // FIXME - char padding[MMAP_GEOMETRY_PADDING]; - target_->write(padding, MMAP_GEOMETRY_PADDING); - auto [data, size] = target_->get(); - auto mmap_file_raii = file_path_.empty() - ? nullptr - : std::make_unique(file_path_); - return std::make_unique( - row_nums_, data, size, nullable_, std::move(mmap_file_raii)); +std::pair +ArrayChunkWriter::calculate_size(const arrow::ArrayVector& array_vec) { + row_nums_ = 0; + size_t size = 0; + const bool is_string = IsStringDataType(element_type_); + + for (const auto& data : array_vec) { + auto array = std::dynamic_pointer_cast(data); + for (int64_t i = 0; i < array->length(); ++i) { + auto str = array->GetView(i); + ScalarFieldProto scalar_array; + scalar_array.ParseFromArray(str.data(), str.size()); + Array arr(scalar_array); + size += arr.byte_size(); + if (is_string) { + size += sizeof(uint32_t) * arr.length(); + } + } + + row_nums_ += array->length(); + } + if (nullable_) { + size += (row_nums_ + 7) / 8; + } + size += sizeof(uint32_t) * (row_nums_ * 2 + 1) + MMAP_ARRAY_PADDING; + return {size, row_nums_}; } void -ArrayChunkWriter::write(const arrow::ArrayVector& array_vec) { - auto size = 0; - auto is_string = IsStringDataType(element_type_); +ArrayChunkWriter::write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) { + const bool is_string = IsStringDataType(element_type_); std::vector arrays; - // tuple + arrays.reserve(row_nums_); std::vector> null_bitmaps; for (const auto& data : array_vec) { auto array = std::dynamic_pointer_cast(data); - for (int i = 0; i < array->length(); i++) { + for (int64_t i = 0; i < array->length(); ++i) { auto str = array->GetView(i); ScalarFieldProto scalar_array; scalar_array.ParseFromArray(str.data(), str.size()); - auto arr = Array(scalar_array); - size += arr.byte_size(); - if (is_string) { - // element offsets size - size += sizeof(uint32_t) * arr.length(); - } - arrays.push_back(std::move(arr)); + arrays.emplace_back(Array(scalar_array)); } - row_nums_ += array->length(); if (nullable_) { - auto null_bitmap_n = (data->length() + 7) / 8; - // size, offset all in bits null_bitmaps.emplace_back( data->null_bitmap_data(), data->length(), data->offset()); - size += null_bitmap_n; } } - // offsets + lens - size += sizeof(uint32_t) * (row_nums_ * 2 + 1) + MMAP_ARRAY_PADDING; - if (!file_path_.empty()) { - target_ = std::make_shared(file_path_); - } else { - target_ = std::make_shared(size); - } + write_null_bit_maps(null_bitmaps, target); - // chunk layout: nullbitmaps, offsets, elem_off1, elem_off2, .. data1, data2, ..., datan, padding - write_null_bit_maps(null_bitmaps); - - int offsets_num = row_nums_ + 1; - int len_num = row_nums_; + const int offsets_num = row_nums_ + 1; + const int len_num = row_nums_; + const uint32_t null_bitmap_bytes = + nullable_ ? static_cast((row_nums_ + 7) / 8) : 0; uint32_t offset_start_pos = - target_->tell() + sizeof(uint32_t) * (offsets_num + len_num); + null_bitmap_bytes + sizeof(uint32_t) * (offsets_num + len_num); + std::vector offsets(offsets_num); std::vector lens(len_num); - for (auto i = 0; i < arrays.size(); i++) { + + for (size_t i = 0; i < arrays.size(); ++i) { auto& arr = arrays[i]; offsets[i] = offset_start_pos; lens[i] = arr.length(); - offset_start_pos += is_string ? sizeof(uint32_t) * lens[i] : 0; + if (is_string) { + offset_start_pos += sizeof(uint32_t) * lens[i]; + } offset_start_pos += arr.byte_size(); } - if (offsets_num > 0) { - offsets[offsets_num - 1] = offset_start_pos; + + if (!offsets.empty()) { + offsets.back() = offset_start_pos; } - for (int i = 0; i < offsets.size(); i++) { - if (i == offsets.size() - 1) { - target_->write(&offsets[i], sizeof(uint32_t)); - break; - } - target_->write(&offsets[i], sizeof(uint32_t)); - target_->write(&lens[i], sizeof(uint32_t)); + for (int i = 0; i < row_nums_; ++i) { + target->write(&offsets[i], sizeof(uint32_t)); + target->write(&lens[i], sizeof(uint32_t)); } + target->write(&offsets.back(), sizeof(uint32_t)); for (auto& arr : arrays) { if (is_string) { - target_->write(arr.get_offsets_data(), - arr.length() * sizeof(uint32_t)); + target->write(arr.get_offsets_data(), + arr.length() * sizeof(uint32_t)); } - target_->write(arr.data(), arr.byte_size()); + target->write(arr.data(), arr.byte_size()); } -} -std::unique_ptr -ArrayChunkWriter::finish() { char padding[MMAP_ARRAY_PADDING]; - target_->write(padding, MMAP_ARRAY_PADDING); - auto [data, size] = target_->get(); - auto mmap_file_raii = file_path_.empty() - ? nullptr - : std::make_unique(file_path_); - return std::make_unique(row_nums_, - data, - size, - element_type_, - nullable_, - std::move(mmap_file_raii)); + target->write(padding, MMAP_ARRAY_PADDING); } -// Read vector array data from arrow::ArrayVector and write to target_ -void -VectorArrayChunkWriter::write(const arrow::ArrayVector& array_vec) { - size_t total_size = calculateTotalSize(array_vec); - row_nums_ = 0; - - for (const auto& array_data : array_vec) { - row_nums_ += array_data->length(); - } - - if (!file_path_.empty()) { - target_ = std::make_shared(file_path_); - } else { - target_ = std::make_shared(total_size); - } - - // Seirialization, the format is: [offsets_lens][all_vector_data_concatenated] - std::vector offsets_lens; - std::vector vector_data_ptrs; - std::vector data_sizes; - - uint32_t current_offset = - sizeof(uint32_t) * (row_nums_ * 2 + 1) + target_->tell(); - - for (const auto& array_data : array_vec) { - auto list_array = - std::static_pointer_cast(array_data); - auto binary_values = - std::static_pointer_cast( - list_array->values()); - const int32_t* list_offsets = list_array->raw_value_offsets(); - int byte_width = binary_values->byte_width(); - - // Generate offsets and lengths for each row - // Each list contains multiple vectors, each stored as a fixed-size binary chunk - for (int64_t i = 0; i < list_array->length(); i++) { - auto start_idx = list_offsets[i]; - auto end_idx = list_offsets[i + 1]; - auto vector_count = end_idx - start_idx; - auto byte_size = vector_count * byte_width; - - offsets_lens.push_back(current_offset); - offsets_lens.push_back(static_cast(vector_count)); - - for (int j = start_idx; j < end_idx; j++) { - vector_data_ptrs.push_back(binary_values->GetValue(j)); - data_sizes.push_back(byte_width); - } - - current_offset += byte_size; - } - } - - // Add final offset - offsets_lens.push_back(current_offset); - - // Write offset and length arrays - for (size_t i = 0; i < offsets_lens.size() - 1; i += 2) { - target_->write(&offsets_lens[i], sizeof(uint32_t)); // offset - target_->write(&offsets_lens[i + 1], sizeof(uint32_t)); // length - } - target_->write(&offsets_lens.back(), sizeof(uint32_t)); // final offset - - for (size_t i = 0; i < vector_data_ptrs.size(); i++) { - target_->write(vector_data_ptrs[i], data_sizes[i]); - } -} - -size_t -VectorArrayChunkWriter::calculateTotalSize( - const arrow::ArrayVector& array_vec) { - size_t total_size = 0; +std::pair +VectorArrayChunkWriter::calculate_size(const arrow::ArrayVector& array_vec) { size_t total_rows = 0; + size_t total_size = 0; - // Calculate total size for vector data and count rows for (const auto& array_data : array_vec) { total_rows += array_data->length(); auto list_array = @@ -423,66 +354,121 @@ VectorArrayChunkWriter::calculateTotalSize( } } + row_nums_ = total_rows; + // Add space for offset and length arrays - total_size += sizeof(uint32_t) * (total_rows * 2 + 1 /* final offset */) + - MMAP_ARRAY_PADDING; - - return total_size; -} - -std::unique_ptr -VectorArrayChunkWriter::finish() { - char padding[MMAP_ARRAY_PADDING]; - target_->write(padding, MMAP_ARRAY_PADDING); - - auto [data, size] = target_->get(); - auto mmap_file_raii = file_path_.empty() - ? nullptr - : std::make_unique(file_path_); - return std::make_unique( - dim_, row_nums_, data, size, element_type_, std::move(mmap_file_raii)); + total_size += sizeof(uint32_t) * (total_rows * 2 + 1) + MMAP_ARRAY_PADDING; + return {total_size, total_rows}; } void -SparseFloatVectorChunkWriter::write(const arrow::ArrayVector& array_vec) { - auto size = 0; - std::vector strs; - std::vector> null_bitmaps; +VectorArrayChunkWriter::write_to_target( + const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) { + std::vector offsets_lens; + offsets_lens.reserve(row_nums_ * 2 + 1); + std::vector vector_data_ptrs; + std::vector data_sizes; + + uint32_t current_offset = sizeof(uint32_t) * (row_nums_ * 2 + 1); + + for (const auto& array_data : array_vec) { + auto list_array = + std::static_pointer_cast(array_data); + auto binary_values = + std::static_pointer_cast( + list_array->values()); + const int32_t* list_offsets = list_array->raw_value_offsets(); + int byte_width = binary_values->byte_width(); + + // Generate offsets and lengths for each row + // Each list contains multiple vectors, each stored as a fixed-size binary chunk + for (int64_t i = 0; i < list_array->length(); i++) { + auto start_idx = list_offsets[i]; + auto end_idx = list_offsets[i + 1]; + auto vector_count = end_idx - start_idx; + auto byte_size = static_cast(vector_count * byte_width); + + offsets_lens.push_back(current_offset); + offsets_lens.push_back(static_cast(vector_count)); + + for (int32_t j = start_idx; j < end_idx; ++j) { + vector_data_ptrs.push_back(binary_values->GetValue(j)); + data_sizes.push_back(byte_width); + } + + current_offset += byte_size; + } + } + + offsets_lens.push_back(current_offset); + + // Write offset and length arrays + for (size_t i = 0; i < offsets_lens.size() - 1; i += 2) { + target->write(&offsets_lens[i], sizeof(uint32_t)); // offset + target->write(&offsets_lens[i + 1], sizeof(uint32_t)); // length + } + target->write(&offsets_lens.back(), sizeof(uint32_t)); // final offset + + for (size_t i = 0; i < vector_data_ptrs.size(); i++) { + target->write(vector_data_ptrs[i], data_sizes[i]); + } + + char padding[MMAP_ARRAY_PADDING]; + target->write(padding, MMAP_ARRAY_PADDING); +} + +std::pair +SparseFloatVectorChunkWriter::calculate_size( + const arrow::ArrayVector& array_vec) { + row_nums_ = 0; + size_t size = 0; + for (const auto& data : array_vec) { auto array = std::dynamic_pointer_cast(data); - for (int i = 0; i < array->length(); i++) { + for (int64_t i = 0; i < array->length(); ++i) { auto str = array->GetView(i); - strs.emplace_back(str); size += str.size(); } - auto null_bitmap_n = (data->length() + 7) / 8; - null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); - size += null_bitmap_n; row_nums_ += array->length(); } - size += sizeof(uint64_t) * (row_nums_ + 1); - if (!file_path_.empty()) { - target_ = std::make_shared(file_path_); - } else { - target_ = std::make_shared(size); - } - // chunk layout: null bitmap, offset1, offset2, ..., offsetn, str1, str2, ..., strn - // write null bitmaps - for (auto [data, size] : null_bitmaps) { - if (data == nullptr) { - std::vector null_bitmap(size, 0xff); - target_->write(null_bitmap.data(), size); - } else { - target_->write(data, size); + if (nullable_) { + size += (row_nums_ + 7) / 8; + } + size += sizeof(uint64_t) * (row_nums_ + 1); + return {size, row_nums_}; +} + +void +SparseFloatVectorChunkWriter::write_to_target( + const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) { + std::vector strs; + strs.reserve(row_nums_); + std::vector> null_bitmaps; + + for (const auto& data : array_vec) { + auto array = std::dynamic_pointer_cast(data); + for (int64_t i = 0; i < array->length(); ++i) { + auto str = array->GetView(i); + strs.emplace_back(str); + } + if (nullable_) { + null_bitmaps.emplace_back( + data->null_bitmap_data(), data->length(), data->offset()); } } - // write data + write_null_bit_maps(null_bitmaps, target); - int offset_num = row_nums_ + 1; - int offset_start_pos = target_->tell() + sizeof(uint64_t) * offset_num; + const int offset_num = row_nums_ + 1; + const uint64_t null_bitmap_bytes = + nullable_ ? static_cast((row_nums_ + 7) / 8) : 0; + uint64_t offset_start_pos = + null_bitmap_bytes + sizeof(uint64_t) * offset_num; std::vector offsets; + offsets.reserve(offset_num); for (const auto& str : strs) { offsets.push_back(offset_start_pos); @@ -490,26 +476,15 @@ SparseFloatVectorChunkWriter::write(const arrow::ArrayVector& array_vec) { } offsets.push_back(offset_start_pos); - target_->write(offsets.data(), offsets.size() * sizeof(uint64_t)); + target->write(offsets.data(), offsets.size() * sizeof(uint64_t)); - for (auto str : strs) { - target_->write(str.data(), str.size()); + for (const auto& str : strs) { + target->write(str.data(), str.size()); } } -std::unique_ptr -SparseFloatVectorChunkWriter::finish() { - auto [data, size] = target_->get(); - auto mmap_file_raii = file_path_.empty() - ? nullptr - : std::make_unique(file_path_); - return std::make_unique( - row_nums_, data, size, nullable_, std::move(mmap_file_raii)); -} - -template -std::shared_ptr -create_chunk_writer(const FieldMeta& field_meta, Args&&... args) { +static inline std::shared_ptr +create_chunk_writer(const FieldMeta& field_meta) { int dim = IsVectorDataType(field_meta.get_data_type()) && !IsSparseFloatVectorDataType(field_meta.get_data_type()) ? field_meta.get_dim() @@ -518,92 +493,344 @@ create_chunk_writer(const FieldMeta& field_meta, Args&&... args) { switch (field_meta.get_data_type()) { case milvus::DataType::BOOL: return std::make_shared>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::INT8: return std::make_shared>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::INT16: return std::make_shared>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::INT32: return std::make_shared>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::INT64: return std::make_shared>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::FLOAT: return std::make_shared>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::DOUBLE: return std::make_shared>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::TIMESTAMPTZ: return std::make_shared>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::VECTOR_FLOAT: return std::make_shared< ChunkWriter>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::VECTOR_BINARY: return std::make_shared< ChunkWriter>( - dim / 8, std::forward(args)..., nullable); + dim / 8, nullable); case milvus::DataType::VECTOR_FLOAT16: return std::make_shared< ChunkWriter>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::VECTOR_BFLOAT16: return std::make_shared< ChunkWriter>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::VECTOR_INT8: return std::make_shared< ChunkWriter>( - dim, std::forward(args)..., nullable); + dim, nullable); case milvus::DataType::VARCHAR: case milvus::DataType::STRING: case milvus::DataType::TEXT: - return std::make_shared( - std::forward(args)..., nullable); + return std::make_shared(nullable); case milvus::DataType::JSON: - return std::make_shared( - std::forward(args)..., nullable); + return std::make_shared(nullable); case milvus::DataType::GEOMETRY: { - return std::make_shared( - std::forward(args)..., nullable); + return std::make_shared(nullable); } case milvus::DataType::ARRAY: return std::make_shared( - field_meta.get_element_type(), - std::forward(args)..., - nullable); + field_meta.get_element_type(), nullable); case milvus::DataType::VECTOR_SPARSE_U32_F32: - return std::make_shared( - std::forward(args)..., nullable); + return std::make_shared(nullable); case milvus::DataType::VECTOR_ARRAY: return std::make_shared( - dim, - field_meta.get_element_type(), - std::forward(args)...); + dim, field_meta.get_element_type()); default: ThrowInfo(Unsupported, "Unsupported data type"); } } -std::unique_ptr -create_chunk(const FieldMeta& field_meta, const arrow::ArrayVector& array_vec) { - auto cw = create_chunk_writer(field_meta); - cw->write(array_vec); - return cw->finish(); +static inline std::unique_ptr +make_chunk(const FieldMeta& field_meta, + size_t row_nums, + char* data, + size_t size, + const std::string& file_path, + std::shared_ptr chunk_mmap_guard) { + int dim = IsVectorDataType(field_meta.get_data_type()) && + !IsSparseFloatVectorDataType(field_meta.get_data_type()) + ? field_meta.get_dim() + : 1; + bool nullable = field_meta.is_nullable(); + switch (field_meta.get_data_type()) { + case milvus::DataType::BOOL: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(bool), + nullable, + chunk_mmap_guard); + case milvus::DataType::INT8: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(int8_t), + nullable, + chunk_mmap_guard); + case milvus::DataType::INT16: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(int16_t), + nullable, + chunk_mmap_guard); + case milvus::DataType::INT32: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(int32_t), + nullable, + chunk_mmap_guard); + case milvus::DataType::INT64: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(int64_t), + nullable, + chunk_mmap_guard); + case milvus::DataType::FLOAT: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(float), + nullable, + chunk_mmap_guard); + case milvus::DataType::DOUBLE: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(double), + nullable, + chunk_mmap_guard); + case milvus::DataType::TIMESTAMPTZ: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(int64_t), + nullable, + chunk_mmap_guard); + case milvus::DataType::VECTOR_FLOAT: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(knowhere::fp32), + nullable, + chunk_mmap_guard); + case milvus::DataType::VECTOR_BINARY: + return std::make_unique(row_nums, + dim / 8, + data, + size, + sizeof(knowhere::bin1), + nullable, + chunk_mmap_guard); + case milvus::DataType::VECTOR_FLOAT16: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(knowhere::fp16), + nullable, + chunk_mmap_guard); + case milvus::DataType::VECTOR_BFLOAT16: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(knowhere::bf16), + nullable, + chunk_mmap_guard); + case milvus::DataType::VECTOR_INT8: + return std::make_unique(row_nums, + dim, + data, + size, + sizeof(knowhere::int8), + nullable, + chunk_mmap_guard); + case milvus::DataType::VARCHAR: + case milvus::DataType::STRING: + case milvus::DataType::TEXT: + return std::make_unique( + row_nums, data, size, nullable, chunk_mmap_guard); + case milvus::DataType::JSON: + return std::make_unique( + row_nums, data, size, nullable, chunk_mmap_guard); + case milvus::DataType::GEOMETRY: { + return std::make_unique( + row_nums, data, size, nullable, chunk_mmap_guard); + } + case milvus::DataType::ARRAY: + return std::make_unique(row_nums, + data, + size, + field_meta.get_element_type(), + nullable, + chunk_mmap_guard); + case milvus::DataType::VECTOR_SPARSE_U32_F32: + return std::make_unique( + row_nums, data, size, nullable, chunk_mmap_guard); + case milvus::DataType::VECTOR_ARRAY: + return std::make_unique( + dim, + row_nums, + data, + size, + field_meta.get_element_type(), + chunk_mmap_guard); + default: + ThrowInfo(DataTypeInvalid, "Unsupported data type"); + } } std::unique_ptr create_chunk(const FieldMeta& field_meta, const arrow::ArrayVector& array_vec, const std::string& file_path) { - auto cw = create_chunk_writer(field_meta, file_path); - cw->write(array_vec); - return cw->finish(); + auto cw = create_chunk_writer(field_meta); + auto [size, row_nums] = cw->calculate_size(array_vec); + size_t aligned_size = (size + ChunkTarget::ALIGNED_SIZE - 1) & + ~(ChunkTarget::ALIGNED_SIZE - 1); + std::shared_ptr target; + if (file_path.empty()) { + target = std::make_shared(aligned_size); + } else { + target = std::make_shared(file_path, aligned_size); + } + cw->write_to_target(array_vec, target); + auto data = target->release(); + std::shared_ptr chunk_mmap_guard = nullptr; + if (!file_path.empty()) { + chunk_mmap_guard = + std::make_shared(data, size, file_path); + } else { + chunk_mmap_guard = std::make_shared(data, size, ""); + } + return make_chunk( + field_meta, row_nums, data, size, file_path, chunk_mmap_guard); +} + +std::unordered_map> +create_group_chunk(const std::vector& field_ids, + const std::vector& field_metas, + const std::vector& array_vec, + const std::string& file_path) { + std::vector> cws; + cws.reserve(field_ids.size()); + size_t total_aligned_size = 0, final_row_nums = 0; + for (size_t i = 0; i < field_ids.size(); i++) { + auto field_meta = field_metas[i]; + cws.push_back(create_chunk_writer(field_meta)); + } + std::vector chunk_sizes; + chunk_sizes.reserve(field_ids.size()); + std::vector chunk_offsets; + chunk_offsets.reserve(field_ids.size()); + for (size_t i = 0; i < field_ids.size(); i++) { + auto [size, row_nums] = cws[i]->calculate_size(array_vec[i]); + // Allocate and place each sub-chunk at an aligned boundary, + // but keep the raw (unaligned) size for chunk construction. + auto aligned_size = (size + ChunkTarget::ALIGNED_SIZE - 1) & + ~(ChunkTarget::ALIGNED_SIZE - 1); + // store raw size for make_chunk() + chunk_sizes.push_back(size); + chunk_offsets.push_back(total_aligned_size); + total_aligned_size += aligned_size; + // each column should have the same number of rows + if (i == 0) { + final_row_nums = row_nums; + } else { + if (row_nums != final_row_nums) { + ThrowInfo(DataTypeInvalid, + "All columns should have the same number of rows"); + } + } + } + std::shared_ptr target; + if (file_path.empty()) { + target = std::make_shared(total_aligned_size); + } else { + target = + std::make_shared(file_path, total_aligned_size); + } + for (size_t i = 0; i < field_ids.size(); i++) { + auto start_off = target->tell(); + cws[i]->write_to_target(array_vec[i], target); + auto end_off = target->tell(); + auto written = static_cast(end_off - start_off); + if (written != chunk_sizes[i]) { + ThrowInfo(DataTypeInvalid, + "The written size {} of field {} does not match the " + "expected size {}", + written, + field_ids[i].get(), + chunk_sizes[i]); + } + auto aligned_size = (written + ChunkTarget::ALIGNED_SIZE - 1) & + ~(ChunkTarget::ALIGNED_SIZE - 1); + auto padding_size = aligned_size - written; + if (padding_size > 0) { + std::string padding(padding_size, 0); + target->write(padding.data(), padding_size); + } + } + + auto data = target->release(); + + // For mmap mode, create a shared mmap region manager + std::shared_ptr chunk_mmap_guard = nullptr; + if (!file_path.empty()) { + chunk_mmap_guard = std::make_shared( + data, total_aligned_size, file_path); + } else { + chunk_mmap_guard = + std::make_shared(data, total_aligned_size, ""); + } + + std::unordered_map> chunks; + for (size_t i = 0; i < field_ids.size(); i++) { + chunks[field_ids[i]] = std::move(make_chunk(field_metas[i], + final_row_nums, + data + chunk_offsets[i], + chunk_sizes[i], + file_path, + chunk_mmap_guard)); + LOG_INFO( + "created chunk for field {} with chunk offset: {}, chunk " + "size: {}, file path: {}", + field_ids[i].get(), + chunk_offsets[i], + chunk_sizes[i], + file_path); + } + + return chunks; } arrow::ArrayVector diff --git a/internal/core/src/common/ChunkWriter.h b/internal/core/src/common/ChunkWriter.h index 18ef8e0241..675c135781 100644 --- a/internal/core/src/common/ChunkWriter.h +++ b/internal/core/src/common/ChunkWriter.h @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include "arrow/array/array_primitive.h" @@ -21,37 +20,26 @@ #include "common/ChunkTarget.h" #include "arrow/record_batch.h" #include "common/Chunk.h" -#include "common/EasyAssert.h" -#include "common/FieldDataInterface.h" -#include "storage/FileWriter.h" - -#include "common/Geometry.h" namespace milvus { class ChunkWriterBase { public: explicit ChunkWriterBase(bool nullable) : nullable_(nullable) { } - ChunkWriterBase(std::string file_path, bool nullable) - : file_path_(std::move(file_path)), nullable_(nullable) { - } + virtual std::pair + calculate_size(const arrow::ArrayVector& data) = 0; virtual void - write(const arrow::ArrayVector& data) = 0; - - virtual std::unique_ptr - finish() = 0; - - std::pair - get_data() { - return target_->get(); - } + write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) = 0; + protected: void write_null_bit_maps( const std::vector>& - null_bitmaps) { + null_bitmaps, + const std::shared_ptr& target) { if (nullable_) { // merge all null bitmaps in case of multiple chunk null bitmap dislocation // say [0xFF, 0x00] with size [7, 8] cannot be treated as [0xFF, 0x00] after merged but @@ -81,15 +69,13 @@ class ChunkWriterBase { } size_total_bit += size_bits; } - target_->write(merged_null_bitmap.data(), (size_total_bit + 7) / 8); + target->write(merged_null_bitmap.data(), (size_total_bit + 7) / 8); } } protected: - int row_nums_ = 0; - std::string file_path_{""}; + size_t row_nums_ = 0; bool nullable_ = false; - std::shared_ptr target_; }; template @@ -98,30 +84,25 @@ class ChunkWriter final : public ChunkWriterBase { ChunkWriter(int dim, bool nullable) : ChunkWriterBase(nullable), dim_(dim) { } - ChunkWriter(int dim, std::string file_path, bool nullable) - : ChunkWriterBase(std::move(file_path), nullable), dim_(dim){}; - - void - write(const arrow::ArrayVector& array_vec) override { - auto size = 0; - auto row_nums = 0; - + std::pair + calculate_size(const arrow::ArrayVector& array_vec) override { + size_t size = 0; + size_t row_nums = 0; for (const auto& data : array_vec) { row_nums += data->length(); auto array = std::static_pointer_cast(data); - if (nullable_) { - auto null_bitmap_n = (data->length() + 7) / 8; - size += null_bitmap_n; - } size += array->length() * dim_ * sizeof(T); } - - row_nums_ = row_nums; - if (!file_path_.empty()) { - target_ = std::make_shared(file_path_); - } else { - target_ = std::make_shared(size); + if (nullable_) { + size += (row_nums + 7) / 8; } + row_nums_ = row_nums; + return {size, row_nums}; + } + + void + write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) override { // Chunk layout: // 1. Null bitmap (if nullable_=true): Indicates which values are null // 2. Data values: Contiguous storage of data elements in the order: @@ -134,55 +115,25 @@ class ChunkWriter final : public ChunkWriterBase { null_bitmaps.emplace_back( data->null_bitmap_data(), data->length(), data->offset()); } - write_null_bit_maps(null_bitmaps); + write_null_bit_maps(null_bitmaps, target); } for (const auto& data : array_vec) { auto array = std::static_pointer_cast(data); auto data_ptr = array->raw_values(); - target_->write(data_ptr, array->length() * dim_ * sizeof(T)); + target->write(data_ptr, array->length() * dim_ * sizeof(T)); } } - std::unique_ptr - finish() override { - auto [data, size] = target_->get(); - auto mmap_file_raii = file_path_.empty() - ? nullptr - : std::make_unique(file_path_); - return std::make_unique(row_nums_, - dim_, - data, - size, - sizeof(T), - nullable_, - std::move(mmap_file_raii)); - } - private: - int dim_; + const int64_t dim_; }; template <> inline void -ChunkWriter::write( - const arrow::ArrayVector& array_vec) { - auto size = 0; - auto row_nums = 0; - - for (const auto& data : array_vec) { - row_nums += data->length(); - auto array = std::dynamic_pointer_cast(data); - size += array->length() * dim_; - size += (data->length() + 7) / 8; - } - row_nums_ = row_nums; - if (!file_path_.empty()) { - target_ = std::make_shared(file_path_); - } else { - target_ = std::make_shared(size); - } - +ChunkWriter::write_to_target( + const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) { if (nullable_) { // tuple std::vector> null_bitmaps; @@ -190,14 +141,14 @@ ChunkWriter::write( null_bitmaps.emplace_back( data->null_bitmap_data(), data->length(), data->offset()); } - write_null_bit_maps(null_bitmaps); + write_null_bit_maps(null_bitmaps, target); } for (const auto& data : array_vec) { auto array = std::dynamic_pointer_cast(data); for (int i = 0; i < array->length(); i++) { auto value = array->Value(i); - target_->write(&value, sizeof(bool)); + target->write(&value, sizeof(bool)); } } } @@ -206,32 +157,39 @@ class StringChunkWriter : public ChunkWriterBase { public: using ChunkWriterBase::ChunkWriterBase; - void - write(const arrow::ArrayVector& array_vec) override; + std::pair + calculate_size(const arrow::ArrayVector& array_vec) override; - std::unique_ptr - finish() override; + void + write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) override; + + private: + std::vector strs_; }; class JSONChunkWriter : public ChunkWriterBase { public: using ChunkWriterBase::ChunkWriterBase; - void - write(const arrow::ArrayVector& array_vec) override; + std::pair + calculate_size(const arrow::ArrayVector& array_vec) override; - std::unique_ptr - finish() override; + void + write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) override; }; class GeometryChunkWriter : public ChunkWriterBase { public: using ChunkWriterBase::ChunkWriterBase; - void - write(const arrow::ArrayVector& array_vec) override; - std::unique_ptr - finish() override; + std::pair + calculate_size(const arrow::ArrayVector& array_vec) override; + + void + write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) override; }; class ArrayChunkWriter : public ChunkWriterBase { @@ -239,18 +197,13 @@ class ArrayChunkWriter : public ChunkWriterBase { ArrayChunkWriter(const milvus::DataType element_type, bool nullable) : ChunkWriterBase(nullable), element_type_(element_type) { } - ArrayChunkWriter(const milvus::DataType element_type, - std::string file_path, - bool nullable) - : ChunkWriterBase(std::move(file_path), nullable), - element_type_(element_type) { - } + + std::pair + calculate_size(const arrow::ArrayVector& array_vec) override; void - write(const arrow::ArrayVector& array_vec) override; - - std::unique_ptr - finish() override; + write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) override; private: const milvus::DataType element_type_; @@ -258,46 +211,44 @@ class ArrayChunkWriter : public ChunkWriterBase { class VectorArrayChunkWriter : public ChunkWriterBase { public: - VectorArrayChunkWriter(int64_t dim, - const milvus::DataType element_type, - std::string file_path = "") - : ChunkWriterBase(std::move(file_path), false), - element_type_(element_type), - dim_(dim) { + VectorArrayChunkWriter(int64_t dim, const milvus::DataType element_type) + : ChunkWriterBase(false), element_type_(element_type), dim_(dim) { } - void - write(const arrow::ArrayVector& array_vec) override; + std::pair + calculate_size(const arrow::ArrayVector& array_vec) override; - std::unique_ptr - finish() override; + void + write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) override; private: - size_t - calculateTotalSize(const arrow::ArrayVector& array_vec); - const milvus::DataType element_type_; - int64_t dim_; + const int64_t dim_; }; class SparseFloatVectorChunkWriter : public ChunkWriterBase { public: using ChunkWriterBase::ChunkWriterBase; + std::pair + calculate_size(const arrow::ArrayVector& array_vec) override; + void - write(const arrow::ArrayVector& array_vec) override; - - std::unique_ptr - finish() override; + write_to_target(const arrow::ArrayVector& array_vec, + const std::shared_ptr& target) override; }; -std::unique_ptr -create_chunk(const FieldMeta& field_meta, const arrow::ArrayVector& array_vec); - std::unique_ptr create_chunk(const FieldMeta& field_meta, const arrow::ArrayVector& array_vec, - const std::string& file_path); + const std::string& file_path = ""); + +std::unordered_map> +create_group_chunk(const std::vector& field_ids, + const std::vector& field_metas, + const std::vector& array_vec, + const std::string& file_path = ""); arrow::ArrayVector read_single_column_batches(std::shared_ptr reader); diff --git a/internal/core/src/common/GroupChunk.h b/internal/core/src/common/GroupChunk.h index 284551d531..b85c94e92b 100644 --- a/internal/core/src/common/GroupChunk.h +++ b/internal/core/src/common/GroupChunk.h @@ -98,4 +98,9 @@ class GroupChunk { std::unordered_map> chunks_; }; +enum class GroupChunkType : uint8_t { + DEFAULT = 0, + JSON_KEY_STATS = 1, +}; + } // namespace milvus \ No newline at end of file diff --git a/internal/core/src/common/VectorArrayChunkTest.cpp b/internal/core/src/common/VectorArrayChunkTest.cpp index dd07782e04..ac8bac306b 100644 --- a/internal/core/src/common/VectorArrayChunkTest.cpp +++ b/internal/core/src/common/VectorArrayChunkTest.cpp @@ -21,6 +21,7 @@ #include "common/Chunk.h" #include "common/VectorArray.h" #include "common/Types.h" +#include "common/FieldMeta.h" #include "pb/schema.pb.h" #include "test_utils/DataGen.h" @@ -243,11 +244,14 @@ TEST_F(VectorArrayChunkTest, TestWriteMultipleBatches) { createFloatVectorListArray(batch_data, batch_offsets, dim)); } - // Write using VectorArrayChunkWriter - VectorArrayChunkWriter writer(dim, DataType::VECTOR_FLOAT); - writer.write(array_vec); - - auto chunk = writer.finish(); + // Write using create_chunk with FieldMeta + FieldMeta field_meta(FieldName("va"), + FieldId(1), + DataType::VECTOR_ARRAY, + DataType::VECTOR_FLOAT, + dim, + std::nullopt); + auto chunk = create_chunk(field_meta, array_vec); auto vector_array_chunk = static_cast(chunk.get()); // Verify total rows @@ -297,11 +301,14 @@ TEST_F(VectorArrayChunkTest, TestWriteWithMmap) { auto list_array = createFloatVectorListArray(all_data, offsets, dim); arrow::ArrayVector array_vec = {list_array}; - // Write with mmap - VectorArrayChunkWriter writer(dim, DataType::VECTOR_FLOAT, temp_file); - writer.write(array_vec); - - auto chunk = writer.finish(); + // Write with mmap using create_chunk + FieldMeta field_meta(FieldName("va"), + FieldId(2), + DataType::VECTOR_ARRAY, + DataType::VECTOR_FLOAT, + dim, + std::nullopt); + auto chunk = create_chunk(field_meta, array_vec, temp_file); auto vector_array_chunk = static_cast(chunk.get()); // Verify mmap write @@ -330,10 +337,13 @@ TEST_F(VectorArrayChunkTest, TestEmptyVectorArray) { arrow::ArrayVector array_vec; - VectorArrayChunkWriter writer(dim, DataType::VECTOR_FLOAT); - writer.write(array_vec); - - auto chunk = writer.finish(); + FieldMeta field_meta(FieldName("va"), + FieldId(3), + DataType::VECTOR_ARRAY, + DataType::VECTOR_FLOAT, + dim, + std::nullopt); + auto chunk = create_chunk(field_meta, array_vec); auto vector_array_chunk = static_cast(chunk.get()); EXPECT_EQ(vector_array_chunk->RowNums(), 0); @@ -469,11 +479,14 @@ TEST_P(VectorArrayChunkParameterizedTest, TestWriteVectorArray) { createVectorListArray(all_data, offsets, param.dim, param.data_type); arrow::ArrayVector array_vec = {list_array}; - // Test VectorArrayChunkWriter - VectorArrayChunkWriter writer(param.dim, param.data_type); - writer.write(array_vec); - - auto chunk = writer.finish(); + // Test create_chunk with FieldMeta for VECTOR_ARRAY + FieldMeta field_meta(FieldName("va_param"), + FieldId(100), + DataType::VECTOR_ARRAY, + param.data_type, + param.dim, + std::nullopt); + auto chunk = create_chunk(field_meta, array_vec); auto vector_array_chunk = static_cast(chunk.get()); // Verify results diff --git a/internal/core/src/index/json_stats/JsonKeyStats.cpp b/internal/core/src/index/json_stats/JsonKeyStats.cpp index 2d1265085f..e7aac4c86f 100644 --- a/internal/core/src/index/json_stats/JsonKeyStats.cpp +++ b/internal/core/src/index/json_stats/JsonKeyStats.cpp @@ -898,7 +898,7 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id, auto enable_mmap = !mmap_filepath_.empty(); auto column_group_info = - FieldDataInfo(column_group_id, num_rows, mmap_filepath_); + FieldDataInfo(column_group_id, field_id_, num_rows, mmap_filepath_); LOG_INFO( "loads column group {} with num_rows {} for segment " "{}", @@ -923,6 +923,7 @@ JsonKeyStats::LoadColumnGroup(int64_t column_group_id, auto translator = std::make_unique< milvus::segcore::storagev2translator::GroupChunkTranslator>( segment_id_, + GroupChunkType::JSON_KEY_STATS, field_meta_map, column_group_info, files, @@ -968,9 +969,11 @@ JsonKeyStats::LoadShreddingData(const std::vector& index_files) { void JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) { - if (config.contains(MMAP_FILE_PATH)) { - mmap_filepath_ = GetValueFromConfig(config, MMAP_FILE_PATH) - .value_or(""); + if (config.contains(ENABLE_MMAP)) { + mmap_filepath_ = + milvus::storage::LocalChunkManagerSingleton::GetInstance() + .GetChunkManager() + ->GetRootPath(); LOG_INFO("load json stats for segment {} with mmap local file path: {}", segment_id_, mmap_filepath_); diff --git a/internal/core/src/index/json_stats/parquet_writer.h b/internal/core/src/index/json_stats/parquet_writer.h index 9ef6781ff2..d45bde7381 100644 --- a/internal/core/src/index/json_stats/parquet_writer.h +++ b/internal/core/src/index/json_stats/parquet_writer.h @@ -105,7 +105,9 @@ class DefaultColumnGroupingStrategy : public ColumnGroupingStrategy { CreateGroups(const TableStatsInfo& table_info) const override { // put all columns into one group std::vector> column_groups; + column_groups.reserve(1); std::vector group; + group.reserve(table_info.schema->num_fields()); for (size_t i = 0; i < table_info.schema->num_fields(); ++i) { group.push_back(i); } diff --git a/internal/core/src/mmap/ChunkedColumnTest.cpp b/internal/core/src/mmap/ChunkedColumnTest.cpp index ee5116feed..54b6f13fe8 100644 --- a/internal/core/src/mmap/ChunkedColumnTest.cpp +++ b/internal/core/src/mmap/ChunkedColumnTest.cpp @@ -23,8 +23,10 @@ TEST(test_chunked_column, test_get_chunkid) { std::vector> chunks; for (auto i = 0; i < num_chunks; ++i) { auto row_num = num_rows_per_chunk[i]; - auto chunk = - std::make_unique(row_num, 1, nullptr, 0, 4, false); + auto chunk_mmap_guard = + std::make_shared(nullptr, 0, ""); + auto chunk = std::make_unique( + row_num, 1, nullptr, 0, 4, false, chunk_mmap_guard); chunks.push_back(std::move(chunk)); } auto translator = std::make_unique( diff --git a/internal/core/src/mmap/Types.h b/internal/core/src/mmap/Types.h index 37e6f0f0cc..b953b79db2 100644 --- a/internal/core/src/mmap/Types.h +++ b/internal/core/src/mmap/Types.h @@ -40,10 +40,24 @@ struct FieldDataInfo { arrow_reader_channel = std::make_shared(); } + FieldDataInfo(int64_t field_id, + int64_t main_field_id, + size_t row_count, + std::string mmap_dir_path = "", + bool in_load_list = false) + : field_id(field_id), + row_count(row_count), + main_field_id(main_field_id), + mmap_dir_path(std::move(mmap_dir_path)), + in_load_list(in_load_list) { + arrow_reader_channel = std::make_shared(); + } + int64_t field_id; + int64_t main_field_id{INVALID_FIELD_ID}; // used for json stats only size_t row_count; - std::string mmap_dir_path; + std::string mmap_dir_path{}; std::shared_ptr arrow_reader_channel; - bool in_load_list = false; + bool in_load_list{false}; }; } // namespace milvus diff --git a/internal/core/src/query/CachedSearchIteratorTest.cpp b/internal/core/src/query/CachedSearchIteratorTest.cpp index 54d18e45d8..7c883c4271 100644 --- a/internal/core/src/query/CachedSearchIteratorTest.cpp +++ b/internal/core/src/query/CachedSearchIteratorTest.cpp @@ -250,8 +250,16 @@ class CachedSearchIteratorTest memcpy(chunk_data.data(), base_dataset_.cbegin() + offset * dim_, rows * dim_ * sizeof(float)); - chunks.emplace_back(std::make_unique( - rows, dim_, chunk_data.data(), buf_size, sizeof(float), false)); + auto chunk_mmap_guard = + std::make_shared(nullptr, 0, ""); + chunks.emplace_back( + std::make_unique(rows, + dim_, + chunk_data.data(), + buf_size, + sizeof(float), + false, + chunk_mmap_guard)); offset += rows; } auto translator = std::make_unique( diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 8d08b0f48e..d5f9203792 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -562,6 +562,7 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal( auto translator = std::make_unique( get_segment_id(), + GroupChunkType::DEFAULT, field_metas, column_group_info, insert_files, diff --git a/internal/core/src/segcore/ChunkedSegmentSealedTest.cpp b/internal/core/src/segcore/ChunkedSegmentSealedTest.cpp index b748f44cd4..4b95803fa5 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedTest.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedTest.cpp @@ -95,8 +95,10 @@ TEST(test_chunk_segment, TestSearchOnSealed) { defer.AddDefer([buf]() { delete[] buf; }); memcpy(buf, data.data(), 4 * data.size()); + auto chunk_mmap_guard = + std::make_shared(nullptr, 0, ""); chunks.emplace_back(std::make_unique( - chunk_size, dim, buf, buf_size, 4, false)); + chunk_size, dim, buf, buf_size, 4, false, chunk_mmap_guard)); } auto translator = std::make_unique( diff --git a/internal/core/src/segcore/storagev1translator/ChunkTranslator.cpp b/internal/core/src/segcore/storagev1translator/ChunkTranslator.cpp index 7f4bb659c3..d2e58225c9 100644 --- a/internal/core/src/segcore/storagev1translator/ChunkTranslator.cpp +++ b/internal/core/src/segcore/storagev1translator/ChunkTranslator.cpp @@ -172,14 +172,6 @@ ChunkTranslator::get_cells( auto data_type = field_meta_.get_data_type(); - std::filesystem::path folder; - - if (use_mmap_) { - folder = std::filesystem::path(mmap_dir_path_) / - std::to_string(segment_id_) / std::to_string(field_id_); - std::filesystem::create_directories(folder); - } - for (auto cid : cids) { std::unique_ptr chunk = nullptr; if (!use_mmap_) { @@ -192,7 +184,11 @@ ChunkTranslator::get_cells( chunk = create_chunk(field_meta_, array_vec); } else { // we don't know the resulting file size beforehand, thus using a separate file for each chunk. - auto filepath = folder / std::to_string(cid); + auto filepath = + std::filesystem::path(mmap_dir_path_) / + fmt::format("seg_{}_fid_{}_{}", segment_id_, field_id_, cid); + std::filesystem::create_directories( + std::filesystem::path(mmap_dir_path_)); LOG_INFO("segment {} mmaping field {} chunk {} to path {}", segment_id_, diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp index fcffc89b6c..b254edffca 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.cpp @@ -45,6 +45,7 @@ namespace milvus::segcore::storagev2translator { GroupChunkTranslator::GroupChunkTranslator( int64_t segment_id, + GroupChunkType group_chunk_type, const std::unordered_map& field_metas, FieldDataInfo column_group_info, std::vector insert_files, @@ -52,7 +53,23 @@ GroupChunkTranslator::GroupChunkTranslator( int64_t num_fields, milvus::proto::common::LoadPriority load_priority) : segment_id_(segment_id), - key_(fmt::format("seg_{}_cg_{}", segment_id, column_group_info.field_id)), + group_chunk_type_(group_chunk_type), + key_([&]() { + switch (group_chunk_type) { + case GroupChunkType::DEFAULT: + return fmt::format( + "seg_{}_cg_{}", segment_id, column_group_info.field_id); + case GroupChunkType::JSON_KEY_STATS: + AssertInfo( + column_group_info.main_field_id != INVALID_FIELD_ID, + "main field id is not set for json key stats group " + "chunk"); + return fmt::format("seg_{}_jks_{}_cg_{}", + segment_id, + column_group_info.main_field_id, + column_group_info.field_id); + } + }()), field_metas_(field_metas), column_group_info_(column_group_info), insert_files_(insert_files), @@ -303,6 +320,12 @@ GroupChunkTranslator::load_group_chunk( // Create chunks for each field in this batch std::unordered_map> chunks; // Iterate through field_id_list to get field_id and create chunk + std::vector field_ids; + std::vector field_metas; + std::vector array_vecs; + field_metas.reserve(table->schema()->num_fields()); + array_vecs.reserve(table->schema()->num_fields()); + for (int i = 0; i < table->schema()->num_fields(); ++i) { AssertInfo(table->schema()->field(i)->metadata()->Contains( milvus_storage::ARROW_FIELD_ID_KEY), @@ -329,41 +352,41 @@ GroupChunkTranslator::load_group_chunk( fid.get()); const auto& field_meta = it->second; const arrow::ArrayVector& array_vec = table->column(i)->chunks(); - std::unique_ptr chunk; - if (!use_mmap_) { - // Memory mode - chunk = create_chunk(field_meta, array_vec); - } else { - // Mmap mode - std::filesystem::path filepath; - if (field_meta.get_main_field_id() != INVALID_FIELD_ID) { - // json shredding mode + field_ids.push_back(fid); + field_metas.push_back(field_meta); + array_vecs.push_back(array_vec); + } + + if (!use_mmap_) { + chunks = create_group_chunk(field_ids, field_metas, array_vecs); + } else { + std::filesystem::path filepath; + switch (group_chunk_type_) { + case GroupChunkType::DEFAULT: filepath = std::filesystem::path(column_group_info_.mmap_dir_path) / - std::to_string(segment_id_) / - std::to_string(field_meta.get_main_field_id()) / - std::to_string(field_id) / std::to_string(cid); - } else { + fmt::format("seg_{}_cg_{}_{}", + segment_id_, + column_group_info_.field_id, + cid); + break; + case GroupChunkType::JSON_KEY_STATS: filepath = std::filesystem::path(column_group_info_.mmap_dir_path) / - std::to_string(segment_id_) / std::to_string(field_id) / - std::to_string(cid); - } - - LOG_INFO( - "[StorageV2] translator {} mmaping field {} chunk {} to path " - "{}", - key_, - field_id, - cid, - filepath.string()); - - std::filesystem::create_directories(filepath.parent_path()); - - chunk = create_chunk(field_meta, array_vec, filepath.string()); + fmt::format("seg_{}_jks_{}_cg_{}_{}", + segment_id_, + column_group_info_.main_field_id, + column_group_info_.field_id, + cid); + break; + default: + ThrowInfo(ErrorCode::UnexpectedError, + "unknown group chunk type: {}", + static_cast(group_chunk_type_)); } - - chunks[fid] = std::move(chunk); + std::filesystem::create_directories(filepath.parent_path()); + chunks = create_group_chunk( + field_ids, field_metas, array_vecs, filepath.string()); } return std::make_unique(chunks); } diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h index dba9b30298..9b6eee0416 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslator.h @@ -38,6 +38,7 @@ class GroupChunkTranslator public: GroupChunkTranslator( int64_t segment_id, + GroupChunkType group_chunk_type, const std::unordered_map& field_metas, FieldDataInfo column_group_info, std::vector insert_files, @@ -108,6 +109,7 @@ class GroupChunkTranslator const milvus::cachinglayer::cid_t cid); int64_t segment_id_; + GroupChunkType group_chunk_type_{GroupChunkType::DEFAULT}; std::string key_; std::unordered_map field_metas_; FieldDataInfo column_group_info_; diff --git a/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp b/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp index d94c3a2e2a..50d592864e 100644 --- a/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp +++ b/internal/core/src/segcore/storagev2translator/GroupChunkTranslatorTest.cpp @@ -97,12 +97,17 @@ class GroupChunkTranslatorTest : public ::testing::TestWithParam { }; TEST_P(GroupChunkTranslatorTest, TestWithMmap) { + auto temp_dir = + std::filesystem::temp_directory_path() / "gctt_test_with_mmap"; + std::filesystem::create_directory(temp_dir); + auto use_mmap = GetParam(); std::unordered_map field_metas = schema_->get_fields(); - auto column_group_info = FieldDataInfo(0, 3000, ""); + auto column_group_info = FieldDataInfo(0, 3000, temp_dir.string()); auto translator = std::make_unique( segment_id_, + GroupChunkType::DEFAULT, field_metas, column_group_info, paths_, @@ -164,17 +169,23 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) { EXPECT_EQ(meta->chunk_memory_size_.size(), num_cells); EXPECT_EQ(expected_total_size, chunked_column_group->memory_size()); + // Verify the mmap files for cell 0 and 1 are created + std::vector mmap_paths = { + (temp_dir / "seg_0_cg_0_0").string(), + (temp_dir / "seg_0_cg_0_1").string()}; // Verify mmap directory and files if in mmap mode if (use_mmap) { - std::string mmap_dir = std::to_string(segment_id_); - EXPECT_TRUE(std::filesystem::exists(mmap_dir)); + for (const auto& mmap_path : mmap_paths) { + EXPECT_TRUE(std::filesystem::exists(mmap_path)); + } + } - // DO NOT Verify each field has a corresponding file: files are unlinked immediately after being mmaped. - // for (size_t i = 0; i < field_id_list.size(); ++i) { - // auto field_id = field_id_list.Get(i); - // std::string field_file = mmap_dir + "/" + std::to_string(field_id); - // EXPECT_TRUE(std::filesystem::exists(field_file)); - // } + // Clean up mmap files + if (use_mmap) { + for (const auto& mmap_path : mmap_paths) { + std::filesystem::remove(mmap_path); + } + std::filesystem::remove(temp_dir); } } @@ -228,10 +239,14 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) { AssertInfo(status.ok(), "failed to close file reader"); } - auto column_group_info = FieldDataInfo(0, total_rows, ""); + auto temp_dir = + std::filesystem::temp_directory_path() / "gctt_test_multiple_files"; + std::filesystem::create_directory(temp_dir); + auto column_group_info = FieldDataInfo(0, total_rows, temp_dir.string()); auto translator = std::make_unique( segment_id_, + GroupChunkType::DEFAULT, field_metas, column_group_info, multi_file_paths, @@ -310,6 +325,10 @@ TEST_P(GroupChunkTranslatorTest, TestMultipleFiles) { std::filesystem::remove(file_path); } } + // Clean up cached column group files + if (use_mmap && std::filesystem::exists(temp_dir)) { + std::filesystem::remove_all(temp_dir); + } } INSTANTIATE_TEST_SUITE_P(GroupChunkTranslatorTest,