test: added test case for partial update on duplicate pk (#45130)

Issue: #45129
 test: add a new test case covering partial update on duplicate primary keys
 test: delete the duplicated test cases from test_milvus_client_upsert.py

 On branch feature/partial-update
 Changes to be committed:
	modified:   milvus_client/test_milvus_client_partial_update.py
	modified:   milvus_client/test_milvus_client_upsert.py

---------

Signed-off-by: Eric Hou <eric.hou@zilliz.com>
Co-authored-by: Eric Hou <eric.hou@zilliz.com>
This commit is contained in:
Feilong Hou 2025-11-04 15:47:32 +08:00 committed by GitHub
parent 7193d01808
commit 9e4975bdfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 56 additions and 272 deletions

View File

@ -1261,6 +1261,61 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_duplicate_pk(self):
    """
    target: partial update succeeds when the collection holds duplicate pks
    method:
        1. create a collection
        2. insert two batches sharing the same primary keys
        3. partial-upsert rows for those duplicated pks
    expected: step 3 succeeds and the query reflects the patched values
    """
    # step 1: collection with nullable int32/varchar so skipped fields read back as None
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_int32_field_name, DataType.INT32, nullable=True)
    schema.add_field(default_string_field_name, DataType.VARCHAR, nullable=True, max_length=64)
    index_params = self.prepare_index_params(client)[0]
    for field_name in (default_primary_key_field_name, default_vector_field_name):
        index_params.add_index(field_name, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # step 2: two inserts with identical pks — the second batch omits int32
    first_batch = cf.gen_row_data_by_schema(nb=default_nb, schema=schema,
                                            skip_field_names=[default_string_field_name])
    self.insert(client, collection_name, first_batch)
    second_batch = cf.gen_row_data_by_schema(nb=default_nb, schema=schema,
                                             skip_field_names=[default_int32_field_name])
    self.insert(client, collection_name, second_batch)
    # the later write wins, so int32 (skipped in the second batch) must come back as None
    for row in second_batch:
        row[default_int32_field_name] = None
    res = self.query(client, collection_name, filter=default_search_exp,
                     check_task=CheckTasks.check_query_results,
                     check_items={exp_res: second_batch,
                                  "pk_name": default_primary_key_field_name})[0]
    assert len(res) == default_nb
    # step 3: patch only the varchar field of the duplicated pks
    patch_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema,
                                           desired_field_names=[default_primary_key_field_name,
                                                                default_string_field_name])
    self.upsert(client, collection_name, patch_rows, partial_update=True)
    for expected_row, patch_row in zip(second_batch, patch_rows):
        expected_row[default_string_field_name] = patch_row[default_string_field_name]
    res = self.query(client, collection_name, filter=default_search_exp,
                     check_task=CheckTasks.check_query_results,
                     check_items={exp_res: second_batch,
                                  "pk_name": default_primary_key_field_name})[0]
    assert len(res) == default_nb
    self.drop_collection(client, collection_name)
class TestMilvusClientPartialUpdateInvalid(TestMilvusClientV2Base):
""" Test case of partial update interface """

View File

@ -551,274 +551,3 @@ class TestMilvusClientUpsertValid(TestMilvusClientV2Base):
self.drop_partition(client, collection_name, partition_name)
if self.has_collection(client, collection_name)[0]:
self.drop_collection(client, collection_name)
""" Test case of partial update interface """
@pytest.fixture(scope="function", params=[False, True])
def auto_id(self, request):
    # Parametrized fixture: each consuming test runs twice, once with
    # auto_id=False and once with auto_id=True.
    yield request.param
@pytest.fixture(scope="function", params=["COSINE", "L2"])
def metric_type(self, request):
    # Parametrized fixture: each consuming test runs once per metric type
    # ("COSINE" and "L2").
    yield request.param
