From 15a07951abcc3ec0f0dc128dc1038ce0938645a0 Mon Sep 17 00:00:00 2001
From: groot
Date: Fri, 7 Aug 2020 14:08:13 +0800
Subject: [PATCH] fix build index crash (#3167)

* fix build index crash

Signed-off-by: yhmo

Co-authored-by: shengjun.li
---
 core/src/scheduler/task/BuildIndexTask.cpp |  5 +-
 core/src/segment/Segment.cpp               | 22 +++-----
 core/unittest/db/test_db.cpp               | 60 +++++++++++++++++++---
 3 files changed, 63 insertions(+), 24 deletions(-)

diff --git a/core/src/scheduler/task/BuildIndexTask.cpp b/core/src/scheduler/task/BuildIndexTask.cpp
index 0311794877..d067c35ed4 100644
--- a/core/src/scheduler/task/BuildIndexTask.cpp
+++ b/core/src/scheduler/task/BuildIndexTask.cpp
@@ -55,8 +55,9 @@ BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) {
         stat = execution_engine_->Load(context);
         type_str = "DISK2CPU";
     } else if (type == LoadType::CPU2GPU) {
-        stat = execution_engine_->CopyToGpu(device_id);
-        type_str = "CPU2GPU:" + std::to_string(device_id);
+        // no need to copy a flat index to gpu
+        // stat = execution_engine_->CopyToGpu(device_id);
+        // type_str = "CPU2GPU:" + std::to_string(device_id);
     } else {
         error_msg = "Wrong load type";
         stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
diff --git a/core/src/segment/Segment.cpp b/core/src/segment/Segment.cpp
index 2d048b255a..29c65ed412 100644
--- a/core/src/segment/Segment.cpp
+++ b/core/src/segment/Segment.cpp
@@ -149,30 +149,20 @@ Segment::DeleteEntity(std::vector<int64_t>& offsets) {
     // sort offsets in descending order
     std::sort(offsets.begin(), offsets.end(), std::greater<int64_t>());
 
-    // delete entity data
+    // delete entity data from max offset to min offset
     for (auto& pair : fixed_fields_) {
         int64_t width = fixed_fields_width_[pair.first];
         if (width == 0 || pair.second == nullptr) {
             continue;
         }
 
-        auto& data = pair.second->data_;
-        std::vector<bool> marked(data.size(), false);
-        std::vector<uint8_t> temp;
-        temp.reserve(data.size() - offsets.size() * width);
-
-        for (auto pos = offsets.begin(); pos != offsets.end(); pos++) {
-            for (auto i = 0; i < width; ++i) {
-                marked[(*pos) * width + i] = true;
+        auto& data = pair.second;
+        for (auto offset : offsets) {
+            if (offset >= 0 && offset < row_count_) {
+                auto step = offset * width;
+                data->data_.erase(data->data_.begin() + step, data->data_.begin() + step + width);
             }
         }
-
-        for (size_t i = 0; i < data.size(); i++) {
-            if (!marked[i]) {
-                temp.push_back(data[i]);
-            }
-        }
-        data = std::move(temp);
     }
 
     // reset row count
diff --git a/core/unittest/db/test_db.cpp b/core/unittest/db/test_db.cpp
index f4351b8062..c569fe9311 100644
--- a/core/unittest/db/test_db.cpp
+++ b/core/unittest/db/test_db.cpp
@@ -544,9 +544,16 @@ TEST_F(DBTest, InsertTest) {
         ASSERT_TRUE(status.ok());
     };
 
+    // create collection with auto-generated id, insert entities with user id, the insert action fails
     do_insert(true, true);
+
+    // create collection with auto-generated id, insert entities without user id, the insert action succeeds
     do_insert(true, false);
+
+    // create collection without auto-generated id, insert entities with user id, the insert action succeeds
     do_insert(false, true);
+
+    // create collection without auto-generated id, insert entities without user id, the insert action fails
     do_insert(false, false);
 }
 
@@ -559,6 +566,7 @@ TEST_F(DBTest, MergeTest) {
     milvus::engine::DataChunkPtr data_chunk;
     BuildEntities(entity_count, 0, data_chunk);
 
+    // insert entities into collection multiple times
    int64_t repeat = 2;
     for (int32_t i = 0; i < repeat; i++) {
         status = db_->Insert(collection_name, "", data_chunk);
@@ -570,13 +578,16 @@ TEST_F(DBTest, MergeTest) {
         ASSERT_TRUE(status.ok());
     }
 
-    sleep(2);  // wait to merge
+    // wait for merge to finish
+    sleep(2);
 
+    // validate entity count
     int64_t row_count = 0;
     status = db_->CountEntities(collection_name, row_count);
     ASSERT_TRUE(status.ok());
     ASSERT_EQ(row_count, entity_count * repeat);
 
+    // validate the segment file count is correct
     ScopedSnapshotT ss;
     status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
     ASSERT_TRUE(status.ok());
@@ -631,6 +642,7 @@ TEST_F(DBTest, CompactTest) {
     auto status = CreateCollection2(db_, collection_name, 0);
     ASSERT_TRUE(status.ok());
 
+    // insert 1000 entities into default partition
     const uint64_t entity_count = 1000;
     milvus::engine::DataChunkPtr data_chunk;
     BuildEntities(entity_count, 0, data_chunk);
@@ -654,6 +666,8 @@ TEST_F(DBTest, CompactTest) {
         status = db_->DeleteEntityByID(collection_name, delete_ids);
         ASSERT_TRUE(status.ok());
     };
+
+    // delete entities from 100 to 300
     int64_t delete_count_1 = 200;
     delete_entity(100, 100 + delete_count_1);
@@ -677,8 +691,11 @@ TEST_F(DBTest, CompactTest) {
             ASSERT_EQ(p[index++], i);
         }
     };
+
+    // validate the remaining data is correct after deletion
     validate_entity_data();
 
+    // delete entities from 700 to 800
     int64_t delete_count_2 = 100;
     delete_entity(700, 700 + delete_count_2);
@@ -703,6 +720,7 @@ TEST_F(DBTest, CompactTest) {
         validate_entity_data();
     };
 
+    // compact the collection; when threshold = 0.001 the compaction does nothing
     validate_compact(0.001);  // compact skip
     validate_compact(0.5);    // do compact
 }
@@ -712,6 +730,7 @@ TEST_F(DBTest, IndexTest) {
     auto status = CreateCollection2(db_, collection_name, 0);
     ASSERT_TRUE(status.ok());
 
+    // insert 10000 entities into default partition
     const uint64_t entity_count = 10000;
     milvus::engine::DataChunkPtr data_chunk;
     BuildEntities(entity_count, 0, data_chunk);
@@ -722,6 +741,7 @@ TEST_F(DBTest, IndexTest) {
     status = db_->Flush();
     ASSERT_TRUE(status.ok());
 
+    // create index for vector field, validate the index
     {
         milvus::engine::CollectionIndex index;
         index.index_name_ = "my_index1";
@@ -740,6 +760,7 @@ TEST_F(DBTest, IndexTest) {
         ASSERT_EQ(index.extra_params_, index_get.extra_params_);
     }
 
+    // create index for structured fields, validate the index
     {
         milvus::engine::CollectionIndex index;
         index.index_name_ = "my_index2";
@@ -758,6 +779,7 @@ TEST_F(DBTest, IndexTest) {
         ASSERT_EQ(index.index_type_, index_get.index_type_);
     }
 
+    // drop index of vector field
     {
         status = db_->DropIndex(collection_name, VECTOR_FIELD_NAME);
         ASSERT_TRUE(status.ok());
@@ -768,6 +790,7 @@ TEST_F(DBTest, IndexTest) {
         ASSERT_TRUE(index_get.index_name_.empty());
     }
 
+    // drop index of structured field 'field_0'
     {
         status = db_->DropIndex(collection_name, "field_0");
         ASSERT_TRUE(status.ok());
@@ -788,6 +811,8 @@ TEST_F(DBTest, StatsTest) {
     status = db_->CreatePartition(collection_name, partition_name);
     ASSERT_TRUE(status.ok());
 
+    // insert 10000 entities into default partition
+    // insert 10000 entities into partition 'p1'
     const uint64_t entity_count = 10000;
     milvus::engine::DataChunkPtr data_chunk;
     BuildEntities(entity_count, 0, data_chunk);
@@ -803,6 +828,7 @@ TEST_F(DBTest, StatsTest) {
     status = db_->Flush();
     ASSERT_TRUE(status.ok());
 
+    // create index for vector field
     {
         milvus::engine::CollectionIndex index;
         index.index_type_ = milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT;
         index.metric_name_ = milvus::knowhere::Metric::L2;
@@ -812,6 +838,7 @@ TEST_F(DBTest, StatsTest) {
         index.extra_params_["nlist"] = 100;
         status = db_->CreateIndex(dummy_context_, collection_name, VECTOR_FIELD_NAME, index);
         ASSERT_TRUE(status.ok());
     }
 
+    // create index for structured fields
     {
         milvus::engine::CollectionIndex index;
         index.index_type_ = milvus::engine::DEFAULT_STRUCTURED_INDEX_NAME;
@@ -823,6 +850,7 @@ TEST_F(DBTest, StatsTest) {
         ASSERT_TRUE(status.ok());
     }
 
+    // validate collection stats
     milvus::json json_stats;
     status = db_->GetCollectionStats(collection_name, json_stats);
@@ -876,6 +904,8 @@ TEST_F(DBTest, FetchTest) {
     status = db_->CreatePartition(collection_name, partition_name);
     ASSERT_TRUE(status.ok());
 
+    // insert 1000 entities into default partition
+    // insert 1000 entities into partition 'p1'
     const uint64_t entity_count = 1000;
     milvus::engine::DataChunkPtr data_chunk;
     BuildEntities(entity_count, 0, data_chunk);
@@ -891,10 +921,12 @@ TEST_F(DBTest, FetchTest) {
     status = db_->Flush();
     ASSERT_TRUE(status.ok());
 
+    // get all the entity ids of partition 'p1'
     milvus::engine::IDNumbers batch_entity_ids;
     milvus::engine::utils::GetIDFromChunk(data_chunk, batch_entity_ids);
     ASSERT_EQ(batch_entity_ids.size(), entity_count);
 
+    // delete the first 10 entities from partition 'p1'
     int64_t delete_count = 10;
     milvus::engine::IDNumbers delete_entity_ids = batch_entity_ids;
     delete_entity_ids.resize(delete_count);
@@ -904,6 +936,7 @@ TEST_F(DBTest, FetchTest) {
     status = db_->Flush();
     ASSERT_TRUE(status.ok());
 
+    // try to fetch the first 10 entities of partition 'p1'; since they were deleted, nothing is returned
     std::vector<std::string> field_names = {milvus::engine::DEFAULT_UID_NAME, VECTOR_FIELD_NAME, "field_0"};
     std::vector<bool> valid_row;
     milvus::engine::DataChunkPtr fetch_chunk;
@@ -914,6 +947,7 @@ TEST_F(DBTest, FetchTest) {
         ASSERT_FALSE(valid);
     }
 
+    // fetch all entities from partition 'p1', 990 entities returned
     fetch_chunk = nullptr;
     status = db_->GetEntityByID(collection_name, batch_entity_ids, field_names, valid_row, fetch_chunk);
     ASSERT_TRUE(status.ok());
@@ -923,6 +957,9 @@ TEST_F(DBTest, FetchTest) {
     auto& vectors = fetch_chunk->fixed_fields_[VECTOR_FIELD_NAME];
     ASSERT_EQ(vectors->Size() / (COLLECTION_DIM * sizeof(float)), fetch_chunk->count_);
 
+    // get collection stats:
+    // the segment of the default partition has 1000 entities,
+    // the segment of partition 'p1' only has 990 entities
     milvus::json json_stats;
     status = db_->GetCollectionStats(collection_name, json_stats);
     ASSERT_TRUE(status.ok());
@@ -968,12 +1005,13 @@ TEST_F(DBTest, DeleteEntitiesTest) {
         return Status::OK();
     };
 
+    // insert 1000 entities into default partition
     milvus::engine::IDNumbers entity_ids;
-    auto status = insert_entities(collection_name, "", 10000, 0, entity_ids);
+    auto status = insert_entities(collection_name, "", 1000, 0, entity_ids);
     ASSERT_TRUE(status.ok()) << status.ToString();
 
+    // delete the first entity
     milvus::engine::IDNumbers delete_ids = {entity_ids[0]};
-
     status = db_->DeleteEntityByID(collection_name, delete_ids);
     ASSERT_TRUE(status.ok()) << status.ToString();
@@ -990,25 +1028,35 @@ TEST_F(DBTest, DeleteEntitiesTest) {
         status = db_->CreatePartition(collection_name, partition1);
         ASSERT_TRUE(status.ok()) << status.ToString();
 
+        // insert 1000 entities into partition_0
         milvus::engine::IDNumbers partition0_ids;
-        status = insert_entities(collection_name, partition0, 10000, 2 * i + 1, partition0_ids);
+        status = insert_entities(collection_name, partition0, 1000, 2 * i + 1, partition0_ids);
         ASSERT_TRUE(status.ok()) << status.ToString();
 
+        // insert 1000 entities into partition_1
         milvus::engine::IDNumbers partition1_ids;
-        status = insert_entities(collection_name, partition1, 10000, 2 * i + 2, partition1_ids);
+        status = insert_entities(collection_name, partition1, 1000, 2 * i + 2, partition1_ids);
        ASSERT_TRUE(status.ok()) << status.ToString();
 
+        // delete the first entity of each partition
         milvus::engine::IDNumbers partition_delete_ids = {partition0_ids[0], partition1_ids[0]};
         whole_delete_ids.insert(whole_delete_ids.begin(), partition_delete_ids.begin(), partition_delete_ids.end());
         db_->DeleteEntityByID(collection_name, partition_delete_ids);
         ASSERT_TRUE(status.ok()) << status.ToString();
 
+        // drop partition_1
         status = db_->DropPartition(collection_name, partition1);
         ASSERT_TRUE(status.ok()) << status.ToString();
     }
 
-    sleep(3);
+
+    // flush will apply the deletion
+    status = db_->Flush();
+    ASSERT_TRUE(status.ok());
+
     fiu_disable("MemCollection.ApplyDeletes.RandomSleep");
 
+    // get the first entity of partition_0: the entity has been deleted, so no entity is returned
+    // get the first entity of partition_1: the partition has been dropped, so no entity is returned
     std::vector<bool> valid_row;
     milvus::engine::DataChunkPtr entity_data_chunk;
     for (auto& id : whole_delete_ids) {
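
The heart of the fix is the rewritten Segment::DeleteEntity: offsets are sorted in descending order and each fixed-width row is erased with an explicit bounds check. The descending order matters because every erase shifts the bytes behind the erased row toward the front of the buffer, so erasing a small offset first would invalidate every larger offset still waiting to be processed. The following standalone sketch (illustrative only, not Milvus code; EraseRows and its parameters are invented for this example) demonstrates the same technique on a plain byte buffer:

    // erase_rows_sketch.cpp -- standalone illustration, not Milvus code.
    #include <algorithm>
    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <vector>

    // Erase fixed-width rows from a contiguous byte buffer.
    // Offsets must be visited from largest to smallest: every erase
    // shifts the bytes behind it toward the front, so erasing a small
    // offset first would invalidate all larger offsets still pending.
    void
    EraseRows(std::vector<uint8_t>& data, int64_t width, std::vector<int64_t> offsets, int64_t row_count) {
        std::sort(offsets.begin(), offsets.end(), std::greater<int64_t>());
        for (int64_t offset : offsets) {
            // mirror of the bounds check in the patched DeleteEntity:
            // silently skip offsets that fall outside the segment
            if (offset >= 0 && offset < row_count) {
                auto step = offset * width;
                data.erase(data.begin() + step, data.begin() + step + width);
            }
        }
    }

    int
    main() {
        // four 2-byte rows: [0,0] [1,1] [2,2] [3,3]
        std::vector<uint8_t> data = {0, 0, 1, 1, 2, 2, 3, 3};
        EraseRows(data, 2, {1, 3}, 4);
        for (auto b : data) {
            std::cout << int(b) << ' ';  // prints: 0 0 2 2
        }
        std::cout << std::endl;
        return 0;
    }

Running the sketch prints "0 0 2 2": rows 1 and 3 are gone and the surviving rows keep their relative order. An ascending pass over the same offsets would erase the wrong bytes, and for the largest offset would step past the end of the shrunken buffer.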
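For contrast, the removed mark-and-copy implementation carried two latent hazards, which is a plausible reading of how an out-of-range delete request could crash the build-index path (the commit message itself only says "fix build index crash"): marked[(*pos) * width + i] writes with no bounds check, and temp.reserve(data.size() - offsets.size() * width) subtracts unsigned values, so a delete set larger than the buffer wraps around to an enormous reservation. A minimal standalone sketch of the unsigned-wrap hazard, with hypothetical sizes:

    // unsigned_wrap_sketch.cpp -- standalone illustration, hypothetical values.
    #include <cstdint>
    #include <iostream>
    #include <vector>

    int
    main() {
        std::vector<uint8_t> data(8);       // 8 bytes of row data
        std::size_t deleted_bytes = 2 * 8;  // two rows of width 8 requested for deletion
        // vector::size() is unsigned, so the subtraction wraps around to a
        // value near 2^64 instead of going negative; passing that result to
        // reserve() would throw std::length_error or abort the process.
        std::cout << data.size() - deleted_bytes << std::endl;
        return 0;
    }

The patched code avoids both hazards by construction: invalid offsets are skipped by the offset >= 0 && offset < row_count_ guard, and no size arithmetic is performed up front.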