fix: [2.6]Ensure fulfill promise when CreateArrowFileSystem throw an exception (#44976)

issue: #44974 

master pr: #44975

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2025-10-21 10:14:03 +08:00 committed by GitHub
parent 6476b1dbcf
commit 09880e8e6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -33,41 +33,54 @@ StorageV2FSCache::Get(const Key& key) {
std::promise<milvus_storage::ArrowFileSystemPtr> p;
std::shared_future<milvus_storage::ArrowFileSystemPtr> f = p.get_future();
auto iter = concurrent_map_.emplace(key, Value(std::move(p), f));
if (!iter.second) {
return iter.first->second.second.get();
auto [iter, inserted] =
concurrent_map_.emplace(key, Value(std::move(p), f));
if (!inserted) {
return iter->second.second.get();
}
// singleflight create fs for one key
milvus_storage::ArrowFileSystemConfig conf;
conf.address = std::string(key.address);
conf.bucket_name = std::string(key.bucket_name);
conf.access_key_id = std::string(key.access_key_id);
conf.access_key_value = std::string(key.access_key_value);
conf.root_path = std::string(key.root_path);
conf.storage_type = std::string(key.storage_type);
conf.cloud_provider = std::string(key.cloud_provider);
conf.iam_endpoint = std::string(key.iam_endpoint);
conf.log_level = std::string(key.log_level);
conf.region = std::string(key.region);
conf.use_ssl = key.useSSL;
conf.ssl_ca_cert = std::string(key.sslCACert);
conf.use_iam = key.useIAM;
conf.use_virtual_host = key.useVirtualHost;
conf.request_timeout_ms = key.requestTimeoutMs;
conf.gcp_credential_json = std::string(key.gcp_credential_json);
conf.use_custom_part_upload = key.use_custom_part_upload;
auto result = milvus_storage::CreateArrowFileSystem(conf);
try {
milvus_storage::ArrowFileSystemConfig conf;
conf.address = std::string(key.address);
conf.bucket_name = std::string(key.bucket_name);
conf.access_key_id = std::string(key.access_key_id);
conf.access_key_value = std::string(key.access_key_value);
conf.root_path = std::string(key.root_path);
conf.storage_type = std::string(key.storage_type);
conf.cloud_provider = std::string(key.cloud_provider);
conf.iam_endpoint = std::string(key.iam_endpoint);
conf.log_level = std::string(key.log_level);
conf.region = std::string(key.region);
conf.use_ssl = key.useSSL;
conf.ssl_ca_cert = std::string(key.sslCACert);
conf.use_iam = key.useIAM;
conf.use_virtual_host = key.useVirtualHost;
conf.request_timeout_ms = key.requestTimeoutMs;
conf.gcp_credential_json = std::string(key.gcp_credential_json);
conf.use_custom_part_upload = key.use_custom_part_upload;
auto result = milvus_storage::CreateArrowFileSystem(conf);
if (!result.ok()) {
iter->second.first.set_value(nullptr);
std::unique_lock lck(mutex_);
concurrent_map_.unsafe_erase(iter);
return nullptr;
}
auto fs = result.value();
iter->second.first.set_value(fs);
return fs;
} catch (...) {
try {
iter->second.first.set_exception(std::current_exception());
} catch (...) {
}
if (!result.ok()) {
iter.first->second.first.set_value(nullptr);
std::unique_lock lck(mutex_);
concurrent_map_.unsafe_erase(iter.first);
concurrent_map_.unsafe_erase(iter);
return nullptr;
}
auto fs = result.value();
iter.first->second.first.set_value(fs);
return fs;
}
} // namespace milvus::storage