test: add create collection V2 cases for milvus client (#43684)

issue: #43590
Migrate collection test cases from TestcaseBase to
TestMilvusClientV2Base
+25 cases in test_milvus_client_collection.py
-27 cases in test_collection.py
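As a quick orientation, the sketch below condenses the migration pattern using one case quoted from each of the two changed files; it is illustrative only and simply mirrors code that appears in the diff that follows.

Old style, removed from test_collection.py (TestcaseBase + ORM wrappers):

    class TestCollectionParams(TestcaseBase):
        @pytest.mark.tags(CaseLabel.L0)
        def test_collection_without_vectors(self):
            self._connect()
            c_name = cf.gen_unique_str(prefix)
            schema = cf.gen_collection_schema([cf.gen_int64_field(is_primary=True)])
            error = {ct.err_code: 999, ct.err_msg: "No vector field is found."}
            self.collection_wrap.init_collection(c_name, schema=schema,
                                                 check_task=CheckTasks.err_res, check_items=error)

New style, added to test_milvus_client_collection.py (TestMilvusClientV2Base + MilvusClient V2 helpers):

    class TestMilvusClientCollectionInvalid(TestMilvusClientV2Base):
        @pytest.mark.tags(CaseLabel.L0)
        def test_milvus_client_collection_without_vectors(self):
            client = self._client()
            collection_name = cf.gen_collection_name_by_testcase_name()
            schema = self.create_schema(client, enable_dynamic_field=False)[0]
            schema.add_field("int_field", DataType.INT64, is_primary=True, auto_id=False)
            error = {ct.err_code: 1100, ct.err_msg: "schema does not contain vector field: invalid parameter"}
            self.create_collection(client, collection_name, schema=schema,
                                   check_task=CheckTasks.err_res, check_items=error)

As the new cases show, the V2 base-class helpers return a tuple whose first element is the API result, hence the [0] indexing used throughout the migrated tests.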

@yanliang567

---------

Signed-off-by: Orpheus Wang <orpheus.wang@zilliz.com>
9Eurydice9 2025-08-06 16:17:40 +08:00 committed by GitHub
parent 8ff118a9ff
commit 8578f8e13e
2 changed files with 788 additions and 745 deletions

tests/python_client/testcases/test_milvus_client_collection.py

@@ -6,6 +6,7 @@ from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from utils.util_pymilvus import *
from pymilvus.client.types import LoadState
prefix = "client_collection"
epsilon = ct.epsilon
@@ -229,6 +230,48 @@ class TestMilvusClientCollectionInvalid(TestMilvusClientV2Base):
self.create_collection(client, collection_name, schema=schema,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_collection_without_vectors(self):
"""
target: test create collection without vectors
method: create collection only with int field
expected: raise exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Create schema with only non-vector fields
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field("int_field", DataType.INT64, is_primary=True, auto_id=False)
error = {ct.err_code: 1100, ct.err_msg: "schema does not contain vector field: invalid parameter"}
self.create_collection(client, collection_name, schema=schema,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("vector_type", [DataType.FLOAT_VECTOR, DataType.INT8_VECTOR, DataType.BINARY_VECTOR])
def test_milvus_client_collection_without_primary_field(self, vector_type):
"""
target: test create collection without primary field
method: no primary field specified in collection schema and fields
expected: raise exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Create schema with fields but no primary key
schema1 = self.create_schema(client, enable_dynamic_field=False)[0]
schema1.add_field("int_field", DataType.INT64) # Not primary
schema1.add_field("vector_field", vector_type, dim=default_dim)
error = {ct.err_code: 1100, ct.err_msg: "Schema must have a primary key field"}
self.create_collection(client, collection_name, schema=schema1,
check_task=CheckTasks.err_res, check_items=error)
# Create schema with only vector field
schema2 = self.create_schema(client, enable_dynamic_field=False)[0]
schema2.add_field("vector_field", vector_type, dim=default_dim)
error = {ct.err_code: 1100, ct.err_msg: "Schema must have a primary key field"}
self.create_collection(client, collection_name, schema=schema2,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_dup_field(self):
"""
@@ -307,6 +350,24 @@ class TestMilvusClientCollectionInvalid(TestMilvusClientV2Base):
self.create_collection(client, collection_name, schema=schema,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("shards_num,error_type", [(ct.max_shards_num + 1, "range"), (257, "range"), (1.0, "type"), ("2", "type")])
def test_milvus_client_collection_invalid_shards(self, shards_num, error_type):
"""
target: test collection with invalid shards_num values
method: create collection with shards_num that are out of valid range or wrong type
expected: raise exception with appropriate error message
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
if error_type == "range":
error = {ct.err_code: 1, ct.err_msg: f"maximum shards's number should be limited to {ct.max_shards_num}"}
else: # error_type == "type"
error = {ct.err_code: 999, ct.err_msg: "invalid num_shards type"}
# Try to create collection with invalid shards_num (should fail)
self.create_collection(client, collection_name, default_dim, shards_num=shards_num,
check_task=CheckTasks.err_res, check_items=error)
class TestMilvusClientCollectionValid(TestMilvusClientV2Base):
""" Test case of create collection interface """
@@ -920,6 +981,68 @@ class TestMilvusClientCollectionValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_collection_binary(self):
"""
target: test collection with binary-vec
method: create collection with binary vector field
expected: collection created successfully with binary vector field
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Create schema with binary vector field
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field(ct.default_int64_field_name, DataType.INT64, is_primary=True, auto_id=False)
schema.add_field(ct.default_binary_vec_field_name, DataType.BINARY_VECTOR, dim=default_dim)
self.create_collection(client, collection_name, schema=schema)
collections = self.list_collections(client)[0]
assert collection_name in collections
collection_info = self.describe_collection(client, collection_name)[0]
field_names = [field["name"] for field in collection_info["fields"]]
assert ct.default_int64_field_name in field_names
assert ct.default_binary_vec_field_name in field_names
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_multi_create_drop(self):
"""
target: test cycle creation and deletion of multiple collections
method: in a loop, collections are created and deleted sequentially
expected: no exception, each collection is created and dropped successfully
"""
client = self._client()
c_num = 20
for i in range(c_num):
collection_name = cf.gen_collection_name_by_testcase_name() + f"_{i}"
self.create_collection(client, collection_name, default_dim)
collections = self.list_collections(client)[0]
assert collection_name in collections
# Drop the collection
self.drop_collection(client, collection_name)
collections_after_drop = self.list_collections(client)[0]
assert collection_name not in collections_after_drop
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_collection_after_drop(self):
"""
target: test create collection after create and drop
method: 1. create a collection 2. drop the collection 3. re-create with same name
expected: no exception, collection can be recreated with the same name after dropping
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
self.create_collection(client, collection_name, default_dim)
self.drop_collection(client, collection_name)
assert not self.has_collection(client, collection_name)[0]
self.create_collection(client, collection_name, default_dim)
assert self.has_collection(client, collection_name)[0]
self.drop_collection(client, collection_name)
class TestMilvusClientDropCollectionInvalid(TestMilvusClientV2Base):
""" Test case of drop collection interface """
@@ -1044,9 +1167,9 @@ class TestMilvusClientReleaseCollectionValid(TestMilvusClientV2Base):
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_release_unloaded_collection(self):
"""
target: test fast create collection normal case
method: create collection
expected: create collection with default schema, index, and load successfully
target: test releasing a collection that has not been loaded
method: create a collection and call release_collection multiple times without loading
expected: no errors raised, and the collection can still be dropped
"""
client = self._client()
collection_name = cf.gen_unique_str(prefix)
@@ -1058,16 +1181,18 @@ class TestMilvusClientReleaseCollectionValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_partially_loaded_collection(self):
def test_milvus_client_release_partition_after_load_collection(self):
"""
target: test fast create collection normal case
method: create collection
expected: create collection with default schema, index, and load successfully
target: test releasing specific partitions after loading entire collection
method: 1. create collection and partition
2. load entire collection
3. attempt to release specific partition while collection is loaded
expected: partition release operations work correctly with loaded collection
"""
client = self._client()
collection_name = cf.gen_unique_str(prefix)
partition_name = cf.gen_unique_str("partition")
# 1. create collection
# 1. create collection and partition
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name)
self.release_partitions(client, collection_name, ["_default", partition_name])
@@ -1116,6 +1241,50 @@ class TestMilvusClientLoadCollectionInvalid(TestMilvusClientV2Base):
self.load_collection(client, collection_name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_after_drop(self):
"""
target: test load collection after it has been dropped
method: 1. create collection
2. drop the collection
3. try to load the dropped collection
expected: raise exception indicating collection not found
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
self.create_collection(client, collection_name, default_dim)
self.drop_collection(client, collection_name)
error = {ct.err_code: 999, ct.err_msg: "collection not found"}
self.load_collection(client, collection_name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_release_collection(self):
"""
target: test load and release a collection after it has been dropped
method: 1. load, release and drop collection
2. load and release dropped collection
expected: raise exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
self.release_collection(client, collection_name)
self.drop_index(client, collection_name, "vector")
# Prepare and create index
index_params = self.prepare_index_params(client)[0]
index_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
self.create_index(client, collection_name, index_params)
# Load, release and drop collection
self.load_collection(client, collection_name)
self.release_collection(client, collection_name)
self.drop_collection(client, collection_name)
# Try to load and release dropped collection - should raise exception
error = {ct.err_code: 100, ct.err_msg: "collection not found"}
self.load_collection(client, collection_name, check_task=CheckTasks.err_res, check_items=error)
self.release_collection(client, collection_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_over_max_length(self):
"""
@@ -1134,9 +1303,9 @@ class TestMilvusClientLoadCollectionInvalid(TestMilvusClientV2Base):
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_load_collection_without_index(self):
"""
target: test fast create collection normal case
method: create collection
expected: create collection with default schema, index, and load successfully
target: test loading a collection without an index
method: create a collection, drop its index, then attempt to load the collection
expected: loading should fail with an 'index not found' error
"""
client = self._client()
collection_name = cf.gen_unique_str(prefix)
@@ -1150,6 +1319,190 @@ class TestMilvusClientLoadCollectionInvalid(TestMilvusClientV2Base):
if self.has_collection(client, collection_name)[0]:
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("partition_names", [[], None])
def test_milvus_client_load_partition_names_empty(self, partition_names):
"""
target: test load partitions with empty partition names list
method: 1. create collection and partition
2. insert data into both default partition and custom partition
3. create index
4. attempt to load with empty partition_names list
expected: should raise exception indicating no partition specified
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name = cf.gen_unique_str("partition")
# 1. Create collection and partition
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name)
self.release_collection(client, collection_name)
self.drop_index(client, collection_name, "vector")
# 2. Insert data into both partitions
rng = np.random.default_rng(seed=19530)
half = default_nb // 2
# Insert into default partition
data_default = [{
default_primary_key_field_name: i,
default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0
} for i in range(half)]
self.insert(client, collection_name, data_default, partition_name="_default")
# Insert into custom partition
data_partition = [{
default_primary_key_field_name: i + half,
default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: (i + half) * 1.0
} for i in range(half)]
self.insert(client, collection_name, data_partition, partition_name=partition_name)
# 3. Create index
self.flush(client, collection_name)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
self.create_index(client, collection_name, index_params)
# 4. Attempt to load with empty partition_names list
error = {ct.err_code: 0, ct.err_msg: "due to no partition specified"}
self.load_partitions(client, collection_name, partition_names=partition_names,
check_task=CheckTasks.err_res, check_items=error)
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("invalid_num_replica", [0.2, "not-int"])
def test_milvus_client_load_replica_non_number(self, invalid_num_replica):
"""
target: test load collection with non-number replicas
method: load with non-number replicas
expected: raise exceptions
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# 1. Create collection and insert data
self.create_collection(client, collection_name, default_dim)
self.release_collection(client, collection_name)
self.drop_index(client, collection_name, "vector")
# 2. Insert data
rng = np.random.default_rng(seed=19530)
rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)]
self.insert(client, collection_name, rows)
# Verify entity count
self.flush(client, collection_name)
stats = self.get_collection_stats(client, collection_name)[0]
assert stats['row_count'] == default_nb
# 3. Create index
index_params = self.prepare_index_params(client)[0]
index_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
self.create_index(client, collection_name, index_params)
# 4. Attempt to load with invalid replica_number
error = {ct.err_code: 999, ct.err_msg: f"`replica_number` value {invalid_num_replica} is illegal"}
self.load_collection(client, collection_name, replica_number=invalid_num_replica,
check_task=CheckTasks.err_res, check_items=error)
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("replicas", [None, -1, 0])
def test_milvus_client_load_replica_invalid_input(self, replicas):
"""
target: test load partition with invalid replica number or None
method: load with invalid replica number or None
expected: load successfully as replica = 1
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# 1. Create collection and prepare
self.create_collection(client, collection_name, default_dim)
self.release_collection(client, collection_name)
self.drop_index(client, collection_name, "vector")
# 2. Insert data
rng = np.random.default_rng(seed=19530)
rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)]
self.insert(client, collection_name, rows)
# Verify entity count
self.flush(client, collection_name)
stats = self.get_collection_stats(client, collection_name)[0]
assert stats['row_count'] == default_nb
# 3. Create index
index_params = self.prepare_index_params(client)[0]
index_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
self.create_index(client, collection_name, index_params)
# 4. Load with invalid replica_number (should succeed as replica=1)
self.load_collection(client, collection_name, replica_number=replicas)
# 5. Verify replicas
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after loading collection, but got {load_state['state']}"
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_replica_greater_than_querynodes(self):
"""
target: test load with more replicas than available querynodes
method: load with 3 replicas while only 2 querynodes are available
expected: raise exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# 1. Create collection
self.create_collection(client, collection_name, default_dim)
self.release_collection(client, collection_name)
self.drop_index(client, collection_name, "vector")
# 2. Insert data
rng = np.random.default_rng(seed=19530)
rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)]
self.insert(client, collection_name, rows)
# 3. Verify entity count
self.flush(client, collection_name)
stats = self.get_collection_stats(client, collection_name)[0]
assert stats['row_count'] == default_nb
# 4. Create index
index_params = self.prepare_index_params(client)[0]
index_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
self.create_index(client, collection_name, index_params)
# 5. Load with replica_number=3 (should fail if only 2 querynodes available)
error = {ct.err_code: 999,
ct.err_msg: "call query coordinator LoadCollection: when load 3 replica count: "
"service resource insufficient[currentStreamingNode=1][expectedStreamingNode=3]"}
self.load_collection(client, collection_name, replica_number=3,
check_task=CheckTasks.err_res, check_items=error)
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_after_disconnect(self):
"""
target: test load/release collection operations after connection is closed
method: 1. create collection with client
2. close the client connection
3. try to load collection with disconnected client
expected: operations should raise appropriate connection errors
"""
client_temp = self._client(alias="client_temp")
collection_name = cf.gen_collection_name_by_testcase_name()
self.create_collection(client_temp, collection_name, default_dim)
self.close(client_temp)
error = {ct.err_code: 1, ct.err_msg: 'should create connection first'}
self.load_collection(client_temp, collection_name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_release_collection_after_disconnect(self):
"""
target: test load/release collection operations after connection is closed
method: 1. create collection with client
2. close the client connection
3. try to release collection with disconnected client
expected: operations should raise appropriate connection errors
"""
client_temp = self._client(alias="client_temp2")
collection_name = cf.gen_collection_name_by_testcase_name()
self.create_collection(client_temp, collection_name, default_dim)
self.close(client_temp)
error = {ct.err_code: 999, ct.err_msg: 'should create connection first'}
self.release_collection(client_temp, collection_name,
check_task=CheckTasks.err_res, check_items=error)
class TestMilvusClientLoadCollectionValid(TestMilvusClientV2Base):
""" Test case of search interface """
@@ -1188,27 +1541,443 @@ class TestMilvusClientLoadCollectionValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_partially_loaded_collection(self):
def test_milvus_client_load_partition_after_release_collection(self):
"""
target: test fast create collection normal case
method: create collection
expected: create collection with default schema, index, and load successfully
target: test mixed loading scenarios with partial partitions and full collection
method: 1. create collection and partition
2. load specific partition first
3. then load entire collection
4. release and load again
expected: all loading operations work correctly without conflicts
"""
client = self._client()
collection_name = cf.gen_unique_str(prefix)
partition_name = cf.gen_unique_str("partition")
# 1. create collection
# Step 1: Create collection and partition
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name)
# Step 2: Release collection and verify state NotLoad
self.release_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.NotLoad, f"Expected NotLoad after release, but got {load_state['state']}"
# Step 3: Load specific partition and verify state changes to Loaded
self.load_partitions(client, collection_name, [partition_name])
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after loading partition, but got {load_state['state']}"
# Step 4: Load entire collection and verify state remains Loaded
self.load_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after loading collection, but got {load_state['state']}"
# Step 5: Release collection and verify state changes to NotLoad
self.release_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.NotLoad, f"Expected NotLoad after release, but got {load_state['state']}"
# Step 6: Load multiple partitions and verify state changes to Loaded
self.load_partitions(client, collection_name, ["_default", partition_name])
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after loading partitions, but got {load_state['state']}"
# Step 7: Load collection again and verify state remains Loaded
self.load_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after final load collection, but got {load_state['state']}"
# Step 8: Cleanup - drop collection if it exists
if self.has_collection(client, collection_name)[0]:
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_load_partitions_after_load_collection(self):
"""
target: test load partitions after load collection
method: 1. load collection
2. load partitions
3. search on one partition
expected: No exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name_1 = cf.gen_unique_str("partition1")
partition_name_2 = cf.gen_unique_str("partition2")
# Create collection and partitions
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name_1)
self.create_partition(client, collection_name, partition_name_2)
# Verify initial state is Loaded
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after loading collection, but got {load_state['state']}"
# Load collection and verify state
self.load_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after loading collection, but got {load_state['state']}"
# Load partitions and verify state (should remain Loaded)
self.load_partitions(client, collection_name, [partition_name_1, partition_name_2])
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after loading partitions, but got {load_state['state']}"
# Search on one partition
vectors_to_search = np.random.default_rng(seed=19530).random((1, default_dim))
self.search(client, collection_name, vectors_to_search,
limit=default_limit, partition_names=[partition_name_1])
# Verify state remains Loaded after search
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after search, but got {load_state['state']}"
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_collection_load_release_comprehensive(self):
"""
target: comprehensive test for collection load/release operations with search/query validation
method: 1. test collection load -> search/query (should work)
2. test collection release -> search/query (should fail)
3. test repeated load/release operations
4. test load after release
expected: proper search/query behavior based on collection load/release state
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Step 1: Create collection with data for testing
self.create_collection(client, collection_name, default_dim)
# Step 2: Test point 1 - loaded collection can be searched/queried
self.load_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded, but got {load_state['state']}"
vectors_to_search = np.random.default_rng(seed=19530).random((1, default_dim))
self.search(client, collection_name, vectors_to_search, limit=default_limit)
self.query(client, collection_name, filter=default_search_exp)
# Step 3: Test point 2 - loaded collection can be loaded again
self.load_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after repeated load, but got {load_state['state']}"
# Step 4: Test point 3 - released collection cannot be searched/queried
self.release_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.NotLoad, f"Expected NotLoad, but got {load_state['state']}"
error_search = {ct.err_code: 101, ct.err_msg: "collection not loaded"}
self.search(client, collection_name, vectors_to_search, limit=default_limit,
check_task=CheckTasks.err_res, check_items=error_search)
error_query = {ct.err_code: 101, ct.err_msg: "collection not loaded"}
self.query(client, collection_name, filter=default_search_exp,
check_task=CheckTasks.err_res, check_items=error_query)
# Step 5: Test point 4 - released collection can be released again
self.release_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.NotLoad, f"Expected NotLoad after repeated release, but got {load_state['state']}"
# Step 6: Test point 5 - released collection can be loaded again
self.load_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after reload, but got {load_state['state']}"
self.search(client, collection_name, vectors_to_search, limit=default_limit)
# Step 7: Cleanup
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_partition_load_release_comprehensive(self):
"""
target: comprehensive test for partition load/release operations with search/query validation
method: 1. test partition load -> search/query
2. test partition release -> search/query (should fail)
3. test repeated load/release operations
4. test load after release
expected: proper search/query behavior based on partition load/release state
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name_1 = cf.gen_unique_str("partition1")
partition_name_2 = cf.gen_unique_str("partition2")
# Step 1: Create collection with partitions
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name_1)
self.create_partition(client, collection_name, partition_name_2)
# Step 2: Test point 1 - loaded partitions can be searched/queried
self.load_partitions(client, collection_name, [partition_name_1, partition_name_2])
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded, but got {load_state['state']}"
vectors_to_search = np.random.default_rng(seed=19530).random((1, default_dim))
self.search(client, collection_name, vectors_to_search, limit=default_limit, partition_names=[partition_name_1, partition_name_2])
self.query(client, collection_name, filter=default_search_exp, partition_names=[partition_name_1, partition_name_2])
# Step 3: Test point 2 - loaded partitions can be loaded again
self.load_partitions(client, collection_name, [partition_name_1, partition_name_2])
self.search(client, collection_name, vectors_to_search, limit=default_limit, partition_names=[partition_name_1, partition_name_2])
self.query(client, collection_name, filter=default_search_exp, partition_names=[partition_name_1, partition_name_2])
# Step 4: Test point 3 - released partitions cannot be searched/queried
self.release_partitions(client, collection_name, [partition_name_1])
error_search = {ct.err_code: 201, ct.err_msg: "partition not loaded"}
self.search(client, collection_name, vectors_to_search, limit=default_limit, partition_names=[partition_name_1],
check_task=CheckTasks.err_res, check_items=error_search)
error_query = {ct.err_code: 201, ct.err_msg: "partition not loaded"}
self.query(client, collection_name, filter=default_search_exp, partition_names=[partition_name_1],
check_task=CheckTasks.err_res, check_items=error_query)
# Non-released partition should still work
self.search(client, collection_name, vectors_to_search, limit=default_limit, partition_names=[partition_name_2])
# Step 5: Test point 4 - released partitions can be released again
self.release_partitions(client, collection_name, [partition_name_1]) # Release again
error_search = {ct.err_code: 201, ct.err_msg: "partition not loaded"}
self.search(client, collection_name, vectors_to_search, limit=default_limit, partition_names=[partition_name_1],
check_task=CheckTasks.err_res, check_items=error_search)
# Step 6: Test point 5 - released partitions can be loaded again
self.load_partitions(client, collection_name, [partition_name_1])
self.search(client, collection_name, vectors_to_search, limit=default_limit, partition_names=[partition_name_1])
# Step 7: Cleanup
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_mixed_collection_partition_operations_comprehensive(self):
"""
target: comprehensive test for mixed collection/partition load/release operations
method: 1. test collection load -> partition release -> mixed behavior
2. test partition load -> collection load -> behavior
3. test collection release -> partition load -> behavior
expected: consistent behavior across mixed operations
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name_1 = cf.gen_unique_str("partition1")
partition_name_2 = cf.gen_unique_str("partition2")
# Step 1: Setup collection with partitions
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name_1)
self.create_partition(client, collection_name, partition_name_2)
vectors_to_search = np.random.default_rng(seed=19530).random((1, default_dim))
# Step 2: Test Release partition after collection release
self.release_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.NotLoad, f"Expected NotLoad after collection release, but got {load_state['state']}"
self.release_partitions(client, collection_name, ["_default"])
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.NotLoad, f"Expected NotLoad after default partition release, but got {load_state['state']}"
# Step 3: Load specific partitions
self.load_partitions(client, collection_name, [partition_name_1])
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.Loaded, f"Expected Loaded after partition load, but got {load_state['state']}"
# Search should work on loaded partitions
self.search(client, collection_name, vectors_to_search, limit=default_limit, partition_names=[partition_name_1])
self.query(client, collection_name, filter=default_search_exp, partition_names=[partition_name_1])
# Step 4: Test load collection after partition load
self.load_collection(client, collection_name)
self.search(client, collection_name, vectors_to_search, limit=default_limit, partition_names=[partition_name_1, partition_name_2])
self.query(client, collection_name, filter=default_search_exp, partition_names=[partition_name_1, partition_name_2])
# Step 5: Test edge case - release all partitions individually
self.release_partitions(client, collection_name, ["_default", partition_name_1, partition_name_2])
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.NotLoad, f"Expected NotLoad after releasing all partitions, but got {load_state['state']}"
error_search = {ct.err_code: 101, ct.err_msg: "collection not loaded"}
self.search(client, collection_name, vectors_to_search, limit=default_limit,
check_task=CheckTasks.err_res, check_items=error_search)
# Step 6: Test release collection after partition release
self.release_collection(client, collection_name)
load_state = self.get_load_state(client, collection_name)[0]
assert load_state["state"] == LoadState.NotLoad, f"Expected NotLoad after releasing collection, but got {load_state['state']}"
error = {ct.err_code: 101, ct.err_msg: "collection not loaded"}
self.search(client, collection_name, vectors_to_search, limit=default_limit,
check_task=CheckTasks.err_res, check_items=error)
self.query(client, collection_name, filter=default_search_exp,
check_task=CheckTasks.err_res, check_items=error)
# Step 7: Cleanup
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_after_drop_partition_and_release_another(self):
"""
target: test load collection after drop a partition and release another
method: 1. load collection
2. drop a partition
3. release left partition
4. query on the left partition
5. load collection
expected: No exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name_1 = cf.gen_unique_str("partition1")
partition_name_2 = cf.gen_unique_str("partition2")
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name_1)
self.create_partition(client, collection_name, partition_name_2)
self.load_collection(client, collection_name)
self.release_partitions(client, collection_name, [partition_name_1])
self.drop_partition(client, collection_name, partition_name_1)
self.release_partitions(client, collection_name, [partition_name_2])
error = {ct.err_code: 65538, ct.err_msg: 'partition not loaded'}
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_2],
check_task=CheckTasks.err_res, check_items=error)
self.load_collection(client, collection_name)
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_partition_after_drop_partition_and_release_another(self):
"""
target: test load partition after drop a partition and release another
method: 1. load collection
2. drop a partition
3. release left partition
4. load partition
5. query on the partition
expected: No exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name_1 = cf.gen_unique_str("partition1")
partition_name_2 = cf.gen_unique_str("partition2")
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name_1)
self.create_partition(client, collection_name, partition_name_2)
self.load_collection(client, collection_name)
self.release_partitions(client, collection_name, [partition_name_1])
self.drop_partition(client, collection_name, partition_name_1)
self.release_partitions(client, collection_name, [partition_name_2])
self.load_partitions(client, collection_name, [partition_name_2])
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_2])
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_another_partition_after_drop_one_partition(self):
"""
target: test load another partition after drop a partition
method: 1. load collection
2. drop a partition
3. load another partition
4. query on the partition
expected: No exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name_1 = cf.gen_unique_str("partition1")
partition_name_2 = cf.gen_unique_str("partition2")
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name_1)
self.create_partition(client, collection_name, partition_name_2)
self.load_collection(client, collection_name)
self.release_partitions(client, collection_name, [partition_name_1])
self.drop_partition(client, collection_name, partition_name_1)
self.load_partitions(client, collection_name, [partition_name_2])
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_2])
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_after_drop_one_partition(self):
"""
target: test load collection after drop a partition
method: 1. load collection
2. drop a partition
3. load collection
4. query on the partition
expected: No exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
partition_name_1 = cf.gen_unique_str("partition1")
partition_name_2 = cf.gen_unique_str("partition2")
self.create_collection(client, collection_name, default_dim)
self.create_partition(client, collection_name, partition_name_1)
self.create_partition(client, collection_name, partition_name_2)
self.load_collection(client, collection_name)
self.release_partitions(client, collection_name, [partition_name_1])
self.drop_partition(client, collection_name, partition_name_1)
self.load_collection(client, collection_name)
# Query on the remaining partition
self.query(client, collection_name, filter=default_search_exp,
partition_names=[partition_name_2])
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_load_collection_after_index(self):
"""
target: test load collection, after index created
method: insert and create index, load collection with correct params
expected: no error raised
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
self.release_collection(client, collection_name)
self.drop_index(client, collection_name, "vector")
# Insert data
rng = np.random.default_rng(seed=19530)
rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)]
self.insert(client, collection_name, rows)
# Prepare and create index
index_params = self.prepare_index_params(client)[0]
index_params.add_index(field_name="vector", index_type="IVF_SQ8", metric_type="L2")
self.create_index(client, collection_name, index_params)
# Load and release collection
self.load_collection(client, collection_name)
self.release_collection(client, collection_name)
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_load_collection_after_load_release(self):
"""
target: test load collection after load and release
method: 1.load and release collection after entities flushed
2.re-load collection
expected: No exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
self.release_collection(client, collection_name)
self.drop_index(client, collection_name, "vector")
# Insert data
rng = np.random.default_rng(seed=19530)
rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)]
self.insert(client, collection_name, rows)
# Verify entity count
self.flush(client, collection_name)
stats = self.get_collection_stats(client, collection_name)[0]
assert stats['row_count'] == default_nb
# Prepare and create index
index_params = self.prepare_index_params(client)[0]
index_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
self.create_index(client, collection_name, index_params)
# Load, release, and re-load collection
self.load_collection(client, collection_name)
self.release_collection(client, collection_name)
self.load_collection(client, collection_name)
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L2)
def test_milvus_client_load_collection_repeatedly(self):
"""
target: test load collection repeatedly
method: load collection twice
expected: No exception
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Create collection
self.create_collection(client, collection_name, default_dim, consistency_level="Strong")
self.release_collection(client, collection_name)
self.drop_index(client, collection_name, "vector")
# Insert data
rng = np.random.default_rng(seed=19530)
rows = [{default_primary_key_field_name: i, default_vector_field_name: list(rng.random((1, default_dim))[0]),
default_float_field_name: i * 1.0, default_string_field_name: str(i)} for i in range(default_nb)]
self.insert(client, collection_name, rows)
# Verify entity count
self.flush(client, collection_name)
stats = self.get_collection_stats(client, collection_name)[0]
assert stats['row_count'] == default_nb
# Prepare and create index
index_params = self.prepare_index_params(client)[0]
index_params.add_index(field_name="vector", index_type="HNSW", metric_type="L2")
self.create_index(client, collection_name, index_params)
# Load collection twice (test repeated loading)
self.load_collection(client, collection_name)
self.load_collection(client, collection_name)
self.drop_collection(client, collection_name)
class TestMilvusClientDescribeCollectionInvalid(TestMilvusClientV2Base):
""" Test case of search interface """
@@ -1613,4 +2382,5 @@ class TestMilvusClientCollectionPropertiesValid(TestMilvusClientV2Base):
describe = self.describe_collection(client, collection_name)[0].get("properties")
assert "mmap.enabled" not in describe
#TODO add case that confirm the parameter is actually invalid
self.drop_collection(client, collection_name)
self.drop_collection(client, collection_name)

tests/python_client/testcases/test_collection.py

@@ -106,18 +106,6 @@ class TestCollectionParams(TestcaseBase):
self.field_schema_wrap.init_field_schema(name="test", dtype=dtype, is_primary=True,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.parametrize("field", [cf.gen_float_vec_field(), cf.gen_binary_vec_field()])
def test_collection_only_vector_field(self, field):
"""
target: test collection just with vec field
method: create with float-vec fields
expected: raise exception
"""
self._connect()
error = {ct.err_code: 1, ct.err_msg: "Schema must have a primary key field."}
self.collection_schema_wrap.init_collection_schema([field], check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_collection_multi_float_vectors(self):
"""
@@ -151,34 +139,6 @@ class TestCollectionParams(TestcaseBase):
check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_name, exp_schema: schema})
@pytest.mark.tags(CaseLabel.L0)
def test_collection_without_vectors(self):
"""
target: test collection without vectors
method: create collection only with int field
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
schema = cf.gen_collection_schema([cf.gen_int64_field(is_primary=True)])
error = {ct.err_code: 999, ct.err_msg: "No vector field is found."}
self.collection_wrap.init_collection(c_name, schema=schema, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L1)
def test_collection_without_primary_field(self):
"""
target: test collection without primary field
method: no primary field specified in collection schema and fields
expected: raise exception
"""
self._connect()
int_fields, _ = self.field_schema_wrap.init_field_schema(name=ct.default_int64_field_name, dtype=DataType.INT64)
vec_fields, _ = self.field_schema_wrap.init_field_schema(name=ct.default_float_vec_field_name,
dtype=DataType.FLOAT_VECTOR, dim=ct.default_dim)
error = {ct.err_code: 1, ct.err_msg: "Schema must have a primary key field."}
self.collection_schema_wrap.init_collection_schema([int_fields, vec_fields],
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_collection_is_primary_false(self):
"""
@@ -528,78 +488,6 @@ class TestCollectionParams(TestcaseBase):
self.collection_wrap.init_collection(c_name, schema=schema, check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_name, exp_schema: schema})
@pytest.mark.tags(CaseLabel.L0)
def test_collection_binary(self):
"""
target: test collection with binary-vec
method: create collection with binary field
expected: assert binary field
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
self.collection_wrap.init_collection(c_name, schema=default_binary_schema,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_name, exp_schema: default_binary_schema})
assert c_name in self.utility_wrap.list_collections()[0]
@pytest.mark.tags(CaseLabel.L0)
def test_collection_shards_num_with_default_value(self):
"""
target:test collection with shards_num
method:create collection with shards_num
expected: no exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
self.collection_wrap.init_collection(c_name, schema=default_schema, shards_num=default_shards_num,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_name, exp_shards_num: default_shards_num})
assert c_name in self.utility_wrap.list_collections()[0]
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("shards_num", [-256, 0, ct.max_shards_num // 2, ct.max_shards_num])
def test_collection_shards_num_with_not_default_value(self, shards_num):
"""
target:test collection with shards_num
method:create collection with not default shards_num
expected: no exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
self.collection_wrap.init_collection(c_name, schema=default_schema, shards_num=shards_num,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_name, exp_shards_num: shards_num})
assert c_name in self.utility_wrap.list_collections()[0]
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("shards_num", [ct.max_shards_num + 1, 257])
def test_collection_shards_num_invalid(self, shards_num):
"""
target:test collection with invalid shards_num
method:create collection with shards_num out of [1, 16]
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
error = {ct.err_code: 1, ct.err_msg: f"maximum shards's number should be limited to {ct.max_shards_num}"}
self.collection_wrap.init_collection(c_name, schema=default_schema, shards_num=shards_num,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("error_type_shards_num", [1.0, "2"])
def test_collection_shards_num_with_error_type(self, error_type_shards_num):
"""
target:test collection with error type shards_num
method:create collection with error type shards_num
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
error = {ct.err_code: 999, ct.err_msg: f"invalid num_shards type"}
self.collection_wrap.init_collection(c_name, schema=default_schema, shards_num=error_type_shards_num,
check_task=CheckTasks.err_res,
check_items=error)
@pytest.mark.tags(CaseLabel.L1)
def test_create_collection_maximum_fields(self):
"""
@@ -839,39 +727,6 @@ class TestCollectionOperation(TestcaseBase):
check_task=CheckTasks.err_res, check_items=error)
assert self.collection_wrap.collection is None
@pytest.mark.tags(CaseLabel.L1)
def test_collection_multi_create_drop(self):
"""
target: test cycle creation and deletion of multiple collections
method: in a loop, collections are created and deleted sequentially
expected: no exception
"""
self._connect()
c_num = 20
for _ in range(c_num):
c_name = cf.gen_unique_str(prefix)
self.collection_wrap.init_collection(c_name, schema=default_schema,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_name, exp_schema: default_schema})
self.collection_wrap.drop()
assert c_name not in self.utility_wrap.list_collections()[0]
@pytest.mark.tags(CaseLabel.L1)
def test_collection_after_drop(self):
"""
target: test create collection after create and drop
method: 1. create a 2. drop a 3, re-create a
expected: no exception
"""
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name, check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_name, exp_schema: default_schema})
collection_w.drop()
assert not self.utility_wrap.has_collection(collection_w.name)[0]
self.init_collection_wrap(name=c_name, check_task=CheckTasks.check_collection_property,
check_items={exp_name: c_name, exp_schema: default_schema})
assert self.utility_wrap.has_collection(c_name)[0]
@pytest.mark.tags(CaseLabel.L1)
def test_collection_all_datatype_fields(self):
"""
@@ -900,7 +755,7 @@ class TestCollectionOperation(TestcaseBase):
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_load_partition(self):
"""
target: test release the partition after load collection
target: test load the partition after load collection
method: load collection and load the partition
expected: raise exception
"""
@@ -1906,22 +1761,6 @@ class TestLoadCollection(TestcaseBase):
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L0)
def test_load_collection_after_index(self):
"""
target: test load collection, after index created
method: insert and create index, load collection with correct params
expected: no error raised
"""
self._connect()
collection_w = self.init_collection_wrap()
data = cf.gen_default_list_data()
collection_w.insert(data)
collection_w.create_index(ct.default_float_vec_field_name, default_index_params,
index_name=ct.default_index_name)
collection_w.load()
collection_w.release()
@pytest.mark.tags(CaseLabel.L1)
def test_load_collection_after_index_binary(self):
"""
@@ -1938,561 +1777,6 @@ class TestLoadCollection(TestcaseBase):
collection_w.load()
collection_w.release()
@pytest.mark.tags(CaseLabel.L2)
def test_load_empty_collection(self):
"""
target: test load an empty collection with no data inserted
method: no entities in collection, load and release the collection
expected: load and release successfully
"""
self._connect()
collection_w = self.init_collection_wrap()
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
collection_w.release()
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_dis_connect(self):
"""
target: test load collection, without connection
method: load collection with correct params, with a disconnected instance
expected: load raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
collection_wr = self.init_collection_wrap(c_name)
self.connection_wrap.remove_connection(ct.default_alias)
res_list, _ = self.connection_wrap.list_connections()
assert ct.default_alias not in res_list
error = {ct.err_code: 1, ct.err_msg: 'should create connection first'}
collection_wr.load(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_release_collection_dis_connect(self):
"""
target: test release collection, without connection
method: release collection with correct params, with a disconnected instance
expected: release raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
collection_wr = self.init_collection_wrap(c_name)
self.connection_wrap.remove_connection(ct.default_alias)
res_list, _ = self.connection_wrap.list_connections()
assert ct.default_alias not in res_list
error = {ct.err_code: 999, ct.err_msg: 'should create connection first'}
collection_wr.release(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_not_existed(self):
"""
target: test load invalid collection
method: load not existed collection
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str()
collection_wr = self.init_collection_wrap(name=c_name)
collection_wr.drop()
error = {ct.err_code: 999,
ct.err_msg: "collection not found"}
collection_wr.load(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_release_collection_not_existed(self):
"""
target: test release a not existed collection
method: release with a not existed collection name
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str()
collection_wr = self.init_collection_wrap(name=c_name)
collection_wr.drop()
error = {ct.err_code: 999,
ct.err_msg: "collection not found"}
collection_wr.release(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_release_collection_not_load(self):
"""
target: test release collection without load
method: release collection without load
expected: release successfully
"""
self._connect()
c_name = cf.gen_unique_str()
collection_wr = self.init_collection_wrap(name=c_name)
collection_wr.release()
@pytest.mark.tags(CaseLabel.L0)
def test_load_collection_after_load_release(self):
"""
target: test load collection after load and release
method: 1.load and release collection after entities flushed
2.re-load collection
expected: No exception
"""
self._connect()
collection_w = self.init_collection_wrap()
insert_data = cf.gen_default_list_data()
collection_w.insert(data=insert_data)
assert collection_w.num_entities == ct.default_nb
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
collection_w.release()
collection_w.load()
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_repeatedly(self):
"""
target: test load collection repeatedly
method: load collection twice
expected: No exception
"""
self._connect()
collection_w = self.init_collection_wrap()
insert_data = cf.gen_default_list_data()
collection_w.insert(data=insert_data)
assert collection_w.num_entities == ct.default_nb
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
collection_w.load()
@pytest.mark.tags(CaseLabel.L1)
def test_load_partitions_after_load_collection(self):
"""
target: test load partitions after load collection
method: 1. load collection
2. load partitions
3. search on one partition
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
self.init_partition_wrap(collection_w, partition1)
self.init_partition_wrap(collection_w, partition2)
collection_w.load()
collection_w.load(partition_names=[partition1, partition2])
res = collection_w.search(vectors, default_search_field, default_search_params,
default_limit, partition_names=[partition1])
@pytest.mark.tags(CaseLabel.L2)
def test_load_partitions_after_load_release_collection(self):
"""
target: test load partitions after load release collection
method: 1. load collection
2. release collection
3. load partitions
4. search on one partition
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
self.init_partition_wrap(collection_w, partition1)
self.init_partition_wrap(collection_w, partition2)
collection_w.load()
collection_w.release()
collection_w.load(partition_names=[partition1, partition2])
collection_w.search(vectors, default_search_field, default_search_params,
default_limit, partition_names=[partition1])
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_release_collection_partition(self):
"""
target: test load collection after release collection and partition
method: 1. load collection
2. release collection
3. release one partition
4. load collection
5. search on the partition
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w = self.init_partition_wrap(collection_w, partition1)
self.init_partition_wrap(collection_w, partition2)
collection_w.load()
collection_w.release()
partition_w.release()
collection_w.load()
collection_w.search(vectors, default_search_field, default_search_params,
default_limit, partition_names=[partition1])
@pytest.mark.tags(CaseLabel.L2)
def test_load_partitions_after_release_collection_partition(self):
"""
target: test load partitions after release collection and partition
method: 1. load collection
2. release collection
3. release partition
4. search on the partition and report error
5. load partitions
6. search on the partition
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
collection_w.load()
collection_w.release()
partition_w1.release()
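# the released partition should not be searchable until it is loaded again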
collection_w.search(vectors, default_search_field, default_search_params,
default_limit, partition_names=[partition1],
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1,
ct.err_msg: "not loaded"})
partition_w1.load()
partition_w2.load()
collection_w.search(vectors, default_search_field, default_search_params,
default_limit, partition_names=[partition1])
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_release_partition(self):
"""
target: test load collection after releasing one partition
method: 1. load collection
2. release one partition
3. search on the released partition and report error
4. search on the non-released partition and raise no exception
5. load collection
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
collection_w.load()
partition_w1.release()
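# the released partition is not searchable, while the still-loaded partition is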
collection_w.search(vectors, default_search_field, default_search_params,
default_limit, partition_names=[partition1],
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1,
ct.err_msg: "not loaded"})
collection_w.search(vectors, default_search_field, default_search_params,
default_limit, partition_names=[partition2])
collection_w.load()
@pytest.mark.tags(CaseLabel.L2)
def test_load_partitions_after_release_partition(self):
"""
target: test load partitions after releasing one partition
method: 1. load collection
2. release partition
3. search on the released partition and report error
4. load partitions
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
collection_w.load()
partition_w1.release()
collection_w.search(vectors, default_search_field, default_search_params,
default_limit, partition_names=[partition1],
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1,
ct.err_msg: "not loaded"})
partition_w1.load()
partition_w2.load()
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_release_partition_collection(self):
"""
target: test load collection after release partition and collection
method: 1. load collection
2. release partition
3. query on the released partition and report error
4. release collection
5. load collection
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w = self.init_partition_wrap(collection_w, partition1)
self.init_partition_wrap(collection_w, partition2)
collection_w.load()
partition_w.release()
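# querying the released partition should fail with 'partition not loaded'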
error = {ct.err_code: 65538, ct.err_msg: 'partition not loaded'}
collection_w.query(default_term_expr, partition_names=[partition1],
check_task=CheckTasks.err_res, check_items=error)
collection_w.release()
collection_w.load()
@pytest.mark.tags(CaseLabel.L2)
def test_load_partitions_after_release_partition_collection(self):
"""
target: test load partitions after release partition and collection
method: 1. load collection
2. release partition
3. release collection
4. load one partition
5. query on the other partition and raise error
6. load the other partition
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
collection_w.load()
partition_w1.release()
collection_w.release()
partition_w1.load()
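# partition2 has not been loaded back yet, so querying it should fail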
error = {ct.err_code: 65538, ct.err_msg: 'partition not loaded'}
collection_w.query(default_term_expr, partition_names=[partition2],
check_task=CheckTasks.err_res, check_items=error)
partition_w2.load()
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_release_partitions(self):
"""
target: test load collection after release partitions
method: 1. load collection
2. release partitions
3. load collection
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
collection_w.load()
partition_w1.release()
partition_w2.release()
collection_w.load()
@pytest.mark.tags(CaseLabel.L2)
def test_load_partitions_after_release_partitions(self):
"""
target: test load partitions after release partitions
method: 1. load collection
2. release partitions
3. load partitions
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
collection_w.load()
partition_w1.release()
partition_w2.release()
partition_w1.load()
partition_w2.load()
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_drop_partition_and_release_another(self):
"""
target: test load collection after drop a partition and release another
method: 1. load collection
2. drop a partition
3. release left partition
4. query on the left partition
5. load collection
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
collection_w.load()
partition_w1.release()
partition_w1.drop()
partition_w2.release()
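# the remaining partition is released, so querying it should fail before the collection is reloaded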
error = {ct.err_code: 65538, ct.err_msg: 'partition not loaded'}
collection_w.query(default_term_expr, partition_names=[partition2],
check_task=CheckTasks.err_res, check_items=error)
collection_w.load()
@pytest.mark.tags(CaseLabel.L2)
def test_load_partition_after_drop_partition_and_release_another(self):
"""
target: test load partition after drop a partition and release another
method: 1. load collection
2. drop a partition
3. release left partition
4. load partition
5. query on the partition
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
collection_w.load()
partition_w1.release()
partition_w1.drop()
partition_w2.release()
partition_w2.load()
collection_w.query(default_term_expr, partition_names=[partition2])
@pytest.mark.tags(CaseLabel.L2)
def test_load_another_partition_after_drop_one_partition(self):
"""
target: test load another partition after drop a partition
method: 1. load collection
2. drop a partition
3. load another partition
4. query on the partition
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
collection_w.load()
partition_w1.release()
partition_w1.drop()
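# the dropped partition is gone; loading and querying the remaining partition should still work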
partition_w2.load()
collection_w.query(default_term_expr, partition_names=[partition2])
@pytest.mark.tags(CaseLabel.L2)
def test_load_collection_after_drop_one_partition(self):
"""
target: test load collection after drop a partition
method: 1. load collection
2. drop a partition
3. load collection
4. query on the partition
expected: No exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_w1 = self.init_partition_wrap(collection_w, partition1)
partition_w2 = self.init_partition_wrap(collection_w, partition2)
collection_w.load()
partition_w1.release()
partition_w1.drop()
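# after dropping one partition, loading the collection loads only the remaining partitions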
collection_w.load()
collection_w.query(default_term_expr, partition_names=[partition2])
@pytest.mark.tags(CaseLabel.L2)
def test_load_release_collection(self):
"""
target: test load and release a non-existent collection
method: 1. load, release and drop collection
2. load and release dropped collection
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str()
collection_wr = self.init_collection_wrap(name=c_name)
collection_wr.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_wr.load()
collection_wr.release()
collection_wr.drop()
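# load and release on a dropped collection are both expected to fail with 'collection not found'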
error = {ct.err_code: 100, ct.err_msg: "collection not found"}
collection_wr.load(check_task=CheckTasks.err_res, check_items=error)
collection_wr.release(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_release_collection_after_drop(self):
"""
target: test release collection after drop
method: create index and load collection, then drop it and release
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str()
collection_wr = self.init_collection_wrap(name=c_name)
collection_wr.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_wr.load()
collection_wr.drop()
error = {ct.err_code: 100, ct.err_msg: "collection not found"}
collection_wr.release(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L1)
def test_load_partition_names_empty(self):
"""
target: test load collection with empty partition_names
method: 1. insert entities into the default and a new partition
2. load with partition_names=[]
expected: raise exception
"""
self._connect()
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
partition_w = self.init_partition_wrap(collection_wrap=collection_w)
# insert [0, half) into partition_w
half = ct.default_nb // 2
df_partition = cf.gen_default_dataframe_data(nb=half)
partition_w.insert(df_partition)
# insert [half, nb) into _default
df_default = cf.gen_default_dataframe_data(nb=half, start=half)
collection_w.insert(df_default)
# flush
collection_w.num_entities
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
# load with an empty partition_names list should be rejected
error = {ct.err_code: 0, ct.err_msg: "due to no partition specified"}
collection_w.load(partition_names=[], check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("invalid_num_replica", [0.2, "not-int"])
def test_load_replica_non_number(self, invalid_num_replica):
"""
target: test load collection with non-number replicas
method: load with non-number replicas
expected: raise exceptions
"""
# create, insert
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
df = cf.gen_default_dataframe_data()
insert_res, _ = collection_w.insert(df)
assert collection_w.num_entities == ct.default_nb
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
# load with non-number replicas
error = {ct.err_code: 999, ct.err_msg: f"`replica_number` value {invalid_num_replica} is illegal"}
collection_w.load(replica_number=invalid_num_replica, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("replicas", [-1, 0])
def test_load_replica_invalid_number(self, replicas):
"""
target: test load collection with invalid replica number
method: load with invalid replica number
expected: load successfully as replica = 1
"""
# create, insert
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
df = cf.gen_default_dataframe_data()
insert_res, _ = collection_w.insert(df)
assert collection_w.num_entities == ct.default_nb
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load(replica_number=replicas)
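# verify that only one replica group with a single shard was created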
replicas = collection_w.get_replicas()[0]
groups = replicas.groups
assert len(groups) == 1
assert len(groups[0].shards) == 1
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("replicas", [None])
def test_load_replica_number_none(self, replicas):
"""
target: test load collection with replica number none
method: load with replica_number=None
expected: load successfully with the default replica number
"""
# create, insert
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
df = cf.gen_default_dataframe_data()
insert_res, _ = collection_w.insert(df)
assert collection_w.num_entities == ct.default_nb
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
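# replica_number=None is expected to load with the default replica setting, so no error check here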
collection_w.load(replica_number=replicas)
@pytest.mark.tags(CaseLabel.L2)
def test_load_replica_greater_than_querynodes(self):
"""
target: test load with more replicas than query nodes
method: load with 3 replicas (only 2 query nodes available)
expected: raise exception
"""
# create, insert
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
df = cf.gen_default_dataframe_data()
insert_res, _ = collection_w.insert(df)
assert collection_w.num_entities == ct.default_nb
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
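# requesting more replicas than available query nodes should fail with a resource-insufficient error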
error = {ct.err_code: 999,
ct.err_msg: "call query coordinator LoadCollection: when load 3 replica count: service resource "
"insufficient[currentStreamingNode=2][expectedStreamingNode=3]"}
collection_w.load(replica_number=3, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.ClusterOnly)
def test_load_replica_change(self):
"""
@ -2782,17 +2066,6 @@ class TestLoadCollection(TestcaseBase):
check_task=CheckTasks.check_query_results,
check_items={'exp_res': [{"count(*)": ct.default_nb}]})
@pytest.mark.tags(CaseLabel.L1)
def test_load_collection_without_creating_index(self):
"""
target: test load collection without creating index
method: create a collection without index, then load
expected: raise exception
"""
collection_w = self.init_collection_general(prefix, True, is_index=False)[0]
collection_w.load(check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "index not found"})
class TestDescribeCollection(TestcaseBase):