diff --git a/core/src/CMakeLists.txt b/core/src/CMakeLists.txt index 573ef524d2..dc65c3eb75 100644 --- a/core/src/CMakeLists.txt +++ b/core/src/CMakeLists.txt @@ -58,20 +58,34 @@ add_subdirectory( db ) # target milvus_engine add_subdirectory( log ) add_subdirectory( server ) +set(link_lib + milvus_engine + config + metrics + tracing + log + oatpp + query + utils + ) + +if (MILVUS_WITH_PROMETHEUS) + set(link_lib + ${link_lib} + # dependency prometheus + prometheus-cpp-push + prometheus-cpp-pull + prometheus-cpp-core + ) +endif () + +set(link_lib + ${link_lib} + curl + ) + target_link_libraries( server - PUBLIC milvus_engine - config - metrics - tracing - log - oatpp - query - utils - # dependency prometheus - prometheus-cpp-push - prometheus-cpp-pull - prometheus-cpp-core - curl + PUBLIC ${link_lib} ) # **************************** Get&Print Include Directories **************************** diff --git a/core/src/codecs/DeletedDocsFormat.cpp b/core/src/codecs/DeletedDocsFormat.cpp index 87b2f65e68..dae1bb65ff 100644 --- a/core/src/codecs/DeletedDocsFormat.cpp +++ b/core/src/codecs/DeletedDocsFormat.cpp @@ -90,7 +90,7 @@ DeletedDocsFormat::Write(const storage::FSHandlerPtr& fs_ptr, const std::string& throw Exception(SERVER_WRITE_ERROR, err_msg); } fs_ptr->reader_ptr_->read(&old_num_bytes, sizeof(size_t)); - delete_ids.resize(old_num_bytes / sizeof(size_t)); + delete_ids.resize(old_num_bytes / sizeof(engine::offset_t)); fs_ptr->reader_ptr_->read(delete_ids.data(), old_num_bytes); fs_ptr->reader_ptr_->close(); } else { diff --git a/core/src/db/insert/MemCollection.cpp b/core/src/db/insert/MemCollection.cpp index c765085a31..b77af7fb91 100644 --- a/core/src/db/insert/MemCollection.cpp +++ b/core/src/db/insert/MemCollection.cpp @@ -114,9 +114,14 @@ MemCollection::Serialize(uint64_t wal_lsn) { if (status.ok()) { break; } else if (status.code() == SS_STALE_ERROR) { - LOG_ENGINE_WARNING_ << "ApplyDeletes is stale, try again"; + std::string err = "ApplyDeletes is stale, try again"; + LOG_ENGINE_WARNING_ << err; + std::cout << err << std::endl; continue; } else { + std::string err = "ApplyDeletes failed: " + status.ToString(); + LOG_ENGINE_ERROR_ << err; + std::cout << err << std::endl; return status; } } @@ -172,6 +177,7 @@ MemCollection::ApplyDeletes() { auto segments_op = std::make_shared(context, ss); int64_t segment_iterated = 0; + // std::vector modified_segments; auto segment_executor = [&](const snapshot::SegmentPtr& segment, snapshot::SegmentIterator* iterator) -> Status { segment_iterated++; auto seg_visitor = engine::SegmentVisitor::Build(ss, segment->GetID()); @@ -179,6 +185,8 @@ MemCollection::ApplyDeletes() { std::make_shared(options_.meta_.path_, seg_visitor); segment::IdBloomFilterPtr pre_bloom_filter; STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter)); + std::vector uids; + STATUS_CHECK(segment_reader->LoadUids(uids)); // Step 1: Check delete_id in mem std::vector delete_ids; @@ -197,9 +205,13 @@ MemCollection::ApplyDeletes() { STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs)); std::vector pre_del_ids; if (prev_del_docs) { - pre_del_ids = prev_del_docs->GetDeletedDocs(); - if (!pre_del_ids.empty()) + auto pre_doc_ids = prev_del_docs->GetDeletedDocs(); + if (!pre_doc_ids.empty()) { + for (auto& id : pre_doc_ids) { + pre_del_ids.push_back(uids[id]); + } delete_ids.insert(delete_ids.end(), pre_del_ids.begin(), pre_del_ids.end()); + } } // TODO(yhz): Update blacklist in cache @@ -265,8 +277,7 @@ MemCollection::ApplyDeletes() { segment::IdBloomFilterPtr bloom_filter; STATUS_CHECK(segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter)); auto delete_docs = std::make_shared(); - std::vector uids; - STATUS_CHECK(segment_reader->LoadUids(uids)); + for (size_t i = 0; i < uids.size(); i++) { if (std::binary_search(ids_to_check.begin(), ids_to_check.end(), uids[i])) { delete_docs->AddDeletedDoc(i); @@ -284,6 +295,7 @@ MemCollection::ApplyDeletes() { delete_file->SetSize(CommonUtil::GetFileSize(del_docs_path + codec::DeletedDocsFormat::FilePostfix())); bloom_filter_file->SetSize( CommonUtil::GetFileSize(bloom_filter_file_path + codec::IdBloomFilterFormat::FilePostfix())); + // modified_segments.push_back(segment); return Status::OK(); }; @@ -296,10 +308,22 @@ MemCollection::ApplyDeletes() { return Status::OK(); // no segment, nothing to do } - fiu_do_on("MemCollection.ApplyDeletes.RandomSleep", { - std::srand(std::time(nullptr)); - sleep(std::rand() % 3); - }); + fiu_do_on("MemCollection.ApplyDeletes.RandomSleep", sleep(1)); + + // snapshot::ScopedSnapshotT new_ss; + // STATUS_CHECK(snapshot::Snapshots::GetInstance().GetSnapshot(new_ss, collection_id_)); + // if (new_ss->GetID() != ss->GetID()) { + // for (auto& seg : modified_segments) { + // auto pre_seg_commit = ss->GetSegmentCommitBySegmentId(seg->GetID()); + // auto new_seg_commit = new_ss->GetSegmentCommitBySegmentId(seg->GetID()); + // if (new_seg_commit->GetID() != pre_seg_commit->GetID()) { + // // TODO: Rollback CompoundSegmentsOp + // std::string err = "[CSOE] Segment " + std::to_string(seg->GetID()) + " is stale."; + // LOG_ENGINE_ERROR_ << err; + // return Status(SS_STALE_ERROR, err); + // } + // } + // } return segments_op->Push(); } diff --git a/core/src/db/snapshot/SnapshotHolder.cpp b/core/src/db/snapshot/SnapshotHolder.cpp index ba07e2cae5..1a677969fb 100644 --- a/core/src/db/snapshot/SnapshotHolder.cpp +++ b/core/src/db/snapshot/SnapshotHolder.cpp @@ -114,10 +114,7 @@ SnapshotHolder::Get(ScopedSnapshotT& ss, ID_TYPE id, bool scoped) const { bool SnapshotHolder::IsActive(Snapshot::Ptr& ss) { auto collection = ss->GetCollection(); - if (collection && collection->IsActive()) { - return true; - } - return false; + return collection && collection->IsActive(); } Status diff --git a/core/src/db/snapshot/Store.h b/core/src/db/snapshot/Store.h index 02d4328981..230d70f9b6 100644 --- a/core/src/db/snapshot/Store.h +++ b/core/src/db/snapshot/Store.h @@ -195,7 +195,7 @@ class Store : public std::enable_shared_from_this { IDS_TYPE ids; IDS_TYPE selected_ids; std::vector filter_states = {State::ACTIVE}; - adapter_->SelectResourceIDs(selected_ids, "", filter_states); + adapter_->SelectResourceIDs(selected_ids, StateField::Name, filter_states); if (!reversed) { ids = selected_ids; diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index dec13f96a8..f4351b8062 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -979,7 +979,7 @@ TEST_F(DBTest, DeleteEntitiesTest) { milvus::engine::IDNumbers whole_delete_ids; fiu_init(0); - fiu_enable("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0); + fiu_enable_random("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0, 0.5); for (size_t i = 0; i < 5; i++) { std::string partition0 = collection_name + "p_" + std::to_string(i) + "_0"; std::string partition1 = collection_name + "p_" + std::to_string(i) + "_1"; @@ -1019,6 +1019,7 @@ TEST_F(DBTest, DeleteEntitiesTest) { } TEST_F(DBTest, DeleteStaleTest) { + const int del_id_pair = 3; auto insert_entities = [&](const std::string& collection, const std::string& partition, uint64_t count, uint64_t batch_index, milvus::engine::IDNumbers& ids) -> Status { milvus::engine::DataChunkPtr data_chunk; @@ -1048,7 +1049,7 @@ TEST_F(DBTest, DeleteStaleTest) { auto delete_task = [&](const std::string& collection, const milvus::engine::IDNumbers& del_ids) { auto status = Status::OK(); - for (size_t i = 0; i < 5; i++) { + for (size_t i = 0; i < del_id_pair; i++) { milvus::engine::IDNumbers ids = {del_ids[2 * i], del_ids[2 * i + 1]}; status = db_->DeleteEntityByID(collection, ids); ASSERT_TRUE(status.ok()) << status.ToString(); @@ -1072,24 +1073,26 @@ TEST_F(DBTest, DeleteStaleTest) { status = db_->Flush(collection_name); ASSERT_TRUE(status.ok()) << status.ToString(); - for (size_t i = 0; i < 5; i++) { + for (size_t i = 0; i < del_id_pair; i ++) { del_ids.push_back(entity_ids[i]); del_ids.push_back(entity_ids2[i]); } fiu_init(0); - fiu_enable("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0); + fiu_enable_random("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0, 0.5); auto build_thread = std::thread(build_task, collection_name, VECTOR_FIELD_NAME); auto delete_thread = std::thread(delete_task, collection_name, del_ids); - - build_thread.join(); delete_thread.join(); -// sleep(15); + build_thread.join(); fiu_disable("MemCollection.ApplyDeletes.RandomSleep"); + db_->Flush(); +// int64_t row_count; +// status = db_->CountEntities(collection_name, row_count); +// ASSERT_TRUE(status.ok()) << status.ToString(); +// ASSERT_EQ(row_count, 10000 * 2 - 2 * del_id_pair); // // std::vector valid_row; // milvus::engine::DataChunkPtr entity_data_chunk; -// std::cout << "Get Entity" << std::endl; // for (size_t j = 0; j < del_ids.size(); j++) { // status = db_->GetEntityByID(collection_name, {del_ids[j]}, {}, valid_row, entity_data_chunk); // ASSERT_TRUE(status.ok()) << status.ToString();