From 0fd4b244c056058390d446f8272a6fa6ea882077 Mon Sep 17 00:00:00 2001 From: groot Date: Sat, 4 Apr 2020 21:14:30 +0800 Subject: [PATCH] refine dbimpl (#1869) * #1827 Combine request target vectors exceed max nq Signed-off-by: groot * refine dbimpl Signed-off-by: groot * refine dbimpl Signed-off-by: groot * fix unittest failure Signed-off-by: groot Co-authored-by: Jin Hai --- core/conf/server_cpu_config.template | 5 + core/conf/server_gpu_config.template | 5 + core/src/db/DBImpl.cpp | 176 ++++++++++++++------------- core/src/db/DBImpl.h | 33 +++-- core/unittest/db/test_delete.cpp | 2 +- core/unittest/db/utils.cpp | 10 +- 6 files changed, 134 insertions(+), 97 deletions(-) diff --git a/core/conf/server_cpu_config.template b/core/conf/server_cpu_config.template index 7ddb71ea4a..2a3d52e119 100644 --- a/core/conf/server_cpu_config.template +++ b/core/conf/server_cpu_config.template @@ -47,9 +47,14 @@ server_config: # | '*' means preload all existing tables (single-quote or | | | # | double-quote required). | | | #----------------------+------------------------------------------------------------+------------+-----------------+ +# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) | +# | flushes data to disk. | | | +# | 0 means disable the regular flush. | | | +#----------------------+------------------------------------------------------------+------------+-----------------+ db_config: backend_url: sqlite://:@:/ preload_table: + auto_flush_interval: 1 #----------------------+------------------------------------------------------------+------------+-----------------+ # Storage Config | Description | Type | Default | diff --git a/core/conf/server_gpu_config.template b/core/conf/server_gpu_config.template index 804f332482..593d830c38 100644 --- a/core/conf/server_gpu_config.template +++ b/core/conf/server_gpu_config.template @@ -47,9 +47,14 @@ server_config: # | '*' means preload all existing tables (single-quote or | | | # | double-quote required). | | | #----------------------+------------------------------------------------------------+------------+-----------------+ +# auto_flush_interval | The interval, in seconds, at which Milvus automatically | Integer | 1 (s) | +# | flushes data to disk. | | | +# | 0 means disable the regular flush. | | | +#----------------------+------------------------------------------------------------+------------+-----------------+ db_config: backend_url: sqlite://:@:/ preload_table: + auto_flush_interval: 1 #----------------------+------------------------------------------------------------+------------+-----------------+ # Storage Config | Description | Type | Default | diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 2d674dddad..3191c88441 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -53,10 +53,8 @@ namespace milvus { namespace engine { namespace { - -constexpr uint64_t METRIC_ACTION_INTERVAL = 1; -constexpr uint64_t COMPACT_ACTION_INTERVAL = 1; -constexpr uint64_t INDEX_ACTION_INTERVAL = 1; +constexpr uint64_t BACKGROUND_METRIC_INTERVAL = 1; +constexpr uint64_t BACKGROUND_INDEX_INTERVAL = 1; static const Status SHUTDOWN_ERROR = Status(DB_ERROR, "Milvus server is shutdown!"); @@ -125,18 +123,26 @@ DBImpl::Start() { // for distribute version, some nodes are read only if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) { - // background thread - bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalTask, this); + // background wal thread + bg_wal_thread_ = std::thread(&DBImpl::BackgroundWalThread, this); } - } else { // for distribute version, some nodes are read only if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) { - // ENGINE_LOG_TRACE << "StartTimerTasks"; - bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this); + // background flush thread + bg_flush_thread_ = std::thread(&DBImpl::BackgroundFlushThread, this); } } + // for distribute version, some nodes are read only + if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) { + // background build index thread + bg_index_thread_ = std::thread(&DBImpl::BackgroundIndexThread, this); + } + + // background metric thread + bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this); + return Status::OK(); } @@ -150,24 +156,30 @@ DBImpl::Stop() { if (options_.mode_ != DBOptions::MODE::CLUSTER_READONLY) { if (options_.wal_enable_) { - // wait flush merge/buildindex finish - bg_task_swn_.Notify(); + // wait wal thread finish + swn_wal_.Notify(); bg_wal_thread_.join(); - } else { - // flush all + // flush all without merge wal::MXLogRecord record; record.type = wal::MXLogType::Flush; ExecWalRecord(record); - // wait merge/buildindex finish - bg_task_swn_.Notify(); - bg_timer_thread_.join(); + // wait flush thread finish + swn_flush_.Notify(); + bg_flush_thread_.join(); } + swn_index_.Notify(); + bg_index_thread_.join(); + meta_ptr_->CleanUpShadowFiles(); } + // wait metric thread exit + swn_metric_.Notify(); + bg_metric_thread_.join(); + // ENGINE_LOG_TRACE << "DB service stop"; return Status::OK(); } @@ -512,8 +524,7 @@ DBImpl::InsertVectors(const std::string& collection_id, const std::string& parti } else if (!vectors.binary_data_.empty()) { wal_mgr_->Insert(collection_id, partition_tag, vectors.id_array_, vectors.binary_data_); } - bg_task_swn_.Notify(); - + swn_wal_.Notify(); } else { wal::MXLogRecord record; record.lsn = 0; // need to get from meta ? @@ -555,8 +566,7 @@ DBImpl::DeleteVectors(const std::string& collection_id, IDNumbers vector_ids) { Status status; if (options_.wal_enable_) { wal_mgr_->DeleteById(collection_id, vector_ids); - bg_task_swn_.Notify(); - + swn_wal_.Notify(); } else { wal::MXLogRecord record; record.lsn = 0; // need to get from meta ? @@ -593,19 +603,14 @@ DBImpl::Flush(const std::string& collection_id) { if (options_.wal_enable_) { ENGINE_LOG_DEBUG << "WAL flush"; auto lsn = wal_mgr_->Flush(collection_id); - ENGINE_LOG_DEBUG << "wal_mgr_->Flush"; if (lsn != 0) { - bg_task_swn_.Notify(); - flush_task_swn_.Wait(); - ENGINE_LOG_DEBUG << "flush_task_swn_.Wait()"; + swn_wal_.Notify(); + flush_req_swn_.Wait(); } } else { ENGINE_LOG_DEBUG << "MemTable flush"; - wal::MXLogRecord record; - record.type = wal::MXLogType::Flush; - record.collection_id = collection_id; - status = ExecWalRecord(record); + InternalFlush(collection_id); } ENGINE_LOG_DEBUG << "End flush collection: " << collection_id; @@ -626,14 +631,12 @@ DBImpl::Flush() { ENGINE_LOG_DEBUG << "WAL flush"; auto lsn = wal_mgr_->Flush(); if (lsn != 0) { - bg_task_swn_.Notify(); - flush_task_swn_.Wait(); + swn_wal_.Notify(); + flush_req_swn_.Wait(); } } else { ENGINE_LOG_DEBUG << "MemTable flush"; - wal::MXLogRecord record; - record.type = wal::MXLogType::Flush; - status = ExecWalRecord(record); + InternalFlush(); } ENGINE_LOG_DEBUG << "End flush all collections"; @@ -1236,7 +1239,7 @@ DBImpl::QueryAsync(const std::shared_ptr& context, const meta:: } void -DBImpl::BackgroundTimerTask() { +DBImpl::BackgroundIndexThread() { server::SystemInfo::GetInstance().Init(); while (true) { if (!initialized_.load(std::memory_order_acquire)) { @@ -1247,14 +1250,9 @@ DBImpl::BackgroundTimerTask() { break; } - if (options_.auto_flush_interval_ > 0) { - bg_task_swn_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_)); - } else { - bg_task_swn_.Wait(); - } + swn_index_.Wait_For(std::chrono::seconds(BACKGROUND_INDEX_INTERVAL)); - StartMetricTask(); - StartMergeTask(); + WaitMergeFileFinish(); StartBuildIndexTask(); } } @@ -1281,13 +1279,7 @@ DBImpl::WaitBuildIndexFinish() { void DBImpl::StartMetricTask() { - static uint64_t metric_clock_tick = 0; - ++metric_clock_tick; - if (metric_clock_tick % METRIC_ACTION_INTERVAL != 0) { - return; - } - - server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL); + server::Metrics::GetInstance().KeepingAliveCounterIncrement(BACKGROUND_METRIC_INTERVAL); int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage(); int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity(); fiu_do_on("DBImpl.StartMetricTask.InvalidTotalCache", cache_total = 0); @@ -1317,16 +1309,6 @@ DBImpl::StartMetricTask() { void DBImpl::StartMergeTask() { - static uint64_t compact_clock_tick = 0; - ++compact_clock_tick; - if (compact_clock_tick % COMPACT_ACTION_INTERVAL != 0) { - return; - } - - if (!options_.wal_enable_) { - Flush(); - } - // ENGINE_LOG_DEBUG << "Begin StartMergeTask"; // merge task has been finished? { @@ -1514,13 +1496,7 @@ DBImpl::BackgroundMerge(std::set collection_ids) { } void -DBImpl::StartBuildIndexTask(bool force) { - static uint64_t index_clock_tick = 0; - ++index_clock_tick; - if (!force && (index_clock_tick % INDEX_ACTION_INTERVAL != 0)) { - return; - } - +DBImpl::StartBuildIndexTask() { // build index has been finished? { std::lock_guard lck(index_result_mutex_); @@ -1992,7 +1968,17 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { } void -DBImpl::BackgroundWalTask() { +DBImpl::InternalFlush(const std::string& collection_id) { + wal::MXLogRecord record; + record.type = wal::MXLogType::Flush; + record.collection_id = collection_id; + ExecWalRecord(record); + + StartMergeTask(); +} + +void +DBImpl::BackgroundWalThread() { server::SystemInfo::GetInstance().Init(); std::chrono::system_clock::time_point next_auto_flush_time; @@ -2003,26 +1989,15 @@ DBImpl::BackgroundWalTask() { next_auto_flush_time = get_next_auto_flush_time(); } - wal::MXLogRecord record; - - auto auto_flush = [&]() { - record.type = wal::MXLogType::Flush; - record.collection_id.clear(); - ExecWalRecord(record); - - StartMetricTask(); - StartMergeTask(); - StartBuildIndexTask(); - }; - while (true) { if (options_.auto_flush_interval_ > 0) { if (std::chrono::system_clock::now() >= next_auto_flush_time) { - auto_flush(); + InternalFlush(); next_auto_flush_time = get_next_auto_flush_time(); } } + wal::MXLogRecord record; auto error_code = wal_mgr_->GetNextRecord(record); if (error_code != WAL_SUCCESS) { ENGINE_LOG_ERROR << "WAL background GetNextRecord error"; @@ -2032,8 +2007,8 @@ DBImpl::BackgroundWalTask() { if (record.type != wal::MXLogType::None) { ExecWalRecord(record); if (record.type == wal::MXLogType::Flush) { - // user req flush - flush_task_swn_.Notify(); + // notify flush request to return + flush_req_swn_.Notify(); // if user flush all manually, update auto flush also if (record.collection_id.empty() && options_.auto_flush_interval_ > 0) { @@ -2043,7 +2018,8 @@ DBImpl::BackgroundWalTask() { } else { if (!initialized_.load(std::memory_order_acquire)) { - auto_flush(); + InternalFlush(); + flush_req_swn_.Notify(); WaitMergeFileFinish(); WaitBuildIndexFinish(); ENGINE_LOG_DEBUG << "WAL background thread exit"; @@ -2051,14 +2027,46 @@ DBImpl::BackgroundWalTask() { } if (options_.auto_flush_interval_ > 0) { - bg_task_swn_.Wait_Until(next_auto_flush_time); + swn_wal_.Wait_Until(next_auto_flush_time); } else { - bg_task_swn_.Wait(); + swn_wal_.Wait(); } } } } +void +DBImpl::BackgroundFlushThread() { + server::SystemInfo::GetInstance().Init(); + while (true) { + if (!initialized_.load(std::memory_order_acquire)) { + ENGINE_LOG_DEBUG << "DB background flush thread exit"; + break; + } + + InternalFlush(); + if (options_.auto_flush_interval_ > 0) { + swn_flush_.Wait_For(std::chrono::seconds(options_.auto_flush_interval_)); + } else { + swn_flush_.Wait(); + } + } +} + +void +DBImpl::BackgroundMetricThread() { + server::SystemInfo::GetInstance().Init(); + while (true) { + if (!initialized_.load(std::memory_order_acquire)) { + ENGINE_LOG_DEBUG << "DB background metric thread exit"; + break; + } + + swn_metric_.Wait_For(std::chrono::seconds(BACKGROUND_METRIC_INTERVAL)); + StartMetricTask(); + } +} + void DBImpl::OnCacheInsertDataChanged(bool value) { options_.insert_cache_immediately_ = value; diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 0d7f222938..a11140a3dc 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -170,7 +170,19 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi const meta::SegmentsSchema& files); void - BackgroundTimerTask(); + InternalFlush(const std::string& collection_id = ""); + + void + BackgroundWalThread(); + + void + BackgroundFlushThread(); + + void + BackgroundMetricThread(); + + void + BackgroundIndexThread(); void WaitMergeFileFinish(); @@ -194,7 +206,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi BackgroundMerge(std::set collection_ids); void - StartBuildIndexTask(bool force = false); + StartBuildIndexTask(); void BackgroundBuildIndex(); @@ -240,22 +252,21 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi Status ExecWalRecord(const wal::MXLogRecord& record); - void - BackgroundWalTask(); - private: DBOptions options_; std::atomic initialized_; - std::thread bg_timer_thread_; - meta::MetaPtr meta_ptr_; MemManagerPtr mem_mgr_; std::shared_ptr wal_mgr_; std::thread bg_wal_thread_; + std::thread bg_flush_thread_; + std::thread bg_metric_thread_; + std::thread bg_index_thread_; + struct SimpleWaitNotify { bool notified_ = false; std::mutex mutex_; @@ -297,8 +308,12 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi } }; - SimpleWaitNotify bg_task_swn_; - SimpleWaitNotify flush_task_swn_; + SimpleWaitNotify swn_wal_; + SimpleWaitNotify swn_flush_; + SimpleWaitNotify swn_metric_; + SimpleWaitNotify swn_index_; + + SimpleWaitNotify flush_req_swn_; ThreadPool merge_thread_pool_; std::mutex merge_result_mutex_; diff --git a/core/unittest/db/test_delete.cpp b/core/unittest/db/test_delete.cpp index f72d20418f..8253ef8ffd 100644 --- a/core/unittest/db/test_delete.cpp +++ b/core/unittest/db/test_delete.cpp @@ -562,7 +562,7 @@ TEST_F(DeleteTest, delete_add_create_index) { // stat = db_->Flush(); // ASSERT_TRUE(stat.ok()); milvus::engine::CollectionIndex index; - index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFSQ8; + index.engine_type_ = (int)milvus::engine::EngineType::FAISS_IVFFLAT; index.extra_params_ = {{"nlist", 100}}; stat = db_->CreateIndex(collection_info.collection_id_, index); ASSERT_TRUE(stat.ok()); diff --git a/core/unittest/db/utils.cpp b/core/unittest/db/utils.cpp index 68ec958c21..b9076d0a24 100644 --- a/core/unittest/db/utils.cpp +++ b/core/unittest/db/utils.cpp @@ -188,8 +188,10 @@ DBTest::SetUp() { void DBTest::TearDown() { - db_->Stop(); - db_->DropAll(); + if (db_) { + db_->Stop(); + db_->DropAll(); + } milvus::scheduler::JobMgrInst::GetInstance()->Stop(); milvus::scheduler::SchedInst::GetInstance()->Stop(); @@ -309,7 +311,9 @@ MySqlMetaTest::SetUp() { void MySqlMetaTest::TearDown() { - impl_->DropAll(); + if (impl_) { + impl_->DropAll(); + } BaseTest::TearDown();