feat: cherry pick json path index (#40313)

issue: #35528 
pr: #36750 
this pr includes json path index pr and some related prs:
1. update tantivy version #39253 
2. json path index #36750 
3. fall back to brute force #40076 
4. term filter #40140 
5. bug fix #40336

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
Bingyi Sun 2025-03-10 22:14:05 +08:00 committed by GitHub
parent 3b847712bd
commit 683b26ffb7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
62 changed files with 1736 additions and 597 deletions

View File

@ -82,3 +82,5 @@ const size_t MARISA_NULL_KEY_ID = -1;
const int64_t DEFAULT_JSON_INDEX_MEMORY_BUDGET = 16777216; // bytes, 16MB
const int64_t DEFAULT_JSON_INDEX_COMMIT_INTERVAL = 200;
const bool DEFAULT_JSON_INDEX_ENABLED = true;
const std::string JSON_CAST_TYPE = "json_cast_type";
const std::string JSON_PATH = "json_path";

View File

@ -641,6 +641,20 @@ class FieldDataJsonImpl : public FieldDataImpl<Json, true> {
}
length_ += n;
}
// only for test
void
add_json_data(const std::vector<Json>& json) {
std::lock_guard lck(tell_mutex_);
if (length_ + json.size() > get_num_rows()) {
resize_field_data(length_ + json.size());
}
for (size_t i = 0; i < json.size(); ++i) {
data_[length_ + i] = json[i];
}
length_ += json.size();
}
};
class FieldDataSparseVectorImpl

View File

@ -165,6 +165,11 @@ class FieldMeta {
return IsVectorDataType(type_);
}
bool
is_json() const {
return type_ == DataType::JSON;
}
bool
is_string() const {
return IsStringDataType(type_);

View File

@ -564,6 +564,23 @@ struct TypeTraits<DataType::VECTOR_FLOAT> {
static constexpr const char* Name = "VECTOR_FLOAT";
};
inline DataType
FromValCase(milvus::proto::plan::GenericValue::ValCase val_case) {
switch (val_case) {
case milvus::proto::plan::GenericValue::ValCase::kBoolVal:
return milvus::DataType::BOOL;
case milvus::proto::plan::GenericValue::ValCase::kInt64Val:
return DataType::INT64;
case milvus::proto::plan::GenericValue::ValCase::kFloatVal:
return DataType::DOUBLE;
case milvus::proto::plan::GenericValue::ValCase::kStringVal:
return DataType::STRING;
case milvus::proto::plan::GenericValue::ValCase::kArrayVal:
return DataType::ARRAY;
default:
return DataType::NONE;
}
}
} // namespace milvus
template <>
struct fmt::formatter<milvus::DataType> : formatter<string_view> {

View File

@ -455,6 +455,8 @@ class PhyBinaryArithOpEvalRangeExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
DataType::NONE,
active_count,
batch_size,
consistency_level),

View File

@ -231,6 +231,8 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
DataType::NONE,
active_count,
batch_size,
consistency_level),

View File

@ -48,6 +48,8 @@ class PhyExistsFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
DataType::NONE,
active_count,
batch_size,
consistency_level),

View File

