diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 9b51b0321f..61d090bcc7 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -14,16 +14,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include #include #include "common/Common.h" +#include "common/Consts.h" +#include "common/EasyAssert.h" +#include "common/File.h" #include "common/Slice.h" #include "log/Log.h" #include "storage/DiskFileManagerImpl.h" +#include "storage/FieldData.h" +#include "storage/FieldDataInterface.h" #include "storage/FileManager.h" #include "storage/IndexData.h" #include "storage/LocalChunkManagerSingleton.h" @@ -120,6 +126,7 @@ DiskFileManagerImpl::AddBatchIndexFiles( auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::HIGH); std::vector>> futures; + futures.reserve(remote_file_sizes.size()); AssertInfo(local_file_offsets.size() == remote_files.size(), "inconsistent size of offset slices with file slices"); AssertInfo(remote_files.size() == remote_file_sizes.size(), @@ -167,7 +174,7 @@ DiskFileManagerImpl::CacheIndexToDisk( std::map> index_slices; for (auto& file_path : remote_files) { - auto pos = file_path.find_last_of("_"); + auto pos = file_path.find_last_of('_'); index_slices[file_path.substr(0, pos)].emplace_back( std::stoi(file_path.substr(pos + 1))); } @@ -176,39 +183,30 @@ DiskFileManagerImpl::CacheIndexToDisk( std::sort(slices.second.begin(), slices.second.end()); } - auto EstimateParallelDegree = [&](const std::string& file) -> uint64_t { - auto fileSize = rcm_->Size(file); - return uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / fileSize); - }; - for (auto& slices : index_slices) { auto prefix = slices.first; auto local_index_file_name = GetLocalIndexObjectPrefix() + prefix.substr(prefix.find_last_of('/') + 1); local_chunk_manager->CreateFile(local_index_file_name); - int64_t offset = 0; + auto file = + File::Open(local_index_file_name, O_CREAT | O_RDWR | O_TRUNC); + + // Get the remote files std::vector batch_remote_files; - uint64_t max_parallel_degree = INT_MAX; + batch_remote_files.reserve(slices.second.size()); for (int& iter : slices.second) { - if (batch_remote_files.size() == max_parallel_degree) { - auto next_offset = CacheBatchIndexFilesToDisk( - batch_remote_files, local_index_file_name, offset); - offset = next_offset; - batch_remote_files.clear(); - } auto origin_file = prefix + "_" + std::to_string(iter); - if (batch_remote_files.size() == 0) { - // Use first file size as average size to estimate - max_parallel_degree = EstimateParallelDegree(origin_file); - } batch_remote_files.push_back(origin_file); } - if (batch_remote_files.size() > 0) { - auto next_offset = CacheBatchIndexFilesToDisk( - batch_remote_files, local_index_file_name, offset); - offset = next_offset; - batch_remote_files.clear(); + + auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files); + for (auto& chunk : index_chunks) { + auto index_data = chunk.get()->GetFieldData(); + auto index_size = index_data->Size(); + auto chunk_data = reinterpret_cast( + const_cast(index_data->Data())); + file.Write(chunk_data, index_size); } local_paths_.emplace_back(local_index_file_name); }