# System Health Monitor

import os

import json

import time

import threading

from datetime import datetime, timedelta

from pathlib import Path

from collections import deque


# psutil is an optional third-party dependency. Record whether it imported so
# that main() can print install instructions instead of failing at import time.
try:

    import psutil

    PSUTIL_OK = True

except ImportError:

    # Every collector below calls psutil; main() checks this flag before
    # offering the menu.
    PSUTIL_OK = False


# ============================================================

# CONFIGURATION

# ============================================================


# File that save_snapshot() reads and rewrites; a relative path, so it resolves
# against the current working directory.
LOG_FILE        = "system_health_log.json"

REFRESH_INTERVAL = 2      # seconds between live updates

# NOTE(review): not referenced anywhere in the visible part of this file.
HISTORY_LEN      = 60     # data points to keep in history

# Thresholds compared with ">=" in check_alerts().
ALERT_CPU        = 85.0   # % CPU alert threshold

ALERT_RAM        = 85.0   # % RAM alert threshold

ALERT_DISK       = 90.0   # % Disk alert threshold

ALERT_TEMP       = 80.0   # °C CPU temp alert


# ============================================================

# HELPERS

# ============================================================


def format_size(size_bytes):
    """Format a byte count as a human-readable string with 1024-based units.

    Args:
        size_bytes: Number of bytes (int or float). Negative values are
            accepted and scaled by magnitude, keeping the sign.

    Returns:
        A string such as "1.5 KB" with one decimal place. Units run from B
        through TB; anything larger is reported in PB.
    """
    # Scale on the magnitude: comparing a negative number with 1024 would
    # always succeed on the first unit and report e.g. "-2048.0 B".
    sign = "-" if size_bytes < 0 else ""
    magnitude = abs(size_bytes)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if magnitude < 1024:
            return f"{sign}{magnitude:.1f} {unit}"
        magnitude /= 1024
    return f"{sign}{magnitude:.1f} PB"



def format_uptime(seconds):
    """Render a duration in seconds as a short "Xd Yh Zm" style string.

    The value is truncated to whole seconds. Only the most significant
    parts are shown: days/hours/minutes when at least a day has elapsed,
    hours/minutes/seconds when at least an hour has, otherwise
    minutes/seconds.
    """
    span = timedelta(seconds=int(seconds))
    hrs, leftover = divmod(span.seconds, 3600)
    mins, secs = divmod(leftover, 60)

    if span.days:
        return f"{span.days}d {hrs}h {mins}m"
    if hrs:
        return f"{hrs}h {mins}m {secs}s"
    return f"{mins}m {secs}s"



def draw_bar(value, max_val=100, width=30, fill="█", empty="░"):
    """Build a fixed-width text progress bar.

    Args:
        value: Current reading. Values outside [0, max_val] are clamped so
            the bar is never shorter or longer than ``width``.
        max_val: Reading that corresponds to a completely filled bar. A
            non-positive ``max_val`` yields an empty bar rather than a
            division error.
        width: Total number of characters in the bar.
        fill: Character used for the filled part.
        empty: Character used for the unfilled part.

    Returns:
        A string of ``width`` characters (for single-character ``fill`` and
        ``empty``).
    """
    if max_val > 0:
        # Clamp on both sides: a negative value would otherwise produce a
        # negative fill count and pad the bar beyond ``width``.
        ratio = min(max(value, 0), max_val) / max_val
    else:
        ratio = 0.0
    filled = int(ratio * width)
    return fill * filled + empty * (width - filled)



def color_bar(value, width=30):
    """Return a ``(bar, level)`` pair for a percentage-style reading.

    ``bar`` comes from draw_bar(); ``level`` is an 8-character label chosen
    by the highest threshold the value reaches: 90 -> CRITICAL, 75 -> HIGH,
    50 -> MEDIUM, otherwise OK.
    """
    rendered = draw_bar(value, width=width)

    # Ordered from most to least severe; the first floor reached wins.
    tiers = (
        (90, "CRITICAL"),
        (75, "HIGH    "),
        (50, "MEDIUM  "),
    )
    label = next((text for floor, text in tiers if value >= floor), "OK      ")
    return rendered, label



# ============================================================

