From 60fdd7e4f4d89f98a8508231fa99ec3db63d8246 Mon Sep 17 00:00:00 2001 From: yah01 Date: Wed, 26 Apr 2023 10:30:34 +0800 Subject: [PATCH] Introduce simdjson (#23644) Signed-off-by: yah01 --- internal/core/src/common/CMakeLists.txt | 1 + internal/core/src/common/Column.h | 28 ++--- internal/core/src/common/Json.h | 107 ++++++++++++++++++ internal/core/src/common/Types.h | 8 +- internal/core/src/common/Utils.h | 10 +- .../src/query/visitors/ExecExprVisitor.cpp | 2 +- .../core/src/segcore/ConcurrentVector.cpp | 7 +- .../core/src/segcore/SegmentGrowingImpl.cpp | 6 +- .../core/src/segcore/SegmentSealedImpl.cpp | 23 +--- internal/core/src/segcore/SegmentSealedImpl.h | 2 +- internal/core/src/segcore/Utils.cpp | 33 ++++-- internal/core/thirdparty/CMakeLists.txt | 2 + .../core/thirdparty/simdjson/CMakeLists.txt | 19 ++++ internal/core/unittest/test_expr.cpp | 41 ++++--- internal/core/unittest/test_query.cpp | 8 +- 15 files changed, 213 insertions(+), 84 deletions(-) create mode 100644 internal/core/src/common/Json.h create mode 100644 internal/core/thirdparty/simdjson/CMakeLists.txt diff --git a/internal/core/src/common/CMakeLists.txt b/internal/core/src/common/CMakeLists.txt index 3bff613faf..8996aa7348 100644 --- a/internal/core/src/common/CMakeLists.txt +++ b/internal/core/src/common/CMakeLists.txt @@ -30,6 +30,7 @@ target_link_libraries(milvus_common milvus_log yaml-cpp boost_bitset_ext + simdjson ${CONAN_LIBS} ) diff --git a/internal/core/src/common/Column.h b/internal/core/src/common/Column.h index cb577ddf22..ff0d5c83d4 100644 --- a/internal/core/src/common/Column.h +++ b/internal/core/src/common/Column.h @@ -75,22 +75,22 @@ class ColumnBase { uint64_t size_{0}; }; -class FixedColumn : public ColumnBase { +class Column : public ColumnBase { public: - FixedColumn(int64_t segment_id, - const FieldMeta& field_meta, - const LoadFieldDataInfo& info) { + Column(int64_t segment_id, + const FieldMeta& field_meta, + const LoadFieldDataInfo& info) { data_ = static_cast(CreateMap(segment_id, field_meta, info)); size_ = field_meta.get_sizeof() * info.row_count; row_count_ = info.row_count; } - FixedColumn(FixedColumn&& column) noexcept + Column(Column&& column) noexcept : ColumnBase(std::move(column)), row_count_(column.row_count_) { column.row_count_ = 0; } - ~FixedColumn() override = default; + ~Column() override = default; SpanBase span() const override { @@ -107,11 +107,9 @@ class VariableColumn : public ColumnBase { using ViewType = std::conditional_t, std::string_view, T>; - template VariableColumn(int64_t segment_id, const FieldMeta& field_meta, - const LoadFieldDataInfo& info, - Ctor&& ctor) { + const LoadFieldDataInfo& info) { auto begin = FIELD_DATA(info.field_data, string).begin(); auto end = FIELD_DATA(info.field_data, string).end(); if constexpr (std::is_same_v) { @@ -127,7 +125,7 @@ class VariableColumn : public ColumnBase { } data_ = static_cast(CreateMap(segment_id, field_meta, info)); - construct_views(std::forward(ctor)); + construct_views(); } VariableColumn(VariableColumn&& field) noexcept @@ -162,16 +160,14 @@ class VariableColumn : public ColumnBase { } protected: - template void - construct_views(Ctor ctor) { + construct_views() { views_.reserve(indices_.size()); for (size_t i = 0; i < indices_.size() - 1; i++) { - views_.emplace_back( - ctor(data_ + indices_[i], indices_[i + 1] - indices_[i])); + views_.emplace_back(data_ + indices_[i], + indices_[i + 1] - indices_[i]); } - views_.emplace_back( - ctor(data_ + indices_.back(), size_ - indices_.back())); + views_.emplace_back(data_ + indices_.back(), size_ - indices_.back()); } private: diff --git a/internal/core/src/common/Json.h b/internal/core/src/common/Json.h new file mode 100644 index 0000000000..63211a6447 --- /dev/null +++ b/internal/core/src/common/Json.h @@ -0,0 +1,107 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "exceptions/EasyAssert.h" +#include "simdjson.h" +#include "fmt/core.h" + +namespace milvus { + +class Json { + public: + Json() = default; + + explicit Json(simdjson::padded_string data) : own_data_(std::move(data)) { + data_ = own_data_.value(); + } + + explicit Json(simdjson::padded_string_view data) : data_(data) { + } + + Json(const char* data, size_t len, size_t cap) : data_(data, len) { + AssertInfo(len + simdjson::SIMDJSON_PADDING <= cap, + fmt::format("create json without enough memory size for " + "SIMD, len={}, cap={}", + len, + cap)); + } + + // WARN: this is used for fast non-copy construction, + // MUST make sure that the data points to a memory that + // with size at least len + SIMDJSON_PADDING + Json(const char* data, size_t len) : data_(data, len) { + } + + Json(Json&& json) = default; + + Json& + operator=(const Json& json) { + if (json.own_data_.has_value()) { + own_data_ = simdjson::padded_string( + json.own_data_.value().data(), json.own_data_.value().length()); + } + + data_ = json.data_; + return *this; + } + + operator std::string_view() const { + return data_; + } + + void + parse(simdjson::padded_string_view data) { + data_ = data; + } + + auto + doc() const { + thread_local simdjson::ondemand::parser parser; + + // it's always safe to add the padding, + // as we have allocated the memory with this padding + auto doc = + parser.iterate(data_, data_.size() + simdjson::SIMDJSON_PADDING); + return doc.get_object(); + } + + auto + operator[](const std::string_view field) const { + return doc()[field]; + } + + auto + at_pointer(const std::string_view pointer) const { + return doc().at_pointer(pointer); + } + + std::string_view + data() const { + return data_; + } + + private: + std::optional + own_data_; // this could be empty, then the Json will be just s view on bytes + simdjson::padded_string_view data_; +}; +} // namespace milvus diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 10fc0a345e..fc3dcfc5f7 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -18,15 +18,16 @@ #include #include - #include #include #include #include #include + #include #include #include +#include #include #include #include @@ -35,10 +36,11 @@ #include "knowhere/binaryset.h" #include "knowhere/comp/index_param.h" #include "knowhere/dataset.h" -#include "nlohmann/json.hpp" +#include "simdjson.h" #include "pb/plan.pb.h" #include "pb/schema.pb.h" #include "pb/segcore.pb.h" +#include "Json.h" namespace milvus { @@ -79,7 +81,6 @@ using VectorArray = proto::schema::VectorField; using IdArray = proto::schema::IDs; using InsertData = proto::segcore::InsertRecord; using PkType = std::variant; -using Json = nlohmann::json; inline bool IsPrimaryKeyDataType(DataType data_type) { @@ -153,5 +154,4 @@ struct LargeType { int64_t x, y, z; }; static_assert(std::is_same_v>); - } // namespace milvus diff --git a/internal/core/src/common/Utils.h b/internal/core/src/common/Utils.h index 40522d2eba..089a2b3c4f 100644 --- a/internal/core/src/common/Utils.h +++ b/internal/core/src/common/Utils.h @@ -32,6 +32,7 @@ #include "exceptions/EasyAssert.h" #include "knowhere/dataset.h" #include "knowhere/expected.h" +#include "simdjson.h" namespace milvus { #define FIELD_DATA(data_array, type) \ @@ -373,6 +374,11 @@ CreateMap(int64_t segment_id, // macOS doesn't support MAP_POPULATE mmap_flags |= MAP_POPULATE; #endif + + // simdjson requires a padding following the json data + size_t padding = field_meta.get_data_type() == DataType::JSON + ? simdjson::SIMDJSON_PADDING + : 0; // Allocate memory if (info.mmap_dir_path == nullptr) { auto data_type = field_meta.get_data_type(); @@ -383,7 +389,7 @@ CreateMap(int64_t segment_id, // Use anon mapping so we are able to free these memory with munmap only void* map = mmap(nullptr, - data_size, + data_size + padding, PROT_READ | PROT_WRITE, mmap_flags | MAP_ANON, -1, @@ -428,7 +434,7 @@ CreateMap(int64_t segment_id, return nullptr; } - auto map = mmap(nullptr, written, PROT_READ, mmap_flags, fd, 0); + auto map = mmap(nullptr, written + padding, PROT_READ, mmap_flags, fd, 0); AssertInfo(map != MAP_FAILED, fmt::format("failed to create map for data file {}, err: {}", filepath.c_str(), diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index 869fb77fed..77139d99a5 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -197,7 +197,7 @@ ExecExprVisitor::ExecRangeVisitorImpl(FieldId field_id, auto x = data[index]; result[index] = element_func(x); } - AssertInfo(result.size() == this_size, ""); + results.emplace_back(std::move(result)); } auto final_result = Assemble(results); diff --git a/internal/core/src/segcore/ConcurrentVector.cpp b/internal/core/src/segcore/ConcurrentVector.cpp index b1ff0944bb..fa7df7d69f 100644 --- a/internal/core/src/segcore/ConcurrentVector.cpp +++ b/internal/core/src/segcore/ConcurrentVector.cpp @@ -13,6 +13,7 @@ #include "common/Types.h" #include "common/Utils.h" #include "nlohmann/json.hpp" +#include "simdjson.h" namespace milvus::segcore { @@ -74,10 +75,10 @@ VectorBase::set_data_raw(ssize_t element_offset, return set_data_raw(element_offset, data_raw.data(), element_count); } case DataType::JSON: { - auto json_data = FIELD_DATA(data, json); + auto& json_data = FIELD_DATA(data, json); std::vector data_raw(json_data.size()); for (auto& json_bytes : json_data) { - data_raw.emplace_back(Json::parse(json_bytes)); + data_raw.emplace_back(simdjson::padded_string(json_bytes)); } return set_data_raw(element_offset, data_raw.data(), element_count); } @@ -155,7 +156,7 @@ VectorBase::fill_chunk_data(ssize_t element_count, size_t index = 0; for (auto& str : FIELD_DATA(data, json)) { - chunk[index++] = str; + chunk[index++] = Json(simdjson::padded_string(str)); } return; } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index d92b27c674..a3e0eeb1cd 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -364,11 +364,7 @@ SegmentGrowingImpl::bulk_subscript_impl(const VectorBase& vec_raw, for (int64_t i = 0; i < count; ++i) { auto offset = seg_offsets[i]; if (offset != INVALID_SEG_OFFSET) { - if constexpr (std::is_same_v) { - output[i] = vec[offset].dump(); - } else { - output[i] = vec[offset]; - } + output[i] = vec[offset]; } } } diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index a86b5c61df..3d0e8efcb9 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -20,12 +20,12 @@ #include #include "Utils.h" +#include "Types.h" #include "common/Column.h" #include "common/Consts.h" #include "common/FieldMeta.h" #include "common/Types.h" #include "log/Log.h" -#include "nlohmann/json.hpp" #include "query/ScalarIndex.h" #include "query/SearchBruteForce.h" #include "query/SearchOnSealed.h" @@ -240,25 +240,12 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { case milvus::DataType::STRING: case milvus::DataType::VARCHAR: { column = std::make_unique>( - get_segment_id(), - field_meta, - info, - [](const char* data, size_t len) { - return std::string_view(data, len); - }); + get_segment_id(), field_meta, info); break; } case milvus::DataType::JSON: { column = std::make_unique>( - get_segment_id(), - field_meta, - info, - [](const char* data, size_t len) { - if (len > 0) { - return Json::parse(data, data + len); - } - return Json{}; - }); + get_segment_id(), field_meta, info); } default: { } @@ -267,7 +254,7 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) { std::unique_lock lck(mutex_); variable_fields_.emplace(field_id, std::move(column)); } else { - auto column = FixedColumn(get_segment_id(), field_meta, info); + auto column = Column(get_segment_id(), field_meta, info); size = column.size(); std::unique_lock lck(mutex_); fixed_fields_.emplace(field_id, std::move(column)); @@ -728,7 +715,7 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, default: PanicInfo( - fmt::format("unsupported data type: {}", + fmt::format("718 unsupported data type: {}", datatype_name(field_meta.get_data_type()))); } } diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 87d9afb8e6..105fd8e034 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -235,7 +235,7 @@ class SegmentSealedImpl : public SegmentSealed { SchemaPtr schema_; int64_t id_; - std::unordered_map fixed_fields_; + std::unordered_map fixed_fields_; std::unordered_map> variable_fields_; }; diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index fb7cac7215..ab64eca63e 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -10,7 +10,9 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "segcore/Utils.h" +#include +#include "common/Utils.h" #include "index/ScalarIndex.h" namespace milvus::segcore { @@ -126,6 +128,13 @@ CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) { } break; } + case DataType::JSON: { + auto obj = scalar_array->mutable_json_data(); + obj->mutable_data()->Reserve(count); + for (int i = 0; i < count; i++) { + *(obj->mutable_data()->Add()) = std::string(); + } + } default: { PanicInfo("unsupported datatype"); } @@ -341,7 +350,7 @@ MergeDataArray( auto data = FIELD_DATA(src_field_data, bool).data(); auto obj = scalar_array->mutable_bool_data(); *(obj->mutable_data()->Add()) = data[src_offset]; - continue; + break; } case DataType::INT8: case DataType::INT16: @@ -349,34 +358,40 @@ MergeDataArray( auto data = FIELD_DATA(src_field_data, int).data(); auto obj = scalar_array->mutable_int_data(); *(obj->mutable_data()->Add()) = data[src_offset]; - continue; + break; } case DataType::INT64: { auto data = FIELD_DATA(src_field_data, long).data(); auto obj = scalar_array->mutable_long_data(); *(obj->mutable_data()->Add()) = data[src_offset]; - continue; + break; } case DataType::FLOAT: { auto data = FIELD_DATA(src_field_data, float).data(); auto obj = scalar_array->mutable_float_data(); *(obj->mutable_data()->Add()) = data[src_offset]; - continue; + break; } case DataType::DOUBLE: { auto data = FIELD_DATA(src_field_data, double).data(); auto obj = scalar_array->mutable_double_data(); *(obj->mutable_data()->Add()) = data[src_offset]; - continue; + break; } case DataType::VARCHAR: { - auto& data = src_field_data->scalars().string_data(); + auto& data = FIELD_DATA(src_field_data, string); auto obj = scalar_array->mutable_string_data(); - *(obj->mutable_data()->Add()) = data.data(src_offset); - continue; + *(obj->mutable_data()->Add()) = data[src_offset]; + break; + } + case DataType::JSON: { + auto& data = FIELD_DATA(src_field_data, json); + auto obj = scalar_array->mutable_json_data(); + *(obj->mutable_data()->Add()) = data[src_offset]; + break; } default: { - PanicInfo("unsupported datatype"); + PanicInfo(fmt::format("unsupported data type {}", data_type)); } } } diff --git a/internal/core/thirdparty/CMakeLists.txt b/internal/core/thirdparty/CMakeLists.txt index a82dec0d54..ac516d3659 100644 --- a/internal/core/thirdparty/CMakeLists.txt +++ b/internal/core/thirdparty/CMakeLists.txt @@ -42,7 +42,9 @@ add_subdirectory(knowhere) add_subdirectory(boost_ext) add_subdirectory(rocksdb) +add_subdirectory(simdjson) if (LINUX) add_subdirectory(jemalloc) endif() + diff --git a/internal/core/thirdparty/simdjson/CMakeLists.txt b/internal/core/thirdparty/simdjson/CMakeLists.txt new file mode 100644 index 0000000000..845733762f --- /dev/null +++ b/internal/core/thirdparty/simdjson/CMakeLists.txt @@ -0,0 +1,19 @@ +#------------------------------------------------------------------------------- +# Copyright (C) 2019-2020 Zilliz. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under the License. +#------------------------------------------------------------------------------- + +FetchContent_Declare( + simdjson + GIT_REPOSITORY https://github.com/simdjson/simdjson.git + GIT_TAG v3.1.7 +) +FetchContent_MakeAvailable(simdjson) \ No newline at end of file diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index 98c9541dd9..aeb586db0a 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -433,10 +433,9 @@ TEST(Expr, TestTerm) { } TEST(Expr, TestSimpleDsl) { - using namespace milvus::query; using namespace milvus::segcore; - auto vec_dsl = Json::parse(R"({ + auto vec_dsl = query::Json::parse(R"({ "vector": { "fakevec": { "metric_type": "L2", @@ -459,47 +458,47 @@ TEST(Expr, TestSimpleDsl) { terms.push_back(i); } } - Json s; + query::Json s; s["term"]["age"]["values"] = terms; return s; }; // std::cout << get_item(0).dump(-2); // std::cout << vec_dsl.dump(-2); - std::vector>> testcases; + std::vector>> testcases; { - Json dsl; - dsl["must"] = Json::array( + query::Json dsl; + dsl["must"] = query::Json::array( {vec_dsl, get_item(0), get_item(1), get_item(2, 0), get_item(3)}); testcases.emplace_back( dsl, [](int64_t x) { return (x & 0b1111) == 0b1011; }); } { - Json dsl; - Json sub_dsl; - sub_dsl["must"] = Json::array( + query::Json dsl; + query::Json sub_dsl; + sub_dsl["must"] = query::Json::array( {get_item(0), get_item(1), get_item(2, 0), get_item(3)}); - dsl["must"] = Json::array({sub_dsl, vec_dsl}); + dsl["must"] = query::Json::array({sub_dsl, vec_dsl}); testcases.emplace_back( dsl, [](int64_t x) { return (x & 0b1111) == 0b1011; }); } { - Json dsl; - Json sub_dsl; - sub_dsl["should"] = Json::array( + query::Json dsl; + query::Json sub_dsl; + sub_dsl["should"] = query::Json::array( {get_item(0), get_item(1), get_item(2, 0), get_item(3)}); - dsl["must"] = Json::array({sub_dsl, vec_dsl}); + dsl["must"] = query::Json::array({sub_dsl, vec_dsl}); testcases.emplace_back( dsl, [](int64_t x) { return !!((x & 0b1111) ^ 0b0100); }); } { - Json dsl; - Json sub_dsl; - sub_dsl["must_not"] = Json::array( + query::Json dsl; + query::Json sub_dsl; + sub_dsl["must_not"] = query::Json::array( {get_item(0), get_item(1), get_item(2, 0), get_item(3)}); - dsl["must"] = Json::array({sub_dsl, vec_dsl}); + dsl["must"] = query::Json::array({sub_dsl, vec_dsl}); testcases.emplace_back( dsl, [](int64_t x) { return (x & 0b1111) != 0b1011; }); } @@ -526,13 +525,13 @@ TEST(Expr, TestSimpleDsl) { } auto seg_promote = dynamic_cast(seg.get()); - ExecExprVisitor visitor( + query::ExecExprVisitor visitor( *seg_promote, seg_promote->get_row_count(), MAX_TIMESTAMP); for (auto [clause, ref_func] : testcases) { - Json dsl; + query::Json dsl; dsl["bool"] = clause; // std::cout << dsl.dump(2); - auto plan = CreatePlan(*schema, dsl.dump()); + auto plan = query::CreatePlan(*schema, dsl.dump()); auto final = visitor.call_child(*plan->plan_node_->predicate_.value()); EXPECT_EQ(final.size(), N * num_iters); diff --git a/internal/core/unittest/test_query.cpp b/internal/core/unittest/test_query.cpp index e51b02b7b6..0d404377bc 100644 --- a/internal/core/unittest/test_query.cpp +++ b/internal/core/unittest/test_query.cpp @@ -194,7 +194,7 @@ TEST(Query, ExecWithPredicateLoader) { auto sr = segment->Search(plan.get(), ph_group.get(), time); int topk = 5; - Json json = SearchResultToJson(*sr); + query::Json json = SearchResultToJson(*sr); #ifdef __linux__ auto ref = json::parse(R"( [ @@ -278,7 +278,7 @@ TEST(Query, ExecWithPredicateSmallN) { auto sr = segment->Search(plan.get(), ph_group.get(), time); int topk = 5; - Json json = SearchResultToJson(*sr); + query::Json json = SearchResultToJson(*sr); std::cout << json.dump(2); } @@ -338,7 +338,7 @@ TEST(Query, ExecWithPredicate) { auto sr = segment->Search(plan.get(), ph_group.get(), time); int topk = 5; - Json json = SearchResultToJson(*sr); + query::Json json = SearchResultToJson(*sr); #ifdef __linux__ auto ref = json::parse(R"( [ @@ -874,7 +874,7 @@ TEST(Query, ExecWithPredicateBinary) { auto sr = segment->Search(plan.get(), ph_group.get(), time); int topk = 5; - Json json = SearchResultToJson(*sr); + query::Json json = SearchResultToJson(*sr); std::cout << json.dump(2); // ASSERT_EQ(json.dump(2), ref.dump(2)); }