diff --git a/cpp/src/server/Config.cpp b/cpp/src/server/Config.cpp index 31dc5fe1a9..34d1fab7b3 100644 --- a/cpp/src/server/Config.cpp +++ b/cpp/src/server/Config.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "Config.h" +#include "server/Config.h" #include #include @@ -23,12 +23,13 @@ #include #include #include +#include +#include #include "config/ConfigMgr.h" #include "utils/CommonUtil.h" #include "utils/ValidationUtil.h" - namespace zilliz { namespace milvus { namespace server { @@ -176,11 +177,11 @@ Config::ValidateConfig() { } void -Config::PrintConfigSection(const std::string& config_node_name) { +Config::PrintConfigSection(const std::string &config_node_name) { std::cout << std::endl; std::cout << config_node_name << ":" << std::endl; if (config_map_.find(config_node_name) != config_map_.end()) { - for (auto item: config_map_[config_node_name]) { + for (auto item : config_map_[config_node_name]) { std::cout << item.first << ": " << item.second << std::endl; } } @@ -290,7 +291,7 @@ Config::CheckDBConfigInsertBufferSize(const std::string &value) { return Status(SERVER_INVALID_ARGUMENT, "Invalid DB config insert_buffer_size: " + value); } else { int64_t buffer_size = std::stoi(value) * GB; - unsigned long total_mem = 0, free_mem = 0; + uint64_t total_mem = 0, free_mem = 0; CommonUtil::GetSystemMemInfo(total_mem, free_mem); if (buffer_size >= total_mem) { return Status(SERVER_INVALID_ARGUMENT, "DB config insert_buffer_size exceed system memory: " + value); @@ -313,7 +314,7 @@ Config::CheckDBConfigBuildIndexGPU(const std::string &value) { } Status -Config::CheckMetricConfigEnableMonitor(const std::string& value) { +Config::CheckMetricConfigEnableMonitor(const std::string &value) { if (!ValidationUtil::ValidateStringIsBool(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config auto_bootup: " + value); } @@ -321,7 +322,7 @@ Config::CheckMetricConfigEnableMonitor(const std::string& value) { } Status -Config::CheckMetricConfigCollector(const std::string& value) { +Config::CheckMetricConfigCollector(const std::string &value) { if (value != "prometheus") { return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config collector: " + value); } @@ -329,7 +330,7 @@ Config::CheckMetricConfigCollector(const std::string& value) { } Status -Config::CheckMetricConfigPrometheusPort(const std::string& value) { +Config::CheckMetricConfigPrometheusPort(const std::string &value) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config prometheus_port: " + value); } @@ -337,12 +338,12 @@ Config::CheckMetricConfigPrometheusPort(const std::string& value) { } Status -Config::CheckCacheConfigCpuMemCapacity(const std::string& value) { +Config::CheckCacheConfigCpuMemCapacity(const std::string &value) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cpu_mem_capacity: " + value); } else { uint64_t cpu_cache_capacity = std::stoi(value) * GB; - unsigned long total_mem = 0, free_mem = 0; + uint64_t total_mem = 0, free_mem = 0; CommonUtil::GetSystemMemInfo(total_mem, free_mem); if (cpu_cache_capacity >= total_mem) { return Status(SERVER_INVALID_ARGUMENT, "Cache config cpu_mem_capacity exceed system memory: " + value); @@ -362,7 +363,7 @@ Config::CheckCacheConfigCpuMemCapacity(const std::string& value) { } Status -Config::CheckCacheConfigCpuMemThreshold(const std::string& value) { +Config::CheckCacheConfigCpuMemThreshold(const std::string &value) { if (!ValidationUtil::ValidateStringIsFloat(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cpu_mem_threshold: " + value); } else { @@ -375,7 +376,7 @@ Config::CheckCacheConfigCpuMemThreshold(const std::string& value) { } Status -Config::CheckCacheConfigGpuMemCapacity(const std::string& value) { +Config::CheckCacheConfigGpuMemCapacity(const std::string &value) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { std::cerr << "ERROR: gpu_cache_capacity " << value << " is not a number" << std::endl; } else { @@ -398,7 +399,7 @@ Config::CheckCacheConfigGpuMemCapacity(const std::string& value) { } Status -Config::CheckCacheConfigGpuMemThreshold(const std::string& value) { +Config::CheckCacheConfigGpuMemThreshold(const std::string &value) { if (!ValidationUtil::ValidateStringIsFloat(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config gpu_mem_threshold: " + value); } else { @@ -411,7 +412,7 @@ Config::CheckCacheConfigGpuMemThreshold(const std::string& value) { } Status -Config::CheckCacheConfigCacheInsertData(const std::string& value) { +Config::CheckCacheConfigCacheInsertData(const std::string &value) { if (!ValidationUtil::ValidateStringIsBool(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cache_insert_data: " + value); } @@ -419,7 +420,7 @@ Config::CheckCacheConfigCacheInsertData(const std::string& value) { } Status -Config::CheckEngineConfigBlasThreshold(const std::string& value) { +Config::CheckEngineConfigBlasThreshold(const std::string &value) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid engine config blas threshold: " + value); } @@ -427,7 +428,7 @@ Config::CheckEngineConfigBlasThreshold(const std::string& value) { } Status -Config::CheckEngineConfigOmpThreadNum(const std::string& value) { +Config::CheckEngineConfigOmpThreadNum(const std::string &value) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid engine config omp_thread_num: " + value); } else { @@ -441,7 +442,7 @@ Config::CheckEngineConfigOmpThreadNum(const std::string& value) { } Status -Config::CheckResourceConfigMode(const std::string& value) { +Config::CheckResourceConfigMode(const std::string &value) { if (value != "simple") { return Status(SERVER_INVALID_ARGUMENT, "Invalid resource config mode: " + value); } @@ -449,7 +450,7 @@ Config::CheckResourceConfigMode(const std::string& value) { } Status -Config::CheckResourceConfigPool(const std::vector& value) { +Config::CheckResourceConfigPool(const std::vector &value) { if (value.empty()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid resource config pool"); } @@ -740,52 +741,51 @@ Config::GetResourceConfigStrMode() { return value; } - //////////////////////////////////////////////////////////////////////////////// Status -Config::GetServerConfigAddress(std::string& value) { +Config::GetServerConfigAddress(std::string &value) { value = GetServerConfigStrAddress(); return CheckServerConfigAddress(value); } Status -Config::GetServerConfigPort(std::string& value) { +Config::GetServerConfigPort(std::string &value) { value = GetServerConfigStrPort(); return CheckServerConfigPort(value); } Status -Config::GetServerConfigDeployMode(std::string& value) { +Config::GetServerConfigDeployMode(std::string &value) { value = GetServerConfigStrDeployMode(); return CheckServerConfigDeployMode(value); } Status -Config::GetServerConfigTimeZone(std::string& value) { +Config::GetServerConfigTimeZone(std::string &value) { value = GetServerConfigStrTimeZone(); return CheckServerConfigTimeZone(value); } Status -Config::GetDBConfigPrimaryPath(std::string& value) { +Config::GetDBConfigPrimaryPath(std::string &value) { value = GetDBConfigStrPrimaryPath(); return CheckDBConfigPrimaryPath(value); } Status -Config::GetDBConfigSecondaryPath(std::string& value) { +Config::GetDBConfigSecondaryPath(std::string &value) { value = GetDBConfigStrSecondaryPath(); return Status::OK(); } Status -Config::GetDBConfigBackendUrl(std::string& value) { +Config::GetDBConfigBackendUrl(std::string &value) { value = GetDBConfigStrBackendUrl(); return CheckDBConfigBackendUrl(value); } Status -Config::GetDBConfigArchiveDiskThreshold(int32_t& value) { +Config::GetDBConfigArchiveDiskThreshold(int32_t &value) { std::string str = GetDBConfigStrArchiveDiskThreshold(); Status s = CheckDBConfigArchiveDiskThreshold(str); if (!s.ok()) return s; @@ -794,7 +794,7 @@ Config::GetDBConfigArchiveDiskThreshold(int32_t& value) { } Status -Config::GetDBConfigArchiveDaysThreshold(int32_t& value) { +Config::GetDBConfigArchiveDaysThreshold(int32_t &value) { std::string str = GetDBConfigStrArchiveDaysThreshold(); Status s = CheckDBConfigArchiveDaysThreshold(str); if (!s.ok()) return s; @@ -803,7 +803,7 @@ Config::GetDBConfigArchiveDaysThreshold(int32_t& value) { } Status -Config::GetDBConfigInsertBufferSize(int32_t& value) { +Config::GetDBConfigInsertBufferSize(int32_t &value) { std::string str = GetDBConfigStrInsertBufferSize(); Status s = CheckDBConfigInsertBufferSize(str); if (!s.ok()) return s; @@ -812,7 +812,7 @@ Config::GetDBConfigInsertBufferSize(int32_t& value) { } Status -Config::GetDBConfigBuildIndexGPU(int32_t& value) { +Config::GetDBConfigBuildIndexGPU(int32_t &value) { std::string str = GetDBConfigStrBuildIndexGPU(); Status s = CheckDBConfigBuildIndexGPU(str); if (!s.ok()) return s; @@ -821,7 +821,7 @@ Config::GetDBConfigBuildIndexGPU(int32_t& value) { } Status -Config::GetMetricConfigEnableMonitor(bool& value) { +Config::GetMetricConfigEnableMonitor(bool &value) { std::string str = GetMetricConfigStrEnableMonitor(); Status s = CheckMetricConfigEnableMonitor(str); if (!s.ok()) return s; @@ -831,19 +831,19 @@ Config::GetMetricConfigEnableMonitor(bool& value) { } Status -Config::GetMetricConfigCollector(std::string& value) { +Config::GetMetricConfigCollector(std::string &value) { value = GetMetricConfigStrCollector(); return Status::OK(); } Status -Config::GetMetricConfigPrometheusPort(std::string& value) { +Config::GetMetricConfigPrometheusPort(std::string &value) { value = GetMetricConfigStrPrometheusPort(); return CheckMetricConfigPrometheusPort(value); } Status -Config::GetCacheConfigCpuMemCapacity(int32_t& value) { +Config::GetCacheConfigCpuMemCapacity(int32_t &value) { std::string str = GetCacheConfigStrCpuMemCapacity(); Status s = CheckCacheConfigCpuMemCapacity(str); if (!s.ok()) return s; @@ -852,7 +852,7 @@ Config::GetCacheConfigCpuMemCapacity(int32_t& value) { } Status -Config::GetCacheConfigCpuMemThreshold(float& value) { +Config::GetCacheConfigCpuMemThreshold(float &value) { std::string str = GetCacheConfigStrCpuMemThreshold(); Status s = CheckCacheConfigCpuMemThreshold(str); if (!s.ok()) return s; @@ -861,7 +861,7 @@ Config::GetCacheConfigCpuMemThreshold(float& value) { } Status -Config::GetCacheConfigGpuMemCapacity(int32_t& value) { +Config::GetCacheConfigGpuMemCapacity(int32_t &value) { std::string str = GetCacheConfigStrGpuMemCapacity(); Status s = CheckCacheConfigGpuMemCapacity(str); if (!s.ok()) return s; @@ -870,7 +870,7 @@ Config::GetCacheConfigGpuMemCapacity(int32_t& value) { } Status -Config::GetCacheConfigGpuMemThreshold(float& value) { +Config::GetCacheConfigGpuMemThreshold(float &value) { std::string str = GetCacheConfigStrGpuMemThreshold(); Status s = CheckCacheConfigGpuMemThreshold(str); if (!s.ok()) return s; @@ -879,7 +879,7 @@ Config::GetCacheConfigGpuMemThreshold(float& value) { } Status -Config::GetCacheConfigCacheInsertData(bool& value) { +Config::GetCacheConfigCacheInsertData(bool &value) { std::string str = GetCacheConfigStrCacheInsertData(); Status s = CheckCacheConfigCacheInsertData(str); if (!s.ok()) return s; @@ -889,7 +889,7 @@ Config::GetCacheConfigCacheInsertData(bool& value) { } Status -Config::GetEngineConfigBlasThreshold(int32_t& value) { +Config::GetEngineConfigBlasThreshold(int32_t &value) { std::string str = GetEngineConfigStrBlasThreshold(); Status s = CheckEngineConfigBlasThreshold(str); if (!s.ok()) return s; @@ -898,7 +898,7 @@ Config::GetEngineConfigBlasThreshold(int32_t& value) { } Status -Config::GetEngineConfigOmpThreadNum(int32_t& value) { +Config::GetEngineConfigOmpThreadNum(int32_t &value) { std::string str = GetEngineConfigStrOmpThreadNum(); Status s = CheckEngineConfigOmpThreadNum(str); if (!s.ok()) return s; @@ -907,13 +907,13 @@ Config::GetEngineConfigOmpThreadNum(int32_t& value) { } Status -Config::GetResourceConfigMode(std::string& value) { +Config::GetResourceConfigMode(std::string &value) { value = GetResourceConfigStrMode(); return CheckResourceConfigMode(value); } Status -Config::GetResourceConfigPool(std::vector& value) { +Config::GetResourceConfigPool(std::vector &value) { ConfigNode resource_config = GetConfigNode(CONFIG_RESOURCE); value = resource_config.GetSequence(CONFIG_RESOURCE_POOL); return CheckResourceConfigPool(value); @@ -922,7 +922,7 @@ Config::GetResourceConfigPool(std::vector& value) { /////////////////////////////////////////////////////////////////////////////// /* server config */ Status -Config::SetServerConfigAddress(const std::string& value) { +Config::SetServerConfigAddress(const std::string &value) { Status s = CheckServerConfigAddress(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_ADDRESS, value); @@ -930,7 +930,7 @@ Config::SetServerConfigAddress(const std::string& value) { } Status -Config::SetServerConfigPort(const std::string& value) { +Config::SetServerConfigPort(const std::string &value) { Status s = CheckServerConfigPort(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_PORT, value); @@ -938,7 +938,7 @@ Config::SetServerConfigPort(const std::string& value) { } Status -Config::SetServerConfigDeployMode(const std::string& value) { +Config::SetServerConfigDeployMode(const std::string &value) { Status s = CheckServerConfigDeployMode(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_DEPLOY_MODE, value); @@ -946,7 +946,7 @@ Config::SetServerConfigDeployMode(const std::string& value) { } Status -Config::SetServerConfigTimeZone(const std::string& value) { +Config::SetServerConfigTimeZone(const std::string &value) { Status s = CheckServerConfigTimeZone(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_TIME_ZONE, value); @@ -955,7 +955,7 @@ Config::SetServerConfigTimeZone(const std::string& value) { /* db config */ Status -Config::SetDBConfigPrimaryPath(const std::string& value) { +Config::SetDBConfigPrimaryPath(const std::string &value) { Status s = CheckDBConfigPrimaryPath(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_PRIMARY_PATH, value); @@ -963,7 +963,7 @@ Config::SetDBConfigPrimaryPath(const std::string& value) { } Status -Config::SetDBConfigSecondaryPath(const std::string& value) { +Config::SetDBConfigSecondaryPath(const std::string &value) { Status s = CheckDBConfigSecondaryPath(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_SECONDARY_PATH, value); @@ -971,7 +971,7 @@ Config::SetDBConfigSecondaryPath(const std::string& value) { } Status -Config::SetDBConfigBackendUrl(const std::string& value) { +Config::SetDBConfigBackendUrl(const std::string &value) { Status s = CheckDBConfigBackendUrl(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BACKEND_URL, value); @@ -979,7 +979,7 @@ Config::SetDBConfigBackendUrl(const std::string& value) { } Status -Config::SetDBConfigArchiveDiskThreshold(const std::string& value) { +Config::SetDBConfigArchiveDiskThreshold(const std::string &value) { Status s = CheckDBConfigArchiveDiskThreshold(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DISK_THRESHOLD, value); @@ -987,7 +987,7 @@ Config::SetDBConfigArchiveDiskThreshold(const std::string& value) { } Status -Config::SetDBConfigArchiveDaysThreshold(const std::string& value) { +Config::SetDBConfigArchiveDaysThreshold(const std::string &value) { Status s = CheckDBConfigArchiveDaysThreshold(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, value); @@ -995,7 +995,7 @@ Config::SetDBConfigArchiveDaysThreshold(const std::string& value) { } Status -Config::SetDBConfigInsertBufferSize(const std::string& value) { +Config::SetDBConfigInsertBufferSize(const std::string &value) { Status s = CheckDBConfigInsertBufferSize(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_INSERT_BUFFER_SIZE, value); @@ -1003,7 +1003,7 @@ Config::SetDBConfigInsertBufferSize(const std::string& value) { } Status -Config::SetDBConfigBuildIndexGPU(const std::string& value) { +Config::SetDBConfigBuildIndexGPU(const std::string &value) { Status s = CheckDBConfigBuildIndexGPU(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUILD_INDEX_GPU, value); @@ -1012,7 +1012,7 @@ Config::SetDBConfigBuildIndexGPU(const std::string& value) { /* metric config */ Status -Config::SetMetricConfigEnableMonitor(const std::string& value) { +Config::SetMetricConfigEnableMonitor(const std::string &value) { Status s = CheckMetricConfigEnableMonitor(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_ENABLE_MONITOR, value); @@ -1020,7 +1020,7 @@ Config::SetMetricConfigEnableMonitor(const std::string& value) { } Status -Config::SetMetricConfigCollector(const std::string& value) { +Config::SetMetricConfigCollector(const std::string &value) { Status s = CheckMetricConfigCollector(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_COLLECTOR, value); @@ -1028,7 +1028,7 @@ Config::SetMetricConfigCollector(const std::string& value) { } Status -Config::SetMetricConfigPrometheusPort(const std::string& value) { +Config::SetMetricConfigPrometheusPort(const std::string &value) { Status s = CheckMetricConfigPrometheusPort(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_PROMETHEUS_PORT, value); @@ -1037,7 +1037,7 @@ Config::SetMetricConfigPrometheusPort(const std::string& value) { /* cache config */ Status -Config::SetCacheConfigCpuMemCapacity(const std::string& value) { +Config::SetCacheConfigCpuMemCapacity(const std::string &value) { Status s = CheckCacheConfigCpuMemCapacity(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CPU_MEM_CAPACITY, value); @@ -1045,7 +1045,7 @@ Config::SetCacheConfigCpuMemCapacity(const std::string& value) { } Status -Config::SetCacheConfigCpuMemThreshold(const std::string& value) { +Config::SetCacheConfigCpuMemThreshold(const std::string &value) { Status s = CheckCacheConfigCpuMemThreshold(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CPU_MEM_THRESHOLD, value); @@ -1053,7 +1053,7 @@ Config::SetCacheConfigCpuMemThreshold(const std::string& value) { } Status -Config::SetCacheConfigGpuMemCapacity(const std::string& value) { +Config::SetCacheConfigGpuMemCapacity(const std::string &value) { Status s = CheckCacheConfigGpuMemCapacity(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_GPU_MEM_CAPACITY, value); @@ -1061,7 +1061,7 @@ Config::SetCacheConfigGpuMemCapacity(const std::string& value) { } Status -Config::SetCacheConfigGpuMemThreshold(const std::string& value) { +Config::SetCacheConfigGpuMemThreshold(const std::string &value) { Status s = CheckCacheConfigGpuMemThreshold(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_GPU_MEM_THRESHOLD, value); @@ -1069,7 +1069,7 @@ Config::SetCacheConfigGpuMemThreshold(const std::string& value) { } Status -Config::SetCacheConfigCacheInsertData(const std::string& value) { +Config::SetCacheConfigCacheInsertData(const std::string &value) { Status s = CheckCacheConfigCacheInsertData(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CACHE_INSERT_DATA, value); @@ -1078,7 +1078,7 @@ Config::SetCacheConfigCacheInsertData(const std::string& value) { /* engine config */ Status -Config::SetEngineConfigBlasThreshold(const std::string& value) { +Config::SetEngineConfigBlasThreshold(const std::string &value) { Status s = CheckEngineConfigBlasThreshold(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_ENGINE_BLAS_THRESHOLD, value); @@ -1086,7 +1086,7 @@ Config::SetEngineConfigBlasThreshold(const std::string& value) { } Status -Config::SetEngineConfigOmpThreadNum(const std::string& value) { +Config::SetEngineConfigOmpThreadNum(const std::string &value) { Status s = CheckEngineConfigOmpThreadNum(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_ENGINE_OMP_THREAD_NUM, value); @@ -1095,13 +1095,14 @@ Config::SetEngineConfigOmpThreadNum(const std::string& value) { /* resource config */ Status -Config::SetResourceConfigMode(const std::string& value) { +Config::SetResourceConfigMode(const std::string &value) { Status s = CheckResourceConfigMode(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_RESOURCE_MODE, value); return Status::OK(); } -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz + diff --git a/cpp/src/server/Config.h b/cpp/src/server/Config.h index 422632a01f..b1de101bd1 100644 --- a/cpp/src/server/Config.h +++ b/cpp/src/server/Config.h @@ -17,137 +17,138 @@ #pragma once +#include +#include #include #include -#include "yaml-cpp/yaml.h" +#include + #include "utils/Status.h" #include "config/ConfigNode.h" - namespace zilliz { namespace milvus { namespace server { /* server config */ -static const char* CONFIG_SERVER = "server_config"; -static const char* CONFIG_SERVER_ADDRESS = "address"; -static const char* CONFIG_SERVER_ADDRESS_DEFAULT = "127.0.0.1"; -static const char* CONFIG_SERVER_PORT = "port"; -static const char* CONFIG_SERVER_PORT_DEFAULT = "19530"; -static const char* CONFIG_SERVER_DEPLOY_MODE = "deploy_mode"; -static const char* CONFIG_SERVER_DEPLOY_MODE_DEFAULT = "single"; -static const char* CONFIG_SERVER_TIME_ZONE = "time_zone"; -static const char* CONFIG_SERVER_TIME_ZONE_DEFAULT = "UTC+8"; +static const char *CONFIG_SERVER = "server_config"; +static const char *CONFIG_SERVER_ADDRESS = "address"; +static const char *CONFIG_SERVER_ADDRESS_DEFAULT = "127.0.0.1"; +static const char *CONFIG_SERVER_PORT = "port"; +static const char *CONFIG_SERVER_PORT_DEFAULT = "19530"; +static const char *CONFIG_SERVER_DEPLOY_MODE = "deploy_mode"; +static const char *CONFIG_SERVER_DEPLOY_MODE_DEFAULT = "single"; +static const char *CONFIG_SERVER_TIME_ZONE = "time_zone"; +static const char *CONFIG_SERVER_TIME_ZONE_DEFAULT = "UTC+8"; /* db config */ -static const char* CONFIG_DB = "db_config"; -static const char* CONFIG_DB_PRIMARY_PATH = "primary_path"; -static const char* CONFIG_DB_PRIMARY_PATH_DEFAULT = "/tmp/milvus"; -static const char* CONFIG_DB_SECONDARY_PATH = "secondary_path"; -static const char* CONFIG_DB_SECONDARY_PATH_DEFAULT = ""; -static const char* CONFIG_DB_BACKEND_URL = "backend_url"; -static const char* CONFIG_DB_BACKEND_URL_DEFAULT = "sqlite://:@:/"; -static const char* CONFIG_DB_ARCHIVE_DISK_THRESHOLD = "archive_disk_threshold"; -static const char* CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT = "0"; -static const char* CONFIG_DB_ARCHIVE_DAYS_THRESHOLD = "archive_days_threshold"; -static const char* CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT = "0"; -static const char* CONFIG_DB_INSERT_BUFFER_SIZE = "insert_buffer_size"; -static const char* CONFIG_DB_INSERT_BUFFER_SIZE_DEFAULT = "4"; -static const char* CONFIG_DB_BUILD_INDEX_GPU = "build_index_gpu"; -static const char* CONFIG_DB_BUILD_INDEX_GPU_DEFAULT = "0"; +static const char *CONFIG_DB = "db_config"; +static const char *CONFIG_DB_PRIMARY_PATH = "primary_path"; +static const char *CONFIG_DB_PRIMARY_PATH_DEFAULT = "/tmp/milvus"; +static const char *CONFIG_DB_SECONDARY_PATH = "secondary_path"; +static const char *CONFIG_DB_SECONDARY_PATH_DEFAULT = ""; +static const char *CONFIG_DB_BACKEND_URL = "backend_url"; +static const char *CONFIG_DB_BACKEND_URL_DEFAULT = "sqlite://:@:/"; +static const char *CONFIG_DB_ARCHIVE_DISK_THRESHOLD = "archive_disk_threshold"; +static const char *CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT = "0"; +static const char *CONFIG_DB_ARCHIVE_DAYS_THRESHOLD = "archive_days_threshold"; +static const char *CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT = "0"; +static const char *CONFIG_DB_INSERT_BUFFER_SIZE = "insert_buffer_size"; +static const char *CONFIG_DB_INSERT_BUFFER_SIZE_DEFAULT = "4"; +static const char *CONFIG_DB_BUILD_INDEX_GPU = "build_index_gpu"; +static const char *CONFIG_DB_BUILD_INDEX_GPU_DEFAULT = "0"; /* cache config */ -static const char* CONFIG_CACHE = "cache_config"; -static const char* CONFIG_CACHE_CPU_MEM_CAPACITY = "cpu_mem_capacity"; -static const char* CONFIG_CACHE_CPU_MEM_CAPACITY_DEFAULT = "16"; -static const char* CONFIG_CACHE_GPU_MEM_CAPACITY = "gpu_mem_capacity"; -static const char* CONFIG_CACHE_GPU_MEM_CAPACITY_DEFAULT = "0"; -static const char* CONFIG_CACHE_CPU_MEM_THRESHOLD = "cpu_mem_threshold"; -static const char* CONFIG_CACHE_CPU_MEM_THRESHOLD_DEFAULT = "0.85"; -static const char* CONFIG_CACHE_GPU_MEM_THRESHOLD = "gpu_mem_threshold"; -static const char* CONFIG_CACHE_GPU_MEM_THRESHOLD_DEFAULT = "0.85"; -static const char* CONFIG_CACHE_CACHE_INSERT_DATA = "cache_insert_data"; -static const char* CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT = "false"; +static const char *CONFIG_CACHE = "cache_config"; +static const char *CONFIG_CACHE_CPU_MEM_CAPACITY = "cpu_mem_capacity"; +static const char *CONFIG_CACHE_CPU_MEM_CAPACITY_DEFAULT = "16"; +static const char *CONFIG_CACHE_GPU_MEM_CAPACITY = "gpu_mem_capacity"; +static const char *CONFIG_CACHE_GPU_MEM_CAPACITY_DEFAULT = "0"; +static const char *CONFIG_CACHE_CPU_MEM_THRESHOLD = "cpu_mem_threshold"; +static const char *CONFIG_CACHE_CPU_MEM_THRESHOLD_DEFAULT = "0.85"; +static const char *CONFIG_CACHE_GPU_MEM_THRESHOLD = "gpu_mem_threshold"; +static const char *CONFIG_CACHE_GPU_MEM_THRESHOLD_DEFAULT = "0.85"; +static const char *CONFIG_CACHE_CACHE_INSERT_DATA = "cache_insert_data"; +static const char *CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT = "false"; /* metric config */ -static const char* CONFIG_METRIC = "metric_config"; -static const char* CONFIG_METRIC_ENABLE_MONITOR = "enable_monitor"; -static const char* CONFIG_METRIC_ENABLE_MONITOR_DEFAULT = "false"; -static const char* CONFIG_METRIC_COLLECTOR = "collector"; -static const char* CONFIG_METRIC_COLLECTOR_DEFAULT = "prometheus"; -static const char* CONFIG_METRIC_PROMETHEUS = "prometheus_config"; -static const char* CONFIG_METRIC_PROMETHEUS_PORT = "port"; -static const char* CONFIG_METRIC_PROMETHEUS_PORT_DEFAULT = "8080"; +static const char *CONFIG_METRIC = "metric_config"; +static const char *CONFIG_METRIC_ENABLE_MONITOR = "enable_monitor"; +static const char *CONFIG_METRIC_ENABLE_MONITOR_DEFAULT = "false"; +static const char *CONFIG_METRIC_COLLECTOR = "collector"; +static const char *CONFIG_METRIC_COLLECTOR_DEFAULT = "prometheus"; +static const char *CONFIG_METRIC_PROMETHEUS = "prometheus_config"; +static const char *CONFIG_METRIC_PROMETHEUS_PORT = "port"; +static const char *CONFIG_METRIC_PROMETHEUS_PORT_DEFAULT = "8080"; /* engine config */ -static const char* CONFIG_ENGINE = "engine_config"; -static const char* CONFIG_ENGINE_BLAS_THRESHOLD = "blas_threshold"; -static const char* CONFIG_ENGINE_BLAS_THRESHOLD_DEFAULT = "20"; -static const char* CONFIG_ENGINE_OMP_THREAD_NUM = "omp_thread_num"; -static const char* CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT = "0"; +static const char *CONFIG_ENGINE = "engine_config"; +static const char *CONFIG_ENGINE_BLAS_THRESHOLD = "blas_threshold"; +static const char *CONFIG_ENGINE_BLAS_THRESHOLD_DEFAULT = "20"; +static const char *CONFIG_ENGINE_OMP_THREAD_NUM = "omp_thread_num"; +static const char *CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT = "0"; /* resource config */ -static const char* CONFIG_RESOURCE = "resource_config"; -static const char* CONFIG_RESOURCE_MODE = "mode"; -static const char* CONFIG_RESOURCE_MODE_DEFAULT = "simple"; -static const char* CONFIG_RESOURCE_POOL = "resource_pool"; - +static const char *CONFIG_RESOURCE = "resource_config"; +static const char *CONFIG_RESOURCE_MODE = "mode"; +static const char *CONFIG_RESOURCE_MODE_DEFAULT = "simple"; +static const char *CONFIG_RESOURCE_POOL = "resource_pool"; class Config { public: - static Config& GetInstance(); - Status LoadConfigFile(const std::string& filename); + static Config &GetInstance(); + Status LoadConfigFile(const std::string &filename); Status ValidateConfig(); void PrintAll(); private: - ConfigNode& GetConfigNode(const std::string& name); + ConfigNode &GetConfigNode(const std::string &name); - Status GetConfigValueInMem(const std::string& parent_key, - const std::string& child_key, - std::string& value); + Status GetConfigValueInMem(const std::string &parent_key, + const std::string &child_key, + std::string &value); - void SetConfigValueInMem(const std::string& parent_key, - const std::string& child_key, - const std::string& value); + void SetConfigValueInMem(const std::string &parent_key, + const std::string &child_key, + const std::string &value); - void PrintConfigSection(const std::string& config_node_name); + void PrintConfigSection(const std::string &config_node_name); /////////////////////////////////////////////////////////////////////////// /* server config */ - Status CheckServerConfigAddress(const std::string& value); - Status CheckServerConfigPort(const std::string& value); - Status CheckServerConfigDeployMode(const std::string& value); - Status CheckServerConfigTimeZone(const std::string& value); + Status CheckServerConfigAddress(const std::string &value); + Status CheckServerConfigPort(const std::string &value); + Status CheckServerConfigDeployMode(const std::string &value); + Status CheckServerConfigTimeZone(const std::string &value); /* db config */ - Status CheckDBConfigPrimaryPath(const std::string& value); - Status CheckDBConfigSecondaryPath(const std::string& value); - Status CheckDBConfigBackendUrl(const std::string& value); - Status CheckDBConfigArchiveDiskThreshold(const std::string& value); - Status CheckDBConfigArchiveDaysThreshold(const std::string& value); - Status CheckDBConfigInsertBufferSize(const std::string& value); - Status CheckDBConfigBuildIndexGPU(const std::string& value); + Status CheckDBConfigPrimaryPath(const std::string &value); + Status CheckDBConfigSecondaryPath(const std::string &value); + Status CheckDBConfigBackendUrl(const std::string &value); + Status CheckDBConfigArchiveDiskThreshold(const std::string &value); + Status CheckDBConfigArchiveDaysThreshold(const std::string &value); + Status CheckDBConfigInsertBufferSize(const std::string &value); + Status CheckDBConfigBuildIndexGPU(const std::string &value); /* metric config */ - Status CheckMetricConfigEnableMonitor(const std::string& value); - Status CheckMetricConfigCollector(const std::string& value); - Status CheckMetricConfigPrometheusPort(const std::string& value); + Status CheckMetricConfigEnableMonitor(const std::string &value); + Status CheckMetricConfigCollector(const std::string &value); + Status CheckMetricConfigPrometheusPort(const std::string &value); /* cache config */ - Status CheckCacheConfigCpuMemCapacity(const std::string& value); - Status CheckCacheConfigCpuMemThreshold(const std::string& value); - Status CheckCacheConfigGpuMemCapacity(const std::string& value); - Status CheckCacheConfigGpuMemThreshold(const std::string& value); - Status CheckCacheConfigCacheInsertData(const std::string& value); + Status CheckCacheConfigCpuMemCapacity(const std::string &value); + Status CheckCacheConfigCpuMemThreshold(const std::string &value); + Status CheckCacheConfigGpuMemCapacity(const std::string &value); + Status CheckCacheConfigGpuMemThreshold(const std::string &value); + Status CheckCacheConfigCacheInsertData(const std::string &value); /* engine config */ - Status CheckEngineConfigBlasThreshold(const std::string& value); - Status CheckEngineConfigOmpThreadNum(const std::string& value); + Status CheckEngineConfigBlasThreshold(const std::string &value); + Status CheckEngineConfigOmpThreadNum(const std::string &value); /* resource config */ - Status CheckResourceConfigMode(const std::string& value); - Status CheckResourceConfigPool(const std::vector& value); + Status CheckResourceConfigMode(const std::string &value); + Status CheckResourceConfigPool(const std::vector &value); /////////////////////////////////////////////////////////////////////////// /* server config */ @@ -186,81 +187,81 @@ class Config { public: /* server config */ - Status GetServerConfigAddress(std::string& value); - Status GetServerConfigPort(std::string& value); - Status GetServerConfigDeployMode(std::string& value); - Status GetServerConfigTimeZone(std::string& value); + Status GetServerConfigAddress(std::string &value); + Status GetServerConfigPort(std::string &value); + Status GetServerConfigDeployMode(std::string &value); + Status GetServerConfigTimeZone(std::string &value); /* db config */ - Status GetDBConfigPrimaryPath(std::string& value); - Status GetDBConfigSecondaryPath(std::string& value); - Status GetDBConfigBackendUrl(std::string& value); - Status GetDBConfigArchiveDiskThreshold(int32_t& value); - Status GetDBConfigArchiveDaysThreshold(int32_t& value); - Status GetDBConfigInsertBufferSize(int32_t& value); - Status GetDBConfigBuildIndexGPU(int32_t& value); + Status GetDBConfigPrimaryPath(std::string &value); + Status GetDBConfigSecondaryPath(std::string &value); + Status GetDBConfigBackendUrl(std::string &value); + Status GetDBConfigArchiveDiskThreshold(int32_t &value); + Status GetDBConfigArchiveDaysThreshold(int32_t &value); + Status GetDBConfigInsertBufferSize(int32_t &value); + Status GetDBConfigBuildIndexGPU(int32_t &value); /* metric config */ - Status GetMetricConfigEnableMonitor(bool& value); - Status GetMetricConfigCollector(std::string& value); - Status GetMetricConfigPrometheusPort(std::string& value); + Status GetMetricConfigEnableMonitor(bool &value); + Status GetMetricConfigCollector(std::string &value); + Status GetMetricConfigPrometheusPort(std::string &value); /* cache config */ - Status GetCacheConfigCpuMemCapacity(int32_t& value); - Status GetCacheConfigCpuMemThreshold(float& value); - Status GetCacheConfigGpuMemCapacity(int32_t& value); - Status GetCacheConfigGpuMemThreshold(float& value); - Status GetCacheConfigCacheInsertData(bool& value); + Status GetCacheConfigCpuMemCapacity(int32_t &value); + Status GetCacheConfigCpuMemThreshold(float &value); + Status GetCacheConfigGpuMemCapacity(int32_t &value); + Status GetCacheConfigGpuMemThreshold(float &value); + Status GetCacheConfigCacheInsertData(bool &value); /* engine config */ - Status GetEngineConfigBlasThreshold(int32_t& value); - Status GetEngineConfigOmpThreadNum(int32_t& value); + Status GetEngineConfigBlasThreshold(int32_t &value); + Status GetEngineConfigOmpThreadNum(int32_t &value); /* resource config */ - Status GetResourceConfigMode(std::string& value); - Status GetResourceConfigPool(std::vector& value); + Status GetResourceConfigMode(std::string &value); + Status GetResourceConfigPool(std::vector &value); public: /* server config */ - Status SetServerConfigAddress(const std::string& value); - Status SetServerConfigPort(const std::string& value); - Status SetServerConfigDeployMode(const std::string& value); - Status SetServerConfigTimeZone(const std::string& value); + Status SetServerConfigAddress(const std::string &value); + Status SetServerConfigPort(const std::string &value); + Status SetServerConfigDeployMode(const std::string &value); + Status SetServerConfigTimeZone(const std::string &value); /* db config */ - Status SetDBConfigPrimaryPath(const std::string& value); - Status SetDBConfigSecondaryPath(const std::string& value); - Status SetDBConfigBackendUrl(const std::string& value); - Status SetDBConfigArchiveDiskThreshold(const std::string& value); - Status SetDBConfigArchiveDaysThreshold(const std::string& value); - Status SetDBConfigInsertBufferSize(const std::string& value); - Status SetDBConfigBuildIndexGPU(const std::string& value); + Status SetDBConfigPrimaryPath(const std::string &value); + Status SetDBConfigSecondaryPath(const std::string &value); + Status SetDBConfigBackendUrl(const std::string &value); + Status SetDBConfigArchiveDiskThreshold(const std::string &value); + Status SetDBConfigArchiveDaysThreshold(const std::string &value); + Status SetDBConfigInsertBufferSize(const std::string &value); + Status SetDBConfigBuildIndexGPU(const std::string &value); /* metric config */ - Status SetMetricConfigEnableMonitor(const std::string& value); - Status SetMetricConfigCollector(const std::string& value); - Status SetMetricConfigPrometheusPort(const std::string& value); + Status SetMetricConfigEnableMonitor(const std::string &value); + Status SetMetricConfigCollector(const std::string &value); + Status SetMetricConfigPrometheusPort(const std::string &value); /* cache config */ - Status SetCacheConfigCpuMemCapacity(const std::string& value); - Status SetCacheConfigCpuMemThreshold(const std::string& value); - Status SetCacheConfigGpuMemCapacity(const std::string& value); - Status SetCacheConfigGpuMemThreshold(const std::string& value); - Status SetCacheConfigCacheInsertData(const std::string& value); + Status SetCacheConfigCpuMemCapacity(const std::string &value); + Status SetCacheConfigCpuMemThreshold(const std::string &value); + Status SetCacheConfigGpuMemCapacity(const std::string &value); + Status SetCacheConfigGpuMemThreshold(const std::string &value); + Status SetCacheConfigCacheInsertData(const std::string &value); /* engine config */ - Status SetEngineConfigBlasThreshold(const std::string& value); - Status SetEngineConfigOmpThreadNum(const std::string& value); + Status SetEngineConfigBlasThreshold(const std::string &value); + Status SetEngineConfigOmpThreadNum(const std::string &value); /* resource config */ - Status SetResourceConfigMode(const std::string& value); + Status SetResourceConfigMode(const std::string &value); private: std::unordered_map> config_map_; std::mutex mutex_; }; -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp index 49c4cac8c8..66104bfe5a 100644 --- a/cpp/src/server/DBWrapper.cpp +++ b/cpp/src/server/DBWrapper.cpp @@ -16,13 +16,14 @@ // under the License. -#include "DBWrapper.h" +#include "server/DBWrapper.h" #include "Config.h" #include "db/DBFactory.h" #include "utils/CommonUtil.h" #include "utils/Log.h" #include "utils/StringHelpFunctions.h" +#include #include #include @@ -31,7 +32,6 @@ namespace milvus { namespace server { DBWrapper::DBWrapper() { - } Status DBWrapper::StartService() { @@ -65,15 +65,14 @@ Status DBWrapper::StartService() { if (mode == "single") { opt.mode_ = engine::DBOptions::MODE::SINGLE; - } - else if (mode == "cluster_readonly") { + } else if (mode == "cluster_readonly") { opt.mode_ = engine::DBOptions::MODE::CLUSTER_READONLY; - } - else if (mode == "cluster_writable") { + } else if (mode == "cluster_writable") { opt.mode_ = engine::DBOptions::MODE::CLUSTER_WRITABLE; - } - else { - std::cerr << "ERROR: mode specified in server_config must be ['single', 'cluster_readonly', 'cluster_writable']" << std::endl; + } else { + std::cerr << + "ERROR: mode specified in server_config must be ['single', 'cluster_readonly', 'cluster_writable']" + << std::endl; kill(0, SIGUSR1); } @@ -86,7 +85,7 @@ Status DBWrapper::StartService() { SERVER_LOG_DEBUG << "Specify openmp thread number: " << omp_thread; } else { uint32_t sys_thread_cnt = 8; - if(CommonUtil::GetSystemAvailableThreads(sys_thread_cnt)) { + if (CommonUtil::GetSystemAvailableThreads(sys_thread_cnt)) { omp_thread = (int32_t)ceil(sys_thread_cnt*0.5); omp_set_num_threads(omp_thread); } @@ -116,14 +115,14 @@ Status DBWrapper::StartService() { //create db root folder Status status = CommonUtil::CreateDirectory(opt.meta_.path_); - if(!status.ok()) { + if (!status.ok()) { std::cerr << "ERROR! Failed to create database root path: " << opt.meta_.path_ << std::endl; kill(0, SIGUSR1); } - for(auto& path : opt.meta_.slave_paths_) { + for (auto& path : opt.meta_.slave_paths_) { status = CommonUtil::CreateDirectory(path); - if(!status.ok()) { + if (!status.ok()) { std::cerr << "ERROR! Failed to create database slave path: " << path << std::endl; kill(0, SIGUSR1); } @@ -143,13 +142,13 @@ Status DBWrapper::StartService() { } Status DBWrapper::StopService() { - if(db_) { + if (db_) { db_->Stop(); } return Status::OK(); } -} -} -} \ No newline at end of file +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/DBWrapper.h b/cpp/src/server/DBWrapper.h index 99b8f90500..4648f4b59d 100644 --- a/cpp/src/server/DBWrapper.h +++ b/cpp/src/server/DBWrapper.h @@ -27,12 +27,12 @@ namespace milvus { namespace server { class DBWrapper { -private: + private: DBWrapper(); ~DBWrapper() = default; -public: - static DBWrapper& GetInstance() { + public: + static DBWrapper &GetInstance() { static DBWrapper wrapper; return wrapper; } @@ -48,10 +48,10 @@ public: return db_; } -private: + private: engine::DBPtr db_; }; -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp index 87d3b374b2..f0675fc84d 100644 --- a/cpp/src/server/Server.cpp +++ b/cpp/src/server/Server.cpp @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "server/Server.h" + #include #include #include @@ -24,7 +26,6 @@ #include #include -#include "Server.h" #include "server/grpc_impl/GrpcServer.h" #include "server/Config.h" #include "utils/Log.h" @@ -36,7 +37,6 @@ #include "wrapper/KnowhereResource.h" #include "DBWrapper.h" - namespace zilliz { namespace milvus { namespace server { @@ -116,7 +116,7 @@ Server::Daemonize() { } // Close all open fd - for (long fd = sysconf(_SC_OPEN_MAX); fd > 0; fd--) { + for (int64_t fd = sysconf(_SC_OPEN_MAX); fd > 0; fd--) { close(fd); } @@ -172,9 +172,9 @@ Server::Start() { time_zone = "CUT"; } else { int time_bias = std::stoi(time_zone.substr(3, std::string::npos)); - if (time_bias == 0) + if (time_bias == 0) { time_zone = "CUT"; - else if (time_bias > 0) { + } else if (time_bias > 0) { time_zone = "CUT" + std::to_string(-time_bias); } else { time_zone = "CUT+" + std::to_string(-time_bias); @@ -194,7 +194,6 @@ Server::Start() { StartService(); std::cout << "Milvus server start successfully." << std::endl; - } catch (std::exception &ex) { std::cerr << "Milvus server encounter exception: " << ex.what(); } @@ -232,10 +231,9 @@ Server::Stop() { std::cerr << "Milvus server is closed!" << std::endl; } - ErrorCode Server::LoadConfig() { - Config& config = Config::GetInstance(); + Config &config = Config::GetInstance(); Status s = config.LoadConfigFile(config_filename_); if (!s.ok()) { std::cerr << "Failed to load config file: " << config_filename_ << std::endl; @@ -266,6 +264,6 @@ Server::StopService() { engine::KnowhereResource::Finalize(); } -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/Server.h b/cpp/src/server/Server.h index 8507552b75..9bd0ce13b5 100644 --- a/cpp/src/server/Server.h +++ b/cpp/src/server/Server.h @@ -22,7 +22,6 @@ #include #include - namespace zilliz { namespace milvus { namespace server { @@ -58,6 +57,6 @@ class Server { std::string log_config_file_; }; // Server -} // server -} // sql -} // zilliz +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp index c85d1b2fcd..a64422aab7 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -16,10 +16,12 @@ // under the License. -#include "GrpcRequestHandler.h" -#include "GrpcRequestTask.h" +#include "server/grpc_impl/GrpcRequestHandler.h" +#include "server/grpc_impl/GrpcRequestTask.h" #include "utils/TimeRecorder.h" +#include + namespace zilliz { namespace milvus { namespace server { @@ -29,7 +31,6 @@ namespace grpc { GrpcRequestHandler::CreateTable(::grpc::ServerContext *context, const ::milvus::grpc::TableSchema *request, ::milvus::grpc::Status *response) { - BaseTaskPtr task_ptr = CreateTableTask::Create(request); GrpcRequestScheduler::ExecTask(task_ptr, response); return ::grpc::Status::OK; @@ -39,7 +40,6 @@ GrpcRequestHandler::CreateTable(::grpc::ServerContext *context, GrpcRequestHandler::HasTable(::grpc::ServerContext *context, const ::milvus::grpc::TableName *request, ::milvus::grpc::BoolReply *response) { - bool has_table = false; BaseTaskPtr task_ptr = HasTableTask::Create(request->table_name(), has_table); ::milvus::grpc::Status grpc_status; @@ -61,9 +61,8 @@ GrpcRequestHandler::DropTable(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context, - const ::milvus::grpc::IndexParam *request, - ::milvus::grpc::Status *response) { - + const ::milvus::grpc::IndexParam *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = CreateIndexTask::Create(request); GrpcRequestScheduler::ExecTask(task_ptr, response); return ::grpc::Status::OK; @@ -71,9 +70,8 @@ GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::Insert(::grpc::ServerContext *context, - const ::milvus::grpc::InsertParam *request, - ::milvus::grpc::VectorIds *response) { - + const ::milvus::grpc::InsertParam *request, + ::milvus::grpc::VectorIds *response) { BaseTaskPtr task_ptr = InsertTask::Create(request, response); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -86,7 +84,6 @@ GrpcRequestHandler::Insert(::grpc::ServerContext *context, GrpcRequestHandler::Search(::grpc::ServerContext *context, const ::milvus::grpc::SearchParam *request, ::milvus::grpc::TopKQueryResultList *response) { - std::vector file_id_array; BaseTaskPtr task_ptr = SearchTask::Create(request, file_id_array, response); ::milvus::grpc::Status grpc_status; @@ -100,7 +97,6 @@ GrpcRequestHandler::Search(::grpc::ServerContext *context, GrpcRequestHandler::SearchInFiles(::grpc::ServerContext *context, const ::milvus::grpc::SearchInFilesParam *request, ::milvus::grpc::TopKQueryResultList *response) { - std::vector file_id_array; for (int i = 0; i < request->file_id_array_size(); i++) { file_id_array.push_back(request->file_id_array(i)); @@ -118,7 +114,6 @@ GrpcRequestHandler::SearchInFiles(::grpc::ServerContext *context, GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context, const ::milvus::grpc::TableName *request, ::milvus::grpc::TableSchema *response) { - BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), response); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -129,9 +124,8 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::CountTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::TableRowCount *response) { - + const ::milvus::grpc::TableName *request, + ::milvus::grpc::TableRowCount *response) { int64_t row_count = 0; BaseTaskPtr task_ptr = CountTableTask::Create(request->table_name(), row_count); ::milvus::grpc::Status grpc_status; @@ -146,7 +140,6 @@ GrpcRequestHandler::CountTable(::grpc::ServerContext *context, GrpcRequestHandler::ShowTables(::grpc::ServerContext *context, const ::milvus::grpc::Command *request, ::milvus::grpc::TableNameList *response) { - BaseTaskPtr task_ptr = ShowTablesTask::Create(response); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -157,9 +150,8 @@ GrpcRequestHandler::ShowTables(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::Cmd(::grpc::ServerContext *context, - const ::milvus::grpc::Command *request, - ::milvus::grpc::StringReply *response) { - + const ::milvus::grpc::Command *request, + ::milvus::grpc::StringReply *response) { std::string result; BaseTaskPtr task_ptr = CmdTask::Create(request->cmd(), result); ::milvus::grpc::Status grpc_status; @@ -172,8 +164,8 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context, - const ::milvus::grpc::DeleteByRangeParam *request, - ::milvus::grpc::Status *response) { + const ::milvus::grpc::DeleteByRangeParam *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = DeleteByRangeTask::Create(request); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -184,8 +176,8 @@ GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::PreloadTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::Status *response) { + const ::milvus::grpc::TableName *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = PreloadTableTask::Create(request->table_name()); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -196,8 +188,8 @@ GrpcRequestHandler::PreloadTable(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::IndexParam *response) { + const ::milvus::grpc::TableName *request, + ::milvus::grpc::IndexParam *response) { BaseTaskPtr task_ptr = DescribeIndexTask::Create(request->table_name(), response); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -208,8 +200,8 @@ GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::DropIndex(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::Status *response) { + const ::milvus::grpc::TableName *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = DropIndexTask::Create(request->table_name()); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -218,8 +210,7 @@ GrpcRequestHandler::DropIndex(::grpc::ServerContext *context, return ::grpc::Status::OK; } - -} -} -} -} \ No newline at end of file +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestHandler.h b/cpp/src/server/grpc_impl/GrpcRequestHandler.h index a8a70eb88d..549e1d9d35 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestHandler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.h @@ -20,15 +20,15 @@ #include #include -#include "milvus.grpc.pb.h" -#include "status.pb.h" +#include "grpc/gen-milvus/milvus.grpc.pb.h" +#include "grpc/gen-status/status.pb.h" namespace zilliz { namespace milvus { namespace server { namespace grpc { class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service { -public: + public: /** * @brief Create table method * @@ -103,8 +103,7 @@ public: */ ::grpc::Status CreateIndex(::grpc::ServerContext *context, - const ::milvus::grpc::IndexParam *request, ::milvus::grpc::Status *response) override; - + const ::milvus::grpc::IndexParam *request, ::milvus::grpc::Status *response) override; /** * @brief Insert vector array to table @@ -123,8 +122,8 @@ public: */ ::grpc::Status Insert(::grpc::ServerContext *context, - const ::milvus::grpc::InsertParam *request, - ::milvus::grpc::VectorIds *response) override; + const ::milvus::grpc::InsertParam *request, + ::milvus::grpc::VectorIds *response) override; /** * @brief Query vector @@ -213,8 +212,8 @@ public: */ ::grpc::Status CountTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::TableRowCount *response) override; + const ::milvus::grpc::TableName *request, + ::milvus::grpc::TableRowCount *response) override; /** * @brief List all tables in database @@ -253,8 +252,8 @@ public: */ ::grpc::Status Cmd(::grpc::ServerContext *context, - const ::milvus::grpc::Command *request, - ::milvus::grpc::StringReply *response) override; + const ::milvus::grpc::Command *request, + ::milvus::grpc::StringReply *response) override; /** * @brief delete table by range @@ -291,8 +290,8 @@ public: */ ::grpc::Status PreloadTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::Status *response) override; + const ::milvus::grpc::TableName *request, + ::milvus::grpc::Status *response) override; /** * @brief Describe index @@ -310,8 +309,8 @@ public: */ ::grpc::Status DescribeIndex(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::IndexParam *response) override; + const ::milvus::grpc::TableName *request, + ::milvus::grpc::IndexParam *response) override; /** * @brief Drop index @@ -329,12 +328,12 @@ public: */ ::grpc::Status DropIndex(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::Status *response) override; - + const ::milvus::grpc::TableName *request, + ::milvus::grpc::Status *response) override; }; -} -} -} -} + +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp index fb8daa3540..4c58195f37 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp @@ -15,101 +15,107 @@ // specific language governing permissions and limitations // under the License. -#include "GrpcRequestScheduler.h" +#include "server/grpc_impl/GrpcRequestScheduler.h" #include "utils/Log.h" -#include "src/grpc/gen-status/status.pb.h" +#include "grpc/gen-status/status.pb.h" + +#include namespace zilliz { namespace milvus { namespace server { namespace grpc { -using namespace ::milvus; - namespace { - ::milvus::grpc::ErrorCode ErrorMap(ErrorCode code) { - static const std::map code_map = { - {SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_NULL_POINTER, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND}, - {SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_BLOCKING_QUEUE_EMPTY, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER}, - {SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE}, - {SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER}, - {SERVER_CANNOT_DELETE_FILE, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE}, - {SERVER_TABLE_NOT_EXIST, ::milvus::grpc::ErrorCode::TABLE_NOT_EXISTS}, - {SERVER_INVALID_TABLE_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_TABLE_NAME}, - {SERVER_INVALID_TABLE_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, - {SERVER_INVALID_TIME_RANGE, ::milvus::grpc::ErrorCode::ILLEGAL_RANGE}, - {SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, +::milvus::grpc::ErrorCode +ErrorMap(ErrorCode code) { + static const std::map code_map = { + {SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_NULL_POINTER, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND}, + {SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_BLOCKING_QUEUE_EMPTY, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER}, + {SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE}, + {SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER}, + {SERVER_CANNOT_DELETE_FILE, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE}, + {SERVER_TABLE_NOT_EXIST, ::milvus::grpc::ErrorCode::TABLE_NOT_EXISTS}, + {SERVER_INVALID_TABLE_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_TABLE_NAME}, + {SERVER_INVALID_TABLE_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, + {SERVER_INVALID_TIME_RANGE, ::milvus::grpc::ErrorCode::ILLEGAL_RANGE}, + {SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, - {SERVER_INVALID_INDEX_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE}, - {SERVER_INVALID_ROWRECORD, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, - {SERVER_INVALID_ROWRECORD_ARRAY, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, - {SERVER_INVALID_TOPK, ::milvus::grpc::ErrorCode::ILLEGAL_TOPK}, - {SERVER_INVALID_NPROBE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_NLIST}, - {SERVER_INVALID_INDEX_METRIC_TYPE,::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE}, - {SERVER_INVALID_INDEX_FILE_SIZE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID}, - {SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT}, - {SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED}, - {DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED}, - {SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR}, - {SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY}, - }; + {SERVER_INVALID_INDEX_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE}, + {SERVER_INVALID_ROWRECORD, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, + {SERVER_INVALID_ROWRECORD_ARRAY, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, + {SERVER_INVALID_TOPK, ::milvus::grpc::ErrorCode::ILLEGAL_TOPK}, + {SERVER_INVALID_NPROBE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_NLIST}, + {SERVER_INVALID_INDEX_METRIC_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE}, + {SERVER_INVALID_INDEX_FILE_SIZE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID}, + {SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT}, + {SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED}, + {DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED}, + {SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR}, + {SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY}, + }; - if(code_map.find(code) != code_map.end()) { - return code_map.at(code); - } else { - return ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR; - } + if (code_map.find(code) != code_map.end()) { + return code_map.at(code); + } else { + return ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR; } } +} // namespace //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// GrpcBaseTask::GrpcBaseTask(const std::string &task_group, bool async) - : task_group_(task_group), - async_(async), - done_(false) { - + : task_group_(task_group), + async_(async), + done_(false) { } GrpcBaseTask::~GrpcBaseTask() { WaitToFinish(); } -Status GrpcBaseTask::Execute() { +Status +GrpcBaseTask::Execute() { status_ = OnExecute(); Done(); return status_; } -void GrpcBaseTask::Done() { +void +GrpcBaseTask::Done() { done_ = true; finish_cond_.notify_all(); } -Status GrpcBaseTask::SetStatus(ErrorCode error_code, const std::string &error_msg) { +Status +GrpcBaseTask::SetStatus(ErrorCode error_code, const std::string &error_msg) { status_ = Status(error_code, error_msg); SERVER_LOG_ERROR << error_msg; return status_; } -Status GrpcBaseTask::WaitToFinish() { +Status +GrpcBaseTask::WaitToFinish() { std::unique_lock lock(finish_mtx_); - finish_cond_.wait(lock, [this] { return done_; }); + finish_cond_.wait(lock, [this] { + return done_; + }); return status_; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// GrpcRequestScheduler::GrpcRequestScheduler() - : stopped_(false) { + : stopped_(false) { Start(); } @@ -117,7 +123,8 @@ GrpcRequestScheduler::~GrpcRequestScheduler() { Stop(); } -void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) { +void +GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) { if (task_ptr == nullptr) { return; } @@ -127,7 +134,7 @@ void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Statu if (!task_ptr->IsAsync()) { task_ptr->WaitToFinish(); - const Status& status = task_ptr->status(); + const Status &status = task_ptr->status(); if (!status.ok()) { grpc_status->set_reason(status.message()); grpc_status->set_error_code(ErrorMap(status.code())); @@ -135,7 +142,8 @@ void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Statu } } -void GrpcRequestScheduler::Start() { +void +GrpcRequestScheduler::Start() { if (!stopped_) { return; } @@ -143,7 +151,8 @@ void GrpcRequestScheduler::Start() { stopped_ = false; } -void GrpcRequestScheduler::Stop() { +void +GrpcRequestScheduler::Stop() { if (stopped_) { return; } @@ -168,7 +177,8 @@ void GrpcRequestScheduler::Stop() { SERVER_LOG_INFO << "Scheduler stopped"; } -Status GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { +Status +GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { if (task_ptr == nullptr) { return Status::OK(); } @@ -186,8 +196,8 @@ Status GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { return task_ptr->WaitToFinish();//sync execution } - -void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) { +void +GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) { if (task_queue == nullptr) { return; } @@ -210,7 +220,8 @@ void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) { } } -Status GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { +Status +GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { std::lock_guard lock(queue_mtx_); std::string group_name = task_ptr->TaskGroup(); @@ -230,7 +241,7 @@ Status GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { return Status::OK(); } -} -} -} -} +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h index e1217540fc..df5357a4bb 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h @@ -19,12 +19,14 @@ #include "utils/Status.h" #include "utils/BlockingQueue.h" -#include "status.grpc.pb.h" -#include "status.pb.h" +#include "grpc/gen-status/status.grpc.pb.h" +#include "grpc/gen-status/status.pb.h" #include #include #include +#include +#include namespace zilliz { namespace milvus { @@ -32,30 +34,36 @@ namespace server { namespace grpc { class GrpcBaseTask { -protected: - GrpcBaseTask(const std::string &task_group, bool async = false); + protected: + explicit GrpcBaseTask(const std::string &task_group, bool async = false); virtual ~GrpcBaseTask(); -public: + public: Status Execute(); void Done(); Status WaitToFinish(); - std::string TaskGroup() const { return task_group_; } + std::string TaskGroup() const { + return task_group_; + } - const Status& status() const { return status_; } + const Status &status() const { + return status_; + } - bool IsAsync() const { return async_; } + bool IsAsync() const { + return async_; + } -protected: + protected: virtual Status OnExecute() = 0; Status SetStatus(ErrorCode error_code, const std::string &msg); -protected: + protected: mutable std::mutex finish_mtx_; std::condition_variable finish_cond_; @@ -71,7 +79,7 @@ using TaskQueuePtr = std::shared_ptr; using ThreadPtr = std::shared_ptr; class GrpcRequestScheduler { -public: + public: static GrpcRequestScheduler &GetInstance() { static GrpcRequestScheduler scheduler; return scheduler; @@ -85,7 +93,7 @@ public: static void ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status); -protected: + protected: GrpcRequestScheduler(); virtual ~GrpcRequestScheduler(); @@ -94,7 +102,7 @@ protected: Status PutTaskToQueue(const BaseTaskPtr &task_ptr); -private: + private: mutable std::mutex queue_mtx_; std::map task_groups_; @@ -104,7 +112,7 @@ private: bool stopped_; }; -} -} -} -} +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp index ca6b77e960..5702b80bec 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp @@ -15,22 +15,24 @@ // specific language governing permissions and limitations // under the License. -#include +#include "server/grpc_impl/GrpcRequestTask.h" -#include "GrpcRequestTask.h" +#include +#include +#include +#include +//#include + +#include "server/Server.h" +#include "server/DBWrapper.h" #include "utils/CommonUtil.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" #include "utils/ValidationUtil.h" -#include "../DBWrapper.h" -#include "version.h" #include "GrpcServer.h" #include "db/Utils.h" #include "scheduler/SchedInst.h" -//#include - -#include "server/Server.h" - +#include "../../../version.h" namespace zilliz { namespace milvus { @@ -45,86 +47,87 @@ using DB_META = zilliz::milvus::engine::meta::Meta; using DB_DATE = zilliz::milvus::engine::meta::DateT; namespace { - engine::EngineType EngineType(int type) { - static std::map map_type = { - {0, engine::EngineType::INVALID}, - {1, engine::EngineType::FAISS_IDMAP}, - {2, engine::EngineType::FAISS_IVFFLAT}, - {3, engine::EngineType::FAISS_IVFSQ8}, - }; +engine::EngineType +EngineType(int type) { + static std::map map_type = { + {0, engine::EngineType::INVALID}, + {1, engine::EngineType::FAISS_IDMAP}, + {2, engine::EngineType::FAISS_IVFFLAT}, + {3, engine::EngineType::FAISS_IVFSQ8}, + }; - if (map_type.find(type) == map_type.end()) { - return engine::EngineType::INVALID; - } - - return map_type[type]; + if (map_type.find(type) == map_type.end()) { + return engine::EngineType::INVALID; } - int IndexType(engine::EngineType type) { - static std::map map_type = { - {engine::EngineType::INVALID, 0}, - {engine::EngineType::FAISS_IDMAP, 1}, - {engine::EngineType::FAISS_IVFFLAT, 2}, - {engine::EngineType::FAISS_IVFSQ8, 3}, - }; - - if (map_type.find(type) == map_type.end()) { - return 0; - } - - return map_type[type]; - } - - constexpr long DAY_SECONDS = 24 * 60 * 60; - - Status - ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range> &range_array, - std::vector &dates) { - dates.clear(); - for (auto &range : range_array) { - time_t tt_start, tt_end; - tm tm_start, tm_end; - if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) { - return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value()); - } - - if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) { - return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value()); - } - - long days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) / - DAY_SECONDS; - if (days == 0) { - return Status(SERVER_INVALID_TIME_RANGE, - "Invalid time range: " + range.start_value() + " to " + range.end_value()); - } - - //range: [start_day, end_day) - for (long i = 0; i < days; i++) { - time_t tt_day = tt_start + DAY_SECONDS * i; - tm tm_day; - CommonUtil::ConvertTime(tt_day, tm_day); - - long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + - tm_day.tm_mday;//according to db logic - dates.push_back(date); - } - } - - return Status::OK(); - } + return map_type[type]; } +int +IndexType(engine::EngineType type) { + static std::map map_type = { + {engine::EngineType::INVALID, 0}, + {engine::EngineType::FAISS_IDMAP, 1}, + {engine::EngineType::FAISS_IVFFLAT, 2}, + {engine::EngineType::FAISS_IVFSQ8, 3}, + }; + + if (map_type.find(type) == map_type.end()) { + return 0; + } + + return map_type[type]; +} + +constexpr int64_t DAY_SECONDS = 24 * 60 * 60; + +Status +ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range> &range_array, + std::vector &dates) { + dates.clear(); + for (auto &range : range_array) { + time_t tt_start, tt_end; + tm tm_start, tm_end; + if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) { + return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value()); + } + + if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) { + return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value()); + } + + int64_t days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) / + DAY_SECONDS; + if (days == 0) { + return Status(SERVER_INVALID_TIME_RANGE, + "Invalid time range: " + range.start_value() + " to " + range.end_value()); + } + + //range: [start_day, end_day) + for (int64_t i = 0; i < days; i++) { + time_t tt_day = tt_start + DAY_SECONDS * i; + tm tm_day; + CommonUtil::ConvertTime(tt_day, tm_day); + + int64_t date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + + tm_day.tm_mday;//according to db logic + dates.push_back(date); + } + } + + return Status::OK(); +} +} // namespace + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema *schema) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - schema_(schema) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + schema_(schema) { } BaseTaskPtr CreateTableTask::Create(const ::milvus::grpc::TableSchema *schema) { - if(schema == nullptr) { + if (schema == nullptr) { SERVER_LOG_ERROR << "grpc input is null!"; return nullptr; } @@ -168,12 +171,11 @@ CreateTableTask::OnExecute() { status = DBWrapper::DB()->CreateTable(table_info); if (!status.ok()) { //table could exist - if(status.code() == DB_ALREADY_EXIST) { + if (status.code() == DB_ALREADY_EXIST) { return Status(SERVER_INVALID_TABLE_NAME, status.message()); } return status; } - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -185,9 +187,9 @@ CreateTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema *schema) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_(table_name), - schema_(schema) { + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name), + schema_(schema) { } BaseTaskPtr @@ -218,7 +220,6 @@ DescribeTableTask::OnExecute() { schema_->set_dimension(table_info.dimension_); schema_->set_index_file_size(table_info.index_file_size_); schema_->set_metric_type(table_info.metric_type_); - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -230,13 +231,13 @@ DescribeTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CreateIndexTask::CreateIndexTask(const ::milvus::grpc::IndexParam *index_param) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - index_param_(index_param) { + : GrpcBaseTask(DDL_DML_TASK_GROUP), + index_param_(index_param) { } BaseTaskPtr CreateIndexTask::Create(const ::milvus::grpc::IndexParam *index_param) { - if(index_param == nullptr) { + if (index_param == nullptr) { SERVER_LOG_ERROR << "grpc input is null!"; return nullptr; } @@ -295,10 +296,9 @@ CreateIndexTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// HasTableTask::HasTableTask(const std::string &table_name, bool &has_table) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_(table_name), - has_table_(has_table) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name), + has_table_(has_table) { } BaseTaskPtr @@ -333,9 +333,8 @@ HasTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DropTableTask::DropTableTask(const std::string &table_name) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_(table_name) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name) { } BaseTaskPtr @@ -385,9 +384,8 @@ DropTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ShowTablesTask::ShowTablesTask(::milvus::grpc::TableNameList *table_name_list) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_list_(table_name_list) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_list_(table_name_list) { } BaseTaskPtr @@ -419,8 +417,8 @@ InsertTask::InsertTask(const ::milvus::grpc::InsertParam *insert_param, BaseTaskPtr InsertTask::Create(const ::milvus::grpc::InsertParam *insert_param, - ::milvus::grpc::VectorIds *record_ids) { - if(insert_param == nullptr) { + ::milvus::grpc::VectorIds *record_ids) { + if (insert_param == nullptr) { SERVER_LOG_ERROR << "grpc input is null!"; return nullptr; } @@ -444,7 +442,7 @@ InsertTask::OnExecute() { if (!record_ids_->vector_id_array().empty()) { if (record_ids_->vector_id_array().size() != insert_param_->row_record_array_size()) { return Status(SERVER_ILLEGAL_VECTOR_ID, - "Size of vector ids is not equal to row record array size"); + "Size of vector ids is not equal to row record array size"); } } @@ -455,7 +453,7 @@ InsertTask::OnExecute() { if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { return Status(SERVER_TABLE_NOT_EXIST, - "Table " + insert_param_->table_name() + " not exists"); + "Table " + insert_param_->table_name() + " not exists"); } else { return status; } @@ -465,19 +463,22 @@ InsertTask::OnExecute() { //all user provide id, or all internal id bool user_provide_ids = !insert_param_->row_id_array().empty(); //user already provided id before, all insert action require user id - if((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) { - return Status(SERVER_ILLEGAL_VECTOR_ID, "Table vector ids are user defined, please provide id for this batch"); + if ((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) { + return Status(SERVER_ILLEGAL_VECTOR_ID, + "Table vector ids are user defined, please provide id for this batch"); } //user didn't provided id before, no need to provide user id - if((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) { - return Status(SERVER_ILLEGAL_VECTOR_ID, "Table vector ids are auto generated, no need to provide id for this batch"); + if ((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) { + return Status(SERVER_ILLEGAL_VECTOR_ID, + "Table vector ids are auto generated, no need to provide id for this batch"); } rc.RecordSection("check validation"); #ifdef MILVUS_ENABLE_PROFILING - std::string fname = "/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size()) + ".profiling"; + std::string fname = "/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size()) + + ".profiling"; ProfilerStart(fname.c_str()); #endif @@ -493,8 +494,8 @@ InsertTask::OnExecute() { if (vec_dim != table_info.dimension_) { ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION; std::string error_msg = "Invalid row record dimension: " + std::to_string(vec_dim) - + " vs. table dimension:" + - std::to_string(table_info.dimension_); + + " vs. table dimension:" + + std::to_string(table_info.dimension_); return Status(error_code, error_msg); } memcpy(&vec_f[i * table_info.dimension_], @@ -507,10 +508,10 @@ InsertTask::OnExecute() { //step 5: insert vectors auto vec_count = (uint64_t) insert_param_->row_record_array_size(); std::vector vec_ids(insert_param_->row_id_array_size(), 0); - if(!insert_param_->row_id_array().empty()) { - const int64_t* src_data = insert_param_->row_id_array().data(); - int64_t* target_data = vec_ids.data(); - memcpy(target_data, src_data, (size_t)(sizeof(int64_t)*insert_param_->row_id_array_size())); + if (!insert_param_->row_id_array().empty()) { + const int64_t *src_data = insert_param_->row_id_array().data(); + int64_t *target_data = vec_ids.data(); + memcpy(target_data, src_data, (size_t) (sizeof(int64_t) * insert_param_->row_id_array_size())); } status = DBWrapper::DB()->InsertVectors(insert_param_->table_name(), vec_count, vec_f.data(), vec_ids); @@ -525,13 +526,13 @@ InsertTask::OnExecute() { auto ids_size = record_ids_->vector_id_array_size(); if (ids_size != vec_count) { std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return " - + std::to_string(ids_size) + " id"; + + std::to_string(ids_size) + " id"; return Status(SERVER_ILLEGAL_VECTOR_ID, msg); } //step 6: update table flag user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID - : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID; + : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID; status = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_); #ifdef MILVUS_ENABLE_PROFILING @@ -540,7 +541,6 @@ InsertTask::OnExecute() { rc.RecordSection("add vectors to engine"); rc.ElapseFromBegin("total cost"); - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -556,19 +556,17 @@ SearchTask::SearchTask(const ::milvus::grpc::SearchParam *search_vector_infos, search_param_(search_vector_infos), file_id_array_(file_id_array), topk_result_list(response) { - } BaseTaskPtr SearchTask::Create(const ::milvus::grpc::SearchParam *search_vector_infos, const std::vector &file_id_array, ::milvus::grpc::TopKQueryResultList *response) { - if(search_vector_infos == nullptr) { + if (search_vector_infos == nullptr) { SERVER_LOG_ERROR << "grpc input is null!"; return nullptr; } - return std::shared_ptr(new SearchTask(search_vector_infos, file_id_array, - response)); + return std::shared_ptr(new SearchTask(search_vector_infos, file_id_array, response)); } Status @@ -640,7 +638,7 @@ SearchTask::OnExecute() { if (query_vec_dim != table_info.dimension_) { ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION; std::string error_msg = "Invalid row record dimension: " + std::to_string(query_vec_dim) - + " vs. table dimension:" + std::to_string(table_info.dimension_); + + " vs. table dimension:" + std::to_string(table_info.dimension_); return Status(error_code, error_msg); } @@ -655,16 +653,17 @@ SearchTask::OnExecute() { auto record_count = (uint64_t) search_param_->query_record_array().size(); #ifdef MILVUS_ENABLE_PROFILING - std::string fname = "/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size()) + ".profiling"; + std::string fname = "/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size()) + + ".profiling"; ProfilerStart(fname.c_str()); #endif if (file_id_array_.empty()) { status = DBWrapper::DB()->Query(table_name_, (size_t) top_k, record_count, nprobe, - vec_f.data(), dates, results); + vec_f.data(), dates, results); } else { status = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k, - record_count, nprobe, vec_f.data(), dates, results); + record_count, nprobe, vec_f.data(), dates, results); } #ifdef MILVUS_ENABLE_PROFILING @@ -682,7 +681,7 @@ SearchTask::OnExecute() { if (results.size() != record_count) { std::string msg = "Search " + std::to_string(record_count) + " vectors but only return " - + std::to_string(results.size()) + " results"; + + std::to_string(results.size()) + " results"; return Status(SERVER_ILLEGAL_SEARCH_RESULT, msg); } @@ -699,8 +698,6 @@ SearchTask::OnExecute() { //step 8: print time cost percent rc.RecordSection("construct result and send"); rc.ElapseFromBegin("totally cost"); - - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -710,10 +707,9 @@ SearchTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CountTableTask::CountTableTask(const std::string &table_name, int64_t &row_count) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_(table_name), - row_count_(row_count) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name), + row_count_(row_count) { } BaseTaskPtr @@ -742,7 +738,6 @@ CountTableTask::OnExecute() { row_count_ = (int64_t) row_count; rc.ElapseFromBegin("total cost"); - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -752,10 +747,9 @@ CountTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CmdTask::CmdTask(const std::string &cmd, std::string &result) - : GrpcBaseTask(PING_TASK_GROUP), - cmd_(cmd), - result_(result) { - + : GrpcBaseTask(PING_TASK_GROUP), + cmd_(cmd), + result_(result) { } BaseTaskPtr @@ -769,8 +763,7 @@ CmdTask::OnExecute() { result_ = MILVUS_VERSION; } else if (cmd_ == "tasktable") { result_ = scheduler::ResMgrInst::GetInstance()->DumpTaskTables(); - } - else { + } else { result_ = "OK"; } @@ -779,16 +772,17 @@ CmdTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - delete_by_range_param_(delete_by_range_param){ + : GrpcBaseTask(DDL_DML_TASK_GROUP), + delete_by_range_param_(delete_by_range_param) { } BaseTaskPtr DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param) { - if(delete_by_range_param == nullptr) { + if (delete_by_range_param == nullptr) { SERVER_LOG_ERROR << "grpc input is null!"; return nullptr; } + return std::shared_ptr(new DeleteByRangeTask(delete_by_range_param)); } @@ -838,23 +832,21 @@ DeleteByRangeTask::OnExecute() { if (!status.ok()) { return status; } - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } - + return Status::OK(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// PreloadTableTask::PreloadTableTask(const std::string &table_name) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_(table_name) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name) { } BaseTaskPtr -PreloadTableTask::Create(const std::string &table_name){ +PreloadTableTask::Create(const std::string &table_name) { return std::shared_ptr(new PreloadTableTask(table_name)); } @@ -889,12 +881,11 @@ DescribeIndexTask::DescribeIndexTask(const std::string &table_name, : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), index_param_(index_param) { - } BaseTaskPtr DescribeIndexTask::Create(const std::string &table_name, - ::milvus::grpc::IndexParam *index_param){ + ::milvus::grpc::IndexParam *index_param) { return std::shared_ptr(new DescribeIndexTask(table_name, index_param)); } @@ -932,11 +923,10 @@ DescribeIndexTask::OnExecute() { DropIndexTask::DropIndexTask(const std::string &table_name) : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) { - } BaseTaskPtr -DropIndexTask::Create(const std::string &table_name){ +DropIndexTask::Create(const std::string &table_name) { return std::shared_ptr(new DropIndexTask(table_name)); } @@ -975,7 +965,7 @@ DropIndexTask::OnExecute() { return Status::OK(); } -} -} -} -} +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.h b/cpp/src/server/grpc_impl/GrpcRequestTask.h index a0c4540cb3..4c8c038d44 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.h +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.h @@ -16,15 +16,18 @@ // under the License. #pragma once -#include "GrpcRequestScheduler.h" + +#include "server/grpc_impl/GrpcRequestScheduler.h" #include "utils/Status.h" #include "db/Types.h" -#include "milvus.grpc.pb.h" -#include "status.pb.h" +#include "grpc/gen-milvus/milvus.grpc.pb.h" +#include "grpc/gen-status/status.pb.h" #include #include +#include +#include namespace zilliz { namespace milvus { @@ -33,138 +36,130 @@ namespace grpc { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class CreateTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const ::milvus::grpc::TableSchema *schema); -protected: - explicit - CreateTableTask(const ::milvus::grpc::TableSchema *request); + protected: + explicit CreateTableTask(const ::milvus::grpc::TableSchema *request); Status OnExecute() override; -private: + private: const ::milvus::grpc::TableSchema *schema_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class HasTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name, bool &has_table); -protected: + protected: HasTableTask(const std::string &request, bool &has_table); Status OnExecute() override; - -private: + private: std::string table_name_; bool &has_table_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DescribeTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name, ::milvus::grpc::TableSchema *schema); -protected: + protected: DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema *schema); Status OnExecute() override; - -private: + private: std::string table_name_; ::milvus::grpc::TableSchema *schema_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DropTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name); -protected: - explicit - DropTableTask(const std::string &table_name); + protected: + explicit DropTableTask(const std::string &table_name); Status OnExecute() override; - -private: + private: std::string table_name_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class CreateIndexTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const ::milvus::grpc::IndexParam *index_Param); -protected: - explicit - CreateIndexTask(const ::milvus::grpc::IndexParam *index_Param); + protected: + explicit CreateIndexTask(const ::milvus::grpc::IndexParam *index_Param); Status OnExecute() override; - -private: + private: const ::milvus::grpc::IndexParam *index_param_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class ShowTablesTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(::milvus::grpc::TableNameList *table_name_list); -protected: - explicit - ShowTablesTask(::milvus::grpc::TableNameList *table_name_list); + protected: + explicit ShowTablesTask(::milvus::grpc::TableNameList *table_name_list); Status OnExecute() override; -private: + private: ::milvus::grpc::TableNameList *table_name_list_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class InsertTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const ::milvus::grpc::InsertParam *insert_Param, ::milvus::grpc::VectorIds *record_ids_); -protected: + protected: InsertTask(const ::milvus::grpc::InsertParam *insert_Param, - ::milvus::grpc::VectorIds *record_ids_); + ::milvus::grpc::VectorIds *record_ids_); Status OnExecute() override; -private: + private: const ::milvus::grpc::InsertParam *insert_param_; ::milvus::grpc::VectorIds *record_ids_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class SearchTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const ::milvus::grpc::SearchParam *search_param, const std::vector &file_id_array, ::milvus::grpc::TopKQueryResultList *response); -protected: + protected: SearchTask(const ::milvus::grpc::SearchParam *search_param, const std::vector &file_id_array, ::milvus::grpc::TopKQueryResultList *response); @@ -172,7 +167,7 @@ protected: Status OnExecute() override; -private: + private: const ::milvus::grpc::SearchParam *search_param_; std::vector file_id_array_; ::milvus::grpc::TopKQueryResultList *topk_result_list; @@ -180,107 +175,106 @@ private: //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class CountTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name, int64_t &row_count); -protected: + protected: CountTableTask(const std::string &table_name, int64_t &row_count); Status OnExecute() override; -private: + private: std::string table_name_; int64_t &row_count_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class CmdTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &cmd, std::string &result); -protected: + protected: CmdTask(const std::string &cmd, std::string &result); Status OnExecute() override; -private: + private: std::string cmd_; std::string &result_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DeleteByRangeTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param); -protected: - DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param); + protected: + explicit DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param); Status OnExecute() override; -private: + private: const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class PreloadTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name); -protected: - PreloadTableTask(const std::string &table_name); + protected: + explicit PreloadTableTask(const std::string &table_name); Status OnExecute() override; -private: + private: std::string table_name_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DescribeIndexTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name, - ::milvus::grpc::IndexParam *index_param); + ::milvus::grpc::IndexParam *index_param); -protected: + protected: DescribeIndexTask(const std::string &table_name, - ::milvus::grpc::IndexParam *index_param); + ::milvus::grpc::IndexParam *index_param); Status OnExecute() override; -private: + private: std::string table_name_; ::milvus::grpc::IndexParam *index_param_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DropIndexTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name); -protected: - DropIndexTask(const std::string &table_name); + protected: + explicit DropIndexTask(const std::string &table_name); Status OnExecute() override; -private: + private: std::string table_name_; - }; -} -} -} -} \ No newline at end of file +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcServer.cpp b/cpp/src/server/grpc_impl/GrpcServer.cpp index 7e20921549..065271dd55 100644 --- a/cpp/src/server/grpc_impl/GrpcServer.cpp +++ b/cpp/src/server/grpc_impl/GrpcServer.cpp @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -#include "milvus.grpc.pb.h" -#include "GrpcServer.h" +#include "grpc/gen-milvus/milvus.grpc.pb.h" +#include "server/grpc_impl/GrpcServer.h" #include "server/Config.h" #include "server/DBWrapper.h" #include "utils/Log.h" @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -35,14 +36,12 @@ #include #include - namespace zilliz { namespace milvus { namespace server { namespace grpc { - -constexpr long MESSAGE_SIZE = -1; +constexpr int64_t MESSAGE_SIZE = -1; //this class is to check port occupation during server start class NoReusePortOption : public ::grpc::ServerBuilderOption { @@ -52,11 +51,9 @@ class NoReusePortOption : public ::grpc::ServerBuilderOption { } void UpdatePlugins(std::vector> *plugins) override { - } }; - void GrpcServer::Start() { thread_ptr_ = std::make_shared(&GrpcServer::StartService, this); @@ -117,7 +114,7 @@ GrpcServer::StopService() { return Status::OK(); } -} -} -} -} \ No newline at end of file +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcServer.h b/cpp/src/server/grpc_impl/GrpcServer.h index a861692fac..9101f144b3 100644 --- a/cpp/src/server/grpc_impl/GrpcServer.h +++ b/cpp/src/server/grpc_impl/GrpcServer.h @@ -19,12 +19,12 @@ #include "utils/Status.h" +#include #include #include #include #include - namespace zilliz { namespace milvus { namespace server { @@ -52,7 +52,7 @@ class GrpcServer { std::shared_ptr thread_ptr_; }; -} -} -} -} +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/BlockingQueue.h b/cpp/src/utils/BlockingQueue.h index 6e46ed1bb1..b98fe28ef8 100644 --- a/cpp/src/utils/BlockingQueue.h +++ b/cpp/src/utils/BlockingQueue.h @@ -29,8 +29,9 @@ namespace server { template class BlockingQueue { -public: - BlockingQueue() : mtx(), full_(), empty_() {} + public: + BlockingQueue() : mtx(), full_(), empty_() { + } BlockingQueue(const BlockingQueue &rhs) = delete; @@ -50,7 +51,7 @@ public: void SetCapacity(const size_t capacity); -private: + private: mutable std::mutex mtx; std::condition_variable full_; std::condition_variable empty_; @@ -58,9 +59,8 @@ private: size_t capacity_ = 32; }; -} -} -} - +} // namespace server +} // namespace milvus +} // namespace zilliz #include "./BlockingQueue.inl" diff --git a/cpp/src/utils/BlockingQueue.inl b/cpp/src/utils/BlockingQueue.inl index 86237f3322..ed15aac77a 100644 --- a/cpp/src/utils/BlockingQueue.inl +++ b/cpp/src/utils/BlockingQueue.inl @@ -18,7 +18,6 @@ #pragma once - namespace zilliz { namespace milvus { namespace server { @@ -26,8 +25,10 @@ namespace server { template void BlockingQueue::Put(const T &task) { - std::unique_lock lock(mtx); - full_.wait(lock, [this] { return (queue_.size() < capacity_); }); + std::unique_lock lock(mtx); + full_.wait(lock, [this] { + return (queue_.size() < capacity_); + }); queue_.push(task); empty_.notify_all(); @@ -36,8 +37,10 @@ BlockingQueue::Put(const T &task) { template T BlockingQueue::Take() { - std::unique_lock lock(mtx); - empty_.wait(lock, [this] { return !queue_.empty(); }); + std::unique_lock lock(mtx); + empty_.wait(lock, [this] { + return !queue_.empty(); + }); T front(queue_.front()); queue_.pop(); @@ -48,15 +51,17 @@ BlockingQueue::Take() { template size_t BlockingQueue::Size() { - std::lock_guard lock(mtx); + std::lock_guard lock(mtx); return queue_.size(); } template T BlockingQueue::Front() { - std::unique_lock lock(mtx); - empty_.wait(lock, [this] { return !queue_.empty(); }); + std::unique_lock lock(mtx); + empty_.wait(lock, [this] { + return !queue_.empty(); + }); T front(queue_.front()); return front; @@ -65,8 +70,10 @@ BlockingQueue::Front() { template T BlockingQueue::Back() { - std::unique_lock lock(mtx); - empty_.wait(lock, [this] { return !queue_.empty(); }); + std::unique_lock lock(mtx); + empty_.wait(lock, [this] { + return !queue_.empty(); + }); T back(queue_.back()); return back; @@ -75,7 +82,7 @@ BlockingQueue::Back() { template bool BlockingQueue::Empty() { - std::unique_lock lock(mtx); + std::unique_lock lock(mtx); return queue_.empty(); } @@ -85,7 +92,7 @@ BlockingQueue::SetCapacity(const size_t capacity) { capacity_ = (capacity > 0 ? capacity : capacity_); } -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/CommonUtil.cpp b/cpp/src/utils/CommonUtil.cpp index ff985dcd7d..0116e321e0 100644 --- a/cpp/src/utils/CommonUtil.cpp +++ b/cpp/src/utils/CommonUtil.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "CommonUtil.h" +#include "utils/CommonUtil.h" #include "utils/Log.h" #include @@ -44,7 +44,8 @@ namespace server { namespace fs = boost::filesystem; -bool CommonUtil::GetSystemMemInfo(unsigned long &total_mem, unsigned long &free_mem) { +bool +CommonUtil::GetSystemMemInfo(uint64_t &total_mem, uint64_t &free_mem) { struct sysinfo info; int ret = sysinfo(&info); total_mem = info.totalram; @@ -53,7 +54,8 @@ bool CommonUtil::GetSystemMemInfo(unsigned long &total_mem, unsigned long &free_ return ret == 0;//succeed 0, failed -1 } -bool CommonUtil::GetSystemAvailableThreads(unsigned int &thread_count) { +bool +CommonUtil::GetSystemAvailableThreads(uint32_t &thread_count) { //threadCnt = std::thread::hardware_concurrency(); thread_count = sysconf(_SC_NPROCESSORS_CONF); thread_count *= THREAD_MULTIPLY_CPU; @@ -63,7 +65,8 @@ bool CommonUtil::GetSystemAvailableThreads(unsigned int &thread_count) { return true; } -bool CommonUtil::IsDirectoryExist(const std::string &path) { +bool +CommonUtil::IsDirectoryExist(const std::string &path) { DIR *dp = nullptr; if ((dp = opendir(path.c_str())) == nullptr) { return false; @@ -73,8 +76,9 @@ bool CommonUtil::IsDirectoryExist(const std::string &path) { return true; } -Status CommonUtil::CreateDirectory(const std::string &path) { - if(path.empty()) { +Status +CommonUtil::CreateDirectory(const std::string &path) { + if (path.empty()) { return Status::OK(); } @@ -87,7 +91,7 @@ Status CommonUtil::CreateDirectory(const std::string &path) { fs::path fs_path(path); fs::path parent_path = fs_path.parent_path(); Status err_status = CreateDirectory(parent_path.string()); - if(!err_status.ok()){ + if (!err_status.ok()) { return err_status; } @@ -96,7 +100,7 @@ Status CommonUtil::CreateDirectory(const std::string &path) { return Status::OK();//already exist } - int makeOK = mkdir(path.c_str(), S_IRWXU|S_IRGRP|S_IROTH); + int makeOK = mkdir(path.c_str(), S_IRWXU | S_IRGRP | S_IROTH); if (makeOK != 0) { return Status(SERVER_UNEXPECTED_ERROR, "failed to create directory: " + path); } @@ -105,37 +109,38 @@ Status CommonUtil::CreateDirectory(const std::string &path) { } namespace { - void RemoveDirectory(const std::string &path) { - DIR *dir = nullptr; - struct dirent *dmsg; - char file_name[256]; - char folder_name[256]; +void +RemoveDirectory(const std::string &path) { + DIR *dir = nullptr; + struct dirent *dmsg; + const int32_t buf_size = 256; + char file_name[buf_size]; - strcpy(folder_name, path.c_str()); - strcat(folder_name, "/%s"); - if ((dir = opendir(path.c_str())) != nullptr) { - while ((dmsg = readdir(dir)) != nullptr) { - if (strcmp(dmsg->d_name, ".") != 0 - && strcmp(dmsg->d_name, "..") != 0) { - sprintf(file_name, folder_name, dmsg->d_name); - std::string tmp = file_name; - if (tmp.find(".") == std::string::npos) { - RemoveDirectory(file_name); - } - remove(file_name); + std::string folder_name = path + "/%s"; + if ((dir = opendir(path.c_str())) != nullptr) { + while ((dmsg = readdir(dir)) != nullptr) { + if (strcmp(dmsg->d_name, ".") != 0 + && strcmp(dmsg->d_name, "..") != 0) { + snprintf(file_name, buf_size, folder_name.c_str(), dmsg->d_name); + std::string tmp = file_name; + if (tmp.find(".") == std::string::npos) { + RemoveDirectory(file_name); } + remove(file_name); } } - - if (dir != nullptr) { - closedir(dir); - } - remove(path.c_str()); } -} -Status CommonUtil::DeleteDirectory(const std::string &path) { - if(path.empty()) { + if (dir != nullptr) { + closedir(dir); + } + remove(path.c_str()); +} +} // namespace + +Status +CommonUtil::DeleteDirectory(const std::string &path) { + if (path.empty()) { return Status::OK(); } @@ -149,58 +154,63 @@ Status CommonUtil::DeleteDirectory(const std::string &path) { return Status::OK(); } -bool CommonUtil::IsFileExist(const std::string &path) { +bool +CommonUtil::IsFileExist(const std::string &path) { return (access(path.c_str(), F_OK) == 0); } -uint64_t CommonUtil::GetFileSize(const std::string &path) { +uint64_t +CommonUtil::GetFileSize(const std::string &path) { struct stat file_info; if (stat(path.c_str(), &file_info) < 0) { return 0; } else { - return (uint64_t)file_info.st_size; + return (uint64_t) file_info.st_size; } } -std::string CommonUtil::GetFileName(std::string filename) { +std::string +CommonUtil::GetFileName(std::string filename) { int pos = filename.find_last_of('/'); return filename.substr(pos + 1); } -std::string CommonUtil::GetExePath() { +std::string +CommonUtil::GetExePath() { const size_t buf_len = 1024; char buf[buf_len]; size_t cnt = readlink("/proc/self/exe", buf, buf_len); - if(cnt < 0|| cnt >= buf_len) { + if (cnt < 0 || cnt >= buf_len) { return ""; } buf[cnt] = '\0'; std::string exe_path = buf; - if(exe_path.rfind('/') != exe_path.length()){ + if (exe_path.rfind('/') != exe_path.length()) { std::string sub_str = exe_path.substr(0, exe_path.rfind('/')); return sub_str + "/"; } return exe_path; } -bool CommonUtil::TimeStrToTime(const std::string& time_str, - time_t &time_integer, - tm &time_struct, - const std::string& format) { +bool +CommonUtil::TimeStrToTime(const std::string &time_str, + time_t &time_integer, + tm &time_struct, + const std::string &format) { time_integer = 0; memset(&time_struct, 0, sizeof(tm)); int ret = sscanf(time_str.c_str(), - format.c_str(), - &(time_struct.tm_year), - &(time_struct.tm_mon), - &(time_struct.tm_mday), - &(time_struct.tm_hour), - &(time_struct.tm_min), - &(time_struct.tm_sec)); - if(ret <= 0) { + format.c_str(), + &(time_struct.tm_year), + &(time_struct.tm_mon), + &(time_struct.tm_mday), + &(time_struct.tm_hour), + &(time_struct.tm_min), + &(time_struct.tm_sec)); + if (ret <= 0) { return false; } @@ -211,15 +221,16 @@ bool CommonUtil::TimeStrToTime(const std::string& time_str, return true; } -void CommonUtil::ConvertTime(time_t time_integer, tm &time_struct) { - tm* t_m = localtime (&time_integer); - memcpy(&time_struct, t_m, sizeof(tm)); +void +CommonUtil::ConvertTime(time_t time_integer, tm &time_struct) { + localtime_r(&time_integer, &time_struct); } -void CommonUtil::ConvertTime(tm time_struct, time_t &time_integer) { +void +CommonUtil::ConvertTime(tm time_struct, time_t &time_integer) { time_integer = mktime(&time_struct); } -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/CommonUtil.h b/cpp/src/utils/CommonUtil.h index 7e1349bf9e..b059067d50 100755 --- a/cpp/src/utils/CommonUtil.h +++ b/cpp/src/utils/CommonUtil.h @@ -22,15 +22,14 @@ #include #include - namespace zilliz { namespace milvus { namespace server { class CommonUtil { public: - static bool GetSystemMemInfo(unsigned long &total_mem, unsigned long &free_mem); - static bool GetSystemAvailableThreads(unsigned int &thread_count); + static bool GetSystemMemInfo(uint64_t &total_mem, uint64_t &free_mem); + static bool GetSystemAvailableThreads(uint32_t &thread_count); static bool IsFileExist(const std::string &path); static uint64_t GetFileSize(const std::string &path); @@ -41,16 +40,16 @@ class CommonUtil { static std::string GetFileName(std::string filename); static std::string GetExePath(); - static bool TimeStrToTime(const std::string& time_str, - time_t &time_integer, - tm &time_struct, - const std::string& format = "%d-%d-%d %d:%d:%d"); + static bool TimeStrToTime(const std::string &time_str, + time_t &time_integer, + tm &time_struct, + const std::string &format = "%d-%d-%d %d:%d:%d"); static void ConvertTime(time_t time_integer, tm &time_struct); static void ConvertTime(tm time_struct, time_t &time_integer); }; -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/Error.h b/cpp/src/utils/Error.h index e903dcb2fb..d0bcc5cf8d 100644 --- a/cpp/src/utils/Error.h +++ b/cpp/src/utils/Error.h @@ -28,6 +28,7 @@ using ErrorCode = int32_t; constexpr ErrorCode SERVER_SUCCESS = 0; constexpr ErrorCode SERVER_ERROR_CODE_BASE = 0x30000; + constexpr ErrorCode ToServerErrorCode(const ErrorCode error_code) { return SERVER_ERROR_CODE_BASE + error_code; @@ -35,6 +36,7 @@ ToServerErrorCode(const ErrorCode error_code) { constexpr ErrorCode DB_SUCCESS = 0; constexpr ErrorCode DB_ERROR_CODE_BASE = 0x40000; + constexpr ErrorCode ToDbErrorCode(const ErrorCode error_code) { return DB_ERROR_CODE_BASE + error_code; @@ -42,6 +44,7 @@ ToDbErrorCode(const ErrorCode error_code) { constexpr ErrorCode KNOWHERE_SUCCESS = 0; constexpr ErrorCode KNOWHERE_ERROR_CODE_BASE = 0x50000; + constexpr ErrorCode ToKnowhereErrorCode(const ErrorCode error_code) { return KNOWHERE_ERROR_CODE_BASE + error_code; @@ -96,26 +99,27 @@ constexpr ErrorCode KNOWHERE_UNEXPECTED_ERROR = ToKnowhereErrorCode(3); constexpr ErrorCode KNOWHERE_NO_SPACE = ToKnowhereErrorCode(4); namespace server { - class ServerException : public std::exception { - public: - ServerException(ErrorCode error_code, - const std::string &message = std::string()) - : error_code_(error_code), message_(message) {} +class ServerException : public std::exception { + public: + ServerException(ErrorCode error_code, + const std::string &message = std::string()) + : error_code_(error_code), message_(message) { + } - public: - ErrorCode error_code() const { - return error_code_; - } + public: + ErrorCode error_code() const { + return error_code_; + } - virtual const char *what() const noexcept { - return message_.c_str(); - } + virtual const char *what() const noexcept { + return message_.c_str(); + } - private: - ErrorCode error_code_; - std::string message_; - }; -} + private: + ErrorCode error_code_; + std::string message_; +}; +} // namespace server } // namespace milvus } // namespace zilliz diff --git a/cpp/src/utils/Exception.h b/cpp/src/utils/Exception.h index 3a9ab89294..7e30c372bc 100644 --- a/cpp/src/utils/Exception.h +++ b/cpp/src/utils/Exception.h @@ -26,17 +26,17 @@ namespace zilliz { namespace milvus { class Exception : public std::exception { -public: - Exception(ErrorCode code, const std::string& message) + public: + Exception(ErrorCode code, const std::string &message) : code_(code), message_(message) { - } + } ErrorCode code() const throw() { return code_; } - virtual const char* what() const throw() { + virtual const char *what() const throw() { if (message_.empty()) { return "Default Exception."; } else { @@ -44,24 +44,23 @@ public: } } - virtual ~Exception() throw() {}; + virtual ~Exception() throw() { + } -protected: + protected: ErrorCode code_; std::string message_; }; class InvalidArgumentException : public Exception { -public: + public: InvalidArgumentException() : Exception(SERVER_INVALID_ARGUMENT, "Invalid Argument") { + } - }; - InvalidArgumentException(const std::string& message) + explicit InvalidArgumentException(const std::string &message) : Exception(SERVER_INVALID_ARGUMENT, message) { - - }; - + } }; } // namespace milvus diff --git a/cpp/src/utils/Log.h b/cpp/src/utils/Log.h index 2610a3a8bf..b1402d9e3e 100644 --- a/cpp/src/utils/Log.h +++ b/cpp/src/utils/Log.h @@ -17,7 +17,7 @@ #pragma once -#include "easylogging++.h" +#include "utils/easylogging++.h" namespace zilliz { namespace milvus { diff --git a/cpp/src/utils/LogUtil.cpp b/cpp/src/utils/LogUtil.cpp index 3a3fe0c7e2..0e710a6e3a 100644 --- a/cpp/src/utils/LogUtil.cpp +++ b/cpp/src/utils/LogUtil.cpp @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. +#include "utils/LogUtil.h" + #include #include #include -#include "LogUtil.h" - namespace zilliz { namespace milvus { namespace server { @@ -35,7 +35,8 @@ static int fatal_idx = 0; } // TODO(yzb) : change the easylogging library to get the log level from parameter rather than filename -void RolloutHandler(const char *filename, std::size_t size, el::Level level) { +void +RolloutHandler(const char *filename, std::size_t size, el::Level level) { char *dirc = strdup(filename); char *basec = strdup(filename); char *dir = dirname(dirc); @@ -80,7 +81,8 @@ void RolloutHandler(const char *filename, std::size_t size, el::Level level) { } } -Status InitLog(const std::string &log_config_file) { +Status +InitLog(const std::string &log_config_file) { el::Configurations conf(log_config_file); el::Loggers::reconfigureAllLoggers(conf); @@ -91,7 +93,6 @@ Status InitLog(const std::string &log_config_file) { return Status::OK(); } - -} // server -} // milvus -} // zilliz +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/LogUtil.h b/cpp/src/utils/LogUtil.h index d86b301d9f..e3f7bed51f 100644 --- a/cpp/src/utils/LogUtil.h +++ b/cpp/src/utils/LogUtil.h @@ -18,18 +18,20 @@ #pragma once #include "utils/Status.h" +#include "utils/easylogging++.h" #include #include -#include "easylogging++.h" namespace zilliz { namespace milvus { namespace server { -Status InitLog(const std::string& log_config_file); +Status +InitLog(const std::string &log_config_file); -void RolloutHandler(const char *filename, std::size_t size, el::Level level); +void +RolloutHandler(const char *filename, std::size_t size, el::Level level); #define SHOW_LOCATION #ifdef SHOW_LOCATION @@ -38,6 +40,6 @@ void RolloutHandler(const char *filename, std::size_t size, el::Level level); #define LOCATION_INFO "" #endif -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/SignalUtil.cpp b/cpp/src/utils/SignalUtil.cpp index 92678cfe6b..5f74852995 100644 --- a/cpp/src/utils/SignalUtil.cpp +++ b/cpp/src/utils/SignalUtil.cpp @@ -15,20 +15,20 @@ // specific language governing permissions and limitations // under the License. -#include "SignalUtil.h" +#include "utils/SignalUtil.h" #include "src/server/Server.h" #include "utils/Log.h" +#include #include #include - namespace zilliz { namespace milvus { namespace server { -void SignalUtil::HandleSignal(int signum) { - +void +SignalUtil::HandleSignal(int signum) { switch (signum) { case SIGINT: case SIGUSR2: { @@ -51,7 +51,8 @@ void SignalUtil::HandleSignal(int signum) { } } -void SignalUtil::PrintStacktrace() { +void +SignalUtil::PrintStacktrace() { SERVER_LOG_INFO << "Call stack:"; const int size = 32; @@ -65,6 +66,6 @@ void SignalUtil::PrintStacktrace() { free(stacktrace); } -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/SignalUtil.h b/cpp/src/utils/SignalUtil.h index 39645cf989..c9cd41839b 100644 --- a/cpp/src/utils/SignalUtil.h +++ b/cpp/src/utils/SignalUtil.h @@ -27,6 +27,6 @@ class SignalUtil { static void PrintStacktrace(); }; -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/Status.cpp b/cpp/src/utils/Status.cpp index e21a279a4c..5b512b3369 100644 --- a/cpp/src/utils/Status.cpp +++ b/cpp/src/utils/Status.cpp @@ -14,7 +14,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#include "Status.h" + +#include "utils/Status.h" #include @@ -23,12 +24,12 @@ namespace milvus { constexpr int CODE_WIDTH = sizeof(StatusCode); -Status::Status(StatusCode code, const std::string& msg) { +Status::Status(StatusCode code, const std::string &msg) { //4 bytes store code //4 bytes store message length //the left bytes store message string - const uint32_t length = (uint32_t)msg.size(); - char* result = new char[length + sizeof(length) + CODE_WIDTH]; + const uint32_t length = (uint32_t) msg.size(); + char *result = new char[length + sizeof(length) + CODE_WIDTH]; std::memcpy(result, &code, CODE_WIDTH); std::memcpy(result + CODE_WIDTH, &length, sizeof(length)); memcpy(result + sizeof(length) + CODE_WIDTH, msg.data(), length); @@ -38,7 +39,6 @@ Status::Status(StatusCode code, const std::string& msg) { Status::Status() : state_(nullptr) { - } Status::~Status() { @@ -50,7 +50,7 @@ Status::Status(const Status &s) CopyFrom(s); } -Status& +Status & Status::operator=(const Status &s) { CopyFrom(s); return *this; @@ -61,7 +61,7 @@ Status::Status(Status &&s) MoveFrom(s); } -Status& +Status & Status::operator=(Status &&s) { MoveFrom(s); return *this; @@ -71,7 +71,7 @@ void Status::CopyFrom(const Status &s) { delete state_; state_ = nullptr; - if(s.state_ == nullptr) { + if (s.state_ == nullptr) { return; } @@ -79,7 +79,7 @@ Status::CopyFrom(const Status &s) { memcpy(&length, s.state_ + CODE_WIDTH, sizeof(length)); int buff_len = length + sizeof(length) + CODE_WIDTH; state_ = new char[buff_len]; - memcpy((void*)state_, (void*)s.state_, buff_len); + memcpy((void *) state_, (void *) s.state_, buff_len); } void @@ -98,7 +98,7 @@ Status::message() const { std::string msg; uint32_t length = 0; memcpy(&length, state_ + CODE_WIDTH, sizeof(length)); - if(length > 0) { + if (length > 0) { msg.append(state_ + sizeof(length) + CODE_WIDTH, length); } @@ -113,26 +113,19 @@ Status::ToString() const { std::string result; switch (code()) { - case DB_SUCCESS: - result = "OK "; + case DB_SUCCESS:result = "OK "; break; - case DB_ERROR: - result = "Error: "; + case DB_ERROR:result = "Error: "; break; - case DB_META_TRANSACTION_FAILED: - result = "Database error: "; + case DB_META_TRANSACTION_FAILED:result = "Database error: "; break; - case DB_NOT_FOUND: - result = "Not found: "; + case DB_NOT_FOUND:result = "Not found: "; break; - case DB_ALREADY_EXIST: - result = "Already exist: "; + case DB_ALREADY_EXIST:result = "Already exist: "; break; - case DB_INVALID_PATH: - result = "Invalid path: "; + case DB_INVALID_PATH:result = "Invalid path: "; break; - default: - result = "Error code(" + std::to_string(code()) + "): "; + default:result = "Error code(" + std::to_string(code()) + "): "; break; } diff --git a/cpp/src/utils/Status.h b/cpp/src/utils/Status.h index fe06c8029d..8f8f238979 100644 --- a/cpp/src/utils/Status.h +++ b/cpp/src/utils/Status.h @@ -44,14 +44,18 @@ class Status { operator=(Status &&s); static Status - OK() { return Status(); } + OK() { + return Status(); + } bool - ok() const { return state_ == nullptr || code() == 0; } + ok() const { + return state_ == nullptr || code() == 0; + } StatusCode code() const { - return (state_ == nullptr) ? 0 : *(StatusCode*)(state_); + return (state_ == nullptr) ? 0 : *(StatusCode *) (state_); } std::string @@ -60,14 +64,14 @@ class Status { std::string ToString() const; -private: + private: inline void CopyFrom(const Status &s); inline void MoveFrom(Status &s); -private: + private: const char *state_ = nullptr; }; // Status diff --git a/cpp/src/utils/StringHelpFunctions.cpp b/cpp/src/utils/StringHelpFunctions.cpp index 068c376ea0..8c9e888d3a 100644 --- a/cpp/src/utils/StringHelpFunctions.cpp +++ b/cpp/src/utils/StringHelpFunctions.cpp @@ -15,13 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include "StringHelpFunctions.h" +#include "utils/StringHelpFunctions.h" + +#include namespace zilliz { namespace milvus { namespace server { -void StringHelpFunctions::TrimStringBlank(std::string &string) { +void +StringHelpFunctions::TrimStringBlank(std::string &string) { if (!string.empty()) { static std::string s_format(" \n\r\t"); string.erase(0, string.find_first_not_of(s_format)); @@ -29,17 +32,19 @@ void StringHelpFunctions::TrimStringBlank(std::string &string) { } } -void StringHelpFunctions::TrimStringQuote(std::string &string, const std::string &qoute) { +void +StringHelpFunctions::TrimStringQuote(std::string &string, const std::string &qoute) { if (!string.empty()) { string.erase(0, string.find_first_not_of(qoute)); string.erase(string.find_last_not_of(qoute) + 1); } } -Status StringHelpFunctions::SplitStringByDelimeter(const std::string &str, - const std::string &delimeter, - std::vector &result) { - if(str.empty()) { +Status +StringHelpFunctions::SplitStringByDelimeter(const std::string &str, + const std::string &delimeter, + std::vector &result) { + if (str.empty()) { return Status::OK(); } @@ -58,10 +63,11 @@ Status StringHelpFunctions::SplitStringByDelimeter(const std::string &str, return Status::OK(); } -Status StringHelpFunctions::SplitStringByQuote(const std::string &str, - const std::string &delimeter, - const std::string "e, - std::vector &result) { +Status +StringHelpFunctions::SplitStringByQuote(const std::string &str, + const std::string &delimeter, + const std::string "e, + std::vector &result) { if (quote.empty()) { return SplitStringByDelimeter(str, delimeter, result); } @@ -120,6 +126,6 @@ Status StringHelpFunctions::SplitStringByQuote(const std::string &str, return Status::OK(); } -} -} -} \ No newline at end of file +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/StringHelpFunctions.h b/cpp/src/utils/StringHelpFunctions.h index d50b62d8a3..ebffa075ad 100644 --- a/cpp/src/utils/StringHelpFunctions.h +++ b/cpp/src/utils/StringHelpFunctions.h @@ -20,16 +20,17 @@ #include "utils/Status.h" #include +#include namespace zilliz { namespace milvus { namespace server { class StringHelpFunctions { -private: + private: StringHelpFunctions() = default; -public: + public: static void TrimStringBlank(std::string &string); static void TrimStringQuote(std::string &string, const std::string &qoute); @@ -56,9 +57,8 @@ public: const std::string &delimeter, const std::string "e, std::vector &result); - }; -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/ThreadPool.h b/cpp/src/utils/ThreadPool.h index 8060ee4cf6..0524f6206d 100644 --- a/cpp/src/utils/ThreadPool.h +++ b/cpp/src/utils/ThreadPool.h @@ -26,7 +26,7 @@ #include #include #include - +#include #define MAX_THREADS_NUM 32 @@ -34,8 +34,8 @@ namespace zilliz { namespace milvus { class ThreadPool { -public: - ThreadPool(size_t threads, size_t queue_size = 1000); + public: + explicit ThreadPool(size_t threads, size_t queue_size = 1000); template auto enqueue(F &&f, Args &&... args) @@ -43,7 +43,7 @@ public: ~ThreadPool(); -private: + private: // need to keep track of threads so we can join them std::vector workers_; @@ -60,53 +60,57 @@ private: bool stop; }; - // the constructor just launches some amount of workers inline ThreadPool::ThreadPool(size_t threads, size_t queue_size) - : max_queue_size_(queue_size), stop(false) { + : max_queue_size_(queue_size), stop(false) { for (size_t i = 0; i < threads; ++i) workers_.emplace_back( - [this] { - for (;;) { - std::function task; + [this] { + for (;;) { + std::function task; - { - std::unique_lock lock(this->queue_mutex_); - this->condition_.wait(lock, - [this] { return this->stop || !this->tasks_.empty(); }); - if (this->stop && this->tasks_.empty()) - return; - task = std::move(this->tasks_.front()); - this->tasks_.pop(); - } - this->condition_.notify_all(); - - task(); + { + std::unique_lock lock(this->queue_mutex_); + this->condition_.wait(lock, + [this] { + return this->stop || !this->tasks_.empty(); + }); + if (this->stop && this->tasks_.empty()) + return; + task = std::move(this->tasks_.front()); + this->tasks_.pop(); } + this->condition_.notify_all(); + + task(); } - ); + }); } // add new work item to the pool template -auto ThreadPool::enqueue(F &&f, Args &&... args) +auto +ThreadPool::enqueue(F &&f, Args &&... args) -> std::future::type> { using return_type = typename std::result_of::type; auto task = std::make_shared >( - std::bind(std::forward(f), std::forward(args)...) - ); + std::bind(std::forward(f), std::forward(args)...)); std::future res = task->get_future(); { std::unique_lock lock(queue_mutex_); this->condition_.wait(lock, - [this] { return this->tasks_.size() < max_queue_size_; }); + [this] { + return this->tasks_.size() < max_queue_size_; + }); // don't allow enqueueing after stopping the pool if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); - tasks_.emplace([task]() { (*task)(); }); + tasks_.emplace([task]() { + (*task)(); + }); } condition_.notify_all(); return res; @@ -119,10 +123,11 @@ inline ThreadPool::~ThreadPool() { stop = true; } condition_.notify_all(); - for (std::thread &worker: workers_) + for (std::thread &worker : workers_) { worker.join(); + } } -} -} +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/TimeRecorder.cpp b/cpp/src/utils/TimeRecorder.cpp index 6e4b5c1ab6..5246b35f13 100644 --- a/cpp/src/utils/TimeRecorder.cpp +++ b/cpp/src/utils/TimeRecorder.cpp @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -#include "TimeRecorder.h" +#include "utils/TimeRecorder.h" #include "utils/Log.h" - namespace zilliz { namespace milvus { @@ -100,5 +99,5 @@ TimeRecorder::ElapseFromBegin(const std::string &msg) { return span; } -} -} +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/TimeRecorder.h b/cpp/src/utils/TimeRecorder.h index c6efbaeb8a..2bb937e71f 100644 --- a/cpp/src/utils/TimeRecorder.h +++ b/cpp/src/utils/TimeRecorder.h @@ -20,14 +20,13 @@ #include #include - namespace zilliz { namespace milvus { class TimeRecorder { using stdclock = std::chrono::high_resolution_clock; -public: + public: TimeRecorder(const std::string &header, int64_t log_level = 1); @@ -39,15 +38,15 @@ public: static std::string GetTimeSpanStr(double span); -private: + private: void PrintTimeRecord(const std::string &msg, double span); -private: + private: std::string header_; stdclock::time_point start_; stdclock::time_point last_; int64_t log_level_; }; -} -} +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/ValidationUtil.cpp b/cpp/src/utils/ValidationUtil.cpp index c9ec2d046b..a6d83af1dc 100644 --- a/cpp/src/utils/ValidationUtil.cpp +++ b/cpp/src/utils/ValidationUtil.cpp @@ -16,16 +16,16 @@ // under the License. +#include "utils/ValidationUtil.h" #include "db/engine/ExecutionEngine.h" -#include "ValidationUtil.h" #include "Log.h" +#include #include #include #include #include - namespace zilliz { namespace milvus { namespace server { @@ -36,7 +36,6 @@ constexpr int32_t INDEX_FILE_SIZE_LIMIT = 4096; //index trigger size max = 4096 Status ValidationUtil::ValidateTableName(const std::string &table_name) { - // Table name shouldn't be empty. if (table_name.empty()) { std::string msg = "Empty table name"; @@ -78,8 +77,7 @@ ValidationUtil::ValidateTableDimension(int64_t dimension) { std::string msg = "Table dimension excceed the limitation: " + std::to_string(TABLE_DIMENSION_LIMIT); SERVER_LOG_ERROR << msg; return Status(SERVER_INVALID_VECTOR_DIMENSION, msg); - } - else { + } else { return Status::OK(); } } @@ -185,7 +183,6 @@ ValidationUtil::GetGpuMemory(uint32_t gpu_index, size_t &memory) { Status ValidationUtil::ValidateIpAddress(const std::string &ip_address) { - struct in_addr address; int result = inet_pton(AF_INET, ip_address.c_str(), &address); @@ -212,7 +209,7 @@ ValidationUtil::ValidateStringIsNumber(const std::string &str) { } try { int32_t value = std::stoi(str); - } catch(...) { + } catch (...) { return Status(SERVER_INVALID_ARGUMENT, "Invalid number"); } return Status::OK(); @@ -226,8 +223,7 @@ ValidationUtil::ValidateStringIsBool(const std::string &str) { s == "false" || s == "off" || s == "no" || s == "0" || s.empty()) { return Status::OK(); - } - else { + } else { return Status(SERVER_INVALID_ARGUMENT, "Invalid boolean: " + str); } } @@ -236,7 +232,7 @@ Status ValidationUtil::ValidateStringIsFloat(const std::string &str) { try { float val = std::stof(str); - } catch(...) { + } catch (...) { return Status(SERVER_INVALID_ARGUMENT, "Invalid float: " + str); } return Status::OK(); @@ -289,8 +285,7 @@ ValidationUtil::ValidateDbURI(const std::string &uri) { okay = false; } } - } - else { + } else { SERVER_LOG_ERROR << "Wrong URI format: URI = " << uri; okay = false; } @@ -298,6 +293,6 @@ ValidationUtil::ValidateDbURI(const std::string &uri) { return (okay ? Status::OK() : Status(SERVER_INVALID_ARGUMENT, "Invalid db backend uri")); } -} -} -} \ No newline at end of file +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/utils/ValidationUtil.h b/cpp/src/utils/ValidationUtil.h index 789da0de40..44d6065a64 100644 --- a/cpp/src/utils/ValidationUtil.h +++ b/cpp/src/utils/ValidationUtil.h @@ -21,15 +21,17 @@ #include "db/meta/MetaTypes.h" #include "utils/Status.h" +#include + namespace zilliz { namespace milvus { namespace server { class ValidationUtil { -private: + private: ValidationUtil() = default; -public: + public: static Status ValidateTableName(const std::string &table_name); @@ -49,10 +51,10 @@ public: ValidateTableIndexMetricType(int32_t metric_type); static Status - ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchema& table_schema); + ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchema &table_schema); static Status - ValidateSearchNprobe(int64_t nprobe, const engine::meta::TableSchema& table_schema); + ValidateSearchNprobe(int64_t nprobe, const engine::meta::TableSchema &table_schema); static Status ValidateGpuIndex(uint32_t gpu_index); @@ -76,6 +78,6 @@ public: ValidateDbURI(const std::string &uri); }; -} -} -} \ No newline at end of file +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/unittest/metrics/prometheus_test.cpp b/cpp/unittest/metrics/prometheus_test.cpp index 6dbc05e948..a634a6ff9c 100644 --- a/cpp/unittest/metrics/prometheus_test.cpp +++ b/cpp/unittest/metrics/prometheus_test.cpp @@ -25,7 +25,7 @@ using namespace zilliz::milvus; TEST(PrometheusTest, PROMETHEUS_TEST){ - server::Config::GetInstance().SetMetricConfigAutoBootup("on"); + server::Config::GetInstance().SetMetricConfigEnableMonitor("on"); server::PrometheusMetrics instance = server::PrometheusMetrics::GetInstance(); instance.Init(); @@ -76,7 +76,7 @@ TEST(PrometheusTest, PROMETHEUS_TEST){ instance.GPUTemperature(); instance.CPUTemperature(); - server::Config::GetInstance().SetMetricConfigAutoBootup("off"); + server::Config::GetInstance().SetMetricConfigEnableMonitor("off"); instance.Init(); instance.CPUCoreUsagePercentSet(); instance.GPUTemperature(); diff --git a/cpp/unittest/server/rpc_test.cpp b/cpp/unittest/server/rpc_test.cpp index 7895d081a1..0eaadf6146 100644 --- a/cpp/unittest/server/rpc_test.cpp +++ b/cpp/unittest/server/rpc_test.cpp @@ -67,8 +67,8 @@ class RpcHandlerTest : public testing::Test { engine::DBOptions opt; server::Config::GetInstance().SetDBConfigBackendUrl("sqlite://:@:/"); - server::Config::GetInstance().SetDBConfigPath("/tmp/milvus_test"); - server::Config::GetInstance().SetDBConfigSlavePath(""); + server::Config::GetInstance().SetDBConfigPrimaryPath("/tmp/milvus_test"); + server::Config::GetInstance().SetDBConfigSecondaryPath(""); server::Config::GetInstance().SetDBConfigArchiveDiskThreshold(""); server::Config::GetInstance().SetDBConfigArchiveDaysThreshold(""); server::Config::GetInstance().SetCacheConfigCacheInsertData("");