Merge branch 'release-v1' into 'jinhai'

Release v1

See merge request megasearch/vecwise_engine!26

Former-commit-id: c5fee89765b03603a67274e4756a46ad01300285
This commit is contained in:
yhmo 2019-05-15 16:40:57 +08:00
commit e3b3ba4363
46 changed files with 811 additions and 4361 deletions

1
.gitignore vendored
View File

@ -14,3 +14,4 @@ cmake_build
cpp/third_party/thrift-0.12.0/
cpp/third_party/faiss-1.5.1
cpp/megasearch/

View File

@ -83,8 +83,8 @@ add_custom_target(Clean-All COMMAND ${CMAKE_BUILD_TOOL} clean)
#install
install(FILES
start_server.sh
stop_server.sh
scripts/start_server.sh
scripts/stop_server.sh
DESTINATION
scripts)
install(FILES

View File

@ -23,6 +23,7 @@ cmake_build/src/libvecwise_engine.a is the static library
cd [sourcecode path]/cpp
./build.sh -t Debug
./build.sh -t Release
./build.sh -g # Build GPU version
#### To build unittest:

View File

@ -1,5 +1,5 @@
server_config:
address: 127.0.0.1
address: 0.0.0.0
port: 33001
transfer_protocol: json #optional: binary, compact, json, debug
server_mode: thread_pool #optional: simple, thread_pool

4
cpp/scripts/start_server.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
../bin/vecwise_server -c ../conf/server_config.yaml -l ../conf/vecwise_engine_log.conf

16
cpp/scripts/stop_server.sh Executable file
View File

@ -0,0 +1,16 @@
#!/bin/bash
function kill_progress()
{
kill -s SIGUSR2 $(pgrep $1)
sleep 2
}
STATUS=$(kill_progress "vecwise_server" )
if [[ ${STATUS} == "false" ]];then
echo "vecwise_server closed abnormally!"
else
echo "vecwise_server closed successfully!"
fi

View File

@ -23,8 +23,8 @@ set(license_generator_src
set(service_files
thrift/gen-cpp/VecService.cpp
thrift/gen-cpp/VectorService_constants.cpp
thrift/gen-cpp/VectorService_types.cpp
thrift/gen-cpp/megasearch_constants.cpp
thrift/gen-cpp/megasearch_types.cpp
)
set(vecwise_engine_src

View File

@ -107,7 +107,7 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
using SearchResult = std::pair<std::vector<long>, std::vector<float>>;
std::vector<SearchResult> batchresult(nq); // allocate nq cells.
auto cluster = [&](long *nns, float *dis) -> void {
auto cluster = [&](long *nns, float *dis, const int& k) -> void {
for (int i = 0; i < nq; ++i) {
auto f_begin = batchresult[i].first.cbegin();
auto s_begin = batchresult[i].second.cbegin();
@ -134,8 +134,10 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
search_set_size += file_size;
LOG(DEBUG) << "Search file_type " << file.file_type << " Of Size: "
<< file_size << " M";
index.Search(nq, vectors, k, output_distence, output_ids);
cluster(output_ids, output_distence); // cluster to each query
int inner_k = index.Count() < k ? index.Count() : k;
index.Search(nq, vectors, inner_k, output_distence, output_ids);
cluster(output_ids, output_distence, inner_k); // cluster to each query
memset(output_distence, 0, k * nq * sizeof(float));
memset(output_ids, 0, k * nq * sizeof(long));
}
@ -145,15 +147,25 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
const int &k,
float *output_distence,
long *output_ids) -> void {
std::map<float, int> inverted_table;
std::map<float, std::vector<int>> inverted_table;
for (int i = 0; i < input_data.size(); ++i) {
inverted_table[input_data[i]] = i;
if (inverted_table.count(input_data[i]) == 1) {
auto& ori_vec = inverted_table[input_data[i]];
ori_vec.push_back(i);
}
else {
inverted_table[input_data[i]] = std::vector<int>{i};
}
}
int count = 0;
for (auto it = inverted_table.begin(); it != inverted_table.end() && count < k; ++it, ++count) {
output_distence[count] = it->first;
output_ids[count] = it->second;
for (auto &item : inverted_table){
if (count == k) break;
for (auto &id : item.second){
output_distence[count] = item.first;
output_ids[count] = id;
if (++count == k) break;
}
}
};
auto cluster_topk = [&]() -> void {
@ -161,8 +173,11 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
for (auto &result_pair : batchresult) {
auto &dis = result_pair.second;
auto &nns = result_pair.first;
topk_cpu(dis, k, output_distence, output_ids);
for (int i = 0; i < k; ++i) {
int inner_k = dis.size() < k ? dis.size() : k;
for (int i = 0; i < inner_k; ++i) {
res.emplace_back(nns[output_ids[i]]); // mapping
}
results.push_back(res); // append to result list

View File

@ -64,13 +64,17 @@ Status FaissExecutionEngine<IndexTrait>::Serialize() {
template<class IndexTrait>
Status FaissExecutionEngine<IndexTrait>::Load() {
auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool to_cache = false;
if (!index) {
index = read_index(location_);
Cache();
to_cache = true;
LOG(DEBUG) << "Disk io from: " << location_;
}
pIndex_ = index->data();
if (to_cache) {
Cache();
}
return Status::OK();
}

View File

@ -0,0 +1,242 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "RocksIdMapper.h"
#include "ServerConfig.h"
#include "utils/Log.h"
#include "utils/CommonUtil.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include <exception>
namespace zilliz {
namespace vecwise {
namespace server {
RocksIdMapper::RocksIdMapper()
: db_(nullptr) {
OpenDb();
}
RocksIdMapper::~RocksIdMapper() {
CloseDb();
}
void RocksIdMapper::OpenDb() {
if(db_) {
return;
}
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
std::string db_path = config.GetValue(CONFIG_DB_PATH);
db_path += "/id_mapping";
CommonUtil::CreateDirectory(db_path);
rocksdb::Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;
options.max_open_files = config.GetInt32Value(CONFIG_DB_IDMAPPER_MAX_FILE, 512);
//load column families
std::vector<std::string> column_names;
rocksdb::Status s = rocksdb::DB::ListColumnFamilies(options, db_path, &column_names);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
}
if(column_names.empty()) {
column_names.push_back("default");
}
SERVER_LOG_INFO << "ID mapper has " << std::to_string(column_names.size()) << " groups";
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
for(auto& column_name : column_names) {
rocksdb::ColumnFamilyDescriptor desc;
desc.name = column_name;
column_families.emplace_back(desc);
}
// open DB
std::vector<rocksdb::ColumnFamilyHandle*> column_handles;
s = rocksdb::DB::Open(options, db_path, column_families, &column_handles, &db_);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
db_ = nullptr;
}
column_handles_.clear();
for(auto handler : column_handles) {
column_handles_.insert(std::make_pair(handler->GetName(), handler));
}
}
void RocksIdMapper::CloseDb() {
for(auto& iter : column_handles_) {
delete iter.second;
}
column_handles_.clear();
if(db_) {
db_->Close();
delete db_;
}
}
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Slice value(sid);
if(group.empty()) {//to default group
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), key, value);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
} else {
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) == 0) {
try {//add group
rocksdb::Status s = db_->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), group, &cfh);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to create group:" << s.ToString();
} else {
column_handles_.insert(std::make_pair(group, cfh));
}
} catch(std::exception& ex) {
std::cout << ex.what() << std::endl;
}
} else {
cfh = column_handles_[group];
}
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), cfh, key, value);
if (!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
err = Put(nid[i], sid[i], group);
if(err != SERVER_SUCCESS) {
return err;
}
}
return err;
}
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
sid = "";
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) != 0) {
cfh = column_handles_.at(group);
}
rocksdb::Slice key(nid);
rocksdb::Status s;
if(cfh){
s = db_->Get(rocksdb::ReadOptions(), cfh, key, &sid);
} else {
s = db_->Get(rocksdb::ReadOptions(), key, &sid);
}
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to get:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear();
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
std::string str_id;
ServerError temp_err = Get(nid[i], str_id, group);
if(temp_err != SERVER_SUCCESS) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
err = temp_err;
continue;
}
sid.push_back(str_id);
}
return err;
}
ServerError RocksIdMapper::Delete(const std::string& nid, const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) != 0) {
cfh = column_handles_.at(group);
}
rocksdb::Slice key(nid);
rocksdb::Status s;
if(cfh){
s = db_->Delete(rocksdb::WriteOptions(), cfh, key);
} else {
s = db_->Delete(rocksdb::WriteOptions(), key);
}
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to delete:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::DeleteGroup(const std::string& group) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::ColumnFamilyHandle *cfh = nullptr;
if(column_handles_.count(group) != 0) {
cfh = column_handles_.at(group);
}
if(cfh) {
db_->DropColumnFamily(cfh);
db_->DestroyColumnFamilyHandle(cfh);
column_handles_.erase(group);
}
return SERVER_SUCCESS;
}
}
}
}

