From 7040ba1c1200e0072da3469409982cbcdafce8cf Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Tue, 4 Mar 2025 11:56:02 +0800 Subject: [PATCH] enhance: make json path index support term filter (#40140) issue: #35528 --------- Signed-off-by: sunby --- .../core/src/exec/expression/TermExpr.cpp | 6 ++- internal/core/src/exec/expression/TermExpr.h | 4 +- internal/core/src/segcore/arrow_fs_c.cpp | 3 +- internal/core/src/segcore/arrow_fs_c.h | 1 - internal/core/src/segcore/column_groups_c.cpp | 36 ++++++++------ internal/core/src/segcore/column_groups_c.h | 12 +++-- internal/core/src/segcore/packed_reader_c.cpp | 13 +++-- internal/core/src/segcore/packed_writer_c.cpp | 14 ++++-- .../core/unittest/test_column_groups_c.cpp | 39 +++++++-------- internal/core/unittest/test_expr.cpp | 49 +++++++++++++++++++ internal/core/unittest/test_packed_c.cpp | 13 +++-- 11 files changed, 134 insertions(+), 56 deletions(-) diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index ad8abf01d1..cc05414954 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -237,7 +237,11 @@ PhyTermFilterExpr::ExecVisitorImplTemplateJson(OffsetVector* input) { if (expr_->is_in_field_) { return ExecTermJsonVariableInField(input); } else { - return ExecTermJsonFieldInVariable(input); + if (is_index_mode_) { + return ExecVisitorImplForIndex(input); + } else { + return ExecTermJsonFieldInVariable(input); + } } } diff --git a/internal/core/src/exec/expression/TermExpr.h b/internal/core/src/exec/expression/TermExpr.h index 2aca8cdeb2..100e6bc2ca 100644 --- a/internal/core/src/exec/expression/TermExpr.h +++ b/internal/core/src/exec/expression/TermExpr.h @@ -63,7 +63,9 @@ class PhyTermFilterExpr : public SegmentExpr { segment, expr->column_.field_id_, expr->column_.nested_path_, - DataType::NONE, + expr->vals_.size() == 0 + ? DataType::NONE + : FromValCase(expr->vals_[0].val_case()), active_count, batch_size), expr_(expr), diff --git a/internal/core/src/segcore/arrow_fs_c.cpp b/internal/core/src/segcore/arrow_fs_c.cpp index 20ddb1b5f2..5a0faaf1aa 100644 --- a/internal/core/src/segcore/arrow_fs_c.cpp +++ b/internal/core/src/segcore/arrow_fs_c.cpp @@ -58,7 +58,8 @@ InitRemoteArrowFileSystemSingleton(CStorageConfig c_storage_config) { conf.useIAM = c_storage_config.useIAM; conf.useVirtualHost = c_storage_config.useVirtualHost; conf.requestTimeoutMs = c_storage_config.requestTimeoutMs; - conf.gcp_credential_json = std::string(c_storage_config.gcp_credential_json); + conf.gcp_credential_json = + std::string(c_storage_config.gcp_credential_json); conf.use_custom_part_upload = c_storage_config.use_custom_part_upload; milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf); diff --git a/internal/core/src/segcore/arrow_fs_c.h b/internal/core/src/segcore/arrow_fs_c.h index 61ad419c92..9dd4fc9083 100644 --- a/internal/core/src/segcore/arrow_fs_c.h +++ b/internal/core/src/segcore/arrow_fs_c.h @@ -34,7 +34,6 @@ InitRemoteArrowFileSystemSingleton(CStorageConfig c_storage_config); void CleanRemoteArrowFileSystemSingleton(); - #ifdef __cplusplus } #endif diff --git a/internal/core/src/segcore/column_groups_c.cpp b/internal/core/src/segcore/column_groups_c.cpp index 7d7c3c9452..7646e0c041 100644 --- a/internal/core/src/segcore/column_groups_c.cpp +++ b/internal/core/src/segcore/column_groups_c.cpp @@ -21,27 +21,33 @@ using VecVecInt = std::vector>; extern "C" { -CColumnGroups NewCColumnGroups() { - auto vv = std::make_unique(); - return vv.release(); +CColumnGroups +NewCColumnGroups() { + auto vv = std::make_unique(); + return vv.release(); } -void AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) { - if (!cgs || !group) - return; +void +AddCColumnGroup(CColumnGroups cgs, int* group, int group_size) { + if (!cgs || !group) + return; - auto vv = static_cast(cgs); - std::vector new_group(group, group + group_size); - vv->emplace_back(std::move(new_group)); + auto vv = static_cast(cgs); + std::vector new_group(group, group + group_size); + vv->emplace_back(std::move(new_group)); } -int CColumnGroupsSize(CColumnGroups cgs) { - if (!cgs) - return 0; +int +CColumnGroupsSize(CColumnGroups cgs) { + if (!cgs) + return 0; - auto vv = static_cast(cgs); - return static_cast(vv->size()); + auto vv = static_cast(cgs); + return static_cast(vv->size()); } -void FreeCColumnGroups(CColumnGroups cgs) { delete static_cast(cgs); } +void +FreeCColumnGroups(CColumnGroups cgs) { + delete static_cast(cgs); +} } \ No newline at end of file diff --git a/internal/core/src/segcore/column_groups_c.h b/internal/core/src/segcore/column_groups_c.h index fca407d417..571878b948 100644 --- a/internal/core/src/segcore/column_groups_c.h +++ b/internal/core/src/segcore/column_groups_c.h @@ -22,13 +22,17 @@ extern "C" { typedef void* CColumnGroups; -CColumnGroups NewCColumnGroups(); +CColumnGroups +NewCColumnGroups(); -void AddCColumnGroup(CColumnGroups cgs, int* group, int group_size); +void +AddCColumnGroup(CColumnGroups cgs, int* group, int group_size); -int CColumnGroupsSize(CColumnGroups cgs); +int +CColumnGroupsSize(CColumnGroups cgs); -void FreeCColumnGroups(CColumnGroups cgs); +void +FreeCColumnGroups(CColumnGroups cgs); #ifdef __cplusplus } diff --git a/internal/core/src/segcore/packed_reader_c.cpp b/internal/core/src/segcore/packed_reader_c.cpp index faf9c466ff..eda3ab0cfd 100644 --- a/internal/core/src/segcore/packed_reader_c.cpp +++ b/internal/core/src/segcore/packed_reader_c.cpp @@ -25,7 +25,6 @@ #include "common/EasyAssert.h" #include "common/type_c.h" - CStatus NewPackedReader(char** paths, int64_t num_paths, @@ -34,9 +33,11 @@ NewPackedReader(char** paths, CPackedReader* c_packed_reader) { try { auto truePaths = std::vector(paths, paths + num_paths); - auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance().GetArrowFileSystem(); + auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance() + .GetArrowFileSystem(); if (!trueFs) { - return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, "Failed to get filesystem"); + return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, + "Failed to get filesystem"); } auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); std::set needed_columns; @@ -63,7 +64,8 @@ ReadNext(CPackedReader c_packed_reader, std::shared_ptr record_batch; auto status = packed_reader->ReadNext(&record_batch); if (!status.ok()) { - return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, status.ToString()); + return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, + status.ToString()); } if (record_batch == nullptr) { // end of file @@ -75,7 +77,8 @@ ReadNext(CPackedReader c_packed_reader, auto status = arrow::ExportRecordBatch( *record_batch, arr.get(), schema.get()); if (!status.ok()) { - return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, status.ToString()); + return milvus::FailureCStatus(milvus::ErrorCode::FileReadFailed, + status.ToString()); } *out_array = arr.release(); *out_schema = schema.release(); diff --git a/internal/core/src/segcore/packed_writer_c.cpp b/internal/core/src/segcore/packed_writer_c.cpp index 2b86602024..70925cac70 100644 --- a/internal/core/src/segcore/packed_writer_c.cpp +++ b/internal/core/src/segcore/packed_writer_c.cpp @@ -39,14 +39,16 @@ NewPackedWriter(struct ArrowSchema* schema, conf.part_size = part_upload_size; auto trueFs = milvus_storage::ArrowFileSystemSingleton::GetInstance() - .GetArrowFileSystem(); + .GetArrowFileSystem(); if (!trueFs) { - return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, "Failed to get filesystem"); + return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, + "Failed to get filesystem"); } auto trueSchema = arrow::ImportSchema(schema).ValueOrDie(); - auto columnGroups = *static_cast>*>(column_groups); + auto columnGroups = + *static_cast>*>(column_groups); auto writer = std::make_unique( trueFs, truePaths, trueSchema, conf, columnGroups, buffer_size); @@ -70,7 +72,8 @@ WriteRecordBatch(CPackedWriter c_packed_writer, arrow::ImportRecordBatch(array, schema).ValueOrDie(); auto status = packed_writer->Write(record_batch); if (!status.ok()) { - return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, status.ToString()); + return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, + status.ToString()); } return milvus::SuccessCStatus(); } catch (std::exception& e) { @@ -87,7 +90,8 @@ CloseWriter(CPackedWriter c_packed_writer) { auto status = packed_writer->Close(); delete packed_writer; if (!status.ok()) { - return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, status.ToString()); + return milvus::FailureCStatus(milvus::ErrorCode::FileWriteFailed, + status.ToString()); } return milvus::SuccessCStatus(); } catch (std::exception& e) { diff --git a/internal/core/unittest/test_column_groups_c.cpp b/internal/core/unittest/test_column_groups_c.cpp index d32852bbe2..7ac7f02869 100644 --- a/internal/core/unittest/test_column_groups_c.cpp +++ b/internal/core/unittest/test_column_groups_c.cpp @@ -19,29 +19,28 @@ #include #include "segcore/column_groups_c.h" - TEST(CColumnGroups, TestCColumnGroups) { - CColumnGroups cgs = NewCColumnGroups(); - int group1[] = {2, 4, 5}; - int group2[] = {0, 1}; - int group3[] = {3, 6, 7, 8}; + CColumnGroups cgs = NewCColumnGroups(); + int group1[] = {2, 4, 5}; + int group2[] = {0, 1}; + int group3[] = {3, 6, 7, 8}; - int* test_groups[] = {group1, group2, group3}; - int group_sizes[] = {3, 2, 4}; + int* test_groups[] = {group1, group2, group3}; + int group_sizes[] = {3, 2, 4}; - for (int i = 0; i < 3; i++) { - AddCColumnGroup(cgs, test_groups[i], group_sizes[i]); - } - - ASSERT_EQ(CColumnGroupsSize(cgs), 3); - auto vv = static_cast>*>(cgs); - - for (int i = 0; i < 3; i++) { - ASSERT_EQ(vv->at(i).size(), group_sizes[i]); - for (int j = 0; j < group_sizes[i]; j++) { - EXPECT_EQ(vv->at(i)[j], test_groups[i][j]); + for (int i = 0; i < 3; i++) { + AddCColumnGroup(cgs, test_groups[i], group_sizes[i]); } - } - FreeCColumnGroups(cgs); + ASSERT_EQ(CColumnGroupsSize(cgs), 3); + auto vv = static_cast>*>(cgs); + + for (int i = 0; i < 3; i++) { + ASSERT_EQ(vv->at(i).size(), group_sizes[i]); + for (int j = 0; j < group_sizes[i]; j++) { + EXPECT_EQ(vv->at(i)[j], test_groups[i][j]); + } + } + + FreeCColumnGroups(cgs); } \ No newline at end of file diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index bc3752b667..13865f34c6 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -16125,4 +16126,52 @@ TYPED_TEST(JsonIndexTestFixture, TestJsonIndexUnaryExpr) { std::make_shared(DEFAULT_PLANNODE_ID, unary_expr); final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); EXPECT_EQ(final.count(), N); + + auto term_expr = std::make_shared( + expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}), + std::vector(), + false); + plan = + std::make_shared(DEFAULT_PLANNODE_ID, term_expr); + final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); + EXPECT_EQ(final.count(), 0); + + using DT = std::conditional_t< + std::is_same_v, + std::string_view, + typename TestFixture::DataType>; + std::vector vals; + int expect_count = 10; + if constexpr (std::is_same_v) { + proto::plan::GenericValue val; + val.set_bool_val(true); + vals.push_back(val); + val.set_bool_val(false); + vals.push_back(val); + expect_count = N; + } else { + for (int i = 0; i < expect_count; ++i) { + proto::plan::GenericValue val; + + auto v = jsons[i].at
(this->json_path).value(); + if constexpr (std::is_same_v) { + val.set_int64_val(v); + } else if constexpr (std::is_same_v) { + val.set_float_val(v); + } else if constexpr (std::is_same_v) { + val.set_string_val(std::string(v)); + } else if constexpr (std::is_same_v) { + val.set_bool_val(i % 2 == 0); + } + vals.push_back(val); + } + } + term_expr = std::make_shared( + expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}), + vals, + false); + plan = + std::make_shared(DEFAULT_PLANNODE_ID, term_expr); + final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); + EXPECT_EQ(final.count(), expect_count); } diff --git a/internal/core/unittest/test_packed_c.cpp b/internal/core/unittest/test_packed_c.cpp index 93217ddc58..9ff6d692bf 100644 --- a/internal/core/unittest/test_packed_c.cpp +++ b/internal/core/unittest/test_packed_c.cpp @@ -36,7 +36,7 @@ TEST(CPackedTest, PackedWriterAndReader) { auto status = builder->AppendValues(test_data.begin(), test_data.end()); ASSERT_TRUE(status.ok()); auto res = builder->Finish(); - ASSERT_TRUE(res.ok()); + ASSERT_TRUE(res.ok()); std::shared_ptr array = res.ValueOrDie(); auto schema = arrow::schema({arrow::field("int64", arrow::int64())}); @@ -57,7 +57,13 @@ TEST(CPackedTest, PackedWriterAndReader) { auto c_status = InitLocalArrowFileSystemSingleton(path); EXPECT_EQ(c_status.error_code, 0); CPackedWriter c_packed_writer = nullptr; - c_status = NewPackedWriter(&c_write_schema, buffer_size, paths, 1, part_upload_size, cgs, &c_packed_writer); + c_status = NewPackedWriter(&c_write_schema, + buffer_size, + paths, + 1, + part_upload_size, + cgs, + &c_packed_writer); EXPECT_EQ(c_status.error_code, 0); EXPECT_NE(c_packed_writer, nullptr); @@ -74,7 +80,8 @@ TEST(CPackedTest, PackedWriterAndReader) { struct ArrowSchema c_read_schema; ASSERT_TRUE(arrow::ExportSchema(*schema, &c_read_schema).ok()); CPackedReader c_packed_reader = nullptr; - c_status = NewPackedReader(paths, 1, &c_read_schema, buffer_size, &c_packed_reader); + c_status = NewPackedReader( + paths, 1, &c_read_schema, buffer_size, &c_packed_reader); EXPECT_EQ(c_status.error_code, 0); EXPECT_NE(c_packed_reader, nullptr);