scale nb in test case (#3564)

* scale nb in test case

Signed-off-by: zw <zw@milvus.io>

* update cases

Signed-off-by: zw <zw@milvus.io>

* update cases

Signed-off-by: zw <zw@milvus.io>

* add shell script to generate test.yaml used to change default build_index_threshold

Signed-off-by: zw <zw@milvus.io>

* fix groovy script

Signed-off-by: zw <zw@milvus.io>

* fix groovy script

Signed-off-by: zw <zw@milvus.io>

* fix groovy write file

Signed-off-by: zw <zw@milvus.io>

* add restart cases

Signed-off-by: zw <zw@milvus.io>

* add restart cases

Signed-off-by: zw <zw@milvus.io>

* fix some bugs

Signed-off-by: zw <zw@milvus.io>

* disable limit partition case

Signed-off-by: zw <zw@milvus.io>

* add cases

Signed-off-by: zw <zw@milvus.io>

Co-authored-by: zw <zw@milvus.io>
del-zhenwu 2020-09-14 16:55:12 +08:00 committed by GitHub
parent c7a5358b56
commit daecef4b70
23 changed files with 435 additions and 253 deletions

View File

@ -11,7 +11,8 @@ timeout(time: 150, unit: 'MINUTES') {
retry(3) {
try {
dir ('charts/milvus') {
sh "helm install --wait --timeout 300s --set image.repository=registry.zilliz.com/milvus/engine --set persistence.enabled=true --set image.tag=${DOCKER_VERSION} --set image.pullPolicy=Always --set service.type=ClusterIP -f ci/db_backend/mysql_${BINARY_VERSION}_values.yaml -f ci/filebeat/values.yaml --namespace milvus ${env.HELM_RELEASE_NAME} ."
writeFile file: 'test.yaml', text: "extraConfiguration:\n engine:\n build_index_threshold: 1000"
sh "helm install --wait --timeout 300s --set image.repository=registry.zilliz.com/milvus/engine --set persistence.enabled=true --set image.tag=${DOCKER_VERSION} --set image.pullPolicy=Always --set service.type=ClusterIP -f ci/db_backend/mysql_${BINARY_VERSION}_values.yaml -f ci/filebeat/values.yaml -f test.yaml --namespace milvus ${env.HELM_RELEASE_NAME} ."
}
} catch (exc) {
def helmStatusCMD = "helm get manifest --namespace milvus ${env.HELM_RELEASE_NAME} | kubectl describe -n milvus -f - && \

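For reference, the writeFile step above generates a small Helm values overlay, and the extra `-f test.yaml` layers it on top of the base values files so the engine's build_index_threshold is lowered to 1000 for this CI run. A minimal Python sketch of the same flow, assuming helm is on PATH; the release name, chart path, and values file names here are placeholders, not the CI's actual values:

```python
import subprocess

# Same overlay content as the Groovy writeFile step above; indentation
# normalized to two spaces per level.
overlay = "extraConfiguration:\n  engine:\n    build_index_threshold: 1000\n"

with open("test.yaml", "w") as f:
    f.write(overlay)

# Hypothetical release/chart names; the trailing "-f test.yaml" is the part
# that overrides the chart's default build_index_threshold.
subprocess.check_call([
    "helm", "install", "--wait", "--timeout", "300s",
    "-f", "ci/db_backend/mysql_values.yaml",
    "-f", "test.yaml",
    "--namespace", "milvus",
    "my-release", ".",
])
```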
View File

@ -10,12 +10,9 @@ import sklearn.preprocessing
import pytest
from utils import *
nb = 6000
dim = 128
tag = "tag"
collection_id = "count_collection"
add_interval_time = 3
segment_row_count = 5000
default_fields = gen_default_fields()
default_binary_fields = gen_binary_default_fields()
entities = gen_entities(nb)
@ -32,8 +29,8 @@ class TestCollectionCount:
scope="function",
params=[
1,
4000,
6001
1000,
2001
],
)
def insert_count(self, request):
@ -187,8 +184,8 @@ class TestCollectionCountIP:
scope="function",
params=[
1,
4000,
6001
1000,
2001
],
)
def insert_count(self, request):
@ -230,8 +227,8 @@ class TestCollectionCountBinary:
scope="function",
params=[
1,
4000,
6001
1000,
2001
],
)
def insert_count(self, request):
@ -423,8 +420,8 @@ class TestCollectionMultiCollections:
scope="function",
params=[
1,
4000,
6001
1000,
2001
],
)
def insert_count(self, request):

View File

@ -6,7 +6,6 @@ from time import sleep
from multiprocessing import Process
from utils import *
dim = 128
default_segment_row_count = 100000
drop_collection_interval_time = 3
segment_row_count = 5000

View File

@ -6,14 +6,11 @@ from multiprocessing import Pool, Process
import pytest
from utils import *
dim = 128
segment_row_count = 5000
nprobe = 1
top_k = 1
epsilon = 0.0001
tag = "1970_01_01"
nb = 6000
nlist = 1024
nlist = 128
collection_id = "collection_stats"
field_name = "float_vector"
entity = gen_entities(1)
@ -143,14 +140,15 @@ class TestStatsBase:
method: add and delete entities, and compact collection, check count in collection info
expected: status ok, count as expected
'''
delete_length = 1000
ids = connect.insert(collection, entities)
status = connect.flush([collection])
delete_ids = ids[:3000]
delete_ids = ids[:delete_length]
connect.delete_entity_by_id(collection, delete_ids)
connect.flush([collection])
stats = connect.get_collection_stats(collection)
logging.getLogger().info(stats)
assert stats["row_count"] == nb - 3000
assert stats["row_count"] == nb - delete_length
compact_before = stats["partitions"][0]["segments"][0]["data_size"]
connect.compact(collection)
stats = connect.get_collection_stats(collection)

View File

@ -11,11 +11,9 @@ import pytest
from utils import *
nb = 1
dim = 128
collection_id = "create_collection"
default_segment_row_count = 512 * 1024
default_segment_row_count = 1024 * 512
drop_collection_interval_time = 3
segment_row_count = 5000
default_fields = gen_default_fields()
entities = gen_entities(nb)

View File

@ -88,6 +88,7 @@ class TestDropCollectionInvalid(object):
def get_collection_name(self, request):
yield request.param
@pytest.mark.level(2)
def test_drop_collection_with_invalid_collectionname(self, connect, get_collection_name):
collection_name = get_collection_name
with pytest.raises(Exception) as e:

View File

@ -7,10 +7,9 @@ import threading
from multiprocessing import Process
from utils import *
nb = 1000
collection_id = "info"
default_fields = gen_default_fields()
segment_row_count = 5000
segment_row_count = 1000
field_name = "float_vector"

View File

@ -7,7 +7,6 @@ from multiprocessing import Process
from utils import *
collection_id = "load_collection"
nb = 6000
default_fields = gen_default_fields()
entities = gen_entities(nb)
field_name = default_float_vec_field_name

View File

@ -71,6 +71,7 @@ def connect(request):
port = http_port
try:
milvus = get_milvus(host=ip, port=port, handler=handler)
# reset_build_index_threshold(milvus)
except Exception as e:
logging.getLogger().error(str(e))
pytest.exit("Milvus server can not connected, exit pytest ...")
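The commented-out reset_build_index_threshold call refers to the helper added to utils.py later in this diff; it is left disabled because the threshold is now set through the Helm overlay instead. If the client-side route were used, a sketch of the fixture hook could look like this (the set_config section, key, and value are taken from the utils.py helper shown further down):

```python
def reset_build_index_threshold(connect):
    # Lower the engine's build_index_threshold so that small test
    # collections still trigger index builds (same call as in utils.py).
    connect.set_config("engine", "build_index_threshold", 1024)

# In the connect fixture, right after a successful get_milvus():
#     milvus = get_milvus(host=ip, port=port, handler=handler)
#     reset_build_index_threshold(milvus)
```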

View File

@ -8,13 +8,9 @@ from multiprocessing import Pool, Process
import pytest
from utils import *
dim = 128
segment_row_count = 5000
collection_id = "test_delete"
DELETE_TIMEOUT = 60
tag = "1970_01_01"
nb = 6000
field_name = default_float_vec_field_name
entity = gen_entities(1)
raw_vector, binary_entity = gen_binary_entities(1)
@ -420,6 +416,7 @@ class TestDeleteInvalid(object):
with pytest.raises(Exception) as e:
status = connect.delete_entity_by_id(collection, [1, invalid_id])
@pytest.mark.level(2)
def test_delete_entity_with_invalid_collection_name(self, connect, get_collection_name):
collection_name = get_collection_name
with pytest.raises(Exception) as e:

View File

@ -9,12 +9,9 @@ from threading import current_thread
import pytest
from utils import *
dim = 128
segment_row_count = 5000
collection_id = "test_get"
DELETE_TIMEOUT = 60
tag = "1970_01_01"
nb = 6000
entity = gen_entities(1)
binary_entity = gen_binary_entities(1)
entities = gen_entities(nb)
@ -576,7 +573,7 @@ class TestGetBase:
future.result()
@pytest.mark.level(2)
def test_get_entity_by_id_insert_multi_threads(self, connect, collection):
def test_get_entity_by_id_insert_multi_threads_2(self, connect, collection):
'''
target: test.get_entity_by_id
method: thread do insert and get
@ -603,7 +600,7 @@ class TestGetBase:
executor.submit(get, group_ids, group_entities)
step = 100
vectors = gen_vectors(nb, dimension, False)
vectors = gen_vectors(nb, dim, False)
group_vectors = [vectors[i:i + step] for i in range(0, len(vectors), step)]
task = executor.submit(insert, group_vectors)
task.result()

View File

@ -7,13 +7,10 @@ import pytest
from milvus import DataType
from utils import *
dim = 128
segment_row_count = 5000
collection_id = "test_insert"
ADD_TIMEOUT = 60
tag = "1970_01_01"
insert_interval_time = 1.5
nb = 6000
field_name = default_float_vec_field_name
entity = gen_entities(1)
raw_vector, binary_entity = gen_binary_entities(1)

View File

@ -7,9 +7,6 @@ from multiprocessing import Pool, Process
import pytest
from utils import *
dim = 128
segment_row_count = 100000
nb = 6000
tag = "1970_01_01"
field_name = default_float_vec_field_name
binary_field_name = default_binary_vec_field_name
@ -229,7 +226,7 @@ class TestListIdInSegmentBase:
vector_ids = connect.list_id_in_segment(collection, seg_id)
# TODO:
segment_row_count = connect.get_collection_info(collection)["segment_row_count"]
assert vector_ids == ids[0:segment_row_count]
assert vector_ids[0:segment_row_count] == ids[0:segment_row_count]
class TestListIdInSegmentBinary:

View File

@ -10,13 +10,10 @@ import numpy as np
from milvus import DataType
from utils import *
dim = 128
segment_row_count = 5000
top_k_limit = 2048
collection_id = "search"
tag = "1970_01_01"
insert_interval_time = 1.5
nb = 6000
top_k = 10
nq = 1
nprobe = 1
@ -33,12 +30,12 @@ default_query, default_query_vecs = gen_query_vectors(field_name, entities, top_
default_binary_query, default_binary_query_vecs = gen_query_vectors(binary_field_name, binary_entities, top_k, nq)
def init_data(connect, collection, nb=6000, partition_tags=None, auto_id=True):
def init_data(connect, collection, nb=1200, partition_tags=None, auto_id=True):
'''
Generate entities and add them to the collection
'''
global entities
if nb == 6000:
if nb == 1200:
insert_entities = entities
else:
insert_entities = gen_entities(nb, is_normal=True)
@ -56,14 +53,14 @@ def init_data(connect, collection, nb=6000, partition_tags=None, auto_id=True):
return insert_entities, ids
def init_binary_data(connect, collection, nb=6000, insert=True, partition_tags=None):
def init_binary_data(connect, collection, nb=1200, insert=True, partition_tags=None):
'''
Generate entities and add them to the collection
'''
ids = []
global binary_entities
global raw_vectors
if nb == 6000:
if nb == 1200:
insert_entities = binary_entities
insert_raw_vectors = raw_vectors
else:
@ -141,7 +138,7 @@ class TestSearchBase:
@pytest.fixture(
scope="function",
params=[1, 10, 2049]
params=[1, 10]
)
def get_top_k(self, request):
yield request.param
@ -172,6 +169,25 @@ class TestSearchBase:
with pytest.raises(Exception) as e:
res = connect.search(collection, query)
def test_search_flat_top_k(self, connect, collection, get_nq):
'''
target: test basic search function, all the search params are correct, change top-k value
method: search with the given vectors and top_k = 2049, check the result
expected: exception raised when top_k exceeds top_k_limit, otherwise the result length equals top_k
'''
top_k = 2049
nq = get_nq
entities, ids = init_data(connect, collection)
query, vecs = gen_query_vectors(field_name, entities, top_k, nq)
if top_k <= top_k_limit:
res = connect.search(collection, query)
assert len(res[0]) == top_k
assert res[0]._distances[0] <= epsilon
assert check_id_result(res[0], ids[0])
else:
with pytest.raises(Exception) as e:
res = connect.search(collection, query)
def test_search_field(self, connect, collection, get_top_k, get_nq):
'''
target: test basic search function, all the search params are correct, change top-k value
@ -1572,6 +1588,8 @@ class TestSearchInvalid(object):
index_type = get_simple_index["index_type"]
if index_type in ["FLAT"]:
pytest.skip("skip in FLAT index")
if index_type != search_params["index_type"]:
pytest.skip("skip if index_type not matched")
entities, ids = init_data(connect, collection)
connect.create_index(collection, field_name, get_simple_index)
query, vecs = gen_query_vectors(field_name, entities, top_k, 1, search_params=search_params["search_params"])

View File

@ -3,17 +3,26 @@ import random
import pdb
import threading
import logging
import json
from multiprocessing import Pool, Process
import pytest
from milvus import IndexType, MetricType
from utils import *
dim = 128
index_file_size = 10
collection_id = "test_partition_restart"
nprobe = 1
collection_id = "wal"
TIMEOUT = 120
tag = "1970_01_01"
insert_interval_time = 1.5
big_nb = 100000
field_name = "float_vector"
entity = gen_entities(1)
binary_entity = gen_binary_entities(1)
entities = gen_entities(nb)
big_entities = gen_entities(big_nb)
raw_vectors, binary_entities = gen_binary_entities(nb)
default_fields = gen_default_fields()
default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"}
class TestRestartBase:
@ -23,112 +32,292 @@ class TestRestartBase:
The following cases are used to test server restart recovery
******************************************************************
"""
@pytest.fixture(scope="function", autouse=True)
def skip_check(self, connect, args):
@pytest.fixture(scope="module", autouse=True)
def skip_check(self, args):
logging.getLogger().info(args)
if "service_name" not in args or not args["service_name"]:
reason = "Skip if service name not provided"
logging.getLogger().info(reason)
pytest.skip(reason)
if args["service_name"].find("shards") != -1:
reason = "Skip restart cases in shards mode"
logging.getLogger().info(reason)
pytest.skip(reason)
@pytest.mark.level(2)
def _test_create_partition_insert_restart(self, connect, collection, args):
def test_insert_flush(self, connect, collection, args):
'''
target: return the same row count after server restart
method: call function: create partition, then insert, restart server and assert row count
expected: status ok, and row count keep the same
method: call function: create collection, then insert/flush, restart server and assert row count
expected: row count keeps the same
'''
status = connect.create_partition(collection, tag)
assert status.OK()
nq = 1000
vectors = gen_vectors(nq, dim)
ids = [i for i in range(nq)]
status, ids = connect.insert(collection, vectors, ids, partition_tag=tag)
assert status.OK()
status = connect.flush([collection])
assert status.OK()
status, res = connect.count_entities(collection)
logging.getLogger().info(res)
assert res == nq
# restart server
if restart_server(args["service_name"]):
logging.getLogger().info("Restart success")
else:
logging.getLogger().info("Restart failed")
# assert row count again
# debug
new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
status, res = new_connect.count_entities(collection)
logging.getLogger().info(status)
logging.getLogger().info(res)
assert status.OK()
assert res == nq
@pytest.mark.level(2)
def _test_during_creating_index_restart(self, connect, collection, args):
'''
target: return the same row count after server restart
method: call function: insert, flush, and create index, server do restart during creating index
expected: row count, vector-id, index info keep the same
'''
# reset auto_flush_interval
# auto_flush_interval = 100
get_ids_length = 500
timeout = 60
big_nb = 20000
index_param = {"nlist": 1024, "m": 16}
index_type = IndexType.IVF_PQ
# status, res_set = connect.set_config("db_config", "auto_flush_interval", auto_flush_interval)
# assert status.OK()
# status, res_get = connect.get_config("db_config", "auto_flush_interval")
# assert status.OK()
# assert res_get == str(auto_flush_interval)
# insert and create index
vectors = gen_vectors(big_nb, dim)
status, ids = connect.insert(collection, vectors, ids=[i for i in range(big_nb)])
status = connect.flush([collection])
assert status.OK()
status, res_count = connect.count_entities(collection)
ids = connect.insert(collection, entities)
connect.flush([collection])
ids = connect.insert(collection, entities)
connect.flush([collection])
res_count = connect.count_entities(collection)
logging.getLogger().info(res_count)
assert status.OK()
assert res_count == big_nb
logging.getLogger().info("Start create index async")
status = connect.create_index(collection, index_type, index_param, _async=True)
time.sleep(2)
assert res_count == 2 * nb
# restart server
logging.getLogger().info("Before restart server")
if restart_server(args["service_name"]):
logging.getLogger().info("Restart success")
else:
logging.getLogger().info("Restart failed")
# check row count, index_type, vertor-id after server restart
new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
status, res_count = new_connect.count_entities(collection)
assert status.OK()
assert res_count == big_nb
status, res_info = new_connect.get_index_info(collection)
logging.getLogger().info(res_info)
assert res_info._params == index_param
assert res_info._collection_name == collection
assert res_info._index_type == index_type
logging.getLogger().info("Start restart server")
assert restart_server(args["service_name"])
# assert row count again
new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
res_count = new_connect.count_entities(collection)
logging.getLogger().info(res_count)
assert res_count == 2 * nb
@pytest.mark.level(2)
def test_insert_during_flushing(self, connect, collection, args):
'''
target: flushing will recover
method: call function: create collection, then insert/flushing, restart server and assert row count
expected: row count eventually equals big_nb after the server recovers
'''
# disable_autoflush()
ids = connect.insert(collection, big_entities)
connect.flush([collection], _async=True)
res_count = connect.count_entities(collection)
logging.getLogger().info(res_count)
if res_count < big_nb:
# restart server
assert restart_server(args["service_name"])
# assert row count again
new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
res_count_2 = new_connect.count_entities(collection)
logging.getLogger().info(res_count_2)
timeout = 300
start_time = time.time()
while new_connect.count_entities(collection) != big_nb and (time.time() - start_time < timeout):
time.sleep(10)
logging.getLogger().info(new_connect.count_entities(collection))
res_count_3 = new_connect.count_entities(collection)
logging.getLogger().info(res_count_3)
assert res_count_3 == big_nb
@pytest.mark.level(2)
def test_delete_during_flushing(self, connect, collection, args):
'''
target: flushing will recover
method: call function: create collection, then delete/flushing, restart server and assert row count
expected: row count eventually equals (big_nb - delete_length)
'''
# disable_autoflush()
ids = connect.insert(collection, big_entities)
connect.flush([collection])
delete_length = 1000
delete_ids = ids[big_nb//4:big_nb//4+delete_length]
delete_res = connect.delete_entity_by_id(collection, delete_ids)
connect.flush([collection], _async=True)
res_count = connect.count_entities(collection)
logging.getLogger().info(res_count)
# restart server
assert restart_server(args["service_name"])
# assert row count again
new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
res_count_2 = new_connect.count_entities(collection)
logging.getLogger().info(res_count_2)
timeout = 100
start_time = time.time()
i = 1
while time.time() - start_time < timeout:
stauts, stats = new_connect.get_collection_stats(collection)
logging.getLogger().info(i)
logging.getLogger().info(stats["partitions"])
index_name = stats["partitions"][0]["segments"][0]["index_name"]
if index_name == "PQ":
break
time.sleep(4)
i += 1
if time.time() - start_time >= timeout:
logging.getLogger().info("Timeout")
assert False
get_ids = random.sample(ids, get_ids_length)
status, res = new_connect.get_entity_by_id(collection, get_ids)
while new_connect.count_entities(collection) != big_nb - delete_length and (time.time() - start_time < timeout):
time.sleep(10)
logging.getLogger().info(new_connect.count_entities(collection))
if new_connect.count_entities(collection) == big_nb - delete_length:
time.sleep(10)
res_count_3 = new_connect.count_entities(collection)
logging.getLogger().info(res_count_3)
assert res_count_3 == big_nb - delete_length
@pytest.mark.level(2)
def test_during_indexed(self, connect, collection, args):
'''
target: flushing will recover
method: call function: create collection, then indexed, restart server and assert row count
expected: row count equals big_nb, index info keeps the same
'''
# disable_autoflush()
ids = connect.insert(collection, big_entities)
connect.flush([collection])
connect.create_index(collection, field_name, default_index)
res_count = connect.count_entities(collection)
logging.getLogger().info(res_count)
stats = connect.get_collection_stats(collection)
# logging.getLogger().info(stats)
# pdb.set_trace()
# restart server
assert restart_server(args["service_name"])
# assert row count again
new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
assert new_connect.count_entities(collection) == big_nb
stats = connect.get_collection_stats(collection)
for file in stats["partitions"][0]["segments"][0]["files"]:
if file["field"] == field_name and file["name"] != "_raw":
assert file["data_size"] > 0
if file["index_type"] != default_index["index_type"]:
assert False
else:
assert True
@pytest.mark.level(2)
def test_during_indexing(self, connect, collection, args):
'''
target: flushing will recover
method: call function: create collection, then indexing, restart server and assert row count
expected: row count equals loop * big_nb, server continues to build the index after restart
'''
# disable_autoflush()
loop = 5
for i in range(loop):
ids = connect.insert(collection, big_entities)
connect.flush([collection])
connect.create_index(collection, field_name, default_index, _async=True)
res_count = connect.count_entities(collection)
logging.getLogger().info(res_count)
stats = connect.get_collection_stats(collection)
# logging.getLogger().info(stats)
# restart server
assert restart_server(args["service_name"])
# assert row count again
new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
res_count_2 = new_connect.count_entities(collection)
logging.getLogger().info(res_count_2)
assert res_count_2 == loop * big_nb
status = new_connect._cmd("status")
assert json.loads(status)["indexing"] == True
# timeout = 100
# start_time = time.time()
# while time.time() - start_time < timeout:
# time.sleep(5)
# assert new_connect.count_entities(collection) == loop * big_nb
# stats = connect.get_collection_stats(collection)
# assert stats["row_count"] == loop * big_nb
# for file in stats["partitions"][0]["segments"][0]["files"]:
# # logging.getLogger().info(file)
# if file["field"] == field_name and file["name"] != "_raw":
# assert file["data_size"] > 0
# if file["index_type"] != default_index["index_type"]:
# continue
# for file in stats["partitions"][0]["segments"][0]["files"]:
# if file["field"] == field_name and file["name"] != "_raw":
# assert file["data_size"] > 0
# if file["index_type"] != default_index["index_type"]:
# assert False
# else:
# assert True
@pytest.mark.level(2)
def test_delete_flush_during_compacting(self, connect, collection, args):
'''
target: verify the server works after a restart during compaction
method: call function: create collection, then delete/flush/compact, restart server and assert row count;
call `compact` again, compaction passes
expected: row count equals (big_nb - delete_length * loop)
'''
# disable_autoflush()
ids = connect.insert(collection, big_entities)
connect.flush([collection])
delete_length = 1000
loop = 10
for i in range(loop):
delete_ids = ids[i*loop:i*loop+delete_length]
delete_res = connect.delete_entity_by_id(collection, delete_ids)
connect.flush([collection])
connect.compact(collection, _async=True)
res_count = connect.count_entities(collection)
logging.getLogger().info(res_count)
assert res_count == big_nb - delete_length*loop
info = connect.get_collection_stats(collection)
size_old = info["partitions"][0]["segments"][0]["data_size"]
logging.getLogger().info(size_old)
# restart server
assert restart_server(args["service_name"])
# assert row count again
new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
res_count_2 = new_connect.count_entities(collection)
logging.getLogger().info(res_count_2)
assert res_count_2 == big_nb - delete_length*loop
info = connect.get_collection_stats(collection)
size_before = info["partitions"][0]["segments"][0]["data_size"]
status = connect.compact(collection)
assert status.OK()
for index, item_id in enumerate(get_ids):
assert_equal_vector(res[index], vectors[item_id])
info = connect.get_collection_stats(collection)
size_after = info["partitions"][0]["segments"][0]["data_size"]
assert size_before > size_after
@pytest.mark.level(2)
def test_insert_during_flushing_multi_collections(self, connect, args):
'''
target: flushing will recover
method: call function: create collections, then insert/flushing, restart server and assert row count
expected: row count of each collection eventually equals big_nb
'''
# disable_autoflush()
collection_num = 2
collection_list = []
for i in range(collection_num):
collection_name = gen_unique_str(collection_id)
collection_list.append(collection_name)
connect.create_collection(collection_name, default_fields)
ids = connect.insert(collection_name, big_entities)
connect.flush(collection_list, _async=True)
res_count = connect.count_entities(collection_list[-1])
logging.getLogger().info(res_count)
if res_count < big_nb:
# restart server
assert restart_server(args["service_name"])
# assert row count again
new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
res_count_2 = new_connect.count_entities(collection_list[-1])
logging.getLogger().info(res_count_2)
timeout = 300
start_time = time.time()
while time.time() - start_time < timeout:
count_list = []
break_flag = True
for index, name in enumerate(collection_list):
tmp_count = new_connect.count_entities(name)
count_list.append(tmp_count)
logging.getLogger().info(count_list)
if tmp_count != big_nb:
break_flag = False
break
if break_flag == True:
break
time.sleep(10)
for name in collection_list:
assert new_connect.count_entities(name) == big_nb
@pytest.mark.level(2)
def test_insert_during_flushing_multi_partitions(self, connect, collection, args):
'''
target: flushing will recover
method: call function: create collection/partition, then insert/flushing, restart server and assert row count
expected: row count eventually equals big_nb * 2
'''
# disable_autoflush()
partitions_num = 2
partitions = []
for i in range(partitions_num):
tag_tmp = gen_unique_str()
partitions.append(tag_tmp)
connect.create_partition(collection, tag_tmp)
ids = connect.insert(collection, big_entities, partition_tag=tag_tmp)
connect.flush([collection], _async=True)
res_count = connect.count_entities(collection)
logging.getLogger().info(res_count)
if res_count < big_nb:
# restart server
assert restart_server(args["service_name"])
# assert row count again
new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
res_count_2 = new_connect.count_entities(collection)
logging.getLogger().info(res_count_2)
timeout = 300
start_time = time.time()
while new_connect.count_entities(collection) != big_nb * 2 and (time.time() - start_time < timeout):
time.sleep(10)
logging.getLogger().info(new_connect.count_entities(collection))
res_count_3 = new_connect.count_entities(collection)
logging.getLogger().info(res_count_3)
assert res_count_3 == big_nb * 2
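Several of the restart cases above repeat the same poll-until-recovered loop after reconnecting. A small helper capturing that pattern, written against the same count_entities API the tests already use (the helper name and defaults are illustrative, not part of this change):

```python
import time
import logging

def wait_for_row_count(conn, collection, expected, timeout=300, interval=10):
    # Poll count_entities until the expected row count is reached or the
    # timeout expires; returns the last observed count.
    start_time = time.time()
    count = conn.count_entities(collection)
    while count != expected and time.time() - start_time < timeout:
        time.sleep(interval)
        count = conn.count_entities(collection)
        logging.getLogger().info(count)
    return count

# Usage after a restart, e.g. in test_insert_during_flushing:
#     new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"])
#     assert wait_for_row_count(new_connect, collection, big_nb) == big_nb
```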

View File

@ -6,15 +6,11 @@ from multiprocessing import Pool, Process
import pytest
from utils import *
dim = 128
index_file_size = 10
COMPACT_TIMEOUT = 180
nprobe = 1
top_k = 1
tag = "1970_01_01"
nb = 6000
nq = 2
segment_row_count = 5000
entity = gen_entities(1)
entities = gen_entities(nb)
raw_vector, binary_entity = gen_binary_entities(1)
@ -253,8 +249,7 @@ class TestCompactBase:
connect.flush([collection])
info = connect.get_collection_stats(collection)
logging.getLogger().info(info["partitions"])
delete_ids = ids[:3000]
delete_ids = ids[:nb//2]
status = connect.delete_entity_by_id(collection, delete_ids)
assert status.OK()
connect.flush([collection])
@ -296,7 +291,7 @@ class TestCompactBase:
# get collection info before compact
info = connect.get_collection_stats(collection)
size_before = info["partitions"][0]["segments"][0]["data_size"]
delete_ids = ids[:1500]
delete_ids = ids[:nb//2]
status = connect.delete_entity_by_id(collection, delete_ids)
assert status.OK()
connect.flush([collection])

View File

@ -8,14 +8,10 @@ import pytest
from utils import *
import ujson
dim = 128
index_file_size = 10
CONFIG_TIMEOUT = 80
nprobe = 1
top_k = 1
tag = "1970_01_01"
nb = 6000
class TestCacheConfig:

View File

@ -6,15 +6,11 @@ from multiprocessing import Pool, Process
import pytest
from utils import *
dim = 128
segment_row_count = 5000
index_file_size = 10
collection_id = "test_flush"
DELETE_TIMEOUT = 60
nprobe = 1
tag = "1970_01_01"
top_k = 1
nb = 6000
tag = "partition_tag"
field_name = "float_vector"
entity = gen_entities(1)
@ -307,9 +303,15 @@ class TestFlushAsync:
future = connect.flush([collection], _async=True)
status = future.result()
def test_flush_async_long_drop_collection(self, connect, collection):
# vectors = gen_vectors(nb, dim)
for i in range(5):
ids = connect.insert(collection, entities)
future = connect.flush([collection], _async=True)
logging.getLogger().info("DROP")
connect.drop_collection(collection)
def test_flush_async(self, connect, collection):
nb = 100000
vectors = gen_vectors(nb, dim)
connect.insert(collection, entities)
logging.getLogger().info("before")
future = connect.flush([collection], _async=True, _callback=self.check_status)

View File

@ -8,14 +8,11 @@ import pytest
import sklearn.preprocessing
from utils import *
nb = 6000
dim = 128
index_file_size = 10
BUILD_TIMEOUT = 300
nprobe = 1
top_k = 5
tag = "1970_01_01"
NLIST = 4046
NLIST = 128
INVALID_NLIST = 100000000
field_name = "float_vector"
binary_field_name = "binary_vector"
@ -26,7 +23,7 @@ entities = gen_entities(nb)
raw_vector, binary_entity = gen_binary_entities(1)
raw_vectors, binary_entities = gen_binary_entities(nb)
query, query_vecs = gen_query_vectors(field_name, entities, top_k, 1)
default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 1024}, "metric_type": "L2"}
default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"}
class TestIndexBase:
@ -46,7 +43,7 @@ class TestIndexBase:
params=[
1,
10,
1500
1111
],
)
def get_nq(self, request):
@ -526,7 +523,7 @@ class TestIndexBinary:
params=[
1,
10,
1500
1111
],
)
def get_nq(self, request):
@ -769,6 +766,19 @@ class TestIndexAsync:
# TODO:
logging.getLogger().info(res)
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_drop(self, connect, collection, get_simple_index):
'''
target: test create index interface
method: create collection and add entities in it, create index asynchronously, then drop the collection
expected: no error raised when the collection is dropped while the index is building
'''
ids = connect.insert(collection, entities)
logging.getLogger().info("start index")
future = connect.create_index(collection, field_name, get_simple_index, _async=True)
logging.getLogger().info("DROP")
connect.drop_collection(collection)
def test_create_index_with_invalid_collectionname(self, connect):
collection_name = " "
future = connect.create_index(collection_name, field_name, default_index, _async=True)

View File

@ -10,7 +10,6 @@ import sklearn.preprocessing
from milvus import IndexType, MetricType
from utils import *
dim = 128
index_file_size = 10
collection_id = "test_mix"
add_interval_time = 5

View File

@ -8,13 +8,10 @@ import pytest
from utils import *
dim = 128
segment_row_count = 5000
collection_id = "partition"
nprobe = 1
tag = "1970_01_01"
TIMEOUT = 120
nb = 6000
tag = "partition_tag"
field_name = "float_vector"
entity = gen_entities(1)
@ -39,14 +36,16 @@ class TestCreateBase:
'''
connect.create_partition(collection, tag)
# TODO: enable
@pytest.mark.level(2)
def test_create_partition_limit(self, connect, collection, args):
@pytest.mark.timeout(1200)
def _test_create_partition_limit(self, connect, collection, args):
'''
target: test create partitions, check status returned
method: call function: create_partition for 4097 times
expected: exception raised
'''
threads_num = 16
threads_num = 8
threads = []
if args["handler"] == "HTTP":
pytest.skip("skip in http mode")
@ -374,6 +373,7 @@ class TestNameInvalid(object):
def get_collection_name(self, request):
yield request.param
@pytest.mark.level(2)
def test_drop_partition_with_invalid_collection_name(self, connect, collection, get_collection_name):
'''
target: test drop partition, with invalid collection name, check status returned
@ -396,6 +396,7 @@ class TestNameInvalid(object):
with pytest.raises(Exception) as e:
connect.drop_partition(collection, tag_name)
@pytest.mark.level(2)
def test_list_partitions_with_invalid_collection_name(self, connect, collection, get_collection_name):
'''
target: test show partitions, with invalid collection name, check status returned

View File

@ -1,49 +0,0 @@
import time
import pdb
import threading
import logging
from multiprocessing import Pool, Process
import pytest
from utils import *
dim = 128
collection_id = "test_wal"
segment_row_count = 5000
WAL_TIMEOUT = 60
tag = "1970_01_01"
insert_interval_time = 1.5
nb = 6000
field_name = "float_vector"
entity = gen_entities(1)
binary_entity = gen_binary_entities(1)
entities = gen_entities(nb)
raw_vectors, binary_entities = gen_binary_entities(nb)
default_fields = gen_default_fields()
class TestWalBase:
"""
******************************************************************
The following cases are used to test WAL functionality
******************************************************************
"""
@pytest.mark.timeout(WAL_TIMEOUT)
def test_wal_server_crashed_recovery(self, connect, collection):
'''
target: test wal when server crashed unexpectedly and restarted
method: add vectors, server killed before flush, restarted server and flush
expected: status ok, add request is recovered and vectors added
'''
ids = connect.insert(collection, entity)
connect.flush([collection])
res = connect.count_entities(collection)
logging.getLogger().info(res) # should be 0 because no auto flush
logging.getLogger().info("Stop server and restart")
# kill server and restart. auto flush should be set to 15 seconds.
# time.sleep(15)
connect.flush([collection])
res = connect.count_entities(collection)
assert res == 1
res = connect.get_entity_by_id(collection, [ids[0]])
logging.getLogger().info(res)

View File

@ -15,12 +15,13 @@ port = 19530
epsilon = 0.000001
default_flush_interval = 1
big_flush_interval = 1000
dimension = 128
nb = 6000
dim = 128
nb = 1200
top_k = 10
segment_row_count = 5000
segment_row_count = 1000
default_float_vec_field_name = "float_vector"
default_binary_vec_field_name = "binary_vector"
namespace = "milvus"
# TODO:
all_index_types = [
@ -123,6 +124,10 @@ def get_milvus(host, port, uri=None, handler=None, **kwargs):
return milvus
def reset_build_index_threshold(connect):
connect.set_config("engine", "build_index_threshold", 1024)
def disable_flush(connect):
connect.set_config("storage", "auto_flush_interval", big_flush_interval)
@ -211,7 +216,7 @@ def gen_single_filter_fields():
def gen_single_vector_fields():
fields = []
for data_type in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]:
field = {"field": data_type.name, "type": data_type, "params": {"dim": dimension}}
field = {"field": data_type.name, "type": data_type, "params": {"dim": dim}}
fields.append(field)
return fields
@ -221,7 +226,7 @@ def gen_default_fields(auto_id=True):
"fields": [
{"field": "int64", "type": DataType.INT64},
{"field": "float", "type": DataType.FLOAT},
{"field": default_float_vec_field_name, "type": DataType.FLOAT_VECTOR, "params": {"dim": dimension}},
{"field": default_float_vec_field_name, "type": DataType.FLOAT_VECTOR, "params": {"dim": dim}},
],
"segment_row_count": segment_row_count,
"auto_id" : auto_id
@ -234,7 +239,7 @@ def gen_binary_default_fields(auto_id=True):
"fields": [
{"field": "int64", "type": DataType.INT64},
{"field": "float", "type": DataType.FLOAT},
{"field": default_binary_vec_field_name, "type": DataType.BINARY_VECTOR, "params": {"dim": dimension}}
{"field": default_binary_vec_field_name, "type": DataType.BINARY_VECTOR, "params": {"dim": dim}}
],
"segment_row_count": segment_row_count,
"auto_id" : auto_id
@ -243,7 +248,7 @@ def gen_binary_default_fields(auto_id=True):
def gen_entities(nb, is_normal=False):
vectors = gen_vectors(nb, dimension, is_normal)
vectors = gen_vectors(nb, dim, is_normal)
entities = [
{"field": "int64", "type": DataType.INT64, "values": [i for i in range(nb)]},
{"field": "float", "type": DataType.FLOAT, "values": [float(i) for i in range(nb)]},
@ -253,7 +258,7 @@ def gen_entities(nb, is_normal=False):
def gen_binary_entities(nb):
raw_vectors, vectors = gen_binary_vectors(nb, dimension)
raw_vectors, vectors = gen_binary_vectors(nb, dim)
entities = [
{"field": "int64", "type": DataType.INT64, "values": [i for i in range(nb)]},
{"field": "float", "type": DataType.FLOAT, "values": [float(i) for i in range(nb)]},
@ -262,7 +267,7 @@ def gen_binary_entities(nb):
return raw_vectors, entities
def gen_entities_by_fields(fields, nb, dimension):
def gen_entities_by_fields(fields, nb, dim):
entities = []
for field in fields:
if field["type"] in [DataType.INT32, DataType.INT64]:
@ -270,9 +275,9 @@ def gen_entities_by_fields(fields, nb, dimension):
elif field["type"] in [DataType.FLOAT, DataType.DOUBLE]:
field_value = [3.0 for i in range(nb)]
elif field["type"] == DataType.BINARY_VECTOR:
field_value = gen_binary_vectors(nb, dimension)[1]
field_value = gen_binary_vectors(nb, dim)[1]
elif field["type"] == DataType.FLOAT_VECTOR:
field_value = gen_vectors(nb, dimension)
field_value = gen_vectors(nb, dim)
field.update({"values": field_value})
entities.append(field)
return entities
@ -401,7 +406,7 @@ def add_field(entities, field_name=None):
def add_vector_field(entities, is_normal=False):
nb = len(entities[0]["values"])
vectors = gen_vectors(nb, dimension, is_normal)
vectors = gen_vectors(nb, dim, is_normal)
field = {
"field": gen_unique_str(),
"type": DataType.FLOAT_VECTOR,
@ -456,7 +461,7 @@ def update_field_value(entities, old_type, new_value):
return tmp_entities
def add_vector_field(nb, dimension=dimension):
def add_vector_field(nb, dimension=dim):
field_name = gen_unique_str()
field = {
"field": field_name,
@ -468,9 +473,8 @@ def add_vector_field(nb, dimension=dimension):
def gen_segment_row_counts():
sizes = [
4096,
8192,
1000000,
1024,
4096
]
return sizes
@ -779,7 +783,6 @@ def restart_server(helm_release_name):
from kubernetes import client, config
client.rest.logger.setLevel(logging.WARNING)
namespace = "milvus"
# service_name = "%s.%s.svc.cluster.local" % (helm_release_name, namespace)
config.load_kube_config()
v1 = client.CoreV1Api()
@ -793,7 +796,7 @@ def restart_server(helm_release_name):
break
# v1.patch_namespaced_config_map(config_map_name, namespace, body, pretty='true')
# status_res = v1.read_namespaced_service_status(helm_release_name, namespace, pretty='true')
# print(status_res)
logging.getLogger().debug("Pod name: %s" % pod_name)
if pod_name is not None:
try:
v1.delete_namespaced_pod(pod_name, namespace)
@ -802,24 +805,61 @@ def restart_server(helm_release_name):
logging.error("Exception when calling CoreV1Api->delete_namespaced_pod")
res = False
return res
time.sleep(5)
logging.error("Sleep 10s after pod deleted")
time.sleep(10)
# check if restart successfully
pods = v1.list_namespaced_pod(namespace)
for i in pods.items:
pod_name_tmp = i.metadata.name
if pod_name_tmp.find(helm_release_name) != -1:
logging.debug(pod_name_tmp)
logging.error(pod_name_tmp)
if pod_name_tmp == pod_name:
continue
elif pod_name_tmp.find(helm_release_name) == -1 or pod_name_tmp.find("mysql") != -1:
continue
else:
status_res = v1.read_namespaced_pod_status(pod_name_tmp, namespace, pretty='true')
logging.error(status_res.status.phase)
start_time = time.time()
while time.time() - start_time > timeout:
ready_break = False
while time.time() - start_time <= timeout:
logging.error(time.time())
status_res = v1.read_namespaced_pod_status(pod_name_tmp, namespace, pretty='true')
if status_res.status.phase == "Running":
logging.error("Already running")
ready_break = True
time.sleep(10)
break
time.sleep(1)
else:
time.sleep(1)
if time.time() - start_time > timeout:
logging.error("Restart pod: %s timeout" % pod_name_tmp)
res = False
return res
if ready_break:
break
else:
logging.error("Pod: %s not found" % helm_release_name)
res = False
raise Exception("Pod: %s not found" % pod_name)
follow = True
pretty = True
previous = True # bool | Return previous terminated container logs. Defaults to false. (optional)
since_seconds = 56 # int | A relative time in seconds before the current time from which to show logs. If this value precedes the time a pod was started, only logs since the pod start will be returned. If this value is in the future, no logs will be returned. Only one of sinceSeconds or sinceTime may be specified. (optional)
timestamps = True # bool | If true, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line of log output. Defaults to false. (optional)
container = "milvus"
# start_time = time.time()
# while time.time() - start_time <= timeout:
# try:
# api_response = v1.read_namespaced_pod_log(pod_name_tmp, namespace, container=container, follow=follow,
# pretty=pretty, previous=previous, since_seconds=since_seconds,
# timestamps=timestamps)
# logging.error(api_response)
# return res
# except Exception as e:
# logging.error("Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e)
# # waiting for server start
# time.sleep(5)
# # res = False
# # return res
# if time.time() - start_time > timeout:
# logging.error("Restart pod: %s timeout" % pod_name_tmp)
# res = False
return res
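To summarize the restart helper above: it deletes the pod belonging to the Helm release, waits, then polls the remaining pods of the release until one reports a Running phase (the commented-out log-reading block is kept for debugging). A condensed sketch of that flow using the same kubernetes client calls; as in the code above, readiness checking stops at phase == Running, and the timeout value is illustrative:

```python
import time
import logging
from kubernetes import client, config

def restart_pod(helm_release_name, namespace="milvus", timeout=300):
    # Delete the first pod that belongs to the release (skipping mysql),
    # then wait until a replacement pod reports the Running phase.
    config.load_kube_config()
    v1 = client.CoreV1Api()
    old_pod = None
    for pod in v1.list_namespaced_pod(namespace).items:
        name = pod.metadata.name
        if helm_release_name in name and "mysql" not in name:
            old_pod = name
            break
    if old_pod is None:
        raise Exception("Pod: %s not found" % helm_release_name)
    v1.delete_namespaced_pod(old_pod, namespace)
    start_time = time.time()
    while time.time() - start_time < timeout:
        for pod in v1.list_namespaced_pod(namespace).items:
            name = pod.metadata.name
            if name == old_pod or helm_release_name not in name or "mysql" in name:
                continue
            status = v1.read_namespaced_pod_status(name, namespace)
            if status.status.phase == "Running":
                logging.getLogger().debug("New pod %s is Running" % name)
                return True
        time.sleep(10)
    return False
```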