From dccba87fab6662861376e956c685dc1feb7d9a11 Mon Sep 17 00:00:00 2001 From: laurazhao0611 Date: Thu, 13 Feb 2025 11:40:46 +0800 Subject: [PATCH] test: add some async cases (#39706) /kind improvement --------- Signed-off-by: laurazhao0611 Co-authored-by: laurazhao0611 --- .../base/async_milvus_client_wrapper.py | 28 + .../test_milvus_client_database.py | 9 +- .../test_collection_async.py | 221 +++++ .../async_milvus_client/test_index_async.py | 345 ++++++++ .../test_partition_async.py | 816 ++++++++++++++++++ 5 files changed, 1416 insertions(+), 3 deletions(-) create mode 100644 tests/python_client/testcases/async_milvus_client/test_collection_async.py create mode 100644 tests/python_client/testcases/async_milvus_client/test_index_async.py create mode 100644 tests/python_client/testcases/async_milvus_client/test_partition_async.py diff --git a/tests/python_client/base/async_milvus_client_wrapper.py b/tests/python_client/base/async_milvus_client_wrapper.py index 82f3f2f59f..211da99dce 100644 --- a/tests/python_client/base/async_milvus_client_wrapper.py +++ b/tests/python_client/base/async_milvus_client_wrapper.py @@ -67,12 +67,40 @@ class AsyncMilvusClientWrapper: @logger_interceptor() async def load_collection(self, collection_name: str, timeout: Optional[float] = None, **kwargs): return await self.async_milvus_client.load_collection(collection_name, timeout, **kwargs) + + @logger_interceptor() + async def release_collection(self, collection_name, timeout=None, **kwargs): + return await self.async_milvus_client.release_collection(collection_name, timeout, **kwargs) @logger_interceptor() async def create_index(self, collection_name: str, index_params, timeout: Optional[float] = None, **kwargs): return await self.async_milvus_client.create_index(collection_name, index_params, timeout, **kwargs) + @logger_interceptor() + async def drop_index(self, collection_name, index_name, timeout=None, **kwargs): + return await self.async_milvus_client.drop_index(collection_name, index_name, timeout, **kwargs) + + # @logger_interceptor() + # async def list_indexes(self, collection_name, field_name="", timeout=None, **kwargs): + # return await self.async_milvus_client.list_indexes(collection_name, field_name, timeout, **kwargs) + + @logger_interceptor() + async def create_partition(self, collection_name, partition_name, timeout=None, **kwargs): + return await self.async_milvus_client.create_partition(collection_name, partition_name, timeout, **kwargs) + + @logger_interceptor() + async def drop_partition(self, collection_name, partition_name, timeout=None, **kwargs): + return await self.async_milvus_client.drop_partition(collection_name, partition_name, timeout, **kwargs) + + @logger_interceptor() + async def load_partitions(self, collection_name, partition_names, timeout=None, **kwargs): + return await self.async_milvus_client.load_partitions(collection_name, partition_names, timeout, **kwargs) + + @logger_interceptor() + async def release_partitions(self, collection_name, partition_names, timeout=None, **kwargs): + return await self.async_milvus_client.release_partitions(collection_name, partition_names, timeout, **kwargs) + @logger_interceptor() async def insert(self, collection_name: str, diff --git a/tests/python_client/milvus_client/test_milvus_client_database.py b/tests/python_client/milvus_client/test_milvus_client_database.py index 43f04dd2d2..bf9da177cd 100644 --- a/tests/python_client/milvus_client/test_milvus_client_database.py +++ b/tests/python_client/milvus_client/test_milvus_client_database.py @@ -274,12 +274,15 @@ class TestMilvusClientDatabaseInvalid(TestMilvusClientV2Base): self.create_database(client, db_name, properties=properties) dbs = self.list_databases(client)[0] assert db_name in dbs - describe = self.describe_database(client, db_name)[0] - assert "database.force.deny.writing" in describe - assert "database.replica.number" in describe + self.describe_database(client, db_name, + check_task=CheckTasks.check_describe_database_property, + check_items={"db_name": db_name, + "database.force.deny.writing": "true", + "database.replica.number": "3"}) alter_properties = {"data.replica.number": 2} self.alter_database_properties(client, db_name, properties=alter_properties) describe = self.describe_database(client, db_name)[0] + self.drop_database(client, db_name) @pytest.mark.tags(CaseLabel.L2) @pytest.mark.parametrize("db_name", ["%$#", "test", " "]) diff --git a/tests/python_client/testcases/async_milvus_client/test_collection_async.py b/tests/python_client/testcases/async_milvus_client/test_collection_async.py new file mode 100644 index 0000000000..0a21fdac25 --- /dev/null +++ b/tests/python_client/testcases/async_milvus_client/test_collection_async.py @@ -0,0 +1,221 @@ +import random +import time +import numpy as np +import pytest +import asyncio +from pymilvus.client.types import LoadState, DataType +from pymilvus import AnnSearchRequest, RRFRanker + +from base.client_v2_base import TestMilvusClientV2Base +from common import common_func as cf +from common import common_type as ct +from common.common_type import CaseLabel, CheckTasks +from utils.util_log import test_log as log + +pytestmark = pytest.mark.asyncio +prefix = "async" +partition_prefix = "async_partition" +async_default_nb = 5000 +default_nb = ct.default_nb +default_dim = 2 +default_limit = ct.default_limit +default_search_exp = "id >= 0" +exp_res = "exp_res" +default_primary_key_field_name = "id" +default_vector_field_name = "vector" +default_float_field_name = ct.default_float_field_name +default_string_field_name = ct.default_string_field_name + +class TestAsyncMilvusClientCollectionInvalid(TestMilvusClientV2Base): + """ Test case of collection interface """ + + def teardown_method(self, method): + self.init_async_milvus_client() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.async_milvus_client_wrap.close()) + super().teardown_method(method) + + """ + ****************************************************************** + # The following are invalid base cases + ****************************************************************** + """ + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("collection_name", ["12-s", "12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_create_collection_invalid_collection_name(self, collection_name): + """ + target: test fast create collection with invalid collection name + method: create collection with invalid collection + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. create collection + error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {collection_name}. the first character of a " + f"collection name must be an underscore or letter: invalid parameter"} + await async_client.create_collection(collection_name, default_dim, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_create_collection_name_over_max_length(self): + """ + target: test fast create collection with over max collection name length + method: create collection with over max collection name length + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. create collection + collection_name = "a".join("a" for i in range(256)) + error = {ct.err_code: 1100, ct.err_msg: f"the length of a collection name must be less than 255 characters"} + await async_client.create_collection(collection_name, default_dim, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("collection_name", ["12-s", "12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_release_collection_invalid_collection_name(self, collection_name): + """ + target: test release collection with invalid collection name + method: release collection with invalid collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. release collection + error = {ct.err_code: 1100, + ct.err_msg: f"Invalid collection name: {collection_name}. " + f"the first character of a collection name must be an underscore or letter"} + await async_client.release_collection(collection_name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L2) + async def test_async_milvus_client_release_collection_not_existed(self): + """ + target: test release collection with nonexistent name + method: release collection with nonexistent name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. release collection + collection_name = cf.gen_unique_str("nonexisted") + error = {ct.err_code: 1100, ct.err_msg: f"collection not found[database=default]" + f"[collection={collection_name}]"} + await async_client.release_collection(collection_name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_release_collection_name_over_max_length(self): + """ + target: test fast create collection with over max collection name length + method: create collection with over max collection name length + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. release collection + collection_name = "a".join("a" for i in range(256)) + error = {ct.err_code: 1100, ct.err_msg: f"the length of a collection name must be less than 255 characters"} + await async_client.release_collection(collection_name, check_task=CheckTasks.err_res, check_items=error) + +class TestAsyncMilvusClientCollectionValid(TestMilvusClientV2Base): + """ Test case of collection interface """ + + def teardown_method(self, method): + self.init_async_milvus_client() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.async_milvus_client_wrap.close()) + super().teardown_method(method) + + """ + ****************************************************************** + # The following are valid base cases + ****************************************************************** + """ + + @pytest.mark.tags(CaseLabel.L0) + async def test_async_milvus_client_release_collection_default(self): + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. create collection + collection_name = cf.gen_unique_str(prefix) + await async_client.create_collection(collection_name, default_dim) + collections = self.list_collections(client)[0] + assert collection_name in collections + self.describe_collection(client, collection_name, + check_task=CheckTasks.check_describe_collection_property, + check_items={"collection_name": collection_name, + "dim": default_dim, + "consistency_level": 0}) + # 2. create partition + partition_name = cf.gen_unique_str(partition_prefix) + await async_client.create_partition(collection_name, partition_name) + partitions = self.list_partitions(client, collection_name)[0] + assert partition_name in partitions + # 3. insert + rng = np.random.default_rng(seed=19530) + rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)] + self.insert(client, collection_name, rows) + tasks = [] + # 4. search + vectors_to_search = rng.random((1, default_dim)) + search_task = async_client.search(collection_name, vectors_to_search, + check_task=CheckTasks.check_search_results, + check_items={"enable_milvus_client_api": True, + "nq": len(vectors_to_search), + "limit": default_limit}) + tasks.append(search_task) + # 5. query + query_task = async_client.query(collection_name, filter=default_search_exp, + check_task=CheckTasks.check_query_results, + check_items={"exp_res": rows, + "with_vec": True, + "primary_field": default_primary_key_field_name}) + tasks.append(query_task) + res = await asyncio.gather(*tasks) + + # 6. release collection + await async_client.release_collection(collection_name) + # 7. search + error = {ct.err_code: 101, ct.err_msg: f"collection not loaded"} + await async_client.search(collection_name, vectors_to_search, + check_task=CheckTasks.err_res, + check_items=error) + # 8. query + await async_client.query(collection_name, filter=default_search_exp, + check_task=CheckTasks.err_res, + check_items=error) + # 9. load collection + await async_client.load_collection(collection_name) + # 10. search + await async_client.search(collection_name, vectors_to_search, + check_task=CheckTasks.check_search_results, + check_items={"enable_milvus_client_api": True, + "nq": len(vectors_to_search), + "limit": default_limit}) + # 11. query + await async_client.query(collection_name, filter=default_search_exp, + check_task=CheckTasks.check_query_results, + check_items={"exp_res": rows, + "with_vec": True, + "primary_field": default_primary_key_field_name}) + + # 12. drop action + if self.has_partition(client, collection_name, partition_name)[0]: + await async_client.release_partitions(collection_name, partition_name) + await async_client.drop_partition(collection_name, partition_name) + partitions = self.list_partitions(client, collection_name)[0] + assert partition_name not in partitions + await async_client.drop_collection(collection_name) diff --git a/tests/python_client/testcases/async_milvus_client/test_index_async.py b/tests/python_client/testcases/async_milvus_client/test_index_async.py new file mode 100644 index 0000000000..f54ecd96e6 --- /dev/null +++ b/tests/python_client/testcases/async_milvus_client/test_index_async.py @@ -0,0 +1,345 @@ +import random +import time +import numpy as np +import pytest +import asyncio +from pymilvus.client.types import LoadState, DataType +from pymilvus import AnnSearchRequest, RRFRanker + +from base.client_v2_base import TestMilvusClientV2Base +from common import common_func as cf +from common import common_type as ct +from common.common_type import CaseLabel, CheckTasks +from utils.util_log import test_log as log + +pytestmark = pytest.mark.asyncio +prefix = "async" +partition_prefix = "async_partition" +async_default_nb = 5000 +default_nb = ct.default_nb +default_dim = 128 +default_limit = ct.default_limit +default_search_exp = "id >= 0" +exp_res = "exp_res" +default_primary_key_field_name = "id" +default_vector_field_name = "vector" +default_float_field_name = ct.default_float_field_name +default_string_field_name = ct.default_string_field_name + +class TestAsyncMilvusClientIndexInvalid(TestMilvusClientV2Base): + """ Test case of index interface """ + + def teardown_method(self, method): + self.init_async_milvus_client() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.async_milvus_client_wrap.close()) + super().teardown_method(method) + + """ + ****************************************************************** + # The following are invalid base cases + ****************************************************************** + """ + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("name", ["12-s", "12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_create_index_invalid_collection_name(self, name): + """ + target: test create index with invalid collection name + method: create index with invalid collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. create collection + collection_name = cf.gen_unique_str(prefix) + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + await async_client.release_collection(collection_name) + await async_client.drop_index(collection_name, "vector") + # 2. prepare index params + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name="vector") + # 3. create index + error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {name}. the first character of a collection " + f"name must be an underscore or letter: invalid parameter"} + await async_client.create_index(name, index_params, + check_task=CheckTasks.err_res, + check_items=error) + # 4. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("name", ["a".join("a" for i in range(256))]) + async def test_async_milvus_client_create_index_collection_name_over_max_length(self, name): + """ + target: test create index with over max collection name length + method: create index with over max collection name length + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. create collection + collection_name = cf.gen_unique_str(prefix) + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + await async_client.release_collection(collection_name) + await async_client.drop_index(collection_name, "vector") + # 2. prepare index params + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name="vector") + # 3. create index + error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {name}. the length of a collection name " + f"must be less than 255 characters: invalid parameter"} + await async_client.create_index(name, index_params, + check_task=CheckTasks.err_res, + check_items=error) + # 4. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_create_index_collection_name_not_existed(self): + """ + target: test create index with nonexistent collection name + method: create index with nonexistent collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + not_existed_collection_name = cf.gen_unique_str("not_existed_collection") + # 1. create collection + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + await async_client.release_collection(collection_name) + await async_client.drop_index(collection_name, "vector") + # 2. prepare index params + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name="vector") + # 3. create index + error = {ct.err_code: 100, + ct.err_msg: f"can't find collection[database=default][collection={not_existed_collection_name}]"} + await async_client.create_index(not_existed_collection_name, index_params, + check_task=CheckTasks.err_res, + check_items=error) + # 4. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("index", ["12-s", "12 s", "(mn)", "中文", "%$#", "a".join("a" for i in range(256))]) + async def test_async_milvus_client_create_index_invalid_index_type(self, index): + """ + target: test create index with invalid index type name + method: create index with invalid index type name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + # 1. create collection + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + await async_client.release_collection(collection_name) + await async_client.drop_index(collection_name, "vector") + # 2. prepare index params + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name="vector", index_type=index) + # 3. create index + error = {ct.err_code: 1100, ct.err_msg: f"invalid parameter[expected=valid index][actual=invalid index type: {index}"} + # It's good to show what the valid indexes are + await async_client.create_index(collection_name, index_params, + check_task=CheckTasks.err_res, + check_items=error) + # 4. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("metric", ["12-s", "12 s", "(mn)", "中文", "%$#", "a".join("a" for i in range(256))]) + async def test_async_milvus_client_create_index_invalid_metric_type(self, metric): + """ + target: test create index with invalid metric type + method: create index with invalid metric type + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + # 1. create collection + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + await async_client.release_collection(collection_name) + await async_client.drop_index(collection_name, "vector") + # 2. prepare index params + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name="vector", metric_type=metric) + # 3. create index + error = {ct.err_code: 1100, ct.err_msg: f"float vector index does not support metric type: {metric}: " + f"invalid parameter[expected=valid index params][actual=invalid index params"} + # It's good to show what the valid index params are + await async_client.create_index(collection_name, index_params, + check_task=CheckTasks.err_res, + check_items=error) + # 4. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_drop_index_before_release(self): + """ + target: test drop index when collection are not released + method: drop index when collection are not released + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + # 1. create collection + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + # 2. drop index + error = {ct.err_code: 65535, ct.err_msg: f"index cannot be dropped, collection is loaded, " + f"please release it first"} + await async_client.drop_index(collection_name, "vector", check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("name", ["12-s", "12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_drop_index_invalid_collection_name(self, name): + """ + target: test drop index with invalid collection name + method: drop index with invalid collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. create collection + collection_name = cf.gen_unique_str(prefix) + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + await async_client.release_collection(collection_name) + # 2. drop index + error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {name}. the first character of a collection " + f"name must be an underscore or letter: invalid parameter"} + await async_client.drop_index(name, "vector", check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("name", ["a".join("a" for i in range(256))]) + async def test_async_milvus_client_drop_index_collection_name_over_max_length(self, name): + """ + target: test drop index with over max collection name length + method: drop index with over max collection name length + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. create collection + collection_name = cf.gen_unique_str(prefix) + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + await async_client.release_collection(collection_name) + # 2. drop index + error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {name}. the length of a collection name " + f"must be less than 255 characters: invalid parameter"} + await async_client.drop_index(name, "vector", check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + +class TestAsyncMilvusClientIndexValid(TestMilvusClientV2Base): + """ Test case of index interface """ + + def teardown_method(self, method): + self.init_async_milvus_client() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.async_milvus_client_wrap.close()) + super().teardown_method(method) + + @pytest.fixture(scope="function", params=["COSINE", "L2", "IP"]) + def metric_type(self, request): + yield request.param + + """ + ****************************************************************** + # The following are valid base cases + ****************************************************************** + """ + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("index, params", + zip(ct.all_index_types[:7], + ct.default_all_indexes_params[:7])) + async def test_async_milvus_client_create_drop_index_default(self, index, params, metric_type): + """ + target: test create and drop index normal case + method: create collection, index; insert; search and query; drop index + expected: search/query successfully; create/drop index successfully + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. create collection + collection_name = cf.gen_unique_str(prefix) + await async_client.create_collection(collection_name, default_dim) + collections = self.list_collections(client)[0] + assert collection_name in collections + self.describe_collection(client, collection_name, + check_task=CheckTasks.check_describe_collection_property, + check_items={"collection_name": collection_name, + "dim": default_dim, + "consistency_level": 0}) + + await async_client.release_collection(collection_name) + await async_client.drop_index(collection_name, "vector") + res = self.list_indexes(client, collection_name)[0] + assert res == [] + + # 2. prepare index params + index_params = self.prepare_index_params(client)[0] + index_params.add_index(field_name="vector", index_type=index, metric_type=metric_type, params=params) + # 3. create index + await async_client.create_index(collection_name, index_params) + + # 4. insert + rng = np.random.default_rng(seed=19530) + rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)] + self.insert(client, collection_name, rows) + self.load_collection(client, collection_name) + + tasks = [] + # 5. search + vectors_to_search = rng.random((1, default_dim)) + search_task = self.async_milvus_client_wrap. \ + search(collection_name, vectors_to_search, + check_task=CheckTasks.check_search_results, + check_items={"enable_milvus_client_api": True, + "nq": len(vectors_to_search), + "limit": default_limit}) + tasks.append(search_task) + # 6. query + query_task = self.async_milvus_client_wrap. \ + query(collection_name, filter=default_search_exp, + check_task=CheckTasks.check_query_results, + check_items={"exp_res": rows, + "with_vec": True, + "primary_field": default_primary_key_field_name}) + tasks.append(query_task) + res = await asyncio.gather(*tasks) + + # 7. drop index + await async_client.release_collection(collection_name) + await async_client.drop_index(collection_name, "vector") + res = self.list_indexes(client, collection_name)[0] + assert res == [] + + # 8. drop action + self.drop_collection(client, collection_name) diff --git a/tests/python_client/testcases/async_milvus_client/test_partition_async.py b/tests/python_client/testcases/async_milvus_client/test_partition_async.py new file mode 100644 index 0000000000..86cf94b430 --- /dev/null +++ b/tests/python_client/testcases/async_milvus_client/test_partition_async.py @@ -0,0 +1,816 @@ +import random +import time +import numpy as np +import pytest +import asyncio +from pymilvus.client.types import LoadState, DataType +from pymilvus import AnnSearchRequest, RRFRanker + +from base.client_v2_base import TestMilvusClientV2Base +from common import common_func as cf +from common import common_type as ct +from common.common_type import CaseLabel, CheckTasks +from utils.util_log import test_log as log + +pytestmark = pytest.mark.asyncio +prefix = "async" +partition_prefix = "async_partition" +async_default_nb = 5000 +default_nb = ct.default_nb +default_dim = 2 +default_limit = ct.default_limit +default_search_exp = "id >= 0" +exp_res = "exp_res" +default_primary_key_field_name = "id" +default_vector_field_name = "vector" +default_float_field_name = ct.default_float_field_name +default_string_field_name = ct.default_string_field_name + + +class TestAsyncMilvusClientPartitionInvalid(TestMilvusClientV2Base): + """ Test case of partition interface """ + + def teardown_method(self, method): + self.init_async_milvus_client() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.async_milvus_client_wrap.close()) + super().teardown_method(method) + + """ + ****************************************************************** + # The following are invalid base cases + ****************************************************************** + """ + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("collection_name", ["12-s", "12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_create_partition_invalid_collection_name(self, collection_name): + """ + target: test create partition with invalid collection name + method: create partition with invalid collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + partition_name = cf.gen_unique_str(partition_prefix) + # 1. create partition + error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {collection_name}. the first character of a " + f"collection name must be an underscore or letter: invalid parameter"} + await async_client.create_partition(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_create_partition_collection_name_over_max_length(self): + """ + target: test create partition with collection name over max length 255 + method: create partition with collection name over max length 255 + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = "a".join("a" for i in range(256)) + partition_name = cf.gen_unique_str(partition_prefix) + # 1. create partition + error = {ct.err_code: 1100, + ct.err_msg: f"Invalid collection name: {collection_name}. the length of a collection name " + f"must be less than 255 characters: invalid parameter"} + await async_client.create_partition(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_create_partition_collection_name_not_existed(self): + """ + target: test create partition with nonexistent collection name + method: create partition with nonexistent collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str("partition_not_exist") + partition_name = cf.gen_unique_str(partition_prefix) + # 1. create partition + error = {ct.err_code: 100, ct.err_msg: f"collection not found[database=default]" + f"[collection={collection_name}]"} + await async_client.create_partition(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("partition_name", ["12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_create_partition_invalid_partition_name(self, partition_name): + """ + target: test create partition with invalid partition name + method: create partition with invalid partition name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + # 1. create collection + await async_client.create_collection(collection_name, default_dim) + self.describe_collection(client, collection_name, + check_task=CheckTasks.check_describe_collection_property, + check_items={"collection_name": collection_name, + "dim": default_dim, + "consistency_level": 0}) + # 2. create partition + error = {ct.err_code: 65535, ct.err_msg: f"Invalid partition name: {partition_name}"} + await async_client.create_partition(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_create_partition_partition_name_over_max_length(self): + """ + target: test create partition with partition name over max length 255 + method: create partition with partition name over max length 255 + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + # 1. create collection + await async_client.create_collection(collection_name, default_dim) + + partition_name = "a".join("a" for i in range(256)) + # 2. create partition + error = {ct.err_code: 65535, + ct.err_msg: f"Invalid partition name: {partition_name}. The length of a partition name " + f"must be less than 255 characters."} + await async_client.create_partition(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_create_partition_name_lists(self): + """ + target: test create partition with wrong partition name format list + method: create partition with wrong partition name format list + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + partition_names = [cf.gen_unique_str(partition_prefix), cf.gen_unique_str(partition_prefix)] + # 1. create collection + await async_client.create_collection(collection_name, default_dim) + # 2. create partition + error = {ct.err_code: 999, ct.err_msg: f"`partition_name` value {partition_names} is illegal"} + await async_client.create_partition(collection_name, partition_names, + check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("collection_name", ["12-s", "12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_drop_partition_invalid_collection_name(self, collection_name): + """ + target: test drop partition with invalid collection name + method: drop partition with invalid collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + partition_name = cf.gen_unique_str(partition_prefix) + # 1. create partition + error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {collection_name}. the first character of a " + f"collection name must be an underscore or letter: invalid parameter"} + await async_client.drop_partition(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_drop_partition_collection_name_over_max_length(self): + """ + target: test drop partition with collection name over max length 255 + method: drop partition with collection name over max length 255 + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = "a".join("a" for i in range(256)) + partition_name = cf.gen_unique_str(partition_prefix) + # 1. create partition + error = {ct.err_code: 1100, + ct.err_msg: f"Invalid collection name: {collection_name}. the length of a collection name " + f"must be less than 255 characters: invalid parameter"} + await async_client.drop_partition(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_drop_partition_collection_name_not_existed(self): + """ + target: test drop partition with nonexistent collection name + method: drop partition with nonexistent collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str("partition_not_exist") + partition_name = cf.gen_unique_str(partition_prefix) + # 1. create partition + error = {ct.err_code: 100, ct.err_msg: f"collection not found[database=default]" + f"[collection={collection_name}]"} + await async_client.drop_partition(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("partition_name", ["12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_drop_partition_invalid_partition_name(self, partition_name): + """ + target: test drop partition with invalid partition name + method: drop partition with invalid partition name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + # 1. create collection + await async_client.create_collection(collection_name, default_dim) + # 2. create partition + error = {ct.err_code: 65535, ct.err_msg: f"Invalid partition name: {partition_name}."} + await async_client.drop_partition(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + async def test_async_milvus_client_drop_partition_name_lists(self): + """ + target: test drop partition with wrong partition name format list + method: drop partition with wrong partition name format list + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + partition_names = [cf.gen_unique_str(partition_prefix), cf.gen_unique_str(partition_prefix)] + # 1. create collection + await async_client.create_collection(collection_name, default_dim) + # 2. create partition + error = {ct.err_code: 1, ct.err_msg: f"`partition_name` value {partition_names} is illegal"} + await async_client.drop_partition(collection_name, partition_names, + check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("name", ["12-s", "12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_load_partitions_invalid_collection_name(self, name): + """ + target: test load partitions with invalid collection name + method: load partitions with invalid collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. load partitions + partition_name = cf.gen_unique_str(prefix) + error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {name}. the first character of a collection name " + f"must be an underscore or letter: invalid parameter"} + await async_client.load_partitions(name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L2) + async def test_async_milvus_client_load_partitions_collection_not_existed(self): + """ + target: test load partitions with nonexistent collection name + method: load partitions with nonexistent collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. load partitions + collection_name = cf.gen_unique_str("nonexisted") + partition_name = cf.gen_unique_str(prefix) + error = {ct.err_code: 1100, ct.err_msg: f"collection not found[database=default]" + f"[collection={collection_name}]"} + await async_client.load_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L2) + async def test_async_milvus_client_load_partitions_collection_name_over_max_length(self): + """ + target: test load partitions with collection name over max length 255 + method: load partitions with collection name over max length 255 + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. load partitions + collection_name = "a".join("a" for i in range(256)) + partition_name = cf.gen_unique_str(prefix) + error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {collection_name}. " + f"the length of a collection name must be less than 255 characters: " + f"invalid parameter"} + await async_client.load_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.xfail(reason="pymilvus issue 1896") + @pytest.mark.parametrize("name", ["12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_load_partitions_invalid_partition_name(self, name): + """ + target: test load partitions with invalid partition name + method: load partitions with invalid partition name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + # 1. create collection + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + # 2. load partition + error = {ct.err_code: 1100, ct.err_msg: f"Invalid partition name: {name}. collection name can only " + f"contain numbers, letters and underscores: invalid parameter"} + await async_client.load_partitions(collection_name, name, + check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.xfail(reason="pymilvus issue 1896") + async def test_async_milvus_client_load_partitions_partition_not_existed(self): + """ + target: test load partitions with nonexistent partition name + method: load partitions with nonexistent partition name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + partition_name = cf.gen_unique_str("nonexisted") + # 1. create collection + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + # 2. load partition + error = {ct.err_code: 1100, ct.err_msg: f"partition not found[database=default]" + f"[collection={collection_name}]"} + await async_client.load_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.xfail(reason="pymilvus issue 1896") + async def test_async_milvus_client_load_partitions_partition_name_over_max_length(self): + """ + target: test load partitions with partition name over max length 255 + method: load partitions with partition name over max length 255 + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + partition_name = "a".join("a" for i in range(256)) + # 1. create collection + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + # 2. load partition + error = {ct.err_code: 1100, ct.err_msg: f"invalid dimension: {collection_name}. " + f"the length of a collection name must be less than 255 characters: " + f"invalid parameter"} + await async_client.load_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L2) + async def test_async_milvus_client_load_partitions_without_index(self): + """ + target: test load partitions after drop index + method: load partitions after drop index + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + partition_name = cf.gen_unique_str(partition_prefix) + # 1. create collection + await async_client.create_collection(collection_name, default_dim, consistency_level="Strong") + # 2. drop index + await async_client.release_collection(collection_name) + await async_client.drop_index(collection_name, "vector") + # 3. load partition + error = {ct.err_code: 700, ct.err_msg: f"index not found[collection={collection_name}]"} + await async_client.load_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + # 4. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L2) + @pytest.mark.parametrize("collection_name", ["12-s", "12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_release_partitions_invalid_collection_name(self, collection_name): + """ + target: test release partitions with invalid collection name + method: release partitions with invalid collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + partition_name = cf.gen_unique_str(partition_prefix) + # 1. release partitions + error = {ct.err_code: 1100, ct.err_msg: f"Invalid collection name: {collection_name}. the first character of a " + f"collection name must be an underscore or letter: invalid parameter"} + await async_client.release_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L2) + async def test_async_milvus_client_release_partitions_collection_name_over_max_length(self): + """ + target: test release partitions with collection name over max length 255 + method: release partitions with collection name over max length 255 + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = "a".join("a" for i in range(256)) + partition_name = cf.gen_unique_str(partition_prefix) + # 1. release partitions + error = {ct.err_code: 999, + ct.err_msg: f"Invalid collection name: {collection_name}. the length of a collection name " + f"must be less than 255 characters: invalid parameter"} + await async_client.release_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L2) + async def test_async_milvus_client_release_partitions_collection_name_not_existed(self): + """ + target: test release partitions with nonexistent collection name + method: release partitions with nonexistent collection name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str("collection_not_exist") + partition_name = cf.gen_unique_str(partition_prefix) + # 1. release partitions + error = {ct.err_code: 999, ct.err_msg: f"collection not found[database=default]" + f"[collection={collection_name}]"} + await async_client.release_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.xfail(reason="pymilvus issue 1896") + @pytest.mark.parametrize("partition_name", ["12 s", "(mn)", "中文", "%$#"]) + async def test_async_milvus_client_release_partitions_invalid_partition_name(self, partition_name): + """ + target: test release partitions with invalid partition name + method: release partitions with invalid partition name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + # 1. create collection + await async_client.create_collection(collection_name, default_dim) + # 2. release partitions + error = {ct.err_code: 65535, ct.err_msg: f"Invalid partition name: {partition_name}. The first character of a " + f"partition name must be an underscore or letter.]"} + await async_client.release_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.xfail(reason="pymilvus issue 1896") + async def test_async_milvus_client_release_partitions_invalid_partition_name_list(self): + """ + target: test release partitions with invalid partition name list + method: release partitions with invalid partition name list + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + # 1. create collection + await async_client.create_collection(collection_name, default_dim) + # 2. release partition + partition_name = ["12-s"] + error = {ct.err_code: 65535, ct.err_msg: f"Invalid partition name: {partition_name}. The first character of a " + f"partition name must be an underscore or letter.]"} + await async_client.release_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L2) + async def test_async_milvus_client_release_partitions_name_lists_empty(self): + """ + target: test release partitions with partition name list empty + method: release partitions with partition name list empty + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + partition_names = [] + # 1. create collection + await async_client.create_collection(collection_name, default_dim) + # 2. release partition + error = {ct.err_code: 999, ct.err_msg: f"invalid parameter[expected=any partition][actual=empty partition list"} + await async_client.release_partitions(collection_name, partition_names, + check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L2) + async def test_async_milvus_client_release_partitions_name_lists_not_all_exists(self): + """ + target: test release partitions with partition name lists not all exists + method: release partitions with partition name lists not all exists + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + not_exist_partition = cf.gen_unique_str("partition_not_exist") + partition_names = ["_default", not_exist_partition] + # 1. create collection + await async_client.create_collection(collection_name, default_dim) + # 2. release partitions + error = {ct.err_code: 999, ct.err_msg: f"partition not found[partition={not_exist_partition}]"} + await async_client.release_partitions(collection_name, partition_names, + check_task=CheckTasks.err_res, check_items=error) + + # 3. drop action + await async_client.drop_collection(collection_name) + + @pytest.mark.tags(CaseLabel.L2) + async def test_async_milvus_client_release_partitions_partition_name_not_existed(self): + """ + target: test release partitions with nonexistent partition name + method: release partitions with nonexistent partition name + expected: raise exception + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + collection_name = cf.gen_unique_str(prefix) + partition_name = cf.gen_unique_str("partition_not_exist") + # 1. create collection + await async_client.create_collection(collection_name, default_dim) + # 2. release partitions + error = {ct.err_code: 200, ct.err_msg: f"partition not found[partition={partition_name}]"} + await async_client.release_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + partition_name = "" + error = {ct.err_code: 200, ct.err_msg: f"partition not found[partition={partition_name}]"} + await async_client.release_partitions(collection_name, partition_name, + check_task=CheckTasks.err_res, check_items=error) + # 3. drop action + await async_client.drop_collection(collection_name) + +class TestAsyncMilvusClientPartitionValid(TestMilvusClientV2Base): + """ Test case of partition interface """ + + def teardown_method(self, method): + self.init_async_milvus_client() + loop = asyncio.get_event_loop() + loop.run_until_complete(self.async_milvus_client_wrap.close()) + super().teardown_method(method) + + """ + ****************************************************************** + # The following are valid base cases + ****************************************************************** + """ + + @pytest.mark.tags(CaseLabel.L0) + async def test_async_milvus_client_create_drop_partition_default(self): + """ + target: test create and drop partition normal case + method: 1. create collection, partition 2. insert to partition 3. search and query 4. drop partition, collection + expected: run successfully + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. create collection + collection_name = cf.gen_unique_str(prefix) + await async_client.create_collection(collection_name, default_dim) + collections = self.list_collections(client)[0] + assert collection_name in collections + self.describe_collection(client, collection_name, + check_task=CheckTasks.check_describe_collection_property, + check_items={"collection_name": collection_name, + "dim": default_dim, + "consistency_level": 0}) + # 2. create partition + partition_name = cf.gen_unique_str(partition_prefix) + await async_client.create_partition(collection_name, partition_name) + partitions = self.list_partitions(client, collection_name)[0] + assert partition_name in partitions + # 3. insert + rng = np.random.default_rng(seed=19530) + rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)] + self.insert(client, collection_name, rows, partition_name=partition_name) + tasks = [] + # 4. search + vectors_to_search = rng.random((1, default_dim)) + search_task = async_client.search(collection_name, vectors_to_search, + partition_names=[partition_name], + check_task=CheckTasks.check_search_results, + check_items={"enable_milvus_client_api": True, + "nq": len(vectors_to_search), + "limit": default_limit}) + tasks.append(search_task) + # 5. query + query_task = async_client.query(collection_name, filter=default_search_exp, + partition_names=[partition_name], + check_task=CheckTasks.check_query_results, + check_items={"exp_res": rows, + "with_vec": True, + "primary_field": default_primary_key_field_name}) + tasks.append(query_task) + res = await asyncio.gather(*tasks) + + # 6. drop action + if self.has_partition(client, collection_name, partition_name)[0]: + await async_client.release_partitions(collection_name, partition_name) + await async_client.drop_partition(collection_name, partition_name) + partitions = self.list_partitions(client, collection_name)[0] + assert partition_name not in partitions + await async_client.drop_collection(collection_name) + + + @pytest.mark.tags(CaseLabel.L0) + async def test_async_milvus_client_load_release_partitions(self): + """ + target: test load and release partitions normal case + method: 1. create collection, two partitions + 2. insert different data to two partitions + 3. search and query + 4. release partitions, search and query + 5. load partitions, search and query + 4. drop partition, collection + expected: run successfully + """ + client = self._client() + self.init_async_milvus_client() + async_client = self.async_milvus_client_wrap + + # 1. create collection + collection_name = cf.gen_unique_str(prefix) + await async_client.create_collection(collection_name, default_dim) + collections = self.list_collections(client)[0] + assert collection_name in collections + self.describe_collection(client, collection_name, + check_task=CheckTasks.check_describe_collection_property, + check_items={"collection_name": collection_name, + "dim": default_dim, + "consistency_level": 0}) + # 2. create partition + partition_name_1 = cf.gen_unique_str(partition_prefix) + await async_client.create_partition(collection_name, partition_name_1) + partition_name_2 = cf.gen_unique_str(partition_prefix) + await async_client.create_partition(collection_name, partition_name_2) + partitions = self.list_partitions(client, collection_name)[0] + assert partition_name_1 in partitions + assert partition_name_2 in partitions + # 3. insert + rng = np.random.default_rng(seed=19530) + rows_default = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)] + self.insert(client, collection_name, rows_default) + rows_1 = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb, 2 * default_nb)] + self.insert(client, collection_name, rows_1, partition_name=partition_name_1) + rows_2 = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]), + default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(2 * default_nb, 3 * default_nb)] + self.insert(client, collection_name, rows_2, partition_name=partition_name_2) + tasks = [] + # 4. search and query + vectors_to_search = rng.random((1, default_dim)) + # search single partition + search_task = async_client.search(collection_name, vectors_to_search, + partition_names=[partition_name_1], + check_task=CheckTasks.check_search_results, + check_items={"enable_milvus_client_api": True, + "nq": len(vectors_to_search), + "limit": default_limit}) + tasks.append(search_task) + # search multi partition + search_task_multi = async_client.search(collection_name, vectors_to_search, + partition_names=[partition_name_1, partition_name_2], + check_task=CheckTasks.check_search_results, + check_items={"enable_milvus_client_api": True, + "nq": len(vectors_to_search), + "limit": default_limit}) + tasks.append(search_task_multi) + # query single partition + query_task = async_client.query(collection_name, filter=default_search_exp, + partition_names=[partition_name_1], + check_task=CheckTasks.check_query_results, + check_items={"exp_res": rows_1, + "with_vec": True, + "primary_field": default_primary_key_field_name}) + tasks.append(query_task) + # query multi partition + query_task_multi = async_client.query(collection_name, filter=default_search_exp, + partition_names=[partition_name_1, partition_name_2], + check_task=CheckTasks.check_query_results, + check_items={"exp_res": rows_1 + rows_2, + "with_vec": True, + "primary_field": default_primary_key_field_name}) + tasks.append(query_task_multi) + res = await asyncio.gather(*tasks) + # 5. release partitions, search and query + await async_client.release_partitions(collection_name, partition_name_1) + error = {ct.err_code: 201, ct.err_msg: "partition not loaded"} + await async_client.search(collection_name, vectors_to_search, + partition_names=[partition_name_1], + check_task=CheckTasks.err_res, + check_items=error) + + await async_client.query(collection_name, filter=default_search_exp, + partition_names=[partition_name_1], + check_task=CheckTasks.err_res, + check_items=error) + + await async_client.search(collection_name, vectors_to_search, + partition_names=[partition_name_2], + check_task=CheckTasks.check_search_results, + check_items={"enable_milvus_client_api": True, + "nq": len(vectors_to_search), + "limit": default_limit}) + await async_client.query(collection_name, filter=default_search_exp, + partition_names=[partition_name_2], + check_task=CheckTasks.check_query_results, + check_items={"exp_res": rows_2, + "with_vec": True, + "primary_field": default_primary_key_field_name}) + + # 6. load partitions, search and query + tasks_after_load = [] + await async_client.load_partitions(collection_name, [partition_name_1, partition_name_2]) + search_task = async_client.search(collection_name, vectors_to_search, + check_task=CheckTasks.check_search_results, + check_items={"enable_milvus_client_api": True, + "nq": len(vectors_to_search), + "limit": default_limit}) + tasks_after_load.append(search_task) + query_task = async_client.query(collection_name, filter=default_search_exp, + check_task=CheckTasks.check_query_results, + check_items={"exp_res": rows_default + rows_1 + rows_2, + "with_vec": True, + "primary_field": default_primary_key_field_name}) + tasks_after_load.append(query_task) + res = await asyncio.gather(*tasks_after_load) + + # 7. drop action + await async_client.drop_collection(collection_name) \ No newline at end of file