test: add e2e test cases for Timestamptz (#45800)

Issue: #44518, #45756
pr: #44871, #45128, #45770, #45524, #44794, #45014

---------

Signed-off-by: Eric Hou <eric.hou@zilliz.com>
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
Co-authored-by: Eric Hou <eric.hou@zilliz.com>
Co-authored-by: zhuwenxing <wenxing.zhu@zilliz.com>
Feilong Hou 2025-11-28 18:01:09 +08:00 committed by GitHub
parent b948c62413
commit eaed10538d
15 changed files with 2071 additions and 771 deletions

File diff suppressed because it is too large


@ -22,6 +22,7 @@ RELEASE_NAME = 'test-allstandalone-pod-kill-19-25-26'
WAIT_PER_OP = 10 # time to wait in seconds between operations
CHAOS_DURATION = 120 # chaos duration time in seconds
DEFAULT_INDEX_PARAM = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
DEFAULT_EMB_LIST_INDEX_PARAM = {"index_type": "HNSW", "metric_type": "MAX_SIM_COSINE", "params": {"M": 16, "efConstruction": 200}}
DEFAULT_SEARCH_PARAM = {"metric_type": "L2", "params": {"ef": 64}}
DEFAULT_INT8_INDEX_PARAM = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
DEFAULT_INT8_SEARCH_PARAM = {"metric_type": "L2", "params": {"ef": 64}}


@ -13,6 +13,7 @@ from chaos.checker import (InsertChecker,
TextMatchChecker,
PhraseMatchChecker,
JsonQueryChecker,
GeoQueryChecker,
DeleteChecker,
AddFieldChecker,
Op,
@ -86,6 +87,7 @@ class TestOperations(TestBase):
Op.text_match: TextMatchChecker(collection_name=c_name),
Op.phrase_match: PhraseMatchChecker(collection_name=c_name),
Op.json_query: JsonQueryChecker(collection_name=c_name),
Op.geo_query: GeoQueryChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),
Op.add_field: AddFieldChecker(collection_name=c_name),
}


@ -17,12 +17,14 @@ from chaos.checker import (CollectionCreateChecker,
TextMatchChecker,
PhraseMatchChecker,
JsonQueryChecker,
GeoQueryChecker,
IndexCreateChecker,
DeleteChecker,
CollectionDropChecker,
AlterCollectionChecker,
AddFieldChecker,
CollectionRenameChecker,
TensorSearchChecker,
Op,
EventRecords,
ResultAnalyzer
@ -82,8 +84,9 @@ class TestOperations(TestBase):
checkers = {
Op.create: CollectionCreateChecker(collection_name=c_name),
Op.insert: InsertChecker(collection_name=c_name),
Op.tensor_search: TensorSearchChecker(collection_name=c_name),
Op.upsert: UpsertChecker(collection_name=c_name),
Op.partial_update: PartialUpdateChecker(collection_name=c_name),
Op.flush: FlushChecker(collection_name=c_name),
Op.index: IndexCreateChecker(collection_name=c_name),
Op.search: SearchChecker(collection_name=c_name),
@ -93,6 +96,7 @@ class TestOperations(TestBase):
Op.text_match: TextMatchChecker(collection_name=c_name),
Op.phrase_match: PhraseMatchChecker(collection_name=c_name),
Op.json_query: JsonQueryChecker(collection_name=c_name),
Op.geo_query: GeoQueryChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),
Op.drop: CollectionDropChecker(collection_name=c_name),
Op.alter_collection: AlterCollectionChecker(collection_name=c_name),


@ -588,8 +588,8 @@ class ResponseChecker:
if isinstance(query_res, list):
result = pc.compare_lists_with_epsilon_ignore_dict_order(a=query_res, b=exp_res)
if result is False:
log.debug(f"query expected: {exp_res}")
log.debug(f"query actual: {query_res}")
# For debugging only: compare the results with DeepDiff to log detailed differences
pc.compare_lists_with_epsilon_ignore_dict_order_deepdiff(a=query_res, b=exp_res)
assert result
return result
else:


@ -7,6 +7,9 @@ from utils.util_log import test_log as log
import numpy as np
from collections.abc import Iterable
import json
from datetime import datetime
from deepdiff import DeepDiff
epsilon = ct.epsilon
@ -69,6 +72,75 @@ def deep_approx_compare(x, y, epsilon=epsilon):
return x == y
import re
# Pre-compile regex patterns for better performance
_GEO_PATTERN = re.compile(r'(POINT|LINESTRING|POLYGON)\s+\(')
_WHITESPACE_PATTERN = re.compile(r'\s+')
def normalize_geo_string(s):
"""
Normalize a GEO string by removing extra whitespace.
Args:
s: String value that might be a GEO type (POINT, LINESTRING, POLYGON)
Returns:
Normalized GEO string or original value if not a GEO string
"""
if isinstance(s, str) and s.startswith(('POINT', 'LINESTRING', 'POLYGON')):
s = _GEO_PATTERN.sub(r'\1(', s)
s = _WHITESPACE_PATTERN.sub(' ', s).strip()
return s
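As an aside, a minimal usage sketch (not part of the diff) of how this normalization behaves on a WKT string with irregular spacing:
# Illustrative only: the GEO pattern drops the space before '(' and the
# whitespace pattern collapses runs of spaces.
raw = "POINT  (1.00   2.00)"
normalized = normalize_geo_string(raw)
assert normalized == "POINT(1.00 2.00)"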
def normalize_value(value):
"""
Normalize values for comparison by converting to standard types and formats.
"""
# Fast path for None and simple immutable types
if value is None or isinstance(value, (bool, int)):
return value
# Convert numpy types to Python native types
if isinstance(value, (np.integer, np.floating)):
return float(value) if isinstance(value, np.floating) else int(value)
# Handle strings (common case for GEO fields)
if isinstance(value, str):
return normalize_geo_string(value)
# Convert list-like protobuf/custom types to standard list
type_name = type(value).__name__
if type_name in ('RepeatedScalarContainer', 'HybridExtraList', 'RepeatedCompositeContainer'):
value = list(value)
# Handle list of dicts (main use case for search/query results)
if isinstance(value, (list, tuple)):
normalized_list = []
for item in value:
if isinstance(item, dict):
# Normalize GEO strings in dict values
normalized_dict = {}
for k, v in item.items():
if isinstance(v, str):
normalized_dict[k] = normalize_geo_string(v)
elif isinstance(v, (np.integer, np.floating)):
normalized_dict[k] = float(v) if isinstance(v, np.floating) else int(v)
elif isinstance(v, np.ndarray):
normalized_dict[k] = v.tolist()
elif type(v).__name__ in ('RepeatedScalarContainer', 'HybridExtraList', 'RepeatedCompositeContainer'):
normalized_dict[k] = list(v)
else:
normalized_dict[k] = v
normalized_list.append(normalized_dict)
else:
# For non-dict items, just add as-is
normalized_list.append(item)
return normalized_list
# Return as-is for other types
return value
def compare_lists_with_epsilon_ignore_dict_order(a, b, epsilon=epsilon):
"""
Compares two lists of dictionaries for equality (order-insensitive) with floating-point tolerance.
@ -87,7 +159,8 @@ def compare_lists_with_epsilon_ignore_dict_order(a, b, epsilon=epsilon):
"""
if len(a) != len(b):
return False
a = normalize_value(a)
b = normalize_value(b)
# Create a set of available indices for b
available_indices = set(range(len(b)))
@ -110,6 +183,25 @@ def compare_lists_with_epsilon_ignore_dict_order(a, b, epsilon=epsilon):
return True
def compare_lists_with_epsilon_ignore_dict_order_deepdiff(a, b, epsilon=epsilon):
"""
Compare two lists of dictionaries for equality (order-insensitive) with floating-point tolerance using DeepDiff.
"""
# Normalize both lists to handle type differences
a_normalized = normalize_value(a)
b_normalized = normalize_value(b)
for i in range(len(a_normalized)):
diff = DeepDiff(
a_normalized[i],
b_normalized[i],
ignore_order=True,
math_epsilon=epsilon,
significant_digits=1,
ignore_type_in_groups=[(list, tuple)],
ignore_string_type_changes=True,
)
if diff:
log.debug(f"[COMPARE_LISTS] Found differences at row {i}: {diff}")
def ip_check(ip):
if ip == "localhost":


@ -12,6 +12,7 @@ import uuid
from faker import Faker
from sklearn import preprocessing
from common.common_func import gen_unique_str
from common.common_func import gen_timestamptz_str
from common.minio_comm import copy_files_to_minio
from utils.util_log import test_log as log
import pyarrow as pa
@ -45,6 +46,8 @@ class DataField:
array_float_field = "array_float"
array_string_field = "array_string"
new_field = "new_field"
geo_field = "geo"
timestamp_field = "timestamptz"
class DataErrorType:
@ -100,6 +103,51 @@ def gen_binary_vectors(nb, dim):
return vectors
def gen_wkt_geometry(nb, bounds=(0, 100, 0, 100)):
"""
Generate random WKT geometry strings for bulk insert
Generates a mix of POINT, LINESTRING, and POLYGON types
Args:
nb: Number of geometry strings to generate
bounds: Coordinate bounds as (min_x, max_x, min_y, max_y)
Returns:
List of WKT strings
"""
geometries = []
geom_types = ["POINT", "LINESTRING", "POLYGON"]
for _ in range(nb):
geom_type = random.choice(geom_types)
if geom_type == "POINT":
x = random.uniform(bounds[0], bounds[1])
y = random.uniform(bounds[2], bounds[3])
wkt = f"POINT ({x:.2f} {y:.2f})"
elif geom_type == "LINESTRING":
num_points = random.randint(2, 5)
points = []
for _ in range(num_points):
x = random.uniform(bounds[0], bounds[1])
y = random.uniform(bounds[2], bounds[3])
points.append(f"{x:.2f} {y:.2f}")
wkt = f"LINESTRING ({', '.join(points)})"
else: # POLYGON
# Generate a simple rectangle polygon
x = random.uniform(bounds[0], bounds[1] - 20)
y = random.uniform(bounds[2], bounds[3] - 20)
width = random.uniform(10, 20)
height = random.uniform(10, 20)
wkt = f"POLYGON (({x:.2f} {y:.2f}, {x + width:.2f} {y:.2f}, {x + width:.2f} {y + height:.2f}, {x:.2f} {y + height:.2f}, {x:.2f} {y:.2f}))"
geometries.append(wkt)
return geometries
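A short usage sketch (not part of the diff) showing the kind of WKT strings this helper produces:
# Illustrative only: sample the generator and check the WKT prefixes.
samples = gen_wkt_geometry(5, bounds=(0, 100, 0, 100))
for wkt in samples:
    assert wkt.startswith(("POINT", "LINESTRING", "POLYGON"))
    print(wkt)
# e.g. "POINT (12.34 56.78)" or
#      "POLYGON ((10.00 20.00, 25.00 20.00, 25.00 35.00, 10.00 35.00, 10.00 20.00))"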
def gen_fp16_vectors(num, dim, for_json=False):
"""
generate float16 vector data
@ -468,6 +516,19 @@ def gen_json_in_numpy_file(dir, data_field, rows, start=0, force=False):
return file_name
def gen_geometry_in_numpy_file(dir, data_field, rows, start=0, force=False):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
if not os.path.exists(file) or force:
data = []
if rows > 0:
data = gen_wkt_geometry(rows)
arr = np.array(data)
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
return file_name
def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False, nullable=False, **kwargs):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
@ -635,6 +696,17 @@ def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128
for i in range(start, rows + start)])
else:
data = [None for _ in range(start, rows + start)]
elif data_field == DataField.geo_field:
if not nullable:
# Generate WKT geometry strings for parquet
data = gen_wkt_geometry(rows)
else:
data = [None for _ in range(start, rows + start)]
elif data_field == DataField.timestamp_field:
if not nullable:
data = [gen_timestamptz_str() for _ in range(start, rows + start)]
else:
data = [None for _ in range(start, rows + start)]
else:
raise Exception("unsupported field name")
@ -796,6 +868,17 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
d[data_field] = [gen_unique_str(str(i)) for i in range(array_length)]
else:
d[data_field] = None
elif data_field == DataField.geo_field:
if not nullable:
# Generate a single WKT geometry string
d[data_field] = gen_wkt_geometry(1)[0]
else:
d[data_field] = None
elif data_field == DataField.timestamp_field:
if not nullable:
d[data_field] = gen_timestamptz_str()
else:
d[data_field] = None
else:
raise Exception("unsupported field name")
if enable_dynamic_field:
@ -906,6 +989,8 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
file_name = gen_bool_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
elif data_field == DataField.json_field:
file_name = gen_json_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
elif data_field == DataField.geo_field:
file_name = gen_geometry_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
else:
file_name = gen_int_or_float_in_numpy_file(dir=data_source_new, data_field=data_field,
rows=rows, force=force, nullable=nullable, shuffle_pk=shuffle_pk)


@ -25,6 +25,10 @@ import bm25s
import jieba
import re
import inspect
from typing import Optional, Tuple
from zoneinfo import ZoneInfo
from datetime import datetime, timedelta, timezone as tzmod
from datetime import timezone
from pymilvus import CollectionSchema, DataType, FunctionType, Function, MilvusException, MilvusClient
@ -677,10 +681,19 @@ def gen_string_field(name=ct.default_string_field_name, description=ct.default_d
def gen_json_field(name=ct.default_json_field_name, description=ct.default_desc, is_primary=False, **kwargs):
return gen_scalar_field(DataType.JSON, name=name, description=description, is_primary=is_primary, **kwargs)
def gen_geometry_field(name=ct.default_geometry_field_name, description=ct.default_desc, is_primary=False, **kwargs):
return gen_scalar_field(DataType.GEOMETRY, name=name, description=description, is_primary=is_primary, **kwargs)
def gen_geometry_field(name="geo", description=ct.default_desc, is_primary=False, **kwargs):
return gen_scalar_field(DataType.GEOMETRY, name=name, description=description, is_primary=is_primary, **kwargs)
def gen_timestamptz_field(name=ct.default_timestamptz_field_name, description=ct.default_desc, is_primary=False, **kwargs):
return gen_scalar_field(DataType.TIMESTAMPTZ, name=name, description=description, is_primary=is_primary, **kwargs)
def gen_array_field(name=ct.default_array_field_name, element_type=DataType.INT64, max_capacity=ct.default_max_capacity,
description=ct.default_desc, is_primary=False, **kwargs):
return gen_scalar_field(DataType.ARRAY, name=name, description=description, is_primary=is_primary,
element_type=element_type, max_capacity=max_capacity, **kwargs)
@ -827,7 +840,8 @@ def gen_default_collection_schema(description=ct.default_desc, primary_field=ct.
def gen_all_datatype_collection_schema(description=ct.default_desc, primary_field=ct.default_int64_field_name,
auto_id=False, dim=ct.default_dim, enable_dynamic_field=True, nullable=True,**kwargs):
auto_id=False, dim=ct.default_dim, enable_dynamic_field=True, nullable=True,
enable_struct_array_field=True, **kwargs):
analyzer_params = {
"tokenizer": "standard",
}
@ -839,6 +853,8 @@ def gen_all_datatype_collection_schema(description=ct.default_desc, primary_fiel
gen_string_field(name="text", max_length=2000, enable_analyzer=True, enable_match=True,
analyzer_params=analyzer_params),
gen_json_field(nullable=nullable),
gen_geometry_field(nullable=nullable),
gen_timestamptz_field(nullable=nullable),
gen_array_field(name="array_int", element_type=DataType.INT64),
gen_array_field(name="array_float", element_type=DataType.FLOAT),
gen_array_field(name="array_varchar", element_type=DataType.VARCHAR, max_length=200),
@ -847,11 +863,46 @@ def gen_all_datatype_collection_schema(description=ct.default_desc, primary_fiel
gen_int8_vec_field(name="image_emb", dim=dim),
gen_float_vec_field(name="text_sparse_emb", vector_data_type=DataType.SPARSE_FLOAT_VECTOR),
gen_float_vec_field(name="voice_emb", dim=dim),
# gen_timestamptz_field(name="timestamptz", nullable=nullable),
]
schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, description=description,
primary_field=primary_field, auto_id=auto_id,
enable_dynamic_field=enable_dynamic_field, **kwargs)
# Create schema using MilvusClient
schema = MilvusClient.create_schema(
auto_id=auto_id,
enable_dynamic_field=enable_dynamic_field,
description=description,
**kwargs
)
# Add all fields using schema.add_field()
schema.add_field(primary_field, DataType.INT64, is_primary=True)
schema.add_field(ct.default_float_field_name, DataType.FLOAT, nullable=nullable)
schema.add_field(ct.default_string_field_name, DataType.VARCHAR, max_length=ct.default_max_length, nullable=nullable)
schema.add_field("document", DataType.VARCHAR, max_length=2000, enable_analyzer=True, enable_match=True, nullable=nullable)
schema.add_field("text", DataType.VARCHAR, max_length=2000, enable_analyzer=True, enable_match=True,
analyzer_params=analyzer_params)
schema.add_field(ct.default_json_field_name, DataType.JSON, nullable=nullable)
schema.add_field(ct.default_geometry_field_name, DataType.GEOMETRY, nullable=nullable)
schema.add_field(ct.default_timestamptz_field_name, DataType.TIMESTAMPTZ, nullable=nullable)
schema.add_field("array_int", DataType.ARRAY, element_type=DataType.INT64, max_capacity=ct.default_max_capacity)
schema.add_field("array_float", DataType.ARRAY, element_type=DataType.FLOAT, max_capacity=ct.default_max_capacity)
schema.add_field("array_varchar", DataType.ARRAY, element_type=DataType.VARCHAR, max_length=200, max_capacity=ct.default_max_capacity)
schema.add_field("array_bool", DataType.ARRAY, element_type=DataType.BOOL, max_capacity=ct.default_max_capacity)
schema.add_field(ct.default_float_vec_field_name, DataType.FLOAT_VECTOR, dim=dim)
schema.add_field("image_emb", DataType.INT8_VECTOR, dim=dim)
schema.add_field("text_sparse_emb", DataType.SPARSE_FLOAT_VECTOR)
# schema.add_field("voice_emb", DataType.FLOAT_VECTOR, dim=dim)
# Add struct array field
if enable_struct_array_field:
struct_schema = MilvusClient.create_struct_field_schema()
struct_schema.add_field("name", DataType.VARCHAR, max_length=200)
struct_schema.add_field("age", DataType.INT64)
struct_schema.add_field("float_vector", DataType.FLOAT_VECTOR, dim=dim)
schema.add_field("array_struct", datatype=DataType.ARRAY, element_type=DataType.STRUCT,
struct_schema=struct_schema, max_capacity=10)
# Add BM25 function
bm25_function = Function(
name=f"text",
function_type=FunctionType.BM25,
@ -860,6 +911,7 @@ def gen_all_datatype_collection_schema(description=ct.default_desc, primary_fiel
params={},
)
schema.add_function(bm25_function)
return schema
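As an aside, a minimal sketch (not part of the diff) of the MilvusClient schema-building pattern used above, reduced to the two field types this PR exercises; it assumes a pymilvus build that exposes DataType.GEOMETRY and DataType.TIMESTAMPTZ, as the diff does, and the collection and field names are placeholders:
# Illustrative schema only.
from pymilvus import MilvusClient, DataType
sketch_schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=True)
sketch_schema.add_field("id", DataType.INT64, is_primary=True)
sketch_schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=128)
sketch_schema.add_field("geometry_field", DataType.GEOMETRY, nullable=True)
sketch_schema.add_field("timestamptz_field", DataType.TIMESTAMPTZ, nullable=True)
# client = MilvusClient(uri="http://localhost:19530")   # hypothetical endpoint
# client.create_collection("demo_tz_geo", schema=sketch_schema)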
@ -1768,25 +1820,54 @@ def get_column_data_by_schema(nb=ct.default_nb, schema=None, skip_vectors=False,
return data
def convert_orm_schema_to_dict_schema(orm_schema):
"""
Convert ORM CollectionSchema object to dict format (same as describe_collection output).
Args:
orm_schema: CollectionSchema object from pymilvus.orm
Returns:
dict: Schema in dict format compatible with MilvusClient describe_collection output
"""
# Use the built-in to_dict() method which already provides the right structure
schema_dict = orm_schema.to_dict()
# to_dict() already includes:
# - auto_id
# - description
# - fields (with each field's to_dict())
# - enable_dynamic_field
# - functions (if present)
# - struct_fields (if present)
return schema_dict
def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=False, skip_field_names=[], desired_field_names=[]):
"""
Generates row data based on the given schema.
Args:
nb (int): Number of rows to generate. Defaults to ct.default_nb.
schema (Schema): Collection schema or collection info. If None, uses default schema.
schema (Schema): Collection schema or collection info. Can be:
- dict (from client.describe_collection())
- CollectionSchema object (from ORM)
- None (uses default schema)
start (int): Starting value for primary key fields. Defaults to 0.
random_pk (bool, optional): Whether to generate random primary key values (default: False)
skip_field_names(list, optional): whether to skip some field to gen data manually (default: [])
desired_field_names(list, optional): only generate data for specified field names (default: [])
Returns:
list[dict]: List of dictionaries where each dictionary represents a row,
with field names as keys and generated data as values.
Notes:
- Skips auto_id fields and function output fields.
- For primary key fields, generates sequential values starting from 'start'.
- For non-primary fields, generates random data based on field type.
- Supports struct array fields in both dict and ORM schema formats.
"""
# if both skip_field_names and desired_field_names are specified, raise an exception
if skip_field_names and desired_field_names:
@ -1795,84 +1876,96 @@ def gen_row_data_by_schema(nb=ct.default_nb, schema=None, start=0, random_pk=Fal
if schema is None:
schema = gen_default_collection_schema()
# ignore auto id field and the fields in function output
# Convert ORM schema to dict schema for unified processing
if not isinstance(schema, dict):
schema = convert_orm_schema_to_dict_schema(schema)
# Now schema is always a dict after conversion, process it uniformly
# Get all fields from schema
all_fields = schema.get('fields', [])
fields = []
for field in all_fields:
# if desired_field_names is specified, only generate the fields in desired_field_names
if field.get('name', None) in desired_field_names:
fields.append(field)
# elif desired_field_names is not specified, generate all fields
elif not desired_field_names:
fields.append(field)
# Get struct_fields from schema
struct_fields = schema.get('struct_fields', [])
log.debug(f"[gen_row_data_by_schema] struct_fields from schema: {len(struct_fields)} items")
if struct_fields:
log.debug(f"[gen_row_data_by_schema] First struct_field: {struct_fields[0]}")
# If struct_fields is not present, extract struct array fields from fields list
# This happens when using client.describe_collection()
if not struct_fields:
struct_fields = []
for field in fields:
if field.get('type') == DataType.ARRAY and field.get('element_type') == DataType.STRUCT:
# Convert field format to struct_field format
struct_field_dict = {
'name': field.get('name'),
'max_capacity': field.get('params', {}).get('max_capacity', 100),
'fields': []
}
# Get struct fields from field - key can be 'struct_fields' or 'struct_schema'
struct_field_list = field.get('struct_fields') or field.get('struct_schema')
if struct_field_list:
# If it's a dict with 'fields' key, get the fields
if isinstance(struct_field_list, dict) and 'fields' in struct_field_list:
struct_field_dict['fields'] = struct_field_list['fields']
# If it's already a list, use it directly
elif isinstance(struct_field_list, list):
struct_field_dict['fields'] = struct_field_list
struct_fields.append(struct_field_dict)
# Get function output fields to skip
func_output_fields = []
if isinstance(schema, dict):
# a dict of collection schema info is usually from client.describe_collection()
all_fields = schema.get('fields', [])
fields = []
for field in all_fields:
# if desired_field_names is specified, only generate the fields in desired_field_names
if field.get('name', None) in desired_field_names:
fields.append(field)
# elif desired_field_names is not specified, generate all fields
elif not desired_field_names:
fields.append(field)
functions = schema.get('functions', [])
for func in functions:
output_field_names = func.get('output_field_names', [])
func_output_fields.extend(output_field_names)
func_output_fields = list(set(func_output_fields))
functions = schema.get('functions', [])
for func in functions:
output_field_names = func.get('output_field_names', [])
func_output_fields.extend(output_field_names)
func_output_fields = list(set(func_output_fields))
# Filter fields that need data generation
fields_needs_data = []
for field in fields:
field_name = field.get('name', None)
if field.get('auto_id', False):
continue
if field_name in func_output_fields or field_name in skip_field_names:
continue
# Skip struct array fields as they are handled separately via struct_fields
if field.get('type') == DataType.ARRAY and field.get('element_type') == DataType.STRUCT:
continue
fields_needs_data.append(field)
fields_needs_data = []
for field in fields:
field_name = field.get('name', None)
if field.get('auto_id', False):
continue
if field_name in func_output_fields or field_name in skip_field_names:
continue
fields_needs_data.append(field)
data = []
for i in range(nb):
tmp = {}
for field in fields_needs_data:
tmp[field.get('name', None)] = gen_data_by_collection_field(field, random_pk=random_pk)
if field.get('is_primary', False) is True and field.get('type', None) == DataType.INT64:
tmp[field.get('name', None)] = start
start += 1
if field.get('is_primary', False) is True and field.get('type', None) == DataType.VARCHAR:
tmp[field.get('name', None)] = str(start)
start += 1
data.append(tmp)
else:
# a schema object is usually form orm schema object
all_fields = schema.fields
fields = []
for field in all_fields:
# if desired_field_names is specified, only generate the fields in desired_field_names
if field.name in desired_field_names:
fields.append(field)
# elif desired_field_names is not specified, generate all fields
elif not desired_field_names:
fields.append(field)
# Generate data for each row
data = []
for i in range(nb):
tmp = {}
# Generate data for regular fields
for field in fields_needs_data:
tmp[field.get('name', None)] = gen_data_by_collection_field(field, random_pk=random_pk)
# Handle primary key fields specially
if field.get('is_primary', False) is True and field.get('type', None) == DataType.INT64:
tmp[field.get('name', None)] = start
start += 1
if field.get('is_primary', False) is True and field.get('type', None) == DataType.VARCHAR:
tmp[field.get('name', None)] = str(start)
start += 1
if hasattr(schema, "functions"):
functions = schema.functions
for func in functions:
output_field_names = func.output_field_names
func_output_fields.extend(output_field_names)
func_output_fields = list(set(func_output_fields))
# Generate data for struct array fields
for struct_field in struct_fields:
field_name = struct_field.get('name', None)
struct_data = gen_struct_array_data(struct_field, start=start, random_pk=random_pk)
tmp[field_name] = struct_data
fields_needs_data = []
for field in fields:
if field.auto_id:
continue
if field.name in func_output_fields or field.name in skip_field_names:
continue
fields_needs_data.append(field)
data = []
for i in range(nb):
tmp = {}
for field in fields_needs_data:
tmp[field.name] = gen_data_by_collection_field(field, random_pk=random_pk)
if field.is_primary is True and field.dtype == DataType.INT64:
tmp[field.name] = start
start += 1
if field.is_primary is True and field.dtype == DataType.VARCHAR:
tmp[field.name] = str(start)
start += 1
data.append(tmp)
data.append(tmp)
log.debug(f"[gen_row_data_by_schema] Generated {len(data)} rows, first row keys: {list(data[0].keys()) if data else []}")
return data
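A brief usage sketch (not part of the diff): after this refactor the function accepts either a dict from client.describe_collection() or an ORM CollectionSchema, since ORM schemas are converted internally by convert_orm_schema_to_dict_schema(). A minimal call using the default schema defined in this module:
# Illustrative only: generate a few rows from the default ORM schema.
demo_schema = gen_default_collection_schema()
demo_rows = gen_row_data_by_schema(nb=5, schema=demo_schema, start=100)
print(list(demo_rows[0].keys()))  # field names present in each generated row
print(demo_rows[0])               # primary key values start at 100 and increment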
@ -1984,6 +2077,15 @@ def get_json_field_name_list(schema=None):
json_fields.append(field.name)
return json_fields
def get_geometry_field_name_list(schema=None):
geometry_fields = []
if schema is None:
schema = gen_default_collection_schema()
fields = schema.fields
for field in fields:
if field.dtype == DataType.GEOMETRY:
geometry_fields.append(field.name)
return geometry_fields
def get_binary_vec_field_name(schema=None):
if schema is None:
@ -2015,6 +2117,17 @@ def get_int8_vec_field_name_list(schema=None):
vec_fields.append(field.name)
return vec_fields
def get_emb_list_field_name_list(schema=None):
vec_fields = []
if schema is None:
schema = gen_default_collection_schema()
struct_fields = schema.struct_fields
for struct_field in struct_fields:
for field in struct_field.fields:
if field.dtype in [DataType.FLOAT_VECTOR]:
vec_fields.append(f"{struct_field.name}[{field.name}]")
return vec_fields
def get_bm25_vec_field_name_list(schema=None):
if not hasattr(schema, "functions"):
return []
@ -2052,6 +2165,40 @@ def get_dense_anns_field_name_list(schema=None):
anns_fields.append(item)
return anns_fields
def get_struct_array_vector_field_list(schema=None):
if schema is None:
schema = gen_default_collection_schema()
struct_fields = schema.struct_fields
struct_vector_fields = []
for struct_field in struct_fields:
struct_field_name = struct_field.name
# Check each sub-field for vector types
for sub_field in struct_field.fields:
sub_field_name = sub_field.name if hasattr(sub_field, 'name') else sub_field.get('name')
sub_field_dtype = sub_field.dtype if hasattr(sub_field, 'dtype') else sub_field.get('type')
if sub_field_dtype in [DataType.FLOAT_VECTOR, DataType.FLOAT16_VECTOR,
DataType.BFLOAT16_VECTOR, DataType.INT8_VECTOR,
DataType.BINARY_VECTOR]:
# Get dimension
if hasattr(sub_field, 'params'):
dim = sub_field.params.get('dim')
else:
dim = sub_field.get('params', {}).get('dim')
item = {
"struct_field": struct_field_name,
"vector_field": sub_field_name,
"anns_field": f"{struct_field_name}[{sub_field_name}]",
"dtype": sub_field_dtype,
"dim": dim
}
struct_vector_fields.append(item)
return struct_vector_fields
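For reference, each item returned by this helper is expected to have the following shape, illustrated with the "array_struct" field and its float_vector sub-field defined in gen_all_datatype_collection_schema above (values assume the default dim of 128):
# Expected item shape (illustrative only):
# {
#     "struct_field": "array_struct",
#     "vector_field": "float_vector",
#     "anns_field": "array_struct[float_vector]",
#     "dtype": DataType.FLOAT_VECTOR,
#     "dim": 128,
# }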
def gen_varchar_data(length: int, nb: int, text_mode=False):
if text_mode:
@ -2060,6 +2207,38 @@ def gen_varchar_data(length: int, nb: int, text_mode=False):
return ["".join([chr(random.randint(97, 122)) for _ in range(length)]) for _ in range(nb)]
def gen_struct_array_data(struct_field, start=0, random_pk=False):
"""
Generates struct array data based on the struct field schema.
Args:
struct_field: Either a dict (from dict schema) or StructFieldSchema object (from ORM schema)
start: Starting value for primary key fields
random_pk: Whether to generate random primary key values
Returns:
List of struct data dictionaries
"""
struct_array_data = []
# Handle both dict and object formats
if isinstance(struct_field, dict):
max_capacity = struct_field.get('max_capacity', 100)
fields = struct_field.get('fields', [])
else:
# StructFieldSchema object
max_capacity = getattr(struct_field, 'max_capacity', 100) or 100
fields = struct_field.fields
arr_len = random.randint(1, max_capacity)
for _ in range(arr_len):
struct_data = {}
for field in fields:
field_name = field.get('name') if isinstance(field, dict) else field.name
struct_data[field_name] = gen_data_by_collection_field(field, nb=None, start=start, random_pk=random_pk)
struct_array_data.append(struct_data)
return struct_array_data
def gen_data_by_collection_field(field, nb=None, start=0, random_pk=False):
"""
Generates test data for a given collection field based on its data type and properties.
@ -2085,7 +2264,8 @@ def gen_data_by_collection_field(field, nb=None, start=0, random_pk=False):
# for v2 client, it accepts a dict of field info
nullable = field.get('nullable', False)
data_type = field.get('type', None)
enable_analyzer = field.get('params').get("enable_analyzer", False)
params = field.get('params', {}) or {}
enable_analyzer = params.get("enable_analyzer", False)
is_primary = field.get('is_primary', False)
else:
# for ORM client, it accepts a field object
@ -2179,6 +2359,17 @@ def gen_data_by_collection_field(field, nb=None, start=0, random_pk=False):
else:
# gen 20% none data for nullable field
return [None if i % 2 == 0 and random.random() < 0.4 else {"name": str(i), "address": i, "count": random.randint(0, 100)} for i in range(nb)]
elif data_type == DataType.GEOMETRY:
if nb is None:
lon = random.uniform(-180, 180)
lat = random.uniform(-90, 90)
return f"POINT({lon} {lat})" if random.random() < 0.8 or nullable is False else None
if nullable is False:
return [f"POINT({random.uniform(-180, 180)} {random.uniform(-90, 90)})" for _ in range(nb)]
else:
# gen 20% none data for nullable field
return [None if i % 2 == 0 and random.random() < 0.4 else f"POINT({random.uniform(-180, 180)} {random.uniform(-90, 90)})" for i in range(nb)]
elif data_type in ct.all_vector_types:
if isinstance(field, dict):
dim = ct.default_dim if data_type == DataType.SPARSE_FLOAT_VECTOR else field.get('params')['dim']
@ -2193,9 +2384,16 @@ def gen_data_by_collection_field(field, nb=None, start=0, random_pk=False):
elif data_type == DataType.ARRAY:
if isinstance(field, dict):
max_capacity = field.get('params')['max_capacity']
element_type = field.get('element_type')
else:
max_capacity = field.params['max_capacity']
element_type = field.element_type
element_type = field.element_type
# Struct array fields are handled separately in gen_row_data_by_schema
# by processing struct_fields, so skip here
if element_type == DataType.STRUCT:
return None
if element_type == DataType.INT8:
if nb is None:
return [random.randint(-128, 127) for _ in range(max_capacity)] if random.random() < 0.8 or nullable is False else None
@ -2266,10 +2464,52 @@ def gen_data_by_collection_field(field, nb=None, start=0, random_pk=False):
else:
# gen 20% none data for nullable field
return [None if i % 2 == 0 and random.random() < 0.4 else "".join([chr(random.randint(97, 122)) for _ in range(length)]) for i in range(nb)]
elif data_type == DataType.TIMESTAMPTZ:
if nb is None:
return gen_timestamptz_str() if random.random() < 0.8 or nullable is False else None
if nullable is False:
return [gen_timestamptz_str() for _ in range(nb)]
# gen 20% none data for nullable field
return [None if i % 2 == 0 and random.random() < 0.4 else gen_timestamptz_str() for i in range(nb)]
else:
raise MilvusException(message=f"gen data failed, data type {data_type} not implemented")
return None
def gen_timestamptz_str():
"""
Generate a timestamptz string
Example:
"2024-12-31 22:00:00"
"2024-12-31T22:00:00"
"2024-12-31T22:00:00+08:00"
"2024-12-31T22:00:00-08:00"
"2024-12-31T22:00:00Z"
"""
base = datetime(2024, 1, 1, tzinfo=timezone.utc) + timedelta(
days=random.randint(0, 365 * 3), seconds=random.randint(0, 86399)
)
# 2/3 chance to generate timezone-aware string, otherwise naive
if random.random() < 2 / 3:
# 20% chance to use 'Z' (UTC), always RFC3339 with 'T'
if random.random() < 0.2:
return base.strftime("%Y-%m-%dT%H:%M:%S") + "Z"
# otherwise use explicit offset
offset_hours = random.randint(-12, 14)
if offset_hours == -12 or offset_hours == 14:
offset_minutes = 0
else:
offset_minutes = random.choice([0, 30])
tz = timezone(timedelta(hours=offset_hours, minutes=offset_minutes))
local_dt = base.astimezone(tz)
tz_str = local_dt.strftime("%z") # "+0800"
tz_str = tz_str[:3] + ":" + tz_str[3:] # "+08:00"
dt_str = local_dt.strftime("%Y-%m-%dT%H:%M:%S")
return dt_str + tz_str
else:
# naive time string (no timezone), e.g. "2024-12-31 22:00:00"
return base.strftime("%Y-%m-%d %H:%M:%S")
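A short sketch (not part of the diff) of how the generated strings can be parsed back with the standard library; the trailing 'Z' form needs the same '+00:00' replacement that convert_timestamptz applies below. parse_timestamptz is a hypothetical helper used only for illustration:
# Illustrative round-trip check for the formats listed in the docstring.
from datetime import datetime
def parse_timestamptz(ts: str) -> datetime:
    s = ts.strip().replace(" ", "T", 1)
    if s.endswith(("Z", "z")):
        # datetime.fromisoformat() on older Pythons does not accept 'Z'
        s = s[:-1] + "+00:00"
    return datetime.fromisoformat(s)
for example in ("2024-12-31 22:00:00",
                "2024-12-31T22:00:00+08:00",
                "2024-12-31T22:00:00Z"):
    print(parse_timestamptz(example))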
def gen_varchar_values(nb: int, length: int = 0):
return ["".join([chr(random.randint(97, 122)) for _ in range(length)]) for _ in range(nb)]
@ -3968,102 +4208,206 @@ def parse_fmod(x: int, y: int) -> int:
return v if x >= 0 else -v
def gen_partial_row_data_by_schema(nb=ct.default_nb, schema=None, desired_field_names=None, num_fields=1,
start=0, random_pk=False, skip_field_names=[]):
def convert_timestamptz(rows, timestamptz_field_name, timezone="UTC"):
"""
Generate row data that contains a subset of fields from the given schema.
Convert timestamptz strings in rows to the desired timezone's string format
Args:
schema: Collection schema or collection info dict. If None, uses default schema.
desired_field_names (list[str] | None): Explicit field names to include (intersected with eligible fields).
num_fields (int): Number of fields to include if desired_field_names is not provided. Defaults to 1.
start (int): Starting value for primary key fields when sequential values are needed.
random_pk (bool): Whether to generate random primary key values.
skip_field_names (list[str]): Field names to skip.
nb (int): Number of rows to generate. Defaults to 1.
rows: list of rows data with timestamptz string
timestamptz_field_name: name of the timestamptz field
timezone: timezone to convert to (default: UTC)
Returns:
list[dict]: a list of rows.
Notes:
- Skips auto_id fields and function output fields.
- Primary INT64/VARCHAR fields get sequential values from `start` unless `random_pk=True`.
- Works with both schema dicts (from v2 client describe_collection) and ORM schema objects.
list of rows data with timestamptz string converted to desired timezone string
Note:
Naive timestamps (e.g. ``YYYY-MM-DD HH:MM:SS`` with no offset information)
are treated as already expressed in the desired timezone. In those cases we
simply append the correct offset for the provided timezone instead of
converting from UTC first.
"""
if schema is None:
schema = gen_default_collection_schema()
func_output_fields = []
# Build list of eligible fields
if isinstance(schema, dict):
fields = schema.get('fields', [])
functions = schema.get('functions', [])
for func in functions:
output_field_names = func.get('output_field_names', [])
func_output_fields.extend(output_field_names)
func_output_fields = list(set(func_output_fields))
eligible_fields = []
for field in fields:
field_name = field.get('name', None)
if field.get('auto_id', False):
continue
if field_name in func_output_fields or field_name in skip_field_names:
continue
eligible_fields.append(field)
# Choose subset
if desired_field_names:
desired_set = set(desired_field_names)
chosen_fields = [f for f in eligible_fields if f.get('name') in desired_set]
iso_offset_re = re.compile(r"([+-])(\d{2}):(\d{2})$")
def _days_in_month(year: int, month: int) -> int:
if month in (1, 3, 5, 7, 9, 10, 12):
return 31
if month in (4, 6, 8, 11):
return 30
# February
is_leap = (year % 4 == 0 and (year % 100 != 0 or year % 400 == 0))
return 29 if is_leap else 28
def _parse_basic(ts: str) -> Tuple[int, int, int, int, int, int, Optional[Tuple[str, int, int]], bool]:
s = ts.strip()
s = s.replace(" ", "T", 1)
has_z = False
if s.endswith("Z") or s.endswith("z"):
has_z = True
s = s[:-1]
# split offset if present
m = iso_offset_re.search(s)
offset = None
if m:
sign, hh, mm = m.groups()
offset = (sign, int(hh), int(mm))
s = s[:m.start()]
# now s like YYYY-MM-DDTHH:MM:SS or with fractional seconds
if "T" not in s:
raise ValueError(f"Invalid timestamp string: {ts}")
date_part, time_part = s.split("T", 1)
y_str, mon_str, d_str = date_part.split("-")
# strip fractional seconds
if "." in time_part:
time_part = time_part.split(".", 1)[0]
hh_str, mi_str, se_str = time_part.split(":")
return int(y_str), int(mon_str), int(d_str), int(hh_str), int(mi_str), int(se_str), offset, has_z
def _apply_offset_to_utc(year: int, month: int, day: int, hour: int, minute: int, second: int, offset: Tuple[str, int, int]) -> Tuple[int, int, int, int, int, int]:
sign, oh, om = offset
# local time -> UTC
delta_minutes = oh * 60 + om
if sign == '+':
# UTC = local - offset
delta_minutes = -delta_minutes
else:
n = max(0, min(len(eligible_fields), num_fields if num_fields is not None else 1))
chosen_fields = eligible_fields[:n]
rows = []
curr_start = start
for _ in range(nb):
row = {}
for field in chosen_fields:
fname = field.get('name', None)
value = gen_data_by_collection_field(field, random_pk=random_pk)
# Override for PKs when not random
if not random_pk and field.get('is_primary', False) is True:
if field.get('type', None) == DataType.INT64:
value = curr_start
curr_start += 1
elif field.get('type', None) == DataType.VARCHAR:
value = str(curr_start)
curr_start += 1
row[fname] = value
rows.append(row)
return rows
# ORM schema path
fields = schema.fields
if hasattr(schema, "functions"):
functions = schema.functions
for func in functions:
func_output_fields.extend(func.output_field_names)
func_output_fields = list(set(func_output_fields))
eligible_fields = []
for field in fields:
if field.auto_id:
continue
if field.name in func_output_fields or field.name in skip_field_names:
continue
eligible_fields.append(field)
if desired_field_names:
desired_set = set(desired_field_names)
chosen_fields = [f for f in eligible_fields if f.name in desired_set]
else:
n = max(0, min(len(eligible_fields), num_fields if num_fields is not None else 1))
chosen_fields = eligible_fields[:n]
rows = []
curr_start = start
for _ in range(nb):
row = {}
for field in chosen_fields:
value = gen_data_by_collection_field(field, random_pk=random_pk)
if not random_pk and field.is_primary is True:
if field.dtype == DataType.INT64:
value = curr_start
curr_start += 1
elif field.dtype == DataType.VARCHAR:
value = str(curr_start)
curr_start += 1
row[field.name] = value
rows.append(row)
return rows
# sign '-' means local is behind UTC; UTC = local + offset
delta_minutes = +delta_minutes
# apply minutes
total_minutes = hour * 60 + minute + delta_minutes
new_hour = hour
new_minute = minute
carry_days = 0
# normalize down
if total_minutes < 0:
carry_days = (total_minutes - 59) // (60 * 24) # negative floor division
total_minutes -= carry_days * 60 * 24
else:
carry_days = total_minutes // (60 * 24)
total_minutes = total_minutes % (60 * 24)
new_hour = total_minutes // 60
new_minute = total_minutes % 60
# seconds unchanged here
# apply day carry
day += carry_days
# normalize date
while True:
if day <= 0:
month -= 1
if month == 0:
month = 12
year -= 1
day += _days_in_month(year, month)
else:
dim = _days_in_month(year, month)
if day > dim:
day -= dim
month += 1
if month == 13:
month = 1
year += 1
else:
break
return year, month, day, new_hour, new_minute, second
def _format_with_offset_str(dt: datetime) -> str:
# format with colon in tz offset
if dt.tzinfo is not None and dt.utcoffset() == tzmod.utc.utcoffset(dt):
return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
s = dt.strftime('%Y-%m-%dT%H:%M:%S%z') # +0800
if len(s) >= 5:
return s[:-5] + s[-5:-2] + ':' + s[-2:]
return s
def _format_fixed(y: int, m: int, d: int, hh: int, mi: int, ss: int, offset_minutes: int) -> str:
if offset_minutes == 0:
return f"{y:04d}-{m:02d}-{d:02d}T{hh:02d}:{mi:02d}:{ss:02d}Z"
sign = '+' if offset_minutes >= 0 else '-'
total = abs(offset_minutes)
oh, om = divmod(total, 60)
return f"{y:04d}-{m:02d}-{d:02d}T{hh:02d}:{mi:02d}:{ss:02d}{sign}{oh:02d}:{om:02d}"
def convert_one(ts: str) -> str:
# Try python builtins first for typical range 1..9999
raw = ts.strip()
# normalize space separator and 'Z'
norm = raw.replace(' ', 'T', 1)
if norm.endswith('Z') or norm.endswith('z'):
norm = norm[:-1] + '+00:00'
try:
dt = None
if iso_offset_re.search(norm):
# aware input; convert to target zone
dt = datetime.fromisoformat(norm)
dt_target = dt.astimezone(ZoneInfo(timezone))
return _format_with_offset_str(dt_target)
else:
y, mo, d, hh, mi, ss, _, _ = _parse_basic(raw)
if not (1 <= y <= 9999):
raise ValueError("year out of range for datetime")
tzinfo = ZoneInfo(timezone)
dt_local = datetime(y, mo, d, hh, mi, ss, tzinfo=tzinfo)
return _format_with_offset_str(dt_local)
except Exception:
# manual fallback (handles year 0 and overflow beyond 9999)
y, mo, d, hh, mi, ss, offset, has_z = _parse_basic(raw)
if offset is None and not has_z:
# naive input outside datetime supported range; attach offset only
target_minutes = 0
try:
tzinfo = ZoneInfo(timezone)
ref_year = 2004 # leap year to keep Feb 29 valid
ref_dt = datetime(ref_year, mo, d, hh, mi, ss, tzinfo=tzinfo)
off_td = ref_dt.utcoffset()
if off_td is not None:
target_minutes = int(off_td.total_seconds() // 60)
except Exception:
if timezone == 'Asia/Shanghai':
target_minutes = 480
return _format_fixed(y, mo, d, hh, mi, ss, target_minutes)
# compute UTC components first
if offset is None and has_z:
uy, um, ud, uh, umi, uss = y, mo, d, hh, mi, ss
elif offset is None:
# already handled above, but keep safety fallback to just append offset
if 1 <= y <= 9999:
tzinfo = ZoneInfo(timezone)
dt_local = datetime(y, mo, d, hh, mi, ss, tzinfo=tzinfo)
return _format_with_offset_str(dt_local)
target_minutes = 480 if timezone == 'Asia/Shanghai' else 0
return _format_fixed(y, mo, d, hh, mi, ss, target_minutes)
else:
uy, um, ud, uh, umi, uss = _apply_offset_to_utc(y, mo, d, hh, mi, ss, offset)
# convert UTC to target timezone if feasible
try:
if 1 <= uy <= 9999:
dt_utc = datetime(uy, um, ud, uh, umi, uss, tzinfo=tzmod.utc)
dt_target = dt_utc.astimezone(ZoneInfo(timezone))
return _format_with_offset_str(dt_target)
except Exception:
pass
# fallback: manually apply timezone offset when datetime conversion fails
# Get target timezone offset
target_minutes = 480 if timezone == 'Asia/Shanghai' else 0
try:
# Try to get actual offset from timezone if possible
if 1 <= uy <= 9999:
test_dt = datetime(uy, um, ud, uh, umi, uss, tzinfo=tzmod.utc)
test_target = test_dt.astimezone(ZoneInfo(timezone))
off_td = test_target.utcoffset() or tzmod.utc.utcoffset(test_target)
target_minutes = int(off_td.total_seconds() // 60)
except Exception:
pass
# Convert UTC to local time: UTC + offset = local
# Reverse the offset sign to convert UTC->local (opposite of local->UTC)
reverse_sign = '-' if target_minutes >= 0 else '+'
ty, tm, td, th, tmi, ts = _apply_offset_to_utc(uy, um, ud, uh, umi, uss, (reverse_sign, abs(target_minutes) // 60, abs(target_minutes) % 60))
return _format_fixed(ty, tm, td, th, tmi, ts, target_minutes)
new_rows = []
for row in rows:
if isinstance(row, dict) and timestamptz_field_name in row and isinstance(row[timestamptz_field_name], str):
row = row.copy()
row[timestamptz_field_name] = convert_one(row[timestamptz_field_name])
new_rows.append(row)
return new_rows
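A hypothetical usage sketch (not part of the diff): converting a naive string and a UTC string to Asia/Shanghai, following the rules in the docstring (naive inputs are treated as already local, aware inputs are converted). The field name is used only for illustration:
# Illustrative only.
rows = [
    {"id": 1, "timestamptz_field": "2024-12-31 22:00:00"},   # naive
    {"id": 2, "timestamptz_field": "2024-12-31T22:00:00Z"},  # UTC
]
converted = convert_timestamptz(rows, "timestamptz_field", timezone="Asia/Shanghai")
# Expected under the rules above:
#   row 1 -> "2024-12-31T22:00:00+08:00"  (offset appended, clock unchanged)
#   row 2 -> "2025-01-01T06:00:00+08:00"  (converted from UTC)
print(converted)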


@ -12,6 +12,7 @@ default_dim = 128
default_nb = 2000
default_nb_medium = 5000
default_max_capacity = 100
default_max_length = 500
default_top_k = 10
default_nq = 2
default_limit = 10
@ -39,6 +40,8 @@ default_float_field_name = "float"
default_double_field_name = "double"
default_string_field_name = "varchar"
default_json_field_name = "json_field"
default_geometry_field_name = "geometry_field"
default_timestamptz_field_name = "timestamptz_field"
default_array_field_name = "int_array"
default_int8_array_field_name = "int8_array"
default_int16_array_field_name = "int16_array"


@ -111,7 +111,7 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
"""
# step 1: create collection with all datatype schema
client = self._client()
schema = cf.gen_all_datatype_collection_schema(dim=default_dim)
schema = cf.gen_all_datatype_collection_schema(dim=default_dim, enable_struct_array_field=False)
index_params = self.prepare_index_params(client)[0]
text_sparse_emb_field_name = "text_sparse_emb"
@ -141,7 +141,8 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
vector_field_type = [DataType.FLOAT16_VECTOR,
DataType.BFLOAT16_VECTOR,
DataType.INT8_VECTOR]
DataType.INT8_VECTOR,
DataType.FLOAT_VECTOR]
# fields to be updated
update_fields_name = []
scalar_update_name = []
@ -163,6 +164,7 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
expected = [{field: new_rows[i][field] for field in scalar_update_name}
for i in range(default_nb)]
expected = cf.convert_timestamptz(expected, ct.default_timestamptz_field_name, "UTC")
result = self.query(client, collection_name, filter=f"{primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
output_fields=scalar_update_name,
@ -201,7 +203,7 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
collection_name = cf.gen_collection_name_by_testcase_name()
# Create schema with all data types
schema = cf.gen_all_datatype_collection_schema(dim=dim)
schema = cf.gen_all_datatype_collection_schema(dim=dim, enable_struct_array_field=False)
# Create index parameters
index_params = client.prepare_index_params()
@ -231,7 +233,7 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
primary_key_field_name = schema.fields[0].name
for i in range(len(schema.fields)):
update_field_name = schema.fields[i if i != 0 else 1].name
new_row = cf.gen_partial_row_data_by_schema(nb=nb, schema=schema,
new_row = cf.gen_row_data_by_schema(nb=nb, schema=schema,
desired_field_names=[primary_key_field_name, update_field_name])
client.upsert(collection_name, new_row, partial_update=True)
@ -446,7 +448,7 @@ class TestMilvusClientPartialUpdateValid(TestMilvusClientV2Base):
self.upsert(client, collection_name, rows, partial_update=True)
# step 3: Partial Update the nullable field with null
new_row = cf.gen_partial_row_data_by_schema(
new_row = cf.gen_row_data_by_schema(
nb=default_nb,
schema=schema,
desired_field_names=[default_primary_key_field_name, default_int32_field_name],


@ -2659,7 +2659,7 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
def test_upsert_struct_array_data(self):
"""
target: test upsert operation with struct array data
method: insert data then upsert with modified struct array
method: insert 2000 records and flush them, insert 1000 more as growing data, then upsert modified struct array data in both segments
expected: data successfully upserted
"""
collection_name = cf.gen_unique_str(f"{prefix}_crud")
@ -2669,25 +2669,50 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
# Create collection
self.create_collection_with_schema(client, collection_name)
# Initial insert
initial_data = [
{
"id": 1,
# Insert 2000 records for flushed data
flushed_data = []
for i in range(2000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"scalar_field": 100,
"label": "initial",
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"flushed_{i}",
}
],
}
]
flushed_data.append(row)
res, check = self.insert(client, collection_name, initial_data)
res, check = self.insert(client, collection_name, flushed_data)
assert check
assert res["insert_count"] == 2000
# Flush to persist data
res, check = self.flush(client, collection_name)
assert check
# Insert 1000 records for growing data
growing_data = []
for i in range(2000, 3000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"growing_{i}",
}
],
}
growing_data.append(row)
res, check = self.insert(client, collection_name, growing_data)
assert check
assert res["insert_count"] == 1000
# create index and load collection
index_params = client.prepare_index_params()
index_params.add_index(
@ -2707,40 +2732,63 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
res, check = self.load_collection(client, collection_name)
assert check
# Upsert with modified data
upsert_data = [
{
"id": 1, # Same ID
# Upsert data in both flushed and growing segments
upsert_data = []
# Upsert 10 records from flushed data
for i in range(0, 10):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"scalar_field": 200, # Modified
"label": "updated", # Modified
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i + 10000, # Modified
"label": f"updated_flushed_{i}", # Modified
}
],
}
]
upsert_data.append(row)
# Upsert 10 records from growing data
for i in range(2000, 2010):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i + 10000, # Modified
"label": f"updated_growing_{i}", # Modified
}
],
}
upsert_data.append(row)
res, check = self.upsert(client, collection_name, upsert_data)
assert check
# Verify upsert worked
# Verify upsert worked for flushed data
res, check = self.flush(client, collection_name)
assert check
results, check = self.query(client, collection_name, filter="id == 1")
results, check = self.query(client, collection_name, filter="id < 10")
assert check
assert len(results) == 1
assert results[0]["clips"][0]["label"] == "updated"
assert len(results) == 10
for result in results:
assert "updated_flushed" in result["clips"][0]["label"]
# Verify upsert worked for growing data
results, check = self.query(client, collection_name, filter="id >= 2000 and id < 2010")
assert check
assert len(results) == 10
for result in results:
assert "updated_growing" in result["clips"][0]["label"]
@pytest.mark.tags(CaseLabel.L0)
def test_delete_struct_array_data(self):
"""
target: test delete operation with struct array data
method: insert struct array data then delete by ID
method: insert 3000 records (2000 flushed + 1000 growing), then delete by ID from both segments
expected: data successfully deleted
"""
collection_name = cf.gen_unique_str(f"{prefix}_crud")
@ -2750,25 +2798,50 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
# Create collection and insert data
self.create_collection_with_schema(client, collection_name)
data = []
for i in range(10):
# Insert 2000 records for flushed data
flushed_data = []
for i in range(2000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"label_{i}",
"label": f"flushed_{i}",
}
],
}
data.append(row)
flushed_data.append(row)
res, check = self.insert(client, collection_name, data)
res, check = self.insert(client, collection_name, flushed_data)
assert check
assert res["insert_count"] == 2000
# Flush to persist data
res, check = self.flush(client, collection_name)
assert check
# Insert 1000 records for growing data
growing_data = []
for i in range(2000, 3000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"growing_{i}",
}
],
}
growing_data.append(row)
res, check = self.insert(client, collection_name, growing_data)
assert check
assert res["insert_count"] == 1000
# create index and load collection
index_params = client.prepare_index_params()
index_params.add_index(
@ -2788,9 +2861,14 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
res, check = self.load_collection(client, collection_name)
assert check
# Delete some records
delete_ids = [1, 3, 5]
res, check = self.delete(client, collection_name, filter=f"id in {delete_ids}")
# Delete some records from flushed segment
delete_flushed_ids = [1, 3, 5, 100, 500, 1000]
res, check = self.delete(client, collection_name, filter=f"id in {delete_flushed_ids}")
assert check
# Delete some records from growing segment
delete_growing_ids = [2001, 2003, 2500, 2999]
res, check = self.delete(client, collection_name, filter=f"id in {delete_growing_ids}")
assert check
# Verify deletion
@ -2801,14 +2879,21 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
assert check
remaining_ids = {result["id"] for result in results}
for delete_id in delete_ids:
# Verify flushed data deletion
for delete_id in delete_flushed_ids:
assert delete_id not in remaining_ids
# Verify growing data deletion
for delete_id in delete_growing_ids:
assert delete_id not in remaining_ids
# Verify total count is correct (3000 - 10 deleted)
assert len(results) == 2990
@pytest.mark.tags(CaseLabel.L1)
def test_batch_operations(self):
"""
target: test batch insert/upsert operations with struct array
method: perform large batch operations
method: insert 3000 records (2000 flushed + 1000 growing), then perform batch upsert
expected: all operations successful
"""
collection_name = cf.gen_unique_str(f"{prefix}_crud")
@ -2818,42 +2903,77 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
# Create collection
self.create_collection_with_schema(client, collection_name)
# Large batch insert
batch_size = 1000
data = []
for i in range(batch_size):
# Insert 2000 records for flushed data
flushed_data = []
for i in range(2000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i % 100,
"label": f"batch_{i}",
"label": f"flushed_{i}",
}
],
}
data.append(row)
flushed_data.append(row)
res, check = self.insert(client, collection_name, data)
res, check = self.insert(client, collection_name, flushed_data)
assert check
assert res["insert_count"] == batch_size
assert res["insert_count"] == 2000
# Batch upsert (update first 100 records)
# Flush to persist data
res, check = self.flush(client, collection_name)
assert check
# Insert 1000 records for growing data
growing_data = []
for i in range(2000, 3000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i % 100,
"label": f"growing_{i}",
}
],
}
growing_data.append(row)
res, check = self.insert(client, collection_name, growing_data)
assert check
assert res["insert_count"] == 1000
# Batch upsert (update first 100 flushed records and 50 growing records)
upsert_data = []
# Update first 100 flushed records
for i in range(100):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i + 1000, # Modified
"label": f"upserted_{i}", # Modified
"label": f"upserted_flushed_{i}", # Modified
}
],
}
upsert_data.append(row)
# Update first 50 growing records
for i in range(2000, 2050):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i + 1000, # Modified
"label": f"upserted_growing_{i}", # Modified
}
],
}
@ -2862,11 +2982,15 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
res, check = self.upsert(client, collection_name, upsert_data)
assert check
# Verify upsert success with flush
res, check = self.flush(client, collection_name)
assert check
@pytest.mark.tags(CaseLabel.L1)
def test_collection_operations(self):
"""
target: test collection operations (load/release/drop) with struct array
method: perform collection management operations
method: insert 3000 records (2000 flushed + 1000 growing), then perform collection management operations
expected: all operations successful
"""
collection_name = cf.gen_unique_str(f"{prefix}_crud")
@ -2876,25 +3000,49 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
# Create collection with data
self.create_collection_with_schema(client, collection_name)
# Insert some data
data = [
{
"id": 1,
# Insert 2000 records for flushed data
flushed_data = []
for i in range(2000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [
random.random() for _ in range(default_dim)
],
"scalar_field": 100,
"label": "test",
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"flushed_{i}",
}
],
}
]
flushed_data.append(row)
res, check = self.insert(client, collection_name, data)
res, check = self.insert(client, collection_name, flushed_data)
assert check
assert res["insert_count"] == 2000
# Flush to persist data
res, check = self.flush(client, collection_name)
assert check
# Insert 1000 records for growing data
growing_data = []
for i in range(2000, 3000):
row = {
"id": i,
"normal_vector": [random.random() for _ in range(default_dim)],
"clips": [
{
"clip_embedding1": [random.random() for _ in range(default_dim)],
"scalar_field": i,
"label": f"growing_{i}",
}
],
}
growing_data.append(row)
res, check = self.insert(client, collection_name, growing_data)
assert check
assert res["insert_count"] == 1000
# Create index for loading
index_params = client.prepare_index_params()
@ -2922,6 +3070,11 @@ class TestMilvusClientStructArrayCRUD(TestMilvusClientV2Base):
load_state = client.get_load_state(collection_name)
assert str(load_state["state"]) == "Loaded"
# Query to verify both flushed and growing data are accessible
results, check = self.query(client, collection_name, filter="id >= 0", limit=3000)
assert check
assert len(results) == 3000
# Release collection
res, check = self.release_collection(client, collection_name)
assert check

View File

@ -43,7 +43,6 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_UTC(self):
"""
target: Test timestamptz can be successfully inserted and queried
@ -81,18 +80,66 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_Asia_Shanghai(self):
# BUG: https://github.com/milvus-io/milvus/issues/44595
def test_milvus_client_timestamptz_alter_database_property(self):
"""
target: Test timestamptz can be successfully inserted and queried
method:
1. Create a collection
1. Create a collection and alter database properties
2. Generate rows with timestamptz and insert the rows
3. Query the rows
expected: Step 3 should succeed
"""
# step 1: create collection
IANA_timezone = "America/New_York"
client = self._client()
db_name = cf.gen_unique_str("db")
collection_name = cf.gen_collection_name_by_testcase_name()
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
index_params.add_index(default_timestamp_field_name, index_type="AUTOINDEX")
self.create_database(client, db_name)
self.use_database(client, db_name)
self.alter_database_properties(client, db_name, properties={"timezone": IANA_timezone})
prop = self.describe_database(client, db_name)
assert prop[0]["timezone"] == IANA_timezone
self.create_collection(client, collection_name, default_dim, schema=schema,
consistency_level="Strong", index_params=index_params)
prop = self.describe_collection(client, collection_name)[0].get("properties")
assert prop["timezone"] == IANA_timezone
# step 2: generate rows and insert the rows
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.insert(client, collection_name, rows)
# step 3: query the rows
new_rows = cf.convert_timestamptz(rows, default_timestamp_field_name, IANA_timezone)
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: new_rows,
"pk_name": default_primary_key_field_name})
self.drop_collection(client, collection_name)
self.drop_database(client, db_name)
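# A minimal sketch of the database-level timezone property exercised above, assuming
# pymilvus MilvusClient exposes the same database helpers as the wrappers in this suite;
# the "timezone" property key and its inheritance by new collections are part of the
# Timestamptz feature under test, so treat the exact behaviour as an assumption.
from pymilvus import MilvusClient

client = MilvusClient(uri="http://localhost:19530")
client.create_database("tz_demo_db")
client.use_database("tz_demo_db")
client.alter_database_properties("tz_demo_db", properties={"timezone": "America/New_York"})
print(client.describe_database("tz_demo_db"))   # expected to report timezone=America/New_York
# Collections created in this database afterwards are expected to inherit the timezone,
# so TIMESTAMPTZ values are rendered as America/New_York local times by default.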
@pytest.mark.tags(CaseLabel.L0)
def test_milvus_client_timestamptz_alter_collection_property(self):
"""
target: Test timestamptz can be successfully inserted and queried
method:
1. Create a collection and alter collection properties
2. Generate rows with timestamptz and insert the rows
3. Query the rows
expected: Step 3 should succeed
"""
# step 1: create collection
IANA_timezone = "America/New_York"
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
schema = self.create_schema(client, enable_dynamic_field=False)[0]
@ -106,24 +153,251 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.create_collection(client, collection_name, default_dim, schema=schema,
consistency_level="Strong", index_params=index_params)
db_name = self.list_databases(client)[0]
self.alter_database_properties(client, db_name, properties={"database.timezone": "Asia/Shanghai"})
# step 2: generate rows and insert the rows
# step 2: alter collection properties
self.alter_collection_properties(client, collection_name, properties={"timezone": IANA_timezone})
prop = self.describe_collection(client, collection_name)[0].get("properties")
assert prop["timezone"] == IANA_timezone
# step 3: generate rows and insert the rows
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.insert(client, collection_name, rows)
# step 3: query the rows
rows = cf.convert_timestamptz(rows, default_timestamp_field_name, "Asia/Shanghai")
# step 4: query the rows
new_rows = cf.convert_timestamptz(rows, default_timestamp_field_name, IANA_timezone)
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: rows,
"pk_name": default_primary_key_field_name})
check_task=CheckTasks.check_query_results,
check_items={exp_res: new_rows,
"pk_name": default_primary_key_field_name})
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_alter_collection_property_after_insert(self):
"""
target: Test timestamptz can be successfully inserted and queried after alter collection properties
method:
1. Create a collection and insert the rows
2. Alter collection properties
3. Query the rows and verify the new timezone is applied
expected: Step 3 should succeed
"""
# step 1: create collection
IANA_timezone = "America/New_York"
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
index_params.add_index(default_timestamp_field_name, index_type="AUTOINDEX")
self.create_collection(client, collection_name, default_dim, schema=schema,
consistency_level="Strong", index_params=index_params)
# step 2: insert the rows
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.insert(client, collection_name, rows)
# verify the rows are in UTC time
rows = cf.convert_timestamptz(rows, default_timestamp_field_name, "UTC")
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: rows,
"pk_name": default_primary_key_field_name})
# step 3: alter collection properties
self.alter_collection_properties(client, collection_name, properties={"timezone": IANA_timezone})
prop = self.describe_collection(client, collection_name)[0].get("properties")
assert prop["timezone"] == IANA_timezone
# step 4: query the rows
new_rows = cf.convert_timestamptz(rows, default_timestamp_field_name, IANA_timezone)
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: new_rows,
"pk_name": default_primary_key_field_name})
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_timestamptz_alter_two_collections_property_after_alter_database_property(self):
"""
target: Test timestamptz can be successfully inserted and queried after alter database and collection property
method:
1. Alter database property and then create 2 collections
2. Alter the collection property of the second collection
3. Insert the rows into the 2 collections
4. Query the rows from the 2 collections
expected: Step 4 should succeed
"""
# step 1: alter database property and then create 2 collections
IANA_timezone_1 = "America/New_York"
IANA_timezone_2 = "Asia/Shanghai"
client = self._client()
db_name = cf.gen_unique_str("db")
self.create_database(client, db_name)
self.use_database(client, db_name)
collection_name1 = cf.gen_collection_name_by_testcase_name() + "_1"
collection_name2 = cf.gen_collection_name_by_testcase_name() + "_2"
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
index_params.add_index(default_timestamp_field_name, index_type="AUTOINDEX")
self.alter_database_properties(client, db_name, properties={"timezone": IANA_timezone_1})
self.create_collection(client, collection_name1, default_dim, schema=schema,
consistency_level="Strong", index_params=index_params, database_name=db_name)
self.create_collection(client, collection_name2, default_dim, schema=schema,
consistency_level="Strong", index_params=index_params, database_name=db_name)
# step 2: alter the collection property of the second collection only
prop = self.describe_collection(client, collection_name1)[0].get("properties")
assert prop["timezone"] == IANA_timezone_1
self.alter_collection_properties(client, collection_name2, properties={"timezone": IANA_timezone_2})
prop = self.describe_collection(client, collection_name2)[0].get("properties")
assert prop["timezone"] == IANA_timezone_2
self.alter_database_properties(client, db_name, properties={"timezone": "America/Los_Angeles"})
prop = self.describe_database(client, db_name)[0]
assert prop["timezone"] == "America/Los_Angeles"
# step 3: insert the rows into the 2 collections
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.insert(client, collection_name1, rows)
self.insert(client, collection_name2, rows)
# step 4: query the rows from the 2 collections
new_rows1 = cf.convert_timestamptz(rows, default_timestamp_field_name, IANA_timezone_1)
new_rows2 = cf.convert_timestamptz(rows, default_timestamp_field_name, IANA_timezone_2)
self.query(client, collection_name1, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: new_rows1,
"pk_name": default_primary_key_field_name})
self.query(client, collection_name2, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: new_rows2,
"pk_name": default_primary_key_field_name})
self.drop_collection(client, collection_name1)
self.drop_collection(client, collection_name2)
self.drop_database(client, db_name)
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_timestamptz_alter_database_property_after_alter_collection_property(self):
"""
target: Test timestamptz can be successfully queried after alter database property
method:
1. Create a database and collection
2. Alter collection properties
3. Insert the rows and query them in the collection's timezone
4. Alter database property
5. Query the rows and result should be the collection's timezone
expected: Steps 2-5 should succeed
"""
# step 1: alter collection properties and then alter database property
IANA_timezone = "America/New_York"
client = self._client()
db_name = cf.gen_unique_str("db")
self.create_database(client, db_name)
self.use_database(client, db_name)
collection_name = cf.gen_collection_name_by_testcase_name()
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
index_params.add_index(default_timestamp_field_name, index_type="AUTOINDEX")
self.create_collection(client, collection_name, default_dim, schema=schema,
consistency_level="Strong", index_params=index_params)
# step 2: alter collection properties
self.alter_collection_properties(client, collection_name, properties={"timezone": IANA_timezone})
prop = self.describe_collection(client, collection_name)[0].get("properties")
assert prop["timezone"] == IANA_timezone
# step 3: insert the rows
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.insert(client, collection_name, rows)
rows = cf.convert_timestamptz(rows, default_timestamp_field_name, IANA_timezone)
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: rows,
"pk_name": default_primary_key_field_name})
# step 4: alter database property
new_timezone = "Asia/Shanghai"
self.alter_database_properties(client, db_name, properties={"timezone": new_timezone})
prop = self.describe_database(client, db_name)[0]
assert prop["timezone"] == new_timezone
# step 5: query the rows
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: rows,
"pk_name": default_primary_key_field_name})
self.drop_collection(client, collection_name)
self.drop_database(client, db_name)
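# A standalone sketch of the precedence rule exercised above: once a collection carries
# its own "timezone" property, a later database-level timezone change is not expected to
# override it. Assumes pymilvus MilvusClient plus the TIMESTAMPTZ type and "timezone"
# property from the feature under test; all names below are illustrative.
from pymilvus import MilvusClient, DataType

client = MilvusClient(uri="http://localhost:19530")
client.create_database("tz_precedence_db")
client.use_database("tz_precedence_db")
schema = client.create_schema(enable_dynamic_field=False)
schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
schema.add_field("vec", DataType.FLOAT_VECTOR, dim=8)
schema.add_field("ts", DataType.TIMESTAMPTZ, nullable=True)   # type from the feature under test
index_params = client.prepare_index_params()
index_params.add_index(field_name="vec", index_type="AUTOINDEX", metric_type="L2")
client.create_collection("tz_precedence_coll", schema=schema, index_params=index_params)

client.alter_collection_properties("tz_precedence_coll",
                                   properties={"timezone": "America/New_York"})
client.alter_database_properties("tz_precedence_db",
                                 properties={"timezone": "Asia/Shanghai"})
props = client.describe_collection("tz_precedence_coll").get("properties", {})
assert props.get("timezone") == "America/New_York"   # the database change does not override it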
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_timestamptz_alter_collection_property_and_query_from_different_timezone(self):
"""
target: Test timestamptz can be successfully queried from different timezone
method:
1. Create a collection
2. Alter collection properties to America/New_York timezone
3. Insert the rows and query them in the collection's timezone (America/New_York)
4. Query the rows from the Asia/Shanghai timezone
expected: Step 4 should succeed
"""
# step 1: create collection
IANA_timezone_1 = "America/New_York"
IANA_timezone_2 = "Asia/Shanghai"
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
index_params.add_index(default_timestamp_field_name, index_type="AUTOINDEX")
self.create_collection(client, collection_name, default_dim, schema=schema,
consistency_level="Strong", index_params=index_params)
# step 2: Alter collection properties
self.alter_collection_properties(client, collection_name, properties={"timezone": IANA_timezone_1})
prop = self.describe_collection(client, collection_name)[0].get("properties")
assert prop["timezone"] == IANA_timezone_1
# step 3: insert the rows
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.insert(client, collection_name, rows)
rows = cf.convert_timestamptz(rows, default_timestamp_field_name, IANA_timezone_1)
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: rows,
"pk_name": default_primary_key_field_name})
# step 4: query the rows
rows = cf.convert_timestamptz(rows, default_timestamp_field_name, IANA_timezone_2)
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
timezone=IANA_timezone_2,
check_items={exp_res: rows,
"pk_name": default_primary_key_field_name})
self.drop_collection(client, collection_name)
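# Continuing the sketch above: a per-request timezone override on read. The collection
# default is America/New_York, but a single query asks for Asia/Shanghai. The `timezone`
# keyword is an assumption taken from the test wrappers in this suite (it may simply be
# forwarded through MilvusClient.query(**kwargs)); it is not settled public API.
from pymilvus import MilvusClient

client = MilvusClient(uri="http://localhost:19530")
client.use_database("tz_precedence_db")              # database/collection from the sketch above
rows = client.query(
    "tz_precedence_coll",
    filter="id >= 0",
    output_fields=["id", "ts"],
    timezone="Asia/Shanghai",                        # assumed per-request override
)
# Each returned "ts" value is then expected to render as an Asia/Shanghai local time,
# while the collection-level default stays America/New_York for other readers.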
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_timestamptz_edge_case(self):
"""
target: Test timestamptz can be successfully inserted and queried
@ -166,7 +440,6 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_Feb_29(self):
"""
target: Milvus raise error when input data with Feb 29
@ -205,9 +478,7 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_partial_update(self):
# BUG: https://github.com/milvus-io/milvus/issues/44527
"""
target: Test timestamptz can be successfully inserted and queried
method:
@ -250,9 +521,7 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_default_value(self):
# BUG: https://github.com/milvus-io/milvus/issues/44585
"""
target: Test timestamptz can be successfully inserted and queried with default value
method:
@ -281,7 +550,7 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
# step 3: query the rows
for row in rows:
row[default_timestamp_field_name] = "2025-01-01T00:00:00+08:00"
row[default_timestamp_field_name] = "2025-01-01T00:00:00"
rows = cf.convert_timestamptz(rows, default_timestamp_field_name, "UTC")
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
@ -291,9 +560,7 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_search(self):
# BUG: https://github.com/milvus-io/milvus/issues/44594
"""
target: Milvus can search with timestamptz expr
method:
@ -336,9 +603,51 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_search_group_by(self):
"""
target: test search with group by and timestamptz
method:
1. Create a collection
2. Generate rows with timestamptz and insert the rows
3. Search with group by timestamptz
expected: Step 3 should succeed
"""
# step 1: create collection
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
index_params.add_index(default_timestamp_field_name, index_type="AUTOINDEX")
self.create_collection(client, collection_name, default_dim, schema=schema,
consistency_level="Strong", index_params=index_params)
# step 2: generate rows with timestamptz and insert the rows
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.insert(client, collection_name, rows)
# step 3: search with group by timestamptz
vectors_to_search = cf.gen_vectors(1, default_dim, vector_data_type=DataType.FLOAT_VECTOR)
insert_ids = [i for i in range(default_nb)]
self.search(client, collection_name, vectors_to_search,
timezone="Asia/Shanghai",
time_fields="year, month, day, hour, minute, second, microsecond",
group_by_field=default_timestamp_field_name,
check_task=CheckTasks.check_search_results,
check_items={"enable_milvus_client_api": True,
"nq": len(vectors_to_search),
"ids": insert_ids,
"pk_name": default_primary_key_field_name,
"limit": default_limit})
self.drop_collection(client, collection_name)
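# A sketch of grouping search hits by a TIMESTAMPTZ column, mirroring the test above.
# `group_by_field` is the regular grouping-search parameter; applying it to a timestamptz
# field (and the `timezone` / `time_fields` extras used by the wrappers) belongs to the
# feature under test, so treat this as an assumption rather than settled API. Assumes the
# collection from the sketches above is indexed, loaded, and holds data.
import random
from pymilvus import MilvusClient

client = MilvusClient(uri="http://localhost:19530")
client.use_database("tz_precedence_db")
hits = client.search(
    "tz_precedence_coll",
    data=[[random.random() for _ in range(8)]],
    limit=10,
    group_by_field="ts",                 # at most one hit per distinct timestamptz value
    output_fields=["id", "ts"],
)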
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_client_timestamptz_query(self):
# BUG: https://github.com/milvus-io/milvus/issues/44598
"""
target: Milvus can query with timestamptz expr
method:
@ -352,17 +661,17 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
collection_name = cf.gen_collection_name_by_testcase_name()
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=3)
schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
index_params.add_index(default_timestamp_field_name, index_type="AUTOINDEX")
self.create_collection(client, collection_name, default_dim, schema=schema,
self.create_collection(client, collection_name, 3, schema=schema,
consistency_level="Strong", index_params=index_params)
# step 2: generate rows with timestamptz and insert the rows
rows = [{default_primary_key_field_name: 0, default_vector_field_name: [1,2,3], default_timestamp_field_name: "0000-01-01 00:00:00"},
rows = [{default_primary_key_field_name: 0, default_vector_field_name: [1,2,3], default_timestamp_field_name: "1970-01-01 00:00:00"},
{default_primary_key_field_name: 1, default_vector_field_name: [4,5,6], default_timestamp_field_name: "2021-02-28T00:00:00Z"},
{default_primary_key_field_name: 2, default_vector_field_name: [7,8,9], default_timestamp_field_name: "2025-05-25T23:46:05"},
{default_primary_key_field_name: 3, default_vector_field_name: [10,11,12], default_timestamp_field_name:"2025-05-30T23:46:05+05:30"},
@ -371,98 +680,63 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.insert(client, collection_name, rows)
# step 3: query with timestamptz expr
shanghai_time_row = cf.convert_timestamptz(rows, default_timestamp_field_name, "Asia/Shanghai")
UTC_time_row = cf.convert_timestamptz(rows, default_timestamp_field_name, "UTC")
shanghai_time_row = cf.convert_timestamptz(UTC_time_row, default_timestamp_field_name, "Asia/Shanghai")
self.query(client, collection_name, filter=default_search_exp,
timezone="Asia/Shanghai",
time_fields="year, month, day, hour, minute, second, microsecond",
check_task=CheckTasks.check_query_results,
check_items={exp_res: shanghai_time_row,
"pk_name": default_primary_key_field_name})
# >=
expr = f"{default_timestamp_field_name} >= ISO '2025-05-30T23:46:05+05:30'"
self.query(client, collection_name, filter=expr,
timezone="Asia/Shanghai",
check_task=CheckTasks.check_query_results,
check_items={exp_res: shanghai_time_row,
check_items={exp_res: shanghai_time_row[3:],
"pk_name": default_primary_key_field_name})
# ==
expr = f"{default_timestamp_field_name} == ISO '9999-12-31T23:46:05Z'"
self.query(client, collection_name, filter=expr,
timezone="Asia/Shanghai",
check_task=CheckTasks.check_query_results,
check_items={exp_res: shanghai_time_row,
check_items={exp_res: [shanghai_time_row[-1]],
"pk_name": default_primary_key_field_name})
# <=
expr = f"{default_timestamp_field_name} <= ISO '2025-01-01T00:00:00+08:00'"
self.query(client, collection_name, filter=expr,
timezone="Asia/Shanghai",
check_task=CheckTasks.check_query_results,
check_items={exp_res: shanghai_time_row,
check_items={exp_res: shanghai_time_row[:2],
"pk_name": default_primary_key_field_name})
# !=
expr = f"{default_timestamp_field_name} != ISO '9999-12-31T23:46:05'"
expr = f"{default_timestamp_field_name} != ISO '9999-12-31T23:46:05Z'"
self.query(client, collection_name, filter=expr,
timezone="Asia/Shanghai",
check_task=CheckTasks.check_query_results,
check_items={exp_res: shanghai_time_row,
check_items={exp_res: shanghai_time_row[:-1],
"pk_name": default_primary_key_field_name})
# INTERVAL
expr = f"{default_timestamp_field_name} + INTERVAL 'P3D' != ISO '0000-01-02T00:00:00Z'"
expr = f"{default_timestamp_field_name} - INTERVAL 'P3D' >= ISO '1970-01-01T00:00:00Z'"
self.query(client, collection_name, filter=expr,
timezone="Asia/Shanghai",
check_task=CheckTasks.check_query_results,
check_items={exp_res: shanghai_time_row,
check_items={exp_res: shanghai_time_row[1:],
"pk_name": default_primary_key_field_name})
# lower < tz < upper
# BUG: https://github.com/milvus-io/milvus/issues/44600
expr = f"ISO '2025-01-01T00:00:00+08:00' < {default_timestamp_field_name} < ISO '2026-10-05T12:56:34+08:00'"
self.query(client, collection_name, filter=expr,
check_task=CheckTasks.check_query_results,
check_items={exp_res: shanghai_time_row,
"pk_name": default_primary_key_field_name})
# expr = f"ISO '2025-01-01T00:00:00+08:00' < {default_timestamp_field_name} < ISO '2026-10-05T12:56:34+08:00'"
# self.query(client, collection_name, filter=expr,
# check_task=CheckTasks.check_query_results,
# check_items={exp_res: shanghai_time_row,
# "pk_name": default_primary_key_field_name})
self.drop_collection(client, collection_name)
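# The timestamptz filter grammar exercised above, gathered in one runnable sketch:
# values are compared against ISO '<RFC 3339 string>' literals and shifted with
# INTERVAL '<ISO 8601 duration>' arithmetic. Assumes the MilvusClient query surface
# used throughout this suite; "tz_demo_coll" and the "ts" field are illustrative names
# for an existing collection with a TIMESTAMPTZ column.
from pymilvus import MilvusClient

client = MilvusClient(uri="http://localhost:19530")
exprs = [
    "ts >= ISO '2025-05-30T23:46:05+05:30'",               # compare against an offset literal
    "ts == ISO '9999-12-31T23:46:05Z'",                    # exact match in UTC
    "ts != ISO '9999-12-31T23:46:05Z'",
    "ts - INTERVAL 'P3D' >= ISO '1970-01-01T00:00:00Z'",   # shift back three days, then compare
]
for expr in exprs:
    res = client.query("tz_demo_coll", filter=expr, output_fields=["id", "ts"])
    print(f"{expr} -> {len(res)} rows")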
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_alter_collection(self):
"""
target: Milvus raise error when alter collection properties
method:
1. Create a collection
2. Alter collection properties
3. Query the rows
expected: Step 3 should result success
"""
# step 1: create collection
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
schema = self.create_schema(client, enable_dynamic_field=False)[0]
schema.add_field(default_primary_key_field_name, DataType.INT64, is_primary=True, auto_id=False)
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True)
index_params = self.prepare_index_params(client)[0]
index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
index_params.add_index(default_timestamp_field_name, index_type="AUTOINDEX")
self.create_collection(client, collection_name, default_dim, schema=schema,
consistency_level="Strong", index_params=index_params)
# step 2: alter collection properties
self.alter_collection_properties(client, collection_name, properties={"timezone": "Asia/Shanghai"})
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.insert(client, collection_name, rows)
# step 3: query the rows
rows = cf.convert_timestamptz(rows, default_timestamp_field_name, "Asia/Shanghai")
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
check_items={exp_res: rows,
"pk_name": default_primary_key_field_name})
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_add_collection_field(self):
# BUG: https://github.com/milvus-io/milvus/issues/44527
"""
target: Milvus raise error when add collection field with timestamptz
method:
@ -516,7 +790,7 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
desired_field_names=[default_primary_key_field_name, default_timestamp_field_name])
self.upsert(client, collection_name, pu_rows, partial_update=True)
pu_rows = cf.convert_timestamptz(pu_rows, default_timestamp_field_name, "UTC")
self.query(client, collection_name, filter=f"0 <= {default_primary_key_field_name} <= {default_nb}",
self.query(client, collection_name, filter=f"0 <= {default_primary_key_field_name} < {default_nb}",
check_task=CheckTasks.check_query_results,
output_fields=[default_timestamp_field_name],
check_items={exp_res: pu_rows,
@ -525,7 +799,6 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_add_field_compaction(self):
"""
target: test compaction with added timestamptz field
@ -534,7 +807,8 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
2. insert rows
3. add field with timestamptz
4. compact
expected: Step 4 should success
5. query the rows
expected: Step 4 and Step 5 should succeed
"""
# step 1: create collection
client = self._client()
@ -573,12 +847,28 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
if time.time() - start > cost:
raise Exception(1, f"Compact after index cost more than {cost}s")
# step 5: query the rows
# first release the collection
self.release_collection(client, collection_name)
# then load the collection
self.load_collection(client, collection_name)
# then query the rows
for row in rows:
row[default_timestamp_field_name] = None
self.query(client, collection_name, filter=f"0 <= {default_primary_key_field_name} < {default_nb}",
check_task=CheckTasks.check_query_results,
check_items={exp_res: rows,
"pk_name": default_primary_key_field_name})
new_rows = cf.convert_timestamptz(new_rows, default_timestamp_field_name, "UTC")
self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= {default_nb}",
check_task=CheckTasks.check_query_results,
check_items={exp_res: new_rows,
"pk_name": default_primary_key_field_name})
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_add_field_search(self):
# BUG: https://github.com/milvus-io/milvus/issues/44622
"""
target: test add field with timestamptz and search
method:
@ -608,7 +898,7 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.add_collection_field(client, collection_name, field_name=default_timestamp_field_name, data_type=DataType.TIMESTAMPTZ,
nullable=True)
schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True)
index_params.add_index(default_timestamp_field_name, index_type="STL_SORT")
index_params.add_index(default_timestamp_field_name, index_type="AUTOINDEX")
self.create_index(client, collection_name, index_params=index_params)
# step 4: search the rows
@ -623,7 +913,9 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
filter=f"{default_timestamp_field_name} is null",
check_task=CheckTasks.check_search_results,
check_items=check_items)
new_rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.insert(client, collection_name, new_rows)
self.search(client, collection_name, vectors_to_search,
filter=f"{default_timestamp_field_name} is not null",
check_task=CheckTasks.check_search_results,
@ -632,7 +924,6 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_add_field_with_default_value(self):
"""
target: Milvus raise error when add field with timestamptz and default value
@ -655,7 +946,7 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
consistency_level="Strong", index_params=index_params)
# step 2: add field with timestamptz and default value
default_timestamp_value = "2025-01-01T00:00:00"
default_timestamp_value = "2025-01-01T00:00:00Z"
rows = cf.gen_row_data_by_schema(nb=default_nb, schema=schema)
self.insert(client, collection_name, rows)
self.add_collection_field(client, collection_name, field_name=default_timestamp_field_name, data_type=DataType.TIMESTAMPTZ,
@ -674,7 +965,6 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_add_another_timestamptz_field(self):
"""
target: Milvus raise error when add another timestamptz field
@ -726,7 +1016,6 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_insert_delete_upsert_with_flush(self):
"""
target: test insert, delete, upsert with flush on timestamptz
@ -783,9 +1072,7 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_insert_upsert_flush_delete_upsert_flush(self):
# BUG: blocked by partial update
"""
target: test insert, upsert, flush, delete, upsert with flush on timestamptz
method:
@ -840,7 +1127,6 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
"pk_name": default_primary_key_field_name})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_read_from_different_client(self):
"""
target: test read from different client in different timezone
@ -869,9 +1155,10 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.insert(client, collection_name, rows)
# step 3: query the rows from different client in different timezone
client2 = self._client()
shanghai_rows = cf.convert_timestamptz(rows, default_timestamp_field_name, "Asia/Shanghai")
LA_rows = cf.convert_timestamptz(rows, default_timestamp_field_name, "America/Los_Angeles")
client2 = self._client(alias="client2_alias")
UTC_time_row = cf.convert_timestamptz(rows, default_timestamp_field_name, "UTC")
shanghai_rows = cf.convert_timestamptz(UTC_time_row, default_timestamp_field_name, "Asia/Shanghai")
LA_rows = cf.convert_timestamptz(UTC_time_row, default_timestamp_field_name, "America/Los_Angeles")
result_1 = self.query(client, collection_name, filter=f"{default_primary_key_field_name} >= 0",
check_task=CheckTasks.check_query_results,
timezone="Asia/Shanghai",
@ -887,7 +1174,6 @@ class TestMilvusClientTimestamptzValid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
class TestMilvusClientTimestamptzInvalid(TestMilvusClientV2Base):
"""
@ -896,9 +1182,7 @@ class TestMilvusClientTimestamptzInvalid(TestMilvusClientV2Base):
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_input_data_invalid_time_format(self):
# BUG: https://github.com/milvus-io/milvus/issues/44537
"""
target: Milvus raise error when input data with invalid time format
method:
@ -933,7 +1217,8 @@ class TestMilvusClientTimestamptzInvalid(TestMilvusClientV2Base):
# step 3: query the rows
for row in rows:
error = {ct.err_code: 1, ct.err_msg: f"got invalid timestamptz string: {row[default_timestamp_field_name]}"}
print(row[default_timestamp_field_name])
error = {ct.err_code: 1100, ct.err_msg: f"got invalid timestamptz string '{row[default_timestamp_field_name]}': invalid timezone name; must be a valid IANA Time Zone ID (e.g., 'Asia/Shanghai' or 'UTC'): invalid parameter"}
self.insert(client, collection_name, row,
check_task=CheckTasks.err_res,
check_items=error)
@ -941,7 +1226,6 @@ class TestMilvusClientTimestamptzInvalid(TestMilvusClientV2Base):
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_wrong_index_type(self):
"""
target: Milvus raise error when input data with wrong index type
@ -960,11 +1244,13 @@ class TestMilvusClientTimestamptzInvalid(TestMilvusClientV2Base):
index_params.add_index(default_primary_key_field_name, index_type="AUTOINDEX")
index_params.add_index(default_vector_field_name, index_type="AUTOINDEX")
index_params.add_index(default_timestamp_field_name, index_type="INVERTED")
error = {ct.err_code: 1100, ct.err_msg: "INVERTED are not supported on Timestamptz field: invalid parameter[expected=valid index params][actual=invalid index params]"}
self.create_collection(client, collection_name, default_dim, schema=schema,
consistency_level="Strong", index_params=index_params)
consistency_level="Strong", index_params=index_params,
check_task=CheckTasks.err_res,
check_items=error)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_wrong_default_value(self):
"""
target: Milvus raise error when input data with wrong default value
@ -981,7 +1267,7 @@ class TestMilvusClientTimestamptzInvalid(TestMilvusClientV2Base):
schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True, default_value="timestamp")
error = {ct.err_code: 1100, ct.err_msg: "type (Timestamptz) of field (timestamp) is not equal to the type(DataType_VarChar) of default_value: invalid parameter"}
error = {ct.err_code: 65536, ct.err_msg: "invalid timestamp string: 'timestamp'. Does not match any known format"}
self.create_collection(client, collection_name, default_dim, schema=schema,
consistency_level="Strong",
check_task=CheckTasks.err_res, check_items=error)
@ -992,15 +1278,14 @@ class TestMilvusClientTimestamptzInvalid(TestMilvusClientV2Base):
new_schema.add_field(default_vector_field_name, DataType.FLOAT_VECTOR, dim=default_dim)
new_schema.add_field(default_timestamp_field_name, DataType.TIMESTAMPTZ, nullable=True, default_value=10)
error = {ct.err_code: 1100, ct.err_msg: "type (Timestamptz) of field (timestamp) is not equal to the type(DataType_VarChar) of default_value: invalid parameter"}
self.create_collection(client, collection_name, default_dim, schema=schema,
error = {ct.err_code: 65536, ct.err_msg: "type (Timestamptz) of field (timestamp) is not equal to the type(DataType_Int64) of default_value: invalid parameter"}
self.create_collection(client, collection_name, default_dim, schema=new_schema,
consistency_level="Strong",
check_task=CheckTasks.err_res, check_items=error)
self.drop_collection(client, collection_name)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="timesptamptz is not ready")
def test_milvus_client_timestamptz_add_field_not_nullable(self):
"""
target: Milvus raise error when add non-nullable timestamptz field

View File

@ -1,6 +1,6 @@
[pytest]
addopts = --host localhost --html=/tmp/ci_logs/report.html --self-contained-html -v --log-cli-level INFO
addopts = --host localhost --html=/tmp/ci_logs/report.html --self-contained-html -v
# python3 -W ignore -m pytest
log_format = [%(asctime)s - %(levelname)s - %(name)s]: %(message)s (%(filename)s:%(lineno)s)

View File

@ -789,6 +789,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT, nullable=nullable),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100, nullable=nullable),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL, nullable=nullable),
cf.gen_geometry_field(name=df.geo_field),
cf.gen_timestamptz_field(name=df.timestamp_field, nullable=nullable),
cf.gen_float_vec_field(name=df.float_vec_field, dim=float_vec_field_dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=binary_vec_field_dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=bf16_vec_field_dim),
@ -984,6 +986,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_string_field(name=df.string_field, is_partition_key=enable_partition_key),
cf.gen_string_field(name=df.text_field, enable_analyzer=True, enable_match=True, nullable=nullable),
cf.gen_json_field(name=df.json_field),
cf.gen_geometry_field(name=df.geo_field),
cf.gen_float_vec_field(name=df.float_vec_field, dim=float_vec_field_dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=binary_vec_field_dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=bf16_vec_field_dim),
@ -1165,6 +1168,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT, nullable=nullable),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100, nullable=nullable),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL, nullable=nullable),
cf.gen_geometry_field(name=df.geo_field),
cf.gen_timestamptz_field(name=df.timestamp_field, nullable=nullable),
cf.gen_float_vec_field(name=df.float_vec_field, dim=float_vec_field_dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=binary_vec_field_dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=bf16_vec_field_dim),
@ -2188,6 +2193,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_float_field(name=df.float_field, nullable=nullable),
cf.gen_string_field(name=df.string_field, nullable=nullable),
cf.gen_json_field(name=df.json_field, nullable=nullable),
cf.gen_timestamptz_field(name=df.timestamp_field, nullable=nullable),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64, nullable=nullable),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT, nullable=nullable),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100, nullable=nullable),
@ -2224,6 +2230,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
df.float_field: 1.0 if not (nullable and random.random() < 0.5) else None,
df.string_field: "string" if not (nullable and random.random() < 0.5) else None,
df.json_field: json_value[i%len(json_value)] if not (nullable and random.random() < 0.5) else None,
df.timestamp_field: cf.gen_timestamptz_str() if not (nullable and random.random() < 0.5) else None,
df.array_int_field: [1, 2] if not (nullable and random.random() < 0.5) else None,
df.array_float_field: [1.0, 2.0] if not (nullable and random.random() < 0.5) else None,
df.array_string_field: ["string1", "string2"] if not (nullable and random.random() < 0.5) else None,

View File

@ -1020,6 +1020,7 @@ class TestCreateImportJob(TestBase):
time.sleep(10)
c_restore = Collection(restore_collection_name)
# since we import both original and sorted segments, the number of entities should be 2x
time.sleep(10)
logger.info(f"c.num_entities: {c.num_entities}, c_restore.num_entities: {c_restore.num_entities}")
assert c.num_entities*2 == c_restore.num_entities