mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
Signed-off-by: zhagnlu <lu.zhang@zilliz.com> Co-authored-by: zhagnlu <lu.zhang@zilliz.com>
This commit is contained in:
parent
7ac21fac91
commit
257da153ce
@ -190,7 +190,6 @@ ReduceHelper::ReduceResultData(int slice_index) {
|
||||
|
||||
// `search_records` records the search result offsets
|
||||
std::vector<std::vector<int64_t>> search_records(num_segments_);
|
||||
std::unordered_set<milvus::PkType> pk_set;
|
||||
int64_t skip_dup_cnt = 0;
|
||||
|
||||
// reduce search results
|
||||
@ -199,6 +198,9 @@ ReduceHelper::ReduceResultData(int slice_index) {
|
||||
std::vector<SearchResultPair> result_pairs;
|
||||
for (int i = 0; i < num_segments_; i++) {
|
||||
auto search_result = search_results_[i];
|
||||
if (search_result->real_topK_per_nq_[qi] == 0) {
|
||||
continue;
|
||||
}
|
||||
auto base_offset = search_result->get_result_count(qi);
|
||||
auto primary_key = search_result->primary_keys_[base_offset];
|
||||
auto distance = search_result->distances_[base_offset];
|
||||
@ -206,11 +208,13 @@ ReduceHelper::ReduceResultData(int slice_index) {
|
||||
base_offset + search_result->real_topK_per_nq_[qi]);
|
||||
}
|
||||
|
||||
pk_set.clear();
|
||||
// nq has no results for all segments
|
||||
if (result_pairs.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
std::unordered_set<milvus::PkType> pk_set;
|
||||
int64_t last_nq_result_offset = result_offset;
|
||||
int j = 0;
|
||||
while (result_offset - last_nq_result_offset < slice_topKs_[slice_index]) {
|
||||
j++;
|
||||
std::sort(result_pairs.begin(), result_pairs.end(), std::greater<>());
|
||||
auto& pilot = result_pairs[0];
|
||||
auto index = pilot.segment_index_;
|
||||
|
||||
@ -87,6 +87,32 @@ generate_data(int N) {
|
||||
}
|
||||
return std::make_tuple(raw_data, timestamps, uids);
|
||||
}
|
||||
std::string
|
||||
generate_max_float_query_data(int all_nq, int max_float_nq) {
|
||||
assert(max_float_nq <= all_nq);
|
||||
namespace ser = milvus::proto::common;
|
||||
int dim = DIM;
|
||||
ser::PlaceholderGroup raw_group;
|
||||
auto value = raw_group.add_placeholders();
|
||||
value->set_tag("$0");
|
||||
value->set_type(ser::PlaceholderType::FloatVector);
|
||||
for (int i = 0; i < all_nq; ++i) {
|
||||
std::vector<float> vec;
|
||||
if (i < max_float_nq) {
|
||||
for (int d = 0; d < dim; ++d) {
|
||||
vec.push_back(std::numeric_limits<float>::max());
|
||||
}
|
||||
} else {
|
||||
for (int d = 0; d < dim; ++d) {
|
||||
vec.push_back(1);
|
||||
}
|
||||
}
|
||||
value->add_values(vec.data(), vec.size() * sizeof(float));
|
||||
}
|
||||
auto blob = raw_group.SerializeAsString();
|
||||
return blob;
|
||||
|
||||
}
|
||||
|
||||
std::string
|
||||
generate_query_data(int nq) {
|
||||
@ -1017,6 +1043,83 @@ CheckSearchResultDuplicate(const std::vector<CSearchResult>& results) {
|
||||
// assert(cnt == topk * num_queries);
|
||||
}
|
||||
|
||||
// Reduce must drop "null" hits: half of the queries are FLT_MAX vectors
// (generate_max_float_query_data), and the test expects only the other
// half (num_queries / 2) to appear in result_offsets_ after reduce.
// NOTE(review): test name has a typo — "Reudce" should be "Reduce".
TEST(CApiTest, ReudceNullResult) {
    // Growing segment populated with N generated rows.
    auto collection = NewCollection(get_default_schema_config());
    auto segment = NewSegment(collection, Growing, -1);
    auto schema = ((milvus::segcore::Collection*)collection)->get_schema();
    int N = 10000;
    auto dataset = DataGen(schema, N);
    int64_t offset;

    PreInsert(segment, N, &offset);
    auto insert_data = serialize(dataset.raw_);
    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
                          insert_data.size());
    assert(ins_res.error_code == Success);

    // Top-10 L2 search on the "fakevec" field.
    const char* dsl_string = R"(
    {
        "bool": {
            "vector": {
                "fakevec": {
                    "metric_type": "L2",
                    "params": {
                        "nprobe": 10
                    },
                    "query": "$0",
                    "topk": 10,
                    "round_decimal": 3
                }
            }
        }
    })";

    int num_queries = 10;
    int topK = 10;

    // Half of the queries use FLT_MAX components.
    auto blob = generate_max_float_query_data(num_queries, num_queries / 2);

    void* plan = nullptr;
    auto status = CreateSearchPlan(collection, dsl_string, &plan);
    assert(status.error_code == Success);

    void* placeholderGroup = nullptr;
    status = ParsePlaceholderGroup(plan, blob.data(), blob.length(), &placeholderGroup);
    assert(status.error_code == Success);

    std::vector<CPlaceholderGroup> placeholderGroups;
    placeholderGroups.push_back(placeholderGroup);
    dataset.timestamps_.clear();
    dataset.timestamps_.push_back(1);

    {
        // Single slice: all 10 queries, top-1 kept per query.
        auto slice_nqs = std::vector<int32_t>{10};
        auto slice_topKs = std::vector<int32_t>{1};
        std::vector<CSearchResult> results;
        CSearchResult res;
        status = Search(segment, plan, placeholderGroup, dataset.timestamps_[0], &res, -1);
        assert(status.error_code == Success);
        results.push_back(res);
        CSearchResultDataBlobs cSearchResultData;
        status = ReduceSearchResultsAndFillData(&cSearchResultData, plan, results.data(), results.size(),
                                                slice_nqs.data(), slice_topKs.data(), slice_nqs.size());
        assert(status.error_code == Success);

        // Only the non-FLT_MAX half of the queries should yield offsets.
        auto search_result = (SearchResult*)results[0];
        auto size = search_result->result_offsets_.size();
        EXPECT_EQ(size, num_queries / 2);

        DeleteSearchResult(res);
    }

    DeleteSearchPlan(plan);
    DeletePlaceholderGroup(placeholderGroup);
    DeleteCollection(collection);
    DeleteSegment(segment);
}
|
||||
|
||||
TEST(CApiTest, ReduceRemoveDuplicates) {
|
||||
auto collection = NewCollection(get_default_schema_config());
|
||||
auto segment = NewSegment(collection, Growing, -1);
|
||||
|
||||
@ -10,6 +10,7 @@
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <gtest/gtest.h>

#include <limits>
#include <random>

#include <boost/format.hpp>

#include "knowhere/index/VecIndex.h"
#include "knowhere/index/vector_index/IndexIVF.h"
|
||||
@ -529,3 +530,141 @@ TEST(Sealed, Delete) {
|
||||
segment->Delete(reserved_offset, new_count, new_ids.get(),
|
||||
reinterpret_cast<const Timestamp*>(new_timestamps.data()));
|
||||
}
|
||||
|
||||
auto
|
||||
GenMaxFloatVecs(int N, int dim) {
|
||||
std::vector<float> vecs;
|
||||
for (int i = 0; i < N; i++) {
|
||||
for (int j = 0; j < dim; j++) {
|
||||
vecs.push_back(std::numeric_limits<float>::max());
|
||||
}
|
||||
}
|
||||
return vecs;
|
||||
}
|
||||
|
||||
// Generates N random base vectors of `dim` floats, each component drawn
// uniformly from [0, 1).
auto
GenRandomFloatVecs(int N, int dim) {
    // BUG FIX: the old code called srand(time(NULL)) on every invocation,
    // so two calls within the same second produced identical "random"
    // data. Seed a proper engine once per process instead.
    static std::mt19937 gen{std::random_device{}()};
    std::uniform_real_distribution<float> dist(0.0f, 1.0f);
    std::vector<float> vecs;
    if (N > 0 && dim > 0) {
        vecs.reserve(static_cast<size_t>(N) * dim);
    }
    for (int i = 0; i < N; i++) {
        for (int j = 0; j < dim; j++) {
            vecs.push_back(dist(gen));
        }
    }
    return vecs;
}
|
||||
|
||||
auto
|
||||
GenQueryVecs(int N, int dim) {
|
||||
std::vector<float> vecs;
|
||||
for (int i = 0; i < N; i++) {
|
||||
for (int j = 0; j < dim; j++) {
|
||||
vecs.push_back(1);
|
||||
}
|
||||
}
|
||||
return vecs;
|
||||
}
|
||||
|
||||
auto
|
||||
transfer_to_fields_data(const std::vector<float>& vecs) {
|
||||
auto arr = std::make_unique<DataArray>();
|
||||
*(arr->mutable_vectors()->mutable_float_vector()->mutable_data()) = {vecs.begin(), vecs.end()};
|
||||
return arr;
|
||||
}
|
||||
|
||||
// Brute-force (no index) search on a sealed segment: the base vectors are
// uniform random floats in [0, 1] and the queries are all-ones, so every
// hit should be a valid offset and the L2 distance is bounded by dim.
TEST(Sealed, BF) {
    auto schema = std::make_shared<Schema>();
    auto dim = 128;
    auto metric_type = "L2";
    auto fake_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type);
    auto i64_fid = schema->AddDebugField("counter", DataType::INT64);
    schema->set_primary_field_id(i64_fid);

    int64_t N = 100000;
    auto base = GenRandomFloatVecs(N, dim);
    auto base_arr = transfer_to_fields_data(base);
    base_arr->set_type(proto::schema::DataType::FloatVector);

    // field id 100 — presumably the id assigned to "fakevec"; it matches
    // the field_id used in the search plan below. TODO confirm.
    LoadFieldDataInfo load_info{100, base_arr.get(), N};

    auto dataset = DataGen(schema, N);
    auto segment = CreateSealedSegment(schema);
    std::cout << fake_id.get() << std::endl;
    // Load every generated column EXCEPT the vector field, which is then
    // loaded from the hand-built FLT-random data instead.
    SealedLoadFieldData(dataset, *segment, {fake_id.get()});

    segment->LoadFieldData(load_info);

    auto topK = 1;
    auto fmt = boost::format(R"(vector_anns: <
                                  field_id: 100
                                  query_info: <
                                    topk: %1%
                                    metric_type: "L2"
                                    search_params: "{\"nprobe\": 10}"
                                  >
                                  placeholder_tag: "$0">
                                  output_field_ids: 101)") %
               topK;
    auto serialized_expr_plan = fmt.str();
    auto binary_plan = translate_text_plan_to_binary_plan(serialized_expr_plan.data());
    auto plan = CreateSearchPlanByExpr(*schema, binary_plan.data(), binary_plan.size());

    auto num_queries = 10;
    auto query = GenQueryVecs(num_queries, dim);
    auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, query);
    auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());

    auto result = segment->Search(plan.get(), ph_group.get(), MAX_TIMESTAMP);
    auto ves = SearchResultToVector(*result);
    // first: offset, second: distance
    EXPECT_GT(ves[0].first, 0);
    EXPECT_LE(ves[0].first, N);
    // Each component's squared difference (1 - x)^2 with x in [0, 1] is at
    // most 1, so the summed L2 distance cannot exceed dim.
    EXPECT_LE(ves[0].second, dim);
}
|
||||
|
||||
// Overflow case for brute-force search on a sealed segment: every base
// vector is filled with FLT_MAX, so distances against the all-ones
// queries are expected to be invalid and each query's single hit should
// carry the invalid offset -1.
TEST(Sealed, BF_Overflow) {
    auto schema = std::make_shared<Schema>();
    auto dim = 128;
    auto metric_type = "L2";
    auto fake_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type);
    auto i64_fid = schema->AddDebugField("counter", DataType::INT64);
    schema->set_primary_field_id(i64_fid);

    int64_t N = 10;
    auto base = GenMaxFloatVecs(N, dim);
    auto base_arr = transfer_to_fields_data(base);
    base_arr->set_type(proto::schema::DataType::FloatVector);
    // field id 100 — matches the field_id used in the search plan below.
    LoadFieldDataInfo load_info{100, base_arr.get(), N};
    auto dataset = DataGen(schema, N);
    auto segment = CreateSealedSegment(schema);
    std::cout << fake_id.get() << std::endl;
    // Load everything except the vector field, then load the FLT_MAX
    // vector column separately.
    SealedLoadFieldData(dataset, *segment, {fake_id.get()});

    segment->LoadFieldData(load_info);

    auto topK = 1;
    auto fmt = boost::format(R"(vector_anns: <
                                  field_id: 100
                                  query_info: <
                                    topk: %1%
                                    metric_type: "L2"
                                    search_params: "{\"nprobe\": 10}"
                                  >
                                  placeholder_tag: "$0">
                                  output_field_ids: 101)") %
               topK;
    auto serialized_expr_plan = fmt.str();
    auto binary_plan = translate_text_plan_to_binary_plan(serialized_expr_plan.data());
    auto plan = CreateSearchPlanByExpr(*schema, binary_plan.data(), binary_plan.size());

    auto num_queries = 10;
    auto query = GenQueryVecs(num_queries, dim);
    auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, query);
    auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());

    auto result = segment->Search(plan.get(), ph_group.get(), MAX_TIMESTAMP);
    auto ves = SearchResultToVector(*result);
    for (int i = 0; i < num_queries; ++i) {
        // BUG FIX: the loop previously asserted ves[0] on every iteration,
        // never using i. With topK == 1, query i's single hit lives at
        // ves[i].
        EXPECT_EQ(ves[i].first, -1);
    }
}
|
||||
@ -280,6 +280,23 @@ CreatePlaceholderGroup(int64_t num_queries, int dim, int64_t seed = 42) {
|
||||
return raw_group;
|
||||
}
|
||||
|
||||
inline auto
|
||||
CreatePlaceholderGroup(int64_t num_queries, int dim, const std::vector<float>& vecs) {
|
||||
namespace ser = milvus::proto::common;
|
||||
ser::PlaceholderGroup raw_group;
|
||||
auto value = raw_group.add_placeholders();
|
||||
value->set_tag("$0");
|
||||
value->set_type(ser::PlaceholderType::FloatVector);
|
||||
for (int i = 0; i < num_queries; ++i) {
|
||||
std::vector<float> vec;
|
||||
for (int d = 0; d < dim; ++d) {
|
||||
vec.push_back(vecs[i*dim+d]);
|
||||
}
|
||||
value->add_values(vec.data(), vec.size() * sizeof(float));
|
||||
}
|
||||
return raw_group;
|
||||
}
|
||||
|
||||
inline auto
|
||||
CreatePlaceholderGroupFromBlob(int64_t num_queries, int dim, const float* src) {
|
||||
namespace ser = milvus::proto::common;
|
||||
@ -340,6 +357,20 @@ CreateBinaryPlaceholderGroupFromBlob(int64_t num_queries, int64_t dim, const uin
|
||||
return raw_group;
|
||||
}
|
||||
|
||||
inline auto
|
||||
SearchResultToVector(const SearchResult& sr) {
|
||||
int64_t num_queries = sr.total_nq_;
|
||||
int64_t topk = sr.unity_topK_;
|
||||
std::vector<std::pair<int, float>> result;
|
||||
for (int q = 0; q < num_queries; ++q) {
|
||||
for (int k = 0; k < topk; ++k) {
|
||||
int index = q * topk + k;
|
||||
result.emplace_back(std::make_pair(sr.seg_offsets_[index], sr.distances_[index]));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
inline json
|
||||
SearchResultToJson(const SearchResult& sr) {
|
||||
int64_t num_queries = sr.total_nq_;
|
||||
@ -357,7 +388,7 @@ SearchResultToJson(const SearchResult& sr) {
|
||||
};
|
||||
|
||||
inline void
|
||||
SealedLoadFieldData(const GeneratedData& dataset, SegmentSealed& seg) {
|
||||
SealedLoadFieldData(const GeneratedData& dataset, SegmentSealed& seg, const std::set<int64_t>& exclude_fields = {}) {
|
||||
auto row_count = dataset.row_ids_.size();
|
||||
{
|
||||
LoadFieldDataInfo info;
|
||||
@ -378,6 +409,10 @@ SealedLoadFieldData(const GeneratedData& dataset, SegmentSealed& seg) {
|
||||
seg.LoadFieldData(info);
|
||||
}
|
||||
for (auto field_data : dataset.raw_->fields_data()) {
|
||||
int64_t field_id = field_data.field_id();
|
||||
if (exclude_fields.find(field_id) != exclude_fields.end()) {
|
||||
continue;
|
||||
}
|
||||
LoadFieldDataInfo info;
|
||||
info.field_id = field_data.field_id();
|
||||
info.row_count = row_count;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user