// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include #include "arrow/array/builder_binary.h" #include "arrow/array/builder_nested.h" #include "arrow/array/builder_primitive.h" #include "arrow/scalar.h" #include "arrow/type_fwd.h" #include "common/type_c.h" #include "fmt/format.h" #include "index/Utils.h" #include "log/Log.h" #include "common/Consts.h" #include "common/EasyAssert.h" #include "common/FieldData.h" #include "common/FieldDataInterface.h" #include "pb/common.pb.h" #include "storage/StorageV2FSCache.h" #ifdef AZURE_BUILD_DIR #include "storage/azure/AzureChunkManager.h" #endif #ifdef ENABLE_GCP_NATIVE #include "storage/gcp-native-storage/GcpNativeChunkManager.h" #endif #include "storage/ChunkManager.h" #include "storage/DiskFileManagerImpl.h" #include "storage/InsertData.h" #include "storage/LocalChunkManager.h" #include "storage/MemFileManagerImpl.h" #include "storage/MinioChunkManager.h" #ifdef USE_OPENDAL #include "storage/opendal/OpenDALChunkManager.h" #endif #include "storage/Types.h" #include "storage/Util.h" #include "common/Common.h" #include "common/Types.h" #include "common/VectorArray.h" #include "storage/ThreadPools.h" #include "storage/MemFileManagerImpl.h" #include "storage/DiskFileManagerImpl.h" #include "storage/KeyRetriever.h" #include "segcore/memory_planner.h" #include "mmap/Types.h" #include "milvus-storage/format/parquet/file_reader.h" #include "milvus-storage/filesystem/fs.h" namespace milvus::storage { constexpr const char* TEMP = "tmp"; std::map ChunkManagerType_Map = { {"local", ChunkManagerType::Local}, {"minio", ChunkManagerType::Minio}, {"remote", ChunkManagerType::Remote}, {"opendal", ChunkManagerType::OpenDAL}}; enum class CloudProviderType : int8_t { UNKNOWN = 0, AWS = 1, GCP = 2, ALIYUN = 3, AZURE = 4, TENCENTCLOUD = 5, GCPNATIVE = 6, HUAWEICLOUD = 7, }; std::map CloudProviderType_Map = { {"aws", CloudProviderType::AWS}, {"gcp", CloudProviderType::GCP}, {"aliyun", CloudProviderType::ALIYUN}, {"azure", CloudProviderType::AZURE}, {"tencent", CloudProviderType::TENCENTCLOUD}, {"gcpnative", CloudProviderType::GCPNATIVE}, {"huawei", CloudProviderType::HUAWEICLOUD}}; std::map ReadAheadPolicy_Map = { {"normal", MADV_NORMAL}, {"random", MADV_RANDOM}, {"sequential", MADV_SEQUENTIAL}, {"willneed", MADV_WILLNEED}, {"dontneed", MADV_DONTNEED}}; // in arrow, null_bitmap read from the least significant bit std::vector genValidIter(const uint8_t* valid_data, int length) { std::vector valid_data_; valid_data_.reserve(length); for (size_t i = 0; i < length; ++i) { auto bit = (valid_data[i >> 3] >> (i & 0x07)) & 1; valid_data_.push_back(bit); } return valid_data_; } void ReadMediumType(BinlogReaderPtr reader) { AssertInfo(reader->Tell() == 0, "medium type must be parsed from stream header"); int32_t magic_num; auto ret = reader->Read(sizeof(magic_num), &magic_num); AssertInfo(ret.ok(), "read binlog failed: {}", ret.what()); AssertInfo(magic_num == MAGIC_NUM, "invalid magic num: {}", magic_num); } void add_vector_payload(std::shared_ptr builder, uint8_t* values, int length) { AssertInfo(builder != nullptr, "empty arrow builder"); auto binary_builder = std::dynamic_pointer_cast(builder); auto ast = binary_builder->AppendValues(values, length); AssertInfo( ast.ok(), "append value to arrow builder failed: {}", ast.ToString()); } // append values for numeric data template void add_numeric_payload(std::shared_ptr builder, DT* start, const uint8_t* valid_data, bool nullable, int length) { AssertInfo(builder != nullptr, "empty arrow builder"); auto numeric_builder = std::dynamic_pointer_cast(builder); arrow::Status ast; if (nullable) { // need iter to read valid_data when write auto iter = genValidIter(valid_data, length); ast = numeric_builder->AppendValues(start, start + length, iter.begin()); AssertInfo(ast.ok(), "append value to arrow builder failed"); } else { ast = numeric_builder->AppendValues(start, start + length); AssertInfo(ast.ok(), "append value to arrow builder failed"); } } void AddPayloadToArrowBuilder(std::shared_ptr builder, const Payload& payload) { AssertInfo(builder != nullptr, "empty arrow builder"); auto raw_data = const_cast(payload.raw_data); auto length = payload.rows; auto data_type = payload.data_type; auto nullable = payload.nullable; switch (data_type) { case DataType::BOOL: { auto bool_data = reinterpret_cast(raw_data); add_numeric_payload( builder, bool_data, payload.valid_data, nullable, length); break; } case DataType::INT8: { auto int8_data = reinterpret_cast(raw_data); add_numeric_payload( builder, int8_data, payload.valid_data, nullable, length); break; } case DataType::INT16: { auto int16_data = reinterpret_cast(raw_data); add_numeric_payload( builder, int16_data, payload.valid_data, nullable, length); break; } case DataType::INT32: { auto int32_data = reinterpret_cast(raw_data); add_numeric_payload( builder, int32_data, payload.valid_data, nullable, length); break; } case DataType::INT64: { auto int64_data = reinterpret_cast(raw_data); add_numeric_payload( builder, int64_data, payload.valid_data, nullable, length); break; } case DataType::FLOAT: { auto float_data = reinterpret_cast(raw_data); add_numeric_payload( builder, float_data, payload.valid_data, nullable, length); break; } case DataType::DOUBLE: { auto double_data = reinterpret_cast(raw_data); add_numeric_payload( builder, double_data, payload.valid_data, nullable, length); break; } case DataType::TIMESTAMPTZ: { auto timestamptz_data = reinterpret_cast(raw_data); add_numeric_payload( builder, timestamptz_data, payload.valid_data, nullable, length); break; } case DataType::VECTOR_FLOAT16: case DataType::VECTOR_BFLOAT16: case DataType::VECTOR_BINARY: case DataType::VECTOR_INT8: case DataType::VECTOR_FLOAT: { add_vector_payload(builder, const_cast(raw_data), length); break; } case DataType::VECTOR_SPARSE_U32_F32: { ThrowInfo(DataTypeInvalid, "Sparse Float Vector payload should be added by calling " "add_one_binary_payload", data_type); } case DataType::VECTOR_ARRAY: { auto list_builder = std::dynamic_pointer_cast(builder); AssertInfo(list_builder != nullptr, "builder must be ListBuilder for VECTOR_ARRAY"); auto vector_arrays = reinterpret_cast(raw_data); if (length > 0) { auto element_type = vector_arrays[0].get_element_type(); // Validate element type switch (element_type) { case DataType::VECTOR_FLOAT: case DataType::VECTOR_BINARY: case DataType::VECTOR_FLOAT16: case DataType::VECTOR_BFLOAT16: case DataType::VECTOR_INT8: break; default: ThrowInfo(DataTypeInvalid, "Unsupported element type in VectorArray: {}", element_type); } // All supported vector types use FixedSizeBinaryBuilder auto value_builder = static_cast( list_builder->value_builder()); AssertInfo(value_builder != nullptr, "value_builder must be FixedSizeBinaryBuilder for " "VectorArray"); for (int i = 0; i < length; ++i) { auto status = list_builder->Append(); AssertInfo(status.ok(), "Failed to append list: {}", status.ToString()); const auto& array = vector_arrays[i]; AssertInfo(array.get_element_type() == element_type, "Inconsistent element types in VectorArray"); int num_vectors = array.length(); auto ast = value_builder->AppendValues( reinterpret_cast(array.data()), num_vectors); AssertInfo(ast.ok(), "Failed to batch append vectors: {}", ast.ToString()); } } break; } default: { ThrowInfo(DataTypeInvalid, "unsupported data type {}", data_type); } } } void AddOneStringToArrowBuilder(std::shared_ptr builder, const char* str, int str_size) { AssertInfo(builder != nullptr, "empty arrow builder"); auto string_builder = std::dynamic_pointer_cast(builder); arrow::Status ast; if (str == nullptr || str_size < 0) { ast = string_builder->AppendNull(); } else { ast = string_builder->Append(str, str_size); } AssertInfo( ast.ok(), "append value to arrow builder failed: {}", ast.ToString()); } void AddOneBinaryToArrowBuilder(std::shared_ptr builder, const uint8_t* data, int length) { AssertInfo(builder != nullptr, "empty arrow builder"); auto binary_builder = std::dynamic_pointer_cast(builder); arrow::Status ast; if (data == nullptr || length < 0) { ast = binary_builder->AppendNull(); } else { ast = binary_builder->Append(data, length); } AssertInfo( ast.ok(), "append value to arrow builder failed: {}", ast.ToString()); } std::shared_ptr CreateArrowBuilder(DataType data_type) { switch (static_cast(data_type)) { case DataType::BOOL: { return std::make_shared(); } case DataType::INT8: { return std::make_shared(); } case DataType::INT16: { return std::make_shared(); } case DataType::INT32: { return std::make_shared(); } case DataType::INT64: { return std::make_shared(); } case DataType::FLOAT: { return std::make_shared(); } case DataType::DOUBLE: { return std::make_shared(); } case DataType::TIMESTAMPTZ: { return std::make_shared(); } case DataType::VARCHAR: case DataType::STRING: case DataType::TEXT: { return std::make_shared(); } case DataType::ARRAY: case DataType::JSON: case DataType::GEOMETRY: { return std::make_shared(); } // sparse float vector doesn't require a dim case DataType::VECTOR_SPARSE_U32_F32: { return std::make_shared(); } default: { ThrowInfo( DataTypeInvalid, "unsupported numeric data type {}", data_type); } } } std::shared_ptr CreateArrowBuilder(DataType data_type, DataType element_type, int dim) { switch (static_cast(data_type)) { case DataType::VECTOR_FLOAT: { AssertInfo(dim > 0, "invalid dim value: {}", dim); return std::make_shared( arrow::fixed_size_binary(dim * sizeof(float))); } case DataType::VECTOR_BINARY: { AssertInfo(dim % 8 == 0 && dim > 0, "invalid dim value: {}", dim); return std::make_shared( arrow::fixed_size_binary(dim / 8)); } case DataType::VECTOR_FLOAT16: { AssertInfo(dim > 0, "invalid dim value: {}", dim); return std::make_shared( arrow::fixed_size_binary(dim * sizeof(float16))); } case DataType::VECTOR_BFLOAT16: { AssertInfo(dim > 0, "invalid dim value"); return std::make_shared( arrow::fixed_size_binary(dim * sizeof(bfloat16))); } case DataType::VECTOR_INT8: { AssertInfo(dim > 0, "invalid dim value"); return std::make_shared( arrow::fixed_size_binary(dim * sizeof(int8))); } case DataType::VECTOR_ARRAY: { AssertInfo(dim > 0, "invalid dim value"); AssertInfo(element_type != DataType::NONE, "element_type must be specified for VECTOR_ARRAY"); std::shared_ptr value_builder; switch (element_type) { case DataType::VECTOR_FLOAT: { int byte_width = dim * sizeof(float); value_builder = std::make_shared( arrow::fixed_size_binary(byte_width)); break; } case DataType::VECTOR_BINARY: { int byte_width = (dim + 7) / 8; value_builder = std::make_shared( arrow::fixed_size_binary(byte_width)); break; } case DataType::VECTOR_FLOAT16: { int byte_width = dim * 2; value_builder = std::make_shared( arrow::fixed_size_binary(byte_width)); break; } case DataType::VECTOR_BFLOAT16: { int byte_width = dim * 2; value_builder = std::make_shared( arrow::fixed_size_binary(byte_width)); break; } case DataType::VECTOR_INT8: { int byte_width = dim; value_builder = std::make_shared( arrow::fixed_size_binary(byte_width)); break; } default: { ThrowInfo(DataTypeInvalid, "unsupported element type {} for VECTOR_ARRAY", GetDataTypeName(element_type)); } } return std::make_shared( arrow::default_memory_pool(), value_builder); } default: { ThrowInfo( DataTypeInvalid, "unsupported vector data type {}", data_type); } } } std::shared_ptr CreateArrowScalarFromDefaultValue(const FieldMeta& field_meta) { auto default_var = field_meta.default_value(); AssertInfo(default_var.has_value(), "cannot create Arrow Scalar from empty default value"); auto default_value = default_var.value(); switch (field_meta.get_data_type()) { case DataType::BOOL: return std::make_shared( default_value.bool_data()); case DataType::INT8: return std::make_shared( default_value.int_data()); case DataType::INT16: return std::make_shared( default_value.int_data()); case DataType::INT32: return std::make_shared( default_value.int_data()); case DataType::INT64: return std::make_shared( default_value.long_data()); case DataType::FLOAT: return std::make_shared( default_value.float_data()); case DataType::DOUBLE: return std::make_shared( default_value.double_data()); case DataType::TIMESTAMPTZ: return std::make_shared( default_value.timestamptz_data()); case DataType::VARCHAR: case DataType::STRING: case DataType::TEXT: return std::make_shared( default_value.string_data()); case DataType::JSON: return std::make_shared( default_value.bytes_data()); default: ThrowInfo(DataTypeInvalid, "unsupported default value data type {}", field_meta.get_data_type()); } } std::shared_ptr CreateArrowSchema(DataType data_type, bool nullable) { switch (static_cast(data_type)) { case DataType::BOOL: { return arrow::schema( {arrow::field("val", arrow::boolean(), nullable)}); } case DataType::INT8: { return arrow::schema( {arrow::field("val", arrow::int8(), nullable)}); } case DataType::INT16: { return arrow::schema( {arrow::field("val", arrow::int16(), nullable)}); } case DataType::INT32: { return arrow::schema( {arrow::field("val", arrow::int32(), nullable)}); } case DataType::INT64: { return arrow::schema( {arrow::field("val", arrow::int64(), nullable)}); } case DataType::FLOAT: { return arrow::schema( {arrow::field("val", arrow::float32(), nullable)}); } case DataType::DOUBLE: { return arrow::schema( {arrow::field("val", arrow::float64(), nullable)}); } case DataType::TIMESTAMPTZ: { return arrow::schema( {arrow::field("val", arrow::int64(), nullable)}); } case DataType::VARCHAR: case DataType::STRING: case DataType::TEXT: { return arrow::schema( {arrow::field("val", arrow::utf8(), nullable)}); } case DataType::ARRAY: case DataType::JSON: case DataType::GEOMETRY: { return arrow::schema( {arrow::field("val", arrow::binary(), nullable)}); } // sparse float vector doesn't require a dim case DataType::VECTOR_SPARSE_U32_F32: { return arrow::schema( {arrow::field("val", arrow::binary(), nullable)}); } default: { ThrowInfo( DataTypeInvalid, "unsupported numeric data type {}", data_type); } } } std::shared_ptr CreateArrowSchema(DataType data_type, int dim, bool nullable) { switch (static_cast(data_type)) { case DataType::VECTOR_FLOAT: { AssertInfo(dim > 0, "invalid dim value: {}", dim); return arrow::schema( {arrow::field("val", arrow::fixed_size_binary(dim * sizeof(float)), nullable)}); } case DataType::VECTOR_BINARY: { AssertInfo(dim % 8 == 0 && dim > 0, "invalid dim value: {}", dim); return arrow::schema({arrow::field( "val", arrow::fixed_size_binary(dim / 8), nullable)}); } case DataType::VECTOR_FLOAT16: { AssertInfo(dim > 0, "invalid dim value: {}", dim); return arrow::schema( {arrow::field("val", arrow::fixed_size_binary(dim * sizeof(float16)), nullable)}); } case DataType::VECTOR_BFLOAT16: { AssertInfo(dim > 0, "invalid dim value"); return arrow::schema( {arrow::field("val", arrow::fixed_size_binary(dim * sizeof(bfloat16)), nullable)}); } case DataType::VECTOR_SPARSE_U32_F32: { return arrow::schema( {arrow::field("val", arrow::binary(), nullable)}); } case DataType::VECTOR_INT8: { AssertInfo(dim > 0, "invalid dim value"); return arrow::schema( {arrow::field("val", arrow::fixed_size_binary(dim * sizeof(int8)), nullable)}); } case DataType::VECTOR_ARRAY: { // VectorArray should not use this overload - should call the one with element_type ThrowInfo( NotImplemented, "VectorArray requires element_type parameter. Use " "CreateArrowSchema(data_type, dim, element_type, nullable)"); } default: { ThrowInfo( DataTypeInvalid, "unsupported vector data type {}", data_type); } } } std::shared_ptr CreateArrowSchema(DataType data_type, int dim, DataType element_type) { AssertInfo(data_type == DataType::VECTOR_ARRAY, "This overload is only for VECTOR_ARRAY type"); AssertInfo(dim > 0, "invalid dim value"); auto value_type = GetArrowDataTypeForVectorArray(element_type, dim); auto metadata = arrow::KeyValueMetadata::Make( {ELEMENT_TYPE_KEY_FOR_ARROW, DIM_KEY}, {std::to_string(static_cast(element_type)), std::to_string(dim)}); // VECTOR_ARRAY is not nullable auto field = arrow::field("val", value_type, false)->WithMetadata(metadata); return arrow::schema({field}); } int GetDimensionFromFileMetaData(const parquet::ColumnDescriptor* schema, DataType data_type) { switch (data_type) { case DataType::VECTOR_FLOAT: { return schema->type_length() / sizeof(float); } case DataType::VECTOR_BINARY: { return schema->type_length() * 8; } case DataType::VECTOR_FLOAT16: { return schema->type_length() / sizeof(float16); } case DataType::VECTOR_BFLOAT16: { return schema->type_length() / sizeof(bfloat16); } case DataType::VECTOR_SPARSE_U32_F32: { ThrowInfo(DataTypeInvalid, fmt::format("GetDimensionFromFileMetaData should not be " "called for sparse vector")); } case DataType::VECTOR_INT8: { return schema->type_length() / sizeof(int8); } default: ThrowInfo(DataTypeInvalid, "unsupported data type {}", data_type); } } std::string GenIndexPathIdentifier(int64_t build_id, int64_t index_version, int64_t segment_id, int64_t field_id) { return std::to_string(build_id) + "_" + std::to_string(index_version) + "_" + std::to_string(segment_id) + "_" + std::to_string(field_id) + "/"; } std::string GenIndexPathPrefix(ChunkManagerPtr cm, int64_t build_id, int64_t index_version, int64_t segment_id, int64_t field_id, bool is_temp) { return GenIndexPathPrefixByType(cm, build_id, index_version, segment_id, field_id, INDEX_ROOT_PATH, is_temp); } std::string GenIndexPathPrefixByType(ChunkManagerPtr cm, int64_t build_id, int64_t index_version, int64_t segment_id, int64_t field_id, const std::string& index_type, bool is_temp) { boost::filesystem::path prefix = cm->GetRootPath(); if (is_temp) { prefix = prefix / TEMP; } boost::filesystem::path path = std::string(index_type); boost::filesystem::path path1 = GenIndexPathIdentifier(build_id, index_version, segment_id, field_id); return (prefix / path / path1).string(); } std::string GenTextIndexPathPrefix(ChunkManagerPtr cm, int64_t build_id, int64_t index_version, int64_t segment_id, int64_t field_id, bool is_temp) { return GenIndexPathPrefixByType(cm, build_id, index_version, segment_id, field_id, TEXT_LOG_ROOT_PATH, is_temp); } std::string GenJsonStatsPathPrefix(ChunkManagerPtr cm, int64_t build_id, int64_t index_version, int64_t segment_id, int64_t field_id, bool is_temp) { boost::filesystem::path prefix = cm->GetRootPath(); if (is_temp) { prefix = prefix / TEMP; } boost::filesystem::path path = std::string(JSON_STATS_ROOT_PATH); boost::filesystem::path path1 = GenIndexPathIdentifier(build_id, index_version, segment_id, field_id); return (prefix / path / path1).string(); } std::string GenJsonStatsPathIdentifier(int64_t build_id, int64_t index_version, int64_t collection_id, int64_t partition_id, int64_t segment_id, int64_t field_id) { boost::filesystem::path p = boost::filesystem::path(std::to_string(build_id)) / std::to_string(index_version) / std::to_string(collection_id) / std::to_string(partition_id) / std::to_string(segment_id) / std::to_string(field_id); return p.string() + "/"; } std::string GenRemoteJsonStatsPathPrefix(ChunkManagerPtr cm, int64_t build_id, int64_t index_version, int64_t collection_id, int64_t partition_id, int64_t segment_id, int64_t field_id) { boost::filesystem::path p = cm->GetRootPath(); p /= std::string(JSON_STATS_ROOT_PATH); p /= std::string(JSON_STATS_DATA_FORMAT_VERSION); p /= GenJsonStatsPathIdentifier(build_id, index_version, collection_id, partition_id, segment_id, field_id); return p.string(); } std::string GenNgramIndexPrefix(ChunkManagerPtr cm, int64_t build_id, int64_t index_version, int64_t segment_id, int64_t field_id, bool is_temp) { return GenIndexPathPrefixByType(cm, build_id, index_version, segment_id, field_id, NGRAM_LOG_ROOT_PATH, is_temp); } std::string GenFieldRawDataPathPrefix(ChunkManagerPtr cm, int64_t segment_id, int64_t field_id) { boost::filesystem::path prefix = cm->GetRootPath(); boost::filesystem::path path = std::string(RAWDATA_ROOT_PATH); boost::filesystem::path path1 = std::to_string(segment_id) + "/" + std::to_string(field_id) + "/"; return (prefix / path / path1).string(); } std::string GetSegmentRawDataPathPrefix(ChunkManagerPtr cm, int64_t segment_id) { boost::filesystem::path prefix = cm->GetRootPath(); boost::filesystem::path path = std::string(RAWDATA_ROOT_PATH); boost::filesystem::path path1 = std::to_string(segment_id); return (prefix / path / path1).string(); } std::pair EncodeAndUploadIndexSlice(ChunkManager* chunk_manager, uint8_t* buf, int64_t batch_size, IndexMeta index_meta, FieldDataMeta field_meta, std::string object_key, std::shared_ptr plugin_context) { std::shared_ptr index_data = nullptr; if (index_meta.index_non_encoding) { index_data = std::make_shared(buf, batch_size); // index-build tasks assigned from new milvus-coord nodes to none-encoding } else { auto field_data = CreateFieldData(DataType::INT8, DataType::NONE, false); field_data->FillFieldData(buf, batch_size); auto payload_reader = std::make_shared(field_data); index_data = std::make_shared(payload_reader); // index-build tasks assigned from old milvus-coord nodes, fallback to int8 encoding } // index not use valid_data, so no need to set nullable==true index_data->set_index_meta(index_meta); index_data->SetFieldDataMeta(field_meta); auto serialized_index_data = index_data->serialize_to_remote_file(plugin_context); auto serialized_index_size = serialized_index_data.size(); chunk_manager->Write( object_key, serialized_index_data.data(), serialized_index_size); return std::make_pair(std::move(object_key), serialized_index_size); } std::vector>> GetObjectData(ChunkManager* remote_chunk_manager, const std::vector& remote_files, milvus::ThreadPoolPriority priority, bool is_field_data) { auto& pool = ThreadPools::GetThreadPool(priority); std::vector>> futures; futures.reserve(remote_files.size()); auto DownloadAndDeserialize = [](ChunkManager* chunk_manager, bool is_field_data, const std::string file) { // TODO remove this Size() cost auto fileSize = chunk_manager->Size(file); auto buf = std::shared_ptr(new uint8_t[fileSize]); chunk_manager->Read(file, buf.get(), fileSize); auto res = DeserializeFileData(buf, fileSize, is_field_data); return res; }; for (auto& file : remote_files) { futures.emplace_back(pool.Submit( DownloadAndDeserialize, remote_chunk_manager, is_field_data, file)); } return futures; } std::map PutIndexData(ChunkManager* remote_chunk_manager, const std::vector& data_slices, const std::vector& slice_sizes, const std::vector& slice_names, FieldDataMeta& field_meta, IndexMeta& index_meta, std::shared_ptr plugin_context) { auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); std::vector>> futures; AssertInfo(data_slices.size() == slice_sizes.size(), "inconsistent data slices size {} with slice sizes {}", data_slices.size(), slice_sizes.size()); AssertInfo(data_slices.size() == slice_names.size(), "inconsistent data slices size {} with slice names size {}", data_slices.size(), slice_names.size()); for (int64_t i = 0; i < data_slices.size(); ++i) { futures.push_back(pool.Submit(EncodeAndUploadIndexSlice, remote_chunk_manager, const_cast(data_slices[i]), slice_sizes[i], index_meta, field_meta, slice_names[i], plugin_context)); } std::map remote_paths_to_size; std::exception_ptr first_exception = nullptr; for (auto& future : futures) { try { auto res = future.get(); remote_paths_to_size[res.first] = res.second; } catch (...) { if (!first_exception) { first_exception = std::current_exception(); } } } ReleaseArrowUnused(); if (first_exception) { std::rethrow_exception(first_exception); } return remote_paths_to_size; } int64_t GetTotalNumRowsForFieldDatas(const std::vector& field_datas) { int64_t count = 0; for (auto& field_data : field_datas) { count += field_data->get_num_rows(); } return count; } size_t GetNumRowsForLoadInfo(const LoadFieldDataInfo& load_info) { if (load_info.field_infos.empty()) { return 0; } auto& field = load_info.field_infos.begin()->second; return field.row_count; } void ReleaseArrowUnused() { static std::mutex release_mutex; // While multiple threads are releasing memory, // we don't need everyone do releasing, // just let some of them do this also works well if (release_mutex.try_lock()) { arrow::default_memory_pool()->ReleaseUnused(); release_mutex.unlock(); } } ChunkManagerPtr CreateChunkManager(const StorageConfig& storage_config) { auto storage_type = ChunkManagerType_Map[storage_config.storage_type]; switch (storage_type) { case ChunkManagerType::Local: { return std::make_shared( storage_config.root_path); } case ChunkManagerType::Minio: { return std::make_shared(storage_config); } case ChunkManagerType::Remote: { auto cloud_provider_type = CloudProviderType_Map[storage_config.cloud_provider]; switch (cloud_provider_type) { case CloudProviderType::AWS: { return std::make_shared(storage_config); } case CloudProviderType::GCP: { return std::make_shared(storage_config); } case CloudProviderType::ALIYUN: { return std::make_shared(storage_config); } case CloudProviderType::TENCENTCLOUD: { return std::make_shared( storage_config); } case CloudProviderType::HUAWEICLOUD: { return std::make_shared( storage_config); } #ifdef AZURE_BUILD_DIR case CloudProviderType::AZURE: { return std::make_shared(storage_config); } #endif #ifdef ENABLE_GCP_NATIVE case CloudProviderType::GCPNATIVE: { return std::make_shared( storage_config); } #endif default: { return std::make_shared(storage_config); } } } #ifdef USE_OPENDAL case ChunkManagerType::OpenDAL: { return std::make_shared(storage_config); } #endif default: { ThrowInfo(ConfigInvalid, "unsupported storage_config.storage_type {}", fmt::underlying(storage_type)); } } } milvus_storage::ArrowFileSystemPtr InitArrowFileSystem(milvus::storage::StorageConfig storage_config) { StorageV2FSCache::Key conf; if (storage_config.storage_type == "local") { std::string path(storage_config.root_path); conf.root_path = path; conf.storage_type = "local"; } else { conf.address = std::string(storage_config.address); conf.bucket_name = std::string(storage_config.bucket_name); conf.access_key_id = std::string(storage_config.access_key_id); conf.access_key_value = std::string(storage_config.access_key_value); conf.root_path = std::string(storage_config.root_path); conf.storage_type = std::string(storage_config.storage_type); conf.cloud_provider = std::string(storage_config.cloud_provider); conf.iam_endpoint = std::string(storage_config.iam_endpoint); conf.log_level = std::string(storage_config.log_level); conf.region = std::string(storage_config.region); conf.useSSL = storage_config.useSSL; conf.sslCACert = std::string(storage_config.sslCACert); conf.useIAM = storage_config.useIAM; conf.useVirtualHost = storage_config.useVirtualHost; conf.requestTimeoutMs = storage_config.requestTimeoutMs; conf.gcp_credential_json = std::string(storage_config.gcp_credential_json); conf.use_custom_part_upload = true; conf.max_connections = storage_config.max_connections; } return StorageV2FSCache::Instance().Get(conf); } FieldDataPtr CreateFieldData(const DataType& type, const DataType& element_type, bool nullable, int64_t dim, int64_t total_num_rows) { switch (type) { case DataType::BOOL: return std::make_shared>( type, nullable, total_num_rows); case DataType::INT8: return std::make_shared>( type, nullable, total_num_rows); case DataType::INT16: return std::make_shared>( type, nullable, total_num_rows); case DataType::INT32: return std::make_shared>( type, nullable, total_num_rows); case DataType::INT64: return std::make_shared>( type, nullable, total_num_rows); case DataType::FLOAT: return std::make_shared>( type, nullable, total_num_rows); case DataType::DOUBLE: return std::make_shared>( type, nullable, total_num_rows); case DataType::TIMESTAMPTZ: return std::make_shared>( type, nullable, total_num_rows); case DataType::STRING: case DataType::VARCHAR: case DataType::TEXT: return std::make_shared>( type, nullable, total_num_rows); case DataType::JSON: return std::make_shared>( type, nullable, total_num_rows); case DataType::GEOMETRY: return std::make_shared>( type, nullable, total_num_rows); case DataType::ARRAY: return std::make_shared>( type, nullable, total_num_rows); case DataType::VECTOR_FLOAT: return std::make_shared>( dim, type, total_num_rows); case DataType::VECTOR_BINARY: return std::make_shared>( dim, type, total_num_rows); case DataType::VECTOR_FLOAT16: return std::make_shared>( dim, type, total_num_rows); case DataType::VECTOR_BFLOAT16: return std::make_shared>( dim, type, total_num_rows); case DataType::VECTOR_SPARSE_U32_F32: return std::make_shared>( type, total_num_rows); case DataType::VECTOR_INT8: return std::make_shared>( dim, type, total_num_rows); case DataType::VECTOR_ARRAY: return std::make_shared>( dim, element_type, total_num_rows); default: ThrowInfo(DataTypeInvalid, "CreateFieldData not support data type " + GetDataTypeName(type)); } } int64_t GetByteSizeOfFieldDatas(const std::vector& field_datas) { int64_t result = 0; for (auto& data : field_datas) { result += data->Size(); } return result; } std::vector CollectFieldDataChannel(FieldDataChannelPtr& channel) { std::vector result; FieldDataPtr field_data; while (channel->pop(field_data)) { result.push_back(field_data); } return result; } FieldDataPtr MergeFieldData(std::vector& data_array) { if (data_array.size() == 0) { return nullptr; } if (data_array.size() == 1) { return data_array[0]; } size_t total_length = 0; for (const auto& data : data_array) { total_length += data->Length(); } auto element_type = DataType::NONE; auto vector_array_data = dynamic_cast*>(data_array[0].get()); if (vector_array_data) { element_type = vector_array_data->get_element_type(); } auto merged_data = storage::CreateFieldData(data_array[0]->get_data_type(), element_type, data_array[0]->IsNullable()); merged_data->Reserve(total_length); for (const auto& data : data_array) { if (merged_data->IsNullable()) { merged_data->FillFieldData( data->Data(), data->ValidData(), data->Length(), 0); } else { merged_data->FillFieldData(data->Data(), data->Length()); } } return merged_data; } std::vector FetchFieldData(ChunkManager* cm, const std::vector& remote_files) { std::vector field_datas; std::vector batch_files; auto FetchRawData = [&]() { auto fds = GetObjectData(cm, batch_files); for (size_t i = 0; i < batch_files.size(); ++i) { auto data = fds[i].get()->GetFieldData(); field_datas.emplace_back(data); } }; auto parallel_degree = uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); for (auto& file : remote_files) { if (batch_files.size() >= parallel_degree) { FetchRawData(); batch_files.clear(); } batch_files.emplace_back(file); } if (batch_files.size() > 0) { FetchRawData(); } return field_datas; } std::vector GetFieldDatasFromStorageV2(std::vector>& remote_files, int64_t field_id, DataType data_type, DataType element_type, int64_t dim, milvus_storage::ArrowFileSystemPtr fs) { AssertInfo(remote_files.size() > 0, "[StorageV2] remote files size is 0"); std::vector field_data_list; // remote files might not followed the sequence of column group id, // so we need to put into map std::unordered_map> column_group_files; for (auto& remote_chunk_files : remote_files) { AssertInfo(remote_chunk_files.size() > 0, "[StorageV2] remote files size is 0"); int64_t group_id = ExtractGroupIdFromPath(remote_chunk_files[0]); column_group_files[group_id] = remote_chunk_files; } std::vector remote_chunk_files; int64_t column_group_id = -1; size_t col_offset = -1; if (column_group_files.find(field_id) == column_group_files.end()) { for (auto& [group_id, files] : column_group_files) { if (group_id >= START_USER_FIELDID) { continue; } milvus_storage::FieldIDList field_id_list = storage::GetFieldIDList( FieldId(group_id), files[0], nullptr, fs); for (size_t i = 0; i < field_id_list.size(); ++i) { if (field_id_list.Get(i) == field_id) { remote_chunk_files = files; column_group_id = group_id; col_offset = i; break; } } if (column_group_id != -1) { break; } } } else { remote_chunk_files = column_group_files[field_id]; column_group_id = field_id; } if (column_group_id == -1) { LOG_INFO( "[StorageV2] field {} not found in any column group, return " "empty result set", field_id); return field_data_list; } AssertInfo(remote_chunk_files.size() > 0, "[StorageV2] remote files size is 0"); // find column offset if (col_offset == -1) { milvus_storage::FieldIDList field_id_list = storage::GetFieldIDList( FieldId(column_group_id), remote_chunk_files[0], nullptr, fs); for (size_t i = 0; i < field_id_list.size(); ++i) { if (field_id_list.Get(i) == field_id) { col_offset = i; break; } } } // field not found, must be newly added field, return empty resultset if (col_offset == -1) { LOG_INFO( "[StorageV2] field {} not found in column group {}, return empty " "result set", field_id, column_group_id); return field_data_list; } AssertInfo(fs != nullptr, "[StorageV2] storage v2 arrow file system is not initialized"); // set up channel for arrow reader auto field_data_info = FieldDataInfo(); auto parallel_degree = static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); field_data_info.arrow_reader_channel->set_capacity(parallel_degree); auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); for (auto& column_group_file : remote_chunk_files) { // get all row groups for each file std::vector> row_group_lists; auto reader = std::make_shared( fs, column_group_file, milvus_storage::DEFAULT_READ_BUFFER_SIZE, GetReaderProperties()); auto row_group_num = reader->file_metadata()->GetRowGroupMetadataVector().size(); std::vector all_row_groups(row_group_num); std::iota(all_row_groups.begin(), all_row_groups.end(), 0); row_group_lists.push_back(all_row_groups); // create a schema with only the field id auto field_schema = reader->schema()->field(col_offset)->Copy(); auto arrow_schema = arrow::schema({field_schema}); auto status = reader->Close(); AssertInfo(status.ok(), "[StorageV2] failed to close file reader when get arrow " "schema from file: " + column_group_file + " with error: " + status.ToString()); // split row groups for parallel reading auto strategy = std::make_unique( parallel_degree); auto load_future = pool.Submit([&]() { return LoadWithStrategy(std::vector{column_group_file}, field_data_info.arrow_reader_channel, DEFAULT_FIELD_MAX_MEMORY_LIMIT, std::move(strategy), row_group_lists, fs, nullptr, milvus::proto::common::LoadPriority::HIGH); }); // read field data from channel std::shared_ptr r; while (field_data_info.arrow_reader_channel->pop(r)) { size_t num_rows = 0; std::vector> chunked_arrays; for (const auto& table_info : r->arrow_tables) { num_rows += table_info.table->num_rows(); chunked_arrays.push_back(table_info.table->column(col_offset)); } auto field_data = storage::CreateFieldData(data_type, element_type, field_schema->nullable(), dim, num_rows); for (const auto& chunked_array : chunked_arrays) { field_data->FillFieldData(chunked_array); } field_data_list.push_back(field_data); } } return field_data_list; } std::vector CacheRawDataAndFillMissing(const MemFileManagerImplPtr& file_manager, const Config& config) { // download field data auto field_datas = file_manager->CacheRawDataToMemory(config); // check storage version auto storage_version = index::GetValueFromConfig(config, STORAGE_VERSION_KEY) .value_or(0); int64_t lack_binlog_rows = index::GetValueFromConfig(config, INDEX_NUM_ROWS_KEY) .value_or(0); for (auto& field_data : field_datas) { lack_binlog_rows -= field_data->get_num_rows(); } if (lack_binlog_rows > 0) { LOG_INFO("create index lack binlog detected, lack row num: {}", lack_binlog_rows); auto field_schema = file_manager->GetFieldDataMeta().field_schema; auto default_value = [&]() -> std::optional { if (!field_schema.has_default_value()) { return std::nullopt; } return field_schema.default_value(); }(); auto field_data = storage::CreateFieldData( static_cast(field_schema.data_type()), static_cast(field_schema.element_type()), true, 1, lack_binlog_rows); field_data->FillFieldData(default_value, lack_binlog_rows); field_datas.insert(field_datas.begin(), field_data); } return field_datas; } int64_t ExtractGroupIdFromPath(const std::string& path) { // find second last of / to get group_id size_t last_slash = path.find_last_of("/"); size_t second_last_slash = path.find_last_of("/", last_slash - 1); return std::stol( path.substr(second_last_slash + 1, last_slash - second_last_slash - 1)); } // if it is multi-field column group, read field id list from file metadata // if it is single-field column group, return the column group id milvus_storage::FieldIDList GetFieldIDList(FieldId column_group_id, const std::string& filepath, const std::shared_ptr& arrow_schema, milvus_storage::ArrowFileSystemPtr fs) { milvus_storage::FieldIDList field_id_list; if (column_group_id >= FieldId(START_USER_FIELDID)) { field_id_list.Add(column_group_id.get()); return field_id_list; } auto file_reader = std::make_shared( fs, filepath, arrow_schema, milvus_storage::DEFAULT_READ_BUFFER_SIZE, GetReaderProperties()); field_id_list = file_reader->file_metadata()->GetGroupFieldIDList().GetFieldIDList( column_group_id.get()); auto status = file_reader->Close(); AssertInfo(status.ok(), "failed to close file reader when get field id list from {}", filepath); return field_id_list; } } // namespace milvus::storage