From 9e4975bdfa1c2163d4f91e9a0f4d390f08a93667 Mon Sep 17 00:00:00 2001 From: Feilong Hou <77430856+FeilongHou@users.noreply.github.com> Date: Tue, 4 Nov 2025 15:47:32 +0800 Subject: [PATCH] test: added test case for partial update on duplicate pk (#45130) Issue: #45129 : On branch feature/partial-update Changes to be committed: modified: milvus_client/test_milvus_client_partial_update.py modified: milvus_client/test_milvus_client_upsert.py --------- Signed-off-by: Eric Hou Co-authored-by: Eric Hou --- .../test_milvus_client_partial_update.py | 55 ++++ .../test_milvus_client_upsert.py | 273 +----------------- 2 files changed, 56 insertions(+), 272 deletions(-) diff --git a/tests/python_client/milvus_client/test_milvus_client_partial_update.py b/tests/python_client/milvus_client/test_milvus_client_partial_update.py index b8f19fe37d..73e739eb86 100644 --- a/tests/python_client/milvus_client/test_milvus_client_partial_update.py +++ b/tests/python_client/milvus_client/test_milvus_client_partial_update.py @@ -1261,6 +1261,61 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base): self.drop_collection(client, collection_name) + @pytest.mark.tags(CaseLabel.L1) + def test_milvus_client_partial_update_duplicate_pk(self): + """ + target: test that partial update (PU) succeeds on rows with duplicate pk + method: + 1. Create a collection + 2. Insert rows with duplicate pk + 3. 
Upsert the rows with duplicate pk + expected: Step 3 should succeed + """ + # step 1: create collection + client = self._client() + schema = self.create_schema(client, enable_dynamic_field=False)[0] + schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False) + schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim) + schema.add_field(default_int32_field_name, DataType.INT32, nullable=True) + schema.add_field(default_string_field_name, DataType.VARCHAR, nullable=True, max_length=64) + index_params = self.prepare_index_params(client)[0] + index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX") + index_params.add_index(default_vector_field_name, index_type="AUTOINDEX") + collection_name = cf.gen_collection_name_by_testcase_name(module_index=1) + self.create_collection(client, collection_name, default_dim, schema=schema, + consistency_level="Strong", index_params=index_params) + + # step 2: Insert rows with duplicate pk + rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema, skip_field_names=[default_string_field_name]) + self.insert(client, collection_name, rows) + dup_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema, skip_field_names=[default_int32_field_name]) + self.insert(client, collection_name, dup_rows) + + # verify the duplicate pk is inserted and can be queried + for row in dup_rows: + row[default_int32_field_name] = None + res = self.query(client, collection_name, filter=default_search_exp, + check_task=CheckTasks.check_query_results, + check_items={exp_res: dup_rows, + "pk_name": default_primary_key_field_name})[0] + assert len(res) == default_nb + + # step 3: Upsert the rows with duplicate pk + new_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema, + desired_field_names=[default_primary_key_field_name, default_string_field_name]) + + self.upsert(client, collection_name, new_rows, partial_update=True) + for i, row in 
enumerate(dup_rows): + row[default_string_field_name] = new_rows[i][default_string_field_name] + + res = self.query(client, collection_name, filter=default_search_exp, + check_task=CheckTasks.check_query_results, + check_items={exp_res: dup_rows, + "pk_name": default_primary_key_field_name})[0] + + assert len(res) == default_nb + + self.drop_collection(client, collection_name) class TestMilvusClientPartialUpdateInvalid(TestMilvusClientV2Base): """ Test case of partial update interface """ diff --git a/tests/python_client/milvus_client/test_milvus_client_upsert.py b/tests/python_client/milvus_client/test_milvus_client_upsert.py index 7688897f66..8c896740f7 100644 --- a/tests/python_client/milvus_client/test_milvus_client_upsert.py +++ b/tests/python_client/milvus_client/test_milvus_client_upsert.py @@ -550,275 +550,4 @@ class TestMilvusClientUpsertValid(TestMilvusClientV2Base): self.release_partitions(client, collection_name, partition_name) self.drop_partition(client, collection_name, partition_name) if self.has_collection(client, collection_name)[0]: - self.drop_collection(client, collection_name) - - - - """ Test case of partial update interface """ - @pytest.fixture(scope="function", params=[False, True]) - def auto_id(self, request): - yield request.param - - @pytest.fixture(scope="function", params=["COSINE", "L2"]) - def metric_type(self, request): - yield request.param - - """ - ****************************************************************** - # The following are invalid base cases - ****************************************************************** - """ - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_partial_update_new_pk_with_missing_field(self): - """ - target: Test PU will return error when provided new pk and partial field - method: - 1. Create a collection - 2. 
partial upsert a new pk with only partial field - expected: Step 2 should result fail - """ - # step 1: create collection - client = self._client() - schema = self.create_schema(client, enable_dynamic_field=False)[0] - schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False) - schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim) - schema.add_field(default_int32_field_name, DataType.INT32, nullable=True) - index_params = self.prepare_index_params(client)[0] - index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX") - index_params.add_index(default_vector_field_name, index_type="AUTOINDEX") - index_params.add_index(default_int32_field_name, index_type="AUTOINDEX") - collection_name = cf.gen_collection_name_by_testcase_name(module_index=1) - self.create_collection(client, collection_name, default_dim, schema=schema, - consistency_level="Strong", index_params=index_params) - - # step 2: partial upsert a new pk with only partial field - rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema, - desired_field_names=[default_primary_key_field_name, default_int32_field_name]) - error = {ct.err_code: 1100, ct.err_msg: - f"fieldSchema({default_vector_field_name}) has no corresponding fieldData pass in: invalid parameter"} - self.upsert(client, collection_name, rows, partial_update=True, - check_task=CheckTasks.err_res, check_items=error) - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_partial_update_new_field_without_dynamic_field(self): - """ - target: Test PU will return error when provided new field without dynamic field - method: - 1. Create a collection with dynamic field - 2. 
partial upsert a new field - expected: Step 2 should result fail - """ - # step 1: create collection with dynamic field - client = self._client() - schema = self.create_schema(client, enable_dynamic_field=False)[0] - schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False) - schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim) - index_params = self.prepare_index_params(client)[0] - index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX") - index_params.add_index(default_vector_field_name, index_type="AUTOINDEX") - collection_name = cf.gen_collection_name_by_testcase_name(module_index=1) - self.create_collection(client, collection_name, default_dim, schema=schema, - consistency_level="Strong", index_params=index_params) - - # step 2: partial upsert a new field - row = cf.gen_row_data_by_schema(nb=default_nb, schema=schema) - self.upsert(client, collection_name, row, partial_update=True) - - new_row = [{default_primary_key_field_name: i, default_int32_field_name: 99} for i in range(default_nb)] - error = {ct.err_code: 1, - ct.err_msg: f"Attempt to insert an unexpected field `{default_int32_field_name}` to collection without enabling dynamic field"} - self.upsert(client, collection_name, new_row, partial_update=True, check_task=CheckTasks.err_res, check_items=error) - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_partial_update_after_release_collection(self): - """ - target: test basic function of partial update - method: - 1. create collection - 2. insert a full row of data using partial update - 3. partial update data - 4. release collection - 5. 
partial update data - expected: step 5 should fail - """ - # Step 1: create collection - client = self._client() - schema = self.create_schema(client, enable_dynamic_field=False)[0] - schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False) - schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim) - schema.add_field(default_string_field_name, DataType.VARCHAR, max_length=64) - index_params = self.prepare_index_params(client)[0] - index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX") - index_params.add_index(default_vector_field_name, index_type="AUTOINDEX") - index_params.add_index(default_string_field_name, index_type="AUTOINDEX") - collection_name = cf.gen_collection_name_by_testcase_name(module_index=1) - self.create_collection(client, collection_name, default_dim, schema=schema, - consistency_level="Strong", index_params=index_params) - - # Step 2: insert a full row of data using partial update - rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema) - self.upsert(client, collection_name, rows, partial_update=True) - - # Step 3: partial update data - new_row = cf.gen_row_data_by_schema(nb=default_nb, schema=schema, - desired_field_names=[default_primary_key_field_name, default_string_field_name]) - self.upsert(client, collection_name, new_row, partial_update=True) - - # Step 4: release collection - self.release_collection(client, collection_name) - - # Step 5: partial update data - new_row = cf.gen_row_data_by_schema(nb=default_nb, schema=schema, - desired_field_names=[default_primary_key_field_name, default_string_field_name]) - error = {ct.err_code: 101, - ct.err_msg: f"failed to query: collection not loaded"} - self.upsert(client, collection_name, new_row, partial_update=True, - check_task=CheckTasks.err_res, check_items=error) - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L1) - def 
test_milvus_client_partial_update_same_pk_after_delete(self): - """ - target: test PU will fail when provided same pk and partial field - method: - 1. Create a collection with dynamic field - 2. Insert rows - 3. delete the rows - 4. upsert the rows with same pk and partial field - expected: step 4 should fail - """ - # Step 1: create collection - client = self._client() - schema = self.create_schema(client, enable_dynamic_field=False)[0] - schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False) - schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim) - schema.add_field(default_int32_field_name, DataType.INT32) - index_params = self.prepare_index_params(client)[0] - index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX") - index_params.add_index(default_vector_field_name, index_type="AUTOINDEX") - index_params.add_index(default_int32_field_name, index_type="AUTOINDEX") - collection_name = cf.gen_collection_name_by_testcase_name(module_index=1) - self.create_collection(client, collection_name, default_dim, schema=schema, - consistency_level="Strong", index_params=index_params) - - # Step 2: insert rows - rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema) - self.upsert(client, collection_name, rows, partial_update=True) - - # Step 3: delete the rows - result = self.delete(client, collection_name, filter=default_search_exp)[0] - assert result["delete_count"] == default_nb - result = self.query(client, collection_name, filter=default_search_exp, - check_task=CheckTasks.check_nothing)[0] - assert len(result) == 0 - - # Step 4: upsert the rows with same pk and partial field - new_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema, - desired_field_names=[default_primary_key_field_name, default_vector_field_name]) - error = {ct.err_code: 1100, - ct.err_msg: f"fieldSchema({default_int32_field_name}) has no corresponding fieldData pass in: invalid parameter"} - 
self.upsert(client, collection_name, new_rows, partial_update=True, - check_task=CheckTasks.err_res, check_items=error) - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_partial_update_pk_in_wrong_partition(self): - """ - target: test PU will fail when provided pk in wrong partition - method: - 1. Create a collection - 2. Create 2 partitions - 3. Insert rows - 4. upsert the rows with pk in wrong partition - expected: step 4 should fail - """ - # Step 1: create collection - client = self._client() - schema = self.create_schema(client, enable_dynamic_field=False)[0] - schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False) - schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim) - schema.add_field(default_int32_field_name, DataType.INT32) - index_params = self.prepare_index_params(client)[0] - index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX") - index_params.add_index(default_vector_field_name, index_type="AUTOINDEX") - index_params.add_index(default_int32_field_name, index_type="AUTOINDEX") - collection_name = cf.gen_collection_name_by_testcase_name(module_index=1) - self.create_collection(client, collection_name, default_dim, schema=schema, - consistency_level="Strong", index_params=index_params) - - # Step 2: Create 2 partitions - num_of_partitions = 2 - partition_names = [] - for _ in range(num_of_partitions): - partition_name = cf.gen_unique_str("partition") - self.create_partition(client, collection_name, partition_name) - partition_names.append(partition_name) - - # Step 3: Insert rows - rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema) - gap = default_nb // num_of_partitions - for i, partition in enumerate(partition_names): - self.upsert(client, collection_name, rows[i*gap:(i+1)*gap], partition_name=partition, partial_update=True) - - # Step 4: upsert the rows with pk in wrong partition - 
new_rows = cf.gen_row_data_by_schema(nb=gap, schema=schema, - desired_field_names=[default_primary_key_field_name, default_vector_field_name]) - error = {ct.err_code: 1100, - ct.err_msg: f"fieldSchema({default_int32_field_name}) has no corresponding fieldData pass in: invalid parameter"} - self.upsert(client, collection_name, new_rows, partition_name=partition_names[-1], partial_update=True, - check_task=CheckTasks.err_res, check_items=error) - - self.drop_collection(client, collection_name) - - @pytest.mark.tags(CaseLabel.L1) - def test_milvus_client_partial_update_same_pk_multiple_fields(self): - """ - target: Test PU will success and query will success - method: - 1. Create a collection - 2. Insert rows - 3. Upsert the rows with same pk and different field - expected: Step 3 should fail - """ - # step 1: create collection - client = self._client() - schema = self.create_schema(client, enable_dynamic_field=False)[0] - schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False) - schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim) - schema.add_field(default_int32_field_name, DataType.INT32, nullable=True) - index_params = self.prepare_index_params(client)[0] - index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX") - index_params.add_index(default_vector_field_name, index_type="AUTOINDEX") - index_params.add_index(default_int32_field_name, index_type="AUTOINDEX") - collection_name = cf.gen_collection_name_by_testcase_name(module_index=1) - self.create_collection(client, collection_name, default_dim, schema=schema, - consistency_level="Strong", index_params=index_params) - - # step 2: Insert rows - rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema) - self.upsert(client, collection_name, rows, partial_update=True) - - # step 3: Upsert the rows with same pk and different field - new_rows = [] - for i in range(default_nb): - data = {} - if i % 2 == 0: - 
data[default_int32_field_name] = i + 1000 - data[default_primary_key_field_name] = 0 - else: - data[default_vector_field_name] = [random.random() for _ in range(default_dim)] - data[default_primary_key_field_name] = 0 - new_rows.append(data) - - error = {ct.err_code: 1, - ct.err_msg: f"The data fields length is inconsistent. previous length is 2000, current length is 1000"} - self.upsert(client, collection_name, new_rows, partial_update=True, - check_task=CheckTasks.err_res, check_items=error) - - self.drop_collection(client, collection_name) \ No newline at end of file + self.drop_collection(client, collection_name) \ No newline at end of file