milvus/internal/core/src/storage/MinioChunkManager.cpp
Enwei Jiao 66a5efeb3d
Fix load failed with DiskANN (#23760)
Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
2023-04-27 16:56:35 +08:00

438 lines
14 KiB
C++

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "storage/MinioChunkManager.h"
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/auth/STSCredentialsProvider.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/DeleteBucketRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadBucketRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <fstream>
#include "storage/AliyunSTSClient.h"
#include "storage/AliyunCredentialsProvider.h"
#include "exceptions/EasyAssert.h"
#include "log/Log.h"
// Formats "Error:<FUNCTION>:<exception name> <message>" from a failed S3
// outcome and throws it as S3ErrorException.
// The macro expands in place and reads a variable literally named `outcome`,
// so the call site must have such a variable (exposing GetError()) in scope.
#define THROWS3ERROR(FUNCTION) \
do { \
auto& err = outcome.GetError(); \
std::stringstream err_msg; \
err_msg << "Error:" << #FUNCTION << ":" << err.GetExceptionName() \
<< " " << err.GetMessage(); \
throw S3ErrorException(err_msg.str()); \
} while (0)
// Exception name that DeleteBucket compares a failed outcome against.
#define S3NoSuchBucket "NoSuchBucket"
namespace milvus::storage {
// Counter incremented by InitSDKAPI and decremented by ShutdownSDKAPI; the AWS
// SDK is initialised when it leaves zero and shut down when it returns to zero.
std::atomic<size_t> MinioChunkManager::init_count_(0);
// Mutex held by InitSDKAPI and ShutdownSDKAPI while they update init_count_.
std::mutex MinioChunkManager::client_mutex_;
/**
 * @brief Build an Aws::String holding the same bytes as a std::string.
 *
 * The characters are copied into the new Aws::String.
 * TODO: remove this conversion
 *
 * @param str source string
 * @return Aws::String copy of @p str
 */
inline Aws::String
ConvertToAwsString(const std::string& str) {
    Aws::String converted(str.data(), str.length());
    return converted;
}
/**
 * @brief Build a std::string holding the same bytes as an Aws::String.
 *
 * @param aws_str source string
 * @return std::string copy of @p aws_str
 */
inline std::string
ConvertFromAwsString(const Aws::String& aws_str) {
    std::string converted(aws_str.data(), aws_str.length());
    return converted;
}
/**
 * @brief Register one more user of the AWS SDK, initialising it on first use.
 *
 * Runs under client_mutex_. Only the call that observes a zero counter
 * enables the SIGPIPE handler option and invokes Aws::InitAPI.
 *
 * @param type remote storage flavour; not consulted by this function
 */
void
MinioChunkManager::InitSDKAPI(RemoteStorageType type) {
    std::scoped_lock guard{client_mutex_};
    const size_t users_before = init_count_.fetch_add(1);
    if (users_before != 0) {
        return;
    }
    sdk_options_.httpOptions.installSigPipeHandler = true;
    Aws::InitAPI(sdk_options_);
}
void
MinioChunkManager::ShutdownSDKAPI() {
std::scoped_lock lock{client_mutex_};
const size_t initCount = --init_count_;
if (initCount == 0) {
Aws::ShutdownAPI(sdk_options_);
}
}
/**
 * @brief Create the S3 client and store it in client_.
 *
 * Without IAM the client is built from the access key pair carried by
 * @p storage_config. With IAM the credentials come from
 * Aws::Auth::DefaultAWSCredentialsProviderChain and are checked to be
 * non-empty (key id, secret key and session token) before the client is
 * created.
 *
 * @param storage_config storage settings (useIAM, access_key_id, access_key_value)
 * @param config AWS client configuration passed to the S3 client
 */
void
MinioChunkManager::BuildS3Client(
    const StorageConfig& storage_config,
    const Aws::Client::ClientConfiguration& config) {
    if (!storage_config.useIAM) {
        // Static credentials: both halves of the key pair are mandatory.
        AssertInfo(!storage_config.access_key_id.empty(),
                   "if not use iam, access key should not be empty");
        AssertInfo(!storage_config.access_key_value.empty(),
                   "if not use iam, access value should not be empty");
        const Aws::Auth::AWSCredentials static_credentials(
            ConvertToAwsString(storage_config.access_key_id),
            ConvertToAwsString(storage_config.access_key_value));
        client_ = std::make_shared<Aws::S3::S3Client>(
            static_credentials,
            config,
            Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
            false);
        return;
    }

    // IAM: resolve credentials through the default provider chain and make
    // sure it actually produced something usable.
    auto credentials_chain =
        std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
    const auto resolved = credentials_chain->GetAWSCredentials();
    AssertInfo(!resolved.GetAWSAccessKeyId().empty(),
               "if use iam, access key id should not be empty");
    AssertInfo(!resolved.GetAWSSecretKey().empty(),
               "if use iam, secret key should not be empty");
    AssertInfo(!resolved.GetSessionToken().empty(),
               "if use iam, token should not be empty");
    client_ = std::make_shared<Aws::S3::S3Client>(
        credentials_chain,
        config,
        Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
        false);
}
/**
 * @brief Create the S3 client for Aliyun cloud storage and store it in client_.
 *
 * Only IAM mode is supported: credentials are obtained from
 * AliyunSTSAssumeRoleWebIdentityCredentialsProvider and checked to be
 * non-empty (key id, secret key and session token).
 *
 * @param storage_config storage settings (useIAM)
 * @param config AWS client configuration passed to the S3 client
 * @throws std::runtime_error when IAM mode is not enabled
 */
void
MinioChunkManager::BuildAliyunCloudClient(
    const StorageConfig& storage_config,
    const Aws::Client::ClientConfiguration& config) {
    if (!storage_config.useIAM) {
        throw std::runtime_error("aliyun cloud only support iam mode now");
    }

    auto sts_provider = Aws::MakeShared<
        Aws::Auth::AliyunSTSAssumeRoleWebIdentityCredentialsProvider>(
        "AliyunSTSAssumeRoleWebIdentityCredentialsProvider");
    const auto resolved = sts_provider->GetAWSCredentials();
    AssertInfo(!resolved.GetAWSAccessKeyId().empty(),
               "if use iam, access key id should not be empty");
    AssertInfo(!resolved.GetAWSSecretKey().empty(),
               "if use iam, secret key should not be empty");
    AssertInfo(!resolved.GetSessionToken().empty(),
               "if use iam, token should not be empty");
    client_ = std::make_shared<Aws::S3::S3Client>(
        sts_provider,
        config,
        Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
        true);
}
/**
 * @brief Construct a chunk manager bound to the bucket named in the config.
 *
 * The storage flavour is chosen from the endpoint address: an address
 * containing "aliyun" selects ALIYUN_CLOUD, anything else selects S3. The AWS
 * SDK is initialised (reference counted), the client configuration is filled
 * from the endpoint and SSL settings, and the matching client is built.
 *
 * @param storage_config endpoint address, bucket name, SSL and credential settings
 */
MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config)
    : default_bucket_name_(storage_config.bucket_name) {
    const bool is_aliyun =
        storage_config.address.find("aliyun") != std::string::npos;
    const RemoteStorageType storageType =
        is_aliyun ? RemoteStorageType::ALIYUN_CLOUD : RemoteStorageType::S3;

    InitSDKAPI(storageType);

    Aws::Client::ClientConfiguration config;
    config.endpointOverride = ConvertToAwsString(storage_config.address);
    // Certificate verification is switched on exactly when HTTPS is used.
    const bool secure = storage_config.useSSL ? true : false;
    config.scheme =
        secure ? Aws::Http::Scheme::HTTPS : Aws::Http::Scheme::HTTP;
    config.verifySSL = secure;

    switch (storageType) {
        case RemoteStorageType::S3:
            BuildS3Client(storage_config, config);
            break;
        case RemoteStorageType::ALIYUN_CLOUD:
            BuildAliyunCloudClient(storage_config, config);
            break;
        default:
            break;
    }

    // TODO: BucketExists and CreateBucket do not work here and should be fixed.
    // For now the bucket is not created at construction time:
    //  - index node already tries to create the bucket when it receives an
    //    index task and the bucket does not exist
    //  - query node already tries to create the bucket during its init stage
    //    if the bucket does not exist
    // if (!BucketExists(storage_config.bucket_name)) {
    //     CreateBucket(storage_config.bucket_name);
    // }

    LOG_SEGCORE_INFO_ << "init MinioChunkManager with parameter[endpoint: '"
                      << storage_config.address << "', default_bucket_name:'"
                      << storage_config.bucket_name << "', use_secure:'"
                      << std::boolalpha << storage_config.useSSL << "']";
}
/**
 * @brief Release the S3 client, then drop this instance's reference on the SDK.
 *
 * The client must be destroyed BEFORE ShutdownSDKAPI(): when this is the last
 * manager alive, ShutdownSDKAPI() calls Aws::ShutdownAPI, and SDK objects such
 * as the S3 client must not be torn down after the SDK has been shut down.
 */
MinioChunkManager::~MinioChunkManager() {
    client_.reset();
    ShutdownSDKAPI();
}
uint64_t
MinioChunkManager::Size(const std::string& filepath) {
return GetObjectSize(default_bucket_name_, filepath);
}
/**
 * @brief Check whether an object exists in the default bucket.
 *
 * @param filepath object key
 * @return result of ObjectExists for the default bucket
 */
bool
MinioChunkManager::Exist(const std::string& filepath) {
    const bool found = ObjectExists(default_bucket_name_, filepath);
    return found;
}
/**
 * @brief Delete an object from the default bucket.
 *
 * The boolean returned by DeleteObject is intentionally not propagated.
 *
 * @param filepath object key
 */
void
MinioChunkManager::Remove(const std::string& filepath) {
    static_cast<void>(DeleteObject(default_bucket_name_, filepath));
}
/**
 * @brief List object keys in the default bucket that start with a prefix.
 *
 * @param filepath key prefix handed to ListObjects
 * @return keys returned by ListObjects
 */
std::vector<std::string>
MinioChunkManager::ListWithPrefix(const std::string& filepath) {
    const char* bucket = default_bucket_name_.c_str();
    const char* key_prefix = filepath.c_str();
    return ListObjects(bucket, key_prefix);
}
/**
 * @brief Read an object from the default bucket into a caller-supplied buffer.
 *
 * @param filepath object key
 * @param buf destination buffer
 * @param size size passed on to GetObjectBuffer together with @p buf
 * @return value returned by GetObjectBuffer
 * @throws ObjectNotExistException if ObjectExists reports the key is absent
 */
uint64_t
MinioChunkManager::Read(const std::string& filepath, void* buf, uint64_t size) {
    if (ObjectExists(default_bucket_name_, filepath)) {
        return GetObjectBuffer(default_bucket_name_, filepath, buf, size);
    }

    std::stringstream missing_msg;
    missing_msg << "object('" << default_bucket_name_ << "', " << filepath
                << "') not exists";
    throw ObjectNotExistException(missing_msg.str());
}
/**
 * @brief Upload a buffer as an object in the default bucket.
 *
 * The boolean returned by PutObjectBuffer is intentionally not propagated.
 *
 * @param filepath object key
 * @param buf source buffer
 * @param size number of bytes of @p buf to upload
 */
void
MinioChunkManager::Write(const std::string& filepath,
                         void* buf,
                         uint64_t size) {
    static_cast<void>(
        PutObjectBuffer(default_bucket_name_, filepath, buf, size));
}
bool
MinioChunkManager::BucketExists(const std::string& bucket_name) {
// auto outcome = client_->ListBuckets();
// if (!outcome.IsSuccess()) {
// THROWS3ERROR(BucketExists);
// }
// for (auto&& b : outcome.GetResult().GetBuckets()) {
// if (ConvertFromAwsString(b.GetName()) == bucket_name) {
// return true;
// }
// }
Aws::S3::Model::HeadBucketRequest request;
request.SetBucket(bucket_name.c_str());
auto outcome = client_->HeadBucket(request);
if (!outcome.IsSuccess()) {
auto error = outcome.GetError();
if (!error.GetExceptionName().empty()) {
std::stringstream err_msg;
err_msg << "Error: BucketExists: "
<< error.GetExceptionName() + " - " + error.GetMessage();
throw S3ErrorException(err_msg.str());
}
return false;
}
return true;
}
/**
 * @brief List the names of all buckets returned by the ListBuckets call.
 *
 * @return bucket names in the order the service returned them
 * @throws S3ErrorException if the ListBuckets call fails
 */
std::vector<std::string>
MinioChunkManager::ListBuckets() {
    std::vector<std::string> buckets;
    auto outcome = client_->ListBuckets();
    if (!outcome.IsSuccess()) {
        // Label the error with this function's name (it previously reported
        // "CreateBucket", which pointed at the wrong call).
        THROWS3ERROR(ListBuckets);
    }
    for (const auto& b : outcome.GetResult().GetBuckets()) {
        buckets.emplace_back(b.GetName().c_str());
    }
    return buckets;
}
bool
MinioChunkManager::CreateBucket(const std::string& bucket_name) {
Aws::S3::Model::CreateBucketRequest request;
request.SetBucket(bucket_name.c_str());
auto outcome = client_->CreateBucket(request);
if (!outcome.IsSuccess() &&
Aws::S3::S3Errors(outcome.GetError().GetErrorType()) !=
Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU) {
THROWS3ERROR(CreateBucket);
}
return true;
}
bool
MinioChunkManager::DeleteBucket(const std::string& bucket_name) {
Aws::S3::Model::DeleteBucketRequest request;
request.SetBucket(bucket_name.c_str());
auto outcome = client_->DeleteBucket(request);
if (!outcome.IsSuccess()) {
auto err = outcome.GetError();
if (err.GetExceptionName() != S3NoSuchBucket) {
THROWS3ERROR(DeleteBucket);
}
return false;
}
return true;
}
bool
MinioChunkManager::ObjectExists(const std::string& bucket_name,
const std::string& object_name) {
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket_name.c_str());
request.SetKey(object_name.c_str());
auto outcome = client_->HeadObject(request);
if (!outcome.IsSuccess()) {
auto& err = outcome.GetError();
if (!err.GetExceptionName().empty()) {
std::stringstream err_msg;
err_msg << "Error: ObjectExists: " << err.GetMessage();
throw S3ErrorException(err_msg.str());
}
return false;
}
return true;
}
int64_t
MinioChunkManager::GetObjectSize(const std::string& bucket_name,
const std::string& object_name) {
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket_name.c_str());
request.SetKey(object_name.c_str());
auto outcome = client_->HeadObject(request);
if (!outcome.IsSuccess()) {
THROWS3ERROR(GetObjectSize);
}
return outcome.GetResult().GetContentLength();
}
bool
MinioChunkManager::DeleteObject(const std::string& bucket_name,
const std::string& object_name) {
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket_name.c_str());
request.SetKey(object_name.c_str());
auto outcome = client_->DeleteObject(request);
if (!outcome.IsSuccess()) {
// auto err = outcome.GetError();
// std::stringstream err_msg;
// err_msg << "Error: DeleteObject:" << err.GetMessage();
// throw S3ErrorException(err_msg.str());
THROWS3ERROR(DeleteObject);
}
return true;
}
bool
MinioChunkManager::PutObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size) {
Aws::S3::Model::PutObjectRequest request;
request.SetBucket(bucket_name.c_str());
request.SetKey(object_name.c_str());
const std::shared_ptr<Aws::IOStream> input_data =
Aws::MakeShared<Aws::StringStream>("");
input_data->write(reinterpret_cast<char*>(buf), size);
request.SetBody(input_data);
auto outcome = client_->PutObject(request);
if (!outcome.IsSuccess()) {
THROWS3ERROR(PutObjectBuffer);
}
return true;
}
uint64_t
MinioChunkManager::GetObjectBuffer(const std::string& bucket_name,
const std::string& object_name,
void* buf,
uint64_t size) {
Aws::S3::Model::GetObjectRequest request;
request.SetBucket(bucket_name.c_str());
request.SetKey(object_name.c_str());
request.SetResponseStreamFactory([buf, size]() {
std::unique_ptr<Aws::StringStream> stream(
Aws::New<Aws::StringStream>(""));
stream->rdbuf()->pubsetbuf(static_cast<char*>(buf), size);
return stream.release();
});
auto outcome = client_->GetObject(request);
if (!outcome.IsSuccess()) {
THROWS3ERROR(GetObjectBuffer);
}
return size;
}
/**
 * @brief List the keys of every object in a bucket, optionally filtered by prefix.
 *
 * A single ListObjects response can be truncated, so the request is re-issued
 * with the marker advanced to the last key received until the service reports
 * that the listing is complete. Previously only the first page was returned.
 *
 * @param bucket_name bucket to list
 * @param prefix key prefix filter; nullptr applies no prefix
 * @return object keys in the order the service returned them
 * @throws S3ErrorException if any ListObjects call fails
 */
std::vector<std::string>
MinioChunkManager::ListObjects(const char* bucket_name, const char* prefix) {
    std::vector<std::string> objects_vec;
    Aws::S3::Model::ListObjectsRequest request;
    request.WithBucket(bucket_name);
    if (prefix != nullptr) {
        request.SetPrefix(prefix);
    }

    bool has_more = true;
    while (has_more) {
        auto outcome = client_->ListObjects(request);
        if (!outcome.IsSuccess()) {
            THROWS3ERROR(ListObjects);
        }

        const auto& result = outcome.GetResult();
        // Bind by reference: copying the whole page of objects is unnecessary.
        const auto& page = result.GetContents();
        for (const auto& obj : page) {
            objects_vec.emplace_back(obj.GetKey().c_str());
        }

        // No delimiter is set, so NextMarker is not provided by the service;
        // the last key of the page is the marker for the next request. The
        // empty-page check guards against looping forever on a truncated
        // response that carries no keys.
        has_more = result.GetIsTruncated() && !page.empty();
        if (has_more) {
            request.SetMarker(page.back().GetKey());
        }
    }
    return objects_vec;
}
} // namespace milvus::storage