diff --git a/tests/python_client/testcases/test_query.py b/tests/python_client/testcases/test_query.py index df1942d876..cf6653f5f0 100644 --- a/tests/python_client/testcases/test_query.py +++ b/tests/python_client/testcases/test_query.py @@ -3,6 +3,7 @@ import random import numpy as np import pandas as pd from pymilvus import DefaultConfig +import threading from base.client_base import TestcaseBase from common.code_mapping import ConnectionErrorMessage as cem @@ -1330,3 +1331,51 @@ class TestqueryString(TestcaseBase): expression = 'varchar == int64' collection_w.query(expression, check_task=CheckTasks.err_res, check_items={ct.err_code: 1, ct.err_msg: f' cannot parse expression:{expression}'}) + + @pytest.mark.tags(CaseLabel.L1) + def test_query_after_insert_multi_threading(self): + """ + target: test data consistency after multi threading insert + method: multi threads insert, and query, compare queried data with original + expected: verify data consistency + """ + collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + thread_num = 4 + threads = [] + primary_keys = [] + df_list = [] + + #prepare original data for parallel insert + for i in range(thread_num): + df = cf.gen_default_dataframe_data(ct.default_nb, start=i*ct.default_nb) + df_list.append(df) + primary_key = df[ct.default_int64_field_name].values.tolist() + primary_keys.append(primary_key) + + def insert(thread_i): + log.debug(f'In thread-{thread_i}') + mutation_res, _ = collection_w.insert(df_list[thread_i]) + assert mutation_res.insert_count == ct.default_nb + assert mutation_res.primary_keys == primary_keys[thread_i] + + for i in range(thread_num): + x = threading.Thread(target=insert, args=(i,)) + threads.append(x) + x.start() + for t in threads: + t.join() + assert collection_w.num_entities == ct.default_nb * thread_num + + #Check data consistency after parallel insert + collection_w.load() + df_dict_list = [] + for df in df_list: + df_dict_list += df.to_dict('records') + output_fields = ["*", "%"] + expression = "int64 >= 0" + collection_w.query(expression, output_fields=output_fields, + check_task=CheckTasks.check_query_results, + check_items={exp_res: df_dict_list, + "primary_field": default_int_field_name, + "with_vec": True}) +