enhance: refine chunk access logic and add some comment on data (#40618)

#40367

Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
zhagnlu 2025-03-16 22:20:08 +08:00 committed by GitHub
parent 123b6588b6
commit 7ebe3d7038
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 251 additions and 76 deletions

View File

@ -88,7 +88,6 @@ class Chunk {
FixedVector<bool>
valid_; // parse null bitmap to valid_ to be compatible with SpanBase
};
// for fixed size data, includes fixed size array
class FixedWidthChunk : public Chunk {
public:
@ -100,12 +99,14 @@ class FixedWidthChunk : public Chunk {
bool nullable)
: Chunk(row_nums, data, size, nullable),
dim_(dim),
element_size_(element_size){};
element_size_(element_size) {
auto null_bitmap_bytes_num = nullable_ ? (row_nums_ + 7) / 8 : 0;
data_start_ = data_ + null_bitmap_bytes_num;
};
milvus::SpanBase
Span() const {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return milvus::SpanBase(data_ + null_bitmap_bytes_num,
return milvus::SpanBase(data_start_,
nullable_ ? valid_.data() : nullptr,
row_nums_,
element_size_ * dim_);
@ -113,30 +114,47 @@ class FixedWidthChunk : public Chunk {
const char*
ValueAt(int64_t idx) const override {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return data_ + null_bitmap_bytes_num + idx * element_size_ * dim_;
return data_start_ + idx * element_size_ * dim_;
}
const char*
Data() const override {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return data_ + null_bitmap_bytes_num;
return data_start_;
}
private:
int dim_;
int element_size_;
const char* data_start_;
};
// A StringChunk is a class that represents a collection of strings stored in a contiguous memory block.
// It is initialized with the number of rows, a pointer to the data, the size of the data, and a boolean
// indicating whether the data can contain null values. The data is accessed using offsets, which are
// stored after an optional null bitmap. Each string is represented by a range in the data block, defined
// by these offsets.
//
// Example of a valid StringChunk:
//
// Suppose we have a data block containing the strings "apple", "banana", and "cherry", and we want to
// create a StringChunk for these strings. The data block might look like this:
//
// [null_bitmap][offsets][string_data]
// [11111111]   [17, 22, 28, 34] ["apple", "banana", "cherry"]
//
// Here the null bitmap occupies one byte and a set bit marks a valid (non-null) row, so all
// three rows are valid. The offsets array gives the start position of each string within the
// chunk plus the end of the last one, and string_data contains the actual characters. When the
// chunk is constructed with nullable=false, as in the example below, the bitmap byte is
// omitted entirely and the offsets shift down accordingly.
//
// StringChunk exampleChunk(3, dataPointer, dataSize, false);
//
// In this example, 'exampleChunk' is a StringChunk with 3 rows, a pointer to the data stored in 'dataPointer',
// a total data size of 'dataSize', and it does not support nullability.
class StringChunk : public Chunk {
public:
StringChunk() = default;
StringChunk(int32_t row_nums, char* data, uint64_t size, bool nullable)
: Chunk(row_nums, data, size, nullable) {
auto null_bitmap_bytes_num = 0;
if (nullable) {
null_bitmap_bytes_num = (row_nums + 7) / 8;
}
auto null_bitmap_bytes_num = nullable_ ? (row_nums_ + 7) / 8 : 0;
offsets_ = reinterpret_cast<uint32_t*>(data + null_bitmap_bytes_num);
}
@ -196,6 +214,32 @@ class StringChunk : public Chunk {
using JSONChunk = StringChunk;
// An ArrayChunk is a class that represents a collection of arrays stored in a contiguous memory block.
// It is initialized with the number of rows, a pointer to the data, the size of the data, the element type,
// and a boolean indicating whether the data can contain null values. The data is accessed using offsets and lengths,
// which are stored after an optional null bitmap. Each array is represented by a range in the data block.
//
// Example of a valid ArrayChunk:
//
// Suppose we have a data block containing arrays of integers [1, 2, 3], [4, 5], and [6, 7, 8, 9], and we want to
// create an ArrayChunk for these arrays. The data block might look like this:
//
// [null_bitmap][offsets_lens][array_data]
// [11111111]   [24, 3, 36, 2, 44, 4, 60] [1, 2, 3, 4, 5, 6, 7, 8, 9]
//
// For string arrays, the structure is more complex as each string element needs its own offset:
// [null_bitmap][offsets_lens][array1_offsets][array1_data][array2_offsets][array2_data][array3_offsets][array3_data]
// [11111111]   [24, 3, 48, 2, 64, 4, 96] [0, 5, 10, 11] ["hello", "world", "!"] [0, 3, 6] ["foo", "bar"] [0, 5, 11, 17, 22] ["apple", "orange", "banana", "grape"]
//
// Here a set bit in the null bitmap marks a valid (non-null) row, so all three rows are valid;
// when the chunk is constructed with nullable=false, as in the example below, the bitmap is
// omitted entirely. The offsets_lens section stores an (offset, length) pair per array, and
// array_data contains the actual elements. Each array's per-string offsets are relative to
// that array's own data and include a final end offset.
//
// ArrayChunk exampleChunk(3, dataPointer, dataSize, DataType::INT32, false);
//
// In this example, 'exampleChunk' is an ArrayChunk with 3 rows, a pointer to the data stored in 'dataPointer',
// a total data size of 'dataSize', element type INT32, and it does not support nullability.
class ArrayChunk : public Chunk {
public:
ArrayChunk(int32_t row_nums,

View File

@ -22,7 +22,6 @@
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
namespace milvus {
class ChunkWriterBase {
public:
explicit ChunkWriterBase(bool nullable) : nullable_(nullable) {
@ -86,9 +85,12 @@ class ChunkWriter final : public ChunkWriterBase {
for (auto& batch : batch_vec) {
row_nums += batch->num_rows();
auto data = batch->column(0);
auto array = std::dynamic_pointer_cast<ArrowType>(data);
auto null_bitmap_n = (data->length() + 7) / 8;
size += null_bitmap_n + array->length() * dim_ * sizeof(T);
auto array = std::static_pointer_cast<ArrowType>(data);
if (nullable_) {
auto null_bitmap_n = (data->length() + 7) / 8;
size += null_bitmap_n;
}
size += array->length() * dim_ * sizeof(T);
}
row_nums_ = row_nums;
@ -97,23 +99,27 @@ class ChunkWriter final : public ChunkWriterBase {
} else {
target_ = std::make_shared<MemChunkTarget>(size);
}
// chunk layout: nullbitmap, data1, data2, ..., datan
for (auto& batch : batch_vec) {
auto data = batch->column(0);
auto null_bitmap = data->null_bitmap_data();
auto null_bitmap_n = (data->length() + 7) / 8;
if (null_bitmap) {
target_->write(null_bitmap, null_bitmap_n);
} else {
std::vector<uint8_t> null_bitmap(null_bitmap_n, 0xff);
target_->write(null_bitmap.data(), null_bitmap_n);
// Chunk layout:
// 1. Null bitmap (if nullable_=true): Indicates which values are null
// 2. Data values: Contiguous storage of data elements in the order:
// data1, data2, ..., dataN where each data element has size dim_*sizeof(T)
if (nullable_) {
for (auto& batch : batch_vec) {
auto data = batch->column(0);
auto null_bitmap = data->null_bitmap_data();
auto null_bitmap_n = (data->length() + 7) / 8;
if (null_bitmap) {
target_->write(null_bitmap, null_bitmap_n);
} else {
std::vector<uint8_t> null_bitmap(null_bitmap_n, 0xff);
target_->write(null_bitmap.data(), null_bitmap_n);
}
}
}
for (auto& batch : batch_vec) {
auto data = batch->column(0);
auto array = std::dynamic_pointer_cast<ArrowType>(data);
auto array = std::static_pointer_cast<ArrowType>(data);
auto data_ptr = array->raw_values();
target_->write(data_ptr, array->length() * dim_ * sizeof(T));
}
@ -151,16 +157,19 @@ ChunkWriter<arrow::BooleanArray, bool>::write(
} else {
target_ = std::make_shared<MemChunkTarget>(size);
}
// chunk layout: nullbitmap, data1, data2, ..., datan
for (auto& batch : batch_vec) {
auto data = batch->column(0);
auto null_bitmap = data->null_bitmap_data();
auto null_bitmap_n = (data->length() + 7) / 8;
if (null_bitmap) {
target_->write(null_bitmap, null_bitmap_n);
} else {
std::vector<uint8_t> null_bitmap(null_bitmap_n, 0xff);
target_->write(null_bitmap.data(), null_bitmap_n);
if (nullable_) {
// chunk layout: nullbitmap, data1, data2, ..., datan
for (auto& batch : batch_vec) {
auto data = batch->column(0);
auto null_bitmap = data->null_bitmap_data();
auto null_bitmap_n = (data->length() + 7) / 8;
if (null_bitmap) {
target_->write(null_bitmap, null_bitmap_n);
} else {
std::vector<uint8_t> null_bitmap(null_bitmap_n, 0xff);
target_->write(null_bitmap.data(), null_bitmap_n);
}
}
}

View File

@ -184,6 +184,11 @@ class ChunkedColumnBase : public ColumnBase {
return {chunk_idx, offset_in_chunk};
}
std::shared_ptr<Chunk>
GetChunk(int64_t chunk_id) const {
return chunks_[chunk_id];
}
int64_t
GetNumRowsUntilChunk(int64_t chunk_id) const {
return num_rows_until_chunk_[chunk_id];
@ -258,7 +263,7 @@ class ChunkedColumn : public ChunkedColumnBase {
SpanBase
Span(int64_t chunk_id) const override {
return std::dynamic_pointer_cast<FixedWidthChunk>(chunks_[chunk_id])
return std::static_pointer_cast<FixedWidthChunk>(chunks_[chunk_id])
->Span();
}
};
@ -289,7 +294,7 @@ class ChunkedSparseFloatColumn : public ChunkedColumnBase {
chunks_.push_back(chunk);
dim_ = std::max(
dim_,
std::dynamic_pointer_cast<SparseFloatVectorChunk>(chunk)->Dim());
std::static_pointer_cast<SparseFloatVectorChunk>(chunk)->Dim());
}
SpanBase
@ -339,19 +344,14 @@ class ChunkedVariableColumn : public ChunkedColumnBase {
StringViews(int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const override {
return std::dynamic_pointer_cast<StringChunk>(chunks_[chunk_id])
return std::static_pointer_cast<StringChunk>(chunks_[chunk_id])
->StringViews(offset_len);
}
std::shared_ptr<Chunk>
GetChunk(int64_t chunk_id) const {
return chunks_[chunk_id];
}
std::pair<std::vector<std::string_view>, FixedVector<bool>>
ViewsByOffsets(int64_t chunk_id,
const FixedVector<int32_t>& offsets) const override {
return std::dynamic_pointer_cast<StringChunk>(chunks_[chunk_id])
return std::static_pointer_cast<StringChunk>(chunks_[chunk_id])
->ViewsByOffsets(offsets);
}
@ -363,7 +363,7 @@ class ChunkedVariableColumn : public ChunkedColumnBase {
std::vector<BufferView::Element> elements;
elements.push_back(
{chunks_[chunk_id]->Data(),
std::dynamic_pointer_cast<StringChunk>(chunks_[chunk_id])
std::static_pointer_cast<StringChunk>(chunks_[chunk_id])
->Offsets(),
static_cast<int>(start_offset),
static_cast<int>(start_offset + length)});
@ -379,12 +379,8 @@ class ChunkedVariableColumn : public ChunkedColumnBase {
}
auto [chunk_id, offset_in_chunk] = GetChunkIDByOffset(i);
auto data = chunks_[chunk_id]->Data();
auto offsets = std::dynamic_pointer_cast<StringChunk>(chunks_[chunk_id])
->Offsets();
auto len = offsets[offset_in_chunk + 1] - offsets[offset_in_chunk];
return ViewType(data + offsets[offset_in_chunk], len);
std::string_view str_view = std::static_pointer_cast<StringChunk>(chunks_[chunk_id])->operator[](offset_in_chunk);
return ViewType(str_view.data(), str_view.size());
}
std::string_view
@ -392,7 +388,6 @@ class ChunkedVariableColumn : public ChunkedColumnBase {
return std::string_view((*this)[i]);
}
};
class ChunkedArrayColumn : public ChunkedColumnBase {
public:
// memory mode ctor
@ -420,14 +415,14 @@ class ChunkedArrayColumn : public ChunkedColumnBase {
ArrayView
operator[](const int i) const {
auto [chunk_id, offset_in_chunk] = GetChunkIDByOffset(i);
return std::dynamic_pointer_cast<ArrayChunk>(chunks_[chunk_id])
return std::static_pointer_cast<ArrayChunk>(chunks_[chunk_id])
->View(offset_in_chunk);
}
ScalarArray
RawAt(const int i) const {
auto [chunk_id, offset_in_chunk] = GetChunkIDByOffset(i);
return std::dynamic_pointer_cast<ArrayChunk>(chunks_[chunk_id])
return std::static_pointer_cast<ArrayChunk>(chunks_[chunk_id])
->View(offset_in_chunk)
.output_data();
}
@ -436,7 +431,7 @@ class ChunkedArrayColumn : public ChunkedColumnBase {
ArrayViews(int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const override {
return std::dynamic_pointer_cast<ArrayChunk>(chunks_[chunk_id])
return std::static_pointer_cast<ArrayChunk>(chunks_[chunk_id])
->Views(offset_len);
}
};

View File

@ -252,12 +252,10 @@ class CachedSearchIteratorTest
size_t offset = 0;
for (size_t i = 0; i < num_chunks_; ++i) {
const size_t rows = std::min(nb_ - offset, kSizePerChunk);
const size_t chunk_bitset_size = (rows + 7) / 8;
const size_t buf_size =
chunk_bitset_size + rows * dim_ * sizeof(float);
const size_t buf_size = rows * dim_ * sizeof(float);
auto& chunk_data = column_data_[i];
chunk_data.resize(buf_size);
memcpy(chunk_data.data() + chunk_bitset_size,
memcpy(chunk_data.data(),
base_dataset_.cbegin() + offset * dim_,
rows * dim_ * sizeof(float));
column_->AddChunk(std::make_shared<FixedWidthChunk>(

View File

@ -103,6 +103,48 @@ TEST(chunk, test_variable_field) {
}
}
// Verifies that a StringChunk built from a nullable VARCHAR field preserves
// both the string payloads and the per-row validity flags.
TEST(chunk, test_variable_field_nullable) {
    FixedVector<std::string> values = {
        "test1", "test2", "test3", "test4", "test5"};
    FixedVector<bool> expected_valid = {true, false, true, false, true};

    auto field_data =
        milvus::storage::CreateFieldData(storage::DataType::VARCHAR, true);
    // Validity bitmap 0b10101: rows 0, 2 and 4 hold real values.
    uint8_t* bitmap = new uint8_t[1]{0x15};
    field_data->FillFieldData(values.data(), bitmap, values.size());
    delete[] bitmap;

    storage::InsertEventData event_data;
    event_data.field_data = field_data;
    auto ser_data = event_data.Serialize();

    // Skip the two leading timestamps that Serialize() prepends.
    auto buffer = std::make_shared<arrow::io::BufferReader>(
        ser_data.data() + 2 * sizeof(milvus::Timestamp),
        ser_data.size() - 2 * sizeof(milvus::Timestamp));

    parquet::arrow::FileReaderBuilder reader_builder;
    auto s = reader_builder.Open(buffer);
    EXPECT_TRUE(s.ok());
    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    s = reader_builder.Build(&arrow_reader);
    EXPECT_TRUE(s.ok());
    std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
    s = arrow_reader->GetRecordBatchReader(&rb_reader);
    EXPECT_TRUE(s.ok());

    FieldMeta field_meta(
        FieldName("a"), milvus::FieldId(1), DataType::STRING, true);
    auto chunk = create_chunk(field_meta, 1, rb_reader);
    auto views = std::dynamic_pointer_cast<StringChunk>(chunk)->StringViews(
        std::nullopt);

    // Validity must follow the bitmap; valid rows must round-trip the string.
    for (size_t i = 0; i < values.size(); ++i) {
        EXPECT_EQ(views.second[i], expected_valid[i]);
        if (expected_valid[i]) {
            EXPECT_EQ(views.first[i], values[i]);
        }
    }
}
TEST(chunk, test_json_field) {
auto row_num = 100;
FixedVector<Json> data;
@ -221,12 +263,16 @@ TEST(chunk, test_json_field) {
}
}
TEST(chunk, test_null_field) {
TEST(chunk, test_null_int64) {
FixedVector<int64_t> data = {1, 2, 3, 4, 5};
auto field_data =
milvus::storage::CreateFieldData(storage::DataType::INT64, true);
uint8_t* valid_data_ = new uint8_t[1]{0x13};
field_data->FillFieldData(data.data(), valid_data_, data.size());
// Set up validity bitmap: 10011 (1st, 4th, and 5th are valid)
uint8_t* valid_data = new uint8_t[1]{0x13}; // 10011 in binary
field_data->FillFieldData(data.data(), valid_data, data.size());
delete[] valid_data;
storage::InsertEventData event_data;
event_data.field_data = field_data;
auto ser_data = event_data.Serialize();
@ -248,17 +294,25 @@ TEST(chunk, test_null_field) {
FieldMeta field_meta(
FieldName("a"), milvus::FieldId(1), DataType::INT64, true);
auto chunk = create_chunk(field_meta, 1, rb_reader);
auto span = std::dynamic_pointer_cast<FixedWidthChunk>(chunk)->Span();
auto fixed_chunk = std::dynamic_pointer_cast<FixedWidthChunk>(chunk);
auto span = fixed_chunk->Span();
EXPECT_EQ(span.row_count(), data.size());
data = {1, 2, 0, 0, 5};
FixedVector<bool> valid_data = {true, true, false, false, true};
// Check validity based on our bitmap pattern (10011)
EXPECT_TRUE(fixed_chunk->isValid(0));
EXPECT_TRUE(fixed_chunk->isValid(1));
EXPECT_FALSE(fixed_chunk->isValid(2));
EXPECT_FALSE(fixed_chunk->isValid(3));
EXPECT_TRUE(fixed_chunk->isValid(4));
// Verify data for valid entries
for (size_t i = 0; i < data.size(); ++i) {
auto n = *(int64_t*)((char*)span.data() + i * span.element_sizeof());
EXPECT_EQ(n, data[i]);
auto v = *(bool*)((char*)span.valid_data() + i);
EXPECT_EQ(v, valid_data[i]);
if (fixed_chunk->isValid(i)) {
auto n =
*(int64_t*)((char*)span.data() + i * span.element_sizeof());
EXPECT_EQ(n, data[i]);
}
}
delete[] valid_data_;
}
TEST(chunk, test_array) {
@ -307,6 +361,82 @@ TEST(chunk, test_array) {
}
}
// Verifies that an ArrayChunk built from a nullable ARRAY(STRING) field keeps
// the per-row validity bits and the element data of the valid rows intact.
TEST(chunk, test_null_array) {
    // size_t (not int) so the comparisons against views.size()/valid.size()
    // and the size_t loop indices below are not signed/unsigned mismatches.
    constexpr size_t array_count = 5;
    FixedVector<Array> data;
    data.reserve(array_count);

    // One reusable string array to use for the non-null rows.
    milvus::proto::schema::ScalarField field_string_data;
    field_string_data.mutable_string_data()->add_data("test1");
    field_string_data.mutable_string_data()->add_data("test2");
    field_string_data.mutable_string_data()->add_data("test3");
    auto string_array = Array(field_string_data);
    for (size_t i = 0; i < array_count; i++) {
        data.emplace_back(string_array);
    }

    auto field_data =
        milvus::storage::CreateFieldData(storage::DataType::ARRAY, true);
    // Set up validity bitmap: 10101 (1st, 3rd, and 5th are valid)
    uint8_t* valid_data = new uint8_t[1]{0x15};  // 10101 in binary
    field_data->FillFieldData(data.data(), valid_data, data.size());
    delete[] valid_data;

    storage::InsertEventData event_data;
    event_data.field_data = field_data;
    auto ser_data = event_data.Serialize();
    // Skip the two leading timestamps that Serialize() prepends.
    auto buffer = std::make_shared<arrow::io::BufferReader>(
        ser_data.data() + 2 * sizeof(milvus::Timestamp),
        ser_data.size() - 2 * sizeof(milvus::Timestamp));

    parquet::arrow::FileReaderBuilder reader_builder;
    auto s = reader_builder.Open(buffer);
    EXPECT_TRUE(s.ok());
    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    s = reader_builder.Build(&arrow_reader);
    EXPECT_TRUE(s.ok());
    std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
    s = arrow_reader->GetRecordBatchReader(&rb_reader);
    EXPECT_TRUE(s.ok());

    FieldMeta field_meta(FieldName("a"),
                         milvus::FieldId(1),
                         DataType::ARRAY,
                         DataType::STRING,
                         true);
    auto chunk = create_chunk(field_meta, 1, rb_reader);
    auto [views, valid] =
        std::dynamic_pointer_cast<ArrayChunk>(chunk)->Views(std::nullopt);
    EXPECT_EQ(views.size(), array_count);
    EXPECT_EQ(valid.size(), array_count);

    // Check validity based on our bitmap pattern (10101)
    EXPECT_TRUE(valid[0]);
    EXPECT_FALSE(valid[1]);
    EXPECT_TRUE(valid[2]);
    EXPECT_FALSE(valid[3]);
    EXPECT_TRUE(valid[4]);

    // Verify that every valid row round-trips the original array elements.
    for (size_t i = 0; i < array_count; i++) {
        if (valid[i]) {
            auto& arr = views[i];
            EXPECT_EQ(arr.length(),
                      field_string_data.string_data().data_size());
            for (size_t j = 0; j < arr.length(); j++) {
                auto str = arr.get_data<std::string>(j);
                EXPECT_EQ(str, field_string_data.string_data().data(j));
            }
        }
    }
}
TEST(chunk, test_array_views) {
milvus::proto::schema::ScalarField field_string_data;
field_string_data.mutable_string_data()->add_data("a");

View File

@ -68,7 +68,6 @@ TEST(test_chunk_segment, TestSearchOnSealed) {
int chunk_size = 100;
int total_row_count = chunk_num * chunk_size;
int bitset_size = (total_row_count + 7) / 8;
int chunk_bitset_size = (chunk_size + 7) / 8;
auto column = std::make_shared<ChunkedColumn>();
auto schema = std::make_shared<Schema>();
@ -78,11 +77,11 @@ TEST(test_chunk_segment, TestSearchOnSealed) {
for (int i = 0; i < chunk_num; i++) {
auto dataset = segcore::DataGen(schema, chunk_size);
auto data = dataset.get_col<float>(fakevec_id);
auto buf_size = chunk_bitset_size + 4 * data.size();
auto buf_size = 4 * data.size();
char* buf = new char[buf_size];
defer.AddDefer([buf]() { delete[] buf; });
memcpy(buf + chunk_bitset_size, data.data(), 4 * data.size());
memcpy(buf, data.data(), 4 * data.size());
auto chunk = std::make_shared<FixedWidthChunk>(
chunk_size, dim, buf, buf_size, 4, false);