diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py
index 3cd18defc9..4b3eeedcd9 100644
--- a/tests/python_client/chaos/checker.py
+++ b/tests/python_client/chaos/checker.py
@@ -53,37 +53,44 @@ class Singleton(type):
 class EventRecords(metaclass=Singleton):
 
     def __init__(self):
-        self.file_name = f"/tmp/ci_logs/event_records_{uuid.uuid4()}.parquet"
-        self.created_file = False
+        self.file_name = f"/tmp/ci_logs/event_records_{uuid.uuid4()}.jsonl"
 
     def insert(self, event_name, event_status, ts=None):
         log.info(f"insert event: {event_name}, {event_status}")
         insert_ts = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') if ts is None else ts
         data = {
-            "event_name": [event_name],
-            "event_status": [event_status],
-            "event_ts": [insert_ts]
+            "event_name": event_name,
+            "event_status": event_status,
+            "event_ts": insert_ts
         }
-        df = pd.DataFrame(data)
-        if not self.created_file:
-            with event_lock:
-                df.to_parquet(self.file_name, engine='fastparquet')
-                self.created_file = True
-        else:
-            with event_lock:
-                df.to_parquet(self.file_name, engine='fastparquet', append=True)
+        with event_lock:
+            with open(self.file_name, 'a') as f:
+                f.write(json.dumps(data) + '\n')
 
     def get_records_df(self):
-        df = pd.read_parquet(self.file_name)
-        return df
+        with event_lock:
+            try:
+                records = []
+                with open(self.file_name, 'r') as f:
+                    for line in f:
+                        line = line.strip()
+                        if line:
+                            records.append(json.loads(line))
+                if not records:
+                    return pd.DataFrame(columns=["event_name", "event_status", "event_ts"])
+                return pd.DataFrame(records)
+            except FileNotFoundError:
+                return pd.DataFrame(columns=["event_name", "event_status", "event_ts"])
+            except Exception as e:
+                log.warning(f"EventRecords read error: {e}")
+                return pd.DataFrame(columns=["event_name", "event_status", "event_ts"])
 
 
 class RequestRecords(metaclass=Singleton):
 
     def __init__(self):
-        self.file_name = f"/tmp/ci_logs/request_records_{uuid.uuid4()}.parquet"
+        self.file_name = f"/tmp/ci_logs/request_records_{uuid.uuid4()}.jsonl"
         self.buffer = []
-        self.created_file = False
 
     def insert(self, operation_name, collection_name, start_time, time_cost, result):
         data = {
@@ -93,38 +100,45 @@ class RequestRecords(metaclass=Singleton):
             "time_cost": time_cost,
             "result": result
         }
-        self.buffer.append(data)
-        if len(self.buffer) > 100:
-            df = pd.DataFrame(self.buffer)
-            if not self.created_file:
-                with request_lock:
-                    df.to_parquet(self.file_name, engine='fastparquet')
-                    self.created_file = True
-            else:
-                with request_lock:
-                    df.to_parquet(self.file_name, engine='fastparquet', append=True)
-            self.buffer = []
+        with request_lock:
+            self.buffer.append(data)
+            if len(self.buffer) >= 100:
+                self._flush_buffer()
 
-    def sink(self):
-        if len(self.buffer) == 0:
+    def _flush_buffer(self):
+        """Write the buffer to file (caller must hold request_lock)."""
+        if not self.buffer:
             return
         try:
-            df = pd.DataFrame(self.buffer)
+            with open(self.file_name, 'a') as f:
+                for record in self.buffer:
+                    f.write(json.dumps(record) + '\n')
+            self.buffer = []
         except Exception as e:
-            log.error(f"convert buffer {self.buffer} to dataframe error: {e}")
-            return
-        if not self.created_file:
-            with request_lock:
-                df.to_parquet(self.file_name, engine='fastparquet')
-                self.created_file = True
-        else:
-            with request_lock:
-                df.to_parquet(self.file_name, engine='fastparquet', append=True)
+            log.error(f"RequestRecords flush error: {e}")
+
+    def sink(self):
+        with request_lock:
+            self._flush_buffer()
 
     def get_records_df(self):
         self.sink()
-        df = pd.read_parquet(self.file_name)
-        return df
+        with request_lock:
+            try:
+                records = []
+                with open(self.file_name, 'r') as f:
+                    for line in f:
+                        line = line.strip()
+                        if line:
+                            records.append(json.loads(line))
+                if not records:
+                    return pd.DataFrame(columns=["operation_name", "collection_name", "start_time", "time_cost", "result"])
+                return pd.DataFrame(records)
+            except FileNotFoundError:
+                return pd.DataFrame(columns=["operation_name", "collection_name", "start_time", "time_cost", "result"])
+            except Exception as e:
+                log.warning(f"RequestRecords read error: {e}")
+                return pd.DataFrame(columns=["operation_name", "collection_name", "start_time", "time_cost", "result"])
 
 
 class ResultAnalyzer:
@@ -1281,7 +1295,6 @@ class InsertChecker(Checker):
         self.scale = 1 * 10 ** 6
         self.start_time_stamp = int(time.time() * self.scale)  # us
         self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}'
-        self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet"
 
     @trace()
     def insert_entities(self):
@@ -1368,7 +1381,6 @@ class InsertFreshnessChecker(Checker):
         self.scale = 1 * 10 ** 6
         self.start_time_stamp = int(time.time() * self.scale)  # us
         self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}'
-        self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet"
 
     def insert_entities(self):
         data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema())
diff --git a/tests/python_client/chaos/requirements.txt b/tests/python_client/chaos/requirements.txt
index 34ca43c3a5..310d805c50 100644
--- a/tests/python_client/chaos/requirements.txt
+++ b/tests/python_client/chaos/requirements.txt
@@ -1,5 +1,4 @@
-# for test result anaylszer
+# for test result analyzer
 prettytable==3.8.0
-pyarrow==14.0.1
-fastparquet==2023.7.0
\ No newline at end of file
+pyarrow==14.0.1
\ No newline at end of file
diff --git a/tests/python_client/chaos/testcases/test_get_collections.py b/tests/python_client/chaos/testcases/test_get_collections.py
index a90c00f2e6..5ab769a7ac 100644
--- a/tests/python_client/chaos/testcases/test_get_collections.py
+++ b/tests/python_client/chaos/testcases/test_get_collections.py
@@ -1,5 +1,6 @@
 import time
 import json
+import os
 from collections import defaultdict
 import pytest
 from pymilvus import Collection
@@ -37,8 +38,9 @@ class TestGetCollections(TestcaseBase):
         data = {
             "all": selected_collections,
         }
+        os.makedirs("/tmp/ci_logs", exist_ok=True)
         with open("/tmp/ci_logs/chaos_test_all_collections.json", "w") as f:
-            f.write(json.dumps(data))
+            json.dump(data, f)
         log.info(f"write {len(selected_collections)} collections to /tmp/ci_logs/chaos_test_all_collections.json")
         collections_in_json = get_chaos_test_collections()
         assert len(selected_collections) == len(collections_in_json)
diff --git a/tests/python_client/utils/util_common.py b/tests/python_client/utils/util_common.py
index b7d12d3fe7..76fc126ed3 100644
--- a/tests/python_client/utils/util_common.py
+++ b/tests/python_client/utils/util_common.py
@@ -89,7 +89,7 @@ def get_chaos_test_collections():
 
 
 def wait_signal_to_apply_chaos():
-    all_db_file = glob.glob("/tmp/ci_logs/event_records*.parquet")
+    all_db_file = glob.glob("/tmp/ci_logs/event_records*.jsonl")
     log.info(f"all files {all_db_file}")
     ready_apply_chaos = True
     timeout = 15*60
@@ -97,7 +97,13 @@
     for f in all_db_file:
         while True and (time.time() - t0 < timeout):
             try:
-                df = pd.read_parquet(f)
+                records = []
+                with open(f, 'r') as file:
+                    for line in file:
+                        line = line.strip()
+                        if line:
+                            records.append(json.loads(line))
+                df = pd.DataFrame(records) if records else pd.DataFrame(columns=["event_name", "event_status", "event_ts"])
                 log.debug(f"read {f}:result\n {df}")
                 result = df[(df['event_name'] == 'init_chaos') & (df['event_status'] == 'ready')]
                 if len(result) > 0:
@@ -107,7 +113,7 @@
                 else:
                     ready_apply_chaos = False
             except Exception as e:
-                log.error(f"read_parquet error: {e}")
+                log.error(f"read jsonl error: {e}")
                 ready_apply_chaos = False
             time.sleep(10)
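
For reference, a minimal standalone sketch of the locked-append / tolerant-read JSONL pattern these changes adopt; the file path and helper names below are illustrative, not part of the PR:

import json
import threading

import pandas as pd

_lock = threading.Lock()
RECORDS_FILE = "/tmp/records_demo.jsonl"  # illustrative path, not the PR's


def append_record(record: dict) -> None:
    # One JSON object per line; the lock serializes concurrent writers
    # so appended lines never interleave.
    with _lock:
        with open(RECORDS_FILE, "a") as f:
            f.write(json.dumps(record) + "\n")


def read_records() -> pd.DataFrame:
    # Tolerant read: a missing file or blank lines degrade to an empty
    # DataFrame instead of raising, mirroring the checker's defensive reads.
    try:
        with _lock, open(RECORDS_FILE, "r") as f:
            rows = [json.loads(line) for line in f if line.strip()]
    except FileNotFoundError:
        rows = []
    return pd.DataFrame(rows)


append_record({"event_name": "init_chaos", "event_status": "ready"})
print(read_records())

Unlike parquet-append via fastparquet, line-oriented appends need no extra dependency and stay readable even if a run is killed mid-write: a truncated final line is simply skipped on read.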