feat: Storage v2 growing segment load (#41001)

Support parallel loading of sealed and growing segments in the storage v2
format by asynchronously reading row groups.
related: #39173

---------

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
sthuang 2025-04-16 17:14:33 +08:00 committed by GitHub
parent d4b56ea4ab
commit 1f1c836fb9
GPG Key ID: B5690EEEBB952194
23 changed files with 1525 additions and 165 deletions

View File

@ -12,11 +12,13 @@
#include "common/ChunkWriter.h"
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>
#include "arrow/array/array_binary.h"
#include "arrow/array/array_primitive.h"
#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
#include "common/Chunk.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
@ -27,15 +29,11 @@
namespace milvus {
void
StringChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
StringChunkWriter::write(const arrow::ArrayVector& array_vec) {
auto size = 0;
std::vector<std::string_view> strs;
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
auto batch_data = batch.ValueOrDie();
batches.emplace_back(batch_data);
auto data = batch_data->column(0);
for (const auto& data : array_vec) {
auto array = std::dynamic_pointer_cast<arrow::StringArray>(data);
for (int i = 0; i < array->length(); i++) {
auto str = array->GetView(i);
@ -89,12 +87,11 @@ StringChunkWriter::finish() {
}
void
JSONChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
JSONChunkWriter::write(const arrow::ArrayVector& array_vec) {
auto size = 0;
std::vector<Json> jsons;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
auto data = batch.ValueOrDie()->column(0);
for (const auto& data : array_vec) {
auto array = std::dynamic_pointer_cast<arrow::BinaryArray>(data);
for (int i = 0; i < array->length(); i++) {
auto str = array->GetView(i);
@ -148,14 +145,13 @@ JSONChunkWriter::finish() {
}
void
ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
ArrayChunkWriter::write(const arrow::ArrayVector& array_vec) {
auto size = 0;
auto is_string = IsStringDataType(element_type_);
std::vector<Array> arrays;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
auto data = batch.ValueOrDie()->column(0);
for (const auto& data : array_vec) {
auto array = std::dynamic_pointer_cast<arrow::BinaryArray>(data);
for (int i = 0; i < array->length(); i++) {
auto str = array->GetView(i);
@ -234,13 +230,11 @@ ArrayChunkWriter::finish() {
}
void
SparseFloatVectorChunkWriter::write(
std::shared_ptr<arrow::RecordBatchReader> data) {
SparseFloatVectorChunkWriter::write(const arrow::ArrayVector& array_vec) {
auto size = 0;
std::vector<std::string> strs;
std::vector<std::pair<const uint8_t*, int64_t>> null_bitmaps;
for (auto batch : *data) {
auto data = batch.ValueOrDie()->column(0);
for (const auto& data : array_vec) {
auto array = std::dynamic_pointer_cast<arrow::BinaryArray>(data);
for (int i = 0; i < array->length(); i++) {
auto str = array->GetView(i);
@ -299,7 +293,7 @@ SparseFloatVectorChunkWriter::finish() {
std::shared_ptr<Chunk>
create_chunk(const FieldMeta& field_meta,
int dim,
std::shared_ptr<arrow::RecordBatchReader> r) {
const arrow::ArrayVector& array_vec) {
std::shared_ptr<ChunkWriterBase> w;
bool nullable = field_meta.is_nullable();
@ -392,7 +386,7 @@ create_chunk(const FieldMeta& field_meta,
PanicInfo(Unsupported, "Unsupported data type");
}
w->write(std::move(r));
w->write(array_vec);
return w->finish();
}
@ -401,7 +395,7 @@ create_chunk(const FieldMeta& field_meta,
int dim,
File& file,
size_t file_offset,
std::shared_ptr<arrow::RecordBatchReader> r) {
const arrow::ArrayVector& array_vec) {
std::shared_ptr<ChunkWriterBase> w;
bool nullable = field_meta.is_nullable();
@ -496,8 +490,18 @@ create_chunk(const FieldMeta& field_meta,
PanicInfo(Unsupported, "Unsupported data type");
}
w->write(std::move(r));
w->write(array_vec);
return w->finish();
}
arrow::ArrayVector
read_single_column_batches(std::shared_ptr<arrow::RecordBatchReader> reader) {
arrow::ArrayVector array_vec;
for (auto batch : *reader) {
auto batch_data = batch.ValueOrDie();
array_vec.push_back(std::move(batch_data->column(0)));
}
return array_vec;
}
} // namespace milvus
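
A minimal usage sketch of the new entry points above (assuming a single-column arrow::RecordBatchReader named rb_reader and a populated field_meta, neither of which is defined in this diff): the reader is drained once into an ArrayVector, which is then handed to create_chunk.

// Sketch: materialize the batches once, then build a chunk from the arrays.
arrow::ArrayVector array_vec = milvus::read_single_column_batches(rb_reader);
auto chunk = milvus::create_chunk(field_meta, /*dim=*/1, array_vec);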

View File

@ -16,6 +16,7 @@
#include <numeric>
#include <vector>
#include "arrow/array/array_primitive.h"
#include "arrow/type_fwd.h"
#include "common/ChunkTarget.h"
#include "arrow/record_batch.h"
#include "common/Chunk.h"
@ -32,7 +33,7 @@ class ChunkWriterBase {
}
virtual void
write(std::shared_ptr<arrow::RecordBatchReader> data) = 0;
write(const arrow::ArrayVector& data) = 0;
virtual std::shared_ptr<Chunk>
finish() = 0;
@ -76,15 +77,12 @@ class ChunkWriter final : public ChunkWriterBase {
: ChunkWriterBase(file, offset, nullable), dim_(dim){};
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override {
write(const arrow::ArrayVector& array_vec) override {
auto size = 0;
auto row_nums = 0;
auto batch_vec = data->ToRecordBatches().ValueOrDie();
for (auto& batch : batch_vec) {
row_nums += batch->num_rows();
auto data = batch->column(0);
for (const auto& data : array_vec) {
row_nums += data->length();
auto array = std::static_pointer_cast<ArrowType>(data);
if (nullable_) {
auto null_bitmap_n = (data->length() + 7) / 8;
@ -104,8 +102,7 @@ class ChunkWriter final : public ChunkWriterBase {
// 2. Data values: Contiguous storage of data elements in the order:
// data1, data2, ..., dataN where each data element has size dim_*sizeof(T)
if (nullable_) {
for (auto& batch : batch_vec) {
auto data = batch->column(0);
for (const auto& data : array_vec) {
auto null_bitmap = data->null_bitmap_data();
auto null_bitmap_n = (data->length() + 7) / 8;
if (null_bitmap) {
@ -117,8 +114,7 @@ class ChunkWriter final : public ChunkWriterBase {
}
}
for (auto& batch : batch_vec) {
auto data = batch->column(0);
for (const auto& data : array_vec) {
auto array = std::static_pointer_cast<ArrowType>(data);
auto data_ptr = array->raw_values();
target_->write(data_ptr, array->length() * dim_ * sizeof(T));
@ -139,14 +135,12 @@ class ChunkWriter final : public ChunkWriterBase {
template <>
inline void
ChunkWriter<arrow::BooleanArray, bool>::write(
std::shared_ptr<arrow::RecordBatchReader> data) {
const arrow::ArrayVector& array_vec) {
auto size = 0;
auto row_nums = 0;
auto batch_vec = data->ToRecordBatches().ValueOrDie();
for (auto& batch : batch_vec) {
row_nums += batch->num_rows();
auto data = batch->column(0);
for (const auto& data : array_vec) {
row_nums += data->length();
auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(data);
size += array->length() * dim_;
size += (data->length() + 7) / 8;
@ -160,8 +154,7 @@ ChunkWriter<arrow::BooleanArray, bool>::write(
if (nullable_) {
// chunk layout: nullbitmap, data1, data2, ..., datan
for (auto& batch : batch_vec) {
auto data = batch->column(0);
for (const auto& data : array_vec) {
auto null_bitmap = data->null_bitmap_data();
auto null_bitmap_n = (data->length() + 7) / 8;
if (null_bitmap) {
@ -173,8 +166,7 @@ ChunkWriter<arrow::BooleanArray, bool>::write(
}
}
for (auto& batch : batch_vec) {
auto data = batch->column(0);
for (const auto& data : array_vec) {
auto array = std::dynamic_pointer_cast<arrow::BooleanArray>(data);
for (int i = 0; i < array->length(); i++) {
auto value = array->Value(i);
@ -188,7 +180,7 @@ class StringChunkWriter : public ChunkWriterBase {
using ChunkWriterBase::ChunkWriterBase;
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override;
write(const arrow::ArrayVector& array_vec) override;
std::shared_ptr<Chunk>
finish() override;
@ -199,7 +191,7 @@ class JSONChunkWriter : public ChunkWriterBase {
using ChunkWriterBase::ChunkWriterBase;
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override;
write(const arrow::ArrayVector& array_vec) override;
std::shared_ptr<Chunk>
finish() override;
@ -218,7 +210,7 @@ class ArrayChunkWriter : public ChunkWriterBase {
}
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override;
write(const arrow::ArrayVector& array_vec) override;
std::shared_ptr<Chunk>
finish() override;
@ -232,7 +224,7 @@ class SparseFloatVectorChunkWriter : public ChunkWriterBase {
using ChunkWriterBase::ChunkWriterBase;
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override;
write(const arrow::ArrayVector& array_vec) override;
std::shared_ptr<Chunk>
finish() override;
@ -241,12 +233,16 @@ class SparseFloatVectorChunkWriter : public ChunkWriterBase {
std::shared_ptr<Chunk>
create_chunk(const FieldMeta& field_meta,
int dim,
std::shared_ptr<arrow::RecordBatchReader> r);
const arrow::ArrayVector& array_vec);
std::shared_ptr<Chunk>
create_chunk(const FieldMeta& field_meta,
int dim,
File& file,
size_t file_offset,
std::shared_ptr<arrow::RecordBatchReader> r);
const arrow::ArrayVector& array_vec);
arrow::ArrayVector
read_single_column_batches(std::shared_ptr<arrow::RecordBatchReader> reader);
} // namespace milvus

View File

@ -17,6 +17,7 @@
#include "common/FieldData.h"
#include "arrow/array/array_binary.h"
#include "arrow/chunked_array.h"
#include "common/Array.h"
#include "common/EasyAssert.h"
#include "common/Exception.h"
@ -91,6 +92,20 @@ GetDataInfoFromArray(const std::shared_ptr<arrow::Array> array) {
return std::make_pair(typed_array->raw_values(), element_count);
}
template <typename Type, bool is_type_entire_row>
void
FieldDataImpl<Type, is_type_entire_row>::FillFieldData(
const std::shared_ptr<arrow::ChunkedArray> arrays) {
AssertInfo(arrays != nullptr, "null arrow chunked array");
auto element_count = arrays->length();
if (element_count == 0) {
return;
}
for (const auto& array : arrays->chunks()) {
FillFieldData(array);
}
}
template <typename Type, bool is_type_entire_row>
void
FieldDataImpl<Type, is_type_entire_row>::FillFieldData(

View File

@ -170,6 +170,7 @@ struct ArrowDataWrapper {
std::shared_ptr<parquet::arrow::FileReader> arrow_reader;
// underlying file data memory, must outlive the arrow reader
std::shared_ptr<uint8_t[]> file_data;
std::vector<std::shared_ptr<arrow::Table>> arrow_tables;
};
using ArrowReaderChannel = Channel<std::shared_ptr<milvus::ArrowDataWrapper>>;

View File

@ -28,6 +28,7 @@
#include "Types.h"
#include "arrow/api.h"
#include "arrow/array/array_binary.h"
#include "arrow/chunked_array.h"
#include "common/FieldMeta.h"
#include "common/Utils.h"
#include "common/VectorTrait.h"
@ -56,6 +57,8 @@ class FieldDataBase {
const uint8_t* valid_data,
ssize_t element_count) = 0;
virtual void
FillFieldData(const std::shared_ptr<arrow::ChunkedArray> arrays) = 0;
virtual void
FillFieldData(const std::shared_ptr<arrow::Array> array) = 0;
@ -173,6 +176,14 @@ class FieldBitsetImpl : public FieldDataBase {
"not implemented for bitset");
}
void
FillFieldData(const std::shared_ptr<arrow::ChunkedArray> arrays) override {
PanicInfo(
NotImplemented,
"FillFieldData(const std::shared_ptr<arrow::ChunkedArray>& arrays) "
"not implemented for bitset");
}
void
FillFieldData(const std::optional<DefaultValueType> default_value,
ssize_t element_count) override {
@ -367,6 +378,9 @@ class FieldDataImpl : public FieldDataBase {
const uint8_t* valid_data,
ssize_t element_count) override;
void
FillFieldData(const std::shared_ptr<arrow::ChunkedArray> arrays) override;
void
FillFieldData(const std::shared_ptr<arrow::Array> array) override;

View File

@ -17,11 +17,15 @@
#include <cstddef>
#include <optional>
#include <string>
#include "arrow/type.h"
#include <boost/lexical_cast.hpp>
#include <google/protobuf/text_format.h>
#include <memory>
#include "Schema.h"
#include "SystemProperty.h"
#include "arrow/util/key_value_metadata.h"
#include "milvus-storage/common/constants.h"
#include "protobuf_utils.h"
namespace milvus {
@ -65,4 +69,24 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) {
const FieldMeta FieldMeta::RowIdMeta(
FieldName("RowID"), RowFieldID, DataType::INT64, false, std::nullopt);
const ArrowSchemaPtr
Schema::ConvertToArrowSchema() const {
arrow::FieldVector arrow_fields;
for (auto& field : fields_) {
auto meta = field.second;
int dim = IsVectorDataType(meta.get_data_type()) &&
!IsSparseFloatVectorDataType(meta.get_data_type())
? meta.get_dim()
: 1;
auto arrow_field = std::make_shared<arrow::Field>(
meta.get_name().get(),
GetArrowDataType(meta.get_data_type(), dim),
meta.is_nullable(),
arrow::key_value_metadata({milvus_storage::ARROW_FIELD_ID_KEY},
{std::to_string(meta.get_id().get())}));
arrow_fields.push_back(arrow_field);
}
return arrow::schema(arrow_fields);
}
} // namespace milvus
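
For illustration, a sketch of the arrow::Field this conversion emits (the field name "pk" and id 101 are assumed, not taken from this diff): the Milvus field id travels in the Arrow key-value metadata under ARROW_FIELD_ID_KEY.

// Sketch: a non-nullable INT64 field with id 101 becomes
auto f = arrow::field(
    "pk",
    arrow::int64(),
    /*nullable=*/false,
    arrow::key_value_metadata({milvus_storage::ARROW_FIELD_ID_KEY}, {"101"}));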

View File

@ -27,8 +27,11 @@
#include "pb/schema.pb.h"
#include "Consts.h"
#include "arrow/type.h"
namespace milvus {
using ArrowSchemaPtr = std::shared_ptr<arrow::Schema>;
static int64_t debug_id = START_USER_FIELDID;
class Schema {
@ -243,6 +246,9 @@ class Schema {
return dynamic_field_id_opt_;
}
const ArrowSchemaPtr
ConvertToArrowSchema() const;
public:
static std::shared_ptr<Schema>
ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto);

View File

@ -36,6 +36,8 @@
#include <variant>
#include <vector>
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "fmt/core.h"
#include "knowhere/binaryset.h"
#include "knowhere/comp/index_param.h"
@ -148,6 +150,50 @@ GetDataTypeSize(DataType data_type, int dim = 1) {
}
}
inline std::shared_ptr<arrow::DataType>
GetArrowDataType(DataType data_type, int dim = 1) {
switch (data_type) {
case DataType::BOOL:
return arrow::boolean();
case DataType::INT8:
return arrow::int8();
case DataType::INT16:
return arrow::int16();
case DataType::INT32:
return arrow::int32();
case DataType::INT64:
return arrow::int64();
case DataType::FLOAT:
return arrow::float32();
case DataType::DOUBLE:
return arrow::float64();
case DataType::STRING:
case DataType::VARCHAR:
case DataType::TEXT:
return arrow::utf8();
case DataType::ARRAY:
case DataType::JSON:
return arrow::binary();
case DataType::VECTOR_FLOAT:
return arrow::fixed_size_binary(dim * 4);
case DataType::VECTOR_BINARY: {
return arrow::fixed_size_binary((dim + 7) / 8);
}
case DataType::VECTOR_FLOAT16:
case DataType::VECTOR_BFLOAT16:
return arrow::fixed_size_binary(dim * 2);
case DataType::VECTOR_SPARSE_FLOAT:
return arrow::binary();
case DataType::VECTOR_INT8:
return arrow::fixed_size_binary(dim);
default: {
PanicInfo(DataTypeInvalid,
fmt::format("failed to get data type, invalid type {}",
data_type));
}
}
}
template <typename T>
inline size_t
GetVecRowSize(int64_t dim) {

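As a quick sanity check of the vector branches above (the 128-dim values are illustrative assumptions):

// Sketch: fixed-size-binary widths produced by GetArrowDataType.
auto t1 = GetArrowDataType(DataType::VECTOR_FLOAT, 128);    // fixed_size_binary(512)
auto t2 = GetArrowDataType(DataType::VECTOR_BINARY, 128);   // fixed_size_binary(16)
auto t3 = GetArrowDataType(DataType::VECTOR_FLOAT16, 128);  // fixed_size_binary(256)
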
View File

@ -285,8 +285,10 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
std::nullopt);
std::shared_ptr<milvus::ArrowDataWrapper> r;
while (data.arrow_reader_channel->pop(r)) {
arrow::ArrayVector array_vec =
read_single_column_batches(r->reader);
auto chunk = std::dynamic_pointer_cast<FixedWidthChunk>(
create_chunk(field_meta, 1, r->reader));
create_chunk(field_meta, 1, array_vec));
std::copy_n(static_cast<const Timestamp*>(chunk->Span().data()),
chunk->Span().row_count(),
timestamps.data() + offset);
@ -349,7 +351,9 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
field_meta);
std::shared_ptr<milvus::ArrowDataWrapper> r;
while (data.arrow_reader_channel->pop(r)) {
auto chunk = create_chunk(field_meta, 1, r->reader);
arrow::ArrayVector array_vec =
read_single_column_batches(r->reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
var_column->AddChunk(chunk);
}
// var_column->Seal();
@ -365,7 +369,9 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
field_meta);
std::shared_ptr<milvus::ArrowDataWrapper> r;
while (data.arrow_reader_channel->pop(r)) {
auto chunk = create_chunk(field_meta, 1, r->reader);
arrow::ArrayVector array_vec =
read_single_column_batches(r->reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
var_column->AddChunk(chunk);
}
// var_column->Seal();
@ -379,7 +385,9 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
std::make_shared<ChunkedArrayColumn>(field_meta);
std::shared_ptr<milvus::ArrowDataWrapper> r;
while (data.arrow_reader_channel->pop(r)) {
auto chunk = create_chunk(field_meta, 1, r->reader);
arrow::ArrayVector array_vec =
read_single_column_batches(r->reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
var_column->AddChunk(chunk);
}
column = std::move(var_column);
@ -390,7 +398,9 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
std::make_shared<ChunkedSparseFloatColumn>(field_meta);
std::shared_ptr<milvus::ArrowDataWrapper> r;
while (data.arrow_reader_channel->pop(r)) {
auto chunk = create_chunk(field_meta, 1, r->reader);
arrow::ArrayVector array_vec =
read_single_column_batches(r->reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
col->AddChunk(chunk);
}
column = std::move(col);
@ -409,6 +419,8 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
column = std::make_shared<ChunkedColumn>(field_meta);
std::shared_ptr<milvus::ArrowDataWrapper> r;
while (data.arrow_reader_channel->pop(r)) {
arrow::ArrayVector array_vec =
read_single_column_batches(r->reader);
auto chunk =
create_chunk(field_meta,
IsVectorDataType(field_meta.get_data_type()) &&
@ -416,7 +428,7 @@ ChunkedSegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) {
field_meta.get_data_type())
? field_meta.get_dim()
: 1,
r->reader);
array_vec);
// column->AppendBatch(field_data);
// stats_.mem_size += field_data->Size();
column->AddChunk(chunk);
@ -501,13 +513,7 @@ ChunkedSegmentSealedImpl::MapFieldData(const FieldId field_id,
size_t file_offset = 0;
std::vector<std::shared_ptr<Chunk>> chunks;
while (data.arrow_reader_channel->pop(r)) {
// WriteFieldData(file,
// data_type,
// field_data,
// total_written,
// indices,
// element_indices,
// valid_data);
arrow::ArrayVector array_vec = read_single_column_batches(r->reader);
auto chunk = create_chunk(
field_meta,
IsVectorDataType(field_meta.get_data_type()) &&
@ -516,7 +522,7 @@ ChunkedSegmentSealedImpl::MapFieldData(const FieldId field_id,
: 1,
file,
file_offset,
r->reader);
array_vec);
file_offset += chunk->Size();
chunks.push_back(chunk);
}

View File

@ -18,6 +18,7 @@
#include <thread>
#include <boost/iterator/counting_iterator.hpp>
#include <type_traits>
#include <unordered_map>
#include <variant>
#include "common/Consts.h"
@ -32,10 +33,14 @@
#include "query/SearchOnSealed.h"
#include "segcore/SegmentGrowingImpl.h"
#include "segcore/Utils.h"
#include "segcore/memory_planner.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/Util.h"
#include "storage/ThreadPools.h"
#include "milvus-storage/format/parquet/file_reader.h"
#include "milvus-storage/filesystem/fs.h"
namespace milvus::segcore {
int64_t
@ -230,6 +235,18 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
void
SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
switch (infos.storage_version) {
case 2:
load_column_group_data_internal(infos);
break;
default:
load_field_data_internal(infos);
break;
}
}
void
SegmentGrowingImpl::load_field_data_internal(const LoadFieldDataInfo& infos) {
AssertInfo(infos.field_infos.find(TimestampFieldID.get()) !=
infos.field_infos.end(),
"timestamps field data should be included");
@ -290,77 +307,197 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
this->get_segment_id(),
field_id.get());
auto field_data = storage::CollectFieldDataChannel(channel);
if (field_id == TimestampFieldID) {
// step 2: sort timestamp
// query node already guarantees that the timestamp is ordered, avoid field data copy in c++
load_field_data_common(
field_id, reserved_offset, field_data, primary_field_id, num_rows);
}
// step 3: fill into Segment.ConcurrentVector
insert_record_.timestamps_.set_data_raw(reserved_offset,
field_data);
continue;
// step 5: update small indexes
insert_record_.ack_responder_.AddSegment(reserved_offset,
reserved_offset + num_rows);
}
void
SegmentGrowingImpl::load_field_data_common(
FieldId field_id,
size_t reserved_offset,
const std::vector<FieldDataPtr>& field_data,
FieldId primary_field_id,
size_t num_rows) {
if (field_id == TimestampFieldID) {
// step 2: sort timestamp
// query node already guarantees that the timestamp is ordered, avoid field data copy in c++
// step 3: fill into Segment.ConcurrentVector
insert_record_.timestamps_.set_data_raw(reserved_offset, field_data);
return;
}
if (field_id == RowFieldID) {
return;
}
if (!indexing_record_.SyncDataWithIndex(field_id)) {
if (insert_record_.is_valid_data_exist(field_id)) {
insert_record_.get_valid_data(field_id)->set_data_raw(field_data);
}
if (field_id == RowFieldID) {
continue;
insert_record_.get_data_base(field_id)->set_data_raw(reserved_offset,
field_data);
}
if (segcore_config_.get_enable_interim_segment_index()) {
auto offset = reserved_offset;
for (auto& data : field_data) {
auto row_count = data->get_num_rows();
indexing_record_.AppendingIndex(
offset, row_count, field_id, data, insert_record_);
offset += row_count;
}
}
try_remove_chunks(field_id);
if (!indexing_record_.SyncDataWithIndex(field_id)) {
if (insert_record_.is_valid_data_exist(field_id)) {
insert_record_.get_valid_data(field_id)->set_data_raw(
field_data);
}
insert_record_.get_data_base(field_id)->set_data_raw(
reserved_offset, field_data);
}
if (segcore_config_.get_enable_interim_segment_index()) {
auto offset = reserved_offset;
for (auto& data : field_data) {
auto row_count = data->get_num_rows();
indexing_record_.AppendingIndex(
offset, row_count, field_id, data, insert_record_);
offset += row_count;
}
}
try_remove_chunks(field_id);
if (field_id == primary_field_id) {
insert_record_.insert_pks(field_data);
}
if (field_id == primary_field_id) {
insert_record_.insert_pks(field_data);
}
// update average row data size
auto field_meta = (*schema_)[field_id];
if (IsVariableDataType(field_meta.get_data_type())) {
SegmentInternalInterface::set_field_avg_size(
field_id, num_rows, storage::GetByteSizeOfFieldDatas(field_data));
}
// update average row data size
auto field_meta = (*schema_)[field_id];
if (IsVariableDataType(field_meta.get_data_type())) {
SegmentInternalInterface::set_field_avg_size(
field_id,
num_rows,
storage::GetByteSizeOfFieldDatas(field_data));
}
// build text match index
if (field_meta.enable_match()) {
auto index = GetTextIndex(field_id);
index->BuildIndexFromFieldData(field_data, field_meta.is_nullable());
index->Commit();
// Reload reader so that the index can be read immediately
index->Reload();
}
// build text match index
if (field_meta.enable_match()) {
auto index = GetTextIndex(field_id);
index->BuildIndexFromFieldData(field_data,
field_meta.is_nullable());
index->Commit();
// Reload reader so that the index can be read immediately
index->Reload();
}
// build json match index
if (field_meta.enable_growing_jsonStats()) {
auto index = GetJsonKeyIndex(field_id);
index->BuildWithFieldData(field_data, field_meta.is_nullable());
index->Commit();
// Reload reader so that the index can be read immediately
index->Reload();
}
// build json match index
if (field_meta.enable_growing_jsonStats()) {
auto index = GetJsonKeyIndex(field_id);
index->BuildWithFieldData(field_data, field_meta.is_nullable());
index->Commit();
// Reload reader so that the index can be read immediately
index->Reload();
}
// update the mem size
stats_.mem_size += storage::GetByteSizeOfFieldDatas(field_data);
// update the mem size
stats_.mem_size += storage::GetByteSizeOfFieldDatas(field_data);
LOG_INFO("segment {} loads field {} done",
this->get_segment_id(),
field_id.get());
}
LOG_INFO("segment {} loads field {} done",
void
SegmentGrowingImpl::load_column_group_data_internal(
const LoadFieldDataInfo& infos) {
auto primary_field_id =
schema_->get_primary_field_id().value_or(FieldId(-1));
size_t num_rows = storage::GetNumRowsForLoadInfo(infos);
auto reserved_offset = PreInsert(num_rows);
auto parallel_degree =
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
ArrowSchemaPtr arrow_schema = schema_->ConvertToArrowSchema();
for (auto& [id, info] : infos.field_infos) {
auto column_group_id = FieldId(id);
auto insert_files = info.insert_files;
std::sort(insert_files.begin(),
insert_files.end(),
[](const std::string& a, const std::string& b) {
return std::stol(a.substr(a.find_last_of('/') + 1)) <
std::stol(b.substr(b.find_last_of('/') + 1));
});
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto file_reader = std::make_shared<milvus_storage::FileRowGroupReader>(
fs, insert_files[0], arrow_schema);
std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
file_reader->file_metadata();
auto field_id_mapping = metadata->GetFieldIDMapping();
milvus_storage::FieldIDList field_ids =
metadata->GetGroupFieldIDList().GetFieldIDList(
column_group_id.get());
auto column_group_info =
FieldDataInfo(column_group_id.get(), num_rows, infos.mmap_dir_path);
column_group_info.arrow_reader_channel->set_capacity(parallel_degree);
LOG_INFO("segment {} loads column group {} with num_rows {}",
this->get_segment_id(),
field_id.get());
column_group_id.get(),
num_rows);
auto& pool =
ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
// Get all row groups for each file
std::vector<std::vector<int64_t>> row_group_lists;
row_group_lists.reserve(insert_files.size());
for (const auto& file : insert_files) {
auto reader =
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
auto row_group_num =
reader->file_metadata()->GetRowGroupMetadataVector().size();
std::vector<int64_t> all_row_groups(row_group_num);
std::iota(all_row_groups.begin(), all_row_groups.end(), 0);
row_group_lists.push_back(all_row_groups);
}
// create parallel degree split strategy
auto strategy =
std::make_unique<ParallelDegreeSplitStrategy>(parallel_degree);
auto load_future = pool.Submit([&]() {
return LoadWithStrategy(insert_files,
column_group_info.arrow_reader_channel,
DEFAULT_FIELD_MAX_MEMORY_LIMIT,
std::move(strategy),
row_group_lists);
});
LOG_INFO("segment {} submits load fields {} task to thread pool",
this->get_segment_id(),
field_ids.ToString());
std::shared_ptr<milvus::ArrowDataWrapper> r;
std::unordered_map<FieldId, std::vector<FieldDataPtr>> field_data_map;
while (column_group_info.arrow_reader_channel->pop(r)) {
for (const auto& table : r->arrow_tables) {
size_t batch_num_rows = table->num_rows();
for (int i = 0; i < field_ids.size(); ++i) {
auto field_id = FieldId(field_ids.Get(i));
for (auto& field : schema_->get_fields()) {
if (field.second.get_id().get() != field_id.get()) {
continue;
}
auto field_data = storage::CreateFieldData(
field.second.get_data_type(),
field.second.is_nullable(),
field.second.is_vector() ? field.second.get_dim()
: 0,
batch_num_rows);
field_data->FillFieldData(table->column(i));
field_data_map[field_id].push_back(field_data);
}
}
}
}
for (auto& [field_id, field_data] : field_data_map) {
load_field_data_common(field_id,
reserved_offset,
field_data,
primary_field_id,
num_rows);
}
}
// step 5: update small indexes

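End to end, the new path is selected by LoadFieldDataInfo::storage_version; a minimal caller-side sketch (mirroring the tests added below, with the segment and field infos assumed to be set up already):

// Sketch: storage v2 routes to load_column_group_data_internal.
LoadFieldDataInfo load_info;
load_info.storage_version = 2;  // any other value falls back to load_field_data_internal
segment->LoadFieldData(load_info);
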
View File

@ -84,6 +84,19 @@ class SegmentGrowingImpl : public SegmentGrowing {
void
CreateTextIndex(FieldId field_id) override;
void
load_field_data_internal(const LoadFieldDataInfo& load_info);
void
load_column_group_data_internal(const LoadFieldDataInfo& load_info);
void
load_field_data_common(FieldId field_id,
size_t reserved_offset,
const std::vector<FieldDataPtr>& field_data,
FieldId primary_field_id,
size_t num_rows);
public:
const InsertRecord<>&
get_insert_record() const {

View File

@ -0,0 +1,245 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <cstddef>
#include <algorithm>
#include "milvus-storage/common/metadata.h"
#include "segcore/memory_planner.h"
#include <memory>
#include <vector>
#include <arrow/record_batch.h>
#include <future>
#include <string>
#include "arrow/type.h"
#include "common/EasyAssert.h"
#include "common/FieldData.h"
#include "milvus-storage/format/parquet/file_reader.h"
#include "milvus-storage/filesystem/fs.h"
#include "log/Log.h"
#include "storage/ThreadPools.h"
namespace milvus::segcore {
MemoryBasedSplitStrategy::MemoryBasedSplitStrategy(
const milvus_storage::RowGroupMetadataVector& row_group_metadatas)
: row_group_metadatas_(row_group_metadatas) {
}
ParallelDegreeSplitStrategy::ParallelDegreeSplitStrategy(
uint64_t parallel_degree)
: parallel_degree_(parallel_degree) {
}
std::vector<RowGroupBlock>
MemoryBasedSplitStrategy::split(const std::vector<int64_t>& input_row_groups) {
std::vector<RowGroupBlock> blocks;
if (input_row_groups.empty()) {
return blocks;
}
std::vector<int64_t> sorted_row_groups = input_row_groups;
std::sort(sorted_row_groups.begin(), sorted_row_groups.end());
int64_t current_start = sorted_row_groups[0];
int64_t current_count = 1;
int64_t current_memory =
row_group_metadatas_.Get(current_start).memory_size();
for (size_t i = 1; i < sorted_row_groups.size(); ++i) {
int64_t next_row_group = sorted_row_groups[i];
int64_t next_memory =
row_group_metadatas_.Get(next_row_group).memory_size();
if (next_row_group == current_start + current_count &&
current_memory + next_memory <= MAX_ROW_GROUP_BLOCK_MEMORY) {
current_count++;
current_memory += next_memory;
continue;
}
blocks.push_back({current_start, current_count});
current_start = next_row_group;
current_count = 1;
current_memory = next_memory;
}
if (current_count > 0) {
blocks.push_back({current_start, current_count});
}
return blocks;
}
std::vector<RowGroupBlock>
ParallelDegreeSplitStrategy::split(
const std::vector<int64_t>& input_row_groups) {
std::vector<RowGroupBlock> blocks;
if (input_row_groups.empty()) {
return blocks;
}
std::vector<int64_t> sorted_row_groups = input_row_groups;
std::sort(sorted_row_groups.begin(), sorted_row_groups.end());
uint64_t actual_parallel_degree = std::min(
parallel_degree_, static_cast<uint64_t>(sorted_row_groups.size()));
if (actual_parallel_degree == 0) {
return blocks;
}
// If the number of row groups is at most the parallel degree,
// merge contiguous runs and split only at discontinuities
if (sorted_row_groups.size() <= actual_parallel_degree) {
int64_t current_start = sorted_row_groups[0];
int64_t current_count = 1;
for (size_t i = 1; i < sorted_row_groups.size(); ++i) {
int64_t next_row_group = sorted_row_groups[i];
if (next_row_group == current_start + current_count) {
current_count++;
continue;
}
blocks.push_back({current_start, current_count});
current_start = next_row_group;
current_count = 1;
}
if (current_count > 0) {
blocks.push_back({current_start, current_count});
}
return blocks;
}
// Otherwise, split based on parallel degree
size_t avg_block_size =
(sorted_row_groups.size() + actual_parallel_degree - 1) /
actual_parallel_degree;
int64_t current_start = sorted_row_groups[0];
int64_t current_count = 1;
for (size_t i = 1; i < sorted_row_groups.size(); ++i) {
int64_t next_row_group = sorted_row_groups[i];
if (next_row_group == current_start + current_count &&
current_count < avg_block_size) {
current_count++;
} else {
blocks.push_back({current_start, current_count});
current_start = next_row_group;
current_count = 1;
}
}
if (current_count > 0) {
blocks.push_back({current_start, current_count});
}
return blocks;
}
void
LoadWithStrategy(const std::vector<std::string>& remote_files,
std::shared_ptr<ArrowReaderChannel> channel,
int64_t memory_limit,
std::unique_ptr<RowGroupSplitStrategy> strategy,
const std::vector<std::vector<int64_t>>& row_group_lists) {
try {
AssertInfo(
remote_files.size() == row_group_lists.size(),
"Number of remote files must match number of row group lists");
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto& pool = ThreadPools::GetThreadPool(ThreadPoolPriority::HIGH);
for (size_t file_idx = 0; file_idx < remote_files.size(); ++file_idx) {
const auto& file = remote_files[file_idx];
const auto& row_groups = row_group_lists[file_idx];
if (row_groups.empty()) {
continue;
}
auto file_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(
fs, file, memory_limit);
auto metadata = file_reader->file_metadata();
milvus_storage::RowGroupMetadataVector row_group_metadatas =
metadata->GetRowGroupMetadataVector();
// Use provided strategy to split row groups
auto blocks = strategy->split(row_groups);
// Create and submit tasks for each block
std::vector<std::future<std::shared_ptr<milvus::ArrowDataWrapper>>>
futures;
futures.reserve(blocks.size());
// split the memory limit across blocks, clamping each reader's share
// to at least FILE_SLICE_SIZE
auto reader_memory_limit = memory_limit / blocks.size();
if (reader_memory_limit < FILE_SLICE_SIZE) {
reader_memory_limit = FILE_SLICE_SIZE;
}
for (const auto& block : blocks) {
futures.emplace_back(pool.Submit([block,
fs,
file,
reader_memory_limit]() {
auto row_group_reader =
std::make_shared<milvus_storage::FileRowGroupReader>(
fs, file, nullptr, reader_memory_limit);
row_group_reader->SetRowGroupOffsetAndCount(block.offset,
block.count);
auto ret = std::make_shared<ArrowDataWrapper>();
for (int64_t i = 0; i < block.count; ++i) {
std::shared_ptr<arrow::Table> table;
auto status =
row_group_reader->ReadNextRowGroup(&table);
AssertInfo(status.ok(),
"Failed to read row group " +
std::to_string(block.offset + i) +
" from file " + file + " with error " +
status.ToString());
ret->arrow_tables.push_back(table);
}
auto close_status = row_group_reader->Close();
AssertInfo(close_status.ok(),
"Failed to close row group reader for file " +
file + " with error " +
close_status.ToString());
return ret;
}));
}
for (auto& future : futures) {
auto field_data = future.get();
channel->push(field_data);
}
}
channel->close();
} catch (std::exception& e) {
LOG_INFO("failed to load data from remote: {}", e.what());
channel->close();
}
}
} // namespace milvus::segcore
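
For intuition, a worked example of the parallel-degree strategy (the numbers are assumptions, not from this diff): ten contiguous row groups at parallel degree 4 give avg_block_size = ceil(10 / 4) = 3, so the split produces blocks of sizes 3, 3, 3 and 1.

// Sketch: expected output of ParallelDegreeSplitStrategy::split.
ParallelDegreeSplitStrategy strategy(/*parallel_degree=*/4);
std::vector<int64_t> row_groups = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
auto blocks = strategy.split(row_groups);
// blocks == {{0, 3}, {3, 3}, {6, 3}, {9, 1}} as (offset, count) pairs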

View File

@ -0,0 +1,87 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <vector>
#include <cstdint>
#include <cstddef>
#include "milvus-storage/common/metadata.h"
#include <arrow/record_batch.h>
#include "common/FieldData.h"
namespace milvus::segcore {
struct RowGroupBlock {
int64_t offset; // Start offset of the row group block
int64_t count; // Number of row groups in this block
bool
operator==(const RowGroupBlock& other) const {
return offset == other.offset && count == other.count;
}
};
const std::size_t MAX_ROW_GROUP_BLOCK_MEMORY = 16 << 20;  // 16 MiB
// Strategy interface for row group splitting
class RowGroupSplitStrategy {
public:
virtual ~RowGroupSplitStrategy() = default;
virtual std::vector<RowGroupBlock>
split(const std::vector<int64_t>& input_row_groups) = 0;
};
// Memory-based splitting strategy
class MemoryBasedSplitStrategy : public RowGroupSplitStrategy {
public:
explicit MemoryBasedSplitStrategy(
const milvus_storage::RowGroupMetadataVector& row_group_metadatas);
std::vector<RowGroupBlock>
split(const std::vector<int64_t>& input_row_groups) override;
private:
const milvus_storage::RowGroupMetadataVector& row_group_metadatas_;
};
// Parallel degree based splitting strategy
class ParallelDegreeSplitStrategy : public RowGroupSplitStrategy {
public:
explicit ParallelDegreeSplitStrategy(uint64_t parallel_degree);
std::vector<RowGroupBlock>
split(const std::vector<int64_t>& input_row_groups) override;
private:
uint64_t parallel_degree_;
};
/*
* Load storage v2 files with the specified strategy. The number of row group
* readers is determined by the strategy.
*
* @param remote_files: list of remote files to load
* @param channel: channel that receives the loaded Arrow tables
* @param memory_limit: total memory budget, split across the row group readers
* @param strategy: strategy used to split row groups into blocks
* @param row_group_lists: per-file lists of row group indices to load
*/
void
LoadWithStrategy(const std::vector<std::string>& remote_files,
std::shared_ptr<ArrowReaderChannel> channel,
int64_t memory_limit,
std::unique_ptr<RowGroupSplitStrategy> strategy,
const std::vector<std::vector<int64_t>>& row_group_lists);
} // namespace milvus::segcore
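
On the consumer side (a sketch; the channel and its producer are assumed to be set up as in LoadWithStrategy): each popped ArrowDataWrapper carries one Arrow table per row group read.

// Sketch: drain the row-group tables pushed by LoadWithStrategy.
std::shared_ptr<milvus::ArrowDataWrapper> wrapper;
while (channel->pop(wrapper)) {
    for (const auto& table : wrapper->arrow_tables) {
        // consume one row group worth of rows, e.g. table->num_rows()
    }
}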

View File

@ -81,8 +81,10 @@ ChunkCache::Read(const std::string& filepath,
std::filesystem::create_directories(dir);
auto file = File::Open(path.string(), O_CREAT | O_TRUNC | O_RDWR);
chunk = create_chunk(
field_meta, dim, file, 0, field_data->GetReader()->reader);
arrow::ArrayVector array_vec =
read_single_column_batches(field_data->GetReader()->reader);
chunk = create_chunk(field_meta, dim, file, 0, array_vec);
// unlink
auto ok = unlink(path.c_str());
AssertInfo(ok == 0,
@ -90,8 +92,9 @@ ChunkCache::Read(const std::string& filepath,
path.c_str(),
strerror(errno));
} else {
chunk =
create_chunk(field_meta, dim, field_data->GetReader()->reader);
arrow::ArrayVector array_vec =
read_single_column_batches(field_data->GetReader()->reader);
chunk = create_chunk(field_meta, dim, array_vec);
}
auto data_type = field_meta.get_data_type();

View File

@ -14,7 +14,7 @@
# Update milvus-storage_VERSION for the first occurrence
milvus_add_pkg_config("milvus-storage")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( milvus-storage_VERSION 6cf3724 )
set( milvus-storage_VERSION d143229 )
set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")

View File

@ -97,6 +97,9 @@ set(MILVUS_TEST_FILES
test_random_sample.cpp
test_json_index.cpp
test_json_key_stats_index.cpp
test_types.cpp
test_growing_storage_v2.cpp
test_memory_planner.cpp
)
if ( INDEX_ENGINE STREQUAL "cardinal" )

View File

@ -65,7 +65,8 @@ TEST(chunk, test_int64_field) {
DataType::INT64,
false,
std::nullopt);
auto chunk = create_chunk(field_meta, 1, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
auto span = std::dynamic_pointer_cast<FixedWidthChunk>(chunk)->Span();
EXPECT_EQ(span.row_count(), data.size());
for (size_t i = 0; i < data.size(); ++i) {
@ -106,7 +107,8 @@ TEST(chunk, test_variable_field) {
DataType::STRING,
false,
std::nullopt);
auto chunk = create_chunk(field_meta, 1, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
auto views = std::dynamic_pointer_cast<StringChunk>(chunk)->StringViews(
std::nullopt);
for (size_t i = 0; i < data.size(); ++i) {
@ -150,7 +152,8 @@ TEST(chunk, test_variable_field_nullable) {
DataType::STRING,
true,
std::nullopt);
auto chunk = create_chunk(field_meta, 1, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
auto views = std::dynamic_pointer_cast<StringChunk>(chunk)->StringViews(
std::nullopt);
for (size_t i = 0; i < data.size(); ++i) {
@ -206,7 +209,8 @@ TEST(chunk, test_json_field) {
DataType::JSON,
false,
std::nullopt);
auto chunk = create_chunk(field_meta, 1, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
{
auto [views, valid] =
std::dynamic_pointer_cast<JSONChunk>(chunk)->StringViews(
@ -237,7 +241,8 @@ TEST(chunk, test_json_field) {
DataType::JSON,
true,
std::nullopt);
auto chunk = create_chunk(field_meta, 1, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
{
auto [views, valid] =
std::dynamic_pointer_cast<JSONChunk>(chunk)->StringViews(
@ -322,7 +327,8 @@ TEST(chunk, test_null_int64) {
DataType::INT64,
true,
std::nullopt);
auto chunk = create_chunk(field_meta, 1, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
auto fixed_chunk = std::dynamic_pointer_cast<FixedWidthChunk>(chunk);
auto span = fixed_chunk->Span();
EXPECT_EQ(span.row_count(), data.size());
@ -382,7 +388,8 @@ TEST(chunk, test_array) {
DataType::STRING,
false,
std::nullopt);
auto chunk = create_chunk(field_meta, 1, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
auto [views, valid] =
std::dynamic_pointer_cast<ArrayChunk>(chunk)->Views(std::nullopt);
EXPECT_EQ(views.size(), 1);
@ -444,7 +451,8 @@ TEST(chunk, test_null_array) {
DataType::STRING,
true,
std::nullopt);
auto chunk = create_chunk(field_meta, 1, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
auto [views, valid] =
std::dynamic_pointer_cast<ArrayChunk>(chunk)->Views(std::nullopt);
@ -517,7 +525,8 @@ TEST(chunk, test_array_views) {
DataType::STRING,
true,
std::nullopt);
auto chunk = create_chunk(field_meta, 1, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, 1, array_vec);
{
auto [views, valid] =
@ -604,7 +613,8 @@ TEST(chunk, test_sparse_float) {
"IP",
false,
std::nullopt);
auto chunk = create_chunk(field_meta, kTestSparseDim, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, kTestSparseDim, array_vec);
auto vec = std::dynamic_pointer_cast<SparseFloatVectorChunk>(chunk)->Vec();
for (size_t i = 0; i < n_rows; ++i) {
auto v1 = vec[i];
@ -674,13 +684,15 @@ TEST(chunk, multiple_chunk_mmap) {
std::nullopt);
int file_offset = 0;
auto page_size = sysconf(_SC_PAGESIZE);
auto chunk = create_chunk(field_meta, 1, file, file_offset, rb_reader);
arrow::ArrayVector array_vec = read_single_column_batches(rb_reader);
auto chunk = create_chunk(field_meta, 1, file, file_offset, array_vec);
EXPECT_TRUE(chunk->Size() % page_size == 0);
file_offset += chunk->Size();
std::shared_ptr<::arrow::RecordBatchReader> rb_reader2;
s = arrow_reader->GetRecordBatchReader(&rb_reader2);
EXPECT_TRUE(s.ok());
auto chunk2 = create_chunk(field_meta, 1, file, file_offset, rb_reader2);
arrow::ArrayVector array_vec2 = read_single_column_batches(rb_reader2);
auto chunk2 = create_chunk(field_meta, 1, file, file_offset, array_vec2);
EXPECT_TRUE(chunk->Size() % page_size == 0);
}

View File

@ -9,6 +9,7 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <arrow/util/key_value_metadata.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <cstdint>
@ -26,6 +27,7 @@
#include "index/IndexInfo.h"
#include "index/Meta.h"
#include "knowhere/comp/index_param.h"
#include "milvus-storage/common/constants.h"
#include "mmap/ChunkedColumn.h"
#include "mmap/Types.h"
#include "pb/plan.pb.h"
@ -190,18 +192,47 @@ class TestChunkSegment : public testing::TestWithParam<bool> {
true);
test_data_count = 10000;
auto arrow_i64_field = arrow::field("int64", arrow::int64());
auto arrow_pk_field =
arrow::field("pk", pk_is_string ? arrow::utf8() : arrow::int64());
auto arrow_ts_field = arrow::field("ts", arrow::int64());
auto arrow_str_field = arrow::field("string1", arrow::utf8());
auto arrow_str2_field = arrow::field("string2", arrow::utf8());
auto arrow_i64_field = arrow::field(
"int64",
arrow::int64(),
true,
arrow::key_value_metadata({milvus_storage::ARROW_FIELD_ID_KEY},
{std::to_string(100)}));
auto arrow_pk_field = arrow::field(
"pk",
pk_is_string ? arrow::utf8() : arrow::int64(),
true,
arrow::key_value_metadata({milvus_storage::ARROW_FIELD_ID_KEY},
{std::to_string(101)}));
auto arrow_ts_field = arrow::field(
"ts",
arrow::int64(),
true,
arrow::key_value_metadata({milvus_storage::ARROW_FIELD_ID_KEY},
{std::to_string(1)}));
auto arrow_str_field = arrow::field(
"string1",
arrow::utf8(),
true,
arrow::key_value_metadata({milvus_storage::ARROW_FIELD_ID_KEY},
{std::to_string(102)}));
auto arrow_str2_field = arrow::field(
"string2",
arrow::utf8(),
true,
arrow::key_value_metadata({milvus_storage::ARROW_FIELD_ID_KEY},
{std::to_string(103)}));
std::vector<std::shared_ptr<arrow::Field>> arrow_fields = {
arrow_i64_field,
arrow_pk_field,
arrow_ts_field,
arrow_str2_field,
arrow_str_field,
arrow_str2_field};
arrow_pk_field,
arrow_i64_field,
};
auto expected_arrow_schema =
std::make_shared<arrow::Schema>(arrow_fields);
ASSERT_EQ(schema->ConvertToArrowSchema()->ToString(),
expected_arrow_schema->ToString());
std::vector<FieldId> field_ids = {
int64_fid, pk_fid, TimestampFieldID, str_fid, str2_fid};

View File

@ -332,25 +332,6 @@ TEST(Growing, FillNullableData) {
int64_t dim = 128;
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema, per_batch);
auto bool_values = dataset.get_col<bool>(bool_field);
auto int8_values = dataset.get_col<int8_t>(int8_field);
auto int16_values = dataset.get_col<int16_t>(int16_field);
auto int32_values = dataset.get_col<int32_t>(int32_field);
auto int64_values = dataset.get_col<int64_t>(int64_field);
auto float_values = dataset.get_col<float>(float_field);
auto double_values = dataset.get_col<double>(double_field);
auto varchar_values = dataset.get_col<std::string>(varchar_field);
auto json_values = dataset.get_col<std::string>(json_field);
auto int_array_values = dataset.get_col<ScalarArray>(int_array_field);
auto long_array_values = dataset.get_col<ScalarArray>(long_array_field);
auto bool_array_values = dataset.get_col<ScalarArray>(bool_array_field);
auto string_array_values =
dataset.get_col<ScalarArray>(string_array_field);
auto double_array_values =
dataset.get_col<ScalarArray>(double_array_field);
auto float_array_values =
dataset.get_col<ScalarArray>(float_array_field);
auto vector_values = dataset.get_col<float>(vec);
auto offset = segment->PreInsert(per_batch);
segment->Insert(offset,

View File

@ -0,0 +1,406 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <folly/Conv.h>
#include <arrow/record_batch.h>
#include <arrow/util/key_value_metadata.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <cstdint>
#include "arrow/table_builder.h"
#include "arrow/type_fwd.h"
#include "common/FieldDataInterface.h"
#include "common/Schema.h"
#include "common/Types.h"
#include "gtest/gtest.h"
#include "milvus-storage/common/constants.h"
#include "milvus-storage/filesystem/fs.h"
#include "milvus-storage/packed/writer.h"
#include "milvus-storage/format/parquet/file_reader.h"
#include "segcore/SegmentGrowing.h"
#include "segcore/SegmentGrowingImpl.h"
#include "segcore/Utils.h"
#include "segcore/memory_planner.h"
#include "test_utils/DataGen.h"
#include "pb/schema.pb.h"
#include <iostream>
#include <memory>
#include <string>
#include <vector>
using namespace milvus;
using namespace milvus::segcore;
namespace pb = milvus::proto;
class TestGrowingStorageV2 : public ::testing::Test {
void
SetUp() override {
auto conf = milvus_storage::ArrowFileSystemConfig();
conf.storage_type = "local";
conf.root_path = path_;
milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf);
fs_ = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
SetUpCommonData();
}
void
SetUpCommonData() {
record_batch_ = randomRecordBatch();
table_ = arrow::Table::FromRecordBatches({record_batch_}).ValueOrDie();
schema_ = table_->schema();
}
std::shared_ptr<arrow::RecordBatch>
randomRecordBatch() {
arrow::Int64Builder ts_builder;
arrow::Int64Builder pk_builder;
arrow::StringBuilder str_builder;
ts_values = {rand() % 10000, rand() % 10000, rand() % 10000};
pk_values = {rand() % 10000000, rand() % 10000000, rand() % 10000000};
str_values = {
random_string(10000), random_string(10000), random_string(10000)};
ts_builder.AppendValues(ts_values).ok();
pk_builder.AppendValues(pk_values).ok();
str_builder.AppendValues(str_values).ok();
std::shared_ptr<arrow::Array> ts_array;
std::shared_ptr<arrow::Array> pk_array;
std::shared_ptr<arrow::Array> str_array;
ts_builder.Finish(&ts_array).ok();
pk_builder.Finish(&pk_array).ok();
str_builder.Finish(&str_array).ok();
std::vector<std::shared_ptr<arrow::Array>> arrays = {
ts_array, pk_array, str_array};
auto schema = arrow::schema(
{arrow::field("ts",
arrow::int64(),
true,
arrow::key_value_metadata(
{milvus_storage::ARROW_FIELD_ID_KEY}, {"100"})),
arrow::field("pk",
arrow::int64(),
false,
arrow::key_value_metadata(
{milvus_storage::ARROW_FIELD_ID_KEY}, {"101"})),
arrow::field("str",
arrow::utf8(),
true,
arrow::key_value_metadata(
{milvus_storage::ARROW_FIELD_ID_KEY}, {"102"}))});
return arrow::RecordBatch::Make(schema, 3, arrays);
}
std::string
random_string(size_t length) {
auto randchar = []() -> char {
const char charset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const size_t max_index = (sizeof(charset) - 1);
return charset[rand() % max_index];
};
std::string str(length, 0);
std::generate_n(str.begin(), length, randchar);
return str;
}
protected:
milvus_storage::ArrowFileSystemPtr fs_;
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::RecordBatch> record_batch_;
std::shared_ptr<arrow::Table> table_;
std::string path_ = "/tmp";
std::vector<int64_t> ts_values;
std::vector<int64_t> pk_values;
std::vector<std::string> str_values;
};
TEST_F(TestGrowingStorageV2, LoadFieldData) {
int batch_size = 1000;
auto paths = std::vector<std::string>{path_ + "/10000.parquet",
path_ + "/10001.parquet"};
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, schema_, storage_config, column_groups, writer_memory);
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
}
EXPECT_TRUE(writer.Close().ok());
auto schema = std::make_shared<milvus::Schema>();
auto ts_fid = schema->AddDebugField("ts", milvus::DataType::INT64, true);
auto pk_fid = schema->AddDebugField("pk", milvus::DataType::INT64, false);
auto str_fid =
schema->AddDebugField("str", milvus::DataType::VARCHAR, true);
schema->set_primary_field_id(pk_fid);
auto segment = milvus::segcore::CreateGrowingSegment(
schema, milvus::segcore::empty_index_meta);
LoadFieldDataInfo load_info;
load_info.field_infos = {
{0,
FieldBinlogInfo{0,
3000,
std::vector<int64_t>{3000},
false,
std::vector<std::string>{paths[0]}}},
{1,
FieldBinlogInfo{1,
3000,
std::vector<int64_t>{3000},
false,
std::vector<std::string>{paths[1]}}},
};
load_info.storage_version = 2;
segment->LoadFieldData(load_info);
}
TEST_F(TestGrowingStorageV2, LoadWithStrategy) {
int batch_size = 1000;
auto paths = std::vector<std::string>{path_ + "/10000.parquet",
path_ + "/10001.parquet"};
auto column_groups = std::vector<std::vector<int>>{{2}, {0, 1}};
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, schema_, storage_config, column_groups, writer_memory);
for (int i = 0; i < batch_size; ++i) {
EXPECT_TRUE(writer.Write(record_batch_).ok());
}
EXPECT_TRUE(writer.Close().ok());
auto channel = std::make_shared<milvus::ArrowReaderChannel>();
int64_t memory_limit = 1024 * 1024 * 1024; // 1GB
uint64_t parallel_degree = 2;
// read all row groups
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
fs_, paths[0], schema_);
auto row_group_metadata = fr->file_metadata()->GetRowGroupMetadataVector();
std::vector<int64_t> row_groups(row_group_metadata.size());
std::iota(row_groups.begin(), row_groups.end(), 0);
std::vector<std::vector<int64_t>> row_group_lists = {row_groups};
// Test MemoryBasedSplitStrategy
{
auto strategy =
std::make_unique<MemoryBasedSplitStrategy>(row_group_metadata);
milvus::segcore::LoadWithStrategy({paths[0]},
channel,
memory_limit,
std::move(strategy),
row_group_lists);
// Verify each batch matches row group metadata
std::shared_ptr<milvus::ArrowDataWrapper> wrapper;
int64_t total_rows = 0;
int64_t current_row_group = 0;
while (channel->pop(wrapper)) {
for (const auto& table : wrapper->arrow_tables) {
// Verify batch size matches row group metadata
EXPECT_EQ(table->num_rows(),
row_group_metadata.Get(current_row_group).row_num());
total_rows += table->num_rows();
current_row_group++;
}
}
// Verify total rows match sum of all row groups
int64_t expected_total_rows = 0;
for (size_t i = 0; i < row_group_metadata.size(); ++i) {
expected_total_rows += row_group_metadata.Get(i).row_num();
}
EXPECT_EQ(total_rows, expected_total_rows);
}
// Test ParallelDegreeSplitStrategy
{
channel = std::make_shared<milvus::ArrowReaderChannel>();
auto strategy =
std::make_unique<ParallelDegreeSplitStrategy>(parallel_degree);
milvus::segcore::LoadWithStrategy({paths[0]},
channel,
memory_limit,
std::move(strategy),
row_group_lists);
std::shared_ptr<milvus::ArrowDataWrapper> wrapper;
int64_t total_rows = 0;
int64_t current_row_group = 0;
while (channel->pop(wrapper)) {
for (const auto& table : wrapper->arrow_tables) {
// Verify batch size matches row group metadata
EXPECT_EQ(table->num_rows(),
row_group_metadata.Get(current_row_group).row_num());
total_rows += table->num_rows();
current_row_group++;
}
}
// Verify total rows match sum of all row groups
int64_t expected_total_rows = 0;
for (size_t i = 0; i < row_group_metadata.size(); ++i) {
expected_total_rows += row_group_metadata.Get(i).row_num();
}
EXPECT_EQ(total_rows, expected_total_rows);
// Test with non-continuous row groups
channel = std::make_shared<milvus::ArrowReaderChannel>();
row_group_lists = {{0, 2}}; // Skip middle row group
strategy =
std::make_unique<ParallelDegreeSplitStrategy>(parallel_degree);
milvus::segcore::LoadWithStrategy({paths[0]},
channel,
memory_limit,
std::move(strategy),
row_group_lists);
total_rows = 0;
current_row_group = 0;
std::vector<int64_t> selected_row_groups = {0, 2};
while (channel->pop(wrapper)) {
for (const auto& table : wrapper->arrow_tables) {
EXPECT_EQ(table->num_rows(),
row_group_metadata
.Get(selected_row_groups[current_row_group])
.row_num());
total_rows += table->num_rows();
current_row_group++;
}
}
// Verify total rows match sum of selected row groups
expected_total_rows = 0;
for (int64_t rg : selected_row_groups) {
expected_total_rows += row_group_metadata.Get(rg).row_num();
}
EXPECT_EQ(total_rows, expected_total_rows);
}
}
TEST_F(TestGrowingStorageV2, TestAllDataTypes) {
auto schema = std::make_shared<milvus::Schema>();
auto bool_field =
schema->AddDebugField("bool", milvus::DataType::BOOL, true);
auto int8_field =
schema->AddDebugField("int8", milvus::DataType::INT8, true);
auto int16_field =
schema->AddDebugField("int16", milvus::DataType::INT16, true);
auto int32_field =
schema->AddDebugField("int32", milvus::DataType::INT32, true);
auto int64_field = schema->AddDebugField("int64", milvus::DataType::INT64);
auto float_field =
schema->AddDebugField("float", milvus::DataType::FLOAT, true);
auto double_field =
schema->AddDebugField("double", milvus::DataType::DOUBLE, true);
auto varchar_field =
schema->AddDebugField("varchar", milvus::DataType::VARCHAR, true);
auto json_field =
schema->AddDebugField("json", milvus::DataType::JSON, true);
auto int_array_field = schema->AddDebugField(
"int_array", milvus::DataType::ARRAY, milvus::DataType::INT8, true);
auto long_array_field = schema->AddDebugField(
"long_array", milvus::DataType::ARRAY, milvus::DataType::INT64, true);
auto bool_array_field = schema->AddDebugField(
"bool_array", milvus::DataType::ARRAY, milvus::DataType::BOOL, true);
auto string_array_field = schema->AddDebugField("string_array",
milvus::DataType::ARRAY,
milvus::DataType::VARCHAR,
true);
auto double_array_field = schema->AddDebugField("double_array",
milvus::DataType::ARRAY,
milvus::DataType::DOUBLE,
true);
auto float_array_field = schema->AddDebugField(
"float_array", milvus::DataType::ARRAY, milvus::DataType::FLOAT, true);
auto vec = schema->AddDebugField("embeddings",
milvus::DataType::VECTOR_FLOAT,
128,
knowhere::metric::L2);
schema->set_primary_field_id(int64_field);
std::map<std::string, std::string> index_params = {
{"index_type", "IVF_FLAT"},
{"metric_type", knowhere::metric::L2},
{"nlist", "128"}};
std::map<std::string, std::string> type_params = {{"dim", "128"}};
FieldIndexMeta fieldIndexMeta(
vec, std::move(index_params), std::move(type_params));
auto config = SegcoreConfig::default_config();
config.set_chunk_rows(1024);
config.set_enable_interim_segment_index(true);
    std::map<FieldId, FieldIndexMeta> fieldMap = {{vec, fieldIndexMeta}};
    IndexMetaPtr metaPtr =
        std::make_shared<CollectionIndexMeta>(100000, std::move(fieldMap));
auto segment_growing = CreateGrowingSegment(schema, metaPtr, 1, config);
auto segment = dynamic_cast<SegmentGrowingImpl*>(segment_growing.get());
int64_t per_batch = 1000;
int64_t n_batch = 3;
int64_t dim = 128;
// Write data to storage v2
auto paths = std::vector<std::string>{path_ + "/19530.parquet",
path_ + "/19531.parquet"};
auto column_groups = std::vector<std::vector<int>>{
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, {15}};
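    // Column groups for the packed writer: the 15 scalar fields share the
    // first parquet file, while the vector field (index 15) is written to
    // its own file.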
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
auto arrow_schema = schema->ConvertToArrowSchema();
milvus_storage::PackedRecordBatchWriter writer(
fs_, paths, arrow_schema, storage_config, column_groups, writer_memory);
int64_t total_rows = 0;
for (int64_t i = 0; i < n_batch; i++) {
auto dataset = DataGen(schema, per_batch);
auto record_batch =
ConvertToArrowRecordBatch(dataset, dim, arrow_schema);
total_rows += record_batch->num_rows();
EXPECT_TRUE(writer.Write(record_batch).ok());
}
EXPECT_TRUE(writer.Close().ok());
// Load data back from storage v2
LoadFieldDataInfo load_info;
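    // With storage v2 each FieldBinlogInfo entry appears to describe a
    // column group rather than a single field: keys 0 and 1 match the two
    // packed files written above.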
load_info.field_infos = {
{0,
FieldBinlogInfo{0,
total_rows,
std::vector<int64_t>{total_rows},
false,
std::vector<std::string>{paths[0]}}},
{1,
FieldBinlogInfo{1,
total_rows,
std::vector<int64_t>{total_rows},
false,
std::vector<std::string>{paths[1]}}},
};
load_info.storage_version = 2;
segment->LoadFieldData(load_info);
}

View File

@ -0,0 +1,171 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <cstddef>
#include "milvus-storage/common/metadata.h"
#include "segcore/memory_planner.h"
#include <gtest/gtest.h>
#include <vector>
using namespace milvus::segcore;
milvus_storage::RowGroupMetadataVector
CreateRowGroupMetadataVector(const std::vector<int64_t>& memory_sizes_mb) {
std::vector<milvus_storage::RowGroupMetadata> metadatas;
    for (auto size : memory_sizes_mb) {
        // Only the memory size (first argument, MB converted to bytes)
        // matters for these split tests; row count and offset stay zero.
        metadatas.emplace_back(size * 1024 * 1024, 0, 0);
    }
return milvus_storage::RowGroupMetadataVector(metadatas);
}
TEST(MemoryBasedSplitStrategy, EmptyInput) {
std::vector<int64_t> empty_input;
auto metadatas = CreateRowGroupMetadataVector({});
auto strategy = std::make_unique<MemoryBasedSplitStrategy>(metadatas);
auto result = strategy->split(empty_input);
EXPECT_TRUE(result.empty());
}
TEST(MemoryBasedSplitStrategy, SingleRowGroup) {
std::vector<int64_t> input = {0};
auto metadatas = CreateRowGroupMetadataVector({12}); // 12MB
auto strategy = std::make_unique<MemoryBasedSplitStrategy>(metadatas);
auto result = strategy->split(input);
EXPECT_EQ(result.size(), 1);
EXPECT_EQ(result[0], (RowGroupBlock{0, 1}));
}
TEST(MemoryBasedSplitStrategy, ContinuousWithinMemoryLimit) {
std::vector<int64_t> input = {0, 1, 2, 3};
auto metadatas = CreateRowGroupMetadataVector({
2, 2, 2, 2 // Total 8MB, well within the 16MB limit
});
auto strategy = std::make_unique<MemoryBasedSplitStrategy>(metadatas);
auto result = strategy->split(input);
EXPECT_EQ(result.size(), 1);
EXPECT_EQ(result[0], (RowGroupBlock{0, 4}));
}
TEST(MemoryBasedSplitStrategy, ContinuousExceedMemoryLimit) {
std::vector<int64_t> input = {0, 1, 2, 3, 4};
auto metadatas = CreateRowGroupMetadataVector({
8, 6, 4, 5, 3 // Different sizes: 8MB, 6MB, 4MB, 5MB, 3MB
});
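    // Assuming a 16MB per-block cap (as in the other tests): 8 + 6 = 14MB
    // fits in one block, adding the 4MB group would exceed the cap, and the
    // remaining 4 + 5 + 3 = 12MB form the second block.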
auto strategy = std::make_unique<MemoryBasedSplitStrategy>(metadatas);
auto result = strategy->split(input);
EXPECT_EQ(result.size(), 2);
EXPECT_EQ(result[0], (RowGroupBlock{0, 2}));
EXPECT_EQ(result[1], (RowGroupBlock{2, 3}));
}
TEST(MemoryBasedSplitStrategy, NonContinuous_SmallGap) {
std::vector<int64_t> input = {0, 2, 4, 6};
auto metadatas = CreateRowGroupMetadataVector({
5, 3, 4, 6, 2, 7, 4 // Different sizes for each row group
});
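    // A gap between row group indices always starts a new block, no matter
    // how small the groups are, so four singleton blocks are expected.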
auto strategy = std::make_unique<MemoryBasedSplitStrategy>(metadatas);
auto result = strategy->split(input);
EXPECT_EQ(result.size(), 4);
EXPECT_EQ(result[0], (RowGroupBlock{0, 1}));
EXPECT_EQ(result[1], (RowGroupBlock{2, 1}));
EXPECT_EQ(result[2], (RowGroupBlock{4, 1}));
EXPECT_EQ(result[3], (RowGroupBlock{6, 1}));
}
TEST(MemoryBasedSplitStrategy, Mixed_ContinuousAndNonContinuous) {
std::vector<int64_t> input = {0, 1, 3, 4, 5, 7};
auto metadatas = CreateRowGroupMetadataVector({
4, 3, 2, 5, 4, 3, 6, 4 // Different sizes for each row group
});
auto strategy = std::make_unique<MemoryBasedSplitStrategy>(metadatas);
auto result = strategy->split(input);
EXPECT_EQ(result.size(), 3);
EXPECT_EQ(result[0], (RowGroupBlock{0, 2}));
EXPECT_EQ(result[1], (RowGroupBlock{3, 3}));
EXPECT_EQ(result[2], (RowGroupBlock{7, 1}));
}
TEST(MemoryBasedSplitStrategy, EdgeCase_SingleBlock) {
std::vector<int64_t> input = {0, 1, 2, 3, 4};
auto metadatas = CreateRowGroupMetadataVector({
2, 3, 4, 3, 2 // Different sizes for each row group
});
auto strategy = std::make_unique<MemoryBasedSplitStrategy>(metadatas);
auto result = strategy->split(input);
EXPECT_EQ(result.size(), 1);
EXPECT_EQ(result[0], (RowGroupBlock{0, 5}));
}
TEST(MemoryBasedSplitStrategy, EdgeCase_EachRowGroupSeparate) {
std::vector<int64_t> input = {0, 2, 4, 6, 8};
auto metadatas = CreateRowGroupMetadataVector({
7, 5, 6, 4, 8, 3, 9, 5, 6 // Different sizes for each row group
});
auto strategy = std::make_unique<MemoryBasedSplitStrategy>(metadatas);
auto result = strategy->split(input);
EXPECT_EQ(result.size(), 5);
for (size_t i = 0; i < result.size(); ++i) {
EXPECT_EQ(result[i], (RowGroupBlock{input[i], 1}));
}
}
TEST(ParallelDegreeSplitStrategy, Basic) {
std::vector<int64_t> input = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
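    // Ten contiguous row groups at parallel degree 3: the target block size
    // is ceil(10 / 3) = 4, so the split should be blocks of 4, 4 and 2.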
auto strategy = std::make_unique<ParallelDegreeSplitStrategy>(3);
auto blocks = strategy->split(input);
EXPECT_EQ(blocks.size(), 3);
EXPECT_EQ(blocks[0], (RowGroupBlock{0, 4}));
EXPECT_EQ(blocks[1], (RowGroupBlock{4, 4}));
EXPECT_EQ(blocks[2], (RowGroupBlock{8, 2}));
}
TEST(ParallelDegreeSplitStrategy, LargerThanInput) {
std::vector<int64_t> input = {0, 1, 2, 3, 4};
auto strategy = std::make_unique<ParallelDegreeSplitStrategy>(10);
auto blocks = strategy->split(input);
EXPECT_EQ(blocks.size(), 1);
EXPECT_EQ(blocks[0], (RowGroupBlock{0, 5}));
}
TEST(ParallelDegreeSplitStrategy, Empty) {
std::vector<int64_t> input;
auto strategy = std::make_unique<ParallelDegreeSplitStrategy>(3);
auto blocks = strategy->split(input);
EXPECT_TRUE(blocks.empty());
}
TEST(ParallelDegreeSplitStrategy, MixedContinuousAndNonContinuous) {
std::vector<int64_t> input = {1, 2, 3, 5, 6, 7, 9, 10};
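    // Continuity breaks after 3 and after 7 force block boundaries on their
    // own, so three blocks come out even though the parallel degree is 2.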
auto strategy = std::make_unique<ParallelDegreeSplitStrategy>(2);
auto blocks = strategy->split(input);
EXPECT_EQ(blocks.size(), 3); // Three continuous blocks
EXPECT_EQ(blocks[0], (RowGroupBlock{1, 3}));
EXPECT_EQ(blocks[1], (RowGroupBlock{5, 3}));
EXPECT_EQ(blocks[2], (RowGroupBlock{9, 2}));
}
TEST(ParallelDegreeSplitStrategy, ContinuousExceedingAvgSize) {
std::vector<int64_t> input = {1, 2, 3, 4, 5, 6, 7, 8};
auto strategy = std::make_unique<ParallelDegreeSplitStrategy>(2);
auto blocks = strategy->split(input);
EXPECT_EQ(blocks.size(), 2);
EXPECT_EQ(blocks[0], (RowGroupBlock{1, 4}));
EXPECT_EQ(blocks[1], (RowGroupBlock{5, 4}));
}

View File

@ -0,0 +1,115 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <arrow/api.h>
#include "common/Types.h"
using namespace milvus;
TEST(GetArrowDataTypeTest, BOOL) {
auto result = GetArrowDataType(DataType::BOOL);
ASSERT_TRUE(result->Equals(arrow::boolean()));
}
TEST(GetArrowDataTypeTest, INT8) {
auto result = GetArrowDataType(DataType::INT8);
ASSERT_TRUE(result->Equals(arrow::int8()));
}
TEST(GetArrowDataTypeTest, INT16) {
auto result = GetArrowDataType(DataType::INT16);
ASSERT_TRUE(result->Equals(arrow::int16()));
}
TEST(GetArrowDataTypeTest, INT32) {
auto result = GetArrowDataType(DataType::INT32);
ASSERT_TRUE(result->Equals(arrow::int32()));
}
TEST(GetArrowDataTypeTest, INT64) {
auto result = GetArrowDataType(DataType::INT64);
ASSERT_TRUE(result->Equals(arrow::int64()));
}
TEST(GetArrowDataTypeTest, FLOAT) {
auto result = GetArrowDataType(DataType::FLOAT);
ASSERT_TRUE(result->Equals(arrow::float32()));
}
TEST(GetArrowDataTypeTest, DOUBLE) {
auto result = GetArrowDataType(DataType::DOUBLE);
ASSERT_TRUE(result->Equals(arrow::float64()));
}
TEST(GetArrowDataTypeTest, STRING_TYPES) {
auto result1 = GetArrowDataType(DataType::STRING);
auto result2 = GetArrowDataType(DataType::VARCHAR);
auto result3 = GetArrowDataType(DataType::TEXT);
ASSERT_TRUE(result1->Equals(arrow::utf8()));
ASSERT_TRUE(result2->Equals(arrow::utf8()));
ASSERT_TRUE(result3->Equals(arrow::utf8()));
}
TEST(GetArrowDataTypeTest, ARRAY_JSON) {
auto result1 = GetArrowDataType(DataType::ARRAY);
auto result2 = GetArrowDataType(DataType::JSON);
ASSERT_TRUE(result1->Equals(arrow::binary()));
ASSERT_TRUE(result2->Equals(arrow::binary()));
}
TEST(GetArrowDataTypeTest, VECTOR_FLOAT) {
int dim = 3;
auto result = GetArrowDataType(DataType::VECTOR_FLOAT, dim);
ASSERT_TRUE(result->Equals(arrow::fixed_size_binary(dim * 4)));
}
TEST(GetArrowDataTypeTest, VECTOR_BINARY) {
int dim = 5;
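    // Binary vectors are bit-packed: 5 dimensions round up to
    // (5 + 7) / 8 = 1 byte per vector.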
auto result = GetArrowDataType(DataType::VECTOR_BINARY, dim);
ASSERT_TRUE(result->Equals(arrow::fixed_size_binary((dim + 7) / 8)));
}
TEST(GetArrowDataTypeTest, VECTOR_FLOAT16) {
int dim = 2;
auto result = GetArrowDataType(DataType::VECTOR_FLOAT16, dim);
ASSERT_TRUE(result->Equals(arrow::fixed_size_binary(dim * 2)));
}
TEST(GetArrowDataTypeTest, VECTOR_BFLOAT16) {
int dim = 2;
auto result = GetArrowDataType(DataType::VECTOR_BFLOAT16, dim);
ASSERT_TRUE(result->Equals(arrow::fixed_size_binary(dim * 2)));
}
TEST(GetArrowDataTypeTest, VECTOR_SPARSE_FLOAT) {
auto result = GetArrowDataType(DataType::VECTOR_SPARSE_FLOAT);
ASSERT_TRUE(result->Equals(arrow::binary()));
}
TEST(GetArrowDataTypeTest, VECTOR_INT8) {
int dim = 4;
auto result = GetArrowDataType(DataType::VECTOR_INT8, dim);
ASSERT_TRUE(result->Equals(arrow::fixed_size_binary(dim)));
}
TEST(GetArrowDataTypeTest, InvalidDataType) {
EXPECT_THROW(
GetArrowDataType(static_cast<DataType>(999)),
std::runtime_error); // Assuming PanicInfo throws a std::runtime_error
}

View File

@ -43,6 +43,8 @@
#include "storage/PayloadWriter.h"
#include "segcore/ChunkedSegmentSealedImpl.h"
#include "storage/Util.h"
#include "milvus-storage/common/constants.h"
using boost::algorithm::starts_with;
@ -1383,4 +1385,46 @@ NewCollection(const milvus::proto::schema::CollectionSchema* schema,
return (void*)collection.release();
}
inline std::shared_ptr<arrow::RecordBatch>
ConvertToArrowRecordBatch(const GeneratedData& dataset,
int64_t dim,
const std::shared_ptr<arrow::Schema>& arrow_schema) {
if (!dataset.raw_ || dataset.raw_->fields_data_size() == 0) {
return nullptr;
}
// Create a map of field_id to field_data for quick lookup
std::unordered_map<int64_t, const DataArray*> field_data_map;
for (const auto& field_data : dataset.raw_->fields_data()) {
field_data_map[field_data.field_id()] = &field_data;
}
// Create arrays in the order specified by arrow_schema
std::vector<std::shared_ptr<arrow::Array>> arrays;
for (int i = 0; i < arrow_schema->num_fields(); ++i) {
const auto& field = arrow_schema->field(i);
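        // The milvus field id travels in the arrow field metadata under
        // ARROW_FIELD_ID_KEY; it is used to match the schema column back to
        // the generated dataset.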
auto field_id = std::stol(field->metadata()
->Get(milvus_storage::ARROW_FIELD_ID_KEY)
.ValueOrDie());
auto it = field_data_map.find(field_id);
if (it != field_data_map.end()) {
auto field_meta = dataset.schema_->operator[](FieldId(field_id));
auto field_data_ptr = CreateFieldDataFromDataArray(
dataset.raw_->num_rows(), it->second, field_meta);
auto arrow_data_wrapper =
storage::ConvertFieldDataToArrowDataWrapper(field_data_ptr);
std::shared_ptr<arrow::RecordBatch> batch;
arrow_data_wrapper->reader->ReadNext(&batch);
if (batch) {
arrays.push_back(batch->column(0));
}
        } else {
            // A field present in the arrow schema but missing from the
            // dataset leaves the batch with fewer columns than the schema.
            std::cout << "field_id: " << field_id << " not found" << std::endl;
        }
}
return arrow::RecordBatch::Make(
arrow_schema, dataset.raw_->num_rows(), arrays);
}
} // namespace milvus::segcore