test: add ST_ISVALID geometry function test cases (#46232)

/kind improvement

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2025-12-11 13:47:21 +08:00 committed by GitHub
parent b2c49d0197
commit 75d6f0d509
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -3249,6 +3249,193 @@ class TestMilvusClientGeometryBasic(TestMilvusClientV2Base):
"Larger distances should return same or more results"
)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("with_geo_index", [True, False])
def test_st_isvalid_with_invalid_geometries(self, with_geo_index):
"""
target: test ST_ISVALID operator can detect invalid geometries
method: insert both valid and invalid geometries (self-intersecting polygons), use ST_ISVALID to filter
expected: ST_ISVALID returns only valid geometries, NOT ST_ISVALID returns invalid geometries
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Create collection with geometry field
schema, _ = self.create_schema(client, auto_id=False)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field("geo", DataType.GEOMETRY)
self.create_collection(client, collection_name, schema=schema)
# Valid geometries
valid_geometries = [
("POINT (10 20)", True),
("LINESTRING (0 0, 10 10, 20 20)", True),
("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))", True), # Simple valid polygon
]
# Invalid geometries - self-intersecting polygons (bowtie/figure-8 shape)
invalid_geometries = [
("POLYGON ((0 0, 10 10, 10 0, 0 10, 0 0))", False), # Self-intersecting (bowtie)
("POLYGON ((0 0, 20 10, 10 0, 20 0, 0 10, 0 0))", False), # Another self-intersecting
]
all_geometries = valid_geometries + invalid_geometries
data = []
for i, (geo_wkt, is_valid) in enumerate(all_geometries):
data.append({
"id": i,
"vector": [random.random() for _ in range(default_dim)],
"geo": geo_wkt
})
self.insert(client, collection_name, data)
self.flush(client, collection_name)
# Create indexes
index_params, _ = self.prepare_index_params(client)
index_params.add_index(
field_name="vector", index_type="IVF_FLAT", metric_type="L2", nlist=128
)
if with_geo_index:
index_params.add_index(field_name="geo", index_type="RTREE")
self.create_index(client, collection_name, index_params=index_params)
self.load_collection(client, collection_name)
# Test ST_ISVALID - should return only valid geometries
results_valid, _ = self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(geo)",
output_fields=["id", "geo"],
)
valid_ids = {i for i, (_, is_valid) in enumerate(all_geometries) if is_valid}
result_ids = {r["id"] for r in results_valid}
assert result_ids == valid_ids, (
f"ST_ISVALID should return valid geometry IDs {valid_ids}, "
f"got {result_ids} (index={with_geo_index})"
)
# Test NOT ST_ISVALID - should return only invalid geometries
results_invalid, _ = self.query(
client,
collection_name=collection_name,
filter="not ST_ISVALID(geo)",
output_fields=["id", "geo"],
)
invalid_ids = {i for i, (_, is_valid) in enumerate(all_geometries) if not is_valid}
result_invalid_ids = {r["id"] for r in results_invalid}
assert result_invalid_ids == invalid_ids, (
f"NOT ST_ISVALID should return invalid geometry IDs {invalid_ids}, "
f"got {result_invalid_ids} (index={with_geo_index})"
)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("with_geo_index", [True, False])
def test_st_isvalid_basic(self, with_geo_index):
"""
target: test ST_ISVALID operator basic functionality
method: insert valid WKT geometries, query using ST_ISVALID, test NOT ST_ISVALID and combined conditions
expected: ST_ISVALID returns true for all valid geometries, correct filtering with logical operators
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Create collection with geometry field and category
schema, _ = self.create_schema(client, auto_id=False)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field("geo", DataType.GEOMETRY)
schema.add_field("category", DataType.INT64)
self.create_collection(client, collection_name, schema=schema)
# Insert valid geometry data of various types
valid_geometries = [
"POINT (10.5 20.3)",
"LINESTRING (0 0, 10 10, 20 25)",
"POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))",
"MULTIPOINT ((0 0), (10 10), (20 20))",
"MULTILINESTRING ((0 0, 10 10), (20 20, 30 30))",
"MULTIPOLYGON (((0 0, 5 0, 5 5, 0 5, 0 0)), ((10 10, 15 10, 15 15, 10 15, 10 10)))",
"GEOMETRYCOLLECTION (POINT(10 10), LINESTRING(0 0, 5 5))",
]
data = []
for i, geo_wkt in enumerate(valid_geometries):
data.append({
"id": i,
"vector": [random.random() for _ in range(default_dim)],
"geo": geo_wkt,
"category": i % 2
})
self.insert(client, collection_name, data)
self.flush(client, collection_name)
# Create indexes
index_params, _ = self.prepare_index_params(client)
index_params.add_index(
field_name="vector", index_type="IVF_FLAT", metric_type="L2", nlist=128
)
if with_geo_index:
index_params.add_index(field_name="geo", index_type="RTREE")
self.create_index(client, collection_name, index_params=index_params)
self.load_collection(client, collection_name)
# Test 1: ST_ISVALID returns all valid geometries
results, _ = self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(geo)",
output_fields=["id", "geo"],
)
assert len(results) == len(valid_geometries), (
f"ST_ISVALID should return all {len(valid_geometries)} valid geometries, "
f"got {len(results)} (index={with_geo_index})"
)
# Test 2: NOT ST_ISVALID returns empty for all valid geometries
results_not, _ = self.query(
client,
collection_name=collection_name,
filter="not ST_ISVALID(geo)",
output_fields=["id", "geo"],
)
assert len(results_not) == 0, (
f"NOT ST_ISVALID should return 0 results, got {len(results_not)} (index={with_geo_index})"
)
# Test 3: ST_ISVALID combined with other condition
results_and, _ = self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(geo) and category == 1",
output_fields=["id", "geo", "category"],
)
expected_category_1 = {i for i in range(len(valid_geometries)) if i % 2 == 1}
result_ids = {r["id"] for r in results_and}
assert result_ids == expected_category_1, (
f"ST_ISVALID AND category==1 should return IDs {expected_category_1}, "
f"got {result_ids} (index={with_geo_index})"
)
# Test 4: Case insensitive (lowercase)
results_lower, _ = self.query(
client,
collection_name=collection_name,
filter="st_isvalid(geo)",
output_fields=["id"],
)
assert len(results_lower) == len(valid_geometries), (
f"st_isvalid (lowercase) should return all records (index={with_geo_index})"
)
@ -3868,3 +4055,90 @@ class TestMilvusClientGeometryNegative(TestMilvusClientV2Base):
check_task=CheckTasks.err_res,
check_items=error_invalid_insert,
)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("with_geo_index", [True, False])
def test_st_isvalid_invalid_usage(self, with_geo_index):
"""
target: test ST_ISVALID error handling for invalid usage
method: test ST_ISVALID on non-geometry fields, with wrong parameters, and nonexistent fields
expected: should raise appropriate errors
"""
client = self._client()
collection_name = cf.gen_collection_name_by_testcase_name()
# Create collection with geometry and non-geometry fields
schema, _ = self.create_schema(client, auto_id=False)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=default_dim)
schema.add_field("geo", DataType.GEOMETRY)
schema.add_field("int_field", DataType.INT64)
self.create_collection(client, collection_name, schema=schema)
# Insert test data
data = [{
"id": 0,
"vector": [random.random() for _ in range(default_dim)],
"geo": "POINT (10 20)",
"int_field": 100
}]
self.insert(client, collection_name, data)
self.flush(client, collection_name)
# Create indexes
index_params, _ = self.prepare_index_params(client)
index_params.add_index(
field_name="vector", index_type="IVF_FLAT", metric_type="L2", nlist=128
)
if with_geo_index:
index_params.add_index(field_name="geo", index_type="RTREE")
self.create_index(client, collection_name, index_params=index_params)
self.load_collection(client, collection_name)
error = {
ct.err_code: 1100,
ct.err_msg: "failed to create query plan: cannot parse expression",
}
# Test 1: ST_ISVALID on non-geometry field
self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(int_field)",
output_fields=["id"],
check_task=CheckTasks.err_res,
check_items=error,
)
# Test 2: ST_ISVALID with no parameters
self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID()",
output_fields=["id", "geo"],
check_task=CheckTasks.err_res,
check_items=error,
)
# Test 3: ST_ISVALID with too many parameters
self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(geo, 1)",
output_fields=["id", "geo"],
check_task=CheckTasks.err_res,
check_items=error,
)
# Test 4: ST_ISVALID with non-existent field
self.query(
client,
collection_name=collection_name,
filter="ST_ISVALID(nonexistent_geo_field)",
output_fields=["id", "geo"],
check_task=CheckTasks.err_res,
check_items=error,
)