# DATA COLLECTORS

# ============================================================


def get_cpu_info():
    """Collect CPU utilisation, core counts, frequency and temperature.

    Returns:
        dict with keys:
            percent        - overall CPU utilisation percentage
            per_core       - list of per-core utilisation percentages
            count_logical  - number of logical CPUs
            count_physical - number of physical cores
            freq           - {"current", "min", "max"} rounded to 0.1, or
                             None when unavailable
            temp           - first reading of the first matching sensor
                             group, rounded to 0.1, or None when unavailable

    Note:
        The two cpu_percent() calls each use interval=0.5, so this function
        takes roughly one second and the two readings cover consecutive
        sampling windows.
    """
    data = {
        "percent":        psutil.cpu_percent(interval=0.5),
        "per_core":       psutil.cpu_percent(interval=0.5, percpu=True),
        "count_logical":  psutil.cpu_count(logical=True),
        "count_physical": psutil.cpu_count(logical=False),
        "freq":           None,
        "temp":           None,
    }

    # Frequency is best-effort. Catch Exception rather than using a bare
    # except so Ctrl+C (KeyboardInterrupt) and SystemExit still propagate --
    # Ctrl+C is how the live dashboard is stopped.
    try:
        freq = psutil.cpu_freq()
        if freq:
            data["freq"] = {
                "current": round(freq.current, 1),
                "min":     round(freq.min, 1),
                "max":     round(freq.max, 1),
            }
    except Exception:
        pass

    # Temperature is best-effort too; an AttributeError from a psutil build
    # without sensors_temperatures is covered by the same handler.
    try:
        temps = psutil.sensors_temperatures()
        if temps:
            # Probe known sensor group names in priority order.
            for key in ("coretemp", "cpu_thermal", "k10temp", "acpitz"):
                if key in temps and temps[key]:
                    data["temp"] = round(temps[key][0].current, 1)
                    break
    except Exception:
        pass

    return data



def get_ram_info():
    """Return physical-memory and swap statistics as a flat dict.

    Keys: total, available, used, percent (from psutil.virtual_memory) and
    swap_total, swap_used, swap_percent (from psutil.swap_memory).
    """
    memory = psutil.virtual_memory()
    swap = psutil.swap_memory()

    info = {
        field: getattr(memory, field)
        for field in ("total", "available", "used", "percent")
    }
    info["swap_total"] = swap.total
    info["swap_used"] = swap.used
    info["swap_percent"] = swap.percent
    return info



def get_disk_info():
    """Return usage details for each partition from psutil.disk_partitions.

    Partitions whose usage cannot be read (PermissionError / OSError) are
    skipped. Each entry has device, mountpoint, fstype, total, used, free
    and percent.
    """
    entries = []
    for partition in psutil.disk_partitions(all=False):
        try:
            stats = psutil.disk_usage(partition.mountpoint)
        except (PermissionError, OSError):
            # Unreadable mount: leave it out rather than fail.
            continue
        entry = dict(
            device=partition.device,
            mountpoint=partition.mountpoint,
            fstype=partition.fstype,
            total=stats.total,
            used=stats.used,
            free=stats.free,
            percent=stats.percent,
        )
        entries.append(entry)
    return entries



def get_network_info():
    """Return cumulative network counters and the active IPv4 interfaces.

    The result holds bytes_sent, bytes_recv, packets_sent, packets_recv and
    "interfaces": a list of {"name", "ip", "speed"} for every interface that
    is up and has an AF_INET address (first such address only).
    """
    counters = psutil.net_io_counters()
    addresses = psutil.net_if_addrs()
    link_stats = psutil.net_if_stats()

    interfaces = []
    for name, link in link_stats.items():
        if not link.isup or name not in addresses:
            continue
        # Only the first IPv4 address of an interface is reported.
        ipv4 = next(
            (a for a in addresses[name] if a.family.name == "AF_INET"),
            None,
        )
        if ipv4 is not None:
            interfaces.append(
                {"name": name, "ip": ipv4.address, "speed": link.speed}
            )

    summary = {
        "bytes_sent":   counters.bytes_sent,
        "bytes_recv":   counters.bytes_recv,
        "packets_sent": counters.packets_sent,
        "packets_recv": counters.packets_recv,
    }
    summary["interfaces"] = interfaces
    return summary



