"""Log File Analyzer: parse, summarize, and search common log file formats."""

import re

import os

import json

from datetime import datetime

from pathlib import Path

from collections import defaultdict, Counter


# ============================================================

# CONFIGURATION

# ============================================================


# Output path used by save_report(); a bare filename, so it resolves against
# the current working directory.
REPORT_FILE = "log_analysis_report.json"


# Severity keywords searched for in a line when the matched pattern has no
# "level" group. parse_log() tests them in list order with a substring check
# and stops at the first hit, so "WARNING" is listed ahead of "WARN".

LOG_LEVELS = ["CRITICAL", "ERROR", "WARNING", "WARN", "INFO", "DEBUG", "TRACE", "FATAL"]


# Regular expressions keyed by format name. The named groups of whichever
# pattern matches become the keys of a parsed entry (via match.groupdict()).
# detect_format() scores the patterns in this dict order, so on a tie the
# earlier entry wins.

LOG_FORMATS = {

    "apache_access": r'(?P<ip>\S+) \S+ \S+ \[(?P<datetime>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+)[^"]*" (?P<status>\d{3}) (?P<size>\S+)',

    "apache_error":  r'\[(?P<datetime>[^\]]+)\] \[(?P<level>\w+)\] (?P<message>.*)',

    "nginx_access":  r'(?P<ip>\S+) - \S+ \[(?P<datetime>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+)[^"]*" (?P<status>\d{3}) (?P<size>\d+)',

    "python_log":    r'(?P<datetime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d+ (?P<level>\w+) (?P<logger>\S+) (?P<message>.*)',

    "syslog":        r'(?P<datetime>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) (?P<host>\S+) (?P<process>\S+): (?P<message>.*)',

    # Fallback: an ISO-like timestamp followed somewhere by a level keyword.
    "generic":       r'(?P<datetime>\d{4}[-/]\d{2}[-/]\d{2}[T ]\d{2}:\d{2}:\d{2}).*?(?P<level>CRITICAL|ERROR|WARNING|WARN|INFO|DEBUG|TRACE|FATAL).*?(?P<message>.*)',

}


# ============================================================

# HELPERS

# ============================================================


def draw_bar(value, max_val, width=25, fill="█", empty="░"):
    """Render a text bar showing ``value`` as a share of ``max_val``.

    Args:
        value: Quantity to plot; anything above ``max_val`` is drawn as a
            full bar.
        max_val: Quantity that corresponds to a completely filled bar.
        width: Total number of cells in the bar.
        fill: Character used for the filled portion.
        empty: Character used for the remaining portion.

    Returns:
        The bar as a string, or an empty string when ``max_val`` is zero.
    """
    if max_val == 0:
        return ""

    capped = min(value, max_val)
    n_fill = int((capped / max_val) * width)
    n_empty = width - n_fill
    return "".join((fill * n_fill, empty * n_empty))



