diff --git a/internal/core/src/query/CMakeLists.txt b/internal/core/src/query/CMakeLists.txt index 9cd8c684af..395bb345d4 100644 --- a/internal/core/src/query/CMakeLists.txt +++ b/internal/core/src/query/CMakeLists.txt @@ -8,7 +8,6 @@ set(MILVUS_QUERY_SRCS visitors/ShowExprVisitor.cpp visitors/ExecExprVisitor.cpp Plan.cpp - Search.cpp ) add_library(milvus_query ${MILVUS_QUERY_SRCS}) target_link_libraries(milvus_query milvus_proto) diff --git a/internal/core/src/query/Search.cpp b/internal/core/src/query/Search.cpp deleted file mode 100644 index 1a38d1dea3..0000000000 --- a/internal/core/src/query/Search.cpp +++ /dev/null @@ -1,107 +0,0 @@ -#include "Search.h" -#include -#include -#include "segcore/Reduce.h" - -#include -#include "utils/tools.h" - -namespace milvus::query { -static faiss::ConcurrentBitsetPtr -create_bitmap_view(std::optional bitmaps_opt, int64_t chunk_id) { - if (!bitmaps_opt.has_value()) { - return nullptr; - } - auto& bitmaps = *bitmaps_opt.value(); - auto& src_vec = bitmaps.at(chunk_id); - auto dst = std::make_shared(src_vec.size()); - boost::to_block_range(src_vec, dst->mutable_data()); - return dst; -} - -using namespace segcore; -Status -QueryBruteForceImpl(const SegmentSmallIndex& segment, - const query::QueryInfo& info, - const float* query_data, - int64_t num_queries, - Timestamp timestamp, - std::optional bitmaps_opt, - QueryResult& results) { - auto& record = segment.get_insert_record(); - auto& schema = segment.get_schema(); - auto& indexing_record = segment.get_indexing_record(); - // step 1: binary search to find the barrier of the snapshot - auto ins_barrier = get_barrier(record, timestamp); - auto max_chunk = upper_div(ins_barrier, DefaultElementPerChunk); - // auto del_barrier = get_barrier(deleted_record_, timestamp); - -#if 0 - auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier); - Assert(bitmap_holder); - auto bitmap = bitmap_holder->bitmap_ptr; -#endif - - // step 2.1: get meta - // step 2.2: get which vector field to search - auto vecfield_offset_opt = schema.get_offset(info.field_id_); - Assert(vecfield_offset_opt.has_value()); - auto vecfield_offset = vecfield_offset_opt.value(); - auto& field = schema[vecfield_offset]; - auto vec_ptr = record.get_vec_entity(vecfield_offset); - - Assert(field.get_data_type() == DataType::VECTOR_FLOAT); - auto dim = field.get_dim(); - auto topK = info.topK_; - auto total_count = topK * num_queries; - // TODO: optimize - - // step 3: small indexing search - std::vector final_uids(total_count, -1); - std::vector final_dis(total_count, std::numeric_limits::max()); - - auto max_indexed_id = indexing_record.get_finished_ack(); - const auto& indexing_entry = indexing_record.get_indexing(vecfield_offset); - auto search_conf = indexing_entry.get_search_conf(topK); - - for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) { - auto indexing = indexing_entry.get_indexing(chunk_id); - auto src_data = vec_ptr->get_chunk(chunk_id).data(); - auto dataset = knowhere::GenDataset(num_queries, dim, src_data); - auto bitmap_view = create_bitmap_view(bitmaps_opt, chunk_id); - auto ans = indexing->Query(dataset, search_conf, bitmap_view); - auto dis = ans->Get(milvus::knowhere::meta::DISTANCE); - auto uids = ans->Get(milvus::knowhere::meta::IDS); - merge_into(num_queries, topK, final_dis.data(), final_uids.data(), dis, uids); - } - - // step 4: brute force search where small indexing is unavailable - for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) { - std::vector buf_uids(total_count, -1); - std::vector buf_dis(total_count, std::numeric_limits::max()); - - faiss::float_maxheap_array_t buf = {(size_t)num_queries, (size_t)topK, buf_uids.data(), buf_dis.data()}; - auto src_data = vec_ptr->get_chunk(chunk_id).data(); - auto nsize = - chunk_id != max_chunk - 1 ? DefaultElementPerChunk : ins_barrier - chunk_id * DefaultElementPerChunk; - auto bitmap_view = create_bitmap_view(bitmaps_opt, chunk_id); - faiss::knn_L2sqr(query_data, src_data, dim, num_queries, nsize, &buf, bitmap_view); - merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data()); - } - - // step 5: convert offset to uids - for (auto& id : final_uids) { - if (id == -1) { - continue; - } - id = record.uids_[id]; - } - - results.result_ids_ = std::move(final_uids); - results.result_distances_ = std::move(final_dis); - results.topK_ = topK; - results.num_queries_ = num_queries; - - return Status::OK(); -} -} // namespace milvus::query diff --git a/internal/core/src/query/Search.h b/internal/core/src/query/Search.h deleted file mode 100644 index 0febf74a49..0000000000 --- a/internal/core/src/query/Search.h +++ /dev/null @@ -1,19 +0,0 @@ -#pragma once -#include -#include "segcore/SegmentSmallIndex.h" -#include - -namespace milvus::query { -using BitmapChunk = boost::dynamic_bitset<>; -using BitmapSimple = std::deque; - -// note: c++17 don't support optional ref -Status -QueryBruteForceImpl(const segcore::SegmentSmallIndex& segment, - const QueryInfo& info, - const float* query_data, - int64_t num_queries, - Timestamp timestamp, - std::optional bitmap_opt, - segcore::QueryResult& results); -} // namespace milvus::query diff --git a/internal/core/src/query/generated/ExecExprVisitor.h b/internal/core/src/query/generated/ExecExprVisitor.h index e5327f7e71..2e173289ab 100644 --- a/internal/core/src/query/generated/ExecExprVisitor.h +++ b/internal/core/src/query/generated/ExecExprVisitor.h @@ -4,7 +4,6 @@ #include "segcore/SegmentSmallIndex.h" #include #include "query/ExprImpl.h" -#include "boost/dynamic_bitset.hpp" #include "ExprVisitor.h" namespace milvus::query { @@ -23,7 +22,7 @@ class ExecExprVisitor : ExprVisitor { visit(RangeExpr& expr) override; public: - using RetType = std::deque>; + using RetType = std::vector>; explicit ExecExprVisitor(segcore::SegmentSmallIndex& segment) : segment_(segment) { } RetType diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index 889485393e..43b6971200 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -1,7 +1,6 @@ #include "segcore/SegmentSmallIndex.h" #include #include "query/ExprImpl.h" -#include "boost/dynamic_bitset.hpp" #include "query/generated/ExecExprVisitor.h" namespace milvus::query { @@ -11,7 +10,7 @@ namespace milvus::query { namespace impl { class ExecExprVisitor : ExprVisitor { public: - using RetType = std::deque>; + using RetType = std::vector>; explicit ExecExprVisitor(segcore::SegmentSmallIndex& segment) : segment_(segment) { } RetType @@ -67,7 +66,7 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl& expr, Func func) -> RetT auto& field_meta = schema[field_offset]; auto vec_ptr = records.get_scalar_entity(field_offset); auto& vec = *vec_ptr; - RetType results(vec.chunk_size()); + std::vector> results(vec.chunk_size()); for (auto chunk_id = 0; chunk_id < vec.chunk_size(); ++chunk_id) { auto& result = results[chunk_id]; result.resize(segcore::DefaultElementPerChunk); diff --git a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp index f35f7e9367..14cc43c99b 100644 --- a/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp +++ b/internal/core/src/query/visitors/ExecPlanNodeVisitor.cpp @@ -3,8 +3,6 @@ #include "segcore/SegmentBase.h" #include "query/generated/ExecPlanNodeVisitor.h" #include "segcore/SegmentSmallIndex.h" -#include "query/generated/ExecExprVisitor.h" -#include "query/Search.h" namespace milvus::query { @@ -51,12 +49,7 @@ ExecPlanNodeVisitor::visit(FloatVectorANNS& node) { auto& ph = placeholder_group_.at(0); auto src_data = ph.get_blob(); auto num_queries = ph.num_of_queries_; - if (node.predicate_.has_value()) { - auto bitmap = ExecExprVisitor(*segment).call_child(*node.predicate_.value()); - auto ptr = &bitmap; - QueryBruteForceImpl(*segment, node.query_info_, src_data, num_queries, timestamp_, ptr, ret); - } - QueryBruteForceImpl(*segment, node.query_info_, src_data, num_queries, timestamp_, std::nullopt, ret); + segment->QueryBruteForceImpl(node.query_info_, src_data, num_queries, timestamp_, ret); ret_ = ret; } diff --git a/internal/core/src/segcore/AckResponder.h b/internal/core/src/segcore/AckResponder.h index bb12db60e4..894a2dd237 100644 --- a/internal/core/src/segcore/AckResponder.h +++ b/internal/core/src/segcore/AckResponder.h @@ -37,5 +37,6 @@ class AckResponder { std::shared_mutex mutex_; std::set acks_ = {0}; std::atomic minimum_ = 0; + // std::atomic maximum_ = 0; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index 3edbcc9c40..54e35e9275 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -4,7 +4,6 @@ #include "common/Schema.h" #include "knowhere/index/vector_index/IndexIVF.h" #include -#include "segcore/Record.h" namespace milvus::segcore { diff --git a/internal/core/src/segcore/IndexingEntry.h b/internal/core/src/segcore/IndexingEntry.h index 6cc61df36d..8823602b39 100644 --- a/internal/core/src/segcore/IndexingEntry.h +++ b/internal/core/src/segcore/IndexingEntry.h @@ -67,7 +67,7 @@ class IndexingRecord { // concurrent int64_t - get_finished_ack() const { + get_finished_ack() { return finished_ack_.GetAck(); } diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index e565a1aaca..a34ecf02eb 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -2,7 +2,6 @@ #include "common/Schema.h" #include "ConcurrentVector.h" #include "AckResponder.h" -#include "segcore/Record.h" namespace milvus::segcore { struct InsertRecord { diff --git a/internal/core/src/segcore/Record.h b/internal/core/src/segcore/Record.h deleted file mode 100644 index 8e35ef4bc3..0000000000 --- a/internal/core/src/segcore/Record.h +++ /dev/null @@ -1,21 +0,0 @@ -#pragma once -#include "common/Schema.h" - -namespace milvus::segcore { -template -inline int64_t -get_barrier(const RecordType& record, Timestamp timestamp) { - auto& vec = record.timestamps_; - int64_t beg = 0; - int64_t end = record.ack_responder_.GetAck(); - while (beg < end) { - auto mid = (beg + end) / 2; - if (vec[mid] < timestamp) { - beg = mid + 1; - } else { - end = mid; - } - } - return beg; -} -} // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentBase.h b/internal/core/src/segcore/SegmentBase.h index 1e870c8d61..faea669804 100644 --- a/internal/core/src/segcore/SegmentBase.h +++ b/internal/core/src/segcore/SegmentBase.h @@ -52,7 +52,10 @@ class SegmentBase { virtual Status Delete(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) = 0; - public: + // query contains metadata of + virtual Status + QueryDeprecated(query::QueryDeprecatedPtr query, Timestamp timestamp, QueryResult& results) = 0; + virtual Status Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_groups[], diff --git a/internal/core/src/segcore/SegmentNaive.cpp b/internal/core/src/segcore/SegmentNaive.cpp index eab6b6a508..917b4b524a 100644 --- a/internal/core/src/segcore/SegmentNaive.cpp +++ b/internal/core/src/segcore/SegmentNaive.cpp @@ -219,6 +219,23 @@ SegmentNaive::Delete(int64_t reserved_begin, int64_t size, const int64_t* uids_r // return Status::OK(); } +template +int64_t +get_barrier(const RecordType& record, Timestamp timestamp) { + auto& vec = record.timestamps_; + int64_t beg = 0; + int64_t end = record.ack_responder_.GetAck(); + while (beg < end) { + auto mid = (beg + end) / 2; + if (vec[mid] < timestamp) { + beg = mid + 1; + } else { + end = mid; + } + } + return beg; +} + Status SegmentNaive::QueryImpl(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) { auto ins_barrier = get_barrier(record_, timestamp); diff --git a/internal/core/src/segcore/SegmentNaive.h b/internal/core/src/segcore/SegmentNaive.h index 6a4ced97ca..a4e35afb56 100644 --- a/internal/core/src/segcore/SegmentNaive.h +++ b/internal/core/src/segcore/SegmentNaive.h @@ -41,12 +41,10 @@ class SegmentNaive : public SegmentBase { Status Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override; - private: - // NOTE: now deprecated, remains for further copy out + // query contains metadata of Status - QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results); + QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override; - public: Status Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_groups[], diff --git a/internal/core/src/segcore/SegmentSmallIndex.cpp b/internal/core/src/segcore/SegmentSmallIndex.cpp index 3af53741d6..47248e75c9 100644 --- a/internal/core/src/segcore/SegmentSmallIndex.cpp +++ b/internal/core/src/segcore/SegmentSmallIndex.cpp @@ -204,6 +204,137 @@ SegmentSmallIndex::Delete(int64_t reserved_begin, // return Status::OK(); } +template +int64_t +get_barrier(const RecordType& record, Timestamp timestamp) { + auto& vec = record.timestamps_; + int64_t beg = 0; + int64_t end = record.ack_responder_.GetAck(); + while (beg < end) { + auto mid = (beg + end) / 2; + if (vec[mid] < timestamp) { + beg = mid + 1; + } else { + end = mid; + } + } + return beg; +} + +Status +SegmentSmallIndex::QueryBruteForceImpl(const query::QueryInfo& info, + const float* query_data, + int64_t num_queries, + Timestamp timestamp, + QueryResult& results) { + // step 1: binary search to find the barrier of the snapshot + auto ins_barrier = get_barrier(record_, timestamp); + // auto del_barrier = get_barrier(deleted_record_, timestamp); +#if 0 + auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier); + Assert(bitmap_holder); + auto bitmap = bitmap_holder->bitmap_ptr; +#endif + + // step 2.1: get meta + // step 2.2: get which vector field to search + auto vecfield_offset_opt = schema_->get_offset(info.field_id_); + Assert(vecfield_offset_opt.has_value()); + auto vecfield_offset = vecfield_offset_opt.value(); + Assert(vecfield_offset < record_.entity_vec_.size()); + + auto& field = schema_->operator[](vecfield_offset); + auto vec_ptr = std::static_pointer_cast>(record_.entity_vec_.at(vecfield_offset)); + + Assert(field.get_data_type() == DataType::VECTOR_FLOAT); + auto dim = field.get_dim(); + auto topK = info.topK_; + auto total_count = topK * num_queries; + // TODO: optimize + + // step 3: small indexing search + std::vector final_uids(total_count, -1); + std::vector final_dis(total_count, std::numeric_limits::max()); + + auto max_chunk = (ins_barrier + DefaultElementPerChunk - 1) / DefaultElementPerChunk; + + auto max_indexed_id = indexing_record_.get_finished_ack(); + const auto& indexing_entry = indexing_record_.get_indexing(vecfield_offset); + auto search_conf = indexing_entry.get_search_conf(topK); + + for (int chunk_id = 0; chunk_id < max_indexed_id; ++chunk_id) { + auto indexing = indexing_entry.get_indexing(chunk_id); + auto src_data = vec_ptr->get_chunk(chunk_id).data(); + auto dataset = knowhere::GenDataset(num_queries, dim, src_data); + auto ans = indexing->Query(dataset, search_conf, nullptr); + auto dis = ans->Get(milvus::knowhere::meta::DISTANCE); + auto uids = ans->Get(milvus::knowhere::meta::IDS); + merge_into(num_queries, topK, final_dis.data(), final_uids.data(), dis, uids); + } + + // step 4: brute force search where small indexing is unavailable + for (int chunk_id = max_indexed_id; chunk_id < max_chunk; ++chunk_id) { + std::vector buf_uids(total_count, -1); + std::vector buf_dis(total_count, std::numeric_limits::max()); + + faiss::float_maxheap_array_t buf = {(size_t)num_queries, (size_t)topK, buf_uids.data(), buf_dis.data()}; + + auto src_data = vec_ptr->get_chunk(chunk_id).data(); + auto nsize = + chunk_id != max_chunk - 1 ? DefaultElementPerChunk : ins_barrier - chunk_id * DefaultElementPerChunk; + faiss::knn_L2sqr(query_data, src_data, dim, num_queries, nsize, &buf); + merge_into(num_queries, topK, final_dis.data(), final_uids.data(), buf_dis.data(), buf_uids.data()); + } + + // step 5: convert offset to uids + for (auto& id : final_uids) { + if (id == -1) { + continue; + } + id = record_.uids_[id]; + } + + results.result_ids_ = std::move(final_uids); + results.result_distances_ = std::move(final_dis); + results.topK_ = topK; + results.num_queries_ = num_queries; + + // throw std::runtime_error("unimplemented"); + return Status::OK(); +} + +Status +SegmentSmallIndex::QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& result) { + // TODO: enable delete + // TODO: enable index + // TODO: remove mock + if (query_info == nullptr) { + query_info = std::make_shared(); + query_info->field_name = "fakevec"; + query_info->topK = 10; + query_info->num_queries = 1; + + auto dim = schema_->operator[]("fakevec").get_dim(); + std::default_random_engine e(42); + std::uniform_real_distribution<> dis(0.0, 1.0); + query_info->query_raw_data.resize(query_info->num_queries * dim); + for (auto& x : query_info->query_raw_data) { + x = dis(e); + } + } + // TODO + query::QueryInfo info{ + query_info->topK, + query_info->field_name, + "L2", + nlohmann::json{ + {"nprobe", 10}, + }, + }; + auto num_queries = query_info->num_queries; + return QueryBruteForceImpl(info, query_info->query_raw_data.data(), num_queries, timestamp, result); +} + Status SegmentSmallIndex::Close() { if (this->record_.reserved != this->record_.ack_responder_.GetAck()) { diff --git a/internal/core/src/segcore/SegmentSmallIndex.h b/internal/core/src/segcore/SegmentSmallIndex.h index 70541f8631..9bb3030e89 100644 --- a/internal/core/src/segcore/SegmentSmallIndex.h +++ b/internal/core/src/segcore/SegmentSmallIndex.h @@ -65,6 +65,10 @@ class SegmentSmallIndex : public SegmentBase { Status Delete(int64_t reserverd_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps) override; + // query contains metadata of + Status + QueryDeprecated(query::QueryDeprecatedPtr query_info, Timestamp timestamp, QueryResult& results) override; + Status Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_groups[], @@ -108,16 +112,6 @@ class SegmentSmallIndex : public SegmentBase { return record_; } - const IndexingRecord& - get_indexing_record() const { - return indexing_record_; - } - - const DeletedRecord& - get_deleted_record() const { - return deleted_record_; - } - const Schema& get_schema() const { return *schema_; @@ -154,12 +148,12 @@ class SegmentSmallIndex : public SegmentBase { // Status // QueryBruteForceImpl(query::QueryPtr query, Timestamp timestamp, QueryResult& results); - // Status - // QueryBruteForceImpl(const query::QueryInfo& info, - // const float* query_data, - // int64_t num_queries, - // Timestamp timestamp, - // QueryResult& results); + Status + QueryBruteForceImpl(const query::QueryInfo& info, + const float* query_data, + int64_t num_queries, + Timestamp timestamp, + QueryResult& results); template knowhere::IndexPtr @@ -178,5 +172,4 @@ class SegmentSmallIndex : public SegmentBase { // std::unordered_map indexings_; // index_name => indexing tbb::concurrent_unordered_multimap uid2offset_; }; - } // namespace milvus::segcore diff --git a/scripts/README.md b/scripts/README.md index c3475d6e15..6f1dbcb0a1 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -34,11 +34,7 @@ #### Generate the go files from proto file ```shell script - cd milvus-distributed - pwd_dir=`pwd` - export PATH=$PATH:$(go env GOPATH)/bin - export protoc=${pwd_dir}/cmake_build/thirdparty/protobuf/protobuf-build/protoc - ./scripts/proto_gen_go.sh + make check-proto-product ``` #### Check code specifications @@ -53,6 +49,16 @@ make all ``` +#### Install docker-compose + +refer: https://docs.docker.com/compose/install/ +```shell script + sudo curl -L "https://github.com/docker/compose/releases/download/1.27.4/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose + sudo chmod +x /usr/local/bin/docker-compose + sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose + docker-compose --version +``` + #### Start service ```shell script