test: add more request type checker (#29826)
add more request type checker

pr: https://github.com/milvus-io/milvus/pull/29811

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

This commit is contained in:
  parent df9b3376dc
  commit 58b9b8fdba
@@ -1,7 +1,7 @@
 import pytest
 import unittest
 from enum import Enum
-from random import randint
+import random
 import time
 import threading
 import os
@@ -282,14 +282,22 @@ def exception_handler():
     def wrapper(func):
         @functools.wraps(func)
         def inner_wrapper(self, *args, **kwargs):
+            class_name = None
+            function_name = None
             try:
+                function_name = func.__name__
+                class_name = getattr(self, '__class__', None).__name__ if self else None
                 res, result = func(self, *args, **kwargs)
                 return res, result
             except Exception as e:
                 log_row_length = 300
                 e_str = str(e)
-                log_e = e_str[0:log_row_length] + \
-                    '......' if len(e_str) > log_row_length else e_str
+                log_e = e_str[0:log_row_length] + '......' if len(e_str) > log_row_length else e_str
+                if class_name:
+                    log_message = f"Error in {class_name}.{function_name}: {log_e}"
+                else:
+                    log_message = f"Error in {function_name}: {log_e}"
+                log.error(log_message)
                 log.error(log_e)
                 return Error(e), False
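Note: the reworked exception_handler above tags every failure with its checker class and method. A minimal standalone sketch of the same pattern (logging setup simplified; the framework's Error wrapper is replaced here by returning the exception directly):

import functools
import logging

log = logging.getLogger(__name__)

def exception_handler():
    def wrapper(func):
        @functools.wraps(func)
        def inner_wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except Exception as e:
                cls = self.__class__.__name__ if self else None
                where = f"{cls}.{func.__name__}" if cls else func.__name__
                log.error(f"Error in {where}: {str(e)[:300]}")  # truncate long messages
                return e, False
        return inner_wrapper
    return wrapper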
@@ -306,7 +314,7 @@ class Checker:
     """

     def __init__(self, collection_name=None, partition_name=None, shards_num=2, dim=ct.default_dim, insert_data=True,
-                 schema=None):
+                 schema=None, replica_number=1, **kwargs):
         self.recovery_time = 0
         self._succ = 0
         self._fail = 0
@@ -337,26 +345,45 @@ class Checker:
                                          shards_num=shards_num,
                                          timeout=timeout,
                                          enable_traceback=enable_traceback)
+        self.index_name = "vec_index"
+        self.c_wrap.create_index(self.float_vector_field_name,
+                                 constants.DEFAULT_INDEX_PARAM,
+                                 index_name=self.index_name,
+                                 enable_traceback=enable_traceback,
+                                 check_task=CheckTasks.check_nothing)
+        self.replica_number = replica_number
+        self.c_wrap.load(replica_number=self.replica_number)
+
         self.p_wrap.init_partition(self.c_name, self.p_name)
         if insert_data:
             log.info(f"collection {c_name} created, start to insert data")
             t0 = time.perf_counter()
             self.c_wrap.insert(
-                data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema, start=0),
+                data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema),
                 partition_name=self.p_name,
                 timeout=timeout,
                 enable_traceback=enable_traceback)
             log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s")

         self.initial_entities = self.c_wrap.num_entities  # do as a flush
+        self.scale = 100000  # timestamp scale to make time.time() as int64

-    def insert_data(self, nb=constants.ENTITIES_FOR_SEARCH, partition_name=None):
+    def insert_data(self, nb=constants.DELTA_PER_INS, partition_name=None):
         partition_name = self.p_name if partition_name is None else partition_name
-        self.c_wrap.insert(
-            data=cf.get_column_data_by_schema(nb=nb, schema=self.schema, start=0),
-            partition_name=partition_name,
-            timeout=timeout,
-            enable_traceback=enable_traceback)
+        data = cf.get_column_data_by_schema(nb=nb, schema=self.schema)
+        ts_data = []
+        for i in range(nb):
+            time.sleep(0.001)
+            offset_ts = int(time.time() * self.scale)
+            ts_data.append(offset_ts)
+        data[0] = ts_data  # set timestamp (ms) as int64
+        log.debug(f"insert data: {ts_data}")
+        res, result = self.c_wrap.insert(data=data,
+                                         partition_name=partition_name,
+                                         timeout=timeout,
+                                         enable_traceback=enable_traceback,
+                                         check_task=CheckTasks.check_nothing)
+        return res, result

     def total(self):
         return self._succ + self._fail
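Note: insert_data now overwrites the first (primary key) column with scaled wall-clock timestamps, so each row records when it was written and keys stay unique and monotonic. A minimal sketch of the key generation, assuming the same scale factor of 100000 (one tick per 10 microseconds, comfortably inside int64 range):

import time

SCALE = 100000  # mirrors Checker.scale

def gen_ts_pks(nb):
    # nb increasing int64 keys derived from wall-clock time
    ts = []
    for _ in range(nb):
        time.sleep(0.001)  # 1 ms spacing guarantees distinct ticks at this scale
        ts.append(int(time.time() * SCALE))
    return ts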
@@ -450,7 +477,7 @@ class CollectionLoadChecker(Checker):
         super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
         self.c_wrap.create_index(self.float_vector_field_name,
                                  constants.DEFAULT_INDEX_PARAM,
-                                 index_name=cf.gen_unique_str('index_'),
+                                 index_name=self.index_name,
                                  timeout=timeout,
                                  enable_traceback=enable_traceback,
                                  check_task=CheckTasks.check_nothing)
@@ -483,7 +510,7 @@ class CollectionReleaseChecker(Checker):
         super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
         self.c_wrap.create_index(self.float_vector_field_name,
                                  constants.DEFAULT_INDEX_PARAM,
-                                 index_name=cf.gen_unique_str('index_'),
+                                 index_name=self.index_name,
                                  timeout=timeout,
                                  enable_traceback=enable_traceback,
                                  check_task=CheckTasks.check_nothing)
@@ -513,14 +540,16 @@ class PartitionLoadChecker(Checker):
     def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
         self.replica_number = replica_number
         if collection_name is None:
-            collection_name = cf.gen_unique_str("LoadChecker_")
-        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
+            collection_name = cf.gen_unique_str("PartitionLoadChecker_")
+        p_name = cf.gen_unique_str("PartitionLoadChecker_")
+        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, partition_name=p_name)
         self.c_wrap.create_index(self.float_vector_field_name,
                                  constants.DEFAULT_INDEX_PARAM,
-                                 index_name=cf.gen_unique_str('index_'),
+                                 index_name=self.index_name,
                                  timeout=timeout,
                                  enable_traceback=enable_traceback,
                                  check_task=CheckTasks.check_nothing)
         self.c_wrap.release()

     @trace()
     def load_partition(self):
@@ -546,14 +575,16 @@ class PartitionReleaseChecker(Checker):
     def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
         self.replica_number = replica_number
         if collection_name is None:
-            collection_name = cf.gen_unique_str("LoadChecker_")
-        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
+            collection_name = cf.gen_unique_str("PartitionReleaseChecker_")
+        p_name = cf.gen_unique_str("PartitionReleaseChecker_")
+        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema, partition_name=p_name)
         self.c_wrap.create_index(self.float_vector_field_name,
                                  constants.DEFAULT_INDEX_PARAM,
-                                 index_name=cf.gen_unique_str('index_'),
+                                 index_name=self.index_name,
                                  timeout=timeout,
                                  enable_traceback=enable_traceback,
                                  check_task=CheckTasks.check_nothing)
         self.c_wrap.release()
         self.p_wrap.load(replica_number=self.replica_number)

     @trace()
@@ -583,7 +614,7 @@ class SearchChecker(Checker):
         super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
         self.c_wrap.create_index(self.float_vector_field_name,
                                  constants.DEFAULT_INDEX_PARAM,
-                                 index_name=cf.gen_unique_str('index_'),
+                                 index_name=self.index_name,
                                  timeout=timeout,
                                  enable_traceback=enable_traceback,
                                  check_task=CheckTasks.check_nothing)
@@ -743,7 +774,7 @@ class InsertChecker(Checker):
         try:
             self.c_wrap.create_index(self.float_vector_field_name,
                                      constants.DEFAULT_INDEX_PARAM,
-                                     index_name=cf.gen_unique_str('index_'),
+                                     index_name=self.index_name,
                                      timeout=timeout,
                                      enable_traceback=enable_traceback,
                                      check_task=CheckTasks.check_nothing)
@@ -774,13 +805,14 @@ class UpsertChecker(Checker):

     def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
         if collection_name is None:
-            collection_name = cf.gen_unique_str("InsertChecker_")
+            collection_name = cf.gen_unique_str("UpsertChecker_")
         super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
+        self.data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)

     @trace()
     def upsert_entities(self):
-        data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
-        res, result = self.c_wrap.upsert(data=data,
+
+        res, result = self.c_wrap.upsert(data=self.data,
                                          timeout=timeout,
                                          enable_traceback=enable_traceback,
                                          check_task=CheckTasks.check_nothing)
@@ -788,13 +820,20 @@ class UpsertChecker(Checker):

     @exception_handler()
     def run_task(self):
+        # half of the data is upserted, the other half is inserted
+        rows = len(self.data[0])
+        pk_old = self.data[0][:rows // 2]
+        self.data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
+        pk_new = self.data[0][rows // 2:]
+        pk_update = pk_old + pk_new
+        self.data[0] = pk_update
         res, result = self.upsert_entities()
         return res, result

     def keep_running(self):
         while self._keep_running:
             self.run_task()
-            sleep(constants.WAIT_PER_OP / 10)
+            sleep(constants.WAIT_PER_OP)


 class CollectionCreateChecker(Checker):
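Note: UpsertChecker.run_task above splices primary keys so that every batch exercises both paths of upsert: the first half reuses keys that already exist (updates) and the second half carries fresh keys (inserts). A toy illustration of the splice:

rows = 6
pk_old = [100, 101, 102, 103, 104, 105][:rows // 2]  # keys already in the collection
pk_fresh = [200, 201, 202, 203, 204, 205]            # keys of the newly generated batch
mixed = pk_old + pk_fresh[rows // 2:]
print(mixed)  # [100, 101, 102, 203, 204, 205] -> 3 updates + 3 inserts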
@@ -818,8 +857,10 @@ class CollectionCreateChecker(Checker):
     @exception_handler()
     def run_task(self):
         res, result = self.init_collection()
-        if result:
-            self.c_wrap.drop(timeout=timeout)
+        # if result:
+        #     # 50% chance to drop collection
+        #     if random.randint(0, 1) == 0:
+        #         self.c_wrap.drop(timeout=timeout)
         return res, result

     def keep_running(self):
@@ -881,6 +922,17 @@ class PartitionCreateChecker(Checker):
         if collection_name is None:
             collection_name = cf.gen_unique_str("PartitionCreateChecker_")
         super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name)
+        c_name = cf.gen_unique_str("PartitionDropChecker_")
+        self.c_wrap.init_collection(name=c_name, schema=self.schema)
+        self.c_name = c_name
+        log.info(f"collection {c_name} created")
+        self.p_wrap.init_partition(collection=self.c_name,
+                                   name=cf.gen_unique_str("PartitionDropChecker_"),
+                                   timeout=timeout,
+                                   enable_traceback=enable_traceback,
+                                   check_task=CheckTasks.check_nothing
+                                   )
+        log.info(f"partition: {self.p_wrap}")

     @trace()
     def create_partition(self):
@@ -895,8 +947,6 @@ class PartitionCreateChecker(Checker):
     @exception_handler()
     def run_task(self):
         res, result = self.create_partition()
-        if result:
-            self.p_wrap.drop(timeout=timeout)
         return res, result

     def keep_running(self):
@@ -912,12 +962,17 @@ class PartitionDropChecker(Checker):
         if collection_name is None:
             collection_name = cf.gen_unique_str("PartitionDropChecker_")
         super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name)
+        c_name = cf.gen_unique_str("PartitionDropChecker_")
+        self.c_wrap.init_collection(name=c_name, schema=self.schema)
+        self.c_name = c_name
+        log.info(f"collection {c_name} created")
+        self.p_wrap.init_partition(collection=self.c_name,
+                                   name=cf.gen_unique_str("PartitionDropChecker_"),
+                                   timeout=timeout,
+                                   enable_traceback=enable_traceback,
+                                   check_task=CheckTasks.check_nothing
+                                   )
+        log.info(f"partition: {self.p_wrap}")

     @trace()
     def drop_partition(self):
@@ -928,12 +983,14 @@ class PartitionDropChecker(Checker):
     def run_task(self):
         res, result = self.drop_partition()
         if result:
-            self.p_wrap.init_partition(collection=self.c_name,
-                                       name=cf.gen_unique_str("PartitionDropChecker_"),
-                                       timeout=timeout,
-                                       enable_traceback=enable_traceback,
-                                       check_task=CheckTasks.check_nothing
-                                       )
+            # create two partitions, then drop one
+            for i in range(2):
+                self.p_wrap.init_partition(collection=self.c_name,
+                                           name=cf.gen_unique_str("PartitionDropChecker_"),
+                                           timeout=timeout,
+                                           enable_traceback=enable_traceback,
+                                           check_task=CheckTasks.check_nothing
+                                           )
         return res, result

     def keep_running(self):
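Note: creating two partitions for each one dropped keeps PartitionDropChecker self-sustaining; the pool of droppable partitions grows by one per successful cycle, so a target always remains. Illustratively:

partitions = 1              # the partition created in __init__
for cycle in range(3):
    partitions -= 1         # drop_partition() removes one
    partitions += 2         # run_task() then creates two replacements
assert partitions == 4      # net +1 per cycle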
@@ -1007,7 +1064,6 @@ class IndexCreateChecker(Checker):
         if collection_name is None:
             collection_name = cf.gen_unique_str("IndexChecker_")
         super().__init__(collection_name=collection_name, schema=schema)
-        self.index_name = cf.gen_unique_str('index_')
         for i in range(5):
             self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
                                timeout=timeout, enable_traceback=enable_traceback)
@@ -1025,9 +1081,11 @@ class IndexCreateChecker(Checker):

     @exception_handler()
     def run_task(self):
+        c_name = cf.gen_unique_str("IndexCreateChecker_")
+        self.c_wrap.init_collection(name=c_name, schema=self.schema)
         res, result = self.create_index()
         if result:
-            self.c_wrap.drop_index(timeout=timeout)
+            self.c_wrap.drop_index(timeout=timeout, index_name=self.index_name)
         return res, result

     def keep_running(self):
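Note: because the base Checker now owns a fixed index name ("vec_index"), create/drop cycles can address the same index explicitly instead of minting a unique name per call. A rough pymilvus equivalent (collection name and index params are placeholders, not values from this suite):

from pymilvus import Collection

collection = Collection("demo_collection")  # assumes this collection already exists
index_params = {"index_type": "HNSW", "metric_type": "L2",
                "params": {"M": 8, "efConstruction": 64}}
collection.create_index(field_name="float_vector",
                        index_params=index_params,
                        index_name="vec_index")
collection.drop_index(index_name="vec_index")  # drops exactly this index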
@@ -1043,17 +1101,11 @@ class IndexDropChecker(Checker):
         if collection_name is None:
             collection_name = cf.gen_unique_str("IndexChecker_")
         super().__init__(collection_name=collection_name, schema=schema)
-        self.index_name = cf.gen_unique_str('index_')
-        for i in range(5):
-            self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
-                               timeout=timeout, enable_traceback=enable_traceback)
-        # do as a flush before indexing
-        log.debug(f"Index ready entities: {self.c_wrap.num_entities}")
         self.c_wrap.create_index(self.float_vector_field_name,
                                  constants.DEFAULT_INDEX_PARAM,
                                  index_name=self.index_name,
                                  enable_traceback=enable_traceback,
                                  check_task=CheckTasks.check_nothing)

     @trace()
     def drop_index(self):
@@ -1064,6 +1116,7 @@ class IndexDropChecker(Checker):
     def run_task(self):
         res, result = self.drop_index()
         if result:
+            self.c_wrap.init_collection(name=cf.gen_unique_str("IndexDropChecker_"), schema=self.schema)
             self.c_wrap.create_index(self.float_vector_field_name,
                                      constants.DEFAULT_INDEX_PARAM,
                                      index_name=self.index_name,
@@ -1073,6 +1126,12 @@ class IndexDropChecker(Checker):

     def keep_running(self):
         while self._keep_running:
+            self.c_wrap.init_collection(name=cf.gen_unique_str("IndexDropChecker_"), schema=self.schema)
+            self.c_wrap.create_index(self.float_vector_field_name,
+                                     constants.DEFAULT_INDEX_PARAM,
+                                     index_name=self.index_name,
+                                     enable_traceback=enable_traceback,
+                                     check_task=CheckTasks.check_nothing)
             self.run_task()
             sleep(constants.WAIT_PER_OP * 6)
@@ -1086,8 +1145,7 @@ class QueryChecker(Checker):
         super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
         res, result = self.c_wrap.create_index(self.float_vector_field_name,
                                                constants.DEFAULT_INDEX_PARAM,
-                                               index_name=cf.gen_unique_str(
-                                                   'index_'),
+                                               index_name=self.index_name,
                                                timeout=timeout,
                                                enable_traceback=enable_traceback,
                                                check_task=CheckTasks.check_nothing)
@@ -1105,7 +1163,7 @@ class QueryChecker(Checker):
     def run_task(self):
         int_values = []
         for _ in range(5):
-            int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH))
+            int_values.append(random.randint(0, constants.ENTITIES_FOR_SEARCH))
         self.term_expr = f'{self.int64_field_name} in {int_values}'
         res, result = self.query()
         return res, result
@@ -1125,28 +1183,44 @@ class DeleteChecker(Checker):
         super().__init__(collection_name=collection_name, schema=schema)
         res, result = self.c_wrap.create_index(self.float_vector_field_name,
                                                constants.DEFAULT_INDEX_PARAM,
-                                               index_name=cf.gen_unique_str(
-                                                   'index_'),
+                                               index_name=self.index_name,
                                                timeout=timeout,
                                                enable_traceback=enable_traceback,
                                                check_task=CheckTasks.check_nothing)
         self.c_wrap.load()  # load before query
-        term_expr = f'{self.int64_field_name} > 0'
-        res, _ = self.c_wrap.query(term_expr, output_fields=[
-            self.int64_field_name])
+        self.insert_data()
+        query_expr = f'{self.int64_field_name} > 0'
+        res, _ = self.c_wrap.query(query_expr,
+                                   output_fields=[self.int64_field_name],
+                                   partition_name=self.p_name)
         self.ids = [r[self.int64_field_name] for r in res]
-        self.expr = None
+        self.query_expr = query_expr
+        delete_ids = self.ids[:len(self.ids) // 2]  # delete half of ids
+        self.delete_expr = f'{self.int64_field_name} in {delete_ids}'
+
+    def update_delete_expr(self):
+        res, _ = self.c_wrap.query(self.query_expr,
+                                   output_fields=[self.int64_field_name],
+                                   partition_name=self.p_name)
+        all_ids = [r[self.int64_field_name] for r in res]
+        if len(all_ids) < 100:
+            # insert data to make sure there are enough ids to delete
+            self.insert_data(nb=10000)
+            res, _ = self.c_wrap.query(self.query_expr,
+                                       output_fields=[self.int64_field_name],
+                                       partition_name=self.p_name)
+            all_ids = [r[self.int64_field_name] for r in res]
+        delete_ids = all_ids[:len(all_ids) // 2]  # delete half of ids
+        self.delete_expr = f'{self.int64_field_name} in {delete_ids}'

     @trace()
     def delete_entities(self):
-        res, result = self.c_wrap.delete(expr=self.expr, timeout=timeout)
+        res, result = self.c_wrap.delete(expr=self.delete_expr, timeout=timeout, partition_name=self.p_name)
         return res, result

     @exception_handler()
     def run_task(self):
-        delete_ids = self.ids.pop()
-        self.expr = f'{self.int64_field_name} in {[delete_ids]}'
+        self.update_delete_expr()
         res, result = self.delete_entities()
         return res, result
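Note: update_delete_expr keeps the delete workload self-sustaining: it re-queries the live ids in the partition, tops the collection up when fewer than 100 remain, and always targets only half of what it finds. The expression it builds is ordinary Milvus boolean syntax, e.g.:

int64_field_name = "int64"
all_ids = [11, 12, 13, 14, 15, 16]
delete_ids = all_ids[:len(all_ids) // 2]       # delete half of the ids
delete_expr = f"{int64_field_name} in {delete_ids}"
print(delete_expr)  # int64 in [11, 12, 13]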
@@ -1166,8 +1240,7 @@ class CompactChecker(Checker):
         self.ut = ApiUtilityWrapper()
         res, result = self.c_wrap.create_index(self.float_vector_field_name,
                                                constants.DEFAULT_INDEX_PARAM,
-                                               index_name=cf.gen_unique_str(
-                                                   'index_'),
+                                               index_name=self.index_name,
                                                timeout=timeout,
                                                enable_traceback=enable_traceback,
                                                check_task=CheckTasks.check_nothing)
@@ -1201,8 +1274,7 @@ class LoadBalanceChecker(Checker):
         self.utility_wrap = ApiUtilityWrapper()
         res, result = self.c_wrap.create_index(self.float_vector_field_name,
                                                constants.DEFAULT_INDEX_PARAM,
-                                               index_name=cf.gen_unique_str(
-                                                   'index_'),
+                                               index_name=self.index_name,
                                                timeout=timeout,
                                                enable_traceback=enable_traceback,
                                                check_task=CheckTasks.check_nothing)
@@ -12,7 +12,7 @@ CHAOS_GROUP = 'chaos-mesh.org'  # chaos mesh group
 CHAOS_VERSION = 'v1alpha1'  # chaos mesh version
 SUCC = 'succ'
 FAIL = 'fail'
-DELTA_PER_INS = 10  # entities per insert
+DELTA_PER_INS = 3000  # entities per insert
 ENTITIES_FOR_SEARCH = 3000  # entities for search_collection
 ENTITIES_FOR_BULKINSERT = 1000000  # entities for bulk insert
 CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH'  # env variables for chaos path
@@ -2,7 +2,7 @@ import time

 import pytest
 from time import sleep
-from pymilvus import connections
+from pymilvus import connections, db
 from chaos.checker import (
     DatabaseCreateChecker,
     DatabaseDropChecker,
@@ -52,7 +52,7 @@ class TestBase:
 class TestOperations(TestBase):

     @pytest.fixture(scope="function", autouse=True)
-    def connection(self, host, port, user, password, milvus_ns):
+    def connection(self, host, port, user, password, milvus_ns, database_name):
         if user and password:
             # log.info(f"connect to {host}:{port} with user {user} and password {password}")
             connections.connect('default', host=host, port=port, user=user, password=password, secure=True)
@@ -60,7 +60,11 @@ class TestOperations(TestBase):
             connections.connect('default', host=host, port=port)
         if connections.has_connection("default") is False:
             raise Exception("no connections")
-        log.info("connect to milvus successfully")
+        all_dbs = db.list_database()
+        if database_name not in all_dbs:
+            db.create_database(database_name)
+        db.using_database(database_name)
+        log.info(f"connect to milvus {host}:{port}, db {database_name} successfully")
         self.host = host
         self.port = port
         self.user = user
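Note: the connection fixture now provisions the target database on the fly through the pymilvus db module (available from Milvus 2.3 onward). A standalone sketch of the same ensure-then-use flow (host, port, and the database name are placeholders):

from pymilvus import connections, db

connections.connect("default", host="127.0.0.1", port="19530")
database_name = "chaos_test"  # placeholder
if database_name not in db.list_database():
    db.create_database(database_name)
db.using_database(database_name)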
@@ -77,6 +77,7 @@ class TestOperations(TestBase):
             Op.query: QueryChecker(collection_name=c_name),
             Op.delete: DeleteChecker(collection_name=c_name),
         }
+        log.info(f"init_health_checkers: {checkers}")
         self.health_checkers = checkers

     @pytest.fixture(scope="function", params=get_all_collections())
@@ -30,6 +30,7 @@ def pytest_addoption(parser):
     parser.addoption("--handler", action="store", default="GRPC", help="handler of request")
     parser.addoption("--tag", action="store", default="all", help="only run tests matching the tag.")
     parser.addoption('--dry_run', action='store_true', default=False, help="")
+    parser.addoption('--database_name', action='store', default="default", help="name of database")
     parser.addoption('--partition_name', action='store', default="partition_name", help="name of partition")
     parser.addoption('--connect_name', action='store', default="connect_name", help="name of connect")
     parser.addoption('--descriptions', action='store', default="partition_des", help="descriptions of partition")
@@ -110,6 +111,11 @@ def connect_name(request):
     return request.config.getoption("--connect_name")


+@pytest.fixture
+def database_name(request):
+    return request.config.getoption("--database_name")
+
+
 @pytest.fixture
 def partition_name(request):
     return request.config.getoption("--partition_name")
@@ -47,6 +47,7 @@ pandas==1.5.3
 tenacity==8.1.0
 # for standby test
 etcd-sdk-python==0.0.4
+deepdiff==6.7.1

 # for test result analyzer
 prettytable==3.8.0
@@ -4,6 +4,7 @@ import json
 from time import sleep
 from pymilvus import connections
 from chaos.checker import (InsertChecker,
+                           UpsertChecker,
                            SearchChecker,
                            QueryChecker,
                            DeleteChecker,
@@ -63,6 +64,7 @@ class TestOperations(TestBase):
         c_name = collection_name
         checkers = {
             Op.insert: InsertChecker(collection_name=c_name),
+            Op.upsert: UpsertChecker(collection_name=c_name),
             Op.search: SearchChecker(collection_name=c_name),
             Op.query: QueryChecker(collection_name=c_name),
             Op.delete: DeleteChecker(collection_name=c_name),