enhance: refine variable-length-type memory usage (#38736) (#43093)

related: #38736
pr: https://github.com/milvus-io/milvus/pull/39578

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
This commit is contained in:
Chun Han 2025-07-04 18:26:45 +08:00 committed by zhenshan.cao
parent c00beaeca8
commit 29d9ee7bd2
18 changed files with 564 additions and 190 deletions

View File

@ -19,13 +19,42 @@
namespace milvus {
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringChunk::StringViews() {
StringChunk::StringViews(
std::optional<std::pair<int64_t, int64_t>> offset_len = std::nullopt) {
auto start_offset = 0;
auto len = row_nums_;
if (offset_len.has_value()) {
start_offset = offset_len->first;
len = offset_len->second;
AssertInfo(
start_offset >= 0 && start_offset < row_nums_,
"Retrieve string views with out-of-bound offset:{}, len:{}, wrong",
start_offset,
len);
AssertInfo(
len > 0 && len <= row_nums_,
"Retrieve string views with out-of-bound offset:{}, len:{}, wrong",
start_offset,
len);
AssertInfo(
start_offset + len <= row_nums_,
"Retrieve string views with out-of-bound offset:{}, len:{}, wrong",
start_offset,
len);
}
std::vector<std::string_view> ret;
ret.reserve(row_nums_);
for (int i = 0; i < row_nums_; i++) {
ret.reserve(len);
auto end_offset = start_offset + len;
for (auto i = start_offset; i < end_offset; i++) {
ret.emplace_back(data_ + offsets_[i], offsets_[i + 1] - offsets_[i]);
}
return {ret, valid_};
if (nullable_) {
FixedVector<bool> res_valid(valid_.begin() + start_offset,
valid_.begin() + end_offset);
return {ret, std::move(res_valid)};
}
return {ret, {}};
}
std::pair<std::vector<std::string_view>, FixedVector<bool>>
@ -43,35 +72,4 @@ StringChunk::ViewsByOffsets(const FixedVector<int32_t>& offsets) {
return {ret, valid_res};
}
void
ArrayChunk::ConstructViews() {
views_.reserve(row_nums_);
for (int i = 0; i < row_nums_; ++i) {
int offset = offsets_lens_[2 * i];
int next_offset = offsets_lens_[2 * (i + 1)];
int len = offsets_lens_[2 * i + 1];
auto data_ptr = data_ + offset;
auto offsets_bytes_len = 0;
uint32_t* offsets_ptr = nullptr;
if (IsStringDataType(element_type_)) {
offsets_bytes_len = len * sizeof(uint32_t);
offsets_ptr = reinterpret_cast<uint32_t*>(data_ptr);
}
views_.emplace_back(data_ptr + offsets_bytes_len,
len,
next_offset - offset - offsets_bytes_len,
element_type_,
offsets_ptr);
}
}
SpanBase
ArrayChunk::Span() const {
return SpanBase(views_.data(),
nullable_ ? valid_.data() : nullptr,
views_.size(),
sizeof(ArrayView));
}
} // namespace milvus

View File

@ -29,6 +29,7 @@
#include "simdjson/common_defs.h"
#include "sys/mman.h"
#include "common/Types.h"
namespace milvus {
constexpr uint64_t MMAP_STRING_PADDING = 1;
constexpr uint64_t MMAP_ARRAY_PADDING = 1;
@ -132,8 +133,11 @@ class StringChunk : public Chunk {
StringChunk() = default;
StringChunk(int32_t row_nums, char* data, uint64_t size, bool nullable)
: Chunk(row_nums, data, size, nullable) {
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
auto null_bitmap_bytes_num = 0;
if (nullable) {
null_bitmap_bytes_num = (row_nums + 7) / 8;
}
offsets_ = reinterpret_cast<uint32_t*>(data + null_bitmap_bytes_num);
}
std::string_view
@ -146,7 +150,7 @@ class StringChunk : public Chunk {
}
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringViews();
StringViews(std::optional<std::pair<int64_t, int64_t>> offset_len);
int
binary_search_string(std::string_view target) {
@ -181,13 +185,13 @@ class StringChunk : public Chunk {
return (*this)[idx].data();
}
uint64_t*
uint32_t*
Offsets() {
return offsets_;
}
protected:
uint64_t* offsets_;
uint32_t* offsets_;
};
using JSONChunk = StringChunk;
@ -200,22 +204,72 @@ class ArrayChunk : public Chunk {
milvus::DataType element_type,
bool nullable)
: Chunk(row_nums, data, size, nullable), element_type_(element_type) {
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
auto null_bitmap_bytes_num = 0;
if (nullable) {
null_bitmap_bytes_num = (row_nums + 7) / 8;
}
offsets_lens_ =
reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
ConstructViews();
reinterpret_cast<uint32_t*>(data + null_bitmap_bytes_num);
}
SpanBase
Span() const;
ArrayView
View(int64_t idx) const {
return views_[idx];
View(int idx) const {
int idx_off = 2 * idx;
auto offset = offsets_lens_[idx_off];
auto len = offsets_lens_[idx_off + 1];
auto next_offset = offsets_lens_[idx_off + 2];
auto data_ptr = data_ + offset;
uint32_t offsets_bytes_len = 0;
uint32_t* offsets_ptr = nullptr;
if (IsStringDataType(element_type_)) {
offsets_bytes_len = len * sizeof(uint32_t);
offsets_ptr = reinterpret_cast<uint32_t*>(data_ptr);
}
return ArrayView(data_ptr + offsets_bytes_len,
len,
next_offset - offset - offsets_bytes_len,
element_type_,
offsets_ptr);
}
void
ConstructViews();
/**
 * Build ArrayView objects for a contiguous range of rows of this chunk.
 *
 * @param offset_len optional {start row, row count}. When absent, views for
 *        all row_nums_ rows are produced.
 * @return the views and, for nullable chunks, the matching slice of valid_;
 *         for non-nullable chunks the validity vector is left empty.
 *
 * The request is rejected through AssertInfo when the start row is outside
 * [0, row_nums_), the count is not in (0, row_nums_], or the range runs past
 * the last row.
 */
std::pair<std::vector<ArrayView>, FixedVector<bool>>
Views(std::optional<std::pair<int64_t, int64_t>> offset_len =
          std::nullopt) const {
    // Keep both bounds in 64 bits. The caller passes int64_t values, and
    // narrowing them to int before the checks below could wrap an
    // out-of-range request back into the valid range.
    int64_t start_offset = 0;
    int64_t len = row_nums_;
    if (offset_len.has_value()) {
        start_offset = offset_len->first;
        len = offset_len->second;
        AssertInfo(start_offset >= 0 && start_offset < row_nums_,
                   "Retrieve array views with out-of-bound offset:{}, "
                   "len:{}, wrong",
                   start_offset,
                   len);
        AssertInfo(len > 0 && len <= row_nums_,
                   "Retrieve array views with out-of-bound offset:{}, "
                   "len:{}, wrong",
                   start_offset,
                   len);
        AssertInfo(start_offset + len <= row_nums_,
                   "Retrieve array views with out-of-bound offset:{}, "
                   "len:{}, wrong",
                   start_offset,
                   len);
    }
    std::vector<ArrayView> views;
    views.reserve(len);
    const int64_t end_offset = start_offset + len;
    for (int64_t i = start_offset; i < end_offset; i++) {
        // View() takes an int index; i has been validated against row_nums_.
        views.emplace_back(View(static_cast<int>(i)));
    }
    if (nullable_) {
        FixedVector<bool> res_valid(valid_.begin() + start_offset,
                                    valid_.begin() + end_offset);
        return {std::move(views), std::move(res_valid)};
    }
    return {std::move(views), {}};
}
const char*
ValueAt(int64_t idx) const override {
@ -225,8 +279,7 @@ class ArrayChunk : public Chunk {
private:
milvus::DataType element_type_;
uint64_t* offsets_lens_;
std::vector<ArrayView> views_;
uint32_t* offsets_lens_;
};
class SparseFloatVectorChunk : public Chunk {

View File

@ -56,6 +56,12 @@ class MmapChunkTarget : public ChunkTarget {
clear() {
pos = 0;
}
// Store a 32-bit value at the current position of buf and advance pos past
// it. No bounds or alignment check is performed here.
void
write(uint32_t value) {
    auto* slot = reinterpret_cast<uint32_t*>(buf + pos);
    *slot = value;
    pos += sizeof(value);
}
};
public:

View File

@ -29,22 +29,28 @@ namespace milvus {
void
StringChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
auto size = 0;
std::vector<std::string> strs;
std::vector<std::string_view> strs;
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
auto data = batch.ValueOrDie()->column(0);
auto batch_data = batch.ValueOrDie();
batches.emplace_back(batch_data);
auto data = batch_data->column(0);
auto array = std::dynamic_pointer_cast<arrow::StringArray>(data);
for (int i = 0; i < array->length(); i++) {
auto str = array->GetView(i);
strs.emplace_back(str);
size += str.size();
}
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
if (nullable_) {
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
}
row_nums_ += array->length();
}
size += sizeof(uint64_t) * (row_nums_ + 1) + MMAP_STRING_PADDING;
size += sizeof(uint32_t) * (row_nums_ + 1) + MMAP_STRING_PADDING;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
@ -53,28 +59,20 @@ StringChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
// chunk layout: null bitmap, offset1, offset2, ..., offsetn, str1, str2, ..., strn, padding
// write null bitmaps
for (auto [data, size] : null_bitmaps) {
if (data == nullptr) {
std::vector<uint8_t> null_bitmap(size, 0xff);
target_->write(null_bitmap.data(), size);
} else {
target_->write(data, size);
}
}
write_null_bit_maps(null_bitmaps);
// write data
int offset_num = row_nums_ + 1;
int offset_start_pos = target_->tell() + sizeof(uint64_t) * offset_num;
std::vector<uint64_t> offsets;
uint32_t offset_start_pos = target_->tell() + sizeof(uint32_t) * offset_num;
std::vector<uint32_t> offsets;
offsets.reserve(offset_num);
for (const auto& str : strs) {
offsets.push_back(offset_start_pos);
offset_start_pos += str.size();
}
offsets.push_back(offset_start_pos);
target_->write(offsets.data(), offsets.size() * sizeof(uint64_t));
target_->write(offsets.data(), offsets.size() * sizeof(uint32_t));
for (auto str : strs) {
target_->write(str.data(), str.size());
}
@ -93,7 +91,6 @@ StringChunkWriter::finish() {
void
JSONChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
auto size = 0;
std::vector<Json> jsons;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
@ -105,14 +102,14 @@ JSONChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
size += json.data().size();
jsons.push_back(std::move(json));
}
// AssertInfo(data->length() % 8 == 0,
// "String length should be multiple of 8");
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
if (nullable_) {
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
}
row_nums_ += array->length();
}
size += sizeof(uint64_t) * (row_nums_ + 1) + simdjson::SIMDJSON_PADDING;
size += sizeof(uint32_t) * (row_nums_ + 1) + simdjson::SIMDJSON_PADDING;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
@ -121,26 +118,19 @@ JSONChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
// chunk layout: null bitmaps, offset1, offset2, ... ,json1, json2, ..., jsonn
// write null bitmaps
for (auto [data, size] : null_bitmaps) {
if (data == nullptr) {
std::vector<uint8_t> null_bitmap(size, 0xff);
target_->write(null_bitmap.data(), size);
} else {
target_->write(data, size);
}
}
write_null_bit_maps(null_bitmaps);
int offset_num = row_nums_ + 1;
int offset_start_pos = target_->tell() + sizeof(uint64_t) * offset_num;
std::vector<uint64_t> offsets;
uint32_t offset_start_pos = target_->tell() + sizeof(uint32_t) * offset_num;
std::vector<uint32_t> offsets;
offsets.reserve(offset_num);
for (const auto& json : jsons) {
offsets.push_back(offset_start_pos);
offset_start_pos += json.data().size();
}
offsets.push_back(offset_start_pos);
target_->write(offsets.data(), offsets.size() * sizeof(uint64_t));
target_->write(offsets.data(), offset_num * sizeof(uint32_t));
// write data
for (const auto& json : jsons) {
@ -160,10 +150,10 @@ JSONChunkWriter::finish() {
void
ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
auto size = 0;
auto is_string = IsStringDataType(element_type_);
std::vector<Array> arrays;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
auto data = batch.ValueOrDie()->column(0);
auto array = std::dynamic_pointer_cast<arrow::BinaryArray>(data);
@ -180,13 +170,15 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
}
}
row_nums_ += array->length();
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
if (nullable_) {
auto null_bitmap_n = (data->length() + 7) / 8;
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
size += null_bitmap_n;
}
}
// offsets + lens
size += sizeof(uint64_t) * (row_nums_ * 2 + 1) + MMAP_ARRAY_PADDING;
size += sizeof(uint32_t) * (row_nums_ * 2 + 1) + MMAP_ARRAY_PADDING;
if (file_) {
target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
} else {
@ -194,21 +186,14 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
}
// chunk layout: nullbitmaps, offsets, elem_off1, elem_off2, .. data1, data2, ..., datan, padding
for (auto [data, size] : null_bitmaps) {
if (data == nullptr) {
std::vector<uint8_t> null_bitmap(size, 0xff);
target_->write(null_bitmap.data(), size);
} else {
target_->write(data, size);
}
}
write_null_bit_maps(null_bitmaps);
int offsets_num = row_nums_ + 1;
int len_num = row_nums_;
uint64_t offset_start_pos =
target_->tell() + sizeof(uint64_t) * (offsets_num + len_num);
std::vector<uint64_t> offsets(offsets_num);
std::vector<uint64_t> lens(len_num);
uint32_t offset_start_pos =
target_->tell() + sizeof(uint32_t) * (offsets_num + len_num);
std::vector<uint32_t> offsets(offsets_num);
std::vector<uint32_t> lens(len_num);
for (auto i = 0; i < arrays.size(); i++) {
auto& arr = arrays[i];
offsets[i] = offset_start_pos;
@ -222,11 +207,11 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
for (int i = 0; i < offsets.size(); i++) {
if (i == offsets.size() - 1) {
target_->write(&offsets[i], sizeof(uint64_t));
target_->write(&offsets[i], sizeof(uint32_t));
break;
}
target_->write(&offsets[i], sizeof(uint64_t));
target_->write(&lens[i], sizeof(uint64_t));
target_->write(&offsets[i], sizeof(uint32_t));
target_->write(&lens[i], sizeof(uint32_t));
}
for (auto& arr : arrays) {

View File

@ -43,6 +43,22 @@ class ChunkWriterBase {
return target_->get();
}
// Write the collected null bitmaps to target_, in order. Does nothing when
// the writer is not nullable. Each entry is {bitmap bytes, byte count}; an
// entry whose pointer is null is written as byte-count bytes of 0xff.
void
write_null_bit_maps(
    const std::vector<std::pair<const uint8_t*, int64_t>>& null_bitmaps) {
    if (!nullable_) {
        return;
    }
    for (const auto& bitmap : null_bitmaps) {
        const uint8_t* bits = bitmap.first;
        const int64_t byte_count = bitmap.second;
        if (bits == nullptr) {
            // No bitmap buffer was supplied for this batch (the original note
            // attributes this to an Arrow optimization), so an always-true
            // bitmap has to be appended in its place.
            std::vector<uint8_t> all_valid(byte_count, 0xff);
            target_->write(all_valid.data(), byte_count);
            continue;
        }
        target_->write(bits, byte_count);
    }
}
protected:
int row_nums_ = 0;
File* file_ = nullptr;

View File

@ -67,7 +67,7 @@ SetDefaultConfigParamTypeCheck(bool val);
struct BufferView {
struct Element {
const char* data_;
uint64_t* offsets_;
uint32_t* offsets_;
int start_;
int end_;
};

View File

@ -705,7 +705,8 @@ class SegmentExpr : public Expr {
if (!skip_func || !skip_func(skip_index, field_id_, i)) {
bool is_seal = false;
if constexpr (std::is_same_v<T, std::string_view> ||
std::is_same_v<T, Json>) {
std::is_same_v<T, Json> ||
std::is_same_v<T, ArrayView>) {
if (segment_->type() == SegmentType::Sealed) {
// first is the raw data, second is valid_data
// use valid_data to see if raw data is null
@ -1034,7 +1035,8 @@ class SegmentExpr : public Expr {
bool access_sealed_variable_column = false;
if constexpr (std::is_same_v<T, std::string_view> ||
std::is_same_v<T, Json>) {
std::is_same_v<T, Json> ||
std::is_same_v<T, ArrayView>) {
if (segment_->type() == SegmentType::Sealed) {
auto [data_vec, valid_data] = segment_->get_batch_views<T>(
field_id_, i, data_pos, size);

View File

@ -154,11 +154,19 @@ class ChunkedColumnBase : public ColumnBase {
}
virtual std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringViews(int64_t chunk_id) const {
StringViews(int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const {
PanicInfo(ErrorCode::Unsupported,
"StringViews only supported for VariableColumn");
}
// Base-class fallback for fetching the array views of one chunk.
// chunk_id and offset_len are not used here: the call always raises
// ErrorCode::Unsupported through PanicInfo. Column types that hold array
// data are expected to override this.
virtual std::pair<std::vector<ArrayView>, FixedVector<bool>>
ArrayViews(int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const {
PanicInfo(ErrorCode::Unsupported,
"ArrayViews only supported for ArrayChunkedColumn");
}
virtual std::pair<std::vector<std::string_view>, FixedVector<bool>>
ViewsByOffsets(int64_t chunk_id,
const FixedVector<int32_t>& offsets) const {
@ -334,9 +342,11 @@ class ChunkedVariableColumn : public ChunkedColumnBase {
}
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringViews(int64_t chunk_id) const override {
return std::static_pointer_cast<StringChunk>(chunks_[chunk_id])
->StringViews();
StringViews(int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const override {
return std::dynamic_pointer_cast<StringChunk>(chunks_[chunk_id])
->StringViews(offset_len);
}
std::shared_ptr<Chunk>
@ -408,7 +418,8 @@ class ChunkedArrayColumn : public ChunkedColumnBase {
SpanBase
Span(int64_t chunk_id) const override {
return std::dynamic_pointer_cast<ArrayChunk>(chunks_[chunk_id])->Span();
PanicInfo(ErrorCode::NotImplemented,
"span() interface is not implemented for arr chunk column");
}
ArrayView
@ -425,5 +436,13 @@ class ChunkedArrayColumn : public ChunkedColumnBase {
->View(offset_in_chunk)
.output_data();
}
std::pair<std::vector<ArrayView>, FixedVector<bool>>
ArrayViews(int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const override {
return std::dynamic_pointer_cast<ArrayChunk>(chunks_[chunk_id])
->Views(offset_len);
}
};
} // namespace milvus

View File

@ -323,6 +323,12 @@ class SingleChunkColumnBase : public ColumnBase {
"StringViews only supported for VariableColumn");
}
// Base-class fallback for fetching array views of the column: always raises
// ErrorCode::Unsupported through PanicInfo. Column types that hold array
// data are expected to override this.
virtual std::pair<std::vector<ArrayView>, FixedVector<bool>>
ArrayViews() const {
PanicInfo(ErrorCode::Unsupported,
"ArrayView only supported for ArrayColumn");
}
virtual std::pair<std::vector<std::string_view>, FixedVector<bool>>
ViewsByOffsets(const FixedVector<int32_t>& offsets) const {
PanicInfo(ErrorCode::Unsupported,
@ -960,6 +966,11 @@ class SingleChunkArrayColumn : public SingleChunkColumnBase {
ConstructViews();
}
// Return the result of Views() paired with valid_data_ for the whole column.
// No row range can be requested through this overload.
std::pair<std::vector<ArrayView>, FixedVector<bool>>
ArrayViews() const override {
return {Views(), valid_data_};
}
protected:
void
ConstructViews() {
@ -978,6 +989,7 @@ class SingleChunkArrayColumn : public SingleChunkColumnBase {
element_type_,
element_indices_[last].data());
lens_.clear();
indices_.clear();
}
private:

View File

@ -373,28 +373,9 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
std::make_shared<ChunkedArrayColumn>(field_meta);
std::shared_ptr<milvus::ArrowDataWrapper> r;
while (data.arrow_reader_channel->pop(r)) {
// for (auto i = 0; i < field_data->get_num_rows(); i++) {
// auto rawValue = field_data->RawValue(i);
// auto array =
// static_cast<const milvus::Array*>(rawValue);
// if (field_data->IsNullable()) {
// var_column->Append(*array,
// field_data->is_valid(i));
// } else {
// var_column->Append(*array);
// }
// // we stores the offset for each array element, so there is a additional uint64_t for each array element
// field_data_size =
// array->byte_size() + sizeof(uint64_t);
// stats_.mem_size +=
// array->byte_size() + sizeof(uint64_t);
// }
auto chunk = create_chunk(field_meta, 1, r->reader);
var_column->AddChunk(chunk);
}
// var_column->Seal();
column = std::move(var_column);
break;
}
@ -759,24 +740,42 @@ ChunkedSegmentSealedImpl::chunk_data_impl(FieldId field_id,
return field_data->Span(chunk_id);
}
auto field_data = insert_record_.get_data_base(field_id);
AssertInfo(field_data->num_chunk() == 1,
"num chunk not equal to 1 for sealed segment");
// system field
return field_data->get_span_base(0);
}
std::pair<std::vector<std::string_view>, FixedVector<bool>>
ChunkedSegmentSealedImpl::chunk_view_impl(FieldId field_id,
int64_t chunk_id) const {
std::pair<std::vector<ArrayView>, FixedVector<bool>>
ChunkedSegmentSealedImpl::chunk_array_view_impl(
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const {
std::shared_lock lck(mutex_);
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
"Can't get bitset element at " + std::to_string(field_id.get()));
if (auto it = fields_.find(field_id); it != fields_.end()) {
auto& field_data = it->second;
return field_data->StringViews(chunk_id);
return field_data->ArrayViews(chunk_id, offset_len);
}
PanicInfo(ErrorCode::UnexpectedError,
"chunk_view_impl only used for variable column field ");
"chunk_array_view_impl only used for chunk column field ");
}
// Return string views (and validity) for chunk chunk_id of field field_id,
// optionally limited to the {start row, row count} range in offset_len.
// Takes a shared lock on mutex_, asserts that the field's data is marked
// ready, and raises UnexpectedError when the field is not present in fields_.
std::pair<std::vector<std::string_view>, FixedVector<bool>>
ChunkedSegmentSealedImpl::chunk_string_view_impl(
    FieldId field_id,
    int64_t chunk_id,
    std::optional<std::pair<int64_t, int64_t>> offset_len =
        std::nullopt) const {
    std::shared_lock lck(mutex_);
    AssertInfo(get_bit(field_data_ready_bitset_, field_id),
               "Can't get bitset element at " + std::to_string(field_id.get()));
    const auto column_it = fields_.find(field_id);
    if (column_it == fields_.end()) {
        PanicInfo(ErrorCode::UnexpectedError,
                  "chunk_string_view_impl only used for variable column field ");
    }
    return column_it->second->StringViews(chunk_id, offset_len);
}
std::pair<std::vector<std::string_view>, FixedVector<bool>>

View File

@ -234,7 +234,16 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;
std::pair<std::vector<std::string_view>, FixedVector<bool>>
chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;
chunk_string_view_impl(
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const override;
std::pair<std::vector<ArrayView>, FixedVector<bool>>
chunk_array_view_impl(
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const override;
std::pair<std::vector<std::string_view>, FixedVector<bool>>
chunk_view_by_offsets(FieldId field_id,

View File

@ -399,9 +399,23 @@ SegmentGrowingImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
}
std::pair<std::vector<std::string_view>, FixedVector<bool>>
SegmentGrowingImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
SegmentGrowingImpl::chunk_string_view_impl(
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const {
PanicInfo(ErrorCode::NotImplemented,
"chunk view impl not implement for growing segment");
"chunk string view impl not implement for growing segment");
}
// Array-view access is not provided by this growing-segment implementation:
// the arguments are ignored and the call always raises
// ErrorCode::NotImplemented through PanicInfo.
std::pair<std::vector<ArrayView>, FixedVector<bool>>
SegmentGrowingImpl::chunk_array_view_impl(
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const {
PanicInfo(ErrorCode::NotImplemented,
"chunk array view impl not implement for growing segment");
}
std::pair<std::vector<std::string_view>, FixedVector<bool>>

View File

@ -364,7 +364,16 @@ class SegmentGrowingImpl : public SegmentGrowing {
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;
std::pair<std::vector<std::string_view>, FixedVector<bool>>
chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;
chunk_string_view_impl(
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const override;
std::pair<std::vector<ArrayView>, FixedVector<bool>>
chunk_array_view_impl(
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const override;
std::pair<std::vector<std::string_view>, FixedVector<bool>>
chunk_view_by_offsets(FieldId field_id,

View File

@ -164,18 +164,28 @@ class SegmentInternalInterface : public SegmentInterface {
template <typename ViewType>
std::pair<std::vector<ViewType>, FixedVector<bool>>
chunk_view(FieldId field_id, int64_t chunk_id) const {
auto [string_views, valid_data] = chunk_view_impl(field_id, chunk_id);
chunk_view(FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const {
if constexpr (std::is_same_v<ViewType, std::string_view>) {
auto [string_views, valid_data] =
chunk_string_view_impl(field_id, chunk_id, offset_len);
return std::make_pair(std::move(string_views),
std::move(valid_data));
} else {
std::vector<ViewType> res;
} else if constexpr (std::is_same_v<ViewType, ArrayView>) {
auto [array_views, valid_data] =
chunk_array_view_impl(field_id, chunk_id, offset_len);
return std::make_pair(array_views, valid_data);
} else if constexpr (std::is_same_v<ViewType, Json>) {
auto [string_views, valid_data] =
chunk_string_view_impl(field_id, chunk_id, offset_len);
std::vector<Json> res;
res.reserve(string_views.size());
for (const auto& view : string_views) {
res.emplace_back(view);
for (const auto& str_view : string_views) {
res.emplace_back(str_view);
}
return std::make_pair(res, valid_data);
return {std::move(res), std::move(valid_data)};
}
}
@ -189,31 +199,8 @@ class SegmentInternalInterface : public SegmentInterface {
PanicInfo(ErrorCode::Unsupported,
"get chunk views not supported for growing segment");
}
auto chunk_info =
get_chunk_buffer(field_id, chunk_id, start_offset, length);
BufferView buffer = chunk_info.first;
std::vector<ViewType> res;
res.reserve(length);
if (buffer.data_.index() == 1) {
char* pos = std::get<1>(buffer.data_).first;
for (size_t j = 0; j < length; j++) {
uint32_t size;
size = *reinterpret_cast<uint32_t*>(pos);
pos += sizeof(uint32_t);
res.emplace_back(ViewType(pos, size));
pos += size;
}
} else {
auto elements = std::get<0>(buffer.data_);
for (auto& element : elements) {
for (int i = element.start_; i < element.end_; i++) {
res.emplace_back(ViewType(
element.data_ + element.offsets_[i],
element.offsets_[i + 1] - element.offsets_[i]));
}
}
}
return std::make_pair(std::move(res), std::move(chunk_info.second));
return chunk_view<ViewType>(
field_id, chunk_id, std::make_pair(start_offset, length));
}
template <typename ViewType>
@ -470,7 +457,16 @@ class SegmentInternalInterface : public SegmentInterface {
// internal API: return chunk string views in vector
virtual std::pair<std::vector<std::string_view>, FixedVector<bool>>
chunk_view_impl(FieldId field_id, int64_t chunk_id) const = 0;
chunk_string_view_impl(FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>>
offset_len = std::nullopt) const = 0;
virtual std::pair<std::vector<ArrayView>, FixedVector<bool>>
chunk_array_view_impl(FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>>
offset_len = std::nullopt) const = 0;
// internal API: return buffer reference to field chunk data located from start_offset
virtual std::pair<BufferView, FixedVector<bool>>

View File

@ -750,7 +750,11 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
}
std::pair<std::vector<std::string_view>, FixedVector<bool>>
SegmentSealedImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
SegmentSealedImpl::chunk_string_view_impl(
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const {
std::shared_lock lck(mutex_);
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
"Can't get bitset element at " + std::to_string(field_id.get()));
@ -759,7 +763,24 @@ SegmentSealedImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
return field_data->StringViews();
}
PanicInfo(ErrorCode::UnexpectedError,
"chunk_view_impl only used for variable column field ");
"chunk_string_view_impl only used for variable column field ");
}
// Return array views (and validity) for field field_id, optionally limited
// to the {start row, row count} range in offset_len.
//
// chunk_id is not used in this implementation. The column's ArrayViews()
// takes no range and yields every row, so the requested range is cut out
// here; previously offset_len was ignored and a range request received the
// whole column. The validity vector is sliced only when it is non-empty.
//
// Takes a shared lock on mutex_, asserts that the field's data is marked
// ready, asserts that the requested range lies inside the column, and raises
// UnexpectedError when the field is not present in fields_.
std::pair<std::vector<ArrayView>, FixedVector<bool>>
SegmentSealedImpl::chunk_array_view_impl(
    FieldId field_id,
    int64_t chunk_id,
    std::optional<std::pair<int64_t, int64_t>> offset_len =
        std::nullopt) const {
    std::shared_lock lck(mutex_);
    AssertInfo(get_bit(field_data_ready_bitset_, field_id),
               "Can't get bitset element at " + std::to_string(field_id.get()));
    if (auto it = fields_.find(field_id); it != fields_.end()) {
        auto& field_data = it->second;
        auto [views, valid_data] = field_data->ArrayViews();
        if (!offset_len.has_value()) {
            return {std::move(views), std::move(valid_data)};
        }
        const int64_t start_offset = offset_len->first;
        const int64_t len = offset_len->second;
        const auto row_count = static_cast<int64_t>(views.size());
        // Written as a subtraction so the check itself cannot overflow.
        AssertInfo(start_offset >= 0 && start_offset < row_count && len > 0 &&
                       len <= row_count - start_offset,
                   "Retrieve array views with out-of-bound offset:{}, "
                   "len:{}, wrong",
                   start_offset,
                   len);
        const int64_t end_offset = start_offset + len;
        std::vector<ArrayView> sliced_views(views.begin() + start_offset,
                                            views.begin() + end_offset);
        FixedVector<bool> sliced_valid;
        if (!valid_data.empty()) {
            sliced_valid = FixedVector<bool>(valid_data.begin() + start_offset,
                                             valid_data.begin() + end_offset);
        }
        return {std::move(sliced_views), std::move(sliced_valid)};
    }
    PanicInfo(ErrorCode::UnexpectedError,
              "chunk_array_view_impl only used for array column field ");
}
std::pair<std::vector<std::string_view>, FixedVector<bool>>

View File

@ -234,7 +234,16 @@ class SegmentSealedImpl : public SegmentSealed {
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;
std::pair<std::vector<std::string_view>, FixedVector<bool>>
chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;
chunk_string_view_impl(
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const override;
std::pair<std::vector<ArrayView>, FixedVector<bool>>
chunk_array_view_impl(
FieldId field_id,
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len) const override;
std::pair<std::vector<std::string_view>, FixedVector<bool>>
chunk_view_by_offsets(FieldId field_id,

View File

@ -31,6 +31,7 @@
#include "storage/Util.h"
#include "test_utils/Constants.h"
#include "test_utils/DataGen.h"
using namespace milvus;
TEST(chunk, test_int64_field) {
@ -99,12 +100,133 @@ TEST(chunk, test_variable_field) {
FieldMeta field_meta(
FieldName("a"), milvus::FieldId(1), DataType::STRING, false);
auto chunk = create_chunk(field_meta, 1, rb_reader);
auto views = std::dynamic_pointer_cast<StringChunk>(chunk)->StringViews();
auto views = std::dynamic_pointer_cast<StringChunk>(chunk)->StringViews(
std::nullopt);
for (size_t i = 0; i < data.size(); ++i) {
EXPECT_EQ(views.first[i], data[i]);
}
}
// Checks StringViews() on a JSON chunk built from serialized field data, for
// both a non-nullable and a nullable field: the full range, a sub-range, and
// (nullable case) three out-of-bound requests that must throw.
//
// Every row carries distinct JSON text and the sub-range checks compare
// against data[start + i], so a slice that begins at the wrong row fails.
TEST(chunk, test_json_field) {
    auto row_num = 100;
    // Build all strings first and keep them alive for the whole test.
    // Presumably Json(ptr, len) may only reference the bytes rather than copy
    // them, so the backing storage must not move or be destroyed.
    FixedVector<std::string> json_strs;
    json_strs.reserve(row_num);
    for (auto i = 0; i < row_num; i++) {
        json_strs.push_back("{\"key\": \"value_" + std::to_string(i) + "\"}");
    }
    FixedVector<Json> data;
    data.reserve(row_num);
    for (auto i = 0; i < row_num; i++) {
        auto json = Json(json_strs[i].data(), json_strs[i].size());
        data.push_back(std::move(json));
    }
    auto field_data = milvus::storage::CreateFieldData(storage::DataType::JSON);
    field_data->FillFieldData(data.data(), data.size());

    storage::InsertEventData event_data;
    auto payload_reader =
        std::make_shared<milvus::storage::PayloadReader>(field_data);
    event_data.payload_reader = payload_reader;
    auto ser_data = event_data.Serialize();

    // Each call re-opens the serialized payload, skipping the two leading
    // timestamps, and returns a fresh record batch reader.
    auto get_record_batch_reader =
        [&]() -> std::shared_ptr<::arrow::RecordBatchReader> {
        auto buffer = std::make_shared<arrow::io::BufferReader>(
            ser_data.data() + 2 * sizeof(milvus::Timestamp),
            ser_data.size() - 2 * sizeof(milvus::Timestamp));
        parquet::arrow::FileReaderBuilder reader_builder;
        auto s = reader_builder.Open(buffer);
        EXPECT_TRUE(s.ok());
        std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
        s = reader_builder.Build(&arrow_reader);
        EXPECT_TRUE(s.ok());
        std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
        s = arrow_reader->GetRecordBatchReader(&rb_reader);
        EXPECT_TRUE(s.ok());
        return rb_reader;
    };

    {
        auto rb_reader = get_record_batch_reader();
        // nullable=false
        FieldMeta field_meta(
            FieldName("a"), milvus::FieldId(1), DataType::JSON, false);
        auto chunk = create_chunk(field_meta, 1, rb_reader);
        {
            auto [views, valid] =
                std::dynamic_pointer_cast<JSONChunk>(chunk)->StringViews(
                    std::nullopt);
            EXPECT_EQ(row_num, views.size());
            for (size_t i = 0; i < row_num; ++i) {
                EXPECT_EQ(views[i], data[i].data());
                // nullable is false, so the validity vector is not checked
            }
        }
        {
            auto start = 10;
            auto len = 20;
            auto [views, valid] =
                std::dynamic_pointer_cast<JSONChunk>(chunk)->StringViews(
                    std::make_pair(start, len));
            EXPECT_EQ(len, views.size());
            for (size_t i = 0; i < len; ++i) {
                // views[i] corresponds to source row start + i
                EXPECT_EQ(views[i], data[start + i].data());
            }
        }
    }
    {
        auto rb_reader = get_record_batch_reader();
        // nullable=true
        FieldMeta field_meta(
            FieldName("a"), milvus::FieldId(1), DataType::JSON, true);
        auto chunk = create_chunk(field_meta, 1, rb_reader);
        {
            auto [views, valid] =
                std::dynamic_pointer_cast<JSONChunk>(chunk)->StringViews(
                    std::nullopt);
            EXPECT_EQ(row_num, views.size());
            for (size_t i = 0; i < row_num; ++i) {
                EXPECT_EQ(views[i], data[i].data());
                EXPECT_TRUE(valid[i]);  // no input valid map, all padded as true
            }
        }
        {
            auto start = 10;
            auto len = 20;
            auto [views, valid] =
                std::dynamic_pointer_cast<JSONChunk>(chunk)->StringViews(
                    std::make_pair(start, len));
            EXPECT_EQ(len, views.size());
            for (size_t i = 0; i < len; ++i) {
                // views[i] corresponds to source row start + i
                EXPECT_EQ(views[i], data[start + i].data());
                EXPECT_TRUE(valid[i]);  // no input valid map, all padded as true
            }
        }
        {
            // negative start offset
            auto start = -1;
            auto len = 5;
            EXPECT_THROW(
                std::dynamic_pointer_cast<JSONChunk>(chunk)->StringViews(
                    std::make_pair(start, len)),
                milvus::SegcoreError);
        }
        {
            // length larger than the number of rows
            auto start = 0;
            auto len = row_num + 1;
            EXPECT_THROW(
                std::dynamic_pointer_cast<JSONChunk>(chunk)->StringViews(
                    std::make_pair(start, len)),
                milvus::SegcoreError);
        }
        {
            // start + len runs past the last row
            auto start = 95;
            auto len = 11;
            EXPECT_THROW(
                std::dynamic_pointer_cast<JSONChunk>(chunk)->StringViews(
                    std::make_pair(start, len)),
                milvus::SegcoreError);
        }
    }
}
TEST(chunk, test_null_field) {
FixedVector<int64_t> data = {1, 2, 3, 4, 5};
auto field_data =
@ -185,15 +307,112 @@ TEST(chunk, test_array) {
DataType::STRING,
false);
auto chunk = create_chunk(field_meta, 1, rb_reader);
auto span = std::dynamic_pointer_cast<ArrayChunk>(chunk)->Span();
EXPECT_EQ(span.row_count(), 1);
auto arr = *(ArrayView*)span.data();
auto [views, valid] =
std::dynamic_pointer_cast<ArrayChunk>(chunk)->Views(std::nullopt);
EXPECT_EQ(views.size(), 1);
auto& arr = views[0];
for (size_t i = 0; i < arr.length(); ++i) {
auto str = arr.get_data<std::string>(i);
EXPECT_EQ(str, field_string_data.string_data().data(i));
}
}
// Verifies ArrayChunk::Views() for the whole chunk, for a sub-range, and for
// out-of-bound (offset, length) requests.
TEST(chunk, test_array_views) {
    // Every row gets distinct contents ("a<i>", "b<i>", ...). With identical
    // rows a sub-range request that started at the wrong row would still
    // compare equal, so the offset handling would go untested.
    const int64_t array_count = 10;
    const std::vector<std::string> prefixes = {"a", "b", "c", "d", "e"};
    std::vector<milvus::proto::schema::ScalarField> expected_rows(array_count);
    FixedVector<Array> data;
    data.reserve(array_count);
    for (int64_t i = 0; i < array_count; i++) {
        for (const auto& prefix : prefixes) {
            expected_rows[i].mutable_string_data()->add_data(
                prefix + std::to_string(i));
        }
        auto row_array = Array(expected_rows[i]);
        data.emplace_back(row_array);
    }

    // Serialize the arrays as an insert event and read them back through a
    // parquet record-batch reader, which is the input create_chunk() expects.
    auto field_data =
        milvus::storage::CreateFieldData(storage::DataType::ARRAY);
    field_data->FillFieldData(data.data(), data.size());
    storage::InsertEventData event_data;
    auto payload_reader =
        std::make_shared<milvus::storage::PayloadReader>(field_data);
    event_data.payload_reader = payload_reader;
    auto ser_data = event_data.Serialize();
    // The serialized event starts with two timestamps; skip them to reach
    // the parquet payload.
    auto buffer = std::make_shared<arrow::io::BufferReader>(
        ser_data.data() + 2 * sizeof(milvus::Timestamp),
        ser_data.size() - 2 * sizeof(milvus::Timestamp));

    // Setup failures are fatal: the readers are dereferenced right after, so
    // continuing past a failed step would crash instead of reporting cleanly.
    parquet::arrow::FileReaderBuilder reader_builder;
    auto s = reader_builder.Open(buffer);
    ASSERT_TRUE(s.ok());
    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    s = reader_builder.Build(&arrow_reader);
    ASSERT_TRUE(s.ok());
    std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
    s = arrow_reader->GetRecordBatchReader(&rb_reader);
    ASSERT_TRUE(s.ok());

    FieldMeta field_meta(FieldName("field1"),
                         milvus::FieldId(1),
                         DataType::ARRAY,
                         DataType::STRING,
                         true);
    auto chunk = create_chunk(field_meta, 1, rb_reader);
    // Cast once and fail fast if the chunk is not an ArrayChunk.
    auto array_chunk = std::dynamic_pointer_cast<ArrayChunk>(chunk);
    ASSERT_TRUE(array_chunk != nullptr);

    // Whole chunk: view i must match row i.
    {
        auto [views, valid] = array_chunk->Views(std::nullopt);
        ASSERT_EQ(views.size(), static_cast<size_t>(array_count));
        for (int64_t i = 0; i < array_count; i++) {
            auto& arr = views[i];
            const auto& expected = expected_rows[i].string_data();
            // Guard the element loop so a length mismatch is reported
            // instead of indexing past the expected data.
            ASSERT_EQ(static_cast<int64_t>(arr.length()),
                      static_cast<int64_t>(expected.data_size()));
            for (int j = 0; j < expected.data_size(); ++j) {
                auto str = arr.get_data<std::string>(j);
                EXPECT_EQ(str, expected.data(j));
            }
        }
    }
    // Sub-range: view i must match row (start + i).
    {
        const int64_t start = 2;
        const int64_t len = 5;
        auto [views, valid] = array_chunk->Views(std::make_pair(start, len));
        ASSERT_EQ(views.size(), static_cast<size_t>(len));
        for (int64_t i = 0; i < len; i++) {
            auto& arr = views[i];
            const auto& expected = expected_rows[start + i].string_data();
            ASSERT_EQ(static_cast<int64_t>(arr.length()),
                      static_cast<int64_t>(expected.data_size()));
            for (int j = 0; j < expected.data_size(); ++j) {
                auto str = arr.get_data<std::string>(j);
                EXPECT_EQ(str, expected.data(j));
            }
        }
    }
    // Negative start offset is rejected.
    {
        const int64_t start = -1;
        const int64_t len = 5;
        EXPECT_THROW(array_chunk->Views(std::make_pair(start, len)),
                     milvus::SegcoreError);
    }
    // Length larger than the number of rows is rejected.
    {
        const int64_t start = 0;
        const int64_t len = array_count + 1;
        EXPECT_THROW(array_chunk->Views(std::make_pair(start, len)),
                     milvus::SegcoreError);
    }
    // start + len running past the last row is rejected.
    {
        const int64_t start = 5;
        const int64_t len = 7;
        EXPECT_THROW(array_chunk->Views(std::make_pair(start, len)),
                     milvus::SegcoreError);
    }
}
TEST(chunk, test_sparse_float) {
auto n_rows = 100;
auto vecs = milvus::segcore::GenerateRandomSparseFloatVector(

View File

@ -215,6 +215,13 @@ class TestChunkSegment : public testing::Test {
field_infos.push_back(field_info);
}
std::vector<std::string> str_data;
for (int i = 0; i < test_data_count * chunk_num; i++) {
str_data.push_back("test" + std::to_string(i));
}
std::sort(str_data.begin(), str_data.end());
std::vector<bool> validity(test_data_count, true);
// generate data
for (int chunk_id = 0; chunk_id < chunk_num;
chunk_id++, start_id += test_data_count) {
@ -222,8 +229,8 @@ class TestChunkSegment : public testing::Test {
std::iota(test_data.begin(), test_data.end(), start_id);
auto builder = std::make_shared<arrow::Int64Builder>();
auto status =
builder->AppendValues(test_data.begin(), test_data.end());
auto status = builder->AppendValues(
test_data.begin(), test_data.end(), validity.begin());
ASSERT_TRUE(status.ok());
auto res = builder->Finish();
ASSERT_TRUE(res.ok());