diff --git a/core/src/db/DB.h b/core/src/db/DB.h
index c25ab8e60e..ef458b59ea 100644
--- a/core/src/db/DB.h
+++ b/core/src/db/DB.h
@@ -71,7 +71,8 @@ class DB {
     GetCollectionRowCount(const std::string& collection_id, uint64_t& row_count) = 0;
 
     virtual Status
-    PreloadCollection(const std::string& collection_id) = 0;
+    PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
+                      bool force = false) = 0;
 
     virtual Status
     UpdateCollectionFlag(const std::string& collection_id, int64_t flag) = 0;
@@ -108,10 +109,11 @@ class DB {
     Flush() = 0;
 
     virtual Status
-    Compact(const std::string& collection_id, double threshold = 0.0) = 0;
+    Compact(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
+            double threshold = 0.0) = 0;
 
     virtual Status
-    GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array,
+    GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array,
                    std::vector<engine::VectorsData>& vectors) = 0;
 
     virtual Status
diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp
index e4042f77f5..f9d1245264 100644
--- a/core/src/db/DBImpl.cpp
+++ b/core/src/db/DBImpl.cpp
@@ -115,6 +115,16 @@ DBImpl::Start() {
     // LOG_ENGINE_TRACE_ << "DB service start";
     initialized_.store(true, std::memory_order_release);
 
+    // server may be closed unexpected, these un-merge files need to be merged when server restart
+    // and soft-delete files need to be deleted when server restart
+    std::set<std::string> merge_collection_ids;
+    std::vector<meta::CollectionSchema> collection_schema_array;
+    meta_ptr_->AllCollections(collection_schema_array);
+    for (auto& schema : collection_schema_array) {
+        merge_collection_ids.insert(schema.collection_id_);
+    }
+    StartMergeTask(merge_collection_ids, true);
+
     // wal
     if (options_.wal_enable_) {
         auto error_code = DB_ERROR;
@@ -158,7 +168,9 @@ DBImpl::Start() {
     }
 
     // background metric thread
-    bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
+    if (options_.metric_enable_) {
+        bg_metric_thread_ = std::thread(&DBImpl::BackgroundMetricThread, this);
+    }
 
     return Status::OK();
 }
@@ -196,8 +208,10 @@ DBImpl::Stop() {
     }
 
     // wait metric thread exit
-    swn_metric_.Notify();
-    bg_metric_thread_.join();
+    if (options_.metric_enable_) {
+        swn_metric_.Notify();
+        bg_metric_thread_.join();
+    }
 
     // LOG_ENGINE_TRACE_ << "DB service stop";
     return Status::OK();
 }
@@ -386,7 +400,8 @@ DBImpl::GetCollectionInfo(const std::string& collection_id, std::string& collect
 }
 
 Status
-DBImpl::PreloadCollection(const std::string& collection_id) {
+DBImpl::PreloadCollection(const std::shared_ptr<server::Context>& context, const std::string& collection_id,
+                          bool force) {
     if (!initialized_.load(std::memory_order_acquire)) {
         return SHUTDOWN_ERROR;
     }
@@ -436,6 +451,12 @@ DBImpl::PreloadCollection(const std::string& collection_id) {
                       << " files need to be pre-loaded";
     TimeRecorderAuto rc("Pre-load collection:" + collection_id);
     for (auto& file : files_array) {
+        // client break the connection, no need to continue
+        if (context && context->IsConnectionBroken()) {
+            LOG_ENGINE_DEBUG_ << "Client connection broken, stop load collection";
+            break;
+        }
+
         EngineType engine_type;
         if (file.file_type_ == meta::SegmentSchema::FILE_TYPE::RAW ||
             file.file_type_ == meta::SegmentSchema::FILE_TYPE::TO_INDEX ||
@@ -467,7 +488,7 @@ DBImpl::PreloadCollection(const std::string& collection_id) {
         }
 
         size += engine->Size();
-        if (size > available_size) {
+        if (!force && size > available_size) {
             LOG_ENGINE_DEBUG_ << "Pre-load cancelled since cache is almost full";
             return Status(SERVER_CACHE_FULL, "Cache is full");
         }
@@
-919,7 +940,6 @@ DBImpl::Flush(const std::string& collection_id) { swn_wal_.Notify(); flush_req_swn_.Wait(); } - StartMergeTask(); } else { LOG_ENGINE_DEBUG_ << "MemTable flush"; InternalFlush(collection_id); @@ -946,7 +966,6 @@ DBImpl::Flush() { swn_wal_.Notify(); flush_req_swn_.Wait(); } - StartMergeTask(); } else { LOG_ENGINE_DEBUG_ << "MemTable flush"; InternalFlush(); @@ -958,7 +977,7 @@ DBImpl::Flush() { } Status -DBImpl::Compact(const std::string& collection_id, double threshold) { +DBImpl::Compact(const std::shared_ptr& context, const std::string& collection_id, double threshold) { if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } @@ -982,7 +1001,9 @@ DBImpl::Compact(const std::string& collection_id, double threshold) { LOG_ENGINE_DEBUG_ << "Before compacting, wait for build index thread to finish..."; - // WaitBuildIndexFinish(); + std::vector collection_array; + status = meta_ptr_->ShowPartitions(collection_id, collection_array); + collection_array.push_back(collection_schema); const std::lock_guard index_lock(build_index_mutex_); const std::lock_guard merge_lock(flush_merge_compact_mutex_); @@ -993,7 +1014,7 @@ DBImpl::Compact(const std::string& collection_id, double threshold) { std::vector file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX, meta::SegmentSchema::FILE_TYPE::BACKUP}; meta::FilesHolder files_holder; - status = meta_ptr_->FilesByType(collection_id, file_types, files_holder); + status = meta_ptr_->FilesByTypeEx(collection_array, file_types, files_holder); if (!status.ok()) { std::string err_msg = "Failed to get files to compact: " + status.message(); LOG_ENGINE_ERROR_ << err_msg; @@ -1006,6 +1027,12 @@ DBImpl::Compact(const std::string& collection_id, double threshold) { // attention: here is a copy, not reference, since files_holder.UnmarkFile will change the array internal milvus::engine::meta::SegmentsSchema files_to_compact = files_holder.HoldFiles(); for (auto iter = files_to_compact.begin(); iter != files_to_compact.end();) { + // client break the connection, no need to continue + if (context && context->IsConnectionBroken()) { + LOG_ENGINE_DEBUG_ << "Client connection broken, stop compact operation"; + break; + } + meta::SegmentSchema file = *iter; iter = files_to_compact.erase(iter); @@ -1023,7 +1050,7 @@ DBImpl::Compact(const std::string& collection_id, double threshold) { meta::SegmentsSchema files_to_update; if (deleted_docs_size != 0) { - compact_status = CompactFile(collection_id, threshold, file, files_to_update); + compact_status = CompactFile(file, threshold, files_to_update); if (!compact_status.ok()) { LOG_ENGINE_ERROR_ << "Compact failed for segment " << file.segment_id_ << ": " @@ -1054,9 +1081,8 @@ DBImpl::Compact(const std::string& collection_id, double threshold) { } Status -DBImpl::CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file, - meta::SegmentsSchema& files_to_update) { - LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << collection_id; +DBImpl::CompactFile(const meta::SegmentSchema& file, double threshold, meta::SegmentsSchema& files_to_update) { + LOG_ENGINE_DEBUG_ << "Compacting segment " << file.segment_id_ << " for collection: " << file.collection_id_; std::string segment_dir_to_merge; utils::GetParentPath(file.location_, segment_dir_to_merge); @@ -1068,7 +1094,7 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me auto status = 
segment_reader_to_merge.LoadDeletedDocs(deleted_docs_ptr); if (status.ok()) { auto delete_items = deleted_docs_ptr->GetDeletedDocs(); - double delete_rate = (double)delete_items.size() / (double)file.row_count_; + double delete_rate = (double)delete_items.size() / (double)(delete_items.size() + file.row_count_); if (delete_rate < threshold) { LOG_ENGINE_DEBUG_ << "Delete rate less than " << threshold << ", no need to compact for" << segment_dir_to_merge; @@ -1079,8 +1105,7 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me // Create new collection file meta::SegmentSchema compacted_file; - compacted_file.collection_id_ = collection_id; - // compacted_file.date_ = date; + compacted_file.collection_id_ = file.collection_id_; compacted_file.file_type_ = meta::SegmentSchema::NEW_MERGE; // TODO: use NEW_MERGE for now auto status = meta_ptr_->CreateCollectionFile(compacted_file); @@ -1090,7 +1115,6 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me } // Compact (merge) file to the newly created collection file - std::string new_segment_dir; utils::GetParentPath(compacted_file.location_, new_segment_dir); auto segment_writer_ptr = std::make_shared(new_segment_dir); @@ -1112,7 +1136,7 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me return status; } - // Update compacted file state, if origin file is backup or to_index, set compected file to to_index + // Update compacted file state, if origin file is backup or to_index, set compacted file to to_index compacted_file.file_size_ = segment_writer_ptr->Size(); compacted_file.row_count_ = segment_writer_ptr->VectorCount(); if ((file.file_type_ == (int32_t)meta::SegmentSchema::BACKUP || @@ -1157,53 +1181,41 @@ DBImpl::CompactFile(const std::string& collection_id, double threshold, const me } Status -DBImpl::GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array, +DBImpl::GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array, std::vector& vectors) { if (!initialized_.load(std::memory_order_acquire)) { return SHUTDOWN_ERROR; } - bool has_collection; - auto status = HasCollection(collection_id, has_collection); - if (!has_collection) { - LOG_ENGINE_ERROR_ << "Collection " << collection_id << " does not exist: "; - return Status(DB_NOT_FOUND, "Collection does not exist"); - } - if (!status.ok()) { - return status; - } - meta::FilesHolder files_holder; std::vector file_types{meta::SegmentSchema::FILE_TYPE::RAW, meta::SegmentSchema::FILE_TYPE::TO_INDEX, meta::SegmentSchema::FILE_TYPE::BACKUP}; - status = meta_ptr_->FilesByType(collection_id, file_types, files_holder); + std::vector collection_array; + auto status = meta_ptr_->ShowPartitions(collection.collection_id_, collection_array); + + collection_array.push_back(collection); + status = meta_ptr_->FilesByTypeEx(collection_array, file_types, files_holder); if (!status.ok()) { - std::string err_msg = "Failed to get files for GetVectorsByID: " + status.message(); + std::string err_msg = "Failed to get files for GetVectorByID: " + status.message(); LOG_ENGINE_ERROR_ << err_msg; return status; } - std::vector partition_array; - status = meta_ptr_->ShowPartitions(collection_id, partition_array); - for (auto& schema : partition_array) { - status = meta_ptr_->FilesByType(schema.collection_id_, file_types, files_holder); - if (!status.ok()) { - std::string err_msg = "Failed to get files for GetVectorByID: " + status.message(); - LOG_ENGINE_ERROR_ << err_msg; - 
return status; - } - } - if (files_holder.HoldFiles().empty()) { LOG_ENGINE_DEBUG_ << "No files to get vector by id from"; return Status(DB_NOT_FOUND, "Collection is empty"); } cache::CpuCacheMgr::GetInstance()->PrintInfo(); - status = GetVectorsByIdHelper(collection_id, id_array, vectors, files_holder); + status = GetVectorsByIdHelper(id_array, vectors, files_holder); cache::CpuCacheMgr::GetInstance()->PrintInfo(); + if (vectors.empty()) { + std::string msg = "Vectors not found in collection " + collection.collection_id_; + LOG_ENGINE_DEBUG_ << msg; + } + return status; } @@ -1280,8 +1292,8 @@ DBImpl::GetVectorIDs(const std::string& collection_id, const std::string& segmen } Status -DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array, - std::vector& vectors, meta::FilesHolder& files_holder) { +DBImpl::GetVectorsByIdHelper(const IDNumbers& id_array, std::vector& vectors, + meta::FilesHolder& files_holder) { // attention: this is a copy, not a reference, since the files_holder.UnMarkFile will change the array internal milvus::engine::meta::SegmentsSchema files = files_holder.HoldFiles(); LOG_ENGINE_DEBUG_ << "Getting vector by id in " << files.size() << " files, id count = " << id_array.size(); @@ -1298,6 +1310,9 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& IDNumbers temp_ids = id_array; for (auto& file : files) { + if (temp_ids.empty()) { + break; // all vectors found, no need to continue + } // Load bloom filter std::string segment_dir; engine::utils::GetParentPath(file.location_, segment_dir); @@ -1380,11 +1395,6 @@ DBImpl::GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& vectors.emplace_back(data); } - if (vectors.empty()) { - std::string msg = "Vectors not found in collection " + collection_id; - LOG_ENGINE_DEBUG_ << msg; - } - return Status::OK(); } @@ -1395,15 +1405,17 @@ DBImpl::CreateIndex(const std::shared_ptr& context, const std:: return SHUTDOWN_ERROR; } - // serialize memory data - // std::set sync_collection_ids; - // auto status = SyncMemData(sync_collection_ids); + // step 1: wait merge file thread finished to avoid duplicate data bug auto status = Flush(); + WaitMergeFileFinish(); // let merge file thread finish + std::set merge_collection_ids; + StartMergeTask(merge_collection_ids, true); // start force-merge task + WaitMergeFileFinish(); // let force-merge file thread finish { std::unique_lock lock(build_index_mutex_); - // step 1: check index difference + // step 2: check index difference CollectionIndex old_index; status = DescribeIndex(collection_id, old_index); if (!status.ok()) { @@ -1411,7 +1423,7 @@ DBImpl::CreateIndex(const std::shared_ptr& context, const std:: return status; } - // step 2: update index info + // step 3: update index info CollectionIndex new_index = index; new_index.metric_type_ = old_index.metric_type_; // dont change metric type, it was defined by CreateCollection if (!utils::IsSameIndex(old_index, new_index)) { @@ -1422,11 +1434,6 @@ DBImpl::CreateIndex(const std::shared_ptr& context, const std:: } } - // step 3: wait merge file thread finished to avoid duplicate data bug - WaitMergeFileFinish(); // let merge file thread finish - StartMergeTask(true); // start force-merge task - WaitMergeFileFinish(); // let force-merge file thread finish - // step 4: wait and build index status = index_failed_checker_.CleanFailedIndexFileOfCollection(collection_id); status = WaitCollectionIndexRecursively(context, collection_id, index); @@ -1451,7 +1458,8 @@ 
DBImpl::DropIndex(const std::string& collection_id) { LOG_ENGINE_DEBUG_ << "Drop index for collection: " << collection_id; auto status = DropCollectionIndexRecursively(collection_id); - StartMergeTask(); // merge small files after drop index + std::set merge_collection_ids = {collection_id}; + StartMergeTask(merge_collection_ids, true); // merge small files after drop index return status; } @@ -1493,7 +1501,7 @@ DBImpl::QueryByIDs(const std::shared_ptr& context, const std::s // get target vectors data std::vector vectors; - status = GetVectorsByID(collection_id, id_array, vectors); + status = GetVectorsByID(collection_schema, id_array, vectors); if (!status.ok()) { std::string msg = "Failed to get vector data for collection: " + collection_id; LOG_ENGINE_ERROR_ << msg; @@ -1897,6 +1905,7 @@ DBImpl::HybridQueryAsync(const std::shared_ptr& context, const void DBImpl::BackgroundIndexThread() { + SetThreadName("index_thread"); server::SystemInfo::GetInstance().Init(); while (true) { if (!initialized_.load(std::memory_order_acquire)) { @@ -1965,7 +1974,7 @@ DBImpl::StartMetricTask() { } void -DBImpl::StartMergeTask(bool force_merge_all) { +DBImpl::StartMergeTask(const std::set& merge_collection_ids, bool force_merge_all) { // LOG_ENGINE_DEBUG_ << "Begin StartMergeTask"; // merge task has been finished? { @@ -1982,21 +1991,9 @@ DBImpl::StartMergeTask(bool force_merge_all) { { std::lock_guard lck(merge_result_mutex_); if (merge_thread_results_.empty()) { - // collect merge files for all collections(if merge_collection_ids_ is empty) for two reasons: - // 1. other collections may still has un-merged files - // 2. server may be closed unexpected, these un-merge files need to be merged when server restart - if (merge_collection_ids_.empty()) { - std::vector collection_schema_array; - meta_ptr_->AllCollections(collection_schema_array); - for (auto& schema : collection_schema_array) { - merge_collection_ids_.insert(schema.collection_id_); - } - } - // start merge file thread merge_thread_results_.push_back( - merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids_, force_merge_all)); - merge_collection_ids_.clear(); + merge_thread_pool_.enqueue(&DBImpl::BackgroundMerge, this, merge_collection_ids, force_merge_all)); } } @@ -2124,7 +2121,7 @@ DBImpl::BackgroundMerge(std::set collection_ids, bool force_merge_a } } - meta_ptr_->Archive(); + // meta_ptr_->Archive(); { uint64_t timeout = (options_.file_cleanup_timeout_ >= 0) ? 
options_.file_cleanup_timeout_ : 10; @@ -2163,7 +2160,7 @@ DBImpl::BackgroundBuildIndex() { meta::FilesHolder files_holder; meta_ptr_->FilesToIndex(files_holder); - milvus::engine::meta::SegmentsSchema& to_index_files = files_holder.HoldFiles(); + milvus::engine::meta::SegmentsSchema to_index_files = files_holder.HoldFiles(); Status status = index_failed_checker_.IgnoreFailedIndexFiles(to_index_files); if (!to_index_files.empty()) { @@ -2383,7 +2380,7 @@ DBImpl::WaitCollectionIndexRecursively(const std::shared_ptr& c index_req_swn_.Wait_For(std::chrono::seconds(1)); // client break the connection, no need to block, check every 1 second - if (context->IsConnectionBroken()) { + if (context && context->IsConnectionBroken()) { LOG_ENGINE_DEBUG_ << "Client connection broken, build index in background"; break; // just break, not return, continue to update partitions files to to_index } @@ -2490,10 +2487,11 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { wal_mgr_->CollectionFlushed(collection_id, lsn); } - std::lock_guard lck(merge_result_mutex_); + std::set merge_collection_ids; for (auto& collection : target_collection_names) { - merge_collection_ids_.insert(collection); + merge_collection_ids.insert(collection); } + StartMergeTask(merge_collection_ids); return max_lsn; }; @@ -2505,8 +2503,8 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) { wal_mgr_->PartitionFlushed(collection_id, partition, lsn); } - std::lock_guard lck(merge_result_mutex_); - merge_collection_ids_.insert(target_collection_name); + std::set merge_collection_ids = {target_collection_name}; + StartMergeTask(merge_collection_ids); }; Status status; @@ -2663,8 +2661,6 @@ DBImpl::InternalFlush(const std::string& collection_id) { record.type = wal::MXLogType::Flush; record.collection_id = collection_id; ExecWalRecord(record); - - StartMergeTask(); } void @@ -2747,6 +2743,7 @@ DBImpl::BackgroundFlushThread() { void DBImpl::BackgroundMetricThread() { + SetThreadName("metric_thread"); server::SystemInfo::GetInstance().Init(); while (true) { if (!initialized_.load(std::memory_order_acquire)) { diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 69a52f729a..385cae39ce 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -78,7 +78,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi GetCollectionInfo(const std::string& collection_id, std::string& collection_info) override; Status - PreloadCollection(const std::string& collection_id) override; + PreloadCollection(const std::shared_ptr& context, const std::string& collection_id, + bool force = false) override; Status UpdateCollectionFlag(const std::string& collection_id, int64_t flag) override; @@ -119,10 +120,11 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi Flush() override; Status - Compact(const std::string& collection_id, double threshold = 0.0) override; + Compact(const std::shared_ptr& context, const std::string& collection_id, + double threshold = 0.0) override; Status - GetVectorsByID(const std::string& collection_id, const IDNumbers& id_array, + GetVectorsByID(const engine::meta::CollectionSchema& collection, const IDNumbers& id_array, std::vector& vectors) override; Status @@ -200,8 +202,8 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi ResultIds& result_ids, ResultDistances& result_distances); Status - GetVectorsByIdHelper(const std::string& collection_id, const IDNumbers& id_array, - std::vector& vectors, meta::FilesHolder& 
files_holder); + GetVectorsByIdHelper(const IDNumbers& id_array, std::vector& vectors, + meta::FilesHolder& files_holder); void InternalFlush(const std::string& collection_id = ""); @@ -228,7 +230,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi StartMetricTask(); void - StartMergeTask(bool force_merge_all = false); + StartMergeTask(const std::set& merge_collection_ids, bool force_merge_all = false); void BackgroundMerge(std::set collection_ids, bool force_merge_all); @@ -243,13 +245,7 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi BackgroundBuildIndex(); Status - CompactFile(const std::string& collection_id, double threshold, const meta::SegmentSchema& file, - meta::SegmentsSchema& files_to_update); - - /* - Status - SyncMemData(std::set& sync_collection_ids); - */ + CompactFile(const meta::SegmentSchema& file, double threshold, meta::SegmentsSchema& files_to_update); Status GetFilesToBuildIndex(const std::string& collection_id, const std::vector& file_types, @@ -355,7 +351,6 @@ class DBImpl : public DB, public server::CacheConfigHandler, public server::Engi ThreadPool merge_thread_pool_; std::mutex merge_result_mutex_; std::list> merge_thread_results_; - std::set merge_collection_ids_; ThreadPool index_thread_pool_; std::mutex index_result_mutex_; diff --git a/core/src/db/Options.h b/core/src/db/Options.h index c2fc7de597..91e7f91b11 100644 --- a/core/src/db/Options.h +++ b/core/src/db/Options.h @@ -75,6 +75,8 @@ struct DBOptions { int64_t auto_flush_interval_ = 1; int64_t file_cleanup_timeout_ = 10; + bool metric_enable_ = false; + // wal relative configurations bool wal_enable_ = true; bool recovery_error_ignore_ = true; diff --git a/core/src/db/meta/Meta.h b/core/src/db/meta/Meta.h index b8be49e442..e4cac45205 100644 --- a/core/src/db/meta/Meta.h +++ b/core/src/db/meta/Meta.h @@ -138,6 +138,10 @@ class Meta { virtual Status FilesByType(const std::string& collection_id, const std::vector& file_types, FilesHolder& files_holder) = 0; + virtual Status + FilesByTypeEx(const std::vector& collections, const std::vector& file_types, + FilesHolder& files_holder) = 0; + virtual Status FilesByID(const std::vector& ids, FilesHolder& files_holder) = 0; diff --git a/core/src/db/meta/MySQLMetaImpl.cpp b/core/src/db/meta/MySQLMetaImpl.cpp index e18c0ef715..68029c6513 100644 --- a/core/src/db/meta/MySQLMetaImpl.cpp +++ b/core/src/db/meta/MySQLMetaImpl.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include "MetaConsts.h" @@ -1637,7 +1638,8 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& file std::lock_guard meta_lock(meta_mutex_); mysqlpp::Query statement = connectionPtr->query(); - statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date" + statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date," + << " engine_type, created_on, updated_time" << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id; // End @@ -1665,16 +1667,19 @@ MySQLMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& file collection_file.id_ = resRow["id"]; // implicit conversion resRow["table_id"].to_string(collection_file.collection_id_); resRow["segment_id"].to_string(collection_file.segment_id_); - collection_file.index_file_size_ = collection_schema.index_file_size_; - collection_file.engine_type_ = resRow["engine_type"]; - collection_file.index_params_ = 
collection_schema.index_params_; - collection_file.metric_type_ = collection_schema.metric_type_; resRow["file_id"].to_string(collection_file.file_id_); collection_file.file_type_ = resRow["file_type"]; collection_file.file_size_ = resRow["file_size"]; collection_file.row_count_ = resRow["row_count"]; collection_file.date_ = resRow["date"]; + collection_file.engine_type_ = resRow["engine_type"]; + collection_file.created_on_ = resRow["created_on"]; + collection_file.updated_time_ = resRow["updated_time"]; + collection_file.dimension_ = collection_schema.dimension_; + collection_file.index_file_size_ = collection_schema.index_file_size_; + collection_file.index_params_ = collection_schema.index_params_; + collection_file.metric_type_ = collection_schema.metric_type_; auto status = utils::GetCollectionFilePath(options_, collection_file); if (!status.ok()) { @@ -1711,18 +1716,15 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se return status; } - // distribute id array to batchs - const int64_t batch_size = 50; + // distribute id array to batches + const uint64_t batch_size = 50; std::vector> id_groups; std::vector temp_group; - int64_t count = 1; for (auto& id : partition_id_array) { temp_group.push_back(id); - count++; - if (count >= batch_size) { + if (temp_group.size() >= batch_size) { id_groups.emplace_back(temp_group); temp_group.clear(); - count = 0; } } @@ -1749,9 +1751,9 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se std::lock_guard meta_lock(meta_mutex_); mysqlpp::Query statement = connectionPtr->query(); - statement - << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date" - << " FROM " << META_TABLEFILES << " WHERE table_id in ("; + statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date," + << " engine_type, created_on, updated_time" + << " FROM " << META_TABLEFILES << " WHERE table_id in ("; for (size_t i = 0; i < group.size(); i++) { statement << mysqlpp::quote << group[i]; if (i != group.size() - 1) { @@ -1776,16 +1778,19 @@ MySQLMetaImpl::FilesToSearchEx(const std::string& root_collection, const std::se collection_file.id_ = resRow["id"]; // implicit conversion resRow["table_id"].to_string(collection_file.collection_id_); resRow["segment_id"].to_string(collection_file.segment_id_); - collection_file.index_file_size_ = collection_schema.index_file_size_; - collection_file.engine_type_ = resRow["engine_type"]; - collection_file.index_params_ = collection_schema.index_params_; - collection_file.metric_type_ = collection_schema.metric_type_; resRow["file_id"].to_string(collection_file.file_id_); collection_file.file_type_ = resRow["file_type"]; collection_file.file_size_ = resRow["file_size"]; collection_file.row_count_ = resRow["row_count"]; collection_file.date_ = resRow["date"]; + collection_file.engine_type_ = resRow["engine_type"]; + collection_file.created_on_ = resRow["created_on"]; + collection_file.updated_time_ = resRow["updated_time"]; + collection_file.dimension_ = collection_schema.dimension_; + collection_file.index_file_size_ = collection_schema.index_file_size_; + collection_file.index_params_ = collection_schema.index_params_; + collection_file.metric_type_ = collection_schema.metric_type_; auto status = utils::GetCollectionFilePath(options_, collection_file); if (!status.ok()) { @@ -1837,8 +1842,8 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files std::lock_guard 
meta_lock(meta_mutex_); mysqlpp::Query statement = connectionPtr->query(); - statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date, " - "engine_type, created_on" + statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date," + " engine_type, created_on, updated_time" << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id << " AND file_type = " << std::to_string(SegmentSchema::RAW) << " ORDER BY row_count DESC;"; @@ -1861,14 +1866,17 @@ MySQLMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& files resRow["segment_id"].to_string(collection_file.segment_id_); resRow["file_id"].to_string(collection_file.file_id_); collection_file.file_type_ = resRow["file_type"]; + collection_file.file_size_ = resRow["file_size"]; collection_file.row_count_ = resRow["row_count"]; collection_file.date_ = resRow["date"]; - collection_file.index_file_size_ = collection_schema.index_file_size_; collection_file.engine_type_ = resRow["engine_type"]; + collection_file.created_on_ = resRow["created_on"]; + collection_file.updated_time_ = resRow["updated_time"]; + + collection_file.dimension_ = collection_schema.dimension_; + collection_file.index_file_size_ = collection_schema.index_file_size_; collection_file.index_params_ = collection_schema.index_params_; collection_file.metric_type_ = collection_schema.metric_type_; - collection_file.created_on_ = resRow["created_on"]; - collection_file.dimension_ = collection_schema.dimension_; auto status = utils::GetCollectionFilePath(options_, collection_file); if (!status.ok()) { @@ -1911,12 +1919,12 @@ MySQLMetaImpl::FilesToIndex(FilesHolder& files_holder) { std::lock_guard meta_lock(meta_mutex_); mysqlpp::Query statement = connectionPtr->query(); - statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, " - "row_count, date, created_on" + statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date," + << " engine_type, created_on, updated_time" << " FROM " << META_TABLEFILES << " WHERE file_type = " << std::to_string(SegmentSchema::TO_INDEX) << ";"; - LOG_ENGINE_DEBUG_ << "FilesToIndex: " << statement.str(); + // LOG_ENGINE_DEBUG_ << "FilesToIndex: " << statement.str(); res = statement.store(); } // Scoped Connection @@ -1929,13 +1937,14 @@ MySQLMetaImpl::FilesToIndex(FilesHolder& files_holder) { collection_file.id_ = resRow["id"]; // implicit conversion resRow["table_id"].to_string(collection_file.collection_id_); resRow["segment_id"].to_string(collection_file.segment_id_); - collection_file.engine_type_ = resRow["engine_type"]; resRow["file_id"].to_string(collection_file.file_id_); collection_file.file_type_ = resRow["file_type"]; collection_file.file_size_ = resRow["file_size"]; collection_file.row_count_ = resRow["row_count"]; collection_file.date_ = resRow["date"]; + collection_file.engine_type_ = resRow["engine_type"]; collection_file.created_on_ = resRow["created_on"]; + collection_file.updated_time_ = resRow["updated_time"]; auto groupItr = groups.find(collection_file.collection_id_); if (groupItr == groups.end()) { @@ -2003,10 +2012,10 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vectorquery(); // since collection_id is a unique column we just need to check whether it exists or not - statement - << "SELECT id, segment_id, engine_type, file_id, file_type, file_size, row_count, date, created_on" - << " FROM " << META_TABLEFILES << " WHERE 
table_id = " << mysqlpp::quote << collection_id - << " AND file_type in (" << types << ");"; + statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date," + << " engine_type, created_on, updated_time" + << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id + << " AND file_type in (" << types << ");"; LOG_ENGINE_DEBUG_ << "FilesByType: " << statement.str(); @@ -2028,13 +2037,14 @@ MySQLMetaImpl::FilesByType(const std::string& collection_id, const std::vector& collections, const std::vector& file_types, + FilesHolder& files_holder) { + try { + server::MetricCollector metric; + + // distribute id array to batches + const uint64_t batch_size = 50; + std::vector> id_groups; + std::vector temp_group; + std::unordered_map map_collections; + for (auto& collection : collections) { + map_collections.insert(std::make_pair(collection.collection_id_, collection)); + temp_group.push_back(collection.collection_id_); + if (temp_group.size() >= batch_size) { + id_groups.emplace_back(temp_group); + temp_group.clear(); + } + } + + if (!temp_group.empty()) { + id_groups.emplace_back(temp_group); + } + + // perform query batch by batch + Status ret; + int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0; + int to_index_count = 0, index_count = 0, backup_count = 0; + for (auto group : id_groups) { + mysqlpp::StoreQueryResult res; + { + mysqlpp::ScopedConnection connectionPtr(*mysql_connection_pool_, safe_grab_); + + bool is_null_connection = (connectionPtr == nullptr); + fiu_do_on("MySQLMetaImpl.FilesByType.null_connection", is_null_connection = true); + fiu_do_on("MySQLMetaImpl.FilesByType.throw_exception", throw std::exception();); + if (is_null_connection) { + return Status(DB_ERROR, "Failed to connect to meta server(mysql)"); + } + + std::string types; + for (auto type : file_types) { + if (!types.empty()) { + types += ","; + } + types += std::to_string(type); + } + + // to ensure UpdateCollectionFiles to be a atomic operation + std::lock_guard meta_lock(meta_mutex_); + + mysqlpp::Query statement = connectionPtr->query(); + // since collection_id is a unique column we just need to check whether it exists or not + statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date," + << " engine_type, created_on, updated_time" + << " FROM " << META_TABLEFILES << " WHERE table_id in ("; + for (size_t i = 0; i < group.size(); i++) { + statement << mysqlpp::quote << group[i]; + if (i != group.size() - 1) { + statement << ","; + } + } + statement << ") AND file_type in (" << types << ");"; + + LOG_ENGINE_DEBUG_ << "FilesByType: " << statement.str(); + + res = statement.store(); + } // Scoped Connection + + for (auto& resRow : res) { + SegmentSchema file_schema; + file_schema.id_ = resRow["id"]; // implicit conversion + resRow["table_id"].to_string(file_schema.collection_id_); + resRow["segment_id"].to_string(file_schema.segment_id_); + resRow["file_id"].to_string(file_schema.file_id_); + file_schema.file_type_ = resRow["file_type"]; + file_schema.file_size_ = resRow["file_size"]; + file_schema.row_count_ = resRow["row_count"]; + file_schema.date_ = resRow["date"]; + file_schema.engine_type_ = resRow["engine_type"]; + file_schema.created_on_ = resRow["created_on"]; + file_schema.updated_time_ = resRow["updated_time"]; + + auto& collection_schema = map_collections[file_schema.collection_id_]; + file_schema.dimension_ = collection_schema.dimension_; + file_schema.index_file_size_ = 
collection_schema.index_file_size_; + file_schema.index_params_ = collection_schema.index_params_; + file_schema.metric_type_ = collection_schema.metric_type_; + + auto status = utils::GetCollectionFilePath(options_, file_schema); + if (!status.ok()) { + ret = status; + continue; + } + + files_holder.MarkFile(file_schema); + + int32_t file_type = resRow["file_type"]; + switch (file_type) { + case (int)SegmentSchema::RAW: + ++raw_count; + break; + case (int)SegmentSchema::NEW: + ++new_count; + break; + case (int)SegmentSchema::NEW_MERGE: + ++new_merge_count; + break; + case (int)SegmentSchema::NEW_INDEX: + ++new_index_count; + break; + case (int)SegmentSchema::TO_INDEX: + ++to_index_count; + break; + case (int)SegmentSchema::INDEX: + ++index_count; + break; + case (int)SegmentSchema::BACKUP: + ++backup_count; + break; + default: + break; + } + } + } + + std::string msg = "Get collection files by type."; + for (int file_type : file_types) { + switch (file_type) { + case (int)SegmentSchema::RAW: + msg = msg + " raw files:" + std::to_string(raw_count); + break; + case (int)SegmentSchema::NEW: + msg = msg + " new files:" + std::to_string(new_count); + break; + case (int)SegmentSchema::NEW_MERGE: + msg = msg + " new_merge files:" + std::to_string(new_merge_count); + break; + case (int)SegmentSchema::NEW_INDEX: + msg = msg + " new_index files:" + std::to_string(new_index_count); + break; + case (int)SegmentSchema::TO_INDEX: + msg = msg + " to_index files:" + std::to_string(to_index_count); + break; + case (int)SegmentSchema::INDEX: + msg = msg + " index files:" + std::to_string(index_count); + break; + case (int)SegmentSchema::BACKUP: + msg = msg + " backup files:" + std::to_string(backup_count); + break; + default: + break; + } + } + LOG_ENGINE_DEBUG_ << msg; + return ret; + } catch (std::exception& e) { + return HandleException("Failed to get files by type", e.what()); + } +} + Status MySQLMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_holder) { if (ids.empty()) { @@ -2136,7 +2307,8 @@ MySQLMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_hold std::lock_guard meta_lock(meta_mutex_); mysqlpp::Query statement = connectionPtr->query(); - statement << "SELECT id, table_id, segment_id, engine_type, file_id, file_type, file_size, row_count, date" + statement << "SELECT id, table_id, segment_id, file_id, file_type, file_size, row_count, date," + << " engine_type, created_on, updated_time" << " FROM " << META_TABLEFILES; std::stringstream idSS; @@ -2167,12 +2339,14 @@ MySQLMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_hold collection_file.id_ = resRow["id"]; // implicit conversion resRow["table_id"].to_string(collection_file.collection_id_); resRow["segment_id"].to_string(collection_file.segment_id_); - collection_file.engine_type_ = resRow["engine_type"]; resRow["file_id"].to_string(collection_file.file_id_); collection_file.file_type_ = resRow["file_type"]; collection_file.file_size_ = resRow["file_size"]; collection_file.row_count_ = resRow["row_count"]; collection_file.date_ = resRow["date"]; + collection_file.engine_type_ = resRow["engine_type"]; + collection_file.created_on_ = resRow["created_on"]; + collection_file.updated_time_ = resRow["updated_time"]; if (collections.find(collection_file.collection_id_) == collections.end()) { CollectionSchema collection_schema; @@ -2390,7 +2564,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) << ")" << " AND updated_time < " << std::to_string(now - seconds * US_PS) << ";"; - 
LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str(); + // LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str(); res = statement.store(); } @@ -2481,7 +2655,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) << " FROM " << META_TABLES << " WHERE state = " << std::to_string(CollectionSchema::TO_DELETE) << ";"; - LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str(); + // LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str(); mysqlpp::StoreQueryResult res = statement.store(); @@ -2539,7 +2713,7 @@ MySQLMetaImpl::CleanUpFilesWithTTL(uint64_t seconds /*, CleanUpFilter* filter*/) << " FROM " << META_TABLEFILES << " WHERE table_id = " << mysqlpp::quote << collection_id << ";"; - LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str(); + // LOG_ENGINE_DEBUG_ << "CleanUpFilesWithTTL: " << statement.str(); mysqlpp::StoreQueryResult res = statement.store(); diff --git a/core/src/db/meta/MySQLMetaImpl.h b/core/src/db/meta/MySQLMetaImpl.h index e327a6372d..65fa5b3015 100644 --- a/core/src/db/meta/MySQLMetaImpl.h +++ b/core/src/db/meta/MySQLMetaImpl.h @@ -124,6 +124,10 @@ class MySQLMetaImpl : public Meta { FilesByType(const std::string& collection_id, const std::vector& file_types, FilesHolder& files_holder) override; + Status + FilesByTypeEx(const std::vector& collections, const std::vector& file_types, + FilesHolder& files_holder) override; + Status FilesByID(const std::vector& ids, FilesHolder& files_holder) override; diff --git a/core/src/db/meta/SqliteMetaImpl.cpp b/core/src/db/meta/SqliteMetaImpl.cpp index b15602b73a..afb58f7591 100644 --- a/core/src/db/meta/SqliteMetaImpl.cpp +++ b/core/src/db/meta/SqliteMetaImpl.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include "MetaConsts.h" #include "db/IDGenerator.h" @@ -1076,7 +1077,8 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil // perform query auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_); + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, &SegmentSchema::updated_time_); auto match_collectionid = c(&SegmentSchema::collection_id_) == collection_id; @@ -1104,6 +1106,9 @@ SqliteMetaImpl::FilesToSearch(const std::string& collection_id, FilesHolder& fil collection_file.row_count_ = std::get<6>(file); collection_file.date_ = std::get<7>(file); collection_file.engine_type_ = std::get<8>(file); + collection_file.created_on_ = std::get<9>(file); + collection_file.updated_time_ = std::get<10>(file); + collection_file.dimension_ = collection_schema.dimension_; collection_file.index_file_size_ = collection_schema.index_file_size_; collection_file.index_params_ = collection_schema.index_params_; @@ -1145,18 +1150,15 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection, return status; } - // distribute id array to batchs - const int64_t batch_size = 50; + // distribute id array to batches + const uint64_t batch_size = 50; std::vector> id_groups; std::vector temp_group; - int64_t count = 1; for (auto& id : partition_id_array) { temp_group.push_back(id); - count++; - if (count >= batch_size) { + if (temp_group.size() >= batch_size) { id_groups.emplace_back(temp_group); temp_group.clear(); - count = 0; } } @@ -1171,7 +1173,8 
@@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection, auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_); + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, &SegmentSchema::updated_time_); auto match_collectionid = in(&SegmentSchema::collection_id_, group); @@ -1197,6 +1200,8 @@ SqliteMetaImpl::FilesToSearchEx(const std::string& root_collection, collection_file.row_count_ = std::get<6>(file); collection_file.date_ = std::get<7>(file); collection_file.engine_type_ = std::get<8>(file); + collection_file.created_on_ = std::get<9>(file); + collection_file.updated_time_ = std::get<10>(file); collection_file.dimension_ = collection_schema.dimension_; collection_file.index_file_size_ = collection_schema.index_file_size_; collection_file.index_params_ = collection_schema.index_params_; @@ -1239,9 +1244,11 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file } // get files to merge - auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::created_on_); + auto select_columns = + columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, &SegmentSchema::updated_time_); decltype(ConnectorPtr->select(select_columns)) selected; { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here @@ -1268,7 +1275,10 @@ SqliteMetaImpl::FilesToMerge(const std::string& collection_id, FilesHolder& file collection_file.file_type_ = std::get<4>(file); collection_file.row_count_ = std::get<6>(file); collection_file.date_ = std::get<7>(file); - collection_file.created_on_ = std::get<8>(file); + collection_file.engine_type_ = std::get<8>(file); + collection_file.created_on_ = std::get<9>(file); + collection_file.updated_time_ = std::get<10>(file); + collection_file.dimension_ = collection_schema.dimension_; collection_file.index_file_size_ = collection_schema.index_file_size_; collection_file.index_params_ = collection_schema.index_params_; @@ -1302,10 +1312,11 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) { server::MetricCollector metric; - auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, - &SegmentSchema::created_on_); + auto select_columns = + columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, + &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, &SegmentSchema::updated_time_); decltype(ConnectorPtr->select(select_columns)) selected; { // multi-threads call sqlite update may get exception('bad logic', 
etc), so we add a lock here @@ -1329,6 +1340,7 @@ SqliteMetaImpl::FilesToIndex(FilesHolder& files_holder) { collection_file.date_ = std::get<7>(file); collection_file.engine_type_ = std::get<8>(file); collection_file.created_on_ = std::get<9>(file); + collection_file.updated_time_ = std::get<10>(file); auto status = utils::GetCollectionFilePath(options_, collection_file); if (!status.ok()) { @@ -1388,7 +1400,8 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::segment_id_, &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, &SegmentSchema::row_count_, - &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_); + &SegmentSchema::date_, &SegmentSchema::engine_type_, &SegmentSchema::created_on_, + &SegmentSchema::updated_time_); decltype(ConnectorPtr->select(select_columns)) selected; { // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here @@ -1413,6 +1426,7 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, file_schema.date_ = std::get<6>(file); file_schema.engine_type_ = std::get<7>(file); file_schema.created_on_ = std::get<8>(file); + file_schema.updated_time_ = std::get<9>(file); file_schema.dimension_ = collection_schema.dimension_; file_schema.index_file_size_ = collection_schema.index_file_size_; @@ -1476,6 +1490,146 @@ SqliteMetaImpl::FilesByType(const std::string& collection_id, return ret; } +Status +SqliteMetaImpl::FilesByTypeEx(const std::vector& collections, + const std::vector& file_types, + FilesHolder& files_holder) { + if (file_types.empty()) { + return Status(DB_ERROR, "file types array is empty"); + } + + Status ret = Status::OK(); + + try { + fiu_do_on("SqliteMetaImpl.FilesByType.throw_exception", throw std::exception()); + + // distribute id array to batches + const uint64_t batch_size = 50; + std::vector> id_groups; + std::vector temp_group; + std::unordered_map map_collections; + for (auto& collection : collections) { + map_collections.insert(std::make_pair(collection.collection_id_, collection)); + temp_group.push_back(collection.collection_id_); + if (temp_group.size() >= batch_size) { + id_groups.emplace_back(temp_group); + temp_group.clear(); + } + } + + if (!temp_group.empty()) { + id_groups.emplace_back(temp_group); + } + + // perform query batch by batch + Status ret; + int raw_count = 0, new_count = 0, new_merge_count = 0, new_index_count = 0; + int to_index_count = 0, index_count = 0, backup_count = 0; + for (auto group : id_groups) { + auto select_columns = + columns(&SegmentSchema::id_, + &SegmentSchema::collection_id_, + &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, + &SegmentSchema::file_type_, + &SegmentSchema::file_size_, + &SegmentSchema::row_count_, + &SegmentSchema::date_, + &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, + &SegmentSchema::updated_time_); + decltype(ConnectorPtr->select(select_columns)) selected; + + auto match_collectionid = in(&SegmentSchema::collection_id_, group); + + std::vector file_types = {(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX, + (int)SegmentSchema::INDEX}; + auto match_type = in(&SegmentSchema::file_type_, file_types); + { + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto filter = where(match_collectionid and match_type); + selected = ConnectorPtr->select(select_columns, filter); + } + + for (auto& file : 
selected) { + SegmentSchema file_schema; + file_schema.id_ = std::get<0>(file); + file_schema.collection_id_ = std::get<1>(file); + file_schema.segment_id_ = std::get<2>(file); + file_schema.file_id_ = std::get<3>(file); + file_schema.file_type_ = std::get<4>(file); + file_schema.file_size_ = std::get<5>(file); + file_schema.row_count_ = std::get<6>(file); + file_schema.date_ = std::get<7>(file); + file_schema.engine_type_ = std::get<8>(file); + file_schema.created_on_ = std::get<9>(file); + file_schema.updated_time_ = std::get<10>(file); + + auto& collection_schema = map_collections[file_schema.collection_id_]; + file_schema.dimension_ = collection_schema.dimension_; + file_schema.index_file_size_ = collection_schema.index_file_size_; + file_schema.index_params_ = collection_schema.index_params_; + file_schema.metric_type_ = collection_schema.metric_type_; + + switch (file_schema.file_type_) { + case (int)SegmentSchema::RAW:++raw_count; + break; + case (int)SegmentSchema::NEW:++new_count; + break; + case (int)SegmentSchema::NEW_MERGE:++new_merge_count; + break; + case (int)SegmentSchema::NEW_INDEX:++new_index_count; + break; + case (int)SegmentSchema::TO_INDEX:++to_index_count; + break; + case (int)SegmentSchema::INDEX:++index_count; + break; + case (int)SegmentSchema::BACKUP:++backup_count; + break; + default:break; + } + + auto status = utils::GetCollectionFilePath(options_, file_schema); + if (!status.ok()) { + ret = status; + } + + files_holder.MarkFile(file_schema); + } + } + + std::string msg = "Get collection files by type."; + for (int file_type : file_types) { + switch (file_type) { + case (int)SegmentSchema::RAW:msg = msg + " raw files:" + std::to_string(raw_count); + break; + case (int)SegmentSchema::NEW:msg = msg + " new files:" + std::to_string(new_count); + break; + case (int)SegmentSchema::NEW_MERGE: + msg = msg + " new_merge files:" + std::to_string(new_merge_count); + break; + case (int)SegmentSchema::NEW_INDEX: + msg = msg + " new_index files:" + std::to_string(new_index_count); + break; + case (int)SegmentSchema::TO_INDEX: + msg = msg + " to_index files:" + std::to_string(to_index_count); + break; + case (int)SegmentSchema::INDEX:msg = msg + " index files:" + std::to_string(index_count); + break; + case (int)SegmentSchema::BACKUP:msg = msg + " backup files:" + std::to_string(backup_count); + break; + default:break; + } + } + LOG_ENGINE_DEBUG_ << msg; + } catch (std::exception& e) { + return HandleException("Encounter exception when check non index files", e.what()); + } + + return ret; +} + Status SqliteMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_holder) { if (ids.empty()) { @@ -1486,9 +1640,17 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_hol server::MetricCollector metric; fiu_do_on("SqliteMetaImpl.FilesByID.throw_exception", throw std::exception()); - auto select_columns = columns(&SegmentSchema::id_, &SegmentSchema::collection_id_, &SegmentSchema::segment_id_, - &SegmentSchema::file_id_, &SegmentSchema::file_type_, &SegmentSchema::file_size_, - &SegmentSchema::row_count_, &SegmentSchema::date_, &SegmentSchema::engine_type_); + auto select_columns = columns(&SegmentSchema::id_, + &SegmentSchema::collection_id_, + &SegmentSchema::segment_id_, + &SegmentSchema::file_id_, + &SegmentSchema::file_type_, + &SegmentSchema::file_size_, + &SegmentSchema::row_count_, + &SegmentSchema::date_, + &SegmentSchema::engine_type_, + &SegmentSchema::created_on_, + &SegmentSchema::updated_time_); std::vector file_types = 
{(int)SegmentSchema::RAW, (int)SegmentSchema::TO_INDEX, (int)SegmentSchema::INDEX}; @@ -1518,6 +1680,8 @@ SqliteMetaImpl::FilesByID(const std::vector& ids, FilesHolder& files_hol collection_file.row_count_ = std::get<6>(file); collection_file.date_ = std::get<7>(file); collection_file.engine_type_ = std::get<8>(file); + collection_file.created_on_ = std::get<9>(file); + collection_file.updated_time_ = std::get<10>(file); if (collections.find(collection_file.collection_id_) == collections.end()) { CollectionSchema collection_schema; @@ -1943,6 +2107,9 @@ SqliteMetaImpl::SetGlobalLastLSN(uint64_t lsn) { try { server::MetricCollector metric; + // multi-threads call sqlite update may get exception('bad logic', etc), so we add a lock here + std::lock_guard meta_lock(meta_mutex_); + auto selected = ConnectorPtr->select(columns(&EnvironmentSchema::global_lsn_)); if (selected.size() == 0) { EnvironmentSchema env; diff --git a/core/src/db/meta/SqliteMetaImpl.h b/core/src/db/meta/SqliteMetaImpl.h index ba46351a88..9e0ce20dca 100644 --- a/core/src/db/meta/SqliteMetaImpl.h +++ b/core/src/db/meta/SqliteMetaImpl.h @@ -126,6 +126,10 @@ class SqliteMetaImpl : public Meta { FilesByType(const std::string& collection_id, const std::vector& file_types, FilesHolder& files_holder) override; + Status + FilesByTypeEx(const std::vector& collections, const std::vector& file_types, + FilesHolder& files_holder) override; + Status FilesByID(const std::vector& ids, FilesHolder& files_holder) override; diff --git a/core/src/db/wal/WalManager.cpp b/core/src/db/wal/WalManager.cpp index 4c7d5f8e2d..6d32b66837 100644 --- a/core/src/db/wal/WalManager.cpp +++ b/core/src/db/wal/WalManager.cpp @@ -260,8 +260,10 @@ WalManager::GetNextRecord(MXLogRecord& record) { } } - LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn " - << record.lsn; + if (record.type != MXLogType::None) { + LOG_WAL_INFO_ << "record type " << (int32_t)record.type << " collection " << record.collection_id << " lsn " + << record.lsn; + } return error_code; } diff --git a/core/src/server/DBWrapper.cpp b/core/src/server/DBWrapper.cpp index f1ca8aa8dc..4c44ed3d58 100644 --- a/core/src/server/DBWrapper.cpp +++ b/core/src/server/DBWrapper.cpp @@ -70,6 +70,13 @@ DBWrapper::StartService() { return s; } + // metric config + s = config.GetMetricConfigEnableMonitor(opt.metric_enable_); + if (!s.ok()) { + std::cerr << s.ToString() << std::endl; + return s; + } + // cache config s = config.GetCacheConfigCacheInsertData(opt.insert_cache_immediately_); if (!s.ok()) { @@ -255,7 +262,7 @@ DBWrapper::PreloadCollections(const std::string& preload_collections) { db_->AllCollections(table_schema_array); for (auto& schema : table_schema_array) { - auto status = db_->PreloadCollection(schema.collection_id_); + auto status = db_->PreloadCollection(nullptr, schema.collection_id_); if (!status.ok()) { return status; } @@ -264,7 +271,7 @@ DBWrapper::PreloadCollections(const std::string& preload_collections) { std::vector collection_names; StringHelpFunctions::SplitStringByDelimeter(preload_collections, ",", collection_names); for (auto& name : collection_names) { - auto status = db_->PreloadCollection(name); + auto status = db_->PreloadCollection(nullptr, name); if (!status.ok()) { return status; } diff --git a/core/src/server/delivery/request/CompactRequest.cpp b/core/src/server/delivery/request/CompactRequest.cpp index 6ee7115e17..58ff387487 100644 --- a/core/src/server/delivery/request/CompactRequest.cpp +++ 
b/core/src/server/delivery/request/CompactRequest.cpp @@ -70,7 +70,7 @@ CompactRequest::OnExecute() { rc.RecordSection("check validation"); // step 2: check collection existence - status = DBWrapper::DB()->Compact(collection_name_, compact_threshold_); + status = DBWrapper::DB()->Compact(context_, collection_name_, compact_threshold_); if (!status.ok()) { return status; } diff --git a/core/src/server/delivery/request/GetVectorsByIDRequest.cpp b/core/src/server/delivery/request/GetVectorsByIDRequest.cpp index b85567ed6a..f78441a849 100644 --- a/core/src/server/delivery/request/GetVectorsByIDRequest.cpp +++ b/core/src/server/delivery/request/GetVectorsByIDRequest.cpp @@ -83,7 +83,7 @@ GetVectorsByIDRequest::OnExecute() { } // step 2: get vector data, now only support get one id - return DBWrapper::DB()->GetVectorsByID(collection_name_, ids_, vectors_); + return DBWrapper::DB()->GetVectorsByID(collection_schema, ids_, vectors_); } catch (std::exception& ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } diff --git a/core/src/server/delivery/request/PreloadCollectionRequest.cpp b/core/src/server/delivery/request/PreloadCollectionRequest.cpp index ff7fcee6cf..f015d5d6ec 100644 --- a/core/src/server/delivery/request/PreloadCollectionRequest.cpp +++ b/core/src/server/delivery/request/PreloadCollectionRequest.cpp @@ -60,8 +60,9 @@ PreloadCollectionRequest::OnExecute() { } } - // step 2: check collection existence - status = DBWrapper::DB()->PreloadCollection(collection_name_); + // step 2: force load collection data into cache + // load each segment and insert into cache even cache capacity is not enough + status = DBWrapper::DB()->PreloadCollection(context_, collection_name_, true); fiu_do_on("PreloadCollectionRequest.OnExecute.preload_collection_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, "")); fiu_do_on("PreloadCollectionRequest.OnExecute.throw_std_exception", throw std::exception()); diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index c55508c211..133ceadd94 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -32,7 +32,7 @@ namespace { static const char* COLLECTION_NAME = "test_group"; static constexpr int64_t COLLECTION_DIM = 256; -static constexpr int64_t VECTOR_COUNT = 25000; +static constexpr int64_t VECTOR_COUNT = 5000; static constexpr int64_t INSERT_LOOP = 100; static constexpr int64_t SECONDS_EACH_HOUR = 3600; static constexpr int64_t DAY_SECONDS = 24 * 60 * 60; @@ -180,7 +180,7 @@ TEST_F(DBTest, DB_TEST) { milvus::engine::ResultIds result_ids; milvus::engine::ResultDistances result_distances; int k = 10; - std::this_thread::sleep_for(std::chrono::seconds(2)); + std::this_thread::sleep_for(std::chrono::seconds(1)); INIT_TIMER; std::stringstream ss; @@ -214,7 +214,7 @@ TEST_F(DBTest, DB_TEST) { /* LOG(DEBUG) << ss.str(); */ } ASSERT_TRUE(count >= prev_count); - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }); @@ -236,7 +236,7 @@ TEST_F(DBTest, DB_TEST) { stat = db_->Flush(); ASSERT_TRUE(stat.ok()); - std::this_thread::sleep_for(std::chrono::microseconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); } search.join(); @@ -455,34 +455,34 @@ TEST_F(DBTest, PRELOAD_TEST) { db_->CreateIndex(dummy_context_, COLLECTION_NAME, index); // wait until build index finish int64_t prev_cache_usage = milvus::cache::CpuCacheMgr::GetInstance()->CacheUsage(); - stat = db_->PreloadCollection(COLLECTION_NAME); + stat = 
diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp
index c55508c211..133ceadd94 100644
--- a/core/unittest/db/test_db.cpp
+++ b/core/unittest/db/test_db.cpp
@@ -32,7 +32,7 @@ namespace {
 static const char* COLLECTION_NAME = "test_group";
 static constexpr int64_t COLLECTION_DIM = 256;
-static constexpr int64_t VECTOR_COUNT = 25000;
+static constexpr int64_t VECTOR_COUNT = 5000;
 static constexpr int64_t INSERT_LOOP = 100;
 static constexpr int64_t SECONDS_EACH_HOUR = 3600;
 static constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
@@ -180,7 +180,7 @@ TEST_F(DBTest, DB_TEST) {
         milvus::engine::ResultIds result_ids;
         milvus::engine::ResultDistances result_distances;
         int k = 10;
-        std::this_thread::sleep_for(std::chrono::seconds(2));
+        std::this_thread::sleep_for(std::chrono::seconds(1));

         INIT_TIMER;
         std::stringstream ss;
@@ -214,7 +214,7 @@ TEST_F(DBTest, DB_TEST) {
                 /* LOG(DEBUG) << ss.str(); */
             }
             ASSERT_TRUE(count >= prev_count);
-            std::this_thread::sleep_for(std::chrono::seconds(1));
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
         }
     });
@@ -236,7 +236,7 @@ TEST_F(DBTest, DB_TEST) {
         stat = db_->Flush();
         ASSERT_TRUE(stat.ok());
-        std::this_thread::sleep_for(std::chrono::microseconds(1));
+        std::this_thread::sleep_for(std::chrono::milliseconds(20));
     }

     search.join();
@@ -455,34 +455,34 @@ TEST_F(DBTest, PRELOAD_TEST) {
     db_->CreateIndex(dummy_context_, COLLECTION_NAME, index);  // wait until build index finish

     int64_t prev_cache_usage = milvus::cache::CpuCacheMgr::GetInstance()->CacheUsage();
-    stat = db_->PreloadCollection(COLLECTION_NAME);
+    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
     ASSERT_TRUE(stat.ok());
     int64_t cur_cache_usage = milvus::cache::CpuCacheMgr::GetInstance()->CacheUsage();
     ASSERT_TRUE(prev_cache_usage < cur_cache_usage);

     FIU_ENABLE_FIU("SqliteMetaImpl.FilesToSearch.throw_exception");
-    stat = db_->PreloadCollection(COLLECTION_NAME);
+    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
     ASSERT_FALSE(stat.ok());
     fiu_disable("SqliteMetaImpl.FilesToSearch.throw_exception");

     // create a partition
     stat = db_->CreatePartition(COLLECTION_NAME, "part0", "0");
     ASSERT_TRUE(stat.ok());
-    stat = db_->PreloadCollection(COLLECTION_NAME);
+    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
     ASSERT_TRUE(stat.ok());

     FIU_ENABLE_FIU("DBImpl.PreloadCollection.null_engine");
-    stat = db_->PreloadCollection(COLLECTION_NAME);
+    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
     ASSERT_FALSE(stat.ok());
     fiu_disable("DBImpl.PreloadCollection.null_engine");

     FIU_ENABLE_FIU("DBImpl.PreloadCollection.exceed_cache");
-    stat = db_->PreloadCollection(COLLECTION_NAME);
+    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
     ASSERT_FALSE(stat.ok());
     fiu_disable("DBImpl.PreloadCollection.exceed_cache");

     FIU_ENABLE_FIU("DBImpl.PreloadCollection.engine_throw_exception");
-    stat = db_->PreloadCollection(COLLECTION_NAME);
+    stat = db_->PreloadCollection(dummy_context_, COLLECTION_NAME);
     ASSERT_FALSE(stat.ok());
     fiu_disable("DBImpl.PreloadCollection.engine_throw_exception");
 }
@@ -535,15 +535,15 @@ TEST_F(DBTest, SHUTDOWN_TEST) {
     stat = db_->DeleteVectors(collection_info.collection_id_, ids_to_delete);
     ASSERT_FALSE(stat.ok());

-    stat = db_->Compact(collection_info.collection_id_);
+    stat = db_->Compact(dummy_context_, collection_info.collection_id_);
     ASSERT_FALSE(stat.ok());

     std::vector vectors;
     std::vector id_array = {0};
-    stat = db_->GetVectorsByID(collection_info.collection_id_, id_array, vectors);
+    stat = db_->GetVectorsByID(collection_info, id_array, vectors);
     ASSERT_FALSE(stat.ok());

-    stat = db_->PreloadCollection(collection_info.collection_id_);
+    stat = db_->PreloadCollection(dummy_context_, collection_info.collection_id_);
     ASSERT_FALSE(stat.ok());

     uint64_t row_count = 0;
@@ -612,7 +612,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_1) {
         ASSERT_EQ(xb.id_array_.size(), nb);
     }

-    std::this_thread::sleep_for(std::chrono::seconds(2));
     db_->Stop();
     fiu_disable("DBImpl.StartMetricTask.InvalidTotalCache");
     fiu_disable("SqliteMetaImpl.FilesToMerge.throw_exception");
@@ -620,7 +619,6 @@
     FIU_ENABLE_FIU("DBImpl.StartMetricTask.InvalidTotalCache");
     db_->Start();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
     db_->Stop();
     fiu_disable("DBImpl.StartMetricTask.InvalidTotalCache");
 }
@@ -644,7 +642,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_2) {
     }

     FIU_ENABLE_FIU("SqliteMetaImpl.CreateCollectionFile.throw_exception");
-    std::this_thread::sleep_for(std::chrono::seconds(2));
     db_->Stop();
     fiu_disable("SqliteMetaImpl.CreateCollectionFile.throw_exception");
 }
@@ -669,7 +666,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_3) {

     FIU_ENABLE_FIU("DBImpl.MergeFiles.Serialize_ThrowException");
     db_->Start();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
     db_->Stop();
     fiu_disable("DBImpl.MergeFiles.Serialize_ThrowException");
 }
@@ -694,7 +690,6 @@ TEST_F(DBTest, BACK_TIMER_THREAD_4) {
     FIU_ENABLE_FIU("DBImpl.MergeFiles.Serialize_ErrorStatus");
     db_->Start();
-    std::this_thread::sleep_for(std::chrono::seconds(2));
     db_->Stop();
     fiu_disable("DBImpl.MergeFiles.Serialize_ErrorStatus");
 }
@@ -934,11 +929,9 @@ TEST_F(DBTest2, ARHIVE_DISK_CHECK) {
         BuildVectors(nb, i, xb);
         db_->InsertVectors(COLLECTION_NAME, "", xb);
-        std::this_thread::sleep_for(std::chrono::microseconds(1));
     }

-    std::this_thread::sleep_for(std::chrono::seconds(1));
-
+    db_->Flush();
     db_->Size(size);
     LOG(DEBUG) << "size=" << size;
     ASSERT_LE(size, 1 * milvus::engine::GB);
@@ -981,8 +974,6 @@ TEST_F(DBTest2, DELETE_TEST) {
     fiu_disable("DBImpl.DropCollectionRecursively.failed");

     stat = db_->DropCollection(COLLECTION_NAME);
-
-    std::this_thread::sleep_for(std::chrono::seconds(2));
     ASSERT_TRUE(stat.ok());

     db_->HasCollection(COLLECTION_NAME, has_collection);
@@ -1183,7 +1174,9 @@ TEST_F(DBTest2, FLUSH_NON_EXISTING_COLLECTION) {
 TEST_F(DBTest2, GET_VECTOR_NON_EXISTING_COLLECTION) {
     std::vector vectors;
     std::vector id_array = {0};
-    auto status = db_->GetVectorsByID("non_existing", id_array, vectors);
+    milvus::engine::meta::CollectionSchema collection_info;
+    collection_info.collection_id_ = "non_existing";
+    auto status = db_->GetVectorsByID(collection_info, id_array, vectors);
     ASSERT_FALSE(status.ok());
 }
@@ -1203,7 +1196,7 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {
     std::vector vectors;
     std::vector empty_array;
-    stat = db_->GetVectorsByID(COLLECTION_NAME, empty_array, vectors);
+    stat = db_->GetVectorsByID(collection_info, empty_array, vectors);
     ASSERT_FALSE(stat.ok());

     stat = db_->InsertVectors(collection_info.collection_id_, partition_tag, qxb);
@@ -1211,7 +1204,7 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {
     db_->Flush(collection_info.collection_id_);

-    stat = db_->GetVectorsByID(COLLECTION_NAME, qxb.id_array_, vectors);
+    stat = db_->GetVectorsByID(collection_info, qxb.id_array_, vectors);
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(vectors.size(), qxb.id_array_.size());
     ASSERT_EQ(vectors[0].float_data_.size(), COLLECTION_DIM);
@@ -1221,7 +1214,7 @@ TEST_F(DBTest2, GET_VECTOR_BY_ID_TEST) {
     }

     std::vector invalid_array = {-1, -1};
-    stat = db_->GetVectorsByID(COLLECTION_NAME, empty_array, vectors);
+    stat = db_->GetVectorsByID(collection_info, empty_array, vectors);
     ASSERT_TRUE(stat.ok());
     for (auto& vector : vectors) {
         ASSERT_EQ(vector.vector_count_, 0);
@@ -1344,7 +1337,7 @@ TEST_F(DBTest2, SEARCH_WITH_DIFFERENT_INDEX) {
     stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
     ASSERT_TRUE(stat.ok());

-    stat = db_->PreloadCollection(collection_info.collection_id_);
+    stat = db_->PreloadCollection(dummy_context_, collection_info.collection_id_);
     ASSERT_TRUE(stat.ok());

     int topk = 10, nprobe = 10;
@@ -1369,7 +1362,7 @@ result_distances);
     stat = db_->CreateIndex(dummy_context_, collection_info.collection_id_, index);
     ASSERT_TRUE(stat.ok());

-    stat = db_->PreloadCollection(collection_info.collection_id_);
+    stat = db_->PreloadCollection(dummy_context_, collection_info.collection_id_);
     ASSERT_TRUE(stat.ok());

     for (auto id : ids_to_search) {
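The test hunks above (and the MySQL variants that follow) replace fixed sleeps with an explicit Flush() before asserting, so the assertions no longer depend on background timing. A minimal sketch of that pattern inside a test body; db, collection_id and expected_rows are illustrative stand-ins:

    // Instead of sleeping and hoping the background flush has run, force persistence
    // and then assert on observable state.
    auto status = db->Flush();
    ASSERT_TRUE(status.ok());

    uint64_t row_count = 0;
    status = db->GetCollectionRowCount(collection_id, row_count);
    ASSERT_TRUE(status.ok());
    ASSERT_EQ(row_count, expected_rows);  // deterministic: no sleep, no timing dependence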
diff --git a/core/unittest/db/test_db_mysql.cpp b/core/unittest/db/test_db_mysql.cpp
index cb9b8fd54a..64509c661a 100644
--- a/core/unittest/db/test_db_mysql.cpp
+++ b/core/unittest/db/test_db_mysql.cpp
@@ -72,7 +72,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
         milvus::engine::ResultIds result_ids;
         milvus::engine::ResultDistances result_distances;
         int k = 10;
-        std::this_thread::sleep_for(std::chrono::seconds(2));
+        std::this_thread::sleep_for(std::chrono::seconds(1));

         INIT_TIMER;
         std::stringstream ss;
@@ -106,7 +106,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
                 /* LOG(DEBUG) << ss.str(); */
             }
             ASSERT_TRUE(count >= prev_count);
-            std::this_thread::sleep_for(std::chrono::seconds(1));
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
         }
     });
@@ -128,7 +128,7 @@ TEST_F(MySqlDBTest, DB_TEST) {
         stat = db_->Flush();
         ASSERT_TRUE(stat.ok());
-        std::this_thread::sleep_for(std::chrono::microseconds(1));
+        std::this_thread::sleep_for(std::chrono::milliseconds(20));
     }

     search.join();
@@ -183,7 +183,6 @@ TEST_F(MySqlDBTest, SEARCH_TEST) {
     stat = db_->InsertVectors(COLLECTION_NAME, "", xb);
     ASSERT_TRUE(stat.ok());
-    // sleep(2);  // wait until build index finish

     stat = db_->Flush();
     ASSERT_TRUE(stat.ok());
@@ -241,10 +240,8 @@ TEST_F(MySqlDBTest, ARHIVE_DISK_CHECK) {
         milvus::engine::VectorsData xb;
         BuildVectors(nb, i, xb);
         db_->InsertVectors(COLLECTION_NAME, "", xb);
-        std::this_thread::sleep_for(std::chrono::microseconds(1));
     }

-    // std::this_thread::sleep_for(std::chrono::seconds(1));
     stat = db_->Flush();
     ASSERT_TRUE(stat.ok());
@@ -288,16 +285,12 @@ TEST_F(MySqlDBTest, DELETE_TEST) {
         milvus::engine::VectorsData xb;
         BuildVectors(nb, i, xb);
         db_->InsertVectors(COLLECTION_NAME, "", xb);
-        std::this_thread::sleep_for(std::chrono::microseconds(1));
     }

     stat = db_->Flush();
     ASSERT_TRUE(stat.ok());

     stat = db_->DropCollection(COLLECTION_NAME);
-    //// std::cout << "5 sec start" << std::endl;
-    // std::this_thread::sleep_for(std::chrono::seconds(5));
-    //// std::cout << "5 sec finish" << std::endl;
     ASSERT_TRUE(stat.ok());

     // db_->HasCollection(COLLECTION_NAME, has_collection);
diff --git a/core/unittest/db/test_delete.cpp b/core/unittest/db/test_delete.cpp
index 47f91202ee..1feb5b8241 100644
--- a/core/unittest/db/test_delete.cpp
+++ b/core/unittest/db/test_delete.cpp
@@ -287,7 +287,7 @@ TEST_F(DeleteTest, delete_before_create_index) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

-    int64_t nb = 10000;
+    int64_t nb = 5000;
     milvus::engine::VectorsData xb;
     BuildVectors(nb, xb);
@@ -369,7 +369,7 @@ TEST_F(DeleteTest, delete_with_index) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

-    int64_t nb = 10000;
+    int64_t nb = 5000;
     milvus::engine::VectorsData xb;
     BuildVectors(nb, xb);
@@ -451,7 +451,7 @@ TEST_F(DeleteTest, delete_multiple_times_with_index) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

-    int64_t nb = 100000;
+    int64_t nb = 5000;
     milvus::engine::VectorsData xb;
     BuildVectors(nb, xb);
@@ -749,7 +749,7 @@ TEST_F(CompactTest, compact_basic) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(row_count, nb - 2);

-    stat = db_->Compact(collection_info.collection_id_);
+    stat = db_->Compact(dummy_context_, collection_info.collection_id_);
     ASSERT_TRUE(stat.ok());

     const int topk = 1, nprobe = 1;
@@ -834,7 +834,7 @@ TEST_F(CompactTest, compact_with_index) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(row_count, nb - ids_to_delete.size());

-    stat = db_->Compact(collection_info.collection_id_);
+    stat = db_->Compact(dummy_context_, collection_info.collection_id_);
     ASSERT_TRUE(stat.ok());

     stat = db_->GetCollectionRowCount(collection_info.collection_id_, row_count);
@@ -864,6 +864,6 @@ TEST_F(CompactTest, compact_with_index) {
 }

 TEST_F(CompactTest, compact_non_existing_table) {
-    auto status = db_->Compact("non_existing_table");
+    auto status = db_->Compact(dummy_context_, "non_existing_table");
     ASSERT_FALSE(status.ok());
 }
diff --git a/core/unittest/db/test_hybrid_db.cpp b/core/unittest/db/test_hybrid_db.cpp
index eb62db0067..6d09b8649f 100644
--- a/core/unittest/db/test_hybrid_db.cpp
+++ b/core/unittest/db/test_hybrid_db.cpp
@@ -282,7 +282,7 @@ TEST_F(DBTest, COMPACT_TEST) {
     stat = db_->Flush();
     ASSERT_TRUE(stat.ok());

-    stat = db_->Compact(collection_info.collection_id_);
+    stat = db_->Compact(dummy_context_, collection_info.collection_id_);
     ASSERT_TRUE(stat.ok());

     const int topk = 1, nprobe = 1;
diff --git a/core/unittest/db/test_mem.cpp b/core/unittest/db/test_mem.cpp
index fce750e067..46e8406644 100644
--- a/core/unittest/db/test_mem.cpp
+++ b/core/unittest/db/test_mem.cpp
@@ -318,20 +318,15 @@ TEST_F(MemManagerTest2, INSERT_TEST) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

-    auto start_time = METRICS_NOW_TIME;
-
-    int insert_loop = 20;
+    int insert_loop = 10;
     for (int i = 0; i < insert_loop; ++i) {
-        int64_t nb = 40960;
+        int64_t nb = 4096;
         milvus::engine::VectorsData xb;
         BuildVectors(nb, xb);
         milvus::engine::IDNumbers vector_ids;
         stat = db_->InsertVectors(GetCollectionName(), "", xb);
         ASSERT_TRUE(stat.ok());
     }
-    auto end_time = METRICS_NOW_TIME;
-    auto total_time = METRICS_MICROSECONDS(start_time, end_time);
-    LOG(DEBUG) << "total_time spent in INSERT_TEST (ms) : " << total_time;
 }

 TEST_F(MemManagerTest2, INSERT_BINARY_TEST) {
diff --git a/core/unittest/db/test_search_by_id.cpp b/core/unittest/db/test_search_by_id.cpp
index 812d80268b..b8b4cfe359 100644
--- a/core/unittest/db/test_search_by_id.cpp
+++ b/core/unittest/db/test_search_by_id.cpp
@@ -86,7 +86,7 @@ TEST_F(SearchByIdTest, BASIC_TEST) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

-    int64_t nb = 100000;
+    int64_t nb = 10000;
     milvus::engine::VectorsData xb;
     BuildVectors(nb, xb);
@@ -185,7 +185,7 @@ TEST_F(SearchByIdTest, WITH_INDEX_TEST) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

-    int64_t nb = 10000;
+    int64_t nb = 5000;
     milvus::engine::VectorsData xb;
     BuildVectors(nb, xb);
@@ -246,7 +246,7 @@ TEST_F(SearchByIdTest, WITH_DELETE_TEST) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

-    int64_t nb = 100000;
+    int64_t nb = 10000;
     milvus::engine::VectorsData xb;
     BuildVectors(nb, xb);
@@ -315,7 +315,7 @@ TEST_F(GetVectorByIdTest, BASIC_TEST) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

-    int64_t nb = 100000;
+    int64_t nb = 10000;
     milvus::engine::VectorsData xb;
     BuildVectors(nb, xb);
@@ -349,7 +349,7 @@ TEST_F(GetVectorByIdTest, BASIC_TEST) {
     milvus::engine::ResultDistances result_distances;

     std::vector vectors;
-    stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
+    stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
     ASSERT_TRUE(stat.ok());

     stat = db_->Query(dummy_context_, collection_info.collection_id_, tags, topk, json_params, vectors[0], result_ids,
@@ -369,7 +369,7 @@ TEST_F(GetVectorByIdTest, WITH_INDEX_TEST) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

-    int64_t nb = 10000;
+    int64_t nb = 5000;
     milvus::engine::VectorsData xb;
     BuildVectors(nb, xb);
@@ -409,7 +409,7 @@ TEST_F(GetVectorByIdTest, WITH_INDEX_TEST) {
     milvus::engine::ResultDistances result_distances;

     std::vector vectors;
-    stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
+    stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
     ASSERT_TRUE(stat.ok());

     stat = db_->Query(dummy_context_, collection_info.collection_id_, tags, topk, json_params, vectors[0], result_ids,
@@ -429,7 +429,7 @@ TEST_F(GetVectorByIdTest, WITH_DELETE_TEST) {
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(collection_info_get.dimension_, COLLECTION_DIM);

-    int64_t nb = 100000;
+    int64_t nb = 10000;
     milvus::engine::VectorsData xb;
     BuildVectors(nb, xb);
@@ -469,7 +469,7 @@ TEST_F(GetVectorByIdTest, WITH_DELETE_TEST) {
     milvus::engine::ResultDistances result_distances;

     std::vector vectors;
-    stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
+    stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
     ASSERT_TRUE(stat.ok());
     for (auto& vector : vectors) {
         ASSERT_EQ(vector.vector_count_, 0);
@@ -541,7 +541,7 @@ TEST_F(SearchByIdTest, BINARY_TEST) {
     milvus::engine::ResultDistances result_distances;

     std::vector vectors;
-    stat = db_->GetVectorsByID(collection_info.collection_id_, ids_to_search, vectors);
+    stat = db_->GetVectorsByID(collection_info, ids_to_search, vectors);
     ASSERT_TRUE(stat.ok());
     ASSERT_EQ(vectors.size(), ids_to_search.size());
diff --git a/core/unittest/metrics/test_metrics.cpp b/core/unittest/metrics/test_metrics.cpp
index 304b33e193..bbf298853f 100644
--- a/core/unittest/metrics/test_metrics.cpp
+++ b/core/unittest/metrics/test_metrics.cpp
@@ -89,7 +89,7 @@ TEST_F(MetricTest, METRIC_TEST) {
         // std::vector tags;
         // milvus::engine::ResultIds result_ids;
         // milvus::engine::ResultDistances result_distances;
-        std::this_thread::sleep_for(std::chrono::seconds(2));
+        std::this_thread::sleep_for(std::chrono::seconds(1));

         INIT_TIMER;
         std::stringstream ss;
@@ -115,11 +115,11 @@ TEST_F(MetricTest, METRIC_TEST) {
                 // }
             }
             ASSERT_TRUE(count >= prev_count);
-            std::this_thread::sleep_for(std::chrono::seconds(1));
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
         }
     });

-    int loop = 10000;
+    int loop = 100;
     for (auto i = 0; i < loop; ++i) {
         if (i == 40) {
@@ -131,7 +131,7 @@ TEST_F(MetricTest, METRIC_TEST) {
             db_->InsertVectors(group_name, "", xb);
             ASSERT_EQ(xb.id_array_.size(), nb);
         }
-        std::this_thread::sleep_for(std::chrono::microseconds(2000));
+        std::this_thread::sleep_for(std::chrono::milliseconds(20));
     }

     search.join();
diff --git a/core/unittest/server/test_util.cpp b/core/unittest/server/test_util.cpp
index ed3ebcabf1..c6a149e641 100644
--- a/core/unittest/server/test_util.cpp
+++ b/core/unittest/server/test_util.cpp
@@ -813,7 +813,7 @@ TEST(UtilTest, ROLLOUTHANDLER_TEST) {
 TEST(UtilTest, THREADPOOL_TEST) {
     auto thread_pool_ptr = std::make_unique(3);
     auto fun = [](int i) {
-        sleep(1);
+        std::this_thread::sleep_for(std::chrono::milliseconds(50));
     };
     for (int i = 0; i < 10; ++i) {
         thread_pool_ptr->enqueue(fun, i);
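Related to the metric changes earlier in the patch (the monitor switch is read into the options and the metric thread is only started and joined when it is enabled), a minimal self-contained sketch of starting and stopping an optional background worker behind such a flag; all names here are illustrative, not the DBImpl members:

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    // Illustrative background worker that only exists when the feature flag is on.
    class OptionalWorker {
     public:
        explicit OptionalWorker(bool enable) : enable_(enable) {
        }

        void
        Start() {
            if (!enable_) {
                return;  // flag off: no thread is created
            }
            stop_ = false;
            thread_ = std::thread([this] { Run(); });
        }

        void
        Stop() {
            if (!enable_) {
                return;  // flag off: nothing to notify or join
            }
            {
                std::lock_guard<std::mutex> lock(mutex_);
                stop_ = true;
            }
            cv_.notify_one();
            thread_.join();
        }

     private:
        void
        Run() {
            std::unique_lock<std::mutex> lock(mutex_);
            while (!stop_) {
                cv_.wait_for(lock, std::chrono::seconds(1));  // periodic work (e.g. metric collection) would go here
            }
        }

        bool enable_ = false;
        bool stop_ = false;
        std::mutex mutex_;
        std::condition_variable cv_;
        std::thread thread_;
    };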