Fix multiple deletes not taking effect (#17033)

Signed-off-by: xige-16 <xi.ge@zilliz.com>
xige-16 2022-05-16 18:39:56 +08:00 committed by GitHub
parent ffff291018
commit 935b729a0c
2 changed files with 220 additions and 23 deletions


@@ -655,7 +655,7 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, int64_t size, const IdArray*
     }
     deleted_record_.timestamps_.set_data_raw(reserved_offset, sort_timestamps.data(), size);
     deleted_record_.pks_.set_data_raw(reserved_offset, sort_pks.data(), size);
-    deleted_record_.ack_responder_.AddSegment(reserved_offset, size);
+    deleted_record_.ack_responder_.AddSegment(reserved_offset, reserved_offset + size);
     return Status::OK();
 }
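
The line above is the entire functional change; the rest of the diff adds regression tests and re-wraps long Insert(...) calls to the line-length limit. As the fixed call shows, AckResponder::AddSegment expects a half-open offset interval [seg_begin, seg_end), not a (begin, count) pair: passing the batch length `size` as the end offset meant only a delete reserved at offset 0 was ever correctly acknowledged, while every later delete acked a wrong or empty range and silently never took effect. The following toy model is a minimal sketch of that interval semantics, not the real milvus::segcore::AckResponder:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <set>
#include <utility>

// Toy stand-in for the ack responder: records acknowledged half-open
// [begin, end) intervals and advances a contiguous watermark over them.
struct ToyAckResponder {
    std::set<std::pair<int64_t, int64_t>> acked_;
    int64_t watermark_ = 0;

    // NOTE: seg_end is an end offset, not a length.
    void
    AddSegment(int64_t seg_begin, int64_t seg_end) {
        acked_.insert({seg_begin, seg_end});
        // Fold every interval that touches the watermark into it.
        auto it = acked_.begin();
        while (it != acked_.end() && it->first <= watermark_) {
            watermark_ = std::max(watermark_, it->second);
            it = acked_.erase(it);
        }
    }

    int64_t
    GetAck() const {
        return watermark_;
    }
};

int
main() {
    ToyAckResponder ack;
    // First delete: reserved_offset = 0, size = 1.
    // The buggy call AddSegment(0, size) == AddSegment(0, 1) was right by luck.
    ack.AddSegment(0, 0 + 1);
    // Second delete: reserved_offset = 1, size = 1.
    // The buggy call AddSegment(1, size) == AddSegment(1, 1) acks the empty
    // range [1, 1): the delete is written but never acknowledged or applied.
    ack.AddSegment(1, 1 + 1);  // fixed call acknowledges [1, 2)
    assert(ack.GetAck() == 2);
    return 0;
}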


@@ -220,7 +220,8 @@ TEST(CApiTest, InsertTest) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                      insert_data.size());
     assert(res.error_code == Success);
 
     DeleteCollection(c_collection);
@@ -246,6 +247,185 @@ TEST(CApiTest, DeleteTest) {
     DeleteSegment(segment);
 }
 
+TEST(CApiTest, MultiDeleteGrowingSegment) {
+    auto collection = NewCollection(get_default_schema_config());
+    auto segment = NewSegment(collection, Growing, -1);
+    auto col = (milvus::segcore::Collection*)collection;
+
+    int N = 10;
+    auto dataset = DataGen(col->get_schema(), N);
+    auto insert_data = serialize(dataset.raw_);
+
+    // insert, pks= {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+    int64_t offset;
+    PreInsert(segment, N, &offset);
+    auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                      insert_data.size());
+    assert(res.error_code == Success);
+
+    // delete data pks = {1}
+    std::vector<int64_t> delete_pks = {1};
+    auto ids = std::make_unique<IdArray>();
+    ids->mutable_int_id()->mutable_data()->Add(delete_pks.begin(), delete_pks.end());
+    auto delete_data = serialize(ids.get());
+    std::vector<uint64_t> delete_timestamps(1, dataset.timestamps_[N - 1]);
+    offset = PreDelete(segment, 1);
+    auto del_res = Delete(segment, offset, 1, delete_data.data(), delete_data.size(), delete_timestamps.data());
+    assert(del_res.error_code == Success);
+
+    // retrieve pks = {1}
+    std::vector<int64_t> retrive_pks = {1};
+    auto schema = ((milvus::segcore::Collection*)collection)->get_schema();
+    auto plan = std::make_unique<query::RetrievePlan>(*schema);
+    auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(FieldId(101), DataType::INT64, retrive_pks);
+    plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
+    plan->plan_node_->predicate_ = std::move(term_expr);
+    std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
+    plan->field_ids_ = target_field_ids;
+
+    CRetrieveResult retrieve_result;
+    res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
+    ASSERT_EQ(res.error_code, Success);
+    auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
+    auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
+    ASSERT_TRUE(suc);
+    ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
+
+    // retrieve pks = {2}
+    retrive_pks = {2};
+    term_expr = std::make_unique<query::TermExprImpl<int64_t>>(FieldId(101), DataType::INT64, retrive_pks);
+    plan->plan_node_->predicate_ = std::move(term_expr);
+    res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
+    ASSERT_EQ(res.error_code, Success);
+    suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
+    ASSERT_TRUE(suc);
+    ASSERT_EQ(query_result->ids().int_id().data().size(), 1);
+
+    // delete pks = {2}
+    delete_pks = {2};
+    ids = std::make_unique<IdArray>();
+    ids->mutable_int_id()->mutable_data()->Add(delete_pks.begin(), delete_pks.end());
+    delete_data = serialize(ids.get());
+    offset = PreDelete(segment, 1);
+    del_res = Delete(segment, offset, 1, delete_data.data(), delete_data.size(), delete_timestamps.data());
+    assert(del_res.error_code == Success);
+
+    // retrieve pks in {2}
+    res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
+    ASSERT_EQ(res.error_code, Success);
+    suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
+    ASSERT_TRUE(suc);
+    ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
+
+    DeleteRetrievePlan(plan.release());
+    DeleteRetrieveResult(&retrieve_result);
+    DeleteCollection(collection);
+    DeleteSegment(segment);
+}
+
+TEST(CApiTest, MultiDeleteSealedSegment) {
+    auto collection = NewCollection(get_default_schema_config());
+    auto segment = NewSegment(collection, Sealed, -1);
+    auto col = (milvus::segcore::Collection*)collection;
+
+    int N = 10;
+    auto dataset = DataGen(col->get_schema(), N);
+
+    // load field data
+    for (auto& [field_id, field_meta] : col->get_schema()->get_fields()) {
+        auto array = dataset.get_col(field_id);
+        auto data = serialize(array.get());
+        auto load_info = CLoadFieldDataInfo{field_id.get(), data.data(), data.size(), N};
+        auto res = LoadFieldData(segment, load_info);
+        assert(res.error_code == Success);
+        auto count = GetRowCount(segment);
+        assert(count == N);
+    }
+
+    // load timestamps
+    FieldMeta ts_field_meta(FieldName("Timestamp"), TimestampFieldID, DataType::INT64);
+    auto ts_array = CreateScalarDataArrayFrom(dataset.timestamps_.data(), N, ts_field_meta);
+    auto ts_data = serialize(ts_array.get());
+    auto load_info = CLoadFieldDataInfo{TimestampFieldID.get(), ts_data.data(), ts_data.size(), N};
+    auto res = LoadFieldData(segment, load_info);
+    assert(res.error_code == Success);
+    auto count = GetRowCount(segment);
+    assert(count == N);
+
+    // load rowID
+    FieldMeta row_id_field_meta(FieldName("RowID"), RowFieldID, DataType::INT64);
+    auto row_id_array = CreateScalarDataArrayFrom(dataset.row_ids_.data(), N, row_id_field_meta);
+    auto row_id_data = serialize(row_id_array.get());
+    load_info = CLoadFieldDataInfo{RowFieldID.get(), row_id_data.data(), row_id_data.size(), N};
+    res = LoadFieldData(segment, load_info);
+    assert(res.error_code == Success);
+    count = GetRowCount(segment);
+    assert(count == N);
+
+    // delete data pks = {1}
+    std::vector<int64_t> delete_pks = {1};
+    auto ids = std::make_unique<IdArray>();
+    ids->mutable_int_id()->mutable_data()->Add(delete_pks.begin(), delete_pks.end());
+    auto delete_data = serialize(ids.get());
+    std::vector<uint64_t> delete_timestamps(1, dataset.timestamps_[N - 1]);
+    auto offset = PreDelete(segment, 1);
+    auto del_res = Delete(segment, offset, 1, delete_data.data(), delete_data.size(), delete_timestamps.data());
+    assert(del_res.error_code == Success);
+
+    // retrieve pks = {1}
+    std::vector<int64_t> retrive_pks = {1};
+    auto schema = ((milvus::segcore::Collection*)collection)->get_schema();
+    auto plan = std::make_unique<query::RetrievePlan>(*schema);
+    auto term_expr = std::make_unique<query::TermExprImpl<int64_t>>(FieldId(101), DataType::INT64, retrive_pks);
+    plan->plan_node_ = std::make_unique<query::RetrievePlanNode>();
+    plan->plan_node_->predicate_ = std::move(term_expr);
+    std::vector<FieldId> target_field_ids{FieldId(100), FieldId(101)};
+    plan->field_ids_ = target_field_ids;
+
+    CRetrieveResult retrieve_result;
+    res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
+    ASSERT_EQ(res.error_code, Success);
+    auto query_result = std::make_unique<proto::segcore::RetrieveResults>();
+    auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
+    ASSERT_TRUE(suc);
+    ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
+
+    // retrieve pks = {2}
+    retrive_pks = {2};
+    term_expr = std::make_unique<query::TermExprImpl<int64_t>>(FieldId(101), DataType::INT64, retrive_pks);
+    plan->plan_node_->predicate_ = std::move(term_expr);
+    res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
+    ASSERT_EQ(res.error_code, Success);
+    suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
+    ASSERT_TRUE(suc);
+    ASSERT_EQ(query_result->ids().int_id().data().size(), 1);
+
+    // delete pks = {2}
+    delete_pks = {2};
+    ids = std::make_unique<IdArray>();
+    ids->mutable_int_id()->mutable_data()->Add(delete_pks.begin(), delete_pks.end());
+    delete_data = serialize(ids.get());
+    offset = PreDelete(segment, 1);
+    del_res = Delete(segment, offset, 1, delete_data.data(), delete_data.size(), delete_timestamps.data());
+    assert(del_res.error_code == Success);
+
+    // retrieve pks in {2}
+    res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result);
+    ASSERT_EQ(res.error_code, Success);
+    suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size);
+    ASSERT_TRUE(suc);
+    ASSERT_EQ(query_result->ids().int_id().data().size(), 0);
+
+    DeleteRetrievePlan(plan.release());
+    DeleteRetrieveResult(&retrieve_result);
+    DeleteCollection(collection);
+    DeleteSegment(segment);
+}
+
 TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) {
     auto collection = NewCollection(get_default_schema_config());
     auto segment = NewSegment(collection, Growing, -1);
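
The two new tests above drive the regression end to end through the C API on a growing and a sealed segment (the one-line fix earlier in this diff sits in the sealed path, SegmentSealedImpl::Delete): reserve a delete slot with PreDelete, apply it with Delete, then Retrieve at a timestamp at or after the delete to confirm the row is masked. Assuming PreDelete hands out consecutive reserved offsets starting at 0, the ack bookkeeping for the two single-row deletes looks like this (hypothetical trace with the values used in the tests):

// First delete:  PreDelete(segment, 1) -> reserved_offset = 0, size = 1
//   before fix: AddSegment(0, 1)      // acks [0, 1) -- right by accident
//   after fix:  AddSegment(0, 0 + 1)  // acks [0, 1)
// Second delete: PreDelete(segment, 1) -> reserved_offset = 1, size = 1
//   before fix: AddSegment(1, 1)      // empty range [1, 1): delete is lost
//   after fix:  AddSegment(1, 1 + 1)  // acks [1, 2): delete takes effect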
@@ -259,12 +439,14 @@ TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) {
     // first insert, pks= {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
     int64_t offset;
     PreInsert(segment, N, &offset);
-    auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                      insert_data.size());
     assert(res.error_code == Success);
 
     // second insert, pks= {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
     PreInsert(segment, N, &offset);
-    res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                 insert_data.size());
     assert(res.error_code == Success);
 
     // create retrieve plan pks in {1, 2, 3}
@@ -409,7 +591,8 @@ TEST(CApiTest, SearchTest) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     ASSERT_EQ(ins_res.error_code, Success);
 
     const char* dsl_string = R"(
@@ -471,7 +654,8 @@ TEST(CApiTest, SearchTestWithExpr) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     ASSERT_EQ(ins_res.error_code, Success);
 
     const char* serialized_expr_plan = R"(vector_anns: <
@@ -525,7 +709,8 @@ TEST(CApiTest, RetrieveTestWithExpr) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     ASSERT_EQ(ins_res.error_code, Success);
 
     // create retrieve plan "age in [0]"
@@ -563,7 +748,8 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                      insert_data.size());
     assert(res.error_code == Success);
 
     auto memory_usage_size = GetMemoryUsageInBytes(segment);
@@ -610,7 +796,8 @@ TEST(CApiTest, GetRowCountTest) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                      insert_data.size());
     assert(res.error_code == Success);
 
     auto row_count = GetRowCount(segment);
@@ -679,7 +866,8 @@ TEST(CApiTest, ReduceRemoveDuplicates) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     const char* dsl_string = R"(
@@ -774,7 +962,8 @@ testReduceSearchWithExpr(int N, int topK, int num_queries) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     auto fmt = boost::format(R"(vector_anns: <
@@ -965,7 +1154,8 @@ TEST(CApiTest, Indexing_Without_Predicate) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     const char* dsl_string = R"(
@@ -1090,7 +1280,8 @@ TEST(CApiTest, Indexing_Expr_Without_Predicate) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     const char* serialized_expr_plan = R"(vector_anns: <
@@ -1210,7 +1401,8 @@ TEST(CApiTest, Indexing_With_float_Predicate_Range) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     const char* dsl_string = R"({
@@ -1349,8 +1541,8 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Range) {
         PreInsert(segment, N, &offset);
         auto insert_data = serialize(dataset.raw_);
-        auto ins_res =
-            Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+        auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(),
+                              insert_data.data(), insert_data.size());
         assert(ins_res.error_code == Success);
     }
@@ -1501,7 +1693,8 @@ TEST(CApiTest, Indexing_With_float_Predicate_Term) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     const char* dsl_string = R"({
@@ -1637,7 +1830,8 @@ TEST(CApiTest, Indexing_Expr_With_float_Predicate_Term) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     const char* serialized_expr_plan = R"(
@@ -1782,7 +1976,8 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Range) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     const char* dsl_string = R"({
@@ -1920,7 +2115,8 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Range) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     const char* serialized_expr_plan = R"(vector_anns: <
@@ -2071,7 +2267,8 @@ TEST(CApiTest, Indexing_With_binary_Predicate_Term) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     const char* dsl_string = R"({
@@ -2213,7 +2410,8 @@ TEST(CApiTest, Indexing_Expr_With_binary_Predicate_Term) {
     PreInsert(segment, N, &offset);
     auto insert_data = serialize(dataset.raw_);
-    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(), insert_data.size());
+    auto ins_res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.data(),
+                          insert_data.size());
     assert(ins_res.error_code == Success);
 
     const char* serialized_expr_plan = R"(vector_anns: <
@@ -2844,4 +3042,3 @@ TEST(CApiTest, SealedSegment_search_float_With_Expr_Predicate_Range) {
     DeleteCollection(collection);
     DeleteSegment(segment);
 }
-