diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index e67a31d619..6b1caa4b78 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -29,11 +29,9 @@ Please mark all change in change log and use the ticket from JIRA. - MS-492 - Drop index failed if index have been created with index_type: FLAT - MS-493 - Knowhere unittest crash - MS-453 - GPU search error when nprobe set more than 1024 -<<<<<<< HEAD - MS-474 - Create index hang if use branch-0.3.1 server config -======= - MS-510 - unittest out of memory and crashed ->>>>>>> upstream/branch-0.4.0 +- MS-119 - The problem of combining the log files ## Improvement - MS-327 - Clean code for milvus @@ -114,6 +112,8 @@ Please mark all change in change log and use the ticket from JIRA. - MS-528 - Hide some config used future - MS-530 - Add unittest for SearchTask->Load - MS-531 - Disable next version code +- MS-533 - Update resource_test to cover dump function +- MS-523 - Config file validation ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 16e8041c29..1754bfeff3 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -31,7 +31,6 @@ if(CMAKE_BUILD_TYPE STREQUAL "Release") set(BUILD_TYPE "release") else() set(BUILD_TYPE "debug") - SET(CMAKE_VERBOSE_MAKEFILE on) endif() message(STATUS "Build type = ${BUILD_TYPE}") diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index aa65ef5cb5..ecfcd25f78 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -1,5 +1,5 @@ server_config: - address: 0.0.0.0 # milvus server ip address + address: 0.0.0.0 # milvus server ip address (IPv4) port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534 gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1 mode: single # milvus deployment type: single, cluster, read_only diff --git a/cpp/src/core/include/knowhere/index/vector_index/gpu_ivf.h b/cpp/src/core/include/knowhere/index/vector_index/gpu_ivf.h index e1395d49f8..f789a06b1a 100644 --- a/cpp/src/core/include/knowhere/index/vector_index/gpu_ivf.h +++ b/cpp/src/core/include/knowhere/index/vector_index/gpu_ivf.h @@ -3,6 +3,7 @@ #include #include "ivf.h" +#include "src/utils/BlockingQueue.h" namespace zilliz { @@ -16,12 +17,15 @@ struct Resource { std::shared_ptr faiss_res; int64_t id; + std::mutex mutex; }; using ResPtr = std::shared_ptr; using ResWPtr = std::weak_ptr; class FaissGpuResourceMgr { public: + using ResBQ = zilliz::milvus::server::BlockingQueue; + struct DeviceParams { int64_t temp_mem_size = 0; int64_t pinned_mem_size = 0; @@ -55,11 +59,8 @@ class FaissGpuResourceMgr { // allocate gpu memory before search // this func will return True if the device is idle and exists an idle resource. - bool - GetRes(const int64_t& device_id, ResPtr &res, const int64_t& alloc_size = 0); - - void - MoveToInuse(const int64_t &device_id, const ResPtr& res); + //bool + //GetRes(const int64_t& device_id, ResPtr &res, const int64_t& alloc_size = 0); void MoveToIdle(const int64_t &device_id, const ResPtr& res); @@ -67,33 +68,34 @@ class FaissGpuResourceMgr { void Dump(); - protected: - void - RemoveResource(const int64_t& device_id, const ResPtr& res, std::map>& resource_pool); - protected: bool is_init = false; - std::mutex mutex_; std::map devices_params_; - std::map> in_use_; - std::map> idle_; + std::map idle_map; }; class ResScope { public: - ResScope(const int64_t device_id, ResPtr &res) : resource(res), device_id(device_id) { - FaissGpuResourceMgr::GetInstance().MoveToInuse(device_id, resource); + ResScope(const int64_t device_id, ResPtr &res) : resource(res), device_id(device_id), move(true) { + res->mutex.lock(); + } + + ResScope(ResPtr &res) : resource(res), device_id(-1), move(false) { + res->mutex.lock(); } ~ResScope() { - //resource->faiss_res->noTempMemory(); - FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource); + if (move) { + FaissGpuResourceMgr::GetInstance().MoveToIdle(device_id, resource); + } + resource->mutex.unlock(); } private: ResPtr resource; int64_t device_id; + bool move = true; }; class GPUIndex { diff --git a/cpp/src/core/src/knowhere/index/vector_index/gpu_ivf.cpp b/cpp/src/core/src/knowhere/index/vector_index/gpu_ivf.cpp index 6c54afb1b8..1474ff2e11 100644 --- a/cpp/src/core/src/knowhere/index/vector_index/gpu_ivf.cpp +++ b/cpp/src/core/src/knowhere/index/vector_index/gpu_ivf.cpp @@ -130,19 +130,17 @@ void GPUIVF::search_impl(int64_t n, float *distances, int64_t *labels, const Config &cfg) { - // TODO(linxj): allocate mem - auto temp_res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_); - if (temp_res) { - ResScope rs(gpu_id_, temp_res); - if (auto device_index = std::static_pointer_cast(index_)) { - auto nprobe = cfg.get_with_default("nprobe", size_t(1)); + std::lock_guard lk(mutex_); - std::lock_guard lk(mutex_); - device_index->setNumProbes(nprobe); + if (auto device_index = std::static_pointer_cast(index_)) { + auto nprobe = cfg.get_with_default("nprobe", size_t(1)); + device_index->setNumProbes(nprobe); + + { + // TODO(linxj): allocate mem + ResScope rs(res_); device_index->search(n, (float *) data, k, distances, labels); } - } else { - KNOWHERE_THROW_MSG("search can't get gpu resource"); } } @@ -282,120 +280,75 @@ void FaissGpuResourceMgr::InitResource() { is_init = true; + std::cout << "InitResource" << std::endl; for(auto& device : devices_params_) { - auto& resource_vec = idle_[device.first]; + auto& device_id = device.first; + std::cout << "Device Id: " << device_id << std::endl; + auto& device_param = device.second; + auto& bq = idle_map[device_id]; - for (int64_t i = 0; i < device.second.resource_num; ++i) { - auto res = std::make_shared(); + for (int64_t i = 0; i < device_param.resource_num; ++i) { + std::cout << "Resource Id: " << i << std::endl; + auto raw_resource = std::make_shared(); // TODO(linxj): enable set pinned memory - //res->noTempMemory(); - auto res_wrapper = std::make_shared(res); - AllocateTempMem(res_wrapper, device.first, 0); + auto res_wrapper = std::make_shared(raw_resource); + AllocateTempMem(res_wrapper, device_id, 0); - resource_vec.emplace_back(res_wrapper); + bq.Put(res_wrapper); } } + std::cout << "End initResource" << std::endl; } ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id, const int64_t &alloc_size) { - std::lock_guard lk(mutex_); + InitResource(); - if (!is_init) { - InitResource(); - is_init = true; - } - - auto search = idle_.find(device_id); - if (search != idle_.end()) { - auto res = search->second.back(); - //AllocateTempMem(res, device_id, alloc_size); - - search->second.pop_back(); - return res; + auto finder = idle_map.find(device_id); + if (finder != idle_map.end()) { + auto& bq = finder->second; + auto&& resource = bq.Take(); + AllocateTempMem(resource, device_id, alloc_size); + return resource; } return nullptr; } -bool FaissGpuResourceMgr::GetRes(const int64_t &device_id, - ResPtr &res, - const int64_t &alloc_size) { - std::lock_guard lk(mutex_); - - if (!is_init) { - InitResource(); - is_init = true; - } - - auto search = idle_.find(device_id); - if (search != idle_.end()) { - auto &res_vec = search->second; - for (auto it = res_vec.cbegin(); it != res_vec.cend(); ++it) { - if ((*it)->id == res->id) { - //AllocateTempMem(res, device_id, alloc_size); - res_vec.erase(it); - return true; - } - } - } - // else - return false; -} - -void FaissGpuResourceMgr::MoveToInuse(const int64_t &device_id, const ResPtr &res) { - std::lock_guard lk(mutex_); - RemoveResource(device_id, res, idle_); - in_use_[device_id].push_back(res); -} +//bool FaissGpuResourceMgr::GetRes(const int64_t &device_id, +// ResPtr &res, +// const int64_t &alloc_size) { +// InitResource(); +// +// std::lock_guard lk(res->mutex); +// AllocateTempMem(res, device_id, alloc_size); +// return true; +//} void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const ResPtr &res) { - std::lock_guard lk(mutex_); - RemoveResource(device_id, res, in_use_); - auto it = idle_[device_id].begin(); - idle_[device_id].insert(it, res); -} - -void -FaissGpuResourceMgr::RemoveResource(const int64_t &device_id, - const ResPtr &res, - std::map> &resource_pool) { - if (resource_pool.find(device_id) != resource_pool.end()) { - std::vector &res_array = resource_pool[device_id]; - res_array.erase(std::remove_if(res_array.begin(), res_array.end(), - [&](ResPtr &ptr) { return ptr->id == res->id; }), - res_array.end()); + auto finder = idle_map.find(device_id); + if (finder != idle_map.end()) { + auto& bq = finder->second; + bq.Put(res); } } void FaissGpuResourceMgr::Free() { - for (auto &item : in_use_) { - auto& res_vec = item.second; - res_vec.clear(); - } - for (auto &item : idle_) { - auto& res_vec = item.second; - res_vec.clear(); + for (auto &item : idle_map) { + auto& bq = item.second; + while (!bq.Empty()) { + bq.Take(); + } } is_init = false; } void FaissGpuResourceMgr::Dump() { - std::cout << "In used resource" << std::endl; - for(auto& item: in_use_) { - std::cout << "device_id: " << item.first << std::endl; - for(auto& elem : item.second) { - std::cout << "resource_id: " << elem->id << std::endl; - } - } - - std::cout << "Idle resource" << std::endl; - for(auto& item: idle_) { - std::cout << "device_id: " << item.first << std::endl; - for(auto& elem : item.second) { - std::cout << "resource_id: " << elem->id << std::endl; - } + for (auto &item : idle_map) { + auto& bq = item.second; + std::cout << "device_id: " << item.first + << ", resource count:" << bq.Size(); } } diff --git a/cpp/src/core/test/test_ivf.cpp b/cpp/src/core/test/test_ivf.cpp index 1e91618e83..625a9ca0fd 100644 --- a/cpp/src/core/test/test_ivf.cpp +++ b/cpp/src/core/test/test_ivf.cpp @@ -386,7 +386,7 @@ class GPURESTEST int64_t elems = 0; }; -const int search_count = 10; +const int search_count = 18; const int load_count = 3; TEST_F(GPURESTEST, gpu_ivf_resource_test) { diff --git a/cpp/src/db/Utils.cpp b/cpp/src/db/Utils.cpp index f03624451b..2c9173f6a7 100644 --- a/cpp/src/db/Utils.cpp +++ b/cpp/src/db/Utils.cpp @@ -135,7 +135,7 @@ Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& tab } } - std::string msg = "Table file doesn't exist: " + table_file.file_id_; + std::string msg = "Table file doesn't exist: " + file_path; ENGINE_LOG_ERROR << msg; return Status(DB_ERROR, msg); } diff --git a/cpp/src/db/meta/MySQLMetaImpl.cpp b/cpp/src/db/meta/MySQLMetaImpl.cpp index 19ac25b886..2910210a50 100644 --- a/cpp/src/db/meta/MySQLMetaImpl.cpp +++ b/cpp/src/db/meta/MySQLMetaImpl.cpp @@ -814,7 +814,6 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) { file_schema.engine_type_ = table_schema.engine_type_; file_schema.nlist_ = table_schema.nlist_; file_schema.metric_type_ = table_schema.metric_type_; - utils::GetTableFilePath(options_, file_schema); std::string id = "NULL"; //auto-increment std::string table_id = file_schema.table_id_; @@ -924,7 +923,10 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) { table_file.nlist_ = groups[table_file.table_id_].nlist_; table_file.metric_type_ = groups[table_file.table_id_].metric_type_; - utils::GetTableFilePath(options_, table_file); + auto status = utils::GetTableFilePath(options_, table_file); + if(!status.ok()) { + return status; + } files.push_back(table_file); } @@ -1027,7 +1029,10 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id, table_file.dimension_ = table_schema.dimension_; - utils::GetTableFilePath(options_, table_file); + auto status = utils::GetTableFilePath(options_, table_file); + if(!status.ok()) { + return status; + } auto dateItr = files.find(table_file.date_); if (dateItr == files.end()) { @@ -1113,7 +1118,10 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id, table_file.dimension_ = table_schema.dimension_; - utils::GetTableFilePath(options_, table_file); + auto status = utils::GetTableFilePath(options_, table_file); + if(!status.ok()) { + return status; + } auto dateItr = files.find(table_file.date_); if (dateItr == files.end()) { @@ -1203,7 +1211,10 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id, file_schema.dimension_ = table_schema.dimension_; - utils::GetTableFilePath(options_, file_schema); + auto status = utils::GetTableFilePath(options_, file_schema); + if(!status.ok()) { + return status; + } table_files.emplace_back(file_schema); } diff --git a/cpp/src/db/meta/SqliteMetaImpl.cpp b/cpp/src/db/meta/SqliteMetaImpl.cpp index 559ffd29ff..730a3035ea 100644 --- a/cpp/src/db/meta/SqliteMetaImpl.cpp +++ b/cpp/src/db/meta/SqliteMetaImpl.cpp @@ -614,7 +614,10 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) { table_file.engine_type_ = std::get<7>(file); table_file.created_on_ = std::get<8>(file); - utils::GetTableFilePath(options_, table_file); + auto status = utils::GetTableFilePath(options_, table_file); + if(!status.ok()) { + return status; + } auto groupItr = groups.find(table_file.table_id_); if (groupItr == groups.end()) { TableSchema table_schema; @@ -707,7 +710,11 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id, table_file.nlist_ = table_schema.nlist_; table_file.metric_type_ = table_schema.metric_type_; - utils::GetTableFilePath(options_, table_file); + auto status = utils::GetTableFilePath(options_, table_file); + if(!status.ok()) { + return status; + } + auto dateItr = files.find(table_file.date_); if (dateItr == files.end()) { files[table_file.date_] = TableFilesSchema(); @@ -773,7 +780,11 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id, table_file.nlist_ = table_schema.nlist_; table_file.metric_type_ = table_schema.metric_type_; - utils::GetTableFilePath(options_, table_file); + auto status = utils::GetTableFilePath(options_, table_file); + if(!status.ok()) { + return status; + } + auto dateItr = files.find(table_file.date_); if (dateItr == files.end()) { files[table_file.date_] = TableFilesSchema(); @@ -827,7 +838,10 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id, file_schema.nlist_ = table_schema.nlist_; file_schema.metric_type_ = table_schema.metric_type_; - utils::GetTableFilePath(options_, file_schema); + auto status = utils::GetTableFilePath(options_, file_schema); + if(!status.ok()) { + return status; + } table_files.emplace_back(file_schema); } diff --git a/cpp/src/db/scheduler/TaskScheduler.cpp b/cpp/src/db/scheduler/TaskScheduler.cpp index 2c75af9010..0a50c02bfa 100644 --- a/cpp/src/db/scheduler/TaskScheduler.cpp +++ b/cpp/src/db/scheduler/TaskScheduler.cpp @@ -132,4 +132,4 @@ TaskScheduler::TaskWorker() { } } -} \ No newline at end of file +} diff --git a/cpp/src/db/scheduler/TaskScheduler.h b/cpp/src/db/scheduler/TaskScheduler.h index d03fb858ac..f4556696ec 100644 --- a/cpp/src/db/scheduler/TaskScheduler.h +++ b/cpp/src/db/scheduler/TaskScheduler.h @@ -10,6 +10,8 @@ #include "TaskDispatchQueue.h" #include "utils/BlockingQueue.h" +#include + namespace zilliz { namespace milvus { namespace engine { diff --git a/cpp/src/server/ServerConfig.cpp b/cpp/src/server/ServerConfig.cpp index 09299e0032..d251763ad2 100644 --- a/cpp/src/server/ServerConfig.cpp +++ b/cpp/src/server/ServerConfig.cpp @@ -15,23 +15,24 @@ #include "utils/CommonUtil.h" #include "utils/ValidationUtil.h" + namespace zilliz { namespace milvus { namespace server { -constexpr uint64_t MB = 1024*1024; -constexpr uint64_t GB = MB*1024; +constexpr uint64_t MB = 1024 * 1024; +constexpr uint64_t GB = MB * 1024; -ServerConfig& +ServerConfig & ServerConfig::GetInstance() { static ServerConfig config; return config; } ErrorCode -ServerConfig::LoadConfigFile(const std::string& config_filename) { +ServerConfig::LoadConfigFile(const std::string &config_filename) { std::string filename = config_filename; - if(filename.empty()){ + if (filename.empty()) { std::cout << "ERROR: a config file is required" << std::endl; exit(1);//directly exit program if config file not specified } @@ -43,14 +44,14 @@ ServerConfig::LoadConfigFile(const std::string& config_filename) { } try { - ConfigMgr* mgr = const_cast(ConfigMgr::GetInstance()); + ConfigMgr *mgr = const_cast(ConfigMgr::GetInstance()); ErrorCode err = mgr->LoadConfigFile(filename); - if(err != 0) { + if (err != 0) { std::cout << "Server failed to load config file" << std::endl; exit(1);//directly exit program if the config file is illegal } } - catch (YAML::Exception& e) { + catch (YAML::Exception &e) { std::cout << "Server failed to load config file: " << std::endl; return SERVER_UNEXPECTED_ERROR; } @@ -58,86 +59,515 @@ ServerConfig::LoadConfigFile(const std::string& config_filename) { return SERVER_SUCCESS; } -ErrorCode ServerConfig::ValidateConfig() const { - //server config validation +ErrorCode ServerConfig::ValidateConfig() { + + bool okay = true; + if (CheckServerConfig() != SERVER_SUCCESS) { + okay = false; + } + if (CheckDBConfig() != SERVER_SUCCESS) { + okay = false; + } + if (CheckMetricConfig() != SERVER_SUCCESS) { + okay = false; + } + if (CheckCacheConfig() != SERVER_SUCCESS) { + okay = false; + } + if (CheckEngineConfig() != SERVER_SUCCESS) { + okay = false; + } + if (CheckResourceConfig() != SERVER_SUCCESS) { + okay = false; + } + return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT); +} + +ErrorCode +ServerConfig::CheckServerConfig() { +/* + server_config: + address: 0.0.0.0 # milvus server ip address + port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534 + gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1 + mode: single # milvus deployment type: single, cluster, read_only +*/ + bool okay = true; ConfigNode server_config = GetConfig(CONFIG_SERVER); - uint32_t build_index_gpu_index = (uint32_t)server_config.GetInt32Value(CONFIG_GPU_INDEX, 0); - if(ValidationUtil::ValidateGpuIndex(build_index_gpu_index) != SERVER_SUCCESS) { - std::cerr << "Error: invalid gpu_index " << std::to_string(build_index_gpu_index) << std::endl; - return SERVER_INVALID_ARGUMENT; + + std::string ip_address = server_config.GetValue(CONFIG_SERVER_ADDRESS, "127.0.0.1"); + if (ValidationUtil::ValidateIpAddress(ip_address) != SERVER_SUCCESS) { + std::cerr << "Error: invalid server IP address: " << ip_address << std::endl; + okay = false; } - //db config validation - unsigned long total_mem = 0, free_mem = 0; - CommonUtil::GetSystemMemInfo(total_mem, free_mem); + std::string port_str = server_config.GetValue(CONFIG_SERVER_PORT, "19530"); + if (ValidationUtil::ValidateStringIsNumber(port_str) != SERVER_SUCCESS) { + std::cerr << "Error: port " << port_str << " is not a number" << std::endl; + okay = false; + } + else { + int32_t port = std::stol(port_str); + if (port < 1025 | port > 65534) { + std::cerr << "Error: port " << port_str << " out of range [1025, 65534]" << std::endl; + okay = false; + } + } + std::string gpu_index_str = server_config.GetValue(CONFIG_GPU_INDEX, "0"); + if (ValidationUtil::ValidateStringIsNumber(gpu_index_str) != SERVER_SUCCESS) { + std::cerr << "Error: gpu_index " << gpu_index_str << " is not a number" << std::endl; + okay = false; + } + else { + int32_t gpu_index = std::stol(gpu_index_str); + if (ValidationUtil::ValidateGpuIndex(gpu_index) != SERVER_SUCCESS) { + std::cerr << "Error: invalid gpu_index " << gpu_index_str << std::endl; + okay = false; + } + } + + std::string mode = server_config.GetValue(CONFIG_CLUSTER_MODE, "single"); + if (mode != "single" && mode != "cluster" && mode != "read_only") { + std::cerr << "ERROR: mode " << mode << " is not one of ['single', 'cluster', 'read_only']" << std::endl; + okay = false; + } + + return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT); +} + +ErrorCode +ServerConfig::CheckDBConfig() { +/* + db_config: + db_path: @MILVUS_DB_PATH@ # milvus data storage path + db_slave_path: # secondry data storage path, split by semicolon + parallel_reduce: false # use multi-threads to reduce topk result + + # URI format: dialect://username:password@host:port/database + # All parts except dialect are optional, but you MUST include the delimiters + # Currently dialect supports mysql or sqlite + db_backend_url: sqlite://:@:/ + + archive_disk_threshold: 0 # triger archive action if storage size exceed this value, 0 means no limit, unit: GB + archive_days_threshold: 0 # files older than x days will be archived, 0 means no limit, unit: day + insert_buffer_size: 4 # maximum insert buffer size allowed, default: 4, unit: GB, should be at least 1 GB. + # the sum of insert_buffer_size and cpu_cache_capacity should be less than total memory, unit: GB +*/ + bool okay = true; ConfigNode db_config = GetConfig(CONFIG_DB); - uint64_t insert_buffer_size = (uint64_t)db_config.GetInt32Value(CONFIG_DB_INSERT_BUFFER_SIZE, 4); - insert_buffer_size *= GB; - if(insert_buffer_size >= total_mem) { - std::cerr << "Error: insert_buffer_size execeed system memory" << std::endl; - return SERVER_INVALID_ARGUMENT; + + std::string db_path = db_config.GetValue(CONFIG_DB_PATH); + if (db_path.empty()) { + std::cerr << "ERROR: db_path is empty" << std::endl; + okay = false; } - //cache config validation + std::string parallel_reduce_str = db_config.GetValue(CONFIG_DB_PARALLEL_REDUCE, "false"); + if (ValidationUtil::ValidateStringIsBool(parallel_reduce_str) != SERVER_SUCCESS) { + std::cerr << "Error: invalid parallel_reduce config: " << parallel_reduce_str << std::endl; + okay = false; + } + + std::string db_backend_url = db_config.GetValue(CONFIG_DB_URL); + if (ValidationUtil::ValidateDbURI(db_backend_url) != SERVER_SUCCESS) { + std::cerr << "Error: invalid db_backend_url " << db_backend_url << std::endl; + okay = false; + } + + std::string archive_disk_threshold_str = db_config.GetValue(CONFIG_DB_INSERT_BUFFER_SIZE, "0"); + if (ValidationUtil::ValidateStringIsNumber(archive_disk_threshold_str) != SERVER_SUCCESS) { + std::cerr << "Error: archive_disk_threshold " << archive_disk_threshold_str << " is not a number" << std::endl; + okay = false; + } + + std::string archive_days_threshold_str = db_config.GetValue(CONFIG_DB_INSERT_BUFFER_SIZE, "0"); + if (ValidationUtil::ValidateStringIsNumber(archive_days_threshold_str) != SERVER_SUCCESS) { + std::cerr << "Error: archive_days_threshold " << archive_days_threshold_str << " is not a number" << std::endl; + okay = false; + } + + std::string insert_buffer_size_str = db_config.GetValue(CONFIG_DB_INSERT_BUFFER_SIZE, "4"); + if (ValidationUtil::ValidateStringIsNumber(insert_buffer_size_str) != SERVER_SUCCESS) { + std::cerr << "Error: insert_buffer_size " << insert_buffer_size_str << " is not a number" << std::endl; + okay = false; + } + else { + uint64_t insert_buffer_size = (uint64_t) std::stol(insert_buffer_size_str); + insert_buffer_size *= GB; + unsigned long total_mem = 0, free_mem = 0; + CommonUtil::GetSystemMemInfo(total_mem, free_mem); + if (insert_buffer_size >= total_mem) { + std::cerr << "Error: insert_buffer_size exceed system memory" << std::endl; + okay = false; + } + } + + return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT); +} + +ErrorCode +ServerConfig::CheckMetricConfig() { +/* + metric_config: + is_startup: off # if monitoring start: on, off + collector: prometheus # metrics collector: prometheus + prometheus_config: # following are prometheus configure + port: 8080 # the port prometheus use to fetch metrics + (not used) push_gateway_ip_address: 127.0.0.1 # push method configure: push gateway ip address + (not used) push_gateway_port: 9091 # push method configure: push gateway port +*/ + bool okay = true; + ConfigNode metric_config = GetConfig(CONFIG_METRIC); + + std::string is_startup_str = metric_config.GetValue(CONFIG_METRIC_IS_STARTUP, "off"); + if (ValidationUtil::ValidateStringIsBool(is_startup_str) != SERVER_SUCCESS) { + std::cerr << "Error: invalid is_startup config: " << is_startup_str << std::endl; + okay = false; + } + + std::string port_str = metric_config.GetChild(CONFIG_PROMETHEUS).GetValue(CONFIG_METRIC_PROMETHEUS_PORT, "8080"); + if (ValidationUtil::ValidateStringIsNumber(port_str) != SERVER_SUCCESS) { + std::cerr << "Error: port specified in prometheus_config " << port_str << " is not a number" << std::endl; + okay = false; + } + + return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT); +} + +ErrorCode +ServerConfig::CheckCacheConfig() { +/* + cache_config: + cpu_cache_capacity: 16 # how many memory are used as cache, unit: GB, range: 0 ~ less than total memory + cpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0 + insert_cache_immediately: false # insert data will be load into cache immediately for hot query + gpu_cache_capacity: 5 # how many memory are used as cache in gpu, unit: GB, RANGE: 0 ~ less than total memory + gpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0 + gpu_ids: # gpu id + - 0 + - 1 +*/ + bool okay = true; ConfigNode cache_config = GetConfig(CONFIG_CACHE); - uint64_t cache_cap = (uint64_t)cache_config.GetInt64Value(CONFIG_CPU_CACHE_CAPACITY, 16); - cache_cap *= GB; - if(cache_cap >= total_mem) { - std::cerr << "Error: cpu_cache_capacity execeed system memory" << std::endl; - return SERVER_INVALID_ARGUMENT; - } if(cache_cap > (double)total_mem*0.9) { - std::cerr << "Warning: cpu_cache_capacity value is too aggressive" << std::endl; + + std::string cpu_cache_capacity_str = cache_config.GetValue(CONFIG_CPU_CACHE_CAPACITY, "16"); + if (ValidationUtil::ValidateStringIsNumber(cpu_cache_capacity_str) != SERVER_SUCCESS) { + std::cerr << "Error: cpu_cache_capacity " << cpu_cache_capacity_str << " is not a number" << std::endl; + okay = false; + } + else { + uint64_t cpu_cache_capacity = (uint64_t) std::stol(cpu_cache_capacity_str); + cpu_cache_capacity *= GB; + unsigned long total_mem = 0, free_mem = 0; + CommonUtil::GetSystemMemInfo(total_mem, free_mem); + if (cpu_cache_capacity >= total_mem) { + std::cerr << "Error: cpu_cache_capacity exceed system memory" << std::endl; + okay = false; + } + else if (cpu_cache_capacity > (double) total_mem * 0.9) { + std::cerr << "Warning: cpu_cache_capacity value is too aggressive" << std::endl; + } + + uint64_t insert_buffer_size = (uint64_t) GetConfig(CONFIG_DB).GetInt32Value(CONFIG_DB_INSERT_BUFFER_SIZE, 4); + insert_buffer_size *= GB; + if (insert_buffer_size + cpu_cache_capacity >= total_mem) { + std::cerr << "Error: sum of cpu_cache_capacity and insert_buffer_size exceed system memory" << std::endl; + okay = false; + } } - if(insert_buffer_size + cache_cap >= total_mem) { - std::cerr << "Error: sum of cpu_cache_capacity and insert_buffer_size execeed system memory" << std::endl; - return SERVER_INVALID_ARGUMENT; + std::string cpu_cache_free_percent_str = cache_config.GetValue(CACHE_FREE_PERCENT, "0.85"); + double cpu_cache_free_percent; + if (ValidationUtil::ValidateStringIsDouble(cpu_cache_free_percent_str, cpu_cache_free_percent) != SERVER_SUCCESS) { + std::cerr << "Error: cpu_cache_free_percent " << cpu_cache_free_percent_str << " is not a double" << std::endl; + okay = false; + } + else if (cpu_cache_free_percent < std::numeric_limits::epsilon() || cpu_cache_free_percent > 1.0) { + std::cerr << "Error: invalid cpu_cache_free_percent " << cpu_cache_free_percent_str << std::endl; + okay = false; } - double free_percent = cache_config.GetDoubleValue(server::CACHE_FREE_PERCENT, 0.85); - if(free_percent < std::numeric_limits::epsilon() || free_percent > 1.0) { - std::cerr << "Error: invalid cache_free_percent " << std::to_string(free_percent) << std::endl; - return SERVER_INVALID_ARGUMENT; + std::string insert_cache_immediately_str = cache_config.GetValue(CONFIG_INSERT_CACHE_IMMEDIATELY, "false"); + if (ValidationUtil::ValidateStringIsBool(insert_cache_immediately_str) != SERVER_SUCCESS) { + std::cerr << "Error: invalid insert_cache_immediately config: " << insert_cache_immediately_str << std::endl; + okay = false; } - // Resource config validation - server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE); - if (config.GetChildren().empty()) { + std::string gpu_cache_capacity_str = cache_config.GetValue(CONFIG_GPU_CACHE_CAPACITY, "5"); + if (ValidationUtil::ValidateStringIsNumber(gpu_cache_capacity_str) != SERVER_SUCCESS) { + std::cerr << "Error: gpu_cache_capacity " << gpu_cache_capacity_str << " is not a number" << std::endl; + okay = false; + } + else { + uint64_t gpu_cache_capacity = (uint64_t) std::stol(gpu_cache_capacity_str); + gpu_cache_capacity *= GB; + int gpu_index = GetConfig(CONFIG_SERVER).GetInt32Value(CONFIG_GPU_INDEX, 0); + size_t gpu_memory; + if (ValidationUtil::GetGpuMemory(gpu_index, gpu_memory) != SERVER_SUCCESS) { + std::cerr << "Error: could not get gpu memory for device " << gpu_index << std::endl; + okay = false; + } + else if (gpu_cache_capacity >= gpu_memory) { + std::cerr << "Error: gpu_cache_capacity " << gpu_cache_capacity + << " exceed total gpu memory " << gpu_memory << std::endl; + okay = false; + } + else if (gpu_cache_capacity > (double) gpu_memory * 0.9) { + std::cerr << "Warning: gpu_cache_capacity value is too aggressive" << std::endl; + } + } + + std::string gpu_cache_free_percent_str = cache_config.GetValue(GPU_CACHE_FREE_PERCENT, "0.85"); + double gpu_cache_free_percent; + if (ValidationUtil::ValidateStringIsDouble(gpu_cache_free_percent_str, gpu_cache_free_percent) != SERVER_SUCCESS) { + std::cerr << "Error: gpu_cache_free_percent " << gpu_cache_free_percent_str << " is not a double" << std::endl; + okay = false; + } + else if (gpu_cache_free_percent < std::numeric_limits::epsilon() || gpu_cache_free_percent > 1.0) { + std::cerr << "Error: invalid gpu_cache_free_percent " << gpu_cache_free_percent << std::endl; + okay = false; + } + + auto conf_gpu_ids = cache_config.GetSequence(server::CONFIG_GPU_IDS); + + for (std::string &gpu_id : conf_gpu_ids) { + if (ValidationUtil::ValidateStringIsNumber(gpu_id) != SERVER_SUCCESS) { + std::cerr << "Error: gpu_id " << gpu_id << " is not a number" << std::endl; + okay = false; + } + else if (ValidationUtil::ValidateGpuIndex(std::stol(gpu_id)) != SERVER_SUCCESS) { + std::cerr << "Error: gpu_id " << gpu_id << " is invalid" << std::endl; + okay = false; + } + } + + return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT); +} + +ErrorCode +ServerConfig::CheckEngineConfig() { +/* + engine_config: + use_blas_threshold: 20 + omp_thread_num: 0 # how many compute threads be used by engine, 0 means use all cpu core to compute +*/ + bool okay = true; + ConfigNode engine_config = GetConfig(CONFIG_ENGINE); + + std::string use_blas_threshold_str = engine_config.GetValue(CONFIG_DCBT, "20"); + if (ValidationUtil::ValidateStringIsNumber(use_blas_threshold_str) != SERVER_SUCCESS) { + std::cerr << "Error: use_blas_threshold " << use_blas_threshold_str << " is not a number" << std::endl; + okay = false; + } + + std::string omp_thread_num_str = engine_config.GetValue(CONFIG_OMP_THREAD_NUM, "0"); + if (ValidationUtil::ValidateStringIsNumber(omp_thread_num_str) != SERVER_SUCCESS) { + std::cerr << "Error: omp_thread_num " << omp_thread_num_str << " is not a number" << std::endl; + okay = false; + } + else { + int32_t omp_thread = std::stol(omp_thread_num_str); + uint32_t sys_thread_cnt = 8; + if (omp_thread > CommonUtil::GetSystemAvailableThreads(sys_thread_cnt)) { + std::cerr << "Error: omp_thread_num " << omp_thread_num_str << " > system available thread " + << sys_thread_cnt << std::endl; + okay = false; + } + } + + return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT); +} + +ErrorCode +ServerConfig::CheckResourceConfig() { +/* + + resource_config: + # resource list, length: 0~N + # please set a DISK resource and a CPU resource least, or system will not return query result. + # + # example: + # resource_name: # resource name, just using in connections below + # type: DISK # resource type, optional: DISK/CPU/GPU + # device_id: 0 + # enable_executor: false # if is enable executor, optional: true, false + + resources: + ssda: + type: DISK + device_id: 0 + enable_executor: false + + cpu: + type: CPU + device_id: 0 + enable_executor: false + + gpu0: + type: GPU + device_id: 0 + enable_executor: true + gpu_resource_num: 2 + pinned_memory: 300 + temp_memory: 300 + + # connection list, length: 0~N + # example: + # connection_name: + # speed: 100 # unit: MS/s + # endpoint: ${resource_name}===${resource_name} + connections: + io: + speed: 500 + endpoint: ssda===cpu + pcie0: + speed: 11000 + endpoint: cpu===gpu0 +*/ + bool okay = true; + server::ConfigNode resource_config = GetConfig(CONFIG_RESOURCE); + if (resource_config.GetChildren().empty()) { std::cerr << "Error: no context under resource" << std::endl; - return SERVER_INVALID_ARGUMENT; + okay = false; } - auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren(); + auto resources = resource_config.GetChild(CONFIG_RESOURCES).GetChildren(); if (resources.empty()) { - std::cerr << "Children of resource_config null exception" << std::endl; - return SERVER_INVALID_ARGUMENT; + std::cerr << "no resources specified" << std::endl; + okay = false; } bool resource_valid_flag = false; + bool hasDisk = false; + bool hasCPU = false; + bool hasExecutor = false; + std::set resource_list; for (auto &resource : resources) { - auto &resconf = resource.second; - auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE); - if(type == "GPU") { - auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID, 0); - if(device_id == build_index_gpu_index) { + resource_list.emplace(resource.first); + auto &resource_conf = resource.second; + auto type = resource_conf.GetValue(CONFIG_RESOURCE_TYPE); + + std::string device_id_str = resource_conf.GetValue(CONFIG_RESOURCE_DEVICE_ID, "0"); + int32_t device_id = -1; + if (ValidationUtil::ValidateStringIsNumber(device_id_str) != SERVER_SUCCESS) { + std::cerr << "Error: device_id " << device_id_str << " is not a number" << std::endl; + okay = false; + } + else { + device_id = std::stol(device_id_str); + } + + std::string enable_executor_str = resource_conf.GetValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, "off"); + if (ValidationUtil::ValidateStringIsBool(enable_executor_str) != SERVER_SUCCESS) { + std::cerr << "Error: invalid enable_executor config: " << enable_executor_str << std::endl; + okay = false; + } + + if (type == "DISK") { + hasDisk = true; + } + else if (type == "CPU") { + hasCPU = true; + if (resource_conf.GetBoolValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, false)) { + hasExecutor = true; + } + } + else if (type == "GPU") { + int build_index_gpu_index = GetConfig(CONFIG_SERVER).GetInt32Value(CONFIG_GPU_INDEX, 0); + if (device_id == build_index_gpu_index) { resource_valid_flag = true; } + if (resource_conf.GetBoolValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, false)) { + hasExecutor = true; + } + std::string gpu_resource_num_str = resource_conf.GetValue(CONFIG_RESOURCE_NUM, "2"); + if (ValidationUtil::ValidateStringIsNumber(gpu_resource_num_str) != SERVER_SUCCESS) { + std::cerr << "Error: gpu_resource_num " << gpu_resource_num_str << " is not a number" << std::endl; + okay = false; + } + bool mem_valid = true; + std::string pinned_memory_str = resource_conf.GetValue(CONFIG_RESOURCE_PIN_MEMORY, "300"); + if (ValidationUtil::ValidateStringIsNumber(pinned_memory_str) != SERVER_SUCCESS) { + std::cerr << "Error: pinned_memory " << pinned_memory_str << " is not a number" << std::endl; + okay = false; + mem_valid = false; + } + std::string temp_memory_str = resource_conf.GetValue(CONFIG_RESOURCE_TEMP_MEMORY, "300"); + if (ValidationUtil::ValidateStringIsNumber(temp_memory_str) != SERVER_SUCCESS) { + std::cerr << "Error: temp_memory " << temp_memory_str << " is not a number" << std::endl; + okay = false; + mem_valid = false; + } + if (mem_valid) { + size_t gpu_memory; + if (ValidationUtil::GetGpuMemory(device_id, gpu_memory) != SERVER_SUCCESS) { + std::cerr << "Error: could not get gpu memory for device " << device_id << std::endl; + okay = false; + } + else { + size_t prealoc_mem = std::stol(pinned_memory_str) + std::stol(temp_memory_str); + if (prealoc_mem >= gpu_memory) { + std::cerr << "Error: sum of pinned_memory and temp_memory " << prealoc_mem + << " exceeds total gpu memory " << gpu_memory << " for device " << device_id << std::endl; + okay = false; + } + } + } } } - if(!resource_valid_flag) { + if (!resource_valid_flag) { std::cerr << "Building index GPU can't be found in resource config." << std::endl; - return SERVER_INVALID_ARGUMENT; + okay = false; + } + if (!hasDisk || !hasCPU) { + std::cerr << "No DISK or CPU resource" << std::endl; + okay = false; + } + if (!hasExecutor) { + std::cerr << "No CPU or GPU resource has executor enabled" << std::endl; + okay = false; } - return SERVER_SUCCESS; + auto connections = resource_config.GetChild(CONFIG_RESOURCE_CONNECTIONS).GetChildren(); + for (auto &connection : connections) { + auto &connection_conf = connection.second; + + std::string speed_str = connection_conf.GetValue(CONFIG_SPEED_CONNECTIONS); + if (ValidationUtil::ValidateStringIsNumber(speed_str) != SERVER_SUCCESS) { + std::cerr << "Error: speed " << speed_str << " is not a number" << std::endl; + okay = false; + } + + std::string endpoint_str = connection_conf.GetValue(CONFIG_ENDPOINT_CONNECTIONS); + std::string delimiter = "==="; + auto delimiter_pos = endpoint_str.find(delimiter); + if (delimiter_pos == std::string::npos) { + std::cerr << "Error: invalid endpoint format: " << endpoint_str << std::endl; + okay = false; + } + else { + std::string left_resource = endpoint_str.substr(0, delimiter_pos); + if (resource_list.find(left_resource) == resource_list.end()) { + std::cerr << "Error: left resource " << left_resource << " does not exist" << std::endl; + okay = false; + } + std::string right_resource = endpoint_str.substr(delimiter_pos + delimiter.length(), endpoint_str.length()); + if (resource_list.find(right_resource) == resource_list.end()) { + std::cerr << "Error: right resource " << right_resource << " does not exist" << std::endl; + okay = false; + } + } + } + + return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT); } void ServerConfig::PrintAll() const { - if(const ConfigMgr* mgr = ConfigMgr::GetInstance()) { + if (const ConfigMgr *mgr = ConfigMgr::GetInstance()) { std::string str = mgr->DumpString(); // SERVER_LOG_INFO << "\n" << str; std::cout << "\n" << str << std::endl; @@ -145,16 +575,16 @@ ServerConfig::PrintAll() const { } ConfigNode -ServerConfig::GetConfig(const std::string& name) const { - const ConfigMgr* mgr = ConfigMgr::GetInstance(); - const ConfigNode& root_node = mgr->GetRootNode(); +ServerConfig::GetConfig(const std::string &name) const { + const ConfigMgr *mgr = ConfigMgr::GetInstance(); + const ConfigNode &root_node = mgr->GetRootNode(); return root_node.GetChild(name); } -ConfigNode& -ServerConfig::GetConfig(const std::string& name) { - ConfigMgr* mgr = ConfigMgr::GetInstance(); - ConfigNode& root_node = mgr->GetRootNode(); +ConfigNode & +ServerConfig::GetConfig(const std::string &name) { + ConfigMgr *mgr = ConfigMgr::GetInstance(); + ConfigNode &root_node = mgr->GetRootNode(); return root_node.GetChild(name); } diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index 99c535dc04..ac9876363f 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -52,9 +52,7 @@ static const char* CONFIG_OMP_THREAD_NUM = "omp_thread_num"; static const char* CONFIG_RESOURCE = "resource_config"; static const char* CONFIG_RESOURCES = "resources"; static const char* CONFIG_RESOURCE_TYPE = "type"; -static const char* CONFIG_RESOURCE_MEMORY = "memory"; static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id"; -static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader"; static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor"; static const char* CONFIG_RESOURCE_NUM = "gpu_resource_num"; static const char* CONFIG_RESOURCE_PIN_MEMORY = "pinned_memory"; @@ -69,11 +67,19 @@ class ServerConfig { static ServerConfig &GetInstance(); ErrorCode LoadConfigFile(const std::string& config_filename); - ErrorCode ValidateConfig() const; + ErrorCode ValidateConfig(); void PrintAll() const; ConfigNode GetConfig(const std::string& name) const; ConfigNode& GetConfig(const std::string& name); + + private: + ErrorCode CheckServerConfig(); + ErrorCode CheckDBConfig(); + ErrorCode CheckMetricConfig(); + ErrorCode CheckCacheConfig(); + ErrorCode CheckEngineConfig(); + ErrorCode CheckResourceConfig(); }; } diff --git a/cpp/src/utils/BlockingQueue.inl b/cpp/src/utils/BlockingQueue.inl index e703f01576..21146cdfbb 100644 --- a/cpp/src/utils/BlockingQueue.inl +++ b/cpp/src/utils/BlockingQueue.inl @@ -1,6 +1,6 @@ #pragma once -#include "Log.h" +//#include "Log.h" #include "Error.h" namespace zilliz { @@ -17,7 +17,7 @@ BlockingQueue::Put(const T &task) { std::string error_msg = "blocking queue is full, capacity: " + std::to_string(capacity_) + " queue_size: " + std::to_string(queue_.size()); - SERVER_LOG_ERROR << error_msg; + //SERVER_LOG_ERROR << error_msg; throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } @@ -33,7 +33,7 @@ BlockingQueue::Take() { if (queue_.empty()) { std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; + //SERVER_LOG_ERROR << error_msg; throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } @@ -57,7 +57,7 @@ BlockingQueue::Front() { empty_.wait(lock, [this] { return !queue_.empty(); }); if (queue_.empty()) { std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; + //SERVER_LOG_ERROR << error_msg; throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } T front(queue_.front()); @@ -72,7 +72,7 @@ BlockingQueue::Back() { if (queue_.empty()) { std::string error_msg = "blocking queue empty"; - SERVER_LOG_ERROR << error_msg; + //SERVER_LOG_ERROR << error_msg; throw ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg); } diff --git a/cpp/src/utils/LogUtil.cpp b/cpp/src/utils/LogUtil.cpp index 07f3b92aa5..ec6f078500 100644 --- a/cpp/src/utils/LogUtil.cpp +++ b/cpp/src/utils/LogUtil.cpp @@ -9,11 +9,70 @@ #include #include +#include +#include + + namespace zilliz { namespace milvus { namespace server { -int32_t InitLog(const std::string& log_config_file) { +namespace { +static int global_idx = 0; +static int debug_idx = 0; +static int warning_idx = 0; +static int trace_idx = 0; +static int error_idx = 0; +static int fatal_idx = 0; +} + +// TODO(yzb) : change the easylogging library to get the log level from parameter rather than filename +void rolloutHandler(const char *filename, std::size_t size) { + char *dirc = strdup(filename); + char *basec = strdup(filename); + char *dir = dirname(dirc); + char *base = basename(basec); + + std::string s(base); + std::stringstream ss; + std::string + list[] = {"\\", " ", "\'", "\"", "*", "\?", "{", "}", ";", "<", ">", "|", "^", "&", "$", "#", "!", "`", "~"}; + std::string::size_type position; + for (auto substr : list) { + position = 0; + while ((position = s.find_first_of(substr, position)) != std::string::npos) { + s.insert(position, "\\"); + position += 2; + } + } + int ret; + std::string m(std::string(dir) + "/" + s); + s = m; + if ((position = s.find("global")) != std::string::npos) { + s.append("." + std::to_string(++global_idx)); + ret = rename(m.c_str(), s.c_str()); + } else if ((position = s.find("debug")) != std::string::npos) { + s.append("." + std::to_string(++debug_idx)); + ret = rename(m.c_str(), s.c_str()); + } else if ((position = s.find("warning")) != std::string::npos) { + s.append("." + std::to_string(++warning_idx)); + ret = rename(m.c_str(), s.c_str()); + } else if ((position = s.find("trace")) != std::string::npos) { + s.append("." + std::to_string(++trace_idx)); + ret = rename(m.c_str(), s.c_str()); + } else if ((position = s.find("error")) != std::string::npos) { + s.append("." + std::to_string(++error_idx)); + ret = rename(m.c_str(), s.c_str()); + } else if ((position = s.find("fatal")) != std::string::npos) { + s.append("." + std::to_string(++fatal_idx)); + ret = rename(m.c_str(), s.c_str()); + } else { + s.append("." + std::to_string(++global_idx)); + ret = rename(m.c_str(), s.c_str()); + } +} + +int32_t InitLog(const std::string &log_config_file) { #if 0 ServerConfig &config = ServerConfig::GetInstance(); ConfigNode log_config = config.GetConfig(CONFIG_LOG); @@ -50,8 +109,10 @@ int32_t InitLog(const std::string& log_config_file) { #else el::Configurations conf(log_config_file); #endif - el::Loggers::reconfigureAllLoggers(conf); + + el::Loggers::addFlag(el::LoggingFlag::StrictLogFileSizeCheck); + el::Helpers::installPreRollOutCallback(rolloutHandler); return 0; } diff --git a/cpp/src/utils/LogUtil.h b/cpp/src/utils/LogUtil.h index 4a5b463c42..d6c9eff252 100644 --- a/cpp/src/utils/LogUtil.h +++ b/cpp/src/utils/LogUtil.h @@ -11,9 +11,7 @@ namespace zilliz { namespace milvus { namespace server { - int32_t InitLog(const std::string& log_config_file); - inline std::string GetFileName(std::string filename) { int pos = filename.find_last_of('/'); return filename.substr(pos + 1); diff --git a/cpp/src/utils/ValidationUtil.cpp b/cpp/src/utils/ValidationUtil.cpp index 653c0472dc..91411aac32 100644 --- a/cpp/src/utils/ValidationUtil.cpp +++ b/cpp/src/utils/ValidationUtil.cpp @@ -4,6 +4,12 @@ #include +#include + +#include +#include + + namespace zilliz { namespace milvus { namespace server { @@ -51,15 +57,16 @@ ValidationUtil::ValidateTableDimension(int64_t dimension) { if (dimension <= 0 || dimension > TABLE_DIMENSION_LIMIT) { SERVER_LOG_ERROR << "Table dimension excceed the limitation: " << TABLE_DIMENSION_LIMIT; return SERVER_INVALID_VECTOR_DIMENSION; - } else { + } + else { return SERVER_SUCCESS; } } ErrorCode ValidationUtil::ValidateTableIndexType(int32_t index_type) { - int engine_type = (int)engine::EngineType(index_type); - if(engine_type <= 0 || engine_type > (int)engine::EngineType::MAX_VALUE) { + int engine_type = (int) engine::EngineType(index_type); + if (engine_type <= 0 || engine_type > (int) engine::EngineType::MAX_VALUE) { return SERVER_INVALID_INDEX_TYPE; } @@ -68,7 +75,7 @@ ValidationUtil::ValidateTableIndexType(int32_t index_type) { ErrorCode ValidationUtil::ValidateTableIndexNlist(int32_t nlist) { - if(nlist <= 0) { + if (nlist <= 0) { return SERVER_INVALID_INDEX_NLIST; } @@ -77,7 +84,7 @@ ValidationUtil::ValidateTableIndexNlist(int32_t nlist) { ErrorCode ValidationUtil::ValidateTableIndexFileSize(int64_t index_file_size) { - if(index_file_size <= 0 || index_file_size > INDEX_FILE_SIZE_LIMIT) { + if (index_file_size <= 0 || index_file_size > INDEX_FILE_SIZE_LIMIT) { return SERVER_INVALID_INDEX_FILE_SIZE; } @@ -86,14 +93,14 @@ ValidationUtil::ValidateTableIndexFileSize(int64_t index_file_size) { ErrorCode ValidationUtil::ValidateTableIndexMetricType(int32_t metric_type) { - if(metric_type != (int32_t)engine::MetricType::L2 && metric_type != (int32_t)engine::MetricType::IP) { + if (metric_type != (int32_t) engine::MetricType::L2 && metric_type != (int32_t) engine::MetricType::IP) { return SERVER_INVALID_INDEX_METRIC_TYPE; } return SERVER_SUCCESS; } ErrorCode -ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchema& table_schema) { +ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchema &table_schema) { if (top_k <= 0 || top_k > 2048) { return SERVER_INVALID_TOPK; } @@ -102,7 +109,7 @@ ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchem } ErrorCode -ValidationUtil::ValidateSearchNprobe(int64_t nprobe, const engine::meta::TableSchema& table_schema) { +ValidationUtil::ValidateSearchNprobe(int64_t nprobe, const engine::meta::TableSchema &table_schema) { if (nprobe <= 0 || nprobe > table_schema.nlist_) { return SERVER_INVALID_NPROBE; } @@ -119,7 +126,7 @@ ValidationUtil::ValidateGpuIndex(uint32_t gpu_index) { return SERVER_UNEXPECTED_ERROR; } - if(gpu_index >= num_devices) { + if (gpu_index >= num_devices) { return SERVER_INVALID_ARGUMENT; } @@ -127,7 +134,7 @@ ValidationUtil::ValidateGpuIndex(uint32_t gpu_index) { } ErrorCode -ValidationUtil::GetGpuMemory(uint32_t gpu_index, size_t& memory) { +ValidationUtil::GetGpuMemory(uint32_t gpu_index, size_t &memory) { cudaDeviceProp deviceProp; auto cuda_err = cudaGetDeviceProperties(&deviceProp, gpu_index); if (cuda_err) { @@ -139,6 +146,108 @@ ValidationUtil::GetGpuMemory(uint32_t gpu_index, size_t& memory) { return SERVER_SUCCESS; } +ErrorCode +ValidationUtil::ValidateIpAddress(const std::string &ip_address) { + + struct in_addr address; + + int result = inet_pton(AF_INET, ip_address.c_str(), &address); + + switch (result) { + case 1:return SERVER_SUCCESS; + case 0:SERVER_LOG_ERROR << "Invalid IP address: " << ip_address; + return SERVER_INVALID_ARGUMENT; + default:SERVER_LOG_ERROR << "inet_pton conversion error"; + return SERVER_UNEXPECTED_ERROR; + } +} + +ErrorCode +ValidationUtil::ValidateStringIsNumber(const std::string &string) { + if (!string.empty() && std::all_of(string.begin(), string.end(), ::isdigit)) { + return SERVER_SUCCESS; + } + else { + return SERVER_INVALID_ARGUMENT; + } +} + +ErrorCode +ValidationUtil::ValidateStringIsBool(std::string &str) { + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + if (str == "true" || str == "on" || str == "yes" || str == "1" || + str == "false" || str == "off" || str == "no" || str == "0" || + str.empty()) { + return SERVER_SUCCESS; + } + else { + return SERVER_INVALID_ARGUMENT; + } +} + +ErrorCode +ValidationUtil::ValidateStringIsDouble(const std::string &str, double &val) { + char *end = nullptr; + val = std::strtod(str.c_str(), &end); + if (end != str.c_str() && *end == '\0' && val != HUGE_VAL) { + return SERVER_SUCCESS; + } + else { + return SERVER_INVALID_ARGUMENT; + } +} + +ErrorCode +ValidationUtil::ValidateDbURI(const std::string &uri) { + std::string dialectRegex = "(.*)"; + std::string usernameRegex = "(.*)"; + std::string passwordRegex = "(.*)"; + std::string hostRegex = "(.*)"; + std::string portRegex = "(.*)"; + std::string dbNameRegex = "(.*)"; + std::string uriRegexStr = dialectRegex + "\\:\\/\\/" + + usernameRegex + "\\:" + + passwordRegex + "\\@" + + hostRegex + "\\:" + + portRegex + "\\/" + + dbNameRegex; + std::regex uriRegex(uriRegexStr); + std::smatch pieces_match; + + bool okay = true; + + if (std::regex_match(uri, pieces_match, uriRegex)) { + std::string dialect = pieces_match[1].str(); + std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); + if (dialect.find("mysql") == std::string::npos && dialect.find("sqlite") == std::string::npos) { + SERVER_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect; + okay = false; + } + + std::string host = pieces_match[4].str(); + if (!host.empty() && host != "localhost") { + if (ValidateIpAddress(host) != SERVER_SUCCESS) { + SERVER_LOG_ERROR << "Invalid host ip address in uri = " << host; + okay = false; + } + } + + std::string port = pieces_match[5].str(); + if (!port.empty()) { + if (ValidateStringIsNumber(port) != SERVER_SUCCESS) { + SERVER_LOG_ERROR << "Invalid port in uri = " << port; + okay = false; + } + } + } + else { + SERVER_LOG_ERROR << "Wrong URI format: URI = " << uri; + okay = false; + } + + return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT); +} + } } } \ No newline at end of file diff --git a/cpp/src/utils/ValidationUtil.h b/cpp/src/utils/ValidationUtil.h index e504354c17..2817716c10 100644 --- a/cpp/src/utils/ValidationUtil.h +++ b/cpp/src/utils/ValidationUtil.h @@ -40,7 +40,19 @@ public: GetGpuMemory(uint32_t gpu_index, size_t &memory); static ErrorCode - ValidateConfig(); + ValidateIpAddress(const std::string &ip_address); + + static ErrorCode + ValidateStringIsNumber(const std::string &str); + + static ErrorCode + ValidateStringIsBool(std::string &str); + + static ErrorCode + ValidateStringIsDouble(const std::string &str, double &val); + + static ErrorCode + ValidateDbURI(const std::string &uri); }; } diff --git a/cpp/unittest/db/db_tests.cpp b/cpp/unittest/db/db_tests.cpp index 2c300849b4..50724eddc3 100644 --- a/cpp/unittest/db/db_tests.cpp +++ b/cpp/unittest/db/db_tests.cpp @@ -315,6 +315,79 @@ TEST_F(DBTest, PRELOADTABLE_TEST) { } +TEST_F(DBTest, SHUTDOWN_TEST) { + db_->Stop(); + + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + ASSERT_FALSE(stat.ok()); + + stat = db_->DescribeTable(table_info); + ASSERT_FALSE(stat.ok()); + + bool has_table = false; + stat = db_->HasTable(table_info.table_id_, has_table); + ASSERT_FALSE(stat.ok()); + + engine::IDNumbers ids; + stat = db_->InsertVectors(table_info.table_id_, 0, nullptr, ids); + ASSERT_FALSE(stat.ok()); + + stat = db_->PreloadTable(table_info.table_id_); + ASSERT_FALSE(stat.ok()); + + uint64_t row_count = 0; + stat = db_->GetTableRowCount(table_info.table_id_, row_count); + ASSERT_FALSE(stat.ok()); + + engine::TableIndex index; + stat = db_->CreateIndex(table_info.table_id_, index); + ASSERT_FALSE(stat.ok()); + + stat = db_->DescribeIndex(table_info.table_id_, index); + ASSERT_FALSE(stat.ok()); + + engine::meta::DatesT dates; + engine::QueryResults results; + stat = db_->Query(table_info.table_id_, 1, 1, 1, nullptr, dates, results); + ASSERT_FALSE(stat.ok()); + std::vector file_ids; + stat = db_->Query(table_info.table_id_, file_ids, 1, 1, 1, nullptr, dates, results); + ASSERT_FALSE(stat.ok()); + + stat = db_->DeleteTable(table_info.table_id_, dates); + ASSERT_FALSE(stat.ok()); +} + +TEST_F(DBTest, INDEX_TEST) { + engine::meta::TableSchema table_info = BuildTableSchema(); + engine::Status stat = db_->CreateTable(table_info); + + int64_t nb = VECTOR_COUNT; + std::vector xb; + BuildVectors(nb, xb); + + engine::IDNumbers vector_ids; + db_->InsertVectors(TABLE_NAME, nb, xb.data(), vector_ids); + ASSERT_EQ(vector_ids.size(), nb); + + engine::TableIndex index; + index.engine_type_ = (int)engine::EngineType::FAISS_IVFSQ8; + index.metric_type_ = (int)engine::MetricType::IP; + stat = db_->CreateIndex(table_info.table_id_, index); + ASSERT_TRUE(stat.ok()); + + engine::TableIndex index_out; + stat = db_->DescribeIndex(table_info.table_id_, index_out); + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(index.engine_type_, index_out.engine_type_); + ASSERT_EQ(index.nlist_, index_out.nlist_); + ASSERT_EQ(table_info.metric_type_, index_out.metric_type_); + + stat = db_->DropIndex(table_info.table_id_); + ASSERT_TRUE(stat.ok()); +} + TEST_F(DBTest2, ARHIVE_DISK_CHECK) { engine::meta::TableSchema table_info = BuildTableSchema(); diff --git a/cpp/unittest/db/utils.cpp b/cpp/unittest/db/utils.cpp index 5b8d82522d..f598fb1f8a 100644 --- a/cpp/unittest/db/utils.cpp +++ b/cpp/unittest/db/utils.cpp @@ -13,6 +13,7 @@ #include "db/Factories.h" #include "db/Options.h" #include "server/ServerConfig.h" +#include "knowhere/index/vector_index/gpu_ivf.h" INITIALIZE_EASYLOGGINGPP @@ -59,6 +60,8 @@ engine::Options BaseTest::GetOptions() { void DBTest::SetUp() { BaseTest::SetUp(); + zilliz::knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(0, 1024*1024*200, 1024*1024*300, 2); + server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE); config.AddSequenceItem(server::CONFIG_GPU_IDS, "0"); @@ -84,6 +87,8 @@ void DBTest::TearDown() { db_->DropAll(); delete db_; + zilliz::knowhere::FaissGpuResourceMgr::GetInstance().Free(); + engine::ResMgrInst::GetInstance()->Stop(); engine::SchedInst::GetInstance()->Stop(); diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp index 03500941b7..8a51bac5f9 100644 --- a/cpp/unittest/scheduler/resource_test.cpp +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -86,9 +86,7 @@ TEST_F(ResourceBaseTest, dump) { ASSERT_FALSE(only_executor_->Dump().empty()); ASSERT_FALSE(both_enable_->Dump().empty()); ASSERT_FALSE(both_disable_->Dump().empty()); - std::stringstream ss; - ss << only_loader_ << only_executor_ << both_enable_ << both_disable_; - ASSERT_FALSE(ss.str().empty()); + std::cout << *only_loader_ << *only_executor_ << *both_enable_ << *both_disable_; } /************ ResourceAdvanceTest ************/ diff --git a/cpp/unittest/server/util_test.cpp b/cpp/unittest/server/util_test.cpp index 68e74a846f..730eedbd33 100644 --- a/cpp/unittest/server/util_test.cpp +++ b/cpp/unittest/server/util_test.cpp @@ -5,6 +5,7 @@ //////////////////////////////////////////////////////////////////////////////// #include #include +#include #include "utils/CommonUtil.h" #include "utils/Error.h"