import re
import os
import json
from datetime import datetime
from pathlib import Path
from collections import defaultdict, Counter
# ============================================================
# CONFIGURATION
# ============================================================
# File name that save_report() writes the JSON summary to. It is a relative
# path, so the file is created in the current working directory.
REPORT_FILE = "log_analysis_report.json"
# Level names that parse_log() searches for (in this order, first hit wins)
# when the chosen format's regex captured no <level> group. "WARNING" comes
# before "WARN" so the longer name is the one reported.
LOG_LEVELS = ["CRITICAL", "ERROR", "WARNING", "WARN", "INFO", "DEBUG", "TRACE", "FATAL"]
# Built-in format patterns, keyed by the format name the user can select.
# Each value is a regex whose named groups become the keys of a parsed entry.
# detect_format() tries them in this order and keeps the first best scorer,
# so earlier entries win ties.
LOG_FORMATS = {
# Access-log line: ip, two skipped fields, [datetime], "METHOD path ...", status, size.
"apache_access": r'(?P<ip>\S+) \S+ \S+ \[(?P<datetime>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+)[^"]*" (?P<status>\d{3}) (?P<size>\S+)',
# Error-log line: [datetime] [level] message.
"apache_error": r'\[(?P<datetime>[^\]]+)\] \[(?P<level>\w+)\] (?P<message>.*)',
# Like apache_access, but the second field must be a literal "-" and size must be digits.
"nginx_access": r'(?P<ip>\S+) - \S+ \[(?P<datetime>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+)[^"]*" (?P<status>\d{3}) (?P<size>\d+)',
# "YYYY-MM-DD HH:MM:SS,<digits> LEVEL logger message".
"python_log": r'(?P<datetime>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d+ (?P<level>\w+) (?P<logger>\S+) (?P<message>.*)',
# "Mon D HH:MM:SS host process: message" (no year and no level in the line).
"syslog": r'(?P<datetime>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) (?P<host>\S+) (?P<process>\S+): (?P<message>.*)',
# Fallback: a YYYY-MM-DD or YYYY/MM/DD timestamp (space or "T" before the time),
# later followed by one of the known level keywords; the rest is the message.
"generic": r'(?P<datetime>\d{4}[-/]\d{2}[-/]\d{2}[T ]\d{2}:\d{2}:\d{2}).*?(?P<level>CRITICAL|ERROR|WARNING|WARN|INFO|DEBUG|TRACE|FATAL).*?(?P<message>.*)',
}
# ============================================================
# HELPERS
# ============================================================
def draw_bar(value, max_val, width=25, fill="█", empty="░"):
    """Render *value* as a fixed-width text bar scaled against *max_val*.

    Args:
        value: Quantity to plot. Values below 0 are drawn as an empty bar and
            values above *max_val* as a full bar.
        max_val: Quantity that corresponds to a completely filled bar.
        width: Total number of characters in the bar.
        fill: Character used for the filled part.
        empty: Character used for the unfilled part.

    Returns:
        A string of exactly *width* characters, or "" when *max_val* is not
        positive (there is no scale to draw against).
    """
    if max_val <= 0:
        return ""
    # Clamp on both sides: an unclamped negative value would make `filled`
    # negative and the padding longer than `width`.
    clamped = min(max(value, 0), max_val)
    filled = int((clamped / max_val) * width)
    return fill * filled + empty * (width - filled)
def format_size(size_bytes):
    """Format a byte count as a human-readable string with one decimal place.

    Uses binary multiples (1 KB = 1024 B) and the units B, KB, MB, GB, TB.
    Anything of 1024 GB or more is reported in TB.

    Args:
        size_bytes: Size in bytes (int or float).

    Returns:
        A string such as "512.0 B", "1.5 KB" or "2.0 TB".
    """
    units = ("B", "KB", "MB", "GB", "TB")
    size = size_bytes
    # Every unit except the last has an upper bound; stop at the first unit
    # the value fits into.
    for unit in units[:-1]:
        if size < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    # After the loop the value has been divided once per bounded unit, so it
    # is expressed in the largest unit — label it as such.
    return f"{size:.1f} {units[-1]}"
