mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-04 01:42:15 +08:00
Merge branch 'branch-0.3.1' into 'branch-0.3.1'
MS-251 Build/Merge background thread error handling See merge request megasearch/milvus!289 Former-commit-id: a594c6111e15dcc556e9f2bf238606047b1bb7b2
This commit is contained in:
commit
b93df27e5c
@ -362,6 +362,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = table_id;
|
||||
table_file.date_ = date;
|
||||
table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
|
||||
Status status = meta_ptr_->CreateTableFile(table_file);
|
||||
|
||||
if (!status.ok()) {
|
||||
@ -522,7 +523,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = file.table_id_;
|
||||
table_file.date_ = file.date_;
|
||||
table_file.file_type_ = meta::TableFileSchema::INDEX; //for multi-db-path, distribute index file averagely to each path
|
||||
table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
|
||||
Status status = meta_ptr_->CreateTableFile(table_file);
|
||||
if (!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
|
||||
|
||||
@ -302,19 +302,49 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
Status DBMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
|
||||
has = false;
|
||||
try {
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_),
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::file_type_),
|
||||
where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_MERGE
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_INDEX
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX)
|
||||
and c(&TableFileSchema::table_id_) == table_id
|
||||
));
|
||||
|
||||
if (selected.size() >= 1) {
|
||||
has = true;
|
||||
} else {
|
||||
has = false;
|
||||
|
||||
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0, to_index_count = 0;
|
||||
for (auto &file : selected) {
|
||||
switch (std::get<1>(file)) {
|
||||
case (int) TableFileSchema::RAW:
|
||||
raw_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW:
|
||||
new_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW_MERGE:
|
||||
new_merge_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW_INDEX:
|
||||
new_index_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::TO_INDEX:
|
||||
to_index_count++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
|
||||
<< " new files:" << new_count << " new_merge files:" << new_merge_count
|
||||
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count;
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
@ -389,7 +419,6 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
MetricCollector metric;
|
||||
|
||||
NextFileId(file_schema.file_id_);
|
||||
file_schema.file_type_ = TableFileSchema::NEW;
|
||||
file_schema.dimension_ = table_schema.dimension_;
|
||||
file_schema.size_ = 0;
|
||||
file_schema.created_on_ = utils::GetMicroSecTimeStamp();
|
||||
@ -958,7 +987,11 @@ Status DBMetaImpl::CleanUp() {
|
||||
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
||||
|
||||
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_),
|
||||
where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));
|
||||
where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_INDEX
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_MERGE));
|
||||
|
||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||
for (auto &file : files) {
|
||||
|
||||
@ -43,6 +43,8 @@ struct TableFileSchema {
|
||||
TO_INDEX,
|
||||
INDEX,
|
||||
TO_DELETE,
|
||||
NEW_MERGE,
|
||||
NEW_INDEX,
|
||||
} FILE_TYPE;
|
||||
|
||||
size_t id_ = 0;
|
||||
|
||||
@ -404,6 +404,8 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
|
||||
"WHERE table_id = " << quote << table_id << " AND " <<
|
||||
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::NEW) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::NEW_MERGE) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::NEW_INDEX) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ")) " <<
|
||||
"AS " << quote << "check" << ";";
|
||||
|
||||
@ -706,7 +708,6 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
MetricCollector metric;
|
||||
|
||||
NextFileId(file_schema.file_id_);
|
||||
file_schema.file_type_ = TableFileSchema::NEW;
|
||||
file_schema.dimension_ = table_schema.dimension_;
|
||||
file_schema.size_ = 0;
|
||||
file_schema.created_on_ = utils::GetMicroSecTimeStamp();
|
||||
@ -1701,7 +1702,10 @@ Status MySQLMetaImpl::CleanUp() {
|
||||
|
||||
if (!res.empty()) {
|
||||
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
|
||||
cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";";
|
||||
cleanUpQuery << "DELETE FROM TableFiles WHERE file_type IN ("
|
||||
<< std::to_string(TableFileSchema::NEW) << ","
|
||||
<< std::to_string(TableFileSchema::NEW_MERGE) << ","
|
||||
<< std::to_string(TableFileSchema::NEW_INDEX) << ");";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
|
||||
|
||||
|
||||
@ -34,7 +34,7 @@ std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::T
|
||||
std::string target_path = options.path;
|
||||
uint64_t index = 0;
|
||||
|
||||
if(meta::TableFileSchema::INDEX == table_file.file_type_) {
|
||||
if(meta::TableFileSchema::NEW_INDEX == table_file.file_type_) {
|
||||
// index file is large file and to be persisted permanently
|
||||
// we need to distribute index files to each db_path averagely
|
||||
// round robin according to a file counter
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user