milvus/internal/core/unittest/test_retrieve.cpp
yah01 c92e6b654f
Reduce 1x memory copy for retrieving data (#28106) (#28217)
/kind improvement
Before this, while retrieving data (query/search), we first copy the
data into a fixed vector, and then copy data from this into the proto
field.
Now we can directly copy the data into the proto field.

This optimization can't be done with int8, int16 due to the proto
doesn't provide the two types, we store them in int32s

Also, this can't be done with variable length field like string, JSON,
see https://github.com/protocolbuffers/protobuf/issues/10866. I tried
but it seems proto doesn't guarantee the memory layout as we expected,
it crashed
pr: #28106

Signed-off-by: yah01 <yah2er0ne@outlook.com>
2023-11-15 10:50:20 +08:00

514 lines
19 KiB
C++

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include "common/Types.h"
#include "knowhere/comp/index_param.h"
#include "query/Expr.h"
#include "query/ExprImpl.h"
#include "segcore/ScalarIndex.h"
#include "test_utils/DataGen.h"
using namespace milvus;
using namespace milvus::segcore;
std::unique_ptr<proto::segcore::RetrieveResults>
RetrieveUsingDefaultOutputSize(SegmentInterface* segment,
const query::RetrievePlan* plan,
Timestamp timestamp) {
return segment->Retrieve(plan, timestamp, DEFAULT_MAX_OUTPUT_SIZE);
}
TEST(Retrieve, ScalarIndex) {
SUCCEED();
auto index = std::make_unique<ScalarIndexVector>();
std::vector<int64_t> data;
int N = 1000;
auto req_ids = std::make_unique<IdArray>();
auto req_ids_arr = req_ids->mutable_int_id();
for (int i = 0; i < N; ++i) {
data.push_back(i * 3 % N);
req_ids_arr->add_data(i);
}
index->append_data(data.data(), N, SegOffset(10000));
index->build();
auto [res_ids, res_offsets] = index->do_search_ids(*req_ids);
auto res_ids_arr = res_ids->int_id();
for (int i = 0; i < N; ++i) {
auto res_offset = res_offsets[i].get() - 10000;
auto res_id = res_ids_arr.data(i);
auto std_id = (res_offset * 3 % N);
ASSERT_EQ(res_id, std_id);
}
}
TEST(Retrieve, AutoID) {
auto schema = std::make_shared<Schema>();
auto fid_64 = schema->AddDebugField("i64", DataType::INT64);
auto DIM = 16;
auto fid_vec = schema->AddDebugField(
"vector_64", DataType::VECTOR_FLOAT, DIM, knowhere::metric::L2);
schema->set_primary_field_id(fid_64);
int64_t N = 100;
int64_t req_size = 10;
auto choose = [=](int i) { return i * 3 % N; };
auto dataset = DataGen(schema, N);
auto segment = CreateSealedSegment(schema);
SealedLoadFieldData(dataset, *segment);
auto i64_col = dataset.get_col<int64_t>(fid_64);
auto plan = std::make_unique<query::RetrievePlan>(*schema);
std::vector<int64_t> values;
for (int i = 0; i < req_size; ++i) {
values.emplace_back(i64_col[choose(i)]);
}
auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(
milvus::query::ColumnInfo(
fid_64, DataType::INT64, std::vector<std::string>()),
values,
proto::plan::GenericValue::kInt64Val);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
std::vector<FieldId> target_fields_id{fid_64, fid_vec};
plan->field_ids_ = target_fields_id;
auto retrieve_results =
RetrieveUsingDefaultOutputSize(segment.get(), plan.get(), 100);
Assert(retrieve_results->fields_data_size() == target_fields_id.size());
auto field0 = retrieve_results->fields_data(0);
Assert(field0.has_scalars());
auto field0_data = field0.scalars().long_data();
for (int i = 0; i < req_size; ++i) {
auto index = choose(i);
auto data = field0_data.data(i);
}
for (int i = 0; i < req_size; ++i) {
auto index = choose(i);
auto data = field0_data.data(i);
ASSERT_EQ(data, i64_col[index]);
}
auto field1 = retrieve_results->fields_data(1);
Assert(field1.has_vectors());
auto field1_data = field1.vectors().float_vector();
ASSERT_EQ(field1_data.data_size(), DIM * req_size);
}
TEST(Retrieve, AutoID2) {
auto schema = std::make_shared<Schema>();
auto fid_64 = schema->AddDebugField("i64", DataType::INT64);
auto DIM = 16;
auto fid_vec = schema->AddDebugField(
"vector_64", DataType::VECTOR_FLOAT, DIM, knowhere::metric::L2);
schema->set_primary_field_id(fid_64);
int64_t N = 100;
int64_t req_size = 10;
auto choose = [=](int i) { return i * 3 % N; };
auto dataset = DataGen(schema, N);
auto segment = CreateSealedSegment(schema);
SealedLoadFieldData(dataset, *segment);
auto i64_col = dataset.get_col<int64_t>(fid_64);
auto plan = std::make_unique<query::RetrievePlan>(*schema);
std::vector<int64_t> values;
for (int i = 0; i < req_size; ++i) {
values.emplace_back(i64_col[choose(i)]);
}
auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(
milvus::query::ColumnInfo(
fid_64, DataType::INT64, std::vector<std::string>()),
values,
proto::plan::GenericValue::kInt64Val);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
std::vector<FieldId> target_offsets{fid_64, fid_vec};
plan->field_ids_ = target_offsets;
auto retrieve_results =
RetrieveUsingDefaultOutputSize(segment.get(), plan.get(), 100);
Assert(retrieve_results->fields_data_size() == target_offsets.size());
auto field0 = retrieve_results->fields_data(0);
Assert(field0.has_scalars());
auto field0_data = field0.scalars().long_data();
for (int i = 0; i < req_size; ++i) {
auto index = choose(i);
auto data = field0_data.data(i);
ASSERT_EQ(data, i64_col[index]);
}
auto field1 = retrieve_results->fields_data(1);
Assert(field1.has_vectors());
auto field1_data = field1.vectors().float_vector();
ASSERT_EQ(field1_data.data_size(), DIM * req_size);
}
TEST(Retrieve, NotExist) {
auto schema = std::make_shared<Schema>();
auto fid_64 = schema->AddDebugField("i64", DataType::INT64);
auto DIM = 16;
auto fid_vec = schema->AddDebugField(
"vector_64", DataType::VECTOR_FLOAT, DIM, knowhere::metric::L2);
schema->set_primary_field_id(fid_64);
int64_t N = 100;
int64_t req_size = 10;
auto choose = [=](int i) { return i * 3 % N; };
auto choose2 = [=](int i) { return i * 3 % N + 3 * N; };
auto dataset = DataGen(schema, N);
auto segment = CreateSealedSegment(schema);
SealedLoadFieldData(dataset, *segment);
auto i64_col = dataset.get_col<int64_t>(fid_64);
auto plan = std::make_unique<query::RetrievePlan>(*schema);
std::vector<int64_t> values;
for (int i = 0; i < req_size; ++i) {
values.emplace_back(i64_col[choose(i)]);
values.emplace_back(choose2(i));
}
auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(
milvus::query::ColumnInfo(
fid_64, DataType::INT64, std::vector<std::string>()),
values,
proto::plan::GenericValue::kInt64Val);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
std::vector<FieldId> target_offsets{fid_64, fid_vec};
plan->field_ids_ = target_offsets;
auto retrieve_results =
RetrieveUsingDefaultOutputSize(segment.get(), plan.get(), 100);
Assert(retrieve_results->fields_data_size() == target_offsets.size());
auto field0 = retrieve_results->fields_data(0);
Assert(field0.has_scalars());
auto field0_data = field0.scalars().long_data();
for (int i = 0; i < req_size; ++i) {
auto index = choose(i);
auto data = field0_data.data(i);
ASSERT_EQ(data, i64_col[index]);
}
auto field1 = retrieve_results->fields_data(1);
Assert(field1.has_vectors());
auto field1_data = field1.vectors().float_vector();
ASSERT_EQ(field1_data.data_size(), DIM * req_size);
}
TEST(Retrieve, Empty) {
auto schema = std::make_shared<Schema>();
auto fid_64 = schema->AddDebugField("i64", DataType::INT64);
auto DIM = 16;
auto fid_vec = schema->AddDebugField(
"vector_64", DataType::VECTOR_FLOAT, DIM, knowhere::metric::L2);
schema->set_primary_field_id(fid_64);
int64_t N = 100;
int64_t req_size = 10;
auto choose = [=](int i) { return i * 3 % N; };
auto segment = CreateSealedSegment(schema);
auto plan = std::make_unique<query::RetrievePlan>(*schema);
std::vector<int64_t> values;
for (int i = 0; i < req_size; ++i) {
values.emplace_back(choose(i));
}
auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(
milvus::query::ColumnInfo(
fid_64, DataType::INT64, std::vector<std::string>()),
values,
proto::plan::GenericValue::kInt64Val);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
std::vector<FieldId> target_offsets{fid_64, fid_vec};
plan->field_ids_ = target_offsets;
auto retrieve_results =
RetrieveUsingDefaultOutputSize(segment.get(), plan.get(), 100);
Assert(retrieve_results->fields_data_size() == target_offsets.size());
auto field0 = retrieve_results->fields_data(0);
auto field1 = retrieve_results->fields_data(1);
Assert(field0.has_scalars());
auto field0_data = field0.scalars().long_data();
Assert(field0_data.data_size() == 0);
Assert(field1.vectors().float_vector().data_size() == 0);
}
TEST(Retrieve, Limit) {
auto schema = std::make_shared<Schema>();
auto fid_64 = schema->AddDebugField("i64", DataType::INT64);
auto DIM = 16;
auto fid_vec = schema->AddDebugField(
"vector_64", DataType::VECTOR_FLOAT, DIM, knowhere::metric::L2);
schema->set_primary_field_id(fid_64);
int64_t N = 101;
auto dataset = DataGen(schema, N, 42);
auto segment = CreateSealedSegment(schema);
SealedLoadFieldData(dataset, *segment);
auto plan = std::make_unique<query::RetrievePlan>(*schema);
auto term_expr = std::make_unique<query::UnaryRangeExprImpl<int64_t>>(
milvus::query::ColumnInfo(
fid_64, DataType::INT64, std::vector<std::string>()),
OpType::GreaterEqual,
0,
proto::plan::GenericValue::kInt64Val);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
// test query results exceed the limit size
std::vector<FieldId> target_fields{TimestampFieldID, fid_64, fid_vec};
plan->field_ids_ = target_fields;
EXPECT_THROW(segment->Retrieve(plan.get(), N, 1), std::runtime_error);
auto retrieve_results =
segment->Retrieve(plan.get(), N, DEFAULT_MAX_OUTPUT_SIZE);
Assert(retrieve_results->fields_data_size() == target_fields.size());
auto field0 = retrieve_results->fields_data(0);
auto field2 = retrieve_results->fields_data(2);
Assert(field0.scalars().long_data().data_size() == N);
Assert(field2.vectors().float_vector().data_size() == N * DIM);
}
TEST(Retrieve, FillEntry) {
auto schema = std::make_shared<Schema>();
auto fid_64 = schema->AddDebugField("i64", DataType::INT64);
auto DIM = 16;
auto fid_bool = schema->AddDebugField("bool", DataType::BOOL);
auto fid_f32 = schema->AddDebugField("f32", DataType::FLOAT);
auto fid_f64 = schema->AddDebugField("f64", DataType::DOUBLE);
auto fid_vec32 = schema->AddDebugField(
"vector_32", DataType::VECTOR_FLOAT, DIM, knowhere::metric::L2);
auto fid_vecbin = schema->AddDebugField(
"vec_bin", DataType::VECTOR_BINARY, DIM, knowhere::metric::L2);
schema->set_primary_field_id(fid_64);
int64_t N = 101;
auto dataset = DataGen(schema, N, 42);
auto segment = CreateSealedSegment(schema);
SealedLoadFieldData(dataset, *segment);
auto plan = std::make_unique<query::RetrievePlan>(*schema);
auto term_expr = std::make_unique<query::UnaryRangeExprImpl<int64_t>>(
milvus::query::ColumnInfo(
fid_64, DataType::INT64, std::vector<std::string>()),
OpType::GreaterEqual,
0,
proto::plan::GenericValue::kInt64Val);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
// test query results exceed the limit size
std::vector<FieldId> target_fields{TimestampFieldID,
fid_64,
fid_bool,
fid_f32,
fid_f64,
fid_vec32,
fid_vecbin};
plan->field_ids_ = target_fields;
EXPECT_THROW(segment->Retrieve(plan.get(), N, 1), std::runtime_error);
auto retrieve_results =
segment->Retrieve(plan.get(), N, DEFAULT_MAX_OUTPUT_SIZE);
Assert(retrieve_results->fields_data_size() == target_fields.size());
}
TEST(Retrieve, LargeTimestamp) {
auto schema = std::make_shared<Schema>();
auto fid_64 = schema->AddDebugField("i64", DataType::INT64);
auto DIM = 16;
auto fid_vec = schema->AddDebugField(
"vector_64", DataType::VECTOR_FLOAT, DIM, knowhere::metric::L2);
schema->set_primary_field_id(fid_64);
int64_t N = 100;
int64_t req_size = 10;
int choose_sep = 3;
auto choose = [=](int i) { return i * choose_sep % N; };
uint64_t ts_offset = 100;
auto dataset = DataGen(schema, N, 42, ts_offset + 1);
auto segment = CreateSealedSegment(schema);
SealedLoadFieldData(dataset, *segment);
auto i64_col = dataset.get_col<int64_t>(fid_64);
auto plan = std::make_unique<query::RetrievePlan>(*schema);
std::vector<int64_t> values;
for (int i = 0; i < req_size; ++i) {
values.emplace_back(i64_col[choose(i)]);
}
auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(
milvus::query::ColumnInfo(
fid_64, DataType::INT64, std::vector<std::string>()),
values,
proto::plan::GenericValue::kInt64Val);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
std::vector<FieldId> target_offsets{fid_64, fid_vec};
plan->field_ids_ = target_offsets;
std::vector<int> filter_timestamps{-1, 0, 1, 10, 20};
filter_timestamps.push_back(N / 2);
for (const auto& f_ts : filter_timestamps) {
auto retrieve_results = RetrieveUsingDefaultOutputSize(
segment.get(), plan.get(), ts_offset + 1 + f_ts);
Assert(retrieve_results->fields_data_size() == 2);
int target_num = (f_ts + choose_sep) / choose_sep;
if (target_num > req_size) {
target_num = req_size;
}
for (auto field_data : retrieve_results->fields_data()) {
if (DataType(field_data.type()) == DataType::INT64) {
Assert(field_data.scalars().long_data().data_size() ==
target_num);
}
if (DataType(field_data.type()) == DataType::VECTOR_FLOAT) {
Assert(field_data.vectors().float_vector().data_size() ==
target_num * DIM);
}
}
}
}
TEST(Retrieve, Delete) {
auto schema = std::make_shared<Schema>();
auto fid_64 = schema->AddDebugField("i64", DataType::INT64);
auto DIM = 16;
auto fid_vec = schema->AddDebugField(
"vector_64", DataType::VECTOR_FLOAT, DIM, knowhere::metric::L2);
schema->set_primary_field_id(fid_64);
auto fid_ts = schema->AddDebugField("Timestamp", DataType::INT64);
int64_t N = 10;
int64_t req_size = 10;
auto choose = [=](int i) { return i; };
auto dataset = DataGen(schema, N);
auto segment = CreateSealedSegment(schema);
SealedLoadFieldData(dataset, *segment);
auto i64_col = dataset.get_col<int64_t>(fid_64);
auto ts_col = dataset.get_col<int64_t>(fid_ts);
auto plan = std::make_unique<query::RetrievePlan>(*schema);
std::vector<int64_t> timestamps;
for (int i = 0; i < req_size; ++i) {
timestamps.emplace_back(ts_col[choose(i)]);
}
std::vector<int64_t> values;
for (int i = 0; i < req_size; ++i) {
values.emplace_back(i64_col[choose(i)]);
}
auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(
milvus::query::ColumnInfo(
fid_64, DataType::INT64, std::vector<std::string>()),
values,
proto::plan::GenericValue::kInt64Val);
plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
plan->plan_node_->predicate_ = std::move(term_expr);
std::vector<FieldId> target_offsets{fid_ts, fid_64, fid_vec};
plan->field_ids_ = target_offsets;
{
auto retrieve_results =
RetrieveUsingDefaultOutputSize(segment.get(), plan.get(), 100);
ASSERT_EQ(retrieve_results->fields_data_size(), target_offsets.size());
auto field0 = retrieve_results->fields_data(0);
Assert(field0.has_scalars());
auto field0_data = field0.scalars().long_data();
for (int i = 0; i < req_size; ++i) {
auto index = choose(i);
auto data = field0_data.data(i);
ASSERT_EQ(data, ts_col[index]);
}
auto field1 = retrieve_results->fields_data(1);
Assert(field1.has_scalars());
auto field1_data = field1.scalars().long_data();
for (int i = 0; i < req_size; ++i) {
auto index = choose(i);
auto data = field1_data.data(i);
ASSERT_EQ(data, i64_col[index]);
}
auto field2 = retrieve_results->fields_data(2);
Assert(field2.has_vectors());
auto field2_data = field2.vectors().float_vector();
ASSERT_EQ(field2_data.data_size(), DIM * req_size);
}
int64_t row_count = 0;
// strange, when enable load_delete_record, this test failed
auto load_delete_record = false;
if (load_delete_record) {
std::vector<idx_t> pks{1, 2, 3, 4, 5};
auto ids = std::make_unique<IdArray>();
ids->mutable_int_id()->mutable_data()->Add(pks.begin(), pks.end());
std::vector<Timestamp> timestamps{10, 10, 10, 10, 10};
LoadDeletedRecordInfo info = {timestamps.data(), ids.get(), row_count};
segment->LoadDeletedRecord(info);
row_count = 5;
}
int64_t new_count = 6;
std::vector<idx_t> new_pks{0, 1, 2, 3, 4, 5};
auto ids = std::make_unique<IdArray>();
ids->mutable_int_id()->mutable_data()->Add(new_pks.begin(), new_pks.end());
std::vector<idx_t> new_timestamps{10, 10, 10, 10, 10, 10};
auto reserved_offset = segment->get_deleted_count();
ASSERT_EQ(reserved_offset, row_count);
segment->Delete(reserved_offset,
new_count,
ids.get(),
reinterpret_cast<const Timestamp*>(new_timestamps.data()));
{
auto retrieve_results =
RetrieveUsingDefaultOutputSize(segment.get(), plan.get(), 100);
Assert(retrieve_results->fields_data_size() == target_offsets.size());
auto field1 = retrieve_results->fields_data(1);
Assert(field1.has_scalars());
auto field1_data = field1.scalars().long_data();
auto size = req_size - new_count;
for (int i = 0; i < size; ++i) {
auto index = choose(i);
auto data = field1_data.data(i);
ASSERT_EQ(data, i64_col[index + new_count]);
}
auto field2 = retrieve_results->fields_data(2);
Assert(field2.has_vectors());
auto field2_data = field2.vectors().float_vector();
ASSERT_EQ(field2_data.data_size(), DIM * size);
}
}