diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp
index 4595b809ea..7216c3693e 100644
--- a/core/src/db/DBImpl.cpp
+++ b/core/src/db/DBImpl.cpp
@@ -131,7 +131,6 @@ DBImpl::Start() {
             if (record.type == wal::MXLogType::None) {
                 break;
             }
-
             ExecWalRecord(record);
         }
 
@@ -227,8 +226,9 @@ DBImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema, meta::
     }
 
     meta::CollectionSchema temp_schema = collection_schema;
+    temp_schema.index_file_size_ *= MB;
     if (options_.wal_enable_) {
-        // TODO(yukun): wal_mgr_->CreateHybridCollection()
+        temp_schema.flush_lsn_ = wal_mgr_->CreateHybridCollection(collection_schema.collection_id_);
     }
 
     return meta_ptr_->CreateHybridCollection(temp_schema, fields_schema);
@@ -606,6 +606,127 @@ DBImpl::InsertVectors(const std::string& collection_id, const std::string& parti
     return status;
 }
 
+Status
+CopyToAttr(std::vector<uint8_t>& record, uint64_t row_num, const std::vector<std::string>& field_names,
+           std::unordered_map<std::string, meta::hybrid::DataType>& attr_types,
+           std::unordered_map<std::string, std::vector<uint8_t>>& attr_datas,
+           std::unordered_map<std::string, uint64_t>& attr_nbytes,
+           std::unordered_map<std::string, uint64_t>& attr_data_size) {
+    uint64_t offset = 0;
+    for (auto name : field_names) {
+        switch (attr_types.at(name)) {
+            case meta::hybrid::DataType::INT8: {
+                std::vector<uint8_t> data;
+                data.resize(row_num * sizeof(int8_t));
+
+                std::vector<int64_t> attr_value(row_num, 0);
+                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
+
+                std::vector<int8_t> raw_value(row_num, 0);
+                for (uint64_t i = 0; i < row_num; ++i) {
+                    raw_value[i] = attr_value[i];
+                }
+
+                memcpy(data.data(), raw_value.data(), row_num * sizeof(int8_t));
+                attr_datas.insert(std::make_pair(name, data));
+
+                attr_nbytes.insert(std::make_pair(name, sizeof(int8_t)));
+                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int8_t)));
+                offset += row_num * sizeof(int64_t);
+                break;
+            }
+            case meta::hybrid::DataType::INT16: {
+                std::vector<uint8_t> data;
+                data.resize(row_num * sizeof(int16_t));
+
+                std::vector<int64_t> attr_value(row_num, 0);
+                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
+
+                std::vector<int16_t> raw_value(row_num, 0);
+                for (uint64_t i = 0; i < row_num; ++i) {
+                    raw_value[i] = attr_value[i];
+                }
+
+                memcpy(data.data(), raw_value.data(), row_num * sizeof(int16_t));
+                attr_datas.insert(std::make_pair(name, data));
+
+                attr_nbytes.insert(std::make_pair(name, sizeof(int16_t)));
+                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int16_t)));
+                offset += row_num * sizeof(int64_t);
+                break;
+            }
+            case meta::hybrid::DataType::INT32: {
+                std::vector<uint8_t> data;
+                data.resize(row_num * sizeof(int32_t));
+
+                std::vector<int64_t> attr_value(row_num, 0);
+                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(int64_t));
+
+                std::vector<int32_t> raw_value(row_num, 0);
+                for (uint64_t i = 0; i < row_num; ++i) {
+                    raw_value[i] = attr_value[i];
+                }
+
+                memcpy(data.data(), raw_value.data(), row_num * sizeof(int32_t));
+                attr_datas.insert(std::make_pair(name, data));
+
+                attr_nbytes.insert(std::make_pair(name, sizeof(int32_t)));
+                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int32_t)));
+                offset += row_num * sizeof(int64_t);
+                break;
+            }
+            case meta::hybrid::DataType::INT64: {
+                std::vector<uint8_t> data;
+                data.resize(row_num * sizeof(int64_t));
+                memcpy(data.data(), record.data() + offset, row_num * sizeof(int64_t));
+                attr_datas.insert(std::make_pair(name, data));
+
+                attr_nbytes.insert(std::make_pair(name, sizeof(int64_t)));
+                attr_data_size.insert(std::make_pair(name, row_num * sizeof(int64_t)));
+                offset += row_num * sizeof(int64_t);
+                break;
+            }
+            case meta::hybrid::DataType::FLOAT: {
+                std::vector<uint8_t> data;
+                data.resize(row_num * sizeof(float));
+
+                std::vector<double> attr_value(row_num, 0);
+                memcpy(attr_value.data(), record.data() + offset, row_num * sizeof(double));
+
+                std::vector<float> raw_value(row_num, 0);
+                for (uint64_t i = 0; i < row_num; ++i) {
+                    raw_value[i] = attr_value[i];
+                }
+
+                memcpy(data.data(), raw_value.data(), row_num * sizeof(float));
+                attr_datas.insert(std::make_pair(name, data));
+
+                attr_nbytes.insert(std::make_pair(name, sizeof(float)));
+                attr_data_size.insert(std::make_pair(name, row_num * sizeof(float)));
+                offset += row_num * sizeof(double);
+                break;
+            }
+            case meta::hybrid::DataType::DOUBLE: {
+                std::vector<uint8_t> data;
+                data.resize(row_num * sizeof(double));
+                memcpy(data.data(), record.data() + offset, row_num * sizeof(double));
+                attr_datas.insert(std::make_pair(name, data));
+
+                attr_nbytes.insert(std::make_pair(name, sizeof(double)));
+                attr_data_size.insert(std::make_pair(name, row_num * sizeof(double)));
+                offset += row_num * sizeof(double);
+                break;
+            }
+            default:
+                break;
+        }
+    }
+    return Status::OK();
+}
+
 Status
 DBImpl::InsertEntities(const std::string& collection_id, const std::string& partition_tag,
                        const std::vector<std::string>& field_names, Entity& entity,
                        std::unordered_map<std::string, meta::hybrid::DataType>& attr_types) {
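Note: the six switch cases above repeat one narrowing pattern — the wire format always carries integers as int64 and floating values as double, and CopyToAttr narrows each field down to its declared width. A compact illustration of that pattern (hypothetical helper, not part of the patch):

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Narrow a blob of row_num int64 values (the wire format) down to the
// declared attribute type T, returning the packed narrow bytes. This mirrors
// what each integer case of CopyToAttr does by hand.
template <typename T>
std::vector<uint8_t>
NarrowInt64Blob(const uint8_t* blob, uint64_t row_num) {
    std::vector<int64_t> wide(row_num);
    std::memcpy(wide.data(), blob, row_num * sizeof(int64_t));

    std::vector<T> narrow(row_num);
    for (uint64_t i = 0; i < row_num; ++i) {
        narrow[i] = static_cast<T>(wide[i]);  // silently truncates, as the patch does
    }

    std::vector<uint8_t> out(row_num * sizeof(T));
    std::memcpy(out.data(), narrow.data(), out.size());
    return out;
}
```

Each integer case is effectively this helper instantiated for int8_t/int16_t/int32_t; the FLOAT case is the same shape with double narrowed to float.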
@@ -624,7 +745,15 @@ DBImpl::InsertEntities(const std::string& collection_id, const std::string& part
     }
 
     Status status;
-    // insert entities: collection_name is field id
+    std::unordered_map<std::string, std::vector<uint8_t>> attr_data;
+    std::unordered_map<std::string, uint64_t> attr_nbytes;
+    std::unordered_map<std::string, uint64_t> attr_data_size;
+    status = CopyToAttr(entity.attr_value_, entity.entity_count_, field_names, attr_types, attr_data, attr_nbytes,
+                        attr_data_size);
+    if (!status.ok()) {
+        return status;
+    }
+
     wal::MXLogRecord record;
     record.lsn = 0;
     record.collection_id = collection_id;
@@ -637,123 +766,62 @@ DBImpl::InsertEntities(const std::string& collection_id, const std::string& part
         record.type = wal::MXLogType::Entity;
         record.data = vector_it->second.float_data_.data();
         record.data_size = vector_it->second.float_data_.size() * sizeof(float);
+        record.attr_data = attr_data;
+        record.attr_nbytes = attr_nbytes;
+        record.attr_data_size = attr_data_size;
     } else {
         //        record.type = wal::MXLogType::InsertBinary;
         //        record.data = entities.vector_data_[0].binary_data_.data();
         //        record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
     }
 
-    uint64_t offset = 0;
-    for (auto field_name : field_names) {
-        switch (attr_types.at(field_name)) {
-            case meta::hybrid::DataType::INT8: {
-                std::vector<uint8_t> data;
-                data.resize(entity.entity_count_ * sizeof(int8_t));
-
-                std::vector<int64_t> attr_value(entity.entity_count_, 0);
-                memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
-                offset += entity.entity_count_ * sizeof(int64_t);
-
-                std::vector<int8_t> raw_value(entity.entity_count_, 0);
-                for (uint64_t i = 0; i < entity.entity_count_; ++i) {
-                    raw_value[i] = attr_value[i];
-                }
-
-                memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(int8_t));
-                record.attr_data.insert(std::make_pair(field_name, data));
-
-                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int8_t)));
-                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int8_t)));
-                break;
-            }
-            case meta::hybrid::DataType::INT16: {
-                std::vector<uint8_t> data;
-                data.resize(entity.entity_count_ * sizeof(int16_t));
-
-                std::vector<int64_t> attr_value(entity.entity_count_, 0);
-                memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
-                offset += entity.entity_count_ * sizeof(int64_t);
-
-                std::vector<int16_t> raw_value(entity.entity_count_, 0);
-                for (uint64_t i = 0; i < entity.entity_count_; ++i) {
-                    raw_value[i] = attr_value[i];
-                }
-
-                memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(int16_t));
-                record.attr_data.insert(std::make_pair(field_name, data));
-
-                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int16_t)));
-                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int16_t)));
-                break;
-            }
-            case meta::hybrid::DataType::INT32: {
-                std::vector<uint8_t> data;
-                data.resize(entity.entity_count_ * sizeof(int32_t));
-
-                std::vector<int64_t> attr_value(entity.entity_count_, 0);
-                memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
-                offset += entity.entity_count_ * sizeof(int64_t);
-
-                std::vector<int32_t> raw_value(entity.entity_count_, 0);
-                for (uint64_t i = 0; i < entity.entity_count_; ++i) {
-                    raw_value[i] = attr_value[i];
-                }
-
-                memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(int32_t));
-                record.attr_data.insert(std::make_pair(field_name, data));
-
-                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int32_t)));
-                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int32_t)));
-                break;
-            }
-            case meta::hybrid::DataType::INT64: {
-                std::vector<uint8_t> data;
-                data.resize(entity.entity_count_ * sizeof(int64_t));
-                memcpy(data.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(int64_t));
-                record.attr_data.insert(std::make_pair(field_name, data));
-
-                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(int64_t)));
-                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(int64_t)));
-                offset += entity.entity_count_ * sizeof(int64_t);
-                break;
-            }
-            case meta::hybrid::DataType::FLOAT: {
-                std::vector<uint8_t> data;
-                data.resize(entity.entity_count_ * sizeof(float));
-
-                std::vector<double> attr_value(entity.entity_count_, 0);
-                memcpy(attr_value.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(double));
-                offset += entity.entity_count_ * sizeof(double);
-
-                std::vector<float> raw_value(entity.entity_count_, 0);
-                for (uint64_t i = 0; i < entity.entity_count_; ++i) {
-                    raw_value[i] = attr_value[i];
-                }
-
-                memcpy(data.data(), raw_value.data(), entity.entity_count_ * sizeof(float));
-                record.attr_data.insert(std::make_pair(field_name, data));
-
-                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(float)));
-                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(float)));
-                break;
-            }
-            case meta::hybrid::DataType::DOUBLE: {
-                std::vector<uint8_t> data;
-                data.resize(entity.entity_count_ * sizeof(double));
-                memcpy(data.data(), entity.attr_value_.data() + offset, entity.entity_count_ * sizeof(double));
-                record.attr_data.insert(std::make_pair(field_name, data));
-
-                record.attr_nbytes.insert(std::make_pair(field_name, sizeof(double)));
-                record.attr_data_size.insert(std::make_pair(field_name, entity.entity_count_ * sizeof(double)));
-                offset += entity.entity_count_ * sizeof(double);
-                break;
-            }
-            default:
-                break;
-        }
-    }
-    status = ExecWalRecord(record);
+
+#if 0
+    if (options_.wal_enable_) {
+        std::string target_collection_name;
+        status = GetPartitionByTag(collection_id, partition_tag, target_collection_name);
+        if (!status.ok()) {
+            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Get partition fail: %s", "insert", 0, status.message().c_str());
+            return status;
+        }
+
+        auto vector_it = entity.vector_data_.begin();
+        if (!vector_it->second.binary_data_.empty()) {
+            wal_mgr_->InsertEntities(collection_id, partition_tag, entity.id_array_, vector_it->second.binary_data_,
+                                     attr_nbytes, attr_data);
+        } else if (!vector_it->second.float_data_.empty()) {
+            wal_mgr_->InsertEntities(collection_id, partition_tag, entity.id_array_, vector_it->second.float_data_,
+                                     attr_nbytes, attr_data);
+        }
+        swn_wal_.Notify();
+    } else {
+        // insert entities: collection_name is field id
+        wal::MXLogRecord record;
+        record.lsn = 0;
+        record.collection_id = collection_id;
+        record.partition_tag = partition_tag;
+        record.ids = entity.id_array_.data();
+        record.length = entity.entity_count_;
+
+        auto vector_it = entity.vector_data_.begin();
+        if (vector_it->second.binary_data_.empty()) {
+            record.type = wal::MXLogType::Entity;
+            record.data = vector_it->second.float_data_.data();
+            record.data_size = vector_it->second.float_data_.size() * sizeof(float);
+            record.attr_data = attr_data;
+            record.attr_nbytes = attr_nbytes;
+            record.attr_data_size = attr_data_size;
+        } else {
+            //        record.type = wal::MXLogType::InsertBinary;
+            //        record.data = entities.vector_data_[0].binary_data_.data();
+            //        record.length = entities.vector_data_[0].binary_data_.size() * sizeof(uint8_t);
+        }
+
+        status = ExecWalRecord(record);
+    }
+#endif
+
     return status;
 }
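Note: CopyToAttr decodes entity.attr_value_ under a fixed wire convention — integers travel as int64, floating values as double, one field after another. A self-contained sketch of building such a blob, matching what BuildEntity does in the unit test; all values are invented:

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

int main() {
    const uint64_t n = 4;                               // rows
    std::vector<int64_t> age = {21, 34, 55, 8};         // declared INT8 in the schema
    std::vector<double> score = {0.5, 1.5, 2.5, 3.5};   // declared FLOAT

    // Pack field after field: every integer field as int64, floats as double.
    std::vector<uint8_t> blob(n * sizeof(int64_t) + n * sizeof(double));
    std::memcpy(blob.data(), age.data(), n * sizeof(int64_t));
    std::memcpy(blob.data() + n * sizeof(int64_t), score.data(), n * sizeof(double));

    // CopyToAttr would narrow "age" to n * sizeof(int8_t) = 4 bytes and
    // "score" to n * sizeof(float) = 16 bytes, while advancing its read
    // offset by the full wide width (n * 8 bytes) per field.
    std::cout << "wire bytes: " << blob.size() << "\n";  // 64
}
```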
diff --git a/core/src/db/meta/MetaTypes.h b/core/src/db/meta/MetaTypes.h
index 5d555ec9a3..fff6d74abe 100644
--- a/core/src/db/meta/MetaTypes.h
+++ b/core/src/db/meta/MetaTypes.h
@@ -133,26 +133,6 @@ struct VectorFieldsSchema {
 };
 using VectorFieldSchemaPtr = std::shared_ptr<VectorFieldsSchema>;
 
-struct CollectionSchema {
-    typedef enum {
-        NORMAL,
-        TO_DELETE,
-    } COLLETION_STATE;
-
-    size_t id_ = 0;
-    std::string collection_id_;
-    int32_t state_ = (int)NORMAL;
-    int64_t field_num = 0;
-    int64_t created_on_ = 0;
-    int64_t flag_ = 0;
-    std::string owner_collection_;
-    std::string partition_tag_;
-    std::string version_ = CURRENT_VERSION;
-    uint64_t flush_lsn_ = 0;
-};
-
-using CollectionSchemaPtr = std::shared_ptr<CollectionSchema>;
-
 struct FieldSchema {
     typedef enum {
         INT8 = 1,
diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp
index ecb3d35de3..18d61e4e1c 100644
--- a/core/src/db/meta/MySQLMetaImpl.cpp
+++ b/core/src/db/meta/MySQLMetaImpl.cpp
@@ -184,6 +184,14 @@ static const MetaSchema TABLEFILES_SCHEMA(META_TABLEFILES, {
     MetaField("flush_lsn", "BIGINT", "DEFAULT 0 NOT NULL"),
 });
 
+// Fields schema
+static const MetaSchema FIELDS_SCHEMA(META_FIELDS, {
+    MetaField("collection_id", "VARCHAR(255)", "NOT NULL"),
+    MetaField("field_name", "VARCHAR(255)", "NOT NULL"),
+    MetaField("field_type", "INT", "DEFAULT 0 NOT NULL"),
+    MetaField("field_params", "VARCHAR(255)", "NOT NULL"),
+});
+
 }  // namespace
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -264,10 +272,15 @@ MySQLMetaImpl::ValidateMetaSchema() {
         throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
     }
 
-    // verufy TableFiles
+    // verify TableFiles
     if (!validate_func(TABLEFILES_SCHEMA)) {
         throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
     }
+
+    // verify Fields
+    if (!validate_func(FIELDS_SCHEMA)) {
+        throw Exception(DB_INCOMPATIB_META, "Meta Fields schema is created by Milvus old version");
+    }
 }
 
 Status
@@ -380,6 +393,18 @@ MySQLMetaImpl::Initialize() {
         throw Exception(DB_META_TRANSACTION_FAILED, msg);
     }
 
+    // step 10: create meta table Fields
+    InitializeQuery << "CREATE TABLE IF NOT EXISTS " << FIELDS_SCHEMA.name() << " (" << FIELDS_SCHEMA.ToString() +
+                       ");";
+
+    LOG_ENGINE_DEBUG_ << "Initialize: " << InitializeQuery.str();
+
+    initialize_query_exec = InitializeQuery.exec();
+    if (!initialize_query_exec) {
+        std::string msg = "Failed to create meta table 'Fields' in MySQL";
+        LOG_ENGINE_ERROR_ << msg;
+        throw Exception(DB_META_TRANSACTION_FAILED, msg);
+    }
+
     return Status::OK();
 }
 
@@ -2641,10 +2666,179 @@ MySQLMetaImpl::GetGlobalLastLSN(uint64_t& lsn) {
 
 Status
 MySQLMetaImpl::CreateHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) {
+    try {
+        server::MetricCollector metric;
+        {
+            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
+
+            bool is_null_connection = (connectionPtr == nullptr);
+            fiu_do_on("MySQLMetaImpl.CreateCollection.null_connection", is_null_connection = true);
+            fiu_do_on("MySQLMetaImpl.CreateCollection.throw_exception", throw std::exception(););
+            if (is_null_connection) {
+                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
+            }
+
+            mysqlpp::Query statement = connectionPtr->query();
+
+            if (collection_schema.collection_id_.empty()) {
+                NextCollectionId(collection_schema.collection_id_);
+            } else {
+                statement << "SELECT state FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
+                          << collection_schema.collection_id_ << ";";
+
+                LOG_ENGINE_DEBUG_ << "CreateCollection: " << statement.str();
+
+                mysqlpp::StoreQueryResult res = statement.store();
+
+                if (res.num_rows() == 1) {
+                    int state = res[0]["state"];
+                    fiu_do_on("MySQLMetaImpl.CreateCollection.schema_TO_DELETE", state = CollectionSchema::TO_DELETE);
+                    if (CollectionSchema::TO_DELETE == state) {
+                        return Status(DB_ERROR,
+                                      "Collection already exists and it is in delete state, please wait a second");
+                    } else {
+                        return Status(DB_ALREADY_EXIST, "Collection already exists");
+                    }
+                }
+            }
+
+            collection_schema.id_ = -1;
+            collection_schema.created_on_ = utils::GetMicroSecTimeStamp();
+
+            std::string id = "NULL";  // auto-increment
+            std::string& collection_id = collection_schema.collection_id_;
+            std::string state = std::to_string(collection_schema.state_);
+            std::string dimension = std::to_string(collection_schema.dimension_);
+            std::string created_on = std::to_string(collection_schema.created_on_);
+            std::string flag = std::to_string(collection_schema.flag_);
+            std::string index_file_size = std::to_string(collection_schema.index_file_size_);
+            std::string engine_type = std::to_string(collection_schema.engine_type_);
+            std::string& index_params = collection_schema.index_params_;
+            std::string metric_type = std::to_string(collection_schema.metric_type_);
+            std::string& owner_collection = collection_schema.owner_collection_;
+            std::string& partition_tag = collection_schema.partition_tag_;
+            std::string& version = collection_schema.version_;
+            std::string flush_lsn = std::to_string(collection_schema.flush_lsn_);
+
+            statement << "INSERT INTO " << META_TABLES << " VALUES(" << id << ", " << mysqlpp::quote << collection_id
+                      << ", " << state << ", " << dimension << ", " << created_on << ", " << flag << ", "
+                      << index_file_size << ", " << engine_type << ", " << mysqlpp::quote << index_params << ", "
+                      << metric_type << ", " << mysqlpp::quote << owner_collection << ", " << mysqlpp::quote
+                      << partition_tag << ", " << mysqlpp::quote << version << ", " << flush_lsn << ");";
+
+            LOG_ENGINE_DEBUG_ << "CreateHybridCollection: " << statement.str();
+
+            if (mysqlpp::SimpleResult res = statement.execute()) {
+                collection_schema.id_ = res.insert_id();  // Might need to use SELECT LAST_INSERT_ID()?
+
+                // Consume all results to avoid "Commands out of sync" error
+            } else {
+                return HandleException("Failed to create collection", statement.error());
+            }
+
+            for (auto schema : fields_schema.fields_schema_) {
+                std::string collection_id = schema.collection_id_;
+                std::string field_name = schema.field_name_;
+                std::string field_type = std::to_string(schema.field_type_);
+                std::string field_params = schema.field_params_;
+
+                statement << "INSERT INTO " << META_FIELDS << " VALUES(" << mysqlpp::quote << collection_id << ", "
+                          << mysqlpp::quote << field_name << ", " << field_type << ", " << mysqlpp::quote
+                          << field_params << ");";
+
+                LOG_ENGINE_DEBUG_ << "Create field: " << statement.str();
+
+                if (mysqlpp::SimpleResult field_res = statement.execute()) {
+                    // TODO(yukun): need field id?
+                } else {
+                    return HandleException("Failed to create field table", statement.error());
+                }
+            }
+        }  // Scoped Connection
+
+        LOG_ENGINE_DEBUG_ << "Successfully create hybrid collection: " << collection_schema.collection_id_;
+        return utils::CreateCollectionPath(options_, collection_schema.collection_id_);
+    } catch (std::exception& e) {
+        return HandleException("Failed to create collection", e.what());
+    }
 }
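Note: mysqlpp::quote is a stream manipulator that escapes and quotes exactly one following stream item, so it must immediately precede each string value it protects. A standalone sketch of the same stream usage (assumes mysql++ is installed; table name, column values, and connection parameters are placeholders):

```cpp
#include <mysql++/mysql++.h>

#include <iostream>
#include <string>

int main() {
    mysqlpp::Connection conn(false);  // don't throw on failure
    if (!conn.connect("milvus_meta", "localhost", "user", "pass")) {
        std::cerr << "connect failed\n";
        return 1;
    }

    mysqlpp::Query statement = conn.query();
    std::string collection_id = "demo_collection";
    std::string field_name = "field_0";
    int field_type = 5;
    std::string field_params = "{}";

    // Each mysqlpp::quote applies only to the very next value inserted.
    statement << "INSERT INTO fields VALUES(" << mysqlpp::quote << collection_id << ", " << mysqlpp::quote
              << field_name << ", " << field_type << ", " << mysqlpp::quote << field_params << ");";
    std::cout << statement.str() << "\n";  // string values arrive quoted and escaped
    return 0;
}
```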
 
 Status
 MySQLMetaImpl::DescribeHybridCollection(CollectionSchema& collection_schema, hybrid::FieldsSchema& fields_schema) {
+    try {
+        server::MetricCollector metric;
+        mysqlpp::StoreQueryResult res, field_res;
+        {
+            mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_);
+
+            bool is_null_connection = (connectionPtr == nullptr);
+            fiu_do_on("MySQLMetaImpl.DescribeCollection.null_connection", is_null_connection = true);
+            fiu_do_on("MySQLMetaImpl.DescribeCollection.throw_exception", throw std::exception(););
+            if (is_null_connection) {
+                return Status(DB_ERROR, "Failed to connect to meta server(mysql)");
+            }
+
+            mysqlpp::Query statement = connectionPtr->query();
+            statement << "SELECT id, state, dimension, created_on, flag, index_file_size, engine_type, index_params"
+                      << " , metric_type ,owner_table, partition_tag, version, flush_lsn"
+                      << " FROM " << META_TABLES << " WHERE table_id = " << mysqlpp::quote
+                      << collection_schema.collection_id_ << " AND state <> "
+                      << std::to_string(CollectionSchema::TO_DELETE) << ";";
+
+            LOG_ENGINE_DEBUG_ << "DescribeHybridCollection: " << statement.str();
+
+            res = statement.store();
+
+            mysqlpp::Query field_statement = connectionPtr->query();
+            field_statement << "SELECT collection_id, field_name, field_type, field_params"
+                            << " FROM " << META_FIELDS << " WHERE collection_id = " << mysqlpp::quote
+                            << collection_schema.collection_id_ << ";";
+
+            LOG_ENGINE_DEBUG_ << "Describe Collection Fields: " << field_statement.str();
+
+            field_res = field_statement.store();
+        }  // Scoped Connection
+
+        if (res.num_rows() == 1) {
+            const mysqlpp::Row& resRow = res[0];
+            collection_schema.id_ = resRow["id"];  // implicit conversion
+            collection_schema.state_ = resRow["state"];
+            collection_schema.dimension_ = resRow["dimension"];
+            collection_schema.created_on_ = resRow["created_on"];
+            collection_schema.flag_ = resRow["flag"];
+            collection_schema.index_file_size_ = resRow["index_file_size"];
+            collection_schema.engine_type_ = resRow["engine_type"];
+            resRow["index_params"].to_string(collection_schema.index_params_);
+            collection_schema.metric_type_ = resRow["metric_type"];
+            resRow["owner_table"].to_string(collection_schema.owner_collection_);
+            resRow["partition_tag"].to_string(collection_schema.partition_tag_);
+            resRow["version"].to_string(collection_schema.version_);
+            collection_schema.flush_lsn_ = resRow["flush_lsn"];
+        } else {
+            return Status(DB_NOT_FOUND, "Collection " + collection_schema.collection_id_ + " not found");
+        }
+
+        auto num_row = field_res.num_rows();
+        if (num_row >= 1) {
+            fields_schema.fields_schema_.resize(num_row);
+            for (uint64_t i = 0; i < num_row; ++i) {
+                const mysqlpp::Row& resRow = field_res[i];
+                resRow["collection_id"].to_string(fields_schema.fields_schema_[i].collection_id_);
+                resRow["field_name"].to_string(fields_schema.fields_schema_[i].field_name_);
+                fields_schema.fields_schema_[i].field_type_ = resRow["field_type"];
+                resRow["field_params"].to_string(fields_schema.fields_schema_[i].field_params_);
+            }
+        } else {
+            return Status(DB_NOT_FOUND, "Fields of " + collection_schema.collection_id_ + " not found");
+        }
+    } catch (std::exception& e) {
+        return HandleException("Failed to describe collection", e.what());
+    }
+
+    return Status::OK();
 }
 
 Status
diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp
index c6ab5d2e10..f677fae364 100644
--- a/core/src/db/meta/SqliteMetaImpl.cpp
+++ b/core/src/db/meta/SqliteMetaImpl.cpp
@@ -34,8 +34,9 @@
 #include "utils/StringHelpFunctions.h"
 #include "utils/ValidationUtil.h"
 
-#define USING_SQLITE_WARNING LOG_ENGINE_WARNING_ << \
-    "You are using SQLite as the meta data management, which can't be used in production. Please change it to MySQL!";
+#define USING_SQLITE_WARNING                                                                                        \
+    LOG_ENGINE_WARNING_ << "You are using SQLite as the meta data management, which can't be used in production. " \
+                           "Please change it to MySQL!";
 
 namespace milvus {
 namespace engine {
@@ -83,59 +84,22 @@ StoragePrototype(const std::string& path) {
                    make_column("field_name", &hybrid::FieldSchema::field_name_),
                    make_column("field_type", &hybrid::FieldSchema::field_type_),
                    make_column("field_params", &hybrid::FieldSchema::field_params_)),
-        make_table(
-            META_TABLEFILES, make_column("id", &SegmentSchema::id_, primary_key()),
-            make_column("table_id", &SegmentSchema::collection_id_),
-            make_column("segment_id", &SegmentSchema::segment_id_, default_value("")),
-            make_column("engine_type", &SegmentSchema::engine_type_),
-            make_column("file_id", &SegmentSchema::file_id_), make_column("file_type", &SegmentSchema::file_type_),
-            make_column("file_size", &SegmentSchema::file_size_, default_value(0)),
-            make_column("row_count", &SegmentSchema::row_count_, default_value(0)),
-            make_column("updated_time", &SegmentSchema::updated_time_),
-            make_column("created_on", &SegmentSchema::created_on_), make_column("date", &SegmentSchema::date_),
-            make_column("flush_lsn", &SegmentSchema::flush_lsn_)));
-}
-
-inline auto
-CollectionPrototype(const std::string& path) {
-    return make_storage(
-        path,
-        make_table(META_ENVIRONMENT, make_column("global_lsn", &EnvironmentSchema::global_lsn_, default_value(0))),
-        make_table(META_COLLECTIONS, make_column("id", &hybrid::CollectionSchema::id_, primary_key()),
-                   make_column("collection_id", &hybrid::CollectionSchema::collection_id_, unique()),
-                   make_column("state", &hybrid::CollectionSchema::state_),
-                   make_column("field_num", &hybrid::CollectionSchema::field_num),
-                   make_column("created_on", &hybrid::CollectionSchema::created_on_),
-                   make_column("flag", &hybrid::CollectionSchema::flag_, default_value(0)),
-                   make_column("owner_collection", &hybrid::CollectionSchema::owner_collection_, default_value("")),
-                   make_column("partition_tag", &hybrid::CollectionSchema::partition_tag_, default_value("")),
-                   make_column("version", &hybrid::CollectionSchema::version_, default_value(CURRENT_VERSION)),
-                   make_column("flush_lsn", &hybrid::CollectionSchema::flush_lsn_)),
-        make_table(META_FIELDS, make_column("collection_id", &hybrid::FieldSchema::collection_id_),
-                   make_column("field_name", &hybrid::FieldSchema::field_name_),
-                   make_column("field_type", &hybrid::FieldSchema::field_type_),
-                   make_column("field_params", &hybrid::FieldSchema::field_params_)),
-        make_table(
-            META_COLLECTIONFILES,
-            make_column("id", &hybrid::CollectionFileSchema::id_, primary_key()),
-            make_column("collection_id", &hybrid::CollectionFileSchema::collection_id_),
-            make_column("segment_id", &hybrid::CollectionFileSchema::segment_id_, default_value("")),
-            make_column("file_id", &hybrid::CollectionFileSchema::file_id_),
-            make_column("file_type", &hybrid::CollectionFileSchema::file_type_),
-            make_column("file_size", &hybrid::CollectionFileSchema::file_size_, default_value(0)),
-            make_column("row_count", &hybrid::CollectionFileSchema::row_count_, default_value(0)),
-            make_column("updated_time", &hybrid::CollectionFileSchema::updated_time_),
-            make_column("created_on", &hybrid::CollectionFileSchema::created_on_),
-            make_column("date", &hybrid::CollectionFileSchema::date_),
-            make_column("flush_lsn", &hybrid::CollectionFileSchema::flush_lsn_)));
+        make_table(META_TABLEFILES, make_column("id", &SegmentSchema::id_, primary_key()),
+                   make_column("table_id", &SegmentSchema::collection_id_),
+                   make_column("segment_id", &SegmentSchema::segment_id_, default_value("")),
+                   make_column("engine_type", &SegmentSchema::engine_type_),
+                   make_column("file_id", &SegmentSchema::file_id_),
+                   make_column("file_type", &SegmentSchema::file_type_),
+                   make_column("file_size", &SegmentSchema::file_size_, default_value(0)),
+                   make_column("row_count", &SegmentSchema::row_count_, default_value(0)),
+                   make_column("updated_time", &SegmentSchema::updated_time_),
+                   make_column("created_on", &SegmentSchema::created_on_), make_column("date", &SegmentSchema::date_),
+                   make_column("flush_lsn", &SegmentSchema::flush_lsn_)));
 }
 
 using ConnectorT = decltype(StoragePrototype("table"));
 static std::unique_ptr<ConnectorT> ConnectorPtr;
 
-using CollectionConnectT = decltype(CollectionPrototype(""));
-static std::unique_ptr<CollectionConnectT> CollectionConnectPtr;
-
 SqliteMetaImpl::SqliteMetaImpl(const DBMetaOptions& options) : options_(options) {
     Initialize();
 }
@@ -177,8 +141,8 @@ SqliteMetaImpl::ValidateMetaSchema() {
         sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLES]) {
         throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
     }
-    if (ret.find(META_FIELDS) != ret.end()
-        && sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_FIELDS]) {
+    if (ret.find(META_FIELDS) != ret.end() &&
+        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_FIELDS]) {
         throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
     }
     if (ret.find(META_TABLEFILES) != ret.end() &&
@@ -187,30 +151,6 @@ SqliteMetaImpl::ValidateMetaSchema() {
     }
 }
 
-void
-SqliteMetaImpl::ValidateCollectionMetaSchema() {
-    bool is_null_connector{CollectionConnectPtr == nullptr};
-    fiu_do_on("SqliteMetaImpl.ValidateMetaSchema.NullConnection", is_null_connector = true);
-    if (is_null_connector) {
-        return;
-    }
-
-    // old meta could be recreated since schema changed, throw exception if meta schema is not compatible
-    auto ret = CollectionConnectPtr->sync_schema_simulate();
-    if (ret.find(META_COLLECTIONS) != ret.end() &&
-        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_COLLECTIONS]) {
-        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
-    }
-    if (ret.find(META_FIELDS) != ret.end()
-        && sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_FIELDS]) {
-        throw Exception(DB_INCOMPATIB_META, "Meta Tables schema is created by Milvus old version");
-    }
-    if (ret.find(META_COLLECTIONFILES) != ret.end() &&
-        sqlite_orm::sync_schema_result::dropped_and_recreated == ret[META_TABLEFILES]) {
-        throw Exception(DB_INCOMPATIB_META, "Meta TableFiles schema is created by Milvus old version");
-    }
-}
-
 Status
 SqliteMetaImpl::Initialize() {
     if (!boost::filesystem::is_directory(options_.path_)) {
@@ -231,14 +171,6 @@ SqliteMetaImpl::Initialize() {
     ConnectorPtr->open_forever();                          // thread safe option
     ConnectorPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log
 
-    CollectionConnectPtr = std::make_unique<CollectionConnectT>(CollectionPrototype(options_.path_ + "/metah.sqlite"));
-
-    ValidateCollectionMetaSchema();
-
-    CollectionConnectPtr->sync_schema();
-    CollectionConnectPtr->open_forever();
-    CollectionConnectPtr->pragma.journal_mode(journal_mode::WAL);  // WAL => write ahead log
-
     CleanUpShadowFiles();
 
     return Status::OK();
@@ -656,7 +588,7 @@ SqliteMetaImpl::UpdateCollectionFlushLSN(const std::string& collection_id, uint6
         ConnectorPtr->update_all(set(c(&CollectionSchema::flush_lsn_) = flush_lsn),
                                  where(c(&CollectionSchema::collection_id_) == collection_id));
         LOG_ENGINE_DEBUG_ << "Successfully update collection flush_lsn, collection id = " << collection_id
-                          << " flush_lsn = " << flush_lsn;;
+                          << " flush_lsn = " << flush_lsn;
     } catch (std::exception& e) {
         std::string msg = "Encounter exception when update collection lsn: collection_id = " + collection_id;
         return HandleException(msg, e.what());
@@ -1094,10 +1026,9 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil
         }
 
         // perform query
-        auto select_columns =
-            columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
-                    &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
-                    &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
+        auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
+                                      &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
+                                      &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
 
         auto match_collectionid = c(&SegmentSchema::collection_id_) == collection_id;
@@ -1309,14 +1240,10 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
         }
 
         // get files by type
-        auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_,
-                                      &SegmentSchema::file_id_,
-                                      &SegmentSchema::file_type_,
-                                      &SegmentSchema::file_size_,
-                                      &SegmentSchema::row_count_,
-                                      &SegmentSchema::date_,
-                                      &SegmentSchema::engine_type_,
-                                      &SegmentSchema::created_on_);
+        auto select_columns =
+            columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_,
+                    &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_,
+                    &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_);
         decltype(ConnectorPtr->select(select_columns)) selected;
         {
             // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here
@@ -1348,21 +1275,29 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
             file_schema.metric_type_ = collection_schema.metric_type_;
 
             switch (file_schema.file_type_) {
-                case (int)SegmentSchema::RAW:++raw_count;
+                case (int)SegmentSchema::RAW:
+                    ++raw_count;
                     break;
-                case (int)SegmentSchema::NEW:++new_count;
+                case (int)SegmentSchema::NEW:
+                    ++new_count;
                     break;
-                case (int)SegmentSchema::NEW_MERGE:++new_merge_count;
+                case (int)SegmentSchema::NEW_MERGE:
+                    ++new_merge_count;
                     break;
-                case (int)SegmentSchema::NEW_INDEX:++new_index_count;
+                case (int)SegmentSchema::NEW_INDEX:
+                    ++new_index_count;
                     break;
-                case (int)SegmentSchema::TO_INDEX:++to_index_count;
+                case (int)SegmentSchema::TO_INDEX:
+                    ++to_index_count;
                     break;
-                case (int)SegmentSchema::INDEX:++index_count;
+                case (int)SegmentSchema::INDEX:
+                    ++index_count;
                     break;
-                case (int)SegmentSchema::BACKUP:++backup_count;
+                case (int)SegmentSchema::BACKUP:
+                    ++backup_count;
+                    break;
+                default:
                     break;
-                default:break;
             }
 
             auto status = utils::GetCollectionFilePath(options_, file_schema);
@@ -1376,25 +1311,29 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id,
         std::string msg = "Get collection files by type.";
         for (int file_type : file_types) {
             switch (file_type) {
-                case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count);
+                case (int)SegmentSchema::RAW:
+                    msg = msg + " raw files:" + std::to_string(raw_count);
                     break;
-                case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count);
+                case (int)SegmentSchema::NEW:
+                    msg = msg + " new files:" + std::to_string(new_count);
                     break;
                 case (int)SegmentSchema::NEW_MERGE:
-                    msg = msg + " new_merge files:"
-                        + std::to_string(new_merge_count);
+                    msg = msg + " new_merge files:" + std::to_string(new_merge_count);
                     break;
                 case (int)SegmentSchema::NEW_INDEX:
-                    msg = msg + " new_index files:"
-                        + std::to_string(new_index_count);
+                    msg = msg + " new_index files:" + std::to_string(new_index_count);
                     break;
-                case (int)SegmentSchema::TO_INDEX:msg = msg + " to_index files:" + std::to_string(to_index_count);
+                case (int)SegmentSchema::TO_INDEX:
+                    msg = msg + " to_index files:" + std::to_string(to_index_count);
                     break;
-                case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count);
+                case (int)SegmentSchema::INDEX:
+                    msg = msg + " index files:" + std::to_string(index_count);
                     break;
-                case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count);
+                case (int)SegmentSchema::BACKUP:
+                    msg = msg + " backup files:" + std::to_string(backup_count);
+                    break;
+                default:
                     break;
-                default:break;
             }
         }
         LOG_ENGINE_DEBUG_ << msg;
@@ -1416,10 +1355,9 @@ SqliteMetaImpl::FilesByID(const std::vector<size_t>& ids, FilesHolder& files_hol
         server::MetricCollector metric;
         fiu_do_on("SqliteMetaImpl.FilesByID.throw_exception", throw std::exception());
 
-        auto select_columns =
-            columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
-                    &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
-                    &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
+        auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
+                                      &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_,
+                                      &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_);
 
         std::vector<int> file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX,
                                        (int)SegmentSchema::INDEX};
@@ -1613,12 +1551,12 @@ SqliteMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/
         std::lock_guard<std::mutex> meta_lock(meta_mutex_);
 
         // collect files to be deleted
-        auto files = ConnectorPtr->select(
-            columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_,
-                    &SegmentSchema::engine_type_, &SegmentSchema::file_id_, &SegmentSchema::file_type_,
-                    &SegmentSchema::date_),
-            where(in(&SegmentSchema::file_type_, file_types) and
-                  c(&SegmentSchema::updated_time_) < now - seconds * US_PS));
+        auto files =
+            ConnectorPtr->select(columns(&SegmentSchema::id_, &SegmentSchema::collection_id_,
+                                         &SegmentSchema::segment_id_, &SegmentSchema::engine_type_,
+                                         &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::date_),
+                                 where(in(&SegmentSchema::file_type_, file_types) and
+                                       c(&SegmentSchema::updated_time_) < now - seconds * US_PS));
 
         int64_t clean_files = 0;
         auto commited = ConnectorPtr->transaction([&]() mutable {
@@ -1828,10 +1766,9 @@ SqliteMetaImpl::DiscardFiles(int64_t to_discard_size) {
         std::lock_guard<std::mutex> meta_lock(meta_mutex_);
 
         auto commited = ConnectorPtr->transaction([&]() mutable {
-            auto selected =
-                ConnectorPtr->select(columns(&SegmentSchema::id_, &SegmentSchema::file_size_),
-                                     where(c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE),
-                                     order_by(&SegmentSchema::id_), limit(10));
+            auto selected = ConnectorPtr->select(columns(&SegmentSchema::id_, &SegmentSchema::file_size_),
+                                                 where(c(&SegmentSchema::file_type_) != (int)SegmentSchema::TO_DELETE),
+                                                 order_by(&SegmentSchema::id_), limit(10));
 
             std::vector<int> ids;
             SegmentSchema collection_file;
@@ -1928,9 +1865,9 @@ SqliteMetaImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema
             NextCollectionId(collection_schema.collection_id_);
         } else {
             fiu_do_on("SqliteMetaImpl.CreateCollection.throw_exception", throw std::exception());
-            auto collection = ConnectorPtr->select(columns(&CollectionSchema::state_),
-                                                   where(c(&CollectionSchema::collection_id_)
-                                                       == collection_schema.collection_id_));
+            auto collection =
+                ConnectorPtr->select(columns(&CollectionSchema::state_),
+                                     where(c(&CollectionSchema::collection_id_) == collection_schema.collection_id_));
             if (collection.size() == 1) {
                 if (CollectionSchema::TO_DELETE == std::get<0>(collection[0])) {
                     return Status(DB_ERROR,
@@ -1961,8 +1898,7 @@ SqliteMetaImpl::CreateHybridCollection(meta::CollectionSchema& collection_schema
     }
 
     try {
-        for (uint64_t i = 0; i < fields_schema.fields_schema_.size(); ++i) {
-            hybrid::FieldSchema schema = fields_schema.fields_schema_[i];
+        for (auto schema : fields_schema.fields_schema_) {
             auto field_id = ConnectorPtr->insert(schema);
             LOG_ENGINE_DEBUG_ << "Successfully create collection field" << field_id;
         }
@@ -2017,12 +1953,10 @@ SqliteMetaImpl::DescribeHybridCollection(milvus::engine::meta::CollectionSchema&
             return Status(DB_NOT_FOUND, "Collection " + collection_schema.collection_id_ + " not found");
         }
 
-        auto field_groups = ConnectorPtr->select(
-            columns(&hybrid::FieldSchema::collection_id_,
-                    &hybrid::FieldSchema::field_name_,
-                    &hybrid::FieldSchema::field_type_,
-                    &hybrid::FieldSchema::field_params_),
-            where(c(&hybrid::FieldSchema::collection_id_) == collection_schema.collection_id_));
+        auto field_groups =
+            ConnectorPtr->select(columns(&hybrid::FieldSchema::collection_id_, &hybrid::FieldSchema::field_name_,
+                                         &hybrid::FieldSchema::field_type_, &hybrid::FieldSchema::field_params_),
+                                 where(c(&hybrid::FieldSchema::collection_id_) == collection_schema.collection_id_));
 
         if (field_groups.size() >= 1) {
             fields_schema.fields_schema_.resize(field_groups.size());
diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h
index 6ce4af8f92..67a0e688b0 100644
--- a/core/src/db/meta/SqliteMetaImpl.h
+++ b/core/src/db/meta/SqliteMetaImpl.h
@@ -164,8 +164,6 @@ class SqliteMetaImpl : public Meta {
     void
     ValidateMetaSchema();
 
-    void
-    ValidateCollectionMetaSchema();
 
     Status
     Initialize();
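Note: the sync_schema_simulate() guard used in ValidateMetaSchema can be exercised in isolation. A minimal sqlite_orm storage with the same pattern (table and column names invented):

```cpp
#include <sqlite_orm/sqlite_orm.h>

#include <stdexcept>
#include <string>

using namespace sqlite_orm;

// Simulate a schema sync and refuse to run if sqlite_orm would drop and
// recreate a table, which is what happens when the on-disk meta was written
// by an incompatible version.
struct FieldRow {
    std::string collection_id;
    std::string field_name;
    int field_type;
};

inline auto
MakeStorage(const std::string& path) {
    return make_storage(path, make_table("fields", make_column("collection_id", &FieldRow::collection_id),
                                         make_column("field_name", &FieldRow::field_name),
                                         make_column("field_type", &FieldRow::field_type)));
}

int main() {
    auto storage = MakeStorage("meta_demo.sqlite");
    auto ret = storage.sync_schema_simulate();
    auto it = ret.find("fields");
    if (it != ret.end() && it->second == sync_schema_result::dropped_and_recreated) {
        throw std::runtime_error("meta schema is incompatible");
    }
    storage.sync_schema();  // safe to apply
}
```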
diff --git a/core/src/db/wal/WalBuffer.cpp b/core/src/db/wal/WalBuffer.cpp
index 258848b25c..9ec1335e8b 100644
--- a/core/src/db/wal/WalBuffer.cpp
+++ b/core/src/db/wal/WalBuffer.cpp
@@ -12,6 +12,8 @@
 #include "db/wal/WalBuffer.h"
 
 #include <cstring>
+#include <utility>
+#include <vector>
 
 #include "db/wal/WalDefinations.h"
 #include "utils/Log.h"
@@ -189,6 +191,28 @@ MXLogBuffer::RecordSize(const MXLogRecord& record) {
            record.length * (uint32_t)sizeof(IDNumber) + record.data_size;
 }
 
+uint32_t
+MXLogBuffer::EntityRecordSize(const milvus::engine::wal::MXLogRecord& record, uint32_t attr_num,
+                              std::vector<uint32_t>& field_name_size) {
+    uint32_t attr_header_size = 0;
+    attr_header_size += sizeof(uint32_t);
+    attr_header_size += attr_num * sizeof(uint64_t) * 3;
+
+    uint32_t name_sizes = 0;
+    for (auto field_name : record.field_names) {
+        field_name_size.emplace_back(field_name.size());
+        name_sizes += field_name.size();
+    }
+
+    uint64_t attr_size = 0;
+    auto attr_it = record.attr_data_size.begin();
+    for (; attr_it != record.attr_data_size.end(); attr_it++) {
+        attr_size += attr_it->second;
+    }
+
+    return RecordSize(record) + name_sizes + attr_size + attr_header_size;
+}
+
 ErrorCode
 MXLogBuffer::Append(MXLogRecord& record) {
     uint32_t record_size = RecordSize(record);
@@ -257,6 +281,116 @@ MXLogBuffer::Append(MXLogRecord& record) {
     return WAL_SUCCESS;
 }
 
+ErrorCode
+MXLogBuffer::AppendEntity(milvus::engine::wal::MXLogRecord& record) {
+    std::vector<uint32_t> field_name_size;
+    MXLogAttrRecordHeader attr_header;
+    attr_header.attr_num = 0;
+    for (auto name : record.field_names) {
+        attr_header.attr_num++;
+        attr_header.field_name_size.emplace_back(name.size());
+        attr_header.attr_size.emplace_back(record.attr_data_size.at(name));
+        attr_header.attr_nbytes.emplace_back(record.attr_nbytes.at(name));
+    }
+
+    uint32_t record_size = EntityRecordSize(record, attr_header.attr_num, field_name_size);
+    if (SurplusSpace() < record_size) {
+        // writer buffer has no space, switch wal file and write to a new buffer
+        std::unique_lock<std::mutex> lck(mutex_);
+        if (mxlog_buffer_writer_.buf_idx == mxlog_buffer_reader_.buf_idx) {
+            // switch writer buffer
+            mxlog_buffer_reader_.max_offset = mxlog_buffer_writer_.buf_offset;
+            mxlog_buffer_writer_.buf_idx ^= 1;
+        }
+        mxlog_buffer_writer_.file_no++;
+        mxlog_buffer_writer_.buf_offset = 0;
+        lck.unlock();
+
+        // Reborn means close old wal file and open new wal file
+        if (!mxlog_writer_.ReBorn(ToFileName(mxlog_buffer_writer_.file_no), "w")) {
+            LOG_WAL_ERROR_ << "ReBorn wal file error " << mxlog_buffer_writer_.file_no;
+            return WAL_FILE_ERROR;
+        }
+    }
+
+    // point to the offset of current record in wal file
+    char* current_write_buf = buf_[mxlog_buffer_writer_.buf_idx].get();
+    uint32_t current_write_offset = mxlog_buffer_writer_.buf_offset;
+
+    MXLogRecordHeader head;
+    BuildLsn(mxlog_buffer_writer_.file_no, mxlog_buffer_writer_.buf_offset + (uint32_t)record_size, head.mxl_lsn);
+    head.mxl_type = (uint8_t)record.type;
+    head.table_id_size = (uint16_t)record.collection_id.size();
+    head.partition_tag_size = (uint16_t)record.partition_tag.size();
+    head.vector_num = record.length;
+    head.data_size = record.data_size;
+
+    memcpy(current_write_buf + current_write_offset, &head, SizeOfMXLogRecordHeader);
+    current_write_offset += SizeOfMXLogRecordHeader;
+
+    memcpy(current_write_buf + current_write_offset, &attr_header.attr_num, sizeof(uint32_t));
+    current_write_offset += sizeof(uint32_t);
+
+    memcpy(current_write_buf + current_write_offset, attr_header.field_name_size.data(),
+           sizeof(uint64_t) * attr_header.attr_num);
+    current_write_offset += sizeof(uint64_t) * attr_header.attr_num;
+
+    memcpy(current_write_buf + current_write_offset, attr_header.attr_size.data(),
+           sizeof(uint64_t) * attr_header.attr_num);
+    current_write_offset += sizeof(uint64_t) * attr_header.attr_num;
+
+    memcpy(current_write_buf + current_write_offset, attr_header.attr_nbytes.data(),
+           sizeof(uint64_t) * attr_header.attr_num);
+    current_write_offset += sizeof(uint64_t) * attr_header.attr_num;
+
+    if (!record.collection_id.empty()) {
+        memcpy(current_write_buf + current_write_offset, record.collection_id.data(), record.collection_id.size());
+        current_write_offset += record.collection_id.size();
+    }
+
+    if (!record.partition_tag.empty()) {
+        memcpy(current_write_buf + current_write_offset, record.partition_tag.data(), record.partition_tag.size());
+        current_write_offset += record.partition_tag.size();
+    }
+    if (record.ids != nullptr && record.length > 0) {
+        memcpy(current_write_buf + current_write_offset, record.ids, record.length * sizeof(IDNumber));
+        current_write_offset += record.length * sizeof(IDNumber);
+    }
+
+    if (record.data != nullptr && record.data_size > 0) {
+        memcpy(current_write_buf + current_write_offset, record.data, record.data_size);
+        current_write_offset += record.data_size;
+    }
+
+    // Assign attr names
+    for (auto name : record.field_names) {
+        if (name.size() > 0) {
+            memcpy(current_write_buf + current_write_offset, name.data(), name.size());
+            current_write_offset += name.size();
+        }
+    }
+
+    // Assign attr values
+    for (auto name : record.field_names) {
+        if (record.attr_data_size.at(name) != 0) {
+            memcpy(current_write_buf + current_write_offset, record.attr_data.at(name).data(),
+                   record.attr_data_size.at(name));
+            current_write_offset += record.attr_data_size.at(name);
+        }
+    }
+
+    bool write_rst = mxlog_writer_.Write(current_write_buf + mxlog_buffer_writer_.buf_offset, record_size);
+    if (!write_rst) {
+        LOG_WAL_ERROR_ << "write wal file error";
+        return WAL_FILE_ERROR;
+    }
+
+    mxlog_buffer_writer_.buf_offset = current_write_offset;
+
+    record.lsn = head.mxl_lsn;
+    return WAL_SUCCESS;
+}
+
 ErrorCode
 MXLogBuffer::Next(const uint64_t last_applied_lsn, MXLogRecord& record) {
     // init output
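Note: reconstructed from the writes in AppendEntity, a serialized entity record is one contiguous region in the order sketched below; the helper recomputes the total the same way EntityRecordSize does (inputs are samples, not a normative spec):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Write order inside one entity record:
//   MXLogRecordHeader                       (fixed size)
//   uint32_t attr_num
//   uint64_t field_name_size[attr_num]
//   uint64_t attr_size[attr_num]
//   uint64_t attr_nbytes[attr_num]
//   char     collection_id[table_id_size]
//   char     partition_tag[partition_tag_size]
//   IDNumber ids[vector_num]
//   byte     data[data_size]
//   char     field names (concatenated, no separators)
//   byte     attr values (concatenated per field, in field_names order)
uint32_t
SketchEntityRecordSize(uint32_t base_record_size, const std::vector<std::string>& field_names,
                       const std::unordered_map<std::string, uint64_t>& attr_data_size) {
    uint32_t attr_num = static_cast<uint32_t>(field_names.size());
    uint32_t attr_header_size = sizeof(uint32_t) + attr_num * sizeof(uint64_t) * 3;

    uint32_t name_sizes = 0;
    for (const auto& name : field_names) {
        name_sizes += static_cast<uint32_t>(name.size());
    }

    uint64_t attr_size = 0;
    for (const auto& kv : attr_data_size) {
        attr_size += kv.second;
    }
    return base_record_size + name_sizes + static_cast<uint32_t>(attr_size) + attr_header_size;
}
```

Field names carry no length prefix of their own; the reader can only split them using the field_name_size array in the attr header, which is why NextEntity below restores them in the same order they were written.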
@@ -337,6 +471,138 @@ MXLogBuffer::Next(const uint64_t last_applied_lsn, MXLogRecord& record) {
     return WAL_SUCCESS;
 }
 
+ErrorCode
+MXLogBuffer::NextEntity(const uint64_t last_applied_lsn, milvus::engine::wal::MXLogRecord& record) {
+    // init output
+    record.type = MXLogType::None;
+
+    // reader caught up to writer, no next record to read
+    if (GetReadLsn() >= last_applied_lsn) {
+        return WAL_SUCCESS;
+    }
+
+    // otherwise, there must exist a next record, in buffer or wal log
+    bool need_load_new = false;
+    std::unique_lock<std::mutex> lck(mutex_);
+    if (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no) {
+        if (mxlog_buffer_reader_.buf_offset == mxlog_buffer_reader_.max_offset) {  // last record
+            mxlog_buffer_reader_.file_no++;
+            mxlog_buffer_reader_.buf_offset = 0;
+            need_load_new = (mxlog_buffer_reader_.file_no != mxlog_buffer_writer_.file_no);
+            if (!need_load_new) {
+                // read reach write buffer
+                mxlog_buffer_reader_.buf_idx = mxlog_buffer_writer_.buf_idx;
+            }
+        }
+    }
+    lck.unlock();
+
+    if (need_load_new) {
+        MXLogFileHandler mxlog_reader(mxlog_writer_.GetFilePath());
+        mxlog_reader.SetFileName(ToFileName(mxlog_buffer_reader_.file_no));
+        mxlog_reader.SetFileOpenMode("r");
+        uint32_t file_size = mxlog_reader.Load(buf_[mxlog_buffer_reader_.buf_idx].get(), 0);
+        if (file_size == 0) {
+            LOG_WAL_ERROR_ << "load wal file error " << mxlog_buffer_reader_.file_no;
+            return WAL_FILE_ERROR;
+        }
+        mxlog_buffer_reader_.max_offset = file_size;
+    }
+
+    char* current_read_buf = buf_[mxlog_buffer_reader_.buf_idx].get();
+    uint64_t current_read_offset = mxlog_buffer_reader_.buf_offset;
+
+    MXLogRecordHeader* head = (MXLogRecordHeader*)(current_read_buf + current_read_offset);
+
+    record.type = (MXLogType)head->mxl_type;
+    record.lsn = head->mxl_lsn;
+    record.length = head->vector_num;
+    record.data_size = head->data_size;
+
+    current_read_offset += SizeOfMXLogRecordHeader;
+
+    MXLogAttrRecordHeader attr_head;
+
+    memcpy(&attr_head.attr_num, current_read_buf + current_read_offset, sizeof(uint32_t));
+    current_read_offset += sizeof(uint32_t);
+
+    attr_head.attr_size.resize(attr_head.attr_num);
+    attr_head.field_name_size.resize(attr_head.attr_num);
+    attr_head.attr_nbytes.resize(attr_head.attr_num);
+    memcpy(attr_head.field_name_size.data(), current_read_buf + current_read_offset,
+           sizeof(uint64_t) * attr_head.attr_num);
+    current_read_offset += sizeof(uint64_t) * attr_head.attr_num;
+
+    memcpy(attr_head.attr_size.data(), current_read_buf + current_read_offset, sizeof(uint64_t) * attr_head.attr_num);
+    current_read_offset += sizeof(uint64_t) * attr_head.attr_num;
+
+    memcpy(attr_head.attr_nbytes.data(), current_read_buf + current_read_offset,
+           sizeof(uint64_t) * attr_head.attr_num);
+    current_read_offset += sizeof(uint64_t) * attr_head.attr_num;
+
+    if (head->table_id_size != 0) {
+        record.collection_id.assign(current_read_buf + current_read_offset, head->table_id_size);
+        current_read_offset += head->table_id_size;
+    } else {
+        record.collection_id = "";
+    }
+
+    if (head->partition_tag_size != 0) {
+        record.partition_tag.assign(current_read_buf + current_read_offset, head->partition_tag_size);
+        current_read_offset += head->partition_tag_size;
+    } else {
+        record.partition_tag = "";
+    }
+
+    if (head->vector_num != 0) {
+        record.ids = (IDNumber*)(current_read_buf + current_read_offset);
+        current_read_offset += head->vector_num * sizeof(IDNumber);
+    } else {
+        record.ids = nullptr;
+    }
+
+    if (record.data_size != 0) {
+        record.data = current_read_buf + current_read_offset;
+        current_read_offset += record.data_size;
+    } else {
+        record.data = nullptr;
+    }
+
+    // Read field names
+    auto attr_num = attr_head.attr_num;
+    record.field_names.clear();
+    if (attr_num > 0) {
+        for (auto size : attr_head.field_name_size) {
+            if (size != 0) {
+                std::string name;
+                name.assign(current_read_buf + current_read_offset, size);
+                record.field_names.emplace_back(name);
+                current_read_offset += size;
+            } else {
+                record.field_names.emplace_back("");
+            }
+        }
+    }
+
+    // Read attributes data
+    record.attr_data.clear();
+    record.attr_data_size.clear();
+    record.attr_nbytes.clear();
+    if (attr_num > 0) {
+        for (uint64_t i = 0; i < attr_num; ++i) {
+            auto attr_size = attr_head.attr_size[i];
+            record.attr_data_size.insert(std::make_pair(record.field_names[i], attr_size));
+            record.attr_nbytes.insert(std::make_pair(record.field_names[i], attr_head.attr_nbytes[i]));
+            std::vector<uint8_t> data(attr_size);
+            memcpy(data.data(), current_read_buf + current_read_offset, attr_size);
+            record.attr_data.insert(std::make_pair(record.field_names[i], data));
+            current_read_offset += attr_size;
+        }
+    }
+
+    mxlog_buffer_reader_.buf_offset = uint32_t(head->mxl_lsn & LSN_OFFSET_MASK);
+    return WAL_SUCCESS;
+}
+
 uint64_t
 MXLogBuffer::GetReadLsn() {
     uint64_t read_lsn;
diff --git a/core/src/db/wal/WalBuffer.h b/core/src/db/wal/WalBuffer.h
index 4834bce7f9..fee819fa65 100644
--- a/core/src/db/wal/WalBuffer.h
+++ b/core/src/db/wal/WalBuffer.h
@@ -15,6 +15,7 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <vector>
 
 #include "WalDefinations.h"
 #include "WalFileHandler.h"
@@ -39,6 +40,13 @@ struct MXLogRecordHeader {
 
 const uint32_t SizeOfMXLogRecordHeader = sizeof(MXLogRecordHeader);
 
+struct MXLogAttrRecordHeader {
+    uint32_t attr_num;
+    std::vector<uint64_t> field_name_size;
+    std::vector<uint64_t> attr_size;
+    std::vector<uint64_t> attr_nbytes;
+};
+
 #pragma pack(pop)
 
 struct MXLogBufferHandler {
@@ -66,9 +74,15 @@ class MXLogBuffer {
     ErrorCode
     Append(MXLogRecord& record);
 
+    ErrorCode
+    AppendEntity(MXLogRecord& record);
+
     ErrorCode
     Next(const uint64_t last_applied_lsn, MXLogRecord& record);
 
+    ErrorCode
+    NextEntity(const uint64_t last_applied_lsn, MXLogRecord& record);
+
     uint64_t
     GetReadLsn();
 
@@ -91,6 +105,10 @@ class MXLogBuffer {
     uint32_t
     RecordSize(const MXLogRecord& record);
 
+    uint32_t
+    EntityRecordSize(const milvus::engine::wal::MXLogRecord& record, uint32_t attr_num,
+                     std::vector<uint32_t>& field_name_size);
+
  private:
     uint32_t mxlog_buffer_size_;  // from config
     BufferPtr buf_[2];
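Note: a buffer-level round trip is the natural test for AppendEntity/NextEntity. The sketch below goes through the MXLogBuffer interface declared in WalBuffer.h; the constructor and Init signatures, the wal path, the buffer size, and all data are assumptions for illustration — a real test would live next to the existing fixtures in core/unittest/db:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

#include "db/wal/WalBuffer.h"

void
EntityRoundTrip() {
    // Assumed ctor: (mxlog_path, buffer_size); Init takes (read_lsn, write_lsn).
    milvus::engine::wal::MXLogBuffer buffer("/tmp/wal_demo/", 2 * 1024 * 1024);
    buffer.Init(0, 0);

    std::vector<milvus::engine::IDNumber> ids = {1, 2, 3};
    std::vector<float> vectors(3 * 4, 0.5f);                 // 3 entities, dim 4
    std::vector<uint8_t> age_data(3 * sizeof(int64_t), 0);   // one INT64 attribute

    milvus::engine::wal::MXLogRecord record;
    record.type = milvus::engine::wal::MXLogType::Entity;
    record.collection_id = "demo";
    record.partition_tag = "";
    record.length = 3;
    record.ids = ids.data();
    record.data_size = vectors.size() * sizeof(float);
    record.data = vectors.data();
    record.field_names = {"age"};
    record.attr_nbytes = {{"age", sizeof(int64_t)}};
    record.attr_data_size = {{"age", age_data.size()}};
    record.attr_data = {{"age", age_data}};

    assert(buffer.AppendEntity(record) == milvus::WAL_SUCCESS);  // sets record.lsn

    milvus::engine::wal::MXLogRecord read_record;
    assert(buffer.NextEntity(record.lsn, read_record) == milvus::WAL_SUCCESS);
    assert(read_record.field_names.size() == 1);
    assert(read_record.attr_data.at("age") == age_data);
}
```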
diff --git a/core/src/db/wal/WalDefinations.h b/core/src/db/wal/WalDefinations.h
index b93205faef..6491a02f61 100644
--- a/core/src/db/wal/WalDefinations.h
+++ b/core/src/db/wal/WalDefinations.h
@@ -40,6 +40,9 @@ struct MXLogRecord {
     const IDNumber* ids;
     uint32_t data_size;
     const void* data;
+    std::vector<std::string> field_names;
+    //    std::vector<uint64_t> attrs_size;
+    //    std::vector<const void*> attrs_data;
     std::unordered_map<std::string, uint64_t> attr_nbytes;
     std::unordered_map<std::string, uint64_t> attr_data_size;
     std::unordered_map<std::string, std::vector<uint8_t>> attr_data;
diff --git a/core/src/db/wal/WalManager.cpp b/core/src/db/wal/WalManager.cpp
index 574096bfed..246e114af1 100644
--- a/core/src/db/wal/WalManager.cpp
+++ b/core/src/db/wal/WalManager.cpp
@@ -15,6 +15,7 @@
 #include <algorithm>
 #include <memory>
+#include <utility>
 
 #include "config/Config.h"
 #include "utils/CommonUtil.h"
@@ -157,6 +158,44 @@ WalManager::GetNextRecovery(MXLogRecord& record) {
     return error_code;
 }
 
+ErrorCode
+WalManager::GetNextEntityRecovery(milvus::engine::wal::MXLogRecord& record) {
+    ErrorCode error_code = WAL_SUCCESS;
+    while (true) {
+        error_code = p_buffer_->NextEntity(last_applied_lsn_, record);
+        if (error_code != WAL_SUCCESS) {
+            if (mxlog_config_.recovery_error_ignore) {
+                // reset and break recovery
+                p_buffer_->Reset(last_applied_lsn_);
+
+                record.type = MXLogType::None;
+                error_code = WAL_SUCCESS;
+            }
+            break;
+        }
+        if (record.type == MXLogType::None) {
+            break;
+        }
+
+        // background thread has not started.
+        // so, needn't lock here.
+        auto it = tables_.find(record.collection_id);
+        if (it != tables_.end()) {
+            if (it->second.flush_lsn < record.lsn) {
+                break;
+            }
+        }
+    }
+
+    // print the log only when record.type != MXLogType::None
+    if (record.type != MXLogType::None) {
+        LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " record lsn " << record.lsn << " error code "
+                      << error_code;
+    }
+
+    return error_code;
+}
+
 ErrorCode
 WalManager::GetNextRecord(MXLogRecord& record) {
     auto check_flush = [&]() -> bool {
@@ -203,6 +242,52 @@ WalManager::GetNextRecord(MXLogRecord& record) {
     return error_code;
 }
 
+ErrorCode
+WalManager::GetNextEntityRecord(milvus::engine::wal::MXLogRecord& record) {
+    auto check_flush = [&]() -> bool {
+        std::lock_guard<std::mutex> lck(mutex_);
+        if (flush_info_.IsValid()) {
+            if (p_buffer_->GetReadLsn() >= flush_info_.lsn_) {
+                // can exec flush requirement
+                record.type = MXLogType::Flush;
+                record.collection_id = flush_info_.collection_id_;
+                record.lsn = flush_info_.lsn_;
+                flush_info_.Clear();
+
+                LOG_WAL_INFO_ << "record flush collection " << record.collection_id << " lsn " << record.lsn;
+                return true;
+            }
+        }
+        return false;
+    };
+
+    if (check_flush()) {
+        return WAL_SUCCESS;
+    }
+
+    ErrorCode error_code = WAL_SUCCESS;
+    while (WAL_SUCCESS == p_buffer_->NextEntity(last_applied_lsn_, record)) {
+        if (record.type == MXLogType::None) {
+            if (check_flush()) {
+                return WAL_SUCCESS;
+            }
+            break;
+        }
+
+        std::lock_guard<std::mutex> lck(mutex_);
+        auto it = tables_.find(record.collection_id);
+        if (it != tables_.end()) {
+            if (it->second.flush_lsn < record.lsn) {
+                break;
+            }
+        }
+    }
+
+    LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn "
+                  << record.lsn;
+    return error_code;
+}
+
 uint64_t
 WalManager::CreateCollection(const std::string& collection_id) {
     LOG_WAL_INFO_ << "create collection " << collection_id << " " << last_applied_lsn_;
@@ -212,6 +297,15 @@ WalManager::CreateCollection(const std::string& collection_id) {
     return applied_lsn;
 }
 
+uint64_t
+WalManager::CreateHybridCollection(const std::string& collection_id) {
+    LOG_WAL_INFO_ << "create hybrid collection " << collection_id << " " << last_applied_lsn_;
+    std::lock_guard<std::mutex> lck(mutex_);
+    uint64_t applied_lsn = last_applied_lsn_;
+    tables_[collection_id] = {applied_lsn, applied_lsn};
+    return applied_lsn;
+}
+
 void
 WalManager::DropCollection(const std::string& collection_id) {
     LOG_WAL_INFO_ << "drop collection " << collection_id;
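Note: both recovery paths above skip records whose LSN is at or below the collection's flush_lsn in tables_, and CreateHybridCollection seeds both LSNs to the current applied LSN. A toy model of that bookkeeping (type and function names invented):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>

// Per-collection LSN pair kept in WalManager::tables_.
// flush_lsn: everything at or below this was already flushed to storage.
// wal_lsn:   the newest record written to the wal for this collection.
struct TableLsn {
    uint64_t flush_lsn;
    uint64_t wal_lsn;
};

// True if a recovered record still needs to be replayed.
bool
NeedsReplay(const std::unordered_map<std::string, TableLsn>& tables, const std::string& collection_id, uint64_t lsn) {
    auto it = tables.find(collection_id);
    if (it == tables.end()) {
        return false;  // unknown collection: nothing to replay
    }
    return lsn > it->second.flush_lsn;  // matches `flush_lsn < record.lsn` above
}
```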
{applied_lsn, applied_lsn}; + return applied_lsn; +} + void WalManager::DropCollection(const std::string& collection_id) { LOG_WAL_INFO_ << "drop collection " << collection_id; @@ -300,6 +394,98 @@ WalManager::Insert(const std::string& collection_id, const std::string& partitio return p_meta_handler_->SetMXLogInternalMeta(new_lsn); } +template +bool +WalManager::InsertEntities(const std::string& collection_id, const std::string& partition_tag, + const milvus::engine::IDNumbers& entity_ids, const std::vector& vectors, + const std::unordered_map& attr_nbytes, + const std::unordered_map>& attrs) { + MXLogType log_type; + if (std::is_same::value) { + log_type = MXLogType::Entity; + } else { + return false; + } + + size_t entity_num = entity_ids.size(); + if (entity_num == 0) { + LOG_WAL_ERROR_ << LogOut("[%s][%ld] The ids is empty.", "insert", 0); + return false; + } + size_t dim = vectors.size() / entity_num; + + MXLogRecord record; + + size_t attr_unit_size = 0; + auto attr_it = attr_nbytes.begin(); + for (; attr_it != attr_nbytes.end(); attr_it++) { + record.field_names.emplace_back(attr_it->first); + attr_unit_size += attr_it->second; + } + + size_t unit_size = dim * sizeof(T) + sizeof(IDNumber) + attr_unit_size; + size_t head_size = SizeOfMXLogRecordHeader + collection_id.length() + partition_tag.length(); + + // TODO(yukun): field_name put into MXLogRecord??? + + record.type = log_type; + record.collection_id = collection_id; + record.partition_tag = partition_tag; + record.attr_nbytes = attr_nbytes; + + uint64_t new_lsn = 0; + for (size_t i = 0; i < entity_num; i += record.length) { + size_t surplus_space = p_buffer_->SurplusSpace(); + size_t max_rcd_num = 0; + if (surplus_space >= head_size + unit_size) { + max_rcd_num = (surplus_space - head_size) / unit_size; + } else { + max_rcd_num = (mxlog_config_.buffer_size - head_size) / unit_size; + } + if (max_rcd_num == 0) { + LOG_WAL_ERROR_ << LogOut("[%s][%ld]", "insert", 0) << "Wal buffer size is too small " + << mxlog_config_.buffer_size << " unit " << unit_size; + return false; + } + + size_t length = std::min(entity_num - i, max_rcd_num); + record.length = length; + record.ids = entity_ids.data() + i; + record.data_size = record.length * dim * sizeof(T); + record.data = vectors.data() + i * dim; + + record.attr_data.clear(); + record.attr_data_size.clear(); + for (auto field_name : record.field_names) { + size_t attr_size = length * attr_nbytes.at(field_name); + record.attr_data_size.insert(std::make_pair(field_name, attr_size)); + std::vector attr_data(attr_size, 0); + memcpy(attr_data.data(), attrs.at(field_name).data() + i * attr_nbytes.at(field_name), attr_size); + record.attr_data.insert(std::make_pair(field_name, attr_data)); + } + + auto error_code = p_buffer_->AppendEntity(record); + if (error_code != WAL_SUCCESS) { + p_buffer_->ResetWriteLsn(last_applied_lsn_); + return false; + } + new_lsn = record.lsn; + } + + std::unique_lock lck(mutex_); + last_applied_lsn_ = new_lsn; + auto it = tables_.find(collection_id); + if (it != tables_.end()) { + it->second.wal_lsn = new_lsn; + } + lck.unlock(); + + LOG_WAL_INFO_ << LogOut("[%s][%ld]", "insert", 0) << collection_id << " insert in part " << partition_tag + << " with lsn " << new_lsn; + + return p_meta_handler_->SetMXLogInternalMeta(new_lsn); +} + bool WalManager::DeleteById(const std::string& collection_id, const IDNumbers& vector_ids) { size_t vector_num = vector_ids.size(); @@ -404,6 +590,18 @@ template bool WalManager::Insert(const std::string& collection_id, const std::string& 
diff --git a/core/src/db/wal/WalManager.h b/core/src/db/wal/WalManager.h
index 220caa7b45..a8be297b04 100644
--- a/core/src/db/wal/WalManager.h
+++ b/core/src/db/wal/WalManager.h
@@ -14,6 +14,7 @@
 #include
 #include
 #include
+#include <unordered_map>
 #include
 #include

@@ -48,6 +49,9 @@ class WalManager {
     ErrorCode
     GetNextRecovery(MXLogRecord& record);

+    ErrorCode
+    GetNextEntityRecovery(MXLogRecord& record);
+
     /*
      * Get next record
      * @param record[out]: record
@@ -56,6 +60,8 @@ class WalManager {
     ErrorCode
     GetNextRecord(MXLogRecord& record);

+    ErrorCode
+    GetNextEntityRecord(MXLogRecord& record);
     /*
      * Create collection
      * @param collection_id: collection id
@@ -64,6 +70,14 @@ class WalManager {
     uint64_t
     CreateCollection(const std::string& collection_id);

+    /*
+     * Create hybrid collection
+     * @param collection_id: collection id
+     * @retval lsn
+     */
+    uint64_t
+    CreateHybridCollection(const std::string& collection_id);
+
     /*
      * Drop collection
      * @param collection_id: collection id
@@ -92,6 +106,21 @@ class WalManager {
     Insert(const std::string& collection_id, const std::string& partition_tag, const IDNumbers& vector_ids,
            const std::vector<T>& vectors);

+    /*
+     * Insert entities
+     * @param collection_id: collection id
+     * @param partition_tag: partition tag
+     * @param entity_ids: entity ids
+     * @param vectors: vectors
+     * @param attrs: attribute data of each field
+     */
+    template <typename T>
+    bool
+    InsertEntities(const std::string& collection_id, const std::string& partition_tag,
+                   const milvus::engine::IDNumbers& entity_ids, const std::vector<T>& vectors,
+                   const std::unordered_map<std::string, uint64_t>& attr_nbytes,
+                   const std::unordered_map<std::string, std::vector<uint8_t>>& attrs);
+
     /*
      * Insert
      * @param collection_id: collection id
diff --git a/core/unittest/db/test_hybrid_db.cpp b/core/unittest/db/test_hybrid_db.cpp
index a248e41b4a..eb62db0067 100644
--- a/core/unittest/db/test_hybrid_db.cpp
+++ b/core/unittest/db/test_hybrid_db.cpp
@@ -88,26 +88,26 @@ BuildEntity(uint64_t n, uint64_t batch_index, milvus::engine::Entity& entity) {
         vectors.id_array_.push_back(n * batch_index + i);
     }
     entity.vector_data_.insert(std::make_pair("field_3", vectors));
-    std::vector<int32_t> value_0;
+    std::vector<int64_t> value_0;
     std::vector<int64_t> value_1;
-    std::vector<float> value_2;
+    std::vector<double> value_2;
     value_0.resize(n);
     value_1.resize(n);
     value_2.resize(n);
     for (uint64_t i = 0; i < n; ++i) {
         value_0[i] = i;
         value_1[i] = i + n;
-        value_2[i] = (float)((i + 100) / (n + 1));
+        value_2[i] = (double)((i + 100) / (n + 1));
     }
     entity.entity_count_ = n;
-    size_t attr_size = n * (sizeof(int32_t) + sizeof(float) + sizeof(int64_t));
+    size_t attr_size = n * (sizeof(int64_t) + sizeof(double) + sizeof(int64_t));
     std::vector<uint8_t> attr_value(attr_size, 0);
     size_t offset = 0;
-    memcpy(attr_value.data(), value_0.data(), n * sizeof(int32_t));
-    offset += n * sizeof(int32_t);
+    memcpy(attr_value.data(), value_0.data(), n * sizeof(int64_t));
+    offset += n * sizeof(int64_t);
     memcpy(attr_value.data() + offset, value_1.data(), n * sizeof(int64_t));
     offset += n * sizeof(int64_t);
-    memcpy(attr_value.data() + offset, value_2.data(), n * sizeof(float));
+    memcpy(attr_value.data() + offset, value_2.data(), n * sizeof(double));
     entity.attr_value_ = attr_value;
 }
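BuildEntity above packs the three attribute columns back to back into a single uint8_t blob: all of value_0 first, then value_1, then value_2. A short read-back sketch that mirrors the same offset walk (out_* names are illustrative; attr_value and n are the variables from the test):

    // Unpack the blob column by column, advancing by n * sizeof(type) each step.
    size_t off = 0;
    std::vector<int64_t> out_0(n);
    memcpy(out_0.data(), attr_value.data() + off, n * sizeof(int64_t));  // value_0 block
    off += n * sizeof(int64_t);
    std::vector<int64_t> out_1(n);
    memcpy(out_1.data(), attr_value.data() + off, n * sizeof(int64_t));  // value_1 block
    off += n * sizeof(int64_t);
    std::vector<double> out_2(n);
    memcpy(out_2.data(), attr_value.data() + off, n * sizeof(double));   // value_2 block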
diff --git a/core/unittest/db/test_meta.cpp b/core/unittest/db/test_meta.cpp
index 1f2ab2ec22..2040e11a26 100644
--- a/core/unittest/db/test_meta.cpp
+++ b/core/unittest/db/test_meta.cpp
@@ -15,12 +15,11 @@
 #include "db/meta/SqliteMetaImpl.h"
 #include "db/utils.h"
+#include
+#include
 #include
 #include
 #include
-#include
-#include
-#include
 #include

 TEST_F(MetaTest, COLLECTION_TEST) {
@@ -71,7 +70,7 @@ TEST_F(MetaTest, FALID_TEST) {
         fiu_disable("SqliteMetaImpl.ValidateMetaSchema.NullConnection");
     }
     {
-        //failed initialize
+        // failed initialize
         auto options_1 = options;
         options_1.meta_.path_ = options.meta_.path_ + "1";
         if (boost::filesystem::is_directory(options_1.meta_.path_)) {
@@ -97,7 +96,7 @@ TEST_F(MetaTest, FALID_TEST) {
     ASSERT_FALSE(status.ok());
     fiu_disable("SqliteMetaImpl.CreateCollection.insert_throw_exception");

-    //success create collection
+    // success create collection
     collection.collection_id_ = collection_id;
     status = impl_->CreateCollection(collection);
     ASSERT_TRUE(status.ok());
@@ -236,7 +235,7 @@ TEST_F(MetaTest, FALID_TEST) {
     status = impl_->CreatePartition(collection_id, partition, partition_tag, 0);
     ASSERT_FALSE(status.ok());

-    //create empty name partition
+    // create empty name partition
     partition = "";
     status = impl_->CreatePartition(collection_id, partition, partition_tag, 0);
     ASSERT_TRUE(status.ok());
@@ -381,6 +380,34 @@ TEST_F(MetaTest, COLLECTION_FILE_TEST) {
     ASSERT_EQ(table_file.file_type_, new_file_type);
 }

+TEST_F(MetaTest, HYBRID_COLLECTION_TEST) {
+    auto collection_id = "meta_test_hybrid";
+
+    milvus::engine::meta::CollectionSchema collection;
+    collection.collection_id_ = collection_id;
+    collection.dimension_ = 128;
+    milvus::engine::meta::hybrid::FieldsSchema fields_schema;
+    fields_schema.fields_schema_.resize(2);
+    fields_schema.fields_schema_[0].collection_id_ = collection_id;
+    fields_schema.fields_schema_[0].field_name_ = "field_0";
+    fields_schema.fields_schema_[0].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::INT64;
+    fields_schema.fields_schema_[0].field_params_ = "";
+
+    fields_schema.fields_schema_[1].collection_id_ = collection_id;
+    fields_schema.fields_schema_[1].field_name_ = "field_1";
+    fields_schema.fields_schema_[1].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::VECTOR;
+    fields_schema.fields_schema_[1].field_params_ = "";
+
+    auto status = impl_->CreateHybridCollection(collection, fields_schema);
+    ASSERT_TRUE(status.ok());
+    milvus::engine::meta::CollectionSchema describe_collection;
+    milvus::engine::meta::hybrid::FieldsSchema describe_fields;
+    describe_collection.collection_id_ = collection_id;
+    status = impl_->DescribeHybridCollection(describe_collection, describe_fields);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(describe_fields.fields_schema_.size(), 2);
+}
+
 TEST_F(MetaTest, COLLECTION_FILE_ROW_COUNT_TEST) {
     auto collection_id = "row_count_test_table";

@@ -649,9 +676,9 @@ TEST_F(MetaTest, COLLECTION_FILES_TEST) {
     ASSERT_FALSE(status.ok());

     file_types = {
-        milvus::engine::meta::SegmentSchema::NEW, milvus::engine::meta::SegmentSchema::NEW_MERGE,
+        milvus::engine::meta::SegmentSchema::NEW,       milvus::engine::meta::SegmentSchema::NEW_MERGE,
         milvus::engine::meta::SegmentSchema::NEW_INDEX, milvus::engine::meta::SegmentSchema::TO_INDEX,
-        milvus::engine::meta::SegmentSchema::INDEX, milvus::engine::meta::SegmentSchema::RAW,
+        milvus::engine::meta::SegmentSchema::INDEX,     milvus::engine::meta::SegmentSchema::RAW,
         milvus::engine::meta::SegmentSchema::BACKUP,
     };
     status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
diff --git a/core/unittest/db/test_meta_mysql.cpp b/core/unittest/db/test_meta_mysql.cpp
index 8b0c4091b4..791729fe6d 100644
--- a/core/unittest/db/test_meta_mysql.cpp
+++ b/core/unittest/db/test_meta_mysql.cpp
@@ -14,6 +14,8 @@
 #include "db/meta/MySQLMetaImpl.h"
 #include "db/utils.h"
+#include
+#include
 #include
 #include
 #include
@@ -21,8 +23,6 @@
 #include
 #include
 #include
-#include
-#include

 const char* FAILED_CONNECT_SQL_SERVER = "Failed to connect to meta server(mysql)";
 const char* COLLECTION_ALREADY_EXISTS = "Collection already exists and it is in delete state, please wait a second";
@@ -67,7 +67,7 @@ TEST_F(MySqlMetaTest, COLLECTION_TEST) {
     ASSERT_FALSE(stat.ok());
     fiu_disable("MySQLMetaImpl.CreateCollection.throw_exception");

-    //ensure collection exists
+    // ensure collection exists
     stat = impl_->CreateCollection(collection);
     FIU_ENABLE_FIU("MySQLMetaImpl.CreateCollection.schema_TO_DELETE");
     stat = impl_->CreateCollection(collection);
@@ -121,6 +121,34 @@ TEST_F(MySqlMetaTest, COLLECTION_TEST) {
     ASSERT_TRUE(status.ok());
 }

+TEST_F(MySqlMetaTest, HYBRID_COLLECTION_TEST) {
+    auto collection_id = "meta_test_hybrid";
+
+    milvus::engine::meta::CollectionSchema collection;
+    collection.collection_id_ = collection_id;
+    collection.dimension_ = 128;
+    milvus::engine::meta::hybrid::FieldsSchema fields_schema;
+    fields_schema.fields_schema_.resize(2);
+    fields_schema.fields_schema_[0].collection_id_ = collection_id;
+    fields_schema.fields_schema_[0].field_name_ = "field_0";
+    fields_schema.fields_schema_[0].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::INT64;
+    fields_schema.fields_schema_[0].field_params_ = "";
+
+    fields_schema.fields_schema_[1].collection_id_ = collection_id;
+    fields_schema.fields_schema_[1].field_name_ = "field_1";
+    fields_schema.fields_schema_[1].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::VECTOR;
+    fields_schema.fields_schema_[1].field_params_ = "";
+
+    auto status = impl_->CreateHybridCollection(collection, fields_schema);
+    ASSERT_TRUE(status.ok());
+    milvus::engine::meta::CollectionSchema describe_collection;
+    milvus::engine::meta::hybrid::FieldsSchema describe_fields;
+    describe_collection.collection_id_ = collection_id;
+    status = impl_->DescribeHybridCollection(describe_collection, describe_fields);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(describe_fields.fields_schema_.size(), 2);
+}
+
 TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
     auto collection_id = "meta_test_table";
     fiu_init(0);
@@ -135,7 +163,7 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
     collection.dimension_ = 256;
     status = impl_->CreateCollection(collection);

-    //CreateCollectionFile
+    // CreateCollectionFile
     milvus::engine::meta::SegmentSchema table_file;
     table_file.collection_id_ = collection.collection_id_;
     status = impl_->CreateCollectionFile(table_file);
@@ -157,7 +185,7 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
     ASSERT_FALSE(status.ok());
     fiu_disable("MySQLMetaImpl.DescribeCollection.throw_exception");

-    //Count
+    // Count
     uint64_t cnt = 0;
     status = impl_->Count(collection_id, cnt);
     // ASSERT_TRUE(status.ok());
@@ -182,7 +210,7 @@ TEST_F(MySqlMetaTest, COLLECTION_FILE_TEST) {
     auto new_file_type = milvus::engine::meta::SegmentSchema::INDEX;
     table_file.file_type_ = new_file_type;
-    //UpdateCollectionFile
+    // UpdateCollectionFile
     FIU_ENABLE_FIU("MySQLMetaImpl.UpdateCollectionFile.null_connection");
     status = impl_->UpdateCollectionFile(table_file);
     ASSERT_FALSE(status.ok());
@@ -487,7 +515,7 @@ TEST_F(MySqlMetaTest, INVALID_INITILIZE_TEST) {
     milvus::engine::DBMetaOptions meta = GetOptions().meta_;
     {
         FIU_ENABLE_FIU("MySQLMetaImpl.Initialize.fail_create_directory");
-        //delete directory created by SetUp
+        // delete directory created by SetUp
         boost::filesystem::remove_all(meta.path_);
         ASSERT_ANY_THROW(milvus::engine::meta::MySQLMetaImpl impl(meta, GetOptions().mode_));
         fiu_disable("MySQLMetaImpl.Initialize.fail_create_directory");
@@ -674,9 +702,9 @@ TEST_F(MySqlMetaTest, COLLECTION_FILES_TEST) {
     ASSERT_FALSE(status.ok());

     file_types = {
-        milvus::engine::meta::SegmentSchema::NEW, milvus::engine::meta::SegmentSchema::NEW_MERGE,
+        milvus::engine::meta::SegmentSchema::NEW,       milvus::engine::meta::SegmentSchema::NEW_MERGE,
         milvus::engine::meta::SegmentSchema::NEW_INDEX, milvus::engine::meta::SegmentSchema::TO_INDEX,
-        milvus::engine::meta::SegmentSchema::INDEX, milvus::engine::meta::SegmentSchema::RAW,
+        milvus::engine::meta::SegmentSchema::INDEX,     milvus::engine::meta::SegmentSchema::RAW,
         milvus::engine::meta::SegmentSchema::BACKUP,
     };
     status = impl_->FilesByType(collection.collection_id_, file_types, files_holder);
@@ -810,4 +838,3 @@ TEST_F(MySqlMetaTest, INDEX_TEST) {
     status = impl_->UpdateCollectionFilesToIndex(collection_id);
     ASSERT_TRUE(status.ok());
 }
-
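The HYBRID_COLLECTION_TEST bodies in test_meta.cpp and test_meta_mysql.cpp are identical except for the fixture. A shared helper along these lines (illustrative only, not part of this patch) would keep the two copies from drifting:

    // Hypothetical test helper: builds the two-field hybrid schema
    // (one INT64 scalar field, one VECTOR field) used by both meta tests.
    static void
    BuildHybridSchema(const std::string& collection_id, milvus::engine::meta::CollectionSchema& collection,
                      milvus::engine::meta::hybrid::FieldsSchema& fields_schema) {
        collection.collection_id_ = collection_id;
        collection.dimension_ = 128;
        fields_schema.fields_schema_.resize(2);
        fields_schema.fields_schema_[0].collection_id_ = collection_id;
        fields_schema.fields_schema_[0].field_name_ = "field_0";
        fields_schema.fields_schema_[0].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::INT64;
        fields_schema.fields_schema_[0].field_params_ = "";
        fields_schema.fields_schema_[1].collection_id_ = collection_id;
        fields_schema.fields_schema_[1].field_name_ = "field_1";
        fields_schema.fields_schema_[1].field_type_ = (int32_t)milvus::engine::meta::hybrid::DataType::VECTOR;
        fields_schema.fields_schema_[1].field_params_ = "";
    }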
diff --git a/core/unittest/db/test_wal.cpp b/core/unittest/db/test_wal.cpp
index 17f0903fd5..a4bdc34e22 100644
--- a/core/unittest/db/test_wal.cpp
+++ b/core/unittest/db/test_wal.cpp
@@ -16,6 +16,7 @@
 #include
 #include
+#include <random>
 #include
 #include

@@ -413,6 +414,254 @@ TEST(WalTest, BUFFER_TEST) {
     }
 }

+TEST(WalTest, HYBRID_BUFFER_TEST) {
+    MakeEmptyTestPath();
+
+    milvus::engine::wal::MXLogBuffer buffer(WAL_GTEST_PATH, 2048);
+
+    uint32_t file_no = 4;
+    uint32_t buf_off = 100;
+    uint64_t lsn = (uint64_t)file_no << 32 | buf_off;
+    buffer.mxlog_buffer_size_ = 2000;
+    buffer.Reset(lsn);
+
+    milvus::engine::wal::MXLogRecord record[4];
+    milvus::engine::wal::MXLogRecord read_rst;
+
+    // write 0
+    record[0].type = milvus::engine::wal::MXLogType::Entity;
+    record[0].collection_id = "insert_hybrid_collection";
+    record[0].partition_tag = "parti1";
+    uint64_t length = 50;
+    record[0].length = length;
+    record[0].ids = (milvus::engine::IDNumber*)malloc(record[0].length * sizeof(milvus::engine::IDNumber));
+    record[0].data_size = record[0].length * sizeof(float);
+    record[0].data = malloc(record[0].data_size);
+    record[0].field_names.resize(2);
+    record[0].field_names[0] = "field_0";
+    record[0].field_names[1] = "field_1";
+    record[0].attr_data_size.insert(std::make_pair("field_0", length * sizeof(int64_t)));
+    record[0].attr_data_size.insert(std::make_pair("field_1", length * sizeof(float)));
+    record[0].attr_nbytes.insert(std::make_pair("field_0", sizeof(int64_t)));
+    record[0].attr_nbytes.insert(std::make_pair("field_1", sizeof(float)));
+
+    std::vector<int64_t> data_0(length);
+    std::default_random_engine e;
+    std::uniform_int_distribution<int64_t> u(0, 1000);
+    for (uint64_t i = 0; i < length; ++i) {
+        data_0[i] = u(e);
+    }
+    std::vector<uint8_t> attr_data_0(length * sizeof(int64_t));
+    memcpy(attr_data_0.data(), data_0.data(), length * sizeof(int64_t));
+    record[0].attr_data.insert(std::make_pair("field_0", attr_data_0));
+
+    std::vector<float> data_1(length);
+    std::default_random_engine e1;
+    std::uniform_real_distribution<float> u1(0, 1);
+    for (uint64_t i = 0; i < length; ++i) {
+        data_1[i] = u1(e1);
+    }
+    std::vector<uint8_t> attr_data_1(length * sizeof(float));
+    memcpy(attr_data_1.data(), data_1.data(), length * sizeof(float));
+    record[0].attr_data.insert(std::make_pair("field_1", attr_data_1));
+
+    ASSERT_EQ(buffer.AppendEntity(record[0]), milvus::WAL_SUCCESS);
+    uint32_t new_file_no = uint32_t(record[0].lsn >> 32);
+    ASSERT_EQ(new_file_no, ++file_no);
+
+    // write 1
+    record[1].type = milvus::engine::wal::MXLogType::Delete;
+    record[1].collection_id = "insert_hybrid_collection";
+    record[1].partition_tag = "parti1";
+    length = 10;
+    record[1].length = length;
+    record[1].ids = (milvus::engine::IDNumber*)malloc(record[1].length * sizeof(milvus::engine::IDNumber));
+    record[1].data_size = 0;
+    record[1].data = nullptr;
+    record[1].field_names.resize(2);
+    record[1].field_names[0] = "field_0";
+    record[1].field_names[1] = "field_1";
+    record[1].attr_data_size.insert(std::make_pair("field_0", length * sizeof(int64_t)));
+    record[1].attr_data_size.insert(std::make_pair("field_1", length * sizeof(float)));
+    record[1].attr_nbytes.insert(std::make_pair("field_0", sizeof(int64_t)));
+    record[1].attr_nbytes.insert(std::make_pair("field_1", sizeof(float)));
+
+    std::vector<int64_t> data1_0(length);
+    for (uint64_t i = 0; i < length; ++i) {
+        data1_0[i] = u(e);
+    }
+    std::vector<uint8_t> attr_data1_0(length * sizeof(int64_t));
+    memcpy(attr_data1_0.data(), data1_0.data(), length * sizeof(int64_t));
+    record[1].attr_data.insert(std::make_pair("field_0", attr_data1_0));
+
+    std::vector<float> data1_1(length);
+    for (uint64_t i = 0; i < length; ++i) {
+        data1_1[i] = u1(e1);
+    }
+    std::vector<uint8_t> attr_data1_1(length * sizeof(float));
+    memcpy(attr_data1_1.data(), data1_1.data(), length * sizeof(float));
+    record[1].attr_data.insert(std::make_pair("field_1", attr_data1_1));
+    ASSERT_EQ(buffer.AppendEntity(record[1]), milvus::WAL_SUCCESS);
+    new_file_no = uint32_t(record[1].lsn >> 32);
+    ASSERT_EQ(new_file_no, file_no);
+
+    // read 0
+    ASSERT_EQ(buffer.NextEntity(record[1].lsn, read_rst), milvus::WAL_SUCCESS);
+    ASSERT_EQ(read_rst.type, record[0].type);
+    ASSERT_EQ(read_rst.collection_id, record[0].collection_id);
+    ASSERT_EQ(read_rst.partition_tag, record[0].partition_tag);
+    ASSERT_EQ(read_rst.length, record[0].length);
+    ASSERT_EQ(memcmp(read_rst.ids, record[0].ids, read_rst.length * sizeof(milvus::engine::IDNumber)), 0);
+    ASSERT_EQ(read_rst.data_size, record[0].data_size);
+    ASSERT_EQ(memcmp(read_rst.data, record[0].data, read_rst.data_size), 0);
+    ASSERT_EQ(read_rst.field_names.size(), record[0].field_names.size());
+    ASSERT_EQ(read_rst.field_names[0], record[0].field_names[0]);
+    ASSERT_EQ(read_rst.attr_data.at("field_0").size(), record[0].attr_data.at("field_0").size());
+    ASSERT_EQ(read_rst.attr_nbytes.at("field_0"), record[0].attr_nbytes.at("field_0"));
+
+    // read 1
+    ASSERT_EQ(buffer.NextEntity(record[1].lsn, read_rst), milvus::WAL_SUCCESS);
+    ASSERT_EQ(read_rst.type, record[1].type);
+    ASSERT_EQ(read_rst.collection_id, record[1].collection_id);
+    ASSERT_EQ(read_rst.partition_tag, record[1].partition_tag);
+    ASSERT_EQ(read_rst.length, record[1].length);
+    ASSERT_EQ(memcmp(read_rst.ids, record[1].ids, read_rst.length * sizeof(milvus::engine::IDNumber)), 0);
+    ASSERT_EQ(read_rst.data_size, 0);
+    ASSERT_EQ(read_rst.data, nullptr);
+    ASSERT_EQ(read_rst.field_names.size(), record[1].field_names.size());
+    ASSERT_EQ(read_rst.field_names[1], record[1].field_names[1]);
+    ASSERT_EQ(read_rst.attr_data.at("field_1").size(), record[1].attr_data.at("field_1").size());
record[1].attr_data.at("field_1").size()); + ASSERT_EQ(read_rst.attr_nbytes.at("field_0"), record[1].attr_nbytes.at("field_0")); + + // read empty + ASSERT_EQ(buffer.NextEntity(record[1].lsn, read_rst), milvus::WAL_SUCCESS); + ASSERT_EQ(read_rst.type, milvus::engine::wal::MXLogType::None); + + // write 2 (new file) + record[2].type = milvus::engine::wal::MXLogType::Entity; + record[2].collection_id = "insert_table"; + record[2].partition_tag = "parti1"; + length = 50; + record[2].length = length; + record[2].ids = (milvus::engine::IDNumber*)malloc(record[2].length * sizeof(milvus::engine::IDNumber)); + record[2].data_size = record[2].length * sizeof(float); + record[2].data = malloc(record[2].data_size); + + record[2].field_names.resize(2); + record[2].field_names[0] = "field_0"; + record[2].field_names[1] = "field_1"; + record[2].attr_data_size.insert(std::make_pair("field_0", length * sizeof(int64_t))); + record[2].attr_data_size.insert(std::make_pair("field_1", length * sizeof(float))); + + record[2].attr_data.insert(std::make_pair("field_0", attr_data_0)); + record[2].attr_data.insert(std::make_pair("field_1", attr_data_1)); + record[2].attr_nbytes.insert(std::make_pair("field_0", sizeof(uint64_t))); + record[2].attr_nbytes.insert(std::make_pair("field_1", sizeof(float))); + + ASSERT_EQ(buffer.AppendEntity(record[2]), milvus::WAL_SUCCESS); + new_file_no = uint32_t(record[2].lsn >> 32); + ASSERT_EQ(new_file_no, ++file_no); + + // write 3 (new file) + record[3].type = milvus::engine::wal::MXLogType::Entity; + record[3].collection_id = "insert_table"; + record[3].partition_tag = "parti1"; + record[3].length = 10; + record[3].ids = (milvus::engine::IDNumber*)malloc(record[3].length * sizeof(milvus::engine::IDNumber)); + record[3].data_size = record[3].length * sizeof(uint8_t); + record[3].data = malloc(record[3].data_size); + + record[3].field_names.resize(2); + record[3].field_names[0] = "field_0"; + record[3].field_names[1] = "field_1"; + record[3].attr_data_size.insert(std::make_pair("field_0", length * sizeof(int64_t))); + record[3].attr_data_size.insert(std::make_pair("field_1", length * sizeof(float))); + + record[3].attr_data.insert(std::make_pair("field_0", attr_data1_0)); + record[3].attr_data.insert(std::make_pair("field_1", attr_data1_1)); + record[3].attr_nbytes.insert(std::make_pair("field_0", sizeof(uint64_t))); + record[3].attr_nbytes.insert(std::make_pair("field_1", sizeof(float))); + ASSERT_EQ(buffer.AppendEntity(record[3]), milvus::WAL_SUCCESS); + new_file_no = uint32_t(record[3].lsn >> 32); + ASSERT_EQ(new_file_no, ++file_no); + + // reset write lsn (record 2) + ASSERT_TRUE(buffer.ResetWriteLsn(record[3].lsn)); + ASSERT_TRUE(buffer.ResetWriteLsn(record[2].lsn)); + ASSERT_TRUE(buffer.ResetWriteLsn(record[1].lsn)); + + // write 2 and 3 again + ASSERT_EQ(buffer.AppendEntity(record[2]), milvus::WAL_SUCCESS); + ASSERT_EQ(buffer.AppendEntity(record[3]), milvus::WAL_SUCCESS); + + // read 2 + ASSERT_EQ(buffer.NextEntity(record[3].lsn, read_rst), milvus::WAL_SUCCESS); + ASSERT_EQ(read_rst.type, record[2].type); + ASSERT_EQ(read_rst.collection_id, record[2].collection_id); + ASSERT_EQ(read_rst.partition_tag, record[2].partition_tag); + ASSERT_EQ(read_rst.length, record[2].length); + ASSERT_EQ(memcmp(read_rst.ids, record[2].ids, read_rst.length * sizeof(milvus::engine::IDNumber)), 0); + ASSERT_EQ(read_rst.data_size, record[2].data_size); + ASSERT_EQ(memcmp(read_rst.data, record[2].data, read_rst.data_size), 0); + + ASSERT_EQ(read_rst.field_names.size(), 
+    ASSERT_EQ(read_rst.field_names[1], record[2].field_names[1]);
+    ASSERT_EQ(read_rst.attr_data.at("field_1").size(), record[2].attr_data.at("field_1").size());
+    ASSERT_EQ(read_rst.attr_nbytes.at("field_0"), record[2].attr_nbytes.at("field_0"));
+
+    // read 3
+    ASSERT_EQ(buffer.NextEntity(record[3].lsn, read_rst), milvus::WAL_SUCCESS);
+    ASSERT_EQ(read_rst.type, record[3].type);
+    ASSERT_EQ(read_rst.collection_id, record[3].collection_id);
+    ASSERT_EQ(read_rst.partition_tag, record[3].partition_tag);
+    ASSERT_EQ(read_rst.length, record[3].length);
+    ASSERT_EQ(memcmp(read_rst.ids, record[3].ids, read_rst.length * sizeof(milvus::engine::IDNumber)), 0);
+    ASSERT_EQ(read_rst.data_size, record[3].data_size);
+    ASSERT_EQ(memcmp(read_rst.data, record[3].data, read_rst.data_size), 0);
+
+    ASSERT_EQ(read_rst.field_names.size(), record[3].field_names.size());
+    ASSERT_EQ(read_rst.field_names[1], record[3].field_names[1]);
+    ASSERT_EQ(read_rst.attr_nbytes.at("field_0"), record[3].attr_nbytes.at("field_0"));
+
+    // test an empty record
+    milvus::engine::wal::MXLogRecord empty;
+    empty.type = milvus::engine::wal::MXLogType::None;
+    empty.length = 0;
+    empty.data_size = 0;
+    ASSERT_EQ(buffer.AppendEntity(empty), milvus::WAL_SUCCESS);
+    ASSERT_EQ(buffer.NextEntity(empty.lsn, read_rst), milvus::WAL_SUCCESS);
+    ASSERT_EQ(read_rst.type, milvus::engine::wal::MXLogType::None);
+    ASSERT_TRUE(read_rst.collection_id.empty());
+    ASSERT_TRUE(read_rst.partition_tag.empty());
+    ASSERT_EQ(read_rst.length, 0);
+    ASSERT_EQ(read_rst.data_size, 0);
+
+    // remove old files
+    buffer.RemoveOldFiles(record[3].lsn);
+    ASSERT_EQ(buffer.file_no_from_, file_no);
+
+    // clear written lsn and reset failed
+    buffer.mxlog_buffer_writer_.file_no = 0;
+    buffer.mxlog_buffer_writer_.buf_offset = 0;
+    ASSERT_FALSE(buffer.ResetWriteLsn(record[1].lsn));
+
+    // clear written lsn and reset failed
+    FILE* fi = fopen(WAL_GTEST_PATH "5.wal", "w");
+    fclose(fi);
+    buffer.mxlog_buffer_writer_.file_no = 0;
+    buffer.mxlog_buffer_writer_.buf_offset = 0;
+    ASSERT_FALSE(buffer.ResetWriteLsn(record[1].lsn));
+
+    // free all four records' buffers (record[1].data is nullptr and is skipped)
+    for (int i = 0; i < 4; i++) {
+        if (record[i].ids != nullptr) {
+            free((void*)record[i].ids);
+        }
+        if (record[i].data != nullptr) {
+            free((void*)record[i].data);
+        }
+    }
+}
+
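Two conventions the buffer test above leans on, recapped as a standalone sketch: an lsn packs the WAL file number into the high 32 bits and the in-file byte offset into the low 32 bits, and NextEntity() keeps yielding records until the reader catches up with the target lsn, after which it returns a record of type None. The drain loop below is illustrative, not part of the patch; buffer and last_lsn stand in for the test's variables, and <cassert> and <cstdint> are assumed.

    // Pack and unpack an lsn, matching the arithmetic used in the test setup.
    uint32_t file_no = 4;
    uint32_t buf_off = 100;
    uint64_t lsn = (uint64_t)file_no << 32 | buf_off;
    assert(uint32_t(lsn >> 32) == file_no);         // high 32 bits: file number
    assert(uint32_t(lsn & 0xFFFFFFFF) == buf_off);  // low 32 bits: offset within the file

    // Drain every record written up to last_lsn.
    milvus::engine::wal::MXLogRecord rst;
    while (buffer.NextEntity(last_lsn, rst) == milvus::WAL_SUCCESS &&
           rst.type != milvus::engine::wal::MXLogType::None) {
        // consume rst.ids / rst.data / rst.attr_data here
    }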
 TEST(WalTest, MANAGER_INIT_TEST) {
     MakeEmptyTestPath();
diff --git a/core/unittest/server/test_rpc.cpp b/core/unittest/server/test_rpc.cpp
index 0c941ebbcc..4294ab3889 100644
--- a/core/unittest/server/test_rpc.cpp
+++ b/core/unittest/server/test_rpc.cpp
@@ -91,6 +91,7 @@ class RpcHandlerTest : public testing::Test {
         milvus::server::Config::GetInstance().SetStorageConfigSecondaryPath("");
         milvus::server::Config::GetInstance().SetCacheConfigCacheInsertData("");
         milvus::server::Config::GetInstance().SetEngineConfigOmpThreadNum("");
+        milvus::server::Config::GetInstance().SetServerConfigPort("19531");

         // serverConfig.SetValue(server::CONFIG_CLUSTER_MODE, "cluster");
         // DBWrapper::GetInstance().GetInstance().StartService();
diff --git a/core/unittest/server/test_web.cpp b/core/unittest/server/test_web.cpp
index e13ecc73cb..7135909730 100644
--- a/core/unittest/server/test_web.cpp
+++ b/core/unittest/server/test_web.cpp
@@ -342,9 +342,6 @@ class TestClient : public oatpp::web::client::ApiClient {
     API_CALL("POST", "/hybrid_collections/{collection_name}/entities", InsertEntity, PATH(String, collection_name),
              BODY_STRING(String, body))

-//    API_CALL("POST", "/hybrid_collections/{collection_name}/vectors", HybridSearch,
-//             PATH(String, collection_name), BODY_STRING(String, body))
-
 #include OATPP_CODEGEN_END(ApiClient)
 };