@ -20,12 +20,15 @@
#include <memory>
#include <string>
#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/Types.h"
#include "exec/expression/EvalCtx.h"
#include "exec/expression/VectorFunction.h"
#include "exec/expression/Utils.h"
#include "exec/QueryContext.h"
#include "expr/ITypeExpr.h"
#include "log/Log.h"
#include "query/PlanProto.h"
#include "segcore/SegmentSealedImpl.h"
#include "segcore/SegmentInterface.h"
@ -111,13 +114,17 @@ class SegmentExpr : public Expr {
SegmentExpr(const std::vector<ExprPtr>&& input,
const std::string& name,
const segcore::SegmentInternalInterface* segment,
const FieldId& field_id,
const FieldId field_id,
const std::vector<std::string> nested_path,
const DataType value_type,
int64_t active_count,
int64_t batch_size,
int32_t consistency_level)
: Expr(DataType::BOOL, std::move(input), name),
segment_(segment),
field_id_(field_id),
nested_path_(nested_path),
value_type_(value_type),
active_count_(active_count),
batch_size_(batch_size),
consistency_level_(consistency_level) {
@ -133,6 +140,7 @@ class SegmentExpr : public Expr {
InitSegmentExpr() {
auto& schema = segment_->get_schema();
auto& field_meta = schema[field_id_];
field_type_ = field_meta.get_data_type();
if (schema.get_primary_field_id().has_value() &&
schema.get_primary_field_id().value() == field_id_ &&
@ -141,10 +149,18 @@ class SegmentExpr : public Expr {
pk_type_ = field_meta.get_data_type();
}
if (field_meta.get_data_type() == DataType::JSON) {
auto pointer = milvus::Json::pointer(nested_path_);
if (is_index_mode_ =
segment_->HasIndex(field_id_, pointer, value_type_)) {
num_index_chunk_ = 1;
}
} else {
is_index_mode_ = segment_->HasIndex(field_id_);
if (is_index_mode_) {
num_index_chunk_ = segment_->num_chunk_index(field_id_);
}
}
// if index not include raw data, also need load data
if (segment_->HasFieldData(field_id_)) {
if (segment_->is_chunked()) {
@ -774,9 +790,21 @@ class SegmentExpr : public Expr {
// It avoids indexing execute for every batch because indexing
// executing costs quite much time.
if (cached_index_chunk_id_ != i) {
Index* index_ptr = nullptr;
if (field_type_ == DataType::JSON) {
auto pointer = milvus::Json::pointer(nested_path_);
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_, i);
auto* index_ptr = const_cast<Index*>(&index);
segment_->chunk_scalar_index<IndexInnerType>(
field_id_, pointer, i);
index_ptr = const_cast<Index*>(&index);
} else {
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_,
i);
index_ptr = const_cast<Index*>(&index);
}
cached_index_chunk_res_ = std::move(func(index_ptr, values...));
auto valid_result = index_ptr->IsNotNull();
cached_index_chunk_valid_res_ = std::move(valid_result);
@ -1107,6 +1135,9 @@ class SegmentExpr : public Expr {
DataType pk_type_;
int64_t batch_size_;
std::vector<std::string> nested_path_;
DataType field_type_;
DataType value_type_;
bool is_index_mode_{false};
bool is_data_mode_{false};
// sometimes need to skip index and using raw data

View File

@ -42,6 +42,8 @@ class PhyJsonContainsFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
DataType::NONE,
active_count,
batch_size,
consistency_level),

View File

@ -41,6 +41,8 @@ class PhyNullExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
DataType::NONE,
active_count,
batch_size,
consistency_level),

View File

@ -236,10 +236,14 @@ VectorPtr
PhyTermFilterExpr::ExecVisitorImplTemplateJson(OffsetVector* input) {
if (expr_->is_in_field_) {
return ExecTermJsonVariableInField<ValueType>(input);
} else {
if (is_index_mode_) {
return ExecVisitorImplForIndex<ValueType>(input);
} else {
return ExecTermJsonFieldInVariable<ValueType>(input);
}
}
}
template <typename ValueType>
VectorPtr

View File

@ -63,6 +63,10 @@ class PhyTermFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
expr->vals_.size() == 0
? DataType::NONE
: FromValCase(expr->vals_[0].val_case()),
active_count,
batch_size,
consistency_level),

View File

@ -18,6 +18,10 @@
#include <optional>
#include "common/Json.h"
#include <boost/regex.hpp>
#include "common/Types.h"
#include "common/type_c.h"
#include "log/Log.h"
namespace milvus {
namespace exec {
@ -191,6 +195,30 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
}
case DataType::JSON: {
auto val_type = expr_->val_.val_case();
if (CanUseIndexForJson(FromValCase(val_type)) &&
!has_offset_input_) {
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplForIndex<bool>();
break;
case proto::plan::GenericValue::ValCase::kInt64Val:
result = ExecRangeVisitorImplForIndex<int64_t>();
break;
case proto::plan::GenericValue::ValCase::kFloatVal:
result = ExecRangeVisitorImplForIndex<double>();
break;
case proto::plan::GenericValue::ValCase::kStringVal:
result = ExecRangeVisitorImplForIndex<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result =
ExecRangeVisitorImplForIndex<proto::plan::Array>();
break;
default:
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
}
} else {
switch (val_type) {
case proto::plan::GenericValue::ValCase::kBoolVal:
result = ExecRangeVisitorImplJson<bool>(input);
@ -212,6 +240,7 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
PanicInfo(
DataTypeInvalid, "unknown data type: {}", val_type);
}
}
break;
}
case DataType::ARRAY: {
@ -1362,6 +1391,15 @@ PhyUnaryRangeFilterExpr::CanUseIndex() {
return res;
}
bool
PhyUnaryRangeFilterExpr::CanUseIndexForJson(DataType val_type) {
use_index_ =
segment_->HasIndex(field_id_,
milvus::Json::pointer(expr_->column_.nested_path_),
val_type);
return use_index_;
}
VectorPtr
PhyUnaryRangeFilterExpr::ExecTextMatch() {
using Index = index::TextMatchIndex;

View File

@ -322,6 +322,8 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
name,
segment,
expr->column_.field_id_,
expr->column_.nested_path_,
FromValCase(expr->val_.val_case()),
active_count,
batch_size,
consistency_level),
@ -385,6 +387,9 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
bool
CanUseIndexForArray();
bool
CanUseIndexForJson(DataType val_type);
VectorPtr
ExecTextMatch();

View File

@ -72,6 +72,11 @@ class IndexBase {
return index_type_;
}
virtual enum DataType
JsonCastType() const {
return DataType::NONE;
}
protected:
explicit IndexBase(IndexType index_type)
: index_type_(std::move(index_type)) {

View File

@ -15,11 +15,15 @@
// limitations under the License.
#include "index/IndexFactory.h"
#include <cstdlib>
#include <memory>
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Types.h"
#include "index/VectorMemIndex.h"
#include "index/Utils.h"
#include "index/Meta.h"
#include "index/JsonInvertedIndex.h"
#include "knowhere/utils.h"
#include "index/VectorDiskIndex.h"
@ -29,6 +33,8 @@
#include "index/InvertedIndexTantivy.h"
#include "index/HybridScalarIndex.h"
#include "knowhere/comp/knowhere_check.h"
#include "log/Log.h"
#include "pb/schema.pb.h"
namespace milvus::index {
@ -365,6 +371,45 @@ IndexFactory::CreateComplexScalarIndex(
PanicInfo(Unsupported, "Complex index not supported now");
}
IndexBasePtr
IndexFactory::CreateJsonIndex(
IndexType index_type,
DataType cast_dtype,
const std::string& nested_path,
const storage::FileManagerContext& file_manager_context) {
AssertInfo(index_type == INVERTED_INDEX_TYPE,
"Invalid index type for json index");
switch (cast_dtype) {
case DataType::BOOL:
return std::make_unique<index::JsonInvertedIndex<bool>>(
proto::schema::DataType::Bool,
nested_path,
file_manager_context);
case milvus::DataType::INT8:
case milvus::DataType::INT16:
case milvus::DataType::INT32:
case DataType::INT64:
return std::make_unique<index::JsonInvertedIndex<int64_t>>(
proto::schema::DataType::Int64,
nested_path,
file_manager_context);
case DataType::FLOAT:
case DataType::DOUBLE:
return std::make_unique<index::JsonInvertedIndex<double>>(
proto::schema::DataType::Double,
nested_path,
file_manager_context);
case DataType::STRING:
case DataType::VARCHAR:
return std::make_unique<index::JsonInvertedIndex<std::string>>(
proto::schema::DataType::String,
nested_path,
file_manager_context);
default:
PanicInfo(DataTypeInvalid, "Invalid data type:{}", cast_dtype);
}
}
IndexBasePtr
IndexFactory::CreateScalarIndex(
const CreateIndexInfo& create_index_info,
@ -387,7 +432,9 @@ IndexFactory::CreateScalarIndex(
file_manager_context);
}
case DataType::JSON: {
return CreateComplexScalarIndex(create_index_info.index_type,
return CreateJsonIndex(create_index_info.index_type,
create_index_info.json_cast_type,
create_index_info.json_path,
file_manager_context);
}
default:

View File

@ -21,6 +21,7 @@
#include <mutex>
#include <shared_mutex>
#include "common/Types.h"
#include "common/type_c.h"
#include "index/Index.h"
#include "index/ScalarIndex.h"
@ -103,6 +104,13 @@ class IndexFactory {
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
IndexBasePtr
CreateJsonIndex(IndexType index_type,
DataType cast_dtype,
const std::string& nested_path,
const storage::FileManagerContext& file_manager_context =
storage::FileManagerContext());
IndexBasePtr
CreateScalarIndex(const CreateIndexInfo& create_index_info,
const storage::FileManagerContext& file_manager_context =

View File

@ -27,6 +27,8 @@ struct CreateIndexInfo {
std::string field_name;
int64_t dim;
int32_t scalar_index_engine_version;
DataType json_cast_type;
std::string json_path;
};
} // namespace milvus::index

View File

@ -29,36 +29,6 @@
namespace milvus::index {
constexpr const char* TMP_INVERTED_INDEX_PREFIX = "/tmp/milvus/inverted-index/";
inline TantivyDataType
get_tantivy_data_type(proto::schema::DataType data_type) {
switch (data_type) {
case proto::schema::DataType::Bool: {
return TantivyDataType::Bool;
}
case proto::schema::DataType::Int8:
case proto::schema::DataType::Int16:
case proto::schema::DataType::Int32:
case proto::schema::DataType::Int64: {
return TantivyDataType::I64;
}
case proto::schema::DataType::Float:
case proto::schema::DataType::Double: {
return TantivyDataType::F64;
}
case proto::schema::DataType::String:
case proto::schema::DataType::VarChar: {
return TantivyDataType::Keyword;
}
default:
PanicInfo(ErrorCode::NotImplemented,
fmt::format("not implemented data type: {}", data_type));
}
}
inline TantivyDataType
get_tantivy_data_type(const proto::schema::FieldSchema& schema) {
switch (schema.data_type()) {
@ -327,7 +297,6 @@ template <typename T>
const TargetBitmap
InvertedIndexTantivy<T>::Range(T value, OpType op) {
TargetBitmap bitset(Count());
switch (op) {
case OpType::LessThan: {
auto array = wrapper_->upper_bound_range_query(value, false);
@ -549,6 +518,11 @@ InvertedIndexTantivy<T>::BuildWithFieldData(
break;
}
case proto::schema::DataType::JSON: {
build_index_for_json(field_datas);
break;
}
default:
PanicInfo(ErrorCode::NotImplemented,
fmt::format("Inverted index not supported on {}",

View File

@ -25,6 +25,36 @@
namespace milvus::index {
inline TantivyDataType
get_tantivy_data_type(proto::schema::DataType data_type) {
switch (data_type) {
case proto::schema::DataType::Bool: {
return TantivyDataType::Bool;
}
case proto::schema::DataType::Int8:
case proto::schema::DataType::Int16:
case proto::schema::DataType::Int32:
case proto::schema::DataType::Int64: {
return TantivyDataType::I64;
}
case proto::schema::DataType::Float:
case proto::schema::DataType::Double: {
return TantivyDataType::F64;
}
case proto::schema::DataType::String:
case proto::schema::DataType::VarChar: {
return TantivyDataType::Keyword;
}
default:
PanicInfo(ErrorCode::NotImplemented,
fmt::format("not implemented data type: {}", data_type));
}
}
using TantivyIndexWrapper = milvus::tantivy::TantivyIndexWrapper;
using RustArrayWrapper = milvus::tantivy::RustArrayWrapper;
@ -177,10 +207,10 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
const TargetBitmap
RegexQuery(const std::string& regex_pattern) override;
protected:
void
BuildWithFieldData(const std::vector<FieldDataPtr>& datas) override;
protected:
void
finish();
@ -188,6 +218,13 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
build_index_for_array(
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas);
virtual void
build_index_for_json(
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas) {
PanicInfo(ErrorCode::NotImplemented,
"build_index_for_json not implemented");
};
protected:
std::shared_ptr<TantivyIndexWrapper> wrapper_;
TantivyDataType d_type_;

View File

@ -0,0 +1,71 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "index/JsonInvertedIndex.h"
#include <string>
#include <string_view>
#include <type_traits>
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/Types.h"
#include "log/Log.h"
#include "simdjson/error.h"
namespace milvus::index {
template <typename T>
void
JsonInvertedIndex<T>::build_index_for_json(
const std::vector<std::shared_ptr<FieldDataBase>>& field_datas) {
using GetType =
std::conditional_t<std::is_same_v<std::string, T>, std::string_view, T>;
int64_t offset = 0;
LOG_INFO("Start to build json inverted index for field: {}", nested_path_);
for (const auto& data : field_datas) {
auto n = data->get_num_rows();
for (int64_t i = 0; i < n; i++) {
auto json_column = static_cast<const Json*>(data->RawValue(i));
if (this->schema_.nullable() && !data->is_valid(i)) {
this->null_offset_.push_back(i);
this->wrapper_->template add_multi_data<T>(
nullptr, 0, offset++);
continue;
}
value_result<GetType> res = json_column->at<GetType>(nested_path_);
auto err = res.error();
if (err != simdjson::SUCCESS) {
AssertInfo(err == simdjson::INCORRECT_TYPE ||
err == simdjson::NO_SUCH_FIELD,
"Failed to parse json, err: {}",
err);
this->null_offset_.push_back(i);
this->wrapper_->template add_multi_data<T>(
nullptr, 0, offset++);
continue;
}
if constexpr (std::is_same_v<GetType, std::string_view>) {
auto value = std::string(res.value());
this->wrapper_->template add_data(&value, 1, offset++);
} else {
auto value = res.value();
this->wrapper_->template add_data(&value, 1, offset++);
}
}
}
}
template class JsonInvertedIndex<bool>;
template class JsonInvertedIndex<int64_t>;
template class JsonInvertedIndex<double>;
template class JsonInvertedIndex<std::string>;
} // namespace milvus::index

View File

@ -0,0 +1,75 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include "common/FieldDataInterface.h"
#include "index/InvertedIndexTantivy.h"
#include "index/ScalarIndex.h"
#include "storage/FileManager.h"
#include "boost/filesystem.hpp"
#include "tantivy-binding.h"
namespace milvus::index {
template <typename T>
class JsonInvertedIndex : public index::InvertedIndexTantivy<T> {
public:
JsonInvertedIndex(const proto::schema::DataType cast_type,
const std::string& nested_path,
const storage::FileManagerContext& ctx)
: nested_path_(nested_path), cast_type_(cast_type) {
this->schema_ = ctx.fieldDataMeta.field_schema;
this->mem_file_manager_ =
std::make_shared<storage::MemFileManagerImpl>(ctx);
this->disk_file_manager_ =
std::make_shared<storage::DiskFileManagerImpl>(ctx);
if (ctx.for_loading_index) {
return;
}
auto prefix = this->disk_file_manager_->GetTextIndexIdentifier();
constexpr const char* TMP_INVERTED_INDEX_PREFIX =
"/tmp/milvus/inverted-index/";
this->path_ = std::string(TMP_INVERTED_INDEX_PREFIX) + prefix;
this->d_type_ = index::get_tantivy_data_type(cast_type);
boost::filesystem::create_directories(this->path_);
std::string field_name = std::to_string(
this->disk_file_manager_->GetFieldDataMeta().field_id);
this->wrapper_ = std::make_shared<index::TantivyIndexWrapper>(
field_name.c_str(), this->d_type_, this->path_.c_str());
}
void
build_index_for_json(const std::vector<std::shared_ptr<FieldDataBase>>&
field_datas) override;
void
finish() {
this->wrapper_->finish();
}
void
create_reader() {
this->wrapper_->create_reader();
}
enum DataType
JsonCastType() const override {
return static_cast<enum DataType>(cast_type_);
}
private:
std::string nested_path_;
proto::schema::DataType cast_type_;
};
} // namespace milvus::index

View File

@ -18,6 +18,7 @@
#include "common/EasyAssert.h"
#include "indexbuilder/IndexCreatorBase.h"
#include "index/JsonInvertedIndex.h"
#include "indexbuilder/ScalarIndexCreator.h"
#include "indexbuilder/VecIndexCreator.h"
#include "indexbuilder/type_c.h"
@ -60,6 +61,7 @@ class IndexFactory {
case DataType::VARCHAR:
case DataType::STRING:
case DataType::ARRAY:
case DataType::JSON:
return CreateScalarIndex(type, config, context);
case DataType::VECTOR_FLOAT:
@ -68,6 +70,7 @@ class IndexFactory {
case DataType::VECTOR_BINARY:
case DataType::VECTOR_SPARSE_FLOAT:
return std::make_unique<VecIndexCreator>(type, config, context);
default:
PanicInfo(DataTypeInvalid,
fmt::format("invalid type is {}", invalid_dtype_msg));

View File

@ -10,6 +10,9 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "indexbuilder/ScalarIndexCreator.h"
#include "common/Consts.h"
#include "common/FieldDataInterface.h"
#include "common/Types.h"
#include "index/IndexFactory.h"
#include "index/IndexInfo.h"
#include "index/Meta.h"
@ -39,6 +42,11 @@ ScalarIndexCreator::ScalarIndexCreator(
index_info.field_type = dtype_;
index_info.index_type = index_type();
if (dtype == DataType::JSON) {
index_info.json_cast_type = static_cast<DataType>(
std::stoi(config.at(JSON_CAST_TYPE).get<std::string>()));
index_info.json_path = config.at(JSON_PATH).get<std::string>();
}
index_ = index::IndexFactory::GetInstance().CreateIndex(
index_info, file_manager_context);
}

View File

@ -175,13 +175,22 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
return;
}
auto row_count = info.index->Count();
AssertInfo(row_count > 0, "Index count is 0");
std::unique_lock lck(mutex_);
AssertInfo(
!get_bit(index_ready_bitset_, field_id),
"scalar index has been exist at " + std::to_string(field_id.get()));
if (field_meta.get_data_type() == DataType::JSON) {
auto path = info.index_params.at(JSON_PATH);
JSONIndexKey key;
key.nested_path = path;
key.field_id = field_id;
json_indexings_[key] =
std::move(const_cast<LoadIndexInfo&>(info).index);
return;
}
auto row_count = info.index->Count();
AssertInfo(row_count > 0, "Index count is 0");
if (num_rows_.has_value()) {
AssertInfo(num_rows_.value() == row_count,
"field (" + std::to_string(field_id.get()) +
@ -1805,6 +1814,8 @@ ChunkedSegmentSealedImpl::HasRawData(int64_t field_id) const {
dynamic_cast<index::VectorIndex*>(field_indexing->indexing_.get());
return vec_index->HasRawData() ||
get_bit(field_data_ready_bitset_, fieldID);
} else if (IsJsonDataType(field_meta.get_data_type())) {
return get_bit(field_data_ready_bitset_, fieldID);
} else {
auto scalar_index = scalar_indexings_.find(fieldID);
if (scalar_index != scalar_indexings_.end()) {

View File

@ -301,6 +301,13 @@ class SegmentGrowingImpl : public SegmentGrowing {
return false;
}
bool
HasIndex(FieldId field_id,
const std::string& nested_path,
DataType data_type) const override {
return false;
};
bool
HasFieldData(FieldId field_id) const override {
return true;

View File

@ -21,6 +21,7 @@
#include "FieldIndexing.h"
#include "common/Common.h"
#include "common/EasyAssert.h"
#include "common/Schema.h"
#include "common/Span.h"
#include "common/SystemProperty.h"
@ -248,6 +249,18 @@ class SegmentInternalInterface : public SegmentInterface {
std::to_string(field_id);
}
template <typename T>
const index::ScalarIndex<T>&
chunk_scalar_index(FieldId field_id,
std::string path,
int64_t chunk_id) const {
using IndexType = index::ScalarIndex<T>;
auto base_ptr = chunk_index_impl(field_id, path, chunk_id);
auto ptr = dynamic_cast<const IndexType*>(base_ptr);
AssertInfo(ptr, "entry mismatch");
return *ptr;
}
std::unique_ptr<SearchResult>
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_group,
@ -279,6 +292,12 @@ class SegmentInternalInterface : public SegmentInterface {
virtual bool
HasIndex(FieldId field_id) const = 0;
virtual bool
HasIndex(FieldId field_id,
const std::string& nested_path,
DataType data_type) const {
PanicInfo(ErrorCode::NotImplemented, "not implemented");
};
virtual bool
HasFieldData(FieldId field_id) const = 0;
@ -464,6 +483,13 @@ class SegmentInternalInterface : public SegmentInterface {
get_timestamps() const = 0;
public:
virtual const index::IndexBase*
chunk_index_impl(FieldId field_id,
std::string path,
int64_t chunk_id) const {
PanicInfo(ErrorCode::NotImplemented, "not implemented");
};
// calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to system_type
virtual void
bulk_subscript(SystemFieldType system_type,

View File

@ -16,6 +16,7 @@
#include <tuple>
#include "common/LoadInfo.h"
#include "index/JsonInvertedIndex.h"
#include "pb/segcore.pb.h"
#include "segcore/SegmentInterface.h"
#include "segcore/Types.h"
@ -69,6 +70,57 @@ class SegmentSealed : public SegmentInternalInterface {
type() const override {
return SegmentType::Sealed;
}
index::IndexBase*
chunk_index_impl(FieldId field_id,
std::string path,
int64_t chunk_id) const override {
JSONIndexKey key;
key.field_id = field_id;
key.nested_path = path;
AssertInfo(json_indexings_.find(key) != json_indexings_.end(),
"Cannot find json index with path: " + path);
return json_indexings_.at(key).get();
}
virtual bool
HasIndex(FieldId field_id) const override = 0;
bool
HasIndex(FieldId field_id,
const std::string& path,
DataType data_type) const override {
JSONIndexKey key;
key.field_id = field_id;
key.nested_path = path;
auto index = json_indexings_.find(key);
return index != json_indexings_.end() &&
data_type == index->second->JsonCastType();
}
protected:
struct JSONIndexKey {
FieldId field_id;
std::string nested_path;
bool
operator==(const JSONIndexKey& other) const {
return field_id == other.field_id &&
nested_path == other.nested_path;
}
};
struct hash_helper {
size_t
operator()(const JSONIndexKey& k) const {
std::hash<int64_t> h1;
std::hash<std::string> h2;
size_t hash_result = 0;
boost::hash_combine(hash_result, h1(k.field_id.get()));
boost::hash_combine(hash_result, h2(k.nested_path));
return hash_result;
}
};
std::unordered_map<JSONIndexKey, index::IndexBasePtr, hash_helper>
json_indexings_;
};
using SegmentSealedSPtr = std::shared_ptr<SegmentSealed>;

View File

@ -212,13 +212,23 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
return;
}
auto row_count = info.index->Count();
AssertInfo(row_count > 0, "Index count is 0");
std::unique_lock lck(mutex_);
AssertInfo(
!get_bit(index_ready_bitset_, field_id),
"scalar index has been exist at " + std::to_string(field_id.get()));
if (field_meta.get_data_type() == DataType::JSON) {
auto path = info.index_params.at(JSON_PATH);
JSONIndexKey key;
key.nested_path = path;
key.field_id = field_id;
json_indexings_[key] =
std::move(const_cast<LoadIndexInfo&>(info).index);
return;
}
auto row_count = info.index->Count();
AssertInfo(row_count > 0, "Index count is 0");
if (num_rows_.has_value()) {
AssertInfo(num_rows_.value() == row_count,
"field (" + std::to_string(field_id.get()) +
@ -227,7 +237,6 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
") than other column's row count (" +
std::to_string(num_rows_.value()) + ")");
}
scalar_indexings_[field_id] =
std::move(const_cast<LoadIndexInfo&>(info).index);
// reverse pk from scalar index and set pks to offset
@ -675,7 +684,6 @@ SegmentSealedImpl::num_chunk_index(FieldId field_id) const {
if (field_meta.is_vector()) {
return int64_t(vector_indexings_.is_ready(field_id));
}
return scalar_indexings_.count(field_id);
}
@ -1662,6 +1670,8 @@ SegmentSealedImpl::HasRawData(int64_t field_id) const {
return vec_index->HasRawData() ||
get_bit(field_data_ready_bitset_, fieldID);
}
} else if (IsJsonDataType(field_meta.get_data_type())) {
return get_bit(field_data_ready_bitset_, fieldID);
} else {
auto scalar_index = scalar_indexings_.find(fieldID);
if (scalar_index != scalar_indexings_.end()) {

View File

@ -11,6 +11,7 @@
#include "segcore/load_index_c.h"
#include "common/Consts.h"
#include "common/FieldMeta.h"
#include "common/EasyAssert.h"
#include "common/Types.h"
@ -305,6 +306,11 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
load_index_info->index_params);
config[milvus::index::INDEX_FILES] = load_index_info->index_files;
if (load_index_info->field_type == milvus::DataType::JSON) {
index_info.json_cast_type = static_cast<milvus::DataType>(
std::stoi(config.at(JSON_CAST_TYPE).get<std::string>()));
index_info.json_path = config.at(JSON_PATH).get<std::string>();
}
milvus::storage::FileManagerContext fileManagerContext(
field_meta, index_meta, remote_chunk_manager);
fileManagerContext.set_for_loading_index(true);

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tantivy = { git = "https://github.com/milvus-io/tantivy", tag = "0.21.1-fix3" } # we have make a private fix for milvus, should be removed in future after milvus fixing the bug.
tantivy = { git = "https://github.com/milvus-io/tantivy", tag = "v0.1.0" } # we have make a private fix for milvus, should be removed in future after milvus fixing the bug.
futures = "0.3.21"
libc = "0.2"
scopeguard = "1.2"

View File

@ -107,14 +107,24 @@ RustResult tantivy_term_query_i64(void *ptr, int64_t term);
RustResult tantivy_lower_bound_range_query_i64(void *ptr, int64_t lower_bound, bool inclusive);
RustResult tantivy_lower_bound_range_query_bool(void *ptr, bool lower_bound, bool inclusive);
RustResult tantivy_upper_bound_range_query_i64(void *ptr, int64_t upper_bound, bool inclusive);
RustResult tantivy_upper_bound_range_query_bool(void *ptr, bool upper_bound, bool inclusive);
RustResult tantivy_range_query_i64(void *ptr,
int64_t lower_bound,
int64_t upper_bound,
bool lb_inclusive,
bool ub_inclusive);
RustResult tantivy_range_query_bool(void *ptr,
bool lower_bound,
bool upper_bound,
bool lb_inclusive,
bool ub_inclusive);
RustResult tantivy_term_query_f64(void *ptr, double term);
RustResult tantivy_lower_bound_range_query_f64(void *ptr, double lower_bound, bool inclusive);

View File

@ -40,7 +40,7 @@ impl IndexReaderWrapper {
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::OnCommit) // OnCommit serve for growing segment.
.reload_policy(ReloadPolicy::OnCommitWithDelay) // OnCommitWithDelay serve for growing segment.
.try_into()?;
reader.reload()?;
@ -108,11 +108,9 @@ impl IndexReaderWrapper {
lower_bound: i64,
inclusive: bool,
) -> Result<Vec<u32>> {
let q = RangeQuery::new_i64_bounds(
self.field_name.to_string(),
make_bounds(lower_bound, inclusive),
Bound::Unbounded,
);
let term = Term::from_field_i64(self.field, lower_bound);
let q = RangeQuery::new(make_bounds(term, inclusive), Bound::Unbounded);
self.search(&q)
}
@ -121,11 +119,30 @@ impl IndexReaderWrapper {
upper_bound: i64,
inclusive: bool,
) -> Result<Vec<u32>> {
let q = RangeQuery::new_i64_bounds(
self.field_name.to_string(),
Bound::Unbounded,
make_bounds(upper_bound, inclusive),
);
let term = Term::from_field_i64(self.field, upper_bound);
let q = RangeQuery::new(Bound::Unbounded, make_bounds(term, inclusive));
self.search(&q)
}
pub fn lower_bound_range_query_bool(
&self,
lower_bound: bool,
inclusive: bool,
) -> Result<Vec<u32>> {
let lower_bound = make_bounds(Term::from_field_bool(self.field, lower_bound), inclusive);
let upper_bound = Bound::Unbounded;
let q = RangeQuery::new(lower_bound, upper_bound);
self.search(&q)
}
pub fn upper_bound_range_query_bool(
&self,
upper_bound: bool,
inclusive: bool,
) -> Result<Vec<u32>> {
let lower_bound = Bound::Unbounded;
let upper_bound = make_bounds(Term::from_field_bool(self.field, upper_bound), inclusive);
let q = RangeQuery::new(lower_bound, upper_bound);
self.search(&q)
}
@ -136,9 +153,9 @@ impl IndexReaderWrapper {
lb_inclusive: bool,
ub_inclusive: bool,
) -> Result<Vec<u32>> {
let lb = make_bounds(lower_bound, lb_inclusive);
let ub = make_bounds(upper_bound, ub_inclusive);
let q = RangeQuery::new_i64_bounds(self.field_name.to_string(), lb, ub);
let lb = make_bounds(Term::from_field_i64(self.field, lower_bound), lb_inclusive);
let ub = make_bounds(Term::from_field_i64(self.field, upper_bound), ub_inclusive);
let q = RangeQuery::new(lb, ub);
self.search(&q)
}
@ -150,14 +167,26 @@ impl IndexReaderWrapper {
self.search(&q)
}
pub fn range_query_bool(
&self,
lower_bound: bool,
upper_bound: bool,
lb_inclusive: bool,
ub_inclusive: bool,
) -> Result<Vec<u32>> {
let lower_bound = make_bounds(Term::from_field_bool(self.field, lower_bound), lb_inclusive);
let upper_bound = make_bounds(Term::from_field_bool(self.field, upper_bound), ub_inclusive);
let q = RangeQuery::new(lower_bound, upper_bound);
self.search(&q)
}
pub fn lower_bound_range_query_f64(
&self,
lower_bound: f64,
inclusive: bool,
) -> Result<Vec<u32>> {
let q = RangeQuery::new_f64_bounds(
self.field_name.to_string(),
make_bounds(lower_bound, inclusive),
let q = RangeQuery::new(
make_bounds(Term::from_field_f64(self.field, lower_bound), inclusive),
Bound::Unbounded,
);
self.search(&q)
@ -168,10 +197,9 @@ impl IndexReaderWrapper {
upper_bound: f64,
inclusive: bool,
) -> Result<Vec<u32>> {
let q = RangeQuery::new_f64_bounds(
self.field_name.to_string(),
let q = RangeQuery::new(
Bound::Unbounded,
make_bounds(upper_bound, inclusive),
make_bounds(Term::from_field_f64(self.field, upper_bound), inclusive),
);
self.search(&q)
}
@ -183,9 +211,9 @@ impl IndexReaderWrapper {
lb_inclusive: bool,
ub_inclusive: bool,
) -> Result<Vec<u32>> {
let lb = make_bounds(lower_bound, lb_inclusive);
let ub = make_bounds(upper_bound, ub_inclusive);
let q = RangeQuery::new_f64_bounds(self.field_name.to_string(), lb, ub);
let lb = make_bounds(Term::from_field_f64(self.field, lower_bound), lb_inclusive);
let ub = make_bounds(Term::from_field_f64(self.field, upper_bound), ub_inclusive);
let q = RangeQuery::new(lb, ub);
self.search(&q)
}
@ -218,9 +246,8 @@ impl IndexReaderWrapper {
lower_bound: &str,
inclusive: bool,
) -> Result<Vec<u32>> {
let q = RangeQuery::new_str_bounds(
self.field_name.to_string(),
make_bounds(lower_bound, inclusive),
let q = RangeQuery::new(
make_bounds(Term::from_field_text(self.field, lower_bound), inclusive),
Bound::Unbounded,
);
self.search(&q)
@ -231,10 +258,9 @@ impl IndexReaderWrapper {
upper_bound: &str,
inclusive: bool,
) -> Result<Vec<u32>> {
let q = RangeQuery::new_str_bounds(
self.field_name.to_string(),
let q = RangeQuery::new(
Bound::Unbounded,
make_bounds(upper_bound, inclusive),
make_bounds(Term::from_field_text(self.field, upper_bound), inclusive),
);
self.search(&q)
}
@ -246,9 +272,9 @@ impl IndexReaderWrapper {
lb_inclusive: bool,
ub_inclusive: bool,
) -> Result<Vec<u32>> {
let lb = make_bounds(lower_bound, lb_inclusive);
let ub = make_bounds(upper_bound, ub_inclusive);
let q = RangeQuery::new_str_bounds(self.field_name.to_string(), lb, ub);
let lb = make_bounds(Term::from_field_text(self.field, lower_bound), lb_inclusive);
let ub = make_bounds(Term::from_field_text(self.field, upper_bound), ub_inclusive);
let q = RangeQuery::new(lb, ub);
self.search(&q)
}

View File

@ -56,6 +56,20 @@ pub extern "C" fn tantivy_lower_bound_range_query_i64(
}
}
#[no_mangle]
pub extern "C" fn tantivy_lower_bound_range_query_bool(
ptr: *mut c_void,
lower_bound: bool,
inclusive: bool,
) -> RustResult {
let real = ptr as *mut IndexReaderWrapper;
unsafe {
(*real)
.lower_bound_range_query_bool(lower_bound, inclusive)
.into()
}
}
#[no_mangle]
pub extern "C" fn tantivy_upper_bound_range_query_i64(
ptr: *mut c_void,
@ -70,6 +84,20 @@ pub extern "C" fn tantivy_upper_bound_range_query_i64(
}
}
#[no_mangle]
pub extern "C" fn tantivy_upper_bound_range_query_bool(
ptr: *mut c_void,
upper_bound: bool,
inclusive: bool,
) -> RustResult {
let real = ptr as *mut IndexReaderWrapper;
unsafe {
(*real)
.upper_bound_range_query_bool(upper_bound, inclusive)
.into()
}
}
#[no_mangle]
pub extern "C" fn tantivy_range_query_i64(
ptr: *mut c_void,
@ -86,6 +114,21 @@ pub extern "C" fn tantivy_range_query_i64(
}
}
#[no_mangle]
pub extern "C" fn tantivy_range_query_bool(
ptr: *mut c_void,
lower_bound: bool,
upper_bound: bool,
lb_inclusive: bool,
ub_inclusive: bool,
) -> RustResult {
let real = ptr as *mut IndexReaderWrapper;
unsafe {
(*real)
.range_query_bool(lower_bound, upper_bound, lb_inclusive, ub_inclusive)
.into()
}
}
#[no_mangle]
pub extern "C" fn tantivy_term_query_f64(ptr: *mut c_void, term: f64) -> RustResult {
let real = ptr as *mut IndexReaderWrapper;

View File

@ -6,9 +6,12 @@ use futures::executor::block_on;
use libc::c_char;
use log::info;
use tantivy::schema::{
Field, IndexRecordOption, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, FAST, INDEXED,
Field, IndexRecordOption, OwnedValue, Schema, SchemaBuilder, TextFieldIndexing, TextOptions,
FAST, INDEXED,
};
use tantivy::{
doc, tokenizer, Document, Index, IndexWriter, SingleSegmentIndexWriter, TantivyDocument,
};
use tantivy::{doc, Document, Index, IndexWriter, SingleSegmentIndexWriter};
use crate::data_type::TantivyDataType;
@ -56,7 +59,10 @@ impl IndexWriterWrapper {
in_ram: bool,
) -> Result<IndexWriterWrapper> {
init_log();
info!("create index writer, field_name: {}, data_type: {:?}", field_name, data_type);
info!(
"create index writer, field_name: {}, data_type: {:?}",
field_name, data_type
);
let mut schema_builder = Schema::builder();
let field = schema_builder_add_field(&mut schema_builder, &field_name, data_type);
// We cannot build direct connection from rows in multi-segments to milvus row data. So we have this doc_id field.
@ -83,7 +89,10 @@ impl IndexWriterWrapper {
path: String,
) -> Result<IndexWriterWrapper> {
init_log();
info!("create single segment index writer, field_name: {}, data_type: {:?}", field_name, data_type);
info!(
"create single segment index writer, field_name: {}, data_type: {:?}",
field_name, data_type
);
let mut schema_builder = Schema::builder();
let field = schema_builder_add_field(&mut schema_builder, &field_name, data_type);
let schema = schema_builder.build();
@ -101,7 +110,7 @@ impl IndexWriterWrapper {
IndexReaderWrapper::from_index(self.index.clone())
}
fn index_writer_add_document(&self, document: Document) -> Result<()> {
fn index_writer_add_document(&self, document: TantivyDocument) -> Result<()> {
match self.index_writer {
Either::Left(ref writer) => {
let _ = writer.add_document(document)?;
@ -113,7 +122,10 @@ impl IndexWriterWrapper {
Ok(())
}
fn single_segment_index_writer_add_document(&mut self, document: Document) -> Result<()> {
fn single_segment_index_writer_add_document(
&mut self,
document: TantivyDocument,
) -> Result<()> {
match self.index_writer {
Either::Left(_) => {
panic!("unexpected writer");
@ -170,70 +182,70 @@ impl IndexWriterWrapper {
}
pub fn add_multi_i8s(&mut self, datas: &[i8], offset: i64) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
document.add_field_value(self.field, &(*data as i64));
}
document.add_i64(self.id_field.unwrap(), offset);
self.index_writer_add_document(document)
}
pub fn add_multi_i16s(&mut self, datas: &[i16], offset: i64) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
document.add_field_value(self.field, &(*data as i64));
}
document.add_i64(self.id_field.unwrap(), offset);
self.index_writer_add_document(document)
}
pub fn add_multi_i32s(&mut self, datas: &[i32], offset: i64) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
document.add_field_value(self.field, &(*data as i64));
}
document.add_i64(self.id_field.unwrap(), offset);
self.index_writer_add_document(document)
}
pub fn add_multi_i64s(&mut self, datas: &[i64], offset: i64) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data);
document.add_field_value(self.field, data);
}
document.add_i64(self.id_field.unwrap(), offset);
self.index_writer_add_document(document)
}
pub fn add_multi_f32s(&mut self, datas: &[f32], offset: i64) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data as f64);
document.add_field_value(self.field, &(*data as f64));
}
document.add_i64(self.id_field.unwrap(), offset);
self.index_writer_add_document(document)
}
pub fn add_multi_f64s(&mut self, datas: &[f64], offset: i64) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data);
document.add_field_value(self.field, data);
}
document.add_i64(self.id_field.unwrap(), offset);
self.index_writer_add_document(document)
}
pub fn add_multi_bools(&mut self, datas: &[bool], offset: i64) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data);
document.add_field_value(self.field, data);
}
document.add_i64(self.id_field.unwrap(), offset);
self.index_writer_add_document(document)
}
pub fn add_multi_keywords(&mut self, datas: &[*const c_char], offset: i64) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for element in datas {
let data = unsafe { CStr::from_ptr(*element) };
document.add_field_value(self.field, data.to_str()?);
@ -283,57 +295,57 @@ impl IndexWriterWrapper {
}
pub fn add_multi_i8s_by_single_segment_writer(&mut self, datas: &[i8]) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
document.add_field_value(self.field, &(*data as i64));
}
self.single_segment_index_writer_add_document(document)
}
pub fn add_multi_i16s_by_single_segment_writer(&mut self, datas: &[i16]) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
document.add_field_value(self.field, &(*data as i64));
}
self.single_segment_index_writer_add_document(document)
}
pub fn add_multi_i32s_by_single_segment_writer(&mut self, datas: &[i32]) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data as i64);
document.add_field_value(self.field, &(*data as i64));
}
self.single_segment_index_writer_add_document(document)
}
pub fn add_multi_i64s_by_single_segment_writer(&mut self, datas: &[i64]) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data);
document.add_field_value(self.field, data);
}
self.single_segment_index_writer_add_document(document)
}
pub fn add_multi_f32s_by_single_segment_writer(&mut self, datas: &[f32]) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data as f64);
document.add_field_value(self.field, &(*data as f64));
}
self.single_segment_index_writer_add_document(document)
}
pub fn add_multi_f64s_by_single_segment_writer(&mut self, datas: &[f64]) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data);
document.add_field_value(self.field, data);
}
self.single_segment_index_writer_add_document(document)
}
pub fn add_multi_bools_by_single_segment_writer(&mut self, datas: &[bool]) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for data in datas {
document.add_field_value(self.field, *data);
document.add_field_value(self.field, data);
}
self.single_segment_index_writer_add_document(document)
}
@ -342,7 +354,7 @@ impl IndexWriterWrapper {
&mut self,
datas: &[*const c_char],
) -> Result<()> {
let mut document = Document::default();
let mut document = TantivyDocument::default();
for element in datas {
let data = unsafe { CStr::from_ptr(*element) };
document.add_field_value(self.field, data.to_str()?);

View File

@ -5,6 +5,7 @@
#include <iostream>
#include <map>
#include <vector>
#include <type_traits>
#include "common/EasyAssert.h"
#include "tantivy-binding.h"
@ -636,6 +637,11 @@ struct TantivyIndexWrapper {
RustArrayWrapper
lower_bound_range_query(T lower_bound, bool inclusive) {
auto array = [&]() {
if constexpr (std::is_same_v<T, bool>) {
return tantivy_lower_bound_range_query_bool(
reader_, static_cast<bool>(lower_bound), inclusive);
}
if constexpr (std::is_integral_v<T>) {
return tantivy_lower_bound_range_query_i64(
reader_, static_cast<int64_t>(lower_bound), inclusive);
@ -673,6 +679,11 @@ struct TantivyIndexWrapper {
RustArrayWrapper
upper_bound_range_query(T upper_bound, bool inclusive) {
auto array = [&]() {
if constexpr (std::is_same_v<T, bool>) {
return tantivy_upper_bound_range_query_bool(
reader_, static_cast<bool>(upper_bound), inclusive);
}
if constexpr (std::is_integral_v<T>) {
return tantivy_upper_bound_range_query_i64(
reader_, static_cast<int64_t>(upper_bound), inclusive);
@ -713,6 +724,14 @@ struct TantivyIndexWrapper {
bool lb_inclusive,
bool ub_inclusive) {
auto array = [&]() {
if constexpr (std::is_same_v<T, bool>) {
return tantivy_range_query_bool(reader_,
static_cast<bool>(lower_bound),
static_cast<bool>(upper_bound),
lb_inclusive,
ub_inclusive);
}
if constexpr (std::is_integral_v<T>) {
return tantivy_range_query_i64(
reader_,

View File

@ -13,16 +13,26 @@
#include <fstream>
#include <gtest/gtest.h>
#include <cstdint>
#include <limits>
#include <memory>
#include <regex>
#include <string>
#include <string_view>
#include <type_traits>
#include <vector>
#include <chrono>
#include <roaring/roaring.hh>
#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/LoadInfo.h"
#include "common/Types.h"
#include "index/Meta.h"
#include "index/JsonInvertedIndex.h"
#include "knowhere/comp/index_param.h"
#include "mmap/Types.h"
#include "pb/plan.pb.h"
#include "pb/schema.pb.h"
#include "query/Plan.h"
#include "query/PlanNode.h"
#include "query/PlanProto.h"
@ -30,6 +40,8 @@
#include "segcore/SegmentGrowingImpl.h"
#include "simdjson/padded_string.h"
#include "segcore/segment_c.h"
#include "storage/FileManager.h"
#include "storage/Types.h"
#include "test_utils/DataGen.h"
#include "test_utils/GenExprProto.h"
#include "index/IndexFactory.h"
@ -15963,3 +15975,180 @@ TEST_P(ExprTest, TestJsonContainsDiffTypeNullable) {
}
}
}
template <typename T>
class JsonIndexTestFixture : public testing::Test {
public:
using DataType = T;
JsonIndexTestFixture() {
if constexpr (std::is_same_v<T, bool>) {
schema_data_type = proto::schema::Bool;
json_path = "/bool";
lower_bound.set_bool_val(std::numeric_limits<bool>::min());
upper_bound.set_bool_val(std::numeric_limits<bool>::max());
cast_type = milvus::DataType::BOOL;
wrong_type_val.set_int64_val(123);
} else if constexpr (std::is_same_v<T, int64_t>) {
schema_data_type = proto::schema::Int64;
json_path = "/int";
lower_bound.set_int64_val(std::numeric_limits<int64_t>::min());
upper_bound.set_int64_val(std::numeric_limits<int64_t>::max());
cast_type = milvus::DataType::INT64;
wrong_type_val.set_string_val("123");
} else if constexpr (std::is_same_v<T, double>) {
schema_data_type = proto::schema::Double;
json_path = "/double";
lower_bound.set_float_val(std::numeric_limits<double>::min());
upper_bound.set_float_val(std::numeric_limits<double>::max());
cast_type = milvus::DataType::DOUBLE;
wrong_type_val.set_string_val("123");
} else if constexpr (std::is_same_v<T, std::string>) {
schema_data_type = proto::schema::String;
json_path = "/string";
lower_bound.set_string_val("");
std::string s(1024, '9');
upper_bound.set_string_val(s);
cast_type = milvus::DataType::STRING;
wrong_type_val.set_int64_val(123);
}
}
proto::schema::DataType schema_data_type;
std::string json_path;
proto::plan::GenericValue lower_bound;
proto::plan::GenericValue upper_bound;
milvus::DataType cast_type;
proto::plan::GenericValue wrong_type_val;
};
using JsonIndexTypes = ::testing::Types<bool, int64_t, double, std::string>;
TYPED_TEST_SUITE(JsonIndexTestFixture, JsonIndexTypes);
TYPED_TEST(JsonIndexTestFixture, TestJsonIndexUnaryExpr) {
auto schema = std::make_shared<Schema>();
auto vec_fid = schema->AddDebugField(
"fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
auto i32_fid = schema->AddDebugField("age32", DataType::INT32);
auto i64_fid = schema->AddDebugField("age64", DataType::INT64);
auto json_fid = schema->AddDebugField("json", DataType::JSON);
schema->set_primary_field_id(i64_fid);
auto seg = CreateSealedSegment(schema);
int N = 1000;
auto raw_data = DataGen(schema, N);
segcore::LoadIndexInfo load_index_info;
auto file_manager_ctx = storage::FileManagerContext();
file_manager_ctx.fieldDataMeta.field_schema.set_data_type(
milvus::proto::schema::JSON);
file_manager_ctx.fieldDataMeta.field_schema.set_fieldid(json_fid.get());
auto inv_index = index::IndexFactory::GetInstance().CreateJsonIndex(
index::INVERTED_INDEX_TYPE,
this->cast_type,
this->json_path,
file_manager_ctx);
using json_index_type =
index::JsonInvertedIndex<typename TestFixture::DataType>;
auto json_index = std::unique_ptr<json_index_type>(
static_cast<json_index_type*>(inv_index.release()));
auto json_col = raw_data.get_col<std::string>(json_fid);
auto json_field =
std::make_shared<FieldData<milvus::Json>>(DataType::JSON, false);
std::vector<milvus::Json> jsons;
for (auto& json : json_col) {
jsons.push_back(milvus::Json(simdjson::padded_string(json)));
}
json_field->add_json_data(jsons);
json_index->BuildWithFieldData({json_field});
json_index->finish();
json_index->create_reader();
load_index_info.field_id = json_fid.get();
load_index_info.field_type = DataType::JSON;
load_index_info.index = std::move(json_index);
load_index_info.index_params = {{JSON_PATH, this->json_path}};
seg->LoadIndex(load_index_info);
auto json_field_data_info = FieldDataInfo(json_fid.get(), N, {json_field});
seg->LoadFieldData(json_fid, json_field_data_info);
auto unary_expr = std::make_shared<expr::UnaryRangeFilterExpr>(
expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}),
proto::plan::OpType::LessEqual,
this->upper_bound);
auto plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, unary_expr);
auto final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP);
EXPECT_EQ(final.count(), N);
// test for wrong filter type
unary_expr = std::make_shared<expr::UnaryRangeFilterExpr>(
expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}),
proto::plan::OpType::LessEqual,
this->wrong_type_val);
plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, unary_expr);
final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP);
EXPECT_EQ(final.count(), 0);
unary_expr = std::make_shared<expr::UnaryRangeFilterExpr>(
expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}),
proto::plan::OpType::GreaterEqual,
this->lower_bound);
plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, unary_expr);
final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP);
EXPECT_EQ(final.count(), N);
auto term_expr = std::make_shared<expr::TermFilterExpr>(
expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}),
std::vector<proto::plan::GenericValue>(),
false);
plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, term_expr);
final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP);
EXPECT_EQ(final.count(), 0);
using DT = std::conditional_t<
std::is_same_v<typename TestFixture::DataType, std::string>,
std::string_view,
typename TestFixture::DataType>;
std::vector<proto::plan::GenericValue> vals;
int expect_count = 10;
if constexpr (std::is_same_v<DT, bool>) {
proto::plan::GenericValue val;
val.set_bool_val(true);
vals.push_back(val);
val.set_bool_val(false);
vals.push_back(val);
expect_count = N;
} else {
for (int i = 0; i < expect_count; ++i) {
proto::plan::GenericValue val;
auto v = jsons[i].at<DT>(this->json_path).value();
if constexpr (std::is_same_v<DT, int64_t>) {
val.set_int64_val(v);
} else if constexpr (std::is_same_v<DT, double>) {
val.set_float_val(v);
} else if constexpr (std::is_same_v<DT, std::string_view>) {
val.set_string_val(std::string(v));
} else if constexpr (std::is_same_v<DT, bool>) {
val.set_bool_val(i % 2 == 0);
}
vals.push_back(val);
}
}
term_expr = std::make_shared<expr::TermFilterExpr>(
expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}),
vals,
false);
plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, term_expr);
final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP);
EXPECT_EQ(final.count(), expect_count);
}

View File

@ -665,7 +665,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
CollectionID: 1,
}
task.updateAndSaveTaskMeta(setResultSegments([]int64{10, 11}))
_, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3)
_, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3, false)
s.NoError(err)
s.False(task.Process())
@ -678,7 +678,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
indexReq := &indexpb.CreateIndexRequest{
CollectionID: 1,
}
_, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3)
_, err := s.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 3, false)
s.NoError(err)
s.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{

View File

@ -45,7 +45,7 @@ func TestGetQueryVChanPositionsRetrieveM2N(t *testing.T) {
CollectionID: 1,
FieldID: 2,
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
require.NoError(t, err)
segArgs := []struct {
@ -158,7 +158,7 @@ func TestGetQueryVChanPositions(t *testing.T) {
FieldID: 2,
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
s1 := &datapb.SegmentInfo{
@ -336,7 +336,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
CollectionID: 0,
FieldID: 2,
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
c := &datapb.SegmentInfo{
ID: 1,
@ -405,7 +405,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
CollectionID: 0,
FieldID: 2,
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
a := &datapb.SegmentInfo{
ID: 99,
@ -490,7 +490,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
CollectionID: 0,
FieldID: 2,
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
c := &datapb.SegmentInfo{
ID: 1,
@ -599,7 +599,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
CollectionID: 0,
FieldID: 2,
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,
@ -979,7 +979,7 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
CollectionID: 0,
FieldID: 2,
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,
@ -1183,7 +1183,7 @@ func TestGetCurrentSegmentsView(t *testing.T) {
CollectionID: 0,
FieldID: 2,
}
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1)
_, err := svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 1, false)
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,

View File

@ -238,6 +238,23 @@ func (m *indexMeta) updateIndexTasksMetrics() {
log.Ctx(m.ctx).Info("update index metric", zap.Int("collectionNum", len(taskMetrics)))
}
func checkJsonParams(index *model.Index, req *indexpb.CreateIndexRequest) bool {
castType1, err := getIndexParam(index.IndexParams, common.JSONCastTypeKey)
if err != nil {
return false
}
castType2, err := getIndexParam(req.GetIndexParams(), common.JSONCastTypeKey)
if err != nil || castType1 != castType2 {
return false
}
jsonPath1, err := getIndexParam(index.IndexParams, common.JSONPathKey)
if err != nil {
return false
}
jsonPath2, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
return err == nil && jsonPath1 == jsonPath2
}
func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool {
metaTypeParams := DeleteParams(fieldIndex.TypeParams, []string{common.MmapEnabledKey})
reqTypeParams := DeleteParams(req.TypeParams, []string{common.MmapEnabledKey})
@ -326,14 +343,13 @@ func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool
return !notEq
}
// CanCreateIndex currently is used in Unittest
func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) {
func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest, isJson bool) (UniqueID, error) {
m.RLock()
defer m.RUnlock()
return m.canCreateIndex(req)
return m.canCreateIndex(req, isJson)
}
func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) {
func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest, isJson bool) (UniqueID, error) {
indexes, ok := m.indexes[req.CollectionID]
if !ok {
return 0, nil
@ -343,7 +359,7 @@ func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, e
continue
}
if req.IndexName == index.IndexName {
if req.FieldID == index.FieldID && checkParams(index, req) {
if req.FieldID == index.FieldID && checkParams(index, req) && (!isJson || checkJsonParams(index, req)) {
return index.IndexID, nil
}
errMsg := "at most one distinct index is allowed per field"
@ -355,6 +371,20 @@ func (m *indexMeta) canCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, e
return 0, fmt.Errorf("CreateIndex failed: %s", errMsg)
}
if req.FieldID == index.FieldID {
if isJson {
// if it is json index, check if json paths are same
jsonPath1, err := getIndexParam(index.IndexParams, common.JSONPathKey)
if err != nil {
return 0, err
}
jsonPath2, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
if err != nil {
return 0, err
}
if jsonPath1 != jsonPath2 {
continue
}
}
// creating multiple indexes on same field is not supported
errMsg := "CreateIndex failed: creating multiple indexes on same field is not supported"
log.Warn(errMsg)
@ -388,11 +418,11 @@ func (m *indexMeta) HasSameReq(req *indexpb.CreateIndexRequest) (bool, UniqueID)
return false, 0
}
func (m *indexMeta) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest, allocatedIndexID UniqueID) (UniqueID, error) {
func (m *indexMeta) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest, allocatedIndexID UniqueID, isJson bool) (UniqueID, error) {
m.Lock()
defer m.Unlock()
indexID, err := m.canCreateIndex(req)
indexID, err := m.canCreateIndex(req, isJson)
if err != nil {
return indexID, err
}

View File

@ -132,7 +132,7 @@ func TestMeta_ScalarAutoIndex(t *testing.T) {
UserIndexParams: userIndexParams,
},
}
tmpIndexID, err := m.CanCreateIndex(req)
tmpIndexID, err := m.CanCreateIndex(req, false)
assert.NoError(t, err)
assert.Equal(t, int64(indexID), tmpIndexID)
})
@ -154,12 +154,12 @@ func TestMeta_ScalarAutoIndex(t *testing.T) {
},
}
req.UserIndexParams = append(req.UserIndexParams, &commonpb.KeyValuePair{Key: "bitmap_cardinality_limit", Value: "1000"})
tmpIndexID, err := m.CanCreateIndex(req)
tmpIndexID, err := m.CanCreateIndex(req, false)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
req.UserIndexParams = append(req.UserIndexParams, &commonpb.KeyValuePair{Key: "bitmap_cardinality_limit", Value: "500"})
tmpIndexID, err = m.CanCreateIndex(req)
tmpIndexID, err = m.CanCreateIndex(req, false)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
})
@ -201,7 +201,7 @@ func TestMeta_ScalarAutoIndex(t *testing.T) {
UserIndexParams: userIndexParams,
},
}
tmpIndexID, err := m.CanCreateIndex(req)
tmpIndexID, err := m.CanCreateIndex(req, false)
assert.NoError(t, err)
assert.Equal(t, int64(indexID), tmpIndexID)
newIndexParams := req.GetIndexParams()
@ -266,44 +266,44 @@ func TestMeta_CanCreateIndex(t *testing.T) {
}
t.Run("can create index", func(t *testing.T) {
tmpIndexID, err := m.CanCreateIndex(req)
tmpIndexID, err := m.CanCreateIndex(req, false)
assert.NoError(t, err)
assert.Equal(t, int64(0), tmpIndexID)
_, err = m.CreateIndex(context.TODO(), req, indexID)
_, err = m.CreateIndex(context.TODO(), req, indexID, false)
assert.NoError(t, err)
tmpIndexID, err = m.CanCreateIndex(req)
tmpIndexID, err = m.CanCreateIndex(req, false)
assert.NoError(t, err)
assert.Equal(t, indexID, tmpIndexID)
})
t.Run("params not consistent", func(t *testing.T) {
req.TypeParams = append(req.TypeParams, &commonpb.KeyValuePair{Key: "primary_key", Value: "false"})
tmpIndexID, err := m.CanCreateIndex(req)
tmpIndexID, err := m.CanCreateIndex(req, false)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
req.TypeParams = []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "64"}}
tmpIndexID, err = m.CanCreateIndex(req)
tmpIndexID, err = m.CanCreateIndex(req, false)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
req.TypeParams = typeParams
req.UserIndexParams = append(indexParams, &commonpb.KeyValuePair{Key: "metrics_type", Value: "L2"})
tmpIndexID, err = m.CanCreateIndex(req)
tmpIndexID, err = m.CanCreateIndex(req, false)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
req.IndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "HNSW"}}
req.UserIndexParams = req.IndexParams
tmpIndexID, err = m.CanCreateIndex(req)
tmpIndexID, err = m.CanCreateIndex(req, false)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
req.IndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "FLAT"}, {Key: common.MetricTypeKey, Value: "COSINE"}}
req.UserIndexParams = req.IndexParams
tmpIndexID, err = m.CanCreateIndex(req)
tmpIndexID, err = m.CanCreateIndex(req, false)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
@ -312,7 +312,7 @@ func TestMeta_CanCreateIndex(t *testing.T) {
req.IndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "FLAT"}, {Key: common.MetricTypeKey, Value: "COSINE"}}
req.UserIndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "AUTOINDEX"}, {Key: common.MetricTypeKey, Value: "COSINE"}}
req.UserAutoindexMetricTypeSpecified = false
tmpIndexID, err = m.CanCreateIndex(req)
tmpIndexID, err = m.CanCreateIndex(req, false)
assert.NoError(t, err)
assert.Equal(t, indexID, tmpIndexID)
// req should follow the meta
@ -323,14 +323,14 @@ func TestMeta_CanCreateIndex(t *testing.T) {
req.IndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "FLAT"}, {Key: common.MetricTypeKey, Value: "COSINE"}}
req.UserIndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "AUTOINDEX"}, {Key: common.MetricTypeKey, Value: "COSINE"}}
req.UserAutoindexMetricTypeSpecified = true
tmpIndexID, err = m.CanCreateIndex(req)
tmpIndexID, err = m.CanCreateIndex(req, false)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
req.IndexParams = indexParams
req.UserIndexParams = indexParams
req.FieldID++
tmpIndexID, err = m.CanCreateIndex(req)
tmpIndexID, err = m.CanCreateIndex(req, false)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
})
@ -338,14 +338,14 @@ func TestMeta_CanCreateIndex(t *testing.T) {
t.Run("multiple indexes", func(t *testing.T) {
req.IndexName = "_default_idx_2"
req.FieldID = fieldID
tmpIndexID, err := m.CanCreateIndex(req)
tmpIndexID, err := m.CanCreateIndex(req, false)
assert.Error(t, err)
assert.Equal(t, int64(0), tmpIndexID)
})
t.Run("index has been deleted", func(t *testing.T) {
m.indexes[collID][indexID].IsDeleted = true
tmpIndexID, err := m.CanCreateIndex(req)
tmpIndexID, err := m.CanCreateIndex(req, false)
assert.NoError(t, err)
assert.Equal(t, int64(0), tmpIndexID)
})
@ -472,7 +472,7 @@ func TestMeta_CreateIndex(t *testing.T) {
).Return(nil)
m := newSegmentIndexMeta(sc)
_, err := m.CreateIndex(context.TODO(), req, 3)
_, err := m.CreateIndex(context.TODO(), req, 3, false)
assert.NoError(t, err)
})
@ -484,7 +484,7 @@ func TestMeta_CreateIndex(t *testing.T) {
).Return(errors.New("fail"))
m := newSegmentIndexMeta(ec)
_, err := m.CreateIndex(context.TODO(), req, 4)
_, err := m.CreateIndex(context.TODO(), req, 4, false)
assert.Error(t, err)
})
}

