diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index 0ea0ed9ca2..7238e5fc10 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -58,15 +58,29 @@ class DeletedRecord { int64_t removed_num = 0; int64_t mem_add = 0; for (size_t i = 0; i < pks.size(); ++i) { - auto offsets = insert_record_->search_pk(pks[i], timestamps[i]); + auto delete_pk = pks[i]; + auto delete_timestamp = timestamps[i]; + auto offsets = + insert_record_->search_pk(delete_pk, delete_timestamp); + bool has_duplicate_pk_timestamps = false; for (auto offset : offsets) { - int64_t insert_row_offset = offset.get(); - // Assert(insert_record->timestamps_.size() >= insert_row_offset); - if (insert_record_->timestamps_[insert_row_offset] < - timestamps[i]) { - InsertIntoInnerPairs(timestamps[i], {insert_row_offset}); + int64_t row_offset = offset.get(); + auto row_timestamp = insert_record_->timestamps_[row_offset]; + // Assert(insert_record->timestamps_.size() >= row_offset); + if (row_timestamp < delete_timestamp) { + InsertIntoInnerPairs(delete_timestamp, {row_offset}); removed_num++; mem_add += sizeof(Timestamp) + sizeof(int64_t); + } else if (row_timestamp == delete_timestamp) { + // if the insert record has multiple identical (pk, timestamp) pairs, + // remove every pair after the first one and keep only the first + if (!has_duplicate_pk_timestamps) { + has_duplicate_pk_timestamps = true; + } else { + InsertIntoInnerPairs(delete_timestamp, {row_offset}); + removed_num++; + mem_add += sizeof(Timestamp) + sizeof(int64_t); + } } } } @@ -84,15 +98,24 @@ class DeletedRecord { auto end = deleted_pairs_.lower_bound( std::make_pair(timestamp, std::set{})); for (auto it = deleted_pairs_.begin(); it != end; it++) { + // this may happen if the lower_bound result is deleted_pairs_.end() and + // other threads insert nodes into deleted_pairs_ concurrently + if (it->first > timestamp) { + break; + } for (auto& v : it->second) { - bitset.set(v); + 
if (v < insert_barrier) { + bitset.set(v); + } } } // handle the case where end points to an element with the same timestamp if (end != deleted_pairs_.end() && end->first == timestamp) { for (auto& v : end->second) { - bitset.set(v); + if (v < insert_barrier) { + bitset.set(v); + } } } } diff --git a/internal/core/unittest/test_array_expr.cpp b/internal/core/unittest/test_array_expr.cpp index 02f15bac7b..ec503d8952 100644 --- a/internal/core/unittest/test_array_expr.cpp +++ b/internal/core/unittest/test_array_expr.cpp @@ -701,7 +701,7 @@ TEST(Expr, TestArrayEqual) { std::vector long_array_col; int num_iters = 1; for (int iter = 0; iter < num_iters; ++iter) { - auto raw_data = DataGen(schema, N, iter, 0, 1, 3); + auto raw_data = DataGen(schema, N, iter, 0, 0, 1, 3); auto new_long_array_col = raw_data.get_col(long_array_fid); long_array_col.insert(long_array_col.end(), new_long_array_col.begin(), diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index cb0d5bfe87..d58150e28f 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -936,7 +936,7 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) { auto col = (milvus::segcore::Collection*)collection; int N = 20; - auto dataset = DataGen(col->get_schema(), N, 42, 0, 2); + auto dataset = DataGen(col->get_schema(), N, 42, 0, 0, 2); auto segment_interface = reinterpret_cast(segment); auto sealed_segment = dynamic_cast(segment_interface); @@ -1156,7 +1156,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) { // second insert data // insert data with pks = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} , timestamps = {10, 11, 12, 13, 14, 15, 16, 17, 18, 19} - dataset = DataGen(col->get_schema(), N, 42, N); + dataset = DataGen(col->get_schema(), N, 42, 0, N); insert_data = serialize(dataset.raw_); PreInsert(segment, N, &offset); res = Insert(segment, @@ -1194,7 +1194,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) { auto col = 
(milvus::segcore::Collection*)collection; int N = 10; - auto dataset = DataGen(col->get_schema(), N, 42, 0, 2); + auto dataset = DataGen(col->get_schema(), N, 42, 0, 0, 2); // insert data with pks = {0, 0, 1, 1, 2, 2, 3, 3, 4, 4} , timestamps = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} auto segment_interface = reinterpret_cast(segment); diff --git a/internal/core/unittest/test_c_stream_reduce.cpp b/internal/core/unittest/test_c_stream_reduce.cpp index 8573e6771a..9689503b64 100644 --- a/internal/core/unittest/test_c_stream_reduce.cpp +++ b/internal/core/unittest/test_c_stream_reduce.cpp @@ -29,7 +29,7 @@ TEST(CApiTest, StreamReduce) { //2. insert data into segments auto schema = ((milvus::segcore::Collection*)collection)->get_schema(); - auto dataset_1 = DataGen(schema, N, 55, 0, 1, 10, true); + auto dataset_1 = DataGen(schema, N, 55, 0, 0, 1, 10, true); int64_t offset_1; PreInsert(segment_1, N, &offset_1); auto insert_data_1 = serialize(dataset_1.raw_); @@ -42,7 +42,7 @@ TEST(CApiTest, StreamReduce) { insert_data_1.size()); ASSERT_EQ(ins_res_1.error_code, Success); - auto dataset_2 = DataGen(schema, N, 66, 0, 1, 10, true); + auto dataset_2 = DataGen(schema, N, 66, 0, 0, 1, 10, true); int64_t offset_2; PreInsert(segment_2, N, &offset_2); auto insert_data_2 = serialize(dataset_2.raw_); diff --git a/internal/core/unittest/test_group_by.cpp b/internal/core/unittest/test_group_by.cpp index 9b9fd5fa1c..ad71679110 100644 --- a/internal/core/unittest/test_group_by.cpp +++ b/internal/core/unittest/test_group_by.cpp @@ -95,7 +95,7 @@ TEST(GroupBY, SealedIndex) { size_t N = 50; //2. load raw data - auto raw_data = DataGen(schema, N, 42, 0, 8, 10, false, false); + auto raw_data = DataGen(schema, N, 42, 0, 0, 8, 10, false, false); auto fields = schema->get_fields(); for (auto field_data : raw_data.raw_->fields_data()) { int64_t field_id = field_data.field_id(); @@ -447,7 +447,7 @@ TEST(GroupBY, SealedData) { size_t N = 100; //2. 
load raw data - auto raw_data = DataGen(schema, N, 42, 0, 8, 10, false, false); + auto raw_data = DataGen(schema, N, 42, 0, 0, 8, 10, false, false); auto fields = schema->get_fields(); for (auto field_data : raw_data.raw_->fields_data()) { int64_t field_id = field_data.field_id(); @@ -542,9 +542,9 @@ TEST(GroupBY, Reduce) { int repeat_count_1 = 2; int repeat_count_2 = 5; auto raw_data1 = - DataGen(schema, N, seed, ts_offset, repeat_count_1, false, false); + DataGen(schema, N, seed, 0, ts_offset, repeat_count_1, false, false); auto raw_data2 = - DataGen(schema, N, seed, ts_offset, repeat_count_2, false, false); + DataGen(schema, N, seed, 0, ts_offset, repeat_count_2, false, false); auto fields = schema->get_fields(); //load segment1 raw data @@ -676,7 +676,7 @@ TEST(GroupBY, GrowingRawData) { int n_batch = 3; for (int i = 0; i < n_batch; i++) { auto data_set = - DataGen(schema, rows_per_batch, 42, 0, 8, 10, false, false); + DataGen(schema, rows_per_batch, 42, 0, 0, 8, 10, false, false); auto offset = segment_growing_impl->PreInsert(rows_per_batch); segment_growing_impl->Insert(offset, rows_per_batch, @@ -774,9 +774,9 @@ TEST(GroupBY, GrowingIndex) { int64_t rows_per_batch = 1024; int n_batch = 10; for (int i = 0; i < n_batch; i++) { - auto data_set = - DataGen(schema, rows_per_batch, 42, 0, 8, 10, false, false); auto offset = segment_growing_impl->PreInsert(rows_per_batch); + auto data_set = DataGen( + schema, rows_per_batch, 42, offset, offset, 1, 10, false, false); segment_growing_impl->Insert(offset, rows_per_batch, data_set.row_ids_.data(), diff --git a/internal/core/unittest/test_growing.cpp b/internal/core/unittest/test_growing.cpp index bb17265f98..77a75f7899 100644 --- a/internal/core/unittest/test_growing.cpp +++ b/internal/core/unittest/test_growing.cpp @@ -59,7 +59,7 @@ TEST(Growing, RemoveDuplicatedRecords) { int64_t c = 1000; auto offset = 0; - auto dataset = DataGen(schema, c, 42, 0, 1, 10, true); + auto dataset = DataGen(schema, c, 42, 0, 0, 1, 10, 
true); auto pks = dataset.get_col(pk); segment->Insert(offset, c, @@ -109,6 +109,34 @@ TEST(Growing, RemoveDuplicatedRecords) { } } +TEST(Growing, RealCountWithDuplicateRecords) { + auto schema = std::make_shared(); + auto pk = schema->AddDebugField("pk", DataType::INT64); + schema->set_primary_field_id(pk); + auto segment = CreateGrowingSegment(schema, empty_index_meta); + + int64_t c = 10; + auto offset = 0; + auto dataset = DataGen(schema, c); + auto pks = dataset.get_col(pk); + + // insert same values twice + segment->Insert(offset, + c, + dataset.row_ids_.data(), + dataset.timestamps_.data(), + dataset.raw_); + + segment->Insert(offset + c, + c, + dataset.row_ids_.data(), + dataset.timestamps_.data(), + dataset.raw_); + + // real count is still c not 2c + ASSERT_EQ(c, segment->get_real_count()); +} + TEST(Growing, RealCount) { auto schema = std::make_shared(); auto pk = schema->AddDebugField("pk", DataType::INT64); diff --git a/internal/core/unittest/test_retrieve.cpp b/internal/core/unittest/test_retrieve.cpp index 840e033457..31846efbbf 100644 --- a/internal/core/unittest/test_retrieve.cpp +++ b/internal/core/unittest/test_retrieve.cpp @@ -383,7 +383,7 @@ TEST_P(RetrieveTest, LargeTimestamp) { int choose_sep = 3; auto choose = [=](int i) { return i * choose_sep % N; }; uint64_t ts_offset = 100; - auto dataset = DataGen(schema, N, 42, ts_offset + 1); + auto dataset = DataGen(schema, N, 42, 0, ts_offset + 1); auto segment = CreateSealedSegment(schema); SealedLoadFieldData(dataset, *segment); auto i64_col = dataset.get_col(fid_64); diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index 842529a492..a54841394b 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -248,6 +248,7 @@ struct GeneratedData { DataGen(SchemaPtr schema, int64_t N, uint64_t seed, + uint64_t pk_offset, uint64_t ts_offset, int repeat_count, int array_len, @@ -317,14 +318,16 @@ 
GenerateRandomSparseFloatVector(size_t rows, return tensor; } -inline GeneratedData DataGen(SchemaPtr schema, - int64_t N, - uint64_t seed = 42, - uint64_t ts_offset = 0, - int repeat_count = 1, - int array_len = 10, - bool random_pk = false, - bool random_val = true) { +inline GeneratedData +DataGen(SchemaPtr schema, + int64_t N, + uint64_t seed = 42, + uint64_t pk_offset = 0, + uint64_t ts_offset = 0, + int repeat_count = 1, + int array_len = 10, + bool random_pk = false, + bool random_val = true) { using std::vector; std::default_random_engine random(seed); std::normal_distribution<> distr(0, 1); @@ -425,9 +428,11 @@ inline GeneratedData DataGen(SchemaPtr schema, case DataType::INT64: { vector data(N); for (int i = 0; i < N; i++) { - if (random_pk && schema->get_primary_field_id()->get() == - field_id.get()) { - data[i] = random() % N; + if (schema->get_primary_field_id()->get() == + field_id.get()) { + data[i] = random_pk + ? random() % N + pk_offset + : i / repeat_count + pk_offset; } else { data[i] = i / repeat_count; } diff --git a/tests/python_client/testcases/test_query.py b/tests/python_client/testcases/test_query.py index f2af136d4a..ede1c7c34c 100644 --- a/tests/python_client/testcases/test_query.py +++ b/tests/python_client/testcases/test_query.py @@ -2996,7 +2996,7 @@ class TestQueryCount(TestcaseBase): # query count collection_w.query(expr=default_expr, output_fields=[ct.default_count_output], check_task=CheckTasks.check_query_results, - check_items={exp_res: [{count: tmp_nb}]} + check_items={exp_res: [{count: 1}]} ) # delete and verify count