mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
feat: support json for ngram (#43170)
Ref https://github.com/milvus-io/milvus/issues/42053 This PR enable ngram to support json data type. --------- Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
This commit is contained in:
parent
a0c9f499ee
commit
10fe53ff59
@ -1283,11 +1283,13 @@ class SegmentExpr : public Expr {
|
||||
|
||||
bool
|
||||
CanUseNgramIndex(FieldId field_id) const {
|
||||
if (segment_->type() != SegmentType::Sealed) {
|
||||
return false;
|
||||
}
|
||||
auto cast_ptr = dynamic_cast<const segcore::SegmentSealed*>(segment_);
|
||||
return (cast_ptr != nullptr && cast_ptr->HasNgramIndex(field_id));
|
||||
return segment_->HasNgramIndex(field_id);
|
||||
}
|
||||
|
||||
bool
|
||||
CanUseNgramIndexForJson(FieldId field_id,
|
||||
const std::string& nested_path) const {
|
||||
return segment_->HasNgramIndexForJson(field_id, nested_path);
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
@ -197,8 +197,19 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
|
||||
}
|
||||
case DataType::JSON: {
|
||||
auto val_type = expr_->val_.val_case();
|
||||
if (CanUseIndexForJson(FromValCase(val_type)) &&
|
||||
auto val_type_inner = FromValCase(val_type);
|
||||
if (CanExecNgramMatchForJson(val_type_inner) &&
|
||||
!has_offset_input_) {
|
||||
auto res = ExecNgramMatch();
|
||||
// If nullopt is returned, it means the query cannot be
|
||||
// optimized by ngram index. Forward it to the normal path.
|
||||
if (res.has_value()) {
|
||||
result = res.value();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (CanUseIndexForJson(val_type_inner) && !has_offset_input_) {
|
||||
switch (val_type) {
|
||||
case proto::plan::GenericValue::ValCase::kBoolVal:
|
||||
result = ExecRangeVisitorImplForIndex<bool>();
|
||||
@ -1885,6 +1896,7 @@ PhyUnaryRangeFilterExpr::CanUseIndexForJson(DataType val_type) {
|
||||
val_type);
|
||||
switch (val_type) {
|
||||
case DataType::STRING:
|
||||
case DataType::VARCHAR:
|
||||
use_index_ = has_index &&
|
||||
expr_->op_type_ != proto::plan::OpType::Match &&
|
||||
expr_->op_type_ != proto::plan::OpType::PostfixMatch &&
|
||||
@ -1974,6 +1986,18 @@ PhyUnaryRangeFilterExpr::CanExecNgramMatch(proto::plan::OpType op_type) {
|
||||
!has_offset_input_ && CanUseNgramIndex(field_id_);
|
||||
}
|
||||
|
||||
bool
|
||||
PhyUnaryRangeFilterExpr::CanExecNgramMatchForJson(DataType val_type) {
|
||||
return (val_type == DataType::STRING || val_type == DataType::VARCHAR) &&
|
||||
(expr_->op_type_ == proto::plan::OpType::InnerMatch ||
|
||||
expr_->op_type_ == proto::plan::OpType::Match ||
|
||||
expr_->op_type_ == proto::plan::OpType::PrefixMatch ||
|
||||
expr_->op_type_ == proto::plan::OpType::PostfixMatch) &&
|
||||
!has_offset_input_ &&
|
||||
CanUseNgramIndexForJson(
|
||||
field_id_, milvus::Json::pointer(expr_->column_.nested_path_));
|
||||
}
|
||||
|
||||
std::optional<VectorPtr>
|
||||
PhyUnaryRangeFilterExpr::ExecNgramMatch() {
|
||||
if (!arg_inited_) {
|
||||
@ -1988,8 +2012,15 @@ PhyUnaryRangeFilterExpr::ExecNgramMatch() {
|
||||
}
|
||||
|
||||
if (cached_ngram_match_res_ == nullptr) {
|
||||
auto pinned_index = segment_->GetNgramIndex(field_id_);
|
||||
auto index = pinned_index.get();
|
||||
index::NgramInvertedIndex* index;
|
||||
if (expr_->column_.data_type_ == DataType::JSON) {
|
||||
auto pinned_index = segment_->GetNgramIndexForJson(
|
||||
field_id_, milvus::Json::pointer(expr_->column_.nested_path_));
|
||||
index = pinned_index.get();
|
||||
} else {
|
||||
auto pinned_index = segment_->GetNgramIndex(field_id_);
|
||||
index = pinned_index.get();
|
||||
}
|
||||
AssertInfo(index != nullptr,
|
||||
"ngram index should not be null, field_id: {}",
|
||||
field_id_.get());
|
||||
|
||||
@ -509,6 +509,9 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
|
||||
bool
|
||||
CanExecNgramMatch(proto::plan::OpType op_type);
|
||||
|
||||
bool
|
||||
CanExecNgramMatchForJson(DataType val_type);
|
||||
|
||||
std::optional<VectorPtr>
|
||||
ExecNgramMatch();
|
||||
|
||||
|
||||
@ -67,28 +67,6 @@ IndexFactory::CreatePrimitiveScalarIndex(
|
||||
return CreateScalarIndexSort<T>(file_manager_context);
|
||||
}
|
||||
|
||||
IndexBasePtr
|
||||
IndexFactory::CreateNgramIndex(
|
||||
DataType data_type,
|
||||
const NgramParams& params,
|
||||
const storage::FileManagerContext& file_manager_context) {
|
||||
switch (data_type) {
|
||||
case DataType::VARCHAR:
|
||||
case DataType::STRING:
|
||||
return std::make_unique<NgramInvertedIndex>(file_manager_context,
|
||||
params);
|
||||
|
||||
case DataType::JSON:
|
||||
ThrowInfo(
|
||||
NotImplemented,
|
||||
fmt::format("building ngram index in json is not implemented"));
|
||||
default:
|
||||
ThrowInfo(DataTypeInvalid,
|
||||
fmt::format("invalid data type to build ngram index: {}",
|
||||
data_type));
|
||||
}
|
||||
}
|
||||
|
||||
template <>
|
||||
ScalarIndexPtr<std::string>
|
||||
IndexFactory::CreatePrimitiveScalarIndex<std::string>(
|
||||
@ -364,8 +342,8 @@ IndexFactory::CreatePrimitiveScalarIndex(
|
||||
case DataType::VARCHAR: {
|
||||
auto& ngram_params = create_index_info.ngram_params;
|
||||
if (ngram_params.has_value()) {
|
||||
return CreateNgramIndex(
|
||||
data_type, ngram_params.value(), file_manager_context);
|
||||
return std::make_unique<NgramInvertedIndex>(
|
||||
file_manager_context, ngram_params.value());
|
||||
}
|
||||
return CreatePrimitiveScalarIndex<std::string>(
|
||||
create_index_info, file_manager_context);
|
||||
@ -405,14 +383,15 @@ IndexFactory::CreateComplexScalarIndex(
|
||||
|
||||
IndexBasePtr
|
||||
IndexFactory::CreateJsonIndex(
|
||||
IndexType index_type,
|
||||
JsonCastType cast_dtype,
|
||||
const std::string& nested_path,
|
||||
const storage::FileManagerContext& file_manager_context,
|
||||
const std::string& json_cast_function) {
|
||||
AssertInfo(index_type == INVERTED_INDEX_TYPE,
|
||||
const CreateIndexInfo& create_index_info,
|
||||
const storage::FileManagerContext& file_manager_context) {
|
||||
AssertInfo(create_index_info.index_type == INVERTED_INDEX_TYPE ||
|
||||
create_index_info.index_type == NGRAM_INDEX_TYPE,
|
||||
"Invalid index type for json index");
|
||||
|
||||
const auto& cast_dtype = create_index_info.json_cast_type;
|
||||
const auto& nested_path = create_index_info.json_path;
|
||||
const auto& json_cast_function = create_index_info.json_cast_function;
|
||||
switch (cast_dtype.element_type()) {
|
||||
case JsonCastType::DataType::BOOL:
|
||||
return std::make_unique<index::JsonInvertedIndex<bool>>(
|
||||
@ -426,12 +405,18 @@ IndexFactory::CreateJsonIndex(
|
||||
nested_path,
|
||||
file_manager_context,
|
||||
JsonCastFunction::FromString(json_cast_function));
|
||||
case JsonCastType::DataType::VARCHAR:
|
||||
case JsonCastType::DataType::VARCHAR: {
|
||||
auto& ngram_params = create_index_info.ngram_params;
|
||||
if (ngram_params.has_value()) {
|
||||
return std::make_unique<NgramInvertedIndex>(
|
||||
file_manager_context, ngram_params.value(), nested_path);
|
||||
}
|
||||
return std::make_unique<index::JsonInvertedIndex<std::string>>(
|
||||
cast_dtype,
|
||||
nested_path,
|
||||
file_manager_context,
|
||||
JsonCastFunction::FromString(json_cast_function));
|
||||
}
|
||||
case JsonCastType::DataType::JSON:
|
||||
return std::make_unique<JsonFlatIndex>(file_manager_context,
|
||||
nested_path);
|
||||
@ -462,11 +447,7 @@ IndexFactory::CreateScalarIndex(
|
||||
file_manager_context);
|
||||
}
|
||||
case DataType::JSON: {
|
||||
return CreateJsonIndex(create_index_info.index_type,
|
||||
create_index_info.json_cast_type,
|
||||
create_index_info.json_path,
|
||||
file_manager_context,
|
||||
create_index_info.json_cast_function);
|
||||
return CreateJsonIndex(create_index_info, file_manager_context);
|
||||
}
|
||||
default:
|
||||
ThrowInfo(DataTypeInvalid, "Invalid data type:{}", data_type);
|
||||
|
||||
@ -116,13 +116,9 @@ class IndexFactory {
|
||||
storage::FileManagerContext());
|
||||
|
||||
IndexBasePtr
|
||||
CreateJsonIndex(
|
||||
IndexType index_type,
|
||||
JsonCastType cast_dtype,
|
||||
const std::string& nested_path,
|
||||
const storage::FileManagerContext& file_manager_context =
|
||||
storage::FileManagerContext(),
|
||||
const std::string& json_cast_function = UNKNOW_CAST_FUNCTION_NAME);
|
||||
CreateJsonIndex(const CreateIndexInfo& create_index_info,
|
||||
const storage::FileManagerContext& file_manager_context =
|
||||
storage::FileManagerContext());
|
||||
|
||||
IndexBasePtr
|
||||
CreateScalarIndex(const CreateIndexInfo& create_index_info,
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
#include "common/JsonCastType.h"
|
||||
#include "common/Types.h"
|
||||
#include "common/Consts.h"
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
@ -37,8 +38,8 @@ struct CreateIndexInfo {
|
||||
uint32_t tantivy_index_version{7};
|
||||
JsonCastType json_cast_type{JsonCastType::UNKNOWN};
|
||||
std::string json_path;
|
||||
std::string json_cast_function;
|
||||
std::optional<NgramParams> ngram_params;
|
||||
std::string json_cast_function{UNKNOW_CAST_FUNCTION_NAME};
|
||||
std::optional<NgramParams> ngram_params{std::nullopt};
|
||||
};
|
||||
|
||||
} // namespace milvus::index
|
||||
|
||||
140
internal/core/src/index/JsonIndexBuilder.cpp
Normal file
140
internal/core/src/index/JsonIndexBuilder.cpp
Normal file
@ -0,0 +1,140 @@
|
||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include "common/JsonUtils.h"
|
||||
#include "index/JsonIndexBuilder.h"
|
||||
#include "simdjson/error.h"
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
template <typename T>
|
||||
void
|
||||
ProcessJsonFieldData(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas,
|
||||
const proto::schema::FieldSchema& schema,
|
||||
const std::string& nested_path,
|
||||
const JsonCastType& cast_type,
|
||||
JsonCastFunction cast_function,
|
||||
JsonDataAdder<T> data_adder,
|
||||
JsonNullAdder null_adder,
|
||||
JsonErrorRecorder error_recorder) {
|
||||
int64_t offset = 0;
|
||||
using SIMDJSON_T =
|
||||
std::conditional_t<std::is_same_v<T, std::string>, std::string_view, T>;
|
||||
|
||||
auto tokens = parse_json_pointer(nested_path);
|
||||
|
||||
bool is_array = cast_type.data_type() == JsonCastType::DataType::ARRAY;
|
||||
|
||||
for (const auto& data : field_datas) {
|
||||
auto n = data->get_num_rows();
|
||||
for (int64_t i = 0; i < n; i++) {
|
||||
auto json_column = static_cast<const Json*>(data->RawValue(i));
|
||||
if (schema.nullable() && !data->is_valid(i)) {
|
||||
null_adder(offset++);
|
||||
continue;
|
||||
}
|
||||
|
||||
auto exists = path_exists(json_column->dom_doc(), tokens);
|
||||
if (!exists || !json_column->exist(nested_path)) {
|
||||
error_recorder(
|
||||
*json_column, nested_path, simdjson::NO_SUCH_FIELD);
|
||||
null_adder(offset++);
|
||||
continue;
|
||||
}
|
||||
|
||||
folly::fbvector<T> values;
|
||||
if (is_array) {
|
||||
auto doc = json_column->dom_doc();
|
||||
auto array_res = doc.at_pointer(nested_path).get_array();
|
||||
if (array_res.error() != simdjson::SUCCESS) {
|
||||
error_recorder(
|
||||
*json_column, nested_path, array_res.error());
|
||||
} else {
|
||||
auto array_values = array_res.value();
|
||||
for (auto value : array_values) {
|
||||
auto val = value.template get<SIMDJSON_T>();
|
||||
|
||||
if (val.error() == simdjson::SUCCESS) {
|
||||
values.push_back(static_cast<T>(val.value()));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (cast_function.match<T>()) {
|
||||
auto res = JsonCastFunction::CastJsonValue<T>(
|
||||
cast_function, *json_column, nested_path);
|
||||
if (res.has_value()) {
|
||||
values.push_back(res.value());
|
||||
}
|
||||
} else {
|
||||
value_result<SIMDJSON_T> res =
|
||||
json_column->at<SIMDJSON_T>(nested_path);
|
||||
if (res.error() != simdjson::SUCCESS) {
|
||||
error_recorder(*json_column, nested_path, res.error());
|
||||
} else {
|
||||
values.push_back(static_cast<T>(res.value()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data_adder(values, offset++);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template void
|
||||
ProcessJsonFieldData<bool>(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas,
|
||||
const proto::schema::FieldSchema& schema,
|
||||
const std::string& nested_path,
|
||||
const JsonCastType& cast_type,
|
||||
JsonCastFunction cast_function,
|
||||
JsonDataAdder<bool> data_adder,
|
||||
JsonNullAdder null_adder,
|
||||
JsonErrorRecorder error_recorder);
|
||||
|
||||
template void
|
||||
ProcessJsonFieldData<int64_t>(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas,
|
||||
const proto::schema::FieldSchema& schema,
|
||||
const std::string& nested_path,
|
||||
const JsonCastType& cast_type,
|
||||
JsonCastFunction cast_function,
|
||||
JsonDataAdder<int64_t> data_adder,
|
||||
JsonNullAdder null_adder,
|
||||
JsonErrorRecorder error_recorder);
|
||||
|
||||
template void
|
||||
ProcessJsonFieldData<double>(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas,
|
||||
const proto::schema::FieldSchema& schema,
|
||||
const std::string& nested_path,
|
||||
const JsonCastType& cast_type,
|
||||
JsonCastFunction cast_function,
|
||||
JsonDataAdder<double> data_adder,
|
||||
JsonNullAdder null_adder,
|
||||
JsonErrorRecorder error_recorder);
|
||||
|
||||
template void
|
||||
ProcessJsonFieldData<std::string>(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas,
|
||||
const proto::schema::FieldSchema& schema,
|
||||
const std::string& nested_path,
|
||||
const JsonCastType& cast_type,
|
||||
JsonCastFunction cast_function,
|
||||
JsonDataAdder<std::string> data_adder,
|
||||
JsonNullAdder null_adder,
|
||||
JsonErrorRecorder error_recorder);
|
||||
|
||||
} // namespace milvus::index
|
||||
88
internal/core/src/index/JsonIndexBuilder.h
Normal file
88
internal/core/src/index/JsonIndexBuilder.h
Normal file
@ -0,0 +1,88 @@
|
||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#pragma once
|
||||
#include <functional>
|
||||
#include "common/JsonCastType.h"
|
||||
#include "common/JsonCastFunction.h"
|
||||
#include "common/Json.h"
|
||||
#include "folly/FBVector.h"
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
template <typename T>
|
||||
using JsonDataAdder =
|
||||
std::function<void(const folly::fbvector<T>& values, int64_t offset)>;
|
||||
|
||||
using JsonErrorRecorder = std::function<void(const Json& json,
|
||||
const std::string& nested_path,
|
||||
simdjson::error_code error)>;
|
||||
|
||||
using JsonNullAdder = std::function<void(int64_t offset)>;
|
||||
|
||||
// A helper function for processing json data for building inverted index
|
||||
template <typename T>
|
||||
void
|
||||
ProcessJsonFieldData(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas,
|
||||
const proto::schema::FieldSchema& schema,
|
||||
const std::string& nested_path,
|
||||
const JsonCastType& cast_type,
|
||||
JsonCastFunction cast_function,
|
||||
JsonDataAdder<T> data_adder,
|
||||
JsonNullAdder null_adder,
|
||||
JsonErrorRecorder error_recorder);
|
||||
|
||||
extern template void
|
||||
ProcessJsonFieldData<bool>(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas,
|
||||
const proto::schema::FieldSchema& schema,
|
||||
const std::string& nested_path,
|
||||
const JsonCastType& cast_type,
|
||||
JsonCastFunction cast_function,
|
||||
JsonDataAdder<bool> data_adder,
|
||||
JsonNullAdder null_adder,
|
||||
JsonErrorRecorder error_recorder);
|
||||
|
||||
extern template void
|
||||
ProcessJsonFieldData<int64_t>(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas,
|
||||
const proto::schema::FieldSchema& schema,
|
||||
const std::string& nested_path,
|
||||
const JsonCastType& cast_type,
|
||||
JsonCastFunction cast_function,
|
||||
JsonDataAdder<int64_t> data_adder,
|
||||
JsonNullAdder null_adder,
|
||||
JsonErrorRecorder error_recorder);
|
||||
|
||||
extern template void
|
||||
ProcessJsonFieldData<double>(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas,
|
||||
const proto::schema::FieldSchema& schema,
|
||||
const std::string& nested_path,
|
||||
const JsonCastType& cast_type,
|
||||
JsonCastFunction cast_function,
|
||||
JsonDataAdder<double> data_adder,
|
||||
JsonNullAdder null_adder,
|
||||
JsonErrorRecorder error_recorder);
|
||||
|
||||
extern template void
|
||||
ProcessJsonFieldData<std::string>(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas,
|
||||
const proto::schema::FieldSchema& schema,
|
||||
const std::string& nested_path,
|
||||
const JsonCastType& cast_type,
|
||||
JsonCastFunction cast_function,
|
||||
JsonDataAdder<std::string> data_adder,
|
||||
JsonNullAdder null_adder,
|
||||
JsonErrorRecorder error_recorder);
|
||||
|
||||
} // namespace milvus::index
|
||||
@ -21,6 +21,7 @@
|
||||
#include "log/Log.h"
|
||||
#include "common/JsonUtils.h"
|
||||
#include "simdjson/error.h"
|
||||
#include "index/JsonIndexBuilder.h"
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
@ -42,74 +43,30 @@ template <typename T>
|
||||
void
|
||||
JsonInvertedIndex<T>::build_index_for_json(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas) {
|
||||
int64_t offset = 0;
|
||||
LOG_INFO("Start to build json inverted index for field: {}", nested_path_);
|
||||
using SIMDJSON_T =
|
||||
std::conditional_t<std::is_same_v<T, std::string>, std::string_view, T>;
|
||||
|
||||
auto tokens = parse_json_pointer(nested_path_);
|
||||
|
||||
bool is_array = cast_type_.data_type() == JsonCastType::DataType::ARRAY;
|
||||
|
||||
for (const auto& data : field_datas) {
|
||||
auto n = data->get_num_rows();
|
||||
for (int64_t i = 0; i < n; i++) {
|
||||
auto json_column = static_cast<const Json*>(data->RawValue(i));
|
||||
if (this->schema_.nullable() && !data->is_valid(i)) {
|
||||
this->null_offset_.push_back(offset);
|
||||
this->wrapper_->template add_array_data<T>(
|
||||
nullptr, 0, offset++);
|
||||
continue;
|
||||
}
|
||||
|
||||
auto exists = path_exists(json_column->dom_doc(), tokens);
|
||||
if (!exists || !json_column->exist(nested_path_)) {
|
||||
error_recorder_.Record(
|
||||
*json_column, nested_path_, simdjson::NO_SUCH_FIELD);
|
||||
this->null_offset_.push_back(offset);
|
||||
this->wrapper_->template add_array_data<T>(
|
||||
nullptr, 0, offset++);
|
||||
continue;
|
||||
}
|
||||
folly::fbvector<T> values;
|
||||
if (is_array) {
|
||||
auto doc = json_column->dom_doc();
|
||||
auto array_res = doc.at_pointer(nested_path_).get_array();
|
||||
if (array_res.error() != simdjson::SUCCESS) {
|
||||
error_recorder_.Record(
|
||||
*json_column, nested_path_, array_res.error());
|
||||
} else {
|
||||
auto array_values = array_res.value();
|
||||
for (auto value : array_values) {
|
||||
auto val = value.template get<SIMDJSON_T>();
|
||||
|
||||
if (val.error() == simdjson::SUCCESS) {
|
||||
values.push_back(static_cast<T>(val.value()));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (cast_function_.match<T>()) {
|
||||
auto res = JsonCastFunction::CastJsonValue<T>(
|
||||
cast_function_, *json_column, nested_path_);
|
||||
if (res.has_value()) {
|
||||
values.push_back(res.value());
|
||||
}
|
||||
} else {
|
||||
value_result<SIMDJSON_T> res =
|
||||
json_column->at<SIMDJSON_T>(nested_path_);
|
||||
if (res.error() != simdjson::SUCCESS) {
|
||||
error_recorder_.Record(
|
||||
*json_column, nested_path_, res.error());
|
||||
} else {
|
||||
values.push_back(static_cast<T>(res.value()));
|
||||
}
|
||||
}
|
||||
}
|
||||
ProcessJsonFieldData<T>(
|
||||
field_datas,
|
||||
this->schema_,
|
||||
nested_path_,
|
||||
cast_type_,
|
||||
cast_function_,
|
||||
// add data
|
||||
[this](const folly::fbvector<T>& values, int64_t offset) {
|
||||
this->wrapper_->template add_array_data<T>(
|
||||
values.data(), values.size(), offset++);
|
||||
}
|
||||
}
|
||||
values.data(), values.size(), offset);
|
||||
},
|
||||
// handle null
|
||||
[this](int64_t offset) {
|
||||
this->null_offset_.push_back(offset);
|
||||
this->wrapper_->template add_array_data<T>(nullptr, 0, offset);
|
||||
},
|
||||
// handle error
|
||||
[this](const Json& json,
|
||||
const std::string& nested_path,
|
||||
simdjson::error_code error) {
|
||||
this->error_recorder_.Record(json, nested_path, error);
|
||||
});
|
||||
|
||||
error_recorder_.PrintErrStats();
|
||||
}
|
||||
|
||||
@ -11,8 +11,16 @@
|
||||
|
||||
#include "index/NgramInvertedIndex.h"
|
||||
#include "exec/expression/Expr.h"
|
||||
#include "index/JsonIndexBuilder.h"
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
const JsonCastType JSON_CAST_TYPE = JsonCastType::FromString("VARCHAR");
|
||||
// ngram index doesn't need cast function
|
||||
const JsonCastFunction JSON_CAST_FUNCTION =
|
||||
JsonCastFunction::FromString("unknown");
|
||||
|
||||
// for string/varchar type
|
||||
NgramInvertedIndex::NgramInvertedIndex(const storage::FileManagerContext& ctx,
|
||||
const NgramParams& params)
|
||||
: min_gram_(params.min_gram), max_gram_(params.max_gram) {
|
||||
@ -34,14 +42,69 @@ NgramInvertedIndex::NgramInvertedIndex(const storage::FileManagerContext& ctx,
|
||||
}
|
||||
}
|
||||
|
||||
// for json type
|
||||
NgramInvertedIndex::NgramInvertedIndex(const storage::FileManagerContext& ctx,
|
||||
const NgramParams& params,
|
||||
const std::string& nested_path)
|
||||
: NgramInvertedIndex(ctx, params) {
|
||||
nested_path_ = nested_path;
|
||||
}
|
||||
|
||||
void
|
||||
NgramInvertedIndex::BuildWithFieldData(const std::vector<FieldDataPtr>& datas) {
|
||||
AssertInfo(schema_.data_type() == proto::schema::DataType::String ||
|
||||
schema_.data_type() == proto::schema::DataType::VarChar,
|
||||
schema_.data_type() == proto::schema::DataType::VarChar ||
|
||||
schema_.data_type() == proto::schema::DataType::JSON,
|
||||
"schema data type is {}",
|
||||
schema_.data_type());
|
||||
LOG_INFO("Start to build ngram index, data type {}, field id: {}",
|
||||
schema_.data_type(),
|
||||
field_id_);
|
||||
|
||||
index_build_begin_ = std::chrono::system_clock::now();
|
||||
InvertedIndexTantivy<std::string>::BuildWithFieldData(datas);
|
||||
if (schema_.data_type() == proto::schema::DataType::JSON) {
|
||||
BuildWithJsonFieldData(datas);
|
||||
} else {
|
||||
InvertedIndexTantivy<std::string>::BuildWithFieldData(datas);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
NgramInvertedIndex::BuildWithJsonFieldData(
|
||||
const std::vector<FieldDataPtr>& field_datas) {
|
||||
AssertInfo(schema_.data_type() == proto::schema::DataType::JSON,
|
||||
"schema data should be json, but is {}",
|
||||
schema_.data_type());
|
||||
LOG_INFO("Start to build ngram index for json, field_id: {}, field: {}",
|
||||
field_id_,
|
||||
nested_path_);
|
||||
|
||||
index_build_begin_ = std::chrono::system_clock::now();
|
||||
ProcessJsonFieldData<std::string>(
|
||||
field_datas,
|
||||
this->schema_,
|
||||
nested_path_,
|
||||
JSON_CAST_TYPE,
|
||||
JSON_CAST_FUNCTION,
|
||||
// add data
|
||||
[this](const folly::fbvector<std::string>& values, int64_t offset) {
|
||||
this->wrapper_->template add_array_data<std::string>(
|
||||
values.data(), values.size(), offset);
|
||||
},
|
||||
// handle null
|
||||
[this](int64_t offset) {
|
||||
this->null_offset_.push_back(offset);
|
||||
this->wrapper_->template add_array_data<std::string>(
|
||||
nullptr, 0, offset);
|
||||
},
|
||||
// handle error
|
||||
[this](const Json& json,
|
||||
const std::string& nested_path,
|
||||
simdjson::error_code error) {
|
||||
this->error_recorder_.Record(json, nested_path, error);
|
||||
});
|
||||
|
||||
error_recorder_.PrintErrStats();
|
||||
}
|
||||
|
||||
IndexStatsPtr
|
||||
@ -51,9 +114,12 @@ NgramInvertedIndex::Upload(const Config& config) {
|
||||
auto index_build_duration =
|
||||
std::chrono::duration<double>(index_build_end - index_build_begin_)
|
||||
.count();
|
||||
LOG_INFO("index build done for ngram index, field id: {}, duration: {}s",
|
||||
field_id_,
|
||||
index_build_duration);
|
||||
LOG_INFO(
|
||||
"index build done for ngram index, data type {}, field id: {}, "
|
||||
"duration: {}s",
|
||||
schema_.data_type(),
|
||||
field_id_,
|
||||
index_build_duration);
|
||||
return InvertedIndexTantivy<std::string>::Upload(config);
|
||||
}
|
||||
|
||||
@ -116,30 +182,88 @@ NgramInvertedIndex::ExecuteQuery(const std::string& literal,
|
||||
}
|
||||
|
||||
switch (op_type) {
|
||||
case proto::plan::OpType::InnerMatch: {
|
||||
auto predicate = [&literal](const std::string_view& data) {
|
||||
return data.find(literal) != std::string::npos;
|
||||
};
|
||||
bool need_post_filter = literal.length() > max_gram_;
|
||||
return ExecuteQueryWithPredicate(
|
||||
literal, segment, predicate, need_post_filter);
|
||||
}
|
||||
case proto::plan::OpType::Match:
|
||||
return MatchQuery(literal, segment);
|
||||
case proto::plan::OpType::InnerMatch: {
|
||||
bool need_post_filter = literal.length() > max_gram_;
|
||||
|
||||
if (schema_.data_type() == proto::schema::DataType::JSON) {
|
||||
auto predicate = [&literal, this](const milvus::Json& data) {
|
||||
auto x =
|
||||
data.template at<std::string_view>(this->nested_path_);
|
||||
if (x.error()) {
|
||||
return false;
|
||||
}
|
||||
auto data_val = x.value();
|
||||
return data_val.find(literal) != std::string::npos;
|
||||
};
|
||||
|
||||
return ExecuteQueryWithPredicate<milvus::Json>(
|
||||
literal, segment, predicate, need_post_filter);
|
||||
} else {
|
||||
auto predicate = [&literal](const std::string_view& data) {
|
||||
return data.find(literal) != std::string::npos;
|
||||
};
|
||||
|
||||
return ExecuteQueryWithPredicate<std::string_view>(
|
||||
literal, segment, predicate, need_post_filter);
|
||||
}
|
||||
}
|
||||
case proto::plan::OpType::PrefixMatch: {
|
||||
auto predicate = [&literal](const std::string_view& data) {
|
||||
return data.length() >= literal.length() &&
|
||||
std::equal(literal.begin(), literal.end(), data.begin());
|
||||
};
|
||||
return ExecuteQueryWithPredicate(literal, segment, predicate, true);
|
||||
if (schema_.data_type() == proto::schema::DataType::JSON) {
|
||||
auto predicate = [&literal, this](const milvus::Json& data) {
|
||||
auto x =
|
||||
data.template at<std::string_view>(this->nested_path_);
|
||||
if (x.error()) {
|
||||
return false;
|
||||
}
|
||||
auto data_val = x.value();
|
||||
return data_val.length() >= literal.length() &&
|
||||
std::equal(literal.begin(),
|
||||
literal.end(),
|
||||
data_val.begin());
|
||||
};
|
||||
|
||||
return ExecuteQueryWithPredicate<milvus::Json>(
|
||||
literal, segment, predicate, true);
|
||||
} else {
|
||||
auto predicate = [&literal](const std::string_view& data) {
|
||||
return data.length() >= literal.length() &&
|
||||
std::equal(
|
||||
literal.begin(), literal.end(), data.begin());
|
||||
};
|
||||
|
||||
return ExecuteQueryWithPredicate<std::string_view>(
|
||||
literal, segment, predicate, true);
|
||||
}
|
||||
}
|
||||
case proto::plan::OpType::PostfixMatch: {
|
||||
auto predicate = [&literal](const std::string_view& data) {
|
||||
return data.length() >= literal.length() &&
|
||||
std::equal(
|
||||
literal.rbegin(), literal.rend(), data.rbegin());
|
||||
};
|
||||
return ExecuteQueryWithPredicate(literal, segment, predicate, true);
|
||||
if (schema_.data_type() == proto::schema::DataType::JSON) {
|
||||
auto predicate = [&literal, this](const milvus::Json& data) {
|
||||
auto x =
|
||||
data.template at<std::string_view>(this->nested_path_);
|
||||
if (x.error()) {
|
||||
return false;
|
||||
}
|
||||
auto data_val = x.value();
|
||||
return data_val.length() >= literal.length() &&
|
||||
std::equal(literal.rbegin(),
|
||||
literal.rend(),
|
||||
data_val.rbegin());
|
||||
};
|
||||
|
||||
return ExecuteQueryWithPredicate<milvus::Json>(
|
||||
literal, segment, predicate, true);
|
||||
} else {
|
||||
auto predicate = [&literal](const std::string_view& data) {
|
||||
return data.length() >= literal.length() &&
|
||||
std::equal(
|
||||
literal.rbegin(), literal.rend(), data.rbegin());
|
||||
};
|
||||
|
||||
return ExecuteQueryWithPredicate<std::string_view>(
|
||||
literal, segment, predicate, true);
|
||||
}
|
||||
}
|
||||
default:
|
||||
LOG_WARN("unsupported op type for ngram index: {}", op_type);
|
||||
@ -147,12 +271,12 @@ NgramInvertedIndex::ExecuteQuery(const std::string& literal,
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
inline void
|
||||
handle_batch(const std::string_view* data,
|
||||
const int32_t* offsets,
|
||||
handle_batch(const T* data,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
std::function<bool(const std::string_view&)> predicate) {
|
||||
std::function<bool(const T&)> predicate) {
|
||||
auto next_off_option = res.find_first();
|
||||
while (next_off_option.has_value()) {
|
||||
auto next_off = next_off_option.value();
|
||||
@ -166,34 +290,35 @@ handle_batch(const std::string_view* data,
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
std::optional<TargetBitmap>
|
||||
NgramInvertedIndex::ExecuteQueryWithPredicate(
|
||||
const std::string& literal,
|
||||
exec::SegmentExpr* segment,
|
||||
std::function<bool(const std::string_view&)> predicate,
|
||||
std::function<bool(const T&)> predicate,
|
||||
bool need_post_filter) {
|
||||
TargetBitmap bitset{static_cast<size_t>(Count())};
|
||||
wrapper_->ngram_match_query(literal, min_gram_, max_gram_, &bitset);
|
||||
|
||||
TargetBitmapView res(bitset);
|
||||
TargetBitmap valid(res.size(), true);
|
||||
TargetBitmapView valid_res(valid.data(), valid.size());
|
||||
|
||||
if (need_post_filter) {
|
||||
TargetBitmapView res(bitset);
|
||||
TargetBitmap valid(res.size(), true);
|
||||
TargetBitmapView valid_res(valid.data(), valid.size());
|
||||
|
||||
auto execute_batch =
|
||||
[&predicate](
|
||||
const std::string_view* data,
|
||||
const T* data,
|
||||
// `valid_data` is not used as the results returned by ngram_match_query are all valid
|
||||
const bool* _valid_data,
|
||||
const int32_t* offsets,
|
||||
const int32_t* _offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
// the same with `valid_data`
|
||||
TargetBitmapView _valid_res) {
|
||||
handle_batch(data, offsets, size, res, predicate);
|
||||
handle_batch(data, size, res, predicate);
|
||||
};
|
||||
|
||||
segment->ProcessAllDataChunk<std::string_view>(
|
||||
segment->ProcessAllDataChunk<T>(
|
||||
execute_batch, std::nullptr_t{}, res, valid_res);
|
||||
}
|
||||
|
||||
@ -250,24 +375,52 @@ NgramInvertedIndex::MatchQuery(const std::string& literal,
|
||||
auto regex_pattern = translator(literal);
|
||||
RegexMatcher matcher(regex_pattern);
|
||||
|
||||
auto predicate = [&matcher](const std::string_view& data) {
|
||||
return matcher(data);
|
||||
};
|
||||
|
||||
auto execute_batch =
|
||||
[&predicate](
|
||||
const std::string_view* data,
|
||||
// `_valid_data` is not used as the results returned by ngram_match_query are all valid
|
||||
const bool* _valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
// the same with `_valid_data`
|
||||
TargetBitmapView _valid_res) {
|
||||
handle_batch(data, offsets, size, res, predicate);
|
||||
if (schema_.data_type() == proto::schema::DataType::JSON) {
|
||||
auto predicate = [&literal, &matcher, this](const milvus::Json& data) {
|
||||
auto x = data.template at<std::string_view>(this->nested_path_);
|
||||
if (x.error()) {
|
||||
return false;
|
||||
}
|
||||
auto data_val = x.value();
|
||||
return matcher(data_val);
|
||||
};
|
||||
segment->ProcessAllDataChunk<std::string_view>(
|
||||
execute_batch, std::nullptr_t{}, res, valid_res);
|
||||
|
||||
auto execute_batch_json =
|
||||
[&predicate](
|
||||
const milvus::Json* data,
|
||||
// `valid_data` is not used as the results returned by ngram_match_query are all valid
|
||||
const bool* _valid_data,
|
||||
const int32_t* _offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
// the same with `valid_data`
|
||||
TargetBitmapView _valid_res,
|
||||
std::string val) {
|
||||
handle_batch<milvus::Json>(data, size, res, predicate);
|
||||
};
|
||||
|
||||
segment->ProcessAllDataChunk<milvus::Json>(
|
||||
execute_batch_json, std::nullptr_t{}, res, valid_res, literal);
|
||||
} else {
|
||||
auto predicate = [&matcher](const std::string_view& data) {
|
||||
return matcher(data);
|
||||
};
|
||||
|
||||
auto execute_batch =
|
||||
[&predicate](
|
||||
const std::string_view* data,
|
||||
// `valid_data` is not used as the results returned by ngram_match_query are all valid
|
||||
const bool* _valid_data,
|
||||
const int32_t* _offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
// the same with `valid_data`
|
||||
TargetBitmapView _valid_res) {
|
||||
handle_batch<std::string_view>(data, size, res, predicate);
|
||||
};
|
||||
segment->ProcessAllDataChunk<std::string_view>(
|
||||
execute_batch, std::nullptr_t{}, res, valid_res);
|
||||
}
|
||||
|
||||
return std::optional<TargetBitmap>(std::move(bitset));
|
||||
}
|
||||
|
||||
@ -13,7 +13,7 @@
|
||||
#include <string>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <optional>
|
||||
|
||||
#include "index/JsonInvertedIndex.h"
|
||||
#include "index/InvertedIndexTantivy.h"
|
||||
|
||||
namespace milvus::exec {
|
||||
@ -23,9 +23,15 @@ class SegmentExpr;
|
||||
namespace milvus::index {
|
||||
class NgramInvertedIndex : public InvertedIndexTantivy<std::string> {
|
||||
public:
|
||||
// for string/varchar type
|
||||
explicit NgramInvertedIndex(const storage::FileManagerContext& ctx,
|
||||
const NgramParams& params);
|
||||
|
||||
// for json type
|
||||
explicit NgramInvertedIndex(const storage::FileManagerContext& ctx,
|
||||
const NgramParams& params,
|
||||
const std::string& nested_path);
|
||||
|
||||
IndexStatsPtr
|
||||
Upload(const Config& config = {}) override;
|
||||
|
||||
@ -35,18 +41,31 @@ class NgramInvertedIndex : public InvertedIndexTantivy<std::string> {
|
||||
void
|
||||
BuildWithFieldData(const std::vector<FieldDataPtr>& datas) override;
|
||||
|
||||
void
|
||||
BuildWithJsonFieldData(const std::vector<FieldDataPtr>& datas);
|
||||
|
||||
std::optional<TargetBitmap>
|
||||
ExecuteQuery(const std::string& literal,
|
||||
proto::plan::OpType op_type,
|
||||
exec::SegmentExpr* segment);
|
||||
|
||||
void
|
||||
finish() {
|
||||
this->wrapper_->finish();
|
||||
}
|
||||
|
||||
void
|
||||
create_reader(SetBitsetFn set_bitset) {
|
||||
this->wrapper_->create_reader(set_bitset);
|
||||
}
|
||||
|
||||
private:
|
||||
template <typename T>
|
||||
std::optional<TargetBitmap>
|
||||
ExecuteQueryWithPredicate(
|
||||
const std::string& literal,
|
||||
exec::SegmentExpr* segment,
|
||||
std::function<bool(const std::string_view&)> predicate,
|
||||
bool need_post_filter);
|
||||
ExecuteQueryWithPredicate(const std::string& literal,
|
||||
exec::SegmentExpr* segment,
|
||||
std::function<bool(const T&)> predicate,
|
||||
bool need_post_filter);
|
||||
|
||||
// Match is something like xxx%xxx%xxx, xxx%xxx, %xxx%xxx, xxx_x etc.
|
||||
std::optional<TargetBitmap>
|
||||
@ -57,5 +76,9 @@ class NgramInvertedIndex : public InvertedIndexTantivy<std::string> {
|
||||
uintptr_t max_gram_{0};
|
||||
int64_t field_id_{0};
|
||||
std::chrono::time_point<std::chrono::system_clock> index_build_begin_;
|
||||
|
||||
// for json type
|
||||
std::string nested_path_;
|
||||
JsonInvertedIndexParseErrorRecorder error_recorder_;
|
||||
};
|
||||
} // namespace milvus::index
|
||||
@ -172,14 +172,27 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
|
||||
|
||||
if (field_meta.get_data_type() == DataType::JSON) {
|
||||
auto path = info.index_params.at(JSON_PATH);
|
||||
JsonIndex index;
|
||||
index.nested_path = path;
|
||||
index.field_id = field_id;
|
||||
index.index = std::move(const_cast<LoadIndexInfo&>(info).cache_index);
|
||||
index.cast_type =
|
||||
JsonCastType::FromString(info.index_params.at(JSON_CAST_TYPE));
|
||||
json_indices.push_back(std::move(index));
|
||||
return;
|
||||
if (auto it = info.index_params.find(index::INDEX_TYPE);
|
||||
it != info.index_params.end() &&
|
||||
it->second == index::NGRAM_INDEX_TYPE) {
|
||||
if (ngram_indexings_.find(field_id) == ngram_indexings_.end()) {
|
||||
ngram_indexings_[field_id] =
|
||||
std::unordered_map<std::string, index::CacheIndexBasePtr>();
|
||||
}
|
||||
ngram_indexings_[field_id][path] =
|
||||
std::move(const_cast<LoadIndexInfo&>(info).cache_index);
|
||||
return;
|
||||
} else {
|
||||
JsonIndex index;
|
||||
index.nested_path = path;
|
||||
index.field_id = field_id;
|
||||
index.index =
|
||||
std::move(const_cast<LoadIndexInfo&>(info).cache_index);
|
||||
index.cast_type =
|
||||
JsonCastType::FromString(info.index_params.at(JSON_CAST_TYPE));
|
||||
json_indices.push_back(std::move(index));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (auto it = info.index_params.find(index::INDEX_TYPE);
|
||||
@ -637,6 +650,29 @@ ChunkedSegmentSealedImpl::GetNgramIndex(FieldId field_id) const {
|
||||
return PinWrapper<index::NgramInvertedIndex*>(ca, index);
|
||||
}
|
||||
|
||||
PinWrapper<index::NgramInvertedIndex*>
|
||||
ChunkedSegmentSealedImpl::GetNgramIndexForJson(
|
||||
FieldId field_id, const std::string& nested_path) const {
|
||||
std::shared_lock lck(mutex_);
|
||||
auto iter = ngram_indexings_.find(field_id);
|
||||
if (iter == ngram_indexings_.end() ||
|
||||
iter->second.find(nested_path) == iter->second.end()) {
|
||||
return PinWrapper<index::NgramInvertedIndex*>(nullptr);
|
||||
}
|
||||
|
||||
auto slot = iter->second.at(nested_path).get();
|
||||
lck.unlock();
|
||||
|
||||
auto ca = SemiInlineGet(slot->PinCells({0}));
|
||||
auto index = dynamic_cast<index::NgramInvertedIndex*>(ca->get_cell_of(0));
|
||||
AssertInfo(index != nullptr,
|
||||
"ngram index cache for json is corrupted, field_id: {}, "
|
||||
"nested_path: {}",
|
||||
field_id.get(),
|
||||
nested_path);
|
||||
return PinWrapper<index::NgramInvertedIndex*>(ca, index);
|
||||
}
|
||||
|
||||
int64_t
|
||||
ChunkedSegmentSealedImpl::get_row_count() const {
|
||||
std::shared_lock lck(mutex_);
|
||||
@ -1127,6 +1163,7 @@ ChunkedSegmentSealedImpl::ClearData() {
|
||||
ngram_fields_.clear();
|
||||
scalar_indexings_.clear();
|
||||
vector_indexings_.clear();
|
||||
ngram_indexings_.clear();
|
||||
insert_record_.clear();
|
||||
fields_.clear();
|
||||
variable_fields_avg_size_.clear();
|
||||
|
||||
@ -123,9 +123,22 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
|
||||
return ngram_fields_.find(field_id) != ngram_fields_.end();
|
||||
}
|
||||
|
||||
bool
|
||||
HasNgramIndexForJson(FieldId field_id,
|
||||
const std::string& nested_path) const override {
|
||||
std::shared_lock lck(mutex_);
|
||||
return ngram_indexings_.find(field_id) != ngram_indexings_.end() &&
|
||||
ngram_indexings_.at(field_id).find(nested_path) !=
|
||||
ngram_indexings_.at(field_id).end();
|
||||
}
|
||||
|
||||
PinWrapper<index::NgramInvertedIndex*>
|
||||
GetNgramIndex(FieldId field_id) const override;
|
||||
|
||||
PinWrapper<index::NgramInvertedIndex*>
|
||||
GetNgramIndexForJson(FieldId field_id,
|
||||
const std::string& nested_path) const override;
|
||||
|
||||
// TODO(tiered storage 1): should return a PinWrapper
|
||||
void
|
||||
BulkGetJsonData(FieldId field_id,
|
||||
@ -432,6 +445,12 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
|
||||
// TODO: generate index for scalar
|
||||
std::optional<int64_t> num_rows_;
|
||||
|
||||
// ngram indexings for json type
|
||||
std::unordered_map<
|
||||
FieldId,
|
||||
std::unordered_map<std::string, index::CacheIndexBasePtr>>
|
||||
ngram_indexings_;
|
||||
|
||||
// fields that has ngram index
|
||||
std::unordered_set<FieldId> ngram_fields_{};
|
||||
|
||||
|
||||
@ -542,4 +542,21 @@ SegmentInternalInterface::GetNgramIndex(FieldId field_id) const {
|
||||
return PinWrapper<index::NgramInvertedIndex*>(nullptr);
|
||||
}
|
||||
|
||||
PinWrapper<index::NgramInvertedIndex*>
|
||||
SegmentInternalInterface::GetNgramIndexForJson(
|
||||
FieldId field_id, const std::string& nested_path) const {
|
||||
return PinWrapper<index::NgramInvertedIndex*>(nullptr);
|
||||
}
|
||||
|
||||
bool
|
||||
SegmentInternalInterface::HasNgramIndex(FieldId field_id) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
SegmentInternalInterface::HasNgramIndexForJson(
|
||||
FieldId field_id, const std::string& nested_path) const {
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
||||
@ -155,6 +155,17 @@ class SegmentInterface {
|
||||
virtual PinWrapper<index::NgramInvertedIndex*>
|
||||
GetNgramIndex(FieldId field_id) const = 0;
|
||||
|
||||
virtual PinWrapper<index::NgramInvertedIndex*>
|
||||
GetNgramIndexForJson(FieldId field_id,
|
||||
const std::string& nested_path) const = 0;
|
||||
|
||||
virtual bool
|
||||
HasNgramIndex(FieldId field_id) const = 0;
|
||||
|
||||
virtual bool
|
||||
HasNgramIndexForJson(FieldId field_id,
|
||||
const std::string& nested_path) const = 0;
|
||||
|
||||
virtual void
|
||||
LazyCheckSchema(SchemaPtr sch) = 0;
|
||||
|
||||
@ -369,6 +380,17 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
virtual PinWrapper<index::NgramInvertedIndex*>
|
||||
GetNgramIndex(FieldId field_id) const override;
|
||||
|
||||
virtual PinWrapper<index::NgramInvertedIndex*>
|
||||
GetNgramIndexForJson(FieldId field_id,
|
||||
const std::string& nested_path) const override;
|
||||
|
||||
virtual bool
|
||||
HasNgramIndex(FieldId field_id) const override;
|
||||
|
||||
virtual bool
|
||||
HasNgramIndexForJson(FieldId field_id,
|
||||
const std::string& nested_path) const override;
|
||||
|
||||
public:
|
||||
virtual void
|
||||
vector_search(SearchInfo& search_info,
|
||||
|
||||
@ -107,9 +107,17 @@ class SegmentSealed : public SegmentInternalInterface {
|
||||
virtual bool
|
||||
HasNgramIndex(FieldId field_id) const = 0;
|
||||
|
||||
virtual bool
|
||||
HasNgramIndexForJson(FieldId field_id,
|
||||
const std::string& nested_path) const = 0;
|
||||
|
||||
virtual PinWrapper<index::NgramInvertedIndex*>
|
||||
GetNgramIndex(FieldId field_id) const override = 0;
|
||||
|
||||
virtual PinWrapper<index::NgramInvertedIndex*>
|
||||
GetNgramIndexForJson(FieldId field_id,
|
||||
const std::string& nested_path) const override = 0;
|
||||
|
||||
SegmentType
|
||||
type() const override {
|
||||
return SegmentType::Sealed;
|
||||
|
||||
@ -16530,9 +16530,11 @@ TYPED_TEST(JsonIndexTestFixture, TestJsonIndexUnaryExpr) {
|
||||
file_manager_ctx.fieldDataMeta.field_schema.set_fieldid(json_fid.get());
|
||||
file_manager_ctx.fieldDataMeta.field_id = json_fid.get();
|
||||
auto inv_index = index::IndexFactory::GetInstance().CreateJsonIndex(
|
||||
index::INVERTED_INDEX_TYPE,
|
||||
this->cast_type,
|
||||
this->json_path,
|
||||
index::CreateIndexInfo{
|
||||
.index_type = index::INVERTED_INDEX_TYPE,
|
||||
.json_cast_type = this->cast_type,
|
||||
.json_path = this->json_path,
|
||||
},
|
||||
file_manager_ctx);
|
||||
|
||||
using json_index_type =
|
||||
@ -16664,10 +16666,13 @@ TEST(JsonIndexTest, TestJsonNotEqualExpr) {
|
||||
file_manager_ctx.fieldDataMeta.field_schema.set_data_type(
|
||||
milvus::proto::schema::JSON);
|
||||
file_manager_ctx.fieldDataMeta.field_schema.set_fieldid(json_fid.get());
|
||||
|
||||
auto inv_index = index::IndexFactory::GetInstance().CreateJsonIndex(
|
||||
index::INVERTED_INDEX_TYPE,
|
||||
JsonCastType::FromString("DOUBLE"),
|
||||
"/a",
|
||||
index::CreateIndexInfo{
|
||||
.index_type = index::INVERTED_INDEX_TYPE,
|
||||
.json_cast_type = JsonCastType::FromString("DOUBLE"),
|
||||
.json_path = "/a",
|
||||
},
|
||||
file_manager_ctx);
|
||||
|
||||
using json_index_type = index::JsonInvertedIndex<double>;
|
||||
@ -16772,9 +16777,11 @@ TEST_P(JsonIndexExistsTest, TestExistsExpr) {
|
||||
file_manager_ctx.fieldDataMeta.field_schema.set_fieldid(json_fid.get());
|
||||
file_manager_ctx.fieldDataMeta.field_schema.set_nullable(true);
|
||||
auto inv_index = index::IndexFactory::GetInstance().CreateJsonIndex(
|
||||
index::INVERTED_INDEX_TYPE,
|
||||
JsonCastType::FromString("DOUBLE"),
|
||||
json_index_path,
|
||||
index::CreateIndexInfo{
|
||||
.index_type = index::INVERTED_INDEX_TYPE,
|
||||
.json_cast_type = JsonCastType::FromString("DOUBLE"),
|
||||
.json_path = json_index_path,
|
||||
},
|
||||
file_manager_ctx);
|
||||
|
||||
using json_index_type = index::JsonInvertedIndex<double>;
|
||||
@ -16958,7 +16965,12 @@ TEST_P(JsonIndexBinaryExprTest, TestBinaryRangeExpr) {
|
||||
file_manager_ctx.fieldDataMeta.field_schema.set_fieldid(json_fid.get());
|
||||
|
||||
auto inv_index = index::IndexFactory::GetInstance().CreateJsonIndex(
|
||||
index::INVERTED_INDEX_TYPE, GetParam(), "/a", file_manager_ctx);
|
||||
index::CreateIndexInfo{
|
||||
.index_type = index::INVERTED_INDEX_TYPE,
|
||||
.json_cast_type = GetParam(),
|
||||
.json_path = "/a",
|
||||
},
|
||||
file_manager_ctx);
|
||||
|
||||
using json_index_type = index::JsonInvertedIndex<double>;
|
||||
auto json_index = std::unique_ptr<json_index_type>(
|
||||
|
||||
@ -609,9 +609,11 @@ class JsonFlatIndexExprTest : public ::testing::Test {
|
||||
json_fid_.get());
|
||||
file_manager_ctx.fieldDataMeta.field_schema.set_nullable(true);
|
||||
auto index = index::IndexFactory::GetInstance().CreateJsonIndex(
|
||||
index::INVERTED_INDEX_TYPE,
|
||||
JsonCastType::FromString("JSON"),
|
||||
json_index_path,
|
||||
index::CreateIndexInfo{
|
||||
.index_type = index::INVERTED_INDEX_TYPE,
|
||||
.json_cast_type = JsonCastType::FromString("JSON"),
|
||||
.json_path = json_index_path,
|
||||
},
|
||||
file_manager_ctx);
|
||||
|
||||
json_index_ = std::unique_ptr<index::JsonFlatIndex>(
|
||||
|
||||
@ -57,9 +57,11 @@ TEST(JsonIndexTest, TestJSONErrRecorder) {
|
||||
file_manager_ctx.fieldDataMeta.field_id = json_fid.get();
|
||||
|
||||
auto inv_index = index::IndexFactory::GetInstance().CreateJsonIndex(
|
||||
index::INVERTED_INDEX_TYPE,
|
||||
JsonCastType::FromString("DOUBLE"),
|
||||
json_path,
|
||||
index::CreateIndexInfo{
|
||||
.index_type = index::INVERTED_INDEX_TYPE,
|
||||
.json_cast_type = JsonCastType::FromString("DOUBLE"),
|
||||
.json_path = json_path,
|
||||
},
|
||||
file_manager_ctx);
|
||||
auto json_index = std::unique_ptr<JsonInvertedIndex<double>>(
|
||||
static_cast<JsonInvertedIndex<double>*>(inv_index.release()));
|
||||
@ -118,9 +120,11 @@ TEST(JsonIndexTest, TestJsonContains) {
|
||||
file_manager_ctx.fieldDataMeta.field_id = json_fid.get();
|
||||
|
||||
auto inv_index = index::IndexFactory::GetInstance().CreateJsonIndex(
|
||||
index::INVERTED_INDEX_TYPE,
|
||||
JsonCastType::FromString("ARRAY_DOUBLE"),
|
||||
json_path,
|
||||
index::CreateIndexInfo{
|
||||
.index_type = index::INVERTED_INDEX_TYPE,
|
||||
.json_cast_type = JsonCastType::FromString("ARRAY_DOUBLE"),
|
||||
.json_path = json_path,
|
||||
},
|
||||
file_manager_ctx);
|
||||
auto json_index = std::unique_ptr<JsonInvertedIndex<double>>(
|
||||
static_cast<JsonInvertedIndex<double>*>(inv_index.release()));
|
||||
@ -212,11 +216,13 @@ TEST(JsonIndexTest, TestJsonCast) {
|
||||
file_manager_ctx.fieldDataMeta.field_id = json_fid.get();
|
||||
|
||||
auto inv_index = index::IndexFactory::GetInstance().CreateJsonIndex(
|
||||
index::INVERTED_INDEX_TYPE,
|
||||
JsonCastType::FromString("DOUBLE"),
|
||||
json_path,
|
||||
file_manager_ctx,
|
||||
"STRING_TO_DOUBLE");
|
||||
index::CreateIndexInfo{
|
||||
.index_type = index::INVERTED_INDEX_TYPE,
|
||||
.json_cast_type = JsonCastType::FromString("DOUBLE"),
|
||||
.json_path = json_path,
|
||||
.json_cast_function = "STRING_TO_DOUBLE",
|
||||
},
|
||||
file_manager_ctx);
|
||||
auto json_index = std::unique_ptr<JsonInvertedIndex<double>>(
|
||||
static_cast<JsonInvertedIndex<double>*>(inv_index.release()));
|
||||
|
||||
|
||||
@ -22,6 +22,8 @@
|
||||
#include "index/IndexFactory.h"
|
||||
#include "index/NgramInvertedIndex.h"
|
||||
#include "segcore/load_index_c.h"
|
||||
#include "test_cachinglayer/cachinglayer_test_utils.h"
|
||||
#include "expr/ITypeExpr.h"
|
||||
|
||||
using namespace milvus;
|
||||
using namespace milvus::query;
|
||||
@ -381,3 +383,137 @@ TEST(NgramIndex, TestNgramSimple) {
|
||||
proto::plan::OpType::PostfixMatch,
|
||||
std::vector<bool>(10000, true));
|
||||
}
|
||||
|
||||
TEST(NgramIndex, TestNgramJson) {
|
||||
std::vector<std::string> json_raw_data = {
|
||||
R"(1)",
|
||||
R"({"a": "Milvus project"})",
|
||||
R"({"a": "Zilliz cloud"})",
|
||||
R"({"a": "Query Node"})",
|
||||
R"({"a": "Data Node"})",
|
||||
R"({"a": [1, 2, 3]})",
|
||||
R"({"a": {"b": 1}})",
|
||||
R"({"a": 1001})",
|
||||
R"({"a": true})",
|
||||
R"({"a": "Milvus", "b": "Zilliz cloud"})",
|
||||
};
|
||||
|
||||
auto json_path = "/a";
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto json_fid = schema->AddDebugField("json", DataType::JSON);
|
||||
|
||||
auto file_manager_ctx = storage::FileManagerContext();
|
||||
file_manager_ctx.fieldDataMeta.field_schema.set_data_type(
|
||||
milvus::proto::schema::JSON);
|
||||
file_manager_ctx.fieldDataMeta.field_schema.set_fieldid(json_fid.get());
|
||||
file_manager_ctx.fieldDataMeta.field_id = json_fid.get();
|
||||
|
||||
index::CreateIndexInfo create_index_info{
|
||||
.index_type = index::INVERTED_INDEX_TYPE,
|
||||
.json_cast_type = JsonCastType::FromString("VARCHAR"),
|
||||
.json_path = json_path,
|
||||
.ngram_params = std::optional<index::NgramParams>{index::NgramParams{
|
||||
.loading_index = false,
|
||||
.min_gram = 2,
|
||||
.max_gram = 3,
|
||||
}},
|
||||
};
|
||||
auto inv_index = index::IndexFactory::GetInstance().CreateJsonIndex(
|
||||
create_index_info, file_manager_ctx);
|
||||
|
||||
auto ngram_index = std::unique_ptr<index::NgramInvertedIndex>(
|
||||
static_cast<index::NgramInvertedIndex*>(inv_index.release()));
|
||||
|
||||
std::vector<milvus::Json> jsons;
|
||||
for (auto& json : json_raw_data) {
|
||||
jsons.push_back(milvus::Json(simdjson::padded_string(json)));
|
||||
}
|
||||
|
||||
auto json_field =
|
||||
std::make_shared<FieldData<milvus::Json>>(DataType::JSON, false);
|
||||
json_field->add_json_data(jsons);
|
||||
ngram_index->BuildWithFieldData({json_field});
|
||||
ngram_index->finish();
|
||||
ngram_index->create_reader(milvus::index::SetBitsetSealed);
|
||||
|
||||
auto segment = segcore::CreateSealedSegment(schema);
|
||||
segcore::LoadIndexInfo load_index_info;
|
||||
load_index_info.field_id = json_fid.get();
|
||||
load_index_info.field_type = DataType::JSON;
|
||||
load_index_info.cache_index =
|
||||
CreateTestCacheIndex("", std::move(ngram_index));
|
||||
|
||||
std::map<std::string, std::string> index_params{
|
||||
{milvus::index::INDEX_TYPE, milvus::index::NGRAM_INDEX_TYPE},
|
||||
{milvus::index::MIN_GRAM, "2"},
|
||||
{milvus::index::MAX_GRAM, "3"},
|
||||
{milvus::LOAD_PRIORITY, "HIGH"},
|
||||
{JSON_PATH, json_path},
|
||||
{JSON_CAST_TYPE, "VARCHAR"}};
|
||||
load_index_info.index_params = index_params;
|
||||
|
||||
segment->LoadIndex(load_index_info);
|
||||
|
||||
auto cm = milvus::storage::RemoteChunkManagerSingleton::GetInstance()
|
||||
.GetRemoteChunkManager();
|
||||
auto load_info = PrepareSingleFieldInsertBinlog(
|
||||
0, 0, 0, json_fid.get(), {json_field}, cm);
|
||||
segment->LoadFieldData(load_info);
|
||||
|
||||
std::vector<std::tuple<proto::plan::GenericValue,
|
||||
std::vector<int64_t>,
|
||||
proto::plan::OpType>>
|
||||
test_cases;
|
||||
proto::plan::GenericValue value;
|
||||
value.set_string_val("nothing");
|
||||
test_cases.push_back(std::make_tuple(
|
||||
value, std::vector<int64_t>{}, proto::plan::OpType::InnerMatch));
|
||||
|
||||
value.set_string_val("il");
|
||||
test_cases.push_back(std::make_tuple(
|
||||
value, std::vector<int64_t>{1, 2, 9}, proto::plan::OpType::InnerMatch));
|
||||
|
||||
value.set_string_val("lliz");
|
||||
test_cases.push_back(std::make_tuple(
|
||||
value, std::vector<int64_t>{2}, proto::plan::OpType::InnerMatch));
|
||||
|
||||
value.set_string_val("Zi");
|
||||
test_cases.push_back(std::make_tuple(
|
||||
value, std::vector<int64_t>{2}, proto::plan::OpType::PrefixMatch));
|
||||
|
||||
value.set_string_val("Zilliz");
|
||||
test_cases.push_back(std::make_tuple(
|
||||
value, std::vector<int64_t>{2}, proto::plan::OpType::PrefixMatch));
|
||||
|
||||
value.set_string_val("de");
|
||||
test_cases.push_back(std::make_tuple(
|
||||
value, std::vector<int64_t>{3, 4}, proto::plan::OpType::PostfixMatch));
|
||||
|
||||
value.set_string_val("Node");
|
||||
test_cases.push_back(std::make_tuple(
|
||||
value, std::vector<int64_t>{3, 4}, proto::plan::OpType::PostfixMatch));
|
||||
|
||||
value.set_string_val("%ery%ode%");
|
||||
test_cases.push_back(std::make_tuple(
|
||||
value, std::vector<int64_t>{3}, proto::plan::OpType::Match));
|
||||
|
||||
for (auto& test_case : test_cases) {
|
||||
auto value = std::get<0>(test_case);
|
||||
auto expr = std::make_shared<milvus::expr::UnaryRangeFilterExpr>(
|
||||
milvus::expr::ColumnInfo(json_fid, DataType::JSON, {"a"}, true),
|
||||
std::get<2>(test_case),
|
||||
value,
|
||||
std::vector<proto::plan::GenericValue>{});
|
||||
|
||||
auto plan =
|
||||
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, expr);
|
||||
|
||||
auto result = milvus::query::ExecuteQueryExpr(
|
||||
plan, segment.get(), json_raw_data.size(), MAX_TIMESTAMP);
|
||||
auto expect_result = std::get<1>(test_case);
|
||||
EXPECT_EQ(result.count(), expect_result.size());
|
||||
for (auto& id : expect_result) {
|
||||
EXPECT_TRUE(result[id]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -23,9 +23,8 @@ func newNgramIndexChecker() *NgramIndexChecker {
|
||||
}
|
||||
|
||||
func (c *NgramIndexChecker) CheckTrain(dataType schemapb.DataType, params map[string]string) error {
|
||||
if dataType != schemapb.DataType_VarChar {
|
||||
// todo(SpadeA): we may support it for json in the future
|
||||
return merr.WrapErrParameterInvalidMsg("Ngram index can only be created on VARCHAR field")
|
||||
if dataType != schemapb.DataType_VarChar && dataType != schemapb.DataType_JSON {
|
||||
return merr.WrapErrParameterInvalidMsg("Ngram index can only be created on VARCHAR or JSON field")
|
||||
}
|
||||
|
||||
minGramStr, minGramExist := params[MinGramKey]
|
||||
@ -53,8 +52,8 @@ func (c *NgramIndexChecker) CheckTrain(dataType schemapb.DataType, params map[st
|
||||
|
||||
func (c *NgramIndexChecker) CheckValidDataType(indexType IndexType, field *schemapb.FieldSchema) error {
|
||||
dType := field.GetDataType()
|
||||
if !typeutil.IsStringType(dType) {
|
||||
return fmt.Errorf("ngram index can only be created on VARCHAR field")
|
||||
if !typeutil.IsStringType(dType) && dType != schemapb.DataType_JSON {
|
||||
return fmt.Errorf("ngram index can only be created on VARCHAR or JSON field")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user