def safe_read(filepath, encoding="utf-8"):
    """Load a text file into a list of lines without ever raising.

    Bytes that cannot be decoded with *encoding* are substituted rather than
    rejected (``errors="replace"``). Line endings are kept on each line.

    Args:
        filepath: Path of the file to read.
        encoding: Text encoding used to decode the file.

    Returns:
        The file's lines, or an empty list if anything goes wrong while
        opening or reading (the error is printed).
    """
    collected = []
    try:
        with open(filepath, mode="r", encoding=encoding, errors="replace") as handle:
            collected = list(handle)
    except Exception as err:
        print(f" Error reading file: {err}")
        return []
    return collected
# ============================================================
# AUTO-DETECT LOG FORMAT
# ============================================================
def detect_format(lines):
    """Guess which built-in format a log uses from its first 20 lines.

    Every pattern in LOG_FORMATS is searched for in each sampled line; the
    format with the most matching lines wins (the earliest one on a tie).

    Args:
        lines: The log's lines as a list of strings.

    Returns:
        A ``(format_name, matched_line_count)`` tuple, or ``("generic", 0)``
        when no pattern matched any sampled line.
    """
    head = lines[:20]
    hit_counts = {
        name: len([text for text in head if re.search(regex, text)])
        for name, regex in LOG_FORMATS.items()
    }
    winner = max(hit_counts, key=hit_counts.get)
    top_score = hit_counts[winner]
    if top_score <= 0:
        return "generic", 0
    return winner, top_score
# ============================================================
# CORE PARSER
# ============================================================
# Timestamp layouts tried against the leading characters of a captured
# datetime: (strptime format, number of characters such a value occupies).
# The length is that of the *formatted value*, not of the format string.
_PREFIX_TIMESTAMP_LAYOUTS = (
    ("%Y-%m-%d %H:%M:%S", 19),
    ("%Y-%m-%dT%H:%M:%S", 19),
    ("%Y/%m/%d %H:%M:%S", 19),
    ("%Y/%m/%dT%H:%M:%S", 19),
    ("%d/%b/%Y:%H:%M:%S", 20),
)
# Layouts that have to match the whole captured value (Apache error-log
# style, with and without fractional seconds).
_WHOLE_TIMESTAMP_LAYOUTS = (
    "%a %b %d %H:%M:%S %Y",
    "%a %b %d %H:%M:%S.%f %Y",
)


def _parse_timestamp(dt_str):
    """Return a datetime for the timestamp captured in *dt_str*, or None."""
    # Collapse whitespace runs so a padded single-digit day ("Jan  5") parses.
    text = " ".join(dt_str.split())
    if not text:
        return None
    candidates = [(text[:length], layout)
                  for layout, length in _PREFIX_TIMESTAMP_LAYOUTS]
    candidates.extend((text, layout) for layout in _WHOLE_TIMESTAMP_LAYOUTS)
    # Syslog stamps carry no year. Supplying 1900 explicitly matches
    # strptime's own default and avoids parsing a day-of-month without a year.
    candidates.append(("1900 " + text, "%Y %b %d %H:%M:%S"))
    for value, layout in candidates:
        try:
            return datetime.strptime(value, layout)
        except ValueError:
            continue
    return None


