From cd0b36c39e1c1d630ba8f940c31873af07cd260d Mon Sep 17 00:00:00 2001 From: Spade A <71589810+SpadeA-Tang@users.noreply.github.com> Date: Tue, 4 Nov 2025 11:57:33 +0800 Subject: [PATCH] feat: impl StructArray -- support diskann index (#45223) issue: https://github.com/milvus-io/milvus/issues/42148 --------- Signed-off-by: SpadeA-Tang Signed-off-by: SpadeA --- internal/core/src/index/IndexFactory.cpp | 83 +++++++++-- internal/core/src/index/Meta.h | 2 + internal/core/src/index/VectorDiskIndex.cpp | 69 ++++++++- internal/core/src/index/VectorDiskIndex.h | 3 + .../core/src/storage/DiskFileManagerImpl.cpp | 131 ++++++++++++++++-- .../core/src/storage/DiskFileManagerImpl.h | 3 +- .../core/thirdparty/knowhere/CMakeLists.txt | 2 +- .../test_storage_v2_index_raw_data.cpp | 2 +- 8 files changed, 274 insertions(+), 21 deletions(-) diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index f4809fbc29..53790a9525 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -577,31 +577,98 @@ IndexFactory::CreateVectorIndex( switch (data_type) { case DataType::VECTOR_FLOAT: { return std::make_unique>( - index_type, metric_type, version, file_manager_context); + DataType::NONE, + index_type, + metric_type, + version, + file_manager_context); } case DataType::VECTOR_FLOAT16: { return std::make_unique>( - index_type, metric_type, version, file_manager_context); + DataType::NONE, + index_type, + metric_type, + version, + file_manager_context); } case DataType::VECTOR_BFLOAT16: { return std::make_unique>( - index_type, metric_type, version, file_manager_context); + DataType::NONE, + index_type, + metric_type, + version, + file_manager_context); } case DataType::VECTOR_BINARY: { return std::make_unique>( - index_type, metric_type, version, file_manager_context); + DataType::NONE, + index_type, + metric_type, + version, + file_manager_context); } case DataType::VECTOR_SPARSE_U32_F32: { return std::make_unique>( - index_type, metric_type, version, file_manager_context); + DataType::NONE, + index_type, + metric_type, + version, + file_manager_context); } case DataType::VECTOR_ARRAY: { - ThrowInfo(Unsupported, - "VECTOR_ARRAY for DiskAnnIndex is not supported"); + auto element_type = + static_cast(file_manager_context.fieldDataMeta + .field_schema.element_type()); + switch (element_type) { + case DataType::VECTOR_FLOAT: + return std::make_unique>( + element_type, + index_type, + metric_type, + version, + file_manager_context); + case DataType::VECTOR_FLOAT16: + return std::make_unique>( + element_type, + index_type, + metric_type, + version, + file_manager_context); + case DataType::VECTOR_BFLOAT16: + return std::make_unique>( + element_type, + index_type, + metric_type, + version, + file_manager_context); + case DataType::VECTOR_BINARY: + return std::make_unique>( + element_type, + index_type, + metric_type, + version, + file_manager_context); + case DataType::VECTOR_INT8: + return std::make_unique>( + element_type, + index_type, + metric_type, + version, + file_manager_context); + default: + ThrowInfo(NotImplemented, + fmt::format("not implemented data type to " + "build disk index: {}", + element_type)); + } } case DataType::VECTOR_INT8: { return std::make_unique>( - index_type, metric_type, version, file_manager_context); + DataType::NONE, + index_type, + metric_type, + version, + file_manager_context); } default: ThrowInfo( diff --git a/internal/core/src/index/Meta.h b/internal/core/src/index/Meta.h index 048a22abde..7d189f623b 100644 --- a/internal/core/src/index/Meta.h +++ b/internal/core/src/index/Meta.h @@ -38,6 +38,7 @@ constexpr const char* BITMAP_INDEX_NUM_ROWS = "bitmap_index_num_rows"; constexpr const char* INDEX_TYPE = "index_type"; constexpr const char* METRIC_TYPE = "metric_type"; +constexpr const char* EMB_LIST = "embedding_list"; // scalar index type constexpr const char* ASCENDING_SORT = "STL_SORT"; @@ -81,6 +82,7 @@ constexpr const char* DISK_ANN_PREFIX_PATH = "index_prefix"; constexpr const char* DISK_ANN_RAW_DATA_PATH = "data_path"; constexpr const char* EMB_LIST_META_PATH = "emb_list_meta_file_path"; constexpr const char* EMB_LIST_META_FILE_NAME = "emb_list_meta"; +constexpr const char* EMB_LIST_OFFSETS_PATH = "emb_list_offset_file_path"; // VecIndex node filtering constexpr const char* VEC_OPT_FIELDS_PATH = "opt_fields_path"; diff --git a/internal/core/src/index/VectorDiskIndex.cpp b/internal/core/src/index/VectorDiskIndex.cpp index d0655a44d6..b2165d66bf 100644 --- a/internal/core/src/index/VectorDiskIndex.cpp +++ b/internal/core/src/index/VectorDiskIndex.cpp @@ -38,11 +38,12 @@ namespace milvus::index { template VectorDiskAnnIndex::VectorDiskAnnIndex( + DataType elem_type, const IndexType& index_type, const MetricType& metric_type, const IndexVersion& version, const storage::FileManagerContext& file_manager_context) - : VectorIndex(index_type, metric_type) { + : VectorIndex(index_type, metric_type), elem_type_(elem_type) { CheckMetricTypeSupport(metric_type); file_manager_ = std::make_shared(file_manager_context); @@ -145,9 +146,35 @@ VectorDiskAnnIndex::Build(const Config& config) { build_config.update(config); auto segment_id = file_manager_->GetFieldDataMeta().segment_id; - auto local_data_path = file_manager_->CacheRawDataToDisk(config); + auto field_id = file_manager_->GetFieldDataMeta().field_id; + + auto is_embedding_list = (elem_type_ != DataType::NONE); + Config config_with_emb_list = config; + config_with_emb_list[EMB_LIST] = is_embedding_list; + + std::string offsets_path; + // Set offsets path in config for VECTOR_ARRAY + if (is_embedding_list) { + offsets_path = storage::GenFieldRawDataPathPrefix( + local_chunk_manager, segment_id, field_id) + + "offset"; + config_with_emb_list[EMB_LIST_OFFSETS_PATH] = offsets_path; + } + + auto local_data_path = + file_manager_->CacheRawDataToDisk(config_with_emb_list); build_config[DISK_ANN_RAW_DATA_PATH] = local_data_path; + // For VECTOR_ARRAY, verify offsets file exists and pass its path to build_config + if (is_embedding_list) { + if (!local_chunk_manager->Exist(offsets_path)) { + ThrowInfo(ErrorCode::UnexpectedError, + fmt::format("Embedding list offsets file not found: {}", + offsets_path)); + } + build_config[EMB_LIST_OFFSETS_PATH] = offsets_path; + } + auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix(); build_config[DISK_ANN_PREFIX_PATH] = local_index_path_prefix; @@ -229,6 +256,44 @@ VectorDiskAnnIndex::BuildWithDataset(const DatasetPtr& dataset, auto raw_data = const_cast(milvus::GetDatasetTensor(dataset)); local_chunk_manager->Write(local_data_path, offset, raw_data, data_size); + // For VECTOR_ARRAY, write offsets to a separate file and pass the path to knowhere + if (elem_type_ != DataType::NONE) { + auto offsets = + dataset->Get(knowhere::meta::EMB_LIST_OFFSET); + if (offsets == nullptr) { + ThrowInfo(ErrorCode::UnexpectedError, + "Embedding list offsets is empty when build index"); + } + + // Write offsets to disk file (use same path convention as Build method) + std::string offsets_path = + storage::GenFieldRawDataPathPrefix( + local_chunk_manager, segment_id, field_id) + + "offset"; + local_chunk_manager->CreateFile(offsets_path); + + // Calculate the number of offsets (num_rows + 1) + // We need to find the actual number by looking at the data + uint32_t num_rows = + static_cast(milvus::GetDatasetRows(dataset)); + uint32_t num_offsets = num_rows + 1; + + // Write offsets to file + // Format: [num_offsets][offsets_data] + int64_t write_pos = 0; + local_chunk_manager->Write( + offsets_path, write_pos, &num_offsets, sizeof(uint32_t)); + write_pos += sizeof(uint32_t); + + local_chunk_manager->Write( + offsets_path, + write_pos, + const_cast(static_cast(offsets)), + num_offsets * sizeof(size_t)); + + build_config[EMB_LIST_OFFSETS_PATH] = offsets_path; + } + auto stat = index_.Build({}, build_config); if (stat != knowhere::Status::success) ThrowInfo(ErrorCode::IndexBuildError, diff --git a/internal/core/src/index/VectorDiskIndex.h b/internal/core/src/index/VectorDiskIndex.h index 52b39c2023..d169d19896 100644 --- a/internal/core/src/index/VectorDiskIndex.h +++ b/internal/core/src/index/VectorDiskIndex.h @@ -28,6 +28,7 @@ template class VectorDiskAnnIndex : public VectorIndex { public: explicit VectorDiskAnnIndex( + DataType elem_type /* used for embedding list only */, const IndexType& index_type, const MetricType& metric_type, const IndexVersion& version, @@ -102,6 +103,8 @@ class VectorDiskAnnIndex : public VectorIndex { knowhere::Index index_; std::shared_ptr file_manager_; uint32_t search_beamwidth_ = 8; + // used for embedding list only + DataType elem_type_; }; template diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 059b46371e..60c738c777 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -36,6 +36,7 @@ #include "common/Slice.h" #include "common/Types.h" #include "index/Utils.h" +#include "index/Meta.h" #include "log/Log.h" #include "storage/DiskFileManagerImpl.h" @@ -425,6 +426,15 @@ DiskFileManagerImpl::cache_raw_data_to_disk_internal(const Config& config) { std::string local_data_path; bool file_created = false; + // Check if we're dealing with embedding list (VECTOR_ARRAY) + auto is_embedding_list = + index::GetValueFromConfig(config, index::EMB_LIST); + bool is_vector_array = is_embedding_list.value_or(false); + std::vector offsets; + if (is_vector_array) { + offsets.push_back(0); // Initialize with 0 for cumulative offsets + } + // get batch raw data from s3 and write batch data to disk file // TODO: load and write of different batches at the same time std::vector batch_files; @@ -441,12 +451,14 @@ DiskFileManagerImpl::cache_raw_data_to_disk_internal(const Config& config) { for (int i = 0; i < batch_size; i++) { auto field_data = field_datas[i].get()->GetFieldData(); num_rows += uint32_t(field_data->get_num_rows()); - cache_raw_data_to_disk_common(field_data, - local_chunk_manager, - local_data_path, - file_created, - dim, - write_offset); + cache_raw_data_to_disk_common( + field_data, + local_chunk_manager, + local_data_path, + file_created, + dim, + write_offset, + is_vector_array ? &offsets : nullptr); } }; @@ -473,6 +485,32 @@ DiskFileManagerImpl::cache_raw_data_to_disk_internal(const Config& config) { local_chunk_manager->Write( local_data_path, write_offset, &dim, sizeof(dim)); + // Write offsets file for VECTOR_ARRAY + if (is_vector_array) { + AssertInfo(offsets.size() == num_rows + 1, + "offsets size is not equal to num_rows + 1: offset size {}, " + "num_rows {}", + offsets.size(), + num_rows); + // Get offsets path from config if provided, otherwise use default + auto offsets_path = index::GetValueFromConfig( + config, index::EMB_LIST_OFFSETS_PATH) + .value(); + local_chunk_manager->CreateFile(offsets_path); + + uint32_t num_offsets = offsets.size(); + int64_t offsets_write_pos = 0; + + local_chunk_manager->Write( + offsets_path, offsets_write_pos, &num_offsets, sizeof(uint32_t)); + offsets_write_pos += sizeof(uint32_t); + + local_chunk_manager->Write(offsets_path, + offsets_write_pos, + offsets.data(), + offsets.size() * sizeof(size_t)); + } + return local_data_path; } @@ -484,7 +522,8 @@ DiskFileManagerImpl::cache_raw_data_to_disk_common( std::string& local_data_path, bool& file_created, uint32_t& dim, - int64_t& write_offset) { + int64_t& write_offset, + std::vector* offsets) { auto data_type = field_data->get_data_type(); if (!file_created) { auto init_file_info = [&](milvus::DataType dt) { @@ -522,6 +561,45 @@ DiskFileManagerImpl::cache_raw_data_to_disk_common( local_data_path, write_offset, row.data(), row_byte_size); write_offset += row_byte_size; } + } else if (data_type == milvus::DataType::VECTOR_ARRAY) { + // Handle VECTOR_ARRAY - need to flatten the array data + auto vec_array_data = + dynamic_cast*>(field_data.get()); + AssertInfo(vec_array_data != nullptr, + "failed to cast field data to vector array"); + + dim = field_data->get_dim(); + auto rows = vec_array_data->get_num_rows(); + + // Calculate total data size needed + int64_t total_size = 0; + for (auto i = 0; i < rows; ++i) { + total_size += vec_array_data->DataSize(i); + } + + // Allocate buffer and copy data + auto buf = std::unique_ptr(new uint8_t[total_size]); + int64_t buf_offset = 0; + + for (auto i = 0; i < rows; ++i) { + auto vec_array = vec_array_data->value_at(i); + auto size = vec_array_data->DataSize(i); + + // Collect offsets information if needed (cumulative offsets) + if (offsets != nullptr) { + // Add cumulative offset (number of vectors processed so far) + size_t last_offset = offsets->back(); + offsets->push_back(last_offset + vec_array->length()); + } + + std::memcpy(buf.get() + buf_offset, vec_array->data(), size); + buf_offset += size; + } + + // Write flattened data to disk + local_chunk_manager->Write( + local_data_path, write_offset, buf.get(), total_size); + write_offset += total_size; } else { dim = field_data->get_dim(); auto data_size = @@ -559,6 +637,15 @@ DiskFileManagerImpl::cache_raw_data_to_disk_storage_v2(const Config& config) { std::string local_data_path; bool file_created = false; + // Check if we're dealing with embedding list (VECTOR_ARRAY) + auto is_embedding_list = + index::GetValueFromConfig(config, index::EMB_LIST); + bool is_vector_array = is_embedding_list.value_or(false); + std::vector offsets; + if (is_vector_array) { + offsets.push_back(0); // Initialize with 0 for cumulative offsets + } + // file format // num_rows(uint32) | dim(uint32) | index_data ([]uint8_t) uint32_t num_rows = 0; @@ -578,7 +665,8 @@ DiskFileManagerImpl::cache_raw_data_to_disk_storage_v2(const Config& config) { local_data_path, file_created, var_dim, - write_offset); + write_offset, + is_vector_array ? &offsets : nullptr); } // write num_rows and dim value to file header @@ -589,6 +677,33 @@ DiskFileManagerImpl::cache_raw_data_to_disk_storage_v2(const Config& config) { local_chunk_manager->Write( local_data_path, write_offset, &var_dim, sizeof(var_dim)); + // Write offsets file for VECTOR_ARRAY + if (is_vector_array) { + AssertInfo(offsets.size() == num_rows + 1, + "offsets size is not equal to num_rows + 1: offset size {}, " + "num_rows {}", + offsets.size(), + num_rows); + // Get offsets path from config if provided, otherwise use default + auto offsets_path = index::GetValueFromConfig( + config, index::EMB_LIST_OFFSETS_PATH) + .value(); + + local_chunk_manager->CreateFile(offsets_path); + + uint32_t num_offsets = offsets.size(); + int64_t offsets_write_pos = 0; + + local_chunk_manager->Write( + offsets_path, offsets_write_pos, &num_offsets, sizeof(uint32_t)); + offsets_write_pos += sizeof(uint32_t); + + local_chunk_manager->Write(offsets_path, + offsets_write_pos, + offsets.data(), + offsets.size() * sizeof(size_t)); + } + return local_data_path; } diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h index 0e80fb71fc..64c6a38c67 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.h +++ b/internal/core/src/storage/DiskFileManagerImpl.h @@ -260,7 +260,8 @@ class DiskFileManagerImpl : public FileManagerImpl { std::string& local_data_path, bool& file_created, uint32_t& dim, - int64_t& write_offset); + int64_t& write_offset, + std::vector* offsets = nullptr); private: // local file path (abs path) diff --git a/internal/core/thirdparty/knowhere/CMakeLists.txt b/internal/core/thirdparty/knowhere/CMakeLists.txt index 24ef461661..e7f0496707 100644 --- a/internal/core/thirdparty/knowhere/CMakeLists.txt +++ b/internal/core/thirdparty/knowhere/CMakeLists.txt @@ -14,7 +14,7 @@ # Update KNOWHERE_VERSION for the first occurrence milvus_add_pkg_config("knowhere") set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "") -set( KNOWHERE_VERSION ac1d7ad ) +set( KNOWHERE_VERSION deadb8e ) set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git") message(STATUS "Knowhere repo: ${GIT_REPOSITORY}") diff --git a/internal/core/unittest/test_storage_v2_index_raw_data.cpp b/internal/core/unittest/test_storage_v2_index_raw_data.cpp index 9f182c9732..99958d0a3f 100644 --- a/internal/core/unittest/test_storage_v2_index_raw_data.cpp +++ b/internal/core/unittest/test_storage_v2_index_raw_data.cpp @@ -243,7 +243,7 @@ TEST_F(StorageV2IndexRawDataTest, TestGetRawData) { try { auto vec_index = std::make_unique>( - index_type, metric_type, 6, ctx); + milvus::DataType::NONE, index_type, metric_type, 6, ctx); vec_index->Build(config); } catch (const std::exception& e) { std::cout << "Exception: " << e.what() << std::endl;