View File

@ -19,20 +19,25 @@ package datacoord
import (
"context"
"fmt"
"strings"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/parser/planparserv2"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
"github.com/milvus-io/milvus/pkg/v2/common"
pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
@ -165,13 +170,8 @@ func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
}
}
func (s *Server) getFieldNameByID(ctx context.Context, collID, fieldID int64) (string, error) {
resp, err := s.broker.DescribeCollectionInternal(ctx, collID)
if err != nil {
return "", err
}
for _, field := range resp.GetSchema().GetFields() {
func (s *Server) getFieldNameByID(schema *schemapb.CollectionSchema, fieldID int64) (string, error) {
for _, field := range schema.GetFields() {
if field.FieldID == fieldID {
return field.Name, nil
}
@ -179,6 +179,62 @@ func (s *Server) getFieldNameByID(ctx context.Context, collID, fieldID int64) (s
return "", nil
}
func (s *Server) getSchema(ctx context.Context, collID int64) (*schemapb.CollectionSchema, error) {
resp, err := s.broker.DescribeCollectionInternal(ctx, collID)
if err != nil {
return nil, err
}
return resp.GetSchema(), nil
}
func isJsonField(schema *schemapb.CollectionSchema, fieldID int64) (bool, error) {
for _, f := range schema.Fields {
if f.FieldID == fieldID {
return typeutil.IsJSONType(f.DataType), nil
}
}
return false, merr.WrapErrFieldNotFound(fieldID)
}
func getIndexParam(indexParams []*commonpb.KeyValuePair, key string) (string, error) {
for _, p := range indexParams {
if p.Key == key {
return p.Value, nil
}
}
return "", merr.WrapErrParameterInvalidMsg("%s not found", key)
}
func setIndexParam(indexParams []*commonpb.KeyValuePair, key, value string) {
for _, p := range indexParams {
if p.Key == key {
p.Value = value
}
}
}
func (s *Server) parseAndVerifyNestedPath(identifier string, schema *schemapb.CollectionSchema, fieldID int64) (string, error) {
helper, err := typeutil.CreateSchemaHelper(schema)
if err != nil {
return "", err
}
var identifierExpr *planpb.Expr
err = planparserv2.ParseIdentifier(helper, identifier, func(expr *planpb.Expr) error {
identifierExpr = expr
return nil
})
if err != nil {
return "", err
}
if identifierExpr.GetColumnExpr().GetInfo().GetFieldId() != fieldID {
return "", fmt.Errorf("fieldID not match with field name")
}
nestedPath := identifierExpr.GetColumnExpr().GetInfo().GetNestedPath()
return "/" + strings.Join(nestedPath, "/"), nil
}
// CreateIndex create an index on collection.
// Index building is asynchronous, so when an index building request comes, an IndexID is assigned to the task and
// will get all flushed segments from DataCoord and record tasks with these segments. The background process
@ -200,15 +256,53 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
}
metrics.IndexRequestCounter.WithLabelValues(metrics.TotalLabel).Inc()
schema, err := s.getSchema(ctx, req.GetCollectionID())
if err != nil {
return merr.Status(err), nil
}
isJson, err := isJsonField(schema, req.GetFieldID())
if err != nil {
return merr.Status(err), nil
}
if isJson {
jsonPath, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
if err != nil {
log.Error("get json path from index params failed", zap.Error(err))
return merr.Status(err), nil
}
nestedPath, err := s.parseAndVerifyNestedPath(jsonPath, schema, req.GetFieldID())
if err != nil {
log.Error("parse nested path failed", zap.Error(err))
return merr.Status(err), nil
}
setIndexParam(req.GetIndexParams(), common.JSONPathKey, nestedPath)
}
if req.GetIndexName() == "" {
indexes := s.meta.indexMeta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName())
if len(indexes) == 0 {
fieldName, err := s.getFieldNameByID(ctx, req.GetCollectionID(), req.GetFieldID())
fieldName, err := s.getFieldNameByID(schema, req.GetFieldID())
if err != nil {
log.Warn("get field name from schema failed", zap.Int64("fieldID", req.GetFieldID()))
return merr.Status(err), nil
}
req.IndexName = fieldName
defaultIndexName := fieldName
if isJson {
jsonPath, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey)
if err != nil {
return merr.Status(err), nil
}
indexes = lo.Filter(indexes, func(index *model.Index, i int) bool {
path, err := getIndexParam(index.IndexParams, common.JSONPathKey)
return err == nil && path == jsonPath
})
defaultIndexName += jsonPath
}
if len(indexes) == 0 {
req.IndexName = defaultIndexName
} else if len(indexes) == 1 {
req.IndexName = indexes[0].IndexName
}
@ -229,7 +323,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
}
// Get flushed segments and create index
indexID, err := s.meta.indexMeta.CreateIndex(ctx, req, allocatedIndexID)
indexID, err := s.meta.indexMeta.CreateIndex(ctx, req, allocatedIndexID, isJson)
if err != nil {
log.Warn("CreateIndex fail",
zap.Int64("fieldID", req.GetFieldID()), zap.String("indexName", req.GetIndexName()), zap.Error(err))

View File

@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
"strconv"
"testing"
"time"
@ -2584,3 +2585,178 @@ func TestValidateIndexParams(t *testing.T) {
assert.Error(t, err)
})
}
func TestJsonIndex(t *testing.T) {
catalog := catalogmocks.NewDataCoordCatalog(t)
catalog.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(nil).Maybe()
mock0Allocator := newMockAllocator(t)
indexMeta := newSegmentIndexMeta(catalog)
b := mocks.NewMockRootCoordClient(t)
b.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Code: 0,
},
Schema: &schemapb.CollectionSchema{
Name: "test_index",
Fields: []*schemapb.FieldSchema{
{
FieldID: 0,
Name: "json",
DataType: schemapb.DataType_JSON,
},
{
FieldID: 1,
Name: "json2",
DataType: schemapb.DataType_JSON,
},
{
FieldID: 2,
Name: "dynamic",
DataType: schemapb.DataType_JSON,
IsDynamic: true,
},
},
},
}, nil)
s := &Server{
meta: &meta{
catalog: catalog,
collections: map[UniqueID]*collectionInfo{
collID: {
ID: collID,
},
},
indexMeta: indexMeta,
},
allocator: mock0Allocator,
notifyIndexChan: make(chan UniqueID, 1),
broker: broker.NewCoordinatorBroker(b),
}
s.stateCode.Store(commonpb.StateCode_Healthy)
req := &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "a",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_String))}, {Key: common.JSONPathKey, Value: "json[\"a\"]"}},
}
resp, err := s.CreateIndex(context.Background(), req)
assert.NoError(t, merr.CheckRPCCall(resp, err))
req = &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_String))}, {Key: common.JSONPathKey, Value: "json[\"c\"]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.NoError(t, merr.CheckRPCCall(resp, err))
// different json field with same json path
req = &indexpb.CreateIndexRequest{
FieldID: 1,
IndexName: "",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_String))}, {Key: common.JSONPathKey, Value: "json2[\"c\"]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.NoError(t, merr.CheckRPCCall(resp, err))
// duplicated index with same params
req = &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "a",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_String))}, {Key: common.JSONPathKey, Value: "json[\"a\"]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.NoError(t, merr.CheckRPCCall(resp, err))
// duplicated index with different cast type
req = &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "a",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "json[\"a\"]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.Error(t, merr.CheckRPCCall(resp, err))
// duplicated index with different index name
req = &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "b",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "json[\"a\"]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.Error(t, merr.CheckRPCCall(resp, err))
// another field json index with same index name
req = &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "a",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "json[\"b\"]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.Error(t, merr.CheckRPCCall(resp, err))
// lack of json params
req = &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "a",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONPathKey, Value: "json[\"a\"]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.Error(t, merr.CheckRPCCall(resp, err))
// incorrect field name in json path
req = &indexpb.CreateIndexRequest{
FieldID: 1,
IndexName: "c",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "bad_json[\"a\"]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.Error(t, merr.CheckRPCCall(resp, err))
// dynamic field
req = &indexpb.CreateIndexRequest{
FieldID: 2,
IndexName: "",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "dynamic_a_field"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.NoError(t, merr.CheckRPCCall(resp, err))
// wrong path: missing quotes
req = &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "d",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "json[a][\"b\"]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.Error(t, merr.CheckRPCCall(resp, err))
// wrong path: missing closing quote
req = &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "e",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "json[\"a\"][\"b"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.Error(t, merr.CheckRPCCall(resp, err))
// wrong path: malformed brackets
req = &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "f",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "json[\"a\"[\"b]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.Error(t, merr.CheckRPCCall(resp, err))
// valid path with array index
req = &indexpb.CreateIndexRequest{
FieldID: 0,
IndexName: "g",
IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "json[\"a\"][0][\"b\"]"}},
}
resp, err = s.CreateIndex(context.Background(), req)
assert.NoError(t, merr.CheckRPCCall(resp, err))
}

