mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Merge branch 'branch-0.5.0' into 'branch-0.5.0'
MS-578 makesure milvus5.0 dont crack 0.3.1 data See merge request megasearch/milvus!600 Former-commit-id: e61887c0ea0ec243ffb35703edbda5c74028b5a8
This commit is contained in:
commit
161fb4f99b
@ -21,6 +21,7 @@
|
||||
|
||||
#include <mutex>
|
||||
#include <chrono>
|
||||
#include <regex>
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
namespace zilliz {
|
||||
@ -195,6 +196,41 @@ meta::DateT GetDate() {
|
||||
return GetDate(std::time(nullptr), 0);
|
||||
}
|
||||
|
||||
// URI format: dialect://username:password@host:port/database
|
||||
Status ParseMetaUri(const std::string& uri, MetaUriInfo& info) {
|
||||
std::string dialect_regex = "(.*)";
|
||||
std::string username_tegex = "(.*)";
|
||||
std::string password_regex = "(.*)";
|
||||
std::string host_regex = "(.*)";
|
||||
std::string port_regex = "(.*)";
|
||||
std::string db_name_regex = "(.*)";
|
||||
std::string uri_regex_str =
|
||||
dialect_regex + "\\:\\/\\/" +
|
||||
username_tegex + "\\:" +
|
||||
password_regex + "\\@" +
|
||||
host_regex + "\\:" +
|
||||
port_regex + "\\/" +
|
||||
db_name_regex;
|
||||
|
||||
std::regex uri_regex(uri_regex_str);
|
||||
std::smatch pieces_match;
|
||||
|
||||
if (std::regex_match(uri, pieces_match, uri_regex)) {
|
||||
info.dialect_ = pieces_match[1].str();
|
||||
info.username_ = pieces_match[2].str();
|
||||
info.password_ = pieces_match[3].str();
|
||||
info.host_ = pieces_match[4].str();
|
||||
info.port_ = pieces_match[5].str();
|
||||
info.db_name_ = pieces_match[6].str();
|
||||
|
||||
//TODO: verify host, port...
|
||||
} else {
|
||||
return Status(DB_INVALID_META_URI, "Invalid meta uri: " + uri);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace utils
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
|
||||
@ -44,6 +44,17 @@ meta::DateT GetDate(const std::time_t &t, int day_delta = 0);
|
||||
meta::DateT GetDate();
|
||||
meta::DateT GetDateWithDelta(int day_delta);
|
||||
|
||||
struct MetaUriInfo {
|
||||
std::string dialect_;
|
||||
std::string username_;
|
||||
std::string password_;
|
||||
std::string host_;
|
||||
std::string port_;
|
||||
std::string db_name_;
|
||||
};
|
||||
|
||||
Status ParseMetaUri(const std::string& uri, MetaUriInfo& info);
|
||||
|
||||
} // namespace utils
|
||||
} // namespace engine
|
||||
} // namespace milvus
|
||||
|
||||
@ -20,13 +20,14 @@
|
||||
#include "MySQLMetaImpl.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/Exception.h"
|
||||
#include "db/Utils.h"
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <time.h>
|
||||
#include <sstream>
|
||||
#include <cstdlib>
|
||||
#include <string>
|
||||
#include <regex>
|
||||
#include <string.h>
|
||||
|
||||
namespace zilliz {
|
||||
namespace milvus {
|
||||
@ -49,38 +50,23 @@ namespace engine {
|
||||
meta::MetaPtr MetaFactory::Build(const DBMetaOptions &metaOptions, const int &mode) {
|
||||
std::string uri = metaOptions.backend_uri;
|
||||
|
||||
std::string dialectRegex = "(.*)";
|
||||
std::string usernameRegex = "(.*)";
|
||||
std::string passwordRegex = "(.*)";
|
||||
std::string hostRegex = "(.*)";
|
||||
std::string portRegex = "(.*)";
|
||||
std::string dbNameRegex = "(.*)";
|
||||
std::string uriRegexStr = dialectRegex + "\\:\\/\\/" +
|
||||
usernameRegex + "\\:" +
|
||||
passwordRegex + "\\@" +
|
||||
hostRegex + "\\:" +
|
||||
portRegex + "\\/" +
|
||||
dbNameRegex;
|
||||
std::regex uriRegex(uriRegexStr);
|
||||
std::smatch pieces_match;
|
||||
|
||||
if (std::regex_match(uri, pieces_match, uriRegex)) {
|
||||
std::string dialect = pieces_match[1].str();
|
||||
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
|
||||
if (dialect.find("mysql") != std::string::npos) {
|
||||
ENGINE_LOG_INFO << "Using MySQL";
|
||||
return std::make_shared<meta::MySQLMetaImpl>(metaOptions, mode);
|
||||
} else if (dialect.find("sqlite") != std::string::npos) {
|
||||
ENGINE_LOG_INFO << "Using SQLite";
|
||||
return std::make_shared<meta::SqliteMetaImpl>(metaOptions);
|
||||
} else {
|
||||
ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect;
|
||||
throw InvalidArgumentException("URI dialect is not mysql / sqlite");
|
||||
}
|
||||
} else {
|
||||
utils::MetaUriInfo uri_info;
|
||||
auto status = utils::ParseMetaUri(uri, uri_info);
|
||||
if(!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri;
|
||||
throw InvalidArgumentException("Wrong URI format ");
|
||||
}
|
||||
|
||||
if (strcasecmp(uri_info.dialect_.c_str(), "mysql") == 0) {
|
||||
ENGINE_LOG_INFO << "Using MySQL";
|
||||
return std::make_shared<meta::MySQLMetaImpl>(metaOptions, mode);
|
||||
} else if (strcasecmp(uri_info.dialect_.c_str(), "sqlite") == 0) {
|
||||
ENGINE_LOG_INFO << "Using SQLite";
|
||||
return std::make_shared<meta::SqliteMetaImpl>(metaOptions);
|
||||
} else {
|
||||
ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << uri_info.dialect_;
|
||||
throw InvalidArgumentException("URI dialect is not mysql / sqlite");
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace engine
|
||||
|
||||
@ -19,21 +19,22 @@
|
||||
#include "db/IDGenerator.h"
|
||||
#include "db/Utils.h"
|
||||
#include "utils/Log.h"
|
||||
#include "utils/Exception.h"
|
||||
#include "MetaConsts.h"
|
||||
#include "metrics/Metrics.h"
|
||||
|
||||
#include <unistd.h>
|
||||
#include <sstream>
|
||||
#include <iostream>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <chrono>
|
||||
#include <fstream>
|
||||
#include <regex>
|
||||
#include <string>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
#include "mysql++/mysql++.h"
|
||||
#include <string.h>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <mysql++/mysql++.h>
|
||||
|
||||
|
||||
namespace zilliz {
|
||||
@ -56,8 +57,112 @@ Status HandleException(const std::string &desc, const char* what = nullptr) {
|
||||
}
|
||||
}
|
||||
|
||||
class MetaField {
|
||||
public:
|
||||
MetaField(const std::string& name, const std::string& type, const std::string& setting)
|
||||
: name_(name),
|
||||
type_(type),
|
||||
setting_(setting) {
|
||||
}
|
||||
|
||||
std::string name() const {
|
||||
return name_;
|
||||
}
|
||||
|
||||
std::string ToString() const {
|
||||
return name_ + " " + type_ + " " + setting_;
|
||||
}
|
||||
|
||||
// mysql field type has additional information. for instance, a filed type is defined as 'BIGINT'
|
||||
// we get the type from sql is 'bigint(20)', so we need to ignore the '(20)'
|
||||
bool IsEqual(const MetaField& field) const {
|
||||
size_t name_len_min = field.name_.length() > name_.length() ? name_.length() : field.name_.length();
|
||||
size_t type_len_min = field.type_.length() > type_.length() ? type_.length() : field.type_.length();
|
||||
return strncasecmp(field.name_.c_str(), name_.c_str(), name_len_min) == 0 &&
|
||||
strncasecmp(field.type_.c_str(), type_.c_str(), type_len_min) == 0;
|
||||
}
|
||||
|
||||
private:
|
||||
std::string name_;
|
||||
std::string type_;
|
||||
std::string setting_;
|
||||
};
|
||||
|
||||
using MetaFields = std::vector<MetaField>;
|
||||
class MetaSchema {
|
||||
public:
|
||||
MetaSchema(const std::string& name, const MetaFields& fields)
|
||||
: name_(name),
|
||||
fields_(fields) {
|
||||
}
|
||||
|
||||
std::string name() const {
|
||||
return name_;
|
||||
}
|
||||
|
||||
std::string ToString() const {
|
||||
std::string result;
|
||||
for(auto& field : fields_) {
|
||||
if(!result.empty()) {
|
||||
result += ",";
|
||||
}
|
||||
result += field.ToString();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
//if the outer fields contains all this MetaSchema fields, return true
|
||||
//otherwise return false
|
||||
bool IsEqual(const MetaFields& fields) const {
|
||||
std::vector<std::string> found_field;
|
||||
for(const auto& this_field : fields_) {
|
||||
for(const auto& outer_field : fields) {
|
||||
if(this_field.IsEqual(outer_field)) {
|
||||
found_field.push_back(this_field.name());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return found_field.size() == fields_.size();
|
||||
}
|
||||
|
||||
private:
|
||||
std::string name_;
|
||||
MetaFields fields_;
|
||||
};
|
||||
|
||||
//Tables schema
|
||||
static const MetaSchema TABLES_SCHEMA(META_TABLES, {
|
||||
MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
|
||||
MetaField("table_id", "VARCHAR(255)", "UNIQUE NOT NULL"),
|
||||
MetaField("state", "INT", "NOT NULL"),
|
||||
MetaField("dimension", "SMALLINT", "NOT NULL"),
|
||||
MetaField("created_on", "BIGINT", "NOT NULL"),
|
||||
MetaField("flag", "BIGINT", "DEFAULT 0 NOT NULL"),
|
||||
MetaField("index_file_size", "BIGINT", "DEFAULT 1024 NOT NULL"),
|
||||
MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
|
||||
MetaField("nlist", "INT", "DEFAULT 16384 NOT NULL"),
|
||||
MetaField("metric_type", "INT", "DEFAULT 1 NOT NULL"),
|
||||
});
|
||||
|
||||
//TableFiles schema
|
||||
static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, {
|
||||
MetaField("id", "BIGINT", "PRIMARY KEY AUTO_INCREMENT"),
|
||||
MetaField("table_id", "VARCHAR(255)", "NOT NULL"),
|
||||
MetaField("engine_type", "INT", "DEFAULT 1 NOT NULL"),
|
||||
MetaField("file_id", "VARCHAR(255)", "NOT NULL"),
|
||||
MetaField("file_type", "INT", "DEFAULT 0 NOT NULL"),
|
||||
MetaField("file_size", "BIGINT", "DEFAULT 0 NOT NULL"),
|
||||
MetaField("row_count", "BIGINT", "DEFAULT 0 NOT NULL"),
|
||||
MetaField("updated_time", "BIGINT", "NOT NULL"),
|
||||
MetaField("created_on", "BIGINT", "NOT NULL"),
|
||||
MetaField("date", "INT", "DEFAULT -1 NOT NULL"),
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
MySQLMetaImpl::MySQLMetaImpl(const DBMetaOptions &options_, const int &mode)
|
||||
: options_(options_),
|
||||
mode_(mode) {
|
||||
@ -84,7 +189,56 @@ Status MySQLMetaImpl::NextFileId(std::string &file_id) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void MySQLMetaImpl::ValidateMetaSchema() {
|
||||
if(nullptr == mysql_connection_pool_) {
|
||||
return;
|
||||
}
|
||||
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
if (connectionPtr == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto validate_func = [&](const MetaSchema& schema) {
|
||||
Query query_statement = connectionPtr->query();
|
||||
query_statement << "DESC " << schema.name() << ";";
|
||||
|
||||
MetaFields exist_fields;
|
||||
|
||||
try {
|
||||
StoreQueryResult res = query_statement.store();
|
||||
for (size_t i = 0; i < res.num_rows(); i++) {
|
||||
const Row &row = res[i];
|
||||
std::string name, type;
|
||||
row["Field"].to_string(name);
|
||||
row["Type"].to_string(type);
|
||||
|
||||
exist_fields.push_back(MetaField(name, type, ""));
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
ENGINE_LOG_DEBUG << "Meta table '" << schema.name() << "' not exist and will be created";
|
||||
}
|
||||
|
||||
if(exist_fields.empty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return schema.IsEqual(exist_fields);
|
||||
};
|
||||
|
||||
//verify Tables
|
||||
if (!validate_func(TABLES_SCHEMA)) {
|
||||
throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
|
||||
}
|
||||
|
||||
//verufy TableFiles
|
||||
if (!validate_func(TABLEFILES_SCHEMA)) {
|
||||
throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
|
||||
}
|
||||
}
|
||||
|
||||
Status MySQLMetaImpl::Initialize() {
|
||||
//step 1: create db root path
|
||||
if (!boost::filesystem::is_directory(options_.path)) {
|
||||
auto ret = boost::filesystem::create_directory(options_.path);
|
||||
if (!ret) {
|
||||
@ -96,108 +250,79 @@ Status MySQLMetaImpl::Initialize() {
|
||||
|
||||
std::string uri = options_.backend_uri;
|
||||
|
||||
std::string dialectRegex = "(.*)";
|
||||
std::string usernameRegex = "(.*)";
|
||||
std::string passwordRegex = "(.*)";
|
||||
std::string hostRegex = "(.*)";
|
||||
std::string portRegex = "(.*)";
|
||||
std::string dbNameRegex = "(.*)";
|
||||
std::string uriRegexStr = dialectRegex + "\\:\\/\\/" +
|
||||
usernameRegex + "\\:" +
|
||||
passwordRegex + "\\@" +
|
||||
hostRegex + "\\:" +
|
||||
portRegex + "\\/" +
|
||||
dbNameRegex;
|
||||
std::regex uriRegex(uriRegexStr);
|
||||
std::smatch pieces_match;
|
||||
//step 2: parse and check meta uri
|
||||
utils::MetaUriInfo uri_info;
|
||||
auto status = utils::ParseMetaUri(uri, uri_info);
|
||||
if(!status.ok()) {
|
||||
std::string msg = "Wrong URI format: " + uri;
|
||||
ENGINE_LOG_ERROR << msg;
|
||||
throw Exception(DB_INVALID_META_URI, msg);
|
||||
}
|
||||
|
||||
if (std::regex_match(uri, pieces_match, uriRegex)) {
|
||||
std::string dialect = pieces_match[1].str();
|
||||
std::transform(dialect.begin(), dialect.end(), dialect.begin(), ::tolower);
|
||||
if (dialect.find("mysql") == std::string::npos) {
|
||||
return Status(DB_ERROR, "URI's dialect is not MySQL");
|
||||
if (strcasecmp(uri_info.dialect_.c_str(), "mysql") != 0) {
|
||||
std::string msg = "URI's dialect is not MySQL";
|
||||
ENGINE_LOG_ERROR << msg;
|
||||
throw Exception(DB_INVALID_META_URI, msg);
|
||||
}
|
||||
|
||||
//step 3: connect mysql
|
||||
int thread_hint = std::thread::hardware_concurrency();
|
||||
int max_pool_size = (thread_hint == 0) ? 8 : thread_hint;
|
||||
unsigned int port = 0;
|
||||
if (!uri_info.port_.empty()) {
|
||||
port = std::stoi(uri_info.port_);
|
||||
}
|
||||
|
||||
mysql_connection_pool_ =
|
||||
std::make_shared<MySQLConnectionPool>(uri_info.db_name_, uri_info.username_,
|
||||
uri_info.password_, uri_info.host_, port, max_pool_size);
|
||||
ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(max_pool_size);
|
||||
|
||||
//step 4: validate to avoid open old version schema
|
||||
ValidateMetaSchema();
|
||||
|
||||
//step 5: create meta tables
|
||||
try {
|
||||
|
||||
if (mode_ != DBOptions::MODE::READ_ONLY) {
|
||||
CleanUp();
|
||||
}
|
||||
std::string username = pieces_match[2].str();
|
||||
std::string password = pieces_match[3].str();
|
||||
std::string serverAddress = pieces_match[4].str();
|
||||
unsigned int port = 0;
|
||||
if (!pieces_match[5].str().empty()) {
|
||||
port = std::stoi(pieces_match[5].str());
|
||||
}
|
||||
std::string dbName = pieces_match[6].str();
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
int threadHint = std::thread::hardware_concurrency();
|
||||
int maxPoolSize = threadHint == 0 ? 8 : threadHint;
|
||||
mysql_connection_pool_ =
|
||||
std::make_shared<MySQLConnectionPool>(dbName, username, password, serverAddress, port, maxPoolSize);
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQL connection pool: maximum pool size = " << std::to_string(maxPoolSize);
|
||||
try {
|
||||
|
||||
if (mode_ != DBOptions::MODE::READ_ONLY) {
|
||||
CleanUp();
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status(DB_ERROR, "Failed to connect to database server");
|
||||
}
|
||||
|
||||
{
|
||||
ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab);
|
||||
|
||||
if (connectionPtr == nullptr) {
|
||||
return Status(DB_ERROR, "Failed to connect to database server");
|
||||
}
|
||||
if (!connectionPtr->thread_aware()) {
|
||||
ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
|
||||
return Status(DB_ERROR, "MySQL++ wasn't built with thread awareness! Can't run without it.");
|
||||
}
|
||||
Query InitializeQuery = connectionPtr->query();
|
||||
|
||||
InitializeQuery << "CREATE TABLE IF NOT EXISTS " <<
|
||||
TABLES_SCHEMA.name() << " (" << TABLES_SCHEMA.ToString() + ");";
|
||||
|
||||
if (!connectionPtr->thread_aware()) {
|
||||
ENGINE_LOG_ERROR << "MySQL++ wasn't built with thread awareness! Can't run without it.";
|
||||
return Status(DB_ERROR, "MySQL++ wasn't built with thread awareness! Can't run without it.");
|
||||
}
|
||||
Query InitializeQuery = connectionPtr->query();
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
|
||||
|
||||
InitializeQuery << "CREATE TABLE IF NOT EXISTS " <<
|
||||
META_TABLES << " " <<
|
||||
"(id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
|
||||
"table_id VARCHAR(255) UNIQUE NOT NULL, " <<
|
||||
"state INT NOT NULL, " <<
|
||||
"dimension SMALLINT NOT NULL, " <<
|
||||
"created_on BIGINT NOT NULL, " <<
|
||||
"flag BIGINT DEFAULT 0 NOT NULL, " <<
|
||||
"index_file_size BIGINT DEFAULT 1024 NOT NULL, " <<
|
||||
"engine_type INT DEFAULT 1 NOT NULL, " <<
|
||||
"nlist INT DEFAULT 16384 NOT NULL, " <<
|
||||
"metric_type INT DEFAULT 1 NOT NULL);";
|
||||
if (!InitializeQuery.exec()) {
|
||||
return HandleException("Initialization Error", InitializeQuery.error());
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
|
||||
InitializeQuery << "CREATE TABLE IF NOT EXISTS " <<
|
||||
TABLEFILES_SCHEMA.name() << " (" << TABLEFILES_SCHEMA.ToString() + ");";
|
||||
|
||||
if (!InitializeQuery.exec()) {
|
||||
return HandleException("Initialization Error", InitializeQuery.error());
|
||||
}
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
|
||||
|
||||
InitializeQuery << "CREATE TABLE IF NOT EXISTS " <<
|
||||
META_TABLEFILES << " " <<
|
||||
"(id BIGINT PRIMARY KEY AUTO_INCREMENT, " <<
|
||||
"table_id VARCHAR(255) NOT NULL, " <<
|
||||
"engine_type INT DEFAULT 1 NOT NULL, " <<
|
||||
"file_id VARCHAR(255) NOT NULL, " <<
|
||||
"file_type INT DEFAULT 0 NOT NULL, " <<
|
||||
"file_size BIGINT DEFAULT 0 NOT NULL, " <<
|
||||
"row_count BIGINT DEFAULT 0 NOT NULL, " <<
|
||||
"updated_time BIGINT NOT NULL, " <<
|
||||
"created_on BIGINT NOT NULL, " <<
|
||||
"date INT DEFAULT -1 NOT NULL);";
|
||||
if (!InitializeQuery.exec()) {
|
||||
return HandleException("Initialization Error", InitializeQuery.error());
|
||||
}
|
||||
} //Scoped Connection
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::Initialize: " << InitializeQuery.str();
|
||||
|
||||
if (!InitializeQuery.exec()) {
|
||||
return HandleException("Initialization Error", InitializeQuery.error());
|
||||
}
|
||||
} //Scoped Connection
|
||||
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
|
||||
}
|
||||
} else {
|
||||
ENGINE_LOG_ERROR << "Wrong URI format. URI = " << uri;
|
||||
return Status(DB_ERROR, "Wrong URI format");
|
||||
} catch (std::exception &e) {
|
||||
return HandleException("GENERAL ERROR DURING INITIALIZATION", e.what());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -1843,7 +1968,7 @@ Status MySQLMetaImpl::DropAll() {
|
||||
}
|
||||
|
||||
Query dropTableQuery = connectionPtr->query();
|
||||
dropTableQuery << "DROP TABLE IF EXISTS " << META_TABLES << ", " << META_TABLEFILES << ";";
|
||||
dropTableQuery << "DROP TABLE IF EXISTS " << TABLES_SCHEMA.name() << ", " << TABLEFILES_SCHEMA.name() << ";";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::DropAll: " << dropTableQuery.str();
|
||||
|
||||
|
||||
@ -103,8 +103,11 @@ class MySQLMetaImpl : public Meta {
|
||||
Status NextFileId(std::string &file_id);
|
||||
Status NextTableId(std::string &table_id);
|
||||
Status DiscardFiles(long long to_discard_size);
|
||||
|
||||
void ValidateMetaSchema();
|
||||
Status Initialize();
|
||||
|
||||
private:
|
||||
const DBMetaOptions options_;
|
||||
const int mode_;
|
||||
|
||||
|
||||
@ -84,7 +84,6 @@ inline auto StoragePrototype(const std::string &path) {
|
||||
|
||||
using ConnectorT = decltype(StoragePrototype(""));
|
||||
static std::unique_ptr<ConnectorT> ConnectorPtr;
|
||||
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
|
||||
|
||||
SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions &options_)
|
||||
: options_(options_) {
|
||||
@ -111,6 +110,23 @@ Status SqliteMetaImpl::NextFileId(std::string &file_id) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void SqliteMetaImpl::ValidateMetaSchema() {
|
||||
if(ConnectorPtr == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
//old meta could be recreated since schema changed, throw exception if meta schema is not compatible
|
||||
auto ret = ConnectorPtr->sync_schema_simulate();
|
||||
if(ret.find(META_TABLES) != ret.end()
|
||||
&& sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
|
||||
throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
|
||||
}
|
||||
if(ret.find(META_TABLEFILES) != ret.end()
|
||||
&& sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
|
||||
throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
|
||||
}
|
||||
}
|
||||
|
||||
Status SqliteMetaImpl::Initialize() {
|
||||
if (!boost::filesystem::is_directory(options_.path)) {
|
||||
auto ret = boost::filesystem::create_directory(options_.path);
|
||||
@ -123,16 +139,7 @@ Status SqliteMetaImpl::Initialize() {
|
||||
|
||||
ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path + "/meta.sqlite"));
|
||||
|
||||
//old meta could be recreated since schema changed, throw exception if meta schema is not compatible
|
||||
auto ret = ConnectorPtr->sync_schema_simulate();
|
||||
if(ret.find(META_TABLES) != ret.end()
|
||||
&& sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
|
||||
throw Exception(DB_INCOMPATIB_META, "Meta schema is created by Milvus old version");
|
||||
}
|
||||
if(ret.find(META_TABLEFILES) != ret.end()
|
||||
&& sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
|
||||
throw Exception(DB_INCOMPATIB_META, "Meta schema is created by Milvus old version");
|
||||
}
|
||||
ValidateMetaSchema();
|
||||
|
||||
ConnectorPtr->sync_schema();
|
||||
ConnectorPtr->open_forever(); // thread safe option
|
||||
|
||||
@ -97,10 +97,12 @@ class SqliteMetaImpl : public Meta {
|
||||
Status NextFileId(std::string &file_id);
|
||||
Status NextTableId(std::string &table_id);
|
||||
Status DiscardFiles(long to_discard_size);
|
||||
|
||||
void ValidateMetaSchema();
|
||||
Status Initialize();
|
||||
|
||||
private:
|
||||
const DBMetaOptions options_;
|
||||
|
||||
std::mutex meta_mutex_;
|
||||
}; // DBMetaImpl
|
||||
|
||||
|
||||
@ -87,6 +87,7 @@ constexpr ErrorCode DB_NOT_FOUND = ToDbErrorCode(3);
|
||||
constexpr ErrorCode DB_ALREADY_EXIST = ToDbErrorCode(4);
|
||||
constexpr ErrorCode DB_INVALID_PATH = ToDbErrorCode(5);
|
||||
constexpr ErrorCode DB_INCOMPATIB_META = ToDbErrorCode(6);
|
||||
constexpr ErrorCode DB_INVALID_META_URI = ToDbErrorCode(7);
|
||||
|
||||
//knowhere error code
|
||||
constexpr ErrorCode KNOWHERE_ERROR = ToKnowhereErrorCode(1);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user