mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
test: [cp2.6]refactor connection method to prioritize uri/token and add query limit (#45948)
master pr: #45901 Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
parent
e9d920a785
commit
9b0e5cf032
@ -65,12 +65,24 @@ class Base:
|
|||||||
self._teardown_objects()
|
self._teardown_objects()
|
||||||
|
|
||||||
def _teardown_objects(self):
|
def _teardown_objects(self):
|
||||||
|
# Prioritize uri and token for connection
|
||||||
|
if cf.param_info.param_uri:
|
||||||
|
uri = cf.param_info.param_uri
|
||||||
|
else:
|
||||||
|
uri = "http://" + cf.param_info.param_host + ":" + str(cf.param_info.param_port)
|
||||||
|
|
||||||
|
if cf.param_info.param_token:
|
||||||
|
token = cf.param_info.param_token
|
||||||
|
else:
|
||||||
|
token = f"{cf.param_info.param_user}:{cf.param_info.param_password}" if cf.param_info.param_user and cf.param_info.param_password else None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
""" Drop collection before disconnect """
|
""" Drop collection before disconnect """
|
||||||
if not self.connection_wrap.has_connection(alias=DefaultConfig.DEFAULT_USING)[0]:
|
if not self.connection_wrap.has_connection(alias=DefaultConfig.DEFAULT_USING)[0]:
|
||||||
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, host=cf.param_info.param_host,
|
if token:
|
||||||
port=cf.param_info.param_port, user=ct.default_user,
|
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, uri=uri, token=token)
|
||||||
password=ct.default_password)
|
else:
|
||||||
|
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, uri=uri)
|
||||||
|
|
||||||
if self.collection_wrap.collection is not None:
|
if self.collection_wrap.collection is not None:
|
||||||
if self.collection_wrap.collection.name.startswith("alias"):
|
if self.collection_wrap.collection.name.startswith("alias"):
|
||||||
@ -106,9 +118,10 @@ class Base:
|
|||||||
try:
|
try:
|
||||||
""" Drop roles before disconnect """
|
""" Drop roles before disconnect """
|
||||||
if not self.connection_wrap.has_connection(alias=DefaultConfig.DEFAULT_USING)[0]:
|
if not self.connection_wrap.has_connection(alias=DefaultConfig.DEFAULT_USING)[0]:
|
||||||
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, host=cf.param_info.param_host,
|
if token:
|
||||||
port=cf.param_info.param_port, user=ct.default_user,
|
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, uri=uri, token=token)
|
||||||
password=ct.default_password)
|
else:
|
||||||
|
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, uri=uri)
|
||||||
|
|
||||||
role_list = self.utility_wrap.list_roles(False)[0]
|
role_list = self.utility_wrap.list_roles(False)[0]
|
||||||
for role in role_list.groups:
|
for role in role_list.groups:
|
||||||
@ -145,30 +158,32 @@ class TestcaseBase(Base):
|
|||||||
if self.skip_connection:
|
if self.skip_connection:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if enable_milvus_client_api:
|
# Prioritize uri and token for connection
|
||||||
if cf.param_info.param_uri:
|
if cf.param_info.param_uri:
|
||||||
uri = cf.param_info.param_uri
|
uri = cf.param_info.param_uri
|
||||||
else:
|
|
||||||
uri = "http://" + cf.param_info.param_host + ":" + str(cf.param_info.param_port)
|
|
||||||
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING,uri=uri,token=cf.param_info.param_token)
|
|
||||||
res, is_succ = self.connection_wrap.MilvusClient(uri=uri,
|
|
||||||
token=cf.param_info.param_token)
|
|
||||||
self.client = MilvusClient(uri=uri, token=cf.param_info.param_token)
|
|
||||||
else:
|
else:
|
||||||
if cf.param_info.param_user and cf.param_info.param_password:
|
uri = "http://" + cf.param_info.param_host + ":" + str(cf.param_info.param_port)
|
||||||
|
|
||||||
|
if cf.param_info.param_token:
|
||||||
|
token = cf.param_info.param_token
|
||||||
|
else:
|
||||||
|
token = f"{cf.param_info.param_user}:{cf.param_info.param_password}" if cf.param_info.param_user and cf.param_info.param_password else None
|
||||||
|
|
||||||
|
if enable_milvus_client_api:
|
||||||
|
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, uri=uri, token=token)
|
||||||
|
res, is_succ = self.connection_wrap.MilvusClient(uri=uri, token=token)
|
||||||
|
self.client = MilvusClient(uri=uri, token=token)
|
||||||
|
else:
|
||||||
|
if token:
|
||||||
res, is_succ = self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING,
|
res, is_succ = self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING,
|
||||||
host=cf.param_info.param_host,
|
uri=uri,
|
||||||
port=cf.param_info.param_port,
|
token=token,
|
||||||
user=cf.param_info.param_user,
|
|
||||||
password=cf.param_info.param_password,
|
|
||||||
secure=cf.param_info.param_secure)
|
secure=cf.param_info.param_secure)
|
||||||
else:
|
else:
|
||||||
res, is_succ = self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING,
|
res, is_succ = self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING,
|
||||||
host=cf.param_info.param_host,
|
uri=uri)
|
||||||
port=cf.param_info.param_port)
|
|
||||||
|
|
||||||
uri = "http://" + cf.param_info.param_host + ":" + str(cf.param_info.param_port)
|
self.client = MilvusClient(uri=uri, token=token)
|
||||||
self.client = MilvusClient(uri=uri, token=cf.param_info.param_token)
|
|
||||||
server_version = utility.get_server_version()
|
server_version = utility.get_server_version()
|
||||||
log.info(f"server version: {server_version}")
|
log.info(f"server version: {server_version}")
|
||||||
return res
|
return res
|
||||||
|
|||||||
@ -355,7 +355,7 @@ class Checker:
|
|||||||
self.ms = MilvusSys()
|
self.ms = MilvusSys()
|
||||||
self.bucket_name = cf.param_info.param_bucket_name
|
self.bucket_name = cf.param_info.param_bucket_name
|
||||||
|
|
||||||
# Initialize MilvusClient
|
# Initialize MilvusClient - prioritize uri and token
|
||||||
if cf.param_info.param_uri:
|
if cf.param_info.param_uri:
|
||||||
uri = cf.param_info.param_uri
|
uri = cf.param_info.param_uri
|
||||||
else:
|
else:
|
||||||
@ -371,10 +371,8 @@ class Checker:
|
|||||||
self.alias = cf.gen_unique_str("checker_alias_")
|
self.alias = cf.gen_unique_str("checker_alias_")
|
||||||
connections.connect(
|
connections.connect(
|
||||||
alias=self.alias,
|
alias=self.alias,
|
||||||
host=cf.param_info.param_host,
|
uri=uri,
|
||||||
port=str(cf.param_info.param_port),
|
token=token
|
||||||
user=cf.param_info.param_user,
|
|
||||||
password=cf.param_info.param_password
|
|
||||||
)
|
)
|
||||||
c_name = collection_name if collection_name is not None else cf.gen_unique_str(
|
c_name = collection_name if collection_name is not None else cf.gen_unique_str(
|
||||||
'Checker_')
|
'Checker_')
|
||||||
@ -905,7 +903,7 @@ class SearchChecker(Checker):
|
|||||||
data=self.data,
|
data=self.data,
|
||||||
anns_field=self.anns_field_name,
|
anns_field=self.anns_field_name,
|
||||||
search_params=self.search_param,
|
search_params=self.search_param,
|
||||||
limit=1,
|
limit=5,
|
||||||
partition_names=self.p_names,
|
partition_names=self.p_names,
|
||||||
timeout=search_timeout
|
timeout=search_timeout
|
||||||
)
|
)
|
||||||
@ -963,7 +961,7 @@ class TensorSearchChecker(Checker):
|
|||||||
data=self.data,
|
data=self.data,
|
||||||
anns_field=self.anns_field_name,
|
anns_field=self.anns_field_name,
|
||||||
search_params=self.search_param,
|
search_params=self.search_param,
|
||||||
limit=1,
|
limit=5,
|
||||||
partition_names=self.p_names,
|
partition_names=self.p_names,
|
||||||
timeout=search_timeout
|
timeout=search_timeout
|
||||||
)
|
)
|
||||||
@ -1019,7 +1017,7 @@ class FullTextSearchChecker(Checker):
|
|||||||
data=cf.gen_vectors(5, self.dim, vector_data_type="TEXT_SPARSE_VECTOR"),
|
data=cf.gen_vectors(5, self.dim, vector_data_type="TEXT_SPARSE_VECTOR"),
|
||||||
anns_field=bm25_anns_field,
|
anns_field=bm25_anns_field,
|
||||||
search_params=constants.DEFAULT_BM25_SEARCH_PARAM,
|
search_params=constants.DEFAULT_BM25_SEARCH_PARAM,
|
||||||
limit=1,
|
limit=5,
|
||||||
partition_names=self.p_names,
|
partition_names=self.p_names,
|
||||||
timeout=search_timeout
|
timeout=search_timeout
|
||||||
)
|
)
|
||||||
@ -1863,7 +1861,7 @@ class QueryChecker(Checker):
|
|||||||
@trace()
|
@trace()
|
||||||
def query(self):
|
def query(self):
|
||||||
try:
|
try:
|
||||||
res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, timeout=query_timeout)
|
res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, limit=5, timeout=query_timeout)
|
||||||
return res, True
|
return res, True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.info(f"query error: {e}")
|
log.info(f"query error: {e}")
|
||||||
@ -1898,7 +1896,7 @@ class TextMatchChecker(Checker):
|
|||||||
@trace()
|
@trace()
|
||||||
def text_match(self):
|
def text_match(self):
|
||||||
try:
|
try:
|
||||||
res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, timeout=query_timeout)
|
res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, limit=5, timeout=query_timeout)
|
||||||
return res, True
|
return res, True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.info(f"text_match error: {e}")
|
log.info(f"text_match error: {e}")
|
||||||
@ -1938,7 +1936,7 @@ class PhraseMatchChecker(Checker):
|
|||||||
@trace()
|
@trace()
|
||||||
def phrase_match(self):
|
def phrase_match(self):
|
||||||
try:
|
try:
|
||||||
res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, timeout=query_timeout)
|
res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, limit=5, timeout=query_timeout)
|
||||||
return res, True
|
return res, True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.info(f"phrase_match error: {e}")
|
log.info(f"phrase_match error: {e}")
|
||||||
@ -1991,7 +1989,7 @@ class JsonQueryChecker(Checker):
|
|||||||
@trace()
|
@trace()
|
||||||
def json_query(self):
|
def json_query(self):
|
||||||
try:
|
try:
|
||||||
res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, timeout=query_timeout)
|
res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, limit=5, timeout=query_timeout)
|
||||||
return res, True
|
return res, True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.info(f"json_query error: {e}")
|
log.info(f"json_query error: {e}")
|
||||||
@ -2030,7 +2028,7 @@ class GeoQueryChecker(Checker):
|
|||||||
@trace()
|
@trace()
|
||||||
def geo_query(self):
|
def geo_query(self):
|
||||||
try:
|
try:
|
||||||
res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, timeout=query_timeout)
|
res = self.milvus_client.query(collection_name=self.c_name, filter=self.term_expr, limit=5, timeout=query_timeout)
|
||||||
return res, True
|
return res, True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.info(f"geo_query error: {e}")
|
log.info(f"geo_query error: {e}")
|
||||||
|
|||||||
@ -52,23 +52,36 @@ class TestBase:
|
|||||||
class TestOperations(TestBase):
|
class TestOperations(TestBase):
|
||||||
|
|
||||||
@pytest.fixture(scope="function", autouse=True)
|
@pytest.fixture(scope="function", autouse=True)
|
||||||
def connection(self, host, port, user, password, milvus_ns, database_name):
|
def connection(self, host, port, user, password, uri, token, milvus_ns, database_name):
|
||||||
if user and password:
|
# Prioritize uri and token for connection
|
||||||
# log.info(f"connect to {host}:{port} with user {user} and password {password}")
|
if uri:
|
||||||
connections.connect('default', host=host, port=port, user=user, password=password)
|
actual_uri = uri
|
||||||
else:
|
else:
|
||||||
connections.connect('default', host=host, port=port)
|
actual_uri = f"http://{host}:{port}"
|
||||||
|
|
||||||
|
if token:
|
||||||
|
actual_token = token
|
||||||
|
else:
|
||||||
|
actual_token = f"{user}:{password}" if user and password else None
|
||||||
|
|
||||||
|
if actual_token:
|
||||||
|
connections.connect('default', uri=actual_uri, token=actual_token)
|
||||||
|
else:
|
||||||
|
connections.connect('default', uri=actual_uri)
|
||||||
|
|
||||||
if connections.has_connection("default") is False:
|
if connections.has_connection("default") is False:
|
||||||
raise Exception("no connections")
|
raise Exception("no connections")
|
||||||
all_dbs = db.list_database()
|
all_dbs = db.list_database()
|
||||||
if database_name not in all_dbs:
|
if database_name not in all_dbs:
|
||||||
db.create_database(database_name)
|
db.create_database(database_name)
|
||||||
db.using_database(database_name)
|
db.using_database(database_name)
|
||||||
log.info(f"connect to milvus {host}:{port}, db {database_name} successfully")
|
log.info(f"connect to milvus {actual_uri}, db {database_name} successfully")
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.user = user
|
self.user = user
|
||||||
self.password = password
|
self.password = password
|
||||||
|
self.uri = actual_uri
|
||||||
|
self.token = actual_token
|
||||||
self.milvus_sys = MilvusSys(alias='default')
|
self.milvus_sys = MilvusSys(alias='default')
|
||||||
self.milvus_ns = milvus_ns
|
self.milvus_ns = milvus_ns
|
||||||
self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
|
self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
|
||||||
|
|||||||
@ -1,8 +1,7 @@
|
|||||||
import time
|
import time
|
||||||
import pytest
|
import pytest
|
||||||
from faker import Faker
|
from faker import Faker
|
||||||
from pymilvus import Collection
|
from pymilvus import MilvusClient, CollectionSchema
|
||||||
from base.client_base import TestcaseBase
|
|
||||||
from common import common_func as cf
|
from common import common_func as cf
|
||||||
from common import common_type as ct
|
from common import common_type as ct
|
||||||
from common.common_type import CaseLabel
|
from common.common_type import CaseLabel
|
||||||
@ -11,7 +10,8 @@ from utils.util_common import get_collections
|
|||||||
|
|
||||||
fake = Faker()
|
fake = Faker()
|
||||||
|
|
||||||
class TestAllCollection(TestcaseBase):
|
|
||||||
|
class TestAllCollection:
|
||||||
""" Test case of end to end"""
|
""" Test case of end to end"""
|
||||||
|
|
||||||
@pytest.fixture(scope="function", params=get_collections(file_name="chaos_test_all_collections.json"))
|
@pytest.fixture(scope="function", params=get_collections(file_name="chaos_test_all_collections.json"))
|
||||||
@ -20,6 +20,23 @@ class TestAllCollection(TestcaseBase):
|
|||||||
pytest.skip("The collection name is invalid")
|
pytest.skip("The collection name is invalid")
|
||||||
yield request.param
|
yield request.param
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def milvus_client(self):
|
||||||
|
"""Initialize MilvusClient"""
|
||||||
|
if cf.param_info.param_uri:
|
||||||
|
uri = cf.param_info.param_uri
|
||||||
|
else:
|
||||||
|
uri = "http://" + cf.param_info.param_host + ":" + str(cf.param_info.param_port)
|
||||||
|
|
||||||
|
if cf.param_info.param_token:
|
||||||
|
token = cf.param_info.param_token
|
||||||
|
else:
|
||||||
|
token = f"{cf.param_info.param_user}:{cf.param_info.param_password}"
|
||||||
|
|
||||||
|
client = MilvusClient(uri=uri, token=token)
|
||||||
|
yield client
|
||||||
|
client.close()
|
||||||
|
|
||||||
def teardown_method(self, method):
|
def teardown_method(self, method):
|
||||||
log.info(("*" * 35) + " teardown " + ("*" * 35))
|
log.info(("*" * 35) + " teardown " + ("*" * 35))
|
||||||
log.info("[teardown_method] Start teardown test case %s..." %
|
log.info("[teardown_method] Start teardown test case %s..." %
|
||||||
@ -27,17 +44,18 @@ class TestAllCollection(TestcaseBase):
|
|||||||
log.info("skip drop collection")
|
log.info("skip drop collection")
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L1)
|
@pytest.mark.tags(CaseLabel.L1)
|
||||||
def test_milvus_default(self, collection_name):
|
def test_milvus_default(self, collection_name, milvus_client):
|
||||||
self._connect()
|
|
||||||
# create
|
# create
|
||||||
name = collection_name if collection_name else cf.gen_unique_str("Checker_")
|
name = collection_name if collection_name else cf.gen_unique_str("Checker_")
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
schema = Collection(name=name).schema
|
|
||||||
collection_w = self.init_collection_wrap(name=name, schema=schema)
|
# Get schema from existing collection
|
||||||
|
collection_info = milvus_client.describe_collection(collection_name=name)
|
||||||
|
schema = CollectionSchema.construct_from_dict(collection_info)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
assert collection_w.name == name
|
assert collection_info['collection_name'] == name
|
||||||
|
|
||||||
# get collection info
|
# get collection info
|
||||||
schema = collection_w.schema
|
|
||||||
dim = cf.get_dim_by_schema(schema=schema)
|
dim = cf.get_dim_by_schema(schema=schema)
|
||||||
int64_field_name = cf.get_int64_field_name(schema=schema)
|
int64_field_name = cf.get_int64_field_name(schema=schema)
|
||||||
float_vector_field_name = cf.get_float_vec_field_name(schema=schema)
|
float_vector_field_name = cf.get_float_vec_field_name(schema=schema)
|
||||||
@ -46,60 +64,86 @@ class TestAllCollection(TestcaseBase):
|
|||||||
bm25_vec_field_name_list = cf.get_bm25_vec_field_name_list(schema=schema)
|
bm25_vec_field_name_list = cf.get_bm25_vec_field_name_list(schema=schema)
|
||||||
|
|
||||||
# compact collection before getting num_entities
|
# compact collection before getting num_entities
|
||||||
collection_w.flush(timeout=180)
|
milvus_client.flush(collection_name=name, timeout=180)
|
||||||
collection_w.compact()
|
compact_job_id = milvus_client.compact(collection_name=name, timeout=180)
|
||||||
collection_w.wait_for_compaction_completed(timeout=720)
|
|
||||||
|
|
||||||
entities = collection_w.num_entities
|
# wait for compaction completed
|
||||||
|
max_wait_time = 720
|
||||||
|
start_time = time.time()
|
||||||
|
while time.time() - start_time < max_wait_time:
|
||||||
|
compact_state = milvus_client.get_compaction_state(compact_job_id)
|
||||||
|
if compact_state == "Completed":
|
||||||
|
break
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
entities = milvus_client.get_collection_stats(collection_name=name).get("row_count", 0)
|
||||||
log.info(f"assert create collection: {tt}, init_entities: {entities}")
|
log.info(f"assert create collection: {tt}, init_entities: {entities}")
|
||||||
|
|
||||||
# insert
|
# insert
|
||||||
offset = -3000
|
offset = -3000
|
||||||
data = cf.gen_row_data_by_schema(nb=ct.default_nb, schema=schema, start=offset)
|
data = cf.gen_row_data_by_schema(nb=ct.default_nb, schema=collection_info, start=offset)
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
_, res = collection_w.insert(data)
|
res = milvus_client.insert(collection_name=name, data=data)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"assert insert: {tt}")
|
log.info(f"assert insert: {tt}")
|
||||||
assert res
|
assert res.get('insert_count', 0) > 0
|
||||||
|
|
||||||
# flush
|
# flush
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
_, check_result = collection_w.flush(timeout=180)
|
milvus_client.flush(collection_name=name, timeout=180)
|
||||||
assert check_result
|
|
||||||
# assert collection_w.num_entities == len(data[0]) + entities
|
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
entities = collection_w.num_entities
|
entities = milvus_client.get_collection_stats(collection_name=name).get("row_count", 0)
|
||||||
log.info(f"assert flush: {tt}, entities: {entities}")
|
log.info(f"assert flush: {tt}, entities: {entities}")
|
||||||
|
|
||||||
# show index infos
|
# show index infos
|
||||||
index_infos = [index.to_dict() for index in collection_w.indexes]
|
index_names = milvus_client.list_indexes(collection_name=name)
|
||||||
log.info(f"index info: {index_infos}")
|
log.info(f"index names: {index_names}")
|
||||||
fields_created_index = [index["field"] for index in index_infos]
|
fields_created_index = []
|
||||||
|
for idx_name in index_names:
|
||||||
|
try:
|
||||||
|
idx_info = milvus_client.describe_index(collection_name=name, index_name=idx_name)
|
||||||
|
if 'field_name' in idx_info:
|
||||||
|
fields_created_index.append(idx_info['field_name'])
|
||||||
|
except Exception as e:
|
||||||
|
log.debug(f"Failed to describe index {idx_name}: {e}")
|
||||||
|
|
||||||
# create index if not have
|
# create index if not have
|
||||||
index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
|
index_params = milvus_client.prepare_index_params()
|
||||||
|
|
||||||
for f in float_vector_field_name_list:
|
for f in float_vector_field_name_list:
|
||||||
if f not in fields_created_index:
|
if f not in fields_created_index:
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
index, _ = collection_w.create_index(field_name=float_vector_field_name,
|
index_params.add_index(
|
||||||
index_params=index_params)
|
field_name=f,
|
||||||
|
index_type="HNSW",
|
||||||
|
metric_type="L2",
|
||||||
|
params={"M": 48, "efConstruction": 500}
|
||||||
|
)
|
||||||
|
milvus_client.create_index(
|
||||||
|
collection_name=name,
|
||||||
|
index_params=index_params
|
||||||
|
)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"create index for field {f} cost: {tt} seconds")
|
log.info(f"create index for field {f} cost: {tt} seconds")
|
||||||
|
index_params = milvus_client.prepare_index_params() # reset for next field
|
||||||
|
|
||||||
# show index infos
|
# show index infos
|
||||||
index_infos = [index.to_dict() for index in collection_w.indexes]
|
index_names = milvus_client.list_indexes(collection_name=name)
|
||||||
log.info(f"index info: {index_infos}")
|
log.info(f"index names: {index_names}")
|
||||||
|
|
||||||
# load
|
# load
|
||||||
collection_w.load()
|
milvus_client.load_collection(collection_name=name)
|
||||||
|
|
||||||
# search
|
# search
|
||||||
search_vectors = cf.gen_vectors(1, dim)
|
search_vectors = cf.gen_vectors(1, dim)
|
||||||
dense_search_params = {"metric_type": "L2", "params": {"ef": 64}}
|
dense_search_params = {"metric_type": "L2", "params": {"ef": 64}}
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
res_1, _ = collection_w.search(data=search_vectors,
|
res_1 = milvus_client.search(
|
||||||
anns_field=float_vector_field_name,
|
collection_name=name,
|
||||||
param=dense_search_params, limit=1)
|
data=search_vectors,
|
||||||
|
anns_field=float_vector_field_name,
|
||||||
|
search_params=dense_search_params,
|
||||||
|
limit=1
|
||||||
|
)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"assert search: {tt}")
|
log.info(f"assert search: {tt}")
|
||||||
assert len(res_1) == 1
|
assert len(res_1) == 1
|
||||||
@ -109,9 +153,13 @@ class TestAllCollection(TestcaseBase):
|
|||||||
queries = [fake.text() for _ in range(1)]
|
queries = [fake.text() for _ in range(1)]
|
||||||
bm25_search_params = {"metric_type": "BM25", "params": {}}
|
bm25_search_params = {"metric_type": "BM25", "params": {}}
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
res_2, _ = collection_w.search(data=queries,
|
res_2 = milvus_client.search(
|
||||||
anns_field=bm25_vec_field_name_list[0],
|
collection_name=name,
|
||||||
param=bm25_search_params, limit=1)
|
data=queries,
|
||||||
|
anns_field=bm25_vec_field_name_list[0],
|
||||||
|
search_params=bm25_search_params,
|
||||||
|
limit=1
|
||||||
|
)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"assert full text search: {tt}")
|
log.info(f"assert full text search: {tt}")
|
||||||
assert len(res_2) == 1
|
assert len(res_2) == 1
|
||||||
@ -119,30 +167,37 @@ class TestAllCollection(TestcaseBase):
|
|||||||
# query
|
# query
|
||||||
term_expr = f'{int64_field_name} in {[i for i in range(offset, 0)]}'
|
term_expr = f'{int64_field_name} in {[i for i in range(offset, 0)]}'
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
res, _ = collection_w.query(term_expr)
|
res = milvus_client.query(
|
||||||
|
collection_name=name,
|
||||||
|
filter=term_expr,
|
||||||
|
limit=5
|
||||||
|
)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"assert query result {len(res)}: {tt}")
|
log.info(f"assert query result {len(res)}: {tt}")
|
||||||
assert len(res) >= len(data[0])
|
assert len(res) > 0
|
||||||
|
|
||||||
# text match
|
# text match
|
||||||
if len(text_match_fields) > 0:
|
if len(text_match_fields) > 0:
|
||||||
queries = [fake.text().replace("\n", " ") for _ in range(1)]
|
queries = [fake.text().replace("\n", " ") for _ in range(1)]
|
||||||
|
|
||||||
expr = f"text_match({text_match_fields[0]}, '{queries[0]}')"
|
expr = f"text_match({text_match_fields[0]}, '{queries[0]}')"
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
res, _ = collection_w.query(expr)
|
res = milvus_client.query(
|
||||||
|
collection_name=name,
|
||||||
|
filter=expr,
|
||||||
|
limit=5
|
||||||
|
)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"assert text match: {tt}")
|
log.info(f"assert text match: {tt}")
|
||||||
assert len(res) >= 0
|
assert len(res) >= 0
|
||||||
|
|
||||||
# insert data
|
# insert data
|
||||||
d = cf.gen_row_data_by_schema(nb=ct.default_nb, schema=schema)
|
d = cf.gen_row_data_by_schema(nb=ct.default_nb, schema=collection_info)
|
||||||
collection_w.insert(d)
|
milvus_client.insert(collection_name=name, data=d)
|
||||||
|
|
||||||
# release and load
|
# release and load
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
collection_w.release()
|
milvus_client.release_collection(collection_name=name)
|
||||||
collection_w.load()
|
milvus_client.load_collection(collection_name=name)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"release and load: {tt}")
|
log.info(f"release and load: {tt}")
|
||||||
|
|
||||||
@ -151,9 +206,13 @@ class TestAllCollection(TestcaseBase):
|
|||||||
topk = 5
|
topk = 5
|
||||||
search_vectors = cf.gen_vectors(nq, dim)
|
search_vectors = cf.gen_vectors(nq, dim)
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
res, _ = collection_w.search(data=search_vectors,
|
res = milvus_client.search(
|
||||||
anns_field=float_vector_field_name,
|
collection_name=name,
|
||||||
param=dense_search_params, limit=topk)
|
data=search_vectors,
|
||||||
|
anns_field=float_vector_field_name,
|
||||||
|
search_params=dense_search_params,
|
||||||
|
limit=topk
|
||||||
|
)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"assert search: {tt}")
|
log.info(f"assert search: {tt}")
|
||||||
assert len(res) == nq
|
assert len(res) == nq
|
||||||
@ -164,9 +223,13 @@ class TestAllCollection(TestcaseBase):
|
|||||||
queries = [fake.text() for _ in range(1)]
|
queries = [fake.text() for _ in range(1)]
|
||||||
bm25_search_params = {"metric_type": "BM25", "params": {}}
|
bm25_search_params = {"metric_type": "BM25", "params": {}}
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
res_2, _ = collection_w.search(data=queries,
|
res_2 = milvus_client.search(
|
||||||
anns_field=bm25_vec_field_name_list[0],
|
collection_name=name,
|
||||||
param=bm25_search_params, limit=1)
|
data=queries,
|
||||||
|
anns_field=bm25_vec_field_name_list[0],
|
||||||
|
search_params=bm25_search_params,
|
||||||
|
limit=1
|
||||||
|
)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"assert full text search: {tt}")
|
log.info(f"assert full text search: {tt}")
|
||||||
assert len(res_2) == 1
|
assert len(res_2) == 1
|
||||||
@ -174,7 +237,11 @@ class TestAllCollection(TestcaseBase):
|
|||||||
# query
|
# query
|
||||||
term_expr = f'{int64_field_name} > -3000'
|
term_expr = f'{int64_field_name} > -3000'
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
res, _ = collection_w.query(term_expr)
|
res = milvus_client.query(
|
||||||
|
collection_name=name,
|
||||||
|
filter=term_expr,
|
||||||
|
limit=5
|
||||||
|
)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"assert query result {len(res)}: {tt}")
|
log.info(f"assert query result {len(res)}: {tt}")
|
||||||
assert len(res) > 0
|
assert len(res) > 0
|
||||||
@ -184,7 +251,11 @@ class TestAllCollection(TestcaseBase):
|
|||||||
queries = [fake.text().replace("\n", " ") for _ in range(1)]
|
queries = [fake.text().replace("\n", " ") for _ in range(1)]
|
||||||
expr = f"text_match({text_match_fields[0]}, '{queries[0]}')"
|
expr = f"text_match({text_match_fields[0]}, '{queries[0]}')"
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
res, _ = collection_w.query(expr)
|
res = milvus_client.query(
|
||||||
|
collection_name=name,
|
||||||
|
filter=expr,
|
||||||
|
limit=5
|
||||||
|
)
|
||||||
tt = time.time() - t0
|
tt = time.time() - t0
|
||||||
log.info(f"assert text match: {tt}")
|
log.info(f"assert text match: {tt}")
|
||||||
assert len(res) >= 0
|
assert len(res) >= 0
|
||||||
|
|||||||
@ -35,6 +35,11 @@ def get_all_collections():
|
|||||||
with open("/tmp/ci_logs/chaos_test_all_collections.json", "r") as f:
|
with open("/tmp/ci_logs/chaos_test_all_collections.json", "r") as f:
|
||||||
data = json.load(f)
|
data = json.load(f)
|
||||||
all_collections = data["all"]
|
all_collections = data["all"]
|
||||||
|
log.info(f"all_collections: {all_collections}")
|
||||||
|
if all_collections == [] or all_collections == "":
|
||||||
|
return [None]
|
||||||
|
else:
|
||||||
|
return all_collections
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.warning(f"get_all_collections error: {e}")
|
log.warning(f"get_all_collections error: {e}")
|
||||||
return [None]
|
return [None]
|
||||||
@ -57,19 +62,32 @@ class TestBase:
|
|||||||
class TestOperations(TestBase):
|
class TestOperations(TestBase):
|
||||||
|
|
||||||
@pytest.fixture(scope="function", autouse=True)
|
@pytest.fixture(scope="function", autouse=True)
|
||||||
def connection(self, host, port, user, password, milvus_ns):
|
def connection(self, host, port, user, password, uri, token, milvus_ns):
|
||||||
if user and password:
|
# Prioritize uri and token for connection
|
||||||
# log.info(f"connect to {host}:{port} with user {user} and password {password}")
|
if uri:
|
||||||
connections.connect('default', host=host, port=port, user=user, password=password)
|
actual_uri = uri
|
||||||
else:
|
else:
|
||||||
connections.connect('default', host=host, port=port)
|
actual_uri = f"http://{host}:{port}"
|
||||||
|
|
||||||
|
if token:
|
||||||
|
actual_token = token
|
||||||
|
else:
|
||||||
|
actual_token = f"{user}:{password}" if user and password else None
|
||||||
|
|
||||||
|
if actual_token:
|
||||||
|
connections.connect('default', uri=actual_uri, token=actual_token)
|
||||||
|
else:
|
||||||
|
connections.connect('default', uri=actual_uri)
|
||||||
|
|
||||||
if connections.has_connection("default") is False:
|
if connections.has_connection("default") is False:
|
||||||
raise Exception("no connections")
|
raise Exception("no connections")
|
||||||
log.info("connect to milvus successfully")
|
log.info(f"connect to milvus successfully, uri: {actual_uri}")
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.user = user
|
self.user = user
|
||||||
self.password = password
|
self.password = password
|
||||||
|
self.uri = actual_uri
|
||||||
|
self.token = actual_token
|
||||||
self.milvus_sys = MilvusSys(alias='default')
|
self.milvus_sys = MilvusSys(alias='default')
|
||||||
self.milvus_ns = milvus_ns
|
self.milvus_ns = milvus_ns
|
||||||
self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
|
self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
|
||||||
@ -96,9 +114,11 @@ class TestOperations(TestBase):
|
|||||||
|
|
||||||
@pytest.fixture(scope="function", params=get_all_collections())
|
@pytest.fixture(scope="function", params=get_all_collections())
|
||||||
def collection_name(self, request):
|
def collection_name(self, request):
|
||||||
|
log.info(f"collection_name: {request.param}")
|
||||||
if request.param == [] or request.param == "":
|
if request.param == [] or request.param == "":
|
||||||
pytest.skip("The collection name is invalid")
|
yield None
|
||||||
yield request.param
|
else:
|
||||||
|
yield request.param
|
||||||
|
|
||||||
@pytest.mark.tags(CaseLabel.L3)
|
@pytest.mark.tags(CaseLabel.L3)
|
||||||
def test_operations(self, request_duration, is_check, collection_name):
|
def test_operations(self, request_duration, is_check, collection_name):
|
||||||
|
|||||||
@ -45,12 +45,23 @@ class TestBase:
|
|||||||
class TestOperations(TestBase):
|
class TestOperations(TestBase):
|
||||||
|
|
||||||
@pytest.fixture(scope="function", autouse=True)
|
@pytest.fixture(scope="function", autouse=True)
|
||||||
def connection(self, host, port, user, password, db_name, milvus_ns):
|
def connection(self, host, port, user, password, uri, token, db_name, milvus_ns):
|
||||||
if user and password:
|
# Prioritize uri and token for connection
|
||||||
log.info(f"connect to {host}:{port} with user {user} and password {password}")
|
if uri:
|
||||||
connections.connect('default', uri=f"{host}:{port}", token=f"{user}:{password}")
|
actual_uri = uri
|
||||||
else:
|
else:
|
||||||
connections.connect('default', host=host, port=port)
|
actual_uri = f"http://{host}:{port}"
|
||||||
|
|
||||||
|
if token:
|
||||||
|
actual_token = token
|
||||||
|
else:
|
||||||
|
actual_token = f"{user}:{password}" if user and password else None
|
||||||
|
|
||||||
|
if actual_token:
|
||||||
|
connections.connect('default', uri=actual_uri, token=actual_token)
|
||||||
|
else:
|
||||||
|
connections.connect('default', uri=actual_uri)
|
||||||
|
|
||||||
if connections.has_connection("default") is False:
|
if connections.has_connection("default") is False:
|
||||||
raise Exception("no connections")
|
raise Exception("no connections")
|
||||||
all_dbs = db.list_database()
|
all_dbs = db.list_database()
|
||||||
@ -58,11 +69,13 @@ class TestOperations(TestBase):
|
|||||||
if db_name not in all_dbs:
|
if db_name not in all_dbs:
|
||||||
db.create_database(db_name)
|
db.create_database(db_name)
|
||||||
db.using_database(db_name)
|
db.using_database(db_name)
|
||||||
log.info(f"connect to milvus {host}:{port}, db {db_name} successfully")
|
log.info(f"connect to milvus {actual_uri}, db {db_name} successfully")
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.user = user
|
self.user = user
|
||||||
self.password = password
|
self.password = password
|
||||||
|
self.uri = actual_uri
|
||||||
|
self.token = actual_token
|
||||||
self.milvus_ns = milvus_ns
|
self.milvus_ns = milvus_ns
|
||||||
|
|
||||||
def init_health_checkers(self, collection_name=None):
|
def init_health_checkers(self, collection_name=None):
|
||||||
|
|||||||
@ -55,15 +55,26 @@ class TestBase:
|
|||||||
class TestOperations(TestBase):
|
class TestOperations(TestBase):
|
||||||
|
|
||||||
@pytest.fixture(scope="function", autouse=True)
|
@pytest.fixture(scope="function", autouse=True)
|
||||||
def connection(self, host, port, user, password, milvus_ns, minio_host, enable_import, minio_bucket):
|
def connection(self, host, port, user, password, uri, token, milvus_ns, minio_host, enable_import, minio_bucket):
|
||||||
if user and password:
|
# Prioritize uri and token for connection
|
||||||
# log.info(f"connect to {host}:{port} with user {user} and password {password}")
|
if uri:
|
||||||
connections.connect('default', host=host, port=port, user=user, password=password)
|
actual_uri = uri
|
||||||
else:
|
else:
|
||||||
connections.connect('default', host=host, port=port)
|
actual_uri = f"http://{host}:{port}"
|
||||||
|
|
||||||
|
if token:
|
||||||
|
actual_token = token
|
||||||
|
else:
|
||||||
|
actual_token = f"{user}:{password}" if user and password else None
|
||||||
|
|
||||||
|
if actual_token:
|
||||||
|
connections.connect('default', uri=actual_uri, token=actual_token)
|
||||||
|
else:
|
||||||
|
connections.connect('default', uri=actual_uri)
|
||||||
|
|
||||||
if connections.has_connection("default") is False:
|
if connections.has_connection("default") is False:
|
||||||
raise Exception("no connections")
|
raise Exception("no connections")
|
||||||
log.info("connect to milvus successfully")
|
log.info(f"connect to milvus successfully, uri: {actual_uri}")
|
||||||
pymilvus_version = pymilvus.__version__
|
pymilvus_version = pymilvus.__version__
|
||||||
server_version = utility.get_server_version()
|
server_version = utility.get_server_version()
|
||||||
log.info(f"server version: {server_version}")
|
log.info(f"server version: {server_version}")
|
||||||
@ -72,6 +83,8 @@ class TestOperations(TestBase):
|
|||||||
self.port = port
|
self.port = port
|
||||||
self.user = user
|
self.user = user
|
||||||
self.password = password
|
self.password = password
|
||||||
|
self.uri = actual_uri
|
||||||
|
self.token = actual_token
|
||||||
self.milvus_sys = MilvusSys(alias='default')
|
self.milvus_sys = MilvusSys(alias='default')
|
||||||
self.milvus_ns = milvus_ns
|
self.milvus_ns = milvus_ns
|
||||||
self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
|
self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
|
||||||
|
|||||||
@ -38,19 +38,32 @@ class TestBase:
|
|||||||
class TestOperations(TestBase):
|
class TestOperations(TestBase):
|
||||||
|
|
||||||
@pytest.fixture(scope="function", autouse=True)
|
@pytest.fixture(scope="function", autouse=True)
|
||||||
def connection(self, host, port, user, password, milvus_ns):
|
def connection(self, host, port, user, password, uri, token, milvus_ns):
|
||||||
if user and password:
|
# Prioritize uri and token for connection
|
||||||
# log.info(f"connect to {host}:{port} with user {user} and password {password}")
|
if uri:
|
||||||
connections.connect('default', host=host, port=port, user=user, password=password)
|
actual_uri = uri
|
||||||
else:
|
else:
|
||||||
connections.connect('default', host=host, port=port)
|
actual_uri = f"http://{host}:{port}"
|
||||||
|
|
||||||
|
if token:
|
||||||
|
actual_token = token
|
||||||
|
else:
|
||||||
|
actual_token = f"{user}:{password}" if user and password else None
|
||||||
|
|
||||||
|
if actual_token:
|
||||||
|
connections.connect('default', uri=actual_uri, token=actual_token)
|
||||||
|
else:
|
||||||
|
connections.connect('default', uri=actual_uri)
|
||||||
|
|
||||||
if connections.has_connection("default") is False:
|
if connections.has_connection("default") is False:
|
||||||
raise Exception("no connections")
|
raise Exception("no connections")
|
||||||
log.info("connect to milvus successfully")
|
log.info(f"connect to milvus successfully, uri: {actual_uri}")
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.user = user
|
self.user = user
|
||||||
self.password = password
|
self.password = password
|
||||||
|
self.uri = actual_uri
|
||||||
|
self.token = actual_token
|
||||||
self.milvus_sys = MilvusSys(alias='default')
|
self.milvus_sys = MilvusSys(alias='default')
|
||||||
self.chaos_ns = constants.CHAOS_NAMESPACE
|
self.chaos_ns = constants.CHAOS_NAMESPACE
|
||||||
self.milvus_ns = milvus_ns
|
self.milvus_ns = milvus_ns
|
||||||
|
|||||||
@ -56,19 +56,32 @@ class TestOperations(TestBase):
|
|||||||
yield request.param
|
yield request.param
|
||||||
|
|
||||||
@pytest.fixture(scope="function", autouse=True)
|
@pytest.fixture(scope="function", autouse=True)
|
||||||
def connection(self, host, port, user, password):
|
def connection(self, host, port, user, password, uri, token):
|
||||||
if user and password:
|
# Prioritize uri and token for connection
|
||||||
# log.info(f"connect to {host}:{port} with user {user} and password {password}")
|
if uri:
|
||||||
connections.connect('default', host=host, port=port, user=user, password=password)
|
actual_uri = uri
|
||||||
else:
|
else:
|
||||||
connections.connect('default', host=host, port=port)
|
actual_uri = f"http://{host}:{port}"
|
||||||
|
|
||||||
|
if token:
|
||||||
|
actual_token = token
|
||||||
|
else:
|
||||||
|
actual_token = f"{user}:{password}" if user and password else None
|
||||||
|
|
||||||
|
if actual_token:
|
||||||
|
connections.connect('default', uri=actual_uri, token=actual_token)
|
||||||
|
else:
|
||||||
|
connections.connect('default', uri=actual_uri)
|
||||||
|
|
||||||
if connections.has_connection("default") is False:
|
if connections.has_connection("default") is False:
|
||||||
raise Exception("no connections")
|
raise Exception("no connections")
|
||||||
log.info("connect to milvus successfully")
|
log.info(f"connect to milvus successfully, uri: {actual_uri}")
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.user = user
|
self.user = user
|
||||||
self.password = password
|
self.password = password
|
||||||
|
self.uri = actual_uri
|
||||||
|
self.token = actual_token
|
||||||
|
|
||||||
def init_health_checkers(self, collection_name=None):
|
def init_health_checkers(self, collection_name=None):
|
||||||
c_name = collection_name
|
c_name = collection_name
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user