def parse_log(filepath, fmt_name=None, keyword_filter=None,
              level_filter=None, max_lines=0):
    """Parse a log file and return its entries together with statistics.

    Args:
        filepath: Path of the log file.
        fmt_name: Key of LOG_FORMATS to parse with. ``None``, "" or "auto"
            auto-detects the format; an unknown name falls back to the
            "generic" pattern.
        keyword_filter: If given, only lines containing this text
            (case-insensitive) are kept in ``entries`` / ``keyword_hits``.
        level_filter: Level name or iterable of level names; if given, only
            entries with one of these levels are kept in ``entries``.
        max_lines: Parse only the first *max_lines* lines when greater than 0.

    Returns:
        ``None`` if the file is empty or unreadable, otherwise a dict with:
        ``filepath``, ``filename``, ``total_lines``, ``parsed`` (number of
        entries kept after filtering), ``parse_errors`` (non-blank lines the
        pattern did not match), ``format``, ``entries``, ``level_counts``,
        ``hour_counts``, ``ip_counts``, ``status_counts``, ``path_counts``,
        ``message_list`` and ``keyword_hits``. The counters and
        ``message_list`` cover every matched line, before filters are applied.
    """
    lines = safe_read(filepath)
    if not lines:
        return None
    total_lines = len(lines)
    print(f"\n File : {Path(filepath).name}")
    print(f" Size : {format_size(Path(filepath).stat().st_size)}")
    print(f" Lines : {total_lines:,}")
    # Auto-detect format
    if not fmt_name or fmt_name == "auto":
        fmt_name, score = detect_format(lines)
        # Detection samples at most 20 lines; report the real sample size.
        sampled = min(20, total_lines)
        print(f" Format : {fmt_name} (auto-detected, {score}/{sampled} lines matched)")
    else:
        print(f" Format : {fmt_name}")
    # Compile once instead of looking the pattern up again for every line.
    pattern = re.compile(LOG_FORMATS.get(fmt_name, LOG_FORMATS["generic"]),
                         re.IGNORECASE)
    # Limit lines if requested
    if max_lines > 0:
        lines = lines[:max_lines]
        print(f" Parsing : first {max_lines:,} lines")

    # Normalise the filters once, outside the loop.
    if isinstance(level_filter, str):
        level_filter = [level_filter]
    wanted_levels = {lvl.upper() for lvl in level_filter} if level_filter else None
    keyword = keyword_filter.lower() if keyword_filter else None

    entries = []
    level_counts = Counter()
    hour_counts = Counter()
    ip_counts = Counter()
    status_counts = Counter()
    path_counts = Counter()
    message_list = []
    keyword_hits = []
    parse_errors = 0
    non_blank = 0
    for line_no, raw_line in enumerate(lines, start=1):
        line = raw_line.rstrip()
        if not line:
            continue
        non_blank += 1
        match = pattern.search(line)
        if match is None:
            parse_errors += 1
            continue
        entry = match.groupdict()

        # Normalize level; formats without a <level> group fall back to the
        # first known level name found anywhere in the line.
        level = (entry.get("level") or "").upper()
        if not level:
            upper_line = line.upper()
            level = next((lvl for lvl in LOG_LEVELS if lvl in upper_line), "")
        entry["level"] = level or "UNKNOWN"
        level_counts[entry["level"]] += 1

        # Extract hour from datetime
        dt = _parse_timestamp(entry.get("datetime") or "")
        if dt is not None:
            hour_counts[dt.strftime("%H:00")] += 1
            entry["datetime_parsed"] = dt.strftime("%d-%m-%Y %H:%M:%S")

        # Track IPs, status codes, paths (only present for web logs)
        for field, counter in (("ip", ip_counts),
                               ("status", status_counts),
                               ("path", path_counts)):
            if entry.get(field):
                counter[entry[field]] += 1

        # Track messages
        message_list.append(entry.get("message", line[:120]))

        # Apply filters
        if wanted_levels is not None and entry["level"] not in wanted_levels:
            continue
        if keyword is not None and keyword not in line.lower():
            continue
        entry["line_no"] = line_no
        entry["raw"] = line[:200]
        entries.append(entry)
        # Any entry that got this far with a keyword active contains it.
        if keyword is not None:
            keyword_hits.append({"line": line_no, "text": line[:200]})

    # Blank lines are skipped, so they must not count as parse successes.
    parse_rate = round((1 - parse_errors / max(non_blank, 1)) * 100, 1)
    print(f" Parsed : {len(entries):,} entries ({parse_rate}% success rate)")
    return {
        "filepath": str(filepath),
        "filename": Path(filepath).name,
        "total_lines": total_lines,
        "parsed": len(entries),
        "parse_errors": parse_errors,
        "format": fmt_name,
        "entries": entries,
        "level_counts": dict(level_counts),
        "hour_counts": dict(hour_counts),
        "ip_counts": dict(ip_counts),
        "status_counts": dict(status_counts),
        "path_counts": dict(path_counts),
        "message_list": message_list,
        "keyword_hits": keyword_hits,
    }
