import csv
import json
import math
import os
from collections import OrderedDict
from datetime import datetime
from pathlib import Path
# ============================================================
# HELPERS
# ============================================================
def format_size(size_bytes):
    """Return a byte count as a human-readable string with one decimal.

    The value is divided by 1024 until it drops below 1024 or the units
    run out, e.g. ``1536`` -> ``"1.5 KB"``. Anything of 1024 GB or more is
    reported in TB.
    """
    for unit in ("B", "KB", "MB", "GB"):
        if size_bytes < 1024:
            return f"{size_bytes:.1f} {unit}"
        size_bytes /= 1024
    # Four divisions have happened by this point, so the value is in
    # terabytes (the original mislabelled it as GB).
    return f"{size_bytes:.1f} TB"
def detect_type(value):
    """Cast a cell value to None/bool/int/float when it clearly is one.

    Rules, applied in order to the stripped text of ``value``:

    * ``None`` or ``""``            -> ``None``
    * "true"/"yes" (any case)       -> ``True``
    * "false"/"no" (any case)       -> ``False``
    * integer literal               -> ``int``
    * finite decimal literal        -> ``float``
    * anything else                 -> the stripped string

    Text that ``float()`` would turn into NaN or infinity ("nan", "inf",
    "Infinity", "1e999", ...) is kept as a string, and so is text with
    digit-group underscores ("1_000").
    """
    if value is None or value == "":
        return None
    text = str(value).strip()
    lowered = text.lower()
    if lowered in ("true", "yes"):
        return True
    if lowered in ("false", "no"):
        return False
    # int()/float() accept Python-style "1_000"; in a data file that is
    # far more likely an identifier than a number, so leave it alone.
    if "_" in text:
        return text
    try:
        return int(text)
    except ValueError:
        pass
    try:
        number = float(text)
    except ValueError:
        return text
    # Non-finite floats would be serialised by json.dump as NaN/Infinity,
    # which strict JSON parsers reject.
    return number if math.isfinite(number) else text
def flatten_dict(d, parent_key="", sep="."):
    """Flatten a nested dict into a single level suitable for a CSV row.

    Nested keys are joined with ``sep`` (``{"a": {"b": 1}}`` becomes
    ``{"a.b": 1}``). Lists are stored as their JSON text. An empty nested
    dict is stored as ``"{}"`` so that its key (and therefore its CSV
    column) is not silently lost.

    Args:
        d: The dict to flatten.
        parent_key: Prefix for every produced key; used by the recursion.
        sep: Separator placed between nesting levels.

    Returns:
        A new flat dict; ``d`` is not modified.
    """
    flat = {}
    for key, value in d.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict) and value:
            flat.update(flatten_dict(value, full_key, sep))
        elif isinstance(value, (dict, list)):
            # Lists, and dicts with nothing to recurse into, keep their
            # key by being serialised as JSON text.
            flat[full_key] = json.dumps(value)
        else:
            flat[full_key] = value
    return flat
