From daecef4b7044c8a57c3f748e11d1230af7657528 Mon Sep 17 00:00:00 2001 From: del-zhenwu <56623710+del-zhenwu@users.noreply.github.com> Date: Mon, 14 Sep 2020 16:55:12 +0800 Subject: [PATCH] scale nb in test case (#3564) * scale nb in test case Signed-off-by: zw * update cases Signed-off-by: zw * update cases Signed-off-by: zw * add shell script to generate test.yaml used to change default build_index_threshold Signed-off-by: zw * fix groovy script Signed-off-by: zw * fix groovy script Signed-off-by: zw * fix groovy write file Signed-off-by: zw * add restart cases Signed-off-by: zw * add restart cases Signed-off-by: zw * fix some bugs Signed-off-by: zw * disable limit partition case Signed-off-by: zw * add cases Signed-off-by: zw Co-authored-by: zw --- .../modules/DevTest/SingleNodeDevTest.groovy | 3 +- .../collection/test_collection_count.py | 19 +- .../collection/test_collection_logic.py | 1 - .../collection/test_collection_stats.py | 10 +- .../collection/test_create_collection.py | 4 +- .../collection/test_drop_collection.py | 1 + .../collection/test_get_collection_info.py | 3 +- .../collection/test_load_collection.py | 1 - tests/milvus_python_test/conftest.py | 1 + .../milvus_python_test/entity/test_delete.py | 5 +- .../entity/test_get_entity_by_id.py | 7 +- .../milvus_python_test/entity/test_insert.py | 3 - .../entity/test_list_id_in_segment.py | 5 +- .../milvus_python_test/entity/test_search.py | 34 +- .../stability/test_restart.py | 389 +++++++++++++----- tests/milvus_python_test/test_compact.py | 9 +- tests/milvus_python_test/test_config.py | 4 - tests/milvus_python_test/test_flush.py | 14 +- tests/milvus_python_test/test_index.py | 24 +- tests/milvus_python_test/test_mix.py | 1 - tests/milvus_python_test/test_partition.py | 11 +- tests/milvus_python_test/test_wal.py | 49 --- tests/milvus_python_test/utils.py | 90 ++-- 23 files changed, 435 insertions(+), 253 deletions(-) delete mode 100644 tests/milvus_python_test/test_wal.py diff --git a/.jenkins/modules/DevTest/SingleNodeDevTest.groovy b/.jenkins/modules/DevTest/SingleNodeDevTest.groovy index 37c86bc1ab..b54fb9b489 100644 --- a/.jenkins/modules/DevTest/SingleNodeDevTest.groovy +++ b/.jenkins/modules/DevTest/SingleNodeDevTest.groovy @@ -11,7 +11,8 @@ timeout(time: 150, unit: 'MINUTES') { retry(3) { try { dir ('charts/milvus') { - sh "helm install --wait --timeout 300s --set image.repository=registry.zilliz.com/milvus/engine --set persistence.enabled=true --set image.tag=${DOCKER_VERSION} --set image.pullPolicy=Always --set service.type=ClusterIP -f ci/db_backend/mysql_${BINARY_VERSION}_values.yaml -f ci/filebeat/values.yaml --namespace milvus ${env.HELM_RELEASE_NAME} ." + writeFile file: 'test.yaml', text: "extraConfiguration:\n engine:\n build_index_threshold: 1000" + sh "helm install --wait --timeout 300s --set image.repository=registry.zilliz.com/milvus/engine --set persistence.enabled=true --set image.tag=${DOCKER_VERSION} --set image.pullPolicy=Always --set service.type=ClusterIP -f ci/db_backend/mysql_${BINARY_VERSION}_values.yaml -f ci/filebeat/values.yaml -f test.yaml --namespace milvus ${env.HELM_RELEASE_NAME} ." } } catch (exc) { def helmStatusCMD = "helm get manifest --namespace milvus ${env.HELM_RELEASE_NAME} | kubectl describe -n milvus -f - && \ diff --git a/tests/milvus_python_test/collection/test_collection_count.py b/tests/milvus_python_test/collection/test_collection_count.py index 706a1ee48d..f65cfafb6b 100644 --- a/tests/milvus_python_test/collection/test_collection_count.py +++ b/tests/milvus_python_test/collection/test_collection_count.py @@ -10,12 +10,9 @@ import sklearn.preprocessing import pytest from utils import * -nb = 6000 -dim = 128 tag = "tag" collection_id = "count_collection" add_interval_time = 3 -segment_row_count = 5000 default_fields = gen_default_fields() default_binary_fields = gen_binary_default_fields() entities = gen_entities(nb) @@ -32,8 +29,8 @@ class TestCollectionCount: scope="function", params=[ 1, - 4000, - 6001 + 1000, + 2001 ], ) def insert_count(self, request): @@ -187,8 +184,8 @@ class TestCollectionCountIP: scope="function", params=[ 1, - 4000, - 6001 + 1000, + 2001 ], ) def insert_count(self, request): @@ -230,8 +227,8 @@ class TestCollectionCountBinary: scope="function", params=[ 1, - 4000, - 6001 + 1000, + 2001 ], ) def insert_count(self, request): @@ -423,8 +420,8 @@ class TestCollectionMultiCollections: scope="function", params=[ 1, - 4000, - 6001 + 1000, + 2001 ], ) def insert_count(self, request): diff --git a/tests/milvus_python_test/collection/test_collection_logic.py b/tests/milvus_python_test/collection/test_collection_logic.py index 52c4d996f4..2ce2137676 100644 --- a/tests/milvus_python_test/collection/test_collection_logic.py +++ b/tests/milvus_python_test/collection/test_collection_logic.py @@ -6,7 +6,6 @@ from time import sleep from multiprocessing import Process from utils import * -dim = 128 default_segment_row_count = 100000 drop_collection_interval_time = 3 segment_row_count = 5000 diff --git a/tests/milvus_python_test/collection/test_collection_stats.py b/tests/milvus_python_test/collection/test_collection_stats.py index b6e14d56bf..d59b30e4cb 100644 --- a/tests/milvus_python_test/collection/test_collection_stats.py +++ b/tests/milvus_python_test/collection/test_collection_stats.py @@ -6,14 +6,11 @@ from multiprocessing import Pool, Process import pytest from utils import * -dim = 128 -segment_row_count = 5000 nprobe = 1 top_k = 1 epsilon = 0.0001 tag = "1970_01_01" -nb = 6000 -nlist = 1024 +nlist = 128 collection_id = "collection_stats" field_name = "float_vector" entity = gen_entities(1) @@ -143,14 +140,15 @@ class TestStatsBase: method: add and delete entities, and compact collection, check count in collection info expected: status ok, count as expected ''' + delete_length = 1000 ids = connect.insert(collection, entities) status = connect.flush([collection]) - delete_ids = ids[:3000] + delete_ids = ids[:delete_length] connect.delete_entity_by_id(collection, delete_ids) connect.flush([collection]) stats = connect.get_collection_stats(collection) logging.getLogger().info(stats) - assert stats["row_count"] == nb - 3000 + assert stats["row_count"] == nb - delete_length compact_before = stats["partitions"][0]["segments"][0]["data_size"] connect.compact(collection) stats = connect.get_collection_stats(collection) diff --git a/tests/milvus_python_test/collection/test_create_collection.py b/tests/milvus_python_test/collection/test_create_collection.py index a8f5e5221f..8c207cefec 100644 --- a/tests/milvus_python_test/collection/test_create_collection.py +++ b/tests/milvus_python_test/collection/test_create_collection.py @@ -11,11 +11,9 @@ import pytest from utils import * nb = 1 -dim = 128 collection_id = "create_collection" -default_segment_row_count = 512 * 1024 +default_segment_row_count = 1024 * 512 drop_collection_interval_time = 3 -segment_row_count = 5000 default_fields = gen_default_fields() entities = gen_entities(nb) diff --git a/tests/milvus_python_test/collection/test_drop_collection.py b/tests/milvus_python_test/collection/test_drop_collection.py index 4cb2ffb172..cfc9bc5f4d 100644 --- a/tests/milvus_python_test/collection/test_drop_collection.py +++ b/tests/milvus_python_test/collection/test_drop_collection.py @@ -88,6 +88,7 @@ class TestDropCollectionInvalid(object): def get_collection_name(self, request): yield request.param + @pytest.mark.level(2) def test_drop_collection_with_invalid_collectionname(self, connect, get_collection_name): collection_name = get_collection_name with pytest.raises(Exception) as e: diff --git a/tests/milvus_python_test/collection/test_get_collection_info.py b/tests/milvus_python_test/collection/test_get_collection_info.py index 0b987ad742..6f77631814 100644 --- a/tests/milvus_python_test/collection/test_get_collection_info.py +++ b/tests/milvus_python_test/collection/test_get_collection_info.py @@ -7,10 +7,9 @@ import threading from multiprocessing import Process from utils import * -nb = 1000 collection_id = "info" default_fields = gen_default_fields() -segment_row_count = 5000 +segment_row_count = 1000 field_name = "float_vector" diff --git a/tests/milvus_python_test/collection/test_load_collection.py b/tests/milvus_python_test/collection/test_load_collection.py index 18317bc198..dfc3c1f82d 100644 --- a/tests/milvus_python_test/collection/test_load_collection.py +++ b/tests/milvus_python_test/collection/test_load_collection.py @@ -7,7 +7,6 @@ from multiprocessing import Process from utils import * collection_id = "load_collection" -nb = 6000 default_fields = gen_default_fields() entities = gen_entities(nb) field_name = default_float_vec_field_name diff --git a/tests/milvus_python_test/conftest.py b/tests/milvus_python_test/conftest.py index ea0b7be168..d9021c6ef6 100644 --- a/tests/milvus_python_test/conftest.py +++ b/tests/milvus_python_test/conftest.py @@ -71,6 +71,7 @@ def connect(request): port = http_port try: milvus = get_milvus(host=ip, port=port, handler=handler) + # reset_build_index_threshold(milvus) except Exception as e: logging.getLogger().error(str(e)) pytest.exit("Milvus server can not connected, exit pytest ...") diff --git a/tests/milvus_python_test/entity/test_delete.py b/tests/milvus_python_test/entity/test_delete.py index 13bb9b5faa..a3017bf9c9 100644 --- a/tests/milvus_python_test/entity/test_delete.py +++ b/tests/milvus_python_test/entity/test_delete.py @@ -8,13 +8,9 @@ from multiprocessing import Pool, Process import pytest from utils import * - -dim = 128 -segment_row_count = 5000 collection_id = "test_delete" DELETE_TIMEOUT = 60 tag = "1970_01_01" -nb = 6000 field_name = default_float_vec_field_name entity = gen_entities(1) raw_vector, binary_entity = gen_binary_entities(1) @@ -420,6 +416,7 @@ class TestDeleteInvalid(object): with pytest.raises(Exception) as e: status = connect.delete_entity_by_id(collection, [1, invalid_id]) + @pytest.mark.level(2) def test_delete_entity_with_invalid_collection_name(self, connect, get_collection_name): collection_name = get_collection_name with pytest.raises(Exception) as e: diff --git a/tests/milvus_python_test/entity/test_get_entity_by_id.py b/tests/milvus_python_test/entity/test_get_entity_by_id.py index 2d2d0c4166..04954ec848 100644 --- a/tests/milvus_python_test/entity/test_get_entity_by_id.py +++ b/tests/milvus_python_test/entity/test_get_entity_by_id.py @@ -9,12 +9,9 @@ from threading import current_thread import pytest from utils import * -dim = 128 -segment_row_count = 5000 collection_id = "test_get" DELETE_TIMEOUT = 60 tag = "1970_01_01" -nb = 6000 entity = gen_entities(1) binary_entity = gen_binary_entities(1) entities = gen_entities(nb) @@ -576,7 +573,7 @@ class TestGetBase: future.result() @pytest.mark.level(2) - def test_get_entity_by_id_insert_multi_threads(self, connect, collection): + def test_get_entity_by_id_insert_multi_threads_2(self, connect, collection): ''' target: test.get_entity_by_id method: thread do insert and get @@ -603,7 +600,7 @@ class TestGetBase: executor.submit(get, group_ids, group_entities) step = 100 - vectors = gen_vectors(nb, dimension, False) + vectors = gen_vectors(nb, dim, False) group_vectors = [vectors[i:i + step] for i in range(0, len(vectors), step)] task = executor.submit(insert, group_vectors) task.result() diff --git a/tests/milvus_python_test/entity/test_insert.py b/tests/milvus_python_test/entity/test_insert.py index 5756ffcff7..f3a4463ea4 100644 --- a/tests/milvus_python_test/entity/test_insert.py +++ b/tests/milvus_python_test/entity/test_insert.py @@ -7,13 +7,10 @@ import pytest from milvus import DataType from utils import * -dim = 128 -segment_row_count = 5000 collection_id = "test_insert" ADD_TIMEOUT = 60 tag = "1970_01_01" insert_interval_time = 1.5 -nb = 6000 field_name = default_float_vec_field_name entity = gen_entities(1) raw_vector, binary_entity = gen_binary_entities(1) diff --git a/tests/milvus_python_test/entity/test_list_id_in_segment.py b/tests/milvus_python_test/entity/test_list_id_in_segment.py index b6db5383b6..fa1e0bd55a 100644 --- a/tests/milvus_python_test/entity/test_list_id_in_segment.py +++ b/tests/milvus_python_test/entity/test_list_id_in_segment.py @@ -7,9 +7,6 @@ from multiprocessing import Pool, Process import pytest from utils import * -dim = 128 -segment_row_count = 100000 -nb = 6000 tag = "1970_01_01" field_name = default_float_vec_field_name binary_field_name = default_binary_vec_field_name @@ -229,7 +226,7 @@ class TestListIdInSegmentBase: vector_ids = connect.list_id_in_segment(collection, seg_id) # TODO: segment_row_count = connect.get_collection_info(collection)["segment_row_count"] - assert vector_ids == ids[0:segment_row_count] + assert vector_ids[0:segment_row_count] == ids[0:segment_row_count] class TestListIdInSegmentBinary: diff --git a/tests/milvus_python_test/entity/test_search.py b/tests/milvus_python_test/entity/test_search.py index 1a5db55248..3513f44de5 100644 --- a/tests/milvus_python_test/entity/test_search.py +++ b/tests/milvus_python_test/entity/test_search.py @@ -10,13 +10,10 @@ import numpy as np from milvus import DataType from utils import * -dim = 128 -segment_row_count = 5000 top_k_limit = 2048 collection_id = "search" tag = "1970_01_01" insert_interval_time = 1.5 -nb = 6000 top_k = 10 nq = 1 nprobe = 1 @@ -33,12 +30,12 @@ default_query, default_query_vecs = gen_query_vectors(field_name, entities, top_ default_binary_query, default_binary_query_vecs = gen_query_vectors(binary_field_name, binary_entities, top_k, nq) -def init_data(connect, collection, nb=6000, partition_tags=None, auto_id=True): +def init_data(connect, collection, nb=1200, partition_tags=None, auto_id=True): ''' Generate entities and add it in collection ''' global entities - if nb == 6000: + if nb == 1200: insert_entities = entities else: insert_entities = gen_entities(nb, is_normal=True) @@ -56,14 +53,14 @@ def init_data(connect, collection, nb=6000, partition_tags=None, auto_id=True): return insert_entities, ids -def init_binary_data(connect, collection, nb=6000, insert=True, partition_tags=None): +def init_binary_data(connect, collection, nb=1200, insert=True, partition_tags=None): ''' Generate entities and add it in collection ''' ids = [] global binary_entities global raw_vectors - if nb == 6000: + if nb == 1200: insert_entities = binary_entities insert_raw_vectors = raw_vectors else: @@ -141,7 +138,7 @@ class TestSearchBase: @pytest.fixture( scope="function", - params=[1, 10, 2049] + params=[1, 10] ) def get_top_k(self, request): yield request.param @@ -172,6 +169,25 @@ class TestSearchBase: with pytest.raises(Exception) as e: res = connect.search(collection, query) + def test_search_flat_top_k(self, connect, collection, get_nq): + ''' + target: test basic search fuction, all the search params is corrent, change top-k value + method: search with the given vectors, check the result + expected: the length of the result is top_k + ''' + top_k = 2049 + nq = get_nq + entities, ids = init_data(connect, collection) + query, vecs = gen_query_vectors(field_name, entities, top_k, nq) + if top_k <= top_k_limit: + res = connect.search(collection, query) + assert len(res[0]) == top_k + assert res[0]._distances[0] <= epsilon + assert check_id_result(res[0], ids[0]) + else: + with pytest.raises(Exception) as e: + res = connect.search(collection, query) + def test_search_field(self, connect, collection, get_top_k, get_nq): ''' target: test basic search fuction, all the search params is corrent, change top-k value @@ -1572,6 +1588,8 @@ class TestSearchInvalid(object): index_type = get_simple_index["index_type"] if index_type in ["FLAT"]: pytest.skip("skip in FLAT index") + if index_type != search_params["index_type"]: + pytest.skip("skip if index_type not matched") entities, ids = init_data(connect, collection) connect.create_index(collection, field_name, get_simple_index) query, vecs = gen_query_vectors(field_name, entities, top_k, 1, search_params=search_params["search_params"]) diff --git a/tests/milvus_python_test/stability/test_restart.py b/tests/milvus_python_test/stability/test_restart.py index 9d274a0f6a..9a71cdbd4f 100644 --- a/tests/milvus_python_test/stability/test_restart.py +++ b/tests/milvus_python_test/stability/test_restart.py @@ -3,17 +3,26 @@ import random import pdb import threading import logging +import json from multiprocessing import Pool, Process import pytest from milvus import IndexType, MetricType from utils import * -dim = 128 -index_file_size = 10 -collection_id = "test_partition_restart" -nprobe = 1 +collection_id = "wal" +TIMEOUT = 120 tag = "1970_01_01" +insert_interval_time = 1.5 +big_nb = 100000 +field_name = "float_vector" +entity = gen_entities(1) +binary_entity = gen_binary_entities(1) +entities = gen_entities(nb) +big_entities = gen_entities(big_nb) +raw_vectors, binary_entities = gen_binary_entities(nb) +default_fields = gen_default_fields() +default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"} class TestRestartBase: @@ -23,112 +32,292 @@ class TestRestartBase: The following cases are used to test `create_partition` function ****************************************************************** """ - @pytest.fixture(scope="function", autouse=True) - def skip_check(self, connect, args): + @pytest.fixture(scope="module", autouse=True) + def skip_check(self, args): + logging.getLogger().info(args) + if "service_name" not in args or not args["service_name"]: + reason = "Skip if service name not provided" + logging.getLogger().info(reason) + pytest.skip(reason) if args["service_name"].find("shards") != -1: reason = "Skip restart cases in shards mode" logging.getLogger().info(reason) pytest.skip(reason) - @pytest.mark.level(2) - def _test_create_partition_insert_restart(self, connect, collection, args): + def test_insert_flush(self, connect, collection, args): ''' target: return the same row count after server restart - method: call function: create partition, then insert, restart server and assert row count - expected: status ok, and row count keep the same + method: call function: create collection, then insert/flush, restart server and assert row count + expected: row count keep the same ''' - status = connect.create_partition(collection, tag) - assert status.OK() - nq = 1000 - vectors = gen_vectors(nq, dim) - ids = [i for i in range(nq)] - status, ids = connect.insert(collection, vectors, ids, partition_tag=tag) - assert status.OK() - status = connect.flush([collection]) - assert status.OK() - status, res = connect.count_entities(collection) - logging.getLogger().info(res) - assert res == nq - - # restart server - if restart_server(args["service_name"]): - logging.getLogger().info("Restart success") - else: - logging.getLogger().info("Restart failed") - # assert row count again - - # debug - new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"]) - status, res = new_connect.count_entities(collection) - logging.getLogger().info(status) - logging.getLogger().info(res) - assert status.OK() - assert res == nq - - @pytest.mark.level(2) - def _test_during_creating_index_restart(self, connect, collection, args): - ''' - target: return the same row count after server restart - method: call function: insert, flush, and create index, server do restart during creating index - expected: row count, vector-id, index info keep the same - ''' - # reset auto_flush_interval - # auto_flush_interval = 100 - get_ids_length = 500 - timeout = 60 - big_nb = 20000 - index_param = {"nlist": 1024, "m": 16} - index_type = IndexType.IVF_PQ - # status, res_set = connect.set_config("db_config", "auto_flush_interval", auto_flush_interval) - # assert status.OK() - # status, res_get = connect.get_config("db_config", "auto_flush_interval") - # assert status.OK() - # assert res_get == str(auto_flush_interval) - # insert and create index - vectors = gen_vectors(big_nb, dim) - status, ids = connect.insert(collection, vectors, ids=[i for i in range(big_nb)]) - status = connect.flush([collection]) - assert status.OK() - status, res_count = connect.count_entities(collection) + ids = connect.insert(collection, entities) + connect.flush([collection]) + ids = connect.insert(collection, entities) + connect.flush([collection]) + res_count = connect.count_entities(collection) logging.getLogger().info(res_count) - assert status.OK() - assert res_count == big_nb - logging.getLogger().info("Start create index async") - status = connect.create_index(collection, index_type, index_param, _async=True) - time.sleep(2) + assert res_count == 2 * nb # restart server - logging.getLogger().info("Before restart server") - if restart_server(args["service_name"]): - logging.getLogger().info("Restart success") - else: - logging.getLogger().info("Restart failed") - # check row count, index_type, vertor-id after server restart - new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"]) - status, res_count = new_connect.count_entities(collection) - assert status.OK() - assert res_count == big_nb - status, res_info = new_connect.get_index_info(collection) - logging.getLogger().info(res_info) - assert res_info._params == index_param - assert res_info._collection_name == collection - assert res_info._index_type == index_type + logging.getLogger().info("Start restart server") + assert restart_server(args["service_name"]) + # assert row count again + new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"]) + res_count = new_connect.count_entities(collection) + logging.getLogger().info(res_count) + assert res_count == 2 * nb + + @pytest.mark.level(2) + def test_insert_during_flushing(self, connect, collection, args): + ''' + target: flushing will recover + method: call function: create collection, then insert/flushing, restart server and assert row count + expected: row count equals 0 + ''' + # disable_autoflush() + ids = connect.insert(collection, big_entities) + connect.flush([collection], _async=True) + res_count = connect.count_entities(collection) + logging.getLogger().info(res_count) + if res_count < big_nb: + # restart server + assert restart_server(args["service_name"]) + # assert row count again + new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"]) + res_count_2 = new_connect.count_entities(collection) + logging.getLogger().info(res_count_2) + timeout = 300 + start_time = time.time() + while new_connect.count_entities(collection) != big_nb and (time.time() - start_time < timeout): + time.sleep(10) + logging.getLogger().info(new_connect.count_entities(collection)) + res_count_3 = new_connect.count_entities(collection) + logging.getLogger().info(res_count_3) + assert res_count_3 == big_nb + + @pytest.mark.level(2) + def test_delete_during_flushing(self, connect, collection, args): + ''' + target: flushing will recover + method: call function: create collection, then delete/flushing, restart server and assert row count + expected: row count equals (nb - delete_length) + ''' + # disable_autoflush() + ids = connect.insert(collection, big_entities) + connect.flush([collection]) + delete_length = 1000 + delete_ids = ids[big_nb//4:big_nb//4+delete_length] + delete_res = connect.delete_entity_by_id(collection, delete_ids) + connect.flush([collection], _async=True) + res_count = connect.count_entities(collection) + logging.getLogger().info(res_count) + # restart server + assert restart_server(args["service_name"]) + # assert row count again + new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"]) + res_count_2 = new_connect.count_entities(collection) + logging.getLogger().info(res_count_2) + timeout = 100 start_time = time.time() - i = 1 - while time.time() - start_time < timeout: - stauts, stats = new_connect.get_collection_stats(collection) - logging.getLogger().info(i) - logging.getLogger().info(stats["partitions"]) - index_name = stats["partitions"][0]["segments"][0]["index_name"] - if index_name == "PQ": - break - time.sleep(4) - i += 1 - if time.time() - start_time >= timeout: - logging.getLogger().info("Timeout") - assert False - get_ids = random.sample(ids, get_ids_length) - status, res = new_connect.get_entity_by_id(collection, get_ids) + while new_connect.count_entities(collection) != big_nb - delete_length and (time.time() - start_time < timeout): + time.sleep(10) + logging.getLogger().info(new_connect.count_entities(collection)) + if new_connect.count_entities(collection) == big_nb - delete_length: + time.sleep(10) + res_count_3 = new_connect.count_entities(collection) + logging.getLogger().info(res_count_3) + assert res_count_3 == big_nb - delete_length + + @pytest.mark.level(2) + def test_during_indexed(self, connect, collection, args): + ''' + target: flushing will recover + method: call function: create collection, then indexed, restart server and assert row count + expected: row count equals nb + ''' + # disable_autoflush() + ids = connect.insert(collection, big_entities) + connect.flush([collection]) + connect.create_index(collection, field_name, default_index) + res_count = connect.count_entities(collection) + logging.getLogger().info(res_count) + stats = connect.get_collection_stats(collection) + # logging.getLogger().info(stats) + # pdb.set_trace() + # restart server + assert restart_server(args["service_name"]) + # assert row count again + new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"]) + assert new_connect.count_entities(collection) == big_nb + stats = connect.get_collection_stats(collection) + for file in stats["partitions"][0]["segments"][0]["files"]: + if file["field"] == field_name and file["name"] != "_raw": + assert file["data_size"] > 0 + if file["index_type"] != default_index["index_type"]: + assert False + else: + assert True + + @pytest.mark.level(2) + def test_during_indexing(self, connect, collection, args): + ''' + target: flushing will recover + method: call function: create collection, then indexing, restart server and assert row count + expected: row count equals nb, server contitue to build index after restart + ''' + # disable_autoflush() + loop = 5 + for i in range(loop): + ids = connect.insert(collection, big_entities) + connect.flush([collection]) + connect.create_index(collection, field_name, default_index, _async=True) + res_count = connect.count_entities(collection) + logging.getLogger().info(res_count) + stats = connect.get_collection_stats(collection) + # logging.getLogger().info(stats) + # restart server + assert restart_server(args["service_name"]) + # assert row count again + new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"]) + res_count_2 = new_connect.count_entities(collection) + logging.getLogger().info(res_count_2) + assert res_count_2 == loop * big_nb + status = new_connect._cmd("status") + assert json.loads(status)["indexing"] == True + # timeout = 100 + # start_time = time.time() + # while time.time() - start_time < timeout: + # time.sleep(5) + # assert new_connect.count_entities(collection) == loop * big_nb + # stats = connect.get_collection_stats(collection) + # assert stats["row_count"] == loop * big_nb + # for file in stats["partitions"][0]["segments"][0]["files"]: + # # logging.getLogger().info(file) + # if file["field"] == field_name and file["name"] != "_raw": + # assert file["data_size"] > 0 + # if file["index_type"] != default_index["index_type"]: + # continue + # for file in stats["partitions"][0]["segments"][0]["files"]: + # if file["field"] == field_name and file["name"] != "_raw": + # assert file["data_size"] > 0 + # if file["index_type"] != default_index["index_type"]: + # assert False + # else: + # assert True + + @pytest.mark.level(2) + def test_delete_flush_during_compacting(self, connect, collection, args): + ''' + target: verify server work after restart during compaction + method: call function: create collection, then delete/flush/compacting, restart server and assert row count + call `compact` again, compact pass + expected: row count equals (nb - delete_length) + ''' + # disable_autoflush() + ids = connect.insert(collection, big_entities) + connect.flush([collection]) + delete_length = 1000 + loop = 10 + for i in range(loop): + delete_ids = ids[i*loop:i*loop+delete_length] + delete_res = connect.delete_entity_by_id(collection, delete_ids) + connect.flush([collection]) + connect.compact(collection, _async=True) + res_count = connect.count_entities(collection) + logging.getLogger().info(res_count) + assert res_count == big_nb - delete_length*loop + info = connect.get_collection_stats(collection) + size_old = info["partitions"][0]["segments"][0]["data_size"] + logging.getLogger().info(size_old) + # restart server + assert restart_server(args["service_name"]) + # assert row count again + new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"]) + res_count_2 = new_connect.count_entities(collection) + logging.getLogger().info(res_count_2) + assert res_count_2 == big_nb - delete_length*loop + info = connect.get_collection_stats(collection) + size_before = info["partitions"][0]["segments"][0]["data_size"] + status = connect.compact(collection) assert status.OK() - for index, item_id in enumerate(get_ids): - assert_equal_vector(res[index], vectors[item_id]) + info = connect.get_collection_stats(collection) + size_after = info["partitions"][0]["segments"][0]["data_size"] + assert size_before > size_after + + + @pytest.mark.level(2) + def test_insert_during_flushing_multi_collections(self, connect, args): + ''' + target: flushing will recover + method: call function: create collections, then insert/flushing, restart server and assert row count + expected: row count equals 0 + ''' + # disable_autoflush() + collection_num = 2 + collection_list = [] + for i in range(collection_num): + collection_name = gen_unique_str(collection_id) + collection_list.append(collection_name) + connect.create_collection(collection_name, default_fields) + ids = connect.insert(collection_name, big_entities) + connect.flush(collection_list, _async=True) + res_count = connect.count_entities(collection_list[-1]) + logging.getLogger().info(res_count) + if res_count < big_nb: + # restart server + assert restart_server(args["service_name"]) + # assert row count again + new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"]) + res_count_2 = new_connect.count_entities(collection_list[-1]) + logging.getLogger().info(res_count_2) + timeout = 300 + start_time = time.time() + while time.time() - start_time < timeout: + count_list = [] + break_flag = True + for index, name in enumerate(collection_list): + tmp_count = new_connect.count_entities(name) + count_list.append(tmp_count) + logging.getLogger().info(count_list) + if tmp_count != big_nb: + break_flag = False + break + if break_flag == True: + break + time.sleep(10) + for name in collection_list: + assert new_connect.count_entities(name) == big_nb + + @pytest.mark.level(2) + def test_insert_during_flushing_multi_partitions(self, connect, collection, args): + ''' + target: flushing will recover + method: call function: create collection/partition, then insert/flushing, restart server and assert row count + expected: row count equals 0 + ''' + # disable_autoflush() + partitions_num = 2 + partitions = [] + for i in range(partitions_num): + tag_tmp = gen_unique_str() + partitions.append(tag_tmp) + connect.create_partition(collection, tag_tmp) + ids = connect.insert(collection, big_entities, partition_tag=tag_tmp) + connect.flush([collection], _async=True) + res_count = connect.count_entities(collection) + logging.getLogger().info(res_count) + if res_count < big_nb: + # restart server + assert restart_server(args["service_name"]) + # assert row count again + new_connect = get_milvus(args["ip"], args["port"], handler=args["handler"]) + res_count_2 = new_connect.count_entities(collection) + logging.getLogger().info(res_count_2) + timeout = 300 + start_time = time.time() + while new_connect.count_entities(collection) != big_nb * 2 and (time.time() - start_time < timeout): + time.sleep(10) + logging.getLogger().info(new_connect.count_entities(collection)) + res_count_3 = new_connect.count_entities(collection) + logging.getLogger().info(res_count_3) + assert res_count_3 == big_nb * 2 \ No newline at end of file diff --git a/tests/milvus_python_test/test_compact.py b/tests/milvus_python_test/test_compact.py index 52d3febb35..6680276907 100644 --- a/tests/milvus_python_test/test_compact.py +++ b/tests/milvus_python_test/test_compact.py @@ -6,15 +6,11 @@ from multiprocessing import Pool, Process import pytest from utils import * -dim = 128 -index_file_size = 10 COMPACT_TIMEOUT = 180 nprobe = 1 top_k = 1 tag = "1970_01_01" -nb = 6000 nq = 2 -segment_row_count = 5000 entity = gen_entities(1) entities = gen_entities(nb) raw_vector, binary_entity = gen_binary_entities(1) @@ -253,8 +249,7 @@ class TestCompactBase: connect.flush([collection]) info = connect.get_collection_stats(collection) logging.getLogger().info(info["partitions"]) - - delete_ids = ids[:3000] + delete_ids = ids[:nb//2] status = connect.delete_entity_by_id(collection, delete_ids) assert status.OK() connect.flush([collection]) @@ -296,7 +291,7 @@ class TestCompactBase: # get collection info before compact info = connect.get_collection_stats(collection) size_before = info["partitions"][0]["segments"][0]["data_size"] - delete_ids = ids[:1500] + delete_ids = ids[:nb//2] status = connect.delete_entity_by_id(collection, delete_ids) assert status.OK() connect.flush([collection]) diff --git a/tests/milvus_python_test/test_config.py b/tests/milvus_python_test/test_config.py index d4c27a22e9..a45fd2a987 100644 --- a/tests/milvus_python_test/test_config.py +++ b/tests/milvus_python_test/test_config.py @@ -8,14 +8,10 @@ import pytest from utils import * import ujson - -dim = 128 -index_file_size = 10 CONFIG_TIMEOUT = 80 nprobe = 1 top_k = 1 tag = "1970_01_01" -nb = 6000 class TestCacheConfig: diff --git a/tests/milvus_python_test/test_flush.py b/tests/milvus_python_test/test_flush.py index d6b1f712f5..2d6ae7c595 100644 --- a/tests/milvus_python_test/test_flush.py +++ b/tests/milvus_python_test/test_flush.py @@ -6,15 +6,11 @@ from multiprocessing import Pool, Process import pytest from utils import * -dim = 128 -segment_row_count = 5000 -index_file_size = 10 collection_id = "test_flush" DELETE_TIMEOUT = 60 nprobe = 1 tag = "1970_01_01" top_k = 1 -nb = 6000 tag = "partition_tag" field_name = "float_vector" entity = gen_entities(1) @@ -307,9 +303,15 @@ class TestFlushAsync: future = connect.flush([collection], _async=True) status = future.result() + def test_flush_async_long_drop_collection(self, connect, collection): + # vectors = gen_vectors(nb, dim) + for i in range(5): + ids = connect.insert(collection, entities) + future = connect.flush([collection], _async=True) + logging.getLogger().info("DROP") + connect.drop_collection(collection) + def test_flush_async(self, connect, collection): - nb = 100000 - vectors = gen_vectors(nb, dim) connect.insert(collection, entities) logging.getLogger().info("before") future = connect.flush([collection], _async=True, _callback=self.check_status) diff --git a/tests/milvus_python_test/test_index.py b/tests/milvus_python_test/test_index.py index c78b710a32..0d72f16f70 100644 --- a/tests/milvus_python_test/test_index.py +++ b/tests/milvus_python_test/test_index.py @@ -8,14 +8,11 @@ import pytest import sklearn.preprocessing from utils import * -nb = 6000 -dim = 128 -index_file_size = 10 BUILD_TIMEOUT = 300 nprobe = 1 top_k = 5 tag = "1970_01_01" -NLIST = 4046 +NLIST = 128 INVALID_NLIST = 100000000 field_name = "float_vector" binary_field_name = "binary_vector" @@ -26,7 +23,7 @@ entities = gen_entities(nb) raw_vector, binary_entity = gen_binary_entities(1) raw_vectors, binary_entities = gen_binary_entities(nb) query, query_vecs = gen_query_vectors(field_name, entities, top_k, 1) -default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 1024}, "metric_type": "L2"} +default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"} class TestIndexBase: @@ -46,7 +43,7 @@ class TestIndexBase: params=[ 1, 10, - 1500 + 1111 ], ) def get_nq(self, request): @@ -526,7 +523,7 @@ class TestIndexBinary: params=[ 1, 10, - 1500 + 1111 ], ) def get_nq(self, request): @@ -769,6 +766,19 @@ class TestIndexAsync: # TODO: logging.getLogger().info(res) + @pytest.mark.timeout(BUILD_TIMEOUT) + def test_create_index_drop(self, connect, collection, get_simple_index): + ''' + target: test create index interface + method: create collection and add entities in it, create index + expected: return search success + ''' + ids = connect.insert(collection, entities) + logging.getLogger().info("start index") + future = connect.create_index(collection, field_name, get_simple_index, _async=True) + logging.getLogger().info("DROP") + connect.drop_collection(collection) + def test_create_index_with_invalid_collectionname(self, connect): collection_name = " " future = connect.create_index(collection_name, field_name, default_index, _async=True) diff --git a/tests/milvus_python_test/test_mix.py b/tests/milvus_python_test/test_mix.py index e5432b39fc..d5524a7417 100644 --- a/tests/milvus_python_test/test_mix.py +++ b/tests/milvus_python_test/test_mix.py @@ -10,7 +10,6 @@ import sklearn.preprocessing from milvus import IndexType, MetricType from utils import * -dim = 128 index_file_size = 10 collection_id = "test_mix" add_interval_time = 5 diff --git a/tests/milvus_python_test/test_partition.py b/tests/milvus_python_test/test_partition.py index 83335a756a..2d7f072f97 100644 --- a/tests/milvus_python_test/test_partition.py +++ b/tests/milvus_python_test/test_partition.py @@ -8,13 +8,10 @@ import pytest from utils import * -dim = 128 -segment_row_count = 5000 collection_id = "partition" nprobe = 1 tag = "1970_01_01" TIMEOUT = 120 -nb = 6000 tag = "partition_tag" field_name = "float_vector" entity = gen_entities(1) @@ -39,14 +36,16 @@ class TestCreateBase: ''' connect.create_partition(collection, tag) + # TODO: enable @pytest.mark.level(2) - def test_create_partition_limit(self, connect, collection, args): + @pytest.mark.timeout(1200) + def _test_create_partition_limit(self, connect, collection, args): ''' target: test create partitions, check status returned method: call function: create_partition for 4097 times expected: exception raised ''' - threads_num = 16 + threads_num = 8 threads = [] if args["handler"] == "HTTP": pytest.skip("skip in http mode") @@ -374,6 +373,7 @@ class TestNameInvalid(object): def get_collection_name(self, request): yield request.param + @pytest.mark.level(2) def test_drop_partition_with_invalid_collection_name(self, connect, collection, get_collection_name): ''' target: test drop partition, with invalid collection name, check status returned @@ -396,6 +396,7 @@ class TestNameInvalid(object): with pytest.raises(Exception) as e: connect.drop_partition(collection, tag_name) + @pytest.mark.level(2) def test_list_partitions_with_invalid_collection_name(self, connect, collection, get_collection_name): ''' target: test show partitions, with invalid collection name, check status returned diff --git a/tests/milvus_python_test/test_wal.py b/tests/milvus_python_test/test_wal.py deleted file mode 100644 index 2d0086134a..0000000000 --- a/tests/milvus_python_test/test_wal.py +++ /dev/null @@ -1,49 +0,0 @@ -import time -import pdb -import threading -import logging -from multiprocessing import Pool, Process -import pytest -from utils import * - -dim = 128 -collection_id = "test_wal" -segment_row_count = 5000 -WAL_TIMEOUT = 60 -tag = "1970_01_01" -insert_interval_time = 1.5 -nb = 6000 -field_name = "float_vector" -entity = gen_entities(1) -binary_entity = gen_binary_entities(1) -entities = gen_entities(nb) -raw_vectors, binary_entities = gen_binary_entities(nb) -default_fields = gen_default_fields() - - -class TestWalBase: - """ - ****************************************************************** - The following cases are used to test WAL functionality - ****************************************************************** - """ - - @pytest.mark.timeout(WAL_TIMEOUT) - def test_wal_server_crashed_recovery(self, connect, collection): - ''' - target: test wal when server crashed unexpectedly and restarted - method: add vectors, server killed before flush, restarted server and flush - expected: status ok, add request is recovered and vectors added - ''' - ids = connect.insert(collection, entity) - connect.flush([collection]) - res = connect.count_entities(collection) - logging.getLogger().info(res) # should be 0 because no auto flush - logging.getLogger().info("Stop server and restart") - # kill server and restart. auto flush should be set to 15 seconds. - # time.sleep(15) - connect.flush([collection]) - res = connect.count_entities(collection) - assert res == 1 - res = connect.get_entity_by_id(collection, [ids[0]]) - logging.getLogger().info(res) diff --git a/tests/milvus_python_test/utils.py b/tests/milvus_python_test/utils.py index 1a5b5dde60..aa61c0bbd2 100644 --- a/tests/milvus_python_test/utils.py +++ b/tests/milvus_python_test/utils.py @@ -15,12 +15,13 @@ port = 19530 epsilon = 0.000001 default_flush_interval = 1 big_flush_interval = 1000 -dimension = 128 -nb = 6000 +dim = 128 +nb = 1200 top_k = 10 -segment_row_count = 5000 +segment_row_count = 1000 default_float_vec_field_name = "float_vector" default_binary_vec_field_name = "binary_vector" +namespace = "milvus" # TODO: all_index_types = [ @@ -123,6 +124,10 @@ def get_milvus(host, port, uri=None, handler=None, **kwargs): return milvus +def reset_build_index_threshold(connect): + connect.set_config("engine", "build_index_threshold", 1024) + + def disable_flush(connect): connect.set_config("storage", "auto_flush_interval", big_flush_interval) @@ -211,7 +216,7 @@ def gen_single_filter_fields(): def gen_single_vector_fields(): fields = [] for data_type in [DataType.FLOAT_VECTOR, DataType.BINARY_VECTOR]: - field = {"field": data_type.name, "type": data_type, "params": {"dim": dimension}} + field = {"field": data_type.name, "type": data_type, "params": {"dim": dim}} fields.append(field) return fields @@ -221,7 +226,7 @@ def gen_default_fields(auto_id=True): "fields": [ {"field": "int64", "type": DataType.INT64}, {"field": "float", "type": DataType.FLOAT}, - {"field": default_float_vec_field_name, "type": DataType.FLOAT_VECTOR, "params": {"dim": dimension}}, + {"field": default_float_vec_field_name, "type": DataType.FLOAT_VECTOR, "params": {"dim": dim}}, ], "segment_row_count": segment_row_count, "auto_id" : auto_id @@ -234,7 +239,7 @@ def gen_binary_default_fields(auto_id=True): "fields": [ {"field": "int64", "type": DataType.INT64}, {"field": "float", "type": DataType.FLOAT}, - {"field": default_binary_vec_field_name, "type": DataType.BINARY_VECTOR, "params": {"dim": dimension}} + {"field": default_binary_vec_field_name, "type": DataType.BINARY_VECTOR, "params": {"dim": dim}} ], "segment_row_count": segment_row_count, "auto_id" : auto_id @@ -243,7 +248,7 @@ def gen_binary_default_fields(auto_id=True): def gen_entities(nb, is_normal=False): - vectors = gen_vectors(nb, dimension, is_normal) + vectors = gen_vectors(nb, dim, is_normal) entities = [ {"field": "int64", "type": DataType.INT64, "values": [i for i in range(nb)]}, {"field": "float", "type": DataType.FLOAT, "values": [float(i) for i in range(nb)]}, @@ -253,7 +258,7 @@ def gen_entities(nb, is_normal=False): def gen_binary_entities(nb): - raw_vectors, vectors = gen_binary_vectors(nb, dimension) + raw_vectors, vectors = gen_binary_vectors(nb, dim) entities = [ {"field": "int64", "type": DataType.INT64, "values": [i for i in range(nb)]}, {"field": "float", "type": DataType.FLOAT, "values": [float(i) for i in range(nb)]}, @@ -262,7 +267,7 @@ def gen_binary_entities(nb): return raw_vectors, entities -def gen_entities_by_fields(fields, nb, dimension): +def gen_entities_by_fields(fields, nb, dim): entities = [] for field in fields: if field["type"] in [DataType.INT32, DataType.INT64]: @@ -270,9 +275,9 @@ def gen_entities_by_fields(fields, nb, dimension): elif field["type"] in [DataType.FLOAT, DataType.DOUBLE]: field_value = [3.0 for i in range(nb)] elif field["type"] == DataType.BINARY_VECTOR: - field_value = gen_binary_vectors(nb, dimension)[1] + field_value = gen_binary_vectors(nb, dim)[1] elif field["type"] == DataType.FLOAT_VECTOR: - field_value = gen_vectors(nb, dimension) + field_value = gen_vectors(nb, dim) field.update({"values": field_value}) entities.append(field) return entities @@ -401,7 +406,7 @@ def add_field(entities, field_name=None): def add_vector_field(entities, is_normal=False): nb = len(entities[0]["values"]) - vectors = gen_vectors(nb, dimension, is_normal) + vectors = gen_vectors(nb, dim, is_normal) field = { "field": gen_unique_str(), "type": DataType.FLOAT_VECTOR, @@ -456,7 +461,7 @@ def update_field_value(entities, old_type, new_value): return tmp_entities -def add_vector_field(nb, dimension=dimension): +def add_vector_field(nb, dimension=dim): field_name = gen_unique_str() field = { "field": field_name, @@ -468,9 +473,8 @@ def add_vector_field(nb, dimension=dimension): def gen_segment_row_counts(): sizes = [ - 4096, - 8192, - 1000000, + 1024, + 4096 ] return sizes @@ -779,7 +783,6 @@ def restart_server(helm_release_name): from kubernetes import client, config client.rest.logger.setLevel(logging.WARNING) - namespace = "milvus" # service_name = "%s.%s.svc.cluster.local" % (helm_release_name, namespace) config.load_kube_config() v1 = client.CoreV1Api() @@ -793,7 +796,7 @@ def restart_server(helm_release_name): break # v1.patch_namespaced_config_map(config_map_name, namespace, body, pretty='true') # status_res = v1.read_namespaced_service_status(helm_release_name, namespace, pretty='true') - # print(status_res) + logging.getLogger().debug("Pod name: %s" % pod_name) if pod_name is not None: try: v1.delete_namespaced_pod(pod_name, namespace) @@ -802,24 +805,61 @@ def restart_server(helm_release_name): logging.error("Exception when calling CoreV1Api->delete_namespaced_pod") res = False return res - time.sleep(5) + logging.error("Sleep 10s after pod deleted") + time.sleep(10) # check if restart successfully pods = v1.list_namespaced_pod(namespace) for i in pods.items: pod_name_tmp = i.metadata.name - if pod_name_tmp.find(helm_release_name) != -1: - logging.debug(pod_name_tmp) + logging.error(pod_name_tmp) + if pod_name_tmp == pod_name: + continue + elif pod_name_tmp.find(helm_release_name) == -1 or pod_name_tmp.find("mysql") != -1: + continue + else: + status_res = v1.read_namespaced_pod_status(pod_name_tmp, namespace, pretty='true') + logging.error(status_res.status.phase) start_time = time.time() - while time.time() - start_time > timeout: + ready_break = False + while time.time() - start_time <= timeout: + logging.error(time.time()) status_res = v1.read_namespaced_pod_status(pod_name_tmp, namespace, pretty='true') if status_res.status.phase == "Running": + logging.error("Already running") + ready_break = True + time.sleep(10) break - time.sleep(1) + else: + time.sleep(1) if time.time() - start_time > timeout: logging.error("Restart pod: %s timeout" % pod_name_tmp) res = False return res + if ready_break: + break else: - logging.error("Pod: %s not found" % helm_release_name) - res = False + raise Exception("Pod: %s not found" % pod_name) + follow = True + pretty = True + previous = True # bool | Return previous terminated container logs. Defaults to false. (optional) + since_seconds = 56 # int | A relative time in seconds before the current time from which to show logs. If this value precedes the time a pod was started, only logs since the pod start will be returned. If this value is in the future, no logs will be returned. Only one of sinceSeconds or sinceTime may be specified. (optional) + timestamps = True # bool | If true, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line of log output. Defaults to false. (optional) + container = "milvus" + # start_time = time.time() + # while time.time() - start_time <= timeout: + # try: + # api_response = v1.read_namespaced_pod_log(pod_name_tmp, namespace, container=container, follow=follow, + # pretty=pretty, previous=previous, since_seconds=since_seconds, + # timestamps=timestamps) + # logging.error(api_response) + # return res + # except Exception as e: + # logging.error("Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e) + # # waiting for server start + # time.sleep(5) + # # res = False + # # return res + # if time.time() - start_time > timeout: + # logging.error("Restart pod: %s timeout" % pod_name_tmp) + # res = False return res