# ============================================================
# DISPLAY: LEVEL SUMMARY
# ============================================================
def display_level_summary(result):
    """Print a table of entry counts per log level, with a bar chart.

    Known levels are listed from most to least severe; any other level name
    present in the data (for example NOTICE) follows in alphabetical order,
    so the rows always add up to the TOTAL line. A short verdict about
    errors and warnings is printed after the table.

    Args:
        result: Dict returned by parse_log(); reads "level_counts" and
            "filename".
    """
    counts = result["level_counts"]
    if not counts:
        print("\n No log level data found.")
        return
    total = sum(counts.values())
    max_val = max(counts.values())
    print("\n" + "="*55)
    print(f" LOG LEVEL SUMMARY — {result['filename']}")
    print("="*55)
    print(f" {'LEVEL':<12} {'COUNT':>8} {'%':>6} BAR")
    print(" " + "-"*51)
    order = ["FATAL", "CRITICAL", "ERROR", "WARNING", "WARN",
             "INFO", "DEBUG", "TRACE", "UNKNOWN"]
    # Levels captured by a format regex are free-form words; show the ones
    # that are not in the canonical list instead of silently dropping them.
    extras = sorted(level for level in counts if level not in order)
    for level in order + extras:
        count = counts.get(level, 0)
        if count == 0:
            continue
        pct = count / total * 100
        bar = draw_bar(count, max_val)
        print(f" {level:<12} {count:>8,} {pct:>5.1f}% {bar}")
    print(" " + "-"*51)
    print(f" {'TOTAL':<12} {total:>8,}")
    print("="*55)
    # Highlight issues
    errors = counts.get("ERROR", 0) + counts.get("CRITICAL", 0) + counts.get("FATAL", 0)
    warnings = counts.get("WARNING", 0) + counts.get("WARN", 0)
    if errors > 0:
        print(f"\n *** {errors:,} ERROR/CRITICAL/FATAL entries found!")
    if warnings > 0:
        print(f" *** {warnings:,} WARNING entries found.")
    if errors == 0 and warnings == 0:
        print("\n All clear — no errors or warnings detected.")
# ============================================================
# DISPLAY: HOURLY ACTIVITY
# ============================================================
def display_hourly_activity(result):
    """Print one bar per hour of the day, then the busiest and quietest hour.

    Args:
        result: Dict returned by parse_log(); reads "hour_counts", which maps
            "HH:00" labels to entry counts.
    """
    per_hour = result["hour_counts"]
    if not per_hour:
        print("\n No timestamp data found for hourly analysis.")
        return
    rule = "="*55
    busiest_count = max(per_hour.values())
    print("\n" + rule)
    print(" HOURLY ACTIVITY")
    print(rule)
    # Labels are zero-padded, so sorting them as text is chronological.
    for label in sorted(per_hour):
        hits = per_hour[label]
        print(f" {label} {hits:>6,} {draw_bar(hits, busiest_count, width=30)}")
    peak = max(per_hour, key=lambda label: per_hour[label])
    quiet = min(per_hour, key=lambda label: per_hour[label])
    print(rule)
    print(f" Peak hour : {peak} ({per_hour[peak]:,} entries)")
    print(f" Quiet hour : {quiet} ({per_hour[quiet]:,} entries)")
# ============================================================
# DISPLAY: TOP IPs / PATHS / STATUS CODES
# ============================================================
def display_web_stats(result, top_n=10):
    """Print web-log statistics: busiest IPs, status codes and top paths.

    Each of the three sections is printed only when its data is present.

    Args:
        result: Dict returned by parse_log(); reads "ip_counts",
            "status_counts" and "path_counts" (each optional).
        top_n: How many IPs / paths to list.
    """
    by_ip = result.get("ip_counts", {})
    by_status = result.get("status_counts", {})
    by_path = result.get("path_counts", {})
    if not (by_ip or by_status or by_path):
        print("\n No web log data (IPs / status codes / paths) found.")
        return

    def busiest(tally):
        # Highest count first; equal counts keep their original order.
        return sorted(tally.items(), key=lambda pair: -pair[1])[:top_n]

    print("\n" + "="*55)
    print(" WEB LOG STATISTICS")
    print("="*55)

    if by_ip:
        ranked_ips = busiest(by_ip)
        scale = ranked_ips[0][1]
        print(f"\n TOP {top_n} IP ADDRESSES:")
        print(f" {'IP':<18} {'REQUESTS':>10} BAR")
        print(" " + "-"*45)
        for address, hits in ranked_ips:
            print(f" {address:<18} {hits:>10,} {draw_bar(hits, scale, width=15)}")

    if by_status:
        meanings = {
            "200": "OK",
            "201": "Created",
            "301": "Moved Permanently",
            "302": "Found",
            "304": "Not Modified",
            "400": "Bad Request",
            "401": "Unauthorized",
            "403": "Forbidden",
            "404": "Not Found",
            "500": "Internal Server Error",
            "502": "Bad Gateway",
            "503": "Service Unavailable",
        }
        print(f"\n HTTP STATUS CODES:")
        print(f" {'CODE':<8} {'COUNT':>10} MEANING")
        print(" " + "-"*40)
        for code, hits in sorted(by_status.items()):
            label = meanings.get(code, "")
            print(f" {code:<8} {hits:>10,} {label}")

    if by_path:
        ranked_paths = busiest(by_path)
        scale = ranked_paths[0][1]
        print(f"\n TOP {top_n} REQUESTED PATHS:")
        print(f" {'PATH':<35} {'HITS':>8} BAR")
        print(" " + "-"*55)
        for url_path, hits in ranked_paths:
            # Shorten long paths so the table columns stay aligned.
            if len(url_path) > 35:
                shown = url_path[:33] + ".."
            else:
                shown = url_path
            print(f" {shown:<35} {hits:>8,} {draw_bar(hits, scale, width=10)}")

    print("="*55)
