diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.cpp b/internal/core/src/exec/expression/BinaryRangeExpr.cpp index 1c226e6925..3cbf51da87 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.cpp +++ b/internal/core/src/exec/expression/BinaryRangeExpr.cpp @@ -551,9 +551,10 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonStats() { using GetType = std::conditional_t, std::string_view, ValueType>; - auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_ - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = GetNextBatchSize(); + if (real_batch_size == 0) { + return nullptr; + } auto pointer = milvus::index::JsonPointer(expr_->column_.nested_path_); bool lower_inclusive = expr_->lower_inclusive_; bool upper_inclusive = expr_->upper_inclusive_; diff --git a/internal/core/src/exec/expression/JsonContainsByStatsTest.cpp b/internal/core/src/exec/expression/JsonContainsByStatsTest.cpp new file mode 100644 index 0000000000..7893ef4c2b --- /dev/null +++ b/internal/core/src/exec/expression/JsonContainsByStatsTest.cpp @@ -0,0 +1,216 @@ +// Copyright (C) 2019-2025 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include +#include +#include +#include + +#include "common/Schema.h" +#include "common/Types.h" +#include "expr/ITypeExpr.h" +#include "index/json_stats/JsonKeyStats.h" +#include "pb/plan.pb.h" +#include "plan/PlanNode.h" +#include "query/ExecPlanNodeVisitor.h" +#include "segcore/ChunkedSegmentSealedImpl.h" +#include "segcore/Types.h" +#include "storage/InsertData.h" +#include "storage/RemoteChunkManagerSingleton.h" +#include "storage/Util.h" +#include "test_utils/storage_test_utils.h" + +using namespace milvus; +using namespace milvus::index; + +namespace { + +std::shared_ptr +BuildAndLoadJsonKeyStats(const std::vector& json_strings, + const milvus::FieldId json_fid, + const std::string& root_path, + int64_t collection_id, + int64_t partition_id, + int64_t segment_id, + int64_t field_id, + int64_t build_id, + int64_t version_id) { + std::vector data; + data.reserve(json_strings.size()); + for (const auto& s : json_strings) { + data.emplace_back(simdjson::padded_string(s)); + } + + auto field_data = + std::make_shared>(DataType::JSON, false); + field_data->add_json_data(data); + + auto payload_reader = + std::make_shared(field_data); + storage::InsertData insert_data(payload_reader); + + proto::schema::FieldSchema field_schema; + field_schema.set_data_type(proto::schema::DataType::JSON); + field_schema.set_fieldid(json_fid.get()); + + storage::FieldDataMeta field_meta{ + collection_id, partition_id, segment_id, field_id, field_schema}; + storage::IndexMeta index_meta{segment_id, field_id, build_id, version_id}; + + insert_data.SetFieldDataMeta(field_meta); + insert_data.SetTimestamps(0, 100); + + auto serialized_bytes = insert_data.Serialize(storage::Remote); + + storage::StorageConfig storage_config; + storage_config.storage_type = "local"; + storage_config.root_path = root_path; + auto chunk_manager = storage::CreateChunkManager(storage_config); + + milvus_storage::ArrowFileSystemSingleton::GetInstance().Init( + milvus_storage::ArrowFileSystemConfig{ + .root_path = root_path, + .storage_type = "local", + }); + + auto log_path = fmt::format("/{}/{}/{}/{}/{}/{}", + root_path, + collection_id, + partition_id, + segment_id, + field_id, + 0); + chunk_manager->Write( + log_path, serialized_bytes.data(), serialized_bytes.size()); + + storage::FileManagerContext ctx(field_meta, index_meta, chunk_manager); + + Config build_config; + build_config[INSERT_FILES_KEY] = std::vector{log_path}; + + auto builder = std::make_shared(ctx, false); + builder->Build(build_config); + + auto create_index_result = builder->Upload(build_config); + auto index_files = create_index_result->GetIndexFiles(); + + Config load_config; + load_config["index_files"] = index_files; + load_config[milvus::LOAD_PRIORITY] = + milvus::proto::common::LoadPriority::HIGH; + + auto reader = std::make_shared(ctx, true); + reader->Load(milvus::tracer::TraceContext{}, load_config); + return reader; +} + +} // namespace + +TEST(JsonContainsByStatsTest, BasicContainsAnyOnArray) { + auto schema = std::make_shared(); + auto json_fid = schema->AddDebugField("json", DataType::JSON); + + auto segment = segcore::CreateSealedSegment(schema); + + const int N = 10000; + std::vector json_raw_data; + json_raw_data.reserve(N); + for (int i = 0; i < N; ++i) { + switch (i % 7) { + case 0: + json_raw_data.emplace_back(R"({"a": [1, 2, 3]})"); + break; + case 1: + json_raw_data.emplace_back(R"({"a": [4, 5]})"); + break; + case 2: + json_raw_data.emplace_back(R"({"a": [1]})"); + break; + case 3: + json_raw_data.emplace_back(R"({"a": []})"); + break; + case 4: + json_raw_data.emplace_back(R"({"b": [1, 2]})"); + break; + case 5: + json_raw_data.emplace_back(R"({"a": [10, 1, 20]})"); + break; + case 6: + json_raw_data.emplace_back(R"({"a": ["x", "y"]})"); + break; + } + } + + // Build and attach JsonKeyStats for the json field + const int64_t collection_id = 1001; + const int64_t partition_id = 2001; + const int64_t segment_id = 3001; + const int64_t field_id = json_fid.get(); + const int64_t build_id = 5001; + const int64_t version_id = 1; + const std::string root_path = "/tmp/test-json-contains-by-stats"; + + auto stats = BuildAndLoadJsonKeyStats(json_raw_data, + json_fid, + root_path, + collection_id, + partition_id, + segment_id, + field_id, + build_id, + version_id); + segment->LoadJsonStats(json_fid, stats); + + // Load raw field data into sealed segment for execution + std::vector jsons; + for (auto& s : json_raw_data) { + jsons.emplace_back(simdjson::padded_string(s)); + } + auto json_field = + std::make_shared>(DataType::JSON, false); + json_field->add_json_data(jsons); + + auto cm = milvus::storage::RemoteChunkManagerSingleton::GetInstance() + .GetRemoteChunkManager(); + auto load_info = PrepareSingleFieldInsertBinlog( + 0, 0, 0, json_fid.get(), {json_field}, cm); + segment->LoadFieldData(load_info); + + // Build json_contains expr: json['a'] contains any 1 + proto::plan::GenericValue value; + value.set_int64_val(1); + auto expr = std::make_shared( + expr::ColumnInfo( + json_fid, DataType::JSON, std::vector{"a"}, true), + proto::plan::JSONContainsExpr_JSONOp_ContainsAny, + true, + std::vector{value}); + + auto plan = + std::make_shared(DEFAULT_PLANNODE_ID, expr); + auto result = query::ExecuteQueryExpr( + plan, segment.get(), json_raw_data.size(), MAX_TIMESTAMP); + + // Expected matches: positions where (i % 7) in {0, 2, 5} + int64_t expected_count = (N / 7) * 3; + int rem = N % 7; + for (int i = 0; i < rem; ++i) { + if (i == 0 || i == 2 || i == 5) { + expected_count++; + } + } + EXPECT_EQ(result.count(), expected_count); + for (int i = 0; i < N; ++i) { + bool should_match = ((i % 7) == 0) || ((i % 7) == 2) || ((i % 7) == 5); + EXPECT_EQ(bool(result[i]), should_match); + } +} diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp index 5245cd45fe..ede09d6511 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.cpp +++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp @@ -396,10 +396,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByStats() { std::conditional_t, std::string_view, ExprValueType>; - auto real_batch_size = - (current_data_chunk_pos_ + batch_size_ > active_count_) - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = GetNextBatchSize(); + if (real_batch_size == 0) { + return nullptr; + } std::unordered_set elements; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); if (!arg_inited_) { @@ -607,10 +607,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(EvalCtx& context) { VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsArrayByStats() { - auto real_batch_size = - (current_data_chunk_pos_ + batch_size_ > active_count_) - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = GetNextBatchSize(); + if (real_batch_size == 0) { + return nullptr; + } std::vector elements; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); for (auto const& element : expr_->vals_) { @@ -887,10 +887,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByStats() { std::conditional_t, std::string_view, ExprValueType>; - auto real_batch_size = - (current_data_chunk_pos_ + batch_size_ > active_count_) - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = GetNextBatchSize(); + if (real_batch_size == 0) { + return nullptr; + } std::set elements; auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); for (auto const& element : expr_->vals_) { @@ -1136,10 +1136,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType(EvalCtx& context) { VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByStats() { - auto real_batch_size = - (current_data_chunk_pos_ + batch_size_ > active_count_) - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = GetNextBatchSize(); + if (real_batch_size == 0) { + return nullptr; + } auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); auto elements = expr_->vals_; std::set elements_index; @@ -1401,10 +1401,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(EvalCtx& context) { VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByStats() { - auto real_batch_size = - (current_data_chunk_pos_ + batch_size_ > active_count_) - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = GetNextBatchSize(); + if (real_batch_size == 0) { + return nullptr; + } auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); std::vector elements; for (auto const& element : expr_->vals_) { @@ -1644,10 +1644,10 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(EvalCtx& context) { VectorPtr PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByStats() { - auto real_batch_size = - (current_data_chunk_pos_ + batch_size_ > active_count_) - ? active_count_ - current_data_chunk_pos_ - : batch_size_; + auto real_batch_size = GetNextBatchSize(); + if (real_batch_size == 0) { + return nullptr; + } auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); auto elements = expr_->vals_; if (elements.empty()) { diff --git a/internal/core/src/exec/operator/FilterBitsNode.cpp b/internal/core/src/exec/operator/FilterBitsNode.cpp index 4855a2c2b7..550037001b 100644 --- a/internal/core/src/exec/operator/FilterBitsNode.cpp +++ b/internal/core/src/exec/operator/FilterBitsNode.cpp @@ -97,7 +97,10 @@ PhyFilterBitsNode::GetOutput() { } } bitset.flip(); - Assert(bitset.size() == need_process_rows_); + AssertInfo(bitset.size() == need_process_rows_, + "bitset size: {}, need_process_rows_: {}", + bitset.size(), + need_process_rows_); Assert(valid_bitset.size() == need_process_rows_); auto filter_ratio = bitset.size() != 0 ? 1 - float(bitset.count()) / bitset.size() : 0; diff --git a/internal/core/src/query/PlanProto.cpp b/internal/core/src/query/PlanProto.cpp index 426ec6b99d..a896e0c25f 100644 --- a/internal/core/src/query/PlanProto.cpp +++ b/internal/core/src/query/PlanProto.cpp @@ -35,8 +35,8 @@ ProtoParser::PlanOptionsFromProto( const proto::plan::PlanOption& plan_option_proto, PlanOptions& plan_options) { plan_options.expr_use_json_stats = plan_option_proto.expr_use_json_stats(); - LOG_INFO("plan_options.expr_use_json_stats: {}", - plan_options.expr_use_json_stats); + LOG_TRACE("plan_options.expr_use_json_stats: {}", + plan_options.expr_use_json_stats); } std::unique_ptr