mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 09:08:43 +08:00
feat: Add json flat index (#39917)
issue: https://github.com/milvus-io/milvus/issues/35528 This PR introduces a JSON flat index that allows indexing JSON fields and dynamic fields in the same way as other field types. In a previous PR (#36750), we implemented a JSON index that requires specifying a JSON path and casting a type. The only distinction lies in the json_cast_type parameter. When json_cast_type is set to JSON type, Milvus automatically creates a JSON flat index. For details on how Tantivy interprets JSON data, refer to the [tantivy documentation](https://github.com/quickwit-oss/tantivy/blob/main/doc/src/json.md#pitfalls-limitation-and-corner-cases). Limitations Array handling: Arrays do not function as nested objects. See the [limitations section](https://github.com/quickwit-oss/tantivy/blob/main/doc/src/json.md#arrays-do-not-work-like-nested-object) for more details. --------- Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
parent
83877b9faf
commit
fbf5cb4e62
@ -202,8 +202,12 @@ class Json {
|
||||
bool
|
||||
exist(std::string_view pointer) const {
|
||||
auto doc = this->doc();
|
||||
auto res = doc.at_pointer(pointer);
|
||||
return res.error() == simdjson::SUCCESS && !res.is_null();
|
||||
if (pointer.empty()) {
|
||||
return doc.error() == simdjson::SUCCESS && !doc.is_null();
|
||||
} else {
|
||||
auto res = doc.at_pointer(pointer);
|
||||
return res.error() == simdjson::SUCCESS && !res.is_null();
|
||||
}
|
||||
}
|
||||
|
||||
// construct JSON pointer with provided path
|
||||
|
||||
@ -21,7 +21,10 @@ const std::unordered_map<std::string, const JsonCastType>
|
||||
JsonCastType::DataType::DOUBLE)},
|
||||
{"ARRAY_VARCHAR",
|
||||
JsonCastType(JsonCastType::DataType::ARRAY,
|
||||
JsonCastType::DataType::VARCHAR)}};
|
||||
JsonCastType::DataType::VARCHAR)},
|
||||
{"JSON",
|
||||
JsonCastType(JsonCastType::DataType::JSON,
|
||||
JsonCastType::DataType::JSON)}};
|
||||
|
||||
const JsonCastType JsonCastType::UNKNOWN = JsonCastType(
|
||||
JsonCastType::DataType::UNKNOWN, JsonCastType::DataType::UNKNOWN);
|
||||
|
||||
@ -22,7 +22,7 @@ namespace milvus {
|
||||
using MilvusDataType = milvus::DataType;
|
||||
class JsonCastType {
|
||||
public:
|
||||
enum class DataType { UNKNOWN, BOOL, DOUBLE, VARCHAR, ARRAY };
|
||||
enum class DataType { UNKNOWN, BOOL, DOUBLE, VARCHAR, ARRAY, JSON };
|
||||
|
||||
static const JsonCastType UNKNOWN;
|
||||
|
||||
|
||||
74
internal/core/src/common/JsonUtils.cpp
Normal file
74
internal/core/src/common/JsonUtils.cpp
Normal file
@ -0,0 +1,74 @@
|
||||
#include "common/JsonUtils.h"
|
||||
|
||||
namespace milvus {
|
||||
|
||||
// Parse a JSON Pointer into unescaped path segments
|
||||
std::vector<std::string>
|
||||
parse_json_pointer(const std::string& pointer) {
|
||||
std::vector<std::string> tokens;
|
||||
if (pointer.empty())
|
||||
return tokens; // Root path (entire document)
|
||||
if (pointer[0] != '/') {
|
||||
throw std::invalid_argument(
|
||||
"Invalid JSON Pointer: must start with '/'");
|
||||
}
|
||||
size_t start = 1;
|
||||
while (start < pointer.size()) {
|
||||
size_t end = pointer.find('/', start);
|
||||
if (end == std::string::npos)
|
||||
end = pointer.size();
|
||||
std::string token = pointer.substr(start, end - start);
|
||||
// Replace ~1 with / and ~0 with ~
|
||||
size_t pos = 0;
|
||||
while ((pos = token.find("~1", pos)) != std::string::npos) {
|
||||
token.replace(pos, 2, "/");
|
||||
pos += 1; // Avoid infinite loops on overlapping replacements
|
||||
}
|
||||
pos = 0;
|
||||
while ((pos = token.find("~0", pos)) != std::string::npos) {
|
||||
token.replace(pos, 2, "~");
|
||||
pos += 1;
|
||||
}
|
||||
tokens.push_back(token);
|
||||
start = end + 1;
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
|
||||
// Check if a JSON Pointer path exists
|
||||
bool
|
||||
path_exists(const simdjson::dom::element& root,
|
||||
const std::vector<std::string>& tokens) {
|
||||
simdjson::dom::element current = root;
|
||||
for (const auto& token : tokens) {
|
||||
if (current.type() == simdjson::dom::element_type::OBJECT) {
|
||||
auto obj = current.get_object();
|
||||
if (obj.error())
|
||||
return false;
|
||||
auto next = obj.value().at_key(token);
|
||||
if (next.error())
|
||||
return false;
|
||||
current = next.value();
|
||||
} else if (current.type() == simdjson::dom::element_type::ARRAY) {
|
||||
if (token == "-")
|
||||
return false; // "-" is invalid for existence checks
|
||||
char* endptr;
|
||||
long index = strtol(token.c_str(), &endptr, 10);
|
||||
if (*endptr != '\0' || index < 0)
|
||||
return false; // Not a valid index
|
||||
auto arr = current.get_array();
|
||||
if (arr.error())
|
||||
return false;
|
||||
if (static_cast<size_t>(index) >= arr.value().size())
|
||||
return false;
|
||||
auto next = arr.value().at(index);
|
||||
if (next.error())
|
||||
return false;
|
||||
current = next.value();
|
||||
} else {
|
||||
return false; // Path cannot be resolved
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
} // namespace milvus
|
||||
16
internal/core/src/common/JsonUtils.h
Normal file
16
internal/core/src/common/JsonUtils.h
Normal file
@ -0,0 +1,16 @@
|
||||
#pragma once
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "simdjson/dom.h"
|
||||
|
||||
namespace milvus {
|
||||
|
||||
// Parse a JSON Pointer into unescaped path segments
|
||||
std::vector<std::string>
|
||||
parse_json_pointer(const std::string& pointer);
|
||||
|
||||
// Check if a JSON Pointer path exists
|
||||
bool
|
||||
path_exists(const simdjson::dom::element& root,
|
||||
const std::vector<std::string>& tokens);
|
||||
} // namespace milvus
|
||||
@ -80,6 +80,15 @@ PhyExistsFilterExpr::EvalJsonExistsForIndex() {
|
||||
break;
|
||||
}
|
||||
|
||||
case JsonCastType::DataType::JSON: {
|
||||
auto* json_flat_index =
|
||||
dynamic_cast<index::JsonFlatIndex*>(index);
|
||||
auto executor =
|
||||
json_flat_index->create_executor<double>(pointer);
|
||||
cached_index_chunk_res_ = executor->IsNotNull().clone();
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
PanicInfo(DataTypeInvalid,
|
||||
"unsupported data type: {}",
|
||||
@ -116,8 +125,8 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(EvalCtx& context) {
|
||||
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
|
||||
int processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[&bitmap_input, &
|
||||
processed_cursor ]<FilterType filter_type = FilterType::sequential>(
|
||||
[&bitmap_input,
|
||||
&processed_cursor]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -125,23 +134,23 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(EvalCtx& context) {
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
const std::string& pointer) {
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (int i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (int i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = data[offset].exist(pointer);
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = data[offset].exist(pointer);
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
processed_cursor += size;
|
||||
};
|
||||
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
|
||||
@ -29,6 +29,8 @@
|
||||
#include "exec/expression/Utils.h"
|
||||
#include "exec/QueryContext.h"
|
||||
#include "expr/ITypeExpr.h"
|
||||
#include "index/Index.h"
|
||||
#include "index/JsonFlatIndex.h"
|
||||
#include "log/Log.h"
|
||||
#include "query/PlanProto.h"
|
||||
#include "segcore/SegmentSealed.h"
|
||||
@ -824,14 +826,34 @@ class SegmentExpr : public Expr {
|
||||
// executing costs quite much time.
|
||||
if (cached_index_chunk_id_ != i) {
|
||||
Index* index_ptr = nullptr;
|
||||
PinWrapper<const index::IndexBase*> json_pw;
|
||||
PinWrapper<const Index*> pw;
|
||||
// Executor for JsonFlatIndex. Must outlive index_ptr. Only used for JSON type.
|
||||
std::shared_ptr<
|
||||
index::JsonFlatIndexQueryExecutor<IndexInnerType>>
|
||||
executor;
|
||||
|
||||
if (field_type_ == DataType::JSON) {
|
||||
auto pointer = milvus::Json::pointer(nested_path_);
|
||||
json_pw = segment_->chunk_json_index(field_id_, pointer, i);
|
||||
|
||||
pw = segment_->chunk_scalar_index<IndexInnerType>(
|
||||
field_id_, pointer, i);
|
||||
index_ptr = const_cast<Index*>(pw.get());
|
||||
// check if it is a json flat index, if so, create a json flat index query executor
|
||||
auto json_flat_index =
|
||||
dynamic_cast<const index::JsonFlatIndex*>(
|
||||
json_pw.get());
|
||||
|
||||
if (json_flat_index) {
|
||||
auto index_path = json_flat_index->GetNestedPath();
|
||||
executor =
|
||||
json_flat_index
|
||||
->template create_executor<IndexInnerType>(
|
||||
pointer.substr(index_path.size()));
|
||||
index_ptr = executor.get();
|
||||
} else {
|
||||
auto json_index =
|
||||
const_cast<index::IndexBase*>(json_pw.get());
|
||||
index_ptr = dynamic_cast<Index*>(json_index);
|
||||
}
|
||||
} else {
|
||||
pw = segment_->chunk_scalar_index<IndexInnerType>(field_id_,
|
||||
i);
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
|
||||
#include "UnaryExpr.h"
|
||||
#include <optional>
|
||||
#include "common/EasyAssert.h"
|
||||
#include "common/Json.h"
|
||||
#include "common/Types.h"
|
||||
#include "common/type_c.h"
|
||||
@ -218,10 +219,6 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
|
||||
case proto::plan::GenericValue::ValCase::kStringVal:
|
||||
result = ExecRangeVisitorImplForIndex<std::string>();
|
||||
break;
|
||||
case proto::plan::GenericValue::ValCase::kArrayVal:
|
||||
result =
|
||||
ExecRangeVisitorImplForIndex<proto::plan::Array>();
|
||||
break;
|
||||
default:
|
||||
PanicInfo(
|
||||
DataTypeInvalid, "unknown data type: {}", val_type);
|
||||
@ -321,8 +318,9 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) {
|
||||
}
|
||||
int processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[ op_type, &processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
[op_type,
|
||||
&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::ArrayView* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -331,185 +329,186 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) {
|
||||
TargetBitmapView valid_res,
|
||||
ValueType val,
|
||||
int index) {
|
||||
switch (op_type) {
|
||||
case proto::plan::GreaterThan: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::GreaterThan,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
switch (op_type) {
|
||||
case proto::plan::GreaterThan: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::GreaterThan,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::GreaterEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::GreaterEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::LessThan: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::LessThan,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::LessEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::LessEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::Equal: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::Equal,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::NotEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::NotEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::PrefixMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::PrefixMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::Match: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::Match,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::PostfixMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::PostfixMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::InnerMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::InnerMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
PanicInfo(
|
||||
OpTypeInvalid,
|
||||
fmt::format(
|
||||
"unsupported operator type for unary expr: {}",
|
||||
op_type));
|
||||
}
|
||||
case proto::plan::GreaterEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::GreaterEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::LessThan: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::LessThan,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::LessEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::LessEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::Equal: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::Equal,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::NotEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::NotEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::PrefixMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::PrefixMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::Match: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::Match,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::PostfixMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::PostfixMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::InnerMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::InnerMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
PanicInfo(
|
||||
OpTypeInvalid,
|
||||
fmt::format("unsupported operator type for unary expr: {}",
|
||||
op_type));
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
processed_cursor += size;
|
||||
};
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
processed_size =
|
||||
@ -709,16 +708,18 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(EvalCtx& context) {
|
||||
} while (false)
|
||||
|
||||
int processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[ op_type, pointer, &processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
ExprValueType val) {
|
||||
auto execute_sub_batch = [op_type,
|
||||
pointer,
|
||||
&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type =
|
||||
FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
ExprValueType val) {
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
switch (op_type) {
|
||||
case proto::plan::GreaterThan: {
|
||||
@ -1647,16 +1648,17 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) {
|
||||
auto expr_type = expr_->op_type_;
|
||||
|
||||
size_t processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[ expr_type, &processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const T* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
IndexInnerType val) {
|
||||
auto execute_sub_batch = [expr_type,
|
||||
&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type =
|
||||
FilterType::sequential>(
|
||||
const T* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
IndexInnerType val) {
|
||||
switch (expr_type) {
|
||||
case proto::plan::GreaterThan: {
|
||||
UnaryElementFunc<T, proto::plan::GreaterThan, filter_type> func;
|
||||
|
||||
@ -17,11 +17,13 @@
|
||||
#include "index/IndexFactory.h"
|
||||
#include <cstdlib>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include "common/EasyAssert.h"
|
||||
#include "common/FieldDataInterface.h"
|
||||
#include "common/JsonCastType.h"
|
||||
#include "common/Types.h"
|
||||
#include "index/Index.h"
|
||||
#include "index/JsonFlatIndex.h"
|
||||
#include "index/VectorMemIndex.h"
|
||||
#include "index/Utils.h"
|
||||
#include "index/Meta.h"
|
||||
@ -408,6 +410,9 @@ IndexFactory::CreateJsonIndex(
|
||||
nested_path,
|
||||
file_manager_context,
|
||||
JsonCastFunction::FromString(json_cast_function));
|
||||
case JsonCastType::DataType::JSON:
|
||||
return std::make_unique<JsonFlatIndex>(file_manager_context,
|
||||
nested_path);
|
||||
default:
|
||||
PanicInfo(DataTypeInvalid, "Invalid data type:{}", cast_dtype);
|
||||
}
|
||||
|
||||
@ -61,18 +61,21 @@ InvertedIndexTantivy<T>::InitForBuildIndex() {
|
||||
d_type_,
|
||||
path_.c_str(),
|
||||
tantivy_index_version_,
|
||||
inverted_index_single_segment_);
|
||||
inverted_index_single_segment_,
|
||||
user_specified_doc_id_);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
InvertedIndexTantivy<T>::InvertedIndexTantivy(
|
||||
uint32_t tantivy_index_version,
|
||||
const storage::FileManagerContext& ctx,
|
||||
bool inverted_index_single_segment)
|
||||
bool inverted_index_single_segment,
|
||||
bool user_specified_doc_id)
|
||||
: ScalarIndex<T>(INVERTED_INDEX_TYPE),
|
||||
schema_(ctx.fieldDataMeta.field_schema),
|
||||
tantivy_index_version_(tantivy_index_version),
|
||||
inverted_index_single_segment_(inverted_index_single_segment) {
|
||||
inverted_index_single_segment_(inverted_index_single_segment),
|
||||
user_specified_doc_id_(user_specified_doc_id) {
|
||||
mem_file_manager_ = std::make_shared<MemFileManager>(ctx);
|
||||
disk_file_manager_ = std::make_shared<DiskFileManager>(ctx);
|
||||
// push init wrapper to load process
|
||||
|
||||
@ -49,6 +49,10 @@ get_tantivy_data_type(proto::schema::DataType data_type) {
|
||||
return TantivyDataType::Keyword;
|
||||
}
|
||||
|
||||
case proto::schema::DataType::JSON: {
|
||||
return TantivyDataType::JSON;
|
||||
}
|
||||
|
||||
default:
|
||||
PanicInfo(ErrorCode::NotImplemented,
|
||||
fmt::format("not implemented data type: {}", data_type));
|
||||
@ -72,7 +76,8 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
|
||||
// Default, we build tantivy index with version 7 (newest version now).
|
||||
explicit InvertedIndexTantivy(uint32_t tantivy_index_version,
|
||||
const storage::FileManagerContext& ctx,
|
||||
bool inverted_index_single_segment = false);
|
||||
bool inverted_index_single_segment = false,
|
||||
bool user_specified_doc_id = true);
|
||||
|
||||
~InvertedIndexTantivy();
|
||||
|
||||
@ -183,7 +188,7 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
|
||||
return Count();
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
virtual const TargetBitmap
|
||||
PrefixMatch(const std::string_view prefix);
|
||||
|
||||
const TargetBitmap
|
||||
@ -253,7 +258,7 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas) {
|
||||
PanicInfo(ErrorCode::NotImplemented,
|
||||
"build_index_for_json not implemented");
|
||||
};
|
||||
}
|
||||
|
||||
protected:
|
||||
std::shared_ptr<TantivyIndexWrapper> wrapper_;
|
||||
@ -285,6 +290,10 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
|
||||
// building node to build specific type of tantivy index.
|
||||
bool inverted_index_single_segment_{false};
|
||||
|
||||
// `user_specified_doc_id_` is used to control whether to use user specified doc id.
|
||||
// If `user_specified_doc_id_` is true, the doc id is specified by the user, otherwise, the doc id is generated by the index.
|
||||
bool user_specified_doc_id_{true};
|
||||
|
||||
// `tantivy_index_version_` is used to control which kind of tantivy index should be used.
|
||||
// There could be the case where milvus version of read node is lower than the version of index builder node(and read node
|
||||
// may not be upgraded to a higher version in a predictable time), so we are using a lower version of tantivy to read index
|
||||
|
||||
59
internal/core/src/index/JsonFlatIndex.cpp
Normal file
59
internal/core/src/index/JsonFlatIndex.cpp
Normal file
@ -0,0 +1,59 @@
|
||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include "index/JsonFlatIndex.h"
|
||||
#include "common/Types.h"
|
||||
#include "index/InvertedIndexUtil.h"
|
||||
#include "log/Log.h"
|
||||
#include "simdjson/builtin.h"
|
||||
#include "simdjson/padded_string.h"
|
||||
#include "common/JsonUtils.h"
|
||||
namespace milvus::index {
|
||||
|
||||
void
|
||||
JsonFlatIndex::build_index_for_json(
|
||||
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas) {
|
||||
int64_t offset = 0;
|
||||
auto tokens = parse_json_pointer(nested_path_);
|
||||
for (const auto& data : field_datas) {
|
||||
auto n = data->get_num_rows();
|
||||
for (int i = 0; i < n; i++) {
|
||||
if (schema_.nullable() && !data->is_valid(i)) {
|
||||
null_offset_.push_back(offset);
|
||||
wrapper_->add_json_array_data(nullptr, 0, offset++);
|
||||
continue;
|
||||
}
|
||||
auto json = static_cast<const Json*>(data->RawValue(i));
|
||||
auto exists = path_exists(json->dom_doc(), tokens);
|
||||
if (!exists || !json->exist(nested_path_)) {
|
||||
null_offset_.push_back(offset);
|
||||
wrapper_->add_json_array_data(nullptr, 0, offset++);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nested_path_ == "") {
|
||||
wrapper_->add_json_data(json, 1, offset++);
|
||||
} else {
|
||||
auto doc = json->doc();
|
||||
auto res = doc.at_pointer(nested_path_);
|
||||
auto err = res.error();
|
||||
if (err != simdjson::SUCCESS) {
|
||||
wrapper_->add_json_array_data(nullptr, 0, offset++);
|
||||
} else {
|
||||
auto str = simdjson::to_json_string(res.value()).value();
|
||||
Json subpath_json = Json(simdjson::padded_string(str));
|
||||
wrapper_->add_json_data(&subpath_json, 1, offset++);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace milvus::index
|
||||
227
internal/core/src/index/JsonFlatIndex.h
Normal file
227
internal/core/src/index/JsonFlatIndex.h
Normal file
@ -0,0 +1,227 @@
|
||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#pragma once
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
#include "common/EasyAssert.h"
|
||||
#include "common/JsonCastType.h"
|
||||
#include "common/Types.h"
|
||||
#include "index/Index.h"
|
||||
#include "index/InvertedIndexTantivy.h"
|
||||
#include "index/InvertedIndexUtil.h"
|
||||
#include "index/ScalarIndex.h"
|
||||
#include "log/Log.h"
|
||||
namespace milvus::index {
|
||||
|
||||
// JsonFlatIndexQueryExecutor is used to execute queries on a specified json path, and can be constructed by JsonFlatIndex
|
||||
template <typename T>
|
||||
class JsonFlatIndexQueryExecutor : public InvertedIndexTantivy<T> {
|
||||
public:
|
||||
JsonFlatIndexQueryExecutor(std::string& json_path,
|
||||
std::shared_ptr<TantivyIndexWrapper> wrapper) {
|
||||
json_path_ = json_path;
|
||||
this->wrapper_ = wrapper;
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
In(size_t n, const T* values) override {
|
||||
TargetBitmap bitset(this->Count());
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
this->wrapper_->json_term_query(json_path_, values[i], &bitset);
|
||||
}
|
||||
return bitset;
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
IsNull() override {
|
||||
TargetBitmap bitset(this->Count());
|
||||
this->wrapper_->json_exist_query(json_path_, &bitset);
|
||||
bitset.flip();
|
||||
return bitset;
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
IsNotNull() override {
|
||||
TargetBitmap bitset(this->Count());
|
||||
this->wrapper_->json_exist_query(json_path_, &bitset);
|
||||
return bitset;
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
InApplyFilter(
|
||||
size_t n,
|
||||
const T* values,
|
||||
const std::function<bool(size_t /* offset */)>& filter) override {
|
||||
TargetBitmap bitset(this->Count());
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
this->wrapper_->json_term_query(json_path_, values[i], &bitset);
|
||||
apply_hits_with_filter(bitset, filter);
|
||||
}
|
||||
return bitset;
|
||||
}
|
||||
|
||||
virtual void
|
||||
InApplyCallback(
|
||||
size_t n,
|
||||
const T* values,
|
||||
const std::function<void(size_t /* offset */)>& callback) override {
|
||||
TargetBitmap bitset(this->Count());
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
this->wrapper_->json_term_query(json_path_, values[i], &bitset);
|
||||
apply_hits_with_callback(bitset, callback);
|
||||
}
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
NotIn(size_t n, const T* values) override {
|
||||
TargetBitmap bitset(this->Count());
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
this->wrapper_->json_term_query(json_path_, values[i], &bitset);
|
||||
}
|
||||
|
||||
bitset.flip();
|
||||
|
||||
// TODO: optimize this
|
||||
auto null_bitset = IsNotNull();
|
||||
bitset &= null_bitset;
|
||||
|
||||
return bitset;
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
Range(T value, OpType op) override {
|
||||
LOG_INFO("[executor] JsonFlatIndexQueryExecutor Range");
|
||||
TargetBitmap bitset(this->Count());
|
||||
switch (op) {
|
||||
case OpType::LessThan: {
|
||||
this->wrapper_->json_range_query(
|
||||
json_path_, T(), value, true, false, false, false, &bitset);
|
||||
} break;
|
||||
case OpType::LessEqual: {
|
||||
this->wrapper_->json_range_query(
|
||||
json_path_, T(), value, true, false, true, false, &bitset);
|
||||
} break;
|
||||
case OpType::GreaterThan: {
|
||||
this->wrapper_->json_range_query(
|
||||
json_path_, value, T(), false, true, false, false, &bitset);
|
||||
} break;
|
||||
case OpType::GreaterEqual: {
|
||||
this->wrapper_->json_range_query(
|
||||
json_path_, value, T(), false, true, true, false, &bitset);
|
||||
} break;
|
||||
default:
|
||||
PanicInfo(OpTypeInvalid,
|
||||
fmt::format("Invalid OperatorType: {}", op));
|
||||
}
|
||||
return bitset;
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
Query(const DatasetPtr& dataset) override {
|
||||
return InvertedIndexTantivy<T>::Query(dataset);
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
Range(T lower_bound_value,
|
||||
bool lb_inclusive,
|
||||
T upper_bound_value,
|
||||
bool ub_inclusive) override {
|
||||
TargetBitmap bitset(this->Count());
|
||||
this->wrapper_->json_range_query(json_path_,
|
||||
lower_bound_value,
|
||||
upper_bound_value,
|
||||
false,
|
||||
false,
|
||||
lb_inclusive,
|
||||
ub_inclusive,
|
||||
&bitset);
|
||||
return bitset;
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
PrefixMatch(const std::string_view prefix) override {
|
||||
TargetBitmap bitset(this->Count());
|
||||
this->wrapper_->json_prefix_query(
|
||||
json_path_, std::string(prefix), &bitset);
|
||||
return bitset;
|
||||
}
|
||||
|
||||
const TargetBitmap
|
||||
RegexQuery(const std::string& pattern) override {
|
||||
TargetBitmap bitset(this->Count());
|
||||
this->wrapper_->json_regex_query(json_path_, pattern, &bitset);
|
||||
return bitset;
|
||||
}
|
||||
|
||||
private:
|
||||
std::string json_path_;
|
||||
};
|
||||
|
||||
// JsonFlatIndex is not bound to any specific type,
|
||||
// we need to reuse InvertedIndexTantivy's Build and Load implementation, so we specify the template parameter as std::string
|
||||
// JsonFlatIndex should not be used to execute queries, use JsonFlatIndexQueryExecutor instead
|
||||
class JsonFlatIndex : public InvertedIndexTantivy<std::string> {
|
||||
public:
|
||||
JsonFlatIndex() : InvertedIndexTantivy<std::string>() {
|
||||
}
|
||||
|
||||
explicit JsonFlatIndex(const storage::FileManagerContext& ctx,
|
||||
const std::string& nested_path)
|
||||
: InvertedIndexTantivy<std::string>(
|
||||
TANTIVY_INDEX_LATEST_VERSION, ctx, false, false),
|
||||
nested_path_(nested_path) {
|
||||
}
|
||||
|
||||
void
|
||||
build_index_for_json(const std::vector<std::shared_ptr<FieldDataBase>>&
|
||||
field_datas) override;
|
||||
|
||||
template <typename T>
|
||||
std::shared_ptr<JsonFlatIndexQueryExecutor<T>>
|
||||
create_executor(std::string json_path) const {
|
||||
// json path should be in the format of /a/b/c, we need to convert it to tantivy path like a.b.c
|
||||
std::replace(json_path.begin(), json_path.end(), '/', '.');
|
||||
if (!json_path.empty()) {
|
||||
json_path = json_path.substr(1);
|
||||
}
|
||||
|
||||
LOG_INFO("Create JsonFlatIndexQueryExecutor with json_path: {}",
|
||||
json_path);
|
||||
|
||||
return std::make_shared<JsonFlatIndexQueryExecutor<T>>(json_path,
|
||||
this->wrapper_);
|
||||
}
|
||||
|
||||
JsonCastType
|
||||
GetCastType() const override {
|
||||
return JsonCastType::FromString("JSON");
|
||||
}
|
||||
|
||||
std::string
|
||||
GetNestedPath() const {
|
||||
return nested_path_;
|
||||
}
|
||||
|
||||
void
|
||||
finish() {
|
||||
this->wrapper_->finish();
|
||||
}
|
||||
|
||||
void
|
||||
create_reader() {
|
||||
this->wrapper_->create_reader();
|
||||
}
|
||||
|
||||
private:
|
||||
std::string nested_path_;
|
||||
};
|
||||
} // namespace milvus::index
|
||||
@ -19,79 +19,11 @@
|
||||
#include "common/Types.h"
|
||||
#include "folly/FBVector.h"
|
||||
#include "log/Log.h"
|
||||
#include "common/JsonUtils.h"
|
||||
#include "simdjson/error.h"
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
// Parse a JSON Pointer into unescaped path segments
|
||||
std::vector<std::string>
|
||||
parse_json_pointer(const std::string& pointer) {
|
||||
std::vector<std::string> tokens;
|
||||
if (pointer.empty())
|
||||
return tokens; // Root path (entire document)
|
||||
if (pointer[0] != '/') {
|
||||
throw std::invalid_argument(
|
||||
"Invalid JSON Pointer: must start with '/'");
|
||||
}
|
||||
size_t start = 1;
|
||||
while (start < pointer.size()) {
|
||||
size_t end = pointer.find('/', start);
|
||||
if (end == std::string::npos)
|
||||
end = pointer.size();
|
||||
std::string token = pointer.substr(start, end - start);
|
||||
// Replace ~1 with / and ~0 with ~
|
||||
size_t pos = 0;
|
||||
while ((pos = token.find("~1", pos)) != std::string::npos) {
|
||||
token.replace(pos, 2, "/");
|
||||
pos += 1; // Avoid infinite loops on overlapping replacements
|
||||
}
|
||||
pos = 0;
|
||||
while ((pos = token.find("~0", pos)) != std::string::npos) {
|
||||
token.replace(pos, 2, "~");
|
||||
pos += 1;
|
||||
}
|
||||
tokens.push_back(token);
|
||||
start = end + 1;
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
|
||||
// Check if a JSON Pointer path exists
|
||||
bool
|
||||
path_exists(const simdjson::dom::element& root,
|
||||
const std::vector<std::string>& tokens) {
|
||||
simdjson::dom::element current = root;
|
||||
for (const auto& token : tokens) {
|
||||
if (current.type() == simdjson::dom::element_type::OBJECT) {
|
||||
auto obj = current.get_object();
|
||||
if (obj.error())
|
||||
return false;
|
||||
auto next = obj.value().at_key(token);
|
||||
if (next.error())
|
||||
return false;
|
||||
current = next.value();
|
||||
} else if (current.type() == simdjson::dom::element_type::ARRAY) {
|
||||
if (token == "-")
|
||||
return false; // "-" is invalid for existence checks
|
||||
char* endptr;
|
||||
long index = strtol(token.c_str(), &endptr, 10);
|
||||
if (*endptr != '\0' || index < 0)
|
||||
return false; // Not a valid index
|
||||
auto arr = current.get_array();
|
||||
if (arr.error())
|
||||
return false;
|
||||
if (static_cast<size_t>(index) >= arr.value().size())
|
||||
return false;
|
||||
auto next = arr.value().at(index);
|
||||
if (next.error())
|
||||
return false;
|
||||
current = next.value();
|
||||
} else {
|
||||
return false; // Path cannot be resolved
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
template <typename T>
|
||||
void
|
||||
JsonInvertedIndex<T>::build_index_for_json(
|
||||
@ -120,9 +52,7 @@ JsonInvertedIndex<T>::build_index_for_json(
|
||||
}
|
||||
|
||||
auto exists = path_exists(json_column->dom_doc(), tokens);
|
||||
if (!exists ||
|
||||
nested_path_ != "" &&
|
||||
json_column->doc().at_pointer(nested_path_).is_null()) {
|
||||
if (!exists || !json_column->exist(nested_path_)) {
|
||||
error_recorder_.Record(
|
||||
*json_column, nested_path_, simdjson::NO_SUCH_FIELD);
|
||||
this->null_offset_.push_back(offset);
|
||||
|
||||
@ -44,6 +44,7 @@
|
||||
#include "google/protobuf/message_lite.h"
|
||||
#include "index/Index.h"
|
||||
#include "index/IndexFactory.h"
|
||||
#include "index/JsonFlatIndex.h"
|
||||
#include "index/VectorMemIndex.h"
|
||||
#include "mmap/ChunkedColumn.h"
|
||||
#include "mmap/Types.h"
|
||||
@ -163,11 +164,12 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
|
||||
|
||||
if (field_meta.get_data_type() == DataType::JSON) {
|
||||
auto path = info.index_params.at(JSON_PATH);
|
||||
JSONIndexKey key;
|
||||
key.nested_path = path;
|
||||
key.field_id = field_id;
|
||||
json_indexings_[key] =
|
||||
std::move(const_cast<LoadIndexInfo&>(info).index);
|
||||
JsonIndex index;
|
||||
index.nested_path = path;
|
||||
index.field_id = field_id;
|
||||
index.index = std::move(const_cast<LoadIndexInfo&>(info).index);
|
||||
index.cast_type = index.index->GetCastType();
|
||||
json_indices.push_back(std::move(index));
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include <index/ScalarIndex.h>
|
||||
@ -30,6 +31,8 @@
|
||||
#include "common/QueryResult.h"
|
||||
#include "common/QueryInfo.h"
|
||||
#include "mmap/ChunkedColumnInterface.h"
|
||||
#include "index/Index.h"
|
||||
#include "index/JsonFlatIndex.h"
|
||||
#include "query/Plan.h"
|
||||
#include "pb/segcore.pb.h"
|
||||
#include "index/SkipIndex.h"
|
||||
@ -245,6 +248,17 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
return PinWrapper<const index::ScalarIndex<T>*>(pw, ptr);
|
||||
}
|
||||
|
||||
// We should not expose this interface directly, but access the index through chunk_scalar_index.
|
||||
// However, chunk_scalar_index requires specifying a template parameter, which makes it impossible to return JsonFlatIndex.
|
||||
// A better approach would be to have chunk_scalar_index return a pointer to a base class,
|
||||
// and then use dynamic_cast to convert it. But this would cause a lot of code changes, so for now, we will do it this way.
|
||||
PinWrapper<const index::IndexBase*>
|
||||
chunk_json_index(FieldId field_id,
|
||||
std::string& json_path,
|
||||
int64_t chunk_id) const {
|
||||
return chunk_index_impl(field_id, json_path, chunk_id);
|
||||
}
|
||||
|
||||
// union(segment_id, field_id) as unique id
|
||||
virtual std::string
|
||||
GetUniqueFieldId(int64_t field_id) const {
|
||||
@ -491,7 +505,7 @@ class SegmentInternalInterface : public SegmentInterface {
|
||||
public:
|
||||
virtual PinWrapper<const index::IndexBase*>
|
||||
chunk_index_impl(FieldId field_id,
|
||||
std::string path,
|
||||
const std::string& path,
|
||||
int64_t chunk_id) const {
|
||||
PanicInfo(ErrorCode::NotImplemented, "not implemented");
|
||||
};
|
||||
|
||||
@ -12,11 +12,15 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <string_view>
|
||||
#include <utility>
|
||||
|
||||
#include "common/JsonCastType.h"
|
||||
#include "common/LoadInfo.h"
|
||||
#include "common/Types.h"
|
||||
#include "index/Index.h"
|
||||
#include "index/JsonInvertedIndex.h"
|
||||
#include "index/JsonFlatIndex.h"
|
||||
#include "pb/segcore.pb.h"
|
||||
#include "segcore/InsertRecord.h"
|
||||
#include "segcore/SegmentInterface.h"
|
||||
@ -53,14 +57,38 @@ class SegmentSealed : public SegmentInternalInterface {
|
||||
|
||||
virtual index::IndexBase*
|
||||
GetJsonIndex(FieldId field_id, std::string path) const override {
|
||||
JSONIndexKey key;
|
||||
key.field_id = field_id;
|
||||
key.nested_path = path;
|
||||
auto index = json_indexings_.find(key);
|
||||
if (index == json_indexings_.end()) {
|
||||
return nullptr;
|
||||
int path_len_diff = std::numeric_limits<int>::max();
|
||||
index::IndexBase* best_match = nullptr;
|
||||
std::string_view path_view = path;
|
||||
for (const auto& index : json_indices) {
|
||||
if (index.field_id != field_id) {
|
||||
continue;
|
||||
}
|
||||
switch (index.cast_type.data_type()) {
|
||||
case JsonCastType::DataType::JSON:
|
||||
if (path_view.length() < index.nested_path.length()) {
|
||||
continue;
|
||||
}
|
||||
if (path_view.substr(0, index.nested_path.length()) ==
|
||||
index.nested_path) {
|
||||
int current_len_diff =
|
||||
path_view.length() - index.nested_path.length();
|
||||
if (current_len_diff < path_len_diff) {
|
||||
path_len_diff = current_len_diff;
|
||||
best_match = index.index.get();
|
||||
}
|
||||
if (path_len_diff == 0) {
|
||||
return best_match;
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
if (index.nested_path == path) {
|
||||
return index.index.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
return index->second.get();
|
||||
return best_match;
|
||||
}
|
||||
|
||||
virtual void
|
||||
@ -73,19 +101,6 @@ class SegmentSealed : public SegmentInternalInterface {
|
||||
return SegmentType::Sealed;
|
||||
}
|
||||
|
||||
PinWrapper<const index::IndexBase*>
|
||||
chunk_index_impl(FieldId field_id,
|
||||
std::string path,
|
||||
int64_t chunk_id) const override {
|
||||
JSONIndexKey key;
|
||||
key.field_id = field_id;
|
||||
key.nested_path = path;
|
||||
AssertInfo(json_indexings_.find(key) != json_indexings_.end(),
|
||||
"Cannot find json index with path: " + path);
|
||||
return PinWrapper<const index::IndexBase*>(
|
||||
json_indexings_.at(key).get());
|
||||
}
|
||||
|
||||
virtual bool
|
||||
HasIndex(FieldId field_id) const override = 0;
|
||||
bool
|
||||
@ -94,43 +109,48 @@ class SegmentSealed : public SegmentInternalInterface {
|
||||
DataType data_type,
|
||||
bool any_type = false,
|
||||
bool is_json_contain = false) const override {
|
||||
JSONIndexKey key;
|
||||
key.field_id = field_id;
|
||||
key.nested_path = path;
|
||||
auto index = json_indexings_.find(key);
|
||||
if (index == json_indexings_.end()) {
|
||||
return false;
|
||||
}
|
||||
if (any_type) {
|
||||
return true;
|
||||
}
|
||||
return index->second->IsDataTypeSupported(data_type, is_json_contain);
|
||||
auto it = std::find_if(
|
||||
json_indices.begin(),
|
||||
json_indices.end(),
|
||||
[field_id, path, data_type, any_type, is_json_contain](
|
||||
const JsonIndex& index) {
|
||||
if (index.field_id != field_id) {
|
||||
return false;
|
||||
}
|
||||
if (index.cast_type.data_type() ==
|
||||
JsonCastType::DataType::JSON) {
|
||||
// for json flat index, path should be a subpath of nested_path
|
||||
return path.substr(0, index.nested_path.length()) ==
|
||||
index.nested_path;
|
||||
}
|
||||
if (any_type) {
|
||||
return true;
|
||||
}
|
||||
return index.nested_path == path &&
|
||||
index.index->IsDataTypeSupported(data_type,
|
||||
is_json_contain);
|
||||
});
|
||||
return it != json_indices.end();
|
||||
}
|
||||
|
||||
protected:
|
||||
struct JSONIndexKey {
|
||||
virtual PinWrapper<const index::IndexBase*>
|
||||
chunk_index_impl(FieldId field_id, int64_t chunk_id) const override = 0;
|
||||
|
||||
PinWrapper<const index::IndexBase*>
|
||||
chunk_index_impl(FieldId field_id,
|
||||
const std::string& path,
|
||||
int64_t chunk_id) const override {
|
||||
return GetJsonIndex(field_id, path);
|
||||
}
|
||||
struct JsonIndex {
|
||||
FieldId field_id;
|
||||
std::string nested_path;
|
||||
bool
|
||||
operator==(const JSONIndexKey& other) const {
|
||||
return field_id == other.field_id &&
|
||||
nested_path == other.nested_path;
|
||||
}
|
||||
JsonCastType cast_type{JsonCastType::UNKNOWN};
|
||||
index::IndexBasePtr index;
|
||||
};
|
||||
|
||||
struct hash_helper {
|
||||
size_t
|
||||
operator()(const JSONIndexKey& k) const {
|
||||
std::hash<int64_t> h1;
|
||||
std::hash<std::string> h2;
|
||||
size_t hash_result = 0;
|
||||
boost::hash_combine(hash_result, h1(k.field_id.get()));
|
||||
boost::hash_combine(hash_result, h2(k.nested_path));
|
||||
return hash_result;
|
||||
}
|
||||
};
|
||||
std::unordered_map<JSONIndexKey, index::IndexBasePtr, hash_helper>
|
||||
json_indexings_;
|
||||
std::vector<JsonIndex> json_indices;
|
||||
};
|
||||
|
||||
using SegmentSealedSPtr = std::shared_ptr<SegmentSealed>;
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#include <memory>
|
||||
|
||||
#include "arrow/array/builder_binary.h"
|
||||
#include "arrow/array/builder_nested.h"
|
||||
#include "arrow/scalar.h"
|
||||
#include "arrow/type_fwd.h"
|
||||
#include "fmt/format.h"
|
||||
|
||||
651
internal/core/thirdparty/tantivy/tantivy-binding/Cargo.lock
generated
vendored
651
internal/core/thirdparty/tantivy/tantivy-binding/Cargo.lock
generated
vendored
File diff suppressed because it is too large
Load Diff
@ -12,6 +12,7 @@ enum class TantivyDataType : uint8_t {
|
||||
I64,
|
||||
F64,
|
||||
Bool,
|
||||
JSON,
|
||||
};
|
||||
|
||||
struct RustArray {
|
||||
@ -202,6 +203,72 @@ RustResult tantivy_prefix_query_keyword(void *ptr, const char *prefix, void *bit
|
||||
|
||||
RustResult tantivy_regex_query(void *ptr, const char *pattern, void *bitset);
|
||||
|
||||
RustResult tantivy_json_term_query_i64(void *ptr,
|
||||
const char *json_path,
|
||||
int64_t term,
|
||||
void *bitset);
|
||||
|
||||
RustResult tantivy_json_term_query_f64(void *ptr, const char *json_path, double term, void *bitset);
|
||||
|
||||
RustResult tantivy_json_term_query_bool(void *ptr, const char *json_path, bool term, void *bitset);
|
||||
|
||||
RustResult tantivy_json_term_query_keyword(void *ptr,
|
||||
const char *json_path,
|
||||
const char *term,
|
||||
void *bitset);
|
||||
|
||||
RustResult tantivy_json_exist_query(void *ptr, const char *json_path, void *bitset);
|
||||
|
||||
RustResult tantivy_json_range_query_i64(void *ptr,
|
||||
const char *json_path,
|
||||
int64_t lower_bound,
|
||||
int64_t higher_bound,
|
||||
bool lb_unbounded,
|
||||
bool up_unbounded,
|
||||
bool lb_inclusive,
|
||||
bool ub_inclusive,
|
||||
void *bitset);
|
||||
|
||||
RustResult tantivy_json_range_query_f64(void *ptr,
|
||||
const char *json_path,
|
||||
double lower_bound,
|
||||
double higher_bound,
|
||||
bool lb_unbounded,
|
||||
bool up_unbounded,
|
||||
bool lb_inclusive,
|
||||
bool ub_inclusive,
|
||||
void *bitset);
|
||||
|
||||
RustResult tantivy_json_range_query_bool(void *ptr,
|
||||
const char *json_path,
|
||||
bool lower_bound,
|
||||
bool higher_bound,
|
||||
bool lb_unbounded,
|
||||
bool up_unbounded,
|
||||
bool lb_inclusive,
|
||||
bool ub_inclusive,
|
||||
void *bitset);
|
||||
|
||||
RustResult tantivy_json_range_query_keyword(void *ptr,
|
||||
const char *json_path,
|
||||
const char *lower_bound,
|
||||
const char *higher_bound,
|
||||
bool lb_unbounded,
|
||||
bool up_unbounded,
|
||||
bool lb_inclusive,
|
||||
bool ub_inclusive,
|
||||
void *bitset);
|
||||
|
||||
RustResult tantivy_json_regex_query(void *ptr,
|
||||
const char *json_path,
|
||||
const char *pattern,
|
||||
void *bitset);
|
||||
|
||||
RustResult tantivy_json_prefix_query(void *ptr,
|
||||
const char *json_path,
|
||||
const char *prefix,
|
||||
void *bitset);
|
||||
|
||||
RustResult tantivy_match_query(void *ptr, const char *query, void *bitset);
|
||||
|
||||
RustResult tantivy_phrase_match_query(void *ptr, const char *query, uint32_t slop, void *bitset);
|
||||
@ -303,6 +370,13 @@ RustResult tantivy_index_add_json_key_stats_data_by_batch(void *ptr,
|
||||
const uintptr_t *json_offsets_len,
|
||||
uintptr_t len);
|
||||
|
||||
RustResult tantivy_index_add_json(void *ptr, const char *s, int64_t offset);
|
||||
|
||||
RustResult tantivy_index_add_array_json(void *ptr,
|
||||
const char *const *array,
|
||||
uintptr_t len,
|
||||
int64_t offset);
|
||||
|
||||
RustResult tantivy_index_add_array_int8s(void *ptr,
|
||||
const int8_t *array,
|
||||
uintptr_t len,
|
||||
|
||||
@ -8,4 +8,5 @@ pub enum TantivyDataType {
|
||||
I64,
|
||||
F64,
|
||||
Bool,
|
||||
JSON,
|
||||
}
|
||||
|
||||
@ -2,7 +2,8 @@ use std::ffi::c_void;
|
||||
use std::ops::Bound;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tantivy::query::{Query, RangeQuery, RegexQuery, TermQuery};
|
||||
use tantivy::fastfield::FastValue;
|
||||
use tantivy::query::{ExistsQuery, Query, RangeQuery, RegexQuery, TermQuery};
|
||||
use tantivy::schema::{Field, IndexRecordOption};
|
||||
use tantivy::{Index, IndexReader, ReloadPolicy, Term};
|
||||
|
||||
@ -338,6 +339,146 @@ impl IndexReaderWrapper {
|
||||
let q = RegexQuery::from_pattern(&pattern, self.field)?;
|
||||
self.search(&q, bitset)
|
||||
}
|
||||
|
||||
// JSON related query methods
|
||||
// These methods support querying JSON fields with different data types
|
||||
|
||||
pub fn json_term_query_i64(
|
||||
&self,
|
||||
json_path: &str,
|
||||
term: i64,
|
||||
bitset: *mut c_void,
|
||||
) -> Result<()> {
|
||||
let mut json_term = Term::from_field_json_path(self.field, json_path, false);
|
||||
json_term.append_type_and_fast_value(term);
|
||||
let q = TermQuery::new(json_term, IndexRecordOption::Basic);
|
||||
self.search(&q, bitset)
|
||||
}
|
||||
|
||||
pub fn json_term_query_f64(
|
||||
&self,
|
||||
json_path: &str,
|
||||
term: f64,
|
||||
bitset: *mut c_void,
|
||||
) -> Result<()> {
|
||||
let mut json_term = Term::from_field_json_path(self.field, json_path, false);
|
||||
json_term.append_type_and_fast_value(term);
|
||||
let q = TermQuery::new(json_term, IndexRecordOption::Basic);
|
||||
self.search(&q, bitset)
|
||||
}
|
||||
|
||||
pub fn json_term_query_bool(
|
||||
&self,
|
||||
json_path: &str,
|
||||
term: bool,
|
||||
bitset: *mut c_void,
|
||||
) -> Result<()> {
|
||||
let mut json_term = Term::from_field_json_path(self.field, json_path, false);
|
||||
json_term.append_type_and_fast_value(term);
|
||||
let q = TermQuery::new(json_term, IndexRecordOption::Basic);
|
||||
self.search(&q, bitset)
|
||||
}
|
||||
|
||||
pub fn json_term_query_keyword(
|
||||
&self,
|
||||
json_path: &str,
|
||||
term: &str,
|
||||
bitset: *mut c_void,
|
||||
) -> Result<()> {
|
||||
let mut json_term = Term::from_field_json_path(self.field, json_path, false);
|
||||
json_term.append_type_and_str(term);
|
||||
let q = TermQuery::new(json_term, IndexRecordOption::Basic);
|
||||
self.search(&q, bitset)
|
||||
}
|
||||
|
||||
pub fn json_exist_query(&self, json_path: &str, bitset: *mut c_void) -> Result<()> {
|
||||
let full_json_path = if json_path == "" {
|
||||
self.field_name.clone()
|
||||
} else {
|
||||
format!("{}.{}", self.field_name, json_path)
|
||||
};
|
||||
let q = ExistsQuery::new(full_json_path, true);
|
||||
self.search(&q, bitset)
|
||||
}
|
||||
|
||||
pub fn json_range_query<T: FastValue>(
|
||||
&self,
|
||||
json_path: &str,
|
||||
lower_bound: T,
|
||||
higher_bound: T,
|
||||
lb_unbounded: bool,
|
||||
up_unbounded: bool,
|
||||
lb_inclusive: bool,
|
||||
ub_inclusive: bool,
|
||||
bitset: *mut c_void,
|
||||
) -> Result<()> {
|
||||
let lb = if lb_unbounded {
|
||||
Bound::Unbounded
|
||||
} else {
|
||||
let mut term = Term::from_field_json_path(self.field, json_path, false);
|
||||
term.append_type_and_fast_value::<T>(lower_bound);
|
||||
make_bounds(term, lb_inclusive)
|
||||
};
|
||||
let ub = if up_unbounded {
|
||||
Bound::Unbounded
|
||||
} else {
|
||||
let mut term = Term::from_field_json_path(self.field, json_path, false);
|
||||
term.append_type_and_fast_value::<T>(higher_bound);
|
||||
make_bounds(term, ub_inclusive)
|
||||
};
|
||||
let q = RangeQuery::new(lb, ub);
|
||||
self.search(&q, bitset)
|
||||
}
|
||||
|
||||
pub fn json_range_query_keyword(
|
||||
&self,
|
||||
json_path: &str,
|
||||
lower_bound: &str,
|
||||
higher_bound: &str,
|
||||
lb_unbounded: bool,
|
||||
up_unbounded: bool,
|
||||
lb_inclusive: bool,
|
||||
ub_inclusive: bool,
|
||||
bitset: *mut c_void,
|
||||
) -> Result<()> {
|
||||
let lb = if lb_unbounded {
|
||||
Bound::Unbounded
|
||||
} else {
|
||||
let mut term = Term::from_field_json_path(self.field, json_path, false);
|
||||
term.append_type_and_str(lower_bound);
|
||||
make_bounds(term, lb_inclusive)
|
||||
};
|
||||
let ub = if up_unbounded {
|
||||
Bound::Unbounded
|
||||
} else {
|
||||
let mut term = Term::from_field_json_path(self.field, json_path, false);
|
||||
term.append_type_and_str(higher_bound);
|
||||
make_bounds(term, ub_inclusive)
|
||||
};
|
||||
let q = RangeQuery::new(lb, ub);
|
||||
self.search(&q, bitset)
|
||||
}
|
||||
|
||||
pub fn json_regex_query(
|
||||
&self,
|
||||
json_path: &str,
|
||||
pattern: &str,
|
||||
bitset: *mut c_void,
|
||||
) -> Result<()> {
|
||||
let q = RegexQuery::from_pattern_with_json_path(pattern, self.field, json_path)?;
|
||||
self.search(&q, bitset)
|
||||
}
|
||||
|
||||
pub fn json_prefix_query(
|
||||
&self,
|
||||
json_path: &str,
|
||||
prefix: &str,
|
||||
bitset: *mut c_void,
|
||||
) -> Result<()> {
|
||||
let escaped = regex::escape(prefix);
|
||||
let pattern = format!("{}(.|\n)*", escaped);
|
||||
self.json_regex_query(json_path, &pattern, bitset)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@ -301,3 +301,216 @@ pub extern "C" fn tantivy_regex_query(
|
||||
let pattern = cstr_to_str!(pattern);
|
||||
unsafe { (*real).regex_query(pattern, bitset).into() }
|
||||
}
|
||||
|
||||
// -------------------------json query--------------------
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_term_query_i64(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
term: i64,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
unsafe { (*real).json_term_query_i64(json_path, term, bitset).into() }
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_term_query_f64(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
term: f64,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
unsafe { (*real).json_term_query_f64(json_path, term, bitset).into() }
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_term_query_bool(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
term: bool,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
unsafe { (*real).json_term_query_bool(json_path, term, bitset).into() }
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_term_query_keyword(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
term: *const c_char,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
let term = cstr_to_str!(term);
|
||||
unsafe {
|
||||
(*real)
|
||||
.json_term_query_keyword(json_path, term, bitset)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_exist_query(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
unsafe { (*real).json_exist_query(json_path, bitset).into() }
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_range_query_i64(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
lower_bound: i64,
|
||||
higher_bound: i64,
|
||||
lb_unbounded: bool,
|
||||
up_unbounded: bool,
|
||||
lb_inclusive: bool,
|
||||
ub_inclusive: bool,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
unsafe {
|
||||
(*real)
|
||||
.json_range_query(
|
||||
json_path,
|
||||
lower_bound,
|
||||
higher_bound,
|
||||
lb_unbounded,
|
||||
up_unbounded,
|
||||
lb_inclusive,
|
||||
ub_inclusive,
|
||||
bitset,
|
||||
)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_range_query_f64(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
lower_bound: f64,
|
||||
higher_bound: f64,
|
||||
lb_unbounded: bool,
|
||||
up_unbounded: bool,
|
||||
lb_inclusive: bool,
|
||||
ub_inclusive: bool,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
unsafe {
|
||||
(*real)
|
||||
.json_range_query(
|
||||
json_path,
|
||||
lower_bound,
|
||||
higher_bound,
|
||||
lb_unbounded,
|
||||
up_unbounded,
|
||||
lb_inclusive,
|
||||
ub_inclusive,
|
||||
bitset,
|
||||
)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_range_query_bool(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
lower_bound: bool,
|
||||
higher_bound: bool,
|
||||
lb_unbounded: bool,
|
||||
up_unbounded: bool,
|
||||
lb_inclusive: bool,
|
||||
ub_inclusive: bool,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
unsafe {
|
||||
(*real)
|
||||
.json_range_query(
|
||||
json_path,
|
||||
lower_bound,
|
||||
higher_bound,
|
||||
lb_unbounded,
|
||||
up_unbounded,
|
||||
lb_inclusive,
|
||||
ub_inclusive,
|
||||
bitset,
|
||||
)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_range_query_keyword(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
lower_bound: *const c_char,
|
||||
higher_bound: *const c_char,
|
||||
lb_unbounded: bool,
|
||||
up_unbounded: bool,
|
||||
lb_inclusive: bool,
|
||||
ub_inclusive: bool,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
let lower_bound = cstr_to_str!(lower_bound);
|
||||
let higher_bound = cstr_to_str!(higher_bound);
|
||||
unsafe {
|
||||
(*real)
|
||||
.json_range_query_keyword(
|
||||
json_path,
|
||||
lower_bound,
|
||||
higher_bound,
|
||||
lb_unbounded,
|
||||
up_unbounded,
|
||||
lb_inclusive,
|
||||
ub_inclusive,
|
||||
bitset,
|
||||
)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_regex_query(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
pattern: *const c_char,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
let pattern = cstr_to_str!(pattern);
|
||||
unsafe { (*real).json_regex_query(json_path, pattern, bitset).into() }
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_json_prefix_query(
|
||||
ptr: *mut c_void,
|
||||
json_path: *const c_char,
|
||||
prefix: *const c_char,
|
||||
bitset: *mut c_void,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexReaderWrapper;
|
||||
let json_path = cstr_to_str!(json_path);
|
||||
let prefix = cstr_to_str!(prefix);
|
||||
unsafe { (*real).json_prefix_query(json_path, prefix, bitset).into() }
|
||||
}
|
||||
|
||||
@ -1,6 +1,14 @@
|
||||
use index_writer_v5::TantivyDocumentV5;
|
||||
use index_writer_v7::TantivyDocumentV7;
|
||||
use libc::c_char;
|
||||
use log::info;
|
||||
use tantivy::schema::{
|
||||
Field, IndexRecordOption, OwnedValue, Schema, SchemaBuilder, TextFieldIndexing, TextOptions,
|
||||
FAST, INDEXED, STRING,
|
||||
};
|
||||
use tantivy::{
|
||||
doc, tokenizer, Document, Index, IndexWriter, SingleSegmentIndexWriter, TantivyDocument,
|
||||
};
|
||||
|
||||
use crate::data_type::TantivyDataType;
|
||||
|
||||
@ -103,6 +111,30 @@ impl IndexWriterWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_json(&mut self, data: &str, offset: Option<i64>) -> Result<()> {
|
||||
match self {
|
||||
IndexWriterWrapper::V5(_) => {
|
||||
return Err(TantivyBindingError::InternalError(
|
||||
"add json with tantivy index version 5 is not supported from tantivy with version 7"
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
IndexWriterWrapper::V7(writer) => writer.add_json(data, offset.unwrap() as u32),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_array_json(&mut self, datas: &[*const c_char], offset: Option<i64>) -> Result<()> {
|
||||
match self {
|
||||
IndexWriterWrapper::V5(_) => {
|
||||
return Err(TantivyBindingError::InternalError(
|
||||
"add array json with tantivy index version 5 is not supported from tantivy with version 7"
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
IndexWriterWrapper::V7(writer) => writer.add_array_json(datas, offset.unwrap() as u32),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_array_keywords(
|
||||
&mut self,
|
||||
datas: &[*const c_char],
|
||||
|
||||
@ -445,6 +445,31 @@ pub extern "C" fn tantivy_index_add_json_key_stats_data_by_batch(
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_index_add_json(
|
||||
ptr: *mut c_void,
|
||||
s: *const c_char,
|
||||
offset: i64,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexWriterWrapper;
|
||||
let s = cstr_to_str!(s);
|
||||
unsafe { (*real).add_json(s, Some(offset)).into() }
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn tantivy_index_add_array_json(
|
||||
ptr: *mut c_void,
|
||||
array: *const *const c_char,
|
||||
len: usize,
|
||||
offset: i64,
|
||||
) -> RustResult {
|
||||
let real = ptr as *mut IndexWriterWrapper;
|
||||
unsafe {
|
||||
let arr = convert_to_rust_slice!(array, len);
|
||||
(*real).add_array_json(arr, Some(offset)).into()
|
||||
}
|
||||
}
|
||||
|
||||
// --------------------------------------------- array ------------------------------------------
|
||||
|
||||
#[no_mangle]
|
||||
|
||||
@ -7,8 +7,11 @@ use libc::c_char;
|
||||
use log::info;
|
||||
use tantivy_5::schema::{
|
||||
Field, IndexRecordOption, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, FAST, INDEXED,
|
||||
STRING,
|
||||
};
|
||||
use tantivy_5::{
|
||||
doc, Document as TantivyDocument, Index, IndexWriter, SingleSegmentIndexWriter, UserOperation,
|
||||
};
|
||||
use tantivy_5::{doc, Document as TantivyDocument, Index, IndexWriter, SingleSegmentIndexWriter, UserOperation};
|
||||
|
||||
use crate::data_type::TantivyDataType;
|
||||
|
||||
@ -44,6 +47,7 @@ pub(crate) fn schema_builder_add_field(
|
||||
TantivyDataType::Text => {
|
||||
panic!("text should be indexed with analyzer");
|
||||
}
|
||||
TantivyDataType::JSON => schema_builder.add_json_field(&field_name, STRING | FAST),
|
||||
}
|
||||
}
|
||||
|
||||
@ -81,6 +85,13 @@ impl TantivyValue<TantivyDocument> for bool {
|
||||
}
|
||||
}
|
||||
|
||||
impl TantivyValue<TantivyDocument> for serde_json::Value {
|
||||
#[inline]
|
||||
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
|
||||
document.add_field_value(Field::from_field_id(field), self.clone());
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexWriterWrapperImpl {
|
||||
pub fn new(
|
||||
field_name: &str,
|
||||
@ -174,6 +185,25 @@ impl IndexWriterWrapperImpl {
|
||||
self.add_document(document, offset)
|
||||
}
|
||||
|
||||
pub fn add_json(&mut self, data: &str, offset: Option<i64>) -> Result<()> {
|
||||
let j = serde_json::from_str::<serde_json::Value>(data)?;
|
||||
let mut document = TantivyDocument::default();
|
||||
j.add_to_document(self.field.field_id(), &mut document);
|
||||
|
||||
self.add_document(document, offset)
|
||||
}
|
||||
|
||||
pub fn add_array_json(&mut self, datas: &[*const c_char], offset: Option<i64>) -> Result<()> {
|
||||
let mut document = TantivyDocument::default();
|
||||
for element in datas {
|
||||
let data = unsafe { CStr::from_ptr(*element) };
|
||||
let j = serde_json::from_str::<serde_json::Value>(data.to_str()?)?;
|
||||
j.add_to_document(self.field.field_id(), &mut document);
|
||||
}
|
||||
|
||||
self.add_document(document, offset)
|
||||
}
|
||||
|
||||
pub fn add_array_keywords(
|
||||
&mut self,
|
||||
datas: &[*const c_char],
|
||||
|
||||
@ -7,7 +7,7 @@ use log::info;
|
||||
use tantivy::indexer::UserOperation;
|
||||
use tantivy::schema::{
|
||||
Field, IndexRecordOption, NumericOptions, Schema, SchemaBuilder, TextFieldIndexing,
|
||||
TextOptions, FAST,
|
||||
TextOptions, FAST, STRING,
|
||||
};
|
||||
use tantivy::{doc, Index, IndexWriter, TantivyDocument};
|
||||
|
||||
@ -47,6 +47,7 @@ pub(crate) fn schema_builder_add_field(
|
||||
TantivyDataType::Text => {
|
||||
panic!("text should be indexed with analyzer");
|
||||
}
|
||||
TantivyDataType::JSON => schema_builder.add_json_field(&field_name, STRING | FAST),
|
||||
}
|
||||
}
|
||||
|
||||
@ -84,6 +85,13 @@ impl TantivyValue<TantivyDocument> for bool {
|
||||
}
|
||||
}
|
||||
|
||||
impl TantivyValue<TantivyDocument> for serde_json::Value {
|
||||
#[inline]
|
||||
fn add_to_document(&self, field: u32, document: &mut TantivyDocument) {
|
||||
document.add_field_value(Field::from_field_id(field), self);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IndexWriterWrapperImpl {
|
||||
pub(crate) field: Field,
|
||||
pub(crate) index_writer: IndexWriter,
|
||||
@ -174,6 +182,25 @@ impl IndexWriterWrapperImpl {
|
||||
self.add_document(document, offset)
|
||||
}
|
||||
|
||||
pub fn add_json(&mut self, data: &str, offset: u32) -> Result<()> {
|
||||
let j = serde_json::from_str::<serde_json::Value>(data)?;
|
||||
let mut document = TantivyDocument::default();
|
||||
j.add_to_document(self.field.field_id(), &mut document);
|
||||
|
||||
self.add_document(document, offset)
|
||||
}
|
||||
|
||||
pub fn add_array_json(&mut self, datas: &[*const c_char], offset: u32) -> Result<()> {
|
||||
let mut document = TantivyDocument::default();
|
||||
for element in datas {
|
||||
let data = unsafe { CStr::from_ptr(*element) };
|
||||
let j = serde_json::from_str::<serde_json::Value>(data.to_str()?)?;
|
||||
j.add_to_document(self.field.field_id(), &mut document);
|
||||
}
|
||||
|
||||
self.add_document(document, offset)
|
||||
}
|
||||
|
||||
pub fn add_json_key_stats(
|
||||
&mut self,
|
||||
keys: &[*const c_char],
|
||||
|
||||
197
internal/core/thirdparty/tantivy/tantivy-wrapper.h
vendored
197
internal/core/thirdparty/tantivy/tantivy-wrapper.h
vendored
@ -8,6 +8,7 @@
|
||||
#include <stdexcept>
#include <type_traits>

#include "common/EasyAssert.h"
#include "common/Json.h"
#include "tantivy-binding.h"
#include "rust-binding.h"
#include "rust-array.h"
|
||||
@ -85,10 +86,10 @@ struct TantivyIndexWrapper {
|
||||
const char* path,
|
||||
uint32_t tantivy_index_version,
|
||||
bool inverted_single_semgnent = false,
|
||||
bool enable_user_specified_doc_id = true,
|
||||
uintptr_t num_threads = DEFAULT_NUM_THREADS,
|
||||
uintptr_t overall_memory_budget_in_bytes =
|
||||
DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES,
|
||||
bool enable_user_specified_doc_id = true) {
|
||||
DEFAULT_OVERALL_MEMORY_BUDGET_IN_BYTES) {
|
||||
RustResultWrapper res;
|
||||
if (inverted_single_semgnent) {
|
||||
AssertInfo(tantivy_index_version == 5,
|
||||
@ -309,6 +310,34 @@ struct TantivyIndexWrapper {
|
||||
res.result_->error);
|
||||
}
|
||||
|
||||
void
|
||||
add_json_data(const Json* array, uintptr_t len, int64_t offset_begin) {
|
||||
assert(!finished_);
|
||||
for (uintptr_t i = 0; i < len; i++) {
|
||||
auto res = RustResultWrapper(tantivy_index_add_json(
|
||||
writer_, array[i].data().data(), offset_begin + i));
|
||||
AssertInfo(res.result_->success,
|
||||
"failed to add json: {}",
|
||||
res.result_->error);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
add_json_array_data(const Json* array,
|
||||
uintptr_t len,
|
||||
int64_t offset_begin) {
|
||||
assert(!finished_);
|
||||
std::vector<const char*> views;
|
||||
for (uintptr_t i = 0; i < len; i++) {
|
||||
views.push_back(array[i].c_str());
|
||||
}
|
||||
auto res = RustResultWrapper(tantivy_index_add_array_json(
|
||||
writer_, views.data(), len, offset_begin));
|
||||
AssertInfo(res.result_->success,
|
||||
"failed to add multi json: {}",
|
||||
res.result_->error);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void
|
||||
add_array_data(const T* array, uintptr_t len, int64_t offset) {
|
||||
@ -879,6 +908,170 @@ struct TantivyIndexWrapper {
|
||||
"TantivyIndexWrapper.phrase_match_query: invalid result type");
|
||||
}
|
||||
|
||||
// json query
|
||||
template <typename T>
|
||||
void
|
||||
json_term_query(const std::string& json_path, T term, void* bitset) {
|
||||
auto array = [&]() {
|
||||
if constexpr (std::is_same_v<T, bool>) {
|
||||
return tantivy_json_term_query_bool(
|
||||
reader_, json_path.c_str(), term, bitset);
|
||||
}
|
||||
|
||||
if constexpr (std::is_integral_v<T>) {
|
||||
auto res = tantivy_json_term_query_i64(
|
||||
reader_, json_path.c_str(), term, bitset);
|
||||
AssertInfo(res.success,
|
||||
"TantivyIndexWrapper.json_term_query: {}",
|
||||
res.error);
|
||||
return tantivy_json_term_query_f64(
|
||||
reader_, json_path.c_str(), term, bitset);
|
||||
}
|
||||
|
||||
if constexpr (std::is_floating_point_v<T>) {
|
||||
// if term can be cast to int64 without precision loss, use int64 query first
|
||||
if (std::floor(term) == term) {
|
||||
auto res = tantivy_json_term_query_i64(
|
||||
reader_, json_path.c_str(), term, bitset);
|
||||
AssertInfo(res.success,
|
||||
"TantivyIndexWrapper.json_term_query: {}",
|
||||
res.error);
|
||||
}
|
||||
return tantivy_json_term_query_f64(
|
||||
reader_, json_path.c_str(), term, bitset);
|
||||
}
|
||||
|
||||
if constexpr (std::is_same_v<T, std::string>) {
|
||||
return tantivy_json_term_query_keyword(
|
||||
reader_, json_path.c_str(), term.c_str(), bitset);
|
||||
}
|
||||
|
||||
throw fmt::format(
|
||||
"InvertedIndex.json_term_query: unsupported data type: {}",
|
||||
typeid(T).name());
|
||||
return RustResult();
|
||||
}();
|
||||
auto res = RustResultWrapper(array);
|
||||
AssertInfo(res.result_->success,
|
||||
"TantivyIndexWrapper.json_term_query: {}",
|
||||
res.result_->error);
|
||||
AssertInfo(res.result_->value.tag == Value::Tag::None,
|
||||
"TantivyIndexWrapper.json_term_query: invalid result type");
|
||||
}
|
||||
|
||||
void
|
||||
json_exist_query(const std::string& json_path, void* bitset) {
|
||||
auto array =
|
||||
tantivy_json_exist_query(reader_, json_path.c_str(), bitset);
|
||||
auto res = RustResultWrapper(array);
|
||||
AssertInfo(res.result_->success,
|
||||
"TantivyIndexWrapper.json_exist_query: {}",
|
||||
res.result_->error);
|
||||
AssertInfo(res.result_->value.tag == Value::Tag::None,
|
||||
"TantivyIndexWrapper.json_exist_query: invalid result type");
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void
|
||||
json_range_query(const std::string& json_path,
|
||||
T lower_bound,
|
||||
T upper_bound,
|
||||
bool lb_unbounded,
|
||||
bool ub_unbounded,
|
||||
bool lb_inclusive,
|
||||
bool ub_inclusive,
|
||||
void* bitset) {
|
||||
auto array = [&]() {
|
||||
if constexpr (std::is_same_v<T, bool>) {
|
||||
return tantivy_json_range_query_bool(reader_,
|
||||
json_path.c_str(),
|
||||
lower_bound,
|
||||
upper_bound,
|
||||
lb_unbounded,
|
||||
ub_unbounded,
|
||||
lb_inclusive,
|
||||
ub_inclusive,
|
||||
bitset);
|
||||
}
|
||||
|
||||
if constexpr (std::is_integral_v<T>) {
|
||||
return tantivy_json_range_query_i64(reader_,
|
||||
json_path.c_str(),
|
||||
lower_bound,
|
||||
upper_bound,
|
||||
lb_unbounded,
|
||||
ub_unbounded,
|
||||
lb_inclusive,
|
||||
ub_inclusive,
|
||||
bitset);
|
||||
}
|
||||
|
||||
if constexpr (std::is_floating_point_v<T>) {
|
||||
return tantivy_json_range_query_f64(reader_,
|
||||
json_path.c_str(),
|
||||
lower_bound,
|
||||
upper_bound,
|
||||
lb_unbounded,
|
||||
ub_unbounded,
|
||||
lb_inclusive,
|
||||
ub_inclusive,
|
||||
bitset);
|
||||
}
|
||||
|
||||
if constexpr (std::is_same_v<T, std::string>) {
|
||||
return tantivy_json_range_query_keyword(reader_,
|
||||
json_path.c_str(),
|
||||
lower_bound.c_str(),
|
||||
upper_bound.c_str(),
|
||||
lb_unbounded,
|
||||
ub_unbounded,
|
||||
lb_inclusive,
|
||||
ub_inclusive,
|
||||
bitset);
|
||||
}
|
||||
|
||||
throw fmt::format(
|
||||
"InvertedIndex.json_range_query: unsupported data type: {}",
|
||||
typeid(T).name());
|
||||
return RustResult();
|
||||
}();
|
||||
auto res = RustResultWrapper(array);
|
||||
AssertInfo(res.result_->success,
|
||||
"TantivyIndexWrapper.json_range_query: {}",
|
||||
res.result_->error);
|
||||
AssertInfo(res.result_->value.tag == Value::Tag::None,
|
||||
"TantivyIndexWrapper.json_range_query: invalid result type");
|
||||
}
|
||||
|
||||
void
|
||||
json_regex_query(const std::string& json_path,
|
||||
const std::string& pattern,
|
||||
void* bitset) {
|
||||
auto array = tantivy_json_regex_query(
|
||||
reader_, json_path.c_str(), pattern.c_str(), bitset);
|
||||
auto res = RustResultWrapper(array);
|
||||
AssertInfo(res.result_->success,
|
||||
"TantivyIndexWrapper.json_regex_query: {}",
|
||||
res.result_->error);
|
||||
AssertInfo(res.result_->value.tag == Value::Tag::None,
|
||||
"TantivyIndexWrapper.json_regex_query: invalid result type");
|
||||
}
|
||||
|
||||
void
|
||||
json_prefix_query(const std::string& json_path,
|
||||
const std::string& prefix,
|
||||
void* bitset) {
|
||||
auto array = tantivy_json_prefix_query(
|
||||
reader_, json_path.c_str(), prefix.c_str(), bitset);
|
||||
auto res = RustResultWrapper(array);
|
||||
AssertInfo(res.result_->success,
|
||||
"TantivyIndexWrapper.json_prefix_query: {}",
|
||||
res.result_->error);
|
||||
AssertInfo(
|
||||
res.result_->value.tag == Value::Tag::None,
|
||||
"TantivyIndexWrapper.json_prefix_query: invalid result type");
|
||||
}
|
||||
|
||||
public:
|
||||
inline IndexWriter
|
||||
get_writer() {
|
||||
|
||||
@ -104,6 +104,7 @@ set(MILVUS_TEST_FILES
|
||||
test_group_chunk_translator.cpp
|
||||
test_chunked_segment_storage_v2.cpp
|
||||
test_thread_pool.cpp
|
||||
test_json_flat_index.cpp
|
||||
)
|
||||
|
||||
if ( INDEX_ENGINE STREQUAL "cardinal" )
|
||||
|
||||
685
internal/core/unittest/test_json_flat_index.cpp
Normal file
685
internal/core/unittest/test_json_flat_index.cpp
Normal file
@ -0,0 +1,685 @@
|
||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <functional>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <memory>
|
||||
#include <unordered_set>
|
||||
|
||||
#include "common/Consts.h"
|
||||
#include "common/Tracer.h"
|
||||
#include "expr/ITypeExpr.h"
|
||||
#include "index/JsonFlatIndex.h"
|
||||
#include "pb/plan.pb.h"
|
||||
#include "plan/PlanNode.h"
|
||||
#include "query/ExecPlanNodeVisitor.h"
|
||||
#include "segcore/ChunkedSegmentSealedImpl.h"
|
||||
#include "segcore/SegmentSealed.h"
|
||||
#include "storage/RemoteChunkManagerSingleton.h"
|
||||
#include "storage/Util.h"
|
||||
#include "storage/InsertData.h"
|
||||
#include "indexbuilder/IndexFactory.h"
|
||||
#include "index/IndexFactory.h"
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
#include "index/Meta.h"
|
||||
#include "index/Index.h"
|
||||
#include "common/Json.h"
|
||||
#include "simdjson/padded_string.h"
|
||||
#include "common/FieldData.h"
|
||||
#include "test_utils/storage_test_utils.h"
|
||||
|
||||
using namespace milvus;
|
||||
|
||||
namespace milvus::test {
|
||||
auto
|
||||
generate_field_meta(int64_t collection_id = 1,
|
||||
int64_t partition_id = 2,
|
||||
int64_t segment_id = 3,
|
||||
int64_t field_id = 101,
|
||||
DataType data_type = DataType::NONE,
|
||||
DataType element_type = DataType::NONE,
|
||||
bool nullable = false) -> storage::FieldDataMeta {
|
||||
auto meta = storage::FieldDataMeta{
|
||||
.collection_id = collection_id,
|
||||
.partition_id = partition_id,
|
||||
.segment_id = segment_id,
|
||||
.field_id = field_id,
|
||||
};
|
||||
meta.field_schema.set_data_type(
|
||||
static_cast<proto::schema::DataType>(data_type));
|
||||
meta.field_schema.set_element_type(
|
||||
static_cast<proto::schema::DataType>(element_type));
|
||||
meta.field_schema.set_nullable(nullable);
|
||||
return meta;
|
||||
}
|
||||
|
||||
auto
|
||||
generate_index_meta(int64_t segment_id = 3,
|
||||
int64_t field_id = 101,
|
||||
int64_t index_build_id = 1000,
|
||||
int64_t index_version = 10000) -> storage::IndexMeta {
|
||||
return storage::IndexMeta{
|
||||
.segment_id = segment_id,
|
||||
.field_id = field_id,
|
||||
.build_id = index_build_id,
|
||||
.index_version = index_version,
|
||||
};
|
||||
}
|
||||
|
||||
auto
|
||||
generate_local_storage_config(const std::string& root_path)
|
||||
-> storage::StorageConfig {
|
||||
auto ret = storage::StorageConfig{};
|
||||
ret.storage_type = "local";
|
||||
ret.root_path = root_path;
|
||||
return ret;
|
||||
}
|
||||
|
||||
// RAII helper around a ChunkManager: records every path written through it
// and removes those files (and the whole root directory) on destruction,
// so each test leaves the local storage tree clean.
struct ChunkManagerWrapper {
    ChunkManagerWrapper(storage::ChunkManagerPtr cm) : cm_(cm) {
    }

    ~ChunkManagerWrapper() {
        // Remove individually-written files first, then the root tree.
        for (const auto& file : written_) {
            cm_->Remove(file);
        }

        boost::filesystem::remove_all(cm_->GetRootPath());
    }

    // Write `len` bytes to `filepath` and remember the path for cleanup.
    void
    Write(const std::string& filepath, void* buf, uint64_t len) {
        written_.insert(filepath);
        cm_->Write(filepath, buf, len);
    }

    const storage::ChunkManagerPtr cm_;
    std::unordered_set<std::string> written_;  // paths pending cleanup
};
|
||||
|
||||
// Fixture for JSON flat index tests. SetUp performs the full round trip in
// order: serialize three JSON rows into a binlog, write it via a local
// chunk manager, build a JsonFlatIndex from that binlog, upload it, then
// reload the uploaded files into `json_index_` for the queries in each test.
class JsonFlatIndexTest : public ::testing::Test {
 protected:
    void
    SetUp() override {
        int64_t collection_id = 1;
        int64_t partition_id = 2;
        int64_t segment_id = 3;
        int64_t field_id = 101;
        int64_t index_build_id = 4000;
        int64_t index_version = 4000;

        field_meta_ = test::generate_field_meta(
            collection_id, partition_id, segment_id, field_id, DataType::JSON);
        index_meta_ = test::generate_index_meta(
            segment_id, field_id, index_build_id, index_version);

        std::string root_path = "/tmp/test-json-flat-index/";
        auto storage_config = test::generate_local_storage_config(root_path);
        cm_ = storage::CreateChunkManager(storage_config);

        // Three employee profiles exercising nested objects, nulls, missing
        // keys, booleans, integers and arrays (skills/scores).
        json_data_ = {
            R"({"profile": {"name": {"first": "Alice", "last": "Smith", "preferred_name": "Al"}, "team": {"name": "Engineering", "supervisor": {"name": "Bob"}}, "is_active": true, "employee_id": 1001, "skills": ["cpp", "rust", "python"], "scores": [95, 88, 92]}})",
            R"({"profile": {"name": {"first": "Bob", "last": "Johnson", "preferred_name": null}, "team": {"name": "Product", "supervisor": {"name": "Charlie"}}, "is_active": false, "employee_id": 1002, "skills": ["java", "python"], "scores": [85, 90]}})",
            R"({"profile": {"name": {"first": "Charlie", "last": "Williams"}, "team": {"name": "Design", "supervisor": {"name": "Alice"}}, "is_active": true, "employee_id": 1003, "skills": ["python", "javascript"], "scores": [87, 91, 89]}})"};

        // Create field data with JSON values
        auto field_data = storage::CreateFieldData(DataType::JSON);
        std::vector<Json> json_vec;
        for (const auto& json_str : json_data_) {
            json_vec.push_back(Json(simdjson::padded_string(json_str)));
        }
        field_data->FillFieldData(json_vec.data(), json_vec.size());

        auto payload_reader =
            std::make_shared<milvus::storage::PayloadReader>(field_data);
        storage::InsertData insert_data(payload_reader);
        insert_data.SetFieldDataMeta(field_meta_);
        insert_data.SetTimestamps(0, 100);

        auto serialized_bytes = insert_data.Serialize(storage::Remote);

        // Binlog path convention: collection/partition/segment/field/log_id.
        auto get_binlog_path = [=](int64_t log_id) {
            return fmt::format("{}/{}/{}/{}/{}",
                               collection_id,
                               partition_id,
                               segment_id,
                               field_id,
                               log_id);
        };

        log_path_ = get_binlog_path(0);

        cm_w_ = std::make_unique<test::ChunkManagerWrapper>(cm_);
        cm_w_->Write(
            log_path_, serialized_bytes.data(), serialized_bytes.size());

        ctx_ = std::make_unique<storage::FileManagerContext>(
            field_meta_, index_meta_, cm_);

        // Build index
        Config config;
        config["index_type"] = milvus::index::INVERTED_INDEX_TYPE;
        config["insert_files"] = std::vector<std::string>{log_path_};
        {
            // Scoped so the build-side index is destroyed before loading.
            auto index = std::make_shared<index::JsonFlatIndex>(*ctx_, "");
            index->Build(config);

            auto create_index_result = index->Upload();
            auto memSize = create_index_result->GetMemSize();
            auto serializedSize = create_index_result->GetSerializedSize();
            ASSERT_GT(memSize, 0);
            ASSERT_GT(serializedSize, 0);
            index_files_ = create_index_result->GetIndexFiles();
        }

        // Load index
        index::CreateIndexInfo index_info{};
        index_info.index_type = milvus::index::INVERTED_INDEX_TYPE;
        index_info.field_type = DataType::JSON;

        Config load_config;
        load_config["index_files"] = index_files_;

        ctx_->set_for_loading_index(true);
        json_index_ = std::make_shared<index::JsonFlatIndex>(*ctx_, "");
        json_index_->Load(milvus::tracer::TraceContext{}, load_config);

        // Sanity check: one indexed row per input JSON document.
        auto cnt = json_index_->Count();
        ASSERT_EQ(cnt, json_data_.size());
    }

    void
    TearDown() override {
        // Destroy the wrapper first so its dtor can clean files, then wipe
        // the whole test root.
        cm_w_.reset();
        boost::filesystem::remove_all("/tmp/test-json-flat-index/");
    }

    storage::FieldDataMeta field_meta_;
    storage::IndexMeta index_meta_;
    storage::ChunkManagerPtr cm_;
    std::unique_ptr<test::ChunkManagerWrapper> cm_w_;
    std::unique_ptr<storage::FileManagerContext> ctx_;
    std::string log_path_;                 // binlog path for the JSON field
    std::vector<std::string> json_data_;   // raw input documents
    std::vector<std::string> index_files_; // files produced by Upload()
    std::shared_ptr<milvus::index::IndexBase> json_index_;  // loaded index
};
|
||||
|
||||
// IN on a nested string path matches rows whose value is in the given set.
TEST_F(JsonFlatIndexTest, TestInQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/name/first";
    auto executor = json_flat_index->create_executor<std::string>(json_path);

    std::vector<std::string> names = {"Alice", "Bob"};
    auto result = executor->In(names.size(), names.data());
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // Alice
    ASSERT_TRUE(result[1]);   // Bob
    ASSERT_FALSE(result[2]);  // Charlie
}
|
||||
|
||||
// IsNull treats both an explicit JSON null and a missing key as null.
TEST_F(JsonFlatIndexTest, TestIsNullQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/name/preferred_name";
    auto executor = json_flat_index->create_executor<std::string>(json_path);
    auto result = executor->IsNull();
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_FALSE(result[0]);  // Al
    ASSERT_TRUE(result[1]);   // null
    ASSERT_TRUE(result[2]);   // not exist
}
|
||||
|
||||
// IsNotNull is the complement of IsNull: only rows with a real value match.
TEST_F(JsonFlatIndexTest, TestIsNotNullQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/name/preferred_name";
    auto executor = json_flat_index->create_executor<std::string>(json_path);
    auto result = executor->IsNotNull();
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // Al
    ASSERT_FALSE(result[1]);  // null
    ASSERT_FALSE(result[2]);  // not exist
}
|
||||
|
||||
// NOT IN on a nested string path excludes rows whose value is in the set.
TEST_F(JsonFlatIndexTest, TestNotInQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/team/name";
    auto executor = json_flat_index->create_executor<std::string>(json_path);
    std::vector<std::string> teams = {"Engineering", "Product"};
    auto result = executor->NotIn(teams.size(), teams.data());
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_FALSE(result[0]);  // Engineering
    ASSERT_FALSE(result[1]);  // Product
    ASSERT_TRUE(result[2]);   // Design
}
|
||||
|
||||
// String range queries: single-bound LessThan and a two-sided inclusive
// range, both using lexicographic order.
TEST_F(JsonFlatIndexTest, TestRangeQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/name/first";
    auto executor = json_flat_index->create_executor<std::string>(json_path);

    // Test LessThan
    auto result = executor->Range(std::string("Charlie"), OpType::LessThan);
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // Alice < Charlie
    ASSERT_TRUE(result[1]);   // Bob < Charlie
    ASSERT_FALSE(result[2]);  // Charlie = Charlie

    // Test Range between bounds
    auto range_result = executor->Range(std::string("Alice"),
                                        true,  // lower bound inclusive
                                        std::string("Bob"),
                                        true);  // upper bound inclusive
    ASSERT_EQ(range_result.size(), json_data_.size());
    ASSERT_TRUE(range_result[0]);   // Alice in [Alice, Bob]
    ASSERT_TRUE(range_result[1]);   // Bob in [Alice, Bob]
    ASSERT_FALSE(range_result[2]);  // Charlie not in [Alice, Bob]
}
|
||||
|
||||
// PrefixMatch matches string values that start with the given prefix.
TEST_F(JsonFlatIndexTest, TestPrefixMatchQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/name/first";
    auto executor = json_flat_index->create_executor<std::string>(json_path);
    auto result = executor->PrefixMatch("A");
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // Alice starts with A
    ASSERT_FALSE(result[1]);  // Bob doesn't start with A
    ASSERT_FALSE(result[2]);  // Charlie doesn't start with A
}
|
||||
|
||||
// RegexQuery matches string values against full regular expressions.
TEST_F(JsonFlatIndexTest, TestRegexQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/name/first";
    auto executor = json_flat_index->create_executor<std::string>(json_path);
    auto result = executor->RegexQuery("[AB].*ice");
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // Alice matches [AB].*ice
    ASSERT_FALSE(result[1]);  // Bob doesn't match [AB].*ice
    ASSERT_FALSE(result[2]);  // Charlie doesn't match [AB].*ice

    // Test another regex pattern
    auto result2 = executor->RegexQuery("B.b");
    ASSERT_EQ(result2.size(), json_data_.size());
    ASSERT_FALSE(result2[0]);  // Alice doesn't match B.b
    ASSERT_TRUE(result2[1]);   // Bob matches B.b
    ASSERT_FALSE(result2[2]);  // Charlie doesn't match B.b
}
|
||||
|
||||
// PatternMatch uses SQL LIKE-style wildcards ('%' matches any run).
TEST_F(JsonFlatIndexTest, TestPatternMatchQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/name/first";
    auto executor = json_flat_index->create_executor<std::string>(json_path);
    auto result = executor->PatternMatch("A%e", proto::plan::Match);
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // Alice matches A%e
    ASSERT_FALSE(result[1]);  // Bob doesn't match A%e
    ASSERT_FALSE(result[2]);  // Charlie doesn't match A%e
}
|
||||
|
||||
// IN over a boolean-typed JSON path.
TEST_F(JsonFlatIndexTest, TestBooleanInQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/is_active";
    auto executor = json_flat_index->create_executor<bool>(json_path);
    bool values[] = {true};
    auto result = executor->In(1, values);
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // Alice is active
    ASSERT_FALSE(result[1]);  // Bob is not active
    ASSERT_TRUE(result[2]);   // Charlie is active
}
|
||||
|
||||
// NOT IN over a boolean-typed JSON path.
TEST_F(JsonFlatIndexTest, TestBooleanNotInQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/is_active";
    auto executor = json_flat_index->create_executor<bool>(json_path);
    bool values[] = {false};
    auto result = executor->NotIn(1, values);
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // Alice is not in [false]
    ASSERT_FALSE(result[1]);  // Bob is in [false]
    ASSERT_TRUE(result[2]);   // Charlie is not in [false]
}
|
||||
|
||||
// IN over an integer-typed JSON path.
TEST_F(JsonFlatIndexTest, TestInt64InQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/employee_id";
    auto executor = json_flat_index->create_executor<int64_t>(json_path);
    int64_t values[] = {1001, 1002};
    auto result = executor->In(2, values);
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // Alice's id is 1001
    ASSERT_TRUE(result[1]);   // Bob's id is 1002
    ASSERT_FALSE(result[2]);  // Charlie's id is 1003
}
|
||||
|
||||
// NOT IN over an integer-typed JSON path.
TEST_F(JsonFlatIndexTest, TestInt64NotInQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/employee_id";
    auto executor = json_flat_index->create_executor<int64_t>(json_path);
    int64_t values[] = {1003};
    auto result = executor->NotIn(1, values);
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // Alice's id is not 1003
    ASSERT_TRUE(result[1]);   // Bob's id is not 1003
    ASSERT_FALSE(result[2]);  // Charlie's id is 1003
}
|
||||
|
||||
// Integer range queries: LessThan, a two-sided inclusive range, and
// GreaterEqual.
TEST_F(JsonFlatIndexTest, TestInt64RangeQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/employee_id";
    auto executor = json_flat_index->create_executor<int64_t>(json_path);

    // Test LessThan
    auto result = executor->Range(int64_t(1002), OpType::LessThan);
    ASSERT_EQ(result.size(), json_data_.size());
    ASSERT_TRUE(result[0]);   // 1001 < 1002
    ASSERT_FALSE(result[1]);  // 1002 = 1002
    ASSERT_FALSE(result[2]);  // 1003 > 1002

    // Test Range between bounds
    auto range_result = executor->Range(int64_t(1001),  // lower bound
                                        true,           // lower bound inclusive
                                        int64_t(1002),  // upper bound
                                        true);          // upper bound inclusive
    ASSERT_EQ(range_result.size(), json_data_.size());
    ASSERT_TRUE(range_result[0]);   // 1001 in [1001, 1002]
    ASSERT_TRUE(range_result[1]);   // 1002 in [1001, 1002]
    ASSERT_FALSE(range_result[2]);  // 1003 not in [1001, 1002]

    // Test GreaterEqual
    auto ge_result = executor->Range(int64_t(1002), OpType::GreaterEqual);
    ASSERT_EQ(ge_result.size(), json_data_.size());
    ASSERT_FALSE(ge_result[0]);  // 1001 < 1002
    ASSERT_TRUE(ge_result[1]);   // 1002 >= 1002
    ASSERT_TRUE(ge_result[2]);   // 1003 >= 1002
}
|
||||
|
||||
// IN against a string-array path: a row matches if ANY array element
// equals the queried value.
TEST_F(JsonFlatIndexTest, TestArrayStringInQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/skills";
    auto executor = json_flat_index->create_executor<std::string>(json_path);
    std::string values[] = {"cpp", "python"};

    // Test for cpp
    auto result_cpp = executor->In(1, &values[0]);
    ASSERT_EQ(result_cpp.size(), json_data_.size());
    ASSERT_TRUE(result_cpp[0]);   // Alice has cpp
    ASSERT_FALSE(result_cpp[1]);  // Bob doesn't have cpp
    ASSERT_FALSE(result_cpp[2]);  // Charlie doesn't have cpp

    // Test for python
    auto result_python = executor->In(1, &values[1]);
    ASSERT_EQ(result_python.size(), json_data_.size());
    ASSERT_TRUE(result_python[0]);  // Alice has python
    ASSERT_TRUE(result_python[1]);  // Bob has python
    ASSERT_TRUE(result_python[2]);  // Charlie has python
}
|
||||
|
||||
// IN against a numeric-array path: a row matches if ANY array element
// equals the queried value.
TEST_F(JsonFlatIndexTest, TestArrayNumberInQuery) {
    auto json_flat_index =
        dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(json_flat_index, nullptr);

    std::string json_path = "/profile/scores";
    auto executor = json_flat_index->create_executor<int64_t>(json_path);
    int64_t values[] = {95, 90};

    // Test for score 95
    auto result_95 = executor->In(1, &values[0]);
    ASSERT_EQ(result_95.size(), json_data_.size());
    ASSERT_TRUE(result_95[0]);   // Alice has score 95
    ASSERT_FALSE(result_95[1]);  // Bob doesn't have score 95
    ASSERT_FALSE(result_95[2]);  // Charlie doesn't have score 95

    // Test for score 90
    auto result_90 = executor->In(1, &values[1]);
    ASSERT_EQ(result_90.size(), json_data_.size());
    ASSERT_FALSE(result_90[0]);  // Alice doesn't have score 90
    ASSERT_TRUE(result_90[1]);   // Bob has score 90
    ASSERT_FALSE(result_90[2]);  // Charlie doesn't have score 90
}
|
||||
|
||||
// Range queries over an array-valued JSON path: a row matches when any
// element of /profile/scores falls inside the requested range.
TEST_F(JsonFlatIndexTest, TestArrayNumberRangeQuery) {
    auto* flat_index = dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(flat_index, nullptr);

    const std::string scores_path = "/profile/scores";
    auto executor = flat_index->create_executor<int64_t>(scores_path);

    // Strict lower bound: rows having at least one score > 90.
    auto gt_hits = executor->Range(int64_t(90), OpType::GreaterThan);
    ASSERT_EQ(gt_hits.size(), json_data_.size());
    ASSERT_TRUE(gt_hits[0]);   // Alice has scores > 90 (92, 95)
    ASSERT_FALSE(gt_hits[1]);  // Bob doesn't have scores > 90
    ASSERT_TRUE(gt_hits[2]);   // Charlie has score 91 > 90

    // Closed interval [90, 92]: every document has one score inside it.
    auto between_hits = executor->Range(int64_t(90),  // lower bound
                                        true,         // lower bound inclusive
                                        int64_t(92),  // upper bound
                                        true);        // upper bound inclusive
    ASSERT_EQ(between_hits.size(), json_data_.size());
    ASSERT_TRUE(between_hits[0]);  // Alice has score 92
    ASSERT_TRUE(between_hits[1]);  // Bob has score 90
    ASSERT_TRUE(between_hits[2]);  // Charlie has score 91
}
|
||||
|
||||
// InApplyFilter with an always-true filter must behave like a plain In query
// over /profile/name/first.
TEST_F(JsonFlatIndexTest, TestInApply) {
    auto* flat_index = dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(flat_index, nullptr);

    const std::string first_name_path = "/profile/name/first";
    auto executor = flat_index->create_executor<std::string>(first_name_path);

    std::string targets[] = {"Alice", "Bob"};
    auto bitmap = executor->InApplyFilter(
        2, targets, [](size_t /*offset*/) { return true; });
    ASSERT_EQ(bitmap.size(), json_data_.size());
    ASSERT_TRUE(bitmap[0]);   // Alice is in the term list
    ASSERT_TRUE(bitmap[1]);   // Bob is in the term list
    ASSERT_FALSE(bitmap[2]);  // Charlie is not
}
|
||||
|
||||
// InApplyCallback must invoke the callback for matching rows only.
// The original test asserted solely inside the callback, so it would pass
// vacuously if the callback were never invoked; we now also verify that
// both matching offsets (0 = Alice, 1 = Bob) were actually reported.
TEST_F(JsonFlatIndexTest, TestInApplyCallback) {
    auto* flat_index = dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(flat_index, nullptr);

    std::string json_path = "/profile/name/first";
    auto executor = flat_index->create_executor<std::string>(json_path);
    std::string values[] = {"Alice", "Bob"};

    size_t hits_offset0 = 0;  // times the callback saw offset 0
    size_t hits_offset1 = 0;  // times the callback saw offset 1
    executor->InApplyCallback(2, values, [&](size_t offset) {
        // Only Alice (0) and Bob (1) match; Charlie (2) must never appear.
        ASSERT_TRUE(offset == 0 || offset == 1);
        if (offset == 0) {
            ++hits_offset0;
        } else {
            ++hits_offset1;
        }
    });
    // Both matching rows must have been reported at least once.
    ASSERT_GE(hits_offset0, 1);
    ASSERT_GE(hits_offset1, 1);
}
|
||||
|
||||
// Exercises the generic Query(dataset) entry point with a GreaterThan
// operator on /profile/employee_id.
TEST_F(JsonFlatIndexTest, TestQuery) {
    auto* flat_index = dynamic_cast<index::JsonFlatIndex*>(json_index_.get());
    ASSERT_NE(flat_index, nullptr);

    const std::string id_path = "/profile/employee_id";
    auto executor = flat_index->create_executor<int64_t>(id_path);

    // Build the query dataset: employee_id > 1001.
    auto ds = std::make_unique<Dataset>();
    ds->Set(milvus::index::OPERATOR_TYPE, proto::plan::OpType::GreaterThan);
    ds->Set<int64_t>(milvus::index::RANGE_VALUE, 1001);

    auto bitmap = executor->Query(std::move(ds));
    ASSERT_EQ(bitmap.size(), json_data_.size());
    ASSERT_FALSE(bitmap[0]);  // Alice: 1001 is not > 1001
    ASSERT_TRUE(bitmap[1]);   // Bob
    ASSERT_TRUE(bitmap[2]);   // Charlie
}
|
||||
|
||||
// Fixture for expression-level tests of the JSON flat index: builds a flat
// index (cast type "JSON", empty path = whole document) over a nullable JSON
// field, then loads both the index and the raw field data into a sealed
// segment so real filter plans can be executed against it.
class JsonFlatIndexExprTest : public ::testing::Test {
 protected:
    void
    SetUp() override {
        // Deliberately heterogeneous rows: objects, root-level scalars,
        // arrays, empty containers and nulls, to probe flat-index corner
        // cases. Row order matters — tests assert on fixed offsets.
        json_data_ = {
            R"({"a": 1.0})",
            R"({"a": "abc"})",
            R"({"a": 3.0})",
            R"({"a": true})",
            R"({"a": {"b": 1}})",
            R"({"a": []})",
            R"({"a": ["a", "b"]})",
            R"({"a": null})",  // exists null
            R"(1)",
            R"("abc")",
            R"(1.0)",
            R"(true)",
            R"([1, 2, 3])",
            R"({"a": 1, "b": 2})",
            R"({})",
            R"(null)",
        };

        // Empty JSON path: index the entire document (flat index).
        auto json_index_path = "";

        auto schema = std::make_shared<Schema>();
        auto vec_fid = schema->AddDebugField(
            "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
        auto i64_fid = schema->AddDebugField("age64", DataType::INT64);
        // nullable = true so the validity bitmap set below is honored.
        json_fid_ = schema->AddDebugField("json", DataType::JSON, true);
        schema->set_primary_field_id(i64_fid);

        segment_ = segcore::CreateSealedSegment(schema);
        segcore::LoadIndexInfo load_index_info;

        auto file_manager_ctx = storage::FileManagerContext();
        file_manager_ctx.fieldDataMeta.field_schema.set_data_type(
            milvus::proto::schema::JSON);
        file_manager_ctx.fieldDataMeta.field_schema.set_fieldid(
            json_fid_.get());
        file_manager_ctx.fieldDataMeta.field_schema.set_nullable(true);
        // Cast type "JSON" selects the flat-index implementation in the
        // factory (see JsonCastType::FromString).
        auto index = index::IndexFactory::GetInstance().CreateJsonIndex(
            index::INVERTED_INDEX_TYPE,
            JsonCastType::FromString("JSON"),
            json_index_path,
            file_manager_ctx);

        // Take ownership as the concrete JsonFlatIndex type.
        json_index_ = std::unique_ptr<index::JsonFlatIndex>(
            static_cast<index::JsonFlatIndex*>(index.release()));

        auto json_field =
            std::make_shared<FieldData<milvus::Json>>(DataType::JSON, true);
        std::vector<milvus::Json> jsons;
        for (auto& json_str : json_data_) {
            jsons.push_back(milvus::Json(simdjson::padded_string(json_str)));
        }
        json_field->add_json_data(jsons);
        // Mark all 16 rows as valid (two bytes of the validity bitmap).
        auto json_valid_data = json_field->ValidData();
        json_valid_data[0] = 0xFF;
        json_valid_data[1] = 0xFF;

        // Build, finish and open a reader before handing the index to the
        // segment — this order is required by the index lifecycle.
        json_index_->BuildWithFieldData({json_field});
        json_index_->finish();
        json_index_->create_reader();

        load_index_info.field_id = json_fid_.get();
        load_index_info.field_type = DataType::JSON;
        // NOTE: json_index_ is moved into the segment here; it must not be
        // used directly by tests afterwards.
        load_index_info.index = std::move(json_index_);
        load_index_info.index_params = {{JSON_PATH, json_index_path}};
        segment_->LoadIndex(load_index_info);
        // Also load the raw field data into the segment via a binlog
        // round-trip through the remote chunk manager.
        auto cm = milvus::storage::RemoteChunkManagerSingleton::GetInstance()
                      .GetRemoteChunkManager();
        auto load_info = PrepareSingleFieldInsertBinlog(
            1, 1, 1, json_fid_.get(), {json_field}, cm);
        segment_->LoadFieldData(load_info);
    }

    void
    TearDown() override {
    }

    FieldId json_fid_;                    // field id of the JSON column
    std::vector<std::string> json_data_;  // raw documents, in row order
    // Owned only during SetUp; moved into segment_ via LoadIndex.
    std::unique_ptr<index::JsonFlatIndex> json_index_;
    segcore::SegmentSealedUPtr segment_;
};
|
||||
|
||||
// Runs the plan `json >= 1` with an empty nested path (whole document).
// Only rows whose root value is numeric can satisfy it: row 8 (1),
// row 10 (1.0) and row 12 ([1, 2, 3]).
TEST_F(JsonFlatIndexExprTest, TestUnaryExpr) {
    proto::plan::GenericValue threshold;
    threshold.set_int64_val(1);

    auto column = expr::ColumnInfo(json_fid_, DataType::JSON, {""});
    auto filter = std::make_shared<expr::UnaryRangeFilterExpr>(
        column,
        proto::plan::OpType::GreaterEqual,
        threshold,
        std::vector<proto::plan::GenericValue>{});
    auto node =
        std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, filter);

    auto bits = query::ExecuteQueryExpr(
        node, segment_.get(), json_data_.size(), MAX_TIMESTAMP);
    EXPECT_EQ(bits.count(), 3);
    EXPECT_TRUE(bits[8]);   // 1
    EXPECT_TRUE(bits[10]);  // 1.0
    EXPECT_TRUE(bits[12]);  // [1, 2, 3]
}
|
||||
|
||||
// Runs exists(json) with an empty nested path: 12 of the 16 rows count as
// existing. The misses are row 5 ({"a": []}), row 7 ({"a": null}),
// row 14 ({}) and row 15 (null).
TEST_F(JsonFlatIndexExprTest, TestExistsExpr) {
    auto exists = std::make_shared<expr::ExistsExpr>(
        expr::ColumnInfo(json_fid_, DataType::JSON, {""}));
    auto node =
        std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, exists);

    auto bits = query::ExecuteQueryExpr(
        node, segment_.get(), json_data_.size(), MAX_TIMESTAMP);
    EXPECT_EQ(bits.count(), 12);
    EXPECT_FALSE(bits[5]);
    EXPECT_FALSE(bits[7]);
    EXPECT_FALSE(bits[14]);
    EXPECT_FALSE(bits[15]);
}
|
||||
} // namespace milvus::test
|
||||
@ -248,21 +248,17 @@ func (m *indexMeta) updateIndexTasksMetrics() {
|
||||
log.Ctx(m.ctx).Info("update index metric", zap.Int("collectionNum", len(taskMetrics)))
|
||||
}
|
||||
|
||||
func checkJsonParams(index *model.Index, req *indexpb.CreateIndexRequest) bool {
|
||||
castType1, err := getIndexParam(index.IndexParams, common.JSONCastTypeKey)
|
||||
if err != nil {
|
||||
func checkIdenticalJson(index *model.Index, req *indexpb.CreateIndexRequest) bool {
|
||||
// Skip error handling since json path existence is guaranteed in CreateIndex
|
||||
jsonPath1, _ := getIndexParam(index.IndexParams, common.JSONPathKey)
|
||||
jsonPath2, _ := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
|
||||
|
||||
if jsonPath1 != jsonPath2 {
|
||||
return false
|
||||
}
|
||||
castType2, err := getIndexParam(req.GetIndexParams(), common.JSONCastTypeKey)
|
||||
if err != nil || castType1 != castType2 {
|
||||
return false
|
||||
}
|
||||
jsonPath1, err := getIndexParam(index.IndexParams, common.JSONPathKey)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
jsonPath2, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
|
||||
return err == nil && jsonPath1 == jsonPath2
|
||||
castType1, _ := getIndexParam(index.IndexParams, common.JSONCastTypeKey)
|
||||
castType2, _ := getIndexParam(req.GetIndexParams(), common.JSONCastTypeKey)
|
||||
return castType1 == castType2
|
||||
}
|
||||
|
||||
func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool {
|
||||
@ -370,7 +366,8 @@ func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest, isJson bool)
|
||||
continue
|
||||
}
|
||||
if req.IndexName == index.IndexName {
|
||||
if req.FieldID == index.FieldID && checkParams(index, req) && (!isJson || checkJsonParams(index, req)) {
|
||||
if req.FieldID == index.FieldID && checkParams(index, req) &&
|
||||
/*only check json params when it is json index*/ (!isJson || checkIdenticalJson(index, req)) {
|
||||
return index.IndexID, nil
|
||||
}
|
||||
errMsg := "at most one distinct index is allowed per field"
|
||||
@ -383,16 +380,12 @@ func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest, isJson bool)
|
||||
}
|
||||
if req.FieldID == index.FieldID {
|
||||
if isJson {
|
||||
// if it is json index, check if json paths are same
|
||||
jsonPath1, err := getIndexParam(index.IndexParams, common.JSONPathKey)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
jsonPath2, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// Skip error handling since json path existence is guaranteed in CreateIndex
|
||||
jsonPath1, _ := getIndexParam(index.IndexParams, common.JSONPathKey)
|
||||
jsonPath2, _ := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
|
||||
|
||||
if jsonPath1 != jsonPath2 {
|
||||
// if json path is not same, create index is allowed
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@ -161,16 +161,24 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
|
||||
}
|
||||
|
||||
if isJson {
|
||||
// check json_path and json_cast_type exist
|
||||
jsonPath, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
|
||||
if err != nil {
|
||||
log.Error("get json path from index params failed", zap.Error(err))
|
||||
log.Warn("get json path failed", zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
_, err = getIndexParam(req.GetIndexParams(), common.JSONCastTypeKey)
|
||||
if err != nil {
|
||||
log.Warn("get json cast type failed", zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
nestedPath, err := s.parseAndVerifyNestedPath(jsonPath, schema, req.GetFieldID())
|
||||
if err != nil {
|
||||
log.Error("parse nested path failed", zap.Error(err))
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
// set nested path as json path
|
||||
setIndexParam(req.GetIndexParams(), common.JSONPathKey, nestedPath)
|
||||
}
|
||||
|
||||
@ -183,19 +191,17 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
|
||||
}
|
||||
defaultIndexName := fieldName
|
||||
if isJson {
|
||||
jsonPath, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
|
||||
if err != nil {
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
// ignore error, because it's already checked in getIndexParam before
|
||||
jsonPath, _ := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
|
||||
// filter indexes by json path, the length of indexes should not be larger than 1
|
||||
// this is guaranteed by CanCreateIndex
|
||||
indexes = lo.Filter(indexes, func(index *model.Index, i int) bool {
|
||||
path, err := getIndexParam(index.IndexParams, common.JSONPathKey)
|
||||
return err == nil && path == jsonPath
|
||||
path, _ := getIndexParam(index.IndexParams, common.JSONPathKey)
|
||||
return path == jsonPath
|
||||
})
|
||||
|
||||
defaultIndexName += jsonPath
|
||||
}
|
||||
|
||||
if len(indexes) == 0 {
|
||||
req.IndexName = defaultIndexName
|
||||
} else if len(indexes) == 1 {
|
||||
|
||||
@ -19,6 +19,7 @@ package datacoord
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -2822,4 +2823,31 @@ func TestJsonIndex(t *testing.T) {
|
||||
}
|
||||
resp, err = s.CreateIndex(context.Background(), req)
|
||||
assert.NoError(t, merr.CheckRPCCall(resp, err))
|
||||
|
||||
// test json flat index
|
||||
req = &indexpb.CreateIndexRequest{
|
||||
FieldID: 0,
|
||||
IndexName: "h",
|
||||
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_JSON))}, {Key: common.JSONPathKey, Value: "json[\"a\"][\"b\"]"}},
|
||||
}
|
||||
resp, err = s.CreateIndex(context.Background(), req)
|
||||
assert.NoError(t, merr.CheckRPCCall(resp, err))
|
||||
|
||||
// test json flat index with dynamic field
|
||||
req = &indexpb.CreateIndexRequest{
|
||||
FieldID: 2,
|
||||
IndexName: "i",
|
||||
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_JSON))}, {Key: common.JSONPathKey, Value: "dynamic"}},
|
||||
}
|
||||
resp, err = s.CreateIndex(context.Background(), req)
|
||||
assert.NoError(t, merr.CheckRPCCall(resp, err))
|
||||
|
||||
// duplicated json flat index
|
||||
req = &indexpb.CreateIndexRequest{
|
||||
FieldID: 0,
|
||||
IndexName: "a",
|
||||
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_JSON))}, {Key: common.JSONPathKey, Value: "json[\"a\"]"}},
|
||||
}
|
||||
resp, err = s.CreateIndex(context.Background(), req)
|
||||
assert.Error(t, merr.CheckRPCCall(resp, err))
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user