diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index 23878fd59a..db9970c94c 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -59,8 +59,9 @@ constexpr const char* JSON_STATS_ROOT_PATH = "json_stats"; constexpr const char* JSON_STATS_DATA_FORMAT_VERSION = "2"; constexpr const char* JSON_STATS_SHARED_INDEX_PATH = "shared_key_index"; constexpr const char* JSON_STATS_SHREDDING_DATA_PATH = "shredding_data"; +constexpr const char* JSON_STATS_META_FILE_NAME = "meta.json"; constexpr const char* JSON_KEY_STATS_SHARED_FIELD_NAME = "__shared"; -// store key layout type in parquet file metadata +// store key layout type in parquet file metadata (deprecated, now stored in separate file) inline constexpr const char* JSON_STATS_META_KEY_LAYOUT_TYPE_MAP = "key_layout_type_map"; // start json stats field id for mock column diff --git a/internal/core/src/index/json_stats/JsonKeyStats.cpp b/internal/core/src/index/json_stats/JsonKeyStats.cpp index 392b6d2be6..bbc9976aec 100644 --- a/internal/core/src/index/json_stats/JsonKeyStats.cpp +++ b/internal/core/src/index/json_stats/JsonKeyStats.cpp @@ -23,6 +23,7 @@ #include "index/InvertedIndexUtil.h" #include "index/Utils.h" #include "milvus-storage/filesystem/fs.h" +#include "storage/LocalChunkManagerSingleton.h" #include "storage/MmapManager.h" #include "storage/Util.h" #include "common/bson_view.h" @@ -624,6 +625,64 @@ JsonKeyStats::GetSharedKeyIndexDir() { return shared_key_index_path.string(); } +std::string +JsonKeyStats::GetMetaFilePath() { + std::filesystem::path json_stats_dir = path_; + std::filesystem::path meta_file_path = + json_stats_dir / JSON_STATS_META_FILE_NAME; + return meta_file_path.string(); +} + +void +JsonKeyStats::WriteMetaFile() { + json_stats_meta_.SetLayoutTypeMap(key_types_); + json_stats_meta_.SetInt64(META_KEY_NUM_ROWS, num_rows_); + json_stats_meta_.SetInt64(META_KEY_NUM_SHREDDING_COLUMNS, + column_keys_.size()); + + auto meta_content = json_stats_meta_.Serialize(); + auto meta_file_path = GetMetaFilePath(); + + auto local_chunk_manager = + milvus::storage::LocalChunkManagerSingleton::GetInstance() + .GetChunkManager(); + local_chunk_manager->Write( + meta_file_path, meta_content.data(), meta_content.size()); + + meta_file_size_ = meta_content.size(); + LOG_INFO("write meta file: {} with size {} for segment {} for field {}", + meta_file_path, + meta_file_size_, + segment_id_, + field_id_); +} + +void +JsonKeyStats::LoadMetaFile(const std::string& local_meta_file_path) { + LOG_INFO("load meta file: {} for segment {} for field {}", + local_meta_file_path, + segment_id_, + field_id_); + + auto local_chunk_manager = + storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + + auto file_size = local_chunk_manager->Size(local_meta_file_path); + std::string meta_content; + meta_content.resize(file_size); + local_chunk_manager->Read( + local_meta_file_path, meta_content.data(), file_size); + + key_field_map_ = JsonStatsMeta::DeserializeToKeyFieldMap(meta_content); + + LOG_INFO( + "loaded meta file with {} key field entries for segment {} for field " + "{}", + key_field_map_.size(), + segment_id_, + field_id_); +} + BinarySet JsonKeyStats::Serialize(const Config& config) { return BinarySet(); @@ -709,6 +768,9 @@ JsonKeyStats::BuildWithFieldData(const std::vector& field_datas, BuildKeyStats(field_datas, nullable); parquet_writer_->Close(); bson_inverted_index_->BuildIndex(); + + // write meta file with layout type map and other metadata + WriteMetaFile(); } void @@ -725,7 +787,6 @@ JsonKeyStats::GetColumnSchemaFromParquet(int64_t column_group_id, LOG_DEBUG("get column schema: [{}] for segment {}", file_schema->ToString(true), segment_id_); - column_group_schemas_[column_group_id] = file_schema; for (const auto& field : file_schema->fields()) { auto field_name = field->name(); @@ -845,11 +906,18 @@ JsonKeyStats::LoadShreddingMeta( auto remote_prefix = AddBucketName(disk_file_manager_->GetRemoteJsonStatsShreddingPrefix()); - // load common meta from parquet, all parquet files have the same meta - // just need to read one file - auto file = CreateColumnGroupParquetPath( - remote_prefix, sorted_files[0].first, sorted_files[0].second[0]); - GetCommonMetaFromParquet(file); + // load common meta from parquet only if key_field_map_ is not already populated + // (for backward compatibility with old data that doesn't have separate meta file) + if (key_field_map_.empty()) { + auto file = CreateColumnGroupParquetPath( + remote_prefix, sorted_files[0].first, sorted_files[0].second[0]); + GetCommonMetaFromParquet(file); + } else { + LOG_INFO( + "skip loading common meta from parquet, already loaded from meta " + "file for segment {}", + segment_id_); + } // load distinct meta from parquet, distinct meta is different for each parquet file // main purpose is to get column schema @@ -1003,7 +1071,11 @@ JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) { AssertInfo(index_files.has_value(), "index file paths is empty when load json stats for segment {}", segment_id_); - // split index_files into shared_key_index and shredding_data + + // split index_files into meta, shared_key_index, and shredding_data + // Note: Check directory paths (shared_key_index, shredding_data) BEFORE meta.json, + // because shared_key_index/meta.json_0 contains "meta.json" but is not the meta file. + std::vector meta_files; std::vector shared_key_index_files; std::vector shredding_data_files; for (const auto& file : index_files.value()) { @@ -1012,6 +1084,8 @@ JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) { } else if (file.find(JSON_STATS_SHREDDING_DATA_PATH) != std::string::npos) { shredding_data_files.emplace_back(file); + } else if (file.find(JSON_STATS_META_FILE_NAME) != std::string::npos) { + meta_files.emplace_back(file); } else { ThrowInfo(ErrorCode::UnexpectedError, "unknown file path: {} for segment {}", @@ -1020,6 +1094,20 @@ JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) { } } + // load meta file first (contains layout type map) + if (!meta_files.empty()) { + AssertInfo( + meta_files.size() == 1, + "expected exactly one meta file, got {} for segment {}, field {}", + meta_files.size(), + segment_id_, + field_id_); + // cache meta file to local disk + auto local_meta_file = disk_file_manager_->CacheJsonStatsMetaToDisk( + meta_files[0], load_priority_); + LoadMetaFile(local_meta_file); + } + // load shredding data LoadShreddingData(shredding_data_files); @@ -1034,15 +1122,35 @@ JsonKeyStats::Upload(const Config& config) { // upload inverted index auto bson_index_stats = bson_inverted_index_->UploadIndex(); + // upload meta file + auto meta_file_path = GetMetaFilePath(); + AssertInfo(disk_file_manager_->AddJsonStatsMetaLog(meta_file_path), + "failed to upload meta file: {} for segment {}", + meta_file_path, + segment_id_); + // upload parquet file, parquet writer has already upload file to remote auto shredding_remote_paths_to_size = parquet_writer_->GetPathsToSize(); auto shared_key_index_remote_paths_to_size = bson_index_stats->GetSerializedIndexFileInfo(); + auto meta_remote_paths_to_size = + disk_file_manager_->GetRemotePathsToFileSize(); // get all index files for meta std::vector index_files; index_files.reserve(shredding_remote_paths_to_size.size() + - shared_key_index_remote_paths_to_size.size()); + shared_key_index_remote_paths_to_size.size() + 1); + + // add meta file + for (const auto& [path, size] : meta_remote_paths_to_size) { + if (path.find(JSON_STATS_META_FILE_NAME) != std::string::npos) { + auto file_path = path.substr(path.find(JSON_STATS_META_FILE_NAME)); + index_files.emplace_back(file_path, size); + LOG_INFO( + "upload meta file: {} for segment {}", file_path, segment_id_); + } + } + // only store shared_key_index/... and shredding_data/... to meta // for saving meta space for (const auto& file_info : shared_key_index_remote_paths_to_size) { @@ -1065,16 +1173,18 @@ JsonKeyStats::Upload(const Config& config) { LOG_INFO( "upload json key stats for segment {} with bson mem size: {} " - "and " - "shredding data mem size: {} and index files size: {}", + "and shredding data mem size: {} and meta file size: {} " + "and index files size: {}", segment_id_, bson_index_stats->GetMemSize(), parquet_writer_->GetTotalSize(), + meta_file_size_, index_files.size()); - return IndexStats::New( - bson_index_stats->GetMemSize() + parquet_writer_->GetTotalSize(), - std::move(index_files)); + return IndexStats::New(bson_index_stats->GetMemSize() + + parquet_writer_->GetTotalSize() + + meta_file_size_, + std::move(index_files)); } } // namespace milvus::index diff --git a/internal/core/src/index/json_stats/JsonKeyStats.h b/internal/core/src/index/json_stats/JsonKeyStats.h index 49af1a33ab..17c2d7c20c 100644 --- a/internal/core/src/index/json_stats/JsonKeyStats.h +++ b/internal/core/src/index/json_stats/JsonKeyStats.h @@ -457,6 +457,15 @@ class JsonKeyStats : public ScalarIndex { std::string GetSharedKeyIndexDir(); + std::string + GetMetaFilePath(); + + void + WriteMetaFile(); + + void + LoadMetaFile(const std::string& meta_file_path); + void AddKeyStats(const std::vector& path, JSONType type, @@ -648,8 +657,6 @@ class JsonKeyStats : public ScalarIndex { std::unordered_map field_id_to_name_map_; // field_name vector, the sequece is the same as the order of files std::vector field_names_; - // column_group_id -> schema, the sequence of schemas is the same as the order of files - std::map> column_group_schemas_; // field_name -> column mutable std::unordered_map> @@ -661,6 +668,10 @@ class JsonKeyStats : public ScalarIndex { SkipIndex skip_index_; cachinglayer::ResourceUsage cell_size_ = {0, 0}; + // Meta file for storing layout type map and other metadata + JsonStatsMeta json_stats_meta_; + int64_t meta_file_size_{0}; + // Friend accessor for unit tests to call private methods safely. friend class ::TraverseJsonForBuildStatsAccessor; friend class ::CollectSingleJsonStatsInfoAccessor; diff --git a/internal/core/src/index/json_stats/utils.cpp b/internal/core/src/index/json_stats/utils.cpp index 10bc157a1e..92fbb7155e 100644 --- a/internal/core/src/index/json_stats/utils.cpp +++ b/internal/core/src/index/json_stats/utils.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include "milvus-storage/common/constants.h" namespace milvus::index { @@ -207,15 +208,101 @@ CreateArrowSchema(std::map column_map) { std::vector> CreateParquetKVMetadata(std::map column_map) { - nlohmann::json key_type; - for (const auto& [key, type] : column_map) { - key_type[key.ToColumnName()] = ToString(type); + // layout type map is now stored in a separate meta file to reduce parquet file size. + // return empty metadata vector. + return {}; +} + +std::string +JsonStatsMeta::Serialize() const { + nlohmann::json root; + + // Serialize string values + for (const auto& [key, value] : string_values_) { + root[key] = value; } - // for shared field, not need to store in metadata - std::vector> res; - res.push_back( - std::make_pair(JSON_STATS_META_KEY_LAYOUT_TYPE_MAP, key_type.dump())); - return res; + + // Serialize int64 values + for (const auto& [key, value] : int64_values_) { + root[key] = value; + } + + // Serialize layout type map + if (!layout_type_map_.empty()) { + nlohmann::json layout_map; + for (const auto& [json_key, layout_type] : layout_type_map_) { + layout_map[json_key.ToColumnName()] = ToString(layout_type); + } + root[META_KEY_LAYOUT_TYPE_MAP] = layout_map; + } + + return root.dump(); +} + +JsonStatsMeta +JsonStatsMeta::Deserialize(const std::string& json_str) { + JsonStatsMeta meta; + + try { + nlohmann::json root = nlohmann::json::parse(json_str); + + for (auto it = root.begin(); it != root.end(); ++it) { + const std::string& key = it.key(); + + if (key == META_KEY_LAYOUT_TYPE_MAP) { + // Parse layout type map + std::map layout_map; + for (auto& [column_name, layout_type_str] : + it.value().items()) { + auto json_type = GetJsonTypeFromKeyName(column_name); + auto json_pointer = GetKeyFromColumnName(column_name); + JsonKey json_key(json_pointer, json_type); + auto layout_type = + JsonKeyLayoutTypeFromString(layout_type_str); + layout_map[json_key] = layout_type; + } + meta.SetLayoutTypeMap(layout_map); + } else if (it.value().is_string()) { + meta.SetString(key, it.value().get()); + } else if (it.value().is_number_integer()) { + meta.SetInt64(key, it.value().get()); + } + // Other types can be added as needed + } + } catch (const std::exception& e) { + ThrowInfo(ErrorCode::UnexpectedError, + "Failed to deserialize JsonStatsMeta: {}", + e.what()); + } + + return meta; +} + +std::unordered_map> +JsonStatsMeta::DeserializeToKeyFieldMap(const std::string& json_str) { + std::unordered_map> key_field_map; + + try { + nlohmann::json root = nlohmann::json::parse(json_str); + + auto it = root.find(META_KEY_LAYOUT_TYPE_MAP); + if (it != root.end()) { + for (auto& [column_name, layout_type_str] : it.value().items()) { + auto layout_type = JsonKeyLayoutTypeFromString(layout_type_str); + if (layout_type == JsonKeyLayoutType::SHARED) { + continue; + } + auto json_pointer = GetKeyFromColumnName(column_name); + key_field_map[json_pointer].insert(column_name); + } + } + } catch (const std::exception& e) { + ThrowInfo(ErrorCode::UnexpectedError, + "Failed to deserialize JsonStatsMeta to key_field_map: {}", + e.what()); + } + + return key_field_map; } } // namespace milvus::index \ No newline at end of file diff --git a/internal/core/src/index/json_stats/utils.h b/internal/core/src/index/json_stats/utils.h index edddfcf24b..797ca6e577 100644 --- a/internal/core/src/index/json_stats/utils.h +++ b/internal/core/src/index/json_stats/utils.h @@ -16,18 +16,22 @@ #pragma once +#include +#include +#include +#include #include -#include +#include +#include #include #include +#include -#include "index/InvertedIndexTantivy.h" -#include "common/jsmn.h" #include "arrow/api.h" #include "common/EasyAssert.h" -#include -#include +#include "common/jsmn.h" +#include "index/InvertedIndexTantivy.h" namespace milvus::index { @@ -492,6 +496,97 @@ SortByParquetPath(const std::vector& paths) { return result; } +// Meta keys for JsonStatsMeta +inline constexpr const char* META_KEY_VERSION = "version"; +inline constexpr const char* META_KEY_LAYOUT_TYPE_MAP = "layout_type_map"; +inline constexpr const char* META_KEY_NUM_ROWS = "num_rows"; +inline constexpr const char* META_KEY_NUM_SHREDDING_COLUMNS = + "num_shredding_columns"; +inline constexpr const char* META_CURRENT_VERSION = "1"; + +// Generic metadata container for JSON stats +// Supports storing various key-value pairs and can be serialized to/from JSON file +class JsonStatsMeta { + public: + JsonStatsMeta() { + SetString(META_KEY_VERSION, META_CURRENT_VERSION); + } + + void + SetString(const std::string& key, const std::string& value) { + string_values_[key] = value; + } + + std::optional + GetString(const std::string& key) const { + auto it = string_values_.find(key); + if (it != string_values_.end()) { + return it->second; + } + return std::nullopt; + } + + void + SetInt64(const std::string& key, int64_t value) { + int64_values_[key] = value; + } + + std::optional + GetInt64(const std::string& key) const { + auto it = int64_values_.find(key); + if (it != int64_values_.end()) { + return it->second; + } + return std::nullopt; + } + + // layout type map operations (special handling for backward compatibility) + void + SetLayoutTypeMap(const std::map& layout_map) { + layout_type_map_ = layout_map; + } + + const std::map& + GetLayoutTypeMap() const { + return layout_type_map_; + } + + // build key_field_map from layout_type_map (for loading) + std::unordered_map> + BuildKeyFieldMap() const { + std::unordered_map> key_field_map; + for (const auto& [json_key, layout_type] : layout_type_map_) { + // only store metadata for shredding columns (TYPED/DYNAMIC), + // skip SHARED keys to save memory + if (layout_type == JsonKeyLayoutType::SHARED) { + continue; + } + auto column_name = json_key.ToColumnName(); + key_field_map[json_key.key_].insert(column_name); + } + return key_field_map; + } + + std::string + Serialize() const; + + static JsonStatsMeta + Deserialize(const std::string& json_str); + + static std::unordered_map> + DeserializeToKeyFieldMap(const std::string& json_str); + + size_t + GetSerializedSize() const { + return Serialize().size(); + } + + private: + std::map string_values_; + std::map int64_values_; + std::map layout_type_map_; +}; + } // namespace milvus::index template <> diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 8dad667ecf..c6dbfad4de 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -113,6 +113,18 @@ DiskFileManagerImpl::GetRemoteJsonStatsShreddingPrefix() { return (prefix / suffix).string(); } +std::string +DiskFileManagerImpl::GetRemoteJsonStatsMetaPath(const std::string& file_name) { + namespace fs = std::filesystem; + fs::path prefix = GetRemoteJsonStatsLogPrefix(); + return (prefix / file_name).string(); +} + +std::string +DiskFileManagerImpl::GetLocalJsonStatsMetaPrefix() { + return GetLocalJsonStatsPrefix(); +} + bool DiskFileManagerImpl::AddFileInternal( const std::string& file, @@ -229,6 +241,38 @@ DiskFileManagerImpl::AddJsonSharedIndexLog(const std::string& file) noexcept { }); } +bool +DiskFileManagerImpl::AddJsonStatsMetaLog(const std::string& file) noexcept { + auto local_chunk_manager = + LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + FILEMANAGER_TRY + if (!local_chunk_manager->Exist(file)) { + LOG_ERROR("local meta file {} not exists", file); + return false; + } + + local_paths_.emplace_back(file); + auto fileName = GetFileName(file); + auto fileSize = local_chunk_manager->Size(file); + added_total_file_size_ += fileSize; + + // Meta file is small, upload directly without slicing + auto remote_path = GetRemoteJsonStatsMetaPath(fileName); + auto buf = std::shared_ptr(new uint8_t[fileSize]); + local_chunk_manager->Read(file, buf.get(), fileSize); + rcm_->Write(remote_path, buf.get(), fileSize); + + remote_paths_to_size_[remote_path] = fileSize; + LOG_INFO("upload json stats meta file: {} to remote: {}, size: {}", + file, + remote_path, + fileSize); + + FILEMANAGER_CATCH + FILEMANAGER_END + return true; +} + bool DiskFileManagerImpl::AddTextLog(const std::string& file) noexcept { return AddFileInternal( @@ -404,6 +448,40 @@ DiskFileManagerImpl::CacheJsonStatsSharedIndexToDisk( remote_files, GetLocalJsonStatsSharedIndexPrefix(), priority); } +std::string +DiskFileManagerImpl::CacheJsonStatsMetaToDisk( + const std::string& remote_file, + milvus::proto::common::LoadPriority priority) { + auto local_chunk_manager = + LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto local_prefix = GetLocalJsonStatsMetaPrefix(); + + auto file_name = remote_file.substr(remote_file.find_last_of('/') + 1); + auto local_file = + (std::filesystem::path(local_prefix) / file_name).string(); + + auto parent_path = std::filesystem::path(local_file).parent_path(); + if (!local_chunk_manager->Exist(parent_path.string())) { + local_chunk_manager->CreateDir(parent_path.string()); + } + + auto remote_prefix = GetRemoteJsonStatsLogPrefix(); + auto full_remote_path = + (std::filesystem::path(remote_prefix) / remote_file).string(); + + auto file_size = rcm_->Size(full_remote_path); + auto buf = std::shared_ptr(new uint8_t[file_size]); + rcm_->Read(full_remote_path, buf.get(), file_size); + local_chunk_manager->Write(local_file, buf.get(), file_size); + + LOG_INFO("Cached json stats meta file from {} to {}, size: {}", + full_remote_path, + local_file, + file_size); + + return local_file; +} + template std::string DiskFileManagerImpl::CacheRawDataToDisk(const Config& config) { diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h index 6049eb7f2c..787ea7f643 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.h +++ b/internal/core/src/storage/DiskFileManagerImpl.h @@ -87,6 +87,9 @@ class DiskFileManagerImpl : public FileManagerImpl { bool AddJsonSharedIndexLog(const std::string& filename) noexcept; + bool + AddJsonStatsMetaLog(const std::string& filename) noexcept; + public: std::string GetName() const override { @@ -142,6 +145,12 @@ class DiskFileManagerImpl : public FileManagerImpl { GetRemoteJsonStatsSharedIndexPath(const std::string& file_name, int64_t slice_num); + std::string + GetRemoteJsonStatsMetaPath(const std::string& file_name); + + std::string + GetLocalJsonStatsMetaPrefix(); + std::string GetLocalRawDataObjectPrefix(); @@ -184,6 +193,12 @@ class DiskFileManagerImpl : public FileManagerImpl { const std::vector& remote_files, milvus::proto::common::LoadPriority priority); + // Cache meta file to local disk + // Returns local file path + std::string + CacheJsonStatsMetaToDisk(const std::string& remote_file, + milvus::proto::common::LoadPriority priority); + void RemoveNgramIndexFiles(); diff --git a/internal/core/unittest/test_json_stats/test_json_key_stats.cpp b/internal/core/unittest/test_json_stats/test_json_key_stats.cpp index aef4c74717..7392859596 100644 --- a/internal/core/unittest/test_json_stats/test_json_key_stats.cpp +++ b/internal/core/unittest/test_json_stats/test_json_key_stats.cpp @@ -558,4 +558,182 @@ TEST_F(JsonKeyStatsUploadLoadTest, TestComplexJson) { VerifyJsonType("/user/address/city_STRING", JSONType::STRING); VerifyJsonType("/user/address/zip_INT64", JSONType::INT64); VerifyJsonType("/timestamp_INT64", JSONType::INT64); +} + +// Test that meta.json file is created and contains correct metadata +TEST_F(JsonKeyStatsUploadLoadTest, TestMetaFileCreation) { + std::vector json_strings = { + R"({"int": 1, "string": "test1"})", + R"({"int": 2, "string": "test2"})", + R"({"int": 3, "string": "test3"})"}; + + InitContext(); + PrepareData(json_strings); + BuildAndUpload(); + + // Verify meta.json is in the index files + bool meta_file_found = false; + for (const auto& file : index_files_) { + if (file.find(JSON_STATS_META_FILE_NAME) != std::string::npos && + file.find(JSON_STATS_SHARED_INDEX_PATH) == std::string::npos && + file.find(JSON_STATS_SHREDDING_DATA_PATH) == std::string::npos) { + meta_file_found = true; + break; + } + } + EXPECT_TRUE(meta_file_found) << "meta.json should be in index files"; + + // Load and verify + Load(); + VerifyBasicOperations(); + VerifyPathInShredding("/int"); + VerifyPathInShredding("/string"); +} + +// Test full pipeline: build -> upload -> load -> query +TEST_F(JsonKeyStatsUploadLoadTest, TestFullPipelineWithMetaFile) { + std::vector json_strings; + // Generate more data for a comprehensive test + for (int i = 0; i < 100; i++) { + json_strings.push_back(fmt::format( + R"({{"id": {}, "name": "user_{}", "score": {}, "active": {}}})", + i, + i, + i * 1.5, + i % 2 == 0 ? "true" : "false")); + } + + InitContext(); + PrepareData(json_strings); + BuildAndUpload(); + + // Verify index files structure + bool has_meta = false; + bool has_shredding = false; + bool has_shared_index = false; + for (const auto& file : index_files_) { + if (file.find(JSON_STATS_META_FILE_NAME) != std::string::npos && + file.find(JSON_STATS_SHARED_INDEX_PATH) == std::string::npos && + file.find(JSON_STATS_SHREDDING_DATA_PATH) == std::string::npos) { + has_meta = true; + } + if (file.find(JSON_STATS_SHREDDING_DATA_PATH) != std::string::npos) { + has_shredding = true; + } + if (file.find(JSON_STATS_SHARED_INDEX_PATH) != std::string::npos) { + has_shared_index = true; + } + } + EXPECT_TRUE(has_meta) << "Should have meta.json file"; + EXPECT_TRUE(has_shredding) << "Should have shredding data files"; + EXPECT_TRUE(has_shared_index) << "Should have shared key index files"; + + // Load and verify + Load(); + + // Verify count + EXPECT_EQ(load_index_->Count(), 100); + + // Verify shredding fields exist + auto id_fields = load_index_->GetShreddingFields("/id"); + EXPECT_FALSE(id_fields.empty()); + + auto name_fields = load_index_->GetShreddingFields("/name"); + EXPECT_FALSE(name_fields.empty()); + + auto score_fields = load_index_->GetShreddingFields("/score"); + EXPECT_FALSE(score_fields.empty()); + + auto active_fields = load_index_->GetShreddingFields("/active"); + EXPECT_FALSE(active_fields.empty()); + + // Verify types + VerifyJsonType("/id_INT64", JSONType::INT64); + VerifyJsonType("/name_STRING", JSONType::STRING); + VerifyJsonType("/score_DOUBLE", JSONType::DOUBLE); + VerifyJsonType("/active_BOOL", JSONType::BOOL); +} + +// Test backward compatibility: load data without meta.json +// This simulates old format where metadata was stored in parquet files. +// Note: Since new code doesn't write metadata to parquet anymore, this test +// verifies that the loading code path handles missing meta.json gracefully +// and falls back to reading from parquet (even if parquet metadata is empty). +TEST_F(JsonKeyStatsUploadLoadTest, TestLoadWithoutMetaFile) { + std::vector json_strings = { + R"({"int": 1, "string": "test1"})", + R"({"int": 2, "string": "test2"})", + R"({"int": 3, "string": "test3"})"}; + + InitContext(); + PrepareData(json_strings); + BuildAndUpload(); + + // Remove meta.json from index_files to simulate old format + std::vector index_files_without_meta; + for (const auto& file : index_files_) { + if (file.find(JSON_STATS_META_FILE_NAME) == std::string::npos || + file.find(JSON_STATS_SHARED_INDEX_PATH) != std::string::npos || + file.find(JSON_STATS_SHREDDING_DATA_PATH) != std::string::npos) { + index_files_without_meta.push_back(file); + } + } + + // Verify we actually removed the meta file + EXPECT_LT(index_files_without_meta.size(), index_files_.size()); + + // Replace index_files_ with version without meta + index_files_ = index_files_without_meta; + + // Load should still work - it will try to read from parquet metadata + // (which is empty in new format, but the code path should not crash) + Load(); + + // Basic operations should still work + EXPECT_EQ(load_index_->Count(), data_.size()); + EXPECT_EQ(load_index_->Size(), data_.size()); + + // Note: GetShreddingFields may return empty because key_field_map_ is empty + // when both meta.json and parquet metadata are missing/empty. + // This is expected behavior for backward compatibility path. +} + +// Test that multiple build-upload-load cycles work correctly +TEST_F(JsonKeyStatsUploadLoadTest, TestMultipleBuildCycles) { + // First cycle + { + std::vector json_strings = {R"({"a": 1, "b": "test1"})", + R"({"a": 2, "b": "test2"})"}; + + InitContext(); + PrepareData(json_strings); + BuildAndUpload(); + Load(); + + EXPECT_EQ(load_index_->Count(), 2); + VerifyPathInShredding("/a"); + VerifyPathInShredding("/b"); + } + + // Clean up for second cycle + boost::filesystem::remove_all(chunk_manager_->GetRootPath()); + + // Second cycle with different schema + { + index_build_id_ = GenerateRandomInt64(1, 100000); // New build id + std::vector json_strings = { + R"({"x": 100, "y": 200, "z": "data1"})", + R"({"x": 101, "y": 201, "z": "data2"})", + R"({"x": 102, "y": 202, "z": "data3"})"}; + + InitContext(); + PrepareData(json_strings); + BuildAndUpload(); + Load(); + + EXPECT_EQ(load_index_->Count(), 3); + VerifyPathInShredding("/x"); + VerifyPathInShredding("/y"); + VerifyPathInShredding("/z"); + } } \ No newline at end of file diff --git a/internal/core/unittest/test_json_stats/test_parquet_writer.cpp b/internal/core/unittest/test_json_stats/test_parquet_writer.cpp index a408ecfceb..ab009c4e13 100644 --- a/internal/core/unittest/test_json_stats/test_parquet_writer.cpp +++ b/internal/core/unittest/test_json_stats/test_parquet_writer.cpp @@ -58,7 +58,7 @@ TEST_F(ParquetWriterFactoryTest, CreateContextBasicTest) { EXPECT_EQ(context.builders_map.size(), column_map_.size()); // Verify metadata - EXPECT_FALSE(context.kv_metadata.empty()); + EXPECT_TRUE(context.kv_metadata.empty()); // Verify column groups EXPECT_FALSE(context.column_groups.empty()); diff --git a/internal/core/unittest/test_json_stats/test_utils.cpp b/internal/core/unittest/test_json_stats/test_utils.cpp index ef90d83efb..48092685f8 100644 --- a/internal/core/unittest/test_json_stats/test_utils.cpp +++ b/internal/core/unittest/test_json_stats/test_utils.cpp @@ -49,22 +49,196 @@ TEST_F(UtilsTest, CreateArrowBuildersTest) { TEST_F(UtilsTest, CreateParquetKVMetadataTest) { std::map column_map = { {JsonKey("int_key", JSONType::INT64), JsonKeyLayoutType::TYPED}, - {JsonKey("string_key", JSONType::STRING), JsonKeyLayoutType::DYNAMIC}, - {JsonKey("double_key", JSONType::DOUBLE), JsonKeyLayoutType::TYPED}, - {JsonKey("bool_key", JSONType::BOOL), JsonKeyLayoutType::TYPED}, - {JsonKey("shared_key", JSONType::STRING), JsonKeyLayoutType::SHARED}}; + {JsonKey("string_key", JSONType::STRING), JsonKeyLayoutType::DYNAMIC}}; + // After moving meta to separate file, this should return empty auto metadata = CreateParquetKVMetadata(column_map); - EXPECT_FALSE(metadata.empty()); - EXPECT_EQ(metadata.size(), 1); // layout_type + EXPECT_TRUE(metadata.empty()); +} - // Parse and verify layout_type - auto layout_type_json = nlohmann::json::parse(metadata[0].second); - for (const auto& [key, type] : column_map) { - std::string key_with_type = key.key_ + "_" + ToString(key.type_); - EXPECT_TRUE(layout_type_json.contains(key_with_type)); - EXPECT_EQ(layout_type_json[key_with_type], ToString(type)); +// JsonStatsMeta tests +class JsonStatsMetaTest : public ::testing::Test { + protected: + void + SetUp() override { + } +}; + +TEST_F(JsonStatsMetaTest, StringValuesTest) { + JsonStatsMeta meta; + + // Default version should be set + auto version = meta.GetString(META_KEY_VERSION); + EXPECT_TRUE(version.has_value()); + EXPECT_EQ(version.value(), META_CURRENT_VERSION); + + // Set and get custom string value + meta.SetString("custom_key", "custom_value"); + auto value = meta.GetString("custom_key"); + EXPECT_TRUE(value.has_value()); + EXPECT_EQ(value.value(), "custom_value"); + + // Non-existent key should return nullopt + auto non_existent = meta.GetString("non_existent"); + EXPECT_FALSE(non_existent.has_value()); +} + +TEST_F(JsonStatsMetaTest, Int64ValuesTest) { + JsonStatsMeta meta; + + meta.SetInt64(META_KEY_NUM_ROWS, 10000); + auto num_rows = meta.GetInt64(META_KEY_NUM_ROWS); + EXPECT_TRUE(num_rows.has_value()); + EXPECT_EQ(num_rows.value(), 10000); + + meta.SetInt64(META_KEY_NUM_SHREDDING_COLUMNS, 50); + auto num_columns = meta.GetInt64(META_KEY_NUM_SHREDDING_COLUMNS); + EXPECT_TRUE(num_columns.has_value()); + EXPECT_EQ(num_columns.value(), 50); + + // Non-existent key should return nullopt + auto non_existent = meta.GetInt64("non_existent"); + EXPECT_FALSE(non_existent.has_value()); +} + +TEST_F(JsonStatsMetaTest, LayoutTypeMapTest) { + JsonStatsMeta meta; + + std::map layout_map = { + {JsonKey("/a/b", JSONType::INT64), JsonKeyLayoutType::TYPED}, + {JsonKey("/a/c", JSONType::STRING), JsonKeyLayoutType::DYNAMIC}, + {JsonKey("/x/y", JSONType::DOUBLE), JsonKeyLayoutType::SHARED}}; + + meta.SetLayoutTypeMap(layout_map); + + const auto& retrieved = meta.GetLayoutTypeMap(); + EXPECT_EQ(retrieved.size(), layout_map.size()); + + for (const auto& [key, type] : layout_map) { + auto it = retrieved.find(key); + EXPECT_NE(it, retrieved.end()); + EXPECT_EQ(it->second, type); } } +TEST_F(JsonStatsMetaTest, BuildKeyFieldMapTest) { + JsonStatsMeta meta; + + std::map layout_map = { + {JsonKey("/a/b", JSONType::INT64), JsonKeyLayoutType::TYPED}, + {JsonKey("/a/b", JSONType::STRING), JsonKeyLayoutType::DYNAMIC}, + {JsonKey("/x/y", JSONType::DOUBLE), JsonKeyLayoutType::SHARED}}; + + meta.SetLayoutTypeMap(layout_map); + + auto key_field_map = meta.BuildKeyFieldMap(); + + // SHARED keys should be skipped + EXPECT_EQ(key_field_map.size(), 1); // Only /a/b + EXPECT_TRUE(key_field_map.find("/a/b") != key_field_map.end()); + EXPECT_EQ(key_field_map["/a/b"].size(), 2); // INT64 and STRING columns + EXPECT_TRUE(key_field_map["/a/b"].count("/a/b_INT64") > 0); + EXPECT_TRUE(key_field_map["/a/b"].count("/a/b_STRING") > 0); + + // SHARED key should not be in the map + EXPECT_TRUE(key_field_map.find("/x/y") == key_field_map.end()); +} + +TEST_F(JsonStatsMetaTest, SerializeDeserializeTest) { + JsonStatsMeta meta; + + // Set various values + meta.SetString("custom_string", "test_value"); + meta.SetInt64(META_KEY_NUM_ROWS, 12345); + meta.SetInt64(META_KEY_NUM_SHREDDING_COLUMNS, 67); + + std::map layout_map = { + {JsonKey("/a/b", JSONType::INT64), JsonKeyLayoutType::TYPED}, + {JsonKey("/c/d", JSONType::STRING), JsonKeyLayoutType::DYNAMIC}}; + meta.SetLayoutTypeMap(layout_map); + + // Serialize + std::string serialized = meta.Serialize(); + EXPECT_FALSE(serialized.empty()); + + // Deserialize + JsonStatsMeta deserialized = JsonStatsMeta::Deserialize(serialized); + + // Verify string values + EXPECT_EQ(deserialized.GetString(META_KEY_VERSION).value(), + META_CURRENT_VERSION); + EXPECT_EQ(deserialized.GetString("custom_string").value(), "test_value"); + + // Verify int64 values + EXPECT_EQ(deserialized.GetInt64(META_KEY_NUM_ROWS).value(), 12345); + EXPECT_EQ(deserialized.GetInt64(META_KEY_NUM_SHREDDING_COLUMNS).value(), + 67); + + // Verify layout type map + const auto& retrieved_layout = deserialized.GetLayoutTypeMap(); + EXPECT_EQ(retrieved_layout.size(), layout_map.size()); +} + +TEST_F(JsonStatsMetaTest, DeserializeToKeyFieldMapTest) { + JsonStatsMeta meta; + + std::map layout_map = { + {JsonKey("/a/b", JSONType::INT64), JsonKeyLayoutType::TYPED}, + {JsonKey("/a/b", JSONType::STRING), JsonKeyLayoutType::DYNAMIC}, + {JsonKey("/c/d", JSONType::DOUBLE), JsonKeyLayoutType::TYPED}, + {JsonKey("/x/y", JSONType::BOOL), JsonKeyLayoutType::SHARED}}; + meta.SetLayoutTypeMap(layout_map); + + std::string serialized = meta.Serialize(); + + // Use static method to deserialize directly to key_field_map + auto key_field_map = JsonStatsMeta::DeserializeToKeyFieldMap(serialized); + + // SHARED keys should be skipped + EXPECT_EQ(key_field_map.size(), 2); // /a/b and /c/d + + // Check /a/b has two columns + EXPECT_TRUE(key_field_map.find("/a/b") != key_field_map.end()); + EXPECT_EQ(key_field_map["/a/b"].size(), 2); + + // Check /c/d has one column + EXPECT_TRUE(key_field_map.find("/c/d") != key_field_map.end()); + EXPECT_EQ(key_field_map["/c/d"].size(), 1); + + // SHARED key should not be present + EXPECT_TRUE(key_field_map.find("/x/y") == key_field_map.end()); +} + +TEST_F(JsonStatsMetaTest, DeserializeInvalidJsonTest) { + std::string invalid_json = "not a valid json"; + + EXPECT_THROW(JsonStatsMeta::Deserialize(invalid_json), std::exception); + EXPECT_THROW(JsonStatsMeta::DeserializeToKeyFieldMap(invalid_json), + std::exception); +} + +TEST_F(JsonStatsMetaTest, GetSerializedSizeTest) { + JsonStatsMeta meta; + meta.SetInt64(META_KEY_NUM_ROWS, 100); + + size_t size = meta.GetSerializedSize(); + std::string serialized = meta.Serialize(); + + EXPECT_EQ(size, serialized.size()); +} + +TEST_F(JsonStatsMetaTest, EmptyLayoutTypeMapTest) { + JsonStatsMeta meta; + + // Empty layout type map + std::string serialized = meta.Serialize(); + EXPECT_FALSE(serialized.empty()); + + JsonStatsMeta deserialized = JsonStatsMeta::Deserialize(serialized); + EXPECT_TRUE(deserialized.GetLayoutTypeMap().empty()); + + auto key_field_map = JsonStatsMeta::DeserializeToKeyFieldMap(serialized); + EXPECT_TRUE(key_field_map.empty()); +} + } // namespace milvus::index \ No newline at end of file