mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
fix: support nullable vector fields for DiskAnn index (#46846)
issue: #46834 relate: #45993 Save valid_data file during DiskAnn index build to fix bitset size mismatch error when searching with nullable vector fields. --------- Signed-off-by: marcelo-cjl <marcelo.chen@zilliz.com>
This commit is contained in:
parent
5d31fca0f5
commit
2b556dcac7
@ -84,6 +84,7 @@ constexpr const char* DISK_ANN_RAW_DATA_PATH = "data_path";
|
||||
constexpr const char* EMB_LIST_META_PATH = "emb_list_meta_file_path";
|
||||
constexpr const char* EMB_LIST_META_FILE_NAME = "emb_list_meta";
|
||||
constexpr const char* EMB_LIST_OFFSETS_PATH = "emb_list_offset_file_path";
|
||||
constexpr const char* VALID_DATA_PATH_KEY = "valid_data_file_path";
|
||||
|
||||
// VecIndex node filtering
|
||||
constexpr const char* VEC_OPT_FIELDS_PATH = "opt_fields_path";
|
||||
|
||||
@ -28,6 +28,7 @@
|
||||
#include "common/RangeSearchHelper.h"
|
||||
#include "indexbuilder/types.h"
|
||||
#include "filemanager/FileManager.h"
|
||||
#include "log/Log.h"
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
@ -160,6 +161,9 @@ VectorDiskAnnIndex<T>::Upload(const Config& config) {
|
||||
template <typename T>
|
||||
void
|
||||
VectorDiskAnnIndex<T>::Build(const Config& config) {
|
||||
LOG_INFO("start build disk index, build_id: {}",
|
||||
config.value("build_id", "unknown"));
|
||||
|
||||
auto local_chunk_manager =
|
||||
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
|
||||
knowhere::Json build_config;
|
||||
@ -181,6 +185,11 @@ VectorDiskAnnIndex<T>::Build(const Config& config) {
|
||||
config_with_emb_list[EMB_LIST_OFFSETS_PATH] = offsets_path;
|
||||
}
|
||||
|
||||
// Set valid data path to track nullable vector fields
|
||||
auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix();
|
||||
auto valid_data_path = local_index_path_prefix + "/" + VALID_DATA_KEY;
|
||||
config_with_emb_list[VALID_DATA_PATH_KEY] = valid_data_path;
|
||||
|
||||
auto local_data_path =
|
||||
file_manager_->CacheRawDataToDisk<T>(config_with_emb_list);
|
||||
build_config[DISK_ANN_RAW_DATA_PATH] = local_data_path;
|
||||
@ -195,7 +204,6 @@ VectorDiskAnnIndex<T>::Build(const Config& config) {
|
||||
build_config[EMB_LIST_OFFSETS_PATH] = offsets_path;
|
||||
}
|
||||
|
||||
auto local_index_path_prefix = file_manager_->GetLocalIndexObjectPrefix();
|
||||
build_config[DISK_ANN_PREFIX_PATH] = local_index_path_prefix;
|
||||
|
||||
if (GetIndexType() == knowhere::IndexEnum::INDEX_DISKANN) {
|
||||
@ -227,8 +235,16 @@ VectorDiskAnnIndex<T>::Build(const Config& config) {
|
||||
ThrowInfo(ErrorCode::IndexBuildError,
|
||||
"failed to build disk index, " + KnowhereStatusString(stat));
|
||||
|
||||
// Add valid_data file to index if it was created (nullable vector field)
|
||||
if (local_chunk_manager->Exist(valid_data_path)) {
|
||||
file_manager_->AddFile(valid_data_path);
|
||||
}
|
||||
|
||||
local_chunk_manager->RemoveDir(storage::GenFieldRawDataPathPrefix(
|
||||
local_chunk_manager, segment_id, field_id));
|
||||
|
||||
LOG_INFO("build disk index done, build_id: {}",
|
||||
config.value("build_id", "unknown"));
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
|
||||
@ -523,6 +523,12 @@ DiskFileManagerImpl::cache_raw_data_to_disk_internal(const Config& config) {
|
||||
offsets.push_back(0); // Initialize with 0 for cumulative offsets
|
||||
}
|
||||
|
||||
auto valid_data_path = index::GetValueFromConfig<std::string>(
|
||||
config, index::VALID_DATA_PATH_KEY);
|
||||
std::vector<uint8_t> valid_bitmap;
|
||||
uint64_t total_num_rows = 0;
|
||||
bool nullable = false;
|
||||
|
||||
// get batch raw data from s3 and write batch data to disk file
|
||||
// TODO: load and write of different batches at the same time
|
||||
std::vector<std::string> batch_files;
|
||||
@ -541,6 +547,24 @@ DiskFileManagerImpl::cache_raw_data_to_disk_internal(const Config& config) {
|
||||
for (int i = 0; i < batch_size; i++) {
|
||||
auto field_data = codecs[i]->GetFieldData();
|
||||
num_rows += uint32_t(field_data->get_valid_rows());
|
||||
|
||||
if (valid_data_path.has_value() && field_data->IsNullable()) {
|
||||
nullable = true;
|
||||
auto rows = field_data->get_num_rows();
|
||||
if (rows > 0) {
|
||||
auto new_size = (total_num_rows + rows + 7) / 8;
|
||||
if (new_size > static_cast<int64_t>(valid_bitmap.size())) {
|
||||
valid_bitmap.resize(new_size, 0);
|
||||
}
|
||||
for (int64_t i = 0; i < rows; ++i) {
|
||||
if (field_data->is_valid(i)) {
|
||||
set_bit(valid_bitmap, total_num_rows + i);
|
||||
}
|
||||
}
|
||||
total_num_rows += rows;
|
||||
}
|
||||
}
|
||||
|
||||
cache_raw_data_to_disk_common<DataType>(
|
||||
field_data,
|
||||
local_chunk_manager,
|
||||
@ -601,6 +625,13 @@ DiskFileManagerImpl::cache_raw_data_to_disk_internal(const Config& config) {
|
||||
offsets.size() * sizeof(size_t));
|
||||
}
|
||||
|
||||
if (nullable && valid_data_path.has_value() && total_num_rows > 0) {
|
||||
write_valid_data_file(local_chunk_manager,
|
||||
valid_data_path.value(),
|
||||
valid_bitmap,
|
||||
total_num_rows);
|
||||
}
|
||||
|
||||
return local_data_path;
|
||||
}
|
||||
|
||||
@ -702,6 +733,25 @@ DiskFileManagerImpl::cache_raw_data_to_disk_common(
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
DiskFileManagerImpl::write_valid_data_file(
|
||||
const std::shared_ptr<LocalChunkManager>& local_chunk_manager,
|
||||
const std::string& valid_data_path,
|
||||
std::vector<uint8_t>& valid_bitmap,
|
||||
uint64_t total_num_rows) {
|
||||
local_chunk_manager->CreateFile(valid_data_path);
|
||||
int64_t valid_write_pos = 0;
|
||||
|
||||
local_chunk_manager->Write(
|
||||
valid_data_path, valid_write_pos, &total_num_rows, sizeof(uint64_t));
|
||||
valid_write_pos += sizeof(uint64_t);
|
||||
|
||||
local_chunk_manager->Write(valid_data_path,
|
||||
valid_write_pos,
|
||||
valid_bitmap.data(),
|
||||
valid_bitmap.size());
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
std::string
|
||||
DiskFileManagerImpl::cache_raw_data_to_disk_storage_v2(const Config& config) {
|
||||
@ -736,6 +786,10 @@ DiskFileManagerImpl::cache_raw_data_to_disk_storage_v2(const Config& config) {
|
||||
offsets.push_back(0); // Initialize with 0 for cumulative offsets
|
||||
}
|
||||
|
||||
// Check if we need to track validity data for nullable vector fields
|
||||
auto valid_data_path = index::GetValueFromConfig<std::string>(
|
||||
config, index::VALID_DATA_PATH_KEY);
|
||||
|
||||
// file format
|
||||
// num_rows(uint32) | dim(uint32) | index_data ([]uint8_t)
|
||||
uint32_t num_rows = 0;
|
||||
@ -764,8 +818,36 @@ DiskFileManagerImpl::cache_raw_data_to_disk_storage_v2(const Config& config) {
|
||||
dim,
|
||||
fs_);
|
||||
}
|
||||
|
||||
bool nullable = false;
|
||||
uint64_t total_num_rows = 0;
|
||||
if (valid_data_path.has_value()) {
|
||||
for (auto& field_data : field_datas) {
|
||||
if (field_data->IsNullable()) {
|
||||
nullable = true;
|
||||
}
|
||||
total_num_rows += field_data->get_num_rows();
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<uint8_t> valid_bitmap;
|
||||
if (nullable) {
|
||||
valid_bitmap.resize((total_num_rows + 7) / 8, 0);
|
||||
}
|
||||
|
||||
int64_t chunk_offset = 0;
|
||||
for (auto& field_data : field_datas) {
|
||||
num_rows += uint32_t(field_data->get_valid_rows());
|
||||
if (nullable) {
|
||||
auto rows = field_data->get_num_rows();
|
||||
for (int64_t i = 0; i < rows; ++i) {
|
||||
if (field_data->is_valid(i)) {
|
||||
set_bit(valid_bitmap, chunk_offset + i);
|
||||
}
|
||||
}
|
||||
chunk_offset += rows;
|
||||
}
|
||||
|
||||
cache_raw_data_to_disk_common<T>(field_data,
|
||||
local_chunk_manager,
|
||||
local_data_path,
|
||||
@ -810,6 +892,13 @@ DiskFileManagerImpl::cache_raw_data_to_disk_storage_v2(const Config& config) {
|
||||
offsets.size() * sizeof(size_t));
|
||||
}
|
||||
|
||||
if (nullable && valid_data_path.has_value() && total_num_rows > 0) {
|
||||
write_valid_data_file(local_chunk_manager,
|
||||
valid_data_path.value(),
|
||||
valid_bitmap,
|
||||
total_num_rows);
|
||||
}
|
||||
|
||||
return local_data_path;
|
||||
}
|
||||
|
||||
|
||||
@ -281,6 +281,18 @@ class DiskFileManagerImpl : public FileManagerImpl {
|
||||
int64_t& write_offset,
|
||||
std::vector<size_t>* offsets = nullptr);
|
||||
|
||||
inline void
|
||||
set_bit(std::vector<uint8_t>& bitmap, int64_t bit_pos) {
|
||||
bitmap[bit_pos >> 3] |= (1 << (bit_pos & 0x07));
|
||||
}
|
||||
|
||||
void
|
||||
write_valid_data_file(
|
||||
const std::shared_ptr<LocalChunkManager>& local_chunk_manager,
|
||||
const std::string& valid_data_path,
|
||||
std::vector<uint8_t>& valid_bitmap,
|
||||
uint64_t total_num_rows);
|
||||
|
||||
private:
|
||||
// local file path (abs path)
|
||||
std::vector<std::string> local_paths_;
|
||||
|
||||
@ -40,6 +40,7 @@
|
||||
#include "storage/Util.h"
|
||||
#include "storage/DiskFileManagerImpl.h"
|
||||
#include "storage/LocalChunkManagerSingleton.h"
|
||||
#include "index/Meta.h"
|
||||
|
||||
#include "test_utils/storage_test_utils.h"
|
||||
|
||||
@ -816,4 +817,168 @@ TEST_F(DiskAnnFileManagerTest, FileCleanup) {
|
||||
EXPECT_FALSE(local_chunk_manager->Exist(local_text_index_file_path));
|
||||
EXPECT_FALSE(local_chunk_manager->Exist(local_index_file_path));
|
||||
EXPECT_FALSE(local_chunk_manager->Exist(local_json_stats_file_path));
|
||||
}
|
||||
|
||||
TEST_F(DiskAnnFileManagerTest, CacheRawDataToDiskValidDataFile) {
|
||||
const int64_t collection_id = 1;
|
||||
const int64_t partition_id = 2;
|
||||
const int64_t segment_id = 3;
|
||||
const int64_t field_id = 100;
|
||||
const int64_t dim = 128;
|
||||
const int64_t num_rows = 100;
|
||||
const int64_t null_percent = 20; // 20% null
|
||||
const int64_t valid_count = num_rows * (100 - null_percent) / 100;
|
||||
|
||||
std::vector<uint8_t> valid_data((num_rows + 7) / 8, 0);
|
||||
for (int64_t i = 0; i < valid_count; ++i) {
|
||||
valid_data[i >> 3] |= (1 << (i & 0x07));
|
||||
}
|
||||
|
||||
std::vector<float> vec_data(valid_count * dim);
|
||||
for (size_t i = 0; i < vec_data.size(); ++i) {
|
||||
vec_data[i] = static_cast<float>(i % 100);
|
||||
}
|
||||
|
||||
auto field_data = storage::CreateFieldData(
|
||||
DataType::VECTOR_FLOAT, DataType::NONE, true, dim);
|
||||
auto field_data_impl =
|
||||
std::dynamic_pointer_cast<milvus::FieldData<milvus::FloatVector>>(
|
||||
field_data);
|
||||
field_data_impl->FillFieldData(
|
||||
vec_data.data(), valid_data.data(), num_rows, 0);
|
||||
|
||||
ASSERT_EQ(field_data->get_num_rows(), num_rows);
|
||||
ASSERT_EQ(field_data->get_valid_rows(), valid_count);
|
||||
ASSERT_TRUE(field_data->IsNullable());
|
||||
|
||||
auto payload_reader =
|
||||
std::make_shared<milvus::storage::PayloadReader>(field_data);
|
||||
storage::InsertData insert_data(payload_reader);
|
||||
FieldDataMeta field_data_meta = {
|
||||
collection_id, partition_id, segment_id, field_id};
|
||||
insert_data.SetFieldDataMeta(field_data_meta);
|
||||
insert_data.SetTimestamps(0, 100);
|
||||
|
||||
auto serialized_data = insert_data.Serialize(storage::StorageType::Remote);
|
||||
|
||||
std::string insert_file_path = "/tmp/diskann/valid_data_test";
|
||||
boost::filesystem::remove_all(insert_file_path);
|
||||
cm_->Write(
|
||||
insert_file_path, serialized_data.data(), serialized_data.size());
|
||||
|
||||
IndexMeta index_meta = {segment_id,
|
||||
field_id,
|
||||
1000,
|
||||
1,
|
||||
"test",
|
||||
"vec_field",
|
||||
DataType::VECTOR_FLOAT,
|
||||
dim};
|
||||
auto file_manager = std::make_shared<DiskFileManagerImpl>(
|
||||
storage::FileManagerContext(field_data_meta, index_meta, cm_, fs_));
|
||||
|
||||
std::string valid_data_path = "/tmp/diskann/valid_data_test_output";
|
||||
boost::filesystem::remove_all(valid_data_path);
|
||||
|
||||
milvus::Config config;
|
||||
config[INSERT_FILES_KEY] = std::vector<std::string>{insert_file_path};
|
||||
config[index::VALID_DATA_PATH_KEY] = valid_data_path;
|
||||
|
||||
auto local_data_path = file_manager->CacheRawDataToDisk<float>(config);
|
||||
ASSERT_FALSE(local_data_path.empty());
|
||||
|
||||
auto local_chunk_manager =
|
||||
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
|
||||
|
||||
ASSERT_TRUE(local_chunk_manager->Exist(valid_data_path))
|
||||
<< "valid_data file should be created for nullable field";
|
||||
|
||||
size_t read_total_num_rows = 0;
|
||||
local_chunk_manager->Read(
|
||||
valid_data_path, 0, &read_total_num_rows, sizeof(size_t));
|
||||
EXPECT_EQ(read_total_num_rows, num_rows)
|
||||
<< "total_num_rows should match original num_rows";
|
||||
|
||||
size_t bitmap_size = (num_rows + 7) / 8;
|
||||
std::vector<uint8_t> read_bitmap(bitmap_size);
|
||||
local_chunk_manager->Read(
|
||||
valid_data_path, sizeof(size_t), read_bitmap.data(), bitmap_size);
|
||||
|
||||
// Verify bitmap content
|
||||
for (int64_t i = 0; i < num_rows; ++i) {
|
||||
bool expected_valid = (i < valid_count);
|
||||
bool actual_valid = (read_bitmap[i / 8] >> (i % 8)) & 1;
|
||||
EXPECT_EQ(actual_valid, expected_valid)
|
||||
<< "Validity mismatch at row " << i;
|
||||
}
|
||||
|
||||
local_chunk_manager->Remove(local_data_path);
|
||||
local_chunk_manager->Remove(valid_data_path);
|
||||
cm_->Remove(insert_file_path);
|
||||
}
|
||||
|
||||
TEST_F(DiskAnnFileManagerTest, CacheRawDataToDiskNoValidDataForNonNullable) {
|
||||
const int64_t collection_id = 1;
|
||||
const int64_t partition_id = 2;
|
||||
const int64_t segment_id = 3;
|
||||
const int64_t field_id = 100;
|
||||
const int64_t dim = 128;
|
||||
const int64_t num_rows = 100;
|
||||
|
||||
std::vector<float> vec_data(num_rows * dim);
|
||||
for (size_t i = 0; i < vec_data.size(); ++i) {
|
||||
vec_data[i] = static_cast<float>(i % 100);
|
||||
}
|
||||
|
||||
auto field_data = storage::CreateFieldData(
|
||||
DataType::VECTOR_FLOAT, DataType::NONE, false, dim);
|
||||
field_data->FillFieldData(vec_data.data(), num_rows);
|
||||
|
||||
ASSERT_EQ(field_data->get_num_rows(), num_rows);
|
||||
ASSERT_FALSE(field_data->IsNullable());
|
||||
|
||||
auto payload_reader =
|
||||
std::make_shared<milvus::storage::PayloadReader>(field_data);
|
||||
storage::InsertData insert_data(payload_reader);
|
||||
FieldDataMeta field_data_meta = {
|
||||
collection_id, partition_id, segment_id, field_id};
|
||||
insert_data.SetFieldDataMeta(field_data_meta);
|
||||
insert_data.SetTimestamps(0, 100);
|
||||
|
||||
auto serialized_data = insert_data.Serialize(storage::StorageType::Remote);
|
||||
|
||||
std::string insert_file_path = "/tmp/diskann/non_nullable_test";
|
||||
boost::filesystem::remove_all(insert_file_path);
|
||||
cm_->Write(
|
||||
insert_file_path, serialized_data.data(), serialized_data.size());
|
||||
|
||||
IndexMeta index_meta = {segment_id,
|
||||
field_id,
|
||||
1000,
|
||||
1,
|
||||
"test",
|
||||
"vec_field",
|
||||
DataType::VECTOR_FLOAT,
|
||||
dim};
|
||||
auto file_manager = std::make_shared<DiskFileManagerImpl>(
|
||||
storage::FileManagerContext(field_data_meta, index_meta, cm_, fs_));
|
||||
|
||||
std::string valid_data_path = "/tmp/diskann/non_nullable_valid_data";
|
||||
boost::filesystem::remove_all(valid_data_path);
|
||||
|
||||
milvus::Config config;
|
||||
config[INSERT_FILES_KEY] = std::vector<std::string>{insert_file_path};
|
||||
config[index::VALID_DATA_PATH_KEY] = valid_data_path;
|
||||
|
||||
auto local_data_path = file_manager->CacheRawDataToDisk<float>(config);
|
||||
ASSERT_FALSE(local_data_path.empty());
|
||||
|
||||
auto local_chunk_manager =
|
||||
LocalChunkManagerSingleton::GetInstance().GetChunkManager();
|
||||
|
||||
EXPECT_FALSE(local_chunk_manager->Exist(valid_data_path))
|
||||
<< "valid_data file should NOT be created for non-nullable field";
|
||||
|
||||
local_chunk_manager->Remove(local_data_path);
|
||||
cm_->Remove(insert_file_path);
|
||||
}
|
||||
@ -183,7 +183,7 @@ func GetIndexesForVectorType(fieldType entity.FieldType) []IndexConfig {
|
||||
{"IVF_PQ", "IVF_PQ", entity.L2, map[string]string{"nlist": "128", "m": "8", "nbits": "8"}},
|
||||
{"HNSW", "HNSW", entity.L2, map[string]string{"M": "16", "efConstruction": "200"}},
|
||||
{"SCANN", "SCANN", entity.L2, map[string]string{"nlist": "128", "with_raw_data": "true"}},
|
||||
// {"DISKANN", "DISKANN", entity.L2, nil}, // Skip DISKANN for now
|
||||
{"DISKANN", "DISKANN", entity.L2, nil},
|
||||
}
|
||||
case entity.FieldTypeBinaryVector:
|
||||
return []IndexConfig{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user