diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp
index f7c8e986a2..fa4066e27c 100644
--- a/cpp/src/db/DBImpl.cpp
+++ b/cpp/src/db/DBImpl.cpp
@@ -366,6 +366,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
     meta::TableFileSchema table_file;
     table_file.table_id_ = table_id;
     table_file.date_ = date;
+    table_file.file_type_ = meta::TableFileSchema::NEW_MERGE;
     Status status = meta_ptr_->CreateTableFile(table_file);
 
     if (!status.ok()) {
@@ -526,7 +527,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
         meta::TableFileSchema table_file;
         table_file.table_id_ = file.table_id_;
         table_file.date_ = file.date_;
-        table_file.file_type_ = meta::TableFileSchema::INDEX; //for multi-db-path, distribute index file averagely to each path
+        table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path
         Status status = meta_ptr_->CreateTableFile(table_file);
         if (!status.ok()) {
             ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
@@ -558,15 +559,25 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
         auto to_remove = file;
         to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
 
-        meta::TableFilesSchema update_files = {to_remove, table_file};
-        meta_ptr_->UpdateTableFiles(update_files);
+        meta::TableFilesSchema update_files = {table_file, to_remove};
+        status = meta_ptr_->UpdateTableFiles(update_files);
+        if(status.ok()) {
+            ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
+                             << index->PhysicalSize() << " bytes"
+                             << " from file " << to_remove.file_id_;
 
-        ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
-                         << index->PhysicalSize() << " bytes"
-                         << " from file " << to_remove.file_id_;
+            if(options_.insert_cache_immediately_) {
+                index->Cache();
+            }
+        } else {
+            //failed to update meta, mark the new file as to_delete, don't delete old file
+            to_remove.file_type_ = meta::TableFileSchema::TO_INDEX;
+            status = meta_ptr_->UpdateTableFile(to_remove);
+            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << to_remove.file_id_ << " to to_index";
 
-        if(options_.insert_cache_immediately_) {
-            index->Cache();
+            table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
+            status = meta_ptr_->UpdateTableFile(table_file);
+            ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
         }
 
     } catch (std::exception& ex) {
diff --git a/cpp/src/db/DBMetaImpl.cpp b/cpp/src/db/DBMetaImpl.cpp
index 6149202123..23c308c1ec 100644
--- a/cpp/src/db/DBMetaImpl.cpp
+++ b/cpp/src/db/DBMetaImpl.cpp
@@ -302,19 +302,49 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
 Status DBMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
     has = false;
     try {
-        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_),
+        auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_,
+                                                     &TableFileSchema::file_type_),
                                              where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
                                                     or
                                                     c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW
                                                     or
+                                                    c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_MERGE
+                                                    or
+                                                    c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_INDEX
+                                                    or
                                                     c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX)
                                                    and
                                                    c(&TableFileSchema::table_id_) == table_id
                                              ));
         if (selected.size() >= 1) {
             has = true;
-        } else {
-            has = false;
+
+            int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0, to_index_count = 0;
+            for (auto &file : selected) {
+                switch (std::get<1>(file)) {
+                    case (int) TableFileSchema::RAW:
+                        raw_count++;
+                        break;
+                    case (int) TableFileSchema::NEW:
+                        new_count++;
+                        break;
+                    case (int) TableFileSchema::NEW_MERGE:
+                        new_merge_count++;
+                        break;
+                    case (int) TableFileSchema::NEW_INDEX:
+                        new_index_count++;
+                        break;
+                    case (int) TableFileSchema::TO_INDEX:
+                        to_index_count++;
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            ENGINE_LOG_DEBUG << "Table " << table_id << " currently has raw files:" << raw_count
+                             << " new files:" << new_count << " new_merge files:" << new_merge_count
+                             << " new_index files:" << new_index_count << " to_index files:" << to_index_count;
         }
 
     } catch (std::exception &e) {
@@ -389,7 +419,6 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
         MetricCollector metric;
 
         NextFileId(file_schema.file_id_);
-        file_schema.file_type_ = TableFileSchema::NEW;
         file_schema.dimension_ = table_schema.dimension_;
         file_schema.size_ = 0;
         file_schema.created_on_ = utils::GetMicroSecTimeStamp();
@@ -958,7 +987,11 @@ Status DBMetaImpl::CleanUp() {
         std::lock_guard meta_lock(meta_mutex_);
 
         auto files = ConnectorPtr->select(columns(&TableFileSchema::id_),
-                                          where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW));
+                                          where(c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW
+                                                or
+                                                c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_INDEX
+                                                or
+                                                c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW_MERGE));
 
         auto commited = ConnectorPtr->transaction([&]() mutable {
             for (auto &file : files) {
diff --git a/cpp/src/db/MetaTypes.h b/cpp/src/db/MetaTypes.h
index 641712e9d5..d7039685e3 100644
--- a/cpp/src/db/MetaTypes.h
+++ b/cpp/src/db/MetaTypes.h
@@ -43,6 +43,8 @@ struct TableFileSchema {
         TO_INDEX,
         INDEX,
         TO_DELETE,
+        NEW_MERGE,
+        NEW_INDEX,
     } FILE_TYPE;
 
     size_t id_ = 0;
diff --git a/cpp/src/db/MySQLMetaImpl.cpp b/cpp/src/db/MySQLMetaImpl.cpp
index 14879d81fe..373d1b5507 100644
--- a/cpp/src/db/MySQLMetaImpl.cpp
+++ b/cpp/src/db/MySQLMetaImpl.cpp
@@ -404,6 +404,8 @@ Status MySQLMetaImpl::HasNonIndexFiles(const std::string &table_id, bool &has) {
                              "WHERE table_id = " << quote << table_id << " AND " <<
                              "(file_type = " << std::to_string(TableFileSchema::RAW) << " OR " <<
                              "file_type = " << std::to_string(TableFileSchema::NEW) << " OR " <<
+                             "file_type = " << std::to_string(TableFileSchema::NEW_MERGE) << " OR " <<
+                             "file_type = " << std::to_string(TableFileSchema::NEW_INDEX) << " OR " <<
                              "file_type = " << std::to_string(TableFileSchema::TO_INDEX) << ")) " <<
                              "AS " << quote << "check" << ";";
 
@@ -706,7 +708,6 @@ Status MySQLMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
         MetricCollector metric;
 
         NextFileId(file_schema.file_id_);
-        file_schema.file_type_ = TableFileSchema::NEW;
         file_schema.dimension_ = table_schema.dimension_;
         file_schema.size_ = 0;
         file_schema.created_on_ = utils::GetMicroSecTimeStamp();
@@ -1701,7 +1702,10 @@ Status MySQLMetaImpl::CleanUp() {
 
             if (!res.empty()) {
                 ENGINE_LOG_DEBUG << "Remove table file type as NEW";
-                cleanUpQuery << "DELETE FROM TableFiles WHERE file_type = " << std::to_string(TableFileSchema::NEW) << ";";
+                cleanUpQuery << "DELETE FROM TableFiles WHERE file_type IN ("
+                             << std::to_string(TableFileSchema::NEW) << ","
+                             << std::to_string(TableFileSchema::NEW_MERGE) << ","
+                             << std::to_string(TableFileSchema::NEW_INDEX) << ");";
 
                 ENGINE_LOG_DEBUG << "MySQLMetaImpl::CleanUp: " << cleanUpQuery.str();
 
diff --git a/cpp/src/db/Utils.cpp b/cpp/src/db/Utils.cpp
index aa533559bd..933ca06bd5 100644
--- a/cpp/src/db/Utils.cpp
+++ b/cpp/src/db/Utils.cpp
@@ -34,7 +34,7 @@ std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::T
     std::string target_path = options.path;
     uint64_t index = 0;
 
-    if(meta::TableFileSchema::INDEX == table_file.file_type_) {
+    if(meta::TableFileSchema::NEW_INDEX == table_file.file_type_) {
         // index file is large file and to be persisted permanently
         // we need to distribute index files to each db_path averagely
         // round robin according to a file counter
diff --git a/cpp/src/db/scheduler/task/SearchTask.cpp b/cpp/src/db/scheduler/task/SearchTask.cpp
index e696faaed0..79baeeafe9 100644
--- a/cpp/src/db/scheduler/task/SearchTask.cpp
+++ b/cpp/src/db/scheduler/task/SearchTask.cpp
@@ -22,7 +22,7 @@ static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;
 bool NeedParallelReduce(uint64_t nq, uint64_t topk) {
     server::ServerConfig &config = server::ServerConfig::GetInstance();
     server::ConfigNode& db_config = config.GetConfig(server::CONFIG_DB);
-    bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, true);
+    bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
     if(!need_parallel) {
         return false;
     }