// Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License #include "segcore/Utils.h" #include #include #include #include #include #include "common/Common.h" #include "common/FieldData.h" #include "common/Types.h" #include "index/ScalarIndex.h" #include "mmap/Utils.h" #include "log/Log.h" #include "storage/DataCodec.h" #include "storage/RemoteChunkManagerSingleton.h" #include "storage/ThreadPools.h" #include "storage/Util.h" namespace milvus::segcore { void ParsePksFromFieldData(std::vector& pks, const DataArray& data) { auto data_type = static_cast(data.type()); switch (data_type) { case DataType::INT64: { auto source_data = reinterpret_cast( data.scalars().long_data().data().data()); std::copy_n(source_data, pks.size(), pks.data()); break; } case DataType::VARCHAR: { auto& src_data = data.scalars().string_data().data(); std::copy(src_data.begin(), src_data.end(), pks.begin()); break; } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported PK {}", data_type)); } } } void ParsePksFromFieldData(DataType data_type, std::vector& pks, const std::vector& datas) { int64_t offset = 0; for (auto& field_data : datas) { AssertInfo(data_type == field_data->get_data_type(), "inconsistent data type when parse pk from field data"); int64_t row_count = field_data->get_num_rows(); switch (data_type) { case DataType::INT64: { std::copy_n(static_cast(field_data->Data()), row_count, pks.data() + offset); break; } case DataType::VARCHAR: { std::copy_n(static_cast(field_data->Data()), row_count, pks.data() + offset); break; } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported PK {}", data_type)); } } offset += row_count; } } void ParsePksFromIDs(std::vector& pks, DataType data_type, const IdArray& data) { switch (data_type) { case DataType::INT64: { auto source_data = reinterpret_cast(data.int_id().data().data()); std::copy_n(source_data, pks.size(), pks.data()); break; } case DataType::VARCHAR: { auto& source_data = data.str_id().data(); std::copy(source_data.begin(), source_data.end(), pks.begin()); break; } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported PK {}", data_type)); } } } int64_t GetSizeOfIdArray(const IdArray& data) { if (data.has_int_id()) { return data.int_id().data_size(); } if (data.has_str_id()) { return data.str_id().data_size(); } PanicInfo(DataTypeInvalid, fmt::format("unsupported id {}", data.descriptor()->name())); } int64_t GetRawDataSizeOfDataArray(const DataArray* data, const FieldMeta& field_meta, int64_t num_rows) { int64_t result = 0; auto data_type = field_meta.get_data_type(); if (!IsVariableDataType(data_type)) { result = field_meta.get_sizeof() * num_rows; } else { switch (data_type) { case DataType::STRING: case DataType::VARCHAR: case DataType::TEXT: { auto& string_data = FIELD_DATA(data, string); for (auto& str : string_data) { result += str.size(); } break; } case DataType::JSON: { auto& json_data = FIELD_DATA(data, json); for (auto& json_bytes : json_data) { result += json_bytes.size(); } break; } case DataType::ARRAY: { auto& array_data = FIELD_DATA(data, array); switch (field_meta.get_element_type()) { case DataType::BOOL: { for (auto& array_bytes : array_data) { result += array_bytes.bool_data().data_size() * sizeof(bool); } break; } case DataType::INT8: case DataType::INT16: case DataType::INT32: { for (auto& array_bytes : array_data) { result += array_bytes.int_data().data_size() * sizeof(int); } break; } case DataType::INT64: { for (auto& array_bytes : array_data) { result += array_bytes.long_data().data_size() * sizeof(int64_t); } break; } case DataType::FLOAT: { for (auto& array_bytes : array_data) { result += array_bytes.float_data().data_size() * sizeof(float); } break; } case DataType::DOUBLE: { for (auto& array_bytes : array_data) { result += array_bytes.double_data().data_size() * sizeof(double); } break; } case DataType::VARCHAR: case DataType::STRING: case DataType::TEXT: { for (auto& array_bytes : array_data) { auto element_num = array_bytes.string_data().data_size(); for (int i = 0; i < element_num; ++i) { result += array_bytes.string_data().data(i).size(); } } break; } default: PanicInfo( DataTypeInvalid, fmt::format("unsupported element type for array", field_meta.get_element_type())); } break; } case DataType::VECTOR_SPARSE_FLOAT: { // TODO(SPARSE, size) result += data->vectors().sparse_float_vector().ByteSizeLong(); break; } default: { PanicInfo( DataTypeInvalid, fmt::format("unsupported variable datatype {}", data_type)); } } } return result; } // Note: this is temporary solution. // modify bulk script implement to make process more clear std::unique_ptr CreateScalarDataArray(int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); data_array->set_type(static_cast( field_meta.get_data_type())); if (field_meta.is_nullable()) { data_array->mutable_valid_data()->Resize(count, false); } auto scalar_array = data_array->mutable_scalars(); switch (data_type) { case DataType::BOOL: { auto obj = scalar_array->mutable_bool_data(); obj->mutable_data()->Resize(count, false); break; } case DataType::INT8: { auto obj = scalar_array->mutable_int_data(); obj->mutable_data()->Resize(count, 0); break; } case DataType::INT16: { auto obj = scalar_array->mutable_int_data(); obj->mutable_data()->Resize(count, 0); break; } case DataType::INT32: { auto obj = scalar_array->mutable_int_data(); obj->mutable_data()->Resize(count, 0); break; } case DataType::INT64: { auto obj = scalar_array->mutable_long_data(); obj->mutable_data()->Resize(count, 0); break; } case DataType::FLOAT: { auto obj = scalar_array->mutable_float_data(); obj->mutable_data()->Resize(count, 0); break; } case DataType::DOUBLE: { auto obj = scalar_array->mutable_double_data(); obj->mutable_data()->Resize(count, 0); break; } case DataType::VARCHAR: case DataType::STRING: case DataType::TEXT: { auto obj = scalar_array->mutable_string_data(); obj->mutable_data()->Reserve(count); for (auto i = 0; i < count; i++) { *(obj->mutable_data()->Add()) = std::string(); } break; } case DataType::JSON: { auto obj = scalar_array->mutable_json_data(); obj->mutable_data()->Reserve(count); for (int i = 0; i < count; i++) { *(obj->mutable_data()->Add()) = std::string(); } break; } case DataType::ARRAY: { auto obj = scalar_array->mutable_array_data(); obj->mutable_data()->Reserve(count); obj->set_element_type(static_cast( field_meta.get_element_type())); for (int i = 0; i < count; i++) { *(obj->mutable_data()->Add()) = proto::schema::ScalarField(); } break; } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); } } return data_array; } std::unique_ptr CreateVectorDataArray(int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); data_array->set_type(static_cast( field_meta.get_data_type())); auto vector_array = data_array->mutable_vectors(); auto dim = 0; if (data_type != DataType::VECTOR_SPARSE_FLOAT) { dim = field_meta.get_dim(); vector_array->set_dim(dim); } switch (data_type) { case DataType::VECTOR_FLOAT: { auto length = count * dim; auto obj = vector_array->mutable_float_vector(); obj->mutable_data()->Resize(length, 0); break; } case DataType::VECTOR_BINARY: { AssertInfo(dim % 8 == 0, "Binary vector field dimension is not a multiple of 8"); auto num_bytes = count * dim / 8; auto obj = vector_array->mutable_binary_vector(); obj->resize(num_bytes); break; } case DataType::VECTOR_FLOAT16: { auto length = count * dim; auto obj = vector_array->mutable_float16_vector(); obj->resize(length * sizeof(float16)); break; } case DataType::VECTOR_BFLOAT16: { auto length = count * dim; auto obj = vector_array->mutable_bfloat16_vector(); obj->resize(length * sizeof(bfloat16)); break; } case DataType::VECTOR_SPARSE_FLOAT: { // does nothing here break; } case DataType::VECTOR_INT8: { auto length = count * dim; auto obj = vector_array->mutable_int8_vector(); obj->resize(length); break; } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); } } return data_array; } std::unique_ptr CreateScalarDataArrayFrom(const void* data_raw, const void* valid_data, int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); data_array->set_type(static_cast( field_meta.get_data_type())); if (field_meta.is_nullable()) { auto valid_data_ = reinterpret_cast(valid_data); auto obj = data_array->mutable_valid_data(); obj->Add(valid_data_, valid_data_ + count); } auto scalar_array = data_array->mutable_scalars(); switch (data_type) { case DataType::BOOL: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_bool_data(); obj->mutable_data()->Add(data, data + count); break; } case DataType::INT8: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_int_data(); obj->mutable_data()->Add(data, data + count); break; } case DataType::INT16: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_int_data(); obj->mutable_data()->Add(data, data + count); break; } case DataType::INT32: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_int_data(); obj->mutable_data()->Add(data, data + count); break; } case DataType::INT64: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_long_data(); obj->mutable_data()->Add(data, data + count); break; } case DataType::FLOAT: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_float_data(); obj->mutable_data()->Add(data, data + count); break; } case DataType::DOUBLE: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_double_data(); obj->mutable_data()->Add(data, data + count); break; } case DataType::VARCHAR: case DataType::TEXT: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_string_data(); for (auto i = 0; i < count; i++) { *(obj->mutable_data()->Add()) = data[i]; } break; } case DataType::JSON: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_json_data(); for (auto i = 0; i < count; i++) { *(obj->mutable_data()->Add()) = data[i]; } break; } case DataType::ARRAY: { auto data = reinterpret_cast(data_raw); auto obj = scalar_array->mutable_array_data(); obj->set_element_type(static_cast( field_meta.get_element_type())); for (auto i = 0; i < count; i++) { *(obj->mutable_data()->Add()) = data[i]; } break; } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); } } return data_array; } std::unique_ptr CreateVectorDataArrayFrom(const void* data_raw, int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); data_array->set_type(static_cast( field_meta.get_data_type())); auto vector_array = data_array->mutable_vectors(); auto dim = 0; if (!IsSparseFloatVectorDataType(data_type)) { dim = field_meta.get_dim(); vector_array->set_dim(dim); } switch (data_type) { case DataType::VECTOR_FLOAT: { auto length = count * dim; auto data = reinterpret_cast(data_raw); auto obj = vector_array->mutable_float_vector(); obj->mutable_data()->Add(data, data + length); break; } case DataType::VECTOR_BINARY: { AssertInfo(dim % 8 == 0, "Binary vector field dimension is not a multiple of 8"); auto num_bytes = count * dim / 8; auto data = reinterpret_cast(data_raw); auto obj = vector_array->mutable_binary_vector(); obj->assign(data, num_bytes); break; } case DataType::VECTOR_FLOAT16: { auto length = count * dim; auto data = reinterpret_cast(data_raw); auto obj = vector_array->mutable_float16_vector(); obj->assign(data, length * sizeof(float16)); break; } case DataType::VECTOR_BFLOAT16: { auto length = count * dim; auto data = reinterpret_cast(data_raw); auto obj = vector_array->mutable_bfloat16_vector(); obj->assign(data, length * sizeof(bfloat16)); break; } case DataType::VECTOR_SPARSE_FLOAT: { SparseRowsToProto( [&](size_t i) { return reinterpret_cast< const knowhere::sparse::SparseRow*>( data_raw) + i; }, count, vector_array->mutable_sparse_float_vector()); vector_array->set_dim(vector_array->sparse_float_vector().dim()); break; } case DataType::VECTOR_INT8: { auto length = count * dim; auto data = reinterpret_cast(data_raw); auto obj = vector_array->mutable_int8_vector(); obj->assign(data, length * sizeof(int8)); break; } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); } } return data_array; } std::unique_ptr CreateDataArrayFrom(const void* data_raw, const void* valid_data, int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); if (!IsVectorDataType(data_type)) { return CreateScalarDataArrayFrom( data_raw, valid_data, count, field_meta); } return CreateVectorDataArrayFrom(data_raw, count, field_meta); } // TODO remove merge dataArray, instead fill target entity when get data slice std::unique_ptr MergeDataArray(std::vector& merge_bases, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); auto nullable = field_meta.is_nullable(); data_array->set_type(static_cast( field_meta.get_data_type())); for (auto& merge_base : merge_bases) { auto src_field_data = merge_base.get_field_data(field_meta.get_id()); auto src_offset = merge_base.getOffset(); AssertInfo(data_type == DataType(src_field_data->type()), "merge field data type not consistent"); if (field_meta.is_vector()) { auto vector_array = data_array->mutable_vectors(); auto dim = 0; if (!IsSparseFloatVectorDataType(data_type)) { dim = field_meta.get_dim(); vector_array->set_dim(dim); } if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { auto data = VEC_FIELD_DATA(src_field_data, float).data(); auto obj = vector_array->mutable_float_vector(); obj->mutable_data()->Add(data + src_offset * dim, data + (src_offset + 1) * dim); } else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) { auto data = VEC_FIELD_DATA(src_field_data, float16); auto obj = vector_array->mutable_float16_vector(); obj->assign(data, dim * sizeof(float16)); } else if (field_meta.get_data_type() == DataType::VECTOR_BFLOAT16) { auto data = VEC_FIELD_DATA(src_field_data, bfloat16); auto obj = vector_array->mutable_bfloat16_vector(); obj->assign(data, dim * sizeof(bfloat16)); } else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) { AssertInfo( dim % 8 == 0, "Binary vector field dimension is not a multiple of 8"); auto num_bytes = dim / 8; auto data = VEC_FIELD_DATA(src_field_data, binary); auto obj = vector_array->mutable_binary_vector(); obj->assign(data + src_offset * num_bytes, num_bytes); } else if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) { auto src = src_field_data->vectors().sparse_float_vector(); auto dst = vector_array->mutable_sparse_float_vector(); if (src.dim() > dst->dim()) { dst->set_dim(src.dim()); } vector_array->set_dim(dst->dim()); *dst->mutable_contents() = src.contents(); } else if (field_meta.get_data_type() == DataType::VECTOR_INT8) { auto data = VEC_FIELD_DATA(src_field_data, int8); auto obj = vector_array->mutable_int8_vector(); obj->assign(data, dim * sizeof(int8)); } else { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); } continue; } if (nullable) { auto data = src_field_data->valid_data().data(); auto obj = data_array->mutable_valid_data(); *(obj->Add()) = data[src_offset]; } auto scalar_array = data_array->mutable_scalars(); switch (data_type) { case DataType::BOOL: { auto data = FIELD_DATA(src_field_data, bool).data(); auto obj = scalar_array->mutable_bool_data(); *(obj->mutable_data()->Add()) = data[src_offset]; break; } case DataType::INT8: case DataType::INT16: case DataType::INT32: { auto data = FIELD_DATA(src_field_data, int).data(); auto obj = scalar_array->mutable_int_data(); *(obj->mutable_data()->Add()) = data[src_offset]; break; } case DataType::INT64: { auto data = FIELD_DATA(src_field_data, long).data(); auto obj = scalar_array->mutable_long_data(); *(obj->mutable_data()->Add()) = data[src_offset]; break; } case DataType::FLOAT: { auto data = FIELD_DATA(src_field_data, float).data(); auto obj = scalar_array->mutable_float_data(); *(obj->mutable_data()->Add()) = data[src_offset]; break; } case DataType::DOUBLE: { auto data = FIELD_DATA(src_field_data, double).data(); auto obj = scalar_array->mutable_double_data(); *(obj->mutable_data()->Add()) = data[src_offset]; break; } case DataType::VARCHAR: case DataType::TEXT: { auto& data = FIELD_DATA(src_field_data, string); auto obj = scalar_array->mutable_string_data(); *(obj->mutable_data()->Add()) = data[src_offset]; break; } case DataType::JSON: { auto& data = FIELD_DATA(src_field_data, json); auto obj = scalar_array->mutable_json_data(); *(obj->mutable_data()->Add()) = data[src_offset]; break; } case DataType::ARRAY: { auto& data = FIELD_DATA(src_field_data, array); auto obj = scalar_array->mutable_array_data(); obj->set_element_type( proto::schema::DataType(field_meta.get_element_type())); *(obj->mutable_data()->Add()) = data[src_offset]; break; } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); } } } return data_array; } // TODO: split scalar IndexBase with knowhere::Index std::unique_ptr ReverseDataFromIndex(const index::IndexBase* index, const int64_t* seg_offsets, int64_t count, const FieldMeta& field_meta) { auto data_type = field_meta.get_data_type(); auto data_array = std::make_unique(); data_array->set_field_id(field_meta.get_id().get()); data_array->set_type(static_cast( field_meta.get_data_type())); auto nullable = field_meta.is_nullable(); std::vector valid_data; if (nullable) { valid_data.resize(count); } auto scalar_array = data_array->mutable_scalars(); switch (data_type) { case DataType::BOOL: { using IndexType = index::ScalarIndex; auto ptr = dynamic_cast(index); std::vector raw_data(count); for (int64_t i = 0; i < count; ++i) { auto raw = ptr->Reverse_Lookup(seg_offsets[i]); // if has no value, means nullable must be true, no need to check nullable again here if (!raw.has_value()) { valid_data[i] = false; continue; } if (nullable) { valid_data[i] = true; } raw_data[i] = raw.value(); } auto obj = scalar_array->mutable_bool_data(); *(obj->mutable_data()) = {raw_data.begin(), raw_data.end()}; break; } case DataType::INT8: { using IndexType = index::ScalarIndex; auto ptr = dynamic_cast(index); std::vector raw_data(count); for (int64_t i = 0; i < count; ++i) { auto raw = ptr->Reverse_Lookup(seg_offsets[i]); // if has no value, means nullable must be true, no need to check nullable again here if (!raw.has_value()) { valid_data[i] = false; continue; } if (nullable) { valid_data[i] = true; } raw_data[i] = raw.value(); } auto obj = scalar_array->mutable_int_data(); *(obj->mutable_data()) = {raw_data.begin(), raw_data.end()}; break; } case DataType::INT16: { using IndexType = index::ScalarIndex; auto ptr = dynamic_cast(index); std::vector raw_data(count); for (int64_t i = 0; i < count; ++i) { auto raw = ptr->Reverse_Lookup(seg_offsets[i]); // if has no value, means nullable must be true, no need to check nullable again here if (!raw.has_value()) { valid_data[i] = false; continue; } if (nullable) { valid_data[i] = true; } raw_data[i] = raw.value(); } auto obj = scalar_array->mutable_int_data(); *(obj->mutable_data()) = {raw_data.begin(), raw_data.end()}; break; } case DataType::INT32: { using IndexType = index::ScalarIndex; auto ptr = dynamic_cast(index); std::vector raw_data(count); for (int64_t i = 0; i < count; ++i) { auto raw = ptr->Reverse_Lookup(seg_offsets[i]); // if has no value, means nullable must be true, no need to check nullable again here if (!raw.has_value()) { valid_data[i] = false; continue; } if (nullable) { valid_data[i] = true; } raw_data[i] = raw.value(); } auto obj = scalar_array->mutable_int_data(); *(obj->mutable_data()) = {raw_data.begin(), raw_data.end()}; break; } case DataType::INT64: { using IndexType = index::ScalarIndex; auto ptr = dynamic_cast(index); std::vector raw_data(count); for (int64_t i = 0; i < count; ++i) { auto raw = ptr->Reverse_Lookup(seg_offsets[i]); // if has no value, means nullable must be true, no need to check nullable again here if (!raw.has_value()) { valid_data[i] = false; continue; } if (nullable) { valid_data[i] = true; } raw_data[i] = raw.value(); } auto obj = scalar_array->mutable_long_data(); *(obj->mutable_data()) = {raw_data.begin(), raw_data.end()}; break; } case DataType::FLOAT: { using IndexType = index::ScalarIndex; auto ptr = dynamic_cast(index); std::vector raw_data(count); for (int64_t i = 0; i < count; ++i) { auto raw = ptr->Reverse_Lookup(seg_offsets[i]); // if has no value, means nullable must be true, no need to check nullable again here if (!raw.has_value()) { valid_data[i] = false; continue; } if (nullable) { valid_data[i] = true; } raw_data[i] = raw.value(); } auto obj = scalar_array->mutable_float_data(); *(obj->mutable_data()) = {raw_data.begin(), raw_data.end()}; break; } case DataType::DOUBLE: { using IndexType = index::ScalarIndex; auto ptr = dynamic_cast(index); std::vector raw_data(count); for (int64_t i = 0; i < count; ++i) { auto raw = ptr->Reverse_Lookup(seg_offsets[i]); // if has no value, means nullable must be true, no need to check nullable again here if (!raw.has_value()) { valid_data[i] = false; continue; } if (nullable) { valid_data[i] = true; } raw_data[i] = raw.value(); } auto obj = scalar_array->mutable_double_data(); *(obj->mutable_data()) = {raw_data.begin(), raw_data.end()}; break; } case DataType::VARCHAR: { using IndexType = index::ScalarIndex; auto ptr = dynamic_cast(index); std::vector raw_data(count); for (int64_t i = 0; i < count; ++i) { auto raw = ptr->Reverse_Lookup(seg_offsets[i]); // if has no value, means nullable must be true, no need to check nullable again here if (!raw.has_value()) { valid_data[i] = false; continue; } if (nullable) { valid_data[i] = true; } raw_data[i] = raw.value(); } auto obj = scalar_array->mutable_string_data(); *(obj->mutable_data()) = {raw_data.begin(), raw_data.end()}; break; } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported datatype {}", data_type)); } } if (nullable) { *(data_array->mutable_valid_data()) = {valid_data.begin(), valid_data.end()}; } return data_array; } // init segcore storage config first, and create default remote chunk manager // segcore use default remote chunk manager to load data from minio/s3 void LoadArrowReaderFromRemote(const std::vector& remote_files, std::shared_ptr channel) { try { auto rcm = storage::RemoteChunkManagerSingleton::GetInstance() .GetRemoteChunkManager(); auto& pool = ThreadPools::GetThreadPool(ThreadPoolPriority::HIGH); std::vector>> futures; futures.reserve(remote_files.size()); for (const auto& file : remote_files) { auto future = pool.Submit([&]() { auto fileSize = rcm->Size(file); auto buf = std::shared_ptr(new uint8_t[fileSize]); rcm->Read(file, buf.get(), fileSize); auto result = storage::DeserializeFileData(buf, fileSize, false); result->SetData(buf); return result->GetReader(); }); futures.emplace_back(std::move(future)); } for (auto& future : futures) { auto field_data = future.get(); channel->push(field_data); } channel->close(); } catch (std::exception& e) { LOG_INFO("failed to load data from remote: {}", e.what()); channel->close(std::current_exception()); } } void LoadFieldDatasFromRemote(const std::vector& remote_files, FieldDataChannelPtr channel) { try { auto rcm = storage::RemoteChunkManagerSingleton::GetInstance() .GetRemoteChunkManager(); auto& pool = ThreadPools::GetThreadPool(ThreadPoolPriority::HIGH); std::vector> futures; futures.reserve(remote_files.size()); for (const auto& file : remote_files) { auto future = pool.Submit([rcm, file]() { auto fileSize = rcm->Size(file); auto buf = std::shared_ptr(new uint8_t[fileSize]); rcm->Read(file, buf.get(), fileSize); auto result = storage::DeserializeFileData(buf, fileSize); return result->GetFieldData(); }); futures.emplace_back(std::move(future)); } for (auto& future : futures) { auto field_data = future.get(); channel->push(field_data); } channel->close(); } catch (std::exception& e) { LOG_INFO("failed to load data from remote: {}", e.what()); channel->close(std::current_exception()); } } int64_t upper_bound(const ConcurrentVector& timestamps, int64_t first, int64_t last, Timestamp value) { int64_t mid; while (first < last) { mid = first + (last - first) / 2; if (value >= timestamps[mid]) { first = mid + 1; } else { last = mid; } } return first; } } // namespace milvus::segcore