diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index d3a9f247d6..40cf60f3dd 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -1,7 +1,7 @@ --- name: "\U0001F41B Bug report" about: Create a bug report to help us improve Milvus -title: "[BUG]" +title: '' labels: '' assignees: '' diff --git a/.github/ISSUE_TEMPLATE/documentation-request.md b/.github/ISSUE_TEMPLATE/documentation-request.md index 1e3193f9f3..133fb9e1e9 100644 --- a/.github/ISSUE_TEMPLATE/documentation-request.md +++ b/.github/ISSUE_TEMPLATE/documentation-request.md @@ -1,7 +1,7 @@ --- name: "\U0001F4DD Documentation request" about: Report incorrect or needed documentation -title: "[DOC]" +title: '' labels: '' assignees: '' diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md index 24de651b12..01bceb3321 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.md +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -1,7 +1,7 @@ --- name: "\U0001F680 Feature request" about: Suggest an idea for Milvus -title: "[FEATURE]" +title: '' labels: '' assignees: '' diff --git a/.github/ISSUE_TEMPLATE/general-question.md b/.github/ISSUE_TEMPLATE/general-question.md index 32ce5dd701..d49fce1817 100644 --- a/.github/ISSUE_TEMPLATE/general-question.md +++ b/.github/ISSUE_TEMPLATE/general-question.md @@ -1,7 +1,7 @@ --- name: "\U0001F914 General question" about: Ask a general question about Milvus -title: "[QUESTION]" +title: '' labels: '' assignees: '' diff --git a/.gitignore b/.gitignore index 8fda9f2980..0a120ebe95 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,6 @@ cmake_build .coverage *.pyc cov_html/ + +# temp +shards/all_in_one_with_mysql/metadata/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ddff2a51c..7083cc12eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,12 @@ Please mark all changes in the change log and use the ticket from JIRA. - \#486 - gpu no usage during index building - \#509 - IVF_PQ index build trapped into dead loop caused by invalid params - \#513 - Unittest DELETE_BY_RANGE sometimes failed +- \#523 - Erase file data from cache once the file is marked as deleted - \#527 - faiss benchmark not compatible with faiss 1.6.0 +- \#530 - BuildIndex stops when build index and search are executed simultaneously +- \#532 - assign value to `table_name` from conftest shell +- \#533 - NSG build failed with MetricType Inner Product +- \#543 - client raises exception in shards when search results are empty ## Feature - \#12 - Pure CPU version for Milvus @@ -33,6 +38,7 @@ Please mark all changes in the change log and use the ticket from JIRA. 
- \#226 - Experimental shards middleware for Milvus - \#227 - Support new index types SPTAG-KDT and SPTAG-BKT - \#346 - Support build index with multiple gpu +- \#420 - Update shards merge part to match v0.5.3 - \#488 - Add log in scheduler/optimizer - \#502 - C++ SDK support IVFPQ and SPTAG diff --git a/ci/jenkins/pod/milvus-cpu-version-build-env-pod.yaml b/ci/jenkins/pod/milvus-cpu-version-build-env-pod.yaml index 894067d66c..58eae39061 100644 --- a/ci/jenkins/pod/milvus-cpu-version-build-env-pod.yaml +++ b/ci/jenkins/pod/milvus-cpu-version-build-env-pod.yaml @@ -14,6 +14,8 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP + - name: BUILD_ENV_IMAGE_ID + value: "23476391bec80c64f10d44a6370c73c71f011a6b95114b10ff82a60e771e11c7" command: - cat tty: true diff --git a/ci/jenkins/pod/milvus-gpu-version-build-env-pod.yaml b/ci/jenkins/pod/milvus-gpu-version-build-env-pod.yaml index f5ceb9462b..bd321a87ae 100644 --- a/ci/jenkins/pod/milvus-gpu-version-build-env-pod.yaml +++ b/ci/jenkins/pod/milvus-gpu-version-build-env-pod.yaml @@ -14,6 +14,8 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP + - name: BUILD_ENV_IMAGE_ID + value: "da9023b0f858f072672f86483a869aa87e90a5140864f89e5a012ec766d96dea" command: - cat tty: true diff --git a/ci/jenkins/step/build.groovy b/ci/jenkins/step/build.groovy index 6c1da64a82..d39f104a17 100644 --- a/ci/jenkins/step/build.groovy +++ b/ci/jenkins/step/build.groovy @@ -1,11 +1,13 @@ timeout(time: 60, unit: 'MINUTES') { dir ("ci/scripts") { withCredentials([usernamePassword(credentialsId: "${params.JFROG_CREDENTIALS_ID}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) { + def checkResult = sh(script: "./check_ccache.sh -l ${params.JFROG_ARTFACTORY_URL}/ccache", returnStatus: true) if ("${env.BINRARY_VERSION}" == "gpu") { - sh "export JFROG_ARTFACTORY_URL='${params.JFROG_ARTFACTORY_URL}' && export JFROG_USER_NAME='${USERNAME}' && export JFROG_PASSWORD='${PASSWORD}' && ./build.sh -t ${params.BUILD_TYPE} -o /opt/milvus -l -g -j -u -c" - } else { - sh "export JFROG_ARTFACTORY_URL='${params.JFROG_ARTFACTORY_URL}' && export JFROG_USER_NAME='${USERNAME}' && export JFROG_PASSWORD='${PASSWORD}' && ./build.sh -t ${params.BUILD_TYPE} -o /opt/milvus -l -m -j -u -c" - } + sh ". ./before-install.sh && ./build.sh -t ${params.BUILD_TYPE} -o /opt/milvus -l -g -u -c" + } else { + sh ". 
./before-install.sh && ./build.sh -t ${params.BUILD_TYPE} -o /opt/milvus -l -m -u -c" + } + sh "./update_ccache.sh -l ${params.JFROG_ARTFACTORY_URL}/ccache -u ${USERNAME} -p ${PASSWORD}" } } } diff --git a/ci/scripts/before-install.sh b/ci/scripts/before-install.sh new file mode 100755 index 0000000000..ad6cf17e00 --- /dev/null +++ b/ci/scripts/before-install.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +set -ex + +export CCACHE_COMPRESS=1 +export CCACHE_COMPRESSLEVEL=5 +export CCACHE_COMPILERCHECK=content +export PATH=/usr/lib/ccache/:$PATH +ccache --show-stats + +set +ex diff --git a/ci/scripts/check_ccache.sh b/ci/scripts/check_ccache.sh new file mode 100755 index 0000000000..17e3ed2f43 --- /dev/null +++ b/ci/scripts/check_ccache.sh @@ -0,0 +1,73 @@ +#!/bin/bash + +OS_NAME="linux" +CODE_NAME=$(lsb_release -sc) +BUILD_ENV_DOCKER_IMAGE_ID="${BUILD_ENV_IMAGE_ID}" +BRANCH_NAMES=$(git log --decorate | head -n 1 | sed 's/.*(\(.*\))/\1/' | sed 's=[a-zA-Z]*\/==g' | awk -F", " '{$1=""; print $0}') +ARTIFACTORY_URL="" +CCACHE_DIRECTORY="${HOME}/.ccache" + +while getopts "l:d:h" arg +do + case $arg in + l) + ARTIFACTORY_URL=$OPTARG + ;; + d) + CCACHE_DIRECTORY=$OPTARG + ;; + h) # help + echo " + +parameter: +-l: artifactory url +-d: ccache directory +-h: help + +usage: +./check_ccache.sh -l \${ARTIFACTORY_URL} -d \${CCACHE_DIRECTORY} [-h] + " + exit 0 + ;; + ?) + echo "ERROR! unknown argument" + exit 1 + ;; + esac +done + +if [[ -z "${ARTIFACTORY_URL}" || "${ARTIFACTORY_URL}" == "" ]];then + echo "ARTIFACTORY_URL is not specified!" + exit 1 +fi + +check_ccache() { + BRANCH=$1 + echo "fetching ${BRANCH}/ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz" + wget -q --method HEAD "${ARTIFACTORY_URL}/${BRANCH}/ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz" + if [[ $? == 0 ]];then + wget "${ARTIFACTORY_URL}/${BRANCH}/ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz" && \ + mkdir -p ${CCACHE_DIRECTORY} && \ + tar zxf ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz -C ${CCACHE_DIRECTORY} && \ + rm ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz + if [[ $? == 0 ]];then + echo "found cache" + exit 0 + fi + fi +} + +for BRANCH_NAME in ${BRANCH_NAMES} +do + if [[ "${BRANCH_NAME}" != "HEAD" ]];then + check_ccache ${BRANCH_NAME} + fi +done + +if [[ -n "${CHANGE_BRANCH}" && "${BRANCH_NAME}" =~ "PR-" ]];then + check_ccache ${CHANGE_BRANCH} + check_ccache ${BRANCH_NAME} +fi + +echo "could not download cache" && exit 1 +
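check_ccache.sh probes each branch name decorating HEAD and stops at the first archive that exists, so a PR build can warm up from its source branch before falling back; update_ccache.sh (next) republishes the warmed cache under the current branch. A minimal Python sketch of that first-hit fallback lookup; the branch list, key format, and URL layout here are illustrative assumptions, not the script's real interface:

    import urllib.request

    def find_ccache(base_url, branches, key):
        """Return the URL of the first branch whose ccache archive exists, else None."""
        for branch in branches:
            url = "{}/{}/{}".format(base_url, branch, key)
            request = urllib.request.Request(url, method="HEAD")  # cheap probe, like wget --method HEAD
            try:
                with urllib.request.urlopen(request):
                    return url  # first hit wins, mirroring check_ccache.sh
            except OSError:
                continue  # miss: fall back to the next candidate branch
        return None

    # hypothetical names: prefer the PR source branch, then the PR ref, then master
    hit = find_ccache("https://artifactory.example.com/ccache",
                      ["my-feature", "PR-123", "master"],
                      "ccache-linux-xenial-0123abcd.tar.gz")
    print(hit or "could not download cache")
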
unknown argument" + exit 1 + ;; + esac +done + +if [[ -z "${ARTIFACTORY_URL}" || "${ARTIFACTORY_URL}" == "" ]];then + echo "you have not input ARTIFACTORY_URL !" + exit 1 +fi + +PACKAGE_FILE="ccache-${OS_NAME}-${CODE_NAME}-${BUILD_ENV_DOCKER_IMAGE_ID}.tar.gz" +REMOTE_PACKAGE_PATH="${ARTIFACTORY_URL}/${BRANCH_NAME}" + +ccache --show-stats + +if [[ "${BRANCH_NAME}" != "HEAD" ]];then + echo "Updating ccache package file: ${PACKAGE_FILE}" + tar zcf ./${PACKAGE_FILE} -C ${HOME}/.ccache . + echo "Uploading ccache package file ${PACKAGE_FILE} to ${REMOTE_PACKAGE_PATH}" + curl -u${ARTIFACTORY_USER}:${ARTIFACTORY_PASSWORD} -T ${PACKAGE_FILE} ${REMOTE_PACKAGE_PATH}/${PACKAGE_FILE} + if [[ $? == 0 ]];then + echo "Uploading ccache package file success !" + exit 0 + else + echo "Uploading ccache package file fault !" + exit 1 + fi +fi diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index ba391c2d6b..bfa391f46f 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -38,7 +38,7 @@ endif () set (GIT_BRANCH_NAME_REGEX "[0-9]+\\.[0-9]+\\.[0-9]") MACRO(GET_GIT_BRANCH_NAME GIT_BRANCH_NAME) - execute_process(COMMAND sh "-c" "git log --decorate | head -n 1 | sed 's/.*(\\(.*\\))/\\1/' | sed 's/.* \\(.*\\),.*/\\1/' | sed 's=[a-zA-Z]*\/==g'" + execute_process(COMMAND sh "-c" "git log --decorate | head -n 1 | sed 's/.*(\\(.*\\))/\\1/' | sed 's/.*, //' | sed 's=[a-zA-Z]*\/==g'" OUTPUT_VARIABLE ${GIT_BRANCH_NAME}) if(NOT GIT_BRANCH_NAME MATCHES "${GIT_BRANCH_NAME_REGEX}") execute_process(COMMAND "git" rev-parse --abbrev-ref HEAD OUTPUT_VARIABLE ${GIT_BRANCH_NAME}) @@ -187,7 +187,7 @@ endif () add_custom_target(Clean-All COMMAND ${CMAKE_BUILD_TOOL} clean) if ("${MILVUS_DB_PATH}" STREQUAL "") - set(MILVUS_DB_PATH "/tmp/milvus") + set(MILVUS_DB_PATH "${CMAKE_INSTALL_PREFIX}") endif () if (MILVUS_GPU_VERSION) diff --git a/core/src/cache/Cache.inl b/core/src/cache/Cache.inl index 9ac7ff21e6..9ebec7cfdd 100644 --- a/core/src/cache/Cache.inl +++ b/core/src/cache/Cache.inl @@ -99,8 +99,8 @@ Cache::insert(const std::string& key, const ItemObj& item) { std::lock_guard lock(mutex_); lru_.put(key, item); - SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->Size() << " bytes into cache, usage: " << usage_ - << " bytes"; + SERVER_LOG_DEBUG << "Insert " << key << " size: " << item->Size() << " bytes into cache, usage: " << usage_ + << " bytes," << " capacity: " << capacity_ << " bytes"; } } @@ -115,7 +115,8 @@ Cache::erase(const std::string& key) { const ItemObj& old_item = lru_.get(key); usage_ -= old_item->Size(); - SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size(); + SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size() << " bytes from cache, usage: " << usage_ + << " bytes," << " capacity: " << capacity_ << " bytes"; lru_.erase(key); } diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 1c18c22409..50cf4614ca 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -112,7 +112,7 @@ DBImpl::Stop() { bg_timer_thread_.join(); if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) { - meta_ptr_->CleanUp(); + meta_ptr_->CleanUpShadowFiles(); } // ENGINE_LOG_TRACE << "DB service stop"; @@ -777,11 +777,18 @@ DBImpl::BackgroundCompaction(std::set table_ids) { meta_ptr_->Archive(); - int ttl = 5 * meta::M_SEC; // default: file will be deleted after 5 minutes - if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { - ttl = meta::D_SEC; + { + uint64_t ttl = 10 * meta::SECOND; // default: file data will be erase from cache after few seconds + 
meta_ptr_->CleanUpCacheWithTTL(ttl); + } + + { + uint64_t ttl = 5 * meta::M_SEC; // default: file will be deleted after a few minutes + if (options_.mode_ == DBOptions::MODE::CLUSTER_WRITABLE) { + ttl = meta::D_SEC; + } + meta_ptr_->CleanUpFilesWithTTL(ttl); } - meta_ptr_->CleanUpFilesWithTTL(ttl); // ENGINE_LOG_TRACE << " Background compaction thread exit"; } diff --git a/core/src/db/engine/ExecutionEngineImpl.cpp b/core/src/db/engine/ExecutionEngineImpl.cpp index c0ab4e829e..0f92b34eca 100644 --- a/core/src/db/engine/ExecutionEngineImpl.cpp +++ b/core/src/db/engine/ExecutionEngineImpl.cpp @@ -257,6 +257,11 @@ ExecutionEngineImpl::PhysicalSize() const { Status ExecutionEngineImpl::Serialize() { auto status = write_index(index_, location_); + + // here we reset index size by file size, + // since some index types (such as SQ8) become smaller after serialization + index_->set_size(PhysicalSize()); + return status; } diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index 52fe86fe69..bf46f02fea 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -118,9 +118,13 @@ class Meta { Archive() = 0; virtual Status - CleanUp() = 0; + CleanUpShadowFiles() = 0; - virtual Status CleanUpFilesWithTTL(uint16_t) = 0; + virtual Status + CleanUpCacheWithTTL(uint64_t seconds) = 0; + + virtual Status + CleanUpFilesWithTTL(uint64_t seconds) = 0; virtual Status DropAll() = 0; diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index 6d13cad248..2fb5eb0f3c 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -20,6 +20,7 @@ #include "db/IDGenerator.h" #include "db/Utils.h" #include "metrics/Metrics.h" +#include "utils/CommonUtil.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/StringHelpFunctions.h" @@ -292,7 +293,7 @@ MySQLMetaImpl::Initialize() { // step 5: create meta tables try { if (mode_ != DBOptions::MODE::CLUSTER_READONLY) { - CleanUp(); + CleanUpShadowFiles(); } { @@ -1710,7 +1711,7 @@ MySQLMetaImpl::Size(uint64_t& result) { } Status -MySQLMetaImpl::CleanUp() { +MySQLMetaImpl::CleanUpShadowFiles() { try { mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); @@ -1752,7 +1753,49 @@ MySQLMetaImpl::CleanUp() { } Status -MySQLMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { +MySQLMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { + auto now = utils::GetMicroSecTimeStamp(); + + // erase deleted/backup files from cache + try { + server::MetricCollector metric; + + mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); + + if (connectionPtr == nullptr) { + return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); + } + + mysqlpp::Query cleanUpFilesWithTTLQuery = connectionPtr->query(); + cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date" + << " FROM " << META_TABLEFILES << " WHERE file_type IN (" + << std::to_string(TableFileSchema::TO_DELETE) << "," + << std::to_string(TableFileSchema::BACKUP) << ")" + << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; + + mysqlpp::StoreQueryResult res = cleanUpFilesWithTTLQuery.store(); + + TableFileSchema table_file; + std::vector idsToDelete; + + for (auto& resRow : res) { + table_file.id_ = resRow["id"]; // implicit conversion + resRow["table_id"].to_string(table_file.table_id_); + resRow["file_id"].to_string(table_file.file_id_); + table_file.date_ = resRow["date"]; + + utils::GetTableFilePath(options_, table_file); + server::CommonUtil::EraseFromCache(table_file.location_); + } + } catch (std::exception& e) { + return HandleException("GENERAL ERROR WHEN CLEANING UP FILES WITH TTL", e.what()); + } + + return Status::OK(); +}
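Both meta backends apply the same rule: timestamps come from utils::GetMicroSecTimeStamp(), the TTL arrives in seconds, so the cutoff is now - seconds * US_PS, and every TO_DELETE/BACKUP file older than the cutoff has its location erased from the CPU and GPU caches. A small Python sketch of that selection; the record layout and the numeric type codes are illustrative assumptions:

    import time

    US_PS = 1_000_000  # microseconds per second, matching the C++ constant of the same name

    # hypothetical numeric codes for the file states named in the meta schema
    TO_DELETE, BACKUP = 4, 5

    def expired_locations(rows, ttl_seconds):
        """Yield locations of TO_DELETE/BACKUP files whose TTL has elapsed.

        `rows` stands in for the meta-table records: dicts with 'file_type',
        'updated_time' (a microsecond timestamp), and 'location'.
        """
        cutoff = int(time.time() * US_PS) - ttl_seconds * US_PS
        for row in rows:
            if row["file_type"] in (TO_DELETE, BACKUP) and row["updated_time"] < cutoff:
                yield row["location"]  # the caller erases this key from the CPU/GPU caches

    rows = [{"file_type": TO_DELETE, "updated_time": 0, "location": "/db/tables/t1/f1"}]
    for location in expired_locations(rows, ttl_seconds=10):
        print("erase from cache:", location)
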
+ +Status +MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { auto now = utils::GetMicroSecTimeStamp(); std::set table_ids; diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index dd882fca2e..e7697316af 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -117,10 +117,13 @@ class MySQLMetaImpl : public Meta { Size(uint64_t& result) override; Status - CleanUp() override; + CleanUpShadowFiles() override; Status - CleanUpFilesWithTTL(uint16_t seconds) override; + CleanUpCacheWithTTL(uint64_t seconds) override; + + Status + CleanUpFilesWithTTL(uint64_t seconds) override; Status DropAll() override; diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index 74460c1b4d..24d5d78bad 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -20,6 +20,7 @@ #include "db/IDGenerator.h" #include "db/Utils.h" #include "metrics/Metrics.h" +#include "utils/CommonUtil.h" #include "utils/Exception.h" #include "utils/Log.h" #include "utils/StringHelpFunctions.h" @@ -154,7 +155,7 @@ SqliteMetaImpl::Initialize() { ConnectorPtr->open_forever(); // thread safe option ConnectorPtr->pragma.journal_mode(journal_mode::WAL); // WAL => write ahead log - CleanUp(); + CleanUpShadowFiles(); return Status::OK(); } @@ -1231,7 +1232,7 @@ SqliteMetaImpl::Size(uint64_t& result) { } Status -SqliteMetaImpl::CleanUp() { +SqliteMetaImpl::CleanUpShadowFiles() { try { server::MetricCollector metric; @@ -1269,7 +1270,51 @@ SqliteMetaImpl::CleanUp() { } Status -SqliteMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) { +SqliteMetaImpl::CleanUpCacheWithTTL(uint64_t seconds) { + auto now = utils::GetMicroSecTimeStamp(); + + // erase deleted/backup files from cache + try { + server::MetricCollector metric; + + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + + std::vector file_types = { + (int)TableFileSchema::TO_DELETE, + (int)TableFileSchema::BACKUP, + }; + + auto files = ConnectorPtr->select(columns(&TableFileSchema::id_, + &TableFileSchema::table_id_, + &TableFileSchema::file_id_, + &TableFileSchema::date_), + where( + in(&TableFileSchema::file_type_, file_types) + and + c(&TableFileSchema::updated_time_) + < now - seconds * US_PS)); + + for (auto& file : files) { + TableFileSchema table_file; + table_file.id_ = std::get<0>(file); + table_file.table_id_ = std::get<1>(file); + table_file.file_id_ = std::get<2>(file); + table_file.date_ = std::get<3>(file); + + utils::GetTableFilePath(options_, table_file); + server::CommonUtil::EraseFromCache(table_file.location_); + } + + } catch (std::exception& e) { + return HandleException("Encounter exception when clean cache", e.what()); + } + + return Status::OK(); +} + +Status +SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds) { auto now = utils::GetMicroSecTimeStamp(); std::set table_ids; diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index 8e821d81de..5581efe361 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -117,10 +117,13 @@ class SqliteMetaImpl : public Meta { Archive() override; Status - CleanUp() override; + CleanUpShadowFiles() override; Status - CleanUpFilesWithTTL(uint16_t 
seconds) override; + CleanUpCacheWithTTL(uint64_t seconds) override; + + Status + CleanUpFilesWithTTL(uint64_t seconds) override; Status DropAll() override; diff --git a/core/src/grpc/README.md b/core/src/grpc/README.md index 44c4e90841..6a3fe1157c 100644 --- a/core/src/grpc/README.md +++ b/core/src/grpc/README.md @@ -1,4 +1,4 @@ -We manually change two APIs in "milvus.pd.h": +We manually change two APIs in "milvus.pb.h": add_vector_data() add_row_id_array() add_ids() diff --git a/core/src/index/knowhere/CMakeLists.txt b/core/src/index/knowhere/CMakeLists.txt index 285461bdef..a7d3966481 100644 --- a/core/src/index/knowhere/CMakeLists.txt +++ b/core/src/index/knowhere/CMakeLists.txt @@ -38,6 +38,7 @@ set(index_srcs knowhere/index/vector_index/nsg/NSG.cpp knowhere/index/vector_index/nsg/NSGIO.cpp knowhere/index/vector_index/nsg/NSGHelper.cpp + knowhere/index/vector_index/nsg/Distance.cpp knowhere/index/vector_index/IndexIVFSQ.cpp knowhere/index/vector_index/IndexIVFPQ.cpp knowhere/index/vector_index/FaissBaseIndex.cpp diff --git a/core/src/index/knowhere/knowhere/index/vector_index/IndexNSG.cpp b/core/src/index/knowhere/knowhere/index/vector_index/IndexNSG.cpp index 204819517a..3cf0122233 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/IndexNSG.cpp +++ b/core/src/index/knowhere/knowhere/index/vector_index/IndexNSG.cpp @@ -115,10 +115,6 @@ NSG::Train(const DatasetPtr& dataset, const Config& config) { build_cfg->CheckValid(); // throw exception } - if (build_cfg->metric_type != METRICTYPE::L2) { - KNOWHERE_THROW_MSG("NSG not support this kind of metric type"); - } - // TODO(linxj): dev IndexFactory, support more IndexType #ifdef MILVUS_GPU_VERSION auto preprocess_index = std::make_shared(build_cfg->gpu_id); diff --git a/core/src/index/knowhere/knowhere/index/vector_index/nsg/Distance.cpp b/core/src/index/knowhere/knowhere/index/vector_index/nsg/Distance.cpp new file mode 100644 index 0000000000..8508b65218 --- /dev/null +++ b/core/src/index/knowhere/knowhere/index/vector_index/nsg/Distance.cpp @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
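With inner product now supported, NSG::Train above no longer throws for non-L2 metrics; the body of the new Distance.cpp follows, implementing both kernels with AVX/SSE fast paths plus a scalar fallback. The quantities computed are just squared L2 distance and inner product; here is a plain Python reference, including the 4-wide unrolling the scalar fallback uses (illustrative, not project code):

    def l2_sqr(a, b):
        """Squared Euclidean distance, unrolled 4 elements at a time."""
        n = len(a) - len(a) % 4
        result = 0.0
        for i in range(0, n, 4):  # process 4 elements per loop, as the C++ fallback does
            d0, d1 = a[i] - b[i], a[i + 1] - b[i + 1]
            d2, d3 = a[i + 2] - b[i + 2], a[i + 3] - b[i + 3]
            result += d0 * d0 + d1 * d1 + d2 * d2 + d3 * d3
        for i in range(n, len(a)):  # last 0-3 elements
            d = a[i] - b[i]
            result += d * d
        return result

    def inner_product(a, b):
        """Dot product; note that for IP a larger score means more similar."""
        return sum(x * y for x, y in zip(a, b))

    assert l2_sqr([1.0, 2.0, 3.0, 4.0, 5.0], [1.0, 2.0, 3.0, 4.0, 3.0]) == 4.0
    assert inner_product([1.0, 2.0], [3.0, 4.0]) == 11.0
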
+ +#pragma once + +#include + +#include "knowhere/index/vector_index/nsg/Distance.h" + +namespace knowhere { +namespace algo { + +float +DistanceL2::Compare(const float* a, const float* b, unsigned size) const { + float result = 0; + +#ifdef __GNUC__ +#ifdef __AVX__ + +#define AVX_L2SQR(addr1, addr2, dest, tmp1, tmp2) \ + tmp1 = _mm256_loadu_ps(addr1); \ + tmp2 = _mm256_loadu_ps(addr2); \ + tmp1 = _mm256_sub_ps(tmp1, tmp2); \ + tmp1 = _mm256_mul_ps(tmp1, tmp1); \ + dest = _mm256_add_ps(dest, tmp1); + + __m256 sum; + __m256 l0, l1; + __m256 r0, r1; + unsigned D = (size + 7) & ~7U; + unsigned DR = D % 16; + unsigned DD = D - DR; + const float* l = a; + const float* r = b; + const float* e_l = l + DD; + const float* e_r = r + DD; + float unpack[8] __attribute__((aligned(32))) = {0, 0, 0, 0, 0, 0, 0, 0}; + + sum = _mm256_loadu_ps(unpack); + if (DR) { + AVX_L2SQR(e_l, e_r, sum, l0, r0); + } + + for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) { + AVX_L2SQR(l, r, sum, l0, r0); + AVX_L2SQR(l + 8, r + 8, sum, l1, r1); + } + _mm256_storeu_ps(unpack, sum); + result = unpack[0] + unpack[1] + unpack[2] + unpack[3] + unpack[4] + unpack[5] + unpack[6] + unpack[7]; + +#else +#ifdef __SSE2__ +#define SSE_L2SQR(addr1, addr2, dest, tmp1, tmp2) \ + tmp1 = _mm_load_ps(addr1); \ + tmp2 = _mm_load_ps(addr2); \ + tmp1 = _mm_sub_ps(tmp1, tmp2); \ + tmp1 = _mm_mul_ps(tmp1, tmp1); \ + dest = _mm_add_ps(dest, tmp1); + + __m128 sum; + __m128 l0, l1, l2, l3; + __m128 r0, r1, r2, r3; + unsigned D = (size + 3) & ~3U; + unsigned DR = D % 16; + unsigned DD = D - DR; + const float* l = a; + const float* r = b; + const float* e_l = l + DD; + const float* e_r = r + DD; + float unpack[4] __attribute__((aligned(16))) = {0, 0, 0, 0}; + + sum = _mm_load_ps(unpack); + switch (DR) { + case 12: + SSE_L2SQR(e_l + 8, e_r + 8, sum, l2, r2); + case 8: + SSE_L2SQR(e_l + 4, e_r + 4, sum, l1, r1); + case 4: + SSE_L2SQR(e_l, e_r, sum, l0, r0); + default: + break; + } + for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) { + SSE_L2SQR(l, r, sum, l0, r0); + SSE_L2SQR(l + 4, r + 4, sum, l1, r1); + SSE_L2SQR(l + 8, r + 8, sum, l2, r2); + SSE_L2SQR(l + 12, r + 12, sum, l3, r3); + } + _mm_storeu_ps(unpack, sum); + result += unpack[0] + unpack[1] + unpack[2] + unpack[3]; + +// normal distance +#else + + float diff0, diff1, diff2, diff3; + const float* last = a + size; + const float* unroll_group = last - 3; + + /* Process 4 items with each loop for efficiency. */ + while (a < unroll_group) { + diff0 = a[0] - b[0]; + diff1 = a[1] - b[1]; + diff2 = a[2] - b[2]; + diff3 = a[3] - b[3]; + result += diff0 * diff0 + diff1 * diff1 + diff2 * diff2 + diff3 * diff3; + a += 4; + b += 4; + } + /* Process last 0-3 elements. Not needed for standard vector lengths. 
*/ + while (a < last) { + diff0 = *a++ - *b++; + result += diff0 * diff0; + } +#endif +#endif +#endif + + return result; +} + +float +DistanceIP::Compare(const float* a, const float* b, unsigned size) const { + float result = 0; + +#ifdef __GNUC__ +#ifdef __AVX__ +#define AVX_DOT(addr1, addr2, dest, tmp1, tmp2) \ + tmp1 = _mm256_loadu_ps(addr1); \ + tmp2 = _mm256_loadu_ps(addr2); \ + tmp1 = _mm256_mul_ps(tmp1, tmp2); \ + dest = _mm256_add_ps(dest, tmp1); + + __m256 sum; + __m256 l0, l1; + __m256 r0, r1; + unsigned D = (size + 7) & ~7U; + unsigned DR = D % 16; + unsigned DD = D - DR; + const float* l = a; + const float* r = b; + const float* e_l = l + DD; + const float* e_r = r + DD; + float unpack[8] __attribute__((aligned(32))) = {0, 0, 0, 0, 0, 0, 0, 0}; + + sum = _mm256_loadu_ps(unpack); + if (DR) { + AVX_DOT(e_l, e_r, sum, l0, r0); + } + + for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) { + AVX_DOT(l, r, sum, l0, r0); + AVX_DOT(l + 8, r + 8, sum, l1, r1); + } + _mm256_storeu_ps(unpack, sum); + result = unpack[0] + unpack[1] + unpack[2] + unpack[3] + unpack[4] + unpack[5] + unpack[6] + unpack[7]; + +#else +#ifdef __SSE2__ +#define SSE_DOT(addr1, addr2, dest, tmp1, tmp2) \ + tmp1 = _mm_loadu_ps(addr1); \ + tmp2 = _mm_loadu_ps(addr2); \ + tmp1 = _mm_mul_ps(tmp1, tmp2); \ + dest = _mm_add_ps(dest, tmp1); + __m128 sum; + __m128 l0, l1, l2, l3; + __m128 r0, r1, r2, r3; + unsigned D = (size + 3) & ~3U; + unsigned DR = D % 16; + unsigned DD = D - DR; + const float* l = a; + const float* r = b; + const float* e_l = l + DD; + const float* e_r = r + DD; + float unpack[4] __attribute__((aligned(16))) = {0, 0, 0, 0}; + + sum = _mm_load_ps(unpack); + switch (DR) { + case 12: + SSE_DOT(e_l + 8, e_r + 8, sum, l2, r2); + case 8: + SSE_DOT(e_l + 4, e_r + 4, sum, l1, r1); + case 4: + SSE_DOT(e_l, e_r, sum, l0, r0); + default: + break; + } + for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) { + SSE_DOT(l, r, sum, l0, r0); + SSE_DOT(l + 4, r + 4, sum, l1, r1); + SSE_DOT(l + 8, r + 8, sum, l2, r2); + SSE_DOT(l + 12, r + 12, sum, l3, r3); + } + _mm_storeu_ps(unpack, sum); + result += unpack[0] + unpack[1] + unpack[2] + unpack[3]; +#else + + float dot0, dot1, dot2, dot3; + const float* last = a + size; + const float* unroll_group = last - 3; + + /* Process 4 items with each loop for efficiency. */ + while (a < unroll_group) { + dot0 = a[0] * b[0]; + dot1 = a[1] * b[1]; + dot2 = a[2] * b[2]; + dot3 = a[3] * b[3]; + result += dot0 + dot1 + dot2 + dot3; + a += 4; + b += 4; + } + /* Process last 0-3 elements. Not needed for standard vector lengths. */ + while (a < last) { + result += *a++ * *b++; + } +#endif +#endif +#endif + return result; +} + +//#include +// float +// DistanceL2::Compare(const float* a, const float* b, unsigned size) const { +// return faiss::fvec_L2sqr(a,b,size); +//} +// +// float +// DistanceIP::Compare(const float* a, const float* b, unsigned size) const { +// return faiss::fvec_inner_product(a,b,size); +//} + +} // namespace algo +} // namespace knowhere
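Distance.h (next) declares the strategy hierarchy those kernels implement: an abstract Distance with concrete DistanceL2 and DistanceIP, selected once in NsgIndex's constructor so the graph-search loops simply call distance_->Compare(...). The same constructor-time dispatch sketched in Python, as a pattern illustration only:

    from abc import ABC, abstractmethod

    class Distance(ABC):
        @abstractmethod
        def compare(self, a, b): ...

    class DistanceL2(Distance):
        def compare(self, a, b):
            return sum((x - y) ** 2 for x, y in zip(a, b))

    class DistanceIP(Distance):
        def compare(self, a, b):
            return sum(x * y for x, y in zip(a, b))

    # selection happens once, as NsgIndex::NsgIndex does with its switch on the metric
    METRICS = {"L2": DistanceL2, "IP": DistanceIP}
    distance = METRICS["IP"]()
    print(distance.compare([1.0, 2.0], [3.0, 4.0]))  # 11.0

One virtual call per vector pair is cheap relative to the SIMD loop it guards, which is why the indirection costs little here.
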
diff --git a/core/src/index/knowhere/knowhere/index/vector_index/nsg/Distance.h b/core/src/index/knowhere/knowhere/index/vector_index/nsg/Distance.h new file mode 100644 index 0000000000..df24ca8725 --- /dev/null +++ b/core/src/index/knowhere/knowhere/index/vector_index/nsg/Distance.h @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +namespace knowhere { +namespace algo { + +struct Distance { + virtual float + Compare(const float* a, const float* b, unsigned size) const = 0; +}; + +struct DistanceL2 : public Distance { + float + Compare(const float* a, const float* b, unsigned size) const override; +}; + +struct DistanceIP : public Distance { + float + Compare(const float* a, const float* b, unsigned size) const override; +}; + +} // namespace algo +} // namespace knowhere diff --git a/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSG.cpp b/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSG.cpp index b4e00e57b7..e9e65b1191 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSG.cpp +++ b/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSG.cpp @@ -35,17 +35,24 @@ namespace knowhere { namespace algo { -NsgIndex::NsgIndex(const size_t& dimension, const size_t& n, MetricType metric) +NsgIndex::NsgIndex(const size_t& dimension, const size_t& n, METRICTYPE metric) : dimension(dimension), ntotal(n), metric_type(metric) { + switch (metric) { + case METRICTYPE::L2: + distance_ = new DistanceL2; + break; + case METRICTYPE::IP: + distance_ = new DistanceIP; + break; + } } NsgIndex::~NsgIndex() { delete[] ori_data_; delete[] ids_; + delete distance_; } -// void NsgIndex::Build(size_t nb, const float *data, const BuildParam &parameters) { -//} void NsgIndex::Build_with_ids(size_t nb, const float* data, const int64_t* ids, const BuildParams& parameters) { TimeRecorder rc("NSG"); @@ -126,7 +133,7 @@ NsgIndex::InitNavigationPoint() { //>> Debug code ///// - // float r1 = calculate(center, ori_data_ + navigation_point * dimension, dimension); + // float r1 = distance_->Compare(center, ori_data_ + navigation_point * dimension, dimension); // assert(r1 == resset[0].distance); ///// } @@ -180,7 +187,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector& resset, std::v continue; } - float dist = calculate(ori_data_ + dimension * id, query, dimension); + float dist = distance_->Compare(ori_data_ + dimension * id, query, dimension); resset[i] = Neighbor(id, dist, false); ///////////// difference from other GetNeighbors /////////////// @@ -205,7 +212,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector& resset, std::v continue; has_calculated_dist[id] = true; - float dist = calculate(query, ori_data_ + dimension * id, dimension); + float dist = distance_->Compare(query, ori_data_ + dimension * id, dimension); Neighbor nn(id, dist, false); fullset.push_back(nn); @@ -278,7 +285,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector& resset, std::v continue; } - float dist = calculate(ori_data_ + id * dimension, query, dimension); + float dist = distance_->Compare(ori_data_ + id * dimension, query, dimension); resset[i] = Neighbor(id, dist, false); } std::sort(resset.begin(), 
resset.end()); // sort by distance @@ -299,7 +306,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector& resset, std::v continue; has_calculated_dist[id] = true; - float dist = calculate(ori_data_ + dimension * id, query, dimension); + float dist = distance_->Compare(ori_data_ + dimension * id, query, dimension); Neighbor nn(id, dist, false); fullset.push_back(nn); @@ -371,7 +378,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector& resset, Graph& continue; } - float dist = calculate(ori_data_ + id * dimension, query, dimension); + float dist = distance_->Compare(ori_data_ + id * dimension, query, dimension); resset[i] = Neighbor(id, dist, false); } std::sort(resset.begin(), resset.end()); // sort by distance @@ -399,7 +406,7 @@ NsgIndex::GetNeighbors(const float* query, std::vector& resset, Graph& continue; has_calculated_dist[id] = true; - float dist = calculate(query, ori_data_ + dimension * id, dimension); + float dist = distance_->Compare(query, ori_data_ + dimension * id, dimension); if (dist >= resset[buffer_size - 1].distance) continue; @@ -449,7 +456,7 @@ NsgIndex::Link() { //>> Debug code ///// - // float r1 = calculate(ori_data_ + n * dimension, ori_data_ + temp[0].id * dimension, dimension); + // float r1 = distance_->Compare(ori_data_ + n * dimension, ori_data_ + temp[0].id * dimension, dimension); // assert(r1 == temp[0].distance); ///// SyncPrune(n, fullset, flags, cut_graph_dist); @@ -496,7 +503,7 @@ NsgIndex::SyncPrune(size_t n, std::vector& pool, boost::dynamic_bitset auto id = knng[n][i]; if (has_calculated[id]) continue; - float dist = calculate(ori_data_ + dimension * n, ori_data_ + dimension * id, dimension); + float dist = distance_->Compare(ori_data_ + dimension * n, ori_data_ + dimension * id, dimension); pool.emplace_back(Neighbor(id, dist, true)); } @@ -613,7 +620,8 @@ NsgIndex::SelectEdge(unsigned& cursor, std::vector& sort_pool, std::ve auto& p = pool[cursor]; bool should_link = true; for (size_t t = 0; t < result.size(); ++t) { - float dist = calculate(ori_data_ + dimension * result[t].id, ori_data_ + dimension * p.id, dimension); + float dist = + distance_->Compare(ori_data_ + dimension * result[t].id, ori_data_ + dimension * p.id, dimension); if (dist < p.distance) { should_link = false; diff --git a/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSG.h b/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSG.h index 160c076e45..5dd128610f 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSG.h +++ b/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSG.h @@ -22,18 +22,16 @@ #include #include + +#include "Distance.h" #include "Neighbor.h" +#include "knowhere/common/Config.h" namespace knowhere { namespace algo { using node_t = int64_t; -enum class MetricType { - METRIC_INNER_PRODUCT = 0, - METRIC_L2 = 1, -}; - struct BuildParams { size_t search_length; size_t out_degree; @@ -50,7 +48,8 @@ class NsgIndex { public: size_t dimension; size_t ntotal; // total nb of indexed vectors - MetricType metric_type; // L2 | IP + METRICTYPE metric_type; // L2 | IP + Distance* distance_; float* ori_data_; int64_t* ids_; // TODO: support different type @@ -69,7 +68,7 @@ class NsgIndex { size_t out_degree; public: - explicit NsgIndex(const size_t& dimension, const size_t& n, MetricType metric = MetricType::METRIC_L2); + explicit NsgIndex(const size_t& dimension, const size_t& n, METRICTYPE metric = METRICTYPE::L2); NsgIndex() = default; diff --git a/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGHelper.cpp 
b/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGHelper.cpp index 05e8d18787..dd250570b8 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGHelper.cpp +++ b/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGHelper.cpp @@ -16,7 +16,6 @@ // under the License. #include -#include #include "knowhere/index/vector_index/nsg/NSGHelper.h" @@ -27,9 +26,9 @@ namespace algo { int InsertIntoPool(Neighbor* addr, unsigned K, Neighbor nn) { //>> Fix: Add assert - for (unsigned int i = 0; i < K; ++i) { - assert(addr[i].id != nn.id); - } + // for (unsigned int i = 0; i < K; ++i) { + // assert(addr[i].id != nn.id); + // } // find the location to insert int left = 0, right = K - 1; @@ -68,114 +67,5 @@ InsertIntoPool(Neighbor* addr, unsigned K, Neighbor nn) { return right; } -// TODO: support L2 / IP -float -calculate(const float* a, const float* b, unsigned size) { - float result = 0; - -#ifdef __GNUC__ -#ifdef __AVX__ - -#define AVX_L2SQR(addr1, addr2, dest, tmp1, tmp2) \ - tmp1 = _mm256_loadu_ps(addr1); \ - tmp2 = _mm256_loadu_ps(addr2); \ - tmp1 = _mm256_sub_ps(tmp1, tmp2); \ - tmp1 = _mm256_mul_ps(tmp1, tmp1); \ - dest = _mm256_add_ps(dest, tmp1); - - __m256 sum; - __m256 l0, l1; - __m256 r0, r1; - unsigned D = (size + 7) & ~7U; - unsigned DR = D % 16; - unsigned DD = D - DR; - const float* l = a; - const float* r = b; - const float* e_l = l + DD; - const float* e_r = r + DD; - float unpack[8] __attribute__((aligned(32))) = {0, 0, 0, 0, 0, 0, 0, 0}; - - sum = _mm256_loadu_ps(unpack); - if (DR) { - AVX_L2SQR(e_l, e_r, sum, l0, r0); - } - - for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) { - AVX_L2SQR(l, r, sum, l0, r0); - AVX_L2SQR(l + 8, r + 8, sum, l1, r1); - } - _mm256_storeu_ps(unpack, sum); - result = unpack[0] + unpack[1] + unpack[2] + unpack[3] + unpack[4] + unpack[5] + unpack[6] + unpack[7]; - -#else -#ifdef __SSE2__ -#define SSE_L2SQR(addr1, addr2, dest, tmp1, tmp2) \ - tmp1 = _mm_load_ps(addr1); \ - tmp2 = _mm_load_ps(addr2); \ - tmp1 = _mm_sub_ps(tmp1, tmp2); \ - tmp1 = _mm_mul_ps(tmp1, tmp1); \ - dest = _mm_add_ps(dest, tmp1); - - __m128 sum; - __m128 l0, l1, l2, l3; - __m128 r0, r1, r2, r3; - unsigned D = (size + 3) & ~3U; - unsigned DR = D % 16; - unsigned DD = D - DR; - const float* l = a; - const float* r = b; - const float* e_l = l + DD; - const float* e_r = r + DD; - float unpack[4] __attribute__((aligned(16))) = {0, 0, 0, 0}; - - sum = _mm_load_ps(unpack); - switch (DR) { - case 12: - SSE_L2SQR(e_l + 8, e_r + 8, sum, l2, r2); - case 8: - SSE_L2SQR(e_l + 4, e_r + 4, sum, l1, r1); - case 4: - SSE_L2SQR(e_l, e_r, sum, l0, r0); - default: - break; - } - for (unsigned i = 0; i < DD; i += 16, l += 16, r += 16) { - SSE_L2SQR(l, r, sum, l0, r0); - SSE_L2SQR(l + 4, r + 4, sum, l1, r1); - SSE_L2SQR(l + 8, r + 8, sum, l2, r2); - SSE_L2SQR(l + 12, r + 12, sum, l3, r3); - } - _mm_storeu_ps(unpack, sum); - result += unpack[0] + unpack[1] + unpack[2] + unpack[3]; - -// nomal distance -#else - - float diff0, diff1, diff2, diff3; - const float* last = a + size; - const float* unroll_group = last - 3; - - /* Process 4 items with each loop for efficiency. */ - while (a < unroll_group) { - diff0 = a[0] - b[0]; - diff1 = a[1] - b[1]; - diff2 = a[2] - b[2]; - diff3 = a[3] - b[3]; - result += diff0 * diff0 + diff1 * diff1 + diff2 * diff2 + diff3 * diff3; - a += 4; - b += 4; - } - /* Process last 0-3 pixels. Not needed for standard vector lengths. 
*/ - while (a < last) { - diff0 = *a++ - *b++; - result += diff0 * diff0; - } -#endif -#endif -#endif - - return result; -} - -} // namespace algo +}; // namespace algo } // namespace knowhere diff --git a/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGHelper.h b/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGHelper.h index 5007cf019c..a909dd84e7 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGHelper.h +++ b/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGHelper.h @@ -17,21 +17,13 @@ #pragma once -#include -#include - -#include - -#include "NSG.h" -#include "knowhere/common/Config.h" +#include "Neighbor.h" namespace knowhere { namespace algo { extern int InsertIntoPool(Neighbor* addr, unsigned K, Neighbor nn); -extern float -calculate(const float* a, const float* b, unsigned size); } // namespace algo } // namespace knowhere diff --git a/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGIO.h b/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGIO.h index 12913b69df..9f2a42c4ad 100644 --- a/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGIO.h +++ b/core/src/index/knowhere/knowhere/index/vector_index/nsg/NSGIO.h @@ -18,7 +18,6 @@ #pragma once #include "NSG.h" -#include "knowhere/index/vector_index/IndexIVF.h" #include "knowhere/index/vector_index/helpers/FaissIO.h" namespace knowhere { @@ -26,6 +25,7 @@ namespace algo { extern void write_index(NsgIndex* index, MemoryIOWriter& writer); + extern NsgIndex* read_index(MemoryIOReader& reader); diff --git a/core/src/index/unittest/test_nsg/test_nsg.cpp b/core/src/index/unittest/test_nsg/test_nsg.cpp index 47c014e691..4722c7e8f6 100644 --- a/core/src/index/unittest/test_nsg/test_nsg.cpp +++ b/core/src/index/unittest/test_nsg/test_nsg.cpp @@ -24,6 +24,8 @@ #ifdef MILVUS_GPU_VERSION #include "knowhere/index/vector_index/helpers/FaissGpuResourceMgr.h" #endif + +#include "knowhere/common/Timer.h" #include "knowhere/index/vector_index/nsg/NSGIO.h" #include "unittest/utils.h" @@ -95,20 +97,19 @@ TEST_F(NSGInterfaceTest, basic_test) { index_->Add(base_dataset, knowhere::Config()); index_->Seal(); }); - - { - // std::cout << "k = 1" << std::endl; - // new_index->Search(GenQuery(1), Config::object{{"k", 1}}); - // new_index->Search(GenQuery(10), Config::object{{"k", 1}}); - // new_index->Search(GenQuery(100), Config::object{{"k", 1}}); - // new_index->Search(GenQuery(1000), Config::object{{"k", 1}}); - // new_index->Search(GenQuery(10000), Config::object{{"k", 1}}); - - // std::cout << "k = 5" << std::endl; - // new_index->Search(GenQuery(1), Config::object{{"k", 5}}); - // new_index->Search(GenQuery(20), Config::object{{"k", 5}}); - // new_index->Search(GenQuery(100), Config::object{{"k", 5}}); - // new_index->Search(GenQuery(300), Config::object{{"k", 5}}); - // new_index->Search(GenQuery(500), Config::object{{"k", 5}}); - } +} + +TEST_F(NSGInterfaceTest, comparetest) { + knowhere::algo::DistanceL2 distanceL2; + knowhere::algo::DistanceIP distanceIP; + + knowhere::TimeRecorder tc("Compare"); + for (int i = 0; i < 1000; ++i) { + distanceL2.Compare(xb.data(), xq.data(), 256); + } + tc.RecordSection("L2"); + for (int i = 0; i < 1000; ++i) { + distanceIP.Compare(xb.data(), xq.data(), 256); + } + tc.RecordSection("IP"); }
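The scheduler changes that follow (BuildMgr.h, TaskTable.cpp, Resource.cpp) split slot handling for CPU index builds: PickToLoad now only peeks at BuildMgr::NumOfAvailable() and leaves the task queued when no slot is free, while the slot is actually taken in the resource loader just before the build task is loaded. A Python sketch of that peek-then-take split around a counted gate; names are illustrative, not Milvus code:

    import threading

    class BuildMgr:
        """Counted gate for concurrent CPU index builds (illustrative)."""
        def __init__(self, slots):
            self.available = slots
            self.lock = threading.Lock()

        def num_available(self):
            return self.available          # cheap peek used while picking tasks

        def take(self):
            with self.lock:
                self.available -= 1        # consumed when a build task actually loads

        def put(self):
            with self.lock:
                self.available += 1

    mgr = BuildMgr(slots=1)

    def pick_to_load(task):
        if task == "build" and mgr.num_available() < 1:
            return None                    # leave the task queued, as PickToLoad does
        return task

    def loader(task):
        if task == "build":
            mgr.take()                     # the Resource loader takes the slot here
        # ... loading and execution would run next ...

    task = pick_to_load("build")
    if task:
        loader(task)

The peek and the take are separate steps, so the count is advisory rather than a hard reservation; the trade-off is that a slot is never lost to a task that is merely being considered.
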
diff --git a/core/src/scheduler/BuildMgr.h b/core/src/scheduler/BuildMgr.h index 805c01aafd..3c466421a0 100644 --- a/core/src/scheduler/BuildMgr.h +++ b/core/src/scheduler/BuildMgr.h @@ -55,6 +55,11 @@ } } + int64_t + NumOfAvailable() { + return available_; + } + private: std::int64_t available_; std::mutex mutex_; }; diff --git a/core/src/scheduler/TaskTable.cpp b/core/src/scheduler/TaskTable.cpp index 425eb0ab06..1bb6525215 100644 --- a/core/src/scheduler/TaskTable.cpp +++ b/core/src/scheduler/TaskTable.cpp @@ -178,7 +178,8 @@ TaskTable::PickToLoad(uint64_t limit) { // if task is a build index task, limit it if (task->Type() == TaskType::BuildIndexTask && task->path().Current() == "cpu") { - if (not BuildMgrInst::GetInstance()->Take()) { + if (BuildMgrInst::GetInstance()->NumOfAvailable() < 1) { + SERVER_LOG_WARNING << "BuildMgr does not have an available slot for building index"; continue; } } diff --git a/core/src/scheduler/resource/Resource.cpp b/core/src/scheduler/resource/Resource.cpp index 8cf03275f7..596d7da468 100644 --- a/core/src/scheduler/resource/Resource.cpp +++ b/core/src/scheduler/resource/Resource.cpp @@ -178,6 +178,10 @@ Resource::loader_function() { if (task_item == nullptr) { break; } + if (task_item->task->Type() == TaskType::BuildIndexTask && name() == "cpu") { + BuildMgrInst::GetInstance()->Take(); + SERVER_LOG_DEBUG << name() << " load BuildIndexTask"; + } LoadFile(task_item->task); task_item->Loaded(); if (task_item->from) { @@ -208,7 +212,6 @@ Resource::executor_function() { if (task_item == nullptr) { break; } - auto start = get_current_timestamp(); Process(task_item->task); auto finish = get_current_timestamp(); diff --git a/core/src/utils/CommonUtil.cpp b/core/src/utils/CommonUtil.cpp index 26e43619fb..cdfae8f1e5 100644 --- a/core/src/utils/CommonUtil.cpp +++ b/core/src/utils/CommonUtil.cpp @@ -16,6 +16,9 @@ // under the License. #include "utils/CommonUtil.h" +#include "cache/CpuCacheMgr.h" +#include "cache/GpuCacheMgr.h" +#include "server/Config.h" #include "utils/Log.h" #include @@ -27,6 +30,7 @@ #include #include #include +#include #include "boost/filesystem.hpp" @@ -222,5 +226,24 @@ CommonUtil::ConvertTime(tm time_struct, time_t& time_integer) { time_integer = mktime(&time_struct); } +void +CommonUtil::EraseFromCache(const std::string& item_key) { + if (item_key.empty()) { + SERVER_LOG_ERROR << "Empty key cannot be erased from cache"; + return; + } + + cache::CpuCacheMgr::GetInstance()->EraseItem(item_key); + +#ifdef MILVUS_GPU_VERSION + server::Config& config = server::Config::GetInstance(); + std::vector gpus; + Status s = config.GetGpuResourceConfigSearchResources(gpus); + for (auto& gpu : gpus) { + cache::GpuCacheMgr::GetInstance(gpu)->EraseItem(item_key); + } +#endif +} + } // namespace server } // namespace milvus diff --git a/core/src/utils/CommonUtil.h b/core/src/utils/CommonUtil.h index 121196986a..39b553d830 100644 --- a/core/src/utils/CommonUtil.h +++ b/core/src/utils/CommonUtil.h @@ -56,6 +56,9 @@ class CommonUtil { ConvertTime(time_t time_integer, tm& time_struct); static void ConvertTime(tm time_struct, time_t& time_integer); + + static void + EraseFromCache(const std::string& item_key); }; } // namespace server diff --git a/core/unittest/db/test_meta.cpp b/core/unittest/db/test_meta.cpp index 143bf39383..b89c73c296 100644 --- a/core/unittest/db/test_meta.cpp +++ b/core/unittest/db/test_meta.cpp @@ -329,7 +329,7 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { status = impl_->CreateTableFile(table_file); table_file.file_type_ = milvus::engine::meta::TableFileSchema::NEW; status = impl_->UpdateTableFile(table_file); - status = impl_->CleanUp(); + status = impl_->CleanUpShadowFiles(); ASSERT_TRUE(status.ok()); status = impl_->DropTable(table_id); diff --git 
a/docker/build_env/cpu/ubuntu16.04/Dockerfile b/docker/build_env/cpu/ubuntu16.04/Dockerfile index 45e2b53938..c44eaf44dc 100644 --- a/docker/build_env/cpu/ubuntu16.04/Dockerfile +++ b/docker/build_env/cpu/ubuntu16.04/Dockerfile @@ -8,7 +8,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget ca-certifi sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list' && \ wget -qO- "https://cmake.org/files/v3.14/cmake-3.14.3-Linux-x86_64.tar.gz" | tar --strip-components=1 -xz -C /usr/local && \ apt-get update && apt-get install -y --no-install-recommends \ - g++ git gfortran lsb-core \ + g++ git gfortran lsb-core ccache \ libboost-serialization-dev libboost-filesystem-dev libboost-system-dev libboost-regex-dev \ curl libtool automake libssl-dev pkg-config libcurl4-openssl-dev python3-pip \ clang-format-6.0 clang-tidy-6.0 \ diff --git a/docker/build_env/cpu/ubuntu18.04/Dockerfile b/docker/build_env/cpu/ubuntu18.04/Dockerfile index 7c76e2ec7a..8e4a90c819 100644 --- a/docker/build_env/cpu/ubuntu18.04/Dockerfile +++ b/docker/build_env/cpu/ubuntu18.04/Dockerfile @@ -8,7 +8,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget ca-certifi sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list' && \ wget -qO- "https://cmake.org/files/v3.14/cmake-3.14.3-Linux-x86_64.tar.gz" | tar --strip-components=1 -xz -C /usr/local && \ apt-get update && apt-get install -y --no-install-recommends \ - g++ git gfortran lsb-core \ + g++ git gfortran lsb-core ccache \ libboost-serialization-dev libboost-filesystem-dev libboost-system-dev libboost-regex-dev \ curl libtool automake libssl-dev pkg-config libcurl4-openssl-dev python3-pip \ clang-format-6.0 clang-tidy-6.0 \ diff --git a/docker/build_env/gpu/ubuntu16.04/Dockerfile b/docker/build_env/gpu/ubuntu16.04/Dockerfile index d35a7dccfd..55190f1f99 100644 --- a/docker/build_env/gpu/ubuntu16.04/Dockerfile +++ b/docker/build_env/gpu/ubuntu16.04/Dockerfile @@ -8,7 +8,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget && \ apt-key add /tmp/GPG-PUB-KEY-INTEL-SW-PRODUCTS-2019.PUB && \ sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list' && \ apt-get update && apt-get install -y --no-install-recommends \ - git flex bison gfortran lsb-core \ + git flex bison gfortran lsb-core ccache \ curl libtool automake libboost1.58-all-dev libssl-dev pkg-config libcurl4-openssl-dev python3-pip \ clang-format-6.0 clang-tidy-6.0 \ lcov mysql-client libmysqlclient-dev intel-mkl-gnu-2019.5-281 intel-mkl-core-2019.5-281 && \ diff --git a/docker/build_env/gpu/ubuntu18.04/Dockerfile b/docker/build_env/gpu/ubuntu18.04/Dockerfile index 9f2f3f55ac..178551049c 100644 --- a/docker/build_env/gpu/ubuntu18.04/Dockerfile +++ b/docker/build_env/gpu/ubuntu18.04/Dockerfile @@ -8,7 +8,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget && \ apt-key add /tmp/GPG-PUB-KEY-INTEL-SW-PRODUCTS-2019.PUB && \ sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list' && \ apt-get update && apt-get install -y --no-install-recommends \ - git flex bison gfortran lsb-core \ + git flex bison gfortran lsb-core ccache \ curl libtool automake libboost-all-dev libssl-dev pkg-config libcurl4-openssl-dev python3-pip \ clang-format-6.0 clang-tidy-6.0 \ lcov mysql-client libmysqlclient-dev intel-mkl-gnu-2019.5-281 intel-mkl-core-2019.5-281 && \ diff --git a/shards/.dockerignore 
b/shards/.dockerignore index e450610057..cb7486f242 100644 --- a/shards/.dockerignore +++ b/shards/.dockerignore @@ -11,3 +11,4 @@ __pycache__ *.md *.yml *.yaml +*/metadata/ diff --git a/shards/Makefile b/shards/Makefile index c8aa6127f8..5127882d6f 100644 --- a/shards/Makefile +++ b/shards/Makefile @@ -13,6 +13,12 @@ clean_deploy: cd all_in_one && docker-compose -f all_in_one.yml down && cd - probe_deploy: docker run --rm --name probe --net=host milvusdb/mishards /bin/bash -c "python all_in_one/probe_test.py" +deploy_m: clean_deploy_m + cd all_in_one_with_mysql && docker-compose -f all_in_one.yml up -d && cd - +clean_deploy_m: + cd all_in_one_with_mysql && docker-compose -f all_in_one.yml down && cd - +probe_deploy_m: + docker run --rm --name probe --net=host milvusdb/mishards /bin/bash -c "python all_in_one_with_mysql/probe_test.py" cluster: cd kubernetes_demo;./start.sh baseup;sleep 10;./start.sh appup;cd - clean_cluster: @@ -26,7 +32,7 @@ probe: docker run --rm --name probe --net=host milvusdb/mishards /bin/bash -c "python all_in_one/probe_test.py --port=${PORT} --host=${HOST}" clean_coverage: rm -rf cov_html -clean: clean_coverage clean_deploy clean_cluster +clean: clean_coverage clean_deploy clean_cluster clean_deploy_m style: pycodestyle --config=. coverage: diff --git a/shards/all_in_one_with_mysql/all_in_one.yml b/shards/all_in_one_with_mysql/all_in_one.yml new file mode 100644 index 0000000000..6619635f28 --- /dev/null +++ b/shards/all_in_one_with_mysql/all_in_one.yml @@ -0,0 +1,77 @@ +version: "2.3" +services: + milvus-mysql: + restart: always + image: mysql:5.7 + volumes: + - ./mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf + - ./metadata:/var/lib/mysql + environment: + MYSQL_ROOT_PASSWORD: 'milvusroot' + MYSQL_DATABASE: 'milvus' + healthcheck: + test: ["CMD", "sleep", "5"] + interval: 1s + timeout: 10s + retries: 2 + + milvus_wr: + runtime: nvidia + restart: always + image: milvusdb/milvus + volumes: + - /tmp/milvus/db:/opt/milvus/db + - ./wr_server.yml:/opt/milvus/conf/server_config.yaml + depends_on: + milvus-mysql: + condition: service_healthy + + milvus_ro: + runtime: nvidia + restart: always + image: milvusdb/milvus + volumes: + - /tmp/milvus/db:/opt/milvus/db + - ./ro_server.yml:/opt/milvus/conf/server_config.yaml + depends_on: + - milvus-mysql + - milvus_wr + + jaeger: + restart: always + image: jaegertracing/all-in-one:1.14 + ports: + - "0.0.0.0:5775:5775/udp" + - "0.0.0.0:16686:16686" + - "0.0.0.0:9441:9441" + environment: + COLLECTOR_ZIPKIN_HTTP_PORT: 9411 + + mishards: + restart: always + image: milvusdb/mishards + ports: + - "0.0.0.0:19531:19531" + - "0.0.0.0:19532:19532" + volumes: + - /tmp/milvus/db:/tmp/milvus/db + # - /tmp/mishards_env:/source/mishards/.env + command: ["python", "mishards/main.py"] + environment: + FROM_EXAMPLE: 'true' + SQLALCHEMY_DATABASE_URI: mysql+pymysql://root:milvusroot@milvus-mysql:3306/milvus?charset=utf8mb4 + DEBUG: 'true' + SERVER_PORT: 19531 + WOSERVER: tcp://milvus_wr:19530 + DISCOVERY_PLUGIN_PATH: static + DISCOVERY_STATIC_HOSTS: milvus_wr,milvus_ro + TRACER_CLASS_NAME: jaeger + TRACING_SERVICE_NAME: mishards-demo + TRACING_REPORTING_HOST: jaeger + TRACING_REPORTING_PORT: 5775 + + depends_on: + - milvus_wr + - milvus_ro + - milvus-mysql + - jaeger diff --git a/shards/all_in_one_with_mysql/mysqld.cnf b/shards/all_in_one_with_mysql/mysqld.cnf new file mode 100644 index 0000000000..5f63676757 --- /dev/null +++ b/shards/all_in_one_with_mysql/mysqld.cnf @@ -0,0 +1,28 @@ +[mysqld] +pid-file = /var/run/mysqld/mysqld.pid +socket = 
/var/run/mysqld/mysqld.sock +datadir = /var/lib/mysql +log-error = /var/log/mysql/error.log +bind-address = 0.0.0.0 +symbolic-links=0 +character-set-server = utf8mb4 +collation-server = utf8mb4_unicode_ci +init_connect='SET NAMES utf8mb4' +skip-character-set-client-handshake = true +max_connections = 1000 +wait_timeout = 31536000 +table_open_cache = 128 +external-locking = FALSE +binlog_cache_size = 1M +max_heap_table_size = 8M +tmp_table_size = 16M +read_rnd_buffer_size = 8M +sort_buffer_size = 8M +join_buffer_size = 8M +thread_cache_size = 32 +query_cache_size = 64M +innodb_buffer_pool_size = 64M +innodb_flush_log_at_trx_commit = 0 +innodb_log_buffer_size = 2M +max_allowed_packet=64M +explicit_defaults_for_timestamp=true \ No newline at end of file diff --git a/shards/all_in_one_with_mysql/probe_test.py b/shards/all_in_one_with_mysql/probe_test.py new file mode 100644 index 0000000000..6250465910 --- /dev/null +++ b/shards/all_in_one_with_mysql/probe_test.py @@ -0,0 +1,25 @@ +from milvus import Milvus + +RED = '\033[0;31m' +GREEN = '\033[0;32m' +ENDC = '\033[0m' + + +def test(host='127.0.0.1', port=19531): + client = Milvus() + try: + status = client.connect(host=host, port=port) + if status.OK(): + print('{}Pass: Connected{}'.format(GREEN, ENDC)) + return 0 + else: + print('{}Error: {}{}'.format(RED, status, ENDC)) + return 1 + except Exception as exc: + print('{}Error: {}{}'.format(RED, exc, ENDC)) + return 1 + + +if __name__ == '__main__': + import fire + fire.Fire(test) diff --git a/shards/all_in_one_with_mysql/ro_server.yml b/shards/all_in_one_with_mysql/ro_server.yml new file mode 100644 index 0000000000..768fafb1a0 --- /dev/null +++ b/shards/all_in_one_with_mysql/ro_server.yml @@ -0,0 +1,42 @@ +server_config: + address: 0.0.0.0 # milvus server ip address (IPv4) + port: 19530 # port range: 1025 ~ 65534 + deploy_mode: cluster_readonly # deployment type: single, cluster_readonly, cluster_writable + time_zone: UTC+8 + +db_config: + primary_path: /opt/milvus # path used to store data and meta + secondary_path: # path used to store data only, split by semicolon + + backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus + # URI format: dialect://username:password@host:port/database + # Keep 'dialect://:@:/', and replace other texts with real values + # Replace 'dialect' with 'mysql' or 'sqlite' + + insert_buffer_size: 1 # GB, maximum insert buffer size allowed + # sum of insert_buffer_size and cpu_cache_capacity cannot exceed total memory + + preload_table: # preload data at startup, '*' means load all tables, empty value means no preload + # you can specify preload tables like this: table1,table2,table3 + +metric_config: + enable_monitor: false # enable monitoring or not + collector: prometheus # prometheus + prometheus_config: + port: 8080 # port prometheus uses to fetch metrics + +cache_config: + cpu_cache_capacity: 4 # GB, CPU memory used for cache + cpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered + gpu_cache_capacity: 1 # GB, GPU memory used for cache + gpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered + cache_insert_data: false # whether to load inserted data into cache + +engine_config: + use_blas_threshold: 800 # if nq < use_blas_threshold, use SSE, faster with fluctuated response times + # if nq >= use_blas_threshold, use OpenBlas, slower with stable response times + +resource_config: + search_resources: # define the GPUs used for search computation, valid value: gpux + - gpu0 + 
index_build_device: gpu0 # GPU used for building index diff --git a/shards/all_in_one_with_mysql/wr_server.yml b/shards/all_in_one_with_mysql/wr_server.yml new file mode 100644 index 0000000000..b2332532e5 --- /dev/null +++ b/shards/all_in_one_with_mysql/wr_server.yml @@ -0,0 +1,41 @@ +server_config: + address: 0.0.0.0 # milvus server ip address (IPv4) + port: 19530 # port range: 1025 ~ 65534 + deploy_mode: cluster_writable # deployment type: single, cluster_readonly, cluster_writable + time_zone: UTC+8 + +db_config: + primary_path: /opt/milvus # path used to store data and meta + secondary_path: # path used to store data only, split by semicolon + + backend_url: mysql://root:milvusroot@milvus-mysql:3306/milvus # URI format: dialect://username:password@host:port/database + # Keep 'dialect://:@:/', and replace other texts with real values + # Replace 'dialect' with 'mysql' or 'sqlite' + + insert_buffer_size: 2 # GB, maximum insert buffer size allowed + # sum of insert_buffer_size and cpu_cache_capacity cannot exceed total memory + + preload_table: # preload data at startup, '*' means load all tables, empty value means no preload + # you can specify preload tables like this: table1,table2,table3 + +metric_config: + enable_monitor: false # enable monitoring or not + collector: prometheus # prometheus + prometheus_config: + port: 8080 # port prometheus uses to fetch metrics + +cache_config: + cpu_cache_capacity: 2 # GB, CPU memory used for cache + cpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered + gpu_cache_capacity: 2 # GB, GPU memory used for cache + gpu_cache_threshold: 0.85 # percentage of data that will be kept when cache cleanup is triggered + cache_insert_data: false # whether to load inserted data into cache + +engine_config: + use_blas_threshold: 800 # if nq < use_blas_threshold, use SSE, faster with fluctuated response times + # if nq >= use_blas_threshold, use OpenBlas, slower with stable response times + +resource_config: + search_resources: # define the GPUs used for search computation, valid value: gpux + - gpu0 + index_build_device: gpu0 # GPU used for building index diff --git a/shards/mishards/db_base.py b/shards/mishards/db_base.py index e55c330352..a80c95aa2f 100644 --- a/shards/mishards/db_base.py +++ b/shards/mishards/db_base.py @@ -28,7 +28,12 @@ class DB: if url.get_backend_name() == 'sqlite': self.engine = create_engine(url) else: - self.engine = create_engine(uri, pool_size, pool_recycle, pool_timeout, pool_pre_ping, echo, max_overflow) + self.engine = create_engine(uri, pool_size=pool_size, + pool_recycle=pool_recycle, + pool_timeout=pool_timeout, + pool_pre_ping=pool_pre_ping, + echo=echo, + max_overflow=max_overflow) self.uri = uri self.url = url diff --git a/shards/mishards/exception_handlers.py b/shards/mishards/exception_handlers.py index c79a6db5a3..e7e0bfc4cb 100644 --- a/shards/mishards/exception_handlers.py +++ b/shards/mishards/exception_handlers.py @@ -24,8 +24,11 @@ def resp_handler(err, error_code): if resp_class == milvus_pb2.VectorIds: return resp_class(status=status, vector_id_array=[]) - if resp_class == milvus_pb2.TopKQueryResultList: - return resp_class(status=status, topk_query_result=[]) + if resp_class == milvus_pb2.TopKQueryResult: + return resp_class(status=status, + row_num=0, + ids=[], + distances=[]) if resp_class == milvus_pb2.TableRowCount: return resp_class(status=status, table_row_count=-1)
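The service_handler.py diff that follows completes the #543 fix begun in exception_handlers.py above: a shard may legitimately return row_num == 0 with empty ids/distances, so the merge loop now skips such results instead of indexing into empty arrays, and the final row_num is zero when nothing was merged. A condensed Python sketch of that merge for a single query (the real handler also tracks statuses and batches by topk):

    def merge_topk(shard_results, topk):
        """Merge per-shard (row_num, ids, distances) into one ascending top-k list."""
        merged = []
        for row_num, ids, distances in shard_results:
            if not row_num:  # empty shard result: nothing to merge (the #543 case)
                continue
            merged.extend(zip(distances, ids))
        merged.sort(key=lambda pair: pair[0])  # ascending: smaller L2 distance is better
        merged = merged[:topk]
        return [i for _, i in merged], [d for d, _ in merged]

    ids, distances = merge_topk([(0, [], []), (2, [7, 9], [0.1, 0.4])], topk=2)
    assert ids == [7, 9] and distances == [0.1, 0.4]
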
fc0ee0fa2b..91e2c7a2a0 100644 --- a/shards/mishards/service_handler.py +++ b/shards/mishards/service_handler.py @@ -49,7 +49,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): status = status_pb2.Status(error_code=status_pb2.SUCCESS, reason="Success") if not files_n_topk_results: - return status, [] + return status, [], [] merge_id_results = [] merge_dis_results = [] @@ -58,9 +58,13 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): for files_collection in files_n_topk_results: if isinstance(files_collection, tuple): status, _ = files_collection - return status, [] - + return status, [], [] + row_num = files_collection.row_num + # skip empty sub-results (row_num == 0) + if not row_num: + continue + ids = files_collection.ids diss = files_collection.distances # distance collections # TODO: batch_len is equal to topk, may need to compare with topk @@ -322,7 +326,7 @@ class ServiceHandler(milvus_pb2_grpc.MilvusServiceServicer): topk_result_list = milvus_pb2.TopKQueryResult( status=status_pb2.Status(error_code=status.error_code, reason=status.reason), - row_num=len(query_record_array), + row_num=len(request.query_record_array) if len(id_results) else 0, ids=id_results, distances=dis_results) return topk_result_list diff --git a/shards/utils/colors.py b/shards/utils/colors.py new file mode 100644 index 0000000000..7f55ae1a2f --- /dev/null +++ b/shards/utils/colors.py @@ -0,0 +1,72 @@ +# Reset +Color_Off='\033[0m' # Text Reset + +# Regular Colors +Black='\033[0;30m' # Black +Red='\033[0;31m' # Red +Green='\033[0;32m' # Green +Yellow='\033[0;33m' # Yellow +Blue='\033[0;34m' # Blue +Purple='\033[0;35m' # Purple +Cyan='\033[0;36m' # Cyan +White='\033[0;37m' # White + +# Bold +BBlack='\033[1;30m' # Black +BRed='\033[1;31m' # Red +BGreen='\033[1;32m' # Green +BYellow='\033[1;33m' # Yellow +BBlue='\033[1;34m' # Blue +BPurple='\033[1;35m' # Purple +BCyan='\033[1;36m' # Cyan +BWhite='\033[1;37m' # White + +# Underline +UBlack='\033[4;30m' # Black +URed='\033[4;31m' # Red +UGreen='\033[4;32m' # Green +UYellow='\033[4;33m' # Yellow +UBlue='\033[4;34m' # Blue +UPurple='\033[4;35m' # Purple +UCyan='\033[4;36m' # Cyan +UWhite='\033[4;37m' # White + +# Background +On_Black='\033[40m' # Black +On_Red='\033[41m' # Red +On_Green='\033[42m' # Green +On_Yellow='\033[43m' # Yellow +On_Blue='\033[44m' # Blue +On_Purple='\033[45m' # Purple +On_Cyan='\033[46m' # Cyan +On_White='\033[47m' # White + +# High Intensity +IBlack='\033[0;90m' # Black +IRed='\033[0;91m' # Red +IGreen='\033[0;92m' # Green +IYellow='\033[0;93m' # Yellow +IBlue='\033[0;94m' # Blue +IPurple='\033[0;95m' # Purple +ICyan='\033[0;96m' # Cyan +IWhite='\033[0;97m' # White + +# Bold High Intensity +BIBlack='\033[1;90m' # Black +BIRed='\033[1;91m' # Red +BIGreen='\033[1;92m' # Green +BIYellow='\033[1;93m' # Yellow +BIBlue='\033[1;94m' # Blue +BIPurple='\033[1;95m' # Purple +BICyan='\033[1;96m' # Cyan +BIWhite='\033[1;97m' # White + +# High Intensity backgrounds +On_IBlack='\033[0;100m' # Black +On_IRed='\033[0;101m' # Red +On_IGreen='\033[0;102m' # Green +On_IYellow='\033[0;103m' # Yellow +On_IBlue='\033[0;104m' # Blue +On_IPurple='\033[0;105m' # Purple +On_ICyan='\033[0;106m' # Cyan +On_IWhite='\033[0;107m' # White diff --git a/shards/utils/logger_helper.py b/shards/utils/logger_helper.py index b4e3b9c5b6..5141f5dc73 100644 --- a/shards/utils/logger_helper.py +++ b/shards/utils/logger_helper.py @@ -3,6 +3,7 @@ import datetime from pytz import timezone from logging import Filter import logging.config +from utils import 
colors class InfoFilter(logging.Filter): @@ -31,29 +32,53 @@ class CriticalFilter(logging.Filter): COLORS = { - 'HEADER': '\033[95m', - 'INFO': '\033[92m', - 'DEBUG': '\033[94m', - 'WARNING': '\033[93m', - 'ERROR': '\033[95m', - 'CRITICAL': '\033[91m', - 'ENDC': '\033[0m', + 'HEADER': colors.BWhite, + 'INFO': colors.On_IWhite + colors.BBlack, + 'INFOM': colors.White, + 'DEBUG': colors.On_IBlue + colors.BWhite, + 'DEBUGM': colors.BIBlue, + 'WARNING': colors.On_IYellow + colors.BWhite, + 'WARNINGM': colors.BIYellow, + 'ERROR': colors.On_IRed + colors.BWhite, + 'ERRORM': colors.BIRed, + 'CRITICAL': colors.On_Red + colors.BWhite, + 'CRITICALM': colors.BRed, + 'ASCTIME': colors.On_Cyan + colors.BIYellow, + 'MESSAGE': colors.IGreen, + 'FILENAME': colors.BCyan, + 'LINENO': colors.BCyan, + 'THREAD': colors.BCyan, + 'ENDC': colors.Color_Off, } class ColorFulFormatColMixin: def format_col(self, message_str, level_name): if level_name in COLORS.keys(): - message_str = COLORS.get(level_name) + message_str + COLORS.get( - 'ENDC') + message_str = COLORS[level_name] + message_str + COLORS['ENDC'] return message_str + def formatTime(self, record, datefmt=None): + ret = super().formatTime(record, datefmt) + ret = COLORS['ASCTIME'] + ret + COLORS['ENDC'] + return ret -class ColorfulFormatter(logging.Formatter, ColorFulFormatColMixin): + def format_record(self, record): + msg_schema = record.levelname + 'M' + record.msg = '{}{}{}'.format(COLORS[msg_schema], record.msg, COLORS['ENDC']) + record.filename = COLORS['FILENAME'] + record.filename + COLORS['ENDC'] + record.lineno = '{}{}{}'.format(COLORS['LINENO'], record.lineno, COLORS['ENDC']) + record.threadName = '{}{}{}'.format(COLORS['THREAD'], record.threadName, COLORS['ENDC']) + record.levelname = COLORS[record.levelname] + record.levelname + COLORS['ENDC'] + return record + + +class ColorfulFormatter(ColorFulFormatColMixin, logging.Formatter): def format(self, record): + record = self.format_record(record) message_str = super(ColorfulFormatter, self).format(record) - return self.format_col(message_str, level_name=record.levelname) + return message_str def config(log_level, log_path, name, tz='UTC'): @@ -76,7 +101,9 @@ def config(log_level, log_path, name, tz='UTC'): 'format': '%(asctime)s | %(levelname)s | %(name)s | %(threadName)s: %(message)s (%(filename)s:%(lineno)s)', }, 'colorful_console': { - 'format': '%(asctime)s | %(levelname)s | %(name)s | %(threadName)s: %(message)s (%(filename)s:%(lineno)s)', + 'format': '%(asctime)s | %(levelname)s: %(message)s (%(filename)s:%(lineno)s) (%(threadName)s)', + # 'format': '%(asctime)s | %(levelname)s | %(threadName)s: %(message)s (%(filename)s:%(lineno)s)', + # 'format': '%(asctime)s | %(levelname)s | %(name)s | %(threadName)s: %(message)s (%(filename)s:%(lineno)s)', '()': ColorfulFormatter, }, }, diff --git a/tests/milvus_python_test/test_add_vectors.py b/tests/milvus_python_test/test_add_vectors.py index 7245d51ea2..7c9d9e691c 100644 --- a/tests/milvus_python_test/test_add_vectors.py +++ b/tests/milvus_python_test/test_add_vectors.py @@ -1300,7 +1300,8 @@ class TestNameInvalid(object): assert not status.OK() @pytest.mark.level(2) - def test_add_vectors_with_invalid_tag_name(self, connect, get_tag_name): + def test_add_vectors_with_invalid_tag_name(self, connect, get_table_name, get_tag_name): + table_name = get_table_name tag_name = get_tag_name vectors = gen_vectors(1, dim) status, result = connect.add_vectors(table_name, vectors, partition_tag=tag_name) diff --git a/tests/milvus_python_test/utils.py 
b/tests/milvus_python_test/utils.py index 1686ad7129..6f7c81d135 100644 --- a/tests/milvus_python_test/utils.py +++ b/tests/milvus_python_test/utils.py @@ -69,7 +69,7 @@ def gen_invalid_ips(): "\n", "\t", "中文", - "a".join("a" for i in range(256)) + "a".join("a" for _ in range(256)) ] return ips @@ -116,7 +116,7 @@ def gen_invalid_uris(): "tcp:// :%s" % port, # "tcp://123.0.0.1:%s" % port, "tcp://127.0.0:%s" % port, - "tcp://255.0.0.0:%s" % port, + # "tcp://255.0.0.0:%s" % port, # "tcp://255.255.0.0:%s" % port, # "tcp://255.255.255.0:%s" % port, # "tcp://255.255.255.255:%s" % port,
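The new probe_test.py wires test() into python-fire, so its keyword arguments double as command-line flags (python probe_test.py --host=127.0.0.1 --port=19531), and the function can also be reused in-process. A minimal usage sketch, assuming the script is importable from the working directory and that the default port 19531 is the endpoint the all-in-one compose setup publishes for the mishards proxy (the backend servers themselves listen on 19530):

    # call the probe directly instead of through the fire CLI
    from probe_test import test

    rc = test(host='127.0.0.1', port=19531)  # 0 if the connect succeeded, 1 otherwise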
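The service_handler.py hunks change the shard-result merge in two ways: sub-results with row_num == 0 are skipped instead of being indexed into, and the merged reply reports row_num=0 when no ids were collected. A standalone sketch of the guard for a single query (merge_topk and FileResult are illustrative names, not the actual mishards implementation; a smaller distance is assumed to be a better match, as with L2):

    from collections import namedtuple

    # illustrative stand-in for the per-shard TopKQueryResult message
    FileResult = namedtuple('FileResult', 'row_num ids distances')

    def merge_topk(sub_results, topk):
        merged_ids, merged_dis = [], []
        for sub in sub_results:
            if not sub.row_num:  # empty sub-result: skip it instead of indexing into it
                continue
            # keep the globally smallest topk distances seen so far
            pairs = sorted(zip(list(sub.distances) + merged_dis,
                               list(sub.ids) + merged_ids))[:topk]
            merged_dis = [d for d, _ in pairs]
            merged_ids = [i for _, i in pairs]
        return merged_ids, merged_dis

    # an all-empty result set now merges cleanly instead of raising:
    print(merge_topk([FileResult(0, [], [])], topk=10))       # ([], [])
    print(merge_topk([FileResult(2, [7, 3], [0.1, 0.4]),
                      FileResult(0, [], []),
                      FileResult(1, [9], [0.2])], topk=2))    # ([7, 9], [0.1, 0.2])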
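In logger_helper.py, ColorfulFormatter's base classes are reordered so that the mixin precedes logging.Formatter. Python resolves methods left to right along the MRO, so with Formatter listed first the mixin's formatTime override was unreachable dead code. A minimal sketch of the pattern, with illustrative names:

    import logging

    CYAN, RESET = '\033[0;36m', '\033[0m'

    class ColorTimeMixin:
        # only reached when this mixin precedes logging.Formatter
        # in the subclass's base list
        def formatTime(self, record, datefmt=None):
            return CYAN + super().formatTime(record, datefmt) + RESET

    class ColorFormatter(ColorTimeMixin, logging.Formatter):  # mixin first
        pass

    handler = logging.StreamHandler()
    handler.setFormatter(ColorFormatter('%(asctime)s | %(levelname)s: %(message)s'))
    log = logging.getLogger('demo')
    log.addHandler(handler)
    log.warning('timestamp rendered in cyan')  # with the bases swapped, it stays plain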