mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
refactor(db): change groupfilesschema to table*
Former-commit-id: e786234155eecb9d22ad2b6a8b2c9357675e5de4
This commit is contained in:
parent
e94499cdff
commit
209d0e4806
@ -28,7 +28,7 @@ public:
|
||||
virtual Status has_group(const std::string& group_id_, bool& has_or_not_) = 0;
|
||||
virtual Status get_group_files(const std::string& group_id_,
|
||||
const int date_delta_,
|
||||
meta::GroupFilesSchema& group_files_info_) = 0;
|
||||
meta::TableFilesSchema& group_files_info_) = 0;
|
||||
|
||||
virtual Status add_vectors(const std::string& group_id_,
|
||||
size_t n, const float* vectors, IDNumbers& vector_ids_) = 0;
|
||||
|
||||
@ -58,7 +58,7 @@ Status DBImpl<EngineT>::has_group(const std::string& group_id_, bool& has_or_not
|
||||
template<typename EngineT>
|
||||
Status DBImpl<EngineT>::get_group_files(const std::string& group_id,
|
||||
const int date_delta,
|
||||
meta::GroupFilesSchema& group_files_info) {
|
||||
meta::TableFilesSchema& group_files_info) {
|
||||
return _pMeta->get_group_files(group_id, date_delta, group_files_info);
|
||||
|
||||
}
|
||||
@ -83,14 +83,14 @@ template<typename EngineT>
|
||||
Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
|
||||
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
|
||||
|
||||
meta::DatePartionedGroupFilesSchema files;
|
||||
meta::DatePartionedTableFilesSchema files;
|
||||
auto status = _pMeta->files_to_search(group_id, dates, files);
|
||||
if (!status.ok()) { return status; }
|
||||
|
||||
LOG(DEBUG) << "Search DateT Size=" << files.size();
|
||||
|
||||
meta::GroupFilesSchema index_files;
|
||||
meta::GroupFilesSchema raw_files;
|
||||
meta::TableFilesSchema index_files;
|
||||
meta::TableFilesSchema raw_files;
|
||||
for (auto &day_files : files) {
|
||||
for (auto &file : day_files.second) {
|
||||
file.file_type == meta::TableFileSchema::INDEX ?
|
||||
@ -132,7 +132,7 @@ Status DBImpl<EngineT>::search(const std::string& group_id, size_t k, size_t nq,
|
||||
|
||||
long search_set_size = 0;
|
||||
|
||||
auto search_in_index = [&](meta::GroupFilesSchema& file_vec) -> void {
|
||||
auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
|
||||
for (auto &file : file_vec) {
|
||||
EngineT index(file.dimension, file.location);
|
||||
index.Load();
|
||||
@ -258,7 +258,7 @@ void DBImpl<EngineT>::background_call() {
|
||||
|
||||
template<typename EngineT>
|
||||
Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::DateT& date,
|
||||
const meta::GroupFilesSchema& files) {
|
||||
const meta::TableFilesSchema& files) {
|
||||
meta::TableFileSchema group_file;
|
||||
group_file.group_id = group_id;
|
||||
group_file.date = date;
|
||||
@ -271,7 +271,7 @@ Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::Dat
|
||||
|
||||
EngineT index(group_file.dimension, group_file.location);
|
||||
|
||||
meta::GroupFilesSchema updated;
|
||||
meta::TableFilesSchema updated;
|
||||
long index_size = 0;
|
||||
|
||||
for (auto& file : files) {
|
||||
@ -305,7 +305,7 @@ Status DBImpl<EngineT>::merge_files(const std::string& group_id, const meta::Dat
|
||||
|
||||
template<typename EngineT>
|
||||
Status DBImpl<EngineT>::background_merge_files(const std::string& group_id) {
|
||||
meta::DatePartionedGroupFilesSchema raw_files;
|
||||
meta::DatePartionedTableFilesSchema raw_files;
|
||||
auto status = _pMeta->files_to_merge(group_id, raw_files);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
@ -356,7 +356,7 @@ Status DBImpl<EngineT>::build_index(const meta::TableFileSchema& file) {
|
||||
auto to_remove = file;
|
||||
to_remove.file_type = meta::TableFileSchema::TO_DELETE;
|
||||
|
||||
meta::GroupFilesSchema update_files = {to_remove, group_file};
|
||||
meta::TableFilesSchema update_files = {to_remove, group_file};
|
||||
_pMeta->update_files(update_files);
|
||||
|
||||
LOG(DEBUG) << "New index file " << group_file.file_id << " of size "
|
||||
@ -373,7 +373,7 @@ template<typename EngineT>
|
||||
void DBImpl<EngineT>::background_build_index() {
|
||||
std::lock_guard<std::mutex> lock(build_index_mutex_);
|
||||
assert(bg_build_index_started_);
|
||||
meta::GroupFilesSchema to_index_files;
|
||||
meta::TableFilesSchema to_index_files;
|
||||
_pMeta->files_to_index(to_index_files);
|
||||
Status status;
|
||||
for (auto& file : to_index_files) {
|
||||
|
||||
@ -40,7 +40,7 @@ public:
|
||||
|
||||
virtual Status get_group_files(const std::string& group_id_,
|
||||
const int date_delta_,
|
||||
meta::GroupFilesSchema& group_files_info_) override;
|
||||
meta::TableFilesSchema& group_files_info_) override;
|
||||
|
||||
virtual Status add_vectors(const std::string& group_id_,
|
||||
size_t n, const float* vectors, IDNumbers& vector_ids_) override;
|
||||
@ -66,7 +66,7 @@ private:
|
||||
Status try_build_index();
|
||||
Status merge_files(const std::string& group_id,
|
||||
const meta::DateT& date,
|
||||
const meta::GroupFilesSchema& files);
|
||||
const meta::TableFilesSchema& files);
|
||||
Status background_merge_files(const std::string& group_id);
|
||||
|
||||
void try_schedule_compaction();
|
||||
|
||||
@ -263,7 +263,7 @@ Status DBMetaImpl::add_group_file(TableFileSchema& group_file) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
|
||||
Status DBMetaImpl::files_to_index(TableFilesSchema& files) {
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
@ -309,7 +309,7 @@ Status DBMetaImpl::files_to_index(GroupFilesSchema& files) {
|
||||
|
||||
Status DBMetaImpl::files_to_search(const std::string &group_id,
|
||||
const DatesT& partition,
|
||||
DatePartionedGroupFilesSchema &files) {
|
||||
DatePartionedTableFilesSchema &files) {
|
||||
files.clear();
|
||||
DatesT today = {Meta::GetDate()};
|
||||
const DatesT& dates = (partition.empty() == true) ? today : partition;
|
||||
@ -346,7 +346,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
|
||||
GetGroupFilePath(group_file);
|
||||
auto dateItr = files.find(group_file.date);
|
||||
if (dateItr == files.end()) {
|
||||
files[group_file.date] = GroupFilesSchema();
|
||||
files[group_file.date] = TableFilesSchema();
|
||||
}
|
||||
files[group_file.date].push_back(group_file);
|
||||
}
|
||||
@ -359,7 +359,7 @@ Status DBMetaImpl::files_to_search(const std::string &group_id,
|
||||
}
|
||||
|
||||
Status DBMetaImpl::files_to_merge(const std::string& group_id,
|
||||
DatePartionedGroupFilesSchema& files) {
|
||||
DatePartionedTableFilesSchema& files) {
|
||||
files.clear();
|
||||
|
||||
try {
|
||||
@ -391,7 +391,7 @@ Status DBMetaImpl::files_to_merge(const std::string& group_id,
|
||||
GetGroupFilePath(group_file);
|
||||
auto dateItr = files.find(group_file.date);
|
||||
if (dateItr == files.end()) {
|
||||
files[group_file.date] = GroupFilesSchema();
|
||||
files[group_file.date] = TableFilesSchema();
|
||||
}
|
||||
files[group_file.date].push_back(group_file);
|
||||
}
|
||||
@ -444,7 +444,7 @@ Status DBMetaImpl::get_group_file(const std::string& group_id_,
|
||||
|
||||
Status DBMetaImpl::get_group_files(const std::string& group_id_,
|
||||
const int date_delta_,
|
||||
GroupFilesSchema& group_files_info_) {
|
||||
TableFilesSchema& group_files_info_) {
|
||||
// PXU TODO
|
||||
return Status::OK();
|
||||
}
|
||||
@ -568,7 +568,7 @@ Status DBMetaImpl::update_group_file(TableFileSchema& group_file) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBMetaImpl::update_files(GroupFilesSchema& files) {
|
||||
Status DBMetaImpl::update_files(TableFilesSchema& files) {
|
||||
try {
|
||||
auto commited = ConnectorPtr->transaction([&] () mutable {
|
||||
for (auto& file : files) {
|
||||
@ -599,7 +599,7 @@ Status DBMetaImpl::cleanup_ttl_files(uint16_t seconds) {
|
||||
where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_DELETE and
|
||||
c(&TableFileSchema::updated_time) > now - seconds*US_PS));
|
||||
|
||||
GroupFilesSchema updated;
|
||||
TableFilesSchema updated;
|
||||
|
||||
for (auto& file : selected) {
|
||||
TableFileSchema group_file;
|
||||
@ -635,7 +635,7 @@ Status DBMetaImpl::cleanup() {
|
||||
where(c(&TableFileSchema::file_type) == (int)TableFileSchema::TO_DELETE or
|
||||
c(&TableFileSchema::file_type) == (int)TableFileSchema::NEW));
|
||||
|
||||
GroupFilesSchema updated;
|
||||
TableFilesSchema updated;
|
||||
|
||||
for (auto& file : selected) {
|
||||
TableFileSchema group_file;
|
||||
|
||||
@ -37,18 +37,18 @@ public:
|
||||
|
||||
virtual Status get_group_files(const std::string& group_id_,
|
||||
const int date_delta_,
|
||||
GroupFilesSchema& group_files_info_) override;
|
||||
TableFilesSchema& group_files_info_) override;
|
||||
|
||||
virtual Status update_files(GroupFilesSchema& files) override;
|
||||
virtual Status update_files(TableFilesSchema& files) override;
|
||||
|
||||
virtual Status files_to_merge(const std::string& group_id,
|
||||
DatePartionedGroupFilesSchema& files) override;
|
||||
DatePartionedTableFilesSchema& files) override;
|
||||
|
||||
virtual Status files_to_search(const std::string& group_id,
|
||||
const DatesT& partition,
|
||||
DatePartionedGroupFilesSchema& files) override;
|
||||
DatePartionedTableFilesSchema& files) override;
|
||||
|
||||
virtual Status files_to_index(GroupFilesSchema&) override;
|
||||
virtual Status files_to_index(TableFilesSchema&) override;
|
||||
|
||||
virtual Status archive_files() override;
|
||||
|
||||
|
||||
@ -152,7 +152,7 @@ Status LocalMetaImpl::add_group_file(TableFileSchema& group_file_info) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::files_to_index(GroupFilesSchema& files) {
|
||||
Status LocalMetaImpl::files_to_index(TableFilesSchema& files) {
|
||||
files.clear();
|
||||
|
||||
std::string suffix;
|
||||
@ -182,7 +182,7 @@ Status LocalMetaImpl::files_to_index(GroupFilesSchema& files) {
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::files_to_merge(const std::string& group_id,
|
||||
DatePartionedGroupFilesSchema& files) {
|
||||
DatePartionedTableFilesSchema& files) {
|
||||
files.clear();
|
||||
/* std::string suffix; */
|
||||
/* boost::filesystem::directory_iterator end_itr; */
|
||||
@ -226,7 +226,7 @@ Status LocalMetaImpl::get_group_file(const std::string& group_id_,
|
||||
|
||||
Status LocalMetaImpl::get_group_files(const std::string& group_id_,
|
||||
const int date_delta_,
|
||||
GroupFilesSchema& group_files_info_) {
|
||||
TableFilesSchema& group_files_info_) {
|
||||
// PXU TODO
|
||||
return Status::OK();
|
||||
}
|
||||
@ -236,7 +236,7 @@ Status LocalMetaImpl::update_group_file(TableFileSchema& group_file_) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LocalMetaImpl::update_files(GroupFilesSchema& files) {
|
||||
Status LocalMetaImpl::update_files(TableFilesSchema& files) {
|
||||
//PXU TODO
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -36,16 +36,16 @@ public:
|
||||
|
||||
virtual Status get_group_files(const std::string& group_id_,
|
||||
const int date_delta_,
|
||||
GroupFilesSchema& group_files_info_) override;
|
||||
TableFilesSchema& group_files_info_) override;
|
||||
|
||||
virtual Status update_files(GroupFilesSchema& files) override;
|
||||
virtual Status update_files(TableFilesSchema& files) override;
|
||||
|
||||
virtual Status cleanup() override;
|
||||
|
||||
virtual Status files_to_merge(const std::string& group_id,
|
||||
DatePartionedGroupFilesSchema& files) override;
|
||||
DatePartionedTableFilesSchema& files) override;
|
||||
|
||||
virtual Status files_to_index(GroupFilesSchema&) override;
|
||||
virtual Status files_to_index(TableFilesSchema&) override;
|
||||
|
||||
virtual Status archive_files() override;
|
||||
|
||||
|
||||
@ -40,22 +40,22 @@ public:
|
||||
|
||||
virtual Status get_group_files(const std::string& group_id_,
|
||||
const int date_delta_,
|
||||
GroupFilesSchema& group_files_info_) = 0;
|
||||
TableFilesSchema& group_files_info_) = 0;
|
||||
|
||||
virtual Status update_files(GroupFilesSchema& files) = 0;
|
||||
virtual Status update_files(TableFilesSchema& files) = 0;
|
||||
|
||||
virtual Status files_to_search(const std::string& group_id,
|
||||
const DatesT& partition,
|
||||
DatePartionedGroupFilesSchema& files) = 0;
|
||||
DatePartionedTableFilesSchema& files) = 0;
|
||||
|
||||
virtual Status files_to_merge(const std::string& group_id,
|
||||
DatePartionedGroupFilesSchema& files) = 0;
|
||||
DatePartionedTableFilesSchema& files) = 0;
|
||||
|
||||
virtual Status size(long& result) = 0;
|
||||
|
||||
virtual Status archive_files() = 0;
|
||||
|
||||
virtual Status files_to_index(GroupFilesSchema&) = 0;
|
||||
virtual Status files_to_index(TableFilesSchema&) = 0;
|
||||
|
||||
virtual Status cleanup() = 0;
|
||||
virtual Status cleanup_ttl_files(uint16_t) = 0;
|
||||
|
||||
@ -48,8 +48,8 @@ struct TableFileSchema {
|
||||
long created_on;
|
||||
}; // TableFileSchema
|
||||
|
||||
typedef std::vector<TableFileSchema> GroupFilesSchema;
|
||||
typedef std::map<DateT, GroupFilesSchema> DatePartionedGroupFilesSchema;
|
||||
typedef std::vector<TableFileSchema> TableFilesSchema;
|
||||
typedef std::map<DateT, TableFilesSchema> DatePartionedTableFilesSchema;
|
||||
|
||||
} // namespace meta
|
||||
} // namespace engine
|
||||
|
||||
@ -106,7 +106,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
|
||||
group.group_id = group_id;
|
||||
auto status = impl.add_group(group);
|
||||
|
||||
meta::GroupFilesSchema files;
|
||||
meta::TableFilesSchema files;
|
||||
meta::TableFileSchema group_file;
|
||||
group_file.group_id = group.group_id;
|
||||
|
||||
@ -152,7 +152,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
|
||||
group.group_id = group_id;
|
||||
auto status = impl.add_group(group);
|
||||
|
||||
meta::GroupFilesSchema files;
|
||||
meta::TableFilesSchema files;
|
||||
meta::TableFileSchema group_file;
|
||||
group_file.group_id = group.group_id;
|
||||
|
||||
@ -222,13 +222,13 @@ TEST_F(MetaTest, GROUP_FILES_TEST) {
|
||||
status = impl_->update_group_file(group_file);
|
||||
}
|
||||
|
||||
meta::GroupFilesSchema files;
|
||||
meta::TableFilesSchema files;
|
||||
|
||||
status = impl_->files_to_index(files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(files.size(), to_index_files_cnt);
|
||||
|
||||
meta::DatePartionedGroupFilesSchema dated_files;
|
||||
meta::DatePartionedTableFilesSchema dated_files;
|
||||
status = impl_->files_to_merge(group.group_id, dated_files);
|
||||
ASSERT_TRUE(status.ok());
|
||||
ASSERT_EQ(dated_files[group_file.date].size(), raw_files_cnt);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user