From fc393b3aeb358753e7ea45dc5b725befca744d3e Mon Sep 17 00:00:00 2001 From: BossZou <40255591+BossZou@users.noreply.github.com> Date: Wed, 5 Aug 2020 16:27:58 +0800 Subject: [PATCH] (scalar) Debug delete request (#3107) * Debug delete request Signed-off-by: yinghao.zou * Chang previous bloom filter and deleted docs file to stale during ApplyDeletes Signed-off-by: yinghao.zou * Add delete case Signed-off-by: yinghao.zou * Add delete ut Signed-off-by: yinghao.zou * Add Delete test Signed-off-by: yinghao.zou * Format Signed-off-by: yinghao.zou Co-authored-by: Wang Xiangyu --- core/src/db/DB.h | 2 +- core/src/db/DBImpl.cpp | 2 +- core/src/db/DBImpl.h | 2 +- core/src/db/insert/MemCollection.cpp | 81 +++++++--- core/src/db/snapshot/CompoundOperations.cpp | 8 + core/src/db/snapshot/CompoundOperations.h | 3 + core/src/db/snapshot/ResourceOperations.cpp | 4 +- core/src/segment/SegmentWriter.cpp | 2 +- core/unittest/db/test_db.cpp | 152 +++++++++++++++++- .../milvus_python_test/entity/test_delete.py | 10 +- 10 files changed, 230 insertions(+), 36 deletions(-) diff --git a/core/src/db/DB.h b/core/src/db/DB.h index ca699ec262..c0b53c78e6 100644 --- a/core/src/db/DB.h +++ b/core/src/db/DB.h @@ -100,7 +100,7 @@ class DB { DataChunkPtr& data_chunk) = 0; virtual Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) = 0; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) = 0; virtual Status ListIDInSegment(const std::string& collection_id, int64_t segment_id, IDNumbers& entity_ids) = 0; diff --git a/core/src/db/DBImpl.cpp b/core/src/db/DBImpl.cpp index 703f7e5d20..bc448c78ac 100644 --- a/core/src/db/DBImpl.cpp +++ b/core/src/db/DBImpl.cpp @@ -576,7 +576,7 @@ DBImpl::GetEntityByID(const std::string& collection_name, const IDNumbers& id_ar } Status -DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) { +DBImpl::DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) { CHECK_INITIALIZED; Status status; diff --git a/core/src/db/DBImpl.h b/core/src/db/DBImpl.h index 95e794c3a3..1f694ed355 100644 --- a/core/src/db/DBImpl.h +++ b/core/src/db/DBImpl.h @@ -95,7 +95,7 @@ class DBImpl : public DB, public ConfigObserver { DataChunkPtr& data_chunk) override; Status - DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers entity_ids) override; + DeleteEntityByID(const std::string& collection_name, const engine::IDNumbers& entity_ids) override; Status Query(const server::ContextPtr& context, const query::QueryPtr& query_ptr, engine::QueryResultPtr& result) override; diff --git a/core/src/db/insert/MemCollection.cpp b/core/src/db/insert/MemCollection.cpp index 24d0b7aec0..c628f458cc 100644 --- a/core/src/db/insert/MemCollection.cpp +++ b/core/src/db/insert/MemCollection.cpp @@ -11,11 +11,17 @@ #include "db/insert/MemCollection.h" +#include + #include #include +#include +#include #include #include +#include + #include "config/ServerConfig.h" #include "db/Utils.h" #include "db/snapshot/CompoundOperations.h" @@ -101,6 +107,22 @@ Status MemCollection::Serialize(uint64_t wal_lsn) { TimeRecorder recorder("MemCollection::Serialize collection " + collection_id_); + if (!doc_ids_to_delete_.empty()) { + while (true) { + auto status = ApplyDeletes(); + if (status.ok()) { + break; + } else if (status.code() == SS_STALE_ERROR) { + LOG_ENGINE_WARNING_ << "ApplyDeletes is stale, try again"; + continue; + } else { + return status; + } + } + } + + doc_ids_to_delete_.clear(); + std::lock_guard lock(mutex_); for (auto& partition_segments : mem_segments_) { MemSegmentList& segments = partition_segments.second; @@ -113,13 +135,6 @@ MemCollection::Serialize(uint64_t wal_lsn) { } } - if (!doc_ids_to_delete_.empty()) { - auto status = ApplyDeletes(); - if (!status.ok()) { - return Status(DB_ERROR, status.message()); - } - } - mem_segments_.clear(); recorder.RecordSection("Finished flushing"); @@ -152,6 +167,7 @@ MemCollection::ApplyDeletes() { // TODO: check stale segment files here snapshot::OperationContext context; + context.lsn = lsn_; auto segments_op = std::make_shared(context, ss); auto segment_executor = [&](const snapshot::SegmentPtr& segment, snapshot::SegmentIterator* iterator) -> Status { @@ -161,6 +177,7 @@ MemCollection::ApplyDeletes() { segment::IdBloomFilterPtr pre_bloom_filter; STATUS_CHECK(segment_reader->LoadBloomFilter(pre_bloom_filter)); + // Step 1: Check delete_id in mem std::vector delete_ids; for (auto& id : doc_ids_to_delete_) { if (pre_bloom_filter->Check(id)) { @@ -168,11 +185,11 @@ MemCollection::ApplyDeletes() { } } - // No entities to delete, skip if (delete_ids.empty()) { return Status::OK(); } + // Step 2: Load previous delete_id and merge into 'delete_ids' segment::DeletedDocsPtr prev_del_docs; STATUS_CHECK(segment_reader->LoadDeletedDocs(prev_del_docs)); std::vector pre_del_ids; @@ -190,12 +207,32 @@ MemCollection::ApplyDeletes() { std::sort(delete_ids.begin(), delete_ids.end()); std::set ids_to_check(delete_ids.begin(), delete_ids.end()); - // write delete docs + // Step 3: Mark previous deleted docs file and bloom filter file stale auto& field_visitors_map = seg_visitor->GetFieldVisitors(); auto uid_field_visitor = seg_visitor->GetFieldVisitor(engine::DEFAULT_UID_NAME); auto del_doc_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_DELETED_DOCS); auto del_docs_element = del_doc_visitor->GetElement(); - // TODO(yhz): Create a new delete doc file in snapshot and obtain a new SegmentFile Res + auto blm_filter_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER); + auto blm_filter_element = blm_filter_visitor->GetElement(); + + auto segment_file_executor = [&](const snapshot::SegmentFilePtr& segment_file, + snapshot::SegmentFileIterator* iterator) -> Status { + if (segment_file->GetSegmentId() != segment->GetID()) { + return Status::OK(); + } + + if (segment_file->GetFieldElementId() == del_docs_element->GetID() || + segment_file->GetFieldElementId() == blm_filter_element->GetID()) { + segments_op->AddStaleSegmentFile(segment_file); + } + return Status::OK(); + }; + + auto segment_file_iterator = std::make_shared(ss, segment_file_executor); + segment_file_iterator->Iterate(); + STATUS_CHECK(segment_file_iterator->GetStatus()); + + // Step 4: Create new deleted docs file and bloom filter file snapshot::SegmentFileContext del_file_context; del_file_context.field_name = uid_field_visitor->GetField()->GetName(); del_file_context.field_element_name = del_docs_element->GetName(); @@ -203,15 +240,11 @@ MemCollection::ApplyDeletes() { del_file_context.partition_id = segment->GetPartitionId(); del_file_context.segment_id = segment->GetID(); snapshot::SegmentFilePtr delete_file; - segments_op->CommitNewSegmentFile(del_file_context, delete_file); + STATUS_CHECK(segments_op->CommitNewSegmentFile(del_file_context, delete_file)); auto segment_writer = std::make_shared(options_.meta_.path_, seg_visitor); std::string del_docs_path = snapshot::GetResPath(collection_root_path, delete_file); - // write bloom filter - auto blm_filter_visitor = uid_field_visitor->GetElementVisitor(FieldElementType::FET_BLOOM_FILTER); - auto blm_filter_element = blm_filter_visitor->GetElement(); - snapshot::SegmentFileContext bloom_file_context; bloom_file_context.field_name = uid_field_visitor->GetField()->GetName(); bloom_file_context.field_element_name = blm_filter_element->GetName(); @@ -220,14 +253,15 @@ MemCollection::ApplyDeletes() { bloom_file_context.segment_id = segment->GetID(); engine::snapshot::SegmentFile::Ptr bloom_filter_file; - segments_op->CommitNewSegmentFile(bloom_file_context, bloom_filter_file); + STATUS_CHECK(segments_op->CommitNewSegmentFile(bloom_file_context, bloom_filter_file)); std::string bloom_filter_file_path = snapshot::GetResPath(collection_root_path, bloom_filter_file); - auto delete_docs = std::make_shared(); + // Step 5: Write to file segment::IdBloomFilterPtr bloom_filter; - segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter); + STATUS_CHECK(segment_writer->CreateBloomFilter(bloom_filter_file_path, bloom_filter)); + auto delete_docs = std::make_shared(); std::vector uids; STATUS_CHECK(segment_reader->LoadUids(uids)); for (size_t i = 0; i < uids.size(); i++) { @@ -238,10 +272,11 @@ MemCollection::ApplyDeletes() { } } - segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetCount() - pre_del_ids.size(), true); + STATUS_CHECK( + segments_op->CommitRowCountDelta(segment->GetID(), delete_docs->GetCount() - pre_del_ids.size(), true)); - segment_writer->WriteDeletedDocs(del_docs_path, delete_docs); - segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter); + STATUS_CHECK(segment_writer->WriteDeletedDocs(del_docs_path, delete_docs)); + STATUS_CHECK(segment_writer->WriteBloomFilter(bloom_filter_file_path, bloom_filter)); return Status::OK(); }; @@ -250,6 +285,10 @@ MemCollection::ApplyDeletes() { segment_iterator->Iterate(); STATUS_CHECK(segment_iterator->GetStatus()); + fiu_do_on("MemCollection.ApplyDeletes.RandomSleep", { + std::srand(std::time(nullptr)); + sleep(std::rand() % 3); + }); return segments_op->Push(); } diff --git a/core/src/db/snapshot/CompoundOperations.cpp b/core/src/db/snapshot/CompoundOperations.cpp index 2413ad2471..1edc366600 100644 --- a/core/src/db/snapshot/CompoundOperations.cpp +++ b/core/src/db/snapshot/CompoundOperations.cpp @@ -87,6 +87,14 @@ CompoundSegmentsOperation::CommitNewSegmentFile(const SegmentFileContext& contex return Status::OK(); } +Status +CompoundSegmentsOperation::AddStaleSegmentFile(const SegmentFilePtr& stale_segment_file) { + stale_segment_files_[stale_segment_file->GetSegmentId()].push_back(stale_segment_file); + modified_segments_.insert(stale_segment_file->GetSegmentId()); + + return Status::OK(); +} + Status CompoundSegmentsOperation::DoExecute(StorePtr store) { if (!context_.new_segment && stale_segment_files_.size() == 0 && new_segment_files_.size() == 0) { diff --git a/core/src/db/snapshot/CompoundOperations.h b/core/src/db/snapshot/CompoundOperations.h index 07a93f462b..caa9f73a6c 100644 --- a/core/src/db/snapshot/CompoundOperations.h +++ b/core/src/db/snapshot/CompoundOperations.h @@ -93,6 +93,9 @@ class CompoundSegmentsOperation : public CompoundBaseOperationGetMappings().erase(stale_segment_file->GetID()); size -= stale_segment_file->GetSize(); } - } else { + } else if (context_.new_segment && GetStartedSS()->GetResource(context_.new_segment->GetPartitionId())) { resource_ = std::make_shared(GetStartedSS()->GetLatestSchemaCommitId(), context_.new_segment->GetPartitionId(), context_.new_segment->GetID()); + } else { + return Status(SS_STALE_ERROR, "Stale Error"); } for (auto& new_segment_file : context_.new_segment_files) { resource_->GetMappings().insert(new_segment_file->GetID()); diff --git a/core/src/segment/SegmentWriter.cpp b/core/src/segment/SegmentWriter.cpp index db47d0e326..ff45187190 100644 --- a/core/src/segment/SegmentWriter.cpp +++ b/core/src/segment/SegmentWriter.cpp @@ -212,7 +212,7 @@ SegmentWriter::CreateBloomFilter(const std::string& file_path, IdBloomFilterPtr& try { ss_codec.GetIdBloomFilterFormat()->Create(fs_ptr_, file_path, bloom_filter_ptr); } catch (std::exception& er) { - return Status(DB_ERROR, "Create a new bloom filter fail"); + return Status(DB_ERROR, "Create a new bloom filter fail: " + std::string(er.what())); } return Status::OK(); diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp index f60a4fce58..60db8c5aac 100644 --- a/core/unittest/db/test_db.cpp +++ b/core/unittest/db/test_db.cpp @@ -31,7 +31,7 @@ namespace { const char* VECTOR_FIELD_NAME = "vector"; milvus::Status -CreateCollection(std::shared_ptr db, const std::string& collection_name, const LSN_TYPE& lsn) { +CreateCollection(const std::shared_ptr& db, const std::string& collection_name, const LSN_TYPE& lsn) { CreateCollectionContext context; context.lsn = lsn; auto collection_schema = std::make_shared(collection_name); @@ -734,3 +734,153 @@ TEST_F(DBTest, StatsTest) { std::string ss = json_stats.dump(); std::cout << ss << std::endl; } + +TEST_F(DBTest, DeleteEntitiesTest) { + std::string collection_name = "test_collection_delete_"; + CreateCollection2(db_, collection_name, 0); + + auto insert_entities = [&](const std::string& collection, const std::string& partition, + uint64_t count, uint64_t batch_index, milvus::engine::IDNumbers& ids) -> Status { + milvus::engine::DataChunkPtr data_chunk; + BuildEntities(count, batch_index, data_chunk); + STATUS_CHECK(db_->Insert(collection, partition, data_chunk)); + STATUS_CHECK(db_->Flush(collection)); + auto iter = data_chunk->fixed_fields_.find(milvus::engine::DEFAULT_UID_NAME); + if (iter == data_chunk->fixed_fields_.end()) { + return Status(1, "Cannot find uid field"); + } + auto& ids_buffer = iter->second; + ids.resize(data_chunk->count_); + memcpy(ids.data(), ids_buffer->data_.data(), ids_buffer->Size()); + + return Status::OK(); + }; + + milvus::engine::IDNumbers entity_ids; + auto status = insert_entities(collection_name, "", 10000, 0, entity_ids); + ASSERT_TRUE(status.ok()) << status.ToString(); + + milvus::engine::IDNumbers delete_ids = {entity_ids[0]}; + + status = db_->DeleteEntityByID(collection_name, delete_ids); + ASSERT_TRUE(status.ok()) << status.ToString(); + + milvus::engine::IDNumbers whole_delete_ids; + fiu_init(0); + fiu_enable("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0); + for (size_t i = 0; i < 5; i++) { + std::string partition0 = collection_name + "p_" + std::to_string(i) + "_0"; + std::string partition1 = collection_name + "p_" + std::to_string(i) + "_1"; + + status = db_->CreatePartition(collection_name, partition0); + ASSERT_TRUE(status.ok()) << status.ToString(); + + status = db_->CreatePartition(collection_name, partition1); + ASSERT_TRUE(status.ok()) << status.ToString(); + + milvus::engine::IDNumbers partition0_ids; + status = insert_entities(collection_name, partition0, 10000, 2 * i + 1, partition0_ids); + ASSERT_TRUE(status.ok()) << status.ToString(); + + milvus::engine::IDNumbers partition1_ids; + status = insert_entities(collection_name, partition1, 10000, 2 * i + 2, partition1_ids); + ASSERT_TRUE(status.ok()) << status.ToString(); + + milvus::engine::IDNumbers partition_delete_ids = {partition0_ids[0], partition1_ids[0]}; + whole_delete_ids.insert(whole_delete_ids.begin(), partition_delete_ids.begin(), partition_delete_ids.end()); + db_->DeleteEntityByID(collection_name, partition_delete_ids); + ASSERT_TRUE(status.ok()) << status.ToString(); + + status = db_->DropPartition(collection_name, partition1); + ASSERT_TRUE(status.ok()) << status.ToString(); + } + sleep(3); + fiu_disable("MemCollection.ApplyDeletes.RandomSleep"); + + std::vector valid_row; + milvus::engine::DataChunkPtr entity_data_chunk; + for (auto& id : whole_delete_ids) { + status = db_->GetEntityByID(collection_name, {id}, {}, valid_row, entity_data_chunk); + ASSERT_TRUE(status.ok()) << status.ToString(); + ASSERT_EQ(entity_data_chunk->count_, 0); + } +} + +TEST_F(DBTest, DeleteStaleTest) { + auto insert_entities = [&](const std::string& collection, const std::string& partition, + uint64_t count, uint64_t batch_index, milvus::engine::IDNumbers& ids) -> Status { + milvus::engine::DataChunkPtr data_chunk; + BuildEntities(count, batch_index, data_chunk); + STATUS_CHECK(db_->Insert(collection, partition, data_chunk)); + STATUS_CHECK(db_->Flush(collection)); + auto iter = data_chunk->fixed_fields_.find(milvus::engine::DEFAULT_UID_NAME); + if (iter == data_chunk->fixed_fields_.end()) { + return Status(1, "Cannot find uid field"); + } + auto& ids_buffer = iter->second; + ids.resize(data_chunk->count_); + memcpy(ids.data(), ids_buffer->data_.data(), ids_buffer->Size()); + + return Status::OK(); + }; + + auto build_task = [&](const std::string& collection, const std::string& field) { + milvus::engine::CollectionIndex index; + index.index_name_ = "my_index1"; + index.index_type_ = milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT; + index.metric_name_ = milvus::knowhere::Metric::L2; + index.extra_params_["nlist"] = 2048; + auto status = db_->CreateIndex(dummy_context_, collection, field, index); + ASSERT_TRUE(status.ok()) << status.ToString(); + }; + + auto delete_task = [&](const std::string& collection, const milvus::engine::IDNumbers& del_ids) { + auto status = Status::OK(); + for (size_t i = 0; i < 5; i++) { + milvus::engine::IDNumbers ids = {del_ids[2 * i], del_ids[2 * i + 1]}; + status = db_->DeleteEntityByID(collection, ids); + ASSERT_TRUE(status.ok()) << status.ToString(); + sleep(1); + } + }; + + const std::string collection_name = "test_delete_stale_"; + auto status = CreateCollection2(db_, collection_name, 0); + ASSERT_TRUE(status.ok()) << status.ToString(); + milvus::engine::IDNumbers del_ids; + milvus::engine::IDNumbers entity_ids; + status = insert_entities(collection_name, "", 10000, 0, entity_ids); + ASSERT_TRUE(status.ok()) << status.ToString(); + status = db_->Flush(collection_name); + ASSERT_TRUE(status.ok()) << status.ToString(); + + milvus::engine::IDNumbers entity_ids2; + status = insert_entities(collection_name, "", 10000, 1, entity_ids2); + ASSERT_TRUE(status.ok()) << status.ToString(); + status = db_->Flush(collection_name); + ASSERT_TRUE(status.ok()) << status.ToString(); + + for (size_t i = 0; i < 5; i ++) { + del_ids.push_back(entity_ids[i]); + del_ids.push_back(entity_ids2[i]); + } + + fiu_init(0); + fiu_enable("MemCollection.ApplyDeletes.RandomSleep", 1, nullptr, 0); + auto build_thread = std::thread(build_task, collection_name, VECTOR_FIELD_NAME); + auto delete_thread = std::thread(delete_task, collection_name, del_ids); + + build_thread.join(); + delete_thread.join(); +// sleep(15); + fiu_disable("MemCollection.ApplyDeletes.RandomSleep"); +// +// std::vector valid_row; +// milvus::engine::DataChunkPtr entity_data_chunk; +// std::cout << "Get Entity" << std::endl; +// for (size_t j = 0; j < del_ids.size(); j++) { +// status = db_->GetEntityByID(collection_name, {del_ids[j]}, {}, valid_row, entity_data_chunk); +// ASSERT_TRUE(status.ok()) << status.ToString(); +// ASSERT_EQ(entity_data_chunk->count_, 0) << "[" << j << "] Delete id " << del_ids[j] << " failed."; +// } +} diff --git a/tests/milvus_python_test/entity/test_delete.py b/tests/milvus_python_test/entity/test_delete.py index cacbfd4673..48fd09b8d4 100644 --- a/tests/milvus_python_test/entity/test_delete.py +++ b/tests/milvus_python_test/entity/test_delete.py @@ -114,7 +114,6 @@ class TestDeleteBase: status = connect.delete_entity_by_id(collection, delete_ids) assert status - @pytest.mark.level(2) def test_insert_delete_A(self, connect, collection): ''' target: test delete entity @@ -130,7 +129,6 @@ class TestDeleteBase: res_count = connect.count_entities(collection) assert res_count == nb - 1 - @pytest.mark.level(2) def test_insert_delete_B(self, connect, collection): ''' target: test delete entity @@ -147,7 +145,6 @@ class TestDeleteBase: res_count = connect.count_entities(collection) assert res_count == 0 - @pytest.mark.level(2) def test_delete_exceed_limit(self, connect, collection): ''' target: test delete entity @@ -163,7 +160,6 @@ class TestDeleteBase: assert res_count == 0 # TODO - @pytest.mark.level(2) def test_flush_after_delete(self, connect, collection): ''' target: test delete entity @@ -197,11 +193,11 @@ class TestDeleteBase: assert res_count == nb - len(delete_ids) # TODO - @pytest.mark.level(2) def test_insert_same_ids_after_delete(self, connect, collection): ''' method: add entities and delete expected: status DELETED + note: Not flush after delete ''' insert_ids = [i for i in range(nb)] ids = connect.insert(collection, entities, insert_ids) @@ -216,7 +212,6 @@ class TestDeleteBase: assert res_count == nb - 1 # TODO - @pytest.mark.level(2) def test_insert_same_ids_after_delete_binary(self, connect, binary_collection): ''' method: add entities, with the same id and delete the ids @@ -356,7 +351,6 @@ class TestDeleteBase: assert status # TODO: - @pytest.mark.level(2) def test_insert_tags_delete(self, connect, collection): ''' method: add entitys with given two tags, delete entities with the return ids @@ -422,13 +416,11 @@ class TestDeleteInvalid(object): with pytest.raises(Exception) as e: status = connect.delete_entity_by_id(collection, [invalid_id]) - @pytest.mark.level(2) def test_delete_entity_ids_invalid(self, connect, collection, gen_entity_id): invalid_id = gen_entity_id with pytest.raises(Exception) as e: status = connect.delete_entity_by_id(collection, [1, invalid_id]) - @pytest.mark.level(2) def test_delete_entity_with_invalid_collection_name(self, connect, get_collection_name): collection_name = get_collection_name with pytest.raises(Exception) as e: