mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-05 02:12:48 +08:00
Merge remote-tracking branch 'upstream/branch-0.3.1' into without-grpc-version
Former-commit-id: c879bb4efe166bb643d8a990606d6585470a22c3
This commit is contained in:
commit
50d4411fea
@ -366,6 +366,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = table_id;
|
||||
table_file.date_ = date;
|
||||
table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
|
||||
Status status = meta_ptr_->CreateTableFile(table_file);
|
||||
|
||||
if (!status.ok()) {
|
||||
@ -526,7 +527,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
|
||||
meta::TableFileSchema table_file;
|
||||
table_file.table_id_ = file.table_id_;
|
||||
table_file.date_ = file.date_;
|
||||
table_file.file_type_ = meta::TableFileSchema::INDEX; //for multi-db-path, distribute index file averagely to each path
|
||||
table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
|
||||
Status status = meta_ptr_->CreateTableFile(table_file);
|
||||
if (!status.ok()) {
|
||||
ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
|
||||
@ -558,15 +559,25 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
|
||||
auto to_remove = file;
|
||||
to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
|
||||
|
||||
meta::TableFilesSchema update_files = {to_remove, table_file};
|
||||
meta_ptr_->UpdateTableFiles(update_files);
|
||||
meta::TableFilesSchema update_files = {table_file, to_remove};
|
||||
status = meta_ptr_->UpdateTableFiles(update_files);
|
||||
if(status.ok()) {
|
||||
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
|
||||
<< index->PhysicalSize() << " bytes"
|
||||
<< " from file " << to_remove.file_id_;
|
||||
|
||||
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
|
||||
<< index->PhysicalSize() << " bytes"
|
||||
<< " from file " << to_remove.file_id_;
|
||||
if(options_.insert_cache_immediately_) {
|
||||
index->Cache();
|
||||
}
|
||||
} else {
|
||||
//failed to update meta, mark the new file as to_delete, don't delete old file
|
||||
to_remove.file_type_ = meta::TableFileSchema::TO_INDEX;
|
||||
status = meta_ptr_->UpdateTableFile(to_remove);
|
||||
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << to_remove.file_id_ << " to to_index";
|
||||
|
||||
if(options_.insert_cache_immediately_) {
|
||||
index->Cache();
|
||||
table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
|
||||
status = meta_ptr_->UpdateTableFile(table_file);
|
||||
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
|
||||
}
|
||||
|
||||
} catch (std::exception& ex) {
|
||||
|
||||
@ -302,19 +302,49 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
|
||||
Status DBMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
|
||||
has = false;
|
||||
try {
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_),
|
||||
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
|
||||
&TableFileSchema::file_type_),
|
||||
where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_MERGE
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_INDEX
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX)
|
||||
and c(&TableFileSchema::table_id_) == table_id
|
||||
));
|
||||
|
||||
if (selected.size() >= 1) {
|
||||
has = true;
|
||||
} else {
|
||||
has = false;
|
||||
|
||||
int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0, to_index_count = 0;
|
||||
for (auto &file : selected) {
|
||||
switch (std::get<1>(file)) {
|
||||
case (int) TableFileSchema::RAW:
|
||||
raw_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW:
|
||||
new_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW_MERGE:
|
||||
new_merge_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::NEW_INDEX:
|
||||
new_index_count++;
|
||||
break;
|
||||
case (int) TableFileSchema::TO_INDEX:
|
||||
to_index_count++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
|
||||
<< " new files:" << new_count << " new_merge files:" << new_merge_count
|
||||
<< " new_index files:" << new_index_count << " to_index files:" << to_index_count;
|
||||
}
|
||||
|
||||
} catch (std::exception &e) {
|
||||
@ -389,7 +419,6 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
MetricCollector metric;
|
||||
|
||||
NextFileId(file_schema.file_id_);
|
||||
file_schema.file_type_ = TableFileSchema::NEW;
|
||||
file_schema.dimension_ = table_schema.dimension_;
|
||||
file_schema.size_ = 0;
|
||||
file_schema.created_on_ = utils::GetMicroSecTimeStamp();
|
||||
@ -958,7 +987,11 @@ Status DBMetaImpl::CleanUp() {
|
||||
std::lock_guard<std::mutex> meta_lock(meta_mutex_);
|
||||
|
||||
auto files = ConnectorPtr->select(columns(&TableFileSchema::id_),
|
||||
where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));
|
||||
where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_INDEX
|
||||
or
|
||||
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_MERGE));
|
||||
|
||||
auto commited = ConnectorPtr->transaction([&]() mutable {
|
||||
for (auto &file : files) {
|
||||
|
||||
@ -43,6 +43,8 @@ struct TableFileSchema {
|
||||
TO_INDEX,
|
||||
INDEX,
|
||||
TO_DELETE,
|
||||
NEW_MERGE,
|
||||
NEW_INDEX,
|
||||
} FILE_TYPE;
|
||||
|
||||
size_t id_ = 0;
|
||||
|
||||
@ -404,6 +404,8 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
|
||||
"WHERE table_id = " << quote << table_id << " AND " <<
|
||||
"(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::NEW) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::NEW_MERGE) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::NEW_INDEX) << " OR " <<
|
||||
"file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ")) " <<
|
||||
"AS " << quote << "check" << ";";
|
||||
|
||||
@ -706,7 +708,6 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
|
||||
MetricCollector metric;
|
||||
|
||||
NextFileId(file_schema.file_id_);
|
||||
file_schema.file_type_ = TableFileSchema::NEW;
|
||||
file_schema.dimension_ = table_schema.dimension_;
|
||||
file_schema.size_ = 0;
|
||||
file_schema.created_on_ = utils::GetMicroSecTimeStamp();
|
||||
@ -1701,7 +1702,10 @@ Status MySQLMetaImpl::CleanUp() {
|
||||
|
||||
if (!res.empty()) {
|
||||
ENGINE_LOG_DEBUG << "Remove table file type as NEW";
|
||||
cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";";
|
||||
cleanUpQuery << "DELETE FROM TableFiles WHERE file_type IN ("
|
||||
<< std::to_string(TableFileSchema::NEW) << ","
|
||||
<< std::to_string(TableFileSchema::NEW_MERGE) << ","
|
||||
<< std::to_string(TableFileSchema::NEW_INDEX) << ");";
|
||||
|
||||
ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
|
||||
|
||||
|
||||
@ -34,7 +34,7 @@ std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::T
|
||||
std::string target_path = options.path;
|
||||
uint64_t index = 0;
|
||||
|
||||
if(meta::TableFileSchema::INDEX == table_file.file_type_) {
|
||||
if(meta::TableFileSchema::NEW_INDEX == table_file.file_type_) {
|
||||
// index file is large file and to be persisted permanently
|
||||
// we need to distribute index files to each db_path averagely
|
||||
// round robin according to a file counter
|
||||
|
||||
@ -22,7 +22,7 @@ static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;
|
||||
bool NeedParallelReduce(uint64_t nq, uint64_t topk) {
|
||||
server::ServerConfig &config = server::ServerConfig::GetInstance();
|
||||
server::ConfigNode& db_config = config.GetConfig(server::CONFIG_DB);
|
||||
bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, true);
|
||||
bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
|
||||
if(!need_parallel) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user