def get_battery_info():
    """Return battery status, or None when no battery information exists.

    Returns:
        None if psutil reports no battery or the query fails; otherwise a
        dict with:
            percent   - charge percentage rounded to 0.1
            plugged   - psutil's power_plugged value
            secs_left - psutil's secsleft, with POWER_TIME_UNLIMITED
                        mapped to -1
    """
    # Best-effort query. Catch Exception rather than using a bare except so
    # KeyboardInterrupt / SystemExit are not silently swallowed.
    try:
        bat = psutil.sensors_battery()
        if bat:
            unlimited = bat.secsleft == psutil.POWER_TIME_UNLIMITED
            return {
                "percent":   round(bat.percent, 1),
                "plugged":   bat.power_plugged,
                "secs_left": -1 if unlimited else bat.secsleft,
            }
    except Exception:
        pass
    return None



def get_top_processes(top_n=10):
    """Return info dicts for the busiest processes.

    Args:
        top_n: Maximum number of processes to return.

    Returns:
        Up to ``top_n`` dicts with the keys pid, name, cpu_percent,
        memory_percent and status, ordered by CPU usage (highest first)
        and then by memory usage among equal CPU readings.
    """
    procs = []
    for p in psutil.process_iter(["pid", "name", "cpu_percent",
                                   "memory_percent", "status"]):
        try:
            procs.append(p.info)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue

    def _load(info):
        # Fields may be None; treat that as 0 so the sort never compares
        # None with a number. Memory is the tie-breaker, which the previous
        # key omitted despite the "CPU then memory" intent.
        return (info.get("cpu_percent") or 0,
                info.get("memory_percent") or 0)

    procs.sort(key=_load, reverse=True)
    return procs[:top_n]



def get_system_info():
    """Return host-level facts: platform, boot time, uptime, hostname, user.

    The hostname comes from os.uname() where available, otherwise from the
    COMPUTERNAME environment variable. The user comes from USER, falling
    back to USERNAME, and is "N/A" when neither is set.
    """
    booted_at = psutil.boot_time()
    uptime = time.time() - booted_at
    boot_stamp = datetime.fromtimestamp(booted_at).strftime("%d-%m-%Y %H:%M:%S")

    if hasattr(os, "uname"):
        host = os.uname().nodename
    else:
        host = os.environ.get("COMPUTERNAME", "N/A")

    user = os.environ.get("USER")
    if not user:
        user = os.environ.get("USERNAME", "N/A")

    return {
        "platform":   os.name,
        "boot_time":  boot_stamp,
        "uptime_sec": uptime,
        "uptime_str": format_uptime(uptime),
        "hostname":   host,
        "user":       user,
    }



# ============================================================

# DISPLAY SECTIONS

# ============================================================


def clear_screen():
    """Clear the terminal: runs "cls" when os.name is "nt", else "clear"."""
    command = "clear"
    if os.name == "nt":
        command = "cls"
    os.system(command)



def display_header():
    """Print the three-line dashboard banner with the current local time."""
    rule = "=" * 65
    stamp = datetime.now().strftime("%d-%m-%Y  %H:%M:%S")
    for line in (rule, f"   SYSTEM HEALTH MONITOR          {stamp}", rule):
        print(line)



