mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: [2.6]embedding_list support mmap in MemVectorIndex (#44832)
issue: https://github.com/milvus-io/milvus/issues/44702 related pr: https://github.com/milvus-io/milvus/pull/44764 Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
This commit is contained in:
parent
bb4446e5af
commit
b43dcdf9aa
@ -45,6 +45,7 @@
|
||||
#include "index/VectorIndex.h"
|
||||
#include "storage/Util.h"
|
||||
#include "storage/ChunkManager.h"
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
#include "test_utils/storage_test_utils.h"
|
||||
#include "common/QueryResult.h"
|
||||
|
||||
@ -52,8 +53,6 @@ using namespace milvus;
|
||||
using namespace milvus::segcore;
|
||||
using namespace milvus::storage;
|
||||
|
||||
const int64_t DIM = 4;
|
||||
|
||||
SchemaPtr
|
||||
GenVectorArrayTestSchema() {
|
||||
auto schema = std::make_shared<Schema>();
|
||||
@ -345,7 +344,7 @@ TEST_F(TestVectorArrayStorageV2, BuildEmbListHNSWIndex) {
|
||||
auto search_conf = knowhere::Json{{knowhere::indexparam::NPROBE, 10}};
|
||||
milvus::SearchInfo searchInfo;
|
||||
searchInfo.topk_ = 5;
|
||||
searchInfo.metric_type_ = knowhere::metric::MAX_SIM;
|
||||
searchInfo.metric_type_ = knowhere::metric::MAX_SIM_IP;
|
||||
searchInfo.search_params_ = search_conf;
|
||||
SearchResult result;
|
||||
milvus::OpContext op_context;
|
||||
@ -359,3 +358,134 @@ TEST_F(TestVectorArrayStorageV2, BuildEmbListHNSWIndex) {
|
||||
EXPECT_EQ(op_context.storage_usage.scanned_total_bytes, 0);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestVectorArrayStorageV2, BuildEmbListHNSWIndexWithMmap) {
|
||||
ASSERT_NE(segment_, nullptr);
|
||||
ASSERT_EQ(segment_->get_row_count(), test_data_count_ * chunk_num_);
|
||||
|
||||
auto vector_array_field_id = fields_["vector_array"];
|
||||
ASSERT_TRUE(segment_->HasFieldData(vector_array_field_id));
|
||||
|
||||
// Get the storage v2 parquet file paths that were already written in SetUp
|
||||
std::vector<std::string> paths = {"test_data/101/10001.parquet"};
|
||||
|
||||
// Use the existing Arrow file system from SetUp
|
||||
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
|
||||
.GetArrowFileSystem();
|
||||
|
||||
// Prepare for index building
|
||||
int64_t collection_id = 1;
|
||||
int64_t partition_id = 2;
|
||||
int64_t segment_id = 3;
|
||||
int64_t index_build_id = 4000;
|
||||
int64_t index_version = 4000;
|
||||
|
||||
auto field_meta =
|
||||
milvus::segcore::gen_field_meta(collection_id,
|
||||
partition_id,
|
||||
segment_id,
|
||||
vector_array_field_id.get(),
|
||||
DataType::VECTOR_ARRAY,
|
||||
DataType::VECTOR_FLOAT,
|
||||
false);
|
||||
|
||||
auto index_meta = gen_index_meta(
|
||||
segment_id, vector_array_field_id.get(), index_build_id, index_version);
|
||||
|
||||
// Create storage config pointing to the test data location
|
||||
auto storage_config =
|
||||
gen_local_storage_config("/tmp/test_vector_array_for_storage_v2");
|
||||
auto cm = CreateChunkManager(storage_config);
|
||||
|
||||
// Create index using storage v2 config
|
||||
milvus::index::CreateIndexInfo create_index_info;
|
||||
create_index_info.field_type = DataType::VECTOR_ARRAY;
|
||||
create_index_info.metric_type = knowhere::metric::MAX_SIM_IP;
|
||||
create_index_info.index_type = knowhere::IndexEnum::INDEX_HNSW;
|
||||
create_index_info.index_engine_version =
|
||||
knowhere::Version::GetCurrentVersion().VersionNumber();
|
||||
|
||||
auto emb_list_hnsw_index =
|
||||
milvus::index::IndexFactory::GetInstance().CreateIndex(
|
||||
create_index_info,
|
||||
storage::FileManagerContext(field_meta, index_meta, cm, fs));
|
||||
|
||||
// Build index with storage v2 configuration
|
||||
Config config;
|
||||
config[milvus::index::INDEX_TYPE] = knowhere::IndexEnum::INDEX_HNSW;
|
||||
config[knowhere::meta::METRIC_TYPE] = create_index_info.metric_type;
|
||||
config[knowhere::indexparam::M] = "16";
|
||||
config[knowhere::indexparam::EF] = "10";
|
||||
config[DIM_KEY] = DIM;
|
||||
config[INDEX_NUM_ROWS_KEY] =
|
||||
test_data_count_ * chunk_num_; // Important: set row count
|
||||
config[STORAGE_VERSION_KEY] = 2; // Use storage v2
|
||||
config[DATA_TYPE_KEY] = DataType::VECTOR_ARRAY;
|
||||
config[ELEMENT_TYPE_KEY] = DataType::VECTOR_FLOAT;
|
||||
|
||||
// For storage v2, we need to provide segment insert files instead of individual binlog files
|
||||
milvus::SegmentInsertFiles segment_insert_files;
|
||||
segment_insert_files.emplace_back(
|
||||
paths); // Column group with vector array field
|
||||
config[SEGMENT_INSERT_FILES_KEY] = segment_insert_files;
|
||||
emb_list_hnsw_index->Build(config);
|
||||
|
||||
auto create_index_result = emb_list_hnsw_index->Upload();
|
||||
emb_list_hnsw_index.reset();
|
||||
auto index_files = create_index_result->GetIndexFiles();
|
||||
auto memSize = create_index_result->GetMemSize();
|
||||
auto serializedSize = create_index_result->GetSerializedSize();
|
||||
ASSERT_GT(memSize, 0);
|
||||
ASSERT_GT(serializedSize, 0);
|
||||
|
||||
auto new_emb_list_hnsw_index =
|
||||
milvus::index::IndexFactory::GetInstance().CreateIndex(
|
||||
create_index_info,
|
||||
storage::FileManagerContext(field_meta, index_meta, cm, fs));
|
||||
milvus::index::VectorIndex* vec_index =
|
||||
dynamic_cast<milvus::index::VectorIndex*>(
|
||||
new_emb_list_hnsw_index.get());
|
||||
// mmap load
|
||||
{
|
||||
auto load_conf = generate_load_conf(
|
||||
knowhere::IndexEnum::INDEX_HNSW, knowhere::metric::MAX_SIM_IP, 0);
|
||||
load_conf["index_files"] = index_files;
|
||||
load_conf["mmap_filepath"] = "mmap/test_emb_list_index";
|
||||
load_conf["emb_list_meta_file_path"] = "mmap/test_index_meta";
|
||||
load_conf[milvus::LOAD_PRIORITY] =
|
||||
milvus::proto::common::LoadPriority::HIGH;
|
||||
vec_index->Load(milvus::tracer::TraceContext{}, load_conf);
|
||||
}
|
||||
// search
|
||||
{
|
||||
// Each row has 3 vectors, so total count should be rows * 3
|
||||
EXPECT_EQ(vec_index->Count(), test_data_count_ * chunk_num_ * 3);
|
||||
EXPECT_EQ(vec_index->GetDim(), DIM);
|
||||
auto vec_num = 10;
|
||||
std::vector<float> query_vec = generate_float_vector(vec_num, DIM);
|
||||
auto query_dataset =
|
||||
knowhere::GenDataSet(vec_num, DIM, query_vec.data());
|
||||
std::vector<size_t> query_vec_lims;
|
||||
query_vec_lims.push_back(0);
|
||||
query_vec_lims.push_back(3);
|
||||
query_vec_lims.push_back(10);
|
||||
query_dataset->Set(knowhere::meta::EMB_LIST_OFFSET,
|
||||
const_cast<const size_t*>(query_vec_lims.data()));
|
||||
|
||||
auto search_conf = knowhere::Json{{knowhere::indexparam::NPROBE, 10}};
|
||||
milvus::SearchInfo searchInfo;
|
||||
searchInfo.topk_ = 5;
|
||||
searchInfo.metric_type_ = knowhere::metric::MAX_SIM_IP;
|
||||
searchInfo.search_params_ = search_conf;
|
||||
SearchResult result;
|
||||
milvus::OpContext op_context;
|
||||
vec_index->Query(
|
||||
query_dataset, searchInfo, nullptr, &op_context, result);
|
||||
auto ref_result = SearchResultToJson(result);
|
||||
std::cout << ref_result.dump(1) << std::endl;
|
||||
EXPECT_EQ(result.total_nq_, 2);
|
||||
EXPECT_EQ(result.distances_.size(), 2 * searchInfo.topk_);
|
||||
EXPECT_EQ(op_context.storage_usage.scanned_cold_bytes, 0);
|
||||
EXPECT_EQ(op_context.storage_usage.scanned_total_bytes, 0);
|
||||
}
|
||||
}
|
||||
@ -17543,7 +17543,7 @@ TEST_P(ExprTest, TestSTDWithinFunction) {
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(ExprTest, ParseGISFunctionFilterExprs) {
|
||||
TEST(ExprTest, ParseGISFunctionFilterExprs) {
|
||||
// Build Schema
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto dim = 16;
|
||||
@ -17552,12 +17552,10 @@ TEST_P(ExprTest, ParseGISFunctionFilterExprs) {
|
||||
auto geo_id = schema->AddDebugField("geo", DataType::GEOMETRY);
|
||||
auto pk_id = schema->AddDebugField("pk", DataType::INT64);
|
||||
schema->set_primary_field_id(pk_id);
|
||||
|
||||
// Generate data and load
|
||||
int64_t N = 1000;
|
||||
auto dataset = DataGen(schema, N);
|
||||
auto seg = CreateSealedWithFieldDataLoaded(schema, dataset);
|
||||
|
||||
// Test plan with gisfunction_filter_expr
|
||||
std::string raw_plan = R"PLAN(vector_anns: <
|
||||
field_id: 100
|
||||
@ -17579,17 +17577,13 @@ TEST_P(ExprTest, ParseGISFunctionFilterExprs) {
|
||||
>
|
||||
placeholder_tag: "$0"
|
||||
>)PLAN";
|
||||
|
||||
// Convert and parse
|
||||
auto bin_plan = translate_text_plan_with_metric_type(raw_plan);
|
||||
auto bin_plan = translate_text_plan_to_binary_plan(raw_plan.c_str());
|
||||
auto plan =
|
||||
CreateSearchPlanByExpr(schema, bin_plan.data(), bin_plan.size());
|
||||
|
||||
// If parsing fails, test will fail with exception
|
||||
// If parsing succeeds, ParseGISFunctionFilterExprs is covered
|
||||
|
||||
// Execute search to verify execution logic
|
||||
|
||||
auto ph_raw = CreatePlaceholderGroup(5, dim, 123);
|
||||
auto ph_grp = ParsePlaceholderGroup(plan.get(), ph_raw.SerializeAsString());
|
||||
auto sr = seg->Search(plan.get(), ph_grp.get(), MAX_TIMESTAMP);
|
||||
|
||||
@ -79,6 +79,8 @@ constexpr const char* ENABLE_OFFSET_CACHE = "indexoffsetcache.enabled";
|
||||
// VecIndex file metas
|
||||
constexpr const char* DISK_ANN_PREFIX_PATH = "index_prefix";
|
||||
constexpr const char* DISK_ANN_RAW_DATA_PATH = "data_path";
|
||||
constexpr const char* EMB_LIST_META_PATH = "emb_list_meta_file_path";
|
||||
constexpr const char* EMB_LIST_META_FILE_NAME = "emb_list_meta";
|
||||
|
||||
// VecIndex node filtering
|
||||
constexpr const char* VEC_OPT_FIELDS_PATH = "opt_fields_path";
|
||||
|
||||
@ -591,6 +591,20 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
||||
GetValueFromConfig<milvus::proto::common::LoadPriority>(
|
||||
config, milvus::LOAD_PRIORITY)
|
||||
.value_or(milvus::proto::common::LoadPriority::HIGH);
|
||||
auto is_embedding_list = (elem_type_ != DataType::NONE);
|
||||
std::unique_ptr<storage::FileWriter> embedding_list_meta_writer_ptr =
|
||||
nullptr;
|
||||
auto embedding_list_meta_path =
|
||||
GetValueFromConfig<std::string>(config, EMB_LIST_META_PATH);
|
||||
if (is_embedding_list) {
|
||||
AssertInfo(embedding_list_meta_path.has_value(),
|
||||
"mmap filepath is empty when load index");
|
||||
std::filesystem::create_directories(
|
||||
std::filesystem::path(embedding_list_meta_path.value())
|
||||
.parent_path());
|
||||
embedding_list_meta_writer_ptr = std::make_unique<storage::FileWriter>(
|
||||
embedding_list_meta_path.value());
|
||||
}
|
||||
|
||||
auto file_writer = storage::FileWriter(
|
||||
local_filepath.value(),
|
||||
@ -654,7 +668,14 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
||||
"lost index slice data");
|
||||
auto&& data = batch_data[file_name];
|
||||
auto start_write_file = std::chrono::system_clock::now();
|
||||
file_writer.Write(data->PayloadData(), data->PayloadSize());
|
||||
if (prefix == knowhere::meta::EMB_LIST_META &&
|
||||
embedding_list_meta_writer_ptr) {
|
||||
embedding_list_meta_writer_ptr->Write(
|
||||
data->PayloadData(), data->PayloadSize());
|
||||
} else {
|
||||
file_writer.Write(data->PayloadData(),
|
||||
data->PayloadSize());
|
||||
}
|
||||
write_disk_duration_sum +=
|
||||
(std::chrono::system_clock::now() - start_write_file);
|
||||
}
|
||||
@ -686,9 +707,15 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
||||
(std::chrono::system_clock::now() - start_load_files2_mem);
|
||||
//2. write data into files
|
||||
auto start_write_file = std::chrono::system_clock::now();
|
||||
for (auto& [_, index_data] : result) {
|
||||
file_writer.Write(index_data->PayloadData(),
|
||||
index_data->PayloadSize());
|
||||
for (auto& [prefix, index_data] : result) {
|
||||
if (prefix == knowhere::meta::EMB_LIST_META &&
|
||||
embedding_list_meta_writer_ptr) {
|
||||
embedding_list_meta_writer_ptr->Write(
|
||||
index_data->PayloadData(), index_data->PayloadSize());
|
||||
} else {
|
||||
file_writer.Write(index_data->PayloadData(),
|
||||
index_data->PayloadSize());
|
||||
}
|
||||
}
|
||||
write_disk_duration_sum +=
|
||||
(std::chrono::system_clock::now() - start_write_file);
|
||||
@ -701,11 +728,17 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
||||
write_disk_duration_sum)
|
||||
.count());
|
||||
file_writer.Finish();
|
||||
if (embedding_list_meta_writer_ptr) {
|
||||
embedding_list_meta_writer_ptr->Finish();
|
||||
}
|
||||
|
||||
LOG_INFO("load index into Knowhere...");
|
||||
auto conf = config;
|
||||
conf.erase(MMAP_FILE_PATH);
|
||||
conf[ENABLE_MMAP] = true;
|
||||
if (is_embedding_list) {
|
||||
conf["emb_list_meta_file_path"] = embedding_list_meta_path.value();
|
||||
}
|
||||
auto start_deserialize = std::chrono::system_clock::now();
|
||||
auto stat = index_.DeserializeFromFile(local_filepath.value(), conf);
|
||||
auto deserialize_duration =
|
||||
|
||||
@ -117,10 +117,17 @@ SealedIndexTranslator::get_cells(const std::vector<cid_t>& cids) {
|
||||
"mmap directory path is empty");
|
||||
auto filepath = std::filesystem::path(index_load_info_.mmap_dir_path) /
|
||||
"index_files" / index_load_info_.index_id /
|
||||
index_load_info_.segment_id / index_load_info_.field_id;
|
||||
|
||||
index_load_info_.segment_id /
|
||||
index_load_info_.field_id / "index";
|
||||
auto embedding_list_meta_path =
|
||||
std::filesystem::path(index_load_info_.mmap_dir_path) /
|
||||
"index_files" / index_load_info_.index_id /
|
||||
index_load_info_.segment_id / index_load_info_.field_id /
|
||||
index::EMB_LIST_META_FILE_NAME;
|
||||
config_[milvus::index::ENABLE_MMAP] = "true";
|
||||
config_[milvus::index::MMAP_FILE_PATH] = filepath.string();
|
||||
config_[milvus::index::EMB_LIST_META_PATH] =
|
||||
embedding_list_meta_path.string();
|
||||
} else {
|
||||
config_[milvus::index::ENABLE_MMAP] = "false";
|
||||
}
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# Update KNOWHERE_VERSION for the first occurrence
|
||||
milvus_add_pkg_config("knowhere")
|
||||
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
|
||||
set( KNOWHERE_VERSION c4d5dd8 )
|
||||
set( KNOWHERE_VERSION v2.6.4 )
|
||||
set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git")
|
||||
|
||||
message(STATUS "Knowhere repo: ${GIT_REPOSITORY}")
|
||||
|
||||
@ -2376,7 +2376,6 @@ class SealedVectorArrayTest
|
||||
|
||||
TEST_P(SealedVectorArrayTest, QueryVectorArrayAllFields) {
|
||||
auto schema = std::make_shared<Schema>();
|
||||
|
||||
auto int64_field = schema->AddDebugField("int64", DataType::INT64);
|
||||
auto array_vec = schema->AddDebugVectorArrayField(
|
||||
"array_vec", element_type, dim, metric_type);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user