mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-05 10:22:41 +08:00
Add async insert case in test20 (#6169)
* add async insert cases Signed-off-by: ThreadDao <yufen.zong@zilliz.com> * [skip ci] skip ci Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
This commit is contained in:
parent
95ad498822
commit
9adea12df3
@ -48,7 +48,7 @@ class TestInsertParams(TestcaseBase):
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
df = cf.gen_default_dataframe_data(ct.default_nb)
|
||||
mutation_res, _ = collection_w.insert(data=df)
|
||||
# assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.primary_keys == df[ct.default_int64_field_name].values.tolist()
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
@ -63,7 +63,7 @@ class TestInsertParams(TestcaseBase):
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
data = cf.gen_default_list_data(ct.default_nb)
|
||||
mutation_res, _ = collection_w.insert(data=data)
|
||||
# assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.primary_keys == data[0]
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
@ -181,7 +181,7 @@ class TestInsertParams(TestcaseBase):
|
||||
collection_w = self.init_collection_wrap(name=c_name, schema=default_binary_schema)
|
||||
df, _ = cf.gen_default_binary_dataframe_data(ct.default_nb)
|
||||
mutation_res, _ = collection_w.insert(data=df)
|
||||
# assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.primary_keys == df[ct.default_int64_field_name].values.tolist()
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
@ -196,7 +196,7 @@ class TestInsertParams(TestcaseBase):
|
||||
collection_w = self.init_collection_wrap(name=c_name, schema=default_binary_schema)
|
||||
data, _ = cf.gen_default_binary_list_data(ct.default_nb)
|
||||
mutation_res, _ = collection_w.insert(data=data)
|
||||
# assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.primary_keys == data[0]
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
@ -211,7 +211,7 @@ class TestInsertParams(TestcaseBase):
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
data = cf.gen_default_list_data(nb=1)
|
||||
mutation_res, _ = collection_w.insert(data=data)
|
||||
# assert mutation_res.insert_count == 1
|
||||
assert mutation_res.insert_count == 1
|
||||
assert mutation_res.primary_keys == data[0]
|
||||
assert collection_w.num_entities == 1
|
||||
|
||||
@ -553,7 +553,7 @@ class TestInsertOperation(TestcaseBase):
|
||||
collection_w = self.init_collection_wrap(name=c_name, schema=schema)
|
||||
data = cf.gen_default_list_data(nb=ct.default_nb)
|
||||
mutation_res, _ = collection_w.insert(data=data[1:])
|
||||
# assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
assert cf._check_primary_keys(mutation_res.primary_keys, ct.default_nb)
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
@ -600,7 +600,7 @@ class TestInsertOperation(TestcaseBase):
|
||||
data = cf.gen_default_list_data(nb=nb)
|
||||
data[0] = [1 for i in range(nb)]
|
||||
mutation_res, _ = collection_w.insert(data)
|
||||
# assert mutation_res.insert_count == nb
|
||||
assert mutation_res.insert_count == nb
|
||||
assert mutation_res.primary_keys == data[0]
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
@ -639,7 +639,7 @@ class TestInsertOperation(TestcaseBase):
|
||||
assert mutation_res.primary_keys == primary_keys
|
||||
|
||||
for i in range(thread_num):
|
||||
x = threading.Thread(target=insert, args=(i, ))
|
||||
x = threading.Thread(target=insert, args=(i,))
|
||||
threads.append(x)
|
||||
x.start()
|
||||
for t in threads:
|
||||
@ -669,13 +669,12 @@ class TestInsertOperation(TestcaseBase):
|
||||
for _ in range(ct.default_nb // step):
|
||||
df = cf.gen_default_dataframe_data(step)
|
||||
mutation_res, _ = collection_w.insert(data=df)
|
||||
# assert mutation_res.insert_count == step
|
||||
assert mutation_res.insert_count == step
|
||||
assert mutation_res.primary_keys == df[ct.default_int64_field_name].values.tolist()
|
||||
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="waiting for MutationFuture")
|
||||
class TestInsertAsync(TestcaseBase):
|
||||
"""
|
||||
******************************************************************
|
||||
@ -683,67 +682,115 @@ class TestInsertAsync(TestcaseBase):
|
||||
******************************************************************
|
||||
"""
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_insert_sync(self):
|
||||
"""
|
||||
target: test async insert
|
||||
method: insert with async=True
|
||||
expected: verify num entities
|
||||
"""
|
||||
# c_name = cf.gen_unique_str(prefix)
|
||||
# collection_w = self.init_collection_wrap(name=c_name)
|
||||
# df = cf.gen_default_dataframe_data(nb=100)
|
||||
# future, _ = collection_w.insert(data=df, _async=True)
|
||||
# future.done()
|
||||
# res = future.result()
|
||||
# log.debug(res.primary_keys)
|
||||
# assert mutation_res.insert_count == ct.default_nb
|
||||
# assert mutation_res.primary_keys == df[ct.default_int64_field_name].values.tolist()
|
||||
# assert collection_w.num_entities == ct.default_nb
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data(nb=ct.default_nb)
|
||||
future, _ = collection_w.insert(data=df, _async=True)
|
||||
future.done()
|
||||
mutation_res = future.result()
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.primary_keys == df[ct.default_int64_field_name].values.tolist()
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_insert_async_false(self):
|
||||
"""
|
||||
target: test insert with false async
|
||||
method: async = false
|
||||
expected: verify num entities
|
||||
"""
|
||||
pass
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data(nb=ct.default_nb)
|
||||
mutation_res, _ = collection_w.insert(data=df, _async=False)
|
||||
assert mutation_res.insert_count == ct.default_nb
|
||||
assert mutation_res.primary_keys == df[ct.default_int64_field_name].values.tolist()
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
@pytest.mark.xfail(reason="#6160")
|
||||
def test_insert_async_callback(self):
|
||||
"""
|
||||
target: test insert with callback func
|
||||
method: insert with callback func
|
||||
expected: verify num entities
|
||||
"""
|
||||
pass
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data(nb=ct.default_nb)
|
||||
future, _ = collection_w.insert(data=df, _async=True, _callback=assert_mutation_result)
|
||||
future.done()
|
||||
assert collection_w.num_entities == ct.default_nb
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_insert_async_long(self):
|
||||
"""
|
||||
target: test insert with async
|
||||
method: insert 5w entities with callback func
|
||||
expected: verify num entities
|
||||
"""
|
||||
pass
|
||||
nb = 50000
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data(nb)
|
||||
future, _ = collection_w.insert(data=df, _async=True)
|
||||
future.done()
|
||||
mutation_res = future.result()
|
||||
assert mutation_res.insert_count == nb
|
||||
assert mutation_res.primary_keys == df[ct.default_int64_field_name].values.tolist()
|
||||
assert collection_w.num_entities == nb
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.xfail(reason="#6167")
|
||||
def test_insert_async_callback_timeout(self):
|
||||
"""
|
||||
target: test insert async with callback
|
||||
method: insert 10w entities with timeout=1
|
||||
expected: raise exception
|
||||
"""
|
||||
pass
|
||||
nb = 100000
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data(nb)
|
||||
err_msg = "timeout"
|
||||
future, _ = collection_w.insert(data=df, _async=True, _callback=assert_mutation_result, timeout=1)
|
||||
future.done()
|
||||
with pytest.raises(Exception, match=err_msg):
|
||||
future.result()
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_insert_async_invalid_data(self):
|
||||
"""
|
||||
target: test insert async with invalid data
|
||||
method: insert async with invalid data
|
||||
expected: raise exception
|
||||
"""
|
||||
pass
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
columns = [ct.default_int64_field_name, ct.default_float_vec_field_name]
|
||||
df = pd.DataFrame(columns=columns)
|
||||
error = {ct.err_code: 1, ct.err_msg: "Cannot infer schema from empty dataframe"}
|
||||
collection_w.insert(data=df, _async=True, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_insert_async_invalid_partition(self):
|
||||
"""
|
||||
target: test insert async with invalid partition
|
||||
method: insert async with invalid partition
|
||||
expected: raise exception
|
||||
"""
|
||||
pass
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data()
|
||||
err_msg = "partitionID of partitionName:p can not be find"
|
||||
future, _ = collection_w.insert(data=df, partition_name="p", _async=True)
|
||||
future.done()
|
||||
with pytest.raises(Exception, match=err_msg):
|
||||
future.result()
|
||||
|
||||
|
||||
def assert_mutation_result(mutation_res):
|
||||
# todo
|
||||
log.info(f"The result is {mutation_res}")
|
||||
# mutation_res = future.result()
|
||||
# assert mutation_res.insert_count == 100
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user