diff --git a/internal/core/src/storage/ChunkCache.cpp b/internal/core/src/storage/ChunkCache.cpp index 4509b59fff..365563f205 100644 --- a/internal/core/src/storage/ChunkCache.cpp +++ b/internal/core/src/storage/ChunkCache.cpp @@ -15,32 +15,56 @@ // limitations under the License. #include "ChunkCache.h" +#include +#include #include "common/Types.h" -#include "mmap/Utils.h" namespace milvus::storage { std::shared_ptr ChunkCache::Read(const std::string& filepath, const MmapChunkDescriptorPtr& descriptor) { + // use rlock to get future { std::shared_lock lck(mutex_); auto it = columns_.find(filepath); if (it != columns_.end()) { - AssertInfo(it->second, "unexpected null column, file={}", filepath); - return it->second; + lck.unlock(); + auto result = it->second.second.get(); + AssertInfo(result, "unexpected null column, file={}", filepath); + return result; } } - auto field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath); - + // lock for mutation std::unique_lock lck(mutex_); + // double check no-futurn auto it = columns_.find(filepath); if (it != columns_.end()) { - return it->second; + lck.unlock(); + auto result = it->second.second.get(); + AssertInfo(result, "unexpected null column, file={}", filepath); + return result; } + + std::promise> p; + std::shared_future> f = p.get_future(); + columns_.emplace(filepath, std::make_pair(std::move(p), f)); + lck.unlock(); + + // release lock and perform download and decode + // other thread request same path shall get the future. + auto field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath); auto column = Mmap(field_data->GetFieldData(), descriptor); + + // set promise value to notify the future + lck.lock(); + it = columns_.find(filepath); + if (it != columns_.end()) { + // check pair exists then set value + it->second.first.set_value(column); + } + lck.unlock(); AssertInfo(column, "unexpected null column, file={}", filepath); - columns_.emplace(filepath, column); return column; } @@ -58,7 +82,7 @@ ChunkCache::Prefetch(const std::string& filepath) { return; } - auto column = it->second; + auto column = it->second.second.get(); auto ok = madvise( reinterpret_cast(const_cast(column->MmappedData())), column->ByteSize(), diff --git a/internal/core/src/storage/ChunkCache.h b/internal/core/src/storage/ChunkCache.h index d0c742bfaf..2af89386fe 100644 --- a/internal/core/src/storage/ChunkCache.h +++ b/internal/core/src/storage/ChunkCache.h @@ -15,6 +15,8 @@ // limitations under the License. #pragma once +#include +#include #include "storage/MmapChunkManager.h" #include "mmap/Column.h" @@ -60,8 +62,10 @@ class ChunkCache { CachePath(const std::string& filepath); private: - using ColumnTable = - std::unordered_map>; + using ColumnTable = std::unordered_map< + std::string, + std::pair>, + std::shared_future>>>; private: mutable std::shared_mutex mutex_;