From 5f8daa0f6df4898219a7122f7201b208ce80e495 Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Mon, 15 Dec 2025 15:45:15 +0800 Subject: [PATCH] test: Add geometry operations test suite for RESTful API (#46174) /kind improvement --------- Signed-off-by: zhuwenxing --- tests/restful_client_v2/requirements.txt | 6 +- .../testcases/test_geometry_operations.py | 724 ++++++++++++++++++ tests/restful_client_v2/utils/utils.py | 409 ++++++++++ 3 files changed, 1137 insertions(+), 2 deletions(-) create mode 100644 tests/restful_client_v2/testcases/test_geometry_operations.py diff --git a/tests/restful_client_v2/requirements.txt b/tests/restful_client_v2/requirements.txt index 6ceb37d2b3..14bfea9399 100644 --- a/tests/restful_client_v2/requirements.txt +++ b/tests/restful_client_v2/requirements.txt @@ -6,7 +6,7 @@ pyyaml==6.0 numpy==1.24.3 allure-pytest>=2.8.18 Faker==19.2.0 -pymilvus==2.5.0rc108 +pymilvus scikit-learn>=1.5.2 pytest-xdist==2.5.0 minio==7.2.0 @@ -16,4 +16,6 @@ ml-dtypes==0.2.0 loguru==0.7.3 bm25s==0.2.13 jieba==0.42.1 -pyarrow==21.0.0 \ No newline at end of file +pyarrow==21.0.0 +# for geometry data type +shapely>=2.0.0 \ No newline at end of file diff --git a/tests/restful_client_v2/testcases/test_geometry_operations.py b/tests/restful_client_v2/testcases/test_geometry_operations.py new file mode 100644 index 0000000000..3116055220 --- /dev/null +++ b/tests/restful_client_v2/testcases/test_geometry_operations.py @@ -0,0 +1,724 @@ +import random +import pytest +import numpy as np +from sklearn import preprocessing +from base.testbase import TestBase +from utils.utils import gen_collection_name, generate_wkt_by_type +from utils.util_log import test_log as logger + + +default_dim = 128 + + +@pytest.mark.L0 +class TestGeometryCollection(TestBase): + """Test geometry collection operations""" + + def test_create_collection_with_geometry_field(self): + """ + target: test create collection with geometry field + method: create collection with geometry field using schema + expected: create collection successfully + """ + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": False, + "enableDynamicField": True, + "fields": [ + {"fieldName": "id", "dataType": "Int64", "isPrimary": True}, + {"fieldName": "vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{default_dim}"}}, + {"fieldName": "geo", "dataType": "Geometry"} + ] + }, + "indexParams": [ + {"fieldName": "vector", "indexName": "vector_idx", "metricType": "L2"} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 0 + # Verify collection exists + rsp = self.collection_client.collection_describe(name) + assert rsp['code'] == 0 + logger.info(f"Collection created: {rsp}") + + @pytest.mark.parametrize("wkt_type", [ + "POINT", + "LINESTRING", + "POLYGON", + "MULTIPOINT", + "MULTILINESTRING", + "MULTIPOLYGON", + "GEOMETRYCOLLECTION" + ]) + def test_insert_wkt_data(self, wkt_type): + """ + target: test insert various WKT geometry types + method: generate and insert different WKT geometry data + expected: insert successfully + """ + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": False, + "enableDynamicField": True, + "fields": [ + {"fieldName": "id", "dataType": "Int64", "isPrimary": True}, + {"fieldName": "vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{default_dim}"}}, + {"fieldName": "geo", "dataType": "Geometry"} + ] + }, + "indexParams": [ + {"fieldName": "vector", "indexName": 
"vector_idx", "metricType": "L2"} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 0 + + # Generate WKT data + nb = 100 + wkt_data = generate_wkt_by_type(wkt_type, bounds=(0, 100, 0, 100), count=nb) + data = [] + for i, wkt in enumerate(wkt_data): + data.append({ + "id": i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + "geo": wkt + }) + + # Insert data + insert_payload = { + "collectionName": name, + "data": data + } + rsp = self.vector_client.vector_insert(insert_payload) + assert rsp['code'] == 0 + assert rsp['data']['insertCount'] == nb + logger.info(f"Inserted {nb} {wkt_type} geometries") + + @pytest.mark.parametrize("index_type", ["RTREE", "AUTOINDEX"]) + def test_build_geometry_index(self, index_type): + """ + target: test build geometry index on geometry field + method: create geometry index on geometry field + expected: build index successfully + """ + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": False, + "enableDynamicField": True, + "fields": [ + {"fieldName": "id", "dataType": "Int64", "isPrimary": True}, + {"fieldName": "vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{default_dim}"}}, + {"fieldName": "geo", "dataType": "Geometry"} + ] + }, + "indexParams": [ + {"fieldName": "vector", "indexName": "vector_idx", "metricType": "L2"}, + {"fieldName": "geo", "indexName": "geo_idx", "indexType": index_type} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 0 + + # Insert some geometry data + nb = 50 + data = [] + for i in range(nb): + x = random.uniform(0, 100) + y = random.uniform(0, 100) + data.append({ + "id": i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + "geo": f"POINT ({x:.2f} {y:.2f})" + }) + + insert_payload = { + "collectionName": name, + "data": data + } + rsp = self.vector_client.vector_insert(insert_payload) + assert rsp['code'] == 0 + + # Load collection + self.wait_collection_load_completed(name) + + # Verify index + rsp = self.index_client.index_list(name) + assert rsp['code'] == 0 + logger.info(f"Indexes: {rsp}") + + @pytest.mark.parametrize("spatial_func", [ + "ST_INTERSECTS", + "ST_CONTAINS", + "ST_WITHIN", + "ST_EQUALS", + "ST_TOUCHES", + "ST_OVERLAPS", + "ST_CROSSES" + ]) + @pytest.mark.parametrize("data_state", ["sealed", "growing", "sealed_and_growing"]) + @pytest.mark.parametrize("with_geo_index", [True, False]) + @pytest.mark.parametrize("nullable", [True, False]) + def test_spatial_query_and_search(self, spatial_func, data_state, with_geo_index, nullable): + """ + target: test spatial query and search with geometry filter + method: query and search geometry data using spatial operators on sealed/growing data + expected: query and search execute successfully (with or without geo index, nullable or not) + """ + name = gen_collection_name() + index_params = [{"fieldName": "vector", "indexName": "vector_idx", "metricType": "L2"}] + if with_geo_index: + index_params.append({"fieldName": "geo", "indexName": "geo_idx", "indexType": "RTREE"}) + + geo_field = {"fieldName": "geo", "dataType": "Geometry"} + if nullable: + geo_field["nullable"] = True + + payload = { + "collectionName": name, + "schema": { + "autoId": False, + "enableDynamicField": True, + "fields": [ + {"fieldName": "id", "dataType": "Int64", "isPrimary": True}, + {"fieldName": "vector", "dataType": "FloatVector", 
"elementTypeParams": {"dim": f"{default_dim}"}}, + geo_field + ] + }, + "indexParams": index_params + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 0 + + nb = 100 + + # Define query geometry and matching data based on spatial function + # Each spatial function needs specific data patterns to guarantee matches + if spatial_func == "ST_INTERSECTS": + # Query: large polygon covering center area + # Data: points and polygons inside the query area will intersect + query_geom = "POLYGON ((20 20, 80 20, 80 80, 20 80, 20 20))" + + def generate_geo_data(start_id, count): + data = [] + for i in range(count): + # Generate points inside query polygon (25-75 range) + x = 25 + (i % 10) * 5 + y = 25 + (i // 10) * 5 + item = { + "id": start_id + i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + } + if nullable and i % 5 == 0: + item["geo"] = None + elif i % 2 == 0: + item["geo"] = f"POINT ({x:.2f} {y:.2f})" + else: + # Small polygon inside query area + item["geo"] = f"POLYGON (({x:.2f} {y:.2f}, {x + 3:.2f} {y:.2f}, {x + 3:.2f} {y + 3:.2f}, {x:.2f} {y + 3:.2f}, {x:.2f} {y:.2f}))" + data.append(item) + return data + + elif spatial_func == "ST_CONTAINS": + # ST_CONTAINS(geo, query_geom) - data geometry contains query geometry + # Data: large polygons that contain the query point + # Query: small point that is inside the data polygons + query_geom = "POINT (50.00 50.00)" + + def generate_geo_data(start_id, count): + data = [] + for i in range(count): + item = { + "id": start_id + i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + } + if nullable and i % 5 == 0: + item["geo"] = None + else: + # Large polygons that contain the point (50, 50) + # Create polygons centered around (50, 50) with varying sizes + size = 20 + (i % 5) * 10 # sizes: 20, 30, 40, 50, 60 + x1 = 50 - size + y1 = 50 - size + x2 = 50 + size + y2 = 50 + size + item["geo"] = f"POLYGON (({x1} {y1}, {x2} {y1}, {x2} {y2}, {x1} {y2}, {x1} {y1}))" + data.append(item) + return data + + elif spatial_func == "ST_WITHIN": + # ST_WITHIN(geo, query_geom) - data geometry is within query geometry + # Same as ST_CONTAINS but reversed semantics + query_geom = "POLYGON ((10 10, 90 10, 90 90, 10 90, 10 10))" + + def generate_geo_data(start_id, count): + data = [] + for i in range(count): + x = 20 + (i % 10) * 6 + y = 20 + (i // 10) * 6 + item = { + "id": start_id + i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + } + if nullable and i % 5 == 0: + item["geo"] = None + else: + item["geo"] = f"POINT ({x:.2f} {y:.2f})" + data.append(item) + return data + + elif spatial_func == "ST_EQUALS": + # ST_EQUALS requires exact geometry match + # Insert known points and query with one of them + query_geom = "POINT (50.00 50.00)" + + def generate_geo_data(start_id, count): + data = [] + for i in range(count): + item = { + "id": start_id + i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + } + if nullable and i % 5 == 0: + item["geo"] = None + elif i % 10 == 0: + # Every 10th record has the exact query point + item["geo"] = "POINT (50.00 50.00)" + else: + x = 20 + (i % 10) * 6 + y = 20 + (i // 10) * 6 + item["geo"] = f"POINT ({x:.2f} {y:.2f})" + data.append(item) + return data + + elif spatial_func == "ST_TOUCHES": + # ST_TOUCHES: geometries touch at boundary but don't overlap interiors + # 
Query polygon and data polygons that share edges + query_geom = "POLYGON ((50 50, 60 50, 60 60, 50 60, 50 50))" + + def generate_geo_data(start_id, count): + data = [] + for i in range(count): + item = { + "id": start_id + i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + } + if nullable and i % 5 == 0: + item["geo"] = None + elif i % 4 == 0: + # Polygon touching right edge of query (starts at x=60) + item["geo"] = "POLYGON ((60 50, 70 50, 70 60, 60 60, 60 50))" + elif i % 4 == 1: + # Polygon touching top edge of query (starts at y=60) + item["geo"] = "POLYGON ((50 60, 60 60, 60 70, 50 70, 50 60))" + elif i % 4 == 2: + # Point on edge of query polygon + item["geo"] = "POINT (55.00 50.00)" + else: + # Point on corner + item["geo"] = "POINT (50.00 50.00)" + data.append(item) + return data + + elif spatial_func == "ST_OVERLAPS": + # ST_OVERLAPS: geometries overlap but neither contains the other (same dimension) + # Need polygons that partially overlap + query_geom = "POLYGON ((40 40, 60 40, 60 60, 40 60, 40 40))" + + def generate_geo_data(start_id, count): + data = [] + for i in range(count): + item = { + "id": start_id + i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + } + if nullable and i % 5 == 0: + item["geo"] = None + else: + # Polygons that partially overlap with query + # Shifted to overlap but not contain/be contained + offset = (i % 4) * 5 + if i % 2 == 0: + # Overlapping from right side + item["geo"] = f"POLYGON (({50 + offset} 45, {70 + offset} 45, {70 + offset} 55, {50 + offset} 55, {50 + offset} 45))" + else: + # Overlapping from bottom + item["geo"] = f"POLYGON ((45 {50 + offset}, 55 {50 + offset}, 55 {70 + offset}, 45 {70 + offset}, 45 {50 + offset}))" + data.append(item) + return data + + elif spatial_func == "ST_CROSSES": + # ST_CROSSES: geometries cross (line crosses polygon interior) + # Query with a line, data has polygons that the line passes through + query_geom = "LINESTRING (0 50, 100 50)" + + def generate_geo_data(start_id, count): + data = [] + for i in range(count): + item = { + "id": start_id + i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + } + if nullable and i % 5 == 0: + item["geo"] = None + else: + # Polygons that the horizontal line y=50 crosses through + x = 10 + (i % 10) * 8 + # Polygon spanning y=40 to y=60, so line y=50 crosses it + item["geo"] = f"POLYGON (({x} 40, {x + 5} 40, {x + 5} 60, {x} 60, {x} 40))" + data.append(item) + return data + else: + query_geom = "POLYGON ((20 20, 80 20, 80 80, 20 80, 20 20))" + + def generate_geo_data(start_id, count): + data = [] + for i in range(count): + x = 30 + (i % 10) * 4 + y = 30 + (i // 10) * 4 + item = { + "id": start_id + i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + } + if nullable and i % 5 == 0: + item["geo"] = None + else: + item["geo"] = f"POINT ({x:.2f} {y:.2f})" + data.append(item) + return data + + # Insert data based on data_state + if data_state == "sealed": + data = generate_geo_data(0, nb) + insert_payload = {"collectionName": name, "data": data} + rsp = self.vector_client.vector_insert(insert_payload) + assert rsp['code'] == 0 + rsp = self.collection_client.flush(name) + self.wait_collection_load_completed(name) + + elif data_state == "growing": + self.wait_collection_load_completed(name) + data = generate_geo_data(0, nb) + insert_payload = 
{"collectionName": name, "data": data} + rsp = self.vector_client.vector_insert(insert_payload) + assert rsp['code'] == 0 + + else: # sealed_and_growing + sealed_data = generate_geo_data(0, nb // 2) + insert_payload = {"collectionName": name, "data": sealed_data} + rsp = self.vector_client.vector_insert(insert_payload) + assert rsp['code'] == 0 + rsp = self.collection_client.flush(name) + self.wait_collection_load_completed(name) + growing_data = generate_geo_data(nb // 2, nb // 2) + insert_payload = {"collectionName": name, "data": growing_data} + rsp = self.vector_client.vector_insert(insert_payload) + assert rsp['code'] == 0 + + filter_expr = f"{spatial_func}(geo, '{query_geom}')" + + # 1. Query with spatial filter + query_payload = { + "collectionName": name, + "filter": filter_expr, + "outputFields": ["id", "geo"], + "limit": 100 + } + rsp = self.vector_client.vector_query(query_payload) + assert rsp['code'] == 0 + query_count = len(rsp.get('data', [])) + logger.info(f"{spatial_func} ({data_state}, geo_index={with_geo_index}, nullable={nullable}) query returned {query_count} results") + # Verify we got results (except for edge cases) + if not nullable or spatial_func not in ["ST_EQUALS"]: + assert query_count > 0, f"{spatial_func} query should return results" + + # 2. Search with geo filter + query_vector = preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist() + search_payload = { + "collectionName": name, + "data": [query_vector], + "annsField": "vector", + "filter": filter_expr, + "limit": 10, + "outputFields": ["id", "geo"] + } + rsp = self.vector_client.vector_search(search_payload) + assert rsp['code'] == 0 + search_count = len(rsp.get('data', [])) + logger.info(f"{spatial_func} ({data_state}, geo_index={with_geo_index}, nullable={nullable}) search returned {search_count} results") + + def test_upsert_geometry_data(self): + """ + target: test upsert geometry data + method: upsert geometry data + expected: upsert executes successfully + """ + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": False, + "enableDynamicField": True, + "fields": [ + {"fieldName": "id", "dataType": "Int64", "isPrimary": True}, + {"fieldName": "vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{default_dim}"}}, + {"fieldName": "geo", "dataType": "Geometry"} + ] + }, + "indexParams": [ + {"fieldName": "vector", "indexName": "vector_idx", "metricType": "L2"}, + {"fieldName": "geo", "indexName": "geo_idx", "indexType": "RTREE"} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 0 + + nb = 100 + + def generate_geo_data(start_id, count): + data = [] + for i in range(count): + x = random.uniform(10, 90) + y = random.uniform(10, 90) + data.append({ + "id": start_id + i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + "geo": f"POINT ({x:.2f} {y:.2f})" + }) + return data + + # Insert initial data + data = generate_geo_data(0, nb) + insert_payload = {"collectionName": name, "data": data} + rsp = self.vector_client.vector_insert(insert_payload) + assert rsp['code'] == 0 + self.wait_collection_load_completed(name) + + # Upsert data + upsert_data = generate_geo_data(0, nb // 2) + upsert_payload = {"collectionName": name, "data": upsert_data} + rsp = self.vector_client.vector_upsert(upsert_payload) + assert rsp['code'] == 0 + logger.info("Upsert geometry data completed successfully") + + def 
test_delete_geometry_data(self): + """ + target: test delete geometry data + method: delete geometry data + expected: delete executes successfully + """ + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": False, + "enableDynamicField": True, + "fields": [ + {"fieldName": "id", "dataType": "Int64", "isPrimary": True}, + {"fieldName": "vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{default_dim}"}}, + {"fieldName": "geo", "dataType": "Geometry"} + ] + }, + "indexParams": [ + {"fieldName": "vector", "indexName": "vector_idx", "metricType": "L2"}, + {"fieldName": "geo", "indexName": "geo_idx", "indexType": "RTREE"} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 0 + + nb = 100 + + def generate_geo_data(start_id, count): + data = [] + for i in range(count): + x = random.uniform(10, 90) + y = random.uniform(10, 90) + data.append({ + "id": start_id + i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + "geo": f"POINT ({x:.2f} {y:.2f})" + }) + return data + + # Insert data + data = generate_geo_data(0, nb) + insert_payload = {"collectionName": name, "data": data} + rsp = self.vector_client.vector_insert(insert_payload) + assert rsp['code'] == 0 + self.wait_collection_load_completed(name) + + # Delete data + delete_ids = list(range(0, nb // 2)) + delete_payload = {"collectionName": name, "filter": f"id in {delete_ids}"} + rsp = self.vector_client.vector_delete(delete_payload) + assert rsp['code'] == 0 + + # Verify deletion by querying + query_payload = { + "collectionName": name, + "filter": "id >= 0", + "outputFields": ["id", "geo"], + "limit": 200 + } + rsp = self.vector_client.vector_query(query_payload) + assert rsp['code'] == 0 + logger.info(f"Delete geometry data completed, remaining: {len(rsp.get('data', []))} records") + + def test_geometry_default_value(self): + """ + target: test geometry field with default value + method: create collection with geometry field having default value + expected: records without geo field use default value + """ + name = gen_collection_name() + default_geo = "POINT (0 0)" + payload = { + "collectionName": name, + "schema": { + "autoId": False, + "enableDynamicField": True, + "fields": [ + {"fieldName": "id", "dataType": "Int64", "isPrimary": True}, + {"fieldName": "vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{default_dim}"}}, + {"fieldName": "geo", "dataType": "Geometry", "defaultValue": default_geo} + ] + }, + "indexParams": [ + {"fieldName": "vector", "indexName": "vector_idx", "metricType": "L2"}, + {"fieldName": "geo", "indexName": "geo_idx", "indexType": "RTREE"} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 0 + + nb = 100 + data = [] + for i in range(nb): + item = { + "id": i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + } + # 30% use default value (omit geo field) + if i % 3 != 0: + x = random.uniform(10, 90) + y = random.uniform(10, 90) + item["geo"] = f"POINT ({x:.2f} {y:.2f})" + # else: geo field omitted, will use default value + data.append(item) + + insert_payload = {"collectionName": name, "data": data} + rsp = self.vector_client.vector_insert(insert_payload) + assert rsp['code'] == 0 + self.wait_collection_load_completed(name) + + # Query for records with default geometry value + query_payload = { + "collectionName": name, + "filter": f"ST_EQUALS(geo, 
'{default_geo}')", + "outputFields": ["id", "geo"], + "limit": 100 + } + rsp = self.vector_client.vector_query(query_payload) + assert rsp['code'] == 0 + default_count = len(rsp.get('data', [])) + logger.info(f"Default geometry: found {default_count} records with default value") + + # Query all records + query_payload = { + "collectionName": name, + "filter": "id >= 0", + "outputFields": ["id", "geo"], + "limit": 200 + } + rsp = self.vector_client.vector_query(query_payload) + assert rsp['code'] == 0 + total_count = len(rsp.get('data', [])) + logger.info(f"Default geometry: total {total_count} records") + + # Spatial query with default value area + query_payload = { + "collectionName": name, + "filter": "ST_WITHIN(geo, 'POLYGON ((-5 -5, 5 -5, 5 5, -5 5, -5 -5))')", + "outputFields": ["id", "geo"], + "limit": 100 + } + rsp = self.vector_client.vector_query(query_payload) + assert rsp['code'] == 0 + logger.info(f"Default geometry: spatial query near origin returned {len(rsp.get('data', []))} results") + + @pytest.mark.parametrize("spatial_func", [ + "ST_INTERSECTS", + "ST_CONTAINS", + "ST_WITHIN", + ]) + def test_spatial_query_empty_result(self, spatial_func): + """ + target: test spatial query returns empty result when no data matches + method: query with geometry that doesn't match any data + expected: query returns empty result (edge case) + """ + name = gen_collection_name() + payload = { + "collectionName": name, + "schema": { + "autoId": False, + "enableDynamicField": True, + "fields": [ + {"fieldName": "id", "dataType": "Int64", "isPrimary": True}, + {"fieldName": "vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{default_dim}"}}, + {"fieldName": "geo", "dataType": "Geometry"} + ] + }, + "indexParams": [ + {"fieldName": "vector", "indexName": "vector_idx", "metricType": "L2"}, + {"fieldName": "geo", "indexName": "geo_idx", "indexType": "RTREE"} + ] + } + rsp = self.collection_client.collection_create(payload) + assert rsp['code'] == 0 + + # Insert data in region (0-50, 0-50) + nb = 50 + data = [] + for i in range(nb): + x = 10 + (i % 10) * 4 + y = 10 + (i // 10) * 4 + data.append({ + "id": i, + "vector": preprocessing.normalize([np.array([random.random() for _ in range(default_dim)])])[0].tolist(), + "geo": f"POINT ({x:.2f} {y:.2f})" + }) + + insert_payload = {"collectionName": name, "data": data} + rsp = self.vector_client.vector_insert(insert_payload) + assert rsp['code'] == 0 + self.wait_collection_load_completed(name) + + # Query with geometry far away from all data (region 200-300, 200-300) + # This should return empty results + if spatial_func == "ST_INTERSECTS": + query_geom = "POLYGON ((200 200, 300 200, 300 300, 200 300, 200 200))" + elif spatial_func == "ST_CONTAINS": + # Data points cannot contain this distant point + query_geom = "POINT (250.00 250.00)" + else: # ST_WITHIN + query_geom = "POLYGON ((200 200, 300 200, 300 300, 200 300, 200 200))" + + filter_expr = f"{spatial_func}(geo, '{query_geom}')" + query_payload = { + "collectionName": name, + "filter": filter_expr, + "outputFields": ["id", "geo"], + "limit": 100 + } + rsp = self.vector_client.vector_query(query_payload) + assert rsp['code'] == 0 + result_count = len(rsp.get('data', [])) + logger.info(f"{spatial_func} empty result test: query returned {result_count} results") + assert result_count == 0, f"{spatial_func} query should return empty result when no data matches" diff --git a/tests/restful_client_v2/utils/utils.py b/tests/restful_client_v2/utils/utils.py index 14aa69e4c0..7cfcf72e01 100644 
--- a/tests/restful_client_v2/utils/utils.py +++ b/tests/restful_client_v2/utils/utils.py @@ -370,3 +370,412 @@ def get_sorted_distance(train_emb, test_emb, metric_type): distance = np.array(distance.T, order='C', dtype=np.float32) distance_sorted = np.sort(distance, axis=1).tolist() return distance_sorted + + +# ============= Geometry Utils ============= + +def generate_wkt_by_type(wkt_type: str, bounds: tuple = (0, 100, 0, 100), count: int = 10) -> list: + """ + Generate WKT examples dynamically based on geometry type + + Args: + wkt_type: Type of WKT geometry to generate (POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION) + bounds: Coordinate bounds as (min_x, max_x, min_y, max_y) + count: Number of geometries to generate + + Returns: + List of WKT strings + """ + if wkt_type == "POINT": + points = [] + for _ in range(count): + wkt_string = f"POINT ({random.uniform(bounds[0], bounds[1]):.2f} {random.uniform(bounds[2], bounds[3]):.2f})" + points.append(wkt_string) + return points + + elif wkt_type == "LINESTRING": + lines = [] + for _ in range(count): + points = [] + num_points = random.randint(2, 6) + for _ in range(num_points): + x = random.uniform(bounds[0], bounds[1]) + y = random.uniform(bounds[2], bounds[3]) + points.append(f"{x:.2f} {y:.2f}") + wkt_string = f"LINESTRING ({', '.join(points)})" + lines.append(wkt_string) + return lines + + elif wkt_type == "POLYGON": + polygons = [] + for _ in range(count): + if random.random() < 0.7: # 70% rectangles + x = random.uniform(bounds[0], bounds[1] - 50) + y = random.uniform(bounds[2], bounds[3] - 50) + width = random.uniform(10, 50) + height = random.uniform(10, 50) + polygon_wkt = f"POLYGON (({x:.2f} {y:.2f}, {x + width:.2f} {y:.2f}, {x + width:.2f} {y + height:.2f}, {x:.2f} {y + height:.2f}, {x:.2f} {y:.2f}))" + else: # 30% triangles + x1, y1 = random.uniform(bounds[0], bounds[1]), random.uniform(bounds[2], bounds[3]) + x2, y2 = random.uniform(bounds[0], bounds[1]), random.uniform(bounds[2], bounds[3]) + x3, y3 = random.uniform(bounds[0], bounds[1]), random.uniform(bounds[2], bounds[3]) + polygon_wkt = f"POLYGON (({x1:.2f} {y1:.2f}, {x2:.2f} {y2:.2f}, {x3:.2f} {y3:.2f}, {x1:.2f} {y1:.2f}))" + polygons.append(polygon_wkt) + return polygons + + elif wkt_type == "MULTIPOINT": + multipoints = [] + for _ in range(count): + points = [] + num_points = random.randint(2, 8) + for _ in range(num_points): + x = random.uniform(bounds[0], bounds[1]) + y = random.uniform(bounds[2], bounds[3]) + points.append(f"({x:.2f} {y:.2f})") + wkt_string = f"MULTIPOINT ({', '.join(points)})" + multipoints.append(wkt_string) + return multipoints + + elif wkt_type == "MULTILINESTRING": + multilines = [] + for _ in range(count): + lines = [] + num_lines = random.randint(2, 5) + for _ in range(num_lines): + line_points = [] + num_points = random.randint(2, 4) + for _ in range(num_points): + x = random.uniform(bounds[0], bounds[1]) + y = random.uniform(bounds[2], bounds[3]) + line_points.append(f"{x:.2f} {y:.2f}") + lines.append(f"({', '.join(line_points)})") + wkt_string = f"MULTILINESTRING ({', '.join(lines)})" + multilines.append(wkt_string) + return multilines + + elif wkt_type == "MULTIPOLYGON": + multipolygons = [] + for _ in range(count): + polygons = [] + num_polygons = random.randint(2, 4) + for _ in range(num_polygons): + x = random.uniform(bounds[0], bounds[1] - 30) + y = random.uniform(bounds[2], bounds[3] - 30) + size = random.uniform(10, 30) + polygon_coords = f"(({x:.2f} {y:.2f}, {x + size:.2f} {y:.2f}, {x + 
size:.2f} {y + size:.2f}, {x:.2f} {y + size:.2f}, {x:.2f} {y:.2f}))" + polygons.append(polygon_coords) + wkt_string = f"MULTIPOLYGON ({', '.join(polygons)})" + multipolygons.append(wkt_string) + return multipolygons + + elif wkt_type == "GEOMETRYCOLLECTION": + collections = [] + for _ in range(count): + collection_types = random.randint(2, 4) + geoms = [] + + for _ in range(collection_types): + geom_type = random.choice(["POINT", "LINESTRING", "POLYGON"]) + if geom_type == "POINT": + x, y = random.uniform(bounds[0], bounds[1]), random.uniform(bounds[2], bounds[3]) + geoms.append(f"POINT({x:.2f} {y:.2f})") + elif geom_type == "LINESTRING": + x1, y1 = random.uniform(bounds[0], bounds[1]), random.uniform(bounds[2], bounds[3]) + x2, y2 = random.uniform(bounds[0], bounds[1]), random.uniform(bounds[2], bounds[3]) + geoms.append(f"LINESTRING({x1:.2f} {y1:.2f}, {x2:.2f} {y2:.2f})") + else: # POLYGON + x, y = random.uniform(bounds[0], bounds[1] - 20), random.uniform(bounds[2], bounds[3] - 20) + size = random.uniform(5, 20) + geoms.append(f"POLYGON(({x:.2f} {y:.2f}, {x + size:.2f} {y:.2f}, {x + size:.2f} {y + size:.2f}, {x:.2f} {y + size:.2f}, {x:.2f} {y:.2f}))") + + wkt_string = f"GEOMETRYCOLLECTION({', '.join(geoms)})" + collections.append(wkt_string) + return collections + + else: + raise ValueError(f"Unsupported WKT type: {wkt_type}") + + +def generate_diverse_base_data(count=100, bounds=(0, 100, 0, 100), pk_field_name="id", geo_field_name="geo"): + """ + Generate diverse base geometry data for testing + + Args: + count: Number of geometries to generate (default: 100) + bounds: Coordinate bounds as (min_x, max_x, min_y, max_y) + pk_field_name: Name of the primary key field (default: "id") + geo_field_name: Name of the geometry field (default: "geo") + + Returns: + List of geometry data with format [{pk_field_name: int, geo_field_name: "WKT_STRING"}, ...] 
+ """ + base_data = [] + min_x, max_x, min_y, max_y = bounds + + # Generate points (30% of data) + point_count = int(count * 0.3) + for _ in range(point_count): + x = random.uniform(min_x, max_x) + y = random.uniform(min_y, max_y) + wkt_string = f"POINT ({x:.2f} {y:.2f})" + base_data.append({pk_field_name: len(base_data), geo_field_name: wkt_string}) + + # Generate polygons (40% of data) + polygon_count = int(count * 0.4) + for _ in range(polygon_count): + size = random.uniform(5, 20) + x = random.uniform(min_x, max_x - size) + y = random.uniform(min_y, max_y - size) + wkt_string = f"POLYGON (({x:.2f} {y:.2f}, {x + size:.2f} {y:.2f}, {x + size:.2f} {y + size:.2f}, {x:.2f} {y + size:.2f}, {x:.2f} {y:.2f}))" + base_data.append({pk_field_name: len(base_data), geo_field_name: wkt_string}) + + # Generate linestrings (25% of data) + line_count = int(count * 0.25) + for _ in range(line_count): + point_count_per_line = random.randint(2, 4) + coords = [] + for _ in range(point_count_per_line): + x = random.uniform(min_x, max_x) + y = random.uniform(min_y, max_y) + coords.append(f"{x:.2f} {y:.2f}") + wkt_string = f"LINESTRING ({', '.join(coords)})" + base_data.append({pk_field_name: len(base_data), geo_field_name: wkt_string}) + + # Add some specific geometries for edge cases + remaining = count - len(base_data) + if remaining > 0: + # Add duplicate points for ST_EQUALS testing + if len(base_data) > 0 and "POINT" in base_data[0][geo_field_name]: + base_data.append({pk_field_name: len(base_data), geo_field_name: base_data[0][geo_field_name]}) + remaining -= 1 + + # Fill remaining with random points + for _ in range(remaining): + x = random.uniform(min_x, max_x) + y = random.uniform(min_y, max_y) + wkt_string = f"POINT ({x:.2f} {y:.2f})" + base_data.append({pk_field_name: len(base_data), geo_field_name: wkt_string}) + + return base_data + + +def generate_spatial_query_data_for_function(spatial_func, base_data, geo_field_name="geo"): + """ + Generate query geometry for specific spatial function based on base data + Ensures the query will match multiple results (>1) + + Args: + spatial_func: The spatial function name (e.g., "ST_INTERSECTS", "ST_CONTAINS") + base_data: List of base geometry data with format [{"id": int, geo_field_name: "WKT_STRING"}, ...] 
+ geo_field_name: Name of the geometry field in base_data (default: "geo") + + Returns: + query_geom: WKT string of the query geometry that should match multiple base geometries + """ + import re + + def parse_point(wkt): + """Extract x, y from POINT WKT""" + match = re.search(r"POINT \(([0-9.-]+) ([0-9.-]+)\)", wkt) + if match: + return float(match.group(1)), float(match.group(2)) + return None, None + + def parse_polygon_bounds(wkt): + """Extract min/max bounds from POLYGON WKT""" + match = re.search(r"POLYGON \(\(([^)]+)\)\)", wkt) + if match: + coords = match.group(1).split(", ") + xs, ys = [], [] + for coord in coords: + parts = coord.strip().split() + if len(parts) >= 2: + xs.append(float(parts[0])) + ys.append(float(parts[1])) + if xs and ys: + return min(xs), max(xs), min(ys), max(ys) + return None, None, None, None + + if spatial_func == "ST_INTERSECTS": + # Create a large query polygon that will intersect with many geometries + all_coords = [] + for item in base_data: + if "POINT" in item[geo_field_name]: + x, y = parse_point(item[geo_field_name]) + if x is not None and y is not None: + all_coords.append((x, y)) + elif "POLYGON" in item[geo_field_name]: + min_x, max_x, min_y, max_y = parse_polygon_bounds(item[geo_field_name]) + if min_x is not None: + all_coords.append(((min_x + max_x) / 2, (min_y + max_y) / 2)) + + if all_coords and len(all_coords) >= 5: + target_coords = all_coords[:min(10, len(all_coords))] + center_x = sum(coord[0] for coord in target_coords) / len(target_coords) + center_y = sum(coord[1] for coord in target_coords) / len(target_coords) + size = 40 + query_geom = f"POLYGON (({center_x - size / 2} {center_y - size / 2}, {center_x + size / 2} {center_y - size / 2}, {center_x + size / 2} {center_y + size / 2}, {center_x - size / 2} {center_y + size / 2}, {center_x - size / 2} {center_y - size / 2}))" + else: + query_geom = "POLYGON ((30 30, 70 30, 70 70, 30 70, 30 30))" + + elif spatial_func == "ST_CONTAINS": + # Create a query polygon that contains multiple points + points = [] + for item in base_data: + if "POINT" in item[geo_field_name]: + x, y = parse_point(item[geo_field_name]) + if x is not None and y is not None: + points.append((x, y)) + + if len(points) >= 3: + target_points = points[:min(10, len(points))] + min_x = min(p[0] for p in target_points) - 5 + max_x = max(p[0] for p in target_points) + 5 + min_y = min(p[1] for p in target_points) - 5 + max_y = max(p[1] for p in target_points) + 5 + query_geom = f"POLYGON (({min_x} {min_y}, {max_x} {min_y}, {max_x} {max_y}, {min_x} {max_y}, {min_x} {min_y}))" + else: + query_geom = "POLYGON ((25 25, 75 25, 75 75, 25 75, 25 25))" + + elif spatial_func == "ST_WITHIN": + # Create a large query polygon that contains many small geometries + query_geom = "POLYGON ((5 5, 95 5, 95 95, 5 95, 5 5))" + + elif spatial_func == "ST_EQUALS": + # Find a point in base data and create query with same point + for item in base_data: + if "POINT" in item[geo_field_name]: + query_geom = item[geo_field_name] + break + else: + query_geom = "POINT (25 25)" + + elif spatial_func == "ST_TOUCHES": + # Create a polygon that touches some base geometries + points = [] + for item in base_data: + if "POINT" in item[geo_field_name]: + x, y = parse_point(item[geo_field_name]) + if x is not None and y is not None: + points.append((x, y)) + + if points: + target_point = points[0] + x, y = target_point[0], target_point[1] + size = 20 + query_geom = f"POLYGON (({x} {y - size}, {x + size} {y - size}, {x + size} {y}, {x} {y}, {x} {y - size}))" + 
else: + query_geom = "POLYGON ((0 0, 20 0, 20 20, 0 20, 0 0))" + + elif spatial_func == "ST_OVERLAPS": + # Find polygons in base data and create overlapping query polygon + polygons = [] + for item in base_data: + if "POLYGON" in item[geo_field_name]: + min_x, max_x, min_y, max_y = parse_polygon_bounds(item[geo_field_name]) + if min_x is not None: + polygons.append((min_x, max_x, min_y, max_y)) + + if polygons: + target_poly = polygons[0] + min_x, max_x, min_y, max_y = target_poly[0], target_poly[1], target_poly[2], target_poly[3] + shift = (max_x - min_x) * 0.3 + query_geom = f"POLYGON (({min_x + shift} {min_y + shift}, {max_x + shift} {min_y + shift}, {max_x + shift} {max_y + shift}, {min_x + shift} {max_y + shift}, {min_x + shift} {min_y + shift}))" + else: + query_geom = "POLYGON ((10 10, 30 10, 30 30, 10 30, 10 10))" + + elif spatial_func == "ST_CROSSES": + # Create a line that crosses polygons + polygons = [] + for item in base_data: + if "POLYGON" in item[geo_field_name]: + min_x, max_x, min_y, max_y = parse_polygon_bounds(item[geo_field_name]) + if min_x is not None: + polygons.append((min_x, max_x, min_y, max_y)) + + if polygons: + target_poly = polygons[0] + min_x, max_x, min_y, max_y = target_poly[0], target_poly[1], target_poly[2], target_poly[3] + center_x = (min_x + max_x) / 2 + center_y = (min_y + max_y) / 2 + query_geom = f"LINESTRING ({center_x} {min_y - 10}, {center_x} {max_y + 10})" + else: + query_geom = "LINESTRING (15 -5, 15 25)" + + else: + query_geom = "POLYGON ((0 0, 50 0, 50 50, 0 50, 0 0))" + + return query_geom + + +def generate_gt(spatial_func, base_data, query_geom, geo_field_name="geo", pk_field_name="id"): + """ + Generate ground truth (expected IDs) using shapely + + Args: + spatial_func: The spatial function name (e.g., "ST_INTERSECTS", "ST_CONTAINS") + base_data: List of base geometry data with format [{pk_field_name: int, geo_field_name: "WKT_STRING"}, ...] 
+ query_geom: WKT string of the query geometry + geo_field_name: Name of the geometry field in base_data (default: "geo") + pk_field_name: Name of the primary key field in base_data (default: "id") + + Returns: + expected_ids: List of primary key values that should match the spatial function + """ + try: + from shapely import wkt + import shapely + except ImportError: + logger.warning("shapely not installed, returning empty expected_ids") + return [] + + # Spatial function mapping + spatial_function_mapping = { + "ST_EQUALS": shapely.equals, + "ST_TOUCHES": shapely.touches, + "ST_OVERLAPS": shapely.overlaps, + "ST_CROSSES": shapely.crosses, + "ST_CONTAINS": shapely.contains, + "ST_INTERSECTS": shapely.intersects, + "ST_WITHIN": shapely.within, + } + + if spatial_func not in spatial_function_mapping: + logger.warning(f"Unsupported spatial function {spatial_func}, returning empty expected_ids") + return [] + + try: + # Parse query geometry + query_geometry = wkt.loads(query_geom) + shapely_func = spatial_function_mapping[spatial_func] + + # Parse all base geometries + base_geometries = [] + base_ids = [] + for item in base_data: + try: + base_geometry = wkt.loads(item[geo_field_name]) + base_geometries.append(base_geometry) + base_ids.append(item[pk_field_name]) + except Exception as e: + logger.warning(f"Failed to parse geometry {item[geo_field_name]}: {e}") + continue + + if not base_geometries: + return [] + + # Convert to numpy array for vectorized operation + base_geoms_array = np.array(base_geometries) + base_ids_array = np.array(base_ids) + + # Apply vectorized spatial function + results = shapely_func(base_geoms_array, query_geometry) + + # Get matching IDs + expected_ids = base_ids_array[results].tolist() + + return expected_ids + + except Exception as e: + logger.error(f"Failed to compute ground truth for {spatial_func}: {e}") + return []
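
A minimal usage sketch for the ground-truth helpers above. These three functions are added by this patch but not yet imported by the test module (which only uses gen_collection_name and generate_wkt_by_type), so the chain below shows one way they could fit together; it assumes shapely>=2.0 from requirements.txt is installed and that it runs from tests/restful_client_v2 so the utils package resolves, mirroring the test module's own imports:

    from utils.utils import (
        generate_diverse_base_data,
        generate_spatial_query_data_for_function,
        generate_gt,
    )

    # Build a mixed pool of base geometries (points, polygons, linestrings),
    # derive a query geometry expected to match several of them, then compute
    # the shapely-based ground truth of matching primary keys.
    base_data = generate_diverse_base_data(count=50, bounds=(0, 100, 0, 100))
    query_geom = generate_spatial_query_data_for_function("ST_INTERSECTS", base_data)
    expected_ids = generate_gt("ST_INTERSECTS", base_data, query_geom)

    # expected_ids can then be compared against the ids returned by a REST
    # query using the filter f"ST_INTERSECTS(geo, '{query_geom}')".
    print(f"expected {len(expected_ids)} matches: {expected_ids}")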