diff --git a/internal/core/src/segcore/Reduce.cpp b/internal/core/src/segcore/Reduce.cpp index c85b74bedd..fba5ad04f5 100644 --- a/internal/core/src/segcore/Reduce.cpp +++ b/internal/core/src/segcore/Reduce.cpp @@ -190,7 +190,6 @@ ReduceHelper::ReduceResultData(int slice_index) { // `search_records` records the search result offsets std::vector> search_records(num_segments_); - std::unordered_set pk_set; int64_t skip_dup_cnt = 0; // reduce search results @@ -199,6 +198,9 @@ ReduceHelper::ReduceResultData(int slice_index) { std::vector result_pairs; for (int i = 0; i < num_segments_; i++) { auto search_result = search_results_[i]; + if (search_result->real_topK_per_nq_[qi] == 0) { + continue; + } auto base_offset = search_result->get_result_count(qi); auto primary_key = search_result->primary_keys_[base_offset]; auto distance = search_result->distances_[base_offset]; @@ -206,11 +208,13 @@ ReduceHelper::ReduceResultData(int slice_index) { base_offset + search_result->real_topK_per_nq_[qi]); } - pk_set.clear(); + // nq has no results for all segments + if (result_pairs.size() == 0) { + continue; + } + std::unordered_set pk_set; int64_t last_nq_result_offset = result_offset; - int j = 0; while (result_offset - last_nq_result_offset < slice_topKs_[slice_index]) { - j++; std::sort(result_pairs.begin(), result_pairs.end(), std::greater<>()); auto& pilot = result_pairs[0]; auto index = pilot.segment_index_; diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 9a4034cdce..c354a9f087 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -87,6 +87,32 @@ generate_data(int N) { } return std::make_tuple(raw_data, timestamps, uids); } +std::string +generate_max_float_query_data(int all_nq, int max_float_nq) { + assert(max_float_nq <= all_nq); + namespace ser = milvus::proto::common; + int dim = DIM; + ser::PlaceholderGroup raw_group; + auto value = raw_group.add_placeholders(); + value->set_tag("$0"); + value->set_type(ser::PlaceholderType::FloatVector); + for (int i = 0; i < all_nq; ++i) { + std::vector vec; + if (i < max_float_nq) { + for (int d = 0; d < dim; ++d) { + vec.push_back(std::numeric_limits::max()); + } + } else { + for (int d = 0; d < dim; ++d) { + vec.push_back(1); + } + } + value->add_values(vec.data(), vec.size() * sizeof(float)); + } + auto blob = raw_group.SerializeAsString(); + return blob; + +} std::string generate_query_data(int nq) { @@ -1017,6 +1043,83 @@ CheckSearchResultDuplicate(const std::vector& results) { // assert(cnt == topk * num_queries); } +TEST(CApiTest, ReudceNullResult) { + auto collection = NewCollection(get_default_schema_config()); + auto segment = NewSegment(collection, Growing, -1); + auto schema = ((milvus::segcore::Collection*)collection)->get_schema(); + int N = 10000; + auto dataset = DataGen(schema, N); + int64_t offset; + + PreInsert(segment, N, &offset); + auto insert_data = serialize(dataset.raw_); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); + assert(ins_res.error_code == Success); + + const char* dsl_string = R"( + { + "bool": { + "vector": { + "fakevec": { + "metric_type": "L2", + "params": { + "nprobe": 10 + }, + "query": "$0", + "topk": 10, + "round_decimal": 3 + } + } + } + })"; + + int num_queries = 10; + int topK = 10; + + auto blob = generate_max_float_query_data(num_queries, num_queries / 2); + + void* plan = nullptr; + auto status = CreateSearchPlan(collection, dsl_string, &plan); + assert(status.error_code == Success); + + void* placeholderGroup = nullptr; + status = ParsePlaceholderGroup(plan, blob.data(), blob.length(), &placeholderGroup); + assert(status.error_code == Success); + + std::vector placeholderGroups; + placeholderGroups.push_back(placeholderGroup); + dataset.timestamps_.clear(); + dataset.timestamps_.push_back(1); + + { + auto slice_nqs = std::vector{10}; + auto slice_topKs = std::vector{1}; + std::vector results; + CSearchResult res; + status = Search(segment, plan, placeholderGroup, dataset.timestamps_[0], &res, -1); + assert(status.error_code == Success); + results.push_back(res); + CSearchResultDataBlobs cSearchResultData; + status = ReduceSearchResultsAndFillData(&cSearchResultData, plan, results.data(), results.size(), + slice_nqs.data(), slice_topKs.data(), slice_nqs.size()); + assert(status.error_code == Success); + + auto search_result = (SearchResult*)results[0]; + auto size = search_result->result_offsets_.size(); + EXPECT_EQ(size, num_queries / 2); + + DeleteSearchResult(res); + + } + + DeleteSearchPlan(plan); + DeletePlaceholderGroup(placeholderGroup); + DeleteCollection(collection); + DeleteSegment(segment); + +} + TEST(CApiTest, ReduceRemoveDuplicates) { auto collection = NewCollection(get_default_schema_config()); auto segment = NewSegment(collection, Growing, -1); diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index a899d9e61a..def12e11f9 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include +#include #include "knowhere/index/VecIndex.h" #include "knowhere/index/vector_index/IndexIVF.h" @@ -529,3 +530,141 @@ TEST(Sealed, Delete) { segment->Delete(reserved_offset, new_count, new_ids.get(), reinterpret_cast(new_timestamps.data())); } + +auto +GenMaxFloatVecs(int N, int dim) { + std::vector vecs; + for (int i = 0; i < N; i++) { + for (int j = 0; j < dim; j++) { + vecs.push_back(std::numeric_limits::max()); + } + } + return vecs; +} + +auto +GenRandomFloatVecs(int N, int dim) { + std::vector vecs; + srand(time(NULL)); + for (int i = 0; i < N; i++) { + for (int j = 0; j < dim; j++) { + vecs.push_back(static_cast(rand()) / static_cast(RAND_MAX)); + } + } + return vecs; +} + +auto +GenQueryVecs(int N, int dim) { + std::vector vecs; + for (int i = 0; i < N; i++) { + for (int j = 0; j < dim; j++) { + vecs.push_back(1); + } + } + return vecs; +} + +auto +transfer_to_fields_data(const std::vector& vecs) { + auto arr = std::make_unique(); + *(arr->mutable_vectors()->mutable_float_vector()->mutable_data()) = {vecs.begin(), vecs.end()}; + return arr; +} + +TEST(Sealed, BF) { + auto schema = std::make_shared(); + auto dim = 128; + auto metric_type = "L2"; + auto fake_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type); + auto i64_fid = schema->AddDebugField("counter", DataType::INT64); + schema->set_primary_field_id(i64_fid); + + int64_t N = 100000; + auto base = GenRandomFloatVecs(N, dim); + auto base_arr = transfer_to_fields_data(base); + base_arr->set_type(proto::schema::DataType::FloatVector); + + LoadFieldDataInfo load_info{100, base_arr.get(), N}; + + auto dataset = DataGen(schema, N); + auto segment = CreateSealedSegment(schema); + std::cout << fake_id.get() << std::endl; + SealedLoadFieldData(dataset, *segment, {fake_id.get()}); + + segment->LoadFieldData(load_info); + + auto topK = 1; + auto fmt = boost::format(R"(vector_anns: < + field_id: 100 + query_info: < + topk: %1% + metric_type: "L2" + search_params: "{\"nprobe\": 10}" + > + placeholder_tag: "$0"> + output_field_ids: 101)") % + topK; + auto serialized_expr_plan = fmt.str(); + auto binary_plan = translate_text_plan_to_binary_plan(serialized_expr_plan.data()); + auto plan = CreateSearchPlanByExpr(*schema, binary_plan.data(), binary_plan.size()); + + auto num_queries = 10; + auto query = GenQueryVecs(num_queries, dim); + auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, query); + auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); + + auto result = segment->Search(plan.get(), ph_group.get(), MAX_TIMESTAMP); + auto ves = SearchResultToVector(*result); + // first: offset, second: distance + EXPECT_GT(ves[0].first, 0); + EXPECT_LE(ves[0].first, N); + EXPECT_LE(ves[0].second, dim); +} + +TEST(Sealed, BF_Overflow) { + auto schema = std::make_shared(); + auto dim = 128; + auto metric_type = "L2"; + auto fake_id = schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, metric_type); + auto i64_fid = schema->AddDebugField("counter", DataType::INT64); + schema->set_primary_field_id(i64_fid); + + int64_t N = 10; + auto base = GenMaxFloatVecs(N, dim); + auto base_arr = transfer_to_fields_data(base); + base_arr->set_type(proto::schema::DataType::FloatVector); + LoadFieldDataInfo load_info{100, base_arr.get(), N}; + auto dataset = DataGen(schema, N); + auto segment = CreateSealedSegment(schema); + std::cout<< fake_id.get() <LoadFieldData(load_info); + + auto topK = 1; + auto fmt = boost::format(R"(vector_anns: < + field_id: 100 + query_info: < + topk: %1% + metric_type: "L2" + search_params: "{\"nprobe\": 10}" + > + placeholder_tag: "$0"> + output_field_ids: 101)") % + topK; + auto serialized_expr_plan = fmt.str(); + auto binary_plan = translate_text_plan_to_binary_plan(serialized_expr_plan.data()); + auto plan = CreateSearchPlanByExpr(*schema, binary_plan.data(), binary_plan.size()); + + auto num_queries = 10; + auto query = GenQueryVecs(num_queries, dim); + auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, query); + auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString()); + + auto result = segment->Search(plan.get(), ph_group.get(), MAX_TIMESTAMP); + auto ves = SearchResultToVector(*result); + for (int i = 0; i < num_queries; ++i) { + EXPECT_EQ(ves[0].first, -1); + } +} \ No newline at end of file diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index ed083f9a3c..0f16c431ba 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -280,6 +280,23 @@ CreatePlaceholderGroup(int64_t num_queries, int dim, int64_t seed = 42) { return raw_group; } +inline auto +CreatePlaceholderGroup(int64_t num_queries, int dim, const std::vector& vecs) { + namespace ser = milvus::proto::common; + ser::PlaceholderGroup raw_group; + auto value = raw_group.add_placeholders(); + value->set_tag("$0"); + value->set_type(ser::PlaceholderType::FloatVector); + for (int i = 0; i < num_queries; ++i) { + std::vector vec; + for (int d = 0; d < dim; ++d) { + vec.push_back(vecs[i*dim+d]); + } + value->add_values(vec.data(), vec.size() * sizeof(float)); + } + return raw_group; +} + inline auto CreatePlaceholderGroupFromBlob(int64_t num_queries, int dim, const float* src) { namespace ser = milvus::proto::common; @@ -340,6 +357,20 @@ CreateBinaryPlaceholderGroupFromBlob(int64_t num_queries, int64_t dim, const uin return raw_group; } +inline auto +SearchResultToVector(const SearchResult& sr) { + int64_t num_queries = sr.total_nq_; + int64_t topk = sr.unity_topK_; + std::vector> result; + for (int q = 0; q < num_queries; ++q) { + for (int k = 0; k < topk; ++k) { + int index = q * topk + k; + result.emplace_back(std::make_pair(sr.seg_offsets_[index], sr.distances_[index])); + } + } + return result; +} + inline json SearchResultToJson(const SearchResult& sr) { int64_t num_queries = sr.total_nq_; @@ -357,7 +388,7 @@ SearchResultToJson(const SearchResult& sr) { }; inline void -SealedLoadFieldData(const GeneratedData& dataset, SegmentSealed& seg) { +SealedLoadFieldData(const GeneratedData& dataset, SegmentSealed& seg, const std::set& exclude_fields = {}) { auto row_count = dataset.row_ids_.size(); { LoadFieldDataInfo info; @@ -378,6 +409,10 @@ SealedLoadFieldData(const GeneratedData& dataset, SegmentSealed& seg) { seg.LoadFieldData(info); } for (auto field_data : dataset.raw_->fields_data()) { + int64_t field_id = field_data.field_id(); + if (exclude_fields.find(field_id) != exclude_fields.end()) { + continue; + } LoadFieldDataInfo info; info.field_id = field_data.field_id(); info.row_count = row_count;