View File

@ -0,0 +1,46 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "utils/Error.h"
#include "VecIdMapper.h"
#include "rocksdb/db.h"
#include <string>
#include <vector>
#include <unordered_map>
namespace zilliz {
namespace vecwise {
namespace server {
class RocksIdMapper : public IVecIdMapper{
public:
RocksIdMapper();
~RocksIdMapper();
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const override;
ServerError Delete(const std::string& nid, const std::string& group = "") override;
ServerError DeleteGroup(const std::string& group) override;
private:
void OpenDb();
void CloseDb();
private:
rocksdb::DB* db_;
std::unordered_map<std::string, rocksdb::ColumnFamilyHandle*> column_handles_;
};
}
}
}

View File

@ -5,6 +5,7 @@
******************************************************************************/
#include "VecIdMapper.h"
#include "RocksIdMapper.h"
#include "ServerConfig.h"
#include "utils/Log.h"
#include "utils/CommonUtil.h"
@ -13,6 +14,8 @@
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include <exception>
namespace zilliz {
namespace vecwise {
namespace server {
@ -36,26 +39,33 @@ SimpleIdMapper::~SimpleIdMapper() {
}
ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid) {
ids_[nid] = sid;
//not thread-safe
ServerError SimpleIdMapper::Put(const std::string& nid, const std::string& sid, const std::string& group) {
ID_MAPPING& mapping = id_groups_[group];
mapping[nid] = sid;
return SERVER_SUCCESS;
}
ServerError SimpleIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) {
//not thread-safe
ServerError SimpleIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ID_MAPPING& mapping = id_groups_[group];
for(size_t i = 0; i < nid.size(); i++) {
ids_[nid[i]] = sid[i];
mapping[nid[i]] = sid[i];
}
return SERVER_SUCCESS;
}
ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid) const {
auto iter = ids_.find(nid);
if(iter == ids_.end()) {
//not thread-safe
ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid, const std::string& group) const {
ID_MAPPING& mapping = id_groups_[group];
auto iter = mapping.find(nid);
if(iter == mapping.end()) {
return SERVER_INVALID_ARGUMENT;
}
@ -64,13 +74,16 @@ ServerError SimpleIdMapper::Get(const std::string& nid, std::string& sid) const
return SERVER_SUCCESS;
}
ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const {
//not thread-safe
ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group) const {
sid.clear();
ID_MAPPING& mapping = id_groups_[group];
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
auto iter = ids_.find(nid[i]);
if(iter == ids_.end()) {
auto iter = mapping.find(nid[i]);
if(iter == mapping.end()) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to find id: " << nid[i];
err = SERVER_INVALID_ARGUMENT;
@ -83,120 +96,16 @@ ServerError SimpleIdMapper::Get(const std::vector<std::string>& nid, std::vector
return err;
}
ServerError SimpleIdMapper::Delete(const std::string& nid) {
ids_.erase(nid);
//not thread-safe
ServerError SimpleIdMapper::Delete(const std::string& nid, const std::string& group) {
ID_MAPPING& mapping = id_groups_[group];
mapping.erase(nid);
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
RocksIdMapper::RocksIdMapper() {
ConfigNode& config = ServerConfig::GetInstance().GetConfig(CONFIG_DB);
std::string db_path = config.GetValue(CONFIG_DB_PATH);
db_path += "/id_mapping";
CommonUtil::CreateDirectory(db_path);
rocksdb::Options options;
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;
options.max_open_files = config.GetInt32Value(CONFIG_DB_IDMAPPER_MAX_FILE, 128);
// open DB
rocksdb::Status s = rocksdb::DB::Open(options, db_path, &db_);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to initialize:" << s.ToString();
db_ = nullptr;
}
}
RocksIdMapper::~RocksIdMapper() {
if(db_) {
db_->Close();
delete db_;
}
}
ServerError RocksIdMapper::Put(const std::string& nid, const std::string& sid) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Slice value(sid);
rocksdb::Status s = db_->Put(rocksdb::WriteOptions(), key, value);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to put:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) {
if(nid.size() != sid.size()) {
return SERVER_INVALID_ARGUMENT;
}
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
err = Put(nid[i], sid[i]);
if(err != SERVER_SUCCESS) {
return err;
}
}
return err;
}
ServerError RocksIdMapper::Get(const std::string& nid, std::string& sid) const {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Status s = db_->Get(rocksdb::ReadOptions(), key, &sid);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to get:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
return SERVER_SUCCESS;
}
ServerError RocksIdMapper::Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const {
sid.clear();
ServerError err = SERVER_SUCCESS;
for(size_t i = 0; i < nid.size(); i++) {
std::string str_id;
ServerError temp_err = Get(nid[i], str_id);
if(temp_err != SERVER_SUCCESS) {
sid.push_back("");
SERVER_LOG_ERROR << "ID mapper failed to get id: " << nid[i];
err = temp_err;
continue;
}
sid.push_back(str_id);
}
return err;
}
ServerError RocksIdMapper::Delete(const std::string& nid) {
if(db_ == nullptr) {
return SERVER_NULL_POINTER;
}
rocksdb::Slice key(nid);
rocksdb::Status s = db_->Delete(rocksdb::WriteOptions(), key);
if(!s.ok()) {
SERVER_LOG_ERROR << "ID mapper failed to delete:" << s.ToString();
return SERVER_UNEXPECTED_ERROR;
}
//not thread-safe
ServerError SimpleIdMapper::DeleteGroup(const std::string& group) {
id_groups_.erase(group);
return SERVER_SUCCESS;
}

View File

@ -25,14 +25,15 @@ public:
virtual ~IVecIdMapper(){}
virtual ServerError Put(const std::string& nid, const std::string& sid) = 0;
virtual ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) = 0;
virtual ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") = 0;
virtual ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") = 0;
virtual ServerError Get(const std::string& nid, std::string& sid) const = 0;
virtual ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const = 0;
//NOTE: the 'sid' will be cleared at begin of the function
virtual ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const = 0;
virtual ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const = 0;
virtual ServerError Delete(const std::string& nid) = 0;
virtual ServerError Delete(const std::string& nid, const std::string& group = "") = 0;
virtual ServerError DeleteGroup(const std::string& group) = 0;
};
class SimpleIdMapper : public IVecIdMapper{
@ -40,33 +41,18 @@ public:
SimpleIdMapper();
~SimpleIdMapper();
ServerError Put(const std::string& nid, const std::string& sid) override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) override;
ServerError Put(const std::string& nid, const std::string& sid, const std::string& group = "") override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid, const std::string& group = "") override;
ServerError Get(const std::string& nid, std::string& sid) const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const override;
ServerError Get(const std::string& nid, std::string& sid, const std::string& group = "") const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid, const std::string& group = "") const override;
ServerError Delete(const std::string& nid) override;
ServerError Delete(const std::string& nid, const std::string& group = "") override;
ServerError DeleteGroup(const std::string& group) override;
private:
std::unordered_map<std::string, std::string> ids_;
};
class RocksIdMapper : public IVecIdMapper{
public:
RocksIdMapper();
~RocksIdMapper();
ServerError Put(const std::string& nid, const std::string& sid) override;
ServerError Put(const std::vector<std::string>& nid, const std::vector<std::string>& sid) override;
ServerError Get(const std::string& nid, std::string& sid) const override;
ServerError Get(const std::vector<std::string>& nid, std::vector<std::string>& sid) const override;
ServerError Delete(const std::string& nid) override;
private:
rocksdb::DB* db_;
using ID_MAPPING = std::unordered_map<std::string, std::string>;
mutable std::unordered_map<std::string, ID_MAPPING> id_groups_;
};
}

