milvus/tests/python_client/utils/util_common.py
zhuwenxing e3a85be435
test: replace parquet with jsonl for EventRecords and RequestRecords in checker (#46671)
/kind improvement

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
- Core invariant: tests' persistence of EventRecords and RequestRecords
must be append-safe under concurrent writers; this PR replaces Parquet
with JSONL and uses per-file locks and explicit buffer flushes to
guarantee atomic, append-safe writes (EventRecords uses event_lock +
append per line; RequestRecords buffers under request_lock and flushes
to file when threshold or on sink()).

- Logic removed/simplified and rationale: DataFrame-based parquet
append/read logic (pyarrow/fastparquet) and implicit parquet buffering
were removed in favor of simple line-oriented JSON writes and explicit
buffer management. The complex Parquet append/merge paths were redundant
because parquet append under concurrent test-writer patterns caused
corruption; JSONL removes the append-mode complexity and the
parquet-specific buffering/serialization code.

- Why no data loss or behavior regression (concrete code paths):
EventRecords.insert writes a complete JSON object per event under
event_lock to /tmp/ci_logs/event_records_*.jsonl and get_records_df
reads every JSON line under the same lock (or returns an empty DataFrame
with the same schema on FileNotFound/Error), preserving all fields
event_name/event_status/event_ts. RequestRecords.insert appends to an
in-memory buffer under request_lock and triggers _flush_buffer() when
len(buffer) >= 100; _flush_buffer() writes each buffered JSON line to
/tmp/ci_logs/request_records_*.jsonl and clears the buffer; sink() calls
_flush_buffer() under request_lock before get_records_df() reads the
file — ensuring all buffered records are persisted before reads. Both
read paths handle FileNotFoundError and exceptions by returning empty
DataFrames with identical column schemas, so external callers see the
same API and no silent record loss.

- Enhancement summary (concrete): Replaces flaky Parquet append/read
with JSONL + explicit locking and deterministic flush semantics,
removing the root cause of parquet append corruption in tests while
keeping the original DataFrame-based analysis consumers unchanged
(get_records_df returns equivalent schemas).
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
2025-12-30 14:13:21 +08:00

138 lines
4.3 KiB
Python

import glob
import time
from yaml import full_load
import json
import pandas as pd
from utils.util_log import test_log as log
def gen_experiment_config(yaml):
"""load the yaml file of chaos experiment"""
with open(yaml) as f:
_config = full_load(f)
f.close()
return _config
def findkeys(node, kv):
# refer to https://stackoverflow.com/questions/9807634/find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists
if isinstance(node, list):
for i in node:
for x in findkeys(i, kv):
yield x
elif isinstance(node, dict):
if kv in node:
yield node[kv]
for j in node.values():
for x in findkeys(j, kv):
yield x
def update_key_value(node, modify_k, modify_v):
# update the value of modify_k to modify_v
if isinstance(node, list):
for i in node:
update_key_value(i, modify_k, modify_v)
elif isinstance(node, dict):
if modify_k in node:
node[modify_k] = modify_v
for j in node.values():
update_key_value(j, modify_k, modify_v)
return node
def update_key_name(node, modify_k, modify_k_new):
# update the name of modify_k to modify_k_new
if isinstance(node, list):
for i in node:
update_key_name(i, modify_k, modify_k_new)
elif isinstance(node, dict):
if modify_k in node:
value_backup = node[modify_k]
del node[modify_k]
node[modify_k_new] = value_backup
for j in node.values():
update_key_name(j, modify_k, modify_k_new)
return node
def get_collections(file_name="all_collections.json"):
try:
with open(f"/tmp/ci_logs/{file_name}", "r") as f:
data = json.load(f)
collections = data["all"]
except Exception as e:
log.error(f"get_all_collections error: {e}")
return []
return collections
def get_deploy_test_collections():
try:
with open("/tmp/ci_logs/deploy_test_all_collections.json", "r") as f:
data = json.load(f)
collections = data["all"]
except Exception as e:
log.error(f"get_all_collections error: {e}")
return []
return collections
def get_chaos_test_collections():
try:
with open("/tmp/ci_logs/chaos_test_all_collections.json", "r") as f:
data = json.load(f)
collections = data["all"]
except Exception as e:
log.error(f"get_all_collections error: {e}")
return []
return collections
def wait_signal_to_apply_chaos():
all_db_file = glob.glob("/tmp/ci_logs/event_records*.jsonl")
log.info(f"all files {all_db_file}")
ready_apply_chaos = True
timeout = 15*60
t0 = time.time()
for f in all_db_file:
while True and (time.time() - t0 < timeout):
try:
records = []
with open(f, 'r') as file:
for line in file:
line = line.strip()
if line:
records.append(json.loads(line))
df = pd.DataFrame(records) if records else pd.DataFrame(columns=["event_name", "event_status", "event_ts"])
log.debug(f"read {f}:result\n {df}")
result = df[(df['event_name'] == 'init_chaos') & (df['event_status'] == 'ready')]
if len(result) > 0:
log.info(f"{f}: {result}")
ready_apply_chaos = True
break
else:
ready_apply_chaos = False
except Exception as e:
log.error(f"read jsonl error: {e}")
ready_apply_chaos = False
time.sleep(10)
return ready_apply_chaos
if __name__ == "__main__":
d = { "id" : "abcde",
"key1" : "blah",
"key2" : "blah blah",
"nestedlist" : [
{ "id" : "qwerty",
"nestednestedlist" : [
{ "id" : "xyz", "keyA" : "blah blah blah" },
{ "id" : "fghi", "keyZ" : "blah blah blah" }],
"anothernestednestedlist" : [
{ "id" : "asdf", "keyQ" : "blah blah" },
{ "id" : "yuiop", "keyW" : "blah" }] } ] }
print(list(findkeys(d, 'id')))
update_key_value(d, "none_id", "ccc")
print(d)