fix: fix json_contains bug for stats (#44325)

#42533

Signed-off-by: luzhang <luzhang@zilliz.com>
Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
zhagnlu 2025-09-15 10:16:07 +08:00 committed by GitHub
parent b38013352d
commit e9bbb6aa9b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 250 additions and 30 deletions

View File

@ -551,9 +551,10 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonStats() {
using GetType = std::conditional_t<std::is_same_v<ValueType, std::string>,
std::string_view,
ValueType>;
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto real_batch_size = GetNextBatchSize();
if (real_batch_size == 0) {
return nullptr;
}
auto pointer = milvus::index::JsonPointer(expr_->column_.nested_path_);
bool lower_inclusive = expr_->lower_inclusive_;
bool upper_inclusive = expr_->upper_inclusive_;

View File

@ -0,0 +1,216 @@
// Copyright (C) 2019-2025 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include "common/Schema.h"
#include "common/Types.h"
#include "expr/ITypeExpr.h"
#include "index/json_stats/JsonKeyStats.h"
#include "pb/plan.pb.h"
#include "plan/PlanNode.h"
#include "query/ExecPlanNodeVisitor.h"
#include "segcore/ChunkedSegmentSealedImpl.h"
#include "segcore/Types.h"
#include "storage/InsertData.h"
#include "storage/RemoteChunkManagerSingleton.h"
#include "storage/Util.h"
#include "test_utils/storage_test_utils.h"
using namespace milvus;
using namespace milvus::index;
namespace {
std::shared_ptr<JsonKeyStats>
BuildAndLoadJsonKeyStats(const std::vector<std::string>& json_strings,
const milvus::FieldId json_fid,
const std::string& root_path,
int64_t collection_id,
int64_t partition_id,
int64_t segment_id,
int64_t field_id,
int64_t build_id,
int64_t version_id) {
std::vector<milvus::Json> data;
data.reserve(json_strings.size());
for (const auto& s : json_strings) {
data.emplace_back(simdjson::padded_string(s));
}
auto field_data =
std::make_shared<FieldData<milvus::Json>>(DataType::JSON, false);
field_data->add_json_data(data);
auto payload_reader =
std::make_shared<milvus::storage::PayloadReader>(field_data);
storage::InsertData insert_data(payload_reader);
proto::schema::FieldSchema field_schema;
field_schema.set_data_type(proto::schema::DataType::JSON);
field_schema.set_fieldid(json_fid.get());
storage::FieldDataMeta field_meta{
collection_id, partition_id, segment_id, field_id, field_schema};
storage::IndexMeta index_meta{segment_id, field_id, build_id, version_id};
insert_data.SetFieldDataMeta(field_meta);
insert_data.SetTimestamps(0, 100);
auto serialized_bytes = insert_data.Serialize(storage::Remote);
storage::StorageConfig storage_config;
storage_config.storage_type = "local";
storage_config.root_path = root_path;
auto chunk_manager = storage::CreateChunkManager(storage_config);
milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(
milvus_storage::ArrowFileSystemConfig{
.root_path = root_path,
.storage_type = "local",
});
auto log_path = fmt::format("/{}/{}/{}/{}/{}/{}",
root_path,
collection_id,
partition_id,
segment_id,
field_id,
0);
chunk_manager->Write(
log_path, serialized_bytes.data(), serialized_bytes.size());
storage::FileManagerContext ctx(field_meta, index_meta, chunk_manager);
Config build_config;
build_config[INSERT_FILES_KEY] = std::vector<std::string>{log_path};
auto builder = std::make_shared<JsonKeyStats>(ctx, false);
builder->Build(build_config);
auto create_index_result = builder->Upload(build_config);
auto index_files = create_index_result->GetIndexFiles();
Config load_config;
load_config["index_files"] = index_files;
load_config[milvus::LOAD_PRIORITY] =
milvus::proto::common::LoadPriority::HIGH;
auto reader = std::make_shared<JsonKeyStats>(ctx, true);
reader->Load(milvus::tracer::TraceContext{}, load_config);
return reader;
}
} // namespace
TEST(JsonContainsByStatsTest, BasicContainsAnyOnArray) {
auto schema = std::make_shared<Schema>();
auto json_fid = schema->AddDebugField("json", DataType::JSON);
auto segment = segcore::CreateSealedSegment(schema);
const int N = 10000;
std::vector<std::string> json_raw_data;
json_raw_data.reserve(N);
for (int i = 0; i < N; ++i) {
switch (i % 7) {
case 0:
json_raw_data.emplace_back(R"({"a": [1, 2, 3]})");
break;
case 1:
json_raw_data.emplace_back(R"({"a": [4, 5]})");
break;
case 2:
json_raw_data.emplace_back(R"({"a": [1]})");
break;
case 3:
json_raw_data.emplace_back(R"({"a": []})");
break;
case 4:
json_raw_data.emplace_back(R"({"b": [1, 2]})");
break;
case 5:
json_raw_data.emplace_back(R"({"a": [10, 1, 20]})");
break;
case 6:
json_raw_data.emplace_back(R"({"a": ["x", "y"]})");
break;
}
}
// Build and attach JsonKeyStats for the json field
const int64_t collection_id = 1001;
const int64_t partition_id = 2001;
const int64_t segment_id = 3001;
const int64_t field_id = json_fid.get();
const int64_t build_id = 5001;
const int64_t version_id = 1;
const std::string root_path = "/tmp/test-json-contains-by-stats";
auto stats = BuildAndLoadJsonKeyStats(json_raw_data,
json_fid,
root_path,
collection_id,
partition_id,
segment_id,
field_id,
build_id,
version_id);
segment->LoadJsonStats(json_fid, stats);
// Load raw field data into sealed segment for execution
std::vector<milvus::Json> jsons;
for (auto& s : json_raw_data) {
jsons.emplace_back(simdjson::padded_string(s));
}
auto json_field =
std::make_shared<FieldData<milvus::Json>>(DataType::JSON, false);
json_field->add_json_data(jsons);
auto cm = milvus::storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
auto load_info = PrepareSingleFieldInsertBinlog(
0, 0, 0, json_fid.get(), {json_field}, cm);
segment->LoadFieldData(load_info);
// Build json_contains expr: json['a'] contains any 1
proto::plan::GenericValue value;
value.set_int64_val(1);
auto expr = std::make_shared<expr::JsonContainsExpr>(
expr::ColumnInfo(
json_fid, DataType::JSON, std::vector<std::string>{"a"}, true),
proto::plan::JSONContainsExpr_JSONOp_ContainsAny,
true,
std::vector<proto::plan::GenericValue>{value});
auto plan =
std::make_shared<plan::FilterBitsNode>(DEFAULT_PLANNODE_ID, expr);
auto result = query::ExecuteQueryExpr(
plan, segment.get(), json_raw_data.size(), MAX_TIMESTAMP);
// Expected matches: positions where (i % 7) in {0, 2, 5}
int64_t expected_count = (N / 7) * 3;
int rem = N % 7;
for (int i = 0; i < rem; ++i) {
if (i == 0 || i == 2 || i == 5) {
expected_count++;
}
}
EXPECT_EQ(result.count(), expected_count);
for (int i = 0; i < N; ++i) {
bool should_match = ((i % 7) == 0) || ((i % 7) == 2) || ((i % 7) == 5);
EXPECT_EQ(bool(result[i]), should_match);
}
}

View File

@ -396,10 +396,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByStats() {
std::conditional_t<std::is_same_v<ExprValueType, std::string>,
std::string_view,
ExprValueType>;
auto real_batch_size =
(current_data_chunk_pos_ + batch_size_ > active_count_)
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto real_batch_size = GetNextBatchSize();
if (real_batch_size == 0) {
return nullptr;
}
std::unordered_set<GetType> elements;
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
if (!arg_inited_) {
@ -607,10 +607,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(EvalCtx& context) {
VectorPtr
PhyJsonContainsFilterExpr::ExecJsonContainsArrayByStats() {
auto real_batch_size =
(current_data_chunk_pos_ + batch_size_ > active_count_)
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto real_batch_size = GetNextBatchSize();
if (real_batch_size == 0) {
return nullptr;
}
std::vector<proto::plan::Array> elements;
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
for (auto const& element : expr_->vals_) {
@ -887,10 +887,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByStats() {
std::conditional_t<std::is_same_v<ExprValueType, std::string>,
std::string_view,
ExprValueType>;
auto real_batch_size =
(current_data_chunk_pos_ + batch_size_ > active_count_)
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto real_batch_size = GetNextBatchSize();
if (real_batch_size == 0) {
return nullptr;
}
std::set<GetType> elements;
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
for (auto const& element : expr_->vals_) {
@ -1136,10 +1136,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType(EvalCtx& context) {
VectorPtr
PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByStats() {
auto real_batch_size =
(current_data_chunk_pos_ + batch_size_ > active_count_)
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto real_batch_size = GetNextBatchSize();
if (real_batch_size == 0) {
return nullptr;
}
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
auto elements = expr_->vals_;
std::set<int> elements_index;
@ -1401,10 +1401,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(EvalCtx& context) {
VectorPtr
PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByStats() {
auto real_batch_size =
(current_data_chunk_pos_ + batch_size_ > active_count_)
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto real_batch_size = GetNextBatchSize();
if (real_batch_size == 0) {
return nullptr;
}
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
std::vector<proto::plan::Array> elements;
for (auto const& element : expr_->vals_) {
@ -1644,10 +1644,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(EvalCtx& context) {
VectorPtr
PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByStats() {
auto real_batch_size =
(current_data_chunk_pos_ + batch_size_ > active_count_)
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto real_batch_size = GetNextBatchSize();
if (real_batch_size == 0) {
return nullptr;
}
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
auto elements = expr_->vals_;
if (elements.empty()) {

View File

@ -97,7 +97,10 @@ PhyFilterBitsNode::GetOutput() {
}
}
bitset.flip();
Assert(bitset.size() == need_process_rows_);
AssertInfo(bitset.size() == need_process_rows_,
"bitset size: {}, need_process_rows_: {}",
bitset.size(),
need_process_rows_);
Assert(valid_bitset.size() == need_process_rows_);
auto filter_ratio =
bitset.size() != 0 ? 1 - float(bitset.count()) / bitset.size() : 0;

View File

@ -35,8 +35,8 @@ ProtoParser::PlanOptionsFromProto(
const proto::plan::PlanOption& plan_option_proto,
PlanOptions& plan_options) {
plan_options.expr_use_json_stats = plan_option_proto.expr_use_json_stats();
LOG_INFO("plan_options.expr_use_json_stats: {}",
plan_options.expr_use_json_stats);
LOG_TRACE("plan_options.expr_use_json_stats: {}",
plan_options.expr_use_json_stats);
}
std::unique_ptr<VectorPlanNode>