diff --git a/internal/core/src/common/init_c.cpp b/internal/core/src/common/init_c.cpp index ce1abf1654..b975732aea 100644 --- a/internal/core/src/common/init_c.cpp +++ b/internal/core/src/common/init_c.cpp @@ -19,46 +19,6 @@ #include #include "config/ConfigChunkManager.h" -void -MinioAddressInit(const char* address) { - std::string minio_address(address); - milvus::ChunkMangerConfig::SetAddress(address); -} - -void -MinioAccessKeyInit(const char* key) { - std::string minio_access_key(key); - milvus::ChunkMangerConfig::SetAccessKey(minio_access_key); -} - -void -MinioAccessValueInit(const char* value) { - std::string minio_access_value(value); - milvus::ChunkMangerConfig::SetAccessValue(value); -} - -void -MinioSSLInit(bool use_ssl) { - milvus::ChunkMangerConfig::SetUseSSL(use_ssl); -} - -void -MinioUseIamInit(bool use_iam) { - milvus::ChunkMangerConfig::SetUseIAM(use_iam); -} - -void -MinioBucketNameInit(const char* name) { - std::string bucket_name(name); - milvus::ChunkMangerConfig::SetBucketName(name); -} - -void -MinioRootPathInit(const char* name) { - std::string root_path(name); - milvus::ChunkMangerConfig::SetRemoteRootPath(name); -} - void LocalRootPathInit(const char* root_path) { std::string local_path_root(root_path); diff --git a/internal/core/src/common/init_c.h b/internal/core/src/common/init_c.h index 66411fc8c9..238f5da906 100644 --- a/internal/core/src/common/init_c.h +++ b/internal/core/src/common/init_c.h @@ -22,27 +22,6 @@ extern "C" { #include -void -MinioAddressInit(const char*); - -void -MinioAccessKeyInit(const char*); - -void -MinioAccessValueInit(const char*); - -void -MinioSSLInit(bool use_ssl); - -void -MinioUseIamInit(bool use_iam); - -void -MinioBucketNameInit(const char*); - -void -MinioRootPathInit(const char*); - void LocalRootPathInit(const char*); diff --git a/internal/core/src/common/type_c.h b/internal/core/src/common/type_c.h index 470af5afd8..8946ab2c45 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -15,6 +15,7 @@ // limitations under the License. #pragma once +#include #include #ifdef __cplusplus @@ -82,6 +83,18 @@ typedef struct CLoadDeletedRecordInfo { int64_t row_count; } CLoadDeletedRecordInfo; +typedef struct CStorageConfig { + const char* address; + const char* bucket_name; + const char* access_key_id; + const char* access_key_value; + const char* remote_root_path; + const char* storage_type; + const char* iam_endpoint; + bool useSSL; + bool useIAM; +} CStorageConfig; + #ifdef __cplusplus } #endif diff --git a/internal/core/src/config/ConfigChunkManager.cpp b/internal/core/src/config/ConfigChunkManager.cpp index 561d99a4d1..87de8a0f7f 100644 --- a/internal/core/src/config/ConfigChunkManager.cpp +++ b/internal/core/src/config/ConfigChunkManager.cpp @@ -18,84 +18,7 @@ namespace milvus::ChunkMangerConfig { -std::string REMOTE_ADDRESS = "localhost:9000"; // NOLINT -std::string REMOTE_ACCESS_KEY = "minioadmin"; // NOLINT -std::string REMOTE_ACCESS_VALUE = "minioadmin"; // NOLINT -std::string REMOTE_BUCKET_NAME = "a-bucket"; // NOLINT -std::string REMOTE_ROOT_PATH = "files"; // NOLINT -std::string LOCAL_ROOT_PATH = "/tmp/milvus"; // NOLINT -bool MINIO_USE_SSL = false; -bool MINIO_USE_IAM = false; - -void -SetAddress(const std::string& address) { - REMOTE_ADDRESS = address; -} - -std::string -GetAddress() { - return REMOTE_ADDRESS; -} - -void -SetAccessKey(const std::string& access_key) { - REMOTE_ACCESS_KEY = access_key; -} - -std::string -GetAccessKey() { - return REMOTE_ACCESS_KEY; -} - -void -SetAccessValue(const std::string& access_value) { - REMOTE_ACCESS_VALUE = access_value; -} - -std::string -GetAccessValue() { - return REMOTE_ACCESS_VALUE; -} - -void -SetBucketName(const std::string& bucket_name) { - REMOTE_BUCKET_NAME = bucket_name; -} - -std::string -GetBucketName() { - return REMOTE_BUCKET_NAME; -} - -void -SetUseSSL(bool use_ssl) { - MINIO_USE_SSL = use_ssl; -} - -bool -GetUseSSL() { - return MINIO_USE_SSL; -} - -void -SetUseIAM(bool use_iam) { - MINIO_USE_IAM = use_iam; -} - -bool -GetUseIAM() { - return MINIO_USE_IAM; -} - -void -SetRemoteRootPath(const std::string& root_path) { - REMOTE_ROOT_PATH = root_path; -} - -std::string -GetRemoteRootPath() { - return REMOTE_ROOT_PATH; -} +std::string LOCAL_ROOT_PATH = "/tmp/milvus"; // NOLINT void SetLocalRootPath(const std::string& path_prefix) { diff --git a/internal/core/src/config/ConfigChunkManager.h b/internal/core/src/config/ConfigChunkManager.h index ebdaed586d..40ccec2dee 100644 --- a/internal/core/src/config/ConfigChunkManager.h +++ b/internal/core/src/config/ConfigChunkManager.h @@ -20,48 +20,6 @@ namespace milvus::ChunkMangerConfig { -void -SetAddress(const std::string& address); - -std::string -GetAddress(); - -void -SetAccessKey(const std::string& access_key); - -std::string -GetAccessKey(); - -void -SetAccessValue(const std::string& access_value); - -std::string -GetAccessValue(); - -void -SetUseSSL(bool use_ssl); - -bool -GetUseSSL(); - -void -SetUseIAM(bool use_iam); - -bool -GetUseIAM(); - -void -SetBucketName(const std::string& bucket_name); - -std::string -GetBucketName(); - -void -SetRemoteRootPath(const std::string& path_prefix); - -std::string -GetRemoteRootPath(); - void SetLocalRootPath(const std::string& path_prefix); diff --git a/internal/core/src/index/IndexInfo.h b/internal/core/src/index/IndexInfo.h index 66001a0c42..f68c7e95fc 100644 --- a/internal/core/src/index/IndexInfo.h +++ b/internal/core/src/index/IndexInfo.h @@ -16,31 +16,10 @@ #pragma once -#include -#include -#include -#include - #include "common/Types.h" -#include "common/type_c.h" -#include "index/Index.h" namespace milvus::index { -struct LoadIndexInfo { - int64_t collection_id; - int64_t partition_id; - int64_t segment_id; - int64_t field_id; - DataType field_type; - int64_t index_id; - int64_t index_build_id; - int64_t index_version; - std::map index_params; - std::vector index_files; - index::IndexBasePtr index; -}; - struct CreateIndexInfo { DataType field_type; IndexType index_type; diff --git a/internal/core/src/indexbuilder/IndexFactory.h b/internal/core/src/indexbuilder/IndexFactory.h index 5896ac3de8..e46f79aa2f 100644 --- a/internal/core/src/indexbuilder/IndexFactory.h +++ b/internal/core/src/indexbuilder/IndexFactory.h @@ -17,6 +17,7 @@ #include "indexbuilder/ScalarIndexCreator.h" #include "indexbuilder/VecIndexCreator.h" #include "indexbuilder/type_c.h" +#include "storage/Types.h" #include #include @@ -39,7 +40,10 @@ class IndexFactory { } IndexCreatorBasePtr - CreateIndex(CDataType dtype, const char* type_params, const char* index_params) { + CreateIndex(CDataType dtype, + const char* type_params, + const char* index_params, + const storage::StorageConfig& storage_config) { auto real_dtype = DataType(dtype); auto invalid_dtype_msg = std::string("invalid data type: ") + std::to_string(int(real_dtype)); @@ -57,7 +61,7 @@ class IndexFactory { case DataType::VECTOR_FLOAT: case DataType::VECTOR_BINARY: - return std::make_unique(real_dtype, type_params, index_params); + return std::make_unique(real_dtype, type_params, index_params, storage_config); default: throw std::invalid_argument(invalid_dtype_msg); } diff --git a/internal/core/src/indexbuilder/VecIndexCreator.cpp b/internal/core/src/indexbuilder/VecIndexCreator.cpp index f91e44c2cc..1b747e27a5 100644 --- a/internal/core/src/indexbuilder/VecIndexCreator.cpp +++ b/internal/core/src/indexbuilder/VecIndexCreator.cpp @@ -25,7 +25,8 @@ namespace milvus::indexbuilder { VecIndexCreator::VecIndexCreator(DataType data_type, const char* serialized_type_params, - const char* serialized_index_params) + const char* serialized_index_params, + const storage::StorageConfig& storage_config) : data_type_(data_type) { proto::indexcgo::TypeParams type_params_; proto::indexcgo::IndexParams index_params_; @@ -52,8 +53,8 @@ VecIndexCreator::VecIndexCreator(DataType data_type, #ifdef BUILD_DISK_ANN if (index::is_in_disk_list(index_info.index_type)) { // For now, only support diskann index - file_manager = std::make_shared(index::GetFieldDataMetaFromConfig(config_), - index::GetIndexMetaFromConfig(config_)); + file_manager = std::make_shared( + index::GetFieldDataMetaFromConfig(config_), index::GetIndexMetaFromConfig(config_), storage_config); } #endif diff --git a/internal/core/src/indexbuilder/VecIndexCreator.h b/internal/core/src/indexbuilder/VecIndexCreator.h index a7d47252f4..3a696331cd 100644 --- a/internal/core/src/indexbuilder/VecIndexCreator.h +++ b/internal/core/src/indexbuilder/VecIndexCreator.h @@ -19,6 +19,7 @@ #include "indexbuilder/IndexCreatorBase.h" #include "index/VectorIndex.h" #include "index/IndexInfo.h" +#include "storage/Types.h" namespace milvus::indexbuilder { @@ -27,7 +28,8 @@ class VecIndexCreator : public IndexCreatorBase { public: explicit VecIndexCreator(DataType data_type, const char* serialized_type_params, - const char* serialized_index_params); + const char* serialized_index_params, + const storage::StorageConfig& storage_config); void Build(const milvus::DatasetPtr& dataset) override; diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index 0d13e2a68d..fecf05e6d3 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -21,18 +21,37 @@ #include "indexbuilder/index_c.h" #include "indexbuilder/IndexFactory.h" #include "common/type_c.h" +#include "storage/Types.h" CStatus CreateIndex(enum CDataType dtype, const char* serialized_type_params, const char* serialized_index_params, - CIndex* res_index) { + CIndex* res_index, + CStorageConfig c_storage_config) { auto status = CStatus(); try { AssertInfo(res_index, "failed to create index, passed index was null"); - auto index = milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(dtype, serialized_type_params, - serialized_index_params); + std::string address(c_storage_config.address); + std::string bucket_name(c_storage_config.bucket_name); + std::string access_key(c_storage_config.access_key_id); + std::string access_value(c_storage_config.access_key_value); + std::string remote_root_path(c_storage_config.remote_root_path); + std::string storage_type(c_storage_config.storage_type); + std::string iam_endpoint(c_storage_config.iam_endpoint); + auto storage_config = milvus::storage::StorageConfig{address, + bucket_name, + access_key, + access_value, + remote_root_path, + storage_type, + iam_endpoint, + c_storage_config.useSSL, + c_storage_config.useIAM}; + + auto index = milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex( + dtype, serialized_type_params, serialized_index_params, storage_config); *res_index = index.release(); status.error_code = Success; status.error_msg = ""; diff --git a/internal/core/src/indexbuilder/index_c.h b/internal/core/src/indexbuilder/index_c.h index 4f3b716329..ac3554b60b 100644 --- a/internal/core/src/indexbuilder/index_c.h +++ b/internal/core/src/indexbuilder/index_c.h @@ -24,7 +24,8 @@ CStatus CreateIndex(enum CDataType dtype, const char* serialized_type_params, const char* serialized_index_params, - CIndex* res_index); + CIndex* res_index, + CStorageConfig storage_config); CStatus DeleteIndex(CIndex index); diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index bac359b600..707dc4cf8f 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -17,13 +17,14 @@ #include "common/LoadInfo.h" #include "pb/segcore.pb.h" #include "segcore/SegmentInterface.h" +#include "segcore/Types.h" namespace milvus::segcore { class SegmentSealed : public SegmentInternalInterface { public: virtual void - LoadIndex(const index::LoadIndexInfo& info) = 0; + LoadIndex(const LoadIndexInfo& info) = 0; virtual void LoadSegmentMeta(const milvus::proto::segcore::LoadSegmentMeta& meta) = 0; virtual void diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 4ccd89878e..293d9b93ae 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -39,7 +39,7 @@ SegmentSealedImpl::PreDelete(int64_t size) { } void -SegmentSealedImpl::LoadIndex(const index::LoadIndexInfo& info) { +SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) { // print(info); // NOTE: lock only when data is ready to avoid starvation auto field_id = FieldId(info.field_id); @@ -53,7 +53,7 @@ SegmentSealedImpl::LoadIndex(const index::LoadIndexInfo& info) { } void -SegmentSealedImpl::LoadVecIndex(const index::LoadIndexInfo& info) { +SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) { // NOTE: lock only when data is ready to avoid starvation auto field_id = FieldId(info.field_id); auto& field_meta = schema_->operator[](field_id); @@ -76,8 +76,7 @@ SegmentSealedImpl::LoadVecIndex(const index::LoadIndexInfo& info) { std::to_string(row_count_opt_.value()) + ")"); } AssertInfo(!vector_indexings_.is_ready(field_id), "vec index is not ready"); - vector_indexings_.append_field_indexing(field_id, metric_type, - std::move(const_cast(info).index)); + vector_indexings_.append_field_indexing(field_id, metric_type, std::move(const_cast(info).index)); set_bit(index_ready_bitset_, field_id, true); update_row_count(row_count); @@ -85,7 +84,7 @@ SegmentSealedImpl::LoadVecIndex(const index::LoadIndexInfo& info) { } void -SegmentSealedImpl::LoadScalarIndex(const index::LoadIndexInfo& info) { +SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) { // NOTE: lock only when data is ready to avoid starvation auto field_id = FieldId(info.field_id); auto& field_meta = schema_->operator[](field_id); @@ -106,7 +105,7 @@ SegmentSealedImpl::LoadScalarIndex(const index::LoadIndexInfo& info) { std::to_string(row_count_opt_.value()) + ")"); } - scalar_indexings_[field_id] = std::move(const_cast(info).index); + scalar_indexings_[field_id] = std::move(const_cast(info).index); // reverse pk from scalar index and set pks to offset if (schema_->get_primary_field_id() == field_id) { AssertInfo(field_id.get() != -1, "Primary key is -1"); diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index e328b0e4e2..19a0e80950 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -35,7 +35,7 @@ class SegmentSealedImpl : public SegmentSealed { public: explicit SegmentSealedImpl(SchemaPtr schema, int64_t segment_id); void - LoadIndex(const index::LoadIndexInfo& info) override; + LoadIndex(const LoadIndexInfo& info) override; void LoadFieldData(const LoadFieldDataInfo& info) override; void @@ -172,10 +172,10 @@ class SegmentSealedImpl : public SegmentSealed { search_ids(const BitsetType& view, Timestamp timestamp) const override; void - LoadVecIndex(const index::LoadIndexInfo& info); + LoadVecIndex(const LoadIndexInfo& info); void - LoadScalarIndex(const index::LoadIndexInfo& info); + LoadScalarIndex(const LoadIndexInfo& info); private: // segment loading state diff --git a/internal/core/src/segcore/Types.h b/internal/core/src/segcore/Types.h new file mode 100644 index 0000000000..83495893a7 --- /dev/null +++ b/internal/core/src/segcore/Types.h @@ -0,0 +1,46 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include "common/Types.h" +#include "common/type_c.h" +#include "index/Index.h" +#include "storage/Types.h" + +namespace milvus::segcore { + +struct LoadIndexInfo { + int64_t collection_id; + int64_t partition_id; + int64_t segment_id; + int64_t field_id; + DataType field_type; + int64_t index_id; + int64_t index_build_id; + int64_t index_version; + std::map index_params; + std::vector index_files; + index::IndexBasePtr index; + storage::StorageConfig storage_config; +}; + +} // namespace milvus::segcore diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index 62a93eee1f..f1efa77b39 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -17,11 +17,21 @@ #include "index/IndexFactory.h" #include "storage/Util.h" #include "segcore/load_index_c.h" +#include "segcore/Types.h" CStatus -NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info) { +NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info, CStorageConfig c_storage_config) { try { - auto load_index_info = std::make_unique(); + auto load_index_info = std::make_unique(); + auto& storage_config = load_index_info->storage_config; + storage_config.address = std::string(c_storage_config.address); + storage_config.bucket_name = std::string(c_storage_config.bucket_name); + storage_config.access_key_id = std::string(c_storage_config.access_key_id); + storage_config.access_key_value = std::string(c_storage_config.access_key_value); + storage_config.remote_root_path = std::string(c_storage_config.remote_root_path); + storage_config.storage_type = std::string(c_storage_config.storage_type); + storage_config.iam_endpoint = std::string(c_storage_config.iam_endpoint); + *c_load_index_info = load_index_info.release(); auto status = CStatus(); status.error_code = Success; @@ -37,14 +47,14 @@ NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info) { void DeleteLoadIndexInfo(CLoadIndexInfo c_load_index_info) { - auto info = (milvus::index::LoadIndexInfo*)c_load_index_info; + auto info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; delete info; } CStatus AppendIndexParam(CLoadIndexInfo c_load_index_info, const char* c_index_key, const char* c_index_value) { try { - auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info; + auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; std::string index_key(c_index_key); std::string index_value(c_index_value); load_index_info->index_params[index_key] = index_value; @@ -69,7 +79,7 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info, int64_t field_id, enum CDataType field_type) { try { - auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info; + auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; load_index_info->collection_id = collection_id; load_index_info->partition_id = partition_id; load_index_info->segment_id = segment_id; @@ -91,7 +101,7 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info, CStatus appendVecIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { try { - auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info; + auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; auto binary_set = (knowhere::BinarySet*)c_binary_set; auto& index_params = load_index_info->index_params; @@ -117,7 +127,8 @@ appendVecIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { load_index_info->segment_id, load_index_info->field_id}; milvus::storage::IndexMeta index_meta{load_index_info->segment_id, load_index_info->field_id, load_index_info->index_build_id, load_index_info->index_version}; - auto file_manager = milvus::storage::CreateFileManager(index_info.index_type, field_meta, index_meta); + auto file_manager = milvus::storage::CreateFileManager(index_info.index_type, field_meta, index_meta, + load_index_info->storage_config); auto config = milvus::index::ParseConfigFromIndexParams(load_index_info->index_params); config["index_files"] = load_index_info->index_files; @@ -139,7 +150,7 @@ appendVecIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { CStatus appendScalarIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { try { - auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info; + auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; auto field_type = load_index_info->field_type; auto binary_set = (knowhere::BinarySet*)c_binary_set; auto& index_params = load_index_info->index_params; @@ -171,7 +182,7 @@ appendScalarIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { CStatus AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { - auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info; + auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; auto field_type = load_index_info->field_type; if (milvus::datatype_is_vector(field_type)) { return appendVecIndex(c_load_index_info, c_binary_set); @@ -182,7 +193,7 @@ AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { CStatus AppendIndexFilePath(CLoadIndexInfo c_load_index_info, const char* c_file_path) { try { - auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info; + auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; std::string index_file_path(c_file_path); load_index_info->index_files.emplace_back(index_file_path); @@ -201,7 +212,7 @@ AppendIndexFilePath(CLoadIndexInfo c_load_index_info, const char* c_file_path) { CStatus AppendIndexInfo(CLoadIndexInfo c_load_index_info, int64_t index_id, int64_t build_id, int64_t version) { try { - auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info; + auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; load_index_info->index_id = index_id; load_index_info->index_build_id = build_id; load_index_info->index_version = version; @@ -221,7 +232,7 @@ AppendIndexInfo(CLoadIndexInfo c_load_index_info, int64_t index_id, int64_t buil CStatus CleanLoadedIndex(CLoadIndexInfo c_load_index_info) { try { - auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info; + auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; auto index_file_path_prefix = milvus::storage::GenLocalIndexPathPrefix(load_index_info->index_build_id, load_index_info->index_version); #ifdef BUILD_DISK_ANN diff --git a/internal/core/src/segcore/load_index_c.h b/internal/core/src/segcore/load_index_c.h index e9f46ebd6a..1e11dd907d 100644 --- a/internal/core/src/segcore/load_index_c.h +++ b/internal/core/src/segcore/load_index_c.h @@ -24,7 +24,7 @@ extern "C" { typedef void* CLoadIndexInfo; CStatus -NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info); +NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info, CStorageConfig c_storage_config); void DeleteLoadIndexInfo(CLoadIndexInfo c_load_index_info); diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp index bdd87bb582..c43804991d 100644 --- a/internal/core/src/segcore/segment_c.cpp +++ b/internal/core/src/segcore/segment_c.cpp @@ -238,7 +238,7 @@ UpdateSealedSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_inde auto segment_interface = reinterpret_cast(c_segment); auto segment = dynamic_cast(segment_interface); AssertInfo(segment != nullptr, "segment conversion failed"); - auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info; + auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info; segment->LoadIndex(*load_index_info); return milvus::SuccessCStatus(); } catch (std::exception& e) { diff --git a/internal/core/src/storage/CMakeLists.txt b/internal/core/src/storage/CMakeLists.txt index 80245ef9bc..47acb32e0d 100644 --- a/internal/core/src/storage/CMakeLists.txt +++ b/internal/core/src/storage/CMakeLists.txt @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - option( EMBEDDED_MILVUS "Enable embedded Milvus" OFF ) if ( EMBEDDED_MILVUS ) @@ -46,10 +45,12 @@ endif () add_library(milvus_storage SHARED ${STORAGE_FILES}) +find_package(Boost REQUIRED COMPONENTS filesystem) + if ( BUILD_DISK_ANN STREQUAL "ON" ) - target_link_libraries( milvus_storage PUBLIC milvus_common boost_system boost_filesystem aws-cpp-sdk-s3 pthread) + target_link_libraries( milvus_storage PUBLIC milvus_common Boost::filesystem aws-cpp-sdk-s3 pthread) else() - target_link_libraries( milvus_storage PUBLIC milvus_common pthread) + target_link_libraries( milvus_storage PUBLIC milvus_common Boost::filesystem pthread) endif() if(NOT CMAKE_INSTALL_PREFIX) diff --git a/internal/core/src/storage/ChunkManager.h b/internal/core/src/storage/ChunkManager.h index 58ea6fc026..8e62d5943b 100644 --- a/internal/core/src/storage/ChunkManager.h +++ b/internal/core/src/storage/ChunkManager.h @@ -123,6 +123,6 @@ class RemoteChunkManager : public ChunkManager { } }; -using RemoteChunkManagerSPtr = std::shared_ptr; +using RemoteChunkManagerPtr = std::unique_ptr; } // namespace milvus::storage diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 9be934b3c8..be1cfe0ea3 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -58,8 +58,12 @@ using WriteLock = std::lock_guard; namespace milvus::storage { -DiskFileManagerImpl::DiskFileManagerImpl(const FieldDataMeta& field_mata, const IndexMeta& index_meta) +DiskFileManagerImpl::DiskFileManagerImpl(const FieldDataMeta& field_mata, + const IndexMeta& index_meta, + const StorageConfig& storage_config) : field_meta_(field_mata), index_meta_(index_meta) { + remote_root_path_ = storage_config.remote_root_path; + rcm_ = std::make_unique(storage_config); } DiskFileManagerImpl::~DiskFileManagerImpl() { @@ -75,7 +79,6 @@ DiskFileManagerImpl::LoadFile(const std::string& file) noexcept { bool DiskFileManagerImpl::AddFile(const std::string& file) noexcept { auto& local_chunk_manager = LocalChunkManager::GetInstance(); - auto& remote_chunk_manager = MinioChunkManager::GetInstance(); FILEMANAGER_TRY if (!local_chunk_manager.Exist(file)) { LOG_SEGCORE_ERROR_C << "local file: " << file << " does not exist "; @@ -106,7 +109,7 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept { // Put file to remote char objectKey[200]; snprintf(objectKey, sizeof(objectKey), "%s/%s_%d", remotePrefix.c_str(), fileName.c_str(), slice_num); - remote_chunk_manager.Write(objectKey, serialized_index_data.data(), serialized_index_size); + rcm_->Write(objectKey, serialized_index_data.data(), serialized_index_size); offset += batch_size; // record remote file to save etcd @@ -121,7 +124,6 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept { void DiskFileManagerImpl::CacheIndexToDisk(std::vector remote_files) { auto& local_chunk_manager = LocalChunkManager::GetInstance(); - auto& remote_chunk_manager = MinioChunkManager::GetInstance(); std::map> index_slices; for (auto& file_path : remote_files) { @@ -140,9 +142,9 @@ DiskFileManagerImpl::CacheIndexToDisk(std::vector remote_files) { int64_t offset = 0; for (auto iter = slices.second.begin(); iter != slices.second.end(); iter++) { auto origin_file = prefix + "_" + std::to_string(*iter); - auto fileSize = remote_chunk_manager.Size(origin_file); + auto fileSize = rcm_->Size(origin_file); auto buf = std::unique_ptr(new uint8_t[fileSize]); - remote_chunk_manager.Read(origin_file, buf.get(), fileSize); + rcm_->Read(origin_file, buf.get(), fileSize); auto decoded_index_data = DeserializeFileData(buf.get(), fileSize); auto index_payload = decoded_index_data->GetPayload(); @@ -164,9 +166,9 @@ DiskFileManagerImpl::GetFileName(const std::string& localfile) { std::string DiskFileManagerImpl::GetRemoteIndexObjectPrefix() { - return ChunkMangerConfig::GetRemoteRootPath() + "/" + std::string(INDEX_ROOT_PATH) + "/" + - std::to_string(index_meta_.build_id) + "/" + std::to_string(index_meta_.index_version) + "/" + - std::to_string(field_meta_.partition_id) + "/" + std::to_string(field_meta_.segment_id); + return remote_root_path_ + "/" + std::string(INDEX_ROOT_PATH) + "/" + std::to_string(index_meta_.build_id) + "/" + + std::to_string(index_meta_.index_version) + "/" + std::to_string(field_meta_.partition_id) + "/" + + std::to_string(field_meta_.segment_id); } std::string @@ -184,12 +186,11 @@ DiskFileManagerImpl::RemoveFile(const std::string& file) noexcept { // remove local file bool localExist = false; auto& local_chunk_manager = LocalChunkManager::GetInstance(); - auto& remote_chunk_manager = MinioChunkManager::GetInstance(); FILEMANAGER_TRY localExist = local_chunk_manager.Exist(file); FILEMANAGER_CATCH FILEMANAGER_END - if (!localExist) { + if (localExist) { FILEMANAGER_TRY local_chunk_manager.Remove(file); FILEMANAGER_CATCH @@ -200,12 +201,12 @@ DiskFileManagerImpl::RemoveFile(const std::string& file) noexcept { std::string remoteFile = ""; bool remoteExist = false; FILEMANAGER_TRY - remoteExist = remote_chunk_manager.Exist(remoteFile); + remoteExist = rcm_->Exist(remoteFile); FILEMANAGER_CATCH FILEMANAGER_END - if (!remoteExist) { + if (remoteExist) { FILEMANAGER_TRY - remote_chunk_manager.Remove(file); + rcm_->Remove(file); FILEMANAGER_CATCH FILEMANAGER_END } @@ -216,7 +217,6 @@ std::optional DiskFileManagerImpl::IsExisted(const std::string& file) noexcept { bool isExist = false; auto& local_chunk_manager = LocalChunkManager::GetInstance(); - auto& remote_chunk_manager = MinioChunkManager::GetInstance(); try { isExist = local_chunk_manager.Exist(file); } catch (LocalChunkManagerException& e) { diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h index 7761545d3f..07f1419e95 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.h +++ b/internal/core/src/storage/DiskFileManagerImpl.h @@ -24,12 +24,15 @@ #include "storage/IndexData.h" #include "storage/FileManager.h" +#include "storage/MinioChunkManager.h" namespace milvus::storage { class DiskFileManagerImpl : public FileManagerImpl { public: - explicit DiskFileManagerImpl(const FieldDataMeta& field_mata, const IndexMeta& index_meta); + explicit DiskFileManagerImpl(const FieldDataMeta& field_mata, + const IndexMeta& index_meta, + const StorageConfig& storage_config); virtual ~DiskFileManagerImpl(); @@ -65,6 +68,11 @@ class DiskFileManagerImpl : public FileManagerImpl { return remote_paths_to_size_; } + std::vector + GetLocalFilePaths() const { + return local_paths_; + } + void CacheIndexToDisk(std::vector remote_files); @@ -99,6 +107,9 @@ class DiskFileManagerImpl : public FileManagerImpl { // remote file path std::map remote_paths_to_size_; + + RemoteChunkManagerPtr rcm_; + std::string remote_root_path_; }; using DiskANNFileManagerImplPtr = std::shared_ptr; diff --git a/internal/core/src/storage/MinioChunkManager.cpp b/internal/core/src/storage/MinioChunkManager.cpp index 6e95d6bb40..27b801cee6 100644 --- a/internal/core/src/storage/MinioChunkManager.cpp +++ b/internal/core/src/storage/MinioChunkManager.cpp @@ -64,18 +64,13 @@ ConvertFromAwsString(const Aws::String& aws_str) { return std::string(aws_str.c_str(), aws_str.size()); } -MinioChunkManager::MinioChunkManager(const std::string& endpoint, - const std::string& access_key, - const std::string& access_value, - const std::string& bucket_name, - bool secure, - bool use_iam) - : default_bucket_name_(bucket_name) { +MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config) + : default_bucket_name_(storage_config.bucket_name) { Aws::InitAPI(sdk_options_); Aws::Client::ClientConfiguration config; - config.endpointOverride = ConvertToAwsString(endpoint); + config.endpointOverride = ConvertToAwsString(storage_config.address); - if (secure) { + if (storage_config.useSSL) { config.scheme = Aws::Http::Scheme::HTTPS; config.verifySSL = true; } else { @@ -83,7 +78,7 @@ MinioChunkManager::MinioChunkManager(const std::string& endpoint, config.verifySSL = false; } - if (use_iam) { + if (storage_config.useIAM) { auto provider = std::make_shared(); client_ = std::make_shared(provider, config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); @@ -94,13 +89,19 @@ MinioChunkManager::MinioChunkManager(const std::string& endpoint, << " token:" << provider->GetAWSCredentials().GetSessionToken() << "}"; } else { client_ = std::make_shared( - Aws::Auth::AWSCredentials(ConvertToAwsString(access_key), ConvertToAwsString(access_value)), config, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); + Aws::Auth::AWSCredentials(ConvertToAwsString(storage_config.access_key_id), + ConvertToAwsString(storage_config.access_key_value)), + config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false); } - LOG_SEGCORE_INFO_C << "init MinioChunkManager with parameter[endpoint: '" << endpoint << "', access_key:'" - << access_key << "', access_value:'" << access_value << "', default_bucket_name:'" << bucket_name - << "', use_secure:'" << std::boolalpha << secure << "']"; + if (!BucketExists(storage_config.bucket_name)) { + CreateBucket(storage_config.bucket_name); + } + + LOG_SEGCORE_INFO_C << "init MinioChunkManager with parameter[endpoint: '" << storage_config.address + << "', access_key:'" << storage_config.access_key_id << "', access_value:'" + << storage_config.access_key_value << "', default_bucket_name:'" << storage_config.bucket_name + << "', use_secure:'" << std::boolalpha << storage_config.useSSL << "']"; } MinioChunkManager::~MinioChunkManager() { @@ -179,7 +180,8 @@ MinioChunkManager::CreateBucket(const std::string& bucket_name) { auto outcome = client_->CreateBucket(request); - if (!outcome.IsSuccess()) { + if (!outcome.IsSuccess() && + Aws::S3::S3Errors(outcome.GetError().GetErrorType()) != Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU) { THROWS3ERROR(CreateBucket); } return true; diff --git a/internal/core/src/storage/MinioChunkManager.h b/internal/core/src/storage/MinioChunkManager.h index d3e92711b4..5be2c1f33c 100644 --- a/internal/core/src/storage/MinioChunkManager.h +++ b/internal/core/src/storage/MinioChunkManager.h @@ -23,9 +23,10 @@ #include #include -#include "ChunkManager.h" -#include "Exception.h" #include "config/ConfigChunkManager.h" +#include "storage/ChunkManager.h" +#include "storage/Exception.h" +#include "storage/Types.h" namespace milvus::storage { @@ -33,13 +34,8 @@ namespace milvus::storage { * @brief This MinioChunkManager is responsible for read and write file in S3. */ class MinioChunkManager : public RemoteChunkManager { - private: - explicit MinioChunkManager(const std::string& endpoint, - const std::string& access_key, - const std::string& access_value, - const std::string& default_bucket_name, - bool serure = false, - bool use_iam = false); + public: + explicit MinioChunkManager(const StorageConfig& storage_config); MinioChunkManager(const MinioChunkManager&); MinioChunkManager& @@ -48,15 +44,6 @@ class MinioChunkManager : public RemoteChunkManager { public: virtual ~MinioChunkManager(); - static MinioChunkManager& - GetInstance() { - // thread-safe enough after c++ 11 - static MinioChunkManager instance(ChunkMangerConfig::GetAddress(), ChunkMangerConfig::GetAccessKey(), - ChunkMangerConfig::GetAccessValue(), ChunkMangerConfig::GetBucketName(), - ChunkMangerConfig::GetUseSSL(), ChunkMangerConfig::GetUseIAM()); - return instance; - } - virtual bool Exist(const std::string& filepath); @@ -132,6 +119,6 @@ class MinioChunkManager : public RemoteChunkManager { std::string default_bucket_name_; }; -using MinioChunkManagerSPtr = std::shared_ptr; +using MinioChunkManagerPtr = std::unique_ptr; } // namespace milvus::storage diff --git a/internal/core/src/storage/Types.h b/internal/core/src/storage/Types.h index d42826ba70..3b4817776e 100644 --- a/internal/core/src/storage/Types.h +++ b/internal/core/src/storage/Types.h @@ -79,4 +79,16 @@ struct IndexMeta { std::string key; }; +struct StorageConfig { + std::string address = "localhost:9000"; + std::string bucket_name = "a-bucket"; + std::string access_key_id = "minioadmin"; + std::string access_key_value = "minioadmin"; + std::string remote_root_path = "files"; + std::string storage_type = "minio"; + std::string iam_endpoint = ""; + bool useSSL = false; + bool useIAM = false; +}; + } // namespace milvus::storage diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 275ec1e245..6813a47b7d 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -360,11 +360,14 @@ is_in_disk_list(const IndexType& index_type) { } FileManagerImplPtr -CreateFileManager(IndexType index_type, const FieldDataMeta& field_meta, const IndexMeta& index_meta) { +CreateFileManager(IndexType index_type, + const FieldDataMeta& field_meta, + const IndexMeta& index_meta, + const StorageConfig& storage_config) { // TODO :: switch case index type to create file manager #ifdef BUILD_DISK_ANN if (is_in_disk_list(index_type)) { - return std::make_shared(field_meta, index_meta); + return std::make_shared(field_meta, index_meta, storage_config); } #endif diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h index 1e9d561d09..359328d39b 100644 --- a/internal/core/src/storage/Util.h +++ b/internal/core/src/storage/Util.h @@ -79,6 +79,9 @@ bool is_in_disk_list(const IndexType& index_type); FileManagerImplPtr -CreateFileManager(IndexType index_type, const FieldDataMeta& field_meta, const IndexMeta& index_meta); +CreateFileManager(IndexType index_type, + const FieldDataMeta& field_meta, + const IndexMeta& index_meta, + const StorageConfig& storage_config); } // namespace milvus::storage diff --git a/internal/core/unittest/bench/bench_indexbuilder.cpp b/internal/core/unittest/bench/bench_indexbuilder.cpp index fd61f59958..d8d596160a 100644 --- a/internal/core/unittest/bench/bench_indexbuilder.cpp +++ b/internal/core/unittest/bench/bench_indexbuilder.cpp @@ -64,7 +64,8 @@ IndexBuilder_build(benchmark::State& state) { for (auto _ : state) { auto index = std::make_unique( - milvus::DataType::VECTOR_FLOAT, type_params_str.c_str(), index_params_str.c_str()); + milvus::DataType::VECTOR_FLOAT, type_params_str.c_str(), index_params_str.c_str(), + get_default_storage_config()); index->Build(xb_dataset); } } @@ -93,7 +94,8 @@ IndexBuilder_build_and_codec(benchmark::State& state) { for (auto _ : state) { auto index = std::make_unique( - milvus::DataType::VECTOR_FLOAT, type_params_str.c_str(), index_params_str.c_str()); + milvus::DataType::VECTOR_FLOAT, type_params_str.c_str(), index_params_str.c_str(), + get_default_storage_config()); index->Build(xb_dataset); index->Serialize(); diff --git a/internal/core/unittest/bench/bench_search.cpp b/internal/core/unittest/bench/bench_search.cpp index 0edef07998..b6407c5800 100644 --- a/internal/core/unittest/bench/bench_search.cpp +++ b/internal/core/unittest/bench/bench_search.cpp @@ -106,7 +106,7 @@ Search_Sealed(benchmark::State& state) { // ivf auto vec = dataset_.get_col(milvus::FieldId(100)); auto indexing = GenVecIndexing(N, dim, vec.data()); - index::LoadIndexInfo info; + segcore::LoadIndexInfo info; info.index = std::move(indexing); info.field_id = (*schema)[FieldName("fakevec")].get_id().get(); info.index_params["index_type"] = "IVF"; diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 7728905e1b..786b79a3db 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -38,12 +38,13 @@ using namespace milvus; using namespace milvus::segcore; using namespace milvus::index; using namespace knowhere; -using milvus::index::LoadIndexInfo; using milvus::index::VectorIndex; +using milvus::segcore::LoadIndexInfo; namespace { // const int DIM = 16; const int64_t ROW_COUNT = 100 * 1000; +const CStorageConfig c_storage_config = get_default_cstorage_config(); const char* get_default_schema_config() { @@ -1413,7 +1414,7 @@ TEST(CApiTest, LoadIndexInfo) { CBinarySet c_binary_set = (CBinarySet)&binary_set; void* c_load_index_info = nullptr; - auto status = NewLoadIndexInfo(&c_load_index_info); + auto status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_param_key1 = "index_type"; std::string index_param_value1 = "IVF_PQ"; @@ -1462,7 +1463,7 @@ TEST(CApiTest, LoadIndex_Search) { auto binary_set = indexing->Serialize(conf); // fill loadIndexInfo - milvus::index::LoadIndexInfo load_index_info; + milvus::segcore::LoadIndexInfo load_index_info; auto& index_params = load_index_info.index_params; index_params["index_type"] = "IVF_PQ"; index_params["index_mode"] = "CPU"; @@ -1567,7 +1568,7 @@ TEST(CApiTest, Indexing_Without_Predicate) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "IVF_PQ"; @@ -1688,7 +1689,7 @@ TEST(CApiTest, Indexing_Expr_Without_Predicate) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "IVF_PQ"; @@ -1826,7 +1827,7 @@ TEST(CApiTest, Indexing_With_float_Predicate_Range) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "IVF_PQ"; @@ -1978,7 +1979,7 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Range) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "IVF_PQ"; @@ -2114,7 +2115,7 @@ TEST(CApiTest, Indexing_With_float_Predicate_Term) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "IVF_PQ"; @@ -2259,7 +2260,7 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Term) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "IVF_PQ"; @@ -2397,7 +2398,7 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Range) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "BIN_IVF_FLAT"; @@ -2547,7 +2548,7 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Range) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "BIN_IVF_FLAT"; @@ -2684,7 +2685,7 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Term) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "BIN_IVF_FLAT"; @@ -2844,7 +2845,7 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "BIN_IVF_FLAT"; @@ -3003,7 +3004,7 @@ TEST(CApiTest, SealedSegment_search_float_Predicate_Range) { IndexEnum::INDEX_FAISS_IVFPQ, DIM, N); auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "IVF_PQ"; @@ -3282,7 +3283,7 @@ TEST(CApiTest, SealedSegment_search_float_With_Expr_Predicate_Range) { auto binary_set = indexing->Serialize(milvus::Config{}); void* c_load_index_info = nullptr; - status = NewLoadIndexInfo(&c_load_index_info); + status = NewLoadIndexInfo(&c_load_index_info, c_storage_config); assert(status.error_code == Success); std::string index_type_key = "index_type"; std::string index_type_value = "IVF_PQ"; diff --git a/internal/core/unittest/test_disk_file_manager_test.cpp b/internal/core/unittest/test_disk_file_manager_test.cpp index f3c9bea4b2..ff9bf0870d 100644 --- a/internal/core/unittest/test_disk_file_manager_test.cpp +++ b/internal/core/unittest/test_disk_file_manager_test.cpp @@ -9,22 +9,16 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License -#include -#include -#include #include -#include -#include #include #include -#include #include "storage/Event.h" -#include "storage/MinioChunkManager.h" #include "storage/LocalChunkManager.h" #include "storage/DiskFileManagerImpl.h" #include "config/ConfigChunkManager.h" #include "config/ConfigKnowhere.h" +#include "test_utils/indexbuilder_test_utils.h" using namespace std; using namespace milvus; @@ -39,84 +33,21 @@ class DiskAnnFileManagerTest : public testing::Test { ~DiskAnnFileManagerTest() { } - bool - FindFile(const path& dir, const string& file_name, path& path_found) { - const recursive_directory_iterator end; - boost::system::error_code err; - auto iter = recursive_directory_iterator(dir, err); - while (iter != end) { - try { - if ((*iter).path().filename() == file_name) { - path_found = (*iter).path(); - return true; - } - iter++; - } catch (filesystem_error& e) { - } catch (std::exception& e) { - // ignore error - } - } - return false; - } - - string - GetConfig() { - char testPath[100]; - auto pwd = string(getcwd(testPath, sizeof(testPath))); - path filepath; - auto currentPath = path(pwd); - while (!FindFile(currentPath, "milvus.yaml", filepath)) { - currentPath = currentPath.append("../"); - } - return filepath.string(); - } - - void - InitRemoteChunkManager() { - auto configPath = GetConfig(); - cout << configPath << endl; - YAML::Node config; - config = YAML::LoadFile(configPath); - auto minioConfig = config["minio"]; - auto address = minioConfig["address"].as(); - auto port = minioConfig["port"].as(); - auto endpoint = address + ":" + port; - auto accessKey = minioConfig["accessKeyID"].as(); - auto accessValue = minioConfig["secretAccessKey"].as(); - auto useSSL = minioConfig["useSSL"].as(); - auto bucketName = minioConfig["bucketName"].as(); - - ChunkMangerConfig::SetAddress(endpoint); - ChunkMangerConfig::SetAccessKey(accessKey); - ChunkMangerConfig::SetAccessValue(accessValue); - ChunkMangerConfig::SetBucketName(bucketName); - ChunkMangerConfig::SetUseSSL(useSSL); - } - - void - InitLocalChunkManager() { - ChunkMangerConfig::SetLocalRootPath("/tmp/diskann"); - config::KnowhereSetIndexSliceSize(5); - } - virtual void SetUp() { - InitLocalChunkManager(); - InitRemoteChunkManager(); + ChunkMangerConfig::SetLocalRootPath("/tmp/diskann"); + config::KnowhereSetIndexSliceSize(5); + storage_config_ = get_default_storage_config(); } + + protected: + StorageConfig storage_config_; }; TEST_F(DiskAnnFileManagerTest, AddFilePositive) { auto& lcm = LocalChunkManager::GetInstance(); - auto& rcm = MinioChunkManager::GetInstance(); - string testBucketName = "test-diskann"; - rcm.SetBucketName(testBucketName); - EXPECT_EQ(rcm.GetBucketName(), testBucketName); - - if (!rcm.BucketExists(testBucketName)) { - rcm.CreateBucket(testBucketName); - } + storage_config_.bucket_name = testBucketName; std::string indexFilePath = "/tmp/diskann/index_files/1000/index"; auto exist = lcm.Exist(indexFilePath); @@ -132,33 +63,32 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositive) { IndexMeta index_meta = {3, 100, 1000, 1, "index"}; int64_t slice_size = config::KnowhereGetIndexSliceSize() << 20; - auto diskAnnFileManager = std::make_shared(filed_data_meta, index_meta); - diskAnnFileManager->AddFile(indexFilePath); - - // check result - auto remotePrefix = diskAnnFileManager->GetRemoteIndexObjectPrefix(); - auto remoteIndexFiles = rcm.ListWithPrefix(remotePrefix); + auto diskAnnFileManager = std::make_shared(filed_data_meta, index_meta, storage_config_); + auto ok = diskAnnFileManager->AddFile(indexFilePath); + EXPECT_EQ(ok, true); + auto remote_files_to_size = diskAnnFileManager->GetRemotePathsToFileSize(); auto num_slice = index_size / slice_size; - EXPECT_EQ(remoteIndexFiles.size(), index_size % slice_size == 0 ? num_slice : num_slice + 1); + EXPECT_EQ(remote_files_to_size.size(), index_size % slice_size == 0 ? num_slice : num_slice + 1); - diskAnnFileManager->CacheIndexToDisk(remoteIndexFiles); - auto fileSize1 = rcm.Size(remoteIndexFiles[0]); - auto buf = std::unique_ptr(new uint8_t[fileSize1]); - rcm.Read(remoteIndexFiles[0], buf.get(), fileSize1); - - auto index = DeserializeFileData(buf.get(), fileSize1); - auto payload = index->GetPayload(); - auto rows = payload->rows; - auto rawData = payload->raw_data; - - EXPECT_EQ(rows, index_size); - EXPECT_EQ(rawData[0], data[0]); - EXPECT_EQ(rawData[4], data[4]); - - auto files = diskAnnFileManager->GetRemotePathsToFileSize(); - for (auto& value : files) { - rcm.Remove(value.first); + std::vector remote_files; + for (auto& file2size : remote_files_to_size) { + remote_files.emplace_back(file2size.first); + } + diskAnnFileManager->CacheIndexToDisk(remote_files); + auto local_files = diskAnnFileManager->GetLocalFilePaths(); + for (auto& file : local_files) { + auto file_size = lcm.Size(file); + auto buf = std::unique_ptr(new uint8_t[file_size]); + lcm.Read(file, buf.get(), file_size); + + auto index = FieldData(buf.get(), file_size); + auto payload = index.get_payload(); + auto rows = payload->rows; + auto rawData = payload->raw_data; + + EXPECT_EQ(rows, index_size); + EXPECT_EQ(rawData[0], data[0]); + EXPECT_EQ(rawData[4], data[4]); } - rcm.DeleteBucket(testBucketName); } diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index 76efe66d52..7bc9f521d0 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -630,7 +630,7 @@ TEST(Expr, TestCompareWithScalarIndex) { auto seg = CreateSealedSegment(schema); int N = 1000; auto raw_data = DataGen(schema, N); - index::LoadIndexInfo load_index_info; + segcore::LoadIndexInfo load_index_info; // load index for int32 field auto age32_col = raw_data.get_col(i32_fid); @@ -720,7 +720,7 @@ TEST(Expr, TestCompareWithScalarIndexMaris) { auto seg = CreateSealedSegment(schema); int N = 1000; auto raw_data = DataGen(schema, N); - index::LoadIndexInfo load_index_info; + segcore::LoadIndexInfo load_index_info; // load index for int32 field auto str1_col = raw_data.get_col(str1_fid); @@ -1300,7 +1300,7 @@ TEST(Expr, TestBinaryArithOpEvalRangeWithScalarSortIndex) { auto seg = CreateSealedSegment(schema); int N = 1000; auto raw_data = DataGen(schema, N); - index::LoadIndexInfo load_index_info; + segcore::LoadIndexInfo load_index_info; // load index for int8 field auto age8_col = raw_data.get_col(i8_fid); diff --git a/internal/core/unittest/test_index_c_api.cpp b/internal/core/unittest/test_index_c_api.cpp index 9321d08c83..3ab8cdf941 100644 --- a/internal/core/unittest/test_index_c_api.cpp +++ b/internal/core/unittest/test_index_c_api.cpp @@ -11,23 +11,19 @@ #include #include -#include #include #include #include #include -#include #include "pb/index_cgo_msg.pb.h" -#include "indexbuilder/VecIndexCreator.h" #include "indexbuilder/index_c.h" -#include "test_utils/DataGen.h" #include "test_utils/indexbuilder_test_utils.h" #include "indexbuilder/ScalarIndexCreator.h" -#include "indexbuilder/IndexFactory.h" #include "common/type_c.h" constexpr int NB = 10; +const CStorageConfig c_storage_config = get_default_cstorage_config(); TEST(FloatVecIndex, All) { auto index_type = knowhere::IndexEnum::INDEX_FAISS_IVFPQ; @@ -51,7 +47,7 @@ TEST(FloatVecIndex, All) { CIndex copy_index; { - status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index); + status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index, c_storage_config); ASSERT_EQ(Success, status.error_code); } { @@ -63,7 +59,7 @@ TEST(FloatVecIndex, All) { ASSERT_EQ(Success, status.error_code); } { - status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index); + status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index, c_storage_config); ASSERT_EQ(Success, status.error_code); } { @@ -102,7 +98,7 @@ TEST(BinaryVecIndex, All) { CIndex copy_index; { - status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index); + status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index, c_storage_config); ASSERT_EQ(Success, status.error_code); } { @@ -114,7 +110,7 @@ TEST(BinaryVecIndex, All) { ASSERT_EQ(Success, status.error_code); } { - status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index); + status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index, c_storage_config); ASSERT_EQ(Success, status.error_code); } { @@ -154,7 +150,7 @@ TEST(CBoolIndexTest, All) { CIndex copy_index; { - status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index); + status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index, c_storage_config); ASSERT_EQ(Success, status.error_code); } { @@ -166,7 +162,8 @@ TEST(CBoolIndexTest, All) { ASSERT_EQ(Success, status.error_code); } { - status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index); + status = + CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index, c_storage_config); ASSERT_EQ(Success, status.error_code); } { @@ -204,7 +201,7 @@ TEST(CInt64IndexTest, All) { CIndex copy_index; { - status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index); + status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index, c_storage_config); ASSERT_EQ(Success, status.error_code); } { @@ -216,7 +213,8 @@ TEST(CInt64IndexTest, All) { ASSERT_EQ(Success, status.error_code); } { - status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index); + status = + CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index, c_storage_config); ASSERT_EQ(Success, status.error_code); } { @@ -256,7 +254,7 @@ TEST(CStringIndexTest, All) { CIndex copy_index; { - status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index); + status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index, c_storage_config); ASSERT_EQ(Success, status.error_code); } { @@ -268,7 +266,8 @@ TEST(CStringIndexTest, All) { ASSERT_EQ(Success, status.error_code); } { - status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index); + status = + CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index, c_storage_config); ASSERT_EQ(Success, status.error_code); } { diff --git a/internal/core/unittest/test_index_wrapper.cpp b/internal/core/unittest/test_index_wrapper.cpp index aaafca18db..804c54ff37 100644 --- a/internal/core/unittest/test_index_wrapper.cpp +++ b/internal/core/unittest/test_index_wrapper.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include "indexbuilder/IndexFactory.h" #include "indexbuilder/VecIndexCreator.h" @@ -33,6 +32,7 @@ class IndexWrapperTest : public ::testing::TestWithParam { SetUp() override { knowhere::KnowhereConfig::SetStatisticsLevel(3); knowhere::KnowhereConfig::SetIndexFileSliceSize(16); + storage_config_ = get_default_storage_config(); auto param = GetParam(); index_type = param.first; @@ -94,6 +94,7 @@ class IndexWrapperTest : public ::testing::TestWithParam { knowhere::DatasetPtr xq_dataset; int64_t query_offset = 100; int64_t NB = 10000; + StorageConfig storage_config_; }; INSTANTIATE_TEST_CASE_P( @@ -111,7 +112,7 @@ INSTANTIATE_TEST_CASE_P( TEST_P(IndexWrapperTest, BuildAndQuery) { auto index = milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex( - vec_field_data_type, type_params_str.c_str(), index_params_str.c_str()); + vec_field_data_type, type_params_str.c_str(), index_params_str.c_str(), storage_config_); auto dataset = GenDataset(NB, metric_type, is_binary); auto xb_data = dataset.get_col(milvus::FieldId(100)); @@ -119,7 +120,7 @@ TEST_P(IndexWrapperTest, BuildAndQuery) { ASSERT_NO_THROW(index->Build(xb_dataset)); auto binary_set = index->Serialize(); auto copy_index = milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex( - vec_field_data_type, type_params_str.c_str(), index_params_str.c_str()); + vec_field_data_type, type_params_str.c_str(), index_params_str.c_str(), storage_config_); auto vec_index = static_cast(copy_index.get()); ASSERT_EQ(vec_index->dim(), DIM); ASSERT_NO_THROW(vec_index->Load(binary_set)); diff --git a/internal/core/unittest/test_indexing.cpp b/internal/core/unittest/test_indexing.cpp index 1e510bf06a..e864946a99 100644 --- a/internal/core/unittest/test_indexing.cpp +++ b/internal/core/unittest/test_indexing.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include "faiss/utils/distances.h" #include "query/SearchBruteForce.h" @@ -28,7 +27,6 @@ #include "test_utils/Timer.h" #ifdef BUILD_DISK_ANN -#include #include "storage/MinioChunkManager.h" #include "storage/DiskFileManagerImpl.h" @@ -284,68 +282,11 @@ using Param = std::pair; class IndexTest : public ::testing::TestWithParam { protected: - //#ifdef BUILD_DISK_ANN - // bool - // FindFile(const path& dir, const std::string& file_name, path& path_found) { - // const recursive_directory_iterator end; - // boost::system::error_code err; - // auto iter = recursive_directory_iterator(dir, err); - // while (iter != end) { - // try { - // if ((*iter).path().filename() == file_name) { - // path_found = (*iter).path(); - // return true; - // } - // iter++; - // } catch (filesystem_error& e) { - // } catch (std::exception& e) { - // // ignore error - // } - // } - // return false; - // } - // - // void - // init_minio() { - // char testPath[100]; - // auto pwd = std::string(getcwd(testPath, sizeof(testPath))); - // path filepath; - // auto currentPath = path(pwd); - // while (!FindFile(currentPath, "milvus.yaml", filepath)) { - // currentPath = currentPath.append("../"); - // } - // auto configPath = filepath.string(); - // YAML::Node config; - // config = YAML::LoadFile(configPath); - // auto minioConfig = config["minio"]; - // auto address = minioConfig["address"].as(); - // auto port = minioConfig["port"].as(); - // auto endpoint = address + ":" + port; - // auto accessKey = minioConfig["accessKeyID"].as(); - // auto accessValue = minioConfig["secretAccessKey"].as(); - // auto useSSL = minioConfig["useSSL"].as(); - // auto bucketName = minioConfig["bucketName"].as(); - // - // ChunkMangerConfig::SetAddress(endpoint); - // ChunkMangerConfig::SetAccessKey(accessKey); - // ChunkMangerConfig::SetAccessValue(accessValue); - // ChunkMangerConfig::SetBucketName(bucketName); - // ChunkMangerConfig::SetUseSSL(useSSL); - // auto& chunk_manager = milvus::storage::MinioChunkManager::GetInstance(); - // chunk_manager.SetBucketName(bucketName); - // if (!chunk_manager.BucketExists(bucketName)) { - // chunk_manager.CreateBucket(bucketName); - // } - // } - //#endif - void SetUp() override { knowhere::KnowhereConfig::SetStatisticsLevel(3); knowhere::KnowhereConfig::SetIndexFileSliceSize(16); - //#ifdef BUILD_DISK_ANN - // init_minio(); - //#endif + storage_config_ = get_default_storage_config(); auto param = GetParam(); index_type = param.first; @@ -402,6 +343,7 @@ class IndexTest : public ::testing::TestWithParam { knowhere::DatasetPtr xq_dataset; int64_t query_offset = 100; int64_t NB = 10000; + StorageConfig storage_config_; }; INSTANTIATE_TEST_CASE_P( @@ -431,7 +373,8 @@ TEST_P(IndexTest, BuildAndQuery) { #ifdef BUILD_DISK_ANN milvus::storage::FieldDataMeta field_data_meta{1, 2, 3, 100}; milvus::storage::IndexMeta index_meta{3, 100, 1000, 1}; - auto file_manager = std::make_shared(field_data_meta, index_meta); + auto file_manager = + std::make_shared(field_data_meta, index_meta, storage_config_); index = milvus::index::IndexFactory::GetInstance().CreateIndex(create_index_info, file_manager); #endif } else { @@ -447,7 +390,8 @@ TEST_P(IndexTest, BuildAndQuery) { index.reset(); milvus::storage::FieldDataMeta field_data_meta{1, 2, 3, 100}; milvus::storage::IndexMeta index_meta{3, 100, 1000, 1}; - auto file_manager = std::make_shared(field_data_meta, index_meta); + auto file_manager = + std::make_shared(field_data_meta, index_meta, storage_config_); auto new_index = milvus::index::IndexFactory::GetInstance().CreateIndex(create_index_info, file_manager); vec_index = dynamic_cast(new_index.get()); diff --git a/internal/core/unittest/test_minio_chunk_manager.cpp b/internal/core/unittest/test_minio_chunk_manager.cpp index 3a68ade3fe..1553bb2256 100644 --- a/internal/core/unittest/test_minio_chunk_manager.cpp +++ b/internal/core/unittest/test_minio_chunk_manager.cpp @@ -9,16 +9,12 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License -#include -#include #include -#include -#include #include #include -#include #include "storage/MinioChunkManager.h" +#include "test_utils/indexbuilder_test_utils.h" using namespace std; using namespace milvus; @@ -32,121 +28,70 @@ class MinioChunkManagerTest : public testing::Test { ~MinioChunkManagerTest() { } - bool - FindFile(const path& dir, const string& file_name, path& path_found) { - const recursive_directory_iterator end; - boost::system::error_code err; - auto iter = recursive_directory_iterator(dir, err); - while (iter != end) { - try { - if ((*iter).path().filename() == file_name) { - path_found = (*iter).path(); - return true; - } - iter++; - } catch (filesystem_error& e) { - } catch (std::exception& e) { - // ignore error - } - } - return false; - } - - string - GetConfig() { - char testPath[100]; - auto pwd = string(getcwd(testPath, sizeof(testPath))); - path filepath; - auto currentPath = path(pwd); - while (!FindFile(currentPath, "milvus.yaml", filepath)) { - currentPath = currentPath.append("../"); - } - return filepath.string(); - } - virtual void SetUp() { - auto configPath = GetConfig(); - cout << configPath << endl; - YAML::Node config; - config = YAML::LoadFile(configPath); - auto minioConfig = config["minio"]; - auto address = minioConfig["address"].as(); - auto port = minioConfig["port"].as(); - auto endpoint = address + ":" + port; - auto accessKey = minioConfig["accessKeyID"].as(); - auto accessValue = minioConfig["secretAccessKey"].as(); - auto useSSL = minioConfig["useSSL"].as(); - auto bucketName = minioConfig["bucketName"].as(); - - ChunkMangerConfig::SetAddress(endpoint); - ChunkMangerConfig::SetAccessKey(accessKey); - ChunkMangerConfig::SetAccessValue(accessValue); - ChunkMangerConfig::SetBucketName(bucketName); - ChunkMangerConfig::SetUseSSL(useSSL); + chunk_manager_ = std::make_unique(get_default_storage_config()); } + + protected: + MinioChunkManagerPtr chunk_manager_; }; TEST_F(MinioChunkManagerTest, BucketPositive) { - auto& chunk_manager = MinioChunkManager::GetInstance(); string testBucketName = "test-bucket"; - chunk_manager.SetBucketName(testBucketName); - chunk_manager.DeleteBucket(testBucketName); - bool exist = chunk_manager.BucketExists(testBucketName); + chunk_manager_->SetBucketName(testBucketName); + bool exist = chunk_manager_->BucketExists(testBucketName); EXPECT_EQ(exist, false); - chunk_manager.CreateBucket(testBucketName); - exist = chunk_manager.BucketExists(testBucketName); + chunk_manager_->CreateBucket(testBucketName); + exist = chunk_manager_->BucketExists(testBucketName); EXPECT_EQ(exist, true); - chunk_manager.DeleteBucket(testBucketName); + chunk_manager_->DeleteBucket(testBucketName); } TEST_F(MinioChunkManagerTest, BucketNegtive) { - auto& chunk_manager = MinioChunkManager::GetInstance(); string testBucketName = "test-bucket-ng"; - chunk_manager.SetBucketName(testBucketName); - chunk_manager.DeleteBucket(testBucketName); + chunk_manager_->SetBucketName(testBucketName); + chunk_manager_->DeleteBucket(testBucketName); // create already exist bucket - chunk_manager.CreateBucket(testBucketName); + chunk_manager_->CreateBucket(testBucketName); try { - chunk_manager.CreateBucket(testBucketName); + chunk_manager_->CreateBucket(testBucketName); } catch (S3ErrorException& e) { EXPECT_TRUE(std::string(e.what()).find("BucketAlreadyOwnedByYou") != string::npos); } - chunk_manager.DeleteBucket(testBucketName); + chunk_manager_->DeleteBucket(testBucketName); } TEST_F(MinioChunkManagerTest, ObjectExist) { - auto& chunk_manager = MinioChunkManager::GetInstance(); string testBucketName = "test-objexist"; string objPath = "1/3"; - chunk_manager.SetBucketName(testBucketName); - if (!chunk_manager.BucketExists(testBucketName)) { - chunk_manager.CreateBucket(testBucketName); + chunk_manager_->SetBucketName(testBucketName); + if (!chunk_manager_->BucketExists(testBucketName)) { + chunk_manager_->CreateBucket(testBucketName); } - bool exist = chunk_manager.Exist(objPath); + bool exist = chunk_manager_->Exist(objPath); EXPECT_EQ(exist, false); - chunk_manager.DeleteBucket(testBucketName); + chunk_manager_->DeleteBucket(testBucketName); } TEST_F(MinioChunkManagerTest, WritePositive) { - auto& chunk_manager = MinioChunkManager::GetInstance(); string testBucketName = "test-write"; - chunk_manager.SetBucketName(testBucketName); - EXPECT_EQ(chunk_manager.GetBucketName(), testBucketName); + chunk_manager_->SetBucketName(testBucketName); + EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName); - if (!chunk_manager.BucketExists(testBucketName)) { - chunk_manager.CreateBucket(testBucketName); + if (!chunk_manager_->BucketExists(testBucketName)) { + chunk_manager_->CreateBucket(testBucketName); } uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23}; string path = "1/3/5"; - chunk_manager.Write(path, data, sizeof(data)); + chunk_manager_->Write(path, data, sizeof(data)); - bool exist = chunk_manager.Exist(path); + bool exist = chunk_manager_->Exist(path); EXPECT_EQ(exist, true); - auto size = chunk_manager.Size(path); + auto size = chunk_manager_->Size(path); EXPECT_EQ(size, 5); int datasize = 10000; @@ -155,34 +100,33 @@ TEST_F(MinioChunkManagerTest, WritePositive) { for (int i = 0; i < datasize; ++i) { bigdata[i] = rand() % 256; } - chunk_manager.Write(path, bigdata, datasize); - size = chunk_manager.Size(path); + chunk_manager_->Write(path, bigdata, datasize); + size = chunk_manager_->Size(path); EXPECT_EQ(size, datasize); delete[] bigdata; - chunk_manager.Remove(path); - chunk_manager.DeleteBucket(testBucketName); + chunk_manager_->Remove(path); + chunk_manager_->DeleteBucket(testBucketName); } TEST_F(MinioChunkManagerTest, ReadPositive) { - auto& chunk_manager = MinioChunkManager::GetInstance(); string testBucketName = "test-read"; - chunk_manager.SetBucketName(testBucketName); - EXPECT_EQ(chunk_manager.GetBucketName(), testBucketName); + chunk_manager_->SetBucketName(testBucketName); + EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName); - if (!chunk_manager.BucketExists(testBucketName)) { - chunk_manager.CreateBucket(testBucketName); + if (!chunk_manager_->BucketExists(testBucketName)) { + chunk_manager_->CreateBucket(testBucketName); } uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23}; string path = "1/4/6"; - chunk_manager.Write(path, data, sizeof(data)); - bool exist = chunk_manager.Exist(path); + chunk_manager_->Write(path, data, sizeof(data)); + bool exist = chunk_manager_->Exist(path); EXPECT_EQ(exist, true); - auto size = chunk_manager.Size(path); + auto size = chunk_manager_->Size(path); EXPECT_EQ(size, 5); uint8_t readdata[20] = {0}; - size = chunk_manager.Read(path, readdata, 20); + size = chunk_manager_->Read(path, readdata, 20); EXPECT_EQ(size, 5); EXPECT_EQ(readdata[0], 0x17); EXPECT_EQ(readdata[1], 0x32); @@ -190,19 +134,19 @@ TEST_F(MinioChunkManagerTest, ReadPositive) { EXPECT_EQ(readdata[3], 0x34); EXPECT_EQ(readdata[4], 0x23); - size = chunk_manager.Read(path, readdata, 3); + size = chunk_manager_->Read(path, readdata, 3); EXPECT_EQ(size, 3); EXPECT_EQ(readdata[0], 0x17); EXPECT_EQ(readdata[1], 0x32); EXPECT_EQ(readdata[2], 0x45); uint8_t dataWithNULL[] = {0x17, 0x32, 0x00, 0x34, 0x23}; - chunk_manager.Write(path, dataWithNULL, sizeof(dataWithNULL)); - exist = chunk_manager.Exist(path); + chunk_manager_->Write(path, dataWithNULL, sizeof(dataWithNULL)); + exist = chunk_manager_->Exist(path); EXPECT_EQ(exist, true); - size = chunk_manager.Size(path); + size = chunk_manager_->Size(path); EXPECT_EQ(size, 5); - size = chunk_manager.Read(path, readdata, 20); + size = chunk_manager_->Read(path, readdata, 20); EXPECT_EQ(size, 5); EXPECT_EQ(readdata[0], 0x17); EXPECT_EQ(readdata[1], 0x32); @@ -210,69 +154,67 @@ TEST_F(MinioChunkManagerTest, ReadPositive) { EXPECT_EQ(readdata[3], 0x34); EXPECT_EQ(readdata[4], 0x23); - chunk_manager.Remove(path); - chunk_manager.DeleteBucket(testBucketName); + chunk_manager_->Remove(path); + chunk_manager_->DeleteBucket(testBucketName); } TEST_F(MinioChunkManagerTest, RemovePositive) { - auto& chunk_manager = MinioChunkManager::GetInstance(); string testBucketName = "test-remove"; - chunk_manager.SetBucketName(testBucketName); - EXPECT_EQ(chunk_manager.GetBucketName(), testBucketName); + chunk_manager_->SetBucketName(testBucketName); + EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName); - if (!chunk_manager.BucketExists(testBucketName)) { - chunk_manager.CreateBucket(testBucketName); + if (!chunk_manager_->BucketExists(testBucketName)) { + chunk_manager_->CreateBucket(testBucketName); } uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23}; string path = "1/7/8"; - chunk_manager.Write(path, data, sizeof(data)); + chunk_manager_->Write(path, data, sizeof(data)); - bool exist = chunk_manager.Exist(path); + bool exist = chunk_manager_->Exist(path); EXPECT_EQ(exist, true); - chunk_manager.Remove(path); + chunk_manager_->Remove(path); - exist = chunk_manager.Exist(path); + exist = chunk_manager_->Exist(path); EXPECT_EQ(exist, false); - chunk_manager.DeleteBucket(testBucketName); + chunk_manager_->DeleteBucket(testBucketName); } TEST_F(MinioChunkManagerTest, ListWithPrefixPositive) { - auto& chunk_manager = MinioChunkManager::GetInstance(); string testBucketName = "test-listprefix"; - chunk_manager.SetBucketName(testBucketName); - EXPECT_EQ(chunk_manager.GetBucketName(), testBucketName); + chunk_manager_->SetBucketName(testBucketName); + EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName); - if (!chunk_manager.BucketExists(testBucketName)) { - chunk_manager.CreateBucket(testBucketName); + if (!chunk_manager_->BucketExists(testBucketName)) { + chunk_manager_->CreateBucket(testBucketName); } string path1 = "1/7/8"; string path2 = "1/7/4"; string path3 = "1/4/8"; uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23}; - chunk_manager.Write(path1, data, sizeof(data)); - chunk_manager.Write(path2, data, sizeof(data)); - chunk_manager.Write(path3, data, sizeof(data)); + chunk_manager_->Write(path1, data, sizeof(data)); + chunk_manager_->Write(path2, data, sizeof(data)); + chunk_manager_->Write(path3, data, sizeof(data)); - vector objs = chunk_manager.ListWithPrefix("1/7"); + vector objs = chunk_manager_->ListWithPrefix("1/7"); EXPECT_EQ(objs.size(), 2); std::sort(objs.begin(), objs.end()); EXPECT_EQ(objs[0], "1/7/4"); EXPECT_EQ(objs[1], "1/7/8"); - objs = chunk_manager.ListWithPrefix("//1/7"); + objs = chunk_manager_->ListWithPrefix("//1/7"); EXPECT_EQ(objs.size(), 2); - objs = chunk_manager.ListWithPrefix("1"); + objs = chunk_manager_->ListWithPrefix("1"); EXPECT_EQ(objs.size(), 3); std::sort(objs.begin(), objs.end()); EXPECT_EQ(objs[0], "1/4/8"); EXPECT_EQ(objs[1], "1/7/4"); - chunk_manager.Remove(path1); - chunk_manager.Remove(path2); - chunk_manager.Remove(path3); - chunk_manager.DeleteBucket(testBucketName); + chunk_manager_->Remove(path1); + chunk_manager_->Remove(path2); + chunk_manager_->Remove(path3); + chunk_manager_->DeleteBucket(testBucketName); } diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 49c6f4643d..87e24b5787 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -17,12 +17,11 @@ #include "segcore/SegmentSealedImpl.h" #include "test_utils/DataGen.h" #include "index/IndexFactory.h" -#include "segcore/segcore_init_c.h" using namespace milvus; using namespace milvus::query; using namespace milvus::segcore; -using milvus::index::LoadIndexInfo; +using milvus::segcore::LoadIndexInfo; const int64_t ROW_COUNT = 100 * 1000; diff --git a/internal/core/unittest/test_segcore.cpp b/internal/core/unittest/test_segcore.cpp index b926f1c5b5..25062f5616 100644 --- a/internal/core/unittest/test_segcore.cpp +++ b/internal/core/unittest/test_segcore.cpp @@ -88,12 +88,11 @@ TEST(InsertRecordTest, growing_int64_t) { auto i64_fid = schema->AddDebugField("age", DataType::INT64); schema->set_primary_field_id(i64_fid); auto record = milvus::segcore::InsertRecord(*schema, int64_t(32)); - const int N=100000; + const int N = 100000; - for (int i = 1; i <= N; i++) - record.insert_pk(PkType(int64_t(i)), int64_t(i)); + for (int i = 1; i <= N; i++) record.insert_pk(PkType(int64_t(i)), int64_t(i)); - for (int i = 1; i <= N; i++){ + for (int i = 1; i <= N; i++) { std::vector offset = record.search_pk(PkType(int64_t(i)), int64_t(N + 1)); ASSERT_EQ(offset[0].get(), int64_t(i)); } @@ -108,10 +107,9 @@ TEST(InsertRecordTest, growing_string) { auto record = milvus::segcore::InsertRecord(*schema, int64_t(32)); const int N = 100000; - for (int i = 1; i <= N; i++) - record.insert_pk(PkType(std::to_string(i)), int64_t(i)); + for (int i = 1; i <= N; i++) record.insert_pk(PkType(std::to_string(i)), int64_t(i)); - for (int i = 1; i <= N; i++){ + for (int i = 1; i <= N; i++) { std::vector offset = record.search_pk(std::to_string(i), int64_t(N + 1)); ASSERT_EQ(offset[0].get(), int64_t(i)); } @@ -126,11 +124,10 @@ TEST(InsertRecordTest, sealed_int64_t) { auto record = milvus::segcore::InsertRecord(*schema, int64_t(32)); const int N = 100000; - for (int i = N; i >= 1; i--) - record.insert_pk(PkType(int64_t(i)), int64_t(i)); + for (int i = N; i >= 1; i--) record.insert_pk(PkType(int64_t(i)), int64_t(i)); record.seal_pks(); - for (int i = 1;i <= N; i++){ + for (int i = 1; i <= N; i++) { std::vector offset = record.search_pk(PkType(int64_t(i)), int64_t(N + 1)); ASSERT_EQ(offset[0].get(), int64_t(i)); } @@ -145,12 +142,11 @@ TEST(InsertRecordTest, sealed_string) { auto record = milvus::segcore::InsertRecord(*schema, int64_t(32)); const int N = 100000; - for (int i = 1; i <= N; i++) - record.insert_pk(PkType(std::to_string(i)), int64_t(i)); + for (int i = 1; i <= N; i++) record.insert_pk(PkType(std::to_string(i)), int64_t(i)); record.seal_pks(); - for (int i = 1; i <= N; i++){ + for (int i = 1; i <= N; i++) { std::vector offset = record.search_pk(std::to_string(i), int64_t(N + 1)); ASSERT_EQ(offset[0].get(), int64_t(i)); } diff --git a/internal/core/unittest/test_utils/indexbuilder_test_utils.h b/internal/core/unittest/test_utils/indexbuilder_test_utils.h index 80d65fe758..232b4c9636 100644 --- a/internal/core/unittest/test_utils/indexbuilder_test_utils.h +++ b/internal/core/unittest/test_utils/indexbuilder_test_utils.h @@ -16,6 +16,8 @@ #include #include #include +#include +#include #include "DataGen.h" #include "index/ScalarIndex.h" @@ -28,6 +30,7 @@ #include "knowhere/index/vector_index/helpers/IndexParameter.h" #include "knowhere/index/vector_index/adapter/VectorAdapter.h" #include "pb/index_cgo_msg.pb.h" +#include "storage/Types.h" constexpr int64_t DIM = 16; constexpr int64_t NQ = 10; @@ -43,8 +46,93 @@ using milvus::indexbuilder::ScalarIndexCreator; using ScalarTestParams = std::pair; using milvus::index::ScalarIndexPtr; using milvus::index::StringIndexPtr; +using milvus::storage::StorageConfig; +using namespace boost::filesystem; namespace { + +bool +find_file(const path& dir, const std::string& file_name, path& path_found) { + const recursive_directory_iterator end; + boost::system::error_code err; + auto iter = recursive_directory_iterator(dir, err); + while (iter != end) { + try { + if ((*iter).path().filename() == file_name) { + path_found = (*iter).path(); + return true; + } + iter++; + } catch (filesystem_error& e) { + } catch (std::exception& e) { + // ignore error + } + } + return false; +} + +StorageConfig +get_default_storage_config() { + char testPath[100]; + auto pwd = std::string(getcwd(testPath, sizeof(testPath))); + path filepath; + auto currentPath = path(pwd); + while (!find_file(currentPath, "milvus.yaml", filepath)) { + currentPath = currentPath.append("../"); + } + auto configPath = filepath.string(); + YAML::Node config; + config = YAML::LoadFile(configPath); + auto minioConfig = config["minio"]; + auto address = minioConfig["address"].as(); + auto port = minioConfig["port"].as(); + auto endpoint = address + ":" + port; + auto accessKey = minioConfig["accessKeyID"].as(); + auto accessValue = minioConfig["secretAccessKey"].as(); + auto rootPath = minioConfig["rootPath"].as(); + auto useSSL = minioConfig["useSSL"].as(); + auto useIam = minioConfig["useIAM"].as(); + auto iamEndPoint = minioConfig["iamEndpoint"].as(); + auto bucketName = minioConfig["bucketName"].as(); + + return StorageConfig{endpoint, bucketName, accessKey, accessValue, rootPath, "minio", iamEndPoint, useSSL, useIam}; +} + +CStorageConfig +get_default_cstorage_config() { + char testPath[100]; + auto pwd = std::string(getcwd(testPath, sizeof(testPath))); + path filepath; + auto currentPath = path(pwd); + while (!find_file(currentPath, "milvus.yaml", filepath)) { + currentPath = currentPath.append("../"); + } + auto configPath = filepath.string(); + YAML::Node config; + config = YAML::LoadFile(configPath); + auto minioConfig = config["minio"]; + auto address = minioConfig["address"].as(); + auto port = minioConfig["port"].as(); + auto endpoint = address + ":" + port; + auto accessKey = minioConfig["accessKeyID"].as(); + auto accessValue = minioConfig["secretAccessKey"].as(); + auto rootPath = minioConfig["rootPath"].as(); + auto useSSL = minioConfig["useSSL"].as(); + auto useIam = minioConfig["useIAM"].as(); + auto iamEndPoint = minioConfig["iamEndpoint"].as(); + auto bucketName = minioConfig["bucketName"].as(); + + return CStorageConfig{endpoint.c_str(), + bucketName.c_str(), + accessKey.c_str(), + accessValue.c_str(), + rootPath.c_str(), + "minio", + iamEndPoint.c_str(), + useSSL, + useIam}; +} + auto generate_build_conf(const milvus::IndexType& index_type, const milvus::MetricType& metric_type) { if (index_type == knowhere::IndexEnum::INDEX_FAISS_IDMAP) { diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index ab8cf3be0c..77ec809560 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -156,7 +156,6 @@ func (i *IndexNode) initKnowhere() { C.IndexBuilderSetIndexSliceSize(cIndexSliceSize) initcore.InitLocalStorageConfig(&Params) - initcore.InitMinioConfig(&Params) } func (i *IndexNode) initSession() error { diff --git a/internal/indexnode/indexnode_service_test.go b/internal/indexnode/indexnode_service_test.go index 1adca2f010..a3ac9770bb 100644 --- a/internal/indexnode/indexnode_service_test.go +++ b/internal/indexnode/indexnode_service_test.go @@ -16,6 +16,19 @@ import ( "github.com/milvus-io/milvus/internal/util/metricsinfo" ) +func genStorageConfig() *indexpb.StorageConfig { + return &indexpb.StorageConfig{ + Address: Params.MinioCfg.Address, + AccessKeyID: Params.MinioCfg.AccessKeyID, + SecretAccessKey: Params.MinioCfg.SecretAccessKey, + BucketName: Params.MinioCfg.BucketName, + RootPath: Params.MinioCfg.RootPath, + IAMEndpoint: Params.MinioCfg.IAMEndpoint, + UseSSL: Params.MinioCfg.UseSSL, + UseIAM: Params.MinioCfg.UseIAM, + } +} + func TestIndexNodeSimple(t *testing.T) { in, err := NewMockIndexNodeComponent(context.TODO()) assert.Nil(t, err) @@ -71,6 +84,7 @@ func TestIndexNodeSimple(t *testing.T) { IndexName: idxName, IndexParams: indexParams, TypeParams: typeParams, + StorageConfig: genStorageConfig(), } status, err := in.CreateJob(ctx, createReq) assert.Nil(t, err) @@ -242,6 +256,7 @@ func TestIndexNodeComplex(t *testing.T) { IndexName: fmt.Sprintf("idx%d", tasks[i].idxID), IndexParams: tasks[i].idxParams, TypeParams: tasks[i].typeParams, + StorageConfig: genStorageConfig(), } testwg.Add(1) go func() { diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index a116b1dad8..e078d6b807 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -240,7 +240,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { dType := dataset.DType var err error if dType != schemapb.DataType_None { - it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams) + it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig()) if err != nil { log.Ctx(ctx).Error("failed to create index", zap.Error(err)) return err @@ -335,7 +335,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { it.newIndexParams["index_id"] = strconv.FormatInt(it.req.IndexID, 10) it.newIndexParams["index_version"] = strconv.FormatInt(it.req.GetIndexVersion(), 10) - it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams) + it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig()) if err != nil { log.Ctx(ctx).Error("failed to create index", zap.Error(err)) return err diff --git a/internal/querynode/load_index_info.go b/internal/querynode/load_index_info.go index f3306faee5..006b4a97d0 100644 --- a/internal/querynode/load_index_info.go +++ b/internal/querynode/load_index_info.go @@ -40,7 +40,36 @@ type LoadIndexInfo struct { // newLoadIndexInfo returns a new LoadIndexInfo and error func newLoadIndexInfo() (*LoadIndexInfo, error) { var cLoadIndexInfo C.CLoadIndexInfo - status := C.NewLoadIndexInfo(&cLoadIndexInfo) + + // TODO::xige-16 support embedded milvus + storageType := "minio" + cAddress := C.CString(Params.MinioCfg.Address) + cBucketName := C.CString(Params.MinioCfg.BucketName) + cAccessKey := C.CString(Params.MinioCfg.AccessKeyID) + cAccessValue := C.CString(Params.MinioCfg.SecretAccessKey) + cRootPath := C.CString(Params.MinioCfg.RootPath) + cStorageType := C.CString(storageType) + cIamEndPoint := C.CString(Params.MinioCfg.IAMEndpoint) + defer C.free(unsafe.Pointer(cAddress)) + defer C.free(unsafe.Pointer(cBucketName)) + defer C.free(unsafe.Pointer(cAccessKey)) + defer C.free(unsafe.Pointer(cAccessValue)) + defer C.free(unsafe.Pointer(cRootPath)) + defer C.free(unsafe.Pointer(cStorageType)) + defer C.free(unsafe.Pointer(cIamEndPoint)) + storageConfig := C.CStorageConfig{ + address: cAddress, + bucket_name: cBucketName, + access_key_id: cAccessKey, + access_key_value: cAccessValue, + remote_root_path: cRootPath, + storage_type: cStorageType, + iam_endpoint: cIamEndPoint, + useSSL: C.bool(Params.MinioCfg.UseSSL), + useIAM: C.bool(Params.MinioCfg.UseIAM), + } + + status := C.NewLoadIndexInfo(&cLoadIndexInfo, storageConfig) if err := HandleCStatus(&status, "NewLoadIndexInfo failed"); err != nil { return nil, err } diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index eb8e6357b1..9568847e5b 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -27,11 +27,6 @@ import ( "runtime" "strconv" - "github.com/milvus-io/milvus/internal/util/concurrency" - "github.com/milvus-io/milvus/internal/util/dependency" - "github.com/milvus-io/milvus/internal/util/indexcgowrapper" - "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -43,13 +38,18 @@ import ( "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/planpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util" + "github.com/milvus-io/milvus/internal/util/concurrency" + "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/indexcgowrapper" + "github.com/milvus-io/milvus/internal/util/typeutil" ) // ---------- unittest util functions ---------- @@ -278,7 +278,7 @@ func genVectorFieldSchema(param vecFieldParam) *schemapb.FieldSchema { func genIndexBinarySet() ([][]byte, error) { typeParams, indexParams := genIndexParams(IndexFaissIVFPQ, L2) - index, err := indexcgowrapper.NewCgoIndex(schemapb.DataType_FloatVector, typeParams, indexParams) + index, err := indexcgowrapper.NewCgoIndex(schemapb.DataType_FloatVector, typeParams, indexParams, genStorageConfig()) if err != nil { return nil, err } @@ -375,7 +375,7 @@ func generateAndSaveIndex(segmentID UniqueID, msgLength int, indexType, metricTy }) } - index, err := indexcgowrapper.NewCgoIndex(schemapb.DataType_FloatVector, typeParams, indexParams) + index, err := indexcgowrapper.NewCgoIndex(schemapb.DataType_FloatVector, typeParams, indexParams, genStorageConfig()) if err != nil { return nil, err } @@ -424,6 +424,19 @@ func generateAndSaveIndex(segmentID UniqueID, msgLength int, indexType, metricTy return indexPaths, nil } +func genStorageConfig() *indexpb.StorageConfig { + return &indexpb.StorageConfig{ + Address: Params.MinioCfg.Address, + AccessKeyID: Params.MinioCfg.AccessKeyID, + SecretAccessKey: Params.MinioCfg.SecretAccessKey, + BucketName: Params.MinioCfg.BucketName, + RootPath: Params.MinioCfg.RootPath, + IAMEndpoint: Params.MinioCfg.IAMEndpoint, + UseSSL: Params.MinioCfg.UseSSL, + UseIAM: Params.MinioCfg.UseIAM, + } +} + func genIndexParams(indexType, metricType string) (map[string]string, map[string]string) { typeParams := make(map[string]string) typeParams["dim"] = strconv.Itoa(defaultDim) diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index afdec7dd27..e245af6c8a 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -219,7 +219,6 @@ func (node *QueryNode) InitSegcore() { C.SegcoreSetIndexSliceSize(cIndexSliceSize) initcore.InitLocalStorageConfig(&Params) - initcore.InitMinioConfig(&Params) } // Init function init historical and streaming module to manage segments diff --git a/internal/util/indexcgowrapper/codec_index_test.go b/internal/util/indexcgowrapper/codec_index_test.go index 65426bdfcb..dc07d92014 100644 --- a/internal/util/indexcgowrapper/codec_index_test.go +++ b/internal/util/indexcgowrapper/codec_index_test.go @@ -10,11 +10,10 @@ import ( "github.com/stretchr/testify/assert" - "github.com/milvus-io/milvus/internal/util/funcutil" - - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/api/schemapb" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/funcutil" ) type indexTestCase struct { @@ -293,9 +292,26 @@ func genIndexCase() []indexTestCase { return ret } +func genStorageConfig() *indexpb.StorageConfig { + InitOnce.Do(func() { + Params.Init() + }) + + return &indexpb.StorageConfig{ + Address: Params.MinioCfg.Address, + AccessKeyID: Params.MinioCfg.AccessKeyID, + SecretAccessKey: Params.MinioCfg.SecretAccessKey, + BucketName: Params.MinioCfg.BucketName, + RootPath: Params.MinioCfg.RootPath, + IAMEndpoint: Params.MinioCfg.IAMEndpoint, + UseSSL: Params.MinioCfg.UseSSL, + UseIAM: Params.MinioCfg.UseIAM, + } +} + func TestCgoIndex(t *testing.T) { for _, testCase := range genIndexCase() { - index, err := NewCgoIndex(testCase.dtype, testCase.typeParams, testCase.indexParams) + index, err := NewCgoIndex(testCase.dtype, testCase.typeParams, testCase.indexParams, genStorageConfig()) assert.NoError(t, err, testCase) dataset := GenDataset(genFieldData(testCase.dtype, nb, dim)) @@ -304,7 +320,7 @@ func TestCgoIndex(t *testing.T) { blobs, err := index.Serialize() assert.NoError(t, err, testCase) - copyIndex, err := NewCgoIndex(testCase.dtype, testCase.typeParams, testCase.indexParams) + copyIndex, err := NewCgoIndex(testCase.dtype, testCase.typeParams, testCase.indexParams, genStorageConfig()) assert.NoError(t, err, testCase) assert.NoError(t, copyIndex.Load(blobs), testCase) diff --git a/internal/util/indexcgowrapper/index.go b/internal/util/indexcgowrapper/index.go index dd8bada457..a16361a3af 100644 --- a/internal/util/indexcgowrapper/index.go +++ b/internal/util/indexcgowrapper/index.go @@ -9,6 +9,7 @@ package indexcgowrapper import "C" import ( "fmt" + "github.com/milvus-io/milvus/internal/proto/indexpb" "path/filepath" "runtime" "unsafe" @@ -49,7 +50,7 @@ type CgoIndex struct { } // TODO: use proto.Marshal instead of proto.MarshalTextString for better compatibility. -func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]string) (*CgoIndex, error) { +func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]string, config *indexpb.StorageConfig) (*CgoIndex, error) { protoTypeParams := &indexcgopb.TypeParams{ Params: make([]*commonpb.KeyValuePair, 0), } @@ -71,9 +72,37 @@ func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]str defer C.free(unsafe.Pointer(typeParamsPointer)) defer C.free(unsafe.Pointer(indexParamsPointer)) + // TODO::xige-16 support embedded milvus + storageType := "minio" + cAddress := C.CString(config.Address) + cBucketName := C.CString(config.GetBucketName()) + cAccessKey := C.CString(config.GetAccessKeyID()) + cAccessValue := C.CString(config.GetSecretAccessKey()) + cRootPath := C.CString(config.GetRootPath()) + cStorageType := C.CString(storageType) + cIamEndPoint := C.CString(config.GetIAMEndpoint()) + defer C.free(unsafe.Pointer(cAddress)) + defer C.free(unsafe.Pointer(cBucketName)) + defer C.free(unsafe.Pointer(cAccessKey)) + defer C.free(unsafe.Pointer(cAccessValue)) + defer C.free(unsafe.Pointer(cRootPath)) + defer C.free(unsafe.Pointer(cStorageType)) + defer C.free(unsafe.Pointer(cIamEndPoint)) + storageConfig := C.CStorageConfig{ + address: cAddress, + bucket_name: cBucketName, + access_key_id: cAccessKey, + access_key_value: cAccessValue, + remote_root_path: cRootPath, + storage_type: cStorageType, + iam_endpoint: cIamEndPoint, + useSSL: C.bool(config.GetUseSSL()), + useIAM: C.bool(config.GetUseIAM()), + } + var indexPtr C.CIndex cintDType := uint32(dtype) - status := C.CreateIndex(cintDType, typeParamsPointer, indexParamsPointer, &indexPtr) + status := C.CreateIndex(cintDType, typeParamsPointer, indexParamsPointer, &indexPtr, storageConfig) if err := HandleCStatus(&status, "failed to create index"); err != nil { return nil, err } diff --git a/internal/util/indexcgowrapper/index_test.go b/internal/util/indexcgowrapper/index_test.go index 44307d7a93..8319aad712 100644 --- a/internal/util/indexcgowrapper/index_test.go +++ b/internal/util/indexcgowrapper/index_test.go @@ -5,10 +5,13 @@ package indexcgowrapper import ( "strconv" + "sync" "testing" - "github.com/milvus-io/milvus/api/schemapb" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/api/schemapb" + "github.com/milvus-io/milvus/internal/util/paramtable" ) const ( @@ -40,6 +43,9 @@ const ( ef = 200 ) +var Params paramtable.ServiceParam +var InitOnce sync.Once + type vecTestCase struct { indexType string metricType string @@ -129,7 +135,7 @@ func TestCIndex_New(t *testing.T) { for _, c := range generateTestCases() { typeParams, indexParams := generateParams(c.indexType, c.metricType) - index, err := NewCgoIndex(c.dtype, typeParams, indexParams) + index, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig()) assert.Equal(t, err, nil) assert.NotEqual(t, index, nil) @@ -142,7 +148,7 @@ func TestCIndex_BuildFloatVecIndex(t *testing.T) { for _, c := range generateFloatVectorTestCases() { typeParams, indexParams := generateParams(c.indexType, c.metricType) - index, err := NewCgoIndex(c.dtype, typeParams, indexParams) + index, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig()) assert.Equal(t, err, nil) assert.NotEqual(t, index, nil) @@ -159,7 +165,7 @@ func TestCIndex_BuildBinaryVecIndex(t *testing.T) { for _, c := range generateBinaryVectorTestCases() { typeParams, indexParams := generateParams(c.indexType, c.metricType) - index, err := NewCgoIndex(c.dtype, typeParams, indexParams) + index, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig()) assert.Equal(t, err, nil) assert.NotEqual(t, index, nil) @@ -176,7 +182,7 @@ func TestCIndex_Codec(t *testing.T) { for _, c := range generateTestCases() { typeParams, indexParams := generateParams(c.indexType, c.metricType) - index, err := NewCgoIndex(c.dtype, typeParams, indexParams) + index, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig()) assert.Equal(t, err, nil) assert.NotEqual(t, index, nil) @@ -193,7 +199,7 @@ func TestCIndex_Codec(t *testing.T) { blobs, err := index.Serialize() assert.Equal(t, err, nil) - copyIndex, err := NewCgoIndex(c.dtype, typeParams, indexParams) + copyIndex, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig()) assert.NotEqual(t, copyIndex, nil) assert.Equal(t, err, nil) err = copyIndex.Load(blobs) @@ -214,7 +220,7 @@ func TestCIndex_Delete(t *testing.T) { for _, c := range generateTestCases() { typeParams, indexParams := generateParams(c.indexType, c.metricType) - index, err := NewCgoIndex(c.dtype, typeParams, indexParams) + index, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig()) assert.Equal(t, err, nil) assert.NotEqual(t, index, nil) @@ -227,7 +233,7 @@ func TestCIndex_Error(t *testing.T) { indexParams := make(map[string]string) indexParams["index_type"] = "IVF_FLAT" indexParams["metric_type"] = "L2" - indexPtr, err := NewCgoIndex(schemapb.DataType_FloatVector, nil, indexParams) + indexPtr, err := NewCgoIndex(schemapb.DataType_FloatVector, nil, indexParams, genStorageConfig()) assert.Nil(t, err) t.Run("Serialize error", func(t *testing.T) { diff --git a/internal/util/initcore/init_storage_config.go b/internal/util/initcore/init_storage_config.go index 65d968e155..e2508dbdc5 100644 --- a/internal/util/initcore/init_storage_config.go +++ b/internal/util/initcore/init_storage_config.go @@ -27,40 +27,11 @@ import "C" import ( "os" "path/filepath" - "strings" "unsafe" "github.com/milvus-io/milvus/internal/util/paramtable" ) -func InitMinioConfig(params *paramtable.ComponentParam) { - CMinioAddress := C.CString(params.MinioCfg.Address) - C.MinioAddressInit(CMinioAddress) - C.free(unsafe.Pointer(CMinioAddress)) - - CMinioAccessKey := C.CString(params.MinioCfg.AccessKeyID) - C.MinioAccessKeyInit(CMinioAccessKey) - C.free(unsafe.Pointer(CMinioAccessKey)) - - CMinioAccessValue := C.CString(params.MinioCfg.SecretAccessKey) - C.MinioAccessValueInit(CMinioAccessValue) - C.free(unsafe.Pointer(CMinioAccessValue)) - - CUseSSL := C.bool(params.MinioCfg.UseSSL) - C.MinioSSLInit(CUseSSL) - - CUseIam := C.bool(params.MinioCfg.UseIAM) - C.MinioUseIamInit(CUseIam) - - CMinioBucketName := C.CString(strings.TrimLeft(params.MinioCfg.BucketName, "/")) - C.MinioBucketNameInit(CMinioBucketName) - C.free(unsafe.Pointer(CMinioBucketName)) - - CMinioRootPath := C.CString(params.MinioCfg.RootPath) - C.MinioRootPathInit(CMinioRootPath) - C.free(unsafe.Pointer(CMinioRootPath)) -} - func InitLocalStorageConfig(params *paramtable.ComponentParam) { b, _ := os.Getwd() LocalRootPath := filepath.Dir(b) + "/" + filepath.Base(b) + "/" + "data/"