# ============================================================
# DISPLAY: RECENT ERRORS
# ============================================================
def display_errors(result, top_n=20):
    """Print the most recent ERROR / CRITICAL / FATAL entries.

    Args:
        result: Dict returned by parse_log(); reads "entries".
        top_n: Maximum number of entries to show, taken from the end of the
            log. 0 or a negative number shows only the header.
    """
    entries = result["entries"]
    errors = [e for e in entries
              if e.get("level") in ("ERROR", "CRITICAL", "FATAL")]
    if not errors:
        print("\n No ERROR / CRITICAL / FATAL entries found.")
        return
    # errors[-0:] is the whole list, so zero must be handled explicitly.
    shown = errors[-top_n:] if top_n > 0 else []
    print("\n" + "="*65)
    # Report how many are actually listed, which may be fewer than top_n.
    print(f" ERRORS & CRITICAL ENTRIES ({len(errors):,} total, showing last {len(shown)})")
    print("="*65)
    for e in shown:
        dt = e.get("datetime_parsed") or (e.get("datetime") or "")[:19]
        msg = e.get("message") or e.get("raw", "")
        msg = msg[:80]
        print(f"\n [{e['level']}] Line {e.get('line_no','?')} {dt}")
        print(f" {msg}")
    print("="*65)
# ============================================================
# DISPLAY: KEYWORD SEARCH RESULTS
# ============================================================
def display_keyword_hits(result, keyword):
    """Print up to 30 lines that matched a keyword search.

    Args:
        result: Dict returned by parse_log(); reads "keyword_hits", a list of
            {"line": number, "text": line text} dicts.
        keyword: The search term, used only for the headings.
    """
    matches = result.get("keyword_hits", [])
    if not matches:
        print(f"\n No matches found for: '{keyword}'")
        return
    display_limit = 30
    rule = "="*65
    print("\n" + rule)
    print(f" KEYWORD SEARCH: '{keyword}' ({len(matches):,} matches)")
    print(rule)
    for match in matches[:display_limit]:
        print(f"\n Line {match['line']:>6}: {match['text'][:100]}")
    # Tell the user how many matches were left out of the listing.
    hidden = len(matches) - display_limit
    if hidden > 0:
        print(f"\n ... and {hidden} more matches.")
    print(rule)
