multi processes test

Former-commit-id: dd645c73b6983f86c5ed3b1a5a3480673bad4538
This commit is contained in:
kun yu 2019-08-05 09:44:29 +08:00
parent 90516aeb52
commit 409175f9d5
5 changed files with 145 additions and 97 deletions

View File

@ -185,108 +185,153 @@ namespace {
void
ClientTest::Test(const std::string& address, const std::string& port) {
std::shared_ptr<Connection> conn = Connection::Create();
// std::shared_ptr<Connection> conn = Connection::Create();
//
// {//connect server
// ConnectParam param = {address, port};
// Status stat = conn->Connect(param);
// std::cout << "Connect function call status: " << stat.ToString() << std::endl;
// }
//
// {//server version
// std::string version = conn->ServerVersion();
// std::cout << "Server version: " << version << std::endl;
// }
//
// {//sdk version
// std::string version = conn->ClientVersion();
// std::cout << "SDK version: " << version << std::endl;
// }
//
// {
// std::vector<std::string> tables;
// Status stat = conn->ShowTables(tables);
// std::cout << "ShowTables function call status: " << stat.ToString() << std::endl;
// std::cout << "All tables: " << std::endl;
// for(auto& table : tables) {
// int64_t row_count = 0;
// stat = conn->GetTableRowCount(table, row_count);
// std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
// }
// }
//
// {//create table
// TableSchema tb_schema = BuildTableSchema();
// Status stat = conn->CreateTable(tb_schema);
// std::cout << "CreateTable function call status: " << stat.ToString() << std::endl;
// PrintTableSchema(tb_schema);
//
// bool has_table = conn->HasTable(tb_schema.table_name);
// if(has_table) {
// std::cout << "Table is created" << std::endl;
// }
// }
//
// {//describe table
// TableSchema tb_schema;
// Status stat = conn->DescribeTable(TABLE_NAME, tb_schema);
// std::cout << "DescribeTable function call status: " << stat.ToString() << std::endl;
// PrintTableSchema(tb_schema);
// }
//
// Connection::Destroy(conn);
{//connect server
ConnectParam param = {address, port};
Status stat = conn->Connect(param);
std::cout << "Connect function call status: " << stat.ToString() << std::endl;
}
{//server version
std::string version = conn->ServerVersion();
std::cout << "Server version: " << version << std::endl;
}
{//sdk version
std::string version = conn->ClientVersion();
std::cout << "SDK version: " << version << std::endl;
}
{
std::vector<std::string> tables;
Status stat = conn->ShowTables(tables);
std::cout << "ShowTables function call status: " << stat.ToString() << std::endl;
std::cout << "All tables: " << std::endl;
for(auto& table : tables) {
int64_t row_count = 0;
stat = conn->GetTableRowCount(table, row_count);
std::cout << "\t" << table << "(" << row_count << " rows)" << std::endl;
pid_t pid;
for (int i = 0; i < 5; ++i) {
pid = fork();
if (pid == 0 || pid == -1) {
break;
}
}
if (pid == -1) {
std::cout << "fail to fork!\n";
exit(1);
} else if (pid == 0) {
std::shared_ptr<Connection> conn = Connection::Create();
{//create table
TableSchema tb_schema = BuildTableSchema();
Status stat = conn->CreateTable(tb_schema);
std::cout << "CreateTable function call status: " << stat.ToString() << std::endl;
PrintTableSchema(tb_schema);
bool has_table = conn->HasTable(tb_schema.table_name);
if(has_table) {
std::cout << "Table is created" << std::endl;
{//connect server
ConnectParam param = {address, port};
Status stat = conn->Connect(param);
std::cout << "Connect function call status: " << stat.ToString() << std::endl;
}
}
{//describe table
TableSchema tb_schema;
Status stat = conn->DescribeTable(TABLE_NAME, tb_schema);
std::cout << "DescribeTable function call status: " << stat.ToString() << std::endl;
PrintTableSchema(tb_schema);
}
std::vector<std::pair<int64_t, RowRecord>> search_record_array;
{//insert vectors
for (int i = 0; i < ADD_VECTOR_LOOP; i++) {//add vectors
std::vector<RowRecord> record_array;
int64_t begin_index = i * BATCH_ROW_COUNT;
BuildVectors(begin_index, begin_index + BATCH_ROW_COUNT, record_array);
std::vector<int64_t> record_ids;
auto start = std::chrono::high_resolution_clock::now();
Status stat = conn->InsertVector(TABLE_NAME, record_array, record_ids);
auto finish = std::chrono::high_resolution_clock::now();
std::cout << "InsertVector cost: " << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
std::cout << "InsertVector function call status: " << stat.ToString() << std::endl;
std::cout << "Returned id array count: " << record_ids.size() << std::endl;
if(search_record_array.size() < NQ) {
search_record_array.push_back(
std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET]));
}
{//server version
std::string version = conn->ServerVersion();
std::cout << "Server version: " << version << std::endl;
}
Connection::Destroy(conn);
exit(0);
} else {
std::shared_ptr<Connection> conn = Connection::Create();
{//connect server
ConnectParam param = {address, port};
Status stat = conn->Connect(param);
std::cout << "Connect function call status: " << stat.ToString() << std::endl;
}
{//server version
std::string version = conn->ServerVersion();
std::cout << "Server version: " << version << std::endl;
}
Connection::Destroy(conn);
std::cout << "in main process\n";
exit(0);
}
{//search vectors without index
Sleep(2);
DoSearch(conn, search_record_array, "Search without index");
}
{//wait unit build index finish
std::cout << "Wait until build all index done" << std::endl;
Status stat = conn->BuildIndex(TABLE_NAME);
std::cout << "BuildIndex function call status: " << stat.ToString() << std::endl;
}
{//search vectors after build index finish
DoSearch(conn, search_record_array, "Search after build index finish");
}
{//delete table
Status stat = conn->DropTable(TABLE_NAME);
std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
}
{//server status
std::string status = conn->ServerStatus();
std::cout << "Server status before disconnect: " << status << std::endl;
}
Connection::Destroy(conn);
// conn->Disconnect();
{//server status
std::string status = conn->ServerStatus();
std::cout << "Server status after disconnect: " << status << std::endl;
}
// std::vector<std::pair<int64_t, RowRecord>> search_record_array;
// {//insert vectors
// for (int i = 0; i < ADD_VECTOR_LOOP; i++) {//add vectors
// std::vector<RowRecord> record_array;
// int64_t begin_index = i * BATCH_ROW_COUNT;
// BuildVectors(begin_index, begin_index + BATCH_ROW_COUNT, record_array);
// std::vector<int64_t> record_ids;
//
// auto start = std::chrono::high_resolution_clock::now();
//
// Status stat = conn->InsertVector(TABLE_NAME, record_array, record_ids);
// auto finish = std::chrono::high_resolution_clock::now();
// std::cout << "InsertVector cost: " << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
//
//
// std::cout << "InsertVector function call status: " << stat.ToString() << std::endl;
// std::cout << "Returned id array count: " << record_ids.size() << std::endl;
//
// if(search_record_array.size() < NQ) {
// search_record_array.push_back(
// std::make_pair(record_ids[SEARCH_TARGET], record_array[SEARCH_TARGET]));
// }
// }
// }
//
// {//search vectors without index
// Sleep(2);
// DoSearch(conn, search_record_array, "Search without index");
// }
//
// {//wait unit build index finish
//// std::cout << "Wait until build all index done" << std::endl;
//// Status stat = conn->BuildIndex(TABLE_NAME);
//// std::cout << "BuildIndex function call status: " << stat.ToString() << std::endl;
// }
//
// {//search vectors after build index finish
// DoSearch(conn, search_record_array, "Search after build index finish");
// }
//
// {//delete table
// Status stat = conn->DropTable(TABLE_NAME);
// std::cout << "DeleteTable function call status: " << stat.ToString() << std::endl;
// }
//
// {//server status
// std::string status = conn->ServerStatus();
// std::cout << "Server status before disconnect: " << status << std::endl;
// }
// Connection::Destroy(conn);
//// conn->Disconnect();
// {//server status
// std::string status = conn->ServerStatus();
// std::cout << "Server status after disconnect: " << status << std::endl;
// }
}

View File

@ -133,7 +133,6 @@ ClientProxy::InsertVector(const std::string &table_name,
const std::vector<RowRecord> &record_array,
std::vector<int64_t> &id_array) {
try {
////////////////////////////////////////////////////////////////////////////
#ifdef GRPC_MULTIPLE_THREAD
//multithread

View File

@ -190,7 +190,9 @@ GrpcClient::Ping(std::string &result, const std::string& cmd) {
::milvus::grpc::ServerStatus response;
::milvus::grpc::Command command;
command.set_cmd(cmd);
std::cout << "in ping \n";
::grpc::Status status = stub_->Ping(&context, command, &response);
std::cout << "finish ping stub \n";
result = response.info();
if (!status.ok()) {

View File

@ -31,7 +31,7 @@ namespace server {
static std::unique_ptr<grpc::Server> server;
constexpr long MESSAGE_SIZE = 400 * 1024 * 1024;
constexpr long MESSAGE_SIZE = -1;
void
MilvusServer::StartService() {

View File

@ -107,6 +107,7 @@ RequestHandler::GetTableRowCount(::grpc::ServerContext* context, const ::milvus:
::grpc::Status
RequestHandler::ShowTables(::grpc::ServerContext* context, const ::milvus::grpc::Command* request, ::grpc::ServerWriter< ::milvus::grpc::TableName>* writer) {
std::cout << "in handler\n";
BaseTaskPtr task_ptr = ShowTablesTask::Create(*writer);
::milvus::grpc::Status grpc_status;
RequestScheduler::ExecTask(task_ptr, &grpc_status);
@ -120,6 +121,7 @@ RequestHandler::ShowTables(::grpc::ServerContext* context, const ::milvus::grpc:
::grpc::Status
RequestHandler::Ping(::grpc::ServerContext* context, const ::milvus::grpc::Command* request, ::milvus::grpc::ServerStatus* response) {
std::cout << "in handler\n";
std::string result;
BaseTaskPtr task_ptr = PingTask::Create(request->cmd(), result);
::milvus::grpc::Status grpc_status;