fix the changelog merge conflict

Former-commit-id: e3032f4d47c793365de5bce0f49d790c7b3c82ba
This commit is contained in:
Heisenberg 2019-09-09 19:17:50 +08:00
commit a823d6e4bf
63 changed files with 1657 additions and 648 deletions

View File

@ -29,6 +29,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-492 - Drop index failed if index have been created with index_type: FLAT
- MS-493 - Knowhere unittest crash
- MS-453 - GPU search error when nprobe set more than 1024
- MS-510 - unittest out of memory and crashed
- MS-119 - The problem of combining the log files
## Improvement
@ -94,7 +95,20 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-488 - Improve code format in scheduler
- MS-495 - cmake: integrated knowhere
- MS-496 - Change the top_k limitation from 1024 to 2048
- MS-502 - Update tasktable_test in scheduler
- MS-504 - Update node_test in scheduler
- MS-505 - Install core unit test and add to coverage
- MS-508 - Update normal_test in scheduler
- MS-511 - Update resource_test in scheduler
- MS-517 - Update resource_mgr_test in scheduler
- MS-518 - Add schedinst_test in scheduler
- MS-519 - Add event_test in scheduler
- MS-520 - Update resource_test in scheduler
- MS-524 - Add some unittest in event_test and resource_test
- MS-525 - Disable parallel reduce in SearchTask
- MS-527 - Update scheduler_test and enable it
- MS-528 - Hide some config used future
- MS-530 - Add unittest for SearchTask->Load
## New Feature
- MS-343 - Implement ResourceMgr

View File

@ -48,51 +48,38 @@ resource_config:
# example:
# resource_name: # resource name, just using in connections below
# type: DISK # resource type, optional: DISK/CPU/GPU
# memory: 256 # memory size, unit: GB
# device_id: 0
# enable_loader: true # if is enable loader, optional: true, false
# enable_executor: false # if is enable executor, optional: true, false
resources:
ssda:
type: DISK
memory: 2048
device_id: 0
enable_loader: true
enable_executor: false
cpu:
type: CPU
memory: 64
device_id: 0
enable_loader: true
enable_executor: false
gpu0:
type: GPU
memory: 6
device_id: 0
enable_loader: true
enable_executor: true
gpu_resource_num: 2
pinned_memory: 300
temp_memory: 300
# gtx1660:
# type: GPU
# memory: 6
# device_id: 1
# enable_loader: true
# enable_executor: true
# connection list, length: 0~N
# format: -${resource_name}===${resource_name}
# example:
# connection_name:
# speed: 100 # unit: MS/s
# endpoint: ${resource_name}===${resource_name}
connections:
io:
speed: 500
endpoint: ssda===cpu
pcie:
pcie0:
speed: 11000
endpoint: cpu===gpu0
# - cpu===gtx1660

View File

@ -64,6 +64,13 @@ class FaissGpuResourceMgr {
void
MoveToIdle(const int64_t &device_id, const ResPtr& res);
void
Dump();
protected:
void
RemoveResource(const int64_t& device_id, const ResPtr& res, std::map<int64_t, std::vector<ResPtr>>& resource_pool);
protected:
bool is_init = false;

View File

@ -138,8 +138,8 @@ class NsgIndex {
void FindUnconnectedNode(boost::dynamic_bitset<> &flags, int64_t &root);
private:
void GetKnnGraphFromFile();
//private:
// void GetKnnGraphFromFile();
};
}

View File