def display_cpu(cpu):
    """Print the CPU section of the dashboard.

    Args:
        cpu: Dict shaped like get_cpu_info()'s result. "percent",
            "count_physical" and "count_logical" are required; "freq",
            "temp" and "per_core" are optional.

    The per-core grid is shown two cores per row, and only when there are
    at most 16 cores.
    """
    bar, level = color_bar(cpu["percent"])
    print(f"\n  CPU Usage   : {cpu['percent']:>5.1f}%  {bar}  {level}")

    freq = cpu.get("freq")
    if freq:
        print(f"  CPU Freq    : {freq['current']} MHz  "
              f"(min: {freq['min']}  max: {freq['max']})")

    print(f"  Cores       : {cpu['count_physical']} physical / "
          f"{cpu['count_logical']} logical")

    # Compare with None explicitly so a legitimate 0.0 reading is shown.
    temp = cpu.get("temp")
    if temp is not None:
        temp_bar, _ = color_bar(temp, width=20)
        print(f"  Temperature : {temp}°C  {temp_bar}")

    # Per-core bars, two per row.
    per_core = cpu.get("per_core")
    if per_core and len(per_core) <= 16:
        cells = [
            f"  C{i}: {pct:4.1f}% {draw_bar(pct, width=8)}"
            for i, pct in enumerate(per_core)
        ]
        label = "  Per-Core    :"
        # Continuation rows are indented to the label's width so the columns
        # line up. Printing whole rows avoids the dangling whitespace-only
        # line the incremental version left after an even core count.
        indent = " " * len(label)
        for start in range(0, len(cells), 2):
            prefix = label if start == 0 else indent
            print(prefix + "".join(cells[start:start + 2]))



def display_ram(ram):
    """Print the memory section: RAM usage, then swap when swap exists.

    Expects the keys produced by get_ram_info().
    """
    usage_bar, usage_level = color_bar(ram["percent"])
    print(f"\n  RAM Usage   : {ram['percent']:>5.1f}%  {usage_bar}  {usage_level}")
    print(f"  Used / Total: {format_size(ram['used'])} / {format_size(ram['total'])}")
    print(f"  Available   : {format_size(ram['available'])}")

    # No swap configured: nothing further to show.
    if not ram["swap_total"] > 0:
        return

    swap_bar, swap_level = color_bar(ram["swap_percent"])
    print(f"  Swap Usage  : {ram['swap_percent']:>5.1f}%  {swap_bar}  {swap_level}")
    print(f"  Swap Used   : {format_size(ram['swap_used'])} / "
          f"{format_size(ram['swap_total'])}")



def display_disk(disks):
    """Print the disk section: three lines per entry from get_disk_info()."""
    print("\n  DISK USAGE")
    print("  " + "-" * 60)

    # Continuation lines sit under the usage column.
    pad = f"  {'':12}"
    for disk in disks:
        usage_bar, usage_level = color_bar(disk["percent"], width=20)
        print(f"  {disk['mountpoint']:<12} {disk['percent']:>5.1f}%  "
              f"{usage_bar}  {usage_level}")
        print(f"{pad} Used: {format_size(disk['used'])} / "
              f"{format_size(disk['total'])}  "
              f"Free: {format_size(disk['free'])}")
        print(f"{pad} FS: {disk['fstype']}  Device: {disk['device']}")



def display_network(net):
    """Print cumulative network totals and the active interfaces.

    Expects the dict produced by get_network_info().
    """
    print("\n  NETWORK")
    print("  " + "-" * 60)
    print(f"  Sent         : {format_size(net['bytes_sent'])}")
    print(f"  Received     : {format_size(net['bytes_recv'])}")
    print(f"  Packets Sent : {net['packets_sent']:,}")
    print(f"  Packets Recv : {net['packets_recv']:,}")

    interfaces = net["interfaces"]
    if not interfaces:
        return

    print("  Interfaces   :")
    for entry in interfaces:
        link_speed = entry["speed"]
        # A falsy speed (0 / None) is shown as "N/A".
        if link_speed:
            speed_text = f"{link_speed} Mbps"
        else:
            speed_text = "N/A"
        print(f"    {entry['name']:<15} IP: {entry['ip']:<18} Speed: {speed_text}")



def display_battery(bat):
    """Print the battery section; prints nothing when ``bat`` is falsy.

    Args:
        bat: Dict shaped like get_battery_info()'s result ("percent",
            "plugged", "secs_left"), or None.

    The severity label is battery-specific: a low charge is the problem
    state. Reusing the usage scale from color_bar() labelled a full battery
    "CRITICAL" and a nearly empty one "OK".
    """
    if not bat:
        return

    print(f"\n  BATTERY")
    print("  " + "-"*60)

    charge = bat["percent"]
    bar = draw_bar(charge)
    # 15% matches the low-battery threshold used by check_alerts().
    if charge <= 15:
        level = "CRITICAL"
    elif charge <= 30:
        level = "LOW     "
    else:
        level = "OK      "

    plug = "Plugged In" if bat["plugged"] else "On Battery"
    print(f"  Charge      : {charge:>5.1f}%  {bar}  {level}")
    print(f"  Status      : {plug}")

    # secs_left is -1 (or another non-positive value) when no estimate is
    # available; show it only while discharging.
    if bat["secs_left"] > 0 and not bat["plugged"]:
        print(f"  Time Left   : {format_uptime(bat['secs_left'])}")



