From e3a85be43577e225216a4672eaa3bd275c879062 Mon Sep 17 00:00:00 2001
From: zhuwenxing
Date: Tue, 30 Dec 2025 14:13:21 +0800
Subject: [PATCH] test: replace parquet with jsonl for EventRecords and RequestRecords in checker (#46671)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

/kind improvement

- Core invariant: the tests' persistence of EventRecords and RequestRecords
  must be append-safe under concurrent writers. This PR replaces Parquet with
  JSONL and uses per-file locks plus explicit buffer flushes to guarantee
  atomic, append-safe writes (EventRecords appends one JSON line per event
  under event_lock; RequestRecords buffers under request_lock and flushes to
  the file when the buffer reaches its threshold or on sink()).

- Logic removed/simplified and rationale: the DataFrame-based Parquet
  append/read logic (pyarrow/fastparquet) and the implicit Parquet buffering
  were removed in favor of simple line-oriented JSON writes and explicit
  buffer management. The complex Parquet append/merge paths were redundant
  because Parquet append under concurrent test-writer patterns caused
  corruption; JSONL removes the append-mode complexity and the
  Parquet-specific buffering/serialization code.

- Why no data loss or behavior regression (concrete code paths):
  EventRecords.insert writes a complete JSON object per event under
  event_lock to /tmp/ci_logs/event_records_*.jsonl, and get_records_df reads
  every JSON line under the same lock (or returns an empty DataFrame with the
  same schema on FileNotFoundError or any other error), preserving the fields
  event_name, event_status and event_ts. RequestRecords.insert appends to an
  in-memory buffer under request_lock and triggers _flush_buffer() when
  len(buffer) >= 100; _flush_buffer() writes each buffered record as one JSON
  line to /tmp/ci_logs/request_records_*.jsonl and clears the buffer; sink()
  calls _flush_buffer() under request_lock before get_records_df() reads the
  file, so all buffered records are persisted before reads. Both read paths
  handle FileNotFoundError and other exceptions by returning empty DataFrames
  with identical column schemas, so external callers see the same API and no
  silent record loss.

- Enhancement summary: replaces the flaky Parquet append/read with JSONL plus
  explicit locking and deterministic flush semantics, removing the root cause
  of Parquet append corruption in tests while keeping the DataFrame-based
  analysis consumers unchanged (get_records_df returns equivalent schemas).
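For reviewers, a minimal standalone sketch of the lock + threshold-flush JSONL
pattern this change adopts. It is illustrative only, not the checker API:
RECORDS_FILE, append_record, read_records and _flush are hypothetical names,
while the 100-record threshold and the one-JSON-object-per-line format mirror
the patch below.

    import json
    import os
    import threading

    RECORDS_FILE = "/tmp/ci_logs/demo_records.jsonl"  # hypothetical path for this sketch
    FLUSH_THRESHOLD = 100                             # same threshold the patch uses

    _lock = threading.Lock()
    _buffer = []

    def append_record(record: dict):
        # Buffer under the lock and flush once the threshold is reached,
        # analogous to RequestRecords.insert() / _flush_buffer().
        with _lock:
            _buffer.append(record)
            if len(_buffer) >= FLUSH_THRESHOLD:
                _flush()

    def _flush():
        # Caller must hold _lock. Writing one complete JSON object per line
        # keeps concurrent appends from corrupting previously written rows.
        os.makedirs(os.path.dirname(RECORDS_FILE), exist_ok=True)
        with open(RECORDS_FILE, "a") as f:
            for rec in _buffer:
                f.write(json.dumps(rec) + "\n")
        _buffer.clear()

    def read_records():
        # Flush pending rows, then read every JSON line back; a missing file
        # simply means "no records yet", matching get_records_df() semantics.
        with _lock:
            _flush()
            try:
                with open(RECORDS_FILE) as f:
                    return [json.loads(line) for line in f if line.strip()]
            except FileNotFoundError:
                return []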
Signed-off-by: zhuwenxing
---
 tests/python_client/chaos/checker.py               | 100 ++++++++++--------
 tests/python_client/chaos/requirements.txt         |   5 +-
 .../chaos/testcases/test_get_collections.py        |   4 +-
 tests/python_client/utils/util_common.py           |  12 ++-
 4 files changed, 70 insertions(+), 51 deletions(-)

diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py
index 3cd18defc9..4b3eeedcd9 100644
--- a/tests/python_client/chaos/checker.py
+++ b/tests/python_client/chaos/checker.py
@@ -53,37 +53,44 @@ class Singleton(type):
 
 
 class EventRecords(metaclass=Singleton):
     def __init__(self):
-        self.file_name = f"/tmp/ci_logs/event_records_{uuid.uuid4()}.parquet"
-        self.created_file = False
+        self.file_name = f"/tmp/ci_logs/event_records_{uuid.uuid4()}.jsonl"
 
     def insert(self, event_name, event_status, ts=None):
         log.info(f"insert event: {event_name}, {event_status}")
         insert_ts = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') if ts is None else ts
         data = {
-            "event_name": [event_name],
-            "event_status": [event_status],
-            "event_ts": [insert_ts]
+            "event_name": event_name,
+            "event_status": event_status,
+            "event_ts": insert_ts
         }
-        df = pd.DataFrame(data)
-        if not self.created_file:
-            with event_lock:
-                df.to_parquet(self.file_name, engine='fastparquet')
-                self.created_file = True
-        else:
-            with event_lock:
-                df.to_parquet(self.file_name, engine='fastparquet', append=True)
+        with event_lock:
+            with open(self.file_name, 'a') as f:
+                f.write(json.dumps(data) + '\n')
 
     def get_records_df(self):
-        df = pd.read_parquet(self.file_name)
-        return df
+        with event_lock:
+            try:
+                records = []
+                with open(self.file_name, 'r') as f:
+                    for line in f:
+                        line = line.strip()
+                        if line:
+                            records.append(json.loads(line))
+                if not records:
+                    return pd.DataFrame(columns=["event_name", "event_status", "event_ts"])
+                return pd.DataFrame(records)
+            except FileNotFoundError:
+                return pd.DataFrame(columns=["event_name", "event_status", "event_ts"])
+            except Exception as e:
+                log.warning(f"EventRecords read error: {e}")
+                return pd.DataFrame(columns=["event_name", "event_status", "event_ts"])
 
 
 class RequestRecords(metaclass=Singleton):
     def __init__(self):
-        self.file_name = f"/tmp/ci_logs/request_records_{uuid.uuid4()}.parquet"
+        self.file_name = f"/tmp/ci_logs/request_records_{uuid.uuid4()}.jsonl"
         self.buffer = []
-        self.created_file = False
 
     def insert(self, operation_name, collection_name, start_time, time_cost, result):
         data = {
@@ -93,38 +100,45 @@ class RequestRecords(metaclass=Singleton):
             "time_cost": time_cost,
             "result": result
         }
-        self.buffer.append(data)
-        if len(self.buffer) > 100:
-            df = pd.DataFrame(self.buffer)
-            if not self.created_file:
-                with request_lock:
-                    df.to_parquet(self.file_name, engine='fastparquet')
-                    self.created_file = True
-            else:
-                with request_lock:
-                    df.to_parquet(self.file_name, engine='fastparquet', append=True)
-            self.buffer = []
+        with request_lock:
+            self.buffer.append(data)
+            if len(self.buffer) >= 100:
+                self._flush_buffer()
 
-    def sink(self):
-        if len(self.buffer) == 0:
+    def _flush_buffer(self):
+        """Write the buffer to file (caller must hold request_lock)."""
+        if not self.buffer:
             return
         try:
-            df = pd.DataFrame(self.buffer)
+            with open(self.file_name, 'a') as f:
+                for record in self.buffer:
+                    f.write(json.dumps(record) + '\n')
+            self.buffer = []
         except Exception as e:
-            log.error(f"convert buffer {self.buffer} to dataframe error: {e}")
-            return
-        if not self.created_file:
-            with request_lock:
-                df.to_parquet(self.file_name, engine='fastparquet')
-                self.created_file = True
-        else:
-            with request_lock:
-                df.to_parquet(self.file_name, engine='fastparquet', append=True)
+            log.error(f"RequestRecords flush error: {e}")
+
+    def sink(self):
+        with request_lock:
+            self._flush_buffer()
 
     def get_records_df(self):
         self.sink()
-        df = pd.read_parquet(self.file_name)
-        return df
+        with request_lock:
+            try:
+                records = []
+                with open(self.file_name, 'r') as f:
+                    for line in f:
+                        line = line.strip()
+                        if line:
+                            records.append(json.loads(line))
+                if not records:
+                    return pd.DataFrame(columns=["operation_name", "collection_name", "start_time", "time_cost", "result"])
+                return pd.DataFrame(records)
+            except FileNotFoundError:
+                return pd.DataFrame(columns=["operation_name", "collection_name", "start_time", "time_cost", "result"])
+            except Exception as e:
+                log.warning(f"RequestRecords read error: {e}")
+                return pd.DataFrame(columns=["operation_name", "collection_name", "start_time", "time_cost", "result"])
 
 
 class ResultAnalyzer:
@@ -1281,7 +1295,6 @@ class InsertChecker(Checker):
         self.scale = 1 * 10 ** 6
         self.start_time_stamp = int(time.time() * self.scale)  # us
         self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}'
-        self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet"
 
     @trace()
     def insert_entities(self):
@@ -1368,7 +1381,6 @@ class InsertFreshnessChecker(Checker):
         self.scale = 1 * 10 ** 6
         self.start_time_stamp = int(time.time() * self.scale)  # us
         self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}'
-        self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet"
 
     def insert_entities(self):
         data = cf.gen_row_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.get_schema())
diff --git a/tests/python_client/chaos/requirements.txt b/tests/python_client/chaos/requirements.txt
index 34ca43c3a5..310d805c50 100644
--- a/tests/python_client/chaos/requirements.txt
+++ b/tests/python_client/chaos/requirements.txt
@@ -1,5 +1,4 @@
-# for test result anaylszer
+# for test result analyzer
 prettytable==3.8.0
-pyarrow==14.0.1
-fastparquet==2023.7.0
\ No newline at end of file
+pyarrow==14.0.1
\ No newline at end of file
diff --git a/tests/python_client/chaos/testcases/test_get_collections.py b/tests/python_client/chaos/testcases/test_get_collections.py
index a90c00f2e6..5ab769a7ac 100644
--- a/tests/python_client/chaos/testcases/test_get_collections.py
+++ b/tests/python_client/chaos/testcases/test_get_collections.py
@@ -1,5 +1,6 @@
 import time
 import json
+import os
 from collections import defaultdict
 import pytest
 from pymilvus import Collection
@@ -37,8 +38,9 @@ class TestGetCollections(TestcaseBase):
         data = {
             "all": selected_collections,
         }
+        os.makedirs("/tmp/ci_logs", exist_ok=True)
         with open("/tmp/ci_logs/chaos_test_all_collections.json", "w") as f:
-            f.write(json.dumps(data))
+            json.dump(data, f)
         log.info(f"write {len(selected_collections)} collections to /tmp/ci_logs/chaos_test_all_collections.json")
         collections_in_json = get_chaos_test_collections()
         assert len(selected_collections) == len(collections_in_json)
diff --git a/tests/python_client/utils/util_common.py b/tests/python_client/utils/util_common.py
index b7d12d3fe7..76fc126ed3 100644
--- a/tests/python_client/utils/util_common.py
+++ b/tests/python_client/utils/util_common.py
@@ -89,7 +89,7 @@ def get_chaos_test_collections():
 
 
 def wait_signal_to_apply_chaos():
-    all_db_file = glob.glob("/tmp/ci_logs/event_records*.parquet")
+    all_db_file = glob.glob("/tmp/ci_logs/event_records*.jsonl")
     log.info(f"all files {all_db_file}")
     ready_apply_chaos = True
     timeout = 15*60
@@ -97,7 +97,13 @@ def wait_signal_to_apply_chaos():
     for f in all_db_file:
         while True and (time.time() - t0 < timeout):
             try:
-                df = pd.read_parquet(f)
+                records = []
+                with open(f, 'r') as file:
+                    for line in file:
+                        line = line.strip()
+                        if line:
+                            records.append(json.loads(line))
+                df = pd.DataFrame(records) if records else pd.DataFrame(columns=["event_name", "event_status", "event_ts"])
                 log.debug(f"read {f}:result\n {df}")
                 result = df[(df['event_name'] == 'init_chaos') & (df['event_status'] == 'ready')]
                 if len(result) > 0:
@@ -107,7 +113,7 @@ def wait_signal_to_apply_chaos():
             else:
                 ready_apply_chaos = False
         except Exception as e:
-            log.error(f"read_parquet error: {e}")
+            log.error(f"read jsonl error: {e}")
             ready_apply_chaos = False
         time.sleep(10)