diff --git a/internal/core/src/common/VectorArrayStorageV2Test.cpp b/internal/core/src/common/VectorArrayStorageV2Test.cpp index c035ccfb69..899bc3b7f6 100644 --- a/internal/core/src/common/VectorArrayStorageV2Test.cpp +++ b/internal/core/src/common/VectorArrayStorageV2Test.cpp @@ -45,6 +45,7 @@ #include "index/VectorIndex.h" #include "storage/Util.h" #include "storage/ChunkManager.h" +#include "test_utils/indexbuilder_test_utils.h" #include "test_utils/storage_test_utils.h" #include "common/QueryResult.h" @@ -52,8 +53,6 @@ using namespace milvus; using namespace milvus::segcore; using namespace milvus::storage; -const int64_t DIM = 4; - SchemaPtr GenVectorArrayTestSchema() { auto schema = std::make_shared(); @@ -345,7 +344,7 @@ TEST_F(TestVectorArrayStorageV2, BuildEmbListHNSWIndex) { auto search_conf = knowhere::Json{{knowhere::indexparam::NPROBE, 10}}; milvus::SearchInfo searchInfo; searchInfo.topk_ = 5; - searchInfo.metric_type_ = knowhere::metric::MAX_SIM; + searchInfo.metric_type_ = knowhere::metric::MAX_SIM_IP; searchInfo.search_params_ = search_conf; SearchResult result; milvus::OpContext op_context; @@ -359,3 +358,134 @@ TEST_F(TestVectorArrayStorageV2, BuildEmbListHNSWIndex) { EXPECT_EQ(op_context.storage_usage.scanned_total_bytes, 0); } } + +TEST_F(TestVectorArrayStorageV2, BuildEmbListHNSWIndexWithMmap) { + ASSERT_NE(segment_, nullptr); + ASSERT_EQ(segment_->get_row_count(), test_data_count_ * chunk_num_); + + auto vector_array_field_id = fields_["vector_array"]; + ASSERT_TRUE(segment_->HasFieldData(vector_array_field_id)); + + // Get the storage v2 parquet file paths that were already written in SetUp + std::vector paths = {"test_data/101/10001.parquet"}; + + // Use the existing Arrow file system from SetUp + auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() + .GetArrowFileSystem(); + + // Prepare for index building + int64_t collection_id = 1; + int64_t partition_id = 2; + int64_t segment_id = 3; + int64_t index_build_id = 4000; + int64_t index_version = 4000; + + auto field_meta = + milvus::segcore::gen_field_meta(collection_id, + partition_id, + segment_id, + vector_array_field_id.get(), + DataType::VECTOR_ARRAY, + DataType::VECTOR_FLOAT, + false); + + auto index_meta = gen_index_meta( + segment_id, vector_array_field_id.get(), index_build_id, index_version); + + // Create storage config pointing to the test data location + auto storage_config = + gen_local_storage_config("/tmp/test_vector_array_for_storage_v2"); + auto cm = CreateChunkManager(storage_config); + + // Create index using storage v2 config + milvus::index::CreateIndexInfo create_index_info; + create_index_info.field_type = DataType::VECTOR_ARRAY; + create_index_info.metric_type = knowhere::metric::MAX_SIM_IP; + create_index_info.index_type = knowhere::IndexEnum::INDEX_HNSW; + create_index_info.index_engine_version = + knowhere::Version::GetCurrentVersion().VersionNumber(); + + auto emb_list_hnsw_index = + milvus::index::IndexFactory::GetInstance().CreateIndex( + create_index_info, + storage::FileManagerContext(field_meta, index_meta, cm, fs)); + + // Build index with storage v2 configuration + Config config; + config[milvus::index::INDEX_TYPE] = knowhere::IndexEnum::INDEX_HNSW; + config[knowhere::meta::METRIC_TYPE] = create_index_info.metric_type; + config[knowhere::indexparam::M] = "16"; + config[knowhere::indexparam::EF] = "10"; + config[DIM_KEY] = DIM; + config[INDEX_NUM_ROWS_KEY] = + test_data_count_ * chunk_num_; // Important: set row count + config[STORAGE_VERSION_KEY] = 2; // Use storage v2 + config[DATA_TYPE_KEY] = DataType::VECTOR_ARRAY; + config[ELEMENT_TYPE_KEY] = DataType::VECTOR_FLOAT; + + // For storage v2, we need to provide segment insert files instead of individual binlog files + milvus::SegmentInsertFiles segment_insert_files; + segment_insert_files.emplace_back( + paths); // Column group with vector array field + config[SEGMENT_INSERT_FILES_KEY] = segment_insert_files; + emb_list_hnsw_index->Build(config); + + auto create_index_result = emb_list_hnsw_index->Upload(); + emb_list_hnsw_index.reset(); + auto index_files = create_index_result->GetIndexFiles(); + auto memSize = create_index_result->GetMemSize(); + auto serializedSize = create_index_result->GetSerializedSize(); + ASSERT_GT(memSize, 0); + ASSERT_GT(serializedSize, 0); + + auto new_emb_list_hnsw_index = + milvus::index::IndexFactory::GetInstance().CreateIndex( + create_index_info, + storage::FileManagerContext(field_meta, index_meta, cm, fs)); + milvus::index::VectorIndex* vec_index = + dynamic_cast( + new_emb_list_hnsw_index.get()); + // mmap load + { + auto load_conf = generate_load_conf( + knowhere::IndexEnum::INDEX_HNSW, knowhere::metric::MAX_SIM_IP, 0); + load_conf["index_files"] = index_files; + load_conf["mmap_filepath"] = "mmap/test_emb_list_index"; + load_conf["emb_list_meta_file_path"] = "mmap/test_index_meta"; + load_conf[milvus::LOAD_PRIORITY] = + milvus::proto::common::LoadPriority::HIGH; + vec_index->Load(milvus::tracer::TraceContext{}, load_conf); + } + // search + { + // Each row has 3 vectors, so total count should be rows * 3 + EXPECT_EQ(vec_index->Count(), test_data_count_ * chunk_num_ * 3); + EXPECT_EQ(vec_index->GetDim(), DIM); + auto vec_num = 10; + std::vector query_vec = generate_float_vector(vec_num, DIM); + auto query_dataset = + knowhere::GenDataSet(vec_num, DIM, query_vec.data()); + std::vector query_vec_lims; + query_vec_lims.push_back(0); + query_vec_lims.push_back(3); + query_vec_lims.push_back(10); + query_dataset->Set(knowhere::meta::EMB_LIST_OFFSET, + const_cast(query_vec_lims.data())); + + auto search_conf = knowhere::Json{{knowhere::indexparam::NPROBE, 10}}; + milvus::SearchInfo searchInfo; + searchInfo.topk_ = 5; + searchInfo.metric_type_ = knowhere::metric::MAX_SIM_IP; + searchInfo.search_params_ = search_conf; + SearchResult result; + milvus::OpContext op_context; + vec_index->Query( + query_dataset, searchInfo, nullptr, &op_context, result); + auto ref_result = SearchResultToJson(result); + std::cout << ref_result.dump(1) << std::endl; + EXPECT_EQ(result.total_nq_, 2); + EXPECT_EQ(result.distances_.size(), 2 * searchInfo.topk_); + EXPECT_EQ(op_context.storage_usage.scanned_cold_bytes, 0); + EXPECT_EQ(op_context.storage_usage.scanned_total_bytes, 0); + } +} \ No newline at end of file diff --git a/internal/core/src/exec/expression/ExprTest.cpp b/internal/core/src/exec/expression/ExprTest.cpp index 31300ad248..ed761713dd 100644 --- a/internal/core/src/exec/expression/ExprTest.cpp +++ b/internal/core/src/exec/expression/ExprTest.cpp @@ -17543,7 +17543,7 @@ TEST_P(ExprTest, TestSTDWithinFunction) { }); } -TEST_P(ExprTest, ParseGISFunctionFilterExprs) { +TEST(ExprTest, ParseGISFunctionFilterExprs) { // Build Schema auto schema = std::make_shared(); auto dim = 16; @@ -17552,12 +17552,10 @@ TEST_P(ExprTest, ParseGISFunctionFilterExprs) { auto geo_id = schema->AddDebugField("geo", DataType::GEOMETRY); auto pk_id = schema->AddDebugField("pk", DataType::INT64); schema->set_primary_field_id(pk_id); - // Generate data and load int64_t N = 1000; auto dataset = DataGen(schema, N); auto seg = CreateSealedWithFieldDataLoaded(schema, dataset); - // Test plan with gisfunction_filter_expr std::string raw_plan = R"PLAN(vector_anns: < field_id: 100 @@ -17579,17 +17577,13 @@ TEST_P(ExprTest, ParseGISFunctionFilterExprs) { > placeholder_tag: "$0" >)PLAN"; - // Convert and parse - auto bin_plan = translate_text_plan_with_metric_type(raw_plan); + auto bin_plan = translate_text_plan_to_binary_plan(raw_plan.c_str()); auto plan = CreateSearchPlanByExpr(schema, bin_plan.data(), bin_plan.size()); - // If parsing fails, test will fail with exception // If parsing succeeds, ParseGISFunctionFilterExprs is covered - // Execute search to verify execution logic - auto ph_raw = CreatePlaceholderGroup(5, dim, 123); auto ph_grp = ParsePlaceholderGroup(plan.get(), ph_raw.SerializeAsString()); auto sr = seg->Search(plan.get(), ph_grp.get(), MAX_TIMESTAMP); diff --git a/internal/core/src/index/Meta.h b/internal/core/src/index/Meta.h index d3928a2845..048a22abde 100644 --- a/internal/core/src/index/Meta.h +++ b/internal/core/src/index/Meta.h @@ -79,6 +79,8 @@ constexpr const char* ENABLE_OFFSET_CACHE = "indexoffsetcache.enabled"; // VecIndex file metas constexpr const char* DISK_ANN_PREFIX_PATH = "index_prefix"; constexpr const char* DISK_ANN_RAW_DATA_PATH = "data_path"; +constexpr const char* EMB_LIST_META_PATH = "emb_list_meta_file_path"; +constexpr const char* EMB_LIST_META_FILE_NAME = "emb_list_meta"; // VecIndex node filtering constexpr const char* VEC_OPT_FIELDS_PATH = "opt_fields_path"; diff --git a/internal/core/src/index/VectorMemIndex.cpp b/internal/core/src/index/VectorMemIndex.cpp index 842c9b69a1..1018c4c232 100644 --- a/internal/core/src/index/VectorMemIndex.cpp +++ b/internal/core/src/index/VectorMemIndex.cpp @@ -591,6 +591,20 @@ void VectorMemIndex::LoadFromFile(const Config& config) { GetValueFromConfig( config, milvus::LOAD_PRIORITY) .value_or(milvus::proto::common::LoadPriority::HIGH); + auto is_embedding_list = (elem_type_ != DataType::NONE); + std::unique_ptr embedding_list_meta_writer_ptr = + nullptr; + auto embedding_list_meta_path = + GetValueFromConfig(config, EMB_LIST_META_PATH); + if (is_embedding_list) { + AssertInfo(embedding_list_meta_path.has_value(), + "mmap filepath is empty when load index"); + std::filesystem::create_directories( + std::filesystem::path(embedding_list_meta_path.value()) + .parent_path()); + embedding_list_meta_writer_ptr = std::make_unique( + embedding_list_meta_path.value()); + } auto file_writer = storage::FileWriter( local_filepath.value(), @@ -654,7 +668,14 @@ void VectorMemIndex::LoadFromFile(const Config& config) { "lost index slice data"); auto&& data = batch_data[file_name]; auto start_write_file = std::chrono::system_clock::now(); - file_writer.Write(data->PayloadData(), data->PayloadSize()); + if (prefix == knowhere::meta::EMB_LIST_META && + embedding_list_meta_writer_ptr) { + embedding_list_meta_writer_ptr->Write( + data->PayloadData(), data->PayloadSize()); + } else { + file_writer.Write(data->PayloadData(), + data->PayloadSize()); + } write_disk_duration_sum += (std::chrono::system_clock::now() - start_write_file); } @@ -686,9 +707,15 @@ void VectorMemIndex::LoadFromFile(const Config& config) { (std::chrono::system_clock::now() - start_load_files2_mem); //2. write data into files auto start_write_file = std::chrono::system_clock::now(); - for (auto& [_, index_data] : result) { - file_writer.Write(index_data->PayloadData(), - index_data->PayloadSize()); + for (auto& [prefix, index_data] : result) { + if (prefix == knowhere::meta::EMB_LIST_META && + embedding_list_meta_writer_ptr) { + embedding_list_meta_writer_ptr->Write( + index_data->PayloadData(), index_data->PayloadSize()); + } else { + file_writer.Write(index_data->PayloadData(), + index_data->PayloadSize()); + } } write_disk_duration_sum += (std::chrono::system_clock::now() - start_write_file); @@ -701,11 +728,17 @@ void VectorMemIndex::LoadFromFile(const Config& config) { write_disk_duration_sum) .count()); file_writer.Finish(); + if (embedding_list_meta_writer_ptr) { + embedding_list_meta_writer_ptr->Finish(); + } LOG_INFO("load index into Knowhere..."); auto conf = config; conf.erase(MMAP_FILE_PATH); conf[ENABLE_MMAP] = true; + if (is_embedding_list) { + conf["emb_list_meta_file_path"] = embedding_list_meta_path.value(); + } auto start_deserialize = std::chrono::system_clock::now(); auto stat = index_.DeserializeFromFile(local_filepath.value(), conf); auto deserialize_duration = diff --git a/internal/core/src/segcore/storagev1translator/SealedIndexTranslator.cpp b/internal/core/src/segcore/storagev1translator/SealedIndexTranslator.cpp index 5abc8d8768..f0f9e43a23 100644 --- a/internal/core/src/segcore/storagev1translator/SealedIndexTranslator.cpp +++ b/internal/core/src/segcore/storagev1translator/SealedIndexTranslator.cpp @@ -117,10 +117,17 @@ SealedIndexTranslator::get_cells(const std::vector& cids) { "mmap directory path is empty"); auto filepath = std::filesystem::path(index_load_info_.mmap_dir_path) / "index_files" / index_load_info_.index_id / - index_load_info_.segment_id / index_load_info_.field_id; - + index_load_info_.segment_id / + index_load_info_.field_id / "index"; + auto embedding_list_meta_path = + std::filesystem::path(index_load_info_.mmap_dir_path) / + "index_files" / index_load_info_.index_id / + index_load_info_.segment_id / index_load_info_.field_id / + index::EMB_LIST_META_FILE_NAME; config_[milvus::index::ENABLE_MMAP] = "true"; config_[milvus::index::MMAP_FILE_PATH] = filepath.string(); + config_[milvus::index::EMB_LIST_META_PATH] = + embedding_list_meta_path.string(); } else { config_[milvus::index::ENABLE_MMAP] = "false"; } diff --git a/internal/core/thirdparty/knowhere/CMakeLists.txt b/internal/core/thirdparty/knowhere/CMakeLists.txt index 9d6ab2f4b6..4a62f330dd 100644 --- a/internal/core/thirdparty/knowhere/CMakeLists.txt +++ b/internal/core/thirdparty/knowhere/CMakeLists.txt @@ -14,7 +14,7 @@ # Update KNOWHERE_VERSION for the first occurrence milvus_add_pkg_config("knowhere") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( KNOWHERE_VERSION c4d5dd8 ) +set( KNOWHERE_VERSION v2.6.4 ) set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git") message(STATUS "Knowhere repo: ${GIT_REPOSITORY}") diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index e54a3d780e..1ec5bc5b24 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -2376,7 +2376,6 @@ class SealedVectorArrayTest TEST_P(SealedVectorArrayTest, QueryVectorArrayAllFields) { auto schema = std::make_shared(); - auto int64_field = schema->AddDebugField("int64", DataType::INT64); auto array_vec = schema->AddDebugVectorArrayField( "array_vec", element_type, dim, metric_type);