# ============================================================
# DISPLAY: COMMON PATTERNS / REPEATED MESSAGES
# ============================================================
def display_patterns(result, top_n=15):
    """Print the most frequently repeated message patterns.

    Messages are grouped after their variable parts are masked: dotted-quad
    IP addresses become <IP>, standalone numbers become <N> and double-quoted
    text becomes "<VAL>". Each masked message is cut to 80 characters.

    Args:
        result: Dict returned by parse_log(); reads "message_list".
        top_n: Number of patterns to list.
    """
    messages = result.get("message_list", [])
    if not messages:
        print("\n No messages extracted.")
        return
    # Applied in this order: IPs must be masked before bare numbers are.
    masks = (
        (r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', '<IP>'),
        (r'\b\d+\b', '<N>'),
        (r'"[^"]*"', '"<VAL>"'),
    )
    tally = Counter()
    for text in messages:
        if not text.strip():
            continue
        for regex, placeholder in masks:
            text = re.sub(regex, placeholder, text)
        tally[text[:80].strip()] += 1
    ranked = tally.most_common(top_n)
    scale = ranked[0][1] if ranked else 1
    rule = "="*65
    print("\n" + rule)
    print(f" MOST REPEATED LOG PATTERNS (Top {top_n})")
    print(rule)
    print(f" {'COUNT':>7} MESSAGE PATTERN")
    print(" " + "-"*60)
    for shape, hits in ranked:
        print(f" {hits:>7,} {draw_bar(hits, scale, width=12)} {shape}")
    print(rule)
# ============================================================
# FULL REPORT
# ============================================================
def full_report(result):
    """Print every report section for one parsed log, in a fixed order.

    Args:
        result: Dict returned by parse_log().
    """
    sections = (
        display_level_summary,
        display_hourly_activity,
        display_web_stats,
        display_errors,
        display_patterns,
    )
    for render in sections:
        render(result)
# ============================================================
# SAVE REPORT TO JSON
# ============================================================
def save_report(result):
    """Write a JSON summary of one parsed log to REPORT_FILE.

    The summary holds the file name, the time of writing, line and entry
    counts, the format, per-level / per-status / per-hour counts and the 20
    most frequent IPs and paths. Individual entries are not included.

    Args:
        result: Dict returned by parse_log().
    """
    def busiest(tally, limit=20):
        # Highest count first, cut to `limit` items.
        ranked = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
        return dict(ranked[:limit])

    stamp = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
    report = {
        "filename": result["filename"],
        "analyzed_at": stamp,
        "total_lines": result["total_lines"],
        "parsed": result["parsed"],
        "format": result["format"],
        "level_counts": result["level_counts"],
        "top_ips": busiest(result["ip_counts"]),
        "status_codes": result["status_counts"],
        "top_paths": busiest(result["path_counts"]),
        "hourly": result["hour_counts"],
    }
    serialized = json.dumps(report, indent=2)
    with open(REPORT_FILE, "w") as out:
        out.write(serialized)
    print(f"\n Report saved: {REPORT_FILE}")
# ============================================================
# BATCH ANALYZE (FOLDER)
# ============================================================
def batch_analyze(folder):
    """Parse every .log / .txt / .out file in a folder and print a summary.

    Only the folder itself is scanned, not its sub-folders. Each file is
    parsed with auto-detected format; files that yield no result are left
    out of the table.

    Args:
        folder: Path of the directory to scan.
    """
    folder = Path(folder)
    if not folder.is_dir():
        print(" Invalid folder.")
        return
    log_files = []
    for suffix_glob in ("*.log", "*.txt", "*.out"):
        log_files.extend(folder.glob(suffix_glob))
    if not log_files:
        print(f" No .log / .txt / .out files in {folder}")
        return
    print(f"\n Found {len(log_files)} log file(s).\n")

    summary = []
    for log_path in log_files:
        parsed = parse_log(log_path)
        if not parsed:
            continue
        levels = parsed["level_counts"]
        summary.append({
            "file": log_path.name,
            "lines": parsed["total_lines"],
            "errors": sum(levels.get(name, 0)
                          for name in ("ERROR", "CRITICAL", "FATAL")),
            "warnings": sum(levels.get(name, 0)
                            for name in ("WARNING", "WARN")),
        })

    wide_rule = "="*60
    thin_rule = " " + "-"*56
    print("\n" + wide_rule)
    print(f" BATCH SUMMARY ({len(summary)} files)")
    print(wide_rule)
    print(f" {'FILE':<30} {'LINES':>8} {'ERRORS':>8} {'WARNINGS':>10}")
    print(thin_rule)
    total_errors = 0
    total_warnings = 0
    for row in summary:
        print(f" {row['file']:<30} {row['lines']:>8,} {row['errors']:>8,} "
              f"{row['warnings']:>10,}")
        total_errors += row["errors"]
        total_warnings += row["warnings"]
    print(thin_rule)
    print(f" {'TOTAL':<30} {'':>8} {total_errors:>8,} {total_warnings:>10,}")
    print(wide_rule)
# ============================================================
# MAIN MENU
# ============================================================
def print_menu():
    """Print the numbered main menu, framed by dashed rules."""
    rule = "-"*48
    options = (
        " 1. Analyze log file (full report)",
        " 2. Log level summary",
        " 3. Hourly activity chart",
        " 4. Web log stats (IPs / status / paths)",
        " 5. Show errors & critical entries",
        " 6. Search keyword in log",
        " 7. Repeated message patterns",
        " 8. Batch analyze a folder",
        " 9. Save report to JSON",
        " 0. Exit",
    )
    print("\n" + rule)
    print(" LOG FILE ANALYZER")
    print(rule)
    for option in options:
        print(option)
    print(rule)
def _prompt_int(prompt, default):
    """Ask for a non-negative whole number; return *default* if the answer is blank or invalid."""
    answer = input(prompt).strip()
    # isdecimal() accepts only characters int() can convert; isdigit() also
    # accepts e.g. superscript digits, which make int() raise.
    return int(answer) if answer.isdecimal() else default


def _prompt_and_parse():
    """Ask for a log file, its format and a line limit, then parse the file.

    Returns a ``(result, max_lines)`` tuple, or None when the file does not
    exist or parsing produced nothing.
    """
    path = input("\n Log file path: ").strip()
    if not path or not Path(path).exists():
        print(" File not found.")
        return None
    fmt_choice = input(
        " Format (auto/apache_access/apache_error/"
        "nginx_access/python_log/syslog/generic): "
    ).strip() or "auto"
    max_lines = _prompt_int(" Max lines to parse (0=all): ", 0)
    result = parse_log(path, fmt_choice, None, None, max_lines)
    if not result:
        return None
    return result, max_lines


def main():
    """Run the interactive menu until the user picks 0 or input ends.

    The most recently parsed log is cached and reused by options 2-7 and 9;
    option 1 always asks for a file again. Options 2-7 ask for a file first
    when nothing has been parsed yet. A keyword search (option 6) parses the
    file separately and leaves the cached result untouched.
    """
    print("\n" + "="*55)
    print(" LOG FILE ANALYZER")
    print("="*55)
    print("\n Supports: Apache, Nginx, Python, Syslog, Generic logs")
    print(" Auto-detects format from first 20 lines.\n")
    last_result = None
    last_max_lines = 0
    try:
        while True:
            print_menu()
            choice = input(" > ").strip()
            # "9" is deliberately not in this group: saving must never
            # trigger a parse, and it has its own branch below.
            if choice in ("1", "2", "3", "4", "5", "6", "7"):
                if not last_result or choice == "1":
                    loaded = _prompt_and_parse()
                    if loaded is None:
                        continue
                    last_result, last_max_lines = loaded
                if choice == "1":
                    full_report(last_result)
                elif choice == "2":
                    display_level_summary(last_result)
                elif choice == "3":
                    display_hourly_activity(last_result)
                elif choice == "4":
                    display_web_stats(last_result)
                elif choice == "5":
                    n = _prompt_int(" Show last N errors (default 20): ", 20)
                    display_errors(last_result, n)
                elif choice == "6":
                    kw = input(" Keyword to search: ").strip()
                    if kw:
                        # Parse into a separate result: a keyword-filtered
                        # parse only keeps matching entries and must not
                        # replace the cached, unfiltered one.
                        search_result = parse_log(last_result["filepath"],
                                                  last_result["format"],
                                                  kw, None, last_max_lines)
                        if search_result:
                            display_keyword_hits(search_result, kw)
                elif choice == "7":
                    n = _prompt_int(" Top N patterns (default 15): ", 15)
                    display_patterns(last_result, n)
            elif choice == "8":
                folder = input("\n Folder path: ").strip()
                batch_analyze(folder)
            elif choice == "9":
                if last_result:
                    save_report(last_result)
                else:
                    print(" No analysis data. Parse a file first.")
            elif choice == "0":
                print("\n Goodbye!\n")
                break
            else:
                print(" Invalid choice.")
    except (EOFError, KeyboardInterrupt):
        # End of piped input or Ctrl-C at any prompt: leave quietly instead
        # of showing a traceback.
        print("\n\n Goodbye!\n")
# ============================================================
# RUN
# ============================================================
# Start the interactive menu only when this file is executed as a script,
# so that importing the module does not launch it.
if __name__ == "__main__":
    main()
# No comments:
# Post a Comment