// Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License #include "common/ChunkWriter.h" #include #include #include #include #include "arrow/array/array_binary.h" #include "arrow/array/array_primitive.h" #include "arrow/record_batch.h" #include "common/Chunk.h" #include "common/EasyAssert.h" #include "common/FieldDataInterface.h" #include "common/Geometry.h" #include "common/Types.h" #include "common/VectorTrait.h" #include "simdjson/common_defs.h" #include "simdjson/padded_string.h" #include "storage/FileWriter.h" namespace milvus { void StringChunkWriter::write(std::shared_ptr data) { auto size = 0; std::vector strs; std::vector> batches; std::vector> null_bitmaps; for (auto batch : *data) { auto batch_data = batch.ValueOrDie(); batches.emplace_back(batch_data); auto data = batch_data->column(0); auto array = std::dynamic_pointer_cast(data); for (int i = 0; i < array->length(); i++) { auto str = array->GetView(i); strs.emplace_back(str); size += str.size(); } if (nullable_) { auto null_bitmap_n = (data->length() + 7) / 8; null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); size += null_bitmap_n; } row_nums_ += array->length(); } size += sizeof(uint32_t) * (row_nums_ + 1) + MMAP_STRING_PADDING; if (!file_path_.empty()) { target_ = std::make_shared(file_path_); } else { target_ = std::make_shared(size); } // chunk layout: null bitmap, offset1, offset2, ..., offsetn, str1, str2, ..., strn, padding // write null bitmaps write_null_bit_maps(null_bitmaps); // write data int offset_num = row_nums_ + 1; uint32_t offset_start_pos = target_->tell() + sizeof(uint32_t) * offset_num; std::vector offsets; offsets.reserve(offset_num); for (const auto& str : strs) { offsets.push_back(offset_start_pos); offset_start_pos += str.size(); } offsets.push_back(offset_start_pos); target_->write(offsets.data(), offsets.size() * sizeof(uint32_t)); for (auto str : strs) { target_->write(str.data(), str.size()); } } std::shared_ptr StringChunkWriter::finish() { // write padding, maybe not needed anymore // FIXME char padding[MMAP_STRING_PADDING]; target_->write(padding, MMAP_STRING_PADDING); auto [data, size] = target_->get(); auto mmap_file_raii = file_path_.empty() ? nullptr : std::make_unique(file_path_); return std::make_unique( row_nums_, data, size, nullable_, std::move(mmap_file_raii)); } void JSONChunkWriter::write(std::shared_ptr data) { auto size = 0; std::vector jsons; std::vector> null_bitmaps; for (auto batch : *data) { auto data = batch.ValueOrDie()->column(0); auto array = std::dynamic_pointer_cast(data); for (int i = 0; i < array->length(); i++) { auto str = array->GetView(i); auto json = Json(simdjson::padded_string(str)); size += json.data().size(); jsons.push_back(std::move(json)); } if (nullable_) { auto null_bitmap_n = (data->length() + 7) / 8; null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); size += null_bitmap_n; } row_nums_ += array->length(); } size += sizeof(uint32_t) * (row_nums_ + 1) + simdjson::SIMDJSON_PADDING; if (!file_path_.empty()) { target_ = std::make_shared(file_path_); } else { target_ = std::make_shared(size); } // chunk layout: null bitmaps, offset1, offset2, ... ,json1, json2, ..., jsonn // write null bitmaps write_null_bit_maps(null_bitmaps); int offset_num = row_nums_ + 1; uint32_t offset_start_pos = target_->tell() + sizeof(uint32_t) * offset_num; std::vector offsets; offsets.reserve(offset_num); for (const auto& json : jsons) { offsets.push_back(offset_start_pos); offset_start_pos += json.data().size(); } offsets.push_back(offset_start_pos); target_->write(offsets.data(), offset_num * sizeof(uint32_t)); // write data for (const auto& json : jsons) { target_->write(json.data().data(), json.data().size()); } } std::shared_ptr JSONChunkWriter::finish() { char padding[simdjson::SIMDJSON_PADDING]; target_->write(padding, simdjson::SIMDJSON_PADDING); auto [data, size] = target_->get(); auto mmap_file_raii = file_path_.empty() ? nullptr : std::make_unique(file_path_); return std::make_unique( row_nums_, data, size, nullable_, std::move(mmap_file_raii)); } void GeometryChunkWriter::write(std::shared_ptr data) { auto size = 0; std::vector wkb_strs; std::vector> null_bitmaps; for (auto batch : *data) { auto data = batch.ValueOrDie()->column(0); auto array = std::dynamic_pointer_cast(data); for (int i = 0; i < array->length(); i++) { auto str = array->GetView(i); wkb_strs.emplace_back(str); size += str.size(); } if (nullable_) { auto null_bitmap_n = (data->length() + 7) / 8; null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); size += null_bitmap_n; } row_nums_ += array->length(); } // use 32-bit offsets to align with StringChunk layout size += sizeof(uint32_t) * (row_nums_ + 1) + MMAP_GEOMETRY_PADDING; if (!file_path_.empty()) { target_ = std::make_shared(file_path_); } else { target_ = std::make_shared(size); } // chunk layout: null bitmap, offset1, offset2, ..., offsetn, wkb1, wkb2, ..., wkbn, padding // write null bitmaps write_null_bit_maps(null_bitmaps); int offset_num = row_nums_ + 1; uint32_t offset_start_pos = static_cast(target_->tell() + sizeof(uint32_t) * offset_num); std::vector offsets; for (auto str : wkb_strs) { offsets.push_back(offset_start_pos); offset_start_pos += str.size(); } offsets.push_back(offset_start_pos); target_->write(offsets.data(), offsets.size() * sizeof(uint32_t)); for (auto str : wkb_strs) { target_->write(str.data(), str.size()); } } std::shared_ptr GeometryChunkWriter::finish() { // write padding, maybe not needed anymore // FIXME char padding[MMAP_GEOMETRY_PADDING]; target_->write(padding, MMAP_GEOMETRY_PADDING); auto [data, size] = target_->get(); return std::make_shared(row_nums_, data, size, nullable_); } void ArrayChunkWriter::write(std::shared_ptr data) { auto size = 0; auto is_string = IsStringDataType(element_type_); std::vector arrays; std::vector> null_bitmaps; for (auto batch : *data) { auto data = batch.ValueOrDie()->column(0); auto array = std::dynamic_pointer_cast(data); for (int i = 0; i < array->length(); i++) { auto str = array->GetView(i); ScalarArray scalar_array; scalar_array.ParseFromArray(str.data(), str.size()); auto arr = Array(scalar_array); size += arr.byte_size(); arrays.push_back(std::move(arr)); if (is_string) { // element offsets size size += sizeof(uint32_t) * arr.length(); } } row_nums_ += array->length(); if (nullable_) { auto null_bitmap_n = (data->length() + 7) / 8; null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); size += null_bitmap_n; } } // offsets + lens size += sizeof(uint32_t) * (row_nums_ * 2 + 1) + MMAP_ARRAY_PADDING; if (!file_path_.empty()) { target_ = std::make_shared(file_path_); } else { target_ = std::make_shared(size); } // chunk layout: nullbitmaps, offsets, elem_off1, elem_off2, .. data1, data2, ..., datan, padding write_null_bit_maps(null_bitmaps); int offsets_num = row_nums_ + 1; int len_num = row_nums_; uint32_t offset_start_pos = target_->tell() + sizeof(uint32_t) * (offsets_num + len_num); std::vector offsets(offsets_num); std::vector lens(len_num); for (auto i = 0; i < arrays.size(); i++) { auto& arr = arrays[i]; offsets[i] = offset_start_pos; lens[i] = arr.length(); offset_start_pos += is_string ? sizeof(uint32_t) * lens[i] : 0; offset_start_pos += arr.byte_size(); } if (offsets_num > 0) { offsets[offsets_num - 1] = offset_start_pos; } for (int i = 0; i < offsets.size(); i++) { if (i == offsets.size() - 1) { target_->write(&offsets[i], sizeof(uint32_t)); break; } target_->write(&offsets[i], sizeof(uint32_t)); target_->write(&lens[i], sizeof(uint32_t)); } for (auto& arr : arrays) { if (is_string) { target_->write(arr.get_offsets_data(), arr.length() * sizeof(uint32_t)); } target_->write(arr.data(), arr.byte_size()); } } std::shared_ptr ArrayChunkWriter::finish() { char padding[MMAP_ARRAY_PADDING]; target_->write(padding, MMAP_ARRAY_PADDING); auto [data, size] = target_->get(); auto mmap_file_raii = file_path_.empty() ? nullptr : std::make_unique(file_path_); return std::make_unique(row_nums_, data, size, element_type_, nullable_, std::move(mmap_file_raii)); } void SparseFloatVectorChunkWriter::write( std::shared_ptr data) { auto size = 0; std::vector strs; std::vector> null_bitmaps; for (auto batch : *data) { auto data = batch.ValueOrDie()->column(0); auto array = std::dynamic_pointer_cast(data); for (int i = 0; i < array->length(); i++) { auto str = array->GetView(i); strs.emplace_back(str); size += str.size(); } auto null_bitmap_n = (data->length() + 7) / 8; null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); size += null_bitmap_n; row_nums_ += array->length(); } size += sizeof(uint64_t) * (row_nums_ + 1); if (!file_path_.empty()) { target_ = std::make_shared(file_path_); } else { target_ = std::make_shared(size); } // chunk layout: null bitmap, offset1, offset2, ..., offsetn, str1, str2, ..., strn // write null bitmaps for (auto [data, size] : null_bitmaps) { if (data == nullptr) { std::vector null_bitmap(size, 0xff); target_->write(null_bitmap.data(), size); } else { target_->write(data, size); } } // write data int offset_num = row_nums_ + 1; int offset_start_pos = target_->tell() + sizeof(uint64_t) * offset_num; std::vector offsets; for (const auto& str : strs) { offsets.push_back(offset_start_pos); offset_start_pos += str.size(); } offsets.push_back(offset_start_pos); target_->write(offsets.data(), offsets.size() * sizeof(uint64_t)); for (auto str : strs) { target_->write(str.data(), str.size()); } } std::shared_ptr SparseFloatVectorChunkWriter::finish() { auto [data, size] = target_->get(); auto mmap_file_raii = file_path_.empty() ? nullptr : std::make_unique(file_path_); return std::make_unique( row_nums_, data, size, nullable_, std::move(mmap_file_raii)); } template std::shared_ptr create_chunk_writer(const FieldMeta& field_meta, Args&&... args) { int dim = IsVectorDataType(field_meta.get_data_type()) && !IsSparseFloatVectorDataType(field_meta.get_data_type()) ? field_meta.get_dim() : 1; bool nullable = field_meta.is_nullable(); switch (field_meta.get_data_type()) { case milvus::DataType::BOOL: return std::make_shared>( dim, std::forward(args)..., nullable); case milvus::DataType::INT8: return std::make_shared>( dim, std::forward(args)..., nullable); case milvus::DataType::INT16: return std::make_shared>( dim, std::forward(args)..., nullable); case milvus::DataType::INT32: return std::make_shared>( dim, std::forward(args)..., nullable); case milvus::DataType::INT64: return std::make_shared>( dim, std::forward(args)..., nullable); case milvus::DataType::FLOAT: return std::make_shared>( dim, std::forward(args)..., nullable); case milvus::DataType::DOUBLE: return std::make_shared>( dim, std::forward(args)..., nullable); case milvus::DataType::VECTOR_FLOAT: return std::make_shared< ChunkWriter>( dim, std::forward(args)..., nullable); case milvus::DataType::VECTOR_BINARY: return std::make_shared< ChunkWriter>( dim / 8, std::forward(args)..., nullable); case milvus::DataType::VECTOR_FLOAT16: return std::make_shared< ChunkWriter>( dim, std::forward(args)..., nullable); case milvus::DataType::VECTOR_BFLOAT16: return std::make_shared< ChunkWriter>( dim, std::forward(args)..., nullable); case milvus::DataType::VARCHAR: case milvus::DataType::STRING: return std::make_shared( std::forward(args)..., nullable); case milvus::DataType::JSON: return std::make_shared( std::forward(args)..., nullable); case milvus::DataType::GEOMETRY: { return std::make_shared( std::forward(args)..., nullable); } case milvus::DataType::ARRAY: return std::make_shared( field_meta.get_element_type(), std::forward(args)..., nullable); case milvus::DataType::VECTOR_SPARSE_FLOAT: return std::make_shared( std::forward(args)..., nullable); default: PanicInfo(Unsupported, "Unsupported data type"); } } std::shared_ptr create_chunk(const FieldMeta& field_meta, std::shared_ptr r) { auto cw = create_chunk_writer(field_meta); cw->write(std::move(r)); return cw->finish(); } std::shared_ptr create_chunk(const FieldMeta& field_meta, std::shared_ptr r, const std::string& file_path) { auto cw = create_chunk_writer(field_meta, file_path); cw->write(std::move(r)); return cw->finish(); } } // namespace milvus