milvus/tests/python_client/milvus_client/test_milvus_client_upsert.py
yanliang567 bb4446e5af
test:[cp 2.6] add test for allow insert auto id (#44837)
related issue: https://github.com/milvus-io/milvus/issues/44425
pr: #44801

split insert.py into a few files: upsert.py, insert.py,
partial_upsert.py ...
add test for allow insert auto id

---------

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>
2025-10-15 11:38:00 +08:00

824 lines
43 KiB
Python

import pytest
import numpy as np
from base.client_v2_base import TestMilvusClientV2Base
from utils.util_log import test_log as log
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from utils.util_pymilvus import *
# --- Module-level defaults shared by the test cases in this file ---
# Prefix handed to cf.gen_unique_str() when generating partition/collection names.
prefix = "client_insert"
# Aliases of shared values defined in common.common_type.
epsilon = ct.epsilon
default_nb = ct.default_nb
default_nb_medium = ct.default_nb_medium
default_nq = ct.default_nq
default_dim = ct.default_dim
default_limit = ct.default_limit
# Filter expression used by the query/delete calls below.
default_search_exp = "id >= 0"
# Key under which the expected rows are passed to the query-result checker.
exp_res = "exp_res"
# Additional filter expressions.
# NOTE(review): the spellings "invaild" and "perfix" are kept as-is because these are
# module-level names that other code may reference — confirm before renaming.
default_search_string_exp = "varchar >= \"0\""
default_search_mix_exp = "int64 >= 0 && varchar >= \"0\""
default_invaild_string_exp = "varchar >= 0"
default_json_search_exp = "json_field[\"number\"] >= 0"
perfix_expr = 'varchar like "0%"'
default_search_field = ct.default_float_vec_field_name
default_search_params = ct.default_search_params
# Field names used when building schemas and row dicts in the tests.
default_primary_key_field_name = "id"
default_vector_field_name = "vector"
default_dynamic_field_name = "field_new"
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
default_int32_array_field_name = ct.default_int32_array_field_name
default_string_array_field_name = ct.default_string_array_field_name
default_int32_field_name = ct.default_int32_field_name
default_int32_value = ct.default_int32_value
class TestMilvusClientUpsertInvalid(TestMilvusClientV2Base):
""" Test cases for upsert with invalid arguments (each case expects an error) """
@pytest.fixture(scope="function", params=[False, True])
def auto_id(self, request):
    """Parametrized fixture: yields each configured auto_id value (False, True)."""
    current_value = request.param
    yield current_value
@pytest.fixture(scope="function", params=["COSINE", "L2"])
def metric_type(self, request):
    """Parametrized fixture: yields each configured metric type ("COSINE", "L2")."""
    current_value = request.param
    yield current_value
"""
******************************************************************
# The following are invalid base cases
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_upsert_column_data(self):
    """
    target: test upsert with column-based data
    method: create a collection, then upsert data shaped as a list of columns
            ([ids, vectors]) instead of a list of row dicts
    expected: raise error
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection
    self.create_collection(client, collection_name, default_dim)
    # 2. upsert column-based data
    # Use the explicitly imported numpy RNG (same seed as the sibling tests) rather
    # than the `random` module, which this file never imports directly.
    rng = np.random.default_rng(seed=19530)
    vectors = rng.random((default_nb, default_dim)).tolist()
    data = [list(range(default_nb)), vectors]
    error = {ct.err_code: 999,
             ct.err_msg: "The Input data type is inconsistent with defined schema, please check it."}
    self.upsert(client, collection_name, data,
                check_task=CheckTasks.err_res, check_items=error)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_empty_collection_name(self):
    """
    target: test upsert with an empty collection name
    method: upsert well-formed rows while passing "" as the collection name
    expected: Raise exception
    """
    client = self._client()
    collection_name = ""
    rng = np.random.default_rng(seed=19530)
    rows = []
    for pk in range(default_nb):
        rows.append({
            default_primary_key_field_name: pk,
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: pk * 1.0,
            default_string_field_name: str(pk),
        })
    expected_error = {
        ct.err_code: 1,
        ct.err_msg: f"`collection_name` value {collection_name} is illegal",
    }
    self.upsert(client, collection_name, rows,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("collection_name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
def test_milvus_client_upsert_invalid_collection_name(self, collection_name):
    """
    target: test upsert with an illegal collection name
    method: upsert well-formed rows using each parametrized invalid collection name
    expected: Raise exception
    """
    client = self._client()
    rng = np.random.default_rng(seed=19530)

    def build_row(pk):
        # One row per primary key; the vector is drawn from the seeded generator.
        return {default_primary_key_field_name: pk,
                default_vector_field_name: list(rng.random((1, default_dim))[0]),
                default_float_field_name: pk * 1.0,
                default_string_field_name: str(pk)}

    rows = [build_row(pk) for pk in range(default_nb)]
    message = (f"Invalid collection name: {collection_name}. "
               f"the first character of a collection name must be an underscore or letter: "
               f"invalid parameter")
    expected_error = {ct.err_code: 1100, ct.err_msg: message}
    self.upsert(client, collection_name, rows,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_collection_name_over_max_length(self):
    """
    target: test upsert with a collection name longer than the allowed maximum
    method: upsert well-formed rows using a 256-character collection name
    expected: Raise exception
    """
    client = self._client()
    # 256 characters is the first length past the 255-character limit named in the
    # expected error. The previous `"a".join("a" for i in range(256))` produced
    # 511 characters, which hid the boundary being tested.
    collection_name = "a" * 256
    rng = np.random.default_rng(seed=19530)
    rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
             default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)]
    error = {ct.err_code: 1100, ct.err_msg: "the length of a collection name must be less than 255 characters"}
    self.upsert(client, collection_name, rows,
                check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_not_exist_collection_name(self):
    """
    target: test upsert into a collection that was never created
    method: upsert well-formed rows using a freshly generated, unused collection name
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_unique_str("insert_not_exist")
    rng = np.random.default_rng(seed=19530)
    rows = []
    for pk in range(default_nb):
        vector = list(rng.random((1, default_dim))[0])
        rows.append({default_primary_key_field_name: pk,
                     default_vector_field_name: vector,
                     default_float_field_name: pk * 1.0,
                     default_string_field_name: str(pk)})
    not_found_msg = f"can't find collection[database=default][collection={collection_name}]"
    expected_error = {ct.err_code: 100, ct.err_msg: not_found_msg}
    self.upsert(client, collection_name, rows,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("data", ["12-s", "12 s", "(mn)", "中文", "%$#", " "])
def test_milvus_client_upsert_data_invalid_type(self, data):
    """
    target: test upsert with data of an invalid type
    method: create a collection, then upsert a plain string instead of row dicts
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # 2. upsert the string payload and expect a type error
    expected_error = {
        ct.err_code: 1,
        ct.err_msg: "wrong type of argument 'data',expected 'Dict' or list of 'Dict'",
    }
    self.upsert(client, collection_name, data,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_data_empty(self):
    """
    target: test upsert with an empty string as data
    method: create a collection, then upsert data=""
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # 2. upsert the empty string and expect a type error
    expected_error = {
        ct.err_code: 1,
        ct.err_msg: "wrong type of argument 'data',expected 'Dict' or list of 'Dict'",
    }
    self.upsert(client, collection_name, data="",
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_data_vector_field_missing(self):
    """
    target: test upsert with rows that omit the vector field
    method: create a collection, then upsert rows containing only the primary key
            and scalar fields
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # 2. upsert rows without the vector field (no random generator is needed,
    #    since no vectors are produced)
    rows = [{default_primary_key_field_name: i,
             default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(10)]
    error = {ct.err_code: 1,
             ct.err_msg: "Insert missed an field `vector` to collection without set nullable==true or set default_value"}
    self.upsert(client, collection_name, data=rows,
                check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_data_id_field_missing(self):
    """
    target: test upsert with rows that omit the primary key field
    method: create a collection, then upsert rows containing the vector and
            scalar fields but no `id`
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # 2. build rows without the primary key and upsert them
    rng = np.random.default_rng(seed=19530)
    rows = []
    for idx in range(20):
        rows.append({
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: idx * 1.0,
            default_string_field_name: str(idx),
        })
    expected_error = {
        ct.err_code: 1,
        ct.err_msg: "Insert missed an field `id` to collection without set nullable==true or set default_value",
    }
    self.upsert(client, collection_name, data=rows,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_data_extra_field(self):
    """
    target: test upsert with a field that is not part of the schema
    method: create a collection with enable_dynamic_field=False, then upsert rows
            carrying extra scalar fields
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection with the dynamic field disabled
    dim = 32
    self.create_collection(client, collection_name, dim, enable_dynamic_field=False)
    # 2. upsert rows that carry fields outside the schema
    rng = np.random.default_rng(seed=19530)

    def build_row(pk):
        return {default_primary_key_field_name: pk,
                default_vector_field_name: list(rng.random((1, dim))[0]),
                default_float_field_name: pk * 1.0,
                default_string_field_name: str(pk)}

    rows = [build_row(pk) for pk in range(10)]
    expected_error = {
        ct.err_code: 1,
        ct.err_msg: "Attempt to insert an unexpected field `float` to collection without enabling dynamic field",
    }
    self.upsert(client, collection_name, data=rows,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_data_dim_not_match(self):
    """
    target: test upsert with vectors whose dimension differs from the schema
    method: create a collection with dim=default_dim, then upsert vectors of
            dimension default_dim + 1
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection
    self.create_collection(client, collection_name, default_dim)
    # 2. upsert rows whose vectors are one element too long
    rng = np.random.default_rng(seed=19530)
    wrong_dim = default_dim + 1
    rows = []
    for pk in range(default_nb):
        oversized_vector = list(rng.random((1, wrong_dim))[0])
        rows.append({default_primary_key_field_name: pk,
                     default_vector_field_name: oversized_vector,
                     default_float_field_name: pk * 1.0,
                     default_string_field_name: str(pk)})
    expected_error = {
        ct.err_code: 65536,
        ct.err_msg: f"of float data should divide the dim({default_dim})",
    }
    self.upsert(client, collection_name, data=rows,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_not_matched_data(self):
    """
    target: test upsert with data whose type does not match the schema
    method: create a collection, then upsert rows whose primary key is a string
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection
    self.create_collection(client, collection_name, default_dim)
    # 2. upsert rows with a string primary key
    rng = np.random.default_rng(seed=19530)
    rows = []
    for idx in range(default_nb):
        rows.append({
            default_primary_key_field_name: str(idx),
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: idx * 1.0,
            default_string_field_name: str(idx),
        })
    # The braces in the message are literal text, so this is deliberately not an f-string.
    mismatch_msg = "The Input data type is inconsistent with defined schema, {id} field should be a int64"
    expected_error = {ct.err_code: 1, ct.err_msg: mismatch_msg}
    self.upsert(client, collection_name, data=rows,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("partition_name", ["12 s", "(mn)", "中文", "%$#", " "])
def test_milvus_client_upsert_invalid_partition_name(self, partition_name):
    """
    target: test upsert with an illegal partition name
    method: create a collection, then upsert rows using each parametrized
            invalid partition name
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection
    self.create_collection(client, collection_name, default_dim)
    # 2. build rows and choose the expected error for this partition name
    rng = np.random.default_rng(seed=19530)
    rows = []
    for pk in range(default_nb):
        rows.append({default_primary_key_field_name: pk,
                     default_vector_field_name: list(rng.random((1, default_dim))[0]),
                     default_float_field_name: pk * 1.0,
                     default_string_field_name: str(pk)})
    if partition_name == " ":
        # A single-space name gets the "should not be empty" error instead.
        expected_error = {ct.err_code: 1,
                          ct.err_msg: "Invalid partition name: . Partition name should not be empty."}
    else:
        expected_error = {ct.err_code: 65535,
                          ct.err_msg: f"Invalid partition name: {partition_name}"}
    self.upsert(client, collection_name, data=rows, partition_name=partition_name,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_upsert_not_exist_partition_name(self):
    """
    target: test upsert into a partition that was never created
    method: create a collection, then upsert rows with a freshly generated,
            unused partition name
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection
    self.create_collection(client, collection_name, default_dim)
    # 2. upsert into the missing partition
    rng = np.random.default_rng(seed=19530)

    def build_row(pk):
        return {default_primary_key_field_name: pk,
                default_vector_field_name: list(rng.random((1, default_dim))[0]),
                default_float_field_name: pk * 1.0,
                default_string_field_name: str(pk)}

    rows = [build_row(pk) for pk in range(default_nb)]
    partition_name = cf.gen_unique_str("partition_not_exist")
    expected_error = {
        ct.err_code: 200,
        ct.err_msg: f"partition not found[partition={partition_name}]",
    }
    self.upsert(client, collection_name, data=rows, partition_name=partition_name,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_upsert_collection_partition_not_match(self):
    """
    target: test upsert with a partition that belongs to a different collection
    method: create two collections, create a partition only in the second one,
            then upsert into the first collection using that partition name
    expected: Raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    another_collection_name = cf.gen_unique_str(prefix + "another")
    partition_name = cf.gen_unique_str("partition")
    # 1. create both collections; the partition exists only in the second
    self.create_collection(client, collection_name, default_dim)
    self.create_collection(client, another_collection_name, default_dim)
    self.create_partition(client, another_collection_name, partition_name)
    # 2. upsert into the first collection with the other collection's partition
    rng = np.random.default_rng(seed=19530)
    rows = []
    for pk in range(default_nb):
        vector = list(rng.random((1, default_dim))[0])
        rows.append({
            default_primary_key_field_name: pk,
            default_vector_field_name: vector,
            default_float_field_name: pk * 1.0,
            default_string_field_name: str(pk),
        })
    expected_error = {
        ct.err_code: 200,
        ct.err_msg: f"partition not found[partition={partition_name}]",
    }
    self.upsert(client, collection_name, data=rows, partition_name=partition_name,
                check_task=CheckTasks.err_res, check_items=expected_error)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("nullable", [True, False])
def test_milvus_client_insert_array_element_null(self, nullable):
    """
    target: test insert with a None element inside an array field
    method: create a collection with an INT64 array field (nullable or not),
            then insert rows whose array value is [None, 2, 3]
    expected: raise exception
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    dim = 5
    # 1. create collection with a VARCHAR primary key, a vector and an array field
    nullable_field_name = "nullable_field"
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.VARCHAR,
                     max_length=64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=dim)
    schema.add_field(nullable_field_name, DataType.ARRAY, element_type=DataType.INT64,
                     max_capacity=12, max_length=64, nullable=nullable)
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index(default_vector_field_name, metric_type="COSINE")
    self.create_collection(client, collection_name, dimension=dim,
                           schema=schema, index_params=index_params)
    # 2. insert rows whose array contains a None element
    vectors = cf.gen_vectors(default_nb, dim)
    rows = []
    for idx in range(default_nb):
        rows.append({default_primary_key_field_name: str(idx),
                     default_vector_field_name: vectors[idx],
                     nullable_field_name: [None, 2, 3]})
    # The braces in the message are literal text, so these are deliberately not f-strings.
    mismatch_msg = ("The Input data type is inconsistent with defined schema, {nullable_field} field "
                    "should be a array, but got a {<class 'list'>} instead.")
    expected_error = {ct.err_code: 1, ct.err_msg: mismatch_msg}
    self.insert(client, collection_name, rows,
                check_task=CheckTasks.err_res,
                check_items=expected_error)
class TestMilvusClientUpsertValid(TestMilvusClientV2Base):
""" Test cases for upsert: valid scenarios """
@pytest.fixture(scope="function", params=[False, True])
def auto_id(self, request):
    """Parametrized fixture: yields each configured auto_id value (False, True)."""
    current_value = request.param
    yield current_value
@pytest.fixture(scope="function", params=["COSINE", "L2"])
def metric_type(self, request):
    """Parametrized fixture: yields each configured metric type ("COSINE", "L2")."""
    current_value = request.param
    yield current_value
"""
******************************************************************
# The following are valid base cases
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_upsert_default(self):
    """
    target: test upsert normal case
    method: create a collection, upsert rows, then search and query them
    expected: upsert count matches and search/query succeed
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection and verify it is listed and described as expected
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    assert collection_name in self.list_collections(client)[0]
    describe_expectations = {"collection_name": collection_name,
                             "dim": default_dim,
                             "consistency_level": 0}
    self.describe_collection(client, collection_name,
                             check_task=CheckTasks.check_describe_collection_property,
                             check_items=describe_expectations)
    # 2. upsert
    rng = np.random.default_rng(seed=19530)
    rows = []
    for pk in range(default_nb):
        rows.append({
            default_primary_key_field_name: pk,
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: pk * 1.0,
            default_string_field_name: str(pk),
        })
    upsert_result = self.upsert(client, collection_name, rows)[0]
    assert upsert_result['upsert_count'] == default_nb
    # 3. search (query vectors are drawn after the rows, from the same generator)
    vectors_to_search = rng.random((1, default_dim))
    expected_ids = list(range(default_nb))
    search_expectations = {"enable_milvus_client_api": True,
                           "nq": len(vectors_to_search),
                           "ids": expected_ids,
                           "limit": default_limit,
                           "pk_name": default_primary_key_field_name}
    self.search(client, collection_name, vectors_to_search,
                check_task=CheckTasks.check_search_results,
                check_items=search_expectations)
    # 4. query
    query_expectations = {exp_res: rows,
                          "with_vec": True,
                          "pk_name": default_primary_key_field_name}
    self.query(client, collection_name, filter=default_search_exp,
               check_task=CheckTasks.check_query_results,
               check_items=query_expectations)
    # 5. clean up
    self.release_collection(client, collection_name)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_upsert_empty_data(self):
    """
    target: test upsert with an empty list of rows
    method: create a collection, upsert [], then search
    expected: upsert count is 0 and the search returns no hits
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # 2. upsert nothing
    empty_rows = []
    upsert_result = self.upsert(client, collection_name, empty_rows)[0]
    assert upsert_result['upsert_count'] == 0
    # 3. search the empty collection
    rng = np.random.default_rng(seed=19530)
    vectors_to_search = rng.random((1, default_dim))
    search_expectations = {"enable_milvus_client_api": True,
                           "nq": len(vectors_to_search),
                           "ids": [],
                           "pk_name": default_primary_key_field_name,
                           "limit": 0}
    self.search(client, collection_name, vectors_to_search,
                check_task=CheckTasks.check_search_results,
                check_items=search_expectations)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_upsert_partition(self):
    """
    target: test upsert into specific partitions
    method: create a collection and an extra partition, upsert the same rows into
            the first listed partition and into the new partition, then search
    expected: both upserts report default_nb rows and the search succeeds
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    partition_name = cf.gen_unique_str(prefix)
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # 2. create partition and verify partition/index listings
    self.create_partition(client, collection_name, partition_name)
    partitions = self.list_partitions(client, collection_name)[0]
    assert partition_name in partitions
    assert self.list_indexes(client, collection_name)[0] == ['vector']
    rng = np.random.default_rng(seed=19530)

    def build_row(pk):
        return {default_primary_key_field_name: pk,
                default_vector_field_name: list(rng.random((1, default_dim))[0]),
                default_float_field_name: pk * 1.0,
                default_string_field_name: str(pk)}

    rows = [build_row(pk) for pk in range(default_nb)]
    # 3. upsert into the first listed partition
    # NOTE(review): presumably this is the default partition — confirm list order.
    first_result = self.upsert(client, collection_name, rows, partition_name=partitions[0])[0]
    assert first_result['upsert_count'] == default_nb
    # 4. upsert into the newly created partition
    second_result = self.upsert(client, collection_name, rows, partition_name=partition_name)[0]
    assert second_result['upsert_count'] == default_nb
    # 5. search
    vectors_to_search = rng.random((1, default_dim))
    expected_ids = list(range(default_nb))
    search_expectations = {"enable_milvus_client_api": True,
                           "nq": len(vectors_to_search),
                           "ids": expected_ids,
                           "limit": default_limit,
                           "pk_name": default_primary_key_field_name}
    self.search(client, collection_name, vectors_to_search,
                check_task=CheckTasks.check_search_results,
                check_items=search_expectations)
    # TODO: partition statistics checks (get_partition_stats) are currently disabled.
    # 6. clean up whatever still exists
    if self.has_partition(client, collection_name, partition_name)[0]:
        self.release_partitions(client, collection_name, partition_name)
        self.drop_partition(client, collection_name, partition_name)
    if self.has_collection(client, collection_name)[0]:
        self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_insert_upsert(self):
    """
    target: test upsert over rows that were previously inserted
    method: create a collection and a partition, insert rows into the partition,
            upsert rows with the same primary keys (carrying a different string
            field name), then search
    expected: insert and upsert both report default_nb rows and the search succeeds
    """
    client = self._client()
    collection_name = cf.gen_collection_name_by_testcase_name()
    partition_name = cf.gen_unique_str(prefix)
    # 1. create collection
    self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
    # 2. create partition and verify partition/index listings
    self.create_partition(client, collection_name, partition_name)
    assert partition_name in self.list_partitions(client, collection_name)[0]
    assert self.list_indexes(client, collection_name)[0] == ['vector']
    # 3. insert, then upsert the same primary keys
    rng = np.random.default_rng(seed=19530)
    inserted_rows = []
    for pk in range(default_nb):
        inserted_rows.append({
            default_primary_key_field_name: pk,
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: pk * 1.0,
            default_string_field_name: str(pk),
        })
    insert_result = self.insert(client, collection_name, inserted_rows, partition_name=partition_name)[0]
    assert insert_result['insert_count'] == default_nb
    upserted_rows = []
    for pk in range(default_nb):
        upserted_rows.append({
            default_primary_key_field_name: pk,
            default_vector_field_name: list(rng.random((1, default_dim))[0]),
            default_float_field_name: pk * 1.0,
            "new_diff_str_field": str(pk),
        })
    upsert_result = self.upsert(client, collection_name, upserted_rows, partition_name=partition_name)[0]
    assert upsert_result['upsert_count'] == default_nb
    # 4. search
    vectors_to_search = rng.random((1, default_dim))
    expected_ids = list(range(default_nb))
    search_expectations = {"enable_milvus_client_api": True,
                           "nq": len(vectors_to_search),
                           "ids": expected_ids,
                           "limit": default_limit,
                           "pk_name": default_primary_key_field_name}
    self.search(client, collection_name, vectors_to_search,
                check_task=CheckTasks.check_search_results,
                check_items=search_expectations)
    # 5. clean up whatever still exists
    if self.has_partition(client, collection_name, partition_name)[0]:
        self.release_partitions(client, collection_name, partition_name)
        self.drop_partition(client, collection_name, partition_name)
    if self.has_collection(client, collection_name)[0]:
        self.drop_collection(client, collection_name)
""" Test case of partial update interface """
@pytest.fixture(scope="function", params=[False, True])
def auto_id(self, request):
    """Parametrized fixture: yields each configured auto_id value (False, True)."""
    current_value = request.param
    yield current_value
@pytest.fixture(scope="function", params=["COSINE", "L2"])
def metric_type(self, request):
    """Parametrized fixture: yields each configured metric type ("COSINE", "L2")."""
    current_value = request.param
    yield current_value
"""
******************************************************************
# The following are invalid base cases
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_new_pk_with_missing_field(self):
    """
    target: Test partial update returns an error for new pks that carry only some fields
    method:
        1. Create a collection
        2. Partial upsert new pks with only the pk and int32 fields (no vector)
    expected: Step 2 should fail
    """
    # step 1: create collection (pk + vector + nullable int32, all indexed)
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_int32_field_name, DataType.INT32, nullable=True)
    index_params = self.prepare_index_params(client)[0]
    for indexed_field in (default_primary_key_field_name,
                          default_vector_field_name,
                          default_int32_field_name):
        index_params.add_index(indexed_field, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # step 2: partial upsert new pks with only a subset of the fields
    partial_fields = [default_primary_key_field_name, default_int32_field_name]
    rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema,
                                     desired_field_names=partial_fields)
    missing_msg = (f"fieldSchema({default_vector_field_name}) has no corresponding fieldData pass in: "
                   f"invalid parameter")
    expected_error = {ct.err_code: 1100, ct.err_msg: missing_msg}
    self.upsert(client, collection_name, rows, partial_update=True,
                check_task=CheckTasks.err_res, check_items=expected_error)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_new_field_without_dynamic_field(self):
    """
    target: Test partial update returns an error for a field outside the schema
            when the dynamic field is disabled
    method:
        1. Create a collection with enable_dynamic_field=False
        2. Upsert full rows, then partial upsert rows carrying a new field
    expected: The partial upsert of the new field should fail
    """
    # step 1: create collection without the dynamic field
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    index_params = self.prepare_index_params(client)[0]
    for indexed_field in (default_primary_key_field_name, default_vector_field_name):
        index_params.add_index(indexed_field, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # step 2: seed full rows, then partial upsert a field that is not in the schema
    full_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    self.upsert(client, collection_name, full_rows, partial_update=True)
    rows_with_new_field = []
    for pk in range(default_nb):
        rows_with_new_field.append({default_primary_key_field_name: pk,
                                    default_int32_field_name: 99})
    unexpected_msg = (f"Attempt to insert an unexpected field `{default_int32_field_name}` "
                      f"to collection without enabling dynamic field")
    expected_error = {ct.err_code: 1, ct.err_msg: unexpected_msg}
    self.upsert(client, collection_name, rows_with_new_field, partial_update=True,
                check_task=CheckTasks.err_res, check_items=expected_error)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_after_release_collection(self):
    """
    target: test partial update against a released collection
    method:
        1. create collection
        2. upsert full rows using partial update
        3. partial update pk + string field
        4. release collection
        5. partial update pk + string field again
    expected: step 5 should fail
    """
    # Step 1: create collection (pk + vector + varchar, all indexed)
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_string_field_name, DataType.VARCHAR, max_length=64)
    index_params = self.prepare_index_params(client)[0]
    for indexed_field in (default_primary_key_field_name,
                          default_vector_field_name,
                          default_string_field_name):
        index_params.add_index(indexed_field, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # Step 2: upsert full rows using partial update
    full_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    self.upsert(client, collection_name, full_rows, partial_update=True)
    # Step 3: partial update while the collection is still loaded
    partial_fields = [default_primary_key_field_name, default_string_field_name]
    first_partial_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema,
                                                   desired_field_names=partial_fields)
    self.upsert(client, collection_name, first_partial_rows, partial_update=True)
    # Step 4: release collection
    self.release_collection(client, collection_name)
    # Step 5: partial update again; expected to fail on the released collection
    second_partial_rows = cf.gen_row_data_by_schema(
        nb=default_nb, schema=schema,
        desired_field_names=[default_primary_key_field_name, default_string_field_name])
    expected_error = {ct.err_code: 101,
                      ct.err_msg: "failed to query: collection not loaded"}
    self.upsert(client, collection_name, second_partial_rows, partial_update=True,
                check_task=CheckTasks.err_res, check_items=expected_error)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_same_pk_after_delete(self):
    """
    target: test partial update fails for deleted pks when only some fields are given
    method:
        1. Create a collection (dynamic field disabled)
        2. Upsert full rows
        3. Delete the rows
        4. Partial upsert the same pks with only the pk and vector fields
    expected: step 4 should fail
    """
    # Step 1: create collection (pk + vector + non-nullable int32, all indexed)
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_int32_field_name, DataType.INT32)
    index_params = self.prepare_index_params(client)[0]
    for indexed_field in (default_primary_key_field_name,
                          default_vector_field_name,
                          default_int32_field_name):
        index_params.add_index(indexed_field, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # Step 2: upsert full rows
    full_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    self.upsert(client, collection_name, full_rows, partial_update=True)
    # Step 3: delete every row and confirm nothing is left
    delete_result = self.delete(client, collection_name, filter=default_search_exp)[0]
    assert delete_result["delete_count"] == default_nb
    remaining = self.query(client, collection_name, filter=default_search_exp,
                           check_task=CheckTasks.check_nothing)[0]
    assert len(remaining) == 0
    # Step 4: partial upsert the same pks without the int32 field
    partial_fields = [default_primary_key_field_name, default_vector_field_name]
    partial_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema,
                                             desired_field_names=partial_fields)
    missing_msg = (f"fieldSchema({default_int32_field_name}) has no corresponding fieldData pass in: "
                   f"invalid parameter")
    expected_error = {ct.err_code: 1100, ct.err_msg: missing_msg}
    self.upsert(client, collection_name, partial_rows, partial_update=True,
                check_task=CheckTasks.err_res, check_items=expected_error)
    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_pk_in_wrong_partition(self):
    """
    target: test partial update (PU) is rejected when the pk is provided in the wrong partition
    method:
        1. Create a collection with pk, vector and int32 fields (dynamic field disabled)
        2. Create 2 partitions
        3. Upsert one equal-sized slice of the generated rows into each partition
        4. Upsert rows holding only pk + vector into the last partition
    expected: step 4 should fail, reporting that the int32 field has no field data
    """
    # Step 1: create collection
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_int32_field_name, DataType.INT32)
    index_params = self.prepare_index_params(client)[0]
    # Every schema field gets an AUTOINDEX, registered in schema order.
    for indexed_field in (default_primary_key_field_name,
                          default_vector_field_name,
                          default_int32_field_name):
        index_params.add_index(indexed_field, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)

    # Step 2: create 2 partitions
    partition_count = 2
    partition_names = []
    for _ in range(partition_count):
        created_name = cf.gen_unique_str("partition")
        self.create_partition(client, collection_name, created_name)
        partition_names.append(created_name)

    # Step 3: insert rows, one contiguous slice per partition
    rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    rows_per_partition = default_nb // partition_count
    for position, target_partition in enumerate(partition_names):
        start = position * rows_per_partition
        stop = start + rows_per_partition
        self.upsert(client, collection_name, rows[start:stop],
                    partition_name=target_partition, partial_update=True)

    # Step 4: upsert pk + vector only rows into the last partition
    partial_fields = [default_primary_key_field_name, default_vector_field_name]
    new_rows = cf.gen_row_data_by_schema(nb=rows_per_partition, schema=schema,
                                         desired_field_names=partial_fields)
    expected_error = {
        ct.err_code: 1100,
        ct.err_msg: f"fieldSchema({default_int32_field_name}) has no corresponding fieldData pass in: invalid parameter",
    }
    self.upsert(client, collection_name, new_rows, partition_name=partition_names[-1], partial_update=True,
                check_task=CheckTasks.err_res, check_items=expected_error)

    self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_partial_update_same_pk_multiple_fields(self):
    """
    target: test PU will fail when rows sharing the same pk carry different partial fields
    method:
        1. Create a collection (pk, vector, nullable int32; dynamic field disabled)
        2. Insert rows
        3. Upsert rows that all use pk 0, where even-indexed rows carry only the
           int32 field and odd-indexed rows carry only the vector field
    expected: Step 3 should fail with a "data fields length is inconsistent" error
    """
    # step 1: create collection
    client = self._client()
    schema = self.create_schema(client, enable_dynamic_field=False)[0]
    schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
    schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
    schema.add_field(default_int32_field_name, DataType.INT32, nullable=True)
    index_params = self.prepare_index_params(client)[0]
    index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
    index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
    index_params.add_index(default_int32_field_name, index_type="AUTOINDEX")
    collection_name = cf.gen_collection_name_by_testcase_name(module_index=1)
    self.create_collection(client, collection_name, default_dim, schema=schema,
                           consistency_level="Strong", index_params=index_params)
    # step 2: Insert rows
    rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
    self.upsert(client, collection_name, rows, partial_update=True)
    # step 3: Upsert the rows with same pk and different field
    new_rows = []
    for i in range(default_nb):
        if i % 2 == 0:
            data = {default_int32_field_name: i + 1000}
        else:
            data = {default_vector_field_name: [random.random() for _ in range(default_dim)]}
        # The pk is added after the partial field so each row keeps the same
        # key insertion order as before (partial field first, pk second).
        data[default_primary_key_field_name] = 0
        new_rows.append(data)
    # Every row carries the pk, while each non-pk field appears in only half of
    # the rows, so the expected lengths are derived from default_nb instead of
    # being hard-coded.
    # NOTE(review): if default_nb ever becomes odd, confirm which field's count
    # is reported as the "current length" (even rows: int32, odd rows: vector).
    error = {ct.err_code: 1,
             ct.err_msg: "The data fields length is inconsistent. "
                         f"previous length is {default_nb}, current length is {default_nb // 2}"}
    self.upsert(client, collection_name, new_rows, partial_update=True,
                check_task=CheckTasks.err_res, check_items=error)
    self.drop_collection(client, collection_name)