diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index ceac415eda..a1a305380b 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -68,7 +68,7 @@ TraverseFiles(const meta::DatePartionedTableFilesSchema& date_files, meta::Table } // namespace DBImpl::DBImpl(const DBOptions& options) - : options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { + : options_(options), initialized_(false), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_); mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_); Start(); @@ -83,12 +83,12 @@ DBImpl::~DBImpl() { /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// Status DBImpl::Start() { - if (!shutting_down_.load(std::memory_order_acquire)) { + if (initialized_.load(std::memory_order_acquire)) { return Status::OK(); } // ENGINE_LOG_TRACE << "DB service start"; - shutting_down_.store(false, std::memory_order_release); + initialized_.store(true, std::memory_order_release); // for distribute version, some nodes are read only if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) { @@ -101,11 +101,10 @@ DBImpl::Start() { Status DBImpl::Stop() { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return Status::OK(); } - - shutting_down_.store(true, std::memory_order_release); + initialized_.store(false, std::memory_order_release); // makesure all memory data serialized std::set sync_table_ids; @@ -129,7 +128,7 @@ DBImpl::DropAll() { Status DBImpl::CreateTable(meta::TableSchema& table_schema) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -140,7 +139,7 @@ DBImpl::CreateTable(meta::TableSchema& table_schema) { Status DBImpl::DropTable(const std::string& table_id, const meta::DatesT& dates) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -149,7 +148,7 @@ DBImpl::DropTable(const std::string& table_id, const meta::DatesT& dates) { Status DBImpl::DescribeTable(meta::TableSchema& table_schema) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -160,7 +159,7 @@ DBImpl::DescribeTable(meta::TableSchema& table_schema) { Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -169,7 +168,7 @@ DBImpl::HasTable(const std::string& table_id, bool& has_or_not) { Status DBImpl::AllTables(std::vector& table_schema_array) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -189,7 +188,7 @@ DBImpl::AllTables(std::vector& table_schema_array) { Status DBImpl::PreloadTable(const std::string& table_id) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -248,7 +247,7 @@ DBImpl::PreloadTable(const std::string& table_id) { Status DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -257,7 +256,7 @@ DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) { Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -267,7 +266,7 @@ DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) { Status DBImpl::CreatePartition(const std::string& table_id, const std::string& partition_name, const std::string& partition_tag) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -276,7 +275,7 @@ DBImpl::CreatePartition(const std::string& table_id, const std::string& partitio Status DBImpl::DropPartition(const std::string& partition_name) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -294,7 +293,7 @@ DBImpl::DropPartition(const std::string& partition_name) { Status DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -310,7 +309,7 @@ DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& parti Status DBImpl::ShowPartitions(const std::string& table_id, std::vector& partition_schema_array) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -321,7 +320,7 @@ Status DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors, IDNumbers& vector_ids) { // ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache"; - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -346,7 +345,7 @@ DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -389,7 +388,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -398,7 +397,7 @@ DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) { Status DBImpl::DropIndex(const std::string& table_id) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -410,7 +409,7 @@ Status DBImpl::Query(const std::shared_ptr& context, const std::string& table_id, const std::vector& partition_tags, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -427,7 +426,7 @@ DBImpl::Query(const std::shared_ptr& context, const std::string ResultDistances& result_distances) { auto query_ctx = context->Child("Query"); - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -476,7 +475,7 @@ DBImpl::QueryByFileID(const std::shared_ptr& context, const std ResultDistances& result_distances) { auto query_ctx = context->Child("Query by file id"); - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -512,7 +511,7 @@ DBImpl::QueryByFileID(const std::shared_ptr& context, const std Status DBImpl::Size(uint64_t& result) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -566,7 +565,7 @@ DBImpl::BackgroundTimerTask() { Status status; server::SystemInfo::GetInstance().Init(); while (true) { - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { WaitMergeFileFinish(); WaitBuildIndexFinish(); @@ -802,7 +801,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) { MergeFiles(table_id, kv.first, kv.second); status = ongoing_files_checker_.UnmarkOngoingFiles(files); - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id; break; } @@ -822,7 +821,7 @@ DBImpl::BackgroundCompaction(std::set table_ids) { ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString(); } - if (shutting_down_.load(std::memory_order_acquire)) { + if (!initialized_.load(std::memory_order_acquire)) { ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action"; break; } diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index c8b4fe4f73..3c9161f4d9 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -187,7 +187,7 @@ class DBImpl : public DB { private: const DBOptions options_; - std::atomic shutting_down_; + std::atomic initialized_; std::thread bg_timer_thread_;