@ -19,6 +19,7 @@
#include "knowhere/adapter/faiss_adopt.h"
#include "knowhere/index/vector_index/gpu_ivf.h"
#include <algorithm>
namespace zilliz {
namespace knowhere {
@ -33,7 +34,7 @@ IndexModelPtr GPUIVF::Train(const DatasetPtr &dataset, const Config &config) {
auto temp_resource = FaissGpuResourceMgr::GetInstance().GetRes(gpu_device);
if (temp_resource != nullptr) {
ResScope rs(gpu_device, temp_resource );
ResScope rs(gpu_device, temp_resource);
faiss::gpu::GpuIndexIVFFlatConfig idx_config;
idx_config.device = gpu_device;
faiss::gpu::GpuIndexIVFFlat device_index(temp_resource->faiss_res.get(), dim, nlist, metric_type, idx_config);
@ -130,9 +131,9 @@ void GPUIVF::search_impl(int64_t n,
int64_t *labels,
const Config &cfg) {
// TODO(linxj): allocate mem
if (FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_, res_)) {
ResScope rs(gpu_id_, res_);
auto temp_res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_);
if (temp_res) {
ResScope rs(gpu_id_, temp_res);
if (auto device_index = std::static_pointer_cast<faiss::gpu::GpuIndexIVF>(index_)) {
auto nprobe = cfg.get_with_default("nprobe", size_t(1));
@ -143,7 +144,6 @@ void GPUIVF::search_impl(int64_t n,
} else {
KNOWHERE_THROW_MSG("search can't get gpu resource");
}
}
VectorIndexPtr GPUIVF::CopyGpuToCpu(const Config &config) {
@ -229,8 +229,7 @@ IndexModelPtr GPUIVFSQ::Train(const DatasetPtr &dataset, const Config &config) {
delete build_index;
return std::make_shared<IVFIndexModel>(host_index);
}
else {
} else {
KNOWHERE_THROW_MSG("Build IVFSQ can't get gpu resource");
}
}
@ -279,6 +278,10 @@ void FaissGpuResourceMgr::InitDevice(int64_t device_id,
}
void FaissGpuResourceMgr::InitResource() {
if(is_init) return ;
is_init = true;
for(auto& device : devices_params_) {
auto& resource_vec = idle_[device.first];
@ -342,15 +345,29 @@ bool FaissGpuResourceMgr::GetRes(const int64_t &device_id,
void FaissGpuResourceMgr::MoveToInuse(const int64_t &device_id, const ResPtr &res) {
std::lock_guard<std::mutex> lk(mutex_);
RemoveResource(device_id, res, idle_);
in_use_[device_id].push_back(res);
}
void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const ResPtr &res) {
std::lock_guard<std::mutex> lk(mutex_);
RemoveResource(device_id, res, in_use_);
auto it = idle_[device_id].begin();
idle_[device_id].insert(it, res);
}
void
FaissGpuResourceMgr::RemoveResource(const int64_t &device_id,
const ResPtr &res,
std::map<int64_t, std::vector<ResPtr>> &resource_pool) {
if (resource_pool.find(device_id) != resource_pool.end()) {
std::vector<ResPtr> &res_array = resource_pool[device_id];
res_array.erase(std::remove_if(res_array.begin(), res_array.end(),
[&](ResPtr &ptr) { return ptr->id == res->id; }),
res_array.end());
}
}
void FaissGpuResourceMgr::Free() {
for (auto &item : in_use_) {
auto& res_vec = item.second;
@ -363,6 +380,25 @@ void FaissGpuResourceMgr::Free() {
is_init = false;
}
void
FaissGpuResourceMgr::Dump() {
std::cout << "In used resource" << std::endl;
for(auto& item: in_use_) {
std::cout << "device_id: " << item.first << std::endl;
for(auto& elem : item.second) {
std::cout << "resource_id: " << elem->id << std::endl;
}
}
std::cout << "Idle resource" << std::endl;
for(auto& item: idle_) {
std::cout << "device_id: " << item.first << std::endl;
for(auto& elem : item.second) {
std::cout << "resource_id: " << elem->id << std::endl;
}
}
}
void GPUIndex::SetGpuDevice(const int &gpu_id) {
gpu_id_ = gpu_id;
}

View File

@ -722,30 +722,30 @@ void NsgIndex::SetKnnGraph(Graph &g) {
knng = std::move(g);
}
void NsgIndex::GetKnnGraphFromFile() {
//std::string filename = "/home/zilliz/opt/workspace/wook/efanna_graph/tests/sift.1M.50NN.graph";
std::string filename = "/home/zilliz/opt/workspace/wook/efanna_graph/tests/sift.50NN.graph";
std::ifstream in(filename, std::ios::binary);
unsigned k;
in.read((char *) &k, sizeof(unsigned));
in.seekg(0, std::ios::end);
std::ios::pos_type ss = in.tellg();
size_t fsize = (size_t) ss;
size_t num = (unsigned) (fsize / (k + 1) / 4);
in.seekg(0, std::ios::beg);
knng.resize(num);
knng.reserve(num);
unsigned kk = (k + 3) / 4 * 4;
for (size_t i = 0; i < num; i++) {
in.seekg(4, std::ios::cur);
knng[i].resize(k);
knng[i].reserve(kk);
in.read((char *) knng[i].data(), k * sizeof(unsigned));
}
in.close();
}
//void NsgIndex::GetKnnGraphFromFile() {
// //std::string filename = "/home/zilliz/opt/workspace/wook/efanna_graph/tests/sift.1M.50NN.graph";
// std::string filename = "/home/zilliz/opt/workspace/wook/efanna_graph/tests/sift.50NN.graph";
//
// std::ifstream in(filename, std::ios::binary);
// unsigned k;
// in.read((char *) &k, sizeof(unsigned));
// in.seekg(0, std::ios::end);
// std::ios::pos_type ss = in.tellg();
// size_t fsize = (size_t) ss;
// size_t num = (unsigned) (fsize / (k + 1) / 4);
// in.seekg(0, std::ios::beg);
//
// knng.resize(num);
// knng.reserve(num);
// unsigned kk = (k + 3) / 4 * 4;
// for (size_t i = 0; i < num; i++) {
// in.seekg(4, std::ios::cur);
// knng[i].resize(k);
// knng[i].reserve(kk);
// in.read((char *) knng[i].data(), k * sizeof(unsigned));
// }
// in.close();
//}
}
}

View File

@ -78,6 +78,6 @@ install(TARGETS test_ivf DESTINATION unittest)
install(TARGETS test_idmap DESTINATION unittest)
install(TARGETS test_kdt DESTINATION unittest)
add_subdirectory(faiss_ori)
#add_subdirectory(faiss_ori)
add_subdirectory(test_nsg)

View File

@ -148,10 +148,6 @@ TEST_F(IDMAPTest, copy_test) {
{
// cpu to gpu
static int64_t device_id = 0;
FaissGpuResourceMgr::GetInstance().InitDevice(0);
FaissGpuResourceMgr::GetInstance().InitDevice(1);
auto clone_index = CopyCpuToGpu(index_, device_id, Config());
auto clone_result = clone_index->Search(query_dataset, Config::object{{"k", k}});
AssertAnns(clone_result, nq, k);
@ -169,7 +165,7 @@ TEST_F(IDMAPTest, copy_test) {
assert(std::static_pointer_cast<IDMAP>(host_index)->GetRawIds() != nullptr);
// gpu to gpu
auto device_index = CopyCpuToGpu(index_, 1, Config());
auto device_index = CopyCpuToGpu(index_, device_id, Config());
auto device_result = device_index->Search(query_dataset, Config::object{{"k", k}});
AssertAnns(device_result, nq, k);
//assert(std::static_pointer_cast<GPUIDMAP>(device_index)->GetRawVectors() != nullptr);

View File

@ -52,9 +52,9 @@ class IVFTest
void SetUp() override {
std::tie(index_type, preprocess_cfg, train_cfg, add_cfg, search_cfg) = GetParam();
//Init_with_default();
Generate(128, 1000000/5, 10);
Generate(128, 1000000/100, 10);
index_ = IndexFactory(index_type);
FaissGpuResourceMgr::GetInstance().InitDevice(device_id, 1024*1024*200, 1024*1024*300, 2);
FaissGpuResourceMgr::GetInstance().InitDevice(device_id, 1024*1024*200, 1024*1024*600, 2);
}
void TearDown() override {
FaissGpuResourceMgr::GetInstance().Free();
@ -77,21 +77,21 @@ INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
Config::object{{"nlist", 100}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
//std::make_tuple("IVFPQ",
// Config(),
// Config::object{{"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
// Config(),
// Config::object{{"k", 10}}),
std::make_tuple("GPUIVF",
std::make_tuple("IVFPQ",
Config(),
Config::object{{"nlist", 1638}, {"gpu_id", device_id}, {"metric_type", "L2"}},
Config::object{{"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
std::make_tuple("GPUIVF",
Config(),
Config::object{{"nlist", 100}, {"gpu_id", device_id}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
std::make_tuple("GPUIVFPQ",
Config(),
Config::object{{"gpu_id", device_id}, {"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}}),
//std::make_tuple("GPUIVFPQ",
// Config(),
// Config::object{{"gpu_id", device_id}, {"nlist", 100}, {"M", 8}, {"nbits", 8}, {"metric_type", "L2"}},
// Config(),
// Config::object{{"k", 10}}),
std::make_tuple("IVFSQ",
Config(),
Config::object{{"nlist", 100}, {"nbits", 8}, {"metric_type", "L2"}},
@ -99,7 +99,7 @@ INSTANTIATE_TEST_CASE_P(IVFParameters, IVFTest,
Config::object{{"k", 10}}),
std::make_tuple("GPUIVFSQ",
Config(),
Config::object{{"gpu_id", device_id}, {"nlist", 1638}, {"nbits", 8}, {"metric_type", "L2"}},
Config::object{{"gpu_id", device_id}, {"nlist", 100}, {"nbits", 8}, {"metric_type", "L2"}},
Config(),
Config::object{{"k", 10}})
)
@ -386,8 +386,8 @@ class GPURESTEST
int64_t elems = 0;
};
const int search_count = 100;
const int load_count = 30;
const int search_count = 10;
const int load_count = 3;
TEST_F(GPURESTEST, gpu_ivf_resource_test) {
assert(!xb.empty());

View File

@ -7,6 +7,7 @@
#include <gtest/gtest.h>
#include <memory>
#include "knowhere/index/vector_index/gpu_ivf.h"
#include "knowhere/index/vector_index/nsg_index.h"
#include "knowhere/index/vector_index/nsg/nsg_io.h"
@ -18,15 +19,22 @@ using ::testing::TestWithParam;
using ::testing::Values;
using ::testing::Combine;
constexpr int64_t DEVICE_ID = 0;
class NSGInterfaceTest : public DataGen, public TestWithParam<::std::tuple<Config, Config>> {
protected:
void SetUp() override {
//Init_with_default();
FaissGpuResourceMgr::GetInstance().InitDevice(DEVICE_ID, 1024*1024*200, 1024*1024*600, 2);
Generate(256, 1000000, 1);
index_ = std::make_shared<NSG>();
std::tie(train_cfg, search_cfg) = GetParam();
}
void TearDown() override {
FaissGpuResourceMgr::GetInstance().Free();
}
protected:
std::shared_ptr<NSG> index_;
Config train_cfg;

View File

@ -257,7 +257,7 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index)
if(!utils::IsSameIndex(old_index, new_index)) {
DropIndex(table_id);
status = meta_ptr_->UpdateTableIndexParam(table_id, new_index);
status = meta_ptr_->UpdateTableIndex(table_id, new_index);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id;
return status;

View File

@ -32,7 +32,7 @@ class Meta {
virtual Status AllTables(std::vector<TableSchema> &table_schema_array) = 0;
virtual Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) = 0;
virtual Status UpdateTableIndex(const std::string &table_id, const TableIndex& index) = 0;
virtual Status UpdateTableFlag(const std::string &table_id, int64_t flag) = 0;

View File

@ -179,12 +179,6 @@ Status MySQLMetaImpl::Initialize() {
}
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
}
@ -240,12 +234,6 @@ Status MySQLMetaImpl::DropPartitionsByDates(const std::string &table_id,
return HandleException("QUERY ERROR WHEN DROPPING PARTITIONS BY DATES", dropPartitionsByDatesQuery.error());
}
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DROPPING PARTITIONS BY DATES", e.what());
}
@ -316,12 +304,6 @@ Status MySQLMetaImpl::CreateTable(TableSchema &table_schema) {
return utils::CreateTablePath(options_, table_schema.table_id_);
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN CREATING TABLE", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN CREATING TABLE", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN CREATING TABLE", e.what());
}
@ -406,12 +388,6 @@ Status MySQLMetaImpl::FilesByType(const std::string &table_id,
<< " index files:" << index_count << " backup files:" << backup_count;
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN GET FILE BY TYPE", e.what());
}
@ -419,7 +395,7 @@ Status MySQLMetaImpl::FilesByType(const std::string &table_id,
return Status::OK();
}
Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
Status MySQLMetaImpl::UpdateTableIndex(const std::string &table_id, const TableIndex& index) {
try {
server::MetricCollector metric;
@ -436,7 +412,7 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
"WHERE table_id = " << quote << table_id << " AND " <<
"state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndexParam: " << updateTableIndexParamQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();
StoreQueryResult res = updateTableIndexParamQuery.store();
@ -453,12 +429,12 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
"state = " << state << ", " <<
"dimension = " << dimension << ", " <<
"created_on = " << created_on << ", " <<
"engine_type_ = " << index.engine_type_ << ", " <<
"engine_type = " << index.engine_type_ << ", " <<
"nlist = " << index.nlist_ << ", " <<
"metric_type = " << index.metric_type_ << " " <<
"WHERE id = " << quote << table_id << ";";
"WHERE table_id = " << quote << table_id << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndexParam: " << updateTableIndexParamQuery.str();
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableIndex: " << updateTableIndexParamQuery.str();
if (!updateTableIndexParamQuery.exec()) {
@ -470,12 +446,6 @@ Status MySQLMetaImpl::UpdateTableIndexParam(const std::string &table_id, const T
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE INDEX PARAM", e.what());
}
@ -497,7 +467,7 @@ Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag)
Query updateTableFlagQuery = connectionPtr->query();
updateTableFlagQuery << "UPDATE Tables " <<
"SET flag = " << flag << " " <<
"WHERE id = " << quote << table_id << ";";
"WHERE table_id = " << quote << table_id << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::UpdateTableFlag: " << updateTableFlagQuery.str();
@ -507,12 +477,6 @@ Status MySQLMetaImpl::UpdateTableFlag(const std::string &table_id, int64_t flag)
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
}
@ -553,12 +517,6 @@ Status MySQLMetaImpl::DescribeTableIndex(const std::string &table_id, TableIndex
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN DESCRIBE TABLE INDEX", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN DESCRIBE TABLE INDEX", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FLAG", e.what());
}
@ -608,7 +566,7 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
//set table index type to raw
dropTableIndexQuery << "UPDATE Tables " <<
"SET engine_type = " << std::to_string(DEFAULT_ENGINE_TYPE) << "," <<
"nlist = " << std::to_string(DEFAULT_NLIST) << " " <<
"nlist = " << std::to_string(DEFAULT_NLIST) << ", " <<
"metric_type = " << std::to_string(DEFAULT_METRIC_TYPE) << " " <<
"WHERE table_id = " << quote << table_id << ";";
@ -620,12 +578,6 @@ Status MySQLMetaImpl::DropTableIndex(const std::string &table_id) {
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DROPPING TABLE INDEX", e.what());
}
@ -662,12 +614,6 @@ Status MySQLMetaImpl::DeleteTable(const std::string &table_id) {
DeleteTableFiles(table_id);
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DELETING TABLE", e.what());
}
@ -700,12 +646,6 @@ Status MySQLMetaImpl::DeleteTableFiles(const std::string &table_id) {
return HandleException("QUERY ERROR WHEN DELETING TABLE FILES", deleteTableFilesQuery.error());
}
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DELETING TABLE FILES", e.what());
}
@ -725,7 +665,8 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
}
Query describeTableQuery = connectionPtr->query();
describeTableQuery << "SELECT id, state, dimension, engine_type, nlist, index_file_size, metric_type " <<
describeTableQuery << "SELECT id, state, dimension, created_on, " <<
"flag, index_file_size, engine_type, nlist, metric_type " <<
"FROM Tables " <<
"WHERE table_id = " << quote << table_schema.table_id_ << " " <<
"AND state <> " << std::to_string(TableSchema::TO_DELETE) << ";";
@ -744,6 +685,10 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
table_schema.dimension_ = resRow["dimension"];
table_schema.created_on_ = resRow["created_on"];
table_schema.flag_ = resRow["flag"];
table_schema.index_file_size_ = resRow["index_file_size"];
table_schema.engine_type_ = resRow["engine_type"];
@ -755,12 +700,6 @@ Status MySQLMetaImpl::DescribeTable(TableSchema &table_schema) {
return Status(DB_NOT_FOUND, "Table " + table_schema.table_id_ + " not found");
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN DESCRIBING TABLE", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN DESCRIBING TABLE", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DESCRIBING TABLE", e.what());
}
@ -795,12 +734,6 @@ Status MySQLMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
int check = res[0]["check"];
has_or_not = (check == 1);
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN CHECKING IF TABLE EXISTS", e.what());
}
@ -850,12 +783,6 @@ Status MySQLMetaImpl::AllTables(std::vector<TableSchema> &table_schema_array) {
table_schema_array.emplace_back(table_schema);
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN DESCRIBING ALL TABLES", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN DESCRIBING ALL TABLES", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DESCRIBING ALL TABLES", e.what());
}
@ -927,12 +854,6 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
return utils::CreateTableFilePath(options_, file_schema);
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN CREATING TABLE FILE", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN CREATING TABLE FILE", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN CREATING TABLE FILE", e.what());
}
@ -1007,12 +928,6 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.push_back(table_file);
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", e.what());
}
@ -1121,12 +1036,6 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files[table_file.date_].push_back(table_file);
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
}
@ -1214,12 +1123,6 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
files[table_file.date_].push_back(table_file);
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", e.what());
}
@ -1304,12 +1207,6 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
table_files.emplace_back(file_schema);
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES", e.what());
}
@ -1350,12 +1247,6 @@ Status MySQLMetaImpl::Archive() {
return HandleException("QUERY ERROR DURING ARCHIVE", archiveQuery.error());
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DURING ARCHIVE", e.what());
}
@ -1400,12 +1291,6 @@ Status MySQLMetaImpl::Size(uint64_t &result) {
result = res[0]["sum"];
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN RETRIEVING SIZE", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN RETRIEVING SIZE", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN RETRIEVING SIZE", e.what());
}
@ -1476,12 +1361,6 @@ Status MySQLMetaImpl::DiscardFiles(long long to_discard_size) {
return DiscardFiles(to_discard_size);
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN DISCARDING FILES", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN DISCARDING FILES", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DISCARDING FILES", e.what());
}
@ -1551,14 +1430,6 @@ Status MySQLMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
}
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
ENGINE_LOG_DEBUG << "table_id= " << file_schema.table_id_ << " file_id=" << file_schema.file_id_;
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILE", e.what());
}
@ -1587,12 +1458,6 @@ Status MySQLMetaImpl::UpdateTableFilesToIndex(const std::string &table_id) {
return HandleException("QUERY ERROR WHEN UPDATING TABLE FILE TO INDEX", updateTableFilesToIndexQuery.error());
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES TO INDEX", e.what());
}
@ -1671,12 +1536,6 @@ Status MySQLMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
}
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN UPDATING TABLE FILES", e.what());
}
@ -1755,12 +1614,6 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
}
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what());
}
@ -1810,12 +1663,6 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
}
} //Scoped Connection
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
}
@ -1847,12 +1694,6 @@ Status MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
}
}
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN CLEANING UP TABLES WITH TTL", e.what());
}
@ -1892,12 +1733,6 @@ Status MySQLMetaImpl::CleanUp() {
}
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN CLEANING UP FILES", e.what());
}
@ -1927,7 +1762,7 @@ Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
Query countQuery = connectionPtr->query();
countQuery << "SELECT size " <<
countQuery << "SELECT row_count " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
@ -1941,26 +1776,10 @@ Status MySQLMetaImpl::Count(const std::string &table_id, uint64_t &result) {
result = 0;
for (auto &resRow : res) {
size_t size = resRow["size"];
size_t size = resRow["row_count"];
result += size;
}
if (table_schema.dimension_ <= 0) {
std::stringstream errorMsg;
errorMsg << "MySQLMetaImpl::Count: " << "table dimension = " << std::to_string(table_schema.dimension_)
<< ", table_id = " << table_id;
ENGINE_LOG_ERROR << errorMsg.str();
return Status(DB_ERROR, errorMsg.str());
}
result /= table_schema.dimension_;
result /= sizeof(float);
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN RETRIEVING COUNT", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN RETRIEVING COUNT", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN RETRIEVING COUNT", e.what());
}
@ -1987,12 +1806,6 @@ Status MySQLMetaImpl::DropAll() {
} else {
return HandleException("QUERY ERROR WHEN DROPPING ALL", dropTableQuery.error());
}
} catch (const BadQuery &e) {
// Handle any query errors
return HandleException("GENERAL ERROR WHEN DROPPING ALL", e.what());
} catch (const Exception &e) {
// Catch-all for any other MySQL++ exceptions
return HandleException("GENERAL ERROR WHEN DROPPING ALL", e.what());
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN DROPPING ALL", e.what());
}

View File

@ -51,7 +51,7 @@ class MySQLMetaImpl : public Meta {
const std::vector<int> &file_types,
std::vector<std::string> &file_ids) override;
Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
Status UpdateTableIndex(const std::string &table_id, const TableIndex& index) override;
Status UpdateTableFlag(const std::string &table_id, int64_t flag);

View File

