test: [cp2.5] add hybrid search offset testcase in restful api (#43711)

pr: https://github.com/milvus-io/milvus/pull/43646

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2025-08-05 14:49:48 +08:00 committed by GitHub
parent 51b3d246f9
commit e706e825dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2707,6 +2707,358 @@ class TestHybridSearchVector(TestBase):
assert rsp['code'] == 0
assert len(rsp['data']) == 10
@pytest.mark.parametrize("insert_round", [1])
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("is_partition_key", [True])
@pytest.mark.parametrize("enable_dynamic_schema", [True])
@pytest.mark.parametrize("nb", [3000])
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("limit", [100])
@pytest.mark.parametrize("offset", [100, 200])
def test_hybrid_search_vector_with_offset(self, nb, dim, insert_round, auto_id,
                                          is_partition_key, enable_dynamic_schema, limit, offset):
    """
    Hybrid search honours the top-level ``offset`` parameter.

    Creates a collection with two float-vector fields, inserts ``nb`` rows
    ``insert_round`` times, then runs an RRF hybrid search with
    ``limit``/``offset``. The paged result is compared with the slice
    ``[offset:offset + limit]`` of the same search issued without an offset;
    at least 80% of the primary keys must match.
    """
    vector_fields = ("float_vector_1", "float_vector_2")

    # create a collection
    name = gen_collection_name()
    schema_fields = [
        {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
        {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key,
         "elementTypeParams": {}},
        {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}},
        {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
    ]
    schema_fields.extend(
        {"fieldName": field, "dataType": "FloatVector", "elementTypeParams": {"dim": str(dim)}}
        for field in vector_fields
    )
    create_payload = {
        "collectionName": name,
        "schema": {
            "autoId": auto_id,
            "enableDynamicField": enable_dynamic_schema,
            "fields": schema_fields,
        },
        "indexParams": [
            {"fieldName": field, "indexName": field, "metricType": "COSINE"}
            for field in vector_fields
        ],
    }
    rsp = self.collection_client.collection_create(create_payload)
    assert rsp['code'] == 0

    # insert data
    for _ in range(insert_round):
        rows = []
        for idx in range(nb):
            # The primary key is only supplied when the server does not generate it.
            row = {} if auto_id else {"book_id": idx}
            row.update({
                "user_id": idx % 100,
                "word_count": idx,
                "book_describe": f"book_{idx}",
                "float_vector_1": gen_vector(datatype="FloatVector", dim=dim),
                "float_vector_2": gen_vector(datatype="FloatVector", dim=dim),
            })
            if enable_dynamic_schema:
                row[f"dynamic_field_{idx}"] = idx
            rows.append(row)
        rsp = self.vector_client.vector_insert({"collectionName": name, "data": rows})
        assert rsp['code'] == 0
        assert rsp['data']['insertCount'] == nb

    # One query vector per vector field, reused by both searches so that the
    # paged and un-paged results are comparable.
    query_vectors = {
        "float_vector_1": gen_vector(datatype="FloatVector", dim=dim),
        "float_vector_2": gen_vector(datatype="FloatVector", dim=dim),
    }
    window = offset + limit
    output_fields = ["book_id", "user_id", "word_count", "book_describe"]

    def build_sub_searches():
        # Each sub-search fetches offset + limit candidates.
        return [
            {"data": [vector], "annsField": field, "limit": window, "outputFields": ["*"]}
            for field, vector in query_vectors.items()
        ]

    # hybrid search with offset
    rsp = self.vector_client.vector_hybrid_search({
        "collectionName": name,
        "search": build_sub_searches(),
        "rerank": {"strategy": "rrf", "params": {"k": 10}},
        "limit": limit,
        "offset": offset,
        "outputFields": list(output_fields),
    })

    # Check if offset + limit exceeds max allowed
    if window > constant.MAX_SUM_OFFSET_AND_LIMIT:
        assert rsp['code'] == 1
        assert "exceeds" in rsp['message'] or "invalid" in rsp['message'].lower()
        return

    assert rsp['code'] == 0
    assert len(rsp['data']) <= limit

    # Verify offset works by comparing with search without offset
    rsp_no_offset = self.vector_client.vector_hybrid_search({
        "collectionName": name,
        "search": build_sub_searches(),
        "rerank": {"strategy": "rrf", "params": {"k": 10}},
        "limit": window,
        "outputFields": list(output_fields),
    })
    if rsp_no_offset['code'] != 0 or len(rsp_no_offset['data']) <= offset:
        # Nothing to compare against.
        return

    # Primary keys returned by the paged search ...
    paged_pks = {hit['book_id'] for hit in rsp['data'] if 'book_id' in hit}
    # ... and those of the matching window of the un-paged search.
    expected_pks = {
        hit['book_id']
        for hit in rsp_no_offset['data'][offset:window]
        if 'book_id' in hit
    }

    if not expected_pks:
        # If no expected results, the offset results should also be empty
        assert len(paged_pks) == 0
        return

    # Calculate intersection rate
    intersection_rate = len(paged_pks & expected_pks) / len(expected_pks)
    logger.info(f"PK intersection rate: {intersection_rate:.2%}")
    # The intersection rate should be at least 80%
    assert intersection_rate >= 0.8, f"PK intersection rate {intersection_rate:.2%} is less than 80%"
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("is_partition_key", [True])
@pytest.mark.parametrize("enable_dynamic_schema", [True])
@pytest.mark.parametrize("nb", [1000])
@pytest.mark.parametrize("dim", [2])
@pytest.mark.parametrize("invalid_offset", [-1, -10, -100, "abc", [], {}])
def test_hybrid_search_vector_with_invalid_offset(self, nb, dim, auto_id,
                                                  is_partition_key, enable_dynamic_schema, invalid_offset):
    """
    Hybrid search rejects an invalid ``offset``.

    Negative integers and non-integer values (string, list, dict) are sent
    as the top-level ``offset``; the response must carry a non-zero code and
    a message mentioning "offset" or "invalid" (case-insensitive).
    """
    vector_fields = ("float_vector_1", "float_vector_2")

    # create a collection
    name = gen_collection_name()
    schema_fields = [
        {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
        {"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key,
         "elementTypeParams": {}},
        {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}},
        {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
    ]
    schema_fields.extend(
        {"fieldName": field, "dataType": "FloatVector", "elementTypeParams": {"dim": str(dim)}}
        for field in vector_fields
    )
    rsp = self.collection_client.collection_create({
        "collectionName": name,
        "schema": {
            "autoId": auto_id,
            "enableDynamicField": enable_dynamic_schema,
            "fields": schema_fields,
        },
        "indexParams": [
            {"fieldName": field, "indexName": field, "metricType": "L2"}
            for field in vector_fields
        ],
    })
    assert rsp['code'] == 0

    # insert data
    rows = []
    for idx in range(nb):
        # The primary key is only supplied when the server does not generate it.
        row = {} if auto_id else {"book_id": idx}
        row.update({
            "user_id": idx % 100,
            "word_count": idx,
            "book_describe": f"book_{idx}",
            "float_vector_1": gen_vector(datatype="FloatVector", dim=dim),
            "float_vector_2": gen_vector(datatype="FloatVector", dim=dim),
        })
        if enable_dynamic_schema:
            row[f"dynamic_field_{idx}"] = idx
        rows.append(row)
    rsp = self.vector_client.vector_insert({"collectionName": name, "data": rows})
    assert rsp['code'] == 0

    # hybrid search with invalid offset
    sub_searches = [
        {
            "data": [gen_vector(datatype="FloatVector", dim=dim)],
            "annsField": field,
            "limit": 10,
            "outputFields": ["*"],
        }
        for field in vector_fields
    ]
    rsp = self.vector_client.vector_hybrid_search({
        "collectionName": name,
        "search": sub_searches,
        "rerank": {"strategy": "rrf", "params": {"k": 10}},
        "limit": 10,
        "offset": invalid_offset,
        "outputFields": ["user_id", "word_count", "book_describe"],
    })
    assert rsp['code'] != 0
    message = rsp['message'].lower()
    assert "offset" in message or "invalid" in message
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("nb", [5000])
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("limit", [100])
@pytest.mark.parametrize("large_offset", [10000, 16384])
@pytest.mark.skip(reason="issue: https://github.com/milvus-io/milvus/issues/43639")
def test_hybrid_search_vector_with_large_offset(self, nb, dim, limit, large_offset, auto_id):
    """
    Test hybrid search with large offset values

    Inserts ``nb`` rows and runs an RRF hybrid search whose ``offset`` is at
    least as large as the number of inserted rows. Exactly one of three
    outcomes is checked:

    * ``large_offset + limit`` exceeds ``constant.MAX_SUM_OFFSET_AND_LIMIT``:
      the request is rejected with code 65535;
    * otherwise, ``large_offset >= nb``: the request succeeds with no rows;
    * otherwise: the request succeeds with the rows remaining after the offset.

    ``auto_id`` is supplied through ``parametrize`` so the argument resolves
    without relying on a fixture of that name. The inserted rows never carry
    ``book_id``, so only ``auto_id=True`` is supported.
    """
    # create a collection
    name = gen_collection_name()
    payload = {
        "collectionName": name,
        "schema": {
            "autoId": auto_id,
            "enableDynamicField": True,
            "fields": [
                {"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
                {"fieldName": "user_id", "dataType": "Int64", "elementTypeParams": {}},
                {"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}},
                {"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
                {"fieldName": "float_vector_1", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}},
                {"fieldName": "float_vector_2", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}},
            ]
        },
        "indexParams": [
            {"fieldName": "float_vector_1", "indexName": "float_vector_1", "metricType": "IP"},
            {"fieldName": "float_vector_2", "indexName": "float_vector_2", "metricType": "IP"},
        ]
    }
    rsp = self.collection_client.collection_create(payload)
    assert rsp['code'] == 0

    # insert data
    data = [
        {
            "user_id": i % 100,
            "word_count": i,
            "book_describe": f"book_{i}",
            "float_vector_1": gen_vector(datatype="FloatVector", dim=dim),
            "float_vector_2": gen_vector(datatype="FloatVector", dim=dim),
        }
        for i in range(nb)
    ]
    payload = {
        "collectionName": name,
        "data": data,
    }
    rsp = self.vector_client.vector_insert(payload)
    assert rsp['code'] == 0

    # hybrid search with large offset
    payload = {
        "collectionName": name,
        "search": [{
            "data": [gen_vector(datatype="FloatVector", dim=dim)],
            "annsField": "float_vector_1",
            "limit": limit + large_offset,
            "outputFields": ["*"]
        },
            {
                "data": [gen_vector(datatype="FloatVector", dim=dim)],
                "annsField": "float_vector_2",
                "limit": limit + large_offset,
                "outputFields": ["*"]
            }
        ],
        "rerank": {
            "strategy": "rrf",
            "params": {
                "k": 10,
            }
        },
        "limit": limit,
        "offset": large_offset,
        "outputFields": ["user_id", "word_count", "book_describe"]
    }
    rsp = self.vector_client.vector_hybrid_search(payload)

    # The three outcomes are mutually exclusive: a rejected request cannot
    # also be asserted to succeed, so the branches must form a single chain.
    if large_offset + limit > constant.MAX_SUM_OFFSET_AND_LIMIT:
        # When offset + limit exceeds max allowed
        assert rsp['code'] == 65535
        assert "exceeds" in rsp['message'] or "invalid" in rsp['message'].lower()
    elif large_offset >= nb:
        # When offset is larger than the available results:
        # should return empty results or handle gracefully
        assert rsp['code'] == 0
        assert len(rsp['data']) == 0
    else:
        assert rsp['code'] == 0
        # Should return remaining results after offset
        expected_count = min(limit, nb - large_offset)
        assert len(rsp['data']) == expected_count
@pytest.mark.L0
class TestQueryVector(TestBase):