From 43c3c160ff92ed19280c866a24f0938e819c47be Mon Sep 17 00:00:00 2001 From: sthuang <167743503+shaoting-huang@users.noreply.github.com> Date: Thu, 31 Jul 2025 15:05:37 +0800 Subject: [PATCH] feat: [StorageV2] cmd binlog tool (#43648) related: #39173 Core Features * Parquet File Analysis: Analyze Milvus binlog Parquet files with metadata extraction * MinIO Integration: Direct connection to MinIO storage for remote file analysis * Vector Data Deserialization: Specialized handling of Milvus vector data in binlog files * Interactive CLI: Command-line interface with interactive exploration Analysis Capabilities * Metadata & Vector Analysis: Extract schema info, row counts, and vector statistics * Data Export: Export data to JSON format with configurable limits * Query Functionality: Search for specific records by ID * Batch Processing: Analyze multiple Parquet files simultaneously User Experience * Verbose Output: Detailed logging for debugging * Error Handling: Robust error handling for file access and parsing * Flexible Output: Support for single file and batch analysis formats --------- Signed-off-by: shaoting-huang Co-authored-by: nico <109071306+NicoYuan1986@users.noreply.github.com> --- cmd/tools/binlogv2/.gitignore | 44 + cmd/tools/binlogv2/export_to_json.py | 237 ++++++ cmd/tools/binlogv2/minio_client.py | 688 +++++++++++++++ cmd/tools/binlogv2/minio_parquet_analyzer.py | 804 ++++++++++++++++++ .../binlogv2/parquet_analyzer/__init__.py | 11 + .../binlogv2/parquet_analyzer/analyzer.py | 494 +++++++++++ .../binlogv2/parquet_analyzer/meta_parser.py | 448 ++++++++++ .../parquet_analyzer/vector_deserializer.py | 567 ++++++++++++ cmd/tools/binlogv2/parquet_analyzer_cli.py | 498 +++++++++++ cmd/tools/binlogv2/requirements.txt | 5 + 10 files changed, 3796 insertions(+) create mode 100644 cmd/tools/binlogv2/.gitignore create mode 100644 cmd/tools/binlogv2/export_to_json.py create mode 100644 cmd/tools/binlogv2/minio_client.py create mode 100644 cmd/tools/binlogv2/minio_parquet_analyzer.py create mode 100644 cmd/tools/binlogv2/parquet_analyzer/__init__.py create mode 100644 cmd/tools/binlogv2/parquet_analyzer/analyzer.py create mode 100644 cmd/tools/binlogv2/parquet_analyzer/meta_parser.py create mode 100644 cmd/tools/binlogv2/parquet_analyzer/vector_deserializer.py create mode 100755 cmd/tools/binlogv2/parquet_analyzer_cli.py create mode 100644 cmd/tools/binlogv2/requirements.txt diff --git a/cmd/tools/binlogv2/.gitignore b/cmd/tools/binlogv2/.gitignore new file mode 100644 index 0000000000..701f5607b0 --- /dev/null +++ b/cmd/tools/binlogv2/.gitignore @@ -0,0 +1,44 @@ +# Python files +__pycache__/ +*.pyc +*.pyo +*.pyd +.Python +.pytest_cache +**/.pytest_cache + +# Virtual environments +venv/ +.venv/ +env/ +ENV/ + +# IDE files +.idea/ +.vscode/ +*.swp +*.swo + +# OS files +.DS_Store +Thumbs.db + +# Logs and output files +*.log +nohup.out +logs/ +test_out/ + +# Data files +*idmap*.txt +*.hdf5 +*.npy +*.numpy + +# Coverage reports +.coverage +htmlcov/ + +# Temporary files +*.tmp +*.temp diff --git a/cmd/tools/binlogv2/export_to_json.py b/cmd/tools/binlogv2/export_to_json.py new file mode 100644 index 0000000000..0eae054c18 --- /dev/null +++ b/cmd/tools/binlogv2/export_to_json.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +""" +Parquet to JSON Export Tool +Specialized for exporting parquet file data to JSON format +""" + +import argparse +import json +import sys +from pathlib import Path +import pandas as pd +import pyarrow.parquet as pq +from parquet_analyzer import VectorDeserializer + + +def 
export_parquet_to_json(parquet_file: str, output_file: str = None, + num_rows: int = None, start_row: int = 0, + include_vectors: bool = True, + vector_format: str = "deserialized", + pretty_print: bool = True): + """ + Export parquet file to JSON format + + Args: + parquet_file: parquet file path + output_file: output JSON file path + num_rows: number of rows to export (None means all) + start_row: starting row number (0-based) + include_vectors: whether to include vector data + vector_format: vector format ("deserialized", "hex", "both") + pretty_print: whether to pretty print output + """ + + print(f"๐Ÿ“Š Exporting parquet file: {Path(parquet_file).name}") + print("=" * 60) + + try: + # Read parquet file + table = pq.read_table(parquet_file) + df = table.to_pandas() + + total_rows = len(df) + print(f"๐Ÿ“‹ File Information:") + print(f" Total Rows: {total_rows:,}") + print(f" Columns: {len(df.columns)}") + print(f" Column Names: {', '.join(df.columns)}") + + # Determine export row range + if num_rows is None: + end_row = total_rows + num_rows = total_rows - start_row + else: + end_row = min(start_row + num_rows, total_rows) + num_rows = end_row - start_row + + if start_row >= total_rows: + print(f"โŒ Starting row {start_row} exceeds file range (0-{total_rows-1})") + return False + + print(f"๐Ÿ“ˆ Export Range: Row {start_row} to Row {end_row-1} (Total {num_rows} rows)") + + # Get data for specified range + data_subset = df.iloc[start_row:end_row] + + # Process data + processed_data = [] + for idx, row in data_subset.iterrows(): + row_dict = {} + for col_name, value in row.items(): + if isinstance(value, bytes) and include_vectors: + # Process vector columns + try: + vec_analysis = VectorDeserializer.deserialize_with_analysis(value, col_name) + if vec_analysis and vec_analysis['deserialized']: + if vector_format == "deserialized": + row_dict[col_name] = { + "type": vec_analysis['vector_type'], + "dimension": vec_analysis['dimension'], + "data": vec_analysis['deserialized'] + } + elif vector_format == "hex": + row_dict[col_name] = { + "type": vec_analysis['vector_type'], + "dimension": vec_analysis['dimension'], + "hex": value.hex() + } + elif vector_format == "both": + row_dict[col_name] = { + "type": vec_analysis['vector_type'], + "dimension": vec_analysis['dimension'], + "data": vec_analysis['deserialized'], + "hex": value.hex() + } + else: + row_dict[col_name] = { + "type": "binary", + "size": len(value), + "hex": value.hex() + } + except Exception as e: + row_dict[col_name] = { + "type": "binary", + "size": len(value), + "hex": value.hex(), + "error": str(e) + } + elif isinstance(value, bytes) and not include_vectors: + # When not including vectors, only show basic information + row_dict[col_name] = { + "type": "binary", + "size": len(value), + "hex": value.hex()[:50] + "..." 
if len(value.hex()) > 50 else value.hex() + } + else: + row_dict[col_name] = value + processed_data.append(row_dict) + + # Prepare output structure + result = { + "export_info": { + "source_file": Path(parquet_file).name, + "total_rows": total_rows, + "exported_rows": len(processed_data), + "start_row": start_row, + "end_row": end_row - 1, + "columns": list(df.columns), + "vector_format": vector_format if include_vectors else "excluded" + }, + "data": processed_data + } + + # Determine output file + if not output_file: + base_name = Path(parquet_file).stem + output_file = f"{base_name}_export_{start_row}-{end_row-1}.json" + + # Save to file + with open(output_file, 'w', encoding='utf-8') as f: + if pretty_print: + json.dump(result, f, ensure_ascii=False, indent=2) + else: + json.dump(result, f, ensure_ascii=False, separators=(',', ':')) + + # Output statistics + file_size = Path(output_file).stat().st_size + print(f"โœ… Export completed!") + print(f"๐Ÿ“ Output file: {output_file}") + print(f"๐Ÿ“Š File size: {file_size:,} bytes ({file_size/1024:.2f} KB)") + print(f"๐Ÿ“ˆ Exported rows: {len(processed_data)}") + + return True + + except Exception as e: + print(f"โŒ Export failed: {e}") + return False + + +def main(): + """Main function""" + parser = argparse.ArgumentParser( + description="Parquet to JSON Export Tool", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Usage Examples: + python export_to_json.py test_large_batch.parquet + python export_to_json.py test_large_batch.parquet --rows 100 --output data.json + python export_to_json.py test_large_batch.parquet --start 1000 --rows 50 + python export_to_json.py test_large_batch.parquet --vector-format hex + """ + ) + + parser.add_argument( + "parquet_file", + help="Parquet file path" + ) + + parser.add_argument( + "--output", "-o", + help="Output JSON file path" + ) + + parser.add_argument( + "--rows", "-r", + type=int, + help="Number of rows to export (default: all)" + ) + + parser.add_argument( + "--start", "-s", + type=int, + default=0, + help="Starting row number (default: 0)" + ) + + parser.add_argument( + "--no-vectors", + action="store_true", + help="Exclude vector data" + ) + + parser.add_argument( + "--vector-format", + choices=["deserialized", "hex", "both"], + default="deserialized", + help="Vector data format (default: deserialized)" + ) + + parser.add_argument( + "--no-pretty", + action="store_true", + help="Don't pretty print JSON output (compressed format)" + ) + + args = parser.parse_args() + + # Check if file exists + if not Path(args.parquet_file).exists(): + print(f"โŒ File does not exist: {args.parquet_file}") + sys.exit(1) + + # Execute export + success = export_parquet_to_json( + parquet_file=args.parquet_file, + output_file=args.output, + num_rows=args.rows, + start_row=args.start, + include_vectors=not args.no_vectors, + vector_format=args.vector_format, + pretty_print=not args.no_pretty + ) + + if not success: + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cmd/tools/binlogv2/minio_client.py b/cmd/tools/binlogv2/minio_client.py new file mode 100644 index 0000000000..c6a04e711b --- /dev/null +++ b/cmd/tools/binlogv2/minio_client.py @@ -0,0 +1,688 @@ +#!/usr/bin/env python3 +""" +MinIO Client for Parquet Analysis +Downloads files from MinIO and passes them to parquet_analyzer_cli.py for analysis +""" + +import argparse +import sys +import os +import tempfile +import subprocess +from pathlib import Path +from typing import Optional, List, Dict, Any 
+import datetime  # required for the date-range parsing in filter_objects() below
+import json + +try: + from minio import Minio + from minio.error import S3Error +except ImportError: + print("โŒ MinIO client library not found. Please install it:") + print(" pip install minio") + sys.exit(1) + + +class MinioParquetAnalyzer: + """MinIO client for downloading and analyzing Parquet files and Milvus binlog files""" + + def __init__(self, endpoint: str, port: int = 9001, secure: bool = False, + access_key: str = None, secret_key: str = None): + """ + Initialize MinIO client + + Args: + endpoint: MinIO server endpoint (hostname/IP) + port: MinIO server port (default: 9000) + secure: Use HTTPS (default: False) + access_key: MinIO access key (optional, for public buckets can be None) + secret_key: MinIO secret key (optional, for public buckets can be None) + """ + self.endpoint = endpoint + self.port = port + self.secure = secure + self.access_key = access_key + self.secret_key = secret_key + + # Initialize MinIO client + try: + self.client = Minio( + f"{endpoint}:{port}", + access_key=access_key, + secret_key=secret_key, + secure=secure + ) + print(f"โœ… Connected to MinIO server: {endpoint}:{port}") + except Exception as e: + print(f"โŒ Failed to connect to MinIO server: {e}") + sys.exit(1) + + def list_buckets(self) -> List[Dict[str, Any]]: + """List all buckets""" + try: + buckets = [] + for bucket in self.client.list_buckets(): + buckets.append({ + 'name': bucket.name, + 'creation_date': bucket.creation_date.isoformat() if bucket.creation_date else None + }) + return buckets + except S3Error as e: + print(f"โŒ Failed to list buckets: {e}") + return [] + + def list_objects(self, bucket_name: str, prefix: str = "", recursive: bool = True) -> List[Dict[str, Any]]: + """List objects in a bucket""" + try: + objects = [] + for obj in self.client.list_objects(bucket_name, prefix=prefix, recursive=recursive): + objects.append({ + 'name': obj.object_name, + 'size': obj.size, + 'last_modified': obj.last_modified.isoformat() if obj.last_modified else None, + 'etag': obj.etag + }) + return objects + except S3Error as e: + print(f"โŒ Failed to list objects in bucket '{bucket_name}': {e}") + return [] + + def filter_objects(self, objects: List[Dict[str, Any]], + prefix: str = None, suffix: str = None, contains: str = None, + size_min: int = None, size_max: int = None, + date_from: str = None, date_to: str = None) -> List[Dict[str, Any]]: + """ + Filter objects based on various criteria + + Args: + objects: List of objects to filter + prefix: Filter by object name prefix + suffix: Filter by object name suffix + contains: Filter by object name containing string + size_min: Minimum size in MB + size_max: Maximum size in MB + date_from: Filter objects modified after date (YYYY-MM-DD) + date_to: Filter objects modified before date (YYYY-MM-DD) + + Returns: + Filtered list of objects + """ + filtered = objects + + if prefix: + filtered = [obj for obj in filtered if obj['name'].startswith(prefix)] + + if suffix: + filtered = [obj for obj in filtered if obj['name'].endswith(suffix)] + + if contains: + # Support complex logic with parentheses, OR (comma) and AND (&) logic + filtered = self._apply_contains_filter(filtered, contains) + + if size_min is not None: + size_min_bytes = size_min * 1024 * 1024 + filtered = [obj for obj in filtered if obj['size'] >= size_min_bytes] + + if size_max is not None: + size_max_bytes = size_max * 1024 * 1024 + filtered = [obj for obj in filtered if obj['size'] <= size_max_bytes] + + if date_from: + try: + from_date = 
datetime.datetime.fromisoformat(date_from).date() + filtered = [obj for obj in filtered + if obj['last_modified'] and + datetime.datetime.fromisoformat(obj['last_modified']).date() >= from_date] + except ValueError: + print(f"โš ๏ธ Invalid date format for --filter-date-from: {date_from}") + + if date_to: + try: + to_date = datetime.datetime.fromisoformat(date_to).date() + filtered = [obj for obj in filtered + if obj['last_modified'] and + datetime.datetime.fromisoformat(obj['last_modified']).date() <= to_date] + except ValueError: + print(f"โš ๏ธ Invalid date format for --filter-date-to: {date_to}") + + return filtered + + def _apply_contains_filter(self, objects: List[Dict[str, Any]], contains_expr: str) -> List[Dict[str, Any]]: + """ + Apply complex contains filter with parentheses support + + Args: + objects: List of objects to filter + contains_expr: Complex contains expression with parentheses, OR (comma), and AND (&) logic + + Returns: + Filtered list of objects + """ + def evaluate_expression(expr: str, obj_name: str) -> bool: + """Evaluate a single expression for an object name""" + expr = expr.strip() + + # Handle parentheses first + if '(' in expr and ')' in expr: + # Find the innermost parentheses + start = expr.rfind('(') + end = expr.find(')', start) + if start != -1 and end != -1: + # Extract the content inside parentheses + inner_expr = expr[start+1:end] + # Evaluate the inner expression + inner_result = evaluate_expression(inner_expr, obj_name) + # Replace the parentheses expression with the result + new_expr = expr[:start] + ('true' if inner_result else 'false') + expr[end+1:] + return evaluate_expression(new_expr, obj_name) + + # Handle AND logic (&) + if '&' in expr: + parts = [p.strip() for p in expr.split('&')] + return all(evaluate_expression(part, obj_name) for part in parts) + + # Handle OR logic (,) + if ',' in expr: + parts = [p.strip() for p in expr.split(',')] + return any(evaluate_expression(part, obj_name) for part in parts) + + # Single keyword + return expr in obj_name + + return [obj for obj in objects if evaluate_expression(contains_expr, obj['name'])] + + def download_file(self, bucket_name: str, object_name: str, local_path: str = None) -> Optional[str]: + """ + Download a file from MinIO + + Args: + bucket_name: Name of the bucket + object_name: Name of the object in the bucket + local_path: Local path to save the file (optional, will use temp file if not provided) + + Returns: + Local file path if successful, None otherwise + """ + try: + if not local_path: + # Create temporary file + temp_dir = tempfile.gettempdir() + filename = Path(object_name).name + local_path = os.path.join(temp_dir, f"minio_{filename}") + + print(f"๐Ÿ“ฅ Downloading {object_name} from bucket {bucket_name}...") + self.client.fget_object(bucket_name, object_name, local_path) + print(f"โœ… Downloaded to: {local_path}") + return local_path + + except S3Error as e: + print(f"โŒ Failed to download {object_name}: {e}") + return None + except Exception as e: + print(f"โŒ Unexpected error downloading {object_name}: {e}") + return None + + def analyze_parquet_from_minio(self, bucket_name: str, object_name: str, + command: str = "analyze", output_file: str = None, + rows: int = 10, verbose: bool = False, + id_value: str = None, id_column: str = None) -> bool: + """ + Download Parquet file from MinIO and analyze it using parquet_analyzer_cli.py + + Args: + bucket_name: Name of the bucket + object_name: Name of the object in the bucket + command: Analysis command (analyze, metadata, 
vector, export, data) + output_file: Output file path for export/data commands + rows: Number of rows to export (for data command) + verbose: Verbose output + + Returns: + True if successful, False otherwise + """ + # Download the file + local_path = self.download_file(bucket_name, object_name) + if not local_path: + return False + + try: + # Build command for parquet_analyzer_cli.py + cli_script = Path(__file__).parent / "parquet_analyzer_cli.py" + if not cli_script.exists(): + print(f"โŒ parquet_analyzer_cli.py not found at: {cli_script}") + return False + + cmd = [sys.executable, str(cli_script), command, local_path] + + # Add optional arguments + if output_file: + cmd.extend(["--output", output_file]) + if rows != 10: + cmd.extend(["--rows", str(rows)]) + if verbose: + cmd.append("--verbose") + if id_value: + cmd.extend(["--id-value", str(id_value)]) + if id_column: + cmd.extend(["--id-column", id_column]) + + print(f"๐Ÿ” Running analysis command: {' '.join(cmd)}") + print("=" * 60) + + # Execute the command + result = subprocess.run(cmd, capture_output=False, text=True) + + if result.returncode == 0: + print("โœ… Analysis completed successfully") + return True + else: + print(f"โŒ Analysis failed with return code: {result.returncode}") + return False + + except Exception as e: + print(f"โŒ Failed to run analysis: {e}") + return False + finally: + # Clean up temporary file if it was created + if local_path and local_path.startswith(tempfile.gettempdir()): + try: + os.remove(local_path) + print(f"๐Ÿงน Cleaned up temporary file: {local_path}") + except: + pass + + def interactive_mode(self): + """Interactive mode for browsing and analyzing files""" + print("๐Ÿ” MinIO Interactive Mode") + print("=" * 40) + + # List buckets + buckets = self.list_buckets() + if not buckets: + print("โŒ No buckets found or access denied") + return + + print(f"๐Ÿ“ฆ Found {len(buckets)} bucket(s):") + for i, bucket in enumerate(buckets): + print(f" {i+1}. {bucket['name']}") + + # Select bucket + while True: + try: + choice = input(f"\nSelect bucket (1-{len(buckets)}) or 'q' to quit: ").strip() + if choice.lower() == 'q': + return + + bucket_idx = int(choice) - 1 + if 0 <= bucket_idx < len(buckets): + selected_bucket = buckets[bucket_idx]['name'] + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter a valid number") + + # Main interactive loop + while True: + print(f"\n๐Ÿ“ Current bucket: '{selected_bucket}'") + print("=" * 50) + + # List objects in selected bucket + print(f"๐Ÿ“ Objects in bucket '{selected_bucket}':") + objects = self.list_objects(selected_bucket) + + if not objects: + print("โŒ No objects found in this bucket") + return + + # Apply filters if user wants to + print("\n๐Ÿ” Filter options:") + print(" 1. No filter (show all)") + print(" 2. Apply custom filters") + + filter_choice = input("Select filter option (1-2): ").strip() + + if filter_choice == "2": + print("\n๐Ÿ“‹ Available filters:") + print(" - prefix: Filter by object name prefix") + print(" - suffix: Filter by object name suffix") + print(" - contains: Filter by object name containing string(s). 
Use comma for OR logic, & for AND logic, () for grouping") + print(" - size_min: Filter by minimum size in MB") + print(" - size_max: Filter by maximum size in MB") + print(" - date_from: Filter by modification date (YYYY-MM-DD)") + print(" - date_to: Filter by modification date (YYYY-MM-DD)") + + prefix = input("Prefix filter (or press Enter to skip): ").strip() or None + suffix = input("Suffix filter (or press Enter to skip): ").strip() or None + contains = input("Contains filter (or press Enter to skip): ").strip() or None + + size_min_str = input("Minimum size in MB (or press Enter to skip): ").strip() + size_min = int(size_min_str) if size_min_str else None + + size_max_str = input("Maximum size in MB (or press Enter to skip): ").strip() + size_max = int(size_max_str) if size_max_str else None + + date_from = input("Date from (YYYY-MM-DD, or press Enter to skip): ").strip() or None + date_to = input("Date to (YYYY-MM-DD, or press Enter to skip): ").strip() or None + + original_count = len(objects) + objects = self.filter_objects( + objects, prefix, suffix, contains, size_min, size_max, date_from, date_to + ) + + if original_count != len(objects): + print(f"๐Ÿ” Applied filters: {original_count} โ†’ {len(objects)} objects") + + # Filter Parquet files and Milvus binlog files + # Milvus binlog files don't have .parquet extension but are actually Parquet format + parquet_files = [] + for obj in objects: + name = obj['name'].lower() + # Check for .parquet files + if name.endswith('.parquet'): + parquet_files.append(obj) + # Check for Milvus binlog files (insert_log, delta_log, etc.) + elif any(log_type in name for log_type in ['insert_log', 'delta_log', 'stats_log', 'index_files']): + parquet_files.append(obj) + + if not parquet_files: + print("โŒ No Parquet files or Milvus binlog files found in this bucket") + return + + print(f"๐Ÿ“Š Found {len(parquet_files)} Parquet file(s):") + for i, obj in enumerate(parquet_files): + size_mb = obj['size'] / (1024 * 1024) + modified_str = "" + if obj['last_modified']: + modified_str = f" (modified: {obj['last_modified'][:10]})" + print(f" {i+1}. {obj['name']} ({size_mb:.2f} MB){modified_str}") + + # Select file + while True: + try: + choice = input(f"\nSelect file (1-{len(parquet_files)}) or 'b' to change bucket or 'q' to quit: ").strip() + if choice.lower() == 'q': + return + elif choice.lower() == 'b': + # Go back to bucket selection + break + + file_idx = int(choice) - 1 + if 0 <= file_idx < len(parquet_files): + selected_file = parquet_files[file_idx]['name'] + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter a valid number") + + if choice.lower() == 'b': + # Re-select bucket + print(f"\n๐Ÿ“ฆ Available buckets:") + for i, bucket in enumerate(buckets): + print(f" {i+1}. {bucket['name']}") + + while True: + try: + choice = input(f"\nSelect bucket (1-{len(buckets)}) or 'q' to quit: ").strip() + if choice.lower() == 'q': + return + + bucket_idx = int(choice) - 1 + if 0 <= bucket_idx < len(buckets): + selected_bucket = buckets[bucket_idx]['name'] + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter a valid number") + continue + + # File analysis loop + while True: + print(f"\n๐Ÿ“„ Selected file: {selected_file}") + print("-" * 40) + + # Select analysis command + commands = ["analyze", "metadata", "vector", "export", "data", "query"] + print(f"๐Ÿ” Available analysis commands:") + for i, cmd in enumerate(commands): + print(f" {i+1}. 
{cmd}") + + while True: + try: + choice = input(f"\nSelect command (1-{len(commands)}) or 'f' to change file or 'b' to change bucket or 'q' to quit: ").strip() + if choice.lower() == 'q': + return + elif choice.lower() == 'b': + # Go back to bucket selection + break + elif choice.lower() == 'f': + # Go back to file selection + break + + cmd_idx = int(choice) - 1 + if 0 <= cmd_idx < len(commands): + selected_command = commands[cmd_idx] + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter a valid number") + + if choice.lower() in ['b', 'f']: + break + + # Additional options + output_file = None + rows = 10 + verbose = False + id_value = None + id_column = None + + if selected_command in ["export", "data"]: + output_choice = input("Enter output file path (or press Enter for default): ").strip() + if output_choice: + output_file = output_choice + + if selected_command == "data": + rows_choice = input("Enter number of rows to export (default: 10): ").strip() + if rows_choice: + try: + rows = int(rows_choice) + except ValueError: + print("โŒ Invalid number, using default: 10") + + if selected_command == "query": + id_value_choice = input("Enter ID value to query (or press Enter to see available ID columns): ").strip() + if id_value_choice: + id_value = id_value_choice + id_column_choice = input("Enter ID column name (or press Enter for auto-detection): ").strip() + if id_column_choice: + id_column = id_column_choice + + verbose_choice = input("Verbose output? (y/N): ").strip().lower() + verbose = verbose_choice in ['y', 'yes'] + + # Run analysis + print(f"\n๐Ÿš€ Starting analysis...") + success = self.analyze_parquet_from_minio( + selected_bucket, selected_file, selected_command, + output_file, rows, verbose, id_value, id_column + ) + + if success: + print("โœ… Analysis completed successfully!") + else: + print("โŒ Analysis failed") + + # Ask if user wants to continue with the same file + continue_choice = input(f"\nContinue with the same file '{selected_file}'? 
(y/N): ").strip().lower() + if continue_choice not in ['y', 'yes']: + break + + if choice.lower() == 'b': + continue + + +def main(): + """Main function""" + parser = argparse.ArgumentParser( + description="MinIO Client for Parquet Analysis and Milvus Binlog Analysis", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Interactive mode + python minio_client.py --endpoint localhost --interactive + + # Analyze specific Parquet file + python minio_client.py --endpoint localhost --bucket mybucket --object data.parquet --command analyze + + # Analyze Milvus binlog file + python minio_client.py --endpoint localhost --bucket a-bucket --object files/insert_log/459761955352871853/459761955352871854/459761955353071864/0 --command analyze + + # Query by ID + python minio_client.py --endpoint localhost --bucket a-bucket --object files/insert_log/459761955352871853/459761955352871854/459761955353071864/0 --command query --id-value 123 + + # Export metadata + python minio_client.py --endpoint localhost --bucket mybucket --object data.parquet --command export --output result.json + + # List buckets + python minio_client.py --endpoint localhost --list-buckets + + # List objects in bucket + python minio_client.py --endpoint localhost --bucket mybucket --list-objects + + # Filter objects by prefix (insert_log files only) + python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-prefix "files/insert_log/" + + # Filter objects by size (files larger than 1MB) + python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-size-min 1 + + # Filter objects by date range + python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-date-from "2024-01-01" --filter-date-to "2024-01-31" + + # Combine multiple filters + python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-prefix "files/" --filter-size-min 0.5 --filter-contains "insert" + + # Filter with OR logic (files containing 'insert' OR 'delete') + python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "insert,delete" + + # Filter with AND logic (files containing 'insert' AND 'log') + python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "insert&log" + + # Filter with parentheses grouping ((insert OR delete) AND log) + python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "(insert,delete)&log" + + # Complex nested parentheses ((insert OR delete) AND (log OR bin)) + python minio_client.py --endpoint localhost --bucket mybucket --list-objects --filter-contains "(insert,delete)&(log,bin)" + """ + ) + + # Connection arguments + parser.add_argument("--endpoint", "-e", required=True, help="MinIO server endpoint") + parser.add_argument("--port", "-p", type=int, default=9000, help="MinIO server port (default: 9000)") + parser.add_argument("--secure", "-s", action="store_true", help="Use HTTPS") + parser.add_argument("--access-key", "-a", help="MinIO access key") + parser.add_argument("--secret-key", "-k", help="MinIO secret key") + + # Operation arguments + parser.add_argument("--interactive", "-i", action="store_true", help="Interactive mode") + parser.add_argument("--list-buckets", "-b", action="store_true", help="List all buckets") + parser.add_argument("--bucket", help="Bucket name") + parser.add_argument("--list-objects", "-l", action="store_true", help="List objects in bucket") + 
parser.add_argument("--object", "-o", help="Object name in bucket") + + # Filter arguments + parser.add_argument("--filter-prefix", help="Filter objects by prefix (e.g., 'files/insert_log/')") + parser.add_argument("--filter-suffix", help="Filter objects by suffix (e.g., '.parquet')") + parser.add_argument("--filter-contains", help="Filter objects containing specific string(s). Use comma for OR logic (e.g., 'insert,delete'), use & for AND logic (e.g., 'insert&log'), use () for grouping (e.g., '(insert,delete)&log')") + parser.add_argument("--filter-size-min", type=int, help="Filter objects by minimum size in MB") + parser.add_argument("--filter-size-max", type=int, help="Filter objects by maximum size in MB") + parser.add_argument("--filter-date-from", help="Filter objects modified after date (YYYY-MM-DD)") + parser.add_argument("--filter-date-to", help="Filter objects modified before date (YYYY-MM-DD)") + + # Analysis arguments + parser.add_argument("--command", "-c", choices=["analyze", "metadata", "vector", "export", "data", "query"], + default="analyze", help="Analysis command (default: analyze)") + parser.add_argument("--output", help="Output file path (for export/data commands)") + parser.add_argument("--rows", "-r", type=int, default=10, help="Number of rows to export (for data command)") + parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") + parser.add_argument("--id-value", "-q", help="ID value to query (for query command)") + parser.add_argument("--id-column", help="ID column name (for query command, auto-detected if not specified)") + + args = parser.parse_args() + + # Initialize MinIO client + client = MinioParquetAnalyzer( + endpoint=args.endpoint, + port=args.port, + secure=args.secure, + access_key=args.access_key, + secret_key=args.secret_key + ) + + # Execute requested operation + if args.interactive: + client.interactive_mode() + elif args.list_buckets: + buckets = client.list_buckets() + if buckets: + print(f"๐Ÿ“ฆ Found {len(buckets)} bucket(s):") + for bucket in buckets: + print(f" - {bucket['name']}") + else: + print("โŒ No buckets found or access denied") + elif args.list_objects: + if not args.bucket: + print("โŒ --bucket is required for --list-objects") + sys.exit(1) + + objects = client.list_objects(args.bucket) + if objects: + # Apply filters if specified + original_count = len(objects) + objects = client.filter_objects( + objects, + prefix=args.filter_prefix, + suffix=args.filter_suffix, + contains=args.filter_contains, + size_min=args.filter_size_min, + size_max=args.filter_size_max, + date_from=args.filter_date_from, + date_to=args.filter_date_to + ) + + if original_count != len(objects): + print(f"๐Ÿ” Applied filters: {original_count} โ†’ {len(objects)} objects") + + if objects: + print(f"๐Ÿ“ Found {len(objects)} object(s) in bucket '{args.bucket}':") + for obj in objects: + size_mb = obj['size'] / (1024 * 1024) + modified_str = "" + if obj['last_modified']: + modified_str = f" (modified: {obj['last_modified'][:10]})" + print(f" - {obj['name']} ({size_mb:.2f} MB){modified_str}") + else: + print("โŒ No objects match the specified filters") + else: + print("โŒ No objects found or access denied") + elif args.object: + if not args.bucket: + print("โŒ --bucket is required when specifying --object") + sys.exit(1) + + success = client.analyze_parquet_from_minio( + args.bucket, args.object, args.command, + args.output, args.rows, args.verbose, args.id_value, args.id_column + ) + + if not success: + sys.exit(1) + else: + 
print("โŒ No operation specified. Use --interactive, --list-buckets, --list-objects, or specify --object") + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cmd/tools/binlogv2/minio_parquet_analyzer.py b/cmd/tools/binlogv2/minio_parquet_analyzer.py new file mode 100644 index 0000000000..c0b7022a77 --- /dev/null +++ b/cmd/tools/binlogv2/minio_parquet_analyzer.py @@ -0,0 +1,804 @@ +#!/usr/bin/env python3 +""" +MinIO Parquet Analyzer - Integrated Tool +Combines MinIO client with parquet_analyzer_cli.py for seamless analysis +""" + +import argparse +import sys +import os +import tempfile +import subprocess +import json +from pathlib import Path +from typing import Optional, List, Dict, Any +from datetime import datetime + +try: + from minio import Minio + from minio.error import S3Error +except ImportError: + print("โŒ MinIO client library not found. Please install it:") + print(" pip install minio") + sys.exit(1) + + +class MinioParquetAnalyzer: + """Integrated MinIO and Parquet Analyzer""" + + def __init__(self, endpoint: str, port: int = 9000, secure: bool = False, + access_key: str = None, secret_key: str = None): + """Initialize the integrated analyzer""" + self.endpoint = endpoint + self.port = port + self.secure = secure + self.access_key = access_key + self.secret_key = secret_key + + # Initialize MinIO client + try: + self.client = Minio( + f"{endpoint}:{port}", + access_key=access_key, + secret_key=secret_key, + secure=secure + ) + print(f"โœ… Connected to MinIO server: {endpoint}:{port}") + except Exception as e: + print(f"โŒ Failed to connect to MinIO server: {e}") + sys.exit(1) + + # Check if parquet_analyzer_cli.py exists + self.cli_script = Path(__file__).parent / "parquet_analyzer_cli.py" + if not self.cli_script.exists(): + print(f"โŒ parquet_analyzer_cli.py not found at: {self.cli_script}") + sys.exit(1) + + def list_buckets(self) -> List[Dict[str, Any]]: + """List all buckets""" + try: + buckets = [] + for bucket in self.client.list_buckets(): + buckets.append({ + 'name': bucket.name, + 'creation_date': bucket.creation_date.isoformat() if bucket.creation_date else None + }) + return buckets + except S3Error as e: + print(f"โŒ Failed to list buckets: {e}") + return [] + + def filter_objects(self, objects: List[Dict[str, Any]], + prefix: str = None, suffix: str = None, contains: str = None, + size_min: int = None, size_max: int = None, + date_from: str = None, date_to: str = None) -> List[Dict[str, Any]]: + """ + Filter objects based on various criteria + + Args: + objects: List of objects to filter + prefix: Filter by object name prefix + suffix: Filter by object name suffix + contains: Filter by object name containing string + size_min: Minimum size in MB + size_max: Maximum size in MB + date_from: Filter objects modified after date (YYYY-MM-DD) + date_to: Filter objects modified before date (YYYY-MM-DD) + + Returns: + Filtered list of objects + """ + filtered = objects + + if prefix: + filtered = [obj for obj in filtered if obj['name'].startswith(prefix)] + + if suffix: + filtered = [obj for obj in filtered if obj['name'].endswith(suffix)] + + if contains: + # Support complex logic with parentheses, OR (comma) and AND (&) logic + filtered = self._apply_contains_filter(filtered, contains) + + if size_min is not None: + size_min_bytes = size_min * 1024 * 1024 + filtered = [obj for obj in filtered if obj['size'] >= size_min_bytes] + + if size_max is not None: + size_max_bytes = size_max * 1024 * 1024 + filtered = [obj for obj 
in filtered if obj['size'] <= size_max_bytes] + + if date_from: + try: + from_date = datetime.datetime.fromisoformat(date_from).date() + filtered = [obj for obj in filtered + if obj['last_modified'] and + datetime.datetime.fromisoformat(obj['last_modified']).date() >= from_date] + except ValueError: + print(f"โš ๏ธ Invalid date format for --filter-date-from: {date_from}") + + if date_to: + try: + to_date = datetime.datetime.fromisoformat(date_to).date() + filtered = [obj for obj in filtered + if obj['last_modified'] and + datetime.datetime.fromisoformat(obj['last_modified']).date() <= to_date] + except ValueError: + print(f"โš ๏ธ Invalid date format for --filter-date-to: {date_to}") + + return filtered + + def _apply_contains_filter(self, objects: List[Dict[str, Any]], contains_expr: str) -> List[Dict[str, Any]]: + """ + Apply complex contains filter with parentheses support + + Args: + objects: List of objects to filter + contains_expr: Complex contains expression with parentheses, OR (comma), and AND (&) logic + + Returns: + Filtered list of objects + """ + def evaluate_expression(expr: str, obj_name: str) -> bool: + """Evaluate a single expression for an object name""" + expr = expr.strip() + + # Handle parentheses first + if '(' in expr and ')' in expr: + # Find the innermost parentheses + start = expr.rfind('(') + end = expr.find(')', start) + if start != -1 and end != -1: + # Extract the content inside parentheses + inner_expr = expr[start+1:end] + # Evaluate the inner expression + inner_result = evaluate_expression(inner_expr, obj_name) + # Replace the parentheses expression with the result + new_expr = expr[:start] + ('true' if inner_result else 'false') + expr[end+1:] + return evaluate_expression(new_expr, obj_name) + + # Handle AND logic (&) + if '&' in expr: + parts = [p.strip() for p in expr.split('&')] + return all(evaluate_expression(part, obj_name) for part in parts) + + # Handle OR logic (,) + if ',' in expr: + parts = [p.strip() for p in expr.split(',')] + return any(evaluate_expression(part, obj_name) for part in parts) + + # Single keyword + return expr in obj_name + + return [obj for obj in objects if evaluate_expression(contains_expr, obj['name'])] + + def list_parquet_files(self, bucket_name: str, prefix: str = "") -> List[Dict[str, Any]]: + """List Parquet files in a bucket""" + try: + parquet_files = [] + for obj in self.client.list_objects(bucket_name, prefix=prefix, recursive=True): + if obj.object_name.lower().endswith('.parquet'): + parquet_files.append({ + 'name': obj.object_name, + 'size': obj.size, + 'size_mb': obj.size / (1024 * 1024), + 'last_modified': obj.last_modified.isoformat() if obj.last_modified else None, + 'etag': obj.etag + }) + return parquet_files + except S3Error as e: + print(f"โŒ Failed to list objects in bucket '{bucket_name}': {e}") + return [] + + def download_and_analyze(self, bucket_name: str, object_name: str, + command: str = "analyze", output_file: str = None, + rows: int = 10, verbose: bool = False, + keep_local: bool = False) -> Dict[str, Any]: + """ + Download Parquet file from MinIO and analyze it + + Returns: + Dictionary with analysis results and metadata + """ + result = { + 'success': False, + 'bucket': bucket_name, + 'object': object_name, + 'command': command, + 'local_path': None, + 'analysis_output': None, + 'error': None, + 'timestamp': datetime.now().isoformat() + } + + # Download the file + try: + local_path = self._download_file(bucket_name, object_name, keep_local) + if not local_path: + result['error'] = "Failed 
to download file" + return result + + result['local_path'] = local_path + + # Run analysis + analysis_result = self._run_analysis(local_path, command, output_file, rows, verbose) + result['analysis_output'] = analysis_result + + if analysis_result['success']: + result['success'] = True + else: + result['error'] = analysis_result['error'] + + except Exception as e: + result['error'] = str(e) + + return result + + def _download_file(self, bucket_name: str, object_name: str, keep_local: bool = False) -> Optional[str]: + """Download file from MinIO""" + try: + if keep_local: + # Save to current directory + local_path = Path(object_name).name + else: + # Create temporary file + temp_dir = tempfile.gettempdir() + filename = Path(object_name).name + local_path = os.path.join(temp_dir, f"minio_{filename}") + + print(f"๐Ÿ“ฅ Downloading {object_name} from bucket {bucket_name}...") + self.client.fget_object(bucket_name, object_name, local_path) + print(f"โœ… Downloaded to: {local_path}") + return local_path + + except S3Error as e: + print(f"โŒ Failed to download {object_name}: {e}") + return None + except Exception as e: + print(f"โŒ Unexpected error downloading {object_name}: {e}") + return None + + def _run_analysis(self, local_path: str, command: str, output_file: str = None, + rows: int = 10, verbose: bool = False) -> Dict[str, Any]: + """Run parquet_analyzer_cli.py on local file""" + result = { + 'success': False, + 'command': command, + 'output_file': output_file, + 'error': None + } + + try: + # Build command + cmd = [sys.executable, str(self.cli_script), command, local_path] + + # Add optional arguments + if output_file: + cmd.extend(["--output", output_file]) + if rows != 10: + cmd.extend(["--rows", str(rows)]) + if verbose: + cmd.append("--verbose") + + print(f"๐Ÿ” Running analysis: {' '.join(cmd)}") + print("=" * 60) + + # Execute the command + process = subprocess.run(cmd, capture_output=True, text=True) + + result['return_code'] = process.returncode + result['stdout'] = process.stdout + result['stderr'] = process.stderr + + if process.returncode == 0: + result['success'] = True + print("โœ… Analysis completed successfully") + else: + result['error'] = f"Analysis failed with return code: {process.returncode}" + print(f"โŒ {result['error']}") + if process.stderr: + print(f"Error output: {process.stderr}") + + except Exception as e: + result['error'] = str(e) + print(f"โŒ Failed to run analysis: {e}") + + return result + + def batch_analyze(self, bucket_name: str, prefix: str = "", command: str = "analyze", + output_dir: str = None, verbose: bool = False) -> List[Dict[str, Any]]: + """ + Analyze multiple Parquet files in a bucket + + Args: + bucket_name: Name of the bucket + prefix: Prefix to filter files + command: Analysis command + output_dir: Directory to save output files + verbose: Verbose output + + Returns: + List of analysis results + """ + print(f"๐Ÿ” Batch analyzing Parquet files in bucket '{bucket_name}'") + if prefix: + print(f"๐Ÿ“ Filtering by prefix: '{prefix}'") + + # List Parquet files + parquet_files = self.list_parquet_files(bucket_name, prefix) + + if not parquet_files: + print("โŒ No Parquet files found") + return [] + + print(f"๐Ÿ“Š Found {len(parquet_files)} Parquet file(s)") + + # Create output directory if specified + if output_dir: + os.makedirs(output_dir, exist_ok=True) + print(f"๐Ÿ“ Output directory: {output_dir}") + + results = [] + + for i, file_info in enumerate(parquet_files, 1): + print(f"\n{'='*60}") + print(f"๐Ÿ“Š Processing file 
{i}/{len(parquet_files)}: {file_info['name']}") + print(f"๐Ÿ“ Size: {file_info['size_mb']:.2f} MB") + + # Determine output file + output_file = None + if output_dir: + base_name = Path(file_info['name']).stem + output_file = os.path.join(output_dir, f"{base_name}_{command}_result.json") + + # Analyze the file + result = self.download_and_analyze( + bucket_name, file_info['name'], command, output_file, verbose=verbose + ) + + results.append(result) + + # Clean up temporary file + if result['local_path'] and result['local_path'].startswith(tempfile.gettempdir()): + try: + os.remove(result['local_path']) + except: + pass + + # Print summary + successful = sum(1 for r in results if r['success']) + failed = len(results) - successful + + print(f"\n{'='*60}") + print(f"๐Ÿ“Š Batch Analysis Summary:") + print(f" Total files: {len(results)}") + print(f" Successful: {successful}") + print(f" Failed: {failed}") + + return results + + def interactive_mode(self): + """Interactive mode for browsing and analyzing files""" + print("๐Ÿ” MinIO Parquet Analyzer - Interactive Mode") + print("=" * 50) + + # List buckets + buckets = self.list_buckets() + if not buckets: + print("โŒ No buckets found or access denied") + return + + print(f"๐Ÿ“ฆ Found {len(buckets)} bucket(s):") + for i, bucket in enumerate(buckets): + print(f" {i+1}. {bucket['name']}") + + # Select bucket + while True: + try: + choice = input(f"\nSelect bucket (1-{len(buckets)}) or 'q' to quit: ").strip() + if choice.lower() == 'q': + return + + bucket_idx = int(choice) - 1 + if 0 <= bucket_idx < len(buckets): + selected_bucket = buckets[bucket_idx]['name'] + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter a valid number") + + # List Parquet files in selected bucket + print(f"\n๐Ÿ“ Parquet files in bucket '{selected_bucket}':") + parquet_files = self.list_parquet_files(selected_bucket) + + if not parquet_files: + print("โŒ No Parquet files found in this bucket") + return + + print(f"๐Ÿ“Š Found {len(parquet_files)} Parquet file(s):") + for i, obj in enumerate(parquet_files): + print(f" {i+1}. {obj['name']} ({obj['size_mb']:.2f} MB)") + + # Select operation mode + print(f"\n๐Ÿ” Operation modes:") + print(" 1. Analyze single file") + print(" 2. Batch analyze all files") + print(" 3. 
Select specific files") + + while True: + try: + mode_choice = input(f"\nSelect mode (1-3) or 'q' to quit: ").strip() + if mode_choice.lower() == 'q': + return + + mode = int(mode_choice) + if mode in [1, 2, 3]: + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter a valid number") + + if mode == 1: + # Single file analysis + self._interactive_single_file(selected_bucket, parquet_files) + elif mode == 2: + # Batch analysis + self._interactive_batch_analysis(selected_bucket, parquet_files) + elif mode == 3: + # Select specific files + self._interactive_select_files(selected_bucket, parquet_files) + + def _interactive_single_file(self, bucket_name: str, parquet_files: List[Dict[str, Any]]): + """Interactive single file analysis""" + # Select file + while True: + try: + choice = input(f"\nSelect file (1-{len(parquet_files)}) or 'q' to quit: ").strip() + if choice.lower() == 'q': + return + + file_idx = int(choice) - 1 + if 0 <= file_idx < len(parquet_files): + selected_file = parquet_files[file_idx]['name'] + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter a valid number") + + # Select analysis command + commands = ["analyze", "metadata", "vector", "export", "data"] + print(f"\n๐Ÿ” Available analysis commands:") + for i, cmd in enumerate(commands): + print(f" {i+1}. {cmd}") + + while True: + try: + choice = input(f"\nSelect command (1-{len(commands)}) or 'q' to quit: ").strip() + if choice.lower() == 'q': + return + + cmd_idx = int(choice) - 1 + if 0 <= cmd_idx < len(commands): + selected_command = commands[cmd_idx] + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter a valid number") + + # Additional options + output_file = None + rows = 10 + verbose = False + + if selected_command in ["export", "data"]: + output_choice = input("Enter output file path (or press Enter for default): ").strip() + if output_choice: + output_file = output_choice + + if selected_command == "data": + rows_choice = input("Enter number of rows to export (default: 10): ").strip() + if rows_choice: + try: + rows = int(rows_choice) + except ValueError: + print("โŒ Invalid number, using default: 10") + + verbose_choice = input("Verbose output? (y/N): ").strip().lower() + verbose = verbose_choice in ['y', 'yes'] + + # Run analysis + print(f"\n๐Ÿš€ Starting analysis...") + result = self.download_and_analyze( + bucket_name, selected_file, selected_command, + output_file, rows, verbose + ) + + if result['success']: + print("โœ… Analysis completed successfully!") + else: + print(f"โŒ Analysis failed: {result['error']}") + + def _interactive_batch_analysis(self, bucket_name: str, parquet_files: List[Dict[str, Any]]): + """Interactive batch analysis""" + print(f"\n๐Ÿ“Š Batch analysis for {len(parquet_files)} files") + + # Select command + commands = ["analyze", "metadata", "vector", "export", "data"] + print(f"\n๐Ÿ” Available analysis commands:") + for i, cmd in enumerate(commands): + print(f" {i+1}. 
{cmd}") + + while True: + try: + choice = input(f"\nSelect command (1-{len(commands)}) or 'q' to quit: ").strip() + if choice.lower() == 'q': + return + + cmd_idx = int(choice) - 1 + if 0 <= cmd_idx < len(commands): + selected_command = commands[cmd_idx] + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter a valid number") + + # Output directory + output_dir = input("Enter output directory (or press Enter for current dir): ").strip() + if not output_dir: + output_dir = "." + + verbose_choice = input("Verbose output? (y/N): ").strip().lower() + verbose = verbose_choice in ['y', 'yes'] + + # Run batch analysis + print(f"\n๐Ÿš€ Starting batch analysis...") + results = self.batch_analyze(bucket_name, "", selected_command, output_dir, verbose) + + print(f"โœ… Batch analysis completed!") + + def _interactive_select_files(self, bucket_name: str, parquet_files: List[Dict[str, Any]]): + """Interactive select specific files""" + print(f"\n๐Ÿ“‹ Select files to analyze (comma-separated numbers, e.g., 1,3,5)") + print(f"Available files:") + for i, obj in enumerate(parquet_files): + print(f" {i+1}. {obj['name']} ({obj['size_mb']:.2f} MB)") + + while True: + try: + choice = input(f"\nEnter file numbers (1-{len(parquet_files)}) or 'q' to quit: ").strip() + if choice.lower() == 'q': + return + + selected_indices = [int(x.strip()) - 1 for x in choice.split(',')] + if all(0 <= idx < len(parquet_files) for idx in selected_indices): + selected_files = [parquet_files[idx]['name'] for idx in selected_indices] + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter valid numbers") + + # Select command + commands = ["analyze", "metadata", "vector", "export", "data"] + print(f"\n๐Ÿ” Available analysis commands:") + for i, cmd in enumerate(commands): + print(f" {i+1}. {cmd}") + + while True: + try: + choice = input(f"\nSelect command (1-{len(commands)}) or 'q' to quit: ").strip() + if choice.lower() == 'q': + return + + cmd_idx = int(choice) - 1 + if 0 <= cmd_idx < len(commands): + selected_command = commands[cmd_idx] + break + else: + print("โŒ Invalid selection") + except ValueError: + print("โŒ Please enter a valid number") + + # Additional options + output_dir = input("Enter output directory (or press Enter for current dir): ").strip() + if not output_dir: + output_dir = "." + + verbose_choice = input("Verbose output? (y/N): ").strip().lower() + verbose = verbose_choice in ['y', 'yes'] + + # Run analysis for selected files + print(f"\n๐Ÿš€ Starting analysis for {len(selected_files)} selected files...") + + results = [] + for i, file_name in enumerate(selected_files, 1): + print(f"\n๐Ÿ“Š Processing file {i}/{len(selected_files)}: {file_name}") + + # Determine output file + output_file = None + if output_dir: + base_name = Path(file_name).stem + output_file = os.path.join(output_dir, f"{base_name}_{selected_command}_result.json") + + result = self.download_and_analyze( + bucket_name, file_name, selected_command, output_file, verbose=verbose + ) + results.append(result) + + # Print summary + successful = sum(1 for r in results if r['success']) + print(f"\nโœ… Analysis completed! 
{successful}/{len(results)} files processed successfully.") + + +def main(): + """Main function""" + parser = argparse.ArgumentParser( + description="MinIO Parquet Analyzer - Integrated Tool", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Interactive mode + python minio_parquet_analyzer.py --endpoint localhost --interactive + + # Analyze specific file + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --object data.parquet --command analyze + + # Batch analyze all Parquet files + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --batch --command metadata --output-dir results + + # List buckets + python minio_parquet_analyzer.py --endpoint localhost --list-buckets + + # List Parquet files in bucket + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files + + # Filter files by prefix (insert_log files only) + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-prefix "files/insert_log/" + + # Filter files by size (files larger than 1MB) + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-size-min 1 + + # Filter files by date range + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-date-from "2024-01-01" --filter-date-to "2024-01-31" + + # Combine multiple filters + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-prefix "files/" --filter-size-min 0.5 --filter-contains "insert" + + # Filter with OR logic (files containing 'insert' OR 'delete') + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "insert,delete" + + # Filter with AND logic (files containing 'insert' AND 'log') + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "insert&log" + + # Filter with parentheses grouping ((insert OR delete) AND log) + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "(insert,delete)&log" + + # Complex nested parentheses ((insert OR delete) AND (log OR bin)) + python minio_parquet_analyzer.py --endpoint localhost --bucket mybucket --list-files --filter-contains "(insert,delete)&(log,bin)" + """ + ) + + # Connection arguments + parser.add_argument("--endpoint", "-e", required=True, help="MinIO server endpoint") + parser.add_argument("--port", "-p", type=int, default=9000, help="MinIO server port (default: 9000)") + parser.add_argument("--secure", "-s", action="store_true", help="Use HTTPS") + parser.add_argument("--access-key", "-a", help="MinIO access key") + parser.add_argument("--secret-key", "-k", help="MinIO secret key") + + # Operation arguments + parser.add_argument("--interactive", "-i", action="store_true", help="Interactive mode") + parser.add_argument("--list-buckets", "-b", action="store_true", help="List all buckets") + parser.add_argument("--bucket", help="Bucket name") + parser.add_argument("--list-files", "-l", action="store_true", help="List Parquet files in bucket") + parser.add_argument("--object", "-o", help="Object name in bucket") + parser.add_argument("--batch", action="store_true", help="Batch analyze all Parquet files in bucket") + + # Filter arguments + parser.add_argument("--filter-prefix", help="Filter objects by prefix (e.g., 'files/insert_log/')") + parser.add_argument("--filter-suffix", help="Filter objects by suffix (e.g., 
'.parquet')") + parser.add_argument("--filter-contains", help="Filter objects containing specific string(s). Use comma for OR logic (e.g., 'insert,delete'), use & for AND logic (e.g., 'insert&log'), use () for grouping (e.g., '(insert,delete)&log')") + parser.add_argument("--filter-size-min", type=int, help="Filter objects by minimum size in MB") + parser.add_argument("--filter-size-max", type=int, help="Filter objects by maximum size in MB") + parser.add_argument("--filter-date-from", help="Filter objects modified after date (YYYY-MM-DD)") + parser.add_argument("--filter-date-to", help="Filter objects modified before date (YYYY-MM-DD)") + + # Analysis arguments + parser.add_argument("--command", "-c", choices=["analyze", "metadata", "vector", "export", "data"], + default="analyze", help="Analysis command (default: analyze)") + parser.add_argument("--output", help="Output file path (for single file analysis)") + parser.add_argument("--output-dir", help="Output directory (for batch analysis)") + parser.add_argument("--rows", "-r", type=int, default=10, help="Number of rows to export (for data command)") + parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") + parser.add_argument("--keep-local", action="store_true", help="Keep downloaded files locally") + + args = parser.parse_args() + + # Initialize analyzer + analyzer = MinioParquetAnalyzer( + endpoint=args.endpoint, + port=args.port, + secure=args.secure, + access_key=args.access_key, + secret_key=args.secret_key + ) + + # Execute requested operation + if args.interactive: + analyzer.interactive_mode() + elif args.list_buckets: + buckets = analyzer.list_buckets() + if buckets: + print(f"๐Ÿ“ฆ Found {len(buckets)} bucket(s):") + for bucket in buckets: + print(f" - {bucket['name']}") + else: + print("โŒ No buckets found or access denied") + elif args.list_files: + if not args.bucket: + print("โŒ --bucket is required for --list-files") + sys.exit(1) + + files = analyzer.list_parquet_files(args.bucket) + if files: + # Apply filters if specified + original_count = len(files) + files = analyzer.filter_objects( + files, + prefix=args.filter_prefix, + suffix=args.filter_suffix, + contains=args.filter_contains, + size_min=args.filter_size_min, + size_max=args.filter_size_max, + date_from=args.filter_date_from, + date_to=args.filter_date_to + ) + + if original_count != len(files): + print(f"๐Ÿ” Applied filters: {original_count} โ†’ {len(files)} files") + + if files: + print(f"๐Ÿ“Š Found {len(files)} Parquet file(s) in bucket '{args.bucket}':") + for obj in files: + modified_str = "" + if obj.get('last_modified'): + modified_str = f" (modified: {obj['last_modified'][:10]})" + print(f" - {obj['name']} ({obj['size_mb']:.2f} MB){modified_str}") + else: + print("โŒ No files match the specified filters") + else: + print("โŒ No Parquet files found or access denied") + elif args.batch: + if not args.bucket: + print("โŒ --bucket is required for --batch") + sys.exit(1) + + results = analyzer.batch_analyze( + args.bucket, "", args.command, args.output_dir, args.verbose + ) + + if not results: + sys.exit(1) + elif args.object: + if not args.bucket: + print("โŒ --bucket is required when specifying --object") + sys.exit(1) + + result = analyzer.download_and_analyze( + args.bucket, args.object, args.command, + args.output, args.rows, args.verbose, args.keep_local + ) + + if not result['success']: + sys.exit(1) + else: + print("โŒ No operation specified. 
Use --interactive, --list-buckets, --list-files, --batch, or specify --object") + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cmd/tools/binlogv2/parquet_analyzer/__init__.py b/cmd/tools/binlogv2/parquet_analyzer/__init__.py new file mode 100644 index 0000000000..4732fada08 --- /dev/null +++ b/cmd/tools/binlogv2/parquet_analyzer/__init__.py @@ -0,0 +1,11 @@ +""" +Parquet Analyzer Package +A toolkit for analyzing parquet files, including metadata parsing and vector deserialization functionality +""" + +from .meta_parser import ParquetMetaParser +from .vector_deserializer import VectorDeserializer +from .analyzer import ParquetAnalyzer + +__version__ = "1.0.0" +__all__ = ["ParquetMetaParser", "VectorDeserializer", "ParquetAnalyzer"] \ No newline at end of file diff --git a/cmd/tools/binlogv2/parquet_analyzer/analyzer.py b/cmd/tools/binlogv2/parquet_analyzer/analyzer.py new file mode 100644 index 0000000000..63f910ea48 --- /dev/null +++ b/cmd/tools/binlogv2/parquet_analyzer/analyzer.py @@ -0,0 +1,494 @@ +""" +Parquet Analyzer Main Component +Main analyzer that integrates metadata parsing and vector deserialization functionality +""" + +import json +from pathlib import Path +from typing import Dict, List, Any, Optional + +from .meta_parser import ParquetMetaParser +from .vector_deserializer import VectorDeserializer + + +class ParquetAnalyzer: + """Main Parquet file analyzer class""" + + def __init__(self, file_path: str): + """ + Initialize analyzer + + Args: + file_path: parquet file path + """ + self.file_path = Path(file_path) + self.meta_parser = ParquetMetaParser(file_path) + self.vector_deserializer = VectorDeserializer() + + def load(self) -> bool: + """ + Load parquet file + + Returns: + bool: whether loading was successful + """ + return self.meta_parser.load() + + def analyze_metadata(self) -> Dict[str, Any]: + """ + Analyze metadata information + + Returns: + Dict: metadata analysis results + """ + if not self.meta_parser.metadata: + return {} + + return { + "basic_info": self.meta_parser.get_basic_info(), + "file_metadata": self.meta_parser.get_file_metadata(), + "schema_metadata": self.meta_parser.get_schema_metadata(), + "column_statistics": self.meta_parser.get_column_statistics(), + "row_group_info": self.meta_parser.get_row_group_info(), + "metadata_summary": self.meta_parser.get_metadata_summary() + } + + def analyze_vectors(self) -> List[Dict[str, Any]]: + """ + Analyze vector data + + Returns: + List: vector analysis results list + """ + if not self.meta_parser.metadata: + return [] + + vector_analysis = [] + column_stats = self.meta_parser.get_column_statistics() + + for col_stats in column_stats: + if "statistics" in col_stats and col_stats["statistics"]: + stats = col_stats["statistics"] + col_name = col_stats["column_name"] + + # Check if there's binary data (vector) + if "min" in stats: + min_value = stats["min"] + if isinstance(min_value, bytes): + min_analysis = VectorDeserializer.deserialize_with_analysis( + min_value, col_name + ) + if min_analysis: + vector_analysis.append({ + "column_name": col_name, + "stat_type": "min", + "analysis": min_analysis + }) + elif isinstance(min_value, str) and len(min_value) > 32: + # May be hex string, try to convert back to bytes + try: + min_bytes = bytes.fromhex(min_value) + min_analysis = VectorDeserializer.deserialize_with_analysis( + min_bytes, col_name + ) + if min_analysis: + vector_analysis.append({ + "column_name": col_name, + "stat_type": "min", + "analysis": min_analysis 
+ }) + except ValueError: + pass + + if "max" in stats: + max_value = stats["max"] + if isinstance(max_value, bytes): + max_analysis = VectorDeserializer.deserialize_with_analysis( + max_value, col_name + ) + if max_analysis: + vector_analysis.append({ + "column_name": col_name, + "stat_type": "max", + "analysis": max_analysis + }) + elif isinstance(max_value, str) and len(max_value) > 32: + # May be hex string, try to convert back to bytes + try: + max_bytes = bytes.fromhex(max_value) + max_analysis = VectorDeserializer.deserialize_with_analysis( + max_bytes, col_name + ) + if max_analysis: + vector_analysis.append({ + "column_name": col_name, + "stat_type": "max", + "analysis": max_analysis + }) + except ValueError: + pass + + return vector_analysis + + def analyze(self) -> Dict[str, Any]: + """ + Complete parquet file analysis + + Returns: + Dict: complete analysis results + """ + if not self.load(): + return {} + + return { + "metadata": self.analyze_metadata(), + "vectors": self.analyze_vectors() + } + + def export_analysis(self, output_file: Optional[str] = None) -> str: + """ + Export analysis results + + Args: + output_file: output file path, if None will auto-generate + + Returns: + str: output file path + """ + if output_file is None: + output_file = f"{self.file_path.stem}_analysis.json" + + analysis_result = self.analyze() + + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(analysis_result, f, indent=2, ensure_ascii=False) + + return output_file + + def print_summary(self): + """Print analysis summary""" + if not self.meta_parser.metadata: + print("โŒ No parquet file loaded") + return + + # Print metadata summary + self.meta_parser.print_summary() + + # Print vector analysis summary + vector_analysis = self.analyze_vectors() + if vector_analysis: + print(f"\n๐Ÿ” Vector Analysis Summary:") + print("=" * 60) + for vec_analysis in vector_analysis: + col_name = vec_analysis["column_name"] + stat_type = vec_analysis["stat_type"] + analysis = vec_analysis["analysis"] + + print(f" Column: {col_name} ({stat_type})") + print(f" Vector Type: {analysis['vector_type']}") + print(f" Dimension: {analysis['dimension']}") + + if "statistics" in analysis and analysis["statistics"]: + stats = analysis["statistics"] + print(f" Min: {stats.get('min', 'N/A')}") + print(f" Max: {stats.get('max', 'N/A')}") + print(f" Mean: {stats.get('mean', 'N/A')}") + print(f" Std: {stats.get('std', 'N/A')}") + + if analysis["vector_type"] == "BinaryVector" and "statistics" in analysis: + stats = analysis["statistics"] + print(f" Zero Count: {stats.get('zero_count', 'N/A')}") + print(f" One Count: {stats.get('one_count', 'N/A')}") + + print() + + def get_vector_samples(self, column_name: str, sample_count: int = 5) -> List[Dict[str, Any]]: + """ + Get vector sample data + + Args: + column_name: column name + sample_count: number of samples + + Returns: + List: vector sample list + """ + # This can be extended to read samples from actual data + # Currently returns min/max from statistics as samples + vector_analysis = self.analyze_vectors() + samples = [] + + for vec_analysis in vector_analysis: + if vec_analysis["column_name"] == column_name: + analysis = vec_analysis["analysis"] + samples.append({ + "type": vec_analysis["stat_type"], + "vector_type": analysis["vector_type"], + "dimension": analysis["dimension"], + "data": analysis["deserialized"][:sample_count] if analysis["deserialized"] else [], + "statistics": analysis.get("statistics", {}) + }) + + return samples + + def compare_vectors(self, 
column_name: str) -> Dict[str, Any]: + """ + Compare different vector statistics for the same column + + Args: + column_name: column name + + Returns: + Dict: comparison results + """ + vector_analysis = self.analyze_vectors() + column_vectors = [v for v in vector_analysis if v["column_name"] == column_name] + + if len(column_vectors) < 2: + return {} + + comparison = { + "column_name": column_name, + "vector_count": len(column_vectors), + "comparison": {} + } + + for vec_analysis in column_vectors: + stat_type = vec_analysis["stat_type"] + analysis = vec_analysis["analysis"] + + comparison["comparison"][stat_type] = { + "vector_type": analysis["vector_type"], + "dimension": analysis["dimension"], + "statistics": analysis.get("statistics", {}) + } + + return comparison + + def validate_vector_consistency(self) -> Dict[str, Any]: + """ + Validate vector data consistency + + Returns: + Dict: validation results + """ + vector_analysis = self.analyze_vectors() + validation_result = { + "total_vectors": len(vector_analysis), + "consistent_columns": [], + "inconsistent_columns": [], + "details": {} + } + + # Group by column + columns = {} + for vec_analysis in vector_analysis: + col_name = vec_analysis["column_name"] + if col_name not in columns: + columns[col_name] = [] + columns[col_name].append(vec_analysis) + + for col_name, vec_list in columns.items(): + if len(vec_list) >= 2: + # Check if vector types are consistent for the same column + vector_types = set(v["analysis"]["vector_type"] for v in vec_list) + dimensions = set(v["analysis"]["dimension"] for v in vec_list) + + is_consistent = len(vector_types) == 1 and len(dimensions) == 1 + + validation_result["details"][col_name] = { + "vector_types": list(vector_types), + "dimensions": list(dimensions), + "is_consistent": is_consistent, + "vector_count": len(vec_list) + } + + if is_consistent: + validation_result["consistent_columns"].append(col_name) + else: + validation_result["inconsistent_columns"].append(col_name) + + return validation_result + + def query_by_id(self, id_value: Any, id_column: str = None) -> Dict[str, Any]: + """ + Query data by ID value + + Args: + id_value: ID value to search for + id_column: ID column name (if None, will try to find primary key column) + + Returns: + Dict: query results + """ + try: + import pandas as pd + import pyarrow.parquet as pq + except ImportError: + return {"error": "pandas and pyarrow are required for ID query"} + + if not self.meta_parser.metadata: + return {"error": "Parquet file not loaded"} + + try: + # Read the parquet file + df = pd.read_parquet(self.file_path) + + # If no ID column specified, try to find primary key column + if id_column is None: + # Common primary key column names + pk_candidates = ['id', 'ID', 'Id', 'pk', 'PK', 'primary_key', 'row_id', 'RowID'] + for candidate in pk_candidates: + if candidate in df.columns: + id_column = candidate + break + + if id_column is None: + # If no common PK found, use the first column + id_column = df.columns[0] + + if id_column not in df.columns: + return { + "error": f"ID column '{id_column}' not found in the data", + "available_columns": list(df.columns) + } + + # Query by ID + result = df[df[id_column] == id_value] + + if result.empty: + return { + "found": False, + "id_column": id_column, + "id_value": id_value, + "message": f"No record found with {id_column} = {id_value}" + } + + # Convert to dict for JSON serialization + record = result.iloc[0].to_dict() + + # Handle vector columns if present + vector_columns = [] + for col_name, 
value in record.items(): + if isinstance(value, bytes) and len(value) > 32: + # This might be a vector, try to deserialize + try: + vector_analysis = VectorDeserializer.deserialize_with_analysis(value, col_name) + if vector_analysis: + vector_columns.append({ + "column_name": col_name, + "analysis": vector_analysis + }) + # Replace bytes with analysis summary + if vector_analysis["vector_type"] == "JSON": + # For JSON, show the actual content + record[col_name] = vector_analysis["deserialized"] + elif vector_analysis["vector_type"] == "Array": + # For Array, show the actual content + record[col_name] = vector_analysis["deserialized"] + else: + # For vectors, show type and dimension + record[col_name] = { + "vector_type": vector_analysis["vector_type"], + "dimension": vector_analysis["dimension"], + "data_preview": vector_analysis["deserialized"][:5] if vector_analysis["deserialized"] else [] + } + except Exception: + # If deserialization fails, keep as bytes but truncate for display + record[col_name] = f"" + + return { + "found": True, + "id_column": id_column, + "id_value": id_value, + "record": record, + "vector_columns": vector_columns, + "total_columns": len(df.columns), + "total_rows": len(df) + } + + except Exception as e: + return {"error": f"Query failed: {str(e)}"} + + def get_id_column_info(self) -> Dict[str, Any]: + """ + Get information about ID columns in the data + + Returns: + Dict: ID column information + """ + try: + import pandas as pd + except ImportError: + return {"error": "pandas is required for ID column analysis"} + + if not self.meta_parser.metadata: + return {"error": "Parquet file not loaded"} + + try: + df = pd.read_parquet(self.file_path) + + # Find potential ID columns + id_columns = [] + for col in df.columns: + col_data = df[col] + + # Check if column looks like an ID column + is_unique = col_data.nunique() == len(col_data) + is_numeric = pd.api.types.is_numeric_dtype(col_data) + is_integer = pd.api.types.is_integer_dtype(col_data) + + id_columns.append({ + "column_name": col, + "is_unique": is_unique, + "is_numeric": is_numeric, + "is_integer": is_integer, + "unique_count": col_data.nunique(), + "total_count": len(col_data), + "min_value": col_data.min() if is_numeric else None, + "max_value": col_data.max() if is_numeric else None, + "sample_values": col_data.head(5).tolist() + }) + + return { + "total_columns": len(df.columns), + "total_rows": len(df), + "id_columns": id_columns, + "recommended_id_column": self._get_recommended_id_column(id_columns) + } + + except Exception as e: + return {"error": f"ID column analysis failed: {str(e)}"} + + def _get_recommended_id_column(self, id_columns: List[Dict[str, Any]]) -> str: + """ + Get recommended ID column based on heuristics + + Args: + id_columns: List of ID column information + + Returns: + str: Recommended ID column name + """ + # Priority order for ID columns + priority_names = ['id', 'ID', 'Id', 'pk', 'PK', 'primary_key', 'row_id', 'RowID'] + + # First, look for columns with priority names that are unique + for priority_name in priority_names: + for col_info in id_columns: + if (col_info["column_name"].lower() == priority_name.lower() and + col_info["is_unique"]): + return col_info["column_name"] + + # Then, look for any unique integer column + for col_info in id_columns: + if col_info["is_unique"] and col_info["is_integer"]: + return col_info["column_name"] + + # Finally, look for any unique column + for col_info in id_columns: + if col_info["is_unique"]: + return col_info["column_name"] + + # If no 
unique column found, return the first column + return id_columns[0]["column_name"] if id_columns else "" \ No newline at end of file diff --git a/cmd/tools/binlogv2/parquet_analyzer/meta_parser.py b/cmd/tools/binlogv2/parquet_analyzer/meta_parser.py new file mode 100644 index 0000000000..3cec4b9270 --- /dev/null +++ b/cmd/tools/binlogv2/parquet_analyzer/meta_parser.py @@ -0,0 +1,448 @@ +""" +Parquet Metadata Parser Component +Responsible for parsing parquet file metadata information +""" + +import pyarrow.parquet as pq +import json +from pathlib import Path +from typing import Dict, List, Any, Optional + + +class ParquetMetaParser: + """Parquet file Metadata parser""" + + def __init__(self, file_path: str): + """ + Initialize parser + + Args: + file_path: parquet file path + """ + self.file_path = Path(file_path) + self.parquet_file = None + self.metadata = None + self.schema = None + + def load(self) -> bool: + """ + Load parquet file + + Returns: + bool: whether loading was successful + """ + try: + self.parquet_file = pq.ParquetFile(self.file_path) + self.metadata = self.parquet_file.metadata + self.schema = self.parquet_file.schema_arrow + return True + except Exception as e: + print(f"โŒ Failed to load parquet file: {e}") + return False + + def get_basic_info(self) -> Dict[str, Any]: + """ + Get basic information + + Returns: + Dict: dictionary containing basic file information + """ + if not self.metadata: + return {} + + file_size = self.file_path.stat().st_size + return { + "name": self.file_path.name, + "size_bytes": file_size, + "size_mb": file_size / 1024 / 1024, + "num_rows": self.metadata.num_rows, + "num_columns": self.metadata.num_columns, + "num_row_groups": self.metadata.num_row_groups, + "created_by": self.metadata.created_by, + "format_version": self.metadata.format_version, + "footer_size_bytes": self.metadata.serialized_size + } + + def get_file_metadata(self) -> Dict[str, str]: + """ + Get file-level metadata + + Returns: + Dict: file-level metadata dictionary + """ + if not self.metadata or not self.metadata.metadata: + return {} + + result = {} + for key, value in self.metadata.metadata.items(): + key_str = key.decode() if isinstance(key, bytes) else key + value_str = value.decode() if isinstance(value, bytes) else value + result[key_str] = value_str + + return result + + def get_schema_metadata(self) -> List[Dict[str, Any]]: + """ + Get Schema-level metadata + + Returns: + List: Schema-level metadata list + """ + if not self.schema: + return [] + + result = [] + for i, field in enumerate(self.schema): + if field.metadata: + field_metadata = {} + for k, v in field.metadata.items(): + k_str = k.decode() if isinstance(k, bytes) else k + v_str = v.decode() if isinstance(v, bytes) else v + field_metadata[k_str] = v_str + + result.append({ + "column_index": i, + "column_name": field.name, + "column_type": str(field.type), + "metadata": field_metadata + }) + + return result + + def get_column_statistics(self) -> List[Dict[str, Any]]: + """ + Get column statistics + + Returns: + List: column statistics list + """ + if not self.metadata: + return [] + + result = [] + for i in range(self.metadata.num_columns): + col_meta = self.metadata.row_group(0).column(i) + col_stats = { + "column_name": col_meta.path_in_schema, + "compression": str(col_meta.compression), + "encodings": [str(enc) for enc in col_meta.encodings], + "file_offset": col_meta.file_offset, + "compressed_size": col_meta.total_compressed_size, + "uncompressed_size": col_meta.total_uncompressed_size + } + + if 
col_meta.statistics: + stats = col_meta.statistics + col_stats["statistics"] = {} + if hasattr(stats, 'null_count') and stats.null_count is not None: + col_stats["statistics"]["null_count"] = stats.null_count + if hasattr(stats, 'distinct_count') and stats.distinct_count is not None: + col_stats["statistics"]["distinct_count"] = stats.distinct_count + if hasattr(stats, 'min') and stats.min is not None: + if isinstance(stats.min, bytes): + col_stats["statistics"]["min"] = stats.min.hex() + else: + col_stats["statistics"]["min"] = stats.min + if hasattr(stats, 'max') and stats.max is not None: + if isinstance(stats.max, bytes): + col_stats["statistics"]["max"] = stats.max.hex() + else: + col_stats["statistics"]["max"] = stats.max + + result.append(col_stats) + + return result + + def get_row_group_info(self) -> List[Dict[str, Any]]: + """ + Get row group information + + Returns: + List: row group information list + """ + if not self.metadata: + return [] + + result = [] + for i in range(self.metadata.num_row_groups): + row_group = self.metadata.row_group(i) + result.append({ + "rowgroup_index": i, + "num_rows": row_group.num_rows, + "total_byte_size": row_group.total_byte_size, + "num_columns": row_group.num_columns + }) + + return result + + def parse_row_group_metadata(self, metadata_str: str) -> List[Dict[str, Any]]: + """ + Parse row group metadata string + + Args: + metadata_str: metadata string in format "bytes|rows|offset;bytes|rows|offset;..." + + Returns: + List: parsed row group metadata + """ + if not metadata_str: + return [] + + result = [] + groups = metadata_str.split(';') + + for i, group in enumerate(groups): + if not group.strip(): + continue + + parts = group.split('|') + if len(parts) >= 3: + try: + bytes_size = int(parts[0]) + rows = int(parts[1]) + offset = int(parts[2]) + + result.append({ + "rowgroup_index": i, + "bytes": bytes_size, + "rows": rows, + "offset": offset + }) + except (ValueError, IndexError): + print(f"โš ๏ธ Warning: Invalid row group metadata format: {group}") + continue + + return result + + def format_row_group_metadata(self, metadata_str: str) -> str: + """ + Format row group metadata string to readable format + + Args: + metadata_str: metadata string in format "bytes|rows|offset;bytes|rows|offset;..." + + Returns: + str: formatted string + """ + parsed = self.parse_row_group_metadata(metadata_str) + + if not parsed: + return "No valid row group metadata found" + + lines = [] + for group in parsed: + lines.append(f"row group {group['rowgroup_index']}: {group['bytes']} bytes, {group['rows']} rows, {group['offset']} offset") + + return '\n'.join(lines) + + def parse_group_field_id_list(self, field_list_str: str) -> List[Dict[str, Any]]: + """ + Parse group field ID list string + + Args: + field_list_str: field list string in format "field_ids;field_ids;..." 
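+                            e.g. "100,101;102" parses to group 0 = fields [100, 101] and group 1 = [102] (illustrative IDs)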
+ + Returns: + List: parsed field group metadata + """ + if not field_list_str: + return [] + + result = [] + groups = field_list_str.split(';') + + for i, group in enumerate(groups): + if not group.strip(): + continue + + try: + field_ids = [int(fid.strip()) for fid in group.split(',') if fid.strip()] + + result.append({ + "group_index": i, + "field_ids": field_ids, + "field_count": len(field_ids) + }) + except (ValueError, IndexError): + print(f"โš ๏ธ Warning: Invalid field group format: {group}") + continue + + return result + + def format_group_field_id_list(self, field_list_str: str) -> str: + """ + Format group field ID list string to readable format + + Args: + field_list_str: field list string in format "field_ids;field_ids;..." + + Returns: + str: formatted string + """ + parsed = self.parse_group_field_id_list(field_list_str) + + if not parsed: + return "No valid field group metadata found" + + lines = [] + for group in parsed: + field_ids_str = ','.join(map(str, group['field_ids'])) + lines.append(f"column group {group['group_index']}: {field_ids_str}") + + return '\n'.join(lines) + + def parse_custom_metadata(self, metadata_dict: Dict[str, str]) -> Dict[str, Any]: + """ + Parse custom metadata and format special fields + + Args: + metadata_dict: metadata dictionary + + Returns: + Dict: parsed metadata with formatted special fields + """ + result = { + "raw_metadata": metadata_dict, + "formatted_metadata": {} + } + + for key, value in metadata_dict.items(): + if key == "row_group_metadata": + result["formatted_metadata"]["row_group_metadata"] = { + "raw": value, + "parsed": self.parse_row_group_metadata(value), + "formatted": self.format_row_group_metadata(value) + } + elif key == "group_field_id_list": + result["formatted_metadata"]["group_field_id_list"] = { + "raw": value, + "parsed": self.parse_group_field_id_list(value), + "formatted": self.format_group_field_id_list(value) + } + else: + result["formatted_metadata"][key] = value + + return result + + def get_metadata_summary(self) -> Dict[str, Any]: + """ + Get metadata summary information + + Returns: + Dict: metadata summary + """ + file_metadata = self.get_file_metadata() + schema_metadata = self.get_schema_metadata() + + summary = { + "file_metadata_count": len(file_metadata), + "schema_metadata_count": len(schema_metadata), + "total_metadata_count": len(file_metadata) + len(schema_metadata) + } + + return summary + + def export_metadata(self, output_file: Optional[str] = None) -> str: + """ + Export metadata to JSON file + + Args: + output_file: output file path, if None will auto-generate + + Returns: + str: output file path + """ + if output_file is None: + output_file = f"{self.file_path.stem}_metadata.json" + + file_metadata = self.get_file_metadata() + parsed_metadata = self.parse_custom_metadata(file_metadata) + + metadata_export = { + "file_info": self.get_basic_info(), + "file_metadata": file_metadata, + "parsed_metadata": parsed_metadata, + "schema_metadata": self.get_schema_metadata(), + "column_statistics": self.get_column_statistics(), + "row_group_info": self.get_row_group_info(), + "metadata_summary": self.get_metadata_summary() + } + + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(metadata_export, f, indent=2, ensure_ascii=False) + + return output_file + + def print_summary(self): + """Print metadata summary""" + if not self.metadata: + print("โŒ No parquet file loaded") + return + + basic_info = self.get_basic_info() + summary = self.get_metadata_summary() + file_metadata = 
self.get_file_metadata() + + print(f"๐Ÿ“Š Parquet File Metadata Summary: {basic_info['name']}") + print("=" * 60) + print(f" File Size: {basic_info['size_mb']:.2f} MB") + print(f" Total Rows: {basic_info['num_rows']:,}") + print(f" Columns: {basic_info['num_columns']}") + print(f" Row Groups: {basic_info['num_row_groups']}") + print(f" Created By: {basic_info['created_by']}") + print(f" Parquet Version: {basic_info['format_version']}") + print(f" Footer Size: {basic_info['footer_size_bytes']:,} bytes") + print(f" File-level Metadata: {summary['file_metadata_count']} items") + print(f" Schema-level Metadata: {summary['schema_metadata_count']} items") + print(f" Total Metadata: {summary['total_metadata_count']} items") + + # Print formatted custom metadata + if file_metadata: + parsed_metadata = self.parse_custom_metadata(file_metadata) + formatted = parsed_metadata.get("formatted_metadata", {}) + + if "row_group_metadata" in formatted: + print("\n๐Ÿ“‹ Row Group Metadata:") + print("-" * 30) + print(formatted["row_group_metadata"]["formatted"]) + + if "group_field_id_list" in formatted: + print("\n๐Ÿ“‹ Column Group Metadata:") + print("-" * 30) + print(formatted["group_field_id_list"]["formatted"]) + + def print_formatted_metadata(self, metadata_key: str = None): + """ + Print formatted metadata for specific keys + + Args: + metadata_key: specific metadata key to format, if None prints all + """ + if not self.metadata: + print("โŒ No parquet file loaded") + return + + file_metadata = self.get_file_metadata() + if not file_metadata: + print("โŒ No file metadata found") + return + + parsed_metadata = self.parse_custom_metadata(file_metadata) + formatted = parsed_metadata.get("formatted_metadata", {}) + + if metadata_key: + if metadata_key in formatted: + print(f"\n๐Ÿ“‹ {metadata_key.upper()} Metadata:") + print("-" * 50) + print(formatted[metadata_key]["formatted"]) + else: + print(f"โŒ Metadata key '{metadata_key}' not found") + else: + # Print all formatted metadata + for key, value in formatted.items(): + if isinstance(value, dict) and "formatted" in value: + print(f"\n๐Ÿ“‹ {key.upper()} Metadata:") + print("-" * 50) + print(value["formatted"]) + else: + print(f"\n๐Ÿ“‹ {key.upper()}: {value}") \ No newline at end of file diff --git a/cmd/tools/binlogv2/parquet_analyzer/vector_deserializer.py b/cmd/tools/binlogv2/parquet_analyzer/vector_deserializer.py new file mode 100644 index 0000000000..790a351a9b --- /dev/null +++ b/cmd/tools/binlogv2/parquet_analyzer/vector_deserializer.py @@ -0,0 +1,567 @@ +""" +Vector Deserializer Component +Responsible for deserializing vector data in parquet files +References deserialization logic from serde.go +""" + +import struct +import numpy as np +from typing import List, Optional, Union, Dict, Any + + +class VectorDeserializer: + """Vector deserializer""" + + @staticmethod + def detect_vector_type_and_dim(bytes_data: bytes, field_name: str = "") -> tuple[str, int]: + """ + Detect vector type and dimension based on field name and byte data + + Args: + bytes_data: byte data + field_name: field name + + Returns: + tuple: (vector_type, dimension) + """ + if not bytes_data: + return "Unknown", 0 + + # First, try to detect if it's JSON data + try: + # Check if it starts with { or [ (JSON indicators) + if bytes_data.startswith(b'{') or bytes_data.startswith(b'['): + return "JSON", len(bytes_data) + + # Try to decode as UTF-8 and check if it's valid JSON + decoded = bytes_data.decode('utf-8', errors='ignore') + if decoded.startswith('{') or 
decoded.startswith('['): + return "JSON", len(bytes_data) + except: + pass + + # Check for specific field name patterns + if "json" in field_name.lower(): + return "JSON", len(bytes_data) + + # Check for array patterns (Protocol Buffers style) + if "array" in field_name.lower(): + # Check if it looks like a protobuf array + if len(bytes_data) > 2 and bytes_data[1] == ord('-') and b'\n\x01' in bytes_data: + return "Array", len(bytes_data) + + # Infer type based on field name + if "vector" in field_name.lower(): + # For vector fields, prioritize checking if it's Int8Vector (one uint8 per byte) + if len(bytes_data) <= 256: # Usually vector dimension won't exceed 256 + return "Int8Vector", len(bytes_data) + elif len(bytes_data) % 4 == 0: + dim = len(bytes_data) // 4 + return "FloatVector", dim + elif len(bytes_data) % 2 == 0: + dim = len(bytes_data) // 2 + if "float16" in field_name.lower(): + return "Float16Vector", dim + elif "bfloat16" in field_name.lower(): + return "BFloat16Vector", dim + else: + return "Float16Vector", dim # default + else: + # Binary vector, calculate dimension + dim = len(bytes_data) * 8 + return "BinaryVector", dim + else: + # Infer based on byte count + if len(bytes_data) % 4 == 0: + dim = len(bytes_data) // 4 + return "FloatVector", dim + elif len(bytes_data) % 2 == 0: + dim = len(bytes_data) // 2 + return "Float16Vector", dim + else: + dim = len(bytes_data) * 8 + return "BinaryVector", dim + + @staticmethod + def deserialize_float_vector(bytes_data: bytes, dim: Optional[int] = None) -> Optional[List[float]]: + """ + Deserialize FloatVector + References FloatVector processing logic from serde.go + + Args: + bytes_data: byte data + dim: dimension, if None will auto-calculate + + Returns: + List[float]: deserialized float vector + """ + if not bytes_data: + return None + + try: + if dim is None: + dim = len(bytes_data) // 4 + + if len(bytes_data) != dim * 4: + raise ValueError(f"FloatVector size mismatch: expected {dim * 4}, got {len(bytes_data)}") + + # Use struct to unpack float32 data + floats = struct.unpack(f'<{dim}f', bytes_data) + return list(floats) + + except Exception as e: + print(f"FloatVector deserialization failed: {e}") + return None + + @staticmethod + def deserialize_binary_vector(bytes_data: bytes, dim: Optional[int] = None) -> Optional[List[int]]: + """ + Deserialize BinaryVector + References BinaryVector processing logic from serde.go + + Args: + bytes_data: byte data + dim: dimension, if None will auto-calculate + + Returns: + List[int]: deserialized binary vector + """ + if not bytes_data: + return None + + try: + if dim is None: + dim = len(bytes_data) * 8 + + expected_size = (dim + 7) // 8 + if len(bytes_data) != expected_size: + raise ValueError(f"BinaryVector size mismatch: expected {expected_size}, got {len(bytes_data)}") + + # Convert to binary vector + binary_vector = [] + for byte in bytes_data: + for i in range(8): + bit = (byte >> i) & 1 + binary_vector.append(bit) + + # Only return first dim elements + return binary_vector[:dim] + + except Exception as e: + print(f"BinaryVector deserialization failed: {e}") + return None + + @staticmethod + def deserialize_int8_vector(bytes_data: bytes, dim: Optional[int] = None) -> Optional[List[int]]: + """ + Deserialize Int8Vector + References Int8Vector processing logic from serde.go + + Args: + bytes_data: byte data + dim: dimension, if None will auto-calculate + + Returns: + List[int]: deserialized int8 vector + """ + if not bytes_data: + return None + + try: + if dim is None: + dim = 
len(bytes_data) + + if len(bytes_data) != dim: + raise ValueError(f"Int8Vector size mismatch: expected {dim}, got {len(bytes_data)}") + + # Convert to int8 array + int8_vector = [int8 for int8 in bytes_data] + return int8_vector + + except Exception as e: + print(f"Int8Vector deserialization failed: {e}") + return None + + @staticmethod + def deserialize_float16_vector(bytes_data: bytes, dim: Optional[int] = None) -> Optional[List[float]]: + """ + Deserialize Float16Vector + References Float16Vector processing logic from serde.go + + Args: + bytes_data: byte data + dim: dimension, if None will auto-calculate + + Returns: + List[float]: deserialized float16 vector + """ + if not bytes_data: + return None + + try: + if dim is None: + dim = len(bytes_data) // 2 + + if len(bytes_data) != dim * 2: + raise ValueError(f"Float16Vector size mismatch: expected {dim * 2}, got {len(bytes_data)}") + + # Convert to float16 array + float16_vector = [] + for i in range(0, len(bytes_data), 2): + if i + 1 < len(bytes_data): + # Simple float16 conversion (simplified here) + uint16 = struct.unpack(' Optional[List[float]]: + """ + Deserialize BFloat16Vector + References BFloat16Vector processing logic from serde.go + + Args: + bytes_data: byte data + dim: dimension, if None will auto-calculate + + Returns: + List[float]: deserialized bfloat16 vector + """ + if not bytes_data: + return None + + try: + if dim is None: + dim = len(bytes_data) // 2 + + if len(bytes_data) != dim * 2: + raise ValueError(f"BFloat16Vector size mismatch: expected {dim * 2}, got {len(bytes_data)}") + + # Convert to bfloat16 array + bfloat16_vector = [] + for i in range(0, len(bytes_data), 2): + if i + 1 < len(bytes_data): + # Simple bfloat16 conversion (simplified here) + uint16 = struct.unpack(' Optional[Dict[str, Any]]: + """ + Deserialize JSON data + + Args: + bytes_data: byte data + + Returns: + Dict[str, Any]: deserialized JSON object + """ + if not bytes_data: + return None + + try: + import json + # Try to decode as UTF-8 + decoded = bytes_data.decode('utf-8') + return json.loads(decoded) + except UnicodeDecodeError: + try: + # Try with different encoding + decoded = bytes_data.decode('latin-1') + return json.loads(decoded) + except: + pass + except json.JSONDecodeError as e: + print(f"JSON deserialization failed: {e}") + return None + except Exception as e: + print(f"JSON deserialization failed: {e}") + return None + + @staticmethod + def deserialize_array(bytes_data: bytes) -> Optional[List[str]]: + """ + Deserialize array data (Protocol Buffers style) + + Args: + bytes_data: byte data + + Returns: + List[str]: deserialized array + """ + if not bytes_data: + return None + + try: + # Parse protobuf-style array format + # Format: length + separator + data + # Example: b'2-\n\x01q\n\x01k\n\x01l...' 
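+            # Best-effort heuristic for this layout: find the first b'\n\x01'
+            # marker, split the remainder on that marker, and keep only the
+            # single-byte parts decoded as characters; longer parts are skipped.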
+ + # Skip the first few bytes (length and separator) + if len(bytes_data) < 3: + return None + + # Find the start of data (after the first separator) + start_idx = 0 + for i in range(len(bytes_data) - 1): + if bytes_data[i:i+2] == b'\n\x01': + start_idx = i + 2 + break + + if start_idx == 0: + return None + + # Extract the data part + data_part = bytes_data[start_idx:] + + # Split by separator and decode + array_items = [] + + # Split by \n\x01 separator + parts = data_part.split(b'\n\x01') + + for part in parts: + if part: # Skip empty parts + try: + # Each part should be a single character + if len(part) == 1: + char = part.decode('utf-8') + array_items.append(char) + except UnicodeDecodeError: + try: + char = part.decode('latin-1') + array_items.append(char) + except: + pass + + return array_items + + except Exception as e: + print(f"Array deserialization failed: {e}") + return None + + @staticmethod + def deserialize_vector(bytes_data: bytes, vector_type: str, dim: Optional[int] = None) -> Optional[Union[List[float], List[int], Dict[str, Any]]]: + """ + Generic vector deserialization method + + Args: + bytes_data: byte data + vector_type: vector type + dim: dimension, if None will auto-calculate + + Returns: + Union[List[float], List[int], Dict[str, Any]]: deserialized vector + """ + if not bytes_data: + return None + + try: + if vector_type == "FloatVector": + return VectorDeserializer.deserialize_float_vector(bytes_data, dim) + elif vector_type == "BinaryVector": + return VectorDeserializer.deserialize_binary_vector(bytes_data, dim) + elif vector_type == "Int8Vector": + return VectorDeserializer.deserialize_int8_vector(bytes_data, dim) + elif vector_type == "Float16Vector": + return VectorDeserializer.deserialize_float16_vector(bytes_data, dim) + elif vector_type == "BFloat16Vector": + return VectorDeserializer.deserialize_bfloat16_vector(bytes_data, dim) + elif vector_type == "JSON": + return VectorDeserializer.deserialize_json(bytes_data) + elif vector_type == "Array": + return VectorDeserializer.deserialize_array(bytes_data) + else: + # Other binary types, return hex representation of original bytes + return bytes_data.hex() + + except Exception as e: + print(f"Vector deserialization failed: {e}") + return bytes_data.hex() + + @staticmethod + def analyze_vector_statistics(vector_data: Union[List[float], List[int], Dict[str, Any]], vector_type: str) -> Dict[str, Any]: + """ + Analyze vector statistics + + Args: + vector_data: vector data + vector_type: vector type + + Returns: + Dict: statistics information + """ + if not vector_data: + return {} + + try: + if vector_type == "JSON": + # For JSON data, return basic info + if isinstance(vector_data, dict): + return { + "vector_type": vector_type, + "keys": list(vector_data.keys()), + "key_count": len(vector_data), + "is_object": True + } + elif isinstance(vector_data, list): + return { + "vector_type": vector_type, + "length": len(vector_data), + "is_array": True + } + else: + return { + "vector_type": vector_type, + "value_type": type(vector_data).__name__ + } + elif vector_type == "Array": + # For Array data, return basic info + if isinstance(vector_data, list): + return { + "vector_type": vector_type, + "length": len(vector_data), + "is_array": True, + "item_types": list(set(type(item).__name__ for item in vector_data)) + } + else: + return { + "vector_type": vector_type, + "value_type": type(vector_data).__name__ + } + + # For numeric vectors + array_data = np.array(vector_data) + + stats = { + "vector_type": vector_type, + 
"dimension": len(vector_data), + "min": float(array_data.min()), + "max": float(array_data.max()), + "mean": float(array_data.mean()), + "std": float(array_data.std()) + } + + if vector_type == "BinaryVector": + stats["zero_count"] = int(np.sum(array_data == 0)) + stats["one_count"] = int(np.sum(array_data == 1)) + + return stats + + except Exception as e: + print(f"Statistics calculation failed: {e}") + return {} + + @staticmethod + def analyze_vector_pattern(bytes_data: bytes) -> Dict[str, Any]: + """ + Analyze vector data patterns + + Args: + bytes_data: byte data + + Returns: + Dict: pattern analysis results + """ + if not bytes_data: + return {} + + try: + # Convert to integer array + int_data = list(bytes_data) + + # Check if it's an increasing pattern + is_increasing = all(int_data[i] <= int_data[i+1] for i in range(len(int_data)-1)) + is_decreasing = all(int_data[i] >= int_data[i+1] for i in range(len(int_data)-1)) + + # Check if it's a cyclic pattern (i + j) % 256 + is_cyclic = True + for i in range(min(10, len(int_data))): # Check first 10 elements + expected = (i) % 256 + if int_data[i] != expected: + is_cyclic = False + break + + # Check if it's a sequential pattern [start, start+1, start+2, ...] + is_sequential = True + if len(int_data) > 1: + start_val = int_data[0] + for i in range(1, min(10, len(int_data))): + if int_data[i] != start_val + i: + is_sequential = False + break + else: + is_sequential = False + + # Calculate statistics + unique_values = len(set(int_data)) + min_val = min(int_data) + max_val = max(int_data) + avg_val = sum(int_data) / len(int_data) + + return { + "is_increasing": is_increasing, + "is_decreasing": is_decreasing, + "is_cyclic": is_cyclic, + "is_sequential": is_sequential, + "unique_values": unique_values, + "min_value": min_val, + "max_value": max_val, + "avg_value": avg_val, + "pattern_type": "sequential" if is_sequential else "cyclic" if is_cyclic else "increasing" if is_increasing else "decreasing" if is_decreasing else "random" + } + + except Exception as e: + print(f"Pattern analysis failed: {e}") + return {} + + @staticmethod + def deserialize_with_analysis(bytes_data: bytes, field_name: str = "") -> Dict[str, Any]: + """ + Deserialize and analyze vector data + + Args: + bytes_data: byte data + field_name: field name + + Returns: + Dict: dictionary containing deserialization results and statistics + """ + if not bytes_data: + return {} + + # Detect vector type and dimension + vector_type, dim = VectorDeserializer.detect_vector_type_and_dim(bytes_data, field_name) + + # Analyze data patterns + pattern_analysis = VectorDeserializer.analyze_vector_pattern(bytes_data) + + # Deserialize + deserialized_data = VectorDeserializer.deserialize_vector(bytes_data, vector_type, dim) + + # Analyze statistics + stats = VectorDeserializer.analyze_vector_statistics(deserialized_data, vector_type) + + return { + "raw_hex": bytes_data.hex(), + "vector_type": vector_type, + "dimension": dim, + "deserialized": deserialized_data, + "statistics": stats, + "pattern_analysis": pattern_analysis + } \ No newline at end of file diff --git a/cmd/tools/binlogv2/parquet_analyzer_cli.py b/cmd/tools/binlogv2/parquet_analyzer_cli.py new file mode 100755 index 0000000000..f68f8fdc7c --- /dev/null +++ b/cmd/tools/binlogv2/parquet_analyzer_cli.py @@ -0,0 +1,498 @@ +#!/usr/bin/env python3 +""" +Parquet Analyzer Command Line Tool +Provides a simple command line interface to use the parquet analyzer +""" + +import argparse +import sys +import json +import pandas as pd +import 
pyarrow.parquet as pq +from pathlib import Path + +# Add current directory to Python path +sys.path.append(str(Path(__file__).parent)) + +from parquet_analyzer import ParquetAnalyzer, ParquetMetaParser, VectorDeserializer + + +def main(): + """Main function""" + parser = argparse.ArgumentParser( + description="Parquet Analyzer Command Line Tool", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Usage Examples: + python parquet_analyzer_cli.py analyze test_large_batch.parquet + python parquet_analyzer_cli.py metadata test_large_batch.parquet + python parquet_analyzer_cli.py vector test_large_batch.parquet + python parquet_analyzer_cli.py export test_large_batch.parquet --output result.json + python parquet_analyzer_cli.py data test_large_batch.parquet --rows 10 --output data.json + python parquet_analyzer_cli.py query test_large_batch.parquet --id-value 123 + python parquet_analyzer_cli.py query test_large_batch.parquet --id-value 123 --id-column user_id + """ + ) + + parser.add_argument( + "command", + choices=["analyze", "metadata", "vector", "export", "data", "query"], + help="Command to execute" + ) + + parser.add_argument( + "file", + help="Parquet file path" + ) + + parser.add_argument( + "--output", "-o", + help="Output file path (for export and data commands)" + ) + + parser.add_argument( + "--rows", "-r", + type=int, + default=10, + help="Number of rows to export (only for data command, default: 10 rows)" + ) + + parser.add_argument( + "--verbose", "-v", + action="store_true", + help="Verbose output" + ) + + # Query-specific arguments + parser.add_argument( + "--id-value", "-i", + help="ID value to query (for query command)" + ) + + parser.add_argument( + "--id-column", "-c", + help="ID column name (for query command, auto-detected if not specified)" + ) + + args = parser.parse_args() + + # Check if file exists + if not Path(args.file).exists(): + print(f"โŒ File does not exist: {args.file}") + sys.exit(1) + + if args.command == "analyze": + analyze_file(args.file, args.verbose) + elif args.command == "metadata": + analyze_metadata(args.file, args.verbose) + elif args.command == "vector": + analyze_vectors(args.file, args.verbose) + elif args.command == "export": + export_analysis(args.file, args.output, args.verbose) + elif args.command == "data": + export_data(args.file, args.output, args.rows, args.verbose) + elif args.command == "query": + query_by_id(args.file, args.id_value, args.id_column, args.verbose) + + +def analyze_file(file_path: str, verbose: bool = False): + """Analyze parquet file""" + print(f"๐Ÿ” Analyzing parquet file: {Path(file_path).name}") + print("=" * 60) + + analyzer = ParquetAnalyzer(file_path) + + if not analyzer.load(): + print("โŒ Failed to load parquet file") + sys.exit(1) + + # Print summary + analyzer.print_summary() + + if verbose: + # Detailed analysis + analysis = analyzer.analyze() + + print(f"\n๐Ÿ“Š Detailed Analysis Results:") + print(f" File Info: {analysis['metadata']['basic_info']['name']}") + print(f" Size: {analysis['metadata']['basic_info']['size_mb']:.2f} MB") + print(f" Rows: {analysis['metadata']['basic_info']['num_rows']:,}") + print(f" Columns: {analysis['metadata']['basic_info']['num_columns']}") + + # Display vector analysis + if analysis['vectors']: + print(f"\n๐Ÿ” Vector Analysis:") + for vec_analysis in analysis['vectors']: + col_name = vec_analysis['column_name'] + stat_type = vec_analysis['stat_type'] + analysis_data = vec_analysis['analysis'] + + print(f" {col_name} ({stat_type}):") + print(f" Vector 
Type: {analysis_data['vector_type']}") + print(f" Dimension: {analysis_data['dimension']}") + + if analysis_data['statistics']: + stats = analysis_data['statistics'] + print(f" Min: {stats.get('min', 'N/A')}") + print(f" Max: {stats.get('max', 'N/A')}") + print(f" Mean: {stats.get('mean', 'N/A')}") + print(f" Std: {stats.get('std', 'N/A')}") + + +def analyze_metadata(file_path: str, verbose: bool = False): + """Analyze metadata""" + print(f"๐Ÿ“„ Analyzing metadata: {Path(file_path).name}") + print("=" * 60) + + meta_parser = ParquetMetaParser(file_path) + + if not meta_parser.load(): + print("โŒ Failed to load parquet file") + sys.exit(1) + + # Basic information + basic_info = meta_parser.get_basic_info() + print(f"๐Ÿ“Š File Information:") + print(f" Name: {basic_info['name']}") + print(f" Size: {basic_info['size_mb']:.2f} MB") + print(f" Rows: {basic_info['num_rows']:,}") + print(f" Columns: {basic_info['num_columns']}") + print(f" Row Groups: {basic_info['num_row_groups']}") + print(f" Created By: {basic_info['created_by']}") + print(f" Parquet Version: {basic_info['format_version']}") + + # File-level metadata + file_metadata = meta_parser.get_file_metadata() + if file_metadata: + print(f"\n๐Ÿ“„ File-level Metadata:") + for key, value in file_metadata.items(): + print(f" {key}: {value}") + + # Schema-level metadata + schema_metadata = meta_parser.get_schema_metadata() + if schema_metadata: + print(f"\n๐Ÿ“‹ Schema-level Metadata:") + for field in schema_metadata: + print(f" {field['column_name']}: {field['column_type']}") + for k, v in field['metadata'].items(): + print(f" {k}: {v}") + + # Column statistics + column_stats = meta_parser.get_column_statistics() + if column_stats: + print(f"\n๐Ÿ“ˆ Column Statistics:") + for col_stats in column_stats: + print(f" {col_stats['column_name']}:") + print(f" Compression: {col_stats['compression']}") + print(f" Encodings: {', '.join(col_stats['encodings'])}") + print(f" Compressed Size: {col_stats['compressed_size']:,} bytes") + print(f" Uncompressed Size: {col_stats['uncompressed_size']:,} bytes") + + if 'statistics' in col_stats and col_stats['statistics']: + stats = col_stats['statistics'] + if 'null_count' in stats: + print(f" Null Count: {stats['null_count']}") + if 'distinct_count' in stats: + print(f" Distinct Count: {stats['distinct_count']}") + if 'min' in stats: + print(f" Min: {stats['min']}") + if 'max' in stats: + print(f" Max: {stats['max']}") + + +def analyze_vectors(file_path: str, verbose: bool = False): + """Analyze vector data""" + print(f"๐Ÿ” Analyzing vector data: {Path(file_path).name}") + print("=" * 60) + + analyzer = ParquetAnalyzer(file_path) + + if not analyzer.load(): + print("โŒ Failed to load parquet file") + sys.exit(1) + + vector_analysis = analyzer.analyze_vectors() + + if not vector_analysis: + print("โŒ No vector data found") + return + + print(f"๐Ÿ“Š Found {len(vector_analysis)} vector statistics:") + + for vec_analysis in vector_analysis: + col_name = vec_analysis['column_name'] + stat_type = vec_analysis['stat_type'] + analysis = vec_analysis['analysis'] + + print(f"\n๐Ÿ” {col_name} ({stat_type}):") + print(f" Vector Type: {analysis['vector_type']}") + print(f" Dimension: {analysis['dimension']}") + + if analysis['statistics']: + stats = analysis['statistics'] + print(f" Min: {stats.get('min', 'N/A')}") + print(f" Max: {stats.get('max', 'N/A')}") + print(f" Mean: {stats.get('mean', 'N/A')}") + print(f" Std: {stats.get('std', 'N/A')}") + + if analysis['vector_type'] == "BinaryVector": + print(f" Zero Count: 
{stats.get('zero_count', 'N/A')}") + print(f" One Count: {stats.get('one_count', 'N/A')}") + + if verbose and analysis['deserialized']: + print(f" First 5 elements: {analysis['deserialized'][:5]}") + + # Validate consistency + validation = analyzer.validate_vector_consistency() + print(f"\nโœ… Vector Consistency Validation:") + print(f" Total Vectors: {validation['total_vectors']}") + print(f" Consistent Columns: {validation['consistent_columns']}") + print(f" Inconsistent Columns: {validation['inconsistent_columns']}") + + +def export_analysis(file_path: str, output_file: str = None, verbose: bool = False): + """Export analysis results""" + print(f"๐Ÿ’พ Exporting analysis results: {Path(file_path).name}") + print("=" * 60) + + analyzer = ParquetAnalyzer(file_path) + + if not analyzer.load(): + print("โŒ Failed to load parquet file") + sys.exit(1) + + # Export analysis results + output_file = analyzer.export_analysis(output_file) + + print(f"โœ… Analysis results exported to: {output_file}") + + if verbose: + # Show file size + file_size = Path(output_file).stat().st_size + print(f"๐Ÿ“Š Output file size: {file_size:,} bytes ({file_size/1024:.2f} KB)") + + # Show summary + analysis = analyzer.analyze() + print(f"๐Ÿ“ˆ Analysis Summary:") + print(f" Metadata Count: {analysis['metadata']['metadata_summary']['total_metadata_count']}") + print(f" Vector Count: {len(analysis['vectors'])}") + + +def export_data(file_path: str, output_file: str = None, num_rows: int = 10, verbose: bool = False): + """Export first N rows of parquet file data""" + print(f"๐Ÿ“Š Exporting data: {Path(file_path).name}") + print("=" * 60) + + try: + # Read parquet file + table = pq.read_table(file_path) + df = table.to_pandas() + + # Get first N rows + data_subset = df.head(num_rows) + + # Process vector columns, convert bytes to readable format + processed_data = [] + for idx, row in data_subset.iterrows(): + row_dict = {} + for col_name, value in row.items(): + if isinstance(value, bytes): + # Try to deserialize as vector + try: + vec_analysis = VectorDeserializer.deserialize_with_analysis(value, col_name) + if vec_analysis and vec_analysis['deserialized']: + if vec_analysis['vector_type'] == "JSON": + # For JSON, show the actual content + row_dict[col_name] = vec_analysis['deserialized'] + elif vec_analysis['vector_type'] == "Array": + # For Array, show the actual content + row_dict[col_name] = vec_analysis['deserialized'] + else: + # For vectors, show type and dimension + row_dict[col_name] = { + "vector_type": vec_analysis['vector_type'], + "dimension": vec_analysis['dimension'], + "data": vec_analysis['deserialized'][:10], # Only show first 10 elements + "raw_hex": value.hex()[:50] + "..." if len(value.hex()) > 50 else value.hex() + } + else: + row_dict[col_name] = { + "type": "binary", + "size": len(value), + "hex": value.hex()[:50] + "..." if len(value.hex()) > 50 else value.hex() + } + except Exception as e: + row_dict[col_name] = { + "type": "binary", + "size": len(value), + "hex": value.hex()[:50] + "..." 
if len(value.hex()) > 50 else value.hex(), + "error": str(e) + } + else: + row_dict[col_name] = value + processed_data.append(row_dict) + + # Prepare output + result = { + "file_info": { + "name": Path(file_path).name, + "total_rows": len(df), + "total_columns": len(df.columns), + "exported_rows": len(processed_data) + }, + "columns": list(df.columns), + "data": processed_data + } + + # Determine output file + if not output_file: + output_file = f"{Path(file_path).stem}_data_{num_rows}rows.json" + + # Save to file + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + print(f"โœ… Data exported to: {output_file}") + print(f"๐Ÿ“Š Exported {len(processed_data)} rows (total {len(df)} rows)") + print(f"๐Ÿ“‹ Columns: {len(df.columns)}") + + if verbose: + print(f"\n๐Ÿ“ˆ Data Preview:") + for i, row_data in enumerate(processed_data[:3]): # Only show first 3 rows preview + print(f" Row {i+1}:") + for col_name, value in row_data.items(): + if isinstance(value, dict) and 'vector_type' in value: + print(f" {col_name}: {value['vector_type']}({value['dimension']}) - {value['data'][:5]}...") + elif isinstance(value, dict) and 'type' in value: + print(f" {col_name}: {value['type']} ({value['size']} bytes)") + else: + print(f" {col_name}: {value}") + print() + + return output_file + + except Exception as e: + print(f"โŒ Failed to export data: {e}") + sys.exit(1) + + +def query_by_id(file_path: str, id_value: str = None, id_column: str = None, verbose: bool = False): + """Query data by ID value""" + print(f"๐Ÿ” Querying by ID: {Path(file_path).name}") + print("=" * 60) + + analyzer = ParquetAnalyzer(file_path) + + if not analyzer.load(): + print("โŒ Failed to load parquet file") + sys.exit(1) + + # If no ID value provided, show ID column information + if id_value is None: + print("๐Ÿ“‹ ID Column Information:") + print("-" * 40) + + id_info = analyzer.get_id_column_info() + + if "error" in id_info: + print(f"โŒ {id_info['error']}") + sys.exit(1) + + print(f"๐Ÿ“Š Total rows: {id_info['total_rows']}") + print(f"๐Ÿ“‹ Total columns: {id_info['total_columns']}") + print(f"๐ŸŽฏ Recommended ID column: {id_info['recommended_id_column']}") + print() + + print("๐Ÿ“‹ Available ID columns:") + for col_info in id_info['id_columns']: + status = "โœ…" if col_info['is_unique'] else "โš ๏ธ" + print(f" {status} {col_info['column_name']}") + print(f" - Unique: {col_info['is_unique']}") + print(f" - Type: {'Integer' if col_info['is_integer'] else 'Numeric' if col_info['is_numeric'] else 'Other'}") + print(f" - Range: {col_info['min_value']} to {col_info['max_value']}" if col_info['is_numeric'] else " - Range: N/A") + print(f" - Sample values: {col_info['sample_values'][:3]}") + print() + + print("๐Ÿ’ก Usage: python parquet_analyzer_cli.py query --id-value [--id-column ]") + return + + # Convert ID value to appropriate type + try: + # Try to convert to integer first + if id_value.isdigit(): + id_value = int(id_value) + elif id_value.replace('.', '').replace('-', '').isdigit(): + id_value = float(id_value) + except ValueError: + # Keep as string if conversion fails + pass + + # Perform the query + result = analyzer.query_by_id(id_value, id_column) + + if "error" in result: + print(f"โŒ Query failed: {result['error']}") + sys.exit(1) + + if not result['found']: + print(f"โŒ {result['message']}") + return + + # Display results + print(f"โœ… Found record with {result['id_column']} = {result['id_value']}") + print(f"๐Ÿ“Š Total columns: {result['total_columns']}") + 
print(f"๐Ÿ“ˆ Total rows in file: {result['total_rows']}") + print() + + print("๐Ÿ“‹ Record Data:") + print("-" * 40) + + for col_name, value in result['record'].items(): + if isinstance(value, dict) and 'vector_type' in value: + # Vector data + print(f" {col_name}:") + print(f" Type: {value['vector_type']}") + print(f" Dimension: {value['dimension']}") + print(f" Data preview: {value['data_preview'][:5]}...") + elif isinstance(value, dict) and 'name' in value: + # JSON data (likely a person record) + print(f" {col_name}:") + for key, val in value.items(): + print(f" {key}: {val}") + elif isinstance(value, list) and len(value) > 0 and isinstance(value[0], str): + # String array data + print(f" {col_name}: {value}") + elif isinstance(value, list): + # Array data + print(f" {col_name}: {value}") + elif isinstance(value, str) and value.startswith('=1.28.0 +duckdb>=0.9.0 +pandas>=2.0.0 +pathlib2>=2.3.7 +minio>=7.2.0 \ No newline at end of file