From 0907d76253aca34297abff26d660eea5e851481f Mon Sep 17 00:00:00 2001 From: Patrick Weizhi Xu Date: Wed, 24 Jan 2024 00:04:55 +0800 Subject: [PATCH] enhance: pass partition key scalar info if enabled when build vector index (#29931) issue: #29892 Pass optional scalar IVF offsets to Cardinal Signed-off-by: Patrick Weizhi Xu --- internal/core/src/common/Consts.h | 1 + internal/core/src/common/Types.h | 6 + internal/core/src/index/Meta.h | 3 + internal/core/src/index/VectorDiskIndex.cpp | 18 ++ internal/core/src/index/VectorMemIndex.cpp | 1 + internal/core/src/indexbuilder/index_c.cpp | 24 ++ internal/core/src/indexbuilder/index_c.h | 7 + internal/core/src/indexbuilder/types.h | 1 + .../core/src/storage/DiskFileManagerImpl.cpp | 301 +++++++++++++++++- .../core/src/storage/DiskFileManagerImpl.h | 7 + .../unittest/test_disk_file_manager_test.cpp | 235 ++++++++++++++ internal/datacoord/const.go | 3 - internal/datacoord/index_builder.go | 118 ++++--- internal/datacoord/index_builder_test.go | 290 +++++++++++++++++ internal/datacoord/index_service.go | 4 +- internal/datacoord/util.go | 7 +- internal/indexnode/task.go | 26 ++ internal/indexnode/task_test.go | 3 + internal/proto/index_coord.proto | 9 + .../util/indexcgowrapper/build_index_info.go | 15 + pkg/util/paramtable/component_param.go | 8 + 21 files changed, 1032 insertions(+), 55 deletions(-) diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index 6f07d6dc98..65e6795b16 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -38,6 +38,7 @@ const char INDEX_BUILD_ID_KEY[] = "indexBuildID"; const char INDEX_ROOT_PATH[] = "index_files"; const char RAWDATA_ROOT_PATH[] = "raw_datas"; +const char VEC_OPT_FIELDS[] = "opt_fields"; const char DEFAULT_PLANNODE_ID[] = "0"; const char DEAFULT_QUERY_ID[] = "0"; diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index e9bcb734e4..2c822590dd 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -147,6 +147,12 @@ using FieldName = fluent::NamedType; + +// field id -> (field name, field type, binlog paths) +using OptFieldT = std::unordered_map< + int64_t, + std::tuple>>; + // using FieldOffset = fluent::NamedType; using SegOffset = fluent::NamedType; diff --git a/internal/core/src/index/Meta.h b/internal/core/src/index/Meta.h index 250cc509bc..c8fad3dbf4 100644 --- a/internal/core/src/index/Meta.h +++ b/internal/core/src/index/Meta.h @@ -52,6 +52,9 @@ constexpr const char* INDEX_ENGINE_VERSION = "index_engine_version"; constexpr const char* DISK_ANN_PREFIX_PATH = "index_prefix"; constexpr const char* DISK_ANN_RAW_DATA_PATH = "data_path"; +// VecIndex node filtering +constexpr const char* VEC_OPT_FIELDS_PATH = "opt_fields_path"; + // DiskAnn build params constexpr const char* DISK_ANN_MAX_DEGREE = "max_degree"; constexpr const char* DISK_ANN_SEARCH_LIST_SIZE = "search_list_size"; diff --git a/internal/core/src/index/VectorDiskIndex.cpp b/internal/core/src/index/VectorDiskIndex.cpp index 92908a30f6..cca80bf6b9 100644 --- a/internal/core/src/index/VectorDiskIndex.cpp +++ b/internal/core/src/index/VectorDiskIndex.cpp @@ -17,6 +17,7 @@ #include "index/VectorDiskIndex.h" #include "common/Tracer.h" +#include "common/Types.h" #include "common/Utils.h" #include "config/ConfigKnowhere.h" #include "index/Meta.h" @@ -25,6 +26,7 @@ #include "storage/Util.h" #include "common/Consts.h" #include "common/RangeSearchHelper.h" +#include "indexbuilder/types.h" namespace milvus::index { @@ -190,7 
+192,15 @@ VectorDiskAnnIndex::BuildV2(const Config& config) { build_config[DISK_ANN_THREADS_NUM] = std::atoi(num_threads.value().c_str()); } + + auto opt_fields = GetValueFromConfig(config, VEC_OPT_FIELDS); + if (opt_fields.has_value() && index_.IsAdditionalScalarSupported()) { + build_config[VEC_OPT_FIELDS_PATH] = + file_manager_->CacheOptFieldToDisk(opt_fields.value()); + } + build_config.erase("insert_files"); + build_config.erase(VEC_OPT_FIELDS); index_.Build({}, build_config); auto local_chunk_manager = @@ -229,7 +239,15 @@ VectorDiskAnnIndex::Build(const Config& config) { build_config[DISK_ANN_THREADS_NUM] = std::atoi(num_threads.value().c_str()); } + + auto opt_fields = GetValueFromConfig(config, VEC_OPT_FIELDS); + if (opt_fields.has_value() && index_.IsAdditionalScalarSupported()) { + build_config[VEC_OPT_FIELDS_PATH] = + file_manager_->CacheOptFieldToDisk(opt_fields.value()); + } + build_config.erase("insert_files"); + build_config.erase(VEC_OPT_FIELDS); auto stat = index_.Build({}, build_config); if (stat != knowhere::Status::success) PanicInfo(ErrorCode::IndexBuildError, diff --git a/internal/core/src/index/VectorMemIndex.cpp b/internal/core/src/index/VectorMemIndex.cpp index 8b0c967bd4..639dae7073 100644 --- a/internal/core/src/index/VectorMemIndex.cpp +++ b/internal/core/src/index/VectorMemIndex.cpp @@ -507,6 +507,7 @@ VectorMemIndex::Build(const Config& config) { Config build_config; build_config.update(config); build_config.erase("insert_files"); + build_config.erase(VEC_OPT_FIELDS); auto dataset = GenDataset(total_num_rows, dim, buf.get()); BuildWithDataset(dataset, build_config); diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index 6b34ca3734..b921118586 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -90,6 +90,9 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) { auto& config = build_index_info->config; config["insert_files"] = build_index_info->insert_files; + if (build_index_info->opt_fields.size()) { + config["opt_fields"] = build_index_info->opt_fields; + } // get index type auto index_type = milvus::index::GetValueFromConfig( @@ -713,3 +716,24 @@ SerializeIndexAndUpLoadV2(CIndex index, CBinarySet* c_binary_set) { } return status; } + +CStatus +AppendOptionalFieldDataPath(CBuildIndexInfo c_build_index_info, + const int64_t field_id, + const char* field_name, + const int32_t field_type, + const char* c_file_path) { + try { + auto build_index_info = (BuildIndexInfo*)c_build_index_info; + std::string field_name_str(field_name); + auto& opt_fields_map = build_index_info->opt_fields; + if (opt_fields_map.find(field_id) == opt_fields_map.end()) { + opt_fields_map[field_id] = { + field_name, static_cast(field_type), {}}; + } + std::get<2>(opt_fields_map[field_id]).emplace_back(c_file_path); + return CStatus{Success, ""}; + } catch (std::exception& e) { + return milvus::FailureCStatus(&e); + } +} diff --git a/internal/core/src/indexbuilder/index_c.h b/internal/core/src/indexbuilder/index_c.h index 506f164360..d13e121737 100644 --- a/internal/core/src/indexbuilder/index_c.h +++ b/internal/core/src/indexbuilder/index_c.h @@ -108,6 +108,13 @@ CStatus AppendIndexEngineVersionToBuildInfo(CBuildIndexInfo c_load_index_info, int32_t c_index_engine_version); +CStatus +AppendOptionalFieldDataPath(CBuildIndexInfo c_build_index_info, + const int64_t field_id, + const char* field_name, + const int32_t field_type, + const char* c_file_path); + CStatus 
SerializeIndexAndUpLoad(CIndex index, CBinarySet* c_binary_set); diff --git a/internal/core/src/indexbuilder/types.h b/internal/core/src/indexbuilder/types.h index 5f5ce89fba..e1c656e25d 100644 --- a/internal/core/src/indexbuilder/types.h +++ b/internal/core/src/indexbuilder/types.h @@ -39,4 +39,5 @@ struct BuildIndexInfo { std::string index_store_path; int64_t dim; int32_t index_engine_version; + milvus::OptFieldT opt_fields; }; diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 2ee03543f3..6a1ca207e8 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -16,12 +16,22 @@ #include #include +#include #include #include +#include +#include +#include #include +#include #include "common/Common.h" +#include "common/Consts.h" +#include "common/EasyAssert.h" +#include "common/FieldData.h" +#include "common/FieldDataInterface.h" #include "common/Slice.h" +#include "common/Types.h" #include "log/Log.h" #include "storage/DiskFileManagerImpl.h" @@ -434,14 +444,19 @@ DiskFileManagerImpl::CacheRawDataToDisk( return local_data_path; } -std::string -DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) { - std::sort(remote_files.begin(), - remote_files.end(), +void +SortByPath(std::vector& paths) { + std::sort(paths.begin(), + paths.end(), [](const std::string& a, const std::string& b) { return std::stol(a.substr(a.find_last_of("/") + 1)) < std::stol(b.substr(b.find_last_of("/") + 1)); }); +} + +std::string +DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) { + SortByPath(remote_files); auto segment_id = GetFieldDataMeta().segment_id; auto field_id = GetFieldDataMeta().field_id; @@ -508,6 +523,284 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) { return local_data_path; } +template +struct has_native_type : std::false_type {}; +template +struct has_native_type> + : std::true_type {}; +template +using DataTypeNativeOrVoid = + typename std::conditional>::value, + typename TypeTraits::NativeType, + void>::type; +template +using DataTypeToOffsetMap = + std::unordered_map, int64_t>; + +template +void +WriteOptFieldIvfDataImpl( + const int64_t field_id, + const std::shared_ptr& local_chunk_manager, + const std::string& local_data_path, + const std::vector& field_datas, + uint64_t& write_offset) { + using FieldDataT = DataTypeNativeOrVoid; + using OffsetT = uint32_t; + std::unordered_map> mp; + OffsetT offset = 0; + for (const auto& field_data : field_datas) { + for (int64_t i = 0; i < field_data->get_num_rows(); ++i) { + auto val = + *reinterpret_cast(field_data->RawValue(i)); + mp[val].push_back(offset++); + } + } + local_chunk_manager->Write(local_data_path, + write_offset, + const_cast(&field_id), + sizeof(field_id)); + write_offset += sizeof(field_id); + const uint32_t num_of_unique_field_data = mp.size(); + local_chunk_manager->Write(local_data_path, + write_offset, + const_cast(&num_of_unique_field_data), + sizeof(num_of_unique_field_data)); + write_offset += sizeof(num_of_unique_field_data); + for (const auto& [val, offsets] : mp) { + const uint32_t offsets_cnt = offsets.size(); + local_chunk_manager->Write(local_data_path, + write_offset, + const_cast(&offsets_cnt), + sizeof(offsets_cnt)); + write_offset += sizeof(offsets_cnt); + const size_t data_size = offsets_cnt * sizeof(OffsetT); + local_chunk_manager->Write(local_data_path, + write_offset, + const_cast(offsets.data()), + data_size); + write_offset += data_size; + 
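        // note: the unique value itself is never persisted; each record stores only
        // its offset count (uint32) followed by that many uint32 row offsets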
} +} + +#define GENERATE_OPT_FIELD_IVF_IMPL(DT) \ + WriteOptFieldIvfDataImpl
(field_id, \ + local_chunk_manager, \ + local_data_path, \ + field_datas, \ + write_offset) +bool +WriteOptFieldIvfData( + const DataType& dt, + const int64_t field_id, + const std::shared_ptr& local_chunk_manager, + const std::string& local_data_path, + const std::vector& field_datas, + uint64_t& write_offset) { + switch (dt) { + case DataType::BOOL: + GENERATE_OPT_FIELD_IVF_IMPL(DataType::BOOL); + break; + case DataType::INT8: + GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT8); + break; + case DataType::INT16: + GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT16); + break; + case DataType::INT32: + GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT32); + break; + case DataType::INT64: + GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT64); + break; + case DataType::FLOAT: + GENERATE_OPT_FIELD_IVF_IMPL(DataType::FLOAT); + break; + case DataType::DOUBLE: + GENERATE_OPT_FIELD_IVF_IMPL(DataType::DOUBLE); + break; + case DataType::STRING: + GENERATE_OPT_FIELD_IVF_IMPL(DataType::STRING); + break; + case DataType::VARCHAR: + GENERATE_OPT_FIELD_IVF_IMPL(DataType::VARCHAR); + break; + default: + LOG_WARN("Unsupported data type in optional scalar field: ", dt); + return false; + } + return true; +} +#undef GENERATE_OPT_FIELD_IVF_IMPL + +void +WriteOptFieldsIvfMeta( + const std::shared_ptr& local_chunk_manager, + const std::string& local_data_path, + const uint32_t num_of_fields, + uint64_t& write_offset) { + const uint8_t kVersion = 0; + local_chunk_manager->Write(local_data_path, + write_offset, + const_cast(&kVersion), + sizeof(kVersion)); + write_offset += sizeof(kVersion); + local_chunk_manager->Write(local_data_path, + write_offset, + const_cast(&num_of_fields), + sizeof(num_of_fields)); + write_offset += sizeof(num_of_fields); +} + +// write optional scalar fields ivf info in the following format without space among them +// | (meta) +// | version (uint8_t) | num_of_fields (uint32_t) | +// | (field_0) +// | field_id (int64_t) | num_of_unique_field_data (uint32_t) +// | size_0 (uint32_t) | offset_0 (uint32_t)... +// | size_1 | offset_0, offset_1, ... +std::string +DiskFileManagerImpl::CacheOptFieldToDisk( + std::shared_ptr space, OptFieldT& fields_map) { + uint32_t num_of_fields = fields_map.size(); + if (0 == num_of_fields) { + return ""; + } else if (num_of_fields > 1) { + PanicInfo( + ErrorCode::NotImplemented, + "vector index build with multiple fields is not supported yet"); + } + if (nullptr == space) { + LOG_ERROR("Failed to cache optional field. 
Space is null"); + return ""; + } + + auto segment_id = GetFieldDataMeta().segment_id; + auto vec_field_id = GetFieldDataMeta().field_id; + auto local_chunk_manager = + LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto local_data_path = storage::GenFieldRawDataPathPrefix( + local_chunk_manager, segment_id, vec_field_id) + + std::string(VEC_OPT_FIELDS); + local_chunk_manager->CreateFile(local_data_path); + + uint64_t write_offset = 0; + WriteOptFieldsIvfMeta( + local_chunk_manager, local_data_path, num_of_fields, write_offset); + + auto res = space->ScanData(); + if (!res.ok()) { + PanicInfo(IndexBuildError, + fmt::format("failed to create scan iterator: {}", + res.status().ToString())); + } + auto reader = res.value(); + for (auto& [field_id, tup] : fields_map) { + const auto& field_name = std::get<0>(tup); + const auto& field_type = std::get<1>(tup); + std::vector field_datas; + for (auto rec : *reader) { + if (!rec.ok()) { + PanicInfo(IndexBuildError, + fmt::format("failed to read optional field data: {}", + rec.status().ToString())); + } + auto data = rec.ValueUnsafe(); + if (data == nullptr) { + break; + } + auto total_num_rows = data->num_rows(); + if (0 == total_num_rows) { + LOG_WARN("optional field {} has no data", field_name); + return ""; + } + auto col_data = data->GetColumnByName(field_name); + auto field_data = + storage::CreateFieldData(field_type, 1, total_num_rows); + field_data->FillFieldData(col_data); + field_datas.emplace_back(field_data); + } + if (!WriteOptFieldIvfData(field_type, + field_id, + local_chunk_manager, + local_data_path, + field_datas, + write_offset)) { + return ""; + } + } + return local_data_path; +} + +std::string +DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) { + uint32_t num_of_fields = fields_map.size(); + if (0 == num_of_fields) { + return ""; + } else if (num_of_fields > 1) { + PanicInfo( + ErrorCode::NotImplemented, + "vector index build with multiple fields is not supported yet"); + } + + auto segment_id = GetFieldDataMeta().segment_id; + auto vec_field_id = GetFieldDataMeta().field_id; + auto local_chunk_manager = + LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + auto local_data_path = storage::GenFieldRawDataPathPrefix( + local_chunk_manager, segment_id, vec_field_id) + + std::string(VEC_OPT_FIELDS); + local_chunk_manager->CreateFile(local_data_path); + + std::vector field_datas; + std::vector batch_files; + uint64_t write_offset = 0; + WriteOptFieldsIvfMeta( + local_chunk_manager, local_data_path, num_of_fields, write_offset); + + auto FetchRawData = [&]() { + auto fds = GetObjectData(rcm_.get(), batch_files); + for (size_t i = 0; i < batch_files.size(); ++i) { + auto data = fds[i].get()->GetFieldData(); + field_datas.emplace_back(data); + } + }; + + auto parallel_degree = + uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); + for (auto& [field_id, tup] : fields_map) { + const auto& field_type = std::get<1>(tup); + auto& field_paths = std::get<2>(tup); + if (0 == field_paths.size()) { + LOG_WARN("optional field {} has no data", field_id); + return ""; + } + + std::vector().swap(field_datas); + SortByPath(field_paths); + + for (auto& file : field_paths) { + if (batch_files.size() >= parallel_degree) { + FetchRawData(); + batch_files.clear(); + } + batch_files.emplace_back(file); + } + if (batch_files.size() > 0) { + FetchRawData(); + } + if (!WriteOptFieldIvfData(field_type, + field_id, + local_chunk_manager, + local_data_path, + field_datas, + write_offset)) { + return ""; + } + } + 
return local_data_path; +} + std::string DiskFileManagerImpl::GetFileName(const std::string& localfile) { boost::filesystem::path localPath(localfile); diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h index 91d33b9406..66d5830659 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.h +++ b/internal/core/src/storage/DiskFileManagerImpl.h @@ -102,6 +102,13 @@ class DiskFileManagerImpl : public FileManagerImpl { std::string CacheRawDataToDisk(std::shared_ptr space); + std::string + CacheOptFieldToDisk(OptFieldT& fields_map); + + std::string + CacheOptFieldToDisk(std::shared_ptr space, + OptFieldT& fields_map); + virtual bool AddFileUsingSpace(const std::string& local_file_name, const std::vector& local_file_offsets, diff --git a/internal/core/unittest/test_disk_file_manager_test.cpp b/internal/core/unittest/test_disk_file_manager_test.cpp index 310dec776c..f48ccab65d 100644 --- a/internal/core/unittest/test_disk_file_manager_test.cpp +++ b/internal/core/unittest/test_disk_file_manager_test.cpp @@ -9,15 +9,35 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License +#include #include +#include +#include +#include +#include +#include #include +#include +#include +#include #include +#include #include #include +#include "common/EasyAssert.h" +#include "common/FieldDataInterface.h" #include "common/Slice.h" #include "common/Common.h" +#include "common/Types.h" +#include "storage/ChunkManager.h" +#include "storage/DataCodec.h" +#include "storage/InsertData.h" #include "storage/ThreadPool.h" +#include "storage/Types.h" +#include "storage/options.h" +#include "storage/schema.h" +#include "storage/space.h" #include "storage/Util.h" #include "storage/DiskFileManagerImpl.h" #include "storage/LocalChunkManagerSingleton.h" @@ -183,3 +203,218 @@ TEST_F(DiskAnnFileManagerTest, TestThreadPoolException) { EXPECT_EQ(std::string(e.what()), "run time error"); } } + +const int64_t kOptFieldId = 123456; +const std::string kOptFieldName = "opt_field_name"; +const DataType kOptFiledType = DataType::INT64; +const int64_t kOptFieldDataRange = 10000; +const std::string kOptFieldPath = "/tmp/diskann/opt_field/123123"; +// const std::string kOptFieldPath = "/tmp/diskann/index_files/1000/index"; +const size_t kEntityCnt = 1000 * 1000; +const DataType kOptFieldDataType = DataType::INT64; +const FieldDataMeta kOptVecFieldDataMeta = {1, 2, 3, 100}; +using OffsetT = uint32_t; + +auto +CreateFileManager(const ChunkManagerPtr& cm) + -> std::shared_ptr { + // collection_id: 1, partition_id: 2, segment_id: 3 + // field_id: 100, index_build_id: 1000, index_version: 1 + IndexMeta index_meta = { + 3, 100, 1000, 1, "opt_fields", "field_name", DataType::VECTOR_FLOAT, 1}; + int64_t slice_size = milvus::FILE_SLICE_SIZE; + return std::make_shared( + storage::FileManagerContext(kOptVecFieldDataMeta, index_meta, cm)); +} + +auto +PrepareRawFieldData() -> std::vector { + std::vector data(kEntityCnt); + int64_t field_val = 0; + for (size_t i = 0; i < kEntityCnt; ++i) { + data[i] = field_val++; + if (field_val >= kOptFieldDataRange) { + field_val = 0; + } + } + return data; +} + +auto +PrepareInsertData() -> std::string { + size_t sz = sizeof(int64_t) * kEntityCnt; + std::vector data = PrepareRawFieldData(); + auto field_data = + storage::CreateFieldData(kOptFieldDataType, 1, kEntityCnt); + 
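    // fill the column, wrap it in an InsertData binlog, serialize it in remote
    // storage format and stage it through the chunk manager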
field_data->FillFieldData(data.data(), kEntityCnt); + storage::InsertData insert_data(field_data); + insert_data.SetFieldDataMeta(kOptVecFieldDataMeta); + insert_data.SetTimestamps(0, 100); + auto serialized_data = insert_data.Serialize(storage::StorageType::Remote); + + auto chunk_manager = + storage::CreateChunkManager(get_default_local_storage_config()); + + std::string path = kOptFieldPath + "0"; + boost::filesystem::remove_all(path); + chunk_manager->Write(path, serialized_data.data(), serialized_data.size()); + return path; +} + +auto +PrepareInsertDataSpace() + -> std::pair> { + std::string path = kOptFieldPath + "1"; + arrow::FieldVector arrow_fields{ + arrow::field("pk", arrow::int64()), + arrow::field("ts", arrow::int64()), + arrow::field(kOptFieldName, arrow::int64()), + arrow::field("vec", arrow::fixed_size_binary(1))}; + auto arrow_schema = std::make_shared(arrow_fields); + auto schema_options = std::make_shared(); + schema_options->primary_column = "pk"; + schema_options->version_column = "ts"; + schema_options->vector_column = "vec"; + auto schema = + std::make_shared(arrow_schema, schema_options); + boost::filesystem::remove_all(path); + boost::filesystem::create_directories(path); + EXPECT_TRUE(schema->Validate().ok()); + auto opt_space = milvus_storage::Space::Open( + "file://" + boost::filesystem::canonical(path).string(), + milvus_storage::Options{schema}); + EXPECT_TRUE(opt_space.has_value()); + auto space = std::move(opt_space.value()); + const auto data = PrepareRawFieldData(); + arrow::Int64Builder pk_builder; + arrow::Int64Builder ts_builder; + arrow::NumericBuilder scalar_builder; + arrow::FixedSizeBinaryBuilder vec_builder(arrow::fixed_size_binary(1)); + const uint8_t kByteZero = 0; + for (size_t i = 0; i < kEntityCnt; ++i) { + EXPECT_TRUE(pk_builder.Append(i).ok()); + EXPECT_TRUE(ts_builder.Append(i).ok()); + EXPECT_TRUE(vec_builder.Append(&kByteZero).ok()); + } + for (size_t i = 0; i < kEntityCnt; ++i) { + EXPECT_TRUE(scalar_builder.Append(data[i]).ok()); + } + std::shared_ptr pk_array; + EXPECT_TRUE(pk_builder.Finish(&pk_array).ok()); + std::shared_ptr ts_array; + EXPECT_TRUE(ts_builder.Finish(&ts_array).ok()); + std::shared_ptr scalar_array; + EXPECT_TRUE(scalar_builder.Finish(&scalar_array).ok()); + std::shared_ptr vec_array; + EXPECT_TRUE(vec_builder.Finish(&vec_array).ok()); + auto batch = + arrow::RecordBatch::Make(arrow_schema, + kEntityCnt, + {pk_array, ts_array, scalar_array, vec_array}); + auto write_opt = milvus_storage::WriteOption{kEntityCnt}; + space->Write(arrow::RecordBatchReader::Make({batch}, arrow_schema) + .ValueOrDie() + .get(), + &write_opt); + return {path, std::move(space)}; +} + +auto +PrepareOptionalField(const std::shared_ptr& file_manager, + const std::string& insert_file_path) -> OptFieldT { + OptFieldT opt_field; + std::vector insert_files; + insert_files.emplace_back(insert_file_path); + opt_field[kOptFieldId] = {kOptFieldName, kOptFiledType, insert_files}; + return opt_field; +} + +void +CheckOptFieldCorrectness(const std::string& local_file_path) { + std::ifstream ifs(local_file_path); + if (!ifs.is_open()) { + FAIL() << "open file failed: " << local_file_path << std::endl; + return; + } + uint8_t meta_version; + uint32_t meta_num_of_fields, num_of_unique_field_data; + int64_t field_id; + ifs.read(reinterpret_cast(&meta_version), sizeof(meta_version)); + EXPECT_EQ(meta_version, 0); + ifs.read(reinterpret_cast(&meta_num_of_fields), + sizeof(meta_num_of_fields)); + EXPECT_EQ(meta_num_of_fields, 1); + 
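    // the meta block is followed by one section per field:
    // field_id (int64), then the count of unique field values (uint32)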
ifs.read(reinterpret_cast(&field_id), sizeof(field_id)); + EXPECT_EQ(field_id, kOptFieldId); + ifs.read(reinterpret_cast(&num_of_unique_field_data), + sizeof(num_of_unique_field_data)); + EXPECT_EQ(num_of_unique_field_data, kOptFieldDataRange); + + uint32_t expected_single_category_offset_cnt = + kEntityCnt / kOptFieldDataRange; + uint32_t read_single_category_offset_cnt; + std::vector single_category_offsets( + expected_single_category_offset_cnt); + for (uint32_t i = 0; i < num_of_unique_field_data; ++i) { + ifs.read(reinterpret_cast(&read_single_category_offset_cnt), + sizeof(read_single_category_offset_cnt)); + ASSERT_EQ(read_single_category_offset_cnt, + expected_single_category_offset_cnt); + ifs.read(reinterpret_cast(single_category_offsets.data()), + read_single_category_offset_cnt * sizeof(OffsetT)); + + OffsetT first_offset = 0; + if (read_single_category_offset_cnt > 0) { + first_offset = single_category_offsets[0]; + } + for (size_t j = 1; j < read_single_category_offset_cnt; ++j) { + ASSERT_EQ(single_category_offsets[j] % kOptFieldDataRange, + first_offset % kOptFieldDataRange); + } + } +} + +TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskFieldEmpty) { + auto file_manager = CreateFileManager(cm_); + const auto& [insert_file_space_path, space] = PrepareInsertDataSpace(); + OptFieldT opt_fields; + EXPECT_TRUE(file_manager->CacheOptFieldToDisk(opt_fields).empty()); + EXPECT_TRUE(file_manager->CacheOptFieldToDisk(space, opt_fields).empty()); +} + +TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskSpaceEmpty) { + auto file_manager = CreateFileManager(cm_); + auto opt_fileds = PrepareOptionalField(file_manager, ""); + auto res = file_manager->CacheOptFieldToDisk(nullptr, opt_fileds); + EXPECT_TRUE(res.empty()); +} + +TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskOptFieldMoreThanOne) { + auto file_manager = CreateFileManager(cm_); + const auto insert_file_path = PrepareInsertData(); + const auto& [insert_file_space_path, space] = PrepareInsertDataSpace(); + + OptFieldT opt_fields = PrepareOptionalField(file_manager, insert_file_path); + opt_fields[kOptFieldId + 1] = { + kOptFieldName + "second", kOptFiledType, {insert_file_space_path}}; + EXPECT_THROW(file_manager->CacheOptFieldToDisk(opt_fields), SegcoreError); + EXPECT_THROW(file_manager->CacheOptFieldToDisk(space, opt_fields), SegcoreError); +} + +TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskCorrect) { + auto file_manager = CreateFileManager(cm_); + const auto insert_file_path = PrepareInsertData(); + auto opt_fileds = PrepareOptionalField(file_manager, insert_file_path); + auto res = file_manager->CacheOptFieldToDisk(opt_fileds); + ASSERT_FALSE(res.empty()); + CheckOptFieldCorrectness(res); +} + +TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskSpaceCorrect) { + auto file_manager = CreateFileManager(cm_); + const auto& [insert_file_path, space] = PrepareInsertDataSpace(); + auto opt_fileds = PrepareOptionalField(file_manager, insert_file_path); + auto res = file_manager->CacheOptFieldToDisk(space, opt_fileds); + ASSERT_FALSE(res.empty()); + CheckOptFieldCorrectness(res); +} \ No newline at end of file diff --git a/internal/datacoord/const.go b/internal/datacoord/const.go index 9490f77653..fed537552c 100644 --- a/internal/datacoord/const.go +++ b/internal/datacoord/const.go @@ -27,8 +27,5 @@ const ( ) const ( - flatIndex = "FLAT" - binFlatIndex = "BIN_FLAT" - diskAnnIndex = "DISKANN" invalidIndex = "invalid" ) diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go index 
bf7780df54..9a9b7b0030 100644 --- a/internal/datacoord/index_builder.go +++ b/internal/datacoord/index_builder.go @@ -30,10 +30,11 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/typeutil" + itypeutil "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type indexTaskState int32 @@ -203,6 +204,19 @@ func (ib *indexBuilder) run() { } } +func getBinLogIds(segment *SegmentInfo, fieldID int64) []int64 { + binlogIDs := make([]int64, 0) + for _, fieldBinLog := range segment.GetBinlogs() { + if fieldBinLog.GetFieldID() == fieldID { + for _, binLog := range fieldBinLog.GetBinlogs() { + binlogIDs = append(binlogIDs, binLog.GetLogID()) + } + break + } + } + return binlogIDs +} + func (ib *indexBuilder) process(buildID UniqueID) bool { ib.taskMutex.RLock() state := ib.tasks[buildID] @@ -240,7 +254,8 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { return true } indexParams := ib.meta.GetIndexParams(meta.CollectionID, meta.IndexID) - if isFlatIndex(getIndexType(indexParams)) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() { + indexType := getIndexType(indexParams) + if isFlatIndex(indexType) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() { log.Ctx(ib.ctx).Info("segment does not need index really", zap.Int64("buildID", buildID), zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows)) if err := ib.meta.FinishTask(&indexpb.IndexTaskInfo{ @@ -269,14 +284,23 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { return false } - binlogIDs := make([]int64, 0) - fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID) - for _, fieldBinLog := range segment.GetBinlogs() { - if fieldBinLog.GetFieldID() == fieldID { - for _, binLog := range fieldBinLog.GetBinlogs() { - binlogIDs = append(binlogIDs, binLog.GetLogID()) + // vector index build needs information of optional scalar fields data + optionalFields := make([]*indexpb.OptionalFieldInfo, 0) + if Params.CommonCfg.EnableNodeFilteringOnPartitionKey.GetAsBool() && isOptionalScalarFieldSupported(indexType) { + colSchema := ib.meta.GetCollection(meta.CollectionID).Schema + hasPartitionKey := typeutil.HasPartitionKey(colSchema) + if hasPartitionKey { + partitionKeyField, err := typeutil.GetPartitionKeyFieldSchema(colSchema) + if partitionKeyField == nil || err != nil { + log.Ctx(ib.ctx).Warn("index builder get partition key field failed", zap.Int64("build", buildID), zap.Error(err)) + } else { + optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{ + FieldID: partitionKeyField.FieldID, + FieldName: partitionKeyField.Name, + FieldType: int32(partitionKeyField.DataType), + DataIds: getBinLogIds(segment, partitionKeyField.FieldID), + }) } - break } } @@ -306,6 +330,8 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { } } + fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID) + binlogIDs := getBinLogIds(segment, fieldID) var req *indexpb.CreateJobRequest if Params.CommonCfg.EnableStorageV2.GetAsBool() { collectionInfo, err := ib.handler.GetCollection(ib.ctx, segment.GetCollectionID()) @@ -329,55 +355,57 @@ func (ib *indexBuilder) process(buildID UniqueID) bool { return false } - 
storePath, err := typeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID()) + storePath, err := itypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID()) if err != nil { log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err)) return false } - indexStorePath, err := typeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue()+"/index", segment.GetID()) + indexStorePath, err := itypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue()+"/index", segment.GetID()) if err != nil { log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err)) return false } req = &indexpb.CreateJobRequest{ - ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), - IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath), - BuildID: buildID, - IndexVersion: meta.IndexVersion + 1, - StorageConfig: storageConfig, - IndexParams: indexParams, - TypeParams: typeParams, - NumRows: meta.NumRows, - CollectionID: segment.GetCollectionID(), - PartitionID: segment.GetPartitionID(), - SegmentID: segment.GetID(), - FieldID: fieldID, - FieldName: field.Name, - FieldType: field.DataType, - StorePath: storePath, - StoreVersion: segment.GetStorageVersion(), - IndexStorePath: indexStorePath, - Dim: int64(dim), - CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(), - DataIds: binlogIDs, + ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), + IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath), + BuildID: buildID, + IndexVersion: meta.IndexVersion + 1, + StorageConfig: storageConfig, + IndexParams: indexParams, + TypeParams: typeParams, + NumRows: meta.NumRows, + CollectionID: segment.GetCollectionID(), + PartitionID: segment.GetPartitionID(), + SegmentID: segment.GetID(), + FieldID: fieldID, + FieldName: field.Name, + FieldType: field.DataType, + StorePath: storePath, + StoreVersion: segment.GetStorageVersion(), + IndexStorePath: indexStorePath, + Dim: int64(dim), + CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(), + DataIds: binlogIDs, + OptionalScalarFields: optionalFields, } } else { req = &indexpb.CreateJobRequest{ - ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), - IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath), - BuildID: buildID, - IndexVersion: meta.IndexVersion + 1, - StorageConfig: storageConfig, - IndexParams: indexParams, - TypeParams: typeParams, - NumRows: meta.NumRows, - CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(), - DataIds: binlogIDs, - CollectionID: segment.GetCollectionID(), - PartitionID: segment.GetPartitionID(), - SegmentID: segment.GetID(), - FieldID: fieldID, + ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), + IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath), + BuildID: buildID, + IndexVersion: meta.IndexVersion + 1, + StorageConfig: storageConfig, + IndexParams: indexParams, + TypeParams: typeParams, + NumRows: meta.NumRows, + CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(), + DataIds: binlogIDs, + CollectionID: segment.GetCollectionID(), + PartitionID: segment.GetPartitionID(), + SegmentID: segment.GetID(), + FieldID: fieldID, + 
OptionalScalarFields: optionalFields, } } diff --git a/internal/datacoord/index_builder_test.go b/internal/datacoord/index_builder_test.go index 4580be0834..f34809374f 100644 --- a/internal/datacoord/index_builder_test.go +++ b/internal/datacoord/index_builder_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/types" mclient "github.com/milvus-io/milvus/internal/util/mock" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -1196,3 +1197,292 @@ func TestIndexBuilderV2(t *testing.T) { } ib.Stop() } + +func TestVecIndexWithOptionalScalarField(t *testing.T) { + var ( + collID = UniqueID(100) + partID = UniqueID(200) + indexID = UniqueID(300) + segID = UniqueID(500) + buildID = UniqueID(600) + nodeID = UniqueID(700) + partitionKeyID = UniqueID(800) + ) + + paramtable.Init() + ctx := context.Background() + minNumberOfRowsToBuild := paramtable.Get().DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() + 1 + + catalog := catalogmocks.NewDataCoordCatalog(t) + catalog.On("CreateSegmentIndex", + mock.Anything, + mock.Anything, + ).Return(nil) + catalog.On("AlterSegmentIndexes", + mock.Anything, + mock.Anything, + ).Return(nil) + + ic := mocks.NewMockIndexNodeClient(t) + ic.EXPECT().GetJobStats(mock.Anything, mock.Anything, mock.Anything). + Return(&indexpb.GetJobStatsResponse{ + Status: merr.Success(), + TotalJobNum: 0, + EnqueueJobNum: 0, + InProgressJobNum: 0, + TaskSlots: 1, + JobInfos: []*indexpb.JobInfo{}, + }, nil) + ic.EXPECT().QueryJobs(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.QueryJobsRequest, option ...grpc.CallOption) (*indexpb.QueryJobsResponse, error) { + indexInfos := make([]*indexpb.IndexTaskInfo, 0) + for _, buildID := range in.BuildIDs { + indexInfos = append(indexInfos, &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Finished, + IndexFileKeys: []string{"file1", "file2"}, + }) + } + return &indexpb.QueryJobsResponse{ + Status: merr.Success(), + ClusterID: in.ClusterID, + IndexInfos: indexInfos, + }, nil + }) + + ic.EXPECT().DropJobs(mock.Anything, mock.Anything, mock.Anything). 
+ Return(merr.Success(), nil) + + mt := meta{ + catalog: catalog, + collections: map[int64]*collectionInfo{ + collID: { + ID: collID, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: fieldID, + Name: "vec", + DataType: schemapb.DataType_FloatVector, + }, + { + FieldID: partitionKeyID, + Name: "scalar", + DataType: schemapb.DataType_Int64, + IsPartitionKey: true, + }, + }, + }, + CreatedAt: 0, + }, + }, + indexes: map[UniqueID]map[UniqueID]*model.Index{ + collID: { + indexID: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID, + IndexID: indexID, + IndexName: indexName, + IsDeleted: false, + CreateTime: 1, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.MetricTypeKey, + Value: "L2", + }, + { + Key: common.IndexTypeKey, + Value: indexparamcheck.IndexHNSW, + }, + }, + }, + }, + }, + segments: &SegmentsInfo{ + segments: map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: minNumberOfRowsToBuild, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: 10, + }, + segmentIndexes: map[UniqueID]*model.SegmentIndex{ + indexID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: minNumberOfRowsToBuild, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }, + }, + }, + }, + }, + buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ + buildID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: minNumberOfRowsToBuild, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }, + }, + } + + nodeManager := &IndexNodeManager{ + ctx: ctx, + nodeClients: map[UniqueID]types.IndexNodeClient{ + 1: ic, + }, + } + + cm := &mocks.ChunkManager{} + cm.EXPECT().RootPath().Return("root") + + waitTaskDoneFunc := func(builder *indexBuilder) { + for { + builder.taskMutex.RLock() + if len(builder.tasks) == 0 { + builder.taskMutex.RUnlock() + break + } + builder.taskMutex.RUnlock() + } + + assert.Zero(t, len(builder.tasks)) + } + + resetMetaFunc := func() { + mt.buildID2SegmentIndex[buildID].IndexState = commonpb.IndexState_Unissued + mt.segments.segments[segID].segmentIndexes[indexID].IndexState = commonpb.IndexState_Unissued + mt.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexHNSW + mt.collections[collID].Schema.Fields[1].IsPartitionKey = true + } + + paramtable.Get().CommonCfg.EnableNodeFilteringOnPartitionKey.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableNodeFilteringOnPartitionKey.SwapTempValue("false") + ib := newIndexBuilder(ctx, &mt, nodeManager, cm, newIndexEngineVersionManager(), nil) + + t.Run("success to get opt field on startup", func(t *testing.T) { + ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + assert.NotZero(t, len(in.OptionalScalarFields), "optional scalar field should be set") + return merr.Success(), nil + }).Once() + assert.Equal(t, 1, len(ib.tasks)) + 
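    // newIndexBuilder picks up the unissued segment index from meta, so exactly one init task is expected before Start()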
assert.Equal(t, indexTaskInit, ib.tasks[buildID]) + + ib.scheduleDuration = time.Millisecond * 500 + ib.Start() + waitTaskDoneFunc(ib) + resetMetaFunc() + }) + + segIdx := &model.SegmentIndex{ + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: minNumberOfRowsToBuild, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 0, + IndexState: 0, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + } + + t.Run("enqueue", func(t *testing.T) { + ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + assert.NotZero(t, len(in.OptionalScalarFields), "optional scalar field should be set") + return merr.Success(), nil + }).Once() + err := ib.meta.AddSegmentIndex(segIdx) + assert.NoError(t, err) + ib.enqueue(buildID) + waitTaskDoneFunc(ib) + resetMetaFunc() + }) + + // should still be able to build vec index when opt field is not set + t.Run("enqueue returns empty optional field when cfg disable", func(t *testing.T) { + paramtable.Get().CommonCfg.EnableNodeFilteringOnPartitionKey.SwapTempValue("false") + ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + assert.Zero(t, len(in.OptionalScalarFields), "optional scalar field should be set") + return merr.Success(), nil + }).Once() + err := ib.meta.AddSegmentIndex(segIdx) + assert.NoError(t, err) + ib.enqueue(buildID) + waitTaskDoneFunc(ib) + resetMetaFunc() + }) + + t.Run("enqueue returns empty optional field when index is not HNSW", func(t *testing.T) { + paramtable.Get().CommonCfg.EnableNodeFilteringOnPartitionKey.SwapTempValue("true") + mt.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexDISKANN + ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + assert.Zero(t, len(in.OptionalScalarFields), "optional scalar field should be set") + return merr.Success(), nil + }).Once() + err := ib.meta.AddSegmentIndex(segIdx) + assert.NoError(t, err) + ib.enqueue(buildID) + waitTaskDoneFunc(ib) + resetMetaFunc() + }) + + t.Run("enqueue returns empty optional field when no partition key", func(t *testing.T) { + paramtable.Get().CommonCfg.EnableNodeFilteringOnPartitionKey.SwapTempValue("true") + mt.collections[collID].Schema.Fields[1].IsPartitionKey = false + ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + assert.Zero(t, len(in.OptionalScalarFields), "optional scalar field should be set") + return merr.Success(), nil + }).Once() + err := ib.meta.AddSegmentIndex(segIdx) + assert.NoError(t, err) + ib.enqueue(buildID) + waitTaskDoneFunc(ib) + resetMetaFunc() + }) + + ib.nodeDown(nodeID) + ib.Stop() +} diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index a62e2c5f14..ed1058ed11 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -197,10 +197,10 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques 
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() return merr.Status(err), nil } - if getIndexType(req.GetIndexParams()) == diskAnnIndex && !s.indexNodeManager.ClientSupportDisk() { + if getIndexType(req.GetIndexParams()) == indexparamcheck.IndexDISKANN && !s.indexNodeManager.ClientSupportDisk() { errMsg := "all IndexNodes do not support disk indexes, please verify" log.Warn(errMsg) - err = merr.WrapErrIndexNotSupported(diskAnnIndex) + err = merr.WrapErrIndexNotSupported(indexparamcheck.IndexDISKANN) metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() return merr.Status(err), nil } diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 68ad20329d..c8aef002c5 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -189,7 +190,11 @@ func getIndexType(indexParams []*commonpb.KeyValuePair) string { } func isFlatIndex(indexType string) bool { - return indexType == flatIndex || indexType == binFlatIndex + return indexType == indexparamcheck.IndexFaissIDMap || indexType == indexparamcheck.IndexFaissBinIDMap +} + +func isOptionalScalarFieldSupported(indexType string) bool { + return indexType == indexparamcheck.IndexHNSW } func parseBuildIDFromFilePath(key string) (UniqueID, error) { diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index fd5ebf4aca..b14343900d 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -185,6 +185,13 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error { return err } + for _, optField := range it.req.GetOptionalScalarFields() { + if err := buildIndexInfo.AppendOptionalField(optField); err != nil { + log.Ctx(ctx).Warn("append optional field failed", zap.Error(err)) + return err + } + } + it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexInfo) if err != nil { if it.index != nil && it.index.CleanLocalData() != nil { @@ -325,6 +332,18 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error { it.req.DataPaths = append(it.req.DataPaths, path) } } + + if it.req.OptionalScalarFields != nil { + for _, optFields := range it.req.GetOptionalScalarFields() { + if len(optFields.DataPaths) == 0 { + for _, id := range optFields.DataIds { + path := metautil.BuildInsertLogPath(it.req.GetStorageConfig().RootPath, it.req.GetCollectionID(), it.req.GetPartitionID(), it.req.GetSegmentID(), optFields.FieldID, id) + optFields.DataPaths = append(optFields.DataPaths, path) + } + } + } + } + // type params can be removed for _, kvPair := range it.req.GetTypeParams() { key, value := kvPair.GetKey(), kvPair.GetValue() @@ -518,6 +537,13 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { return err } + for _, optField := range it.req.GetOptionalScalarFields() { + if err := buildIndexInfo.AppendOptionalField(optField); err != nil { + log.Ctx(ctx).Warn("append optional field failed", zap.Error(err)) + return err + } + } + it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexInfo) if err != nil { if it.index != nil && it.index.CleanLocalData() != nil { diff --git a/internal/indexnode/task_test.go b/internal/indexnode/task_test.go index ae3160b104..dc30abd800 100644 --- a/internal/indexnode/task_test.go +++ 
b/internal/indexnode/task_test.go @@ -293,6 +293,9 @@ func (suite *IndexBuildTaskV2Suite) TestBuildIndex() { StoreVersion: suite.space.GetCurrentVersion(), IndexStorePath: "file://" + suite.space.Path(), Dim: 4, + OptionalScalarFields: []*indexpb.OptionalFieldInfo{ + {FieldID: 1, FieldName: "pk", FieldType: 5, DataIds: []int64{0}}, + }, } task := &indexBuildTaskV2{ diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index b27250d2ee..25e762c380 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -242,6 +242,14 @@ message StorageConfig { int64 request_timeout_ms = 13; } +message OptionalFieldInfo { + int64 fieldID = 1; + string field_name = 2; + int32 field_type = 3; + repeated string data_paths = 4; + repeated int64 data_ids = 5; +} + message CreateJobRequest { string clusterID = 1; string index_file_prefix = 2; @@ -266,6 +274,7 @@ message CreateJobRequest { string index_store_path = 21; int64 dim = 22; repeated int64 data_ids = 23; + repeated OptionalFieldInfo optional_scalar_fields = 24; } message QueryJobsRequest { diff --git a/internal/util/indexcgowrapper/build_index_info.go b/internal/util/indexcgowrapper/build_index_info.go index abca946373..d7cf841d09 100644 --- a/internal/util/indexcgowrapper/build_index_info.go +++ b/internal/util/indexcgowrapper/build_index_info.go @@ -181,3 +181,18 @@ func (bi *BuildIndexInfo) AppendIndexEngineVersion(indexEngineVersion int32) err status := C.AppendIndexEngineVersionToBuildInfo(bi.cBuildIndexInfo, cIndexEngineVersion) return HandleCStatus(&status, "AppendIndexEngineVersion failed") } + +func (bi *BuildIndexInfo) AppendOptionalField(optField *indexpb.OptionalFieldInfo) error { + cFieldId := C.int64_t(optField.GetFieldID()) + cFieldType := C.int32_t(optField.GetFieldType()) + cFieldName := C.CString(optField.GetFieldName()) + for _, dataPath := range optField.GetDataPaths() { + cDataPath := C.CString(dataPath) + defer C.free(unsafe.Pointer(cDataPath)) + status := C.AppendOptionalFieldDataPath(bi.cBuildIndexInfo, cFieldId, cFieldName, cFieldType, cDataPath) + if err := HandleCStatus(&status, "AppendOptionalFieldDataPath failed"); err != nil { + return err + } + } + return nil +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index cb6d39da77..1b5f9fec09 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -190,6 +190,7 @@ type commonConfig struct { HighPriorityThreadCoreCoefficient ParamItem `refreshable:"false"` MiddlePriorityThreadCoreCoefficient ParamItem `refreshable:"false"` LowPriorityThreadCoreCoefficient ParamItem `refreshable:"false"` + EnableNodeFilteringOnPartitionKey ParamItem `refreshable:"false"` MaxDegree ParamItem `refreshable:"true"` SearchListSize ParamItem `refreshable:"true"` PQCodeBudgetGBRatio ParamItem `refreshable:"true"` @@ -422,6 +423,13 @@ This configuration is only used by querynode and indexnode, it selects CPU instr } p.IndexSliceSize.Init(base.mgr) + p.EnableNodeFilteringOnPartitionKey = ParamItem{ + Key: "common.nodeFiltering.enableOnPartitionKey", + Version: "2.5.0", + DefaultValue: "false", + } + p.EnableNodeFilteringOnPartitionKey.Init(base.mgr) + p.MaxDegree = ParamItem{ Key: "common.DiskIndex.MaxDegree", Version: "2.0.0",
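
For reference, a minimal reader sketch (not part of this patch) for the optional-field IVF file produced by DiskFileManagerImpl::CacheOptFieldToDisk above, assuming the version-0 layout described in the writer's comment: version (uint8), num_of_fields (uint32), then per field a field_id (int64), the number of unique values (uint32), and for each unique value an offset count (uint32) followed by that many uint32 offsets. The OptFieldIvf struct and ReadOptFieldIvfFile name are illustrative only.

#include <cstdint>
#include <fstream>
#include <string>
#include <utility>
#include <vector>

// One parsed field section: the field id plus one offset list per distinct value.
struct OptFieldIvf {
    int64_t field_id = 0;
    std::vector<std::vector<uint32_t>> offset_lists;
};

// Reads the version-0 opt-field file; returns false on I/O error or unknown version.
bool
ReadOptFieldIvfFile(const std::string& path, std::vector<OptFieldIvf>& out) {
    std::ifstream ifs(path, std::ios::binary);
    if (!ifs.is_open()) {
        return false;
    }
    uint8_t version = 0;
    uint32_t num_fields = 0;
    ifs.read(reinterpret_cast<char*>(&version), sizeof(version));
    ifs.read(reinterpret_cast<char*>(&num_fields), sizeof(num_fields));
    if (!ifs || version != 0) {
        return false;
    }
    for (uint32_t f = 0; f < num_fields; ++f) {
        OptFieldIvf field;
        uint32_t num_unique = 0;
        ifs.read(reinterpret_cast<char*>(&field.field_id), sizeof(field.field_id));
        ifs.read(reinterpret_cast<char*>(&num_unique), sizeof(num_unique));
        field.offset_lists.resize(num_unique);
        for (uint32_t u = 0; u < num_unique; ++u) {
            uint32_t cnt = 0;
            ifs.read(reinterpret_cast<char*>(&cnt), sizeof(cnt));
            field.offset_lists[u].resize(cnt);
            ifs.read(reinterpret_cast<char*>(field.offset_lists[u].data()),
                     cnt * sizeof(uint32_t));
        }
        out.push_back(std::move(field));
    }
    return static_cast<bool>(ifs);
}

The index itself consumes this file through the VEC_OPT_FIELDS_PATH build parameter; the sketch only documents the on-disk contract that CheckOptFieldCorrectness in the unit test verifies.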