"""
******************************************************************
# The following are invalid base cases
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_new_pk_with_missing_field(self):
    """
    target: Test PU will return error when provided new pk and partial field
    method:
        1. Create a collection
        2. partial upsert a new pk with only partial field
    expected: Step 2 should result fail
    """
    # step 1: schema is pk + vector + nullable int32, all AUTOINDEXed
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_int32_field_name, DataType.INT32, nullable=True)
    index_params = self.prepare_index_params(client)[0]
    for field_name in (default_primary_key_field_name, default_vector_field_name,
                       default_int32_field_name):
        index_params.add_index(field_name, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # step 2: rows carry brand-new pks but omit the required vector field,
    # so the partial upsert must be rejected
    partial_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema,
                                             desired_field_names=[default_primary_key_field_name,
                                                                  default_int32_field_name])
    expected_error = {ct.err_code: 1100, ct.err_msg:
                      f"fieldSchema({default_vector_field_name}) has no corresponding fieldData pass in: invalid parameter"}
    self.upsert(client, collection_name, partial_rows, partial_update=True,
                check_task=CheckTasks.err_res, check_items=expected_error)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_new_field_without_dynamic_field(self):
    """
    target: Test PU will return error when provided new field without dynamic field
    method:
        1. Create a collection without dynamic field
        2. partial upsert a field the schema does not declare
    expected: Step 2 should result fail
    """
    # step 1: dynamic field disabled — schema declares only pk and vector
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    index_params = self.prepare_index_params(client)[0]
    for field_name in (default_primary_key_field_name, default_vector_field_name):
        index_params.add_index(field_name, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # step 2: seed full rows, then try to patch an undeclared int32 field
    seed_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    self.upsert(client, collection_name, seed_rows, partial_update=True)
    unknown_field_rows = [{default_primary_key_field_name: pk, default_int32_field_name: 99}
                          for pk in range(default_nb)]
    expected_error = {ct.err_code: 1,
                      ct.err_msg: f"Attempt to insert an unexpected field `{default_int32_field_name}` to collection without enabling dynamic field"}
    self.upsert(client, collection_name, unknown_field_rows, partial_update=True,
                check_task=CheckTasks.err_res, check_items=expected_error)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_after_release_collection(self):
    """
    target: test partial update fails once the collection has been released
    method:
        1. create collection
        2. insert full rows of data using partial update
        3. partial update data
        4. release collection
        5. partial update data again
    expected: step 5 should fail with "collection not loaded"
    """
    # Step 1: create collection
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_string_field_name, DataType.VARCHAR, max_length=64)
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
    index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
    index_params.add_index(default_string_field_name, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # Step 2: insert full rows via partial update (acts as a plain upsert)
    rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    self.upsert(client, collection_name, rows, partial_update=True)
    # Step 3: partial update succeeds while the collection is loaded
    new_row = cf.gen_row_data_by_schema(nb=default_nb, schema=schema,
                                        desired_field_names=[default_primary_key_field_name,
                                                             default_string_field_name])
    self.upsert(client, collection_name, new_row, partial_update=True)
    # Step 4: release collection
    self.release_collection(client, collection_name)
    # Step 5: the same partial update must now fail — the server has to query
    # the existing rows to fill the omitted fields, which requires the
    # collection to be loaded. (Reuse new_row: the original regenerated an
    # equivalent batch here, which was redundant.)
    error = {ct.err_code: 101,
             ct.err_msg: "failed to query: collection not loaded"}
    self.upsert(client, collection_name, new_row, partial_update=True,
                check_task=CheckTasks.err_res, check_items=error)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_same_pk_after_delete(self):
    """
    target: test PU will fail when provided same pk and partial field
    method:
        1. Create a collection
        2. Insert rows
        3. delete the rows
        4. upsert the rows with same pk and partial field
    expected: step 4 should fail
    """
    # Step 1: create collection — int32 is NOT nullable, so it cannot be
    # omitted when a row is effectively a fresh insert
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_int32_field_name, DataType.INT32)
    index_params = self.prepare_index_params(client)[0]
    for field_name in (default_primary_key_field_name, default_vector_field_name,
                       default_int32_field_name):
        index_params.add_index(field_name, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # Step 2: seed the collection with full rows
    seeded_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    self.upsert(client, collection_name, seeded_rows, partial_update=True)
    # Step 3: delete everything and confirm the collection is empty
    delete_res = self.delete(client, collection_name, filter=default_search_exp)[0]
    assert delete_res["delete_count"] == default_nb
    query_res = self.query(client, collection_name, filter=default_search_exp,
                           check_task=CheckTasks.check_nothing)[0]
    assert len(query_res) == 0
    # Step 4: the pks no longer exist, so each partial row is treated as a new
    # insert and must fail on the missing non-nullable int32 field
    partial_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema,
                                             desired_field_names=[default_primary_key_field_name,
                                                                  default_vector_field_name])
    expected_error = {ct.err_code: 1100,
                      ct.err_msg: f"fieldSchema({default_int32_field_name}) has no corresponding fieldData pass in: invalid parameter"}
    self.upsert(client, collection_name, partial_rows, partial_update=True,
                check_task=CheckTasks.err_res, check_items=expected_error)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_pk_in_wrong_partition(self):
    """
    target: test PU will fail when provided pk in wrong partition
    method:
        1. Create a collection
        2. Create 2 partitions
        3. Insert rows
        4. upsert the rows with pk in wrong partition
    expected: step 4 should fail
    """
    # Step 1: create collection
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_int32_field_name, DataType.INT32)
    index_params = self.prepare_index_params(client)[0]
    for field_name in (default_primary_key_field_name, default_vector_field_name,
                       default_int32_field_name):
        index_params.add_index(field_name, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # Step 2: create two custom partitions
    num_of_partitions = 2
    partition_names = []
    for _ in range(num_of_partitions):
        partition_name = cf.gen_unique_str("partition")
        self.create_partition(client, collection_name, partition_name)
        partition_names.append(partition_name)
    # Step 3: split the rows evenly, one contiguous slice per partition
    rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    gap = default_nb // num_of_partitions
    for idx, partition in enumerate(partition_names):
        chunk = rows[idx * gap:(idx + 1) * gap]
        self.upsert(client, collection_name, chunk, partition_name=partition, partial_update=True)
    # Step 4: target the last partition with pks that (presumably) live in the
    # first one — unknown pks there make the partial rows act as inserts, which
    # fail on the missing int32 field
    partial_rows = cf.gen_row_data_by_schema(nb=gap, schema=schema,
                                             desired_field_names=[default_primary_key_field_name,
                                                                  default_vector_field_name])
    expected_error = {ct.err_code: 1100,
                      ct.err_msg: f"fieldSchema({default_int32_field_name}) has no corresponding fieldData pass in: invalid parameter"}
    self.upsert(client, collection_name, partial_rows, partition_name=partition_names[-1],
                partial_update=True, check_task=CheckTasks.err_res, check_items=expected_error)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_same_pk_multiple_fields(self):
    """
    target: Test PU will fail when rows sharing one pk provide inconsistent field sets
    method:
        1. Create a collection
        2. Insert rows
        3. Upsert rows that all reuse pk 0 but alternate which fields they carry
    expected: Step 3 should fail
    """
    # step 1: create collection
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_int32_field_name, DataType.INT32, nullable=True)
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
    index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
    index_params.add_index(default_int32_field_name, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # step 2: seed the collection with full rows
    rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    self.upsert(client, collection_name, rows, partial_update=True)
    # step 3: every row targets pk 0, but even rows supply int32 while odd rows
    # supply the vector — the batch's field sets are inconsistent, so the
    # upsert must be rejected
    new_rows = []
    for i in range(default_nb):
        data = {default_primary_key_field_name: 0}
        if i % 2 == 0:
            data[default_int32_field_name] = i + 1000
        else:
            data[default_vector_field_name] = [random.random() for _ in range(default_dim)]
        new_rows.append(data)
    error = {ct.err_code: 1,
             ct.err_msg: "The data fields length is inconsistent. previous length is 2000, current length is 1000"}
    self.upsert(client, collection_name, new_rows, partial_update=True,
                check_task=CheckTasks.err_res, check_items=error)
    self.drop_collection(client, collection_name)