mirror of https://gitee.com/milvus-io/milvus.git, synced 2026-01-07 19:31:51 +08:00

enhance: pass partition key scalar info if enabled when build vector index (#29931)

issue: #29892
Pass optional scalar IVF offsets to Cardinal

Signed-off-by: Patrick Weizhi Xu <weizhi.xu@zilliz.com>

This commit is contained in:
parent 38746dfc2b
commit 0907d76253
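For orientation, a minimal self-contained sketch of the optional-field bookkeeping this commit introduces: a map from field id to (field name, field type, binlog paths) that the DiskANN build reads from the "opt_fields" config and hands to DiskFileManagerImpl::CacheOptFieldToDisk. The field id and binlog paths below are hypothetical, and milvus::DataType is replaced by a stand-in enum so the sketch compiles on its own; it is an illustration, not code from the commit.

#include <cstdint>
#include <iostream>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>

// Stand-in for milvus::DataType so the sketch is self-contained.
enum class DataType : int { INT64 = 5 };

// Mirrors the OptFieldT alias this commit adds to common/Types.h:
// field id -> (field name, field type, binlog paths)
using OptFieldT = std::unordered_map<
    int64_t,
    std::tuple<std::string, DataType, std::vector<std::string>>>;

int main() {
    OptFieldT opt_fields;
    // Hypothetical partition-key field 800 with two insert binlog files.
    opt_fields[800] = {"scalar",
                       DataType::INT64,
                       {"files/insert_log/1/2/3/800/100",
                        "files/insert_log/1/2/3/800/101"}};
    // In VectorDiskAnnIndex<T>::Build, this map is read from the "opt_fields"
    // config key; when the index reports IsAdditionalScalarSupported(),
    // CacheOptFieldToDisk writes per-value row offsets to a local file whose
    // path is then stored in the build config under "opt_fields_path".
    for (const auto& [fid, tup] : opt_fields) {
        std::cout << fid << " -> " << std::get<0>(tup) << ", "
                  << std::get<2>(tup).size() << " binlog file(s)\n";
    }
    return 0;
}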
@@ -38,6 +38,7 @@ const char INDEX_BUILD_ID_KEY[] = "indexBuildID";

const char INDEX_ROOT_PATH[] = "index_files";
const char RAWDATA_ROOT_PATH[] = "raw_datas";
const char VEC_OPT_FIELDS[] = "opt_fields";

const char DEFAULT_PLANNODE_ID[] = "0";
const char DEAFULT_QUERY_ID[] = "0";

@@ -147,6 +147,12 @@ using FieldName = fluent::NamedType<std::string,
                                    impl::FieldNameTag,
                                    fluent::Comparable,
                                    fluent::Hashable>;

// field id -> (field name, field type, binlog paths)
using OptFieldT = std::unordered_map<
    int64_t,
    std::tuple<std::string, milvus::DataType, std::vector<std::string>>>;

// using FieldOffset = fluent::NamedType<int64_t, impl::FieldOffsetTag, fluent::Comparable, fluent::Hashable>;
using SegOffset =
    fluent::NamedType<int64_t, impl::SegOffsetTag, fluent::Arithmetic>;

@@ -52,6 +52,9 @@ constexpr const char* INDEX_ENGINE_VERSION = "index_engine_version";
constexpr const char* DISK_ANN_PREFIX_PATH = "index_prefix";
constexpr const char* DISK_ANN_RAW_DATA_PATH = "data_path";

// VecIndex node filtering
constexpr const char* VEC_OPT_FIELDS_PATH = "opt_fields_path";

// DiskAnn build params
constexpr const char* DISK_ANN_MAX_DEGREE = "max_degree";
constexpr const char* DISK_ANN_SEARCH_LIST_SIZE = "search_list_size";
@@ -17,6 +17,7 @@
#include "index/VectorDiskIndex.h"

#include "common/Tracer.h"
#include "common/Types.h"
#include "common/Utils.h"
#include "config/ConfigKnowhere.h"
#include "index/Meta.h"
@@ -25,6 +26,7 @@
#include "storage/Util.h"
#include "common/Consts.h"
#include "common/RangeSearchHelper.h"
#include "indexbuilder/types.h"

namespace milvus::index {

@@ -190,7 +192,15 @@ VectorDiskAnnIndex<T>::BuildV2(const Config& config) {
        build_config[DISK_ANN_THREADS_NUM] =
            std::atoi(num_threads.value().c_str());
    }

    auto opt_fields = GetValueFromConfig<OptFieldT>(config, VEC_OPT_FIELDS);
    if (opt_fields.has_value() && index_.IsAdditionalScalarSupported()) {
        build_config[VEC_OPT_FIELDS_PATH] =
            file_manager_->CacheOptFieldToDisk(opt_fields.value());
    }

    build_config.erase("insert_files");
    build_config.erase(VEC_OPT_FIELDS);
    index_.Build({}, build_config);

    auto local_chunk_manager =
@@ -229,7 +239,15 @@ VectorDiskAnnIndex<T>::Build(const Config& config) {
        build_config[DISK_ANN_THREADS_NUM] =
            std::atoi(num_threads.value().c_str());
    }

    auto opt_fields = GetValueFromConfig<OptFieldT>(config, VEC_OPT_FIELDS);
    if (opt_fields.has_value() && index_.IsAdditionalScalarSupported()) {
        build_config[VEC_OPT_FIELDS_PATH] =
            file_manager_->CacheOptFieldToDisk(opt_fields.value());
    }

    build_config.erase("insert_files");
    build_config.erase(VEC_OPT_FIELDS);
    auto stat = index_.Build({}, build_config);
    if (stat != knowhere::Status::success)
        PanicInfo(ErrorCode::IndexBuildError,
@@ -507,6 +507,7 @@ VectorMemIndex<T>::Build(const Config& config) {
    Config build_config;
    build_config.update(config);
    build_config.erase("insert_files");
    build_config.erase(VEC_OPT_FIELDS);

    auto dataset = GenDataset(total_num_rows, dim, buf.get());
    BuildWithDataset(dataset, build_config);
@@ -90,6 +90,9 @@ CreateIndex(CIndex* res_index, CBuildIndexInfo c_build_index_info) {

    auto& config = build_index_info->config;
    config["insert_files"] = build_index_info->insert_files;
    if (build_index_info->opt_fields.size()) {
        config["opt_fields"] = build_index_info->opt_fields;
    }

    // get index type
    auto index_type = milvus::index::GetValueFromConfig<std::string>(
@@ -713,3 +716,24 @@ SerializeIndexAndUpLoadV2(CIndex index, CBinarySet* c_binary_set) {
    }
    return status;
}

CStatus
AppendOptionalFieldDataPath(CBuildIndexInfo c_build_index_info,
                            const int64_t field_id,
                            const char* field_name,
                            const int32_t field_type,
                            const char* c_file_path) {
    try {
        auto build_index_info = (BuildIndexInfo*)c_build_index_info;
        std::string field_name_str(field_name);
        auto& opt_fields_map = build_index_info->opt_fields;
        if (opt_fields_map.find(field_id) == opt_fields_map.end()) {
            opt_fields_map[field_id] = {
                field_name, static_cast<milvus::DataType>(field_type), {}};
        }
        std::get<2>(opt_fields_map[field_id]).emplace_back(c_file_path);
        return CStatus{Success, ""};
    } catch (std::exception& e) {
        return milvus::FailureCStatus(&e);
    }
}
@@ -108,6 +108,13 @@ CStatus
AppendIndexEngineVersionToBuildInfo(CBuildIndexInfo c_load_index_info,
                                    int32_t c_index_engine_version);

CStatus
AppendOptionalFieldDataPath(CBuildIndexInfo c_build_index_info,
                            const int64_t field_id,
                            const char* field_name,
                            const int32_t field_type,
                            const char* c_file_path);

CStatus
SerializeIndexAndUpLoad(CIndex index, CBinarySet* c_binary_set);

@@ -39,4 +39,5 @@ struct BuildIndexInfo {
    std::string index_store_path;
    int64_t dim;
    int32_t index_engine_version;
    milvus::OptFieldT opt_fields;
};
@@ -16,12 +16,22 @@

#include <algorithm>
#include <boost/filesystem.hpp>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/Common.h"
#include "common/Consts.h"
#include "common/EasyAssert.h"
#include "common/FieldData.h"
#include "common/FieldDataInterface.h"
#include "common/Slice.h"
#include "common/Types.h"
#include "log/Log.h"

#include "storage/DiskFileManagerImpl.h"
@@ -434,14 +444,19 @@ DiskFileManagerImpl::CacheRawDataToDisk(
    return local_data_path;
}

std::string
DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
    std::sort(remote_files.begin(),
              remote_files.end(),
void
SortByPath(std::vector<std::string>& paths) {
    std::sort(paths.begin(),
              paths.end(),
              [](const std::string& a, const std::string& b) {
                  return std::stol(a.substr(a.find_last_of("/") + 1)) <
                         std::stol(b.substr(b.find_last_of("/") + 1));
              });
}

std::string
DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
    SortByPath(remote_files);

    auto segment_id = GetFieldDataMeta().segment_id;
    auto field_id = GetFieldDataMeta().field_id;
@@ -508,6 +523,284 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector<std::string> remote_files) {
    return local_data_path;
}

template <typename T, typename = void>
struct has_native_type : std::false_type {};
template <typename T>
struct has_native_type<T, std::void_t<typename T::NativeType>>
    : std::true_type {};
template <DataType T>
using DataTypeNativeOrVoid =
    typename std::conditional<has_native_type<TypeTraits<T>>::value,
                              typename TypeTraits<T>::NativeType,
                              void>::type;
template <DataType T>
using DataTypeToOffsetMap =
    std::unordered_map<DataTypeNativeOrVoid<T>, int64_t>;

template <DataType T>
void
WriteOptFieldIvfDataImpl(
    const int64_t field_id,
    const std::shared_ptr<LocalChunkManager>& local_chunk_manager,
    const std::string& local_data_path,
    const std::vector<FieldDataPtr>& field_datas,
    uint64_t& write_offset) {
    using FieldDataT = DataTypeNativeOrVoid<T>;
    using OffsetT = uint32_t;
    std::unordered_map<FieldDataT, std::vector<OffsetT>> mp;
    OffsetT offset = 0;
    for (const auto& field_data : field_datas) {
        for (int64_t i = 0; i < field_data->get_num_rows(); ++i) {
            auto val =
                *reinterpret_cast<const FieldDataT*>(field_data->RawValue(i));
            mp[val].push_back(offset++);
        }
    }
    local_chunk_manager->Write(local_data_path,
                               write_offset,
                               const_cast<int64_t*>(&field_id),
                               sizeof(field_id));
    write_offset += sizeof(field_id);
    const uint32_t num_of_unique_field_data = mp.size();
    local_chunk_manager->Write(local_data_path,
                               write_offset,
                               const_cast<uint32_t*>(&num_of_unique_field_data),
                               sizeof(num_of_unique_field_data));
    write_offset += sizeof(num_of_unique_field_data);
    for (const auto& [val, offsets] : mp) {
        const uint32_t offsets_cnt = offsets.size();
        local_chunk_manager->Write(local_data_path,
                                   write_offset,
                                   const_cast<uint32_t*>(&offsets_cnt),
                                   sizeof(offsets_cnt));
        write_offset += sizeof(offsets_cnt);
        const size_t data_size = offsets_cnt * sizeof(OffsetT);
        local_chunk_manager->Write(local_data_path,
                                   write_offset,
                                   const_cast<OffsetT*>(offsets.data()),
                                   data_size);
        write_offset += data_size;
    }
}

#define GENERATE_OPT_FIELD_IVF_IMPL(DT)               \
    WriteOptFieldIvfDataImpl<DT>(field_id,            \
                                 local_chunk_manager, \
                                 local_data_path,     \
                                 field_datas,         \
                                 write_offset)
bool
WriteOptFieldIvfData(
    const DataType& dt,
    const int64_t field_id,
    const std::shared_ptr<LocalChunkManager>& local_chunk_manager,
    const std::string& local_data_path,
    const std::vector<FieldDataPtr>& field_datas,
    uint64_t& write_offset) {
    switch (dt) {
        case DataType::BOOL:
            GENERATE_OPT_FIELD_IVF_IMPL(DataType::BOOL);
            break;
        case DataType::INT8:
            GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT8);
            break;
        case DataType::INT16:
            GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT16);
            break;
        case DataType::INT32:
            GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT32);
            break;
        case DataType::INT64:
            GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT64);
            break;
        case DataType::FLOAT:
            GENERATE_OPT_FIELD_IVF_IMPL(DataType::FLOAT);
            break;
        case DataType::DOUBLE:
            GENERATE_OPT_FIELD_IVF_IMPL(DataType::DOUBLE);
            break;
        case DataType::STRING:
            GENERATE_OPT_FIELD_IVF_IMPL(DataType::STRING);
            break;
        case DataType::VARCHAR:
            GENERATE_OPT_FIELD_IVF_IMPL(DataType::VARCHAR);
            break;
        default:
            LOG_WARN("Unsupported data type in optional scalar field: ", dt);
            return false;
    }
    return true;
}
#undef GENERATE_OPT_FIELD_IVF_IMPL

void
WriteOptFieldsIvfMeta(
    const std::shared_ptr<LocalChunkManager>& local_chunk_manager,
    const std::string& local_data_path,
    const uint32_t num_of_fields,
    uint64_t& write_offset) {
    const uint8_t kVersion = 0;
    local_chunk_manager->Write(local_data_path,
                               write_offset,
                               const_cast<uint8_t*>(&kVersion),
                               sizeof(kVersion));
    write_offset += sizeof(kVersion);
    local_chunk_manager->Write(local_data_path,
                               write_offset,
                               const_cast<uint32_t*>(&num_of_fields),
                               sizeof(num_of_fields));
    write_offset += sizeof(num_of_fields);
}

// write optional scalar fields ivf info in the following format without space among them
// | (meta)
// | version (uint8_t) | num_of_fields (uint32_t) |
// | (field_0)
// | field_id (int64_t) | num_of_unique_field_data (uint32_t)
// | size_0 (uint32_t) | offset_0 (uint32_t)...
// | size_1 | offset_0, offset_1, ...
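// A worked example (hypothetical numbers; the per-value iteration order of the
// unordered_map in WriteOptFieldIvfDataImpl is unspecified): a single INT64
// field 800 whose rows hold [7, 9, 7] is written, in order, as
//   version = 0 (uint8_t), num_of_fields = 1 (uint32_t),
//   field_id = 800 (int64_t), num_of_unique_field_data = 2 (uint32_t),
//   then per unique value (only the row offsets are stored, not the value itself):
//     offsets_cnt = 2 (uint32_t), offsets {0, 2}   // rows holding 7
//     offsets_cnt = 1 (uint32_t), offsets {1}      // rows holding 9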
std::string
DiskFileManagerImpl::CacheOptFieldToDisk(
    std::shared_ptr<milvus_storage::Space> space, OptFieldT& fields_map) {
    uint32_t num_of_fields = fields_map.size();
    if (0 == num_of_fields) {
        return "";
    } else if (num_of_fields > 1) {
        PanicInfo(
            ErrorCode::NotImplemented,
            "vector index build with multiple fields is not supported yet");
    }
    if (nullptr == space) {
        LOG_ERROR("Failed to cache optional field. Space is null");
        return "";
    }

    auto segment_id = GetFieldDataMeta().segment_id;
    auto vec_field_id = GetFieldDataMeta().field_id;
    auto local_chunk_manager =
        LocalChunkManagerSingleton::GetInstance().GetChunkManager();
    auto local_data_path = storage::GenFieldRawDataPathPrefix(
                               local_chunk_manager, segment_id, vec_field_id) +
                           std::string(VEC_OPT_FIELDS);
    local_chunk_manager->CreateFile(local_data_path);

    uint64_t write_offset = 0;
    WriteOptFieldsIvfMeta(
        local_chunk_manager, local_data_path, num_of_fields, write_offset);

    auto res = space->ScanData();
    if (!res.ok()) {
        PanicInfo(IndexBuildError,
                  fmt::format("failed to create scan iterator: {}",
                              res.status().ToString()));
    }
    auto reader = res.value();
    for (auto& [field_id, tup] : fields_map) {
        const auto& field_name = std::get<0>(tup);
        const auto& field_type = std::get<1>(tup);
        std::vector<FieldDataPtr> field_datas;
        for (auto rec : *reader) {
            if (!rec.ok()) {
                PanicInfo(IndexBuildError,
                          fmt::format("failed to read optional field data: {}",
                                      rec.status().ToString()));
            }
            auto data = rec.ValueUnsafe();
            if (data == nullptr) {
                break;
            }
            auto total_num_rows = data->num_rows();
            if (0 == total_num_rows) {
                LOG_WARN("optional field {} has no data", field_name);
                return "";
            }
            auto col_data = data->GetColumnByName(field_name);
            auto field_data =
                storage::CreateFieldData(field_type, 1, total_num_rows);
            field_data->FillFieldData(col_data);
            field_datas.emplace_back(field_data);
        }
        if (!WriteOptFieldIvfData(field_type,
                                  field_id,
                                  local_chunk_manager,
                                  local_data_path,
                                  field_datas,
                                  write_offset)) {
            return "";
        }
    }
    return local_data_path;
}

std::string
DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) {
    uint32_t num_of_fields = fields_map.size();
    if (0 == num_of_fields) {
        return "";
    } else if (num_of_fields > 1) {
        PanicInfo(
            ErrorCode::NotImplemented,
            "vector index build with multiple fields is not supported yet");
    }

    auto segment_id = GetFieldDataMeta().segment_id;
    auto vec_field_id = GetFieldDataMeta().field_id;
    auto local_chunk_manager =
        LocalChunkManagerSingleton::GetInstance().GetChunkManager();
    auto local_data_path = storage::GenFieldRawDataPathPrefix(
                               local_chunk_manager, segment_id, vec_field_id) +
                           std::string(VEC_OPT_FIELDS);
    local_chunk_manager->CreateFile(local_data_path);

    std::vector<FieldDataPtr> field_datas;
    std::vector<std::string> batch_files;
    uint64_t write_offset = 0;
    WriteOptFieldsIvfMeta(
        local_chunk_manager, local_data_path, num_of_fields, write_offset);

    auto FetchRawData = [&]() {
        auto fds = GetObjectData(rcm_.get(), batch_files);
        for (size_t i = 0; i < batch_files.size(); ++i) {
            auto data = fds[i].get()->GetFieldData();
            field_datas.emplace_back(data);
        }
    };

    auto parallel_degree =
        uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
    for (auto& [field_id, tup] : fields_map) {
        const auto& field_type = std::get<1>(tup);
        auto& field_paths = std::get<2>(tup);
        if (0 == field_paths.size()) {
            LOG_WARN("optional field {} has no data", field_id);
            return "";
        }

        std::vector<FieldDataPtr>().swap(field_datas);
        SortByPath(field_paths);

        for (auto& file : field_paths) {
            if (batch_files.size() >= parallel_degree) {
                FetchRawData();
                batch_files.clear();
            }
            batch_files.emplace_back(file);
        }
        if (batch_files.size() > 0) {
            FetchRawData();
        }
        if (!WriteOptFieldIvfData(field_type,
                                  field_id,
                                  local_chunk_manager,
                                  local_data_path,
                                  field_datas,
                                  write_offset)) {
            return "";
        }
    }
    return local_data_path;
}

std::string
DiskFileManagerImpl::GetFileName(const std::string& localfile) {
    boost::filesystem::path localPath(localfile);
@@ -102,6 +102,13 @@ class DiskFileManagerImpl : public FileManagerImpl {
    std::string
    CacheRawDataToDisk(std::shared_ptr<milvus_storage::Space> space);

    std::string
    CacheOptFieldToDisk(OptFieldT& fields_map);

    std::string
    CacheOptFieldToDisk(std::shared_ptr<milvus_storage::Space> space,
                        OptFieldT& fields_map);

    virtual bool
    AddFileUsingSpace(const std::string& local_file_name,
                      const std::vector<int64_t>& local_file_offsets,
@@ -9,15 +9,35 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include <boost/filesystem/operations.hpp>
#include <chrono>
#include <arrow/array/builder_binary.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>
#include <arrow/type_fwd.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <memory>
#include <memory_resource>
#include <string>
#include <fstream>
#include <vector>
#include <unistd.h>

#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Slice.h"
#include "common/Common.h"
#include "common/Types.h"
#include "storage/ChunkManager.h"
#include "storage/DataCodec.h"
#include "storage/InsertData.h"
#include "storage/ThreadPool.h"
#include "storage/Types.h"
#include "storage/options.h"
#include "storage/schema.h"
#include "storage/space.h"
#include "storage/Util.h"
#include "storage/DiskFileManagerImpl.h"
#include "storage/LocalChunkManagerSingleton.h"
@@ -183,3 +203,218 @@ TEST_F(DiskAnnFileManagerTest, TestThreadPoolException) {
        EXPECT_EQ(std::string(e.what()), "run time error");
    }
}

const int64_t kOptFieldId = 123456;
const std::string kOptFieldName = "opt_field_name";
const DataType kOptFiledType = DataType::INT64;
const int64_t kOptFieldDataRange = 10000;
const std::string kOptFieldPath = "/tmp/diskann/opt_field/123123";
// const std::string kOptFieldPath = "/tmp/diskann/index_files/1000/index";
const size_t kEntityCnt = 1000 * 1000;
const DataType kOptFieldDataType = DataType::INT64;
const FieldDataMeta kOptVecFieldDataMeta = {1, 2, 3, 100};
using OffsetT = uint32_t;

auto
CreateFileManager(const ChunkManagerPtr& cm)
    -> std::shared_ptr<DiskFileManagerImpl> {
    // collection_id: 1, partition_id: 2, segment_id: 3
    // field_id: 100, index_build_id: 1000, index_version: 1
    IndexMeta index_meta = {
        3, 100, 1000, 1, "opt_fields", "field_name", DataType::VECTOR_FLOAT, 1};
    int64_t slice_size = milvus::FILE_SLICE_SIZE;
    return std::make_shared<DiskFileManagerImpl>(
        storage::FileManagerContext(kOptVecFieldDataMeta, index_meta, cm));
}

auto
PrepareRawFieldData() -> std::vector<int64_t> {
    std::vector<int64_t> data(kEntityCnt);
    int64_t field_val = 0;
    for (size_t i = 0; i < kEntityCnt; ++i) {
        data[i] = field_val++;
        if (field_val >= kOptFieldDataRange) {
            field_val = 0;
        }
    }
    return data;
}

auto
PrepareInsertData() -> std::string {
    size_t sz = sizeof(int64_t) * kEntityCnt;
    std::vector<int64_t> data = PrepareRawFieldData();
    auto field_data =
        storage::CreateFieldData(kOptFieldDataType, 1, kEntityCnt);
    field_data->FillFieldData(data.data(), kEntityCnt);
    storage::InsertData insert_data(field_data);
    insert_data.SetFieldDataMeta(kOptVecFieldDataMeta);
    insert_data.SetTimestamps(0, 100);
    auto serialized_data = insert_data.Serialize(storage::StorageType::Remote);

    auto chunk_manager =
        storage::CreateChunkManager(get_default_local_storage_config());

    std::string path = kOptFieldPath + "0";
    boost::filesystem::remove_all(path);
    chunk_manager->Write(path, serialized_data.data(), serialized_data.size());
    return path;
}

auto
PrepareInsertDataSpace()
    -> std::pair<std::string, std::shared_ptr<milvus_storage::Space>> {
    std::string path = kOptFieldPath + "1";
    arrow::FieldVector arrow_fields{
        arrow::field("pk", arrow::int64()),
        arrow::field("ts", arrow::int64()),
        arrow::field(kOptFieldName, arrow::int64()),
        arrow::field("vec", arrow::fixed_size_binary(1))};
    auto arrow_schema = std::make_shared<arrow::Schema>(arrow_fields);
    auto schema_options = std::make_shared<milvus_storage::SchemaOptions>();
    schema_options->primary_column = "pk";
    schema_options->version_column = "ts";
    schema_options->vector_column = "vec";
    auto schema =
        std::make_shared<milvus_storage::Schema>(arrow_schema, schema_options);
    boost::filesystem::remove_all(path);
    boost::filesystem::create_directories(path);
    EXPECT_TRUE(schema->Validate().ok());
    auto opt_space = milvus_storage::Space::Open(
        "file://" + boost::filesystem::canonical(path).string(),
        milvus_storage::Options{schema});
    EXPECT_TRUE(opt_space.has_value());
    auto space = std::move(opt_space.value());
    const auto data = PrepareRawFieldData();
    arrow::Int64Builder pk_builder;
    arrow::Int64Builder ts_builder;
    arrow::NumericBuilder<arrow::Int64Type> scalar_builder;
    arrow::FixedSizeBinaryBuilder vec_builder(arrow::fixed_size_binary(1));
    const uint8_t kByteZero = 0;
    for (size_t i = 0; i < kEntityCnt; ++i) {
        EXPECT_TRUE(pk_builder.Append(i).ok());
        EXPECT_TRUE(ts_builder.Append(i).ok());
        EXPECT_TRUE(vec_builder.Append(&kByteZero).ok());
    }
    for (size_t i = 0; i < kEntityCnt; ++i) {
        EXPECT_TRUE(scalar_builder.Append(data[i]).ok());
    }
    std::shared_ptr<arrow::Array> pk_array;
    EXPECT_TRUE(pk_builder.Finish(&pk_array).ok());
    std::shared_ptr<arrow::Array> ts_array;
    EXPECT_TRUE(ts_builder.Finish(&ts_array).ok());
    std::shared_ptr<arrow::Array> scalar_array;
    EXPECT_TRUE(scalar_builder.Finish(&scalar_array).ok());
    std::shared_ptr<arrow::Array> vec_array;
    EXPECT_TRUE(vec_builder.Finish(&vec_array).ok());
    auto batch =
        arrow::RecordBatch::Make(arrow_schema,
                                 kEntityCnt,
                                 {pk_array, ts_array, scalar_array, vec_array});
    auto write_opt = milvus_storage::WriteOption{kEntityCnt};
    space->Write(arrow::RecordBatchReader::Make({batch}, arrow_schema)
                     .ValueOrDie()
                     .get(),
                 &write_opt);
    return {path, std::move(space)};
}

auto
PrepareOptionalField(const std::shared_ptr<DiskFileManagerImpl>& file_manager,
                     const std::string& insert_file_path) -> OptFieldT {
    OptFieldT opt_field;
    std::vector<std::string> insert_files;
    insert_files.emplace_back(insert_file_path);
    opt_field[kOptFieldId] = {kOptFieldName, kOptFiledType, insert_files};
    return opt_field;
}

void
CheckOptFieldCorrectness(const std::string& local_file_path) {
    std::ifstream ifs(local_file_path);
    if (!ifs.is_open()) {
        FAIL() << "open file failed: " << local_file_path << std::endl;
        return;
    }
    uint8_t meta_version;
    uint32_t meta_num_of_fields, num_of_unique_field_data;
    int64_t field_id;
    ifs.read(reinterpret_cast<char*>(&meta_version), sizeof(meta_version));
    EXPECT_EQ(meta_version, 0);
    ifs.read(reinterpret_cast<char*>(&meta_num_of_fields),
             sizeof(meta_num_of_fields));
    EXPECT_EQ(meta_num_of_fields, 1);
    ifs.read(reinterpret_cast<char*>(&field_id), sizeof(field_id));
    EXPECT_EQ(field_id, kOptFieldId);
    ifs.read(reinterpret_cast<char*>(&num_of_unique_field_data),
             sizeof(num_of_unique_field_data));
    EXPECT_EQ(num_of_unique_field_data, kOptFieldDataRange);

    uint32_t expected_single_category_offset_cnt =
        kEntityCnt / kOptFieldDataRange;
    uint32_t read_single_category_offset_cnt;
    std::vector<OffsetT> single_category_offsets(
        expected_single_category_offset_cnt);
    for (uint32_t i = 0; i < num_of_unique_field_data; ++i) {
        ifs.read(reinterpret_cast<char*>(&read_single_category_offset_cnt),
                 sizeof(read_single_category_offset_cnt));
        ASSERT_EQ(read_single_category_offset_cnt,
                  expected_single_category_offset_cnt);
        ifs.read(reinterpret_cast<char*>(single_category_offsets.data()),
                 read_single_category_offset_cnt * sizeof(OffsetT));

        OffsetT first_offset = 0;
        if (read_single_category_offset_cnt > 0) {
            first_offset = single_category_offsets[0];
        }
        for (size_t j = 1; j < read_single_category_offset_cnt; ++j) {
            ASSERT_EQ(single_category_offsets[j] % kOptFieldDataRange,
                      first_offset % kOptFieldDataRange);
        }
    }
}

TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskFieldEmpty) {
    auto file_manager = CreateFileManager(cm_);
    const auto& [insert_file_space_path, space] = PrepareInsertDataSpace();
    OptFieldT opt_fields;
    EXPECT_TRUE(file_manager->CacheOptFieldToDisk(opt_fields).empty());
    EXPECT_TRUE(file_manager->CacheOptFieldToDisk(space, opt_fields).empty());
}

TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskSpaceEmpty) {
    auto file_manager = CreateFileManager(cm_);
    auto opt_fileds = PrepareOptionalField(file_manager, "");
    auto res = file_manager->CacheOptFieldToDisk(nullptr, opt_fileds);
    EXPECT_TRUE(res.empty());
}

TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskOptFieldMoreThanOne) {
    auto file_manager = CreateFileManager(cm_);
    const auto insert_file_path = PrepareInsertData();
    const auto& [insert_file_space_path, space] = PrepareInsertDataSpace();

    OptFieldT opt_fields = PrepareOptionalField(file_manager, insert_file_path);
    opt_fields[kOptFieldId + 1] = {
        kOptFieldName + "second", kOptFiledType, {insert_file_space_path}};
    EXPECT_THROW(file_manager->CacheOptFieldToDisk(opt_fields), SegcoreError);
    EXPECT_THROW(file_manager->CacheOptFieldToDisk(space, opt_fields), SegcoreError);
}

TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskCorrect) {
    auto file_manager = CreateFileManager(cm_);
    const auto insert_file_path = PrepareInsertData();
    auto opt_fileds = PrepareOptionalField(file_manager, insert_file_path);
    auto res = file_manager->CacheOptFieldToDisk(opt_fileds);
    ASSERT_FALSE(res.empty());
    CheckOptFieldCorrectness(res);
}

TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskSpaceCorrect) {
    auto file_manager = CreateFileManager(cm_);
    const auto& [insert_file_path, space] = PrepareInsertDataSpace();
    auto opt_fileds = PrepareOptionalField(file_manager, insert_file_path);
    auto res = file_manager->CacheOptFieldToDisk(space, opt_fileds);
    ASSERT_FALSE(res.empty());
    CheckOptFieldCorrectness(res);
}
@@ -27,8 +27,5 @@ const (
)

const (
    flatIndex    = "FLAT"
    binFlatIndex = "BIN_FLAT"
    diskAnnIndex = "DISKANN"
    invalidIndex = "invalid"
)
@@ -30,10 +30,11 @@ import (
    "github.com/milvus-io/milvus/internal/querycoordv2/params"
    "github.com/milvus-io/milvus/internal/storage"
    "github.com/milvus-io/milvus/internal/types"
    "github.com/milvus-io/milvus/internal/util/typeutil"
    itypeutil "github.com/milvus-io/milvus/internal/util/typeutil"
    "github.com/milvus-io/milvus/pkg/common"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/merr"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

type indexTaskState int32
@@ -203,6 +204,19 @@ func (ib *indexBuilder) run() {
    }
}

func getBinLogIds(segment *SegmentInfo, fieldID int64) []int64 {
    binlogIDs := make([]int64, 0)
    for _, fieldBinLog := range segment.GetBinlogs() {
        if fieldBinLog.GetFieldID() == fieldID {
            for _, binLog := range fieldBinLog.GetBinlogs() {
                binlogIDs = append(binlogIDs, binLog.GetLogID())
            }
            break
        }
    }
    return binlogIDs
}

func (ib *indexBuilder) process(buildID UniqueID) bool {
    ib.taskMutex.RLock()
    state := ib.tasks[buildID]
@@ -240,7 +254,8 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
        return true
    }
    indexParams := ib.meta.GetIndexParams(meta.CollectionID, meta.IndexID)
    if isFlatIndex(getIndexType(indexParams)) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
    indexType := getIndexType(indexParams)
    if isFlatIndex(indexType) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
        log.Ctx(ib.ctx).Info("segment does not need index really", zap.Int64("buildID", buildID),
            zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
        if err := ib.meta.FinishTask(&indexpb.IndexTaskInfo{
@@ -269,14 +284,23 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
        return false
    }

    binlogIDs := make([]int64, 0)
    fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID)
    for _, fieldBinLog := range segment.GetBinlogs() {
        if fieldBinLog.GetFieldID() == fieldID {
            for _, binLog := range fieldBinLog.GetBinlogs() {
                binlogIDs = append(binlogIDs, binLog.GetLogID())
    // vector index build needs information of optional scalar fields data
    optionalFields := make([]*indexpb.OptionalFieldInfo, 0)
    if Params.CommonCfg.EnableNodeFilteringOnPartitionKey.GetAsBool() && isOptionalScalarFieldSupported(indexType) {
        colSchema := ib.meta.GetCollection(meta.CollectionID).Schema
        hasPartitionKey := typeutil.HasPartitionKey(colSchema)
        if hasPartitionKey {
            partitionKeyField, err := typeutil.GetPartitionKeyFieldSchema(colSchema)
            if partitionKeyField == nil || err != nil {
                log.Ctx(ib.ctx).Warn("index builder get partition key field failed", zap.Int64("build", buildID), zap.Error(err))
            } else {
                optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{
                    FieldID:   partitionKeyField.FieldID,
                    FieldName: partitionKeyField.Name,
                    FieldType: int32(partitionKeyField.DataType),
                    DataIds:   getBinLogIds(segment, partitionKeyField.FieldID),
                })
            }
            break
        }
    }

@@ -306,6 +330,8 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
        }
    }

    fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID)
    binlogIDs := getBinLogIds(segment, fieldID)
    var req *indexpb.CreateJobRequest
    if Params.CommonCfg.EnableStorageV2.GetAsBool() {
        collectionInfo, err := ib.handler.GetCollection(ib.ctx, segment.GetCollectionID())
@@ -329,55 +355,57 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
            return false
        }

        storePath, err := typeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID())
        storePath, err := itypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue(), segment.GetID())
        if err != nil {
            log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err))
            return false
        }
        indexStorePath, err := typeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue()+"/index", segment.GetID())
        indexStorePath, err := itypeutil.GetStorageURI(params.Params.CommonCfg.StorageScheme.GetValue(), params.Params.CommonCfg.StoragePathPrefix.GetValue()+"/index", segment.GetID())
        if err != nil {
            log.Ctx(ib.ctx).Warn("failed to get storage uri", zap.Error(err))
            return false
        }

        req = &indexpb.CreateJobRequest{
            ClusterID:           Params.CommonCfg.ClusterPrefix.GetValue(),
            IndexFilePrefix:     path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
            BuildID:             buildID,
            IndexVersion:        meta.IndexVersion + 1,
            StorageConfig:       storageConfig,
            IndexParams:         indexParams,
            TypeParams:          typeParams,
            NumRows:             meta.NumRows,
            CollectionID:        segment.GetCollectionID(),
            PartitionID:         segment.GetPartitionID(),
            SegmentID:           segment.GetID(),
            FieldID:             fieldID,
            FieldName:           field.Name,
            FieldType:           field.DataType,
            StorePath:           storePath,
            StoreVersion:        segment.GetStorageVersion(),
            IndexStorePath:      indexStorePath,
            Dim:                 int64(dim),
            CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
            DataIds:             binlogIDs,
            ClusterID:            Params.CommonCfg.ClusterPrefix.GetValue(),
            IndexFilePrefix:      path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
            BuildID:              buildID,
            IndexVersion:         meta.IndexVersion + 1,
            StorageConfig:        storageConfig,
            IndexParams:          indexParams,
            TypeParams:           typeParams,
            NumRows:              meta.NumRows,
            CollectionID:         segment.GetCollectionID(),
            PartitionID:          segment.GetPartitionID(),
            SegmentID:            segment.GetID(),
            FieldID:              fieldID,
            FieldName:            field.Name,
            FieldType:            field.DataType,
            StorePath:            storePath,
            StoreVersion:         segment.GetStorageVersion(),
            IndexStorePath:       indexStorePath,
            Dim:                  int64(dim),
            CurrentIndexVersion:  ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
            DataIds:              binlogIDs,
            OptionalScalarFields: optionalFields,
        }
    } else {
        req = &indexpb.CreateJobRequest{
            ClusterID:           Params.CommonCfg.ClusterPrefix.GetValue(),
            IndexFilePrefix:     path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
            BuildID:             buildID,
            IndexVersion:        meta.IndexVersion + 1,
            StorageConfig:       storageConfig,
            IndexParams:         indexParams,
            TypeParams:          typeParams,
            NumRows:             meta.NumRows,
            CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
            DataIds:             binlogIDs,
            CollectionID:        segment.GetCollectionID(),
            PartitionID:         segment.GetPartitionID(),
            SegmentID:           segment.GetID(),
            FieldID:             fieldID,
            ClusterID:            Params.CommonCfg.ClusterPrefix.GetValue(),
            IndexFilePrefix:      path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
            BuildID:              buildID,
            IndexVersion:         meta.IndexVersion + 1,
            StorageConfig:        storageConfig,
            IndexParams:          indexParams,
            TypeParams:           typeParams,
            NumRows:              meta.NumRows,
            CurrentIndexVersion:  ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
            DataIds:              binlogIDs,
            CollectionID:         segment.GetCollectionID(),
            PartitionID:          segment.GetPartitionID(),
            SegmentID:            segment.GetID(),
            FieldID:              fieldID,
            OptionalScalarFields: optionalFields,
        }
    }
@@ -37,6 +37,7 @@ import (
    "github.com/milvus-io/milvus/internal/types"
    mclient "github.com/milvus-io/milvus/internal/util/mock"
    "github.com/milvus-io/milvus/pkg/common"
    "github.com/milvus-io/milvus/pkg/util/indexparamcheck"
    "github.com/milvus-io/milvus/pkg/util/merr"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)
@@ -1196,3 +1197,292 @@ func TestIndexBuilderV2(t *testing.T) {
    }
    ib.Stop()
}

func TestVecIndexWithOptionalScalarField(t *testing.T) {
    var (
        collID         = UniqueID(100)
        partID         = UniqueID(200)
        indexID        = UniqueID(300)
        segID          = UniqueID(500)
        buildID        = UniqueID(600)
        nodeID         = UniqueID(700)
        partitionKeyID = UniqueID(800)
    )

    paramtable.Init()
    ctx := context.Background()
    minNumberOfRowsToBuild := paramtable.Get().DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() + 1

    catalog := catalogmocks.NewDataCoordCatalog(t)
    catalog.On("CreateSegmentIndex",
        mock.Anything,
        mock.Anything,
    ).Return(nil)
    catalog.On("AlterSegmentIndexes",
        mock.Anything,
        mock.Anything,
    ).Return(nil)

    ic := mocks.NewMockIndexNodeClient(t)
    ic.EXPECT().GetJobStats(mock.Anything, mock.Anything, mock.Anything).
        Return(&indexpb.GetJobStatsResponse{
            Status:           merr.Success(),
            TotalJobNum:      0,
            EnqueueJobNum:    0,
            InProgressJobNum: 0,
            TaskSlots:        1,
            JobInfos:         []*indexpb.JobInfo{},
        }, nil)
    ic.EXPECT().QueryJobs(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
        func(ctx context.Context, in *indexpb.QueryJobsRequest, option ...grpc.CallOption) (*indexpb.QueryJobsResponse, error) {
            indexInfos := make([]*indexpb.IndexTaskInfo, 0)
            for _, buildID := range in.BuildIDs {
                indexInfos = append(indexInfos, &indexpb.IndexTaskInfo{
                    BuildID:       buildID,
                    State:         commonpb.IndexState_Finished,
                    IndexFileKeys: []string{"file1", "file2"},
                })
            }
            return &indexpb.QueryJobsResponse{
                Status:     merr.Success(),
                ClusterID:  in.ClusterID,
                IndexInfos: indexInfos,
            }, nil
        })

    ic.EXPECT().DropJobs(mock.Anything, mock.Anything, mock.Anything).
        Return(merr.Success(), nil)

    mt := meta{
        catalog: catalog,
        collections: map[int64]*collectionInfo{
            collID: {
                ID: collID,
                Schema: &schemapb.CollectionSchema{
                    Fields: []*schemapb.FieldSchema{
                        {
                            FieldID:  fieldID,
                            Name:     "vec",
                            DataType: schemapb.DataType_FloatVector,
                        },
                        {
                            FieldID:        partitionKeyID,
                            Name:           "scalar",
                            DataType:       schemapb.DataType_Int64,
                            IsPartitionKey: true,
                        },
                    },
                },
                CreatedAt: 0,
            },
        },
        indexes: map[UniqueID]map[UniqueID]*model.Index{
            collID: {
                indexID: {
                    TenantID:     "",
                    CollectionID: collID,
                    FieldID:      fieldID,
                    IndexID:      indexID,
                    IndexName:    indexName,
                    IsDeleted:    false,
                    CreateTime:   1,
                    TypeParams: []*commonpb.KeyValuePair{
                        {
                            Key:   common.DimKey,
                            Value: "128",
                        },
                    },
                    IndexParams: []*commonpb.KeyValuePair{
                        {
                            Key:   common.MetricTypeKey,
                            Value: "L2",
                        },
                        {
                            Key:   common.IndexTypeKey,
                            Value: indexparamcheck.IndexHNSW,
                        },
                    },
                },
            },
        },
        segments: &SegmentsInfo{
            segments: map[UniqueID]*SegmentInfo{
                segID: {
                    SegmentInfo: &datapb.SegmentInfo{
                        ID:             segID,
                        CollectionID:   collID,
                        PartitionID:    partID,
                        InsertChannel:  "",
                        NumOfRows:      minNumberOfRowsToBuild,
                        State:          commonpb.SegmentState_Flushed,
                        MaxRowNum:      65536,
                        LastExpireTime: 10,
                    },
                    segmentIndexes: map[UniqueID]*model.SegmentIndex{
                        indexID: {
                            SegmentID:     segID,
                            CollectionID:  collID,
                            PartitionID:   partID,
                            NumRows:       minNumberOfRowsToBuild,
                            IndexID:       indexID,
                            BuildID:       buildID,
                            NodeID:        0,
                            IndexVersion:  0,
                            IndexState:    commonpb.IndexState_Unissued,
                            FailReason:    "",
                            IsDeleted:     false,
                            CreateTime:    0,
                            IndexFileKeys: nil,
                            IndexSize:     0,
                        },
                    },
                },
            },
        },
        buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
            buildID: {
                SegmentID:     segID,
                CollectionID:  collID,
                PartitionID:   partID,
                NumRows:       minNumberOfRowsToBuild,
                IndexID:       indexID,
                BuildID:       buildID,
                NodeID:        0,
                IndexVersion:  0,
                IndexState:    commonpb.IndexState_Unissued,
                FailReason:    "",
                IsDeleted:     false,
                CreateTime:    0,
                IndexFileKeys: nil,
                IndexSize:     0,
            },
        },
    }

    nodeManager := &IndexNodeManager{
        ctx: ctx,
        nodeClients: map[UniqueID]types.IndexNodeClient{
            1: ic,
        },
    }

    cm := &mocks.ChunkManager{}
    cm.EXPECT().RootPath().Return("root")

    waitTaskDoneFunc := func(builder *indexBuilder) {
        for {
            builder.taskMutex.RLock()
            if len(builder.tasks) == 0 {
                builder.taskMutex.RUnlock()
                break
            }
            builder.taskMutex.RUnlock()
        }

        assert.Zero(t, len(builder.tasks))
    }

    resetMetaFunc := func() {
        mt.buildID2SegmentIndex[buildID].IndexState = commonpb.IndexState_Unissued
        mt.segments.segments[segID].segmentIndexes[indexID].IndexState = commonpb.IndexState_Unissued
        mt.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexHNSW
        mt.collections[collID].Schema.Fields[1].IsPartitionKey = true
    }

    paramtable.Get().CommonCfg.EnableNodeFilteringOnPartitionKey.SwapTempValue("true")
    defer paramtable.Get().CommonCfg.EnableNodeFilteringOnPartitionKey.SwapTempValue("false")
    ib := newIndexBuilder(ctx, &mt, nodeManager, cm, newIndexEngineVersionManager(), nil)

    t.Run("success to get opt field on startup", func(t *testing.T) {
        ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
            func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
                assert.NotZero(t, len(in.OptionalScalarFields), "optional scalar field should be set")
                return merr.Success(), nil
            }).Once()
        assert.Equal(t, 1, len(ib.tasks))
        assert.Equal(t, indexTaskInit, ib.tasks[buildID])

        ib.scheduleDuration = time.Millisecond * 500
        ib.Start()
        waitTaskDoneFunc(ib)
        resetMetaFunc()
    })

    segIdx := &model.SegmentIndex{
        SegmentID:     segID,
        CollectionID:  collID,
        PartitionID:   partID,
        NumRows:       minNumberOfRowsToBuild,
        IndexID:       indexID,
        BuildID:       buildID,
        NodeID:        0,
        IndexVersion:  0,
        IndexState:    0,
        FailReason:    "",
        IsDeleted:     false,
        CreateTime:    0,
        IndexFileKeys: nil,
        IndexSize:     0,
    }

    t.Run("enqueue", func(t *testing.T) {
        ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
            func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
                assert.NotZero(t, len(in.OptionalScalarFields), "optional scalar field should be set")
                return merr.Success(), nil
            }).Once()
        err := ib.meta.AddSegmentIndex(segIdx)
        assert.NoError(t, err)
        ib.enqueue(buildID)
        waitTaskDoneFunc(ib)
        resetMetaFunc()
    })

    // should still be able to build vec index when opt field is not set
    t.Run("enqueue returns empty optional field when cfg disable", func(t *testing.T) {
        paramtable.Get().CommonCfg.EnableNodeFilteringOnPartitionKey.SwapTempValue("false")
        ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
            func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
                assert.Zero(t, len(in.OptionalScalarFields), "optional scalar field should be set")
                return merr.Success(), nil
            }).Once()
        err := ib.meta.AddSegmentIndex(segIdx)
        assert.NoError(t, err)
        ib.enqueue(buildID)
        waitTaskDoneFunc(ib)
        resetMetaFunc()
    })

    t.Run("enqueue returns empty optional field when index is not HNSW", func(t *testing.T) {
        paramtable.Get().CommonCfg.EnableNodeFilteringOnPartitionKey.SwapTempValue("true")
        mt.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexDISKANN
        ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
            func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
                assert.Zero(t, len(in.OptionalScalarFields), "optional scalar field should be set")
                return merr.Success(), nil
            }).Once()
        err := ib.meta.AddSegmentIndex(segIdx)
        assert.NoError(t, err)
        ib.enqueue(buildID)
        waitTaskDoneFunc(ib)
        resetMetaFunc()
    })

    t.Run("enqueue returns empty optional field when no partition key", func(t *testing.T) {
        paramtable.Get().CommonCfg.EnableNodeFilteringOnPartitionKey.SwapTempValue("true")
        mt.collections[collID].Schema.Fields[1].IsPartitionKey = false
        ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
            func(ctx context.Context, in *indexpb.CreateJobRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
                assert.Zero(t, len(in.OptionalScalarFields), "optional scalar field should be set")
                return merr.Success(), nil
            }).Once()
        err := ib.meta.AddSegmentIndex(segIdx)
        assert.NoError(t, err)
        ib.enqueue(buildID)
        waitTaskDoneFunc(ib)
        resetMetaFunc()
    })

    ib.nodeDown(nodeID)
    ib.Stop()
}
@@ -197,10 +197,10 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
        metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
        return merr.Status(err), nil
    }
    if getIndexType(req.GetIndexParams()) == diskAnnIndex && !s.indexNodeManager.ClientSupportDisk() {
    if getIndexType(req.GetIndexParams()) == indexparamcheck.IndexDISKANN && !s.indexNodeManager.ClientSupportDisk() {
        errMsg := "all IndexNodes do not support disk indexes, please verify"
        log.Warn(errMsg)
        err = merr.WrapErrIndexNotSupported(diskAnnIndex)
        err = merr.WrapErrIndexNotSupported(indexparamcheck.IndexDISKANN)
        metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
        return merr.Status(err), nil
    }
@@ -29,6 +29,7 @@ import (
    "github.com/milvus-io/milvus/pkg/common"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/metrics"
    "github.com/milvus-io/milvus/pkg/util/indexparamcheck"
    "github.com/milvus-io/milvus/pkg/util/merr"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -189,7 +190,11 @@ func getIndexType(indexParams []*commonpb.KeyValuePair) string {
}

func isFlatIndex(indexType string) bool {
    return indexType == flatIndex || indexType == binFlatIndex
    return indexType == indexparamcheck.IndexFaissIDMap || indexType == indexparamcheck.IndexFaissBinIDMap
}

func isOptionalScalarFieldSupported(indexType string) bool {
    return indexType == indexparamcheck.IndexHNSW
}

func parseBuildIDFromFilePath(key string) (UniqueID, error) {
@@ -185,6 +185,13 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error {
        return err
    }

    for _, optField := range it.req.GetOptionalScalarFields() {
        if err := buildIndexInfo.AppendOptionalField(optField); err != nil {
            log.Ctx(ctx).Warn("append optional field failed", zap.Error(err))
            return err
        }
    }

    it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexInfo)
    if err != nil {
        if it.index != nil && it.index.CleanLocalData() != nil {
@@ -325,6 +332,18 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
            it.req.DataPaths = append(it.req.DataPaths, path)
        }
    }

    if it.req.OptionalScalarFields != nil {
        for _, optFields := range it.req.GetOptionalScalarFields() {
            if len(optFields.DataPaths) == 0 {
                for _, id := range optFields.DataIds {
                    path := metautil.BuildInsertLogPath(it.req.GetStorageConfig().RootPath, it.req.GetCollectionID(), it.req.GetPartitionID(), it.req.GetSegmentID(), optFields.FieldID, id)
                    optFields.DataPaths = append(optFields.DataPaths, path)
                }
            }
        }
    }

    // type params can be removed
    for _, kvPair := range it.req.GetTypeParams() {
        key, value := kvPair.GetKey(), kvPair.GetValue()
@@ -518,6 +537,13 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
        return err
    }

    for _, optField := range it.req.GetOptionalScalarFields() {
        if err := buildIndexInfo.AppendOptionalField(optField); err != nil {
            log.Ctx(ctx).Warn("append optional field failed", zap.Error(err))
            return err
        }
    }

    it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexInfo)
    if err != nil {
        if it.index != nil && it.index.CleanLocalData() != nil {

@@ -293,6 +293,9 @@ func (suite *IndexBuildTaskV2Suite) TestBuildIndex() {
        StoreVersion:   suite.space.GetCurrentVersion(),
        IndexStorePath: "file://" + suite.space.Path(),
        Dim:            4,
        OptionalScalarFields: []*indexpb.OptionalFieldInfo{
            {FieldID: 1, FieldName: "pk", FieldType: 5, DataIds: []int64{0}},
        },
    }

    task := &indexBuildTaskV2{
@@ -242,6 +242,14 @@ message StorageConfig {
    int64 request_timeout_ms = 13;
}

message OptionalFieldInfo {
    int64 fieldID = 1;
    string field_name = 2;
    int32 field_type = 3;
    repeated string data_paths = 4;
    repeated int64 data_ids = 5;
}

message CreateJobRequest {
    string clusterID = 1;
    string index_file_prefix = 2;
@@ -266,6 +274,7 @@ message CreateJobRequest {
    string index_store_path = 21;
    int64 dim = 22;
    repeated int64 data_ids = 23;
    repeated OptionalFieldInfo optional_scalar_fields = 24;
}

message QueryJobsRequest {
@@ -181,3 +181,18 @@ func (bi *BuildIndexInfo) AppendIndexEngineVersion(indexEngineVersion int32) err
    status := C.AppendIndexEngineVersionToBuildInfo(bi.cBuildIndexInfo, cIndexEngineVersion)
    return HandleCStatus(&status, "AppendIndexEngineVersion failed")
}

func (bi *BuildIndexInfo) AppendOptionalField(optField *indexpb.OptionalFieldInfo) error {
    cFieldId := C.int64_t(optField.GetFieldID())
    cFieldType := C.int32_t(optField.GetFieldType())
    cFieldName := C.CString(optField.GetFieldName())
    for _, dataPath := range optField.GetDataPaths() {
        cDataPath := C.CString(dataPath)
        defer C.free(unsafe.Pointer(cDataPath))
        status := C.AppendOptionalFieldDataPath(bi.cBuildIndexInfo, cFieldId, cFieldName, cFieldType, cDataPath)
        if err := HandleCStatus(&status, "AppendOptionalFieldDataPath failed"); err != nil {
            return err
        }
    }
    return nil
}
@@ -190,6 +190,7 @@ type commonConfig struct {
    HighPriorityThreadCoreCoefficient   ParamItem `refreshable:"false"`
    MiddlePriorityThreadCoreCoefficient ParamItem `refreshable:"false"`
    LowPriorityThreadCoreCoefficient    ParamItem `refreshable:"false"`
    EnableNodeFilteringOnPartitionKey   ParamItem `refreshable:"false"`
    MaxDegree                           ParamItem `refreshable:"true"`
    SearchListSize                      ParamItem `refreshable:"true"`
    PQCodeBudgetGBRatio                 ParamItem `refreshable:"true"`
@@ -422,6 +423,13 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
    }
    p.IndexSliceSize.Init(base.mgr)

    p.EnableNodeFilteringOnPartitionKey = ParamItem{
        Key:          "common.nodeFiltering.enableOnPartitionKey",
        Version:      "2.5.0",
        DefaultValue: "false",
    }
    p.EnableNodeFilteringOnPartitionKey.Init(base.mgr)

    p.MaxDegree = ParamItem{
        Key:     "common.DiskIndex.MaxDegree",
        Version: "2.0.0",