mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-08 10:08:42 +08:00
584 lines
23 KiB
Python
584 lines
23 KiB
Python
import threading
|
|
import pytest
|
|
|
|
from pymilvus_orm import Partition
|
|
from base.client_request import ApiReq
|
|
from utils.util_log import test_log as log
|
|
from common import common_func as cf
|
|
from common import common_type as ct
|
|
from common.common_type import CaseLabel, CheckParams
|
|
|
|
|
|
prefix = "partition_"
|
|
|
|
|
|
class TestPartitionParams(ApiReq):
|
|
""" Test case of partition interface in parameters"""
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
|
|
def test_partition_default(self):
|
|
"""
|
|
target: verify create a partition
|
|
method: 1. create a partition
|
|
expected: 1. create successfully
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
descriptions = cf.gen_unique_str("desc_")
|
|
_, _ = self.partition.partition_init(
|
|
m_collection, p_name, description=descriptions,
|
|
check_res=CheckParams.partition_property_check
|
|
)
|
|
assert (m_collection.has_partition(p_name))
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_empty_name(self):
|
|
"""
|
|
target: verify create a partition with empyt name
|
|
method: 1. create a partition empty none name
|
|
expected: 1. raise exception
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = ""
|
|
ex, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert ex.code == 1
|
|
assert "Partition tag should not be empty" in ex.message
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_empty_description(self):
|
|
"""
|
|
target: verify create a partition with empty description
|
|
method: 1. create a partition with empty description
|
|
expected: 1. create successfully
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
descriptions = ""
|
|
_, _ = self.partition.partition_init(
|
|
m_collection, p_name, description=descriptions,
|
|
check_res=CheckParams.partition_property_check)
|
|
assert (m_collection.has_partition(p_name))
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_dup_name(self):
|
|
"""
|
|
target: verify create partitions with duplicate name
|
|
method: 1. create partitions with duplicate name
|
|
expected: 1. create successfully
|
|
2. the same partition returned with diff object id
|
|
"""
|
|
m_collection = self._collection(name=cf.gen_unique_str())
|
|
p_name = cf.gen_unique_str(prefix)
|
|
descriptions = cf.gen_unique_str()
|
|
m_partition, _ = self.partition.partition_init(m_collection, p_name, descriptions)
|
|
m_partition2, _ = self.partition.partition_init(m_collection, p_name, descriptions)
|
|
assert (id(m_partition2) != id(m_partition))
|
|
assert m_partition.name == m_partition2.name
|
|
assert m_partition.description == m_partition2.description
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_specialchars_description(self, get_invalid_string):
|
|
"""
|
|
target: verify create a partition with special characters in description
|
|
method: 1. create a partition with special characters in description
|
|
expected: 1. create successfully
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
descriptions = get_invalid_string
|
|
m_partition, _ = self.partition.partition_init(
|
|
m_collection, p_name, description=descriptions,
|
|
check_res=CheckParams.partition_property_check)
|
|
assert (m_collection.has_partition(p_name))
|
|
assert (m_partition.description == descriptions)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
@pytest.mark.xfail(reason="issue #5373")
|
|
def test_partition_default_name(self):
|
|
"""
|
|
target: verify create a partition with default name
|
|
method: 1. get the _default partition
|
|
2. create a partition with _default name
|
|
expected: 1. the same partition returned
|
|
"""
|
|
m_collection = self._collection()
|
|
assert m_collection.has_partition(ct.default_partition_name)
|
|
m_partition = m_collection.partition(ct.default_partition_name)
|
|
m_partition2, _ = self.partition.partition_init(m_collection, ct.default_partition_name)
|
|
assert (id(m_partition2) == id(m_partition))
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_invalid_name(self, get_invalid_string):
|
|
"""
|
|
target: verify create a partition with invalid name
|
|
method: 1. create a partition with invalid names
|
|
expected: 1. raise exception
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = get_invalid_string
|
|
ex, _ = self.partition.partition_init(m_collection, p_name)
|
|
# assert ex.code == 1
|
|
# TODO: need an error code issue #5144 and assert independently
|
|
assert "is illegal" in ex.message or ex.code == 1
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_none_collection(self):
|
|
"""
|
|
target: verify create a partition with none collection
|
|
method: 1. create a partition with none collection
|
|
expected: 1. raise exception
|
|
"""
|
|
m_collection = None
|
|
p_name = cf.gen_unique_str(prefix)
|
|
ex, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert "'NoneType' object has no attribute" in ex.message
|
|
|
|
@pytest.mark.tags(CaseLabel.L0)
|
|
def test_partition_drop(self):
|
|
"""
|
|
target: verify drop a partition in one collection
|
|
method: 1. create a partition in one collection
|
|
2. drop the partition
|
|
expected: 1. drop successfully
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
m_partition, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert m_collection.has_partition(p_name)
|
|
m_partition.drop()
|
|
assert m_collection.has_partition(p_name) is False
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
@pytest.mark.xfail(reason="issue #5384")
|
|
def test_partition_release(self):
|
|
"""
|
|
target: verify release partition
|
|
method: 1. create a collection and several partitions
|
|
2. insert data into each partition
|
|
3. flush and load the partitions
|
|
4. release partition1
|
|
5. release partition1 twice
|
|
expected: 1. the released partition is released
|
|
2. the other partition is not released
|
|
"""
|
|
m_partition = self._partition()
|
|
m_partition2 = self._partition()
|
|
m_partition.insert(cf.gen_default_list_data())
|
|
m_partition2.insert(cf.gen_default_list_data())
|
|
m_partition.load()
|
|
m_partition2.load()
|
|
search_vec = cf.gen_vectors(1, ct.default_dim)
|
|
result = m_partition.search(data=search_vec,
|
|
anns_field=ct.default_float_vec_field_name,
|
|
params={"nprobe": 32},
|
|
limit=1
|
|
)
|
|
result2 = m_partition2.search(data=search_vec,
|
|
anns_field=ct.default_float_vec_field_name,
|
|
params={"nprobe": 32},
|
|
limit=1
|
|
)
|
|
assert len(result) == 1
|
|
assert len(result2) == 1
|
|
|
|
for _ in range(2):
|
|
m_partition.release()
|
|
result = m_partition.search(data=search_vec,
|
|
anns_field=ct.default_float_vec_field_name,
|
|
params={"nprobe": 32},
|
|
limit=1
|
|
)
|
|
result2 = m_partition2.search(data=search_vec,
|
|
anns_field=ct.default_float_vec_field_name,
|
|
params={"nprobe": 32},
|
|
limit=1
|
|
)
|
|
assert len(result) == 0
|
|
assert len(result2) == 1
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
@pytest.mark.xfail(reason="issue #5302")
|
|
@pytest.mark.parametrize("data, nums", [(cf.gen_default_dataframe_data(10), 10),
|
|
(cf.gen_default_list_data(1), 1),
|
|
(cf.gen_default_tuple_data(10), 10)
|
|
])
|
|
def test_partition_insert(self, data, nums):
|
|
"""
|
|
target: verify insert multi entities by dataFrame
|
|
method: 1. create a collection and a partition
|
|
2. partition.insert(data)
|
|
3. insert data again
|
|
expected: 1. insert data successfully
|
|
"""
|
|
m_partition = self._partition()
|
|
assert m_partition.is_empty
|
|
assert m_partition.num_entities == 0
|
|
m_partition.insert(data) # TODO: add ndarray type data
|
|
assert m_partition.is_empty is False
|
|
assert m_partition.num_entities == nums
|
|
m_partition.insert(data)
|
|
assert m_partition.is_empty is False
|
|
assert m_partition.num_entities == (nums + nums)
|
|
|
|
|
|
class TestPartitionOperations(ApiReq):
|
|
""" Test case of partition interface in operations """
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_dropped_collection(self):
|
|
"""
|
|
target: verify create partition against a dropped collection
|
|
method: 1. create collection1
|
|
2. drop collection1
|
|
3. create partition in collection1
|
|
expected: 1. raise exception
|
|
"""
|
|
m_collection = self._collection()
|
|
m_collection.drop()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
ex, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert ex.code == 1
|
|
assert "can't find collection" in ex.message
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
def test_partition_same_name_in_diff_collections(self):
|
|
"""
|
|
target: verify create partitions with sanme name in diff collections
|
|
method: 1. create a partition in collection1
|
|
2. create a partition in collection2
|
|
expected: 1. create successfully
|
|
"""
|
|
m_collection = self._collection(cf.gen_unique_str())
|
|
m_collection2 = self._collection(cf.gen_unique_str())
|
|
p_name = cf.gen_unique_str(prefix)
|
|
_, _ = self.partition.partition_init(m_collection, p_name)
|
|
_, _ = self.partition.partition_init(m_collection2, p_name)
|
|
assert m_collection.has_partition(p_name)
|
|
assert m_collection2.has_partition(p_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
def test_partition_multi_partitions_in_collection(self):
|
|
"""
|
|
target: verify create multiple partitions in one collection
|
|
method: 1. create multiple partitions in one collection
|
|
expected: 1. create successfully
|
|
"""
|
|
m_collection = self._collection()
|
|
for _ in range(10):
|
|
p_name = cf.gen_unique_str(prefix)
|
|
_, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert m_collection.has_partition(p_name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
def test_partition_maximum_partitions(self):
|
|
"""
|
|
target: verify create maximum partitions
|
|
method: 1. create maximum partitions
|
|
2. create one more partition
|
|
expected: 1. raise exception
|
|
"""
|
|
threads_num = 8
|
|
threads = []
|
|
# if args["handler"] == "HTTP":
|
|
# pytest.skip("skip in http mode")
|
|
|
|
def create_partition(collection, threads_n):
|
|
for _ in range(ct.max_partition_num // threads_n):
|
|
name = cf.gen_unique_str(prefix)
|
|
Partition(collection, name)
|
|
|
|
m_collection = self._collection()
|
|
for _ in range(threads_num):
|
|
t = threading.Thread(target=create_partition, args=(m_collection, threads_num))
|
|
threads.append(t)
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
p_name = cf.gen_unique_str()
|
|
ex, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert ex.code == 1
|
|
assert "maximum partition's number should be limit to 4096" in ex.message
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
@pytest.mark.xfail(reason="issue #5302")
|
|
def test_partition_drop_default_partition(self):
|
|
"""
|
|
target: verify drop the _default partition
|
|
method: 1. drop the _default partition
|
|
expected: 1. raise exception
|
|
"""
|
|
m_collection = self._collection()
|
|
default_partition = m_collection.partition(ct.default_partition_name)
|
|
default_partition.insert(cf.gen_default_list_data())
|
|
# TODO need a flush?
|
|
assert default_partition.is_empty is False
|
|
with pytest.raises(Exception) as e:
|
|
default_partition.drop()
|
|
log.info(e)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_drop_partition_twice(self):
|
|
"""
|
|
target: verify drop the same partition twice
|
|
method: 1.create a partition with default schema
|
|
2. drop the partition
|
|
3. drop the same partition again
|
|
expected: raise exception when 2nd time
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
m_partition, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert m_collection.has_partition(p_name)
|
|
m_partition.drop()
|
|
assert m_collection.has_partition(p_name) is False
|
|
with pytest.raises(Exception) as e:
|
|
m_partition.drop()
|
|
log.info(e)
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
def test_partition_create_and_drop_multi_times(self):
|
|
"""
|
|
target: verify create and drop for times
|
|
method: 1.create a partition with default schema
|
|
2. drop the partition
|
|
3. loop #1 and #2 for times
|
|
expected: create and drop successfully
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
for _ in range(5):
|
|
m_partition, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert m_collection.has_partition(p_name)
|
|
m_partition.drop()
|
|
assert m_collection.has_partition(p_name) is False
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
@pytest.mark.parametrize("flush", [True, False])
|
|
def test_partition_drop_non_empty_partition(self, flush):
|
|
"""
|
|
target: verify drop a partition which has data inserted
|
|
method: 1.create a partition with default schema
|
|
2. insert some data
|
|
3. flush / not flush
|
|
3. drop the partition
|
|
expected: drop successfully
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
m_partition, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert m_collection.has_partition(p_name)
|
|
m_partition.insert(cf.gen_default_dataframe_data())
|
|
if flush:
|
|
# conn = self._connect()
|
|
# conn.flush([m_collection.name])
|
|
# TODO: m_partition.flush()
|
|
pass
|
|
m_partition.drop()
|
|
assert m_collection.has_partition(p_name) is False
|
|
|
|
@pytest.mark.tags(CaseLabel.L2)
|
|
@pytest.mark.parametrize("flush", [True, False])
|
|
def test_partition_drop_indexed_partition(self, flush, get_index_param):
|
|
"""
|
|
target: verify drop an indexed partition
|
|
method: 1.create a partition
|
|
2. insert same data
|
|
3. create an index
|
|
4. flush or not flush
|
|
5. drop the partition
|
|
expected: drop successfully
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
m_partition, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert m_collection.has_partition(p_name)
|
|
data = cf.gen_default_list_data(nb=10)
|
|
m_partition.insert(data)
|
|
index_param = get_index_param
|
|
log.info(m_collection.schema)
|
|
m_collection.create_index(
|
|
ct.default_float_vec_field_name,
|
|
index_param)
|
|
if flush:
|
|
# TODO: m_partition.flush()
|
|
pass
|
|
m_partition.drop()
|
|
assert m_collection.has_partition(p_name) is False
|
|
log.info("collection name: %s", m_collection.name)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_release_empty_partition(self):
|
|
"""
|
|
target: verify release an empty partition
|
|
method: 1.create a partition
|
|
2. release the partition
|
|
expected: release successfully
|
|
"""
|
|
m_partition = self._partition()
|
|
assert m_partition.is_empty
|
|
m_partition.release()
|
|
# TODO: assert no more memory consumed
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_release_dropped_partition(self):
|
|
"""
|
|
target: verify release an dropped partition
|
|
method: 1.create a partition
|
|
2. drop the partition
|
|
2. release the partition
|
|
expected: raise exception
|
|
"""
|
|
m_partition = self._partition()
|
|
m_partition.drop()
|
|
with pytest.raises(Exception) as e:
|
|
m_partition.release()
|
|
log.info(e)
|
|
# TODO assert the error code
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_release_dropped_collection(self):
|
|
"""
|
|
target: verify release an dropped collection
|
|
method: 1.create a collection and partition
|
|
2. drop the collection
|
|
2. release the partition
|
|
expected: raise exception
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
m_partition, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert m_collection.has_partition(p_name)
|
|
m_collection.drop()
|
|
with pytest.raises(Exception) as e:
|
|
m_partition.release()
|
|
log.info(e)
|
|
# TODO assert the error code
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
@pytest.mark.xfail(reason="issue #5384")
|
|
def test_partition_release_after_collection_released(self):
|
|
"""
|
|
target: verify release a partition after the collection released
|
|
method: 1.create a collection and partition
|
|
2. insert some data
|
|
2. release the collection
|
|
2. release the partition
|
|
expected: partition released successfully
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
m_partition, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert m_collection.has_partition(p_name)
|
|
m_partition.insert(cf.gen_default_list_data())
|
|
m_partition.load()
|
|
search_vec = cf.gen_vectors(1, ct.default_dim)
|
|
result = m_partition.search(data=search_vec,
|
|
anns_field=ct.default_float_vec_field_name,
|
|
params={"nprobe": 32},
|
|
limit=1
|
|
)
|
|
assert len(result) == 1
|
|
m_collection.release()
|
|
result = m_partition.search(data=search_vec,
|
|
anns_field=ct.default_float_vec_field_name,
|
|
params={"nprobe": 32},
|
|
limit=1
|
|
)
|
|
assert len(result) == 0
|
|
m_partition.release()
|
|
# TODO assert release successfully
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
@pytest.mark.xfail(reason="issue #5302")
|
|
def test_partition_insert_default_partition(self):
|
|
"""
|
|
target: verify insert data into _default partition
|
|
method: 1.create a collection
|
|
2. insert some data into _default partition
|
|
expected: insert successfully
|
|
"""
|
|
m_collection = self._collection()
|
|
default_partition = m_collection.partition(ct.default_partition_name)
|
|
data = cf.gen_default_dataframe_data()
|
|
default_partition.insert(data)
|
|
assert default_partition.num_entities == len(data)
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_insert_dropped_partition(self):
|
|
"""
|
|
target: verify insert data into dropped partition
|
|
method: 1.create a collection
|
|
2. insert some data into dropped partition
|
|
expected: raise exception
|
|
"""
|
|
m_partition = self._partition()
|
|
m_partition.drop()
|
|
with pytest.raises(Exception) as e:
|
|
m_partition.insert(cf.gen_default_dataframe_data())
|
|
log.info(e)
|
|
# TODO: assert the error code
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
def test_partition_insert_dropped_collection(self):
|
|
"""
|
|
target: verify insert data into dropped collection
|
|
method: 1.create a collection
|
|
2. insert some data into dropped collection
|
|
expected: raise exception
|
|
"""
|
|
m_collection = self._collection()
|
|
p_name = cf.gen_unique_str(prefix)
|
|
m_partition, _ = self.partition.partition_init(m_collection, p_name)
|
|
assert m_collection.has_partition(p_name)
|
|
m_collection.drop()
|
|
with pytest.raises(Exception) as e:
|
|
m_partition.insert(cf.gen_default_dataframe_data())
|
|
log.info(e)
|
|
# TODO: assert the error code
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
@pytest.mark.xfail(reason="issue #5302")
|
|
def test_partition_insert_maximum_size_data(self):
|
|
"""
|
|
target: verify insert maximum size data(256M?) a time
|
|
method: 1.create a partition
|
|
2. insert maximum size data
|
|
expected: insert successfully
|
|
"""
|
|
m_partition = self._partition()
|
|
max_size = 100000 # TODO: clarify the max size of data
|
|
data = cf.gen_default_dataframe_data(max_size)
|
|
m_partition.insert(data)
|
|
assert m_partition.num_entities == max_size
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
@pytest.mark.parametrize("dim, expected_err",
|
|
[(ct.default_dim - 1, "error"), (ct.default_dim + 1, "error")])
|
|
def test_partition_insert_dismatched_dimensions(self, dim, expected_err):
|
|
"""
|
|
target: verify insert maximum size data(256M?) a time
|
|
method: 1.create a collection with default dim
|
|
2. insert dismatch dim data
|
|
expected: raise exception
|
|
"""
|
|
m_collection = self._collection(schema=cf.gen_default_collection_schema())
|
|
p_name = cf.gen_unique_str(prefix)
|
|
m_partition, _ = self.partition.partition_init(m_collection, p_name)
|
|
data = cf.gen_default_list_data(nb=10, dim=dim)
|
|
with pytest.raises(Exception) as e:
|
|
m_partition.insert(data)
|
|
log.info(e)
|
|
# TODO: assert expected_err in error code
|
|
|
|
@pytest.mark.tags(CaseLabel.L1)
|
|
@pytest.mark.parametrize("sync", [True, False])
|
|
def test_partition_insert_sync(self, sync):
|
|
"""
|
|
target: verify insert sync
|
|
method: 1.create a partition
|
|
2. insert data in sync
|
|
expected: insert successfully
|
|
"""
|
|
pass
|
|
|