diff --git a/ci/jenkinsfile/milvus_build.groovy b/ci/jenkinsfile/milvus_build.groovy index 44895ed5c9..6130766400 100644 --- a/ci/jenkinsfile/milvus_build.groovy +++ b/ci/jenkinsfile/milvus_build.groovy @@ -9,6 +9,7 @@ container('milvus-build-env') { sh "git config --global user.email \"test@zilliz.com\"" sh "git config --global user.name \"test\"" withCredentials([usernamePassword(credentialsId: "${params.JFROG_USER}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) { + sh "./build.sh -l" sh "export JFROG_ARTFACTORY_URL='${params.JFROG_ARTFACTORY_URL}' && export JFROG_USER_NAME='${USERNAME}' && export JFROG_PASSWORD='${PASSWORD}' && ./build.sh -t ${params.BUILD_TYPE} -j -u -c" } } diff --git a/ci/jenkinsfile/milvus_build_no_ut.groovy b/ci/jenkinsfile/milvus_build_no_ut.groovy index a807ec7ad4..f72089e8c3 100644 --- a/ci/jenkinsfile/milvus_build_no_ut.groovy +++ b/ci/jenkinsfile/milvus_build_no_ut.groovy @@ -9,6 +9,7 @@ container('milvus-build-env') { sh "git config --global user.email \"test@zilliz.com\"" sh "git config --global user.name \"test\"" withCredentials([usernamePassword(credentialsId: "${params.JFROG_USER}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) { + sh "./build.sh -l" sh "export JFROG_ARTFACTORY_URL='${params.JFROG_ARTFACTORY_URL}' && export JFROG_USER_NAME='${USERNAME}' && export JFROG_PASSWORD='${PASSWORD}' && ./build.sh -t ${params.BUILD_TYPE} -j" } } diff --git a/ci/main_jenkinsfile b/ci/main_jenkinsfile index d65ad512c2..12c6c81cfd 100644 --- a/ci/main_jenkinsfile +++ b/ci/main_jenkinsfile @@ -44,7 +44,7 @@ metadata: spec: containers: - name: milvus-build-env - image: registry.zilliz.com/milvus/milvus-build-env:v0.12 + image: registry.zilliz.com/milvus/milvus-build-env:v0.13 command: - cat tty: true diff --git a/ci/main_jenkinsfile_no_ut b/ci/main_jenkinsfile_no_ut index 72f6e04948..e7382bd1fd 100644 --- a/ci/main_jenkinsfile_no_ut +++ b/ci/main_jenkinsfile_no_ut @@ -44,7 +44,7 @@ metadata: spec: containers: - name: milvus-build-env - image: registry.zilliz.com/milvus/milvus-build-env:v0.12 + image: registry.zilliz.com/milvus/milvus-build-env:v0.13 command: - cat tty: true diff --git a/ci/nightly_main_jenkinsfile b/ci/nightly_main_jenkinsfile index 51c733f0ea..add9e00fb4 100644 --- a/ci/nightly_main_jenkinsfile +++ b/ci/nightly_main_jenkinsfile @@ -44,7 +44,7 @@ metadata: spec: containers: - name: milvus-build-env - image: registry.zilliz.com/milvus/milvus-build-env:v0.12 + image: registry.zilliz.com/milvus/milvus-build-env:v0.13 command: - cat tty: true diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index d4177150dd..c1bf840160 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -10,6 +10,9 @@ Please mark all change in change log and use the ticket from JIRA. - MS-577 - Unittest Query randomly hung - MS-587 - Count get wrong result after adding vectors and index built immediately - MS-599 - search wrong result when table created with metric_type: IP +- MS-601 - Docker logs error caused by get CPUTemperature error +- MS-622 - Delete vectors should be failed if date range is invalid +- MS-620 - Get table row counts display wrong error code ## Improvement - MS-552 - Add and change the easylogging library @@ -28,6 +31,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-609 - Update task construct function - MS-611 - Add resources validity check in ResourceMgr - MS-619 - Add optimizer class in scheduler +- MS-614 - Preload table at startup - MS-626 - Refactor DataObj to support cache any type data ## New Feature diff --git a/cpp/README.md b/cpp/README.md index f5f77d25f8..6e81b567ff 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -105,15 +105,31 @@ please reinstall CMake with curl: ``` ##### code format and linting - +Install clang-format and clang-tidy ```shell CentOS 7: $ yum install clang -Ubuntu 16.04 or 18.04: -$ sudo apt-get install clang-format clang-tidy - +Ubuntu 16.04: +$ sudo apt-get install clang-tidy +$ sudo su +$ wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | sudo apt-key add - +$ apt-add-repository "deb http://apt.llvm.org/xenial/ llvm-toolchain-xenial-6.0 main" +$ apt-get update +$ apt-get install clang-format-6.0 +Ubuntu 18.04: +$ sudo apt-get install clang-tidy clang-format + +$ rm cmake_build/CMakeCache.txt +``` +Check code style +```shell $ ./build.sh -l ``` +To format the code +```shell +$ cd cmake_build +$ make clang-format +``` ##### Run unit test @@ -122,13 +138,14 @@ $ ./build.sh -u ``` ##### Run code coverage - +Install lcov ```shell CentOS 7: $ yum install lcov Ubuntu 16.04 or 18.04: $ sudo apt-get install lcov - +``` +```shell $ ./build.sh -u -c ``` diff --git a/cpp/build-support/lint_exclusions.txt b/cpp/build-support/lint_exclusions.txt index 6888dfa32c..6ac690f661 100644 --- a/cpp/build-support/lint_exclusions.txt +++ b/cpp/build-support/lint_exclusions.txt @@ -5,4 +5,5 @@ *thirdparty* *easylogging++* *SqliteMetaImpl.cpp -*src/grpc* \ No newline at end of file +*src/grpc* +*milvus/include* \ No newline at end of file diff --git a/cpp/build.sh b/cpp/build.sh index deda35ab27..648206f9ae 100755 --- a/cpp/build.sh +++ b/cpp/build.sh @@ -112,13 +112,13 @@ if [[ ${RUN_CPPLINT} == "ON" ]]; then fi echo "clang-format check passed!" - # clang-tidy check - make check-clang-tidy - if [ $? -ne 0 ]; then - echo "ERROR! clang-tidy check failed" - exit 1 - fi - echo "clang-tidy check passed!" +# # clang-tidy check +# make check-clang-tidy +# if [ $? -ne 0 ]; then +# echo "ERROR! clang-tidy check failed" +# exit 1 +# fi +# echo "clang-tidy check passed!" else # compile and build make -j 4 || exit 1 diff --git a/cpp/conf/server_config.template b/cpp/conf/server_config.template index 3d63cdfafa..2f2f699e09 100644 --- a/cpp/conf/server_config.template +++ b/cpp/conf/server_config.template @@ -18,6 +18,9 @@ db_config: # sum of insert_buffer_size and cpu_cache_capacity cannot exceed total memory build_index_gpu: 0 # gpu id used for building index + preload_table: # preload data at startup, '*' means load all tables, empty value means no preload + # you can specify preload tables like this: table1,table2,table3 + metric_config: enable_monitor: false # enable monitoring or not collector: prometheus # prometheus diff --git a/cpp/coverage.sh b/cpp/coverage.sh index c0defe6f5a..8a7e5f52a1 100755 --- a/cpp/coverage.sh +++ b/cpp/coverage.sh @@ -104,7 +104,7 @@ ${LCOV_CMD} -r "${FILE_INFO_OUTPUT}" -o "${FILE_INFO_OUTPUT_NEW}" \ "src/metrics/MetricBase.h"\ "src/server/Server.cpp"\ "src/server/DBWrapper.cpp"\ - "src/server/grpc_impl/GrpcMilvusServer.cpp"\ + "src/server/grpc_impl/GrpcServer.cpp"\ "src/utils/easylogging++.h"\ "src/utils/easylogging++.cc"\ diff --git a/cpp/src/core/knowhere/knowhere/adapter/SptagAdapter.cpp b/cpp/src/core/knowhere/knowhere/adapter/SptagAdapter.cpp index f4b1a7953f..b4c3910a01 100644 --- a/cpp/src/core/knowhere/knowhere/adapter/SptagAdapter.cpp +++ b/cpp/src/core/knowhere/knowhere/adapter/SptagAdapter.cpp @@ -76,7 +76,7 @@ ConvertToDataset(std::vector query_results) { auto p_id = (int64_t*)malloc(sizeof(int64_t) * elems); auto p_dist = (float*)malloc(sizeof(float) * elems); -// TODO: throw if malloc failed. + // TODO: throw if malloc failed. #pragma omp parallel for for (auto i = 0; i < query_results.size(); ++i) { diff --git a/cpp/src/core/knowhere/knowhere/common/Buffer.h b/cpp/src/core/knowhere/knowhere/common/Buffer.h index d95da18602..f9e15d95bd 100644 --- a/cpp/src/core/knowhere/knowhere/common/Buffer.h +++ b/cpp/src/core/knowhere/knowhere/common/Buffer.h @@ -36,7 +36,7 @@ struct BufferDeleter { free((void*)buffer->data()); } }; -} +} // namespace internal inline BufferPtr MakeBufferSmart(uint8_t* data, const int64_t size) { diff --git a/cpp/src/core/knowhere/knowhere/index/vector_index/FaissBaseIndex.cpp b/cpp/src/core/knowhere/knowhere/index/vector_index/FaissBaseIndex.cpp index 4fc9a82009..2612a63630 100644 --- a/cpp/src/core/knowhere/knowhere/index/vector_index/FaissBaseIndex.cpp +++ b/cpp/src/core/knowhere/knowhere/index/vector_index/FaissBaseIndex.cpp @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -#include #include #include #include "knowhere/common/Exception.h" #include "knowhere/index/vector_index/FaissBaseIndex.h" +#include "knowhere/index/vector_index/IndexIVF.h" #include "knowhere/index/vector_index/helpers/FaissIO.h" namespace knowhere { diff --git a/cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVF.cpp b/cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVF.cpp index dc886b7383..d538c0dea1 100644 --- a/cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVF.cpp +++ b/cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVF.cpp @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -#include #include #include #include @@ -26,6 +25,7 @@ #include "knowhere/adapter/VectorAdapter.h" #include "knowhere/common/Exception.h" #include "knowhere/index/vector_index/IndexGPUIVF.h" +#include "knowhere/index/vector_index/IndexIVFPQ.h" #include "knowhere/index/vector_index/helpers/Cloner.h" #include "knowhere/index/vector_index/helpers/FaissIO.h" diff --git a/cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVFPQ.cpp b/cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVFPQ.cpp index 077f0817da..213141b3ac 100644 --- a/cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVFPQ.cpp +++ b/cpp/src/core/knowhere/knowhere/index/vector_index/IndexGPUIVFPQ.cpp @@ -23,6 +23,7 @@ #include "knowhere/adapter/VectorAdapter.h" #include "knowhere/common/Exception.h" #include "knowhere/index/vector_index/IndexGPUIVFPQ.h" +#include "knowhere/index/vector_index/IndexIVFPQ.h" namespace knowhere { diff --git a/cpp/src/core/unittest/test_ivf.cpp b/cpp/src/core/unittest/test_ivf.cpp index a0da0eca8e..17eb888ddc 100644 --- a/cpp/src/core/unittest/test_ivf.cpp +++ b/cpp/src/core/unittest/test_ivf.cpp @@ -43,9 +43,9 @@ namespace kn = knowhere; } // namespace +using ::testing::Combine; using ::testing::TestWithParam; using ::testing::Values; -using ::testing::Combine; constexpr int device_id = 0; constexpr int64_t DIM = 128; diff --git a/cpp/src/core/unittest/test_kdt.cpp b/cpp/src/core/unittest/test_kdt.cpp index 6fa6ba33f3..f9e02bd9a4 100644 --- a/cpp/src/core/unittest/test_kdt.cpp +++ b/cpp/src/core/unittest/test_kdt.cpp @@ -34,9 +34,9 @@ namespace kn = knowhere; } // namespace +using ::testing::Combine; using ::testing::TestWithParam; using ::testing::Values; -using ::testing::Combine; class KDTTest : public DataGen, public ::testing::Test { protected: diff --git a/cpp/src/core/unittest/test_nsg/test_nsg.cpp b/cpp/src/core/unittest/test_nsg/test_nsg.cpp index 70261a63dc..b59f0d4928 100644 --- a/cpp/src/core/unittest/test_nsg/test_nsg.cpp +++ b/cpp/src/core/unittest/test_nsg/test_nsg.cpp @@ -32,9 +32,9 @@ namespace kn = knowhere; } // namespace +using ::testing::Combine; using ::testing::TestWithParam; using ::testing::Values; -using ::testing::Combine; constexpr int64_t DEVICE_ID = 1; diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index ece62284f7..f81fb32e7f 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -26,6 +26,7 @@ #include "meta/SqliteMetaImpl.h" #include "metrics/Metrics.h" #include "scheduler/SchedInst.h" +#include "scheduler/job/BuildIndexJob.h" #include "scheduler/job/DeleteJob.h" #include "scheduler/job/SearchJob.h" #include "utils/Log.h" @@ -206,7 +207,7 @@ DBImpl::PreloadTable(const std::string& table_id) { size += engine->PhysicalSize(); if (size > available_size) { - break; + return Status(SERVER_CACHE_FULL, "Cache is full"); } else { try { // step 1: load index @@ -296,7 +297,8 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { std::vector file_types; if (index.engine_type_ == static_cast(EngineType::FAISS_IDMAP)) { file_types = { - static_cast(meta::TableFileSchema::NEW), static_cast(meta::TableFileSchema::NEW_MERGE), + static_cast(meta::TableFileSchema::NEW), + static_cast(meta::TableFileSchema::NEW_MERGE), }; } else { file_types = { @@ -639,8 +641,9 @@ DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const m ENGINE_LOG_DEBUG << "Merging file " << file_schema.file_id_; index_size = index->Size(); - if (index_size >= file_schema.index_file_size_) + if (index_size >= file_schema.index_file_size_) { break; + } } // step 3: serialize to disk @@ -896,17 +899,32 @@ DBImpl::BackgroundBuildIndex() { meta::TableFilesSchema to_index_files; meta_ptr_->FilesToIndex(to_index_files); Status status; - for (auto& file : to_index_files) { - status = BuildIndex(file); - if (!status.ok()) { - ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); - } - if (shutting_down_.load(std::memory_order_acquire)) { - ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; - break; - } + scheduler::BuildIndexJobPtr job = std::make_shared(0, meta_ptr_, options_); + + // step 2: put build index task to scheduler + for (auto& file : to_index_files) { + scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); + job->AddToIndexFiles(file_ptr); } + scheduler::JobMgrInst::GetInstance()->Put(job); + job->WaitBuildIndexFinish(); + if (!job->GetStatus().ok()) { + Status status = job->GetStatus(); + ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); + } + + // for (auto &file : to_index_files) { + // status = BuildIndex(file); + // if (!status.ok()) { + // ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); + // } + // + // if (shutting_down_.load(std::memory_order_acquire)) { + // ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; + // break; + // } + // } ENGINE_LOG_TRACE << "Background build index thread exit"; } diff --git a/cpp/src/db/engine/ExecutionEngine.h b/cpp/src/db/engine/ExecutionEngine.h index 1572b967d1..848704bd4b 100644 --- a/cpp/src/db/engine/ExecutionEngine.h +++ b/cpp/src/db/engine/ExecutionEngine.h @@ -66,6 +66,9 @@ class ExecutionEngine { virtual Status CopyToGpu(uint64_t device_id) = 0; + virtual Status + CopyToIndexFileToGpu(uint64_t device_id) = 0; + virtual Status CopyToCpu() = 0; diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index 7c73657f4f..5f43b41dd2 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -241,6 +241,17 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { return Status::OK(); } +Status +ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) { + auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); + bool already_in_cache = (index != nullptr); + if (!already_in_cache) { + cache::DataObjPtr obj = std::make_shared(nullptr, PhysicalSize()); + milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj); + } + return Status::OK(); +} + Status ExecutionEngineImpl::CopyToCpu() { auto index = std::static_pointer_cast(cache::CpuCacheMgr::GetInstance()->GetIndex(location_)); diff --git a/cpp/src/db/engine/ExecutionEngineImpl.h b/cpp/src/db/engine/ExecutionEngineImpl.h index b60a81f214..f41072e89d 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.h +++ b/cpp/src/db/engine/ExecutionEngineImpl.h @@ -58,6 +58,9 @@ class ExecutionEngineImpl : public ExecutionEngine { Status CopyToGpu(uint64_t device_id) override; + Status + CopyToIndexFileToGpu(uint64_t device_id) override; + Status CopyToCpu() override; diff --git a/cpp/src/db/insert/MemTableFile.cpp b/cpp/src/db/insert/MemTableFile.cpp index 5827513297..2c877c78ed 100644 --- a/cpp/src/db/insert/MemTableFile.cpp +++ b/cpp/src/db/insert/MemTableFile.cpp @@ -55,9 +55,9 @@ MemTableFile::CreateTableFile() { Status MemTableFile::Add(const VectorSourcePtr& source, IDNumbers& vector_ids) { if (table_file_schema_.dimension_ <= 0) { - std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " + - std::to_string(table_file_schema_.dimension_) + ", table_id = " + - table_file_schema_.table_id_; + std::string err_msg = + "MemTableFile::Add: table_file_schema dimension = " + std::to_string(table_file_schema_.dimension_) + + ", table_id = " + table_file_schema_.table_id_; ENGINE_LOG_ERROR << err_msg; return Status(DB_ERROR, "Not able to create table file"); } diff --git a/cpp/src/db/meta/MySQLMetaImpl.cpp b/cpp/src/db/meta/MySQLMetaImpl.cpp index 872c1ced2e..f9f1569a65 100644 --- a/cpp/src/db/meta/MySQLMetaImpl.cpp +++ b/cpp/src/db/meta/MySQLMetaImpl.cpp @@ -148,15 +148,18 @@ static const MetaSchema TABLES_SCHEMA(META_TABLES, { }); // TableFiles schema -static const MetaSchema TABLEFILES_SCHEMA( - META_TABLEFILES, - { - MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"), MetaField("table_id", "VARCHAR(255)", "NOT NULL"), - MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"), MetaField("file_id", "VARCHAR(255)", "NOT NULL"), - MetaField("file_type", "INT", "DEFAULT 0 NOT NULL"), MetaField("file_size", "BIGINT", "DEFAULT 0 NOT NULL"), - MetaField("row_count", "BIGINT", "DEFAULT 0 NOT NULL"), MetaField("updated_time", "BIGINT", "NOT NULL"), - MetaField("created_on", "BIGINT", "NOT NULL"), MetaField("date", "INT", "DEFAULT -1 NOT NULL"), - }); +static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, { + MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"), + MetaField("table_id", "VARCHAR(255)", "NOT NULL"), + MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"), + MetaField("file_id", "VARCHAR(255)", "NOT NULL"), + MetaField("file_type", "INT", "DEFAULT 0 NOT NULL"), + MetaField("file_size", "BIGINT", "DEFAULT 0 NOT NULL"), + MetaField("row_count", "BIGINT", "DEFAULT 0 NOT NULL"), + MetaField("updated_time", "BIGINT", "NOT NULL"), + MetaField("created_on", "BIGINT", "NOT NULL"), + MetaField("date", "INT", "DEFAULT -1 NOT NULL"), + }); } // namespace diff --git a/cpp/src/metrics/PrometheusMetrics.cpp b/cpp/src/metrics/PrometheusMetrics.cpp index bc1860389f..182f14d46c 100644 --- a/cpp/src/metrics/PrometheusMetrics.cpp +++ b/cpp/src/metrics/PrometheusMetrics.cpp @@ -46,7 +46,7 @@ PrometheusMetrics::Init() { return s.code(); } - const std::string uri = std::string("/tmp/metrics"); + const std::string uri = std::string("/metrics"); const std::size_t num_threads = 2; // Init Exposer diff --git a/cpp/src/metrics/SystemInfo.cpp b/cpp/src/metrics/SystemInfo.cpp index 1414d94eae..154f7b0797 100644 --- a/cpp/src/metrics/SystemInfo.cpp +++ b/cpp/src/metrics/SystemInfo.cpp @@ -16,8 +16,12 @@ // under the License. #include "metrics/SystemInfo.h" +#include "utils/Log.h" +#include #include +#include +#include #include #include #include @@ -60,12 +64,12 @@ SystemInfo::Init() { nvmlReturn_t nvmlresult; nvmlresult = nvmlInit(); if (NVML_SUCCESS != nvmlresult) { - printf("System information initilization failed"); + SERVER_LOG_ERROR << "System information initilization failed"; return; } nvmlresult = nvmlDeviceGetCount(&num_device_); if (NVML_SUCCESS != nvmlresult) { - printf("Unable to get devidce number"); + SERVER_LOG_ERROR << "Unable to get devidce number"; return; } @@ -151,7 +155,7 @@ SystemInfo::getTotalCpuTime(std::vector& work_time_array) { std::vector total_time_array; FILE* file = fopen("/proc/stat", "r"); if (file == NULL) { - perror("Could not open stat file"); + SERVER_LOG_ERROR << "Could not open stat file"; return total_time_array; } @@ -162,7 +166,7 @@ SystemInfo::getTotalCpuTime(std::vector& work_time_array) { char buffer[1024]; char* ret = fgets(buffer, sizeof(buffer) - 1, file); if (ret == NULL) { - perror("Could not read stat file"); + SERVER_LOG_ERROR << "Could not read stat file"; fclose(file); return total_time_array; } @@ -237,18 +241,39 @@ SystemInfo::GPUTemperature() { std::vector SystemInfo::CPUTemperature() { std::vector result; - for (int i = 0; i <= num_physical_processors_; ++i) { - std::string path = "/sys/class/thermal/thermal_zone" + std::to_string(i) + "/temp"; - FILE* file = fopen(path.data(), "r"); - if (file == nullptr) { - perror("Could not open thermal file"); - return result; - } - float temp; - fscanf(file, "%f", &temp); - result.push_back(temp / 1000); - fclose(file); + std::string path = "/sys/class/hwmon/"; + + DIR* dir = NULL; + dir = opendir(path.c_str()); + if (!dir) { + SERVER_LOG_ERROR << "Could not open hwmon directory"; + return result; } + + struct dirent* ptr = NULL; + while ((ptr = readdir(dir)) != NULL) { + std::string filename(path); + filename.append(ptr->d_name); + + char buf[100]; + if (readlink(filename.c_str(), buf, 100) != -1) { + std::string m(buf); + if (m.find("coretemp") != std::string::npos) { + std::string object = filename; + object += "/temp1_input"; + FILE* file = fopen(object.c_str(), "r"); + if (file == nullptr) { + SERVER_LOG_ERROR << "Could not open temperature file"; + return result; + } + float temp; + fscanf(file, "%f", &temp); + result.push_back(temp / 1000); + } + } + } + closedir(dir); + return result; } std::vector diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp index 4b848f7140..ee63c2c6b7 100644 --- a/cpp/src/scheduler/TaskCreator.cpp +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -16,6 +16,8 @@ // under the License. #include "scheduler/TaskCreator.h" +#include +#include "SchedInst.h" #include "scheduler/tasklabel/BroadcastLabel.h" #include "tasklabel/DefaultLabel.h" @@ -31,6 +33,9 @@ TaskCreator::Create(const JobPtr& job) { case JobType::DELETE: { return Create(std::static_pointer_cast(job)); } + case JobType::BUILD: { + return Create(std::static_pointer_cast(job)); + } default: { // TODO(wxyu): error return std::vector(); @@ -62,5 +67,20 @@ TaskCreator::Create(const DeleteJobPtr& job) { return tasks; } +std::vector +TaskCreator::Create(const BuildIndexJobPtr& job) { + std::vector tasks; + // TODO(yukun): remove "disk" hardcode here + ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk"); + + for (auto& to_index_file : job->to_index_files()) { + auto label = std::make_shared(std::weak_ptr(res_ptr)); + auto task = std::make_shared(to_index_file.second, label); + task->job_ = job; + tasks.emplace_back(task); + } + return tasks; +} + } // namespace scheduler } // namespace milvus diff --git a/cpp/src/scheduler/TaskCreator.h b/cpp/src/scheduler/TaskCreator.h index 1c1e98a615..ef71d9a3d3 100644 --- a/cpp/src/scheduler/TaskCreator.h +++ b/cpp/src/scheduler/TaskCreator.h @@ -30,6 +30,7 @@ #include "job/DeleteJob.h" #include "job/Job.h" #include "job/SearchJob.h" +#include "task/BuildIndexTask.h" #include "task/DeleteTask.h" #include "task/SearchTask.h" #include "task/Task.h" @@ -48,6 +49,9 @@ class TaskCreator { static std::vector Create(const DeleteJobPtr& job); + + static std::vector + Create(const BuildIndexJobPtr& job); }; } // namespace scheduler diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 2f5042cb73..53dd45faca 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -20,6 +20,7 @@ #include "../Algorithm.h" #include "Action.h" #include "src/cache/GpuCacheMgr.h" +#include "src/server/Config.h" namespace milvus { namespace scheduler { @@ -143,26 +144,49 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr transport_costs.push_back(transport_cost); paths.emplace_back(path); } - - // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost - uint64_t min_cost = std::numeric_limits::max(); - uint64_t min_cost_idx = 0; - for (uint64_t i = 0; i < compute_resources.size(); ++i) { - if (compute_resources[i]->TotalTasks() == 0) { - min_cost_idx = i; - break; + if (task->job_.lock()->type() == JobType::SEARCH) { + // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost + uint64_t min_cost = std::numeric_limits::max(); + uint64_t min_cost_idx = 0; + for (uint64_t i = 0; i < compute_resources.size(); ++i) { + if (compute_resources[i]->TotalTasks() == 0) { + min_cost_idx = i; + break; + } + uint64_t cost = + compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i]; + if (min_cost > cost) { + min_cost = cost; + min_cost_idx = i; + } } - uint64_t cost = - compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + transport_costs[i]; - if (min_cost > cost) { - min_cost = cost; - min_cost_idx = i; + + // step 3: set path in task + Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); + task->path() = task_path; + } else if (task->job_.lock()->type() == JobType::BUILD) { + // step2: Read device id in config + // get build index gpu resource + server::Config& config = server::Config::GetInstance(); + int32_t build_index_gpu; + Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu); + + bool find_gpu_res = false; + for (uint64_t i = 0; i < compute_resources.size(); ++i) { + if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) { + if (compute_resources[i]->name() == + res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { + find_gpu_res = true; + Path task_path(paths[i], paths[i].size() - 1); + task->path() = task_path; + break; + } + } + } + if (not find_gpu_res) { + task->path() = Path(paths[0], paths[0].size() - 1); } } - - // step 3: set path in task - Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); - task->path() = task_path; } if (resource->name() == task->path().Last()) { diff --git a/cpp/src/scheduler/job/BuildIndexJob.cpp b/cpp/src/scheduler/job/BuildIndexJob.cpp new file mode 100644 index 0000000000..423121c5fb --- /dev/null +++ b/cpp/src/scheduler/job/BuildIndexJob.cpp @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "scheduler/job/BuildIndexJob.h" +#include "utils/Log.h" + +#include + +namespace milvus { +namespace scheduler { + +BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options) + : Job(id, JobType::BUILD), meta_ptr_(std::move(meta_ptr)), options_(std::move(options)) { +} + +bool +BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr& to_index_file) { + std::unique_lock lock(mutex_); + if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) { + return false; + } + + SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " add to_index file: " << to_index_file->id_; + + to_index_files_[to_index_file->id_] = to_index_file; +} + +Status& +BuildIndexJob::WaitBuildIndexFinish() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this] { return to_index_files_.empty(); }); + SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " all done"; +} + +void +BuildIndexJob::BuildIndexDone(size_t to_index_id) { + std::unique_lock lock(mutex_); + to_index_files_.erase(to_index_id); + cv_.notify_all(); + SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " finish index file: " << to_index_id; +} + +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/job/BuildIndexJob.h b/cpp/src/scheduler/job/BuildIndexJob.h new file mode 100644 index 0000000000..b6ca462537 --- /dev/null +++ b/cpp/src/scheduler/job/BuildIndexJob.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Job.h" +#include "db/meta/Meta.h" +#include "scheduler/Definition.h" + +namespace milvus { +namespace scheduler { + +using engine::meta::TableFileSchemaPtr; + +using Id2ToIndexMap = std::unordered_map; +using Id2ToTableFileMap = std::unordered_map; + +class BuildIndexJob : public Job { + public: + explicit BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::DBOptions options); + + public: + bool + AddToIndexFiles(const TableFileSchemaPtr& to_index_file); + + Status& + WaitBuildIndexFinish(); + + void + BuildIndexDone(size_t to_index_id); + + public: + Status& + GetStatus() { + return status_; + } + + Id2ToIndexMap& + to_index_files() { + return to_index_files_; + } + + engine::meta::MetaPtr + meta() const { + return meta_ptr_; + } + + engine::DBOptions + options() const { + return options_; + } + + private: + Id2ToIndexMap to_index_files_; + engine::meta::MetaPtr meta_ptr_; + engine::DBOptions options_; + + Status status_; + std::mutex mutex_; + std::condition_variable cv_; +}; + +using BuildIndexJobPtr = std::shared_ptr; + +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/optimizer/HybridPass.cpp b/cpp/src/scheduler/optimizer/HybridPass.cpp index 1b0ab7c4d0..f172a7beb9 100644 --- a/cpp/src/scheduler/optimizer/HybridPass.cpp +++ b/cpp/src/scheduler/optimizer/HybridPass.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "HybridPass.h" +#include "scheduler/optimizer/HybridPass.h" #include "scheduler/task/SearchTask.h" namespace milvus { @@ -24,12 +24,12 @@ namespace scheduler { bool HybridPass::Run(const TaskPtr& task) { // TODO: Index::IVFSQ8Hybrid, if nq < threshold set cpu, else set gpu - if (task->Type() != TaskType::SearchTask) return false; + if (task->Type() != TaskType::SearchTask) + return false; auto search_task = std::static_pointer_cast(task); // if (search_task->file_->engine_type_ == engine::EngineType::FAISS_IVFSQ8Hybrid) return false; } -} -} - +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/optimizer/HybridPass.h b/cpp/src/scheduler/optimizer/HybridPass.h index e043bf8ad5..0d02a8bda9 100644 --- a/cpp/src/scheduler/optimizer/HybridPass.h +++ b/cpp/src/scheduler/optimizer/HybridPass.h @@ -16,16 +16,16 @@ // under the License. #pragma once -#include -#include -#include -#include -#include -#include -#include -#include #include +#include +#include #include +#include +#include +#include +#include +#include +#include #include "Pass.h" @@ -43,6 +43,5 @@ class HybridPass : public Pass { using HybridPassPtr = std::shared_ptr; -} -} - +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/optimizer/Optimizer.cpp b/cpp/src/scheduler/optimizer/Optimizer.cpp index 517cd85f79..c5fa311a27 100644 --- a/cpp/src/scheduler/optimizer/Optimizer.cpp +++ b/cpp/src/scheduler/optimizer/Optimizer.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "Optimizer.h" +#include "scheduler/optimizer/Optimizer.h" namespace milvus { namespace scheduler { @@ -38,6 +38,5 @@ Optimizer::Run(const TaskPtr& task) { return false; } -} -} - +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/optimizer/Optimizer.h b/cpp/src/scheduler/optimizer/Optimizer.h index cbb13a8fc2..99282e66a6 100644 --- a/cpp/src/scheduler/optimizer/Optimizer.h +++ b/cpp/src/scheduler/optimizer/Optimizer.h @@ -16,16 +16,16 @@ // under the License. #pragma once -#include -#include -#include -#include -#include -#include -#include -#include #include +#include +#include #include +#include +#include +#include +#include +#include +#include #include "Pass.h" @@ -40,12 +40,11 @@ class Optimizer { Init(); bool - Run(const TaskPtr &task); + Run(const TaskPtr& task); private: std::vector pass_list_; }; -} -} - +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/optimizer/Pass.h b/cpp/src/scheduler/optimizer/Pass.h index 1c8def41a2..959c3ea5ee 100644 --- a/cpp/src/scheduler/optimizer/Pass.h +++ b/cpp/src/scheduler/optimizer/Pass.h @@ -16,16 +16,16 @@ // under the License. #pragma once -#include -#include -#include -#include -#include -#include -#include -#include #include +#include +#include #include +#include +#include +#include +#include +#include +#include #include "scheduler/task/Task.h" @@ -35,12 +35,13 @@ namespace scheduler { class Pass { public: virtual void - Init() {} + Init() { + } virtual bool Run(const TaskPtr& task) = 0; }; using PassPtr = std::shared_ptr; -} -} +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp new file mode 100644 index 0000000000..f2cebcac9e --- /dev/null +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "scheduler/task/BuildIndexTask.h" +#include "db/engine/EngineFactory.h" +#include "metrics/Metrics.h" +#include "scheduler/job/BuildIndexJob.h" +#include "utils/Log.h" +#include "utils/TimeRecorder.h" + +#include +#include +#include +#include + +namespace milvus { +namespace scheduler { + +XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label) + : Task(TaskType::BuildIndexTask, std::move(label)), file_(file) { + if (file_) { + to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType)file_->engine_type_, + (MetricType)file_->metric_type_, file_->nlist_); + } +} + +void +XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { + TimeRecorder rc(""); + Status stat = Status::OK(); + std::string error_msg; + std::string type_str; + + if (auto job = job_.lock()) { + auto build_index_job = std::static_pointer_cast(job); + auto options = build_index_job->options(); + try { + if (type == LoadType::DISK2CPU) { + stat = to_index_engine_->Load(options.insert_cache_immediately_); + type_str = "DISK2CPU"; + } else if (type == LoadType::CPU2GPU) { + stat = to_index_engine_->CopyToIndexFileToGpu(device_id); + type_str = "CPU2GPU"; + } else if (type == LoadType::GPU2CPU) { + stat = to_index_engine_->CopyToCpu(); + type_str = "GPU2CPU"; + } else { + error_msg = "Wrong load type"; + stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } + } catch (std::exception& ex) { + // typical error: out of disk space or permition denied + error_msg = "Failed to load to_index file: " + std::string(ex.what()); + stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } + + if (!stat.ok()) { + Status s; + if (stat.ToString().find("out of memory") != std::string::npos) { + error_msg = "out of memory: " + type_str; + s = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } else { + error_msg = "Failed to load to_index file: " + type_str; + s = Status(SERVER_UNEXPECTED_ERROR, error_msg); + } + + if (auto job = job_.lock()) { + auto build_index_job = std::static_pointer_cast(job); + build_index_job->BuildIndexDone(file_->id_); + } + + return; + } + + size_t file_size = to_index_engine_->PhysicalSize(); + + std::string info = "Load file id:" + std::to_string(file_->id_) + + " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + + " bytes from location: " + file_->location_ + " totally cost"; + double span = rc.ElapseFromBegin(info); + + to_index_id_ = file_->id_; + to_index_type_ = file_->file_type_; + } +} + +void +XBuildIndexTask::Execute() { + if (to_index_engine_ == nullptr) { + return; + } + + TimeRecorder rc("DoBuildIndex file id:" + std::to_string(to_index_id_)); + + if (auto job = job_.lock()) { + auto build_index_job = std::static_pointer_cast(job); + std::string location = file_->location_; + EngineType engine_type = (EngineType)file_->engine_type_; + std::shared_ptr index; + + // step 2: create table file + engine::meta::TableFileSchema table_file; + table_file.table_id_ = file_->table_id_; + table_file.date_ = file_->date_; + table_file.file_type_ = engine::meta::TableFileSchema::NEW_INDEX; + + engine::meta::MetaPtr meta_ptr = build_index_job->meta(); + Status status = build_index_job->meta()->CreateTableFile(table_file); + if (!status.ok()) { + ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); + build_index_job->BuildIndexDone(to_index_id_); + build_index_job->GetStatus() = status; + return; + } + + // step 3: build index + try { + index = to_index_engine_->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_); + if (index == nullptr) { + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ + << " to to_delete"; + + return; + } + } catch (std::exception& ex) { + std::string msg = "BuildIndex encounter exception: " + std::string(ex.what()); + ENGINE_LOG_ERROR << msg; + + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; + + std::cout << "ERROR: failed to build index, index file is too large or gpu memory is not enough" + << std::endl; + + build_index_job->GetStatus() = Status(DB_ERROR, msg); + return; + } + + // step 4: if table has been deleted, dont save index file + bool has_table = false; + meta_ptr->HasTable(file_->table_id_, has_table); + if (!has_table) { + meta_ptr->DeleteTableFiles(file_->table_id_); + return; + } + + // step 5: save index file + try { + index->Serialize(); + } catch (std::exception& ex) { + // typical error: out of disk space or permition denied + std::string msg = "Serialize index encounter exception: " + std::string(ex.what()); + ENGINE_LOG_ERROR << msg; + + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; + + std::cout << "ERROR: failed to persist index file: " << table_file.location_ + << ", possible out of disk space" << std::endl; + + build_index_job->GetStatus() = Status(DB_ERROR, msg); + return; + } + + // step 6: update meta + table_file.file_type_ = engine::meta::TableFileSchema::INDEX; + table_file.file_size_ = index->PhysicalSize(); + table_file.row_count_ = index->Count(); + + auto origin_file = *file_; + origin_file.file_type_ = engine::meta::TableFileSchema::BACKUP; + + engine::meta::TableFilesSchema update_files = {table_file, origin_file}; + status = meta_ptr->UpdateTableFiles(update_files); + if (status.ok()) { + ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size " << index->PhysicalSize() + << " bytes" + << " from file " << origin_file.file_id_; + + // index->Cache(); + } else { + // failed to update meta, mark the new file as to_delete, don't delete old file + origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX; + status = meta_ptr->UpdateTableFile(origin_file); + ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << origin_file.file_id_ << " to to_index"; + + table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; + status = meta_ptr->UpdateTableFile(table_file); + ENGINE_LOG_DEBUG << "Failed to up date file to index, mark file: " << table_file.file_id_ + << " to to_delete"; + } + + build_index_job->BuildIndexDone(to_index_id_); + } + + rc.ElapseFromBegin("totally cost"); + + to_index_engine_ = nullptr; +} + +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/task/BuildIndexTask.h b/cpp/src/scheduler/task/BuildIndexTask.h new file mode 100644 index 0000000000..5c2aa69a00 --- /dev/null +++ b/cpp/src/scheduler/task/BuildIndexTask.h @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "Task.h" +#include "scheduler/Definition.h" +#include "scheduler/job/BuildIndexJob.h" + +namespace milvus { +namespace scheduler { + +class XBuildIndexTask : public Task { + public: + explicit XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label); + + void + Load(LoadType type, uint8_t device_id) override; + + void + Execute() override; + + public: + TableFileSchemaPtr file_; + TableFileSchema table_file_; + size_t to_index_id_ = 0; + int to_index_type_ = 0; + ExecutionEnginePtr to_index_engine_ = nullptr; +}; + +} // namespace scheduler +} // namespace milvus diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index ee24dace43..9925a8bcf8 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -157,8 +157,8 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { size_t file_size = index_engine_->PhysicalSize(); - std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + - std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + + std::string info = "Load file id:" + std::to_string(file_->id_) + + " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + " bytes from location: " + file_->location_ + " totally cost"; double span = rc.ElapseFromBegin(info); // for (auto &context : search_contexts_) { diff --git a/cpp/src/scheduler/task/Task.h b/cpp/src/scheduler/task/Task.h index 63d16daa08..411c18cbde 100644 --- a/cpp/src/scheduler/task/Task.h +++ b/cpp/src/scheduler/task/Task.h @@ -39,6 +39,7 @@ enum class LoadType { enum class TaskType { SearchTask, DeleteTask, + BuildIndexTask, TestTask, }; diff --git a/cpp/src/scheduler/tasklabel/SpecResLabel.h b/cpp/src/scheduler/tasklabel/SpecResLabel.h index 2e42e6c2e9..db2989fbc2 100644 --- a/cpp/src/scheduler/tasklabel/SpecResLabel.h +++ b/cpp/src/scheduler/tasklabel/SpecResLabel.h @@ -18,13 +18,14 @@ #pragma once #include "TaskLabel.h" +#include "scheduler/ResourceMgr.h" #include #include -class Resource; - -using ResourceWPtr = std::weak_ptr; +// class Resource; +// +// using ResourceWPtr = std::weak_ptr; namespace milvus { namespace scheduler { diff --git a/cpp/src/sdk/include/Status.h b/cpp/src/sdk/include/Status.h index a81116b31d..008f9956d2 100644 --- a/cpp/src/sdk/include/Status.h +++ b/cpp/src/sdk/include/Status.h @@ -20,12 +20,12 @@ #include /** \brief Milvus SDK namespace -*/ + */ namespace milvus { /** -* @brief Status Code for SDK interface return -*/ + * @brief Status Code for SDK interface return + */ enum class StatusCode { OK = 0, @@ -41,8 +41,8 @@ enum class StatusCode { }; /** -* @brief Status for SDK interface return -*/ + * @brief Status for SDK interface return + */ class Status { public: Status(StatusCode code, const std::string& msg); diff --git a/cpp/src/server/Config.cpp b/cpp/src/server/Config.cpp index 5d6a237c1d..d383eb5edd 100644 --- a/cpp/src/server/Config.cpp +++ b/cpp/src/server/Config.cpp @@ -655,294 +655,62 @@ Config::SetConfigValueInMem(const std::string& parent_key, const std::string& ch } //////////////////////////////////////////////////////////////////////////////// -/* server config */ std::string -Config::GetServerConfigStrAddress() { +Config::GetConfigStr(const std::string& parent_key, const std::string& child_key, const std::string& default_value) { std::string value; - if (!GetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_ADDRESS, value).ok()) { - value = GetConfigNode(CONFIG_SERVER).GetValue(CONFIG_SERVER_ADDRESS, CONFIG_SERVER_ADDRESS_DEFAULT); - SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_ADDRESS, value); + if (!GetConfigValueInMem(parent_key, child_key, value).ok()) { + value = GetConfigNode(parent_key).GetValue(child_key, default_value); + SetConfigValueInMem(parent_key, child_key, value); } return value; } -std::string -Config::GetServerConfigStrPort() { - std::string value; - if (!GetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_PORT, value).ok()) { - value = GetConfigNode(CONFIG_SERVER).GetValue(CONFIG_SERVER_PORT, CONFIG_SERVER_PORT_DEFAULT); - SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_PORT, value); - } - return value; -} - -std::string -Config::GetServerConfigStrDeployMode() { - std::string value; - if (!GetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_DEPLOY_MODE, value).ok()) { - value = GetConfigNode(CONFIG_SERVER).GetValue(CONFIG_SERVER_DEPLOY_MODE, CONFIG_SERVER_DEPLOY_MODE_DEFAULT); - SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_DEPLOY_MODE, value); - } - return value; -} - -std::string -Config::GetServerConfigStrTimeZone() { - std::string value; - if (!GetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_TIME_ZONE, value).ok()) { - value = GetConfigNode(CONFIG_SERVER).GetValue(CONFIG_SERVER_TIME_ZONE, CONFIG_SERVER_TIME_ZONE_DEFAULT); - SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_TIME_ZONE, value); - } - return value; -} - -//////////////////////////////////////////////////////////////////////////////// -/* db config */ -std::string -Config::GetDBConfigStrPrimaryPath() { - std::string value; - if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_PRIMARY_PATH, value).ok()) { - value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_PRIMARY_PATH, CONFIG_DB_PRIMARY_PATH_DEFAULT); - SetConfigValueInMem(CONFIG_DB, CONFIG_DB_PRIMARY_PATH, value); - } - return value; -} - -std::string -Config::GetDBConfigStrSecondaryPath() { - std::string value; - if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_SECONDARY_PATH, value).ok()) { - value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_SECONDARY_PATH, CONFIG_DB_SECONDARY_PATH_DEFAULT); - SetConfigValueInMem(CONFIG_DB, CONFIG_DB_SECONDARY_PATH, value); - } - return value; -} - -std::string -Config::GetDBConfigStrBackendUrl() { - std::string value; - if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_BACKEND_URL, value).ok()) { - value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_BACKEND_URL, CONFIG_DB_BACKEND_URL_DEFAULT); - SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BACKEND_URL, value); - } - return value; -} - -std::string -Config::GetDBConfigStrArchiveDiskThreshold() { - std::string value; - if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DISK_THRESHOLD, value).ok()) { - value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_ARCHIVE_DISK_THRESHOLD, - CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT); - SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DISK_THRESHOLD, value); - } - return value; -} - -std::string -Config::GetDBConfigStrArchiveDaysThreshold() { - std::string value; - if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, value).ok()) { - value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, - CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT); - SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, value); - } - return value; -} - -std::string -Config::GetDBConfigStrInsertBufferSize() { - std::string value; - if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_INSERT_BUFFER_SIZE, value).ok()) { - value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_INSERT_BUFFER_SIZE, CONFIG_DB_INSERT_BUFFER_SIZE_DEFAULT); - SetConfigValueInMem(CONFIG_DB, CONFIG_DB_INSERT_BUFFER_SIZE, value); - } - return value; -} - -std::string -Config::GetDBConfigStrBuildIndexGPU() { - std::string value; - if (!GetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUILD_INDEX_GPU, value).ok()) { - value = GetConfigNode(CONFIG_DB).GetValue(CONFIG_DB_BUILD_INDEX_GPU, CONFIG_DB_BUILD_INDEX_GPU_DEFAULT); - SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUILD_INDEX_GPU, value); - } - return value; -} - -//////////////////////////////////////////////////////////////////////////////// -/* metric config */ -std::string -Config::GetMetricConfigStrEnableMonitor() { - std::string value; - if (!GetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_ENABLE_MONITOR, value).ok()) { - value = - GetConfigNode(CONFIG_METRIC).GetValue(CONFIG_METRIC_ENABLE_MONITOR, CONFIG_METRIC_ENABLE_MONITOR_DEFAULT); - SetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_ENABLE_MONITOR, value); - } - return value; -} - -std::string -Config::GetMetricConfigStrCollector() { - std::string value; - if (!GetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_COLLECTOR, value).ok()) { - value = GetConfigNode(CONFIG_METRIC).GetValue(CONFIG_METRIC_COLLECTOR, CONFIG_METRIC_COLLECTOR_DEFAULT); - SetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_COLLECTOR, value); - } - return value; -} - -std::string -Config::GetMetricConfigStrPrometheusPort() { - std::string value; - if (!GetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_PROMETHEUS_PORT, value).ok()) { - value = - GetConfigNode(CONFIG_METRIC).GetValue(CONFIG_METRIC_PROMETHEUS_PORT, CONFIG_METRIC_PROMETHEUS_PORT_DEFAULT); - SetConfigValueInMem(CONFIG_METRIC, CONFIG_METRIC_PROMETHEUS_PORT, value); - } - return value; -} - -//////////////////////////////////////////////////////////////////////////////// -/* cache config */ -std::string -Config::GetCacheConfigStrCpuCacheCapacity() { - std::string value; - if (!GetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY, value).ok()) { - value = GetConfigNode(CONFIG_CACHE) - .GetValue(CONFIG_CACHE_CPU_CACHE_CAPACITY, CONFIG_CACHE_CPU_CACHE_CAPACITY_DEFAULT); - SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY, value); - } - return value; -} - -std::string -Config::GetCacheConfigStrCpuCacheThreshold() { - std::string value; - if (!GetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_THRESHOLD, value).ok()) { - value = GetConfigNode(CONFIG_CACHE) - .GetValue(CONFIG_CACHE_CPU_CACHE_THRESHOLD, CONFIG_CACHE_CPU_CACHE_THRESHOLD_DEFAULT); - SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_THRESHOLD, value); - } - return value; -} - -std::string -Config::GetCacheConfigStrGpuCacheCapacity() { - std::string value; - if (!GetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_CAPACITY, value).ok()) { - value = GetConfigNode(CONFIG_CACHE) - .GetValue(CONFIG_CACHE_GPU_CACHE_CAPACITY, CONFIG_CACHE_GPU_CACHE_CAPACITY_DEFAULT); - SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_CAPACITY, value); - } - return value; -} - -std::string -Config::GetCacheConfigStrGpuCacheThreshold() { - std::string value; - if (!GetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_THRESHOLD, value).ok()) { - value = GetConfigNode(CONFIG_CACHE) - .GetValue(CONFIG_CACHE_GPU_CACHE_THRESHOLD, CONFIG_CACHE_GPU_CACHE_THRESHOLD_DEFAULT); - SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_THRESHOLD, value); - } - return value; -} - -std::string -Config::GetCacheConfigStrCacheInsertData() { - std::string value; - if (!GetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CACHE_INSERT_DATA, value).ok()) { - value = GetConfigNode(CONFIG_CACHE) - .GetValue(CONFIG_CACHE_CACHE_INSERT_DATA, CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT); - SetConfigValueInMem(CONFIG_CACHE, CONFIG_CACHE_CACHE_INSERT_DATA, value); - } - return value; -} - -//////////////////////////////////////////////////////////////////////////////// -/* engine config */ -std::string -Config::GetEngineConfigStrUseBlasThreshold() { - std::string value; - if (!GetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_USE_BLAS_THRESHOLD, value).ok()) { - value = GetConfigNode(CONFIG_ENGINE) - .GetValue(CONFIG_ENGINE_USE_BLAS_THRESHOLD, CONFIG_ENGINE_USE_BLAS_THRESHOLD_DEFAULT); - SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_USE_BLAS_THRESHOLD, value); - } - return value; -} - -std::string -Config::GetEngineConfigStrOmpThreadNum() { - std::string value; - if (!GetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_OMP_THREAD_NUM, value).ok()) { - value = - GetConfigNode(CONFIG_ENGINE).GetValue(CONFIG_ENGINE_OMP_THREAD_NUM, CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT); - SetConfigValueInMem(CONFIG_ENGINE, CONFIG_ENGINE_OMP_THREAD_NUM, value); - } - return value; -} - -//////////////////////////////////////////////////////////////////////////////// -/* resource config */ -std::string -Config::GetResourceConfigStrMode() { - std::string value; - if (!GetConfigValueInMem(CONFIG_RESOURCE, CONFIG_RESOURCE_MODE, value).ok()) { - value = GetConfigNode(CONFIG_RESOURCE).GetValue(CONFIG_RESOURCE_MODE, CONFIG_RESOURCE_MODE_DEFAULT); - SetConfigValueInMem(CONFIG_RESOURCE, CONFIG_RESOURCE_MODE, value); - } - return value; -} - -//////////////////////////////////////////////////////////////////////////////// Status Config::GetServerConfigAddress(std::string& value) { - value = GetServerConfigStrAddress(); + value = GetConfigStr(CONFIG_SERVER, CONFIG_SERVER_ADDRESS, CONFIG_SERVER_ADDRESS_DEFAULT); return CheckServerConfigAddress(value); } Status Config::GetServerConfigPort(std::string& value) { - value = GetServerConfigStrPort(); + value = GetConfigStr(CONFIG_SERVER, CONFIG_SERVER_PORT, CONFIG_SERVER_PORT_DEFAULT); return CheckServerConfigPort(value); } Status Config::GetServerConfigDeployMode(std::string& value) { - value = GetServerConfigStrDeployMode(); + value = GetConfigStr(CONFIG_SERVER, CONFIG_SERVER_DEPLOY_MODE, CONFIG_SERVER_DEPLOY_MODE_DEFAULT); return CheckServerConfigDeployMode(value); } Status Config::GetServerConfigTimeZone(std::string& value) { - value = GetServerConfigStrTimeZone(); + value = GetConfigStr(CONFIG_SERVER, CONFIG_SERVER_TIME_ZONE, CONFIG_SERVER_TIME_ZONE_DEFAULT); return CheckServerConfigTimeZone(value); } Status Config::GetDBConfigPrimaryPath(std::string& value) { - value = GetDBConfigStrPrimaryPath(); + value = GetConfigStr(CONFIG_DB, CONFIG_DB_PRIMARY_PATH, CONFIG_DB_PRIMARY_PATH_DEFAULT); return CheckDBConfigPrimaryPath(value); } Status Config::GetDBConfigSecondaryPath(std::string& value) { - value = GetDBConfigStrSecondaryPath(); + value = GetConfigStr(CONFIG_DB, CONFIG_DB_SECONDARY_PATH, CONFIG_DB_SECONDARY_PATH_DEFAULT); return Status::OK(); } Status Config::GetDBConfigBackendUrl(std::string& value) { - value = GetDBConfigStrBackendUrl(); + value = GetConfigStr(CONFIG_DB, CONFIG_DB_BACKEND_URL, CONFIG_DB_BACKEND_URL_DEFAULT); return CheckDBConfigBackendUrl(value); } Status Config::GetDBConfigArchiveDiskThreshold(int32_t& value) { - std::string str = GetDBConfigStrArchiveDiskThreshold(); + std::string str = + GetConfigStr(CONFIG_DB, CONFIG_DB_ARCHIVE_DISK_THRESHOLD, CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT); Status s = CheckDBConfigArchiveDiskThreshold(str); if (!s.ok()) { return s; @@ -954,7 +722,8 @@ Config::GetDBConfigArchiveDiskThreshold(int32_t& value) { Status Config::GetDBConfigArchiveDaysThreshold(int32_t& value) { - std::string str = GetDBConfigStrArchiveDaysThreshold(); + std::string str = + GetConfigStr(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT); Status s = CheckDBConfigArchiveDaysThreshold(str); if (!s.ok()) { return s; @@ -966,7 +735,7 @@ Config::GetDBConfigArchiveDaysThreshold(int32_t& value) { Status Config::GetDBConfigInsertBufferSize(int32_t& value) { - std::string str = GetDBConfigStrInsertBufferSize(); + std::string str = GetConfigStr(CONFIG_DB, CONFIG_DB_INSERT_BUFFER_SIZE, CONFIG_DB_INSERT_BUFFER_SIZE_DEFAULT); Status s = CheckDBConfigInsertBufferSize(str); if (!s.ok()) { return s; @@ -978,7 +747,7 @@ Config::GetDBConfigInsertBufferSize(int32_t& value) { Status Config::GetDBConfigBuildIndexGPU(int32_t& value) { - std::string str = GetDBConfigStrBuildIndexGPU(); + std::string str = GetConfigStr(CONFIG_DB, CONFIG_DB_BUILD_INDEX_GPU, CONFIG_DB_BUILD_INDEX_GPU_DEFAULT); Status s = CheckDBConfigBuildIndexGPU(str); if (!s.ok()) { return s; @@ -988,9 +757,15 @@ Config::GetDBConfigBuildIndexGPU(int32_t& value) { return Status::OK(); } +Status +Config::GetDBConfigPreloadTable(std::string& value) { + value = GetConfigStr(CONFIG_DB, CONFIG_DB_PRELOAD_TABLE); + return Status::OK(); +} + Status Config::GetMetricConfigEnableMonitor(bool& value) { - std::string str = GetMetricConfigStrEnableMonitor(); + std::string str = GetConfigStr(CONFIG_METRIC, CONFIG_METRIC_ENABLE_MONITOR, CONFIG_METRIC_ENABLE_MONITOR_DEFAULT); Status s = CheckMetricConfigEnableMonitor(str); if (!s.ok()) { return s; @@ -1003,19 +778,20 @@ Config::GetMetricConfigEnableMonitor(bool& value) { Status Config::GetMetricConfigCollector(std::string& value) { - value = GetMetricConfigStrCollector(); + value = GetConfigStr(CONFIG_METRIC, CONFIG_METRIC_COLLECTOR, CONFIG_METRIC_COLLECTOR_DEFAULT); return Status::OK(); } Status Config::GetMetricConfigPrometheusPort(std::string& value) { - value = GetMetricConfigStrPrometheusPort(); + value = GetConfigStr(CONFIG_METRIC, CONFIG_METRIC_PROMETHEUS_PORT, CONFIG_METRIC_PROMETHEUS_PORT_DEFAULT); return CheckMetricConfigPrometheusPort(value); } Status Config::GetCacheConfigCpuCacheCapacity(int32_t& value) { - std::string str = GetCacheConfigStrCpuCacheCapacity(); + std::string str = + GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY, CONFIG_CACHE_CPU_CACHE_CAPACITY_DEFAULT); Status s = CheckCacheConfigCpuCacheCapacity(str); if (!s.ok()) { return s; @@ -1027,7 +803,8 @@ Config::GetCacheConfigCpuCacheCapacity(int32_t& value) { Status Config::GetCacheConfigCpuCacheThreshold(float& value) { - std::string str = GetCacheConfigStrCpuCacheThreshold(); + std::string str = + GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_THRESHOLD, CONFIG_CACHE_CPU_CACHE_THRESHOLD_DEFAULT); Status s = CheckCacheConfigCpuCacheThreshold(str); if (!s.ok()) { return s; @@ -1039,7 +816,8 @@ Config::GetCacheConfigCpuCacheThreshold(float& value) { Status Config::GetCacheConfigGpuCacheCapacity(int32_t& value) { - std::string str = GetCacheConfigStrGpuCacheCapacity(); + std::string str = + GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_CAPACITY, CONFIG_CACHE_GPU_CACHE_CAPACITY_DEFAULT); Status s = CheckCacheConfigGpuCacheCapacity(str); if (!s.ok()) { return s; @@ -1051,7 +829,8 @@ Config::GetCacheConfigGpuCacheCapacity(int32_t& value) { Status Config::GetCacheConfigGpuCacheThreshold(float& value) { - std::string str = GetCacheConfigStrGpuCacheThreshold(); + std::string str = + GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_THRESHOLD, CONFIG_CACHE_GPU_CACHE_THRESHOLD_DEFAULT); Status s = CheckCacheConfigGpuCacheThreshold(str); if (!s.ok()) { return s; @@ -1063,7 +842,8 @@ Config::GetCacheConfigGpuCacheThreshold(float& value) { Status Config::GetCacheConfigCacheInsertData(bool& value) { - std::string str = GetCacheConfigStrCacheInsertData(); + std::string str = + GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_CACHE_INSERT_DATA, CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT); Status s = CheckCacheConfigCacheInsertData(str); if (!s.ok()) { return s; @@ -1076,7 +856,8 @@ Config::GetCacheConfigCacheInsertData(bool& value) { Status Config::GetEngineConfigUseBlasThreshold(int32_t& value) { - std::string str = GetEngineConfigStrUseBlasThreshold(); + std::string str = + GetConfigStr(CONFIG_ENGINE, CONFIG_ENGINE_USE_BLAS_THRESHOLD, CONFIG_ENGINE_USE_BLAS_THRESHOLD_DEFAULT); Status s = CheckEngineConfigUseBlasThreshold(str); if (!s.ok()) { return s; @@ -1088,7 +869,7 @@ Config::GetEngineConfigUseBlasThreshold(int32_t& value) { Status Config::GetEngineConfigOmpThreadNum(int32_t& value) { - std::string str = GetEngineConfigStrOmpThreadNum(); + std::string str = GetConfigStr(CONFIG_ENGINE, CONFIG_ENGINE_OMP_THREAD_NUM, CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT); Status s = CheckEngineConfigOmpThreadNum(str); if (!s.ok()) { return s; @@ -1100,7 +881,7 @@ Config::GetEngineConfigOmpThreadNum(int32_t& value) { Status Config::GetResourceConfigMode(std::string& value) { - value = GetResourceConfigStrMode(); + value = GetConfigStr(CONFIG_RESOURCE, CONFIG_RESOURCE_MODE, CONFIG_RESOURCE_MODE_DEFAULT); return CheckResourceConfigMode(value); } diff --git a/cpp/src/server/Config.h b/cpp/src/server/Config.h index 9f6932d267..fb00498b96 100644 --- a/cpp/src/server/Config.h +++ b/cpp/src/server/Config.h @@ -56,6 +56,7 @@ static const char* CONFIG_DB_INSERT_BUFFER_SIZE = "insert_buffer_size"; static const char* CONFIG_DB_INSERT_BUFFER_SIZE_DEFAULT = "4"; static const char* CONFIG_DB_BUILD_INDEX_GPU = "build_index_gpu"; static const char* CONFIG_DB_BUILD_INDEX_GPU_DEFAULT = "0"; +static const char* CONFIG_DB_PRELOAD_TABLE = "preload_table"; /* cache config */ static const char* CONFIG_CACHE = "cache_config"; @@ -178,62 +179,8 @@ class Config { Status CheckResourceConfigPool(const std::vector& value); - /////////////////////////////////////////////////////////////////////////// - /* server config */ std::string - GetServerConfigStrAddress(); - std::string - GetServerConfigStrPort(); - std::string - GetServerConfigStrDeployMode(); - std::string - GetServerConfigStrTimeZone(); - - /* db config */ - std::string - GetDBConfigStrPrimaryPath(); - std::string - GetDBConfigStrSecondaryPath(); - std::string - GetDBConfigStrBackendUrl(); - std::string - GetDBConfigStrArchiveDiskThreshold(); - std::string - GetDBConfigStrArchiveDaysThreshold(); - std::string - GetDBConfigStrInsertBufferSize(); - std::string - GetDBConfigStrBuildIndexGPU(); - - /* metric config */ - std::string - GetMetricConfigStrEnableMonitor(); - std::string - GetMetricConfigStrCollector(); - std::string - GetMetricConfigStrPrometheusPort(); - - /* cache config */ - std::string - GetCacheConfigStrCpuCacheCapacity(); - std::string - GetCacheConfigStrCpuCacheThreshold(); - std::string - GetCacheConfigStrGpuCacheCapacity(); - std::string - GetCacheConfigStrGpuCacheThreshold(); - std::string - GetCacheConfigStrCacheInsertData(); - - /* engine config */ - std::string - GetEngineConfigStrUseBlasThreshold(); - std::string - GetEngineConfigStrOmpThreadNum(); - - /* resource config */ - std::string - GetResourceConfigStrMode(); + GetConfigStr(const std::string& parent_key, const std::string& child_key, const std::string& default_value = ""); public: /* server config */ @@ -261,6 +208,8 @@ class Config { GetDBConfigInsertBufferSize(int32_t& value); Status GetDBConfigBuildIndexGPU(int32_t& value); + Status + GetDBConfigPreloadTable(std::string& value); /* metric config */ Status diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp index 401c494575..306863f8ae 100644 --- a/cpp/src/server/DBWrapper.cpp +++ b/cpp/src/server/DBWrapper.cpp @@ -25,6 +25,7 @@ #include #include #include +#include namespace milvus { namespace server { @@ -155,6 +156,20 @@ DBWrapper::StartService() { db_->Start(); + // preload table + std::string preload_tables; + s = config.GetDBConfigPreloadTable(preload_tables); + if (!s.ok()) { + return s; + } + + s = PreloadTables(preload_tables); + if (!s.ok()) { + std::cerr << "ERROR! Failed to preload tables: " << preload_tables << std::endl; + std::cerr << s.ToString() << std::endl; + kill(0, SIGUSR1); + } + return Status::OK(); } @@ -167,5 +182,34 @@ DBWrapper::StopService() { return Status::OK(); } +Status +DBWrapper::PreloadTables(const std::string& preload_tables) { + if (preload_tables.empty()) { + // do nothing + } else if (preload_tables == "*") { + // load all tables + std::vector table_schema_array; + db_->AllTables(table_schema_array); + + for (auto& schema : table_schema_array) { + auto status = db_->PreloadTable(schema.table_id_); + if (!status.ok()) { + return status; + } + } + } else { + std::vector table_names; + StringHelpFunctions::SplitStringByDelimeter(preload_tables, ",", table_names); + for (auto& name : table_names) { + auto status = db_->PreloadTable(name); + if (!status.ok()) { + return status; + } + } + } + + return Status::OK(); +} + } // namespace server } // namespace milvus diff --git a/cpp/src/server/DBWrapper.h b/cpp/src/server/DBWrapper.h index 08e07c09f6..7016aa8805 100644 --- a/cpp/src/server/DBWrapper.h +++ b/cpp/src/server/DBWrapper.h @@ -21,6 +21,7 @@ #include "utils/Status.h" #include +#include namespace milvus { namespace server { @@ -52,6 +53,10 @@ class DBWrapper { return db_; } + private: + Status + PreloadTables(const std::string& preload_tables); + private: engine::DBPtr db_; }; diff --git a/cpp/src/server/grpc_impl/GrpcRequestHandler.h b/cpp/src/server/grpc_impl/GrpcRequestHandler.h index 0decf61500..1a9b591154 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestHandler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.h @@ -148,25 +148,25 @@ class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service { ::milvus::grpc::TopKQueryResultList* response) override; /** - * @brief Internal use query interface - * - * This method is used to query vector in specified files. - * - * @param context, add context for every RPC - * @param request: - * file_id_array, specified files id array, queried. - * query_record_array, all vector are going to be queried. - * query_range_array, optional ranges for conditional search. If not specified, search whole table - * topk, how many similarity vectors will be searched. - * - * @param writer, write query result array. - * - * @return status - * - * @param context - * @param request - * @param writer - */ + * @brief Internal use query interface + * + * This method is used to query vector in specified files. + * + * @param context, add context for every RPC + * @param request: + * file_id_array, specified files id array, queried. + * query_record_array, all vector are going to be queried. + * query_range_array, optional ranges for conditional search. If not specified, search whole table + * topk, how many similarity vectors will be searched. + * + * @param writer, write query result array. + * + * @return status + * + * @param context + * @param request + * @param writer + */ ::grpc::Status SearchInFiles(::grpc::ServerContext* context, const ::milvus::grpc::SearchInFilesParam* request, ::milvus::grpc::TopKQueryResultList* response) override; diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp index b2aefe4f65..ac35f82947 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp @@ -36,7 +36,6 @@ ErrorMap(ErrorCode code) { {SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, {SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND}, {SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_BLOCKING_QUEUE_EMPTY, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, {SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER}, {SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE}, {SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER}, @@ -57,7 +56,7 @@ ErrorMap(ErrorCode code) { {SERVER_INVALID_INDEX_FILE_SIZE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, {SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID}, {SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT}, - {SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED}, + {SERVER_CACHE_FULL, ::milvus::grpc::ErrorCode::CACHE_FAILED}, {DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED}, {SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR}, {SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY}, diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp index 9f0b8fd95d..1279cbac9f 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp @@ -42,6 +42,8 @@ static const char* DQL_TASK_GROUP = "dql"; static const char* DDL_DML_TASK_GROUP = "ddl_dml"; static const char* PING_TASK_GROUP = "ping"; +constexpr int64_t DAY_SECONDS = 24 * 60 * 60; + using DB_META = milvus::engine::meta::Meta; using DB_DATE = milvus::engine::meta::DateT; @@ -78,8 +80,6 @@ IndexType(engine::EngineType type) { return map_type[type]; } -constexpr int64_t DAY_SECONDS = 24 * 60 * 60; - Status ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range>& range_array, std::vector& dates) { dates.clear(); @@ -94,10 +94,10 @@ ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range>& range_array, return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value()); } - int64_t days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) / DAY_SECONDS; - if (days == 0) { + int64_t days = (tt_end - tt_start) / DAY_SECONDS; + if (days <= 0) { return Status(SERVER_INVALID_TIME_RANGE, - "Invalid time range: " + range.start_value() + " to " + range.end_value()); + "Invalid time range: The start-date should be smaller than end-date!"); } // range: [start_day, end_day) @@ -511,8 +511,8 @@ InsertTask::OnExecute() { } // step 6: update table flag - user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID : table_info.flag_ |= - engine::meta::FLAG_MASK_NO_USERID; + user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID + : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID; status = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_); #ifdef MILVUS_ENABLE_PROFILING @@ -706,7 +706,11 @@ CountTableTask::OnExecute() { uint64_t row_count = 0; status = DBWrapper::DB()->GetTableRowCount(table_name_, row_count); if (!status.ok()) { - return status; + if (status.code(), DB_NOT_FOUND) { + return Status(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists"); + } else { + return status; + } } row_count_ = static_cast(row_count); diff --git a/cpp/src/utils/Error.h b/cpp/src/utils/Error.h index 9cba18ef41..dfc400ca9a 100644 --- a/cpp/src/utils/Error.h +++ b/cpp/src/utils/Error.h @@ -56,7 +56,6 @@ constexpr ErrorCode SERVER_NULL_POINTER = ToServerErrorCode(3); constexpr ErrorCode SERVER_INVALID_ARGUMENT = ToServerErrorCode(4); constexpr ErrorCode SERVER_FILE_NOT_FOUND = ToServerErrorCode(5); constexpr ErrorCode SERVER_NOT_IMPLEMENT = ToServerErrorCode(6); -constexpr ErrorCode SERVER_BLOCKING_QUEUE_EMPTY = ToServerErrorCode(7); constexpr ErrorCode SERVER_CANNOT_CREATE_FOLDER = ToServerErrorCode(8); constexpr ErrorCode SERVER_CANNOT_CREATE_FILE = ToServerErrorCode(9); constexpr ErrorCode SERVER_CANNOT_DELETE_FOLDER = ToServerErrorCode(10); @@ -74,7 +73,7 @@ constexpr ErrorCode SERVER_INVALID_ROWRECORD_ARRAY = ToServerErrorCode(107); constexpr ErrorCode SERVER_INVALID_TOPK = ToServerErrorCode(108); constexpr ErrorCode SERVER_ILLEGAL_VECTOR_ID = ToServerErrorCode(109); constexpr ErrorCode SERVER_ILLEGAL_SEARCH_RESULT = ToServerErrorCode(110); -constexpr ErrorCode SERVER_CACHE_ERROR = ToServerErrorCode(111); +constexpr ErrorCode SERVER_CACHE_FULL = ToServerErrorCode(111); constexpr ErrorCode SERVER_WRITE_ERROR = ToServerErrorCode(112); constexpr ErrorCode SERVER_INVALID_NPROBE = ToServerErrorCode(113); constexpr ErrorCode SERVER_INVALID_INDEX_NLIST = ToServerErrorCode(114); diff --git a/cpp/src/utils/LogUtil.cpp b/cpp/src/utils/LogUtil.cpp index ed2e1a446b..4a962f466c 100644 --- a/cpp/src/utils/LogUtil.cpp +++ b/cpp/src/utils/LogUtil.cpp @@ -31,7 +31,7 @@ static int warning_idx = 0; static int trace_idx = 0; static int error_idx = 0; static int fatal_idx = 0; -} +} // namespace // TODO(yzb) : change the easylogging library to get the log level from parameter rather than filename void diff --git a/cpp/src/utils/ValidationUtil.cpp b/cpp/src/utils/ValidationUtil.cpp index 248be51088..4345ebb704 100644 --- a/cpp/src/utils/ValidationUtil.cpp +++ b/cpp/src/utils/ValidationUtil.cpp @@ -71,7 +71,11 @@ ValidationUtil::ValidateTableName(const std::string& table_name) { Status ValidationUtil::ValidateTableDimension(int64_t dimension) { - if (dimension <= 0 || dimension > TABLE_DIMENSION_LIMIT) { + if (dimension <= 0) { + std::string msg = "Dimension value should be greater than 0"; + SERVER_LOG_ERROR << msg; + return Status(SERVER_INVALID_VECTOR_DIMENSION, msg); + } else if (dimension > TABLE_DIMENSION_LIMIT) { std::string msg = "Table dimension excceed the limitation: " + std::to_string(TABLE_DIMENSION_LIMIT); SERVER_LOG_ERROR << msg; return Status(SERVER_INVALID_VECTOR_DIMENSION, msg); @@ -95,7 +99,7 @@ ValidationUtil::ValidateTableIndexType(int32_t index_type) { Status ValidationUtil::ValidateTableIndexNlist(int32_t nlist) { if (nlist <= 0) { - std::string msg = "Invalid nlist value: " + std::to_string(nlist); + std::string msg = "nlist value should be greater than 0"; SERVER_LOG_ERROR << msg; return Status(SERVER_INVALID_INDEX_NLIST, msg); } @@ -128,7 +132,7 @@ ValidationUtil::ValidateTableIndexMetricType(int32_t metric_type) { Status ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchema& table_schema) { if (top_k <= 0 || top_k > 2048) { - std::string msg = "Invalid top k value: " + std::to_string(top_k); + std::string msg = "Invalid top k value: " + std::to_string(top_k) + ", rational range [1, 2048]"; SERVER_LOG_ERROR << msg; return Status(SERVER_INVALID_TOPK, msg); } @@ -139,7 +143,8 @@ ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchem Status ValidationUtil::ValidateSearchNprobe(int64_t nprobe, const engine::meta::TableSchema& table_schema) { if (nprobe <= 0 || nprobe > table_schema.nlist_) { - std::string msg = "Invalid nprobe value: " + std::to_string(nprobe); + std::string msg = "Invalid nprobe value: " + std::to_string(nprobe) + ", rational range [1, " + + std::to_string(table_schema.nlist_) + "]"; SERVER_LOG_ERROR << msg; return Status(SERVER_INVALID_NPROBE, msg); } diff --git a/cpp/unittest/metrics/utils.cpp b/cpp/unittest/metrics/utils.cpp index c2a53babc3..e345923b7b 100644 --- a/cpp/unittest/metrics/utils.cpp +++ b/cpp/unittest/metrics/utils.cpp @@ -66,6 +66,7 @@ ms::engine::DBOptions MetricTest::GetOptions() { } void MetricTest::SetUp() { + boost::filesystem::remove_all("/tmp/milvus_test"); InitLog(); auto options = GetOptions(); db_ = ms::engine::DBFactory::Build(options); diff --git a/cpp/unittest/server/test_rpc.cpp b/cpp/unittest/server/test_rpc.cpp index 5baaed3152..b847ec3116 100644 --- a/cpp/unittest/server/test_rpc.cpp +++ b/cpp/unittest/server/test_rpc.cpp @@ -351,11 +351,11 @@ TEST_F(RpcHandlerTest, TABLES_TEST) { handler->Insert(&context, &request, &vector_ids); -//Show table -// ::milvus::grpc::Command cmd; -// ::grpc::ServerWriter<::milvus::grpc::TableName> *writer; -// status = handler->ShowTables(&context, &cmd, writer); -// ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); + //show tables + ::milvus::grpc::Command cmd; + ::milvus::grpc::TableNameList table_name_list; + status = handler->ShowTables(&context, &cmd, &table_name_list); + ASSERT_EQ(status.error_code(), ::grpc::Status::OK.error_code()); //Count Table ::milvus::grpc::TableRowCount count;