def display_processes(procs):
    """Print a table of processes, one row per entry in ``procs``.

    Args:
        procs: Iterable of dicts with the keys pid, name, cpu_percent,
            memory_percent and status. Missing or None fields are shown
            as "?" (text) or 0.0 (numbers).

    Every entry is printed; the caller decides how many to pass. The table
    previously truncated to 10 rows, which silently dropped rows when the
    menu asked for 15.
    """
    print(f"\n  TOP PROCESSES  (by CPU%)")
    print("  " + "-"*60)
    print(f"  {'PID':<8} {'NAME':<22} {'CPU%':>6}  {'MEM%':>6}  STATUS")
    print("  " + "-"*55)

    for p in procs:
        pid = p.get("pid")
        if pid is None:
            pid = "?"
        name   = (p.get("name") or "?")[:20]   # keep the column width stable
        cpu    = p.get("cpu_percent") or 0.0
        mem    = p.get("memory_percent") or 0.0
        status = p.get("status") or "?"
        print(f"  {pid:<8} {name:<22} {cpu:>5.1f}%  {mem:>5.1f}%  {status}")



def display_system(sys_info):
    """Print the hostname, user, boot time and uptime from ``sys_info``."""
    print("\n  SYSTEM INFO")
    print("  " + "-" * 60)

    # (label, key) pairs in display order; labels are padded to 12 columns.
    rows = (
        ("Hostname", "hostname"),
        ("User", "user"),
        ("Boot Time", "boot_time"),
        ("Uptime", "uptime_str"),
    )
    for label, key in rows:
        print(f"  {label:<12}: {sys_info[key]}")



# ============================================================

# ALERTS

# ============================================================


def check_alerts(cpu, ram, disks, bat, temp):
    """Compare current readings with the ALERT_* thresholds.

    Args:
        cpu: Dict with "percent" and optionally "temp".
        ram: Dict with "percent".
        disks: Iterable of dicts with "mountpoint" and "percent".
        bat: Battery dict ("plugged", "percent") or None.
        temp: CPU temperature in °C, or None. When None, cpu["temp"] is
            used instead. This argument was previously accepted but
            ignored.

    Returns:
        A list of human-readable alert strings; empty when nothing is over
        threshold.
    """
    if temp is None:
        temp = cpu.get("temp")

    alerts = []

    if cpu["percent"] >= ALERT_CPU:
        alerts.append(f"HIGH CPU: {cpu['percent']:.1f}% (threshold: {ALERT_CPU}%)")

    if ram["percent"] >= ALERT_RAM:
        alerts.append(f"HIGH RAM: {ram['percent']:.1f}% (threshold: {ALERT_RAM}%)")

    for d in disks:
        if d["percent"] >= ALERT_DISK:
            alerts.append(f"DISK FULL: {d['mountpoint']} at "
                          f"{d['percent']:.1f}% (threshold: {ALERT_DISK}%)")

    if temp is not None and temp >= ALERT_TEMP:
        alerts.append(f"HIGH TEMP: {temp}°C (threshold: {ALERT_TEMP}°C)")

    # Low battery matters only while discharging.
    if bat and not bat["plugged"] and bat["percent"] <= 15:
        alerts.append(f"LOW BATTERY: {bat['percent']}% — please plug in!")

    return alerts



# ============================================================

# LIVE DASHBOARD

# ============================================================