# ============================================================
# CSV → JSON
# ============================================================
def csv_to_json(input_path, output_path=None,
                delimiter=",", auto_cast=True,
                indent=2, encoding="utf-8"):
    """Convert a CSV file (first row = headers) to a JSON array of objects.

    Args:
        input_path: CSV file to read.
        output_path: JSON file to write; defaults to ``input_path`` with a
            ``.json`` suffix.
        delimiter: Field delimiter passed to ``csv.DictReader``.
        auto_cast: When true, every cell goes through ``detect_type``;
            otherwise cells stay as read.
        indent: Indentation passed to ``json.dump``.
        encoding: Encoding used to read the CSV. If decoding fails the
            file is retried once as latin-1. The JSON is always written
            as UTF-8.

    Returns:
        ``(output_path, record_count)`` on success, ``(None, 0)`` if the
        file is missing, has no headers, or cannot be read or written.
    """
    input_path = Path(input_path)
    if not input_path.exists():
        print(f" File not found: {input_path}")
        return None, 0
    if not output_path:
        output_path = input_path.with_suffix(".json")
    output_path = Path(output_path)

    # Spreadsheet exports often begin with a UTF-8 byte-order mark. Plain
    # "utf-8" leaves it glued to the first header name; "utf-8-sig" strips
    # it and reads BOM-less files unchanged.
    read_encoding = encoding
    if (isinstance(encoding, str)
            and encoding.lower().replace("_", "-") in ("utf-8", "utf8")):
        read_encoding = "utf-8-sig"

    records = []
    try:
        with open(input_path, "r", encoding=read_encoding, newline="") as f:
            reader = csv.DictReader(f, delimiter=delimiter)
            headers = reader.fieldnames
            if not headers:
                print(" CSV file has no headers.")
                return None, 0
            print(f"\n Reading CSV: {input_path.name}")
            print(f" Columns ({len(headers)}): {', '.join(headers)}")
            for i, row in enumerate(reader, 1):
                if auto_cast:
                    record = {k: detect_type(v) for k, v in row.items()}
                else:
                    record = dict(row)
                records.append(record)
                if i % 1000 == 0:
                    print(f" Processed {i} rows...", end="\r")
    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so this retry cannot
        # raise UnicodeDecodeError again and the recursion stops here.
        print(" Encoding error. Trying latin-1...")
        return csv_to_json(input_path, output_path, delimiter,
                           auto_cast, indent, "latin-1")
    except Exception as e:
        # CLI boundary: report the problem and signal failure to the caller.
        print(f" Error reading CSV: {e}")
        return None, 0

    # Write JSON
    try:
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(records, f, indent=indent, ensure_ascii=False)
        size = format_size(output_path.stat().st_size)
        print(f"\n Converted {len(records)} records")
        print(f" Output : {output_path} ({size})")
        return output_path, len(records)
    except Exception as e:
        print(f" Error writing JSON: {e}")
        return None, 0
# ============================================================
# JSON → CSV
# ============================================================
def json_to_csv(input_path, output_path=None,
                delimiter=",", flatten=True,
                encoding="utf-8"):
    """Convert a JSON file (array of objects) to CSV.

    A top-level object is accepted too: if it has a list under one of the
    keys "data", "records", "results", "items" or "rows", that list is
    used; otherwise the object itself is treated as the only record.
    Records that are not objects are written under a single "value"
    column, whether or not ``flatten`` is set, so the reported count
    always matches the rows written.

    Args:
        input_path: JSON file to read.
        output_path: CSV file to write; defaults to ``input_path`` with a
            ``.csv`` suffix.
        delimiter: Field delimiter passed to ``csv.DictWriter``.
        flatten: When true, nested objects are flattened with
            ``flatten_dict``; otherwise nested values are written as JSON
            text.
        encoding: Encoding used for both reading and writing.

    Returns:
        ``(output_path, record_count)`` on success, ``(None, 0)`` if the
        file is missing, is not valid JSON, holds no records, or cannot
        be written.
    """
    input_path = Path(input_path)
    if not input_path.exists():
        print(f" File not found: {input_path}")
        return None, 0
    if not output_path:
        output_path = input_path.with_suffix(".csv")
    output_path = Path(output_path)

    try:
        with open(input_path, "r", encoding=encoding) as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        print(f" Invalid JSON: {e}")
        return None, 0
    except Exception as e:
        print(f" Error reading JSON: {e}")
        return None, 0

    # Handle both list and single object
    if isinstance(data, dict):
        # Try common wrapper keys
        for key in ["data", "records", "results", "items", "rows"]:
            if key in data and isinstance(data[key], list):
                data = data[key]
                print(f" Extracted records from key: '{key}'")
                break
        else:
            data = [data]
    if not isinstance(data, list) or not data:
        print(" JSON does not contain a list of records.")
        return None, 0

    # Normalise every record to a dict. Previously, with flatten=False,
    # scalar/list records were skipped on write but still counted.
    records = []
    for item in data:
        if isinstance(item, dict):
            records.append(flatten_dict(item) if flatten else item)
        else:
            records.append({"value": item})

    # Collect all unique keys across all records, in first-seen order
    all_keys = list(OrderedDict.fromkeys(
        k for record in records for k in record
    ))
    print(f"\n Reading JSON: {input_path.name}")
    print(f" Records : {len(records)}")
    print(f" Columns ({len(all_keys)}): {', '.join(str(k) for k in all_keys[:10])}"
          f"{'...' if len(all_keys) > 10 else ''}")

    try:
        with open(output_path, "w", encoding=encoding,
                  newline="") as f:
            writer = csv.DictWriter(f, fieldnames=all_keys,
                                    delimiter=delimiter,
                                    extrasaction="ignore")
            writer.writeheader()
            for i, record in enumerate(records, 1):
                # Nested values that survived (flatten=False) go out as
                # JSON text so a cell never holds a Python repr.
                row = {k: (json.dumps(v) if isinstance(v, (dict, list)) else v)
                       for k, v in record.items()}
                writer.writerow(row)
                if i % 1000 == 0:
                    print(f" Written {i} rows...", end="\r")
        size = format_size(output_path.stat().st_size)
        print(f"\n Converted {len(records)} records")
        print(f" Output : {output_path} ({size})")
        return output_path, len(records)
    except Exception as e:
        print(f" Error writing CSV: {e}")
        return None, 0
# ============================================================
# PREVIEW FILE
# ============================================================
def preview_csv(filepath, rows=5):
    """Show the header and the first ``rows`` data rows of a CSV file.

    Cells are cut or padded to one shared column width (8 to 18
    characters, derived from the number of header columns). Any failure
    is reported as a "Preview error" line instead of being raised.
    """
    rule = " " + "-" * 60

    def render(cells, width):
        # One output line: every cell cut/padded to exactly `width` chars.
        return " " + " ".join(str(cell)[:width].ljust(width) for cell in cells)

    try:
        with open(filepath, "r", encoding="utf-8", newline="") as handle:
            table = csv.reader(handle)
            header = next(table, None)
            if not header:
                print(" Empty CSV.")
                return
            print(f"\n PREVIEW: {Path(filepath).name}")
            print(rule)
            width = min(18, max(8, 60 // len(header)))
            print(render(header, width))
            print(rule)
            shown = 0
            for record in table:
                if shown >= rows:
                    break
                print(render(record, width))
                shown += 1
            print(rule)
    except Exception as exc:
        print(f" Preview error: {exc}")
def preview_json(filepath, rows=3):
    """Pretty-print the first ``rows`` records of a JSON file.

    A top-level list shows its first ``rows`` items, a top-level object
    is shown as a single record, and any other value is wrapped as
    ``{"value": ...}``. At most 2000 characters are printed. The
    " ... (truncated)" notice appears when that limit cut the text or
    when list records were left out. Any failure is reported as a
    "Preview error" line instead of being raised.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        omitted = False
        if isinstance(data, list):
            # Clamp so a negative count shows nothing rather than
            # slicing from the end of the list.
            preview = data[:max(rows, 0)]
            omitted = len(data) > len(preview)
        elif isinstance(data, dict):
            preview = [data]
        else:
            preview = [{"value": data}]
        text = json.dumps(preview, indent=4, ensure_ascii=False)
        print(f"\n PREVIEW: {Path(filepath).name}")
        print(" " + "-" * 60)
        print(text[:2000])
        # Decide from what was really printed; the length of str(data)
        # says nothing about whether the 2000-character cut happened.
        if omitted or len(text) > 2000:
            print(" ... (truncated)")
        print(" " + "-" * 60)
    except Exception as e:
        print(f" Preview error: {e}")
# ============================================================
# FILE INFO
# ============================================================
def file_info(filepath):
    """Print name, size, modification time and type of a file.

    For ``.csv`` files the data-row count, column count and first eight
    headers are added; for ``.json`` files the record count and/or first
    eight keys. If those details cannot be read or parsed, a
    "Details : unavailable" line says why instead of failing silently.
    """
    p = Path(filepath)
    if not p.exists():
        print(f" File not found: {filepath}")
        return
    stat = p.stat()  # one stat call serves both size and mtime
    size = format_size(stat.st_size)
    mtime = datetime.fromtimestamp(stat.st_mtime).strftime("%d-%m-%Y %H:%M")
    ext = p.suffix.lower()
    print(f"\n File : {p.name}")
    print(f" Size : {size}")
    print(f" Modified : {mtime}")
    print(f" Type : {ext.upper()[1:]} file")

    if ext == ".csv":
        try:
            with open(filepath, "r", encoding="utf-8", newline="") as f:
                reader = csv.reader(f)
                headers = next(reader, [])
                row_count = sum(1 for _ in reader)
        except (OSError, UnicodeError, csv.Error) as e:
            print(f" Details : unavailable ({e})")
        else:
            print(f" Rows : {row_count:,}")
            print(f" Columns : {len(headers)}")
            print(f" Headers : {', '.join(headers[:8])}"
                  f"{'...' if len(headers) > 8 else ''}")
    elif ext == ".json":
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError.
            print(f" Details : unavailable ({e})")
        else:
            if isinstance(data, list):
                print(f" Records : {len(data):,}")
                if data and isinstance(data[0], dict):
                    keys = list(data[0].keys())
                    print(f" Keys : {', '.join(str(k) for k in keys[:8])}"
                          f"{'...' if len(keys) > 8 else ''}")
            elif isinstance(data, dict):
                keys = list(data.keys())
                print(f" Keys : {', '.join(str(k) for k in keys[:8])}"
                      f"{'...' if len(keys) > 8 else ''}")
# ============================================================
# BATCH CONVERT
# ============================================================
def batch_convert(folder, direction):
    """Convert every matching file directly inside ``folder``.

    Args:
        folder: Directory to scan (not recursive).
        direction: ``"csv_to_json"`` to convert ``*.csv`` files, or
            ``"json_to_csv"`` to convert ``*.json`` files. Any other
            value is rejected with a message; previously it silently ran
            the JSON -> CSV branch.

    Each file is converted with the default options of ``csv_to_json``
    or ``json_to_csv``, and a success summary is printed at the end.
    """
    folder = Path(folder)
    if not folder.is_dir():
        print(" Invalid folder.")
        return
    if direction == "csv_to_json":
        pattern, ext_from, ext_to = "*.csv", "CSV", "JSON"
    elif direction == "json_to_csv":
        pattern, ext_from, ext_to = "*.json", "JSON", "CSV"
    else:
        print(f" Invalid direction: {direction!r}"
              " (expected 'csv_to_json' or 'json_to_csv')")
        return

    # glob() yields files in arbitrary order; sort for a stable run order.
    files = sorted(folder.glob(pattern))
    if not files:
        print(f" No {ext_from} files found in {folder}")
        return
    print(f"\n Found {len(files)} {ext_from} file(s). Converting to {ext_to}...")

    convert = csv_to_json if direction == "csv_to_json" else json_to_csv
    success = 0
    for f in files:
        print(f"\n [{f.name}]")
        out, _ = convert(f)
        if out:
            success += 1
    print(f"\n Batch done: {success}/{len(files)} converted successfully.")
# ============================================================
# MAIN MENU
# ============================================================
def print_menu():
    """Print the main menu: a title and the numbered options, framed by
    48-dash rules."""
    rule = "-" * 48
    options = (
        " 1. CSV → JSON",
        " 2. JSON → CSV",
        " 3. Preview a CSV file",
        " 4. Preview a JSON file",
        " 5. File info & stats",
        " 6. Batch convert CSV → JSON (folder)",
        " 7. Batch convert JSON → CSV (folder)",
        " 0. Exit",
    )
    print("\n" + rule)
    print(" CSV <-> JSON CONVERTER")
    print(rule)
    for option in options:
        print(option)
    print(rule)
def _ask_delimiter():
    """Prompt for a field delimiter; unrecognised input means a comma."""
    raw = input(" Delimiter (, or ; or Tab): ").strip()
    if raw.lower() == "tab":
        return "\t"
    return raw if raw in (",", ";", "|") else ","


def _ask_int(prompt, default):
    """Prompt for a non-negative whole number, else return ``default``."""
    raw = input(prompt).strip()
    # isdecimal() only accepts characters int() can parse. isdigit() also
    # accepts e.g. a superscript two, which made int() raise ValueError.
    return int(raw) if raw.isdecimal() else default


def _run_csv_to_json():
    """Collect the CSV -> JSON options, convert, and offer a preview."""
    print("\n CSV to JSON Conversion")
    inp = input(" Input CSV file path: ").strip()
    if not inp:
        return
    out = input(" Output JSON path (Enter = same name): ").strip() or None
    delim = _ask_delimiter()
    cast = input(" Auto-detect types? (y/n, default y): ").strip().lower() != "n"
    indent = _ask_int(" JSON indent spaces (default 2): ", 2)
    out_path, count = csv_to_json(inp, out, delim, cast, indent)
    if out_path and count:
        if input("\n Preview output? (y/n): ").strip().lower() == "y":
            preview_json(out_path, rows=3)


def _run_json_to_csv():
    """Collect the JSON -> CSV options, convert, and offer a preview."""
    print("\n JSON to CSV Conversion")
    inp = input(" Input JSON file path: ").strip()
    if not inp:
        return
    out = input(" Output CSV path (Enter = same name): ").strip() or None
    delim = _ask_delimiter()
    flat = input(" Flatten nested objects? (y/n, default y): ").strip().lower() != "n"
    out_path, count = json_to_csv(inp, out, delim, flat)
    if out_path and count:
        if input("\n Preview output? (y/n): ").strip().lower() == "y":
            preview_csv(out_path, rows=5)


def main():
    """Run the interactive menu loop until the user picks 0.

    End-of-input or Ctrl+C at any prompt also ends the session with the
    goodbye message instead of a traceback.
    """
    print("\n" + "=" * 55)
    print(" CSV <-> JSON CONVERTER")
    print("=" * 55)
    print("\n Bidirectional converter with auto type detection,")
    print(" nested JSON flattening, and batch processing.\n")
    while True:
        print_menu()
        try:
            choice = input(" > ").strip()
            if choice == "1":
                _run_csv_to_json()
            elif choice == "2":
                _run_json_to_csv()
            elif choice == "3":
                path = input("\n CSV file path: ").strip()
                rows = _ask_int(" Rows to preview (default 5): ", 5)
                preview_csv(path, rows)
            elif choice == "4":
                path = input("\n JSON file path: ").strip()
                rows = _ask_int(" Records to preview (default 3): ", 3)
                preview_json(path, rows)
            elif choice == "5":
                path = input("\n File path: ").strip()
                file_info(path)
            elif choice == "6":
                folder = input("\n Folder path: ").strip()
                batch_convert(folder, "csv_to_json")
            elif choice == "7":
                folder = input("\n Folder path: ").strip()
                batch_convert(folder, "json_to_csv")
            elif choice == "0":
                print("\n Goodbye!\n")
                break
            else:
                print(" Invalid choice.")
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl+C: leave cleanly.
            print("\n Goodbye!\n")
            break
# ============================================================
# RUN
# ============================================================
# Start the interactive menu only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
# (Web-page residue "No comments: / Post a Comment" followed the script in the original post; it is not part of the program.)