From 935b729a0c2af5eaa2e1e479cbf20d58744d8389 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Mon, 16 May 2022 18:39:56 +0800 Subject: [PATCH] Fix multiple deletes not taking effect (#17033) Signed-off-by: xige-16 --- .../core/src/segcore/SegmentSealedImpl.cpp | 2 +- internal/core/unittest/test_c_api.cpp | 241 ++++++++++++++++-- 2 files changed, 220 insertions(+), 23 deletions(-) diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index f02c8d1991..dbcbe6f1e3 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -655,7 +655,7 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, int64_t size, const IdArray* } deleted_record_.timestamps_.set_data_raw(reserved_offset, sort_timestamps.data(), size); deleted_record_.pks_.set_data_raw(reserved_offset, sort_pks.data(), size); - deleted_record_.ack_responder_.AddSegment(reserved_offset, size); + deleted_record_.ack_responder_.AddSegment(reserved_offset, reserved_offset + size); return Status::OK(); } diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 2c9d42a856..9c00fd3931 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -220,7 +220,8 @@ TEST(CApiTest, InsertTest) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(res.error_code == Success); DeleteCollection(c_collection); @@ -246,6 +247,185 @@ TEST(CApiTest, DeleteTest) { DeleteSegment(segment); } +TEST(CApiTest, MultiDeleteGrowingSegment) { + auto collection = NewCollection(get_default_schema_config()); + auto segment = NewSegment(collection, Growing, -1); + auto col = (milvus::segcore::Collection*)collection; + + int N = 10; + auto dataset = DataGen(col->get_schema(), N); + auto insert_data = serialize(dataset.raw_); + + // insert, pks= {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + int64_t offset; + PreInsert(segment, N, &offset); + auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); + assert(res.error_code == Success); + + // delete data pks = {1} + std::vector delete_pks = {1}; + auto ids = std::make_unique(); + ids->mutable_int_id()->mutable_data()->Add(delete_pks.begin(), delete_pks.end()); + auto delete_data = serialize(ids.get()); + std::vector delete_timestamps(1, dataset.timestamps_[N - 1]); + offset = PreDelete(segment, 1); + auto del_res = Delete(segment, offset, 1, delete_data.data(), delete_data.size(), delete_timestamps.data()); + assert(del_res.error_code == Success); + + // retrieve pks = {1} + std::vector retrive_pks = {1}; + auto schema = ((milvus::segcore::Collection*)collection)->get_schema(); + auto plan = std::make_unique(*schema); + auto term_expr = std::make_unique>(FieldId(101), DataType::INT64, retrive_pks); + plan->plan_node_ = std::make_unique(); + plan->plan_node_->predicate_ = std::move(term_expr); + std::vector target_field_ids{FieldId(100), FieldId(101)}; + plan->field_ids_ = target_field_ids; + + CRetrieveResult retrieve_result; + res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + ASSERT_EQ(res.error_code, Success); + auto query_result = std::make_unique(); + auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); + ASSERT_TRUE(suc); + ASSERT_EQ(query_result->ids().int_id().data().size(), 0); + + // retrieve pks = {2} + retrive_pks = {2}; + term_expr = std::make_unique>(FieldId(101), DataType::INT64, retrive_pks); + plan->plan_node_->predicate_ = std::move(term_expr); + res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + ASSERT_EQ(res.error_code, Success); + suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); + ASSERT_TRUE(suc); + ASSERT_EQ(query_result->ids().int_id().data().size(), 1); + + // delete pks = {2} + delete_pks = {2}; + ids = std::make_unique(); + ids->mutable_int_id()->mutable_data()->Add(delete_pks.begin(), delete_pks.end()); + delete_data = serialize(ids.get()); + offset = PreDelete(segment, 1); + del_res = Delete(segment, offset, 1, delete_data.data(), delete_data.size(), delete_timestamps.data()); + assert(del_res.error_code == Success); + + // retrieve pks in {2} + res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + ASSERT_EQ(res.error_code, Success); + suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); + ASSERT_TRUE(suc); + ASSERT_EQ(query_result->ids().int_id().data().size(), 0); + + DeleteRetrievePlan(plan.release()); + DeleteRetrieveResult(&retrieve_result); + + DeleteCollection(collection); + DeleteSegment(segment); +} + +TEST(CApiTest, MultiDeleteSealedSegment) { + auto collection = NewCollection(get_default_schema_config()); + auto segment = NewSegment(collection, Sealed, -1); + auto col = (milvus::segcore::Collection*)collection; + + int N = 10; + auto dataset = DataGen(col->get_schema(), N); + + // load field data + for (auto& [field_id, field_meta] : col->get_schema()->get_fields()) { + auto array = dataset.get_col(field_id); + auto data = serialize(array.get()); + + auto load_info = CLoadFieldDataInfo{field_id.get(), data.data(), data.size(), N}; + + auto res = LoadFieldData(segment, load_info); + assert(res.error_code == Success); + auto count = GetRowCount(segment); + assert(count == N); + } + + // load timestamps + FieldMeta ts_field_meta(FieldName("Timestamp"), TimestampFieldID, DataType::INT64); + auto ts_array = CreateScalarDataArrayFrom(dataset.timestamps_.data(), N, ts_field_meta); + auto ts_data = serialize(ts_array.get()); + auto load_info = CLoadFieldDataInfo{TimestampFieldID.get(), ts_data.data(), ts_data.size(), N}; + auto res = LoadFieldData(segment, load_info); + assert(res.error_code == Success); + auto count = GetRowCount(segment); + assert(count == N); + + // load rowID + FieldMeta row_id_field_meta(FieldName("RowID"), RowFieldID, DataType::INT64); + auto row_id_array = CreateScalarDataArrayFrom(dataset.row_ids_.data(), N, row_id_field_meta); + auto row_id_data = serialize(row_id_array.get()); + load_info = CLoadFieldDataInfo{RowFieldID.get(), row_id_data.data(), row_id_data.size(), N}; + res = LoadFieldData(segment, load_info); + assert(res.error_code == Success); + count = GetRowCount(segment); + assert(count == N); + + // delete data pks = {1} + std::vector delete_pks = {1}; + auto ids = std::make_unique(); + ids->mutable_int_id()->mutable_data()->Add(delete_pks.begin(), delete_pks.end()); + auto delete_data = serialize(ids.get()); + std::vector delete_timestamps(1, dataset.timestamps_[N - 1]); + auto offset = PreDelete(segment, 1); + auto del_res = Delete(segment, offset, 1, delete_data.data(), delete_data.size(), delete_timestamps.data()); + assert(del_res.error_code == Success); + + // retrieve pks = {1} + std::vector retrive_pks = {1}; + auto schema = ((milvus::segcore::Collection*)collection)->get_schema(); + auto plan = std::make_unique(*schema); + auto term_expr = std::make_unique>(FieldId(101), DataType::INT64, retrive_pks); + plan->plan_node_ = std::make_unique(); + plan->plan_node_->predicate_ = std::move(term_expr); + std::vector target_field_ids{FieldId(100), FieldId(101)}; + plan->field_ids_ = target_field_ids; + + CRetrieveResult retrieve_result; + res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + ASSERT_EQ(res.error_code, Success); + auto query_result = std::make_unique(); + auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); + ASSERT_TRUE(suc); + ASSERT_EQ(query_result->ids().int_id().data().size(), 0); + + // retrieve pks = {2} + retrive_pks = {2}; + term_expr = std::make_unique>(FieldId(101), DataType::INT64, retrive_pks); + plan->plan_node_->predicate_ = std::move(term_expr); + res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + ASSERT_EQ(res.error_code, Success); + suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); + ASSERT_TRUE(suc); + ASSERT_EQ(query_result->ids().int_id().data().size(), 1); + + // delete pks = {2} + delete_pks = {2}; + ids = std::make_unique(); + ids->mutable_int_id()->mutable_data()->Add(delete_pks.begin(), delete_pks.end()); + delete_data = serialize(ids.get()); + offset = PreDelete(segment, 1); + del_res = Delete(segment, offset, 1, delete_data.data(), delete_data.size(), delete_timestamps.data()); + assert(del_res.error_code == Success); + + // retrieve pks in {2} + res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + ASSERT_EQ(res.error_code, Success); + suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); + ASSERT_TRUE(suc); + ASSERT_EQ(query_result->ids().int_id().data().size(), 0); + + DeleteRetrievePlan(plan.release()); + DeleteRetrieveResult(&retrieve_result); + + DeleteCollection(collection); + DeleteSegment(segment); +} + TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) { auto collection = NewCollection(get_default_schema_config()); auto segment = NewSegment(collection, Growing, -1); @@ -259,12 +439,14 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) { // first insert, pks= {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} int64_t offset; PreInsert(segment, N, &offset); - auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(res.error_code == Success); // second insert, pks= {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} PreInsert(segment, N, &offset); - res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(res.error_code == Success); // create retrieve plan pks in {1, 2, 3} @@ -409,7 +591,8 @@ TEST(CApiTest, SearchTest) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); ASSERT_EQ(ins_res.error_code, Success); const char* dsl_string = R"( @@ -471,7 +654,8 @@ TEST(CApiTest, SearchTestWithExpr) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); ASSERT_EQ(ins_res.error_code, Success); const char* serialized_expr_plan = R"(vector_anns: < @@ -525,7 +709,8 @@ TEST(CApiTest, RetrieveTestWithExpr) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); ASSERT_EQ(ins_res.error_code, Success); // create retrieve plan "age in [0]" @@ -563,7 +748,8 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(res.error_code == Success); auto memory_usage_size = GetMemoryUsageInBytes(segment); @@ -610,7 +796,8 @@ TEST(CApiTest, GetRowCountTest) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(res.error_code == Success); auto row_count = GetRowCount(segment); @@ -679,7 +866,8 @@ TEST(CApiTest, ReduceRemoveDuplicates) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); const char* dsl_string = R"( @@ -774,7 +962,8 @@ testReduceSearchWithExpr(int N, int topK, int num_queries) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); auto fmt = boost::format(R"(vector_anns: < @@ -965,7 +1154,8 @@ TEST(CApiTest, Indexing_Without_Predicate) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); const char* dsl_string = R"( @@ -1090,7 +1280,8 @@ TEST(CApiTest, Indexing_Expr_Without_Predicate) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); const char* serialized_expr_plan = R"(vector_anns: < @@ -1210,7 +1401,8 @@ TEST(CApiTest, Indexing_With_float_Predicate_Range) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); const char* dsl_string = R"({ @@ -1349,8 +1541,8 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Range) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = - Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), + insert_data.data(), insert_data.size()); assert(ins_res.error_code == Success); } @@ -1501,7 +1693,8 @@ TEST(CApiTest, Indexing_With_float_Predicate_Term) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); const char* dsl_string = R"({ @@ -1637,7 +1830,8 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Term) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); const char* serialized_expr_plan = R"( @@ -1782,7 +1976,8 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Range) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); const char* dsl_string = R"({ @@ -1920,7 +2115,8 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Range) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); const char* serialized_expr_plan = R"(vector_anns: < @@ -2071,7 +2267,8 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Term) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); const char* dsl_string = R"({ @@ -2213,7 +2410,8 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) { PreInsert(segment, N, &offset); auto insert_data = serialize(dataset.raw_); - auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size()); + auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), + insert_data.size()); assert(ins_res.error_code == Success); const char* serialized_expr_plan = R"(vector_anns: < @@ -2844,4 +3042,3 @@ TEST(CApiTest, SealedSegment_search_float_With_Expr_Predicate_Range) { DeleteCollection(collection); DeleteSegment(segment); } -