def live_dashboard(duration_secs=0):
    """Show a live-refreshing dashboard.

    Args:
        duration_secs: Stop after roughly this many seconds. 0 (the
            default) runs until the user presses Ctrl+C.

    Each cycle collects all readings first and only then clears and redraws
    the screen. Collection takes about a second (see get_cpu_info), so
    clearing first left a near-empty screen for that time on every refresh.
    """
    print("\n  Starting live dashboard... Press Ctrl+C to stop.\n")
    time.sleep(1)

    # Monotonic clock for intervals: immune to wall-clock adjustments.
    start     = time.monotonic()
    prev_net  = psutil.net_io_counters()
    prev_time = time.monotonic()

    try:
        while True:
            cpu     = get_cpu_info()
            ram     = get_ram_info()
            disks   = get_disk_info()
            net     = get_network_info()
            bat     = get_battery_info()
            procs   = get_top_processes()
            sysinfo = get_system_info()

            # Network speed (bytes per second) since the previous cycle.
            now_net   = psutil.net_io_counters()
            now_time  = time.monotonic()
            elapsed   = (now_time - prev_time) or 1
            # Counters can go backwards (reset / wrap); clamp so a negative
            # rate is never displayed.
            dl_speed  = max(0.0, (now_net.bytes_recv - prev_net.bytes_recv) / elapsed)
            ul_speed  = max(0.0, (now_net.bytes_sent - prev_net.bytes_sent) / elapsed)
            prev_net  = now_net
            prev_time = now_time

            clear_screen()
            display_header()

            display_system(sysinfo)
            display_cpu(cpu)
            display_ram(ram)
            display_disk(disks)

            # Network with live speed
            print(f"\n  NETWORK  (Live speed)")
            print("  " + "-"*60)
            print(f"  Download Speed : {format_size(dl_speed)}/s")
            print(f"  Upload Speed   : {format_size(ul_speed)}/s")
            print(f"  Total Sent     : {format_size(net['bytes_sent'])}")
            print(f"  Total Received : {format_size(net['bytes_recv'])}")
            for iface in net["interfaces"]:
                print(f"  {iface['name']:<15} {iface['ip']}")

            display_battery(bat)
            display_processes(procs)

            # Alerts
            alerts = check_alerts(cpu, ram, disks, bat, cpu.get("temp"))
            if alerts:
                print(f"\n  *** ALERTS ***")
                for a in alerts:
                    print(f"  ! {a}")

            print(f"\n  Refreshing every {REFRESH_INTERVAL}s  |  Ctrl+C to stop")

            if duration_secs > 0 and (time.monotonic() - start) >= duration_secs:
                break

            time.sleep(REFRESH_INTERVAL)

    except KeyboardInterrupt:
        print("\n\n  Dashboard stopped.")


# ============================================================

# SNAPSHOT REPORT

# ============================================================


def snapshot_report():
    """Collect all readings once and print a full one-time report.

    Prints every section except the process table, followed by any active
    alerts or an all-clear message.
    """
    print("\n  Taking system snapshot...")

    cpu = get_cpu_info()
    ram = get_ram_info()
    disks = get_disk_info()
    net = get_network_info()
    bat = get_battery_info()
    sysinfo = get_system_info()

    clear_screen()
    display_header()

    # Render each section in display order.
    sections = (
        (display_system, sysinfo),
        (display_cpu, cpu),
        (display_ram, ram),
        (display_disk, disks),
        (display_network, net),
        (display_battery, bat),
    )
    for render, payload in sections:
        render(payload)

    alerts = check_alerts(cpu, ram, disks, bat, cpu.get("temp"))
    if not alerts:
        print("\n  All systems OK. No alerts.")
        return

    print("\n  *** ALERTS ***")
    for message in alerts:
        print(f"  ! {message}")



# ============================================================

# SAVE SNAPSHOT TO JSON

# ============================================================


