milvus/tests/milvus_python_test/test_partition.py
2019-11-21 17:23:06 +08:00

431 lines
16 KiB
Python

import time
import random
import pdb
import threading
import logging
from multiprocessing import Pool, Process
import pytest
from milvus import Milvus, IndexType, MetricType
from utils import *
dim = 128
index_file_size = 10
table_id = "test_add"
ADD_TIMEOUT = 60
nprobe = 1
epsilon = 0.0001
tag = "1970-01-01"
class TestCreateBase:
"""
******************************************************************
The following cases are used to test `create_partition` function
******************************************************************
"""
def test_create_partition(self, connect, table):
'''
target: test create partition, check status returned
method: call function: create_partition
expected: status ok
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
assert status.OK()
def test_create_partition_repeat(self, connect, table):
'''
target: test create partition, check status returned
method: call function: create_partition
expected: status ok
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
status = connect.create_partition(table, partition_name, tag)
assert not status.OK()
def test_create_partition_recursively(self, connect, table):
'''
target: test create partition, and create partition in parent partition, check status returned
method: call function: create_partition
expected: status not ok
'''
partition_name = gen_unique_str()
new_partition_name = gen_unique_str()
new_tag = "new_tag"
status = connect.create_partition(table, partition_name, tag)
status = connect.create_partition(partition_name, new_partition_name, new_tag)
assert not status.OK()
def test_create_partition_table_not_existed(self, connect):
'''
target: test create partition, its owner table name not existed in db, check status returned
method: call function: create_partition
expected: status not ok
'''
table_name = gen_unique_str()
partition_name = gen_unique_str()
status = connect.create_partition(table_name, partition_name, tag)
assert not status.OK()
def test_create_partition_partition_name_existed(self, connect, table):
'''
target: test create partition, and create the same partition again, check status returned
method: call function: create_partition
expected: status not ok
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
assert status.OK()
tag_new = "tag_new"
status = connect.create_partition(table, partition_name, tag_new)
assert not status.OK()
def test_create_partition_partition_name_equals_table(self, connect, table):
'''
target: test create partition, the partition equals to table, check status returned
method: call function: create_partition
expected: status not ok
'''
status = connect.create_partition(table, table, tag)
assert not status.OK()
def test_create_partition_partition_name_None(self, connect, table):
'''
target: test create partition, partition name set None, check status returned
method: call function: create_partition
expected: status not ok
'''
partition_name = None
status = connect.create_partition(table, partition_name, tag)
assert not status.OK()
def test_create_partition_tag_name_None(self, connect, table):
'''
target: test create partition, tag name set None, check status returned
method: call function: create_partition
expected: status ok
'''
tag_name = None
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag_name)
assert not status.OK()
def test_create_different_partition_tag_name_existed(self, connect, table):
'''
target: test create partition, and create the same partition tag again, check status returned
method: call function: create_partition with the same tag name
expected: status not ok
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
assert status.OK()
new_partition_name = gen_unique_str()
status = connect.create_partition(table, new_partition_name, tag)
assert not status.OK()
def test_create_partition_add_vectors(self, connect, table):
'''
target: test create partition, and insert vectors, check status returned
method: call function: create_partition
expected: status ok
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
assert status.OK()
nq = 100
vectors = gen_vectors(nq, dim)
ids = [i for i in range(nq)]
status, ids = connect.insert(table, vectors, ids)
assert status.OK()
def test_create_partition_insert_with_tag(self, connect, table):
'''
target: test create partition, and insert vectors, check status returned
method: call function: create_partition
expected: status ok
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
assert status.OK()
nq = 100
vectors = gen_vectors(nq, dim)
ids = [i for i in range(nq)]
status, ids = connect.insert(table, vectors, ids, partition_tag=tag)
assert status.OK()
def test_create_partition_insert_with_tag_not_existed(self, connect, table):
'''
target: test create partition, and insert vectors, check status returned
method: call function: create_partition
expected: status not ok
'''
tag_new = "tag_new"
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
assert status.OK()
nq = 100
vectors = gen_vectors(nq, dim)
ids = [i for i in range(nq)]
status, ids = connect.insert(table, vectors, ids, partition_tag=tag_new)
assert not status.OK()
def test_create_partition_insert_same_tags(self, connect, table):
'''
target: test create partition, and insert vectors, check status returned
method: call function: create_partition
expected: status ok
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
assert status.OK()
nq = 100
vectors = gen_vectors(nq, dim)
ids = [i for i in range(nq)]
status, ids = connect.insert(table, vectors, ids, partition_tag=tag)
ids = [(i+100) for i in range(nq)]
status, ids = connect.insert(table, vectors, ids, partition_tag=tag)
assert status.OK()
time.sleep(1)
status, res = connect.get_table_row_count(partition_name)
assert res == nq * 2
def test_create_partition_insert_same_tags_two_tables(self, connect, table):
'''
target: test create two partitions, and insert vectors with the same tag to each table, check status returned
method: call function: create_partition
expected: status ok, table length is correct
'''
partition_name = gen_unique_str()
table_new = gen_unique_str()
new_partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
assert status.OK()
param = {'table_name': table_new,
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
status = connect.create_table(param)
status = connect.create_partition(table_new, new_partition_name, tag)
assert status.OK()
nq = 100
vectors = gen_vectors(nq, dim)
ids = [i for i in range(nq)]
status, ids = connect.insert(table, vectors, ids, partition_tag=tag)
ids = [(i+100) for i in range(nq)]
status, ids = connect.insert(table_new, vectors, ids, partition_tag=tag)
assert status.OK()
time.sleep(1)
status, res = connect.get_table_row_count(new_partition_name)
assert res == nq
class TestShowBase:
"""
******************************************************************
The following cases are used to test `show_partitions` function
******************************************************************
"""
def test_show_partitions(self, connect, table):
'''
target: test show partitions, check status and partitions returned
method: create partition first, then call function: show_partitions
expected: status ok, partition correct
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
status, res = connect.show_partitions(table)
assert status.OK()
def test_show_partitions_no_partition(self, connect, table):
'''
target: test show partitions with table name, check status and partitions returned
method: call function: show_partitions
expected: status ok, partitions correct
'''
partition_name = gen_unique_str()
status, res = connect.show_partitions(table)
assert status.OK()
def test_show_partitions_no_partition_recursive(self, connect, table):
'''
target: test show partitions with partition name, check status and partitions returned
method: call function: show_partitions
expected: status ok, no partitions
'''
partition_name = gen_unique_str()
status, res = connect.show_partitions(partition_name)
assert status.OK()
assert len(res) == 0
def test_show_multi_partitions(self, connect, table):
'''
target: test show partitions, check status and partitions returned
method: create partitions first, then call function: show_partitions
expected: status ok, partitions correct
'''
partition_name = gen_unique_str()
new_partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
status = connect.create_partition(table, new_partition_name, tag)
status, res = connect.show_partitions(table)
assert status.OK()
class TestDropBase:
"""
******************************************************************
The following cases are used to test `drop_partition` function
******************************************************************
"""
def test_drop_partition(self, connect, table):
'''
target: test drop partition, check status and partition if existed
method: create partitions first, then call function: drop_partition
expected: status ok, no partitions in db
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
status = connect.drop_partition(table, tag)
assert status.OK()
# check if the partition existed
status, res = connect.show_partitions(table)
assert partition_name not in res
def test_drop_partition_tag_not_existed(self, connect, table):
'''
target: test drop partition, but tag not existed
method: create partitions first, then call function: drop_partition
expected: status not ok
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
new_tag = "new_tag"
status = connect.drop_partition(table, new_tag)
assert not status.OK()
def test_drop_partition_tag_not_existed_A(self, connect, table):
'''
target: test drop partition, but table not existed
method: create partitions first, then call function: drop_partition
expected: status not ok
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
new_table = gen_unique_str()
status = connect.drop_partition(new_table, tag)
assert not status.OK()
def test_drop_partition_repeatedly(self, connect, table):
'''
target: test drop partition twice, check status and partition if existed
method: create partitions first, then call function: drop_partition
expected: status not ok, no partitions in db
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
status = connect.drop_partition(table, tag)
status = connect.drop_partition(table, tag)
time.sleep(2)
assert not status.OK()
status, res = connect.show_partitions(table)
assert partition_name not in res
def test_drop_partition_create(self, connect, table):
'''
target: test drop partition, and create again, check status
method: create partitions first, then call function: drop_partition, create_partition
expected: status not ok, partition in db
'''
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
status = connect.drop_partition(table, tag)
time.sleep(2)
status = connect.create_partition(table, partition_name, tag)
assert status.OK()
status, res = connect.show_partitions(table)
assert partition_name == res[0].partition_name
class TestNameInvalid(object):
@pytest.fixture(
scope="function",
params=gen_invalid_table_names()
)
def get_partition_name(self, request):
yield request.param
@pytest.fixture(
scope="function",
params=gen_invalid_table_names()
)
def get_tag_name(self, request):
yield request.param
@pytest.fixture(
scope="function",
params=gen_invalid_table_names()
)
def get_table_name(self, request):
yield request.param
def test_create_partition_with_invalid_partition_name(self, connect, table, get_partition_name):
'''
target: test create partition, with invalid partition name, check status returned
method: call function: create_partition
expected: status not ok
'''
partition_name = get_partition_name
status = connect.create_partition(table, partition_name, tag)
assert not status.OK()
def test_create_partition_with_invalid_tag_name(self, connect, table):
'''
target: test create partition, with invalid partition name, check status returned
method: call function: create_partition
expected: status not ok
'''
tag_name = " "
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag_name)
assert not status.OK()
def test_drop_partition_with_invalid_table_name(self, connect, table, get_table_name):
'''
target: test drop partition, with invalid table name, check status returned
method: call function: drop_partition
expected: status not ok
'''
table_name = get_table_name
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
status = connect.drop_partition(table_name, tag)
assert not status.OK()
def test_drop_partition_with_invalid_tag_name(self, connect, table, get_tag_name):
'''
target: test drop partition, with invalid tag name, check status returned
method: call function: drop_partition
expected: status not ok
'''
tag_name = get_tag_name
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
status = connect.drop_partition(table, tag_name)
assert not status.OK()
def test_show_partitions_with_invalid_table_name(self, connect, table, get_table_name):
'''
target: test show partitions, with invalid table name, check status returned
method: call function: show_partitions
expected: status not ok
'''
table_name = get_table_name
partition_name = gen_unique_str()
status = connect.create_partition(table, partition_name, tag)
status, res = connect.show_partitions(table_name)
assert not status.OK()