mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
(db/snapshot) Optimize snapshot mock db (#2896)
* Add mysql time statices Signed-off-by: yhz <413554850@qq.com> * Add detail timestamp recorder Signed-off-by: yhz <413554850@qq.com> * Optimize snapshot mock DB Signed-off-by: yhz <413554850@qq.com> * Remove surplus space Signed-off-by: yhz <413554850@qq.com> * Updat meta test Signed-off-by: yhz <413554850@qq.com>
This commit is contained in:
parent
e8bbe5a322
commit
924960b69d
@ -40,7 +40,7 @@ class MetaAdapter {
|
||||
// TODO move select logic to here
|
||||
auto session = CreateSession();
|
||||
std::vector<typename T::Ptr> resources;
|
||||
auto status = session->Select<T, snapshot::ID_TYPE>(snapshot::IdField::Name, id, resources);
|
||||
auto status = session->Select<T, snapshot::ID_TYPE>(snapshot::IdField::Name, id, {}, resources);
|
||||
if (status.ok() && !resources.empty()) {
|
||||
// TODO: may need to check num of resources
|
||||
resource = resources.at(0);
|
||||
@ -53,7 +53,7 @@ class MetaAdapter {
|
||||
Status
|
||||
SelectBy(const std::string& field, const U& value, std::vector<typename ResourceT::Ptr>& resources) {
|
||||
auto session = CreateSession();
|
||||
return session->Select<ResourceT, U>(field, value, resources);
|
||||
return session->Select<ResourceT, U>(field, value, {}, resources);
|
||||
}
|
||||
|
||||
template <typename ResourceT, typename U>
|
||||
@ -61,7 +61,8 @@ class MetaAdapter {
|
||||
SelectResourceIDs(std::vector<int64_t>& ids, const std::string& filter_field, const U& filter_value) {
|
||||
std::vector<typename ResourceT::Ptr> resources;
|
||||
auto session = CreateSession();
|
||||
auto status = session->Select<ResourceT, U>(filter_field, filter_value, resources);
|
||||
std::vector<std::string> target_attrs = {F_ID};
|
||||
auto status = session->Select<ResourceT, U>(filter_field, filter_value, target_attrs, resources);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
@ -41,7 +42,8 @@ class MetaSession {
|
||||
public:
|
||||
template <typename ResourceT, typename U>
|
||||
Status
|
||||
Select(const std::string& field, const U& value, std::vector<typename ResourceT::Ptr>& resources);
|
||||
Select(const std::string& field, const U& value, const std::vector<std::string>& target_attrs,
|
||||
std::vector<typename ResourceT::Ptr>& resources);
|
||||
|
||||
template <typename ResourceT>
|
||||
Status
|
||||
@ -90,7 +92,8 @@ class MetaSession {
|
||||
|
||||
template <typename T, typename U>
|
||||
Status
|
||||
MetaSession::Select(const std::string& field, const U& value, std::vector<typename T::Ptr>& resources) {
|
||||
MetaSession::Select(const std::string& field, const U& value, const std::vector<std::string>& target_attrs,
|
||||
std::vector<typename T::Ptr>& resources) {
|
||||
MetaQueryContext context;
|
||||
context.table_ = T::Name;
|
||||
|
||||
@ -100,6 +103,11 @@ MetaSession::Select(const std::string& field, const U& value, std::vector<typena
|
||||
context.filter_attrs_ = {{field, field_value}};
|
||||
}
|
||||
|
||||
if (!target_attrs.empty()) {
|
||||
context.all_required_ = false;
|
||||
context.query_fields_ = target_attrs;
|
||||
}
|
||||
|
||||
AttrsMapList attrs;
|
||||
auto status = db_engine_->Query(context, attrs);
|
||||
if (!status.ok()) {
|
||||
@ -112,135 +120,186 @@ MetaSession::Select(const std::string& field, const U& value, std::vector<typena
|
||||
|
||||
for (auto raw : attrs) {
|
||||
auto resource = snapshot::CreateResPtr<T>();
|
||||
|
||||
std::unordered_map<std::string, std::string>::iterator iter;
|
||||
auto mf_p = std::dynamic_pointer_cast<snapshot::MappingsField>(resource);
|
||||
if (mf_p != nullptr) {
|
||||
std::string mapping = raw[F_MAPPINGS];
|
||||
auto mapping_json = nlohmann::json::parse(mapping);
|
||||
std::set<int64_t> mappings;
|
||||
for (auto& ele : mapping_json) {
|
||||
mappings.insert(ele.get<int64_t>());
|
||||
iter = raw.find(F_MAPPINGS);
|
||||
if (iter != raw.end()) {
|
||||
auto mapping_json = nlohmann::json::parse(iter->second);
|
||||
std::set<int64_t> mappings;
|
||||
for (auto& ele : mapping_json) {
|
||||
mappings.insert(ele.get<int64_t>());
|
||||
}
|
||||
mf_p->GetMappings() = mappings;
|
||||
}
|
||||
mf_p->GetMappings() = mappings;
|
||||
}
|
||||
|
||||
auto sf_p = std::dynamic_pointer_cast<snapshot::StateField>(resource);
|
||||
if (sf_p != nullptr) {
|
||||
auto status_str = raw[F_STATE];
|
||||
auto status_int = std::stol(status_str);
|
||||
switch (static_cast<snapshot::State>(status_int)) {
|
||||
case snapshot::PENDING: {
|
||||
sf_p->ResetStatus();
|
||||
break;
|
||||
iter = raw.find(F_STATE);
|
||||
if (iter != raw.end()) {
|
||||
auto status_int = std::stol(iter->second);
|
||||
switch (static_cast<snapshot::State>(status_int)) {
|
||||
case snapshot::PENDING: {
|
||||
sf_p->ResetStatus();
|
||||
break;
|
||||
}
|
||||
case snapshot::ACTIVE: {
|
||||
sf_p->ResetStatus();
|
||||
sf_p->Activate();
|
||||
break;
|
||||
}
|
||||
case snapshot::DEACTIVE: {
|
||||
sf_p->ResetStatus();
|
||||
sf_p->Deactivate();
|
||||
break;
|
||||
}
|
||||
default: { return Status(SERVER_UNSUPPORTED_ERROR, "Invalid state value"); }
|
||||
}
|
||||
case snapshot::ACTIVE: {
|
||||
sf_p->ResetStatus();
|
||||
sf_p->Activate();
|
||||
break;
|
||||
}
|
||||
case snapshot::DEACTIVE: {
|
||||
sf_p->ResetStatus();
|
||||
sf_p->Deactivate();
|
||||
break;
|
||||
}
|
||||
default: { return Status(SERVER_UNSUPPORTED_ERROR, "Invalid state value"); }
|
||||
}
|
||||
}
|
||||
|
||||
auto lsn_f = std::dynamic_pointer_cast<snapshot::LsnField>(resource);
|
||||
if (lsn_f != nullptr) {
|
||||
auto lsn = std::stoul(raw[F_LSN]);
|
||||
lsn_f->SetLsn(lsn);
|
||||
iter = raw.find(F_LSN);
|
||||
if (iter != raw.end()) {
|
||||
auto lsn = std::stoul(iter->second);
|
||||
lsn_f->SetLsn(lsn);
|
||||
}
|
||||
}
|
||||
|
||||
auto created_on_f = std::dynamic_pointer_cast<snapshot::CreatedOnField>(resource);
|
||||
if (created_on_f != nullptr) {
|
||||
auto created_on = std::stol(raw[F_CREATED_ON]);
|
||||
created_on_f->SetCreatedTime(created_on);
|
||||
iter = raw.find(F_CREATED_ON);
|
||||
if (iter != raw.end()) {
|
||||
auto created_on = std::stol(iter->second);
|
||||
created_on_f->SetCreatedTime(created_on);
|
||||
}
|
||||
}
|
||||
|
||||
auto update_on_p = std::dynamic_pointer_cast<snapshot::UpdatedOnField>(resource);
|
||||
if (update_on_p != nullptr) {
|
||||
auto update_on = std::stol(raw[F_UPDATED_ON]);
|
||||
update_on_p->SetUpdatedTime(update_on);
|
||||
iter = raw.find(F_UPDATED_ON);
|
||||
if (iter != raw.end()) {
|
||||
auto update_on = std::stol(iter->second);
|
||||
update_on_p->SetUpdatedTime(update_on);
|
||||
}
|
||||
}
|
||||
|
||||
auto id_p = std::dynamic_pointer_cast<snapshot::IdField>(resource);
|
||||
if (id_p != nullptr) {
|
||||
auto t_id = std::stol(raw[F_ID]);
|
||||
id_p->SetID(t_id);
|
||||
iter = raw.find(F_ID);
|
||||
if (iter != raw.end()) {
|
||||
auto t_id = std::stol(iter->second);
|
||||
id_p->SetID(t_id);
|
||||
}
|
||||
}
|
||||
|
||||
auto cid_p = std::dynamic_pointer_cast<snapshot::CollectionIdField>(resource);
|
||||
if (cid_p != nullptr) {
|
||||
auto cid = std::stol(raw[F_COLLECTON_ID]);
|
||||
cid_p->SetCollectionId(cid);
|
||||
iter = raw.find(F_COLLECTON_ID);
|
||||
if (iter != raw.end()) {
|
||||
auto cid = std::stol(iter->second);
|
||||
cid_p->SetCollectionId(cid);
|
||||
}
|
||||
}
|
||||
|
||||
auto sid_p = std::dynamic_pointer_cast<snapshot::SchemaIdField>(resource);
|
||||
if (sid_p != nullptr) {
|
||||
auto sid = std::stol(raw[F_SCHEMA_ID]);
|
||||
sid_p->SetSchemaId(sid);
|
||||
iter = raw.find(F_SCHEMA_ID);
|
||||
if (iter != raw.end()) {
|
||||
auto sid = std::stol(iter->second);
|
||||
sid_p->SetSchemaId(sid);
|
||||
}
|
||||
}
|
||||
|
||||
auto num_p = std::dynamic_pointer_cast<snapshot::NumField>(resource);
|
||||
if (num_p != nullptr) {
|
||||
auto num = std::stol(raw[F_NUM]);
|
||||
num_p->SetNum(num);
|
||||
iter = raw.find(F_NUM);
|
||||
if (iter != raw.end()) {
|
||||
auto num = std::stol(iter->second);
|
||||
num_p->SetNum(num);
|
||||
}
|
||||
}
|
||||
|
||||
auto ftype_p = std::dynamic_pointer_cast<snapshot::FtypeField>(resource);
|
||||
if (ftype_p != nullptr) {
|
||||
auto ftype = std::stol(raw[F_FTYPE]);
|
||||
ftype_p->SetFtype(ftype);
|
||||
iter = raw.find(F_FTYPE);
|
||||
if (iter != raw.end()) {
|
||||
auto ftype = std::stol(iter->second);
|
||||
ftype_p->SetFtype(ftype);
|
||||
}
|
||||
}
|
||||
|
||||
auto fid_p = std::dynamic_pointer_cast<snapshot::FieldIdField>(resource);
|
||||
if (fid_p != nullptr) {
|
||||
auto fid = std::stol(raw[F_FIELD_ID]);
|
||||
fid_p->SetFieldId(fid);
|
||||
iter = raw.find(F_FIELD_ID);
|
||||
if (iter != raw.end()) {
|
||||
auto fid = std::stol(iter->second);
|
||||
fid_p->SetFieldId(fid);
|
||||
}
|
||||
}
|
||||
|
||||
auto feid_p = std::dynamic_pointer_cast<snapshot::FieldElementIdField>(resource);
|
||||
if (feid_p != nullptr) {
|
||||
auto feid = std::stol(raw[F_FIELD_ELEMENT_ID]);
|
||||
feid_p->SetFieldElementId(feid);
|
||||
iter = raw.find(F_FIELD_ELEMENT_ID);
|
||||
if (iter != raw.end()) {
|
||||
auto feid = std::stol(iter->second);
|
||||
feid_p->SetFieldElementId(feid);
|
||||
}
|
||||
}
|
||||
|
||||
auto pid_p = std::dynamic_pointer_cast<snapshot::PartitionIdField>(resource);
|
||||
if (pid_p != nullptr) {
|
||||
auto p_id = std::stol(raw[F_PARTITION_ID]);
|
||||
pid_p->SetPartitionId(p_id);
|
||||
iter = raw.find(F_PARTITION_ID);
|
||||
if (iter != raw.end()) {
|
||||
auto p_id = std::stol(iter->second);
|
||||
pid_p->SetPartitionId(p_id);
|
||||
}
|
||||
}
|
||||
|
||||
auto sgid_p = std::dynamic_pointer_cast<snapshot::SegmentIdField>(resource);
|
||||
if (sgid_p != nullptr) {
|
||||
auto sg_id = std::stol(raw[F_SEGMENT_ID]);
|
||||
sgid_p->SetSegmentId(sg_id);
|
||||
iter = raw.find(F_SEGMENT_ID);
|
||||
if (iter != raw.end()) {
|
||||
auto sg_id = std::stol(iter->second);
|
||||
sgid_p->SetSegmentId(sg_id);
|
||||
}
|
||||
}
|
||||
|
||||
auto name_p = std::dynamic_pointer_cast<snapshot::NameField>(resource);
|
||||
if (name_p != nullptr) {
|
||||
auto name = raw[F_NAME];
|
||||
name_p->SetName(name);
|
||||
iter = raw.find(F_NAME);
|
||||
if (iter != raw.end()) {
|
||||
name_p->SetName(iter->second);
|
||||
}
|
||||
}
|
||||
|
||||
auto pf_p = std::dynamic_pointer_cast<snapshot::ParamsField>(resource);
|
||||
if (pf_p != nullptr) {
|
||||
auto params = nlohmann::json::parse(raw[F_PARAMS]);
|
||||
pf_p->SetParams(params);
|
||||
iter = raw.find(F_PARAMS);
|
||||
if (iter != raw.end()) {
|
||||
auto params = nlohmann::json::parse(iter->second);
|
||||
pf_p->SetParams(params);
|
||||
}
|
||||
}
|
||||
|
||||
auto size_p = std::dynamic_pointer_cast<snapshot::SizeField>(resource);
|
||||
if (size_p != nullptr) {
|
||||
auto size = std::stol(raw[F_SIZE]);
|
||||
size_p->SetSize(size);
|
||||
iter = raw.find(F_SIZE);
|
||||
if (iter != raw.end()) {
|
||||
auto size = std::stol(iter->second);
|
||||
size_p->SetSize(size);
|
||||
}
|
||||
}
|
||||
|
||||
auto rc_p = std::dynamic_pointer_cast<snapshot::RowCountField>(resource);
|
||||
if (rc_p != nullptr) {
|
||||
auto rc = std::stol(raw[F_ROW_COUNT]);
|
||||
rc_p->SetRowCount(rc);
|
||||
iter = raw.find(F_ROW_COUNT);
|
||||
if (iter != raw.end()) {
|
||||
auto rc = std::stol(iter->second);
|
||||
rc_p->SetRowCount(rc);
|
||||
}
|
||||
}
|
||||
|
||||
resources.push_back(std::move(resource));
|
||||
|
||||
@ -30,34 +30,57 @@ MockMetaEngine::QueryNoLock(const MetaQueryContext& context, AttrsMapList& attrs
|
||||
return Status(0, "Empty");
|
||||
}
|
||||
|
||||
auto filter_lambda = [](const AttrsMapList& store_attrs, AttrsMapList& candidate_attrs,
|
||||
std::pair<std::string, std::string> filter) {
|
||||
candidate_attrs.clear();
|
||||
for (auto& store_attr : store_attrs) {
|
||||
auto attr = store_attr.find(filter.first);
|
||||
if (attr->second == filter.second) {
|
||||
candidate_attrs.push_back(store_attr);
|
||||
auto select_target_attrs = [](const TableRaw& raw, AttrsMapList& des,
|
||||
const std::vector<std::string>& target_attrs) {
|
||||
if (target_attrs.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto m = std::unordered_map<std::string, std::string>();
|
||||
for (auto& attr : target_attrs) {
|
||||
auto iter = raw.find(attr);
|
||||
if (iter != raw.end()) {
|
||||
m.insert(std::make_pair(iter->first, iter->second));
|
||||
}
|
||||
}
|
||||
if (!m.empty()) {
|
||||
des.push_back(m);
|
||||
}
|
||||
};
|
||||
|
||||
auto table_attrs = resources_.find(context.table_);
|
||||
AttrsMapList candidate_attrs = table_attrs->second;
|
||||
AttrsMapList result_attrs;
|
||||
auto& candidate_raws = resources_[context.table_];
|
||||
|
||||
bool selected = true;
|
||||
if (!context.filter_attrs_.empty()) {
|
||||
for (auto& filter_attr : context.filter_attrs_) {
|
||||
filter_lambda(candidate_attrs, result_attrs, filter_attr);
|
||||
candidate_attrs.clear();
|
||||
candidate_attrs = result_attrs;
|
||||
for (auto& raw : candidate_raws) {
|
||||
for (auto& filter_attr : context.filter_attrs_) {
|
||||
auto iter = raw.find(filter_attr.first);
|
||||
if (iter == raw.end() || iter->second != filter_attr.second) {
|
||||
selected = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (selected) {
|
||||
if (context.all_required_) {
|
||||
attrs.push_back(raw);
|
||||
} else {
|
||||
select_target_attrs(raw, attrs, context.query_fields_);
|
||||
}
|
||||
}
|
||||
selected = true;
|
||||
}
|
||||
|
||||
} else {
|
||||
result_attrs = table_attrs->second;
|
||||
if (context.all_required_) {
|
||||
attrs = candidate_raws;
|
||||
} else {
|
||||
for (auto& attr : candidate_raws) {
|
||||
select_target_attrs(attr, attrs, context.query_fields_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& raw_attrs : result_attrs) {
|
||||
for (auto& kv : raw_attrs) {
|
||||
for (auto& result_raw : attrs) {
|
||||
for (auto& kv : result_raw) {
|
||||
if (*kv.second.begin() == '\'' && *kv.second.rbegin() == '\'') {
|
||||
std::string v = kv.second;
|
||||
StringHelpFunctions::TrimStringQuote(v, "\'");
|
||||
@ -66,9 +89,6 @@ MockMetaEngine::QueryNoLock(const MetaQueryContext& context, AttrsMapList& attrs
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: filter select field here
|
||||
attrs = result_attrs;
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -169,7 +189,6 @@ MockMetaEngine::ExecuteTransaction(const std::vector<MetaApplyContext>& sql_cont
|
||||
|
||||
if (!status.ok()) {
|
||||
RollBackNoLock(pair_entities);
|
||||
return status;
|
||||
}
|
||||
|
||||
return status;
|
||||
|
||||
@ -347,6 +347,7 @@ MySqlEngine::Query(const MetaQueryContext& context, AttrsMapList& attrs) {
|
||||
mysqlpp::Query query = connectionPtr->query(sql);
|
||||
auto res = query.store();
|
||||
if (!res) {
|
||||
// TODO: change error behavior
|
||||
throw Exception(1, "Query res is false");
|
||||
}
|
||||
|
||||
@ -354,7 +355,7 @@ MySqlEngine::Query(const MetaQueryContext& context, AttrsMapList& attrs) {
|
||||
for (auto& row : res) {
|
||||
AttrsMap attrs_map;
|
||||
for (auto& name : *names) {
|
||||
attrs_map.insert(std::pair<std::string, std::string>(name, row[name.c_str()]));
|
||||
attrs_map.insert(std::make_pair(name, row[name.c_str()]));
|
||||
}
|
||||
attrs.push_back(attrs_map);
|
||||
}
|
||||
|
||||
@ -9,11 +9,10 @@
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
#include "ssdb/utils.h"
|
||||
|
||||
#include "db/meta/MetaFields.h"
|
||||
#include "db/meta/backend/MetaContext.h"
|
||||
#include "db/snapshot/ResourceContext.h"
|
||||
#include "ssdb/utils.h"
|
||||
|
||||
template <typename T>
|
||||
using ResourceContext = milvus::engine::snapshot::ResourceContext<T>;
|
||||
@ -21,6 +20,7 @@ template <typename T>
|
||||
using ResourceContextBuilder = milvus::engine::snapshot::ResourceContextBuilder<T>;
|
||||
|
||||
using FType = milvus::engine::FieldType;
|
||||
using FEType = milvus::engine::FieldElementType;
|
||||
using Op = milvus::engine::meta::MetaContextOp;
|
||||
|
||||
TEST_F(SSMetaTest, ApplyTest) {
|
||||
@ -72,6 +72,14 @@ TEST_F(SSMetaTest, SessionTest) {
|
||||
ASSERT_GT(result_id, 0);
|
||||
field->SetID(result_id);
|
||||
|
||||
auto field_element = std::make_shared<FieldElement>(collection->GetID(), field->GetID(),
|
||||
"meta_test_f1_fe1", FEType::FET_RAW);
|
||||
auto fe_ctx = ResourceContextBuilder<FieldElement>().SetResource(field_element).CreatePtr();
|
||||
status = meta_->Apply<FieldElement>(fe_ctx, result_id);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
ASSERT_GT(result_id, 0);
|
||||
field_element->SetID(result_id);
|
||||
|
||||
auto session = meta_->CreateSession();
|
||||
ASSERT_TRUE(collection->Activate());
|
||||
auto c2_ctx = ResourceContextBuilder<Collection>().SetResource(collection)
|
||||
@ -91,13 +99,20 @@ TEST_F(SSMetaTest, SessionTest) {
|
||||
status = session->Apply<Field>(f2_ctx);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
ASSERT_TRUE(field_element->Activate());
|
||||
auto fe2_ctx = ResourceContextBuilder<FieldElement>().SetResource(field_element)
|
||||
.SetOp(Op::oUpdate).AddAttr(milvus::engine::meta::F_STATE).CreatePtr();
|
||||
status = session->Apply<FieldElement>(fe2_ctx);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
|
||||
std::vector<ID_TYPE> result_ids;
|
||||
status = session->Commit(result_ids);
|
||||
ASSERT_TRUE(status.ok()) << status.ToString();
|
||||
ASSERT_EQ(result_ids.size(), 3);
|
||||
ASSERT_EQ(result_ids.size(), 4);
|
||||
ASSERT_EQ(result_ids.at(0), collection->GetID());
|
||||
ASSERT_EQ(result_ids.at(1), partition->GetID());
|
||||
ASSERT_EQ(result_ids.at(2), field->GetID());
|
||||
ASSERT_EQ(result_ids.at(3), field_element->GetID());
|
||||
}
|
||||
|
||||
TEST_F(SSMetaTest, SelectTest) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user