From 031acf5711acd58d2841722b946dba4176ab5d02 Mon Sep 17 00:00:00 2001 From: zhagnlu Date: Wed, 31 Dec 2025 10:39:21 +0800 Subject: [PATCH] enhance: convert jsonstats translator to bson_index translator (#45036) issue: #42533 Signed-off-by: luzhang Co-authored-by: luzhang --- .../src/exec/expression/BinaryRangeExpr.cpp | 8 +- .../src/exec/expression/BinaryRangeExpr.h | 4 +- .../core/src/exec/expression/ExistsExpr.cpp | 7 +- .../core/src/exec/expression/ExistsExpr.h | 5 +- internal/core/src/exec/expression/Expr.h | 4 +- .../expression/JsonContainsByStatsTest.cpp | 23 +--- .../src/exec/expression/JsonContainsExpr.cpp | 48 +++---- .../src/exec/expression/JsonContainsExpr.h | 4 +- .../core/src/exec/expression/TermExpr.cpp | 22 ++-- internal/core/src/exec/expression/TermExpr.h | 5 +- .../core/src/exec/expression/UnaryExpr.cpp | 10 +- internal/core/src/exec/expression/UnaryExpr.h | 4 +- internal/core/src/index/Meta.h | 1 + .../src/index/json_stats/JsonKeyStats.cpp | 61 +++++++-- .../core/src/index/json_stats/JsonKeyStats.h | 53 ++++---- .../src/index/json_stats/bson_inverted.cpp | 45 +++---- .../core/src/index/json_stats/bson_inverted.h | 15 ++- .../src/segcore/ChunkedSegmentSealedImpl.h | 37 +++--- .../core/src/segcore/SegmentGrowingImpl.h | 19 --- .../core/src/segcore/SegmentInterface.cpp | 11 ++ internal/core/src/segcore/SegmentInterface.h | 15 +-- internal/core/src/segcore/SegmentSealed.h | 6 + internal/core/src/segcore/segment_c.cpp | 38 +++--- .../BsonInvertedIndexTranslator.cpp | 124 ++++++++++++++++++ .../BsonInvertedIndexTranslator.h} | 53 ++++---- .../JsonStatsTranslator.cpp | 115 ---------------- .../test_json_stats/test_json_key_stats.cpp | 48 +++---- internal/querynodev2/segments/segment.go | 2 +- 28 files changed, 414 insertions(+), 373 deletions(-) create mode 100644 internal/core/src/segcore/storagev1translator/BsonInvertedIndexTranslator.cpp rename internal/core/src/segcore/{storagev2translator/JsonStatsTranslator.h => storagev1translator/BsonInvertedIndexTranslator.h} (56%) delete mode 100644 internal/core/src/segcore/storagev2translator/JsonStatsTranslator.cpp diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.cpp b/internal/core/src/exec/expression/BinaryRangeExpr.cpp index bd01d855f3..a7a74797ac 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.cpp +++ b/internal/core/src/exec/expression/BinaryRangeExpr.cpp @@ -611,9 +611,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonStats() { segment_->type() == SegmentType::Sealed) { auto* segment = dynamic_cast(segment_); auto field_id = expr_->column_.field_id_; - pinned_json_stats_ = segment->GetJsonStats(op_ctx_, field_id); - auto* index = pinned_json_stats_.get(); - Assert(index != nullptr); + auto index = segment->GetJsonStats(op_ctx_, field_id); + Assert(index.get() != nullptr); cached_index_chunk_res_ = std::make_shared(active_count_); cached_index_chunk_valid_res_ = @@ -753,7 +752,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonStats() { } } }; - index->ExecuteForSharedData(op_ctx_, pointer, shared_executor); + index->ExecuteForSharedData( + op_ctx_, bson_index_, pointer, shared_executor); cached_index_chunk_id_ = 0; } diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.h b/internal/core/src/exec/expression/BinaryRangeExpr.h index 21daf95fb6..6422cf397c 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.h +++ b/internal/core/src/exec/expression/BinaryRangeExpr.h @@ -24,6 +24,8 @@ #include "exec/expression/Expr.h" #include "exec/expression/Element.h" #include "segcore/SegmentInterface.h" +#include "cachinglayer/CacheSlot.h" +#include "index/json_stats/bson_inverted.h" namespace milvus { namespace exec { @@ -330,7 +332,7 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr { SingleElement lower_arg_; SingleElement upper_arg_; bool arg_inited_{false}; - PinWrapper pinned_json_stats_{nullptr}; + PinWrapper bson_index_{nullptr}; }; } //namespace exec } // namespace milvus diff --git a/internal/core/src/exec/expression/ExistsExpr.cpp b/internal/core/src/exec/expression/ExistsExpr.cpp index af29224b7e..fb23948874 100644 --- a/internal/core/src/exec/expression/ExistsExpr.cpp +++ b/internal/core/src/exec/expression/ExistsExpr.cpp @@ -20,6 +20,7 @@ #include "common/Types.h" #include "common/Vector.h" #include "index/JsonInvertedIndex.h" +#include "index/json_stats/JsonKeyStats.h" namespace milvus { namespace exec { @@ -207,9 +208,8 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegmentByStats() { cached_index_chunk_id_ = 0; auto segment = static_cast(segment_); auto field_id = expr_->column_.field_id_; - pinned_json_stats_ = segment->GetJsonStats(op_ctx_, field_id); - auto* index = pinned_json_stats_.get(); - Assert(index != nullptr); + auto index = segment->GetJsonStats(op_ctx_, field_id); + Assert(index.get() != nullptr); cached_index_chunk_res_ = std::make_shared(active_count_); TargetBitmapView res_view(*cached_index_chunk_res_); @@ -228,6 +228,7 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegmentByStats() { // which match the semantics of exists in Json.h index->ExecuteForSharedData( op_ctx_, + bson_index_, pointer, [&](BsonView bson, uint32_t row_id, uint32_t offset) { res_view[row_id] = !bson.IsBsonValueEmpty(offset); diff --git a/internal/core/src/exec/expression/ExistsExpr.h b/internal/core/src/exec/expression/ExistsExpr.h index 348b2c697e..f53826f9c7 100644 --- a/internal/core/src/exec/expression/ExistsExpr.h +++ b/internal/core/src/exec/expression/ExistsExpr.h @@ -23,7 +23,8 @@ #include "common/Vector.h" #include "exec/expression/Expr.h" #include "segcore/SegmentInterface.h" -#include "index/json_stats/JsonKeyStats.h" +#include "index/json_stats/bson_inverted.h" +#include "cachinglayer/CacheSlot.h" namespace milvus { namespace exec { @@ -90,7 +91,7 @@ class PhyExistsFilterExpr : public SegmentExpr { private: std::shared_ptr expr_; - PinWrapper pinned_json_stats_{nullptr}; + PinWrapper bson_index_{nullptr}; }; } //namespace exec } // namespace milvus diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index b12f6af6ed..8a8160a8e6 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -1705,7 +1705,9 @@ class SegmentExpr : public Expr { bool HasJsonStats(FieldId field_id) const { return segment_->type() == SegmentType::Sealed && - segment_->GetJsonStats(op_ctx_, field_id).get() != nullptr; + static_cast(segment_) + ->GetJsonStats(op_ctx_, field_id) + .get() != nullptr; } bool diff --git a/internal/core/src/exec/expression/JsonContainsByStatsTest.cpp b/internal/core/src/exec/expression/JsonContainsByStatsTest.cpp index 7b50393679..a89fbffab3 100644 --- a/internal/core/src/exec/expression/JsonContainsByStatsTest.cpp +++ b/internal/core/src/exec/expression/JsonContainsByStatsTest.cpp @@ -19,8 +19,6 @@ #include "common/Types.h" #include "expr/ITypeExpr.h" #include "index/json_stats/JsonKeyStats.h" -#include "cachinglayer/Manager.h" -#include "segcore/storagev2translator/JsonStatsTranslator.h" #include "pb/plan.pb.h" #include "plan/PlanNode.h" #include "query/ExecPlanNodeVisitor.h" @@ -36,7 +34,7 @@ using namespace milvus::index; namespace { -milvus::index::CacheJsonKeyStatsPtr +std::shared_ptr BuildAndLoadJsonKeyStats(const std::vector& json_strings, const milvus::FieldId json_fid, const std::string& root_path, @@ -111,22 +109,9 @@ BuildAndLoadJsonKeyStats(const std::vector& json_strings, load_config[milvus::LOAD_PRIORITY] = milvus::proto::common::LoadPriority::HIGH; - milvus::segcore::storagev2translator::JsonStatsLoadInfo load_info{ - /* enable_mmap */ false, - /* mmap_dir_path */ "", - /* segment_id */ segment_id, - /* field_id */ field_id, - /* stats_size */ 0}; - - std::unique_ptr< - milvus::cachinglayer::Translator> - base_translator = std::make_unique< - milvus::segcore::storagev2translator::JsonStatsTranslator>( - load_info, milvus::tracer::TraceContext{}, ctx, load_config); - - auto slot = milvus::cachinglayer::Manager::GetInstance().CreateCacheSlot( - std::move(base_translator)); - return slot; + auto reader = std::make_shared(ctx, true); + reader->Load(milvus::tracer::TraceContext{}, load_config); + return reader; } } // namespace diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp index 5b99f13616..f8e1298537 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.cpp +++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp @@ -461,9 +461,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByStats() { segment_->type() == SegmentType::Sealed) { auto* segment = dynamic_cast(segment_); auto field_id = expr_->column_.field_id_; - pinned_json_stats_ = segment->GetJsonStats(op_ctx_, field_id); - auto* index = pinned_json_stats_.get(); - Assert(index != nullptr); + auto index = segment->GetJsonStats(op_ctx_, field_id); + Assert(index.get() != nullptr); cached_index_chunk_res_ = std::make_shared(active_count_); cached_index_chunk_valid_res_ = @@ -520,7 +519,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByStats() { } } }; - index->ExecuteForSharedData(op_ctx_, pointer, shared_executor); + index->ExecuteForSharedData( + op_ctx_, bson_index_, pointer, shared_executor); cached_index_chunk_id_ = 0; } @@ -675,9 +675,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByStats() { segment_->type() == SegmentType::Sealed) { auto* segment = dynamic_cast(segment_); auto field_id = expr_->column_.field_id_; - pinned_json_stats_ = segment->GetJsonStats(op_ctx_, field_id); - auto* index = pinned_json_stats_.get(); - Assert(index != nullptr); + auto index = segment->GetJsonStats(op_ctx_, field_id); + Assert(index.get() != nullptr); cached_index_chunk_res_ = std::make_shared(active_count_); cached_index_chunk_valid_res_ = @@ -725,7 +724,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByStats() { } return false; }; - index->ExecuteForSharedData(op_ctx_, pointer, shared_executor); + index->ExecuteForSharedData( + op_ctx_, bson_index_, pointer, shared_executor); cached_index_chunk_id_ = 0; } @@ -1003,9 +1003,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByStats() { segment_->type() == SegmentType::Sealed) { auto* segment = dynamic_cast(segment_); auto field_id = expr_->column_.field_id_; - pinned_json_stats_ = segment->GetJsonStats(op_ctx_, field_id); - auto* index = pinned_json_stats_.get(); - Assert(index != nullptr); + auto index = segment->GetJsonStats(op_ctx_, field_id); + Assert(index.get() != nullptr); cached_index_chunk_res_ = std::make_shared(active_count_); cached_index_chunk_valid_res_ = @@ -1072,7 +1071,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByStats() { } res_view[row_offset] = tmp_elements.empty(); }; - index->ExecuteForSharedData(op_ctx_, pointer, shared_executor); + index->ExecuteForSharedData( + op_ctx_, bson_index_, pointer, shared_executor); cached_index_chunk_id_ = 0; } @@ -1287,9 +1287,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByStats() { segment_->type() == SegmentType::Sealed) { auto* segment = dynamic_cast(segment_); auto field_id = expr_->column_.field_id_; - pinned_json_stats_ = segment->GetJsonStats(op_ctx_, field_id); - auto* index = pinned_json_stats_.get(); - Assert(index != nullptr); + auto index = segment->GetJsonStats(op_ctx_, field_id); + Assert(index.get() != nullptr); cached_index_chunk_res_ = std::make_shared(active_count_); cached_index_chunk_valid_res_ = @@ -1407,7 +1406,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByStats() { } res_view[row_offset] = tmp_elements_index.size() == 0; }; - index->ExecuteForSharedData(op_ctx_, pointer, shared_executor); + index->ExecuteForSharedData( + op_ctx_, bson_index_, pointer, shared_executor); cached_index_chunk_id_ = 0; } @@ -1561,9 +1561,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByStats() { segment_->type() == SegmentType::Sealed) { auto* segment = dynamic_cast(segment_); auto field_id = expr_->column_.field_id_; - pinned_json_stats_ = segment->GetJsonStats(op_ctx_, field_id); - auto* index = pinned_json_stats_.get(); - Assert(index != nullptr); + auto index = segment->GetJsonStats(op_ctx_, field_id); + Assert(index.get() != nullptr); cached_index_chunk_res_ = std::make_shared(active_count_); cached_index_chunk_valid_res_ = @@ -1618,7 +1617,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByStats() { res_view[row_offset] = exist_elements_index.size() == elements.size(); }; - index->ExecuteForSharedData(op_ctx_, pointer, shared_executor); + index->ExecuteForSharedData( + op_ctx_, bson_index_, pointer, shared_executor); cached_index_chunk_id_ = 0; } @@ -1817,9 +1817,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByStats() { segment_->type() == SegmentType::Sealed) { auto* segment = dynamic_cast(segment_); auto field_id = expr_->column_.field_id_; - pinned_json_stats_ = segment->GetJsonStats(op_ctx_, field_id); - auto* index = pinned_json_stats_.get(); - Assert(index != nullptr); + auto index = segment->GetJsonStats(op_ctx_, field_id); + Assert(index.get() != nullptr); cached_index_chunk_res_ = std::make_shared(active_count_); cached_index_chunk_valid_res_ = @@ -1928,7 +1927,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByStats() { } } }; - index->ExecuteForSharedData(op_ctx_, pointer, shared_executor); + index->ExecuteForSharedData( + op_ctx_, bson_index_, pointer, shared_executor); cached_index_chunk_id_ = 0; } diff --git a/internal/core/src/exec/expression/JsonContainsExpr.h b/internal/core/src/exec/expression/JsonContainsExpr.h index 09b844880b..08fd5f83ff 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.h +++ b/internal/core/src/exec/expression/JsonContainsExpr.h @@ -26,6 +26,8 @@ #include "segcore/SegmentInterface.h" #include "common/bson_view.h" #include "exec/expression/Utils.h" +#include "index/json_stats/bson_inverted.h" +#include "cachinglayer/CacheSlot.h" namespace milvus { namespace exec { @@ -556,7 +558,7 @@ class PhyJsonContainsFilterExpr : public SegmentExpr { std::shared_ptr arg_set_double_; std::shared_ptr arg_cached_set_; // For caching std::set or std::vector - PinWrapper pinned_json_stats_{nullptr}; + PinWrapper bson_index_{nullptr}; }; } //namespace exec } // namespace milvus diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index 814dbf01aa..37fcab02d1 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -588,9 +588,8 @@ PhyTermFilterExpr::ExecJsonInVariableByStats() { auto segment = dynamic_cast(segment_); auto field_id = expr_->column_.field_id_; auto vals = expr_->vals_; - pinned_json_stats_ = segment->GetJsonStats(op_ctx_, field_id); - auto* index = pinned_json_stats_.get(); - Assert(index != nullptr); + auto index = segment->GetJsonStats(op_ctx_, field_id); + Assert(index.get() != nullptr); cached_index_chunk_res_ = std::make_shared(active_count_); cached_index_chunk_valid_res_ = @@ -623,13 +622,12 @@ PhyTermFilterExpr::ExecJsonInVariableByStats() { } } }; - index->template ExecutorForShreddingData( - op_ctx_, - target_field, - shredding_executor, - nullptr, - res_view, - valid_res_view); + index->ExecutorForShreddingData(op_ctx_, + target_field, + shredding_executor, + nullptr, + res_view, + valid_res_view); LOG_DEBUG("using shredding data's field: {} count {}", target_field, res_view.count()); @@ -709,7 +707,9 @@ PhyTermFilterExpr::ExecJsonInVariableByStats() { return; } }; - index->ExecuteForSharedData(op_ctx_, pointer, shared_executor); + + index->ExecuteForSharedData( + op_ctx_, bson_index_, pointer, shared_executor); cached_index_chunk_id_ = 0; } diff --git a/internal/core/src/exec/expression/TermExpr.h b/internal/core/src/exec/expression/TermExpr.h index 3fe086a876..fdea754b0e 100644 --- a/internal/core/src/exec/expression/TermExpr.h +++ b/internal/core/src/exec/expression/TermExpr.h @@ -24,7 +24,8 @@ #include "exec/expression/Expr.h" #include "exec/expression/Element.h" #include "segcore/SegmentInterface.h" -#include "index/json_stats/JsonKeyStats.h" +#include "index/json_stats/bson_inverted.h" +#include "cachinglayer/CacheSlot.h" namespace milvus { namespace exec { @@ -156,7 +157,7 @@ class PhyTermFilterExpr : public SegmentExpr { std::shared_ptr arg_set_double_; SingleElement arg_val_; int32_t consistency_level_ = 0; - PinWrapper pinned_json_stats_{nullptr}; + PinWrapper bson_index_{nullptr}; }; } //namespace exec } // namespace milvus diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp index 9910e880a7..4345044fa4 100644 --- a/internal/core/src/exec/expression/UnaryExpr.cpp +++ b/internal/core/src/exec/expression/UnaryExpr.cpp @@ -1009,9 +1009,8 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonByStats() { auto segment = static_cast(segment_); auto field_id = expr_->column_.field_id_; - pinned_json_stats_ = segment->GetJsonStats(op_ctx_, field_id); - auto* index = pinned_json_stats_.get(); - Assert(index != nullptr); + auto index = segment->GetJsonStats(op_ctx_, field_id); + Assert(index.get() != nullptr); cached_index_chunk_res_ = (op_type == proto::plan::OpType::NotEqual) ? std::make_shared(active_count_, true) @@ -1115,7 +1114,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonByStats() { pointer, milvus::index::JSONType::ARRAY); if (!target_field.empty()) { ShreddingArrayBsonExecutor executor(op_type, pointer, val); - index->template ExecutorForShreddingData( + index->ExecutorForShreddingData( op_ctx_, target_field, executor, @@ -1273,7 +1272,8 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonByStats() { ms); }); - index->ExecuteForSharedData(op_ctx_, pointer, shared_executor); + index->ExecuteForSharedData( + op_ctx_, bson_index_, pointer, shared_executor); } // for NotEqual: flip the result diff --git a/internal/core/src/exec/expression/UnaryExpr.h b/internal/core/src/exec/expression/UnaryExpr.h index 75d4e23608..6f8ec7cf9d 100644 --- a/internal/core/src/exec/expression/UnaryExpr.h +++ b/internal/core/src/exec/expression/UnaryExpr.h @@ -33,6 +33,8 @@ #include "common/RegexQuery.h" #include "exec/expression/Utils.h" #include "common/bson_view.h" +#include "index/json_stats/bson_inverted.h" +#include "cachinglayer/CacheSlot.h" namespace milvus { namespace exec { @@ -894,7 +896,7 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr { bool arg_inited_{false}; SingleElement value_arg_; PinWrapper pinned_ngram_index_{nullptr}; - PinWrapper pinned_json_stats_{nullptr}; + PinWrapper bson_index_{nullptr}; }; } // namespace exec } // namespace milvus diff --git a/internal/core/src/index/Meta.h b/internal/core/src/index/Meta.h index 7d189f623b..236520abb9 100644 --- a/internal/core/src/index/Meta.h +++ b/internal/core/src/index/Meta.h @@ -75,6 +75,7 @@ constexpr const char* BITMAP_INDEX_CARDINALITY_LIMIT = constexpr const char* MMAP_FILE_PATH = "mmap_filepath"; constexpr const char* ENABLE_MMAP = "enable_mmap"; constexpr const char* INDEX_FILES = "index_files"; +constexpr const char* INDEX_SIZE = "index_size"; constexpr const char* ENABLE_OFFSET_CACHE = "indexoffsetcache.enabled"; // VecIndex file metas diff --git a/internal/core/src/index/json_stats/JsonKeyStats.cpp b/internal/core/src/index/json_stats/JsonKeyStats.cpp index bbc9976aec..d356d3d0b6 100644 --- a/internal/core/src/index/json_stats/JsonKeyStats.cpp +++ b/internal/core/src/index/json_stats/JsonKeyStats.cpp @@ -33,6 +33,9 @@ #include "segcore/storagev1translator/ChunkTranslator.h" #include "segcore/storagev1translator/DefaultValueChunkTranslator.h" #include "segcore/storagev2translator/GroupChunkTranslator.h" +#include "segcore/storagev1translator/BsonInvertedIndexTranslator.h" +#include "cachinglayer/Manager.h" +#include "segcore/Utils.h" namespace milvus::index { @@ -66,8 +69,9 @@ JsonKeyStats::JsonKeyStats(const storage::FileManagerContext& ctx, if (is_load) { auto prefix = disk_file_manager_->GetLocalJsonStatsPrefix(); path_ = prefix; - bson_inverted_index_ = std::make_shared( - path_, field_id_, true, ctx, tantivy_index_version); + LOG_INFO("load json key stats from local path: {} for segment {}", + path_, + segment_id_); } else { auto prefix = disk_file_manager_->GetLocalTempJsonStatsPrefix(); path_ = prefix; @@ -96,17 +100,14 @@ JsonKeyStats::JsonKeyStats(const storage::FileManagerContext& ctx, shared_key_index_path, segment_id_); boost::filesystem::create_directories(shared_key_index_path); - bson_inverted_index_ = - std::make_shared(shared_key_index_path, - field_id_, - false, - ctx, - tantivy_index_version); + bson_inverted_index_ = std::make_shared( + shared_key_index_path, field_id_, ctx, tantivy_index_version); } } JsonKeyStats::~JsonKeyStats() { boost::filesystem::remove_all(path_); + LOG_INFO("remove json key stats with path: {}", path_); } void @@ -1050,9 +1051,41 @@ JsonKeyStats::LoadShreddingData(const std::vector& index_files) { } } +void +JsonKeyStats::LoadSharedKeyIndex( + const std::vector& shared_key_index_files, + bool enable_mmap, + int64_t index_size) { + segcore::storagev1translator::BsonInvertedIndexLoadInfo load_info; + load_info.enable_mmap = enable_mmap; + load_info.segment_id = segment_id_; + load_info.field_id = field_id_; + load_info.index_files = shared_key_index_files; + load_info.index_size = index_size; + load_info.load_priority = load_priority_; + + std::unique_ptr> + translator = std::make_unique< + segcore::storagev1translator::BsonInvertedIndexTranslator>( + load_info, disk_file_manager_); + + bson_index_cache_slot_ = + cachinglayer::Manager::GetInstance().CreateCacheSlot( + std::move(translator)); + + LOG_INFO( + "loaded bson inverted index using translator for field:{} of " + "segment:{}, enable_mmap:{}", + field_id_, + segment_id_, + enable_mmap); +} + void JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) { - if (config.contains(ENABLE_MMAP)) { + auto enable_mmap = + GetValueFromConfig(config, ENABLE_MMAP).value_or(false); + if (enable_mmap) { mmap_filepath_ = milvus::storage::LocalChunkManagerSingleton::GetInstance() .GetChunkManager() @@ -1111,10 +1144,14 @@ JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) { // load shredding data LoadShreddingData(shredding_data_files); + // get all index files size as shared key index size, + // no accurate way to get the shared key index size, + // so we use the total size of all index files as the shared key index size + auto index_size = + GetValueFromConfig(config, milvus::index::INDEX_SIZE) + .value_or(0); // load shared key index - bson_inverted_index_->LoadIndex(shared_key_index_files, - load_priority_, - config.contains(MMAP_FILE_PATH)); + LoadSharedKeyIndex(shared_key_index_files, enable_mmap, index_size); } IndexStatsPtr diff --git a/internal/core/src/index/json_stats/JsonKeyStats.h b/internal/core/src/index/json_stats/JsonKeyStats.h index 17c2d7c20c..e19cb5a37c 100644 --- a/internal/core/src/index/json_stats/JsonKeyStats.h +++ b/internal/core/src/index/json_stats/JsonKeyStats.h @@ -29,6 +29,7 @@ class CollectSingleJsonStatsInfoAccessor; #include "arrow/api.h" #include "index/json_stats/utils.h" #include "index/json_stats/bson_inverted.h" +#include "cachinglayer/CacheSlot.h" #include "index/json_stats/parquet_writer.h" #include "index/json_stats/bson_builder.h" #include "common/bson_view.h" @@ -162,16 +163,30 @@ class JsonKeyStats : public ScalarIndex { } public: + PinWrapper + GetBsonIndex(milvus::OpContext* op_ctx) const { + if (bson_index_cache_slot_ == nullptr) { + return PinWrapper(nullptr); + } + auto ca = SemiInlineGet(bson_index_cache_slot_->PinCells(op_ctx, {0})); + auto index = ca->get_cell_of(0); + return PinWrapper(ca, index); + } + void ExecuteForSharedData( milvus::OpContext* op_ctx, + PinWrapper& bson_index_cache, const std::string& path, std::function func) { - if (shared_column_ == nullptr || bson_inverted_index_ == nullptr) { + if (bson_index_cache.get() == nullptr) { + bson_index_cache = GetBsonIndex(op_ctx); + } + if (bson_index_cache.get() == nullptr || shared_column_ == nullptr) { return; } - bson_inverted_index_->TermQuery( + bson_index_cache.get()->TermQuery( path, [this, &func, op_ctx](const uint32_t* row_id_array, const uint32_t* offset_array, @@ -181,18 +196,6 @@ class JsonKeyStats : public ScalarIndex { }); } - void - ExecuteExistsPathForSharedData(const std::string& path, - TargetBitmapView bitset) { - if (bson_inverted_index_ == nullptr) { - return; - } - bson_inverted_index_->TermQueryEach( - path, [&bitset](uint32_t row_id, uint32_t offset) { - bitset[row_id] = true; - }); - } - int64_t ExecutorForGettingValid(milvus::OpContext* op_ctx, const std::string& path, @@ -385,16 +388,6 @@ class JsonKeyStats : public ScalarIndex { return JSONType::UNKNOWN; } - cachinglayer::ResourceUsage - CellByteSize() const { - return cell_size_; - } - - void - SetCellSize(cachinglayer::ResourceUsage cell_size) { - cell_size_ = cell_size; - } - private: void CollectSingleJsonStatsInfo(const char* json_str, @@ -624,6 +617,11 @@ class JsonKeyStats : public ScalarIndex { std::string AddBucketName(const std::string& remote_prefix); + void + LoadSharedKeyIndex(const std::vector& shared_key_index_files, + bool enable_mmap, + int64_t index_size); + private: proto::schema::FieldSchema schema_; int64_t segment_id_; @@ -644,6 +642,9 @@ class JsonKeyStats : public ScalarIndex { std::set column_keys_; std::shared_ptr parquet_writer_; std::shared_ptr bson_inverted_index_; + // cache slot for bson inverted index when using translator + std::shared_ptr> + bson_index_cache_slot_; milvus::proto::common::LoadPriority load_priority_; // some meta cache for searching @@ -666,7 +667,6 @@ class JsonKeyStats : public ScalarIndex { std::string shared_column_field_name_; std::shared_ptr shared_column_; SkipIndex skip_index_; - cachinglayer::ResourceUsage cell_size_ = {0, 0}; // Meta file for storing layout type map and other metadata JsonStatsMeta json_stats_meta_; @@ -677,7 +677,4 @@ class JsonKeyStats : public ScalarIndex { friend class ::CollectSingleJsonStatsInfoAccessor; }; -using CacheJsonKeyStatsPtr = - std::shared_ptr>; - } // namespace milvus::index diff --git a/internal/core/src/index/json_stats/bson_inverted.cpp b/internal/core/src/index/json_stats/bson_inverted.cpp index 3d8af02c46..cd8b48ff02 100644 --- a/internal/core/src/index/json_stats/bson_inverted.cpp +++ b/internal/core/src/index/json_stats/bson_inverted.cpp @@ -29,36 +29,36 @@ namespace milvus::index { BsonInvertedIndex::BsonInvertedIndex(const std::string& path, int64_t field_id, - bool is_load, const storage::FileManagerContext& ctx, int64_t tantivy_index_version) - : is_load_(is_load), + : is_load_(false), field_id_(field_id), tantivy_index_version_(tantivy_index_version) { disk_file_manager_ = std::make_shared(ctx); - if (is_load_) { - auto prefix = disk_file_manager_->GetLocalJsonStatsSharedIndexPrefix(); - path_ = prefix; - LOG_INFO("bson inverted index load path:{}", path_); - } else { - path_ = path; - LOG_INFO("bson inverted index build path:{}", path_); - } + path_ = path; + LOG_INFO("bson inverted index build path:{}", path_); +} + +BsonInvertedIndex::BsonInvertedIndex( + std::shared_ptr disk_file_manager) + : is_load_(true) { + disk_file_manager_ = disk_file_manager; + field_id_ = disk_file_manager->GetFieldDataMeta().field_id; + path_ = disk_file_manager_->GetLocalJsonStatsSharedIndexPrefix(); + LOG_INFO("bson inverted index load path:{}", path_); } BsonInvertedIndex::~BsonInvertedIndex() { if (wrapper_) { wrapper_->free(); } - if (!is_load_) { - auto local_chunk_manager = - milvus::storage::LocalChunkManagerSingleton::GetInstance() - .GetChunkManager(); - auto prefix = path_; - LOG_INFO("bson inverted index remove path:{}", path_); - local_chunk_manager->RemoveDir(prefix); - } + auto local_chunk_manager = + milvus::storage::LocalChunkManagerSingleton::GetInstance() + .GetChunkManager(); + auto prefix = path_; + LOG_INFO("bson inverted index remove path:{}", path_); + local_chunk_manager->RemoveDir(prefix); } void @@ -124,13 +124,14 @@ BsonInvertedIndex::LoadIndex(const std::vector& index_files, path_); wrapper_ = std::make_shared( path_.c_str(), load_in_mmap, milvus::index::SetBitsetUnused); - if (!load_in_mmap) { + // the index is loaded in ram, so we can remove files in advance disk_file_manager_->RemoveJsonStatsSharedIndexFiles(); } + load_in_mmap_ = load_in_mmap; LOG_INFO( - "load json shared key index done for field id:{} with " - "dir:{},load_in_mmap:{}", + "load json shared key index done for field id:{} with dir:{}, " + "load_in_mmap:{}", field_id_, path_, load_in_mmap); @@ -230,4 +231,4 @@ BsonInvertedIndex::TermQueryEach( } } -} // namespace milvus::index \ No newline at end of file +} // namespace milvus::index diff --git a/internal/core/src/index/json_stats/bson_inverted.h b/internal/core/src/index/json_stats/bson_inverted.h index 772149f665..370acb9297 100644 --- a/internal/core/src/index/json_stats/bson_inverted.h +++ b/internal/core/src/index/json_stats/bson_inverted.h @@ -43,10 +43,12 @@ class BsonInvertedIndex { public: BsonInvertedIndex(const std::string& path, int64_t field_id, - bool is_load, const storage::FileManagerContext& ctx, int64_t tantivy_index_version); + BsonInvertedIndex(std::shared_ptr + disk_file_manager); + ~BsonInvertedIndex(); void @@ -77,12 +79,21 @@ class BsonInvertedIndex { bool KeyExists(const std::string& key) { auto array = wrapper_->term_query_i64(key); - return !array.array_.len == 0; + return array.array_.len != 0; + } + + cachinglayer::ResourceUsage + CellByteSize() const { + return load_in_mmap_ ? cachinglayer::ResourceUsage( + 0, wrapper_->index_size_bytes()) + : cachinglayer::ResourceUsage( + wrapper_->index_size_bytes(), 0); } private: std::string path_; bool is_load_; + bool load_in_mmap_{false}; // json field id that this inverted index belongs to int64_t field_id_; int64_t tantivy_index_version_; diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h index bc663379ef..8315482a46 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h @@ -137,29 +137,26 @@ class ChunkedSegmentSealedImpl : public SegmentSealed { info_proto) override; void - LoadJsonStats(FieldId field_id, - index::CacheJsonKeyStatsPtr cache_slot) override { - json_stats_.wlock()->insert({field_id, std::move(cache_slot)}); - } - - PinWrapper - GetJsonStats(milvus::OpContext* op_ctx, FieldId field_id) const override { - auto r = json_stats_.rlock(); - auto it = r->find(field_id); - if (it == r->end()) { - return PinWrapper(nullptr); - } - auto ca = SemiInlineGet(it->second->PinCells(op_ctx, {0})); - auto* stats = ca->get_cell_of(0); - AssertInfo(stats != nullptr, - "json stats cache is corrupted, field_id: {}", - field_id.get()); - return PinWrapper(ca, stats); + RemoveJsonStats(FieldId field_id) override { + std::unique_lock lck(mutex_); + json_stats_.erase(field_id); } void - RemoveJsonStats(FieldId field_id) override { - json_stats_.wlock()->erase(field_id); + LoadJsonStats(FieldId field_id, + std::shared_ptr stats) override { + std::unique_lock lck(mutex_); + json_stats_[field_id] = stats; + } + + std::shared_ptr + GetJsonStats(milvus::OpContext* op_ctx, FieldId field_id) const override { + std::shared_lock lck(mutex_); + auto iter = json_stats_.find(field_id); + if (iter == json_stats_.end()) { + return nullptr; + } + return iter->second; } PinWrapper diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 7a28a99ea9..e8c9b8b57d 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -477,25 +477,6 @@ class SegmentGrowingImpl : public SegmentGrowing { schema_->get_fields().end(); } - void - LoadJsonStats(FieldId field_id, - index::CacheJsonKeyStatsPtr cache_slot) override { - ThrowInfo(ErrorCode::NotImplemented, - "LoadJsonStats not implemented for SegmentGrowingImpl"); - } - - PinWrapper - GetJsonStats(milvus::OpContext* op_ctx, FieldId field_id) const override { - ThrowInfo(ErrorCode::NotImplemented, - "GetJsonStats not implemented for SegmentGrowingImpl"); - } - - void - RemoveJsonStats(FieldId field_id) override { - ThrowInfo(ErrorCode::NotImplemented, - "RemoveJsonStats not implemented for SegmentGrowingImpl"); - } - std::shared_ptr GetArrayOffsets(FieldId field_id) const override { auto it = array_offsets_map_.find(field_id); diff --git a/internal/core/src/segcore/SegmentInterface.cpp b/internal/core/src/segcore/SegmentInterface.cpp index 96bed94ea9..bb8d5d17b6 100644 --- a/internal/core/src/segcore/SegmentInterface.cpp +++ b/internal/core/src/segcore/SegmentInterface.cpp @@ -649,4 +649,15 @@ SegmentInternalInterface::GetNgramIndexForJson( return PinWrapper(nullptr); } +std::shared_ptr +SegmentInternalInterface::GetJsonStats(milvus::OpContext* op_ctx, + FieldId field_id) const { + std::shared_lock lock(mutex_); + auto iter = json_stats_.find(field_id); + if (iter == json_stats_.end()) { + return nullptr; + } + return iter->second; +} + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 1ff07567e9..e395d76797 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -218,15 +218,9 @@ class SegmentInterface { FieldId field_id, const std::string& nested_path) const = 0; - virtual PinWrapper + virtual std::shared_ptr GetJsonStats(milvus::OpContext* op_ctx, FieldId field_id) const = 0; - virtual void - LoadJsonStats(FieldId field_id, index::CacheJsonKeyStatsPtr cache_slot) = 0; - - virtual void - RemoveJsonStats(FieldId field_id) = 0; - virtual void LazyCheckSchema(SchemaPtr sch) = 0; @@ -454,6 +448,9 @@ class SegmentInternalInterface : public SegmentInterface { load_info_ = load_info; } + virtual std::shared_ptr + GetJsonStats(milvus::OpContext* op_ctx, FieldId field_id) const override; + public: // `query_offsets` is not null only for vector array (embedding list) search // where it denotes the number of vectors in each embedding list. The length @@ -692,9 +689,7 @@ class SegmentInternalInterface : public SegmentInterface { milvus::index::TextMatchIndex>>>> text_indexes_; - // json stats cache (field_id -> CacheSlot of JsonKeyStats) - mutable folly::Synchronized< - std::unordered_map> + std::unordered_map> json_stats_; GEOSContextHandle_t ctx_ = GEOS_init_r(); diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index 530b2d7b3f..574f146395 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -136,6 +136,12 @@ class SegmentSealed : public SegmentInternalInterface { GetNgramIndexForJson(milvus::OpContext* op_ctx, FieldId field_id, const std::string& nested_path) const override = 0; + virtual void + LoadJsonStats(FieldId field_id, + std::shared_ptr stats) = 0; + + virtual void + RemoveJsonStats(FieldId field_id) = 0; SegmentType type() const override { diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index e0da06b9a3..a6da82e276 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -44,7 +44,6 @@ #include "storage/RemoteChunkManagerSingleton.h" #include "exec/expression/ExprCache.h" #include "monitor/Monitor.h" -#include "segcore/storagev2translator/JsonStatsTranslator.h" #include "common/GeometryCache.h" ////////////////////////////// common interfaces ////////////////////////////// @@ -576,9 +575,6 @@ LoadJsonKeyIndex(CTraceContext c_trace, try { auto ctx = milvus::tracer::TraceContext{ c_trace.traceID, c_trace.spanID, c_trace.traceFlags}; - auto span = milvus::tracer::StartSpan("SegCoreLoadJsonStats", &ctx); - milvus::tracer::SetRootSpan(span); - auto segment_interface = reinterpret_cast(c_segment); auto segment = @@ -616,26 +612,30 @@ LoadJsonKeyIndex(CTraceContext c_trace, if (info_proto->enable_mmap()) { config[milvus::index::MMAP_FILE_PATH] = info_proto->mmap_dir_path(); } + config[milvus::index::INDEX_SIZE] = info_proto->stats_size(); - milvus::segcore::storagev2translator::JsonStatsLoadInfo load_info{ - info_proto->enable_mmap(), - info_proto->mmap_dir_path(), - segment->get_segment_id(), - info_proto->fieldid(), - info_proto->stats_size()}; milvus::storage::FileManagerContext file_ctx( field_meta, index_meta, remote_chunk_manager, fs); - std::unique_ptr< - milvus::cachinglayer::Translator> - translator = std::make_unique< - milvus::segcore::storagev2translator::JsonStatsTranslator>( - load_info, ctx, file_ctx, config); + auto index = + std::make_shared(file_ctx, true); + { + milvus::ScopedTimer timer( + "json_stats_load", + [](double ms) { + milvus::monitor::internal_json_stats_latency_load.Observe( + ms); + }, + milvus::ScopedTimer::LogLevel::Info); + index->Load(ctx, config); + } - segment->LoadJsonStats( - milvus::FieldId(info_proto->fieldid()), - milvus::cachinglayer::Manager::GetInstance().CreateCacheSlot( - std::move(translator))); + segment->LoadJsonStats(milvus::FieldId(info_proto->fieldid()), + std::move(index)); + + LOG_INFO("load json stats success for field:{} of segment:{}", + info_proto->fieldid(), + segment->get_segment_id()); return milvus::SuccessCStatus(); } catch (std::exception& e) { diff --git a/internal/core/src/segcore/storagev1translator/BsonInvertedIndexTranslator.cpp b/internal/core/src/segcore/storagev1translator/BsonInvertedIndexTranslator.cpp new file mode 100644 index 0000000000..12e32c7a11 --- /dev/null +++ b/internal/core/src/segcore/storagev1translator/BsonInvertedIndexTranslator.cpp @@ -0,0 +1,124 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "segcore/storagev1translator/BsonInvertedIndexTranslator.h" + +#include + +#include "cachinglayer/CacheSlot.h" +#include "segcore/Utils.h" +#include "monitor/Monitor.h" +#include "common/ScopedTimer.h" +#include "log/Log.h" +#include "fmt/format.h" + +namespace milvus::segcore::storagev1translator { + +BsonInvertedIndexTranslator::BsonInvertedIndexTranslator( + BsonInvertedIndexLoadInfo load_info, + std::shared_ptr disk_file_manager) + : load_info_(std::move(load_info)), + disk_file_manager_(disk_file_manager), + key_(fmt::format("seg_{}_json_stats_shared_field_{}", + load_info_.segment_id, + load_info_.field_id)), + meta_(load_info_.enable_mmap ? milvus::cachinglayer::StorageType::DISK + : milvus::cachinglayer::StorageType::MEMORY, + milvus::cachinglayer::CellIdMappingMode::ALWAYS_ZERO, + milvus::segcore::getCellDataType(/* is_vector */ false, + /* is_index */ true), + milvus::segcore::getCacheWarmupPolicy(/* is_vector */ false, + /* is_index */ true), + /* support_eviction */ true) { +} + +size_t +BsonInvertedIndexTranslator::num_cells() const { + return 1; +} + +milvus::cachinglayer::cid_t +BsonInvertedIndexTranslator::cell_id_of(milvus::cachinglayer::uid_t) const { + return 0; +} + +std::pair +BsonInvertedIndexTranslator::estimated_byte_size_of_cell( + milvus::cachinglayer::cid_t) const { + // ignore the cid checking, because there is only one cell + if (load_info_.enable_mmap) { + return {{0, load_info_.index_size}, + {load_info_.index_size, load_info_.index_size}}; + } else { + return {{load_info_.index_size, 0}, + {load_info_.index_size, load_info_.index_size}}; + } +} + +int64_t +BsonInvertedIndexTranslator::cells_storage_bytes( + const std::vector&) const { + // ignore the cids checking, because there is only one cell + constexpr int64_t MIN_STORAGE_BYTES = 1 * 1024 * 1024; + return std::max(load_info_.index_size, MIN_STORAGE_BYTES); +} + +const std::string& +BsonInvertedIndexTranslator::key() const { + return key_; +} + +std::vector>> +BsonInvertedIndexTranslator::get_cells( + const std::vector&) { + auto index = + std::make_unique(disk_file_manager_); + + { + milvus::ScopedTimer timer( + "bson_inverted_index_load", + [](double /*ms*/) { + // no specific metric defined for bson inverted index load yet + }, + milvus::ScopedTimer::LogLevel::Info); + + // Load the index using the files from load_info_ + // Cast uint32_t to LoadPriority enum + index->LoadIndex(load_info_.index_files, + static_cast( + load_info_.load_priority), + load_info_.enable_mmap); + } + + LOG_INFO("load bson inverted index success for field:{} of segment:{}", + load_info_.field_id, + load_info_.segment_id); + + std::vector>> + result; + result.emplace_back(std::make_pair(0, std::move(index))); + return result; +} + +milvus::cachinglayer::Meta* +BsonInvertedIndexTranslator::meta() { + return &meta_; +} + +} // namespace milvus::segcore::storagev1translator diff --git a/internal/core/src/segcore/storagev2translator/JsonStatsTranslator.h b/internal/core/src/segcore/storagev1translator/BsonInvertedIndexTranslator.h similarity index 56% rename from internal/core/src/segcore/storagev2translator/JsonStatsTranslator.h rename to internal/core/src/segcore/storagev1translator/BsonInvertedIndexTranslator.h index 01df14c2b9..923354e15b 100644 --- a/internal/core/src/segcore/storagev2translator/JsonStatsTranslator.h +++ b/internal/core/src/segcore/storagev1translator/BsonInvertedIndexTranslator.h @@ -18,32 +18,32 @@ #include "cachinglayer/Translator.h" #include "common/Types.h" #include "common/LoadInfo.h" -#include "index/json_stats/JsonKeyStats.h" +#include "index/json_stats/bson_inverted.h" #include "storage/FileManager.h" -namespace milvus::segcore::storagev2translator { +namespace milvus::segcore::storagev1translator { -struct JsonStatsLoadInfo { +struct BsonInvertedIndexLoadInfo { bool enable_mmap; - std::string mmap_dir_path; int64_t segment_id; int64_t field_id; - int64_t stats_size; + int64_t index_size; + std::vector index_files; + uint32_t load_priority; }; -// Translator for JSON Key Stats (non-knowhere index). It loads a single-cell -// JsonKeyStats instance for a sealed segment field and exposes it to the cache +// Translator for BsonInvertedIndex in json stats. It loads a single-cell +// BsonInvertedIndex instance for json stats shared field and exposes it to the cache // layer with a stable key and resource usage. -class JsonStatsTranslator - : public milvus::cachinglayer::Translator { +class BsonInvertedIndexTranslator : public milvus::cachinglayer::Translator< + milvus::index::BsonInvertedIndex> { public: - JsonStatsTranslator( - JsonStatsLoadInfo load_info, - milvus::tracer::TraceContext ctx, - milvus::storage::FileManagerContext file_manager_context, - milvus::Config config); + BsonInvertedIndexTranslator( + BsonInvertedIndexLoadInfo load_info, + std::shared_ptr + disk_file_manager); - ~JsonStatsTranslator() override = default; + ~BsonInvertedIndexTranslator() override = default; size_t num_cells() const override; @@ -55,30 +55,25 @@ class JsonStatsTranslator milvus::cachinglayer::ResourceUsage> estimated_byte_size_of_cell(milvus::cachinglayer::cid_t cid) const override; + int64_t + cells_storage_bytes( + const std::vector&) const override; + const std::string& key() const override; std::vector>> + std::unique_ptr>> get_cells(const std::vector& cids) override; - Meta* + milvus::cachinglayer::Meta* meta() override; - int64_t - cells_storage_bytes( - const std::vector& cids) const override { - constexpr int64_t MIN_STORAGE_BYTES = 1 * 1024 * 1024; - return std::max(load_info_.stats_size, MIN_STORAGE_BYTES); - } - private: - milvus::tracer::TraceContext ctx_; - milvus::storage::FileManagerContext file_manager_context_; - milvus::Config config_; + BsonInvertedIndexLoadInfo load_info_; + std::shared_ptr disk_file_manager_; std::string key_; - JsonStatsLoadInfo load_info_{}; milvus::cachinglayer::Meta meta_; }; -} // namespace milvus::segcore::storagev2translator +} // namespace milvus::segcore::storagev1translator diff --git a/internal/core/src/segcore/storagev2translator/JsonStatsTranslator.cpp b/internal/core/src/segcore/storagev2translator/JsonStatsTranslator.cpp deleted file mode 100644 index b492409860..0000000000 --- a/internal/core/src/segcore/storagev2translator/JsonStatsTranslator.cpp +++ /dev/null @@ -1,115 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "segcore/storagev2translator/JsonStatsTranslator.h" - -#include - -#include "cachinglayer/CacheSlot.h" -#include "segcore/Utils.h" -#include "monitor/Monitor.h" -#include "common/ScopedTimer.h" - -namespace milvus::segcore::storagev2translator { - -JsonStatsTranslator::JsonStatsTranslator( - JsonStatsLoadInfo load_info, - milvus::tracer::TraceContext ctx, - milvus::storage::FileManagerContext file_manager_context, - milvus::Config config) - : load_info_(load_info), - ctx_(ctx), - file_manager_context_(std::move(file_manager_context)), - config_(std::move(config)), - key_(fmt::format( - "seg_{}_jsonstats_{}", load_info_.segment_id, load_info_.field_id)), - meta_(load_info_.enable_mmap ? milvus::cachinglayer::StorageType::DISK - : milvus::cachinglayer::StorageType::MEMORY, - milvus::cachinglayer::CellIdMappingMode::ALWAYS_ZERO, - milvus::segcore::getCellDataType(/* is_vector */ false, - /* is_index */ true), - milvus::segcore::getCacheWarmupPolicy(/* is_vector */ false, - /* is_index */ true), - /* support_eviction */ false) { -} - -size_t -JsonStatsTranslator::num_cells() const { - return 1; -} - -milvus::cachinglayer::cid_t -JsonStatsTranslator::cell_id_of(milvus::cachinglayer::uid_t) const { - return 0; -} - -std::pair -JsonStatsTranslator::estimated_byte_size_of_cell( - milvus::cachinglayer::cid_t) const { - if (load_info_.enable_mmap) { - return {{0, load_info_.stats_size}, - {load_info_.stats_size, load_info_.stats_size}}; - } else { - return {{load_info_.stats_size, 0}, {load_info_.stats_size, 0}}; - } -} - -const std::string& -JsonStatsTranslator::key() const { - return key_; -} - -std::vector>> -JsonStatsTranslator::get_cells( - const std::vector&) { - auto stats = std::make_unique( - file_manager_context_, /* is_load */ true); - - { - milvus::ScopedTimer timer( - "json_stats_load", - [](double ms) { - milvus::monitor::internal_json_stats_latency_load.Observe(ms); - }, - milvus::ScopedTimer::LogLevel::Info); - - stats->Load(ctx_, config_); - - if (load_info_.enable_mmap) { - stats->SetCellSize({0, load_info_.stats_size}); - } else { - stats->SetCellSize({load_info_.stats_size, 0}); - } - } - - LOG_INFO("load json stats success for field:{} of segment:{}", - load_info_.field_id, - load_info_.segment_id); - - std::vector>> - result; - result.emplace_back(std::make_pair(0, std::move(stats))); - return result; -} - -Meta* -JsonStatsTranslator::meta() { - return &meta_; -} - -} // namespace milvus::segcore::storagev2translator diff --git a/internal/core/unittest/test_json_stats/test_json_key_stats.cpp b/internal/core/unittest/test_json_stats/test_json_key_stats.cpp index 7392859596..698e546394 100644 --- a/internal/core/unittest/test_json_stats/test_json_key_stats.cpp +++ b/internal/core/unittest/test_json_stats/test_json_key_stats.cpp @@ -170,7 +170,7 @@ class JsonKeyStatsTest : public ::testing::TestWithParam { int64_t collection_id = 1; int64_t partition_id = 2; int64_t segment_id = 3; - int64_t field_id = 101; + field_id_ = 101; int64_t index_build_id = GenerateRandomInt64(1, 100000); int64_t index_version = 1; size_ = 1000; // Use a larger size for better testing @@ -191,7 +191,7 @@ class JsonKeyStatsTest : public ::testing::TestWithParam { Init(collection_id, partition_id, segment_id, - field_id, + field_id_, index_build_id, index_version, size_); @@ -206,6 +206,7 @@ class JsonKeyStatsTest : public ::testing::TestWithParam { DataType type_; bool nullable_; size_t size_; + int64_t field_id_; FixedVector valid_data; std::vector data_; std::vector json_col; @@ -231,10 +232,12 @@ TEST_P(JsonKeyStatsTest, TestBasicOperations) { TEST_P(JsonKeyStatsTest, TestExecuteForSharedData) { std::string path = "/int_shared"; int count = 0; + PinWrapper bson_index{nullptr}; index_->ExecuteForSharedData( - nullptr, path, [&](BsonView bson, uint32_t row_id, uint32_t offset) { - count++; - }); + nullptr, + bson_index, + path, + [&](BsonView bson, uint32_t row_id, uint32_t offset) { count++; }); std::cout << "count: " << count << std::endl; if (nullable_) { EXPECT_EQ(count, 100); @@ -243,20 +246,6 @@ TEST_P(JsonKeyStatsTest, TestExecuteForSharedData) { } } -TEST_P(JsonKeyStatsTest, TestExecuteExistsPathForSharedData) { - std::string path = "/int_shared"; - TargetBitmap bitset(size_); - TargetBitmapView bitset_view(bitset); - index_->ExecuteExistsPathForSharedData(path, bitset_view); - std::cout << "bitset.count(): " << bitset.count() << std::endl; - auto count = bitset.count(); - if (nullable_) { - EXPECT_EQ(count, 100); - } else { - EXPECT_EQ(count, 200); - } -} - TEST_P(JsonKeyStatsTest, TestExecutorForGettingValid) { std::string path = "/int"; TargetBitmap valid_res(size_, true); @@ -267,7 +256,15 @@ TEST_P(JsonKeyStatsTest, TestExecutorForGettingValid) { index_->ExecutorForGettingValid(nullptr, field, valid_res_view); EXPECT_EQ(processed_size, size_); } - index_->ExecuteExistsPathForSharedData(path, valid_res_view); + std::cout << "can not skip shared" << std::endl; + PinWrapper bson_index{nullptr}; + index_->ExecuteForSharedData( + nullptr, + bson_index, + path, + [&](BsonView bson, uint32_t row_id, uint32_t offset) { + valid_res[row_id] = true; + }); std::cout << "valid_res.count(): " << valid_res.count() << std::endl; if (nullable_) { EXPECT_EQ(valid_res.count(), 400); @@ -439,7 +436,14 @@ class JsonKeyStatsUploadLoadTest : public ::testing::Test { VerifyPathInShared(const std::string& path) { TargetBitmap bitset(data_.size()); TargetBitmapView bitset_view(bitset); - load_index_->ExecuteExistsPathForSharedData(path, bitset_view); + PinWrapper bson_index{nullptr}; + load_index_->ExecuteForSharedData( + nullptr, + bson_index, + path, + [&](BsonView bson, uint32_t row_id, uint32_t offset) { + bitset[row_id] = true; + }); EXPECT_GT(bitset.size(), 0); } @@ -736,4 +740,4 @@ TEST_F(JsonKeyStatsUploadLoadTest, TestMultipleBuildCycles) { VerifyPathInShredding("/y"); VerifyPathInShredding("/z"); } -} \ No newline at end of file +} diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index ed1186786a..93d03036cb 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -1271,7 +1271,7 @@ func (s *LocalSegment) LoadJSONKeyIndex(ctx context.Context, jsonKeyStats *datap LoadPriority: s.loadInfo.Load().GetPriority(), EnableMmap: paramtable.Get().QueryNodeCfg.MmapJSONStats.GetAsBool(), MmapDirPath: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(), - StatsSize: jsonKeyStats.GetMemorySize(), + StatsSize: jsonKeyStats.GetLogSize(), } marshaled, err := proto.Marshal(cgoProto)