mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: [StorageV2] Align null bitmap offset when loading multi-chunk (#43321)
Related to #43262 This patch fixes following logic bug: - When multiple chunks are loaded and size cannot be divided by 8, just appending uint8_t as bitmap will cause null bitmap dislocation - `null_bitmap_data()` points to start of whole row group, which may not stand for current `arrow::Array` The current solutions is: - Reorganize the null_bitmap with currect size & offset - Pass `array->offset()` in tuple to info the current offset Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
ccfaa7bee8
commit
fe8de016d5
@ -12,6 +12,7 @@
|
||||
#include "common/ChunkWriter.h"
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <tuple>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
@ -34,7 +35,8 @@ void
|
||||
StringChunkWriter::write(const arrow::ArrayVector& array_vec) {
|
||||
auto size = 0;
|
||||
std::vector<std::string_view> strs;
|
||||
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
|
||||
// tuple <data, size, offset>
|
||||
std::vector<std::tuple<const uint8_t*, int64_t, int64_t>> null_bitmaps;
|
||||
for (const auto& data : array_vec) {
|
||||
auto array = std::dynamic_pointer_cast<arrow::StringArray>(data);
|
||||
for (int i = 0; i < array->length(); i++) {
|
||||
@ -44,7 +46,9 @@ StringChunkWriter::write(const arrow::ArrayVector& array_vec) {
|
||||
}
|
||||
if (nullable_) {
|
||||
auto null_bitmap_n = (data->length() + 7) / 8;
|
||||
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
|
||||
// size, offset all in bits
|
||||
null_bitmaps.emplace_back(
|
||||
data->null_bitmap_data(), data->length(), data->offset());
|
||||
size += null_bitmap_n;
|
||||
}
|
||||
row_nums_ += array->length();
|
||||
@ -92,7 +96,8 @@ void
|
||||
JSONChunkWriter::write(const arrow::ArrayVector& array_vec) {
|
||||
auto size = 0;
|
||||
std::vector<Json> jsons;
|
||||
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
|
||||
// tuple <data, size, offset>
|
||||
std::vector<std::tuple<const uint8_t*, int64_t, int64_t>> null_bitmaps;
|
||||
for (const auto& data : array_vec) {
|
||||
auto array = std::dynamic_pointer_cast<arrow::BinaryArray>(data);
|
||||
for (int i = 0; i < array->length(); i++) {
|
||||
@ -103,7 +108,9 @@ JSONChunkWriter::write(const arrow::ArrayVector& array_vec) {
|
||||
}
|
||||
if (nullable_) {
|
||||
auto null_bitmap_n = (data->length() + 7) / 8;
|
||||
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
|
||||
// size, offset all in bits
|
||||
null_bitmaps.emplace_back(
|
||||
data->null_bitmap_data(), data->length(), data->offset());
|
||||
size += null_bitmap_n;
|
||||
}
|
||||
row_nums_ += array->length();
|
||||
@ -151,7 +158,8 @@ ArrayChunkWriter::write(const arrow::ArrayVector& array_vec) {
|
||||
auto size = 0;
|
||||
auto is_string = IsStringDataType(element_type_);
|
||||
std::vector<Array> arrays;
|
||||
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
|
||||
// tuple <data, size, offset>
|
||||
std::vector<std::tuple<const uint8_t*, int64_t, int64_t>> null_bitmaps;
|
||||
|
||||
for (const auto& data : array_vec) {
|
||||
auto array = std::dynamic_pointer_cast<arrow::BinaryArray>(data);
|
||||
@ -170,7 +178,9 @@ ArrayChunkWriter::write(const arrow::ArrayVector& array_vec) {
|
||||
row_nums_ += array->length();
|
||||
if (nullable_) {
|
||||
auto null_bitmap_n = (data->length() + 7) / 8;
|
||||
null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n);
|
||||
// size, offset all in bits
|
||||
null_bitmaps.emplace_back(
|
||||
data->null_bitmap_data(), data->length(), data->offset());
|
||||
size += null_bitmap_n;
|
||||
}
|
||||
}
|
||||
@ -368,9 +378,9 @@ template <typename... Args>
|
||||
std::shared_ptr<ChunkWriterBase>
|
||||
create_chunk_writer(const FieldMeta& field_meta, Args&&... args) {
|
||||
int dim = IsVectorDataType(field_meta.get_data_type()) &&
|
||||
!IsSparseFloatVectorDataType(field_meta.get_data_type())
|
||||
? field_meta.get_dim()
|
||||
: 1;
|
||||
!IsSparseFloatVectorDataType(field_meta.get_data_type())
|
||||
? field_meta.get_dim()
|
||||
: 1;
|
||||
bool nullable = field_meta.is_nullable();
|
||||
switch (field_meta.get_data_type()) {
|
||||
case milvus::DataType::BOOL:
|
||||
@ -395,41 +405,53 @@ create_chunk_writer(const FieldMeta& field_meta, Args&&... args) {
|
||||
return std::make_shared<ChunkWriter<arrow::DoubleArray, double>>(
|
||||
dim, std::forward<Args>(args)..., nullable);
|
||||
case milvus::DataType::VECTOR_FLOAT:
|
||||
return std::make_shared<ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::fp32>>(
|
||||
return std::make_shared<
|
||||
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::fp32>>(
|
||||
dim, std::forward<Args>(args)..., nullable);
|
||||
case milvus::DataType::VECTOR_BINARY:
|
||||
return std::make_shared<ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::bin1>>(
|
||||
return std::make_shared<
|
||||
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::bin1>>(
|
||||
dim / 8, std::forward<Args>(args)..., nullable);
|
||||
case milvus::DataType::VECTOR_FLOAT16:
|
||||
return std::make_shared<ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::fp16>>(
|
||||
return std::make_shared<
|
||||
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::fp16>>(
|
||||
dim, std::forward<Args>(args)..., nullable);
|
||||
case milvus::DataType::VECTOR_BFLOAT16:
|
||||
return std::make_shared<ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::bf16>>(
|
||||
return std::make_shared<
|
||||
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::bf16>>(
|
||||
dim, std::forward<Args>(args)..., nullable);
|
||||
case milvus::DataType::VECTOR_INT8:
|
||||
return std::make_shared<ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::int8>>(
|
||||
return std::make_shared<
|
||||
ChunkWriter<arrow::FixedSizeBinaryArray, knowhere::int8>>(
|
||||
dim, std::forward<Args>(args)..., nullable);
|
||||
case milvus::DataType::VARCHAR:
|
||||
case milvus::DataType::STRING:
|
||||
case milvus::DataType::TEXT:
|
||||
return std::make_shared<StringChunkWriter>(std::forward<Args>(args)..., nullable);
|
||||
return std::make_shared<StringChunkWriter>(
|
||||
std::forward<Args>(args)..., nullable);
|
||||
case milvus::DataType::JSON:
|
||||
return std::make_shared<JSONChunkWriter>(std::forward<Args>(args)..., nullable);
|
||||
return std::make_shared<JSONChunkWriter>(
|
||||
std::forward<Args>(args)..., nullable);
|
||||
case milvus::DataType::ARRAY:
|
||||
return std::make_shared<ArrayChunkWriter>(
|
||||
field_meta.get_element_type(), std::forward<Args>(args)..., nullable);
|
||||
field_meta.get_element_type(),
|
||||
std::forward<Args>(args)...,
|
||||
nullable);
|
||||
case milvus::DataType::VECTOR_SPARSE_FLOAT:
|
||||
return std::make_shared<SparseFloatVectorChunkWriter>(std::forward<Args>(args)..., nullable);
|
||||
return std::make_shared<SparseFloatVectorChunkWriter>(
|
||||
std::forward<Args>(args)..., nullable);
|
||||
case milvus::DataType::VECTOR_ARRAY:
|
||||
return std::make_shared<VectorArrayChunkWriter>(dim, field_meta.get_element_type(), std::forward<Args>(args)...);
|
||||
return std::make_shared<VectorArrayChunkWriter>(
|
||||
dim,
|
||||
field_meta.get_element_type(),
|
||||
std::forward<Args>(args)...);
|
||||
default:
|
||||
PanicInfo(Unsupported, "Unsupported data type");
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<Chunk>
|
||||
create_chunk(const FieldMeta& field_meta,
|
||||
const arrow::ArrayVector& array_vec) {
|
||||
create_chunk(const FieldMeta& field_meta, const arrow::ArrayVector& array_vec) {
|
||||
auto cw = create_chunk_writer(field_meta);
|
||||
cw->write(array_vec);
|
||||
return cw->finish();
|
||||
@ -437,7 +459,8 @@ create_chunk(const FieldMeta& field_meta,
|
||||
|
||||
std::unique_ptr<Chunk>
|
||||
create_chunk(const FieldMeta& field_meta,
|
||||
const arrow::ArrayVector& array_vec, const std::string& file_path) {
|
||||
const arrow::ArrayVector& array_vec,
|
||||
const std::string& file_path) {
|
||||
auto cw = create_chunk_writer(field_meta, file_path);
|
||||
cw->write(array_vec);
|
||||
return cw->finish();
|
||||
|
||||
@ -49,17 +49,38 @@ class ChunkWriterBase {
|
||||
|
||||
void
|
||||
write_null_bit_maps(
|
||||
const std::vector<std::pair<const uint8_t*, int64_t>>& null_bitmaps) {
|
||||
const std::vector<std::tuple<const uint8_t*, int64_t, int64_t>>&
|
||||
null_bitmaps) {
|
||||
if (nullable_) {
|
||||
for (auto [data, size] : null_bitmaps) {
|
||||
// merge all null bitmaps in case of multiple chunk null bitmap dislocation
|
||||
// say [0xFF, 0x00] with size [7, 8] cannot be treated as [0xFF, 0x00] after merged but
|
||||
// [0x7F, 0x00], othersize the null index will be dislocated
|
||||
std::vector<uint8_t> merged_null_bitmap;
|
||||
int64_t size_total_bit = 0;
|
||||
for (auto [data, size_bits, offset_bits] : null_bitmaps) {
|
||||
// resize in byte
|
||||
merged_null_bitmap.resize((size_total_bit + size_bits + 7) / 8,
|
||||
0xFF);
|
||||
if (data != nullptr) {
|
||||
target_->write(data, size);
|
||||
bitset::detail::ElementWiseBitsetPolicy<uint8_t>::op_copy(
|
||||
data,
|
||||
offset_bits,
|
||||
merged_null_bitmap.data(),
|
||||
size_total_bit,
|
||||
size_bits);
|
||||
} else {
|
||||
// have to append always-true bitmap due to arrow optimize this
|
||||
std::vector<uint8_t> null_bitmap(size, 0xff);
|
||||
target_->write(null_bitmap.data(), size);
|
||||
std::vector<uint8_t> null_bitmap(size_bits, 0xff);
|
||||
bitset::detail::ElementWiseBitsetPolicy<uint8_t>::op_copy(
|
||||
null_bitmap.data(),
|
||||
0,
|
||||
merged_null_bitmap.data(),
|
||||
size_total_bit,
|
||||
size_bits);
|
||||
}
|
||||
size_total_bit += size_bits;
|
||||
}
|
||||
target_->write(merged_null_bitmap.data(), (size_total_bit + 7) / 8);
|
||||
}
|
||||
}
|
||||
|
||||
@ -208,7 +229,8 @@ class ArrayChunkWriter : public ChunkWriterBase {
|
||||
ArrayChunkWriter(const milvus::DataType element_type,
|
||||
std::string file_path,
|
||||
bool nullable)
|
||||
: ChunkWriterBase(std::move(file_path), nullable), element_type_(element_type) {
|
||||
: ChunkWriterBase(std::move(file_path), nullable),
|
||||
element_type_(element_type) {
|
||||
}
|
||||
|
||||
void
|
||||
@ -257,8 +279,7 @@ class SparseFloatVectorChunkWriter : public ChunkWriterBase {
|
||||
};
|
||||
|
||||
std::unique_ptr<Chunk>
|
||||
create_chunk(const FieldMeta& field_meta,
|
||||
const arrow::ArrayVector& array_vec);
|
||||
create_chunk(const FieldMeta& field_meta, const arrow::ArrayVector& array_vec);
|
||||
|
||||
std::unique_ptr<Chunk>
|
||||
create_chunk(const FieldMeta& field_meta,
|
||||
|
||||
@ -115,7 +115,8 @@ class File {
|
||||
}
|
||||
|
||||
private:
|
||||
static inline const char* get_mode_from_flags(int flags) {
|
||||
static inline const char*
|
||||
get_mode_from_flags(int flags) {
|
||||
switch (flags) {
|
||||
case O_RDONLY: {
|
||||
return "rb";
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user