test: refactor checker to use milvus client (#45524)

/kind improvement

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
zhuwenxing 2025-11-20 11:59:08 +08:00 committed by GitHub
parent 79926b412c
commit e0df44481d
8 changed files with 1162 additions and 591 deletions

File diff suppressed because it is too large.


@@ -22,6 +22,7 @@ RELEASE_NAME = 'test-allstandalone-pod-kill-19-25-26'
WAIT_PER_OP = 10 # time to wait in seconds between operations
CHAOS_DURATION = 120 # chaos duration time in seconds
DEFAULT_INDEX_PARAM = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
DEFAULT_EMB_LIST_INDEX_PARAM = {"index_type": "HNSW", "metric_type": "MAX_SIM_COSINE", "params": {"M": 16, "efConstruction": 200}}
DEFAULT_SEARCH_PARAM = {"metric_type": "L2", "params": {"ef": 64}}
DEFAULT_INT8_INDEX_PARAM = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
DEFAULT_INT8_SEARCH_PARAM = {"metric_type": "L2", "params": {"ef": 64}}
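
For orientation, a minimal sketch of how defaults like these are typically wired up with pymilvus's MilvusClient; the URI, collection name, and vector field name below are illustrative assumptions, not part of this diff:

# Hedged sketch: applying DEFAULT_INDEX_PARAM and DEFAULT_SEARCH_PARAM via MilvusClient.
# "demo" and "float_vector" are hypothetical names; assumes pymilvus >= 2.4.
from pymilvus import MilvusClient

client = MilvusClient(uri="http://localhost:19530")  # assumed local deployment
index_params = client.prepare_index_params()
index_params.add_index(
    field_name="float_vector",
    index_type=DEFAULT_INDEX_PARAM["index_type"],    # "HNSW"
    metric_type=DEFAULT_INDEX_PARAM["metric_type"],  # "L2"
    params=DEFAULT_INDEX_PARAM["params"],            # {"M": 48, "efConstruction": 500}
)
client.create_index("demo", index_params)
client.load_collection("demo")
res = client.search(
    "demo",
    data=[[0.0] * 128],                  # one 128-dim query vector
    anns_field="float_vector",
    search_params=DEFAULT_SEARCH_PARAM,  # {"metric_type": "L2", "params": {"ef": 64}}
    limit=10,
)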


@@ -24,6 +24,7 @@ from chaos.checker import (CollectionCreateChecker,
AlterCollectionChecker,
AddFieldChecker,
CollectionRenameChecker,
TensorSearchChecker,
Op,
EventRecords,
ResultAnalyzer
@@ -83,6 +84,7 @@ class TestOperations(TestBase):
checkers = {
Op.create: CollectionCreateChecker(collection_name=c_name),
Op.insert: InsertChecker(collection_name=c_name),
Op.tensor_search: TensorSearchChecker(collection_name=c_name),
Op.upsert: UpsertChecker(collection_name=c_name),
Op.partial_update: PartialUpdateChecker(collection_name=c_name),
Op.flush: FlushChecker(collection_name=c_name),


@@ -836,33 +836,49 @@ def gen_default_collection_schema(description=ct.default_desc, primary_field=ct.
def gen_all_datatype_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name,
auto_id=False, dim=ct.default_dim, enable_dynamic_field=True, nullable=True,**kwargs):
auto_id=False, dim=ct.default_dim, enable_dynamic_field=True, nullable=True,
enable_struct_array_field=True, **kwargs):
analyzer_params = {
"tokenizer": "standard",
}
fields = [
gen_int64_field(),
gen_float_field(nullable=nullable),
gen_string_field(nullable=nullable),
gen_string_field(name="document", max_length=2000, enable_analyzer=True, enable_match=True, nullable=nullable),
gen_string_field(name="text", max_length=2000, enable_analyzer=True, enable_match=True,
analyzer_params=analyzer_params),
gen_json_field(nullable=nullable),
gen_geometry_field(nullable=nullable),
gen_array_field(name="array_int", element_type=DataType.INT64),
gen_array_field(name="array_float", element_type=DataType.FLOAT),
gen_array_field(name="array_varchar", element_type=DataType.VARCHAR, max_length=200),
gen_array_field(name="array_bool", element_type=DataType.BOOL),
gen_float_vec_field(dim=dim),
gen_int8_vec_field(name="image_emb", dim=dim),
gen_float_vec_field(name="text_sparse_emb", vector_data_type=DataType.SPARSE_FLOAT_VECTOR),
gen_float_vec_field(name="voice_emb", dim=dim),
# gen_timestamptz_field(name="timestamptz", nullable=nullable),
]
schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description,
primary_field=primary_field, auto_id=auto_id,
enable_dynamic_field=enable_dynamic_field, **kwargs)
# Create schema using MilvusClient
schema = MilvusClient.create_schema(
auto_id=auto_id,
enable_dynamic_field=enable_dynamic_field,
description=description,
**kwargs
)
# Add all fields using schema.add_field()
schema.add_field(primary_field, DataType.INT64, is_primary=True)
schema.add_field(ct.default_float_field_name, DataType.FLOAT, nullable=nullable)
schema.add_field(ct.default_string_field_name, DataType.VARCHAR, max_length=ct.default_max_length, nullable=nullable)
schema.add_field("document", DataType.VARCHAR, max_length=2000, enable_analyzer=True, enable_match=True, nullable=nullable)
schema.add_field("text", DataType.VARCHAR, max_length=2000, enable_analyzer=True, enable_match=True,
analyzer_params=analyzer_params)
schema.add_field(ct.default_json_field_name, DataType.JSON, nullable=nullable)
schema.add_field(ct.default_geometry_field_name, DataType.GEOMETRY, nullable=nullable)
schema.add_field("array_int", DataType.ARRAY, element_type=DataType.INT64, max_capacity=ct.default_max_capacity)
schema.add_field("array_float", DataType.ARRAY, element_type=DataType.FLOAT, max_capacity=ct.default_max_capacity)
schema.add_field("array_varchar", DataType.ARRAY, element_type=DataType.VARCHAR, max_length=200, max_capacity=ct.default_max_capacity)
schema.add_field("array_bool", DataType.ARRAY, element_type=DataType.BOOL, max_capacity=ct.default_max_capacity)
schema.add_field(ct.default_float_vec_field_name, DataType.FLOAT_VECTOR, dim=dim)
schema.add_field("image_emb", DataType.INT8_VECTOR, dim=dim)
schema.add_field("text_sparse_emb", DataType.SPARSE_FLOAT_VECTOR)
# schema.add_field("voice_emb", DataType.FLOAT_VECTOR, dim=dim)
# schema.add_field("timestamptz", DataType.TIMESTAMPTZ, nullable=nullable)
# Add struct array field
if enable_struct_array_field:
struct_schema = MilvusClient.create_struct_field_schema()
struct_schema.add_field("name", DataType.VARCHAR, max_length=200)
struct_schema.add_field("age", DataType.INT64)
struct_schema.add_field("float_vector", DataType.FLOAT_VECTOR, dim=dim)
schema.add_field("array_struct", datatype=DataType.ARRAY, element_type=DataType.STRUCT,
struct_schema=struct_schema, max_capacity=10)
# Add BM25 function
bm25_function = Function(
name=f"text",
function_type=FunctionType.BM25,
@@ -871,6 +887,7 @@ def gen_all_datatype_collection_schema(description=ct.default_desc, primary_fiel
params={},
)
schema.add_function(bm25_function)
return schema
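
A brief usage sketch of the refactored helper; the import path and collection name are assumptions based on this test suite's conventions (common_func is aliased cf elsewhere in this diff):

# Hedged sketch: build the MilvusClient-style schema and create a collection from it.
from pymilvus import MilvusClient
from common import common_func as cf  # assumed import path

client = MilvusClient(uri="http://localhost:19530")
schema = cf.gen_all_datatype_collection_schema(dim=128, enable_struct_array_field=True)
client.create_collection("all_dtype_demo", schema=schema)  # hypothetical name
# The returned schema now comes from MilvusClient.create_schema(), so it can be
# passed straight to create_collection() without the ORM wrapper.
print(len(client.describe_collection("all_dtype_demo")["fields"]))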
@@ -1779,16 +1796,44 @@ def get_column_data_by_schema(nb=ct.default_nb, schema=None, skip_vectors=False,
return data
def convert_orm_schema_to_dict_schema(orm_schema):
"""
Convert ORM CollectionSchema object to dict format (same as describe_collection output).
Args:
orm_schema: CollectionSchema object from pymilvus.orm
Returns:
dict: Schema in dict format compatible with MilvusClient describe_collection output
"""
# Use the built-in to_dict() method which already provides the right structure
schema_dict = orm_schema.to_dict()
# to_dict() already includes:
# - auto_id
# - description
# - fields (with each field's to_dict())
# - enable_dynamic_field
# - functions (if present)
# - struct_fields (if present)
return schema_dict
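
For reference, a sketch of the dict shape this conversion yields, matching pymilvus's CollectionSchema.to_dict(); the field entries are illustrative and abbreviated:

# Illustrative shape only (abbreviated); real field dicts carry more params.
schema_dict = {
    "auto_id": False,
    "description": "",
    "enable_dynamic_field": True,
    "fields": [
        {"name": "int64", "type": DataType.INT64, "is_primary": True},
        {"name": "float_vector", "type": DataType.FLOAT_VECTOR, "params": {"dim": 128}},
    ],
    "functions": [],  # present when BM25 or other functions are defined
}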
def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=False, skip_field_names=[], desired_field_names=[]):
"""
Generates row data based on the given schema.
Args:
nb (int): Number of rows to generate. Defaults to ct.default_nb.
schema (Schema): Collection schema or collection info. If None, uses default schema.
schema (Schema): Collection schema or collection info. Can be:
- dict (from client.describe_collection())
- CollectionSchema object (from ORM)
- None (uses default schema)
start (int): Starting value for primary key fields. Defaults to 0.
random_pk (bool, optional): Whether to generate random primary key values (default: False)
skip_field_names (list, optional): field names to skip so their data can be generated manually (default: [])
desired_field_names (list, optional): only generate data for the specified field names (default: [])
Returns:
list[dict]: List of dictionaries where each dictionary represents a row,
@@ -1798,6 +1843,7 @@ def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=Fal
- Skips auto_id fields and function output fields.
- For primary key fields, generates sequential values starting from 'start'.
- For non-primary fields, generates random data based on field type.
- Supports struct array fields in both dict and ORM schema formats.
"""
# if both skip_field_names and desired_field_names are specified, raise an exception
if skip_field_names and desired_field_names:
@@ -1806,10 +1852,12 @@ def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=Fal
if schema is None:
schema = gen_default_collection_schema()
# ignore auto id field and the fields in function output
func_output_fields = []
if isinstance(schema, dict):
# a dict of collection schema info is usually from client.describe_collection()
# Convert ORM schema to dict schema for unified processing
if not isinstance(schema, dict):
schema = convert_orm_schema_to_dict_schema(schema)
# Now schema is always a dict after conversion, process it uniformly
# Get all fields from schema
all_fields = schema.get('fields', [])
fields = []
for field in all_fields:
@@ -1820,12 +1868,44 @@ def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=Fal
elif not desired_field_names:
fields.append(field)
# Get struct_fields from schema
struct_fields = schema.get('struct_fields', [])
log.debug(f"[gen_row_data_by_schema] struct_fields from schema: {len(struct_fields)} items")
if struct_fields:
log.debug(f"[gen_row_data_by_schema] First struct_field: {struct_fields[0]}")
# If struct_fields is not present, extract struct array fields from fields list
# This happens when using client.describe_collection()
if not struct_fields:
struct_fields = []
for field in fields:
if field.get('type') == DataType.ARRAY and field.get('element_type') == DataType.STRUCT:
# Convert field format to struct_field format
struct_field_dict = {
'name': field.get('name'),
'max_capacity': field.get('params', {}).get('max_capacity', 100),
'fields': []
}
# Get struct fields from field - key can be 'struct_fields' or 'struct_schema'
struct_field_list = field.get('struct_fields') or field.get('struct_schema')
if struct_field_list:
# If it's a dict with 'fields' key, get the fields
if isinstance(struct_field_list, dict) and 'fields' in struct_field_list:
struct_field_dict['fields'] = struct_field_list['fields']
# If it's already a list, use it directly
elif isinstance(struct_field_list, list):
struct_field_dict['fields'] = struct_field_list
struct_fields.append(struct_field_dict)
# Get function output fields to skip
func_output_fields = []
functions = schema.get('functions', [])
for func in functions:
output_field_names = func.get('output_field_names', [])
func_output_fields.extend(output_field_names)
func_output_fields = list(set(func_output_fields))
# Filter fields that need data generation
fields_needs_data = []
for field in fields:
field_name = field.get('name', None)
@@ -1833,57 +1913,35 @@ def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=Fal
continue
if field_name in func_output_fields or field_name in skip_field_names:
continue
# Skip struct array fields as they are handled separately via struct_fields
if field.get('type') == DataType.ARRAY and field.get('element_type') == DataType.STRUCT:
continue
fields_needs_data.append(field)
# Generate data for each row
data = []
for i in range(nb):
tmp = {}
# Generate data for regular fields
for field in fields_needs_data:
tmp[field.get('name', None)] = gen_data_by_collection_field(field, random_pk=random_pk)
# Handle primary key fields specially
if field.get('is_primary', False) is True and field.get('type', None) == DataType.INT64:
tmp[field.get('name', None)] = start
start += 1
if field.get('is_primary', False) is True and field.get('type', None) == DataType.VARCHAR:
tmp[field.get('name', None)] = str(start)
start += 1
data.append(tmp)
else:
# a schema object usually comes from an ORM schema object
all_fields = schema.fields
fields = []
for field in all_fields:
# if desired_field_names is specified, only generate the fields in desired_field_names
if field.name in desired_field_names:
fields.append(field)
# elif desired_field_names is not specified, generate all fields
elif not desired_field_names:
fields.append(field)
if hasattr(schema, "functions"):
functions = schema.functions
for func in functions:
output_field_names = func.output_field_names
func_output_fields.extend(output_field_names)
func_output_fields = list(set(func_output_fields))
# Generate data for struct array fields
for struct_field in struct_fields:
field_name = struct_field.get('name', None)
struct_data = gen_struct_array_data(struct_field, start=start, random_pk=random_pk)
tmp[field_name] = struct_data
fields_needs_data = []
for field in fields:
if field.auto_id:
continue
if field.name in func_output_fields or field.name in skip_field_names:
continue
fields_needs_data.append(field)
data = []
for i in range(nb):
tmp = {}
for field in fields_needs_data:
tmp[field.name] = gen_data_by_collection_field(field, random_pk=random_pk)
if field.is_primary is True and field.dtype == DataType.INT64:
tmp[field.name] = start
start += 1
if field.is_primary is True and field.dtype == DataType.VARCHAR:
tmp[field.name] = str(start)
start += 1
data.append(tmp)
log.debug(f"[gen_row_data_by_schema] Generated {len(data)} rows, first row keys: {list(data[0].keys()) if data else []}")
return data
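
A usage sketch of the unified path: after this change the same call accepts either an ORM schema or the dict from describe_collection (client, cf, and the collection name as in the sketches above, all assumed):

# Hedged sketch: generate rows straight from a described collection.
info = client.describe_collection("all_dtype_demo")  # dict-format schema
rows = cf.gen_row_data_by_schema(nb=100, schema=info, start=0)
client.insert("all_dtype_demo", rows)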
@@ -2035,6 +2093,17 @@ def get_int8_vec_field_name_list(schema=None):
vec_fields.append(field.name)
return vec_fields
def get_emb_list_field_name_list(schema=None):
vec_fields = []
if schema is None:
schema = gen_default_collection_schema()
struct_fields = schema.struct_fields
for struct_field in struct_fields:
for field in struct_field.fields:
if field.dtype in [DataType.FLOAT_VECTOR]:
vec_fields.append(f"{struct_field.name}[{field.name}]")
return vec_fields
def get_bm25_vec_field_name_list(schema=None):
if not hasattr(schema, "functions"):
return []
@@ -2072,6 +2141,40 @@ def get_dense_anns_field_name_list(schema=None):
anns_fields.append(item)
return anns_fields
def get_struct_array_vector_field_list(schema=None):
if schema is None:
schema = gen_default_collection_schema()
struct_fields = schema.struct_fields
struct_vector_fields = []
for struct_field in struct_fields:
struct_field_name = struct_field.name
# Check each sub-field for vector types
for sub_field in struct_field.fields:
sub_field_name = sub_field.name if hasattr(sub_field, 'name') else sub_field.get('name')
sub_field_dtype = sub_field.dtype if hasattr(sub_field, 'dtype') else sub_field.get('type')
if sub_field_dtype in [DataType.FLOAT_VECTOR, DataType.FLOAT16_VECTOR,
DataType.BFLOAT16_VECTOR, DataType.INT8_VECTOR,
DataType.BINARY_VECTOR]:
# Get dimension
if hasattr(sub_field, 'params'):
dim = sub_field.params.get('dim')
else:
dim = sub_field.get('params', {}).get('dim')
item = {
"struct_field": struct_field_name,
"vector_field": sub_field_name,
"anns_field": f"{struct_field_name}[{sub_field_name}]",
"dtype": sub_field_dtype,
"dim": dim
}
struct_vector_fields.append(item)
return struct_vector_fields
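
For reference, a sketch of what the two struct-array helpers above return for the schema built earlier in this file; the bracketed "struct[sub_field]" string is the anns_field form used when searching struct-array vectors (values illustrative):

# Illustrative expected output for the "array_struct" field with a 128-dim sub-vector.
get_emb_list_field_name_list(schema)
# -> ["array_struct[float_vector]"]
get_struct_array_vector_field_list(schema)
# -> [{"struct_field": "array_struct", "vector_field": "float_vector",
#      "anns_field": "array_struct[float_vector]",
#      "dtype": DataType.FLOAT_VECTOR, "dim": 128}]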
def gen_varchar_data(length: int, nb: int, text_mode=False):
if text_mode:
@@ -2080,6 +2183,38 @@ def gen_varchar_data(length: int, nb: int, text_mode=False):
return ["".join([chr(random.randint(97, 122)) for _ in range(length)]) for _ in range(nb)]
def gen_struct_array_data(struct_field, start=0, random_pk=False):
"""
Generates struct array data based on the struct field schema.
Args:
struct_field: Either a dict (from dict schema) or StructFieldSchema object (from ORM schema)
start: Starting value for primary key fields
random_pk: Whether to generate random primary key values
Returns:
List of struct data dictionaries
"""
struct_array_data = []
# Handle both dict and object formats
if isinstance(struct_field, dict):
max_capacity = struct_field.get('max_capacity', 100)
fields = struct_field.get('fields', [])
else:
# StructFieldSchema object
max_capacity = getattr(struct_field, 'max_capacity', 100) or 100
fields = struct_field.fields
arr_len = random.randint(1, max_capacity)
for _ in range(arr_len):
struct_data = {}
for field in fields:
field_name = field.get('name') if isinstance(field, dict) else field.name
struct_data[field_name] = gen_data_by_collection_field(field, nb=None, start=start, random_pk=random_pk)
struct_array_data.append(struct_data)
return struct_array_data
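
A quick sketch of calling this generator directly with a dict-format struct field; the spec mirrors the "clips" struct used in the CRUD tests below and is illustrative:

# Hedged sketch: one row's struct-array value for a "clips"-like field.
clips_spec = {
    "name": "clips",
    "max_capacity": 10,
    "fields": [
        {"name": "clip_embedding1", "type": DataType.FLOAT_VECTOR, "params": {"dim": 128}},
        {"name": "scalar_field", "type": DataType.INT64, "params": {}},
        {"name": "label", "type": DataType.VARCHAR, "params": {"max_length": 200}},
    ],
}
value = gen_struct_array_data(clips_spec, start=0)
# -> list of 1..10 dicts, e.g. {"clip_embedding1": [...], "scalar_field": 7, "label": "ab..."}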
def gen_data_by_collection_field(field, nb=None, start=0, random_pk=False):
"""
Generates test data for a given collection field based on its data type and properties.
@@ -2105,7 +2240,8 @@ def gen_data_by_collection_field(field, nb=None, start=0, random_pk=False):
# for v2 client, it accepts a dict of field info
nullable = field.get('nullable', False)
data_type = field.get('type', None)
enable_analyzer = field.get('params').get("enable_analyzer", False)
params = field.get('params', {}) or {}
enable_analyzer = params.get("enable_analyzer", False)
is_primary = field.get('is_primary', False)
else:
# for ORM client, it accepts a field object
@@ -2224,9 +2360,16 @@ def gen_data_by_collection_field(field, nb=None, start=0, random_pk=False):
elif data_type == DataType.ARRAY:
if isinstance(field, dict):
max_capacity = field.get('params')['max_capacity']
element_type = field.get('element_type')
else:
max_capacity = field.params['max_capacity']
element_type = field.element_type
# Struct array fields are handled separately in gen_row_data_by_schema
# by processing struct_fields, so skip here
if element_type == DataType.STRUCT:
return None
if element_type == DataType.INT8:
if nb is None:
return [random.randint(-128, 127) for _ in range(max_capacity)] if random.random() < 0.8 or nullable is False else None


@@ -12,6 +12,7 @@ default_dim = 128
default_nb = 2000
default_nb_medium = 5000
default_max_capacity = 100
default_max_length = 500
default_top_k = 10
default_nq = 2
default_limit = 10


@@ -111,7 +111,7 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
"""
# step 1: create collection with all datatype schema
client = self._client()
schema = cf.gen_all_datatype_collection_schema(dim=default_dim)
schema = cf.gen_all_datatype_collection_schema(dim=default_dim, enable_struct_array_field=False)
index_params = self.prepare_index_params(client)[0]
text_sparse_emb_field_name = "text_sparse_emb"
@@ -201,7 +201,7 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
collection_name = cf.gen_collection_name_by_testcase_name()
# Create schema with all data types
schema = cf.gen_all_datatype_collection_schema(dim=dim)
schema = cf.gen_all_datatype_collection_schema(dim=dim, enable_struct_array_field=False)
# Create index parameters
index_params = client.prepare_index_params()


@@ -2659,7 +2659,7 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
def test_upsert_struct_array_data(self):
"""
target: test upsert operation with struct array data
method: insert data then upsert with modified struct array
method: insert 2000 records and flush them, insert 1000 more as growing data, then upsert modified struct array rows in both segments
expected: data successfully upserted
"""
collection_name = cf.gen_unique_str(f"{prefix}_crud")
@@ -2669,25 +2669,50 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
# Create collection
self.create_collection_with_schema(client, collection_name)
# Initial insert
initial_data = [
{
"id": 1,
# Insert 2000 records for flushed data
flushed_data = []
for i in range(2000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"scalar_field": 100,
"label": "initial",
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"flushed_{i}",
}
],
}
]
flushed_data.append(row)
res, check = self.insert(client, collection_name, initial_data)
res, check = self.insert(client, collection_name, flushed_data)
assert check
assert res["insert_count"] == 2000
# Flush to persist data
res, check = self.flush(client, collection_name)
assert check
# Insert 1000 records for growing data
growing_data = []
for i in range(2000, 3000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"growing_{i}",
}
],
}
growing_data.append(row)
res, check = self.insert(client, collection_name, growing_data)
assert check
assert res["insert_count"] == 1000
# create index and load collection
index_params = client.prepare_index_params()
index_params.add_index(
@@ -2707,40 +2732,63 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
res, check = self.load_collection(client, collection_name)
assert check
# Upsert with modified data
upsert_data = [
{
"id": 1, # Same ID
# Upsert data in both flushed and growing segments
upsert_data = []
# Upsert 10 records from flushed data
for i in range(0, 10):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"scalar_field": 200, # Modified
"label": "updated", # Modified
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i + 10000, # Modified
"label": f"updated_flushed_{i}", # Modified
}
],
}
]
upsert_data.append(row)
# Upsert 10 records from growing data
for i in range(2000, 2010):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i + 10000, # Modified
"label": f"updated_growing_{i}", # Modified
}
],
}
upsert_data.append(row)
res, check = self.upsert(client, collection_name, upsert_data)
assert check
# Verify upsert worked
# Verify upsert worked for flushed data
res, check = self.flush(client, collection_name)
assert check
results, check = self.query(client, collection_name, filter="id == 1")
results, check = self.query(client, collection_name, filter="id < 10")
assert check
assert len(results) == 1
assert results[0]["clips"][0]["label"] == "updated"
assert len(results) == 10
for result in results:
assert "updated_flushed" in result["clips"][0]["label"]
# Verify upsert worked for growing data
results, check = self.query(client, collection_name, filter="id >= 2000 and id < 2010")
assert check
assert len(results) == 10
for result in results:
assert "updated_growing" in result["clips"][0]["label"]
@pytest.mark.tags(CaseLabel.L0)
def test_delete_struct_array_data(self):
"""
target: test delete operation with struct array data
method: insert struct array data then delete by ID
method: insert 3000 records (2000 flushed + 1000 growing), then delete by ID from both segments
expected: data successfully deleted
"""
collection_name = cf.gen_unique_str(f"{prefix}_crud")
@@ -2750,25 +2798,50 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
# Create collection and insert data
self.create_collection_with_schema(client, collection_name)
data = []
for i in range(10):
# Insert 2000 records for flushed data
flushed_data = []
for i in range(2000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"label_{i}",
"label": f"flushed_{i}",
}
],
}
data.append(row)
flushed_data.append(row)
res, check = self.insert(client, collection_name, data)
res, check = self.insert(client, collection_name, flushed_data)
assert check
assert res["insert_count"] == 2000
# Flush to persist data
res, check = self.flush(client, collection_name)
assert check
# Insert 1000 records for growing data
growing_data = []
for i in range(2000, 3000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"growing_{i}",
}
],
}
growing_data.append(row)
res, check = self.insert(client, collection_name, growing_data)
assert check
assert res["insert_count"] == 1000
# create index and load collection
index_params = client.prepare_index_params()
index_params.add_index(
@@ -2788,9 +2861,14 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
res, check = self.load_collection(client, collection_name)
assert check
# Delete some records
delete_ids = [1, 3, 5]
res, check = self.delete(client, collection_name, filter=f"id in {delete_ids}")
# Delete some records from flushed segment
delete_flushed_ids = [1, 3, 5, 100, 500, 1000]
res, check = self.delete(client, collection_name, filter=f"id in {delete_flushed_ids}")
assert check
# Delete some records from growing segment
delete_growing_ids = [2001, 2003, 2500, 2999]
res, check = self.delete(client, collection_name, filter=f"id in {delete_growing_ids}")
assert check
# Verify deletion
@@ -2801,14 +2879,21 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
assert check
remaining_ids = {result["id"] for result in results}
for delete_id in delete_ids:
# Verify flushed data deletion
for delete_id in delete_flushed_ids:
assert delete_id not in remaining_ids
# Verify growing data deletion
for delete_id in delete_growing_ids:
assert delete_id not in remaining_ids
# Verify total count is correct (3000 - 10 deleted)
assert len(results) == 2990
@pytest.mark.tags(CaseLabel.L1)
def test_batch_operations(self):
"""
target: test batch insert/upsert operations with struct array
method: perform large batch operations
method: insert 3000 records (2000 flushed + 1000 growing), then perform batch upsert
expected: all operations successful
"""
collection_name = cf.gen_unique_str(f"{prefix}_crud")
@@ -2818,42 +2903,77 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
# Create collection
self.create_collection_with_schema(client, collection_name)
# Large batch insert
batch_size = 1000
data = []
for i in range(batch_size):
# Insert 2000 records for flushed data
flushed_data = []
for i in range(2000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i % 100,
"label": f"batch_{i}",
"label": f"flushed_{i}",
}
],
}
data.append(row)
flushed_data.append(row)
res, check = self.insert(client, collection_name, data)
res, check = self.insert(client, collection_name, flushed_data)
assert check
assert res["insert_count"] == batch_size
assert res["insert_count"] == 2000
# Batch upsert (update first 100 records)
# Flush to persist data
res, check = self.flush(client, collection_name)
assert check
# Insert 1000 records for growing data
growing_data = []
for i in range(2000, 3000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i % 100,
"label": f"growing_{i}",
}
],
}
growing_data.append(row)
res, check = self.insert(client, collection_name, growing_data)
assert check
assert res["insert_count"] == 1000
# Batch upsert (update first 100 flushed records and 50 growing records)
upsert_data = []
# Update first 100 flushed records
for i in range(100):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i + 1000, # Modified
"label": f"upserted_{i}", # Modified
"label": f"upserted_flushed_{i}", # Modified
}
],
}
upsert_data.append(row)
# Update first 50 growing records
for i in range(2000, 2050):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i + 1000, # Modified
"label": f"upserted_growing_{i}", # Modified
}
],
}
@@ -2862,11 +2982,15 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
res, check = self.upsert(client, collection_name, upsert_data)
assert check
# Verify upsert success with flush
res, check = self.flush(client, collection_name)
assert check
@pytest.mark.tags(CaseLabel.L1)
def test_collection_operations(self):
"""
target: test collection operations (load/release/drop) with struct array
method: perform collection management operations
method: insert 3000 records (2000 flushed + 1000 growing), then perform collection management operations
expected: all operations successful
"""
collection_name = cf.gen_unique_str(f"{prefix}_crud")
@@ -2876,25 +3000,49 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
# Create collection with data
self.create_collection_with_schema(client, collection_name)
# Insert some data
data = [
{
"id": 1,
# Insert 2000 records for flushed data
flushed_data = []
for i in range(2000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"scalar_field": 100,
"label": "test",
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"flushed_{i}",
}
],
}
]
flushed_data.append(row)
res, check = self.insert(client, collection_name, data)
res, check = self.insert(client, collection_name, flushed_data)
assert check
assert res["insert_count"] == 2000
# Flush to persist data
res, check = self.flush(client, collection_name)
assert check
# Insert 1000 records for growing data
growing_data = []
for i in range(2000, 3000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"growing_{i}",
}
],
}
growing_data.append(row)
res, check = self.insert(client, collection_name, growing_data)
assert check
assert res["insert_count"] == 1000
# Create index for loading
index_params = client.prepare_index_params()
@@ -2922,6 +3070,11 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
load_state = client.get_load_state(collection_name)
assert str(load_state["state"]) == "Loaded"
# Query to verify both flushed and growing data are accessible
results, check = self.query(client, collection_name, filter="id >= 0", limit=3000)
assert check
assert len(results) == 3000
# Release collection
res, check = self.release_collection(client, collection_name)
assert check


@@ -1,7 +1,7 @@
[pytest]
addopts = --host localhost --html=/tmp/ci_logs/report.html --self-contained-html -v
addopts = --host 10.104.31.219 --html=/tmp/ci_logs/report.html --self-contained-html -v --log-cli-level INFO
# python3 -W ignore -m pytest
log_format = [%(asctime)s - %(levelname)s - %(name)s]: %(message)s (%(filename)s:%(lineno)s)