feat: [StorageV2] cmd binlog tool (#43648)

related: #39173 

Core Features
* Parquet File Analysis: Analyze Milvus binlog Parquet files with
metadata extraction
* MinIO Integration: Direct connection to MinIO storage for remote file
analysis
* Vector Data Deserialization: Specialized handling of Milvus vector
data in binlog files
* Interactive CLI: Command-line interface with interactive exploration

Analysis Capabilities
* Metadata & Vector Analysis: Extract schema info, row counts, and
vector statistics
* Data Export: Export data to JSON format with configurable limits
* Query Functionality: Search for specific records by ID
* Batch Processing: Analyze multiple Parquet files simultaneously

User Experience
* Verbose Output: Detailed logging for debugging
* Error Handling: Robust error handling for file access and parsing
* Flexible Output: Support for single file and batch analysis formats
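
Besides the CLI entry points, the parquet_analyzer package added in this PR can be driven programmatically. A minimal sketch (the binlog path is a placeholder, and the snippet assumes it runs from the tool's directory so the package is importable):

```python
from parquet_analyzer import ParquetAnalyzer

# "insert_log_0" is a placeholder for any downloaded Milvus binlog (Parquet) file.
analyzer = ParquetAnalyzer("insert_log_0")
if analyzer.load():
    analyzer.print_summary()                           # metadata + vector statistics
    report = analyzer.export_analysis("report.json")   # full analysis as JSON
    print(f"Analysis written to {report}")
```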

---------

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
Co-authored-by: nico <109071306+NicoYuan1986@users.noreply.github.com>
10 changed files with 3796 additions and 0 deletions

cmd/tools/binlogv2/.gitignore (new file)

@@ -0,0 +1,44 @@
# Python files
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
.pytest_cache
**/.pytest_cache
# Virtual environments
venv/
.venv/
env/
ENV/
# IDE files
.idea/
.vscode/
*.swp
*.swo
# OS files
.DS_Store
Thumbs.db
# Logs and output files
*.log
nohup.out
logs/
test_out/
# Data files
*idmap*.txt
*.hdf5
*.npy
*.numpy
# Coverage reports
.coverage
htmlcov/
# Temporary files
*.tmp
*.temp

export_to_json.py (new file)

@@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""
Parquet to JSON Export Tool
Specialized for exporting parquet file data to JSON format
"""
import argparse
import json
import sys
from pathlib import Path
import pandas as pd
import pyarrow.parquet as pq
from parquet_analyzer import VectorDeserializer
def export_parquet_to_json(parquet_file: str, output_file: str = None,
num_rows: int = None, start_row: int = 0,
include_vectors: bool = True,
vector_format: str = "deserialized",
pretty_print: bool = True):
"""
Export parquet file to JSON format
Args:
parquet_file: parquet file path
output_file: output JSON file path
num_rows: number of rows to export (None means all)
start_row: starting row number (0-based)
include_vectors: whether to include vector data
vector_format: vector format ("deserialized", "hex", "both")
pretty_print: whether to pretty print output
"""
print(f"📊 Exporting parquet file: {Path(parquet_file).name}")
print("=" * 60)
try:
# Read parquet file
table = pq.read_table(parquet_file)
df = table.to_pandas()
total_rows = len(df)
print(f"📋 File Information:")
print(f" Total Rows: {total_rows:,}")
print(f" Columns: {len(df.columns)}")
print(f" Column Names: {', '.join(df.columns)}")
# Determine export row range
if num_rows is None:
end_row = total_rows
num_rows = total_rows - start_row
else:
end_row = min(start_row + num_rows, total_rows)
num_rows = end_row - start_row
if start_row >= total_rows:
print(f"❌ Starting row {start_row} exceeds file range (0-{total_rows-1})")
return False
print(f"📈 Export Range: Row {start_row} to Row {end_row-1} (Total {num_rows} rows)")
# Get data for specified range
data_subset = df.iloc[start_row:end_row]
# Process data
processed_data = []
for idx, row in data_subset.iterrows():
row_dict = {}
for col_name, value in row.items():
if isinstance(value, bytes) and include_vectors:
# Process vector columns
try:
vec_analysis = VectorDeserializer.deserialize_with_analysis(value, col_name)
if vec_analysis and vec_analysis['deserialized']:
if vector_format == "deserialized":
row_dict[col_name] = {
"type": vec_analysis['vector_type'],
"dimension": vec_analysis['dimension'],
"data": vec_analysis['deserialized']
}
elif vector_format == "hex":
row_dict[col_name] = {
"type": vec_analysis['vector_type'],
"dimension": vec_analysis['dimension'],
"hex": value.hex()
}
elif vector_format == "both":
row_dict[col_name] = {
"type": vec_analysis['vector_type'],
"dimension": vec_analysis['dimension'],
"data": vec_analysis['deserialized'],
"hex": value.hex()
}
else:
row_dict[col_name] = {
"type": "binary",
"size": len(value),
"hex": value.hex()
}
except Exception as e:
row_dict[col_name] = {
"type": "binary",
"size": len(value),
"hex": value.hex(),
"error": str(e)
}
elif isinstance(value, bytes) and not include_vectors:
# When not including vectors, only show basic information
row_dict[col_name] = {
"type": "binary",
"size": len(value),
"hex": value.hex()[:50] + "..." if len(value.hex()) > 50 else value.hex()
}
else:
row_dict[col_name] = value
processed_data.append(row_dict)
# Prepare output structure
result = {
"export_info": {
"source_file": Path(parquet_file).name,
"total_rows": total_rows,
"exported_rows": len(processed_data),
"start_row": start_row,
"end_row": end_row - 1,
"columns": list(df.columns),
"vector_format": vector_format if include_vectors else "excluded"
},
"data": processed_data
}
# Determine output file
if not output_file:
base_name = Path(parquet_file).stem
output_file = f"{base_name}_export_{start_row}-{end_row-1}.json"
# Save to file
with open(output_file, 'w', encoding='utf-8') as f:
if pretty_print:
json.dump(result, f, ensure_ascii=False, indent=2)
else:
json.dump(result, f, ensure_ascii=False, separators=(',', ':'))
# Output statistics
file_size = Path(output_file).stat().st_size
print(f"✅ Export completed!")
print(f"📁 Output file: {output_file}")
print(f"📊 File size: {file_size:,} bytes ({file_size/1024:.2f} KB)")
print(f"📈 Exported rows: {len(processed_data)}")
return True
except Exception as e:
print(f"❌ Export failed: {e}")
return False
def main():
"""Main function"""
parser = argparse.ArgumentParser(
description="Parquet to JSON Export Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Usage Examples:
python export_to_json.py test_large_batch.parquet
python export_to_json.py test_large_batch.parquet --rows 100 --output data.json
python export_to_json.py test_large_batch.parquet --start 1000 --rows 50
python export_to_json.py test_large_batch.parquet --vector-format hex
"""
)
parser.add_argument(
"parquet_file",
help="Parquet file path"
)
parser.add_argument(
"--output", "-o",
help="Output JSON file path"
)
parser.add_argument(
"--rows", "-r",
type=int,
help="Number of rows to export (default: all)"
)
parser.add_argument(
"--start", "-s",
type=int,
default=0,
help="Starting row number (default: 0)"
)
parser.add_argument(
"--no-vectors",
action="store_true",
help="Exclude vector data"
)
parser.add_argument(
"--vector-format",
choices=["deserialized", "hex", "both"],
default="deserialized",
help="Vector data format (default: deserialized)"
)
parser.add_argument(
"--no-pretty",
action="store_true",
help="Don't pretty print JSON output (compressed format)"
)
args = parser.parse_args()
# Check if file exists
if not Path(args.parquet_file).exists():
print(f"❌ File does not exist: {args.parquet_file}")
sys.exit(1)
# Execute export
success = export_parquet_to_json(
parquet_file=args.parquet_file,
output_file=args.output,
num_rows=args.rows,
start_row=args.start,
include_vectors=not args.no_vectors,
vector_format=args.vector_format,
pretty_print=not args.no_pretty
)
if not success:
sys.exit(1)
if __name__ == "__main__":
main()
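The exporter above can also be imported and called directly; a brief sketch (the input and output file names are placeholders, and the parquet_analyzer package must be importable since export_to_json.py depends on it):

```python
from export_to_json import export_parquet_to_json

ok = export_parquet_to_json(
    "insert_log_segment.parquet",   # placeholder input path
    output_file="first_rows.json",
    num_rows=5,                     # export only the first five rows
    vector_format="both",           # keep deserialized values and the raw hex
)
if not ok:
    raise SystemExit(1)
```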

minio_client.py (new file)

@@ -0,0 +1,688 @@
#!/usr/bin/env python3
"""
MinIO Client for Parquet Analysis
Downloads files from MinIO and passes them to parquet_analyzer_cli.py for analysis
"""
import argparse
import sys
import os
import tempfile
import subprocess
from pathlib import Path
from typing import Optional, List, Dict, Any
import json
import datetime
try:
from minio import Minio
from minio.error import S3Error
except ImportError:
print("❌ MinIO client library not found. Please install it:")
print(" pip install minio")
sys.exit(1)
class MinioParquetAnalyzer:
"""MinIO client for downloading and analyzing Parquet files and Milvus binlog files"""
def __init__(self, endpoint: str, port: int = 9000, secure: bool = False,
access_key: str = None, secret_key: str = None):
"""
Initialize MinIO client
Args:
endpoint: MinIO server endpoint (hostname/IP)
port: MinIO server port (default: 9000)
secure: Use HTTPS (default: False)
access_key: MinIO access key (optional; may be None for public buckets)
secret_key: MinIO secret key (optional; may be None for public buckets)
"""
self.endpoint = endpoint
self.port = port
self.secure = secure
self.access_key = access_key
self.secret_key = secret_key
# Initialize MinIO client
try:
self.client = Minio(
f"{endpoint}:{port}",
access_key=access_key,
secret_key=secret_key,
secure=secure
)
print(f"✅ Connected to MinIO server: {endpoint}:{port}")
except Exception as e:
print(f"❌ Failed to connect to MinIO server: {e}")
sys.exit(1)
def list_buckets(self) -> List[Dict[str, Any]]:
"""List all buckets"""
try:
buckets = []
for bucket in self.client.list_buckets():
buckets.append({
'name': bucket.name,
'creation_date': bucket.creation_date.isoformat() if bucket.creation_date else None
})
return buckets
except S3Error as e:
print(f"❌ Failed to list buckets: {e}")
return []
def list_objects(self, bucket_name: str, prefix: str = "", recursive: bool = True) -> List[Dict[str, Any]]:
"""List objects in a bucket"""
try:
objects = []
for obj in self.client.list_objects(bucket_name, prefix=prefix, recursive=recursive):
objects.append({
'name': obj.object_name,
'size': obj.size,
'last_modified': obj.last_modified.isoformat() if obj.last_modified else None,
'etag': obj.etag
})
return objects
except S3Error as e:
print(f"❌ Failed to list objects in bucket '{bucket_name}': {e}")
return []
def filter_objects(self, objects: List[Dict[str, Any]],
prefix: str = None, suffix: str = None, contains: str = None,
size_min: int = None, size_max: int = None,
date_from: str = None, date_to: str = None) -> List[Dict[str, Any]]:
"""
Filter objects based on various criteria
Args:
objects: List of objects to filter
prefix: Filter by object name prefix
suffix: Filter by object name suffix
contains: Filter by object name containing string
size_min: Minimum size in MB
size_max: Maximum size in MB
date_from: Filter objects modified after date (YYYY-MM-DD)
date_to: Filter objects modified before date (YYYY-MM-DD)
Returns:
Filtered list of objects
"""
filtered = objects
if prefix:
filtered = [obj for obj in filtered if obj['name'].startswith(prefix)]
if suffix:
filtered = [obj for obj in filtered if obj['name'].endswith(suffix)]
if contains:
# Support complex logic with parentheses, OR (comma) and AND (&) logic
filtered = self._apply_contains_filter(filtered, contains)
if size_min is not None:
size_min_bytes = size_min * 1024 * 1024
filtered = [obj for obj in filtered if obj['size'] >= size_min_bytes]
if size_max is not None:
size_max_bytes = size_max * 1024 * 1024
filtered = [obj for obj in filtered if obj['size'] <= size_max_bytes]
if date_from:
try:
from_date = datetime.datetime.fromisoformat(date_from).date()
filtered = [obj for obj in filtered
if obj['last_modified'] and
datetime.datetime.fromisoformat(obj['last_modified']).date() >= from_date]
except ValueError:
print(f"⚠️ Invalid date format for --filter-date-from: {date_from}")
if date_to:
try:
to_date = datetime.datetime.fromisoformat(date_to).date()
filtered = [obj for obj in filtered
if obj['last_modified'] and
datetime.datetime.fromisoformat(obj['last_modified']).date() <= to_date]
except ValueError:
print(f"⚠️ Invalid date format for --filter-date-to: {date_to}")
return filtered
def _apply_contains_filter(self, objects: List[Dict[str, Any]], contains_expr: str) -> List[Dict[str, Any]]:
"""
Apply complex contains filter with parentheses support
Args:
objects: List of objects to filter
contains_expr: Complex contains expression with parentheses, OR (comma), and AND (&) logic
Returns:
Filtered list of objects
"""
def evaluate_expression(expr: str, obj_name: str) -> bool:
"""Evaluate a single expression for an object name"""
expr = expr.strip()
# Handle parentheses first
if '(' in expr and ')' in expr:
# Find the innermost parentheses
start = expr.rfind('(')
end = expr.find(')', start)
if start != -1 and end != -1:
# Extract the content inside parentheses
inner_expr = expr[start+1:end]
# Evaluate the inner expression
inner_result = evaluate_expression(inner_expr, obj_name)
# Replace the parentheses expression with the result
new_expr = expr[:start] + ('true' if inner_result else 'false') + expr[end+1:]
return evaluate_expression(new_expr, obj_name)
# Handle AND logic (&)
if '&' in expr:
parts = [p.strip() for p in expr.split('&')]
return all(evaluate_expression(part, obj_name) for part in parts)
# Handle OR logic (,)
if ',' in expr:
parts = [p.strip() for p in expr.split(',')]
return any(evaluate_expression(part, obj_name) for part in parts)
# Boolean placeholders produced by an evaluated group
if expr == 'true':
return True
if expr == 'false':
return False
# Single keyword
return expr in obj_name
return [obj for obj in objects if evaluate_expression(contains_expr, obj['name'])]
def download_file(self, bucket_name: str, object_name: str, local_path: str = None) -> Optional[str]:
"""
Download a file from MinIO
Args:
bucket_name: Name of the bucket
object_name: Name of the object in the bucket
local_path: Local path to save the file (optional, will use temp file if not provided)
Returns:
Local file path if successful, None otherwise
"""
try:
if not local_path:
# Create temporary file
temp_dir = tempfile.gettempdir()
filename = Path(object_name).name
local_path = os.path.join(temp_dir, f"minio_{filename}")
print(f"📥 Downloading {object_name} from bucket {bucket_name}...")
self.client.fget_object(bucket_name, object_name, local_path)
print(f"✅ Downloaded to: {local_path}")
return local_path
except S3Error as e:
print(f"❌ Failed to download {object_name}: {e}")
return None
except Exception as e:
print(f"❌ Unexpected error downloading {object_name}: {e}")
return None
def analyze_parquet_from_minio(self, bucket_name: str, object_name: str,
command: str = "analyze", output_file: str = None,
rows: int = 10, verbose: bool = False,
id_value: str = None, id_column: str = None) -> bool:
"""
Download Parquet file from MinIO and analyze it using parquet_analyzer_cli.py
Args:
bucket_name: Name of the bucket
object_name: Name of the object in the bucket
command: Analysis command (analyze, metadata, vector, export, data)
output_file: Output file path for export/data commands
rows: Number of rows to export (for data command)
verbose: Verbose output
id_value: ID value to look up (for the query command)
id_column: ID column name (auto-detected if not specified)
Returns:
True if successful, False otherwise
"""
# Download the file
local_path = self.download_file(bucket_name, object_name)
if not local_path:
return False
try:
# Build command for parquet_analyzer_cli.py
cli_script = Path(__file__).parent / "parquet_analyzer_cli.py"
if not cli_script.exists():
print(f"❌ parquet_analyzer_cli.py not found at: {cli_script}")
return False
cmd = [sys.executable, str(cli_script), command, local_path]
# Add optional arguments
if output_file:
cmd.extend(["--output", output_file])
if rows != 10:
cmd.extend(["--rows", str(rows)])
if verbose:
cmd.append("--verbose")
if id_value:
cmd.extend(["--id-value", str(id_value)])
if id_column:
cmd.extend(["--id-column", id_column])
print(f"🔍 Running analysis command: {' '.join(cmd)}")
print("=" * 60)
# Execute the command
result = subprocess.run(cmd, capture_output=False, text=True)
if result.returncode == 0:
print("✅ Analysis completed successfully")
return True
else:
print(f"❌ Analysis failed with return code: {result.returncode}")
return False
except Exception as e:
print(f"❌ Failed to run analysis: {e}")
return False
finally:
# Clean up temporary file if it was created
if local_path and local_path.startswith(tempfile.gettempdir()):
try:
os.remove(local_path)
print(f"🧹 Cleaned up temporary file: {local_path}")
except:
pass
def interactive_mode(self):
"""Interactive mode for browsing and analyzing files"""
print("🔍 MinIO Interactive Mode")
print("=" * 40)
# List buckets
buckets = self.list_buckets()
if not buckets:
print("❌ No buckets found or access denied")
return
print(f"📦 Found {len(buckets)} bucket(s):")
for i, bucket in enumerate(buckets):
print(f" {i+1}. {bucket['name']}")
# Select bucket
while True:
try:
choice = input(f"\nSelect bucket (1-{len(buckets)}) or 'q' to quit: ").strip()
if choice.lower() == 'q':
return
bucket_idx = int(choice) - 1
if 0 <= bucket_idx < len(buckets):
selected_bucket = buckets[bucket_idx]['name']
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
# Main interactive loop
while True:
print(f"\n📁 Current bucket: '{selected_bucket}'")
print("=" * 50)
# List objects in selected bucket
print(f"📁 Objects in bucket '{selected_bucket}':")
objects = self.list_objects(selected_bucket)
if not objects:
print("❌ No objects found in this bucket")
return
# Apply filters if user wants to
print("\n🔍 Filter options:")
print(" 1. No filter (show all)")
print(" 2. Apply custom filters")
filter_choice = input("Select filter option (1-2): ").strip()
if filter_choice == "2":
print("\n📋 Available filters:")
print(" - prefix: Filter by object name prefix")
print(" - suffix: Filter by object name suffix")
print(" - contains: Filter by object name containing string(s). Use comma for OR logic, & for AND logic, () for grouping")
print(" - size_min: Filter by minimum size in MB")
print(" - size_max: Filter by maximum size in MB")
print(" - date_from: Filter by modification date (YYYY-MM-DD)")
print(" - date_to: Filter by modification date (YYYY-MM-DD)")
prefix = input("Prefix filter (or press Enter to skip): ").strip() or None
suffix = input("Suffix filter (or press Enter to skip): ").strip() or None
contains = input("Contains filter (or press Enter to skip): ").strip() or None
size_min_str = input("Minimum size in MB (or press Enter to skip): ").strip()
size_min = int(size_min_str) if size_min_str else None
size_max_str = input("Maximum size in MB (or press Enter to skip): ").strip()
size_max = int(size_max_str) if size_max_str else None
date_from = input("Date from (YYYY-MM-DD, or press Enter to skip): ").strip() or None
date_to = input("Date to (YYYY-MM-DD, or press Enter to skip): ").strip() or None
original_count = len(objects)
objects = self.filter_objects(
objects, prefix, suffix, contains, size_min, size_max, date_from, date_to
)
if original_count != len(objects):
print(f"🔍 Applied filters: {original_count}{len(objects)} objects")
# Filter Parquet files and Milvus binlog files
# Milvus binlog files don't have .parquet extension but are actually Parquet format
parquet_files = []
for obj in objects:
name = obj['name'].lower()
# Check for .parquet files
if name.endswith('.parquet'):
parquet_files.append(obj)
# Check for Milvus binlog files (insert_log, delta_log, etc.)
elif any(log_type in name for log_type in ['insert_log', 'delta_log', 'stats_log', 'index_files']):
parquet_files.append(obj)
if not parquet_files:
print("❌ No Parquet files or Milvus binlog files found in this bucket")
return
print(f"📊 Found {len(parquet_files)} Parquet file(s):")
for i, obj in enumerate(parquet_files):
size_mb = obj['size'] / (1024 * 1024)
modified_str = ""
if obj['last_modified']:
modified_str = f" (modified: {obj['last_modified'][:10]})"
print(f" {i+1}. {obj['name']} ({size_mb:.2f} MB){modified_str}")
# Select file
while True:
try:
choice = input(f"\nSelect file (1-{len(parquet_files)}) or 'b' to change bucket or 'q' to quit: ").strip()
if choice.lower() == 'q':
return
elif choice.lower() == 'b':
# Go back to bucket selection
break
file_idx = int(choice) - 1
if 0 <= file_idx < len(parquet_files):
selected_file = parquet_files[file_idx]['name']
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
if choice.lower() == 'b':
# Re-select bucket
print(f"\n📦 Available buckets:")
for i, bucket in enumerate(buckets):
print(f" {i+1}. {bucket['name']}")
while True:
try:
choice = input(f"\nSelect bucket (1-{len(buckets)}) or 'q' to quit: ").strip()
if choice.lower() == 'q':
return
bucket_idx = int(choice) - 1
if 0 <= bucket_idx < len(buckets):
selected_bucket = buckets[bucket_idx]['name']
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
continue
# File analysis loop
while True:
print(f"\n📄 Selected file: {selected_file}")
print("-" * 40)
# Select analysis command
commands = ["analyze", "metadata", "vector", "export", "data", "query"]
print(f"🔍 Available analysis commands:")
for i, cmd in enumerate(commands):
print(f" {i+1}. {cmd}")
while True:
try:
choice = input(f"\nSelect command (1-{len(commands)}) or 'f' to change file or 'b' to change bucket or 'q' to quit: ").strip()
if choice.lower() == 'q':
return
elif choice.lower() == 'b':
# Go back to bucket selection
break
elif choice.lower() == 'f':
# Go back to file selection
break
cmd_idx = int(choice) - 1
if 0 <= cmd_idx < len(commands):
selected_command = commands[cmd_idx]
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
if choice.lower() in ['b', 'f']:
break
# Additional options
output_file = None
rows = 10
verbose = False
id_value = None
id_column = None
if selected_command in ["export", "data"]:
output_choice = input("Enter output file path (or press Enter for default): ").strip()
if output_choice:
output_file = output_choice
if selected_command == "data":
rows_choice = input("Enter number of rows to export (default: 10): ").strip()
if rows_choice:
try:
rows = int(rows_choice)
except ValueError:
print("❌ Invalid number, using default: 10")
if selected_command == "query":
id_value_choice = input("Enter ID value to query (or press Enter to see available ID columns): ").strip()
if id_value_choice:
id_value = id_value_choice
id_column_choice = input("Enter ID column name (or press Enter for auto-detection): ").strip()
if id_column_choice:
id_column = id_column_choice
verbose_choice = input("Verbose output? (y/N): ").strip().lower()
verbose = verbose_choice in ['y', 'yes']
# Run analysis
print(f"\n🚀 Starting analysis...")
success = self.analyze_parquet_from_minio(
selected_bucket, selected_file, selected_command,
output_file, rows, verbose, id_value, id_column
)
if success:
print("✅ Analysis completed successfully!")
else:
print("❌ Analysis failed")
# Ask if user wants to continue with the same file
continue_choice = input(f"\nContinue with the same file '{selected_file}'? (y/N): ").strip().lower()
if continue_choice not in ['y', 'yes']:
break
if choice.lower() == 'b':
continue
def main():
"""Main function"""
parser = argparse.ArgumentParser(
description="MinIO Client for Parquet Analysis and Milvus Binlog Analysis",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Interactive mode
python minio_client.py --endpoint localhost --interactive
# Analyze specific Parquet file
python minio_client.py --endpoint localhost --bucket mybucket --object data.parquet --command analyze
# Analyze Milvus binlog file
python minio_client.py --endpoint localhost --bucket a-bucket --object files/insert_log/459761955352871853/459761955352871854/459761955353071864/0 --command analyze
# Query by ID
python minio_client.py --endpoint localhost --bucket a-bucket --object files/insert_log/459761955352871853/459761955352871854/459761955353071864/0 --command query --id-value 123
# Export metadata
python minio_client.py --endpoint localhost --bucket mybucket --object data.parquet --command export --output result.json
# List buckets
python minio_client.py --endpoint localhost --list-buckets
# List objects in bucket
python minio_client.py --endpoint localhost --bucket mybucket --list-objects
# Filter objects by prefix (insert_log files only)
python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-prefix "files/insert_log/"
# Filter objects by size (files larger than 1MB)
python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-size-min 1
# Filter objects by date range
python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-date-from "2024-01-01" --filter-date-to "2024-01-31"
# Combine multiple filters
python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-prefix "files/" --filter-size-min 0.5 --filter-contains "insert"
# Filter with OR logic (files containing 'insert' OR 'delete')
python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "insert,delete"
# Filter with AND logic (files containing 'insert' AND 'log')
python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "insert&log"
# Filter with parentheses grouping ((insert OR delete) AND log)
python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "(insert,delete)&log"
# Complex nested parentheses ((insert OR delete) AND (log OR bin))
python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "(insert,delete)&(log,bin)"
"""
)
# Connection arguments
parser.add_argument("--endpoint", "-e", required=True, help="MinIO server endpoint")
parser.add_argument("--port", "-p", type=int, default=9000, help="MinIO server port (default: 9000)")
parser.add_argument("--secure", "-s", action="store_true", help="Use HTTPS")
parser.add_argument("--access-key", "-a", help="MinIO access key")
parser.add_argument("--secret-key", "-k", help="MinIO secret key")
# Operation arguments
parser.add_argument("--interactive", "-i", action="store_true", help="Interactive mode")
parser.add_argument("--list-buckets", "-b", action="store_true", help="List all buckets")
parser.add_argument("--bucket", help="Bucket name")
parser.add_argument("--list-objects", "-l", action="store_true", help="List objects in bucket")
parser.add_argument("--object", "-o", help="Object name in bucket")
# Filter arguments
parser.add_argument("--filter-prefix", help="Filter objects by prefix (e.g., 'files/insert_log/')")
parser.add_argument("--filter-suffix", help="Filter objects by suffix (e.g., '.parquet')")
parser.add_argument("--filter-contains", help="Filter objects containing specific string(s). Use comma for OR logic (e.g., 'insert,delete'), use & for AND logic (e.g., 'insert&log'), use () for grouping (e.g., '(insert,delete)&log')")
parser.add_argument("--filter-size-min", type=int, help="Filter objects by minimum size in MB")
parser.add_argument("--filter-size-max", type=int, help="Filter objects by maximum size in MB")
parser.add_argument("--filter-date-from", help="Filter objects modified after date (YYYY-MM-DD)")
parser.add_argument("--filter-date-to", help="Filter objects modified before date (YYYY-MM-DD)")
# Analysis arguments
parser.add_argument("--command", "-c", choices=["analyze", "metadata", "vector", "export", "data", "query"],
default="analyze", help="Analysis command (default: analyze)")
parser.add_argument("--output", help="Output file path (for export/data commands)")
parser.add_argument("--rows", "-r", type=int, default=10, help="Number of rows to export (for data command)")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
parser.add_argument("--id-value", "-q", help="ID value to query (for query command)")
parser.add_argument("--id-column", help="ID column name (for query command, auto-detected if not specified)")
args = parser.parse_args()
# Initialize MinIO client
client = MinioParquetAnalyzer(
endpoint=args.endpoint,
port=args.port,
secure=args.secure,
access_key=args.access_key,
secret_key=args.secret_key
)
# Execute requested operation
if args.interactive:
client.interactive_mode()
elif args.list_buckets:
buckets = client.list_buckets()
if buckets:
print(f"📦 Found {len(buckets)} bucket(s):")
for bucket in buckets:
print(f" - {bucket['name']}")
else:
print("❌ No buckets found or access denied")
elif args.list_objects:
if not args.bucket:
print("❌ --bucket is required for --list-objects")
sys.exit(1)
objects = client.list_objects(args.bucket)
if objects:
# Apply filters if specified
original_count = len(objects)
objects = client.filter_objects(
objects,
prefix=args.filter_prefix,
suffix=args.filter_suffix,
contains=args.filter_contains,
size_min=args.filter_size_min,
size_max=args.filter_size_max,
date_from=args.filter_date_from,
date_to=args.filter_date_to
)
if original_count != len(objects):
print(f"🔍 Applied filters: {original_count}{len(objects)} objects")
if objects:
print(f"📁 Found {len(objects)} object(s) in bucket '{args.bucket}':")
for obj in objects:
size_mb = obj['size'] / (1024 * 1024)
modified_str = ""
if obj['last_modified']:
modified_str = f" (modified: {obj['last_modified'][:10]})"
print(f" - {obj['name']} ({size_mb:.2f} MB){modified_str}")
else:
print("❌ No objects match the specified filters")
else:
print("❌ No objects found or access denied")
elif args.object:
if not args.bucket:
print("❌ --bucket is required when specifying --object")
sys.exit(1)
success = client.analyze_parquet_from_minio(
args.bucket, args.object, args.command,
args.output, args.rows, args.verbose, args.id_value, args.id_column
)
if not success:
sys.exit(1)
else:
print("❌ No operation specified. Use --interactive, --list-buckets, --list-objects, or specify --object")
sys.exit(1)
if __name__ == "__main__":
main()
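The --filter-contains grammar above (comma for OR, & for AND, parentheses for grouping) is easiest to see on concrete object names. The standalone sketch below restates the evaluation rules of _apply_contains_filter for illustration only; it mirrors the method rather than importing it, so it runs without a MinIO server, and the expression and object names are made up:

```python
def matches(expr: str, name: str) -> bool:
    """Illustrative restatement of the --filter-contains evaluation rules."""
    expr = expr.strip()
    if expr == 'true':      # placeholder left behind by an evaluated group
        return True
    if expr == 'false':
        return False
    if '(' in expr and ')' in expr:
        # Reduce the innermost group to true/false, then re-evaluate.
        start = expr.rfind('(')
        end = expr.find(')', start)
        inner = matches(expr[start + 1:end], name)
        return matches(expr[:start] + ('true' if inner else 'false') + expr[end + 1:], name)
    if '&' in expr:         # every '&'-separated part must match (AND)
        return all(matches(p, name) for p in expr.split('&'))
    if ',' in expr:         # any ','-separated part may match (OR)
        return any(matches(p, name) for p in expr.split(','))
    return expr in name     # plain substring keyword

names = [
    "files/insert_log/459761955352871853/0",
    "files/delta_log/459761955352871853/0",
    "files/stats_log/459761955352871853/0",
]
# (insert OR delta) AND log -> keeps the insert_log and delta_log objects only.
print([n for n in names if matches("(insert,delta)&log", n)])
```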

minio_parquet_analyzer.py (new file)

@@ -0,0 +1,804 @@
#!/usr/bin/env python3
"""
MinIO Parquet Analyzer - Integrated Tool
Combines MinIO client with parquet_analyzer_cli.py for seamless analysis
"""
import argparse
import sys
import os
import tempfile
import subprocess
import json
from pathlib import Path
from typing import Optional, List, Dict, Any
from datetime import datetime
try:
from minio import Minio
from minio.error import S3Error
except ImportError:
print("❌ MinIO client library not found. Please install it:")
print(" pip install minio")
sys.exit(1)
class MinioParquetAnalyzer:
"""Integrated MinIO and Parquet Analyzer"""
def __init__(self, endpoint: str, port: int = 9000, secure: bool = False,
access_key: str = None, secret_key: str = None):
"""Initialize the integrated analyzer"""
self.endpoint = endpoint
self.port = port
self.secure = secure
self.access_key = access_key
self.secret_key = secret_key
# Initialize MinIO client
try:
self.client = Minio(
f"{endpoint}:{port}",
access_key=access_key,
secret_key=secret_key,
secure=secure
)
print(f"✅ Connected to MinIO server: {endpoint}:{port}")
except Exception as e:
print(f"❌ Failed to connect to MinIO server: {e}")
sys.exit(1)
# Check if parquet_analyzer_cli.py exists
self.cli_script = Path(__file__).parent / "parquet_analyzer_cli.py"
if not self.cli_script.exists():
print(f"❌ parquet_analyzer_cli.py not found at: {self.cli_script}")
sys.exit(1)
def list_buckets(self) -> List[Dict[str, Any]]:
"""List all buckets"""
try:
buckets = []
for bucket in self.client.list_buckets():
buckets.append({
'name': bucket.name,
'creation_date': bucket.creation_date.isoformat() if bucket.creation_date else None
})
return buckets
except S3Error as e:
print(f"❌ Failed to list buckets: {e}")
return []
def filter_objects(self, objects: List[Dict[str, Any]],
prefix: str = None, suffix: str = None, contains: str = None,
size_min: int = None, size_max: int = None,
date_from: str = None, date_to: str = None) -> List[Dict[str, Any]]:
"""
Filter objects based on various criteria
Args:
objects: List of objects to filter
prefix: Filter by object name prefix
suffix: Filter by object name suffix
contains: Filter by object name containing string
size_min: Minimum size in MB
size_max: Maximum size in MB
date_from: Filter objects modified after date (YYYY-MM-DD)
date_to: Filter objects modified before date (YYYY-MM-DD)
Returns:
Filtered list of objects
"""
filtered = objects
if prefix:
filtered = [obj for obj in filtered if obj['name'].startswith(prefix)]
if suffix:
filtered = [obj for obj in filtered if obj['name'].endswith(suffix)]
if contains:
# Support complex logic with parentheses, OR (comma) and AND (&) logic
filtered = self._apply_contains_filter(filtered, contains)
if size_min is not None:
size_min_bytes = size_min * 1024 * 1024
filtered = [obj for obj in filtered if obj['size'] >= size_min_bytes]
if size_max is not None:
size_max_bytes = size_max * 1024 * 1024
filtered = [obj for obj in filtered if obj['size'] <= size_max_bytes]
if date_from:
try:
from_date = datetime.fromisoformat(date_from).date()
filtered = [obj for obj in filtered
if obj['last_modified'] and
datetime.fromisoformat(obj['last_modified']).date() >= from_date]
except ValueError:
print(f"⚠️ Invalid date format for --filter-date-from: {date_from}")
if date_to:
try:
to_date = datetime.fromisoformat(date_to).date()
filtered = [obj for obj in filtered
if obj['last_modified'] and
datetime.fromisoformat(obj['last_modified']).date() <= to_date]
except ValueError:
print(f"⚠️ Invalid date format for --filter-date-to: {date_to}")
return filtered
def _apply_contains_filter(self, objects: List[Dict[str, Any]], contains_expr: str) -> List[Dict[str, Any]]:
"""
Apply complex contains filter with parentheses support
Args:
objects: List of objects to filter
contains_expr: Complex contains expression with parentheses, OR (comma), and AND (&) logic
Returns:
Filtered list of objects
"""
def evaluate_expression(expr: str, obj_name: str) -> bool:
"""Evaluate a single expression for an object name"""
expr = expr.strip()
# Handle parentheses first
if '(' in expr and ')' in expr:
# Find the innermost parentheses
start = expr.rfind('(')
end = expr.find(')', start)
if start != -1 and end != -1:
# Extract the content inside parentheses
inner_expr = expr[start+1:end]
# Evaluate the inner expression
inner_result = evaluate_expression(inner_expr, obj_name)
# Replace the parentheses expression with the result
new_expr = expr[:start] + ('true' if inner_result else 'false') + expr[end+1:]
return evaluate_expression(new_expr, obj_name)
# Handle AND logic (&)
if '&' in expr:
parts = [p.strip() for p in expr.split('&')]
return all(evaluate_expression(part, obj_name) for part in parts)
# Handle OR logic (,)
if ',' in expr:
parts = [p.strip() for p in expr.split(',')]
return any(evaluate_expression(part, obj_name) for part in parts)
# Boolean placeholders produced by an evaluated group
if expr == 'true':
return True
if expr == 'false':
return False
# Single keyword
return expr in obj_name
return [obj for obj in objects if evaluate_expression(contains_expr, obj['name'])]
def list_parquet_files(self, bucket_name: str, prefix: str = "") -> List[Dict[str, Any]]:
"""List Parquet files in a bucket"""
try:
parquet_files = []
for obj in self.client.list_objects(bucket_name, prefix=prefix, recursive=True):
if obj.object_name.lower().endswith('.parquet'):
parquet_files.append({
'name': obj.object_name,
'size': obj.size,
'size_mb': obj.size / (1024 * 1024),
'last_modified': obj.last_modified.isoformat() if obj.last_modified else None,
'etag': obj.etag
})
return parquet_files
except S3Error as e:
print(f"❌ Failed to list objects in bucket '{bucket_name}': {e}")
return []
def download_and_analyze(self, bucket_name: str, object_name: str,
command: str = "analyze", output_file: str = None,
rows: int = 10, verbose: bool = False,
keep_local: bool = False) -> Dict[str, Any]:
"""
Download Parquet file from MinIO and analyze it
Returns:
Dictionary with analysis results and metadata
"""
result = {
'success': False,
'bucket': bucket_name,
'object': object_name,
'command': command,
'local_path': None,
'analysis_output': None,
'error': None,
'timestamp': datetime.now().isoformat()
}
# Download the file
try:
local_path = self._download_file(bucket_name, object_name, keep_local)
if not local_path:
result['error'] = "Failed to download file"
return result
result['local_path'] = local_path
# Run analysis
analysis_result = self._run_analysis(local_path, command, output_file, rows, verbose)
result['analysis_output'] = analysis_result
if analysis_result['success']:
result['success'] = True
else:
result['error'] = analysis_result['error']
except Exception as e:
result['error'] = str(e)
return result
def _download_file(self, bucket_name: str, object_name: str, keep_local: bool = False) -> Optional[str]:
"""Download file from MinIO"""
try:
if keep_local:
# Save to current directory
local_path = Path(object_name).name
else:
# Create temporary file
temp_dir = tempfile.gettempdir()
filename = Path(object_name).name
local_path = os.path.join(temp_dir, f"minio_{filename}")
print(f"📥 Downloading {object_name} from bucket {bucket_name}...")
self.client.fget_object(bucket_name, object_name, local_path)
print(f"✅ Downloaded to: {local_path}")
return local_path
except S3Error as e:
print(f"❌ Failed to download {object_name}: {e}")
return None
except Exception as e:
print(f"❌ Unexpected error downloading {object_name}: {e}")
return None
def _run_analysis(self, local_path: str, command: str, output_file: str = None,
rows: int = 10, verbose: bool = False) -> Dict[str, Any]:
"""Run parquet_analyzer_cli.py on local file"""
result = {
'success': False,
'command': command,
'output_file': output_file,
'error': None
}
try:
# Build command
cmd = [sys.executable, str(self.cli_script), command, local_path]
# Add optional arguments
if output_file:
cmd.extend(["--output", output_file])
if rows != 10:
cmd.extend(["--rows", str(rows)])
if verbose:
cmd.append("--verbose")
print(f"🔍 Running analysis: {' '.join(cmd)}")
print("=" * 60)
# Execute the command
process = subprocess.run(cmd, capture_output=True, text=True)
result['return_code'] = process.returncode
result['stdout'] = process.stdout
result['stderr'] = process.stderr
if process.returncode == 0:
result['success'] = True
print("✅ Analysis completed successfully")
else:
result['error'] = f"Analysis failed with return code: {process.returncode}"
print(f"{result['error']}")
if process.stderr:
print(f"Error output: {process.stderr}")
except Exception as e:
result['error'] = str(e)
print(f"❌ Failed to run analysis: {e}")
return result
def batch_analyze(self, bucket_name: str, prefix: str = "", command: str = "analyze",
output_dir: str = None, verbose: bool = False) -> List[Dict[str, Any]]:
"""
Analyze multiple Parquet files in a bucket
Args:
bucket_name: Name of the bucket
prefix: Prefix to filter files
command: Analysis command
output_dir: Directory to save output files
verbose: Verbose output
Returns:
List of analysis results
"""
print(f"🔍 Batch analyzing Parquet files in bucket '{bucket_name}'")
if prefix:
print(f"📁 Filtering by prefix: '{prefix}'")
# List Parquet files
parquet_files = self.list_parquet_files(bucket_name, prefix)
if not parquet_files:
print("❌ No Parquet files found")
return []
print(f"📊 Found {len(parquet_files)} Parquet file(s)")
# Create output directory if specified
if output_dir:
os.makedirs(output_dir, exist_ok=True)
print(f"📁 Output directory: {output_dir}")
results = []
for i, file_info in enumerate(parquet_files, 1):
print(f"\n{'='*60}")
print(f"📊 Processing file {i}/{len(parquet_files)}: {file_info['name']}")
print(f"📏 Size: {file_info['size_mb']:.2f} MB")
# Determine output file
output_file = None
if output_dir:
base_name = Path(file_info['name']).stem
output_file = os.path.join(output_dir, f"{base_name}_{command}_result.json")
# Analyze the file
result = self.download_and_analyze(
bucket_name, file_info['name'], command, output_file, verbose=verbose
)
results.append(result)
# Clean up temporary file
if result['local_path'] and result['local_path'].startswith(tempfile.gettempdir()):
try:
os.remove(result['local_path'])
except:
pass
# Print summary
successful = sum(1 for r in results if r['success'])
failed = len(results) - successful
print(f"\n{'='*60}")
print(f"📊 Batch Analysis Summary:")
print(f" Total files: {len(results)}")
print(f" Successful: {successful}")
print(f" Failed: {failed}")
return results
def interactive_mode(self):
"""Interactive mode for browsing and analyzing files"""
print("🔍 MinIO Parquet Analyzer - Interactive Mode")
print("=" * 50)
# List buckets
buckets = self.list_buckets()
if not buckets:
print("❌ No buckets found or access denied")
return
print(f"📦 Found {len(buckets)} bucket(s):")
for i, bucket in enumerate(buckets):
print(f" {i+1}. {bucket['name']}")
# Select bucket
while True:
try:
choice = input(f"\nSelect bucket (1-{len(buckets)}) or 'q' to quit: ").strip()
if choice.lower() == 'q':
return
bucket_idx = int(choice) - 1
if 0 <= bucket_idx < len(buckets):
selected_bucket = buckets[bucket_idx]['name']
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
# List Parquet files in selected bucket
print(f"\n📁 Parquet files in bucket '{selected_bucket}':")
parquet_files = self.list_parquet_files(selected_bucket)
if not parquet_files:
print("❌ No Parquet files found in this bucket")
return
print(f"📊 Found {len(parquet_files)} Parquet file(s):")
for i, obj in enumerate(parquet_files):
print(f" {i+1}. {obj['name']} ({obj['size_mb']:.2f} MB)")
# Select operation mode
print(f"\n🔍 Operation modes:")
print(" 1. Analyze single file")
print(" 2. Batch analyze all files")
print(" 3. Select specific files")
while True:
try:
mode_choice = input(f"\nSelect mode (1-3) or 'q' to quit: ").strip()
if mode_choice.lower() == 'q':
return
mode = int(mode_choice)
if mode in [1, 2, 3]:
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
if mode == 1:
# Single file analysis
self._interactive_single_file(selected_bucket, parquet_files)
elif mode == 2:
# Batch analysis
self._interactive_batch_analysis(selected_bucket, parquet_files)
elif mode == 3:
# Select specific files
self._interactive_select_files(selected_bucket, parquet_files)
def _interactive_single_file(self, bucket_name: str, parquet_files: List[Dict[str, Any]]):
"""Interactive single file analysis"""
# Select file
while True:
try:
choice = input(f"\nSelect file (1-{len(parquet_files)}) or 'q' to quit: ").strip()
if choice.lower() == 'q':
return
file_idx = int(choice) - 1
if 0 <= file_idx < len(parquet_files):
selected_file = parquet_files[file_idx]['name']
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
# Select analysis command
commands = ["analyze", "metadata", "vector", "export", "data"]
print(f"\n🔍 Available analysis commands:")
for i, cmd in enumerate(commands):
print(f" {i+1}. {cmd}")
while True:
try:
choice = input(f"\nSelect command (1-{len(commands)}) or 'q' to quit: ").strip()
if choice.lower() == 'q':
return
cmd_idx = int(choice) - 1
if 0 <= cmd_idx < len(commands):
selected_command = commands[cmd_idx]
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
# Additional options
output_file = None
rows = 10
verbose = False
if selected_command in ["export", "data"]:
output_choice = input("Enter output file path (or press Enter for default): ").strip()
if output_choice:
output_file = output_choice
if selected_command == "data":
rows_choice = input("Enter number of rows to export (default: 10): ").strip()
if rows_choice:
try:
rows = int(rows_choice)
except ValueError:
print("❌ Invalid number, using default: 10")
verbose_choice = input("Verbose output? (y/N): ").strip().lower()
verbose = verbose_choice in ['y', 'yes']
# Run analysis
print(f"\n🚀 Starting analysis...")
result = self.download_and_analyze(
bucket_name, selected_file, selected_command,
output_file, rows, verbose
)
if result['success']:
print("✅ Analysis completed successfully!")
else:
print(f"❌ Analysis failed: {result['error']}")
def _interactive_batch_analysis(self, bucket_name: str, parquet_files: List[Dict[str, Any]]):
"""Interactive batch analysis"""
print(f"\n📊 Batch analysis for {len(parquet_files)} files")
# Select command
commands = ["analyze", "metadata", "vector", "export", "data"]
print(f"\n🔍 Available analysis commands:")
for i, cmd in enumerate(commands):
print(f" {i+1}. {cmd}")
while True:
try:
choice = input(f"\nSelect command (1-{len(commands)}) or 'q' to quit: ").strip()
if choice.lower() == 'q':
return
cmd_idx = int(choice) - 1
if 0 <= cmd_idx < len(commands):
selected_command = commands[cmd_idx]
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
# Output directory
output_dir = input("Enter output directory (or press Enter for current dir): ").strip()
if not output_dir:
output_dir = "."
verbose_choice = input("Verbose output? (y/N): ").strip().lower()
verbose = verbose_choice in ['y', 'yes']
# Run batch analysis
print(f"\n🚀 Starting batch analysis...")
results = self.batch_analyze(bucket_name, "", selected_command, output_dir, verbose)
print(f"✅ Batch analysis completed!")
def _interactive_select_files(self, bucket_name: str, parquet_files: List[Dict[str, Any]]):
"""Interactive select specific files"""
print(f"\n📋 Select files to analyze (comma-separated numbers, e.g., 1,3,5)")
print(f"Available files:")
for i, obj in enumerate(parquet_files):
print(f" {i+1}. {obj['name']} ({obj['size_mb']:.2f} MB)")
while True:
try:
choice = input(f"\nEnter file numbers (1-{len(parquet_files)}) or 'q' to quit: ").strip()
if choice.lower() == 'q':
return
selected_indices = [int(x.strip()) - 1 for x in choice.split(',')]
if all(0 <= idx < len(parquet_files) for idx in selected_indices):
selected_files = [parquet_files[idx]['name'] for idx in selected_indices]
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter valid numbers")
# Select command
commands = ["analyze", "metadata", "vector", "export", "data"]
print(f"\n🔍 Available analysis commands:")
for i, cmd in enumerate(commands):
print(f" {i+1}. {cmd}")
while True:
try:
choice = input(f"\nSelect command (1-{len(commands)}) or 'q' to quit: ").strip()
if choice.lower() == 'q':
return
cmd_idx = int(choice) - 1
if 0 <= cmd_idx < len(commands):
selected_command = commands[cmd_idx]
break
else:
print("❌ Invalid selection")
except ValueError:
print("❌ Please enter a valid number")
# Additional options
output_dir = input("Enter output directory (or press Enter for current dir): ").strip()
if not output_dir:
output_dir = "."
verbose_choice = input("Verbose output? (y/N): ").strip().lower()
verbose = verbose_choice in ['y', 'yes']
# Run analysis for selected files
print(f"\n🚀 Starting analysis for {len(selected_files)} selected files...")
results = []
for i, file_name in enumerate(selected_files, 1):
print(f"\n📊 Processing file {i}/{len(selected_files)}: {file_name}")
# Determine output file
output_file = None
if output_dir:
base_name = Path(file_name).stem
output_file = os.path.join(output_dir, f"{base_name}_{selected_command}_result.json")
result = self.download_and_analyze(
bucket_name, file_name, selected_command, output_file, verbose=verbose
)
results.append(result)
# Print summary
successful = sum(1 for r in results if r['success'])
print(f"\n✅ Analysis completed! {successful}/{len(results)} files processed successfully.")
def main():
"""Main function"""
parser = argparse.ArgumentParser(
description="MinIO Parquet Analyzer - Integrated Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Interactive mode
python minio_parquet_analyzer.py --endpoint localhost --interactive
# Analyze specific file
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --object data.parquet --command analyze
# Batch analyze all Parquet files
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --batch --command metadata --output-dir results
# List buckets
python minio_parquet_analyzer.py --endpoint localhost --list-buckets
# List Parquet files in bucket
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files
# Filter files by prefix (insert_log files only)
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-prefix "files/insert_log/"
# Filter files by size (files larger than 1MB)
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-size-min 1
# Filter files by date range
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-date-from "2024-01-01" --filter-date-to "2024-01-31"
# Combine multiple filters
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-prefix "files/" --filter-size-min 0.5 --filter-contains "insert"
# Filter with OR logic (files containing 'insert' OR 'delete')
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "insert,delete"
# Filter with AND logic (files containing 'insert' AND 'log')
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "insert&log"
# Filter with parentheses grouping ((insert OR delete) AND log)
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "(insert,delete)&log"
# Complex nested parentheses ((insert OR delete) AND (log OR bin))
python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "(insert,delete)&(log,bin)"
"""
)
# Connection arguments
parser.add_argument("--endpoint", "-e", required=True, help="MinIO server endpoint")
parser.add_argument("--port", "-p", type=int, default=9000, help="MinIO server port (default: 9000)")
parser.add_argument("--secure", "-s", action="store_true", help="Use HTTPS")
parser.add_argument("--access-key", "-a", help="MinIO access key")
parser.add_argument("--secret-key", "-k", help="MinIO secret key")
# Operation arguments
parser.add_argument("--interactive", "-i", action="store_true", help="Interactive mode")
parser.add_argument("--list-buckets", "-b", action="store_true", help="List all buckets")
parser.add_argument("--bucket", help="Bucket name")
parser.add_argument("--list-files", "-l", action="store_true", help="List Parquet files in bucket")
parser.add_argument("--object", "-o", help="Object name in bucket")
parser.add_argument("--batch", action="store_true", help="Batch analyze all Parquet files in bucket")
# Filter arguments
parser.add_argument("--filter-prefix", help="Filter objects by prefix (e.g., 'files/insert_log/')")
parser.add_argument("--filter-suffix", help="Filter objects by suffix (e.g., '.parquet')")
parser.add_argument("--filter-contains", help="Filter objects containing specific string(s). Use comma for OR logic (e.g., 'insert,delete'), use & for AND logic (e.g., 'insert&log'), use () for grouping (e.g., '(insert,delete)&log')")
parser.add_argument("--filter-size-min", type=int, help="Filter objects by minimum size in MB")
parser.add_argument("--filter-size-max", type=int, help="Filter objects by maximum size in MB")
parser.add_argument("--filter-date-from", help="Filter objects modified after date (YYYY-MM-DD)")
parser.add_argument("--filter-date-to", help="Filter objects modified before date (YYYY-MM-DD)")
# Analysis arguments
parser.add_argument("--command", "-c", choices=["analyze", "metadata", "vector", "export", "data"],
default="analyze", help="Analysis command (default: analyze)")
parser.add_argument("--output", help="Output file path (for single file analysis)")
parser.add_argument("--output-dir", help="Output directory (for batch analysis)")
parser.add_argument("--rows", "-r", type=int, default=10, help="Number of rows to export (for data command)")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
parser.add_argument("--keep-local", action="store_true", help="Keep downloaded files locally")
args = parser.parse_args()
# Initialize analyzer
analyzer = MinioParquetAnalyzer(
endpoint=args.endpoint,
port=args.port,
secure=args.secure,
access_key=args.access_key,
secret_key=args.secret_key
)
# Execute requested operation
if args.interactive:
analyzer.interactive_mode()
elif args.list_buckets:
buckets = analyzer.list_buckets()
if buckets:
print(f"📦 Found {len(buckets)} bucket(s):")
for bucket in buckets:
print(f" - {bucket['name']}")
else:
print("❌ No buckets found or access denied")
elif args.list_files:
if not args.bucket:
print("❌ --bucket is required for --list-files")
sys.exit(1)
files = analyzer.list_parquet_files(args.bucket)
if files:
# Apply filters if specified
original_count = len(files)
files = analyzer.filter_objects(
files,
prefix=args.filter_prefix,
suffix=args.filter_suffix,
contains=args.filter_contains,
size_min=args.filter_size_min,
size_max=args.filter_size_max,
date_from=args.filter_date_from,
date_to=args.filter_date_to
)
if original_count != len(files):
print(f"🔍 Applied filters: {original_count}{len(files)} files")
if files:
print(f"📊 Found {len(files)} Parquet file(s) in bucket '{args.bucket}':")
for obj in files:
modified_str = ""
if obj.get('last_modified'):
modified_str = f" (modified: {obj['last_modified'][:10]})"
print(f" - {obj['name']} ({obj['size_mb']:.2f} MB){modified_str}")
else:
print("❌ No files match the specified filters")
else:
print("❌ No Parquet files found or access denied")
elif args.batch:
if not args.bucket:
print("❌ --bucket is required for --batch")
sys.exit(1)
results = analyzer.batch_analyze(
args.bucket, "", args.command, args.output_dir, args.verbose
)
if not results:
sys.exit(1)
elif args.object:
if not args.bucket:
print("❌ --bucket is required when specifying --object")
sys.exit(1)
result = analyzer.download_and_analyze(
args.bucket, args.object, args.command,
args.output, args.rows, args.verbose, args.keep_local
)
if not result['success']:
sys.exit(1)
else:
print("❌ No operation specified. Use --interactive, --list-buckets, --list-files, --batch, or specify --object")
sys.exit(1)
if __name__ == "__main__":
main()
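The integrated analyzer above can likewise be used from Python rather than through its CLI; a hypothetical sketch (endpoint, credentials, bucket, and object path are placeholders, and it needs a reachable MinIO server plus parquet_analyzer_cli.py next to the script):

```python
from minio_parquet_analyzer import MinioParquetAnalyzer

analyzer = MinioParquetAnalyzer(
    endpoint="localhost",
    access_key="minioadmin",   # placeholder credentials
    secret_key="minioadmin",
)
result = analyzer.download_and_analyze(
    bucket_name="a-bucket",
    object_name="files/insert_log/459761955352871853/459761955352871854/459761955353071864/0",
    command="metadata",
    output_file="metadata.json",
)
print("ok" if result["success"] else f"failed: {result['error']}")
```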

parquet_analyzer/__init__.py (new file)

@@ -0,0 +1,11 @@
"""
Parquet Analyzer Package
A toolkit for analyzing parquet files, including metadata parsing and vector deserialization functionality
"""
from .meta_parser import ParquetMetaParser
from .vector_deserializer import VectorDeserializer
from .analyzer import ParquetAnalyzer
__version__ = "1.0.0"
__all__ = ["ParquetMetaParser", "VectorDeserializer", "ParquetAnalyzer"]

parquet_analyzer/analyzer.py (new file)

@@ -0,0 +1,494 @@
"""
Parquet Analyzer Main Component
Main analyzer that integrates metadata parsing and vector deserialization functionality
"""
import json
from pathlib import Path
from typing import Dict, List, Any, Optional
from .meta_parser import ParquetMetaParser
from .vector_deserializer import VectorDeserializer
class ParquetAnalyzer:
"""Main Parquet file analyzer class"""
def __init__(self, file_path: str):
"""
Initialize analyzer
Args:
file_path: parquet file path
"""
self.file_path = Path(file_path)
self.meta_parser = ParquetMetaParser(file_path)
self.vector_deserializer = VectorDeserializer()
def load(self) -> bool:
"""
Load parquet file
Returns:
bool: whether loading was successful
"""
return self.meta_parser.load()
def analyze_metadata(self) -> Dict[str, Any]:
"""
Analyze metadata information
Returns:
Dict: metadata analysis results
"""
if not self.meta_parser.metadata:
return {}
return {
"basic_info": self.meta_parser.get_basic_info(),
"file_metadata": self.meta_parser.get_file_metadata(),
"schema_metadata": self.meta_parser.get_schema_metadata(),
"column_statistics": self.meta_parser.get_column_statistics(),
"row_group_info": self.meta_parser.get_row_group_info(),
"metadata_summary": self.meta_parser.get_metadata_summary()
}
def analyze_vectors(self) -> List[Dict[str, Any]]:
"""
Analyze vector data
Returns:
List: vector analysis results list
"""
if not self.meta_parser.metadata:
return []
vector_analysis = []
column_stats = self.meta_parser.get_column_statistics()
for col_stats in column_stats:
if "statistics" in col_stats and col_stats["statistics"]:
stats = col_stats["statistics"]
col_name = col_stats["column_name"]
# Check if there's binary data (vector)
if "min" in stats:
min_value = stats["min"]
if isinstance(min_value, bytes):
min_analysis = VectorDeserializer.deserialize_with_analysis(
min_value, col_name
)
if min_analysis:
vector_analysis.append({
"column_name": col_name,
"stat_type": "min",
"analysis": min_analysis
})
elif isinstance(min_value, str) and len(min_value) > 32:
# May be hex string, try to convert back to bytes
try:
min_bytes = bytes.fromhex(min_value)
min_analysis = VectorDeserializer.deserialize_with_analysis(
min_bytes, col_name
)
if min_analysis:
vector_analysis.append({
"column_name": col_name,
"stat_type": "min",
"analysis": min_analysis
})
except ValueError:
pass
if "max" in stats:
max_value = stats["max"]
if isinstance(max_value, bytes):
max_analysis = VectorDeserializer.deserialize_with_analysis(
max_value, col_name
)
if max_analysis:
vector_analysis.append({
"column_name": col_name,
"stat_type": "max",
"analysis": max_analysis
})
elif isinstance(max_value, str) and len(max_value) > 32:
# May be hex string, try to convert back to bytes
try:
max_bytes = bytes.fromhex(max_value)
max_analysis = VectorDeserializer.deserialize_with_analysis(
max_bytes, col_name
)
if max_analysis:
vector_analysis.append({
"column_name": col_name,
"stat_type": "max",
"analysis": max_analysis
})
except ValueError:
pass
return vector_analysis
def analyze(self) -> Dict[str, Any]:
"""
Complete parquet file analysis
Returns:
Dict: complete analysis results
"""
if not self.load():
return {}
return {
"metadata": self.analyze_metadata(),
"vectors": self.analyze_vectors()
}
def export_analysis(self, output_file: Optional[str] = None) -> str:
"""
Export analysis results
Args:
output_file: output file path, if None will auto-generate
Returns:
str: output file path
"""
if output_file is None:
output_file = f"{self.file_path.stem}_analysis.json"
analysis_result = self.analyze()
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(analysis_result, f, indent=2, ensure_ascii=False)
return output_file
def print_summary(self):
"""Print analysis summary"""
if not self.meta_parser.metadata:
print("❌ No parquet file loaded")
return
# Print metadata summary
self.meta_parser.print_summary()
# Print vector analysis summary
vector_analysis = self.analyze_vectors()
if vector_analysis:
print(f"\n🔍 Vector Analysis Summary:")
print("=" * 60)
for vec_analysis in vector_analysis:
col_name = vec_analysis["column_name"]
stat_type = vec_analysis["stat_type"]
analysis = vec_analysis["analysis"]
print(f" Column: {col_name} ({stat_type})")
print(f" Vector Type: {analysis['vector_type']}")
print(f" Dimension: {analysis['dimension']}")
if "statistics" in analysis and analysis["statistics"]:
stats = analysis["statistics"]
print(f" Min: {stats.get('min', 'N/A')}")
print(f" Max: {stats.get('max', 'N/A')}")
print(f" Mean: {stats.get('mean', 'N/A')}")
print(f" Std: {stats.get('std', 'N/A')}")
if analysis["vector_type"] == "BinaryVector" and "statistics" in analysis:
stats = analysis["statistics"]
print(f" Zero Count: {stats.get('zero_count', 'N/A')}")
print(f" One Count: {stats.get('one_count', 'N/A')}")
print()
def get_vector_samples(self, column_name: str, sample_count: int = 5) -> List[Dict[str, Any]]:
"""
Get vector sample data
Args:
column_name: column name
sample_count: number of samples
Returns:
List: vector sample list
"""
# This can be extended to read samples from actual data
# Currently returns min/max from statistics as samples
vector_analysis = self.analyze_vectors()
samples = []
for vec_analysis in vector_analysis:
if vec_analysis["column_name"] == column_name:
analysis = vec_analysis["analysis"]
samples.append({
"type": vec_analysis["stat_type"],
"vector_type": analysis["vector_type"],
"dimension": analysis["dimension"],
"data": analysis["deserialized"][:sample_count] if analysis["deserialized"] else [],
"statistics": analysis.get("statistics", {})
})
return samples
def compare_vectors(self, column_name: str) -> Dict[str, Any]:
"""
Compare different vector statistics for the same column
Args:
column_name: column name
Returns:
Dict: comparison results
"""
vector_analysis = self.analyze_vectors()
column_vectors = [v for v in vector_analysis if v["column_name"] == column_name]
if len(column_vectors) < 2:
return {}
comparison = {
"column_name": column_name,
"vector_count": len(column_vectors),
"comparison": {}
}
for vec_analysis in column_vectors:
stat_type = vec_analysis["stat_type"]
analysis = vec_analysis["analysis"]
comparison["comparison"][stat_type] = {
"vector_type": analysis["vector_type"],
"dimension": analysis["dimension"],
"statistics": analysis.get("statistics", {})
}
return comparison
def validate_vector_consistency(self) -> Dict[str, Any]:
"""
Validate vector data consistency
Returns:
Dict: validation results
"""
vector_analysis = self.analyze_vectors()
validation_result = {
"total_vectors": len(vector_analysis),
"consistent_columns": [],
"inconsistent_columns": [],
"details": {}
}
# Group by column
columns = {}
for vec_analysis in vector_analysis:
col_name = vec_analysis["column_name"]
if col_name not in columns:
columns[col_name] = []
columns[col_name].append(vec_analysis)
for col_name, vec_list in columns.items():
if len(vec_list) >= 2:
# Check if vector types are consistent for the same column
vector_types = set(v["analysis"]["vector_type"] for v in vec_list)
dimensions = set(v["analysis"]["dimension"] for v in vec_list)
is_consistent = len(vector_types) == 1 and len(dimensions) == 1
validation_result["details"][col_name] = {
"vector_types": list(vector_types),
"dimensions": list(dimensions),
"is_consistent": is_consistent,
"vector_count": len(vec_list)
}
if is_consistent:
validation_result["consistent_columns"].append(col_name)
else:
validation_result["inconsistent_columns"].append(col_name)
return validation_result
    def query_by_id(self, id_value: Any, id_column: Optional[str] = None) -> Dict[str, Any]:
"""
Query data by ID value
Args:
id_value: ID value to search for
id_column: ID column name (if None, will try to find primary key column)
Returns:
Dict: query results
"""
try:
import pandas as pd
import pyarrow.parquet as pq
except ImportError:
return {"error": "pandas and pyarrow are required for ID query"}
if not self.meta_parser.metadata:
return {"error": "Parquet file not loaded"}
try:
# Read the parquet file
df = pd.read_parquet(self.file_path)
# If no ID column specified, try to find primary key column
if id_column is None:
# Common primary key column names
pk_candidates = ['id', 'ID', 'Id', 'pk', 'PK', 'primary_key', 'row_id', 'RowID']
for candidate in pk_candidates:
if candidate in df.columns:
id_column = candidate
break
if id_column is None:
# If no common PK found, use the first column
id_column = df.columns[0]
if id_column not in df.columns:
return {
"error": f"ID column '{id_column}' not found in the data",
"available_columns": list(df.columns)
}
# Query by ID
result = df[df[id_column] == id_value]
if result.empty:
return {
"found": False,
"id_column": id_column,
"id_value": id_value,
"message": f"No record found with {id_column} = {id_value}"
}
# Convert to dict for JSON serialization
record = result.iloc[0].to_dict()
# Handle vector columns if present
vector_columns = []
for col_name, value in record.items():
if isinstance(value, bytes) and len(value) > 32:
# This might be a vector, try to deserialize
try:
vector_analysis = VectorDeserializer.deserialize_with_analysis(value, col_name)
if vector_analysis:
vector_columns.append({
"column_name": col_name,
"analysis": vector_analysis
})
# Replace bytes with analysis summary
if vector_analysis["vector_type"] == "JSON":
# For JSON, show the actual content
record[col_name] = vector_analysis["deserialized"]
elif vector_analysis["vector_type"] == "Array":
# For Array, show the actual content
record[col_name] = vector_analysis["deserialized"]
else:
# For vectors, show type and dimension
record[col_name] = {
"vector_type": vector_analysis["vector_type"],
"dimension": vector_analysis["dimension"],
"data_preview": vector_analysis["deserialized"][:5] if vector_analysis["deserialized"] else []
}
except Exception:
# If deserialization fails, keep as bytes but truncate for display
record[col_name] = f"<binary data: {len(value)} bytes>"
return {
"found": True,
"id_column": id_column,
"id_value": id_value,
"record": record,
"vector_columns": vector_columns,
"total_columns": len(df.columns),
"total_rows": len(df)
}
except Exception as e:
return {"error": f"Query failed: {str(e)}"}
def get_id_column_info(self) -> Dict[str, Any]:
"""
Get information about ID columns in the data
Returns:
Dict: ID column information
"""
try:
import pandas as pd
except ImportError:
return {"error": "pandas is required for ID column analysis"}
if not self.meta_parser.metadata:
return {"error": "Parquet file not loaded"}
try:
df = pd.read_parquet(self.file_path)
# Find potential ID columns
id_columns = []
for col in df.columns:
col_data = df[col]
# Check if column looks like an ID column
is_unique = col_data.nunique() == len(col_data)
is_numeric = pd.api.types.is_numeric_dtype(col_data)
is_integer = pd.api.types.is_integer_dtype(col_data)
id_columns.append({
"column_name": col,
"is_unique": is_unique,
"is_numeric": is_numeric,
"is_integer": is_integer,
"unique_count": col_data.nunique(),
"total_count": len(col_data),
"min_value": col_data.min() if is_numeric else None,
"max_value": col_data.max() if is_numeric else None,
"sample_values": col_data.head(5).tolist()
})
return {
"total_columns": len(df.columns),
"total_rows": len(df),
"id_columns": id_columns,
"recommended_id_column": self._get_recommended_id_column(id_columns)
}
except Exception as e:
return {"error": f"ID column analysis failed: {str(e)}"}
def _get_recommended_id_column(self, id_columns: List[Dict[str, Any]]) -> str:
"""
Get recommended ID column based on heuristics
Args:
id_columns: List of ID column information
Returns:
str: Recommended ID column name
"""
# Priority order for ID columns
priority_names = ['id', 'ID', 'Id', 'pk', 'PK', 'primary_key', 'row_id', 'RowID']
# First, look for columns with priority names that are unique
for priority_name in priority_names:
for col_info in id_columns:
if (col_info["column_name"].lower() == priority_name.lower() and
col_info["is_unique"]):
return col_info["column_name"]
# Then, look for any unique integer column
for col_info in id_columns:
if col_info["is_unique"] and col_info["is_integer"]:
return col_info["column_name"]
# Finally, look for any unique column
for col_info in id_columns:
if col_info["is_unique"]:
return col_info["column_name"]
# If no unique column found, return the first column
return id_columns[0]["column_name"] if id_columns else ""
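
A short sketch of the ID-query and export paths defined above, assuming the package is importable; the file name, ID value, and output path are placeholders:

from parquet_analyzer import ParquetAnalyzer

analyzer = ParquetAnalyzer("segment_binlog.parquet")  # placeholder path
if analyzer.load():
    hit = analyzer.query_by_id(123)  # the primary-key column is auto-detected
    if hit.get("found"):
        print(hit["id_column"], hit["record"])
    report = analyzer.export_analysis("analysis.json")  # placeholder output path
    print(f"Analysis written to {report}")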


@ -0,0 +1,448 @@
"""
Parquet Metadata Parser Component
Responsible for parsing parquet file metadata information
"""
import pyarrow.parquet as pq
import json
from pathlib import Path
from typing import Dict, List, Any, Optional
class ParquetMetaParser:
"""Parquet file Metadata parser"""
def __init__(self, file_path: str):
"""
Initialize parser
Args:
file_path: parquet file path
"""
self.file_path = Path(file_path)
self.parquet_file = None
self.metadata = None
self.schema = None
def load(self) -> bool:
"""
Load parquet file
Returns:
bool: whether loading was successful
"""
try:
self.parquet_file = pq.ParquetFile(self.file_path)
self.metadata = self.parquet_file.metadata
self.schema = self.parquet_file.schema_arrow
return True
except Exception as e:
print(f"❌ Failed to load parquet file: {e}")
return False
def get_basic_info(self) -> Dict[str, Any]:
"""
Get basic information
Returns:
Dict: dictionary containing basic file information
"""
if not self.metadata:
return {}
file_size = self.file_path.stat().st_size
return {
"name": self.file_path.name,
"size_bytes": file_size,
"size_mb": file_size / 1024 / 1024,
"num_rows": self.metadata.num_rows,
"num_columns": self.metadata.num_columns,
"num_row_groups": self.metadata.num_row_groups,
"created_by": self.metadata.created_by,
"format_version": self.metadata.format_version,
"footer_size_bytes": self.metadata.serialized_size
}
def get_file_metadata(self) -> Dict[str, str]:
"""
Get file-level metadata
Returns:
Dict: file-level metadata dictionary
"""
if not self.metadata or not self.metadata.metadata:
return {}
result = {}
for key, value in self.metadata.metadata.items():
key_str = key.decode() if isinstance(key, bytes) else key
value_str = value.decode() if isinstance(value, bytes) else value
result[key_str] = value_str
return result
def get_schema_metadata(self) -> List[Dict[str, Any]]:
"""
Get Schema-level metadata
Returns:
List: Schema-level metadata list
"""
if not self.schema:
return []
result = []
for i, field in enumerate(self.schema):
if field.metadata:
field_metadata = {}
for k, v in field.metadata.items():
k_str = k.decode() if isinstance(k, bytes) else k
v_str = v.decode() if isinstance(v, bytes) else v
field_metadata[k_str] = v_str
result.append({
"column_index": i,
"column_name": field.name,
"column_type": str(field.type),
"metadata": field_metadata
})
return result
def get_column_statistics(self) -> List[Dict[str, Any]]:
"""
Get column statistics
Returns:
List: column statistics list
"""
if not self.metadata:
return []
result = []
for i in range(self.metadata.num_columns):
col_meta = self.metadata.row_group(0).column(i)
col_stats = {
"column_name": col_meta.path_in_schema,
"compression": str(col_meta.compression),
"encodings": [str(enc) for enc in col_meta.encodings],
"file_offset": col_meta.file_offset,
"compressed_size": col_meta.total_compressed_size,
"uncompressed_size": col_meta.total_uncompressed_size
}
if col_meta.statistics:
stats = col_meta.statistics
col_stats["statistics"] = {}
if hasattr(stats, 'null_count') and stats.null_count is not None:
col_stats["statistics"]["null_count"] = stats.null_count
if hasattr(stats, 'distinct_count') and stats.distinct_count is not None:
col_stats["statistics"]["distinct_count"] = stats.distinct_count
if hasattr(stats, 'min') and stats.min is not None:
if isinstance(stats.min, bytes):
col_stats["statistics"]["min"] = stats.min.hex()
else:
col_stats["statistics"]["min"] = stats.min
if hasattr(stats, 'max') and stats.max is not None:
if isinstance(stats.max, bytes):
col_stats["statistics"]["max"] = stats.max.hex()
else:
col_stats["statistics"]["max"] = stats.max
result.append(col_stats)
return result
def get_row_group_info(self) -> List[Dict[str, Any]]:
"""
Get row group information
Returns:
List: row group information list
"""
if not self.metadata:
return []
result = []
for i in range(self.metadata.num_row_groups):
row_group = self.metadata.row_group(i)
result.append({
"rowgroup_index": i,
"num_rows": row_group.num_rows,
"total_byte_size": row_group.total_byte_size,
"num_columns": row_group.num_columns
})
return result
def parse_row_group_metadata(self, metadata_str: str) -> List[Dict[str, Any]]:
"""
Parse row group metadata string
Args:
metadata_str: metadata string in format "bytes|rows|offset;bytes|rows|offset;..."
Returns:
List: parsed row group metadata
"""
if not metadata_str:
return []
result = []
groups = metadata_str.split(';')
for i, group in enumerate(groups):
if not group.strip():
continue
parts = group.split('|')
if len(parts) >= 3:
try:
bytes_size = int(parts[0])
rows = int(parts[1])
offset = int(parts[2])
result.append({
"rowgroup_index": i,
"bytes": bytes_size,
"rows": rows,
"offset": offset
})
except (ValueError, IndexError):
print(f"⚠️ Warning: Invalid row group metadata format: {group}")
continue
return result
def format_row_group_metadata(self, metadata_str: str) -> str:
"""
Format row group metadata string to readable format
Args:
metadata_str: metadata string in format "bytes|rows|offset;bytes|rows|offset;..."
Returns:
str: formatted string
"""
parsed = self.parse_row_group_metadata(metadata_str)
if not parsed:
return "No valid row group metadata found"
lines = []
for group in parsed:
lines.append(f"row group {group['rowgroup_index']}: {group['bytes']} bytes, {group['rows']} rows, {group['offset']} offset")
return '\n'.join(lines)
def parse_group_field_id_list(self, field_list_str: str) -> List[Dict[str, Any]]:
"""
Parse group field ID list string
Args:
field_list_str: field list string in format "field_ids;field_ids;..."
Returns:
List: parsed field group metadata
"""
if not field_list_str:
return []
result = []
groups = field_list_str.split(';')
for i, group in enumerate(groups):
if not group.strip():
continue
try:
field_ids = [int(fid.strip()) for fid in group.split(',') if fid.strip()]
result.append({
"group_index": i,
"field_ids": field_ids,
"field_count": len(field_ids)
})
except (ValueError, IndexError):
print(f"⚠️ Warning: Invalid field group format: {group}")
continue
return result
def format_group_field_id_list(self, field_list_str: str) -> str:
"""
Format group field ID list string to readable format
Args:
field_list_str: field list string in format "field_ids;field_ids;..."
Returns:
str: formatted string
"""
parsed = self.parse_group_field_id_list(field_list_str)
if not parsed:
return "No valid field group metadata found"
lines = []
for group in parsed:
field_ids_str = ','.join(map(str, group['field_ids']))
lines.append(f"column group {group['group_index']}: {field_ids_str}")
return '\n'.join(lines)
def parse_custom_metadata(self, metadata_dict: Dict[str, str]) -> Dict[str, Any]:
"""
Parse custom metadata and format special fields
Args:
metadata_dict: metadata dictionary
Returns:
Dict: parsed metadata with formatted special fields
"""
result = {
"raw_metadata": metadata_dict,
"formatted_metadata": {}
}
for key, value in metadata_dict.items():
if key == "row_group_metadata":
result["formatted_metadata"]["row_group_metadata"] = {
"raw": value,
"parsed": self.parse_row_group_metadata(value),
"formatted": self.format_row_group_metadata(value)
}
elif key == "group_field_id_list":
result["formatted_metadata"]["group_field_id_list"] = {
"raw": value,
"parsed": self.parse_group_field_id_list(value),
"formatted": self.format_group_field_id_list(value)
}
else:
result["formatted_metadata"][key] = value
return result
def get_metadata_summary(self) -> Dict[str, Any]:
"""
Get metadata summary information
Returns:
Dict: metadata summary
"""
file_metadata = self.get_file_metadata()
schema_metadata = self.get_schema_metadata()
summary = {
"file_metadata_count": len(file_metadata),
"schema_metadata_count": len(schema_metadata),
"total_metadata_count": len(file_metadata) + len(schema_metadata)
}
return summary
def export_metadata(self, output_file: Optional[str] = None) -> str:
"""
Export metadata to JSON file
Args:
output_file: output file path, if None will auto-generate
Returns:
str: output file path
"""
if output_file is None:
output_file = f"{self.file_path.stem}_metadata.json"
file_metadata = self.get_file_metadata()
parsed_metadata = self.parse_custom_metadata(file_metadata)
metadata_export = {
"file_info": self.get_basic_info(),
"file_metadata": file_metadata,
"parsed_metadata": parsed_metadata,
"schema_metadata": self.get_schema_metadata(),
"column_statistics": self.get_column_statistics(),
"row_group_info": self.get_row_group_info(),
"metadata_summary": self.get_metadata_summary()
}
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(metadata_export, f, indent=2, ensure_ascii=False)
return output_file
def print_summary(self):
"""Print metadata summary"""
if not self.metadata:
print("❌ No parquet file loaded")
return
basic_info = self.get_basic_info()
summary = self.get_metadata_summary()
file_metadata = self.get_file_metadata()
print(f"📊 Parquet File Metadata Summary: {basic_info['name']}")
print("=" * 60)
print(f" File Size: {basic_info['size_mb']:.2f} MB")
print(f" Total Rows: {basic_info['num_rows']:,}")
print(f" Columns: {basic_info['num_columns']}")
print(f" Row Groups: {basic_info['num_row_groups']}")
print(f" Created By: {basic_info['created_by']}")
print(f" Parquet Version: {basic_info['format_version']}")
print(f" Footer Size: {basic_info['footer_size_bytes']:,} bytes")
print(f" File-level Metadata: {summary['file_metadata_count']} items")
print(f" Schema-level Metadata: {summary['schema_metadata_count']} items")
print(f" Total Metadata: {summary['total_metadata_count']} items")
# Print formatted custom metadata
if file_metadata:
parsed_metadata = self.parse_custom_metadata(file_metadata)
formatted = parsed_metadata.get("formatted_metadata", {})
if "row_group_metadata" in formatted:
print("\n📋 Row Group Metadata:")
print("-" * 30)
print(formatted["row_group_metadata"]["formatted"])
if "group_field_id_list" in formatted:
print("\n📋 Column Group Metadata:")
print("-" * 30)
print(formatted["group_field_id_list"]["formatted"])
def print_formatted_metadata(self, metadata_key: str = None):
"""
Print formatted metadata for specific keys
Args:
metadata_key: specific metadata key to format, if None prints all
"""
if not self.metadata:
print("❌ No parquet file loaded")
return
file_metadata = self.get_file_metadata()
if not file_metadata:
print("❌ No file metadata found")
return
parsed_metadata = self.parse_custom_metadata(file_metadata)
formatted = parsed_metadata.get("formatted_metadata", {})
if metadata_key:
if metadata_key in formatted:
print(f"\n📋 {metadata_key.upper()} Metadata:")
print("-" * 50)
print(formatted[metadata_key]["formatted"])
else:
print(f"❌ Metadata key '{metadata_key}' not found")
else:
# Print all formatted metadata
for key, value in formatted.items():
if isinstance(value, dict) and "formatted" in value:
print(f"\n📋 {key.upper()} Metadata:")
print("-" * 50)
print(value["formatted"])
else:
print(f"\n📋 {key.upper()}: {value}")


@ -0,0 +1,567 @@
"""
Vector Deserializer Component
Responsible for deserializing vector data in parquet files
References deserialization logic from serde.go
"""
import struct
import numpy as np
from typing import List, Optional, Union, Dict, Any
class VectorDeserializer:
"""Vector deserializer"""
@staticmethod
def detect_vector_type_and_dim(bytes_data: bytes, field_name: str = "") -> tuple[str, int]:
"""
Detect vector type and dimension based on field name and byte data
Args:
bytes_data: byte data
field_name: field name
Returns:
tuple: (vector_type, dimension)
"""
if not bytes_data:
return "Unknown", 0
# First, try to detect if it's JSON data
try:
# Check if it starts with { or [ (JSON indicators)
if bytes_data.startswith(b'{') or bytes_data.startswith(b'['):
return "JSON", len(bytes_data)
# Try to decode as UTF-8 and check if it's valid JSON
decoded = bytes_data.decode('utf-8', errors='ignore')
if decoded.startswith('{') or decoded.startswith('['):
return "JSON", len(bytes_data)
except:
pass
# Check for specific field name patterns
if "json" in field_name.lower():
return "JSON", len(bytes_data)
# Check for array patterns (Protocol Buffers style)
if "array" in field_name.lower():
# Check if it looks like a protobuf array
if len(bytes_data) > 2 and bytes_data[1] == ord('-') and b'\n\x01' in bytes_data:
return "Array", len(bytes_data)
# Infer type based on field name
if "vector" in field_name.lower():
            # For vector fields, assume payloads of up to 256 bytes are Int8Vector (one int8 per byte)
            if len(bytes_data) <= 256:
return "Int8Vector", len(bytes_data)
elif len(bytes_data) % 4 == 0:
dim = len(bytes_data) // 4
return "FloatVector", dim
elif len(bytes_data) % 2 == 0:
dim = len(bytes_data) // 2
if "float16" in field_name.lower():
return "Float16Vector", dim
elif "bfloat16" in field_name.lower():
return "BFloat16Vector", dim
else:
return "Float16Vector", dim # default
else:
# Binary vector, calculate dimension
dim = len(bytes_data) * 8
return "BinaryVector", dim
else:
# Infer based on byte count
if len(bytes_data) % 4 == 0:
dim = len(bytes_data) // 4
return "FloatVector", dim
elif len(bytes_data) % 2 == 0:
dim = len(bytes_data) // 2
return "Float16Vector", dim
else:
dim = len(bytes_data) * 8
return "BinaryVector", dim
@staticmethod
def deserialize_float_vector(bytes_data: bytes, dim: Optional[int] = None) -> Optional[List[float]]:
"""
Deserialize FloatVector
References FloatVector processing logic from serde.go
Args:
bytes_data: byte data
dim: dimension, if None will auto-calculate
Returns:
List[float]: deserialized float vector
"""
if not bytes_data:
return None
try:
if dim is None:
dim = len(bytes_data) // 4
if len(bytes_data) != dim * 4:
raise ValueError(f"FloatVector size mismatch: expected {dim * 4}, got {len(bytes_data)}")
# Use struct to unpack float32 data
floats = struct.unpack(f'<{dim}f', bytes_data)
return list(floats)
except Exception as e:
print(f"FloatVector deserialization failed: {e}")
return None
@staticmethod
def deserialize_binary_vector(bytes_data: bytes, dim: Optional[int] = None) -> Optional[List[int]]:
"""
Deserialize BinaryVector
References BinaryVector processing logic from serde.go
Args:
bytes_data: byte data
dim: dimension, if None will auto-calculate
Returns:
List[int]: deserialized binary vector
"""
if not bytes_data:
return None
try:
if dim is None:
dim = len(bytes_data) * 8
expected_size = (dim + 7) // 8
if len(bytes_data) != expected_size:
raise ValueError(f"BinaryVector size mismatch: expected {expected_size}, got {len(bytes_data)}")
# Convert to binary vector
binary_vector = []
for byte in bytes_data:
for i in range(8):
bit = (byte >> i) & 1
binary_vector.append(bit)
# Only return first dim elements
return binary_vector[:dim]
except Exception as e:
print(f"BinaryVector deserialization failed: {e}")
return None
@staticmethod
def deserialize_int8_vector(bytes_data: bytes, dim: Optional[int] = None) -> Optional[List[int]]:
"""
Deserialize Int8Vector
References Int8Vector processing logic from serde.go
Args:
bytes_data: byte data
dim: dimension, if None will auto-calculate
Returns:
List[int]: deserialized int8 vector
"""
if not bytes_data:
return None
try:
if dim is None:
dim = len(bytes_data)
if len(bytes_data) != dim:
raise ValueError(f"Int8Vector size mismatch: expected {dim}, got {len(bytes_data)}")
            # Interpret each byte as a signed int8 value
            int8_vector = list(struct.unpack(f'<{dim}b', bytes_data))
            return int8_vector
except Exception as e:
print(f"Int8Vector deserialization failed: {e}")
return None
@staticmethod
def deserialize_float16_vector(bytes_data: bytes, dim: Optional[int] = None) -> Optional[List[float]]:
"""
Deserialize Float16Vector
References Float16Vector processing logic from serde.go
Args:
bytes_data: byte data
dim: dimension, if None will auto-calculate
Returns:
List[float]: deserialized float16 vector
"""
if not bytes_data:
return None
try:
if dim is None:
dim = len(bytes_data) // 2
if len(bytes_data) != dim * 2:
raise ValueError(f"Float16Vector size mismatch: expected {dim * 2}, got {len(bytes_data)}")
            # Decode IEEE 754 half-precision values using struct's 'e' format
            float16_vector = []
            for i in range(0, dim * 2, 2):
                float16_vector.append(float(struct.unpack('<e', bytes_data[i:i+2])[0]))
            return float16_vector
except Exception as e:
print(f"Float16Vector deserialization failed: {e}")
return None
@staticmethod
def deserialize_bfloat16_vector(bytes_data: bytes, dim: Optional[int] = None) -> Optional[List[float]]:
"""
Deserialize BFloat16Vector
References BFloat16Vector processing logic from serde.go
Args:
bytes_data: byte data
dim: dimension, if None will auto-calculate
Returns:
List[float]: deserialized bfloat16 vector
"""
if not bytes_data:
return None
try:
if dim is None:
dim = len(bytes_data) // 2
if len(bytes_data) != dim * 2:
raise ValueError(f"BFloat16Vector size mismatch: expected {dim * 2}, got {len(bytes_data)}")
            # Decode bfloat16: the stored 16 bits form the high half of a float32 bit pattern
            bfloat16_vector = []
            for i in range(0, dim * 2, 2):
                uint16 = struct.unpack('<H', bytes_data[i:i+2])[0]
                bfloat16_vector.append(struct.unpack('<f', struct.pack('<I', uint16 << 16))[0])
            return bfloat16_vector
except Exception as e:
print(f"BFloat16Vector deserialization failed: {e}")
return None
@staticmethod
def deserialize_json(bytes_data: bytes) -> Optional[Dict[str, Any]]:
"""
Deserialize JSON data
Args:
bytes_data: byte data
Returns:
Dict[str, Any]: deserialized JSON object
"""
if not bytes_data:
return None
try:
import json
# Try to decode as UTF-8
decoded = bytes_data.decode('utf-8')
return json.loads(decoded)
except UnicodeDecodeError:
try:
# Try with different encoding
decoded = bytes_data.decode('latin-1')
return json.loads(decoded)
except:
pass
except json.JSONDecodeError as e:
print(f"JSON deserialization failed: {e}")
return None
except Exception as e:
print(f"JSON deserialization failed: {e}")
return None
@staticmethod
def deserialize_array(bytes_data: bytes) -> Optional[List[str]]:
"""
Deserialize array data (Protocol Buffers style)
Args:
bytes_data: byte data
Returns:
List[str]: deserialized array
"""
if not bytes_data:
return None
try:
# Parse protobuf-style array format
# Format: length + separator + data
# Example: b'2-\n\x01q\n\x01k\n\x01l...'
# Skip the first few bytes (length and separator)
if len(bytes_data) < 3:
return None
# Find the start of data (after the first separator)
start_idx = 0
for i in range(len(bytes_data) - 1):
if bytes_data[i:i+2] == b'\n\x01':
start_idx = i + 2
break
if start_idx == 0:
return None
# Extract the data part
data_part = bytes_data[start_idx:]
# Split by separator and decode
array_items = []
# Split by \n\x01 separator
parts = data_part.split(b'\n\x01')
for part in parts:
if part: # Skip empty parts
try:
# Each part should be a single character
if len(part) == 1:
char = part.decode('utf-8')
array_items.append(char)
except UnicodeDecodeError:
try:
char = part.decode('latin-1')
array_items.append(char)
except:
pass
return array_items
except Exception as e:
print(f"Array deserialization failed: {e}")
return None
@staticmethod
def deserialize_vector(bytes_data: bytes, vector_type: str, dim: Optional[int] = None) -> Optional[Union[List[float], List[int], Dict[str, Any]]]:
"""
Generic vector deserialization method
Args:
bytes_data: byte data
vector_type: vector type
dim: dimension, if None will auto-calculate
Returns:
Union[List[float], List[int], Dict[str, Any]]: deserialized vector
"""
if not bytes_data:
return None
try:
if vector_type == "FloatVector":
return VectorDeserializer.deserialize_float_vector(bytes_data, dim)
elif vector_type == "BinaryVector":
return VectorDeserializer.deserialize_binary_vector(bytes_data, dim)
elif vector_type == "Int8Vector":
return VectorDeserializer.deserialize_int8_vector(bytes_data, dim)
elif vector_type == "Float16Vector":
return VectorDeserializer.deserialize_float16_vector(bytes_data, dim)
elif vector_type == "BFloat16Vector":
return VectorDeserializer.deserialize_bfloat16_vector(bytes_data, dim)
elif vector_type == "JSON":
return VectorDeserializer.deserialize_json(bytes_data)
elif vector_type == "Array":
return VectorDeserializer.deserialize_array(bytes_data)
else:
# Other binary types, return hex representation of original bytes
return bytes_data.hex()
except Exception as e:
print(f"Vector deserialization failed: {e}")
return bytes_data.hex()
@staticmethod
def analyze_vector_statistics(vector_data: Union[List[float], List[int], Dict[str, Any]], vector_type: str) -> Dict[str, Any]:
"""
Analyze vector statistics
Args:
vector_data: vector data
vector_type: vector type
Returns:
Dict: statistics information
"""
if not vector_data:
return {}
try:
if vector_type == "JSON":
# For JSON data, return basic info
if isinstance(vector_data, dict):
return {
"vector_type": vector_type,
"keys": list(vector_data.keys()),
"key_count": len(vector_data),
"is_object": True
}
elif isinstance(vector_data, list):
return {
"vector_type": vector_type,
"length": len(vector_data),
"is_array": True
}
else:
return {
"vector_type": vector_type,
"value_type": type(vector_data).__name__
}
elif vector_type == "Array":
# For Array data, return basic info
if isinstance(vector_data, list):
return {
"vector_type": vector_type,
"length": len(vector_data),
"is_array": True,
"item_types": list(set(type(item).__name__ for item in vector_data))
}
else:
return {
"vector_type": vector_type,
"value_type": type(vector_data).__name__
}
# For numeric vectors
array_data = np.array(vector_data)
stats = {
"vector_type": vector_type,
"dimension": len(vector_data),
"min": float(array_data.min()),
"max": float(array_data.max()),
"mean": float(array_data.mean()),
"std": float(array_data.std())
}
if vector_type == "BinaryVector":
stats["zero_count"] = int(np.sum(array_data == 0))
stats["one_count"] = int(np.sum(array_data == 1))
return stats
except Exception as e:
print(f"Statistics calculation failed: {e}")
return {}
@staticmethod
def analyze_vector_pattern(bytes_data: bytes) -> Dict[str, Any]:
"""
Analyze vector data patterns
Args:
bytes_data: byte data
Returns:
Dict: pattern analysis results
"""
if not bytes_data:
return {}
try:
# Convert to integer array
int_data = list(bytes_data)
# Check if it's an increasing pattern
is_increasing = all(int_data[i] <= int_data[i+1] for i in range(len(int_data)-1))
is_decreasing = all(int_data[i] >= int_data[i+1] for i in range(len(int_data)-1))
            # Check if it's a cyclic pattern: value[i] == i % 256
            is_cyclic = True
            for i in range(min(10, len(int_data))):  # Check first 10 elements
                expected = i % 256
if int_data[i] != expected:
is_cyclic = False
break
# Check if it's a sequential pattern [start, start+1, start+2, ...]
is_sequential = True
if len(int_data) > 1:
start_val = int_data[0]
for i in range(1, min(10, len(int_data))):
if int_data[i] != start_val + i:
is_sequential = False
break
else:
is_sequential = False
# Calculate statistics
unique_values = len(set(int_data))
min_val = min(int_data)
max_val = max(int_data)
avg_val = sum(int_data) / len(int_data)
return {
"is_increasing": is_increasing,
"is_decreasing": is_decreasing,
"is_cyclic": is_cyclic,
"is_sequential": is_sequential,
"unique_values": unique_values,
"min_value": min_val,
"max_value": max_val,
"avg_value": avg_val,
"pattern_type": "sequential" if is_sequential else "cyclic" if is_cyclic else "increasing" if is_increasing else "decreasing" if is_decreasing else "random"
}
except Exception as e:
print(f"Pattern analysis failed: {e}")
return {}
@staticmethod
def deserialize_with_analysis(bytes_data: bytes, field_name: str = "") -> Dict[str, Any]:
"""
Deserialize and analyze vector data
Args:
bytes_data: byte data
field_name: field name
Returns:
Dict: dictionary containing deserialization results and statistics
"""
if not bytes_data:
return {}
# Detect vector type and dimension
vector_type, dim = VectorDeserializer.detect_vector_type_and_dim(bytes_data, field_name)
# Analyze data patterns
pattern_analysis = VectorDeserializer.analyze_vector_pattern(bytes_data)
# Deserialize
deserialized_data = VectorDeserializer.deserialize_vector(bytes_data, vector_type, dim)
# Analyze statistics
stats = VectorDeserializer.analyze_vector_statistics(deserialized_data, vector_type)
return {
"raw_hex": bytes_data.hex(),
"vector_type": vector_type,
"dimension": dim,
"deserialized": deserialized_data,
"statistics": stats,
"pattern_analysis": pattern_analysis
}
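
To illustrate the detection heuristic above, a small sketch that round-trips a synthetic 128-dimensional float vector; 512 bytes exceeds the 256-byte Int8Vector cutoff and is divisible by 4, so the field-name heuristic resolves to FloatVector:

import struct
from parquet_analyzer import VectorDeserializer

payload = struct.pack('<128f', *[float(i) for i in range(128)])  # synthetic vector bytes
info = VectorDeserializer.deserialize_with_analysis(payload, "float_vector")
print(info["vector_type"], info["dimension"])  # FloatVector 128
print(info["statistics"]["min"], info["statistics"]["max"])  # 0.0 127.0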


@ -0,0 +1,498 @@
#!/usr/bin/env python3
"""
Parquet Analyzer Command Line Tool
Provides a simple command line interface to use the parquet analyzer
"""
import argparse
import sys
import json
import pandas as pd
import pyarrow.parquet as pq
from pathlib import Path
# Add current directory to Python path
sys.path.append(str(Path(__file__).parent))
from parquet_analyzer import ParquetAnalyzer, ParquetMetaParser, VectorDeserializer
def main():
"""Main function"""
parser = argparse.ArgumentParser(
description="Parquet Analyzer Command Line Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Usage Examples:
python parquet_analyzer_cli.py analyze test_large_batch.parquet
python parquet_analyzer_cli.py metadata test_large_batch.parquet
python parquet_analyzer_cli.py vector test_large_batch.parquet
python parquet_analyzer_cli.py export test_large_batch.parquet --output result.json
python parquet_analyzer_cli.py data test_large_batch.parquet --rows 10 --output data.json
python parquet_analyzer_cli.py query test_large_batch.parquet --id-value 123
python parquet_analyzer_cli.py query test_large_batch.parquet --id-value 123 --id-column user_id
"""
)
parser.add_argument(
"command",
choices=["analyze", "metadata", "vector", "export", "data", "query"],
help="Command to execute"
)
parser.add_argument(
"file",
help="Parquet file path"
)
parser.add_argument(
"--output", "-o",
help="Output file path (for export and data commands)"
)
parser.add_argument(
"--rows", "-r",
type=int,
default=10,
help="Number of rows to export (only for data command, default: 10 rows)"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Verbose output"
)
# Query-specific arguments
parser.add_argument(
"--id-value", "-i",
help="ID value to query (for query command)"
)
parser.add_argument(
"--id-column", "-c",
help="ID column name (for query command, auto-detected if not specified)"
)
args = parser.parse_args()
# Check if file exists
if not Path(args.file).exists():
print(f"❌ File does not exist: {args.file}")
sys.exit(1)
if args.command == "analyze":
analyze_file(args.file, args.verbose)
elif args.command == "metadata":
analyze_metadata(args.file, args.verbose)
elif args.command == "vector":
analyze_vectors(args.file, args.verbose)
elif args.command == "export":
export_analysis(args.file, args.output, args.verbose)
elif args.command == "data":
export_data(args.file, args.output, args.rows, args.verbose)
elif args.command == "query":
query_by_id(args.file, args.id_value, args.id_column, args.verbose)
def analyze_file(file_path: str, verbose: bool = False):
"""Analyze parquet file"""
print(f"🔍 Analyzing parquet file: {Path(file_path).name}")
print("=" * 60)
analyzer = ParquetAnalyzer(file_path)
if not analyzer.load():
print("❌ Failed to load parquet file")
sys.exit(1)
# Print summary
analyzer.print_summary()
if verbose:
# Detailed analysis
analysis = analyzer.analyze()
print(f"\n📊 Detailed Analysis Results:")
print(f" File Info: {analysis['metadata']['basic_info']['name']}")
print(f" Size: {analysis['metadata']['basic_info']['size_mb']:.2f} MB")
print(f" Rows: {analysis['metadata']['basic_info']['num_rows']:,}")
print(f" Columns: {analysis['metadata']['basic_info']['num_columns']}")
# Display vector analysis
if analysis['vectors']:
print(f"\n🔍 Vector Analysis:")
for vec_analysis in analysis['vectors']:
col_name = vec_analysis['column_name']
stat_type = vec_analysis['stat_type']
analysis_data = vec_analysis['analysis']
print(f" {col_name} ({stat_type}):")
print(f" Vector Type: {analysis_data['vector_type']}")
print(f" Dimension: {analysis_data['dimension']}")
if analysis_data['statistics']:
stats = analysis_data['statistics']
print(f" Min: {stats.get('min', 'N/A')}")
print(f" Max: {stats.get('max', 'N/A')}")
print(f" Mean: {stats.get('mean', 'N/A')}")
print(f" Std: {stats.get('std', 'N/A')}")
def analyze_metadata(file_path: str, verbose: bool = False):
"""Analyze metadata"""
print(f"📄 Analyzing metadata: {Path(file_path).name}")
print("=" * 60)
meta_parser = ParquetMetaParser(file_path)
if not meta_parser.load():
print("❌ Failed to load parquet file")
sys.exit(1)
# Basic information
basic_info = meta_parser.get_basic_info()
print(f"📊 File Information:")
print(f" Name: {basic_info['name']}")
print(f" Size: {basic_info['size_mb']:.2f} MB")
print(f" Rows: {basic_info['num_rows']:,}")
print(f" Columns: {basic_info['num_columns']}")
print(f" Row Groups: {basic_info['num_row_groups']}")
print(f" Created By: {basic_info['created_by']}")
print(f" Parquet Version: {basic_info['format_version']}")
# File-level metadata
file_metadata = meta_parser.get_file_metadata()
if file_metadata:
print(f"\n📄 File-level Metadata:")
for key, value in file_metadata.items():
print(f" {key}: {value}")
# Schema-level metadata
schema_metadata = meta_parser.get_schema_metadata()
if schema_metadata:
print(f"\n📋 Schema-level Metadata:")
for field in schema_metadata:
print(f" {field['column_name']}: {field['column_type']}")
for k, v in field['metadata'].items():
print(f" {k}: {v}")
# Column statistics
column_stats = meta_parser.get_column_statistics()
if column_stats:
print(f"\n📈 Column Statistics:")
for col_stats in column_stats:
print(f" {col_stats['column_name']}:")
print(f" Compression: {col_stats['compression']}")
print(f" Encodings: {', '.join(col_stats['encodings'])}")
print(f" Compressed Size: {col_stats['compressed_size']:,} bytes")
print(f" Uncompressed Size: {col_stats['uncompressed_size']:,} bytes")
if 'statistics' in col_stats and col_stats['statistics']:
stats = col_stats['statistics']
if 'null_count' in stats:
print(f" Null Count: {stats['null_count']}")
if 'distinct_count' in stats:
print(f" Distinct Count: {stats['distinct_count']}")
if 'min' in stats:
print(f" Min: {stats['min']}")
if 'max' in stats:
print(f" Max: {stats['max']}")
def analyze_vectors(file_path: str, verbose: bool = False):
"""Analyze vector data"""
print(f"🔍 Analyzing vector data: {Path(file_path).name}")
print("=" * 60)
analyzer = ParquetAnalyzer(file_path)
if not analyzer.load():
print("❌ Failed to load parquet file")
sys.exit(1)
vector_analysis = analyzer.analyze_vectors()
if not vector_analysis:
print("❌ No vector data found")
return
print(f"📊 Found {len(vector_analysis)} vector statistics:")
for vec_analysis in vector_analysis:
col_name = vec_analysis['column_name']
stat_type = vec_analysis['stat_type']
analysis = vec_analysis['analysis']
print(f"\n🔍 {col_name} ({stat_type}):")
print(f" Vector Type: {analysis['vector_type']}")
print(f" Dimension: {analysis['dimension']}")
if analysis['statistics']:
stats = analysis['statistics']
print(f" Min: {stats.get('min', 'N/A')}")
print(f" Max: {stats.get('max', 'N/A')}")
print(f" Mean: {stats.get('mean', 'N/A')}")
print(f" Std: {stats.get('std', 'N/A')}")
if analysis['vector_type'] == "BinaryVector":
print(f" Zero Count: {stats.get('zero_count', 'N/A')}")
print(f" One Count: {stats.get('one_count', 'N/A')}")
if verbose and analysis['deserialized']:
print(f" First 5 elements: {analysis['deserialized'][:5]}")
# Validate consistency
validation = analyzer.validate_vector_consistency()
print(f"\n✅ Vector Consistency Validation:")
print(f" Total Vectors: {validation['total_vectors']}")
print(f" Consistent Columns: {validation['consistent_columns']}")
print(f" Inconsistent Columns: {validation['inconsistent_columns']}")
def export_analysis(file_path: str, output_file: str = None, verbose: bool = False):
"""Export analysis results"""
print(f"💾 Exporting analysis results: {Path(file_path).name}")
print("=" * 60)
analyzer = ParquetAnalyzer(file_path)
if not analyzer.load():
print("❌ Failed to load parquet file")
sys.exit(1)
# Export analysis results
output_file = analyzer.export_analysis(output_file)
print(f"✅ Analysis results exported to: {output_file}")
if verbose:
# Show file size
file_size = Path(output_file).stat().st_size
print(f"📊 Output file size: {file_size:,} bytes ({file_size/1024:.2f} KB)")
# Show summary
analysis = analyzer.analyze()
print(f"📈 Analysis Summary:")
print(f" Metadata Count: {analysis['metadata']['metadata_summary']['total_metadata_count']}")
print(f" Vector Count: {len(analysis['vectors'])}")
def export_data(file_path: str, output_file: str = None, num_rows: int = 10, verbose: bool = False):
"""Export first N rows of parquet file data"""
print(f"📊 Exporting data: {Path(file_path).name}")
print("=" * 60)
try:
# Read parquet file
table = pq.read_table(file_path)
df = table.to_pandas()
# Get first N rows
data_subset = df.head(num_rows)
# Process vector columns, convert bytes to readable format
processed_data = []
for idx, row in data_subset.iterrows():
row_dict = {}
for col_name, value in row.items():
if isinstance(value, bytes):
# Try to deserialize as vector
try:
vec_analysis = VectorDeserializer.deserialize_with_analysis(value, col_name)
if vec_analysis and vec_analysis['deserialized']:
if vec_analysis['vector_type'] == "JSON":
# For JSON, show the actual content
row_dict[col_name] = vec_analysis['deserialized']
elif vec_analysis['vector_type'] == "Array":
# For Array, show the actual content
row_dict[col_name] = vec_analysis['deserialized']
else:
# For vectors, show type and dimension
row_dict[col_name] = {
"vector_type": vec_analysis['vector_type'],
"dimension": vec_analysis['dimension'],
"data": vec_analysis['deserialized'][:10], # Only show first 10 elements
"raw_hex": value.hex()[:50] + "..." if len(value.hex()) > 50 else value.hex()
}
else:
row_dict[col_name] = {
"type": "binary",
"size": len(value),
"hex": value.hex()[:50] + "..." if len(value.hex()) > 50 else value.hex()
}
except Exception as e:
row_dict[col_name] = {
"type": "binary",
"size": len(value),
"hex": value.hex()[:50] + "..." if len(value.hex()) > 50 else value.hex(),
"error": str(e)
}
else:
row_dict[col_name] = value
processed_data.append(row_dict)
# Prepare output
result = {
"file_info": {
"name": Path(file_path).name,
"total_rows": len(df),
"total_columns": len(df.columns),
"exported_rows": len(processed_data)
},
"columns": list(df.columns),
"data": processed_data
}
# Determine output file
if not output_file:
output_file = f"{Path(file_path).stem}_data_{num_rows}rows.json"
# Save to file
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print(f"✅ Data exported to: {output_file}")
print(f"📊 Exported {len(processed_data)} rows (total {len(df)} rows)")
print(f"📋 Columns: {len(df.columns)}")
if verbose:
print(f"\n📈 Data Preview:")
for i, row_data in enumerate(processed_data[:3]): # Only show first 3 rows preview
print(f" Row {i+1}:")
for col_name, value in row_data.items():
if isinstance(value, dict) and 'vector_type' in value:
print(f" {col_name}: {value['vector_type']}({value['dimension']}) - {value['data'][:5]}...")
elif isinstance(value, dict) and 'type' in value:
print(f" {col_name}: {value['type']} ({value['size']} bytes)")
else:
print(f" {col_name}: {value}")
print()
return output_file
except Exception as e:
print(f"❌ Failed to export data: {e}")
sys.exit(1)
def query_by_id(file_path: str, id_value: str = None, id_column: str = None, verbose: bool = False):
"""Query data by ID value"""
print(f"🔍 Querying by ID: {Path(file_path).name}")
print("=" * 60)
analyzer = ParquetAnalyzer(file_path)
if not analyzer.load():
print("❌ Failed to load parquet file")
sys.exit(1)
# If no ID value provided, show ID column information
if id_value is None:
print("📋 ID Column Information:")
print("-" * 40)
id_info = analyzer.get_id_column_info()
if "error" in id_info:
print(f"{id_info['error']}")
sys.exit(1)
print(f"📊 Total rows: {id_info['total_rows']}")
print(f"📋 Total columns: {id_info['total_columns']}")
print(f"🎯 Recommended ID column: {id_info['recommended_id_column']}")
print()
print("📋 Available ID columns:")
for col_info in id_info['id_columns']:
status = "" if col_info['is_unique'] else "⚠️"
print(f" {status} {col_info['column_name']}")
print(f" - Unique: {col_info['is_unique']}")
print(f" - Type: {'Integer' if col_info['is_integer'] else 'Numeric' if col_info['is_numeric'] else 'Other'}")
print(f" - Range: {col_info['min_value']} to {col_info['max_value']}" if col_info['is_numeric'] else " - Range: N/A")
print(f" - Sample values: {col_info['sample_values'][:3]}")
print()
print("💡 Usage: python parquet_analyzer_cli.py query <file> --id-value <value> [--id-column <column>]")
return
# Convert ID value to appropriate type
try:
# Try to convert to integer first
if id_value.isdigit():
id_value = int(id_value)
elif id_value.replace('.', '').replace('-', '').isdigit():
id_value = float(id_value)
except ValueError:
# Keep as string if conversion fails
pass
# Perform the query
result = analyzer.query_by_id(id_value, id_column)
if "error" in result:
print(f"❌ Query failed: {result['error']}")
sys.exit(1)
if not result['found']:
print(f"{result['message']}")
return
# Display results
print(f"✅ Found record with {result['id_column']} = {result['id_value']}")
print(f"📊 Total columns: {result['total_columns']}")
print(f"📈 Total rows in file: {result['total_rows']}")
print()
print("📋 Record Data:")
print("-" * 40)
for col_name, value in result['record'].items():
if isinstance(value, dict) and 'vector_type' in value:
# Vector data
print(f" {col_name}:")
print(f" Type: {value['vector_type']}")
print(f" Dimension: {value['dimension']}")
print(f" Data preview: {value['data_preview'][:5]}...")
elif isinstance(value, dict) and 'name' in value:
# JSON data (likely a person record)
print(f" {col_name}:")
for key, val in value.items():
print(f" {key}: {val}")
elif isinstance(value, list) and len(value) > 0 and isinstance(value[0], str):
# String array data
print(f" {col_name}: {value}")
elif isinstance(value, list):
# Array data
print(f" {col_name}: {value}")
elif isinstance(value, str) and value.startswith('<binary data:'):
# Binary data
print(f" {col_name}: {value}")
else:
# Regular data
print(f" {col_name}: {value}")
# Show vector analysis if available
if result['vector_columns']:
print()
print("🔍 Vector Analysis:")
print("-" * 40)
for vec_info in result['vector_columns']:
col_name = vec_info['column_name']
analysis = vec_info['analysis']
print(f" {col_name}:")
print(f" Type: {analysis['vector_type']}")
print(f" Dimension: {analysis['dimension']}")
if 'statistics' in analysis:
stats = analysis['statistics']
print(f" Statistics: {stats}")
if verbose:
print()
print("🔍 Detailed Analysis:")
print("-" * 40)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
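
For reference, the JSON written by the data command above has this shape (the column names and values here are illustrative only):

{
  "file_info": {"name": "test_large_batch.parquet", "total_rows": 1000, "total_columns": 3, "exported_rows": 10},
  "columns": ["id", "float_vector", "json_field"],
  "data": [
    {
      "id": 1,
      "float_vector": {"vector_type": "FloatVector", "dimension": 128, "data": [0.1, 0.2], "raw_hex": "..."},
      "json_field": {"name": "example"}
    }
  ]
}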


@ -0,0 +1,5 @@
streamlit>=1.28.0
duckdb>=0.9.0
pandas>=2.0.0
pyarrow
numpy
pathlib2>=2.3.7
minio>=7.2.0
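
Assuming this requirements file sits next to the CLI scripts, a typical setup and first run might look like:

python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
python parquet_analyzer_cli.py analyze test_large_batch.parquet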