diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 4e7a4448f2..be7024672f 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -21,6 +21,10 @@ Please mark all change in change log and use the ticket from JIRA. - MS-92 - Unify behavior of debug and release build - MS-98 - Install all unit test to installation directory - MS-115 - Change is_startup of metric_config switch from true to on +- MS-122 - Archive criteria config +- MS-124 - HasTable interface +- MS-126 - Add more error code + ## New Feature - MS-57 - Implement index load/search pipeline diff --git a/cpp/conf/server_config.yaml b/cpp/conf/server_config.yaml index 33858c9455..6258e74509 100644 --- a/cpp/conf/server_config.yaml +++ b/cpp/conf/server_config.yaml @@ -11,6 +11,8 @@ db_config: #Currently supports mysql or sqlite db_backend_url: mysql://root:1234@:/test # meta database uri index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB + archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB + archive_days_threshold: 30 # files older than x days will be archived, unit: day metric_config: is_startup: off # if monitoring start: on, off diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp index 6549a1740a..8c56c863e7 100644 --- a/cpp/src/db/DBMetaImpl.cpp +++ b/cpp/src/db/DBMetaImpl.cpp @@ -193,9 +193,11 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) { auto table = ConnectorPtr->select(columns(&TableSchema::state_), where(c(&TableSchema::table_id_) == table_schema.table_id_)); if (table.size() == 1) { - std::string msg = (TableSchema::TO_DELETE == std::get<0>(table[0])) ? - "Table already exists and it is in delete state, please wait a second" : "Table already exists"; - return Status::Error(msg); + if(TableSchema::TO_DELETE == std::get<0>(table[0])) { + return Status::Error("Table already exists and it is in delete state, please wait a second"); + } else { + return Status::OK();//table already exists, no error + } } } @@ -329,7 +331,7 @@ Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) { } } catch (std::exception &e) { - HandleException("Encounter exception when lookup table", e); + return HandleException("Encounter exception when lookup table", e); } return Status::OK(); @@ -359,7 +361,7 @@ Status DBMetaImpl::AllTables(std::vector& table_schema_array) { } } catch (std::exception &e) { - HandleException("Encounter exception when lookup all tables", e); + return HandleException("Encounter exception when lookup all tables", e); } return Status::OK(); @@ -656,7 +658,7 @@ Status DBMetaImpl::Archive() { for (auto kv : criterias) { auto &criteria = kv.first; auto &limit = kv.second; - if (criteria == "days") { + if (criteria == engine::ARCHIVE_CONF_DAYS) { long usecs = limit * D_SEC * US_PS; long now = utils::GetMicroSecTimeStamp(); try { @@ -672,11 +674,11 @@ Status DBMetaImpl::Archive() { return HandleException("Encounter exception when update table files", e); } } - if (criteria == "disk") { + if (criteria == engine::ARCHIVE_CONF_DISK) { uint64_t sum = 0; Size(sum); - auto to_delete = (sum - limit * G); + int64_t to_delete = (int64_t)sum - limit * G; DiscardFiles(to_delete); } } diff --git a/cpp/src/db/Factories.cpp b/cpp/src/db/Factories.cpp index a6998d10dc..15e188d3db 100644 --- a/cpp/src/db/Factories.cpp +++ b/cpp/src/db/Factories.cpp @@ -81,11 +81,11 @@ std::shared_ptr DBMetaImplFactory::Build(const DBMetaOptions& metaOp std::string dialect = pieces_match[1].str(); std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower); if (dialect.find("mysql") != std::string::npos) { - ENGINE_LOG_DEBUG << "Using MySQL"; + ENGINE_LOG_INFO << "Using MySQL"; return std::make_shared(meta::MySQLMetaImpl(metaOptions)); } else if (dialect.find("sqlite") != std::string::npos) { - ENGINE_LOG_DEBUG << "Using SQLite"; + ENGINE_LOG_INFO << "Using SQLite"; return std::make_shared(meta::DBMetaImpl(metaOptions)); } else { diff --git a/cpp/src/db/MemManager.cpp b/cpp/src/db/MemManager.cpp index fa7f3c54b0..e36b0c45ba 100644 --- a/cpp/src/db/MemManager.cpp +++ b/cpp/src/db/MemManager.cpp @@ -146,11 +146,16 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id, Status MemManager::ToImmutable() { std::unique_lock lock(mutex_); + MemIdMap temp_map; for (auto& kv: mem_id_map_) { + if(kv.second->RowCount() == 0) { + temp_map.insert(kv); + continue;//empty vector, no need to serialize + } immu_mem_list_.push_back(kv.second); } - mem_id_map_.clear(); + mem_id_map_.swap(temp_map); return Status::OK(); } @@ -168,8 +173,21 @@ Status MemManager::Serialize(std::set& table_ids) { } Status MemManager::EraseMemVector(const std::string& table_id) { - std::unique_lock lock(mutex_); - mem_id_map_.erase(table_id); + {//erase MemVector from rapid-insert cache + std::unique_lock lock(mutex_); + mem_id_map_.erase(table_id); + } + + {//erase MemVector from serialize cache + std::unique_lock lock(serialization_mtx_); + MemList temp_list; + for (auto& mem : immu_mem_list_) { + if(mem->TableId() != table_id) { + temp_list.push_back(mem); + } + } + immu_mem_list_.swap(temp_list); + } return Status::OK(); } diff --git a/cpp/src/db/MemManager.h b/cpp/src/db/MemManager.h index 2aa0183898..0ce88d504d 100644 --- a/cpp/src/db/MemManager.h +++ b/cpp/src/db/MemManager.h @@ -45,6 +45,8 @@ public: const std::string& Location() const { return schema_.location_; } + std::string TableId() const { return schema_.table_id_; } + private: MemVectors() = delete; MemVectors(const MemVectors&) = delete; diff --git a/cpp/src/db/MySQLConnectionPool.h b/cpp/src/db/MySQLConnectionPool.h index 8a240102dc..b79089c1da 100644 --- a/cpp/src/db/MySQLConnectionPool.h +++ b/cpp/src/db/MySQLConnectionPool.h @@ -2,6 +2,7 @@ #include #include +#include #include "Log.h" @@ -53,7 +54,7 @@ public: // Other half of in-use conn count limit void release(const mysqlpp::Connection* pc) override { mysqlpp::ConnectionPool::release(pc); -// ENGINE_LOG_DEBUG << "conns_in_use_ in release: " << conns_in_use_ << std::endl; + if (conns_in_use_ <= 0) { ENGINE_LOG_WARNING << "MySQLConnetionPool::release: conns_in_use_ is less than zero. conns_in_use_ = " << conns_in_use_ << std::endl; } @@ -62,6 +63,10 @@ public: } } + int getConnectionsInUse() { + return conns_in_use_; + } + void set_max_idle_time(int max_idle) { maxIdleTime_ = max_idle; } @@ -96,7 +101,7 @@ protected: private: // Number of connections currently in use - int conns_in_use_; + std::atomic conns_in_use_; // Our connection parameters std::string db_, user_, password_, server_; diff --git a/cpp/src/db/MySQLMetaImpl.cpp b/cpp/src/db/MySQLMetaImpl.cpp index 40ff30b88a..61dac3a635 100644 --- a/cpp/src/db/MySQLMetaImpl.cpp +++ b/cpp/src/db/MySQLMetaImpl.cpp @@ -158,13 +158,15 @@ namespace meta { int maxPoolSize = threadHint == 0 ? 8 : threadHint; mySQLConnectionPool_ = std::make_shared(dbName, username, password, serverAddress, port, maxPoolSize); // std::cout << "MySQL++ thread aware:" << std::to_string(connectionPtr->thread_aware()) << std::endl; - + ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize); try { CleanUp(); { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); + +// ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: connections in use = " << mySQLConnectionPool_->getConnectionsInUse(); // if (!connectionPtr->connect(dbName, serverAddress, username, password, port)) { // return Status::Error("DB connection failed: ", connectionPtr->error()); // } @@ -281,6 +283,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropPartitionsByDates connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query dropPartitionsByDatesQuery = connectionPtr->query(); dropPartitionsByDatesQuery << "UPDATE TableFiles " << @@ -294,7 +300,6 @@ namespace meta { dropPartitionsByDatesQuery.error()); } } //Scoped Connection - } catch (const BadQuery& er) { // Handle any query errors ENGINE_LOG_ERROR << "QUERY ERROR WHEN DROPPING PARTITIONS BY DATES" << ": " << er.what(); @@ -319,6 +324,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query createTableQuery = connectionPtr->query(); // ENGINE_LOG_DEBUG << "Create Table in"; if (table_schema.table_id_.empty()) { @@ -331,11 +340,12 @@ namespace meta { assert(res && res.num_rows() <= 1); if (res.num_rows() == 1) { int state = res[0]["state"]; - std::string msg = (TableSchema::TO_DELETE == state) ? - "Table already exists and it is in delete state, please wait a second" - : "Table already exists"; - ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTable: " << msg; - return Status::Error(msg); + if (TableSchema::TO_DELETE == state) { + return Status::Error("Table already exists and it is in delete state, please wait a second"); + } + else { + return Status::OK();//table already exists, no error + } } } // ENGINE_LOG_DEBUG << "Create Table start"; @@ -411,6 +421,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + //soft delete table Query deleteTableQuery = connectionPtr->query(); // @@ -444,6 +458,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DeleteTableFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + //soft delete table files Query deleteTableFilesQuery = connectionPtr->query(); // @@ -484,6 +502,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DescribeTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query describeTableQuery = connectionPtr->query(); describeTableQuery << "SELECT id, dimension, files_cnt, engine_type, store_raw_data " << "FROM Tables " << @@ -539,6 +561,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::HasTable connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query hasTableQuery = connectionPtr->query(); //since table_id is a unique column we just need to check whether it exists or not hasTableQuery << "SELECT EXISTS " << @@ -579,6 +605,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::AllTables connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query allTablesQuery = connectionPtr->query(); allTablesQuery << "SELECT id, table_id, dimension, files_cnt, engine_type, store_raw_data " << "FROM Tables " << @@ -658,6 +688,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CreateTableFile connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query createTableFileQuery = connectionPtr->query(); createTableFileQuery << "INSERT INTO TableFiles VALUES" << @@ -718,6 +752,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToIndex connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query filesToIndexQuery = connectionPtr->query(); filesToIndexQuery << "SELECT id, table_id, engine_type, file_id, file_type, size, date " << "FROM TableFiles " << @@ -794,6 +832,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToSearch connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + if (partition.empty()) { Query filesToSearchQuery = connectionPtr->query(); @@ -895,6 +937,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::FilesToMerge connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query filesToMergeQuery = connectionPtr->query(); filesToMergeQuery << "SELECT id, table_id, file_id, file_type, size, date " << "FROM TableFiles " << @@ -980,6 +1026,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::GetTableFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query getTableFileQuery = connectionPtr->query(); getTableFileQuery << "SELECT engine_type, file_id, file_type, size, date " << "FROM TableFiles " << @@ -1055,6 +1105,10 @@ namespace meta { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::Archive connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query archiveQuery = connectionPtr->query(); archiveQuery << "UPDATE TableFiles " << "SET file_type = " << std::to_string(TableFileSchema::TO_DELETE) << " " << @@ -1098,6 +1152,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::Size connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query getSizeQuery = connectionPtr->query(); getSizeQuery << "SELECT SUM(size) AS sum " << "FROM TableFiles " << @@ -1151,6 +1209,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DiscardFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query discardFilesQuery = connectionPtr->query(); discardFilesQuery << "SELECT id, size " << "FROM TableFiles " << @@ -1220,6 +1282,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFile connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query updateTableFileQuery = connectionPtr->query(); //if the table has been deleted, just mark the table file as TO_DELETE @@ -1293,6 +1359,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::UpdateTableFiles connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query updateTableFilesQuery = connectionPtr->query(); std::map has_tables; @@ -1373,8 +1443,17 @@ namespace meta { MetricCollector metric; { + +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use before creating ScopedConnection = " +// << mySQLConnectionPool_->getConnectionsInUse(); + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean table files: connection in use after creating ScopedConnection = " +// << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); cleanUpFilesWithTTLQuery << "SELECT id, table_id, file_id, date " << "FROM TableFiles " << @@ -1443,8 +1522,16 @@ namespace meta { MetricCollector metric; { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use before creating ScopedConnection = " +// << mySQLConnectionPool_->getConnectionsInUse(); + ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUpFilesWithTTL: clean tables: connection in use after creating ScopedConnection = " +// << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query cleanUpFilesWithTTLQuery = connectionPtr->query(); cleanUpFilesWithTTLQuery << "SELECT id, table_id " << "FROM Tables " << @@ -1500,6 +1587,10 @@ namespace meta { try { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::CleanUp: connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + ENGINE_LOG_DEBUG << "Remove table file type as NEW"; Query cleanUpQuery = connectionPtr->query(); cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";"; @@ -1542,6 +1633,10 @@ namespace meta { { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::Count: connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query countQuery = connectionPtr->query(); countQuery << "SELECT size " << "FROM TableFiles " << @@ -1585,6 +1680,10 @@ namespace meta { ScopedConnection connectionPtr(*mySQLConnectionPool_, safe_grab); +// if (mySQLConnectionPool_->getConnectionsInUse() <= 0) { +// ENGINE_LOG_WARNING << "MySQLMetaImpl::DropAll: connection in use = " << mySQLConnectionPool_->getConnectionsInUse(); +// } + Query dropTableQuery = connectionPtr->query(); dropTableQuery << "DROP TABLE IF EXISTS Tables, TableFiles;"; if (dropTableQuery.exec()) { diff --git a/cpp/src/db/Options.cpp b/cpp/src/db/Options.cpp index ff6404751f..5f591dcb0f 100644 --- a/cpp/src/db/Options.cpp +++ b/cpp/src/db/Options.cpp @@ -24,6 +24,12 @@ ArchiveConf::ArchiveConf(const std::string& type, const std::string& criterias) ParseCritirias(criterias); } +void ArchiveConf::SetCriterias(const ArchiveConf::CriteriaT& criterial) { + for(auto& pair : criterial) { + criterias_[pair.first] = pair.second; + } +} + void ArchiveConf::ParseCritirias(const std::string& criterias) { std::stringstream ss(criterias); std::vector tokens; diff --git a/cpp/src/db/Options.h b/cpp/src/db/Options.h index 039134ebaa..dbe80f8d5f 100644 --- a/cpp/src/db/Options.h +++ b/cpp/src/db/Options.h @@ -19,14 +19,20 @@ static constexpr uint64_t ONE_KB = 1024; static constexpr uint64_t ONE_MB = ONE_KB*ONE_KB; static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB; +static const std::string ARCHIVE_CONF_DISK = "disk"; +static const std::string ARCHIVE_CONF_DAYS = "days"; +static const std::string ARCHIVE_CONF_DEFAULT = ARCHIVE_CONF_DISK + ":512"; + struct ArchiveConf { using CriteriaT = std::map; - ArchiveConf(const std::string& type, const std::string& criterias = "disk:512"); + ArchiveConf(const std::string& type, const std::string& criterias = ARCHIVE_CONF_DEFAULT); const std::string& GetType() const { return type_; } const CriteriaT GetCriterias() const { return criterias_; } + void SetCriterias(const ArchiveConf::CriteriaT& criterial); + private: void ParseCritirias(const std::string& type); void ParseType(const std::string& criterias); diff --git a/cpp/src/sdk/examples/simple/src/ClientTest.cpp b/cpp/src/sdk/examples/simple/src/ClientTest.cpp index 78145446a6..19c764fd0a 100644 --- a/cpp/src/sdk/examples/simple/src/ClientTest.cpp +++ b/cpp/src/sdk/examples/simple/src/ClientTest.cpp @@ -165,6 +165,11 @@ ClientTest::Test(const std::string& address, const std::string& port) { Status stat = conn->CreateTable(tb_schema); std::cout << "CreateTable function call status: " << stat.ToString() << std::endl; PrintTableSchema(tb_schema); + + bool has_table = conn->HasTable(tb_schema.table_name); + if(has_table) { + std::cout << "Table is created" << std::endl; + } } {//describe table diff --git a/cpp/src/sdk/include/MilvusApi.h b/cpp/src/sdk/include/MilvusApi.h index 2f4532b761..302871c48b 100644 --- a/cpp/src/sdk/include/MilvusApi.h +++ b/cpp/src/sdk/include/MilvusApi.h @@ -156,6 +156,18 @@ public: virtual Status CreateTable(const TableSchema ¶m) = 0; + /** + * @brief Test table existence method + * + * This method is used to create table + * + * @param table_name, table name is going to be tested. + * + * @return Indicate if table is cexist + */ + virtual bool HasTable(const std::string &table_name) = 0; + + /** * @brief Delete table method * diff --git a/cpp/src/sdk/src/client/ClientProxy.cpp b/cpp/src/sdk/src/client/ClientProxy.cpp index 6f68344fac..1185e4988c 100644 --- a/cpp/src/sdk/src/client/ClientProxy.cpp +++ b/cpp/src/sdk/src/client/ClientProxy.cpp @@ -102,6 +102,15 @@ ClientProxy::CreateTable(const TableSchema ¶m) { return Status::OK(); } +bool +ClientProxy::HasTable(const std::string &table_name) { + if(!IsConnected()) { + return false; + } + + return ClientPtr()->interface()->HasTable(table_name); +} + Status ClientProxy::DeleteTable(const std::string &table_name) { if(!IsConnected()) { diff --git a/cpp/src/sdk/src/client/ClientProxy.h b/cpp/src/sdk/src/client/ClientProxy.h index 601bcddd8a..a2ede77c40 100644 --- a/cpp/src/sdk/src/client/ClientProxy.h +++ b/cpp/src/sdk/src/client/ClientProxy.h @@ -23,6 +23,8 @@ public: virtual Status CreateTable(const TableSchema ¶m) override; + virtual bool HasTable(const std::string &table_name) override; + virtual Status DeleteTable(const std::string &table_name) override; virtual Status AddVector(const std::string &table_name, diff --git a/cpp/src/sdk/src/interface/ConnectionImpl.cpp b/cpp/src/sdk/src/interface/ConnectionImpl.cpp index e303cf0aa0..efb5f61b1b 100644 --- a/cpp/src/sdk/src/interface/ConnectionImpl.cpp +++ b/cpp/src/sdk/src/interface/ConnectionImpl.cpp @@ -56,6 +56,11 @@ ConnectionImpl::CreateTable(const TableSchema ¶m) { return client_proxy_->CreateTable(param); } +bool +ConnectionImpl::HasTable(const std::string &table_name) { + return client_proxy_->HasTable(table_name); +} + Status ConnectionImpl::DeleteTable(const std::string &table_name) { return client_proxy_->DeleteTable(table_name); diff --git a/cpp/src/sdk/src/interface/ConnectionImpl.h b/cpp/src/sdk/src/interface/ConnectionImpl.h index 61e11c9390..0f9cd14e39 100644 --- a/cpp/src/sdk/src/interface/ConnectionImpl.h +++ b/cpp/src/sdk/src/interface/ConnectionImpl.h @@ -25,6 +25,8 @@ public: virtual Status CreateTable(const TableSchema ¶m) override; + virtual bool HasTable(const std::string &table_name) override; + virtual Status DeleteTable(const std::string &table_name) override; virtual Status AddVector(const std::string &table_name, diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp index 6ad308a8d6..7892a57f2b 100644 --- a/cpp/src/server/DBWrapper.cpp +++ b/cpp/src/server/DBWrapper.cpp @@ -23,16 +23,30 @@ DBWrapper::DBWrapper() { if(index_size > 0) {//ensure larger than zero, unit is MB opt.index_trigger_size = (size_t)index_size * engine::ONE_MB; } - ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER); - opt.mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single"); -// std::cout << "mode = " << opt.mode << std::endl; - CommonUtil::CreateDirectory(opt.meta.path); + //set archive config + engine::ArchiveConf::CriteriaT criterial; + int64_t disk = config.GetInt64Value(CONFIG_DB_ARCHIVE_DISK, 0); + int64_t days = config.GetInt64Value(CONFIG_DB_ARCHIVE_DAYS, 0); + if(disk > 0) { + criterial[engine::ARCHIVE_CONF_DISK] = disk; + } + if(days > 0) { + criterial[engine::ARCHIVE_CONF_DAYS] = days; + } + opt.meta.archive_conf.SetCriterias(criterial); + + //create db root folder + ServerError err = CommonUtil::CreateDirectory(opt.meta.path); + if(err != SERVER_SUCCESS) { + std::cout << "ERROR! Failed to create database root path: " << opt.meta.path << std::endl; + kill(0, SIGUSR1); + } zilliz::milvus::engine::DB::Open(opt, &db_); if(db_ == nullptr) { - SERVER_LOG_ERROR << "Failed to open db. Provided database uri = " << opt.meta.backend_uri; - throw ServerException(SERVER_NULL_POINTER, "Failed to open db"); + std::cout << "ERROR! Failed to open database" << std::endl; + kill(0, SIGUSR1); } } diff --git a/cpp/src/server/MilvusServer.cpp b/cpp/src/server/MilvusServer.cpp index 1b1b85e883..322460968f 100644 --- a/cpp/src/server/MilvusServer.cpp +++ b/cpp/src/server/MilvusServer.cpp @@ -8,7 +8,6 @@ #include "ServerConfig.h" #include "ThreadPoolServer.h" #include "DBWrapper.h" -#include "utils/Log.h" #include "milvus_types.h" #include "milvus_constants.h" @@ -24,6 +23,7 @@ #include #include +#include namespace zilliz { namespace milvus { @@ -68,7 +68,7 @@ MilvusServer::StartService() { } else if (protocol == "compact") { protocol_factory.reset(new TCompactProtocolFactory()); } else { - SERVER_LOG_ERROR << "Service protocol: " << protocol << " is not supported currently"; + //SERVER_LOG_INFO << "Service protocol: " << protocol << " is not supported currently"; return; } @@ -89,11 +89,12 @@ MilvusServer::StartService() { threadManager)); s_server->serve(); } else { - SERVER_LOG_ERROR << "Service mode: " << mode << " is not supported currently"; + //SERVER_LOG_INFO << "Service mode: " << mode << " is not supported currently"; return; } } catch (apache::thrift::TException& ex) { - SERVER_LOG_ERROR << "Server encounter exception: " << ex.what(); + std::cout << "ERROR! " << ex.what() << std::endl; + kill(0, SIGUSR1); } } diff --git a/cpp/src/server/RequestHandler.cpp b/cpp/src/server/RequestHandler.cpp index 62ce0711b4..037f80e0db 100644 --- a/cpp/src/server/RequestHandler.cpp +++ b/cpp/src/server/RequestHandler.cpp @@ -24,6 +24,15 @@ RequestHandler::CreateTable(const thrift::TableSchema ¶m) { RequestScheduler::ExecTask(task_ptr); } +bool +RequestHandler::HasTable(const std::string &table_name) { + bool has_table = false; + BaseTaskPtr task_ptr = HasTableTask::Create(table_name, has_table); + RequestScheduler::ExecTask(task_ptr); + + return has_table; +} + void RequestHandler::DeleteTable(const std::string &table_name) { BaseTaskPtr task_ptr = DeleteTableTask::Create(table_name); diff --git a/cpp/src/server/RequestHandler.h b/cpp/src/server/RequestHandler.h index 8f3eca576b..e736b7593f 100644 --- a/cpp/src/server/RequestHandler.h +++ b/cpp/src/server/RequestHandler.h @@ -19,16 +19,28 @@ public: RequestHandler(); /** - * @brief Create table method - * - * This method is used to create table - * - * @param param, use to provide table information to be created. - * - * - * @param param - */ - void CreateTable(const ::milvus::thrift::TableSchema& param); + * @brief Create table method + * + * This method is used to create table + * + * @param param, use to provide table information to be created. + * + * + * @param param + */ + void CreateTable(const ::milvus::thrift::TableSchema ¶m); + + /** + * @brief Test table existence method + * + * This method is used to test table existence. + * + * @param table_name, table name is going to be tested. + * + * + * @param table_name + */ + bool HasTable(const std::string &table_name); /** * @brief Delete table method diff --git a/cpp/src/server/RequestScheduler.cpp b/cpp/src/server/RequestScheduler.cpp index 99ae36701e..36df155b62 100644 --- a/cpp/src/server/RequestScheduler.cpp +++ b/cpp/src/server/RequestScheduler.cpp @@ -18,35 +18,35 @@ using namespace ::milvus; namespace { const std::map &ErrorMap() { static const std::map code_map = { - {SERVER_UNEXPECTED_ERROR, thrift::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_NULL_POINTER, thrift::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_UNEXPECTED_ERROR, thrift::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_UNSUPPORTED_ERROR, thrift::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_NULL_POINTER, thrift::ErrorCode::UNEXPECTED_ERROR}, {SERVER_INVALID_ARGUMENT, thrift::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_FILE_NOT_FOUND, thrift::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_NOT_IMPLEMENT, thrift::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_BLOCKING_QUEUE_EMPTY, thrift::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_FILE_NOT_FOUND, thrift::ErrorCode::FILE_NOT_FOUND}, + {SERVER_NOT_IMPLEMENT, thrift::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_BLOCKING_QUEUE_EMPTY, thrift::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_CANNOT_CREATE_FOLDER, thrift::ErrorCode::CANNOT_CREATE_FOLDER}, + {SERVER_CANNOT_CREATE_FILE, thrift::ErrorCode::CANNOT_CREATE_FILE}, + {SERVER_CANNOT_DELETE_FOLDER, thrift::ErrorCode::CANNOT_DELETE_FOLDER}, + {SERVER_CANNOT_DELETE_FILE, thrift::ErrorCode::CANNOT_DELETE_FILE}, {SERVER_TABLE_NOT_EXIST, thrift::ErrorCode::TABLE_NOT_EXISTS}, + {SERVER_INVALID_TABLE_NAME, thrift::ErrorCode::ILLEGAL_TABLE_NAME}, + {SERVER_INVALID_TABLE_DIMENSION, thrift::ErrorCode::ILLEGAL_DIMENSION}, {SERVER_INVALID_TIME_RANGE, thrift::ErrorCode::ILLEGAL_RANGE}, {SERVER_INVALID_VECTOR_DIMENSION, thrift::ErrorCode::ILLEGAL_DIMENSION}, + + {SERVER_INVALID_INDEX_TYPE, thrift::ErrorCode::ILLEGAL_INDEX_TYPE}, + {SERVER_INVALID_ROWRECORD, thrift::ErrorCode::ILLEGAL_ROWRECORD}, + {SERVER_INVALID_ROWRECORD_ARRAY, thrift::ErrorCode::ILLEGAL_ROWRECORD}, + {SERVER_INVALID_TOPK, thrift::ErrorCode::ILLEGAL_TOPK}, + {SERVER_ILLEGAL_VECTOR_ID, thrift::ErrorCode::ILLEGAL_VECTOR_ID}, + {SERVER_ILLEGAL_SEARCH_RESULT, thrift::ErrorCode::ILLEGAL_SEARCH_RESULT}, + {SERVER_CACHE_ERROR, thrift::ErrorCode::CACHE_FAILED}, + {DB_META_TRANSACTION_FAILED, thrift::ErrorCode::META_FAILED}, }; return code_map; } - - const std::map &ErrorMessage() { - static const std::map msg_map = { - {SERVER_UNEXPECTED_ERROR, "unexpected error occurs"}, - {SERVER_NULL_POINTER, "null pointer error"}, - {SERVER_INVALID_ARGUMENT, "invalid argument"}, - {SERVER_FILE_NOT_FOUND, "file not found"}, - {SERVER_NOT_IMPLEMENT, "not implemented"}, - {SERVER_BLOCKING_QUEUE_EMPTY, "queue empty"}, - {SERVER_TABLE_NOT_EXIST, "table not exist"}, - {SERVER_INVALID_TIME_RANGE, "invalid time range"}, - {SERVER_INVALID_VECTOR_DIMENSION, "invalid vector dimension"}, - }; - - return msg_map; - } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -69,6 +69,14 @@ ServerError BaseTask::Execute() { return error_code_; } +ServerError BaseTask::SetError(ServerError error_code, const std::string& error_msg) { + error_code_ = error_code; + error_msg_ = error_msg; + + SERVER_LOG_ERROR << error_msg_; + return error_code_; +} + ServerError BaseTask::WaitToFinish() { std::unique_lock lock(finish_mtx_); finish_cond_.wait(lock, [this] { return done_; }); @@ -102,7 +110,7 @@ void RequestScheduler::ExecTask(BaseTaskPtr& task_ptr) { ex.__set_code(ErrorMap().at(err)); std::string msg = task_ptr->ErrorMsg(); if(msg.empty()){ - msg = ErrorMessage().at(err); + msg = "Error message not set"; } ex.__set_reason(msg); throw ex; diff --git a/cpp/src/server/RequestScheduler.h b/cpp/src/server/RequestScheduler.h index b7562cc367..d4b1e1c826 100644 --- a/cpp/src/server/RequestScheduler.h +++ b/cpp/src/server/RequestScheduler.h @@ -34,6 +34,8 @@ public: protected: virtual ServerError OnExecute() = 0; + ServerError SetError(ServerError error_code, const std::string& msg); + protected: mutable std::mutex finish_mtx_; std::condition_variable finish_cond_; diff --git a/cpp/src/server/RequestTask.cpp b/cpp/src/server/RequestTask.cpp index 77396d0046..5bf2165cfe 100644 --- a/cpp/src/server/RequestTask.cpp +++ b/cpp/src/server/RequestTask.cpp @@ -53,26 +53,27 @@ namespace { return map_type[type]; } - ServerError + void ConvertRowRecordToFloatArray(const std::vector& record_array, uint64_t dimension, - std::vector& float_array) { - ServerError error_code; + std::vector& float_array, + ServerError& error_code, + std::string& error_msg) { uint64_t vec_count = record_array.size(); float_array.resize(vec_count*dimension);//allocate enough memory for(uint64_t i = 0; i < vec_count; i++) { const auto& record = record_array[i]; if(record.vector_data.empty()) { - error_code = SERVER_INVALID_ARGUMENT; - SERVER_LOG_ERROR << "No vector provided in record"; - return error_code; + error_code = SERVER_INVALID_ROWRECORD; + error_msg = "Rowrecord float array is empty"; + return; } uint64_t vec_dim = record.vector_data.size()/sizeof(double);//how many double value? if(vec_dim != dimension) { - SERVER_LOG_ERROR << "Invalid vector dimension: " << vec_dim - << " vs. group dimension:" << dimension; error_code = SERVER_INVALID_VECTOR_DIMENSION; - return error_code; + error_msg = "Invalid rowrecord dimension: " + std::to_string(vec_dim) + + " vs. table dimension:" + std::to_string(dimension); + return; } //convert double array to float array(thrift has no float type) @@ -81,30 +82,29 @@ namespace { float_array[i*vec_dim + d] = (float)(d_p[d]); } } - - return SERVER_SUCCESS; } static constexpr long DAY_SECONDS = 86400; - ServerError + void ConvertTimeRangeToDBDates(const std::vector &range_array, - std::vector& dates) { + std::vector& dates, + ServerError& error_code, + std::string& error_msg) { dates.clear(); - ServerError error_code; for(auto& range : range_array) { time_t tt_start, tt_end; tm tm_start, tm_end; if(!CommonUtil::TimeStrToTime(range.start_value, tt_start, tm_start)){ error_code = SERVER_INVALID_TIME_RANGE; - SERVER_LOG_ERROR << "Invalid time range: " << range.start_value; - return error_code; + error_msg = "Invalid time range: " + range.start_value; + return; } if(!CommonUtil::TimeStrToTime(range.end_value, tt_end, tm_end)){ error_code = SERVER_INVALID_TIME_RANGE; - SERVER_LOG_ERROR << "Invalid time range: " << range.end_value; - return error_code; + error_msg = "Invalid time range: " + range.start_value; + return; } long days = (tt_end > tt_start) ? (tt_end - tt_start)/DAY_SECONDS : (tt_start - tt_end)/DAY_SECONDS; @@ -117,8 +117,6 @@ namespace { dates.push_back(date); } } - - return SERVER_SUCCESS; } } @@ -138,21 +136,16 @@ ServerError CreateTableTask::OnExecute() { try { //step 1: check arguments - if(schema_.table_name.empty() || schema_.dimension <= 0) { - error_code_ = SERVER_INVALID_ARGUMENT; -// error_msg_ = schema_.table_name.empty() ? - error_msg_ = "CreateTableTask: Invalid table name or dimension. table name = " + schema_.table_name - + "dimension = " + std::to_string(schema_.dimension); - SERVER_LOG_ERROR << error_msg_; - return error_code_; + if(schema_.table_name.empty()) { + return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name"); + } + if(schema_.dimension <= 0) { + return SetError(SERVER_INVALID_TABLE_DIMENSION, "Invalid table dimension: " + std::to_string(schema_.dimension)); } engine::EngineType engine_type = EngineType(schema_.index_type); if(engine_type == engine::EngineType::INVALID) { - error_code_ = SERVER_INVALID_ARGUMENT; - error_msg_ = "CreateTableTask: Invalid index type. type = " + std::to_string(schema_.index_type); - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(SERVER_INVALID_INDEX_TYPE, "Invalid index type: " + std::to_string(schema_.index_type)); } //step 2: construct table schema @@ -165,17 +158,11 @@ ServerError CreateTableTask::OnExecute() { //step 3: create table engine::Status stat = DBWrapper::DB()->CreateTable(table_info); if(!stat.ok()) {//table could exist - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = "CreateTableTask: Engine failed: " + stat.ToString(); - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } } catch (std::exception& ex) { - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = ex.what(); - SERVER_LOG_ERROR << "CreateTableTask: " << error_msg_; - return error_code_; + return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } rc.Record("done"); @@ -201,10 +188,7 @@ ServerError DescribeTableTask::OnExecute() { try { //step 1: check arguments if(table_name_.empty()) { - error_code_ = SERVER_INVALID_ARGUMENT; - error_msg_ = "DescribeTableTask: Table name cannot be empty"; - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name"); } //step 2: get table info @@ -212,10 +196,7 @@ ServerError DescribeTableTask::OnExecute() { table_info.table_id_ = table_name_; engine::Status stat = DBWrapper::DB()->DescribeTable(table_info); if(!stat.ok()) { - error_code_ = SERVER_TABLE_NOT_EXIST; - error_msg_ = "DescribeTableTask: Engine failed: " + stat.ToString(); - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } schema_.table_name = table_info.table_id_; @@ -224,10 +205,7 @@ ServerError DescribeTableTask::OnExecute() { schema_.store_raw_vector = table_info.store_raw_data_; } catch (std::exception& ex) { - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = ex.what(); - SERVER_LOG_ERROR << "DescribeTableTask: " << error_msg_; - return SERVER_UNEXPECTED_ERROR; + return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } rc.Record("done"); @@ -235,6 +213,41 @@ ServerError DescribeTableTask::OnExecute() { return SERVER_SUCCESS; } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +HasTableTask::HasTableTask(const std::string& table_name, bool& has_table) + : BaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name), + has_table_(has_table) { + +} + +BaseTaskPtr HasTableTask::Create(const std::string& table_name, bool& has_table) { + return std::shared_ptr(new HasTableTask(table_name, has_table)); +} + +ServerError HasTableTask::OnExecute() { + try { + TimeRecorder rc("HasTableTask"); + + //step 1: check arguments + if(table_name_.empty()) { + return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name"); + } + + //step 2: check table existence + engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table_); + if(!stat.ok()) { + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); + } + + rc.Elapse("totally cost"); + } catch (std::exception& ex) { + return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); + } + + return SERVER_SUCCESS; +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DeleteTableTask::DeleteTableTask(const std::string& table_name) : BaseTask(DDL_DML_TASK_GROUP), @@ -242,8 +255,8 @@ DeleteTableTask::DeleteTableTask(const std::string& table_name) } -BaseTaskPtr DeleteTableTask::Create(const std::string& group_id) { - return std::shared_ptr(new DeleteTableTask(group_id)); +BaseTaskPtr DeleteTableTask::Create(const std::string& table_name) { + return std::shared_ptr(new DeleteTableTask(table_name)); } ServerError DeleteTableTask::OnExecute() { @@ -252,10 +265,7 @@ ServerError DeleteTableTask::OnExecute() { //step 1: check arguments if (table_name_.empty()) { - error_code_ = SERVER_INVALID_ARGUMENT; - error_msg_ = "DeleteTableTask: Table name cannot be empty"; - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name"); } //step 2: check table existence @@ -263,10 +273,11 @@ ServerError DeleteTableTask::OnExecute() { table_info.table_id_ = table_name_; engine::Status stat = DBWrapper::DB()->DescribeTable(table_info); if(!stat.ok()) { - error_code_ = SERVER_TABLE_NOT_EXIST; - error_msg_ = "DeleteTableTask: Engine failed: " + stat.ToString(); - SERVER_LOG_ERROR << error_msg_; - return error_code_; + if(stat.IsNotFound()) { + return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists"); + } else { + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); + } } rc.Record("check validation"); @@ -275,17 +286,13 @@ ServerError DeleteTableTask::OnExecute() { std::vector dates; stat = DBWrapper::DB()->DeleteTable(table_name_, dates); if(!stat.ok()) { - SERVER_LOG_ERROR << "DeleteTableTask: Engine failed: " << stat.ToString(); - return SERVER_UNEXPECTED_ERROR; + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } rc.Record("deleta table"); - rc.Elapse("total cost"); + rc.Elapse("totally cost"); } catch (std::exception& ex) { - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = ex.what(); - SERVER_LOG_ERROR << "DeleteTableTask: " << error_msg_; - return error_code_; + return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } return SERVER_SUCCESS; @@ -306,10 +313,7 @@ ServerError ShowTablesTask::OnExecute() { std::vector schema_array; engine::Status stat = DBWrapper::DB()->AllTables(schema_array); if(!stat.ok()) { - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = "ShowTablesTask: Engine failed: " + stat.ToString(); - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } tables_.clear(); @@ -343,17 +347,11 @@ ServerError AddVectorTask::OnExecute() { //step 1: check arguments if (table_name_.empty()) { - error_code_ = SERVER_INVALID_ARGUMENT; - error_msg_ = "AddVectorTask: Table name cannot be empty"; - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name"); } if(record_array_.empty()) { - error_code_ = SERVER_INVALID_ARGUMENT; - error_msg_ = "AddVectorTask: Row record array is empty"; - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty"); } //step 2: check table existence @@ -361,20 +359,22 @@ ServerError AddVectorTask::OnExecute() { table_info.table_id_ = table_name_; engine::Status stat = DBWrapper::DB()->DescribeTable(table_info); if(!stat.ok()) { - error_code_ = SERVER_TABLE_NOT_EXIST; - error_msg_ = "AddVectorTask: Engine failed when DescribeTable: " + stat.ToString(); - SERVER_LOG_ERROR << error_msg_; - return error_code_; + if(stat.IsNotFound()) { + return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists"); + } else { + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); + } } rc.Record("check validation"); //step 3: prepare float data std::vector vec_f; - error_code_ = ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f); - if(error_code_ != SERVER_SUCCESS) { - error_msg_ = "AddVectorTask when ConvertRowRecordToFloatArray: Invalid row record data"; - return error_code_; + ServerError error_code = SERVER_SUCCESS; + std::string error_msg; + ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f, error_code, error_msg); + if(error_code != SERVER_SUCCESS) { + return SetError(error_code, error_msg); } rc.Record("prepare vectors data"); @@ -384,25 +384,20 @@ ServerError AddVectorTask::OnExecute() { stat = DBWrapper::DB()->InsertVectors(table_name_, vec_count, vec_f.data(), record_ids_); rc.Record("add vectors to engine"); if(!stat.ok()) { - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = "AddVectorTask: Engine failed when InsertVectors: " + stat.ToString(); - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(SERVER_CACHE_ERROR, "Cache error: " + stat.ToString()); } if(record_ids_.size() != vec_count) { - SERVER_LOG_ERROR << "AddVectorTask: Vector ID not returned"; - return SERVER_UNEXPECTED_ERROR; + std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return " + + std::to_string(record_ids_.size()) + " id"; + return SetError(SERVER_ILLEGAL_VECTOR_ID, msg); } rc.Record("do insert"); - rc.Elapse("total cost"); + rc.Elapse("totally cost"); } catch (std::exception& ex) { - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = ex.what(); - SERVER_LOG_ERROR << "AddVectorTask: " << error_msg_; - return error_code_; + return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } return SERVER_SUCCESS; @@ -441,17 +436,14 @@ ServerError SearchVectorTask::OnExecute() { //step 1: check arguments if (table_name_.empty()) { - error_code_ = SERVER_INVALID_ARGUMENT; - error_msg_ = "SearchVectorTask: Table name cannot be empty"; - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name"); } - if(top_k_ <= 0 || record_array_.empty()) { - error_code_ = SERVER_INVALID_ARGUMENT; - error_msg_ = "SearchVectorTask: Invalid topk value, or query record array is empty"; - SERVER_LOG_ERROR << error_msg_; - return error_code_; + if(top_k_ <= 0) { + return SetError(SERVER_INVALID_TOPK, "Invalid topk: " + std::to_string(top_k_)); + } + if(record_array_.empty()) { + return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array is empty"); } //step 2: check table existence @@ -459,28 +451,29 @@ ServerError SearchVectorTask::OnExecute() { table_info.table_id_ = table_name_; engine::Status stat = DBWrapper::DB()->DescribeTable(table_info); if(!stat.ok()) { - error_code_ = SERVER_TABLE_NOT_EXIST; - error_msg_ = "SearchVectorTask: Engine failed when DescribeTable: " + stat.ToString(); - SERVER_LOG_ERROR << error_msg_; - return error_code_; + if(stat.IsNotFound()) { + return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " not exists"); + } else { + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); + } } //step 3: check date range, and convert to db dates std::vector dates; - error_code_ = ConvertTimeRangeToDBDates(range_array_, dates); - if(error_code_ != SERVER_SUCCESS) { - error_msg_ = "SearchVectorTask: Invalid query range when ConvertTimeRangeToDBDates"; - return error_code_; + ServerError error_code = SERVER_SUCCESS; + std::string error_msg; + ConvertTimeRangeToDBDates(range_array_, dates, error_code, error_msg); + if(error_code != SERVER_SUCCESS) { + return SetError(error_code, error_msg); } rc.Record("check validation"); //step 3: prepare float data std::vector vec_f; - error_code_ = ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f); - if(error_code_ != SERVER_SUCCESS) { - error_msg_ = "Invalid row record data when ConvertRowRecordToFloatArray"; - return error_code_; + ConvertRowRecordToFloatArray(record_array_, table_info.dimension_, vec_f, error_code, error_msg); + if(error_code != SERVER_SUCCESS) { + return SetError(error_code, error_msg); } rc.Record("prepare vector data"); @@ -497,13 +490,17 @@ ServerError SearchVectorTask::OnExecute() { rc.Record("search vectors from engine"); if(!stat.ok()) { - SERVER_LOG_ERROR << "SearchVectorTask: Engine failed: " << stat.ToString(); - return SERVER_UNEXPECTED_ERROR; + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); + } + + if(results.empty()) { + return SERVER_SUCCESS; //empty table } if(results.size() != record_count) { - SERVER_LOG_ERROR << "SearchVectorTask: Search result not returned"; - return SERVER_UNEXPECTED_ERROR; + std::string msg = "Search " + std::to_string(record_count) + " vectors but only return " + + std::to_string(results.size()) + " results"; + return SetError(SERVER_ILLEGAL_SEARCH_RESULT, msg); } rc.Record("do search"); @@ -525,13 +522,10 @@ ServerError SearchVectorTask::OnExecute() { result_array_.emplace_back(thrift_topk_result); } rc.Record("construct result"); - rc.Elapse("total cost"); + rc.Elapse("totally cost"); } catch (std::exception& ex) { - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = ex.what(); - SERVER_LOG_ERROR << "SearchVectorTask: " << error_msg_; - return error_code_; + return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } return SERVER_SUCCESS; @@ -555,31 +549,22 @@ ServerError GetTableRowCountTask::OnExecute() { //step 1: check arguments if (table_name_.empty()) { - error_code_ = SERVER_INVALID_ARGUMENT; - error_msg_ = "GetTableRowCountTask: Table name cannot be empty"; - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name"); } //step 2: get row count uint64_t row_count = 0; engine::Status stat = DBWrapper::DB()->GetTableRowCount(table_name_, row_count); if (!stat.ok()) { - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = "GetTableRowCountTask: Engine failed: " + stat.ToString(); - SERVER_LOG_ERROR << error_msg_; - return error_code_; + return SetError(DB_META_TRANSACTION_FAILED, "Engine failed: " + stat.ToString()); } row_count_ = (int64_t) row_count; - rc.Elapse("total cost"); + rc.Elapse("totally cost"); } catch (std::exception& ex) { - error_code_ = SERVER_UNEXPECTED_ERROR; - error_msg_ = ex.what(); - SERVER_LOG_ERROR << "GetTableRowCountTask: " << error_msg_; - return error_code_; + return SetError(SERVER_UNEXPECTED_ERROR, ex.what()); } return SERVER_SUCCESS; diff --git a/cpp/src/server/RequestTask.h b/cpp/src/server/RequestTask.h index b4ddc69726..3061b3b75d 100644 --- a/cpp/src/server/RequestTask.h +++ b/cpp/src/server/RequestTask.h @@ -33,6 +33,22 @@ private: const ::milvus::thrift::TableSchema& schema_; }; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +class HasTableTask : public BaseTask { +public: + static BaseTaskPtr Create(const std::string& table_name, bool& has_table); + +protected: + HasTableTask(const std::string& table_name, bool& has_table); + + ServerError OnExecute() override; + + +private: + std::string table_name_; + bool& has_table_; +}; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DescribeTableTask : public BaseTask { public: diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index aa77b081d7..f337275a46 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -25,6 +25,8 @@ static const std::string CONFIG_DB = "db_config"; static const std::string CONFIG_DB_URL = "db_backend_url"; static const std::string CONFIG_DB_PATH = "db_path"; static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold"; +static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold"; +static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold"; static const std::string CONFIG_LOG = "log_config"; diff --git a/cpp/src/thrift/gen-cpp/MilvusService.cpp b/cpp/src/thrift/gen-cpp/MilvusService.cpp index 7e0a120bc0..7a591276f6 100644 --- a/cpp/src/thrift/gen-cpp/MilvusService.cpp +++ b/cpp/src/thrift/gen-cpp/MilvusService.cpp @@ -196,6 +196,213 @@ uint32_t MilvusService_CreateTable_presult::read(::apache::thrift::protocol::TPr } +MilvusService_HasTable_args::~MilvusService_HasTable_args() throw() { +} + + +uint32_t MilvusService_HasTable_args::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->table_name); + this->__isset.table_name = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t MilvusService_HasTable_args::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("MilvusService_HasTable_args"); + + xfer += oprot->writeFieldBegin("table_name", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->table_name); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +MilvusService_HasTable_pargs::~MilvusService_HasTable_pargs() throw() { +} + + +uint32_t MilvusService_HasTable_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("MilvusService_HasTable_pargs"); + + xfer += oprot->writeFieldBegin("table_name", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString((*(this->table_name))); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +MilvusService_HasTable_result::~MilvusService_HasTable_result() throw() { +} + + +uint32_t MilvusService_HasTable_result::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->success); + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->e.read(iprot); + this->__isset.e = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t MilvusService_HasTable_result::write(::apache::thrift::protocol::TProtocol* oprot) const { + + uint32_t xfer = 0; + + xfer += oprot->writeStructBegin("MilvusService_HasTable_result"); + + if (this->__isset.success) { + xfer += oprot->writeFieldBegin("success", ::apache::thrift::protocol::T_BOOL, 0); + xfer += oprot->writeBool(this->success); + xfer += oprot->writeFieldEnd(); + } else if (this->__isset.e) { + xfer += oprot->writeFieldBegin("e", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->e.write(oprot); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +MilvusService_HasTable_presult::~MilvusService_HasTable_presult() throw() { +} + + +uint32_t MilvusService_HasTable_presult::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool((*(this->success))); + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->e.read(iprot); + this->__isset.e = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + + MilvusService_DeleteTable_args::~MilvusService_DeleteTable_args() throw() { } @@ -2290,6 +2497,67 @@ void MilvusServiceClient::recv_CreateTable() return; } +bool MilvusServiceClient::HasTable(const std::string& table_name) +{ + send_HasTable(table_name); + return recv_HasTable(); +} + +void MilvusServiceClient::send_HasTable(const std::string& table_name) +{ + int32_t cseqid = 0; + oprot_->writeMessageBegin("HasTable", ::apache::thrift::protocol::T_CALL, cseqid); + + MilvusService_HasTable_pargs args; + args.table_name = &table_name; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); +} + +bool MilvusServiceClient::recv_HasTable() +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + iprot_->readMessageBegin(fname, mtype, rseqid); + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("HasTable") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + bool _return; + MilvusService_HasTable_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + if (result.__isset.success) { + return _return; + } + if (result.__isset.e) { + throw result.e; + } + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "HasTable failed: unknown result"); +} + void MilvusServiceClient::DeleteTable(const std::string& table_name) { send_DeleteTable(table_name); @@ -2855,6 +3123,63 @@ void MilvusServiceProcessor::process_CreateTable(int32_t seqid, ::apache::thrift } } +void MilvusServiceProcessor::process_HasTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) +{ + void* ctx = NULL; + if (this->eventHandler_.get() != NULL) { + ctx = this->eventHandler_->getContext("MilvusService.HasTable", callContext); + } + ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "MilvusService.HasTable"); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->preRead(ctx, "MilvusService.HasTable"); + } + + MilvusService_HasTable_args args; + args.read(iprot); + iprot->readMessageEnd(); + uint32_t bytes = iprot->getTransport()->readEnd(); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->postRead(ctx, "MilvusService.HasTable", bytes); + } + + MilvusService_HasTable_result result; + try { + result.success = iface_->HasTable(args.table_name); + result.__isset.success = true; + } catch (Exception &e) { + result.e = e; + result.__isset.e = true; + } catch (const std::exception& e) { + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->handlerError(ctx, "MilvusService.HasTable"); + } + + ::apache::thrift::TApplicationException x(e.what()); + oprot->writeMessageBegin("HasTable", ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return; + } + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->preWrite(ctx, "MilvusService.HasTable"); + } + + oprot->writeMessageBegin("HasTable", ::apache::thrift::protocol::T_REPLY, seqid); + result.write(oprot); + oprot->writeMessageEnd(); + bytes = oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->postWrite(ctx, "MilvusService.HasTable", bytes); + } +} + void MilvusServiceProcessor::process_DeleteTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) { void* ctx = NULL; @@ -3399,6 +3724,94 @@ void MilvusServiceConcurrentClient::recv_CreateTable(const int32_t seqid) } // end while(true) } +bool MilvusServiceConcurrentClient::HasTable(const std::string& table_name) +{ + int32_t seqid = send_HasTable(table_name); + return recv_HasTable(seqid); +} + +int32_t MilvusServiceConcurrentClient::send_HasTable(const std::string& table_name) +{ + int32_t cseqid = this->sync_.generateSeqId(); + ::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_); + oprot_->writeMessageBegin("HasTable", ::apache::thrift::protocol::T_CALL, cseqid); + + MilvusService_HasTable_pargs args; + args.table_name = &table_name; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); + + sentry.commit(); + return cseqid; +} + +bool MilvusServiceConcurrentClient::recv_HasTable(const int32_t seqid) +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + // the read mutex gets dropped and reacquired as part of waitForWork() + // The destructor of this sentry wakes up other clients + ::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid); + + while(true) { + if(!this->sync_.getPending(fname, mtype, rseqid)) { + iprot_->readMessageBegin(fname, mtype, rseqid); + } + if(seqid == rseqid) { + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + sentry.commit(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("HasTable") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + // in a bad state, don't commit + using ::apache::thrift::protocol::TProtocolException; + throw TProtocolException(TProtocolException::INVALID_DATA); + } + bool _return; + MilvusService_HasTable_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + if (result.__isset.success) { + sentry.commit(); + return _return; + } + if (result.__isset.e) { + sentry.commit(); + throw result.e; + } + // in a bad state, don't commit + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "HasTable failed: unknown result"); + } + // seqid != rseqid + this->sync_.updatePending(fname, mtype, rseqid); + + // this will temporarily unlock the readMutex, and let other clients get work done + this->sync_.waitForWork(seqid); + } // end while(true) +} + void MilvusServiceConcurrentClient::DeleteTable(const std::string& table_name) { int32_t seqid = send_DeleteTable(table_name); diff --git a/cpp/src/thrift/gen-cpp/MilvusService.h b/cpp/src/thrift/gen-cpp/MilvusService.h index 4c1183de60..3868681463 100644 --- a/cpp/src/thrift/gen-cpp/MilvusService.h +++ b/cpp/src/thrift/gen-cpp/MilvusService.h @@ -34,6 +34,18 @@ class MilvusServiceIf { */ virtual void CreateTable(const TableSchema& param) = 0; + /** + * @brief Test table existence method + * + * This method is used to test table existence. + * + * @param table_name, table name is going to be tested. + * + * + * @param table_name + */ + virtual bool HasTable(const std::string& table_name) = 0; + /** * @brief Delete table method * @@ -178,6 +190,10 @@ class MilvusServiceNull : virtual public MilvusServiceIf { void CreateTable(const TableSchema& /* param */) { return; } + bool HasTable(const std::string& /* table_name */) { + bool _return = false; + return _return; + } void DeleteTable(const std::string& /* table_name */) { return; } @@ -309,6 +325,118 @@ class MilvusService_CreateTable_presult { }; +typedef struct _MilvusService_HasTable_args__isset { + _MilvusService_HasTable_args__isset() : table_name(false) {} + bool table_name :1; +} _MilvusService_HasTable_args__isset; + +class MilvusService_HasTable_args { + public: + + MilvusService_HasTable_args(const MilvusService_HasTable_args&); + MilvusService_HasTable_args& operator=(const MilvusService_HasTable_args&); + MilvusService_HasTable_args() : table_name() { + } + + virtual ~MilvusService_HasTable_args() throw(); + std::string table_name; + + _MilvusService_HasTable_args__isset __isset; + + void __set_table_name(const std::string& val); + + bool operator == (const MilvusService_HasTable_args & rhs) const + { + if (!(table_name == rhs.table_name)) + return false; + return true; + } + bool operator != (const MilvusService_HasTable_args &rhs) const { + return !(*this == rhs); + } + + bool operator < (const MilvusService_HasTable_args & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class MilvusService_HasTable_pargs { + public: + + + virtual ~MilvusService_HasTable_pargs() throw(); + const std::string* table_name; + + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +typedef struct _MilvusService_HasTable_result__isset { + _MilvusService_HasTable_result__isset() : success(false), e(false) {} + bool success :1; + bool e :1; +} _MilvusService_HasTable_result__isset; + +class MilvusService_HasTable_result { + public: + + MilvusService_HasTable_result(const MilvusService_HasTable_result&); + MilvusService_HasTable_result& operator=(const MilvusService_HasTable_result&); + MilvusService_HasTable_result() : success(0) { + } + + virtual ~MilvusService_HasTable_result() throw(); + bool success; + Exception e; + + _MilvusService_HasTable_result__isset __isset; + + void __set_success(const bool val); + + void __set_e(const Exception& val); + + bool operator == (const MilvusService_HasTable_result & rhs) const + { + if (!(success == rhs.success)) + return false; + if (!(e == rhs.e)) + return false; + return true; + } + bool operator != (const MilvusService_HasTable_result &rhs) const { + return !(*this == rhs); + } + + bool operator < (const MilvusService_HasTable_result & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +typedef struct _MilvusService_HasTable_presult__isset { + _MilvusService_HasTable_presult__isset() : success(false), e(false) {} + bool success :1; + bool e :1; +} _MilvusService_HasTable_presult__isset; + +class MilvusService_HasTable_presult { + public: + + + virtual ~MilvusService_HasTable_presult() throw(); + bool* success; + Exception e; + + _MilvusService_HasTable_presult__isset __isset; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + +}; + typedef struct _MilvusService_DeleteTable_args__isset { _MilvusService_DeleteTable_args__isset() : table_name(false) {} bool table_name :1; @@ -1269,6 +1397,9 @@ class MilvusServiceClient : virtual public MilvusServiceIf { void CreateTable(const TableSchema& param); void send_CreateTable(const TableSchema& param); void recv_CreateTable(); + bool HasTable(const std::string& table_name); + void send_HasTable(const std::string& table_name); + bool recv_HasTable(); void DeleteTable(const std::string& table_name); void send_DeleteTable(const std::string& table_name); void recv_DeleteTable(); @@ -1309,6 +1440,7 @@ class MilvusServiceProcessor : public ::apache::thrift::TDispatchProcessor { typedef std::map ProcessMap; ProcessMap processMap_; void process_CreateTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); + void process_HasTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_DeleteTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_AddVector(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_SearchVector(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); @@ -1321,6 +1453,7 @@ class MilvusServiceProcessor : public ::apache::thrift::TDispatchProcessor { MilvusServiceProcessor(::apache::thrift::stdcxx::shared_ptr iface) : iface_(iface) { processMap_["CreateTable"] = &MilvusServiceProcessor::process_CreateTable; + processMap_["HasTable"] = &MilvusServiceProcessor::process_HasTable; processMap_["DeleteTable"] = &MilvusServiceProcessor::process_DeleteTable; processMap_["AddVector"] = &MilvusServiceProcessor::process_AddVector; processMap_["SearchVector"] = &MilvusServiceProcessor::process_SearchVector; @@ -1366,6 +1499,15 @@ class MilvusServiceMultiface : virtual public MilvusServiceIf { ifaces_[i]->CreateTable(param); } + bool HasTable(const std::string& table_name) { + size_t sz = ifaces_.size(); + size_t i = 0; + for (; i < (sz - 1); ++i) { + ifaces_[i]->HasTable(table_name); + } + return ifaces_[i]->HasTable(table_name); + } + void DeleteTable(const std::string& table_name) { size_t sz = ifaces_.size(); size_t i = 0; @@ -1477,6 +1619,9 @@ class MilvusServiceConcurrentClient : virtual public MilvusServiceIf { void CreateTable(const TableSchema& param); int32_t send_CreateTable(const TableSchema& param); void recv_CreateTable(const int32_t seqid); + bool HasTable(const std::string& table_name); + int32_t send_HasTable(const std::string& table_name); + bool recv_HasTable(const int32_t seqid); void DeleteTable(const std::string& table_name); int32_t send_DeleteTable(const std::string& table_name); void recv_DeleteTable(const int32_t seqid); diff --git a/cpp/src/thrift/gen-cpp/MilvusService_server.skeleton.cpp b/cpp/src/thrift/gen-cpp/MilvusService_server.skeleton.cpp index 55d73b9642..ecb22c0b62 100644 --- a/cpp/src/thrift/gen-cpp/MilvusService_server.skeleton.cpp +++ b/cpp/src/thrift/gen-cpp/MilvusService_server.skeleton.cpp @@ -35,6 +35,21 @@ class MilvusServiceHandler : virtual public MilvusServiceIf { printf("CreateTable\n"); } + /** + * @brief Test table existence method + * + * This method is used to test table existence. + * + * @param table_name, table name is going to be tested. + * + * + * @param table_name + */ + bool HasTable(const std::string& table_name) { + // Your implementation goes here + printf("HasTable\n"); + } + /** * @brief Delete table method * diff --git a/cpp/src/thrift/gen-cpp/milvus_types.cpp b/cpp/src/thrift/gen-cpp/milvus_types.cpp index 203faa2e52..af77fda0ad 100644 --- a/cpp/src/thrift/gen-cpp/milvus_types.cpp +++ b/cpp/src/thrift/gen-cpp/milvus_types.cpp @@ -15,23 +15,51 @@ namespace milvus { namespace thrift { int _kErrorCodeValues[] = { ErrorCode::SUCCESS, + ErrorCode::UNEXPECTED_ERROR, ErrorCode::CONNECT_FAILED, ErrorCode::PERMISSION_DENIED, ErrorCode::TABLE_NOT_EXISTS, ErrorCode::ILLEGAL_ARGUMENT, ErrorCode::ILLEGAL_RANGE, - ErrorCode::ILLEGAL_DIMENSION + ErrorCode::ILLEGAL_DIMENSION, + ErrorCode::ILLEGAL_INDEX_TYPE, + ErrorCode::ILLEGAL_TABLE_NAME, + ErrorCode::ILLEGAL_TOPK, + ErrorCode::ILLEGAL_ROWRECORD, + ErrorCode::ILLEGAL_VECTOR_ID, + ErrorCode::ILLEGAL_SEARCH_RESULT, + ErrorCode::FILE_NOT_FOUND, + ErrorCode::META_FAILED, + ErrorCode::CACHE_FAILED, + ErrorCode::CANNOT_CREATE_FOLDER, + ErrorCode::CANNOT_CREATE_FILE, + ErrorCode::CANNOT_DELETE_FOLDER, + ErrorCode::CANNOT_DELETE_FILE }; const char* _kErrorCodeNames[] = { "SUCCESS", + "UNEXPECTED_ERROR", "CONNECT_FAILED", "PERMISSION_DENIED", "TABLE_NOT_EXISTS", "ILLEGAL_ARGUMENT", "ILLEGAL_RANGE", - "ILLEGAL_DIMENSION" + "ILLEGAL_DIMENSION", + "ILLEGAL_INDEX_TYPE", + "ILLEGAL_TABLE_NAME", + "ILLEGAL_TOPK", + "ILLEGAL_ROWRECORD", + "ILLEGAL_VECTOR_ID", + "ILLEGAL_SEARCH_RESULT", + "FILE_NOT_FOUND", + "META_FAILED", + "CACHE_FAILED", + "CANNOT_CREATE_FOLDER", + "CANNOT_CREATE_FILE", + "CANNOT_DELETE_FOLDER", + "CANNOT_DELETE_FILE" }; -const std::map _ErrorCode_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(7, _kErrorCodeValues, _kErrorCodeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _ErrorCode_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kErrorCodeValues, _kErrorCodeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); std::ostream& operator<<(std::ostream& out, const ErrorCode::type& val) { std::map::const_iterator it = _ErrorCode_VALUES_TO_NAMES.find(val); diff --git a/cpp/src/thrift/gen-cpp/milvus_types.h b/cpp/src/thrift/gen-cpp/milvus_types.h index ea4178aa99..8840e94fd9 100644 --- a/cpp/src/thrift/gen-cpp/milvus_types.h +++ b/cpp/src/thrift/gen-cpp/milvus_types.h @@ -23,12 +23,26 @@ namespace milvus { namespace thrift { struct ErrorCode { enum type { SUCCESS = 0, - CONNECT_FAILED = 1, - PERMISSION_DENIED = 2, - TABLE_NOT_EXISTS = 3, - ILLEGAL_ARGUMENT = 4, - ILLEGAL_RANGE = 5, - ILLEGAL_DIMENSION = 6 + UNEXPECTED_ERROR = 1, + CONNECT_FAILED = 2, + PERMISSION_DENIED = 3, + TABLE_NOT_EXISTS = 4, + ILLEGAL_ARGUMENT = 5, + ILLEGAL_RANGE = 6, + ILLEGAL_DIMENSION = 7, + ILLEGAL_INDEX_TYPE = 8, + ILLEGAL_TABLE_NAME = 9, + ILLEGAL_TOPK = 10, + ILLEGAL_ROWRECORD = 11, + ILLEGAL_VECTOR_ID = 12, + ILLEGAL_SEARCH_RESULT = 13, + FILE_NOT_FOUND = 14, + META_FAILED = 15, + CACHE_FAILED = 16, + CANNOT_CREATE_FOLDER = 17, + CANNOT_CREATE_FILE = 18, + CANNOT_DELETE_FOLDER = 19, + CANNOT_DELETE_FILE = 20 }; }; diff --git a/cpp/src/thrift/milvus.thrift b/cpp/src/thrift/milvus.thrift index 2936b85a5c..88ba223f02 100644 --- a/cpp/src/thrift/milvus.thrift +++ b/cpp/src/thrift/milvus.thrift @@ -15,12 +15,26 @@ namespace netcore milvus.thrift enum ErrorCode { SUCCESS = 0, + UNEXPECTED_ERROR, CONNECT_FAILED, PERMISSION_DENIED, TABLE_NOT_EXISTS, ILLEGAL_ARGUMENT, ILLEGAL_RANGE, ILLEGAL_DIMENSION, + ILLEGAL_INDEX_TYPE, + ILLEGAL_TABLE_NAME, + ILLEGAL_TOPK, + ILLEGAL_ROWRECORD, + ILLEGAL_VECTOR_ID, + ILLEGAL_SEARCH_RESULT, + FILE_NOT_FOUND, + META_FAILED, + CACHE_FAILED, + CANNOT_CREATE_FOLDER, + CANNOT_CREATE_FILE, + CANNOT_DELETE_FOLDER, + CANNOT_DELETE_FILE, } exception Exception { @@ -80,6 +94,16 @@ service MilvusService { */ void CreateTable(2: TableSchema param) throws(1: Exception e); + /** + * @brief Test table existence method + * + * This method is used to test table existence. + * + * @param table_name, table name is going to be tested. + * + */ + bool HasTable(2: string table_name) throws(1: Exception e); + /** * @brief Delete table method diff --git a/cpp/src/utils/Error.h b/cpp/src/utils/Error.h index bfb19f47a9..8c4da70339 100644 --- a/cpp/src/utils/Error.h +++ b/cpp/src/utils/Error.h @@ -24,18 +24,35 @@ ToGlobalServerErrorCode(const ServerError error_code) { return SERVER_ERROR_CODE_BASE + error_code; } -constexpr ServerError SERVER_UNEXPECTED_ERROR = ToGlobalServerErrorCode(0x001); -constexpr ServerError SERVER_UNSUPPORTED_ERROR = ToGlobalServerErrorCode(0x002); -constexpr ServerError SERVER_NULL_POINTER = ToGlobalServerErrorCode(0x003); -constexpr ServerError SERVER_INVALID_ARGUMENT = ToGlobalServerErrorCode(0x004); -constexpr ServerError SERVER_FILE_NOT_FOUND = ToGlobalServerErrorCode(0x005); -constexpr ServerError SERVER_NOT_IMPLEMENT = ToGlobalServerErrorCode(0x006); -constexpr ServerError SERVER_BLOCKING_QUEUE_EMPTY = ToGlobalServerErrorCode(0x007); -constexpr ServerError SERVER_TABLE_NOT_EXIST = ToGlobalServerErrorCode(0x008); -constexpr ServerError SERVER_INVALID_TIME_RANGE = ToGlobalServerErrorCode(0x009); -constexpr ServerError SERVER_INVALID_VECTOR_DIMENSION = ToGlobalServerErrorCode(0x00a); -constexpr ServerError SERVER_LICENSE_VALIDATION_FAIL = ToGlobalServerErrorCode(0x00b); -constexpr ServerError SERVER_LICENSE_FILE_NOT_EXIST = ToGlobalServerErrorCode(0x00c); +constexpr ServerError SERVER_UNEXPECTED_ERROR = ToGlobalServerErrorCode(1); +constexpr ServerError SERVER_UNSUPPORTED_ERROR = ToGlobalServerErrorCode(2); +constexpr ServerError SERVER_NULL_POINTER = ToGlobalServerErrorCode(3); +constexpr ServerError SERVER_INVALID_ARGUMENT = ToGlobalServerErrorCode(4); +constexpr ServerError SERVER_FILE_NOT_FOUND = ToGlobalServerErrorCode(5); +constexpr ServerError SERVER_NOT_IMPLEMENT = ToGlobalServerErrorCode(6); +constexpr ServerError SERVER_BLOCKING_QUEUE_EMPTY = ToGlobalServerErrorCode(7); +constexpr ServerError SERVER_CANNOT_CREATE_FOLDER = ToGlobalServerErrorCode(8); +constexpr ServerError SERVER_CANNOT_CREATE_FILE = ToGlobalServerErrorCode(9); +constexpr ServerError SERVER_CANNOT_DELETE_FOLDER = ToGlobalServerErrorCode(10); +constexpr ServerError SERVER_CANNOT_DELETE_FILE = ToGlobalServerErrorCode(11); + +constexpr ServerError SERVER_TABLE_NOT_EXIST = ToGlobalServerErrorCode(100); +constexpr ServerError SERVER_INVALID_TABLE_NAME = ToGlobalServerErrorCode(101); +constexpr ServerError SERVER_INVALID_TABLE_DIMENSION = ToGlobalServerErrorCode(102); +constexpr ServerError SERVER_INVALID_TIME_RANGE = ToGlobalServerErrorCode(103); +constexpr ServerError SERVER_INVALID_VECTOR_DIMENSION = ToGlobalServerErrorCode(104); +constexpr ServerError SERVER_INVALID_INDEX_TYPE = ToGlobalServerErrorCode(105); +constexpr ServerError SERVER_INVALID_ROWRECORD = ToGlobalServerErrorCode(106); +constexpr ServerError SERVER_INVALID_ROWRECORD_ARRAY = ToGlobalServerErrorCode(107); +constexpr ServerError SERVER_INVALID_TOPK = ToGlobalServerErrorCode(108); +constexpr ServerError SERVER_ILLEGAL_VECTOR_ID = ToGlobalServerErrorCode(109); +constexpr ServerError SERVER_ILLEGAL_SEARCH_RESULT = ToGlobalServerErrorCode(110); +constexpr ServerError SERVER_CACHE_ERROR = ToGlobalServerErrorCode(111); + +constexpr ServerError SERVER_LICENSE_FILE_NOT_EXIST = ToGlobalServerErrorCode(500); +constexpr ServerError SERVER_LICENSE_VALIDATION_FAIL = ToGlobalServerErrorCode(501); + +constexpr ServerError DB_META_TRANSACTION_FAILED = ToGlobalServerErrorCode(1000); class ServerException : public std::exception { public: diff --git a/cpp/unittest/db/meta_tests.cpp b/cpp/unittest/db/meta_tests.cpp index 7bc9db090f..a7933829c9 100644 --- a/cpp/unittest/db/meta_tests.cpp +++ b/cpp/unittest/db/meta_tests.cpp @@ -17,39 +17,39 @@ using namespace zilliz::milvus::engine; -TEST_F(MetaTest, GROUP_TEST) { - auto table_id = "meta_test_group"; +TEST_F(MetaTest, TABLE_TEST) { + auto table_id = "meta_test_table"; - meta::TableSchema group; - group.table_id_ = table_id; - auto status = impl_->CreateTable(group); + meta::TableSchema table; + table.table_id_ = table_id; + auto status = impl_->CreateTable(table); ASSERT_TRUE(status.ok()); - auto gid = group.id_; - group.id_ = -1; - status = impl_->DescribeTable(group); + auto gid = table.id_; + table.id_ = -1; + status = impl_->DescribeTable(table); ASSERT_TRUE(status.ok()); - ASSERT_EQ(group.id_, gid); - ASSERT_EQ(group.table_id_, table_id); + ASSERT_EQ(table.id_, gid); + ASSERT_EQ(table.table_id_, table_id); - group.table_id_ = "not_found"; - status = impl_->DescribeTable(group); + table.table_id_ = "not_found"; + status = impl_->DescribeTable(table); ASSERT_TRUE(!status.ok()); - group.table_id_ = table_id; - status = impl_->CreateTable(group); - ASSERT_TRUE(!status.ok()); + table.table_id_ = table_id; + status = impl_->CreateTable(table); + ASSERT_TRUE(status.ok()); } -TEST_F(MetaTest, table_file_TEST) { - auto table_id = "meta_test_group"; +TEST_F(MetaTest, TABLE_FILE_TEST) { + auto table_id = "meta_test_table"; - meta::TableSchema group; - group.table_id_ = table_id; - auto status = impl_->CreateTable(group); + meta::TableSchema table; + table.table_id_ = table_id; + auto status = impl_->CreateTable(table); meta::TableFileSchema table_file; - table_file.table_id_ = group.table_id_; + table_file.table_id_ = table.table_id_; status = impl_->CreateTableFile(table_file); ASSERT_TRUE(status.ok()); ASSERT_EQ(table_file.file_type_, meta::TableFileSchema::NEW); @@ -104,15 +104,15 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) { options.archive_conf = ArchiveConf("delete", ss.str()); auto impl = meta::DBMetaImpl(options); - auto table_id = "meta_test_group"; + auto table_id = "meta_test_table"; - meta::TableSchema group; - group.table_id_ = table_id; - auto status = impl.CreateTable(group); + meta::TableSchema table; + table.table_id_ = table_id; + auto status = impl.CreateTable(table); meta::TableFilesSchema files; meta::TableFileSchema table_file; - table_file.table_id_ = group.table_id_; + table_file.table_id_ = table.table_id_; auto cnt = 100; long ts = utils::GetMicroSecTimeStamp(); @@ -156,13 +156,13 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) { auto impl = meta::DBMetaImpl(options); auto table_id = "meta_test_group"; - meta::TableSchema group; - group.table_id_ = table_id; - auto status = impl.CreateTable(group); + meta::TableSchema table; + table.table_id_ = table_id; + auto status = impl.CreateTable(table); meta::TableFilesSchema files; meta::TableFileSchema table_file; - table_file.table_id_ = group.table_id_; + table_file.table_id_ = table.table_id_; auto cnt = 10; auto each_size = 2UL; @@ -198,9 +198,9 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) { TEST_F(MetaTest, TABLE_FILES_TEST) { auto table_id = "meta_test_group"; - meta::TableSchema group; - group.table_id_ = table_id; - auto status = impl_->CreateTable(group); + meta::TableSchema table; + table.table_id_ = table_id; + auto status = impl_->CreateTable(table); int new_files_cnt = 4; int raw_files_cnt = 5; @@ -208,7 +208,7 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { int index_files_cnt = 7; meta::TableFileSchema table_file; - table_file.table_id_ = group.table_id_; + table_file.table_id_ = table.table_id_; for (auto i=0; iCreateTableFile(table_file); @@ -241,7 +241,7 @@ TEST_F(MetaTest, TABLE_FILES_TEST) { ASSERT_EQ(files.size(), to_index_files_cnt); meta::DatePartionedTableFilesSchema dated_files; - status = impl_->FilesToMerge(group.table_id_, dated_files); + status = impl_->FilesToMerge(table.table_id_, dated_files); ASSERT_TRUE(status.ok()); ASSERT_EQ(dated_files[table_file.date_].size(), raw_files_cnt);