test: Add int8 vector field support in search checker (#42047)

/kind improvement

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2025-05-26 16:18:49 +08:00 committed by GitHub
parent f84650ece0
commit bfa948c2d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 72 additions and 7 deletions

View File

@ -13,7 +13,7 @@ from prettytable import PrettyTable
import functools
from collections import Counter
from time import sleep
from pymilvus import AnnSearchRequest, RRFRanker, MilvusClient
from pymilvus import AnnSearchRequest, RRFRanker, MilvusClient, DataType
from pymilvus.bulk_writer import RemoteBulkWriter, BulkFileType
from base.database_wrapper import ApiDatabaseWrapper
from base.collection_wrapper import ApiCollectionWrapper
@ -314,7 +314,7 @@ def exception_handler():
log_message = f"Error in {class_name}.{function_name}: {log_e}"
else:
log_message = f"Error in {function_name}: {log_e}"
log.error(log_message)
log.exception(log_message)
log.error(log_e)
return Error(e), False
@ -380,6 +380,7 @@ class Checker:
self.json_field_names = cf.get_json_field_name_list(schema=schema)
self.float_vector_field_names = cf.get_float_vec_field_name_list(schema=schema)
self.binary_vector_field_names = cf.get_binary_vec_field_name_list(schema=schema)
self.int8_vector_field_names = cf.get_int8_vec_field_name_list(schema=schema)
self.bm25_sparse_field_names = cf.get_bm25_vec_field_name_list(schema=schema)
# get index of collection
indexes = [index.to_dict() for index in self.c_wrap.indexes]
@ -424,6 +425,15 @@ class Checker:
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
# create index for int8 vector fields
for f in self.int8_vector_field_names:
if f in indexed_fields:
continue
self.c_wrap.create_index(f,
constants.DEFAULT_INT8_INDEX_PARAM,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
# create index for binary vector fields
for f in self.binary_vector_field_names:
if f in indexed_fields:
@ -678,18 +688,22 @@ class PartitionReleaseChecker(Checker):
class SearchChecker(Checker):
"""check search operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
def __init__(self, collection_name=None, shards_num=2, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("SearchChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.insert_data()
self.dense_anns_field_name_list = cf.get_dense_anns_field_name_list(self.schema)
self.data = None
self.anns_field_name = None
self.search_param = None
@trace()
def search(self):
res, result = self.c_wrap.search(
data=cf.gen_vectors(5, self.dim),
anns_field=self.float_vector_field_name,
param=constants.DEFAULT_SEARCH_PARAM,
data=self.data,
anns_field=self.anns_field_name,
param=self.search_param,
limit=1,
partition_names=self.p_names,
timeout=search_timeout,
@ -699,6 +713,15 @@ class SearchChecker(Checker):
@exception_handler()
def run_task(self):
    """Run one search against a randomly chosen dense vector field.

    Picks one entry from ``self.dense_anns_field_name_list``, generates five
    query vectors matching that field's dim and dtype, selects the search
    parameters that correspond to the field's dtype, and delegates to
    ``self.search()``.

    Returns:
        The ``(res, result)`` pair returned by ``self.search()``.

    Raises:
        ValueError: if the chosen field has a dtype with no known search
            parameters (handled by the ``exception_handler`` decorator).
    """
    anns_field_item = random.choice(self.dense_anns_field_name_list)
    dtype = anns_field_item["dtype"]
    self.anns_field_name = anns_field_item["name"]
    self.data = cf.gen_vectors(5, anns_field_item["dim"], vector_data_type=dtype)
    if dtype in [DataType.FLOAT_VECTOR, DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR]:
        self.search_param = constants.DEFAULT_SEARCH_PARAM
    elif dtype == DataType.INT8_VECTOR:
        self.search_param = constants.DEFAULT_INT8_SEARCH_PARAM
    elif dtype == DataType.BINARY_VECTOR:
        # The dense field list may contain binary vector fields; without this
        # branch the search would reuse the param left over from the previous
        # iteration (or None), which does not match the binary index metric.
        # NOTE(review): assumes cf.gen_vectors can produce binary query vectors - confirm.
        self.search_param = constants.DEFAULT_BINARY_SEARCH_PARAM
    else:
        # Fail loudly instead of silently searching with a stale search_param.
        raise ValueError(f"unsupported anns field dtype for search: {dtype}")
    res, result = self.search()
    return res, result

View File

@ -23,6 +23,8 @@ WAIT_PER_OP = 10 # time to wait in seconds between operations
CHAOS_DURATION = 120 # chaos duration time in seconds
# Default HNSW index build parameters and the matching search parameters (L2 metric).
DEFAULT_INDEX_PARAM = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
DEFAULT_SEARCH_PARAM = {"metric_type": "L2", "params": {"ef": 64}}
# Index/search parameters for INT8 vector fields; currently the same values as the defaults above.
DEFAULT_INT8_INDEX_PARAM = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
DEFAULT_INT8_SEARCH_PARAM = {"metric_type": "L2", "params": {"ef": 64}}
# Index/search parameters using the JACCARD metric (binary vector fields, going by the names).
# NOTE(review): "M" looks like an HNSW build parameter; confirm it is intended for BIN_IVF_FLAT.
DEFAULT_BINARY_INDEX_PARAM = {"index_type": "BIN_IVF_FLAT", "metric_type": "JACCARD", "params": {"M": 48}}
DEFAULT_BINARY_SEARCH_PARAM = {"metric_type": "JACCARD", "params": {"nprobe": 10}}
# Index parameters for a BM25 sparse inverted index.
DEFAULT_BM25_INDEX_PARAM = {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "BM25", "params": {"bm25_k1": 1.5, "bm25_b": 0.75}}

View File

@ -731,6 +731,12 @@ def gen_bfloat16_vec_field(name=ct.default_float_vec_field_name, is_primary=Fals
is_primary=is_primary, **kwargs)
return float_vec_field
def gen_int8_vec_field(name=ct.default_int8_vec_field_name, is_primary=False, dim=ct.default_dim,
                       description=ct.default_desc, **kwargs):
    """Build a field schema for an INT8 vector field.

    Args:
        name: field name.
        is_primary: forwarded to the field schema as ``is_primary``.
        dim: vector dimension forwarded to the field schema.
        description: field description.
        **kwargs: extra keyword arguments forwarded unchanged to
            ``init_field_schema``.

    Returns:
        The first element of the pair returned by
        ``ApiFieldSchemaWrapper().init_field_schema``.
    """
    schema_wrapper = ApiFieldSchemaWrapper()
    field_schema, _ = schema_wrapper.init_field_schema(
        name=name,
        dtype=DataType.INT8_VECTOR,
        description=description,
        dim=dim,
        is_primary=is_primary,
        **kwargs,
    )
    return field_schema
def gen_sparse_vec_field(name=ct.default_sparse_vec_field_name, is_primary=False, description=ct.default_desc, **kwargs):
sparse_vec_field, _ = ApiFieldSchemaWrapper().init_field_schema(name=name, dtype=DataType.SPARSE_FLOAT_VECTOR,
@ -821,7 +827,7 @@ def gen_all_datatype_collection_schema(description=ct.default_desc, primary_fiel
gen_array_field(name="array_varchar", element_type=DataType.VARCHAR, max_length=200),
gen_array_field(name="array_bool", element_type=DataType.BOOL),
gen_float_vec_field(dim=dim),
gen_float_vec_field(name="image_emb", dim=dim),
gen_int8_vec_field(name="image_emb", dim=dim),
gen_float_vec_field(name="text_sparse_emb", vector_data_type=DataType.SPARSE_FLOAT_VECTOR),
gen_float_vec_field(name="voice_emb", dim=dim),
]
@ -1931,6 +1937,15 @@ def get_binary_vec_field_name_list(schema=None):
vec_fields.append(field.name)
return vec_fields
def get_int8_vec_field_name_list(schema=None):
    """Return the names of all INT8 vector fields in ``schema``.

    Args:
        schema: collection schema exposing ``fields``; when ``None`` the
            schema from ``gen_default_collection_schema()`` is used.

    Returns:
        A list of field names, in schema order, whose dtype is
        ``DataType.INT8_VECTOR``.
    """
    target_schema = gen_default_collection_schema() if schema is None else schema
    return [
        candidate.name
        for candidate in target_schema.fields
        if candidate.dtype in [DataType.INT8_VECTOR]
    ]
def get_bm25_vec_field_name_list(schema=None):
if not hasattr(schema, "functions"):
@ -1954,6 +1969,20 @@ def get_dim_by_schema(schema=None):
return dim
return None
def get_dense_anns_field_name_list(schema=None):
    """Describe every float, float16, bfloat16, int8 or binary vector field in ``schema``.

    Args:
        schema: collection schema exposing ``fields``; when ``None`` the
            schema from ``gen_default_collection_schema()`` is used.

    Returns:
        A list of dicts, in schema order, each with the keys ``"name"``,
        ``"dtype"`` and ``"dim"`` (taken from ``field.params['dim']``).
    """
    target_schema = gen_default_collection_schema() if schema is None else schema
    return [
        {
            "name": candidate.name,
            "dtype": candidate.dtype,
            "dim": candidate.params['dim'],
        }
        for candidate in target_schema.fields
        if candidate.dtype in [DataType.FLOAT_VECTOR, DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR,
                               DataType.INT8_VECTOR, DataType.BINARY_VECTOR]
    ]
def gen_varchar_data(length: int, nb: int, text_mode=False):
if text_mode:
@ -2037,6 +2066,16 @@ def gen_data_by_collection_field(field, nb=None, start=None):
if nb is None:
return np.array([random.random() for _ in range(int(dim))], dtype=np.float16)
return [np.array([random.random() for _ in range(int(dim))], dtype=np.float16) for _ in range(int(nb))]
if data_type == DataType.INT8_VECTOR:
dim = field.params['dim']
if nb is None:
raw_vector = [random.randint(-128, 127) for _ in range(dim)]
int8_vector = np.array(raw_vector, dtype=np.int8)
return int8_vector
raw_vectors = [[random.randint(-128, 127) for _ in range(dim)] for _ in range(nb)]
int8_vectors = [np.array(raw_vector, dtype=np.int8) for raw_vector in raw_vectors]
return int8_vectors
if data_type == DataType.BINARY_VECTOR:
dim = field.params['dim']
if nb is None:

View File

@ -45,6 +45,7 @@ default_string_array_field_name = "string_array"
# Default field names for the vector field types.
default_float_vec_field_name = "float_vector"
default_float16_vec_field_name = "float16_vector"
default_bfloat16_vec_field_name = "bfloat16_vector"
default_int8_vec_field_name = "int8_vector"
another_float_vec_field_name = "float_vector1"
default_binary_vec_field_name = "binary_vector"
text_sparse_vector = "TEXT_SPARSE_VECTOR"