mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
parent
fd24d6f96c
commit
5d2d34791d
@ -68,7 +68,7 @@ TraverseFiles(const meta::DatePartionedTableFilesSchema& date_files, meta::Table
|
||||
} // namespace
|
||||
|
||||
DBImpl::DBImpl(const DBOptions& options)
|
||||
: options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) {
|
||||
: options_(options), initialized_(false), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) {
|
||||
meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
|
||||
mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
|
||||
Start();
|
||||
@ -83,12 +83,12 @@ DBImpl::~DBImpl() {
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
Status
|
||||
DBImpl::Start() {
|
||||
if (!shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (initialized_.load(std::memory_order_acquire)) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// ENGINE_LOG_TRACE << "DB service start";
|
||||
shutting_down_.store(false, std::memory_order_release);
|
||||
initialized_.store(true, std::memory_order_release);
|
||||
|
||||
// for distribute version, some nodes are read only
|
||||
if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) {
|
||||
@ -101,11 +101,10 @@ DBImpl::Start() {
|
||||
|
||||
Status
|
||||
DBImpl::Stop() {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
shutting_down_.store(true, std::memory_order_release);
|
||||
initialized_.store(false, std::memory_order_release);
|
||||
|
||||
// makesure all memory data serialized
|
||||
std::set<std::string> sync_table_ids;
|
||||
@ -129,7 +128,7 @@ DBImpl::DropAll() {
|
||||
|
||||
Status
|
||||
DBImpl::CreateTable(meta::TableSchema& table_schema) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -140,7 +139,7 @@ DBImpl::CreateTable(meta::TableSchema& table_schema) {
|
||||
|
||||
Status
|
||||
DBImpl::DropTable(const std::string& table_id, const meta::DatesT& dates) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -149,7 +148,7 @@ DBImpl::DropTable(const std::string& table_id, const meta::DatesT& dates) {
|
||||
|
||||
Status
|
||||
DBImpl::DescribeTable(meta::TableSchema& table_schema) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -160,7 +159,7 @@ DBImpl::DescribeTable(meta::TableSchema& table_schema) {
|
||||
|
||||
Status
|
||||
DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -169,7 +168,7 @@ DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
|
||||
|
||||
Status
|
||||
DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -189,7 +188,7 @@ DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
|
||||
|
||||
Status
|
||||
DBImpl::PreloadTable(const std::string& table_id) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -248,7 +247,7 @@ DBImpl::PreloadTable(const std::string& table_id) {
|
||||
|
||||
Status
|
||||
DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -257,7 +256,7 @@ DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
|
||||
|
||||
Status
|
||||
DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -267,7 +266,7 @@ DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
|
||||
Status
|
||||
DBImpl::CreatePartition(const std::string& table_id, const std::string& partition_name,
|
||||
const std::string& partition_tag) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -276,7 +275,7 @@ DBImpl::CreatePartition(const std::string& table_id, const std::string& partitio
|
||||
|
||||
Status
|
||||
DBImpl::DropPartition(const std::string& partition_name) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -294,7 +293,7 @@ DBImpl::DropPartition(const std::string& partition_name) {
|
||||
|
||||
Status
|
||||
DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& partition_tag) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -310,7 +309,7 @@ DBImpl::DropPartitionByTag(const std::string& table_id, const std::string& parti
|
||||
|
||||
Status
|
||||
DBImpl::ShowPartitions(const std::string& table_id, std::vector<meta::TableSchema>& partition_schema_array) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -321,7 +320,7 @@ Status
|
||||
DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_tag, uint64_t n, const float* vectors,
|
||||
IDNumbers& vector_ids) {
|
||||
// ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -346,7 +345,7 @@ DBImpl::InsertVectors(const std::string& table_id, const std::string& partition_
|
||||
|
||||
Status
|
||||
DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -389,7 +388,7 @@ DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
|
||||
|
||||
Status
|
||||
DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -398,7 +397,7 @@ DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
|
||||
|
||||
Status
|
||||
DBImpl::DropIndex(const std::string& table_id) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -410,7 +409,7 @@ Status
|
||||
DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string& table_id,
|
||||
const std::vector<std::string>& partition_tags, uint64_t k, uint64_t nq, uint64_t nprobe,
|
||||
const float* vectors, ResultIds& result_ids, ResultDistances& result_distances) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -427,7 +426,7 @@ DBImpl::Query(const std::shared_ptr<server::Context>& context, const std::string
|
||||
ResultDistances& result_distances) {
|
||||
auto query_ctx = context->Child("Query");
|
||||
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -476,7 +475,7 @@ DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std
|
||||
ResultDistances& result_distances) {
|
||||
auto query_ctx = context->Child("Query by file id");
|
||||
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -512,7 +511,7 @@ DBImpl::QueryByFileID(const std::shared_ptr<server::Context>& context, const std
|
||||
|
||||
Status
|
||||
DBImpl::Size(uint64_t& result) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
return SHUTDOWN_ERROR;
|
||||
}
|
||||
|
||||
@ -566,7 +565,7 @@ DBImpl::BackgroundTimerTask() {
|
||||
Status status;
|
||||
server::SystemInfo::GetInstance().Init();
|
||||
while (true) {
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
WaitMergeFileFinish();
|
||||
WaitBuildIndexFinish();
|
||||
|
||||
@ -802,7 +801,7 @@ DBImpl::BackgroundMergeFiles(const std::string& table_id) {
|
||||
MergeFiles(table_id, kv.first, kv.second);
|
||||
status = ongoing_files_checker_.UnmarkOngoingFiles(files);
|
||||
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id;
|
||||
break;
|
||||
}
|
||||
@ -822,7 +821,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
|
||||
ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
|
||||
}
|
||||
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
if (!initialized_.load(std::memory_order_acquire)) {
|
||||
ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action";
|
||||
break;
|
||||
}
|
||||
|
||||
@ -187,7 +187,7 @@ class DBImpl : public DB {
|
||||
private:
|
||||
const DBOptions options_;
|
||||
|
||||
std::atomic<bool> shutting_down_;
|
||||
std::atomic<bool> initialized_;
|
||||
|
||||
std::thread bg_timer_thread_;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user