def format_size(size_bytes):
    """Format a byte count as a human-readable string with one decimal.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached. Previously the fall-through case divided once more
    after GB but still printed "GB", so 1 TiB was reported as "1.0 GB".

    Args:
        size_bytes: Size in bytes (int or float).

    Returns:
        A string such as ``"512.0 B"``, ``"1.5 KB"`` or ``"2.0 TB"``.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    # Every unit except the last can hand over to a larger one.
    for unit in units[:-1]:
        if size_bytes < 1024:
            return f"{size_bytes:.1f} {unit}"
        size_bytes /= 1024
    # Whatever is left is expressed in the largest unit, however big it is.
    return f"{size_bytes:.1f} {units[-1]}"



def safe_read(filepath, encoding="utf-8"):
    """Read every line of a text file, never raising on failure.

    Undecodable bytes are replaced rather than rejected. If the file cannot
    be opened or read, the error is printed and an empty list is returned.

    Args:
        filepath: Path of the file to read.
        encoding: Text encoding passed to ``open``.

    Returns:
        The file's lines (line endings included), or ``[]`` on error.
    """
    content = []
    try:
        with open(filepath, "r", encoding=encoding, errors="replace") as handle:
            content = handle.readlines()
    except Exception as err:
        print(f"  Error reading file: {err}")
    return content



# ============================================================

# AUTO-DETECT LOG FORMAT

# ============================================================


def detect_format(lines):
    """Guess the log format by scoring each known pattern on a sample.

    Only the first 20 lines are examined. Every pattern in ``LOG_FORMATS``
    is scored by how many sample lines it matches; the highest score wins,
    and on a tie the pattern defined first in ``LOG_FORMATS`` is kept.

    Args:
        lines: List of raw log lines.

    Returns:
        A ``(format_name, matched_line_count)`` tuple, or ``("generic", 0)``
        when no pattern matches any sampled line.
    """
    head = lines[:20]

    tally = [
        (name, len([text for text in head if re.search(regex, text)]))
        for name, regex in LOG_FORMATS.items()
    ]

    # max() keeps the first of several equal scores, i.e. dict order.
    winner, hits = max(tally, key=lambda pair: pair[1])
    return (winner, hits) if hits > 0 else ("generic", 0)



# ============================================================

# CORE PARSER

# ============================================================


# Timestamp layouts tried in order, each paired with the number of leading
# characters of the captured text that the layout covers. The captured text
# can carry a suffix (e.g. the " -0700" zone in access logs), so it is cut to
# that width before parsing.
_TIMESTAMP_LAYOUTS = (
    ("%Y-%m-%d %H:%M:%S", 19),      # 2024-01-15 10:30:45
    ("%Y-%m-%dT%H:%M:%S", 19),      # 2024-01-15T10:30:45
    ("%Y/%m/%d %H:%M:%S", 19),      # 2024/01/15 10:30:45
    ("%Y/%m/%dT%H:%M:%S", 19),      # 2024/01/15T10:30:45
    ("%d/%b/%Y:%H:%M:%S", 20),      # 10/Oct/2000:13:55:36 (access logs)
    ("%b %d %H:%M:%S", 15),         # Jan 15 10:30:45 (syslog, no year)
    ("%a %b %d %H:%M:%S %Y", 24),   # Wed Oct 11 14:32:52 2000 (apache error)
)


def _parse_timestamp(dt_str):
    """Return a ``datetime`` parsed from the start of ``dt_str``, or ``None``."""
    if not dt_str:
        return None
    for layout, text_width in _TIMESTAMP_LAYOUTS:
        try:
            return datetime.strptime(dt_str[:text_width], layout)
        except ValueError:
            continue
    return None


def parse_log(filepath, fmt_name=None, keyword_filter=None,
              level_filter=None, max_lines=0):
    """Parse a log file into structured entries plus aggregate statistics.

    Progress information (file name, size, line count, format, parse rate)
    is printed to stdout as a side effect.

    The aggregate counters (levels, hours, IPs, status codes, paths and the
    message list) cover every line the pattern matched. ``level_filter`` and
    ``keyword_filter`` only restrict which lines end up in ``entries`` and
    ``keyword_hits``.

    Args:
        filepath: Path of the log file.
        fmt_name: Key of ``LOG_FORMATS`` to use. ``None``, ``""`` or
            ``"auto"`` triggers auto-detection; an unknown name falls back
            to the "generic" pattern.
        keyword_filter: Case-insensitive substring a line must contain to be
            kept in ``entries``; matching lines are also recorded in
            ``keyword_hits``.
        level_filter: Iterable of level names; only entries whose level is in
            it (case-insensitive) are kept in ``entries``.
        max_lines: When greater than zero, only the first ``max_lines`` lines
            are parsed.

    Returns:
        ``None`` if the file is empty or unreadable, otherwise a dict with the
        keys ``filepath``, ``filename``, ``total_lines``, ``parsed``,
        ``parse_errors``, ``format``, ``entries``, ``level_counts``,
        ``hour_counts``, ``ip_counts``, ``status_counts``, ``path_counts``,
        ``message_list`` and ``keyword_hits``.
    """
    lines = safe_read(filepath)
    if not lines:
        return None

    total_lines = len(lines)
    print(f"\n  File    : {Path(filepath).name}")
    print(f"  Size    : {format_size(Path(filepath).stat().st_size)}")
    print(f"  Lines   : {total_lines:,}")

    # Auto-detect format
    if not fmt_name or fmt_name == "auto":
        fmt_name, score = detect_format(lines)
        # detect_format samples at most 20 lines; report the real sample size
        # so short files do not show a misleading "/20".
        sampled = min(20, total_lines)
        print(f"  Format  : {fmt_name} (auto-detected, {score}/{sampled} lines matched)")
    else:
        print(f"  Format  : {fmt_name}")

    # Compile once instead of looking the pattern up for every line.
    line_regex = re.compile(LOG_FORMATS.get(fmt_name, LOG_FORMATS["generic"]),
                            re.IGNORECASE)

    # Limit lines if requested
    if max_lines > 0:
        lines = lines[:max_lines]
        print(f"  Parsing : first {max_lines:,} lines")

    # Normalise the filters once, outside the per-line loop.
    wanted_levels = {lvl.upper() for lvl in level_filter} if level_filter else None
    keyword = keyword_filter.lower() if keyword_filter else None

    entries = []
    level_counts = Counter()
    hour_counts = Counter()
    ip_counts = Counter()
    status_counts = Counter()
    path_counts = Counter()
    message_list = []
    parse_errors = 0
    keyword_hits = []

    web_counters = (("ip", ip_counts), ("status", status_counts), ("path", path_counts))

    for line_no, line in enumerate(lines, start=1):
        line = line.rstrip()
        if not line:
            continue

        match = line_regex.search(line)
        if not match:
            parse_errors += 1
            continue

        entry = match.groupdict()

        # Normalize level; fall back to the first known keyword found
        # anywhere in the line when the pattern has no "level" group.
        level = (entry.get("level") or "").upper()
        if not level:
            upper_line = line.upper()
            level = next((lvl for lvl in LOG_LEVELS if lvl in upper_line), "")
        entry["level"] = level or "UNKNOWN"
        level_counts[entry["level"]] += 1

        # Extract hour from datetime
        parsed_dt = _parse_timestamp(entry.get("datetime") or "")
        if parsed_dt is not None:
            hour_counts[parsed_dt.strftime("%H:00")] += 1
            entry["datetime_parsed"] = parsed_dt.strftime("%d-%m-%Y %H:%M:%S")

        # Track IPs, status codes, paths (only present for web log formats)
        for field, counter in web_counters:
            if entry.get(field):
                counter[entry[field]] += 1

        # Track messages; formats without a "message" group use the line start
        message_list.append(entry.get("message", line[:120]))

        # Apply filters (statistics above are intentionally unfiltered)
        if wanted_levels is not None and entry["level"] not in wanted_levels:
            continue
        if keyword is not None and keyword not in line.lower():
            continue

        entry["line_no"] = line_no
        entry["raw"] = line[:200]
        entries.append(entry)

        # Any line reaching this point already contains the keyword.
        if keyword is not None:
            keyword_hits.append({"line": line_no, "text": line[:200]})

    parse_rate = round((1 - parse_errors / max(len(lines), 1)) * 100, 1)
    print(f"  Parsed  : {len(entries):,} entries  ({parse_rate}% success rate)")

    return {
        "filepath":     str(filepath),
        "filename":     Path(filepath).name,
        "total_lines":  total_lines,
        "parsed":       len(entries),
        "parse_errors": parse_errors,
        "format":       fmt_name,
        "entries":      entries,
        "level_counts": dict(level_counts),
        "hour_counts":  dict(hour_counts),
        "ip_counts":    dict(ip_counts),
        "status_counts": dict(status_counts),
        "path_counts":  dict(path_counts),
        "message_list": message_list,
        "keyword_hits": keyword_hits,
    }



# ============================================================

# DISPLAY: LEVEL SUMMARY

# ============================================================


def display_level_summary(result):
    """Print a table of entry counts per log level with percentage bars.

    Known levels are listed from most to least severe. Any other level found
    in the data (for example "NOTICE" from Apache error logs) is listed after
    them, most frequent first, so the rows always add up to the TOTAL line.
    A short note about errors and warnings follows the table.

    Args:
        result: Dict returned by ``parse_log``; reads ``level_counts`` and
            ``filename``.
    """
    counts = result["level_counts"]
    if not counts:
        print("\n  No log level data found.")
        return

    total = sum(counts.values())
    max_val = max(counts.values())

    print("\n" + "="*55)
    print(f"  LOG LEVEL SUMMARY  —  {result['filename']}")
    print("="*55)
    print(f"  {'LEVEL':<12} {'COUNT':>8}  {'%':>6}  BAR")
    print("  " + "-"*51)

    order = ["FATAL", "CRITICAL", "ERROR", "WARNING", "WARN",
             "INFO", "DEBUG", "TRACE", "UNKNOWN"]
    # Levels outside the fixed order used to be counted in the total but
    # never printed; append them so no row is silently dropped.
    extras = sorted((lvl for lvl in counts if lvl not in order),
                    key=lambda lvl: (-counts[lvl], lvl))

    for level in order + extras:
        count = counts.get(level, 0)
        if count == 0:
            continue
        pct = count / total * 100
        bar = draw_bar(count, max_val)
        print(f"  {level:<12} {count:>8,}  {pct:>5.1f}%  {bar}")

    print("  " + "-"*51)
    print(f"  {'TOTAL':<12} {total:>8,}")
    print("="*55)

    # Highlight issues
    errors = counts.get("ERROR", 0) + counts.get("CRITICAL", 0) + counts.get("FATAL", 0)
    warnings = counts.get("WARNING", 0) + counts.get("WARN", 0)

    if errors > 0:
        print(f"\n  *** {errors:,} ERROR/CRITICAL/FATAL entries found!")
    if warnings > 0:
        print(f"  *** {warnings:,} WARNING entries found.")
    if errors == 0 and warnings == 0:
        print("\n  All clear — no errors or warnings detected.")



# ============================================================

# DISPLAY: HOURLY ACTIVITY

# ============================================================


def display_hourly_activity(result):
    """Print a bar chart of entries per hour plus the peak and quiet hours.

    Args:
        result: Dict returned by ``parse_log``; reads ``hour_counts``, a
            mapping of ``"HH:00"`` labels to entry counts.
    """
    hourly = result["hour_counts"]
    if not hourly:
        print("\n  No timestamp data found for hourly analysis.")
        return

    busiest = max(hourly.values())
    rule = "=" * 55

    print("\n" + rule)
    print("  HOURLY ACTIVITY")
    print(rule)

    # Labels are unique dict keys, so sorting the items orders by label.
    for label, hits in sorted(hourly.items()):
        print(f"  {label}  {hits:>6,}  {draw_bar(hits, busiest, width=30)}")

    peak_hour = max(hourly, key=hourly.get)
    quiet_hour = min(hourly, key=hourly.get)

    print(rule)
    print(f"  Peak hour  : {peak_hour}  ({hourly[peak_hour]:,} entries)")
    print(f"  Quiet hour : {quiet_hour}  ({hourly[quiet_hour]:,} entries)")



# ============================================================

# DISPLAY: TOP IPs / PATHS / STATUS CODES

# ============================================================


def display_web_stats(result, top_n=10):
    """Print web-log statistics: top client IPs, status codes and top paths.

    Sections with no data are skipped; if all three are empty a single
    notice is printed instead.

    Args:
        result: Dict returned by ``parse_log``; reads ``ip_counts``,
            ``status_counts`` and ``path_counts``.
        top_n: Number of IPs and paths to list. Values below zero are treated
            as zero; previously ``top_n <= 0`` could raise ``IndexError``
            when indexing the empty "top" list.
    """
    ip_counts = result.get("ip_counts", {})
    status_counts = result.get("status_counts", {})
    path_counts = result.get("path_counts", {})

    if not ip_counts and not status_counts and not path_counts:
        print("\n  No web log data (IPs / status codes / paths) found.")
        return

    top_n = max(top_n, 0)

    print("\n" + "="*55)
    print("  WEB LOG STATISTICS")
    print("="*55)

    if ip_counts:
        top_ips = sorted(ip_counts.items(), key=lambda x: x[1], reverse=True)[:top_n]
        # Guard the lookup: the slice is empty when top_n is 0.
        max_val = top_ips[0][1] if top_ips else 0
        print(f"\n  TOP {top_n} IP ADDRESSES:")
        print(f"  {'IP':<18} {'REQUESTS':>10}  BAR")
        print("  " + "-"*45)
        for ip, count in top_ips:
            bar = draw_bar(count, max_val, width=15)
            print(f"  {ip:<18} {count:>10,}  {bar}")

    if status_counts:
        print(f"\n  HTTP STATUS CODES:")
        print(f"  {'CODE':<8} {'COUNT':>10}  MEANING")
        print("  " + "-"*40)
        meanings = {
            "200": "OK", "201": "Created", "301": "Moved Permanently",
            "302": "Found", "304": "Not Modified", "400": "Bad Request",
            "401": "Unauthorized", "403": "Forbidden", "404": "Not Found",
            "500": "Internal Server Error", "502": "Bad Gateway",
            "503": "Service Unavailable"
        }
        for code, count in sorted(status_counts.items()):
            meaning = meanings.get(code, "")
            print(f"  {code:<8} {count:>10,}  {meaning}")

    if path_counts:
        top_paths = sorted(path_counts.items(), key=lambda x: x[1], reverse=True)[:top_n]
        max_val = top_paths[0][1] if top_paths else 0
        print(f"\n  TOP {top_n} REQUESTED PATHS:")
        print(f"  {'PATH':<35} {'HITS':>8}  BAR")
        print("  " + "-"*55)
        for path, count in top_paths:
            # Keep the column aligned: long paths are cut and marked with "..".
            display_path = path[:33] + ".." if len(path) > 35 else path
            bar = draw_bar(count, max_val, width=10)
            print(f"  {display_path:<35} {count:>8,}  {bar}")

    print("="*55)



# ============================================================

# DISPLAY: RECENT ERRORS

# ============================================================


def display_errors(result, top_n=20):
    """Print the most recent ERROR / CRITICAL / FATAL entries.

    Args:
        result: Dict returned by ``parse_log``; reads ``entries``.
        top_n: Maximum number of entries to show, taken from the end of the
            list. Zero or a negative value shows none; previously ``0``
            produced the slice ``errors[-0:]`` and listed every entry.
    """
    entries = result["entries"]
    errors = [e for e in entries
              if e.get("level") in ("ERROR", "CRITICAL", "FATAL")]

    if not errors:
        print("\n  No ERROR / CRITICAL / FATAL entries found.")
        return

    shown = errors[-top_n:] if top_n > 0 else []

    print("\n" + "="*65)
    # Report how many are actually listed, which may be fewer than requested.
    print(f"  ERRORS & CRITICAL ENTRIES  ({len(errors):,} total, showing last {len(shown)})")
    print("="*65)

    for e in shown:
        # Prefer the normalised timestamp; otherwise show the raw one, trimmed.
        dt = e.get("datetime_parsed") or (e.get("datetime") or "")[:19]
        msg = e.get("message") or e.get("raw", "")
        msg = msg[:80]
        print(f"\n  [{e['level']}]  Line {e.get('line_no','?')}  {dt}")
        print(f"  {msg}")

    print("="*65)



# ============================================================

# DISPLAY: KEYWORD SEARCH RESULTS

# ============================================================


def display_keyword_hits(result, keyword):
    """Print the lines that matched a keyword search.

    At most 30 matches are listed, each trimmed to 100 characters; a trailer
    states how many further matches were omitted.

    Args:
        result: Dict returned by ``parse_log``; reads ``keyword_hits``, a
            list of ``{"line": int, "text": str}`` dicts.
        keyword: The search term, used only in the printed headings.
    """
    matches = result.get("keyword_hits", [])
    if not matches:
        print(f"\n  No matches found for: '{keyword}'")
        return

    limit = 30
    rule = "=" * 65

    print("\n" + rule)
    print(f"  KEYWORD SEARCH: '{keyword}'  ({len(matches):,} matches)")
    print(rule)

    for position, match in enumerate(matches):
        if position >= limit:
            break
        print(f"\n  Line {match['line']:>6}:  {match['text'][:100]}")

    overflow = len(matches) - limit
    if overflow > 0:
        print(f"\n  ... and {overflow} more matches.")
    print(rule)



# ============================================================

# DISPLAY: COMMON PATTERNS / REPEATED MESSAGES

# ============================================================


def display_patterns(result, top_n=15):
    """Print the most frequently repeated message patterns.

    Messages are grouped after replacing IPv4-like addresses, standalone
    numbers and double-quoted values with placeholders and trimming to 80
    characters, so lines that differ only in those details count together.

    Args:
        result: Dict returned by ``parse_log``; reads ``message_list``.
        top_n: Number of patterns to list.
    """
    raw_messages = result.get("message_list", [])
    if not raw_messages:
        print("\n  No messages extracted.")
        return

    # Applied in this order: addresses first, so their octets are not
    # replaced one by one by the generic number rule.
    substitutions = (
        (r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', '<IP>'),
        (r'\b\d+\b', '<N>'),
        (r'"[^"]*"', '"<VAL>"'),
    )

    def normalize(msg):
        for regex, placeholder in substitutions:
            msg = re.sub(regex, placeholder, msg)
        return msg[:80].strip()

    tally = Counter()
    for text in raw_messages:
        if text.strip():
            tally[normalize(text)] += 1

    ranked = tally.most_common(top_n)
    max_val = ranked[0][1] if ranked else 1
    rule = "=" * 65

    print("\n" + rule)
    print(f"  MOST REPEATED LOG PATTERNS  (Top {top_n})")
    print(rule)
    print(f"  {'COUNT':>7}  MESSAGE PATTERN")
    print("  " + "-"*60)

    for pattern, count in ranked:
        print(f"  {count:>7,}  {draw_bar(count, max_val, width=12)}  {pattern}")

    print(rule)



# ============================================================

# FULL REPORT

# ============================================================


def full_report(result):
    """Print every report section for a parsed log, in a fixed order.

    Args:
        result: Dict returned by ``parse_log``.
    """
    sections = (
        display_level_summary,
        display_hourly_activity,
        display_web_stats,
        display_errors,
        display_patterns,
    )
    for render in sections:
        render(result)



# ============================================================

# SAVE REPORT TO JSON

# ============================================================


def save_report(result):
    """Write a JSON summary of a parsed log to ``REPORT_FILE``.

    The report holds the headline figures and counters; IPs and paths are
    limited to the 20 most frequent. Individual entries are not included.
    An existing report file is overwritten.

    Args:
        result: Dict returned by ``parse_log``.
    """
    def top_twenty(tally):
        ranked = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
        return dict(ranked[:20])

    report = {}
    report["filename"] = result["filename"]
    report["analyzed_at"] = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
    report["total_lines"] = result["total_lines"]
    report["parsed"] = result["parsed"]
    report["format"] = result["format"]
    report["level_counts"] = result["level_counts"]
    report["top_ips"] = top_twenty(result["ip_counts"])
    report["status_codes"] = result["status_counts"]
    report["top_paths"] = top_twenty(result["path_counts"])
    report["hourly"] = result["hour_counts"]

    with open(REPORT_FILE, "w") as out_file:
        json.dump(report, out_file, indent=2)

    print(f"\n  Report saved: {REPORT_FILE}")



# ============================================================

# BATCH ANALYZE (FOLDER)

# ============================================================


def batch_analyze(folder):
    """Parse every log-like file in a folder and print an error/warning table.

    Files ending in ``.log``, ``.txt`` or ``.out`` directly inside the folder
    are parsed with auto-detected formats. Files that yield no result are
    left out of the table.

    Args:
        folder: Path of the directory to scan.
    """
    folder = Path(folder)
    if not folder.is_dir():
        print("  Invalid folder.")
        return

    log_files = [
        found
        for glob_pattern in ("*.log", "*.txt", "*.out")
        for found in folder.glob(glob_pattern)
    ]
    if not log_files:
        print(f"  No .log / .txt / .out files in {folder}")
        return

    print(f"\n  Found {len(log_files)} log file(s).\n")

    summary = []
    for log_path in log_files:
        result = parse_log(log_path)
        if not result:
            continue
        levels = result["level_counts"]
        summary.append({
            "file":     log_path.name,
            "lines":    result["total_lines"],
            "errors":   sum(levels.get(name, 0)
                            for name in ("ERROR", "CRITICAL", "FATAL")),
            "warnings": sum(levels.get(name, 0)
                            for name in ("WARNING", "WARN")),
        })

    heavy_rule = "=" * 60
    light_rule = "  " + "-" * 56

    print("\n" + heavy_rule)
    print(f"  BATCH SUMMARY  ({len(summary)} files)")
    print(heavy_rule)
    print(f"  {'FILE':<30} {'LINES':>8} {'ERRORS':>8} {'WARNINGS':>10}")
    print(light_rule)

    total_errors = 0
    total_warnings = 0
    for row in summary:
        print(f"  {row['file']:<30} {row['lines']:>8,} {row['errors']:>8,} "
              f"{row['warnings']:>10,}")
        total_errors += row["errors"]
        total_warnings += row["warnings"]

    print(light_rule)
    print(f"  {'TOTAL':<30} {'':>8} {total_errors:>8,} {total_warnings:>10,}")
    print(heavy_rule)



# ============================================================

# MAIN MENU

# ============================================================


def print_menu():
    """Print the numbered main menu to stdout."""
    rule = "-" * 48
    options = (
        "  1. Analyze log file (full report)",
        "  2. Log level summary",
        "  3. Hourly activity chart",
        "  4. Web log stats (IPs / status / paths)",
        "  5. Show errors & critical entries",
        "  6. Search keyword in log",
        "  7. Repeated message patterns",
        "  8. Batch analyze a folder",
        "  9. Save report to JSON",
        "  0. Exit",
    )

    print("\n" + rule)
    print("  LOG FILE ANALYZER")
    print(rule)
    for option in options:
        print(option)
    print(rule)



def main():
    """Run the interactive menu loop until the user chooses to exit.

    The most recent parse is cached and reused by the single-view options
    (2-7, 9). Option 1, or any of those options while nothing is cached,
    prompts for a file and parses it first.

    Keyword search (option 6) parses the file again with the keyword applied
    and shows that result without replacing the cached parse, so other views
    keep showing the complete data. The keyword is asked for exactly once,
    and the search honours the line limit chosen when the file was loaded.
    """
    print("\n" + "="*55)
    print("     LOG FILE ANALYZER")
    print("="*55)
    print("\n  Supports: Apache, Nginx, Python, Syslog, Generic logs")
    print("  Auto-detects format from first 20 lines.\n")

    last_result = None
    last_max_lines = 0  # line limit of the cached parse, reused by searches

    while True:
        print_menu()
        choice = input("  > ").strip()

        if choice in ("1", "2", "3", "4", "5", "6", "7", "9"):
            if not last_result or choice == "1":
                path = input("\n  Log file path: ").strip()
                if not path or not Path(path).exists():
                    print("  File not found.")
                    continue

                fmt_choice = input(
                    "  Format (auto/apache_access/apache_error/"
                    "nginx_access/python_log/syslog/generic): "
                ).strip() or "auto"

                ml = input("  Max lines to parse (0=all): ").strip()
                # isdecimal() only accepts characters int() can convert;
                # isdigit() also accepts e.g. superscript digits.
                max_lines = int(ml) if ml.isdecimal() else 0

                # The cached parse is always unfiltered; filters are applied
                # by the views that need them.
                last_result = parse_log(path, fmt_choice, None, None, max_lines)
                if not last_result:
                    continue
                last_max_lines = max_lines

        if choice == "1":
            full_report(last_result)

        elif choice == "2":
            display_level_summary(last_result)

        elif choice == "3":
            display_hourly_activity(last_result)

        elif choice == "4":
            display_web_stats(last_result)

        elif choice == "5":
            n = input("  Show last N errors (default 20): ").strip()
            n = int(n) if n.isdecimal() else 20
            display_errors(last_result, n)

        elif choice == "6":
            kw = input("  Keyword to search: ").strip()
            if kw:
                # Parse into a separate result so the keyword filter does not
                # leak into the cached data used by the other menu options.
                search_result = parse_log(last_result["filepath"],
                                          last_result["format"],
                                          kw, None, last_max_lines)
                if search_result:
                    display_keyword_hits(search_result, kw)

        elif choice == "7":
            n = input("  Top N patterns (default 15): ").strip()
            n = int(n) if n.isdecimal() else 15
            display_patterns(last_result, n)

        elif choice == "8":
            folder = input("\n  Folder path: ").strip()
            batch_analyze(folder)

        elif choice == "9":
            if last_result:
                save_report(last_result)
            else:
                print("  No analysis data. Parse a file first.")

        elif choice == "0":
            print("\n  Goodbye!\n")
            break

        else:
            print("  Invalid choice.")


# ============================================================

# RUN

# ============================================================


# Start the interactive menu only when this file is run as a script, so that
# importing it as a module has no side effects.
if __name__ == "__main__":

    main()

No comments: