diff --git a/tests/python_client/testcases/test_compaction.py b/tests/python_client/testcases/test_compaction.py index 9071510419..52be93449d 100644 --- a/tests/python_client/testcases/test_compaction.py +++ b/tests/python_client/testcases/test_compaction.py @@ -340,6 +340,7 @@ class TestCompactionParams(TestcaseBase): class TestCompactionOperation(TestcaseBase): @pytest.mark.xfail(reason="Issue https://github.com/milvus-io/milvus/issues/15665") + @pytest.mark.xfail(reason="https://github.com/milvus-io/milvus/issues/17625") @pytest.mark.tags(CaseLabel.L3) def test_compact_both_delete_merge(self): """ @@ -359,7 +360,9 @@ class TestCompactionOperation(TestcaseBase): assert collection_w.num_entities == (i + 1) * tmp_nb ids.extend(insert_res.primary_keys) - expr = f'{ct.default_int64_field_name} in {[0, 2 * tmp_nb - 1]}' + # delete_ids = ids[:tmp_nb] + delete_ids = [0, tmp_nb // 2] + expr = f'{ct.default_int64_field_name} in {delete_ids}' collection_w.delete(expr) collection_w.insert(cf.gen_default_dataframe_data(1, start=2 * tmp_nb)) @@ -371,7 +374,7 @@ class TestCompactionOperation(TestcaseBase): collection_w.wait_for_compaction_completed() c_plans = collection_w.get_compaction_plans()[0] - assert len(c_plans.plans) == 2 + # assert len(c_plans.plans) == 2 # todo assert two types compaction plan # search @@ -384,6 +387,74 @@ class TestCompactionOperation(TestcaseBase): check_items={"nq": ct.default_nq, "ids": ids, "limit": ct.default_limit}) + collection_w.query(f"{ct.default_int64_field_name} in {delete_ids}", check_task=CheckTasks.check_query_empty) + + @pytest.mark.tags(CaseLabel.L3) + def test_compact_delete_multi_segments(self): + """ + target: test compact multi delete segments + method: 1.create collection with shard_num=2 + 2.insert data into two segments + 3.delete entities from two segments + 4.compact + 5.load and search + expected: Verify two delta compaction plans + """ + collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix)) + df = cf.gen_default_dataframe_data(2*tmp_nb) + insert_res, _ = collection_w.insert(df) + assert collection_w.num_entities == 2 * tmp_nb + + collection_w.load() + log.debug(self.utility_wrap.get_query_segment_info(collection_w.name)) + + delete_ids = [i for i in range(10)] + expr = f'{ct.default_int64_field_name} in {delete_ids}' + collection_w.delete(expr) + + sleep(ct.compact_retention_duration + 1) + + collection_w.compact() + collection_w.wait_for_compaction_completed() + c_plans = collection_w.get_compaction_plans()[0] + assert len(c_plans.plans) == 2 + for plan in c_plans.plans: + assert len(plan.sources) == 1 + + collection_w.query(f"{ct.default_int64_field_name} in {delete_ids}", check_task=CheckTasks.check_query_empty) + + @pytest.mark.tags(CaseLabel.L2) + def test_compact_merge_multi_shards(self): + """ + target: test compact merge multi shards + method: 1.Create a collection with 2 shards + 2.Insert twice and generate 4 segments + 3.Compact and wait it completed + expected: Verify there are 2 merge type complation plans + """ + collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix)) + for i in range(2): + df = cf.gen_default_dataframe_data(2 * tmp_nb) + insert_res, _ = collection_w.insert(df) + log.debug(collection_w.num_entities) + + collection_w.load() + log.debug(self.utility_wrap.get_query_segment_info(collection_w.name)) + + collection_w.compact() + collection_w.wait_for_compaction_completed() + c_plans = collection_w.get_compaction_plans()[0] + assert len(c_plans.plans) == 2 + targets = [] + for plan in c_plans.plans: + assert len(plan.sources) == 2 + targets.append(plan.target) + + collection_w.release() + collection_w.load() + seg_info, _ = self.utility_wrap.get_query_segment_info(collection_w.name) + for seg in seg_info: + seg.segmentID in targets @pytest.mark.tags(CaseLabel.L1) def test_compact_after_index(self): @@ -533,6 +604,7 @@ class TestCompactionOperation(TestcaseBase): "ids": insert_res.primary_keys[ct.default_nb // 2:], "limit": ct.default_limit} ) + collection_w.query("int64 in [0]", check_task=CheckTasks.check_query_empty) @pytest.mark.tags(CaseLabel.L0) def test_compact_merge_and_search(self): @@ -583,6 +655,7 @@ class TestCompactionOperation(TestcaseBase): collection_w.delete(expr) collection_w.compact() + collection_w.wait_for_compaction_completed() c_plans = collection_w.get_compaction_plans()[0] assert len(c_plans.plans) == 1 @@ -647,8 +720,9 @@ class TestCompactionOperation(TestcaseBase): delete_res, _ = collection_w.delete(expr) log.debug(collection_w.num_entities) - c_plans = collection_w.get_compaction_plans()[0] - assert len(c_plans.plans) == 0 + collection_w.compact() + collection_w.wait_for_compaction_completed() + collection_w.get_compaction_plans() collection_w.load() search_one, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(), @@ -731,7 +805,7 @@ class TestCompactionOperation(TestcaseBase): method: 1.create with shard_num=1 2.insert and flush 3.compact and search - expected: No exception and no compact plans + expected: No exception and compact plans """ # create collection collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1) @@ -739,10 +813,18 @@ class TestCompactionOperation(TestcaseBase): collection_w.insert(df) assert collection_w.num_entities == tmp_nb + collection_w.load() + + seg_before, _ = self.utility_wrap.get_query_segment_info(collection_w.name) + collection_w.compact() collection_w.wait_for_compaction_completed() c_plans, _ = collection_w.get_compaction_plans() assert len(c_plans.plans) == 1 + assert [seg_before[0].segmentID] == c_plans.plans[0].sources + + seg_after, _ = self.utility_wrap.get_query_segment_info(collection_w.name) + assert seg_after[0].segmentID == c_plans.plans[0].target @pytest.mark.tags(CaseLabel.L1) def test_compact_manual_and_auto(self): @@ -762,13 +844,12 @@ class TestCompactionOperation(TestcaseBase): collection_w.compact() collection_w.wait_for_compaction_completed() - collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact, check_items={"segment_num": 2}) + c_plans, _ = collection_w.get_compaction_plans(check_task=CheckTasks.check_merge_compact, check_items={"segment_num": 2}) collection_w.load() - replicas = collection_w.get_replicas()[0] - replica_num = len(replicas.groups) segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0] - assert len(segments_info) == 1*replica_num + assert len(segments_info) == 1 + assert segments_info[0].segmentID == c_plans.plans[0].target @pytest.mark.tags(CaseLabel.L1) def test_compact_merge_multi_segments(self): @@ -952,7 +1033,7 @@ class TestCompactionOperation(TestcaseBase): method: 1.create with shard_num=2 2.insert once and flush (two segments, belonging to two shards) 3.compact and completed - expected: Verify no compact + expected: Verify compact plan sources only one segment """ # insert into two segments with two shard collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=2) @@ -967,6 +1048,8 @@ class TestCompactionOperation(TestcaseBase): # Actually no merged assert len(c_plans.plans) == 2 + for plan in c_plans.plans: + assert len(plan.sources) == 1 @pytest.mark.tags(CaseLabel.L3) def test_compact_delete_cross_shards(self): @@ -1002,7 +1085,7 @@ class TestCompactionOperation(TestcaseBase): 2.create partition and insert, flush 3.insert _default partition and flush 4.compact - expected: Verify no compact + expected: Verify two independent compaction plans """ # create collection and partition collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1) @@ -1021,7 +1104,9 @@ class TestCompactionOperation(TestcaseBase): c_plans = collection_w.get_compaction_plans()[0] # since manual compaction, segment should be compacted any way - assert len(c_plans.plans) != 0 + assert len(c_plans.plans) == 2 + for plan in c_plans.plans: + assert len(plan.sources) == 1 @pytest.mark.tags(CaseLabel.L2) def test_compact_during_insert(self):