mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
related: #38736. 700m data, array_length=10: non-mmap_offsets_uint64: 2.0G; mmap_offsets_uint64: 1.1G; mmap_offsets_uint32: 880MB. Signed-off-by: MrPresent-Han <chun.han@gmail.com>. Co-authored-by: MrPresent-Han <chun.han@gmail.com>
This commit is contained in:
parent
72f5b85c05
commit
3739446a33
@ -35,6 +35,27 @@ class Array {
|
|||||||
|
|
||||||
~Array() {
|
~Array() {
|
||||||
delete[] data_;
|
delete[] data_;
|
||||||
|
if (offsets_ptr_) {
|
||||||
|
// only deallocate offsets for string type array
|
||||||
|
delete[] offsets_ptr_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Array(char* data,
|
||||||
|
int len,
|
||||||
|
size_t size,
|
||||||
|
DataType element_type,
|
||||||
|
const uint32_t* offsets_ptr)
|
||||||
|
: size_(size), length_(len), element_type_(element_type) {
|
||||||
|
data_ = new char[size];
|
||||||
|
std::copy(data, data + size, data_);
|
||||||
|
if (IsVariableDataType(element_type)) {
|
||||||
|
AssertInfo(offsets_ptr != nullptr,
|
||||||
|
"For variable type elements in array, offsets_ptr must "
|
||||||
|
"be non-null");
|
||||||
|
offsets_ptr_ = new uint32_t[len];
|
||||||
|
std::copy(offsets_ptr, offsets_ptr + len, offsets_ptr_);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
explicit Array(const ScalarArray& field_data) {
|
explicit Array(const ScalarArray& field_data) {
|
||||||
@ -97,17 +118,19 @@ class Array {
|
|||||||
case ScalarArray::kStringData: {
|
case ScalarArray::kStringData: {
|
||||||
element_type_ = DataType::STRING;
|
element_type_ = DataType::STRING;
|
||||||
length_ = field_data.string_data().data().size();
|
length_ = field_data.string_data().data().size();
|
||||||
offsets_.reserve(length_);
|
offsets_ptr_ = new uint32_t[length_];
|
||||||
for (int i = 0; i < length_; ++i) {
|
for (int i = 0; i < length_; ++i) {
|
||||||
offsets_.push_back(size_);
|
offsets_ptr_[i] = size_;
|
||||||
size_ += field_data.string_data().data(i).size();
|
size_ +=
|
||||||
|
field_data.string_data()
|
||||||
|
.data(i)
|
||||||
|
.size(); //type risk here between uint32_t vs size_t
|
||||||
}
|
}
|
||||||
|
|
||||||
data_ = new char[size_];
|
data_ = new char[size_];
|
||||||
for (int i = 0; i < length_; ++i) {
|
for (int i = 0; i < length_; ++i) {
|
||||||
std::copy_n(field_data.string_data().data(i).data(),
|
std::copy_n(field_data.string_data().data(i).data(),
|
||||||
field_data.string_data().data(i).size(),
|
field_data.string_data().data(i).size(),
|
||||||
data_ + offsets_[i]);
|
data_ + offsets_ptr_[i]);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -117,49 +140,39 @@ class Array {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Array(char* data,
|
|
||||||
size_t size,
|
|
||||||
DataType element_type,
|
|
||||||
std::vector<uint64_t>&& element_offsets)
|
|
||||||
: size_(size),
|
|
||||||
offsets_(std::move(element_offsets)),
|
|
||||||
element_type_(element_type) {
|
|
||||||
delete[] data_;
|
|
||||||
data_ = new char[size];
|
|
||||||
std::copy(data, data + size, data_);
|
|
||||||
if (IsVariableDataType(element_type_)) {
|
|
||||||
length_ = offsets_.size();
|
|
||||||
} else {
|
|
||||||
// int8, int16, int32 are all promoted to int32
|
|
||||||
if (element_type_ == DataType::INT8 ||
|
|
||||||
element_type_ == DataType::INT16) {
|
|
||||||
length_ = size / sizeof(int32_t);
|
|
||||||
} else {
|
|
||||||
length_ = size / GetDataTypeSize(element_type_);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Array(const Array& array) noexcept
|
Array(const Array& array) noexcept
|
||||||
: length_{array.length_},
|
: length_{array.length_},
|
||||||
size_{array.size_},
|
size_{array.size_},
|
||||||
element_type_{array.element_type_} {
|
element_type_{array.element_type_} {
|
||||||
delete[] data_;
|
|
||||||
data_ = new char[array.size_];
|
data_ = new char[array.size_];
|
||||||
std::copy(array.data_, array.data_ + array.size_, data_);
|
std::copy(array.data_, array.data_ + array.size_, data_);
|
||||||
offsets_ = array.offsets_;
|
if (IsVariableDataType(array.element_type_)) {
|
||||||
|
AssertInfo(array.get_offsets_data() != nullptr,
|
||||||
|
"for array with variable length elements, offsets_ptr"
|
||||||
|
"must not be nullptr");
|
||||||
|
offsets_ptr_ = new uint32_t[length_];
|
||||||
|
std::copy_n(array.get_offsets_data(), array.length(), offsets_ptr_);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Array&
|
Array&
|
||||||
operator=(const Array& array) {
|
operator=(const Array& array) {
|
||||||
delete[] data_;
|
delete[] data_;
|
||||||
|
if (offsets_ptr_) {
|
||||||
data_ = new char[array.size_];
|
delete[] offsets_ptr_;
|
||||||
std::copy(array.data_, array.data_ + array.size_, data_);
|
}
|
||||||
length_ = array.length_;
|
length_ = array.length_;
|
||||||
size_ = array.size_;
|
size_ = array.size_;
|
||||||
offsets_ = array.offsets_;
|
|
||||||
element_type_ = array.element_type_;
|
element_type_ = array.element_type_;
|
||||||
|
data_ = new char[size_];
|
||||||
|
std::copy(array.data_, array.data_ + size_, data_);
|
||||||
|
if (IsVariableDataType(element_type_)) {
|
||||||
|
AssertInfo(array.get_offsets_data() != nullptr,
|
||||||
|
"for array with variable length elements, offsets_ptr"
|
||||||
|
"must not be nullptr");
|
||||||
|
offsets_ptr_ = new uint32_t[length_];
|
||||||
|
std::copy_n(array.get_offsets_data(), array.length(), offsets_ptr_);
|
||||||
|
}
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -241,10 +254,11 @@ class Array {
|
|||||||
length_);
|
length_);
|
||||||
if constexpr (std::is_same_v<T, std::string> ||
|
if constexpr (std::is_same_v<T, std::string> ||
|
||||||
std::is_same_v<T, std::string_view>) {
|
std::is_same_v<T, std::string_view>) {
|
||||||
size_t element_length = (index == length_ - 1)
|
size_t element_length =
|
||||||
? size_ - offsets_.back()
|
(index == length_ - 1)
|
||||||
: offsets_[index + 1] - offsets_[index];
|
? size_ - offsets_ptr_[length_ - 1]
|
||||||
return T(data_ + offsets_[index], element_length);
|
: offsets_ptr_[index + 1] - offsets_ptr_[index];
|
||||||
|
return T(data_ + offsets_ptr_[index], element_length);
|
||||||
}
|
}
|
||||||
if constexpr (std::is_same_v<T, int> || std::is_same_v<T, int64_t> ||
|
if constexpr (std::is_same_v<T, int> || std::is_same_v<T, int64_t> ||
|
||||||
std::is_same_v<T, int8_t> || std::is_same_v<T, int16_t> ||
|
std::is_same_v<T, int8_t> || std::is_same_v<T, int16_t> ||
|
||||||
@ -272,14 +286,9 @@ class Array {
|
|||||||
return reinterpret_cast<T*>(data_)[index];
|
return reinterpret_cast<T*>(data_)[index];
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::vector<uint64_t>&
|
uint32_t*
|
||||||
get_offsets() const {
|
get_offsets_data() const {
|
||||||
return offsets_;
|
return offsets_ptr_;
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<uint64_t>
|
|
||||||
get_offsets_in_copy() const {
|
|
||||||
return offsets_;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ScalarArray
|
ScalarArray
|
||||||
@ -436,32 +445,45 @@ class Array {
|
|||||||
char* data_{nullptr};
|
char* data_{nullptr};
|
||||||
int length_ = 0;
|
int length_ = 0;
|
||||||
int size_ = 0;
|
int size_ = 0;
|
||||||
std::vector<uint64_t> offsets_{};
|
|
||||||
DataType element_type_ = DataType::NONE;
|
DataType element_type_ = DataType::NONE;
|
||||||
|
uint32_t* offsets_ptr_{nullptr};
|
||||||
};
|
};
|
||||||
|
|
||||||
class ArrayView {
|
class ArrayView {
|
||||||
public:
|
public:
|
||||||
ArrayView() = default;
|
ArrayView() = default;
|
||||||
|
|
||||||
|
ArrayView(const ArrayView& other)
|
||||||
|
: data_(other.data_),
|
||||||
|
length_(other.length_),
|
||||||
|
size_(other.size_),
|
||||||
|
element_type_(other.element_type_),
|
||||||
|
offsets_ptr_(other.offsets_ptr_) {
|
||||||
|
AssertInfo(data_ != nullptr,
|
||||||
|
"data pointer for ArrayView cannot be nullptr");
|
||||||
|
if (IsVariableDataType(element_type_)) {
|
||||||
|
AssertInfo(offsets_ptr_ != nullptr,
|
||||||
|
"for array with variable length elements, offsets_ptr "
|
||||||
|
"must not be nullptr");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ArrayView(char* data,
|
ArrayView(char* data,
|
||||||
|
int len,
|
||||||
size_t size,
|
size_t size,
|
||||||
DataType element_type,
|
DataType element_type,
|
||||||
std::vector<uint64_t>&& element_offsets)
|
uint32_t* offsets_ptr)
|
||||||
: size_(size),
|
: data_(data),
|
||||||
offsets_(std::move(element_offsets)),
|
length_(len),
|
||||||
element_type_(element_type) {
|
size_(size),
|
||||||
data_ = data;
|
element_type_(element_type),
|
||||||
|
offsets_ptr_(offsets_ptr) {
|
||||||
|
AssertInfo(data != nullptr,
|
||||||
|
"data pointer for ArrayView cannot be nullptr");
|
||||||
if (IsVariableDataType(element_type_)) {
|
if (IsVariableDataType(element_type_)) {
|
||||||
length_ = offsets_.size();
|
AssertInfo(offsets_ptr != nullptr,
|
||||||
} else {
|
"for array with variable length elements, offsets_ptr "
|
||||||
// int8, int16, int32 are all promoted to int32
|
"must not be nullptr");
|
||||||
if (element_type_ == DataType::INT8 ||
|
|
||||||
element_type_ == DataType::INT16) {
|
|
||||||
length_ = size / sizeof(int32_t);
|
|
||||||
} else {
|
|
||||||
length_ = size / GetDataTypeSize(element_type_);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -475,10 +497,11 @@ class ArrayView {
|
|||||||
|
|
||||||
if constexpr (std::is_same_v<T, std::string> ||
|
if constexpr (std::is_same_v<T, std::string> ||
|
||||||
std::is_same_v<T, std::string_view>) {
|
std::is_same_v<T, std::string_view>) {
|
||||||
size_t element_length = (index == length_ - 1)
|
size_t element_length =
|
||||||
? size_ - offsets_.back()
|
(index == length_ - 1)
|
||||||
: offsets_[index + 1] - offsets_[index];
|
? size_ - offsets_ptr_[length_ - 1]
|
||||||
return T(data_ + offsets_[index], element_length);
|
: offsets_ptr_[index + 1] - offsets_ptr_[index];
|
||||||
|
return T(data_ + offsets_ptr_[index], element_length);
|
||||||
}
|
}
|
||||||
if constexpr (std::is_same_v<T, int> || std::is_same_v<T, int64_t> ||
|
if constexpr (std::is_same_v<T, int> || std::is_same_v<T, int64_t> ||
|
||||||
std::is_same_v<T, float> || std::is_same_v<T, double>) {
|
std::is_same_v<T, float> || std::is_same_v<T, double>) {
|
||||||
@ -580,11 +603,6 @@ class ArrayView {
|
|||||||
data() const {
|
data() const {
|
||||||
return data_;
|
return data_;
|
||||||
}
|
}
|
||||||
// copy to result
|
|
||||||
std::vector<uint64_t>
|
|
||||||
get_offsets_in_copy() const {
|
|
||||||
return offsets_;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool
|
bool
|
||||||
is_same_array(const proto::plan::Array& arr2) const {
|
is_same_array(const proto::plan::Array& arr2) const {
|
||||||
@ -661,8 +679,10 @@ class ArrayView {
|
|||||||
char* data_{nullptr};
|
char* data_{nullptr};
|
||||||
int length_ = 0;
|
int length_ = 0;
|
||||||
int size_ = 0;
|
int size_ = 0;
|
||||||
std::vector<uint64_t> offsets_{};
|
|
||||||
DataType element_type_ = DataType::NONE;
|
DataType element_type_ = DataType::NONE;
|
||||||
|
|
||||||
|
//offsets ptr
|
||||||
|
uint32_t* offsets_ptr_{nullptr};
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace milvus
|
} // namespace milvus
|
||||||
|
|||||||
@ -51,21 +51,18 @@ ArrayChunk::ConstructViews() {
|
|||||||
int offset = offsets_lens_[2 * i];
|
int offset = offsets_lens_[2 * i];
|
||||||
int next_offset = offsets_lens_[2 * (i + 1)];
|
int next_offset = offsets_lens_[2 * (i + 1)];
|
||||||
int len = offsets_lens_[2 * i + 1];
|
int len = offsets_lens_[2 * i + 1];
|
||||||
|
|
||||||
auto data_ptr = data_ + offset;
|
auto data_ptr = data_ + offset;
|
||||||
auto offsets_len = 0;
|
auto offsets_bytes_len = 0;
|
||||||
std::vector<uint64_t> element_indices = {};
|
uint32_t* offsets_ptr = nullptr;
|
||||||
if (IsStringDataType(element_type_)) {
|
if (IsStringDataType(element_type_)) {
|
||||||
offsets_len = len * sizeof(uint64_t);
|
offsets_bytes_len = len * sizeof(uint32_t);
|
||||||
std::vector<uint64_t> tmp(
|
offsets_ptr = reinterpret_cast<uint32_t*>(data_ptr);
|
||||||
reinterpret_cast<uint64_t*>(data_ptr),
|
|
||||||
reinterpret_cast<uint64_t*>(data_ptr + offsets_len));
|
|
||||||
element_indices = std::move(tmp);
|
|
||||||
}
|
}
|
||||||
views_.emplace_back(data_ptr + offsets_len,
|
views_.emplace_back(data_ptr + offsets_bytes_len,
|
||||||
next_offset - offset - offsets_len,
|
len,
|
||||||
|
next_offset - offset - offsets_bytes_len,
|
||||||
element_type_,
|
element_type_,
|
||||||
std::move(element_indices));
|
offsets_ptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -176,7 +176,7 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
|
|||||||
arrays.push_back(std::move(arr));
|
arrays.push_back(std::move(arr));
|
||||||
if (is_string) {
|
if (is_string) {
|
||||||
// element offsets size
|
// element offsets size
|
||||||
size += sizeof(uint64_t) * arr.length();
|
size += sizeof(uint32_t) * arr.length();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
row_nums_ += array->length();
|
row_nums_ += array->length();
|
||||||
@ -205,18 +205,20 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
|
|||||||
|
|
||||||
int offsets_num = row_nums_ + 1;
|
int offsets_num = row_nums_ + 1;
|
||||||
int len_num = row_nums_;
|
int len_num = row_nums_;
|
||||||
int offset_start_pos =
|
uint64_t offset_start_pos =
|
||||||
target_->tell() + sizeof(uint64_t) * (offsets_num + len_num);
|
target_->tell() + sizeof(uint64_t) * (offsets_num + len_num);
|
||||||
std::vector<uint64_t> offsets;
|
std::vector<uint64_t> offsets(offsets_num);
|
||||||
std::vector<uint64_t> lens;
|
std::vector<uint64_t> lens(len_num);
|
||||||
for (auto& arr : arrays) {
|
for (auto i = 0; i < arrays.size(); i++) {
|
||||||
offsets.push_back(offset_start_pos);
|
auto& arr = arrays[i];
|
||||||
lens.push_back(arr.length());
|
offsets[i] = offset_start_pos;
|
||||||
offset_start_pos +=
|
lens[i] = arr.length();
|
||||||
is_string ? sizeof(uint64_t) * arr.get_offsets().size() : 0;
|
offset_start_pos += is_string ? sizeof(uint32_t) * lens[i] : 0;
|
||||||
offset_start_pos += arr.byte_size();
|
offset_start_pos += arr.byte_size();
|
||||||
}
|
}
|
||||||
offsets.push_back(offset_start_pos);
|
if (offsets_num > 0) {
|
||||||
|
offsets[offsets_num - 1] = offset_start_pos;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < offsets.size(); i++) {
|
for (int i = 0; i < offsets.size(); i++) {
|
||||||
if (i == offsets.size() - 1) {
|
if (i == offsets.size() - 1) {
|
||||||
@ -229,8 +231,8 @@ ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
|
|||||||
|
|
||||||
for (auto& arr : arrays) {
|
for (auto& arr : arrays) {
|
||||||
if (is_string) {
|
if (is_string) {
|
||||||
target_->write(arr.get_offsets().data(),
|
target_->write(arr.get_offsets_data(),
|
||||||
arr.get_offsets().size() * sizeof(uint64_t));
|
arr.length() * sizeof(uint32_t));
|
||||||
}
|
}
|
||||||
target_->write(arr.data(), arr.byte_size());
|
target_->write(arr.data(), arr.byte_size());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -191,21 +191,35 @@ VariableLengthChunk<Array>::set(const Array* src,
|
|||||||
begin,
|
begin,
|
||||||
size_);
|
size_);
|
||||||
size_t total_size = 0;
|
size_t total_size = 0;
|
||||||
size_t padding_size = 0;
|
|
||||||
for (auto i = 0; i < length; i++) {
|
for (auto i = 0; i < length; i++) {
|
||||||
total_size += src[i].byte_size() + padding_size;
|
total_size += src[i].byte_size();
|
||||||
}
|
}
|
||||||
|
if (length > 0 && IsVariableDataType(src[0].get_element_type())) {
|
||||||
|
for (auto i = 0; i < length; i++) {
|
||||||
|
total_size += (src[i].length() * sizeof(uint32_t));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size);
|
auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size);
|
||||||
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
|
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
|
||||||
for (auto i = 0, offset = 0; i < length; i++) {
|
char* data_ptr = buf;
|
||||||
auto data_size = src[i].byte_size() + padding_size;
|
for (auto i = 0; i < length; i++) {
|
||||||
char* data_ptr = buf + offset;
|
int length = src[i].length();
|
||||||
std::copy(src[i].data(), src[i].data() + src[i].byte_size(), data_ptr);
|
uint32_t* src_offsets_ptr = src[i].get_offsets_data();
|
||||||
data_[i + begin] = ArrayView(data_ptr,
|
auto element_type = src[i].get_element_type();
|
||||||
data_size,
|
// need to copy offsets for variable types
|
||||||
src[i].get_element_type(),
|
uint32_t* target_offsets_ptr = nullptr;
|
||||||
src[i].get_offsets_in_copy());
|
if (IsVariableDataType(element_type)) {
|
||||||
offset += data_size;
|
target_offsets_ptr = reinterpret_cast<uint32_t*>(data_ptr);
|
||||||
|
std::copy(
|
||||||
|
src_offsets_ptr, src_offsets_ptr + length, target_offsets_ptr);
|
||||||
|
data_ptr += length * sizeof(uint32_t);
|
||||||
|
}
|
||||||
|
auto data_size = src[i].byte_size();
|
||||||
|
std::copy(src[i].data(), src[i].data() + data_size, data_ptr);
|
||||||
|
data_[i + begin] = ArrayView(
|
||||||
|
data_ptr, length, data_size, element_type, target_offsets_ptr);
|
||||||
|
data_ptr += data_size;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -119,9 +119,10 @@ class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
|
|||||||
} else if constexpr (std::is_same_v<Array, Type>) {
|
} else if constexpr (std::is_same_v<Array, Type>) {
|
||||||
auto& src = chunk[chunk_offset];
|
auto& src = chunk[chunk_offset];
|
||||||
return ArrayView(const_cast<char*>(src.data()),
|
return ArrayView(const_cast<char*>(src.data()),
|
||||||
|
src.length(),
|
||||||
src.byte_size(),
|
src.byte_size(),
|
||||||
src.get_element_type(),
|
src.get_element_type(),
|
||||||
src.get_offsets_in_copy());
|
src.get_offsets_data());
|
||||||
} else {
|
} else {
|
||||||
return chunk[chunk_offset];
|
return chunk[chunk_offset];
|
||||||
}
|
}
|
||||||
|
|||||||
@ -918,7 +918,15 @@ class SingleChunkArrayColumn : public SingleChunkColumnBase {
|
|||||||
void
|
void
|
||||||
Append(const Array& array, bool valid_data = false) {
|
Append(const Array& array, bool valid_data = false) {
|
||||||
indices_.emplace_back(data_size_);
|
indices_.emplace_back(data_size_);
|
||||||
element_indices_.emplace_back(array.get_offsets());
|
lens_.emplace_back(array.length());
|
||||||
|
if (IsVariableDataType(array.get_element_type())) {
|
||||||
|
element_indices_.emplace_back(
|
||||||
|
array.get_offsets_data(),
|
||||||
|
array.get_offsets_data() + array.length());
|
||||||
|
} else {
|
||||||
|
element_indices_.emplace_back();
|
||||||
|
}
|
||||||
|
|
||||||
if (nullable_) {
|
if (nullable_) {
|
||||||
return SingleChunkColumnBase::Append(
|
return SingleChunkColumnBase::Append(
|
||||||
static_cast<const char*>(array.data()),
|
static_cast<const char*>(array.data()),
|
||||||
@ -931,10 +939,14 @@ class SingleChunkArrayColumn : public SingleChunkColumnBase {
|
|||||||
|
|
||||||
void
|
void
|
||||||
Seal(std::vector<uint64_t>&& indices = {},
|
Seal(std::vector<uint64_t>&& indices = {},
|
||||||
std::vector<std::vector<uint64_t>>&& element_indices = {}) {
|
std::vector<std::vector<uint32_t>>&& element_indices = {}) {
|
||||||
if (!indices.empty()) {
|
if (!indices.empty()) {
|
||||||
indices_ = std::move(indices);
|
indices_ = std::move(indices);
|
||||||
element_indices_ = std::move(element_indices);
|
element_indices_ = std::move(element_indices);
|
||||||
|
lens_.reserve(element_indices_.size());
|
||||||
|
for (auto& ele_idices : element_indices_) {
|
||||||
|
lens_.emplace_back(ele_idices.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
num_rows_ = indices_.size();
|
num_rows_ = indices_.size();
|
||||||
ConstructViews();
|
ConstructViews();
|
||||||
@ -944,22 +956,26 @@ class SingleChunkArrayColumn : public SingleChunkColumnBase {
|
|||||||
void
|
void
|
||||||
ConstructViews() {
|
ConstructViews() {
|
||||||
views_.reserve(indices_.size());
|
views_.reserve(indices_.size());
|
||||||
for (size_t i = 0; i < indices_.size() - 1; i++) {
|
auto last = indices_.size() - 1;
|
||||||
|
for (size_t i = 0; i < last; i++) {
|
||||||
views_.emplace_back(data_ + indices_[i],
|
views_.emplace_back(data_ + indices_[i],
|
||||||
|
lens_[i],
|
||||||
indices_[i + 1] - indices_[i],
|
indices_[i + 1] - indices_[i],
|
||||||
element_type_,
|
element_type_,
|
||||||
std::move(element_indices_[i]));
|
element_indices_[i].data());
|
||||||
}
|
}
|
||||||
views_.emplace_back(data_ + indices_.back(),
|
views_.emplace_back(data_ + indices_.back(),
|
||||||
|
lens_[last],
|
||||||
data_size_ - indices_.back(),
|
data_size_ - indices_.back(),
|
||||||
element_type_,
|
element_type_,
|
||||||
std::move(element_indices_[indices_.size() - 1]));
|
element_indices_[last].data());
|
||||||
element_indices_.clear();
|
lens_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::vector<uint64_t> indices_{};
|
std::vector<uint64_t> indices_{};
|
||||||
std::vector<std::vector<uint64_t>> element_indices_{};
|
std::vector<std::vector<uint32_t>> element_indices_{};
|
||||||
|
std::vector<int> lens_{};
|
||||||
// Compatible with current Span type
|
// Compatible with current Span type
|
||||||
std::vector<ArrayView> views_{};
|
std::vector<ArrayView> views_{};
|
||||||
DataType element_type_;
|
DataType element_type_;
|
||||||
|
|||||||
@ -90,7 +90,7 @@ WriteFieldData(File& file,
|
|||||||
const FieldDataPtr& data,
|
const FieldDataPtr& data,
|
||||||
uint64_t& total_written,
|
uint64_t& total_written,
|
||||||
std::vector<uint64_t>& indices,
|
std::vector<uint64_t>& indices,
|
||||||
std::vector<std::vector<uint64_t>>& element_indices,
|
std::vector<std::vector<uint32_t>>& element_indices,
|
||||||
FixedVector<bool>& valid_data) {
|
FixedVector<bool>& valid_data) {
|
||||||
if (IsVariableDataType(data_type)) {
|
if (IsVariableDataType(data_type)) {
|
||||||
// use buffered writer to reduce fwrite/write syscall
|
// use buffered writer to reduce fwrite/write syscall
|
||||||
@ -131,8 +131,14 @@ WriteFieldData(File& file,
|
|||||||
indices.push_back(total_written);
|
indices.push_back(total_written);
|
||||||
auto array = static_cast<const Array*>(data->RawValue(i));
|
auto array = static_cast<const Array*>(data->RawValue(i));
|
||||||
bw.Write(array->data(), array->byte_size());
|
bw.Write(array->data(), array->byte_size());
|
||||||
element_indices.emplace_back(array->get_offsets());
|
|
||||||
total_written += array->byte_size();
|
total_written += array->byte_size();
|
||||||
|
if (IsVariableDataType(array->get_element_type())) {
|
||||||
|
element_indices.emplace_back(
|
||||||
|
array->get_offsets_data(),
|
||||||
|
array->get_offsets_data() + array->length());
|
||||||
|
} else {
|
||||||
|
element_indices.emplace_back();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -430,11 +430,11 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
|
|||||||
var_column->Append(*array);
|
var_column->Append(*array);
|
||||||
}
|
}
|
||||||
|
|
||||||
// we store the offset for each array element, so there is an additional uint64_t for each array element
|
// we store the offset for each array element, so there is an additional uint32_t for each array element
|
||||||
field_data_size =
|
field_data_size =
|
||||||
array->byte_size() + sizeof(uint64_t);
|
array->byte_size() + sizeof(uint32_t);
|
||||||
stats_.mem_size +=
|
stats_.mem_size +=
|
||||||
array->byte_size() + sizeof(uint64_t);
|
array->byte_size() + sizeof(uint32_t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var_column->Seal();
|
var_column->Seal();
|
||||||
@ -544,7 +544,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
|
|||||||
FieldDataPtr field_data;
|
FieldDataPtr field_data;
|
||||||
uint64_t total_written = 0;
|
uint64_t total_written = 0;
|
||||||
std::vector<uint64_t> indices{};
|
std::vector<uint64_t> indices{};
|
||||||
std::vector<std::vector<uint64_t>> element_indices{};
|
std::vector<std::vector<uint32_t>> element_indices{};
|
||||||
FixedVector<bool> valid_data{};
|
FixedVector<bool> valid_data{};
|
||||||
while (data.channel->pop(field_data)) {
|
while (data.channel->pop(field_data)) {
|
||||||
WriteFieldData(file,
|
WriteFieldData(file,
|
||||||
|
|||||||
@ -169,8 +169,10 @@ MmapBlocksHandler::AllocateLargeBlock(const uint64_t size) {
|
|||||||
if (size + Size() > max_disk_limit_) {
|
if (size + Size() > max_disk_limit_) {
|
||||||
PanicInfo(ErrorCode::MemAllocateSizeNotMatch,
|
PanicInfo(ErrorCode::MemAllocateSizeNotMatch,
|
||||||
"Failed to create a new mmap_block, not enough disk for "
|
"Failed to create a new mmap_block, not enough disk for "
|
||||||
"create a new mmap block. Allocated size: {}, Max size: {} "
|
"create a new mmap block. To Allocate:{} Allocated size: {}, "
|
||||||
|
"Max size: {} "
|
||||||
"under mmap file_prefix: {}",
|
"under mmap file_prefix: {}",
|
||||||
|
size,
|
||||||
Size(),
|
Size(),
|
||||||
max_disk_limit_,
|
max_disk_limit_,
|
||||||
mmap_file_prefix_);
|
mmap_file_prefix_);
|
||||||
|
|||||||
@ -18,6 +18,7 @@ TEST(Array, TestConstructArray) {
|
|||||||
using namespace milvus;
|
using namespace milvus;
|
||||||
|
|
||||||
int N = 10;
|
int N = 10;
|
||||||
|
// 1. test int
|
||||||
milvus::proto::schema::ScalarField field_int_data;
|
milvus::proto::schema::ScalarField field_int_data;
|
||||||
milvus::proto::plan::Array field_int_array;
|
milvus::proto::plan::Array field_int_array;
|
||||||
field_int_array.set_same_type(true);
|
field_int_array.set_same_type(true);
|
||||||
@ -33,28 +34,33 @@ TEST(Array, TestConstructArray) {
|
|||||||
}
|
}
|
||||||
ASSERT_TRUE(int_array.is_same_array(field_int_array));
|
ASSERT_TRUE(int_array.is_same_array(field_int_array));
|
||||||
auto int_array_tmp = Array(const_cast<char*>(int_array.data()),
|
auto int_array_tmp = Array(const_cast<char*>(int_array.data()),
|
||||||
|
int_array.length(),
|
||||||
int_array.byte_size(),
|
int_array.byte_size(),
|
||||||
int_array.get_element_type(),
|
int_array.get_element_type(),
|
||||||
{});
|
int_array.get_offsets_data());
|
||||||
auto int_8_array = Array(const_cast<char*>(int_array.data()),
|
auto int_8_array = Array(const_cast<char*>(int_array.data()),
|
||||||
|
int_array.length(),
|
||||||
int_array.byte_size(),
|
int_array.byte_size(),
|
||||||
DataType::INT8,
|
DataType::INT8,
|
||||||
{});
|
int_array.get_offsets_data());
|
||||||
ASSERT_EQ(int_array.length(), int_8_array.length());
|
ASSERT_EQ(int_array.length(), int_8_array.length());
|
||||||
auto int_16_array = Array(const_cast<char*>(int_array.data()),
|
auto int_16_array = Array(const_cast<char*>(int_array.data()),
|
||||||
|
int_array.length(),
|
||||||
int_array.byte_size(),
|
int_array.byte_size(),
|
||||||
DataType::INT16,
|
DataType::INT16,
|
||||||
{});
|
int_array.get_offsets_data());
|
||||||
ASSERT_EQ(int_array.length(), int_16_array.length());
|
ASSERT_EQ(int_array.length(), int_16_array.length());
|
||||||
ASSERT_TRUE(int_array_tmp == int_array);
|
ASSERT_TRUE(int_array_tmp == int_array);
|
||||||
auto int_array_view = ArrayView(const_cast<char*>(int_array.data()),
|
auto int_array_view = ArrayView(const_cast<char*>(int_array.data()),
|
||||||
|
int_array.length(),
|
||||||
int_array.byte_size(),
|
int_array.byte_size(),
|
||||||
int_array.get_element_type(),
|
int_array.get_element_type(),
|
||||||
{});
|
int_array.get_offsets_data());
|
||||||
ASSERT_EQ(int_array.length(), int_array_view.length());
|
ASSERT_EQ(int_array.length(), int_array_view.length());
|
||||||
ASSERT_EQ(int_array.byte_size(), int_array_view.byte_size());
|
ASSERT_EQ(int_array.byte_size(), int_array_view.byte_size());
|
||||||
ASSERT_EQ(int_array.get_element_type(), int_array_view.get_element_type());
|
ASSERT_EQ(int_array.get_element_type(), int_array_view.get_element_type());
|
||||||
|
|
||||||
|
// 2. test long
|
||||||
milvus::proto::schema::ScalarField field_long_data;
|
milvus::proto::schema::ScalarField field_long_data;
|
||||||
milvus::proto::plan::Array field_long_array;
|
milvus::proto::plan::Array field_long_array;
|
||||||
field_long_array.set_same_type(true);
|
field_long_array.set_same_type(true);
|
||||||
@ -70,19 +76,22 @@ TEST(Array, TestConstructArray) {
|
|||||||
}
|
}
|
||||||
ASSERT_TRUE(long_array.is_same_array(field_int_array));
|
ASSERT_TRUE(long_array.is_same_array(field_int_array));
|
||||||
auto long_array_tmp = Array(const_cast<char*>(long_array.data()),
|
auto long_array_tmp = Array(const_cast<char*>(long_array.data()),
|
||||||
|
long_array.length(),
|
||||||
long_array.byte_size(),
|
long_array.byte_size(),
|
||||||
long_array.get_element_type(),
|
long_array.get_element_type(),
|
||||||
{});
|
long_array.get_offsets_data());
|
||||||
ASSERT_TRUE(long_array_tmp == long_array);
|
ASSERT_TRUE(long_array_tmp == long_array);
|
||||||
auto long_array_view = ArrayView(const_cast<char*>(long_array.data()),
|
auto long_array_view = ArrayView(const_cast<char*>(long_array.data()),
|
||||||
|
long_array.length(),
|
||||||
long_array.byte_size(),
|
long_array.byte_size(),
|
||||||
long_array.get_element_type(),
|
long_array.get_element_type(),
|
||||||
{});
|
long_array.get_offsets_data());
|
||||||
ASSERT_EQ(long_array.length(), long_array_view.length());
|
ASSERT_EQ(long_array.length(), long_array_view.length());
|
||||||
ASSERT_EQ(long_array.byte_size(), long_array_view.byte_size());
|
ASSERT_EQ(long_array.byte_size(), long_array_view.byte_size());
|
||||||
ASSERT_EQ(long_array.get_element_type(),
|
ASSERT_EQ(long_array.get_element_type(),
|
||||||
long_array_view.get_element_type());
|
long_array_view.get_element_type());
|
||||||
|
|
||||||
|
// 3. test string
|
||||||
milvus::proto::schema::ScalarField field_string_data;
|
milvus::proto::schema::ScalarField field_string_data;
|
||||||
milvus::proto::plan::Array field_string_array;
|
milvus::proto::plan::Array field_string_array;
|
||||||
field_string_array.set_same_type(true);
|
field_string_array.set_same_type(true);
|
||||||
@ -94,32 +103,28 @@ TEST(Array, TestConstructArray) {
|
|||||||
}
|
}
|
||||||
auto string_array = Array(field_string_data);
|
auto string_array = Array(field_string_data);
|
||||||
ASSERT_EQ(N, string_array.length());
|
ASSERT_EQ(N, string_array.length());
|
||||||
// ASSERT_EQ(N, string_array.size());
|
|
||||||
for (int i = 0; i < N; ++i) {
|
for (int i = 0; i < N; ++i) {
|
||||||
ASSERT_EQ(string_array.get_data<std::string_view>(i),
|
ASSERT_EQ(string_array.get_data<std::string_view>(i),
|
||||||
std::to_string(i));
|
std::to_string(i));
|
||||||
}
|
}
|
||||||
ASSERT_TRUE(string_array.is_same_array(field_string_array));
|
ASSERT_TRUE(string_array.is_same_array(field_string_array));
|
||||||
std::vector<uint64_t> string_element_offsets;
|
|
||||||
std::vector<uint64_t> string_view_element_offsets;
|
|
||||||
for (auto& offset : string_array.get_offsets()) {
|
|
||||||
string_element_offsets.emplace_back(offset);
|
|
||||||
string_view_element_offsets.emplace_back(offset);
|
|
||||||
}
|
|
||||||
auto string_array_tmp = Array(const_cast<char*>(string_array.data()),
|
auto string_array_tmp = Array(const_cast<char*>(string_array.data()),
|
||||||
|
string_array.length(),
|
||||||
string_array.byte_size(),
|
string_array.byte_size(),
|
||||||
string_array.get_element_type(),
|
string_array.get_element_type(),
|
||||||
std::move(string_element_offsets));
|
string_array.get_offsets_data());
|
||||||
ASSERT_TRUE(string_array_tmp == string_array);
|
ASSERT_TRUE(string_array_tmp == string_array);
|
||||||
auto string_array_view = ArrayView(const_cast<char*>(string_array.data()),
|
auto string_array_view = ArrayView(const_cast<char*>(string_array.data()),
|
||||||
|
string_array.length(),
|
||||||
string_array.byte_size(),
|
string_array.byte_size(),
|
||||||
string_array.get_element_type(),
|
string_array.get_element_type(),
|
||||||
std::move(string_view_element_offsets));
|
string_array.get_offsets_data());
|
||||||
ASSERT_EQ(string_array.length(), string_array_view.length());
|
ASSERT_EQ(string_array.length(), string_array_view.length());
|
||||||
ASSERT_EQ(string_array.byte_size(), string_array_view.byte_size());
|
ASSERT_EQ(string_array.byte_size(), string_array_view.byte_size());
|
||||||
ASSERT_EQ(string_array.get_element_type(),
|
ASSERT_EQ(string_array.get_element_type(),
|
||||||
string_array_view.get_element_type());
|
string_array_view.get_element_type());
|
||||||
|
|
||||||
|
// 4. test bool
|
||||||
milvus::proto::schema::ScalarField field_bool_data;
|
milvus::proto::schema::ScalarField field_bool_data;
|
||||||
milvus::proto::plan::Array field_bool_array;
|
milvus::proto::plan::Array field_bool_array;
|
||||||
field_bool_array.set_same_type(true);
|
field_bool_array.set_same_type(true);
|
||||||
@ -135,19 +140,22 @@ TEST(Array, TestConstructArray) {
|
|||||||
}
|
}
|
||||||
ASSERT_TRUE(bool_array.is_same_array(field_bool_array));
|
ASSERT_TRUE(bool_array.is_same_array(field_bool_array));
|
||||||
auto bool_array_tmp = Array(const_cast<char*>(bool_array.data()),
|
auto bool_array_tmp = Array(const_cast<char*>(bool_array.data()),
|
||||||
|
bool_array.length(),
|
||||||
bool_array.byte_size(),
|
bool_array.byte_size(),
|
||||||
bool_array.get_element_type(),
|
bool_array.get_element_type(),
|
||||||
{});
|
bool_array.get_offsets_data());
|
||||||
ASSERT_TRUE(bool_array_tmp == bool_array);
|
ASSERT_TRUE(bool_array_tmp == bool_array);
|
||||||
auto bool_array_view = ArrayView(const_cast<char*>(bool_array.data()),
|
auto bool_array_view = ArrayView(const_cast<char*>(bool_array.data()),
|
||||||
|
bool_array.length(),
|
||||||
bool_array.byte_size(),
|
bool_array.byte_size(),
|
||||||
bool_array.get_element_type(),
|
bool_array.get_element_type(),
|
||||||
{});
|
bool_array.get_offsets_data());
|
||||||
ASSERT_EQ(bool_array.length(), bool_array_view.length());
|
ASSERT_EQ(bool_array.length(), bool_array_view.length());
|
||||||
ASSERT_EQ(bool_array.byte_size(), bool_array_view.byte_size());
|
ASSERT_EQ(bool_array.byte_size(), bool_array_view.byte_size());
|
||||||
ASSERT_EQ(bool_array.get_element_type(),
|
ASSERT_EQ(bool_array.get_element_type(),
|
||||||
bool_array_view.get_element_type());
|
bool_array_view.get_element_type());
|
||||||
|
|
||||||
|
//5. test float
|
||||||
milvus::proto::schema::ScalarField field_float_data;
|
milvus::proto::schema::ScalarField field_float_data;
|
||||||
milvus::proto::plan::Array field_float_array;
|
milvus::proto::plan::Array field_float_array;
|
||||||
field_float_array.set_same_type(true);
|
field_float_array.set_same_type(true);
|
||||||
@ -163,19 +171,22 @@ TEST(Array, TestConstructArray) {
|
|||||||
}
|
}
|
||||||
ASSERT_TRUE(float_array.is_same_array(field_float_array));
|
ASSERT_TRUE(float_array.is_same_array(field_float_array));
|
||||||
auto float_array_tmp = Array(const_cast<char*>(float_array.data()),
|
auto float_array_tmp = Array(const_cast<char*>(float_array.data()),
|
||||||
|
float_array.length(),
|
||||||
float_array.byte_size(),
|
float_array.byte_size(),
|
||||||
float_array.get_element_type(),
|
float_array.get_element_type(),
|
||||||
{});
|
float_array.get_offsets_data());
|
||||||
ASSERT_TRUE(float_array_tmp == float_array);
|
ASSERT_TRUE(float_array_tmp == float_array);
|
||||||
auto float_array_view = ArrayView(const_cast<char*>(float_array.data()),
|
auto float_array_view = ArrayView(const_cast<char*>(float_array.data()),
|
||||||
|
float_array.length(),
|
||||||
float_array.byte_size(),
|
float_array.byte_size(),
|
||||||
float_array.get_element_type(),
|
float_array.get_element_type(),
|
||||||
{});
|
float_array.get_offsets_data());
|
||||||
ASSERT_EQ(float_array.length(), float_array_view.length());
|
ASSERT_EQ(float_array.length(), float_array_view.length());
|
||||||
ASSERT_EQ(float_array.byte_size(), float_array_view.byte_size());
|
ASSERT_EQ(float_array.byte_size(), float_array_view.byte_size());
|
||||||
ASSERT_EQ(float_array.get_element_type(),
|
ASSERT_EQ(float_array.get_element_type(),
|
||||||
float_array_view.get_element_type());
|
float_array_view.get_element_type());
|
||||||
|
|
||||||
|
//6. test double
|
||||||
milvus::proto::schema::ScalarField field_double_data;
|
milvus::proto::schema::ScalarField field_double_data;
|
||||||
milvus::proto::plan::Array field_double_array;
|
milvus::proto::plan::Array field_double_array;
|
||||||
field_double_array.set_same_type(true);
|
field_double_array.set_same_type(true);
|
||||||
@ -192,14 +203,16 @@ TEST(Array, TestConstructArray) {
|
|||||||
}
|
}
|
||||||
ASSERT_TRUE(double_array.is_same_array(field_double_array));
|
ASSERT_TRUE(double_array.is_same_array(field_double_array));
|
||||||
auto double_array_tmp = Array(const_cast<char*>(double_array.data()),
|
auto double_array_tmp = Array(const_cast<char*>(double_array.data()),
|
||||||
|
double_array.length(),
|
||||||
double_array.byte_size(),
|
double_array.byte_size(),
|
||||||
double_array.get_element_type(),
|
double_array.get_element_type(),
|
||||||
{});
|
double_array.get_offsets_data());
|
||||||
ASSERT_TRUE(double_array_tmp == double_array);
|
ASSERT_TRUE(double_array_tmp == double_array);
|
||||||
auto double_array_view = ArrayView(const_cast<char*>(double_array.data()),
|
auto double_array_view = ArrayView(const_cast<char*>(double_array.data()),
|
||||||
|
double_array.length(),
|
||||||
double_array.byte_size(),
|
double_array.byte_size(),
|
||||||
double_array.get_element_type(),
|
double_array.get_element_type(),
|
||||||
{});
|
double_array.get_offsets_data());
|
||||||
ASSERT_EQ(double_array.length(), double_array_view.length());
|
ASSERT_EQ(double_array.length(), double_array_view.length());
|
||||||
ASSERT_EQ(double_array.byte_size(), double_array_view.byte_size());
|
ASSERT_EQ(double_array.byte_size(), double_array_view.byte_size());
|
||||||
ASSERT_EQ(double_array.get_element_type(),
|
ASSERT_EQ(double_array.get_element_type(),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user