mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
Assign different storage config for indexes (#19517)
Signed-off-by: xige-16 <xi.ge@zilliz.com> Signed-off-by: xige-16 <xi.ge@zilliz.com>
This commit is contained in:
parent
089912baa7
commit
8c9c1672ae
@ -19,46 +19,6 @@
|
||||
#include <string>
|
||||
#include "config/ConfigChunkManager.h"
|
||||
|
||||
void
|
||||
MinioAddressInit(const char* address) {
|
||||
std::string minio_address(address);
|
||||
milvus::ChunkMangerConfig::SetAddress(address);
|
||||
}
|
||||
|
||||
void
|
||||
MinioAccessKeyInit(const char* key) {
|
||||
std::string minio_access_key(key);
|
||||
milvus::ChunkMangerConfig::SetAccessKey(minio_access_key);
|
||||
}
|
||||
|
||||
void
|
||||
MinioAccessValueInit(const char* value) {
|
||||
std::string minio_access_value(value);
|
||||
milvus::ChunkMangerConfig::SetAccessValue(value);
|
||||
}
|
||||
|
||||
void
|
||||
MinioSSLInit(bool use_ssl) {
|
||||
milvus::ChunkMangerConfig::SetUseSSL(use_ssl);
|
||||
}
|
||||
|
||||
void
|
||||
MinioUseIamInit(bool use_iam) {
|
||||
milvus::ChunkMangerConfig::SetUseIAM(use_iam);
|
||||
}
|
||||
|
||||
void
|
||||
MinioBucketNameInit(const char* name) {
|
||||
std::string bucket_name(name);
|
||||
milvus::ChunkMangerConfig::SetBucketName(name);
|
||||
}
|
||||
|
||||
void
|
||||
MinioRootPathInit(const char* name) {
|
||||
std::string root_path(name);
|
||||
milvus::ChunkMangerConfig::SetRemoteRootPath(name);
|
||||
}
|
||||
|
||||
void
|
||||
LocalRootPathInit(const char* root_path) {
|
||||
std::string local_path_root(root_path);
|
||||
|
||||
@ -22,27 +22,6 @@ extern "C" {
|
||||
|
||||
#include <stdbool.h>
|
||||
|
||||
void
|
||||
MinioAddressInit(const char*);
|
||||
|
||||
void
|
||||
MinioAccessKeyInit(const char*);
|
||||
|
||||
void
|
||||
MinioAccessValueInit(const char*);
|
||||
|
||||
void
|
||||
MinioSSLInit(bool use_ssl);
|
||||
|
||||
void
|
||||
MinioUseIamInit(bool use_iam);
|
||||
|
||||
void
|
||||
MinioBucketNameInit(const char*);
|
||||
|
||||
void
|
||||
MinioRootPathInit(const char*);
|
||||
|
||||
void
|
||||
LocalRootPathInit(const char*);
|
||||
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
@ -82,6 +83,18 @@ typedef struct CLoadDeletedRecordInfo {
|
||||
int64_t row_count;
|
||||
} CLoadDeletedRecordInfo;
|
||||
|
||||
typedef struct CStorageConfig {
|
||||
const char* address;
|
||||
const char* bucket_name;
|
||||
const char* access_key_id;
|
||||
const char* access_key_value;
|
||||
const char* remote_root_path;
|
||||
const char* storage_type;
|
||||
const char* iam_endpoint;
|
||||
bool useSSL;
|
||||
bool useIAM;
|
||||
} CStorageConfig;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
@ -18,84 +18,7 @@
|
||||
|
||||
namespace milvus::ChunkMangerConfig {
|
||||
|
||||
std::string REMOTE_ADDRESS = "localhost:9000"; // NOLINT
|
||||
std::string REMOTE_ACCESS_KEY = "minioadmin"; // NOLINT
|
||||
std::string REMOTE_ACCESS_VALUE = "minioadmin"; // NOLINT
|
||||
std::string REMOTE_BUCKET_NAME = "a-bucket"; // NOLINT
|
||||
std::string REMOTE_ROOT_PATH = "files"; // NOLINT
|
||||
std::string LOCAL_ROOT_PATH = "/tmp/milvus"; // NOLINT
|
||||
bool MINIO_USE_SSL = false;
|
||||
bool MINIO_USE_IAM = false;
|
||||
|
||||
void
|
||||
SetAddress(const std::string& address) {
|
||||
REMOTE_ADDRESS = address;
|
||||
}
|
||||
|
||||
std::string
|
||||
GetAddress() {
|
||||
return REMOTE_ADDRESS;
|
||||
}
|
||||
|
||||
void
|
||||
SetAccessKey(const std::string& access_key) {
|
||||
REMOTE_ACCESS_KEY = access_key;
|
||||
}
|
||||
|
||||
std::string
|
||||
GetAccessKey() {
|
||||
return REMOTE_ACCESS_KEY;
|
||||
}
|
||||
|
||||
void
|
||||
SetAccessValue(const std::string& access_value) {
|
||||
REMOTE_ACCESS_VALUE = access_value;
|
||||
}
|
||||
|
||||
std::string
|
||||
GetAccessValue() {
|
||||
return REMOTE_ACCESS_VALUE;
|
||||
}
|
||||
|
||||
void
|
||||
SetBucketName(const std::string& bucket_name) {
|
||||
REMOTE_BUCKET_NAME = bucket_name;
|
||||
}
|
||||
|
||||
std::string
|
||||
GetBucketName() {
|
||||
return REMOTE_BUCKET_NAME;
|
||||
}
|
||||
|
||||
void
|
||||
SetUseSSL(bool use_ssl) {
|
||||
MINIO_USE_SSL = use_ssl;
|
||||
}
|
||||
|
||||
bool
|
||||
GetUseSSL() {
|
||||
return MINIO_USE_SSL;
|
||||
}
|
||||
|
||||
void
|
||||
SetUseIAM(bool use_iam) {
|
||||
MINIO_USE_IAM = use_iam;
|
||||
}
|
||||
|
||||
bool
|
||||
GetUseIAM() {
|
||||
return MINIO_USE_IAM;
|
||||
}
|
||||
|
||||
void
|
||||
SetRemoteRootPath(const std::string& root_path) {
|
||||
REMOTE_ROOT_PATH = root_path;
|
||||
}
|
||||
|
||||
std::string
|
||||
GetRemoteRootPath() {
|
||||
return REMOTE_ROOT_PATH;
|
||||
}
|
||||
std::string LOCAL_ROOT_PATH = "/tmp/milvus"; // NOLINT
|
||||
|
||||
void
|
||||
SetLocalRootPath(const std::string& path_prefix) {
|
||||
|
||||
@ -20,48 +20,6 @@
|
||||
|
||||
namespace milvus::ChunkMangerConfig {
|
||||
|
||||
void
|
||||
SetAddress(const std::string& address);
|
||||
|
||||
std::string
|
||||
GetAddress();
|
||||
|
||||
void
|
||||
SetAccessKey(const std::string& access_key);
|
||||
|
||||
std::string
|
||||
GetAccessKey();
|
||||
|
||||
void
|
||||
SetAccessValue(const std::string& access_value);
|
||||
|
||||
std::string
|
||||
GetAccessValue();
|
||||
|
||||
void
|
||||
SetUseSSL(bool use_ssl);
|
||||
|
||||
bool
|
||||
GetUseSSL();
|
||||
|
||||
void
|
||||
SetUseIAM(bool use_iam);
|
||||
|
||||
bool
|
||||
GetUseIAM();
|
||||
|
||||
void
|
||||
SetBucketName(const std::string& bucket_name);
|
||||
|
||||
std::string
|
||||
GetBucketName();
|
||||
|
||||
void
|
||||
SetRemoteRootPath(const std::string& path_prefix);
|
||||
|
||||
std::string
|
||||
GetRemoteRootPath();
|
||||
|
||||
void
|
||||
SetLocalRootPath(const std::string& path_prefix);
|
||||
|
||||
|
||||
@ -16,31 +16,10 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "common/Types.h"
|
||||
#include "common/type_c.h"
|
||||
#include "index/Index.h"
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
struct LoadIndexInfo {
|
||||
int64_t collection_id;
|
||||
int64_t partition_id;
|
||||
int64_t segment_id;
|
||||
int64_t field_id;
|
||||
DataType field_type;
|
||||
int64_t index_id;
|
||||
int64_t index_build_id;
|
||||
int64_t index_version;
|
||||
std::map<std::string, std::string> index_params;
|
||||
std::vector<std::string> index_files;
|
||||
index::IndexBasePtr index;
|
||||
};
|
||||
|
||||
struct CreateIndexInfo {
|
||||
DataType field_type;
|
||||
IndexType index_type;
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
#include "indexbuilder/ScalarIndexCreator.h"
|
||||
#include "indexbuilder/VecIndexCreator.h"
|
||||
#include "indexbuilder/type_c.h"
|
||||
#include "storage/Types.h"
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
@ -39,7 +40,10 @@ class IndexFactory {
|
||||
}
|
||||
|
||||
IndexCreatorBasePtr
|
||||
CreateIndex(CDataType dtype, const char* type_params, const char* index_params) {
|
||||
CreateIndex(CDataType dtype,
|
||||
const char* type_params,
|
||||
const char* index_params,
|
||||
const storage::StorageConfig& storage_config) {
|
||||
auto real_dtype = DataType(dtype);
|
||||
auto invalid_dtype_msg = std::string("invalid data type: ") + std::to_string(int(real_dtype));
|
||||
|
||||
@ -57,7 +61,7 @@ class IndexFactory {
|
||||
|
||||
case DataType::VECTOR_FLOAT:
|
||||
case DataType::VECTOR_BINARY:
|
||||
return std::make_unique<VecIndexCreator>(real_dtype, type_params, index_params);
|
||||
return std::make_unique<VecIndexCreator>(real_dtype, type_params, index_params, storage_config);
|
||||
default:
|
||||
throw std::invalid_argument(invalid_dtype_msg);
|
||||
}
|
||||
|
||||
@ -25,7 +25,8 @@ namespace milvus::indexbuilder {
|
||||
|
||||
VecIndexCreator::VecIndexCreator(DataType data_type,
|
||||
const char* serialized_type_params,
|
||||
const char* serialized_index_params)
|
||||
const char* serialized_index_params,
|
||||
const storage::StorageConfig& storage_config)
|
||||
: data_type_(data_type) {
|
||||
proto::indexcgo::TypeParams type_params_;
|
||||
proto::indexcgo::IndexParams index_params_;
|
||||
@ -52,8 +53,8 @@ VecIndexCreator::VecIndexCreator(DataType data_type,
|
||||
#ifdef BUILD_DISK_ANN
|
||||
if (index::is_in_disk_list(index_info.index_type)) {
|
||||
// For now, only support diskann index
|
||||
file_manager = std::make_shared<storage::DiskFileManagerImpl>(index::GetFieldDataMetaFromConfig(config_),
|
||||
index::GetIndexMetaFromConfig(config_));
|
||||
file_manager = std::make_shared<storage::DiskFileManagerImpl>(
|
||||
index::GetFieldDataMetaFromConfig(config_), index::GetIndexMetaFromConfig(config_), storage_config);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
#include "indexbuilder/IndexCreatorBase.h"
|
||||
#include "index/VectorIndex.h"
|
||||
#include "index/IndexInfo.h"
|
||||
#include "storage/Types.h"
|
||||
|
||||
namespace milvus::indexbuilder {
|
||||
|
||||
@ -27,7 +28,8 @@ class VecIndexCreator : public IndexCreatorBase {
|
||||
public:
|
||||
explicit VecIndexCreator(DataType data_type,
|
||||
const char* serialized_type_params,
|
||||
const char* serialized_index_params);
|
||||
const char* serialized_index_params,
|
||||
const storage::StorageConfig& storage_config);
|
||||
|
||||
void
|
||||
Build(const milvus::DatasetPtr& dataset) override;
|
||||
|
||||
@ -21,18 +21,37 @@
|
||||
#include "indexbuilder/index_c.h"
|
||||
#include "indexbuilder/IndexFactory.h"
|
||||
#include "common/type_c.h"
|
||||
#include "storage/Types.h"
|
||||
|
||||
CStatus
|
||||
CreateIndex(enum CDataType dtype,
|
||||
const char* serialized_type_params,
|
||||
const char* serialized_index_params,
|
||||
CIndex* res_index) {
|
||||
CIndex* res_index,
|
||||
CStorageConfig c_storage_config) {
|
||||
auto status = CStatus();
|
||||
try {
|
||||
AssertInfo(res_index, "failed to create index, passed index was null");
|
||||
|
||||
auto index = milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(dtype, serialized_type_params,
|
||||
serialized_index_params);
|
||||
std::string address(c_storage_config.address);
|
||||
std::string bucket_name(c_storage_config.bucket_name);
|
||||
std::string access_key(c_storage_config.access_key_id);
|
||||
std::string access_value(c_storage_config.access_key_value);
|
||||
std::string remote_root_path(c_storage_config.remote_root_path);
|
||||
std::string storage_type(c_storage_config.storage_type);
|
||||
std::string iam_endpoint(c_storage_config.iam_endpoint);
|
||||
auto storage_config = milvus::storage::StorageConfig{address,
|
||||
bucket_name,
|
||||
access_key,
|
||||
access_value,
|
||||
remote_root_path,
|
||||
storage_type,
|
||||
iam_endpoint,
|
||||
c_storage_config.useSSL,
|
||||
c_storage_config.useIAM};
|
||||
|
||||
auto index = milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
|
||||
dtype, serialized_type_params, serialized_index_params, storage_config);
|
||||
*res_index = index.release();
|
||||
status.error_code = Success;
|
||||
status.error_msg = "";
|
||||
|
||||
@ -24,7 +24,8 @@ CStatus
|
||||
CreateIndex(enum CDataType dtype,
|
||||
const char* serialized_type_params,
|
||||
const char* serialized_index_params,
|
||||
CIndex* res_index);
|
||||
CIndex* res_index,
|
||||
CStorageConfig storage_config);
|
||||
|
||||
CStatus
|
||||
DeleteIndex(CIndex index);
|
||||
|
||||
@ -17,13 +17,14 @@
|
||||
#include "common/LoadInfo.h"
|
||||
#include "pb/segcore.pb.h"
|
||||
#include "segcore/SegmentInterface.h"
|
||||
#include "segcore/Types.h"
|
||||
|
||||
namespace milvus::segcore {
|
||||
|
||||
class SegmentSealed : public SegmentInternalInterface {
|
||||
public:
|
||||
virtual void
|
||||
LoadIndex(const index::LoadIndexInfo& info) = 0;
|
||||
LoadIndex(const LoadIndexInfo& info) = 0;
|
||||
virtual void
|
||||
LoadSegmentMeta(const milvus::proto::segcore::LoadSegmentMeta& meta) = 0;
|
||||
virtual void
|
||||
|
||||
@ -39,7 +39,7 @@ SegmentSealedImpl::PreDelete(int64_t size) {
|
||||
}
|
||||
|
||||
void
|
||||
SegmentSealedImpl::LoadIndex(const index::LoadIndexInfo& info) {
|
||||
SegmentSealedImpl::LoadIndex(const LoadIndexInfo& info) {
|
||||
// print(info);
|
||||
// NOTE: lock only when data is ready to avoid starvation
|
||||
auto field_id = FieldId(info.field_id);
|
||||
@ -53,7 +53,7 @@ SegmentSealedImpl::LoadIndex(const index::LoadIndexInfo& info) {
|
||||
}
|
||||
|
||||
void
|
||||
SegmentSealedImpl::LoadVecIndex(const index::LoadIndexInfo& info) {
|
||||
SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
|
||||
// NOTE: lock only when data is ready to avoid starvation
|
||||
auto field_id = FieldId(info.field_id);
|
||||
auto& field_meta = schema_->operator[](field_id);
|
||||
@ -76,8 +76,7 @@ SegmentSealedImpl::LoadVecIndex(const index::LoadIndexInfo& info) {
|
||||
std::to_string(row_count_opt_.value()) + ")");
|
||||
}
|
||||
AssertInfo(!vector_indexings_.is_ready(field_id), "vec index is not ready");
|
||||
vector_indexings_.append_field_indexing(field_id, metric_type,
|
||||
std::move(const_cast<index::LoadIndexInfo&>(info).index));
|
||||
vector_indexings_.append_field_indexing(field_id, metric_type, std::move(const_cast<LoadIndexInfo&>(info).index));
|
||||
|
||||
set_bit(index_ready_bitset_, field_id, true);
|
||||
update_row_count(row_count);
|
||||
@ -85,7 +84,7 @@ SegmentSealedImpl::LoadVecIndex(const index::LoadIndexInfo& info) {
|
||||
}
|
||||
|
||||
void
|
||||
SegmentSealedImpl::LoadScalarIndex(const index::LoadIndexInfo& info) {
|
||||
SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
|
||||
// NOTE: lock only when data is ready to avoid starvation
|
||||
auto field_id = FieldId(info.field_id);
|
||||
auto& field_meta = schema_->operator[](field_id);
|
||||
@ -106,7 +105,7 @@ SegmentSealedImpl::LoadScalarIndex(const index::LoadIndexInfo& info) {
|
||||
std::to_string(row_count_opt_.value()) + ")");
|
||||
}
|
||||
|
||||
scalar_indexings_[field_id] = std::move(const_cast<index::LoadIndexInfo&>(info).index);
|
||||
scalar_indexings_[field_id] = std::move(const_cast<LoadIndexInfo&>(info).index);
|
||||
// reverse pk from scalar index and set pks to offset
|
||||
if (schema_->get_primary_field_id() == field_id) {
|
||||
AssertInfo(field_id.get() != -1, "Primary key is -1");
|
||||
|
||||
@ -35,7 +35,7 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||
public:
|
||||
explicit SegmentSealedImpl(SchemaPtr schema, int64_t segment_id);
|
||||
void
|
||||
LoadIndex(const index::LoadIndexInfo& info) override;
|
||||
LoadIndex(const LoadIndexInfo& info) override;
|
||||
void
|
||||
LoadFieldData(const LoadFieldDataInfo& info) override;
|
||||
void
|
||||
@ -172,10 +172,10 @@ class SegmentSealedImpl : public SegmentSealed {
|
||||
search_ids(const BitsetType& view, Timestamp timestamp) const override;
|
||||
|
||||
void
|
||||
LoadVecIndex(const index::LoadIndexInfo& info);
|
||||
LoadVecIndex(const LoadIndexInfo& info);
|
||||
|
||||
void
|
||||
LoadScalarIndex(const index::LoadIndexInfo& info);
|
||||
LoadScalarIndex(const LoadIndexInfo& info);
|
||||
|
||||
private:
|
||||
// segment loading state
|
||||
|
||||
46
internal/core/src/segcore/Types.h
Normal file
46
internal/core/src/segcore/Types.h
Normal file
@ -0,0 +1,46 @@
|
||||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "common/Types.h"
|
||||
#include "common/type_c.h"
|
||||
#include "index/Index.h"
|
||||
#include "storage/Types.h"
|
||||
|
||||
namespace milvus::segcore {
|
||||
|
||||
struct LoadIndexInfo {
|
||||
int64_t collection_id;
|
||||
int64_t partition_id;
|
||||
int64_t segment_id;
|
||||
int64_t field_id;
|
||||
DataType field_type;
|
||||
int64_t index_id;
|
||||
int64_t index_build_id;
|
||||
int64_t index_version;
|
||||
std::map<std::string, std::string> index_params;
|
||||
std::vector<std::string> index_files;
|
||||
index::IndexBasePtr index;
|
||||
storage::StorageConfig storage_config;
|
||||
};
|
||||
|
||||
} // namespace milvus::segcore
|
||||
@ -17,11 +17,21 @@
|
||||
#include "index/IndexFactory.h"
|
||||
#include "storage/Util.h"
|
||||
#include "segcore/load_index_c.h"
|
||||
#include "segcore/Types.h"
|
||||
|
||||
CStatus
|
||||
NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info) {
|
||||
NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info, CStorageConfig c_storage_config) {
|
||||
try {
|
||||
auto load_index_info = std::make_unique<milvus::index::LoadIndexInfo>();
|
||||
auto load_index_info = std::make_unique<milvus::segcore::LoadIndexInfo>();
|
||||
auto& storage_config = load_index_info->storage_config;
|
||||
storage_config.address = std::string(c_storage_config.address);
|
||||
storage_config.bucket_name = std::string(c_storage_config.bucket_name);
|
||||
storage_config.access_key_id = std::string(c_storage_config.access_key_id);
|
||||
storage_config.access_key_value = std::string(c_storage_config.access_key_value);
|
||||
storage_config.remote_root_path = std::string(c_storage_config.remote_root_path);
|
||||
storage_config.storage_type = std::string(c_storage_config.storage_type);
|
||||
storage_config.iam_endpoint = std::string(c_storage_config.iam_endpoint);
|
||||
|
||||
*c_load_index_info = load_index_info.release();
|
||||
auto status = CStatus();
|
||||
status.error_code = Success;
|
||||
@ -37,14 +47,14 @@ NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info) {
|
||||
|
||||
void
|
||||
DeleteLoadIndexInfo(CLoadIndexInfo c_load_index_info) {
|
||||
auto info = (milvus::index::LoadIndexInfo*)c_load_index_info;
|
||||
auto info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
|
||||
delete info;
|
||||
}
|
||||
|
||||
CStatus
|
||||
AppendIndexParam(CLoadIndexInfo c_load_index_info, const char* c_index_key, const char* c_index_value) {
|
||||
try {
|
||||
auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info;
|
||||
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
|
||||
std::string index_key(c_index_key);
|
||||
std::string index_value(c_index_value);
|
||||
load_index_info->index_params[index_key] = index_value;
|
||||
@ -69,7 +79,7 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info,
|
||||
int64_t field_id,
|
||||
enum CDataType field_type) {
|
||||
try {
|
||||
auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info;
|
||||
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
|
||||
load_index_info->collection_id = collection_id;
|
||||
load_index_info->partition_id = partition_id;
|
||||
load_index_info->segment_id = segment_id;
|
||||
@ -91,7 +101,7 @@ AppendFieldInfo(CLoadIndexInfo c_load_index_info,
|
||||
CStatus
|
||||
appendVecIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
|
||||
try {
|
||||
auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info;
|
||||
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
|
||||
auto binary_set = (knowhere::BinarySet*)c_binary_set;
|
||||
auto& index_params = load_index_info->index_params;
|
||||
|
||||
@ -117,7 +127,8 @@ appendVecIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
|
||||
load_index_info->segment_id, load_index_info->field_id};
|
||||
milvus::storage::IndexMeta index_meta{load_index_info->segment_id, load_index_info->field_id,
|
||||
load_index_info->index_build_id, load_index_info->index_version};
|
||||
auto file_manager = milvus::storage::CreateFileManager(index_info.index_type, field_meta, index_meta);
|
||||
auto file_manager = milvus::storage::CreateFileManager(index_info.index_type, field_meta, index_meta,
|
||||
load_index_info->storage_config);
|
||||
|
||||
auto config = milvus::index::ParseConfigFromIndexParams(load_index_info->index_params);
|
||||
config["index_files"] = load_index_info->index_files;
|
||||
@ -139,7 +150,7 @@ appendVecIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
|
||||
CStatus
|
||||
appendScalarIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
|
||||
try {
|
||||
auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info;
|
||||
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
|
||||
auto field_type = load_index_info->field_type;
|
||||
auto binary_set = (knowhere::BinarySet*)c_binary_set;
|
||||
auto& index_params = load_index_info->index_params;
|
||||
@ -171,7 +182,7 @@ appendScalarIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
|
||||
|
||||
CStatus
|
||||
AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
|
||||
auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info;
|
||||
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
|
||||
auto field_type = load_index_info->field_type;
|
||||
if (milvus::datatype_is_vector(field_type)) {
|
||||
return appendVecIndex(c_load_index_info, c_binary_set);
|
||||
@ -182,7 +193,7 @@ AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
|
||||
CStatus
|
||||
AppendIndexFilePath(CLoadIndexInfo c_load_index_info, const char* c_file_path) {
|
||||
try {
|
||||
auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info;
|
||||
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
|
||||
std::string index_file_path(c_file_path);
|
||||
load_index_info->index_files.emplace_back(index_file_path);
|
||||
|
||||
@ -201,7 +212,7 @@ AppendIndexFilePath(CLoadIndexInfo c_load_index_info, const char* c_file_path) {
|
||||
CStatus
|
||||
AppendIndexInfo(CLoadIndexInfo c_load_index_info, int64_t index_id, int64_t build_id, int64_t version) {
|
||||
try {
|
||||
auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info;
|
||||
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
|
||||
load_index_info->index_id = index_id;
|
||||
load_index_info->index_build_id = build_id;
|
||||
load_index_info->index_version = version;
|
||||
@ -221,7 +232,7 @@ AppendIndexInfo(CLoadIndexInfo c_load_index_info, int64_t index_id, int64_t buil
|
||||
CStatus
|
||||
CleanLoadedIndex(CLoadIndexInfo c_load_index_info) {
|
||||
try {
|
||||
auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info;
|
||||
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
|
||||
auto index_file_path_prefix =
|
||||
milvus::storage::GenLocalIndexPathPrefix(load_index_info->index_build_id, load_index_info->index_version);
|
||||
#ifdef BUILD_DISK_ANN
|
||||
|
||||
@ -24,7 +24,7 @@ extern "C" {
|
||||
typedef void* CLoadIndexInfo;
|
||||
|
||||
CStatus
|
||||
NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info);
|
||||
NewLoadIndexInfo(CLoadIndexInfo* c_load_index_info, CStorageConfig c_storage_config);
|
||||
|
||||
void
|
||||
DeleteLoadIndexInfo(CLoadIndexInfo c_load_index_info);
|
||||
|
||||
@ -238,7 +238,7 @@ UpdateSealedSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_inde
|
||||
auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
|
||||
auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
|
||||
AssertInfo(segment != nullptr, "segment conversion failed");
|
||||
auto load_index_info = (milvus::index::LoadIndexInfo*)c_load_index_info;
|
||||
auto load_index_info = (milvus::segcore::LoadIndexInfo*)c_load_index_info;
|
||||
segment->LoadIndex(*load_index_info);
|
||||
return milvus::SuccessCStatus();
|
||||
} catch (std::exception& e) {
|
||||
|
||||
@ -14,7 +14,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
option( EMBEDDED_MILVUS "Enable embedded Milvus" OFF )
|
||||
|
||||
if ( EMBEDDED_MILVUS )
|
||||
@ -46,10 +45,12 @@ endif ()
|
||||
|
||||
add_library(milvus_storage SHARED ${STORAGE_FILES})
|
||||
|
||||
find_package(Boost REQUIRED COMPONENTS filesystem)
|
||||
|
||||
if ( BUILD_DISK_ANN STREQUAL "ON" )
|
||||
target_link_libraries( milvus_storage PUBLIC milvus_common boost_system boost_filesystem aws-cpp-sdk-s3 pthread)
|
||||
target_link_libraries( milvus_storage PUBLIC milvus_common Boost::filesystem aws-cpp-sdk-s3 pthread)
|
||||
else()
|
||||
target_link_libraries( milvus_storage PUBLIC milvus_common pthread)
|
||||
target_link_libraries( milvus_storage PUBLIC milvus_common Boost::filesystem pthread)
|
||||
endif()
|
||||
|
||||
if(NOT CMAKE_INSTALL_PREFIX)
|
||||
|
||||
@ -123,6 +123,6 @@ class RemoteChunkManager : public ChunkManager {
|
||||
}
|
||||
};
|
||||
|
||||
using RemoteChunkManagerSPtr = std::shared_ptr<milvus::storage::RemoteChunkManager>;
|
||||
using RemoteChunkManagerPtr = std::unique_ptr<RemoteChunkManager>;
|
||||
|
||||
} // namespace milvus::storage
|
||||
|
||||
@ -58,8 +58,12 @@ using WriteLock = std::lock_guard<std::shared_mutex>;
|
||||
|
||||
namespace milvus::storage {
|
||||
|
||||
DiskFileManagerImpl::DiskFileManagerImpl(const FieldDataMeta& field_mata, const IndexMeta& index_meta)
|
||||
DiskFileManagerImpl::DiskFileManagerImpl(const FieldDataMeta& field_mata,
|
||||
const IndexMeta& index_meta,
|
||||
const StorageConfig& storage_config)
|
||||
: field_meta_(field_mata), index_meta_(index_meta) {
|
||||
remote_root_path_ = storage_config.remote_root_path;
|
||||
rcm_ = std::make_unique<MinioChunkManager>(storage_config);
|
||||
}
|
||||
|
||||
DiskFileManagerImpl::~DiskFileManagerImpl() {
|
||||
@ -75,7 +79,6 @@ DiskFileManagerImpl::LoadFile(const std::string& file) noexcept {
|
||||
bool
|
||||
DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
|
||||
auto& local_chunk_manager = LocalChunkManager::GetInstance();
|
||||
auto& remote_chunk_manager = MinioChunkManager::GetInstance();
|
||||
FILEMANAGER_TRY
|
||||
if (!local_chunk_manager.Exist(file)) {
|
||||
LOG_SEGCORE_ERROR_C << "local file: " << file << " does not exist ";
|
||||
@ -106,7 +109,7 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
|
||||
// Put file to remote
|
||||
char objectKey[200];
|
||||
snprintf(objectKey, sizeof(objectKey), "%s/%s_%d", remotePrefix.c_str(), fileName.c_str(), slice_num);
|
||||
remote_chunk_manager.Write(objectKey, serialized_index_data.data(), serialized_index_size);
|
||||
rcm_->Write(objectKey, serialized_index_data.data(), serialized_index_size);
|
||||
|
||||
offset += batch_size;
|
||||
// record remote file to save etcd
|
||||
@ -121,7 +124,6 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
|
||||
void
|
||||
DiskFileManagerImpl::CacheIndexToDisk(std::vector<std::string> remote_files) {
|
||||
auto& local_chunk_manager = LocalChunkManager::GetInstance();
|
||||
auto& remote_chunk_manager = MinioChunkManager::GetInstance();
|
||||
|
||||
std::map<std::string, std::vector<int>> index_slices;
|
||||
for (auto& file_path : remote_files) {
|
||||
@ -140,9 +142,9 @@ DiskFileManagerImpl::CacheIndexToDisk(std::vector<std::string> remote_files) {
|
||||
int64_t offset = 0;
|
||||
for (auto iter = slices.second.begin(); iter != slices.second.end(); iter++) {
|
||||
auto origin_file = prefix + "_" + std::to_string(*iter);
|
||||
auto fileSize = remote_chunk_manager.Size(origin_file);
|
||||
auto fileSize = rcm_->Size(origin_file);
|
||||
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[fileSize]);
|
||||
remote_chunk_manager.Read(origin_file, buf.get(), fileSize);
|
||||
rcm_->Read(origin_file, buf.get(), fileSize);
|
||||
|
||||
auto decoded_index_data = DeserializeFileData(buf.get(), fileSize);
|
||||
auto index_payload = decoded_index_data->GetPayload();
|
||||
@ -164,9 +166,9 @@ DiskFileManagerImpl::GetFileName(const std::string& localfile) {
|
||||
|
||||
std::string
|
||||
DiskFileManagerImpl::GetRemoteIndexObjectPrefix() {
|
||||
return ChunkMangerConfig::GetRemoteRootPath() + "/" + std::string(INDEX_ROOT_PATH) + "/" +
|
||||
std::to_string(index_meta_.build_id) + "/" + std::to_string(index_meta_.index_version) + "/" +
|
||||
std::to_string(field_meta_.partition_id) + "/" + std::to_string(field_meta_.segment_id);
|
||||
return remote_root_path_ + "/" + std::string(INDEX_ROOT_PATH) + "/" + std::to_string(index_meta_.build_id) + "/" +
|
||||
std::to_string(index_meta_.index_version) + "/" + std::to_string(field_meta_.partition_id) + "/" +
|
||||
std::to_string(field_meta_.segment_id);
|
||||
}
|
||||
|
||||
std::string
|
||||
@ -184,12 +186,11 @@ DiskFileManagerImpl::RemoveFile(const std::string& file) noexcept {
|
||||
// remove local file
|
||||
bool localExist = false;
|
||||
auto& local_chunk_manager = LocalChunkManager::GetInstance();
|
||||
auto& remote_chunk_manager = MinioChunkManager::GetInstance();
|
||||
FILEMANAGER_TRY
|
||||
localExist = local_chunk_manager.Exist(file);
|
||||
FILEMANAGER_CATCH
|
||||
FILEMANAGER_END
|
||||
if (!localExist) {
|
||||
if (localExist) {
|
||||
FILEMANAGER_TRY
|
||||
local_chunk_manager.Remove(file);
|
||||
FILEMANAGER_CATCH
|
||||
@ -200,12 +201,12 @@ DiskFileManagerImpl::RemoveFile(const std::string& file) noexcept {
|
||||
std::string remoteFile = "";
|
||||
bool remoteExist = false;
|
||||
FILEMANAGER_TRY
|
||||
remoteExist = remote_chunk_manager.Exist(remoteFile);
|
||||
remoteExist = rcm_->Exist(remoteFile);
|
||||
FILEMANAGER_CATCH
|
||||
FILEMANAGER_END
|
||||
if (!remoteExist) {
|
||||
if (remoteExist) {
|
||||
FILEMANAGER_TRY
|
||||
remote_chunk_manager.Remove(file);
|
||||
rcm_->Remove(file);
|
||||
FILEMANAGER_CATCH
|
||||
FILEMANAGER_END
|
||||
}
|
||||
@ -216,7 +217,6 @@ std::optional<bool>
|
||||
DiskFileManagerImpl::IsExisted(const std::string& file) noexcept {
|
||||
bool isExist = false;
|
||||
auto& local_chunk_manager = LocalChunkManager::GetInstance();
|
||||
auto& remote_chunk_manager = MinioChunkManager::GetInstance();
|
||||
try {
|
||||
isExist = local_chunk_manager.Exist(file);
|
||||
} catch (LocalChunkManagerException& e) {
|
||||
|
||||
@ -24,12 +24,15 @@
|
||||
|
||||
#include "storage/IndexData.h"
|
||||
#include "storage/FileManager.h"
|
||||
#include "storage/MinioChunkManager.h"
|
||||
|
||||
namespace milvus::storage {
|
||||
|
||||
class DiskFileManagerImpl : public FileManagerImpl {
|
||||
public:
|
||||
explicit DiskFileManagerImpl(const FieldDataMeta& field_mata, const IndexMeta& index_meta);
|
||||
explicit DiskFileManagerImpl(const FieldDataMeta& field_mata,
|
||||
const IndexMeta& index_meta,
|
||||
const StorageConfig& storage_config);
|
||||
|
||||
virtual ~DiskFileManagerImpl();
|
||||
|
||||
@ -65,6 +68,11 @@ class DiskFileManagerImpl : public FileManagerImpl {
|
||||
return remote_paths_to_size_;
|
||||
}
|
||||
|
||||
std::vector<std::string>
|
||||
GetLocalFilePaths() const {
|
||||
return local_paths_;
|
||||
}
|
||||
|
||||
void
|
||||
CacheIndexToDisk(std::vector<std::string> remote_files);
|
||||
|
||||
@ -99,6 +107,9 @@ class DiskFileManagerImpl : public FileManagerImpl {
|
||||
|
||||
// remote file path
|
||||
std::map<std::string, int64_t> remote_paths_to_size_;
|
||||
|
||||
RemoteChunkManagerPtr rcm_;
|
||||
std::string remote_root_path_;
|
||||
};
|
||||
|
||||
using DiskANNFileManagerImplPtr = std::shared_ptr<DiskFileManagerImpl>;
|
||||
|
||||
@ -64,18 +64,13 @@ ConvertFromAwsString(const Aws::String& aws_str) {
|
||||
return std::string(aws_str.c_str(), aws_str.size());
|
||||
}
|
||||
|
||||
MinioChunkManager::MinioChunkManager(const std::string& endpoint,
|
||||
const std::string& access_key,
|
||||
const std::string& access_value,
|
||||
const std::string& bucket_name,
|
||||
bool secure,
|
||||
bool use_iam)
|
||||
: default_bucket_name_(bucket_name) {
|
||||
MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config)
|
||||
: default_bucket_name_(storage_config.bucket_name) {
|
||||
Aws::InitAPI(sdk_options_);
|
||||
Aws::Client::ClientConfiguration config;
|
||||
config.endpointOverride = ConvertToAwsString(endpoint);
|
||||
config.endpointOverride = ConvertToAwsString(storage_config.address);
|
||||
|
||||
if (secure) {
|
||||
if (storage_config.useSSL) {
|
||||
config.scheme = Aws::Http::Scheme::HTTPS;
|
||||
config.verifySSL = true;
|
||||
} else {
|
||||
@ -83,7 +78,7 @@ MinioChunkManager::MinioChunkManager(const std::string& endpoint,
|
||||
config.verifySSL = false;
|
||||
}
|
||||
|
||||
if (use_iam) {
|
||||
if (storage_config.useIAM) {
|
||||
auto provider = std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
|
||||
client_ = std::make_shared<Aws::S3::S3Client>(provider, config,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
|
||||
@ -94,13 +89,19 @@ MinioChunkManager::MinioChunkManager(const std::string& endpoint,
|
||||
<< " token:" << provider->GetAWSCredentials().GetSessionToken() << "}";
|
||||
} else {
|
||||
client_ = std::make_shared<Aws::S3::S3Client>(
|
||||
Aws::Auth::AWSCredentials(ConvertToAwsString(access_key), ConvertToAwsString(access_value)), config,
|
||||
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
|
||||
Aws::Auth::AWSCredentials(ConvertToAwsString(storage_config.access_key_id),
|
||||
ConvertToAwsString(storage_config.access_key_value)),
|
||||
config, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
|
||||
}
|
||||
|
||||
LOG_SEGCORE_INFO_C << "init MinioChunkManager with parameter[endpoint: '" << endpoint << "', access_key:'"
|
||||
<< access_key << "', access_value:'" << access_value << "', default_bucket_name:'" << bucket_name
|
||||
<< "', use_secure:'" << std::boolalpha << secure << "']";
|
||||
if (!BucketExists(storage_config.bucket_name)) {
|
||||
CreateBucket(storage_config.bucket_name);
|
||||
}
|
||||
|
||||
LOG_SEGCORE_INFO_C << "init MinioChunkManager with parameter[endpoint: '" << storage_config.address
|
||||
<< "', access_key:'" << storage_config.access_key_id << "', access_value:'"
|
||||
<< storage_config.access_key_value << "', default_bucket_name:'" << storage_config.bucket_name
|
||||
<< "', use_secure:'" << std::boolalpha << storage_config.useSSL << "']";
|
||||
}
|
||||
|
||||
MinioChunkManager::~MinioChunkManager() {
|
||||
@ -179,7 +180,8 @@ MinioChunkManager::CreateBucket(const std::string& bucket_name) {
|
||||
|
||||
auto outcome = client_->CreateBucket(request);
|
||||
|
||||
if (!outcome.IsSuccess()) {
|
||||
if (!outcome.IsSuccess() &&
|
||||
Aws::S3::S3Errors(outcome.GetError().GetErrorType()) != Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU) {
|
||||
THROWS3ERROR(CreateBucket);
|
||||
}
|
||||
return true;
|
||||
|
||||
@ -23,9 +23,10 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "ChunkManager.h"
|
||||
#include "Exception.h"
|
||||
#include "config/ConfigChunkManager.h"
|
||||
#include "storage/ChunkManager.h"
|
||||
#include "storage/Exception.h"
|
||||
#include "storage/Types.h"
|
||||
|
||||
namespace milvus::storage {
|
||||
|
||||
@ -33,13 +34,8 @@ namespace milvus::storage {
|
||||
* @brief This MinioChunkManager is responsible for read and write file in S3.
|
||||
*/
|
||||
class MinioChunkManager : public RemoteChunkManager {
|
||||
private:
|
||||
explicit MinioChunkManager(const std::string& endpoint,
|
||||
const std::string& access_key,
|
||||
const std::string& access_value,
|
||||
const std::string& default_bucket_name,
|
||||
bool serure = false,
|
||||
bool use_iam = false);
|
||||
public:
|
||||
explicit MinioChunkManager(const StorageConfig& storage_config);
|
||||
|
||||
MinioChunkManager(const MinioChunkManager&);
|
||||
MinioChunkManager&
|
||||
@ -48,15 +44,6 @@ class MinioChunkManager : public RemoteChunkManager {
|
||||
public:
|
||||
virtual ~MinioChunkManager();
|
||||
|
||||
static MinioChunkManager&
|
||||
GetInstance() {
|
||||
// thread-safe enough after c++ 11
|
||||
static MinioChunkManager instance(ChunkMangerConfig::GetAddress(), ChunkMangerConfig::GetAccessKey(),
|
||||
ChunkMangerConfig::GetAccessValue(), ChunkMangerConfig::GetBucketName(),
|
||||
ChunkMangerConfig::GetUseSSL(), ChunkMangerConfig::GetUseIAM());
|
||||
return instance;
|
||||
}
|
||||
|
||||
virtual bool
|
||||
Exist(const std::string& filepath);
|
||||
|
||||
@ -132,6 +119,6 @@ class MinioChunkManager : public RemoteChunkManager {
|
||||
std::string default_bucket_name_;
|
||||
};
|
||||
|
||||
using MinioChunkManagerSPtr = std::shared_ptr<MinioChunkManager>;
|
||||
using MinioChunkManagerPtr = std::unique_ptr<MinioChunkManager>;
|
||||
|
||||
} // namespace milvus::storage
|
||||
|
||||
@ -79,4 +79,16 @@ struct IndexMeta {
|
||||
std::string key;
|
||||
};
|
||||
|
||||
struct StorageConfig {
|
||||
std::string address = "localhost:9000";
|
||||
std::string bucket_name = "a-bucket";
|
||||
std::string access_key_id = "minioadmin";
|
||||
std::string access_key_value = "minioadmin";
|
||||
std::string remote_root_path = "files";
|
||||
std::string storage_type = "minio";
|
||||
std::string iam_endpoint = "";
|
||||
bool useSSL = false;
|
||||
bool useIAM = false;
|
||||
};
|
||||
|
||||
} // namespace milvus::storage
|
||||
|
||||
@ -360,11 +360,14 @@ is_in_disk_list(const IndexType& index_type) {
|
||||
}
|
||||
|
||||
FileManagerImplPtr
|
||||
CreateFileManager(IndexType index_type, const FieldDataMeta& field_meta, const IndexMeta& index_meta) {
|
||||
CreateFileManager(IndexType index_type,
|
||||
const FieldDataMeta& field_meta,
|
||||
const IndexMeta& index_meta,
|
||||
const StorageConfig& storage_config) {
|
||||
// TODO :: switch case index type to create file manager
|
||||
#ifdef BUILD_DISK_ANN
|
||||
if (is_in_disk_list(index_type)) {
|
||||
return std::make_shared<DiskFileManagerImpl>(field_meta, index_meta);
|
||||
return std::make_shared<DiskFileManagerImpl>(field_meta, index_meta, storage_config);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
@ -79,6 +79,9 @@ bool
|
||||
is_in_disk_list(const IndexType& index_type);
|
||||
|
||||
FileManagerImplPtr
|
||||
CreateFileManager(IndexType index_type, const FieldDataMeta& field_meta, const IndexMeta& index_meta);
|
||||
CreateFileManager(IndexType index_type,
|
||||
const FieldDataMeta& field_meta,
|
||||
const IndexMeta& index_meta,
|
||||
const StorageConfig& storage_config);
|
||||
|
||||
} // namespace milvus::storage
|
||||
|
||||
@ -64,7 +64,8 @@ IndexBuilder_build(benchmark::State& state) {
|
||||
|
||||
for (auto _ : state) {
|
||||
auto index = std::make_unique<milvus::indexbuilder::VecIndexCreator>(
|
||||
milvus::DataType::VECTOR_FLOAT, type_params_str.c_str(), index_params_str.c_str());
|
||||
milvus::DataType::VECTOR_FLOAT, type_params_str.c_str(), index_params_str.c_str(),
|
||||
get_default_storage_config());
|
||||
index->Build(xb_dataset);
|
||||
}
|
||||
}
|
||||
@ -93,7 +94,8 @@ IndexBuilder_build_and_codec(benchmark::State& state) {
|
||||
|
||||
for (auto _ : state) {
|
||||
auto index = std::make_unique<milvus::indexbuilder::VecIndexCreator>(
|
||||
milvus::DataType::VECTOR_FLOAT, type_params_str.c_str(), index_params_str.c_str());
|
||||
milvus::DataType::VECTOR_FLOAT, type_params_str.c_str(), index_params_str.c_str(),
|
||||
get_default_storage_config());
|
||||
|
||||
index->Build(xb_dataset);
|
||||
index->Serialize();
|
||||
|
||||
@ -106,7 +106,7 @@ Search_Sealed(benchmark::State& state) {
|
||||
// ivf
|
||||
auto vec = dataset_.get_col<float>(milvus::FieldId(100));
|
||||
auto indexing = GenVecIndexing(N, dim, vec.data());
|
||||
index::LoadIndexInfo info;
|
||||
segcore::LoadIndexInfo info;
|
||||
info.index = std::move(indexing);
|
||||
info.field_id = (*schema)[FieldName("fakevec")].get_id().get();
|
||||
info.index_params["index_type"] = "IVF";
|
||||
|
||||
@ -38,12 +38,13 @@ using namespace milvus;
|
||||
using namespace milvus::segcore;
|
||||
using namespace milvus::index;
|
||||
using namespace knowhere;
|
||||
using milvus::index::LoadIndexInfo;
|
||||
using milvus::index::VectorIndex;
|
||||
using milvus::segcore::LoadIndexInfo;
|
||||
|
||||
namespace {
|
||||
// const int DIM = 16;
|
||||
const int64_t ROW_COUNT = 100 * 1000;
|
||||
const CStorageConfig c_storage_config = get_default_cstorage_config();
|
||||
|
||||
const char*
|
||||
get_default_schema_config() {
|
||||
@ -1413,7 +1414,7 @@ TEST(CApiTest, LoadIndexInfo) {
|
||||
CBinarySet c_binary_set = (CBinarySet)&binary_set;
|
||||
|
||||
void* c_load_index_info = nullptr;
|
||||
auto status = NewLoadIndexInfo(&c_load_index_info);
|
||||
auto status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_param_key1 = "index_type";
|
||||
std::string index_param_value1 = "IVF_PQ";
|
||||
@ -1462,7 +1463,7 @@ TEST(CApiTest, LoadIndex_Search) {
|
||||
auto binary_set = indexing->Serialize(conf);
|
||||
|
||||
// fill loadIndexInfo
|
||||
milvus::index::LoadIndexInfo load_index_info;
|
||||
milvus::segcore::LoadIndexInfo load_index_info;
|
||||
auto& index_params = load_index_info.index_params;
|
||||
index_params["index_type"] = "IVF_PQ";
|
||||
index_params["index_mode"] = "CPU";
|
||||
@ -1567,7 +1568,7 @@ TEST(CApiTest, Indexing_Without_Predicate) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "IVF_PQ";
|
||||
@ -1688,7 +1689,7 @@ TEST(CApiTest, Indexing_Expr_Without_Predicate) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "IVF_PQ";
|
||||
@ -1826,7 +1827,7 @@ TEST(CApiTest, Indexing_With_float_Predicate_Range) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "IVF_PQ";
|
||||
@ -1978,7 +1979,7 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Range) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "IVF_PQ";
|
||||
@ -2114,7 +2115,7 @@ TEST(CApiTest, Indexing_With_float_Predicate_Term) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "IVF_PQ";
|
||||
@ -2259,7 +2260,7 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Term) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "IVF_PQ";
|
||||
@ -2397,7 +2398,7 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Range) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "BIN_IVF_FLAT";
|
||||
@ -2547,7 +2548,7 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Range) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "BIN_IVF_FLAT";
|
||||
@ -2684,7 +2685,7 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Term) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "BIN_IVF_FLAT";
|
||||
@ -2844,7 +2845,7 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "BIN_IVF_FLAT";
|
||||
@ -3003,7 +3004,7 @@ TEST(CApiTest, SealedSegment_search_float_Predicate_Range) {
|
||||
IndexEnum::INDEX_FAISS_IVFPQ, DIM, N);
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "IVF_PQ";
|
||||
@ -3282,7 +3283,7 @@ TEST(CApiTest, SealedSegment_search_float_With_Expr_Predicate_Range) {
|
||||
|
||||
auto binary_set = indexing->Serialize(milvus::Config{});
|
||||
void* c_load_index_info = nullptr;
|
||||
status = NewLoadIndexInfo(&c_load_index_info);
|
||||
status = NewLoadIndexInfo(&c_load_index_info, c_storage_config);
|
||||
assert(status.error_code == Success);
|
||||
std::string index_type_key = "index_type";
|
||||
std::string index_type_value = "IVF_PQ";
|
||||
|
||||
@ -9,22 +9,16 @@
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
#include <fstream>
|
||||
#include <gtest/gtest.h>
|
||||
#include <iostream>
|
||||
#include <random>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <yaml-cpp/yaml.h>
|
||||
|
||||
#include "storage/Event.h"
|
||||
#include "storage/MinioChunkManager.h"
|
||||
#include "storage/LocalChunkManager.h"
|
||||
#include "storage/DiskFileManagerImpl.h"
|
||||
#include "config/ConfigChunkManager.h"
|
||||
#include "config/ConfigKnowhere.h"
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace milvus;
|
||||
@ -39,84 +33,21 @@ class DiskAnnFileManagerTest : public testing::Test {
|
||||
~DiskAnnFileManagerTest() {
|
||||
}
|
||||
|
||||
bool
|
||||
FindFile(const path& dir, const string& file_name, path& path_found) {
|
||||
const recursive_directory_iterator end;
|
||||
boost::system::error_code err;
|
||||
auto iter = recursive_directory_iterator(dir, err);
|
||||
while (iter != end) {
|
||||
try {
|
||||
if ((*iter).path().filename() == file_name) {
|
||||
path_found = (*iter).path();
|
||||
return true;
|
||||
}
|
||||
iter++;
|
||||
} catch (filesystem_error& e) {
|
||||
} catch (std::exception& e) {
|
||||
// ignore error
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
string
|
||||
GetConfig() {
|
||||
char testPath[100];
|
||||
auto pwd = string(getcwd(testPath, sizeof(testPath)));
|
||||
path filepath;
|
||||
auto currentPath = path(pwd);
|
||||
while (!FindFile(currentPath, "milvus.yaml", filepath)) {
|
||||
currentPath = currentPath.append("../");
|
||||
}
|
||||
return filepath.string();
|
||||
}
|
||||
|
||||
void
|
||||
InitRemoteChunkManager() {
|
||||
auto configPath = GetConfig();
|
||||
cout << configPath << endl;
|
||||
YAML::Node config;
|
||||
config = YAML::LoadFile(configPath);
|
||||
auto minioConfig = config["minio"];
|
||||
auto address = minioConfig["address"].as<string>();
|
||||
auto port = minioConfig["port"].as<string>();
|
||||
auto endpoint = address + ":" + port;
|
||||
auto accessKey = minioConfig["accessKeyID"].as<string>();
|
||||
auto accessValue = minioConfig["secretAccessKey"].as<string>();
|
||||
auto useSSL = minioConfig["useSSL"].as<bool>();
|
||||
auto bucketName = minioConfig["bucketName"].as<string>();
|
||||
|
||||
ChunkMangerConfig::SetAddress(endpoint);
|
||||
ChunkMangerConfig::SetAccessKey(accessKey);
|
||||
ChunkMangerConfig::SetAccessValue(accessValue);
|
||||
ChunkMangerConfig::SetBucketName(bucketName);
|
||||
ChunkMangerConfig::SetUseSSL(useSSL);
|
||||
}
|
||||
|
||||
void
|
||||
InitLocalChunkManager() {
|
||||
ChunkMangerConfig::SetLocalRootPath("/tmp/diskann");
|
||||
config::KnowhereSetIndexSliceSize(5);
|
||||
}
|
||||
|
||||
virtual void
|
||||
SetUp() {
|
||||
InitLocalChunkManager();
|
||||
InitRemoteChunkManager();
|
||||
ChunkMangerConfig::SetLocalRootPath("/tmp/diskann");
|
||||
config::KnowhereSetIndexSliceSize(5);
|
||||
storage_config_ = get_default_storage_config();
|
||||
}
|
||||
|
||||
protected:
|
||||
StorageConfig storage_config_;
|
||||
};
|
||||
|
||||
TEST_F(DiskAnnFileManagerTest, AddFilePositive) {
|
||||
auto& lcm = LocalChunkManager::GetInstance();
|
||||
auto& rcm = MinioChunkManager::GetInstance();
|
||||
|
||||
string testBucketName = "test-diskann";
|
||||
rcm.SetBucketName(testBucketName);
|
||||
EXPECT_EQ(rcm.GetBucketName(), testBucketName);
|
||||
|
||||
if (!rcm.BucketExists(testBucketName)) {
|
||||
rcm.CreateBucket(testBucketName);
|
||||
}
|
||||
storage_config_.bucket_name = testBucketName;
|
||||
|
||||
std::string indexFilePath = "/tmp/diskann/index_files/1000/index";
|
||||
auto exist = lcm.Exist(indexFilePath);
|
||||
@ -132,33 +63,32 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositive) {
|
||||
IndexMeta index_meta = {3, 100, 1000, 1, "index"};
|
||||
|
||||
int64_t slice_size = config::KnowhereGetIndexSliceSize() << 20;
|
||||
auto diskAnnFileManager = std::make_shared<DiskFileManagerImpl>(filed_data_meta, index_meta);
|
||||
diskAnnFileManager->AddFile(indexFilePath);
|
||||
|
||||
// check result
|
||||
auto remotePrefix = diskAnnFileManager->GetRemoteIndexObjectPrefix();
|
||||
auto remoteIndexFiles = rcm.ListWithPrefix(remotePrefix);
|
||||
auto diskAnnFileManager = std::make_shared<DiskFileManagerImpl>(filed_data_meta, index_meta, storage_config_);
|
||||
auto ok = diskAnnFileManager->AddFile(indexFilePath);
|
||||
EXPECT_EQ(ok, true);
|
||||
|
||||
auto remote_files_to_size = diskAnnFileManager->GetRemotePathsToFileSize();
|
||||
auto num_slice = index_size / slice_size;
|
||||
EXPECT_EQ(remoteIndexFiles.size(), index_size % slice_size == 0 ? num_slice : num_slice + 1);
|
||||
EXPECT_EQ(remote_files_to_size.size(), index_size % slice_size == 0 ? num_slice : num_slice + 1);
|
||||
|
||||
diskAnnFileManager->CacheIndexToDisk(remoteIndexFiles);
|
||||
auto fileSize1 = rcm.Size(remoteIndexFiles[0]);
|
||||
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[fileSize1]);
|
||||
rcm.Read(remoteIndexFiles[0], buf.get(), fileSize1);
|
||||
|
||||
auto index = DeserializeFileData(buf.get(), fileSize1);
|
||||
auto payload = index->GetPayload();
|
||||
auto rows = payload->rows;
|
||||
auto rawData = payload->raw_data;
|
||||
|
||||
EXPECT_EQ(rows, index_size);
|
||||
EXPECT_EQ(rawData[0], data[0]);
|
||||
EXPECT_EQ(rawData[4], data[4]);
|
||||
|
||||
auto files = diskAnnFileManager->GetRemotePathsToFileSize();
|
||||
for (auto& value : files) {
|
||||
rcm.Remove(value.first);
|
||||
std::vector<std::string> remote_files;
|
||||
for (auto& file2size : remote_files_to_size) {
|
||||
remote_files.emplace_back(file2size.first);
|
||||
}
|
||||
diskAnnFileManager->CacheIndexToDisk(remote_files);
|
||||
auto local_files = diskAnnFileManager->GetLocalFilePaths();
|
||||
for (auto& file : local_files) {
|
||||
auto file_size = lcm.Size(file);
|
||||
auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[file_size]);
|
||||
lcm.Read(file, buf.get(), file_size);
|
||||
|
||||
auto index = FieldData(buf.get(), file_size);
|
||||
auto payload = index.get_payload();
|
||||
auto rows = payload->rows;
|
||||
auto rawData = payload->raw_data;
|
||||
|
||||
EXPECT_EQ(rows, index_size);
|
||||
EXPECT_EQ(rawData[0], data[0]);
|
||||
EXPECT_EQ(rawData[4], data[4]);
|
||||
}
|
||||
rcm.DeleteBucket(testBucketName);
|
||||
}
|
||||
|
||||
@ -630,7 +630,7 @@ TEST(Expr, TestCompareWithScalarIndex) {
|
||||
auto seg = CreateSealedSegment(schema);
|
||||
int N = 1000;
|
||||
auto raw_data = DataGen(schema, N);
|
||||
index::LoadIndexInfo load_index_info;
|
||||
segcore::LoadIndexInfo load_index_info;
|
||||
|
||||
// load index for int32 field
|
||||
auto age32_col = raw_data.get_col<int32_t>(i32_fid);
|
||||
@ -720,7 +720,7 @@ TEST(Expr, TestCompareWithScalarIndexMaris) {
|
||||
auto seg = CreateSealedSegment(schema);
|
||||
int N = 1000;
|
||||
auto raw_data = DataGen(schema, N);
|
||||
index::LoadIndexInfo load_index_info;
|
||||
segcore::LoadIndexInfo load_index_info;
|
||||
|
||||
// load index for int32 field
|
||||
auto str1_col = raw_data.get_col<std::string>(str1_fid);
|
||||
@ -1300,7 +1300,7 @@ TEST(Expr, TestBinaryArithOpEvalRangeWithScalarSortIndex) {
|
||||
auto seg = CreateSealedSegment(schema);
|
||||
int N = 1000;
|
||||
auto raw_data = DataGen(schema, N);
|
||||
index::LoadIndexInfo load_index_info;
|
||||
segcore::LoadIndexInfo load_index_info;
|
||||
|
||||
// load index for int8 field
|
||||
auto age8_col = raw_data.get_col<int8_t>(i8_fid);
|
||||
|
||||
@ -11,23 +11,19 @@
|
||||
|
||||
#include <google/protobuf/text_format.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include <map>
|
||||
#include <tuple>
|
||||
#include <knowhere/index/vector_index/helpers/IndexParameter.h>
|
||||
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
|
||||
#include <knowhere/index/vector_index/ConfAdapterMgr.h>
|
||||
#include <knowhere/archive/KnowhereConfig.h>
|
||||
#include "pb/index_cgo_msg.pb.h"
|
||||
|
||||
#include "indexbuilder/VecIndexCreator.h"
|
||||
#include "indexbuilder/index_c.h"
|
||||
#include "test_utils/DataGen.h"
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
#include "indexbuilder/ScalarIndexCreator.h"
|
||||
#include "indexbuilder/IndexFactory.h"
|
||||
#include "common/type_c.h"
|
||||
|
||||
constexpr int NB = 10;
|
||||
const CStorageConfig c_storage_config = get_default_cstorage_config();
|
||||
|
||||
TEST(FloatVecIndex, All) {
|
||||
auto index_type = knowhere::IndexEnum::INDEX_FAISS_IVFPQ;
|
||||
@ -51,7 +47,7 @@ TEST(FloatVecIndex, All) {
|
||||
CIndex copy_index;
|
||||
|
||||
{
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index);
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index, c_storage_config);
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -63,7 +59,7 @@ TEST(FloatVecIndex, All) {
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index);
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index, c_storage_config);
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -102,7 +98,7 @@ TEST(BinaryVecIndex, All) {
|
||||
CIndex copy_index;
|
||||
|
||||
{
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index);
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index, c_storage_config);
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -114,7 +110,7 @@ TEST(BinaryVecIndex, All) {
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index);
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index, c_storage_config);
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -154,7 +150,7 @@ TEST(CBoolIndexTest, All) {
|
||||
CIndex copy_index;
|
||||
|
||||
{
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index);
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index, c_storage_config);
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -166,7 +162,8 @@ TEST(CBoolIndexTest, All) {
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index);
|
||||
status =
|
||||
CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index, c_storage_config);
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -204,7 +201,7 @@ TEST(CInt64IndexTest, All) {
|
||||
CIndex copy_index;
|
||||
|
||||
{
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index);
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index, c_storage_config);
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -216,7 +213,8 @@ TEST(CInt64IndexTest, All) {
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index);
|
||||
status =
|
||||
CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index, c_storage_config);
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -256,7 +254,7 @@ TEST(CStringIndexTest, All) {
|
||||
CIndex copy_index;
|
||||
|
||||
{
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index);
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), &index, c_storage_config);
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -268,7 +266,8 @@ TEST(CStringIndexTest, All) {
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
status = CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index);
|
||||
status =
|
||||
CreateIndex(dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index, c_storage_config);
|
||||
ASSERT_EQ(Success, status.error_code);
|
||||
}
|
||||
{
|
||||
|
||||
@ -13,7 +13,6 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <map>
|
||||
#include <tuple>
|
||||
#include <yaml-cpp/yaml.h>
|
||||
|
||||
#include "indexbuilder/IndexFactory.h"
|
||||
#include "indexbuilder/VecIndexCreator.h"
|
||||
@ -33,6 +32,7 @@ class IndexWrapperTest : public ::testing::TestWithParam<Param> {
|
||||
SetUp() override {
|
||||
knowhere::KnowhereConfig::SetStatisticsLevel(3);
|
||||
knowhere::KnowhereConfig::SetIndexFileSliceSize(16);
|
||||
storage_config_ = get_default_storage_config();
|
||||
|
||||
auto param = GetParam();
|
||||
index_type = param.first;
|
||||
@ -94,6 +94,7 @@ class IndexWrapperTest : public ::testing::TestWithParam<Param> {
|
||||
knowhere::DatasetPtr xq_dataset;
|
||||
int64_t query_offset = 100;
|
||||
int64_t NB = 10000;
|
||||
StorageConfig storage_config_;
|
||||
};
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
@ -111,7 +112,7 @@ INSTANTIATE_TEST_CASE_P(
|
||||
|
||||
TEST_P(IndexWrapperTest, BuildAndQuery) {
|
||||
auto index = milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
|
||||
vec_field_data_type, type_params_str.c_str(), index_params_str.c_str());
|
||||
vec_field_data_type, type_params_str.c_str(), index_params_str.c_str(), storage_config_);
|
||||
|
||||
auto dataset = GenDataset(NB, metric_type, is_binary);
|
||||
auto xb_data = dataset.get_col<float>(milvus::FieldId(100));
|
||||
@ -119,7 +120,7 @@ TEST_P(IndexWrapperTest, BuildAndQuery) {
|
||||
ASSERT_NO_THROW(index->Build(xb_dataset));
|
||||
auto binary_set = index->Serialize();
|
||||
auto copy_index = milvus::indexbuilder::IndexFactory::GetInstance().CreateIndex(
|
||||
vec_field_data_type, type_params_str.c_str(), index_params_str.c_str());
|
||||
vec_field_data_type, type_params_str.c_str(), index_params_str.c_str(), storage_config_);
|
||||
auto vec_index = static_cast<milvus::indexbuilder::VecIndexCreator*>(copy_index.get());
|
||||
ASSERT_EQ(vec_index->dim(), DIM);
|
||||
ASSERT_NO_THROW(vec_index->Load(binary_set));
|
||||
|
||||
@ -15,7 +15,6 @@
|
||||
#include <random>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <yaml-cpp/yaml.h>
|
||||
|
||||
#include "faiss/utils/distances.h"
|
||||
#include "query/SearchBruteForce.h"
|
||||
@ -28,7 +27,6 @@
|
||||
#include "test_utils/Timer.h"
|
||||
|
||||
#ifdef BUILD_DISK_ANN
|
||||
#include <boost/filesystem.hpp>
|
||||
#include "storage/MinioChunkManager.h"
|
||||
#include "storage/DiskFileManagerImpl.h"
|
||||
|
||||
@ -284,68 +282,11 @@ using Param = std::pair<knowhere::IndexType, knowhere::MetricType>;
|
||||
|
||||
class IndexTest : public ::testing::TestWithParam<Param> {
|
||||
protected:
|
||||
//#ifdef BUILD_DISK_ANN
|
||||
// bool
|
||||
// FindFile(const path& dir, const std::string& file_name, path& path_found) {
|
||||
// const recursive_directory_iterator end;
|
||||
// boost::system::error_code err;
|
||||
// auto iter = recursive_directory_iterator(dir, err);
|
||||
// while (iter != end) {
|
||||
// try {
|
||||
// if ((*iter).path().filename() == file_name) {
|
||||
// path_found = (*iter).path();
|
||||
// return true;
|
||||
// }
|
||||
// iter++;
|
||||
// } catch (filesystem_error& e) {
|
||||
// } catch (std::exception& e) {
|
||||
// // ignore error
|
||||
// }
|
||||
// }
|
||||
// return false;
|
||||
// }
|
||||
//
|
||||
// void
|
||||
// init_minio() {
|
||||
// char testPath[100];
|
||||
// auto pwd = std::string(getcwd(testPath, sizeof(testPath)));
|
||||
// path filepath;
|
||||
// auto currentPath = path(pwd);
|
||||
// while (!FindFile(currentPath, "milvus.yaml", filepath)) {
|
||||
// currentPath = currentPath.append("../");
|
||||
// }
|
||||
// auto configPath = filepath.string();
|
||||
// YAML::Node config;
|
||||
// config = YAML::LoadFile(configPath);
|
||||
// auto minioConfig = config["minio"];
|
||||
// auto address = minioConfig["address"].as<std::string>();
|
||||
// auto port = minioConfig["port"].as<std::string>();
|
||||
// auto endpoint = address + ":" + port;
|
||||
// auto accessKey = minioConfig["accessKeyID"].as<std::string>();
|
||||
// auto accessValue = minioConfig["secretAccessKey"].as<std::string>();
|
||||
// auto useSSL = minioConfig["useSSL"].as<bool>();
|
||||
// auto bucketName = minioConfig["bucketName"].as<std::string>();
|
||||
//
|
||||
// ChunkMangerConfig::SetAddress(endpoint);
|
||||
// ChunkMangerConfig::SetAccessKey(accessKey);
|
||||
// ChunkMangerConfig::SetAccessValue(accessValue);
|
||||
// ChunkMangerConfig::SetBucketName(bucketName);
|
||||
// ChunkMangerConfig::SetUseSSL(useSSL);
|
||||
// auto& chunk_manager = milvus::storage::MinioChunkManager::GetInstance();
|
||||
// chunk_manager.SetBucketName(bucketName);
|
||||
// if (!chunk_manager.BucketExists(bucketName)) {
|
||||
// chunk_manager.CreateBucket(bucketName);
|
||||
// }
|
||||
// }
|
||||
//#endif
|
||||
|
||||
void
|
||||
SetUp() override {
|
||||
knowhere::KnowhereConfig::SetStatisticsLevel(3);
|
||||
knowhere::KnowhereConfig::SetIndexFileSliceSize(16);
|
||||
//#ifdef BUILD_DISK_ANN
|
||||
// init_minio();
|
||||
//#endif
|
||||
storage_config_ = get_default_storage_config();
|
||||
|
||||
auto param = GetParam();
|
||||
index_type = param.first;
|
||||
@ -402,6 +343,7 @@ class IndexTest : public ::testing::TestWithParam<Param> {
|
||||
knowhere::DatasetPtr xq_dataset;
|
||||
int64_t query_offset = 100;
|
||||
int64_t NB = 10000;
|
||||
StorageConfig storage_config_;
|
||||
};
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
@ -431,7 +373,8 @@ TEST_P(IndexTest, BuildAndQuery) {
|
||||
#ifdef BUILD_DISK_ANN
|
||||
milvus::storage::FieldDataMeta field_data_meta{1, 2, 3, 100};
|
||||
milvus::storage::IndexMeta index_meta{3, 100, 1000, 1};
|
||||
auto file_manager = std::make_shared<milvus::storage::DiskFileManagerImpl>(field_data_meta, index_meta);
|
||||
auto file_manager =
|
||||
std::make_shared<milvus::storage::DiskFileManagerImpl>(field_data_meta, index_meta, storage_config_);
|
||||
index = milvus::index::IndexFactory::GetInstance().CreateIndex(create_index_info, file_manager);
|
||||
#endif
|
||||
} else {
|
||||
@ -447,7 +390,8 @@ TEST_P(IndexTest, BuildAndQuery) {
|
||||
index.reset();
|
||||
milvus::storage::FieldDataMeta field_data_meta{1, 2, 3, 100};
|
||||
milvus::storage::IndexMeta index_meta{3, 100, 1000, 1};
|
||||
auto file_manager = std::make_shared<milvus::storage::DiskFileManagerImpl>(field_data_meta, index_meta);
|
||||
auto file_manager =
|
||||
std::make_shared<milvus::storage::DiskFileManagerImpl>(field_data_meta, index_meta, storage_config_);
|
||||
auto new_index = milvus::index::IndexFactory::GetInstance().CreateIndex(create_index_info, file_manager);
|
||||
vec_index = dynamic_cast<milvus::index::VectorIndex*>(new_index.get());
|
||||
|
||||
|
||||
@ -9,16 +9,12 @@
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
#include <gtest/gtest.h>
|
||||
#include <iostream>
|
||||
#include <random>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <yaml-cpp/yaml.h>
|
||||
|
||||
#include "storage/MinioChunkManager.h"
|
||||
#include "test_utils/indexbuilder_test_utils.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace milvus;
|
||||
@ -32,121 +28,70 @@ class MinioChunkManagerTest : public testing::Test {
|
||||
~MinioChunkManagerTest() {
|
||||
}
|
||||
|
||||
bool
|
||||
FindFile(const path& dir, const string& file_name, path& path_found) {
|
||||
const recursive_directory_iterator end;
|
||||
boost::system::error_code err;
|
||||
auto iter = recursive_directory_iterator(dir, err);
|
||||
while (iter != end) {
|
||||
try {
|
||||
if ((*iter).path().filename() == file_name) {
|
||||
path_found = (*iter).path();
|
||||
return true;
|
||||
}
|
||||
iter++;
|
||||
} catch (filesystem_error& e) {
|
||||
} catch (std::exception& e) {
|
||||
// ignore error
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
string
|
||||
GetConfig() {
|
||||
char testPath[100];
|
||||
auto pwd = string(getcwd(testPath, sizeof(testPath)));
|
||||
path filepath;
|
||||
auto currentPath = path(pwd);
|
||||
while (!FindFile(currentPath, "milvus.yaml", filepath)) {
|
||||
currentPath = currentPath.append("../");
|
||||
}
|
||||
return filepath.string();
|
||||
}
|
||||
|
||||
virtual void
|
||||
SetUp() {
|
||||
auto configPath = GetConfig();
|
||||
cout << configPath << endl;
|
||||
YAML::Node config;
|
||||
config = YAML::LoadFile(configPath);
|
||||
auto minioConfig = config["minio"];
|
||||
auto address = minioConfig["address"].as<string>();
|
||||
auto port = minioConfig["port"].as<string>();
|
||||
auto endpoint = address + ":" + port;
|
||||
auto accessKey = minioConfig["accessKeyID"].as<string>();
|
||||
auto accessValue = minioConfig["secretAccessKey"].as<string>();
|
||||
auto useSSL = minioConfig["useSSL"].as<bool>();
|
||||
auto bucketName = minioConfig["bucketName"].as<string>();
|
||||
|
||||
ChunkMangerConfig::SetAddress(endpoint);
|
||||
ChunkMangerConfig::SetAccessKey(accessKey);
|
||||
ChunkMangerConfig::SetAccessValue(accessValue);
|
||||
ChunkMangerConfig::SetBucketName(bucketName);
|
||||
ChunkMangerConfig::SetUseSSL(useSSL);
|
||||
chunk_manager_ = std::make_unique<MinioChunkManager>(get_default_storage_config());
|
||||
}
|
||||
|
||||
protected:
|
||||
MinioChunkManagerPtr chunk_manager_;
|
||||
};
|
||||
|
||||
TEST_F(MinioChunkManagerTest, BucketPositive) {
|
||||
auto& chunk_manager = MinioChunkManager::GetInstance();
|
||||
string testBucketName = "test-bucket";
|
||||
chunk_manager.SetBucketName(testBucketName);
|
||||
chunk_manager.DeleteBucket(testBucketName);
|
||||
bool exist = chunk_manager.BucketExists(testBucketName);
|
||||
chunk_manager_->SetBucketName(testBucketName);
|
||||
bool exist = chunk_manager_->BucketExists(testBucketName);
|
||||
EXPECT_EQ(exist, false);
|
||||
chunk_manager.CreateBucket(testBucketName);
|
||||
exist = chunk_manager.BucketExists(testBucketName);
|
||||
chunk_manager_->CreateBucket(testBucketName);
|
||||
exist = chunk_manager_->BucketExists(testBucketName);
|
||||
EXPECT_EQ(exist, true);
|
||||
chunk_manager.DeleteBucket(testBucketName);
|
||||
chunk_manager_->DeleteBucket(testBucketName);
|
||||
}
|
||||
|
||||
TEST_F(MinioChunkManagerTest, BucketNegtive) {
|
||||
auto& chunk_manager = MinioChunkManager::GetInstance();
|
||||
string testBucketName = "test-bucket-ng";
|
||||
chunk_manager.SetBucketName(testBucketName);
|
||||
chunk_manager.DeleteBucket(testBucketName);
|
||||
chunk_manager_->SetBucketName(testBucketName);
|
||||
chunk_manager_->DeleteBucket(testBucketName);
|
||||
|
||||
// create already exist bucket
|
||||
chunk_manager.CreateBucket(testBucketName);
|
||||
chunk_manager_->CreateBucket(testBucketName);
|
||||
try {
|
||||
chunk_manager.CreateBucket(testBucketName);
|
||||
chunk_manager_->CreateBucket(testBucketName);
|
||||
} catch (S3ErrorException& e) {
|
||||
EXPECT_TRUE(std::string(e.what()).find("BucketAlreadyOwnedByYou") != string::npos);
|
||||
}
|
||||
chunk_manager.DeleteBucket(testBucketName);
|
||||
chunk_manager_->DeleteBucket(testBucketName);
|
||||
}
|
||||
|
||||
TEST_F(MinioChunkManagerTest, ObjectExist) {
|
||||
auto& chunk_manager = MinioChunkManager::GetInstance();
|
||||
string testBucketName = "test-objexist";
|
||||
string objPath = "1/3";
|
||||
chunk_manager.SetBucketName(testBucketName);
|
||||
if (!chunk_manager.BucketExists(testBucketName)) {
|
||||
chunk_manager.CreateBucket(testBucketName);
|
||||
chunk_manager_->SetBucketName(testBucketName);
|
||||
if (!chunk_manager_->BucketExists(testBucketName)) {
|
||||
chunk_manager_->CreateBucket(testBucketName);
|
||||
}
|
||||
|
||||
bool exist = chunk_manager.Exist(objPath);
|
||||
bool exist = chunk_manager_->Exist(objPath);
|
||||
EXPECT_EQ(exist, false);
|
||||
chunk_manager.DeleteBucket(testBucketName);
|
||||
chunk_manager_->DeleteBucket(testBucketName);
|
||||
}
|
||||
|
||||
TEST_F(MinioChunkManagerTest, WritePositive) {
|
||||
auto& chunk_manager = MinioChunkManager::GetInstance();
|
||||
string testBucketName = "test-write";
|
||||
chunk_manager.SetBucketName(testBucketName);
|
||||
EXPECT_EQ(chunk_manager.GetBucketName(), testBucketName);
|
||||
chunk_manager_->SetBucketName(testBucketName);
|
||||
EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName);
|
||||
|
||||
if (!chunk_manager.BucketExists(testBucketName)) {
|
||||
chunk_manager.CreateBucket(testBucketName);
|
||||
if (!chunk_manager_->BucketExists(testBucketName)) {
|
||||
chunk_manager_->CreateBucket(testBucketName);
|
||||
}
|
||||
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
|
||||
string path = "1/3/5";
|
||||
chunk_manager.Write(path, data, sizeof(data));
|
||||
chunk_manager_->Write(path, data, sizeof(data));
|
||||
|
||||
bool exist = chunk_manager.Exist(path);
|
||||
bool exist = chunk_manager_->Exist(path);
|
||||
EXPECT_EQ(exist, true);
|
||||
|
||||
auto size = chunk_manager.Size(path);
|
||||
auto size = chunk_manager_->Size(path);
|
||||
EXPECT_EQ(size, 5);
|
||||
|
||||
int datasize = 10000;
|
||||
@ -155,34 +100,33 @@ TEST_F(MinioChunkManagerTest, WritePositive) {
|
||||
for (int i = 0; i < datasize; ++i) {
|
||||
bigdata[i] = rand() % 256;
|
||||
}
|
||||
chunk_manager.Write(path, bigdata, datasize);
|
||||
size = chunk_manager.Size(path);
|
||||
chunk_manager_->Write(path, bigdata, datasize);
|
||||
size = chunk_manager_->Size(path);
|
||||
EXPECT_EQ(size, datasize);
|
||||
delete[] bigdata;
|
||||
|
||||
chunk_manager.Remove(path);
|
||||
chunk_manager.DeleteBucket(testBucketName);
|
||||
chunk_manager_->Remove(path);
|
||||
chunk_manager_->DeleteBucket(testBucketName);
|
||||
}
|
||||
|
||||
TEST_F(MinioChunkManagerTest, ReadPositive) {
|
||||
auto& chunk_manager = MinioChunkManager::GetInstance();
|
||||
string testBucketName = "test-read";
|
||||
chunk_manager.SetBucketName(testBucketName);
|
||||
EXPECT_EQ(chunk_manager.GetBucketName(), testBucketName);
|
||||
chunk_manager_->SetBucketName(testBucketName);
|
||||
EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName);
|
||||
|
||||
if (!chunk_manager.BucketExists(testBucketName)) {
|
||||
chunk_manager.CreateBucket(testBucketName);
|
||||
if (!chunk_manager_->BucketExists(testBucketName)) {
|
||||
chunk_manager_->CreateBucket(testBucketName);
|
||||
}
|
||||
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
|
||||
string path = "1/4/6";
|
||||
chunk_manager.Write(path, data, sizeof(data));
|
||||
bool exist = chunk_manager.Exist(path);
|
||||
chunk_manager_->Write(path, data, sizeof(data));
|
||||
bool exist = chunk_manager_->Exist(path);
|
||||
EXPECT_EQ(exist, true);
|
||||
auto size = chunk_manager.Size(path);
|
||||
auto size = chunk_manager_->Size(path);
|
||||
EXPECT_EQ(size, 5);
|
||||
|
||||
uint8_t readdata[20] = {0};
|
||||
size = chunk_manager.Read(path, readdata, 20);
|
||||
size = chunk_manager_->Read(path, readdata, 20);
|
||||
EXPECT_EQ(size, 5);
|
||||
EXPECT_EQ(readdata[0], 0x17);
|
||||
EXPECT_EQ(readdata[1], 0x32);
|
||||
@ -190,19 +134,19 @@ TEST_F(MinioChunkManagerTest, ReadPositive) {
|
||||
EXPECT_EQ(readdata[3], 0x34);
|
||||
EXPECT_EQ(readdata[4], 0x23);
|
||||
|
||||
size = chunk_manager.Read(path, readdata, 3);
|
||||
size = chunk_manager_->Read(path, readdata, 3);
|
||||
EXPECT_EQ(size, 3);
|
||||
EXPECT_EQ(readdata[0], 0x17);
|
||||
EXPECT_EQ(readdata[1], 0x32);
|
||||
EXPECT_EQ(readdata[2], 0x45);
|
||||
|
||||
uint8_t dataWithNULL[] = {0x17, 0x32, 0x00, 0x34, 0x23};
|
||||
chunk_manager.Write(path, dataWithNULL, sizeof(dataWithNULL));
|
||||
exist = chunk_manager.Exist(path);
|
||||
chunk_manager_->Write(path, dataWithNULL, sizeof(dataWithNULL));
|
||||
exist = chunk_manager_->Exist(path);
|
||||
EXPECT_EQ(exist, true);
|
||||
size = chunk_manager.Size(path);
|
||||
size = chunk_manager_->Size(path);
|
||||
EXPECT_EQ(size, 5);
|
||||
size = chunk_manager.Read(path, readdata, 20);
|
||||
size = chunk_manager_->Read(path, readdata, 20);
|
||||
EXPECT_EQ(size, 5);
|
||||
EXPECT_EQ(readdata[0], 0x17);
|
||||
EXPECT_EQ(readdata[1], 0x32);
|
||||
@ -210,69 +154,67 @@ TEST_F(MinioChunkManagerTest, ReadPositive) {
|
||||
EXPECT_EQ(readdata[3], 0x34);
|
||||
EXPECT_EQ(readdata[4], 0x23);
|
||||
|
||||
chunk_manager.Remove(path);
|
||||
chunk_manager.DeleteBucket(testBucketName);
|
||||
chunk_manager_->Remove(path);
|
||||
chunk_manager_->DeleteBucket(testBucketName);
|
||||
}
|
||||
|
||||
TEST_F(MinioChunkManagerTest, RemovePositive) {
|
||||
auto& chunk_manager = MinioChunkManager::GetInstance();
|
||||
string testBucketName = "test-remove";
|
||||
chunk_manager.SetBucketName(testBucketName);
|
||||
EXPECT_EQ(chunk_manager.GetBucketName(), testBucketName);
|
||||
chunk_manager_->SetBucketName(testBucketName);
|
||||
EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName);
|
||||
|
||||
if (!chunk_manager.BucketExists(testBucketName)) {
|
||||
chunk_manager.CreateBucket(testBucketName);
|
||||
if (!chunk_manager_->BucketExists(testBucketName)) {
|
||||
chunk_manager_->CreateBucket(testBucketName);
|
||||
}
|
||||
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
|
||||
string path = "1/7/8";
|
||||
chunk_manager.Write(path, data, sizeof(data));
|
||||
chunk_manager_->Write(path, data, sizeof(data));
|
||||
|
||||
bool exist = chunk_manager.Exist(path);
|
||||
bool exist = chunk_manager_->Exist(path);
|
||||
EXPECT_EQ(exist, true);
|
||||
|
||||
chunk_manager.Remove(path);
|
||||
chunk_manager_->Remove(path);
|
||||
|
||||
exist = chunk_manager.Exist(path);
|
||||
exist = chunk_manager_->Exist(path);
|
||||
EXPECT_EQ(exist, false);
|
||||
|
||||
chunk_manager.DeleteBucket(testBucketName);
|
||||
chunk_manager_->DeleteBucket(testBucketName);
|
||||
}
|
||||
|
||||
TEST_F(MinioChunkManagerTest, ListWithPrefixPositive) {
|
||||
auto& chunk_manager = MinioChunkManager::GetInstance();
|
||||
string testBucketName = "test-listprefix";
|
||||
chunk_manager.SetBucketName(testBucketName);
|
||||
EXPECT_EQ(chunk_manager.GetBucketName(), testBucketName);
|
||||
chunk_manager_->SetBucketName(testBucketName);
|
||||
EXPECT_EQ(chunk_manager_->GetBucketName(), testBucketName);
|
||||
|
||||
if (!chunk_manager.BucketExists(testBucketName)) {
|
||||
chunk_manager.CreateBucket(testBucketName);
|
||||
if (!chunk_manager_->BucketExists(testBucketName)) {
|
||||
chunk_manager_->CreateBucket(testBucketName);
|
||||
}
|
||||
|
||||
string path1 = "1/7/8";
|
||||
string path2 = "1/7/4";
|
||||
string path3 = "1/4/8";
|
||||
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
|
||||
chunk_manager.Write(path1, data, sizeof(data));
|
||||
chunk_manager.Write(path2, data, sizeof(data));
|
||||
chunk_manager.Write(path3, data, sizeof(data));
|
||||
chunk_manager_->Write(path1, data, sizeof(data));
|
||||
chunk_manager_->Write(path2, data, sizeof(data));
|
||||
chunk_manager_->Write(path3, data, sizeof(data));
|
||||
|
||||
vector<string> objs = chunk_manager.ListWithPrefix("1/7");
|
||||
vector<string> objs = chunk_manager_->ListWithPrefix("1/7");
|
||||
EXPECT_EQ(objs.size(), 2);
|
||||
std::sort(objs.begin(), objs.end());
|
||||
EXPECT_EQ(objs[0], "1/7/4");
|
||||
EXPECT_EQ(objs[1], "1/7/8");
|
||||
|
||||
objs = chunk_manager.ListWithPrefix("//1/7");
|
||||
objs = chunk_manager_->ListWithPrefix("//1/7");
|
||||
EXPECT_EQ(objs.size(), 2);
|
||||
|
||||
objs = chunk_manager.ListWithPrefix("1");
|
||||
objs = chunk_manager_->ListWithPrefix("1");
|
||||
EXPECT_EQ(objs.size(), 3);
|
||||
std::sort(objs.begin(), objs.end());
|
||||
EXPECT_EQ(objs[0], "1/4/8");
|
||||
EXPECT_EQ(objs[1], "1/7/4");
|
||||
|
||||
chunk_manager.Remove(path1);
|
||||
chunk_manager.Remove(path2);
|
||||
chunk_manager.Remove(path3);
|
||||
chunk_manager.DeleteBucket(testBucketName);
|
||||
chunk_manager_->Remove(path1);
|
||||
chunk_manager_->Remove(path2);
|
||||
chunk_manager_->Remove(path3);
|
||||
chunk_manager_->DeleteBucket(testBucketName);
|
||||
}
|
||||
|
||||
@ -17,12 +17,11 @@
|
||||
#include "segcore/SegmentSealedImpl.h"
|
||||
#include "test_utils/DataGen.h"
|
||||
#include "index/IndexFactory.h"
|
||||
#include "segcore/segcore_init_c.h"
|
||||
|
||||
using namespace milvus;
|
||||
using namespace milvus::query;
|
||||
using namespace milvus::segcore;
|
||||
using milvus::index::LoadIndexInfo;
|
||||
using milvus::segcore::LoadIndexInfo;
|
||||
|
||||
const int64_t ROW_COUNT = 100 * 1000;
|
||||
|
||||
|
||||
@ -88,12 +88,11 @@ TEST(InsertRecordTest, growing_int64_t) {
|
||||
auto i64_fid = schema->AddDebugField("age", DataType::INT64);
|
||||
schema->set_primary_field_id(i64_fid);
|
||||
auto record = milvus::segcore::InsertRecord<false>(*schema, int64_t(32));
|
||||
const int N=100000;
|
||||
const int N = 100000;
|
||||
|
||||
for (int i = 1; i <= N; i++)
|
||||
record.insert_pk(PkType(int64_t(i)), int64_t(i));
|
||||
for (int i = 1; i <= N; i++) record.insert_pk(PkType(int64_t(i)), int64_t(i));
|
||||
|
||||
for (int i = 1; i <= N; i++){
|
||||
for (int i = 1; i <= N; i++) {
|
||||
std::vector<SegOffset> offset = record.search_pk(PkType(int64_t(i)), int64_t(N + 1));
|
||||
ASSERT_EQ(offset[0].get(), int64_t(i));
|
||||
}
|
||||
@ -108,10 +107,9 @@ TEST(InsertRecordTest, growing_string) {
|
||||
auto record = milvus::segcore::InsertRecord<false>(*schema, int64_t(32));
|
||||
const int N = 100000;
|
||||
|
||||
for (int i = 1; i <= N; i++)
|
||||
record.insert_pk(PkType(std::to_string(i)), int64_t(i));
|
||||
for (int i = 1; i <= N; i++) record.insert_pk(PkType(std::to_string(i)), int64_t(i));
|
||||
|
||||
for (int i = 1; i <= N; i++){
|
||||
for (int i = 1; i <= N; i++) {
|
||||
std::vector<SegOffset> offset = record.search_pk(std::to_string(i), int64_t(N + 1));
|
||||
ASSERT_EQ(offset[0].get(), int64_t(i));
|
||||
}
|
||||
@ -126,11 +124,10 @@ TEST(InsertRecordTest, sealed_int64_t) {
|
||||
auto record = milvus::segcore::InsertRecord<true>(*schema, int64_t(32));
|
||||
const int N = 100000;
|
||||
|
||||
for (int i = N; i >= 1; i--)
|
||||
record.insert_pk(PkType(int64_t(i)), int64_t(i));
|
||||
for (int i = N; i >= 1; i--) record.insert_pk(PkType(int64_t(i)), int64_t(i));
|
||||
record.seal_pks();
|
||||
|
||||
for (int i = 1;i <= N; i++){
|
||||
for (int i = 1; i <= N; i++) {
|
||||
std::vector<SegOffset> offset = record.search_pk(PkType(int64_t(i)), int64_t(N + 1));
|
||||
ASSERT_EQ(offset[0].get(), int64_t(i));
|
||||
}
|
||||
@ -145,12 +142,11 @@ TEST(InsertRecordTest, sealed_string) {
|
||||
auto record = milvus::segcore::InsertRecord<true>(*schema, int64_t(32));
|
||||
const int N = 100000;
|
||||
|
||||
for (int i = 1; i <= N; i++)
|
||||
record.insert_pk(PkType(std::to_string(i)), int64_t(i));
|
||||
for (int i = 1; i <= N; i++) record.insert_pk(PkType(std::to_string(i)), int64_t(i));
|
||||
|
||||
record.seal_pks();
|
||||
|
||||
for (int i = 1; i <= N; i++){
|
||||
for (int i = 1; i <= N; i++) {
|
||||
std::vector<SegOffset> offset = record.search_pk(std::to_string(i), int64_t(N + 1));
|
||||
ASSERT_EQ(offset[0].get(), int64_t(i));
|
||||
}
|
||||
|
||||
@ -16,6 +16,8 @@
|
||||
#include <limits>
|
||||
#include <cmath>
|
||||
#include <google/protobuf/text_format.h>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <yaml-cpp/yaml.h>
|
||||
|
||||
#include "DataGen.h"
|
||||
#include "index/ScalarIndex.h"
|
||||
@ -28,6 +30,7 @@
|
||||
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
|
||||
#include "knowhere/index/vector_index/adapter/VectorAdapter.h"
|
||||
#include "pb/index_cgo_msg.pb.h"
|
||||
#include "storage/Types.h"
|
||||
|
||||
constexpr int64_t DIM = 16;
|
||||
constexpr int64_t NQ = 10;
|
||||
@ -43,8 +46,93 @@ using milvus::indexbuilder::ScalarIndexCreator;
|
||||
using ScalarTestParams = std::pair<MapParams, MapParams>;
|
||||
using milvus::index::ScalarIndexPtr;
|
||||
using milvus::index::StringIndexPtr;
|
||||
using milvus::storage::StorageConfig;
|
||||
using namespace boost::filesystem;
|
||||
|
||||
namespace {
|
||||
|
||||
bool
|
||||
find_file(const path& dir, const std::string& file_name, path& path_found) {
|
||||
const recursive_directory_iterator end;
|
||||
boost::system::error_code err;
|
||||
auto iter = recursive_directory_iterator(dir, err);
|
||||
while (iter != end) {
|
||||
try {
|
||||
if ((*iter).path().filename() == file_name) {
|
||||
path_found = (*iter).path();
|
||||
return true;
|
||||
}
|
||||
iter++;
|
||||
} catch (filesystem_error& e) {
|
||||
} catch (std::exception& e) {
|
||||
// ignore error
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
StorageConfig
|
||||
get_default_storage_config() {
|
||||
char testPath[100];
|
||||
auto pwd = std::string(getcwd(testPath, sizeof(testPath)));
|
||||
path filepath;
|
||||
auto currentPath = path(pwd);
|
||||
while (!find_file(currentPath, "milvus.yaml", filepath)) {
|
||||
currentPath = currentPath.append("../");
|
||||
}
|
||||
auto configPath = filepath.string();
|
||||
YAML::Node config;
|
||||
config = YAML::LoadFile(configPath);
|
||||
auto minioConfig = config["minio"];
|
||||
auto address = minioConfig["address"].as<std::string>();
|
||||
auto port = minioConfig["port"].as<std::string>();
|
||||
auto endpoint = address + ":" + port;
|
||||
auto accessKey = minioConfig["accessKeyID"].as<std::string>();
|
||||
auto accessValue = minioConfig["secretAccessKey"].as<std::string>();
|
||||
auto rootPath = minioConfig["rootPath"].as<std::string>();
|
||||
auto useSSL = minioConfig["useSSL"].as<bool>();
|
||||
auto useIam = minioConfig["useIAM"].as<bool>();
|
||||
auto iamEndPoint = minioConfig["iamEndpoint"].as<std::string>();
|
||||
auto bucketName = minioConfig["bucketName"].as<std::string>();
|
||||
|
||||
return StorageConfig{endpoint, bucketName, accessKey, accessValue, rootPath, "minio", iamEndPoint, useSSL, useIam};
|
||||
}
|
||||
|
||||
CStorageConfig
|
||||
get_default_cstorage_config() {
|
||||
char testPath[100];
|
||||
auto pwd = std::string(getcwd(testPath, sizeof(testPath)));
|
||||
path filepath;
|
||||
auto currentPath = path(pwd);
|
||||
while (!find_file(currentPath, "milvus.yaml", filepath)) {
|
||||
currentPath = currentPath.append("../");
|
||||
}
|
||||
auto configPath = filepath.string();
|
||||
YAML::Node config;
|
||||
config = YAML::LoadFile(configPath);
|
||||
auto minioConfig = config["minio"];
|
||||
auto address = minioConfig["address"].as<std::string>();
|
||||
auto port = minioConfig["port"].as<std::string>();
|
||||
auto endpoint = address + ":" + port;
|
||||
auto accessKey = minioConfig["accessKeyID"].as<std::string>();
|
||||
auto accessValue = minioConfig["secretAccessKey"].as<std::string>();
|
||||
auto rootPath = minioConfig["rootPath"].as<std::string>();
|
||||
auto useSSL = minioConfig["useSSL"].as<bool>();
|
||||
auto useIam = minioConfig["useIAM"].as<bool>();
|
||||
auto iamEndPoint = minioConfig["iamEndpoint"].as<std::string>();
|
||||
auto bucketName = minioConfig["bucketName"].as<std::string>();
|
||||
|
||||
return CStorageConfig{endpoint.c_str(),
|
||||
bucketName.c_str(),
|
||||
accessKey.c_str(),
|
||||
accessValue.c_str(),
|
||||
rootPath.c_str(),
|
||||
"minio",
|
||||
iamEndPoint.c_str(),
|
||||
useSSL,
|
||||
useIam};
|
||||
}
|
||||
|
||||
auto
|
||||
generate_build_conf(const milvus::IndexType& index_type, const milvus::MetricType& metric_type) {
|
||||
if (index_type == knowhere::IndexEnum::INDEX_FAISS_IDMAP) {
|
||||
|
||||
@ -156,7 +156,6 @@ func (i *IndexNode) initKnowhere() {
|
||||
C.IndexBuilderSetIndexSliceSize(cIndexSliceSize)
|
||||
|
||||
initcore.InitLocalStorageConfig(&Params)
|
||||
initcore.InitMinioConfig(&Params)
|
||||
}
|
||||
|
||||
func (i *IndexNode) initSession() error {
|
||||
|
||||
@ -16,6 +16,19 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
)
|
||||
|
||||
func genStorageConfig() *indexpb.StorageConfig {
|
||||
return &indexpb.StorageConfig{
|
||||
Address: Params.MinioCfg.Address,
|
||||
AccessKeyID: Params.MinioCfg.AccessKeyID,
|
||||
SecretAccessKey: Params.MinioCfg.SecretAccessKey,
|
||||
BucketName: Params.MinioCfg.BucketName,
|
||||
RootPath: Params.MinioCfg.RootPath,
|
||||
IAMEndpoint: Params.MinioCfg.IAMEndpoint,
|
||||
UseSSL: Params.MinioCfg.UseSSL,
|
||||
UseIAM: Params.MinioCfg.UseIAM,
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexNodeSimple(t *testing.T) {
|
||||
in, err := NewMockIndexNodeComponent(context.TODO())
|
||||
assert.Nil(t, err)
|
||||
@ -71,6 +84,7 @@ func TestIndexNodeSimple(t *testing.T) {
|
||||
IndexName: idxName,
|
||||
IndexParams: indexParams,
|
||||
TypeParams: typeParams,
|
||||
StorageConfig: genStorageConfig(),
|
||||
}
|
||||
status, err := in.CreateJob(ctx, createReq)
|
||||
assert.Nil(t, err)
|
||||
@ -242,6 +256,7 @@ func TestIndexNodeComplex(t *testing.T) {
|
||||
IndexName: fmt.Sprintf("idx%d", tasks[i].idxID),
|
||||
IndexParams: tasks[i].idxParams,
|
||||
TypeParams: tasks[i].typeParams,
|
||||
StorageConfig: genStorageConfig(),
|
||||
}
|
||||
testwg.Add(1)
|
||||
go func() {
|
||||
|
||||
@ -240,7 +240,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
|
||||
dType := dataset.DType
|
||||
var err error
|
||||
if dType != schemapb.DataType_None {
|
||||
it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams)
|
||||
it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig())
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error("failed to create index", zap.Error(err))
|
||||
return err
|
||||
@ -335,7 +335,7 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error {
|
||||
it.newIndexParams["index_id"] = strconv.FormatInt(it.req.IndexID, 10)
|
||||
it.newIndexParams["index_version"] = strconv.FormatInt(it.req.GetIndexVersion(), 10)
|
||||
|
||||
it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams)
|
||||
it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig())
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error("failed to create index", zap.Error(err))
|
||||
return err
|
||||
|
||||
@ -40,7 +40,36 @@ type LoadIndexInfo struct {
|
||||
// newLoadIndexInfo returns a new LoadIndexInfo and error
|
||||
func newLoadIndexInfo() (*LoadIndexInfo, error) {
|
||||
var cLoadIndexInfo C.CLoadIndexInfo
|
||||
status := C.NewLoadIndexInfo(&cLoadIndexInfo)
|
||||
|
||||
// TODO::xige-16 support embedded milvus
|
||||
storageType := "minio"
|
||||
cAddress := C.CString(Params.MinioCfg.Address)
|
||||
cBucketName := C.CString(Params.MinioCfg.BucketName)
|
||||
cAccessKey := C.CString(Params.MinioCfg.AccessKeyID)
|
||||
cAccessValue := C.CString(Params.MinioCfg.SecretAccessKey)
|
||||
cRootPath := C.CString(Params.MinioCfg.RootPath)
|
||||
cStorageType := C.CString(storageType)
|
||||
cIamEndPoint := C.CString(Params.MinioCfg.IAMEndpoint)
|
||||
defer C.free(unsafe.Pointer(cAddress))
|
||||
defer C.free(unsafe.Pointer(cBucketName))
|
||||
defer C.free(unsafe.Pointer(cAccessKey))
|
||||
defer C.free(unsafe.Pointer(cAccessValue))
|
||||
defer C.free(unsafe.Pointer(cRootPath))
|
||||
defer C.free(unsafe.Pointer(cStorageType))
|
||||
defer C.free(unsafe.Pointer(cIamEndPoint))
|
||||
storageConfig := C.CStorageConfig{
|
||||
address: cAddress,
|
||||
bucket_name: cBucketName,
|
||||
access_key_id: cAccessKey,
|
||||
access_key_value: cAccessValue,
|
||||
remote_root_path: cRootPath,
|
||||
storage_type: cStorageType,
|
||||
iam_endpoint: cIamEndPoint,
|
||||
useSSL: C.bool(Params.MinioCfg.UseSSL),
|
||||
useIAM: C.bool(Params.MinioCfg.UseIAM),
|
||||
}
|
||||
|
||||
status := C.NewLoadIndexInfo(&cLoadIndexInfo, storageConfig)
|
||||
if err := HandleCStatus(&status, "NewLoadIndexInfo failed"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -27,11 +27,6 @@ import (
|
||||
"runtime"
|
||||
"strconv"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/concurrency"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -43,13 +38,18 @@ import (
|
||||
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/planpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util"
|
||||
"github.com/milvus-io/milvus/internal/util/concurrency"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
)
|
||||
|
||||
// ---------- unittest util functions ----------
|
||||
@ -278,7 +278,7 @@ func genVectorFieldSchema(param vecFieldParam) *schemapb.FieldSchema {
|
||||
func genIndexBinarySet() ([][]byte, error) {
|
||||
typeParams, indexParams := genIndexParams(IndexFaissIVFPQ, L2)
|
||||
|
||||
index, err := indexcgowrapper.NewCgoIndex(schemapb.DataType_FloatVector, typeParams, indexParams)
|
||||
index, err := indexcgowrapper.NewCgoIndex(schemapb.DataType_FloatVector, typeParams, indexParams, genStorageConfig())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -375,7 +375,7 @@ func generateAndSaveIndex(segmentID UniqueID, msgLength int, indexType, metricTy
|
||||
})
|
||||
}
|
||||
|
||||
index, err := indexcgowrapper.NewCgoIndex(schemapb.DataType_FloatVector, typeParams, indexParams)
|
||||
index, err := indexcgowrapper.NewCgoIndex(schemapb.DataType_FloatVector, typeParams, indexParams, genStorageConfig())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -424,6 +424,19 @@ func generateAndSaveIndex(segmentID UniqueID, msgLength int, indexType, metricTy
|
||||
return indexPaths, nil
|
||||
}
|
||||
|
||||
func genStorageConfig() *indexpb.StorageConfig {
|
||||
return &indexpb.StorageConfig{
|
||||
Address: Params.MinioCfg.Address,
|
||||
AccessKeyID: Params.MinioCfg.AccessKeyID,
|
||||
SecretAccessKey: Params.MinioCfg.SecretAccessKey,
|
||||
BucketName: Params.MinioCfg.BucketName,
|
||||
RootPath: Params.MinioCfg.RootPath,
|
||||
IAMEndpoint: Params.MinioCfg.IAMEndpoint,
|
||||
UseSSL: Params.MinioCfg.UseSSL,
|
||||
UseIAM: Params.MinioCfg.UseIAM,
|
||||
}
|
||||
}
|
||||
|
||||
func genIndexParams(indexType, metricType string) (map[string]string, map[string]string) {
|
||||
typeParams := make(map[string]string)
|
||||
typeParams["dim"] = strconv.Itoa(defaultDim)
|
||||
|
||||
@ -219,7 +219,6 @@ func (node *QueryNode) InitSegcore() {
|
||||
C.SegcoreSetIndexSliceSize(cIndexSliceSize)
|
||||
|
||||
initcore.InitLocalStorageConfig(&Params)
|
||||
initcore.InitMinioConfig(&Params)
|
||||
}
|
||||
|
||||
// Init function init historical and streaming module to manage segments
|
||||
|
||||
@ -10,11 +10,10 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
|
||||
"github.com/milvus-io/milvus/api/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
)
|
||||
|
||||
type indexTestCase struct {
|
||||
@ -293,9 +292,26 @@ func genIndexCase() []indexTestCase {
|
||||
return ret
|
||||
}
|
||||
|
||||
func genStorageConfig() *indexpb.StorageConfig {
|
||||
InitOnce.Do(func() {
|
||||
Params.Init()
|
||||
})
|
||||
|
||||
return &indexpb.StorageConfig{
|
||||
Address: Params.MinioCfg.Address,
|
||||
AccessKeyID: Params.MinioCfg.AccessKeyID,
|
||||
SecretAccessKey: Params.MinioCfg.SecretAccessKey,
|
||||
BucketName: Params.MinioCfg.BucketName,
|
||||
RootPath: Params.MinioCfg.RootPath,
|
||||
IAMEndpoint: Params.MinioCfg.IAMEndpoint,
|
||||
UseSSL: Params.MinioCfg.UseSSL,
|
||||
UseIAM: Params.MinioCfg.UseIAM,
|
||||
}
|
||||
}
|
||||
|
||||
func TestCgoIndex(t *testing.T) {
|
||||
for _, testCase := range genIndexCase() {
|
||||
index, err := NewCgoIndex(testCase.dtype, testCase.typeParams, testCase.indexParams)
|
||||
index, err := NewCgoIndex(testCase.dtype, testCase.typeParams, testCase.indexParams, genStorageConfig())
|
||||
assert.NoError(t, err, testCase)
|
||||
|
||||
dataset := GenDataset(genFieldData(testCase.dtype, nb, dim))
|
||||
@ -304,7 +320,7 @@ func TestCgoIndex(t *testing.T) {
|
||||
blobs, err := index.Serialize()
|
||||
assert.NoError(t, err, testCase)
|
||||
|
||||
copyIndex, err := NewCgoIndex(testCase.dtype, testCase.typeParams, testCase.indexParams)
|
||||
copyIndex, err := NewCgoIndex(testCase.dtype, testCase.typeParams, testCase.indexParams, genStorageConfig())
|
||||
assert.NoError(t, err, testCase)
|
||||
|
||||
assert.NoError(t, copyIndex.Load(blobs), testCase)
|
||||
|
||||
@ -9,6 +9,7 @@ package indexcgowrapper
|
||||
import "C"
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"unsafe"
|
||||
@ -49,7 +50,7 @@ type CgoIndex struct {
|
||||
}
|
||||
|
||||
// TODO: use proto.Marshal instead of proto.MarshalTextString for better compatibility.
|
||||
func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]string) (*CgoIndex, error) {
|
||||
func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]string, config *indexpb.StorageConfig) (*CgoIndex, error) {
|
||||
protoTypeParams := &indexcgopb.TypeParams{
|
||||
Params: make([]*commonpb.KeyValuePair, 0),
|
||||
}
|
||||
@ -71,9 +72,37 @@ func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]str
|
||||
defer C.free(unsafe.Pointer(typeParamsPointer))
|
||||
defer C.free(unsafe.Pointer(indexParamsPointer))
|
||||
|
||||
// TODO::xige-16 support embedded milvus
|
||||
storageType := "minio"
|
||||
cAddress := C.CString(config.Address)
|
||||
cBucketName := C.CString(config.GetBucketName())
|
||||
cAccessKey := C.CString(config.GetAccessKeyID())
|
||||
cAccessValue := C.CString(config.GetSecretAccessKey())
|
||||
cRootPath := C.CString(config.GetRootPath())
|
||||
cStorageType := C.CString(storageType)
|
||||
cIamEndPoint := C.CString(config.GetIAMEndpoint())
|
||||
defer C.free(unsafe.Pointer(cAddress))
|
||||
defer C.free(unsafe.Pointer(cBucketName))
|
||||
defer C.free(unsafe.Pointer(cAccessKey))
|
||||
defer C.free(unsafe.Pointer(cAccessValue))
|
||||
defer C.free(unsafe.Pointer(cRootPath))
|
||||
defer C.free(unsafe.Pointer(cStorageType))
|
||||
defer C.free(unsafe.Pointer(cIamEndPoint))
|
||||
storageConfig := C.CStorageConfig{
|
||||
address: cAddress,
|
||||
bucket_name: cBucketName,
|
||||
access_key_id: cAccessKey,
|
||||
access_key_value: cAccessValue,
|
||||
remote_root_path: cRootPath,
|
||||
storage_type: cStorageType,
|
||||
iam_endpoint: cIamEndPoint,
|
||||
useSSL: C.bool(config.GetUseSSL()),
|
||||
useIAM: C.bool(config.GetUseIAM()),
|
||||
}
|
||||
|
||||
var indexPtr C.CIndex
|
||||
cintDType := uint32(dtype)
|
||||
status := C.CreateIndex(cintDType, typeParamsPointer, indexParamsPointer, &indexPtr)
|
||||
status := C.CreateIndex(cintDType, typeParamsPointer, indexParamsPointer, &indexPtr, storageConfig)
|
||||
if err := HandleCStatus(&status, "failed to create index"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -5,10 +5,13 @@ package indexcgowrapper
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/api/schemapb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/api/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -40,6 +43,9 @@ const (
|
||||
ef = 200
|
||||
)
|
||||
|
||||
var Params paramtable.ServiceParam
|
||||
var InitOnce sync.Once
|
||||
|
||||
type vecTestCase struct {
|
||||
indexType string
|
||||
metricType string
|
||||
@ -129,7 +135,7 @@ func TestCIndex_New(t *testing.T) {
|
||||
for _, c := range generateTestCases() {
|
||||
typeParams, indexParams := generateParams(c.indexType, c.metricType)
|
||||
|
||||
index, err := NewCgoIndex(c.dtype, typeParams, indexParams)
|
||||
index, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig())
|
||||
assert.Equal(t, err, nil)
|
||||
assert.NotEqual(t, index, nil)
|
||||
|
||||
@ -142,7 +148,7 @@ func TestCIndex_BuildFloatVecIndex(t *testing.T) {
|
||||
for _, c := range generateFloatVectorTestCases() {
|
||||
typeParams, indexParams := generateParams(c.indexType, c.metricType)
|
||||
|
||||
index, err := NewCgoIndex(c.dtype, typeParams, indexParams)
|
||||
index, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig())
|
||||
assert.Equal(t, err, nil)
|
||||
assert.NotEqual(t, index, nil)
|
||||
|
||||
@ -159,7 +165,7 @@ func TestCIndex_BuildBinaryVecIndex(t *testing.T) {
|
||||
for _, c := range generateBinaryVectorTestCases() {
|
||||
typeParams, indexParams := generateParams(c.indexType, c.metricType)
|
||||
|
||||
index, err := NewCgoIndex(c.dtype, typeParams, indexParams)
|
||||
index, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig())
|
||||
assert.Equal(t, err, nil)
|
||||
assert.NotEqual(t, index, nil)
|
||||
|
||||
@ -176,7 +182,7 @@ func TestCIndex_Codec(t *testing.T) {
|
||||
for _, c := range generateTestCases() {
|
||||
typeParams, indexParams := generateParams(c.indexType, c.metricType)
|
||||
|
||||
index, err := NewCgoIndex(c.dtype, typeParams, indexParams)
|
||||
index, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig())
|
||||
assert.Equal(t, err, nil)
|
||||
assert.NotEqual(t, index, nil)
|
||||
|
||||
@ -193,7 +199,7 @@ func TestCIndex_Codec(t *testing.T) {
|
||||
blobs, err := index.Serialize()
|
||||
assert.Equal(t, err, nil)
|
||||
|
||||
copyIndex, err := NewCgoIndex(c.dtype, typeParams, indexParams)
|
||||
copyIndex, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig())
|
||||
assert.NotEqual(t, copyIndex, nil)
|
||||
assert.Equal(t, err, nil)
|
||||
err = copyIndex.Load(blobs)
|
||||
@ -214,7 +220,7 @@ func TestCIndex_Delete(t *testing.T) {
|
||||
for _, c := range generateTestCases() {
|
||||
typeParams, indexParams := generateParams(c.indexType, c.metricType)
|
||||
|
||||
index, err := NewCgoIndex(c.dtype, typeParams, indexParams)
|
||||
index, err := NewCgoIndex(c.dtype, typeParams, indexParams, genStorageConfig())
|
||||
assert.Equal(t, err, nil)
|
||||
assert.NotEqual(t, index, nil)
|
||||
|
||||
@ -227,7 +233,7 @@ func TestCIndex_Error(t *testing.T) {
|
||||
indexParams := make(map[string]string)
|
||||
indexParams["index_type"] = "IVF_FLAT"
|
||||
indexParams["metric_type"] = "L2"
|
||||
indexPtr, err := NewCgoIndex(schemapb.DataType_FloatVector, nil, indexParams)
|
||||
indexPtr, err := NewCgoIndex(schemapb.DataType_FloatVector, nil, indexParams, genStorageConfig())
|
||||
assert.Nil(t, err)
|
||||
|
||||
t.Run("Serialize error", func(t *testing.T) {
|
||||
|
||||
@ -27,40 +27,11 @@ import "C"
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"unsafe"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
)
|
||||
|
||||
func InitMinioConfig(params *paramtable.ComponentParam) {
|
||||
CMinioAddress := C.CString(params.MinioCfg.Address)
|
||||
C.MinioAddressInit(CMinioAddress)
|
||||
C.free(unsafe.Pointer(CMinioAddress))
|
||||
|
||||
CMinioAccessKey := C.CString(params.MinioCfg.AccessKeyID)
|
||||
C.MinioAccessKeyInit(CMinioAccessKey)
|
||||
C.free(unsafe.Pointer(CMinioAccessKey))
|
||||
|
||||
CMinioAccessValue := C.CString(params.MinioCfg.SecretAccessKey)
|
||||
C.MinioAccessValueInit(CMinioAccessValue)
|
||||
C.free(unsafe.Pointer(CMinioAccessValue))
|
||||
|
||||
CUseSSL := C.bool(params.MinioCfg.UseSSL)
|
||||
C.MinioSSLInit(CUseSSL)
|
||||
|
||||
CUseIam := C.bool(params.MinioCfg.UseIAM)
|
||||
C.MinioUseIamInit(CUseIam)
|
||||
|
||||
CMinioBucketName := C.CString(strings.TrimLeft(params.MinioCfg.BucketName, "/"))
|
||||
C.MinioBucketNameInit(CMinioBucketName)
|
||||
C.free(unsafe.Pointer(CMinioBucketName))
|
||||
|
||||
CMinioRootPath := C.CString(params.MinioCfg.RootPath)
|
||||
C.MinioRootPathInit(CMinioRootPath)
|
||||
C.free(unsafe.Pointer(CMinioRootPath))
|
||||
}
|
||||
|
||||
func InitLocalStorageConfig(params *paramtable.ComponentParam) {
|
||||
b, _ := os.Getwd()
|
||||
LocalRootPath := filepath.Dir(b) + "/" + filepath.Base(b) + "/" + "data/"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user