test: add bitmap index cases (#35909)

Signed-off-by: wangting0128 <ting.wang@zilliz.com>
wt 2024-09-03 16:49:03 +08:00 committed by GitHub
parent 74048ce34f
commit cb49b32358
7 changed files with 977 additions and 35 deletions


@@ -1,5 +1,6 @@
import pytest
import sys
from typing import Dict, List
from pymilvus import DefaultConfig
from base.database_wrapper import ApiDatabaseWrapper
@@ -15,6 +16,7 @@ from base.high_level_api_wrapper import HighLevelApiWrapper
from utils.util_log import test_log as log
from common import common_func as cf
from common import common_type as ct
from common.common_params import IndexPrams
from pymilvus import ResourceGroupInfo
@@ -395,3 +397,23 @@ class TestcaseBase(Base):
        return tmp_user, tmp_pwd, tmp_role

    def build_multi_index(self, index_params: Dict[str, IndexPrams], collection_obj: ApiCollectionWrapper = None):
        collection_obj = collection_obj or self.collection_wrap
        for k, v in index_params.items():
            collection_obj.create_index(field_name=k, index_params=v.to_dict, index_name=k)
        log.info(f"[TestcaseBase] Build all indexes done: {list(index_params.keys())}")
        return collection_obj

    def drop_multi_index(self, index_names: List[str], collection_obj: ApiCollectionWrapper = None,
                         check_task=None, check_items=None):
        collection_obj = collection_obj or self.collection_wrap
        for n in index_names:
            collection_obj.drop_index(index_name=n, check_task=check_task, check_items=check_items)
        log.info(f"[TestcaseBase] Drop all indexes done: {index_names}")
        return collection_obj

    def show_indexes(self, collection_obj: ApiCollectionWrapper = None):
        # read indexes from the passed-in wrapper, not unconditionally from self.collection_wrap
        collection_obj = collection_obj or self.collection_wrap
        indexes = {n.field_name: n.params for n in collection_obj.indexes}
        log.info("[TestcaseBase] Collection: `{0}` index: {1}".format(collection_obj.name, indexes))
        return indexes
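
A minimal sketch of how these helpers are meant to compose inside a test case (the field names and index choices below are illustrative, not part of this commit):

    # hypothetical usage inside a TestcaseBase subclass
    index_params = {
        **DefaultVectorIndexParams.HNSW(DataType.FLOAT_VECTOR.name),
        **DefaultScalarIndexParams.list_bitmap(["int8", "varchar"]),
    }
    self.build_multi_index(index_params=index_params)             # create every index in the dict
    self.show_indexes()                                           # log {field_name: params}
    self.drop_multi_index(index_names=list(index_params.keys()))  # drop them again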


@@ -104,6 +104,10 @@ class ResponseChecker:
            # describe collection interface(high level api) response check
            result = self.check_describe_collection_property(self.response, self.func_name, self.check_items)
        elif self.check_task == CheckTasks.check_insert_result:
            # check `insert` interface response
            result = self.check_insert_response(check_items=self.check_items)
        # Add check_items here if something new need verify
        return result
@@ -602,3 +606,18 @@ class ResponseChecker:
            log.error("[CheckFunc] Response of API is not an error: %s" % str(res))
            assert False
        return True

    def check_insert_response(self, check_items):
        # check request successful
        self.assert_succ(self.succ, True)

        # get the expected row count (falls back to the length of the first data column)
        real = check_items.get("insert_count", None) if isinstance(check_items, dict) else None
        if real is None:
            real = len(self.kwargs_dict.get("data", [[]])[0])

        # check insert count
        error_message = "[CheckFunc] Insert count does not meet expectations, response:{0} != expected:{1}"
        assert self.response.insert_count == real, error_message.format(self.response.insert_count, real)

        return True


@@ -32,3 +32,9 @@ class PartitionErrorMessage(ExceptionsMessage):
class IndexErrorMessage(ExceptionsMessage):
    WrongFieldName = "cannot create index on non-vector field: %s"
    DropLoadedIndex = "index cannot be dropped, collection is loaded, please release it first"
    CheckVectorIndex = "data type {0} can't build with this index {1}"
    SparseFloatVectorMetricType = "only IP is the supported metric type for sparse index"
    VectorMetricTypeExist = "metric type not set for vector index"
    CheckBitmapIndex = "bitmap index are only supported on bool, int, string and array field"
    CheckBitmapOnPK = "create bitmap index on primary key not supported"


@@ -14,7 +14,7 @@ from npy_append_array import NpyAppendArray
from faker import Faker
from pathlib import Path
from minio import Minio
from pymilvus import DataType, CollectionSchema
from base.schema_wrapper import ApiCollectionSchemaWrapper, ApiFieldSchemaWrapper
from common import common_type as ct
from utils.util_log import test_log as log
@@ -24,6 +24,12 @@ fake = Faker()
""" Methods of processing data """

try:
    RNG = np.random.default_rng(seed=0)
except ValueError as e:
    RNG = None

@singledispatch
def to_serializable(val):
    """Used by default."""
@@ -1230,20 +1236,23 @@ def gen_data_by_collection_field(field, nb=None, start=None):
    if data_type == DataType.BFLOAT16_VECTOR:
        dim = field.params['dim']
        if nb is None:
            return RNG.uniform(size=dim).astype(bfloat16)
        return [RNG.uniform(size=dim).astype(bfloat16) for _ in range(int(nb))]
        # if nb is None:
        #     raw_vector = [random.random() for _ in range(dim)]
        #     bf16_vector = np.array(raw_vector, dtype=bfloat16).view(np.uint8).tolist()
        #     return bytes(bf16_vector)
        # bf16_vectors = []
        # for i in range(nb):
        #     raw_vector = [random.random() for _ in range(dim)]
        #     bf16_vector = np.array(raw_vector, dtype=bfloat16).view(np.uint8).tolist()
        #     bf16_vectors.append(bytes(bf16_vector))
        # return bf16_vectors
    if data_type == DataType.FLOAT16_VECTOR:
        dim = field.params['dim']
        if nb is None:
            return np.array([random.random() for _ in range(int(dim))], dtype=np.float16)
        return [np.array([random.random() for _ in range(int(dim))], dtype=np.float16) for _ in range(int(nb))]
    if data_type == DataType.BINARY_VECTOR:
        dim = field.params['dim']
        if nb is None:
@@ -1251,9 +1260,21 @@ def gen_data_by_collection_field(field, nb=None, start=None):
            binary_byte = bytes(np.packbits(raw_vector, axis=-1).tolist())
            return binary_byte
        return [bytes(np.packbits([random.randint(0, 1) for _ in range(dim)], axis=-1).tolist()) for _ in range(nb)]
    if data_type == DataType.SPARSE_FLOAT_VECTOR:
        if nb is None:
            return gen_sparse_vectors(nb=1)[0]
        return gen_sparse_vectors(nb=nb)
    if data_type == DataType.ARRAY:
        max_capacity = field.params['max_capacity']
        element_type = field.element_type
        if element_type == DataType.INT8:
            if nb is None:
                return [random.randint(-128, 127) for _ in range(max_capacity)]
            return [[random.randint(-128, 127) for _ in range(max_capacity)] for _ in range(nb)]
        if element_type == DataType.INT16:
            if nb is None:
                return [random.randint(-32768, 32767) for _ in range(max_capacity)]
            return [[random.randint(-32768, 32767) for _ in range(max_capacity)] for _ in range(nb)]
        if element_type == DataType.INT32:
            if nb is None:
                return [random.randint(-2147483648, 2147483647) for _ in range(max_capacity)]
@@ -1279,7 +1300,6 @@ def gen_data_by_collection_field(field, nb=None, start=None):
            if nb is None:
                return ["".join([chr(random.randint(97, 122)) for _ in range(length)]) for _ in range(max_capacity)]
            return [["".join([chr(random.randint(97, 122)) for _ in range(length)]) for _ in range(max_capacity)] for _ in range(nb)]
    return None
@@ -1296,6 +1316,25 @@ def gen_data_by_collection_schema(schema, nb, r=0):
    return data


def gen_varchar_values(nb: int, length: int = 0):
    return ["".join([chr(random.randint(97, 122)) for _ in range(length)]) for _ in range(nb)]


def gen_values(schema: CollectionSchema, nb, start_id=0, default_values: dict = {}):
    """
    Generate one column of data per field in the collection schema;
    values for a given field can be overridden via `default_values`.
    """
    data = []
    for field in schema.fields:
        default_value = default_values.get(field.name, None)
        if default_value is not None:
            data.append(default_value)
        elif field.auto_id is False:
            data.append(gen_data_by_collection_field(field, nb, start_id * nb))
    return data
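
A short sketch of the override mechanism (the collection handle and field name are illustrative):

    # hypothetical usage: generate 3000 rows, pinning the primary key column
    nb = 3000
    data = cf.gen_values(collection_w.schema, nb=nb,
                         default_values={"int64_pk": [i for i in range(nb)]})
    collection_w.insert(data=data)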

def gen_json_files_for_bulk_insert(data, schema, data_dir):
    for d in data:
        if len(d) > 0:
@@ -2288,3 +2327,71 @@ def gen_vectors_based_on_vector_type(num, dim, vector_data_type):
        vectors = gen_sparse_vectors(num, dim)
    return vectors


def field_types() -> dict:
    return dict(sorted(dict(DataType.__members__).items(), key=lambda item: item[0], reverse=True))


def get_array_element_type(data_type: str):
    if hasattr(DataType, "ARRAY") and data_type.startswith(DataType.ARRAY.name):
        element_type = data_type.lstrip(DataType.ARRAY.name).lstrip("_")
        for _field in field_types().keys():
            if str(element_type).upper().startswith(_field):
                return _field, getattr(DataType, _field)
        raise ValueError(f"[get_array_element_type] Can't find element type:{element_type} for array:{data_type}")
    raise ValueError(f"[get_array_element_type] Data type does not start with array: {data_type}")

def set_field_schema(field: str, params: dict):
    for k, v in field_types().items():
        if str(field).upper().startswith(k):
            _kwargs = {}

            _field_element, _data_type = k, DataType.NONE
            if hasattr(DataType, "ARRAY") and _field_element == DataType.ARRAY.name:
                _field_element, _data_type = get_array_element_type(field)
                _kwargs.update({"max_capacity": ct.default_max_capacity, "element_type": _data_type})

            if _field_element in [DataType.STRING.name, DataType.VARCHAR.name]:
                _kwargs.update({"max_length": ct.default_length})
            elif _field_element in [DataType.BINARY_VECTOR.name, DataType.FLOAT_VECTOR.name,
                                    DataType.FLOAT16_VECTOR.name, DataType.BFLOAT16_VECTOR.name]:
                _kwargs.update({"dim": ct.default_dim})

            if isinstance(params, dict):
                _kwargs.update(params)
            else:
                raise ValueError(
                    f"[set_field_schema] Field `{field}` params is not a dict, type: {type(params)}, params: {params}")
            return ApiFieldSchemaWrapper().init_field_schema(name=field, dtype=v, **_kwargs)[0]
    raise ValueError(f"[set_field_schema] Can't set field:`{field}` schema: {params}")

def set_collection_schema(fields: list, field_params: dict = {}, **kwargs):
    """
    :param fields: List[str]
    :param field_params: {<field name>: dict<field params>}, e.g.
        int64_1:
            is_primary: bool
            description: str
        varchar_1:
            is_primary: bool
            description: str
            max_length: int = 65535
        array_int8_1:
            max_capacity: int = 100
        array_varchar_1:
            max_capacity: int = 100
            max_length: int = 65535
        float_vector:
            dim: int = 128
    :param kwargs: <params for collection schema>
        description: str
        primary_field: str
        auto_id: bool
        enable_dynamic_field: bool
    """
    field_schemas = [set_field_schema(field=field, params=field_params.get(field, {})) for field in fields]
    return ApiCollectionSchemaWrapper().init_collection_schema(fields=field_schemas, **kwargs)[0]
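
A sketch of building a schema purely from field-name strings, as the new bitmap tests do (the names are illustrative):

    # hypothetical usage: field types are inferred from the name prefixes
    schema = cf.set_collection_schema(
        fields=["varchar_pk", "FLOAT_VECTOR", "INT8", "ARRAY_VARCHAR"],
        field_params={"varchar_pk": FieldParams(is_primary=True).to_dict},
        auto_id=False
    )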


@@ -0,0 +1,365 @@
from dataclasses import dataclass
from typing import List, Dict


""" Define param names """


class IndexName:
    # Vector
    AUTOINDEX = "AUTOINDEX"
    FLAT = "FLAT"
    IVF_FLAT = "IVF_FLAT"
    IVF_SQ8 = "IVF_SQ8"
    IVF_PQ = "IVF_PQ"
    IVF_HNSW = "IVF_HNSW"
    HNSW = "HNSW"
    DISKANN = "DISKANN"
    SCANN = "SCANN"
    # Binary
    BIN_FLAT = "BIN_FLAT"
    BIN_IVF_FLAT = "BIN_IVF_FLAT"
    # Sparse
    SPARSE_WAND = "SPARSE_WAND"
    SPARSE_INVERTED_INDEX = "SPARSE_INVERTED_INDEX"
    # GPU
    GPU_IVF_FLAT = "GPU_IVF_FLAT"
    GPU_IVF_PQ = "GPU_IVF_PQ"
    GPU_CAGRA = "GPU_CAGRA"
    GPU_BRUTE_FORCE = "GPU_BRUTE_FORCE"
    # Scalar
    INVERTED = "INVERTED"
    BITMAP = "BITMAP"
    Trie = "Trie"
    STL_SORT = "STL_SORT"


class MetricType:
    L2 = "L2"
    IP = "IP"
    COSINE = "COSINE"
    JACCARD = "JACCARD"
""" expressions """
@dataclass
class ExprBase:
expr: str
@property
def subset(self):
return f"({self.expr})"
def __repr__(self):
return self.expr

class Expr:
    # BooleanConstant: 'true' | 'True' | 'TRUE' | 'false' | 'False' | 'FALSE'

    @staticmethod
    def LT(left, right):
        return ExprBase(expr=f"{left} < {right}")

    @staticmethod
    def LE(left, right):
        return ExprBase(expr=f"{left} <= {right}")

    @staticmethod
    def GT(left, right):
        return ExprBase(expr=f"{left} > {right}")

    @staticmethod
    def GE(left, right):
        return ExprBase(expr=f"{left} >= {right}")

    @staticmethod
    def EQ(left, right):
        return ExprBase(expr=f"{left} == {right}")

    @staticmethod
    def NE(left, right):
        return ExprBase(expr=f"{left} != {right}")

    @staticmethod
    def like(left, right):
        return ExprBase(expr=f'{left} like "{right}"')

    @staticmethod
    def LIKE(left, right):
        return ExprBase(expr=f'{left} LIKE "{right}"')

    @staticmethod
    def exists(name):
        return ExprBase(expr=f'exists {name}')

    @staticmethod
    def EXISTS(name):
        return ExprBase(expr=f'EXISTS {name}')

    @staticmethod
    def ADD(left, right):
        return ExprBase(expr=f"{left} + {right}")

    @staticmethod
    def SUB(left, right):
        return ExprBase(expr=f"{left} - {right}")

    @staticmethod
    def MUL(left, right):
        return ExprBase(expr=f"{left} * {right}")

    @staticmethod
    def DIV(left, right):
        return ExprBase(expr=f"{left} / {right}")

    @staticmethod
    def MOD(left, right):
        return ExprBase(expr=f"{left} % {right}")

    @staticmethod
    def POW(left, right):
        return ExprBase(expr=f"{left} ** {right}")

    @staticmethod
    def SHL(left, right):
        # Note: not supported
        return ExprBase(expr=f"{left}<<{right}")

    @staticmethod
    def SHR(left, right):
        # Note: not supported
        return ExprBase(expr=f"{left}>>{right}")

    @staticmethod
    def BAND(left, right):
        # Note: not supported
        return ExprBase(expr=f"{left} & {right}")

    @staticmethod
    def BOR(left, right):
        # Note: not supported
        return ExprBase(expr=f"{left} | {right}")

    @staticmethod
    def BXOR(left, right):
        # Note: not supported
        return ExprBase(expr=f"{left} ^ {right}")

    @staticmethod
    def AND(left, right):
        return ExprBase(expr=f"{left} && {right}")

    @staticmethod
    def And(left, right):
        return ExprBase(expr=f"{left} and {right}")

    @staticmethod
    def OR(left, right):
        return ExprBase(expr=f"{left} || {right}")

    @staticmethod
    def Or(left, right):
        return ExprBase(expr=f"{left} or {right}")

    @staticmethod
    def BNOT(name):
        # Note: not supported
        return ExprBase(expr=f"~{name}")

    @staticmethod
    def NOT(name):
        return ExprBase(expr=f"!{name}")

    @staticmethod
    def Not(name):
        return ExprBase(expr=f"not {name}")

    @staticmethod
    def In(left, right):
        return ExprBase(expr=f"{left} in {right}")

    @staticmethod
    def Nin(left, right):
        return ExprBase(expr=f"{left} not in {right}")

    @staticmethod
    def json_contains(left, right):
        return ExprBase(expr=f"json_contains({left}, {right})")

    @staticmethod
    def JSON_CONTAINS(left, right):
        return ExprBase(expr=f"JSON_CONTAINS({left}, {right})")

    @staticmethod
    def json_contains_all(left, right):
        return ExprBase(expr=f"json_contains_all({left}, {right})")

    @staticmethod
    def JSON_CONTAINS_ALL(left, right):
        return ExprBase(expr=f"JSON_CONTAINS_ALL({left}, {right})")

    @staticmethod
    def json_contains_any(left, right):
        return ExprBase(expr=f"json_contains_any({left}, {right})")

    @staticmethod
    def JSON_CONTAINS_ANY(left, right):
        return ExprBase(expr=f"JSON_CONTAINS_ANY({left}, {right})")

    @staticmethod
    def array_contains(left, right):
        return ExprBase(expr=f"array_contains({left}, {right})")

    @staticmethod
    def ARRAY_CONTAINS(left, right):
        return ExprBase(expr=f"ARRAY_CONTAINS({left}, {right})")

    @staticmethod
    def array_contains_all(left, right):
        return ExprBase(expr=f"array_contains_all({left}, {right})")

    @staticmethod
    def ARRAY_CONTAINS_ALL(left, right):
        return ExprBase(expr=f"ARRAY_CONTAINS_ALL({left}, {right})")

    @staticmethod
    def array_contains_any(left, right):
        return ExprBase(expr=f"array_contains_any({left}, {right})")

    @staticmethod
    def ARRAY_CONTAINS_ANY(left, right):
        return ExprBase(expr=f"ARRAY_CONTAINS_ANY({left}, {right})")

    @staticmethod
    def array_length(name):
        return ExprBase(expr=f"array_length({name})")

    @staticmethod
    def ARRAY_LENGTH(name):
        return ExprBase(expr=f"ARRAY_LENGTH({name})")
"""" Define pass in params """
@dataclass
class BasePrams:
@property
def to_dict(self):
return {k: v for k, v in vars(self).items() if v is not None}
@dataclass
class FieldParams(BasePrams):
description: str = None
# varchar
max_length: int = None
# array
max_capacity: int = None
# for vector
dim: int = None
# scalar
is_primary: bool = None
# auto_id: bool = None
is_partition_key: bool = None
is_clustering_key: bool = None
@dataclass
class IndexPrams(BasePrams):
index_type: str = None
params: dict = None
metric_type: str = None
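
Since `to_dict` drops `None` entries, an `IndexPrams` instance serializes to exactly the keys that were set, e.g.:

    IndexPrams(index_type="BITMAP").to_dict                  # {'index_type': 'BITMAP'}
    IndexPrams(index_type="HNSW", metric_type="L2").to_dict  # {'index_type': 'HNSW', 'metric_type': 'L2'}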
""" Define default params """
class DefaultVectorIndexParams:
@staticmethod
def FLAT(field: str, metric_type=MetricType.L2):
return {field: IndexPrams(index_type=IndexName.FLAT, params={}, metric_type=metric_type)}
@staticmethod
def IVF_FLAT(field: str, nlist: int = 1024, metric_type=MetricType.L2):
return {
field: IndexPrams(index_type=IndexName.IVF_FLAT, params={"nlist": nlist}, metric_type=metric_type)
}
@staticmethod
def IVF_SQ8(field: str, nlist: int = 1024, metric_type=MetricType.L2):
return {
field: IndexPrams(index_type=IndexName.IVF_SQ8, params={"nlist": nlist}, metric_type=metric_type)
}
@staticmethod
def HNSW(field: str, m: int = 8, ef: int = 200, metric_type=MetricType.L2):
return {
field: IndexPrams(index_type=IndexName.HNSW, params={"M": m, "efConstruction": ef}, metric_type=metric_type)
}
@staticmethod
def DISKANN(field: str, metric_type=MetricType.L2):
return {field: IndexPrams(index_type=IndexName.DISKANN, params={}, metric_type=metric_type)}
@staticmethod
def BIN_FLAT(field: str, nlist: int = 1024, metric_type=MetricType.JACCARD):
return {
field: IndexPrams(index_type=IndexName.BIN_FLAT, params={"nlist": nlist}, metric_type=metric_type)
}
@staticmethod
def BIN_IVF_FLAT(field: str, nlist: int = 1024, metric_type=MetricType.JACCARD):
return {
field: IndexPrams(index_type=IndexName.BIN_IVF_FLAT, params={"nlist": nlist},
metric_type=metric_type)
}
@staticmethod
def SPARSE_WAND(field: str, drop_ratio_build: int = 0.2, metric_type=MetricType.IP):
return {
field: IndexPrams(index_type=IndexName.SPARSE_WAND, params={"drop_ratio_build": drop_ratio_build},
metric_type=metric_type)
}
@staticmethod
def SPARSE_INVERTED_INDEX(field: str, drop_ratio_build: int = 0.2, metric_type=MetricType.IP):
return {
field: IndexPrams(index_type=IndexName.SPARSE_INVERTED_INDEX, params={"drop_ratio_build": drop_ratio_build},
metric_type=metric_type)
}

class DefaultScalarIndexParams:

    @staticmethod
    def Default(field: str):
        return {field: IndexPrams()}

    @staticmethod
    def Trie(field: str):
        return {field: IndexPrams(index_type=IndexName.Trie)}

    @staticmethod
    def STL_SORT(field: str):
        return {field: IndexPrams(index_type=IndexName.STL_SORT)}

    @staticmethod
    def INVERTED(field: str):
        return {field: IndexPrams(index_type=IndexName.INVERTED)}

    @staticmethod
    def BITMAP(field: str):
        return {field: IndexPrams(index_type=IndexName.BITMAP)}

    @staticmethod
    def list_bitmap(fields: List[str]) -> Dict[str, IndexPrams]:
        return {n: IndexPrams(index_type=IndexName.BITMAP) for n in fields}
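
Because every helper returns a single-entry `{field: IndexPrams}` dict, per-field index choices merge naturally, which is the pattern the bitmap tests below rely on (field names illustrative):

    # hypothetical usage: one vector index plus BITMAP on several scalars
    index_params = {
        **DefaultVectorIndexParams.HNSW("float_vector"),
        **DefaultScalarIndexParams.list_bitmap(["bool", "int8", "varchar"])
    }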


@@ -286,6 +286,7 @@ class CheckTasks:
    check_value_equal = "check_value_equal"
    check_rg_property = "check_resource_group_property"
    check_describe_collection_property = "check_describe_collection_property"
    check_insert_result = "check_insert_result"


class BulkLoadStates:


@@ -13,6 +13,9 @@ from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from common.code_mapping import CollectionErrorMessage as clem
from common.code_mapping import IndexErrorMessage as iem
from common.common_params import (
    IndexName, FieldParams, IndexPrams, DefaultVectorIndexParams, DefaultScalarIndexParams, MetricType
)
from utils.util_pymilvus import *
from common.constants import *
@@ -336,7 +339,8 @@ class TestIndexOperation(TestcaseBase):
        vec_field2 = cf.gen_float_vec_field(name="vec_field2", dim=32)
        str_field = cf.gen_string_field(name="str_field")
        str_field2 = cf.gen_string_field(name="str_field2")
        schema, _ = self.collection_schema_wrap.init_collection_schema(
            [id_field, vec_field, vec_field2, str_field, str_field2])
        collection_w = self.init_collection_wrap(schema=schema)
        vec_index = ct.default_index
        vec_index_name = "my_index"
@@ -378,7 +382,7 @@ class TestIndexOperation(TestcaseBase):
        cf.assert_equal_index(index, collection_w.collection.indexes[0])
        self.index_wrap.drop()
        assert len(collection_w.indexes) == 0

    @pytest.mark.tags(CaseLabel.L1)
    def test_index_drop_repeatedly(self):
        """
@@ -640,7 +644,8 @@ class TestNewIndexBase(TestcaseBase):
        collection_w = self.init_collection_wrap(name=c_name)
        data = cf.gen_default_list_data()
        collection_w.insert(data=data)
        index_prams = [default_ivf_flat_index,
                       {"metric_type": "L2", "index_type": "IVF_SQ8", "params": {"nlist": 1024}}]
        for index in index_prams:
            index_name = cf.gen_unique_str("name")
            collection_w.create_index(default_float_vec_field_name, index, index_name=index_name)
@@ -1018,7 +1023,6 @@ class TestNewIndexBase(TestcaseBase):

@pytest.mark.tags(CaseLabel.GPU)
class TestNewIndexBinary(TestcaseBase):
    """
    ******************************************************************
    The following cases are used to test `create_index` function
@@ -1176,7 +1180,7 @@ class TestIndexInvalid(TestcaseBase):
    Test create / describe / drop index interfaces with invalid collection names
    """

    @pytest.fixture(scope="function", params=["Trie", "STL_SORT", "INVERTED", IndexName.BITMAP])
    def scalar_index(self, request):
        yield request.param
@@ -1366,7 +1370,7 @@ class TestIndexInvalid(TestcaseBase):
        collection_w.alter_index("random_index_345", {'mmap.enabled': True},
                                 check_task=CheckTasks.err_res,
                                 check_items={ct.err_code: 65535,
                                              ct.err_msg: f"index not found"})

    @pytest.mark.tags(CaseLabel.L1)
    def test_load_mmap_index(self):
        """
@@ -1460,8 +1464,8 @@ class TestIndexInvalid(TestcaseBase):
        params = {"index_type": index, "metric_type": metric_type, "params": param}
        error = {ct.err_code: 65535, ct.err_msg: "only IP is the supported metric type for sparse index"}
        index, _ = self.index_wrap.init_index(collection_w.collection, ct.default_sparse_vec_field_name, params,
                                              check_task=CheckTasks.err_res,
                                              check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("ratio", [-0.5, 1, 3])
@@ -1478,7 +1482,8 @@ class TestIndexInvalid(TestcaseBase):
        data = cf.gen_default_list_sparse_data()
        collection_w.insert(data=data)
        params = {"index_type": index, "metric_type": "IP", "params": {"drop_ratio_build": ratio}}
        error = {ct.err_code: 1100,
                 ct.err_msg: f"invalid drop_ratio_build: {ratio}, must be in range [0, 1): invalid parameter[expected=valid index params"}
        index, _ = self.index_wrap.init_index(collection_w.collection, ct.default_sparse_vec_field_name, params,
                                              check_task=CheckTasks.err_res,
                                              check_items=error)
@@ -1605,7 +1610,8 @@ class TestIndexString(TestcaseBase):
        index, _ = self.index_wrap.init_index(collection_w.collection, default_string_field_name,
                                              default_string_index_params)
        cf.assert_equal_index(index, collection_w.indexes[0])
        collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index,
                                  index_name="vector_flat")
        collection_w.load()
        assert collection_w.num_entities == default_nb
@@ -1621,7 +1627,8 @@ class TestIndexString(TestcaseBase):
        collection_w = self.init_collection_wrap(name=c_name)
        data = cf.gen_default_list_data(ct.default_nb)
        collection_w.insert(data=data)
        collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index,
                                  index_name="vector_flat")
        index, _ = self.index_wrap.init_index(collection_w.collection, default_string_field_name,
                                              default_string_index_params)
        collection_w.load()
@@ -1753,7 +1760,7 @@ class TestIndexString(TestcaseBase):
        collection_w.create_index(default_string_field_name, default_string_index_params, index_name=index_name2)
        collection_w.drop_index(index_name=index_name2)
        assert len(collection_w.indexes) == 0

    @pytest.mark.tags(CaseLabel.L1)
    def test_index_with_string_field_empty(self):
        """
@@ -1767,7 +1774,7 @@ class TestIndexString(TestcaseBase):
        nb = 3000
        data = cf.gen_default_list_data(nb)
        data[2] = ["" for _ in range(nb)]
        collection_w.insert(data=data)
        collection_w.create_index(default_string_field_name, default_string_index_params, index_name=index_name2)
@@ -1783,6 +1790,7 @@ class TestIndexDiskann(TestcaseBase):
    The following cases are used to test create index about diskann
    ******************************************************************
    """

    @pytest.fixture(scope="function", params=[False, True])
    def _async(self, request):
        yield request.param
@@ -1804,14 +1812,15 @@ class TestIndexDiskann(TestcaseBase):
        data = cf.gen_default_list_data()
        collection_w.insert(data=data)
        assert collection_w.num_entities == default_nb
        index, _ = self.index_wrap.init_index(collection_w.collection, default_float_vec_field_name,
                                              ct.default_diskann_index)
        log.info(self.index_wrap.params)
        cf.assert_equal_index(index, collection_w.indexes[0])
        collection_w.load()
        vectors = [[random.random() for _ in range(default_dim)] for _ in range(default_nq)]
        search_res, _ = collection_w.search(vectors[:default_nq], default_search_field,
                                            ct.default_diskann_search_params, default_limit,
                                            default_search_exp,
                                            check_task=CheckTasks.check_search_results,
                                            check_items={"nq": default_nq,
                                                         "limit": default_limit})
@@ -1854,11 +1863,11 @@ class TestIndexDiskann(TestcaseBase):
        vectors = [[random.random() for _ in range(default_dim)] for _ in range(default_nq)]
        search_res, _ = collection_w.search(vectors[:default_nq], default_search_field,
                                            ct.default_diskann_search_params, default_limit,
                                            default_search_exp,
                                            check_task=CheckTasks.check_search_results,
                                            check_items={"nq": default_nq,
                                                         "limit": default_limit})

    @pytest.mark.tags(CaseLabel.L2)
    def test_create_diskann_index_drop_with_async(self, _async):
        """
@@ -1901,7 +1910,7 @@ class TestIndexDiskann(TestcaseBase):
                                  index_name=field_name)
        collection_w.load()
        assert collection_w.has_index(index_name=field_name)[0] is True
        assert len(collection_w.indexes) == 1
        collection_w.release()
        collection_w.drop_index(index_name=field_name)
        assert collection_w.has_index(index_name=field_name)[0] is False
@@ -1926,7 +1935,7 @@ class TestIndexDiskann(TestcaseBase):
        collection_w.release()
        collection_w.drop_index(index_name=index_name1)
        assert collection_w.has_index(index_name=index_name1)[0] is False

    @pytest.mark.tags(CaseLabel.L2)
    def test_drop_diskann_index_and_create_again(self):
        """
@@ -1968,7 +1977,7 @@ class TestIndexDiskann(TestcaseBase):
        default_params = {}
        collection_w.create_index("float", default_params, index_name="c")
        assert collection_w.has_index(index_name="c")[0] == True

    @pytest.mark.tags(CaseLabel.L2)
    def test_drop_diskann_index_with_partition(self):
        """
@@ -2045,7 +2054,8 @@ class TestIndexDiskann(TestcaseBase):
        c_name = cf.gen_unique_str(prefix)
        collection_w = self.init_collection_wrap(c_name, schema=default_schema)
        collection_w.insert(cf.gen_default_list_data())
        collection_w.create_index(default_float_vec_field_name, ct.default_diskann_index,
                                  index_name=ct.default_index_name)
        collection_w.set_properties({'mmap.enabled': True})
        desc, _ = collection_w.describe()
        pro = desc.get("properties")
@@ -2204,7 +2214,6 @@ class TestInvertedIndexValid(TestcaseBase):
    def vector_data_type(self, request):
        yield request.param

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("scalar_field_name", [ct.default_int8_field_name, ct.default_int16_field_name,
                                                   ct.default_int32_field_name, ct.default_int64_field_name,
@@ -2285,3 +2294,416 @@ class TestInvertedIndexValid(TestcaseBase):
            scalar_index_params = {"index_type": f"{scalar_index[i]}"}
            collection_w.create_index(scalar_fields[i], index_params=scalar_index_params, index_name=index_name)
            assert collection_w.has_index(index_name=index_name)[0] is True


class TestBitmapIndex(TestcaseBase):
    """
    Functional `BITMAP` index
    Author: Ting.Wang
    """

    def setup_method(self, method):
        super().setup_method(method)

        # connect to the server before testing
        self._connect()

    @property
    def get_bitmap_support_dtype_names(self):
        dtypes = [DataType.BOOL, DataType.INT8, DataType.INT16, DataType.INT32, DataType.INT64, DataType.VARCHAR]
        dtype_names = [f"{n.name}" for n in dtypes] + [f"ARRAY_{n.name}" for n in dtypes]
        return dtype_names

    @property
    def get_bitmap_not_support_dtype_names(self):
        dtypes = [DataType.FLOAT, DataType.DOUBLE]
        dtype_names = [f"{n.name}" for n in dtypes] + [f"ARRAY_{n.name}" for n in dtypes] + [DataType.JSON.name]
        return dtype_names
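
For reference, `get_bitmap_support_dtype_names` expands to the scalar names `BOOL`, `INT8`, `INT16`, `INT32`, `INT64`, `VARCHAR` plus their `ARRAY_*` counterparts; `cf.set_collection_schema` maps these strings back to field schemas by name prefix.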

    @pytest.mark.tags(CaseLabel.L0)
    @pytest.mark.parametrize("auto_id", [True, False])
    @pytest.mark.parametrize("primary_field", ["int64_pk", "varchar_pk"])
    def test_bitmap_on_primary_key_field(self, request, primary_field, auto_id):
        """
        target:
            1. build `BITMAP` index on primary key field
        method:
            1. create an empty collection
            2. build `BITMAP` index on primary key field
        expected:
            1. the primary key field does not support building a bitmap index
        """
        # init params
        collection_name = f"{request.function.__name__}_{primary_field}_{auto_id}"

        # create a collection with fields that can build `BITMAP` index
        self.collection_wrap.init_collection(
            name=collection_name,
            schema=cf.set_collection_schema(
                fields=[primary_field, DataType.FLOAT_VECTOR.name],
                field_params={primary_field: FieldParams(is_primary=True).to_dict},
                auto_id=auto_id
            )
        )

        # build `BITMAP` index on primary key field
        self.collection_wrap.create_index(
            field_name=primary_field, index_params={"index_type": IndexName.BITMAP}, index_name=primary_field,
            check_task=CheckTasks.err_res, check_items={ct.err_code: 1100, ct.err_msg: iem.CheckBitmapOnPK})

    @pytest.mark.tags(CaseLabel.L0)
    def test_bitmap_on_not_supported_fields(self, request):
        """
        target:
            1. build `BITMAP` index on not supported fields
        method:
            1. create an empty collection with fields:
                [`varchar_pk`, `SPARSE_FLOAT_VECTOR`, `FLOAT`, `DOUBLE`, `JSON`, `ARRAY`, `ARRAY_FLOAT`, `ARRAY_DOUBLE`]
            2. build `BITMAP` index with different params on the not supported fields
        expected:
            1. check build index failed, assert error code and message
        """
        # init params
        collection_name, primary_field = f"{request.function.__name__}", "varchar_pk"

        # create a collection with fields that cannot build `BITMAP` index
        self.collection_wrap.init_collection(
            name=collection_name,
            schema=cf.set_collection_schema(
                fields=[primary_field, DataType.SPARSE_FLOAT_VECTOR.name, *self.get_bitmap_not_support_dtype_names],
                field_params={primary_field: FieldParams(is_primary=True).to_dict}
            )
        )

        # build `BITMAP` index on sparse vector field
        for msg, index_params in {
            iem.VectorMetricTypeExist: IndexPrams(index_type=IndexName.BITMAP),
            iem.SparseFloatVectorMetricType: IndexPrams(index_type=IndexName.BITMAP, metric_type=MetricType.L2),
            iem.CheckVectorIndex.format(DataType.SPARSE_FLOAT_VECTOR, IndexName.BITMAP): IndexPrams(
                index_type=IndexName.BITMAP, metric_type=MetricType.IP)
        }.items():
            self.collection_wrap.create_index(
                field_name=DataType.SPARSE_FLOAT_VECTOR.name, index_params=index_params.to_dict,
                check_task=CheckTasks.err_res, check_items={ct.err_code: 1100, ct.err_msg: msg}
            )

        # build `BITMAP` index on not supported scalar fields
        for _field_name in self.get_bitmap_not_support_dtype_names:
            self.collection_wrap.create_index(
                field_name=_field_name, index_params=IndexPrams(index_type=IndexName.BITMAP).to_dict,
                check_task=CheckTasks.err_res, check_items={ct.err_code: 1100, ct.err_msg: iem.CheckBitmapIndex}
            )

    @pytest.mark.tags(CaseLabel.L0)
    @pytest.mark.parametrize("auto_id", [True, False])
    @pytest.mark.parametrize("primary_field", ["int64_pk", "varchar_pk"])
    def test_bitmap_on_empty_collection(self, request, primary_field, auto_id):
        """
        target:
            1. create `BITMAP` index on all supported fields
            2. build scalar index on loaded collection
        method:
            1. build and drop `BITMAP` index on an empty collection
            2. rebuild `BITMAP` index on loaded collection
            3. drop index on loaded collection and check the expected error is raised
            4. re-build the same index on loaded collection
        expected:
            1. build and drop index successfully on a collection that is not loaded
            2. build index successfully on non-indexed fields of a loaded collection
            3. cannot drop index on a loaded collection
        """
        # init params
        collection_name, nb = f"{request.function.__name__}_{primary_field}_{auto_id}", 3000

        # create a collection with fields that can build `BITMAP` index
        self.collection_wrap.init_collection(
            name=collection_name,
            schema=cf.set_collection_schema(
                fields=[primary_field, DataType.FLOAT_VECTOR.name, *self.get_bitmap_support_dtype_names],
                field_params={primary_field: FieldParams(is_primary=True).to_dict},
                auto_id=auto_id
            )
        )

        # build `BITMAP` index on empty collection
        index_params = {
            **DefaultVectorIndexParams.HNSW(DataType.FLOAT_VECTOR.name),
            **DefaultScalarIndexParams.list_bitmap(self.get_bitmap_support_dtype_names)
        }
        self.build_multi_index(index_params=index_params)
        assert sorted([n.field_name for n in self.collection_wrap.indexes]) == sorted(index_params.keys())

        # drop the scalar indexes
        self.drop_multi_index(index_names=list(set(index_params.keys()) - {DataType.FLOAT_VECTOR.name}))
        assert len(self.collection_wrap.indexes) == 1

        # load collection
        self.collection_wrap.load()

        # build the scalar indexes after loading the collection
        self.build_multi_index(index_params={k: v for k, v in index_params.items() if v.index_type == IndexName.BITMAP})

        # reload collection
        self.collection_wrap.load()

        # re-drop the scalar indexes
        self.drop_multi_index(index_names=list(set(index_params.keys()) - {DataType.FLOAT_VECTOR.name}),
                              check_task=CheckTasks.err_res,
                              check_items={ct.err_code: 65535, ct.err_msg: iem.DropLoadedIndex})

        # re-build loaded index
        self.build_multi_index(index_params=index_params)

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("auto_id", [True, False])
    @pytest.mark.parametrize("primary_field", ["int64_pk", "varchar_pk"])
    def test_bitmap_insert_after_loading(self, request, primary_field, auto_id):
        """
        target:
            1. insert data after building `BITMAP` index and loading collection
        method:
            1. build index and load an empty collection
            2. insert 3k rows of data
            3. check that no data is indexed yet
            4. flush collection, re-build index and refresh-load collection
            5. check that the indexed row count equals the inserted row count
        expected:
            1. insertion is successful
            2. segment row number == inserted rows
        """
        # init params
        collection_name, nb = f"{request.function.__name__}_{primary_field}_{auto_id}", 3000

        # create a collection with fields that can build `BITMAP` index
        self.collection_wrap.init_collection(
            name=collection_name,
            schema=cf.set_collection_schema(
                fields=[primary_field, DataType.FLOAT16_VECTOR.name, *self.get_bitmap_support_dtype_names],
                field_params={primary_field: FieldParams(is_primary=True).to_dict},
                auto_id=auto_id
            )
        )

        # build `BITMAP` index on empty collection
        index_params = {
            **DefaultVectorIndexParams.IVF_SQ8(DataType.FLOAT16_VECTOR.name),
            **DefaultScalarIndexParams.list_bitmap(self.get_bitmap_support_dtype_names)
        }
        self.build_multi_index(index_params=index_params)
        assert sorted([n.field_name for n in self.collection_wrap.indexes]) == sorted(index_params.keys())

        # load collection
        self.collection_wrap.load()

        # prepare 3k rows of data (> 1024 rows triggers index building)
        self.collection_wrap.insert(data=cf.gen_values(self.collection_wrap.schema, nb=nb),
                                    check_task=CheckTasks.check_insert_result)

        # check no indexed segments
        res, _ = self.utility_wrap.get_query_segment_info(collection_name=collection_name)
        assert len(res) == 0

        # flush collection to seal the segments
        self.collection_wrap.flush()

        # re-build the vector field index
        self.build_multi_index(index_params=DefaultVectorIndexParams.IVF_SQ8(DataType.FLOAT16_VECTOR.name))

        # refresh load to ensure the indexed segments are loaded
        self.collection_wrap.load(_refresh=True)

        # check segment row number
        counts = [int(n.num_rows) for n in self.utility_wrap.get_query_segment_info(collection_name=collection_name)[0]]
        assert sum(counts) == nb, f"`{collection_name}` Segment row count:{sum(counts)} != insert:{nb}"

    @pytest.mark.tags(CaseLabel.L1)
    @pytest.mark.parametrize("auto_id", [True, False])
    @pytest.mark.parametrize("primary_field", ["int64_pk", "varchar_pk"])
    def test_bitmap_insert_before_loading(self, request, primary_field, auto_id):
        """
        target:
            1. insert data before building `BITMAP` index and loading collection
        method:
            1. insert data into an empty collection
            2. flush collection, build index and load collection
            3. check that the number of segments equals shards_num
        expected:
            1. insertion is successful
            2. the number of segments == shards_num
            3. segment row number == inserted rows
        """
        # init params
        collection_name, nb, shards_num = f"{request.function.__name__}_{primary_field}_{auto_id}", 3000, 16

        # create a collection with fields that can build `BITMAP` index
        self.collection_wrap.init_collection(
            name=collection_name,
            schema=cf.set_collection_schema(
                fields=[primary_field, DataType.BFLOAT16_VECTOR.name, *self.get_bitmap_support_dtype_names],
                field_params={primary_field: FieldParams(is_primary=True).to_dict},
                auto_id=auto_id
            ),
            shards_num=shards_num
        )

        # prepare data (> 1024 rows triggers index building)
        pk_type = "str" if primary_field.startswith(DataType.VARCHAR.name.lower()) else "int"
        default_values = {} if auto_id else {primary_field: [eval(f"{pk_type}({n})") for n in range(nb)]}
        self.collection_wrap.insert(
            data=cf.gen_values(self.collection_wrap.schema, nb=nb, default_values=default_values),
            check_task=CheckTasks.check_insert_result
        )

        # flush collection to seal the segments
        self.collection_wrap.flush()

        # build `BITMAP` index
        index_params = {
            **DefaultVectorIndexParams.DISKANN(DataType.BFLOAT16_VECTOR.name),
            **DefaultScalarIndexParams.list_bitmap(self.get_bitmap_support_dtype_names)
        }
        self.build_multi_index(index_params=index_params)
        assert sorted([n.field_name for n in self.collection_wrap.indexes]) == sorted(index_params.keys())

        # load collection
        self.collection_wrap.load()

        # get segment info
        segment_info, _ = self.utility_wrap.get_query_segment_info(collection_name=collection_name)

        # check segment counts == shards_num
        assert len(segment_info) == shards_num

        # check segment row number
        counts = [int(n.num_rows) for n in segment_info]
        assert sum(counts) == nb, f"`{collection_name}` Segment row count:{sum(counts)} != insert:{nb}"

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("primary_field", ["int64_pk", "varchar_pk"])
    @pytest.mark.parametrize("shards_num, nb", [(2, 3791), (16, 1600), (16, 10)])
    def test_bitmap_primary_field_data_repeated(self, request, primary_field, shards_num, nb):
        """
        target:
            1. rows with the same pk value are routed to the same shard
        method:
            1. insert rows that all share one pk value into an empty collection
            2. flush collection, build index and load collection
            3. check that the number of segments equals 1
            4. check that the indexed row count equals the inserted row count
        expected:
            1. insertion is successful
            2. the number of segments == 1
            3. segment row number == inserted rows
        """
        # init params
        collection_name = f"{request.function.__name__}_{primary_field}_{shards_num}_{nb}"

        # create a collection with fields that can build `BITMAP` index
        self.collection_wrap.init_collection(
            name=collection_name,
            schema=cf.set_collection_schema(
                fields=[primary_field, DataType.BINARY_VECTOR.name, *self.get_bitmap_support_dtype_names],
                field_params={primary_field: FieldParams(is_primary=True).to_dict},
            ),
            shards_num=shards_num
        )

        # prepare data (> 1024 rows triggers index building)
        pk_key = str(shards_num) if primary_field.startswith(DataType.VARCHAR.name.lower()) else shards_num
        self.collection_wrap.insert(
            data=cf.gen_values(self.collection_wrap.schema, nb=nb,
                               default_values={primary_field: [pk_key for _ in range(nb)]}),
            check_task=CheckTasks.check_insert_result
        )

        # flush collection to seal the segments
        self.collection_wrap.flush()

        # build `BITMAP` index
        index_params = {
            **DefaultVectorIndexParams.BIN_IVF_FLAT(DataType.BINARY_VECTOR.name),
            **DefaultScalarIndexParams.list_bitmap(self.get_bitmap_support_dtype_names)
        }
        self.build_multi_index(index_params=index_params)
        assert sorted([n.field_name for n in self.collection_wrap.indexes]) == sorted(index_params.keys())

        # load collection
        self.collection_wrap.load()

        # get segment info
        segment_info, _ = self.utility_wrap.get_query_segment_info(collection_name=collection_name)

        # check segments count
        msg = f"`{collection_name}` Segments count:{len(segment_info)} != 1, pk field data is repeated."
        assert len(segment_info) == 1, msg

        # check segment row number
        counts = [int(n.num_rows) for n in segment_info]
        assert sum(counts) == nb, f"`{collection_name}` Segment row count:{sum(counts)} != insert:{nb}"

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("primary_field", ["int64_pk", "varchar_pk"])
    @pytest.mark.parametrize("shards_num, nb", [(1, 1000), (2, 3791), (16, 1600), (16, 10)])
    def test_bitmap_primary_field_data_not_repeated(self, request, primary_field, shards_num, nb):
        """
        target:
            1. rows with different pk values are routed to different shards
        method:
            1. insert rows with distinct pk values into an empty collection
            2. flush collection, build index and load collection
            3. check that the number of segments equals shards_num, or does not exceed the inserted row count
            4. check that the indexed row count equals the inserted row count
        expected:
            1. insertion is successful
            2. the number of segments == shards_num, or <= inserted rows
            3. segment row number == inserted rows
        """
        # init params
        collection_name = f"{request.function.__name__}_{primary_field}_{shards_num}_{nb}"

        # create a collection with fields that can build `BITMAP` index
        self.collection_wrap.init_collection(
            name=collection_name,
            schema=cf.set_collection_schema(
                fields=[primary_field, DataType.BINARY_VECTOR.name, *self.get_bitmap_support_dtype_names],
                field_params={primary_field: FieldParams(is_primary=True).to_dict},
            ),
            shards_num=shards_num
        )

        # prepare data (> 1024 rows triggers index building)
        pk_type = "str" if primary_field.startswith(DataType.VARCHAR.name.lower()) else "int"
        self.collection_wrap.insert(
            data=cf.gen_values(self.collection_wrap.schema, nb=nb,
                               default_values={primary_field: [eval(f"{pk_type}({n})") for n in range(nb)]}),
            check_task=CheckTasks.check_insert_result
        )

        # flush collection to seal the segments
        self.collection_wrap.flush()

        # build `BITMAP` index
        index_params = {
            **DefaultVectorIndexParams.BIN_IVF_FLAT(DataType.BINARY_VECTOR.name),
            **DefaultScalarIndexParams.list_bitmap(self.get_bitmap_support_dtype_names)
        }
        self.build_multi_index(index_params=index_params)
        assert sorted([n.field_name for n in self.collection_wrap.indexes]) == sorted(index_params.keys())

        # load collection
        self.collection_wrap.load()

        # get segment info
        segment_info, _ = self.utility_wrap.get_query_segment_info(collection_name=collection_name)

        # check segments count
        if shards_num > nb:
            msg = f"`{collection_name}` Segments count:{len(segment_info)} > insert data:{nb}"
            assert len(segment_info) <= nb, msg
        else:
            msg = f"`{collection_name}` Segments count:{len(segment_info)} != shards_num:{shards_num}"
            assert len(segment_info) == shards_num, msg

        # check segment row number
        counts = [int(n.num_rows) for n in segment_info]
        assert sum(counts) == nb, f"`{collection_name}` Segment row count:{sum(counts)} != insert:{nb}"