Merge remote-tracking branch 'main/0.6.0' into 0.6.0

This commit is contained in:
yhz 2019-11-19 13:16:49 +08:00
commit 8733063d6b
69 changed files with 3233 additions and 1651 deletions

View File

@ -10,7 +10,9 @@ Please mark all change in change log and use the ticket from JIRA.
- \#316 - Some files not merged after vectors added
- \#327 - Search does not use GPU when index type is FLAT
- \#340 - Test cases run failed on 0.6.0
- \#353 - Rename config.h.in to version.h.in
- \#374 - sdk_simple return empty result
- \#397 - sdk_simple return incorrect result
## Feature
- \#12 - Pure CPU version for Milvus
@ -22,10 +24,12 @@ Please mark all change in change log and use the ticket from JIRA.
- \#275 - Rename C++ SDK IndexType
- \#284 - Change C++ SDK to shared library
- \#260 - C++ SDK README
- \#266 - Rpc request source code refactor
- \#314 - add Find FAISS in CMake
- \#310 - Add Q&A for 'protocol https not supported or disable in libcurl' issue
- \#322 - Add option to enable / disable prometheus
- \#358 - Add more information in build.sh and install.md
- \#255 - Add ivfsq8 test report detailed version
## Task

View File

@ -2,7 +2,7 @@
String cron_timezone = "TZ=Asia/Shanghai"
String cron_string = BRANCH_NAME == "master" ? "H 0 * * * " : ""
cron_string = BRANCH_NAME == "0.5.1" ? "H 1 * * * " : cron_string
cron_string = BRANCH_NAME == "0.6.0" ? "H 1 * * * " : cron_string
pipeline {
agent none

View File

@ -70,7 +70,7 @@ if (MILVUS_VERSION_MAJOR STREQUAL ""
endif()
message(STATUS "Build version = ${MILVUS_VERSION}")
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/config.h.in ${CMAKE_CURRENT_SOURCE_DIR}/src/config.h @ONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/version.h.in ${CMAKE_CURRENT_SOURCE_DIR}/src/version.h @ONLY)
message(STATUS "Milvus version: "
"${MILVUS_VERSION_MAJOR}.${MILVUS_VERSION_MINOR}.${MILVUS_VERSION_PATCH} "

View File

@ -75,7 +75,13 @@ set(thirdparty_files
)
aux_source_directory(${MILVUS_ENGINE_SRC}/server server_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_server_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl/request grpc_request_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_impl_files)
set(grpc_server_files
${grpc_request_files}
${grpc_impl_files}
)
aux_source_directory(${MILVUS_ENGINE_SRC}/utils utils_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_files)

View File

@ -19,7 +19,7 @@
#include "db/Constants.h"
#include "db/engine/ExecutionEngine.h"
#include "src/config.h"
#include "src/version.h"
#include <map>
#include <memory>

View File

@ -1321,8 +1321,7 @@ class RowRecord :
void clear_vector_data();
float vector_data(int index) const;
void set_vector_data(int index, float value);
// void add_vector_data(float value);
void add_vector_data(std::vector<float>::const_iterator begin, std::vector<float>::const_iterator end);
void add_vector_data(float value);
const ::PROTOBUF_NAMESPACE_ID::RepeatedField< float >&
vector_data() const;
::PROTOBUF_NAMESPACE_ID::RepeatedField< float >*
@ -1474,9 +1473,7 @@ class InsertParam :
void clear_row_id_array();
::PROTOBUF_NAMESPACE_ID::int64 row_id_array(int index) const;
void set_row_id_array(int index, ::PROTOBUF_NAMESPACE_ID::int64 value);
// void add_row_id_array(::PROTOBUF_NAMESPACE_ID::int64 value);
void add_row_id_array(std::vector<::PROTOBUF_NAMESPACE_ID::int64>::const_iterator begin,
std::vector<::PROTOBUF_NAMESPACE_ID::int64>::const_iterator end);
void add_row_id_array(::PROTOBUF_NAMESPACE_ID::int64 value);
const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >&
row_id_array() const;
::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >*
@ -2139,9 +2136,7 @@ class TopKQueryResult :
void clear_ids();
::PROTOBUF_NAMESPACE_ID::int64 ids(int index) const;
void set_ids(int index, ::PROTOBUF_NAMESPACE_ID::int64 value);
// void add_ids(::PROTOBUF_NAMESPACE_ID::int64 value);
void add_ids(std::vector<::PROTOBUF_NAMESPACE_ID::int64>::const_iterator begin,
std::vector<::PROTOBUF_NAMESPACE_ID::int64>::const_iterator end);
void add_ids(::PROTOBUF_NAMESPACE_ID::int64 value);
const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >&
ids() const;
::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >*
@ -2152,8 +2147,7 @@ class TopKQueryResult :
void clear_distances();
float distances(int index) const;
void set_distances(int index, float value);
// void add_distances(float value);
void add_distances(std::vector<float>::const_iterator begin, std::vector<float>::const_iterator end);
void add_distances(float value);
const ::PROTOBUF_NAMESPACE_ID::RepeatedField< float >&
distances() const;
::PROTOBUF_NAMESPACE_ID::RepeatedField< float >*
@ -3928,14 +3922,9 @@ inline void RowRecord::set_vector_data(int index, float value) {
vector_data_.Set(index, value);
// @@protoc_insertion_point(field_set:milvus.grpc.RowRecord.vector_data)
}
//inline void RowRecord::add_vector_data(float value) {
// vector_data_.Add(value);
// // @@protoc_insertion_point(field_add:milvus.grpc.RowRecord.vector_data)
//}
inline void RowRecord::add_vector_data(std::vector<float>::const_iterator begin,
std::vector<float>::const_iterator end) {
vector_data_.Add(begin, end);
// @@protoc_insertion_point(field_add:milvus.grpc.RowRecord.vector_data)
inline void RowRecord::add_vector_data(float value) {
vector_data_.Add(value);
// @@protoc_insertion_point(field_add:milvus.grpc.RowRecord.vector_data)
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedField< float >&
RowRecord::vector_data() const {
@ -4048,14 +4037,9 @@ inline void InsertParam::set_row_id_array(int index, ::PROTOBUF_NAMESPACE_ID::in
row_id_array_.Set(index, value);
// @@protoc_insertion_point(field_set:milvus.grpc.InsertParam.row_id_array)
}
//inline void InsertParam::add_row_id_array(::PROTOBUF_NAMESPACE_ID::int64 value) {
// row_id_array_.Add(value);
// // @@protoc_insertion_point(field_add:milvus.grpc.InsertParam.row_id_array)
//}
inline void InsertParam::add_row_id_array(std::vector<::PROTOBUF_NAMESPACE_ID::int64>::const_iterator begin,
std::vector<::PROTOBUF_NAMESPACE_ID::int64>::const_iterator end) {
row_id_array_.Add(begin, end);
// @@protoc_insertion_point(field_add:milvus.grpc.InsertParam.row_id_array)
inline void InsertParam::add_row_id_array(::PROTOBUF_NAMESPACE_ID::int64 value) {
row_id_array_.Add(value);
// @@protoc_insertion_point(field_add:milvus.grpc.InsertParam.row_id_array)
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >&
InsertParam::row_id_array() const {
@ -4604,14 +4588,9 @@ inline void TopKQueryResult::set_ids(int index, ::PROTOBUF_NAMESPACE_ID::int64 v
ids_.Set(index, value);
// @@protoc_insertion_point(field_set:milvus.grpc.TopKQueryResult.ids)
}
//inline void TopKQueryResult::add_ids(::PROTOBUF_NAMESPACE_ID::int64 value) {
// ids_.Add(value);
// // @@protoc_insertion_point(field_add:milvus.grpc.TopKQueryResult.ids)
//}
inline void TopKQueryResult::add_ids(std::vector<::PROTOBUF_NAMESPACE_ID::int64>::const_iterator begin,
std::vector<::PROTOBUF_NAMESPACE_ID::int64>::const_iterator end) {
ids_.Add(begin,end);
// @@protoc_insertion_point(field_add:milvus.grpc.TopKQueryResult.ids)
inline void TopKQueryResult::add_ids(::PROTOBUF_NAMESPACE_ID::int64 value) {
ids_.Add(value);
// @@protoc_insertion_point(field_add:milvus.grpc.TopKQueryResult.ids)
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >&
TopKQueryResult::ids() const {
@ -4639,13 +4618,9 @@ inline void TopKQueryResult::set_distances(int index, float value) {
distances_.Set(index, value);
// @@protoc_insertion_point(field_set:milvus.grpc.TopKQueryResult.distances)
}
//inline void TopKQueryResult::add_distances(float value) {
// distances_.Add(value);
// // @@protoc_insertion_point(field_add:milvus.grpc.TopKQueryResult.distances)
//}
inline void TopKQueryResult::add_distances(std::vector<float>::const_iterator begin, std::vector<float>::const_iterator end) {
distances_.Add(begin, end);
// @@protoc_insertion_point(field_add:milvus.grpc.TopKQueryResult.distances)
inline void TopKQueryResult::add_distances(float value) {
distances_.Add(value);
// @@protoc_insertion_point(field_add:milvus.grpc.TopKQueryResult.distances)
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedField< float >&
TopKQueryResult::distances() const {

View File

@ -88,14 +88,14 @@ endif ()
include(ThirdPartyPackagesCore)
if (CMAKE_BUILD_TYPE STREQUAL "Release")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -fPIC -DELPP_THREAD_SAFE -fopenmp")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -fPIC -DELPP_THREAD_SAFE -fopenmp -mavx -mf16c -msse4 -mpopcnt")
if (KNOWHERE_GPU_VERSION)
set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -O3")
endif ()
else ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O0 -g -fPIC -DELPP_THREAD_SAFE -fopenmp")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -g -fPIC -DELPP_THREAD_SAFE -fopenmp -mavx -mf16c -msse4 -mpopcnt")
if (KNOWHERE_GPU_VERSION)
set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -O0 -g")
set(CUDA_NVCC_FLAGS "${CUDA_NVCC_FLAGS} -O3 -g")
endif ()
endif ()

View File

@ -18,6 +18,8 @@
#pragma once
#include <memory>
#include <sstream>
#include "Log.h"
namespace knowhere {
@ -50,6 +52,18 @@ struct Cfg {
CheckValid() {
return true;
}
void
Dump() {
KNOWHERE_LOG_DEBUG << DumpImpl().str();
}
virtual std::stringstream
DumpImpl() {
std::stringstream ss;
ss << "dim: " << d << ", metric: " << int(metric_type) << ", gpuid: " << gpu_id << ", k: " << k;
return ss;
}
};
using Config = std::shared_ptr<Cfg>;

View File

@ -34,4 +34,26 @@ GetMetricType(METRICTYPE& type) {
KNOWHERE_THROW_MSG("Metric type is invalid");
}
std::stringstream
IVFCfg::DumpImpl() {
auto ss = Cfg::DumpImpl();
ss << ", nlist: " << nlist << ", nprobe: " << nprobe;
return ss;
}
std::stringstream
IVFSQCfg::DumpImpl() {
auto ss = IVFCfg::DumpImpl();
ss << ", nbits: " << nbits;
return ss;
}
std::stringstream
NSGCfg::DumpImpl() {
auto ss = IVFCfg::DumpImpl();
ss << ", knng: " << knng << ", search_length: " << search_length << ", out_degree: " << out_degree
<< ", candidate: " << candidate_pool_size;
return ss;
}
} // namespace knowhere

View File

@ -53,6 +53,9 @@ struct IVFCfg : public Cfg {
IVFCfg() = default;
std::stringstream
DumpImpl() override;
bool
CheckValid() override {
return true;
@ -69,6 +72,9 @@ struct IVFSQCfg : public IVFCfg {
: IVFCfg(dim, k, gpu_id, nlist, nprobe, type), nbits(nbits) {
}
std::stringstream
DumpImpl() override;
IVFSQCfg() = default;
bool
@ -119,6 +125,9 @@ struct NSGCfg : public IVFCfg {
NSGCfg() = default;
std::stringstream
DumpImpl() override;
bool
CheckValid() override {
return true;

View File

@ -25,7 +25,7 @@
#include "easyloggingpp/easylogging++.h"
#include "metrics/Metrics.h"
#include "server/Server.h"
#include "src/config.h"
#include "src/version.h"
#include "utils/CommonUtil.h"
#include "utils/SignalUtil.h"

View File

@ -167,7 +167,7 @@ Utils::PrintSearchResult(const std::vector<std::pair<int64_t, milvus::RowRecord>
index++;
std::cout << "No." << index << " vector " << search_id << " top " << topk << " search result:" << std::endl;
for (size_t j = 0; j < topk; j++) {
size_t idx = i * nq + j;
size_t idx = i * topk + j;
std::cout << "\t" << topk_query_result.ids[idx] << "\t" << topk_query_result.distances[idx] << std::endl;
}
}

View File

@ -17,7 +17,7 @@
#include "sdk/grpc/ClientProxy.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "src/config.h"
#include "src/version.h"
#include <memory>
#include <string>
@ -32,6 +32,13 @@ UriCheck(const std::string& uri) {
return (index != std::string::npos);
}
void
CopyRowRecord(::milvus::grpc::RowRecord* target, const RowRecord& src) {
auto vector_data = target->mutable_vector_data();
vector_data->Resize(static_cast<int>(src.data.size()), 0.0);
memcpy(vector_data->mutable_data(), src.data.data(), src.data.size() * sizeof(float));
}
Status
ClientProxy::Connect(const ConnectParam& param) {
std::string uri = param.ip_address + ":" + param.port;
@ -201,14 +208,16 @@ ClientProxy::Insert(const std::string& table_name, const std::string& partition_
for (auto& record : record_array) {
::milvus::grpc::RowRecord* grpc_record = insert_param.add_row_record_array();
grpc_record->add_vector_data(record.data.begin(), record.data.end());
CopyRowRecord(grpc_record, record);
}
// Single thread
::milvus::grpc::VectorIds vector_ids;
if (!id_array.empty()) {
/* set user's ids */
insert_param.add_row_id_array(id_array.begin(), id_array.end());
auto row_ids = insert_param.mutable_row_id_array();
row_ids->Resize(static_cast<int>(id_array.size()), -1);
memcpy(row_ids->mutable_data(), id_array.data(), id_array.size() * sizeof(int64_t));
client_ptr_->Insert(vector_ids, insert_param, status);
} else {
client_ptr_->Insert(vector_ids, insert_param, status);
@ -238,7 +247,7 @@ ClientProxy::Search(const std::string& table_name, const std::vector<std::string
}
for (auto& record : query_record_array) {
::milvus::grpc::RowRecord* row_record = search_param.add_query_record_array();
row_record->add_vector_data(record.data.begin(), record.data.end());
CopyRowRecord(row_record, record);
}
// step 2: convert range array

View File

@ -25,7 +25,7 @@
#include "server/DBWrapper.h"
#include "server/Server.h"
#include "server/grpc_impl/GrpcServer.h"
#include "src/config.h"
#include "src/version.h"
#include "utils/Log.h"
#include "utils/LogUtil.h"
#include "utils/SignalUtil.h"

View File

@ -16,7 +16,24 @@
// under the License.
#include "server/grpc_impl/GrpcRequestHandler.h"
#include "server/grpc_impl/GrpcRequestTask.h"
#include "server/grpc_impl/GrpcRequestScheduler.h"
#include "server/grpc_impl/request/CmdRequest.h"
#include "server/grpc_impl/request/CountTableRequest.h"
#include "server/grpc_impl/request/CreateIndexRequest.h"
#include "server/grpc_impl/request/CreatePartitionRequest.h"
#include "server/grpc_impl/request/CreateTableRequest.h"
#include "server/grpc_impl/request/DeleteByDateRequest.h"
#include "server/grpc_impl/request/DescribeIndexRequest.h"
#include "server/grpc_impl/request/DescribeTableRequest.h"
#include "server/grpc_impl/request/DropIndexRequest.h"
#include "server/grpc_impl/request/DropPartitionRequest.h"
#include "server/grpc_impl/request/DropTableRequest.h"
#include "server/grpc_impl/request/HasTableRequest.h"
#include "server/grpc_impl/request/InsertRequest.h"
#include "server/grpc_impl/request/PreloadTableRequest.h"
#include "server/grpc_impl/request/SearchRequest.h"
#include "server/grpc_impl/request/ShowPartitionsRequest.h"
#include "server/grpc_impl/request/ShowTablesRequest.h"
#include "utils/TimeRecorder.h"
#include <vector>
@ -28,8 +45,8 @@ namespace grpc {
::grpc::Status
GrpcRequestHandler::CreateTable(::grpc::ServerContext* context, const ::milvus::grpc::TableSchema* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = CreateTableTask::Create(request);
GrpcRequestScheduler::ExecTask(task_ptr, response);
BaseRequestPtr request_ptr = CreateTableRequest::Create(request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
return ::grpc::Status::OK;
}
@ -37,9 +54,9 @@ GrpcRequestHandler::CreateTable(::grpc::ServerContext* context, const ::milvus::
GrpcRequestHandler::HasTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::BoolReply* response) {
bool has_table = false;
BaseTaskPtr task_ptr = HasTableTask::Create(request->table_name(), has_table);
BaseRequestPtr request_ptr = HasTableRequest::Create(request->table_name(), has_table);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_bool_reply(has_table);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
@ -49,25 +66,25 @@ GrpcRequestHandler::HasTable(::grpc::ServerContext* context, const ::milvus::grp
::grpc::Status
GrpcRequestHandler::DropTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = DropTableTask::Create(request->table_name());
GrpcRequestScheduler::ExecTask(task_ptr, response);
BaseRequestPtr request_ptr = DropTableRequest::Create(request->table_name());
GrpcRequestScheduler::ExecRequest(request_ptr, response);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::CreateIndex(::grpc::ServerContext* context, const ::milvus::grpc::IndexParam* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = CreateIndexTask::Create(request);
GrpcRequestScheduler::ExecTask(task_ptr, response);
BaseRequestPtr request_ptr = CreateIndexRequest::Create(request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::Insert(::grpc::ServerContext* context, const ::milvus::grpc::InsertParam* request,
::milvus::grpc::VectorIds* response) {
BaseTaskPtr task_ptr = InsertTask::Create(request, response);
BaseRequestPtr request_ptr = InsertRequest::Create(request, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
@ -77,9 +94,9 @@ GrpcRequestHandler::Insert(::grpc::ServerContext* context, const ::milvus::grpc:
GrpcRequestHandler::Search(::grpc::ServerContext* context, const ::milvus::grpc::SearchParam* request,
::milvus::grpc::TopKQueryResult* response) {
std::vector<std::string> file_id_array;
BaseTaskPtr task_ptr = SearchTask::Create(request, file_id_array, response);
BaseRequestPtr request_ptr = SearchRequest::Create(request, file_id_array, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
@ -93,9 +110,10 @@ GrpcRequestHandler::SearchInFiles(::grpc::ServerContext* context, const ::milvus
file_id_array.push_back(request->file_id_array(i));
}
::milvus::grpc::SearchInFilesParam* request_mutable = const_cast<::milvus::grpc::SearchInFilesParam*>(request);
BaseTaskPtr task_ptr = SearchTask::Create(request_mutable->mutable_search_param(), file_id_array, response);
BaseRequestPtr request_ptr =
SearchRequest::Create(request_mutable->mutable_search_param(), file_id_array, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
@ -104,9 +122,9 @@ GrpcRequestHandler::SearchInFiles(::grpc::ServerContext* context, const ::milvus
::grpc::Status
GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::TableSchema* response) {
BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), response);
BaseRequestPtr request_ptr = DescribeTableRequest::Create(request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
@ -116,9 +134,9 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context, const ::milvus
GrpcRequestHandler::CountTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::TableRowCount* response) {
int64_t row_count = 0;
BaseTaskPtr task_ptr = CountTableTask::Create(request->table_name(), row_count);
BaseRequestPtr request_ptr = CountTableRequest::Create(request->table_name(), row_count);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_table_row_count(row_count);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
@ -128,9 +146,9 @@ GrpcRequestHandler::CountTable(::grpc::ServerContext* context, const ::milvus::g
::grpc::Status
GrpcRequestHandler::ShowTables(::grpc::ServerContext* context, const ::milvus::grpc::Command* request,
::milvus::grpc::TableNameList* response) {
BaseTaskPtr task_ptr = ShowTablesTask::Create(response);
BaseRequestPtr request_ptr = ShowTablesRequest::Create(response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_error_code(grpc_status.error_code());
response->mutable_status()->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
@ -140,9 +158,9 @@ GrpcRequestHandler::ShowTables(::grpc::ServerContext* context, const ::milvus::g
GrpcRequestHandler::Cmd(::grpc::ServerContext* context, const ::milvus::grpc::Command* request,
::milvus::grpc::StringReply* response) {
std::string result;
BaseTaskPtr task_ptr = CmdTask::Create(request->cmd(), result);
BaseRequestPtr request_ptr = CmdRequest::Create(request->cmd(), result);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_string_reply(result);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
@ -152,9 +170,9 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext* context, const ::milvus::grpc::Co
::grpc::Status
GrpcRequestHandler::DeleteByDate(::grpc::ServerContext* context, const ::milvus::grpc::DeleteByDateParam* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = DeleteByDateTask::Create(request);
BaseRequestPtr request_ptr = DeleteByDateRequest::Create(request);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_error_code(grpc_status.error_code());
response->set_reason(grpc_status.reason());
return ::grpc::Status::OK;
@ -163,9 +181,9 @@ GrpcRequestHandler::DeleteByDate(::grpc::ServerContext* context, const ::milvus:
::grpc::Status
GrpcRequestHandler::PreloadTable(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = PreloadTableTask::Create(request->table_name());
BaseRequestPtr request_ptr = PreloadTableRequest::Create(request->table_name());
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
@ -174,9 +192,9 @@ GrpcRequestHandler::PreloadTable(::grpc::ServerContext* context, const ::milvus:
::grpc::Status
GrpcRequestHandler::DescribeIndex(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::IndexParam* response) {
BaseTaskPtr task_ptr = DescribeIndexTask::Create(request->table_name(), response);
BaseRequestPtr request_ptr = DescribeIndexRequest::Create(request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
@ -185,9 +203,9 @@ GrpcRequestHandler::DescribeIndex(::grpc::ServerContext* context, const ::milvus
::grpc::Status
GrpcRequestHandler::DropIndex(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = DropIndexTask::Create(request->table_name());
BaseRequestPtr request_ptr = DropIndexRequest::Create(request->table_name());
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
@ -196,17 +214,17 @@ GrpcRequestHandler::DropIndex(::grpc::ServerContext* context, const ::milvus::gr
::grpc::Status
GrpcRequestHandler::CreatePartition(::grpc::ServerContext* context, const ::milvus::grpc::PartitionParam* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = CreatePartitionTask::Create(request);
GrpcRequestScheduler::ExecTask(task_ptr, response);
BaseRequestPtr request_ptr = CreatePartitionRequest::Create(request);
GrpcRequestScheduler::ExecRequest(request_ptr, response);
return ::grpc::Status::OK;
}
::grpc::Status
GrpcRequestHandler::ShowPartitions(::grpc::ServerContext* context, const ::milvus::grpc::TableName* request,
::milvus::grpc::PartitionList* response) {
BaseTaskPtr task_ptr = ShowPartitionsTask::Create(request->table_name(), response);
BaseRequestPtr request_ptr = ShowPartitionsRequest::Create(request->table_name(), response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->mutable_status()->set_reason(grpc_status.reason());
response->mutable_status()->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;
@ -215,9 +233,9 @@ GrpcRequestHandler::ShowPartitions(::grpc::ServerContext* context, const ::milvu
::grpc::Status
GrpcRequestHandler::DropPartition(::grpc::ServerContext* context, const ::milvus::grpc::PartitionParam* request,
::milvus::grpc::Status* response) {
BaseTaskPtr task_ptr = DropPartitionTask::Create(request);
BaseRequestPtr request_ptr = DropPartitionRequest::Create(request);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
GrpcRequestScheduler::ExecRequest(request_ptr, &grpc_status);
response->set_reason(grpc_status.reason());
response->set_error_code(grpc_status.error_code());
return ::grpc::Status::OK;

View File

@ -70,43 +70,6 @@ ErrorMap(ErrorCode code) {
}
} // namespace
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GrpcBaseTask::GrpcBaseTask(const std::string& task_group, bool async)
: task_group_(task_group), async_(async), done_(false) {
}
GrpcBaseTask::~GrpcBaseTask() {
WaitToFinish();
}
Status
GrpcBaseTask::Execute() {
status_ = OnExecute();
Done();
return status_;
}
void
GrpcBaseTask::Done() {
done_ = true;
finish_cond_.notify_all();
}
Status
GrpcBaseTask::SetStatus(ErrorCode error_code, const std::string& error_msg) {
status_ = Status(error_code, error_msg);
SERVER_LOG_ERROR << error_msg;
return status_;
}
Status
GrpcBaseTask::WaitToFinish() {
std::unique_lock<std::mutex> lock(finish_mtx_);
finish_cond_.wait(lock, [this] { return done_; });
return status_;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GrpcRequestScheduler::GrpcRequestScheduler() : stopped_(false) {
Start();
@ -117,17 +80,17 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
}
void
GrpcRequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status) {
if (task_ptr == nullptr) {
GrpcRequestScheduler::ExecRequest(BaseRequestPtr& request_ptr, ::milvus::grpc::Status* grpc_status) {
if (request_ptr == nullptr) {
return;
}
GrpcRequestScheduler& scheduler = GrpcRequestScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
scheduler.ExecuteRequest(request_ptr);
if (!task_ptr->IsAsync()) {
task_ptr->WaitToFinish();
const Status& status = task_ptr->status();
if (!request_ptr->IsAsync()) {
request_ptr->WaitToFinish();
const Status& status = request_ptr->status();
if (!status.ok()) {
grpc_status->set_reason(status.message());
grpc_status->set_error_code(ErrorMap(status.code()));
@ -153,7 +116,7 @@ GrpcRequestScheduler::Stop() {
SERVER_LOG_INFO << "Scheduler gonna stop...";
{
std::lock_guard<std::mutex> lock(queue_mtx_);
for (auto iter : task_groups_) {
for (auto iter : request_groups_) {
if (iter.second != nullptr) {
iter.second->Put(nullptr);
}
@ -171,64 +134,64 @@ GrpcRequestScheduler::Stop() {
}
Status
GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) {
if (task_ptr == nullptr) {
GrpcRequestScheduler::ExecuteRequest(const BaseRequestPtr& request_ptr) {
if (request_ptr == nullptr) {
return Status::OK();
}
auto status = PutTaskToQueue(task_ptr);
auto status = PutToQueue(request_ptr);
if (!status.ok()) {
SERVER_LOG_ERROR << "Put task to queue failed with code: " << status.ToString();
SERVER_LOG_ERROR << "Put request to queue failed with code: " << status.ToString();
return status;
}
if (task_ptr->IsAsync()) {
if (request_ptr->IsAsync()) {
return Status::OK(); // async execution, caller need to call WaitToFinish at somewhere
}
return task_ptr->WaitToFinish(); // sync execution
return request_ptr->WaitToFinish(); // sync execution
}
void
GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
if (task_queue == nullptr) {
GrpcRequestScheduler::TakeToExecute(RequestQueuePtr request_queue) {
if (request_queue == nullptr) {
return;
}
while (true) {
BaseTaskPtr task = task_queue->Take();
if (task == nullptr) {
SERVER_LOG_ERROR << "Take null from task queue, stop thread";
BaseRequestPtr request = request_queue->Take();
if (request == nullptr) {
SERVER_LOG_ERROR << "Take null from request queue, stop thread";
break; // stop the thread
}
try {
auto status = task->Execute();
auto status = request->Execute();
if (!status.ok()) {
SERVER_LOG_ERROR << "Task failed with code: " << status.ToString();
SERVER_LOG_ERROR << "Request failed with code: " << status.ToString();
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "Task failed to execute: " << ex.what();
SERVER_LOG_ERROR << "Request failed to execute: " << ex.what();
}
}
}
Status
GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) {
GrpcRequestScheduler::PutToQueue(const BaseRequestPtr& request_ptr) {
std::lock_guard<std::mutex> lock(queue_mtx_);
std::string group_name = task_ptr->TaskGroup();
if (task_groups_.count(group_name) > 0) {
task_groups_[group_name]->Put(task_ptr);
std::string group_name = request_ptr->RequestGroup();
if (request_groups_.count(group_name) > 0) {
request_groups_[group_name]->Put(request_ptr);
} else {
TaskQueuePtr queue = std::make_shared<TaskQueue>();
queue->Put(task_ptr);
task_groups_.insert(std::make_pair(group_name, queue));
RequestQueuePtr queue = std::make_shared<RequestQueue>();
queue->Put(request_ptr);
request_groups_.insert(std::make_pair(group_name, queue));
// start a thread
ThreadPtr thread = std::make_shared<std::thread>(&GrpcRequestScheduler::TakeTaskToExecute, this, queue);
ThreadPtr thread = std::make_shared<std::thread>(&GrpcRequestScheduler::TakeToExecute, this, queue);
execute_threads_.push_back(thread);
SERVER_LOG_INFO << "Create new thread for task group: " << group_name;
SERVER_LOG_INFO << "Create new thread for request group: " << group_name;
}
return Status::OK();

View File

@ -19,6 +19,7 @@
#include "grpc/gen-status/status.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include "utils/BlockingQueue.h"
#include "utils/Status.h"
@ -32,57 +33,8 @@ namespace milvus {
namespace server {
namespace grpc {
class GrpcBaseTask {
protected:
explicit GrpcBaseTask(const std::string& task_group, bool async = false);
virtual ~GrpcBaseTask();
public:
Status
Execute();
void
Done();
Status
WaitToFinish();
std::string
TaskGroup() const {
return task_group_;
}
const Status&
status() const {
return status_;
}
bool
IsAsync() const {
return async_;
}
protected:
virtual Status
OnExecute() = 0;
Status
SetStatus(ErrorCode error_code, const std::string& error_msg);
protected:
mutable std::mutex finish_mtx_;
std::condition_variable finish_cond_;
std::string task_group_;
bool async_;
bool done_;
Status status_;
};
using BaseTaskPtr = std::shared_ptr<GrpcBaseTask>;
using TaskQueue = BlockingQueue<BaseTaskPtr>;
using TaskQueuePtr = std::shared_ptr<TaskQueue>;
using RequestQueue = BlockingQueue<BaseRequestPtr>;
using RequestQueuePtr = std::shared_ptr<RequestQueue>;
using ThreadPtr = std::shared_ptr<std::thread>;
class GrpcRequestScheduler {
@ -100,10 +52,10 @@ class GrpcRequestScheduler {
Stop();
Status
ExecuteTask(const BaseTaskPtr& task_ptr);
ExecuteRequest(const BaseRequestPtr& request_ptr);
static void
ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status);
ExecRequest(BaseRequestPtr& request_ptr, ::milvus::grpc::Status* grpc_status);
protected:
GrpcRequestScheduler();
@ -111,15 +63,15 @@ class GrpcRequestScheduler {
virtual ~GrpcRequestScheduler();
void
TakeTaskToExecute(TaskQueuePtr task_queue);
TakeToExecute(RequestQueuePtr request_queue);
Status
PutTaskToQueue(const BaseTaskPtr& task_ptr);
PutToQueue(const BaseRequestPtr& request_ptr);
private:
mutable std::mutex queue_mtx_;
std::map<std::string, TaskQueuePtr> task_groups_;
std::map<std::string, RequestQueuePtr> request_groups_;
std::vector<ThreadPtr> execute_threads_;

File diff suppressed because it is too large Load Diff

View File

@ -1,321 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "db/Types.h"
#include "server/grpc_impl/GrpcRequestScheduler.h"
#include "utils/Status.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include <condition_variable>
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace server {
namespace grpc {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CreateTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::TableSchema* schema);
protected:
explicit CreateTableTask(const ::milvus::grpc::TableSchema* schema);
Status
OnExecute() override;
private:
const ::milvus::grpc::TableSchema* schema_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class HasTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, bool& has_table);
protected:
HasTableTask(const std::string& table_name, bool& has_table);
Status
OnExecute() override;
private:
std::string table_name_;
bool& has_table_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DescribeTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, ::milvus::grpc::TableSchema* schema);
protected:
DescribeTableTask(const std::string& table_name, ::milvus::grpc::TableSchema* schema);
Status
OnExecute() override;
private:
std::string table_name_;
::milvus::grpc::TableSchema* schema_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DropTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name);
protected:
explicit DropTableTask(const std::string& table_name);
Status
OnExecute() override;
private:
std::string table_name_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CreateIndexTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::IndexParam* index_param);
protected:
explicit CreateIndexTask(const ::milvus::grpc::IndexParam* index_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::IndexParam* index_param_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class ShowTablesTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(::milvus::grpc::TableNameList* table_name_list);
protected:
explicit ShowTablesTask(::milvus::grpc::TableNameList* table_name_list);
Status
OnExecute() override;
private:
::milvus::grpc::TableNameList* table_name_list_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class InsertTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids);
protected:
InsertTask(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids);
Status
OnExecute() override;
private:
const ::milvus::grpc::InsertParam* insert_param_;
::milvus::grpc::VectorIds* record_ids_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class SearchTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::SearchParam* search_param, const std::vector<std::string>& file_id_array,
::milvus::grpc::TopKQueryResult* response);
protected:
SearchTask(const ::milvus::grpc::SearchParam* search_param, const std::vector<std::string>& file_id_array,
::milvus::grpc::TopKQueryResult* response);
Status
OnExecute() override;
private:
const ::milvus::grpc::SearchParam* search_param_;
std::vector<std::string> file_id_array_;
::milvus::grpc::TopKQueryResult* topk_result_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CountTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, int64_t& row_count);
protected:
CountTableTask(const std::string& table_name, int64_t& row_count);
Status
OnExecute() override;
private:
std::string table_name_;
int64_t& row_count_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CmdTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& cmd, std::string& result);
protected:
CmdTask(const std::string& cmd, std::string& result);
Status
OnExecute() override;
private:
std::string cmd_;
std::string& result_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DeleteByDateTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param);
protected:
explicit DeleteByDateTask(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::DeleteByDateParam* delete_by_range_param_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class PreloadTableTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name);
protected:
explicit PreloadTableTask(const std::string& table_name);
Status
OnExecute() override;
private:
std::string table_name_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DescribeIndexTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, ::milvus::grpc::IndexParam* index_param);
protected:
DescribeIndexTask(const std::string& table_name, ::milvus::grpc::IndexParam* index_param);
Status
OnExecute() override;
private:
std::string table_name_;
::milvus::grpc::IndexParam* index_param_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DropIndexTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name);
protected:
explicit DropIndexTask(const std::string& table_name);
Status
OnExecute() override;
private:
std::string table_name_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CreatePartitionTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::PartitionParam* partition_param);
protected:
explicit CreatePartitionTask(const ::milvus::grpc::PartitionParam* partition_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::PartitionParam* partition_param_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class ShowPartitionsTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list);
protected:
ShowPartitionsTask(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list);
Status
OnExecute() override;
private:
std::string table_name_;
::milvus::grpc::PartitionList* partition_list_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DropPartitionTask : public GrpcBaseTask {
public:
static BaseTaskPtr
Create(const ::milvus::grpc::PartitionParam* partition_param);
protected:
explicit DropPartitionTask(const ::milvus::grpc::PartitionParam* partition_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::PartitionParam* partition_param_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/CmdRequest.h"
#include "scheduler/SchedInst.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
CmdRequest::CmdRequest(const std::string& cmd, std::string& result)
: GrpcBaseRequest(INFO_REQUEST_GROUP), cmd_(cmd), result_(result) {
}
BaseRequestPtr
CmdRequest::Create(const std::string& cmd, std::string& result) {
return std::shared_ptr<GrpcBaseRequest>(new CmdRequest(cmd, result));
}
Status
CmdRequest::OnExecute() {
if (cmd_ == "version") {
result_ = MILVUS_VERSION;
} else if (cmd_ == "tasktable") {
result_ = scheduler::ResMgrInst::GetInstance()->DumpTaskTables();
} else {
result_ = "OK";
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class CmdRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& cmd, std::string& result);
protected:
CmdRequest(const std::string& cmd, std::string& result);
Status
OnExecute() override;
private:
std::string cmd_;
std::string& result_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/CountTableRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
CountTableRequest::CountTableRequest(const std::string& table_name, int64_t& row_count)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), row_count_(row_count) {
}
BaseRequestPtr
CountTableRequest::Create(const std::string& table_name, int64_t& row_count) {
return std::shared_ptr<GrpcBaseRequest>(new CountTableRequest(table_name, row_count));
}
Status
CountTableRequest::OnExecute() {
try {
TimeRecorder rc("CountTableRequest");
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
// step 2: get row count
uint64_t row_count = 0;
status = DBWrapper::DB()->GetTableRowCount(table_name_, row_count);
if (!status.ok()) {
if (status.code(), DB_NOT_FOUND) {
return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
} else {
return status;
}
}
row_count_ = static_cast<int64_t>(row_count);
rc.ElapseFromBegin("total cost");
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class CountTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, int64_t& row_count);
protected:
CountTableRequest(const std::string& table_name, int64_t& row_count);
Status
OnExecute() override;
private:
std::string table_name_;
int64_t& row_count_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/CreateIndexRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
#include <string>
namespace milvus {
namespace server {
namespace grpc {
CreateIndexRequest::CreateIndexRequest(const ::milvus::grpc::IndexParam* index_param)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), index_param_(index_param) {
}
BaseRequestPtr
CreateIndexRequest::Create(const ::milvus::grpc::IndexParam* index_param) {
if (index_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new CreateIndexRequest(index_param));
}
Status
CreateIndexRequest::OnExecute() {
try {
TimeRecorder rc("CreateIndexRequest");
// step 1: check arguments
std::string table_name_ = index_param_->table_name();
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
bool has_table = false;
status = DBWrapper::DB()->HasTable(table_name_, has_table);
if (!status.ok()) {
return status;
}
if (!has_table) {
return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
}
auto& grpc_index = index_param_->index();
status = ValidationUtil::ValidateTableIndexType(grpc_index.index_type());
if (!status.ok()) {
return status;
}
status = ValidationUtil::ValidateTableIndexNlist(grpc_index.nlist());
if (!status.ok()) {
return status;
}
// step 2: check table existence
engine::TableIndex index;
index.engine_type_ = grpc_index.index_type();
index.nlist_ = grpc_index.nlist();
status = DBWrapper::DB()->CreateIndex(table_name_, index);
if (!status.ok()) {
return status;
}
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
class CreateIndexRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::IndexParam* index_param);
protected:
explicit CreateIndexRequest(const ::milvus::grpc::IndexParam* index_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::IndexParam* index_param_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/CreatePartitionRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
CreatePartitionRequest::CreatePartitionRequest(const ::milvus::grpc::PartitionParam* partition_param)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), partition_param_(partition_param) {
}
BaseRequestPtr
CreatePartitionRequest::Create(const ::milvus::grpc::PartitionParam* partition_param) {
if (partition_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new CreatePartitionRequest(partition_param));
}
Status
CreatePartitionRequest::OnExecute() {
TimeRecorder rc("CreatePartitionRequest");
try {
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(partition_param_->table_name());
if (!status.ok()) {
return status;
}
status = ValidationUtil::ValidateTableName(partition_param_->partition_name());
if (!status.ok()) {
return status;
}
status = ValidationUtil::ValidatePartitionTags({partition_param_->tag()});
if (!status.ok()) {
return status;
}
// step 2: create partition
status = DBWrapper::DB()->CreatePartition(partition_param_->table_name(), partition_param_->partition_name(),
partition_param_->tag());
if (!status.ok()) {
// partition could exist
if (status.code() == DB_ALREADY_EXIST) {
return Status(SERVER_INVALID_TABLE_NAME, status.message());
}
return status;
}
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
rc.ElapseFromBegin("totally cost");
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
class CreatePartitionRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::PartitionParam* partition_param);
protected:
explicit CreatePartitionRequest(const ::milvus::grpc::PartitionParam* partition_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::PartitionParam* partition_param_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/CreateTableRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
CreateTableRequest::CreateTableRequest(const ::milvus::grpc::TableSchema* schema)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), schema_(schema) {
}
BaseRequestPtr
CreateTableRequest::Create(const ::milvus::grpc::TableSchema* schema) {
if (schema == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new CreateTableRequest(schema));
}
Status
CreateTableRequest::OnExecute() {
TimeRecorder rc("CreateTableRequest");
try {
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(schema_->table_name());
if (!status.ok()) {
return status;
}
status = ValidationUtil::ValidateTableDimension(schema_->dimension());
if (!status.ok()) {
return status;
}
status = ValidationUtil::ValidateTableIndexFileSize(schema_->index_file_size());
if (!status.ok()) {
return status;
}
status = ValidationUtil::ValidateTableIndexMetricType(schema_->metric_type());
if (!status.ok()) {
return status;
}
// step 2: construct table schema
engine::meta::TableSchema table_info;
table_info.table_id_ = schema_->table_name();
table_info.dimension_ = static_cast<uint16_t>(schema_->dimension());
table_info.index_file_size_ = schema_->index_file_size();
table_info.metric_type_ = schema_->metric_type();
// step 3: create table
status = DBWrapper::DB()->CreateTable(table_info);
if (!status.ok()) {
// table could exist
if (status.code() == DB_ALREADY_EXIST) {
return Status(SERVER_INVALID_TABLE_NAME, status.message());
}
return status;
}
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
rc.ElapseFromBegin("totally cost");
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
class CreateTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::TableSchema* schema);
protected:
explicit CreateTableRequest(const ::milvus::grpc::TableSchema* schema);
Status
OnExecute() override;
private:
const ::milvus::grpc::TableSchema* schema_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/DeleteByDateRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace server {
namespace grpc {
DeleteByDateRequest::DeleteByDateRequest(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), delete_by_range_param_(delete_by_range_param) {
}
BaseRequestPtr
DeleteByDateRequest::Create(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param) {
if (delete_by_range_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new DeleteByDateRequest(delete_by_range_param));
}
Status
DeleteByDateRequest::OnExecute() {
try {
TimeRecorder rc("DeleteByDateRequest");
// step 1: check arguments
std::string table_name = delete_by_range_param_->table_name();
auto status = ValidationUtil::ValidateTableName(table_name);
if (!status.ok()) {
return status;
}
// step 2: check table existence
engine::meta::TableSchema table_info;
table_info.table_id_ = table_name;
status = DBWrapper::DB()->DescribeTable(table_info);
if (!status.ok()) {
if (status.code(), DB_NOT_FOUND) {
return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name));
} else {
return status;
}
}
rc.ElapseFromBegin("check validation");
// step 3: check date range, and convert to db dates
std::vector<DB_DATE> dates;
ErrorCode error_code = SERVER_SUCCESS;
std::string error_msg;
std::vector<::milvus::grpc::Range> range_array;
range_array.emplace_back(delete_by_range_param_->range());
status = ConvertTimeRangeToDBDates(range_array, dates);
if (!status.ok()) {
return status;
}
#ifdef MILVUS_ENABLE_PROFILING
std::string fname = "/tmp/search_nq_" + this->delete_by_range_param_->table_name() + ".profiling";
ProfilerStart(fname.c_str());
#endif
status = DBWrapper::DB()->DropTable(table_name, dates);
if (!status.ok()) {
return status;
}
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
class DeleteByDateRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param);
protected:
explicit DeleteByDateRequest(const ::milvus::grpc::DeleteByDateParam* delete_by_range_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::DeleteByDateParam* delete_by_range_param_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/DescribeIndexRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
DescribeIndexRequest::DescribeIndexRequest(const std::string& table_name, ::milvus::grpc::IndexParam* index_param)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), index_param_(index_param) {
}
BaseRequestPtr
DescribeIndexRequest::Create(const std::string& table_name, ::milvus::grpc::IndexParam* index_param) {
return std::shared_ptr<GrpcBaseRequest>(new DescribeIndexRequest(table_name, index_param));
}
Status
DescribeIndexRequest::OnExecute() {
try {
TimeRecorder rc("DescribeIndexRequest");
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
// step 2: check table existence
engine::TableIndex index;
status = DBWrapper::DB()->DescribeIndex(table_name_, index);
if (!status.ok()) {
return status;
}
index_param_->set_table_name(table_name_);
index_param_->mutable_index()->set_index_type(index.engine_type_);
index_param_->mutable_index()->set_nlist(index.nlist_);
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class DescribeIndexRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, ::milvus::grpc::IndexParam* index_param);
protected:
DescribeIndexRequest(const std::string& table_name, ::milvus::grpc::IndexParam* index_param);
Status
OnExecute() override;
private:
std::string table_name_;
::milvus::grpc::IndexParam* index_param_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/DescribeTableRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
DescribeTableRequest::DescribeTableRequest(const std::string& table_name, ::milvus::grpc::TableSchema* schema)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), schema_(schema) {
}
BaseRequestPtr
DescribeTableRequest::Create(const std::string& table_name, ::milvus::grpc::TableSchema* schema) {
return std::shared_ptr<GrpcBaseRequest>(new DescribeTableRequest(table_name, schema));
}
Status
DescribeTableRequest::OnExecute() {
TimeRecorder rc("DescribeTableRequest");
try {
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
// step 2: get table info
engine::meta::TableSchema table_info;
table_info.table_id_ = table_name_;
status = DBWrapper::DB()->DescribeTable(table_info);
if (!status.ok()) {
return status;
}
schema_->set_table_name(table_info.table_id_);
schema_->set_dimension(table_info.dimension_);
schema_->set_index_file_size(table_info.index_file_size_);
schema_->set_metric_type(table_info.metric_type_);
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
rc.ElapseFromBegin("totally cost");
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class DescribeTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, ::milvus::grpc::TableSchema* schema);
protected:
DescribeTableRequest(const std::string& table_name, ::milvus::grpc::TableSchema* schema);
Status
OnExecute() override;
private:
std::string table_name_;
::milvus::grpc::TableSchema* schema_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/DropIndexRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
DropIndexRequest::DropIndexRequest(const std::string& table_name)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), table_name_(table_name) {
}
BaseRequestPtr
DropIndexRequest::Create(const std::string& table_name) {
return std::shared_ptr<GrpcBaseRequest>(new DropIndexRequest(table_name));
}
Status
DropIndexRequest::OnExecute() {
try {
TimeRecorder rc("DropIndexRequest");
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
bool has_table = false;
status = DBWrapper::DB()->HasTable(table_name_, has_table);
if (!status.ok()) {
return status;
}
if (!has_table) {
return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
}
// step 2: check table existence
status = DBWrapper::DB()->DropIndex(table_name_);
if (!status.ok()) {
return status;
}
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class DropIndexRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name);
protected:
explicit DropIndexRequest(const std::string& table_name);
Status
OnExecute() override;
private:
std::string table_name_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/DropPartitionRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
DropPartitionRequest::DropPartitionRequest(const ::milvus::grpc::PartitionParam* partition_param)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), partition_param_(partition_param) {
}
BaseRequestPtr
DropPartitionRequest::Create(const ::milvus::grpc::PartitionParam* partition_param) {
return std::shared_ptr<GrpcBaseRequest>(new DropPartitionRequest(partition_param));
}
Status
DropPartitionRequest::OnExecute() {
if (!partition_param_->partition_name().empty()) {
auto status = ValidationUtil::ValidateTableName(partition_param_->partition_name());
if (!status.ok()) {
return status;
}
return DBWrapper::DB()->DropPartition(partition_param_->partition_name());
} else {
auto status = ValidationUtil::ValidateTableName(partition_param_->table_name());
if (!status.ok()) {
return status;
}
status = ValidationUtil::ValidatePartitionTags({partition_param_->tag()});
if (!status.ok()) {
return status;
}
return DBWrapper::DB()->DropPartitionByTag(partition_param_->table_name(), partition_param_->tag());
}
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
class DropPartitionRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::PartitionParam* partition_param);
protected:
explicit DropPartitionRequest(const ::milvus::grpc::PartitionParam* partition_param);
Status
OnExecute() override;
private:
const ::milvus::grpc::PartitionParam* partition_param_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/DropTableRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
#include <vector>
namespace milvus {
namespace server {
namespace grpc {
DropTableRequest::DropTableRequest(const std::string& table_name)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), table_name_(table_name) {
}
BaseRequestPtr
DropTableRequest::Create(const std::string& table_name) {
return std::shared_ptr<GrpcBaseRequest>(new DropTableRequest(table_name));
}
Status
DropTableRequest::OnExecute() {
try {
TimeRecorder rc("DropTableRequest");
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
// step 2: check table existence
engine::meta::TableSchema table_info;
table_info.table_id_ = table_name_;
status = DBWrapper::DB()->DescribeTable(table_info);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
} else {
return status;
}
}
rc.ElapseFromBegin("check validation");
// step 3: Drop table
std::vector<DB_DATE> dates;
status = DBWrapper::DB()->DropTable(table_name_, dates);
if (!status.ok()) {
return status;
}
rc.ElapseFromBegin("total cost");
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class DropTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name);
protected:
explicit DropTableRequest(const std::string& table_name);
Status
OnExecute() override;
private:
std::string table_name_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include "utils/CommonUtil.h"
#include "utils/Log.h"
namespace milvus {
namespace server {
namespace grpc {
constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
Status
ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range>& range_array, std::vector<DB_DATE>& dates) {
dates.clear();
for (auto& range : range_array) {
time_t tt_start, tt_end;
tm tm_start, tm_end;
if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) {
return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
}
if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) {
return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
}
int64_t days = (tt_end - tt_start) / DAY_SECONDS;
if (days <= 0) {
return Status(SERVER_INVALID_TIME_RANGE,
"Invalid time range: The start-date should be smaller than end-date!");
}
// range: [start_day, end_day)
for (int64_t i = 0; i < days; i++) {
time_t tt_day = tt_start + DAY_SECONDS * i;
tm tm_day;
CommonUtil::ConvertTime(tt_day, tm_day);
int64_t date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + tm_day.tm_mday; // according to db logic
dates.push_back(date);
}
}
return Status::OK();
}
GrpcBaseRequest::GrpcBaseRequest(const std::string& request_group, bool async)
: request_group_(request_group), async_(async), done_(false) {
}
GrpcBaseRequest::~GrpcBaseRequest() {
WaitToFinish();
}
Status
GrpcBaseRequest::Execute() {
status_ = OnExecute();
Done();
return status_;
}
void
GrpcBaseRequest::Done() {
done_ = true;
finish_cond_.notify_all();
}
Status
GrpcBaseRequest::SetStatus(ErrorCode error_code, const std::string& error_msg) {
status_ = Status(error_code, error_msg);
SERVER_LOG_ERROR << error_msg;
return status_;
}
std::string
GrpcBaseRequest::TableNotExistMsg(const std::string& table_name) {
return "Table " + table_name +
" not exist. Use milvus.has_table to verify whether the table exists. You also can check if the table name "
"exists.";
}
Status
GrpcBaseRequest::WaitToFinish() {
std::unique_lock<std::mutex> lock(finish_mtx_);
finish_cond_.wait(lock, [this] { return done_; });
return status_;
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "db/meta/MetaTypes.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "grpc/gen-status/status.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
#include "utils/Status.h"
#include <condition_variable>
//#include <gperftools/profiler.h>
#include <memory>
#include <string>
#include <thread>
#include <vector>
namespace milvus {
namespace server {
namespace grpc {
static const char* DQL_REQUEST_GROUP = "dql";
static const char* DDL_DML_REQUEST_GROUP = "ddl_dml";
static const char* INFO_REQUEST_GROUP = "info";
using DB_DATE = milvus::engine::meta::DateT;
Status
ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range>& range_array, std::vector<DB_DATE>& dates);
class GrpcBaseRequest {
protected:
explicit GrpcBaseRequest(const std::string& request_group, bool async = false);
virtual ~GrpcBaseRequest();
public:
Status
Execute();
void
Done();
Status
WaitToFinish();
std::string
RequestGroup() const {
return request_group_;
}
const Status&
status() const {
return status_;
}
bool
IsAsync() const {
return async_;
}
protected:
virtual Status
OnExecute() = 0;
Status
SetStatus(ErrorCode error_code, const std::string& error_msg);
std::string
TableNotExistMsg(const std::string& table_name);
protected:
mutable std::mutex finish_mtx_;
std::condition_variable finish_cond_;
std::string request_group_;
bool async_;
bool done_;
Status status_;
};
using BaseRequestPtr = std::shared_ptr<GrpcBaseRequest>;
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/HasTableRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
HasTableRequest::HasTableRequest(const std::string& table_name, bool& has_table)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), has_table_(has_table) {
}
BaseRequestPtr
HasTableRequest::Create(const std::string& table_name, bool& has_table) {
return std::shared_ptr<GrpcBaseRequest>(new HasTableRequest(table_name, has_table));
}
Status
HasTableRequest::OnExecute() {
try {
TimeRecorder rc("HasTableRequest");
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
// step 2: check table existence
status = DBWrapper::DB()->HasTable(table_name_, has_table_);
if (!status.ok()) {
return status;
}
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class HasTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, bool& has_table);
protected:
HasTableRequest(const std::string& table_name, bool& has_table);
Status
OnExecute() override;
private:
std::string table_name_;
bool& has_table_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/InsertRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
#include <string>
#include <vector>
namespace milvus {
namespace server {
namespace grpc {
InsertRequest::InsertRequest(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids)
: GrpcBaseRequest(DDL_DML_REQUEST_GROUP), insert_param_(insert_param), record_ids_(record_ids) {
}
BaseRequestPtr
InsertRequest::Create(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids) {
if (insert_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new InsertRequest(insert_param, record_ids));
}
Status
InsertRequest::OnExecute() {
try {
TimeRecorder rc("InsertRequest");
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(insert_param_->table_name());
if (!status.ok()) {
return status;
}
if (insert_param_->row_record_array().empty()) {
return Status(SERVER_INVALID_ROWRECORD_ARRAY,
"The vector array is empty. Make sure you have entered vector records.");
}
if (!insert_param_->row_id_array().empty()) {
if (insert_param_->row_id_array().size() != insert_param_->row_record_array_size()) {
return Status(SERVER_ILLEGAL_VECTOR_ID,
"The size of vector ID array must be equal to the size of the vector.");
}
}
// step 2: check table existence
engine::meta::TableSchema table_info;
table_info.table_id_ = insert_param_->table_name();
status = DBWrapper::DB()->DescribeTable(table_info);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(insert_param_->table_name()));
} else {
return status;
}
}
// step 3: check table flag
// all user provide id, or all internal id
bool user_provide_ids = !insert_param_->row_id_array().empty();
// user already provided id before, all insert action require user id
if ((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) != 0 && !user_provide_ids) {
return Status(SERVER_ILLEGAL_VECTOR_ID,
"Table vector IDs are user-defined. Please provide IDs for all vectors of this table.");
}
// user didn't provided id before, no need to provide user id
if ((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) != 0 && user_provide_ids) {
return Status(
SERVER_ILLEGAL_VECTOR_ID,
"Table vector IDs are auto-generated. All vectors of this table must use auto-generated IDs.");
}
rc.RecordSection("check validation");
#ifdef MILVUS_ENABLE_PROFILING
std::string fname =
"/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size()) + ".profiling";
ProfilerStart(fname.c_str());
#endif
// step 4: prepare float data
std::vector<float> vec_f(insert_param_->row_record_array_size() * table_info.dimension_, 0);
// TODO(yk): change to one dimension array or use multiple-thread to copy the data
for (size_t i = 0; i < insert_param_->row_record_array_size(); i++) {
if (insert_param_->row_record_array(i).vector_data().empty()) {
return Status(SERVER_INVALID_ROWRECORD_ARRAY,
"The vector dimension must be equal to the table dimension.");
}
uint64_t vec_dim = insert_param_->row_record_array(i).vector_data().size();
if (vec_dim != table_info.dimension_) {
ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION;
std::string error_msg = "The vector dimension must be equal to the table dimension.";
return Status(error_code, error_msg);
}
memcpy(&vec_f[i * table_info.dimension_], insert_param_->row_record_array(i).vector_data().data(),
table_info.dimension_ * sizeof(float));
}
rc.ElapseFromBegin("prepare vectors data");
// step 5: insert vectors
auto vec_count = static_cast<uint64_t>(insert_param_->row_record_array_size());
std::vector<int64_t> vec_ids(insert_param_->row_id_array_size(), 0);
if (!insert_param_->row_id_array().empty()) {
const int64_t* src_data = insert_param_->row_id_array().data();
int64_t* target_data = vec_ids.data();
memcpy(target_data, src_data, static_cast<size_t>(sizeof(int64_t) * insert_param_->row_id_array_size()));
}
status = DBWrapper::DB()->InsertVectors(insert_param_->table_name(), insert_param_->partition_tag(), vec_count,
vec_f.data(), vec_ids);
rc.ElapseFromBegin("add vectors to engine");
if (!status.ok()) {
return status;
}
for (int64_t id : vec_ids) {
record_ids_->add_vector_id_array(id);
}
auto ids_size = record_ids_->vector_id_array_size();
if (ids_size != vec_count) {
std::string msg =
"Add " + std::to_string(vec_count) + " vectors but only return " + std::to_string(ids_size) + " id";
return Status(SERVER_ILLEGAL_VECTOR_ID, msg);
}
// step 6: update table flag
user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID
: table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID;
status = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_);
#ifdef MILVUS_ENABLE_PROFILING
ProfilerStop();
#endif
rc.RecordSection("add vectors to engine");
rc.ElapseFromBegin("total cost");
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
class InsertRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids);
protected:
InsertRequest(const ::milvus::grpc::InsertParam* insert_param, ::milvus::grpc::VectorIds* record_ids);
Status
OnExecute() override;
private:
const ::milvus::grpc::InsertParam* insert_param_;
::milvus::grpc::VectorIds* record_ids_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/PreloadTableRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
PreloadTableRequest::PreloadTableRequest(const std::string& table_name)
: GrpcBaseRequest(DQL_REQUEST_GROUP), table_name_(table_name) {
}
BaseRequestPtr
PreloadTableRequest::Create(const std::string& table_name) {
return std::shared_ptr<GrpcBaseRequest>(new PreloadTableRequest(table_name));
}
Status
PreloadTableRequest::OnExecute() {
try {
TimeRecorder rc("PreloadTableRequest");
// step 1: check arguments
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
// step 2: check table existence
status = DBWrapper::DB()->PreloadTable(table_name_);
if (!status.ok()) {
return status;
}
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class PreloadTableRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name);
protected:
explicit PreloadTableRequest(const std::string& table_name);
Status
OnExecute() override;
private:
std::string table_name_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,188 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/SearchRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
namespace milvus {
namespace server {
namespace grpc {
SearchRequest::SearchRequest(const ::milvus::grpc::SearchParam* search_vector_infos,
const std::vector<std::string>& file_id_array, ::milvus::grpc::TopKQueryResult* response)
: GrpcBaseRequest(DQL_REQUEST_GROUP),
search_param_(search_vector_infos),
file_id_array_(file_id_array),
topk_result_(response) {
}
BaseRequestPtr
SearchRequest::Create(const ::milvus::grpc::SearchParam* search_vector_infos,
const std::vector<std::string>& file_id_array, ::milvus::grpc::TopKQueryResult* response) {
if (search_vector_infos == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseRequest>(new SearchRequest(search_vector_infos, file_id_array, response));
}
Status
SearchRequest::OnExecute() {
try {
int64_t top_k = search_param_->topk();
int64_t nprobe = search_param_->nprobe();
std::string hdr = "SearchRequest(k=" + std::to_string(top_k) + ", nprob=" + std::to_string(nprobe) + ")";
TimeRecorder rc(hdr);
// step 1: check table name
std::string table_name_ = search_param_->table_name();
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
// step 2: check table existence
engine::meta::TableSchema table_info;
table_info.table_id_ = table_name_;
status = DBWrapper::DB()->DescribeTable(table_info);
if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_TABLE_NOT_EXIST, TableNotExistMsg(table_name_));
} else {
return status;
}
}
// step 3: check search parameter
status = ValidationUtil::ValidateSearchTopk(top_k, table_info);
if (!status.ok()) {
return status;
}
status = ValidationUtil::ValidateSearchNprobe(nprobe, table_info);
if (!status.ok()) {
return status;
}
if (search_param_->query_record_array().empty()) {
return Status(SERVER_INVALID_ROWRECORD_ARRAY,
"The vector array is empty. Make sure you have entered vector records.");
}
// step 4: check date range, and convert to db dates
std::vector<DB_DATE> dates;
std::vector<::milvus::grpc::Range> range_array;
for (size_t i = 0; i < search_param_->query_range_array_size(); i++) {
range_array.emplace_back(search_param_->query_range_array(i));
}
status = ConvertTimeRangeToDBDates(range_array, dates);
if (!status.ok()) {
return status;
}
rc.RecordSection("check validation");
// step 5: prepare float data
auto record_array_size = search_param_->query_record_array_size();
std::vector<float> vec_f(record_array_size * table_info.dimension_, 0);
for (size_t i = 0; i < record_array_size; i++) {
if (search_param_->query_record_array(i).vector_data().empty()) {
return Status(SERVER_INVALID_ROWRECORD_ARRAY,
"The vector dimension must be equal to the table dimension.");
}
uint64_t query_vec_dim = search_param_->query_record_array(i).vector_data().size();
if (query_vec_dim != table_info.dimension_) {
ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION;
std::string error_msg = "The vector dimension must be equal to the table dimension.";
return Status(error_code, error_msg);
}
memcpy(&vec_f[i * table_info.dimension_], search_param_->query_record_array(i).vector_data().data(),
table_info.dimension_ * sizeof(float));
}
rc.RecordSection("prepare vector data");
// step 6: search vectors
engine::ResultIds result_ids;
engine::ResultDistances result_distances;
auto record_count = (uint64_t)search_param_->query_record_array().size();
#ifdef MILVUS_ENABLE_PROFILING
std::string fname =
"/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size()) + ".profiling";
ProfilerStart(fname.c_str());
#endif
if (file_id_array_.empty()) {
std::vector<std::string> partition_tags;
for (size_t i = 0; i < search_param_->partition_tag_array_size(); i++) {
partition_tags.emplace_back(search_param_->partition_tag_array(i));
}
status = ValidationUtil::ValidatePartitionTags(partition_tags);
if (!status.ok()) {
return status;
}
status = DBWrapper::DB()->Query(table_name_, partition_tags, (size_t)top_k, record_count, nprobe,
vec_f.data(), dates, result_ids, result_distances);
} else {
status = DBWrapper::DB()->QueryByFileID(table_name_, file_id_array_, (size_t)top_k, record_count, nprobe,
vec_f.data(), dates, result_ids, result_distances);
}
#ifdef MILVUS_ENABLE_PROFILING
ProfilerStop();
#endif
rc.RecordSection("search vectors from engine");
if (!status.ok()) {
return status;
}
if (result_ids.empty()) {
return Status::OK(); // empty table
}
// step 7: construct result array
topk_result_->set_row_num(record_count);
topk_result_->mutable_ids()->Resize(static_cast<int>(result_ids.size()), -1);
memcpy(topk_result_->mutable_ids()->mutable_data(), result_ids.data(), result_ids.size() * sizeof(int64_t));
topk_result_->mutable_distances()->Resize(static_cast<int>(result_distances.size()), 0.0);
memcpy(topk_result_->mutable_distances()->mutable_data(), result_distances.data(),
result_distances.size() * sizeof(float));
// step 8: print time cost percent
rc.RecordSection("construct result and send");
rc.ElapseFromBegin("totally cost");
} catch (std::exception& ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what());
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
#include <vector>
namespace milvus {
namespace server {
namespace grpc {
class SearchRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const ::milvus::grpc::SearchParam* search_param, const std::vector<std::string>& file_id_array,
::milvus::grpc::TopKQueryResult* response);
protected:
SearchRequest(const ::milvus::grpc::SearchParam* search_param, const std::vector<std::string>& file_id_array,
::milvus::grpc::TopKQueryResult* response);
Status
OnExecute() override;
private:
const ::milvus::grpc::SearchParam* search_param_;
std::vector<std::string> file_id_array_;
::milvus::grpc::TopKQueryResult* topk_result_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/ShowPartitionsRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include <memory>
#include <vector>
namespace milvus {
namespace server {
namespace grpc {
ShowPartitionsRequest::ShowPartitionsRequest(const std::string& table_name,
::milvus::grpc::PartitionList* partition_list)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_(table_name), partition_list_(partition_list) {
}
BaseRequestPtr
ShowPartitionsRequest::Create(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list) {
return std::shared_ptr<GrpcBaseRequest>(new ShowPartitionsRequest(table_name, partition_list));
}
Status
ShowPartitionsRequest::OnExecute() {
auto status = ValidationUtil::ValidateTableName(table_name_);
if (!status.ok()) {
return status;
}
std::vector<engine::meta::TableSchema> schema_array;
auto statuts = DBWrapper::DB()->ShowPartitions(table_name_, schema_array);
if (!statuts.ok()) {
return statuts;
}
for (auto& schema : schema_array) {
::milvus::grpc::PartitionParam* param = partition_list_->add_partition_array();
param->set_table_name(schema.owner_table_);
param->set_partition_name(schema.table_id_);
param->set_tag(schema.partition_tag_);
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include <string>
namespace milvus {
namespace server {
namespace grpc {
class ShowPartitionsRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list);
protected:
ShowPartitionsRequest(const std::string& table_name, ::milvus::grpc::PartitionList* partition_list);
Status
OnExecute() override;
private:
std::string table_name_;
::milvus::grpc::PartitionList* partition_list_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/grpc_impl/request/ShowTablesRequest.h"
#include "server/DBWrapper.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <memory>
#include <vector>
namespace milvus {
namespace server {
namespace grpc {
ShowTablesRequest::ShowTablesRequest(::milvus::grpc::TableNameList* table_name_list)
: GrpcBaseRequest(INFO_REQUEST_GROUP), table_name_list_(table_name_list) {
}
BaseRequestPtr
ShowTablesRequest::Create(::milvus::grpc::TableNameList* table_name_list) {
return std::shared_ptr<GrpcBaseRequest>(new ShowTablesRequest(table_name_list));
}
Status
ShowTablesRequest::OnExecute() {
std::vector<engine::meta::TableSchema> schema_array;
auto statuts = DBWrapper::DB()->AllTables(schema_array);
if (!statuts.ok()) {
return statuts;
}
for (auto& schema : schema_array) {
table_name_list_->add_table_names(schema.table_id_);
}
return Status::OK();
}
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "server/grpc_impl/request/GrpcBaseRequest.h"
namespace milvus {
namespace server {
namespace grpc {
class ShowTablesRequest : public GrpcBaseRequest {
public:
static BaseRequestPtr
Create(::milvus::grpc::TableNameList* table_name_list);
protected:
explicit ShowTablesRequest(::milvus::grpc::TableNameList* table_name_list);
Status
OnExecute() override;
private:
::milvus::grpc::TableNameList* table_name_list_;
};
} // namespace grpc
} // namespace server
} // namespace milvus

View File

@ -47,7 +47,8 @@ ConfAdapter::Match(const TempMetaConf& metaconf) {
auto conf = std::make_shared<knowhere::Cfg>();
conf->d = metaconf.dim;
conf->metric_type = metaconf.metric_type;
conf->gpu_id = conf->gpu_id;
conf->gpu_id = metaconf.gpu_id;
conf->k = metaconf.k;
MatchBase(conf);
return conf;
}
@ -65,7 +66,7 @@ IVFConfAdapter::Match(const TempMetaConf& metaconf) {
conf->nlist = MatchNlist(metaconf.size, metaconf.nlist);
conf->d = metaconf.dim;
conf->metric_type = metaconf.metric_type;
conf->gpu_id = conf->gpu_id;
conf->gpu_id = metaconf.gpu_id;
MatchBase(conf);
return conf;
}
@ -114,7 +115,7 @@ IVFSQConfAdapter::Match(const TempMetaConf& metaconf) {
conf->nlist = MatchNlist(metaconf.size, metaconf.nlist);
conf->d = metaconf.dim;
conf->metric_type = metaconf.metric_type;
conf->gpu_id = conf->gpu_id;
conf->gpu_id = metaconf.gpu_id;
conf->nbits = 8;
MatchBase(conf);
return conf;
@ -126,7 +127,7 @@ IVFPQConfAdapter::Match(const TempMetaConf& metaconf) {
conf->nlist = MatchNlist(metaconf.size, metaconf.nlist);
conf->d = metaconf.dim;
conf->metric_type = metaconf.metric_type;
conf->gpu_id = conf->gpu_id;
conf->gpu_id = metaconf.gpu_id;
conf->nbits = 8;
if (!(conf->d % 4))
@ -175,21 +176,17 @@ NSGConfAdapter::Match(const TempMetaConf& metaconf) {
conf->nlist = MatchNlist(metaconf.size, metaconf.nlist);
conf->d = metaconf.dim;
conf->metric_type = metaconf.metric_type;
conf->gpu_id = conf->gpu_id;
conf->gpu_id = metaconf.gpu_id;
conf->k = metaconf.k;
double factor = metaconf.size / TYPICAL_COUNT;
auto scale_factor = round(metaconf.dim / 128.0);
scale_factor = scale_factor >= 4 ? 4 : scale_factor;
conf->nprobe = conf->nlist > 10000 ? conf->nlist * 0.02 : conf->nlist * 0.1;
conf->knng = (100 + 100 * scale_factor) * factor;
conf->search_length = (40 + 5 * scale_factor) * factor;
conf->out_degree = (50 + 5 * scale_factor) * factor;
conf->candidate_pool_size = (200 + 100 * scale_factor) * factor;
conf->nprobe = int64_t(conf->nlist * 0.01);
conf->knng = 40 + 10 * scale_factor; // the size of knng
conf->search_length = 40 + 5 * scale_factor;
conf->out_degree = 50 + 5 * scale_factor;
conf->candidate_pool_size = 200 + 100 * scale_factor;
MatchBase(conf);
// WRAPPER_LOG_DEBUG << "nlist: " << conf->nlist
// << ", gpu_id: " << conf->gpu_id << ", d: " << conf->d
// << ", nprobe: " << conf->nprobe << ", knng: " << conf->knng;
return conf;
}

View File

@ -46,9 +46,6 @@ class ConfAdapter {
virtual knowhere::Config
MatchSearch(const TempMetaConf& metaconf, const IndexType& type);
// virtual void
// Dump(){}
protected:
static void
MatchBase(knowhere::Config conf);

View File

@ -66,7 +66,13 @@ set(thirdparty_files
)
aux_source_directory(${MILVUS_ENGINE_SRC}/server server_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_server_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl/request grpc_request_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_impl_files)
set(grpc_server_files
${grpc_request_files}
${grpc_impl_files}
)
aux_source_directory(${MILVUS_ENGINE_SRC}/utils utils_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_files)

View File

@ -155,12 +155,10 @@ DBTest::SetUp() {
res_mgr->Clear();
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("disk", "DISK", 0, true, false));
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("cpu", "CPU", 0, true, true));
res_mgr->Add(milvus::scheduler::ResourceFactory::Create("gtx1660", "GPU", 0, true, true));
auto default_conn = milvus::scheduler::Connection("IO", 500.0);
auto PCIE = milvus::scheduler::Connection("IO", 11000.0);
res_mgr->Connect("disk", "cpu", default_conn);
res_mgr->Connect("cpu", "gtx1660", PCIE);
res_mgr->Start();
milvus::scheduler::SchedInst::GetInstance()->Start();

View File

@ -22,8 +22,8 @@
#include "server/Server.h"
#include "server/grpc_impl/GrpcRequestHandler.h"
#include "server/grpc_impl/GrpcRequestScheduler.h"
#include "server/grpc_impl/GrpcRequestTask.h"
#include "src/config.h"
#include "server/grpc_impl/request/GrpcBaseRequest.h"
#include "src/version.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "grpc/gen-status/status.pb.h"
@ -453,20 +453,20 @@ TEST_F(RpcHandlerTest, DELETE_BY_RANGE_TEST) {
//////////////////////////////////////////////////////////////////////
namespace {
class DummyTask : public milvus::server::grpc::GrpcBaseTask {
class DummyRequest : public milvus::server::grpc::GrpcBaseRequest {
public:
milvus::Status
OnExecute() override {
return milvus::Status::OK();
}
static milvus::server::grpc::BaseTaskPtr
static milvus::server::grpc::BaseRequestPtr
Create(std::string& dummy) {
return std::shared_ptr<milvus::server::grpc::GrpcBaseTask>(new DummyTask(dummy));
return std::shared_ptr<milvus::server::grpc::GrpcBaseRequest>(new DummyRequest(dummy));
}
public:
explicit DummyTask(std::string& dummy) : GrpcBaseTask(dummy) {
explicit DummyRequest(std::string& dummy) : GrpcBaseRequest(dummy) {
}
};
@ -475,27 +475,27 @@ class RpcSchedulerTest : public testing::Test {
void
SetUp() override {
std::string dummy = "dql";
task_ptr = std::make_shared<DummyTask>(dummy);
request_ptr = std::make_shared<DummyRequest>(dummy);
}
std::shared_ptr<DummyTask> task_ptr;
std::shared_ptr<DummyRequest> request_ptr;
};
} // namespace
TEST_F(RpcSchedulerTest, BASE_TASK_TEST) {
auto status = task_ptr->Execute();
auto status = request_ptr->Execute();
ASSERT_TRUE(status.ok());
milvus::server::grpc::GrpcRequestScheduler::GetInstance().Start();
::milvus::grpc::Status grpc_status;
std::string dummy = "dql";
milvus::server::grpc::BaseTaskPtr base_task_ptr = DummyTask::Create(dummy);
milvus::server::grpc::GrpcRequestScheduler::GetInstance().ExecTask(base_task_ptr, &grpc_status);
milvus::server::grpc::BaseRequestPtr base_task_ptr = DummyRequest::Create(dummy);
milvus::server::grpc::GrpcRequestScheduler::GetInstance().ExecRequest(base_task_ptr, &grpc_status);
milvus::server::grpc::GrpcRequestScheduler::GetInstance().ExecuteTask(task_ptr);
task_ptr = nullptr;
milvus::server::grpc::GrpcRequestScheduler::GetInstance().ExecuteTask(task_ptr);
milvus::server::grpc::GrpcRequestScheduler::GetInstance().ExecuteRequest(request_ptr);
request_ptr = nullptr;
milvus::server::grpc::GrpcRequestScheduler::GetInstance().ExecuteRequest(request_ptr);
milvus::server::grpc::GrpcRequestScheduler::GetInstance().Stop();
}

View File

@ -56,10 +56,6 @@ class KnowhereWrapperTest
index_ = GetVecIndexFactory(index_type);
conf = ParamGenerator::GetInstance().GenBuild(index_type, tempconf);
searchconf = ParamGenerator::GetInstance().GenSearchConf(index_type, tempconf);
// conf->k = k;
// conf->d = dim;
// conf->gpu_id = DEVICEID;
}
void TearDown() override {
@ -97,6 +93,7 @@ INSTANTIATE_TEST_CASE_P(WrapperParam, KnowhereWrapperTest,
TEST_P(KnowhereWrapperTest, BASE_TEST) {
EXPECT_EQ(index_->GetType(), index_type);
// conf->Dump();
auto elems = nq * k;
std::vector<int64_t> res_ids(elems);
@ -191,3 +188,119 @@ TEST(whatever, test_config) {
auto pq_conf = std::make_shared<milvus::engine::IVFPQConfAdapter>();
pq_conf->Match(conf);
}
// #include "knowhere/index/vector_index/IndexIDMAP.h"
// #include "src/wrapper/VecImpl.h"
// #include "src/index/unittest/utils.h"
// The two case below prove NSG is concern with data distribution
// Further work: 1. Use right basedata and pass it by milvus
// a. batch size is 100000 [Pass]
// b. transfer all at once [Pass]
// 2. Use SIFT1M in test and check time cost []
// TEST_P(KnowhereWrapperTest, nsgwithidmap) {
// auto idmap = GetVecIndexFactory(milvus::engine::IndexType::FAISS_IDMAP);
// auto ori_xb = xb;
// auto ori_ids = ids;
// std::vector<float> temp_xb;
// std::vector<int64_t> temp_ids;
// nb = 50000;
// for (int i = 0; i < 20; ++i) {
// GenData(dim, nb, nq, xb, xq, ids, k, gt_ids, gt_dis);
// assert(xb.size() == nb*dim);
// //#define IDMAP
// #ifdef IDMAP
// temp_xb.insert(temp_xb.end(), xb.data(), xb.data() + nb*dim);
// temp_ids.insert(temp_ids.end(), ori_ids.data()+nb*i, ori_ids.data() + nb*(i+1));
// if (i == 0) {
// idmap->BuildAll(nb, temp_xb.data(), temp_ids.data(), conf);
// } else {
// idmap->Add(nb, temp_xb.data(), temp_ids.data());
// }
// temp_xb.clear();
// temp_ids.clear();
// #else
// temp_xb.insert(temp_xb.end(), xb.data(), xb.data() + nb*dim);
// temp_ids.insert(temp_ids.end(), ori_ids.data()+nb*i, ori_ids.data() + nb*(i+1));
// #endif
// }
// #ifdef IDMAP
// auto idmap_idx = std::dynamic_pointer_cast<milvus::engine::BFIndex>(idmap);
// auto x = idmap_idx->Count();
// index_->BuildAll(idmap_idx->Count(), idmap_idx->GetRawVectors(), idmap_idx->GetRawIds(), conf);
// #else
// assert(temp_xb.size() == 1000000*128);
// index_->BuildAll(1000000, temp_xb.data(), ori_ids.data(), conf);
// #endif
// }
// TEST_P(KnowhereWrapperTest, nsgwithsidmap) {
// auto idmap = GetVecIndexFactory(milvus::engine::IndexType::FAISS_IDMAP);
// auto ori_xb = xb;
// std::vector<float> temp_xb;
// std::vector<int64_t> temp_ids;
// nb = 50000;
// for (int i = 0; i < 20; ++i) {
// #define IDMAP
// #ifdef IDMAP
// temp_xb.insert(temp_xb.end(), ori_xb.data()+nb*dim*i, ori_xb.data() + nb*dim*(i+1));
// temp_ids.insert(temp_ids.end(), ids.data()+nb*i, ids.data() + nb*(i+1));
// if (i == 0) {
// idmap->BuildAll(nb, temp_xb.data(), temp_ids.data(), conf);
// } else {
// idmap->Add(nb, temp_xb.data(), temp_ids.data());
// }
// temp_xb.clear();
// temp_ids.clear();
// #else
// temp_xb.insert(temp_xb.end(), ori_xb.data()+nb*dim*i, ori_xb.data() + nb*dim*(i+1));
// temp_ids.insert(temp_ids.end(), ids.data()+nb*i, ids.data() + nb*(i+1));
// #endif
// }
// #ifdef IDMAP
// auto idmap_idx = std::dynamic_pointer_cast<milvus::engine::BFIndex>(idmap);
// auto x = idmap_idx->Count();
// index_->BuildAll(idmap_idx->Count(), idmap_idx->GetRawVectors(), idmap_idx->GetRawIds(), conf);
// #else
// index_->BuildAll(1000000, temp_xb.data(), temp_ids.data(), conf);
// #endif
// // The code use to store raw base data
// FileIOWriter writer("/tmp/newraw");
// ori_xb.shrink_to_fit();
// std::cout << "size" << ori_xb.size();
// writer(static_cast<void*>(ori_xb.data()), ori_xb.size()* sizeof(float));
// std::cout << "Finish!" << std::endl;
// }
// void load_data(char* filename, float*& data, unsigned& num,
// unsigned& dim) { // load data with sift10K pattern
// std::ifstream in(filename, std::ios::binary);
// if (!in.is_open()) {
// std::cout << "open file error" << std::endl;
// exit(-1);
// }
// in.read((char*)&dim, 4);
// in.seekg(0, std::ios::end);
// std::ios::pos_type ss = in.tellg();
// size_t fsize = (size_t)ss;
// num = (unsigned)(fsize / (dim + 1) / 4);
// data = new float[(size_t)num * (size_t)dim];
// in.seekg(0, std::ios::beg);
// for (size_t i = 0; i < num; i++) {
// in.seekg(4, std::ios::cur);
// in.read((char*)(data + i * dim), dim * 4);
// }
// in.close();
// }
// TEST_P(KnowhereWrapperTest, Sift1M) {
// float* data = nullptr;
// unsigned points_num, dim;
// load_data("/mnt/112d53a6-5592-4360-a33b-7fd789456fce/workspace/Data/sift/sift_base.fvecs", data, points_num,
// dim); std::cout << points_num << " " << dim << std::endl;
// index_->BuildAll(points_num, data, ids.data(), conf);
// }

View File

@ -117,6 +117,11 @@ void
DataGenBase::GenData(const int& dim, const int& nb, const int& nq, std::vector<float>& xb, std::vector<float>& xq,
std::vector<int64_t>& ids, const int& k, std::vector<int64_t>& gt_ids,
std::vector<float>& gt_dis) {
xb.clear();
xq.clear();
ids.clear();
gt_ids.clear();
gt_dis.clear();
xb.resize(nb * dim);
xq.resize(nq * dim);
ids.resize(nb);

View File

@ -25,7 +25,6 @@
#include <cstdio>
#include <fstream>
#include "wrapper/VecIndex.h"
#include "wrapper/utils.h"
#include "knowhere/index/vector_index/helpers/IndexParameter.h"
@ -90,17 +89,20 @@ class ParamGenerator {
return instance;
}
knowhere::Config GenSearchConf(const milvus::engine::IndexType& type, const milvus::engine::TempMetaConf& conf) {
knowhere::Config
GenSearchConf(const milvus::engine::IndexType& type, const milvus::engine::TempMetaConf& conf) {
auto adapter = milvus::engine::AdapterMgr::GetInstance().GetAdapter(type);
return adapter->MatchSearch(conf, type);
}
knowhere::Config GenBuild(const milvus::engine::IndexType& type, const milvus::engine::TempMetaConf& conf) {
knowhere::Config
GenBuild(const milvus::engine::IndexType& type, const milvus::engine::TempMetaConf& conf) {
auto adapter = milvus::engine::AdapterMgr::GetInstance().GetAdapter(type);
return adapter->Match(conf);
}
knowhere::Config Gen(const milvus::engine::IndexType& type) {
knowhere::Config
Gen(const milvus::engine::IndexType& type) {
switch (type) {
case milvus::engine::IndexType::FAISS_IDMAP: {
auto tempconf = std::make_shared<knowhere::Cfg>();

View File

@ -0,0 +1,210 @@
# milvus_ivfsq8_test_report_detailed_version
## Summary
This document contains the test reports of IVF_SQ8 index on Milvus single server.
## Test objectives
The time cost and recall when searching with different parameters.
## Test method
### Hardware/Software requirements
Operating System: CentOS Linux release 7.6.1810 (Core)
CPU: Intel(R) Xeon(R) CPU E5-2678 v3 @ 2.50GHz
GPU0: GeForce GTX 1080
GPU1: GeForce GTX 1080
Memory: 503GB
Docker version: 18.09
Nvidia Driver version: 430.34
Milvus version: 0.5.3
SDK interface: Python 3.6.8
Pymilvus version: 0.2.5
### Data model
The data used in the tests are:
- Data source: sift1b
- Data type: hdf5
For details on this dataset, you can check : http://corpus-texmex.irisa.fr/ .
### Measures
- Query Elapsed Time: Time cost (in seconds) to run a query. Variables that affect Query Elapsed Time:
- nq (Number of queried vectors)
> Note: In the query test of query elapsed time, we will test the following parameters with different values:
>
> nq - grouped by: [1, 5, 10, 200, 400, 600, 800, 1000],
- Recall: The fraction of the total amount of relevant instances that were actually retrieved . Variables that affect Recall:
- nq (Number of queried vectors)
- topk (Top k result of a query)
> Note: In the query test of recall, we will test the following parameters with different values:
>
> nq - grouped by: [1, 5, 10, 200, 400, 600, 800, 1000],
>
> topk - grouped by: [1, 10, 100]
## Test reports
### Test environment
Data base: sift1b-1,000,000,000 vectors, 128-dimension
Table Attributes
- nlist: 16384
- metric_type: L2
Query configuration
- nprobe: 32
Milvus configuration
- cpu_cache_capacity: 150
- gpu_cache_capacity: 6
- use_blas_threshold: 1100
You can check the definition of Milvus configuration on https://milvus.io/docs/en/reference/milvus_config/.
Test method
Test the query elapsed time and recall with several parameters, and once only change one parameter.
- Whether to restart Milvus after each query: No
### Performance test
#### Data query
**Test result**
Query Elapsed Time
topk : 100
search_resources: gpu0, gpu1
| nq/topk | topk=100 |
| :-----: | :------: |
| nq=1 | 15.57 |
| nq=10 | 15.80 |
| nq=200 | 15.72 |
| nq=400 | 15.94 |
| nq=600 | 16.58 |
| nq=800 | 16.71 |
| nq=1000 | 16.91 |
When nq is 1000, the query time cost of a 128-dimension vector is around 17ms in GPU Mode.
topk : 100
search_resources: cpu, gpu0
| nq/topk | topk=100 |
| :-----: | :------: |
| nq=1 | 1.12 |
| nq=10 | 2.89 |
| nq=200 | 8.10 |
| nq=400 | 12.36 |
| nq=600 | 17.81 |
| nq=800 | 23.24 |
| nq=1000 | 27.41 |
When nq is 1000, the query time cost of a 128-dimension vector is around 27ms in CPU Mode.
**Conclusion**
The query elapsed time in CPU Mode increases quickly with nq, while in GPU Mode query elapsed time increases much slower. When nq is small, CPU Mode consumes less time than GPU Mode. However, as nq becomes larger, GPU Mode shows its advantage against CPU Mode.
The query elapsed time in GPU Mode consists of two parts: (1) index CPU-to-GPU copy time; (2) nprobe buckets search time. When nq is smaller than 500, index CPU-to-GPU copy time cannot be amortized efficiently, CPU Mode is a better choice; when nq is larger than 500, choosing GPU Mode is better.
Compared with CPU, GPU has much more cores and stronger computing capability. When nq is large, it can better reflect GPU's advantages on computing.
### Recall test
**Test result**
topk = 1 : recall - recall@1
topk = 10 : recall - recall@10
topk = 100 : recall - recall@100
We use the ground_truth in sift1b dataset to calculate the recall of query results.
Recall of GPU Mode
search_resources: gpu0, gpu1
| nq/topk | topk=1 | topk=10 | topk=100 |
| :-----: | :----: | :-----: | :------: |
| nq=1 | 1.000 | 0.800 | 0.790 |
| nq=5 | 0.800 | 0.820 | 0.908 |
| nq=10 | 0.900 | 0.910 | 0.939 |
| nq=200 | 0.955 | 0.941 | 0.929 |
| nq=400 | 0.958 | 0.944 | 0.932 |
| nq=600 | 0.952 | 0.946 | 0.934 |
| nq=800 | 0.941 | 0.943 | 0.930 |
| nq=1000 | 0.938 | 0.942 | 0.930 |
Recall of CPU Mode
search_resources: cpu, gpu0
| nq/topk | topk=1 | topk=10 | topk=100 |
| :-----: | :----: | :-----: | :------: |
| nq=1 | 1.000 | 0.800 | 0.790 |
| nq=5 | 0.800 | 0.820 | 0.908 |
| nq=10 | 0.900 | 0.910 | 0.939 |
| nq=200 | 0.955 | 0.941 | 0.929 |
| nq=400 | 0.958 | 0.944 | 0.932 |
| nq=600 | 0.952 | 0.946 | 0.934 |
| nq=800 | 0.941 | 0.943 | 0.930 |
| nq=1000 | 0.938 | 0.942 | 0.930 |
**Conclusion**
As nq increases, the recall gradually stabilizes to over 93%.

View File

@ -0,0 +1,211 @@
# milvus_ivfsq8_test_report_detailed_version_cn
## 概述
本文描述了ivfsq8索引在milvus单机部署方式下的测试报告。
## 测试目标
参数不同情况下的查询时间和召回率。
## 测试方法
### 软硬件环境
操作系统: CentOS Linux release 7.6.1810 (Core)
CPU: Intel(R) Xeon(R) CPU E5-2678 v3 @ 2.50GHz
GPU0: GeForce GTX 1080
GPU1: GeForce GTX 1080
内存: 503GB
Docker版本: 18.09
Nvidia Driver版本: 430.34
Milvus版本: 0.5.3
SDK接口: Python 3.6.8
Pymilvus版本: 0.2.5
### 数据模型
本测试中用到的主要数据:
- 数据来源: sift1b
- 数据类型: hdf5
关于该数据集的详细信息请参考 : http://corpus-texmex.irisa.fr/ 。
### 测试指标
- Query Elapsed Time: 数据库查询所有向量的时间以秒计。影响Query Elapsed Time的变量:
- nq (被查询向量的数量)
> 备注:在向量查询测试中,我们会测试下面参数不同的取值来观察结果:
>
> 被查询向量的数量nq将按照 [1, 5, 10, 200, 400, 600, 800, 1000]的数量分组。
- Recall: 实际返回的正确结果占总数之比 . 影响Recall的变量:
- nq (被查询向量的数量)
- topk (单条查询中最相似的K个结果)
> 备注:在向量准确性测试中,我们会测试下面参数不同的取值来观察结果:
>
> 被查询向量的数量nq将按照 [1, 5, 10, 200, 400, 600, 800, 1000]的数量分组,
>
> 单条查询中最相似的K个结果topk将按照[1, 10, 100]的数量分组。
## 测试报告
### 测试环境
数据集: sift1b-1,000,000,000向量, 128维
表格属性:
- nlist: 16384
- metric_type: L2
查询设置:
- nprobe: 32
Milvus设置
- cpu_cache_capacity: 150
- gpu_cache_capacity: 6
- use_blas_threshold: 1100
你可以在 https://milvus.io/docs/en/reference/milvus_config/上查询Milvus设置的详细定义。
测试方法
通过一次仅改变一个参数的值,测试查询向量时间和召回率。
- 查询后是否重启Milvus
### 性能测试
#### 数据查询
测试结果
Query Elapsed Time
topk : 100
search_resources: gpu0, gpu1
| nq/topk | topk=100 |
| :-----: | :------: |
| nq=1 | 15.57 |
| nq=10 | 15.80 |
| nq=200 | 15.72 |
| nq=400 | 15.94 |
| nq=600 | 16.58 |
| nq=800 | 16.71 |
| nq=1000 | 16.91 |
当nq为1000时在GPU模式下查询一条128维向量需要耗时约17毫秒。
topk : 100
search_resources: cpu, gpu0
| nq/topk | topk=100 |
| :-----: | :------: |
| nq=1 | 1.12 |
| nq=10 | 2.89 |
| nq=200 | 8.10 |
| nq=400 | 12.36 |
| nq=600 | 17.81 |
| nq=800 | 23.24 |
| nq=1000 | 27.41 |
当nq为1000时在GPU模式下查询一条128维向量需要耗时约27毫秒。
**总结**
在CPU模式下查询耗时随nq的增长快速增大而在GPU模式下查询耗时的增大则缓慢许多。当nq较小时CPU模式比GPU模式耗时更少。但当nq足够大时GPU模式则更具有优势。
在GPU模式下的查询耗时由两部分组成1索引从CPU到GPU的拷贝时间2所有分桶的查询时间。当nq小于500时索引从CPU到GPU 的拷贝时间无法被有效均摊此时CPU模式时一个更优的选择当nq大于500时选择GPU模式更合理。
和CPU相比GPU具有更多的核数和更强的算力。当nq较大时GPU在计算上的优势能被更好地被体现。
### 召回率测试
**测试结果**
topk = 1 : recall - recall@1
topk = 10 : recall - recall@10
topk = 100 : recall - recall@100
我们利用sift1b数据集中的ground_truth来计算查询结果的召回率。
Recall of GPU Mode
search_resources: gpu0, gpu1
| nq/topk | topk=1 | topk=10 | topk=100 |
| :-----: | :----: | :-----: | :------: |
| nq=1 | 1.000 | 0.800 | 0.790 |
| nq=5 | 0.800 | 0.820 | 0.908 |
| nq=10 | 0.900 | 0.910 | 0.939 |
| nq=200 | 0.955 | 0.941 | 0.929 |
| nq=400 | 0.958 | 0.944 | 0.932 |
| nq=600 | 0.952 | 0.946 | 0.934 |
| nq=800 | 0.941 | 0.943 | 0.930 |
| nq=1000 | 0.938 | 0.942 | 0.930 |
Recall of CPU Mode
search_resources: cpu, gpu0
| nq/topk | topk=1 | topk=10 | topk=100 |
| :-----: | :----: | :-----: | :------: |
| nq=1 | 1.000 | 0.800 | 0.790 |
| nq=5 | 0.800 | 0.820 | 0.908 |
| nq=10 | 0.900 | 0.910 | 0.939 |
| nq=200 | 0.955 | 0.941 | 0.929 |
| nq=400 | 0.958 | 0.944 | 0.932 |
| nq=600 | 0.952 | 0.946 | 0.934 |
| nq=800 | 0.941 | 0.943 | 0.930 |
| nq=1000 | 0.938 | 0.942 | 0.930 |
**总结**
随着nq的增大召回率逐渐稳定至93%以上。

View File

@ -11,7 +11,9 @@ grpc_server = Server()
def create_app(testing_config=None):
config = testing_config if testing_config else settings.DefaultConfig
db.init_db(uri=config.SQLALCHEMY_DATABASE_URI, echo=config.SQL_ECHO)
db.init_db(uri=config.SQLALCHEMY_DATABASE_URI, echo=config.SQL_ECHO, pool_size=config.SQL_POOL_SIZE,
pool_recycle=config.SQL_POOL_RECYCLE, pool_timeout=config.SQL_POOL_TIMEOUT,
pool_pre_ping=config.SQL_POOL_PRE_PING, max_overflow=config.SQL_MAX_OVERFLOW)
from mishards.connections import ConnectionMgr
connect_mgr = ConnectionMgr()

View File

@ -23,15 +23,12 @@ class DB:
uri and self.init_db(uri, echo)
self.session_factory = scoped_session(sessionmaker(class_=LocalSession, db=self))
def init_db(self, uri, echo=False):
def init_db(self, uri, echo=False, pool_size=100, pool_recycle=5, pool_timeout=30, pool_pre_ping=True, max_overflow=0):
url = make_url(uri)
if url.get_backend_name() == 'sqlite':
self.engine = create_engine(url)
else:
self.engine = create_engine(uri, pool_size=100, pool_recycle=5, pool_timeout=30,
pool_pre_ping=True,
echo=echo,
max_overflow=0)
self.engine = create_engine(uri, pool_size, pool_recycle, pool_timeout, pool_pre_ping, echo, max_overflow)
self.uri = uri
self.url = url

View File

@ -50,10 +50,16 @@ class TracingConfig:
}
}
max_overflow=0
class DefaultConfig:
SQLALCHEMY_DATABASE_URI = env.str('SQLALCHEMY_DATABASE_URI')
SQL_ECHO = env.bool('SQL_ECHO', False)
SQL_POOL_SIZE = env.int('pool_size', 100)
SQL_POOL_RECYCLE = env.int('pool_recycle', 5)
SQL_POOL_TIMEOUT = env.int('pool_timeout', 30)
SQL_POOL_PRE_PING = env.bool('pool_pre_ping', True)
SQL_MAX_OVERFLOW = env.int('max_overflow', 0)
TRACER_PLUGIN_PATH = env.str('TRACER_PLUGIN_PATH', '')
TRACER_CLASS_NAME = env.str('TRACER_CLASS_NAME', '')
ROUTER_PLUGIN_PATH = env.str('ROUTER_PLUGIN_PATH', '')
@ -65,5 +71,10 @@ class DefaultConfig:
class TestingConfig(DefaultConfig):
SQLALCHEMY_DATABASE_URI = env.str('SQLALCHEMY_DATABASE_TEST_URI', '')
SQL_ECHO = env.bool('SQL_TEST_ECHO', False)
SQL_POOL_SIZE = env.int('pool_size', 100)
SQL_POOL_RECYCLE = env.int('pool_recycle', 5)
SQL_POOL_TIMEOUT = env.int('pool_timeout', 30)
SQL_POOL_PRE_PING = env.bool('pool_pre_ping', True)
SQL_MAX_OVERFLOW = env.int('max_overflow', 0)
TRACER_CLASS_NAME = env.str('TRACER_CLASS_TEST_NAME', '')
ROUTER_CLASS_NAME = env.str('ROUTER_CLASS_TEST_NAME', 'FileBasedHashRingRouter')