diff --git a/Makefile b/Makefile index 0856c124cf..565f533f72 100644 --- a/Makefile +++ b/Makefile @@ -56,12 +56,12 @@ verifiers: cppcheck fmt lint ruleguard # Builds various components locally. build-go: @echo "Building each component's binary to './bin'" - @echo "Building query node ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null @echo "Building master ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null @echo "Building proxy ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxy $(PWD)/cmd/proxy/proxy.go 1>/dev/null + @echo "Building query node ..." + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null build-cpp: @(env bash $(PWD)/scripts/core_build.sh) diff --git a/internal/core/build-support/add_license.sh b/internal/core/build-support/add_license.sh index 5d27816890..83dd6d48f3 100755 --- a/internal/core/build-support/add_license.sh +++ b/internal/core/build-support/add_license.sh @@ -1,11 +1,13 @@ -FOLDER=$1 -if [ -z ${FOLDER} ]; then - echo usage $0 [folder_to_add_license] +LICENSE=$1 +FOLDER=$2 + +if [ -z ${FOLDER} ] || [ -z ${LICENSE} ]; then + echo "usage $0 " exit -else - echo good fi +cat ${LICENSE} > /dev/null || exit -1 + FILES=`find ${FOLDER} \ | grep -E "(*\.cpp$|*\.h$|*\.cu$)" \ | grep -v thirdparty \ @@ -13,13 +15,16 @@ FILES=`find ${FOLDER} \ | grep -v cmake-build \ | grep -v output \ | grep -v "\.pb\."` -echo formating ${FILES} ... +# echo formating ${FILES} ... +skip_count=0 for f in ${FILES}; do - if (grep "Apache License" $f);then - echo "No need to copy the License Header to $f" + if (grep "Apache License" $f > /dev/null);then + # echo "No need to copy the License Header to $f" + skip_count=$((skip_count+1)) else - cat cpp_license.txt $f > $f.new + cat ${LICENSE} $f > $f.new mv $f.new $f echo "License Header copied to $f" fi done +echo "license adder: $skip_count file(s) skiped" diff --git a/internal/core/run_clang_format.sh b/internal/core/run_clang_format.sh index e713a542df..b5a682639c 100755 --- a/internal/core/run_clang_format.sh +++ b/internal/core/run_clang_format.sh @@ -13,3 +13,4 @@ formatThis() { formatThis "${CorePath}/src" formatThis "${CorePath}/unittest" +${CorePath}/build-support/add_license.sh ${CorePath}/build-support/cpp_license.txt ${CorePath} diff --git a/internal/core/src/query/BruteForceSearch.cpp b/internal/core/src/query/BruteForceSearch.cpp new file mode 100644 index 0000000000..626fe54290 --- /dev/null +++ b/internal/core/src/query/BruteForceSearch.cpp @@ -0,0 +1,88 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include "BruteForceSearch.h" +#include + +namespace milvus::query { + +void +BinarySearchBruteForce(faiss::MetricType metric_type, + int64_t code_size, + const uint8_t* binary_chunk, + int64_t chunk_size, + int64_t topk, + int64_t num_queries, + const uint8_t* query_data, + float* result_distances, + idx_t* result_labels, + faiss::ConcurrentBitsetPtr bitset) { + const idx_t block_size = segcore::DefaultElementPerChunk; + bool use_heap = true; + + if (metric_type == faiss::METRIC_Jaccard || metric_type == faiss::METRIC_Tanimoto) { + float* D = result_distances; + for (idx_t query_base_index = 0; query_base_index < num_queries; query_base_index += block_size) { + idx_t query_size = block_size; + if (query_base_index + block_size > num_queries) { + query_size = num_queries - query_base_index; + } + + // We see the distances and labels as heaps. + faiss::float_maxheap_array_t res = {size_t(query_size), size_t(topk), + result_labels + query_base_index * topk, D + query_base_index * topk}; + + binary_distence_knn_hc(metric_type, &res, query_data + query_base_index * code_size, binary_chunk, + chunk_size, code_size, + /* ordered = */ true, bitset); + } + if (metric_type == faiss::METRIC_Tanimoto) { + for (int i = 0; i < topk * num_queries; i++) { + D[i] = -log2(1 - D[i]); + } + } + } else if (metric_type == faiss::METRIC_Substructure || metric_type == faiss::METRIC_Superstructure) { + float* D = result_distances; + for (idx_t s = 0; s < num_queries; s += block_size) { + idx_t nn = block_size; + if (s + block_size > num_queries) { + nn = num_queries - s; + } + + // only match ids will be chosed, not to use heap + binary_distence_knn_mc(metric_type, query_data + s * code_size, binary_chunk, nn, chunk_size, topk, + code_size, D + s * topk, result_labels + s * topk, bitset); + } + } else if (metric_type == faiss::METRIC_Hamming) { + std::vector int_distances(topk * num_queries); + for (idx_t s = 0; s < num_queries; s += block_size) { + idx_t nn = block_size; + if (s + block_size > num_queries) { + nn = num_queries - s; + } + if (use_heap) { + // We see the distances and labels as heaps. + faiss::int_maxheap_array_t res = {size_t(nn), size_t(topk), result_labels + s * topk, + int_distances.data() + s * topk}; + + hammings_knn_hc(&res, query_data + s * code_size, binary_chunk, chunk_size, code_size, + /* ordered = */ true, bitset); + } else { + hammings_knn_mc(query_data + s * code_size, binary_chunk, nn, chunk_size, topk, code_size, + int_distances.data() + s * topk, result_labels + s * topk, bitset); + } + } + for (int i = 0; i < num_queries; ++i) { + result_distances[i] = static_cast(int_distances[i]); + } + } +} +} // namespace milvus::query diff --git a/internal/core/src/query/BruteForceSearch.h b/internal/core/src/query/BruteForceSearch.h new file mode 100644 index 0000000000..1edc19e159 --- /dev/null +++ b/internal/core/src/query/BruteForceSearch.h @@ -0,0 +1,29 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once +#include +#include "segcore/ConcurrentVector.h" +#include "common/Schema.h" + +namespace milvus::query { +void +BinarySearchBruteForce(faiss::MetricType metric_type, + int64_t code_size, + const uint8_t* binary_chunk, + int64_t chunk_size, + int64_t topk, + int64_t num_queries, + const uint8_t* query_data, + float* result_distances, + idx_t* result_labels, + faiss::ConcurrentBitsetPtr bitset = nullptr); +} // namespace milvus::query diff --git a/internal/core/src/query/CMakeLists.txt b/internal/core/src/query/CMakeLists.txt index 9cd8c684af..671713dc5b 100644 --- a/internal/core/src/query/CMakeLists.txt +++ b/internal/core/src/query/CMakeLists.txt @@ -9,6 +9,7 @@ set(MILVUS_QUERY_SRCS visitors/ExecExprVisitor.cpp Plan.cpp Search.cpp + BruteForceSearch.cpp ) add_library(milvus_query ${MILVUS_QUERY_SRCS}) target_link_libraries(milvus_query milvus_proto) diff --git a/internal/core/src/query/Search.cpp b/internal/core/src/query/Search.cpp index d18f4dc57f..b73014a86a 100644 --- a/internal/core/src/query/Search.cpp +++ b/internal/core/src/query/Search.cpp @@ -93,8 +93,8 @@ QueryBruteForceImpl(const segcore::SegmentSmallIndex& segment, } segcore::merge_into(num_queries, topK, final_dis.data(), final_uids.data(), dis, uids); } - - auto vec_ptr = record.get_vec_entity(vecfield_offset); + using segcore::FloatVector; + auto vec_ptr = record.get_entity(vecfield_offset); // step 4: brute force search where small indexing is unavailable for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) { std::vector buf_uids(total_count, -1); diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index d1b5c3d963..83e5782c91 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -79,7 +79,7 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl& expr, IndexFunc index_fu Assert(field_offset_opt); auto field_offset = field_offset_opt.value(); auto& field_meta = schema[field_offset]; - auto vec_ptr = records.get_scalar_entity(field_offset); + auto vec_ptr = records.get_entity(field_offset); auto& vec = *vec_ptr; auto& indexing_record = segment_.get_indexing_record(); const segcore::ScalarIndexingEntry& entry = indexing_record.get_scalar_entry(field_offset); diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index 8373b6ee2f..74ece55121 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -109,20 +109,20 @@ class VectorBase { }; template -class ConcurrentVector : public VectorBase { +class ConcurrentVectorImpl : public VectorBase { public: // constants using Chunk = FixedVector; - ConcurrentVector(ConcurrentVector&&) = delete; - ConcurrentVector(const ConcurrentVector&) = delete; + ConcurrentVectorImpl(ConcurrentVectorImpl&&) = delete; + ConcurrentVectorImpl(const ConcurrentVectorImpl&) = delete; - ConcurrentVector& - operator=(ConcurrentVector&&) = delete; - ConcurrentVector& - operator=(const ConcurrentVector&) = delete; + ConcurrentVectorImpl& + operator=(ConcurrentVectorImpl&&) = delete; + ConcurrentVectorImpl& + operator=(const ConcurrentVectorImpl&) = delete; public: - explicit ConcurrentVector(ssize_t dim = 1) : Dim(is_scalar ? 1 : dim), SizePerChunk(Dim * ElementsPerChunk) { + explicit ConcurrentVectorImpl(ssize_t dim = 1) : Dim(is_scalar ? 1 : dim), SizePerChunk(Dim * ElementsPerChunk) { Assert(is_scalar ? dim == 1 : dim != 1); } @@ -221,4 +221,28 @@ class ConcurrentVector : public VectorBase { ThreadSafeVector chunks_; }; +template +class ConcurrentVector : public ConcurrentVectorImpl { + using ConcurrentVectorImpl::ConcurrentVectorImpl; +}; + +class FloatVector {}; +class BinaryVector {}; + +template <> +class ConcurrentVector : public ConcurrentVectorImpl { + using ConcurrentVectorImpl::ConcurrentVectorImpl; +}; + +template <> +class ConcurrentVector : public ConcurrentVectorImpl { + public: + explicit ConcurrentVector(int64_t dim) : binary_dim_(dim), ConcurrentVectorImpl(dim / 8) { + Assert(dim % 8 == 0); + } + + private: + int64_t binary_dim_; +}; + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index 58b8f81207..c88d1edde5 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -55,8 +55,8 @@ struct DeletedRecord { public: std::atomic reserved = 0; AckResponder ack_responder_; - ConcurrentVector timestamps_; - ConcurrentVector uids_; + ConcurrentVector timestamps_; + ConcurrentVector uids_; private: std::shared_ptr lru_; diff --git a/internal/core/src/segcore/IndexingEntry.cpp b/internal/core/src/segcore/IndexingEntry.cpp index 460291a7da..4930bf7f98 100644 --- a/internal/core/src/segcore/IndexingEntry.cpp +++ b/internal/core/src/segcore/IndexingEntry.cpp @@ -22,7 +22,7 @@ VecIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const Vector assert(field_meta_.get_data_type() == DataType::VECTOR_FLOAT); auto dim = field_meta_.get_dim(); - auto source = dynamic_cast*>(vec_base); + auto source = dynamic_cast*>(vec_base); Assert(source); auto chunk_size = source->chunk_size(); assert(ack_end <= chunk_size); @@ -87,7 +87,7 @@ void ScalarIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const VectorBase* vec_base) { auto dim = field_meta_.get_dim(); - auto source = dynamic_cast*>(vec_base); + auto source = dynamic_cast*>(vec_base); Assert(source); auto chunk_size = source->chunk_size(); assert(ack_end <= chunk_size); @@ -106,7 +106,12 @@ ScalarIndexingEntry::BuildIndexRange(int64_t ack_beg, int64_t ack_end, const std::unique_ptr CreateIndex(const FieldMeta& field_meta) { if (field_meta.is_vector()) { - return std::make_unique(field_meta); + if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { + return std::make_unique(field_meta); + } else { + // TODO + PanicInfo("unsupported"); + } } switch (field_meta.get_data_type()) { case DataType::INT8: diff --git a/internal/core/src/segcore/IndexingEntry.h b/internal/core/src/segcore/IndexingEntry.h index 33b2d96c52..bc7787d8c7 100644 --- a/internal/core/src/segcore/IndexingEntry.h +++ b/internal/core/src/segcore/IndexingEntry.h @@ -100,7 +100,9 @@ class IndexingRecord { Initialize() { int offset = 0; for (auto& field : schema_) { - entries_.try_emplace(offset, CreateIndex(field)); + if (field.get_data_type() != DataType::VECTOR_BINARY) { + entries_.try_emplace(offset, CreateIndex(field)); + } ++offset; } assert(offset == schema_.size()); diff --git a/internal/core/src/segcore/InsertRecord.cpp b/internal/core/src/segcore/InsertRecord.cpp index ac007fb440..5eeb91c0af 100644 --- a/internal/core/src/segcore/InsertRecord.cpp +++ b/internal/core/src/segcore/InsertRecord.cpp @@ -16,36 +16,42 @@ namespace milvus::segcore { InsertRecord::InsertRecord(const Schema& schema) : uids_(1), timestamps_(1) { for (auto& field : schema) { if (field.is_vector()) { - Assert(field.get_data_type() == DataType::VECTOR_FLOAT); - entity_vec_.emplace_back(std::make_shared>(field.get_dim())); - continue; + if (field.get_data_type() == DataType::VECTOR_FLOAT) { + entity_vec_.emplace_back(std::make_shared>(field.get_dim())); + continue; + } else if (field.get_data_type() == DataType::VECTOR_BINARY) { + entity_vec_.emplace_back(std::make_shared>(field.get_dim())); + continue; + } else { + PanicInfo("unsupported"); + } } switch (field.get_data_type()) { case DataType::INT8: { - entity_vec_.emplace_back(std::make_shared>()); + entity_vec_.emplace_back(std::make_shared>()); break; } case DataType::INT16: { - entity_vec_.emplace_back(std::make_shared>()); + entity_vec_.emplace_back(std::make_shared>()); break; } case DataType::INT32: { - entity_vec_.emplace_back(std::make_shared>()); + entity_vec_.emplace_back(std::make_shared>()); break; } case DataType::INT64: { - entity_vec_.emplace_back(std::make_shared>()); + entity_vec_.emplace_back(std::make_shared>()); break; } case DataType::FLOAT: { - entity_vec_.emplace_back(std::make_shared>()); + entity_vec_.emplace_back(std::make_shared>()); break; } case DataType::DOUBLE: { - entity_vec_.emplace_back(std::make_shared>()); + entity_vec_.emplace_back(std::make_shared>()); break; } default: { diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index c8115ab428..ab8d5f5284 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -21,22 +21,14 @@ namespace milvus::segcore { struct InsertRecord { std::atomic reserved = 0; AckResponder ack_responder_; - ConcurrentVector timestamps_; - ConcurrentVector uids_; + ConcurrentVector timestamps_; + ConcurrentVector uids_; std::vector> entity_vec_; explicit InsertRecord(const Schema& schema); template auto - get_scalar_entity(int offset) const { - auto ptr = std::dynamic_pointer_cast>(entity_vec_[offset]); - Assert(ptr); - return ptr; - } - - template - auto - get_vec_entity(int offset) const { + get_entity(int offset) const { auto ptr = std::dynamic_pointer_cast>(entity_vec_[offset]); Assert(ptr); return ptr; @@ -44,15 +36,7 @@ struct InsertRecord { template auto - get_scalar_entity(int offset) { - auto ptr = std::dynamic_pointer_cast>(entity_vec_[offset]); - Assert(ptr); - return ptr; - } - - template - auto - get_vec_entity(int offset) { + get_entity(int offset) { auto ptr = std::dynamic_pointer_cast>(entity_vec_[offset]); Assert(ptr); return ptr; diff --git a/internal/core/src/segcore/SegmentNaive.cpp b/internal/core/src/segcore/SegmentNaive.cpp index 29edd86bd1..1091c6df25 100644 --- a/internal/core/src/segcore/SegmentNaive.cpp +++ b/internal/core/src/segcore/SegmentNaive.cpp @@ -249,7 +249,8 @@ SegmentNaive::QueryImpl(query::QueryDeprecatedPtr query_info, Timestamp timestam auto the_offset_opt = schema_->get_offset(query_info->field_name); Assert(the_offset_opt.has_value()); Assert(the_offset_opt.value() < record_.entity_vec_.size()); - auto vec_ptr = std::static_pointer_cast>(record_.entity_vec_.at(the_offset_opt.value())); + auto vec_ptr = + std::static_pointer_cast>(record_.entity_vec_.at(the_offset_opt.value())); auto index_entry = index_meta_->lookup_by_field(query_info->field_name); auto conf = index_entry.config; @@ -308,7 +309,8 @@ SegmentNaive::QueryBruteForceImpl(query::QueryDeprecatedPtr query_info, Timestam auto the_offset_opt = schema_->get_offset(query_info->field_name); Assert(the_offset_opt.has_value()); Assert(the_offset_opt.value() < record_.entity_vec_.size()); - auto vec_ptr = std::static_pointer_cast>(record_.entity_vec_.at(the_offset_opt.value())); + auto vec_ptr = + std::static_pointer_cast>(record_.entity_vec_.at(the_offset_opt.value())); std::vector final_uids(total_count); std::vector final_dis(total_count, std::numeric_limits::max()); @@ -364,7 +366,8 @@ SegmentNaive::QuerySlowImpl(query::QueryDeprecatedPtr query_info, Timestamp time auto the_offset_opt = schema_->get_offset(query_info->field_name); Assert(the_offset_opt.has_value()); Assert(the_offset_opt.value() < record_.entity_vec_.size()); - auto vec_ptr = std::static_pointer_cast>(record_.entity_vec_.at(the_offset_opt.value())); + auto vec_ptr = + std::static_pointer_cast>(record_.entity_vec_.at(the_offset_opt.value())); std::vector>> records(num_queries); auto get_L2_distance = [dim](const float* a, const float* b) { @@ -467,7 +470,7 @@ SegmentNaive::BuildVecIndexImpl(const IndexMeta::Entry& entry) { auto chunk_size = record_.uids_.chunk_size(); auto& uids = record_.uids_; - auto entities = record_.get_vec_entity(offset); + auto entities = record_.get_entity(offset); std::vector datasets; for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) { diff --git a/internal/core/src/segcore/SegmentSmallIndex.cpp b/internal/core/src/segcore/SegmentSmallIndex.cpp index de52f78252..cae5160ff8 100644 --- a/internal/core/src/segcore/SegmentSmallIndex.cpp +++ b/internal/core/src/segcore/SegmentSmallIndex.cpp @@ -238,7 +238,7 @@ SegmentSmallIndex::BuildVecIndexImpl(const IndexMeta::Entry& entry) { auto indexing = knowhere::VecIndexFactory::GetInstance().CreateVecIndex(entry.type, entry.mode); auto& uids = record_.uids_; - auto entities = record_.get_vec_entity(offset); + auto entities = record_.get_entity(offset); std::vector datasets; for (int chunk_id = 0; chunk_id < uids.chunk_size(); ++chunk_id) { @@ -356,7 +356,7 @@ SegmentSmallIndex::FillTargetEntry(const query::Plan* plan, QueryResult& results auto& uids = record_.uids_; for (int64_t i = 0; i < size; ++i) { auto seg_offset = results.internal_seg_offsets_[i]; - auto row_id = uids[seg_offset]; + auto row_id = seg_offset == -1 ? -1 : uids[seg_offset]; std::vector blob(sizeof(row_id)); memcpy(blob.data(), &row_id, sizeof(row_id)); results.row_data_.emplace_back(std::move(blob)); @@ -367,10 +367,10 @@ SegmentSmallIndex::FillTargetEntry(const query::Plan* plan, QueryResult& results auto key_offset = key_offset_opt.value(); auto field_meta = schema_->operator[](key_offset); Assert(field_meta.get_data_type() == DataType::INT64); - auto uids = record_.get_scalar_entity(key_offset); + auto uids = record_.get_entity(key_offset); for (int64_t i = 0; i < size; ++i) { auto seg_offset = results.internal_seg_offsets_[i]; - auto row_id = uids->operator[](seg_offset); + auto row_id = seg_offset == -1 ? -1 : uids->operator[](seg_offset); std::vector blob(sizeof(row_id)); memcpy(blob.data(), &row_id, sizeof(row_id)); results.row_data_.emplace_back(std::move(blob)); diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 4210b43f54..cb61890d2d 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -10,7 +10,9 @@ set(MILVUS_TEST_FILES test_indexing.cpp test_query.cpp test_expr.cpp - test_bitmap.cpp) + test_bitmap.cpp + test_binary.cpp + ) add_executable(all_tests ${MILVUS_TEST_FILES} ) diff --git a/internal/core/unittest/test_binary.cpp b/internal/core/unittest/test_binary.cpp new file mode 100644 index 0000000000..e5adf3a269 --- /dev/null +++ b/internal/core/unittest/test_binary.cpp @@ -0,0 +1,31 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include "test_utils/DataGen.h" + +using namespace milvus; +using namespace milvus::query; +using namespace milvus::segcore; + +TEST(Binary, Insert) { + int64_t N = 100000; + int64_t num_queries = 10; + int64_t topK = 5; + auto schema = std::make_shared(); + schema->AddField("vecbin", DataType::VECTOR_BINARY, 128); + schema->AddField("age", DataType::INT64); + auto dataset = DataGen(schema, N, 10); + auto segment = CreateSegment(schema); + segment->PreInsert(N); + segment->Insert(0, N, dataset.row_ids_.data(), dataset.timestamps_.data(), dataset.raw_); + int i = 1 + 1; +} diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index cf43eb2a36..d974f4bc46 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -150,12 +150,12 @@ TEST(CApiTest, SearchTest) { } auto blob = raw_group.SerializeAsString(); - void *plan = nullptr; + void* plan = nullptr; auto status = CreatePlan(collection, dsl_string, &plan); assert(status.error_code == Success); - void *placeholderGroup = nullptr; + void* placeholderGroup = nullptr; status = ParsePlaceholderGroup(plan, blob.data(), blob.length(), &placeholderGroup); assert(status.error_code == Success); @@ -617,12 +617,12 @@ TEST(CApiTest, Reduce) { } auto blob = raw_group.SerializeAsString(); - void *plan = nullptr; + void* plan = nullptr; auto status = CreatePlan(collection, dsl_string, &plan); assert(status.error_code == Success); - void *placeholderGroup = nullptr; + void* placeholderGroup = nullptr; status = ParsePlaceholderGroup(plan, blob.data(), blob.length(), &placeholderGroup); assert(status.error_code == Success); diff --git a/internal/core/unittest/test_concurrent_vector.cpp b/internal/core/unittest/test_concurrent_vector.cpp index 847697dde6..ce0acfb11c 100644 --- a/internal/core/unittest/test_concurrent_vector.cpp +++ b/internal/core/unittest/test_concurrent_vector.cpp @@ -38,7 +38,7 @@ TEST(ConcurrentVector, TestABI) { TEST(ConcurrentVector, TestSingle) { auto dim = 8; - ConcurrentVector c_vec(dim); + ConcurrentVectorImpl c_vec(dim); std::default_random_engine e(42); int data = 0; auto total_count = 0; @@ -66,7 +66,7 @@ TEST(ConcurrentVector, TestMultithreads) { constexpr int threads = 16; std::vector total_counts(threads); - ConcurrentVector c_vec(dim); + ConcurrentVectorImpl c_vec(dim); std::atomic ack_counter = 0; // std::mutex mutex; diff --git a/internal/core/unittest/test_indexing.cpp b/internal/core/unittest/test_indexing.cpp index 6f677f4566..266549d01c 100644 --- a/internal/core/unittest/test_indexing.cpp +++ b/internal/core/unittest/test_indexing.cpp @@ -32,6 +32,8 @@ #include #include "test_utils/Timer.h" #include "segcore/Reduce.h" +#include "test_utils/DataGen.h" +#include "query/BruteForceSearch.h" using std::cin; using std::cout; @@ -55,7 +57,7 @@ generate_data(int N) { uids.push_back(10 * N + i); timestamps.push_back(0); // append vec - float vec[DIM]; + vector vec(DIM); for (auto& x : vec) { x = distribution(er); } @@ -81,6 +83,7 @@ TEST(Indexing, SmartBruteForce) { auto [raw_data, timestamps, uids] = generate_data(N); auto total_count = DIM * TOPK; auto raw = (const float*)raw_data.data(); + AssertInfo(raw, "wtf"); constexpr int64_t queries = 3; auto heap = faiss::float_maxheap_array_t{}; @@ -231,3 +234,106 @@ TEST(Indexing, IVFFlatNM) { cout << ids[i] << "->" << dis[i] << endl; } } + +TEST(Indexing, DISABLED_BinaryBruteForce) { + int64_t N = 100000; + int64_t num_queries = 10; + int64_t topk = 5; + int64_t dim = 64; + auto result_count = topk * num_queries; + auto schema = std::make_shared(); + schema->AddField("vecbin", DataType::VECTOR_BINARY, dim); + schema->AddField("age", DataType::INT64); + auto dataset = DataGen(schema, N, 10); + vector distances(result_count); + vector ids(result_count); + auto bin_vec = dataset.get_col(0); + auto line_sizeof = schema->operator[](0).get_sizeof(); + auto query_data = 1024 * line_sizeof + bin_vec.data(); + query::BinarySearchBruteForce(faiss::MetricType::METRIC_Jaccard, line_sizeof, bin_vec.data(), N, topk, num_queries, + query_data, distances.data(), ids.data()); + QueryResult qr; + qr.num_queries_ = num_queries; + qr.topK_ = topk; + qr.internal_seg_offsets_ = ids; + qr.result_distances_ = distances; + + auto json = QueryResultToJson(qr); + auto ref = Json::parse(R"( +[ + [ + [ + "1024->0.000000", + "86966->0.395349", + "24843->0.404762", + "13806->0.416667", + "44313->0.421053" + ], + [ + "1025->0.000000", + "14226->0.348837", + "1488->0.365854", + "47337->0.377778", + "20913->0.377778" + ], + [ + "1026->0.000000", + "81882->0.386364", + "9215->0.409091", + "95024->0.409091", + "54987->0.414634" + ], + [ + "1027->0.000000", + "68981->0.394737", + "75528->0.404762", + "68794->0.405405", + "21975->0.425000" + ], + [ + "1028->0.000000", + "90290->0.375000", + "34309->0.394737", + "58559->0.400000", + "33865->0.400000" + ], + [ + "1029->0.000000", + "62722->0.388889", + "89070->0.394737", + "18528->0.414634", + "94971->0.421053" + ], + [ + "1030->0.000000", + "67402->0.333333", + "3988->0.347826", + "86376->0.354167", + "84381->0.361702" + ], + [ + "1031->0.000000", + "81569->0.325581", + "12715->0.347826", + "40332->0.363636", + "21037->0.372093" + ], + [ + "1032->0.000000", + "60536->0.428571", + "93293->0.432432", + "70969->0.435897", + "64048->0.450000" + ], + [ + "1033->0.000000", + "99022->0.394737", + "11763->0.405405", + "50073->0.428571", + "97118->0.428571" + ] + ] +] +)"); + ASSERT_EQ(json, ref); +} diff --git a/internal/core/unittest/test_query.cpp b/internal/core/unittest/test_query.cpp index 556cfd7cbb..f25f3ddf62 100644 --- a/internal/core/unittest/test_query.cpp +++ b/internal/core/unittest/test_query.cpp @@ -214,17 +214,9 @@ TEST(Query, ExecWithPredicate) { Timestamp time = 1000000; std::vector ph_group_arr = {ph_group.get()}; segment->Search(plan.get(), ph_group_arr.data(), &time, 1, qr); - std::vector> results; int topk = 5; - for (int q = 0; q < num_queries; ++q) { - std::vector result; - for (int k = 0; k < topk; ++k) { - int index = q * topk + k; - result.emplace_back(std::to_string(qr.result_ids_[index]) + "->" + - std::to_string(qr.result_distances_[index])); - } - results.emplace_back(std::move(result)); - } + + Json json = QueryResultToJson(qr); auto ref = Json::parse(R"([ [ @@ -266,7 +258,6 @@ TEST(Query, ExecWithPredicate) { ] ])"); - Json json{results}; ASSERT_EQ(json, ref); } diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index de6860b503..4ce51c3612 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -87,6 +87,24 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42) { insert_cols(data); break; } + case engine::DataType::VECTOR_BINARY: { + auto dim = field.get_dim(); + Assert(dim % 8 == 0); + vector data(dim / 8 * N); + for (auto& x : data) { + x = er(); + } + insert_cols(data); + break; + } + case engine::DataType::INT64: { + vector data(N); + for (auto& x : data) { + x = er(); + } + insert_cols(data); + break; + } case engine::DataType::INT32: { vector data(N); for (auto& x : data) { @@ -142,4 +160,41 @@ CreatePlaceholderGroup(int64_t num_queries, int dim, int64_t seed = 42) { return raw_group; } +inline auto +CreateBinaryPlaceholderGroup(int64_t num_queries, int64_t dim, int64_t seed = 42) { + assert(dim % 8 == 0); + namespace ser = milvus::proto::service; + ser::PlaceholderGroup raw_group; + auto value = raw_group.add_placeholders(); + value->set_tag("$0"); + value->set_type(ser::PlaceholderType::VECTOR_FLOAT); + std::default_random_engine e(seed); + for (int i = 0; i < num_queries; ++i) { + std::vector vec; + for (int d = 0; d < dim / 8; ++d) { + vec.push_back(e()); + } + // std::string line((char*)vec.data(), (char*)vec.data() + vec.size() * sizeof(float)); + value->add_values(vec.data(), vec.size() * sizeof(float)); + } + return raw_group; +} + +inline Json +QueryResultToJson(const QueryResult& qr) { + int64_t num_queries = qr.num_queries_; + int64_t topk = qr.topK_; + std::vector> results; + for (int q = 0; q < num_queries; ++q) { + std::vector result; + for (int k = 0; k < topk; ++k) { + int index = q * topk + k; + result.emplace_back(std::to_string(qr.internal_seg_offsets_[index]) + "->" + + std::to_string(qr.result_distances_[index])); + } + results.emplace_back(std::move(result)); + } + return Json{results}; +}; + } // namespace milvus::segcore diff --git a/internal/proxy/task.go b/internal/proxy/task.go index e67c4bfc58..a2bf2e8513 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -309,7 +309,8 @@ func (dct *DropCollectionTask) Execute() error { } func (dct *DropCollectionTask) PostExecute() error { - return globalMetaCache.Remove(dct.CollectionName.CollectionName) + globalMetaCache.Remove(dct.CollectionName.CollectionName) + return nil } type QueryTask struct { @@ -413,16 +414,24 @@ func (qt *QueryTask) PostExecute() error { return errors.New("wait to finish failed, timeout") case searchResults := <-qt.resultBuf: filterSearchResult := make([]*internalpb.SearchResult, 0) + var filterReason string for _, partialSearchResult := range searchResults { if partialSearchResult.Status.ErrorCode == commonpb.ErrorCode_SUCCESS { filterSearchResult = append(filterSearchResult, partialSearchResult) + } else { + filterReason += partialSearchResult.Status.Reason + "\n" } } - rlen := len(filterSearchResult) // query num + rlen := len(filterSearchResult) // query node num if rlen <= 0 { - qt.result = &servicepb.QueryResult{} - return nil + qt.result = &servicepb.QueryResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: filterReason, + }, + } + return errors.New(filterReason) } n := len(filterSearchResult[0].Hits) // n diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index a2a66c329d..e045e9ff9b 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -138,7 +138,7 @@ func (ss *searchService) receiveSearchMsg() { for _, msg := range searchMsg { err := ss.search(msg) if err != nil { - log.Println("search Failed, error msg type: ", msg.Type()) + log.Println(err) err = ss.publishFailedSearchResult(msg) if err != nil { log.Println("publish FailedSearchResult failed, error message: ", err) @@ -190,7 +190,7 @@ func (ss *searchService) doUnsolvedMsgSearch() { for _, msg := range searchMsg { err := ss.search(msg) if err != nil { - log.Println("search Failed, error msg type: ", msg.Type()) + log.Println(err) err = ss.publishFailedSearchResult(msg) if err != nil { log.Println("publish FailedSearchResult failed, error message: ", err)