From 762f5f296fcf3200d9bfefbca4ed86d4457f4d75 Mon Sep 17 00:00:00 2001
From: groot
Date: Thu, 29 Oct 2020 15:29:02 +0800
Subject: [PATCH] #4066 Rewrite SqliteMetaImpl to discard sqlite_orm (#4114)

* #4066 Rewrite SqliteMetaImpl to discard sqlite_orm

Signed-off-by: yhmo

* fix unittest

Signed-off-by: yhmo
---
 core/src/db/meta/MetaSchema.cpp     |   67 +
 core/src/db/meta/MetaSchema.h       |   69 +
 core/src/db/meta/MySQLMetaImpl.cpp  |   88 +-
 core/src/db/meta/SqliteMetaImpl.cpp | 2240 ++++++++++++++++-----------
 core/src/db/meta/SqliteMetaImpl.h   |   21 +-
 core/unittest/db/test_db.cpp        |    2 +-
 6 files changed, 1485 insertions(+), 1002 deletions(-)
 create mode 100644 core/src/db/meta/MetaSchema.cpp
 create mode 100644 core/src/db/meta/MetaSchema.h

diff --git a/core/src/db/meta/MetaSchema.cpp b/core/src/db/meta/MetaSchema.cpp
new file mode 100644
index 0000000000..a0497e53e6
--- /dev/null
+++ b/core/src/db/meta/MetaSchema.cpp
@@ -0,0 +1,67 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+#include "db/meta/MetaSchema.h"
+
+#include "utils/StringHelpFunctions.h"
+
+namespace milvus::engine::meta {
+
+bool
+MetaField::IsEqual(const MetaField& field) const {
+    // a mysql field type carries additional information. For instance, a field type defined as 'BIGINT'
+    // is reported by SQL as 'bigint(20)', so we need to ignore the '(20)'
+    size_t name_len_min = field.name_.length() > name_.length() ? name_.length() : field.name_.length();
+    size_t type_len_min = field.type_.length() > type_.length() ? type_.length() : field.type_.length();
+
+    // only check the field type, not the field width; for example, VARCHAR(255) and VARCHAR(100) are equal
+    std::vector<std::string> type_split;
+    milvus::server::StringHelpFunctions::SplitStringByDelimeter(type_, "(", type_split);
+    if (!type_split.empty()) {
+        type_len_min = type_split[0].length() > type_len_min ? type_len_min : type_split[0].length();
+    }
+
+    // field names must be equal; type width is ignored
+    return strncasecmp(field.name_.c_str(), name_.c_str(), name_len_min) == 0 &&
+           strncasecmp(field.type_.c_str(), type_.c_str(), type_len_min) == 0;
+}
+
+std::string
+MetaSchema::ToString() const {
+    std::string result;
+    for (auto& field : fields_) {
+        if (!result.empty()) {
+            result += ",";
+        }
+        result += field.ToString();
+    }
+
+    return result;
+}
+
+// return true if the outer fields contain all of this MetaSchema's fields,
+// otherwise return false
+bool
+MetaSchema::IsEqual(const MetaFields& fields) const {
+    std::vector<std::string> found_field;
+    for (const auto& this_field : fields_) {
+        for (const auto& outer_field : fields) {
+            if (this_field.IsEqual(outer_field)) {
+                found_field.push_back(this_field.name());
+                break;
+            }
+        }
+    }
+
+    return found_field.size() == fields_.size();
+}
+
+}  // namespace milvus::engine::meta

diff --git a/core/src/db/meta/MetaSchema.h b/core/src/db/meta/MetaSchema.h
new file mode 100644
index 0000000000..2a5acfb096
--- /dev/null
+++ b/core/src/db/meta/MetaSchema.h
@@ -0,0 +1,69 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+namespace milvus::engine::meta {
+
+class MetaField {
+ public:
+    MetaField(const std::string& name, const std::string& type, const std::string& setting)
+        : name_(name), type_(type), setting_(setting) {
+    }
+
+    const std::string&
+    name() const {
+        return name_;
+    }
+
+    std::string
+    ToString() const {
+        return name_ + " " + type_ + " " + setting_;
+    }
+
+    bool
+    IsEqual(const MetaField& field) const;
+
+ private:
+    std::string name_;
+    std::string type_;
+    std::string setting_;
+};
+
+using MetaFields = std::vector<MetaField>;
+
+class MetaSchema {
+ public:
+    MetaSchema(const std::string& name, const MetaFields& fields) : name_(name), fields_(fields) {
+    }
+
+    std::string
+    name() const {
+        return name_;
+    }
+
+    std::string
+    ToString() const;
+
+    bool
+    IsEqual(const MetaFields& fields) const;
+
+ private:
+    std::string name_;
+    MetaFields fields_;
+};
+
+}  // namespace milvus::engine::meta

diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp
index c452f318b8..9f3de9b19c 100644
--- a/core/src/db/meta/MySQLMetaImpl.cpp
+++ b/core/src/db/meta/MySQLMetaImpl.cpp
@@ -33,6 +33,7 @@
 #include "MetaConsts.h"
 #include "db/IDGenerator.h"
 #include "db/Utils.h"
+#include "db/meta/MetaSchema.h"
 #include "metrics/Metrics.h"
 #include "utils/CommonUtil.h"
 #include "utils/Exception.h"
@@ -78,93 +79,6 @@ HandleException(const std::string& desc, const char* what = nullptr) {
     return Status(DB_META_TRANSACTION_FAILED, msg);
 }
 
-class MetaField {
- public:
-    MetaField(const std::string& name, const std::string& type, const std::string& setting)
-        : name_(name), type_(type), setting_(setting) {
-    }
-
-    std::string
-    name() const {
-        return name_;
-    }
-
-    std::string
-    ToString() const {
-        return name_ + " " + type_ + " " + setting_;
-    }
-
-    // mysql field type has additional information. for instance, a filed type is defined as 'BIGINT'
-    // we get the type from sql is 'bigint(20)', so we need to ignore the '(20)'
-    bool
-    IsEqual(const MetaField& field) const {
-        size_t name_len_min = field.name_.length() > name_.length() ? name_.length() : field.name_.length();
-        size_t type_len_min = field.type_.length() > type_.length() ? type_.length() : field.type_.length();
-
-        // only check field type, don't check field width, for example: VARCHAR(255) and VARCHAR(100) is equal
-        std::vector<std::string> type_split;
-        milvus::server::StringHelpFunctions::SplitStringByDelimeter(type_, "(", type_split);
-        if (!type_split.empty()) {
-            type_len_min = type_split[0].length() > type_len_min ? type_len_min : type_split[0].length();
-        }
-
-        // field name must be equal, ignore type width
-        return strncasecmp(field.name_.c_str(), name_.c_str(), name_len_min) == 0 &&
-               strncasecmp(field.type_.c_str(), type_.c_str(), type_len_min) == 0;
-    }
-
- private:
-    std::string name_;
-    std::string type_;
-    std::string setting_;
-};
-
-using MetaFields = std::vector<MetaField>;
-
-class MetaSchema {
- public:
-    MetaSchema(const std::string& name, const MetaFields& fields) : name_(name), fields_(fields) {
-    }
-
-    std::string
-    name() const {
-        return name_;
-    }
-
-    std::string
-    ToString() const {
-        std::string result;
-        for (auto& field : fields_) {
-            if (!result.empty()) {
-                result += ",";
-            }
-            result += field.ToString();
-        }
-        return result;
-    }
-
-    // if the outer fields contains all this MetaSchema fields, return true
-    // otherwise return false
-    bool
-    IsEqual(const MetaFields& fields) const {
-        std::vector<std::string> found_field;
-        for (const auto& this_field : fields_) {
-            for (const auto& outer_field : fields) {
-                if (this_field.IsEqual(outer_field)) {
-                    found_field.push_back(this_field.name());
-                    break;
-                }
-            }
-        }
-
-        return found_field.size() == fields_.size();
-    }
-
- private:
-    std::string name_;
-    MetaFields fields_;
-};
-
 // Environment schema
 static const MetaSchema ENVIRONMENT_SCHEMA(META_ENVIRONMENT, {
                                                MetaField("global_lsn", "BIGINT", "NOT NULL"),

diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp
index ab43dcb461..2ead90942b 100644
--- a/core/src/db/meta/SqliteMetaImpl.cpp
+++ b/core/src/db/meta/SqliteMetaImpl.cpp
@@ -11,7 +11,6 @@
 
 #include "db/meta/SqliteMetaImpl.h"
 
-#include <sqlite_orm.h>
 #include
 #include
@@ -26,6 +25,7 @@
 #include
 
 #include "MetaConsts.h"
+#include "db/meta/MetaSchema.h"
 #include "db/IDGenerator.h"
 #include "db/Utils.h"
 #include "metrics/Metrics.h"
@@ -43,13 +43,11 @@
 namespace milvus {
 namespace engine {
 namespace meta {
 
-using namespace sqlite_orm;
-
 namespace {
 
 constexpr uint64_t SQL_BATCH_SIZE = 50;
 
-template <typename T>
+template <typename T>
 void
 DistributeBatch(const T& id_array, std::vector<std::vector<std::string>>& id_groups) {
     std::vector<std::string> temp_group;
@@ -79,46 +77,88 @@ HandleException(const std::string& desc, const char* what = nullptr) {
     }
 }
 
-}  // namespace
-
-inline auto
-StoragePrototype(const std::string& path) {
-    return make_storage(
-        path,
-        make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))),
-        make_table(META_TABLES, make_column("id", &CollectionSchema::id_, primary_key()),
-                   make_column("table_id", &CollectionSchema::collection_id_, unique()),
-                   make_column("state", &CollectionSchema::state_),
-                   make_column("dimension", &CollectionSchema::dimension_),
-                   make_column("created_on", &CollectionSchema::created_on_),
-                   make_column("flag", &CollectionSchema::flag_, default_value(0)),
-                   make_column("index_file_size", &CollectionSchema::index_file_size_),
-                   make_column("engine_type", &CollectionSchema::engine_type_),
-                   make_column("index_params", &CollectionSchema::index_params_),
-                   make_column("metric_type", &CollectionSchema::metric_type_),
-                   make_column("owner_table", &CollectionSchema::owner_collection_, default_value("")),
-                   make_column("partition_tag", &CollectionSchema::partition_tag_, default_value("")),
-                   make_column("version", &CollectionSchema::version_, default_value(CURRENT_VERSION)),
-                   make_column("flush_lsn", &CollectionSchema::flush_lsn_)),
-        make_table(META_FIELDS, make_column("collection_id", &hybrid::FieldSchema::collection_id_),
-                   make_column("field_name", &hybrid::FieldSchema::field_name_),
-                   make_column("field_type", &hybrid::FieldSchema::field_type_),
-                   make_column("field_params", &hybrid::FieldSchema::field_params_)),
-        make_table(META_TABLEFILES, make_column("id", &SegmentSchema::id_, primary_key()),
-                   make_column("table_id", &SegmentSchema::collection_id_),
-                   make_column("segment_id", &SegmentSchema::segment_id_, default_value("")),
-                   make_column("engine_type", &SegmentSchema::engine_type_),
-                   make_column("file_id", &SegmentSchema::file_id_),
-                   make_column("file_type", &SegmentSchema::file_type_),
-                   make_column("file_size", &SegmentSchema::file_size_, default_value(0)),
-                   make_column("row_count", &SegmentSchema::row_count_, default_value(0)),
-                   make_column("updated_time", &SegmentSchema::updated_time_),
-                   make_column("created_on", &SegmentSchema::created_on_), make_column("date", &SegmentSchema::date_),
-                   make_column("flush_lsn", &SegmentSchema::flush_lsn_)));
+std::string
+ErrorMsg(sqlite3* db) {
+    std::stringstream ss;
+    ss << "(sqlite3 error code:" << sqlite3_errcode(db) << ", extend code: " << sqlite3_extended_errcode(db)
+       << ", sys errno:" << sqlite3_system_errno(db) << ", sys err msg:" << strerror(errno)
+       << ", msg:" << sqlite3_errmsg(db) << ")";
+    return ss.str();
 }
 
-using ConnectorT = decltype(StoragePrototype("table"));
-static std::unique_ptr<ConnectorT> ConnectorPtr;
+std::string
+Quote(const std::string& str) {
+    return "\'" + str + "\'";
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// Environment schema
+static const MetaSchema ENVIRONMENT_SCHEMA(META_ENVIRONMENT, {
+    MetaField("global_lsn", "INTEGER", "DEFAULT (0) NOT NULL"),
+});
+
+// Tables schema
+static const MetaSchema TABLES_SCHEMA(META_TABLES, {
+    MetaField("id", "INTEGER", "PRIMARY KEY NOT NULL"),
+    MetaField("table_id", "TEXT", "UNIQUE NOT NULL"),
+    MetaField("state", "INTEGER", "NOT NULL"),
+    MetaField("dimension", "INTEGER", "NOT NULL"),
+    MetaField("created_on", "INTEGER", "NOT NULL"),
+    MetaField("flag", "INTEGER", "DEFAULT (0) NOT NULL"),
+    MetaField("index_file_size", "INTEGER", "NOT NULL"),
+    MetaField("engine_type", "INTEGER", "NOT NULL"),
+    MetaField("index_params", "TEXT", "NOT NULL"),
+    MetaField("metric_type", "INTEGER", "NOT NULL"),
+    MetaField("owner_table", "TEXT", "DEFAULT ('') NOT NULL"),
+    MetaField("partition_tag", "TEXT", "DEFAULT ('') NOT NULL"),
+    MetaField("version", "TEXT", std::string("DEFAULT ('") + CURRENT_VERSION + "') NOT NULL"),
+    MetaField("flush_lsn", "INTEGER", "NOT NULL"),
+});
+
+// TableFiles schema
+static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, {
+    MetaField("id", "INTEGER", "PRIMARY KEY NOT NULL"),
+    MetaField("table_id", "TEXT", "NOT NULL"),
+    MetaField("segment_id", "TEXT", "DEFAULT ('') NOT NULL"),
+    MetaField("engine_type", "INTEGER", "NOT NULL"),
+    MetaField("file_id", "TEXT", "NOT NULL"),
+    MetaField("file_type", "INTEGER", "NOT NULL"),
+    MetaField("file_size", "INTEGER", "DEFAULT (0) NOT NULL"),
+    MetaField("row_count", "INTEGER", "DEFAULT (0) NOT NULL"),
+    MetaField("updated_time", "INTEGER", "NOT NULL"),
+    MetaField("created_on", "INTEGER", "NOT NULL"),
+    MetaField("date", "INTEGER", "NOT NULL"),
+    MetaField("flush_lsn", "INTEGER", "NOT NULL"),
+});
+
+// Fields schema
+static const MetaSchema FIELDS_SCHEMA(META_FIELDS, {
+    MetaField("collection_id", "TEXT", "NOT NULL"),
+    MetaField("field_name", "TEXT", "NOT NULL"),
+    MetaField("field_type", "INTEGER", "NOT NULL"),
+    MetaField("field_params", "TEXT", "NOT NULL"),
+});
+
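// A minimal sketch (editorial illustration, not part of this patch) of how Initialize() below
// turns each MetaSchema into DDL via MetaSchema::name()/ToString(), assuming META_ENVIRONMENT
// resolves to the literal "Environment" (the actual constant lives in MetaConsts.h):
//
//     std::string ddl = "CREATE TABLE IF NOT EXISTS " + ENVIRONMENT_SCHEMA.name() + "(" +
//                       ENVIRONMENT_SCHEMA.ToString() + ");";
//     // -> CREATE TABLE IF NOT EXISTS Environment(global_lsn INTEGER DEFAULT (0) NOT NULL);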
+/////////////////////////////////////////////////////
+static AttrsMapList* QueryData = nullptr;
+
+static int
+QueryCallback(void* data, int argc, char** argv, char** azColName) {
+    AttrsMap raw;
+    for (int i = 0; i < argc; i++) {
+        // TODO: here check argv[i]. Refer to 'https://www.tutorialspoint.com/sqlite/sqlite_c_cpp.htm'
+        raw.insert(std::make_pair(azColName[i], argv[i]));
+    }
+
+    if (QueryData) {
+        QueryData->push_back(raw);
+    }
+
+    return 0;
+}
+
+}  // namespace
 
 SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options) : options_(options) {
     Initialize();
@@ -149,30 +189,92 @@ SqliteMetaImpl::NextFileId(std::string& file_id) {
 
 void
 SqliteMetaImpl::ValidateMetaSchema() {
-    bool is_null_connector{ConnectorPtr == nullptr};
+    bool is_null_connector{db_ == nullptr};
     fiu_do_on("SqliteMetaImpl.ValidateMetaSchema.NullConnection", is_null_connector = true);
     if (is_null_connector) {
        throw Exception(DB_ERROR, "Connector is null pointer");
    }
 
-    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
-    auto ret = ConnectorPtr->sync_schema_simulate();
-    if (ret.find(META_TABLES) != ret.end() &&
-        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
-        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
+//    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
+//    auto ret = ConnectorPtr->sync_schema_simulate();
+//    if (ret.find(META_TABLES) != ret.end() &&
+//        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
+//        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
+//    }
+//    if (ret.find(META_FIELDS) != ret.end() &&
+//        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_FIELDS]) {
+//        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
+//    }
+//    if (ret.find(META_TABLEFILES) != ret.end() &&
+//        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
+//        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
+//    }
+}
+
+Status
+SqliteMetaImpl::SqlQuery(const std::string& sql, AttrsMapList* res) {
+    try {
+        LOG_ENGINE_DEBUG_ << sql;
+
+        std::lock_guard<std::mutex> meta_lock(sqlite_mutex_);
+
+        int (*call_back)(void*, int, char**, char**) = nullptr;
+        if (res) {
+            QueryData = res;
+            call_back = QueryCallback;
+        }
+
+        auto rc = sqlite3_exec(db_, sql.c_str(), call_back, nullptr, nullptr);
+        QueryData = nullptr;
+        if (rc != SQLITE_OK) {
+            std::string err = ErrorMsg(db_);
+            LOG_ENGINE_ERROR_ << err;
+            return Status(DB_ERROR, err);
+        }
+    } catch (std::exception& e) {
+        LOG_ENGINE_ERROR_ << e.what();
+        return Status(DB_ERROR, e.what());
     }
-    if (ret.find(META_FIELDS) != ret.end() &&
-        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_FIELDS]) {
-        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
-    }
-    if (ret.find(META_TABLEFILES) != ret.end() &&
-        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
-        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
+
+    return Status::OK();
+}
+
+Status
+SqliteMetaImpl::SqlTransaction(const std::vector<std::string>& sql_statements) {
+    try {
+        std::lock_guard<std::mutex> meta_lock(sqlite_mutex_);
+
+        if (SQLITE_OK != sqlite3_exec(db_, "BEGIN", nullptr, nullptr, nullptr)) {
+            std::string sql_err = "Sqlite begin transaction failed: " + ErrorMsg(db_);
+            return Status(DB_META_TRANSACTION_FAILED, sql_err);
+        }
+
+        int rc = SQLITE_OK;
+        for (auto& sql : sql_statements) {
+            LOG_ENGINE_DEBUG_ << sql;
+
+            rc = sqlite3_exec(db_, sql.c_str(), nullptr, nullptr, nullptr);
+            if (rc != SQLITE_OK) {
+                break;
+            }
+        }
+
+        if (SQLITE_OK != rc || SQLITE_OK != sqlite3_exec(db_, "COMMIT", nullptr, nullptr, nullptr)) {
+            std::string err = ErrorMsg(db_);
+            LOG_ENGINE_ERROR_ << err;
+            sqlite3_exec(db_, "ROLLBACK", nullptr, nullptr, nullptr);
+            return Status(DB_META_TRANSACTION_FAILED, err);
+        }
+    } catch (std::exception& e) {
+        return Status(DB_META_TRANSACTION_FAILED, e.what());
+    }
+
+    return Status::OK();
 }
 
 Status
 SqliteMetaImpl::Initialize() {
+    // create db path
     if (!boost::filesystem::is_directory(options_.path_)) {
         auto ret = boost::filesystem::create_directory(options_.path_);
         fiu_do_on("SqliteMetaImpl.Initialize.fail_create_directory", ret = false);
@@ -183,13 +285,41 @@ SqliteMetaImpl::Initialize() {
         }
     }
 
-    ConnectorPtr = std::make_unique<ConnectorT>(StoragePrototype(options_.path_ + "/meta.sqlite"));
+    // open/create sqlite db
+    std::string meta_path = options_.path_ + "/meta.sqlite";
+    if (sqlite3_open(meta_path.c_str(), &db_) != SQLITE_OK) {
+        std::string err = "Cannot open Sqlite database: " + ErrorMsg(db_);
+        std::cerr << err << std::endl;
+        throw std::runtime_error(err);
+    }
 
+    // set sqlite wal mode
+    sqlite3_extended_result_codes(db_, 1);  // allow extended error codes
+    if (SQLITE_OK != sqlite3_exec(db_, "pragma journal_mode = WAL", nullptr, nullptr, nullptr)) {
+        std::string errs = "Sqlite configure journal mode to WAL failed: " + ErrorMsg(db_);
+        std::cerr << errs << std::endl;
+        throw std::runtime_error(errs);
+    }
+
+    // validate exist schema
     ValidateMetaSchema();
 
-    ConnectorPtr->sync_schema();
-    ConnectorPtr->open_forever();                          // thread safe option
-    ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log
+    // create meta tables
+    auto create_schema = [&](const MetaSchema& schema) {
+        std::string create_table_str = "CREATE TABLE IF NOT EXISTS " + schema.name() + "(" + schema.ToString() + ");";
+        std::vector<std::string> statements = {create_table_str};
+        auto status = SqlTransaction(statements);
+        if (!status.ok()) {
+            std::string err = "Cannot create Sqlite table: ";
+            err += status.message();
+            throw std::runtime_error(err);
+        }
+    };
+
+    create_schema(ENVIRONMENT_SCHEMA);
+    create_schema(TABLES_SCHEMA);
+    create_schema(TABLEFILES_SCHEMA);
+    create_schema(FIELDS_SCHEMA);
 
     CleanUpShadowFiles();
 
@@ -199,21 +329,20 @@ SqliteMetaImpl::Initialize() {
 
 Status
 SqliteMetaImpl::CreateCollection(CollectionSchema& collection_schema) {
     USING_SQLITE_WARNING
+
     try {
         server::MetricCollector metric;
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-
         if (collection_schema.collection_id_ == "") {
             NextCollectionId(collection_schema.collection_id_);
         } else {
             fiu_do_on("SqliteMetaImpl.CreateCollection.throw_exception", throw std::exception());
-            auto collection =
-                ConnectorPtr->select(columns(&CollectionSchema::state_),
-                                     where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_));
-            if (collection.size() == 1) {
-                if (CollectionSchema::TO_DELETE == std::get<0>(collection[0])) {
+            std::string statement = "SELECT state FROM " + std::string(META_TABLES) + " WHERE table_id = " +
+                                    Quote(collection_schema.collection_id_) + ";";
+            AttrsMapList res;
+            auto status = SqlQuery(statement, &res);
+            if (res.size() == 1) {
+                if (res[0]["state"] == std::to_string(CollectionSchema::TO_DELETE)) {
                     return Status(DB_ERROR,
                                   "Collection already exists and it is in delete state, please wait a second");
                 } else {
@@ -226,16 +355,39 @@ SqliteMetaImpl::CreateCollection(CollectionSchema& collection_schema) {
         collection_schema.id_ = -1;
         collection_schema.created_on_ = utils::GetMicroSecTimeStamp();
 
-        try {
-            fiu_do_on("SqliteMetaImpl.CreateCollection.insert_throw_exception", throw std::exception());
-            auto id = ConnectorPtr->insert(collection_schema);
-            collection_schema.id_ = id;
-        } catch (std::exception& e) {
-            return HandleException("Encounter exception when create collection", e.what());
+        std::string id = "NULL";  // auto-increment
+        std::string& collection_id = collection_schema.collection_id_;
+        std::string state = std::to_string(collection_schema.state_);
+        std::string dimension = std::to_string(collection_schema.dimension_);
+        std::string created_on = std::to_string(collection_schema.created_on_);
+        std::string flag = std::to_string(collection_schema.flag_);
+        std::string index_file_size = std::to_string(collection_schema.index_file_size_);
+        std::string engine_type = std::to_string(collection_schema.engine_type_);
+        std::string& index_params = collection_schema.index_params_;
+        std::string metric_type = std::to_string(collection_schema.metric_type_);
+        std::string& owner_collection = collection_schema.owner_collection_;
+        std::string& partition_tag = collection_schema.partition_tag_;
+        std::string& version = collection_schema.version_;
+        std::string flush_lsn = std::to_string(collection_schema.flush_lsn_);
+
+        std::string statement = "INSERT INTO " + std::string(META_TABLES) + " VALUES(" + id + ", " +
+                                Quote(collection_id) + ", " + state + ", " + dimension + ", " + created_on + ", " +
+                                flag + ", " + index_file_size + ", " + engine_type + ", " + Quote(index_params) +
+                                ", " + metric_type + ", " + Quote(owner_collection) + ", " + Quote(partition_tag) +
+                                ", " + Quote(version) + ", " + flush_lsn + ");";
+
+        LOG_ENGINE_DEBUG_ << statement;
+
+        fiu_do_on("SqliteMetaImpl.CreateCollection.insert_throw_exception", throw std::exception());
+        auto status = SqlTransaction({statement});
+        if (status.ok()) {
+            collection_schema.id_ = sqlite3_last_insert_rowid(db_);
+        } else {
+            return HandleException("Failed to create collection", status.message().c_str());
         }
 
         LOG_ENGINE_DEBUG_ << "Successfully create collection: " << collection_schema.collection_id_;
-
         return utils::CreateCollectionPath(options_, collection_schema.collection_id_);
     } catch (std::exception& e) {
         return HandleException("Encounter exception when create collection", e.what());
@@ -245,34 +397,31 @@
 
 Status
 SqliteMetaImpl::DescribeCollection(CollectionSchema& collection_schema) {
     try {
-        server::MetricCollector metric;
-
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
         fiu_do_on("SqliteMetaImpl.DescribeCollection.throw_exception", throw std::exception());
-        auto groups = ConnectorPtr->select(
-            columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_,
-                    &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_,
-                    &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_,
-                    &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_,
-                    &CollectionSchema::version_, &CollectionSchema::flush_lsn_),
-            where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_ and
-                  c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+        server::MetricCollector metric;
+        std::string statement = "SELECT id, state, dimension, created_on, flag, index_file_size, engine_type,"
+                                " index_params, metric_type, owner_table, partition_tag, version, flush_lsn FROM " +
+                                std::string(META_TABLES) + " WHERE table_id = " +
+                                Quote(collection_schema.collection_id_) + " AND state <> " +
+                                std::to_string(CollectionSchema::TO_DELETE) + ";";
 
-        if (groups.size() == 1) {
-            collection_schema.id_ = std::get<0>(groups[0]);
-            collection_schema.state_ = std::get<1>(groups[0]);
-            collection_schema.dimension_ = std::get<2>(groups[0]);
-            collection_schema.created_on_ = std::get<3>(groups[0]);
-            collection_schema.flag_ = std::get<4>(groups[0]);
-            collection_schema.index_file_size_ = std::get<5>(groups[0]);
-            collection_schema.engine_type_ = std::get<6>(groups[0]);
-            collection_schema.index_params_ = std::get<7>(groups[0]);
-            collection_schema.metric_type_ = std::get<8>(groups[0]);
-            collection_schema.owner_collection_ = std::get<9>(groups[0]);
-            collection_schema.partition_tag_ = std::get<10>(groups[0]);
-            collection_schema.version_ = std::get<11>(groups[0]);
-            collection_schema.flush_lsn_ = std::get<12>(groups[0]);
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
+        if (res.size() == 1) {
+            auto& resRow = res[0];
+            collection_schema.id_ = std::stoul(resRow["id"]);
+            collection_schema.state_ = std::stoi(resRow["state"]);
+            collection_schema.dimension_ = std::stoi(resRow["dimension"]);
+            collection_schema.created_on_ = std::stol(resRow["created_on"]);
+            collection_schema.flag_ = std::stol(resRow["flag"]);
+            collection_schema.index_file_size_ = std::stol(resRow["index_file_size"]);
+            collection_schema.engine_type_ = std::stol(resRow["engine_type"]);
+            collection_schema.index_params_ = resRow["index_params"];
+            collection_schema.metric_type_ = std::stol(resRow["metric_type"]);
+            collection_schema.owner_collection_ = resRow["owner_table"];
+            collection_schema.partition_tag_ = resRow["partition_tag"];
+            collection_schema.version_ = resRow["version"];
+            collection_schema.flush_lsn_ = std::stoul(resRow["flush_lsn"]);
         } else {
             return Status(DB_NOT_FOUND, "Collection " + collection_schema.collection_id_ + " not found");
         }
@@ -291,27 +440,24 @@ SqliteMetaImpl::HasCollection(const std::string& collection_id, bool& has_or_not
         fiu_do_on("SqliteMetaImpl.HasCollection.throw_exception", throw std::exception());
         server::MetricCollector metric;
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-
-        auto select_columns = columns(&CollectionSchema::id_, &CollectionSchema::owner_collection_);
-        decltype(ConnectorPtr->select(select_columns)) selected;
+        // since table_id is a unique column, we just need to check whether it exists or not
+        std::string statement;
         if (is_root) {
-            selected = ConnectorPtr->select(select_columns,
-                                            where(c(&CollectionSchema::collection_id_) == collection_id and
-                                                  c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE and
-                                                  c(&CollectionSchema::owner_collection_) == ""));
+            statement = "SELECT id FROM " + std::string(META_TABLES) + " WHERE table_id = " + Quote(collection_id) +
+                        " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) +
+                        " AND owner_table = ''" + ";";
         } else {
-            selected = ConnectorPtr->select(select_columns,
-                                            where(c(&CollectionSchema::collection_id_) == collection_id and
-                                                  c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+            statement = "SELECT id FROM " + std::string(META_TABLES) + " WHERE table_id = " + Quote(collection_id) +
+                        " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";";
         }
 
-        if (selected.size() == 1) {
-            has_or_not = true;
-        } else {
-            has_or_not = false;
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
+        if (!status.ok()) {
+            return status;
         }
+
+        has_or_not = (res.size() > 0);
     } catch (std::exception& e) {
         return HandleException("Encounter exception when lookup collection", e.what());
     }
@@ -324,43 +470,37 @@ SqliteMetaImpl::AllCollections(std::vector<CollectionSchema>& collection_schema_
     try {
         fiu_do_on("SqliteMetaImpl.AllCollections.throw_exception", throw std::exception());
         server::MetricCollector metric;
-
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-        auto select_columns =
-            columns(&CollectionSchema::id_, &CollectionSchema::collection_id_, &CollectionSchema::dimension_,
-                    &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_,
-                    &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_,
-                    &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_,
-                    &CollectionSchema::version_, &CollectionSchema::flush_lsn_);
-        decltype(ConnectorPtr->select(select_columns)) selected;
-
+        std::string statement = "SELECT id, table_id, dimension, engine_type, index_params, index_file_size,"
+                                " metric_type, owner_table, partition_tag, version, flush_lsn FROM " +
+                                std::string(META_TABLES) + " WHERE state <> " +
+                                std::to_string(CollectionSchema::TO_DELETE);
         if (is_root) {
-            selected = ConnectorPtr->select(select_columns,
-                                            where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE and
-                                                  c(&CollectionSchema::owner_collection_) == ""));
+            statement += " AND owner_table = '';";
         } else {
-            selected = ConnectorPtr->select(select_columns,
-                                            where(c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+            statement += ";";
         }
 
-        for (auto& collection : selected) {
-            CollectionSchema schema;
-            schema.id_ = std::get<0>(collection);
-            schema.collection_id_ = std::get<1>(collection);
-            schema.dimension_ = std::get<2>(collection);
-            schema.created_on_ = std::get<3>(collection);
-            schema.flag_ = std::get<4>(collection);
-            schema.index_file_size_ = std::get<5>(collection);
-            schema.engine_type_ = std::get<6>(collection);
-            schema.index_params_ = std::get<7>(collection);
-            schema.metric_type_ = std::get<8>(collection);
-            schema.owner_collection_ = std::get<9>(collection);
-            schema.partition_tag_ = std::get<10>(collection);
-            schema.version_ = std::get<11>(collection);
-            schema.flush_lsn_ = std::get<12>(collection);
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
+        if (!status.ok()) {
+            return status;
+        }
 
-            collection_schema_array.emplace_back(schema);
+        for (auto& resRow : res) {
+            CollectionSchema collection_schema;
+            collection_schema.id_ = std::stoul(resRow["id"]);
+            collection_schema.collection_id_ = resRow["table_id"];
+            collection_schema.dimension_ = std::stoi(resRow["dimension"]);
+            collection_schema.index_file_size_ = std::stol(resRow["index_file_size"]);
+            collection_schema.engine_type_ = std::stoi(resRow["engine_type"]);
+            collection_schema.index_params_ = resRow["index_params"];
+            collection_schema.metric_type_ = std::stoi(resRow["metric_type"]);
+            collection_schema.owner_collection_ = resRow["owner_table"];
+            collection_schema.partition_tag_ = resRow["partition_tag"];
+            collection_schema.version_ = resRow["version"];
+            collection_schema.flush_lsn_ = std::stoul(resRow["flush_lsn"]);
+
+            collection_schema_array.emplace_back(collection_schema);
         }
     } catch (std::exception& e) {
         return HandleException("Encounter exception when lookup all collections", e.what());
@@ -372,23 +512,35 @@
 
 Status
 SqliteMetaImpl::DropCollections(const std::vector<std::string>& collection_id_array) {
     try {
+        server::MetricCollector metric;
         fiu_do_on("SqliteMetaImpl.DropCollection.throw_exception", throw std::exception());
 
         // distribute id array to batches
         std::vector<std::vector<std::string>> id_groups;
         DistributeBatch(collection_id_array, id_groups);
 
-        server::MetricCollector metric;
+        // soft delete collections
+        std::vector<std::string> statements;
+        for (auto group : id_groups) {
+            std::string statement = "UPDATE " + std::string(META_TABLES) + " SET state = " +
+                                    std::to_string(CollectionSchema::TO_DELETE) + " WHERE table_id in(";
+            for (size_t i = 0; i < group.size(); i++) {
+                statement += Quote(group[i]);
+                if (i != group.size() - 1) {
+                    statement += ",";
+                }
+            }
+            statement += ");";
+            statements.emplace_back(statement);
+        }
 
         {
-            // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-            std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+            // hold this lock to keep UpdateCollectionFiles atomic
+            std::lock_guard<std::mutex> meta_lock(operation_mutex_);
 
-            for (auto group : id_groups) {
-                // soft delete collection
-                ConnectorPtr->update_all(set(c(&CollectionSchema::state_) = (int)CollectionSchema::TO_DELETE),
-                                         where(in(&CollectionSchema::collection_id_, group) and
-                                               c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+            auto status = SqlTransaction(statements);
+            if (!status.ok()) {
+                return HandleException("Failed to drop collections", status.message().c_str());
             }
         }
 
@@ -398,30 +550,40 @@
     } catch (std::exception& e) {
         return HandleException("Encounter exception when delete collection", e.what());
     }
-
-    return Status::OK();
 }
 
 Status
 SqliteMetaImpl::DeleteCollectionFiles(const std::vector<std::string>& collection_id_array) {
     try {
+        server::MetricCollector metric;
        fiu_do_on("SqliteMetaImpl.DeleteCollectionFiles.throw_exception", throw std::exception());
 
         // distribute id array to batches
         std::vector<std::vector<std::string>> id_groups;
         DistributeBatch(collection_id_array, id_groups);
 
-        server::MetricCollector metric;
-
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        // hold this lock to keep UpdateCollectionFiles atomic
+        std::lock_guard<std::mutex> meta_lock(operation_mutex_);
 
+        // soft delete collection files
+        std::vector<std::string> statements;
         for (auto group : id_groups) {
-            // soft delete collection files
-            ConnectorPtr->update_all(set(c(&SegmentSchema::file_type_) = (int)SegmentSchema::TO_DELETE,
-                                         c(&SegmentSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
-                                     where(in(&SegmentSchema::collection_id_, group) and
-                                           c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
+            std::string statement = "UPDATE " + std::string(META_TABLEFILES) + " SET file_type = " +
+                                    std::to_string(SegmentSchema::TO_DELETE) + " ,updated_time = " +
+                                    std::to_string(utils::GetMicroSecTimeStamp()) + " WHERE table_id in (";
+            for (size_t i = 0; i < group.size(); i++) {
+                statement += Quote(group[i]);
+                if (i != group.size() - 1) {
+                    statement += ",";
+                }
+            }
+            statement += (") AND file_type <> " + std::to_string(SegmentSchema::TO_DELETE) + ";");
+            statements.emplace_back(statement);
+        }
+
+        auto status = SqlTransaction(statements);
+        if (!status.ok()) {
+            return HandleException("Failed to drop collection files", status.message().c_str());
         }
 
         LOG_ENGINE_DEBUG_ << "Successfully delete collection files";
@@ -463,19 +625,39 @@ SqliteMetaImpl::CreateCollectionFile(SegmentSchema& file_schema) {
         file_schema.engine_type_ = collection_schema.engine_type_;
         file_schema.metric_type_ = collection_schema.metric_type_;
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        std::string id = "NULL";  // auto-increment
+        std::string collection_id = file_schema.collection_id_;
+        std::string segment_id = file_schema.segment_id_;
+        std::string engine_type = std::to_string(file_schema.engine_type_);
+        std::string file_id = file_schema.file_id_;
+        std::string file_type = std::to_string(file_schema.file_type_);
+        std::string file_size = std::to_string(file_schema.file_size_);
+        std::string row_count = std::to_string(file_schema.row_count_);
+        std::string updated_time = std::to_string(file_schema.updated_time_);
+        std::string created_on = std::to_string(file_schema.created_on_);
+        std::string date = std::to_string(file_schema.date_);
+        std::string flush_lsn = std::to_string(file_schema.flush_lsn_);
 
-        auto id = ConnectorPtr->insert(file_schema);
-        file_schema.id_ = id;
+        std::string statement = "INSERT INTO " + std::string(META_TABLEFILES) + " VALUES(" + id + ", " +
+                                Quote(collection_id) + ", " + Quote(segment_id) + ", " + engine_type + ", " +
+                                Quote(file_id) + ", " + file_type + ", " + file_size + ", " + row_count +
+                                ", " + updated_time + ", " + created_on + ", " + date + ", " + flush_lsn + ");";
+
+        // hold this lock to keep UpdateCollectionFiles atomic
+        std::lock_guard<std::mutex> meta_lock(operation_mutex_);
+
+        auto status = SqlTransaction({statement});
+        if (!status.ok()) {
+            return HandleException("Failed to create collection file", status.message().c_str());
+        }
+
+        file_schema.id_ = sqlite3_last_insert_rowid(db_);
 
         LOG_ENGINE_DEBUG_ << "Successfully create collection file, file id = " << file_schema.file_id_;
         return utils::CreateCollectionFilePath(options_, file_schema);
     } catch (std::exception& e) {
         return HandleException("Encounter exception when create collection file", e.what());
     }
-
-    return Status::OK();
 }
 
 Status
@@ -484,108 +666,118 @@ SqliteMetaImpl::GetCollectionFiles(const std::string& collection_id, const std::
     try {
         fiu_do_on("SqliteMetaImpl.GetCollectionFiles.throw_exception", throw std::exception());
 
-        CollectionSchema collection_schema;
-        collection_schema.collection_id_ = collection_id;
-        auto status = DescribeCollection(collection_schema);
+        std::stringstream idSS;
+        for (auto& id : ids) {
+            idSS << "id = " << std::to_string(id) << " OR ";
+        }
+        std::string idStr = idSS.str();
+        idStr = idStr.substr(0, idStr.size() - 4);  // remove the last " OR "
+
+        std::string statement = "SELECT id, segment_id, engine_type, file_id, file_type, file_size,"
+                                " row_count, date, created_on FROM " + std::string(META_TABLEFILES) +
+                                " WHERE table_id = " + Quote(collection_id) + " AND (" + idStr + ")" +
+                                " AND file_type <> " + std::to_string(SegmentSchema::TO_DELETE) + ";";
+
+        // hold this lock to keep UpdateCollectionFiles atomic
+        std::lock_guard<std::mutex> meta_lock(operation_mutex_);
+
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
         if (!status.ok()) {
             return status;
         }
 
-        auto select_columns =
-            columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
-                    &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
-                    &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_);
-        decltype(ConnectorPtr->select(select_columns)) selected;
-        {
-            // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-            std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-            selected = ConnectorPtr->select(
-                select_columns,
-                where(c(&SegmentSchema::collection_id_) == collection_id and in(&SegmentSchema::id_, ids) and
-                      c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
+        CollectionSchema collection_schema;
+        collection_schema.collection_id_ = collection_id;
+        status = DescribeCollection(collection_schema);
+        if (!status.ok()) {
+            return status;
         }
 
-        Status result;
-        for (auto& file : selected) {
+        for (auto& resRow : res) {
             SegmentSchema file_schema;
+            file_schema.id_ = std::stoul(resRow["id"]);
             file_schema.collection_id_ = collection_id;
-            file_schema.id_ = std::get<0>(file);
-            file_schema.segment_id_ = std::get<1>(file);
-            file_schema.file_id_ = std::get<2>(file);
-            file_schema.file_type_ = std::get<3>(file);
-            file_schema.file_size_ = std::get<4>(file);
-            file_schema.row_count_ = std::get<5>(file);
-            file_schema.date_ = std::get<6>(file);
-            file_schema.engine_type_ = std::get<7>(file);
-            file_schema.created_on_ = std::get<8>(file);
-            file_schema.dimension_ = collection_schema.dimension_;
+            file_schema.segment_id_ = resRow["segment_id"];
             file_schema.index_file_size_ = collection_schema.index_file_size_;
+            file_schema.engine_type_ = std::stoi(resRow["engine_type"]);
             file_schema.index_params_ = collection_schema.index_params_;
             file_schema.metric_type_ = collection_schema.metric_type_;
+            file_schema.file_id_ = resRow["file_id"];
+            file_schema.file_type_ = std::stoi(resRow["file_type"]);
+            file_schema.file_size_ = std::stoul(resRow["file_size"]);
+            file_schema.row_count_ = std::stoul(resRow["row_count"]);
+            file_schema.date_ = std::stoi(resRow["date"]);
+            file_schema.created_on_ = std::stol(resRow["created_on"]);
+            file_schema.dimension_ = collection_schema.dimension_;
 
             utils::GetCollectionFilePath(options_, file_schema);
-
             files_holder.MarkFile(file_schema);
         }
 
-        LOG_ENGINE_DEBUG_ << "Get " << selected.size() << " files by id from collection " << collection_id;
-        return result;
+        LOG_ENGINE_DEBUG_ << "Get " << res.size() << " files by id from collection " << collection_id;
     } catch (std::exception& e) {
         return HandleException("Encounter exception when lookup collection files", e.what());
     }
+
+    return Status::OK();
 }
 
 Status
 SqliteMetaImpl::GetCollectionFilesBySegmentId(const std::string& segment_id, FilesHolder& files_holder) {
     try {
-        auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
-                                      &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
-                                      &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_,
-                                      &SegmentSchema::created_on_);
-        decltype(ConnectorPtr->select(select_columns)) selected;
+        std::string statement = "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size,"
+                                " row_count, date, created_on FROM " + std::string(META_TABLEFILES) +
+                                " WHERE segment_id = " + Quote(segment_id) + " AND file_type <> " +
+                                std::to_string(SegmentSchema::TO_DELETE) + ";";
+
+        AttrsMapList res;
         {
-            // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-            std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-            selected = ConnectorPtr->select(select_columns,
-                                            where(c(&SegmentSchema::segment_id_) == segment_id and
-                                                  c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE));
+            // hold this lock to keep UpdateCollectionFiles atomic
+            std::lock_guard<std::mutex> meta_lock(operation_mutex_);
+
+            auto status = SqlQuery(statement, &res);
+            if (!status.ok()) {
+                return status;
+            }
         }
 
-        if (!selected.empty()) {
+        if (!res.empty()) {
             CollectionSchema collection_schema;
-            collection_schema.collection_id_ = std::get<1>(selected[0]);
+            collection_schema.collection_id_ = res[0]["table_id"];
             auto status = DescribeCollection(collection_schema);
             if (!status.ok()) {
                 return status;
             }
 
-            for (auto& file : selected) {
+            for (auto& resRow : res) {
                 SegmentSchema file_schema;
+                file_schema.id_ = std::stoul(resRow["id"]);
                 file_schema.collection_id_ = collection_schema.collection_id_;
-                file_schema.id_ = std::get<0>(file);
-                file_schema.segment_id_ = std::get<2>(file);
-                file_schema.file_id_ = std::get<3>(file);
-                file_schema.file_type_ = std::get<4>(file);
-                file_schema.file_size_ = std::get<5>(file);
-                file_schema.row_count_ = std::get<6>(file);
-                file_schema.date_ = std::get<7>(file);
-                file_schema.engine_type_ = std::get<8>(file);
-                file_schema.created_on_ = std::get<9>(file);
-                file_schema.dimension_ = collection_schema.dimension_;
+                file_schema.segment_id_ = resRow["segment_id"];
                 file_schema.index_file_size_ = collection_schema.index_file_size_;
+                file_schema.engine_type_ = std::stoi(resRow["engine_type"]);
                 file_schema.index_params_ = collection_schema.index_params_;
                 file_schema.metric_type_ = collection_schema.metric_type_;
+                file_schema.file_id_ = resRow["file_id"];
+                file_schema.file_type_ = std::stoi(resRow["file_type"]);
+                file_schema.file_size_ = std::stoul(resRow["file_size"]);
+                file_schema.row_count_ = std::stoul(resRow["row_count"]);
+                file_schema.date_ = std::stoi(resRow["date"]);
+                file_schema.created_on_ = std::stol(resRow["created_on"]);
+                file_schema.dimension_ = collection_schema.dimension_;
 
                 utils::GetCollectionFilePath(options_, file_schema);
                 files_holder.MarkFile(file_schema);
             }
         }
 
-        LOG_ENGINE_DEBUG_ << "Get " << selected.size() << " files by segment id" << segment_id;
-        return Status::OK();
+        LOG_ENGINE_DEBUG_ << "Get " << res.size() << " files by segment id " << segment_id;
     } catch (std::exception& e) {
         return HandleException("Encounter exception when lookup collection files by segment id", e.what());
     }
+
+    return Status::OK();
 }
 
 Status
@@ -594,12 +786,14 @@ SqliteMetaImpl::UpdateCollectionFlag(const std::string& collection_id, int64_t f
         server::MetricCollector metric;
         fiu_do_on("SqliteMetaImpl.UpdateCollectionFlag.throw_exception", throw std::exception());
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        std::string statement = "UPDATE " + std::string(META_TABLES) + " SET flag = " + std::to_string(flag) +
+                                " WHERE table_id = " + Quote(collection_id) + ";";
+
+        auto status = SqlTransaction({statement});
+        if (!status.ok()) {
+            return HandleException("Failed to update collection flag", status.message().c_str());
+        }
 
-        // set all backup file to raw
-        ConnectorPtr->update_all(set(c(&CollectionSchema::flag_) = flag),
-                                 where(c(&CollectionSchema::collection_id_) == collection_id));
         LOG_ENGINE_DEBUG_ << "Successfully update collection flag, collection id = " << collection_id;
     } catch (std::exception& e) {
        std::string msg = "Encounter exception when update collection flag: collection_id = " + collection_id;
@@ -614,11 +808,14 @@ SqliteMetaImpl::UpdateCollectionFlushLSN(const std::string& collection_id, uint6
     try {
         server::MetricCollector metric;
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        std::string statement = "UPDATE " + std::string(META_TABLES) + " SET flush_lsn = " +
+                                std::to_string(flush_lsn) + " WHERE table_id = " + Quote(collection_id) + ";";
+
+        auto status = SqlTransaction({statement});
+        if (!status.ok()) {
+            return HandleException("Failed to update collection flush_lsn", status.message().c_str());
+        }
 
-        ConnectorPtr->update_all(set(c(&CollectionSchema::flush_lsn_) = flush_lsn),
-                                 where(c(&CollectionSchema::collection_id_) == collection_id));
         LOG_ENGINE_DEBUG_ << "Successfully update collection flush_lsn, collection id = " << collection_id
                           << " flush_lsn = " << flush_lsn;
     } catch (std::exception& e) {
@@ -634,14 +831,17 @@ SqliteMetaImpl::GetCollectionFlushLSN(const std::string& collection_id, uint64_t
     try {
         server::MetricCollector metric;
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        std::string statement = "SELECT flush_lsn FROM " + std::string(META_TABLES) + " WHERE table_id = " +
+                                Quote(collection_id) + ";";
 
-        auto selected = ConnectorPtr->select(columns(&CollectionSchema::flush_lsn_),
-                                             where(c(&CollectionSchema::collection_id_) == collection_id));
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
+        if (!status.ok()) {
+            return status;
+        }
 
-        if (selected.size() > 0) {
-            flush_lsn = std::get<0>(selected[0]);
+        if (res.size() > 0) {
+            flush_lsn = std::stoul(res[0]["flush_lsn"]);
         } else {
             return Status(DB_NOT_FOUND, "Collection " + collection_id + " not found");
         }
@@ -660,20 +860,45 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
         server::MetricCollector metric;
         fiu_do_on("SqliteMetaImpl.UpdateCollectionFile.throw_exception", throw std::exception());
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        std::string statement = "SELECT state FROM " + std::string(META_TABLES) + " WHERE table_id = " +
+                                Quote(file_schema.collection_id_) + ";";
 
-        auto collections =
-            ConnectorPtr->select(columns(&CollectionSchema::state_),
-                                 where(c(&CollectionSchema::collection_id_) == file_schema.collection_id_));
+        // hold this lock to keep UpdateCollectionFiles atomic
+        std::lock_guard<std::mutex> meta_lock(operation_mutex_);
+
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
+        if (!status.ok()) {
+            return status;
+        }
 
         // if the collection has been deleted, just mark the collection file as TO_DELETE
         // clean thread will delete the file later
-        if (collections.size() < 1 || std::get<0>(collections[0]) == (int)CollectionSchema::TO_DELETE) {
+        if (res.size() < 1 || res[0]["state"] == std::to_string(CollectionSchema::TO_DELETE)) {
            file_schema.file_type_ = SegmentSchema::TO_DELETE;
        }
 
-        ConnectorPtr->update(file_schema);
+        std::string id = std::to_string(file_schema.id_);
+        std::string collection_id = file_schema.collection_id_;
+        std::string engine_type = std::to_string(file_schema.engine_type_);
+        std::string file_id = file_schema.file_id_;
+        std::string file_type = std::to_string(file_schema.file_type_);
+        std::string file_size = std::to_string(file_schema.file_size_);
+        std::string row_count = std::to_string(file_schema.row_count_);
+        std::string updated_time = std::to_string(file_schema.updated_time_);
+        std::string created_on = std::to_string(file_schema.created_on_);
+        std::string date = std::to_string(file_schema.date_);
+
+        statement = "UPDATE " + std::string(META_TABLEFILES) + " SET table_id = " + Quote(collection_id) +
+                    " ,engine_type = " + engine_type + " ,file_id = " + Quote(file_id) +
+                    " ,file_type = " + file_type + " ,file_size = " + file_size + " ,row_count = " + row_count +
+                    " ,updated_time = " + updated_time + " ,created_on = " + created_on + " ,date = " + date +
+                    " WHERE id = " + id + ";";
+
+        status = SqlTransaction({statement});
+        if (!status.ok()) {
+            return HandleException("Failed to update collection file", status.message().c_str());
+        }
 
         LOG_ENGINE_DEBUG_ << "Update single collection file, file id = " << file_schema.file_id_;
     } catch (std::exception& e) {
@@ -681,6 +906,7 @@ SqliteMetaImpl::UpdateCollectionFile(SegmentSchema& file_schema) {
                           " file_id = " + file_schema.file_id_;
         return HandleException(msg, e.what());
     }
+
     return Status::OK();
 }
 
@@ -690,46 +916,71 @@ SqliteMetaImpl::UpdateCollectionFiles(SegmentsSchema& files) {
         server::MetricCollector metric;
         fiu_do_on("SqliteMetaImpl.UpdateCollectionFiles.throw_exception", throw std::exception());
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        // to ensure UpdateCollectionFiles is an atomic operation
+        std::lock_guard<std::mutex> meta_lock(operation_mutex_);
 
         std::map<std::string, bool> has_collections;
         for (auto& file : files) {
             if (has_collections.find(file.collection_id_) != has_collections.end()) {
                 continue;
             }
-            auto collections =
-                ConnectorPtr->select(columns(&CollectionSchema::id_),
-                                     where(c(&CollectionSchema::collection_id_) == file.collection_id_ and
-                                           c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
-            if (collections.size() >= 1) {
+
+            std::string statement = "SELECT id FROM " + std::string(META_TABLES) +
+                                    " WHERE table_id = " + Quote(file.collection_id_) + " AND state <> " +
+                                    std::to_string(CollectionSchema::TO_DELETE) + ";";
+
+            AttrsMapList res;
+            auto status = SqlQuery(statement, &res);
+            if (!status.ok()) {
+                return status;
+            }
+
+            if (res.size() >= 1) {
                 has_collections[file.collection_id_] = true;
             } else {
                 has_collections[file.collection_id_] = false;
             }
         }
 
-        auto commited = ConnectorPtr->transaction([&]() mutable {
-            for (auto& file : files) {
-                if (!has_collections[file.collection_id_]) {
-                    file.file_type_ = SegmentSchema::TO_DELETE;
-                }
-
-                file.updated_time_ = utils::GetMicroSecTimeStamp();
-                ConnectorPtr->update(file);
+        std::vector<std::string> statements;
+        for (auto& file : files) {
+            if (!has_collections[file.collection_id_]) {
+                file.file_type_ = SegmentSchema::TO_DELETE;
             }
-            return true;
-        });
-        fiu_do_on("SqliteMetaImpl.UpdateCollectionFiles.fail_commited", commited = false);
-        if (!commited) {
-            return HandleException("UpdateCollectionFiles error: sqlite transaction failed");
+
+            file.updated_time_ = utils::GetMicroSecTimeStamp();
+
+            std::string id = std::to_string(file.id_);
+            std::string& collection_id = file.collection_id_;
+            std::string engine_type = std::to_string(file.engine_type_);
+            std::string& file_id = file.file_id_;
+            std::string file_type = std::to_string(file.file_type_);
+            std::string file_size = std::to_string(file.file_size_);
+            std::string row_count = std::to_string(file.row_count_);
+            std::string updated_time = std::to_string(file.updated_time_);
+            std::string created_on = std::to_string(file.created_on_);
+            std::string date = std::to_string(file.date_);
+
+            std::string statement = "UPDATE " + std::string(META_TABLEFILES) + " SET table_id = " +
+                                    Quote(collection_id) + " ,engine_type = " + engine_type + " ,file_id = " +
+                                    Quote(file_id) + " ,file_type = " + file_type + " ,file_size = " + file_size +
+                                    " ,row_count = " + row_count + " ,updated_time = " + updated_time +
+                                    " ,created_on = " + created_on + " ,date = " + date + " WHERE id = " + id + ";";
+            statements.emplace_back(statement);
+        }
+
+        auto status = SqlTransaction(statements);
+        fiu_do_on("SqliteMetaImpl.UpdateCollectionFiles.fail_commited", status = Status(DB_ERROR, ""));
+        if (!status.ok()) {
+            return HandleException("Failed to update collection files", status.message().c_str());
         }
 
         LOG_ENGINE_DEBUG_ << "Update " << files.size() << " collection files";
     } catch (std::exception& e) {
         return HandleException("Encounter exception when update collection files", e.what());
     }
+
     return Status::OK();
 }
 
@@ -738,18 +989,27 @@ SqliteMetaImpl::UpdateCollectionFilesRowCount(SegmentsSchema& files) {
     try {
         server::MetricCollector metric;
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-
         for (auto& file : files) {
-            ConnectorPtr->update_all(set(c(&SegmentSchema::row_count_) = file.row_count_,
-                                         c(&SegmentSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
-                                     where(c(&SegmentSchema::file_id_) == file.file_id_));
+            std::string row_count = std::to_string(file.row_count_);
+            std::string updated_time = std::to_string(utils::GetMicroSecTimeStamp());
+
+            std::string statement = "UPDATE " + std::string(META_TABLEFILES) + " SET row_count = " + row_count +
+                                    " , updated_time = " + updated_time + " WHERE file_id = " +
+                                    Quote(file.file_id_) + ";";
+
+            // hold this lock to keep UpdateCollectionFiles atomic
+            std::lock_guard<std::mutex> meta_lock(operation_mutex_);
+
+            auto status = SqlTransaction({statement});
+            if (!status.ok()) {
+                return HandleException("Failed to update collection file row count", status.message().c_str());
+            }
+
             LOG_ENGINE_DEBUG_ << "Update file " << file.file_id_ << " row count to " << file.row_count_;
         }
     } catch (std::exception& e) {
         return HandleException("Encounter exception when update collection files row count", e.what());
     }
+
     return Status::OK();
 }
 
@@ -759,46 +1019,38 @@ SqliteMetaImpl::UpdateCollectionIndex(const std::string& collection_id, const Co
         server::MetricCollector metric;
         fiu_do_on("SqliteMetaImpl.UpdateCollectionIndex.throw_exception", throw std::exception());
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        std::string statement = "SELECT id, state, dimension, created_on FROM " + std::string(META_TABLES) +
+                                " WHERE table_id = " + Quote(collection_id) + " AND state <> " +
+                                std::to_string(CollectionSchema::TO_DELETE) + ";";
 
-        auto collections = ConnectorPtr->select(
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
+        if (!status.ok()) {
+            return status;
+        }
 
-            columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_,
-                    &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_,
-                    &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_,
-                    &CollectionSchema::version_, &CollectionSchema::flush_lsn_),
-            where(c(&CollectionSchema::collection_id_) == collection_id and
-                  c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
+        if (res.size() == 1) {
+            auto& resRow = res[0];
 
-        if (collections.size() > 0) {
-            meta::CollectionSchema collection_schema;
-            collection_schema.id_ = std::get<0>(collections[0]);
-            collection_schema.collection_id_ = collection_id;
-            collection_schema.state_ = std::get<1>(collections[0]);
-            collection_schema.dimension_ = std::get<2>(collections[0]);
-            collection_schema.created_on_ = std::get<3>(collections[0]);
-            collection_schema.flag_ = std::get<4>(collections[0]);
-            collection_schema.index_file_size_ = std::get<5>(collections[0]);
-            collection_schema.owner_collection_ = std::get<6>(collections[0]);
-            collection_schema.partition_tag_ = std::get<7>(collections[0]);
-            collection_schema.version_ = std::get<8>(collections[0]);
-            collection_schema.flush_lsn_ = std::get<9>(collections[0]);
-            collection_schema.engine_type_ = index.engine_type_;
-            collection_schema.index_params_ = index.extra_params_.dump();
-            collection_schema.metric_type_ = index.metric_type_;
+            auto id = resRow["id"];
+            auto state = resRow["state"];
+            auto dimension = resRow["dimension"];
+            auto created_on = resRow["created_on"];
 
-            ConnectorPtr->update(collection_schema);
+            statement = "UPDATE " + std::string(META_TABLES) + " SET id = " + id + " ,state = " + state +
+                        " ,dimension = " + dimension + " ,created_on = " + created_on +
+                        " ,engine_type = " + std::to_string(index.engine_type_) + " ,index_params = " +
+                        Quote(index.extra_params_.dump()) + " ,metric_type = " + std::to_string(index.metric_type_) +
+                        " WHERE table_id = " + Quote(collection_id) + ";";
+
+            auto status = SqlTransaction({statement});
+            if (!status.ok()) {
+                return HandleException("Failed to update collection index", status.message().c_str());
+            }
         } else {
             return Status(DB_NOT_FOUND, "Collection " + collection_id + " not found");
         }
 
-        // set all backup file to raw
-        ConnectorPtr->update_all(set(c(&SegmentSchema::file_type_) = (int)SegmentSchema::RAW,
-                                     c(&SegmentSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
-                                 where(c(&SegmentSchema::collection_id_) == collection_id and
-                                       c(&SegmentSchema::file_type_) == (int)SegmentSchema::BACKUP));
-
         LOG_ENGINE_DEBUG_ << "Successfully update collection index, collection id = " << collection_id;
     } catch (std::exception& e) {
         std::string msg = "Encounter exception when update collection index: collection_id = " + collection_id;
@@ -814,13 +1066,18 @@ SqliteMetaImpl::UpdateCollectionFilesToIndex(const std::string& collection_id) {
         server::MetricCollector metric;
         fiu_do_on("SqliteMetaImpl.UpdateCollectionFilesToIndex.throw_exception", throw std::exception());
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        std::string statement = "UPDATE " + std::string(META_TABLEFILES) + " SET file_type = " +
+                                std::to_string(SegmentSchema::TO_INDEX) + " WHERE table_id = " +
+                                Quote(collection_id) + " AND row_count >= " +
+                                std::to_string(meta::BUILD_INDEX_THRESHOLD) + " AND file_type = " +
+                                std::to_string(SegmentSchema::RAW) + ";";
 
-        ConnectorPtr->update_all(set(c(&SegmentSchema::file_type_) = (int)SegmentSchema::TO_INDEX),
-                                 where(c(&SegmentSchema::collection_id_) == collection_id and
-                                       c(&SegmentSchema::row_count_) >= meta::BUILD_INDEX_THRESHOLD and
-                                       c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW));
+        // hold this lock to keep UpdateCollectionFiles atomic
+        std::lock_guard<std::mutex> meta_lock(operation_mutex_);
+
+        auto status = SqlTransaction({statement});
+        if (!status.ok()) {
+            return HandleException("Failed to update collection files to index", status.message().c_str());
HandleException("Failed to update collection files to index", status.message().c_str()); + } LOG_ENGINE_DEBUG_ << "Update files to to_index, collection id = " << collection_id; } catch (std::exception& e) { @@ -836,15 +1093,23 @@ SqliteMetaImpl::DescribeCollectionIndex(const std::string& collection_id, Collec server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.DescribeCollectionIndex.throw_exception", throw std::exception()); - auto groups = ConnectorPtr->select( - columns(&CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_), - where(c(&CollectionSchema::collection_id_) == collection_id and - c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + std::string statement = "SELECT engine_type, index_params, metric_type FROM " + + std::string(META_TABLES) + " WHERE table_id = " + Quote(collection_id) + + " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";"; - if (groups.size() == 1) { - index.engine_type_ = std::get<0>(groups[0]); - index.extra_params_ = milvus::json::parse(std::get<1>(groups[0])); - index.metric_type_ = std::get<2>(groups[0]); + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + + if (res.size() == 1) { + auto& resRow = res[0]; + + index.engine_type_ = std::stoi(resRow["engine_type"]); + std::string str_index_params = resRow["index_params"]; + index.extra_params_ = milvus::json::parse(str_index_params); + index.metric_type_ = std::stoi(resRow["metric_type"]); } else { return Status(DB_NOT_FOUND, "Collection " + collection_id + " not found"); } @@ -861,35 +1126,36 @@ SqliteMetaImpl::DropCollectionIndex(const std::string& collection_id) { server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.DropCollectionIndex.throw_exception", throw std::exception()); - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - + std::vector statements; // soft delete index files - ConnectorPtr->update_all(set(c(&SegmentSchema::file_type_) = (int)SegmentSchema::TO_DELETE, - c(&SegmentSchema::updated_time_) = utils::GetMicroSecTimeStamp()), - where(c(&SegmentSchema::collection_id_) == collection_id and - c(&SegmentSchema::file_type_) == (int)SegmentSchema::INDEX)); + std::string statement = "UPDATE " + std::string(META_TABLEFILES) + " SET file_type = " + + std::to_string(SegmentSchema::TO_DELETE) + " ,updated_time = " + + std::to_string(utils::GetMicroSecTimeStamp()) + " WHERE table_id = " + + Quote(collection_id) + " AND file_type = " + std::to_string(SegmentSchema::INDEX) + + ";"; + statements.emplace_back(statement); // set all backup file to raw - ConnectorPtr->update_all(set(c(&SegmentSchema::file_type_) = (int)SegmentSchema::RAW, - c(&SegmentSchema::updated_time_) = utils::GetMicroSecTimeStamp()), - where(c(&SegmentSchema::collection_id_) == collection_id and - c(&SegmentSchema::file_type_) == (int)SegmentSchema::BACKUP)); + statement = "UPDATE " + std::string(META_TABLEFILES) + " SET file_type = " + + std::to_string(SegmentSchema::RAW) + " ,updated_time = " + + std::to_string(utils::GetMicroSecTimeStamp()) + " WHERE table_id = " + + Quote(collection_id) + " AND file_type = " + std::to_string(SegmentSchema::BACKUP) + ";"; + statements.emplace_back(statement); // set collection index type to raw - auto groups = ConnectorPtr->select(columns(&CollectionSchema::metric_type_), - where(c(&CollectionSchema::collection_id_) == collection_id)); + statement = "UPDATE " + 
std::string(META_TABLES) + " SET engine_type = (CASE WHEN metric_type in (" + + std::to_string((int32_t)MetricType::HAMMING) + " ," + + std::to_string((int32_t)MetricType::JACCARD) + " ," + + std::to_string((int32_t)MetricType::TANIMOTO) + ")" + + " THEN " + std::to_string((int32_t)EngineType::FAISS_BIN_IDMAP) + + " ELSE " + std::to_string((int32_t)EngineType::FAISS_IDMAP) + " END)" + + " , index_params = '{}' WHERE table_id = " + Quote(collection_id) + ";"; + statements.emplace_back(statement); - int32_t raw_engine_type = DEFAULT_ENGINE_TYPE; - if (groups.size() == 1) { - int32_t metric_type_ = std::get<0>(groups[0]); - if (engine::utils::IsBinaryMetricType(metric_type_)) { - raw_engine_type = (int32_t)EngineType::FAISS_BIN_IDMAP; - } + auto status = SqlTransaction(statements); + if (!status.ok()) { + return HandleException("Failed to drop collection index", status.message().c_str()); } - ConnectorPtr->update_all( - set(c(&CollectionSchema::engine_type_) = raw_engine_type, c(&CollectionSchema::index_params_) = "{}"), - where(c(&CollectionSchema::collection_id_) == collection_id)); LOG_ENGINE_DEBUG_ << "Successfully drop collection index, collection id = " << collection_id; } catch (std::exception& e) { @@ -961,11 +1227,18 @@ SqliteMetaImpl::HasPartition(const std::string& collection_id, const std::string std::string valid_tag = tag; server::StringHelpFunctions::TrimStringBlank(valid_tag); - auto name = ConnectorPtr->select(columns(&CollectionSchema::collection_id_), - where(c(&CollectionSchema::owner_collection_) == collection_id and - c(&CollectionSchema::partition_tag_) == valid_tag and - c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); - if (name.size() > 0) { + std::string statement = "SELECT table_id FROM " + std::string(META_TABLES) + + " WHERE owner_table = " + Quote(collection_id) + + " AND partition_tag = " + Quote(valid_tag) + + " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";"; + + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + + if (res.size() > 0) { has_or_not = true; } else { has_or_not = false; @@ -989,31 +1262,35 @@ SqliteMetaImpl::ShowPartitions(const std::string& collection_id, server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.ShowPartitions.throw_exception", throw std::exception()); - auto partitions = ConnectorPtr->select( - columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_, - &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_, - &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_, - &CollectionSchema::partition_tag_, &CollectionSchema::version_, &CollectionSchema::collection_id_, - &CollectionSchema::flush_lsn_), - where(c(&CollectionSchema::owner_collection_) == collection_id and - c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); + std::string statement = "SELECT table_id, id, state, dimension, created_on, flag, index_file_size," + " engine_type, index_params, metric_type, partition_tag, version, flush_lsn FROM " + + std::string(META_TABLES) + " WHERE owner_table = " + + Quote(collection_id) + " AND state <> " + + std::to_string(CollectionSchema::TO_DELETE) + ";"; - for (size_t i = 0; i < partitions.size(); i++) { + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + + for (auto& resRow : res) { meta::CollectionSchema partition_schema; - partition_schema.id_ = 
std::get<0>(partitions[i]); - partition_schema.state_ = std::get<1>(partitions[i]); - partition_schema.dimension_ = std::get<2>(partitions[i]); - partition_schema.created_on_ = std::get<3>(partitions[i]); - partition_schema.flag_ = std::get<4>(partitions[i]); - partition_schema.index_file_size_ = std::get<5>(partitions[i]); - partition_schema.engine_type_ = std::get<6>(partitions[i]); - partition_schema.index_params_ = std::get<7>(partitions[i]); - partition_schema.metric_type_ = std::get<8>(partitions[i]); + partition_schema.collection_id_ = resRow["table_id"]; + partition_schema.id_ = std::stoul(resRow["id"]); + partition_schema.state_ = std::stoi(resRow["state"]); + partition_schema.dimension_ = std::stoi(resRow["dimension"]); + partition_schema.created_on_ = std::stol(resRow["created_on"]); + partition_schema.flag_ = std::stol(resRow["flag"]); + partition_schema.index_file_size_ = std::stol(resRow["index_file_size"]); + partition_schema.engine_type_ = std::stoi(resRow["engine_type"]); + partition_schema.index_params_ = resRow["index_params"]; + partition_schema.metric_type_ = std::stoi(resRow["metric_type"]); partition_schema.owner_collection_ = collection_id; - partition_schema.partition_tag_ = std::get<9>(partitions[i]); - partition_schema.version_ = std::get<10>(partitions[i]); - partition_schema.collection_id_ = std::get<11>(partitions[i]); - partition_schema.flush_lsn_ = std::get<12>(partitions[i]); + partition_schema.partition_tag_ = resRow["partition_tag"]; + partition_schema.version_ = resRow["version"]; + partition_schema.flush_lsn_ = std::stoul(resRow["flush_lsn"]); + partition_schema_array.emplace_back(partition_schema); } } catch (std::exception& e) { @@ -1027,11 +1304,19 @@ Status SqliteMetaImpl::CountPartitions(const std::string& collection_id, int64_t& partition_count) { try { partition_count = 0; - auto ret = ConnectorPtr->select(count(), - where(c(&CollectionSchema::owner_collection_) == collection_id and - c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); - if (ret.size() == 1) { - partition_count = ret[0]; + + std::string statement = "SELECT count(*) FROM " + std::string(META_TABLES) + + " WHERE owner_table = " + Quote(collection_id) + + " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";"; + + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + + if (res.size() == 1) { + partition_count = std::stol(res[0]["count(*)"]); } else { return Status(DB_NOT_FOUND, "Collection " + collection_id + " not found"); } @@ -1054,12 +1339,19 @@ SqliteMetaImpl::GetPartitionName(const std::string& collection_id, const std::st std::string valid_tag = tag; server::StringHelpFunctions::TrimStringBlank(valid_tag); - auto name = ConnectorPtr->select(columns(&CollectionSchema::collection_id_), - where(c(&CollectionSchema::owner_collection_) == collection_id and - c(&CollectionSchema::partition_tag_) == valid_tag and - c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE)); - if (name.size() > 0) { - partition_name = std::get<0>(name[0]); + std::string statement = "SELECT table_id FROM " + std::string(META_TABLES) + + " WHERE owner_table = " + Quote(collection_id) + + " AND partition_tag = " + Quote(valid_tag) + + " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";"; + + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + + if (res.size() > 0) { + partition_name = res[0]["table_id"]; } else { return Status(DB_NOT_FOUND, 
"Collection " + collection_id + "'s partition " + valid_tag + " not found"); } @@ -1076,47 +1368,44 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.FilesToSearch.throw_exception", throw std::exception()); - CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_id; - auto status = DescribeCollection(collection_schema); + std::string statement = "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date," + " engine_type, created_on, updated_time FROM " + std::string(META_TABLEFILES) + + " WHERE table_id = " + Quote(collection_id) + + " AND (file_type = " + std::to_string(SegmentSchema::RAW) + + " OR file_type = " + std::to_string(SegmentSchema::TO_INDEX) + + " OR file_type = " + std::to_string(SegmentSchema::INDEX) + ");"; + + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(operation_mutex_); + + AttrsMapList res; + auto status = SqlQuery(statement, &res); if (!status.ok()) { return status; } - // perform query - auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, &SegmentSchema::updated_time_); - - auto match_collectionid = c(&SegmentSchema::collection_id_) == collection_id; - - std::vector file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX, - (int)SegmentSchema::INDEX}; - auto match_type = in(&SegmentSchema::file_type_, file_types); - decltype(ConnectorPtr->select(select_columns)) selected; - { - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - auto filter = where(match_collectionid and match_type); - selected = ConnectorPtr->select(select_columns, filter); + CollectionSchema collection_schema; + collection_schema.collection_id_ = collection_id; + status = DescribeCollection(collection_schema); + if (!status.ok()) { + return status; } Status ret; int64_t files_count = 0; - for (auto& file : selected) { + for (auto& resRow : res) { SegmentSchema collection_file; - collection_file.id_ = std::get<0>(file); - collection_file.collection_id_ = std::get<1>(file); - collection_file.segment_id_ = std::get<2>(file); - collection_file.file_id_ = std::get<3>(file); - collection_file.file_type_ = std::get<4>(file); - collection_file.file_size_ = std::get<5>(file); - collection_file.row_count_ = std::get<6>(file); - collection_file.date_ = std::get<7>(file); - collection_file.engine_type_ = std::get<8>(file); - collection_file.created_on_ = std::get<9>(file); - collection_file.updated_time_ = std::get<10>(file); + collection_file.id_ = std::stoul(resRow["id"]); + collection_file.collection_id_ = resRow["table_id"]; + collection_file.segment_id_ = resRow["segment_id"]; + collection_file.file_id_ = resRow["file_id"]; + collection_file.file_type_ = std::stoi(resRow["file_type"]); + collection_file.file_size_ = std::stoul(resRow["file_size"]); + collection_file.row_count_ = std::stoul(resRow["row_count"]); + collection_file.date_ = std::stoi(resRow["date"]); + collection_file.engine_type_ = std::stoi(resRow["engine_type"]); + collection_file.created_on_ = std::stol(resRow["created_on"]); + collection_file.updated_time_ = std::stol(resRow["updated_time"]); 
collection_file.dimension_ = collection_schema.dimension_; collection_file.index_file_size_ = collection_schema.index_file_size_; @@ -1132,6 +1421,7 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil files_holder.MarkFile(collection_file); files_count++; } + if (files_count == 0) { LOG_ENGINE_DEBUG_ << "No file to search for collection: " << collection_id; } else { @@ -1178,38 +1468,43 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::s int64_t files_count = 0; Status ret; for (auto group : id_groups) { - auto select_columns = - columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, &SegmentSchema::updated_time_); + std::string statement = "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, " + "date, engine_type, created_on, updated_time FROM " + Quote(META_TABLEFILES) + + " WHERE table_id in ("; + for (size_t i = 0; i < group.size(); i++) { + statement += Quote(group[i]); + if (i != group.size() - 1) { + statement += ","; + } + } + statement += ")"; + statement += (" AND (file_type = " + std::to_string(SegmentSchema::RAW)); + statement += (" OR file_type = " + std::to_string(SegmentSchema::TO_INDEX)); + statement += (" OR file_type = " + std::to_string(SegmentSchema::INDEX) + ");"); - auto match_collectionid = in(&SegmentSchema::collection_id_, group); + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(operation_mutex_); - std::vector file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX, - (int)SegmentSchema::INDEX}; - auto match_type = in(&SegmentSchema::file_type_, file_types); - decltype(ConnectorPtr->select(select_columns)) selected; - { - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - auto filter = where(match_collectionid and match_type); - selected = ConnectorPtr->select(select_columns, filter); + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; } - for (auto& file : selected) { + for (auto& resRow : res) { SegmentSchema collection_file; - collection_file.id_ = std::get<0>(file); - collection_file.collection_id_ = std::get<1>(file); - collection_file.segment_id_ = std::get<2>(file); - collection_file.file_id_ = std::get<3>(file); - collection_file.file_type_ = std::get<4>(file); - collection_file.file_size_ = std::get<5>(file); - collection_file.row_count_ = std::get<6>(file); - collection_file.date_ = std::get<7>(file); - collection_file.engine_type_ = std::get<8>(file); - collection_file.created_on_ = std::get<9>(file); - collection_file.updated_time_ = std::get<10>(file); + collection_file.id_ = std::stoul(resRow["id"]); + collection_file.collection_id_ = resRow["table_id"]; + collection_file.segment_id_ = resRow["segment_id"]; + collection_file.file_id_ = resRow["file_id"]; + collection_file.file_type_ = std::stoi(resRow["file_type"]); + collection_file.file_size_ = std::stoul(resRow["file_size"]); + collection_file.row_count_ = std::stoul(resRow["row_count"]); + collection_file.date_ = std::stoi(resRow["date"]); + collection_file.engine_type_ = std::stoi(resRow["engine_type"]); + collection_file.created_on_ = std::stol(resRow["created_on"]); + 
collection_file.updated_time_ = std::stol(resRow["updated_time"]); + collection_file.dimension_ = collection_schema.dimension_; collection_file.index_file_size_ = collection_schema.index_file_size_; collection_file.index_params_ = collection_schema.index_params_; @@ -1234,6 +1529,8 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::s } catch (std::exception& e) { return HandleException("Encounter exception when iterate index files", e.what()); } + + return Status::OK(); } Status @@ -1252,39 +1549,41 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file } // get files to merge - auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, &SegmentSchema::updated_time_); - decltype(ConnectorPtr->select(select_columns)) selected; - { - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - selected = ConnectorPtr->select(select_columns, - where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::RAW and - c(&SegmentSchema::collection_id_) == collection_id), - order_by(&SegmentSchema::file_size_).desc()); + std::string statement = "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, " + "date, engine_type, created_on, updated_time FROM " + Quote(META_TABLEFILES) + + " WHERE table_id = " + Quote(collection_id) + + " AND file_type = " + std::to_string(SegmentSchema::RAW) + + " ORDER BY row_count DESC;"; + + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(operation_mutex_); + + AttrsMapList res; + status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; } - Status result; + Status ret; SegmentsSchema files; - for (auto& file : selected) { + for (auto& resRow : res) { SegmentSchema collection_file; - collection_file.file_size_ = std::get<5>(file); - if (collection_file.file_size_ >= (size_t)(collection_schema.index_file_size_)) { + collection_file.file_size_ = std::stoul(resRow["file_size"]); + if ((int64_t)(collection_file.file_size_) >= collection_schema.index_file_size_) { continue; // skip large file } - collection_file.id_ = std::get<0>(file); - collection_file.collection_id_ = std::get<1>(file); - collection_file.segment_id_ = std::get<2>(file); - collection_file.file_id_ = std::get<3>(file); - collection_file.file_type_ = std::get<4>(file); - collection_file.row_count_ = std::get<6>(file); - collection_file.date_ = std::get<7>(file); - collection_file.engine_type_ = std::get<8>(file); - collection_file.created_on_ = std::get<9>(file); - collection_file.updated_time_ = std::get<10>(file); + collection_file.id_ = std::stoul(resRow["id"]); + collection_file.collection_id_ = resRow["table_id"]; + collection_file.segment_id_ = resRow["segment_id"]; + collection_file.file_id_ = resRow["file_id"]; + collection_file.file_type_ = std::stoi(resRow["file_type"]); + collection_file.file_size_ = std::stoul(resRow["file_size"]); + collection_file.row_count_ = std::stoul(resRow["row_count"]); + collection_file.date_ = std::stoi(resRow["date"]); + collection_file.engine_type_ = std::stoi(resRow["engine_type"]); + collection_file.created_on_ = std::stol(resRow["created_on"]); + collection_file.updated_time_ = 
std::stol(resRow["updated_time"]); collection_file.dimension_ = collection_schema.dimension_; collection_file.index_file_size_ = collection_schema.index_file_size_; @@ -1293,7 +1592,8 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file auto status = utils::GetCollectionFilePath(options_, collection_file); if (!status.ok()) { - result = status; + ret = status; + continue; } files.emplace_back(collection_file); @@ -1306,7 +1606,7 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file files_holder.MarkFile(file); } } - return result; + return ret; } catch (std::exception& e) { return HandleException("Encounter exception when iterate merge files", e.what()); } @@ -1316,42 +1616,38 @@ Status SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) { try { fiu_do_on("SqliteMetaImpl.FilesToIndex.throw_exception", throw std::exception()); - server::MetricCollector metric; - auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, &SegmentSchema::updated_time_); - decltype(ConnectorPtr->select(select_columns)) selected; - { - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - selected = ConnectorPtr->select(select_columns, - where(c(&SegmentSchema::file_type_) == (int)SegmentSchema::TO_INDEX)); + std::string statement = "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, " + "date, engine_type, created_on, updated_time FROM " + Quote(META_TABLEFILES) + + " WHERE file_type = " + std::to_string(SegmentSchema::TO_INDEX) + ";"; + + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(operation_mutex_); + + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; } Status ret; int64_t files_count = 0; std::map groups; - for (auto& file : selected) { + for (auto& resRow : res) { SegmentSchema collection_file; - collection_file.id_ = std::get<0>(file); - collection_file.collection_id_ = std::get<1>(file); - collection_file.segment_id_ = std::get<2>(file); - collection_file.file_id_ = std::get<3>(file); - collection_file.file_type_ = std::get<4>(file); - collection_file.file_size_ = std::get<5>(file); - collection_file.row_count_ = std::get<6>(file); - collection_file.date_ = std::get<7>(file); - collection_file.engine_type_ = std::get<8>(file); - collection_file.created_on_ = std::get<9>(file); - collection_file.updated_time_ = std::get<10>(file); + collection_file.id_ = std::stoul(resRow["id"]); + collection_file.collection_id_ = resRow["table_id"]; + collection_file.segment_id_ = resRow["segment_id"]; + collection_file.file_id_ = resRow["file_id"]; + collection_file.file_type_ = std::stoi(resRow["file_type"]); + collection_file.file_size_ = std::stoul(resRow["file_size"]); + collection_file.row_count_ = std::stol(resRow["row_count"]); + collection_file.date_ = std::stoi(resRow["date"]); + collection_file.engine_type_ = std::stoi(resRow["engine_type"]); + collection_file.created_on_ = std::stol(resRow["created_on"]); + collection_file.updated_time_ = std::stol(resRow["updated_time"]); - auto status = utils::GetCollectionFilePath(options_, collection_file); - if (!status.ok()) { - ret = status; - } auto groupItr 
= groups.find(collection_file.collection_id_); if (groupItr == groups.end()) { CollectionSchema collection_schema; @@ -1368,8 +1664,13 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) { collection_file.index_file_size_ = groups[collection_file.collection_id_].index_file_size_; collection_file.index_params_ = groups[collection_file.collection_id_].index_params_; collection_file.metric_type_ = groups[collection_file.collection_id_].metric_type_; - files_holder.MarkFile(collection_file); + auto status = utils::GetCollectionFilePath(options_, collection_file); + if (!status.ok()) { + ret = status; + } + + files_holder.MarkFile(collection_file); files_count++; } @@ -1394,73 +1695,57 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector< try { fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception()); - CollectionSchema collection_schema; - collection_schema.collection_id_ = collection_id; - auto status = DescribeCollection(collection_schema); + std::string types; + for (auto type : file_types) { + if (!types.empty()) { + types += ","; + } + types += std::to_string(type); + } + + // since collection_id is a unique column we just need to check whether it exists or not + std::string statement = "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, " + "date, engine_type, created_on, updated_time FROM " + Quote(META_TABLEFILES) + + " WHERE table_id = " + Quote(collection_id) + + " AND file_type in (" + types + ");"; + + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(operation_mutex_); + + AttrsMapList res; + auto status = SqlQuery(statement, &res); if (!status.ok()) { return status; } - // get files by type - auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, - &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, &SegmentSchema::updated_time_); - decltype(ConnectorPtr->select(select_columns)) selected; - { - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - selected = ConnectorPtr->select(select_columns, where(in(&SegmentSchema::file_type_, file_types) and - c(&SegmentSchema::collection_id_) == collection_id)); + CollectionSchema collection_schema; + collection_schema.collection_id_ = collection_id; + status = DescribeCollection(collection_schema); + if (!status.ok()) { + return status; } - if (selected.size() >= 1) { + if (res.size() > 0) { int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0; int to_index_count = 0, index_count = 0, backup_count = 0; - for (auto& file : selected) { + for (auto& resRow : res) { SegmentSchema file_schema; + file_schema.id_ = std::stoul(resRow["id"]); file_schema.collection_id_ = collection_id; - file_schema.id_ = std::get<0>(file); - file_schema.segment_id_ = std::get<1>(file); - file_schema.file_id_ = std::get<2>(file); - file_schema.file_type_ = std::get<3>(file); - file_schema.file_size_ = std::get<4>(file); - file_schema.row_count_ = std::get<5>(file); - file_schema.date_ = std::get<6>(file); - file_schema.engine_type_ = std::get<7>(file); - file_schema.created_on_ = std::get<8>(file); - file_schema.updated_time_ = std::get<9>(file); + file_schema.segment_id_ = resRow["segment_id"]; + file_schema.file_id_ = resRow["file_id"]; + file_schema.file_type_ = 
std::stoi(resRow["file_type"]); + file_schema.file_size_ = std::stoul(resRow["file_size"]); + file_schema.row_count_ = std::stoul(resRow["row_count"]); + file_schema.date_ = std::stoi(resRow["date"]); + file_schema.engine_type_ = std::stoi(resRow["engine_type"]); + file_schema.created_on_ = std::stol(resRow["created_on"]); + file_schema.updated_time_ = std::stol(resRow["updated_time"]); - file_schema.dimension_ = collection_schema.dimension_; file_schema.index_file_size_ = collection_schema.index_file_size_; file_schema.index_params_ = collection_schema.index_params_; file_schema.metric_type_ = collection_schema.metric_type_; - - switch (file_schema.file_type_) { - case (int)SegmentSchema::RAW: - ++raw_count; - break; - case (int)SegmentSchema::NEW: - ++new_count; - break; - case (int)SegmentSchema::NEW_MERGE: - ++new_merge_count; - break; - case (int)SegmentSchema::NEW_INDEX: - ++new_index_count; - break; - case (int)SegmentSchema::TO_INDEX: - ++to_index_count; - break; - case (int)SegmentSchema::INDEX: - ++index_count; - break; - case (int)SegmentSchema::BACKUP: - ++backup_count; - break; - default: - break; - } + file_schema.dimension_ = collection_schema.dimension_; auto status = utils::GetCollectionFilePath(options_, file_schema); if (!status.ok()) { @@ -1468,16 +1753,33 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector< } files_holder.MarkFile(file_schema); + + int32_t file_type = file_schema.file_type_; + switch (file_type) { + case (int)SegmentSchema::RAW:++raw_count; + break; + case (int)SegmentSchema::NEW:++new_count; + break; + case (int)SegmentSchema::NEW_MERGE:++new_merge_count; + break; + case (int)SegmentSchema::NEW_INDEX:++new_index_count; + break; + case (int)SegmentSchema::TO_INDEX:++to_index_count; + break; + case (int)SegmentSchema::INDEX:++index_count; + break; + case (int)SegmentSchema::BACKUP:++backup_count; + break; + default:break; + } } std::string msg = "Get collection files by type."; for (int file_type : file_types) { switch (file_type) { - case (int)SegmentSchema::RAW: - msg = msg + " raw files:" + std::to_string(raw_count); + case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count); break; - case (int)SegmentSchema::NEW: - msg = msg + " new files:" + std::to_string(new_count); + case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count); break; case (int)SegmentSchema::NEW_MERGE: msg = msg + " new_merge files:" + std::to_string(new_merge_count); @@ -1485,26 +1787,22 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, const std::vector< case (int)SegmentSchema::NEW_INDEX: msg = msg + " new_index files:" + std::to_string(new_index_count); break; - case (int)SegmentSchema::TO_INDEX: - msg = msg + " to_index files:" + std::to_string(to_index_count); + case (int)SegmentSchema::TO_INDEX:msg = msg + " to_index files:" + std::to_string(to_index_count); break; - case (int)SegmentSchema::INDEX: - msg = msg + " index files:" + std::to_string(index_count); + case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count); break; - case (int)SegmentSchema::BACKUP: - msg = msg + " backup files:" + std::to_string(backup_count); - break; - default: + case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count); break; + default:break; } } LOG_ENGINE_DEBUG_ << msg; } - } catch (std::exception& e) { - return HandleException("Encounter exception when check non index files", e.what()); - } - return ret; + return ret; + } catch 
(std::exception& e) { + return HandleException("Failed to get files by type", e.what()); + } } Status @@ -1542,38 +1840,48 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector& collect int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0; int to_index_count = 0, index_count = 0, backup_count = 0; for (auto group : id_groups) { - auto select_columns = - columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, &SegmentSchema::updated_time_); - decltype(ConnectorPtr->select(select_columns)) selected; - - auto match_collectionid = in(&SegmentSchema::collection_id_, group); - - std::vector file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX, - (int)SegmentSchema::INDEX}; - auto match_type = in(&SegmentSchema::file_type_, file_types); - { - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - auto filter = where(match_collectionid and match_type); - selected = ConnectorPtr->select(select_columns, filter); + std::string types; + for (auto type : file_types) { + if (!types.empty()) { + types += ","; + } + types += std::to_string(type); } - for (auto& file : selected) { + // since collection_id is a unique column we just need to check whether it exists or not + std::string statement = "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, " + "date, engine_type, created_on, updated_time FROM " + Quote(META_TABLEFILES) + + " WHERE table_id in ("; + for (size_t i = 0; i < group.size(); i++) { + statement += Quote(group[i]); + if (i != group.size() - 1) { + statement += ","; + } + } + statement += (") AND file_type in (" + types + ");"); + + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(operation_mutex_); + + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + + for (auto& resRow : res) { SegmentSchema file_schema; - file_schema.id_ = std::get<0>(file); - file_schema.collection_id_ = std::get<1>(file); - file_schema.segment_id_ = std::get<2>(file); - file_schema.file_id_ = std::get<3>(file); - file_schema.file_type_ = std::get<4>(file); - file_schema.file_size_ = std::get<5>(file); - file_schema.row_count_ = std::get<6>(file); - file_schema.date_ = std::get<7>(file); - file_schema.engine_type_ = std::get<8>(file); - file_schema.created_on_ = std::get<9>(file); - file_schema.updated_time_ = std::get<10>(file); + file_schema.id_ = std::stoul(resRow["id"]); + file_schema.collection_id_ = resRow["table_id"]; + file_schema.segment_id_ = resRow["segment_id"]; + file_schema.file_id_ = resRow["file_id"]; + file_schema.file_type_ = std::stoi(resRow["file_type"]); + file_schema.file_size_ = std::stoul(resRow["file_size"]); + file_schema.row_count_ = std::stoul(resRow["row_count"]); + file_schema.date_ = std::stoi(resRow["date"]); + file_schema.engine_type_ = std::stoi(resRow["engine_type"]); + file_schema.created_on_ = std::stol(resRow["created_on"]); + file_schema.updated_time_ = std::stol(resRow["updated_time"]); auto& collection_schema = map_collections[file_schema.collection_id_]; file_schema.dimension_ = collection_schema.dimension_; @@ -1581,75 +1889,61 @@ SqliteMetaImpl::FilesByTypeEx(const std::vector& collect file_schema.index_params_ = 
collection_schema.index_params_; file_schema.metric_type_ = collection_schema.metric_type_; - switch (file_schema.file_type_) { - case (int)SegmentSchema::RAW: - ++raw_count; - break; - case (int)SegmentSchema::NEW: - ++new_count; - break; - case (int)SegmentSchema::NEW_MERGE: - ++new_merge_count; - break; - case (int)SegmentSchema::NEW_INDEX: - ++new_index_count; - break; - case (int)SegmentSchema::TO_INDEX: - ++to_index_count; - break; - case (int)SegmentSchema::INDEX: - ++index_count; - break; - case (int)SegmentSchema::BACKUP: - ++backup_count; - break; - default: - break; - } - auto status = utils::GetCollectionFilePath(options_, file_schema); if (!status.ok()) { ret = status; + continue; } files_holder.MarkFile(file_schema); + + int32_t file_type = file_schema.file_type_; + switch (file_type) { + case (int)SegmentSchema::RAW:++raw_count; + break; + case (int)SegmentSchema::NEW:++new_count; + break; + case (int)SegmentSchema::NEW_MERGE:++new_merge_count; + break; + case (int)SegmentSchema::NEW_INDEX:++new_index_count; + break; + case (int)SegmentSchema::TO_INDEX:++to_index_count; + break; + case (int)SegmentSchema::INDEX:++index_count; + break; + case (int)SegmentSchema::BACKUP:++backup_count; + break; + default:break; + } } } std::string msg = "Get collection files by type."; for (int file_type : file_types) { switch (file_type) { - case (int)SegmentSchema::RAW: - msg = msg + " raw files:" + std::to_string(raw_count); + case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count); break; - case (int)SegmentSchema::NEW: - msg = msg + " new files:" + std::to_string(new_count); + case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count); break; - case (int)SegmentSchema::NEW_MERGE: - msg = msg + " new_merge files:" + std::to_string(new_merge_count); + case (int)SegmentSchema::NEW_MERGE:msg = msg + " new_merge files:" + std::to_string(new_merge_count); break; - case (int)SegmentSchema::NEW_INDEX: - msg = msg + " new_index files:" + std::to_string(new_index_count); + case (int)SegmentSchema::NEW_INDEX:msg = msg + " new_index files:" + std::to_string(new_index_count); break; - case (int)SegmentSchema::TO_INDEX: - msg = msg + " to_index files:" + std::to_string(to_index_count); + case (int)SegmentSchema::TO_INDEX:msg = msg + " to_index files:" + std::to_string(to_index_count); break; - case (int)SegmentSchema::INDEX: - msg = msg + " index files:" + std::to_string(index_count); + case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count); break; - case (int)SegmentSchema::BACKUP: - msg = msg + " backup files:" + std::to_string(backup_count); - break; - default: + case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count); break; + default:break; } } - LOG_ENGINE_DEBUG_ << msg; - } catch (std::exception& e) { - return HandleException("Encounter exception when check non index files", e.what()); - } - return ret; + LOG_ENGINE_DEBUG_ << msg; + return ret; + } catch (std::exception& e) { + return HandleException("Failed to get files by type", e.what()); + } } Status @@ -1662,37 +1956,43 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_hol server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.FilesByID.throw_exception", throw std::exception()); - auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - 
&SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_, &SegmentSchema::updated_time_); + std::string statement = "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, " + "date, engine_type, created_on, updated_time FROM " + Quote(META_TABLEFILES); - // perform query - decltype(ConnectorPtr->select(select_columns)) selected; - auto match_fileid = in(&SegmentSchema::id_, ids); - auto filter = where(match_fileid); - { - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - selected = ConnectorPtr->select(select_columns, filter); + std::stringstream idSS; + for (auto& id : ids) { + idSS << "id = " << std::to_string(id) << " OR "; + } + std::string idStr = idSS.str(); + idStr = idStr.substr(0, idStr.size() - 4); // remove the last " OR " + + statement += (" WHERE (" + idStr + ")"); + + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(operation_mutex_); + + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; } std::map collections; Status ret; int64_t files_count = 0; - for (auto& file : selected) { + for (auto& resRow : res) { SegmentSchema collection_file; - collection_file.id_ = std::get<0>(file); - collection_file.collection_id_ = std::get<1>(file); - collection_file.segment_id_ = std::get<2>(file); - collection_file.file_id_ = std::get<3>(file); - collection_file.file_type_ = std::get<4>(file); - collection_file.file_size_ = std::get<5>(file); - collection_file.row_count_ = std::get<6>(file); - collection_file.date_ = std::get<7>(file); - collection_file.engine_type_ = std::get<8>(file); - collection_file.created_on_ = std::get<9>(file); - collection_file.updated_time_ = std::get<10>(file); + collection_file.id_ = std::stoul(resRow["id"]); + collection_file.collection_id_ = resRow["table_id"]; + collection_file.segment_id_ = resRow["segment_id"]; + collection_file.file_id_ = resRow["file_id"]; + collection_file.file_type_ = std::stoi(resRow["file_type"]); + collection_file.file_size_ = std::stoul(resRow["file_size"]); + collection_file.row_count_ = std::stoul(resRow["row_count"]); + collection_file.date_ = std::stoi(resRow["date"]); + collection_file.engine_type_ = std::stoi(resRow["engine_type"]); + collection_file.created_on_ = std::stol(resRow["created_on"]); + collection_file.updated_time_ = std::stol(resRow["updated_time"]); if (collections.find(collection_file.collection_id_) == collections.end()) { CollectionSchema collection_schema; @@ -1707,6 +2007,7 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_hol auto status = utils::GetCollectionFilePath(options_, collection_file); if (!status.ok()) { ret = status; + continue; } files_holder.MarkFile(collection_file); @@ -1732,43 +2033,47 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_hol } catch (std::exception& e) { return HandleException("Encounter exception when iterate index files", e.what()); } + return Status::OK(); } -// TODO(myh): Support swap to cloud storage Status SqliteMetaImpl::Archive() { auto& criterias = options_.archive_conf_.GetCriterias(); - if (criterias.size() == 0) { + if (criterias.empty()) { return Status::OK(); } - for (auto kv : criterias) { + for (auto& kv : criterias) { auto& criteria = kv.first; auto& limit = kv.second; if (criteria == engine::ARCHIVE_CONF_DAYS) { - int64_t usecs = limit * DAY * US_PS; + size_t usecs 
= limit * DAY * US_PS; int64_t now = utils::GetMicroSecTimeStamp(); + try { fiu_do_on("SqliteMetaImpl.Archive.throw_exception", throw std::exception()); - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard<std::mutex> meta_lock(meta_mutex_); + std::string statement = "UPDATE " + std::string(META_TABLEFILES) + + " SET file_type = " + std::to_string(SegmentSchema::TO_DELETE) + + " WHERE created_on < " + std::to_string(now - usecs) + + " AND file_type <> " + std::to_string(SegmentSchema::TO_DELETE) + ";"; + auto status = SqlTransaction({statement}); + if (!status.ok()) { + return HandleException("Failed to archive", status.message().c_str()); + } + + LOG_ENGINE_DEBUG_ << "Archive old files"; } catch (std::exception& e) { - return HandleException("Encounter exception when update collection files", e.what()); + return HandleException("Failed to archive", e.what()); } - - LOG_ENGINE_DEBUG_ << "Archive old files"; } if (criteria == engine::ARCHIVE_CONF_DISK) { uint64_t sum = 0; Size(sum); - int64_t to_delete = (int64_t)sum - limit * GB; + int64_t to_delete = static_cast<int64_t>(sum) - limit * GB; // keep signed arithmetic: sum may be smaller than limit * GB DiscardFiles(to_delete); LOG_ENGINE_DEBUG_ << "Archive files to free disk"; @@ -1784,13 +2089,22 @@ SqliteMetaImpl::Size(uint64_t& result) { try { fiu_do_on("SqliteMetaImpl.Size.throw_exception", throw std::exception()); - auto selected = ConnectorPtr->select(columns(sum(&SegmentSchema::file_size_)), - where(c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE)); - for (auto& total_size : selected) { - if (!std::get<0>(total_size)) { - continue; - } - result += (uint64_t)(*std::get<0>(total_size)); + std::string statement = "SELECT IFNULL(SUM(file_size),0) AS sum FROM " + Quote(META_TABLEFILES) + + " WHERE file_type <> " + std::to_string(SegmentSchema::TO_DELETE) + ";"; + + // lock to keep the size query consistent with concurrent meta updates + std::lock_guard<std::mutex> meta_lock(operation_mutex_); + + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + + if (res.empty()) { + result = 0; + } else { + result = std::stoul(res[0]["sum"]); } } catch (std::exception& e) { return HandleException("Encounter exception when calculate db size", e.what()); } @@ -1804,31 +2118,17 @@ SqliteMetaImpl::CleanUpShadowFiles() { try { server::MetricCollector metric; - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard<std::mutex> meta_lock(meta_mutex_); + std::string statement = "DELETE FROM " + std::string(META_TABLEFILES) + " WHERE file_type IN (" + + std::to_string(SegmentSchema::NEW) + "," + + std::to_string(SegmentSchema::NEW_MERGE) + "," + + std::to_string(SegmentSchema::NEW_INDEX) + ");"; - std::vector<int> file_types = {(int)SegmentSchema::NEW, (int)SegmentSchema::NEW_INDEX, - (int)SegmentSchema::NEW_MERGE}; - auto files = - ConnectorPtr->select(columns(&SegmentSchema::id_), where(in(&SegmentSchema::file_type_, file_types))); - - auto commited = ConnectorPtr->transaction([&]() mutable { - for (auto& file : files) { - LOG_ENGINE_DEBUG_ << "Remove collection file type as NEW"; - ConnectorPtr->remove(std::get<0>(file)); - } - return true; - }); - - fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.fail_commited", commited = false); + auto status = SqlTransaction({statement}); + 
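+ // illustrative only: the statement above renders to something like
+ //   DELETE FROM TableFiles WHERE file_type IN (<NEW>,<NEW_MERGE>,<NEW_INDEX>);
+ // with the enum values substituted at runtime; the fiu hooks below let unit tests
+ // inject a failed commit or a thrown exception at this point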
fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.fail_commited", status = Status(DB_ERROR, "")); fiu_do_on("SqliteMetaImpl.CleanUpShadowFiles.throw_exception", throw std::exception()); - if (!commited) { + if (!status.ok()) { return HandleException("CleanUp error: sqlite transaction failed"); } - - if (files.size() > 0) { - LOG_ENGINE_DEBUG_ << "Clean " << files.size() << " files"; - } } catch (std::exception& e) { return HandleException("Encounter exception when clean collection file", e.what()); } @@ -1853,68 +2153,88 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/ (int)SegmentSchema::BACKUP, }; - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - // collect files to be deleted - auto files = - ConnectorPtr->select(columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, - &SegmentSchema::segment_id_, &SegmentSchema::engine_type_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::date_), - where(in(&SegmentSchema::file_type_, file_types) and - c(&SegmentSchema::updated_time_) < now - seconds * US_PS)); + std::string statement = "SELECT id, table_id, segment_id, engine_type, file_id, file_type, date" + " FROM " + std::string(META_TABLEFILES) + " WHERE file_type IN (" + + std::to_string(SegmentSchema::TO_DELETE) + "," + + std::to_string(SegmentSchema::BACKUP) + ")" + + " AND updated_time < " + std::to_string(now - seconds * US_PS) + ";"; + + + AttrsMapList res; + { + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(operation_mutex_); + + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + } + + SegmentSchema collection_file; + std::vector delete_ids; int64_t clean_files = 0; - auto commited = ConnectorPtr->transaction([&]() mutable { - SegmentSchema collection_file; - for (auto& file : files) { - collection_file.id_ = std::get<0>(file); - collection_file.collection_id_ = std::get<1>(file); - collection_file.segment_id_ = std::get<2>(file); - collection_file.engine_type_ = std::get<3>(file); - collection_file.file_id_ = std::get<4>(file); - collection_file.file_type_ = std::get<5>(file); - collection_file.date_ = std::get<6>(file); + for (auto& resRow : res) { + collection_file.id_ = std::stoul(resRow["id"]); + collection_file.collection_id_ = resRow["table_id"]; + collection_file.segment_id_ = resRow["segment_id"]; + collection_file.engine_type_ = std::stoi(resRow["engine_type"]); + collection_file.file_id_ = resRow["file_id"]; + collection_file.date_ = std::stoi(resRow["date"]); + collection_file.file_type_ = std::stoi(resRow["file_type"]); - // check if the file can be deleted - if (!FilesHolder::CanBeDeleted(collection_file)) { - LOG_ENGINE_DEBUG_ << "File:" << collection_file.file_id_ - << " currently is in use, not able to delete now"; - continue; // ignore this file, don't delete it - } - - // erase from cache, must do this before file deleted, - // because GetCollectionFilePath won't able to generate file path after the file is deleted - // TODO(zhiru): clean up - utils::GetCollectionFilePath(options_, collection_file); - server::CommonUtil::EraseFromCache(collection_file.location_); - - if (collection_file.file_type_ == (int)SegmentSchema::TO_DELETE) { - // delete file from meta - ConnectorPtr->remove(collection_file.id_); - - // delete file from disk storage - utils::DeleteCollectionFilePath(options_, collection_file); - - LOG_ENGINE_DEBUG_ << "Remove file id:" << 
collection_file.file_id_ - << " location:" << collection_file.location_; - collection_ids.insert(collection_file.collection_id_); - segment_ids.insert(std::make_pair(collection_file.segment_id_, collection_file)); - - ++clean_files; - } + // check if the file can be deleted + if (!FilesHolder::CanBeDeleted(collection_file)) { + LOG_ENGINE_DEBUG_ << "File:" << collection_file.file_id_ + << " currently is in use, not able to delete now"; + continue; // ignore this file, don't delete it } - return true; - }); - fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveFile_FailCommited", commited = false); - if (!commited) { - return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed"); + // erase file data from cache + // because GetCollectionFilePath won't able to generate file path after the file is deleted + utils::GetCollectionFilePath(options_, collection_file); + server::CommonUtil::EraseFromCache(collection_file.location_); + + if (collection_file.file_type_ == (int)SegmentSchema::TO_DELETE) { + // delete file from disk storage + utils::DeleteCollectionFilePath(options_, collection_file); + LOG_ENGINE_DEBUG_ << "Remove file id:" << collection_file.id_ + << " location:" << collection_file.location_; + + delete_ids.emplace_back(std::to_string(collection_file.id_)); + collection_ids.insert(collection_file.collection_id_); + segment_ids.insert(std::make_pair(collection_file.segment_id_, collection_file)); + + clean_files++; + } } if (clean_files > 0) { LOG_ENGINE_DEBUG_ << "Clean " << clean_files << " files expired in " << seconds << " seconds"; } + + // delete file from meta + std::vector statements; + if (!delete_ids.empty()) { + std::stringstream idsToDeleteSS; + for (auto& id : delete_ids) { + idsToDeleteSS << "id = " << id << " OR "; + } + + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR " + statement = "DELETE FROM " + std::string(META_TABLEFILES) + " WHERE " + idsToDeleteStr + ";"; + + statements.emplace_back(statement); + } + + auto status = SqlTransaction(statements); + fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveFile_FailCommited", status = Status(DB_ERROR, "")); + if (!status.ok()) { + return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed"); + } } catch (std::exception& e) { return HandleException("Encounter exception when clean collection files", e.what()); } @@ -1924,29 +2244,41 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/ fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveCollection_ThrowException", throw std::exception()); server::MetricCollector metric; - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); + std::string statement = "SELECT id, table_id FROM " + std::string(META_TABLES) + + " WHERE state = " + std::to_string(CollectionSchema::TO_DELETE) + ";"; - auto collections = - ConnectorPtr->select(columns(&CollectionSchema::id_, &CollectionSchema::collection_id_), - where(c(&CollectionSchema::state_) == (int)CollectionSchema::TO_DELETE)); - - auto commited = ConnectorPtr->transaction([&]() mutable { - for (auto& collection : collections) { - utils::DeleteCollectionPath(options_, std::get<1>(collection), false); // only delete empty folder - ConnectorPtr->remove(std::get<0>(collection)); - } - - return true; - }); - fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveCollection_Failcommited", commited = 
false); - - if (!commited) { - return HandleException("CleanUpFilesWithTTL error: sqlite transaction failed"); + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; } - if (collections.size() > 0) { - LOG_ENGINE_DEBUG_ << "Remove " << collections.size() << " collections from meta"; + int64_t remove_collections = 0; + if (!res.empty()) { + std::stringstream idsToDeleteSS; + for (auto& resRow : res) { + size_t id = std::stoul(resRow["id"]); + std::string collection_id; + collection_id = resRow["table_id"]; + + utils::DeleteCollectionPath(options_, collection_id, false); // only delete empty folder + ++remove_collections; + idsToDeleteSS << "id = " << std::to_string(id) << " OR "; + } + std::string idsToDeleteStr = idsToDeleteSS.str(); + idsToDeleteStr = idsToDeleteStr.substr(0, idsToDeleteStr.size() - 4); // remove the last " OR " + statement = "DELETE FROM " + std::string(META_TABLES) + " WHERE " + idsToDeleteStr + ";"; + + status = SqlTransaction({statement}); + if (!status.ok()) { + return HandleException("Failed to clean up with ttl", status.message().c_str()); + } + } + + fiu_do_on("SqliteMetaImpl.CleanUpFilesWithTTL.RemoveCollection_Failcommited", throw std::exception()); + + if (remove_collections > 0) { + LOG_ENGINE_DEBUG_ << "Remove " << remove_collections << " collections from meta"; } } catch (std::exception& e) { return HandleException("Encounter exception when clean collection files", e.what()); @@ -1960,9 +2292,16 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/ int64_t remove_collections = 0; for (auto& collection_id : collection_ids) { - auto selected = ConnectorPtr->select(columns(&SegmentSchema::file_id_), - where(c(&SegmentSchema::collection_id_) == collection_id)); - if (selected.size() == 0) { + std::string statement = "SELECT file_id FROM " + std::string(META_TABLEFILES) + + " WHERE table_id = " + Quote(collection_id) + ";"; + + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + + if (res.empty()) { utils::DeleteCollectionPath(options_, collection_id); ++remove_collections; } @@ -1983,9 +2322,16 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/ int64_t remove_segments = 0; for (auto& segment_id : segment_ids) { - auto selected = ConnectorPtr->select(columns(&SegmentSchema::id_), - where(c(&SegmentSchema::segment_id_) == segment_id.first)); - if (selected.size() == 0) { + std::string statement = "SELECT id FROM " + std::string(META_TABLEFILES) + + " WHERE segment_id = " + Quote(segment_id.first) + ";"; + + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + + if (res.empty()) { utils::DeleteSegment(options_, segment_id.second); std::string segment_dir; utils::GetParentPath(segment_id.second.location_, segment_dir); @@ -2008,20 +2354,8 @@ Status SqliteMetaImpl::Count(const std::string& collection_id, uint64_t& result) { try { fiu_do_on("SqliteMetaImpl.Count.throw_exception", throw std::exception()); - server::MetricCollector metric; - std::vector file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX, - (int)SegmentSchema::INDEX}; - auto select_columns = columns(&SegmentSchema::row_count_); - decltype(ConnectorPtr->select(select_columns)) selected; - { - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); - selected = ConnectorPtr->select(select_columns, 
where(in(&SegmentSchema::file_type_, file_types) and - c(&SegmentSchema::collection_id_) == collection_id)); - } - CollectionSchema collection_schema; collection_schema.collection_id_ = collection_id; auto status = DescribeCollection(collection_schema); @@ -2030,13 +2364,30 @@ SqliteMetaImpl::Count(const std::string& collection_id, uint64_t& result) { return status; } + std::string statement = "SELECT row_count FROM " + std::string(META_TABLEFILES) + + " WHERE table_id = " + Quote(collection_id) + + " AND (file_type = " + std::to_string(SegmentSchema::RAW) + + " OR file_type = " + std::to_string(SegmentSchema::TO_INDEX) + + " OR file_type = " + std::to_string(SegmentSchema::INDEX) + ");"; + + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(operation_mutex_); + + AttrsMapList res; + status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } + result = 0; - for (auto& file : selected) { - result += std::get<0>(file); + for (auto& resRow : res) { + size_t size = std::stoul(resRow["row_count"]); + result += size; } } catch (std::exception& e) { return HandleException("Encounter exception when calculate collection file size", e.what()); } + return Status::OK(); } @@ -2045,10 +2396,18 @@ SqliteMetaImpl::DropAll() { LOG_ENGINE_DEBUG_ << "Drop all sqlite meta"; try { - ConnectorPtr->drop_table(META_TABLES); - ConnectorPtr->drop_table(META_TABLEFILES); - ConnectorPtr->drop_table(META_ENVIRONMENT); - ConnectorPtr->drop_table(META_FIELDS); + std::string statement = "DROP TABLE IF EXISTS "; + std::vector statements = { + statement + TABLES_SCHEMA.name() + ";", + statement + TABLEFILES_SCHEMA.name() + ";", + statement + ENVIRONMENT_SCHEMA.name() + ";", + statement + FIELDS_SCHEMA.name() + ";", + }; + + auto status = SqlTransaction(statements); + if (!status.ok()) { + return HandleException("Failed to drop all", status.message().c_str()); + } } catch (std::exception& e) { return HandleException("Encounter exception when drop all meta", e.what()); } @@ -2066,44 +2425,48 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) { try { fiu_do_on("SqliteMetaImpl.DiscardFiles.throw_exception", throw std::exception()); - server::MetricCollector metric; - // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here - std::lock_guard meta_lock(meta_mutex_); + std::string statement = "SELECT id, file_size FROM " + std::string(META_TABLEFILES) + + " WHERE file_type <> " + std::to_string(SegmentSchema::TO_DELETE) + + " ORDER BY id ASC LIMIT 10;"; - auto commited = ConnectorPtr->transaction([&]() mutable { - auto selected = ConnectorPtr->select(columns(&SegmentSchema::id_, &SegmentSchema::file_size_), - where(c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE), - order_by(&SegmentSchema::id_), limit(10)); + AttrsMapList res; + auto status = SqlQuery(statement, &res); + if (!status.ok()) { + return status; + } - std::vector ids; - SegmentSchema collection_file; + if (res.empty()) { + return Status::OK(); + } - for (auto& file : selected) { - if (to_discard_size <= 0) - break; - collection_file.id_ = std::get<0>(file); - collection_file.file_size_ = std::get<1>(file); - ids.push_back(collection_file.id_); - LOG_ENGINE_DEBUG_ << "Discard file id=" << collection_file.file_id_ - << " file size=" << collection_file.file_size_; - to_discard_size -= collection_file.file_size_; + SegmentSchema collection_file; + std::stringstream idsToDiscardSS; + for (auto& resRow : res) { + if (to_discard_size <= 0) { + break; } + 
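+ // accumulate the oldest files first (ORDER BY id ASC) until enough bytes are marked to satisfy to_discard_size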
@@ -2066,44 +2425,48 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
     try {
         fiu_do_on("SqliteMetaImpl.DiscardFiles.throw_exception", throw std::exception());
-        server::MetricCollector metric;
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        std::string statement = "SELECT id, file_size FROM " + std::string(META_TABLEFILES) +
+                                " WHERE file_type <> " + std::to_string(SegmentSchema::TO_DELETE) +
+                                " ORDER BY id ASC LIMIT 10;";
 
-        auto commited = ConnectorPtr->transaction([&]() mutable {
-            auto selected = ConnectorPtr->select(columns(&SegmentSchema::id_, &SegmentSchema::file_size_),
-                                                 where(c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE),
-                                                 order_by(&SegmentSchema::id_), limit(10));
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
+        if (!status.ok()) {
+            return status;
+        }
 
-            std::vector<int> ids;
-            SegmentSchema collection_file;
+        if (res.empty()) {
+            return Status::OK();
+        }
 
-            for (auto& file : selected) {
-                if (to_discard_size <= 0)
-                    break;
-                collection_file.id_ = std::get<0>(file);
-                collection_file.file_size_ = std::get<1>(file);
-                ids.push_back(collection_file.id_);
-                LOG_ENGINE_DEBUG_ << "Discard file id=" << collection_file.file_id_
-                                  << " file size=" << collection_file.file_size_;
-                to_discard_size -= collection_file.file_size_;
+        SegmentSchema collection_file;
+        std::stringstream idsToDiscardSS;
+        for (auto& resRow : res) {
+            if (to_discard_size <= 0) {
+                break;
             }
+
+            collection_file.id_ = std::stoul(resRow["id"]);
+            collection_file.file_size_ = std::stoul(resRow["file_size"]);
+            idsToDiscardSS << "id = " << std::to_string(collection_file.id_) << " OR ";
+            LOG_ENGINE_DEBUG_ << "Discard file id=" << collection_file.id_
+                              << " file size=" << collection_file.file_size_;
+            to_discard_size -= collection_file.file_size_;
+        }
 
-            if (ids.size() == 0) {
-                return true;
-            }
+        std::string idsToDiscardStr = idsToDiscardSS.str();
+        idsToDiscardStr = idsToDiscardStr.substr(0, idsToDiscardStr.size() - 4);  // remove the last " OR "
 
-            ConnectorPtr->update_all(set(c(&SegmentSchema::file_type_) = (int)SegmentSchema::TO_DELETE,
-                                         c(&SegmentSchema::updated_time_) = utils::GetMicroSecTimeStamp()),
-                                     where(in(&SegmentSchema::id_, ids)));
+        statement = "UPDATE " + std::string(META_TABLEFILES) +
+                    " SET file_type = " + std::to_string(SegmentSchema::TO_DELETE) +
+                    " ,updated_time = " + std::to_string(utils::GetMicroSecTimeStamp()) +
+                    " WHERE " + idsToDiscardStr + ";";
 
-            return true;
-        });
-        fiu_do_on("SqliteMetaImpl.DiscardFiles.fail_commited", commited = false);
-        if (!commited) {
-            return HandleException("DiscardFiles error: sqlite transaction failed");
+        status = SqlTransaction({statement});
+        fiu_do_on("SqliteMetaImpl.DiscardFiles.fail_commited", status = Status(DB_ERROR, ""));
+        if (!status.ok()) {
+            return HandleException("DiscardFiles error: sqlite transaction failed", status.message().c_str());
         }
     } catch (std::exception& e) {
         return HandleException("Encounter exception when discard collection file", e.what());
@@ -2117,21 +2480,36 @@ SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) {
     try {
         server::MetricCollector metric;
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
+        bool first_create = false;
+        uint64_t last_lsn = 0;
+        std::string statement = "SELECT global_lsn FROM " + std::string(META_ENVIRONMENT) + ";";
 
-        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
-        if (selected.size() == 0) {
-            EnvironmentSchema env;
-            env.global_lsn_ = lsn;
-            ConnectorPtr->insert(env);
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
+        if (!status.ok()) {
+            return status;
+        }
+
+        if (res.size() == 0) {
+            first_create = true;
         } else {
-            uint64_t last_lsn = std::get<0>(selected[0]);
-            if (lsn <= last_lsn) {
-                return Status::OK();
-            }
+            last_lsn = std::stoul(res[0]["global_lsn"]);
+        }
 
-            ConnectorPtr->update_all(set(c(&EnvironmentSchema::global_lsn_) = lsn));
+        if (first_create) {  // first time to set the global lsn
+            statement = "INSERT INTO " + std::string(META_ENVIRONMENT) + " VALUES(" + std::to_string(lsn) + ");";
+
+            status = SqlTransaction({statement});
+            if (!status.ok()) {
+                return HandleException("Failed to set global lsn", status.message().c_str());
+            }
+        } else if (lsn > last_lsn) {
+            statement = "UPDATE " + std::string(META_ENVIRONMENT) + " SET global_lsn = " + std::to_string(lsn) + ";";
+
+            status = SqlTransaction({statement});
+            if (!status.ok()) {
+                return HandleException("Failed to set global lsn", status.message().c_str());
+            }
         }
 
         LOG_ENGINE_DEBUG_ << "Update global lsn = " << lsn;
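
SetGlobalLastLSN above reads the stored lsn and then writes in a second, separate call, so two concurrent writers could interleave between the SELECT and the INSERT/UPDATE. Since SqlTransaction already accepts a batch, a hypothetical race-free variant (not what this patch does) folds an insert-if-missing plus a monotonic update into one transaction; max() here is SQLite's two-argument scalar function:

    // Sketch: both statements commit together, so no writer can slip in between.
    std::string lsn_str = std::to_string(lsn);
    std::vector<std::string> statements = {
        "INSERT INTO " + std::string(META_ENVIRONMENT) + " SELECT " + lsn_str +
            " WHERE NOT EXISTS (SELECT 1 FROM " + std::string(META_ENVIRONMENT) + ");",
        "UPDATE " + std::string(META_ENVIRONMENT) +
            " SET global_lsn = max(global_lsn, " + lsn_str + ");",
    };
    auto status = SqlTransaction(statements);
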
@@ -2148,11 +2526,18 @@ SqliteMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
     try {
         server::MetricCollector metric;
 
-        auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_));
-        if (selected.size() == 0) {
+        std::string statement = "SELECT global_lsn FROM " + std::string(META_ENVIRONMENT) + ";";
+
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
+        if (!status.ok()) {
+            return status;
+        }
+
+        if (res.empty()) {
             lsn = 0;
         } else {
-            lsn = std::get<0>(selected[0]);
+            lsn = std::stoul(res[0]["global_lsn"]);
         }
     } catch (std::exception& e) {
         return HandleException("Encounter exception when delete collection folder", e.what());
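
Throughout this rewrite, statements are assembled by string concatenation and Quote() is trusted to escape user-supplied values such as table_id. The sqlite3 C API can avoid escaping altogether with bound parameters; a sketch of the state lookup that CreateHybridCollection performs below, assuming direct access to the db_ handle declared in SqliteMetaImpl.h (illustrative only, not part of this patch):

    // Sketch: same probe as "SELECT state FROM Tables WHERE table_id = ...",
    // but with a bound parameter instead of Quote()-based interpolation.
    sqlite3_stmt* stmt = nullptr;
    std::string sql = "SELECT state FROM " + std::string(META_TABLES) + " WHERE table_id = ?;";
    int state = -1;
    if (sqlite3_prepare_v2(db_, sql.c_str(), -1, &stmt, nullptr) == SQLITE_OK) {
        sqlite3_bind_text(stmt, 1, collection_id.c_str(), -1, SQLITE_TRANSIENT);
        if (sqlite3_step(stmt) == SQLITE_ROW) {
            state = sqlite3_column_int(stmt, 0);
        }
    }
    sqlite3_finalize(stmt);  // finalize is a harmless no-op on a null statement
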
@@ -2168,22 +2553,27 @@ SqliteMetaImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema
     try {
         server::MetricCollector metric;
 
-        // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
-        std::lock_guard<std::mutex> meta_lock(meta_mutex_);
-
         if (collection_schema.collection_id_ == "") {
             NextCollectionId(collection_schema.collection_id_);
         } else {
             fiu_do_on("SqliteMetaImpl.CreateCollection.throw_exception", throw std::exception());
-            auto collection =
-                ConnectorPtr->select(columns(&CollectionSchema::state_),
-                                     where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_));
-            if (collection.size() == 1) {
-                if (CollectionSchema::TO_DELETE == std::get<0>(collection[0])) {
+
+            std::string statement = "SELECT state FROM " + std::string(META_TABLES) +
+                                    " WHERE table_id = " + Quote(collection_schema.collection_id_) + ";";
+
+            AttrsMapList res;
+            auto status = SqlQuery(statement, &res);
+            if (!status.ok()) {
+                return status;
+            }
+
+            if (res.size() == 1) {
+                int state = std::stoi(res[0]["state"]);
+                fiu_do_on("MySQLMetaImpl.CreateCollection.schema_TO_DELETE", state = CollectionSchema::TO_DELETE);
+                if (CollectionSchema::TO_DELETE == state) {
                     return Status(DB_ERROR,
                                   "Collection already exists and it is in delete state, please wait a second");
                 } else {
-                    // Change from no error to already exist.
                     return Status(DB_ALREADY_EXIST, "Collection already exists");
                 }
             }
@@ -2192,34 +2582,58 @@ SqliteMetaImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema
         collection_schema.id_ = -1;
         collection_schema.created_on_ = utils::GetMicroSecTimeStamp();
 
-        try {
-            fiu_do_on("SqliteMetaImpl.CreateHybridCollection.insert_throw_exception", throw std::exception());
-            auto id = ConnectorPtr->insert(collection_schema);
-            collection_schema.id_ = id;
-        } catch (std::exception& e) {
-            return HandleException("Encounter exception when create collection", e.what());
+        std::string id = "NULL";  // auto-increment
+        std::string& collection_id = collection_schema.collection_id_;
+        std::string state = std::to_string(collection_schema.state_);
+        std::string dimension = std::to_string(collection_schema.dimension_);
+        std::string created_on = std::to_string(collection_schema.created_on_);
+        std::string flag = std::to_string(collection_schema.flag_);
+        std::string index_file_size = std::to_string(collection_schema.index_file_size_);
+        std::string engine_type = std::to_string(collection_schema.engine_type_);
+        std::string& index_params = collection_schema.index_params_;
+        std::string metric_type = std::to_string(collection_schema.metric_type_);
+        std::string& owner_collection = collection_schema.owner_collection_;
+        std::string& partition_tag = collection_schema.partition_tag_;
+        std::string& version = collection_schema.version_;
+        std::string flush_lsn = std::to_string(collection_schema.flush_lsn_);
+
+        std::string statement = "INSERT INTO " + std::string(META_TABLES) +
+                                " VALUES(" + id + ", " + Quote(collection_id) + ", " + state + ", " + dimension +
+                                ", " + created_on + ", " + flag + ", " + index_file_size + ", " + engine_type +
+                                ", " + Quote(index_params) + ", " + metric_type + ", " + Quote(owner_collection) +
+                                ", " + Quote(partition_tag) + ", " + Quote(version) + ", " + flush_lsn + ");";
+
+        auto status = SqlTransaction({statement});
+        if (!status.ok()) {
+            return HandleException("Encounter exception when create collection", status.message().c_str());
         }
+        collection_schema.id_ = sqlite3_last_insert_rowid(db_);
 
-        LOG_ENGINE_DEBUG_ << "Successfully create collection: " << collection_schema.collection_id_;
+        LOG_ENGINE_DEBUG_ << "Successfully create collection: " << collection_schema.collection_id_;
 
-        Status status = utils::CreateCollectionPath(options_, collection_schema.collection_id_);
-        if (!status.ok()) {
-            return status;
-        }
+        for (auto schema : fields_schema.fields_schema_) {
+            std::string id = "NULL";
+            std::string collection_id = schema.collection_id_;
+            std::string field_name = schema.field_name_;
+            std::string field_type = std::to_string(schema.field_type_);
+            std::string field_params = schema.field_params_;
 
-        try {
-            for (auto schema : fields_schema.fields_schema_) {
-                auto field_id = ConnectorPtr->insert(schema);
-                LOG_ENGINE_DEBUG_ << "Successfully create collection field" << field_id;
+            statement = "INSERT INTO " + std::string(META_FIELDS) + " VALUES(" + Quote(collection_id) + ", " +
+                        Quote(field_name) + ", " + field_type + ", " + Quote(field_params) + ");";
+
+            status = SqlTransaction({statement});
+            if (!status.ok()) {
+                return HandleException("Failed to create field table", status.message().c_str());
             }
-        } catch (std::exception& e) {
-            return HandleException("Encounter exception when create collection field", e.what());
         }
 
-        return status;
+        LOG_ENGINE_DEBUG_ << "Successfully create hybrid collection: " << collection_schema.collection_id_;
+        return utils::CreateCollectionPath(options_, collection_schema.collection_id_);
     } catch (std::exception& e) {
         return HandleException("Encounter exception when create collection", e.what());
     }
+
+    return Status::OK();
 }
 
 Status
@@ -2228,50 +2642,60 @@ SqliteMetaImpl::DescribeHybridCollection(milvus::engine::meta::CollectionSchema&
     try {
         server::MetricCollector metric;
         fiu_do_on("SqliteMetaImpl.DescriCollection.throw_exception", throw std::exception());
-        auto groups = ConnectorPtr->select(
-            columns(&CollectionSchema::id_, &CollectionSchema::state_, &CollectionSchema::dimension_,
-                    &CollectionSchema::created_on_, &CollectionSchema::flag_, &CollectionSchema::index_file_size_,
-                    &CollectionSchema::engine_type_, &CollectionSchema::index_params_, &CollectionSchema::metric_type_,
-                    &CollectionSchema::owner_collection_, &CollectionSchema::partition_tag_,
-                    &CollectionSchema::version_, &CollectionSchema::flush_lsn_),
-            where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_ and
-                  c(&CollectionSchema::state_) != (int)CollectionSchema::TO_DELETE));
-        if (groups.size() == 1) {
-            collection_schema.id_ = std::get<0>(groups[0]);
-            collection_schema.state_ = std::get<1>(groups[0]);
-            collection_schema.dimension_ = std::get<2>(groups[0]);
-            collection_schema.created_on_ = std::get<3>(groups[0]);
-            collection_schema.flag_ = std::get<4>(groups[0]);
-            collection_schema.index_file_size_ = std::get<5>(groups[0]);
-            collection_schema.engine_type_ = std::get<6>(groups[0]);
-            collection_schema.index_params_ = std::get<7>(groups[0]);
-            collection_schema.metric_type_ = std::get<8>(groups[0]);
-            collection_schema.owner_collection_ = std::get<9>(groups[0]);
-            collection_schema.partition_tag_ = std::get<10>(groups[0]);
-            collection_schema.version_ = std::get<11>(groups[0]);
-            collection_schema.flush_lsn_ = std::get<12>(groups[0]);
+        std::string statement = "SELECT id, state, dimension, created_on, flag, index_file_size, engine_type,"
+                                " index_params, metric_type, owner_table, partition_tag, version, flush_lsn"
+                                " FROM " + std::string(META_TABLES) +
+                                " WHERE table_id = " + Quote(collection_schema.collection_id_) +
+                                " AND state <> " + std::to_string(CollectionSchema::TO_DELETE) + ";";
+
+        AttrsMapList res;
+        auto status = SqlQuery(statement, &res);
+        if (!status.ok()) {
+            return status;
+        }
+
+        if (res.size() == 1) {
+            auto& resRow = res[0];
+            collection_schema.id_ = std::stoul(resRow["id"]);
+            collection_schema.state_ = std::stoi(resRow["state"]);
+            collection_schema.dimension_ = std::stoi(resRow["dimension"]);
+            collection_schema.created_on_ = std::stol(resRow["created_on"]);
+            collection_schema.flag_ = std::stol(resRow["flag"]);
+            collection_schema.index_file_size_ = std::stol(resRow["index_file_size"]);
+            collection_schema.engine_type_ = std::stoi(resRow["engine_type"]);
+            collection_schema.index_params_ = resRow["index_params"];
+            collection_schema.metric_type_ = std::stoi(resRow["metric_type"]);
+            collection_schema.owner_collection_ = resRow["owner_table"];
+            collection_schema.partition_tag_ = resRow["partition_tag"];
+            collection_schema.version_ = resRow["version"];
+            collection_schema.flush_lsn_ = std::stoul(resRow["flush_lsn"]);
         } else {
             return Status(DB_NOT_FOUND, "Collection " + collection_schema.collection_id_ + " not found");
         }
 
-        auto field_groups =
-            ConnectorPtr->select(columns(&hybrid::FieldSchema::collection_id_, &hybrid::FieldSchema::field_name_,
-                                         &hybrid::FieldSchema::field_type_, &hybrid::FieldSchema::field_params_),
-                                 where(c(&hybrid::FieldSchema::collection_id_) == collection_schema.collection_id_));
+        statement = "SELECT collection_id, field_name, field_type, field_params FROM " + std::string(META_FIELDS) +
+                    " WHERE collection_id = " + Quote(collection_schema.collection_id_) + ";";
 
-        if (field_groups.size() >= 1) {
-            fields_schema.fields_schema_.resize(field_groups.size());
-            for (uint64_t i = 0; i < field_groups.size(); ++i) {
-                fields_schema.fields_schema_[i].collection_id_ = std::get<0>(field_groups[i]);
-                fields_schema.fields_schema_[i].field_name_ = std::get<1>(field_groups[i]);
-                fields_schema.fields_schema_[i].field_type_ = std::get<2>(field_groups[i]);
-                fields_schema.fields_schema_[i].field_params_ = std::get<3>(field_groups[i]);
-            }
-        } else {
-            return Status(DB_NOT_FOUND, "Collection " + collection_schema.collection_id_ + " fields not found");
+        AttrsMapList field_res;
+        status = SqlQuery(statement, &field_res);
+        if (!status.ok()) {
+            return status;
         }
 
+        auto num_row = field_res.size();
+        if (num_row >= 1) {
+            fields_schema.fields_schema_.resize(num_row);
+            for (uint64_t i = 0; i < num_row; ++i) {
+                auto& resRow = field_res[i];
+                fields_schema.fields_schema_[i].collection_id_ = resRow["collection_id"];
+                fields_schema.fields_schema_[i].field_name_ = resRow["field_name"];
+                fields_schema.fields_schema_[i].field_type_ = std::stoi(resRow["field_type"]);
+                fields_schema.fields_schema_[i].field_params_ = resRow["field_params"];
+            }
+        } else {
+            return Status(DB_NOT_FOUND, "Fields of " + collection_schema.collection_id_ + " not found");
+        }
     } catch (std::exception& e) {
         return HandleException("Encounter exception when describe collection", e.what());
     }
diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h
index 43c575196e..982bc49b4b 100644
--- a/core/src/db/meta/SqliteMetaImpl.h
+++ b/core/src/db/meta/SqliteMetaImpl.h
@@ -14,20 +14,20 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "Meta.h"
 #include "db/Options.h"
 
+#include <sqlite3.h>
+
 namespace milvus {
 namespace engine {
 namespace meta {
-auto
-StoragePrototype(const std::string& path);
-
-auto
-CollectionPrototype(const std::string& path);
+using AttrsMap = std::unordered_map<std::string, std::string>;
+using AttrsMapList = std::vector<AttrsMap>;
 
 class SqliteMetaImpl : public Meta {
  public:
@@ -179,10 +179,19 @@ class SqliteMetaImpl : public Meta {
     Status
     Initialize();
 
+    Status
+    SqlQuery(const std::string& sql, AttrsMapList* res = nullptr);
+
+    Status
+    SqlTransaction(const std::vector<std::string>& sql_statements);
+
  private:
     const DBMetaOptions options_;
-    std::mutex meta_mutex_;
+    std::mutex sqlite_mutex_;     // make sqlite query/execute actions atomic
+    std::mutex operation_mutex_;  // make operations such as UpdateCollectionFiles atomic
     std::mutex genid_mutex_;
+
+    sqlite3* db_ = nullptr;
 };  // DBMetaImpl
 
 }  // namespace meta
diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp
index 1e640fdd2c..3876dcc3d7 100644
--- a/core/unittest/db/test_db.cpp
+++ b/core/unittest/db/test_db.cpp
@@ -200,7 +200,7 @@ TEST_F(DBTest, DB_TEST) {
             std::vector<std::string> tags;
             stat = db_->Query(dummy_context_, COLLECTION_NAME, tags, k, json_params, qxb, result_ids,
                               result_distances);
-            ss << "Search " << j << " With Size " << count / milvus::engine::MB << " MB";
+            ss << "Search " << j << " With Size " << count / milvus::engine::MB << " MB " << stat.message();
             STOP_TIMER(ss.str());
             ASSERT_TRUE(stat.ok());
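
This diff declares SqlQuery and SqlTransaction above, but their SqliteMetaImpl.cpp bodies fall outside the excerpt. For orientation, a minimal self-contained sketch of what such helpers can look like over the raw sqlite3 C API; this is an illustration consistent with the declarations, not the patch's actual code, and it returns plain bool instead of the Status type used throughout:

    // Sketch of SqlQuery/SqlTransaction-style helpers over the sqlite3 C API.
    #include <sqlite3.h>
    #include <string>
    #include <unordered_map>
    #include <vector>

    using AttrsMap = std::unordered_map<std::string, std::string>;
    using AttrsMapList = std::vector<AttrsMap>;

    // sqlite3_exec invokes this once per result row; column values may be NULL.
    static int QueryCallback(void* data, int argc, char** argv, char** col_names) {
        auto* rows = static_cast<AttrsMapList*>(data);
        AttrsMap row;
        for (int i = 0; i < argc; ++i) {
            row[col_names[i]] = argv[i] ? argv[i] : "";
        }
        rows->push_back(std::move(row));
        return 0;  // a non-zero return would abort the query
    }

    bool SqlQuery(sqlite3* db, const std::string& sql, AttrsMapList* res) {
        char* errmsg = nullptr;
        int rc = sqlite3_exec(db, sql.c_str(), res ? QueryCallback : nullptr, res, &errmsg);
        if (rc != SQLITE_OK) {
            sqlite3_free(errmsg);
            return false;
        }
        return true;
    }

    bool SqlTransaction(sqlite3* db, const std::vector<std::string>& statements) {
        if (sqlite3_exec(db, "BEGIN", nullptr, nullptr, nullptr) != SQLITE_OK) {
            return false;
        }
        for (const auto& sql : statements) {
            if (sqlite3_exec(db, sql.c_str(), nullptr, nullptr, nullptr) != SQLITE_OK) {
                sqlite3_exec(db, "ROLLBACK", nullptr, nullptr, nullptr);  // undo partial work
                return false;
            }
        }
        return sqlite3_exec(db, "COMMIT", nullptr, nullptr, nullptr) == SQLITE_OK;
    }
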