// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <sys/mman.h>  // for the MADV_* constants in ReadAheadPolicy_Map

#include <memory>

#include "arrow/array/builder_binary.h"
#include "arrow/type_fwd.h"
#include "fmt/format.h"
#include "log/Log.h"

#include "common/Consts.h"
#include "common/EasyAssert.h"
#include "common/FieldData.h"
#include "common/FieldDataInterface.h"
#ifdef AZURE_BUILD_DIR
#include "storage/azure/AzureChunkManager.h"
#endif
#ifdef ENABLE_GCP_NATIVE
#include "storage/gcp-native-storage/GcpNativeChunkManager.h"
#endif
#include "storage/ChunkManager.h"
#include "storage/DiskFileManagerImpl.h"
#include "storage/InsertData.h"
#include "storage/LocalChunkManager.h"
#include "storage/MemFileManagerImpl.h"
#include "storage/MinioChunkManager.h"
#ifdef USE_OPENDAL
#include "storage/opendal/OpenDALChunkManager.h"
#endif
#include "storage/Types.h"
#include "storage/Util.h"
#include "storage/ThreadPools.h"

namespace milvus::storage {

std::map<std::string, ChunkManagerType> ChunkManagerType_Map = {
    {"local", ChunkManagerType::Local},
    {"minio", ChunkManagerType::Minio},
    {"remote", ChunkManagerType::Remote},
    {"opendal", ChunkManagerType::OpenDAL}};

enum class CloudProviderType : int8_t {
    UNKNOWN = 0,
    AWS = 1,
    GCP = 2,
    ALIYUN = 3,
    AZURE = 4,
    TENCENTCLOUD = 5,
    GCPNATIVE = 6,
};

std::map<std::string, CloudProviderType> CloudProviderType_Map = {
    {"aws", CloudProviderType::AWS},
    {"gcp", CloudProviderType::GCP},
    {"aliyun", CloudProviderType::ALIYUN},
    {"azure", CloudProviderType::AZURE},
    {"tencent", CloudProviderType::TENCENTCLOUD},
    {"gcpnative", CloudProviderType::GCPNATIVE}};

std::map<std::string, int> ReadAheadPolicy_Map = {
    {"normal", MADV_NORMAL},
    {"random", MADV_RANDOM},
    {"sequential", MADV_SEQUENTIAL},
    {"willneed", MADV_WILLNEED},
    {"dontneed", MADV_DONTNEED}};

// in arrow, the null bitmap is read from the least significant bit
std::vector<bool>
genValidIter(const uint8_t* valid_data, int length) {
    std::vector<bool> valid_data_;
    valid_data_.reserve(length);
    for (size_t i = 0; i < length; ++i) {
        auto bit = (valid_data[i >> 3] >> (i & 0x07)) & 1;
        valid_data_.push_back(bit);
    }
    return valid_data_;
}

StorageType
ReadMediumType(BinlogReaderPtr reader) {
    AssertInfo(reader->Tell() == 0,
               "medium type must be parsed from stream header");
    int32_t magic_num;
    auto ret = reader->Read(sizeof(magic_num), &magic_num);
    AssertInfo(ret.ok(), "read binlog failed: {}", ret.what());
    if (magic_num == MAGIC_NUM) {
        return StorageType::Remote;
    }
    return StorageType::LocalDisk;
}

void
add_vector_payload(std::shared_ptr<arrow::ArrayBuilder> builder,
                   uint8_t* values,
                   int length) {
    AssertInfo(builder != nullptr, "empty arrow builder");
    auto binary_builder =
        std::dynamic_pointer_cast<arrow::FixedSizeBinaryBuilder>(builder);
    auto ast = binary_builder->AppendValues(values, length);
    AssertInfo(
        ast.ok(), "append value to arrow builder failed: {}", ast.ToString());
}
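
// A worked example of the bitmap decoding above (hypothetical values, not
// part of this file): for valid_data = {0b00000101} and length = 3,
// genValidIter yields {true, false, true} -- row i maps to bit (i & 7) of
// byte (i >> 3), least significant bit first. The nullable branch of
// add_numeric_payload below consumes this as Arrow's validity iterator.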

// append values for numeric data
template <typename DT, typename BT>
void
add_numeric_payload(std::shared_ptr<arrow::ArrayBuilder> builder,
                    DT* start,
                    const uint8_t* valid_data,
                    bool nullable,
                    int length) {
    AssertInfo(builder != nullptr, "empty arrow builder");
    auto numeric_builder = std::dynamic_pointer_cast<BT>(builder);
    arrow::Status ast;
    if (nullable) {
        // need iter to read valid_data when write
        auto iter = genValidIter(valid_data, length);
        ast =
            numeric_builder->AppendValues(start, start + length, iter.begin());
        AssertInfo(ast.ok(), "append value to arrow builder failed");
    } else {
        ast = numeric_builder->AppendValues(start, start + length);
        AssertInfo(ast.ok(), "append value to arrow builder failed");
    }
}

void
AddPayloadToArrowBuilder(std::shared_ptr<arrow::ArrayBuilder> builder,
                         const Payload& payload) {
    AssertInfo(builder != nullptr, "empty arrow builder");
    auto raw_data = const_cast<uint8_t*>(payload.raw_data);
    auto length = payload.rows;
    auto data_type = payload.data_type;
    auto nullable = payload.nullable;

    switch (data_type) {
        case DataType::BOOL: {
            auto bool_data = reinterpret_cast<bool*>(raw_data);
            add_numeric_payload<bool, arrow::BooleanBuilder>(
                builder, bool_data, payload.valid_data, nullable, length);
            break;
        }
        case DataType::INT8: {
            auto int8_data = reinterpret_cast<int8_t*>(raw_data);
            add_numeric_payload<int8_t, arrow::Int8Builder>(
                builder, int8_data, payload.valid_data, nullable, length);
            break;
        }
        case DataType::INT16: {
            auto int16_data = reinterpret_cast<int16_t*>(raw_data);
            add_numeric_payload<int16_t, arrow::Int16Builder>(
                builder, int16_data, payload.valid_data, nullable, length);
            break;
        }
        case DataType::INT32: {
            auto int32_data = reinterpret_cast<int32_t*>(raw_data);
            add_numeric_payload<int32_t, arrow::Int32Builder>(
                builder, int32_data, payload.valid_data, nullable, length);
            break;
        }
        case DataType::INT64: {
            auto int64_data = reinterpret_cast<int64_t*>(raw_data);
            add_numeric_payload<int64_t, arrow::Int64Builder>(
                builder, int64_data, payload.valid_data, nullable, length);
            break;
        }
        case DataType::FLOAT: {
            auto float_data = reinterpret_cast<float*>(raw_data);
            add_numeric_payload<float, arrow::FloatBuilder>(
                builder, float_data, payload.valid_data, nullable, length);
            break;
        }
        case DataType::DOUBLE: {
            auto double_data = reinterpret_cast<double*>(raw_data);
            add_numeric_payload<double, arrow::DoubleBuilder>(
                builder, double_data, payload.valid_data, nullable, length);
            break;
        }
        case DataType::VECTOR_FLOAT16:
        case DataType::VECTOR_BFLOAT16:
        case DataType::VECTOR_BINARY:
        case DataType::VECTOR_FLOAT: {
            add_vector_payload(builder, const_cast<uint8_t*>(raw_data), length);
            break;
        }
        case DataType::VECTOR_SPARSE_FLOAT: {
            PanicInfo(DataTypeInvalid,
                      "Sparse Float Vector payload should be added by calling "
                      "add_one_binary_payload",
                      data_type);
        }
        default: {
            PanicInfo(DataTypeInvalid, "unsupported data type {}", data_type);
        }
    }
}

void
AddOneStringToArrowBuilder(std::shared_ptr<arrow::ArrayBuilder> builder,
                           const char* str,
                           int str_size) {
    AssertInfo(builder != nullptr, "empty arrow builder");
    auto string_builder =
        std::dynamic_pointer_cast<arrow::StringBuilder>(builder);
    arrow::Status ast;
    if (str == nullptr || str_size < 0) {
        ast = string_builder->AppendNull();
    } else {
        ast = string_builder->Append(str, str_size);
    }
    AssertInfo(
        ast.ok(), "append value to arrow builder failed: {}", ast.ToString());
}

void
AddOneBinaryToArrowBuilder(std::shared_ptr<arrow::ArrayBuilder> builder,
                           const uint8_t* data,
                           int length) {
    AssertInfo(builder != nullptr, "empty arrow builder");
    auto binary_builder =
        std::dynamic_pointer_cast<arrow::BinaryBuilder>(builder);
    arrow::Status ast;
    if (data == nullptr || length < 0) {
        ast = binary_builder->AppendNull();
    } else {
        ast = binary_builder->Append(data, length);
    }
    AssertInfo(
        ast.ok(), "append value to arrow builder failed: {}", ast.ToString());
}
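
// Usage sketch (hypothetical `payload` variable; its fields follow the
// Payload struct in storage/Types.h): build a column, then finish it into
// an arrow::Array.
//   auto builder = CreateArrowBuilder(DataType::INT64);
//   AddPayloadToArrowBuilder(builder, payload);  // payload.data_type == INT64
//   std::shared_ptr<arrow::Array> array;
//   auto st = builder->Finish(&array);  // arrow::Status; check st.ok()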

std::shared_ptr<arrow::ArrayBuilder>
CreateArrowBuilder(DataType data_type) {
    switch (static_cast<DataType>(data_type)) {
        case DataType::BOOL: {
            return std::make_shared<arrow::BooleanBuilder>();
        }
        case DataType::INT8: {
            return std::make_shared<arrow::Int8Builder>();
        }
        case DataType::INT16: {
            return std::make_shared<arrow::Int16Builder>();
        }
        case DataType::INT32: {
            return std::make_shared<arrow::Int32Builder>();
        }
        case DataType::INT64: {
            return std::make_shared<arrow::Int64Builder>();
        }
        case DataType::FLOAT: {
            return std::make_shared<arrow::FloatBuilder>();
        }
        case DataType::DOUBLE: {
            return std::make_shared<arrow::DoubleBuilder>();
        }
        case DataType::VARCHAR:
        case DataType::STRING: {
            return std::make_shared<arrow::StringBuilder>();
        }
        case DataType::ARRAY:
        case DataType::JSON: {
            return std::make_shared<arrow::BinaryBuilder>();
        }
        // sparse float vector doesn't require a dim
        case DataType::VECTOR_SPARSE_FLOAT: {
            return std::make_shared<arrow::BinaryBuilder>();
        }
        default: {
            PanicInfo(
                DataTypeInvalid, "unsupported numeric data type {}", data_type);
        }
    }
}

std::shared_ptr<arrow::ArrayBuilder>
CreateArrowBuilder(DataType data_type, int dim) {
    switch (static_cast<DataType>(data_type)) {
        case DataType::VECTOR_FLOAT: {
            AssertInfo(dim > 0, "invalid dim value: {}", dim);
            return std::make_shared<arrow::FixedSizeBinaryBuilder>(
                arrow::fixed_size_binary(dim * sizeof(float)));
        }
        case DataType::VECTOR_BINARY: {
            AssertInfo(dim % 8 == 0 && dim > 0, "invalid dim value: {}", dim);
            return std::make_shared<arrow::FixedSizeBinaryBuilder>(
                arrow::fixed_size_binary(dim / 8));
        }
        case DataType::VECTOR_FLOAT16: {
            AssertInfo(dim > 0, "invalid dim value: {}", dim);
            return std::make_shared<arrow::FixedSizeBinaryBuilder>(
                arrow::fixed_size_binary(dim * sizeof(float16)));
        }
        case DataType::VECTOR_BFLOAT16: {
            AssertInfo(dim > 0, "invalid dim value");
            return std::make_shared<arrow::FixedSizeBinaryBuilder>(
                arrow::fixed_size_binary(dim * sizeof(bfloat16)));
        }
        default: {
            PanicInfo(
                DataTypeInvalid, "unsupported vector data type {}", data_type);
        }
    }
}

std::shared_ptr<arrow::Schema>
CreateArrowSchema(DataType data_type, bool nullable) {
    switch (static_cast<DataType>(data_type)) {
        case DataType::BOOL: {
            return arrow::schema(
                {arrow::field("val", arrow::boolean(), nullable)});
        }
        case DataType::INT8: {
            return arrow::schema(
                {arrow::field("val", arrow::int8(), nullable)});
        }
        case DataType::INT16: {
            return arrow::schema(
                {arrow::field("val", arrow::int16(), nullable)});
        }
        case DataType::INT32: {
            return arrow::schema(
                {arrow::field("val", arrow::int32(), nullable)});
        }
        case DataType::INT64: {
            return arrow::schema(
                {arrow::field("val", arrow::int64(), nullable)});
        }
        case DataType::FLOAT: {
            return arrow::schema(
                {arrow::field("val", arrow::float32(), nullable)});
        }
        case DataType::DOUBLE: {
            return arrow::schema(
                {arrow::field("val", arrow::float64(), nullable)});
        }
        case DataType::VARCHAR:
        case DataType::STRING: {
            return arrow::schema(
                {arrow::field("val", arrow::utf8(), nullable)});
        }
        case DataType::ARRAY:
        case DataType::JSON: {
            return arrow::schema(
                {arrow::field("val", arrow::binary(), nullable)});
        }
        // sparse float vector doesn't require a dim
        case DataType::VECTOR_SPARSE_FLOAT: {
            return arrow::schema(
                {arrow::field("val", arrow::binary(), nullable)});
        }
        default: {
            PanicInfo(
                DataTypeInvalid, "unsupported numeric data type {}", data_type);
        }
    }
}
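
// Size sketch (hypothetical dim): dense vectors are stored as fixed-size
// binary, so a 128-dim float vector occupies 128 * sizeof(float) = 512
// bytes per row; the dim-aware schema overload below applies the same
// arithmetic.
//   auto builder = CreateArrowBuilder(DataType::VECTOR_FLOAT, 128);
//   auto schema = CreateArrowSchema(DataType::VECTOR_FLOAT, 128, false);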

std::shared_ptr<arrow::Schema>
CreateArrowSchema(DataType data_type, int dim, bool nullable) {
    switch (static_cast<DataType>(data_type)) {
        case DataType::VECTOR_FLOAT: {
            AssertInfo(dim > 0, "invalid dim value: {}", dim);
            return arrow::schema(
                {arrow::field("val",
                              arrow::fixed_size_binary(dim * sizeof(float)),
                              nullable)});
        }
        case DataType::VECTOR_BINARY: {
            AssertInfo(dim % 8 == 0 && dim > 0, "invalid dim value: {}", dim);
            return arrow::schema({arrow::field(
                "val", arrow::fixed_size_binary(dim / 8), nullable)});
        }
        case DataType::VECTOR_FLOAT16: {
            AssertInfo(dim > 0, "invalid dim value: {}", dim);
            return arrow::schema(
                {arrow::field("val",
                              arrow::fixed_size_binary(dim * sizeof(float16)),
                              nullable)});
        }
        case DataType::VECTOR_BFLOAT16: {
            AssertInfo(dim > 0, "invalid dim value");
            return arrow::schema(
                {arrow::field("val",
                              arrow::fixed_size_binary(dim * sizeof(bfloat16)),
                              nullable)});
        }
        case DataType::VECTOR_SPARSE_FLOAT: {
            return arrow::schema(
                {arrow::field("val", arrow::binary(), nullable)});
        }
        default: {
            PanicInfo(
                DataTypeInvalid, "unsupported vector data type {}", data_type);
        }
    }
}

int
GetDimensionFromFileMetaData(const parquet::ColumnDescriptor* schema,
                             DataType data_type) {
    switch (data_type) {
        case DataType::VECTOR_FLOAT: {
            return schema->type_length() / sizeof(float);
        }
        case DataType::VECTOR_BINARY: {
            return schema->type_length() * 8;
        }
        case DataType::VECTOR_FLOAT16: {
            return schema->type_length() / sizeof(float16);
        }
        case DataType::VECTOR_BFLOAT16: {
            return schema->type_length() / sizeof(bfloat16);
        }
        case DataType::VECTOR_SPARSE_FLOAT: {
            PanicInfo(DataTypeInvalid,
                      fmt::format("GetDimensionFromFileMetaData should not be "
                                  "called for sparse vector"));
        }
        default:
            PanicInfo(DataTypeInvalid, "unsupported data type {}", data_type);
    }
}

int
GetDimensionFromArrowArray(std::shared_ptr<arrow::Array> data,
                           DataType data_type) {
    switch (data_type) {
        case DataType::VECTOR_FLOAT: {
            AssertInfo(
                data->type()->id() == arrow::Type::type::FIXED_SIZE_BINARY,
                "inconsistent data type: {}",
                data->type_id());
            auto array =
                std::dynamic_pointer_cast<arrow::FixedSizeBinaryArray>(data);
            return array->byte_width() / sizeof(float);
        }
        case DataType::VECTOR_BINARY: {
            AssertInfo(
                data->type()->id() == arrow::Type::type::FIXED_SIZE_BINARY,
                "inconsistent data type: {}",
                data->type_id());
            auto array =
                std::dynamic_pointer_cast<arrow::FixedSizeBinaryArray>(data);
            return array->byte_width() * 8;
        }
        case DataType::VECTOR_FLOAT16: {
            AssertInfo(
                data->type()->id() == arrow::Type::type::FIXED_SIZE_BINARY,
                "inconsistent data type: {}",
                data->type_id());
            auto array =
                std::dynamic_pointer_cast<arrow::FixedSizeBinaryArray>(data);
            return array->byte_width() / sizeof(float16);
        }
        case DataType::VECTOR_BFLOAT16: {
            AssertInfo(
                data->type()->id() == arrow::Type::type::FIXED_SIZE_BINARY,
                "inconsistent data type: {}",
                data->type_id());
            auto array =
                std::dynamic_pointer_cast<arrow::FixedSizeBinaryArray>(data);
            return array->byte_width() / sizeof(bfloat16);
        }
        default:
            PanicInfo(DataTypeInvalid, "unsupported data type {}", data_type);
    }
}

std::string
GenIndexPathIdentifier(int64_t build_id, int64_t index_version) {
    return std::to_string(build_id) + "/" + std::to_string(index_version) +
           "/";
}

std::string
GenTextIndexPathIdentifier(int64_t build_id,
                           int64_t index_version,
                           int64_t segment_id,
                           int64_t field_id) {
    return std::to_string(build_id) + "/" + std::to_string(index_version) +
           "/" + std::to_string(segment_id) + "/" + std::to_string(field_id) +
           "/";
}

std::string
GenIndexPathPrefix(ChunkManagerPtr cm,
                   int64_t build_id,
                   int64_t index_version) {
    boost::filesystem::path prefix = cm->GetRootPath();
    boost::filesystem::path path = std::string(INDEX_ROOT_PATH);
    boost::filesystem::path path1 =
        GenIndexPathIdentifier(build_id, index_version);
    return (prefix / path / path1).string();
}

std::string
GenTextIndexPathPrefix(ChunkManagerPtr cm,
                       int64_t build_id,
                       int64_t index_version,
                       int64_t segment_id,
                       int64_t field_id) {
    boost::filesystem::path prefix = cm->GetRootPath();
    boost::filesystem::path path = std::string(TEXT_LOG_ROOT_PATH);
    boost::filesystem::path path1 = GenTextIndexPathIdentifier(
        build_id, index_version, segment_id, field_id);
    return (prefix / path / path1).string();
}
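
// Path layout sketch (hypothetical ids, root path "files"): with
// build_id = 7 and index_version = 2, GenIndexPathPrefix returns
// "files/<INDEX_ROOT_PATH>/7/2/" (angle brackets mark the constant's
// value); the JSON-key variants below append collection, partition,
// segment, and field ids the same way.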
+ "/" + std::to_string(segment_id) + "/" + std::to_string(field_id) + "/"; } std::string GenJsonKeyIndexPathPrefix(ChunkManagerPtr cm, int64_t build_id, int64_t index_version, int64_t collection_id, int64_t partition_id, int64_t segment_id, int64_t field_id) { return cm->GetRootPath() + "/" + std::string(JSON_KEY_INDEX_LOG_ROOT_PATH) + "/" + GenJsonKeyIndexPathIdentifier(build_id, index_version, collection_id, partition_id, segment_id, field_id); } std::string GetIndexPathPrefixWithBuildID(ChunkManagerPtr cm, int64_t build_id) { boost::filesystem::path prefix = cm->GetRootPath(); boost::filesystem::path path = std::string(INDEX_ROOT_PATH); boost::filesystem::path path1 = std::to_string(build_id); return (prefix / path / path1).string(); } std::string GenFieldRawDataPathPrefix(ChunkManagerPtr cm, int64_t segment_id, int64_t field_id) { boost::filesystem::path prefix = cm->GetRootPath(); boost::filesystem::path path = std::string(RAWDATA_ROOT_PATH); boost::filesystem::path path1 = std::to_string(segment_id) + "/" + std::to_string(field_id) + "/"; return (prefix / path / path1).string(); } std::string GetSegmentRawDataPathPrefix(ChunkManagerPtr cm, int64_t segment_id) { boost::filesystem::path prefix = cm->GetRootPath(); boost::filesystem::path path = std::string(RAWDATA_ROOT_PATH); boost::filesystem::path path1 = std::to_string(segment_id); return (prefix / path / path1).string(); } std::unique_ptr DownloadAndDecodeRemoteFile(ChunkManager* chunk_manager, const std::string& file, bool is_field_data) { auto fileSize = chunk_manager->Size(file); auto buf = std::shared_ptr(new uint8_t[fileSize]); chunk_manager->Read(file, buf.get(), fileSize); auto res = DeserializeFileData(buf, fileSize, is_field_data); res->SetData(buf); return res; } std::pair EncodeAndUploadIndexSlice(ChunkManager* chunk_manager, uint8_t* buf, int64_t batch_size, IndexMeta index_meta, FieldDataMeta field_meta, std::string object_key) { // index not use valid_data, so no need to set nullable==true auto field_data = CreateFieldData(DataType::INT8, false); field_data->FillFieldData(buf, batch_size); auto indexData = std::make_shared(field_data); indexData->set_index_meta(index_meta); indexData->SetFieldDataMeta(field_meta); auto serialized_index_data = indexData->serialize_to_remote_file(); auto serialized_index_size = serialized_index_data.size(); chunk_manager->Write( object_key, serialized_index_data.data(), serialized_index_size); return std::make_pair(std::move(object_key), serialized_index_size); } std::pair EncodeAndUploadFieldSlice(ChunkManager* chunk_manager, void* buf, int64_t element_count, FieldDataMeta field_data_meta, const FieldMeta& field_meta, std::string object_key) { // dim should not be used for sparse float vector field auto dim = IsSparseFloatVectorDataType(field_meta.get_data_type()) ? 

std::pair<std::string, size_t>
EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
                          void* buf,
                          int64_t element_count,
                          FieldDataMeta field_data_meta,
                          const FieldMeta& field_meta,
                          std::string object_key) {
    // dim should not be used for sparse float vector field
    auto dim = IsSparseFloatVectorDataType(field_meta.get_data_type())
                   ? -1
                   : field_meta.get_dim();
    auto field_data =
        CreateFieldData(field_meta.get_data_type(), false, dim, 0);
    field_data->FillFieldData(buf, element_count);
    auto insertData = std::make_shared<InsertData>(field_data);
    insertData->SetFieldDataMeta(field_data_meta);
    auto serialized_inserted_data = insertData->serialize_to_remote_file();
    auto serialized_inserted_data_size = serialized_inserted_data.size();
    chunk_manager->Write(object_key,
                         serialized_inserted_data.data(),
                         serialized_inserted_data_size);
    return std::make_pair(std::move(object_key),
                          serialized_inserted_data_size);
}

std::vector<std::future<std::unique_ptr<DataCodec>>>
GetObjectData(ChunkManager* remote_chunk_manager,
              const std::vector<std::string>& remote_files) {
    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH);
    std::vector<std::future<std::unique_ptr<DataCodec>>> futures;
    futures.reserve(remote_files.size());
    for (auto& file : remote_files) {
        futures.emplace_back(pool.Submit(
            DownloadAndDecodeRemoteFile, remote_chunk_manager, file, true));
    }
    return futures;
}
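
// Consumption sketch: each future resolves to a decoded DataCodec; callers
// block on get() per file (FetchFieldData at the end of this file batches
// these calls by parallel degree).
//   auto futures = GetObjectData(cm, remote_files);
//   for (auto& f : futures) {
//       auto field_data = f.get()->GetFieldData();
//   }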

std::map<std::string, int64_t>
PutIndexData(ChunkManager* remote_chunk_manager,
             const std::vector<const uint8_t*>& data_slices,
             const std::vector<int64_t>& slice_sizes,
             const std::vector<std::string>& slice_names,
             FieldDataMeta& field_meta,
             IndexMeta& index_meta) {
    auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
    std::vector<std::future<std::pair<std::string, size_t>>> futures;
    AssertInfo(data_slices.size() == slice_sizes.size(),
               "inconsistent data slices size {} with slice sizes {}",
               data_slices.size(),
               slice_sizes.size());
    AssertInfo(data_slices.size() == slice_names.size(),
               "inconsistent data slices size {} with slice names size {}",
               data_slices.size(),
               slice_names.size());

    for (int64_t i = 0; i < data_slices.size(); ++i) {
        futures.push_back(pool.Submit(EncodeAndUploadIndexSlice,
                                      remote_chunk_manager,
                                      const_cast<uint8_t*>(data_slices[i]),
                                      slice_sizes[i],
                                      index_meta,
                                      field_meta,
                                      slice_names[i]));
    }

    std::map<std::string, int64_t> remote_paths_to_size;
    std::exception_ptr first_exception = nullptr;
    for (auto& future : futures) {
        try {
            auto res = future.get();
            remote_paths_to_size[res.first] = res.second;
        } catch (...) {
            if (!first_exception) {
                first_exception = std::current_exception();
            }
        }
    }
    ReleaseArrowUnused();
    if (first_exception) {
        std::rethrow_exception(first_exception);
    }

    return remote_paths_to_size;
}

int64_t
GetTotalNumRowsForFieldDatas(const std::vector<FieldDataPtr>& field_datas) {
    int64_t count = 0;
    for (auto& field_data : field_datas) {
        count += field_data->get_num_rows();
    }
    return count;
}

size_t
GetNumRowsForLoadInfo(const LoadFieldDataInfo& load_info) {
    if (load_info.field_infos.empty()) {
        return 0;
    }
    auto& field = load_info.field_infos.begin()->second;
    return field.row_count;
}

void
ReleaseArrowUnused() {
    static std::mutex release_mutex;
    // When multiple threads want to release memory at the same time, not
    // all of them need to do the release; letting whichever thread gets
    // the lock do it works just as well.
    if (release_mutex.try_lock()) {
        arrow::default_memory_pool()->ReleaseUnused();
        release_mutex.unlock();
    }
}

ChunkManagerPtr
CreateChunkManager(const StorageConfig& storage_config) {
    auto storage_type = ChunkManagerType_Map[storage_config.storage_type];
    switch (storage_type) {
        case ChunkManagerType::Local: {
            return std::make_shared<LocalChunkManager>(
                storage_config.root_path);
        }
        case ChunkManagerType::Minio: {
            return std::make_shared<MinioChunkManager>(storage_config);
        }
        case ChunkManagerType::Remote: {
            auto cloud_provider_type =
                CloudProviderType_Map[storage_config.cloud_provider];
            switch (cloud_provider_type) {
                case CloudProviderType::AWS: {
                    return std::make_shared<AwsChunkManager>(storage_config);
                }
                case CloudProviderType::GCP: {
                    return std::make_shared<GcpChunkManager>(storage_config);
                }
                case CloudProviderType::ALIYUN: {
                    return std::make_shared<AliyunChunkManager>(
                        storage_config);
                }
                case CloudProviderType::TENCENTCLOUD: {
                    return std::make_shared<TencentCloudChunkManager>(
                        storage_config);
                }
#ifdef AZURE_BUILD_DIR
                case CloudProviderType::AZURE: {
                    return std::make_shared<AzureChunkManager>(storage_config);
                }
#endif
#ifdef ENABLE_GCP_NATIVE
                case CloudProviderType::GCPNATIVE: {
                    return std::make_shared<GcpNativeChunkManager>(
                        storage_config);
                }
#endif
                default: {
                    return std::make_shared<MinioChunkManager>(storage_config);
                }
            }
        }
#ifdef USE_OPENDAL
        case ChunkManagerType::OpenDAL: {
            return std::make_shared<OpenDALChunkManager>(storage_config);
        }
#endif
        default: {
            PanicInfo(ConfigInvalid,
                      "unsupported storage_config.storage_type {}",
                      fmt::underlying(storage_type));
        }
    }
}
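
// Selection sketch (hypothetical config values): storage_type picks the
// outer branch and cloud_provider the inner one.
//   StorageConfig config;
//   config.storage_type = "remote";
//   config.cloud_provider = "aws";
//   auto cm = CreateChunkManager(config);  // -> AwsChunkManager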

FieldDataPtr
CreateFieldData(const DataType& type,
                bool nullable,
                int64_t dim,
                int64_t total_num_rows) {
    switch (type) {
        case DataType::BOOL:
            return std::make_shared<FieldData<bool>>(
                type, nullable, total_num_rows);
        case DataType::INT8:
            return std::make_shared<FieldData<int8_t>>(
                type, nullable, total_num_rows);
        case DataType::INT16:
            return std::make_shared<FieldData<int16_t>>(
                type, nullable, total_num_rows);
        case DataType::INT32:
            return std::make_shared<FieldData<int32_t>>(
                type, nullable, total_num_rows);
        case DataType::INT64:
            return std::make_shared<FieldData<int64_t>>(
                type, nullable, total_num_rows);
        case DataType::FLOAT:
            return std::make_shared<FieldData<float>>(
                type, nullable, total_num_rows);
        case DataType::DOUBLE:
            return std::make_shared<FieldData<double>>(
                type, nullable, total_num_rows);
        case DataType::STRING:
        case DataType::VARCHAR:
            return std::make_shared<FieldData<std::string>>(
                type, nullable, total_num_rows);
        case DataType::JSON:
            return std::make_shared<FieldData<Json>>(
                type, nullable, total_num_rows);
        case DataType::ARRAY:
            return std::make_shared<FieldData<Array>>(
                type, nullable, total_num_rows);
        case DataType::VECTOR_FLOAT:
            return std::make_shared<FieldData<FloatVector>>(
                dim, type, total_num_rows);
        case DataType::VECTOR_BINARY:
            return std::make_shared<FieldData<BinaryVector>>(
                dim, type, total_num_rows);
        case DataType::VECTOR_FLOAT16:
            return std::make_shared<FieldData<Float16Vector>>(
                dim, type, total_num_rows);
        case DataType::VECTOR_BFLOAT16:
            return std::make_shared<FieldData<BFloat16Vector>>(
                dim, type, total_num_rows);
        case DataType::VECTOR_SPARSE_FLOAT:
            return std::make_shared<FieldData<SparseFloatVector>>(
                type, total_num_rows);
        default:
            PanicInfo(DataTypeInvalid,
                      "CreateFieldData not support data type " +
                          GetDataTypeName(type));
    }
}

int64_t
GetByteSizeOfFieldDatas(const std::vector<FieldDataPtr>& field_datas) {
    int64_t result = 0;
    for (auto& data : field_datas) {
        result += data->Size();
    }
    return result;
}

std::vector<FieldDataPtr>
CollectFieldDataChannel(FieldDataChannelPtr& channel) {
    std::vector<FieldDataPtr> result;
    FieldDataPtr field_data;
    while (channel->pop(field_data)) {
        result.push_back(field_data);
    }
    return result;
}

FieldDataPtr
MergeFieldData(std::vector<FieldDataPtr>& data_array) {
    if (data_array.size() == 0) {
        return nullptr;
    }
    if (data_array.size() == 1) {
        return data_array[0];
    }
    size_t total_length = 0;
    for (const auto& data : data_array) {
        total_length += data->Length();
    }

    auto merged_data = storage::CreateFieldData(data_array[0]->get_data_type(),
                                                data_array[0]->IsNullable());
    merged_data->Reserve(total_length);
    for (const auto& data : data_array) {
        if (merged_data->IsNullable()) {
            merged_data->FillFieldData(
                data->Data(), data->ValidData(), data->Length());
        } else {
            merged_data->FillFieldData(data->Data(), data->Length());
        }
    }
    return merged_data;
}

std::vector<FieldDataPtr>
FetchFieldData(ChunkManager* cm,
               const std::vector<std::string>& remote_files) {
    std::vector<FieldDataPtr> field_datas;
    std::vector<std::string> batch_files;
    auto FetchRawData = [&]() {
        auto fds = GetObjectData(cm, batch_files);
        for (size_t i = 0; i < batch_files.size(); ++i) {
            auto data = fds[i].get()->GetFieldData();
            field_datas.emplace_back(data);
        }
    };
    auto parallel_degree =
        uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
    for (auto& file : remote_files) {
        if (batch_files.size() >= parallel_degree) {
            FetchRawData();
            batch_files.clear();
        }
        batch_files.emplace_back(file);
    }
    if (batch_files.size() > 0) {
        FetchRawData();
    }
    return field_datas;
}

}  // namespace milvus::storage
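
// Usage note (sketch): FetchFieldData above batches remote reads by
// parallel_degree and resolves each batch of futures before issuing the
// next; the resulting vector can be folded into one column via
// MergeFieldData, which preserves validity data when the inputs are
// nullable.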