test: add import case for struct array (#45146)

/kind improvement

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
zhuwenxing 2025-11-03 17:19:39 +08:00 committed by GitHub
parent 25e0485a56
commit a03c398986
2 changed files with 745 additions and 9 deletions


@@ -689,7 +689,6 @@ def gen_array_field(name=ct.default_array_field_name, element_type=DataType.INT64,
return gen_scalar_field(DataType.ARRAY, name=name, description=description, is_primary=is_primary,
element_type=element_type, max_capacity=max_capacity, **kwargs)
def gen_int8_field(name=ct.default_int8_field_name, description=ct.default_desc, is_primary=False, **kwargs):
return gen_scalar_field(DataType.INT8, name=name, description=description, is_primary=is_primary, **kwargs)


@@ -1,6 +1,10 @@
import pytest
import numpy as np
import random
import time
import os
import json
from pathlib import Path
from typing import List, Dict, Any
from base.client_v2_base import TestMilvusClientV2Base
@@ -8,9 +12,15 @@ from utils.util_log import test_log as log
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from check.param_check import compare_lists_with_epsilon_ignore_dict_order
from utils.util_pymilvus import *
from pymilvus import DataType, MilvusClient, AnnSearchRequest, RRFRanker, WeightedRanker
from pymilvus.client.embedding_list import EmbeddingList
from pymilvus.bulk_writer import bulk_import, get_import_progress, LocalBulkWriter, BulkFileType
import pyarrow as pa
import pyarrow.parquet as pq
from minio import Minio
from minio.error import S3Error
prefix = "struct_array"
@@ -2886,6 +2896,46 @@ class TestMilvusClientStructArrayInvalid(TestMilvusClientV2Base):
or "not supported for fields in struct" in error_msg
)
@pytest.mark.tags(CaseLabel.L2)
def test_struct_with_unsupported_vector_field(self):
"""
target: test creating struct with BinaryVector field (should fail)
method: attempt to create struct with BinaryVector field
expected: creation should fail
"""
collection_name = cf.gen_unique_str(f"{prefix}_invalid")
client = self._client()
# Create schema
schema = client.create_schema(auto_id=False, enable_dynamic_field=False)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(
field_name="normal_vector", datatype=DataType.FLOAT_VECTOR, dim=default_dim
)
struct_schema = client.create_struct_field_schema()
struct_schema.add_field("binary_vector_field", DataType.BINARY_VECTOR, dim=default_dim)
schema.add_field(
"struct_array",
datatype=DataType.ARRAY,
element_type=DataType.STRUCT,
struct_schema=struct_schema,
max_capacity=100,
)
error = {
ct.err_code: 65535,
ct.err_msg: "now only float vector is supported",
}
self.create_collection(
client,
collection_name,
schema=schema,
check_task=CheckTasks.err_res,
check_items=error,
)
@pytest.mark.tags(CaseLabel.L2)
def test_struct_with_json_field(self):
"""
@@ -2915,7 +2965,46 @@ class TestMilvusClientStructArrayInvalid(TestMilvusClientV2Base):
)
error = {
ct.err_code: 65535,
ct.err_msg: "JSON is not supported for fields in struct",
ct.err_msg: "element type JSON is not supported",
}
self.create_collection(
client,
collection_name,
schema=schema,
check_task=CheckTasks.err_res,
check_items=error,
)
@pytest.mark.tags(CaseLabel.L2)
def test_struct_with_geometry_field(self):
"""
target: test creating struct with Geometry field (should fail)
method: attempt to create struct with Geometry field
expected: creation should fail
"""
collection_name = cf.gen_unique_str(f"{prefix}_invalid")
client = self._client()
# Create schema
schema = client.create_schema(auto_id=False, enable_dynamic_field=False)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(
field_name="normal_vector", datatype=DataType.FLOAT_VECTOR, dim=default_dim
)
struct_schema = client.create_struct_field_schema()
struct_schema.add_field("geometry_field", DataType.GEOMETRY)
schema.add_field(
"struct_array",
datatype=DataType.ARRAY,
element_type=DataType.STRUCT,
struct_schema=struct_schema,
max_capacity=100,
)
error = {
ct.err_code: 65535,
ct.err_msg: "element type Geometry is not supported",
}
self.create_collection(
client,
@@ -3276,13 +3365,661 @@ class TestMilvusClientStructArrayInvalid(TestMilvusClientV2Base):
)
class TestMilvusClientStructArrayImport(TestMilvusClientV2Base):
"""Test case of struct array data import functionality using bulk_import API"""
# MinIO configuration constants
MINIO_ACCESS_KEY = "minioadmin"
MINIO_SECRET_KEY = "minioadmin"
REMOTE_DATA_PATH = "bulkinsert_data"
LOCAL_FILES_PATH = "/tmp/milvus_bulkinsert/"
@pytest.fixture(scope="function", autouse=True)
def setup_minio(self, minio_host, minio_bucket):
"""Setup MinIO configuration from fixtures"""
Path(self.LOCAL_FILES_PATH).mkdir(parents=True, exist_ok=True)
self.minio_host = minio_host
self.bucket_name = minio_bucket
self.minio_endpoint = f"{minio_host}:9000"
def gen_parquet_file(self, num_rows: int, dim: int, file_path: str) -> dict:
"""
target: test bulk insert struct array functionality
method: insert struct array data into collection
expected: insert successful
Generate parquet file with struct array data
Args:
num_rows: Number of rows to generate
dim: Dimension of vector fields
file_path: Path to save the parquet file
Returns:
Dictionary containing the generated data for verification
"""
pass
id_arr = []
float_vector_arr = []
struct_arr = []
for i in range(num_rows):
# ID field
id_arr.append(np.int64(i))
# Float vector field
raw_vector = [random.random() for _ in range(dim)]
float_vector_arr.append(np.array(raw_vector, dtype=np.float32))
# Struct array field - generate array of struct objects
arr_len = random.randint(1, 5) # Random number of struct elements (1-5)
struct_list = []
for j in range(arr_len):
struct_obj = {
"struct_str": f"struct_str_{i}_{j}_{cf.gen_unique_str()}",
"struct_float_vec": [random.random() for _ in range(dim)]
}
struct_list.append(struct_obj)
struct_arr.append(struct_list)
# Define PyArrow schema for struct field
struct_type = pa.struct([
pa.field("struct_str", pa.string()),
pa.field("struct_float_vec", pa.list_(pa.float32()))
])
# Build PyArrow arrays with explicit types
pa_arrays = {
"id": pa.array(id_arr, type=pa.int64()),
"float_vector": pa.array(
[np.array(v, dtype=np.float32) for v in float_vector_arr],
type=pa.list_(pa.float32())
),
"struct_array": pa.array(struct_arr, type=pa.list_(struct_type))
}
# Create PyArrow table and write to Parquet
table = pa.table(pa_arrays)
pq.write_table(table, file_path, row_group_size=10000)
log.info(f"Generated parquet file with {num_rows} rows: {file_path}")
return {
"id": id_arr,
"float_vector": float_vector_arr,
"struct_array": struct_arr
}
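As a side note, one quick way to sanity-check a file produced by gen_parquet_file is to read it back with PyArrow and inspect the schema; a minimal sketch, assuming the illustrative path below:

import pyarrow.parquet as pq

table = pq.read_table("/tmp/milvus_bulkinsert/test_example.parquet")  # illustrative path
print(table.num_rows)  # should match the num_rows passed to gen_parquet_file
# expected: list<struct<struct_str: string, struct_float_vec: list<float>>>
print(table.schema.field("struct_array").type)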
def gen_json_file(self, num_rows: int, dim: int, file_path: str) -> dict:
"""
Generate JSON file with struct array data in array format: [row1, row2, ...]
Args:
num_rows: Number of rows to generate
dim: Dimension of vector fields
file_path: Path to save the JSON file
Returns:
Dictionary containing the generated data for verification
"""
rows = []
id_arr = []
float_vector_arr = []
struct_arr = []
for i in range(num_rows):
# ID field
id_arr.append(i)
# Float vector field
float_vector = [random.random() for _ in range(dim)]
float_vector_arr.append(float_vector)
# Struct array field - generate array of struct objects
arr_len = random.randint(1, 5) # Random number of struct elements (1-5)
struct_list = []
for j in range(arr_len):
struct_obj = {
"struct_str": f"struct_str_{i}_{j}_{cf.gen_unique_str()}",
"struct_float_vec": [random.random() for _ in range(dim)]
}
struct_list.append(struct_obj)
struct_arr.append(struct_list)
# Build row object
row = {
"id": i,
"float_vector": float_vector,
"struct_array": struct_list
}
rows.append(row)
# Write to JSON file as array format: [row1, row2, ...]
with open(file_path, "w") as f:
json.dump(rows, f, indent=2)
log.info(f"Generated JSON file with {num_rows} rows: {file_path}")
return {
"id": id_arr,
"float_vector": float_vector_arr,
"struct_array": struct_arr
}
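A quick structural check of the file gen_json_file writes could look like the sketch below (the path is illustrative):

import json

with open("/tmp/milvus_bulkinsert/test_example.json") as f:  # illustrative path
    rows = json.load(f)
assert isinstance(rows, list)  # top level is a JSON array: [row1, row2, ...]
assert set(rows[0].keys()) == {"id", "float_vector", "struct_array"}
assert isinstance(rows[0]["struct_array"][0]["struct_float_vec"], list)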
def gen_file_with_local_bulk_writer(
self,
schema,
data: List[Dict[str, Any]],
file_type: str = "PARQUET"
) -> tuple[List[List[str]], dict]:
"""
Generate import file using LocalBulkWriter from insert-format data
Args:
schema: Collection schema
data: List of dictionaries in insert format (same format as client.insert())
file_type: Output file type, "PARQUET" or "JSON"
Returns:
Tuple of (batch file path lists generated by LocalBulkWriter, original data dict for verification)
"""
# Convert file_type string to BulkFileType enum
bulk_file_type = BulkFileType.PARQUET if file_type == "PARQUET" else BulkFileType.JSON
# Create LocalBulkWriter
writer = LocalBulkWriter(
schema=schema,
local_path=self.LOCAL_FILES_PATH,
segment_size=512 * 1024 * 1024, # 512MB
file_type=bulk_file_type
)
log.info(f"Creating {file_type} file using LocalBulkWriter with {len(data)} rows")
# Append each row using the same format as insert
for row in data:
writer.append_row(row)
# Commit to generate files
writer.commit()
# Get the generated file paths
batch_files = writer.batch_files
log.info(f"LocalBulkWriter generated files: {batch_files}")
# Extract data for verification (same format as other gen methods)
id_arr = [row["id"] for row in data]
float_vector_arr = [row["float_vector"] for row in data]
struct_arr = [row["struct_array"] for row in data]
return batch_files, {
"id": id_arr,
"float_vector": float_vector_arr,
"struct_array": struct_arr
}
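Note that writer.batch_files is a list of lists, one inner list of file paths per committed batch, which is why the LocalBulkWriter test below flattens it before uploading. Roughly (the UUID directory name is illustrative, not real output):

batch_files = [["/tmp/milvus_bulkinsert/<uuid>/1.parquet"]]  # shape only
for file_list in batch_files:
    for file_path in file_list:
        print(file_path)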
def upload_to_minio(self, local_file_path: str) -> List[List[str]]:
"""
Upload a local data file (Parquet or JSON) to MinIO
Args:
local_file_path: Local path of the file to upload
Returns:
List of remote file paths in MinIO
"""
if not os.path.exists(local_file_path):
raise Exception(f"Local file '{local_file_path}' doesn't exist")
try:
minio_client = Minio(
endpoint=self.minio_endpoint,
access_key=self.MINIO_ACCESS_KEY,
secret_key=self.MINIO_SECRET_KEY,
secure=False
)
# Check if bucket exists
if not minio_client.bucket_exists(self.bucket_name):
raise Exception(f"MinIO bucket '{self.bucket_name}' doesn't exist")
# Upload file
filename = os.path.basename(local_file_path)
minio_file_path = os.path.join(self.REMOTE_DATA_PATH, filename)
minio_client.fput_object(self.bucket_name, minio_file_path, local_file_path)
log.info(f"Uploaded file to MinIO: {minio_file_path}")
return [[minio_file_path]]
except S3Error as e:
raise Exception(f"Failed to connect MinIO server {self.minio_endpoint}, error: {e}")
def call_bulkinsert(self, collection_name: str, batch_files: List[List[str]]):
"""
Call bulk import API and wait for completion
Args:
collection_name: Target collection name
batch_files: List of file paths in MinIO
"""
url = f"http://{cf.param_info.param_host}:{cf.param_info.param_port}"
log.info(f"Starting bulk import to collection '{collection_name}'")
resp = bulk_import(
url=url,
collection_name=collection_name,
files=batch_files,
)
job_id = resp.json()['data']['jobId']
log.info(f"Bulk import job created, job_id: {job_id}")
# Wait for import to complete
timeout = 300
start_time = time.time()
while time.time() - start_time < timeout:
time.sleep(5)
resp = get_import_progress(url=url, job_id=job_id)
state = resp.json()['data']['state']
progress = resp.json()['data']['progress']
log.info(f"Import job {job_id} - state: {state}, progress: {progress}%")
if state == "Importing":
continue
elif state == "Failed":
reason = resp.json()['data']['reason']
raise Exception(f"Bulk import job {job_id} failed: {reason}")
elif state == "Completed" and progress == 100:
log.info(f"Bulk import job {job_id} completed successfully")
break
else:
raise Exception(f"Bulk import job {job_id} timeout after {timeout}s")
log.info("Bulk import finished")
def verify_data(self, client: MilvusClient, collection_name: str, original_data: dict):
"""
Verify imported data matches original data using compare_lists_with_epsilon_ignore_dict_order
Args:
client: MilvusClient instance
collection_name: Collection name
original_data: Original data dictionary for comparison
"""
log.info("============= Verifying imported data ==============")
# Query all data from the collection
num_rows = len(original_data["id"])
log.info(f"Total rows to verify: {num_rows}")
results = client.query(
collection_name=collection_name,
filter=f"id >= {min(original_data['id'])}",
output_fields=["*"],
limit=num_rows + 100 # Add buffer to ensure all data is retrieved
)
log.info(f"Query returned {len(results)} rows")
# Check if row count matches
assert len(results) == num_rows, f"Row count mismatch: expected {num_rows}, got {len(results)}"
# Convert original data to comparable format (list of dicts per row)
original_rows = []
for i in range(num_rows):
row_id = int(original_data["id"][i])
original_rows.append({
"id": row_id,
"float_vector": [float(x) for x in original_data["float_vector"][i]],
"struct_array": original_data["struct_array"][i]
})
# Convert query results to comparable format
query_rows_formatted = []
for row in results:
formatted_row = {
"id": row["id"],
"float_vector": [float(x) for x in row["float_vector"]],
"struct_array": row["struct_array"]
}
query_rows_formatted.append(formatted_row)
# Use compare_lists_with_epsilon_ignore_dict_order for comparison
# This function handles floating-point tolerance and dict order
is_equal = compare_lists_with_epsilon_ignore_dict_order(
original_rows,
query_rows_formatted,
epsilon=epsilon
)
assert is_equal, "Data verification failed: original data and query results do not match"
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [1000])
@pytest.mark.parametrize("array_capacity", [100])
def test_import_struct_array_with_parquet(self, dim, entities, array_capacity):
"""
Test bulk import of struct array data from parquet file
Collection schema: [id, float_vector, struct_array]
Struct array contains: [struct_str (VARCHAR), struct_float_vec (FLOAT_VECTOR)]
Data file format: parquet
Steps:
1. Create collection with struct array field using MilvusClient
2. Generate parquet data with PyArrow
3. Upload parquet file to MinIO
4. Import data using bulk_import API
5. Verify imported data count
6. Verify imported data integrity
"""
client = self._client()
c_name = cf.gen_unique_str("import_struct_array")
# Step 1: Create collection with struct array field
schema = client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add primary key field
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
# Add float vector field
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
# Create struct schema for array elements
struct_schema = client.create_struct_field_schema()
struct_schema.add_field("struct_str", DataType.VARCHAR, max_length=65535)
struct_schema.add_field("struct_float_vec", DataType.FLOAT_VECTOR, dim=dim)
# Add struct array field
schema.add_field(
"struct_array",
datatype=DataType.ARRAY,
element_type=DataType.STRUCT,
struct_schema=struct_schema,
max_capacity=array_capacity,
)
# Create index params
index_params = client.prepare_index_params()
# Index for regular vector field
index_params.add_index(
field_name="float_vector",
index_type="HNSW",
metric_type="COSINE",
params={"M": 16, "efConstruction": 200}
)
# Index for vector field inside struct array
# Field name format: struct_array[struct_float_vec]
index_params.add_index(
field_name="struct_array[struct_float_vec]",
index_type="HNSW",
metric_type="MAX_SIM_COSINE",
params={"M": 16, "efConstruction": 200}
)
# Create collection
client.create_collection(
collection_name=c_name,
schema=schema,
index_params=index_params
)
log.info(f"Collection '{c_name}' created")
# Step 2: Generate parquet file
local_file_path = os.path.join(self.LOCAL_FILES_PATH, f"test_{cf.gen_unique_str()}.parquet")
original_data = self.gen_parquet_file(entities, dim, local_file_path)
# Step 3: Upload to MinIO
remote_files = self.upload_to_minio(local_file_path)
# Step 4: Import data
self.call_bulkinsert(c_name, remote_files)
# Refresh load state after import
client.refresh_load(collection_name=c_name)
# Step 5: Verify number of entities
stats = client.get_collection_stats(collection_name=c_name)
num_entities = stats['row_count']
log.info(f"Collection entities: {num_entities}")
assert num_entities == entities, f"Expected {entities} entities, got {num_entities}"
# Step 6: Verify data integrity
self.verify_data(client, c_name, original_data)
log.info("Struct array import test completed successfully")
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [1000])
@pytest.mark.parametrize("array_capacity", [100])
def test_import_struct_array_with_json(self, dim, entities, array_capacity):
"""
Test bulk import of struct array data from JSON file
Collection schema: [id, float_vector, struct_array]
Struct array contains: [struct_str (VARCHAR), struct_float_vec (FLOAT_VECTOR)]
Data file format: JSON array format [row1, row2, ...]
Steps:
1. Create collection with struct array field using MilvusClient
2. Generate JSON data in array format
3. Upload JSON file to MinIO
4. Import data using bulk_import API
5. Verify imported data count
6. Verify imported data integrity
"""
client = self._client()
c_name = cf.gen_unique_str("import_struct_array_json")
# Step 1: Create collection with struct array field
schema = client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add primary key field
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
# Add float vector field
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
# Create struct schema for array elements
struct_schema = client.create_struct_field_schema()
struct_schema.add_field("struct_str", DataType.VARCHAR, max_length=65535)
struct_schema.add_field("struct_float_vec", DataType.FLOAT_VECTOR, dim=dim)
# Add struct array field
schema.add_field(
"struct_array",
datatype=DataType.ARRAY,
element_type=DataType.STRUCT,
struct_schema=struct_schema,
max_capacity=array_capacity,
)
# Create index params
index_params = client.prepare_index_params()
# Index for regular vector field
index_params.add_index(
field_name="float_vector",
index_type="HNSW",
metric_type="COSINE",
params={"M": 16, "efConstruction": 200}
)
# Index for vector field inside struct array
# Field name format: struct_array[struct_float_vec]
index_params.add_index(
field_name="struct_array[struct_float_vec]",
index_type="HNSW",
metric_type="MAX_SIM_COSINE",
params={"M": 16, "efConstruction": 200}
)
# Create collection
client.create_collection(
collection_name=c_name,
schema=schema,
index_params=index_params
)
log.info(f"Collection '{c_name}' created")
# Step 2: Generate JSON file
local_file_path = os.path.join(self.LOCAL_FILES_PATH, f"test_{cf.gen_unique_str()}.json")
original_data = self.gen_json_file(entities, dim, local_file_path)
# Step 3: Upload to MinIO
remote_files = self.upload_to_minio(local_file_path)
# Step 4: Import data
self.call_bulkinsert(c_name, remote_files)
# Refresh load state after import
client.refresh_load(collection_name=c_name)
# Step 5: Verify number of entities
stats = client.get_collection_stats(collection_name=c_name)
num_entities = stats['row_count']
log.info(f"Collection entities: {num_entities}")
assert num_entities == entities, f"Expected {entities} entities, got {num_entities}"
# Step 6: Verify data integrity
self.verify_data(client, c_name, original_data)
log.info("Struct array JSON import test completed successfully")
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [1000])
@pytest.mark.parametrize("array_capacity", [100])
@pytest.mark.parametrize("file_type", ["PARQUET", "JSON"])
@pytest.mark.xfail(reason="issue: https://github.com/milvus-io/pymilvus/issues/3050")
def test_import_struct_array_with_local_bulk_writer(self, dim, entities, array_capacity, file_type):
"""
Test bulk import of struct array data using LocalBulkWriter
This test demonstrates using LocalBulkWriter to convert insert-format data
into import files, which is much simpler than manually creating PyArrow/JSON files.
Collection schema: [id, float_vector, struct_array]
Struct array contains: [struct_str (VARCHAR), struct_float_vec (FLOAT_VECTOR)]
Steps:
1. Create collection with struct array field using MilvusClient
2. Generate insert-format data (same format as client.insert())
3. Use LocalBulkWriter to convert data to import files
4. Upload generated files to MinIO
5. Import data using bulk_import API
6. Verify imported data count and integrity
"""
client = self._client()
c_name = cf.gen_unique_str("import_lbw_struct")
# Step 1: Create collection with struct array field
schema = client.create_schema(auto_id=False, enable_dynamic_field=False)
# Add primary key field
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
# Add float vector field
schema.add_field(field_name="float_vector", datatype=DataType.FLOAT_VECTOR, dim=dim)
# Create struct schema for array elements
struct_schema = client.create_struct_field_schema()
struct_schema.add_field("struct_str", DataType.VARCHAR, max_length=65535)
struct_schema.add_field("struct_float_vec", DataType.FLOAT_VECTOR, dim=dim)
# Add struct array field
schema.add_field(
"struct_array",
datatype=DataType.ARRAY,
element_type=DataType.STRUCT,
struct_schema=struct_schema,
max_capacity=array_capacity,
)
# Create index params
index_params = client.prepare_index_params()
# Index for regular vector field
index_params.add_index(
field_name="float_vector",
index_type="HNSW",
metric_type="COSINE",
params={"M": 16, "efConstruction": 200}
)
# Index for vector field inside struct array
index_params.add_index(
field_name="struct_array[struct_float_vec]",
index_type="HNSW",
metric_type="MAX_SIM_COSINE",
params={"M": 16, "efConstruction": 200}
)
# Create collection
client.create_collection(
collection_name=c_name,
schema=schema,
index_params=index_params
)
log.info(f"Collection '{c_name}' created")
# Step 2: Generate insert-format data
log.info(f"Generating {entities} rows of insert-format data")
insert_data = []
for i in range(entities):
# Generate random number of struct elements (1-5)
arr_len = random.randint(1, 5)
struct_list = []
for j in range(arr_len):
struct_obj = {
"struct_str": f"struct_str_{i}_{j}_{cf.gen_unique_str()}",
"struct_float_vec": [random.random() for _ in range(dim)]
}
struct_list.append(struct_obj)
row = {
"id": i,
"float_vector": [random.random() for _ in range(dim)],
"struct_array": struct_list
}
insert_data.append(row)
# Step 3: Use LocalBulkWriter to convert data to import files
log.info(f"Using LocalBulkWriter to generate {file_type} files")
batch_files, original_data = self.gen_file_with_local_bulk_writer(
schema=schema,
data=insert_data,
file_type=file_type
)
# Step 4: Upload generated files to MinIO
# LocalBulkWriter generates files in a UUID subdirectory
# We need to upload all files in that directory
import_files = []
for file_list in batch_files:
for file_path in file_list:
remote_files = self.upload_to_minio(file_path)
import_files.extend(remote_files)
log.info(f"Uploaded {len(import_files)} file(s) to MinIO")
# Step 5: Import data using bulk_import API
self.call_bulkinsert(c_name, import_files)
# Refresh load state after import
client.refresh_load(collection_name=c_name)
# Step 6: Verify number of entities
stats = client.get_collection_stats(collection_name=c_name)
num_entities = stats['row_count']
log.info(f"Collection entities: {num_entities}")
assert num_entities == entities, f"Expected {entities} entities, got {num_entities}"
# Step 7: Verify data integrity
self.verify_data(client, c_name, original_data)
log.info(f"LocalBulkWriter {file_type} import test completed successfully")