View File

@ -6,7 +6,6 @@
#include "VecServiceHandler.h"
#include "VecServiceTask.h"
#include "ServerConfig.h"
#include "VecIdMapper.h"
#include "utils/Log.h"
#include "utils/CommonUtil.h"
#include "utils/TimeRecorder.h"
@ -18,17 +17,19 @@ namespace zilliz {
namespace vecwise {
namespace server {
using namespace megasearch;
namespace {
class TimeRecordWrapper {
public:
TimeRecordWrapper(const std::string& func_name)
: recorder_(func_name), func_name_(func_name) {
SERVER_LOG_TRACE << func_name << " called";
//SERVER_LOG_TRACE << func_name << " called";
}
~TimeRecordWrapper() {
recorder_.Elapse("cost");
SERVER_LOG_TRACE << func_name_ << " finished";
//SERVER_LOG_TRACE << func_name_ << " finished";
}
private:
@ -38,87 +39,128 @@ namespace {
void TimeRecord(const std::string& func_name) {
}
const std::map<ServerError, VecErrCode::type>& ErrorMap() {
static const std::map<ServerError, VecErrCode::type> code_map = {
{SERVER_UNEXPECTED_ERROR, VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_NULL_POINTER, VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_INVALID_ARGUMENT, VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_FILE_NOT_FOUND, VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_NOT_IMPLEMENT, VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_BLOCKING_QUEUE_EMPTY, VecErrCode::ILLEGAL_ARGUMENT},
{SERVER_GROUP_NOT_EXIST, VecErrCode::GROUP_NOT_EXISTS},
{SERVER_INVALID_TIME_RANGE, VecErrCode::ILLEGAL_TIME_RANGE},
{SERVER_INVALID_VECTOR_DIMENSION, VecErrCode::ILLEGAL_VECTOR_DIMENSION},
};
return code_map;
}
const std::map<ServerError, std::string>& ErrorMessage() {
static const std::map<ServerError, std::string> msg_map = {
{SERVER_UNEXPECTED_ERROR, "unexpected error occurs"},
{SERVER_NULL_POINTER, "null pointer error"},
{SERVER_INVALID_ARGUMENT, "invalid argument"},
{SERVER_FILE_NOT_FOUND, "file not found"},
{SERVER_NOT_IMPLEMENT, "not implemented"},
{SERVER_BLOCKING_QUEUE_EMPTY, "queue empty"},
{SERVER_GROUP_NOT_EXIST, "group not exist"},
{SERVER_INVALID_TIME_RANGE, "invalid time range"},
{SERVER_INVALID_VECTOR_DIMENSION, "invalid vector dimension"},
};
return msg_map;
}
void ExecTask(BaseTaskPtr& task_ptr) {
if(task_ptr == nullptr) {
return;
}
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
if(!task_ptr->IsAsync()) {
task_ptr->WaitToFinish();
ServerError err = task_ptr->ErrorCode();
if (err != SERVER_SUCCESS) {
VecException ex;
ex.__set_code(ErrorMap().at(err));
std::string msg = task_ptr->ErrorMsg();
if(msg.empty()){
msg = ErrorMessage().at(err);
}
ex.__set_reason(msg);
throw ex;
}
}
}
}
void
VecServiceHandler::add_group(const VecGroup &group) {
TimeRecordWrapper rc("add_group()");
SERVER_LOG_TRACE << "group.id = " << group.id << ", group.dimension = " << group.dimension
<< ", group.index_type = " << group.index_type;
std::string info = "add_group() " + group.id + " dimension = " + std::to_string(group.dimension)
+ " index_type = " + std::to_string(group.index_type);
TimeRecordWrapper rc(info);
BaseTaskPtr task_ptr = AddGroupTask::Create(group.dimension, group.id);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::get_group(VecGroup &_return, const std::string &group_id) {
TimeRecordWrapper rc("get_group()");
SERVER_LOG_TRACE << "group_id = " << group_id;
TimeRecordWrapper rc("get_group() " + group_id);
_return.id = group_id;
BaseTaskPtr task_ptr = GetGroupTask::Create(group_id, _return.dimension);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::del_group(const std::string &group_id) {
TimeRecordWrapper rc("del_group()");
SERVER_LOG_TRACE << "group_id = " << group_id;
TimeRecordWrapper rc("del_group() " + group_id);
BaseTaskPtr task_ptr = DeleteGroupTask::Create(group_id);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::add_vector(std::string& _return, const std::string &group_id, const VecTensor &tensor) {
TimeRecordWrapper rc("add_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector size = " << tensor.tensor.size();
TimeRecordWrapper rc("add_vector() to " + group_id);
BaseTaskPtr task_ptr = AddVectorTask::Create(group_id, &tensor, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::add_vector_batch(std::vector<std::string> & _return,
const std::string &group_id,
const VecTensorList &tensor_list) {
TimeRecordWrapper rc("add_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector list size = "
<< tensor_list.tensor_list.size();
TimeRecordWrapper rc("add_vector_batch() to " + group_id);
BaseTaskPtr task_ptr = AddBatchVectorTask::Create(group_id, &tensor_list, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::add_binary_vector(std::string& _return,
const std::string& group_id,
const VecBinaryTensor& tensor) {
TimeRecordWrapper rc("add_binary_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector size = " << tensor.tensor.size()/4;
TimeRecordWrapper rc("add_binary_vector() to " + group_id);
BaseTaskPtr task_ptr = AddVectorTask::Create(group_id, &tensor, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
VecServiceHandler::add_binary_vector_batch(std::vector<std::string> & _return,
const std::string& group_id,
const VecBinaryTensorList& tensor_list) {
TimeRecordWrapper rc("add_binary_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", vector list size = "
<< tensor_list.tensor_list.size();
TimeRecordWrapper rc("add_binary_vector_batch() to " + group_id);
BaseTaskPtr task_ptr = AddBatchVectorTask::Create(group_id, &tensor_list, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
@ -127,16 +169,13 @@ VecServiceHandler::search_vector(VecSearchResult &_return,
const int64_t top_k,
const VecTensor &tensor,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector dimension = " << tensor.tensor.size();
TimeRecordWrapper rc("search_vector() in " + group_id);
VecTensorList tensor_list;
tensor_list.tensor_list.push_back(tensor);
VecSearchResultList result;
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, result);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
if(!result.result_list.empty()) {
_return = result.result_list[0];
@ -151,13 +190,10 @@ VecServiceHandler::search_vector_batch(VecSearchResultList &_return,
const int64_t top_k,
const VecTensorList &tensor_list,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector list size = " << tensor_list.tensor_list.size();
TimeRecordWrapper rc("search_vector_batch() in " + group_id);
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}
void
@ -166,16 +202,13 @@ VecServiceHandler::search_binary_vector(VecSearchResult& _return,
const int64_t top_k,
const VecBinaryTensor& tensor,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_binary_vector()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector dimension = " << tensor.tensor.size();
TimeRecordWrapper rc("search_binary_vector() in " + group_id);
VecBinaryTensorList tensor_list;
tensor_list.tensor_list.push_back(tensor);
VecSearchResultList result;
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, result);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
if(!result.result_list.empty()) {
_return = result.result_list[0];
@ -190,13 +223,10 @@ VecServiceHandler::search_binary_vector_batch(VecSearchResultList& _return,
const int64_t top_k,
const VecBinaryTensorList& tensor_list,
const VecSearchFilter& filter) {
TimeRecordWrapper rc("search_binary_vector_batch()");
SERVER_LOG_TRACE << "group_id = " << group_id << ", top_k = " << top_k
<< ", vector list size = " << tensor_list.tensor_list.size();
TimeRecordWrapper rc("search_binary_vector_batch() in " + group_id);
BaseTaskPtr task_ptr = SearchVectorTask::Create(group_id, top_k, &tensor_list, filter, _return);
VecServiceScheduler& scheduler = VecServiceScheduler::GetInstance();
scheduler.ExecuteTask(task_ptr);
ExecTask(task_ptr);
}

View File

@ -22,6 +22,8 @@ namespace zilliz {
namespace vecwise {
namespace server {
using namespace megasearch;
class VecServiceHandler : virtual public VecServiceIf {
public:
VecServiceHandler() {

View File

@ -27,6 +27,7 @@ public:
std::string TaskGroup() const { return task_group_; }
ServerError ErrorCode() const { return error_code_; }
std::string ErrorMsg() const { return error_msg_; }
bool IsAsync() const { return async_; }
@ -41,6 +42,7 @@ protected:
bool async_;
bool done_;
ServerError error_code_;
std::string error_msg_;
};
using BaseTaskPtr = std::shared_ptr<BaseTask>;

View File

@ -91,13 +91,16 @@ ServerError AddGroupTask::OnExecute() {
group_info.dimension = (size_t)dimension_;
group_info.group_id = group_id_;
engine::Status stat = DB()->add_group(group_info);
if(!stat.ok()) {
if(!stat.ok()) {//could exist
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
SERVER_LOG_ERROR << error_msg_;
return SERVER_SUCCESS;
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return SERVER_UNEXPECTED_ERROR;
}
@ -124,14 +127,18 @@ ServerError GetGroupTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
} else {
dimension_ = (int32_t)group_info.dimension;
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return SERVER_UNEXPECTED_ERROR;
}
@ -150,14 +157,13 @@ BaseTaskPtr DeleteGroupTask::Create(const std::string& group_id) {
}
ServerError DeleteGroupTask::OnExecute() {
try {
error_code_ = SERVER_NOT_IMPLEMENT;
error_msg_ = "delete group not implemented";
SERVER_LOG_ERROR << error_msg_;
//IVecIdMapper::GetInstance()->DeleteGroup(group_id_);
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
}
return SERVER_SUCCESS;
return SERVER_NOT_IMPLEMENT;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -237,22 +243,7 @@ const AttribMap& AddVectorTask::GetVecAttrib() const {
ServerError AddVectorTask::OnExecute() {
try {
engine::meta::GroupSchema group_info;
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_INVALID_ARGUMENT;
}
uint64_t group_dim = group_info.dimension;
uint64_t vec_dim = GetVecDimension();
if(group_dim != vec_dim) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_dim;
return SERVER_INVALID_ARGUMENT;
}
std::vector<float> vec_f;
vec_f.resize(vec_dim);
const double* d_p = GetVecData();
@ -261,13 +252,16 @@ ServerError AddVectorTask::OnExecute() {
}
engine::IDNumbers vector_ids;
stat = DB()->add_vectors(group_id_, 1, vec_f.data(), vector_ids);
engine::Status stat = DB()->add_vectors(group_id_, 1, vec_f.data(), vector_ids);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
} else {
if(vector_ids.empty()) {
SERVER_LOG_ERROR << "Vector ID not returned";
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return SERVER_UNEXPECTED_ERROR;
} else {
std::string uid = GetVecID();
@ -283,14 +277,16 @@ ServerError AddVectorTask::OnExecute() {
attrib[VECTOR_UID] = tensor_id_;
std::string attrib_str;
AttributeSerializer::Encode(attrib, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str);
SERVER_LOG_TRACE << "nid = " << vector_ids[0] << ", uid = " << uid;
IVecIdMapper::GetInstance()->Put(nid, attrib_str, group_id_);
//SERVER_LOG_TRACE << "nid = " << vector_ids[0] << ", uid = " << uid;
}
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
return SERVER_SUCCESS;
@ -416,7 +412,7 @@ void AddBatchVectorTask::ProcessIdMapping(engine::IDNumbers& vector_ids,
attrib[VECTOR_UID] = uid;
std::string attrib_str;
AttributeSerializer::Encode(attrib, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str);
IVecIdMapper::GetInstance()->Put(nid, attrib_str, group_id_);
}
}
@ -433,8 +429,10 @@ ServerError AddBatchVectorTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
rc.Record("check group dimension");
@ -447,7 +445,9 @@ ServerError AddBatchVectorTask::OnExecute() {
if(vec_dim != group_dim) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_dim;
return SERVER_INVALID_ARGUMENT;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
return error_code_;
}
const double* d_p = GetVecData(i);
@ -462,43 +462,48 @@ ServerError AddBatchVectorTask::OnExecute() {
stat = DB()->add_vectors(group_id_, vec_count, vec_f.data(), vector_ids);
rc.Record("add vectors to engine");
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
if(vector_ids.size() < vec_count) {
SERVER_LOG_ERROR << "Vector ID not returned";
return SERVER_UNEXPECTED_ERROR;
} else {
if(vector_ids.size() < vec_count) {
SERVER_LOG_ERROR << "Vector ID not returned";
return SERVER_UNEXPECTED_ERROR;
tensor_ids_.resize(vector_ids.size());
if(vec_count < USE_MT) {
ProcessIdMapping(vector_ids, 0, vec_count, tensor_ids_);
rc.Record("built id mapping");
} else {
tensor_ids_.resize(vector_ids.size());
if(vec_count < USE_MT) {
ProcessIdMapping(vector_ids, 0, vec_count, tensor_ids_);
rc.Record("built id mapping");
} else {
std::list<std::future<void>> threads_list;
std::list<std::future<void>> threads_list;
uint64_t begin_index = 0, end_index = USE_MT;
while(end_index < vec_count) {
threads_list.push_back(
GetThreadPool().enqueue(&AddBatchVectorTask::ProcessIdMapping,
this, vector_ids, begin_index, end_index, tensor_ids_));
begin_index = end_index;
end_index += USE_MT;
if(end_index > vec_count) {
end_index = vec_count;
}
uint64_t begin_index = 0, end_index = USE_MT;
while(end_index < vec_count) {
threads_list.push_back(
GetThreadPool().enqueue(&AddBatchVectorTask::ProcessIdMapping,
this, vector_ids, begin_index, end_index, tensor_ids_));
begin_index = end_index;
end_index += USE_MT;
if(end_index > vec_count) {
end_index = vec_count;
}
for (std::list<std::future<void>>::iterator it = threads_list.begin(); it != threads_list.end(); it++) {
it->wait();
}
rc.Record("built id mapping by multi-threads:" + std::to_string(threads_list.size()));
}
for (std::list<std::future<void>>::iterator it = threads_list.begin(); it != threads_list.end(); it++) {
it->wait();
}
rc.Record("built id mapping by multi-threads:" + std::to_string(threads_list.size()));
}
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
return SERVER_SUCCESS;
@ -612,15 +617,19 @@ ServerError SearchVectorTask::OnExecute() {
group_info.group_id = group_id_;
engine::Status stat = DB()->get_group(group_info);
if(!stat.ok()) {
SERVER_LOG_ERROR << "Engine failed: " << stat.ToString();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_GROUP_NOT_EXIST;
error_msg_ = "Engine failed: " + stat.ToString();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
uint64_t vec_dim = GetTargetDimension();
if(vec_dim != group_info.dimension) {
SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim
<< " vs. group dimension:" << group_info.dimension;
return SERVER_INVALID_ARGUMENT;
error_code_ = SERVER_INVALID_VECTOR_DIMENSION;
error_msg_ = "Engine failed: " + stat.ToString();
return error_code_;
}
rc.Record("check group dimension");
@ -654,18 +663,30 @@ ServerError SearchVectorTask::OnExecute() {
for(auto id : res) {
std::string attrib_str;
std::string nid = nid_prefix + std::to_string(id);
IVecIdMapper::GetInstance()->Get(nid, attrib_str);
IVecIdMapper::GetInstance()->Get(nid, attrib_str, group_id_);
AttribMap attrib_map;
AttributeSerializer::Decode(attrib_str, attrib_map);
AttribMap attrib_return;
VecSearchResultItem item;
item.__set_attrib(attrib_map);
item.uid = item.attrib[VECTOR_UID];
item.uid = attrib_map[VECTOR_UID];
if(filter_.return_attribs.empty()) {//return all attributes
attrib_return.swap(attrib_map);
} else {//filter attributes
for(auto& name : filter_.return_attribs) {
if(attrib_map.count(name) == 0)
continue;
attrib_return[name] = attrib_map[name];
}
}
item.__set_attrib(attrib_return);
item.distance = 0.0;////TODO: return distance
v_res.result_list.emplace_back(item);
SERVER_LOG_TRACE << "nid = " << nid << ", uid = " << item.uid;
//SERVER_LOG_TRACE << "nid = " << nid << ", uid = " << item.uid;
}
result_.result_list.push_back(v_res);
@ -674,8 +695,10 @@ ServerError SearchVectorTask::OnExecute() {
}
} catch (std::exception& ex) {
SERVER_LOG_ERROR << ex.what();
return SERVER_UNEXPECTED_ERROR;
error_code_ = SERVER_UNEXPECTED_ERROR;
error_msg_ = ex.what();
SERVER_LOG_ERROR << error_msg_;
return error_code_;
}
return SERVER_SUCCESS;

View File

@ -10,7 +10,7 @@
#include "utils/AttributeSerializer.h"
#include "db/Types.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/megasearch_types.h"
#include <condition_variable>
#include <memory>
@ -19,6 +19,7 @@ namespace zilliz {
namespace vecwise {
namespace server {
using namespace megasearch;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class AddGroupTask : public BaseTask {
public:

View File

@ -10,8 +10,8 @@
#include "utils/Log.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/VectorService_constants.h"
#include "thrift/gen-cpp/megasearch_types.h"
#include "thrift/gen-cpp/megasearch_constants.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>

View File

@ -1,4 +1,4 @@
#!/bin/bash
../../third_party/build/bin/thrift -r --gen cpp ./VectorService.thrift
../../third_party/build/bin/thrift -r --gen cpp ./megasearch.thrift

View File

@ -6,7 +6,7 @@
*/
#include "VecService.h"
namespace zilliz {
namespace megasearch {
VecService_add_group_args::~VecService_add_group_args() throw() {

View File

@ -9,9 +9,9 @@
#include <thrift/TDispatchProcessor.h>
#include <thrift/async/TConcurrentClientSyncInfo.h>
#include "VectorService_types.h"
#include "megasearch_types.h"
namespace zilliz {
namespace megasearch {
#ifdef _MSC_VER
#pragma warning( push )

View File

@ -12,7 +12,7 @@ using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::zilliz;
using namespace ::megasearch;
class VecServiceHandler : virtual public VecServiceIf {
public:

View File

@ -1,17 +0,0 @@
/**
* Autogenerated by Thrift Compiler (0.11.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "VectorService_constants.h"
namespace zilliz {
const VectorServiceConstants g_VectorService_constants;
VectorServiceConstants::VectorServiceConstants() {
}
} // namespace

View File

@ -1,24 +0,0 @@
/**
* Autogenerated by Thrift Compiler (0.11.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef VectorService_CONSTANTS_H
#define VectorService_CONSTANTS_H
#include "VectorService_types.h"
namespace zilliz {
class VectorServiceConstants {
public:
VectorServiceConstants();
};
extern const VectorServiceConstants g_VectorService_constants;
} // namespace
#endif

View File

@ -0,0 +1,17 @@
/**
* Autogenerated by Thrift Compiler (0.11.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "megasearch_constants.h"
namespace megasearch {
const megasearchConstants g_megasearch_constants;
megasearchConstants::megasearchConstants() {
}
} // namespace

View File

@ -0,0 +1,24 @@
/**
* Autogenerated by Thrift Compiler (0.11.0)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef megasearch_CONSTANTS_H
#define megasearch_CONSTANTS_H
#include "megasearch_types.h"
namespace megasearch {
class megasearchConstants {
public:
megasearchConstants();
};
extern const megasearchConstants g_megasearch_constants;
} // namespace
#endif

View File

@ -4,14 +4,14 @@
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "VectorService_types.h"
#include "megasearch_types.h"
#include <algorithm>
#include <ostream>
#include <thrift/TToString.h>
namespace zilliz {
namespace megasearch {
int _kVecErrCodeValues[] = {
VecErrCode::SUCCESS,

View File

@ -4,8 +4,8 @@
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef VectorService_TYPES_H
#define VectorService_TYPES_H
#ifndef megasearch_TYPES_H
#define megasearch_TYPES_H
#include <iosfwd>
@ -18,7 +18,7 @@
#include <thrift/stdcxx.h>
namespace zilliz {
namespace megasearch {
struct VecErrCode {
enum type {

View File

@ -1,22 +1,15 @@
import time
import struct
from zilliz import VecService
from megasearch import VecService
from thrift import Thrift
#Note: pip install thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol, TCompactProtocol, TJSONProtocol
def print_time_cost(desc, t):
time_now = time.time()
print(desc + ' cost ', time_now - t, ' sec')
return time_now
def test_vecwise():
def test_megasearch():
try:
time_start = time.time()
#connect
transport = TSocket.TSocket('localhost', 33001)
transport = TTransport.TBufferedTransport(transport)
@ -24,26 +17,13 @@ def test_vecwise():
client = VecService.Client(protocol)
transport.open()
time_start = print_time_cost('connection', time_start)
print("connected");
#add group
group = VecService.VecGroup("py_group_" + time.strftime('%H%M%S'), 256)
group = VecService.VecGroup("test_" + time.strftime('%H%M%S'), 256)
client.add_group(group)
time_start = print_time_cost('add group', time_start)
#build vectors
# vec_list = VecService.VecTensorList([])
# for i in range(10000):
# vec = VecService.VecTensor("normal_"+str(i), [])
# for k in range(group.dimension):
# vec.tensor.append(k)
# vec_list.tensor_list.append(vec)
#time_start = print_time_cost('build normal vectors', time_start)
#add vectors
#client.add_vector_batch(group.id, vec_list)
#time_start = print_time_cost('add normal vectors', time_start))
print("group added");
# build binary vectors
bin_vec_list = VecService.VecBinaryTensorList([])
@ -55,14 +35,12 @@ def test_vecwise():
bin_vec.tensor = struct.pack(str(group.dimension)+"d", *a)
bin_vec_list.tensor_list.append(bin_vec)
time_start = print_time_cost('build binary vectors', time_start)
# add vectors
client.add_binary_vector_batch(group.id, bin_vec_list)
time_start = print_time_cost('add binary vectors', time_start)
time.sleep(5)
time_start = print_time_cost('sleep 5 seconds', time_start)
wait_storage = 5
print("wait {} seconds for persisting data".format(wait_storage))
time.sleep(wait_storage)
# search vector
a = []
@ -72,18 +50,19 @@ def test_vecwise():
bin_vec.tensor = struct.pack(str(group.dimension) + "d", *a)
filter = VecService.VecSearchFilter()
res = VecService.VecSearchResult()
print("begin search ...");
res = client.search_binary_vector(group.id, 5, bin_vec, filter)
time_start = print_time_cost('search binary vectors', time_start)
print('result count: ' + str(len(res.result_list)))
for item in res.result_list:
print(item.uid)
transport.close()
time_start = print_time_cost('close connection', time_start)
print("disconnected");
except Thrift.TException as ex:
print(ex.message)
except VecService.VecException as ex:
print(ex.reason)
test_vecwise()
test_megasearch()

View File

@ -1,187 +0,0 @@
#!/usr/bin/env python
#
# Autogenerated by Thrift Compiler (0.11.0)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
# options string: py
#
import sys
import pprint
if sys.version_info[0] > 2:
from urllib.parse import urlparse
else:
from urlparse import urlparse
from thrift.transport import TTransport, TSocket, TSSLSocket, THttpClient
from thrift.protocol.TBinaryProtocol import TBinaryProtocol
from zilliz import VecService
from zilliz.ttypes import *
if len(sys.argv) <= 1 or sys.argv[1] == '--help':
print('')
print('Usage: ' + sys.argv[0] + ' [-h host[:port]] [-u url] [-f[ramed]] [-s[sl]] [-novalidate] [-ca_certs certs] [-keyfile keyfile] [-certfile certfile] function [arg1 [arg2...]]')
print('')
print('Functions:')
print(' void add_group(VecGroup group)')
print(' VecGroup get_group(string group_id)')
print(' void del_group(string group_id)')
print(' string add_vector(string group_id, VecTensor tensor)')
print(' add_vector_batch(string group_id, VecTensorList tensor_list)')
print(' string add_binary_vector(string group_id, VecBinaryTensor tensor)')
print(' add_binary_vector_batch(string group_id, VecBinaryTensorList tensor_list)')
print(' VecSearchResult search_vector(string group_id, i64 top_k, VecTensor tensor, VecSearchFilter filter)')
print(' VecSearchResultList search_vector_batch(string group_id, i64 top_k, VecTensorList tensor_list, VecSearchFilter filter)')
print(' VecSearchResult search_binary_vector(string group_id, i64 top_k, VecBinaryTensor tensor, VecSearchFilter filter)')
print(' VecSearchResultList search_binary_vector_batch(string group_id, i64 top_k, VecBinaryTensorList tensor_list, VecSearchFilter filter)')
print('')
sys.exit(0)
pp = pprint.PrettyPrinter(indent=2)
host = 'localhost'
port = 9090
uri = ''
framed = False
ssl = False
validate = True
ca_certs = None
keyfile = None
certfile = None
http = False
argi = 1
if sys.argv[argi] == '-h':
parts = sys.argv[argi + 1].split(':')
host = parts[0]
if len(parts) > 1:
port = int(parts[1])
argi += 2
if sys.argv[argi] == '-u':
url = urlparse(sys.argv[argi + 1])
parts = url[1].split(':')
host = parts[0]
if len(parts) > 1:
port = int(parts[1])
else:
port = 80
uri = url[2]
if url[4]:
uri += '?%s' % url[4]
http = True
argi += 2
if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed':
framed = True
argi += 1
if sys.argv[argi] == '-s' or sys.argv[argi] == '-ssl':
ssl = True
argi += 1
if sys.argv[argi] == '-novalidate':
validate = False
argi += 1
if sys.argv[argi] == '-ca_certs':
ca_certs = sys.argv[argi+1]
argi += 2
if sys.argv[argi] == '-keyfile':
keyfile = sys.argv[argi+1]
argi += 2
if sys.argv[argi] == '-certfile':
certfile = sys.argv[argi+1]
argi += 2
cmd = sys.argv[argi]
args = sys.argv[argi + 1:]
if http:
transport = THttpClient.THttpClient(host, port, uri)
else:
if ssl:
socket = TSSLSocket.TSSLSocket(host, port, validate=validate, ca_certs=ca_certs, keyfile=keyfile, certfile=certfile)
else:
socket = TSocket.TSocket(host, port)
if framed:
transport = TTransport.TFramedTransport(socket)
else:
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol(transport)
client = VecService.Client(protocol)
transport.open()
if cmd == 'add_group':
if len(args) != 1:
print('add_group requires 1 args')
sys.exit(1)
pp.pprint(client.add_group(eval(args[0]),))
elif cmd == 'get_group':
if len(args) != 1:
print('get_group requires 1 args')
sys.exit(1)
pp.pprint(client.get_group(args[0],))
elif cmd == 'del_group':
if len(args) != 1:
print('del_group requires 1 args')
sys.exit(1)
pp.pprint(client.del_group(args[0],))
elif cmd == 'add_vector':
if len(args) != 2:
print('add_vector requires 2 args')
sys.exit(1)
pp.pprint(client.add_vector(args[0], eval(args[1]),))
elif cmd == 'add_vector_batch':
if len(args) != 2:
print('add_vector_batch requires 2 args')
sys.exit(1)
pp.pprint(client.add_vector_batch(args[0], eval(args[1]),))
elif cmd == 'add_binary_vector':
if len(args) != 2:
print('add_binary_vector requires 2 args')
sys.exit(1)
pp.pprint(client.add_binary_vector(args[0], eval(args[1]),))
elif cmd == 'add_binary_vector_batch':
if len(args) != 2:
print('add_binary_vector_batch requires 2 args')
sys.exit(1)
pp.pprint(client.add_binary_vector_batch(args[0], eval(args[1]),))
elif cmd == 'search_vector':
if len(args) != 4:
print('search_vector requires 4 args')
sys.exit(1)
pp.pprint(client.search_vector(args[0], eval(args[1]), eval(args[2]), eval(args[3]),))
elif cmd == 'search_vector_batch':
if len(args) != 4:
print('search_vector_batch requires 4 args')
sys.exit(1)
pp.pprint(client.search_vector_batch(args[0], eval(args[1]), eval(args[2]), eval(args[3]),))
elif cmd == 'search_binary_vector':
if len(args) != 4:
print('search_binary_vector requires 4 args')
sys.exit(1)
pp.pprint(client.search_binary_vector(args[0], eval(args[1]), eval(args[2]), eval(args[3]),))
elif cmd == 'search_binary_vector_batch':
if len(args) != 4:
print('search_binary_vector_batch requires 4 args')
sys.exit(1)
pp.pprint(client.search_binary_vector_batch(args[0], eval(args[1]), eval(args[2]), eval(args[3]),))
else:
print('Unrecognized method %s' % cmd)
sys.exit(1)
transport.close()

File diff suppressed because it is too large Load Diff

View File

@ -1 +0,0 @@
__all__ = ['ttypes', 'constants', 'VecService']

View File

@ -1,14 +0,0 @@
#
# Autogenerated by Thrift Compiler (0.11.0)
#
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
#
# options string: py
#
from thrift.Thrift import TType, TMessageType, TFrozenDict, TException, TApplicationException
from thrift.protocol.TProtocol import TProtocolException
from thrift.TRecursive import fix_spec
import sys
from .ttypes import *

File diff suppressed because it is too large Load Diff

View File

@ -3,16 +3,16 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
namespace cl zilliz
namespace cpp zilliz
namespace py zilliz
namespace d zilliz
namespace dart zilliz
namespace java zilliz
namespace perl zilliz
namespace php zilliz
namespace haxe zilliz
namespace netcore zilliz
namespace cl megasearch
namespace cpp megasearch
namespace py megasearch
namespace d megasearch
namespace dart megasearch
namespace java megasearch
namespace perl megasearch
namespace php megasearch
namespace haxe megasearch
namespace netcore megasearch
enum VecErrCode {
SUCCESS = 0,
@ -69,9 +69,9 @@ struct VecSearchResultList {
}
/**
* second; Seconds. [0-60] (1 leap second)
* minute; Minutes. [0-59]
* hour; Hours. [0-23]
* second; Seconds. [0-59] reserved
* minute; Minutes. [0-59] reserved
* hour; Hours. [0-23] reserved
* day; Day. [1-31]
* month; Month. [0-11]
* year; Year - 1900.
@ -87,9 +87,9 @@ struct VecDateTime {
/**
* time_begin; time range begin
* begine_closed; true means '[', false means '('
* begine_closed; true means '[', false means '(' reserved
* time_end; set to true to return tensor double array
* end_closed; time range end
* end_closed; time range end reserved
*/
struct VecTimeRange {
1: required VecDateTime time_begin;
@ -99,7 +99,7 @@ struct VecTimeRange {
}
/**
* attrib_filter; search condition, for example: "color=red"
* attrib_filter; reserved
* time_ranges; search condition, for example: "date between 1999-02-12 and 2008-10-14"
* return_attribs; specify required attribute names
*/

View File

@ -1,4 +1,4 @@
#!/bin/bash
../../third_party/build/bin/thrift -r --gen py ./VectorService.thrift
../../third_party/build/bin/thrift -r --gen py ./megasearch.thrift

View File

@ -21,7 +21,7 @@ constexpr ServerError SERVER_ERROR_CODE_BASE = 0x30000;
constexpr ServerError
ToGlobalServerErrorCode(const ServerError error_code) {
return SERVER_ERROR_CODE_BASE + SERVER_ERROR_CODE_BASE;
return SERVER_ERROR_CODE_BASE + error_code;
}
constexpr ServerError SERVER_UNEXPECTED_ERROR = ToGlobalServerErrorCode(0x001);
@ -31,8 +31,11 @@ constexpr ServerError SERVER_INVALID_ARGUMENT = ToGlobalServerErrorCode(0x004);
constexpr ServerError SERVER_FILE_NOT_FOUND = ToGlobalServerErrorCode(0x005);
constexpr ServerError SERVER_NOT_IMPLEMENT = ToGlobalServerErrorCode(0x006);
constexpr ServerError SERVER_BLOCKING_QUEUE_EMPTY = ToGlobalServerErrorCode(0x007);
constexpr ServerError SERVER_LICENSE_VALIDATION_FAIL = ToGlobalServerErrorCode(0x008);
constexpr ServerError SERVER_LICENSE_FILE_NOT_EXIST = ToGlobalServerErrorCode(0x009);
constexpr ServerError SERVER_GROUP_NOT_EXIST = ToGlobalServerErrorCode(0x008);
constexpr ServerError SERVER_INVALID_TIME_RANGE = ToGlobalServerErrorCode(0x009);
constexpr ServerError SERVER_INVALID_VECTOR_DIMENSION = ToGlobalServerErrorCode(0x00a);
constexpr ServerError SERVER_LICENSE_VALIDATION_FAIL = ToGlobalServerErrorCode(0x00b);
constexpr ServerError SERVER_LICENSE_FILE_NOT_EXIST = ToGlobalServerErrorCode(0x00c);
class ServerException : public std::exception {
public:

View File

@ -16,8 +16,8 @@ set(util_files
set(service_files
../src/thrift/gen-cpp/VecService.cpp
../src/thrift/gen-cpp/VectorService_constants.cpp
../src/thrift/gen-cpp/VectorService_types.cpp)
../src/thrift/gen-cpp/megasearch_constants.cpp
../src/thrift/gen-cpp/megasearch_types.cpp)

View File

@ -6,8 +6,8 @@
#include "ClientSession.h"
#include "Log.h"
#include "thrift/gen-cpp/VectorService_types.h"
#include "thrift/gen-cpp/VectorService_constants.h"
#include "thrift/gen-cpp/megasearch_types.h"
#include "thrift/gen-cpp/megasearch_constants.h"
#include <exception>
@ -26,6 +26,8 @@ namespace zilliz {
namespace vecwise {
namespace client {
using namespace megasearch;
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;

View File

@ -13,7 +13,7 @@ namespace zilliz {
namespace vecwise {
namespace client {
using VecServiceClientPtr = std::shared_ptr<VecServiceClient>;
using VecServiceClientPtr = std::shared_ptr<megasearch::VecServiceClient>;
class ClientSession {
public:

View File

@ -15,13 +15,14 @@
#include <time.h>
using namespace megasearch;
using namespace zilliz;
using namespace zilliz::vecwise;
using namespace zilliz::vecwise::client;
namespace {
static const int32_t VEC_DIMENSION = 256;
static const int64_t BATCH_COUNT = 1000;
static const int64_t BATCH_COUNT = 10000;
static const int64_t REPEAT_COUNT = 1;
static const int64_t TOP_K = 10;
@ -126,6 +127,28 @@ TEST(AddVector, CLIENT_TEST) {
GetServerAddress(address, port, protocol);
client::ClientSession session(address, port, protocol);
//verify get invalid group
try {
std::string id;
VecTensor tensor;
for(int32_t i = 0; i < VEC_DIMENSION; i++) {
tensor.tensor.push_back(0.5);
}
session.interface()->add_vector(id, GetGroupID(), tensor);
} catch (VecException& ex) {
CLIENT_LOG_ERROR << "request encounter exception: " << ex.what();
ASSERT_EQ(ex.code, VecErrCode::ILLEGAL_ARGUMENT);
}
try {
VecGroup temp_group;
session.interface()->get_group(temp_group, GetGroupID());
//ASSERT_TRUE(temp_group.id.empty());
} catch (VecException& ex) {
CLIENT_LOG_ERROR << "request encounter exception: " << ex.what();
ASSERT_EQ(ex.code, VecErrCode::GROUP_NOT_EXISTS);
}
//add group
VecGroup group;
group.id = GetGroupID();
@ -214,7 +237,6 @@ TEST(SearchVector, CLIENT_TEST) {
//search vector
{
const int32_t anchor_index = 100;
server::TimeRecorder rc("Search top_k");
VecTensor tensor;
for (int32_t i = 0; i < VEC_DIMENSION; i++) {
tensor.tensor.push_back((double) (i + anchor_index));
@ -232,35 +254,60 @@ TEST(SearchVector, CLIENT_TEST) {
time_ranges.emplace_back(range);
filter.__set_time_ranges(time_ranges);
//do search
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
//normal search
{
server::TimeRecorder rc("Search top_k");
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
rc.Elapse("done!");
//build result
std::cout << "Search result: " << std::endl;
for(VecSearchResultItem& item : res.result_list) {
std::cout << "\t" << item.uid << std::endl;
//build result
std::cout << "Search result: " << std::endl;
for (VecSearchResultItem &item : res.result_list) {
std::cout << "\t" << item.uid << std::endl;
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_NUM) != 0);
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_COMMENT) != 0);
ASSERT_TRUE(!item.attrib[TEST_ATTRIB_COMMENT].empty());
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_NUM) != 0);
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_COMMENT) != 0);
ASSERT_TRUE(!item.attrib[TEST_ATTRIB_COMMENT].empty());
}
ASSERT_EQ(res.result_list.size(), (uint64_t) TOP_K);
if (!res.result_list.empty()) {
ASSERT_TRUE(!res.result_list[0].uid.empty());
}
}
rc.Elapse("done!");
ASSERT_EQ(res.result_list.size(), (uint64_t)TOP_K);
if(!res.result_list.empty()) {
ASSERT_TRUE(!res.result_list[0].uid.empty());
//filter attribute search
{
std::vector<std::string> require_attributes = {TEST_ATTRIB_COMMENT};
filter.__set_return_attribs(require_attributes);
server::TimeRecorder rc("Search top_k with attribute filter");
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
rc.Elapse("done!");
//build result
std::cout << "Search result attributes: " << std::endl;
for (VecSearchResultItem &item : res.result_list) {
ASSERT_EQ(item.attrib.size(), 1UL);
ASSERT_TRUE(item.attrib.count(TEST_ATTRIB_COMMENT) != 0);
ASSERT_TRUE(!item.attrib[TEST_ATTRIB_COMMENT].empty());
std::cout << "\t" << item.uid << ":" << item.attrib[TEST_ATTRIB_COMMENT] << std::endl;
}
ASSERT_EQ(res.result_list.size(), (uint64_t) TOP_K);
}
//empty search
date.day > 0 ? date.day -= 1 : date.day += 1;
range.time_begin = date;
range.time_end = date;
time_ranges.clear();
time_ranges.emplace_back(range);
filter.__set_time_ranges(time_ranges);
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
{
date.day > 0 ? date.day -= 1 : date.day += 1;
range.time_begin = date;
range.time_end = date;
time_ranges.clear();
time_ranges.emplace_back(range);
filter.__set_time_ranges(time_ranges);
session.interface()->search_vector(res, GetGroupID(), TOP_K, tensor, filter);
ASSERT_EQ(res.result_list.size(), 0);
ASSERT_EQ(res.result_list.size(), 0);
}
}
//search binary vector

View File

@ -14,6 +14,7 @@ link_directories("/usr/local/cuda/lib64")
set(require_files
../../src/server/VecIdMapper.cpp
../../src/server/RocksIdMapper.cpp
../../src/server/ServerConfig.cpp
../../src/utils/CommonUtil.cpp
../../src/utils/TimeRecorder.cpp

View File

@ -16,6 +16,7 @@ aux_source_directory(./ test_srcs)
set(require_files
../../src/server/VecIdMapper.cpp
../../src/server/RocksIdMapper.cpp
../../src/server/ServerConfig.cpp
../../src/utils/CommonUtil.cpp
../../src/utils/TimeRecorder.cpp

View File

@ -11,12 +11,13 @@ using namespace zilliz::vecwise;
TEST(CommonTest, COMMON_TEST) {
std::string path1 = "/tmp/vecwise_test/common_test_12345/";
std::string path2 = path1 + "abcdef";
server::ServerError err = server::CommonUtil::CreateDirectory(path2);
std::string path1 = "/tmp/vecwise_test/";
std::string path2 = path1 + "common_test_12345/";
std::string path3 = path2 + "abcdef";
server::ServerError err = server::CommonUtil::CreateDirectory(path3);
ASSERT_EQ(err, server::SERVER_SUCCESS);
ASSERT_TRUE(server::CommonUtil::IsDirectoryExit(path2));
ASSERT_TRUE(server::CommonUtil::IsDirectoryExit(path3));
err = server::CommonUtil::DeleteDirectory(path1);
ASSERT_EQ(err, server::SERVER_SUCCESS);

View File

@ -9,44 +9,93 @@
#include "utils/TimeRecorder.h"
#include "utils/CommonUtil.h"
#include <time.h>
using namespace zilliz::vecwise;
namespace {
std::string CurrentTime() {
time_t tt;
time(&tt);
tt = tt + 8 * 3600;
tm *t = gmtime(&tt);
std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1)
+ "_" + std::to_string(t->tm_mday) + "_" + std::to_string(t->tm_hour)
+ "_" + std::to_string(t->tm_min) + "_" + std::to_string(t->tm_sec);
return str;
}
std::string GetGroupID() {
static std::string s_id(CurrentTime());
return s_id;
}
}
TEST(IdMapperTest, IDMAPPER_TEST) {
server::IVecIdMapper* mapper = server::IVecIdMapper::GetInstance();
std::string group_id = GetGroupID();
std::vector<std::string> nid = {"1", "50", "900", "10000"};
std::vector<std::string> sid = {"one", "fifty", "nine zero zero", "many"};
server::ServerError err = mapper->Put(nid, sid);
server::ServerError err = mapper->Put(nid, sid, group_id);
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Put(nid, std::vector<std::string>());
err = mapper->Put(nid, std::vector<std::string>(), group_id);
ASSERT_NE(err, server::SERVER_SUCCESS);
std::vector<std::string> res;
err = mapper->Get(nid, res);
err = mapper->Get(nid, res, group_id);
ASSERT_EQ(res.size(), nid.size());
for(size_t i = 0; i < res.size(); i++) {
ASSERT_EQ(res[i], sid[i]);
}
std::string str_id;
err = mapper->Get(nid[1], str_id);
err = mapper->Get(nid[1], str_id, group_id);
ASSERT_EQ(str_id, "fifty");
err = mapper->Delete(nid[2]);
err = mapper->Get(nid[1], str_id);
ASSERT_EQ(str_id, "");
err = mapper->Get(nid[2], str_id, group_id);
ASSERT_EQ(str_id, "nine zero zero");
err = mapper->Delete(nid[2], group_id);
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Get(nid[2], str_id);
ASSERT_NE(err, server::SERVER_SUCCESS);
err = mapper->Get(nid[2], str_id, group_id);
ASSERT_EQ(str_id, "");
err = mapper->Get(nid[3], str_id, group_id);
ASSERT_EQ(str_id, "many");
err = mapper->DeleteGroup(group_id);
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Get(nid[3], str_id, group_id);
ASSERT_EQ(str_id, "");
std::string ct = CurrentTime();
err = mapper->Put("current_time", ct, "time");
ASSERT_EQ(err, server::SERVER_SUCCESS);
err = mapper->Get("current_time", str_id, "time");
ASSERT_EQ(str_id, ct);
//test performance
nid.clear();
sid.clear();
const int64_t count = 1000000;
for(int64_t i = 0; i < count; i++) {
nid.push_back(std::to_string(i+100000));
sid.push_back("val_" + std::to_string(i));
{
server::TimeRecorder rc("prepare id data");
for (int64_t i = 0; i < count; i++) {
nid.push_back(std::to_string(i + 100000));
sid.push_back("val_" + std::to_string(i));
}
rc.Record("done!");
}
{