View File

@ -1235,7 +1235,7 @@ func TestGetRecoveryInfo(t *testing.T) {
CollectionID: 0,
FieldID: 2,
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
@ -1456,7 +1456,7 @@ func TestGetRecoveryInfo(t *testing.T) {
CollectionID: 0,
FieldID: 2,
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
err = svr.meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
SegmentID: segment.ID,
@ -1620,7 +1620,7 @@ func TestGetRecoveryInfo(t *testing.T) {
FieldID: 2,
IndexName: "_default_idx_2",
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
SegmentID: seg4.ID,

View File

@ -857,7 +857,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
CollectionID: 0,
FieldID: 2,
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
@ -1080,7 +1080,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
CollectionID: 0,
FieldID: 2,
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
err = svr.meta.indexMeta.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
SegmentID: segment.ID,
@ -1245,7 +1245,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
FieldID: 2,
IndexName: "_default_idx_2",
}
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0)
_, err = svr.meta.indexMeta.CreateIndex(context.TODO(), indexReq, 0, false)
assert.NoError(t, err)
svr.meta.indexMeta.updateSegmentIndex(&model.SegmentIndex{
SegmentID: seg4.ID,

View File

@ -817,6 +817,7 @@ func GenAndSaveIndexV2(collectionID, partitionID, segmentID, buildID int64,
IndexParams: indexInfo.GetIndexParams(),
IndexFilePaths: indexPaths,
CurrentIndexVersion: indexVersion.CurrentIndexVersion,
IndexID: indexInfo.GetIndexID(),
}, nil
}

View File

@ -420,7 +420,7 @@ func (cit *createIndexTask) getIndexedFieldAndFunction(ctx context.Context) erro
return fmt.Errorf("failed to get collection schema: %s", err)
}
field, err := schema.schemaHelper.GetFieldFromName(cit.req.GetFieldName())
field, err := schema.schemaHelper.GetFieldFromNameDefaultJSON(cit.req.GetFieldName())
if err != nil {
log.Ctx(ctx).Error("create index on non-exist field", zap.Error(err))
return fmt.Errorf("cannot create index on non-exist field: %s", cit.req.GetFieldName())

View File

@ -164,7 +164,7 @@ func (c *IndexChecker) checkSegment(segment *meta.Segment, indexInfos []*indexpb
var result []int64
for _, indexInfo := range indexInfos {
fieldID, indexID := indexInfo.FieldID, indexInfo.IndexID
info, ok := segment.IndexInfo[fieldID]
info, ok := segment.IndexInfo[indexID]
if !ok {
result = append(result, fieldID)
continue

View File

@ -124,8 +124,8 @@ type Segment struct {
Node int64 // Node the segment is in
Version int64 // Version is the timestamp of loading segment
LastDeltaTimestamp uint64 // The timestamp of the last delta record
IndexInfo map[int64]*querypb.FieldIndexInfo // index info of loaded segment
JSONIndexField []int64 // json index info of loaded segment
IndexInfo map[int64]*querypb.FieldIndexInfo // index info of loaded segment, indexID -> FieldIndexInfo
}
func SegmentFromInfo(info *datapb.SegmentInfo) *Segment {

View File

@ -411,19 +411,19 @@ func (_c *MockSegment_GetFieldJSONIndexStats_Call) RunAndReturn(run func() []int
}
// GetIndex provides a mock function with given fields: fieldID
func (_m *MockSegment) GetIndex(fieldID int64) *IndexedFieldInfo {
func (_m *MockSegment) GetIndex(fieldID int64) []*IndexedFieldInfo {
ret := _m.Called(fieldID)
if len(ret) == 0 {
panic("no return value specified for GetIndex")
}
var r0 *IndexedFieldInfo
if rf, ok := ret.Get(0).(func(int64) *IndexedFieldInfo); ok {
var r0 []*IndexedFieldInfo
if rf, ok := ret.Get(0).(func(int64) []*IndexedFieldInfo); ok {
r0 = rf(fieldID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*IndexedFieldInfo)
r0 = ret.Get(0).([]*IndexedFieldInfo)
}
}
@ -448,12 +448,60 @@ func (_c *MockSegment_GetIndex_Call) Run(run func(fieldID int64)) *MockSegment_G
return _c
}
func (_c *MockSegment_GetIndex_Call) Return(_a0 *IndexedFieldInfo) *MockSegment_GetIndex_Call {
func (_c *MockSegment_GetIndex_Call) Return(_a0 []*IndexedFieldInfo) *MockSegment_GetIndex_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_GetIndex_Call) RunAndReturn(run func(int64) *IndexedFieldInfo) *MockSegment_GetIndex_Call {
func (_c *MockSegment_GetIndex_Call) RunAndReturn(run func(int64) []*IndexedFieldInfo) *MockSegment_GetIndex_Call {
_c.Call.Return(run)
return _c
}
// GetIndexByID provides a mock function with given fields: indexID
func (_m *MockSegment) GetIndexByID(indexID int64) *IndexedFieldInfo {
ret := _m.Called(indexID)
if len(ret) == 0 {
panic("no return value specified for GetIndexByID")
}
var r0 *IndexedFieldInfo
if rf, ok := ret.Get(0).(func(int64) *IndexedFieldInfo); ok {
r0 = rf(indexID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*IndexedFieldInfo)
}
}
return r0
}
// MockSegment_GetIndexByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIndexByID'
type MockSegment_GetIndexByID_Call struct {
*mock.Call
}
// GetIndexByID is a helper method to define mock.On call
// - indexID int64
func (_e *MockSegment_Expecter) GetIndexByID(indexID interface{}) *MockSegment_GetIndexByID_Call {
return &MockSegment_GetIndexByID_Call{Call: _e.mock.On("GetIndexByID", indexID)}
}
func (_c *MockSegment_GetIndexByID_Call) Run(run func(indexID int64)) *MockSegment_GetIndexByID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockSegment_GetIndexByID_Call) Return(_a0 *IndexedFieldInfo) *MockSegment_GetIndexByID_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_GetIndexByID_Call) RunAndReturn(run func(int64) *IndexedFieldInfo) *MockSegment_GetIndexByID_Call {
_c.Call.Return(run)
return _c
}

View File

@ -295,7 +295,7 @@ type LocalSegment struct {
lastDeltaTimestamp *atomic.Uint64
fields *typeutil.ConcurrentMap[int64, *FieldInfo]
fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo]
fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] // indexID -> IndexedFieldInfo
warmupDispatcher *AsyncWarmupDispatcher
fieldJSONStats []int64
}
@ -383,13 +383,14 @@ func (s *LocalSegment) initializeSegment() error {
indexedFieldInfos, fieldBinlogs := separateIndexAndBinlog(loadInfo)
schemaHelper, _ := typeutil.CreateSchemaHelper(s.collection.Schema())
for fieldID, info := range indexedFieldInfos {
for _, info := range indexedFieldInfos {
fieldID := info.IndexInfo.FieldID
field, err := schemaHelper.GetFieldFromID(fieldID)
if err != nil {
return err
}
indexInfo := info.IndexInfo
s.fieldIndexes.Insert(indexInfo.GetFieldID(), &IndexedFieldInfo{
s.fieldIndexes.Insert(indexInfo.GetIndexID(), &IndexedFieldInfo{
FieldBinlog: &datapb.FieldBinlog{
FieldID: indexInfo.GetFieldID(),
},
@ -473,17 +474,32 @@ func (s *LocalSegment) LastDeltaTimestamp() uint64 {
return s.lastDeltaTimestamp.Load()
}
func (s *LocalSegment) GetIndex(fieldID int64) *IndexedFieldInfo {
info, _ := s.fieldIndexes.Get(fieldID)
func (s *LocalSegment) GetIndexByID(indexID int64) *IndexedFieldInfo {
info, _ := s.fieldIndexes.Get(indexID)
return info
}
func (s *LocalSegment) GetIndex(fieldID int64) []*IndexedFieldInfo {
var info []*IndexedFieldInfo
s.fieldIndexes.Range(func(key int64, value *IndexedFieldInfo) bool {
if value.IndexInfo.FieldID == fieldID {
info = append(info, value)
}
return true
})
return info
}
func (s *LocalSegment) ExistIndex(fieldID int64) bool {
fieldInfo, ok := s.fieldIndexes.Get(fieldID)
if !ok {
return false
contain := false
s.fieldIndexes.Range(func(key int64, value *IndexedFieldInfo) bool {
if value.IndexInfo.FieldID == fieldID {
contain = true
}
return fieldInfo.IndexInfo != nil
return !contain
})
return contain
}
func (s *LocalSegment) HasRawData(fieldID int64) bool {
@ -1002,9 +1018,9 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
zap.Int64("indexID", indexInfo.GetIndexID()),
)
old := s.GetIndex(indexInfo.GetFieldID())
old := s.GetIndexByID(indexInfo.GetIndexID())
// the index loaded
if old != nil && old.IndexInfo.GetIndexID() == indexInfo.GetIndexID() && old.IsLoaded {
if old != nil && old.IsLoaded {
log.Warn("index already loaded")
return nil
}
@ -1208,7 +1224,7 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F
return err
}
s.fieldIndexes.Insert(indexInfo.GetFieldID(), &IndexedFieldInfo{
s.fieldIndexes.Insert(indexInfo.GetIndexID(), &IndexedFieldInfo{
FieldBinlog: &datapb.FieldBinlog{
FieldID: indexInfo.GetFieldID(),
},

View File

@ -71,7 +71,8 @@ type Segment interface {
ResourceUsageEstimate() ResourceUsage
// Index related
GetIndex(fieldID int64) *IndexedFieldInfo
GetIndexByID(indexID int64) *IndexedFieldInfo
GetIndex(fieldID int64) []*IndexedFieldInfo
ExistIndex(fieldID int64) bool
Indexes() []*IndexedFieldInfo
HasRawData(fieldID int64) bool

View File

@ -105,7 +105,11 @@ func (s *L0Segment) LastDeltaTimestamp() uint64 {
return last
}
func (s *L0Segment) GetIndex(fieldID int64) *IndexedFieldInfo {
func (s *L0Segment) GetIndex(fieldID int64) []*IndexedFieldInfo {
return nil
}
func (s *L0Segment) GetIndexByID(indexID int64) *IndexedFieldInfo {
return nil
}

View File

@ -661,11 +661,11 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI
}
func separateIndexAndBinlog(loadInfo *querypb.SegmentLoadInfo) (map[int64]*IndexedFieldInfo, []*datapb.FieldBinlog) {
fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
fieldID2IndexInfo := make(map[int64][]*querypb.FieldIndexInfo)
for _, indexInfo := range loadInfo.IndexInfos {
if len(indexInfo.GetIndexFilePaths()) > 0 {
fieldID := indexInfo.FieldID
fieldID2IndexInfo[fieldID] = indexInfo
fieldID2IndexInfo[fieldID] = append(fieldID2IndexInfo[fieldID], indexInfo)
}
}
@ -676,11 +676,13 @@ func separateIndexAndBinlog(loadInfo *querypb.SegmentLoadInfo) (map[int64]*Index
fieldID := fieldBinlog.FieldID
// check num rows of data meta and index meta are consistent
if indexInfo, ok := fieldID2IndexInfo[fieldID]; ok {
for _, index := range indexInfo {
fieldInfo := &IndexedFieldInfo{
FieldBinlog: fieldBinlog,
IndexInfo: indexInfo,
IndexInfo: index,
}
indexedFieldInfos[index.IndexID] = fieldInfo
}
indexedFieldInfos[fieldID] = fieldInfo
} else {
fieldBinlogs = append(fieldBinlogs, fieldBinlog)
}
@ -696,11 +698,11 @@ func separateLoadInfoV2(loadInfo *querypb.SegmentLoadInfo, schema *schemapb.Coll
map[int64]struct{}, // unindexed text fields
map[int64]*datapb.JsonKeyStats, // json key stats info
) {
fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
fieldID2IndexInfo := make(map[int64][]*querypb.FieldIndexInfo)
for _, indexInfo := range loadInfo.IndexInfos {
if len(indexInfo.GetIndexFilePaths()) > 0 {
fieldID := indexInfo.FieldID
fieldID2IndexInfo[fieldID] = indexInfo
fieldID2IndexInfo[fieldID] = append(fieldID2IndexInfo[fieldID], indexInfo)
}
}
@ -710,12 +712,14 @@ func separateLoadInfoV2(loadInfo *querypb.SegmentLoadInfo, schema *schemapb.Coll
for _, fieldBinlog := range loadInfo.BinlogPaths {
fieldID := fieldBinlog.FieldID
// check num rows of data meta and index meta are consistent
if indexInfo, ok := fieldID2IndexInfo[fieldID]; ok {
if infos, ok := fieldID2IndexInfo[fieldID]; ok {
for _, indexInfo := range infos {
fieldInfo := &IndexedFieldInfo{
FieldBinlog: fieldBinlog,
IndexInfo: indexInfo,
}
indexedFieldInfos[fieldID] = fieldInfo
indexedFieldInfos[indexInfo.IndexID] = fieldInfo
}
} else {
fieldBinlogs = append(fieldBinlogs, fieldBinlog)
}
@ -782,7 +786,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu
log := log.Ctx(ctx).With(zap.Int64("segmentID", segment.ID()))
tr := timerecord.NewTimeRecorder("segmentLoader.loadSealedSegment")
log.Info("Start loading fields...",
zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
// zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
zap.Int64s("indexed text fields", lo.Keys(textIndexes)),
zap.Int64s("unindexed text fields", lo.Keys(unindexedTextFields)),
zap.Int64s("indexed json key fields", lo.Keys(jsonKeyStats)),
@ -794,7 +798,8 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu
metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(loadFieldsIndexSpan.Milliseconds()))
// 2. complement raw data for the scalar fields without raw data
for fieldID, info := range indexedFieldInfos {
for _, info := range indexedFieldInfos {
fieldID := info.IndexInfo.FieldID
field, err := schemaHelper.GetFieldFromID(fieldID)
if err != nil {
return err
@ -1044,7 +1049,8 @@ func (loader *segmentLoader) loadFieldsIndex(ctx context.Context,
zap.Int64("rowCount", numRows),
)
for fieldID, fieldInfo := range indexedFieldInfos {
for _, fieldInfo := range indexedFieldInfos {
fieldID := fieldInfo.IndexInfo.FieldID
indexInfo := fieldInfo.IndexInfo
tr := timerecord.NewTimeRecorder("loadFieldIndex")
err := loader.loadFieldIndex(ctx, segment, indexInfo)
@ -1305,7 +1311,7 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error {
var needReset bool
segment.fieldIndexes.Range(func(fieldID int64, info *IndexedFieldInfo) bool {
segment.fieldIndexes.Range(func(indexID int64, info *IndexedFieldInfo) bool {
for _, info := range info.FieldBinlog.GetBinlogs() {
if info.GetEntriesNum() == 0 {
needReset = true
@ -1355,7 +1361,7 @@ func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *Loca
}
var err error
segment.fieldIndexes.Range(func(fieldID int64, info *IndexedFieldInfo) bool {
segment.fieldIndexes.Range(func(indexID int64, info *IndexedFieldInfo) bool {
if len(info.FieldBinlog.GetBinlogs()) != len(counts) {
err = errors.New("rowID & index binlog number not matched")
return false

View File

@ -678,7 +678,7 @@ func (suite *SegmentLoaderSuite) TestPatchEntryNum() {
info := segment.GetIndex(vecFields[0])
suite.Require().NotNil(info)
for _, binlog := range info.FieldBinlog.GetBinlogs() {
for _, binlog := range info[0].FieldBinlog.GetBinlogs() {
suite.Greater(binlog.EntriesNum, int64(0))
}
}

View File

@ -628,8 +628,15 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *querypb.GetSegmen
indexInfos []*querypb.FieldIndexInfo
)
for _, field := range vecFields {
index := segment.GetIndex(field)
if index != nil {
indexes := segment.GetIndex(field)
if indexes != nil {
if len(indexes) != 1 {
log.Error("only support one index for vector field", zap.Int64("fieldID", field), zap.Int("index count", len(indexes)))
return &querypb.GetSegmentInfoResponse{
Status: merr.Status(merr.WrapErrServiceInternal("only support one index for vector field")),
}, nil
}
index := indexes[0]
indexName = index.IndexInfo.GetIndexName()
indexID = index.IndexInfo.GetIndexID()
indexInfos = append(indexInfos, index.IndexInfo)
@ -1178,7 +1185,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
IsSorted: s.IsSorted(),
LastDeltaTimestamp: s.LastDeltaTimestamp(),
IndexInfo: lo.SliceToMap(s.Indexes(), func(info *segments.IndexedFieldInfo) (int64, *querypb.FieldIndexInfo) {
return info.IndexInfo.FieldID, info.IndexInfo
return info.IndexInfo.IndexID, info.IndexInfo
}),
FieldJsonIndexStats: s.GetFieldJSONIndexStats(),
})

View File

@ -4,6 +4,8 @@ import (
"fmt"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@ -13,13 +15,23 @@ type INVERTEDChecker struct {
}
func (c *INVERTEDChecker) CheckTrain(dataType schemapb.DataType, params map[string]string) error {
// check json index params
isJSONIndex := typeutil.IsJSONType(dataType)
if isJSONIndex {
if _, exist := params[common.JSONCastTypeKey]; !exist {
return merr.WrapErrParameterMissing(common.JSONCastTypeKey, "json index must specify cast type")
}
if _, exist := params[common.JSONPathKey]; !exist {
return merr.WrapErrParameterMissing(common.JSONPathKey, "json index must specify json path")
}
}
return c.scalarIndexChecker.CheckTrain(dataType, params)
}
func (c *INVERTEDChecker) CheckValidDataType(indexType IndexType, field *schemapb.FieldSchema) error {
dType := field.GetDataType()
if !typeutil.IsBoolType(dType) && !typeutil.IsArithmetic(dType) && !typeutil.IsStringType(dType) &&
!typeutil.IsArrayType(dType) {
!typeutil.IsArrayType(dType) && !typeutil.IsJSONType(dType) {
return fmt.Errorf("INVERTED are not supported on %s field", dType.String())
}
return nil

View File

@ -19,7 +19,7 @@ func Test_INVERTEDIndexChecker(t *testing.T) {
assert.NoError(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_Int64}))
assert.NoError(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_Float}))
assert.NoError(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_Array}))
assert.NoError(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_JSON}))
assert.Error(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_JSON}))
assert.Error(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_FloatVector}))
}

View File

@ -145,6 +145,9 @@ const (
IgnoreGrowing = "ignore_growing"
ConsistencyLevel = "consistency_level"
HintsKey = "hints"
JSONCastTypeKey = "json_cast_type"
JSONPathKey = "json_path"
)
// Doc-in-doc-out

View File

@ -609,7 +609,6 @@ func TestCreateIndexJsonField(t *testing.T) {
errMsg string
}
inxError := []scalarIndexError{
{index.NewInvertedIndex(), "INVERTED are not supported on JSON field"},
{index.NewSortedIndex(), "STL_SORT are only supported on numeric field"},
{index.NewTrieIndex(), "TRIE are only supported on varchar field"},
}
@ -911,7 +910,7 @@ func TestIndexNotExistName(t *testing.T) {
cp := hp.NewCreateCollectionParams(hp.Int64Vec)
_, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, cp, hp.TNewFieldsOption(), hp.TNewSchemaOption().TWithEnableDynamicField(true))
_, err1 := mc.CreateIndex(ctx, client.NewCreateIndexOption(schema.CollectionName, "aaa", idx))
common.CheckErr(t, err1, false, "cannot create index on non-exist field: aaa")
common.CheckErr(t, err1, false, "index HNSW only supports vector data type")
// describe index with not exist field name
_, errDesc := mc.DescribeIndex(ctx, client.NewDescribeIndexOption(schema.CollectionName, "aaa"))

View File

@ -21,6 +21,7 @@ from common.common_params import (
from utils.util_pymilvus import *
from common.constants import *
from pymilvus.exceptions import MilvusException
from pymilvus import DataType
prefix = "index"
default_schema = cf.gen_default_collection_schema()
@ -1230,14 +1231,11 @@ class TestIndexInvalid(TestcaseBase):
"""
target: test create scalar index on json field
method: 1.create collection, and create index
expected: Raise exception
expected: success
"""
collection_w = self.init_collection_general(prefix, is_index=False, vector_data_type=vector_data_type)[0]
scalar_index_params = {"index_type": "INVERTED"}
collection_w.create_index(ct.default_json_field_name, index_params=scalar_index_params,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1100,
ct.err_msg: "INVERTED are not supported on JSON field"})
scalar_index_params = {"index_type": "INVERTED", "json_cast_type": DataType.INT32, "json_path": ct.default_json_field_name+"['a']"}
collection_w.create_index(ct.default_json_field_name, index_params=scalar_index_params)
@pytest.mark.tags(CaseLabel.L1)
def test_create_inverted_index_on_array_field(self):