@ -192,8 +192,6 @@ Status SqliteMetaImpl::CreateTable(TableSchema &table_schema) {
} catch (std::exception &e) {
return HandleException("Encounter exception when create table", e.what());
}
return Status::OK();
}
Status SqliteMetaImpl::DeleteTable(const std::string& table_id) {
@ -341,7 +339,7 @@ Status SqliteMetaImpl::FilesByType(const std::string& table_id,
return Status::OK();
}
Status SqliteMetaImpl::UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) {
Status SqliteMetaImpl::UpdateTableIndex(const std::string &table_id, const TableIndex& index) {
try {
server::MetricCollector metric;

View File

@ -46,7 +46,7 @@ class SqliteMetaImpl : public Meta {
const std::vector<int> &file_types,
std::vector<std::string> &file_ids) override;
Status UpdateTableIndexParam(const std::string &table_id, const TableIndex& index) override;
Status UpdateTableIndex(const std::string &table_id, const TableIndex& index) override;
Status UpdateTableFlag(const std::string &table_id, int64_t flag) override;

View File

@ -50,7 +50,17 @@ std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
file_->nlist_);
try {
index_ptr->Load();
auto stat = index_ptr->Load();
if(!stat.ok()) {
//typical error: file not available
ENGINE_LOG_ERROR << "Failed to load index file: file not available";
for(auto& context : search_contexts_) {
context->IndexSearchDone(file_->id_);//mark as done avoid dead lock, even failed
}
return nullptr;
}
} catch (std::exception& ex) {
//typical error: out of disk space or permition denied
std::string msg = "Failed to load index file: " + std::string(ex.what());

View File

@ -59,7 +59,7 @@ ResourceMgr::Add(ResourcePtr &&resource) {
return ret;
}
void
bool
ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connection &connection) {
auto res1 = GetResource(name1);
auto res2 = GetResource(name2);
@ -67,7 +67,9 @@ ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connect
res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
// TODO: enable when task balance supported
// res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
return true;
}
return false;
}
void
@ -78,7 +80,7 @@ ResourceMgr::Clear() {
}
std::vector<ResourcePtr>
ResourceMgr::GetComputeResource() {
ResourceMgr::GetComputeResources() {
std::vector<ResourcePtr> result;
for (auto &resource : resources_) {
if (resource->HasExecutor()) {
@ -109,7 +111,12 @@ ResourceMgr::GetResource(const std::string &name) {
}
uint64_t
ResourceMgr::GetNumOfComputeResource() {
ResourceMgr::GetNumOfResource() const {
return resources_.size();
}
uint64_t
ResourceMgr::GetNumOfComputeResource() const {
uint64_t count = 0;
for (auto &res : resources_) {
if (res->HasExecutor()) {

View File

@ -35,7 +35,7 @@ public:
ResourceWPtr
Add(ResourcePtr &&resource);
void
bool
Connect(const std::string &res1, const std::string &res2, Connection &connection);
void
@ -60,7 +60,7 @@ public:
}
std::vector<ResourcePtr>
GetComputeResource();
GetComputeResources();
ResourcePtr
GetResource(ResourceType type, uint64_t device_id);
@ -69,7 +69,10 @@ public:
GetResource(const std::string &name);
uint64_t
GetNumOfComputeResource();
GetNumOfResource() const;
uint64_t
GetNumOfComputeResource() const;
uint64_t
GetNumGpuResource() const;

View File

@ -36,7 +36,8 @@ StartSchedulerService() {
auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
// auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
// auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
auto enable_loader = true;
auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY);
auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY);
@ -81,7 +82,9 @@ StartSchedulerService() {
}
} catch (const char* msg) {
SERVER_LOG_ERROR << msg;
// TODO: throw exception instead
exit(-1);
// throw std::exception();
}
ResMgrInst::GetInstance()->Start();

View File

@ -108,6 +108,7 @@ void
Scheduler::OnFinishTask(const EventPtr &event) {
}
// TODO: refactor the function
void
Scheduler::OnLoadCompleted(const EventPtr &event) {
auto load_completed_event = std::static_pointer_cast<LoadCompletedEvent>(event);
@ -120,18 +121,23 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
auto task = load_completed_event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
auto location = search_task->index_engine_->GetLocation();
bool moved = false;
for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
break;
// to support test task, REFACTOR
if (auto index_engine = search_task->index_engine_) {
auto location = index_engine->GetLocation();
for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
break;
}
}
}
if (not moved) {
Action::PushTaskToNeighbourRandomly(task, resource);
}
@ -145,9 +151,9 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
// if this resource is disk, assign it to smallest cost resource
if (self->type() == ResourceType::DISK) {
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr_.lock()->GetComputeResource();
auto compute_resources = res_mgr_.lock()->GetComputeResources();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t > transport_costs;
std::vector<uint64_t> transport_costs;
for (auto &res : compute_resources) {
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
@ -176,7 +182,7 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
task->path() = task_path;
}
if(self->name() == task->path().Last()) {
if (self->name() == task->path().Last()) {
self->WakeupLoader();
} else {
auto next_res_name = task->path().Next();

View File

@ -21,6 +21,7 @@ namespace milvus {
namespace engine {
// TODO: refactor, not friendly to unittest, logical in framework code
class Scheduler {
public:
explicit

View File

@ -136,7 +136,7 @@ std::vector<uint64_t>
TaskTable::PickToLoad(uint64_t limit) {
std::vector<uint64_t> indexes;
bool cross = false;
for (uint64_t i = last_finish_, count = 0; i < table_.size() && count < limit; ++i) {
for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
if (not cross && table_[i]->IsFinish()) {
last_finish_ = i;
} else if (table_[i]->state == TaskTableItemState::START) {
@ -152,7 +152,7 @@ std::vector<uint64_t>
TaskTable::PickToExecute(uint64_t limit) {
std::vector<uint64_t> indexes;
bool cross = false;
for (uint64_t i = last_finish_, count = 0; i < table_.size() && count < limit; ++i) {
for (uint64_t i = last_finish_ + 1, count = 0; i < table_.size() && count < limit; ++i) {
if (not cross && table_[i]->IsFinish()) {
last_finish_ = i;
} else if (table_[i]->state == TaskTableItemState::LOADED) {
@ -200,15 +200,15 @@ TaskTable::Get(uint64_t index) {
return table_[index];
}
void
TaskTable::Clear() {
// find first task is NOT (done or moved), erase from begin to it;
// auto iterator = table_.begin();
// while (iterator->state == TaskTableItemState::EXECUTED or
// iterator->state == TaskTableItemState::MOVED)
// iterator++;
// table_.erase(table_.begin(), iterator);
}
//void
//TaskTable::Clear() {
//// find first task is NOT (done or moved), erase from begin to it;
//// auto iterator = table_.begin();
//// while (iterator->state == TaskTableItemState::EXECUTED or
//// iterator->state == TaskTableItemState::MOVED)
//// iterator++;
//// table_.erase(table_.begin(), iterator);
//}
std::string

View File

@ -40,10 +40,10 @@ struct TaskTimestamp {
};
struct TaskTableItem {
TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex() {}
TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {}
TaskTableItem(const TaskTableItem &src)
: id(src.id), state(src.state), mutex() {}
TaskTableItem(const TaskTableItem &src) = delete;
TaskTableItem(TaskTableItem &&) = delete;
uint64_t id; // auto increment from 0;
TaskPtr task; // the task;
@ -114,8 +114,8 @@ public:
* Remove sequence task which is DONE or MOVED from front;
* Called by ?
*/
void
Clear();
// void
// Clear();
/*
* Return true if task table empty, otherwise false;
@ -229,7 +229,9 @@ private:
std::function<void(void)> subscriber_ = nullptr;
// cache last finish avoid Pick task from begin always
uint64_t last_finish_ = 0;
// pick from (last_finish_ + 1)
// init with -1, pick from (last_finish_ + 1) = 0
uint64_t last_finish_ = -1;
};

View File

@ -32,10 +32,8 @@ public:
return type_;
}
inline virtual std::string
Dump() const {
return "<Event>";
}
virtual std::string
Dump() const = 0;
friend std::ostream &operator<<(std::ostream &out, const Event &event);

View File

@ -17,27 +17,6 @@ Node::Node() {
id_ = counter++;
}
void Node::DelNeighbour(const NeighbourNodePtr &neighbour_ptr) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_ptr.lock()) {
auto search = neighbours_.find(s->id_);
if (search != neighbours_.end()) {
neighbours_.erase(search);
}
}
}
bool Node::IsNeighbour(const NeighbourNodePtr &neighbour_ptr) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_ptr.lock()) {
auto search = neighbours_.find(s->id_);
if (search != neighbours_.end()) {
return true;
}
}
return false;
}
std::vector<Neighbour> Node::GetNeighbours() {
std::lock_guard<std::mutex> lk(mutex_);
std::vector<Neighbour> ret;
@ -48,8 +27,13 @@ std::vector<Neighbour> Node::GetNeighbours() {
}
std::string Node::Dump() {
// TODO(linxj): what's that?
return std::__cxx11::string();
std::stringstream ss;
ss << "<Node, id=" << std::to_string(id_) << ">::neighbours:" << std::endl;
for (auto &neighbour : neighbours_) {
ss << "\t<Neighbour, id=" << std::to_string(neighbour.first);
ss << ", connection: " << neighbour.second.connection.Dump() << ">" << std::endl;
}
return ss.str();
}
void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {

View File

@ -37,12 +37,6 @@ public:
void
AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection);
void
DelNeighbour(const NeighbourNodePtr &neighbour_ptr);
bool
IsNeighbour(const NeighbourNodePtr& neighbour_ptr);
std::vector<Neighbour>
GetNeighbours();

View File

@ -31,7 +31,8 @@ namespace engine {
enum class ResourceType {
DISK = 0,
CPU = 1,
GPU = 2
GPU = 2,
TEST = 3,
};
class Resource : public Node, public std::enable_shared_from_this<Resource> {
@ -126,7 +127,6 @@ public:
bool enable_loader,
bool enable_executor);
// TODO: SearchContextPtr to TaskPtr
/*
* Implementation by inherit class;
* Blocking function;
@ -142,11 +142,6 @@ public:
Process(TaskPtr task) = 0;
private:
/*
* These function should move to cost.h ???
* COST.H ???
*/
/*
* Pick one task to load;
* Order by start time;

View File

@ -0,0 +1,33 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "TestResource.h"
namespace zilliz {
namespace milvus {
namespace engine {
std::ostream &operator<<(std::ostream &out, const TestResource &resource) {
out << resource.Dump();
return out;
}
TestResource::TestResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::TEST, device_id, enable_loader, enable_executor) {
}
void TestResource::LoadFile(TaskPtr task) {
task->Load(LoadType::TEST, 0);
}
void TestResource::Process(TaskPtr task) {
task->Execute();
}
}
}
}

View File

@ -0,0 +1,38 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "Resource.h"
namespace zilliz {
namespace milvus {
namespace engine {
class TestResource : public Resource {
public:
explicit
TestResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
return "<TestResource, name=" + name_ + ">";
}
friend std::ostream &operator<<(std::ostream &out, const TestResource &resource);
protected:
void
LoadFile(TaskPtr task) override;
void
Process(TaskPtr task) override;
};
}
}
}

View File

@ -20,47 +20,47 @@ namespace engine {
static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000;
static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;
bool
NeedParallelReduce(uint64_t nq, uint64_t topk) {
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB);
bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
if (!need_parallel) {
return false;
}
return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
}
void
ParallelReduce(std::function<void(size_t, size_t)> &reduce_function, size_t max_index) {
size_t reduce_batch = PARALLEL_REDUCE_BATCH;
auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
if (thread_count > 0) {
reduce_batch = max_index / thread_count + 1;
}
ENGINE_LOG_DEBUG << "use " << thread_count <<
" thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
std::vector<std::shared_ptr<std::thread> > thread_array;
size_t from_index = 0;
while (from_index < max_index) {
size_t to_index = from_index + reduce_batch;
if (to_index > max_index) {
to_index = max_index;
}
auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
thread_array.push_back(reduce_thread);
from_index = to_index;
}
for (auto &thread_ptr : thread_array) {
thread_ptr->join();
}
}
//bool
//NeedParallelReduce(uint64_t nq, uint64_t topk) {
// server::ServerConfig &config = server::ServerConfig::GetInstance();
// server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB);
// bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
// if (!need_parallel) {
// return false;
// }
//
// return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
//}
//
//void
//ParallelReduce(std::function<void(size_t, size_t)> &reduce_function, size_t max_index) {
// size_t reduce_batch = PARALLEL_REDUCE_BATCH;
//
// auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
// if (thread_count > 0) {
// reduce_batch = max_index / thread_count + 1;
// }
// ENGINE_LOG_DEBUG << "use " << thread_count <<
// " thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
//
// std::vector<std::shared_ptr<std::thread> > thread_array;
// size_t from_index = 0;
// while (from_index < max_index) {
// size_t to_index = from_index + reduce_batch;
// if (to_index > max_index) {
// to_index = max_index;
// }
//
// auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
// thread_array.push_back(reduce_thread);
//
// from_index = to_index;
// }
//
// for (auto &thread_ptr : thread_array) {
// thread_ptr->join();
// }
//}
void
CollectFileMetrics(int file_type, size_t file_size) {
@ -83,33 +83,44 @@ CollectFileMetrics(int file_type, size_t file_size) {
XSearchTask::XSearchTask(TableFileSchemaPtr file)
: Task(TaskType::SearchTask), file_(file) {
index_engine_ = EngineFactory::Build(file_->dimension_,
file_->location_,
(EngineType) file_->engine_type_,
(MetricType) file_->metric_type_,
file_->nlist_);
if (file_) {
index_engine_ = EngineFactory::Build(file_->dimension_,
file_->location_,
(EngineType) file_->engine_type_,
(MetricType) file_->metric_type_,
file_->nlist_);
}
}
void
XSearchTask::Load(LoadType type, uint8_t device_id) {
server::TimeRecorder rc("");
Status stat = Status::OK();
std::string error_msg;
try {
if (type == LoadType::DISK2CPU) {
index_engine_->Load();
stat = index_engine_->Load();
} else if (type == LoadType::CPU2GPU) {
index_engine_->CopyToGpu(device_id);
stat = index_engine_->CopyToGpu(device_id);
} else if (type == LoadType::GPU2CPU) {
index_engine_->CopyToCpu();
stat = index_engine_->CopyToCpu();
} else {
// TODO: exception
std::string msg = "Wrong load type";
ENGINE_LOG_ERROR << msg;
error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
} catch (std::exception &ex) {
//typical error: out of disk space or permition denied
std::string msg = "Failed to load index file: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
error_msg = "Failed to load index file: " + std::string(ex.what());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
if (!stat.ok()) {
if (error_msg.empty())
error_msg = std::string("Failed to load index file: file not available");
//typical error: file not available
ENGINE_LOG_ERROR << error_msg;
for (auto &context : search_contexts_) {
context->IndexSearchDone(file_->id_);//mark as done avoid dead lock, even failed
@ -225,11 +236,11 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
}
};
if (NeedParallelReduce(nq, topk)) {
ParallelReduce(reduce_worker, nq);
} else {
reduce_worker(0, nq);
}
// if (NeedParallelReduce(nq, topk)) {
// ParallelReduce(reduce_worker, nq);
// } else {
reduce_worker(0, nq);
// }
return Status::OK();
}
@ -330,11 +341,11 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
}
};
if (NeedParallelReduce(result_src.size(), topk)) {
ParallelReduce(ReduceWorker, result_src.size());
} else {
ReduceWorker(0, result_src.size());
}
// if (NeedParallelReduce(result_src.size(), topk)) {
// ParallelReduce(ReduceWorker, result_src.size());
// } else {
ReduceWorker(0, result_src.size());
// }
return Status::OK();
}

View File

@ -12,6 +12,7 @@ namespace zilliz {
namespace milvus {
namespace engine {
// TODO: rewrite
class XSearchTask : public Task {
public:
explicit

View File

@ -22,6 +22,7 @@ enum class LoadType {
DISK2CPU,
CPU2GPU,
GPU2CPU,
TEST,
};
enum class TaskType {
@ -34,6 +35,7 @@ class Task;
using TaskPtr = std::shared_ptr<Task>;
// TODO: re-design
class Task {
public:
explicit

View File

@ -13,7 +13,7 @@ namespace milvus {
namespace engine {
TestTask::TestTask(TableFileSchemaPtr& file) : XSearchTask(file) {}
TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) {}
void
TestTask::Load(LoadType type, uint8_t device_id) {
@ -22,9 +22,12 @@ TestTask::Load(LoadType type, uint8_t device_id) {
void
TestTask::Execute() {
std::lock_guard<std::mutex> lock(mutex_);
exec_count_++;
done_ = true;
{
std::lock_guard<std::mutex> lock(mutex_);
exec_count_++;
done_ = true;
}
cv_.notify_one();
}
void

View File

@ -61,9 +61,9 @@ ServerConfig::LoadConfigFile(const std::string& config_filename) {
ErrorCode ServerConfig::ValidateConfig() const {
//server config validation
ConfigNode server_config = GetConfig(CONFIG_SERVER);
uint32_t gpu_index = (uint32_t)server_config.GetInt32Value(CONFIG_GPU_INDEX, 0);
if(ValidationUtil::ValidateGpuIndex(gpu_index) != SERVER_SUCCESS) {
std::cout << "Error: invalid gpu_index " << std::to_string(gpu_index) << std::endl;
uint32_t build_index_gpu_index = (uint32_t)server_config.GetInt32Value(CONFIG_GPU_INDEX, 0);
if(ValidationUtil::ValidateGpuIndex(build_index_gpu_index) != SERVER_SUCCESS) {
std::cerr << "Error: invalid gpu_index " << std::to_string(build_index_gpu_index) << std::endl;
return SERVER_INVALID_ARGUMENT;
}
@ -75,7 +75,7 @@ ErrorCode ServerConfig::ValidateConfig() const {
uint64_t insert_buffer_size = (uint64_t)db_config.GetInt32Value(CONFIG_DB_INSERT_BUFFER_SIZE, 4);
insert_buffer_size *= GB;
if(insert_buffer_size >= total_mem) {
std::cout << "Error: insert_buffer_size execeed system memory" << std::endl;
std::cerr << "Error: insert_buffer_size execeed system memory" << std::endl;
return SERVER_INVALID_ARGUMENT;
}
@ -84,20 +84,51 @@ ErrorCode ServerConfig::ValidateConfig() const {
uint64_t cache_cap = (uint64_t)cache_config.GetInt64Value(CONFIG_CPU_CACHE_CAPACITY, 16);
cache_cap *= GB;
if(cache_cap >= total_mem) {
std::cout << "Error: cpu_cache_capacity execeed system memory" << std::endl;
std::cerr << "Error: cpu_cache_capacity execeed system memory" << std::endl;
return SERVER_INVALID_ARGUMENT;
} if(cache_cap > (double)total_mem*0.9) {
std::cout << "Warnning: cpu_cache_capacity value is too aggressive" << std::endl;
std::cerr << "Warning: cpu_cache_capacity value is too aggressive" << std::endl;
}
if(insert_buffer_size + cache_cap >= total_mem) {
std::cout << "Error: sum of cpu_cache_capacity and insert_buffer_size execeed system memory" << std::endl;
std::cerr << "Error: sum of cpu_cache_capacity and insert_buffer_size execeed system memory" << std::endl;
return SERVER_INVALID_ARGUMENT;
}
double free_percent = cache_config.GetDoubleValue(server::CACHE_FREE_PERCENT, 0.85);
if(free_percent < std::numeric_limits<double>::epsilon() || free_percent > 1.0) {
std::cout << "Error: invalid cache_free_percent " << std::to_string(free_percent) << std::endl;
std::cerr << "Error: invalid cache_free_percent " << std::to_string(free_percent) << std::endl;
return SERVER_INVALID_ARGUMENT;
}
// Resource config validation
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
if (config.GetChildren().empty()) {
std::cerr << "Error: no context under resource" << std::endl;
return SERVER_INVALID_ARGUMENT;
}
auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
if (resources.empty()) {
std::cerr << "Children of resource_config null exception" << std::endl;
return SERVER_INVALID_ARGUMENT;
}
bool resource_valid_flag = false;
for (auto &resource : resources) {
auto &resconf = resource.second;
auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
if(type == "GPU") {
auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID, 0);
if(device_id == build_index_gpu_index) {
resource_valid_flag = true;
}
}
}
if(!resource_valid_flag) {
std::cerr << "Building index GPU can't be found in resource config." << std::endl;
return SERVER_INVALID_ARGUMENT;
}

View File

@ -953,8 +953,18 @@ DropIndexTask::OnExecute() {
return SetError(res, "Invalid table name: " + table_name_);
}
bool has_table = false;
auto stat = DBWrapper::DB()->HasTable(table_name_, has_table);
if (!stat.ok()) {
return SetError(DB_META_TRANSACTION_FAILED, stat.ToString());
}
if (!has_table) {
return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists");
}
//step 2: check table existence
auto stat = DBWrapper::DB()->DropIndex(table_name_);
stat = DBWrapper::DB()->DropIndex(table_name_);
if (!stat.ok()) {
return SetError(DB_META_TRANSACTION_FAILED, stat.ToString());
}

View File

@ -78,6 +78,7 @@ constexpr ErrorCode DB_INVALID_PATH = ToDbErrorCode(5);
constexpr ErrorCode KNOWHERE_ERROR = ToKnowhereErrorCode(1);
constexpr ErrorCode KNOWHERE_INVALID_ARGUMENT = ToKnowhereErrorCode(2);
constexpr ErrorCode KNOWHERE_UNEXPECTED_ERROR = ToKnowhereErrorCode(3);
constexpr ErrorCode KNOWHERE_NO_SPACE = ToKnowhereErrorCode(4);
namespace server {
class ServerException : public std::exception {

View File

@ -8,9 +8,9 @@ namespace zilliz {
namespace milvus {
namespace server {
constexpr size_t table_name_size_limit = 255;
constexpr int64_t table_dimension_limit = 16384;
constexpr int32_t index_file_size_limit = 4096; //index trigger size max = 4096 MB
constexpr size_t TABLE_NAME_SIZE_LIMIT = 255;
constexpr int64_t TABLE_DIMENSION_LIMIT = 16384;
constexpr int32_t INDEX_FILE_SIZE_LIMIT = 4096; //index trigger size max = 4096 MB
ErrorCode
ValidationUtil::ValidateTableName(const std::string &table_name) {
@ -22,7 +22,7 @@ ValidationUtil::ValidateTableName(const std::string &table_name) {
}
// Table name size shouldn't exceed 16384.
if (table_name.size() > table_name_size_limit) {
if (table_name.size() > TABLE_NAME_SIZE_LIMIT) {
SERVER_LOG_ERROR << "Table name size exceed the limitation";
return SERVER_INVALID_TABLE_NAME;
}
@ -48,8 +48,8 @@ ValidationUtil::ValidateTableName(const std::string &table_name) {
ErrorCode
ValidationUtil::ValidateTableDimension(int64_t dimension) {
if (dimension <= 0 || dimension > table_dimension_limit) {
SERVER_LOG_ERROR << "Table dimension excceed the limitation: " << table_dimension_limit;
if (dimension <= 0 || dimension > TABLE_DIMENSION_LIMIT) {
SERVER_LOG_ERROR << "Table dimension excceed the limitation: " << TABLE_DIMENSION_LIMIT;
return SERVER_INVALID_VECTOR_DIMENSION;
} else {
return SERVER_SUCCESS;
@ -77,7 +77,7 @@ ValidationUtil::ValidateTableIndexNlist(int32_t nlist) {
ErrorCode
ValidationUtil::ValidateTableIndexFileSize(int64_t index_file_size) {
if(index_file_size <= 0 || index_file_size > index_file_size_limit) {
if(index_file_size <= 0 || index_file_size > INDEX_FILE_SIZE_LIMIT) {
return SERVER_INVALID_INDEX_FILE_SIZE;
}

View File

@ -38,6 +38,9 @@ public:
static ErrorCode
GetGpuMemory(uint32_t gpu_index, size_t &memory);
static ErrorCode
ValidateConfig();
};
}

View File

@ -139,7 +139,11 @@ VecIndexPtr read_index(const std::string &location) {
knowhere::BinarySet load_data_list;
FileIOReader reader(location);
reader.fs.seekg(0, reader.fs.end);
size_t length = reader.fs.tellg();
int64_t length = reader.fs.tellg();
if (length <= 0) {
return nullptr;
}
reader.fs.seekg(0);
size_t rp = 0;
@ -197,7 +201,13 @@ ErrorCode write_index(VecIndexPtr index, const std::string &location) {
return KNOWHERE_UNEXPECTED_ERROR;
} catch (std::exception &e) {
WRAPPER_LOG_ERROR << e.what();
return KNOWHERE_ERROR;
std::string estring(e.what());
if (estring.find("No space left on device") != estring.npos) {
WRAPPER_LOG_ERROR << "No space left on the device";
return KNOWHERE_NO_SPACE;
} else {
return KNOWHERE_ERROR;
}
}
return KNOWHERE_SUCCESS;
}
@ -209,7 +219,7 @@ void AutoGenParams(const IndexType &type, const long &size, zilliz::knowhere::Co
if (size <= TYPICAL_COUNT / 16384 + 1) {
//handle less row count, avoid nlist set to 0
cfg["nlist"] = 1;
} else if (int(size / TYPICAL_COUNT) * nlist == 0) {
} else if (int(size / TYPICAL_COUNT) *nlist == 0) {
//calculate a proper nlist if nlist not specified or size less than TYPICAL_COUNT
cfg["nlist"] = int(size / TYPICAL_COUNT * 16384);
}

View File

@ -46,5 +46,5 @@ add_subdirectory(server)
add_subdirectory(db)
add_subdirectory(knowhere)
add_subdirectory(metrics)
#add_subdirectory(scheduler)
add_subdirectory(scheduler)
#add_subdirectory(storage)

View File

@ -147,7 +147,7 @@ TEST_F(DBTest, DB_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
engine::IDNumbers vector_ids;
@ -181,7 +181,7 @@ TEST_F(DBTest, DB_TEST) {
ss << "Search " << j << " With Size " << count/engine::meta::M << " M";
STOP_TIMER(ss.str());
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
for (auto k=0; k<qb; ++k) {
ASSERT_EQ(results[k][0].first, target_ids[k]);
ss.str("");
@ -212,7 +212,7 @@ TEST_F(DBTest, DB_TEST) {
uint64_t count;
stat = db_->GetTableRowCount(TABLE_NAME, count);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_TRUE(count > 0);
};
@ -223,7 +223,7 @@ TEST_F(DBTest, SEARCH_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
// prepare raw data
@ -258,7 +258,7 @@ TEST_F(DBTest, SEARCH_TEST) {
for (int j = 0; j < nb / batch_size; ++j) {
stat = db_->InsertVectors(TABLE_NAME, batch_size, xb.data()+batch_size*j*TABLE_DIM, ids);
if (j == 200){ sleep(1);}
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
}
engine::TableIndex index;
@ -268,7 +268,7 @@ TEST_F(DBTest, SEARCH_TEST) {
{
engine::QueryResults results;
stat = db_->Query(TABLE_NAME, k, nq, 10, xq.data(), results);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
}
{//search by specify index file
@ -276,7 +276,7 @@ TEST_F(DBTest, SEARCH_TEST) {
std::vector<std::string> file_ids = {"1", "2", "3", "4", "5", "6"};
engine::QueryResults results;
stat = db_->Query(TABLE_NAME, file_ids, k, nq, 10, xq.data(), dates, results);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
}
// TODO(linxj): add groundTruth assert
@ -289,7 +289,7 @@ TEST_F(DBTest, PRELOADTABLE_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
int64_t nb = VECTOR_COUNT;
@ -309,7 +309,7 @@ TEST_F(DBTest, PRELOADTABLE_TEST) {
int64_t prev_cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
stat = db_->PreloadTable(TABLE_NAME);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
int64_t cur_cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
ASSERT_TRUE(prev_cache_usage < cur_cache_usage);
@ -322,7 +322,7 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
std::vector<engine::meta::TableSchema> table_schema_array;
stat = db_->AllTables(table_schema_array);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
bool bfound = false;
for(auto& schema : table_schema_array) {
if(schema.table_id_ == TABLE_NAME) {
@ -335,7 +335,7 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
uint64_t size;
@ -366,7 +366,7 @@ TEST_F(DBTest2, DELETE_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
bool has_table = false;
db_->HasTable(TABLE_NAME, has_table);
@ -405,7 +405,7 @@ TEST_F(DBTest2, DELETE_BY_RANGE_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
bool has_table = false;
db_->HasTable(TABLE_NAME, has_table);
@ -433,7 +433,7 @@ TEST_F(DBTest2, DELETE_BY_RANGE_TEST) {
ConvertTimeRangeToDBDates(start_value, end_value, dates);
stat = db_->DeleteTable(TABLE_NAME, dates);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
uint64_t row_count = 0;
db_->GetTableRowCount(TABLE_NAME, row_count);

View File

@ -3,12 +3,12 @@
#include "db/insert/VectorSource.h"
#include "db/insert/MemTableFile.h"
#include "db/insert/MemTable.h"
#include "utils.h"
#include "db/Factories.h"
#include "db/Constants.h"
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
#include "db/meta/MetaConsts.h"
#include "metrics/Metrics.h"
#include "utils.h"
#include <boost/filesystem.hpp>
#include <thread>
@ -223,7 +223,7 @@ TEST_F(MemManagerTest2, SERIAL_INSERT_SEARCH_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
std::map<int64_t, std::vector<float>> search_vectors;
@ -269,7 +269,7 @@ TEST_F(MemManagerTest2, INSERT_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
auto start_time = METRICS_NOW_TIME;
@ -295,7 +295,7 @@ TEST_F(MemManagerTest2, CONCURRENT_INSERT_SEARCH_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
engine::IDNumbers vector_ids;
@ -329,7 +329,7 @@ TEST_F(MemManagerTest2, CONCURRENT_INSERT_SEARCH_TEST) {
ss << "Search " << j << " With Size " << count / engine::meta::M << " M";
STOP_TIMER(ss.str());
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
for (auto k = 0; k < qb; ++k) {
ASSERT_EQ(results[k][0].first, target_ids[k]);
ss.str("");
@ -366,7 +366,7 @@ TEST_F(MemManagerTest2, VECTOR_IDS_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
engine::IDNumbers vector_ids;
@ -383,7 +383,7 @@ TEST_F(MemManagerTest2, VECTOR_IDS_TEST) {
stat = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
ASSERT_EQ(vector_ids[0], 0);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
nb = 25000;
xb.clear();
@ -395,7 +395,7 @@ TEST_F(MemManagerTest2, VECTOR_IDS_TEST) {
}
stat = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
ASSERT_EQ(vector_ids[0], nb);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
nb = 262144; //512M
xb.clear();
@ -407,14 +407,14 @@ TEST_F(MemManagerTest2, VECTOR_IDS_TEST) {
}
stat = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
ASSERT_EQ(vector_ids[0], nb/2);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
nb = 65536; //128M
xb.clear();
BuildVectors(nb, xb);
vector_ids.clear();
stat = db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
nb = 100;
xb.clear();

View File

@ -213,14 +213,36 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
table.table_id_ = table_id;
auto status = impl_->CreateTable(table);
int new_files_cnt = 4;
int raw_files_cnt = 5;
int to_index_files_cnt = 6;
int index_files_cnt = 7;
uint64_t new_merge_files_cnt = 1;
uint64_t new_index_files_cnt = 2;
uint64_t backup_files_cnt = 3;
uint64_t new_files_cnt = 4;
uint64_t raw_files_cnt = 5;
uint64_t to_index_files_cnt = 6;
uint64_t index_files_cnt = 7;
meta::TableFileSchema table_file;
table_file.table_id_ = table.table_id_;
for (auto i=0; i<new_merge_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<new_index_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW_INDEX;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<backup_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::BACKUP;
table_file.row_count_ = 1;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<new_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW;
@ -230,23 +252,30 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
for (auto i=0; i<raw_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::RAW;
table_file.row_count_ = 1;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<to_index_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
table_file.row_count_ = 1;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<index_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::INDEX;
table_file.row_count_ = 1;
status = impl_->UpdateTableFile(table_file);
}
meta::TableFilesSchema files;
uint64_t total_row_count = 0;
status = impl_->Count(table_id, total_row_count);
ASSERT_TRUE(status.ok());
ASSERT_EQ(total_row_count, raw_files_cnt+to_index_files_cnt+index_files_cnt);
meta::TableFilesSchema files;
status = impl_->FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
@ -281,4 +310,75 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
status = impl_->FilesToSearch(table_id, ids, dates, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),0);
std::vector<int> file_types;
std::vector<std::string> file_ids;
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
ASSERT_TRUE(file_ids.empty());
ASSERT_FALSE(status.ok());
file_types = {
meta::TableFileSchema::NEW,
meta::TableFileSchema::NEW_MERGE,
meta::TableFileSchema::NEW_INDEX,
meta::TableFileSchema::TO_INDEX,
meta::TableFileSchema::INDEX,
meta::TableFileSchema::RAW,
meta::TableFileSchema::BACKUP,
};
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
ASSERT_TRUE(status.ok());
uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt +
backup_files_cnt + new_files_cnt + raw_files_cnt +
to_index_files_cnt + index_files_cnt;
ASSERT_EQ(file_ids.size(), total_cnt);
status = impl_->DeleteTableFiles(table_id);
ASSERT_TRUE(status.ok());
status = impl_->DeleteTable(table_id);
ASSERT_TRUE(status.ok());
status = impl_->CleanUpFilesWithTTL(1UL);
ASSERT_TRUE(status.ok());
}
TEST_F(MetaTest, INDEX_TEST) {
auto table_id = "index_test";
meta::TableSchema table;
table.table_id_ = table_id;
auto status = impl_->CreateTable(table);
TableIndex index;
index.metric_type_ = 2;
index.nlist_ = 1234;
index.engine_type_ = 3;
status = impl_->UpdateTableIndex(table_id, index);
ASSERT_TRUE(status.ok());
int64_t flag = 65536;
status = impl_->UpdateTableFlag(table_id, flag);
ASSERT_TRUE(status.ok());
engine::meta::TableSchema table_info;
table_info.table_id_ = table_id;
status = impl_->DescribeTable(table_info);
ASSERT_EQ(table_info.flag_, flag);
TableIndex index_out;
status = impl_->DescribeTableIndex(table_id, index_out);
ASSERT_EQ(index_out.metric_type_, index.metric_type_);
ASSERT_EQ(index_out.nlist_, index.nlist_);
ASSERT_EQ(index_out.engine_type_, index.engine_type_);
status = impl_->DropTableIndex(table_id);
ASSERT_TRUE(status.ok());
status = impl_->DescribeTableIndex(table_id, index_out);
ASSERT_NE(index_out.metric_type_, index.metric_type_);
ASSERT_NE(index_out.nlist_, index.nlist_);
ASSERT_NE(index_out.engine_type_, index.engine_type_);
status = impl_->UpdateTableFilesToIndex(table_id);
ASSERT_TRUE(status.ok());
}

View File

@ -53,7 +53,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
engine::IDNumbers vector_ids;
@ -90,7 +90,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
ss << "Search " << j << " With Size " << count/engine::meta::M << " M";
STOP_TIMER(ss.str());
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
for (auto k=0; k<qb; ++k) {
// std::cout << results[k][0].first << " " << target_ids[k] << std::endl;
// ASSERT_EQ(results[k][0].first, target_ids[k]);
@ -138,7 +138,7 @@ TEST_F(MySqlDBTest, SEARCH_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
// prepare raw data
@ -173,14 +173,14 @@ TEST_F(MySqlDBTest, SEARCH_TEST) {
for (int j = 0; j < nb / batch_size; ++j) {
stat = db_->InsertVectors(TABLE_NAME, batch_size, xb.data()+batch_size*j*TABLE_DIM, ids);
if (j == 200){ sleep(1);}
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
}
sleep(2); // wait until build index finish
engine::QueryResults results;
stat = db_->Query(TABLE_NAME, k, nq, 10, xq.data(), results);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
};
TEST_F(MySqlDBTest, ARHIVE_DISK_CHECK) {
@ -189,7 +189,7 @@ TEST_F(MySqlDBTest, ARHIVE_DISK_CHECK) {
std::vector<engine::meta::TableSchema> table_schema_array;
stat = db_->AllTables(table_schema_array);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
bool bfound = false;
for(auto& schema : table_schema_array) {
if(schema.table_id_ == TABLE_NAME) {
@ -202,7 +202,7 @@ TEST_F(MySqlDBTest, ARHIVE_DISK_CHECK) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(table_info_get.dimension_, TABLE_DIM);
engine::IDNumbers vector_ids;
@ -236,7 +236,7 @@ TEST_F(MySqlDBTest, DELETE_TEST) {
engine::meta::TableSchema table_info_get;
table_info_get.table_id_ = TABLE_NAME;
stat = db_->DescribeTable(table_info_get);
ASSERT_STATS(stat);
ASSERT_TRUE(stat.ok());
bool has_table = false;
db_->HasTable(TABLE_NAME, has_table);

View File

@ -238,14 +238,36 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
table.table_id_ = table_id;
auto status = impl_->CreateTable(table);
int new_files_cnt = 4;
int raw_files_cnt = 5;
int to_index_files_cnt = 6;
int index_files_cnt = 7;
uint64_t new_merge_files_cnt = 1;
uint64_t new_index_files_cnt = 2;
uint64_t backup_files_cnt = 3;
uint64_t new_files_cnt = 4;
uint64_t raw_files_cnt = 5;
uint64_t to_index_files_cnt = 6;
uint64_t index_files_cnt = 7;
meta::TableFileSchema table_file;
table_file.table_id_ = table.table_id_;
for (auto i=0; i<new_merge_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<new_index_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW_INDEX;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<backup_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::BACKUP;
table_file.row_count_ = 1;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<new_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW;
@ -255,23 +277,30 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
for (auto i=0; i<raw_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::RAW;
table_file.row_count_ = 1;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<to_index_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::TO_INDEX;
table_file.row_count_ = 1;
status = impl_->UpdateTableFile(table_file);
}
for (auto i=0; i<index_files_cnt; ++i) {
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::INDEX;
table_file.row_count_ = 1;
status = impl_->UpdateTableFile(table_file);
}
meta::TableFilesSchema files;
uint64_t total_row_count = 0;
status = impl_->Count(table_id, total_row_count);
ASSERT_TRUE(status.ok());
ASSERT_EQ(total_row_count, raw_files_cnt+to_index_files_cnt+index_files_cnt);
meta::TableFilesSchema files;
status = impl_->FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
@ -307,6 +336,74 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),0);
status = impl_->DropAll();
std::vector<int> file_types;
std::vector<std::string> file_ids;
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
ASSERT_TRUE(file_ids.empty());
ASSERT_FALSE(status.ok());
file_types = {
meta::TableFileSchema::NEW,
meta::TableFileSchema::NEW_MERGE,
meta::TableFileSchema::NEW_INDEX,
meta::TableFileSchema::TO_INDEX,
meta::TableFileSchema::INDEX,
meta::TableFileSchema::RAW,
meta::TableFileSchema::BACKUP,
};
status = impl_->FilesByType(table.table_id_, file_types, file_ids);
ASSERT_TRUE(status.ok());
uint64_t total_cnt = new_index_files_cnt + new_merge_files_cnt +
backup_files_cnt + new_files_cnt + raw_files_cnt +
to_index_files_cnt + index_files_cnt;
ASSERT_EQ(file_ids.size(), total_cnt);
status = impl_->DeleteTableFiles(table_id);
ASSERT_TRUE(status.ok());
status = impl_->DeleteTable(table_id);
ASSERT_TRUE(status.ok());
status = impl_->CleanUpFilesWithTTL(1UL);
ASSERT_TRUE(status.ok());
}
TEST_F(MySqlMetaTest, INDEX_TEST) {
auto table_id = "index_test";
meta::TableSchema table;
table.table_id_ = table_id;
auto status = impl_->CreateTable(table);
TableIndex index;
index.metric_type_ = 2;
index.nlist_ = 1234;
index.engine_type_ = 3;
status = impl_->UpdateTableIndex(table_id, index);
ASSERT_TRUE(status.ok());
int64_t flag = 65536;
status = impl_->UpdateTableFlag(table_id, flag);
ASSERT_TRUE(status.ok());
engine::meta::TableSchema table_info;
table_info.table_id_ = table_id;
status = impl_->DescribeTable(table_info);
ASSERT_EQ(table_info.flag_, flag);
TableIndex index_out;
status = impl_->DescribeTableIndex(table_id, index_out);
ASSERT_EQ(index_out.metric_type_, index.metric_type_);
ASSERT_EQ(index_out.nlist_, index.nlist_);
ASSERT_EQ(index_out.engine_type_, index.engine_type_);
status = impl_->DropTableIndex(table_id);
ASSERT_TRUE(status.ok());
status = impl_->DescribeTableIndex(table_id, index_out);
ASSERT_NE(index_out.metric_type_, index.metric_type_);
ASSERT_NE(index_out.nlist_, index.nlist_);
ASSERT_NE(index_out.engine_type_, index.engine_type_);
status = impl_->UpdateTableFilesToIndex(table_id);
ASSERT_TRUE(status.ok());
}

View File

@ -35,13 +35,6 @@ public:
};
void ASSERT_STATS(engine::Status& stat) {
ASSERT_TRUE(stat.ok());
if(!stat.ok()) {
std::cout << stat.ToString() << std::endl;
}
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void BaseTest::InitLog() {
el::Configurations defaultConf;
@ -94,7 +87,8 @@ void DBTest::TearDown() {
engine::ResMgrInst::GetInstance()->Stop();
engine::SchedInst::GetInstance()->Stop();
boost::filesystem::remove_all("/tmp/milvus_test");
auto options = GetOptions();
boost::filesystem::remove_all(options.meta.path);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -110,11 +104,15 @@ engine::Options DBTest2::GetOptions() {
void MetaTest::SetUp() {
BaseTest::SetUp();
impl_ = engine::DBMetaImplFactory::Build();
auto options = GetOptions();
impl_ = std::make_shared<engine::meta::SqliteMetaImpl>(options.meta);
}
void MetaTest::TearDown() {
impl_->DropAll();
auto options = GetOptions();
boost::filesystem::remove_all(options.meta.path);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -134,13 +132,15 @@ engine::Options MySqlDBTest::GetOptions() {
void MySqlMetaTest::SetUp() {
BaseTest::SetUp();
engine::DBMetaOptions options = GetOptions().meta;
int mode = engine::Options::MODE::SINGLE;
impl_ = std::make_shared<engine::meta::MySQLMetaImpl>(options, mode);
auto options = GetOptions();
impl_ = std::make_shared<engine::meta::MySQLMetaImpl>(options.meta, options.mode);
}
void MySqlMetaTest::TearDown() {
impl_->DropAll();
auto options = GetOptions();
boost::filesystem::remove_all(options.meta.path);
}
zilliz::milvus::engine::Options MySqlMetaTest::GetOptions() {

View File

@ -32,8 +32,6 @@
#define STOP_TIMER(name)
#endif
void ASSERT_STATS(zilliz::milvus::engine::Status &stat);
class BaseTest : public ::testing::Test {
protected:
void InitLog();

View File

@ -34,7 +34,6 @@ class KnowhereWrapperTest
std::string generator_type;
std::tie(index_type, generator_type, dim, nb, nq, k, train_cfg, search_cfg) = GetParam();
//auto generator = GetGenerateFactory(generator_type);
auto generator = std::make_shared<DataGenBase>();
generator->GenData(dim, nb, nq, xb, xq, ids, k, gt_ids, gt_dis);

View File

@ -9,14 +9,6 @@
#include "utils.h"
DataGenPtr GetGenerateFactory(const std::string &gen_type) {
std::shared_ptr<DataGenBase> generator;
if (gen_type == "default") {
generator = std::make_shared<DataGenBase>();
}
return generator;
}
void DataGenBase::GenData(const int &dim, const int &nb, const int &nq,
float *xb, float *xq, long *ids,
const int &k, long *gt_ids, float *gt_dis) {

View File

@ -17,8 +17,6 @@ class DataGenBase;
using DataGenPtr = std::shared_ptr<DataGenBase>;
extern DataGenPtr GetGenerateFactory(const std::string &gen_type);
class DataGenBase {
public:

View File

@ -90,8 +90,6 @@ TEST_F(MetricTest, Metric_Tes) {
// stat = db_->Query(group_name, k, qb, qxb, results);
ss << "Search " << j << " With Size " << (float)(count*group_dim*sizeof(float))/(1024*1024) << " M";
// ASSERT_STATS(stat);
for (auto k=0; k<qb; ++k) {
// ASSERT_EQ(results[k][0].first, target_ids[k]);
ss.str("");

View File

@ -34,14 +34,6 @@ public:
};
void ASSERT_STATS(engine::Status& stat) {
ASSERT_TRUE(stat.ok());
if(!stat.ok()) {
std::cout << stat.ToString() << std::endl;
}
}
void MetricTest::InitLog() {
el::Configurations defaultConf;
defaultConf.setToDefault();

View File

@ -0,0 +1,54 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <gtest/gtest.h>
#include "scheduler/resource/Resource.h"
#include "scheduler/event/Event.h"
#include "scheduler/event/StartUpEvent.h"
namespace zilliz {
namespace milvus {
namespace engine {
TEST(EventTest, start_up_event) {
ResourceWPtr res(ResourcePtr(nullptr));
auto event = std::make_shared<StartUpEvent>(res);
ASSERT_FALSE(event->Dump().empty());
std::cout << *event;
std::cout << *EventPtr(event);
}
TEST(EventTest, load_completed_event) {
ResourceWPtr res(ResourcePtr(nullptr));
auto event = std::make_shared<LoadCompletedEvent>(res, nullptr);
ASSERT_FALSE(event->Dump().empty());
std::cout << *event;
std::cout << *EventPtr(event);
}
TEST(EventTest, finish_task_event) {
ResourceWPtr res(ResourcePtr(nullptr));
auto event = std::make_shared<FinishTaskEvent>(res, nullptr);
ASSERT_FALSE(event->Dump().empty());
std::cout << *event;
std::cout << *EventPtr(event);
}
TEST(EventTest, tasktable_updated_event) {
ResourceWPtr res(ResourcePtr(nullptr));
auto event = std::make_shared<TaskTableUpdatedEvent>(res);
ASSERT_FALSE(event->Dump().empty());
std::cout << *event;
std::cout << *EventPtr(event);
}
}
}
}

View File

@ -11,58 +11,71 @@ protected:
node1_ = std::make_shared<Node>();
node2_ = std::make_shared<Node>();
node3_ = std::make_shared<Node>();
node4_ = std::make_shared<Node>();
isolated_node1_ = std::make_shared<Node>();
isolated_node2_ = std::make_shared<Node>();
auto pcie = Connection("PCIe", 11.0);
node1_->AddNeighbour(node2_, pcie);
node1_->AddNeighbour(node3_, pcie);
node2_->AddNeighbour(node1_, pcie);
}
NodePtr node1_;
NodePtr node2_;
NodePtr node3_;
NodePtr node4_;
NodePtr isolated_node1_;
NodePtr isolated_node2_;
};
TEST_F(NodeTest, add_neighbour) {
ASSERT_EQ(node3_->GetNeighbours().size(), 0);
ASSERT_EQ(node4_->GetNeighbours().size(), 0);
ASSERT_EQ(isolated_node1_->GetNeighbours().size(), 0);
ASSERT_EQ(isolated_node2_->GetNeighbours().size(), 0);
auto pcie = Connection("PCIe", 11.0);
node3_->AddNeighbour(node4_, pcie);
node4_->AddNeighbour(node3_, pcie);
ASSERT_EQ(node3_->GetNeighbours().size(), 1);
ASSERT_EQ(node4_->GetNeighbours().size(), 1);
isolated_node1_->AddNeighbour(isolated_node2_, pcie);
ASSERT_EQ(isolated_node1_->GetNeighbours().size(), 1);
ASSERT_EQ(isolated_node2_->GetNeighbours().size(), 0);
}
TEST_F(NodeTest, del_neighbour) {
ASSERT_EQ(node1_->GetNeighbours().size(), 1);
ASSERT_EQ(node2_->GetNeighbours().size(), 1);
ASSERT_EQ(node3_->GetNeighbours().size(), 0);
node1_->DelNeighbour(node2_);
node2_->DelNeighbour(node2_);
node3_->DelNeighbour(node2_);
ASSERT_EQ(node1_->GetNeighbours().size(), 0);
ASSERT_EQ(node2_->GetNeighbours().size(), 1);
ASSERT_EQ(node3_->GetNeighbours().size(), 0);
}
TEST_F(NodeTest, is_neighbour) {
ASSERT_TRUE(node1_->IsNeighbour(node2_));
ASSERT_TRUE(node2_->IsNeighbour(node1_));
ASSERT_FALSE(node1_->IsNeighbour(node3_));
ASSERT_FALSE(node2_->IsNeighbour(node3_));
ASSERT_FALSE(node3_->IsNeighbour(node1_));
ASSERT_FALSE(node3_->IsNeighbour(node2_));
TEST_F(NodeTest, repeat_add_neighbour) {
ASSERT_EQ(isolated_node1_->GetNeighbours().size(), 0);
ASSERT_EQ(isolated_node2_->GetNeighbours().size(), 0);
auto pcie = Connection("PCIe", 11.0);
isolated_node1_->AddNeighbour(isolated_node2_, pcie);
isolated_node1_->AddNeighbour(isolated_node2_, pcie);
ASSERT_EQ(isolated_node1_->GetNeighbours().size(), 1);
ASSERT_EQ(isolated_node2_->GetNeighbours().size(), 0);
}
TEST_F(NodeTest, get_neighbours) {
auto node1_neighbours = node1_->GetNeighbours();
ASSERT_EQ(node1_neighbours.size(), 1);
ASSERT_EQ(node1_neighbours[0].neighbour_node.lock(), node2_);
{
bool n2 = false, n3 = false;
auto node1_neighbours = node1_->GetNeighbours();
ASSERT_EQ(node1_neighbours.size(), 2);
for (auto &n : node1_neighbours) {
if (n.neighbour_node.lock() == node2_) n2 = true;
if (n.neighbour_node.lock() == node3_) n3 = true;
}
ASSERT_TRUE(n2);
ASSERT_TRUE(n3);
}
auto node2_neighbours = node2_->GetNeighbours();
ASSERT_EQ(node2_neighbours.size(), 1);
ASSERT_EQ(node2_neighbours[0].neighbour_node.lock(), node1_);
{
auto node2_neighbours = node2_->GetNeighbours();
ASSERT_EQ(node2_neighbours.size(), 1);
ASSERT_EQ(node2_neighbours[0].neighbour_node.lock(), node1_);
}
{
auto node3_neighbours = node3_->GetNeighbours();
ASSERT_EQ(node3_neighbours.size(), 0);
}
}
TEST_F(NodeTest, dump) {
std::cout << node1_->Dump();
ASSERT_FALSE(node1_->Dump().empty());
std::cout << node2_->Dump();
ASSERT_FALSE(node2_->Dump().empty());
}

View File

@ -2,6 +2,7 @@
#include "scheduler/ResourceMgr.h"
#include "scheduler/Scheduler.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "scheduler/SchedInst.h"
#include "utils/Log.h"
#include <gtest/gtest.h>
@ -9,48 +10,44 @@
using namespace zilliz::milvus::engine;
TEST(normal_test, test1) {
TEST(normal_test, inst_test) {
// ResourceMgr only compose resources, provide unified event
// auto res_mgr = std::make_shared<ResourceMgr>();
auto res_mgr = ResMgrInst::GetInstance();
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd", true, false));
auto cpu = res_mgr->Add(ResourceFactory::Create("cpu", "CPU", 0));
auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu0", false, false));
auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu2", false, false));
res_mgr->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
res_mgr->Add(ResourceFactory::Create("cpu", "CPU", 0, true, true));
auto IO = Connection("IO", 500.0);
auto PCIE = Connection("IO", 11000.0);
res_mgr->Connect(disk, cpu, IO);
res_mgr->Connect(cpu, gpu1, PCIE);
res_mgr->Connect(cpu, gpu2, PCIE);
res_mgr->Connect("disk", "cpu", IO);
auto scheduler = SchedInst::GetInstance();
res_mgr->Start();
// auto scheduler = new Scheduler(res_mgr);
auto scheduler = SchedInst::GetInstance();
scheduler->Start();
const uint64_t NUM_TASK = 1000;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM_TASK; ++i) {
if (auto observe = disk.lock()) {
auto disks = res_mgr->GetDiskResources();
ASSERT_FALSE(disks.empty());
if (auto observe = disks[0].lock()) {
for (uint64_t i = 0; i < NUM_TASK; ++i) {
auto task = std::make_shared<TestTask>(dummy);
task->label() = std::make_shared<DefaultLabel>();
tasks.push_back(task);
observe->task_table().Put(task);
}
}
sleep(1);
for (auto &task : tasks) {
task->Wait();
ASSERT_EQ(task->load_count_, 1);
ASSERT_EQ(task->exec_count_, 1);
}
scheduler->Stop();
res_mgr->Stop();
auto pcpu = cpu.lock();
for (uint64_t i = 0; i < NUM_TASK; ++i) {
auto task = std::static_pointer_cast<TestTask>(pcpu->task_table()[i]->task);
ASSERT_EQ(task->load_count_, 1);
ASSERT_EQ(task->exec_count_, 1);
}
}

View File

@ -0,0 +1,187 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "scheduler/resource/CpuResource.h"
#include "scheduler/resource/GpuResource.h"
#include "scheduler/resource/DiskResource.h"
#include "scheduler/resource/TestResource.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/ResourceMgr.h"
#include <gtest/gtest.h>
namespace zilliz {
namespace milvus {
namespace engine {
/************ ResourceMgrBaseTest ************/
class ResourceMgrBaseTest : public testing::Test {
protected:
void
SetUp() override {
empty_mgr_ = std::make_shared<ResourceMgr>();
mgr1_ = std::make_shared<ResourceMgr>();
disk_res = std::make_shared<DiskResource>("disk", 0, true, false);
cpu_res = std::make_shared<CpuResource>("cpu", 1, true, false);
gpu_res = std::make_shared<GpuResource>("gpu", 2, true, true);
mgr1_->Add(ResourcePtr(disk_res));
mgr1_->Add(ResourcePtr(cpu_res));
mgr1_->Add(ResourcePtr(gpu_res));
}
void
TearDown() override {
}
ResourceMgrPtr empty_mgr_;
ResourceMgrPtr mgr1_;
ResourcePtr disk_res;
ResourcePtr cpu_res;
ResourcePtr gpu_res;
};
TEST_F(ResourceMgrBaseTest, add) {
auto resource = std::make_shared<TestResource>("test", 0, true, true);
auto ret = empty_mgr_->Add(ResourcePtr(resource));
ASSERT_EQ(ret.lock(), resource);
}
TEST_F(ResourceMgrBaseTest, add_disk) {
auto resource = std::make_shared<DiskResource>("disk", 0, true, true);
auto ret = empty_mgr_->Add(ResourcePtr(resource));
ASSERT_EQ(ret.lock(), resource);
}
TEST_F(ResourceMgrBaseTest, connect) {
auto resource1 = std::make_shared<TestResource>("resource1", 0, true, true);
auto resource2 = std::make_shared<TestResource>("resource2", 2, true, true);
empty_mgr_->Add(resource1);
empty_mgr_->Add(resource2);
Connection io("io", 500.0);
ASSERT_TRUE(empty_mgr_->Connect("resource1", "resource2", io));
}
TEST_F(ResourceMgrBaseTest, invalid_connect) {
auto resource1 = std::make_shared<TestResource>("resource1", 0, true, true);
auto resource2 = std::make_shared<TestResource>("resource2", 2, true, true);
empty_mgr_->Add(resource1);
empty_mgr_->Add(resource2);
Connection io("io", 500.0);
ASSERT_FALSE(empty_mgr_->Connect("xx", "yy", io));
}
TEST_F(ResourceMgrBaseTest, clear) {
ASSERT_EQ(mgr1_->GetNumOfResource(), 3);
mgr1_->Clear();
ASSERT_EQ(mgr1_->GetNumOfResource(), 0);
}
TEST_F(ResourceMgrBaseTest, get_disk_resources) {
auto disks = mgr1_->GetDiskResources();
ASSERT_EQ(disks.size(), 1);
ASSERT_EQ(disks[0].lock(), disk_res);
}
TEST_F(ResourceMgrBaseTest, get_all_resources) {
bool disk = false, cpu = false, gpu = false;
auto resources = mgr1_->GetAllResources();
ASSERT_EQ(resources.size(), 3);
for (auto &res : resources) {
if (res->type() == ResourceType::DISK) disk = true;
if (res->type() == ResourceType::CPU) cpu = true;
if (res->type() == ResourceType::GPU) gpu = true;
}
ASSERT_TRUE(disk);
ASSERT_TRUE(cpu);
ASSERT_TRUE(gpu);
}
TEST_F(ResourceMgrBaseTest, get_compute_resources) {
auto compute_resources = mgr1_->GetComputeResources();
ASSERT_EQ(compute_resources.size(), 1);
ASSERT_EQ(compute_resources[0], gpu_res);
}
TEST_F(ResourceMgrBaseTest, get_resource_by_type_and_deviceid) {
auto cpu = mgr1_->GetResource(ResourceType::CPU, 1);
ASSERT_EQ(cpu, cpu_res);
auto invalid = mgr1_->GetResource(ResourceType::GPU, 1);
ASSERT_EQ(invalid, nullptr);
}
TEST_F(ResourceMgrBaseTest, get_resource_by_name) {
auto disk = mgr1_->GetResource("disk");
ASSERT_EQ(disk, disk_res);
auto invalid = mgr1_->GetResource("invalid");
ASSERT_EQ(invalid, nullptr);
}
TEST_F(ResourceMgrBaseTest, get_num_of_resource) {
ASSERT_EQ(empty_mgr_->GetNumOfResource(), 0);
ASSERT_EQ(mgr1_->GetNumOfResource(), 3);
}
TEST_F(ResourceMgrBaseTest, get_num_of_compute_resource) {
ASSERT_EQ(empty_mgr_->GetNumOfComputeResource(), 0);
ASSERT_EQ(mgr1_->GetNumOfComputeResource(), 1);
}
TEST_F(ResourceMgrBaseTest, get_num_of_gpu_resource) {
ASSERT_EQ(empty_mgr_->GetNumGpuResource(), 0);
ASSERT_EQ(mgr1_->GetNumGpuResource(), 1);
}
TEST_F(ResourceMgrBaseTest, dump) {
ASSERT_FALSE(mgr1_->Dump().empty());
}
TEST_F(ResourceMgrBaseTest, dump_tasktables) {
ASSERT_FALSE(mgr1_->DumpTaskTables().empty());
}
/************ ResourceMgrAdvanceTest ************/
class ResourceMgrAdvanceTest : public testing::Test {
protected:
void
SetUp() override {
mgr1_ = std::make_shared<ResourceMgr>();
disk_res = std::make_shared<DiskResource>("disk", 0, true, false);
mgr1_->Add(ResourcePtr(disk_res));
mgr1_->Start();
}
void
TearDown() override {
mgr1_->Stop();
}
ResourceMgrPtr mgr1_;
ResourcePtr disk_res;
};
TEST_F(ResourceMgrAdvanceTest, register_subscriber) {
bool flag = false;
auto callback = [&](EventPtr event) {
flag = true;
};
mgr1_->RegisterSubscriber(callback);
TableFileSchemaPtr dummy = nullptr;
disk_res->task_table().Put(std::make_shared<TestTask>(dummy));
sleep(1);
ASSERT_TRUE(flag);
}
}
}
}

View File

@ -8,6 +8,7 @@
#include "scheduler/resource/DiskResource.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/resource/GpuResource.h"
#include "scheduler/resource/TestResource.h"
#include "scheduler/task/Task.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/ResourceFactory.h"
@ -18,27 +19,107 @@ namespace zilliz {
namespace milvus {
namespace engine {
class ResourceTest : public testing::Test {
/************ ResourceBaseTest ************/
class ResourceBaseTest : public testing::Test {
protected:
void
SetUp() override {
only_loader_ = std::make_shared<DiskResource>(name1, id1, true, false);
only_executor_ = std::make_shared<CpuResource>(name2, id2, false, true);
both_enable_ = std::make_shared<GpuResource>(name3, id3, true, true);
both_disable_ = std::make_shared<TestResource>(name4, id4, false, false);
}
const std::string name1 = "only_loader_";
const std::string name2 = "only_executor_";
const std::string name3 = "both_enable_";
const std::string name4 = "both_disable_";
const uint64_t id1 = 1;
const uint64_t id2 = 2;
const uint64_t id3 = 3;
const uint64_t id4 = 4;
ResourcePtr only_loader_ = nullptr;
ResourcePtr only_executor_ = nullptr;
ResourcePtr both_enable_ = nullptr;
ResourcePtr both_disable_ = nullptr;
};
TEST_F(ResourceBaseTest, name) {
ASSERT_EQ(only_loader_->name(), name1);
ASSERT_EQ(only_executor_->name(), name2);
ASSERT_EQ(both_enable_->name(), name3);
ASSERT_EQ(both_disable_->name(), name4);
}
TEST_F(ResourceBaseTest, type) {
ASSERT_EQ(only_loader_->type(), ResourceType::DISK);
ASSERT_EQ(only_executor_->type(), ResourceType::CPU);
ASSERT_EQ(both_enable_->type(), ResourceType::GPU);
ASSERT_EQ(both_disable_->type(), ResourceType::TEST);
}
TEST_F(ResourceBaseTest, device_id) {
ASSERT_EQ(only_loader_->device_id(), id1);
ASSERT_EQ(only_executor_->device_id(), id2);
ASSERT_EQ(both_enable_->device_id(), id3);
ASSERT_EQ(both_disable_->device_id(), id4);
}
TEST_F(ResourceBaseTest, has_loader) {
ASSERT_TRUE(only_loader_->HasLoader());
ASSERT_FALSE(only_executor_->HasLoader());
ASSERT_TRUE(both_enable_->HasLoader());
ASSERT_FALSE(both_disable_->HasLoader());
}
TEST_F(ResourceBaseTest, has_executor) {
ASSERT_FALSE(only_loader_->HasExecutor());
ASSERT_TRUE(only_executor_->HasExecutor());
ASSERT_TRUE(both_enable_->HasExecutor());
ASSERT_FALSE(both_disable_->HasExecutor());
}
TEST_F(ResourceBaseTest, dump) {
ASSERT_FALSE(only_loader_->Dump().empty());
ASSERT_FALSE(only_executor_->Dump().empty());
ASSERT_FALSE(both_enable_->Dump().empty());
ASSERT_FALSE(both_disable_->Dump().empty());
std::stringstream ss;
ss << only_loader_ << only_executor_ << both_enable_ << both_disable_;
ASSERT_FALSE(ss.str().empty());
}
/************ ResourceAdvanceTest ************/
class ResourceAdvanceTest : public testing::Test {
protected:
void
SetUp() override {
disk_resource_ = ResourceFactory::Create("ssd", "DISK", 0);
cpu_resource_ = ResourceFactory::Create("cpu", "CPU", 0);
gpu_resource_ = ResourceFactory::Create("gpu", "GPU", 0);
test_resource_ = std::make_shared<TestResource>("test", 0, true, true);
resources_.push_back(disk_resource_);
resources_.push_back(cpu_resource_);
resources_.push_back(gpu_resource_);
resources_.push_back(test_resource_);
auto subscriber = [&](EventPtr event) {
if (event->Type() == EventType::LOAD_COMPLETED) {
std::lock_guard<std::mutex> lock(load_mutex_);
++load_count_;
{
std::lock_guard<std::mutex> lock(load_mutex_);
++load_count_;
}
cv_.notify_one();
}
if (event->Type() == EventType::FINISH_TASK) {
std::lock_guard<std::mutex> lock(load_mutex_);
++exec_count_;
{
std::lock_guard<std::mutex> lock(load_mutex_);
++exec_count_;
}
cv_.notify_one();
}
};
@ -46,10 +127,12 @@ protected:
disk_resource_->RegisterSubscriber(subscriber);
cpu_resource_->RegisterSubscriber(subscriber);
gpu_resource_->RegisterSubscriber(subscriber);
test_resource_->RegisterSubscriber(subscriber);
disk_resource_->Start();
cpu_resource_->Start();
gpu_resource_->Start();
test_resource_->Start();
}
void
@ -57,6 +140,7 @@ protected:
disk_resource_->Stop();
cpu_resource_->Stop();
gpu_resource_->Stop();
test_resource_->Stop();
}
void
@ -74,6 +158,7 @@ protected:
ResourcePtr disk_resource_;
ResourcePtr cpu_resource_;
ResourcePtr gpu_resource_;
ResourcePtr test_resource_;
std::vector<ResourcePtr> resources_;
uint64_t load_count_ = 0;
uint64_t exec_count_ = 0;
@ -82,7 +167,32 @@ protected:
std::condition_variable cv_;
};
TEST_F(ResourceTest, cpu_resource_test) {
TEST_F(ResourceAdvanceTest, disk_resource_test) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task);
disk_resource_->task_table().Put(task);
}
disk_resource_->WakeupLoader();
WaitLoader(NUM);
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->load_count_, 0);
}
disk_resource_->WakeupExecutor();
WaitExecutor(NUM);
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->exec_count_, 0);
}
}
TEST_F(ResourceAdvanceTest, cpu_resource_test) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
@ -94,8 +204,6 @@ TEST_F(ResourceTest, cpu_resource_test) {
cpu_resource_->WakeupLoader();
WaitLoader(NUM);
// std::cout << "after WakeupLoader" << std::endl;
// std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->load_count_, 1);
@ -103,15 +211,13 @@ TEST_F(ResourceTest, cpu_resource_test) {
cpu_resource_->WakeupExecutor();
WaitExecutor(NUM);
// std::cout << "after WakeupExecutor" << std::endl;
// std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->exec_count_, 1);
}
}
TEST_F(ResourceTest, gpu_resource_test) {
TEST_F(ResourceAdvanceTest, gpu_resource_test) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
@ -123,8 +229,6 @@ TEST_F(ResourceTest, gpu_resource_test) {
gpu_resource_->WakeupLoader();
WaitLoader(NUM);
// std::cout << "after WakeupLoader" << std::endl;
// std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->load_count_, 1);
@ -132,14 +236,36 @@ TEST_F(ResourceTest, gpu_resource_test) {
gpu_resource_->WakeupExecutor();
WaitExecutor(NUM);
// std::cout << "after WakeupExecutor" << std::endl;
// std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->exec_count_, 1);
}
}
TEST_F(ResourceAdvanceTest, test_resource_test) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task);
test_resource_->task_table().Put(task);
}
test_resource_->WakeupLoader();
WaitLoader(NUM);
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->load_count_, 1);
}
test_resource_->WakeupExecutor();
WaitExecutor(NUM);
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->exec_count_, 1);
}
}
}
}

View File

@ -0,0 +1,77 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "scheduler/SchedInst.h"
#include "server/ServerConfig.h"
#include <boost/filesystem.hpp>
#include <gtest/gtest.h>
namespace zilliz {
namespace milvus {
namespace engine {
class SchedInstTest : public testing::Test {
protected:
void
SetUp() override {
boost::filesystem::create_directory(TMP_DIR);
std::stringstream ss;
ss << "resource_config: " << std::endl;
ss << " resources: " << std::endl;
ss << " ssda: " << std::endl;
ss << " type: DISK" << std::endl;
ss << " device_id: 0" << std::endl;
ss << " enable_loader: true" << std::endl;
ss << " enable_executor: false" << std::endl;
ss << " " << std::endl;
ss << " cpu: " << std::endl;
ss << " type: CPU" << std::endl;
ss << " device_id: 0" << std::endl;
ss << " enable_loader: true" << std::endl;
ss << " enable_executor: false" << std::endl;
ss << " " << std::endl;
ss << " gpu0: " << std::endl;
ss << " type: GPU" << std::endl;
ss << " device_id: 0" << std::endl;
ss << " enable_loader: true" << std::endl;
ss << " enable_executor: true" << std::endl;
ss << " " << std::endl;
ss << " connections: " << std::endl;
ss << " io: " << std::endl;
ss << " speed: 500" << std::endl;
ss << " endpoint: ssda===cpu" << std::endl;
ss << " pcie: " << std::endl;
ss << " speed: 11000" << std::endl;
ss << " endpoint: cpu===gpu0" << std::endl;
boost::filesystem::path fpath(CONFIG_FILE);
boost::filesystem::fstream fstream(fpath, std::ios_base::out);
fstream << ss.str();
fstream.close();
server::ServerConfig::GetInstance().LoadConfigFile(CONFIG_FILE);
}
void
TearDown() override {
StopSchedulerService();
boost::filesystem::remove_all(TMP_DIR);
}
const std::string TMP_DIR = "/tmp/milvus_sched_test";
const std::string CONFIG_FILE = "/tmp/milvus_sched_test/config.yaml";
};
TEST_F(SchedInstTest, simple_gpu) {
StartSchedulerService();
}
}
}
}

View File

@ -6,6 +6,7 @@
#include "scheduler/Scheduler.h"
#include <gtest/gtest.h>
#include <src/scheduler/tasklabel/DefaultLabel.h>
#include <src/server/ServerConfig.h>
#include "cache/DataObj.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/task/TestTask.h"
@ -15,18 +16,19 @@
#include "wrapper/knowhere/vec_index.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace zilliz {
namespace milvus {
namespace engine {
class MockVecIndex : public engine::VecIndex {
public:
virtual server::KnowhereError BuildAll(const long &nb,
const float *xb,
const long *ids,
const engine::Config &cfg,
const long &nt = 0,
const float *xt = nullptr) {
virtual ErrorCode BuildAll(const long &nb,
const float *xb,
const long *ids,
const engine::Config &cfg,
const long &nt = 0,
const float *xt = nullptr) {
}
@ -42,18 +44,18 @@ public:
return engine::IndexType::INVALID;
}
virtual server::KnowhereError Add(const long &nb,
const float *xb,
const long *ids,
const engine::Config &cfg = engine::Config()) {
virtual ErrorCode Add(const long &nb,
const float *xb,
const long *ids,
const engine::Config &cfg = engine::Config()) {
}
virtual server::KnowhereError Search(const long &nq,
const float *xq,
float *dist,
long *ids,
const engine::Config &cfg = engine::Config()) {
virtual ErrorCode Search(const long &nq,
const float *xq,
float *dist,
long *ids,
const engine::Config &cfg = engine::Config()) {
}
@ -78,7 +80,7 @@ public:
return binset;
}
virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) {
virtual ErrorCode Load(const zilliz::knowhere::BinarySet &index_binary) {
}
@ -92,6 +94,10 @@ class SchedulerTest : public testing::Test {
protected:
void
SetUp() override {
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
config.AddSequenceItem(server::CONFIG_GPU_IDS, "0");
config.AddSequenceItem(server::CONFIG_GPU_IDS, "1");
ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);
@ -127,16 +133,16 @@ protected:
void
insert_dummy_index_into_gpu_cache(uint64_t device_id) {
MockVecIndex* mock_index = new MockVecIndex();
MockVecIndex *mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000;
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location",obj);
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj);
}
TEST_F(SchedulerTest, OnCopyCompleted) {
TEST_F(SchedulerTest, OnLoadCompleted) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
@ -176,7 +182,7 @@ TEST_F(SchedulerTest, PushTaskToNeighbourRandomlyTest) {
}
class SchedulerTest2 : public testing::Test {
protected:
protected:
void
SetUp() override {
ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);

View File

@ -0,0 +1,23 @@
/*******************************************************************************
* Copyright (Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "scheduler/task/SearchTask.h"
#include <gtest/gtest.h>
namespace zilliz {
namespace milvus {
namespace engine {
TEST(TaskTest, invalid_index) {
auto search_task = std::make_shared<XSearchTask>(nullptr);
search_task->Load(LoadType::TEST, 10);
}
}
}
}

View File

@ -5,30 +5,37 @@
using namespace zilliz::milvus::engine;
/************ TaskTableBaseTest ************/
class TaskTableItemTest : public ::testing::Test {
protected:
void
SetUp() override {
item1_.id = 0;
item1_.state = TaskTableItemState::MOVED;
item1_.priority = 10;
std::vector<TaskTableItemState> states{
TaskTableItemState::INVALID,
TaskTableItemState::START,
TaskTableItemState::LOADING,
TaskTableItemState::LOADED,
TaskTableItemState::EXECUTING,
TaskTableItemState::EXECUTED,
TaskTableItemState::MOVING,
TaskTableItemState::MOVED};
for (auto &state : states) {
auto item = std::make_shared<TaskTableItem>();
item->state = state;
items_.emplace_back(item);
}
}
TaskTableItem default_;
TaskTableItem item1_;
std::vector<TaskTableItemPtr> items_;
};
TEST_F(TaskTableItemTest, construct) {
ASSERT_EQ(default_.id, 0);
ASSERT_EQ(default_.task, nullptr);
ASSERT_EQ(default_.state, TaskTableItemState::INVALID);
ASSERT_EQ(default_.priority, 0);
}
TEST_F(TaskTableItemTest, copy) {
TaskTableItem another(item1_);
ASSERT_EQ(another.id, item1_.id);
ASSERT_EQ(another.state, item1_.state);
ASSERT_EQ(another.priority, item1_.priority);
}
TEST_F(TaskTableItemTest, destruct) {
@ -36,6 +43,107 @@ TEST_F(TaskTableItemTest, destruct) {
delete p_item;
}
TEST_F(TaskTableItemTest, is_finish) {
for (auto &item : items_) {
if (item->state == TaskTableItemState::EXECUTED
|| item->state == TaskTableItemState::MOVED) {
ASSERT_TRUE(item->IsFinish());
} else {
ASSERT_FALSE(item->IsFinish());
}
}
}
TEST_F(TaskTableItemTest, dump) {
for (auto &item : items_) {
ASSERT_FALSE(item->Dump().empty());
}
}
TEST_F(TaskTableItemTest, load) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Load();
if (before_state == TaskTableItemState::START) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::LOADING);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
TEST_F(TaskTableItemTest, loaded) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Loaded();
if (before_state == TaskTableItemState::LOADING) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::LOADED);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
TEST_F(TaskTableItemTest, execute) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Execute();
if (before_state == TaskTableItemState::LOADED) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::EXECUTING);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
TEST_F(TaskTableItemTest, executed) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Executed();
if (before_state == TaskTableItemState::EXECUTING) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::EXECUTED);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
TEST_F(TaskTableItemTest, move) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Move();
if (before_state == TaskTableItemState::LOADED) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::MOVING);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
TEST_F(TaskTableItemTest, moved) {
for (auto &item : items_) {
auto before_state = item->state;
auto ret = item->Moved();
if (before_state == TaskTableItemState::MOVING) {
ASSERT_TRUE(ret);
ASSERT_EQ(item->state, TaskTableItemState::MOVED);
} else {
ASSERT_FALSE(ret);
ASSERT_EQ(item->state, before_state);
}
}
}
/************ TaskTableBaseTest ************/
@ -55,6 +163,16 @@ protected:
TaskTable empty_table_;
};
TEST_F(TaskTableBaseTest, subscriber) {
bool flag = false;
auto callback = [&]() {
flag = true;
};
empty_table_.RegisterSubscriber(callback);
empty_table_.Put(task1_);
ASSERT_TRUE(flag);
}
TEST_F(TaskTableBaseTest, put_task) {
empty_table_.Put(task1_);
@ -78,6 +196,125 @@ TEST_F(TaskTableBaseTest, put_empty_batch) {
empty_table_.Put(tasks);
}
TEST_F(TaskTableBaseTest, empty) {
ASSERT_TRUE(empty_table_.Empty());
empty_table_.Put(task1_);
ASSERT_FALSE(empty_table_.Empty());
}
TEST_F(TaskTableBaseTest, size) {
ASSERT_EQ(empty_table_.Size(), 0);
empty_table_.Put(task1_);
ASSERT_EQ(empty_table_.Size(), 1);
}
TEST_F(TaskTableBaseTest, operator_) {
empty_table_.Put(task1_);
ASSERT_EQ(empty_table_.Get(0), empty_table_[0]);
}
TEST_F(TaskTableBaseTest, pick_to_load) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
auto indexes = empty_table_.PickToLoad(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
}
TEST_F(TaskTableBaseTest, pick_to_load_limit) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
auto indexes = empty_table_.PickToLoad(3);
ASSERT_EQ(indexes.size(), 3);
ASSERT_EQ(indexes[0], 2);
ASSERT_EQ(indexes[1], 3);
ASSERT_EQ(indexes[2], 4);
}
TEST_F(TaskTableBaseTest, pick_to_load_cache) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
// first pick, non-cache
auto indexes = empty_table_.PickToLoad(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
// second pick, iterate from 2
// invalid state change
empty_table_[1]->state = TaskTableItemState::START;
indexes = empty_table_.PickToLoad(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
}
TEST_F(TaskTableBaseTest, pick_to_execute) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
empty_table_[2]->state = TaskTableItemState::LOADED;
auto indexes = empty_table_.PickToExecute(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
}
TEST_F(TaskTableBaseTest, pick_to_execute_limit) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
empty_table_[2]->state = TaskTableItemState::LOADED;
empty_table_[3]->state = TaskTableItemState::LOADED;
auto indexes = empty_table_.PickToExecute(3);
ASSERT_EQ(indexes.size(), 2);
ASSERT_EQ(indexes[0], 2);
ASSERT_EQ(indexes[1], 3);
}
TEST_F(TaskTableBaseTest, pick_to_execute_cache) {
const size_t NUM_TASKS = 10;
for (size_t i = 0; i < NUM_TASKS; ++i) {
empty_table_.Put(task1_);
}
empty_table_[0]->state = TaskTableItemState::MOVED;
empty_table_[1]->state = TaskTableItemState::EXECUTED;
empty_table_[2]->state = TaskTableItemState::LOADED;
// first pick, non-cache
auto indexes = empty_table_.PickToExecute(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
// second pick, iterate from 2
// invalid state change
empty_table_[1]->state = TaskTableItemState::START;
indexes = empty_table_.PickToExecute(1);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 2);
}
/************ TaskTableAdvanceTest ************/
class TaskTableAdvanceTest : public ::testing::Test {
@ -104,25 +341,116 @@ protected:
};
TEST_F(TaskTableAdvanceTest, load) {
table1_.Load(1);
table1_.Loaded(2);
std::vector<TaskTableItemState> before_state;
for (auto &task : table1_) {
before_state.push_back(task->state);
}
ASSERT_EQ(table1_.Get(1)->state, TaskTableItemState::LOADING);
ASSERT_EQ(table1_.Get(2)->state, TaskTableItemState::LOADED);
for (size_t i = 0; i < table1_.Size(); ++i) {
table1_.Load(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::START) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::LOADING);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, loaded) {
std::vector<TaskTableItemState> before_state;
for (auto &task : table1_) {
before_state.push_back(task->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
table1_.Loaded(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::LOADING) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::LOADED);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, execute) {
table1_.Execute(3);
table1_.Executed(4);
std::vector<TaskTableItemState> before_state;
for (auto &task : table1_) {
before_state.push_back(task->state);
}
ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::EXECUTING);
ASSERT_EQ(table1_.Get(4)->state, TaskTableItemState::EXECUTED);
for (size_t i = 0; i < table1_.Size(); ++i) {
table1_.Execute(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::LOADED) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::EXECUTING);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, executed) {
std::vector<TaskTableItemState> before_state;
for (auto &task : table1_) {
before_state.push_back(task->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
table1_.Executed(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::EXECUTING) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::EXECUTED);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, move) {
table1_.Move(3);
table1_.Moved(6);
std::vector<TaskTableItemState> before_state;
for (auto &task : table1_) {
before_state.push_back(task->state);
}
ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::MOVING);
ASSERT_EQ(table1_.Get(6)->state, TaskTableItemState::MOVED);
for (size_t i = 0; i < table1_.Size(); ++i) {
table1_.Move(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::LOADED) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::MOVING);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
}
TEST_F(TaskTableAdvanceTest, moved) {
std::vector<TaskTableItemState> before_state;
for (auto &task : table1_) {
before_state.push_back(task->state);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
table1_.Moved(i);
}
for (size_t i = 0; i < table1_.Size(); ++i) {
if (before_state[i] == TaskTableItemState::MOVING) {
ASSERT_EQ(table1_.Get(i)->state, TaskTableItemState::MOVED);
} else {
ASSERT_EQ(table1_.Get(i)->state, before_state[i]);
}
}
}

View File

@ -36,7 +36,7 @@ public:
const engine::Config &cfg,
const long &nt = 0,
const float *xt = nullptr) {
return 0;
}
engine::VecIndexPtr Clone() override {
@ -55,7 +55,7 @@ public:
const float *xb,
const long *ids,
const engine::Config &cfg = engine::Config()) {
return 0;
}
virtual ErrorCode Search(const long &nq,
@ -63,15 +63,16 @@ public:
float *dist,
long *ids,
const engine::Config &cfg = engine::Config()) {
return 0;
}
engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {
engine::VecIndexPtr CopyToGpu(const int64_t &device_id,
const engine::Config &cfg) override {
return nullptr;
}
engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {
return nullptr;
}
virtual int64_t Dimension() {
@ -88,7 +89,7 @@ public:
}
virtual ErrorCode Load(const zilliz::knowhere::BinarySet &index_binary) {
return 0;
}
public:
@ -98,6 +99,24 @@ public:
}
TEST(CacheTest, DUMMY_TEST) {
engine::Config cfg;
MockVecIndex mock_index;
mock_index.Dimension();
mock_index.Count();
mock_index.Add(1, nullptr, nullptr);
mock_index.BuildAll(1, nullptr, nullptr, cfg);
mock_index.Search(1, nullptr, nullptr, nullptr, cfg);
mock_index.Clone();
mock_index.CopyToCpu(cfg);
mock_index.CopyToGpu(1, cfg);
mock_index.GetDeviceId();
mock_index.GetType();
zilliz::knowhere::BinarySet index_binary;
mock_index.Load(index_binary);
mock_index.Serialize();
}
TEST(CacheTest, CPU_CACHE_TEST) {
cache::CacheMgr *cpu_mgr = cache::CpuCacheMgr::GetInstance();