From 863e26969a03ac61b4fa1b753c506bedb8aabc29 Mon Sep 17 00:00:00 2001 From: yah01 Date: Mon, 6 Nov 2023 17:48:19 +0800 Subject: [PATCH] Reduce 1x memory copy for retrieving data (#28106) Signed-off-by: yah01 --- .../core/src/segcore/SegmentSealedImpl.cpp | 99 ++++++++++++++----- internal/core/src/segcore/Utils.cpp | 4 + internal/core/unittest/test_retrieve.cpp | 46 +++++++++ 3 files changed, 122 insertions(+), 27 deletions(-) diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index f69b46ea75..4dc427506a 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -1032,7 +1032,6 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, // we have to clone the shared pointer, // to make sure it won't get released if segment released auto column = fields_.at(field_id); - if (datatype_is_variable(field_meta.get_data_type())) { switch (field_meta.get_data_type()) { case DataType::VARCHAR: @@ -1071,10 +1070,15 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, auto src_vec = column->Data(); switch (field_meta.get_data_type()) { case DataType::BOOL: { - FixedVector output(count); - bulk_subscript_impl( - src_vec, seg_offsets, count, output.data()); - return CreateScalarDataArrayFrom(output.data(), count, field_meta); + auto ret = fill_with_empty(field_id, count); + bulk_subscript_impl(src_vec, + seg_offsets, + count, + ret->mutable_scalars() + ->mutable_bool_data() + ->mutable_data() + ->mutable_data()); + return ret; } case DataType::INT8: { FixedVector output(count); @@ -1089,40 +1093,81 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, return CreateScalarDataArrayFrom(output.data(), count, field_meta); } case DataType::INT32: { - FixedVector output(count); - bulk_subscript_impl( - src_vec, seg_offsets, count, output.data()); - return CreateScalarDataArrayFrom(output.data(), count, field_meta); + auto ret = fill_with_empty(field_id, count); + bulk_subscript_impl(src_vec, + seg_offsets, + count, + ret->mutable_scalars() + ->mutable_int_data() + ->mutable_data() + ->mutable_data()); + return ret; } case DataType::INT64: { - FixedVector output(count); - bulk_subscript_impl( - src_vec, seg_offsets, count, output.data()); - return CreateScalarDataArrayFrom(output.data(), count, field_meta); + auto ret = fill_with_empty(field_id, count); + bulk_subscript_impl(src_vec, + seg_offsets, + count, + ret->mutable_scalars() + ->mutable_long_data() + ->mutable_data() + ->mutable_data()); + return ret; } case DataType::FLOAT: { - FixedVector output(count); - bulk_subscript_impl( - src_vec, seg_offsets, count, output.data()); - return CreateScalarDataArrayFrom(output.data(), count, field_meta); + auto ret = fill_with_empty(field_id, count); + bulk_subscript_impl(src_vec, + seg_offsets, + count, + ret->mutable_scalars() + ->mutable_float_data() + ->mutable_data() + ->mutable_data()); + return ret; } case DataType::DOUBLE: { - FixedVector output(count); - bulk_subscript_impl( - src_vec, seg_offsets, count, output.data()); - return CreateScalarDataArrayFrom(output.data(), count, field_meta); + auto ret = fill_with_empty(field_id, count); + bulk_subscript_impl(src_vec, + seg_offsets, + count, + ret->mutable_scalars() + ->mutable_double_data() + ->mutable_data() + ->mutable_data()); + return ret; } - case DataType::VECTOR_FLOAT: - case DataType::VECTOR_FLOAT16: - case DataType::VECTOR_BINARY: { - aligned_vector output(field_meta.get_sizeof() * count); + case DataType::VECTOR_FLOAT: { + auto ret = fill_with_empty(field_id, count); bulk_subscript_impl(field_meta.get_sizeof(), src_vec, seg_offsets, count, - output.data()); - return CreateVectorDataArrayFrom(output.data(), count, field_meta); + ret->mutable_vectors() + ->mutable_float_vector() + ->mutable_data() + ->mutable_data()); + return ret; + } + case DataType::VECTOR_FLOAT16: { + auto ret = fill_with_empty(field_id, count); + bulk_subscript_impl( + field_meta.get_sizeof(), + src_vec, + seg_offsets, + count, + ret->mutable_vectors()->mutable_float16_vector()->data()); + return ret; + } + case DataType::VECTOR_BINARY: { + auto ret = fill_with_empty(field_id, count); + bulk_subscript_impl( + field_meta.get_sizeof(), + src_vec, + seg_offsets, + count, + ret->mutable_vectors()->mutable_binary_vector()->data()); + return ret; } default: { diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 7928378b05..0024e657c4 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -279,6 +279,8 @@ CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) { case DataType::ARRAY: { auto obj = scalar_array->mutable_array_data(); obj->mutable_data()->Reserve(count); + obj->set_element_type(static_cast( + field_meta.get_element_type())); for (int i = 0; i < count; i++) { *(obj->mutable_data()->Add()) = proto::schema::ScalarField(); } @@ -406,6 +408,8 @@ CreateScalarDataArrayFrom(const void* data_raw, case DataType::ARRAY: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_array_data(); + obj->set_element_type(static_cast( + field_meta.get_element_type())); for (auto i = 0; i < count; i++) { *(obj->mutable_data()->Add()) = data[i]; } diff --git a/internal/core/unittest/test_retrieve.cpp b/internal/core/unittest/test_retrieve.cpp index 7bef70ea40..bac9e76b4c 100644 --- a/internal/core/unittest/test_retrieve.cpp +++ b/internal/core/unittest/test_retrieve.cpp @@ -11,6 +11,8 @@ #include +#include "common/Types.h" +#include "knowhere/comp/index_param.h" #include "query/Expr.h" #include "query/ExprImpl.h" #include "segcore/ScalarIndex.h" @@ -291,6 +293,50 @@ TEST(Retrieve, Limit) { Assert(field2.vectors().float_vector().data_size() == N * DIM); } +TEST(Retrieve, FillEntry) { + auto schema = std::make_shared(); + auto fid_64 = schema->AddDebugField("i64", DataType::INT64); + auto DIM = 16; + auto fid_bool = schema->AddDebugField("bool", DataType::BOOL); + auto fid_f32 = schema->AddDebugField("f32", DataType::FLOAT); + auto fid_f64 = schema->AddDebugField("f64", DataType::DOUBLE); + auto fid_vec32 = schema->AddDebugField( + "vector_32", DataType::VECTOR_FLOAT, DIM, knowhere::metric::L2); + auto fid_vecbin = schema->AddDebugField( + "vec_bin", DataType::VECTOR_BINARY, DIM, knowhere::metric::L2); + schema->set_primary_field_id(fid_64); + + int64_t N = 101; + auto dataset = DataGen(schema, N, 42); + auto segment = CreateSealedSegment(schema); + SealedLoadFieldData(dataset, *segment); + + auto plan = std::make_unique(*schema); + auto term_expr = std::make_unique>( + milvus::query::ColumnInfo( + fid_64, DataType::INT64, std::vector()), + OpType::GreaterEqual, + 0, + proto::plan::GenericValue::kInt64Val); + plan->plan_node_ = std::make_unique(); + plan->plan_node_->predicate_ = std::move(term_expr); + + // test query results exceed the limit size + std::vector target_fields{TimestampFieldID, + fid_64, + fid_bool, + fid_f32, + fid_f64, + fid_vec32, + fid_vecbin}; + plan->field_ids_ = target_fields; + EXPECT_THROW(segment->Retrieve(plan.get(), N, 1), std::runtime_error); + + auto retrieve_results = + segment->Retrieve(plan.get(), N, DEFAULT_MAX_OUTPUT_SIZE); + Assert(retrieve_results->fields_data_size() == target_fields.size()); +} + TEST(Retrieve, LargeTimestamp) { auto schema = std::make_shared(); auto fid_64 = schema->AddDebugField("i64", DataType::INT64);