Add data consistency case after parallel insert (#17714)

Signed-off-by: Binbin Lv <binbin.lv@zilliz.com>
This commit is contained in:
binbin 2022-06-22 20:32:18 +08:00 committed by GitHub
parent 6fdf88f452
commit 379bd7846d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -3,6 +3,7 @@ import random
import numpy as np
import pandas as pd
from pymilvus import DefaultConfig
import threading
from base.client_base import TestcaseBase
from common.code_mapping import ConnectionErrorMessage as cem
@ -1330,3 +1331,51 @@ class TestqueryString(TestcaseBase):
expression = 'varchar == int64'
collection_w.query(expression, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: f' cannot parse expression:{expression}'})
@pytest.mark.tags(CaseLabel.L1)
def test_query_after_insert_multi_threading(self):
    """
    target: test data consistency after multi-threading insert
    method: insert disjoint dataframes from several threads in parallel,
            then load and query all entities, comparing the queried rows
            with the original data
    expected: every insert succeeds and the query result matches the
              concatenation of the source dataframes (no lost or
              duplicated entities)
    """
    collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
    thread_num = 4

    # Prepare one dataframe per thread with non-overlapping primary keys
    # (each thread's int64 pk range starts at i * default_nb).
    df_list = []
    primary_keys = []
    for i in range(thread_num):
        df = cf.gen_default_dataframe_data(ct.default_nb, start=i * ct.default_nb)
        df_list.append(df)
        primary_keys.append(df[ct.default_int64_field_name].values.tolist())

    # Exceptions (including AssertionError) raised inside a worker thread
    # do NOT propagate to the main thread, so collect them here and check
    # after join() — otherwise a failed per-thread insert would be
    # silently swallowed and the test could pass incorrectly.
    errors = []

    def insert(thread_i):
        try:
            log.debug(f'In thread-{thread_i}')
            mutation_res, _ = collection_w.insert(df_list[thread_i])
            assert mutation_res.insert_count == ct.default_nb
            assert mutation_res.primary_keys == primary_keys[thread_i]
        except Exception as e:
            errors.append(e)

    threads = []
    for i in range(thread_num):
        t = threading.Thread(target=insert, args=(i,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    assert not errors, f"parallel insert raised errors: {errors}"
    assert collection_w.num_entities == ct.default_nb * thread_num

    # Check data consistency after the parallel insert: query everything
    # back (all scalar fields plus vectors) and compare with the source.
    collection_w.load()
    df_dict_list = []
    for df in df_list:
        df_dict_list += df.to_dict('records')
    output_fields = ["*", "%"]
    expression = "int64 >= 0"
    collection_w.query(expression, output_fields=output_fields,
                       check_task=CheckTasks.check_query_results,
                       check_items={exp_res: df_dict_list,
                                    "primary_field": default_int_field_name,
                                    "with_vec": True})