def save_snapshot():
    """Collect a snapshot and append it to the JSON log at LOG_FILE.

    The log is a JSON array capped at the 200 most recent snapshots. An
    existing log that cannot be read, is not valid JSON, or does not hold
    a list is replaced by a fresh log containing only the new snapshot.

    Raises:
        OSError: If the log file cannot be written.
    """
    cpu     = get_cpu_info()
    ram     = get_ram_info()
    disks   = get_disk_info()
    net     = get_network_info()
    bat     = get_battery_info()
    sysinfo = get_system_info()

    snapshot = {
        "timestamp": datetime.now().strftime("%d-%m-%Y %H:%M:%S"),
        "system":    sysinfo,
        "cpu": {
            "percent": cpu["percent"],
            "freq":    cpu.get("freq"),
            "temp":    cpu.get("temp"),
            "cores":   cpu["count_logical"],
        },
        "ram": {
            "percent":   ram["percent"],
            "used_gb":   round(ram["used"] / 1e9, 2),
            "total_gb":  round(ram["total"] / 1e9, 2),
        },
        "disks": [
            {
                "mount":   d["mountpoint"],
                "percent": d["percent"],
                "free_gb": round(d["free"] / 1e9, 2),
            }
            for d in disks
        ],
        "battery": bat,
        "network": {
            "bytes_sent": net["bytes_sent"],
            "bytes_recv": net["bytes_recv"],
        },
    }

    # Load the existing log, tolerating an unreadable or corrupt file.
    log = []
    if Path(LOG_FILE).exists():
        try:
            with open(LOG_FILE, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and decode errors.
            loaded = None
        # Valid JSON that is not an array (e.g. a hand-edited object) would
        # otherwise fail on .append() below.
        if isinstance(loaded, list):
            log = loaded

    log.append(snapshot)
    log = log[-200:]   # keep last 200 snapshots

    with open(LOG_FILE, "w", encoding="utf-8") as f:
        json.dump(log, f, indent=2)

    print(f"\n  Snapshot saved to {LOG_FILE}  ({len(log)} total)")



# ============================================================

# MAIN MENU

# ============================================================


def print_menu():
    """Print the main menu: title, numbered options and separator rules."""
    rule = "-" * 48
    options = (
        "1. Live dashboard (auto-refresh)",
        "2. Snapshot report (one-time view)",
        "3. Top processes",
        "4. Disk usage only",
        "5. Network info",
        "6. Save snapshot to JSON log",
        "7. Alert thresholds info",
        "0. Exit",
    )

    print("\n" + rule)
    print("  SYSTEM HEALTH MONITOR")
    print(rule)
    for option in options:
        print("  " + option)
    print(rule)



def main():
    """Run the interactive menu loop until the user exits.

    Prints install instructions and returns immediately when psutil is not
    available. Ctrl+C or end-of-input at any prompt ends the program
    cleanly instead of surfacing a traceback. Ctrl+C inside the live
    dashboard is handled there and returns to the menu.
    """
    print("\n" + "="*55)
    print("     SYSTEM HEALTH MONITOR")
    print("="*55)

    if not PSUTIL_OK:
        print("\n  psutil is not installed!")
        print("  Install it with:  pip install psutil")
        return

    try:
        while True:
            print_menu()
            choice = input("  > ").strip()

            if choice == "1":
                live_dashboard()

            elif choice == "2":
                snapshot_report()
                input("\n  Press Enter to return to menu...")

            elif choice == "3":
                procs = get_top_processes(15)
                clear_screen()
                display_header()
                display_processes(procs)
                input("\n  Press Enter to return to menu...")

            elif choice == "4":
                disks = get_disk_info()
                clear_screen()
                display_header()
                display_disk(disks)
                input("\n  Press Enter to return to menu...")

            elif choice == "5":
                net = get_network_info()
                clear_screen()
                display_header()
                display_network(net)
                input("\n  Press Enter to return to menu...")

            elif choice == "6":
                save_snapshot()

            elif choice == "7":
                print(f"\n  Current alert thresholds:")
                print(f"  CPU usage   : >= {ALERT_CPU}%")
                print(f"  RAM usage   : >= {ALERT_RAM}%")
                print(f"  Disk usage  : >= {ALERT_DISK}%")
                print(f"  CPU temp    : >= {ALERT_TEMP}°C")
                print(f"  Battery low : <= 15%")
                print(f"\n  Edit ALERT_* constants at the top of the script to change.")

            elif choice == "0":
                print("\n  Goodbye!\n")
                break

            else:
                print("  Invalid choice.")

    except (EOFError, KeyboardInterrupt):
        # input() raises EOFError on closed stdin and KeyboardInterrupt on
        # Ctrl+C; treat both as a request to quit.
        print("\n\n  Goodbye!\n")



# ============================================================

# RUN

# ============================================================


# Start the interactive menu only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":

    main()

# No comments: