# CSV ↔ JSON Converter

import csv

import json

import os

from datetime import datetime

from pathlib import Path

from collections import OrderedDict


# ============================================================

# HELPERS

# ============================================================


def format_size(size_bytes):
    """Return a human-readable size string such as ``"1.5 KB"``.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit (TB) is reached. It is always rendered with one decimal.

    Args:
        size_bytes: Size in bytes (int or float).

    Returns:
        The formatted size, e.g. ``"512.0 B"`` or ``"1.0 TB"``.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    size = size_bytes
    # Iterate over every unit except the last one so the value is never
    # divided one more time than the label printed next to it.
    for unit in units[:-1]:
        if size < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} {units[-1]}"



def detect_type(value):
    """Cast a CSV cell to None, bool, int or float when it looks like one.

    Args:
        value: The raw cell value; non-strings are converted with ``str()``.

    Returns:
        ``None`` for ``None``, empty or whitespace-only input; ``True`` for
        "true"/"yes" and ``False`` for "false"/"no" (case-insensitive); an
        ``int`` or finite ``float`` when the stripped text parses as one;
        otherwise the stripped text itself.
    """
    if value is None or value == "":
        return None

    text = str(value).strip()
    if not text:
        # Whitespace-only cells are treated like empty cells.
        return None

    lowered = text.lower()
    if lowered in ("true", "yes"):
        return True
    if lowered in ("false", "no"):
        return False

    # int() and float() accept Python digit separators ("1_000"); in CSV data
    # such a cell is far more likely an identifier, so it stays a string.
    if "_" in text:
        return text

    try:
        return int(text)
    except ValueError:
        pass

    try:
        number = float(text)
    except ValueError:
        return text

    # float() also parses "nan", "inf", "infinity" and overflows such as
    # "1e999" to infinity. Those values cannot be represented in JSON, so the
    # original text is kept instead.
    import math
    return number if math.isfinite(number) else text



def flatten_dict(d, parent_key="", sep="."):
    """Flatten a nested dict into a single level suitable for a CSV row.

    Nested dict keys are joined with ``sep`` ("a" -> "b" becomes "a.b").
    Lists and empty nested dicts are stored as JSON text so that their key
    still produces a column.

    Args:
        d: The dict to flatten.
        parent_key: Prefix for every key produced; used by the recursion.
        sep: Separator placed between a parent key and a child key.

    Returns:
        A new flat dict; ``d`` itself is not modified.
    """
    flat = {}
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict) and v:
            flat.update(flatten_dict(v, new_key, sep))
        elif isinstance(v, (dict, list)):
            # An empty dict has no children to recurse into; serialising it
            # like a list keeps the key instead of silently dropping it.
            # ensure_ascii=False keeps non-ASCII text readable in the cell.
            flat[new_key] = json.dumps(v, ensure_ascii=False)
        else:
            flat[new_key] = v
    return flat



# ============================================================

# CSV → JSON

# ============================================================


def csv_to_json(input_path, output_path=None,
                delimiter=",", auto_cast=True,
                indent=2, encoding="utf-8"):
    """
    Convert a CSV file to a JSON array of objects, one object per row.

    Progress and errors are reported with print(); no exception is raised
    to the caller.

    Args:
        input_path: Path of the CSV file to read.
        output_path: Path of the JSON file to write; defaults to the input
            path with a ".json" suffix.
        delimiter: Field delimiter passed to csv.DictReader.
        auto_cast: When true, every cell goes through detect_type().
        indent: Indentation passed to json.dump.
        encoding: Encoding used to read the CSV. If decoding fails, the
            conversion is retried once with "latin-1".

    Returns:
        (output_path, record_count) on success, (None, 0) on any failure.
    """
    input_path = Path(input_path)

    if not input_path.exists():
        print(f"  File not found: {input_path}")
        return None, 0

    if not output_path:
        output_path = input_path.with_suffix(".json")
    output_path = Path(output_path)

    # "utf-8-sig" decodes exactly like UTF-8 but drops a leading byte-order
    # mark; without this the mark becomes part of the first column name.
    read_encoding = encoding
    if isinstance(encoding, str) and \
            encoding.lower().replace("_", "-") in ("utf-8", "utf8"):
        read_encoding = "utf-8-sig"

    records = []

    try:
        with open(input_path, "r", encoding=read_encoding, newline="") as f:
            reader = csv.DictReader(f, delimiter=delimiter)
            headers = reader.fieldnames

            if not headers:
                print("  CSV file has no headers.")
                return None, 0

            print(f"\n  Reading CSV: {input_path.name}")
            print(f"  Columns ({len(headers)}): {', '.join(headers)}")

            for i, row in enumerate(reader, 1):
                if auto_cast:
                    record = {k: detect_type(v) for k, v in row.items()}
                else:
                    record = dict(row)
                records.append(record)

                if i % 1000 == 0:
                    # "\r" keeps the progress counter on a single line.
                    print(f"  Processed {i} rows...", end="\r")

    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so the retry cannot hit
        # this handler again.
        print("  Encoding error. Trying latin-1...")
        return csv_to_json(input_path, output_path, delimiter,
                           auto_cast, indent, "latin-1")
    except Exception as e:
        print(f"  Error reading CSV: {e}")
        return None, 0

    # Write JSON
    try:
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(records, f, indent=indent, ensure_ascii=False)

        size = format_size(output_path.stat().st_size)
        print(f"\n  Converted {len(records)} records")
        print(f"  Output   : {output_path}  ({size})")
        return output_path, len(records)

    except Exception as e:
        print(f"  Error writing JSON: {e}")
        return None, 0



# ============================================================

# JSON → CSV

# ============================================================


def json_to_csv(input_path, output_path=None,
                delimiter=",", flatten=True,
                encoding="utf-8"):
    """
    Convert a JSON file to CSV.

    The document may be a list of records, a single object (treated as one
    record), or an object that wraps the list under one of the keys "data",
    "records", "results", "items" or "rows". The CSV columns are the union
    of all record keys in first-seen order. Progress and errors are reported
    with print(); no exception is raised to the caller.

    Args:
        input_path: Path of the JSON file to read.
        output_path: Path of the CSV file to write; defaults to the input
            path with a ".csv" suffix.
        delimiter: Field delimiter passed to csv.DictWriter.
        flatten: When true, nested objects are flattened with flatten_dict().
        encoding: Encoding used for both reading the JSON and writing the CSV.

    Returns:
        (output_path, record_count) on success, (None, 0) on any failure.
    """
    input_path = Path(input_path)

    if not input_path.exists():
        print(f"  File not found: {input_path}")
        return None, 0

    if not output_path:
        output_path = input_path.with_suffix(".csv")
    output_path = Path(output_path)

    try:
        with open(input_path, "r", encoding=encoding) as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        print(f"  Invalid JSON: {e}")
        return None, 0
    except Exception as e:
        print(f"  Error reading JSON: {e}")
        return None, 0

    # Handle both list and single object
    if isinstance(data, dict):
        # Try common wrapper keys
        for key in ["data", "records", "results", "items", "rows"]:
            if key in data and isinstance(data[key], list):
                data = data[key]
                print(f"  Extracted records from key: '{key}'")
                break
        else:
            # No wrapper key matched: the object itself is the only record.
            data = [data]

    if not isinstance(data, list) or not data:
        print("  JSON does not contain a list of records.")
        return None, 0

    # Turn every record into a dict. Non-dict entries (numbers, strings,
    # lists) get a single "value" column in both modes, so no entry is
    # dropped and the reported count matches the rows written.
    if flatten:
        data = [flatten_dict(r) if isinstance(r, dict) else {"value": r}
                for r in data]
    else:
        data = [r if isinstance(r, dict) else {"value": r} for r in data]

    # Collect all unique keys across all records, keeping first-seen order
    all_keys = list(OrderedDict.fromkeys(
        k for record in data for k in record
    ))

    print(f"\n  Reading JSON: {input_path.name}")
    print(f"  Records   : {len(data)}")
    print(f"  Columns ({len(all_keys)}): {', '.join(str(k) for k in all_keys[:10])}"
          f"{'...' if len(all_keys) > 10 else ''}")

    try:
        with open(output_path, "w", encoding=encoding,
                  newline="") as f:
            writer = csv.DictWriter(f, fieldnames=all_keys,
                                    delimiter=delimiter,
                                    extrasaction="ignore")
            writer.writeheader()

            for i, record in enumerate(data, 1):
                # Nested values that survived (flatten disabled, or a list
                # under "value") are stored as JSON text; ensure_ascii=False
                # keeps non-ASCII characters readable in the cell.
                row = {k: (json.dumps(v, ensure_ascii=False)
                           if isinstance(v, (dict, list)) else v)
                       for k, v in record.items()}
                writer.writerow(row)

                if i % 1000 == 0:
                    print(f"  Written {i} rows...", end="\r")

        size = format_size(output_path.stat().st_size)
        print(f"\n  Converted {len(data)} records")
        print(f"  Output   : {output_path}  ({size})")
        return output_path, len(data)

    except Exception as e:
        print(f"  Error writing CSV: {e}")
        return None, 0



# ============================================================

# PREVIEW FILE

# ============================================================


def preview_csv(filepath, rows=5):
    """Show the header and the first ``rows`` data rows of a CSV file.

    The file is read as comma-separated UTF-8 and printed as fixed-width
    columns. Any failure is reported as a "Preview error" line instead of
    being raised.
    """
    rule = "  " + "-" * 60

    try:
        with open(filepath, "r", encoding="utf-8", newline="") as handle:
            table = csv.reader(handle)
            columns = next(table, None)

            # Guard clause: nothing to show for a missing or blank header row.
            if not columns:
                print("  Empty CSV.")
                return

            # Share roughly 60 characters between the columns, clamped to
            # the range 8..18 characters per column.
            width = min(18, max(8, 60 // len(columns)))

            def render(cells):
                # Cut each cell to the column width, then pad it to that width.
                padded = [str(cell)[:width].ljust(width) for cell in cells]
                return "  " + "  ".join(padded)

            print(f"\n  PREVIEW: {Path(filepath).name}")
            print(rule)
            print(render(columns))
            print(rule)

            for shown, record in enumerate(table):
                if not shown < rows:
                    break
                print(render(record))

            print(rule)

    except Exception as e:
        print(f"  Preview error: {e}")



def preview_json(filepath, rows=3):
    """Print the first ``rows`` records of a JSON file.

    A top-level list is sliced to ``rows`` entries, a top-level object is
    shown as a single record, and any other value is wrapped as
    ``{"value": ...}``. At most 2000 characters of the rendered preview are
    printed. Any failure is reported as a "Preview error" line instead of
    being raised.

    Args:
        filepath: Path of the JSON file (read as UTF-8).
        rows: Maximum number of list entries to show.
    """
    max_chars = 2000

    try:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)

        if isinstance(data, list):
            preview = data[:rows]
        elif isinstance(data, dict):
            preview = [data]
        else:
            preview = [{"value": data}]

        text = json.dumps(preview, indent=4, ensure_ascii=False)

        print(f"\n  PREVIEW: {Path(filepath).name}")
        print("  " + "-"*60)
        print(text[:max_chars])
        # The notice depends on the text that was actually cut, not on the
        # size of the whole document.
        if len(text) > max_chars:
            print("  ... (truncated)")
        print("  " + "-"*60)

    except Exception as e:
        print(f"  Preview error: {e}")



# ============================================================

# FILE INFO

# ============================================================


def file_info(filepath):
    """Print name, size, modification time and basic statistics of a file.

    For ".csv" files the data-row count, column count and the first eight
    headers are shown. For ".json" files the record count and/or the first
    eight keys are shown. If those details cannot be read, a short note is
    printed instead of raising.

    Args:
        filepath: Path of the file to inspect.
    """
    p = Path(filepath)
    if not p.exists():
        print(f"  File not found: {filepath}")
        return

    stat = p.stat()
    size  = format_size(stat.st_size)
    mtime = datetime.fromtimestamp(stat.st_mtime).strftime("%d-%m-%Y %H:%M")
    ext   = p.suffix.lower()

    print(f"\n  File     : {p.name}")
    print(f"  Size     : {size}")
    print(f"  Modified : {mtime}")
    print(f"  Type     : {ext.upper()[1:]} file")

    if ext == ".csv":
        try:
            with open(filepath, "r", encoding="utf-8", newline="") as f:
                reader = csv.reader(f)
                headers = next(reader, [])
                # The header row is already consumed, so this counts data rows.
                row_count = sum(1 for _ in reader)
            print(f"  Rows     : {row_count:,}")
            print(f"  Columns  : {len(headers)}")
            print(f"  Headers  : {', '.join(headers[:8])}"
                  f"{'...' if len(headers) > 8 else ''}")
        except Exception as e:
            # Details are best effort, but the reason is reported rather than
            # swallowed. Ctrl+C is no longer caught here.
            print(f"  (Could not read CSV details: {e})")

    elif ext == ".json":
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, list):
                print(f"  Records  : {len(data):,}")
                if data and isinstance(data[0], dict):
                    # Keys are taken from the first record only.
                    keys = list(data[0].keys())
                    print(f"  Keys     : {', '.join(str(k) for k in keys[:8])}"
                          f"{'...' if len(keys) > 8 else ''}")
            elif isinstance(data, dict):
                keys = list(data.keys())
                print(f"  Keys     : {', '.join(str(k) for k in keys[:8])}"
                      f"{'...' if len(keys) > 8 else ''}")
        except Exception as e:
            print(f"  (Could not read JSON details: {e})")



# ============================================================

# BATCH CONVERT

# ============================================================


def batch_convert(folder, direction):
    """Convert every matching file directly inside ``folder``.

    Each file is converted with the default options of csv_to_json() or
    json_to_csv(), and a summary line is printed at the end.

    Args:
        folder: Directory to scan (not recursive).
        direction: "csv_to_json" to convert "*.csv" files, or "json_to_csv"
            to convert "*.json" files. Any other value is rejected with a
            message and nothing is converted.
    """
    folder = Path(folder)
    if not folder.is_dir():
        print("  Invalid folder.")
        return

    if direction == "csv_to_json":
        pattern, convert = "*.csv", csv_to_json
        ext_from, ext_to = "CSV", "JSON"
    elif direction == "json_to_csv":
        pattern, convert = "*.json", json_to_csv
        ext_from, ext_to = "JSON", "CSV"
    else:
        # A mistyped direction must not silently run the JSON -> CSV branch.
        print(f"  Unknown direction: {direction!r}")
        return

    # Sorted so that files are processed in a predictable order.
    files = sorted(folder.glob(pattern))

    if not files:
        print(f"  No {ext_from} files found in {folder}")
        return

    print(f"\n  Found {len(files)} {ext_from} file(s). Converting to {ext_to}...")
    success = 0

    for f in files:
        print(f"\n  [{f.name}]")
        out, _ = convert(f)
        # The converters return None as the output path when they fail.
        if out:
            success += 1

    print(f"\n  Batch done: {success}/{len(files)} converted successfully.")



# ============================================================

# MAIN MENU

# ============================================================


def print_menu():
    """Print the numbered list of actions offered by the interactive menu."""
    rule = "-" * 48
    menu_lines = (
        "\n" + rule,
        "  CSV  <->  JSON  CONVERTER",
        rule,
        "  1. CSV  →  JSON",
        "  2. JSON →  CSV",
        "  3. Preview a CSV file",
        "  4. Preview a JSON file",
        "  5. File info & stats",
        "  6. Batch convert CSV → JSON (folder)",
        "  7. Batch convert JSON → CSV (folder)",
        "  0. Exit",
        rule,
    )
    for entry in menu_lines:
        print(entry)



def _ask_delimiter():
    """Prompt for a CSV delimiter; unrecognised input falls back to a comma."""
    answer = input("  Delimiter (, or ; or Tab): ").strip()
    if answer.lower() == "tab":
        return "\t"
    # A literal tab never survives strip(), so only the visible separators
    # need to be checked here.
    return answer if answer in (",", ";", "|") else ","


def _ask_int(prompt, default):
    """Prompt for a non-negative integer; return ``default`` for other input."""
    answer = input(prompt).strip()
    # isdecimal() accepts only characters int() can parse, whereas isdigit()
    # also accepts e.g. "²", which makes int() raise ValueError.
    return int(answer) if answer.isdecimal() else default


def main():
    """Run the interactive menu until the user chooses "0".

    Each menu entry collects its options with input() and delegates to the
    conversion, preview, info or batch function. End-of-input or Ctrl+C ends
    the program with the goodbye message instead of a traceback.
    """
    print("\n" + "="*55)
    print("     CSV  <->  JSON  CONVERTER")
    print("="*55)
    print("\n  Bidirectional converter with auto type detection,")
    print("  nested JSON flattening, and batch processing.\n")

    try:
        while True:
            print_menu()
            choice = input("  > ").strip()

            # ── CSV → JSON ──────────────────────────────────
            if choice == "1":
                print("\n  CSV to JSON Conversion")
                inp = input("  Input CSV file path: ").strip()
                if not inp:
                    continue

                out = input("  Output JSON path (Enter = same name): ").strip() or None
                delim = _ask_delimiter()

                # Anything other than an explicit "n" keeps type detection on.
                cast = input("  Auto-detect types? (y/n, default y): ").strip().lower()
                cast = cast != "n"

                indent = _ask_int("  JSON indent spaces (default 2): ", 2)

                out_path, count = csv_to_json(inp, out, delim, cast, indent)

                if out_path and count:
                    preview = input("\n  Preview output? (y/n): ").strip().lower()
                    if preview == "y":
                        preview_json(out_path, rows=3)

            # ── JSON → CSV ──────────────────────────────────
            elif choice == "2":
                print("\n  JSON to CSV Conversion")
                inp = input("  Input JSON file path: ").strip()
                if not inp:
                    continue

                out = input("  Output CSV path (Enter = same name): ").strip() or None
                delim = _ask_delimiter()

                # Anything other than an explicit "n" keeps flattening on.
                flat = input("  Flatten nested objects? (y/n, default y): ").strip().lower()
                flat = flat != "n"

                out_path, count = json_to_csv(inp, out, delim, flat)

                if out_path and count:
                    preview = input("\n  Preview output? (y/n): ").strip().lower()
                    if preview == "y":
                        preview_csv(out_path, rows=5)

            # ── Preview CSV ─────────────────────────────────
            elif choice == "3":
                path = input("\n  CSV file path: ").strip()
                rows = _ask_int("  Rows to preview (default 5): ", 5)
                preview_csv(path, rows)

            # ── Preview JSON ────────────────────────────────
            elif choice == "4":
                path = input("\n  JSON file path: ").strip()
                rows = _ask_int("  Records to preview (default 3): ", 3)
                preview_json(path, rows)

            # ── File Info ───────────────────────────────────
            elif choice == "5":
                path = input("\n  File path: ").strip()
                file_info(path)

            # ── Batch CSV → JSON ────────────────────────────
            elif choice == "6":
                folder = input("\n  Folder path: ").strip()
                batch_convert(folder, "csv_to_json")

            # ── Batch JSON → CSV ────────────────────────────
            elif choice == "7":
                folder = input("\n  Folder path: ").strip()
                batch_convert(folder, "json_to_csv")

            elif choice == "0":
                print("\n  Goodbye!\n")
                break

            else:
                print("  Invalid choice.")

    except (EOFError, KeyboardInterrupt):
        # input() raises EOFError on a closed stdin, and Ctrl+C raises
        # KeyboardInterrupt; both are treated as a request to quit.
        print("\n\n  Goodbye!\n")



# ============================================================

# RUN

# ============================================================


# Start the interactive menu only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":

    main()

No comments: