mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-30 23:45:28 +08:00
Merge remote-tracking branch 'main/branch-0.4.0' into branch-0.4.0
Former-commit-id: 824e89aeca41aebd20ad63bbe3d136126e0ba336
This commit is contained in:
commit
90cd973587
@ -3,8 +3,6 @@ container('milvus-build-env') {
|
||||
gitlabCommitStatus(name: 'Build Engine') {
|
||||
dir ("milvus_engine") {
|
||||
try {
|
||||
def knowhere_build_dir = "${env.WORKSPACE}/milvus_engine/cpp/src/core/cmake_build"
|
||||
|
||||
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
|
||||
|
||||
/*
|
||||
@ -17,7 +15,7 @@ container('milvus-build-env') {
|
||||
dir ("cpp") {
|
||||
sh "git config --global user.email \"test@zilliz.com\""
|
||||
sh "git config --global user.name \"test\""
|
||||
sh "./build.sh -t ${params.BUILD_TYPE} -k ${knowhere_build_dir} -j -u -c"
|
||||
sh "./build.sh -t ${params.BUILD_TYPE} -j -u -c"
|
||||
}
|
||||
} catch (exc) {
|
||||
updateGitlabCommitStatus name: 'Build Engine', state: 'failed'
|
||||
|
||||
@ -3,8 +3,6 @@ container('milvus-build-env') {
|
||||
gitlabCommitStatus(name: 'Build Engine') {
|
||||
dir ("milvus_engine") {
|
||||
try {
|
||||
def knowhere_build_dir = "${env.WORKSPACE}/milvus_engine/cpp/src/core/cmake_build"
|
||||
|
||||
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
|
||||
|
||||
/*
|
||||
@ -17,7 +15,7 @@ container('milvus-build-env') {
|
||||
dir ("cpp") {
|
||||
sh "git config --global user.email \"test@zilliz.com\""
|
||||
sh "git config --global user.name \"test\""
|
||||
sh "./build.sh -t ${params.BUILD_TYPE} -k ${knowhere_build_dir} -j"
|
||||
sh "./build.sh -t ${params.BUILD_TYPE} -j"
|
||||
}
|
||||
} catch (exc) {
|
||||
updateGitlabCommitStatus name: 'Build Engine', state: 'failed'
|
||||
|
||||
@ -31,6 +31,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-492 - Drop index failed if index have been created with index_type: FLAT
|
||||
- MS-493 - Knowhere unittest crash
|
||||
- MS-453 - GPU search error when nprobe set more than 1024
|
||||
- MS-474 - Create index hang if use branch-0.3.1 server config
|
||||
- MS-510 - unittest out of memory and crashed
|
||||
- MS-507 - Dataset 10m-512, index type sq8,performance in-normal when set CPU_CACHE to 16 or 64
|
||||
|
||||
@ -101,6 +102,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-504 - Update node_test in scheduler
|
||||
- MS-505 - Install core unit test and add to coverage
|
||||
- MS-508 - Update normal_test in scheduler
|
||||
- MS-532 - Add grpc server unittest
|
||||
- MS-511 - Update resource_test in scheduler
|
||||
- MS-517 - Update resource_mgr_test in scheduler
|
||||
- MS-518 - Add schedinst_test in scheduler
|
||||
@ -115,6 +117,7 @@ Please mark all change in change log and use the ticket from JIRA.
|
||||
- MS-533 - Update resource_test to cover dump function
|
||||
- MS-523 - Config file validation
|
||||
- MS-539 - Remove old task code
|
||||
- MS-546 - Add simple mode resource_config
|
||||
|
||||
## New Feature
|
||||
- MS-343 - Implement ResourceMgr
|
||||
|
||||
11
cpp/build.sh
11
cpp/build.sh
@ -9,15 +9,12 @@ DB_PATH="/opt/milvus"
|
||||
PROFILING="OFF"
|
||||
BUILD_FAISS_WITH_MKL="OFF"
|
||||
USE_JFROG_CACHE="OFF"
|
||||
KNOWHERE_BUILD_DIR="`pwd`/src/core/cmake_build"
|
||||
KNOWHERE_OPTIONS="-t ${BUILD_TYPE}"
|
||||
|
||||
while getopts "p:d:t:k:uhrcgmj" arg
|
||||
while getopts "p:d:t:uhrcgmj" arg
|
||||
do
|
||||
case $arg in
|
||||
t)
|
||||
BUILD_TYPE=$OPTARG # BUILD_TYPE
|
||||
KNOWHERE_OPTIONS="-t ${BUILD_TYPE}"
|
||||
;;
|
||||
u)
|
||||
echo "Build and run unittest cases" ;
|
||||
@ -41,15 +38,11 @@ do
|
||||
g)
|
||||
PROFILING="ON"
|
||||
;;
|
||||
k)
|
||||
KNOWHERE_BUILD_DIR=$OPTARG
|
||||
;;
|
||||
m)
|
||||
BUILD_FAISS_WITH_MKL="ON"
|
||||
;;
|
||||
j)
|
||||
USE_JFROG_CACHE="ON"
|
||||
KNOWHERE_OPTIONS="${KNOWHERE_OPTIONS} -j"
|
||||
;;
|
||||
h) # help
|
||||
echo "
|
||||
@ -62,7 +55,6 @@ parameter:
|
||||
-r: remove previous build directory(default: OFF)
|
||||
-c: code coverage(default: OFF)
|
||||
-g: profiling(default: OFF)
|
||||
-k: specify knowhere header/binary path
|
||||
-m: build faiss with MKL(default: OFF)
|
||||
-j: use jfrog cache build directory
|
||||
|
||||
@ -96,7 +88,6 @@ if [[ ${MAKE_CLEAN} == "ON" ]]; then
|
||||
-DMILVUS_DB_PATH=${DB_PATH} \
|
||||
-DMILVUS_ENABLE_PROFILING=${PROFILING} \
|
||||
-DBUILD_FAISS_WITH_MKL=${BUILD_FAISS_WITH_MKL} \
|
||||
-DKNOWHERE_BUILD_DIR=${KNOWHERE_BUILD_DIR} \
|
||||
-DUSE_JFROG_CACHE=${USE_JFROG_CACHE} \
|
||||
../"
|
||||
echo ${CMAKE_CMD}
|
||||
|
||||
@ -96,6 +96,8 @@ define_option(MILVUS_WITH_ZLIB "Build with zlib compression" ON)
|
||||
|
||||
define_option(MILVUS_WITH_KNOWHERE "Build with Knowhere" OFF)
|
||||
|
||||
#define_option(MILVUS_ENABLE_PROFILING "Build with profiling" ON)
|
||||
|
||||
if(CMAKE_VERSION VERSION_LESS 3.7)
|
||||
set(MILVUS_WITH_ZSTD_DEFAULT OFF)
|
||||
else()
|
||||
|
||||
@ -31,51 +31,12 @@ cache_config:
|
||||
cpu_cache_capacity: 16 # how many memory are used as cache, unit: GB, range: 0 ~ less than total memory
|
||||
cpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0
|
||||
insert_cache_immediately: false # insert data will be load into cache immediately for hot query
|
||||
gpu_cache_capacity: 5 # how many memory are used as cache in gpu, unit: GB, RANGE: 0 ~ less than total memory
|
||||
gpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0
|
||||
|
||||
engine_config:
|
||||
use_blas_threshold: 20
|
||||
|
||||
resource_config:
|
||||
# resource list, length: 0~N
|
||||
# please set a DISK resource and a CPU resource least, or system will not return query result.
|
||||
#
|
||||
# example:
|
||||
# resource_name: # resource name, just using in connections below
|
||||
# type: DISK # resource type, optional: DISK/CPU/GPU
|
||||
# device_id: 0
|
||||
# enable_executor: false # if is enable executor, optional: true, false
|
||||
|
||||
mode: simple
|
||||
resources:
|
||||
ssda:
|
||||
type: DISK
|
||||
device_id: 0
|
||||
enable_executor: false
|
||||
|
||||
cpu:
|
||||
type: CPU
|
||||
device_id: 0
|
||||
enable_executor: false
|
||||
|
||||
gpu0:
|
||||
type: GPU
|
||||
device_id: 0
|
||||
enable_executor: true
|
||||
gpu_resource_num: 2
|
||||
pinned_memory: 300
|
||||
temp_memory: 300
|
||||
|
||||
# connection list, length: 0~N
|
||||
# example:
|
||||
# connection_name:
|
||||
# speed: 100 # unit: MS/s
|
||||
# endpoint: ${resource_name}===${resource_name}
|
||||
connections:
|
||||
io:
|
||||
speed: 500
|
||||
endpoint: ssda===cpu
|
||||
pcie0:
|
||||
speed: 11000
|
||||
endpoint: cpu===gpu0
|
||||
|
||||
# - cpu
|
||||
- gpu0
|
||||
|
||||
@ -13,6 +13,10 @@ DIR_LCOV_OUTPUT="lcov_out"
|
||||
|
||||
DIR_GCNO="cmake_build"
|
||||
DIR_UNITTEST="milvus/unittest"
|
||||
|
||||
# delete old code coverage info files
|
||||
rm -rf lcov_out
|
||||
rm -f FILE_INFO_BASE FILE_INFO_MILVUS FILE_INFO_OUTPUT FILE_INFO_OUTPUT_NEW
|
||||
|
||||
MYSQL_USER_NAME=root
|
||||
MYSQL_PASSWORD=Fantast1c
|
||||
@ -84,7 +88,7 @@ done
|
||||
|
||||
mysql_exc "DROP DATABASE IF EXISTS ${MYSQL_DB_NAME};"
|
||||
|
||||
# gen test converage
|
||||
# gen code coverage
|
||||
${LCOV_CMD} -d ${DIR_GCNO} -o "${FILE_INFO_MILVUS}" -c
|
||||
# merge coverage
|
||||
${LCOV_CMD} -a ${FILE_INFO_BASE} -a ${FILE_INFO_MILVUS} -o "${FILE_INFO_OUTPUT}"
|
||||
@ -96,6 +100,10 @@ ${LCOV_CMD} -r "${FILE_INFO_OUTPUT}" -o "${FILE_INFO_OUTPUT_NEW}" \
|
||||
"*/cmake_build/*_ep-prefix/*" \
|
||||
"src/core/cmake_build*" \
|
||||
"src/core/thirdparty*" \
|
||||
"src/grpc*"\
|
||||
"src/server/Server.cpp"\
|
||||
"src/server/DBWrapper.cpp"\
|
||||
"src/server/grpc_impl/GrpcMilvusServer.cpp"\
|
||||
|
||||
# gen html report
|
||||
${LCOV_GEN_CMD} "${FILE_INFO_OUTPUT_NEW}" --output-directory ${DIR_LCOV_OUTPUT}/
|
||||
2
cpp/src/cache/GpuCacheMgr.cpp
vendored
2
cpp/src/cache/GpuCacheMgr.cpp
vendored
@ -23,7 +23,7 @@ namespace {
|
||||
GpuCacheMgr::GpuCacheMgr() {
|
||||
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
|
||||
|
||||
int64_t cap = config.GetInt64Value(server::CONFIG_GPU_CACHE_CAPACITY, 2);
|
||||
int64_t cap = config.GetInt64Value(server::CONFIG_GPU_CACHE_CAPACITY, 0);
|
||||
cap *= G_BYTE;
|
||||
cache_ = std::make_shared<Cache>(cap, 1UL<<32);
|
||||
|
||||
|
||||
@ -79,5 +79,5 @@ install(TARGETS test_idmap DESTINATION unittest)
|
||||
install(TARGETS test_kdt DESTINATION unittest)
|
||||
|
||||
#add_subdirectory(faiss_ori)
|
||||
add_subdirectory(test_nsg)
|
||||
#add_subdirectory(test_nsg)
|
||||
|
||||
|
||||
@ -10,7 +10,7 @@ namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace engine {
|
||||
|
||||
constexpr uint64_t MAXINT = 99999;
|
||||
constexpr uint64_t MAXINT = std::numeric_limits<uint32_t >::max();
|
||||
|
||||
uint64_t
|
||||
ShortestPath(const ResourcePtr &src,
|
||||
|
||||
@ -8,6 +8,8 @@
|
||||
#include "server/ServerConfig.h"
|
||||
#include "ResourceFactory.h"
|
||||
#include "knowhere/index/vector_index/gpu_ivf.h"
|
||||
#include "Utils.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
@ -20,73 +22,132 @@ SchedulerPtr SchedInst::instance = nullptr;
|
||||
std::mutex SchedInst::mutex_;
|
||||
|
||||
void
|
||||
StartSchedulerService() {
|
||||
try {
|
||||
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
|
||||
load_simple_config() {
|
||||
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
|
||||
auto mode = config.GetValue("mode", "simple");
|
||||
|
||||
if (config.GetChildren().empty()) throw "resource_config null exception";
|
||||
|
||||
auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
|
||||
|
||||
if (resources.empty()) throw "Children of resource_config null exception";
|
||||
|
||||
for (auto &resource : resources) {
|
||||
auto &resname = resource.first;
|
||||
auto &resconf = resource.second;
|
||||
auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
|
||||
// auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
|
||||
auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
|
||||
// auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
|
||||
auto enable_loader = true;
|
||||
auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
|
||||
auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY);
|
||||
auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY);
|
||||
auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM);
|
||||
|
||||
auto res = ResMgrInst::GetInstance()->Add(ResourceFactory::Create(resname,
|
||||
type,
|
||||
device_id,
|
||||
enable_loader,
|
||||
enable_executor));
|
||||
|
||||
if (res.lock()->type() == ResourceType::GPU) {
|
||||
auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY, 300);
|
||||
auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY, 300);
|
||||
auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM, 2);
|
||||
pinned_memory = 1024 * 1024 * pinned_memory;
|
||||
temp_memory = 1024 * 1024 * temp_memory;
|
||||
knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id,
|
||||
pinned_memory,
|
||||
temp_memory,
|
||||
resource_num);
|
||||
auto resources = config.GetSequence("resources");
|
||||
bool cpu = false;
|
||||
std::set<uint64_t> gpu_ids;
|
||||
for (auto &resource : resources) {
|
||||
if (resource == "cpu") {
|
||||
cpu = true;
|
||||
break;
|
||||
} else {
|
||||
if (resource.length() < 4 || resource.substr(0, 3) != "gpu") {
|
||||
// error
|
||||
exit(-1);
|
||||
}
|
||||
auto gpu_id = std::stoi(resource.substr(3));
|
||||
if (gpu_id >= get_num_gpu()) {
|
||||
// error
|
||||
exit(-1);
|
||||
}
|
||||
gpu_ids.insert(gpu_id);
|
||||
}
|
||||
}
|
||||
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
|
||||
auto io = Connection("io", 500);
|
||||
if (cpu) {
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, true));
|
||||
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
|
||||
} else {
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create("cpu", "CPU", 0, true, false));
|
||||
ResMgrInst::GetInstance()->Connect("disk", "cpu", io);
|
||||
|
||||
auto pcie = Connection("pcie", 12000);
|
||||
for (auto &gpu_id : gpu_ids) {
|
||||
ResMgrInst::GetInstance()->Add(ResourceFactory::Create(std::to_string(gpu_id), "GPU", gpu_id, true, true));
|
||||
ResMgrInst::GetInstance()->Connect("cpu", std::to_string(gpu_id), io);
|
||||
auto pinned_memory = 300;
|
||||
auto temp_memory = 300;
|
||||
auto resource_num = 2;
|
||||
pinned_memory = 1024 * 1024 * pinned_memory;
|
||||
temp_memory = 1024 * 1024 * temp_memory;
|
||||
knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(gpu_id,
|
||||
pinned_memory,
|
||||
temp_memory,
|
||||
resource_num);
|
||||
}
|
||||
|
||||
knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
|
||||
|
||||
auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren();
|
||||
if(connections.empty()) throw "connections config null exception";
|
||||
for (auto &conn : connections) {
|
||||
auto &connect_name = conn.first;
|
||||
auto &connect_conf = conn.second;
|
||||
auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS);
|
||||
auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS);
|
||||
|
||||
std::string delimiter = "===";
|
||||
std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter));
|
||||
std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3,
|
||||
connect_endpoint.length());
|
||||
|
||||
auto connection = Connection(connect_name, connect_speed);
|
||||
ResMgrInst::GetInstance()->Connect(left, right, connection);
|
||||
}
|
||||
} catch (const char* msg) {
|
||||
SERVER_LOG_ERROR << msg;
|
||||
// TODO: throw exception instead
|
||||
exit(-1);
|
||||
// throw std::exception();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
load_advance_config() {
|
||||
// try {
|
||||
// server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
|
||||
//
|
||||
// if (config.GetChildren().empty()) throw "resource_config null exception";
|
||||
//
|
||||
// auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
|
||||
//
|
||||
// if (resources.empty()) throw "Children of resource_config null exception";
|
||||
//
|
||||
// for (auto &resource : resources) {
|
||||
// auto &resname = resource.first;
|
||||
// auto &resconf = resource.second;
|
||||
// auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
|
||||
//// auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
|
||||
// auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
|
||||
//// auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
|
||||
// auto enable_loader = true;
|
||||
// auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
|
||||
// auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY);
|
||||
// auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY);
|
||||
// auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM);
|
||||
//
|
||||
// auto res = ResMgrInst::GetInstance()->Add(ResourceFactory::Create(resname,
|
||||
// type,
|
||||
// device_id,
|
||||
// enable_loader,
|
||||
// enable_executor));
|
||||
//
|
||||
// if (res.lock()->type() == ResourceType::GPU) {
|
||||
// auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY, 300);
|
||||
// auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY, 300);
|
||||
// auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM, 2);
|
||||
// pinned_memory = 1024 * 1024 * pinned_memory;
|
||||
// temp_memory = 1024 * 1024 * temp_memory;
|
||||
// knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id,
|
||||
// pinned_memory,
|
||||
// temp_memory,
|
||||
// resource_num);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
|
||||
//
|
||||
// auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren();
|
||||
// if (connections.empty()) throw "connections config null exception";
|
||||
// for (auto &conn : connections) {
|
||||
// auto &connect_name = conn.first;
|
||||
// auto &connect_conf = conn.second;
|
||||
// auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS);
|
||||
// auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS);
|
||||
//
|
||||
// std::string delimiter = "===";
|
||||
// std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter));
|
||||
// std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3,
|
||||
// connect_endpoint.length());
|
||||
//
|
||||
// auto connection = Connection(connect_name, connect_speed);
|
||||
// ResMgrInst::GetInstance()->Connect(left, right, connection);
|
||||
// }
|
||||
// } catch (const char *msg) {
|
||||
// SERVER_LOG_ERROR << msg;
|
||||
// // TODO: throw exception instead
|
||||
// exit(-1);
|
||||
//// throw std::exception();
|
||||
// }
|
||||
}
|
||||
|
||||
void
|
||||
StartSchedulerService() {
|
||||
load_simple_config();
|
||||
// load_advance_config();
|
||||
ResMgrInst::GetInstance()->Start();
|
||||
SchedInst::GetInstance()->Start();
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@
|
||||
#include "Utils.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <cuda_runtime.h>
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
@ -21,6 +22,13 @@ get_current_timestamp() {
|
||||
return millis;
|
||||
}
|
||||
|
||||
uint64_t
|
||||
get_num_gpu() {
|
||||
int n_devices = 0;
|
||||
cudaGetDeviceCount(&n_devices);
|
||||
return n_devices;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -14,6 +14,9 @@ namespace engine {
|
||||
uint64_t
|
||||
get_current_timestamp();
|
||||
|
||||
uint64_t
|
||||
get_num_gpu();
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -152,10 +152,10 @@ XSearchTask::Execute() {
|
||||
return;
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Searching in file id " << index_id_ << " with "
|
||||
ENGINE_LOG_DEBUG << "Searching in file id:" << index_id_ << " with "
|
||||
<< search_contexts_.size() << " tasks";
|
||||
|
||||
server::TimeRecorder rc("DoSearch file id " + std::to_string(index_id_));
|
||||
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
|
||||
|
||||
server::CollectDurationMetrics metrics(index_type_);
|
||||
|
||||
@ -163,16 +163,16 @@ XSearchTask::Execute() {
|
||||
std::vector<float> output_distance;
|
||||
for (auto &context : search_contexts_) {
|
||||
//step 1: allocate memory
|
||||
auto nq = context->nq();
|
||||
auto topk = context->topk();
|
||||
auto nprobe = context->nprobe();
|
||||
auto vectors = context->vectors();
|
||||
uint64_t nq = context->nq();
|
||||
uint64_t topk = context->topk();
|
||||
uint64_t nprobe = context->nprobe();
|
||||
const float* vectors = context->vectors();
|
||||
|
||||
output_ids.resize(topk * nq);
|
||||
output_distance.resize(topk * nq);
|
||||
std::string hdr = "context " + context->Identity() +
|
||||
" nq " + std::to_string(nq) +
|
||||
" topk " + std::to_string(topk);
|
||||
" nq " + std::to_string(nq) +
|
||||
" topk " + std::to_string(topk);
|
||||
|
||||
try {
|
||||
//step 2: search
|
||||
|
||||
@ -22,10 +22,10 @@ std::string GetTableName();
|
||||
|
||||
const std::string TABLE_NAME = GetTableName();
|
||||
constexpr int64_t TABLE_DIMENSION = 512;
|
||||
constexpr int64_t TABLE_INDEX_FILE_SIZE = 768;
|
||||
constexpr int64_t TABLE_INDEX_FILE_SIZE = 1024;
|
||||
constexpr int64_t BATCH_ROW_COUNT = 100000;
|
||||
constexpr int64_t NQ = 100;
|
||||
constexpr int64_t TOP_K = 10;
|
||||
constexpr int64_t TOP_K = 1;
|
||||
constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
|
||||
constexpr int64_t ADD_VECTOR_LOOP = 1;
|
||||
constexpr int64_t SECONDS_EACH_HOUR = 3600;
|
||||
@ -283,14 +283,14 @@ ClientTest::Test(const std::string& address, const std::string& port) {
|
||||
int64_t row_count = 0;
|
||||
Status stat = conn->CountTable(TABLE_NAME, row_count);
|
||||
std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl;
|
||||
DoSearch(conn, search_record_array, "Search without index");
|
||||
// DoSearch(conn, search_record_array, "Search without index");
|
||||
}
|
||||
|
||||
{//wait unit build index finish
|
||||
std::cout << "Wait until create all index done" << std::endl;
|
||||
IndexParam index;
|
||||
index.table_name = TABLE_NAME;
|
||||
index.index_type = IndexType::gpu_ivfflat;
|
||||
index.index_type = IndexType::gpu_ivfsq8;
|
||||
index.nlist = 16384;
|
||||
Status stat = conn->CreateIndex(index);
|
||||
std::cout << "CreateIndex function call status: " << stat.ToString() << std::endl;
|
||||
@ -306,7 +306,9 @@ ClientTest::Test(const std::string& address, const std::string& port) {
|
||||
}
|
||||
|
||||
{//search vectors after build index finish
|
||||
DoSearch(conn, search_record_array, "Search after build index finish");
|
||||
for (uint64_t i = 0; i < 5; ++i) {
|
||||
DoSearch(conn, search_record_array, "Search after build index finish");
|
||||
}
|
||||
// std::cout << conn->DumpTaskTables() << std::endl;
|
||||
}
|
||||
|
||||
@ -338,7 +340,6 @@ ClientTest::Test(const std::string& address, const std::string& port) {
|
||||
std::cout << "Server status before disconnect: " << status << std::endl;
|
||||
}
|
||||
Connection::Destroy(conn);
|
||||
// conn->Disconnect();
|
||||
{//server status
|
||||
std::string status = conn->ServerStatus();
|
||||
std::cout << "Server status after disconnect: " << status << std::endl;
|
||||
|
||||
@ -86,11 +86,12 @@ ErrorCode ServerConfig::ValidateConfig() {
|
||||
ErrorCode
|
||||
ServerConfig::CheckServerConfig() {
|
||||
/*
|
||||
server_config:
|
||||
address: 0.0.0.0 # milvus server ip address
|
||||
port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534
|
||||
gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1
|
||||
mode: single # milvus deployment type: single, cluster, read_only
|
||||
server_config:
|
||||
address: 0.0.0.0 # milvus server ip address (IPv4)
|
||||
port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534
|
||||
mode: single # milvus deployment type: single, cluster, read_only
|
||||
time_zone: UTC+8 # Use the UTC-x or UTC+x to specify a time zone. eg. UTC+8 for China Standard Time
|
||||
|
||||
*/
|
||||
bool okay = true;
|
||||
ConfigNode server_config = GetConfig(CONFIG_SERVER);
|
||||
@ -144,20 +145,20 @@ ServerConfig::CheckServerConfig() {
|
||||
ErrorCode
|
||||
ServerConfig::CheckDBConfig() {
|
||||
/*
|
||||
db_config:
|
||||
db_path: @MILVUS_DB_PATH@ # milvus data storage path
|
||||
db_slave_path: # secondry data storage path, split by semicolon
|
||||
parallel_reduce: false # use multi-threads to reduce topk result
|
||||
db_config:
|
||||
db_path: @MILVUS_DB_PATH@ # milvus data storage path
|
||||
db_slave_path: # secondry data storage path, split by semicolon
|
||||
|
||||
# URI format: dialect://username:password@host:port/database
|
||||
# All parts except dialect are optional, but you MUST include the delimiters
|
||||
# Currently dialect supports mysql or sqlite
|
||||
db_backend_url: sqlite://:@:/
|
||||
# URI format: dialect://username:password@host:port/database
|
||||
# All parts except dialect are optional, but you MUST include the delimiters
|
||||
# Currently dialect supports mysql or sqlite
|
||||
db_backend_url: sqlite://:@:/
|
||||
|
||||
archive_disk_threshold: 0 # triger archive action if storage size exceed this value, 0 means no limit, unit: GB
|
||||
archive_days_threshold: 0 # files older than x days will be archived, 0 means no limit, unit: day
|
||||
insert_buffer_size: 4 # maximum insert buffer size allowed, default: 4, unit: GB, should be at least 1 GB.
|
||||
# the sum of insert_buffer_size and cpu_cache_capacity should be less than total memory, unit: GB
|
||||
archive_disk_threshold: 0 # triger archive action if storage size exceed this value, 0 means no limit, unit: GB
|
||||
archive_days_threshold: 0 # files older than x days will be archived, 0 means no limit, unit: day
|
||||
insert_buffer_size: 4 # maximum insert buffer size allowed, default: 4, unit: GB, should be at least 1 GB.
|
||||
# the sum of insert_buffer_size and cpu_cache_capacity should be less than total memory, unit: GB
|
||||
build_index_gpu: 0 # which gpu is used to build index, default: 0, range: 0 ~ gpu number - 1
|
||||
*/
|
||||
bool okay = true;
|
||||
ConfigNode db_config = GetConfig(CONFIG_DB);
|
||||
@ -249,15 +250,13 @@ ServerConfig::CheckMetricConfig() {
|
||||
ErrorCode
|
||||
ServerConfig::CheckCacheConfig() {
|
||||
/*
|
||||
cache_config:
|
||||
cpu_cache_capacity: 16 # how many memory are used as cache, unit: GB, range: 0 ~ less than total memory
|
||||
cpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0
|
||||
insert_cache_immediately: false # insert data will be load into cache immediately for hot query
|
||||
gpu_cache_capacity: 5 # how many memory are used as cache in gpu, unit: GB, RANGE: 0 ~ less than total memory
|
||||
gpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0
|
||||
gpu_ids: # gpu id
|
||||
- 0
|
||||
- 1
|
||||
cache_config:
|
||||
cpu_cache_capacity: 16 # how many memory are used as cache, unit: GB, range: 0 ~ less than total memory
|
||||
cpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0
|
||||
insert_cache_immediately: false # insert data will be load into cache immediately for hot query
|
||||
gpu_cache_capacity: 5 # how many memory are used as cache in gpu, unit: GB, RANGE: 0 ~ less than total memory
|
||||
gpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0
|
||||
|
||||
*/
|
||||
bool okay = true;
|
||||
ConfigNode cache_config = GetConfig(CONFIG_CACHE);
|
||||
@ -305,7 +304,7 @@ ServerConfig::CheckCacheConfig() {
|
||||
okay = false;
|
||||
}
|
||||
|
||||
std::string gpu_cache_capacity_str = cache_config.GetValue(CONFIG_GPU_CACHE_CAPACITY, "5");
|
||||
std::string gpu_cache_capacity_str = cache_config.GetValue(CONFIG_GPU_CACHE_CAPACITY, "0");
|
||||
if (ValidationUtil::ValidateStringIsNumber(gpu_cache_capacity_str) != SERVER_SUCCESS) {
|
||||
std::cerr << "ERROR: gpu_cache_capacity " << gpu_cache_capacity_str << " is not a number" << std::endl;
|
||||
okay = false;
|
||||
@ -378,187 +377,213 @@ ServerConfig::CheckEngineConfig() {
|
||||
|
||||
ErrorCode
|
||||
ServerConfig::CheckResourceConfig() {
|
||||
/*
|
||||
|
||||
resource_config:
|
||||
# resource list, length: 0~N
|
||||
# please set a DISK resource and a CPU resource least, or system will not return query result.
|
||||
#
|
||||
# example:
|
||||
# resource_name: # resource name, just using in connections below
|
||||
# type: DISK # resource type, optional: DISK/CPU/GPU
|
||||
# device_id: 0
|
||||
# enable_executor: false # if is enable executor, optional: true, false
|
||||
|
||||
resources:
|
||||
ssda:
|
||||
type: DISK
|
||||
device_id: 0
|
||||
enable_executor: false
|
||||
|
||||
cpu:
|
||||
type: CPU
|
||||
device_id: 0
|
||||
enable_executor: false
|
||||
|
||||
gpu0:
|
||||
type: GPU
|
||||
device_id: 0
|
||||
enable_executor: true
|
||||
gpu_resource_num: 2
|
||||
pinned_memory: 300
|
||||
temp_memory: 300
|
||||
|
||||
# connection list, length: 0~N
|
||||
# example:
|
||||
# connection_name:
|
||||
# speed: 100 # unit: MS/s
|
||||
# endpoint: ${resource_name}===${resource_name}
|
||||
connections:
|
||||
io:
|
||||
speed: 500
|
||||
endpoint: ssda===cpu
|
||||
pcie0:
|
||||
speed: 11000
|
||||
endpoint: cpu===gpu0
|
||||
*/
|
||||
/*
|
||||
resource_config:
|
||||
mode: simple
|
||||
resources:
|
||||
- cpu
|
||||
- gpu0
|
||||
- gpu100
|
||||
*/
|
||||
bool okay = true;
|
||||
server::ConfigNode resource_config = GetConfig(CONFIG_RESOURCE);
|
||||
if (resource_config.GetChildren().empty()) {
|
||||
std::cerr << "ERROR: no context under resource" << std::endl;
|
||||
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
|
||||
auto mode = config.GetValue("mode", "simple");
|
||||
if (mode != "simple") {
|
||||
std::cerr << "ERROR: invalid resource config: mode is " << mode << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
|
||||
auto resources = resource_config.GetChild(CONFIG_RESOURCES).GetChildren();
|
||||
|
||||
auto resources = config.GetSequence("resources");
|
||||
if (resources.empty()) {
|
||||
std::cerr << "no resources specified" << std::endl;
|
||||
std::cerr << "ERROR: invalid resource config: resources empty" << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
|
||||
bool resource_valid_flag = false;
|
||||
bool hasDisk = false;
|
||||
bool hasCPU = false;
|
||||
bool hasExecutor = false;
|
||||
std::set<std::string> resource_list;
|
||||
for (auto &resource : resources) {
|
||||
resource_list.emplace(resource.first);
|
||||
auto &resource_conf = resource.second;
|
||||
auto type = resource_conf.GetValue(CONFIG_RESOURCE_TYPE);
|
||||
|
||||
std::string device_id_str = resource_conf.GetValue(CONFIG_RESOURCE_DEVICE_ID, "0");
|
||||
int32_t device_id = -1;
|
||||
if (ValidationUtil::ValidateStringIsNumber(device_id_str) != SERVER_SUCCESS) {
|
||||
std::cerr << "ERROR: device_id " << device_id_str << " is not a number" << std::endl;
|
||||
okay = false;
|
||||
} else {
|
||||
device_id = std::stol(device_id_str);
|
||||
}
|
||||
|
||||
std::string enable_executor_str = resource_conf.GetValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, "off");
|
||||
if (ValidationUtil::ValidateStringIsBool(enable_executor_str) != SERVER_SUCCESS) {
|
||||
std::cerr << "ERROR: invalid enable_executor config: " << enable_executor_str << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
|
||||
if (type == "DISK") {
|
||||
hasDisk = true;
|
||||
} else if (type == "CPU") {
|
||||
hasCPU = true;
|
||||
if (resource_conf.GetBoolValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, false)) {
|
||||
hasExecutor = true;
|
||||
}
|
||||
}
|
||||
else if (type == "GPU") {
|
||||
int build_index_gpu_index = GetConfig(CONFIG_DB).GetInt32Value(CONFIG_DB_BUILD_INDEX_GPU, 0);
|
||||
if (device_id == build_index_gpu_index) {
|
||||
resource_valid_flag = true;
|
||||
}
|
||||
if (resource_conf.GetBoolValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, false)) {
|
||||
hasExecutor = true;
|
||||
}
|
||||
std::string gpu_resource_num_str = resource_conf.GetValue(CONFIG_RESOURCE_NUM, "2");
|
||||
if (ValidationUtil::ValidateStringIsNumber(gpu_resource_num_str) != SERVER_SUCCESS) {
|
||||
std::cerr << "ERROR: gpu_resource_num " << gpu_resource_num_str << " is not a number" << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
bool mem_valid = true;
|
||||
std::string pinned_memory_str = resource_conf.GetValue(CONFIG_RESOURCE_PIN_MEMORY, "300");
|
||||
if (ValidationUtil::ValidateStringIsNumber(pinned_memory_str) != SERVER_SUCCESS) {
|
||||
std::cerr << "ERROR: pinned_memory " << pinned_memory_str << " is not a number" << std::endl;
|
||||
okay = false;
|
||||
mem_valid = false;
|
||||
}
|
||||
std::string temp_memory_str = resource_conf.GetValue(CONFIG_RESOURCE_TEMP_MEMORY, "300");
|
||||
if (ValidationUtil::ValidateStringIsNumber(temp_memory_str) != SERVER_SUCCESS) {
|
||||
std::cerr << "ERROR: temp_memory " << temp_memory_str << " is not a number" << std::endl;
|
||||
okay = false;
|
||||
mem_valid = false;
|
||||
}
|
||||
if (mem_valid) {
|
||||
size_t gpu_memory;
|
||||
if (ValidationUtil::GetGpuMemory(device_id, gpu_memory) != SERVER_SUCCESS) {
|
||||
std::cerr << "ERROR: could not get gpu memory for device " << device_id << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
else {
|
||||
size_t prealoc_mem = std::stol(pinned_memory_str) + std::stol(temp_memory_str);
|
||||
if (prealoc_mem >= gpu_memory) {
|
||||
std::cerr << "ERROR: sum of pinned_memory and temp_memory " << prealoc_mem
|
||||
<< " exceeds total gpu memory " << gpu_memory << " for device " << device_id << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!resource_valid_flag) {
|
||||
std::cerr << "Building index GPU can't be found in resource config." << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
if (!hasDisk || !hasCPU) {
|
||||
std::cerr << "No DISK or CPU resource" << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
if (!hasExecutor) {
|
||||
std::cerr << "No CPU or GPU resource has executor enabled" << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
|
||||
auto connections = resource_config.GetChild(CONFIG_RESOURCE_CONNECTIONS).GetChildren();
|
||||
for (auto &connection : connections) {
|
||||
auto &connection_conf = connection.second;
|
||||
|
||||
std::string speed_str = connection_conf.GetValue(CONFIG_SPEED_CONNECTIONS);
|
||||
if (ValidationUtil::ValidateStringIsNumber(speed_str) != SERVER_SUCCESS) {
|
||||
std::cerr << "ERROR: speed " << speed_str << " is not a number" << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
|
||||
std::string endpoint_str = connection_conf.GetValue(CONFIG_ENDPOINT_CONNECTIONS);
|
||||
std::string delimiter = "===";
|
||||
auto delimiter_pos = endpoint_str.find(delimiter);
|
||||
if (delimiter_pos == std::string::npos) {
|
||||
std::cerr << "ERROR: invalid endpoint format: " << endpoint_str << std::endl;
|
||||
okay = false;
|
||||
} else {
|
||||
std::string left_resource = endpoint_str.substr(0, delimiter_pos);
|
||||
if (resource_list.find(left_resource) == resource_list.end()) {
|
||||
std::cerr << "ERROR: left resource " << left_resource << " does not exist" << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
std::string right_resource = endpoint_str.substr(delimiter_pos + delimiter.length(), endpoint_str.length());
|
||||
if (resource_list.find(right_resource) == resource_list.end()) {
|
||||
std::cerr << "ERROR: right resource " << right_resource << " does not exist" << std::endl;
|
||||
okay = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT);
|
||||
}
|
||||
|
||||
//ErrorCode
|
||||
//ServerConfig::CheckResourceConfig() {
|
||||
/*
|
||||
resource_config:
|
||||
# resource list, length: 0~N
|
||||
# please set a DISK resource and a CPU resource least, or system will not return query result.
|
||||
#
|
||||
# example:
|
||||
# resource_name: # resource name, just using in connections below
|
||||
# type: DISK # resource type, optional: DISK/CPU/GPU
|
||||
# device_id: 0
|
||||
# enable_executor: false # if is enable executor, optional: true, false
|
||||
|
||||
resources:
|
||||
ssda:
|
||||
type: DISK
|
||||
device_id: 0
|
||||
enable_executor: false
|
||||
|
||||
cpu:
|
||||
type: CPU
|
||||
device_id: 0
|
||||
enable_executor: true
|
||||
|
||||
gpu0:
|
||||
type: GPU
|
||||
device_id: 0
|
||||
enable_executor: false
|
||||
gpu_resource_num: 2
|
||||
pinned_memory: 300
|
||||
temp_memory: 300
|
||||
|
||||
# connection list, length: 0~N
|
||||
# example:
|
||||
# connection_name:
|
||||
# speed: 100 # unit: MS/s
|
||||
# endpoint: ${resource_name}===${resource_name}
|
||||
connections:
|
||||
io:
|
||||
speed: 500
|
||||
endpoint: ssda===cpu
|
||||
pcie0:
|
||||
speed: 11000
|
||||
endpoint: cpu===gpu0
|
||||
*/
|
||||
// bool okay = true;
|
||||
// server::ConfigNode resource_config = GetConfig(CONFIG_RESOURCE);
|
||||
// if (resource_config.GetChildren().empty()) {
|
||||
// std::cerr << "ERROR: no context under resource" << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
//
|
||||
// auto resources = resource_config.GetChild(CONFIG_RESOURCES).GetChildren();
|
||||
//
|
||||
// if (resources.empty()) {
|
||||
// std::cerr << "no resources specified" << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
//
|
||||
// bool resource_valid_flag = false;
|
||||
// bool hasDisk = false;
|
||||
// bool hasCPU = false;
|
||||
// bool hasExecutor = false;
|
||||
// std::set<std::string> resource_list;
|
||||
// for (auto &resource : resources) {
|
||||
// resource_list.emplace(resource.first);
|
||||
// auto &resource_conf = resource.second;
|
||||
// auto type = resource_conf.GetValue(CONFIG_RESOURCE_TYPE);
|
||||
//
|
||||
// std::string device_id_str = resource_conf.GetValue(CONFIG_RESOURCE_DEVICE_ID, "0");
|
||||
// int32_t device_id = -1;
|
||||
// if (ValidationUtil::ValidateStringIsNumber(device_id_str) != SERVER_SUCCESS) {
|
||||
// std::cerr << "ERROR: device_id " << device_id_str << " is not a number" << std::endl;
|
||||
// okay = false;
|
||||
// } else {
|
||||
// device_id = std::stol(device_id_str);
|
||||
// }
|
||||
//
|
||||
// std::string enable_executor_str = resource_conf.GetValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, "off");
|
||||
// if (ValidationUtil::ValidateStringIsBool(enable_executor_str) != SERVER_SUCCESS) {
|
||||
// std::cerr << "ERROR: invalid enable_executor config: " << enable_executor_str << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
//
|
||||
// if (type == "DISK") {
|
||||
// hasDisk = true;
|
||||
// } else if (type == "CPU") {
|
||||
// hasCPU = true;
|
||||
// if (resource_conf.GetBoolValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, false)) {
|
||||
// hasExecutor = true;
|
||||
// }
|
||||
// }
|
||||
// else if (type == "GPU") {
|
||||
// int build_index_gpu_index = GetConfig(CONFIG_DB).GetInt32Value(CONFIG_DB_BUILD_INDEX_GPU, 0);
|
||||
// if (device_id == build_index_gpu_index) {
|
||||
// resource_valid_flag = true;
|
||||
// }
|
||||
// if (resource_conf.GetBoolValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, false)) {
|
||||
// hasExecutor = true;
|
||||
// }
|
||||
// std::string gpu_resource_num_str = resource_conf.GetValue(CONFIG_RESOURCE_NUM, "2");
|
||||
// if (ValidationUtil::ValidateStringIsNumber(gpu_resource_num_str) != SERVER_SUCCESS) {
|
||||
// std::cerr << "ERROR: gpu_resource_num " << gpu_resource_num_str << " is not a number" << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
// bool mem_valid = true;
|
||||
// std::string pinned_memory_str = resource_conf.GetValue(CONFIG_RESOURCE_PIN_MEMORY, "300");
|
||||
// if (ValidationUtil::ValidateStringIsNumber(pinned_memory_str) != SERVER_SUCCESS) {
|
||||
// std::cerr << "ERROR: pinned_memory " << pinned_memory_str << " is not a number" << std::endl;
|
||||
// okay = false;
|
||||
// mem_valid = false;
|
||||
// }
|
||||
// std::string temp_memory_str = resource_conf.GetValue(CONFIG_RESOURCE_TEMP_MEMORY, "300");
|
||||
// if (ValidationUtil::ValidateStringIsNumber(temp_memory_str) != SERVER_SUCCESS) {
|
||||
// std::cerr << "ERROR: temp_memory " << temp_memory_str << " is not a number" << std::endl;
|
||||
// okay = false;
|
||||
// mem_valid = false;
|
||||
// }
|
||||
// if (mem_valid) {
|
||||
// size_t gpu_memory;
|
||||
// if (ValidationUtil::GetGpuMemory(device_id, gpu_memory) != SERVER_SUCCESS) {
|
||||
// std::cerr << "ERROR: could not get gpu memory for device " << device_id << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
// else {
|
||||
// size_t prealoc_mem = std::stol(pinned_memory_str) + std::stol(temp_memory_str);
|
||||
// if (prealoc_mem >= gpu_memory) {
|
||||
// std::cerr << "ERROR: sum of pinned_memory and temp_memory " << prealoc_mem
|
||||
// << " exceeds total gpu memory " << gpu_memory << " for device " << device_id << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if (!resource_valid_flag) {
|
||||
// std::cerr << "Building index GPU can't be found in resource config." << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
// if (!hasDisk || !hasCPU) {
|
||||
// std::cerr << "No DISK or CPU resource" << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
// if (!hasExecutor) {
|
||||
// std::cerr << "No CPU or GPU resource has executor enabled" << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
//
|
||||
// auto connections = resource_config.GetChild(CONFIG_RESOURCE_CONNECTIONS).GetChildren();
|
||||
// for (auto &connection : connections) {
|
||||
// auto &connection_conf = connection.second;
|
||||
//
|
||||
// std::string speed_str = connection_conf.GetValue(CONFIG_SPEED_CONNECTIONS);
|
||||
// if (ValidationUtil::ValidateStringIsNumber(speed_str) != SERVER_SUCCESS) {
|
||||
// std::cerr << "ERROR: speed " << speed_str << " is not a number" << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
//
|
||||
// std::string endpoint_str = connection_conf.GetValue(CONFIG_ENDPOINT_CONNECTIONS);
|
||||
// std::string delimiter = "===";
|
||||
// auto delimiter_pos = endpoint_str.find(delimiter);
|
||||
// if (delimiter_pos == std::string::npos) {
|
||||
// std::cerr << "ERROR: invalid endpoint format: " << endpoint_str << std::endl;
|
||||
// okay = false;
|
||||
// } else {
|
||||
// std::string left_resource = endpoint_str.substr(0, delimiter_pos);
|
||||
// if (resource_list.find(left_resource) == resource_list.end()) {
|
||||
// std::cerr << "ERROR: left resource " << left_resource << " does not exist" << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
// std::string right_resource = endpoint_str.substr(delimiter_pos + delimiter.length(), endpoint_str.length());
|
||||
// if (resource_list.find(right_resource) == resource_list.end()) {
|
||||
// std::cerr << "ERROR: right resource " << right_resource << " does not exist" << std::endl;
|
||||
// okay = false;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT);
|
||||
// return SERVER_SUCCESS;
|
||||
//}
|
||||
|
||||
void
|
||||
ServerConfig::PrintAll() const {
|
||||
if (const ConfigMgr *mgr = ConfigMgr::GetInstance()) {
|
||||
|
||||
@ -79,12 +79,9 @@ GrpcRequestHandler::Search(::grpc::ServerContext *context,
|
||||
BaseTaskPtr task_ptr = SearchTask::Create(request, file_id_array, response);
|
||||
::milvus::grpc::Status grpc_status;
|
||||
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
|
||||
if (grpc_status.error_code() != SERVER_SUCCESS) {
|
||||
::grpc::Status status(::grpc::INVALID_ARGUMENT, grpc_status.reason());
|
||||
return status;
|
||||
} else {
|
||||
return ::grpc::Status::OK;
|
||||
}
|
||||
response->mutable_status()->set_error_code(grpc_status.error_code());
|
||||
response->mutable_status()->set_reason(grpc_status.reason());
|
||||
return ::grpc::Status::OK;
|
||||
}
|
||||
|
||||
::grpc::Status
|
||||
@ -100,12 +97,9 @@ GrpcRequestHandler::SearchInFiles(::grpc::ServerContext *context,
|
||||
BaseTaskPtr task_ptr = SearchTask::Create(request_mutable->mutable_search_param(), file_id_array, response);
|
||||
::milvus::grpc::Status grpc_status;
|
||||
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
|
||||
if (grpc_status.error_code() != SERVER_SUCCESS) {
|
||||
::grpc::Status status(::grpc::INVALID_ARGUMENT, grpc_status.reason());
|
||||
return status;
|
||||
} else {
|
||||
return ::grpc::Status::OK;
|
||||
}
|
||||
response->mutable_status()->set_error_code(grpc_status.error_code());
|
||||
response->mutable_status()->set_reason(grpc_status.reason());
|
||||
return ::grpc::Status::OK;
|
||||
}
|
||||
|
||||
::grpc::Status
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
#include "GrpcMilvusServer.h"
|
||||
#include "db/Utils.h"
|
||||
#include "scheduler/SchedInst.h"
|
||||
//#include <gperftools/profiler.h>
|
||||
|
||||
#include "src/server/Server.h"
|
||||
|
||||
@ -411,7 +412,6 @@ InsertTask::InsertTask(const ::milvus::grpc::InsertParam *insert_param,
|
||||
: GrpcBaseTask(DDL_DML_TASK_GROUP),
|
||||
insert_param_(insert_param),
|
||||
record_ids_(record_ids) {
|
||||
record_ids_->Clear();
|
||||
}
|
||||
|
||||
BaseTaskPtr
|
||||
@ -474,8 +474,7 @@ InsertTask::OnExecute() {
|
||||
rc.RecordSection("check validation");
|
||||
|
||||
#ifdef MILVUS_ENABLE_PROFILING
|
||||
std::string fname = "/tmp/insert_" + std::to_string(this->record_array_.size()) +
|
||||
"_" + GetCurrTimeStr() + ".profiling";
|
||||
std::string fname = "/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size()) + ".profiling";
|
||||
ProfilerStart(fname.c_str());
|
||||
#endif
|
||||
|
||||
@ -628,12 +627,6 @@ SearchTask::OnExecute() {
|
||||
|
||||
double span_check = rc.RecordSection("check validation");
|
||||
|
||||
#ifdef MILVUS_ENABLE_PROFILING
|
||||
std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) +
|
||||
"_top_" + std::to_string(this->top_k_) + "_" +
|
||||
GetCurrTimeStr() + ".profiling";
|
||||
ProfilerStart(fname.c_str());
|
||||
#endif
|
||||
|
||||
//step 5: prepare float data
|
||||
auto record_array_size = search_param_->query_record_array_size();
|
||||
@ -660,6 +653,11 @@ SearchTask::OnExecute() {
|
||||
engine::QueryResults results;
|
||||
auto record_count = (uint64_t) search_param_->query_record_array().size();
|
||||
|
||||
#ifdef MILVUS_ENABLE_PROFILING
|
||||
std::string fname = "/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size()) + ".profiling";
|
||||
ProfilerStart(fname.c_str());
|
||||
#endif
|
||||
|
||||
if (file_id_array_.empty()) {
|
||||
stat = DBWrapper::DB()->Query(table_name_, (size_t) top_k, record_count, nprobe, vec_f.data(),
|
||||
dates, results);
|
||||
@ -668,6 +666,10 @@ SearchTask::OnExecute() {
|
||||
record_count, nprobe, vec_f.data(), dates, results);
|
||||
}
|
||||
|
||||
#ifdef MILVUS_ENABLE_PROFILING
|
||||
ProfilerStop();
|
||||
#endif
|
||||
|
||||
rc.RecordSection("search vectors from engine");
|
||||
if (!stat.ok()) {
|
||||
return SetError(DB_META_TRANSACTION_FAILED, stat.ToString());
|
||||
@ -693,10 +695,6 @@ SearchTask::OnExecute() {
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef MILVUS_ENABLE_PROFILING
|
||||
ProfilerStop();
|
||||
#endif
|
||||
|
||||
//step 8: print time cost percent
|
||||
rc.RecordSection("construct result and send");
|
||||
rc.ElapseFromBegin("totally cost");
|
||||
@ -833,9 +831,7 @@ DeleteByRangeTask::OnExecute() {
|
||||
}
|
||||
|
||||
#ifdef MILVUS_ENABLE_PROFILING
|
||||
std::string fname = "/tmp/search_nq_" + std::to_string(this->record_array_.size()) +
|
||||
"_top_" + std::to_string(this->top_k_) + "_" +
|
||||
GetCurrTimeStr() + ".profiling";
|
||||
std::string fname = "/tmp/search_nq_" + this->delete_by_range_param_->table_name() + ".profiling";
|
||||
ProfilerStart(fname.c_str());
|
||||
#endif
|
||||
engine::Status status = DBWrapper::DB()->DeleteTable(table_name, dates);
|
||||
|
||||
@ -224,6 +224,9 @@ ValidationUtil::ValidateDbURI(const std::string &uri) {
|
||||
okay = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Could be DNS, skip checking
|
||||
*
|
||||
std::string host = pieces_match[4].str();
|
||||
if (!host.empty() && host != "localhost") {
|
||||
if (ValidateIpAddress(host) != SERVER_SUCCESS) {
|
||||
@ -231,6 +234,7 @@ ValidationUtil::ValidateDbURI(const std::string &uri) {
|
||||
okay = false;
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
std::string port = pieces_match[5].str();
|
||||
if (!port.empty()) {
|
||||
|
||||
@ -6,29 +6,85 @@
|
||||
include_directories(${MILVUS_ENGINE_SRC}/)
|
||||
include_directories(/usr/include)
|
||||
|
||||
include_directories(/usr/include/mysql)
|
||||
|
||||
include_directories(/usr/local/cuda/include)
|
||||
link_directories(/usr/local/cuda/lib64)
|
||||
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db db_main_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/engine db_engine_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/insert db_insert_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/meta db_meta_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/cache cache_srcs)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper wrapper_src)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/wrapper/knowhere knowhere_src)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs)
|
||||
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/server server_src)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/server/grpc_impl grpc_server_src)
|
||||
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
|
||||
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files)
|
||||
|
||||
aux_source_directory(./ test_srcs)
|
||||
|
||||
set(db_scheduler_srcs
|
||||
${scheduler_files}
|
||||
${scheduler_context_files}
|
||||
${scheduler_task_files}
|
||||
)
|
||||
|
||||
set(db_src
|
||||
${config_files}
|
||||
${cache_srcs}
|
||||
${db_main_files}
|
||||
${db_engine_files}
|
||||
${db_insert_files}
|
||||
${db_meta_files}
|
||||
${db_scheduler_srcs}
|
||||
${wrapper_src}
|
||||
${scheduler_action_srcs}
|
||||
${scheduler_event_srcs}
|
||||
${scheduler_resource_srcs}
|
||||
${scheduler_task_srcs}
|
||||
${scheduler_srcs}
|
||||
${knowhere_src}
|
||||
${util_files}
|
||||
${require_files}
|
||||
${test_srcs}
|
||||
)
|
||||
|
||||
set(utils_srcs
|
||||
${MILVUS_ENGINE_SRC}/utils/StringHelpFunctions.cpp
|
||||
${MILVUS_ENGINE_SRC}/utils/TimeRecorder.cpp
|
||||
${MILVUS_ENGINE_SRC}/utils/CommonUtil.cpp
|
||||
${MILVUS_ENGINE_SRC}/utils/LogUtil.cpp
|
||||
${MILVUS_ENGINE_SRC}/utils/ValidationUtil.cpp
|
||||
${MILVUS_ENGINE_SRC}/utils/SignalUtil.cpp
|
||||
)
|
||||
|
||||
set(grpc_service_files
|
||||
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.grpc.pb.cc
|
||||
${MILVUS_ENGINE_SRC}/grpc/gen-milvus/milvus.pb.cc
|
||||
${MILVUS_ENGINE_SRC}/grpc/gen-status/status.grpc.pb.cc
|
||||
${MILVUS_ENGINE_SRC}/grpc/gen-status/status.pb.cc
|
||||
)
|
||||
|
||||
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-status)
|
||||
include_directories(${MILVUS_ENGINE_SRC}/grpc/gen-milvus)
|
||||
|
||||
cuda_add_executable(server_test
|
||||
${db_src}
|
||||
${unittest_srcs}
|
||||
${config_files}
|
||||
${cache_srcs}
|
||||
${wrapper_src}
|
||||
${test_srcs}
|
||||
${grpc_server_src}
|
||||
${server_src}
|
||||
${utils_srcs}
|
||||
${grpc_service_files}
|
||||
${require_files}
|
||||
)
|
||||
|
||||
@ -38,6 +94,7 @@ set(require_libs
|
||||
cudart
|
||||
cublas
|
||||
sqlite
|
||||
mysqlpp
|
||||
boost_system_static
|
||||
boost_filesystem_static
|
||||
snappy
|
||||
@ -46,6 +103,11 @@ set(require_libs
|
||||
zstd
|
||||
lz4
|
||||
pthread
|
||||
grpcpp_channelz
|
||||
grpc++
|
||||
grpc
|
||||
grpc_protobuf
|
||||
grpc_protoc
|
||||
)
|
||||
|
||||
target_link_libraries(server_test
|
||||
|
||||
478
cpp/unittest/server/rpc_test.cpp
Normal file
478
cpp/unittest/server/rpc_test.cpp
Normal file
@ -0,0 +1,478 @@
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
|
||||
// Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
// Proprietary and confidential.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
#include <gtest/gtest.h>
|
||||
#include <thread>
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
#include "server/Server.h"
|
||||
#include "server/grpc_impl/GrpcRequestHandler.h"
|
||||
#include "server/grpc_impl/GrpcRequestScheduler.h"
|
||||
#include "server/grpc_impl/GrpcRequestTask.h"
|
||||
#include "version.h"
|
||||
|
||||
#include "grpc/gen-milvus/milvus.grpc.pb.h"
|
||||
#include "grpc/gen-status/status.pb.h"
|
||||
|
||||
#include "server/DBWrapper.h"
|
||||
#include "server/ServerConfig.h"
|
||||
#include "scheduler/SchedInst.h"
|
||||
#include "scheduler/ResourceFactory.h"
|
||||
#include "utils/CommonUtil.h"
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
namespace server {
|
||||
namespace grpc {
|
||||
|
||||
static const char *TABLE_NAME = "test_grpc";
|
||||
static constexpr int64_t TABLE_DIM = 256;
|
||||
static constexpr int64_t INDEX_FILE_SIZE = 1024;
|
||||
static constexpr int64_t VECTOR_COUNT = 1000;
|
||||
static constexpr int64_t INSERT_LOOP = 10;
|
||||
constexpr int64_t SECONDS_EACH_HOUR = 3600;
|
||||
|
||||
class RpcHandlerTest : public testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
|
||||
auto res_mgr = engine::ResMgrInst::GetInstance();
|
||||
res_mgr->Clear();
|
||||
res_mgr->Add(engine::ResourceFactory::Create("disk", "DISK", 0, true, false));
|
||||
res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, true));
|
||||
res_mgr->Add(engine::ResourceFactory::Create("gtx1660", "GPU", 0, true, true));
|
||||
|
||||
auto default_conn = engine::Connection("IO", 500.0);
|
||||
auto PCIE = engine::Connection("IO", 11000.0);
|
||||
res_mgr->Connect("disk", "cpu", default_conn);
|
||||
res_mgr->Connect("cpu", "gtx1660", PCIE);
|
||||
res_mgr->Start();
|
||||
engine::SchedInst::GetInstance()->Start();
|
||||
|
||||
zilliz::milvus::engine::Options opt;
|
||||
|
||||
ConfigNode &db_config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
|
||||
db_config.SetValue(CONFIG_DB_URL, "sqlite://:@:/");
|
||||
db_config.SetValue(CONFIG_DB_PATH, "/tmp/milvus_test");
|
||||
db_config.SetValue(CONFIG_DB_SLAVE_PATH, "");
|
||||
db_config.SetValue(CONFIG_DB_ARCHIVE_DISK, "");
|
||||
db_config.SetValue(CONFIG_DB_ARCHIVE_DAYS, "");
|
||||
|
||||
ConfigNode &cache_config = ServerConfig::GetInstance().GetConfig(CONFIG_CACHE);
|
||||
cache_config.SetValue(CONFIG_INSERT_CACHE_IMMEDIATELY, "");
|
||||
|
||||
ConfigNode &engine_config = ServerConfig::GetInstance().GetConfig(CONFIG_ENGINE);
|
||||
engine_config.SetValue(CONFIG_OMP_THREAD_NUM, "");
|
||||
|
||||
ConfigNode &serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER);
|
||||
// serverConfig.SetValue(CONFIG_CLUSTER_MODE, "cluster");
|
||||
// DBWrapper::GetInstance().GetInstance().StartService();
|
||||
// DBWrapper::GetInstance().GetInstance().StopService();
|
||||
//
|
||||
// serverConfig.SetValue(CONFIG_CLUSTER_MODE, "read_only");
|
||||
// DBWrapper::GetInstance().GetInstance().StartService();
|
||||
// DBWrapper::GetInstance().GetInstance().StopService();
|
||||
|
||||
serverConfig.SetValue(CONFIG_CLUSTER_MODE, "single");
|
||||
DBWrapper::GetInstance().GetInstance().StartService();
|
||||
|
||||
//initialize handler, create table
|
||||
handler = std::make_shared<GrpcRequestHandler>();
|
||||
::grpc::ServerContext context;
|
||||
::milvus::grpc::TableSchema request;
|
||||
::milvus::grpc::Status status;
|
||||
request.mutable_table_name()->set_table_name(TABLE_NAME);
|
||||
request.set_dimension(TABLE_DIM);
|
||||
request.set_index_file_size(INDEX_FILE_SIZE);
|
||||
request.set_metric_type(1);
|
||||
::grpc::Status grpc_status = handler->CreateTable(&context, &request, &status);
|
||||
}
|
||||
|
||||
void
|
||||
TearDown() override {
|
||||
DBWrapper::GetInstance().StopService();
|
||||
engine::ResMgrInst::GetInstance()->Stop();
|
||||
engine::SchedInst::GetInstance()->Stop();
|
||||
boost::filesystem::remove_all("/tmp/milvus_test");
|
||||
}
|
||||
protected:
|
||||
std::shared_ptr<GrpcRequestHandler> handler;
|
||||
};
|
||||
|
||||
namespace {
|
||||
void BuildVectors(int64_t from, int64_t to,
|
||||
std::vector<std::vector<float >> &vector_record_array) {
|
||||
if (to <= from) {
|
||||
return;
|
||||
}
|
||||
|
||||
vector_record_array.clear();
|
||||
for (int64_t k = from; k < to; k++) {
|
||||
std::vector<float> record;
|
||||
record.resize(TABLE_DIM);
|
||||
for (int64_t i = 0; i < TABLE_DIM; i++) {
|
||||
record[i] = (float) (k % (i + 1));
|
||||
}
|
||||
|
||||
vector_record_array.emplace_back(record);
|
||||
}
|
||||
}
|
||||
|
||||
std::string CurrentTmDate(int64_t offset_day = 0) {
|
||||
time_t tt;
|
||||
time(&tt);
|
||||
tt = tt + 8 * SECONDS_EACH_HOUR;
|
||||
tt = tt + 24 * SECONDS_EACH_HOUR * offset_day;
|
||||
tm *t = gmtime(&tt);
|
||||
|
||||
std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1)
|
||||
+ "-" + std::to_string(t->tm_mday);
|
||||
|
||||
return str;
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(RpcHandlerTest, HasTableTest) {
|
||||
::grpc::ServerContext context;
|
||||
::milvus::grpc::TableName request;
|
||||
::milvus::grpc::BoolReply reply;
|
||||
::grpc::Status status = handler->HasTable(&context, &request, &reply);
|
||||
request.set_table_name(TABLE_NAME);
|
||||
status = handler->HasTable(&context, &request, &reply);
|
||||
ASSERT_TRUE(status.error_code() == ::grpc::Status::OK.error_code());
|
||||
int error_code = reply.status().error_code();
|
||||
ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS);
|
||||
}
|
||||
|
||||
TEST_F(RpcHandlerTest, IndexTest) {
|
||||
::grpc::ServerContext context;
|
||||
::milvus::grpc::IndexParam request;
|
||||
::milvus::grpc::Status response;
|
||||
::grpc::Status grpc_status = handler->CreateIndex(&context, &request, &response);
|
||||
request.mutable_table_name()->set_table_name("test1");
|
||||
handler->CreateIndex(&context, &request, &response);
|
||||
|
||||
request.mutable_table_name()->set_table_name(TABLE_NAME);
|
||||
handler->CreateIndex(&context, &request, &response);
|
||||
|
||||
request.mutable_index()->set_index_type(1);
|
||||
handler->CreateIndex(&context, &request, &response);
|
||||
|
||||
request.mutable_index()->set_nlist(16384);
|
||||
grpc_status = handler->CreateIndex(&context, &request, &response);
|
||||
ASSERT_EQ(grpc_status.error_code(), ::grpc::Status::OK.error_code());
|
||||
int error_code = response.error_code();
|
||||
// ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS);
|
||||
|
||||
::milvus::grpc::TableName table_name;
|
||||
::milvus::grpc::IndexParam index_param;
|
||||
handler->DescribeIndex(&context, &table_name, &index_param);
|
||||
table_name.set_table_name("test4");
|
||||
handler->DescribeIndex(&context, &table_name, &index_param);
|
||||
table_name.set_table_name(TABLE_NAME);
|
||||
handler->DescribeIndex(&context, &table_name, &index_param);
|
||||
::milvus::grpc::Status status;
|
||||
table_name.Clear();
|
||||
handler->DropIndex(&context, &table_name, &status);
|
||||
table_name.set_table_name("test5");
|
||||
handler->DropIndex(&context, &table_name, &status);
|
||||
table_name.set_table_name(TABLE_NAME);
|
||||
handler->DropIndex(&context, &table_name, &status);
|
||||
}
|
||||
|
||||
TEST_F(RpcHandlerTest, InsertTest) {
|
||||
::grpc::ServerContext context;
|
||||
::milvus::grpc::InsertParam request;
|
||||
::milvus::grpc::Status response;
|
||||
|
||||
request.set_table_name(TABLE_NAME);
|
||||
std::vector<std::vector<float>> record_array;
|
||||
BuildVectors(0, VECTOR_COUNT, record_array);
|
||||
::milvus::grpc::VectorIds vector_ids;
|
||||
for (auto &record : record_array) {
|
||||
::milvus::grpc::RowRecord *grpc_record = request.add_row_record_array();
|
||||
for (size_t i = 0; i < record.size(); i++) {
|
||||
grpc_record->add_vector_data(record[i]);
|
||||
}
|
||||
}
|
||||
handler->Insert(&context, &request, &vector_ids);
|
||||
ASSERT_EQ(vector_ids.vector_id_array_size(), VECTOR_COUNT);
|
||||
}
|
||||
|
||||
TEST_F(RpcHandlerTest, SearchTest) {
|
||||
::grpc::ServerContext context;
|
||||
::milvus::grpc::SearchParam request;
|
||||
::milvus::grpc::TopKQueryResultList response;
|
||||
//test null input
|
||||
handler->Search(&context, nullptr, &response);
|
||||
|
||||
//test invalid table name
|
||||
handler->Search(&context, &request, &response);
|
||||
|
||||
//test table not exist
|
||||
request.set_table_name("test3");
|
||||
handler->Search(&context, &request, &response);
|
||||
|
||||
//test invalid topk
|
||||
request.set_table_name(TABLE_NAME);
|
||||
handler->Search(&context, &request, &response);
|
||||
|
||||
//test invalid nprobe
|
||||
request.set_topk(10);
|
||||
handler->Search(&context, &request, &response);
|
||||
|
||||
//test empty query record array
|
||||
request.set_nprobe(32);
|
||||
handler->Search(&context, &request, &response);
|
||||
|
||||
std::vector<std::vector<float>> record_array;
|
||||
BuildVectors(0, VECTOR_COUNT, record_array);
|
||||
::milvus::grpc::InsertParam insert_param;
|
||||
for (auto &record : record_array) {
|
||||
::milvus::grpc::RowRecord *grpc_record = insert_param.add_row_record_array();
|
||||
for (size_t i = 0; i < record.size(); i++) {
|
||||
grpc_record->add_vector_data(record[i]);
|
||||
}
|
||||
}
|
||||
//insert vectors
|
||||
insert_param.set_table_name(TABLE_NAME);
|
||||
::milvus::grpc::VectorIds vector_ids;
|
||||
handler->Insert(&context, &insert_param, &vector_ids);
|
||||
sleep(7);
|
||||
|
||||
BuildVectors(0, 10, record_array);
|
||||
for (auto &record : record_array) {
|
||||
::milvus::grpc::RowRecord *row_record = request.add_query_record_array();
|
||||
for (auto &rec : record) {
|
||||
row_record->add_vector_data(rec);
|
||||
}
|
||||
}
|
||||
handler->Search(&context, &request, &response);
|
||||
|
||||
//test search with range
|
||||
::milvus::grpc::Range *range = request.mutable_query_range_array()->Add();
|
||||
range->set_start_value(CurrentTmDate(-2));
|
||||
range->set_end_value(CurrentTmDate(-3));
|
||||
handler->Search(&context, &request, &response);
|
||||
request.mutable_query_range_array()->Clear();
|
||||
|
||||
request.set_table_name("test2");
|
||||
handler->Search(&context, &request, &response);
|
||||
request.set_table_name(TABLE_NAME);
|
||||
handler->Search(&context, &request, &response);
|
||||
|
||||
::milvus::grpc::SearchInFilesParam search_in_files_param;
|
||||
std::string *file_id = search_in_files_param.add_file_id_array();
|
||||
*file_id = "test_tbl";
|
||||
handler->SearchInFiles(&context, &search_in_files_param, &response);
|
||||
}
|
||||
|
||||
TEST_F(RpcHandlerTest, TablesTest) {
|
||||
::grpc::ServerContext context;
|
||||
::milvus::grpc::TableSchema tableschema;
|
||||
::milvus::grpc::Status response;
|
||||
std::string tablename = "tbl";
|
||||
|
||||
//create table test
|
||||
//test null input
|
||||
handler->CreateTable(&context, nullptr, &response);
|
||||
//test invalid table name
|
||||
handler->CreateTable(&context, &tableschema, &response);
|
||||
//test invalid table dimension
|
||||
tableschema.mutable_table_name()->set_table_name(tablename);
|
||||
handler->CreateTable(&context, &tableschema, &response);
|
||||
//test invalid index file size
|
||||
tableschema.set_dimension(TABLE_DIM);
|
||||
// handler->CreateTable(&context, &tableschema, &response);
|
||||
//test invalid index metric type
|
||||
tableschema.set_index_file_size(INDEX_FILE_SIZE);
|
||||
handler->CreateTable(&context, &tableschema, &response);
|
||||
//test table already exist
|
||||
tableschema.set_metric_type(1);
|
||||
handler->CreateTable(&context, &tableschema, &response);
|
||||
|
||||
//describe table test
|
||||
//test invalid table name
|
||||
::milvus::grpc::TableName table_name;
|
||||
::milvus::grpc::TableSchema table_schema;
|
||||
handler->DescribeTable(&context, &table_name, &table_schema);
|
||||
|
||||
table_name.set_table_name(TABLE_NAME);
|
||||
::grpc::Status status = handler->DescribeTable(&context, &table_name, &table_schema);
|
||||
ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code());
|
||||
|
||||
|
||||
::milvus::grpc::InsertParam request;
|
||||
std::vector<std::vector<float>> record_array;
|
||||
BuildVectors(0, VECTOR_COUNT, record_array);
|
||||
::milvus::grpc::VectorIds vector_ids;
|
||||
//Insert vectors
|
||||
//test invalid table name
|
||||
handler->Insert(&context, &request, &vector_ids);
|
||||
request.set_table_name(tablename);
|
||||
//test empty row record
|
||||
handler->Insert(&context, &request, &vector_ids);
|
||||
|
||||
for (auto &record : record_array) {
|
||||
::milvus::grpc::RowRecord *grpc_record = request.add_row_record_array();
|
||||
for (size_t i = 0; i < record.size(); i++) {
|
||||
grpc_record->add_vector_data(record[i]);
|
||||
}
|
||||
}
|
||||
//test vector_id size not equal to row record size
|
||||
vector_ids.clear_vector_id_array();
|
||||
vector_ids.add_vector_id_array(1);
|
||||
handler->Insert(&context, &request, &vector_ids);
|
||||
|
||||
//normally test
|
||||
vector_ids.clear_vector_id_array();
|
||||
handler->Insert(&context, &request, &vector_ids);
|
||||
|
||||
request.clear_row_record_array();
|
||||
vector_ids.clear_vector_id_array();
|
||||
for (uint64_t i = 0; i < 10; ++i) {
|
||||
::milvus::grpc::RowRecord *grpc_record = request.add_row_record_array();
|
||||
for (size_t j = 0; j < 10; j++) {
|
||||
grpc_record->add_vector_data(record_array[i][j]);
|
||||
}
|
||||
}
|
||||
handler->Insert(&context, &request, &vector_ids);
|
||||
|
||||
|
||||
//Show table
|
||||
// ::milvus::grpc::Command cmd;
|
||||
// ::grpc::ServerWriter<::milvus::grpc::TableName> *writer;
|
||||
// status = handler->ShowTables(&context, &cmd, writer);
|
||||
// ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code());
|
||||
|
||||
//Count Table
|
||||
::milvus::grpc::TableRowCount count;
|
||||
table_name.Clear();
|
||||
status = handler->CountTable(&context, &table_name, &count);
|
||||
table_name.set_table_name(tablename);
|
||||
status = handler->CountTable(&context, &table_name, &count);
|
||||
ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code());
|
||||
// ASSERT_EQ(count.table_row_count(), vector_ids.vector_id_array_size());
|
||||
|
||||
|
||||
//Preload Table
|
||||
table_name.Clear();
|
||||
status = handler->PreloadTable(&context, &table_name, &response);
|
||||
table_name.set_table_name(TABLE_NAME);
|
||||
status = handler->PreloadTable(&context, &table_name, &response);
|
||||
ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code());
|
||||
|
||||
//Drop table
|
||||
table_name.set_table_name("");
|
||||
//test invalid table name
|
||||
::grpc::Status grpc_status = handler->DropTable(&context, &table_name, &response);
|
||||
table_name.set_table_name(tablename);
|
||||
grpc_status = handler->DropTable(&context, &table_name, &response);
|
||||
ASSERT_EQ(grpc_status.error_code(), ::grpc::Status::OK.error_code());
|
||||
int error_code = status.error_code();
|
||||
ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS);
|
||||
}
|
||||
|
||||
TEST_F(RpcHandlerTest, CmdTest) {
|
||||
::grpc::ServerContext context;
|
||||
::milvus::grpc::Command command;
|
||||
command.set_cmd("version");
|
||||
::milvus::grpc::StringReply reply;
|
||||
handler->Cmd(&context, &command, &reply);
|
||||
ASSERT_EQ(reply.string_reply(), MILVUS_VERSION);
|
||||
|
||||
command.set_cmd("tasktable");
|
||||
handler->Cmd(&context, &command, &reply);
|
||||
command.set_cmd("test");
|
||||
handler->Cmd(&context, &command, &reply);
|
||||
}
|
||||
|
||||
TEST_F(RpcHandlerTest, DeleteByRangeTest) {
|
||||
::grpc::ServerContext context;
|
||||
::milvus::grpc::DeleteByRangeParam request;
|
||||
::milvus::grpc::Status status;
|
||||
handler->DeleteByRange(&context, nullptr, &status);
|
||||
handler->DeleteByRange(&context, &request, &status);
|
||||
|
||||
request.set_table_name(TABLE_NAME);
|
||||
request.mutable_range()->set_start_value(CurrentTmDate(-2));
|
||||
request.mutable_range()->set_end_value(CurrentTmDate(-3));
|
||||
|
||||
::grpc::Status grpc_status = handler->DeleteByRange(&context, &request, &status);
|
||||
int error_code = status.error_code();
|
||||
ASSERT_EQ(error_code, ::milvus::grpc::ErrorCode::SUCCESS);
|
||||
|
||||
request.mutable_range()->set_start_value("test6");
|
||||
grpc_status = handler->DeleteByRange(&context, &request, &status);
|
||||
request.mutable_range()->set_start_value(CurrentTmDate(-2));
|
||||
request.mutable_range()->set_end_value("test6");
|
||||
grpc_status = handler->DeleteByRange(&context, &request, &status);
|
||||
request.mutable_range()->set_end_value(CurrentTmDate(-2));
|
||||
grpc_status = handler->DeleteByRange(&context, &request, &status);
|
||||
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
class DummyTask : public GrpcBaseTask {
|
||||
public:
|
||||
ErrorCode
|
||||
OnExecute() override {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static BaseTaskPtr
|
||||
Create(std::string& dummy) {
|
||||
return std::shared_ptr<GrpcBaseTask>(new DummyTask(dummy));
|
||||
}
|
||||
|
||||
ErrorCode
|
||||
DummySetError(ErrorCode error_code, const std::string &msg) {
|
||||
return SetError(error_code, msg);
|
||||
}
|
||||
|
||||
public:
|
||||
explicit DummyTask(std::string &dummy) : GrpcBaseTask(dummy) {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
class RpcSchedulerTest : public testing::Test {
|
||||
protected:
|
||||
void
|
||||
SetUp() override {
|
||||
std::string dummy = "dql";
|
||||
task_ptr = std::make_shared<DummyTask>(dummy);
|
||||
}
|
||||
|
||||
std::shared_ptr<DummyTask> task_ptr;
|
||||
};
|
||||
|
||||
TEST_F(RpcSchedulerTest, BaseTaskTest){
|
||||
ErrorCode error_code = task_ptr->Execute();
|
||||
ASSERT_EQ(error_code, 0);
|
||||
|
||||
error_code = task_ptr->DummySetError(0, "test error");
|
||||
ASSERT_EQ(error_code, 0);
|
||||
|
||||
GrpcRequestScheduler::GetInstance().Start();
|
||||
::milvus::grpc::Status grpc_status;
|
||||
std::string dummy = "dql";
|
||||
BaseTaskPtr base_task_ptr = DummyTask::Create(dummy);
|
||||
GrpcRequestScheduler::GetInstance().ExecTask(base_task_ptr, &grpc_status);
|
||||
|
||||
GrpcRequestScheduler::GetInstance().ExecuteTask(task_ptr);
|
||||
task_ptr = nullptr;
|
||||
GrpcRequestScheduler::GetInstance().ExecuteTask(task_ptr);
|
||||
|
||||
GrpcRequestScheduler::GetInstance().Stop();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user